Re: logical copy_replication_slot issues - Mailing list pgsql-hackers

From Arseny Sher
Subject Re: logical copy_replication_slot issues
Date
Msg-id 87imjh7nky.fsf@ars-thinkpad
Whole thread Raw
In response to Re: logical copy_replication_slot issues  (Arseny Sher <a.sher@postgrespro.ru>)
Responses Re: logical copy_replication_slot issues  (Masahiko Sawada <masahiko.sawada@2ndquadrant.com>)
List pgsql-hackers
I wrote:

> It looks good to me now.

After lying for some time in my head it reminded me that
CreateInitDecodingContext not only pegs the LSN, but also xmin, so
attached makes a minor comment correction.

While taking a look at the nearby code it seemed weird to me that
GetOldestSafeDecodingTransactionId checks PGXACT->xid, not xmin. Don't
want to investigate this at the moment though, and not for this thread.

Also not for this thread, but I've noticed
pg_copy_logical_replication_slot doesn't allow to change plugin name
which is an omission in my view. It would be useful and trivial to do.


-- cheers, arseny

diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 2c9d5de6d9..da634bef0e 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -121,7 +121,8 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
  */
 static void
 create_logical_replication_slot(char *name, char *plugin,
-                                bool temporary, XLogRecPtr restart_lsn)
+                                bool temporary, XLogRecPtr restart_lsn,
+                                bool find_startpoint)
 {
     LogicalDecodingContext *ctx = NULL;
 
@@ -139,16 +140,18 @@ create_logical_replication_slot(char *name, char *plugin,
                           temporary ? RS_TEMPORARY : RS_EPHEMERAL);
 
     /*
-     * Create logical decoding context, to build the initial snapshot.
+     * Create logical decoding context to find start point or, if we don't
+     * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
      */
     ctx = CreateInitDecodingContext(plugin, NIL,
-                                    false,    /* do not build snapshot */
+                                    false,    /* do not build data snapshot */
                                     restart_lsn,
                                     logical_read_local_xlog_page, NULL, NULL,
                                     NULL);
 
     /* build initial snapshot, might take a while */
-    DecodingContextFindStartpoint(ctx);
+    if (find_startpoint)
+        DecodingContextFindStartpoint(ctx);
 
     /* don't need the decoding context anymore */
     FreeDecodingContext(ctx);
@@ -179,7 +182,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
     create_logical_replication_slot(NameStr(*name),
                                     NameStr(*plugin),
                                     temporary,
-                                    InvalidXLogRecPtr);
+                                    InvalidXLogRecPtr,
+                                    true);
 
     values[0] = NameGetDatum(&MyReplicationSlot->data.name);
     values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
@@ -683,10 +687,19 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 
     /* Create new slot and acquire it */
     if (logical_slot)
+    {
+        /*
+         * WAL required for building snapshot could be removed as we haven't
+         * reserved WAL yet. So we create a new logical replication slot
+         * without building an initial snapshot.  A reasonable start point for
+         * decoding will be provided by the source slot.
+         */
         create_logical_replication_slot(NameStr(*dst_name),
                                         plugin,
                                         temporary,
-                                        src_restart_lsn);
+                                        src_restart_lsn,
+                                        false);
+    }
     else
         create_physical_replication_slot(NameStr(*dst_name),
                                          true,
@@ -703,6 +716,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
         TransactionId copy_xmin;
         TransactionId copy_catalog_xmin;
         XLogRecPtr    copy_restart_lsn;
+        XLogRecPtr    copy_confirmed_flush;
         bool        copy_islogical;
         char       *copy_name;
 
@@ -714,6 +728,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
         copy_xmin = src->data.xmin;
         copy_catalog_xmin = src->data.catalog_xmin;
         copy_restart_lsn = src->data.restart_lsn;
+        copy_confirmed_flush = src->data.confirmed_flush;
 
         /* for existence check */
         copy_name = pstrdup(NameStr(src->data.name));
@@ -738,6 +753,13 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
                             NameStr(*src_name)),
                      errdetail("The source replication slot was modified incompatibly during the copy operation.")));
 
+        /* The source slot must have a consistent snapshot */
+        if (src_islogical && XLogRecPtrIsInvalid(copy_confirmed_flush))
+            ereport(ERROR,
+                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                     errmsg("cannot copy a logical replication slot that doesn't have confirmed_flush_lsn"),
+                     errhint("Retry when the source replication slot creation is finished.")));
+
         /* Install copied values again */
         SpinLockAcquire(&MyReplicationSlot->mutex);
         MyReplicationSlot->effective_xmin = copy_effective_xmin;
@@ -746,6 +768,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
         MyReplicationSlot->data.xmin = copy_xmin;
         MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
         MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
+        MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
         SpinLockRelease(&MyReplicationSlot->mutex);
 
         ReplicationSlotMarkDirty();

pgsql-hackers by date:

Previous
From: Fujii Masao
Date:
Subject: Re: pg_stat_progress_basebackup - progress reporting forpg_basebackup, in the server side
Next
From: Andy Fan
Date:
Subject: Re: [PATCH] Erase the distinctClause if the result is unique by definition