From 24ff5a032267eeb918582ce84bf9e511636c676f Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Thu, 2 Oct 2025 21:08:17 +1000 Subject: [PATCH v15] Improve initial slot synchronization in pg_sync_replication_slots() During initial slot synchronization on a standby, the operation may fail if required catalog rows or WALs have been removed or are at risk of removal. The slotsync worker handles this by creating a temporary slot for initial sync and retain it even in case of failure. It will keep retrying until the slot on the primary has been advanced to a position where all the required data are also available on the standby. However, pg_sync_replication_slots() had no such protection mechanism. The SQL API would fail immediately if synchronization requirements weren't met. This could lead to permanent failure as the standby might continue removing the still-required data. To address this, we now make pg_sync_replication_slots() wait for the primary slot to advance to a suitable position before completing synchronization and before removing the temporary slot. Once the slot advances to a suitable position, we retry synchronization. Additionally, if a promotion occurs on the standby during this wait, the process exits gracefully and the temporary slot is removed. --- doc/src/sgml/func/func-admin.sgml | 4 +- doc/src/sgml/logicaldecoding.sgml | 35 +- src/backend/replication/logical/slotsync.c | 367 +++++++++++++++--- .../utils/activity/wait_event_names.txt | 2 +- 4 files changed, 333 insertions(+), 75 deletions(-) diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml index 1b465bc8ba7..2896cd9e429 100644 --- a/doc/src/sgml/func/func-admin.sgml +++ b/doc/src/sgml/func/func-admin.sgml @@ -1497,9 +1497,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset standby server. Temporary synced slots, if any, cannot be used for logical decoding and must be dropped after promotion. See for details. - Note that this function is primarily intended for testing and - debugging purposes and should be used with caution. Additionally, - this function cannot be executed if + Note that this function cannot be executed if sync_replication_slots is enabled and the slotsync worker is already running to perform the synchronization of slots. diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index b803a819cf1..504c79f2fd2 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -370,12 +370,16 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU pg_create_logical_replication_slot, or by using the failover option of - CREATE SUBSCRIPTION during slot creation. - Additionally, enabling - sync_replication_slots on the standby - is required. By enabling - sync_replication_slots - on the standby, the failover slots can be synchronized periodically in + CREATE SUBSCRIPTION during slot creation. After that, + synchronization can be performed either manually by calling + + pg_sync_replication_slots + on the standby, or automatically by enabling + + sync_replication_slots on the standby. + When + sync_replication_slots is enabled + on the standby, the failover slots are periodically synchronized by the slotsync worker. For the synchronization to work, it is mandatory to have a physical replication slot between the primary and the standby (i.e., primary_slot_name @@ -398,25 +402,6 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU receiving the WAL up to the latest flushed position on the primary server. - - - While enabling - sync_replication_slots allows for automatic - periodic synchronization of failover slots, they can also be manually - synchronized using the - pg_sync_replication_slots function on the standby. - However, this function is primarily intended for testing and debugging and - should be used with caution. Unlike automatic synchronization, it does not - include cyclic retries, making it more prone to synchronization failures, - particularly during initial sync scenarios where the required WAL files - or catalog rows for the slot might have already been removed or are at risk - of being removed on the standby. In contrast, automatic synchronization - via sync_replication_slots provides continuous slot - updates, enabling seamless failover and supporting high availability. - Therefore, it is the recommended method for synchronizing slots. - - - When slot synchronization is configured as recommended, and the initial synchronization is performed either automatically or diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 8c061d55bdb..3ba2e500c92 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -39,6 +39,12 @@ * the last cycle. Refer to the comments above wait_for_slot_activity() for * more details. * + * If the pg_sync_replication API is used to sync the slots, and if the slots + * are not ready to be synced and are marked as RS_TEMPORARY because of any of + * the reasons mentioned above, then the API also waits and retries until the + * slots are marked as RS_PERSISTENT (which means sync-ready). Refer to the + * comments in SyncReplicationSlots() for more details. + * * Any standby synchronized slots will be dropped if they no longer need * to be synchronized. See comment atop drop_local_obsolete_slots() for more * details. @@ -64,6 +70,7 @@ #include "storage/procarray.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" +#include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/ps_status.h" #include "utils/timeout.h" @@ -100,6 +107,16 @@ typedef struct SlotSyncCtxStruct slock_t mutex; } SlotSyncCtxStruct; +/* + * Structure holding parameters that need to be freed on error in + * pg_sync_replication_slots() + */ +typedef struct SlotSyncApiFailureParams +{ + WalReceiverConn *wrconn; + List *slot_names; +} SlotSyncApiFailureParams; + static SlotSyncCtxStruct *SlotSyncCtx = NULL; /* GUC variable */ @@ -112,6 +129,7 @@ bool sync_replication_slots = false; */ #define MIN_SLOTSYNC_WORKER_NAPTIME_MS 200 #define MAX_SLOTSYNC_WORKER_NAPTIME_MS 30000 /* 30s */ +#define SLOTSYNC_API_NAPTIME_MS 2000 /* 2s */ static long sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS; @@ -147,6 +165,7 @@ typedef struct RemoteSlot static void slotsync_failure_callback(int code, Datum arg); static void update_synced_slots_inactive_since(void); +static void slotsync_api_reread_config(void); /* * If necessary, update the local synced slot's metadata based on the data @@ -553,11 +572,15 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn) * local ones, then update the LSNs and persist the local synced slot for * future synchronization; otherwise, do nothing. * + * *slot_persistence_pending is set to true if any of the slots fail to + * persist. It is utilized by the pg_sync_replication_slots() API. + * * Return true if the slot is marked as RS_PERSISTENT (sync-ready), otherwise * false. */ static bool -update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) +update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, + bool *slot_persistence_pending) { ReplicationSlot *slot = MyReplicationSlot; bool found_consistent_snapshot = false; @@ -576,11 +599,18 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) /* * The remote slot didn't catch up to locally reserved position. * - * We do not drop the slot because the restart_lsn can be ahead of the - * current location when recreating the slot in the next cycle. It may - * take more time to create such a slot. Therefore, we keep this slot - * and attempt the synchronization in the next cycle. + * We do not drop the slot because the restart_lsn can be + * ahead of the current location when recreating the slot in + * the next cycle. It may take more time to create such a + * slot. Therefore, we keep this slot and attempt the + * synchronization in the next cycle. + * + * We also update the slot_persistence_pending parameter, so + * the API can retry. */ + if (slot_persistence_pending) + *slot_persistence_pending = true; + return false; } @@ -595,6 +625,10 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) errdetail("Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.", LSN_FORMAT_ARGS(slot->data.restart_lsn))); + /* Set this, so that API can retry */ + if (slot_persistence_pending) + *slot_persistence_pending = true; + return false; } @@ -618,10 +652,14 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) * updated. The slot is then persisted and is considered as sync-ready for * periodic syncs. * + * *slot_persistence_pending is set to true if any of the slots fail to + * persist. It is utilized by the pg_sync_replication_slots() API. + * * Returns TRUE if the local slot is updated. */ static bool -synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) +synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid, + bool *slot_persistence_pending) { ReplicationSlot *slot; XLogRecPtr latestFlushPtr; @@ -715,7 +753,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) if (slot->data.persistency == RS_TEMPORARY) { slot_updated = update_and_persist_local_synced_slot(remote_slot, - remote_dbid); + remote_dbid, + slot_persistence_pending); } /* Slot ready for sync, so sync it. */ @@ -784,7 +823,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) ReplicationSlotsComputeRequiredXmin(true); LWLockRelease(ProcArrayLock); - update_and_persist_local_synced_slot(remote_slot, remote_dbid); + update_and_persist_local_synced_slot(remote_slot, remote_dbid, + slot_persistence_pending); slot_updated = true; } @@ -795,15 +835,23 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) } /* - * Synchronize slots. + * Fetch remote slots. * - * Gets the failover logical slots info from the primary server and updates - * the slots locally. Creates the slots if not present on the standby. + * If slot_names is NIL, fetches all failover logical slots from the + * primary server, otherwise fetches only the ones with names in slot_names. + * + * Parameters: + * wrconn - Connection to the primary server + * slot_names - List of slot names (char *) to fetch from primary, + * or NIL to fetch all failover logical slots. + * + * Returns: + * List of remote slot information structures. Returns NIL if no slot + * is found. * - * Returns TRUE if any of the slots gets updated in this sync-cycle. */ -static bool -synchronize_slots(WalReceiverConn *wrconn) +static List * +fetch_remote_slots(WalReceiverConn *wrconn, List *slot_names) { #define SLOTSYNC_COLUMN_COUNT 10 Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID, @@ -812,29 +860,47 @@ synchronize_slots(WalReceiverConn *wrconn) WalRcvExecResult *res; TupleTableSlot *tupslot; List *remote_slot_list = NIL; - bool some_slot_updated = false; - bool started_tx = false; - const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn," - " restart_lsn, catalog_xmin, two_phase, two_phase_at, failover," - " database, invalidation_reason" - " FROM pg_catalog.pg_replication_slots" - " WHERE failover and NOT temporary"; - - /* The syscache access in walrcv_exec() needs a transaction env. */ - if (!IsTransactionState()) + StringInfoData query; + + initStringInfo(&query); + appendStringInfoString(&query, + "SELECT slot_name, plugin, confirmed_flush_lsn," + " restart_lsn, catalog_xmin, two_phase," + " two_phase_at, failover," + " database, invalidation_reason" + " FROM pg_catalog.pg_replication_slots" + " WHERE failover and NOT temporary"); + + if (slot_names != NIL) { - StartTransactionCommand(); - started_tx = true; + ListCell *lc; + bool first_slot = true; + + /* + * Construct the query to fetch only the specified slots + */ + appendStringInfoString(&query, " AND slot_name IN ("); + + foreach(lc, slot_names) + { + char *slot_name = (char *) lfirst(lc); + + if (!first_slot) + appendStringInfoString(&query, ", "); + + appendStringInfo(&query, "'%s'", slot_name); + first_slot = false; + } + appendStringInfoString(&query, ")"); } /* Execute the query */ - res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow); + res = walrcv_exec(wrconn, query.data, SLOTSYNC_COLUMN_COUNT, slotRow); if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, errmsg("could not fetch failover logical slots info from the primary server: %s", res->err)); - /* Construct the remote_slot tuple and synchronize each slot locally */ tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot)) { @@ -885,7 +951,6 @@ synchronize_slots(WalReceiverConn *wrconn) remote_slot->invalidated = isnull ? RS_INVAL_NONE : GetSlotInvalidationCause(TextDatumGetCString(d)); - /* Sanity check */ Assert(col == SLOTSYNC_COLUMN_COUNT); /* @@ -905,12 +970,38 @@ synchronize_slots(WalReceiverConn *wrconn) remote_slot->invalidated == RS_INVAL_NONE) pfree(remote_slot); else - /* Create list of remote slots */ remote_slot_list = lappend(remote_slot_list, remote_slot); ExecClearTuple(tupslot); } + walrcv_clear_result(res); + pfree(query.data); + + return remote_slot_list; +} + +/* + * Synchronize slots. + * + * Takes a list of remote slots and synchronizes them locally. Creates the + * slots if not present on the standby and updates existing ones. + * + * Parameters: + * wrconn - Connection to the primary server + * remote_slot_list - List of RemoteSlot structures to synchronize. + * slot_persistence_pending - boolean used by pg_sync_replication_slots + * API to track if any slots could not be + * persisted and need to be retried. + * + * Returns TRUE if any of the slots gets updated in this sync-cycle. + */ +static bool +synchronize_slots(WalReceiverConn *wrconn, List *remote_slot_list, + bool *slot_persistence_pending) +{ + bool some_slot_updated = false; + /* Drop local slots that no longer need to be synced. */ drop_local_obsolete_slots(remote_slot_list); @@ -926,19 +1017,12 @@ synchronize_slots(WalReceiverConn *wrconn) */ LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock); - some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid); + some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid, + slot_persistence_pending); UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock); } - /* We are done, free remote_slot_list elements */ - list_free_deep(remote_slot_list); - - walrcv_clear_result(res); - - if (started_tx) - CommitTransactionCommand(); - return some_slot_updated; } @@ -1186,6 +1270,26 @@ ProcessSlotSyncInterrupts(void) slotsync_reread_config(); } +/* + * Interrupt handler for pg_sync_replication_slots() API. + */ +static void +ProcessSlotSyncAPIInterrupts() +{ + CHECK_FOR_INTERRUPTS(); + + /* If we've been promoted, then no point continuing. */ + if (SlotSyncCtx->stopSignaled) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot continue replication slots synchronization" + " as standby promotion is triggered"))); + + /* error out if configuration parameters changed */ + if (ConfigReloadPending) + slotsync_api_reread_config(); +} + /* * Connection cleanup function for slotsync worker. * @@ -1275,7 +1379,7 @@ wait_for_slot_activity(bool some_slot_updated) rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, sleep_ms, - WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN); + WAIT_EVENT_REPLICATION_SLOTSYNC_PRIMARY_CATCHUP); if (rc & WL_LATCH_SET) ResetLatch(MyLatch); @@ -1505,10 +1609,27 @@ ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len) for (;;) { bool some_slot_updated = false; + bool started_tx = false; + List *remote_slots; ProcessSlotSyncInterrupts(); - some_slot_updated = synchronize_slots(wrconn); + /* + * The syscache access in fetch_or_refresh_remote_slots() needs a + * transaction env. + */ + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } + + remote_slots = fetch_remote_slots(wrconn, NIL); + some_slot_updated = synchronize_slots(wrconn, remote_slots, NULL); + list_free_deep(remote_slots); + + if (started_tx) + CommitTransactionCommand(); wait_for_slot_activity(some_slot_updated); } @@ -1705,7 +1826,8 @@ SlotSyncShmemInit(void) static void slotsync_failure_callback(int code, Datum arg) { - WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg); + SlotSyncApiFailureParams *fparams = + (SlotSyncApiFailureParams *) DatumGetPointer(arg); /* * We need to do slots cleanup here just like WalSndErrorCleanup() does. @@ -1732,23 +1854,176 @@ slotsync_failure_callback(int code, Datum arg) if (syncing_slots) reset_syncing_flag(); - walrcv_disconnect(wrconn); + if (fparams->slot_names) + list_free_deep(fparams->slot_names); + + walrcv_disconnect(fparams->wrconn); +} + +/* + * Helper function to extract slot names from a list of remote slots + */ +static List * +extract_slot_names(List *remote_slots) +{ + List *slot_names = NIL; + ListCell *lc; + MemoryContext oldcontext; + + /* Switch to long-lived TopMemoryContext to store slot names */ + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + + foreach(lc, remote_slots) + { + RemoteSlot *remote_slot = (RemoteSlot *) lfirst(lc); + char *slot_name; + + slot_name = pstrdup(remote_slot->name); + slot_names = lappend(slot_names, slot_name); + } + + MemoryContextSwitchTo(oldcontext); + + return slot_names; +} + +/* + * Re-read the config file and check for critical parameter changes. + * + */ +static void +slotsync_api_reread_config(void) +{ + char *old_primary_conninfo = pstrdup(PrimaryConnInfo); + char *old_primary_slotname = pstrdup(PrimarySlotName); + bool old_hot_standby_feedback = hot_standby_feedback; + bool conninfo_changed; + bool primary_slotname_changed; + + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + + conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0; + primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0; + + pfree(old_primary_conninfo); + pfree(old_primary_slotname); + + /* throw error for certain parameter changes */ + if (conninfo_changed || + primary_slotname_changed || + (old_hot_standby_feedback != hot_standby_feedback)) + { + ereport(ERROR, + (errcode(ERRCODE_CONFIG_FILE_ERROR), + errmsg("cannot continue slot synchronization due" + " to parameter changes"), + errdetail("One or more of primary_conninfo," + " primary_slot_name or hot_standby_feedback" + " were modified" + errhint("Retry pg_sync_replication_slots() to use the" + " updated configuration."))); + } } /* * Synchronize the failover enabled replication slots using the specified * primary server connection. + * + * Repeatedly fetches and updates replication slot information from the + * primary until all slots are at least "sync ready". Retry is done after + * SLOTSYNC_API_NAPTIME_MS wait. Exits early if promotion is triggered or + * certain critical configuration parameters have changed. */ void SyncReplicationSlots(WalReceiverConn *wrconn) { - PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn)); + SlotSyncApiFailureParams fparams; + + fparams.wrconn = wrconn; + fparams.slot_names = NULL; + + PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(&fparams)); { + List *remote_slots = NIL; + List *slot_names = NIL; /* List of slot names to track */ + check_and_set_sync_info(InvalidPid); validate_remote_info(wrconn); - synchronize_slots(wrconn); + /* Retry until all the slots are sync-ready */ + for (;;) + { + int rc; + bool started_tx = false; + bool slot_persistence_pending = false; + + /* Reset flag before every iteration */ + slot_persistence_pending = false; + + /* Check for interrupts and config changes */ + ProcessSlotSyncAPIInterrupts(); + + /* + * The syscache access in fetch_remote_slots() needs a + * transaction env. + */ + if (!IsTransactionState()) { + StartTransactionCommand(); + started_tx = true; + } + + /* + * Fetch remote slot info for the given slot_names. If slot_names is NIL, + * fetch all failover-enabled slots. Note that we reuse slot_names from + * the first iteration; re-fetching all failover slots each time could + * cause an endless loop. Instead of reprocessing only the pending slots + * in each iteration, it's better to process all the slots received in + * the first iteration. This ensures that by the time we're done, all + * slots reflect the latest values. + */ + remote_slots = fetch_remote_slots(wrconn, slot_names); + + /* Attempt to synchronize slots */ + synchronize_slots(wrconn, remote_slots, &slot_persistence_pending); + + /* + * If slot_persistence_pending is true, extract slot names + * for future iterations (only needed if we haven't done it yet) + */ + if (slot_names == NIL && slot_persistence_pending) + { + slot_names = extract_slot_names(remote_slots); + + /* Update the failure structure so that it can be freed on error */ + fparams.slot_names = slot_names; + } + + /* Free the current remote_slots list */ + list_free_deep(remote_slots); + + /* Commit transaction if we started it */ + if (started_tx) + CommitTransactionCommand(); + + /* Done if all slots are persisted i.e are sync-ready */ + if (!slot_persistence_pending) + break; + + /* Wait before retrying */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + SLOTSYNC_API_NAPTIME_MS, + WAIT_EVENT_REPLICATION_SLOTSYNC_PRIMARY_CATCHUP); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + + } + + if (slot_names) + list_free_deep(slot_names); /* Cleanup the synced temporary slots */ ReplicationSlotCleanup(true); @@ -1756,5 +2031,5 @@ SyncReplicationSlots(WalReceiverConn *wrconn) /* We are done with sync, so reset sync flag */ reset_syncing_flag(); } - PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn)); + PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(&fparams)); } diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 7553f6eacef..16b3b04d3c4 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -62,8 +62,8 @@ LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process." LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process." LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process." RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery." -REPLICATION_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker." REPLICATION_SLOTSYNC_SHUTDOWN "Waiting for slot sync worker to shut down." +REPLICATION_SLOTSYNC_PRIMARY_CATCHUP "Waiting for the primary to catch-up." SYSLOGGER_MAIN "Waiting in main loop of syslogger process." WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process." WAL_SENDER_MAIN "Waiting in main loop of WAL sender process." -- 2.47.3