From 94c06b15915b5fa021fbb9e165fbe0d67805967e Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Thu, 5 Oct 2023 01:59:35 -0400 Subject: [PATCH v24 1/2] Allow logical walsenders to wait for the physical standbys. A new property enable_failover is added at the slot level which will be persistent information. Users can set it during the create subscription or during pg_create_logical_replication_slot. eg: create subscription mysub connection '..' publication mypub WITH (enable_failover = true); --last arg SELECT * FROM pg_create_logical_replication_slot('test_slot1', 'pgoutput', false, true, true); This 'enable_failover' is displayed as part of pg_replication_slots view. Logical replication slots with 'enable_failover = true' are the eligible candidates to be synchronized to the physical standbys. A new GUC standby_slot_names has been added. It is the list of physical replication slots that logical replication with failover enabled waits for. The intent of this wait is that no logical replication subscribers (with enable_failover=true) should go ahead of physical replication standbys (corresponding to the physical slots in standby_slot_names). --- doc/src/sgml/config.sgml | 27 ++ doc/src/sgml/func.sgml | 11 +- doc/src/sgml/system-views.sgml | 11 + src/backend/catalog/system_functions.sql | 1 + src/backend/catalog/system_views.sql | 3 +- src/backend/commands/subscriptioncmds.c | 20 +- .../libpqwalreceiver/libpqwalreceiver.c | 14 +- .../replication/logical/logicalfuncs.c | 9 + src/backend/replication/logical/tablesync.c | 3 +- src/backend/replication/slot.c | 134 +++++++++- src/backend/replication/slotfuncs.c | 28 +- src/backend/replication/walreceiver.c | 2 +- src/backend/replication/walsender.c | 253 ++++++++++++++++-- .../utils/activity/wait_event_names.txt | 1 + src/backend/utils/misc/guc_tables.c | 14 + src/backend/utils/misc/postgresql.conf.sample | 2 + src/include/catalog/pg_proc.dat | 14 +- src/include/catalog/pg_subscription.h | 4 + src/include/replication/slot.h | 15 +- src/include/replication/walreceiver.h | 5 +- src/include/replication/walsender.h | 4 + src/include/replication/walsender_private.h | 2 + src/include/utils/guc_hooks.h | 2 + src/test/recovery/meson.build | 1 + src/test/recovery/t/050_verify_slot_order.pl | 145 ++++++++++ src/test/regress/expected/rules.out | 5 +- 26 files changed, 667 insertions(+), 63 deletions(-) create mode 100644 src/test/recovery/t/050_verify_slot_order.pl diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 924309af26..e8712b1262 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4344,6 +4344,33 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows + + standby_slot_names (string) + + standby_slot_names configuration parameter + + + + + List of physical replication slots that logical replication with failover + enabled waits for. Specify * to wait for all the + physical replication slots. If a logical replication connection is + meant to switch to a physical standby after the standby is promoted, + the physical replication slot for the standby should be listed here. + + + The standbys corresponding to the physical replication slots in + standby_slot_names must enable + enable_syncslot for the standbys to receive + failover logical slots changes from the primary. If + enable_syncslot is not enabled on the + corresponding standbys, then it may result in indefinite waiting + on the primary for physical replication slots configured in + standby_slot_names + + + + diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index f1ad64c3d6..be407dea5b 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -27307,7 +27307,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset pg_create_logical_replication_slot - pg_create_logical_replication_slot ( slot_name name, plugin name , temporary boolean, twophase boolean ) + pg_create_logical_replication_slot ( slot_name name, plugin name , temporary boolean, twophase boolean, enable_failover boolean ) record ( slot_name name, lsn pg_lsn ) @@ -27322,8 +27322,13 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset released upon any error. The optional fourth parameter, twophase, when set to true, specifies that the decoding of prepared transactions is enabled for this - slot. A call to this function has the same effect as the replication - protocol command CREATE_REPLICATION_SLOT ... LOGICAL. + slot. The optional fifth parameter, + enable_failover, when set to true, + specifies that this slot is enabled to be synced to the + physical standbys so that logical replication is not blocked + after failover. A call to this function has the same effect as + the replication protocol command + CREATE_REPLICATION_SLOT ... LOGICAL. diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index 2b35c2f91b..e720c0e9e9 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2532,6 +2532,17 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx invalidated). Always NULL for physical slots. + + + + enable_failover bool + + + True if this logical slot is enabled to be synced to the physical standbys + so that logical replication is not blocked after failover. Always false + for physical slots. + + diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index 07c0d89c4f..03a5452de8 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -459,6 +459,7 @@ CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot( IN slot_name name, IN plugin name, IN temporary boolean DEFAULT false, IN twophase boolean DEFAULT false, + IN enable_failover boolean DEFAULT false, OUT slot_name name, OUT lsn pg_lsn) RETURNS RECORD LANGUAGE INTERNAL diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index fcb14976c0..b6df844b20 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1002,7 +1002,8 @@ CREATE VIEW pg_replication_slots AS L.wal_status, L.safe_wal_size, L.two_phase, - L.conflicting + L.conflicting, + L.enable_failover FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 6fe111e98d..c5d67f0705 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -71,6 +71,7 @@ #define SUBOPT_RUN_AS_OWNER 0x00001000 #define SUBOPT_LSN 0x00002000 #define SUBOPT_ORIGIN 0x00004000 +#define SUBOPT_ENABLE_FAILOVER 0x00008000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -96,6 +97,7 @@ typedef struct SubOpts bool passwordrequired; bool runasowner; char *origin; + bool enable_failover; XLogRecPtr lsn; } SubOpts; @@ -157,6 +159,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->runasowner = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); + if (IsSet(supported_opts, SUBOPT_ENABLE_FAILOVER)) + opts->enable_failover = false; /* Parse options */ foreach(lc, stmt_options) @@ -326,6 +330,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("unrecognized origin value: \"%s\"", opts->origin)); } + else if (IsSet(supported_opts, SUBOPT_ENABLE_FAILOVER) && + strcmp(defel->defname, "enable_failover") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_ENABLE_FAILOVER)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_ENABLE_FAILOVER; + opts->enable_failover = defGetBoolean(defel); + } else if (IsSet(supported_opts, SUBOPT_LSN) && strcmp(defel->defname, "lsn") == 0) { @@ -591,7 +604,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN | + SUBOPT_ENABLE_FAILOVER); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -710,6 +724,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, publicationListToArray(publications); values[Anum_pg_subscription_suborigin - 1] = CStringGetTextDatum(opts.origin); + values[Anum_pg_subscription_subenablefailover - 1] = + BoolGetDatum(opts.enable_failover); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -807,7 +823,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, twophase_enabled = true; walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, - CRS_NOEXPORT_SNAPSHOT, NULL); + opts.enable_failover, CRS_NOEXPORT_SNAPSHOT, NULL); if (twophase_enabled) UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED); diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 60d5c1fc40..883e9d632b 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -74,6 +74,7 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, + bool enable_failover, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn); @@ -883,8 +884,8 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes) */ static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, - bool temporary, bool two_phase, CRSSnapshotAction snapshot_action, - XLogRecPtr *lsn) + bool temporary, bool two_phase, bool enable_failover, + CRSSnapshotAction snapshot_action, XLogRecPtr *lsn) { PGresult *res; StringInfoData cmd; @@ -913,7 +914,14 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, else appendStringInfoChar(&cmd, ' '); } - + if (enable_failover) + { + appendStringInfoString(&cmd, "ENABLE_FAILOVER"); + if (use_new_options_syntax) + appendStringInfoString(&cmd, ", "); + else + appendStringInfoChar(&cmd, ' '); + } if (use_new_options_syntax) { switch (snapshot_action) diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 197169d6b0..868bf267ba 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -30,6 +30,7 @@ #include "replication/decode.h" #include "replication/logical.h" #include "replication/message.h" +#include "replication/walsender.h" #include "storage/fd.h" #include "utils/array.h" #include "utils/builtins.h" @@ -109,6 +110,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin MemoryContext per_query_ctx; MemoryContext oldcontext; XLogRecPtr end_of_wal; + XLogRecPtr wal_to_wait; LogicalDecodingContext *ctx; ResourceOwner old_resowner = CurrentResourceOwner; ArrayType *arr; @@ -228,6 +230,13 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin NameStr(MyReplicationSlot->data.plugin), format_procedure(fcinfo->flinfo->fn_oid)))); + if (XLogRecPtrIsInvalid(upto_lsn)) + wal_to_wait = end_of_wal; + else + wal_to_wait = Min(upto_lsn, end_of_wal); + + WalSndWaitForStandbyConfirmation(wal_to_wait); + ctx->output_writer_private = p; /* diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e2cee92cf2..4a093503ed 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -1414,7 +1414,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) */ walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ , false /* two_phase */ , - CRS_USE_SNAPSHOT, origin_startpos); + false /* enable_failover */ , CRS_USE_SNAPSHOT, + origin_startpos); /* * Setup replication origin tracking. The purpose of doing this before the diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 7e5ec500d8..f3cddbc17e 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -52,6 +52,8 @@ #include "storage/proc.h" #include "storage/procarray.h" #include "utils/builtins.h" +#include "utils/guc_hooks.h" +#include "utils/varlena.h" /* * Replication slot on-disk data structure. @@ -90,7 +92,7 @@ typedef struct ReplicationSlotOnDisk sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize #define SLOT_MAGIC 0x1051CA1 /* format identifier */ -#define SLOT_VERSION 3 /* version for new files */ +#define SLOT_VERSION 4 /* version for new files */ /* Control array for replication slot management */ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; @@ -98,9 +100,11 @@ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; /* My backend's replication slot in the shared memory array */ ReplicationSlot *MyReplicationSlot = NULL; -/* GUC variable */ +/* GUC variables */ int max_replication_slots = 10; /* the maximum number of replication * slots */ +char *standby_slot_names; +List *standby_slot_names_list = NIL; static void ReplicationSlotShmemExit(int code, Datum arg); static void ReplicationSlotDropAcquired(void); @@ -251,7 +255,8 @@ ReplicationSlotValidateName(const char *name, int elevel) */ void ReplicationSlotCreate(const char *name, bool db_specific, - ReplicationSlotPersistency persistency, bool two_phase) + ReplicationSlotPersistency persistency, + bool two_phase, bool enable_failover) { ReplicationSlot *slot = NULL; int i; @@ -311,6 +316,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.persistency = persistency; slot->data.two_phase = two_phase; slot->data.two_phase_at = InvalidXLogRecPtr; + slot->data.enable_failover = enable_failover; /* and then data only present in shared memory */ slot->just_dirtied = false; @@ -596,6 +602,13 @@ ReplicationSlotRelease(void) MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING; ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags; LWLockRelease(ProcArrayLock); + + /* + * To prevent the backend from accessing a freed name list in the next + * call, reset the name list here. This is a convenient place to reset the + * standby names as it will always be called after finishing replication. + */ + standby_slot_names_list = NIL; } /* @@ -2121,3 +2134,118 @@ RestoreSlotFromDisk(const char *name) (errmsg("too many replication slots active before shutdown"), errhint("Increase max_replication_slots and try again."))); } + +/* + * A helper function to simplify check_hook implementation for + * standby_slot_names GUCs. + */ +static bool +validate_slot_names(char **newval, List **elemlist) +{ + char *rawname; + + /* Need a modifiable copy of string */ + rawname = pstrdup(*newval); + + /* Parse string into list of identifiers */ + if (!SplitIdentifierString(rawname, ',', elemlist)) + { + /* syntax error in name list */ + GUC_check_errdetail("List syntax is invalid."); + pfree(rawname); + list_free(*elemlist); + return false; + } + + return true; +} + +/* + * A helper function to validate slots specified in standby_slot_names GUCs. + */ +static bool +validate_standby_slots(List *elemlist) +{ + ListCell *lc; + + /* + * Skip check if replication slots' data is not initialized yet i.e. we + * are in startup process. + */ + if (!ReplicationSlotCtl) + return true; + + foreach(lc, elemlist) + { + char *name = lfirst(lc); + ReplicationSlot *slot; + + slot = SearchNamedReplicationSlot(name, true); + + if (!slot) + { + GUC_check_errdetail("replication slot \"%s\" does not exist", name); + return false; + } + + if (SlotIsLogical(slot)) + { + GUC_check_errdetail("cannot have logical replication slot \"%s\" " + "in this parameter", name); + return false; + } + } + + return true; +} + +/* + * GUC check_hook for standby_slot_names + */ +bool +check_standby_slot_names(char **newval, void **extra, GucSource source) +{ + List *elemlist; + + /* Special handling for "*" which means all. */ + if (strcmp(*newval, "*") == 0) + return true; + + if (strcmp(*newval, "") == 0) + return true; + + /* Verify syntax */ + if (!validate_slot_names(newval, &elemlist)) + return false; + + /* Now verify if these really exist and have correct type */ + if (!validate_standby_slots(elemlist)) + { + list_free(elemlist); + return false; + } + + list_free(elemlist); + return true; +} + +/* + * Initialize the list from raw standby_slot_names and cache it, + * in order to avoid parsing these repeatedly. Done at WALSender + * startup and after each SIGHUP. + */ +void +SlotSyncInitConfig(void) +{ + char *rawname; + + /* Free the old one */ + list_free(standby_slot_names_list); + standby_slot_names_list = NIL; + + if (strcmp(standby_slot_names, "") != 0) + { + rawname = pstrdup(standby_slot_names); + SplitIdentifierString(rawname, ',', &standby_slot_names_list); + } +} diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 6035cf4816..0c9391469d 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -21,6 +21,7 @@ #include "replication/decode.h" #include "replication/logical.h" #include "replication/slot.h" +#include "replication/walsender.h" #include "utils/builtins.h" #include "utils/inval.h" #include "utils/pg_lsn.h" @@ -42,7 +43,8 @@ create_physical_replication_slot(char *name, bool immediately_reserve, /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(name, false, - temporary ? RS_TEMPORARY : RS_PERSISTENT, false); + temporary ? RS_TEMPORARY : RS_PERSISTENT, false, + false); if (immediately_reserve) { @@ -117,6 +119,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) static void create_logical_replication_slot(char *name, char *plugin, bool temporary, bool two_phase, + bool enable_failover, XLogRecPtr restart_lsn, bool find_startpoint) { @@ -133,7 +136,8 @@ create_logical_replication_slot(char *name, char *plugin, * error as well. */ ReplicationSlotCreate(name, true, - temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase); + temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase, + enable_failover); /* * Create logical decoding context to find start point or, if we don't @@ -171,6 +175,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) Name plugin = PG_GETARG_NAME(1); bool temporary = PG_GETARG_BOOL(2); bool two_phase = PG_GETARG_BOOL(3); + bool enable_failover = PG_GETARG_BOOL(4); Datum result; TupleDesc tupdesc; HeapTuple tuple; @@ -188,6 +193,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) NameStr(*plugin), temporary, two_phase, + enable_failover, InvalidXLogRecPtr, true); @@ -232,7 +238,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 15 +#define PG_GET_REPLICATION_SLOTS_COLS 16 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; @@ -412,6 +418,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) values[i++] = BoolGetDatum(false); } + values[i++] = BoolGetDatum(slot_contents.data.enable_failover); + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, @@ -440,17 +448,8 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto) if (startlsn < moveto) { - SpinLockAcquire(&MyReplicationSlot->mutex); - MyReplicationSlot->data.restart_lsn = moveto; - SpinLockRelease(&MyReplicationSlot->mutex); + PhysicalConfirmReceivedLocation(moveto); retlsn = moveto; - - /* - * Dirty the slot so as it is written out at the next checkpoint. Note - * that the LSN position advanced may still be lost in the event of a - * crash, but this makes the data consistent after a clean shutdown. - */ - ReplicationSlotMarkDirty(); } return retlsn; @@ -683,6 +682,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) XLogRecPtr src_restart_lsn; bool src_islogical; bool temporary; + bool enable_failover; char *plugin; Datum values[2]; bool nulls[2]; @@ -738,6 +738,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) src_islogical = SlotIsLogical(&first_slot_contents); src_restart_lsn = first_slot_contents.data.restart_lsn; temporary = (first_slot_contents.data.persistency == RS_TEMPORARY); + enable_failover = first_slot_contents.data.enable_failover; plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL; /* Check type of replication slot */ @@ -777,6 +778,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) plugin, temporary, false, + enable_failover, src_restart_lsn, false); } diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index feff709435..68073ce3ca 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -387,7 +387,7 @@ WalReceiverMain(void) "pg_walreceiver_%lld", (long long int) walrcv_get_backend_pid(wrconn)); - walrcv_create_slot(wrconn, slotname, true, false, 0, NULL); + walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL); SpinLockAcquire(&walrcv->mutex); strlcpy(walrcv->slotname, slotname, NAMEDATALEN); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index e250b0567e..9d94c60a86 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -259,7 +259,7 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p); - +static bool WalSndSlotInList(char *slot_names, List *slot_names_list); /* Initialize walsender process before entering the main command loop */ void @@ -828,6 +828,7 @@ StartReplication(StartReplicationCmd *cmd) SpinLockRelease(&MyWalSnd->mutex); SyncRepInitConfig(); + SlotSyncInitConfig(); /* Main loop of walsender */ replication_active = true; @@ -974,12 +975,13 @@ static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, - bool *two_phase) + bool *two_phase, bool *enable_failover) { ListCell *lc; bool snapshot_action_given = false; bool reserve_wal_given = false; bool two_phase_given = false; + bool enable_failover_given = false; /* Parse options */ foreach(lc, cmd->options) @@ -1029,6 +1031,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, two_phase_given = true; *two_phase = defGetBoolean(defel); } + else if (strcmp(defel->defname, "enable_failover") == 0) + { + if (enable_failover_given || cmd->kind != REPLICATION_KIND_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + enable_failover_given = true; + *enable_failover = defGetBoolean(defel); + } else elog(ERROR, "unrecognized option: %s", defel->defname); } @@ -1045,6 +1056,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) char *slot_name; bool reserve_wal = false; bool two_phase = false; + bool enable_failover = false; CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT; DestReceiver *dest; TupOutputState *tstate; @@ -1054,13 +1066,14 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) Assert(!MyReplicationSlot); - parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase); + parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase, + &enable_failover); if (cmd->kind == REPLICATION_KIND_PHYSICAL) { ReplicationSlotCreate(cmd->slotname, false, cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, - false); + false, false); } else { @@ -1075,7 +1088,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL, - two_phase); + two_phase, enable_failover); } if (cmd->kind == REPLICATION_KIND_LOGICAL) @@ -1318,6 +1331,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) replication_active = true; SyncRepInitConfig(); + SlotSyncInitConfig(); /* Main loop of walsender */ WalSndLoop(XLogSendLogical); @@ -1408,6 +1422,33 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, ProcessPendingWrites(); } +/* + * Process input from the client and the timeout. + */ +static void +ProcessRepliesAndTimeOut(void) +{ + CHECK_FOR_INTERRUPTS(); + + /* Process any requests or signals received recently */ + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + SyncRepInitConfig(); + SlotSyncInitConfig(); + } + + /* Check for input from the client */ + ProcessRepliesIfAny(); + + /* die if timeout was reached */ + WalSndCheckTimeOut(); + + /* Send keepalive if the time has come */ + WalSndKeepaliveIfNecessary(); +} + /* * Wait until there is no pending write. Also process replies from the other * side and check timeouts during that. @@ -1419,14 +1460,7 @@ ProcessPendingWrites(void) { long sleeptime; - /* Check for input from the client */ - ProcessRepliesIfAny(); - - /* die if timeout was reached */ - WalSndCheckTimeOut(); - - /* Send keepalive if the time has come */ - WalSndKeepaliveIfNecessary(); + ProcessRepliesAndTimeOut(); if (!pq_is_send_pending()) break; @@ -1440,16 +1474,6 @@ ProcessPendingWrites(void) /* Clear any already-pending wakeups */ ResetLatch(MyLatch); - CHECK_FOR_INTERRUPTS(); - - /* Process any requests or signals received recently */ - if (ConfigReloadPending) - { - ConfigReloadPending = false; - ProcessConfigFile(PGC_SIGHUP); - SyncRepInitConfig(); - } - /* Try to flush pending output to the client */ if (pq_flush_if_writable() != 0) WalSndShutdown(); @@ -1527,6 +1551,171 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId ProcessPendingWrites(); } +/* + * Does this Wal Sender need to wake up logical walsender. + * + * Check if the physical slot of this walsender is specified in + * standby_slot_names GUC. + */ +static bool +WalSndWakeupNeeded() +{ + Assert(MyReplicationSlot != NULL); + Assert(SlotIsPhysical(MyReplicationSlot)); + + /* + * Initialize the slot list if not yet. This is needed when it is called + * outside of the walsender. + */ + if (strcmp(standby_slot_names, "") != 0 && standby_slot_names_list == NIL) + SlotSyncInitConfig(); + + return WalSndSlotInList(standby_slot_names, standby_slot_names_list); +} + +/* + * Helper function for WalSndWakeupNeeded. + */ +static bool +WalSndSlotInList(char *slot_names, List *slot_names_list) +{ + ListCell *l; + bool inlist = false; + + if (strcmp(standby_slot_names, "") == 0) + return false; + + /* Special handling for "*" which means all. */ + if (strcmp(slot_names, "*") == 0) + return true; + + foreach(l, slot_names_list) + { + char *name = lfirst(l); + + if (strcmp(name, NameStr(MyReplicationSlot->data.name)) == 0) + { + inlist = true; + break; + } + } + + return inlist; +} + +/* + * Wait for physical standby to confirm receiving give lsn. + * + * Here logical walsender associated with failover logical slot waits + * for physical standbys corresponding to physical slots specified in + * standby_slot_names GUC. + */ +void +WalSndWaitForStandbyConfirmation(XLogRecPtr wait_for_lsn) +{ + List *standby_slot_cpy; + + if (!MyReplicationSlot->data.enable_failover) + return; + + standby_slot_cpy = list_copy(standby_slot_names_list); + + ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv); + + /* + * TODO: On ConfigReload, standby_slot_cpy is not refreshed if + * standby_slot_names is changed. Fix it. + */ + for (;;) + { + ListCell *l; + long sleeptime = -1; + + foreach(l, standby_slot_cpy) + { + char *name = lfirst(l); + XLogRecPtr restart_lsn = InvalidXLogRecPtr; + bool invalidated = false; + char *warningfmt = NULL; + ReplicationSlot *slot; + + slot = SearchNamedReplicationSlot(name, true); + + if (slot && SlotIsPhysical(slot)) + { + SpinLockAcquire(&slot->mutex); + restart_lsn = slot->data.restart_lsn; + invalidated = slot->data.invalidated != RS_INVAL_NONE; + SpinLockRelease(&slot->mutex); + } + + /* Continue if the current slot hasn't caught up. */ + if (!invalidated && !XLogRecPtrIsInvalid(restart_lsn) && + restart_lsn < wait_for_lsn) + continue; + + /* + * It may happen that the slot specified in standby_slot_names GUC + * value is dropped, so let's skip over it. + */ + else if (!slot) + warningfmt = _("replication slot \"%s\" specified in parameter \"%s\" does not exist, ignoring"); + + /* + * If logical slot name is given in standby_slot_names, give + * WARNING and skip it. Since it is harmless, so WARNING should be + * enough, no need to error-out. + */ + else if (SlotIsLogical(slot)) + warningfmt = _("cannot have logical replication slot \"%s\" in parameter \"%s\", ignoring"); + + /* + * Specified physical slot may have been invalidated, so no point + * in waiting for it. + */ + else if (XLogRecPtrIsInvalid(restart_lsn) || invalidated) + warningfmt = _("physical slot \"%s\" specified in parameter \"%s\" has been invalidated, ignoring"); + else + Assert(restart_lsn >= wait_for_lsn); + + /* + * Reaching here indicates that either the slot has passed the + * wait_for_lsn or there is an issue with the slot that requires a + * warning to be reported. + */ + if (warningfmt) + ereport(WARNING, errmsg(warningfmt, name, "standby_slot_names")); + + standby_slot_cpy = foreach_delete_current(standby_slot_cpy, l); + } + + /* Exit if done waiting for every slot. */ + if (standby_slot_cpy == NIL) + break; + + if (am_walsender) + { + ProcessRepliesAndTimeOut(); + sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); + + /* If postmaster asked us to stop, don't wait anymore. */ + if (got_STOPPING) + break; + } + + /* + * Sleep until other physical walsenders awaken us or until a timeout + * occurs. + */ + sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); + + ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, sleeptime, + WAIT_EVENT_WAL_SENDER_WAIT_FOR_STANDBY_CONFIRMATION); + } + + ConditionVariableCancelSleep(); +} + /* * Wait till WAL < loc is flushed to disk so it can be safely sent to client. * @@ -1570,6 +1759,7 @@ WalSndWaitForWal(XLogRecPtr loc) ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); SyncRepInitConfig(); + SlotSyncInitConfig(); } /* Check for input from the client */ @@ -1657,6 +1847,15 @@ WalSndWaitForWal(XLogRecPtr loc) WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL); } + /* + * Wait for specified streaming replication standby servers (if any) to + * confirm receipt of WAL upto RecentFlushPtr. It is good to wait here + * upto RecentFlushPtr and then let it send the changes to logical + * subscribers one by one which are already covered in RecentFlushPtr + * without needing to wait on every change for standby confirmation. + */ + WalSndWaitForStandbyConfirmation(RecentFlushPtr); + /* reactivate latch so WalSndLoop knows to continue */ SetLatch(MyLatch); return RecentFlushPtr; @@ -2030,7 +2229,7 @@ ProcessStandbyMessage(void) /* * Remember that a walreceiver just confirmed receipt of lsn `lsn`. */ -static void +void PhysicalConfirmReceivedLocation(XLogRecPtr lsn) { bool changed = false; @@ -2049,6 +2248,9 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn) { ReplicationSlotMarkDirty(); ReplicationSlotsComputeRequiredLSN(); + + if (WalSndWakeupNeeded()) + ConditionVariableBroadcast(&WalSndCtl->wal_confirm_rcv_cv); } /* @@ -2469,6 +2671,7 @@ WalSndLoop(WalSndSendDataCallback send_data) ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); SyncRepInitConfig(); + SlotSyncInitConfig(); } /* Check for input from the client */ @@ -3311,6 +3514,8 @@ WalSndShmemInit(void) ConditionVariableInit(&WalSndCtl->wal_flush_cv); ConditionVariableInit(&WalSndCtl->wal_replay_cv); + + ConditionVariableInit(&WalSndCtl->wal_confirm_rcv_cv); } } diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 9c5fdeb3ca..daf2d57d3d 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -76,6 +76,7 @@ LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to rem LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server." SSL_OPEN_SERVER "Waiting for SSL while attempting connection." WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process." +WAL_SENDER_WAIT_FOR_STANDBY_CONFIRMATION "Waiting for physical standby confirmation in WAL sender process." WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process." diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 16ec6c5ef0..372115dfae 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -4552,6 +4552,20 @@ struct config_string ConfigureNamesString[] = check_debug_io_direct, assign_debug_io_direct, NULL }, + { + {"standby_slot_names", PGC_SIGHUP, REPLICATION_PRIMARY, + gettext_noop("List of streaming replication standby server slot " + "names that logical walsenders waits for."), + gettext_noop("Decoded changes are sent out to plugins by logical " + "walsenders only after specified replication slots " + "confirm receiving WAL."), + GUC_LIST_INPUT | GUC_LIST_QUOTE + }, + &standby_slot_names, + "", + check_standby_slot_names, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index d08d55c3fe..014491e06f 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -326,6 +326,8 @@ # method to choose sync standbys, number of sync standbys, # and comma-separated list of application_name # from standby(s); '*' = all +#standby_slot_names = '' # streaming replication standby server slot names that + # logical walsenders waits for # - Standby Servers - diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index f0b7b9cbd8..ec00ab7b48 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11079,17 +11079,17 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,bool}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,enable_failover}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', proparallel => 'u', prorettype => 'record', - proargtypes => 'name name bool bool', - proallargtypes => '{name,name,bool,bool,name,pg_lsn}', - proargmodes => '{i,i,i,i,o,o}', - proargnames => '{slot_name,plugin,temporary,twophase,slot_name,lsn}', + proargtypes => 'name name bool bool bool', + proallargtypes => '{name,name,bool,bool,bool,name,pg_lsn}', + proargmodes => '{i,i,i,i,i,o,o}', + proargnames => '{slot_name,plugin,temporary,twophase,enable_failover,slot_name,lsn}', prosrc => 'pg_create_logical_replication_slot' }, { oid => '4222', descr => 'copy a logical replication slot, changing temporality and plugin', diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index be36c4a820..9c94d95818 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -93,6 +93,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subrunasowner; /* True if replication should execute as the * subscription owner */ + bool subenablefailover; /* True if the replication slot should be + * enabled for failover */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -144,6 +147,7 @@ typedef struct Subscription List *publications; /* List of publication names to subscribe to */ char *origin; /* Only publish data originating from the * specified origin */ + bool enablefailover; /* Allow slot to be synchronized for failover */ } Subscription; /* Disallow streaming in-progress transactions. */ diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 758ca79a81..2b19af0f62 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -111,6 +111,12 @@ typedef struct ReplicationSlotPersistentData /* plugin name */ NameData plugin; + + /* + * Is this a failover slot (sync candidate for physical standbys)? + * Relevant for logical slots on the primary server. + */ + bool enable_failover; } ReplicationSlotPersistentData; /* @@ -210,6 +216,10 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot; /* GUCs */ extern PGDLLIMPORT int max_replication_slots; +extern PGDLLIMPORT char *standby_slot_names; + +/* Globals */ +extern PGDLLIMPORT List *standby_slot_names_list; /* shmem initialization functions */ extern Size ReplicationSlotsShmemSize(void); @@ -218,7 +228,7 @@ extern void ReplicationSlotsShmemInit(void); /* management of individual slots */ extern void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase); + bool two_phase, bool enable_failover); extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); @@ -253,4 +263,7 @@ extern void CheckPointReplicationSlots(bool is_shutdown); extern void CheckSlotRequirements(void); extern void CheckSlotPermissions(void); +extern void WaitForStandbyLSN(XLogRecPtr wait_for_lsn); +extern void SlotSyncInitConfig(void); + #endif /* SLOT_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 281626fa6f..4081be060f 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -356,6 +356,7 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, + bool enable_failover, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); @@ -429,8 +430,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd) #define walrcv_send(conn, buffer, nbytes) \ WalReceiverFunctions->walrcv_send(conn, buffer, nbytes) -#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \ - WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) +#define walrcv_create_slot(conn, slotname, temporary, two_phase, enable_failover, snapshot_action, lsn) \ + WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, enable_failover, snapshot_action, lsn) #define walrcv_get_backend_pid(conn) \ WalReceiverFunctions->walrcv_get_backend_pid(conn) #define walrcv_exec(conn, exec, nRetTypes, retTypes) \ diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 9df7e50f94..1f7483c421 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -14,6 +14,8 @@ #include +#include "access/xlogdefs.h" + /* * What to do with a snapshot in create replication slot command. */ @@ -47,6 +49,8 @@ extern void WalSndInitStopping(void); extern void WalSndWaitStopping(void); extern void HandleWalSndInitStopping(void); extern void WalSndRqstFileReload(void); +extern void PhysicalConfirmReceivedLocation(XLogRecPtr lsn); +extern void WalSndWaitForStandbyConfirmation(XLogRecPtr wait_for_lsn); /* * Remember that we want to wakeup walsenders later diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 7d919583bd..1b73695bfa 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -113,6 +113,8 @@ typedef struct ConditionVariable wal_flush_cv; ConditionVariable wal_replay_cv; + ConditionVariable wal_confirm_rcv_cv; + WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]; } WalSndCtlData; diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h index f04b99e3b9..2dbf7cc48d 100644 --- a/src/include/utils/guc_hooks.h +++ b/src/include/utils/guc_hooks.h @@ -160,5 +160,7 @@ extern bool check_wal_consistency_checking(char **newval, void **extra, extern void assign_wal_consistency_checking(const char *newval, void *extra); extern bool check_wal_segment_size(int *newval, void **extra, GucSource source); extern void assign_xlog_sync_method(int new_sync_method, void *extra); +extern bool check_standby_slot_names(char **newval, void **extra, + GucSource source); #endif /* GUC_HOOKS_H */ diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index 9d8039684a..3be3ee52fc 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -45,6 +45,7 @@ tests += { 't/037_invalid_database.pl', 't/038_save_logical_slots_shutdown.pl', 't/039_end_of_wal.pl', + 't/050_verify_slot_order.pl', ], }, } diff --git a/src/test/recovery/t/050_verify_slot_order.pl b/src/test/recovery/t/050_verify_slot_order.pl new file mode 100644 index 0000000000..25b3d5aac2 --- /dev/null +++ b/src/test/recovery/t/050_verify_slot_order.pl @@ -0,0 +1,145 @@ + +# Copyright (c) 2023, PostgreSQL Global Development Group + +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Test primary disallowing specified logical replication slots getting ahead of +# specified physical replication slots. It uses the following set up: +# +# | ----> standby1 (connected via streaming replication) +# | ----> standby2 (connected via streaming replication) +# primary ----- | +# | ----> subscriber1 (connected via logical replication) +# | ----> subscriber2 (connected via logical replication) +# +# Set up is configured in such a way that primary never lets subscriber1 ahead +# of standby1. + +# Create primary +my $primary = PostgreSQL::Test::Cluster->new('primary'); +$primary->init(allows_streaming => 'logical'); + +# Configure primary to disallow specified logical replication slot (lsub1_slot) +# getting ahead of specified physical replication slot (sb1_slot). +$primary->append_conf( + 'postgresql.conf', qq( +standby_slot_names = 'sb1_slot' +)); +$primary->start; + +$primary->psql('postgres', + q{SELECT pg_create_physical_replication_slot('sb1_slot');}); +$primary->psql('postgres', + q{SELECT pg_create_physical_replication_slot('sb2_slot');}); + +$primary->safe_psql('postgres', "CREATE TABLE tab_int (a int PRIMARY KEY);"); + +my $backup_name = 'backup'; +$primary->backup($backup_name); + +# Create a standby +my $standby1 = PostgreSQL::Test::Cluster->new('standby1'); +$standby1->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); +$standby1->append_conf( + 'postgresql.conf', qq( +primary_slot_name = 'sb1_slot' +)); +$standby1->start; +$primary->wait_for_replay_catchup($standby1); + +# Create another standby +my $standby2 = PostgreSQL::Test::Cluster->new('standby2'); +$standby2->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); +$standby2->append_conf( + 'postgresql.conf', qq( +primary_slot_name = 'sb2_slot' +)); +$standby2->start; +$primary->wait_for_replay_catchup($standby2); + +# Create publication on primary +my $publisher = $primary; +$publisher->safe_psql('postgres', "CREATE PUBLICATION mypub FOR TABLE tab_int;"); +my $publisher_connstr = $publisher->connstr . ' dbname=postgres'; + +# Create a subscriber node, wait for sync to complete +my $subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1'); +$subscriber1->init(allows_streaming => 'logical'); +$subscriber1->start; +$subscriber1->safe_psql('postgres', "CREATE TABLE tab_int (a int PRIMARY KEY);"); + +# Create a subscription with enable_failover = true +$subscriber1->safe_psql('postgres', + "CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' " + . "PUBLICATION mypub WITH (slot_name = lsub1_slot, enable_failover = true);"); +$subscriber1->wait_for_subscription_sync; + +# Create another subscriber node, wait for sync to complete +my $subscriber2 = PostgreSQL::Test::Cluster->new('subscriber2'); +$subscriber2->init(allows_streaming => 'logical'); +$subscriber2->start; +$subscriber2->safe_psql('postgres', "CREATE TABLE tab_int (a int PRIMARY KEY);"); +$subscriber2->safe_psql('postgres', + "CREATE SUBSCRIPTION mysub2 CONNECTION '$publisher_connstr' " + . "PUBLICATION mypub WITH (slot_name = lsub2_slot);"); +$subscriber2->wait_for_subscription_sync; + +# Stop the standby associated with specified physical replication slot so that +# the logical replication slot won't receive changes until the standby comes +# up. +$standby1->stop; + +# Create some data on primary +my $primary_row_count = 10; +my $primary_insert_time = time(); +$primary->safe_psql('postgres', + "INSERT INTO tab_int SELECT generate_series(1, $primary_row_count);"); + +# Wait for the standby that's up and running gets the data from primary +$primary->wait_for_replay_catchup($standby2); +my $result = $standby2->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "standby2 gets data from primary"); + +# Wait for the subscription that's up and running and is not enabled for failover. +# It gets the data from primary without waiting for any standbys. +$publisher->wait_for_catchup('mysub2'); +$result = $subscriber2->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "subscriber2 gets data from primary"); + +# The subscription that's up and running and is enabled for failover +# doesn't get the data from primary and keeps waiting for the +# standby specified in standby_slot_names. +$result = $subscriber1->safe_psql('postgres', + "SELECT count(*) = 0 FROM tab_int;"); +is($result, 't', "subscriber1 doesn't get data from primary until standby1 acknowledges changes"); + +# Start the standby specified in standby_slot_names and wait for it to catch +# up with the primary. +$standby1->start; +$primary->wait_for_replay_catchup($standby1); +$result = $standby1->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "standby1 gets data from primary"); + +# Now that the standby specified in standby_slot_names is up and running, +# primary must send the decoded changes to subscription enabled for failover +# While the standby was down, this subscriber didn't receive any data from +# primary i.e. the primary didn't allow it to go ahead of standby. +$publisher->wait_for_catchup('mysub1'); +$result = $subscriber1->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "subscriber1 gets data from primary after standby1 acknowledges changes"); + +done_testing(); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 2c60400ade..27088c369e 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1473,8 +1473,9 @@ pg_replication_slots| SELECT l.slot_name, l.wal_status, l.safe_wal_size, l.two_phase, - l.conflicting - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting) + l.conflicting, + l.enable_failover + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, enable_failover) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, -- 2.34.1