From 214c2325369a5ee585ed4af14e635d008ce93d47 Mon Sep 17 00:00:00 2001 From: Shveta Malik Date: Fri, 19 Jan 2024 11:02:42 +0530 Subject: [PATCH v64_2 4/6] Allow logical walsenders to wait for the physical standbys This patch introduces a mechanism to ensure that physical standby servers, which are potential failover candidates, have received and flushed changes before making them visible to subscribers. By doing so, it guarantees that the promoted standby server is not lagging behind the subscribers when a failover is necessary. A new parameter named standby_slot_names is introduced. The logical walsender now guarantees that all local changes are sent and flushed to the standby servers corresponding to the replication slots specified in standby_slot_names before sending those changes to the subscriber. Additionally, The SQL functions pg_logical_slot_get_changes and pg_replication_slot_advance are modified to wait for the replication slots mentioned in standby_slot_names to catch up before returning the changes to the user. --- doc/src/sgml/config.sgml | 24 ++ doc/src/sgml/logicaldecoding.sgml | 9 +- .../replication/logical/logicalfuncs.c | 13 + src/backend/replication/slot.c | 342 +++++++++++++++++- src/backend/replication/slotfuncs.c | 9 + src/backend/replication/walsender.c | 111 +++++- .../utils/activity/wait_event_names.txt | 1 + src/backend/utils/misc/guc_tables.c | 14 + src/backend/utils/misc/postgresql.conf.sample | 2 + src/include/replication/slot.h | 7 + src/include/replication/walsender.h | 1 + src/include/replication/walsender_private.h | 7 + src/include/utils/guc_hooks.h | 3 + src/test/recovery/meson.build | 1 + src/test/recovery/t/006_logical_decoding.pl | 3 +- .../t/050_standby_failover_slots_sync.pl | 232 ++++++++++-- 16 files changed, 738 insertions(+), 41 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index bd2d2f871e..76345e433c 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4420,6 +4420,30 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows + + standby_slot_names (string) + + standby_slot_names configuration parameter + + + + + List of physical slots guarantees that logical replication slots with + failover enabled do not consume changes until those changes are received + and flushed to corresponding physical standbys. If a logical replication + connection is meant to switch to a physical standby after the standby is + promoted, the physical replication slot for the standby should be listed + here. + + + The standbys corresponding to the physical replication slots in + standby_slot_names must configure + enable_syncslot = true so they can receive + failover logical slots changes from the primary. + + + + diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index ec14cf7325..965ee716e2 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -372,7 +372,14 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU to work, it is mandatory to have a physical replication slot between the primary and the standby, and hot_standby_feedback - must be enabled on the standby. + must be enabled on the standby. It's also highly recommended that the said + physical replication slot is named in + standby_slot_names + list on the primary, to prevent the subscriber from consuming changes + faster than the hot standby. But once we configure it, then certain latency + is expected in sending changes to logical subscribers due to wait on + physical replication slots in + standby_slot_names diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index b0081d3ce5..5ff761dd65 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -30,6 +30,7 @@ #include "replication/decode.h" #include "replication/logical.h" #include "replication/message.h" +#include "replication/walsender.h" #include "storage/fd.h" #include "utils/array.h" #include "utils/builtins.h" @@ -109,6 +110,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin MemoryContext per_query_ctx; MemoryContext oldcontext; XLogRecPtr end_of_wal; + XLogRecPtr wait_for_wal_lsn; LogicalDecodingContext *ctx; ResourceOwner old_resowner = CurrentResourceOwner; ArrayType *arr; @@ -228,6 +230,17 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin NameStr(MyReplicationSlot->data.plugin), format_procedure(fcinfo->flinfo->fn_oid)))); + if (XLogRecPtrIsInvalid(upto_lsn)) + wait_for_wal_lsn = end_of_wal; + else + wait_for_wal_lsn = Min(upto_lsn, end_of_wal); + + /* + * Wait for specified streaming replication standby servers (if any) + * to confirm receipt of WAL up to wait_for_wal_lsn. + */ + WaitForStandbyConfirmation(wait_for_wal_lsn); + ctx->output_writer_private = p; /* diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 33f957b02f..d0509fb5a5 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -46,13 +46,18 @@ #include "common/string.h" #include "miscadmin.h" #include "pgstat.h" +#include "postmaster/interrupt.h" #include "replication/slot.h" #include "replication/walsender.h" +#include "replication/walsender_private.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/proc.h" #include "storage/procarray.h" #include "utils/builtins.h" +#include "utils/guc_hooks.h" +#include "utils/memutils.h" +#include "utils/varlena.h" /* * Replication slot on-disk data structure. @@ -99,10 +104,19 @@ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; /* My backend's replication slot in the shared memory array */ ReplicationSlot *MyReplicationSlot = NULL; -/* GUC variable */ +/* GUC variables */ int max_replication_slots = 10; /* the maximum number of replication * slots */ +/* + * This GUC lists streaming replication standby server slot names that + * logical WAL sender processes will wait for. + */ +char *standby_slot_names; + +/* This is parsed and cached list for raw standby_slot_names. */ +static List *standby_slot_names_list = NIL; + static void ReplicationSlotShmemExit(int code, Datum arg); static void ReplicationSlotDropPtr(ReplicationSlot *slot); @@ -2210,3 +2224,329 @@ RestoreSlotFromDisk(const char *name) (errmsg("too many replication slots active before shutdown"), errhint("Increase max_replication_slots and try again."))); } + +/* + * A helper function to validate slots specified in GUC standby_slot_names. + */ +static bool +validate_standby_slots(char **newval) +{ + char *rawname; + List *elemlist; + ListCell *lc; + bool ok; + + /* Need a modifiable copy of string */ + rawname = pstrdup(*newval); + + /* Verify syntax and parse string into a list of identifiers */ + ok = SplitIdentifierString(rawname, ',', &elemlist); + + if (!ok) + GUC_check_errdetail("List syntax is invalid."); + + /* + * If there is a syntax error in the name or if the replication slots' + * data is not initialized yet (i.e., we are in the startup process), skip + * the slot verification. + */ + if (!ok || !ReplicationSlotCtl) + { + pfree(rawname); + list_free(elemlist); + return ok; + } + + foreach(lc, elemlist) + { + char *name = lfirst(lc); + ReplicationSlot *slot; + + slot = SearchNamedReplicationSlot(name, true); + + if (!slot) + { + GUC_check_errdetail("replication slot \"%s\" does not exist", + name); + ok = false; + break; + } + + if (!SlotIsPhysical(slot)) + { + GUC_check_errdetail("\"%s\" is not a physical replication slot", + name); + ok = false; + break; + } + } + + pfree(rawname); + list_free(elemlist); + return ok; +} + +/* + * GUC check_hook for standby_slot_names + */ +bool +check_standby_slot_names(char **newval, void **extra, GucSource source) +{ + if (strcmp(*newval, "") == 0) + return true; + + /* + * "*" is not accepted as in that case primary will not be able to know + * for which all standbys to wait for. Even if we have physical-slots + * info, there is no way to confirm whether there is any standby + * configured for the known physical slots. + */ + if (strcmp(*newval, "*") == 0) + { + GUC_check_errdetail("\"%s\" is not accepted for standby_slot_names", + *newval); + return false; + } + + /* Now verify if the specified slots really exist and have correct type */ + if (!validate_standby_slots(newval)) + return false; + + *extra = guc_strdup(ERROR, *newval); + + return true; +} + +/* + * GUC assign_hook for standby_slot_names + */ +void +assign_standby_slot_names(const char *newval, void *extra) +{ + List *standby_slots; + MemoryContext oldcxt; + char *standby_slot_names_cpy = extra; + + list_free(standby_slot_names_list); + standby_slot_names_list = NIL; + + /* No value is specified for standby_slot_names. */ + if (standby_slot_names_cpy == NULL) + return; + + if (!SplitIdentifierString(standby_slot_names_cpy, ',', &standby_slots)) + { + /* This should not happen if GUC checked check_standby_slot_names. */ + elog(ERROR, "invalid list syntax"); + } + + /* + * Switch to the same memory context under which GUC variables are + * allocated (GUCMemoryContext). + */ + oldcxt = MemoryContextSwitchTo(GetMemoryChunkContext(standby_slot_names_cpy)); + standby_slot_names_list = list_copy(standby_slots); + MemoryContextSwitchTo(oldcxt); +} + +/* + * Return a copy of standby_slot_names_list if the copy flag is set to true, + * otherwise return the original list. + */ +List * +GetStandbySlotList(bool copy) +{ + /* + * Since we do not support syncing slots to cascading standbys, we return + * NIL here if we are running in a standby to indicate that no standby + * slots need to be waited for. + */ + if (RecoveryInProgress()) + return NIL; + + if (copy) + return list_copy(standby_slot_names_list); + else + return standby_slot_names_list; +} + +/* + * Reload the config file and reinitialize the standby slot list if the GUC + * standby_slot_names has changed. + */ +void +RereadConfigAndReInitSlotList(List **standby_slots) +{ + char *pre_standby_slot_names; + + /* + * If we are running on a standby, there is no need to reload + * standby_slot_names since we do not support syncing slots to cascading + * standbys. + */ + if (RecoveryInProgress()) + { + ProcessConfigFile(PGC_SIGHUP); + return; + } + + pre_standby_slot_names = pstrdup(standby_slot_names); + + ProcessConfigFile(PGC_SIGHUP); + + if (strcmp(pre_standby_slot_names, standby_slot_names) != 0) + { + list_free(*standby_slots); + *standby_slots = GetStandbySlotList(true); + } + + pfree(pre_standby_slot_names); +} + +/* + * Filter the standby slots based on the specified log sequence number + * (wait_for_lsn). + * + * This function updates the passed standby_slots list, removing any slots that + * have already caught up to or surpassed the given wait_for_lsn. Additionally, + * it removes slots that have been invalidated, dropped, or converted to + * logical slots. + */ +void +FilterStandbySlots(XLogRecPtr wait_for_lsn, List **standby_slots) +{ + ListCell *lc; + List *standby_slots_cpy = *standby_slots; + + foreach(lc, standby_slots_cpy) + { + char *name = lfirst(lc); + char *warningfmt = NULL; + ReplicationSlot *slot; + + slot = SearchNamedReplicationSlot(name, true); + + if (!slot) + { + /* + * It may happen that the slot specified in standby_slot_names GUC + * value is dropped, so let's skip over it. + */ + warningfmt = _("replication slot \"%s\" specified in parameter \"%s\" does not exist, ignoring"); + } + else if (SlotIsLogical(slot)) + { + /* + * If a logical slot name is provided in standby_slot_names, issue + * a WARNING and skip it. Although logical slots are disallowed in + * the GUC check_hook(validate_standby_slots), it is still + * possible for a user to drop an existing physical slot and + * recreate a logical slot with the same name. Since it is + * harmless, a WARNING should be enough, no need to error-out. + */ + warningfmt = _("cannot have logical replication slot \"%s\" in parameter \"%s\", ignoring"); + } + else + { + SpinLockAcquire(&slot->mutex); + + if (slot->data.invalidated != RS_INVAL_NONE) + { + /* + * Specified physical slot have been invalidated, so no point + * in waiting for it. + */ + warningfmt = _("physical slot \"%s\" specified in parameter \"%s\" has been invalidated, ignoring"); + } + else if (XLogRecPtrIsInvalid(slot->data.restart_lsn) || + slot->data.restart_lsn < wait_for_lsn) + { + bool inactive = (slot->active_pid == 0); + + SpinLockRelease(&slot->mutex); + + /* Log warning if no active_pid for this physical slot */ + if (inactive) + ereport(WARNING, + errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid", + name, "standby_slot_names"), + errdetail("Logical replication is waiting on the " + "standby associated with \"%s\".", name), + errhint("Consider starting standby associated with " + "\"%s\" or amend standby_slot_names.", name)); + + /* Continue if the current slot hasn't caught up. */ + continue; + } + else + { + Assert(slot->data.restart_lsn >= wait_for_lsn); + } + + SpinLockRelease(&slot->mutex); + } + + /* + * Reaching here indicates that either the slot has passed the + * wait_for_lsn or there is an issue with the slot that requires a + * warning to be reported. + */ + if (warningfmt) + ereport(WARNING, errmsg(warningfmt, name, "standby_slot_names")); + + standby_slots_cpy = foreach_delete_current(standby_slots_cpy, lc); + } + + *standby_slots = standby_slots_cpy; +} + +/* + * Wait for physical standby to confirm receiving the given lsn. + * + * Used by logical decoding SQL functions that acquired slot with failover + * enabled. It waits for physical standbys corresponding to the physical slots + * specified in the standby_slot_names GUC. + */ +void +WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn) +{ + List *standby_slots; + + if (!MyReplicationSlot->data.failover) + return; + + standby_slots = GetStandbySlotList(true); + + if (standby_slots == NIL) + return; + + ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv); + + for (;;) + { + CHECK_FOR_INTERRUPTS(); + + if (ConfigReloadPending) + { + ConfigReloadPending = false; + RereadConfigAndReInitSlotList(&standby_slots); + } + + FilterStandbySlots(wait_for_lsn, &standby_slots); + + /* Exit if done waiting for every slot. */ + if (standby_slots == NIL) + break; + + /* + * We wait for the slots in the standby_slot_names to catch up, but we + * use a timeout so we can also check the if the standby_slot_names has + * been changed. + */ + ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000, + WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION); + } + + ConditionVariableCancelSleep(); + list_free(standby_slots); +} diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 843ae8cd68..0a09b6c508 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -21,6 +21,7 @@ #include "replication/decode.h" #include "replication/logical.h" #include "replication/slot.h" +#include "replication/walsender.h" #include "utils/builtins.h" #include "utils/inval.h" #include "utils/pg_lsn.h" @@ -474,6 +475,8 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto) * crash, but this makes the data consistent after a clean shutdown. */ ReplicationSlotMarkDirty(); + + PhysicalWakeupLogicalWalSnd(); } return retlsn; @@ -514,6 +517,12 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) .segment_close = wal_segment_close), NULL, NULL, NULL); + /* + * Wait for specified streaming replication standby servers (if any) + * to confirm receipt of WAL up to moveto lsn. + */ + WaitForStandbyConfirmation(moveto); + /* * Start reading at the slot's restart_lsn, which we know to point to * a valid record. diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index c81a7a8344..acf562fe93 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1219,7 +1219,6 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase, &failover); - if (cmd->kind == REPLICATION_KIND_PHYSICAL) { ReplicationSlotCreate(cmd->slotname, false, @@ -1728,27 +1727,78 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId ProcessPendingWrites(); } +/* + * Wake up the logical walsender processes with failover-enabled slots if the + * currently acquired physical slot is specified in standby_slot_names + * GUC. + */ +void +PhysicalWakeupLogicalWalSnd(void) +{ + ListCell *lc; + List *standby_slots; + + Assert(MyReplicationSlot && SlotIsPhysical(MyReplicationSlot)); + + standby_slots = GetStandbySlotList(false); + + foreach(lc, standby_slots) + { + char *name = lfirst(lc); + + if (strcmp(name, NameStr(MyReplicationSlot->data.name)) == 0) + { + ConditionVariableBroadcast(&WalSndCtl->wal_confirm_rcv_cv); + return; + } + } +} + /* * Wait till WAL < loc is flushed to disk so it can be safely sent to client. * - * Returns end LSN of flushed WAL. Normally this will be >= loc, but - * if we detect a shutdown request (either from postmaster or client) - * we will return early, so caller must always check. + * If the walsender holds a logical slot that has enabled failover, we also + * wait for all the specified streaming replication standby servers to + * confirm receipt of WAL up to RecentFlushPtr. + * + * Returns end LSN of flushed WAL. Normally this will be >= loc, but if we + * detect a shutdown request (either from postmaster or client) we will return + * early, so caller must always check. */ static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc) { int wakeEvents; + bool wait_for_standby = false; + uint32 wait_event; + List *standby_slots = NIL; static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr; + if (MyReplicationSlot->data.failover) + standby_slots = GetStandbySlotList(true); + /* - * Fast path to avoid acquiring the spinlock in case we already know we - * have enough WAL available. This is particularly interesting if we're - * far behind. + * Check if all the standby servers have confirmed receipt of WAL up to + * RecentFlushPtr even when we already know we have enough WAL available. + * + * Note that we cannot directly return without checking the status of + * standby servers because the standby_slot_names may have changed, which + * means there could be new standby slots in the list that have not yet + * caught up to the RecentFlushPtr. */ - if (RecentFlushPtr != InvalidXLogRecPtr && - loc <= RecentFlushPtr) - return RecentFlushPtr; + if (!XLogRecPtrIsInvalid(RecentFlushPtr) && loc <= RecentFlushPtr) + { + FilterStandbySlots(RecentFlushPtr, &standby_slots); + + /* + * Fast path to avoid acquiring the spinlock in case we already know + * we have enough WAL available and all the standby servers have + * confirmed receipt of WAL up to RecentFlushPtr. This is particularly + * interesting if we're far behind. + */ + if (standby_slots == NIL) + return RecentFlushPtr; + } /* Get a more recent flush pointer. */ if (!RecoveryInProgress()) @@ -1769,7 +1819,7 @@ WalSndWaitForWal(XLogRecPtr loc) if (ConfigReloadPending) { ConfigReloadPending = false; - ProcessConfigFile(PGC_SIGHUP); + RereadConfigAndReInitSlotList(&standby_slots); SyncRepInitConfig(); } @@ -1784,8 +1834,18 @@ WalSndWaitForWal(XLogRecPtr loc) if (got_STOPPING) XLogBackgroundFlush(); + /* + * Update the standby slots that have not yet caught up to the flushed + * position. It is good to wait up to RecentFlushPtr and then let it + * send the changes to logical subscribers one by one which are + * already covered in RecentFlushPtr without needing to wait on every + * change for standby confirmation. + */ + if (wait_for_standby) + FilterStandbySlots(RecentFlushPtr, &standby_slots); + /* Update our idea of the currently flushed position. */ - if (!RecoveryInProgress()) + else if (!RecoveryInProgress()) RecentFlushPtr = GetFlushRecPtr(NULL); else RecentFlushPtr = GetXLogReplayRecPtr(NULL); @@ -1813,9 +1873,18 @@ WalSndWaitForWal(XLogRecPtr loc) !waiting_for_ping_response) WalSndKeepalive(false, InvalidXLogRecPtr); - /* check whether we're done */ - if (loc <= RecentFlushPtr) + if (loc > RecentFlushPtr) + wait_event = WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL; + else if (standby_slots) + { + wait_event = WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION; + wait_for_standby = true; + } + else + { + /* Already caught up and doesn't need to wait for standby_slots. */ break; + } /* Waiting for new WAL. Since we need to wait, we're now caught up. */ WalSndCaughtUp = true; @@ -1855,9 +1924,11 @@ WalSndWaitForWal(XLogRecPtr loc) if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; - WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL); + WalSndWait(wakeEvents, sleeptime, wait_event); } + list_free(standby_slots); + /* reactivate latch so WalSndLoop knows to continue */ SetLatch(MyLatch); return RecentFlushPtr; @@ -2265,6 +2336,7 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn) { ReplicationSlotMarkDirty(); ReplicationSlotsComputeRequiredLSN(); + PhysicalWakeupLogicalWalSnd(); } /* @@ -3527,6 +3599,7 @@ WalSndShmemInit(void) ConditionVariableInit(&WalSndCtl->wal_flush_cv); ConditionVariableInit(&WalSndCtl->wal_replay_cv); + ConditionVariableInit(&WalSndCtl->wal_confirm_rcv_cv); } } @@ -3596,8 +3669,14 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event) * * And, we use separate shared memory CVs for physical and logical * walsenders for selective wake ups, see WalSndWakeup() for more details. + * + * If the wait event is WAIT_FOR_STANDBY_CONFIRMATION, wait on another CV + * until awakened by physical walsenders after the walreceiver confirms the + * receipt of the LSN. */ - if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL) + if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION) + ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv); + else if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL) ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv); else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL) ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv); diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 3069d8790e..dd27d58a66 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -78,6 +78,7 @@ GSS_OPEN_SERVER "Waiting to read data from the client while establishing a GSSAP LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to remote server." LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server." SSL_OPEN_SERVER "Waiting for SSL while attempting connection." +WAIT_FOR_STANDBY_CONFIRMATION "Waiting for the WAL to be received by physical standby." WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process." WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process." diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 3af11b2b80..a82618c3d4 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -4628,6 +4628,20 @@ struct config_string ConfigureNamesString[] = check_debug_io_direct, assign_debug_io_direct, NULL }, + { + {"standby_slot_names", PGC_SIGHUP, REPLICATION_PRIMARY, + gettext_noop("Lists streaming replication standby server slot " + "names that logical WAL sender processes will wait for."), + gettext_noop("Decoded changes are sent out to plugins by logical " + "WAL sender processes only after specified " + "replication slots confirm receiving WAL."), + GUC_LIST_INPUT | GUC_LIST_QUOTE + }, + &standby_slot_names, + "", + check_standby_slot_names, assign_standby_slot_names, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 3868694d3f..db4afaf356 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -334,6 +334,8 @@ # method to choose sync standbys, number of sync standbys, # and comma-separated list of application_name # from standby(s); '*' = all +#standby_slot_names = '' # streaming replication standby server slot names that + # logical walsender processes will wait for # - Standby Servers - diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index f81bef9e42..eef9f25c45 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -229,6 +229,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot; /* GUCs */ extern PGDLLIMPORT int max_replication_slots; +extern PGDLLIMPORT char *standby_slot_names; /* shmem initialization functions */ extern Size ReplicationSlotsShmemSize(void); @@ -275,4 +276,10 @@ extern void CheckPointReplicationSlots(bool is_shutdown); extern void CheckSlotRequirements(void); extern void CheckSlotPermissions(void); +extern List *GetStandbySlotList(bool copy); +extern void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn); +extern void FilterStandbySlots(XLogRecPtr wait_for_lsn, + List **standby_slots); +extern void RereadConfigAndReInitSlotList(List **standby_slots); + #endif /* SLOT_H */ diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 1b58d50b3b..9a42b01f9a 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -45,6 +45,7 @@ extern void WalSndInitStopping(void); extern void WalSndWaitStopping(void); extern void HandleWalSndInitStopping(void); extern void WalSndRqstFileReload(void); +extern void PhysicalWakeupLogicalWalSnd(void); /* * Remember that we want to wakeup walsenders later diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 3113e9ea47..0f962b0c72 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -113,6 +113,13 @@ typedef struct ConditionVariable wal_flush_cv; ConditionVariable wal_replay_cv; + /* + * Used by physical walsenders holding slots specified in + * standby_slot_names to wake up logical walsenders holding + * failover-enabled slots when a walreceiver confirms the receipt of LSN. + */ + ConditionVariable wal_confirm_rcv_cv; + WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]; } WalSndCtlData; diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h index 5300c44f3b..464996b4f0 100644 --- a/src/include/utils/guc_hooks.h +++ b/src/include/utils/guc_hooks.h @@ -162,5 +162,8 @@ extern bool check_wal_consistency_checking(char **newval, void **extra, extern void assign_wal_consistency_checking(const char *newval, void *extra); extern bool check_wal_segment_size(int *newval, void **extra, GucSource source); extern void assign_wal_sync_method(int new_wal_sync_method, void *extra); +extern bool check_standby_slot_names(char **newval, void **extra, + GucSource source); +extern void assign_standby_slot_names(const char *newval, void *extra); #endif /* GUC_HOOKS_H */ diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index 88fb0306f5..4152c07318 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -45,6 +45,7 @@ tests += { 't/037_invalid_database.pl', 't/038_save_logical_slots_shutdown.pl', 't/039_end_of_wal.pl', + 't/050_standby_failover_slots_sync.pl', ], }, } diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index 5c7b4ca5e3..85f019774c 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -172,9 +172,10 @@ is($node_primary->slot('otherdb_slot')->{'slot_name'}, undef, 'logical slot was actually dropped with DB'); # Test logical slot advancing and its durability. +# Pass failover=true (last-arg), it should not have any impact on advancing. my $logical_slot = 'logical_slot'; $node_primary->safe_psql('postgres', - "SELECT pg_create_logical_replication_slot('$logical_slot', 'test_decoding', false);" + "SELECT pg_create_logical_replication_slot('$logical_slot', 'test_decoding', false, false, true);" ); $node_primary->psql( 'postgres', " diff --git a/src/test/recovery/t/050_standby_failover_slots_sync.pl b/src/test/recovery/t/050_standby_failover_slots_sync.pl index c17abfb40b..d50bbb3ae7 100644 --- a/src/test/recovery/t/050_standby_failover_slots_sync.pl +++ b/src/test/recovery/t/050_standby_failover_slots_sync.pl @@ -87,17 +87,30 @@ is( $publisher->safe_psql( $subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 ENABLE"); ################################################## -# Test logical failover slots on the standby -# Configure standby1 to replicate and synchronize logical slots configured -# for failover on the primary +# Test primary disallowing specified logical replication slots getting ahead of +# specified physical replication slots. It uses the following set up: # -# failover slot lsub1_slot->| ----> subscriber1 (connected via logical replication) -# primary ---> | -# physical slot sb1_slot--->| ----> standby1 (connected via streaming replication) -# | lsub1_slot(synced_slot) +# | ----> standby1 (primary_slot_name = sb1_slot) +# | ----> standby2 (primary_slot_name = sb2_slot) +# primary ----- | +# | ----> subscriber1 (failover = true) +# | ----> subscriber2 (failover = false) +# +# standby_slot_names = 'sb1_slot' +# +# Set up is configured in such a way that the logical slot of subscriber1 is +# enabled failover, thus it will wait for the physical slot of +# standby1(sb1_slot) to catch up before sending decoded changes to subscriber1. ################################################## +# Create primary my $primary = $publisher; + +$primary->psql('postgres', + q{SELECT pg_create_physical_replication_slot('sb1_slot');}); +$primary->psql('postgres', + q{SELECT pg_create_physical_replication_slot('sb2_slot');}); + my $backup_name = 'backup'; $primary->backup($backup_name); @@ -107,21 +120,201 @@ $standby1->init_from_backup( $primary, $backup_name, has_streaming => 1, has_restoring => 1); +$standby1->append_conf( + 'postgresql.conf', qq( +primary_slot_name = 'sb1_slot' +)); +$standby1->start; +$primary->wait_for_replay_catchup($standby1); + +# Create another standby +my $standby2 = PostgreSQL::Test::Cluster->new('standby2'); +$standby2->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); +$standby2->append_conf( + 'postgresql.conf', qq( +primary_slot_name = 'sb2_slot' +)); +$standby2->start; +$primary->wait_for_replay_catchup($standby2); + +# Configure primary to disallow any logical slots that enabled failover from +# getting ahead of specified physical replication slot (sb1_slot). +$primary->append_conf( + 'postgresql.conf', qq( +standby_slot_names = 'sb1_slot' +)); +$primary->reload; + +$primary->safe_psql('postgres', "CREATE TABLE tab_int (a int PRIMARY KEY);"); + +# Create a table and refresh the publication +$subscriber1->safe_psql( + 'postgres', qq[ + CREATE TABLE tab_int (a int PRIMARY KEY); + ALTER SUBSCRIPTION regress_mysub1 REFRESH PUBLICATION WITH (copy_data = false); +]); + +# Create another subscriber node without enabling failover, wait for sync to +# complete +my $subscriber2 = PostgreSQL::Test::Cluster->new('subscriber2'); +$subscriber2->init; +$subscriber2->start; +$subscriber2->safe_psql( + 'postgres', qq[ + CREATE TABLE tab_int (a int PRIMARY KEY); + CREATE SUBSCRIPTION regress_mysub2 CONNECTION '$publisher_connstr' PUBLICATION regress_mypub WITH (slot_name = lsub2_slot, copy_data = false); +]); +# Stop the standby associated with the specified physical replication slot so +# that the logical replication slot won't receive changes until the standby +# comes up. +$standby1->stop; + +# Create some data on the primary +my $primary_row_count = 10; +$primary->safe_psql('postgres', + "INSERT INTO tab_int SELECT generate_series(1, $primary_row_count);"); + +# Wait for the standby that's up and running gets the data from primary +$primary->wait_for_replay_catchup($standby2); +my $result = $standby2->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "standby2 gets data from primary"); + +# Wait for the subscription that's up and running and is not enabled for failover. +# It gets the data from primary without waiting for any standbys. +$publisher->wait_for_catchup('regress_mysub2'); +$result = $subscriber2->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "subscriber2 gets data from primary"); + +# The subscription that's up and running and is enabled for failover +# doesn't get the data from primary and keeps waiting for the +# standby specified in standby_slot_names. +$result = + $subscriber1->safe_psql('postgres', "SELECT count(*) = 0 FROM tab_int;"); +is($result, 't', + "subscriber1 doesn't get data from primary until standby1 acknowledges changes" +); + +# Start the standby specified in standby_slot_names and wait for it to catch +# up with the primary. +$standby1->start; +$primary->wait_for_replay_catchup($standby1); +$result = $standby1->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "standby1 gets data from primary"); + +# Now that the standby specified in standby_slot_names is up and running, +# primary must send the decoded changes to subscription enabled for failover +# While the standby was down, this subscriber didn't receive any data from +# primary i.e. the primary didn't allow it to go ahead of standby. +$publisher->wait_for_catchup('regress_mysub1'); +$result = $subscriber1->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', + "subscriber1 gets data from primary after standby1 acknowledges changes"); + +# Stop the standby associated with the specified physical replication slot so +# that the logical replication slot won't receive changes until the standby +# slot's restart_lsn is advanced or the slot is removed from the +# standby_slot_names list. +$publisher->safe_psql('postgres', "TRUNCATE tab_int;"); +$publisher->wait_for_catchup('regress_mysub1'); +$standby1->stop; + +################################################## +# Verify that when using pg_logical_slot_get_changes to consume changes from a +# logical slot with failover enabled, it will also wait for the slots specified +# in standby_slot_names to catch up. +################################################## + +# Create a logical 'test_decoding' replication slot with failover enabled +$publisher->safe_psql('postgres', + "SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding', false, false, true);" +); + +my $back_q = $primary->background_psql('postgres', on_error_stop => 0); +my $pid = $back_q->query('SELECT pg_backend_pid()'); + +# Try and get changes from the logical slot with failover enabled. +my $offset = -s $primary->logfile; +$back_q->query_until(qr//, + "SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);\n"); + +# Wait until the primary server logs a warning indicating that it is waiting +# for the sb1_slot to catch up. +$primary->wait_for_log( + qr/WARNING: ( [A-Z0-9]+:)? replication slot \"sb1_slot\" specified in parameter \"standby_slot_names\" does not have active_pid/, + $offset); + +ok($primary->safe_psql('postgres', "SELECT pg_cancel_backend($pid)"), + "cancelling pg_logical_slot_get_changes command"); + +$back_q->quit; + +$publisher->safe_psql('postgres', + "SELECT pg_drop_replication_slot('test_slot');" +); + +################################################## +# Test that logical replication will wait for the user-created inactive +# physical slot to catch up until we remove the slot from standby_slot_names. +################################################## + +# Create some data on the primary +$primary_row_count = 10; +$primary->safe_psql('postgres', + "INSERT INTO tab_int SELECT generate_series(1, $primary_row_count);"); + +$result = + $subscriber1->safe_psql('postgres', "SELECT count(*) = 0 FROM tab_int;"); +is($result, 't', + "subscriber1 doesn't get data as the sb1_slot doesn't catch up"); + +# Remove the standby from the standby_slot_names list and reload the +# configuration. +$primary->adjust_conf('postgresql.conf', 'standby_slot_names', "''"); +$primary->reload; + +# Since there are no slots in standby_slot_names, the primary server should now +# send the decoded changes to the subscription. +$publisher->wait_for_catchup('regress_mysub1'); +$result = $subscriber1->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', + "subscriber1 gets data from primary after standby1 is removed from the standby_slot_names list" +); + +# Put the standby back on the primary_slot_name for the rest of the tests +$primary->adjust_conf('postgresql.conf', 'standby_slot_names', 'sb1_slot'); +$primary->reload; + +################################################## +# Test logical failover slots on the standby +# Configure standby1 to replicate and synchronize logical slots configured +# for failover on the primary +# +# failover slot lsub1_slot->| ----> subscriber1 (connected via logical replication) +# primary ---> | +# physical slot sb1_slot--->| ----> standby1 (connected via streaming replication) +# | lsub1_slot(synced_slot) +################################################## + +# Create a standby my $connstr_1 = $primary->connstr; $standby1->append_conf( 'postgresql.conf', qq( enable_syncslot = true hot_standby_feedback = on -primary_slot_name = 'sb1_slot' primary_conninfo = '$connstr_1 dbname=postgres' )); -$primary->psql('postgres', - q{SELECT pg_create_physical_replication_slot('sb1_slot');}); - my $standby1_conninfo = $standby1->connstr . ' dbname=postgres'; -my $offset = -s $standby1->logfile; +$offset = -s $standby1->logfile; # Start the standby so that slot syncing can begin $standby1->start; @@ -150,18 +343,11 @@ is($standby1->safe_psql('postgres', # Insert data on the primary $primary->safe_psql( 'postgres', qq[ - CREATE TABLE tab_int (a int PRIMARY KEY); + TRUNCATE TABLE tab_int; INSERT INTO tab_int SELECT generate_series(1, 10); ]); -# Subscribe to the new table data and wait for it to arrive -$subscriber1->safe_psql( - 'postgres', qq[ - CREATE TABLE tab_int (a int PRIMARY KEY); - ALTER SUBSCRIPTION regress_mysub1 REFRESH PUBLICATION; -]); - -$subscriber1->wait_for_subscription_sync; +$primary->wait_for_catchup('regress_mysub1'); # Do not allow any further advancement of the restart_lsn and # confirmed_flush_lsn for the lsub1_slot. @@ -199,7 +385,9 @@ $standby1->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = off;') $standby1->restart; # Attempting to perform logical decoding on a synced slot should result in an error -my ($result, $stdout, $stderr) = $standby1->psql('postgres', +my ($stdout, $stderr); + +($result, $stdout, $stderr) = $standby1->psql('postgres', "select * from pg_logical_slot_get_changes('lsub1_slot',NULL,NULL);"); ok($stderr =~ /ERROR: cannot use replication slot "lsub1_slot" for logical decoding/, "logical decoding is not allowed on synced slot"); -- 2.34.1