From cee77da433a1f365c7bfcc54bb4e4881511d87c5 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Thu, 2 Nov 2023 15:51:52 +0800 Subject: [PATCH v29 3/3] Allow slot-sync workers to wait for the cascading standbys. The GUC standby_slot_names is needed to be set on first standby in order to allow it to wait for confirmation for cascading standbys before updating logical 'synced' slots in slot-sync workers. The intent is that the logical slots (synced ones) should not go ahead of cascading standbys. For the user created slots on first standby, we already have this wait logic in place in logical walsender and in pg_logical_slot_get_changes_guts(), but for synced slots (which can not be consumed yet), we need to make sure that they are not going ahead of cascading standbys and that is acheived by introducing the wait in slot-sync worker before we actually update the slots. --- src/backend/replication/logical/slotsync.c | 99 ++++++++++++++++++++-- src/backend/replication/walsender.c | 9 +- src/include/replication/walsender.h | 3 + 3 files changed, 99 insertions(+), 12 deletions(-) diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index ced162012d..2188ccbd75 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -77,6 +77,9 @@ typedef struct RemoteSlot /* The variable to store primary_conninfo GUC before each ConfigReload */ static char *PrimaryConnInfoPreReload = NULL; +static char *StandbySlotNamesPreReload = NULL; + +static bool ProcessSlotSyncInterrupts(WalReceiverConn *wrconn); /* * The variable to indicate the number of attempts for @@ -492,6 +495,54 @@ drop_obsolete_slots(Oid *dbids, List *remote_slot_list) } } +/* + * Wait for cascading physical standbys corresponding to physical slots + * specified in standby_slot_names GUC to confirm receiving given lsn. + */ +static void +slot_sync_wait_for_standby_confirmation(XLogRecPtr wait_for_lsn, + WalReceiverConn *wrconn) +{ + List *standby_slot_cpy; + + /* Nothing to be done */ + if (strcmp(standby_slot_names, "") == 0) + return; + + standby_slot_cpy = list_copy(standby_slot_names_list); + + for (;;) + { + bool config_reloaded = false; + + WalSndFilterStandbySlots(wait_for_lsn, &standby_slot_cpy); + + /* Exit if done waiting for every slot. */ + if (standby_slot_cpy == NIL) + break; + + /* Process Interrupts if any */ + config_reloaded = ProcessSlotSyncInterrupts(wrconn); + + /* + * Refresh the standby_slot_cpy if standby_slot_names_list got changed + * after ConfigReload + */ + if (config_reloaded && + strcmp(StandbySlotNamesPreReload, standby_slot_names) != 0) + standby_slot_cpy = list_copy(standby_slot_names_list); + + /* + * XXX: Is waiting for 5 second before retrying enough or more or + * less? + */ + (void) WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 5000L, + WAIT_EVENT_WAL_SENDER_WAIT_FOR_STANDBY_CONFIRMATION); + } +} + /* * Construct Slot Query * @@ -698,6 +749,7 @@ synchronize_slots(dsa_area *dsa, WalReceiverConn *wrconn) long naptime = WORKER_DEFAULT_NAPTIME_MS; Oid *dbids; int count = 0; + XLogRecPtr max_confirmed_lsn = 0; ListCell *cell; /* The primary_slot_name is not set yet or WALs not received yet */ @@ -807,6 +859,9 @@ synchronize_slots(dsa_area *dsa, WalReceiverConn *wrconn) /* Create list of remote slots */ remote_slot_list = lappend(remote_slot_list, remote_slot); + if (remote_slot->confirmed_lsn > max_confirmed_lsn) + max_confirmed_lsn = remote_slot->confirmed_lsn; + /* * Update naptime as required depending on slot activity. Check only * for the first slot, if one slot has activity then all slots will. @@ -817,6 +872,17 @@ synchronize_slots(dsa_area *dsa, WalReceiverConn *wrconn) ExecClearTuple(slot); } + /* + * If there are cascading standbys, wait for their confirmation before we + * update synced logical slots locally. + * + * Instead of waiting on confirmation for lsn of each slot, let us wait + * once for confirmation on max_confirmed_lsn. If that is confirmed by + * each cascading standby, we are good to update all the slots. + */ + if (list_length(remote_slot_list)) + slot_sync_wait_for_standby_confirmation(max_confirmed_lsn, wrconn); + /* Now sync the slots locally */ foreach(cell, remote_slot_list) { @@ -901,6 +967,25 @@ reconnect_if_needed(WalReceiverConn *wrconn_prev) return wrconn; } +/* + * Save the current configurations related to slot-sync + * + * This function is invoked prior to each config-reload on receiving SIGHUP. + */ +static void +save_current_configs() +{ + /* Free the previous allocations. */ + if (PrimaryConnInfoPreReload) + pfree(PrimaryConnInfoPreReload); + + if (StandbySlotNamesPreReload) + pfree(StandbySlotNamesPreReload); + + PrimaryConnInfoPreReload = pstrdup(PrimaryConnInfo); + StandbySlotNamesPreReload = pstrdup(standby_slot_names); +} + /* * Interrupt handler for main loop of slot-sync worker. */ @@ -925,14 +1010,14 @@ ProcessSlotSyncInterrupts(WalReceiverConn *wrconn) { ConfigReloadPending = false; - /* Free the previous allocation. */ - if (PrimaryConnInfoPreReload) - pfree(PrimaryConnInfoPreReload); - - /* Save the GUC primary_conninfo before reloading. */ - PrimaryConnInfoPreReload = pstrdup(PrimaryConnInfo); + save_current_configs(); ProcessConfigFile(PGC_SIGHUP); + + /* If standby_slot_names changed, recreate the standby_slot_names_list */ + if (strcmp(StandbySlotNamesPreReload, standby_slot_names) != 0) + SlotSyncInitConfig(); + reload_done = true; } @@ -1000,6 +1085,8 @@ ReplSlotSyncWorkerMain(Datum main_arg) /* Connect to the primary server */ wrconn = remote_connect(); + SlotSyncInitConfig(); + /* Main wait loop. */ for (;;) { diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 56fd974633..71c670ecb8 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1753,13 +1753,12 @@ WalSndGetStandbySlots(List **standby_slots, bool force) * This function updates the passed standby_slots list, removing any slots that * have already caught up to or surpassed the given wait_for_lsn. */ -static void +void WalSndFilterStandbySlots(XLogRecPtr wait_for_lsn, List **standby_slots) { ListCell *lc; - List *standby_slots_cpy = *standby_slots; - foreach(lc, standby_slots_cpy) + foreach(lc, *standby_slots) { char *name = lfirst(lc); XLogRecPtr restart_lsn = InvalidXLogRecPtr; @@ -1826,10 +1825,8 @@ WalSndFilterStandbySlots(XLogRecPtr wait_for_lsn, List **standby_slots) if (warningfmt) ereport(WARNING, errmsg(warningfmt, name, "standby_slot_names")); - standby_slots_cpy = foreach_delete_current(standby_slots_cpy, lc); + *standby_slots = foreach_delete_current(*standby_slots, lc); } - - *standby_slots = standby_slots_cpy; } /* diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index ecbd3526c5..74a29d3700 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -15,6 +15,7 @@ #include #include "access/xlogdefs.h" +#include "nodes/pg_list.h" /* * What to do with a snapshot in create replication slot command. @@ -50,6 +51,8 @@ extern void WalSndWaitStopping(void); extern void HandleWalSndInitStopping(void); extern void WalSndRqstFileReload(void); extern void PhysicalConfirmReceivedLocation(XLogRecPtr lsn); +extern void WalSndFilterStandbySlots(XLogRecPtr wait_for_lsn, + List **standby_slots); extern void WalSndWaitForStandbyConfirmation(XLogRecPtr wait_for_lsn); /* -- 2.30.0.windows.2