Thread: logical copy_replication_slot issues
Hi,

While jumping around partially decoded xacts questions [1], I've read
through the copy replication slots code (9f06d79ef) and found a couple
of issues.

1) It seems quite reckless to me to dive into
DecodingContextFindStartpoint without actual WAL reservation (donor's
slot restart_lsn is used, but it is not acquired). Why risk erroring
out with WAL removal error if the function advances new slot position to
updated donor's one in the end anyway?

2) In the end, restart_lsn of new slot is set to updated donor's
one. However, confirmed_flush field is not updated. This is just wrong
-- we could start decoding too early and stream partially decoded
transaction.

I'd probably avoid doing DecodingContextFindStartpoint at all. Its only
purpose is to assemble consistent snapshot (and establish corresponding
<restart_lsn, confirmed_flush_lsn> pair), but donor slot must have
already done that and we could use it as well. Was this considered?

[1] https://www.postgresql.org/message-id/flat/AB5978B2-1772-4FEE-A245-74C91704ECB0%40amazon.com

--
Arseny Sher
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
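To make issue 2 concrete, here is a minimal toy model in C. It is not PostgreSQL code: it assumes a simplified decoder that reads WAL starting at restart_lsn and skips any transaction whose commit lies before confirmed_flush. The names ToySlot, ToyXact and toy_changes_streamed are hypothetical and exist only for this illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Toy model only: hypothetical types, not PostgreSQL's structures. */
typedef uint64_t Lsn;

typedef struct ToySlot
{
    Lsn restart_lsn;     /* the decoder starts reading WAL here */
    Lsn confirmed_flush; /* xacts committing before this are skipped */
} ToySlot;

typedef struct ToyXact
{
    const Lsn *change_lsns; /* LSN of each change record of the xact */
    size_t     nchanges;
    Lsn        commit_lsn;
} ToyXact;

/*
 * Return how many of the xact's changes the toy decoder would stream, or -1
 * if the xact is skipped entirely.  A result smaller than nchanges means the
 * consumer receives a partially decoded transaction.
 */
static int
toy_changes_streamed(const ToySlot *slot, const ToyXact *xact)
{
    int seen = 0;

    assert(slot != NULL && xact != NULL);

    /* Commits before the confirmed position are never sent. */
    if (xact->commit_lsn < slot->confirmed_flush)
        return -1;

    /* Only changes at or after restart_lsn are ever read from WAL. */
    for (size_t i = 0; i < xact->nchanges; i++)
    {
        if (xact->change_lsns[i] >= slot->restart_lsn)
            seen++;
    }
    return seen;
}
```

In this model, a transaction with changes at LSNs 100, 200 and 300 and a commit at 350 is streamed with only one of its three changes by a slot holding restart_lsn = 250 and a stale confirmed_flush = 150. A slot carrying a matching pair such as (250, 400) skips the transaction instead.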
On Mon, 10 Feb 2020 at 01:29, Arseny Sher <a.sher@postgrespro.ru> wrote:
>
> Hi,
>
> While jumping around partially decoded xacts questions [1], I've read
> through the copy replication slots code (9f06d79ef) and found a couple
> of issues.
>
> 1) It seems quite reckless to me to dive into
> DecodingContextFindStartpoint without actual WAL reservation (donor's
> slot restart_lsn is used, but it is not acquired). Why risk erroring
> out with WAL removal error if the function advances new slot position to
> updated donor's one in the end anyway?

Good catch. It's possible that DecodingContextFindStartpoint could fail
when the restart_lsn of the source slot is advanced and the required WAL
is removed.

>
> 2) In the end, restart_lsn of new slot is set to updated donor's
> one. However, confirmed_flush field is not updated. This is just wrong
> -- we could start decoding too early and stream partially decoded
> transaction.

I think you are right.

>
> I'd probably avoid doing DecodingContextFindStartpoint at all. Its only
> purpose is to assemble consistent snapshot (and establish corresponding
> <restart_lsn, confirmed_flush_lsn> pair), but donor slot must have
> already done that and we could use it as well. Was this considered?

Skipping DecodingContextFindStartpoint while creating a new destination
logical slot seems sensible to me.

I've attached a draft patch fixing this issue, but I'll continue
investigating it more deeply.

Regards,

--
Masahiko Sawada http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
Attachment
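The WAL reservation race discussed above can be sketched with a small toy model in C. This is an illustration under simplifying assumptions, not PostgreSQL code: WAL is assumed to be kept only from the minimum restart_lsn held by any slot, and a slot that has not reserved WAL yet is represented by an invalid (zero) LSN. The names toy_min_required_lsn and toy_wal_available are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Toy model only: hypothetical names, not PostgreSQL's API. */
typedef uint64_t Lsn;
#define TOY_INVALID_LSN ((Lsn) 0)

/*
 * Oldest LSN that any slot still holds.  Slots with an invalid restart_lsn
 * have reserved nothing and are ignored.  Returns TOY_INVALID_LSN when no
 * slot holds any WAL.
 */
static Lsn
toy_min_required_lsn(const Lsn *restart_lsns, size_t nslots)
{
    Lsn min = TOY_INVALID_LSN;

    assert(restart_lsns != NULL || nslots == 0);

    for (size_t i = 0; i < nslots; i++)
    {
        Lsn lsn = restart_lsns[i];

        if (lsn == TOY_INVALID_LSN)
            continue;
        if (min == TOY_INVALID_LSN || lsn < min)
            min = lsn;
    }
    return min;
}

/*
 * In this toy, WAL before min_required may already have been removed, and
 * with no reservation at all nothing is guaranteed to remain.
 */
static bool
toy_wal_available(Lsn read_from, Lsn min_required)
{
    return min_required != TOY_INVALID_LSN && read_from >= min_required;
}
```

If the copy only remembers the donor's restart_lsn (say 100) while the new slot holds nothing, and the donor then advances to 300, reading from 100 is no longer safe in this model. Had the new slot itself held 100 before the donor advanced, the read would still be covered.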
Masahiko Sawada <masahiko.sawada@2ndquadrant.com> writes:

> I've attached the draft patch fixing this issue but I'll continue
> investigating it more deeply.

There also should be a check that the source slot itself has a consistent
snapshot (valid confirmed_flush) -- otherwise it might be possible to
create an uninitialized slot, which is probably not an error, but weird
and somewhat meaningless. To be paranoid, this ought to be checked in both
src slot lookups.

With this patch it seems like the only thing
create_logical_replication_slot does is ReplicationSlotCreate, which
calls the usefulness of this function into question. On second look,
CreateInitDecodingContext checks plugin sanity (ensures it exists), so
it's probably fine.

--
cheers, arseny
On Mon, 10 Feb 2020 at 23:01, Arseny Sher <a.sher@postgrespro.ru> wrote:
>
>
> Masahiko Sawada <masahiko.sawada@2ndquadrant.com> writes:
>
> > I've attached the draft patch fixing this issue but I'll continue
> > investigating it more deeply.
>
> There also should be a check that the source slot itself has a consistent
> snapshot (valid confirmed_flush) -- otherwise it might be possible to
> create an uninitialized slot, which is probably not an error, but weird
> and somewhat meaningless. To be paranoid, this ought to be checked in both
> src slot lookups.
>
> With this patch it seems like the only thing
> create_logical_replication_slot does is ReplicationSlotCreate, which
> calls the usefulness of this function into question. On second look,
> CreateInitDecodingContext checks plugin sanity (ensures it exists), so
> it's probably fine.
>

Thank you for reviewing this patch.

I've attached an updated version of the patch that incorporates your
comments. I believe we're going in the right direction for fixing this
bug. I'll register this item for the next commit fest so as not to
forget.

Regards,

--
Masahiko Sawada http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
Attachment
Masahiko Sawada <masahiko.sawada@2ndquadrant.com> writes:

> I've attached the updated version patch that incorporated your
> comments. I believe we're going in the right direction for fixing this
> bug. I'll register this item to the next commit fest so as not to
> forget.

I've moved the confirmed_flush check to the second lookup out of paranoia
(e.g. the slot could have been recreated and its creation hasn't finished
yet) and made some minor stylistic adjustments. It looks good to me now.

diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 2c9d5de6d9..4a3c7aa0ce 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -121,7 +121,8 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
  */
 static void
 create_logical_replication_slot(char *name, char *plugin,
-                                bool temporary, XLogRecPtr restart_lsn)
+                                bool temporary, XLogRecPtr restart_lsn,
+                                bool find_startpoint)
 {
     LogicalDecodingContext *ctx = NULL;
 
@@ -139,16 +140,18 @@ create_logical_replication_slot(char *name, char *plugin,
                           temporary ? RS_TEMPORARY : RS_EPHEMERAL);
 
     /*
-     * Create logical decoding context, to build the initial snapshot.
+     * Create logical decoding context to find start point or, if we don't
+     * need it, to 1) bump slot's restart_lsn 2) check plugin sanity.
      */
     ctx = CreateInitDecodingContext(plugin, NIL,
-                                    false, /* do not build snapshot */
+                                    false, /* do not build data snapshot */
                                     restart_lsn,
                                     logical_read_local_xlog_page, NULL, NULL,
                                     NULL);
 
     /* build initial snapshot, might take a while */
-    DecodingContextFindStartpoint(ctx);
+    if (find_startpoint)
+        DecodingContextFindStartpoint(ctx);
 
     /* don't need the decoding context anymore */
     FreeDecodingContext(ctx);
@@ -179,7 +182,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
     create_logical_replication_slot(NameStr(*name),
                                     NameStr(*plugin),
                                     temporary,
-                                    InvalidXLogRecPtr);
+                                    InvalidXLogRecPtr,
+                                    true);
 
     values[0] = NameGetDatum(&MyReplicationSlot->data.name);
     values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
@@ -683,10 +687,19 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 
     /* Create new slot and acquire it */
     if (logical_slot)
+    {
+        /*
+         * WAL required for building snapshot could be removed as we haven't
+         * reserved WAL yet. So we create a new logical replication slot
+         * without building an initial snapshot. A reasonable start point for
+         * decoding will be provided by the source slot.
+         */
         create_logical_replication_slot(NameStr(*dst_name),
                                         plugin,
                                         temporary,
-                                        src_restart_lsn);
+                                        src_restart_lsn,
+                                        false);
+    }
     else
         create_physical_replication_slot(NameStr(*dst_name),
                                          true,
@@ -703,6 +716,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
         TransactionId copy_xmin;
         TransactionId copy_catalog_xmin;
         XLogRecPtr  copy_restart_lsn;
+        XLogRecPtr  copy_confirmed_flush;
         bool        copy_islogical;
         char       *copy_name;
 
@@ -714,6 +728,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
         copy_xmin = src->data.xmin;
         copy_catalog_xmin = src->data.catalog_xmin;
         copy_restart_lsn = src->data.restart_lsn;
+        copy_confirmed_flush = src->data.confirmed_flush;
 
         /* for existence check */
         copy_name = pstrdup(NameStr(src->data.name));
@@ -738,6 +753,13 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
                             NameStr(*src_name)),
                      errdetail("The source replication slot was modified incompatibly during the copy operation.")));
 
+        /* The source slot must have a consistent snapshot */
+        if (src_islogical && XLogRecPtrIsInvalid(copy_confirmed_flush))
+            ereport(ERROR,
+                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                     errmsg("cannot copy a logical replication slot that doesn't have confirmed_flush_lsn"),
+                     errhint("Retry when the source replication slot creation is finished.")));
+
         /* Install copied values again */
         SpinLockAcquire(&MyReplicationSlot->mutex);
         MyReplicationSlot->effective_xmin = copy_effective_xmin;
@@ -746,6 +768,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
         MyReplicationSlot->data.xmin = copy_xmin;
         MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
         MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
+        MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
         SpinLockRelease(&MyReplicationSlot->mutex);
 
         ReplicationSlotMarkDirty();

--
cheers, arseny
I wrote:

> It looks good to me now.

After this sat in my head for a while, I remembered that
CreateInitDecodingContext not only pegs the LSN, but also xmin, so the
attached makes a minor comment correction.

While taking a look at the nearby code it seemed weird to me that
GetOldestSafeDecodingTransactionId checks PGXACT->xid, not xmin. Don't
want to investigate this at the moment though, and not for this thread.

Also not for this thread, but I've noticed
pg_copy_logical_replication_slot doesn't allow changing the plugin name,
which is an omission in my view. It would be useful and trivial to do.

--
cheers, arseny

diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 2c9d5de6d9..da634bef0e 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -121,7 +121,8 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
  */
 static void
 create_logical_replication_slot(char *name, char *plugin,
-                                bool temporary, XLogRecPtr restart_lsn)
+                                bool temporary, XLogRecPtr restart_lsn,
+                                bool find_startpoint)
 {
     LogicalDecodingContext *ctx = NULL;
 
@@ -139,16 +140,18 @@ create_logical_replication_slot(char *name, char *plugin,
                           temporary ? RS_TEMPORARY : RS_EPHEMERAL);
 
     /*
-     * Create logical decoding context, to build the initial snapshot.
+     * Create logical decoding context to find start point or, if we don't
+     * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
      */
     ctx = CreateInitDecodingContext(plugin, NIL,
-                                    false, /* do not build snapshot */
+                                    false, /* do not build data snapshot */
                                     restart_lsn,
                                     logical_read_local_xlog_page, NULL, NULL,
                                     NULL);
 
     /* build initial snapshot, might take a while */
-    DecodingContextFindStartpoint(ctx);
+    if (find_startpoint)
+        DecodingContextFindStartpoint(ctx);
 
     /* don't need the decoding context anymore */
     FreeDecodingContext(ctx);
@@ -179,7 +182,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
     create_logical_replication_slot(NameStr(*name),
                                     NameStr(*plugin),
                                     temporary,
-                                    InvalidXLogRecPtr);
+                                    InvalidXLogRecPtr,
+                                    true);
 
     values[0] = NameGetDatum(&MyReplicationSlot->data.name);
     values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
@@ -683,10 +687,19 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 
     /* Create new slot and acquire it */
     if (logical_slot)
+    {
+        /*
+         * WAL required for building snapshot could be removed as we haven't
+         * reserved WAL yet. So we create a new logical replication slot
+         * without building an initial snapshot. A reasonable start point for
+         * decoding will be provided by the source slot.
+         */
         create_logical_replication_slot(NameStr(*dst_name),
                                         plugin,
                                         temporary,
-                                        src_restart_lsn);
+                                        src_restart_lsn,
+                                        false);
+    }
     else
         create_physical_replication_slot(NameStr(*dst_name),
                                          true,
@@ -703,6 +716,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
         TransactionId copy_xmin;
         TransactionId copy_catalog_xmin;
         XLogRecPtr  copy_restart_lsn;
+        XLogRecPtr  copy_confirmed_flush;
         bool        copy_islogical;
         char       *copy_name;
 
@@ -714,6 +728,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
         copy_xmin = src->data.xmin;
         copy_catalog_xmin = src->data.catalog_xmin;
         copy_restart_lsn = src->data.restart_lsn;
+        copy_confirmed_flush = src->data.confirmed_flush;
 
         /* for existence check */
         copy_name = pstrdup(NameStr(src->data.name));
@@ -738,6 +753,13 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
                             NameStr(*src_name)),
                      errdetail("The source replication slot was modified incompatibly during the copy operation.")));
 
+        /* The source slot must have a consistent snapshot */
+        if (src_islogical && XLogRecPtrIsInvalid(copy_confirmed_flush))
+            ereport(ERROR,
+                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                     errmsg("cannot copy a logical replication slot that doesn't have confirmed_flush_lsn"),
+                     errhint("Retry when the source replication slot creation is finished.")));
+
         /* Install copied values again */
         SpinLockAcquire(&MyReplicationSlot->mutex);
         MyReplicationSlot->effective_xmin = copy_effective_xmin;
@@ -746,6 +768,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
         MyReplicationSlot->data.xmin = copy_xmin;
         MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
         MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
+        MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
         SpinLockRelease(&MyReplicationSlot->mutex);
 
         ReplicationSlotMarkDirty();
On Fri, 6 Mar 2020 at 20:02, Arseny Sher <a.sher@postgrespro.ru> wrote:
>
> I wrote:
>
> > It looks good to me now.
>
> After this sat in my head for a while, I remembered that
> CreateInitDecodingContext not only pegs the LSN, but also xmin, so the
> attached makes a minor comment correction.
>
> While taking a look at the nearby code it seemed weird to me that
> GetOldestSafeDecodingTransactionId checks PGXACT->xid, not xmin. Don't
> want to investigate this at the moment though, and not for this thread.
>
> Also not for this thread, but I've noticed
> pg_copy_logical_replication_slot doesn't allow changing the plugin name,
> which is an omission in my view. It would be useful and trivial to do.
>

Thank you for updating the patch. The patch looks basically good to me
but I have a few questions:

 /*
- * Create logical decoding context, to build the initial snapshot.
+ * Create logical decoding context to find start point or, if we don't
+ * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
 */

Do we need to number these even though nothing refers to them?

 ctx = CreateInitDecodingContext(plugin, NIL,
- false, /* do not build snapshot */
+ false, /* do not build data snapshot */
 restart_lsn,
 logical_read_local_xlog_page, NULL, NULL,
 NULL);

I'm not sure this change makes the comment better. Could you elaborate
on the motivation of this change?

Regards,

--
Masahiko Sawada http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
Masahiko Sawada <masahiko.sawada@2ndquadrant.com> writes:

> /*
> - * Create logical decoding context, to build the initial snapshot.
> + * Create logical decoding context to find start point or, if we don't
> + * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
> */
>
> Do we need to number these even though nothing refers to them?

No, it just seemed clearer to me this way. I don't mind removing the
numbers if you feel this is better.

> ctx = CreateInitDecodingContext(plugin, NIL,
> - false, /* do not build snapshot */
> + false, /* do not build data snapshot */
> restart_lsn,
> logical_read_local_xlog_page, NULL, NULL,
> NULL);
> I'm not sure this change makes the comment better. Could you elaborate
> on the motivation of this change?

Well, DecodingContextFindStartpoint always builds a snapshot allowing
historical *catalog* lookups. This bool controls whether the snapshot
should additionally be suitable for looking at the actual data; this is
used e.g. by the initial data sync in native logical replication.

--
cheers, arseny
On Mon, 9 Mar 2020 at 21:46, Arseny Sher <a.sher@postgrespro.ru> wrote:
>
>
> Masahiko Sawada <masahiko.sawada@2ndquadrant.com> writes:
>
> > /*
> > - * Create logical decoding context, to build the initial snapshot.
> > + * Create logical decoding context to find start point or, if we don't
> > + * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
> > */
> >
> > Do we need to number these even though nothing refers to them?
>
> No, it just seemed clearer to me this way. I don't mind removing the
> numbers if you feel this is better.
>

Okay.

> > ctx = CreateInitDecodingContext(plugin, NIL,
> > - false, /* do not build snapshot */
> > + false, /* do not build data snapshot */
> > restart_lsn,
> > logical_read_local_xlog_page, NULL, NULL,
> > NULL);
> > I'm not sure this change makes the comment better. Could you elaborate
> > on the motivation of this change?
>
> Well, DecodingContextFindStartpoint always builds a snapshot allowing
> historical *catalog* lookups. This bool controls whether the snapshot
> should additionally be suitable for looking at the actual data; this is
> used e.g. by the initial data sync in native logical replication.

Okay.

Anyway, since the patch looks good to me I've marked this patch as
"Ready for Committer". I think we can defer these things to committers.

Regards,

--
Masahiko Sawada http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
Thanks Arseny and Masahiko, I pushed this patch just now. I changed
some comments while at it, hopefully they are improvements.

On 2020-Mar-09, Masahiko Sawada wrote:

> ctx = CreateInitDecodingContext(plugin, NIL,
> - false, /* do not build snapshot */
> + false, /* do not build data snapshot */
> restart_lsn,
> logical_read_local_xlog_page, NULL, NULL,
> NULL);
>
> I'm not sure this change makes the comment better. Could you elaborate
> on the motivation of this change?

I addressed this issue by adding a comment in CreateInitDecodingContext
to explain the parameter, and then reference that comment's terminology
in this call. I think it ends up clearer overall -- not that this whole
area is at all particularly clear.

Thanks again.

--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On Wed, 18 Mar 2020 at 04:24, Alvaro Herrera <alvherre@2ndquadrant.com> wrote:
>
> Thanks Arseny and Masahiko, I pushed this patch just now. I changed
> some comments while at it, hopefully they are improvements.
>
> On 2020-Mar-09, Masahiko Sawada wrote:
>
> > ctx = CreateInitDecodingContext(plugin, NIL,
> > - false, /* do not build snapshot */
> > + false, /* do not build data snapshot */
> > restart_lsn,
> > logical_read_local_xlog_page, NULL, NULL,
> > NULL);
> >
> > I'm not sure this change makes the comment better. Could you elaborate
> > on the motivation of this change?
>
> I addressed this issue by adding a comment in CreateInitDecodingContext
> to explain the parameter, and then reference that comment's terminology
> in this call. I think it ends up clearer overall -- not that this whole
> area is at all particularly clear.
>
> Thanks again.
>
Thank you for committing the patch! The changes look good to me.
Regards,
--
Masahiko Sawada http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services