From 48e11b103d3193b8007aba212bbe7c775b0862fe Mon Sep 17 00:00:00 2001 From: Shveta Malik Date: Tue, 31 Oct 2023 11:54:15 +0530 Subject: [PATCH v30 1/3] Allow logical walsenders to wait for the physical standbys A new property 'failover' is added at the slot level which is persistent information which specifies that this logical slot is enabled to be synced to the physical standbys so that logical replication can be resumed after failover. It is always false for physical slots. Users can set it during the create subscription or during pg_create_logical_replication_slot and alter it using alter subscription. Examples: create subscription mysub connection '..' publication mypub WITH (failover = true); alter subscription mysub set (failover = true); --last arg SELECT * FROM pg_create_logical_replication_slot('myslot', 'pgoutput', false, true, true); This 'failover' is displayed as part of pg_replication_slots view. A new GUC standby_slot_names has been added. It is the list of physical replication slots that logical replication with failover enabled waits for. The intent of this wait is that no logical replication subscribers (with failover=true) should go ahead of physical replication standbys (corresponding to the physical slots in standby_slot_names). --- contrib/test_decoding/expected/slot.out | 19 + contrib/test_decoding/sql/slot.sql | 6 + doc/src/sgml/catalogs.sgml | 12 + doc/src/sgml/config.sgml | 22 + doc/src/sgml/func.sgml | 11 +- doc/src/sgml/protocol.sgml | 51 +++ doc/src/sgml/ref/alter_subscription.sgml | 7 +- doc/src/sgml/ref/create_subscription.sgml | 24 ++ doc/src/sgml/system-views.sgml | 11 + src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_functions.sql | 1 + src/backend/catalog/system_views.sql | 3 +- src/backend/commands/subscriptioncmds.c | 88 +++- .../libpqwalreceiver/libpqwalreceiver.c | 44 +- .../replication/logical/logicalfuncs.c | 13 + src/backend/replication/logical/tablesync.c | 61 ++- src/backend/replication/logical/worker.c | 40 +- src/backend/replication/repl_gram.y | 18 +- src/backend/replication/repl_scanner.l | 2 + src/backend/replication/slot.c | 155 ++++++- src/backend/replication/slotfuncs.c | 28 +- src/backend/replication/walreceiver.c | 2 +- src/backend/replication/walsender.c | 406 +++++++++++++++++- .../utils/activity/wait_event_names.txt | 1 + src/backend/utils/misc/guc_tables.c | 14 + src/backend/utils/misc/postgresql.conf.sample | 2 + src/bin/psql/describe.c | 9 +- src/bin/psql/tab-complete.c | 3 +- src/include/catalog/pg_proc.dat | 14 +- src/include/catalog/pg_subscription.h | 7 + src/include/nodes/replnodes.h | 12 + src/include/replication/slot.h | 17 +- src/include/replication/walreceiver.h | 18 +- src/include/replication/walsender.h | 4 + src/include/replication/walsender_private.h | 2 + src/include/replication/worker_internal.h | 4 +- src/include/utils/guc_hooks.h | 2 + src/test/recovery/meson.build | 1 + src/test/recovery/t/006_logical_decoding.pl | 3 +- src/test/recovery/t/050_verify_slot_order.pl | 145 +++++++ src/test/regress/expected/rules.out | 5 +- src/test/regress/expected/subscription.out | 152 +++---- src/tools/pgindent/typedefs.list | 2 + 43 files changed, 1275 insertions(+), 167 deletions(-) create mode 100644 src/test/recovery/t/050_verify_slot_order.pl diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out index 63a9940f73..1c055f329c 100644 --- a/contrib/test_decoding/expected/slot.out +++ b/contrib/test_decoding/expected/slot.out @@ -406,3 +406,22 @@ SELECT pg_drop_replication_slot('copied_slot2_notemp'); (1 row) +-- Test logical slots creation with 'failover'=true (last arg) +SELECT 'init' FROM pg_create_logical_replication_slot('failover_slot', 'test_decoding', false, false, true); + ?column? +---------- + init +(1 row) + +SELECT slot_name, slot_type, failover FROM pg_replication_slots; + slot_name | slot_type | failover +---------------+-----------+---------- + failover_slot | logical | t +(1 row) + +SELECT pg_drop_replication_slot('failover_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql index 1aa27c5667..1133e45abb 100644 --- a/contrib/test_decoding/sql/slot.sql +++ b/contrib/test_decoding/sql/slot.sql @@ -176,3 +176,9 @@ ORDER BY o.slot_name, c.slot_name; SELECT pg_drop_replication_slot('orig_slot2'); SELECT pg_drop_replication_slot('copied_slot2_no_change'); SELECT pg_drop_replication_slot('copied_slot2_notemp'); + +-- Test logical slots creation with 'failover'=true (last arg) +SELECT 'init' FROM pg_create_logical_replication_slot('failover_slot', 'test_decoding', false, false, true); +SELECT slot_name, slot_type, failover FROM pg_replication_slots; + +SELECT pg_drop_replication_slot('failover_slot'); diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 3ec7391ec5..e666730c64 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -7990,6 +7990,18 @@ SCRAM-SHA-256$<iteration count>:&l + + + subfailoverstate char + + + State codes for failover mode: + d = disabled, + p = pending enablement, + e = enabled + + + subconninfo text diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index bd70ff2e4b..7136a925a9 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4344,6 +4344,28 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows + + standby_slot_names (string) + + standby_slot_names configuration parameter + + + + + List of physical replication slots that logical replication slots with + failover enabled waits for. If a logical replication connection is + meant to switch to a physical standby after the standby is promoted, + the physical replication slot for the standby should be listed here. + + + The standbys corresponding to the physical replication slots in + standby_slot_names must enable + enable_syncslot for the standbys to receive + failover logical slots changes from the primary. + + + + diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 5d5ad7ee6a..5bb6ec4c07 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -27475,7 +27475,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset pg_create_logical_replication_slot - pg_create_logical_replication_slot ( slot_name name, plugin name , temporary boolean, twophase boolean ) + pg_create_logical_replication_slot ( slot_name name, plugin name , temporary boolean, twophase boolean, failover boolean ) record ( slot_name name, lsn pg_lsn ) @@ -27490,8 +27490,13 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset released upon any error. The optional fourth parameter, twophase, when set to true, specifies that the decoding of prepared transactions is enabled for this - slot. A call to this function has the same effect as the replication - protocol command CREATE_REPLICATION_SLOT ... LOGICAL. + slot. The optional fifth parameter, + failover, when set to true, + specifies that this slot is enabled to be synced to the + physical standbys so that logical replication can be resumed + after failover. A call to this function has the same effect as + the replication protocol command + CREATE_REPLICATION_SLOT ... LOGICAL. diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index af3f016f74..900833c3ea 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2060,6 +2060,16 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" + + + FAILOVER { 'true' | 'false' } + + + If true, the slot is enabled to be synced to the physical + standbys so that logical replication can be resumed after failover. + + + @@ -2124,6 +2134,47 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" + + ALTER_REPLICATION_SLOT slot_name ( option [, ...] ) + ALTER_REPLICATION_SLOT + + + + Change the definition of a replication slot. + See for more about + replication slots. This command is currently only supported for logical + replication slots. + + + + + slot_name + + + The name of the slot to alter. Must be a valid replication slot + name (see ). + + + + + + The following options are supported: + + + + FAILOVER { 'true' | 'false' } + + + If true, the slot is enabled to be synced to the physical + standbys so that logical replication can be resumed after failover. + + + + + + + + READ_REPLICATION_SLOT slot_name READ_REPLICATION_SLOT diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 6d36ff0dc9..e4fad5c55c 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -73,10 +73,13 @@ ALTER SUBSCRIPTION name RENAME TO < These commands also cannot be executed when the subscription has two_phase - commit enabled, unless + commit enabled or + failover + enabled, unless copy_data is false. See column subtwophasestate - of pg_subscription + and subfailoverstate of + pg_subscription to know the actual two-phase state. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index f1c20b3a46..c7b1d7b3c0 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -399,6 +399,30 @@ CREATE SUBSCRIPTION subscription_name + + + failover (boolean) + + + Specifies whether the replication slot assocaited with the subscription + is enabled to be synced to the physical standbys so that logical + replication can be resumed from the new primary after failover. + The default is false. + + + + The implementation of failover requires that replication + has successfully finished the initial table synchronization + phase. So even when failover is enabled for a + subscription, the internal failover state remains + temporarily pending until the initialization phase + completes. See column subfailoverstate + of pg_subscription + to know the actual failover state. + + + + diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index 7078491c4c..7ea08942c4 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2532,6 +2532,17 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx invalidated). Always NULL for physical slots. + + + + failover bool + + + True if this logical slot is enabled to be synced to the physical + standbys so that logical replication can be resumed from the new primary + after failover. Always false for physical slots. + + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index d6a978f136..18512955ad 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -73,6 +73,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->disableonerr = subform->subdisableonerr; sub->passwordrequired = subform->subpasswordrequired; sub->runasowner = subform->subrunasowner; + sub->failoverstate = subform->subfailoverstate; /* Get conninfo */ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index 35d738d576..24ad69c333 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -479,6 +479,7 @@ CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot( IN slot_name name, IN plugin name, IN temporary boolean DEFAULT false, IN twophase boolean DEFAULT false, + IN failover boolean DEFAULT false, OUT slot_name name, OUT lsn pg_lsn) RETURNS RECORD LANGUAGE INTERNAL diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index b65f6b5249..9c595ca3c9 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1002,7 +1002,8 @@ CREATE VIEW pg_replication_slots AS L.wal_status, L.safe_wal_size, L.two_phase, - L.conflicting + L.conflicting, + L.failover FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index edc82c11be..4f4cedc1fd 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -71,6 +71,7 @@ #define SUBOPT_RUN_AS_OWNER 0x00001000 #define SUBOPT_LSN 0x00002000 #define SUBOPT_ORIGIN 0x00004000 +#define SUBOPT_FAILOVER 0x00008000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -96,6 +97,7 @@ typedef struct SubOpts bool passwordrequired; bool runasowner; char *origin; + bool failover; XLogRecPtr lsn; } SubOpts; @@ -157,6 +159,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->runasowner = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); + if (IsSet(supported_opts, SUBOPT_FAILOVER)) + opts->failover = false; /* Parse options */ foreach(lc, stmt_options) @@ -326,6 +330,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("unrecognized origin value: \"%s\"", opts->origin)); } + else if (IsSet(supported_opts, SUBOPT_FAILOVER) && + strcmp(defel->defname, "failover") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_FAILOVER)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_FAILOVER; + opts->failover = defGetBoolean(defel); + } else if (IsSet(supported_opts, SUBOPT_LSN) && strcmp(defel->defname, "lsn") == 0) { @@ -591,7 +604,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN | + SUBOPT_FAILOVER); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -710,6 +724,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, publicationListToArray(publications); values[Anum_pg_subscription_suborigin - 1] = CStringGetTextDatum(opts.origin); + values[Anum_pg_subscription_subfailoverstate - 1] = + CharGetDatum(opts.failover ? + LOGICALREP_FAILOVER_STATE_PENDING : + LOGICALREP_FAILOVER_STATE_DISABLED); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -746,6 +764,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, PG_TRY(); { + bool failover_enabled = false; + check_publications(wrconn, publications); check_publications_origin(wrconn, publications, opts.copy_data, opts.origin, NULL, 0, stmt->subname); @@ -776,6 +796,19 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, InvalidXLogRecPtr); } + /* + * Even if failover is set, don't create the slot with failover + * enabled. Will enable it once all the tables are synced and + * ready. The intention is that if failover happens at the time of + * table-sync, user should re-launch the subscription instead of + * relying on main slot (if synced) with no table-sync data + * present. When the subscription has no tables, leave failover as + * false to allow ALTER SUBSCRIPTION ... REFRESH PUBLICATION to + * work. + */ + if (opts.failover && !opts.copy_data && tables != NIL) + failover_enabled = true; + /* * If requested, create permanent slot for the subscription. We * won't use the initial snapshot for anything, so no need to @@ -807,15 +840,32 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, twophase_enabled = true; walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, - CRS_NOEXPORT_SNAPSHOT, NULL); - - if (twophase_enabled) - UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED); - + failover_enabled, CRS_NOEXPORT_SNAPSHOT, NULL); + + /* Update twophase and/or failover state */ + if (twophase_enabled || failover_enabled) + UpdateTwoPhaseFailoverStates(subid, + twophase_enabled, + LOGICALREP_TWOPHASE_STATE_ENABLED, + failover_enabled, + LOGICALREP_FAILOVER_STATE_ENABLED); ereport(NOTICE, (errmsg("created replication slot \"%s\" on publisher", opts.slot_name))); } + + /* + * If only the slot_name is specified, it is possible that the user intends to + * use an existing slot on the publisher, so here we enable failover for the + * slot if requested. + */ + else if (opts.slot_name && failover_enabled) + { + walrcv_alter_slot(wrconn, opts.slot_name, opts.failover); + ereport(NOTICE, + (errmsg("enabled failover for replication slot \"%s\" on publisher", + opts.slot_name))); + } } PG_FINALLY(); { @@ -1288,6 +1338,12 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"), errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION."))); + if (sub->failoverstate == LOGICALREP_FAILOVER_STATE_ENABLED && opts.copy_data) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when failover is enabled"), + errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION."))); + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh"); /* Make sure refresh sees the new list of publications. */ @@ -1347,6 +1403,16 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, "ALTER SUBSCRIPTION ... ADD PUBLICATION" : "ALTER SUBSCRIPTION ... DROP PUBLICATION"))); + if (sub->failoverstate == LOGICALREP_FAILOVER_STATE_ENABLED && opts.copy_data) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when failover is enabled"), + /* translator: %s is an SQL ALTER command */ + errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.", + isadd ? + "ALTER SUBSCRIPTION ... ADD PUBLICATION" : + "ALTER SUBSCRIPTION ... DROP PUBLICATION"))); + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh"); /* Refresh the new list of publications. */ @@ -1392,6 +1458,16 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"), errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false, or use DROP/CREATE SUBSCRIPTION."))); + /* + * See comments above for twophasestate, same holds true for + * 'failover' + */ + if (sub->failoverstate == LOGICALREP_FAILOVER_STATE_ENABLED && opts.copy_data) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when failover is enabled"), + errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false, or use DROP/CREATE SUBSCRIPTION."))); + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); AlterSubscription_refresh(sub, opts.copy_data, NULL); diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 60d5c1fc40..336c2bec99 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -74,8 +74,11 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, + bool failover, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); +static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, + bool failover); static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn); static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn, const char *query, @@ -96,6 +99,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { .walrcv_receive = libpqrcv_receive, .walrcv_send = libpqrcv_send, .walrcv_create_slot = libpqrcv_create_slot, + .walrcv_alter_slot = libpqrcv_alter_slot, .walrcv_get_backend_pid = libpqrcv_get_backend_pid, .walrcv_exec = libpqrcv_exec, .walrcv_disconnect = libpqrcv_disconnect @@ -883,8 +887,8 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes) */ static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, - bool temporary, bool two_phase, CRSSnapshotAction snapshot_action, - XLogRecPtr *lsn) + bool temporary, bool two_phase, bool failover, + CRSSnapshotAction snapshot_action, XLogRecPtr *lsn) { PGresult *res; StringInfoData cmd; @@ -913,7 +917,14 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, else appendStringInfoChar(&cmd, ' '); } - + if (failover) + { + appendStringInfoString(&cmd, "FAILOVER"); + if (use_new_options_syntax) + appendStringInfoString(&cmd, ", "); + else + appendStringInfoChar(&cmd, ' '); + } if (use_new_options_syntax) { switch (snapshot_action) @@ -982,6 +993,33 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, return snapshot; } +/* + * Change the definition of the replication slot. + */ +static void +libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, + bool failover) +{ + StringInfoData cmd; + PGresult *res; + + initStringInfo(&cmd); + appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s )", + quote_identifier(slotname), + failover ? "true" : "false"); + + res = libpqrcv_PQexec(conn->streamConn, cmd.data); + pfree(cmd.data); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("could not alter replication slot \"%s\" on publisher: %s", + slotname, pchomp(PQerrorMessage(conn->streamConn))))); + + PQclear(res); +} + /* * Return PID of remote backend process. */ diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 1067aca08f..cf22f7aa43 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -30,6 +30,7 @@ #include "replication/decode.h" #include "replication/logical.h" #include "replication/message.h" +#include "replication/walsender.h" #include "storage/fd.h" #include "utils/array.h" #include "utils/builtins.h" @@ -109,6 +110,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin MemoryContext per_query_ctx; MemoryContext oldcontext; XLogRecPtr end_of_wal; + XLogRecPtr wal_to_wait; LogicalDecodingContext *ctx; ResourceOwner old_resowner = CurrentResourceOwner; ArrayType *arr; @@ -228,6 +230,17 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin NameStr(MyReplicationSlot->data.plugin), format_procedure(fcinfo->flinfo->fn_oid)))); + if (XLogRecPtrIsInvalid(upto_lsn)) + wal_to_wait = end_of_wal; + else + wal_to_wait = Min(upto_lsn, end_of_wal); + + /* + * Wait for specified streaming replication standby servers (if any) + * to confirm receipt of WAL upto wal_to_wait. + */ + WalSndWaitForStandbyConfirmation(wal_to_wait); + ctx->output_writer_private = p; /* diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 37a0abe2f4..7036096653 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -614,15 +614,32 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * Note: If the subscription has no tables then leave the state as * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to * work. + * + * Same goes for 'failover'. Enable it only if subscription has tables + * and all the tablesyncs have reached READY state. */ - if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING) + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING || + MySubscription->failoverstate == LOGICALREP_FAILOVER_STATE_PENDING) { CommandCounterIncrement(); /* make updates visible */ if (AllTablesyncsReady()) { + char buf[100]; + + buf[0] = '\0'; + + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING) + strcat(buf, "twophase"); + if (MySubscription->failoverstate == LOGICALREP_FAILOVER_STATE_PENDING) + { + if (buf[0] != '\0') + strcat(buf, " and "); + strcat(buf, "failover"); + } + ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled", - MySubscription->name))); + (errmsg("logical replication apply worker for subscription \"%s\" will restart so that %s can be enabled", + MySubscription->name, buf))); should_exit = true; } } @@ -1412,7 +1429,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) */ walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ , false /* two_phase */ , - CRS_USE_SNAPSHOT, origin_startpos); + false /* failover */ , CRS_USE_SNAPSHOT, + origin_startpos); /* * Setup replication origin tracking. The purpose of doing this before the @@ -1714,10 +1732,13 @@ AllTablesyncsReady(void) } /* - * Update the two_phase state of the specified subscription in pg_subscription. + * Update the twophase and/or failover state of the specified subscription + * in pg_subscription. */ void -UpdateTwoPhaseState(Oid suboid, char new_state) +UpdateTwoPhaseFailoverStates(Oid suboid, + bool update_twophase, char new_state_twophase, + bool update_failover, char new_state_failover) { Relation rel; HeapTuple tup; @@ -1725,9 +1746,15 @@ UpdateTwoPhaseState(Oid suboid, char new_state) bool replaces[Natts_pg_subscription]; Datum values[Natts_pg_subscription]; - Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED || - new_state == LOGICALREP_TWOPHASE_STATE_PENDING || - new_state == LOGICALREP_TWOPHASE_STATE_ENABLED); + if (update_twophase) + Assert(new_state_twophase == LOGICALREP_TWOPHASE_STATE_DISABLED || + new_state_twophase == LOGICALREP_TWOPHASE_STATE_PENDING || + new_state_twophase == LOGICALREP_TWOPHASE_STATE_ENABLED); + + if (update_failover) + Assert(new_state_failover == LOGICALREP_FAILOVER_STATE_DISABLED || + new_state_failover == LOGICALREP_FAILOVER_STATE_PENDING || + new_state_failover == LOGICALREP_FAILOVER_STATE_ENABLED); rel = table_open(SubscriptionRelationId, RowExclusiveLock); tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid)); @@ -1741,9 +1768,19 @@ UpdateTwoPhaseState(Oid suboid, char new_state) memset(nulls, false, sizeof(nulls)); memset(replaces, false, sizeof(replaces)); - /* And update/set two_phase state */ - values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state); - replaces[Anum_pg_subscription_subtwophasestate - 1] = true; + /* Update/set two_phase state if asked by the caller */ + if (update_twophase) + { + values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state_twophase); + replaces[Anum_pg_subscription_subtwophasestate - 1] = true; + } + + /* Update/set failover state if asked by the caller */ + if (update_failover) + { + values[Anum_pg_subscription_subfailoverstate - 1] = CharGetDatum(new_state_failover); + replaces[Anum_pg_subscription_subfailoverstate - 1] = true; + } tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, replaces); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ba67eb156f..8f3f353fa3 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3949,6 +3949,7 @@ maybe_reread_subscription(void) newsub->passwordrequired != MySubscription->passwordrequired || strcmp(newsub->origin, MySubscription->origin) != 0 || newsub->owner != MySubscription->owner || + newsub->failoverstate != MySubscription->failoverstate || !equal(newsub->publications, MySubscription->publications)) { if (am_parallel_apply_worker()) @@ -4484,6 +4485,8 @@ run_apply_worker() TimeLineID startpointTLI; char *err; bool must_use_password; + bool twophase_pending; + bool failover_pending; slotname = MySubscription->slotname; @@ -4541,16 +4544,37 @@ run_apply_worker() * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to * work. */ - if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && - AllTablesyncsReady()) + twophase_pending = (MySubscription->twophasestate + == LOGICALREP_TWOPHASE_STATE_PENDING) ? true : false; + failover_pending = (MySubscription->failoverstate + == LOGICALREP_FAILOVER_STATE_PENDING) ? true : false; + + if ((twophase_pending || failover_pending) && AllTablesyncsReady()) { /* Start streaming with two_phase enabled */ - options.proto.logical.twophase = true; + if (twophase_pending) + options.proto.logical.twophase = true; + + if (failover_pending) + walrcv_alter_slot(LogRepWorkerWalRcvConn, slotname, true); + walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); StartTransactionCommand(); - UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED); - MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED; + + /* Update twophase and/or failover */ + if (twophase_pending || failover_pending) + UpdateTwoPhaseFailoverStates(MySubscription->oid, + twophase_pending, + LOGICALREP_TWOPHASE_STATE_ENABLED, + failover_pending, + LOGICALREP_FAILOVER_STATE_ENABLED); + if (twophase_pending) + MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED; + + if (failover_pending) + MySubscription->failoverstate = LOGICALREP_FAILOVER_STATE_ENABLED; + CommitTransactionCommand(); } else @@ -4559,11 +4583,15 @@ run_apply_worker() } ereport(DEBUG1, - (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s", + (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s and failover is %s", MySubscription->name, MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" : + "?", + MySubscription->failoverstate == LOGICALREP_FAILOVER_STATE_DISABLED ? "DISABLED" : + MySubscription->failoverstate == LOGICALREP_FAILOVER_STATE_PENDING ? "PENDING" : + MySubscription->failoverstate == LOGICALREP_FAILOVER_STATE_ENABLED ? "ENABLED" : "?"))); /* Run the main loop. */ diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index 0c874e33cf..b706046811 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -64,6 +64,7 @@ Node *replication_parse_result; %token K_START_REPLICATION %token K_CREATE_REPLICATION_SLOT %token K_DROP_REPLICATION_SLOT +%token K_ALTER_REPLICATION_SLOT %token K_TIMELINE_HISTORY %token K_WAIT %token K_TIMELINE @@ -79,7 +80,8 @@ Node *replication_parse_result; %type command %type base_backup start_replication start_logical_replication - create_replication_slot drop_replication_slot identify_system + create_replication_slot drop_replication_slot + alter_replication_slot identify_system read_replication_slot timeline_history show %type generic_option_list %type generic_option @@ -111,6 +113,7 @@ command: | start_logical_replication | create_replication_slot | drop_replication_slot + | alter_replication_slot | read_replication_slot | timeline_history | show @@ -257,6 +260,18 @@ drop_replication_slot: } ; +/* ALTER_REPLICATION_SLOT slot */ +alter_replication_slot: + K_ALTER_REPLICATION_SLOT IDENT '(' generic_option_list ')' + { + AlterReplicationSlotCmd *cmd; + cmd = makeNode(AlterReplicationSlotCmd); + cmd->slotname = $2; + cmd->options = $4; + $$ = (Node *) cmd; + } + ; + /* * START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d] */ @@ -399,6 +414,7 @@ ident_or_keyword: | K_START_REPLICATION { $$ = "start_replication"; } | K_CREATE_REPLICATION_SLOT { $$ = "create_replication_slot"; } | K_DROP_REPLICATION_SLOT { $$ = "drop_replication_slot"; } + | K_ALTER_REPLICATION_SLOT { $$ = "alter_replication_slot"; } | K_TIMELINE_HISTORY { $$ = "timeline_history"; } | K_WAIT { $$ = "wait"; } | K_TIMELINE { $$ = "timeline"; } diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index 1cc7fb858c..0b5ae23195 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -125,6 +125,7 @@ TIMELINE { return K_TIMELINE; } START_REPLICATION { return K_START_REPLICATION; } CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; } DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; } +ALTER_REPLICATION_SLOT { return K_ALTER_REPLICATION_SLOT; } TIMELINE_HISTORY { return K_TIMELINE_HISTORY; } PHYSICAL { return K_PHYSICAL; } RESERVE_WAL { return K_RESERVE_WAL; } @@ -301,6 +302,7 @@ replication_scanner_is_replication_command(void) case K_START_REPLICATION: case K_CREATE_REPLICATION_SLOT: case K_DROP_REPLICATION_SLOT: + case K_ALTER_REPLICATION_SLOT: case K_READ_REPLICATION_SLOT: case K_TIMELINE_HISTORY: case K_SHOW: diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 99823df3c7..e9babb69d4 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -52,6 +52,8 @@ #include "storage/proc.h" #include "storage/procarray.h" #include "utils/builtins.h" +#include "utils/guc_hooks.h" +#include "utils/varlena.h" /* * Replication slot on-disk data structure. @@ -90,7 +92,7 @@ typedef struct ReplicationSlotOnDisk sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize #define SLOT_MAGIC 0x1051CA1 /* format identifier */ -#define SLOT_VERSION 3 /* version for new files */ +#define SLOT_VERSION 4 /* version for new files */ /* Control array for replication slot management */ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; @@ -98,9 +100,11 @@ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; /* My backend's replication slot in the shared memory array */ ReplicationSlot *MyReplicationSlot = NULL; -/* GUC variable */ +/* GUC variables */ int max_replication_slots = 10; /* the maximum number of replication * slots */ +char *standby_slot_names; +List *standby_slot_names_list = NIL; static void ReplicationSlotShmemExit(int code, Datum arg); static void ReplicationSlotDropAcquired(void); @@ -251,7 +255,8 @@ ReplicationSlotValidateName(const char *name, int elevel) */ void ReplicationSlotCreate(const char *name, bool db_specific, - ReplicationSlotPersistency persistency, bool two_phase) + ReplicationSlotPersistency persistency, + bool two_phase, bool failover) { ReplicationSlot *slot = NULL; int i; @@ -311,6 +316,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.persistency = persistency; slot->data.two_phase = two_phase; slot->data.two_phase_at = InvalidXLogRecPtr; + slot->data.failover = failover; /* and then data only present in shared memory */ slot->just_dirtied = false; @@ -649,6 +655,31 @@ ReplicationSlotDrop(const char *name, bool nowait) ReplicationSlotDropAcquired(); } +/* + * Change the definition of the slot identified by the passed in name. + */ +void +ReplicationSlotAlter(const char *name, bool failover) +{ + Assert(MyReplicationSlot == NULL); + + ReplicationSlotAcquire(name, true); + + if (SlotIsPhysical(MyReplicationSlot)) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use %s with a physical replication slot", + "ALTER_REPLICATION_SLOT")); + + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->data.failover = failover; + SpinLockRelease(&MyReplicationSlot->mutex); + + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + ReplicationSlotRelease(); +} + /* * Permanently drop the currently acquired replication slot. */ @@ -2135,3 +2166,121 @@ RestoreSlotFromDisk(const char *name) (errmsg("too many replication slots active before shutdown"), errhint("Increase max_replication_slots and try again."))); } + +/* + * A helper function to validate slots specified in standby_slot_names GUCs. + */ +static bool +validate_standby_slots(char **newval) +{ + char *rawname; + List *elemlist; + ListCell *lc; + + /* Need a modifiable copy of string */ + rawname = pstrdup(*newval); + + /* Verify syntax and parse string into list of identifiers */ + if (!SplitIdentifierString(rawname, ',', &elemlist)) + { + /* syntax error in name list */ + GUC_check_errdetail("List syntax is invalid."); + pfree(rawname); + list_free(elemlist); + return false; + } + + /* + * Verify 'type' of slot now. + * + * Skip check if replication slots' data is not initialized yet i.e. we + * are in startup process. + */ + if (!ReplicationSlotCtl) + return true; + + foreach(lc, elemlist) + { + char *name = lfirst(lc); + ReplicationSlot *slot; + + slot = SearchNamedReplicationSlot(name, true); + + if (!slot) + { + GUC_check_errdetail("replication slot \"%s\" does not exist", name); + list_free(elemlist); + return false; + } + + if (SlotIsLogical(slot)) + { + GUC_check_errdetail("cannot have logical replication slot \"%s\" " + "in this parameter", name); + list_free(elemlist); + return false; + } + } + + list_free(elemlist); + return true; +} + +/* + * GUC check_hook for standby_slot_names + */ +bool +check_standby_slot_names(char **newval, void **extra, GucSource source) +{ + if (strcmp(*newval, "") == 0) + return true; + + /* + * "*" is not accepted as in that case primary will not be able to know + * for which all standbys to wait for. Even if we have physical-slots + * info, there is no way to confirm whether there is any standby + * configured for the known physical slots. + */ + if (strcmp(*newval, "*") == 0) + { + GUC_check_errdetail("\"%s\" is not accepted for standby_slot_names", + *newval); + return false; + } + + /* Now verify if the specified slots really exist and have correct type */ + if (!validate_standby_slots(newval)) + return false; + + return true; +} + +/* + * Free the standby_slot_names_list. + */ +void +SlotSyncFreeConfig(void) +{ + list_free(standby_slot_names_list); + standby_slot_names_list = NIL; +} + +/* + * Initialize the list from raw standby_slot_names and cache it, + * in order to avoid parsing these repeatedly. Done at WALSender + * startup and after each SIGHUP. + */ +void +SlotSyncInitConfig(void) +{ + char *rawname; + + /* Free the previous allocation */ + SlotSyncFreeConfig(); + + if (strcmp(standby_slot_names, "") != 0) + { + rawname = pstrdup(standby_slot_names); + SplitIdentifierString(rawname, ',', &standby_slot_names_list); + } +} diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 4b694a03d0..3565ca196f 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -21,6 +21,7 @@ #include "replication/decode.h" #include "replication/logical.h" #include "replication/slot.h" +#include "replication/walsender.h" #include "utils/builtins.h" #include "utils/inval.h" #include "utils/pg_lsn.h" @@ -42,7 +43,8 @@ create_physical_replication_slot(char *name, bool immediately_reserve, /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(name, false, - temporary ? RS_TEMPORARY : RS_PERSISTENT, false); + temporary ? RS_TEMPORARY : RS_PERSISTENT, false, + false); if (immediately_reserve) { @@ -117,6 +119,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) static void create_logical_replication_slot(char *name, char *plugin, bool temporary, bool two_phase, + bool failover, XLogRecPtr restart_lsn, bool find_startpoint) { @@ -133,7 +136,8 @@ create_logical_replication_slot(char *name, char *plugin, * error as well. */ ReplicationSlotCreate(name, true, - temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase); + temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase, + failover); /* * Create logical decoding context to find start point or, if we don't @@ -171,6 +175,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) Name plugin = PG_GETARG_NAME(1); bool temporary = PG_GETARG_BOOL(2); bool two_phase = PG_GETARG_BOOL(3); + bool failover = PG_GETARG_BOOL(4); Datum result; TupleDesc tupdesc; HeapTuple tuple; @@ -188,6 +193,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) NameStr(*plugin), temporary, two_phase, + failover, InvalidXLogRecPtr, true); @@ -232,7 +238,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 15 +#define PG_GET_REPLICATION_SLOTS_COLS 16 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; @@ -412,6 +418,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) values[i++] = BoolGetDatum(false); } + values[i++] = BoolGetDatum(slot_contents.data.failover); + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, @@ -440,17 +448,8 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto) if (startlsn < moveto) { - SpinLockAcquire(&MyReplicationSlot->mutex); - MyReplicationSlot->data.restart_lsn = moveto; - SpinLockRelease(&MyReplicationSlot->mutex); + PhysicalConfirmReceivedLocation(moveto); retlsn = moveto; - - /* - * Dirty the slot so as it is written out at the next checkpoint. Note - * that the LSN position advanced may still be lost in the event of a - * crash, but this makes the data consistent after a clean shutdown. - */ - ReplicationSlotMarkDirty(); } return retlsn; @@ -679,6 +678,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) XLogRecPtr src_restart_lsn; bool src_islogical; bool temporary; + bool failover; char *plugin; Datum values[2]; bool nulls[2]; @@ -734,6 +734,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) src_islogical = SlotIsLogical(&first_slot_contents); src_restart_lsn = first_slot_contents.data.restart_lsn; temporary = (first_slot_contents.data.persistency == RS_TEMPORARY); + failover = first_slot_contents.data.failover; plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL; /* Check type of replication slot */ @@ -773,6 +774,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) plugin, temporary, false, + failover, src_restart_lsn, false); } diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index a3128874b2..8f4275f374 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -387,7 +387,7 @@ WalReceiverMain(void) "pg_walreceiver_%lld", (long long int) walrcv_get_backend_pid(wrconn)); - walrcv_create_slot(wrconn, slotname, true, false, 0, NULL); + walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL); SpinLockAcquire(&walrcv->mutex); strlcpy(walrcv->slotname, slotname, NAMEDATALEN); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index e250b0567e..eeb5ea6cfa 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -148,6 +148,12 @@ static TimeLineID sendTimeLineNextTLI = 0; static bool sendTimeLineIsHistoric = false; static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr; +/* + * The variable to store the current value of standby_slot_names before each + * ConfigReload. + */ +static char *StandbySlotNamesPreReload = NULL; + /* * How far have we sent WAL already? This is also advertised in * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.) @@ -247,7 +253,8 @@ static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr); static void WalSndKeepaliveIfNecessary(void); static void WalSndCheckTimeOut(void); static long WalSndComputeSleeptime(TimestampTz now); -static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event); +static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event, + bool wait_for_standby); static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, @@ -259,7 +266,7 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p); - +static bool WalSndSlotInList(char *slot_names, List *slot_names_list); /* Initialize walsender process before entering the main command loop */ void @@ -828,6 +835,7 @@ StartReplication(StartReplicationCmd *cmd) SpinLockRelease(&MyWalSnd->mutex); SyncRepInitConfig(); + SlotSyncInitConfig(); /* Main loop of walsender */ replication_active = true; @@ -974,12 +982,13 @@ static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, - bool *two_phase) + bool *two_phase, bool *failover) { ListCell *lc; bool snapshot_action_given = false; bool reserve_wal_given = false; bool two_phase_given = false; + bool failover_given = false; /* Parse options */ foreach(lc, cmd->options) @@ -1029,6 +1038,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, two_phase_given = true; *two_phase = defGetBoolean(defel); } + else if (strcmp(defel->defname, "failover") == 0) + { + if (failover_given || cmd->kind != REPLICATION_KIND_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + failover_given = true; + *failover = defGetBoolean(defel); + } else elog(ERROR, "unrecognized option: %s", defel->defname); } @@ -1045,6 +1063,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) char *slot_name; bool reserve_wal = false; bool two_phase = false; + bool failover = false; CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT; DestReceiver *dest; TupOutputState *tstate; @@ -1054,13 +1073,14 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) Assert(!MyReplicationSlot); - parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase); + parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase, + &failover); if (cmd->kind == REPLICATION_KIND_PHYSICAL) { ReplicationSlotCreate(cmd->slotname, false, cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, - false); + false, false); } else { @@ -1075,7 +1095,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL, - two_phase); + two_phase, failover); } if (cmd->kind == REPLICATION_KIND_LOGICAL) @@ -1246,6 +1266,46 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd) ReplicationSlotDrop(cmd->slotname, !cmd->wait); } +/* + * Process extra options given to ALTER_REPLICATION_SLOT. + */ +static void +parseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover) +{ + ListCell *lc; + bool failover_given = false; + + /* Parse options */ + foreach(lc, cmd->options) + { + DefElem *defel = (DefElem *) lfirst(lc); + + if (strcmp(defel->defname, "failover") == 0) + { + if (failover_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + failover_given = true; + *failover = defGetBoolean(defel); + } + else + elog(ERROR, "unrecognized option: %s", defel->defname); + } +} + +/* + * Change the definition of a replication slot. + */ +static void +AlterReplicationSlot(AlterReplicationSlotCmd *cmd) +{ + bool failover = false; + + parseAlterReplSlotOptions(cmd, &failover); + ReplicationSlotAlter(cmd->slotname, failover); +} + /* * Load previously initiated logical slot and prepare for sending data (via * WalSndLoop). @@ -1318,6 +1378,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) replication_active = true; SyncRepInitConfig(); + SlotSyncInitConfig(); /* Main loop of walsender */ WalSndLoop(XLogSendLogical); @@ -1435,7 +1496,7 @@ ProcessPendingWrites(void) /* Sleep until something happens or we time out */ WalSndWait(WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE, sleeptime, - WAIT_EVENT_WAL_SENDER_WRITE_DATA); + WAIT_EVENT_WAL_SENDER_WRITE_DATA, false); /* Clear any already-pending wakeups */ ResetLatch(MyLatch); @@ -1527,27 +1588,288 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId ProcessPendingWrites(); } +/* + * Does this Wal Sender need to wake up logical walsender. + * + * Check if the physical slot of this walsender is specified in + * standby_slot_names GUC. + */ +static bool +WalSndWakeupNeeded() +{ + bool inlist = false; + + Assert(MyReplicationSlot != NULL); + Assert(SlotIsPhysical(MyReplicationSlot)); + + /* + * Initialize the slot list. This is needed when it is called outside of + * the walsender. + */ + if (!am_walsender) + SlotSyncInitConfig(); + + inlist = WalSndSlotInList(standby_slot_names, standby_slot_names_list); + + /* + * The memory context used to allocate standby_slot_names_list in the + * non-walsender case will be freed at the end of this call. So free and + * nullify the list to avoid usage of freed list in the next call. + */ + if (!am_walsender) + SlotSyncFreeConfig(); + + return inlist; +} + +/* + * Helper function for WalSndWakeupNeeded. + */ +static bool +WalSndSlotInList(char *slot_names, List *slot_names_list) +{ + ListCell *l; + bool inlist = false; + + if (strcmp(standby_slot_names, "") == 0) + return false; + + /* Special handling for "*" which means all. */ + if (strcmp(slot_names, "*") == 0) + return true; + + foreach(l, slot_names_list) + { + char *name = lfirst(l); + + if (strcmp(name, NameStr(MyReplicationSlot->data.name)) == 0) + { + inlist = true; + break; + } + } + + return inlist; +} + +/* + * Assigns a copy of the standby_slot_names_list to the specified standby_slots + * pointer if there is any change in the GUC standby_slot_names or if the force + * flag is set to true. + * + * If the failover flag of the current replication slot is false, the function + * returns without making any changes. + */ +static void +WalSndGetStandbySlots(List **standby_slots, bool force) +{ + if (!MyReplicationSlot->data.failover) + return; + + if (standby_slot_names_list == NIL && strcmp(standby_slot_names, "") != 0) + SlotSyncInitConfig(); + + if (force || StandbySlotNamesPreReload == NULL || + strcmp(StandbySlotNamesPreReload, standby_slot_names) != 0) + { + list_free(*standby_slots); + + if (StandbySlotNamesPreReload) + pfree(StandbySlotNamesPreReload); + + StandbySlotNamesPreReload = pstrdup(standby_slot_names); + *standby_slots = list_copy(standby_slot_names_list); + } +} + +/* + * Filter the standby slots based on the specified log sequence number + * (wait_for_lsn). + * + * This function updates the passed standby_slots list, removing any slots that + * have already caught up to or surpassed the given wait_for_lsn. + */ +static void +WalSndFilterStandbySlots(XLogRecPtr wait_for_lsn, List **standby_slots) +{ + ListCell *lc; + List *standby_slots_cpy = *standby_slots; + + foreach(lc, standby_slots_cpy) + { + char *name = lfirst(lc); + XLogRecPtr restart_lsn = InvalidXLogRecPtr; + bool invalidated = false; + char *warningfmt = NULL; + ReplicationSlot *slot; + + slot = SearchNamedReplicationSlot(name, true); + + if (slot && SlotIsPhysical(slot)) + { + SpinLockAcquire(&slot->mutex); + restart_lsn = slot->data.restart_lsn; + invalidated = slot->data.invalidated != RS_INVAL_NONE; + SpinLockRelease(&slot->mutex); + } + + /* Continue if the current slot hasn't caught up. */ + if (!invalidated && !XLogRecPtrIsInvalid(restart_lsn) && + restart_lsn < wait_for_lsn) + { + /* Log warning if no active_pid for this physical slot */ + if (slot->active_pid == 0) + ereport(WARNING, + errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid", + name, "standby_slot_names"), + errdetail("Logical replication is waiting on the " + "standby associated with \"%s\"", name), + errhint("Consider starting standby associated with " + "\"%s\" or amend standby_slot_names", name)); + + continue; + } + + /* + * It may happen that the slot specified in standby_slot_names GUC + * value is dropped, so let's skip over it. + */ + else if (!slot) + warningfmt = _("replication slot \"%s\" specified in parameter \"%s\" does not exist, ignoring"); + + /* + * If logical slot name is given in standby_slot_names, give WARNING + * and skip it. Since it is harmless, so WARNING should be enough, no + * need to error-out. + */ + else if (SlotIsLogical(slot)) + warningfmt = _("cannot have logical replication slot \"%s\" in parameter \"%s\", ignoring"); + + /* + * Specified physical slot may have been invalidated, so no point in + * waiting for it. + */ + else if (XLogRecPtrIsInvalid(restart_lsn) || invalidated) + warningfmt = _("physical slot \"%s\" specified in parameter \"%s\" has been invalidated, ignoring"); + else + Assert(restart_lsn >= wait_for_lsn); + + /* + * Reaching here indicates that either the slot has passed the + * wait_for_lsn or there is an issue with the slot that requires a + * warning to be reported. + */ + if (warningfmt) + ereport(WARNING, errmsg(warningfmt, name, "standby_slot_names")); + + standby_slots_cpy = foreach_delete_current(standby_slots_cpy, lc); + } + + *standby_slots = standby_slots_cpy; +} + +/* + * Wait for physical standby to confirm receiving given lsn. + * + * Here logical walsender associated with failover logical slot waits + * for physical standbys corresponding to physical slots specified in + * standby_slot_names GUC. + */ +void +WalSndWaitForStandbyConfirmation(XLogRecPtr wait_for_lsn) +{ + List *standby_slot_cpy = NIL; + + Assert(!am_walsender); + + if (!MyReplicationSlot->data.failover) + return; + + WalSndGetStandbySlots(&standby_slot_cpy, true); + + ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv); + + for (;;) + { + long sleeptime = -1; + + WalSndFilterStandbySlots(wait_for_lsn, &standby_slot_cpy); + + /* Exit if done waiting for every slot. */ + if (standby_slot_cpy == NIL) + break; + + CHECK_FOR_INTERRUPTS(); + + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + SlotSyncInitConfig(); + WalSndGetStandbySlots(&standby_slot_cpy, false); + } + + sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); + + ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, sleeptime, + WAIT_EVENT_WAL_SENDER_WAIT_FOR_STANDBY_CONFIRMATION); + } + + ConditionVariableCancelSleep(); + + SlotSyncFreeConfig(); + + list_free(standby_slot_cpy); + + if (StandbySlotNamesPreReload) + pfree(StandbySlotNamesPreReload); +} + /* * Wait till WAL < loc is flushed to disk so it can be safely sent to client. * - * Returns end LSN of flushed WAL. Normally this will be >= loc, but - * if we detect a shutdown request (either from postmaster or client) - * we will return early, so caller must always check. + * If the walsender holds a logical slot that has enabled failover, the + * function also waits for all the specified streaming replication standby + * servers to confirm receipt of WAL upto RecentFlushPtr. + * + * Returns end LSN of flushed WAL. Normally this will be >= loc, but if we + * detect a shutdown request (either from postmaster or client) we will return + * early, so caller must always check. */ static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc) { int wakeEvents; + bool wait_for_standby = false; + uint32 wait_event; + List *standby_slots = NIL; static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr; + WalSndGetStandbySlots(&standby_slots, true); + /* - * Fast path to avoid acquiring the spinlock in case we already know we - * have enough WAL available. This is particularly interesting if we're - * far behind. + * Check if all the standby servers have confirmed receipt of WAL upto + * RecentFlushPtr if we already know we have enough WAL available. + * + * Note that we cannot directly return without checking the status of + * standby servers because the standby_slot_names may have changed, which + * means there could be new standby slots in the list that have not yet + * caught up to the RecentFlushPtr. */ if (RecentFlushPtr != InvalidXLogRecPtr && loc <= RecentFlushPtr) - return RecentFlushPtr; + { + WalSndFilterStandbySlots(RecentFlushPtr, &standby_slots); + + /* + * Fast path to entering the loop in case we already know we have + * enough WAL available and all the standby servers has confirmed + * receipt of WAL upto RecentFlushPtr. This is particularly + * interesting if we're far behind. + */ + if (standby_slots == NIL) + return RecentFlushPtr; + } /* Get a more recent flush pointer. */ if (!RecoveryInProgress()) @@ -1570,6 +1892,8 @@ WalSndWaitForWal(XLogRecPtr loc) ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); SyncRepInitConfig(); + SlotSyncInitConfig(); + WalSndGetStandbySlots(&standby_slots, false); } /* Check for input from the client */ @@ -1583,8 +1907,18 @@ WalSndWaitForWal(XLogRecPtr loc) if (got_STOPPING) XLogBackgroundFlush(); + /* + * Update the standby slots that have not yet caught up to the flushed + * position. It is good to wait upto RecentFlushPtr and then let it + * send the changes to logical subscribers one by one which are + * already covered in RecentFlushPtr without needing to wait on every + * change for standby confirmation. + */ + if (wait_for_standby) + WalSndFilterStandbySlots(RecentFlushPtr, &standby_slots); + /* Update our idea of the currently flushed position. */ - if (!RecoveryInProgress()) + else if (!RecoveryInProgress()) RecentFlushPtr = GetFlushRecPtr(NULL); else RecentFlushPtr = GetXLogReplayRecPtr(NULL); @@ -1612,8 +1946,14 @@ WalSndWaitForWal(XLogRecPtr loc) !waiting_for_ping_response) WalSndKeepalive(false, InvalidXLogRecPtr); - /* check whether we're done */ - if (loc <= RecentFlushPtr) + if (loc > RecentFlushPtr) + wait_event = WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL; + else if (standby_slots) + { + wait_event = WAIT_EVENT_WAL_SENDER_WAIT_FOR_STANDBY_CONFIRMATION; + wait_for_standby = true; + } + else break; /* Waiting for new WAL. Since we need to wait, we're now caught up. */ @@ -1654,9 +1994,14 @@ WalSndWaitForWal(XLogRecPtr loc) if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; - WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL); + WalSndWait(wakeEvents, sleeptime, wait_event, wait_for_standby); } + list_free(standby_slots); + + if (StandbySlotNamesPreReload) + pfree(StandbySlotNamesPreReload); + /* reactivate latch so WalSndLoop knows to continue */ SetLatch(MyLatch); return RecentFlushPtr; @@ -1819,6 +2164,13 @@ exec_replication_command(const char *cmd_string) EndReplicationCommand(cmdtag); break; + case T_AlterReplicationSlotCmd: + cmdtag = "ALTER_REPLICATION_SLOT"; + set_ps_display(cmdtag); + AlterReplicationSlot((AlterReplicationSlotCmd *) cmd_node); + EndReplicationCommand(cmdtag); + break; + case T_StartReplicationCmd: { StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node; @@ -2030,7 +2382,7 @@ ProcessStandbyMessage(void) /* * Remember that a walreceiver just confirmed receipt of lsn `lsn`. */ -static void +void PhysicalConfirmReceivedLocation(XLogRecPtr lsn) { bool changed = false; @@ -2049,6 +2401,9 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn) { ReplicationSlotMarkDirty(); ReplicationSlotsComputeRequiredLSN(); + + if (WalSndWakeupNeeded()) + ConditionVariableBroadcast(&WalSndCtl->wal_confirm_rcv_cv); } /* @@ -2469,6 +2824,7 @@ WalSndLoop(WalSndSendDataCallback send_data) ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); SyncRepInitConfig(); + SlotSyncInitConfig(); } /* Check for input from the client */ @@ -2562,7 +2918,8 @@ WalSndLoop(WalSndSendDataCallback send_data) wakeEvents |= WL_SOCKET_WRITEABLE; /* Sleep until something happens or we time out */ - WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN); + WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN, + false); } } } @@ -3311,6 +3668,8 @@ WalSndShmemInit(void) ConditionVariableInit(&WalSndCtl->wal_flush_cv); ConditionVariableInit(&WalSndCtl->wal_replay_cv); + + ConditionVariableInit(&WalSndCtl->wal_confirm_rcv_cv); } } @@ -3351,7 +3710,8 @@ WalSndWakeup(bool physical, bool logical) * on postmaster death. */ static void -WalSndWait(uint32 socket_events, long timeout, uint32 wait_event) +WalSndWait(uint32 socket_events, long timeout, uint32 wait_event, + bool wait_for_standby) { WaitEvent event; @@ -3381,7 +3741,9 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event) * And, we use separate shared memory CVs for physical and logical * walsenders for selective wake ups, see WalSndWakeup() for more details. */ - if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL) + if (wait_for_standby) + ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv); + else if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL) ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv); else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL) ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv); diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index d7995931bd..f6e2ec82c1 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -76,6 +76,7 @@ LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to rem LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server." SSL_OPEN_SERVER "Waiting for SSL while attempting connection." WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process." +WAL_SENDER_WAIT_FOR_STANDBY_CONFIRMATION "Waiting for physical standby confirmation in WAL sender process." WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process." diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 7605eff9b9..d728e33d5e 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -4562,6 +4562,20 @@ struct config_string ConfigureNamesString[] = check_debug_io_direct, assign_debug_io_direct, NULL }, + { + {"standby_slot_names", PGC_SIGHUP, REPLICATION_PRIMARY, + gettext_noop("List of streaming replication standby server slot " + "names that logical walsenders waits for."), + gettext_noop("Decoded changes are sent out to plugins by logical " + "walsenders only after specified replication slots " + "confirm receiving WAL."), + GUC_LIST_INPUT | GUC_LIST_QUOTE + }, + &standby_slot_names, + "", + check_standby_slot_names, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index e48c066a5b..dd2769cdd3 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -326,6 +326,8 @@ # method to choose sync standbys, number of sync standbys, # and comma-separated list of application_name # from standby(s); '*' = all +#standby_slot_names = '' # streaming replication standby server slot names that + # logical walsenders waits for # - Standby Servers - diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index bac94a338c..8b407b2f49 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6595,7 +6595,8 @@ describeSubscriptions(const char *pattern, bool verbose) PGresult *res; printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, - false, false, false, false, false, false, false, false, false, false}; + false, false, false, false, false, false, false, false, false, false, + false}; if (pset.sversion < 100000) { @@ -6654,10 +6655,12 @@ describeSubscriptions(const char *pattern, bool verbose) appendPQExpBuffer(&buf, ", suborigin AS \"%s\"\n" ", subpasswordrequired AS \"%s\"\n" - ", subrunasowner AS \"%s\"\n", + ", subrunasowner AS \"%s\"\n" + ", subfailoverstate AS \"%s\"\n", gettext_noop("Origin"), gettext_noop("Password required"), - gettext_noop("Run as owner?")); + gettext_noop("Run as owner?"), + gettext_noop("Enable failover?")); appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 93742fc6ac..5f065e5c55 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -3302,7 +3302,8 @@ psql_completion(const char *text, int start, int end) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", "disable_on_error", "enabled", "origin", "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit", "two_phase"); + "streaming", "synchronous_commit", "two_phase", + "failover"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 091f7e343c..ca626dff13 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11083,17 +11083,17 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,bool}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,failover}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', proparallel => 'u', prorettype => 'record', - proargtypes => 'name name bool bool', - proallargtypes => '{name,name,bool,bool,name,pg_lsn}', - proargmodes => '{i,i,i,i,o,o}', - proargnames => '{slot_name,plugin,temporary,twophase,slot_name,lsn}', + proargtypes => 'name name bool bool bool', + proallargtypes => '{name,name,bool,bool,bool,name,pg_lsn}', + proargmodes => '{i,i,i,i,i,o,o}', + proargnames => '{slot_name,plugin,temporary,twophase,failover,slot_name,lsn}', prosrc => 'pg_create_logical_replication_slot' }, { oid => '4222', descr => 'copy a logical replication slot, changing temporality and plugin', diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index e0b91eacd2..3f656b2f77 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -31,6 +31,10 @@ #define LOGICALREP_TWOPHASE_STATE_PENDING 'p' #define LOGICALREP_TWOPHASE_STATE_ENABLED 'e' +#define LOGICALREP_FAILOVER_STATE_DISABLED 'd' +#define LOGICALREP_FAILOVER_STATE_PENDING 'p' +#define LOGICALREP_FAILOVER_STATE_ENABLED 'e' + /* * The subscription will request the publisher to only send changes that do not * have any origin. @@ -93,6 +97,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subrunasowner; /* True if replication should execute as the * subscription owner */ + char subfailoverstate; /* Enable Failover State */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -145,6 +151,7 @@ typedef struct Subscription List *publications; /* List of publication names to subscribe to */ char *origin; /* Only publish data originating from the * specified origin */ + char failoverstate; /* Allow slot to be synchronized for failover */ } Subscription; /* Disallow streaming in-progress transactions. */ diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index 5142a08729..bef8a7162e 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -72,6 +72,18 @@ typedef struct DropReplicationSlotCmd } DropReplicationSlotCmd; +/* ---------------------- + * ALTER_REPLICATION_SLOT command + * ---------------------- + */ +typedef struct AlterReplicationSlotCmd +{ + NodeTag type; + char *slotname; + List *options; +} AlterReplicationSlotCmd; + + /* ---------------------- * START_REPLICATION command * ---------------------- diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index d3535eed58..4bdb6edd83 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -111,6 +111,12 @@ typedef struct ReplicationSlotPersistentData /* plugin name */ NameData plugin; + + /* + * Is this a failover slot (sync candidate for physical standbys)? + * Relevant for logical slots on the primary server. + */ + bool failover; } ReplicationSlotPersistentData; /* @@ -210,6 +216,10 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot; /* GUCs */ extern PGDLLIMPORT int max_replication_slots; +extern PGDLLIMPORT char *standby_slot_names; + +/* Globals */ +extern PGDLLIMPORT List *standby_slot_names_list; /* shmem initialization functions */ extern Size ReplicationSlotsShmemSize(void); @@ -218,9 +228,10 @@ extern void ReplicationSlotsShmemInit(void); /* management of individual slots */ extern void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase); + bool two_phase, bool failover); extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); +extern void ReplicationSlotAlter(const char *name, bool failover); extern void ReplicationSlotAcquire(const char *name, bool nowait); extern void ReplicationSlotRelease(void); @@ -253,4 +264,8 @@ extern void CheckPointReplicationSlots(bool is_shutdown); extern void CheckSlotRequirements(void); extern void CheckSlotPermissions(void); +extern void WaitForStandbyLSN(XLogRecPtr wait_for_lsn); +extern void SlotSyncInitConfig(void); +extern void SlotSyncFreeConfig(void); + #endif /* SLOT_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 04b439dc50..115344f1c4 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -356,9 +356,20 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, + bool failover, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); +/* + * walrcv_alter_slot_fn + * + * Change the definition of a replication slot. Currently, it only supports + * changing the failover property of the slot. + */ +typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn, + const char *slotname, + bool failover); + /* * walrcv_get_backend_pid_fn * @@ -400,6 +411,7 @@ typedef struct WalReceiverFunctionsType walrcv_receive_fn walrcv_receive; walrcv_send_fn walrcv_send; walrcv_create_slot_fn walrcv_create_slot; + walrcv_alter_slot_fn walrcv_alter_slot; walrcv_get_backend_pid_fn walrcv_get_backend_pid; walrcv_exec_fn walrcv_exec; walrcv_disconnect_fn walrcv_disconnect; @@ -429,8 +441,10 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd) #define walrcv_send(conn, buffer, nbytes) \ WalReceiverFunctions->walrcv_send(conn, buffer, nbytes) -#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \ - WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) +#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) \ + WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) +#define walrcv_alter_slot(conn, slotname, failover) \ + WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover) #define walrcv_get_backend_pid(conn) \ WalReceiverFunctions->walrcv_get_backend_pid(conn) #define walrcv_exec(conn, exec, nRetTypes, retTypes) \ diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 268f8e8d0f..ecbd3526c5 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -14,6 +14,8 @@ #include +#include "access/xlogdefs.h" + /* * What to do with a snapshot in create replication slot command. */ @@ -47,6 +49,8 @@ extern void WalSndInitStopping(void); extern void WalSndWaitStopping(void); extern void HandleWalSndInitStopping(void); extern void WalSndRqstFileReload(void); +extern void PhysicalConfirmReceivedLocation(XLogRecPtr lsn); +extern void WalSndWaitForStandbyConfirmation(XLogRecPtr wait_for_lsn); /* * Remember that we want to wakeup walsenders later diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 13fd5877a6..7655437510 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -113,6 +113,8 @@ typedef struct ConditionVariable wal_flush_cv; ConditionVariable wal_replay_cv; + ConditionVariable wal_confirm_rcv_cv; + WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]; } WalSndCtlData; diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 47854b5cd4..a9bba11187 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -258,7 +258,9 @@ extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname); extern bool AllTablesyncsReady(void); -extern void UpdateTwoPhaseState(Oid suboid, char new_state); +extern void UpdateTwoPhaseFailoverStates(Oid suboid, + bool update_twophase, char new_state_twophase, + bool update_failover, char new_state_failover); extern void process_syncing_tables(XLogRecPtr current_lsn); extern void invalidate_syncing_table_states(Datum arg, int cacheid, diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h index 2a191830a8..6d87f64ab0 100644 --- a/src/include/utils/guc_hooks.h +++ b/src/include/utils/guc_hooks.h @@ -160,5 +160,7 @@ extern bool check_wal_consistency_checking(char **newval, void **extra, extern void assign_wal_consistency_checking(const char *newval, void *extra); extern bool check_wal_segment_size(int *newval, void **extra, GucSource source); extern void assign_wal_sync_method(int new_wal_sync_method, void *extra); +extern bool check_standby_slot_names(char **newval, void **extra, + GucSource source); #endif /* GUC_HOOKS_H */ diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index 9d8039684a..3be3ee52fc 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -45,6 +45,7 @@ tests += { 't/037_invalid_database.pl', 't/038_save_logical_slots_shutdown.pl', 't/039_end_of_wal.pl', + 't/050_verify_slot_order.pl', ], }, } diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index 5025d65b1b..a3c3ee3a14 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -172,9 +172,10 @@ is($node_primary->slot('otherdb_slot')->{'slot_name'}, undef, 'logical slot was actually dropped with DB'); # Test logical slot advancing and its durability. +# Pass failover=true (last-arg), it should not have any impact on advancing. my $logical_slot = 'logical_slot'; $node_primary->safe_psql('postgres', - "SELECT pg_create_logical_replication_slot('$logical_slot', 'test_decoding', false);" + "SELECT pg_create_logical_replication_slot('$logical_slot', 'test_decoding', false, false, true);" ); $node_primary->psql( 'postgres', " diff --git a/src/test/recovery/t/050_verify_slot_order.pl b/src/test/recovery/t/050_verify_slot_order.pl new file mode 100644 index 0000000000..42e51634c5 --- /dev/null +++ b/src/test/recovery/t/050_verify_slot_order.pl @@ -0,0 +1,145 @@ + +# Copyright (c) 2023, PostgreSQL Global Development Group + +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Test primary disallowing specified logical replication slots getting ahead of +# specified physical replication slots. It uses the following set up: +# +# | ----> standby1 (connected via streaming replication) +# | ----> standby2 (connected via streaming replication) +# primary ----- | +# | ----> subscriber1 (connected via logical replication) +# | ----> subscriber2 (connected via logical replication) +# +# Set up is configured in such a way that primary never lets subscriber1 ahead +# of standby1. + +# Create primary +my $primary = PostgreSQL::Test::Cluster->new('primary'); +$primary->init(allows_streaming => 'logical'); + +# Configure primary to disallow specified logical replication slot (lsub1_slot) +# getting ahead of specified physical replication slot (sb1_slot). +$primary->append_conf( + 'postgresql.conf', qq( +standby_slot_names = 'sb1_slot' +)); +$primary->start; + +$primary->psql('postgres', + q{SELECT pg_create_physical_replication_slot('sb1_slot');}); +$primary->psql('postgres', + q{SELECT pg_create_physical_replication_slot('sb2_slot');}); + +$primary->safe_psql('postgres', "CREATE TABLE tab_int (a int PRIMARY KEY);"); + +my $backup_name = 'backup'; +$primary->backup($backup_name); + +# Create a standby +my $standby1 = PostgreSQL::Test::Cluster->new('standby1'); +$standby1->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); +$standby1->append_conf( + 'postgresql.conf', qq( +primary_slot_name = 'sb1_slot' +)); +$standby1->start; +$primary->wait_for_replay_catchup($standby1); + +# Create another standby +my $standby2 = PostgreSQL::Test::Cluster->new('standby2'); +$standby2->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); +$standby2->append_conf( + 'postgresql.conf', qq( +primary_slot_name = 'sb2_slot' +)); +$standby2->start; +$primary->wait_for_replay_catchup($standby2); + +# Create publication on primary +my $publisher = $primary; +$publisher->safe_psql('postgres', "CREATE PUBLICATION mypub FOR TABLE tab_int;"); +my $publisher_connstr = $publisher->connstr . ' dbname=postgres'; + +# Create a subscriber node, wait for sync to complete +my $subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1'); +$subscriber1->init(allows_streaming => 'logical'); +$subscriber1->start; +$subscriber1->safe_psql('postgres', "CREATE TABLE tab_int (a int PRIMARY KEY);"); + +# Create a subscription with failover = true +$subscriber1->safe_psql('postgres', + "CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' " + . "PUBLICATION mypub WITH (slot_name = lsub1_slot, failover = true);"); +$subscriber1->wait_for_subscription_sync; + +# Create another subscriber node, wait for sync to complete +my $subscriber2 = PostgreSQL::Test::Cluster->new('subscriber2'); +$subscriber2->init(allows_streaming => 'logical'); +$subscriber2->start; +$subscriber2->safe_psql('postgres', "CREATE TABLE tab_int (a int PRIMARY KEY);"); +$subscriber2->safe_psql('postgres', + "CREATE SUBSCRIPTION mysub2 CONNECTION '$publisher_connstr' " + . "PUBLICATION mypub WITH (slot_name = lsub2_slot);"); +$subscriber2->wait_for_subscription_sync; + +# Stop the standby associated with specified physical replication slot so that +# the logical replication slot won't receive changes until the standby comes +# up. +$standby1->stop; + +# Create some data on primary +my $primary_row_count = 10; +my $primary_insert_time = time(); +$primary->safe_psql('postgres', + "INSERT INTO tab_int SELECT generate_series(1, $primary_row_count);"); + +# Wait for the standby that's up and running gets the data from primary +$primary->wait_for_replay_catchup($standby2); +my $result = $standby2->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "standby2 gets data from primary"); + +# Wait for the subscription that's up and running and is not enabled for failover. +# It gets the data from primary without waiting for any standbys. +$publisher->wait_for_catchup('mysub2'); +$result = $subscriber2->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "subscriber2 gets data from primary"); + +# The subscription that's up and running and is enabled for failover +# doesn't get the data from primary and keeps waiting for the +# standby specified in standby_slot_names. +$result = $subscriber1->safe_psql('postgres', + "SELECT count(*) = 0 FROM tab_int;"); +is($result, 't', "subscriber1 doesn't get data from primary until standby1 acknowledges changes"); + +# Start the standby specified in standby_slot_names and wait for it to catch +# up with the primary. +$standby1->start; +$primary->wait_for_replay_catchup($standby1); +$result = $standby1->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "standby1 gets data from primary"); + +# Now that the standby specified in standby_slot_names is up and running, +# primary must send the decoded changes to subscription enabled for failover +# While the standby was down, this subscriber didn't receive any data from +# primary i.e. the primary didn't allow it to go ahead of standby. +$publisher->wait_for_catchup('mysub1'); +$result = $subscriber1->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "subscriber1 gets data from primary after standby1 acknowledges changes"); + +done_testing(); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 1442c43d9c..c9647e86b2 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1473,8 +1473,9 @@ pg_replication_slots| SELECT l.slot_name, l.wal_status, l.safe_wal_size, l.two_phase, - l.conflicting - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting) + l.conflicting, + l.failover + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, failover) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index b15eddbff3..0253752d03 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname'); ALTER SUBSCRIPTION regress_testsub SET (password_required = false); ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | t | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | t | d | off | dbname=regress_doesnotexist2 | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (password_required = true); @@ -176,10 +176,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist2 | 0/12345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist2 | 0/12345 (1 row) -- ok - with lsn = NONE @@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist2 | 0/0 (1 row) BEGIN; @@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+------------------------------+---------- - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | local | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+------------------------------+---------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | d | local | dbname=regress_doesnotexist2 | 0/0 (1 row) -- rename back to keep the rest simple @@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication already exists @@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication used more than once @@ -332,10 +332,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -371,10 +371,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) --fail - alter of two_phase option not supported. @@ -383,10 +383,10 @@ ERROR: unrecognized subscription parameter: "two_phase" -- but can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -396,10 +396,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -412,18 +412,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 87c1aee379..0f184fb103 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -85,6 +85,7 @@ AlterOwnerStmt AlterPolicyStmt AlterPublicationAction AlterPublicationStmt +AlterReplicationSlotCmd AlterRoleSetStmt AlterRoleStmt AlterSeqStmt @@ -3856,6 +3857,7 @@ varattrib_1b_e varattrib_4b vbits verifier_context +walrcv_alter_slot_fn walrcv_check_conninfo_fn walrcv_connect_fn walrcv_create_slot_fn -- 2.30.0.windows.2