From b11c33159de217d21c188cfa18af0399e1277e0d Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Mon, 11 Aug 2025 03:44:55 -0400 Subject: [PATCH v5] Improve initial slot synchronization in pg_sync_replication_slots() During initial slot synchronization on a standby, the operation may fail if required catalog rows or WALs have been removed or are at risk of removal. The slotsync worker handles this by creating a temporary slot for initial sync and retaining it even in case of failure. It will keep retrying until the slot on the primary has been advanced to a position where all the required data are also available on the standby. However, pg_sync_replication_slots() had no such protection mechanism. The SQL API would fail immediately if synchronization requirements weren't met. This could lead to permanent failure as the standby might continue removing the still-required data. To address this, we now make pg_sync_replication_slots() wait for the primary slot to advance to a suitable position before completing synchronization and before removing the temporary slot. Once the slot advances to a suitable position, we retry synchronization. Additionally, if a promotion occurs on the standby during this wait, the process exits gracefully and the temporary slot is removed. --- doc/src/sgml/func/func-admin.sgml | 4 +- doc/src/sgml/logicaldecoding.sgml | 40 +-- src/backend/replication/logical/slotsync.c | 437 ++++++++++++++++++++---- src/backend/utils/activity/wait_event_names.txt | 2 +- 4 files changed, 383 insertions(+), 100 deletions(-) diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml index 446fdfe..3608610 100644 --- a/doc/src/sgml/func/func-admin.sgml +++ b/doc/src/sgml/func/func-admin.sgml @@ -1478,9 +1478,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset standby server. Temporary synced slots, if any, cannot be used for logical decoding and must be dropped after promotion. See for details. 
- Note that this function is primarily intended for testing and - debugging purposes and should be used with caution. Additionally, - this function cannot be executed if + Note that this function cannot be executed if sync_replication_slots is enabled and the slotsync worker is already running to perform the synchronization of slots. diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 77c720c..6e4251a 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -364,18 +364,23 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU Replication Slot Synchronization - The logical replication slots on the primary can be synchronized to - the hot standby by using the failover parameter of + The logical replication slots on the primary can be enabled for + synchronization to the hot standby by using the + failover parameter of pg_create_logical_replication_slot, or by using the failover option of - CREATE SUBSCRIPTION during slot creation. - Additionally, enabling - sync_replication_slots on the standby - is required. By enabling - sync_replication_slots - on the standby, the failover slots can be synchronized periodically in + CREATE SUBSCRIPTION during slot creation. After that, + synchronization can be performed either manually by calling + + pg_sync_replication_slots + on the standby, or automatically by enabling + + sync_replication_slots on the standby. + When + sync_replication_slots is enabled + on the standby, the failover slots are periodically synchronized by the slotsync worker. For the synchronization to work, it is mandatory to have a physical replication slot between the primary and the standby (i.e., primary_slot_name @@ -398,25 +403,6 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU receiving the WAL up to the latest flushed position on the primary server. 
- - - While enabling - sync_replication_slots allows for automatic - periodic synchronization of failover slots, they can also be manually - synchronized using the - pg_sync_replication_slots function on the standby. - However, this function is primarily intended for testing and debugging and - should be used with caution. Unlike automatic synchronization, it does not - include cyclic retries, making it more prone to synchronization failures, - particularly during initial sync scenarios where the required WAL files - or catalog rows for the slot might have already been removed or are at risk - of being removed on the standby. In contrast, automatic synchronization - via sync_replication_slots provides continuous slot - updates, enabling seamless failover and supporting high availability. - Therefore, it is the recommended method for synchronizing slots. - - - When slot synchronization is configured as recommended, and the initial synchronization is performed either automatically or diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 3773844..f9eec0b 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -113,6 +113,7 @@ bool sync_replication_slots = false; */ #define MIN_SLOTSYNC_WORKER_NAPTIME_MS 200 #define MAX_SLOTSYNC_WORKER_NAPTIME_MS 30000 /* 30s */ +#define SLOTSYNC_API_NAPTIME_MS 2000 /* 2s */ static long sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS; @@ -146,6 +147,7 @@ typedef struct RemoteSlot ReplicationSlotInvalidationCause invalidated; } RemoteSlot; +static void ProcessSlotSyncInterrupts(WalReceiverConn *wrconn); static void slotsync_failure_callback(int code, Datum arg); static void update_synced_slots_inactive_since(void); @@ -166,7 +168,8 @@ static void update_synced_slots_inactive_since(void); static bool update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, bool *found_consistent_snapshot, - bool *remote_slot_precedes) + bool 
*remote_slot_precedes, + int sync_iterations) { ReplicationSlot *slot = MyReplicationSlot; bool updated_xmin_or_lsn = false; @@ -209,15 +212,21 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, * to understand why the slot is not sync-ready. In the case of a * persistent slot, it would be a more common case and won't directly * impact the users, so we used DEBUG1 level to log the message. + * + * If called from pg_sync_replication_slots(), log message only for + * the first iteration. */ - ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1, - errmsg("could not synchronize replication slot \"%s\"", + if (AmLogicalSlotSyncWorkerProcess() || sync_iterations == 1) + { + ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1, + errmsg("Replication slot \"%s\" is not sync ready; will keep retrying", remote_slot->name), - errdetail("Synchronization could lead to data loss, because the remote slot needs WAL at LSN %X/%08X and catalog xmin %u, but the standby has LSN %X/%08X and catalog xmin %u.", - LSN_FORMAT_ARGS(remote_slot->restart_lsn), - remote_slot->catalog_xmin, - LSN_FORMAT_ARGS(slot->data.restart_lsn), - slot->data.catalog_xmin)); + errdetail("Attempting Synchronization could lead to data loss, because the remote slot needs WAL at LSN %X/%08X and catalog xmin %u, but the standby has LSN %X/%08X and catalog xmin %u.", + LSN_FORMAT_ARGS(remote_slot->restart_lsn), + remote_slot->catalog_xmin, + LSN_FORMAT_ARGS(slot->data.restart_lsn), + slot->data.catalog_xmin)); + } if (remote_slot_precedes) *remote_slot_precedes = true; @@ -558,7 +567,9 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn) * false. 
*/ static bool -update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) +update_and_persist_local_synced_slot(WalReceiverConn * wrconn, + RemoteSlot * remote_slot, Oid remote_dbid, bool *sync_start_pending, + int sync_iterations) { ReplicationSlot *slot = MyReplicationSlot; bool found_consistent_snapshot = false; @@ -566,7 +577,8 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) (void) update_local_synced_slot(remote_slot, remote_dbid, &found_consistent_snapshot, - &remote_slot_precedes); + &remote_slot_precedes, + sync_iterations); /* * Check if the primary server has caught up. Refer to the comment atop @@ -575,13 +587,40 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) if (remote_slot_precedes) { /* - * The remote slot didn't catch up to locally reserved position. + * The remote slot didn't catch up to locally reserved + * position. * - * We do not drop the slot because the restart_lsn can be ahead of the - * current location when recreating the slot in the next cycle. It may - * take more time to create such a slot. Therefore, we keep this slot - * and attempt the synchronization in the next cycle. + * We do not drop the slot because the restart_lsn can be + * ahead of the current location when recreating the slot in + * the next cycle. It may take more time to create such a + * slot. Therefore, we keep this slot and attempt the + * synchronization in the next cycle. + * + * If called from pg_sync_replication_slots(), set flag + * indicating that the slot is not yet sync ready, so that it + * can be retried. Log a message once every 5 iterations, + * which should be around 10 seconds. 
*/ + if (!AmLogicalSlotSyncWorkerProcess()) + { + if (sync_start_pending) + *sync_start_pending = true; + + if (sync_iterations % 5 == 0) + { + /* Log a message every ten seconds */ + ereport(LOG, + errmsg("waiting for remote slot \"%s\" LSN (%X/%X)" + " and catalog xmin %u) to pass local slot LSN" + " (%X/%X) and catalog xmin (%u)", + remote_slot->name, + LSN_FORMAT_ARGS(remote_slot->restart_lsn), + remote_slot->catalog_xmin, + LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn), + MyReplicationSlot->data.catalog_xmin)); + } + } + return false; } @@ -622,7 +661,8 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) * Returns TRUE if the local slot is updated. */ static bool -synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) +synchronize_one_slot(WalReceiverConn * wrconn, RemoteSlot * remote_slot, + Oid remote_dbid, bool *sync_start_pending, int sync_iterations) { ReplicationSlot *slot; XLogRecPtr latestFlushPtr; @@ -715,8 +755,11 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) /* Slot not ready yet, let's attempt to make it sync-ready now. */ if (slot->data.persistency == RS_TEMPORARY) { - slot_updated = update_and_persist_local_synced_slot(remote_slot, - remote_dbid); + slot_updated = update_and_persist_local_synced_slot(wrconn, + remote_slot, + remote_dbid, + sync_start_pending, + sync_iterations); } /* Slot ready for sync, so sync it. */ @@ -738,7 +781,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) LSN_FORMAT_ARGS(remote_slot->confirmed_lsn))); slot_updated = update_local_synced_slot(remote_slot, remote_dbid, - NULL, NULL); + NULL, NULL, sync_iterations); } } /* Otherwise create the slot first. 
*/ @@ -785,7 +828,9 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) ReplicationSlotsComputeRequiredXmin(true); LWLockRelease(ProcArrayLock); - update_and_persist_local_synced_slot(remote_slot, remote_dbid); + update_and_persist_local_synced_slot(wrconn, remote_slot, remote_dbid, + sync_start_pending, + sync_iterations); slot_updated = true; } @@ -796,15 +841,17 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) } /* - * Synchronize slots. + * Fetch remote slots. * - * Gets the failover logical slots info from the primary server and updates - * the slots locally. Creates the slots if not present on the standby. + * Gets the failover logical slots info from the primary server and creates + * a list of remote slots that need to be synchronized locally. * - * Returns TRUE if any of the slots gets updated in this sync-cycle. + * NOTE: Caller must ensure a transaction is active before calling this function. + * + * Returns a list of RemoteSlot structures, or NIL if no slots need syncing. */ -static bool -synchronize_slots(WalReceiverConn *wrconn) +static List * +fetch_remote_slots(WalReceiverConn *wrconn) { #define SLOTSYNC_COLUMN_COUNT 10 Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID, @@ -813,21 +860,12 @@ synchronize_slots(WalReceiverConn *wrconn) WalRcvExecResult *res; TupleTableSlot *tupslot; List *remote_slot_list = NIL; - bool some_slot_updated = false; - bool started_tx = false; const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn," " restart_lsn, catalog_xmin, two_phase, two_phase_at, failover," " database, invalidation_reason" " FROM pg_catalog.pg_replication_slots" " WHERE failover and NOT temporary"; - /* The syscache access in walrcv_exec() needs a transaction env. 
*/ - if (!IsTransactionState()) - { - StartTransactionCommand(); - started_tx = true; - } - /* Execute the query */ res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow); if (res->status != WALRCV_OK_TUPLES) @@ -835,7 +873,7 @@ synchronize_slots(WalReceiverConn *wrconn) errmsg("could not fetch failover logical slots info from the primary server: %s", res->err)); - /* Construct the remote_slot tuple and synchronize each slot locally */ + /* Construct the remote_slot tuple and build list of slots to sync */ tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot)) { @@ -912,6 +950,180 @@ synchronize_slots(WalReceiverConn *wrconn) ExecClearTuple(tupslot); } + walrcv_clear_result(res); + + return remote_slot_list; +} + +/* + * Update remote slots list with current values. + * + * Takes a list of RemoteSlot structures and queries the primary server to + * get updated values for those specific slots. This is useful for refreshing + * slot information without fetching all failover slots again. + * + * NOTE: Caller must ensure a transaction is active before calling this + * function. + * + * Parameters: wrconn - Connection to the primary server remote_slot_list - + * List of RemoteSlot structures to update + * + * Returns the updated list, or the original list if query fails. Slots that + * no longer exist on the primary will be removed from the list. 
+ */ +static List * +refresh_remote_slots(WalReceiverConn * wrconn, List * remote_slot_list) +{ +#define UPDATE_SLOTSYNC_COLUMN_COUNT 10 + Oid slotRow[UPDATE_SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID, + LSNOID, XIDOID, BOOLOID, LSNOID, BOOLOID, TEXTOID, TEXTOID}; + WalRcvExecResult *res; + TupleTableSlot *tupslot; + List *updated_slot_list = NIL; + StringInfoData query; + ListCell *lc; + bool first_slot = true; + + /* If the input list is empty, return it as-is */ + if (remote_slot_list == NIL) + return remote_slot_list; + + /* Build query with slot names from the input list */ + initStringInfo(&query); + appendStringInfoString(&query, + "SELECT slot_name, plugin, confirmed_flush_lsn," + " restart_lsn, catalog_xmin, two_phase, two_phase_at, failover," + " database, invalidation_reason" + " FROM pg_catalog.pg_replication_slots" + " WHERE failover and NOT temporary AND slot_name IN ("); + + /* Add slot names to the IN clause */ + foreach(lc, remote_slot_list) + { + RemoteSlot *remote_slot = (RemoteSlot *) lfirst(lc); + + if (!first_slot) + appendStringInfoString(&query, ", "); + + appendStringInfo(&query, "'%s'", remote_slot->name); + first_slot = false; + } + appendStringInfoString(&query, ")"); + + /* Execute the query */ + res = walrcv_exec(wrconn, query.data, UPDATE_SLOTSYNC_COLUMN_COUNT, slotRow); + if (res->status != WALRCV_OK_TUPLES) + { + ereport(WARNING, + errmsg("could not fetch updated failover logical slots info" + " from the primary server: %s", + res->err)); + pfree(query.data); + return remote_slot_list; + } + + /* Process the updated slot information */ + tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot)) + { + bool isnull; + RemoteSlot *remote_slot = palloc0(sizeof(RemoteSlot)); + Datum d; + int col = 0; + + remote_slot->name = TextDatumGetCString(slot_getattr(tupslot, ++col, + &isnull)); + Assert(!isnull); + + remote_slot->plugin = 
TextDatumGetCString(slot_getattr(tupslot, ++col, + &isnull)); + Assert(!isnull); + + /* + * Handle possible null values for LSN and Xmin if slot is + * invalidated on the primary server. + */ + d = slot_getattr(tupslot, ++col, &isnull); + remote_slot->confirmed_lsn = isnull ? InvalidXLogRecPtr : + DatumGetLSN(d); + + d = slot_getattr(tupslot, ++col, &isnull); + remote_slot->restart_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d); + + d = slot_getattr(tupslot, ++col, &isnull); + remote_slot->catalog_xmin = isnull ? InvalidTransactionId : + DatumGetTransactionId(d); + + remote_slot->two_phase = DatumGetBool(slot_getattr(tupslot, ++col, + &isnull)); + Assert(!isnull); + + d = slot_getattr(tupslot, ++col, &isnull); + remote_slot->two_phase_at = isnull ? InvalidXLogRecPtr : DatumGetLSN(d); + + remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col, + &isnull)); + Assert(!isnull); + + remote_slot->database = TextDatumGetCString(slot_getattr(tupslot, + ++col, &isnull)); + Assert(!isnull); + + d = slot_getattr(tupslot, ++col, &isnull); + remote_slot->invalidated = isnull ? RS_INVAL_NONE : + GetSlotInvalidationCause(TextDatumGetCString(d)); + + /* Sanity check */ + Assert(col == UPDATE_SLOTSYNC_COLUMN_COUNT); + + /* + * Apply the same ephemeral slot filtering as in + * fetch_remote_slots. Skip slots that are in RS_EPHEMERAL + * state (invalid LSNs/xmin but not explicitly invalidated). 
+ */ + if ((XLogRecPtrIsInvalid(remote_slot->restart_lsn) || + XLogRecPtrIsInvalid(remote_slot->confirmed_lsn) || + !TransactionIdIsValid(remote_slot->catalog_xmin)) && + remote_slot->invalidated == RS_INVAL_NONE) + pfree(remote_slot); + else + /* Add to updated list */ + updated_slot_list = lappend(updated_slot_list, remote_slot); + + ExecClearTuple(tupslot); + } + + walrcv_clear_result(res); + pfree(query.data); + + /* + * Free the original list structures (but not the slot names, as + * they're reused) + */ + foreach(lc, remote_slot_list) + { + RemoteSlot *old_slot = (RemoteSlot *) lfirst(lc); + pfree(old_slot); + } + list_free(remote_slot_list); + + return updated_slot_list; +} + +/* + * Synchronize slots. + * + * Takes a list of remote slots and synchronizes them locally. Creates the + * slots if not present on the standby and updates existing ones. + * + * Returns TRUE if any of the slots gets updated in this sync-cycle. + */ +static bool +synchronize_slots(WalReceiverConn *wrconn, List *remote_slot_list, + List **pending_sync_start_slots, int sync_iterations) +{ + bool some_slot_updated = false; + /* Drop local slots that no longer need to be synced. 
*/ drop_local_obsolete_slots(remote_slot_list); @@ -919,6 +1131,7 @@ synchronize_slots(WalReceiverConn *wrconn) foreach_ptr(RemoteSlot, remote_slot, remote_slot_list) { Oid remote_dbid = get_database_oid(remote_slot->database, false); + bool sync_start_pending = false; /* * Use shared lock to prevent a conflict with @@ -927,19 +1140,16 @@ synchronize_slots(WalReceiverConn *wrconn) */ LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock); - some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid); + some_slot_updated |= synchronize_one_slot(wrconn, remote_slot, + remote_dbid, &sync_start_pending, sync_iterations); + + /* Only append to list if caller wants it and sync is pending */ + if (pending_sync_start_slots != NULL && sync_start_pending) + *pending_sync_start_slots = lappend(*pending_sync_start_slots, remote_slot); UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock); } - /* We are done, free remote_slot_list elements */ - list_free_deep(remote_slot_list); - - walrcv_clear_result(res); - - if (started_tx) - CommitTransactionCommand(); - return some_slot_updated; } @@ -1131,7 +1341,7 @@ slotsync_reread_config(void) bool conninfo_changed; bool primary_slotname_changed; - Assert(sync_replication_slots); + Assert(!AmLogicalSlotSyncWorkerProcess() || sync_replication_slots); ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); @@ -1252,31 +1462,38 @@ slotsync_worker_onexit(int code, Datum arg) * sync-cycles is reset to the minimum (200ms). */ static void -wait_for_slot_activity(bool some_slot_updated) +wait_for_slot_activity(bool some_slot_updated, bool called_from_api) { - int rc; + int rc; + int wait_time; - if (!some_slot_updated) - { - /* - * No slots were updated, so double the sleep time, but not beyond the - * maximum allowable value. 
- */ - sleep_ms = Min(sleep_ms * 2, MAX_SLOTSYNC_WORKER_NAPTIME_MS); - } - else - { + if (called_from_api) { /* - * Some slots were updated since the last sleep, so reset the sleep - * time. + * When called from pg_sync_replication_slots, use a fixed 2 + * second wait time. */ - sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS; + wait_time = SLOTSYNC_API_NAPTIME_MS; + } else { + if (!some_slot_updated) { + /* + * No slots were updated, so double the sleep time, + * but not beyond the maximum allowable value. + */ + sleep_ms = Min(sleep_ms * 2, MAX_SLOTSYNC_WORKER_NAPTIME_MS); + } else { + /* + * Some slots were updated since the last sleep, so + * reset the sleep time. + */ + sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS; + } + wait_time = sleep_ms; } rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, - sleep_ms, - WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN); + wait_time, + WAIT_EVENT_REPLICATION_SLOTSYNC_PRIMARY_CATCHUP); if (rc & WL_LATCH_SET) ResetLatch(MyLatch); @@ -1505,12 +1722,28 @@ ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len) for (;;) { bool some_slot_updated = false; + List *remote_slots; + bool started_tx = false; ProcessSlotSyncInterrupts(wrconn); - some_slot_updated = synchronize_slots(wrconn); + /* + * The syscache access in fetch_remote_slots() needs a + * transaction env. + */ + if (!IsTransactionState()) { + StartTransactionCommand(); + started_tx = true; + } + + remote_slots = fetch_remote_slots(wrconn); + some_slot_updated = synchronize_slots(wrconn, remote_slots, NULL, 0); + list_free_deep(remote_slots); + + if (started_tx) + CommitTransactionCommand(); - wait_for_slot_activity(some_slot_updated); + wait_for_slot_activity(some_slot_updated, false); } /* @@ -1736,19 +1969,85 @@ slotsync_failure_callback(int code, Datum arg) } /* - * Synchronize the failover enabled replication slots using the specified - * primary server connection. 
+ * Synchronize failover enabled replication slots using the specified primary + * server connection. + * + * Repeatedly fetches and updates replication slot information from the + * primary until all slots are at least "sync ready". Retry is done after 2 + * sec wait. Exits early if promotion is triggered. */ void -SyncReplicationSlots(WalReceiverConn *wrconn) +SyncReplicationSlots(WalReceiverConn * wrconn) { PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn)); { + List *remote_slots; + bool started_tx = false; + int sync_iterations = 0; + check_and_set_sync_info(InvalidPid); validate_remote_info(wrconn); - synchronize_slots(wrconn); + /* + * The syscache access in fetch_remote_slots() needs a + * transaction env. + */ + if (!IsTransactionState()) { + StartTransactionCommand(); + started_tx = true; + } + + remote_slots = fetch_remote_slots(wrconn); + + /* Retry until all slots are at least sync ready */ + for (;;) + { + bool some_slot_updated = false; + List *pending_sync_start_slots = NIL; + + sync_iterations++; + + /* Refresh remote slot data */ + remote_slots = refresh_remote_slots(wrconn, remote_slots); + + /* Attempt to synchronize slots */ + some_slot_updated = synchronize_slots(wrconn, remote_slots, + &pending_sync_start_slots, sync_iterations); + + /* Done if all slots are at least sync ready */ + if (pending_sync_start_slots == NIL) + break; + else + { + list_free(pending_sync_start_slots); + pending_sync_start_slots = NIL; + + /* wait for 2 seconds before retrying */ + wait_for_slot_activity(some_slot_updated, true); + + /* + * If we've been promoted, then no point + * continuing. 
+ */ + if (SlotSyncCtx->stopSignaled) + { + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("exiting from slot synchronization as" + " promotion is triggered"))); + break; + } + + /* Handle any termination request if any */ + ProcessSlotSyncInterrupts(wrconn); + } + } + + list_free_deep(remote_slots); + + if (started_tx) + CommitTransactionCommand(); /* Cleanup the synced temporary slots */ ReplicationSlotCleanup(true); diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 0be307d..3497f0f 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -62,8 +62,8 @@ LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process." LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process." LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process." RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery." -REPLICATION_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker." REPLICATION_SLOTSYNC_SHUTDOWN "Waiting for slot sync worker to shut down." +REPLICATION_SLOTSYNC_PRIMARY_CATCHUP "Waiting for the primary to catch-up." SYSLOGGER_MAIN "Waiting in main loop of syslogger process." WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process." WAL_SENDER_MAIN "Waiting in main loop of WAL sender process." -- 1.8.3.1