From 0f2640f30a5c0deb95b4f621492e1ea907a8bd00 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Fri, 23 Feb 2024 08:29:30 +0800 Subject: [PATCH v100] Allow logical walsenders to wait for the physical standby This patch introduces a mechanism to ensure that physical standby servers, which are potential failover candidates, have received and flushed changes before making them visible to subscribers. By doing so, it guarantees that the promoted standby server is not lagging behind the subscribers when a failover is necessary. A new parameter named 'standby_slot_names' is introduced. The logical walsender now guarantees that all local changes are sent and flushed to the standby servers corresponding to the replication slots specified in 'standby_slot_names' before sending those changes to the subscriber. Additionally, The SQL functions pg_logical_slot_get_changes and pg_replication_slot_advance are modified to wait for the replication slots mentioned in 'standby_slot_names' to catch up before returning. --- doc/src/sgml/config.sgml | 29 ++ doc/src/sgml/logicaldecoding.sgml | 7 + .../replication/logical/logicalfuncs.c | 12 + src/backend/replication/logical/slotsync.c | 13 + src/backend/replication/slot.c | 304 +++++++++++++++++- src/backend/replication/slotfuncs.c | 12 + src/backend/replication/walsender.c | 111 ++++++- .../utils/activity/wait_event_names.txt | 1 + src/backend/utils/misc/guc_tables.c | 14 + src/backend/utils/misc/postgresql.conf.sample | 2 + src/include/replication/slot.h | 6 + src/include/replication/walsender.h | 1 + src/include/replication/walsender_private.h | 7 + src/include/utils/guc_hooks.h | 3 + src/test/recovery/t/006_logical_decoding.pl | 3 +- .../t/040_standby_failover_slots_sync.pl | 188 +++++++++++ 16 files changed, 697 insertions(+), 16 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 36a2a5ce43..ce0fe0e9e1 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4420,6 +4420,35 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows + + standby_slot_names (string) + + standby_slot_names configuration parameter + + + + + Lists the streaming replication standby server slot names that logical + WAL sender processes will wait for. Logical WAL sender processes will + send decoded changes to plugins only after the specified replication + slots confirm receiving WAL. This guarantees that logical replication + slots with failover enabled do not consume changes until those changes + are received and flushed to corresponding physical standbys. If a + logical replication connection is meant to switch to a physical standby + after the standby is promoted, the physical replication slot for the + standby should be listed here. Note that logical replication will not + proceed if the slots specified in the standby_slot_names do not exist or + are invalidated. + + + The standbys corresponding to the physical replication slots in + standby_slot_names must configure + sync_replication_slots = true so they can receive + failover logical slots changes from the primary. + + + + diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 930c0fa8a6..5d16ea257d 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -384,6 +384,13 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU must be enabled on the standby. It is also necessary to specify a valid dbname in the primary_conninfo. + It's highly recommended that the said physical replication slot is named in + standby_slot_names + list on the primary, to prevent the subscriber from consuming changes + faster than the hot standby. Even when correctly configured, some latency + is expected when sending changes to logical subscribers due to the waiting + on slots named in + standby_slot_names. diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index b0081d3ce5..f28d6c9015 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -109,6 +109,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin MemoryContext per_query_ctx; MemoryContext oldcontext; XLogRecPtr end_of_wal; + XLogRecPtr wait_for_wal_lsn; LogicalDecodingContext *ctx; ResourceOwner old_resowner = CurrentResourceOwner; ArrayType *arr; @@ -228,6 +229,17 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin NameStr(MyReplicationSlot->data.plugin), format_procedure(fcinfo->flinfo->fn_oid)))); + /* + * Wait for specified streaming replication standby servers (if any) + * to confirm receipt of WAL up to wait_for_wal_lsn. + */ + if (XLogRecPtrIsInvalid(upto_lsn)) + wait_for_wal_lsn = end_of_wal; + else + wait_for_wal_lsn = Min(upto_lsn, end_of_wal); + + WaitForStandbyConfirmation(wait_for_wal_lsn); + ctx->output_writer_private = p; /* diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 36773cfe73..de05389d61 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -491,6 +491,10 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) latestFlushPtr = GetStandbyFlushRecPtr(NULL); if (remote_slot->confirmed_lsn > latestFlushPtr) { + /* + * Can get here only if GUC 'standby_slot_names' on the primary server + * was not configured correctly. + */ ereport(am_slotsync_worker ? LOG : ERROR, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("skipping slot synchronization as the received slot sync" @@ -860,6 +864,15 @@ validate_remote_info(WalReceiverConn *wrconn) remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull)); Assert(!isnull); + /* + * Slot sync is currently not supported on a cascading standby. This is + * because if we allow it, the primary server needs to wait for all the + * cascading standbys, otherwise, logical subscribers can still be ahead + * of one of the cascading standbys which we plan to promote. Thus, to + * avoid this additional complexity, we restrict it for the time being. + * + * XXX: If needed, this can be attempted in future. + */ if (remote_in_recovery) ereport(ERROR, errcode(ERRCODE_FEATURE_NOT_SUPPORTED), diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 0f173f63a2..d711b352c8 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -46,13 +46,18 @@ #include "common/string.h" #include "miscadmin.h" #include "pgstat.h" +#include "postmaster/interrupt.h" #include "replication/slotsync.h" #include "replication/slot.h" +#include "replication/walsender_private.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/proc.h" #include "storage/procarray.h" #include "utils/builtins.h" +#include "utils/guc_hooks.h" +#include "utils/memutils.h" +#include "utils/varlena.h" /* * Replication slot on-disk data structure. @@ -115,10 +120,19 @@ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; /* My backend's replication slot in the shared memory array */ ReplicationSlot *MyReplicationSlot = NULL; -/* GUC variable */ +/* GUC variables */ int max_replication_slots = 10; /* the maximum number of replication * slots */ +/* + * This GUC lists streaming replication standby server slot names that + * logical WAL sender processes will wait for. + */ +char *standby_slot_names; + +/* This is parsed and cached list for raw standby_slot_names. */ +static List *standby_slot_names_list = NIL; + static void ReplicationSlotShmemExit(int code, Datum arg); static void ReplicationSlotDropPtr(ReplicationSlot *slot); @@ -2345,3 +2359,291 @@ GetSlotInvalidationCause(const char *conflict_reason) Assert(found); return result; } + +/* + * A helper function to validate slots specified in GUC standby_slot_names. + */ +static bool +validate_standby_slots(char **newval) +{ + char *rawname; + List *elemlist; + bool ok; + + /* Need a modifiable copy of string */ + rawname = pstrdup(*newval); + + /* Verify syntax and parse string into a list of identifiers */ + ok = SplitIdentifierString(rawname, ',', &elemlist); + + if (!ok) + { + GUC_check_errdetail("List syntax is invalid."); + } + + /* + * If the replication slots' data have been initialized, verify if the + * specified slots exist and are logical slots. + */ + else if (ReplicationSlotCtl) + { + foreach_ptr(char, name, elemlist) + { + ReplicationSlot *slot; + + slot = SearchNamedReplicationSlot(name, true); + + if (!slot) + { + GUC_check_errdetail("replication slot \"%s\" does not exist", + name); + ok = false; + break; + } + + if (!SlotIsPhysical(slot)) + { + GUC_check_errdetail("\"%s\" is not a physical replication slot", + name); + ok = false; + break; + } + } + } + + pfree(rawname); + list_free(elemlist); + return ok; +} + +/* + * GUC check_hook for standby_slot_names + */ +bool +check_standby_slot_names(char **newval, void **extra, GucSource source) +{ + if ((*newval)[0] == '\0') + return true; + + /* + * "*" is not accepted as in that case primary will not be able to know + * for which all standbys to wait for. Even if we have physical slots + * info, there is no way to confirm whether there is any standby + * configured for the known physical slots. + */ + if (strcmp(*newval, "*") == 0) + { + GUC_check_errdetail("\"*\" is not accepted for standby_slot_names"); + return false; + } + + /* Now verify if the specified slots really exist and have correct type */ + if (!validate_standby_slots(newval)) + return false; + + *extra = guc_strdup(ERROR, *newval); + + return true; +} + +/* + * GUC assign_hook for standby_slot_names + */ +void +assign_standby_slot_names(const char *newval, void *extra) +{ + List *standby_slots; + MemoryContext oldcxt; + char *standby_slot_names_cpy = extra; + + list_free(standby_slot_names_list); + standby_slot_names_list = NIL; + + /* No value is specified for standby_slot_names. */ + if (standby_slot_names_cpy == NULL) + return; + + if (!SplitIdentifierString(standby_slot_names_cpy, ',', &standby_slots)) + { + /* This should not happen if GUC checked check_standby_slot_names. */ + elog(ERROR, "invalid list syntax"); + } + + /* + * Switch to the memory context under which GUC variables are allocated + * (GUCMemoryContext). + */ + oldcxt = MemoryContextSwitchTo(GetMemoryChunkContext(standby_slot_names_cpy)); + standby_slot_names_list = list_copy(standby_slots); + MemoryContextSwitchTo(oldcxt); +} + +/* + * Return a copy of standby_slot_names_list if the copy flag is set to true, + * otherwise return the original list. + * + * Note that since we do not support syncing slots to cascading standbys, we + * return NIL if we are running in a standby to indicate that no standby slots + * need to be waited for. + */ +List * +GetStandbySlotList(bool copy) +{ + if (RecoveryInProgress()) + return NIL; + + if (copy) + return list_copy(standby_slot_names_list); + else + return standby_slot_names_list; +} + +/* + * Filter the standby slots based on the specified log sequence number + * (wait_for_lsn). + * + * This function updates the passed standby_slots list, removing any slots that + * have already caught up to or surpassed the given wait_for_lsn. + */ +void +FilterStandbySlots(XLogRecPtr wait_for_lsn, List **standby_slots) +{ + if (*standby_slots == NIL) + return; + + /* + * To prevent concurrent slot dropping and creation while filtering the + * slots, take the ReplicationSlotControlLock outside of the loop. + */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + foreach_ptr(char, name, *standby_slots) + { + XLogRecPtr restart_lsn; + bool invalidated; + bool inactive; + ReplicationSlot *slot; + + slot = SearchNamedReplicationSlot(name, false); + + if (!slot) + { + /* + * It may happen that the slot specified in standby_slot_names GUC + * value is dropped, so let's skip over it. + */ + ereport(WARNING, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("replication slot \"%s\" specified in parameter %s does not exist", + name, "standby_slot_names")); + continue; + } + + if (SlotIsLogical(slot)) + { + /* + * If a logical slot name is provided in standby_slot_names, issue + * a WARNING and skip it. Although logical slots are disallowed in + * the GUC check_hook(validate_standby_slots), it is still + * possible for a user to drop an existing physical slot and + * recreate a logical slot with the same name. + */ + ereport(WARNING, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot have logical replication slot \"%s\" in parameter %s", + name, "standby_slot_names")); + continue; + } + + SpinLockAcquire(&slot->mutex); + restart_lsn = slot->data.restart_lsn; + invalidated = slot->data.invalidated != RS_INVAL_NONE; + inactive = slot->active_pid == 0; + SpinLockRelease(&slot->mutex); + + if (invalidated) + { + /* Specified physical slot has been invalidated */ + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("physical slot \"%s\" specified in parameter %s has been invalidated", + name, "standby_slot_names")); + continue; + } + + if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn) + { + /* Log warning if no active_pid for this physical slot */ + if (inactive) + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slot \"%s\" specified in parameter %s does not have active_pid", + name, "standby_slot_names"), + errdetail("Logical replication is waiting on the standby associated with \"%s\".", + name), + errhint("Consider starting standby associated with \"%s\" or amend parameter %s.", + name, "standby_slot_names")); + + /* Continue if the current slot hasn't caught up. */ + continue; + } + + Assert(restart_lsn >= wait_for_lsn); + *standby_slots = foreach_delete_current(*standby_slots, name); + } + + LWLockRelease(ReplicationSlotControlLock); +} + +/* + * Wait for physical standby to confirm receiving the given lsn. + * + * Used by logical decoding SQL functions that acquired failover enabled slot. + * It waits for physical standbys corresponding to the physical slots specified + * in the standby_slot_names GUC. + */ +void +WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn) +{ + List *standby_slots; + + if (!MyReplicationSlot->data.failover) + return; + + standby_slots = GetStandbySlotList(true); + + if (standby_slots == NIL) + return; + + ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv); + + for (;;) + { + CHECK_FOR_INTERRUPTS(); + + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + list_free(standby_slots); + standby_slots = GetStandbySlotList(true); + } + + FilterStandbySlots(wait_for_lsn, &standby_slots); + + /* Exit if done waiting for every slot. */ + if (standby_slots == NIL) + break; + + /* + * We wait for the slots in the standby_slot_names to catch up, but we + * use a timeout (1s) so we can also check if the standby_slot_names + * has been changed. + */ + ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000, + WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION); + } + + ConditionVariableCancelSleep(); + list_free(standby_slots); +} diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 768a304723..d864fe4133 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -464,6 +464,12 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto) * crash, but this makes the data consistent after a clean shutdown. */ ReplicationSlotMarkDirty(); + + /* + * Wake up logical walsenders holding failover enabled slots after + * updating the restart_lsn of the physical slot. + */ + PhysicalWakeupLogicalWalSnd(); } return retlsn; @@ -504,6 +510,12 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) .segment_close = wal_segment_close), NULL, NULL, NULL); + /* + * Wait for specified streaming replication standby servers (if any) + * to confirm receipt of WAL up to moveto lsn. + */ + WaitForStandbyConfirmation(moveto); + /* * Start reading at the slot's restart_lsn, which we know to point to * a valid record. diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 13bc3e0aee..aae19e3b5f 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1728,27 +1728,74 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId ProcessPendingWrites(); } +/* + * Wake up the logical walsender processes with failover enabled slots if the + * currently acquired physical slot is specified in standby_slot_names GUC. + */ +void +PhysicalWakeupLogicalWalSnd(void) +{ + List *standby_slots; + + Assert(MyReplicationSlot && SlotIsPhysical(MyReplicationSlot)); + + standby_slots = GetStandbySlotList(false); + + foreach_ptr(char, name, standby_slots) + { + if (strcmp(name, NameStr(MyReplicationSlot->data.name)) == 0) + { + ConditionVariableBroadcast(&WalSndCtl->wal_confirm_rcv_cv); + return; + } + } +} + /* * Wait till WAL < loc is flushed to disk so it can be safely sent to client. * - * Returns end LSN of flushed WAL. Normally this will be >= loc, but - * if we detect a shutdown request (either from postmaster or client) - * we will return early, so caller must always check. + * If the walsender holds a logical slot that has enabled failover, we also + * wait for all the specified streaming replication standby servers to + * confirm receipt of WAL up to RecentFlushPtr. + * + * Returns end LSN of flushed WAL. Normally this will be >= loc, but if we + * detect a shutdown request (either from postmaster or client) we will return + * early, so caller must always check. */ static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc) { int wakeEvents; + bool wait_for_standby = false; + uint32 wait_event; + List *standby_slots = NIL; static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr; + if (MyReplicationSlot->data.failover && replication_active) + standby_slots = GetStandbySlotList(true); + /* - * Fast path to avoid acquiring the spinlock in case we already know we - * have enough WAL available. This is particularly interesting if we're - * far behind. + * Check if all the standby servers have confirmed receipt of WAL up to + * RecentFlushPtr even when we already know we have enough WAL available. + * + * Note that we cannot directly return without checking the status of + * standby servers because the standby_slot_names may have changed, which + * means there could be new standby slots in the list that have not yet + * caught up to the RecentFlushPtr. */ - if (RecentFlushPtr != InvalidXLogRecPtr && - loc <= RecentFlushPtr) - return RecentFlushPtr; + if (!XLogRecPtrIsInvalid(RecentFlushPtr) && loc <= RecentFlushPtr) + { + FilterStandbySlots(RecentFlushPtr, &standby_slots); + + /* + * Fast path to avoid acquiring the spinlock in case we already know + * we have enough WAL available and all the standby servers have + * confirmed receipt of WAL up to RecentFlushPtr. This is particularly + * interesting if we're far behind. + */ + if (standby_slots == NIL) + return RecentFlushPtr; + } /* Get a more recent flush pointer. */ if (!RecoveryInProgress()) @@ -1770,6 +1817,13 @@ WalSndWaitForWal(XLogRecPtr loc) { ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); + + if (MyReplicationSlot->data.failover && replication_active) + { + list_free(standby_slots); + standby_slots = GetStandbySlotList(true); + } + SyncRepInitConfig(); } @@ -1784,8 +1838,18 @@ WalSndWaitForWal(XLogRecPtr loc) if (got_STOPPING) XLogBackgroundFlush(); + /* + * Update the standby slots that have not yet caught up to the flushed + * position. It is good to wait up to RecentFlushPtr and then let it + * send the changes to logical subscribers one by one which are + * already covered in RecentFlushPtr without needing to wait on every + * change for standby confirmation. + */ + if (wait_for_standby) + FilterStandbySlots(RecentFlushPtr, &standby_slots); + /* Update our idea of the currently flushed position. */ - if (!RecoveryInProgress()) + else if (!RecoveryInProgress()) RecentFlushPtr = GetFlushRecPtr(NULL); else RecentFlushPtr = GetXLogReplayRecPtr(NULL); @@ -1813,9 +1877,18 @@ WalSndWaitForWal(XLogRecPtr loc) !waiting_for_ping_response) WalSndKeepalive(false, InvalidXLogRecPtr); - /* check whether we're done */ - if (loc <= RecentFlushPtr) + if (loc > RecentFlushPtr) + wait_event = WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL; + else if (standby_slots) + { + wait_event = WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION; + wait_for_standby = true; + } + else + { + /* Already caught up and doesn't need to wait for standby_slots. */ break; + } /* Waiting for new WAL. Since we need to wait, we're now caught up. */ WalSndCaughtUp = true; @@ -1855,9 +1928,11 @@ WalSndWaitForWal(XLogRecPtr loc) if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; - WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL); + WalSndWait(wakeEvents, sleeptime, wait_event); } + list_free(standby_slots); + /* reactivate latch so WalSndLoop knows to continue */ SetLatch(MyLatch); return RecentFlushPtr; @@ -2265,6 +2340,7 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn) { ReplicationSlotMarkDirty(); ReplicationSlotsComputeRequiredLSN(); + PhysicalWakeupLogicalWalSnd(); } /* @@ -3538,6 +3614,7 @@ WalSndShmemInit(void) ConditionVariableInit(&WalSndCtl->wal_flush_cv); ConditionVariableInit(&WalSndCtl->wal_replay_cv); + ConditionVariableInit(&WalSndCtl->wal_confirm_rcv_cv); } } @@ -3607,8 +3684,14 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event) * * And, we use separate shared memory CVs for physical and logical * walsenders for selective wake ups, see WalSndWakeup() for more details. + * + * If the wait event is WAIT_FOR_STANDBY_CONFIRMATION, wait on another CV + * until awakened by physical walsenders after the walreceiver confirms + * the receipt of the LSN. */ - if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL) + if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION) + ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv); + else if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL) ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv); else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL) ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv); diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 4fffb46625..01fe180b65 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -78,6 +78,7 @@ GSS_OPEN_SERVER "Waiting to read data from the client while establishing a GSSAP LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to remote server." LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server." SSL_OPEN_SERVER "Waiting for SSL while attempting connection." +WAIT_FOR_STANDBY_CONFIRMATION "Waiting for WAL to be received and flushed by the physical standby." WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process." WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process." diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 527a2b2734..6ee6375cb8 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -4639,6 +4639,20 @@ struct config_string ConfigureNamesString[] = check_debug_io_direct, assign_debug_io_direct, NULL }, + { + {"standby_slot_names", PGC_SIGHUP, REPLICATION_PRIMARY, + gettext_noop("Lists streaming replication standby server slot " + "names that logical WAL sender processes will wait for."), + gettext_noop("Logical WAL sender processes will send decoded " + "changes to plugins only after the specified " + "replication slots confirm receiving WAL."), + GUC_LIST_INPUT | GUC_LIST_QUOTE + }, + &standby_slot_names, + "", + check_standby_slot_names, assign_standby_slot_names, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index c97f9a25f0..cadfe10958 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -334,6 +334,8 @@ # method to choose sync standbys, number of sync standbys, # and comma-separated list of application_name # from standby(s); '*' = all +#standby_slot_names = '' # streaming replication standby server slot names that + # logical walsender processes will wait for # - Standby Servers - diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index acbf567150..04a2717138 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -226,6 +226,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot; /* GUCs */ extern PGDLLIMPORT int max_replication_slots; +extern PGDLLIMPORT char *standby_slot_names; /* shmem initialization functions */ extern Size ReplicationSlotsShmemSize(void); @@ -274,4 +275,9 @@ extern void CheckSlotPermissions(void); extern ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *conflict_reason); +extern List *GetStandbySlotList(bool copy); +extern void FilterStandbySlots(XLogRecPtr wait_for_lsn, + List **standby_slots); +extern void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn); + #endif /* SLOT_H */ diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 0c3996e926..f2d8297f01 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -39,6 +39,7 @@ extern void InitWalSender(void); extern bool exec_replication_command(const char *cmd_string); extern void WalSndErrorCleanup(void); extern void WalSndResourceCleanup(bool isCommit); +extern void PhysicalWakeupLogicalWalSnd(void); extern XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli); extern void WalSndSignals(void); extern Size WalSndShmemSize(void); diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 3113e9ea47..3c134c8edf 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -113,6 +113,13 @@ typedef struct ConditionVariable wal_flush_cv; ConditionVariable wal_replay_cv; + /* + * Used by physical walsenders holding slots specified in + * standby_slot_names to wake up logical walsenders holding failover + * enabled slots when a walreceiver confirms the receipt of LSN. + */ + ConditionVariable wal_confirm_rcv_cv; + WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]; } WalSndCtlData; diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h index 339c490300..dcf9a040d1 100644 --- a/src/include/utils/guc_hooks.h +++ b/src/include/utils/guc_hooks.h @@ -163,5 +163,8 @@ extern bool check_wal_consistency_checking(char **newval, void **extra, extern void assign_wal_consistency_checking(const char *newval, void *extra); extern bool check_wal_segment_size(int *newval, void **extra, GucSource source); extern void assign_wal_sync_method(int new_wal_sync_method, void *extra); +extern bool check_standby_slot_names(char **newval, void **extra, + GucSource source); +extern void assign_standby_slot_names(const char *newval, void *extra); #endif /* GUC_HOOKS_H */ diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index 5c7b4ca5e3..b95d95c06f 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -172,9 +172,10 @@ is($node_primary->slot('otherdb_slot')->{'slot_name'}, undef, 'logical slot was actually dropped with DB'); # Test logical slot advancing and its durability. +# Passing failover=true (last arg) should not have any impact on advancing. my $logical_slot = 'logical_slot'; $node_primary->safe_psql('postgres', - "SELECT pg_create_logical_replication_slot('$logical_slot', 'test_decoding', false);" + "SELECT pg_create_logical_replication_slot('$logical_slot', 'test_decoding', false, false, true);" ); $node_primary->psql( 'postgres', " diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl index 968aa7b05b..826c68b5bb 100644 --- a/src/test/recovery/t/040_standby_failover_slots_sync.pl +++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl @@ -446,11 +446,199 @@ ok( $standby1->poll_query_until( "SELECT '$primary_restart_lsn' = restart_lsn AND '$primary_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;"), 'restart_lsn and confirmed_flush_lsn of slot lsub1_slot synced to standby'); +################################################## +# Test primary disallowing specified logical replication slots getting ahead of +# specified physical replication slots. It uses the following set up: +# +# | ----> standby1 (primary_slot_name = sb1_slot) +# | ----> standby2 (primary_slot_name = sb2_slot) +# primary ----- | +# | ----> subscriber1 (failover = true) +# | ----> subscriber2 (failover = false) +# +# standby_slot_names = 'sb1_slot' +# +# Set up is configured in such a way that the logical slot of subscriber1 is +# enabled for failover, thus it will wait for the physical slot of +# standby1(sb1_slot) to catch up before sending decoded changes to subscriber1. +################################################## + +$backup_name = 'backup3'; + +$primary->psql('postgres', + q{SELECT pg_create_physical_replication_slot('sb2_slot');}); + +$primary->backup($backup_name); + +# Create another standby +my $standby2 = PostgreSQL::Test::Cluster->new('standby2'); +$standby2->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); +$standby2->append_conf( + 'postgresql.conf', qq( +primary_slot_name = 'sb2_slot' +)); +$standby2->start; +$primary->wait_for_replay_catchup($standby2); + +# Configure primary to disallow any logical slots that enabled failover from +# getting ahead of specified physical replication slot (sb1_slot). +$primary->append_conf( + 'postgresql.conf', qq( +standby_slot_names = 'sb1_slot' +)); +$primary->reload; + +# Create another subscriber node without enabling failover, wait for sync to +# complete +my $subscriber2 = PostgreSQL::Test::Cluster->new('subscriber2'); +$subscriber2->init; +$subscriber2->start; +$subscriber2->safe_psql( + 'postgres', qq[ + CREATE TABLE tab_int (a int PRIMARY KEY); + CREATE SUBSCRIPTION regress_mysub2 CONNECTION '$publisher_connstr' PUBLICATION regress_mypub WITH (slot_name = lsub2_slot); +]); + +$subscriber2->wait_for_subscription_sync; + +$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 ENABLE"); + +# Stop the standby associated with the specified physical replication slot +# (sb1_slot) so that the logical replication slot (lsub1_slot) won't receive +# changes until the standby comes up. +$standby1->stop; + +# Create some data on the primary +my $primary_row_count = 20; +$primary->safe_psql('postgres', + "INSERT INTO tab_int SELECT generate_series(11, $primary_row_count);"); + +# Wait until the standby2 that's still running gets the data from the primary +$primary->wait_for_replay_catchup($standby2); +$result = $standby2->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "standby2 gets data from primary"); + +# Wait for regress_mysub2 to get the data from the primary. This subscription +# was not enabled for failover so it gets the data without waiting for any +# standbys. +$primary->wait_for_catchup('regress_mysub2'); +$result = $subscriber2->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "subscriber2 gets data from primary"); + +# The regress_mysub1 was enabled for failover so it doesn't get the data from +# primary and keeps waiting for the standby specified in standby_slot_names +# (sb1_slot aka standby1). +$result = + $subscriber1->safe_psql('postgres', "SELECT count(*) <> $primary_row_count FROM tab_int;"); +is($result, 't', + "subscriber1 doesn't get data from primary until standby1 acknowledges changes" +); + +# Start the standby specified in standby_slot_names (sb1_slot aka standby1) and +# wait for it to catch up with the primary. +$standby1->start; +$primary->wait_for_replay_catchup($standby1); +$result = $standby1->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "standby1 gets data from primary"); + +# Now that the standby specified in standby_slot_names is up and running, the +# primary can send the decoded changes to the subscription enabled for failover +# (i.e. regress_mysub1). While the standby was down, regress_mysub1 didn't +# receive any data from the primary. i.e. the primary didn't allow it to go +# ahead of standby. +$primary->wait_for_catchup('regress_mysub1'); +$result = $subscriber1->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', + "subscriber1 gets data from primary after standby1 acknowledges changes"); + +################################################## +# Verify that when using pg_logical_slot_get_changes to consume changes from a +# logical slot with failover enabled, it will also wait for the slots specified +# in standby_slot_names to catch up. +################################################## + +# Stop the standby associated with the specified physical replication slot so +# that the logical replication slot won't receive changes until the standby +# slot's restart_lsn is advanced or the slot is removed from the +# standby_slot_names list. +$primary->safe_psql('postgres', "TRUNCATE tab_int;"); +$primary->wait_for_catchup('regress_mysub1'); +$standby1->stop; + +# Create a logical 'test_decoding' replication slot with failover enabled +$primary->safe_psql('postgres', + "SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding', false, false, true);" +); + +my $back_q = $primary->background_psql('postgres', on_error_stop => 0); +my $pid = $back_q->query('SELECT pg_backend_pid()'); + +# Try and get changes from the logical slot with failover enabled. +my $offset = -s $primary->logfile; +$back_q->query_until(qr//, + "SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);\n"); + +# Wait until the primary server logs a warning indicating that it is waiting +# for the sb1_slot to catch up. +$primary->wait_for_log( + qr/replication slot \"sb1_slot\" specified in parameter standby_slot_names does not have active_pid/, + $offset); + +ok($primary->safe_psql('postgres', "SELECT pg_cancel_backend($pid)"), + "cancelling pg_logical_slot_get_changes command"); + +$back_q->quit; + +$primary->safe_psql('postgres', + "SELECT pg_drop_replication_slot('test_slot');" +); + +################################################## +# Test that logical replication will wait for the user-created inactive +# physical slot to catch up until we remove the slot from standby_slot_names. +################################################## + +# Create some data on the primary +$primary_row_count = 10; +$primary->safe_psql('postgres', + "INSERT INTO tab_int SELECT generate_series(1, $primary_row_count);"); + +# The regress_mysub1 doesn't get the data from primary because the specified +# standby slot (sb1_slot) in standby_slot_names is inactive. +$result = + $subscriber1->safe_psql('postgres', "SELECT count(*) = 0 FROM tab_int;"); +is($result, 't', + "subscriber1 doesn't get data as the sb1_slot doesn't catch up"); + +# Remove the standby from the standby_slot_names list and reload the +# configuration. +$primary->adjust_conf('postgresql.conf', 'standby_slot_names', "''"); +$primary->reload; + +# Since there are no slots in standby_slot_names, the primary server should now +# send the decoded changes to the subscription. +$primary->wait_for_catchup('regress_mysub1'); +$result = $subscriber1->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', + "subscriber1 gets data from primary after standby1 is removed from the standby_slot_names list" +); + ################################################## # Promote the standby1 to primary. Confirm that: # a) the slot 'lsub1_slot' is retained on the new primary # b) logical replication for regress_mysub1 is resumed successfully after failover ################################################## +$standby1->start; +$primary->wait_for_replay_catchup($standby1); + $standby1->promote; # Update subscription with the new primary's connection info -- 2.30.0.windows.2