Re: Possible bug in logical replication. - Mailing list pgsql-hackers

From Kyotaro HORIGUCHI
Subject Re: Possible bug in logical replication.
Msg-id 20180518.143714.31685500.horiguchi.kyotaro@lab.ntt.co.jp
In response to Re: Possible bug in logical replication.  (Arseny Sher <a.sher@postgrespro.ru>)
Responses Re: Possible bug in logical replication.  (Masahiko Sawada <sawada.mshk@gmail.com>)
Re: Possible bug in logical replication.  (Arseny Sher <a.sher@postgrespro.ru>)
List pgsql-hackers
At Thu, 17 May 2018 13:54:07 +0300, Arseny Sher <a.sher@postgrespro.ru> wrote in <87in7md034.fsf@ars-thinkpad>
> 
> Konstantin Knizhnik <k.knizhnik@postgrespro.ru> writes:
> 
> > I think that using restart_lsn instead of confirmed_flush is not the right approach.
> > If restart_lsn is not available and confirmed_flush is pointing to a page
> > boundary, then in any case we should somehow handle this case and adjust
> > startlsn to point to a valid record position (by just adding the page header
> > size?).
> 
> Well, restart_lsn is always available on live slot: it is initially set
> in ReplicationSlotReserveWal during slot creation.

restart_lsn stays at the beginning of a transaction until the
transaction ends, so simply starting from restart_lsn would decode
a transaction that has already been decoded; in short, the slot
would rewind. The function works only on an inactive slot, so the
current code works fine on this point. In addition, restart_lsn
can also be on a page boundary.


The problem is easy to reproduce.

1. Create a logical replication slot; it starts at the current LSN.

  select pg_create_logical_replication_slot('s1', 'pgoutput');

2. Generate two or three pages' worth of WAL by doing anything.

3. Advance the slot to a page boundary.

  e.g. select pg_replication_slot_advance('s1', '0/9624000');

4. Advance the slot further; it crashes.

Setting ctx->reader->EndRecPtr directly to startlsn fixes the
problem, but I found another problem here.

The function accepts any LSN, even one that is not at the beginning
of a record. Such values can lead to errors, crashes, infinite waits
or other kinds of trouble. The slot must always be moved to the
"end of a record" (that is, the start of the next record). The
attached patch also fixes this.

The documentation does not seem to need a fix.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d9e10263bb..d3cb777f9f 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -318,6 +318,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
 /*
  * Helper function for advancing physical replication slot forward.
+ *
+ * This function accepts an arbitrary LSN even if the LSN is not at the
+ * beginning of a record. This can lead to misbehavior, but currently the
+ * value is used only to determine up to which WAL segment to keep, and
+ * successive implicit advancing fixes the state.
  */
 static XLogRecPtr
 pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
@@ -344,6 +349,7 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
     LogicalDecodingContext *ctx;
     ResourceOwner old_resowner = CurrentResourceOwner;
     XLogRecPtr    retlsn = InvalidXLogRecPtr;
+    XLogRecPtr    upto;
 
     PG_TRY();
     {
@@ -354,6 +360,13 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
                                     logical_read_local_xlog_page,
                                     NULL, NULL, NULL);
 
+        /*
+         * startlsn can be on a page boundary, but it is not accepted as an
+         * explicit parameter to XLogReadRecord. Set it in the reader context.
+         */
+        Assert(startlsn != InvalidXLogRecPtr);
+        upto = ctx->reader->EndRecPtr = startlsn;
+    
         CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner,
                                                    "logical decoding");
 
@@ -361,22 +374,18 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
         InvalidateSystemCaches();
 
         /* Decode until we run out of records */
-        while ((startlsn != InvalidXLogRecPtr && startlsn < moveto) ||
-               (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < moveto))
+        while (ctx->reader->EndRecPtr <= moveto)
         {
             XLogRecord *record;
             char       *errm = NULL;
+ 
+            /* ctx->reader->EndRecPtr cannot go backward here */
+            upto = ctx->reader->EndRecPtr;
 
-            record = XLogReadRecord(ctx->reader, startlsn, &errm);
+            record = XLogReadRecord(ctx->reader, InvalidXLogRecPtr, &errm);
             if (errm)
                 elog(ERROR, "%s", errm);
 
-            /*
-             * Now that we've set up the xlog reader state, subsequent calls
-             * pass InvalidXLogRecPtr to say "continue from last record"
-             */
-            startlsn = InvalidXLogRecPtr;
-
             /*
              * The {begin_txn,change,commit_txn}_wrapper callbacks above will
              * store the description into our tuplestore.
@@ -384,18 +393,14 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
             if (record != NULL)
                 LogicalDecodingProcessRecord(ctx, ctx->reader);
 
-            /* check limits */
-            if (moveto <= ctx->reader->EndRecPtr)
-                break;
-
             CHECK_FOR_INTERRUPTS();
         }
 
         CurrentResourceOwner = old_resowner;
 
-        if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
+        if (startlsn != upto)
         {
-            LogicalConfirmReceivedLocation(moveto);
+            LogicalConfirmReceivedLocation(upto);
 
             /*
              * If only the confirmed_flush_lsn has changed the slot won't get
