Re: logical replication of truncate command with trigger causes Assert - Mailing list pgsql-hackers
From | Tom Lane |
---|---|
Subject | Re: logical replication of truncate command with trigger causes Assert |
Date | |
Msg-id | 1719083.1623351052@sss.pgh.pa.us Whole thread Raw |
In response to | Re: logical replication of truncate command with trigger causes Assert (Amit Kapila <amit.kapila16@gmail.com>) |
Responses | Re: logical replication of truncate command with trigger causes Assert |
List | pgsql-hackers |
Amit Kapila <amit.kapila16@gmail.com> writes:
> On Wed, Jun 9, 2021 at 8:44 PM Mark Dilger <mark.dilger@enterprisedb.com> wrote:
>> On Jun 9, 2021, at 7:52 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
>>> Somewhat unrelated, but ... am I reading the code correctly that
>>> apply_handle_stream_start and related routines are using Asserts
>>> to check that the remote sent stream-control messages in the correct
>>> order?

> This also needs to be changed to test-and-elog.

Here's a proposed patch for this. It looks like pretty much all of the
bogosity is new with the streaming business. You might quibble with
which things I thought deserved elog versus ereport. Another thing
I'm wondering is how many of these messages really need to be
translated. We could use errmsg_internal and avoid burdening the
translators, perhaps.

			regards, tom lane

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 98c26002e8..26738d3589 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -177,7 +177,7 @@ bool in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;

 /* fields valid only when processing streamed transaction */
-bool in_streamed_transaction = false;
+static bool in_streamed_transaction = false;

 static TransactionId stream_xid = InvalidTransactionId;

@@ -345,7 +345,8 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
 	 */
 	xid = pq_getmsgint(s, 4);

-	Assert(TransactionIdIsValid(xid));
+	if (!TransactionIdIsValid(xid))
+		elog(ERROR, "invalid transaction ID in streamed replication transaction");

 	/* Add the new subxact to the array (unless already there). */
 	subxact_info_add(xid);
@@ -785,7 +786,10 @@ apply_handle_commit(StringInfo s)

 	logicalrep_read_commit(s, &commit_data);

-	Assert(commit_data.commit_lsn == remote_final_lsn);
+	if (commit_data.commit_lsn != remote_final_lsn)
+		elog(ERROR, "incorrect commit LSN %X/%X in commit message (expected %X/%X)",
+			 LSN_FORMAT_ARGS(commit_data.commit_lsn),
+			 LSN_FORMAT_ARGS(remote_final_lsn));

 	apply_handle_commit_internal(s, &commit_data);

@@ -824,7 +828,10 @@ apply_handle_stream_start(StringInfo s)
 	bool first_segment;
 	HASHCTL hash_ctl;

-	Assert(!in_streamed_transaction);
+	if (in_streamed_transaction)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("duplicate STREAM START message")));

 	/*
 	 * Start a transaction on stream start, this transaction will be committed
@@ -873,7 +880,10 @@ apply_handle_stream_start(StringInfo s)
 static void
 apply_handle_stream_stop(StringInfo s)
 {
-	Assert(in_streamed_transaction);
+	if (!in_streamed_transaction)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("STREAM STOP message without STREAM START")));

 	/*
 	 * Close the file with serialized changes, and serialize information about
@@ -905,7 +915,10 @@ apply_handle_stream_abort(StringInfo s)
 	TransactionId xid;
 	TransactionId subxid;

-	Assert(!in_streamed_transaction);
+	if (in_streamed_transaction)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("STREAM ABORT message without STREAM STOP")));

 	logicalrep_read_stream_abort(s, &xid, &subxid);

@@ -967,13 +980,12 @@ apply_handle_stream_abort(StringInfo s)
 			return;
 		}

-		Assert((subidx >= 0) && (subidx < subxact_data.nsubxacts));
-
 		ent = (StreamXidHash *) hash_search(xidhash,
 											(void *) &xid,
 											HASH_FIND,
 											&found);
-		Assert(found);
+		if (!found)
+			elog(ERROR, "transaction %u not found in hash table", xid);

 		/* open the changes file */
 		changes_filename(path, MyLogicalRepWorker->subid, xid);
@@ -1012,7 +1024,10 @@ apply_handle_stream_commit(StringInfo s)
 	MemoryContext oldcxt;
 	BufFile *fd;

-	Assert(!in_streamed_transaction);
+	if (in_streamed_transaction)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("STREAM COMMIT message without STREAM STOP")));

 	xid = logicalrep_read_stream_commit(s, &commit_data);

@@ -1031,11 +1046,14 @@ apply_handle_stream_commit(StringInfo s)
 	/* open the spool file for the committed transaction */
 	changes_filename(path, MyLogicalRepWorker->subid, xid);
 	elog(DEBUG1, "replaying changes from file \"%s\"", path);
+
 	ent = (StreamXidHash *) hash_search(xidhash,
 										(void *) &xid,
 										HASH_FIND,
 										&found);
-	Assert(found);
+	if (!found)
+		elog(ERROR, "transaction %u not found in hash table", xid);
+
 	fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY);

 	buffer = palloc(BLCKSZ);
@@ -1080,7 +1098,9 @@ apply_handle_stream_commit(StringInfo s)
 					 errmsg("could not read from streaming transaction's changes file \"%s\": %m",
 							path)));

-		Assert(len > 0);
+		if (len <= 0)
+			elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
+				 len, path);

 		/* make sure we have sufficiently large buffer */
 		buffer = repalloc(buffer, len);
@@ -1108,7 +1128,7 @@ apply_handle_stream_commit(StringInfo s)
 		nchanges++;

 		if (nchanges % 1000 == 0)
-			elog(DEBUG1, "replayed %d changes from file '%s'",
+			elog(DEBUG1, "replayed %d changes from file \"%s\"",
 				 nchanges, path);
 	}

pgsql-hackers by date: