Re: logical replication of truncate command with trigger causes Assert - Mailing list pgsql-hackers
| From | Tom Lane |
|---|---|
| Subject | Re: logical replication of truncate command with trigger causes Assert |
| Date | |
| Msg-id | 1719083.1623351052@sss.pgh.pa.us Whole thread Raw |
| In response to | Re: logical replication of truncate command with trigger causes Assert (Amit Kapila <amit.kapila16@gmail.com>) |
| Responses | Re: logical replication of truncate command with trigger causes Assert |
| List | pgsql-hackers |
Amit Kapila <amit.kapila16@gmail.com> writes:
> On Wed, Jun 9, 2021 at 8:44 PM Mark Dilger <mark.dilger@enterprisedb.com> wrote:
>> On Jun 9, 2021, at 7:52 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
>>> Somewhat unrelated, but ... am I reading the code correctly that
>>> apply_handle_stream_start and related routines are using Asserts
>>> to check that the remote sent stream-control messages in the correct
>>> order?
> This also needs to be changed to test-and-elog.
Here's a proposed patch for this. It looks like pretty much all of the
bogosity is new with the streaming business. You might quibble with
which things I thought deserved elog versus ereport. Another thing
I'm wondering is how many of these messages really need to be
translated. We could use errmsg_internal and avoid burdening the
translators, perhaps.
regards, tom lane
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 98c26002e8..26738d3589 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -177,7 +177,7 @@ bool in_remote_transaction = false;
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
/* fields valid only when processing streamed transaction */
-bool in_streamed_transaction = false;
+static bool in_streamed_transaction = false;
static TransactionId stream_xid = InvalidTransactionId;
@@ -345,7 +345,8 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
*/
xid = pq_getmsgint(s, 4);
- Assert(TransactionIdIsValid(xid));
+ if (!TransactionIdIsValid(xid))
+ elog(ERROR, "invalid transaction ID in streamed replication transaction");
/* Add the new subxact to the array (unless already there). */
subxact_info_add(xid);
@@ -785,7 +786,10 @@ apply_handle_commit(StringInfo s)
logicalrep_read_commit(s, &commit_data);
- Assert(commit_data.commit_lsn == remote_final_lsn);
+ if (commit_data.commit_lsn != remote_final_lsn)
+ elog(ERROR, "incorrect commit LSN %X/%X in commit message (expected %X/%X)",
+ LSN_FORMAT_ARGS(commit_data.commit_lsn),
+ LSN_FORMAT_ARGS(remote_final_lsn));
apply_handle_commit_internal(s, &commit_data);
@@ -824,7 +828,10 @@ apply_handle_stream_start(StringInfo s)
bool first_segment;
HASHCTL hash_ctl;
- Assert(!in_streamed_transaction);
+ if (in_streamed_transaction)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("duplicate STREAM START message")));
/*
* Start a transaction on stream start, this transaction will be committed
@@ -873,7 +880,10 @@ apply_handle_stream_start(StringInfo s)
static void
apply_handle_stream_stop(StringInfo s)
{
- Assert(in_streamed_transaction);
+ if (!in_streamed_transaction)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("STREAM STOP message without STREAM START")));
/*
* Close the file with serialized changes, and serialize information about
@@ -905,7 +915,10 @@ apply_handle_stream_abort(StringInfo s)
TransactionId xid;
TransactionId subxid;
- Assert(!in_streamed_transaction);
+ if (in_streamed_transaction)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("STREAM ABORT message without STREAM STOP")));
logicalrep_read_stream_abort(s, &xid, &subxid);
@@ -967,13 +980,12 @@ apply_handle_stream_abort(StringInfo s)
return;
}
- Assert((subidx >= 0) && (subidx < subxact_data.nsubxacts));
-
ent = (StreamXidHash *) hash_search(xidhash,
(void *) &xid,
HASH_FIND,
&found);
- Assert(found);
+ if (!found)
+ elog(ERROR, "transaction %u not found in hash table", xid);
/* open the changes file */
changes_filename(path, MyLogicalRepWorker->subid, xid);
@@ -1012,7 +1024,10 @@ apply_handle_stream_commit(StringInfo s)
MemoryContext oldcxt;
BufFile *fd;
- Assert(!in_streamed_transaction);
+ if (in_streamed_transaction)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("STREAM COMMIT message without STREAM STOP")));
xid = logicalrep_read_stream_commit(s, &commit_data);
@@ -1031,11 +1046,14 @@ apply_handle_stream_commit(StringInfo s)
/* open the spool file for the committed transaction */
changes_filename(path, MyLogicalRepWorker->subid, xid);
elog(DEBUG1, "replaying changes from file \"%s\"", path);
+
ent = (StreamXidHash *) hash_search(xidhash,
(void *) &xid,
HASH_FIND,
&found);
- Assert(found);
+ if (!found)
+ elog(ERROR, "transaction %u not found in hash table", xid);
+
fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY);
buffer = palloc(BLCKSZ);
@@ -1080,7 +1098,9 @@ apply_handle_stream_commit(StringInfo s)
errmsg("could not read from streaming transaction's changes file \"%s\": %m",
path)));
- Assert(len > 0);
+ if (len <= 0)
+ elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
+ len, path);
/* make sure we have sufficiently large buffer */
buffer = repalloc(buffer, len);
@@ -1108,7 +1128,7 @@ apply_handle_stream_commit(StringInfo s)
nchanges++;
if (nchanges % 1000 == 0)
- elog(DEBUG1, "replayed %d changes from file '%s'",
+ elog(DEBUG1, "replayed %d changes from file \"%s\"",
nchanges, path);
}
pgsql-hackers by date: