Re: logical copy_replication_slot issues - Mailing list pgsql-hackers

From Arseny Sher
Subject Re: logical copy_replication_slot issues
Date
Msg-id 87k1406sj7.fsf@ars-thinkpad
Whole thread Raw
In response to Re: logical copy_replication_slot issues  (Masahiko Sawada <masahiko.sawada@2ndquadrant.com>)
Responses Re: logical copy_replication_slot issues  (Arseny Sher <a.sher@postgrespro.ru>)
List pgsql-hackers
Masahiko Sawada <masahiko.sawada@2ndquadrant.com> writes:

> I've attached the updated version patch that incorporated your
> comments. I believe we're going in the right direction for fixing this
> bug. I'll register this item to the next commit fest so as not to
> forget.

I've moved the confirmed_flush check to the second lookup out of paranoid
considerations (e.g. the slot could have been recreated and its creation
hasn't finished yet) and made some minor stylistic adjustments. It looks
good to me now.

diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 2c9d5de6d9..4a3c7aa0ce 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -121,7 +121,8 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
  */
 static void
 create_logical_replication_slot(char *name, char *plugin,
-                                bool temporary, XLogRecPtr restart_lsn)
+                                bool temporary, XLogRecPtr restart_lsn,
+                                bool find_startpoint)
 {
     LogicalDecodingContext *ctx = NULL;
 
@@ -139,16 +140,18 @@ create_logical_replication_slot(char *name, char *plugin,
                           temporary ? RS_TEMPORARY : RS_EPHEMERAL);
 
     /*
-     * Create logical decoding context, to build the initial snapshot.
+     * Create logical decoding context to find start point or, if we don't
+     * need it, to 1) bump slot's restart_lsn 2) check plugin sanity.
      */
     ctx = CreateInitDecodingContext(plugin, NIL,
-                                    false,    /* do not build snapshot */
+                                    false,    /* do not build data snapshot */
                                     restart_lsn,
                                     logical_read_local_xlog_page, NULL, NULL,
                                     NULL);
 
     /* build initial snapshot, might take a while */
-    DecodingContextFindStartpoint(ctx);
+    if (find_startpoint)
+        DecodingContextFindStartpoint(ctx);
 
     /* don't need the decoding context anymore */
     FreeDecodingContext(ctx);
@@ -179,7 +182,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
     create_logical_replication_slot(NameStr(*name),
                                     NameStr(*plugin),
                                     temporary,
-                                    InvalidXLogRecPtr);
+                                    InvalidXLogRecPtr,
+                                    true);
 
     values[0] = NameGetDatum(&MyReplicationSlot->data.name);
     values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
@@ -683,10 +687,19 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 
     /* Create new slot and acquire it */
     if (logical_slot)
+    {
+        /*
+         * WAL required for building snapshot could be removed as we haven't
+         * reserved WAL yet. So we create a new logical replication slot
+         * without building an initial snapshot.  A reasonable start point for
+         * decoding will be provided by the source slot.
+         */
         create_logical_replication_slot(NameStr(*dst_name),
                                         plugin,
                                         temporary,
-                                        src_restart_lsn);
+                                        src_restart_lsn,
+                                        false);
+    }
     else
         create_physical_replication_slot(NameStr(*dst_name),
                                          true,
@@ -703,6 +716,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
         TransactionId copy_xmin;
         TransactionId copy_catalog_xmin;
         XLogRecPtr    copy_restart_lsn;
+        XLogRecPtr    copy_confirmed_flush;
         bool        copy_islogical;
         char       *copy_name;
 
@@ -714,6 +728,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
         copy_xmin = src->data.xmin;
         copy_catalog_xmin = src->data.catalog_xmin;
         copy_restart_lsn = src->data.restart_lsn;
+        copy_confirmed_flush = src->data.confirmed_flush;
 
         /* for existence check */
         copy_name = pstrdup(NameStr(src->data.name));
@@ -738,6 +753,13 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
                             NameStr(*src_name)),
                      errdetail("The source replication slot was modified incompatibly during the copy operation.")));
 
+        /* The source slot must have a consistent snapshot */
+        if (src_islogical && XLogRecPtrIsInvalid(copy_confirmed_flush))
+            ereport(ERROR,
+                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                     errmsg("cannot copy a logical replication slot that doesn't have confirmed_flush_lsn"),
+                     errhint("Retry when the source replication slot creation is finished.")));
+
         /* Install copied values again */
         SpinLockAcquire(&MyReplicationSlot->mutex);
         MyReplicationSlot->effective_xmin = copy_effective_xmin;
@@ -746,6 +768,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
         MyReplicationSlot->data.xmin = copy_xmin;
         MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
         MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
+        MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
         SpinLockRelease(&MyReplicationSlot->mutex);
 
         ReplicationSlotMarkDirty();


-- cheers, arseny

pgsql-hackers by date:

Previous
From: Tom Lane
Date:
Subject: Re: Symbolic names for the values of typalign and typstorage
Next
From: tushar
Date:
Subject: Re: [Proposal] Global temporary tables