Re: logical copy_replication_slot issues - Mailing list pgsql-hackers
From | Arseny Sher |
---|---|
Subject | Re: logical copy_replication_slot issues |
Date | |
Msg-id | 87imjh7nky.fsf@ars-thinkpad Whole thread Raw |
In response to | Re: logical copy_replication_slot issues (Arseny Sher <a.sher@postgrespro.ru>) |
Responses |
Re: logical copy_replication_slot issues
|
List | pgsql-hackers |
I wrote: > It looks good to me now. After lying for some time in my head it reminded me that CreateInitDecodingContext not only pegs the LSN, but also xmin, so attached makes a minor comment correction. While taking a look at the nearby code it seemed weird to me that GetOldestSafeDecodingTransactionId checks PGXACT->xid, not xmin. Don't want to investigate this at the moment though, and not for this thread. Also not for this thread, but I've noticed pg_copy_logical_replication_slot doesn't allow to change plugin name which is an omission in my view. It would be useful and trivial to do. -- cheers, arseny diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 2c9d5de6d9..da634bef0e 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -121,7 +121,8 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) */ static void create_logical_replication_slot(char *name, char *plugin, - bool temporary, XLogRecPtr restart_lsn) + bool temporary, XLogRecPtr restart_lsn, + bool find_startpoint) { LogicalDecodingContext *ctx = NULL; @@ -139,16 +140,18 @@ create_logical_replication_slot(char *name, char *plugin, temporary ? RS_TEMPORARY : RS_EPHEMERAL); /* - * Create logical decoding context, to build the initial snapshot. + * Create logical decoding context to find start point or, if we don't + * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity. */ ctx = CreateInitDecodingContext(plugin, NIL, - false, /* do not build snapshot */ + false, /* do not build data snapshot */ restart_lsn, logical_read_local_xlog_page, NULL, NULL, NULL); /* build initial snapshot, might take a while */ - DecodingContextFindStartpoint(ctx); + if (find_startpoint) + DecodingContextFindStartpoint(ctx); /* don't need the decoding context anymore */ FreeDecodingContext(ctx); @@ -179,7 +182,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) create_logical_replication_slot(NameStr(*name), NameStr(*plugin), temporary, - InvalidXLogRecPtr); + InvalidXLogRecPtr, + true); values[0] = NameGetDatum(&MyReplicationSlot->data.name); values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush); @@ -683,10 +687,19 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) /* Create new slot and acquire it */ if (logical_slot) + { + /* + * WAL required for building snapshot could be removed as we haven't + * reserved WAL yet. So we create a new logical replication slot + * without building an initial snapshot. A reasonable start point for + * decoding will be provided by the source slot. + */ create_logical_replication_slot(NameStr(*dst_name), plugin, temporary, - src_restart_lsn); + src_restart_lsn, + false); + } else create_physical_replication_slot(NameStr(*dst_name), true, @@ -703,6 +716,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) TransactionId copy_xmin; TransactionId copy_catalog_xmin; XLogRecPtr copy_restart_lsn; + XLogRecPtr copy_confirmed_flush; bool copy_islogical; char *copy_name; @@ -714,6 +728,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) copy_xmin = src->data.xmin; copy_catalog_xmin = src->data.catalog_xmin; copy_restart_lsn = src->data.restart_lsn; + copy_confirmed_flush = src->data.confirmed_flush; /* for existence check */ copy_name = pstrdup(NameStr(src->data.name)); @@ -738,6 +753,13 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) NameStr(*src_name)), errdetail("The source replication slot was modified incompatibly during the copy operation."))); + /* The source slot must have a consistent snapshot */ + if (src_islogical && XLogRecPtrIsInvalid(copy_confirmed_flush)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot copy a logical replication slot that doesn't have confirmed_flush_lsn"), + errhint("Retry when the source replication slot creation is finished."))); + /* Install copied values again */ SpinLockAcquire(&MyReplicationSlot->mutex); MyReplicationSlot->effective_xmin = copy_effective_xmin; @@ -746,6 +768,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) MyReplicationSlot->data.xmin = copy_xmin; MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin; MyReplicationSlot->data.restart_lsn = copy_restart_lsn; + MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush; SpinLockRelease(&MyReplicationSlot->mutex); ReplicationSlotMarkDirty();
pgsql-hackers by date: