From a48e6e816b518f7e8a71446e695b71efcca09866 Mon Sep 17 00:00:00 2001 From: Shveta Malik Date: Tue, 31 Oct 2023 09:41:30 +0530 Subject: [PATCH v27 3/3] Allow slot-sync workers to wait for the cascading standbys. The GUC standby_slot_names is needed to be set on first standby in order to allow it to wait for confirmation for cascading standbys before updating logical 'synced' slots in slot-sync workers. The intent is that the logical slots (synced ones) should not go ahead of cascading standbys. For the user created slots on first standby, we already have this wait logic in place in logical walsender and in pg_logical_slot_get_changes_guts(), but for synced slots (which can not be consumed yet), we need to make sure that they are not going ahead of cascading standbys and that is acheived by introducing the wait in slot-sync worker before we actually update the slots. --- src/backend/replication/logical/slotsync.c | 99 ++++++++++++++++++++-- src/backend/replication/walsender.c | 9 +- src/include/replication/walsender.h | 3 + 3 files changed, 99 insertions(+), 12 deletions(-) diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index c0a9e8323f..deb4fea1f5 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -77,6 +77,9 @@ typedef struct RemoteSlot /* The variable to store primary_conninfo GUC before each ConfigReload */ static char *PrimaryConnInfoPreReload = NULL; +static char *StandbySlotNamesPreReload = NULL; + +static bool ProcessSlotSyncInterrupts(WalReceiverConn *wrconn); /* * The variable to indicate the number of attempts for @@ -492,6 +495,54 @@ drop_obsolete_slots(Oid *dbids, List *remote_slot_list) } } +/* + * Wait for cascading physical standbys corresponding to physical slots + * specified in standby_slot_names GUC to confirm receiving given lsn. + */ +static void +slot_sync_wait_for_standby_confirmation(XLogRecPtr wait_for_lsn, + WalReceiverConn *wrconn) +{ + List *standby_slot_cpy; + + /* Nothing to be done */ + if (strcmp(standby_slot_names, "") == 0) + return; + + standby_slot_cpy = list_copy(standby_slot_names_list); + + for (;;) + { + bool config_reloaded = false; + + WalSndFilterStandbySlots(wait_for_lsn, &standby_slot_cpy); + + /* Exit if done waiting for every slot. */ + if (standby_slot_cpy == NIL) + break; + + /* Process Interrupts if any */ + config_reloaded = ProcessSlotSyncInterrupts(wrconn); + + /* + * Refresh the standby_slot_cpy if standby_slot_names_list got changed + * after ConfigReload + */ + if (config_reloaded && + strcmp(StandbySlotNamesPreReload, standby_slot_names) != 0) + standby_slot_cpy = list_copy(standby_slot_names_list); + + /* + * XXX: Is waiting for 5 second before retrying enough or more or + * less? + */ + (void) WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 5000L, + WAIT_EVENT_WAL_SENDER_WAIT_FOR_STANDBY_CONFIRMATION); + } +} + /* * Construct Slot Query * @@ -698,6 +749,7 @@ synchronize_slots(dsa_area *dsa, WalReceiverConn *wrconn) long naptime = WORKER_DEFAULT_NAPTIME_MS; Oid *dbids; int count = 0; + XLogRecPtr max_confirmed_lsn = 0; ListCell *cell; /* The primary_slot_name is not set yet or WALs not received yet */ @@ -807,6 +859,9 @@ synchronize_slots(dsa_area *dsa, WalReceiverConn *wrconn) /* Create list of remote slots */ remote_slot_list = lappend(remote_slot_list, remote_slot); + if (remote_slot->confirmed_lsn > max_confirmed_lsn) + max_confirmed_lsn = remote_slot->confirmed_lsn; + /* * Update naptime as required depending on slot activity. Check only * for the first slot, if one slot has activity then all slots will. @@ -817,6 +872,17 @@ synchronize_slots(dsa_area *dsa, WalReceiverConn *wrconn) ExecClearTuple(slot); } + /* + * If there are cascading standbys, wait for their confirmation before we + * update synced logical slots locally. + * + * Instead of waiting on confirmation for lsn of each slot, let us wait + * once for confirmation on max_confirmed_lsn. If that is confirmed by + * each cascading standby, we are good to update all the slots. + */ + if (list_length(remote_slot_list)) + slot_sync_wait_for_standby_confirmation(max_confirmed_lsn, wrconn); + /* Now sync the slots locally */ foreach(cell, remote_slot_list) { @@ -901,6 +967,25 @@ reconnect_if_needed(WalReceiverConn *wrconn_prev) return wrconn; } +/* + * Save the current configurations related to slot-sync + * + * This function is invoked prior to each config-reload on receiving SIGHUP. + */ +static void +save_current_configs() +{ + /* Free the previous allocations. */ + if (PrimaryConnInfoPreReload) + pfree(PrimaryConnInfoPreReload); + + if (StandbySlotNamesPreReload) + pfree(StandbySlotNamesPreReload); + + PrimaryConnInfoPreReload = pstrdup(PrimaryConnInfo); + StandbySlotNamesPreReload = pstrdup(standby_slot_names); +} + /* * Interrupt handler for main loop of slot-sync worker. */ @@ -925,14 +1010,14 @@ ProcessSlotSyncInterrupts(WalReceiverConn *wrconn) { ConfigReloadPending = false; - /* Free the previous allocation. */ - if (PrimaryConnInfoPreReload) - pfree(PrimaryConnInfoPreReload); - - /* Save the GUC primary_conninfo before reloading. */ - PrimaryConnInfoPreReload = pstrdup(PrimaryConnInfo); + save_current_configs(); ProcessConfigFile(PGC_SIGHUP); + + /* If standby_slot_names changed, recreate the standby_slot_names_list */ + if (strcmp(StandbySlotNamesPreReload, standby_slot_names) != 0) + SlotSyncInitConfig(); + reload_done = true; } @@ -1000,6 +1085,8 @@ ReplSlotSyncWorkerMain(Datum main_arg) /* Connect to primary node */ wrconn = remote_connect(); + SlotSyncInitConfig(); + /* Main wait loop. */ for (;;) { diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 5420767f73..7aebfd681d 100755 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1719,13 +1719,12 @@ WalSndGetStandbySlots(List **standby_slots, bool force) * This function updates the passed standby_slots list, removing any slots that * have already caught up to or surpassed the given wait_for_lsn. */ -static void +void WalSndFilterStandbySlots(XLogRecPtr wait_for_lsn, List **standby_slots) { ListCell *lc; - List *standby_slots_cpy = *standby_slots; - foreach(lc, standby_slots_cpy) + foreach(lc, *standby_slots) { char *name = lfirst(lc); XLogRecPtr restart_lsn = InvalidXLogRecPtr; @@ -1792,10 +1791,8 @@ WalSndFilterStandbySlots(XLogRecPtr wait_for_lsn, List **standby_slots) if (warningfmt) ereport(WARNING, errmsg(warningfmt, name, "standby_slot_names")); - standby_slots_cpy = foreach_delete_current(standby_slots_cpy, lc); + *standby_slots = foreach_delete_current(*standby_slots, lc); } - - *standby_slots = standby_slots_cpy; } /* diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index ecbd3526c5..74a29d3700 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -15,6 +15,7 @@ #include #include "access/xlogdefs.h" +#include "nodes/pg_list.h" /* * What to do with a snapshot in create replication slot command. @@ -50,6 +51,8 @@ extern void WalSndWaitStopping(void); extern void HandleWalSndInitStopping(void); extern void WalSndRqstFileReload(void); extern void PhysicalConfirmReceivedLocation(XLogRecPtr lsn); +extern void WalSndFilterStandbySlots(XLogRecPtr wait_for_lsn, + List **standby_slots); extern void WalSndWaitForStandbyConfirmation(XLogRecPtr wait_for_lsn); /* -- 2.34.1