Re: logical replication of truncate command with trigger causes Assert - Mailing list pgsql-hackers

From: Tom Lane
Subject: Re: logical replication of truncate command with trigger causes Assert
Date:
Msg-id: 1719083.1623351052@sss.pgh.pa.us
Whole thread Raw
In response to: Re: logical replication of truncate command with trigger causes Assert (Amit Kapila <amit.kapila16@gmail.com>)
Responses: Re: logical replication of truncate command with trigger causes Assert
List: pgsql-hackers
Amit Kapila <amit.kapila16@gmail.com> writes:
> On Wed, Jun 9, 2021 at 8:44 PM Mark Dilger <mark.dilger@enterprisedb.com> wrote:
>> On Jun 9, 2021, at 7:52 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
>>> Somewhat unrelated, but ... am I reading the code correctly that
>>> apply_handle_stream_start and related routines are using Asserts
>>> to check that the remote sent stream-control messages in the correct
>>> order?

> This also needs to be changed to test-and-elog.

Here's a proposed patch for this.  It looks like pretty much all of the
bogosity is new with the streaming business.  You might quibble with
which things I thought deserved elog versus ereport.  Another thing
I'm wondering is how many of these messages really need to be
translated.  We could use errmsg_internal and avoid burdening the
translators, perhaps.

            regards, tom lane

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 98c26002e8..26738d3589 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -177,7 +177,7 @@ bool        in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;

 /* fields valid only when processing streamed transaction */
-bool        in_streamed_transaction = false;
+static bool in_streamed_transaction = false;

 static TransactionId stream_xid = InvalidTransactionId;

@@ -345,7 +345,8 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
      */
     xid = pq_getmsgint(s, 4);

-    Assert(TransactionIdIsValid(xid));
+    if (!TransactionIdIsValid(xid))
+        elog(ERROR, "invalid transaction ID in streamed replication transaction");

     /* Add the new subxact to the array (unless already there). */
     subxact_info_add(xid);
@@ -785,7 +786,10 @@ apply_handle_commit(StringInfo s)

     logicalrep_read_commit(s, &commit_data);

-    Assert(commit_data.commit_lsn == remote_final_lsn);
+    if (commit_data.commit_lsn != remote_final_lsn)
+        elog(ERROR, "incorrect commit LSN %X/%X in commit message (expected %X/%X)",
+             LSN_FORMAT_ARGS(commit_data.commit_lsn),
+             LSN_FORMAT_ARGS(remote_final_lsn));

     apply_handle_commit_internal(s, &commit_data);

@@ -824,7 +828,10 @@ apply_handle_stream_start(StringInfo s)
     bool        first_segment;
     HASHCTL        hash_ctl;

-    Assert(!in_streamed_transaction);
+    if (in_streamed_transaction)
+        ereport(ERROR,
+                (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                 errmsg("duplicate STREAM START message")));

     /*
      * Start a transaction on stream start, this transaction will be committed
@@ -873,7 +880,10 @@ apply_handle_stream_start(StringInfo s)
 static void
 apply_handle_stream_stop(StringInfo s)
 {
-    Assert(in_streamed_transaction);
+    if (!in_streamed_transaction)
+        ereport(ERROR,
+                (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                 errmsg("STREAM STOP message without STREAM START")));

     /*
      * Close the file with serialized changes, and serialize information about
@@ -905,7 +915,10 @@ apply_handle_stream_abort(StringInfo s)
     TransactionId xid;
     TransactionId subxid;

-    Assert(!in_streamed_transaction);
+    if (in_streamed_transaction)
+        ereport(ERROR,
+                (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                 errmsg("STREAM ABORT message without STREAM STOP")));

     logicalrep_read_stream_abort(s, &xid, &subxid);

@@ -967,13 +980,12 @@ apply_handle_stream_abort(StringInfo s)
             return;
         }

-        Assert((subidx >= 0) && (subidx < subxact_data.nsubxacts));
-
         ent = (StreamXidHash *) hash_search(xidhash,
                                             (void *) &xid,
                                             HASH_FIND,
                                             &found);
-        Assert(found);
+        if (!found)
+            elog(ERROR, "transaction %u not found in hash table", xid);

         /* open the changes file */
         changes_filename(path, MyLogicalRepWorker->subid, xid);
@@ -1012,7 +1024,10 @@ apply_handle_stream_commit(StringInfo s)
     MemoryContext oldcxt;
     BufFile    *fd;

-    Assert(!in_streamed_transaction);
+    if (in_streamed_transaction)
+        ereport(ERROR,
+                (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                 errmsg("STREAM COMMIT message without STREAM STOP")));

     xid = logicalrep_read_stream_commit(s, &commit_data);

@@ -1031,11 +1046,14 @@ apply_handle_stream_commit(StringInfo s)
     /* open the spool file for the committed transaction */
     changes_filename(path, MyLogicalRepWorker->subid, xid);
     elog(DEBUG1, "replaying changes from file \"%s\"", path);
+
     ent = (StreamXidHash *) hash_search(xidhash,
                                         (void *) &xid,
                                         HASH_FIND,
                                         &found);
-    Assert(found);
+    if (!found)
+        elog(ERROR, "transaction %u not found in hash table", xid);
+
     fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY);

     buffer = palloc(BLCKSZ);
@@ -1080,7 +1098,9 @@ apply_handle_stream_commit(StringInfo s)
                      errmsg("could not read from streaming transaction's changes file \"%s\": %m",
                             path)));

-        Assert(len > 0);
+        if (len <= 0)
+            elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
+                 len, path);

         /* make sure we have sufficiently large buffer */
         buffer = repalloc(buffer, len);
@@ -1108,7 +1128,7 @@ apply_handle_stream_commit(StringInfo s)
         nchanges++;

         if (nchanges % 1000 == 0)
-            elog(DEBUG1, "replayed %d changes from file '%s'",
+            elog(DEBUG1, "replayed %d changes from file \"%s\"",
                  nchanges, path);
     }


pgsql-hackers by date:

Previous
From: Alvaro Herrera
Date:
Subject: Re: pg14b1 stuck in lazy_scan_prune/heap_page_prune of pg_statistic
Next
From: Alexander Korotkov
Date:
Subject: Re: unnesting multirange data types