From 2e9b2d343f69c9d80d3a37249283b8ba9632b1d5 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Tue, 12 Aug 2025 23:02:37 -0400 Subject: [PATCH v6] Improve initial slot synchronization in pg_sync_replication_slots() During initial slot synchronization on a standby, the operation may fail if required catalog rows or WALs have been removed or are at risk of removal. The slotsync worker handles this by creating a temporary slot for initial sync and retain it even in case of failure. It will keep retrying until the slot on the primary has been advanced to a position where all the required data are also available on the standby. However, pg_sync_replication_slots() had no such protection mechanism. The SQL API would fail immediately if synchronization requirements weren't met. This could lead to permanent failure as the standby might continue removing the still-required data. To address this, we now make pg_sync_replication_slots() wait for the primary slot to advance to a suitable position before completing synchronization and before removing the temporary slot. Once the slot advances to a suitable position, we retry synchronization. Additionally, if a promotion occurs on the standby during this wait, the process exits gracefully and the temporary slot is removed. --- doc/src/sgml/func/func-admin.sgml | 4 +- doc/src/sgml/logicaldecoding.sgml | 40 +-- src/backend/replication/logical/slotsync.c | 334 ++++++++++++++++++------ src/backend/utils/activity/wait_event_names.txt | 2 +- 4 files changed, 262 insertions(+), 118 deletions(-) diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml index 446fdfe..3608610 100644 --- a/doc/src/sgml/func/func-admin.sgml +++ b/doc/src/sgml/func/func-admin.sgml @@ -1478,9 +1478,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset standby server. Temporary synced slots, if any, cannot be used for logical decoding and must be dropped after promotion. See for details. - Note that this function is primarily intended for testing and - debugging purposes and should be used with caution. Additionally, - this function cannot be executed if + Note that this function cannot be executed if sync_replication_slots is enabled and the slotsync worker is already running to perform the synchronization of slots. diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 77c720c..6e4251a 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -364,18 +364,23 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU Replication Slot Synchronization - The logical replication slots on the primary can be synchronized to - the hot standby by using the failover parameter of + The logical replication slots on the primary can be enabled for + synchronization to the hot standby by using the + failover parameter of pg_create_logical_replication_slot, or by using the failover option of - CREATE SUBSCRIPTION during slot creation. - Additionally, enabling - sync_replication_slots on the standby - is required. By enabling - sync_replication_slots - on the standby, the failover slots can be synchronized periodically in + CREATE SUBSCRIPTION during slot creation. After that, + synchronization can be be performed either manually by calling + + pg_sync_replication_slots + on the standby, or automatically by enabling + + sync_replication_slots on the standby. + When + sync_replication_slots is enabled + on the standby, the failover slots are periodically synchronized by the slotsync worker. For the synchronization to work, it is mandatory to have a physical replication slot between the primary and the standby (i.e., primary_slot_name @@ -398,25 +403,6 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU receiving the WAL up to the latest flushed position on the primary server. - - - While enabling - sync_replication_slots allows for automatic - periodic synchronization of failover slots, they can also be manually - synchronized using the - pg_sync_replication_slots function on the standby. - However, this function is primarily intended for testing and debugging and - should be used with caution. Unlike automatic synchronization, it does not - include cyclic retries, making it more prone to synchronization failures, - particularly during initial sync scenarios where the required WAL files - or catalog rows for the slot might have already been removed or are at risk - of being removed on the standby. In contrast, automatic synchronization - via sync_replication_slots provides continuous slot - updates, enabling seamless failover and supporting high availability. - Therefore, it is the recommended method for synchronizing slots. - - - When slot synchronization is configured as recommended, and the initial synchronization is performed either automatically or diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 3773844..09833aa 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -99,6 +99,8 @@ typedef struct SlotSyncCtxStruct bool syncing; time_t last_start_time; slock_t mutex; + /* used by pg_sync_replication_slots() API only */ + bool slot_not_persisted; } SlotSyncCtxStruct; static SlotSyncCtxStruct *SlotSyncCtx = NULL; @@ -113,6 +115,7 @@ bool sync_replication_slots = false; */ #define MIN_SLOTSYNC_WORKER_NAPTIME_MS 200 #define MAX_SLOTSYNC_WORKER_NAPTIME_MS 30000 /* 30s */ +#define SLOTSYNC_API_NAPTIME_MS 2000 /* 2s */ static long sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS; @@ -146,6 +149,7 @@ typedef struct RemoteSlot ReplicationSlotInvalidationCause invalidated; } RemoteSlot; +static void ProcessSlotSyncInterrupts(WalReceiverConn *wrconn); static void slotsync_failure_callback(int code, Datum arg); static void update_synced_slots_inactive_since(void); @@ -211,13 +215,13 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, * impact the users, so we used DEBUG1 level to log the message. */ ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1, - errmsg("could not synchronize replication slot \"%s\"", - remote_slot->name), - errdetail("Synchronization could lead to data loss, because the remote slot needs WAL at LSN %X/%08X and catalog xmin %u, but the standby has LSN %X/%08X and catalog xmin %u.", - LSN_FORMAT_ARGS(remote_slot->restart_lsn), - remote_slot->catalog_xmin, - LSN_FORMAT_ARGS(slot->data.restart_lsn), - slot->data.catalog_xmin)); + errmsg("Replication slot \"%s\" is not sync ready; will keep retrying", + remote_slot->name), + errdetail("Attempting Synchronization could lead to data loss, because the remote slot needs WAL at LSN %X/%08X and catalog xmin %u, but the standby has LSN %X/%08X and catalog xmin %u.", + LSN_FORMAT_ARGS(remote_slot->restart_lsn), + remote_slot->catalog_xmin, + LSN_FORMAT_ARGS(slot->data.restart_lsn), + slot->data.catalog_xmin)); if (remote_slot_precedes) *remote_slot_precedes = true; @@ -558,7 +562,8 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn) * false. */ static bool -update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) +update_and_persist_local_synced_slot(WalReceiverConn * wrconn, + RemoteSlot * remote_slot, Oid remote_dbid) { ReplicationSlot *slot = MyReplicationSlot; bool found_consistent_snapshot = false; @@ -575,13 +580,18 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) if (remote_slot_precedes) { /* - * The remote slot didn't catch up to locally reserved position. + * The remote slot didn't catch up to locally reserved + * position. * - * We do not drop the slot because the restart_lsn can be ahead of the - * current location when recreating the slot in the next cycle. It may - * take more time to create such a slot. Therefore, we keep this slot - * and attempt the synchronization in the next cycle. + * We do not drop the slot because the restart_lsn can be + * ahead of the current location when recreating the slot in + * the next cycle. It may take more time to create such a + * slot. Therefore, we keep this slot and attempt the + * synchronization in the next cycle. Update flag, so that + * API logic can retry. */ + SlotSyncCtx->slot_not_persisted = true; + return false; } @@ -596,11 +606,17 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) errdetail("Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.", LSN_FORMAT_ARGS(slot->data.restart_lsn))); + /* update flag, so that we retry */ + SlotSyncCtx->slot_not_persisted = true; + return false; } ReplicationSlotPersist(); + /* slot has been persisted, no need to retry */ + SlotSyncCtx->slot_not_persisted = false; + ereport(LOG, errmsg("newly created replication slot \"%s\" is sync-ready now", remote_slot->name)); @@ -622,7 +638,8 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) * Returns TRUE if the local slot is updated. */ static bool -synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) +synchronize_one_slot(WalReceiverConn * wrconn, RemoteSlot * remote_slot, + Oid remote_dbid) { ReplicationSlot *slot; XLogRecPtr latestFlushPtr; @@ -715,7 +732,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) /* Slot not ready yet, let's attempt to make it sync-ready now. */ if (slot->data.persistency == RS_TEMPORARY) { - slot_updated = update_and_persist_local_synced_slot(remote_slot, + slot_updated = update_and_persist_local_synced_slot(wrconn, + remote_slot, remote_dbid); } @@ -785,7 +803,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) ReplicationSlotsComputeRequiredXmin(true); LWLockRelease(ProcArrayLock); - update_and_persist_local_synced_slot(remote_slot, remote_dbid); + update_and_persist_local_synced_slot(wrconn, remote_slot, remote_dbid); slot_updated = true; } @@ -796,46 +814,87 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) } /* - * Synchronize slots. + * Fetch or refresh remote slots. * - * Gets the failover logical slots info from the primary server and updates - * the slots locally. Creates the slots if not present on the standby. + * If remote_slot_list is NIL, fetches all failover logical slots from the + * primary server. If remote_slot_list is provided, refreshes only those + * specific slots with current values from the primary server. * - * Returns TRUE if any of the slots gets updated in this sync-cycle. + * NOTE: Caller must ensure a transaction is active before calling this + * function. + * + * Parameters: + * wrconn - Connection to the primary server + * remote_slot_list - List of RemoteSlot structures to refresh, or NIL to + * fetch all failover slots + * + * Returns a list of RemoteSlot structures. If refreshing and the query fails, + * returns the original list. Slots that no longer exist on the primary will + * be removed from the list. */ -static bool -synchronize_slots(WalReceiverConn *wrconn) +static List * +fetch_or_refresh_remote_slots(WalReceiverConn *wrconn, List *remote_slot_list) { #define SLOTSYNC_COLUMN_COUNT 10 Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID, LSNOID, XIDOID, BOOLOID, LSNOID, BOOLOID, TEXTOID, TEXTOID}; - WalRcvExecResult *res; TupleTableSlot *tupslot; - List *remote_slot_list = NIL; - bool some_slot_updated = false; - bool started_tx = false; - const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn," - " restart_lsn, catalog_xmin, two_phase, two_phase_at, failover," - " database, invalidation_reason" - " FROM pg_catalog.pg_replication_slots" - " WHERE failover and NOT temporary"; - - /* The syscache access in walrcv_exec() needs a transaction env. */ - if (!IsTransactionState()) + List *updated_slot_list = NIL; + StringInfoData query; + ListCell *lc; + bool is_refresh = (remote_slot_list != NIL); + bool first_slot = true; + + /* Build the query based on whether we're fetching all or refreshing specific slots */ + initStringInfo(&query); + appendStringInfoString(&query, + "SELECT slot_name, plugin, confirmed_flush_lsn," + " restart_lsn, catalog_xmin, two_phase, two_phase_at, failover," + " database, invalidation_reason" + " FROM pg_catalog.pg_replication_slots" + " WHERE failover and NOT temporary"); + + if (is_refresh) { - StartTransactionCommand(); - started_tx = true; + /* Add IN clause for specific slot names */ + appendStringInfoString(&query, " AND slot_name IN ("); + + foreach(lc, remote_slot_list) + { + RemoteSlot *remote_slot = (RemoteSlot *) lfirst(lc); + + if (!first_slot) + appendStringInfoString(&query, ", "); + + appendStringInfo(&query, "'%s'", remote_slot->name); + first_slot = false; + } + appendStringInfoString(&query, ")"); } /* Execute the query */ - res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow); + res = walrcv_exec(wrconn, query.data, SLOTSYNC_COLUMN_COUNT, slotRow); if (res->status != WALRCV_OK_TUPLES) - ereport(ERROR, - errmsg("could not fetch failover logical slots info from the primary server: %s", - res->err)); + { + if (is_refresh) + { + ereport(WARNING, + errmsg("could not fetch updated failover logical slots info" + " from the primary server: %s", + res->err)); + pfree(query.data); + return remote_slot_list; /* Return original list on refresh failure */ + } + else + { + ereport(ERROR, + errmsg("could not fetch failover logical slots info from the primary server: %s", + res->err)); + } + } - /* Construct the remote_slot tuple and synchronize each slot locally */ + /* Process the slot information */ tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot)) { @@ -853,8 +912,8 @@ synchronize_slots(WalReceiverConn *wrconn) Assert(!isnull); /* - * It is possible to get null values for LSN and Xmin if slot is - * invalidated on the primary server, so handle accordingly. + * Handle possible null values for LSN and Xmin if slot is + * invalidated on the primary server. */ d = slot_getattr(tupslot, ++col, &isnull); remote_slot->confirmed_lsn = isnull ? InvalidXLogRecPtr : @@ -890,15 +949,8 @@ synchronize_slots(WalReceiverConn *wrconn) Assert(col == SLOTSYNC_COLUMN_COUNT); /* - * If restart_lsn, confirmed_lsn or catalog_xmin is invalid but the - * slot is valid, that means we have fetched the remote_slot in its - * RS_EPHEMERAL state. In such a case, don't sync it; we can always - * sync it in the next sync cycle when the remote_slot is persisted - * and has valid lsn(s) and xmin values. - * - * XXX: In future, if we plan to expose 'slot->data.persistency' in - * pg_replication_slots view, then we can avoid fetching RS_EPHEMERAL - * slots in the first place. + * Apply ephemeral slot filtering. Skip slots that are in RS_EPHEMERAL + * state (invalid LSNs/xmin but not explicitly invalidated). */ if ((XLogRecPtrIsInvalid(remote_slot->restart_lsn) || XLogRecPtrIsInvalid(remote_slot->confirmed_lsn) || @@ -906,12 +958,42 @@ synchronize_slots(WalReceiverConn *wrconn) remote_slot->invalidated == RS_INVAL_NONE) pfree(remote_slot); else - /* Create list of remote slots */ - remote_slot_list = lappend(remote_slot_list, remote_slot); + /* Add to updated list */ + updated_slot_list = lappend(updated_slot_list, remote_slot); ExecClearTuple(tupslot); } + walrcv_clear_result(res); + pfree(query.data); + + /* If refreshing, free the original list structures */ + if (is_refresh) + { + foreach(lc, remote_slot_list) + { + RemoteSlot *old_slot = (RemoteSlot *) lfirst(lc); + pfree(old_slot); + } + list_free(remote_slot_list); + } + + return updated_slot_list; +} + +/* + * Synchronize slots. + * + * Takes a list of remote slots and synchronizes them locally. Creates the + * slots if not present on the standby and updates existing ones. + * + * Returns TRUE if any of the slots gets updated in this sync-cycle. + */ +static bool +synchronize_slots(WalReceiverConn *wrconn, List *remote_slot_list) +{ + bool some_slot_updated = false; + /* Drop local slots that no longer need to be synced. */ drop_local_obsolete_slots(remote_slot_list); @@ -927,19 +1009,12 @@ synchronize_slots(WalReceiverConn *wrconn) */ LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock); - some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid); + some_slot_updated |= synchronize_one_slot(wrconn, remote_slot, + remote_dbid); UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock); } - /* We are done, free remote_slot_list elements */ - list_free_deep(remote_slot_list); - - walrcv_clear_result(res); - - if (started_tx) - CommitTransactionCommand(); - return some_slot_updated; } @@ -1131,7 +1206,7 @@ slotsync_reread_config(void) bool conninfo_changed; bool primary_slotname_changed; - Assert(sync_replication_slots); + Assert(!AmLogicalSlotSyncWorkerProcess() || sync_replication_slots); ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); @@ -1252,31 +1327,38 @@ slotsync_worker_onexit(int code, Datum arg) * sync-cycles is reset to the minimum (200ms). */ static void -wait_for_slot_activity(bool some_slot_updated) +wait_for_slot_activity(bool some_slot_updated, bool called_from_api) { - int rc; + int rc; + int wait_time; - if (!some_slot_updated) - { + if (called_from_api) { /* - * No slots were updated, so double the sleep time, but not beyond the - * maximum allowable value. + * When called from pg_sync_replication_slots, use a fixed 2 + * second wait time. */ - sleep_ms = Min(sleep_ms * 2, MAX_SLOTSYNC_WORKER_NAPTIME_MS); - } - else - { - /* - * Some slots were updated since the last sleep, so reset the sleep - * time. - */ - sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS; + wait_time = SLOTSYNC_API_NAPTIME_MS; + } else { + if (!some_slot_updated) { + /* + * No slots were updated, so double the sleep time, + * but not beyond the maximum allowable value. + */ + sleep_ms = Min(sleep_ms * 2, MAX_SLOTSYNC_WORKER_NAPTIME_MS); + } else { + /* + * Some slots were updated since the last sleep, so + * reset the sleep time. + */ + sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS; + } + wait_time = sleep_ms; } rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, - sleep_ms, - WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN); + wait_time, + WAIT_EVENT_REPLICATION_SLOTSYNC_PRIMARY_CATCHUP); if (rc & WL_LATCH_SET) ResetLatch(MyLatch); @@ -1505,12 +1587,28 @@ ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len) for (;;) { bool some_slot_updated = false; + List *remote_slots; + bool started_tx = false; ProcessSlotSyncInterrupts(wrconn); - some_slot_updated = synchronize_slots(wrconn); + /* + * The syscache access in fetch_or_refresh_remote_slots() needs a + * transaction env. + */ + if (!IsTransactionState()) { + StartTransactionCommand(); + started_tx = true; + } + + remote_slots = fetch_or_refresh_remote_slots(wrconn, NIL); + some_slot_updated = synchronize_slots(wrconn, remote_slots); + list_free_deep(remote_slots); - wait_for_slot_activity(some_slot_updated); + if (started_tx) + CommitTransactionCommand(); + + wait_for_slot_activity(some_slot_updated, false); } /* @@ -1736,19 +1834,81 @@ slotsync_failure_callback(int code, Datum arg) } /* - * Synchronize the failover enabled replication slots using the specified - * primary server connection. + * Synchronize failover enabled replication slots using the specified primary + * server connection. + * + * Repeatedly fetches and updates replication slot information from the + * primary until all slots are at least "sync ready". Retry is done after 2 + * sec wait. Exits early is promotion is triggered. */ void -SyncReplicationSlots(WalReceiverConn *wrconn) +SyncReplicationSlots(WalReceiverConn * wrconn) { PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn)); { + List *remote_slots; + bool started_tx = false; + check_and_set_sync_info(InvalidPid); validate_remote_info(wrconn); - synchronize_slots(wrconn); + /* + * The syscache access in fetch_or_refresh_remote_slots() needs a + * transaction env. + */ + if (!IsTransactionState()) { + StartTransactionCommand(); + started_tx = true; + } + + remote_slots = fetch_or_refresh_remote_slots(wrconn, NIL); + + /* Retry until all slots are sync ready atleast */ + for (;;) + { + bool some_slot_updated = false; + + /* + * Refresh the remote slot data. We keep using the original slot + * list, even if some slots are already sync ready, so that all + * slots are updated with the latest status from the primary. + */ + remote_slots = fetch_or_refresh_remote_slots(wrconn, remote_slots); + + /* Attempt to synchronize slots */ + some_slot_updated = synchronize_slots(wrconn, remote_slots); + + /* Done if all slots are atleast sync ready */ + if (!SlotSyncCtx->slot_not_persisted) + break; + else + { + /* wait for 2 seconds before retrying */ + wait_for_slot_activity(some_slot_updated, true); + + /* + * If we've been promoted, then no point + * continuing. + */ + if (SlotSyncCtx->stopSignaled) + { + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("exiting from slot synchronization as" + " promotion is triggered"))); + break; + } + + /* Handle any termination request if any */ + ProcessSlotSyncInterrupts(wrconn); + } + } + + list_free_deep(remote_slots); + + if (started_tx) + CommitTransactionCommand(); /* Cleanup the synced temporary slots */ ReplicationSlotCleanup(true); diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 0be307d..3497f0f 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -62,8 +62,8 @@ LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process." LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process." LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process." RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery." -REPLICATION_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker." REPLICATION_SLOTSYNC_SHUTDOWN "Waiting for slot sync worker to shut down." +REPLICATION_SLOTSYNC_PRIMARY_CATCHUP "Waiting for the primary to catch-up." SYSLOGGER_MAIN "Waiting in main loop of syslogger process." WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process." WAL_SENDER_MAIN "Waiting in main loop of WAL sender process." -- 1.8.3.1