From 75b5623b725a5901ea0df459c6787786fbc79b51 Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Mon, 9 Feb 2026 13:04:04 +0800 Subject: [PATCH v5 2/4] Refactoring: move similar checks to a central place The slot synchronization is skipped if the required WAL has not been received and flushed. Previously, this check was performed in two separate code paths. Such duplication can lead to coding errors if changes are made in one location without updating the other, as exemplified by the issue fixed in commit 3df4df5. This commit consolidates the check into a single location to eliminate redundancies and reduce the potential for future errors. --- src/backend/replication/logical/slotsync.c | 98 ++++++++-------------- 1 file changed, 33 insertions(+), 65 deletions(-) diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 1ff81b58a6e..92166256e36 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -202,9 +202,33 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) bool updated_xmin_or_lsn = false; bool updated_config = false; SlotSyncSkipReason skip_reason = SS_SKIP_NONE; + XLogRecPtr latestFlushPtr = GetStandbyFlushRecPtr(NULL); Assert(slot->data.invalidated == RS_INVAL_NONE); + /* + * Make sure that concerned WAL is received and flushed before syncing + * slot to target lsn received from the primary server. + */ + if (remote_slot->confirmed_lsn > latestFlushPtr) + { + update_slotsync_skip_stats(SS_SKIP_WAL_NOT_FLUSHED); + + /* + * Can get here only if GUC 'synchronized_standby_slots' on the + * primary server was not configured correctly. + */ + ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("skipping slot synchronization because the received slot sync" + " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X", + LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), + remote_slot->name, + LSN_FORMAT_ARGS(latestFlushPtr))); + + return false; + } + /* * Don't overwrite if we already have a newer catalog_xmin and * restart_lsn. @@ -621,15 +645,16 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, if (slot->slotsync_skip_reason != SS_SKIP_NONE) { /* - * We reach here when the remote slot didn't catch up to locally - * reserved position, or it cannot reach the consistent point from the - * restart_lsn. + * We reach this point when: 1) the remote slot didn't catch up to + * locally, 2) it cannot reach the consistent point from the + * restart_lsn, or 3) the WAL prior to the remote confirmed flush LSN + * has not been received and flushed. * - * We do not drop the slot because the restart_lsn can be ahead of the - * current location when recreating the slot in the next cycle. It may - * take more time to create such a slot or reach the consistent point. - * Therefore, we keep this slot and attempt the synchronization in the - * next cycle. + * We do not drop the slot because the restart_lsn and confirmed_lsn + * can be ahead of the current location when recreating the slot in + * the next cycle. It may take more time to create such a slot or + * reach the consistent point. Therefore, we keep this slot and + * attempt the synchronization in the next cycle. * * We also update the slot_persistence_pending parameter, so the SQL * function can retry. @@ -670,7 +695,6 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid, bool *slot_persistence_pending) { ReplicationSlot *slot; - XLogRecPtr latestFlushPtr = GetStandbyFlushRecPtr(NULL); bool slot_updated = false; /* Search for the named slot */ @@ -737,34 +761,6 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid, return slot_updated; } - /* - * Make sure that concerned WAL is received and flushed before syncing - * slot to target lsn received from the primary server. - * - * Report statistics only after the slot has been acquired, ensuring - * it cannot be dropped during the reporting process. - */ - if (remote_slot->confirmed_lsn > latestFlushPtr) - { - update_slotsync_skip_stats(SS_SKIP_WAL_NOT_FLUSHED); - - /* - * Can get here only if GUC 'synchronized_standby_slots' on the - * primary server was not configured correctly. - */ - ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("skipping slot synchronization because the received slot sync" - " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X", - LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), - remote_slot->name, - LSN_FORMAT_ARGS(latestFlushPtr))); - - ReplicationSlotRelease(); - - return slot_updated; - } - /* Slot not ready yet, let's attempt to make it sync-ready now. */ if (slot->data.persistency == RS_TEMPORARY) { @@ -840,34 +836,6 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid, LWLockRelease(ProcArrayLock); LWLockRelease(ReplicationSlotControlLock); - /* - * Make sure that concerned WAL is received and flushed before syncing - * slot to target lsn received from the primary server. - * - * Report statistics only after the slot has been acquired, ensuring - * it cannot be dropped during the reporting process. - */ - if (remote_slot->confirmed_lsn > latestFlushPtr) - { - update_slotsync_skip_stats(SS_SKIP_WAL_NOT_FLUSHED); - - /* - * Can get here only if GUC 'synchronized_standby_slots' on the - * primary server was not configured correctly. - */ - ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("skipping slot synchronization because the received slot sync" - " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X", - LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), - remote_slot->name, - LSN_FORMAT_ARGS(latestFlushPtr))); - - ReplicationSlotRelease(); - - return false; - } - update_and_persist_local_synced_slot(remote_slot, remote_dbid, slot_persistence_pending); -- 2.51.1.windows.1