From 665f7b623f46247659cf42c8239a7109ae2db819 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Tue, 9 Sep 2025 17:10:22 +1000 Subject: [PATCH v1] Reset synced slots when a standby is promoted. On promotion, reset any slots which have the 'synced' flag set so that the primary starts from a clean state. This ensures consistent behavior across all switchovers. --- src/backend/access/transam/xlog.c | 20 +++++-- src/backend/replication/slot.c | 52 +++++++++++++++++++ src/include/replication/slot.h | 1 + .../t/040_standby_failover_slots_sync.pl | 6 +-- 4 files changed, 71 insertions(+), 8 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 7ffb2179151..958e2a4271f 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -5622,9 +5622,14 @@ StartupXLOG(void) /* * Initialize replication slots, before there's a chance to remove - * required resources. + * required resources. Clear any leftover 'synced' flags on replication + * slots when in crash recovery on the primary. The DB_IN_CRASH_RECOVERY + * state check ensures that this code is only reached when a standby + * server crashes during promotion. */ StartupReplicationSlots(); + if (ControlFile->state == DB_IN_CRASH_RECOVERY) + ResetSyncedSlots(); /* * Startup logical state, needs to be setup now so we have proper data @@ -6224,13 +6229,18 @@ StartupXLOG(void) WalSndWakeup(true, true); /* - * If this was a promotion, request an (online) checkpoint now. This isn't - * required for consistency, but the last restartpoint might be far back, - * and in case of a crash, recovering from it might take a longer than is - * appropriate now that we're not in standby mode anymore. + * If this was a promotion, first reset any slots that had been marked as + * synced during standby mode. Then request an (online) checkpoint. 
+	 * The checkpoint isn't required for consistency, but the last
+	 * restartpoint might be far back, and in case of a crash, recovery
+	 * could take longer than desirable now that we're not in standby
+	 * mode anymore.
 	 */
 	if (promoted)
+	{
+		ResetSyncedSlots();
 		RequestCheckpoint(CHECKPOINT_FORCE);
+	}
 }
 
 /*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index fd0fdb96d42..01a6e0de133 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -852,6 +852,57 @@ restart:
 	LWLockRelease(ReplicationSlotControlLock);
 }
 
+/*
+ * ResetSyncedSlots() -- clear the 'synced' flag of all logical slots.
+ * Called at the end of promotion (and when recovering from a crash during
+ * promotion) so that the new primary starts from a clean state.
+ */
+void
+ResetSyncedSlots(void)
+{
+	int			i;
+
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+		bool		synced;
+
+		/* Skip unused slots */
+		if (!s->in_use)
+			continue;
+
+		/* we're only interested in logical slots */
+		if (!SlotIsLogical(s))
+			continue;
+
+		/* Unlocked pre-check; rechecked under the spinlock after acquiring */
+		if (!s->data.synced)
+			continue;
+
+		/* Do not error out on invalidated slots; reset their flag too */
+		ReplicationSlotAcquire(NameStr(s->data.name), false, false);
+
+		/* Recheck and reset the synced flag under spinlock protection */
+		SpinLockAcquire(&s->mutex);
+		synced = s->data.synced;
+		s->data.synced = false;
+		SpinLockRelease(&s->mutex);
+
+		if (synced)
+		{
+			/* Mark dirty and save outside the spinlock */
+			ReplicationSlotMarkDirty();
+			ReplicationSlotSave();
+
+			ereport(LOG,
+					(errmsg("reset synced flag for replication slot \"%s\"",
+							NameStr(s->data.name))));
+		}
+
+		ReplicationSlotRelease();
+	}
+}
+
 /*
  * Permanently drop replication slot identified by the passed in name.
*/ @@ -2212,6 +2263,7 @@ StartupReplicationSlots(void) /* we crashed while a slot was being setup or deleted, clean up */ if (pg_str_endswith(replication_de->d_name, ".tmp")) { + elog(LOG, "there was a leftover tmp file for slots"); if (!rmtree(path, true)) { ereport(WARNING, diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index fe62162cde3..7902d51781d 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -336,6 +336,7 @@ extern int ReplicationSlotIndex(ReplicationSlot *slot); extern bool ReplicationSlotName(int index, Name name); extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot); extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok); +extern void ResetSyncedSlots(void); extern void StartupReplicationSlots(void); extern void CheckPointReplicationSlots(bool is_shutdown); diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl index 2c61c51e914..0f225aa09c1 100644 --- a/src/test/recovery/t/040_standby_failover_slots_sync.pl +++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl @@ -932,13 +932,13 @@ my $standby1_conninfo = $standby1->connstr . ' dbname=postgres'; $subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 CONNECTION '$standby1_conninfo';"); -# Confirm the synced slot 'lsub1_slot' is retained on the new primary +# Confirm the synced slot 'lsub1_slot' is reset on the new primary is( $standby1->safe_psql( 'postgres', q{SELECT count(*) = 2 FROM pg_replication_slots WHERE slot_name IN ('lsub1_slot', 'snap_test_slot') AND synced AND NOT temporary;} ), - 't', - 'synced slot retained on the new primary'); + 'f', + 'synced slot reset on the new primary'); # Commit the prepared transaction $standby1->safe_psql('postgres', "COMMIT PREPARED 'test_twophase_slotsync';"); -- 2.47.3