From 60d18dd19bed14e85673a73a01fc1e5a8df46921 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Wed, 3 Dec 2025 14:16:44 +1100 Subject: [PATCH v28] Improve initial slot synchronization in pg_sync_replication_slots() During initial slot synchronization on a standby, the operation may fail if required catalog rows or WALs have been removed or are at risk of removal. The slotsync worker handles this by creating a temporary slot for initial sync and retain it even in case of failure. It will keep retrying until the slot on the primary has been advanced to a position where all the required data are also available on the standby. However, pg_sync_replication_slots() had no such protection mechanism. The SQL API would fail immediately if synchronization requirements weren't met. This could lead to permanent failure as the standby might continue removing the still-required data. To address this, we now make pg_sync_replication_slots() wait for the primary slot to advance to a suitable position before completing synchronization and before removing the temporary slot. Once the slot advances to a suitable position, we retry synchronization. Additionally, if a promotion occurs on the standby during this wait, the process exits gracefully and the temporary slot is removed. Author: Ajin Cherian Author: Hou Zhijie Reviewed-by: Shveta Malik Reviewed-by: Japin Li Reviewed-by: Ashutosh Bapat Reviewed-by: Ashutosh Sharma Reviewed-by: Chao Li Reviewed-by: Amit Kapila Discussion: https://www.postgresql.org/message-id/CAFPTHDZAA%2BgWDntpa5ucqKKba41%3DtXmoXqN3q4rpjO9cdxgQrw%40mail.gmail.com --- doc/src/sgml/func/func-admin.sgml | 4 +- doc/src/sgml/logicaldecoding.sgml | 11 +- src/backend/replication/logical/slotsync.c | 364 ++++++++++++++---- .../utils/activity/wait_event_names.txt | 2 +- .../t/040_standby_failover_slots_sync.pl | 59 ++- 5 files changed, 339 insertions(+), 101 deletions(-) diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml index 1b465bc8ba7..2896cd9e429 100644 --- a/doc/src/sgml/func/func-admin.sgml +++ b/doc/src/sgml/func/func-admin.sgml @@ -1497,9 +1497,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset standby server. Temporary synced slots, if any, cannot be used for logical decoding and must be dropped after promotion. See for details. - Note that this function is primarily intended for testing and - debugging purposes and should be used with caution. Additionally, - this function cannot be executed if + Note that this function cannot be executed if sync_replication_slots is enabled and the slotsync worker is already running to perform the synchronization of slots. diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index d5a5e22fe2c..33940504622 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -405,12 +405,11 @@ postgres=# SELECT * from pg_logical_slot_get_changes('regression_slot', NULL, NU periodic synchronization of failover slots, they can also be manually synchronized using the pg_sync_replication_slots function on the standby. - However, this function is primarily intended for testing and debugging and - should be used with caution. Unlike automatic synchronization, it does not - include cyclic retries, making it more prone to synchronization failures, - particularly during initial sync scenarios where the required WAL files - or catalog rows for the slot might have already been removed or are at risk - of being removed on the standby. In contrast, automatic synchronization + However, unlike automatic synchronization, it does not perform incremental + updates. It retries cyclically to some extent—continuing until all + the failover slots that existed on primary at the start of the function + call are synchronized. Any slots created after the function begins will + not be synchronized. In contrast, automatic synchronization via sync_replication_slots provides continuous slot updates, enabling seamless failover and supporting high availability. Therefore, it is the recommended method for synchronizing slots. diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 53c7d629239..36106dd35e1 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -39,6 +39,12 @@ * the last cycle. Refer to the comments above wait_for_slot_activity() for * more details. * + * If the SQL function pg_sync_replication is used to sync the slots, and if + * the slots are not ready to be synced and are marked as RS_TEMPORARY because + * of any of the reasons mentioned above, then the SQL function also waits and + * retries until the slots are marked as RS_PERSISTENT (which means sync-ready). + * Refer to the comments in SyncReplicationSlots() for more details. + * * Any standby synchronized slots will be dropped if they no longer need * to be synchronized. See comment atop drop_local_obsolete_slots() for more * details. @@ -64,6 +70,7 @@ #include "storage/procarray.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" +#include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/ps_status.h" #include "utils/timeout.h" @@ -71,9 +78,11 @@ /* * Struct for sharing information to control slot synchronization. * - * The slot sync worker's pid is needed by the startup process to shut it - * down during promotion. The startup process shuts down the slot sync worker - * and also sets stopSignaled=true to handle the race condition when the + * The pid is either the slot sync worker's pid or the backend's pid running + * the SQL function pg_sync_replication_slots(). It is needed by the startup + * process to wake these up, so that they can stop synchronization on seeing + * stopSignaled on promotion. + * Setting stopSignaled is also used to handle the race condition when the * postmaster has not noticed the promotion yet and thus may end up restarting * the slot sync worker. If stopSignaled is set, the worker will exit in such a * case. The SQL function pg_sync_replication_slots() will also error out if @@ -596,11 +605,15 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn) * local ones, then update the LSNs and persist the local synced slot for * future synchronization; otherwise, do nothing. * + * *slot_persistence_pending is set to true if any of the slots fail to + * persist. It is utilized by the SQL function pg_sync_replication_slots(). + * * Return true if the slot is marked as RS_PERSISTENT (sync-ready), otherwise * false. */ static bool -update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) +update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, + bool *slot_persistence_pending) { ReplicationSlot *slot = MyReplicationSlot; bool found_consistent_snapshot = false; @@ -624,7 +637,13 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) * current location when recreating the slot in the next cycle. It may * take more time to create such a slot. Therefore, we keep this slot * and attempt the synchronization in the next cycle. + * + * We also update the slot_persistence_pending parameter, so + * the SQL function can retry. */ + if (slot_persistence_pending) + *slot_persistence_pending = true; + return false; } @@ -639,6 +658,10 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) errdetail("Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.", LSN_FORMAT_ARGS(slot->data.restart_lsn))); + /* Set this, so that SQL function can retry */ + if (slot_persistence_pending) + *slot_persistence_pending = true; + return false; } @@ -662,10 +685,14 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) * updated. The slot is then persisted and is considered as sync-ready for * periodic syncs. * + * *slot_persistence_pending is set to true if any of the slots fail to + * persist. It is utilized by the SQL function pg_sync_replication_slots(). + * * Returns TRUE if the local slot is updated. */ static bool -synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) +synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid, + bool *slot_persistence_pending) { ReplicationSlot *slot; XLogRecPtr latestFlushPtr = GetStandbyFlushRecPtr(NULL); @@ -767,7 +794,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) if (slot->data.persistency == RS_TEMPORARY) { slot_updated = update_and_persist_local_synced_slot(remote_slot, - remote_dbid); + remote_dbid, + slot_persistence_pending); } /* Slot ready for sync, so sync it. */ @@ -864,7 +892,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) return false; } - update_and_persist_local_synced_slot(remote_slot, remote_dbid); + update_and_persist_local_synced_slot(remote_slot, remote_dbid, + slot_persistence_pending); slot_updated = true; } @@ -875,15 +904,23 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) } /* - * Synchronize slots. + * Fetch remote slots. + * + * If slot_names is NIL, fetches all failover logical slots from the + * primary server, otherwise fetches only the ones with names in slot_names. * - * Gets the failover logical slots info from the primary server and updates - * the slots locally. Creates the slots if not present on the standby. + * Parameters: + * wrconn - Connection to the primary server + * slot_names - List of slot names (char *) to fetch from primary, + * or NIL to fetch all failover logical slots. + * + * Returns: + * List of remote slot information structures. Returns NIL if no slot + * is found. * - * Returns TRUE if any of the slots gets updated in this sync-cycle. */ -static bool -synchronize_slots(WalReceiverConn *wrconn) +static List * +fetch_remote_slots(WalReceiverConn *wrconn, List *slot_names) { #define SLOTSYNC_COLUMN_COUNT 10 Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID, @@ -892,29 +929,45 @@ synchronize_slots(WalReceiverConn *wrconn) WalRcvExecResult *res; TupleTableSlot *tupslot; List *remote_slot_list = NIL; - bool some_slot_updated = false; - bool started_tx = false; - const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn," - " restart_lsn, catalog_xmin, two_phase, two_phase_at, failover," - " database, invalidation_reason" - " FROM pg_catalog.pg_replication_slots" - " WHERE failover and NOT temporary"; - - /* The syscache access in walrcv_exec() needs a transaction env. */ - if (!IsTransactionState()) + StringInfoData query; + + initStringInfo(&query); + appendStringInfoString(&query, + "SELECT slot_name, plugin, confirmed_flush_lsn," + " restart_lsn, catalog_xmin, two_phase," + " two_phase_at, failover," + " database, invalidation_reason" + " FROM pg_catalog.pg_replication_slots" + " WHERE failover and NOT temporary"); + + if (slot_names != NIL) { - StartTransactionCommand(); - started_tx = true; + bool first_slot = true; + + /* + * Construct the query to fetch only the specified slots + */ + appendStringInfoString(&query, " AND slot_name IN ("); + + foreach_ptr(char, slot_name, slot_names) + { + if (!first_slot) + appendStringInfoString(&query, ", "); + + appendStringInfo(&query, "'%s'", slot_name); + first_slot = false; + } + appendStringInfoChar(&query, ')'); } /* Execute the query */ - res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow); + res = walrcv_exec(wrconn, query.data, SLOTSYNC_COLUMN_COUNT, slotRow); + pfree(query.data); if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, errmsg("could not fetch failover logical slots info from the primary server: %s", res->err)); - /* Construct the remote_slot tuple and synchronize each slot locally */ tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot)) { @@ -965,7 +1018,6 @@ synchronize_slots(WalReceiverConn *wrconn) remote_slot->invalidated = isnull ? RS_INVAL_NONE : GetSlotInvalidationCause(TextDatumGetCString(d)); - /* Sanity check */ Assert(col == SLOTSYNC_COLUMN_COUNT); /* @@ -985,12 +1037,38 @@ synchronize_slots(WalReceiverConn *wrconn) remote_slot->invalidated == RS_INVAL_NONE) pfree(remote_slot); else - /* Create list of remote slots */ remote_slot_list = lappend(remote_slot_list, remote_slot); ExecClearTuple(tupslot); } + walrcv_clear_result(res); + + return remote_slot_list; +} + +/* + * Synchronize slots. + * + * Takes a list of remote slots and synchronizes them locally. Creates the + * slots if not present on the standby and updates existing ones. + * + * Parameters: + * wrconn - Connection to the primary server + * remote_slot_list - List of RemoteSlot structures to synchronize. + * slot_persistence_pending - boolean used by SQL function + * pg_sync_replication_slots to track if any slots + * could not be persisted and need to be retried. + * + * Returns: + * TRUE if any of the slots gets updated in this sync-cycle. + */ +static bool +synchronize_slots(WalReceiverConn *wrconn, List *remote_slot_list, + bool *slot_persistence_pending) +{ + bool some_slot_updated = false; + /* Drop local slots that no longer need to be synced. */ drop_local_obsolete_slots(remote_slot_list); @@ -1006,19 +1084,12 @@ synchronize_slots(WalReceiverConn *wrconn) */ LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock); - some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid); + some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid, + slot_persistence_pending); UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock); } - /* We are done, free remote_slot_list elements */ - list_free_deep(remote_slot_list); - - walrcv_clear_result(res); - - if (started_tx) - CommitTransactionCommand(); - return some_slot_updated; } @@ -1195,10 +1266,11 @@ ValidateSlotSyncParams(int elevel) } /* - * Re-read the config file. + * Re-read the config file for slot synchronization. + * + * Exit or throw errors if relevant GUCs have changed depending on whether + * called from slotsync worker or from SQL function pg_sync_replication_slots() * - * Exit if any of the slot sync GUCs have changed. The postmaster will - * restart it. */ static void slotsync_reread_config(void) @@ -1209,57 +1281,96 @@ slotsync_reread_config(void) bool old_hot_standby_feedback = hot_standby_feedback; bool conninfo_changed; bool primary_slotname_changed; + bool worker = AmLogicalSlotSyncWorkerProcess(); + bool parameter_changed = false; - Assert(sync_replication_slots); + if (worker) + Assert(sync_replication_slots); ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0; primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0; + pfree(old_primary_conninfo); pfree(old_primary_slotname); + /* Check for sync_replication_slots change */ if (old_sync_replication_slots != sync_replication_slots) { - ereport(LOG, - /* translator: %s is a GUC variable name */ - errmsg("replication slot synchronization worker will shut down because \"%s\" is disabled", "sync_replication_slots")); - proc_exit(0); + if (worker) + { + ereport(LOG, + /* translator: %s is a GUC variable name */ + errmsg("replication slot synchronization worker will shut down because \"%s\" is disabled", + "sync_replication_slots")); + + proc_exit(0); + } + + parameter_changed = true; } + /* Check for parameter changes common to both API and worker */ if (conninfo_changed || primary_slotname_changed || (old_hot_standby_feedback != hot_standby_feedback)) { - ereport(LOG, - errmsg("replication slot synchronization worker will restart because of a parameter change")); - /* - * Reset the last-start time for this worker so that the postmaster - * can restart it without waiting for SLOTSYNC_RESTART_INTERVAL_SEC. - */ - SlotSyncCtx->last_start_time = 0; + if (worker) + { + ereport(LOG, + errmsg("replication slot synchronization worker will restart because of a parameter change")); - proc_exit(0); + /* + * Reset the last-start time for this worker so that the postmaster + * can restart it without waiting for SLOTSYNC_RESTART_INTERVAL_SEC. + */ + SlotSyncCtx->last_start_time = 0; + + proc_exit(0); + } + + parameter_changed = true; + } + + /* + * If we have reached here with a parameter change, we must be running in SQL function, + * emit error in such a case. + */ + if (parameter_changed) + { + Assert(!worker); + ereport(ERROR, + errmsg("replication slot synchronization will stop because of a parameter change")); } } /* - * Interrupt handler for main loop of slot sync worker. + * Interrupt handler for main loop of slot sync worker and + * SQL function pg_sync_replication_slots(). */ static void ProcessSlotSyncInterrupts(void) { CHECK_FOR_INTERRUPTS(); - if (ShutdownRequestPending) + if (SlotSyncCtx->stopSignaled) { - ereport(LOG, - errmsg("replication slot synchronization worker is shutting down on receiving SIGINT")); + if (AmLogicalSlotSyncWorkerProcess()) + { + ereport(LOG, + errmsg("replication slot synchronization worker is shutting down as promotion is triggered")); - proc_exit(0); + proc_exit(0); + } + else + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot continue replication slots synchronization" + " as standby promotion is triggered")); } if (ConfigReloadPending) @@ -1366,13 +1477,10 @@ wait_for_slot_activity(bool some_slot_updated) * Otherwise, advertise that a sync is in progress. */ static void -check_and_set_sync_info(pid_t worker_pid) +check_and_set_sync_info(pid_t sync_process_pid) { SpinLockAcquire(&SlotSyncCtx->mutex); - /* The worker pid must not be already assigned in SlotSyncCtx */ - Assert(worker_pid == InvalidPid || SlotSyncCtx->pid == InvalidPid); - /* * Emit an error if startup process signaled the slot sync machinery to * stop. See comments atop SlotSyncCtxStruct. @@ -1393,13 +1501,16 @@ check_and_set_sync_info(pid_t worker_pid) errmsg("cannot synchronize replication slots concurrently")); } + /* The pid must not be already assigned in SlotSyncCtx */ + Assert(SlotSyncCtx->pid == InvalidPid); + SlotSyncCtx->syncing = true; /* * Advertise the required PID so that the startup process can kill the - * slot sync worker on promotion. + * slot sync process on promotion. */ - SlotSyncCtx->pid = worker_pid; + SlotSyncCtx->pid = sync_process_pid; SpinLockRelease(&SlotSyncCtx->mutex); @@ -1414,6 +1525,7 @@ reset_syncing_flag() { SpinLockAcquire(&SlotSyncCtx->mutex); SlotSyncCtx->syncing = false; + SlotSyncCtx->pid = InvalidPid; SpinLockRelease(&SlotSyncCtx->mutex); syncing_slots = false; @@ -1424,6 +1536,9 @@ reset_syncing_flag() * * It connects to the primary server, fetches logical failover slots * information periodically in order to create and sync the slots. + * + * Note: If any changes are made here, check if the corresponding SQL + * function logic in SyncReplicationSlots also needs to be changed. */ void ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len) @@ -1488,7 +1603,6 @@ ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len) /* Setup signal handling */ pqsignal(SIGHUP, SignalHandlerForConfigReload); - pqsignal(SIGINT, SignalHandlerForShutdownRequest); pqsignal(SIGTERM, die); pqsignal(SIGFPE, FloatExceptionHandler); pqsignal(SIGUSR1, procsignal_sigusr1_handler); @@ -1585,17 +1699,34 @@ ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len) for (;;) { bool some_slot_updated = false; + bool started_tx = false; + List *remote_slots; ProcessSlotSyncInterrupts(); - some_slot_updated = synchronize_slots(wrconn); + /* + * The syscache access in fetch_or_refresh_remote_slots() needs a + * transaction env. + */ + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } + + remote_slots = fetch_remote_slots(wrconn, NIL); + some_slot_updated = synchronize_slots(wrconn, remote_slots, NULL); + list_free_deep(remote_slots); + + if (started_tx) + CommitTransactionCommand(); wait_for_slot_activity(some_slot_updated); } /* * The slot sync worker can't get here because it will only stop when it - * receives a SIGINT from the startup process, or when there is an error. + * receives a SIGUSR1 from the startup process, or when there is an error. */ Assert(false); } @@ -1650,16 +1781,18 @@ update_synced_slots_inactive_since(void) } /* - * Shut down the slot sync worker. + * Shut down slot synchronization. * - * This function sends signal to shutdown slot sync worker, if required. It - * also waits till the slot sync worker has exited or - * pg_sync_replication_slots() has finished. + * This function wakes up the slot sync process (either worker or backend + * running SQL function pg_sync_replication_slots()) and sets + * stopSignaled=true so that worker can exit or SQL function + * pg_sync_replication_slots() can finish. It also waits tll the slot sync + * worker has exited or pg_sync_replication_slots() has finished. */ void ShutDownSlotSync(void) { - pid_t worker_pid; + pid_t sync_process_pid; SpinLockAcquire(&SlotSyncCtx->mutex); @@ -1676,12 +1809,13 @@ ShutDownSlotSync(void) return; } - worker_pid = SlotSyncCtx->pid; + sync_process_pid = SlotSyncCtx->pid; SpinLockRelease(&SlotSyncCtx->mutex); - if (worker_pid != InvalidPid) - kill(worker_pid, SIGINT); + /* Wake up slot sync process */ + if (sync_process_pid != InvalidPid) + kill(sync_process_pid, SIGUSR1); /* Wait for slot sync to end */ for (;;) @@ -1821,20 +1955,94 @@ slotsync_failure_callback(int code, Datum arg) walrcv_disconnect(wrconn); } +/* + * Helper function to extract slot names from a list of remote slots + */ +static List * +extract_slot_names(List *remote_slots) +{ + List *slot_names = NIL; + + foreach_ptr(RemoteSlot, remote_slot, remote_slots) + { + char *slot_name; + + slot_name = pstrdup(remote_slot->name); + slot_names = lappend(slot_names, slot_name); + } + + return slot_names; +} + /* * Synchronize the failover enabled replication slots using the specified * primary server connection. + * + * Repeatedly fetches and updates replication slot information from the + * primary until all slots are at least "sync ready". + * Exits early if promotion is triggered or certain critical + * configuration parameters have changed. */ void SyncReplicationSlots(WalReceiverConn *wrconn) { PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn)); { - check_and_set_sync_info(InvalidPid); + List *remote_slots = NIL; + List *slot_names = NIL; /* List of slot names to track */ + + check_and_set_sync_info(MyProcPid); validate_remote_info(wrconn); - synchronize_slots(wrconn); + /* Retry until all the slots are sync-ready */ + for (;;) + { + bool slot_persistence_pending = false; + bool some_slot_updated = false; + + /* Check for interrupts and config changes */ + ProcessSlotSyncInterrupts(); + + /* We must be in a valid transaction state */ + Assert(IsTransactionState()); + + /* + * Fetch remote slot info for the given slot_names. If slot_names is NIL, + * fetch all failover-enabled slots. Note that we reuse slot_names from + * the first iteration; re-fetching all failover slots each time could + * cause an endless loop. Instead of reprocessing only the pending slots + * in each iteration, it's better to process all the slots received in + * the first iteration. This ensures that by the time we're done, all + * slots reflect the latest values. + */ + remote_slots = fetch_remote_slots(wrconn, slot_names); + + /* Attempt to synchronize slots */ + some_slot_updated = synchronize_slots(wrconn, remote_slots, + &slot_persistence_pending); + + /* + * If slot_persistence_pending is true, extract slot names + * for future iterations (only needed if we haven't done it yet) + */ + if (slot_names == NIL && slot_persistence_pending) + slot_names = extract_slot_names(remote_slots); + + /* Free the current remote_slots list */ + list_free_deep(remote_slots); + + /* Done if all slots are persisted i.e are sync-ready */ + if (!slot_persistence_pending) + break; + + /* wait before retrying again */ + wait_for_slot_activity(some_slot_updated); + + } + + if (slot_names) + list_free_deep(slot_names); /* Cleanup the synced temporary slots */ ReplicationSlotCleanup(true); diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index c1ac71ff7f2..92101e12cd6 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -62,7 +62,7 @@ LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process." LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process." LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process." RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery." -REPLICATION_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker." +REPLICATION_SLOTSYNC_MAIN "Waiting in main loop of slot synchronization." REPLICATION_SLOTSYNC_SHUTDOWN "Waiting for slot sync worker to shut down." SYSLOGGER_MAIN "Waiting in main loop of syslogger process." WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process." diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl index 25777fa188c..8f63bfbb977 100644 --- a/src/test/recovery/t/040_standby_failover_slots_sync.pl +++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl @@ -1000,6 +1000,12 @@ $primary->psql( )); $subscriber2->safe_psql('postgres', 'DROP SUBSCRIPTION regress_mysub2;'); +$subscriber1->safe_psql('postgres', 'DROP SUBSCRIPTION regress_mysub1;'); + +# Remove the dropped sb1_slot from the synchronized_standby_slots list and reload the +# configuration. +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''"); +$primary->reload; # Verify that all slots have been removed except the one necessary for standby2, # which is needed for further testing. @@ -1016,34 +1022,47 @@ $primary->safe_psql('postgres', "COMMIT PREPARED 'test_twophase_slotsync';"); $primary->wait_for_replay_catchup($standby2); ################################################## -# Verify that slotsync skip statistics are correctly updated when the +# Test that pg_sync_replication_slots() on the standby skips and retries +# until the slot becomes sync-ready (when the remote slot catches up with +# the locally reserved position). +# Also verify that slotsync skip statistics are correctly updated when the # slotsync operation is skipped. ################################################## -# Create a logical replication slot and create some DDL on the primary so -# that the slot lags behind the standby. -$primary->safe_psql( - 'postgres', qq( - SELECT pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, true); - CREATE TABLE wal_push(a int); -)); +# Recreate the slot by creating a subscription on the subscriber, keep it disabled. +$subscriber1->safe_psql('postgres', qq[ + CREATE TABLE push_wal (a int); + TRUNCATE tab_int; + CREATE SUBSCRIPTION regress_mysub1 CONNECTION '$publisher_connstr' PUBLICATION regress_mypub WITH (slot_name = lsub1_slot, failover = true, enabled = false);]); + +# Create some DDL on the primary so that the slot lags behind the standby +$primary->safe_psql('postgres', "CREATE TABLE push_wal (a int);"); + +# Make sure the DDL changes are synced to the standby $primary->wait_for_replay_catchup($standby2); $log_offset = -s $standby2->logfile; -# Enable slot sync worker +# Enable standby for slot synchronization $standby2->append_conf( - 'postgresql.conf', qq( + 'postgresql.conf', qq( hot_standby_feedback = on primary_conninfo = '$connstr_1 dbname=postgres' log_min_messages = 'debug2' -sync_replication_slots = on )); $standby2->reload; -# Confirm that the slot sync worker is able to start. -$standby2->wait_for_log(qr/slot sync worker started/, $log_offset); +# Attempt to synchronize slots using API. The API will continue retrying +# synchronization until the remote slot catches up. +# The API will not return until this happens, to be able to make +# further calls, call the API in a background process. +my $h = $standby2->background_psql('postgres', on_error_stop => 0); + +$h->query_until(qr/start/, q( + \echo start + SELECT pg_sync_replication_slots(); + )); # Confirm that the slot sync is skipped due to the remote slot lagging behind $standby2->wait_for_log( @@ -1061,4 +1080,18 @@ $result = $standby2->safe_psql('postgres', ); is($result, 't', "check slot sync skip count increments"); +# Enable the Subscription, so that the remote slot catches up +$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 ENABLE"); +$subscriber1->wait_for_subscription_sync; + +# Create xl_running_xacts on the primary to speed up restart_lsn advancement. +$primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot();"); + +# Confirm from the log that the slot is sync-ready now. +$standby2->wait_for_log( + qr/newly created replication slot \"lsub1_slot\" is sync-ready now/, + $log_offset); + +$h->quit; + done_testing(); -- 2.47.3