From b72ef318efd7af105fcac6f6fc2acd86a98a7eb5 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Thu, 26 Sep 2024 12:11:34 +0800 Subject: [PATCH v30 2/7] Maintain the replication slot in logical launcher to retain dead tuples This patch enables the logical replication launcher to create and maintain a replication slot named pg_conflict_detection. The launcher periodically collects the oldest_nonremovable_xid from all apply workers. It then computes the minimum transaction ID and advances the xmin value of the replication slot if it precedes the computed value. The interval for updating the slot (nap time) is dynamically adjusted based on the activity of the apply workers. The launcher waits for a certain period before performing the next update, with the duration varying depending on whether the xmin value of the replication slot was updated during the last cycle. --- doc/src/sgml/config.sgml | 2 + doc/src/sgml/func.sgml | 14 +- doc/src/sgml/protocol.sgml | 2 + doc/src/sgml/ref/create_subscription.sgml | 4 +- src/backend/access/transam/xlogrecovery.c | 2 +- src/backend/commands/subscriptioncmds.c | 2 +- src/backend/replication/logical/launcher.c | 180 +++++++++++++++++- .../replication/logical/reorderbuffer.c | 2 +- src/backend/replication/logical/worker.c | 3 + src/backend/replication/slot.c | 34 +++- src/include/replication/logicallauncher.h | 1 + src/include/replication/slot.h | 11 +- 12 files changed, 245 insertions(+), 12 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index c1674c22cb2..58ce478c493 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4945,6 +4945,8 @@ ANY num_sync ( The name of the slot to create. Must be a valid replication slot name (see ). + The name cannot be pg_conflict_detection, as it + is reserved for logical replication conflict detection. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 57dec28a5df..eec85cde880 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -169,7 +169,9 @@ CREATE SUBSCRIPTION subscription_name Name of the publisher's replication slot to use. The default is - to use the name of the subscription for the slot name. + to use the name of the subscription for the slot name. The name cannot + be pg_conflict_detection, as it is reserved for + logical replication conflict detection. diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 6ce979f2d8b..2dcda37bc77 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -4760,7 +4760,7 @@ bool check_primary_slot_name(char **newval, void **extra, GucSource source) { if (*newval && strcmp(*newval, "") != 0 && - !ReplicationSlotValidateName(*newval, WARNING)) + !ReplicationSlotValidateName(*newval, false, WARNING)) return false; return true; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 4aec73bcc6b..46d4e65da97 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -210,7 +210,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, if (strcmp(opts->slot_name, "none") == 0) opts->slot_name = NULL; else - ReplicationSlotValidateName(opts->slot_name, ERROR); + ReplicationSlotValidateName(opts->slot_name, false, ERROR); } else if (IsSet(supported_opts, SUBOPT_COPY_DATA) && strcmp(defel->defname, "copy_data") == 0) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 8e42787a426..8cef4460848 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -32,6 +32,7 @@ #include "postmaster/interrupt.h" #include "replication/logicallauncher.h" #include "replication/origin.h" +#include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/worker_internal.h" #include "storage/ipc.h" @@ -91,7 +92,6 @@ static dshash_table *last_start_times = NULL; static bool on_commit_launcher_wakeup = false; -static void ApplyLauncherWakeup(void); static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_worker_onexit(int code, Datum arg); static void logicalrep_worker_detach(void); @@ -100,6 +100,9 @@ static int logicalrep_pa_worker_count(Oid subid); static void logicalrep_launcher_attach_dshmem(void); static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); +static void create_conflict_slot_if_not_exists(void); +static void advance_conflict_slot_xmin(FullTransactionId new_xmin); +static void drop_conflict_slot_if_exists(void); /* @@ -1106,7 +1109,10 @@ ApplyLauncherWakeupAtCommit(void) on_commit_launcher_wakeup = true; } -static void +/* + * Wakeup the launcher immediately. + */ +void ApplyLauncherWakeup(void) { if (LogicalRepCtx->launcher_pid != 0) @@ -1119,6 +1125,8 @@ ApplyLauncherWakeup(void) void ApplyLauncherMain(Datum main_arg) { + bool slot_maybe_exist = true; + ereport(DEBUG1, (errmsg_internal("logical replication launcher started"))); @@ -1147,6 +1155,8 @@ ApplyLauncherMain(Datum main_arg) MemoryContext subctx; MemoryContext oldctx; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; + bool can_advance_xmin = true; + FullTransactionId xmin = InvalidFullTransactionId; CHECK_FOR_INTERRUPTS(); @@ -1166,15 +1176,56 @@ ApplyLauncherMain(Datum main_arg) TimestampTz now; long elapsed; + /* + * Create the conflict slot before starting the worker to prevent + * it from unnecessarily maintaining its oldest_nonremovable_xid. + */ + create_conflict_slot_if_not_exists(); + if (!sub->enabled) + { + can_advance_xmin = false; continue; + } LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); w = logicalrep_worker_find(sub->oid, InvalidOid, false); LWLockRelease(LogicalRepWorkerLock); if (w != NULL) + { + /* + * Collect non-removable transaction IDs from all apply + * workers to determine the xmin for advancing the replication + * slot used in conflict detection. + */ + if (can_advance_xmin) + { + FullTransactionId nonremovable_xid; + + SpinLockAcquire(&w->relmutex); + nonremovable_xid = w->oldest_nonremovable_xid; + SpinLockRelease(&w->relmutex); + + /* + * Stop advancing xmin if an invalid non-removable + * transaction ID is found, otherwise update xmin. + */ + if (!FullTransactionIdIsValid(nonremovable_xid)) + can_advance_xmin = false; + else if (!FullTransactionIdIsValid(xmin) || + FullTransactionIdPrecedes(nonremovable_xid, xmin)) + xmin = nonremovable_xid; + } + continue; /* worker is running already */ + } + + /* + * The worker has not yet started, so there is no valid + * non-removable transaction ID available for advancement. + */ + can_advance_xmin = false; /* * If the worker is eligible to start now, launch it. Otherwise, @@ -1207,6 +1258,27 @@ ApplyLauncherMain(Datum main_arg) } } + /* + * Maintain the xmin value of the replication slot for conflict + * detection if needed. + */ + if (sublist) + { + if (can_advance_xmin) + advance_conflict_slot_xmin(xmin); + + slot_maybe_exist = true; + } + + /* + * Drop the slot if we're no longer retaining dead tuples. + */ + else if (slot_maybe_exist) + { + drop_conflict_slot_if_exists(); + slot_maybe_exist = false; + } + /* Switch back to original memory context. */ MemoryContextSwitchTo(oldctx); /* Clean the temporary memory. */ @@ -1234,6 +1306,110 @@ ApplyLauncherMain(Datum main_arg) /* Not reachable */ } +/* + * Create and acquire the replication slot used to retain dead tuples for + * conflict detection, if not yet. + */ +static void +create_conflict_slot_if_not_exists(void) +{ + TransactionId xmin_horizon; + + /* Exit early if the replication slot is already created and acquired */ + if (MyReplicationSlot) + return; + + /* If the replication slot exists, acquire it and exit */ + if (SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true)) + { + ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false); + return; + } + + ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, + RS_PERSISTENT, false, false, false); + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + + xmin_horizon = GetOldestSafeDecodingTransactionId(false); + + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->effective_xmin = xmin_horizon; + MyReplicationSlot->data.xmin = xmin_horizon; + SpinLockRelease(&MyReplicationSlot->mutex); + + ReplicationSlotsComputeRequiredXmin(true); + + LWLockRelease(ProcArrayLock); + + /* Write this slot to disk */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); +} + +/* + * Attempt to advance the xmin value of the replication slot used to retain + * dead tuples for conflict detection. + */ +static void +advance_conflict_slot_xmin(FullTransactionId new_xmin) +{ + FullTransactionId full_xmin; + FullTransactionId next_full_xid; + + Assert(MyReplicationSlot); + Assert(FullTransactionIdIsValid(new_xmin)); + + next_full_xid = ReadNextFullTransactionId(); + + /* + * Compute FullTransactionId for the current xmin. This handles the case + * where transaction ID wraparound has occurred. + */ + full_xmin = FullTransactionIdFromAllowableAt(next_full_xid, + MyReplicationSlot->data.xmin); + + if (FullTransactionIdPrecedesOrEquals(new_xmin, full_xmin)) + return; + + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->data.xmin = XidFromFullTransactionId(new_xmin); + SpinLockRelease(&MyReplicationSlot->mutex); + + /* first write new xmin to disk, so we know what's up after a crash */ + + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin); + + /* + * Now the new xmin is safely on disk, we can let the global value + * advance. We do not take ProcArrayLock or similar since we only advance + * xmin here and there's not much harm done by a concurrent computation + * missing that. + */ + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->effective_xmin = MyReplicationSlot->data.xmin; + SpinLockRelease(&MyReplicationSlot->mutex); + + ReplicationSlotsComputeRequiredXmin(false); + + return; +} + +/* + * Drop the replication slot used to retain dead tuples for conflict detection, + * if it exists. + */ +static void +drop_conflict_slot_if_exists(void) +{ + if (MyReplicationSlot) + ReplicationSlotDropAcquired(); + else if (SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true)) + ReplicationSlotDrop(CONFLICT_DETECTION_SLOT, true); +} + /* * Is current process the logical replication launcher? */ diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 67655111875..85239f6c316 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -4787,7 +4787,7 @@ StartupReorderBuffer(void) continue; /* if it cannot be a slot, skip the directory */ - if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2)) + if (!ReplicationSlotValidateName(logical_de->d_name, true, DEBUG2)) continue; /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ffbd4e3a02c..4922104b018 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4357,6 +4357,9 @@ wait_for_local_flush(RetainConflictInfoData *data) LSN_FORMAT_ARGS(data->remote_lsn), XidFromFullTransactionId(data->candidate_xid)); + /* Notify launcher to update the xmin of the conflict slot */ + ApplyLauncherWakeup(); + /* * Reset all data fields except those used to determine the timing for the * next round of transaction ID advancement. diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 600b87fa9cb..668279cb4e8 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -47,6 +47,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "postmaster/interrupt.h" +#include "replication/logicallauncher.h" #include "replication/slotsync.h" #include "replication/slot.h" #include "replication/walsender_private.h" @@ -172,6 +173,7 @@ static SyncStandbySlotsConfigData *synchronized_standby_slots_config; static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr; static void ReplicationSlotShmemExit(int code, Datum arg); +static bool IsReservedSlotName(const char *name); static void ReplicationSlotDropPtr(ReplicationSlot *slot); /* internal persistency functions */ @@ -258,13 +260,17 @@ ReplicationSlotShmemExit(int code, Datum arg) /* * Check whether the passed slot name is valid and report errors at elevel. * + * An error will be reported for a reserved replication slot name if + * allow_reserved_name is set to false. + * * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow * the name to be used as a directory name on every supported OS. * * Returns whether the directory name is valid or not if elevel < ERROR. */ bool -ReplicationSlotValidateName(const char *name, int elevel) +ReplicationSlotValidateName(const char *name, bool allow_reserved_name, + int elevel) { const char *cp; @@ -300,9 +306,29 @@ ReplicationSlotValidateName(const char *name, int elevel) return false; } } + + if (!allow_reserved_name && IsReservedSlotName(name)) + { + ereport(elevel, + errcode(ERRCODE_RESERVED_NAME), + errmsg("replication slot name \"%s\" is reserved", + name)); + + return false; + } + return true; } +/* + * Return true if the replication slot name is "pg_conflict_detection". + */ +static bool +IsReservedSlotName(const char *name) +{ + return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0); +} + /* * Create a new replication slot and mark it as used by this backend. * @@ -330,7 +356,11 @@ ReplicationSlotCreate(const char *name, bool db_specific, Assert(MyReplicationSlot == NULL); - ReplicationSlotValidateName(name, ERROR); + /* + * The logical launcher might be creating an internal slot, so using a + * reserved name is allowed in this case. + */ + ReplicationSlotValidateName(name, IsLogicalLauncher(), ERROR); if (failover) { diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index 82b202f3305..7b29f1814db 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -25,6 +25,7 @@ extern void ApplyLauncherShmemInit(void); extern void ApplyLauncherForgetWorkerStartTime(Oid subid); extern void ApplyLauncherWakeupAtCommit(void); +extern void ApplyLauncherWakeup(void); extern void AtEOXact_ApplyLauncher(bool isCommit); extern bool IsLogicalLauncher(void); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index eb0b93b1114..e03e123a2ff 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -20,6 +20,13 @@ /* directory to store replication slot data in */ #define PG_REPLSLOT_DIR "pg_replslot" +/* + * The reserved name for a replication slot used to retain dead tuples for + * conflict detection in logical replication. See + * maybe_advance_nonremovable_xid() for detail. + */ +#define CONFLICT_DETECTION_SLOT "pg_conflict_detection" + /* * Behaviour of replication slots, upon release or crash. * @@ -284,7 +291,9 @@ extern void ReplicationSlotMarkDirty(void); /* misc stuff */ extern void ReplicationSlotInitialize(void); -extern bool ReplicationSlotValidateName(const char *name, int elevel); +extern bool ReplicationSlotValidateName(const char *name, + bool allow_reserved_name, + int elevel); extern void ReplicationSlotReserveWal(void); extern void ReplicationSlotsComputeRequiredXmin(bool already_locked); extern void ReplicationSlotsComputeRequiredLSN(void); -- 2.30.0.windows.2