Thread: logical copy_replication_slot issues

logical copy_replication_slot issues

From
Arseny Sher
Date:
Hi,

While looking into questions about partially decoded xacts [1], I've read
through the replication slot copy code (9f06d79ef) and found a couple
of issues.

1) It seems quite reckless to me to dive into
DecodingContextFindStartpoint without an actual WAL reservation (the donor
slot's restart_lsn is used, but the slot is not acquired). Why risk erroring
out with a WAL removal error if the function advances the new slot's position
to the donor's updated one in the end anyway?

2) In the end, the new slot's restart_lsn is set to the donor's updated
one. However, the confirmed_flush field is not updated. This is just wrong
-- we could start decoding too early and stream a partially decoded
transaction.

I'd probably avoid doing DecodingContextFindStartpoint at all. Its only
purpose is to assemble a consistent snapshot (and establish the corresponding
<restart_lsn, confirmed_flush_lsn> pair), but the donor slot must have
already done that, and we could use its pair as well. Was this considered?
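
Editorial sketch (not part of the original message): issue 2 can be made
concrete with a toy C model. Nothing below is PostgreSQL code; ToySlot,
ToyXact, toy_decode_outcome and toy_copy_slot are hypothetical names, and the
skip rule is a deliberate simplification that assumes a transaction is skipped
when it commits before confirmed_flush and is seen only partially when its
first change lies before restart_lsn.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Toy stand-ins for illustration only; NOT the PostgreSQL definitions. */
typedef uint64_t ToyLsn;

typedef struct ToySlot
{
    ToyLsn restart_lsn;     /* WAL reading resumes here */
    ToyLsn confirmed_flush; /* commits before this point are not streamed */
} ToySlot;

typedef struct ToyXact
{
    ToyLsn first_change_lsn;
    ToyLsn commit_lsn;
} ToyXact;

typedef enum { XACT_SKIPPED, XACT_COMPLETE, XACT_PARTIAL } ToyOutcome;

/* Simplified (assumed) rule for what a consumer of the slot would receive. */
ToyOutcome
toy_decode_outcome(ToySlot slot, ToyXact xact)
{
    assert(xact.first_change_lsn <= xact.commit_lsn);

    if (xact.commit_lsn < slot.confirmed_flush)
        return XACT_SKIPPED;    /* already confirmed, never streamed */
    if (xact.first_change_lsn < slot.restart_lsn)
        return XACT_PARTIAL;    /* early changes lie before the read start */
    return XACT_COMPLETE;
}

/*
 * Copy a slot.  restart_lsn always follows the donor.  confirmed_flush
 * follows the donor only when copy_confirmed_flush is true; otherwise the
 * copy keeps the older value it found on its own.
 */
ToySlot
toy_copy_slot(ToySlot donor, ToyLsn stale_confirmed_flush,
              bool copy_confirmed_flush)
{
    ToySlot copy;

    copy.restart_lsn = donor.restart_lsn;
    copy.confirmed_flush = copy_confirmed_flush ? donor.confirmed_flush
                                                : stale_confirmed_flush;
    return copy;
}
```

With a donor at <restart_lsn 200, confirmed_flush 300> and a transaction whose
changes span LSNs 150 to 250, a copy that keeps a stale confirmed_flush of 120
classifies the transaction as XACT_PARTIAL, while a copy that also takes the
donor's confirmed_flush classifies it as XACT_SKIPPED, just as the donor would.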


[1] https://www.postgresql.org/message-id/flat/AB5978B2-1772-4FEE-A245-74C91704ECB0%40amazon.com

--
Arseny Sher
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company



Re: logical copy_replication_slot issues

From
Masahiko Sawada
Date:
On Mon, 10 Feb 2020 at 01:29, Arseny Sher <a.sher@postgrespro.ru> wrote:
>
> Hi,
>
> While looking into questions about partially decoded xacts [1], I've read
> through the replication slot copy code (9f06d79ef) and found a couple
> of issues.
>
> 1) It seems quite reckless to me to dive into
> DecodingContextFindStartpoint without an actual WAL reservation (the donor
> slot's restart_lsn is used, but the slot is not acquired). Why risk erroring
> out with a WAL removal error if the function advances the new slot's position
> to the donor's updated one in the end anyway?

Good catch. DecodingContextFindStartpoint could fail if the source
slot's restart_lsn is advanced and the WAL it requires is removed in
the meantime.
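
Editorial sketch (not part of the original message): the failure mode
acknowledged above can be illustrated with a toy C model. It is not
PostgreSQL code; toy_oldest_required_lsn and toy_can_read_from are
hypothetical helpers, and the model assumes that WAL is retained only from
the minimum restart_lsn over slots that actually hold a reservation.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Toy LSN type for illustration only; NOT the PostgreSQL definition. */
typedef uint64_t ToyLsn;

/*
 * Oldest LSN that must be retained: the minimum over all reserved
 * positions, capped by the current write position.
 */
ToyLsn
toy_oldest_required_lsn(const ToyLsn *reserved, size_t n, ToyLsn current_lsn)
{
    ToyLsn oldest = current_lsn;

    assert(reserved != NULL || n == 0);

    for (size_t i = 0; i < n; i++)
    {
        if (reserved[i] < oldest)
            oldest = reserved[i];
    }
    return oldest;
}

/* Reading from start_lsn works only if WAL before it alone was removed. */
bool
toy_can_read_from(ToyLsn start_lsn, ToyLsn removed_up_to)
{
    return start_lsn >= removed_up_to;
}
```

Suppose the copy remembers the donor's restart_lsn as 100 without reserving
it, and the donor then advances to 180. Under this model the only reservation
is 180, so WAL below 180 may be removed and a read starting at 100 fails. If
100 had been reserved as well, the oldest required LSN would stay at 100 and
the read would succeed.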

>
> 2) In the end, the new slot's restart_lsn is set to the donor's updated
> one. However, the confirmed_flush field is not updated. This is just wrong
> -- we could start decoding too early and stream a partially decoded
> transaction.

I think you are right.

>
> I'd probably avoid doing DecodingContextFindStartpoint at all. Its only
> purpose is to assemble a consistent snapshot (and establish the corresponding
> <restart_lsn, confirmed_flush_lsn> pair), but the donor slot must have
> already done that, and we could use its pair as well. Was this considered?

Skipping DecodingContextFindStartpoint when creating the new
destination logical slot seems sensible to me.

I've attached a draft patch fixing this issue, but I'll continue
investigating it more deeply.

Regards,

-- 
Masahiko Sawada            http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services


Re: logical copy_replication_slot issues

From
Arseny Sher
Date:
Masahiko Sawada <masahiko.sawada@2ndquadrant.com> writes:

> I've attached the draft patch fixing this issue but I'll continue
> investigating it more deeply.

There should also be a check that the source slot itself has a consistent
snapshot (valid confirmed_flush) -- otherwise it might be possible to
create an uninitialized slot, which is probably not an error, but is weird
and somewhat meaningless. To be paranoid, this ought to be checked in both
src slot lookups.

With this patch it seems like the only thing
create_logical_replication_slot does is ReplicationSlotCreate, which
calls the usefulness of this function into question. On second look,
CreateInitDecodingContext checks plugin sanity (ensures it exists), so
it's probably fine.


-- cheers, arseny



Re: logical copy_replication_slot issues

From
Masahiko Sawada
Date:
On Mon, 10 Feb 2020 at 23:01, Arseny Sher <a.sher@postgrespro.ru> wrote:
>
>
> Masahiko Sawada <masahiko.sawada@2ndquadrant.com> writes:
>
> > I've attached the draft patch fixing this issue but I'll continue
> > investigating it more deeply.
>
> There should also be a check that the source slot itself has a consistent
> snapshot (valid confirmed_flush) -- otherwise it might be possible to
> create an uninitialized slot, which is probably not an error, but is weird
> and somewhat meaningless. To be paranoid, this ought to be checked in both
> src slot lookups.
>
> With this patch it seems like the only thing
> create_logical_replication_slot does is ReplicationSlotCreate, which
> calls the usefulness of this function into question. On second look,
> CreateInitDecodingContext checks plugin sanity (ensures it exists), so
> it's probably fine.
>

Thank you for reviewing this patch.

I've attached an updated version of the patch that incorporates your
comments. I believe we're going in the right direction for fixing this
bug. I'll register this item for the next commitfest so as not to
forget.

Regards,

-- 
Masahiko Sawada            http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services


Re: logical copy_replication_slot issues

From
Arseny Sher
Date:
Masahiko Sawada <masahiko.sawada@2ndquadrant.com> writes:

> I've attached an updated version of the patch that incorporates your
> comments. I believe we're going in the right direction for fixing this
> bug. I'll register this item for the next commitfest so as not to
> forget.

I've moved the confirmed_flush check to the second lookup out of paranoia
(e.g. the slot could have been recreated and its creation might not have
finished yet) and made some minor stylistic adjustments. It looks good
to me now.

diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 2c9d5de6d9..4a3c7aa0ce 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -121,7 +121,8 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
  */
 static void
 create_logical_replication_slot(char *name, char *plugin,
-                                bool temporary, XLogRecPtr restart_lsn)
+                                bool temporary, XLogRecPtr restart_lsn,
+                                bool find_startpoint)
 {
     LogicalDecodingContext *ctx = NULL;
 
@@ -139,16 +140,18 @@ create_logical_replication_slot(char *name, char *plugin,
                           temporary ? RS_TEMPORARY : RS_EPHEMERAL);
 
     /*
-     * Create logical decoding context, to build the initial snapshot.
+     * Create logical decoding context to find start point or, if we don't
+     * need it, to 1) bump slot's restart_lsn 2) check plugin sanity.
      */
     ctx = CreateInitDecodingContext(plugin, NIL,
-                                    false,    /* do not build snapshot */
+                                    false,    /* do not build data snapshot */
                                     restart_lsn,
                                     logical_read_local_xlog_page, NULL, NULL,
                                     NULL);
 
     /* build initial snapshot, might take a while */
-    DecodingContextFindStartpoint(ctx);
+    if (find_startpoint)
+        DecodingContextFindStartpoint(ctx);
 
     /* don't need the decoding context anymore */
     FreeDecodingContext(ctx);
@@ -179,7 +182,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
     create_logical_replication_slot(NameStr(*name),
                                     NameStr(*plugin),
                                     temporary,
-                                    InvalidXLogRecPtr);
+                                    InvalidXLogRecPtr,
+                                    true);
 
     values[0] = NameGetDatum(&MyReplicationSlot->data.name);
     values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
@@ -683,10 +687,19 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 
     /* Create new slot and acquire it */
     if (logical_slot)
+    {
+        /*
+         * WAL required for building snapshot could be removed as we haven't
+         * reserved WAL yet. So we create a new logical replication slot
+         * without building an initial snapshot.  A reasonable start point for
+         * decoding will be provided by the source slot.
+         */
         create_logical_replication_slot(NameStr(*dst_name),
                                         plugin,
                                         temporary,
-                                        src_restart_lsn);
+                                        src_restart_lsn,
+                                        false);
+    }
     else
         create_physical_replication_slot(NameStr(*dst_name),
                                          true,
@@ -703,6 +716,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
         TransactionId copy_xmin;
         TransactionId copy_catalog_xmin;
         XLogRecPtr    copy_restart_lsn;
+        XLogRecPtr    copy_confirmed_flush;
         bool        copy_islogical;
         char       *copy_name;
 
@@ -714,6 +728,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
         copy_xmin = src->data.xmin;
         copy_catalog_xmin = src->data.catalog_xmin;
         copy_restart_lsn = src->data.restart_lsn;
+        copy_confirmed_flush = src->data.confirmed_flush;
 
         /* for existence check */
         copy_name = pstrdup(NameStr(src->data.name));
@@ -738,6 +753,13 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
                             NameStr(*src_name)),
                      errdetail("The source replication slot was modified incompatibly during the copy operation.")));
 
+        /* The source slot must have a consistent snapshot */
+        if (src_islogical && XLogRecPtrIsInvalid(copy_confirmed_flush))
+            ereport(ERROR,
+                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                     errmsg("cannot copy a logical replication slot that doesn't have confirmed_flush_lsn"),
+                     errhint("Retry when the source replication slot creation is finished.")));
+
         /* Install copied values again */
         SpinLockAcquire(&MyReplicationSlot->mutex);
         MyReplicationSlot->effective_xmin = copy_effective_xmin;
@@ -746,6 +768,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
         MyReplicationSlot->data.xmin = copy_xmin;
         MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
         MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
+        MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
         SpinLockRelease(&MyReplicationSlot->mutex);
 
         ReplicationSlotMarkDirty();


-- cheers, arseny

Re: logical copy_replication_slot issues

From
Arseny Sher
Date:
I wrote:

> It looks good to me now.

After this sat in my head for a while, I remembered that
CreateInitDecodingContext pegs not only the LSN but also xmin, so the
attached makes a minor comment correction.

While taking a look at the nearby code, it seemed weird to me that
GetOldestSafeDecodingTransactionId checks PGXACT->xid, not xmin. I don't
want to investigate this at the moment, though, and it's not for this thread.

Also not for this thread, but I've noticed that
pg_copy_logical_replication_slot doesn't allow changing the plugin name,
which is an omission in my view. It would be useful and trivial to do.


-- cheers, arseny

diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 2c9d5de6d9..da634bef0e 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -121,7 +121,8 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
  */
 static void
 create_logical_replication_slot(char *name, char *plugin,
-                                bool temporary, XLogRecPtr restart_lsn)
+                                bool temporary, XLogRecPtr restart_lsn,
+                                bool find_startpoint)
 {
     LogicalDecodingContext *ctx = NULL;
 
@@ -139,16 +140,18 @@ create_logical_replication_slot(char *name, char *plugin,
                           temporary ? RS_TEMPORARY : RS_EPHEMERAL);
 
     /*
-     * Create logical decoding context, to build the initial snapshot.
+     * Create logical decoding context to find start point or, if we don't
+     * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
      */
     ctx = CreateInitDecodingContext(plugin, NIL,
-                                    false,    /* do not build snapshot */
+                                    false,    /* do not build data snapshot */
                                     restart_lsn,
                                     logical_read_local_xlog_page, NULL, NULL,
                                     NULL);
 
     /* build initial snapshot, might take a while */
-    DecodingContextFindStartpoint(ctx);
+    if (find_startpoint)
+        DecodingContextFindStartpoint(ctx);
 
     /* don't need the decoding context anymore */
     FreeDecodingContext(ctx);
@@ -179,7 +182,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
     create_logical_replication_slot(NameStr(*name),
                                     NameStr(*plugin),
                                     temporary,
-                                    InvalidXLogRecPtr);
+                                    InvalidXLogRecPtr,
+                                    true);
 
     values[0] = NameGetDatum(&MyReplicationSlot->data.name);
     values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
@@ -683,10 +687,19 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 
     /* Create new slot and acquire it */
     if (logical_slot)
+    {
+        /*
+         * WAL required for building snapshot could be removed as we haven't
+         * reserved WAL yet. So we create a new logical replication slot
+         * without building an initial snapshot.  A reasonable start point for
+         * decoding will be provided by the source slot.
+         */
         create_logical_replication_slot(NameStr(*dst_name),
                                         plugin,
                                         temporary,
-                                        src_restart_lsn);
+                                        src_restart_lsn,
+                                        false);
+    }
     else
         create_physical_replication_slot(NameStr(*dst_name),
                                          true,
@@ -703,6 +716,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
         TransactionId copy_xmin;
         TransactionId copy_catalog_xmin;
         XLogRecPtr    copy_restart_lsn;
+        XLogRecPtr    copy_confirmed_flush;
         bool        copy_islogical;
         char       *copy_name;
 
@@ -714,6 +728,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
         copy_xmin = src->data.xmin;
         copy_catalog_xmin = src->data.catalog_xmin;
         copy_restart_lsn = src->data.restart_lsn;
+        copy_confirmed_flush = src->data.confirmed_flush;
 
         /* for existence check */
         copy_name = pstrdup(NameStr(src->data.name));
@@ -738,6 +753,13 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
                             NameStr(*src_name)),
                      errdetail("The source replication slot was modified incompatibly during the copy operation.")));
 
+        /* The source slot must have a consistent snapshot */
+        if (src_islogical && XLogRecPtrIsInvalid(copy_confirmed_flush))
+            ereport(ERROR,
+                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                     errmsg("cannot copy a logical replication slot that doesn't have confirmed_flush_lsn"),
+                     errhint("Retry when the source replication slot creation is finished.")));
+
         /* Install copied values again */
         SpinLockAcquire(&MyReplicationSlot->mutex);
         MyReplicationSlot->effective_xmin = copy_effective_xmin;
@@ -746,6 +768,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
         MyReplicationSlot->data.xmin = copy_xmin;
         MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
         MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
+        MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
         SpinLockRelease(&MyReplicationSlot->mutex);
 
         ReplicationSlotMarkDirty();

Re: logical copy_replication_slot issues

From
Masahiko Sawada
Date:
On Fri, 6 Mar 2020 at 20:02, Arseny Sher <a.sher@postgrespro.ru> wrote:
>
> I wrote:
>
> > It looks good to me now.
>
> After this sat in my head for a while, I remembered that
> CreateInitDecodingContext pegs not only the LSN but also xmin, so the
> attached makes a minor comment correction.
>
> While taking a look at the nearby code, it seemed weird to me that
> GetOldestSafeDecodingTransactionId checks PGXACT->xid, not xmin. I don't
> want to investigate this at the moment, though, and it's not for this thread.
>
> Also not for this thread, but I've noticed that
> pg_copy_logical_replication_slot doesn't allow changing the plugin name,
> which is an omission in my view. It would be useful and trivial to do.
>

Thank you for updating the patch. The patch looks basically good to me
but I have a few questions:

    /*
-    * Create logical decoding context, to build the initial snapshot.
+    * Create logical decoding context to find start point or, if we don't
+    * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
     */

Do we need to number these items, given that nothing refers to the numbers?

    ctx = CreateInitDecodingContext(plugin, NIL,
-                                   false,  /* do not build snapshot */
+                                   false,  /* do not build data snapshot */
                                    restart_lsn,
                                    logical_read_local_xlog_page, NULL, NULL,
                                    NULL);

I'm not sure this change makes the comment better. Could you elaborate
on the motivation of this change?

Regards,

--
Masahiko Sawada            http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: logical copy_replication_slot issues

From
Arseny Sher
Date:
Masahiko Sawada <masahiko.sawada@2ndquadrant.com> writes:

>     /*
> -    * Create logical decoding context, to build the initial snapshot.
> +    * Create logical decoding context to find start point or, if we don't
> +    * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
>      */
>
> Do we need to number these items, given that nothing refers to the numbers?

No, it just seemed clearer to me this way. I don't mind removing the
numbers if you feel this is better.

>     ctx = CreateInitDecodingContext(plugin, NIL,
> -                                   false,  /* do not build snapshot */
> +                                   false,  /* do not build data snapshot */
>                                     restart_lsn,
>                                     logical_read_local_xlog_page, NULL, NULL,
>                                     NULL);
>
> I'm not sure this change makes the comment better. Could you elaborate
> on the motivation of this change?

Well, DecodingContextFindStartpoint always builds a snapshot allowing
historical *catalog* lookups. This bool controls whether the snapshot
should additionally be suitable for looking at the actual data; this is
used, e.g., by the initial data sync in native logical replication.


-- cheers, arseny



Re: logical copy_replication_slot issues

From
Masahiko Sawada
Date:
On Mon, 9 Mar 2020 at 21:46, Arseny Sher <a.sher@postgrespro.ru> wrote:
>
>
> Masahiko Sawada <masahiko.sawada@2ndquadrant.com> writes:
>
> >     /*
> > -    * Create logical decoding context, to build the initial snapshot.
> > +    * Create logical decoding context to find start point or, if we don't
> > +    * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
> >      */
> >
> > Do we need to number these items, given that nothing refers to the numbers?
>
> No, it just seemed clearer to me this way. I don't mind removing the
> numbers if you feel this is better.
>

Okay.

> >     ctx = CreateInitDecodingContext(plugin, NIL,
> > -                                   false,  /* do not build snapshot */
> > +                                   false,  /* do not build data snapshot */
> >                                     restart_lsn,
> >                                     logical_read_local_xlog_page, NULL, NULL,
> >                                     NULL);
> >
> > I'm not sure this change makes the comment better. Could you elaborate
> > on the motivation of this change?
>
> Well, DecodingContextFindStartpoint always builds a snapshot allowing
> historical *catalog* lookups. This bool controls whether the snapshot
> should additionally be suitable for looking at the actual data; this is
> used, e.g., by the initial data sync in native logical replication.

Okay.

Anyway, since the patch looks good to me, I've marked it as
"Ready for Committer". I think we can defer these remaining points to
the committers.

Regards,

-- 
Masahiko Sawada            http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: logical copy_replication_slot issues

From
Alvaro Herrera
Date:
Thanks Arseny and Masahiko, I pushed this patch just now.  I changed
some comments while at it, hopefully they are improvements.

On 2020-Mar-09, Masahiko Sawada wrote:

>     ctx = CreateInitDecodingContext(plugin, NIL,
> -                                   false,  /* do not build snapshot */
> +                                   false,  /* do not build data snapshot */
>                                     restart_lsn,
>                                     logical_read_local_xlog_page, NULL, NULL,
>                                     NULL);
> 
> I'm not sure this change makes the comment better. Could you elaborate
> on the motivation of this change?

I addressed this issue by adding a comment in CreateInitDecodingContext
to explain the parameter, and then reference that comment's terminology
in this call.  I think it ends up clearer overall -- not that this whole
area is at all particularly clear.

Thanks again.

-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: logical copy_replication_slot issues

From
Masahiko Sawada
Date:


On Wed, 18 Mar 2020 at 04:24, Alvaro Herrera <alvherre@2ndquadrant.com> wrote:
>
> Thanks Arseny and Masahiko, I pushed this patch just now.  I changed
> some comments while at it, hopefully they are improvements.
>
> On 2020-Mar-09, Masahiko Sawada wrote:
>
> >     ctx = CreateInitDecodingContext(plugin, NIL,
> > -                                   false,  /* do not build snapshot */
> > +                                   false,  /* do not build data snapshot */
> >                                     restart_lsn,
> >                                     logical_read_local_xlog_page, NULL, NULL,
> >                                     NULL);
> >
> > I'm not sure this change makes the comment better. Could you elaborate
> > on the motivation of this change?
>
> I addressed this issue by adding a comment in CreateInitDecodingContext
> to explain the parameter, and then reference that comment's terminology
> in this call.  I think it ends up clearer overall -- not that this whole
> area is at all particularly clear.
>
> Thanks again.
>

Thank you for committing the patch! The changes look good to me.

Regards,

--
Masahiko Sawada            http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services