From 9c7f74df663c1d95e877dfd3e143e9619a724a9f Mon Sep 17 00:00:00 2001 From: Shveta Malik Date: Thu, 23 Nov 2023 12:49:05 +0530 Subject: [PATCH v39 3/3] Allow slot-sync worker to wait for the cascading standbys. The GUC standby_slot_names needs to be set on the first standby in order to allow it to wait for confirmation from cascading standbys before updating logical 'synced' slots in slot-sync worker. The intent is that the logical slots (synced ones) should not go ahead of cascading standbys. For the user created slots on first standby, we already have this wait logic in place in logical walsender and in pg_logical_slot_get_changes_guts(), but for synced slots (which cannot be consumed yet), we need to make sure that they are not going ahead of cascading standbys and that is achieved by introducing the wait in slot-sync worker before we actually update the slots. --- src/backend/replication/logical/slotsync.c | 86 ++++++++++++++++++++-- src/backend/replication/walsender.c | 11 +-- src/include/replication/walsender.h | 5 ++ 3 files changed, 89 insertions(+), 13 deletions(-) diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 9aa3562500..1aac8ce648 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -83,6 +83,9 @@ typedef struct RemoteSlot */ #define WORKER_PRIMARY_CATCHUP_WAIT_ATTEMPTS 5 +static void ProcessSlotSyncInterrupts(WalReceiverConn **wrconn, + List **standby_slots); + /* * Wait for remote slot to pass locally reserved position. * @@ -452,6 +455,52 @@ construct_slot_query(StringInfo s) " WHERE failover and sync_state != 'i'"); } +/* + * Wait for cascading physical standbys corresponding to physical slots + * specified in standby_slot_names GUC to confirm receiving given lsn. 
+ */ +static void +wait_for_standby_confirmation(XLogRecPtr wait_for_lsn, + WalReceiverConn *wrconn) +{ + List *standby_slots; + + /* Nothing to be done */ + if (strcmp(standby_slot_names, "") == 0) + return; + + standby_slots = GetStandbySlotList(true); + + for (;;) + { + int rc; + + WalSndFilterStandbySlots(wait_for_lsn, &standby_slots); + + /* Exit if done waiting for every slot. */ + if (standby_slots == NIL) + break; + + /* + * This will reload configuration and will refresh the standby_slots + * as well provided standby_slot_names GUC is changed by the user. + */ + ProcessSlotSyncInterrupts(&wrconn, &standby_slots); + + /* + * XXX: Is waiting for 5 second before retrying enough or more or + * less? + */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 5000L, + WAIT_EVENT_WAL_SENDER_WAIT_FOR_STANDBY_CONFIRMATION); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + } +} + /* * Synchronize single slot to given position. * @@ -723,6 +772,7 @@ synchronize_slots(WalReceiverConn *wrconn) List *remote_slot_list = NIL; MemoryContext oldctx = CurrentMemoryContext; long naptime = WORKER_DEFAULT_NAPTIME_MS; + XLogRecPtr max_confirmed_lsn = 0; ListCell *cell; bool slot_updated = false; TimestampTz now; @@ -775,6 +825,9 @@ synchronize_slots(WalReceiverConn *wrconn) remote_slot->plugin = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); Assert(!isnull); + if (remote_slot->confirmed_lsn > max_confirmed_lsn) + max_confirmed_lsn = remote_slot->confirmed_lsn; + /* * It is possible to get null values for LSN and Xmin if slot is * invalidated on the primary server, so handle accordingly. @@ -818,6 +871,17 @@ synchronize_slots(WalReceiverConn *wrconn) */ drop_obsolete_slots(remote_slot_list); + /* + * If there are cascading standbys, wait for their confirmation before we + * update synced logical slots locally. + * + * Instead of waiting on confirmation for lsn of each slot, let us wait + * once for confirmation on max_confirmed_lsn. 
If that is confirmed by + * each cascading standby, we are good to update all the slots. + */ + if (remote_slot_list) + wait_for_standby_confirmation(max_confirmed_lsn, wrconn); + /* Now sync the slots locally */ foreach(cell, remote_slot_list) { @@ -877,12 +941,18 @@ remote_connect(void) * If primary_conninfo has changed, reconnect to primary. */ static void -slotsync_reread_config(WalReceiverConn **wrconn) +slotsync_reread_config(WalReceiverConn **wrconn, List **standby_slots) { char *conninfo = pstrdup(PrimaryConnInfo); - ConfigReloadPending = false; - ProcessConfigFile(PGC_SIGHUP); + /* + * Reload configs and recreate the standby_slot_names_list if GUC + * standby_slot_names changed. + */ + if (standby_slots) + WalSndRereadConfigAndReInitSlotList(standby_slots); + else + ProcessConfigFile(PGC_SIGHUP); /* Exit if GUC primary_conninfo got changed, let the launcher relaunch it */ if (strcmp(conninfo, PrimaryConnInfo) != 0) @@ -905,7 +975,8 @@ slotsync_reread_config(WalReceiverConn **wrconn) * Interrupt handler for main loop of slot sync worker. 
*/ static void -ProcessSlotSyncInterrupts(WalReceiverConn **wrconn) +ProcessSlotSyncInterrupts(WalReceiverConn **wrconn, + List **standby_slots) { CHECK_FOR_INTERRUPTS(); @@ -921,7 +992,10 @@ ProcessSlotSyncInterrupts(WalReceiverConn **wrconn) if (ConfigReloadPending) - slotsync_reread_config(wrconn); + { + ConfigReloadPending = false; + slotsync_reread_config(wrconn, standby_slots); + } } /* @@ -982,7 +1056,7 @@ ReplSlotSyncWorkerMain(Datum main_arg) int rc; long naptime; - ProcessSlotSyncInterrupts(&wrconn); + ProcessSlotSyncInterrupts(&wrconn, NULL /* standby_slots */ ); /* Check if got promoted */ if (!RecoveryInProgress()) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index edb3656cd2..db5af0b598 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1608,7 +1608,7 @@ PhysicalWakeupLogicalWalSnd(void) * Reload the config file and reinitialize the standby slot list if the GUC * standby_slot_names has changed. */ -static void +void WalSndRereadConfigAndReInitSlotList(List **standby_slots) { char *pre_standby_slot_names = pstrdup(standby_slot_names); @@ -1631,13 +1631,12 @@ WalSndRereadConfigAndReInitSlotList(List **standby_slots) * This function updates the passed standby_slots list, removing any slots that * have already caught up to or surpassed the given wait_for_lsn. 
*/ -static void +void WalSndFilterStandbySlots(XLogRecPtr wait_for_lsn, List **standby_slots) { ListCell *lc; - List *standby_slots_cpy = *standby_slots; - foreach(lc, standby_slots_cpy) + foreach(lc, *standby_slots) { char *name = lfirst(lc); XLogRecPtr restart_lsn = InvalidXLogRecPtr; @@ -1704,10 +1703,8 @@ WalSndFilterStandbySlots(XLogRecPtr wait_for_lsn, List **standby_slots) if (warningfmt) ereport(WARNING, errmsg(warningfmt, name, "standby_slot_names")); - standby_slots_cpy = foreach_delete_current(standby_slots_cpy, lc); + *standby_slots = foreach_delete_current(*standby_slots, lc); } - - *standby_slots = standby_slots_cpy; } /* diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 1fcc22a127..c5c4714788 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -15,6 +15,7 @@ #include #include "access/xlogdefs.h" +#include "nodes/pg_list.h" /* * What to do with a snapshot in create replication slot command. @@ -50,7 +51,11 @@ extern void WalSndWaitStopping(void); extern void HandleWalSndInitStopping(void); extern void WalSndRqstFileReload(void); extern void PhysicalWakeupLogicalWalSnd(void); +extern void WalSndFilterStandbySlots(XLogRecPtr wait_for_lsn, + List **standby_slots); extern void WalSndWaitForStandbyConfirmation(XLogRecPtr wait_for_lsn); +extern List *WalSndGetStandbySlots(void); +extern void WalSndRereadConfigAndReInitSlotList(List **standby_slots); /* * Remember that we want to wakeup walsenders later -- 2.30.0.windows.2