From 587919a5b0ca1203f5d0a42b3bb69ac8fc189430 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Mon, 13 Nov 2023 21:44:47 +0800 Subject: [PATCH v33 3/3] Allow slot-sync workers to wait for the cascading standbys. The GUC standby_slot_names is needed to be set on first standby in order to allow it to wait for confirmation for cascading standbys before updating logical 'synced' slots in slot-sync workers. The intent is that the logical slots (synced ones) should not go ahead of cascading standbys. For the user created slots on first standby, we already have this wait logic in place in logical walsender and in pg_logical_slot_get_changes_guts(), but for synced slots (which can not be consumed yet), we need to make sure that they are not going ahead of cascading standbys and that is acheived by introducing the wait in slot-sync worker before we actually update the slots. --- src/backend/replication/logical/slotsync.c | 86 ++++++++++++++++++++-- src/backend/replication/walsender.c | 11 +-- src/include/replication/walsender.h | 5 ++ 3 files changed, 89 insertions(+), 13 deletions(-) diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 1cee99a761..d76710727b 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -82,6 +82,9 @@ typedef struct RemoteSlot */ #define WORKER_PRIMARY_CATCHUP_WAIT_ATTEMPTS 5 +static void ProcessSlotSyncInterrupts(WalReceiverConn **wrconn, + List **standby_slots); + /* * Wait for remote slot to pass locally reserved position. * @@ -544,6 +547,52 @@ drop_obsolete_slots(Oid *dbids, List *remote_slot_list) } } +/* + * Wait for cascading physical standbys corresponding to physical slots + * specified in standby_slot_names GUC to confirm receiving given lsn. + */ +static void +wait_for_standby_confirmation(XLogRecPtr wait_for_lsn, + WalReceiverConn *wrconn) +{ + List *standby_slots; + + /* Nothing to be done */ + if (strcmp(standby_slot_names, "") == 0) + return; + + standby_slots = GetStandbySlotList(true); + + for (;;) + { + int rc; + + WalSndFilterStandbySlots(wait_for_lsn, &standby_slots); + + /* Exit if done waiting for every slot. */ + if (standby_slots == NIL) + break; + + /* + * This will reload configuration and will refresh the standby_slots + * as well provided standby_slot_names GUC is changed by the user. + */ + ProcessSlotSyncInterrupts(&wrconn, &standby_slots); + + /* + * XXX: Is waiting for 5 second before retrying enough or more or + * less? + */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 5000L, + WAIT_EVENT_WAL_SENDER_WAIT_FOR_STANDBY_CONFIRMATION); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + } +} + /* * Construct Slot Query * @@ -767,6 +816,7 @@ synchronize_slots(dsa_area *dsa, WalReceiverConn *wrconn) MemoryContext oldctx = CurrentMemoryContext; long naptime = WORKER_DEFAULT_NAPTIME_MS; Oid *dbids; + XLogRecPtr max_confirmed_lsn = 0; ListCell *cell; bool slot_updated = false; TimestampTz now; @@ -841,6 +891,9 @@ synchronize_slots(dsa_area *dsa, WalReceiverConn *wrconn) remote_slot->plugin = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); Assert(!isnull); + if (remote_slot->confirmed_lsn > max_confirmed_lsn) + max_confirmed_lsn = remote_slot->confirmed_lsn; + /* * It is possible to get null values for lsns and xmin if slot is * invalidated on the primary server, so handle accordingly. @@ -887,6 +940,17 @@ synchronize_slots(dsa_area *dsa, WalReceiverConn *wrconn) */ drop_obsolete_slots(dbids, remote_slot_list); + /* + * If there are cascading standbys, wait for their confirmation before we + * update synced logical slots locally. + * + * Instead of waiting on confirmation for lsn of each slot, let us wait + * once for confirmation on max_confirmed_lsn. If that is confirmed by + * each cascading standby, we are good to update all the slots. + */ + if (remote_slot_list) + wait_for_standby_confirmation(max_confirmed_lsn, wrconn); + /* Now sync the slots locally */ foreach(cell, remote_slot_list) { @@ -945,12 +1009,18 @@ remote_connect() * If primary_conninfo has changed, reconnect to primary. */ static void -slotsync_reread_config(WalReceiverConn **wrconn) +slotsync_reread_config(WalReceiverConn **wrconn, List **standby_slots) { char *conninfo = pstrdup(PrimaryConnInfo); - ConfigReloadPending = false; - ProcessConfigFile(PGC_SIGHUP); + /* + * Reload configs and recreate the standby_slot_names_list if GUC + * standby_slot_names changed. + */ + if (standby_slots) + WalSndRereadConfigAndSlots(standby_slots); + else + ProcessConfigFile(PGC_SIGHUP); /* Reconnect if GUC primary_conninfo got changed */ if (strcmp(conninfo, PrimaryConnInfo) != 0) @@ -968,7 +1038,8 @@ slotsync_reread_config(WalReceiverConn **wrconn) * Interrupt handler for main loop of slot-sync worker. */ static void -ProcessSlotSyncInterrupts(WalReceiverConn **wrconn) +ProcessSlotSyncInterrupts(WalReceiverConn **wrconn, + List **standby_slots) { CHECK_FOR_INTERRUPTS(); @@ -984,7 +1055,10 @@ ProcessSlotSyncInterrupts(WalReceiverConn **wrconn) if (ConfigReloadPending) - slotsync_reread_config(wrconn); + { + ConfigReloadPending = false; + slotsync_reread_config(wrconn, standby_slots); + } } /* @@ -1054,7 +1128,7 @@ ReplSlotSyncWorkerMain(Datum main_arg) int rc; long naptime; - ProcessSlotSyncInterrupts(&wrconn); + ProcessSlotSyncInterrupts(&wrconn, NULL /* standby_slots */ ); /* Check if got promoted */ if (!RecoveryInProgress()) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index a5f5fccde9..cdf0afae13 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1678,7 +1678,7 @@ WalSndWakeupNeeded() * Reload the config file and reinitialize the standby slot list if the GUC * standby_slot_names has changed. */ -static void +void WalSndRereadConfigAndSlots(List **standby_slots) { char *pre_standby_slot_names = pstrdup(standby_slot_names); @@ -1701,13 +1701,12 @@ WalSndRereadConfigAndSlots(List **standby_slots) * This function updates the passed standby_slots list, removing any slots that * have already caught up to or surpassed the given wait_for_lsn. */ -static void +void WalSndFilterStandbySlots(XLogRecPtr wait_for_lsn, List **standby_slots) { ListCell *lc; - List *standby_slots_cpy = *standby_slots; - foreach(lc, standby_slots_cpy) + foreach(lc, *standby_slots) { char *name = lfirst(lc); XLogRecPtr restart_lsn = InvalidXLogRecPtr; @@ -1774,10 +1773,8 @@ WalSndFilterStandbySlots(XLogRecPtr wait_for_lsn, List **standby_slots) if (warningfmt) ereport(WARNING, errmsg(warningfmt, name, "standby_slot_names")); - standby_slots_cpy = foreach_delete_current(standby_slots_cpy, lc); + *standby_slots = foreach_delete_current(*standby_slots, lc); } - - *standby_slots = standby_slots_cpy; } /* diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index ecbd3526c5..25c522993f 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -15,6 +15,7 @@ #include #include "access/xlogdefs.h" +#include "nodes/pg_list.h" /* * What to do with a snapshot in create replication slot command. @@ -50,7 +51,11 @@ extern void WalSndWaitStopping(void); extern void HandleWalSndInitStopping(void); extern void WalSndRqstFileReload(void); extern void PhysicalConfirmReceivedLocation(XLogRecPtr lsn); +extern void WalSndFilterStandbySlots(XLogRecPtr wait_for_lsn, + List **standby_slots); extern void WalSndWaitForStandbyConfirmation(XLogRecPtr wait_for_lsn); +extern List *WalSndGetStandbySlots(void); +extern void WalSndRereadConfigAndSlots(List **standby_slots); /* * Remember that we want to wakeup walsenders later -- 2.30.0.windows.2