From c9db0959ca56fe18906f3584dcee92068b0e6f43 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Tue, 26 Dec 2023 11:35:10 +0800 Subject: [PATCH v54 1/3] Enable setting failover property for a slot through SQL API and subscription commands This commit adds the failover property to the replication slot. The failover property indicates whether the slot will be synced to the standby servers, enabling the resumption of corresponding logical replication after failover. But note that this commit does not yet include the capability to actually sync the replication slot; the next patch will address that. In addition, a new replication command named ALTER_REPLICATION_SLOT and a corresponding walreceiver API function named walrcv_alter_slot have been implemented. These additions provide subscribers or users the ability to modify the failover property of a replication slot on the publisher. Moreover, a new subscription option called 'failover' has been added, allowing users to set it when creating a subscription. At present, altering the failover option of an existing subscription is not permitted. However, this restriction may be lifted in future versions. Also, a new parameter 'failover' is added to the pg_create_logical_replication_slot function. The value of the 'failover' flag is displayed as part of pg_replication_slots view. --- contrib/test_decoding/expected/slot.out | 58 ++++++ contrib/test_decoding/sql/slot.sql | 13 ++ doc/src/sgml/catalogs.sgml | 12 ++ doc/src/sgml/func.sgml | 11 +- doc/src/sgml/protocol.sgml | 51 ++++++ doc/src/sgml/ref/alter_subscription.sgml | 20 ++- doc/src/sgml/ref/create_subscription.sgml | 25 +++ doc/src/sgml/system-views.sgml | 11 ++ src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_functions.sql | 1 + src/backend/catalog/system_views.sql | 6 +- src/backend/commands/subscriptioncmds.c | 114 ++++++++++-- .../libpqwalreceiver/libpqwalreceiver.c | 38 +++- src/backend/replication/logical/tablesync.c | 53 ++++-- src/backend/replication/logical/worker.c | 67 ++++++- src/backend/replication/repl_gram.y | 20 ++- src/backend/replication/repl_scanner.l | 2 + src/backend/replication/slot.c | 33 +++- src/backend/replication/slotfuncs.c | 16 +- src/backend/replication/walreceiver.c | 2 +- src/backend/replication/walsender.c | 67 ++++++- src/bin/pg_dump/pg_dump.c | 20 ++- src/bin/pg_dump/pg_dump.h | 1 + src/bin/pg_upgrade/info.c | 5 +- src/bin/pg_upgrade/pg_upgrade.c | 6 +- src/bin/pg_upgrade/pg_upgrade.h | 2 + src/bin/pg_upgrade/t/003_logical_slots.pl | 6 +- src/bin/psql/describe.c | 8 +- src/bin/psql/tab-complete.c | 2 +- src/include/catalog/pg_proc.dat | 14 +- src/include/catalog/pg_subscription.h | 11 ++ src/include/nodes/replnodes.h | 12 ++ src/include/replication/slot.h | 9 +- src/include/replication/walreceiver.h | 18 +- src/include/replication/worker_internal.h | 3 +- .../t/050_standby_failover_slots_sync.pl | 64 +++++++ src/test/regress/expected/rules.out | 5 +- src/test/regress/expected/subscription.out | 165 ++++++++++-------- src/test/regress/sql/subscription.sql | 8 + src/tools/pgindent/typedefs.list | 2 + 40 files changed, 829 insertions(+), 153 deletions(-) create mode 100644 src/test/recovery/t/050_standby_failover_slots_sync.pl diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out index 63a9940f73..261d8886d3 100644 --- a/contrib/test_decoding/expected/slot.out +++ b/contrib/test_decoding/expected/slot.out @@ -406,3 +406,61 @@ SELECT pg_drop_replication_slot('copied_slot2_notemp'); (1 row) +-- Test failover option of slots. +SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_slot', 'test_decoding', false, false, true); + ?column? +---------- + init +(1 row) + +SELECT 'init' FROM pg_create_logical_replication_slot('failover_false_slot', 'test_decoding', false, false, false); + ?column? +---------- + init +(1 row) + +SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', 'test_decoding', false, false); + ?column? +---------- + init +(1 row) + +SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot'); + ?column? +---------- + init +(1 row) + +SELECT slot_name, slot_type, failover FROM pg_replication_slots; + slot_name | slot_type | failover +-----------------------+-----------+---------- + failover_true_slot | logical | t + failover_false_slot | logical | f + failover_default_slot | logical | f + physical_slot | physical | f +(4 rows) + +SELECT pg_drop_replication_slot('failover_true_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('failover_false_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('failover_default_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('physical_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql index 1aa27c5667..45aeae7fd5 100644 --- a/contrib/test_decoding/sql/slot.sql +++ b/contrib/test_decoding/sql/slot.sql @@ -176,3 +176,16 @@ ORDER BY o.slot_name, c.slot_name; SELECT pg_drop_replication_slot('orig_slot2'); SELECT pg_drop_replication_slot('copied_slot2_no_change'); SELECT pg_drop_replication_slot('copied_slot2_notemp'); + +-- Test failover option of slots. +SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_slot', 'test_decoding', false, false, true); +SELECT 'init' FROM pg_create_logical_replication_slot('failover_false_slot', 'test_decoding', false, false, false); +SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', 'test_decoding', false, false); +SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot'); + +SELECT slot_name, slot_type, failover FROM pg_replication_slots; + +SELECT pg_drop_replication_slot('failover_true_slot'); +SELECT pg_drop_replication_slot('failover_false_slot'); +SELECT pg_drop_replication_slot('failover_default_slot'); +SELECT pg_drop_replication_slot('physical_slot'); diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 3ec7391ec5..e666730c64 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -7990,6 +7990,18 @@ SCRAM-SHA-256$<iteration count>:&l + + + subfailoverstate char + + + State codes for failover mode: + d = disabled, + p = pending enablement, + e = enabled + + + subconninfo text diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 20da3ed033..90f1f19018 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -27541,7 +27541,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset pg_create_logical_replication_slot - pg_create_logical_replication_slot ( slot_name name, plugin name , temporary boolean, twophase boolean ) + pg_create_logical_replication_slot ( slot_name name, plugin name , temporary boolean, twophase boolean, failover boolean ) record ( slot_name name, lsn pg_lsn ) @@ -27556,8 +27556,13 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset released upon any error. The optional fourth parameter, twophase, when set to true, specifies that the decoding of prepared transactions is enabled for this - slot. A call to this function has the same effect as the replication - protocol command CREATE_REPLICATION_SLOT ... LOGICAL. + slot. The optional fifth parameter, + failover, when set to true, + specifies that this slot is enabled to be synced to the + physical standbys so that logical replication can be resumed + after failover. A call to this function has the same effect as + the replication protocol command + CREATE_REPLICATION_SLOT ... LOGICAL. diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 9a66918171..65bf7f58a8 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2060,6 +2060,16 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" + + + FAILOVER [ boolean ] + + + If true, the slot is enabled to be synced to the physical + standbys so that logical replication can be resumed after failover. + + + @@ -2124,6 +2134,47 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" + + ALTER_REPLICATION_SLOT slot_name ( option [, ...] ) + ALTER_REPLICATION_SLOT + + + + Change the definition of a replication slot. + See for more about + replication slots. This command is currently only supported for logical + replication slots. + + + + + slot_name + + + The name of the slot to alter. Must be a valid replication slot + name (see ). + + + + + + The following options are supported: + + + + FAILOVER [ boolean ] + + + If true, the slot is enabled to be synced to the physical + standbys so that logical replication can be resumed after failover. + + + + + + + + READ_REPLICATION_SLOT slot_name READ_REPLICATION_SLOT diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 6d36ff0dc9..481e397bad 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -73,11 +73,14 @@ ALTER SUBSCRIPTION name RENAME TO < These commands also cannot be executed when the subscription has two_phase - commit enabled, unless + commit enabled or + failover + enabled, unless copy_data is false. See column subtwophasestate - of pg_subscription - to know the actual two-phase state. + and subfailoverstate of + pg_subscription + to know the actual state. @@ -230,6 +233,17 @@ ALTER SUBSCRIPTION name RENAME TO < origin. Only a superuser can set password_required = false. + + + When altering the + slot_name, + the failover property of the new slot may differ from the + failover + parameter specified in the subscription. When creating the slot, + ensure the slot failover property matches the + failover + parameter value of the subscription. + diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index f1c20b3a46..4d17e93a09 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -399,6 +399,31 @@ CREATE SUBSCRIPTION subscription_name + + + failover (boolean) + + + Specifies whether the replication slot associated with the subscription + is enabled to be synced to the physical standbys so that logical + replication can be resumed from the new primary after failover. + The default is false. + + + + The implementation of failover requires that replication + has successfully finished the initial table synchronization + phase. So even when failover is enabled for a + subscription, the internal failover state remains + temporarily pending until the initialization phase + completes. See column subfailoverstate + of pg_subscription + to know the actual failover state. It is the user's responsibility + to ensure that the initial table synchronization has been completed + before allowing the subscription to transition to the new primary. + + + diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index 0ef1745631..1dc695fd3a 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2532,6 +2532,17 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx invalidated). Always NULL for physical slots. + + + + failover bool + + + True if this logical slot is enabled to be synced to the physical + standbys so that logical replication can be resumed from the new primary + after failover. Always false for physical slots. + + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index d6a978f136..18512955ad 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -73,6 +73,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->disableonerr = subform->subdisableonerr; sub->passwordrequired = subform->subpasswordrequired; sub->runasowner = subform->subrunasowner; + sub->failoverstate = subform->subfailoverstate; /* Get conninfo */ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index 4206752881..4db796aa0b 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -479,6 +479,7 @@ CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot( IN slot_name name, IN plugin name, IN temporary boolean DEFAULT false, IN twophase boolean DEFAULT false, + IN failover boolean DEFAULT false, OUT slot_name name, OUT lsn pg_lsn) RETURNS RECORD LANGUAGE INTERNAL diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 058fc47c91..b56d1fbab2 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1023,7 +1023,8 @@ CREATE VIEW pg_replication_slots AS L.wal_status, L.safe_wal_size, L.two_phase, - L.conflicting + L.conflicting, + L.failover FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); @@ -1357,7 +1358,8 @@ REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, subpasswordrequired, subrunasowner, - subslotname, subsynccommit, subpublications, suborigin) + subslotname, subsynccommit, subpublications, suborigin, + subfailoverstate) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index edc82c11be..7e4cd214cf 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -71,6 +71,7 @@ #define SUBOPT_RUN_AS_OWNER 0x00001000 #define SUBOPT_LSN 0x00002000 #define SUBOPT_ORIGIN 0x00004000 +#define SUBOPT_FAILOVER 0x00008000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -96,6 +97,7 @@ typedef struct SubOpts bool passwordrequired; bool runasowner; char *origin; + bool failover; XLogRecPtr lsn; } SubOpts; @@ -157,6 +159,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->runasowner = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); + if (IsSet(supported_opts, SUBOPT_FAILOVER)) + opts->failover = false; /* Parse options */ foreach(lc, stmt_options) @@ -326,6 +330,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("unrecognized origin value: \"%s\"", opts->origin)); } + else if (IsSet(supported_opts, SUBOPT_FAILOVER) && + strcmp(defel->defname, "failover") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_FAILOVER)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_FAILOVER; + opts->failover = defGetBoolean(defel); + } else if (IsSet(supported_opts, SUBOPT_LSN) && strcmp(defel->defname, "lsn") == 0) { @@ -591,7 +604,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN | + SUBOPT_FAILOVER); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -710,6 +724,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, publicationListToArray(publications); values[Anum_pg_subscription_suborigin - 1] = CStringGetTextDatum(opts.origin); + values[Anum_pg_subscription_subfailoverstate - 1] = + CharGetDatum(opts.failover ? + LOGICALREP_FAILOVER_STATE_PENDING : + LOGICALREP_FAILOVER_STATE_DISABLED); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -746,6 +764,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, PG_TRY(); { + bool failover_enabled = false; + check_publications(wrconn, publications); check_publications_origin(wrconn, publications, opts.copy_data, opts.origin, NULL, 0, stmt->subname); @@ -776,6 +796,19 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, InvalidXLogRecPtr); } + /* + * Even if failover is set, don't create the slot with failover + * enabled. Will enable it once all the tables are synced and + * ready. The intention is that if failover happens at the time of + * table-sync, user should re-launch the subscription instead of + * relying on main slot (if synced) with no table-sync data + * present. When the subscription has no tables, leave failover as + * false to allow ALTER SUBSCRIPTION ... REFRESH PUBLICATION to + * work. + */ + if (opts.failover && !opts.copy_data && tables != NIL) + failover_enabled = true; + /* * If requested, create permanent slot for the subscription. We * won't use the initial snapshot for anything, so no need to @@ -807,15 +840,38 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, twophase_enabled = true; walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, - CRS_NOEXPORT_SNAPSHOT, NULL); - - if (twophase_enabled) - UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED); + failover_enabled, CRS_NOEXPORT_SNAPSHOT, NULL); + /* Update twophase and/or failover state */ + EnableTwoPhaseFailoverTriState(subid, twophase_enabled, + failover_enabled); ereport(NOTICE, (errmsg("created replication slot \"%s\" on publisher", opts.slot_name))); } + + /* + * If the slot_name is specified without the create_slot option, + * it is possible that the user intends to use an existing slot on + * the publisher, so here we alter the failover property of the + * slot to match the failover value in subscription. + * + * We do not need to change the failover to false if the server + * does not support failover (e.g. pre-PG17). + */ + else if (opts.slot_name && + (failover_enabled || walrcv_server_version(wrconn) >= 170000)) + { + bool failover_delayed = (!failover_enabled && opts.failover); + + walrcv_alter_slot(wrconn, opts.slot_name, failover_enabled); + ereport(NOTICE, + (errmsg("changed the failover state of replication slot \"%s\" on publisher to %s", + opts.slot_name, failover_enabled ? "true" : "false"), + failover_delayed ? + errdetail("The failover state will be set to true once table synchronization has been completed.") + : 0)); + } } PG_FINALLY(); { @@ -1279,13 +1335,22 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false)."))); /* - * See ALTER_SUBSCRIPTION_REFRESH for details why this is - * not allowed. + * See ALTER_SUBSCRIPTION_REFRESH for details why + * copy_data is not allowed when twophase or failover is + * enabled. */ if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"), + /* translator: %s is a subscription option */ + errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when %s is enabled", "two_phase"), + errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION."))); + + if (sub->failoverstate == LOGICALREP_FAILOVER_STATE_ENABLED && opts.copy_data) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + /* translator: %s is a subscription option */ + errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when %s is enabled", "failover"), errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION."))); PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh"); @@ -1334,13 +1399,26 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)"))); /* - * See ALTER_SUBSCRIPTION_REFRESH for details why this is - * not allowed. + * See ALTER_SUBSCRIPTION_REFRESH for details why + * copy_data is not allowed when twophase or failover is + * enabled. */ if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"), + /* translator: %s is a subscription option */ + errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when %s is enabled", "two_phase"), + /* translator: %s is an SQL ALTER command */ + errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.", + isadd ? + "ALTER SUBSCRIPTION ... ADD PUBLICATION" : + "ALTER SUBSCRIPTION ... DROP PUBLICATION"))); + + if (sub->failoverstate == LOGICALREP_FAILOVER_STATE_ENABLED && opts.copy_data) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + /* translator: %s is a subscription option */ + errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when %s is enabled", "failover"), /* translator: %s is an SQL ALTER command */ errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.", isadd ? @@ -1389,7 +1467,19 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"), + /* translator: %s is a subscription option */ + errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when %s is enabled", "two_phase"), + errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false, or use DROP/CREATE SUBSCRIPTION."))); + + /* + * See comments above for twophasestate, same holds true for + * 'failover'. + */ + if (sub->failoverstate == LOGICALREP_FAILOVER_STATE_ENABLED && opts.copy_data) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + /* translator: %s is a subscription option */ + errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when %s is enabled", "failover"), errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false, or use DROP/CREATE SUBSCRIPTION."))); PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 693b3669ba..9978f67b98 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -74,8 +74,11 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, + bool failover, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); +static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, + bool failover); static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn); static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn, const char *query, @@ -96,6 +99,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { .walrcv_receive = libpqrcv_receive, .walrcv_send = libpqrcv_send, .walrcv_create_slot = libpqrcv_create_slot, + .walrcv_alter_slot = libpqrcv_alter_slot, .walrcv_get_backend_pid = libpqrcv_get_backend_pid, .walrcv_exec = libpqrcv_exec, .walrcv_disconnect = libpqrcv_disconnect @@ -888,8 +892,8 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes) */ static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, - bool temporary, bool two_phase, CRSSnapshotAction snapshot_action, - XLogRecPtr *lsn) + bool temporary, bool two_phase, bool failover, + CRSSnapshotAction snapshot_action, XLogRecPtr *lsn) { PGresult *res; StringInfoData cmd; @@ -918,7 +922,8 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, else appendStringInfoChar(&cmd, ' '); } - + if (failover) + appendStringInfoString(&cmd, "FAILOVER, "); if (use_new_options_syntax) { switch (snapshot_action) @@ -987,6 +992,33 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, return snapshot; } +/* + * Change the definition of the replication slot. + */ +static void +libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, + bool failover) +{ + StringInfoData cmd; + PGresult *res; + + initStringInfo(&cmd); + appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s )", + quote_identifier(slotname), + failover ? "true" : "false"); + + res = libpqrcv_PQexec(conn->streamConn, cmd.data); + pfree(cmd.data); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("could not alter replication slot \"%s\"", + slotname))); + + PQclear(res); +} + /* * Return PID of remote backend process. */ diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 4d056c16c8..7b6170fe55 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -624,15 +624,28 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * Note: If the subscription has no tables then leave the state as * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to * work. + * + * Same goes for 'failover'. Enable it only if subscription has tables + * and all the tablesyncs have reached READY state. */ - if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING) + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING || + MySubscription->failoverstate == LOGICALREP_FAILOVER_STATE_PENDING) { CommandCounterIncrement(); /* make updates visible */ if (AllTablesyncsReady()) { - ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled", - MySubscription->name))); + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING) + ereport(LOG, + /* translator: %s is a subscription option */ + (errmsg("logical replication apply worker for subscription \"%s\" will restart so that %s can be enabled", + MySubscription->name, "two_phase"))); + + if (MySubscription->failoverstate == LOGICALREP_FAILOVER_STATE_PENDING) + ereport(LOG, + /* translator: %s is a subscription option */ + (errmsg("logical replication apply worker for subscription \"%s\" will restart so that %s can be enabled", + MySubscription->name, "failover"))); + should_exit = true; } } @@ -1430,7 +1443,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) */ walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ , false /* two_phase */ , - CRS_USE_SNAPSHOT, origin_startpos); + false /* failover */ , CRS_USE_SNAPSHOT, + origin_startpos); /* * Setup replication origin tracking. The purpose of doing this before the @@ -1732,10 +1746,12 @@ AllTablesyncsReady(void) } /* - * Update the two_phase state of the specified subscription in pg_subscription. + * Update the twophase and/or failover state of the specified subscription + * in pg_subscription. */ void -UpdateTwoPhaseState(Oid suboid, char new_state) +EnableTwoPhaseFailoverTriState(Oid suboid, bool enable_twophase, + bool enable_failover) { Relation rel; HeapTuple tup; @@ -1743,9 +1759,8 @@ UpdateTwoPhaseState(Oid suboid, char new_state) bool replaces[Natts_pg_subscription]; Datum values[Natts_pg_subscription]; - Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED || - new_state == LOGICALREP_TWOPHASE_STATE_PENDING || - new_state == LOGICALREP_TWOPHASE_STATE_ENABLED); + if (!enable_twophase && !enable_failover) + return; rel = table_open(SubscriptionRelationId, RowExclusiveLock); tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid)); @@ -1759,9 +1774,21 @@ UpdateTwoPhaseState(Oid suboid, char new_state) memset(nulls, false, sizeof(nulls)); memset(replaces, false, sizeof(replaces)); - /* And update/set two_phase state */ - values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state); - replaces[Anum_pg_subscription_subtwophasestate - 1] = true; + /* Update/set two_phase state if asked by the caller */ + if (enable_twophase) + { + values[Anum_pg_subscription_subtwophasestate - 1] = + CharGetDatum(LOGICALREP_TWOPHASE_STATE_ENABLED); + replaces[Anum_pg_subscription_subtwophasestate - 1] = true; + } + + /* Update/set failover state if asked by the caller */ + if (enable_failover) + { + values[Anum_pg_subscription_subfailoverstate - 1] = + CharGetDatum(LOGICALREP_FAILOVER_STATE_ENABLED); + replaces[Anum_pg_subscription_subfailoverstate - 1] = true; + } tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, replaces); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 21abf34ef7..e46a1955e8 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -132,6 +132,33 @@ * avoid such deadlocks, we generate a unique GID (consisting of the * subscription oid and the xid of the prepared transaction) for each prepare * transaction on the subscriber. + * + * FAILOVER + * ---------------------- + * The logical slot on the primary can be synced to the standby by specifying + * failover = true when creating the subscription. Enabling failover allows us + * to smoothly transition to the promoted standby, ensuring that we can + * subscribe to the new primary without losing any data. + * + * However, we do not enable failover for slots created by the table sync + * worker. + * + * Additionally, failover is not enabled for the main slot if the table sync is + * in progress. This is because if a failover occurs while the table sync + * worker has reached a certain state (SUBREL_STATE_FINISHEDCOPY or + * SUBREL_STATE_DATASYNC), replication will not be able to continue from the + * new primary node. + * + * As a result, we enable the failover option for the main slot only after the + * initial sync is complete. The failover option is implemented as a tri-state + * with values DISABLED, PENDING, and ENABLED. The state transition process + * between these values is the same as the two_phase option (see TWO_PHASE + * TRANSACTIONS for details). + * + * During the startup of the apply worker, it checks if all table syncs are in + * the READY state for a failover tri-state of PENDING. If so, it alters the + * main slot's failover property to true and updates the tri-state value from + * PENDING to ENABLED. *------------------------------------------------------------------------- */ @@ -3947,6 +3974,7 @@ maybe_reread_subscription(void) newsub->passwordrequired != MySubscription->passwordrequired || strcmp(newsub->origin, MySubscription->origin) != 0 || newsub->owner != MySubscription->owner || + newsub->failoverstate != MySubscription->failoverstate || !equal(newsub->publications, MySubscription->publications)) { if (am_parallel_apply_worker()) @@ -4482,6 +4510,8 @@ run_apply_worker() TimeLineID startpointTLI; char *err; bool must_use_password; + bool twophase_pending; + bool failover_pending; slotname = MySubscription->slotname; @@ -4538,17 +4568,38 @@ run_apply_worker() * Note: If the subscription has no tables then leave the state as * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to * work. + * + * Same goes for 'failover'. It is enabled only if subscription has tables + * and all the tablesyncs have reached READY state, until then it remains + * as PENDING. */ - if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && - AllTablesyncsReady()) + twophase_pending = + (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING); + failover_pending = + (MySubscription->failoverstate == LOGICALREP_FAILOVER_STATE_PENDING); + + if ((twophase_pending || failover_pending) && AllTablesyncsReady()) { /* Start streaming with two_phase enabled */ - options.proto.logical.twophase = true; + if (twophase_pending) + options.proto.logical.twophase = true; + + if (failover_pending) + walrcv_alter_slot(LogRepWorkerWalRcvConn, slotname, true); + walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); StartTransactionCommand(); - UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED); - MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED; + + /* Update twophase and/or failover */ + EnableTwoPhaseFailoverTriState(MySubscription->oid, twophase_pending, + failover_pending); + if (twophase_pending) + MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED; + + if (failover_pending) + MySubscription->failoverstate = LOGICALREP_FAILOVER_STATE_ENABLED; + CommitTransactionCommand(); } else @@ -4557,11 +4608,15 @@ run_apply_worker() } ereport(DEBUG1, - (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s", + (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s and failover is %s", MySubscription->name, MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" : + "?", + MySubscription->failoverstate == LOGICALREP_FAILOVER_STATE_DISABLED ? "DISABLED" : + MySubscription->failoverstate == LOGICALREP_FAILOVER_STATE_PENDING ? "PENDING" : + MySubscription->failoverstate == LOGICALREP_FAILOVER_STATE_ENABLED ? "ENABLED" : "?"))); /* Run the main loop. */ diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index a5d118ed68..fac73f402e 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -64,6 +64,7 @@ Node *replication_parse_result; %token K_START_REPLICATION %token K_CREATE_REPLICATION_SLOT %token K_DROP_REPLICATION_SLOT +%token K_ALTER_REPLICATION_SLOT %token K_TIMELINE_HISTORY %token K_WAIT %token K_TIMELINE @@ -80,8 +81,9 @@ Node *replication_parse_result; %type command %type base_backup start_replication start_logical_replication - create_replication_slot drop_replication_slot identify_system - read_replication_slot timeline_history show upload_manifest + create_replication_slot drop_replication_slot + alter_replication_slot identify_system read_replication_slot + timeline_history show upload_manifest %type generic_option_list %type generic_option %type opt_timeline @@ -112,6 +114,7 @@ command: | start_logical_replication | create_replication_slot | drop_replication_slot + | alter_replication_slot | read_replication_slot | timeline_history | show @@ -259,6 +262,18 @@ drop_replication_slot: } ; +/* ALTER_REPLICATION_SLOT slot */ +alter_replication_slot: + K_ALTER_REPLICATION_SLOT IDENT '(' generic_option_list ')' + { + AlterReplicationSlotCmd *cmd; + cmd = makeNode(AlterReplicationSlotCmd); + cmd->slotname = $2; + cmd->options = $4; + $$ = (Node *) cmd; + } + ; + /* * START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d] */ @@ -410,6 +425,7 @@ ident_or_keyword: | K_START_REPLICATION { $$ = "start_replication"; } | K_CREATE_REPLICATION_SLOT { $$ = "create_replication_slot"; } | K_DROP_REPLICATION_SLOT { $$ = "drop_replication_slot"; } + | K_ALTER_REPLICATION_SLOT { $$ = "alter_replication_slot"; } | K_TIMELINE_HISTORY { $$ = "timeline_history"; } | K_WAIT { $$ = "wait"; } | K_TIMELINE { $$ = "timeline"; } diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index 4805da08ee..e4a155c7c8 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -125,6 +125,7 @@ TIMELINE { return K_TIMELINE; } START_REPLICATION { return K_START_REPLICATION; } CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; } DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; } +ALTER_REPLICATION_SLOT { return K_ALTER_REPLICATION_SLOT; } TIMELINE_HISTORY { return K_TIMELINE_HISTORY; } PHYSICAL { return K_PHYSICAL; } RESERVE_WAL { return K_RESERVE_WAL; } @@ -302,6 +303,7 @@ replication_scanner_is_replication_command(void) case K_START_REPLICATION: case K_CREATE_REPLICATION_SLOT: case K_DROP_REPLICATION_SLOT: + case K_ALTER_REPLICATION_SLOT: case K_READ_REPLICATION_SLOT: case K_TIMELINE_HISTORY: case K_UPLOAD_MANIFEST: diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 18bc28195b..1279bedd1a 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -90,7 +90,7 @@ typedef struct ReplicationSlotOnDisk sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize #define SLOT_MAGIC 0x1051CA1 /* format identifier */ -#define SLOT_VERSION 3 /* version for new files */ +#define SLOT_VERSION 4 /* version for new files */ /* Control array for replication slot management */ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; @@ -248,10 +248,13 @@ ReplicationSlotValidateName(const char *name, int elevel) * during getting changes, if the two_phase option is enabled it can skip * prepare because by that time start decoding point has been moved. So the * user will only get commit prepared. + * failover: If enabled, allows the slot to be synced to physical standbys so + * that logical replication can be resumed after failover. */ void ReplicationSlotCreate(const char *name, bool db_specific, - ReplicationSlotPersistency persistency, bool two_phase) + ReplicationSlotPersistency persistency, + bool two_phase, bool failover) { ReplicationSlot *slot = NULL; int i; @@ -311,6 +314,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.persistency = persistency; slot->data.two_phase = two_phase; slot->data.two_phase_at = InvalidXLogRecPtr; + slot->data.failover = failover; /* and then data only present in shared memory */ slot->just_dirtied = false; @@ -679,6 +683,31 @@ ReplicationSlotDrop(const char *name, bool nowait) ReplicationSlotDropAcquired(); } +/* + * Change the definition of the slot identified by the specified name. + */ +void +ReplicationSlotAlter(const char *name, bool failover) +{ + Assert(MyReplicationSlot == NULL); + + ReplicationSlotAcquire(name, true); + + if (SlotIsPhysical(MyReplicationSlot)) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use %s with a physical replication slot", + "ALTER_REPLICATION_SLOT")); + + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->data.failover = failover; + SpinLockRelease(&MyReplicationSlot->mutex); + + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + ReplicationSlotRelease(); +} + /* * Permanently drop the currently acquired replication slot. */ diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 4b694a03d0..248f9574a0 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -42,7 +42,8 @@ create_physical_replication_slot(char *name, bool immediately_reserve, /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(name, false, - temporary ? RS_TEMPORARY : RS_PERSISTENT, false); + temporary ? RS_TEMPORARY : RS_PERSISTENT, false, + false); if (immediately_reserve) { @@ -117,6 +118,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) static void create_logical_replication_slot(char *name, char *plugin, bool temporary, bool two_phase, + bool failover, XLogRecPtr restart_lsn, bool find_startpoint) { @@ -133,7 +135,8 @@ create_logical_replication_slot(char *name, char *plugin, * error as well. */ ReplicationSlotCreate(name, true, - temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase); + temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase, + failover); /* * Create logical decoding context to find start point or, if we don't @@ -171,6 +174,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) Name plugin = PG_GETARG_NAME(1); bool temporary = PG_GETARG_BOOL(2); bool two_phase = PG_GETARG_BOOL(3); + bool failover = PG_GETARG_BOOL(4); Datum result; TupleDesc tupdesc; HeapTuple tuple; @@ -188,6 +192,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) NameStr(*plugin), temporary, two_phase, + failover, InvalidXLogRecPtr, true); @@ -232,7 +237,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 15 +#define PG_GET_REPLICATION_SLOTS_COLS 16 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; @@ -412,6 +417,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) values[i++] = BoolGetDatum(false); } + values[i++] = BoolGetDatum(slot_contents.data.failover); + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, @@ -679,6 +686,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) XLogRecPtr src_restart_lsn; bool src_islogical; bool temporary; + bool failover; char *plugin; Datum values[2]; bool nulls[2]; @@ -734,6 +742,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) src_islogical = SlotIsLogical(&first_slot_contents); src_restart_lsn = first_slot_contents.data.restart_lsn; temporary = (first_slot_contents.data.persistency == RS_TEMPORARY); + failover = first_slot_contents.data.failover; plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL; /* Check type of replication slot */ @@ -773,6 +782,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) plugin, temporary, false, + failover, src_restart_lsn, false); } diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 26ded928a7..ca61a99785 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -387,7 +387,7 @@ WalReceiverMain(void) "pg_walreceiver_%lld", (long long int) walrcv_get_backend_pid(wrconn)); - walrcv_create_slot(wrconn, slotname, true, false, 0, NULL); + walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL); SpinLockAcquire(&walrcv->mutex); strlcpy(walrcv->slotname, slotname, NAMEDATALEN); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index d4aa9e1c96..cc59e8b52e 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1126,12 +1126,13 @@ static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, - bool *two_phase) + bool *two_phase, bool *failover) { ListCell *lc; bool snapshot_action_given = false; bool reserve_wal_given = false; bool two_phase_given = false; + bool failover_given = false; /* Parse options */ foreach(lc, cmd->options) @@ -1181,6 +1182,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, two_phase_given = true; *two_phase = defGetBoolean(defel); } + else if (strcmp(defel->defname, "failover") == 0) + { + if (failover_given || cmd->kind != REPLICATION_KIND_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + failover_given = true; + *failover = defGetBoolean(defel); + } else elog(ERROR, "unrecognized option: %s", defel->defname); } @@ -1197,6 +1207,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) char *slot_name; bool reserve_wal = false; bool two_phase = false; + bool failover = false; CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT; DestReceiver *dest; TupOutputState *tstate; @@ -1206,13 +1217,14 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) Assert(!MyReplicationSlot); - parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase); + parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase, + &failover); if (cmd->kind == REPLICATION_KIND_PHYSICAL) { ReplicationSlotCreate(cmd->slotname, false, cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, - false); + false, false); if (reserve_wal) { @@ -1243,7 +1255,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL, - two_phase); + two_phase, failover); /* * Do options check early so that we can bail before calling the @@ -1398,6 +1410,46 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd) ReplicationSlotDrop(cmd->slotname, !cmd->wait); } +/* + * Process extra options given to ALTER_REPLICATION_SLOT. + */ +static void +ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover) +{ + ListCell *lc; + bool failover_given = false; + + /* Parse options */ + foreach(lc, cmd->options) + { + DefElem *defel = (DefElem *) lfirst(lc); + + if (strcmp(defel->defname, "failover") == 0) + { + if (failover_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + failover_given = true; + *failover = defGetBoolean(defel); + } + else + elog(ERROR, "unrecognized option: %s", defel->defname); + } +} + +/* + * Change the definition of a replication slot. + */ +static void +AlterReplicationSlot(AlterReplicationSlotCmd *cmd) +{ + bool failover = false; + + ParseAlterReplSlotOptions(cmd, &failover); + ReplicationSlotAlter(cmd->slotname, failover); +} + /* * Load previously initiated logical slot and prepare for sending data (via * WalSndLoop). @@ -1971,6 +2023,13 @@ exec_replication_command(const char *cmd_string) EndReplicationCommand(cmdtag); break; + case T_AlterReplicationSlotCmd: + cmdtag = "ALTER_REPLICATION_SLOT"; + set_ps_display(cmdtag); + AlterReplicationSlot((AlterReplicationSlotCmd *) cmd_node); + EndReplicationCommand(cmdtag); + break; + case T_StartReplicationCmd: { StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node; diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 8c0b5486b9..373c2514df 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4618,6 +4618,7 @@ getSubscriptions(Archive *fout) int i_subsynccommit; int i_subpublications; int i_suborigin; + int i_subfailoverstate; int i, ntups; @@ -4673,14 +4674,22 @@ getSubscriptions(Archive *fout) appendPQExpBufferStr(query, " s.subpasswordrequired,\n" " s.subrunasowner,\n" - " s.suborigin\n"); + " s.suborigin,\n"); else appendPQExpBuffer(query, " 't' AS subpasswordrequired,\n" " 't' AS subrunasowner,\n" - " '%s' AS suborigin\n", + " '%s' AS suborigin,\n", LOGICALREP_ORIGIN_ANY); + if (fout->remoteVersion >= 170000) + appendPQExpBufferStr(query, + " s.subfailoverstate\n"); + else + appendPQExpBuffer(query, + " '%c' AS subfailoverstate\n", + LOGICALREP_FAILOVER_STATE_DISABLED); + appendPQExpBufferStr(query, "FROM pg_subscription s\n" "WHERE s.subdbid = (SELECT oid FROM pg_database\n" @@ -4709,6 +4718,7 @@ getSubscriptions(Archive *fout) i_subsynccommit = PQfnumber(res, "subsynccommit"); i_subpublications = PQfnumber(res, "subpublications"); i_suborigin = PQfnumber(res, "suborigin"); + i_subfailoverstate = PQfnumber(res, "subfailoverstate"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4746,6 +4756,8 @@ getSubscriptions(Archive *fout) subinfo[i].subpublications = pg_strdup(PQgetvalue(res, i, i_subpublications)); subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin)); + subinfo[i].subfailoverstate = + pg_strdup(PQgetvalue(res, i, i_subfailoverstate)); /* Decide whether we want to dump it */ selectDumpableObject(&(subinfo[i].dobj), fout); @@ -4771,6 +4783,7 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) int npubnames = 0; int i; char two_phase_disabled[] = {LOGICALREP_TWOPHASE_STATE_DISABLED, '\0'}; + char failover_disabled[] = {LOGICALREP_FAILOVER_STATE_DISABLED, '\0'}; /* Do nothing in data-only dump */ if (dopt->dataOnly) @@ -4818,6 +4831,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0) appendPQExpBufferStr(query, ", two_phase = on"); + if (strcmp(subinfo->subfailoverstate, failover_disabled) != 0) + appendPQExpBufferStr(query, ", failover = true"); + if (strcmp(subinfo->subdisableonerr, "t") == 0) appendPQExpBufferStr(query, ", disable_on_error = true"); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 2fe3cbed9a..f62a4dfd4b 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -671,6 +671,7 @@ typedef struct _SubscriptionInfo char *subsynccommit; char *subpublications; char *suborigin; + char *subfailoverstate; } SubscriptionInfo; /* diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c index 4878aa22bf..e16286f18c 100644 --- a/src/bin/pg_upgrade/info.c +++ b/src/bin/pg_upgrade/info.c @@ -661,7 +661,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) * started and stopped several times causing any temporary slots to be * removed. */ - res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, " + res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, " "%s as caught_up, conflicting as invalid " "FROM pg_catalog.pg_replication_slots " "WHERE slot_type = 'logical' AND " @@ -679,6 +679,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) int i_slotname; int i_plugin; int i_twophase; + int i_failover; int i_caught_up; int i_invalid; @@ -687,6 +688,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) i_slotname = PQfnumber(res, "slot_name"); i_plugin = PQfnumber(res, "plugin"); i_twophase = PQfnumber(res, "two_phase"); + i_failover = PQfnumber(res, "failover"); i_caught_up = PQfnumber(res, "caught_up"); i_invalid = PQfnumber(res, "invalid"); @@ -697,6 +699,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) curr->slotname = pg_strdup(PQgetvalue(res, slotnum, i_slotname)); curr->plugin = pg_strdup(PQgetvalue(res, slotnum, i_plugin)); curr->two_phase = (strcmp(PQgetvalue(res, slotnum, i_twophase), "t") == 0); + curr->failover = (strcmp(PQgetvalue(res, slotnum, i_failover), "t") == 0); curr->caught_up = (strcmp(PQgetvalue(res, slotnum, i_caught_up), "t") == 0); curr->invalid = (strcmp(PQgetvalue(res, slotnum, i_invalid), "t") == 0); } diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c index 3960af4036..09f7437716 100644 --- a/src/bin/pg_upgrade/pg_upgrade.c +++ b/src/bin/pg_upgrade/pg_upgrade.c @@ -916,8 +916,10 @@ create_logical_replication_slots(void) appendStringLiteralConn(query, slot_info->slotname, conn); appendPQExpBuffer(query, ", "); appendStringLiteralConn(query, slot_info->plugin, conn); - appendPQExpBuffer(query, ", false, %s);", - slot_info->two_phase ? "true" : "false"); + + appendPQExpBuffer(query, ", false, %s, %s);", + slot_info->two_phase ? "true" : "false", + slot_info->failover ? "true" : "false"); PQclear(executeQueryOrDie(conn, "%s", query->data)); diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h index a710f325de..2d8bcb26f9 100644 --- a/src/bin/pg_upgrade/pg_upgrade.h +++ b/src/bin/pg_upgrade/pg_upgrade.h @@ -160,6 +160,8 @@ typedef struct bool two_phase; /* can the slot decode 2PC? */ bool caught_up; /* has the slot caught up to latest changes? */ bool invalid; /* if true, the slot is unusable */ + bool failover; /* is the slot designated to be synced to the + * physical standby? */ } LogicalSlotInfo; typedef struct diff --git a/src/bin/pg_upgrade/t/003_logical_slots.pl b/src/bin/pg_upgrade/t/003_logical_slots.pl index 020e7aa1cc..cb3a2b3aed 100644 --- a/src/bin/pg_upgrade/t/003_logical_slots.pl +++ b/src/bin/pg_upgrade/t/003_logical_slots.pl @@ -159,7 +159,7 @@ $sub->start; $sub->safe_psql( 'postgres', qq[ CREATE TABLE tbl (a int); - CREATE SUBSCRIPTION regress_sub CONNECTION '$old_connstr' PUBLICATION regress_pub WITH (two_phase = 'true') + CREATE SUBSCRIPTION regress_sub CONNECTION '$old_connstr' PUBLICATION regress_pub WITH (two_phase = 'true', failover = 'true') ]); $sub->wait_for_subscription_sync($oldpub, 'regress_sub'); @@ -179,8 +179,8 @@ command_ok([@pg_upgrade_cmd], 'run of pg_upgrade of old cluster'); # Check that the slot 'regress_sub' has migrated to the new cluster $newpub->start; my $result = $newpub->safe_psql('postgres', - "SELECT slot_name, two_phase FROM pg_replication_slots"); -is($result, qq(regress_sub|t), 'check the slot exists on new cluster'); + "SELECT slot_name, two_phase, failover FROM pg_replication_slots"); +is($result, qq(regress_sub|t|t), 'check the slot exists on new cluster'); # Update the connection my $new_connstr = $newpub->connstr . ' dbname=postgres'; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 5077e7b358..36795b1085 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6563,7 +6563,8 @@ describeSubscriptions(const char *pattern, bool verbose) PGresult *res; printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, - false, false, false, false, false, false, false, false, false, false}; + false, false, false, false, false, false, false, false, false, false, + false}; if (pset.sversion < 100000) { @@ -6627,6 +6628,11 @@ describeSubscriptions(const char *pattern, bool verbose) gettext_noop("Password required"), gettext_noop("Run as owner?")); + if (pset.sversion >= 170000) + appendPQExpBuffer(&buf, + ", subfailoverstate AS \"%s\"\n", + gettext_noop("Failover")); + appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" ", subconninfo AS \"%s\"\n", diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 049801186c..905964a2e8 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -3327,7 +3327,7 @@ psql_completion(const char *text, int start, int end) /* Complete "CREATE SUBSCRIPTION ... WITH ( " */ else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", - "disable_on_error", "enabled", "origin", + "disable_on_error", "enabled", "failover", "origin", "password_required", "run_as_owner", "slot_name", "streaming", "synchronous_commit", "two_phase"); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 9052f5262a..7abd672858 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11115,17 +11115,17 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,bool}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,failover}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', proparallel => 'u', prorettype => 'record', - proargtypes => 'name name bool bool', - proallargtypes => '{name,name,bool,bool,name,pg_lsn}', - proargmodes => '{i,i,i,i,o,o}', - proargnames => '{slot_name,plugin,temporary,twophase,slot_name,lsn}', + proargtypes => 'name name bool bool bool', + proallargtypes => '{name,name,bool,bool,bool,name,pg_lsn}', + proargmodes => '{i,i,i,i,i,o,o}', + proargnames => '{slot_name,plugin,temporary,twophase,failover,slot_name,lsn}', prosrc => 'pg_create_logical_replication_slot' }, { oid => '4222', descr => 'copy a logical replication slot, changing temporality and plugin', diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index e0b91eacd2..3190a3889b 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -31,6 +31,14 @@ #define LOGICALREP_TWOPHASE_STATE_PENDING 'p' #define LOGICALREP_TWOPHASE_STATE_ENABLED 'e' +/* + * failover tri-state values. See comments atop worker.c to know more about + * these states. + */ +#define LOGICALREP_FAILOVER_STATE_DISABLED 'd' +#define LOGICALREP_FAILOVER_STATE_PENDING 'p' +#define LOGICALREP_FAILOVER_STATE_ENABLED 'e' + /* * The subscription will request the publisher to only send changes that do not * have any origin. @@ -93,6 +101,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subrunasowner; /* True if replication should execute as the * subscription owner */ + char subfailoverstate; /* Failover state */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -145,6 +155,7 @@ typedef struct Subscription List *publications; /* List of publication names to subscribe to */ char *origin; /* Only publish data originating from the * specified origin */ + char failoverstate; /* Allow slot to be synchronized for failover */ } Subscription; /* Disallow streaming in-progress transactions. */ diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index c98961c329..d9be317662 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -72,6 +72,18 @@ typedef struct DropReplicationSlotCmd } DropReplicationSlotCmd; +/* ---------------------- + * ALTER_REPLICATION_SLOT command + * ---------------------- + */ +typedef struct AlterReplicationSlotCmd +{ + NodeTag type; + char *slotname; + List *options; +} AlterReplicationSlotCmd; + + /* ---------------------- * START_REPLICATION command * ---------------------- diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index d3535eed58..a2e9d8e61c 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -111,6 +111,12 @@ typedef struct ReplicationSlotPersistentData /* plugin name */ NameData plugin; + + /* + * Is this a failover slot (sync candidate for physical standbys)? Only + * relevant for logical slots on the primary server. + */ + bool failover; } ReplicationSlotPersistentData; /* @@ -218,9 +224,10 @@ extern void ReplicationSlotsShmemInit(void); /* management of individual slots */ extern void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase); + bool two_phase, bool failover); extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); +extern void ReplicationSlotAlter(const char *name, bool failover); extern void ReplicationSlotAcquire(const char *name, bool nowait); extern void ReplicationSlotRelease(void); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 949e874f21..f1135762fb 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -355,9 +355,20 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, + bool failover, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); +/* + * walrcv_alter_slot_fn + * + * Change the definition of a replication slot. Currently, it only supports + * changing the failover property of the slot. + */ +typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn, + const char *slotname, + bool failover); + /* * walrcv_get_backend_pid_fn * @@ -399,6 +410,7 @@ typedef struct WalReceiverFunctionsType walrcv_receive_fn walrcv_receive; walrcv_send_fn walrcv_send; walrcv_create_slot_fn walrcv_create_slot; + walrcv_alter_slot_fn walrcv_alter_slot; walrcv_get_backend_pid_fn walrcv_get_backend_pid; walrcv_exec_fn walrcv_exec; walrcv_disconnect_fn walrcv_disconnect; @@ -428,8 +440,10 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd) #define walrcv_send(conn, buffer, nbytes) \ WalReceiverFunctions->walrcv_send(conn, buffer, nbytes) -#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \ - WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) +#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) \ + WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) +#define walrcv_alter_slot(conn, slotname, failover) \ + WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover) #define walrcv_get_backend_pid(conn) \ WalReceiverFunctions->walrcv_get_backend_pid(conn) #define walrcv_exec(conn, exec, nRetTypes, retTypes) \ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index db73408937..84bb79ac0f 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -256,7 +256,8 @@ extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname); extern bool AllTablesyncsReady(void); -extern void UpdateTwoPhaseState(Oid suboid, char new_state); +extern void EnableTwoPhaseFailoverTriState(Oid suboid, bool enable_twophase, + bool enable_failover); extern void process_syncing_tables(XLogRecPtr current_lsn); extern void invalidate_syncing_table_states(Datum arg, int cacheid, diff --git a/src/test/recovery/t/050_standby_failover_slots_sync.pl b/src/test/recovery/t/050_standby_failover_slots_sync.pl new file mode 100644 index 0000000000..485e2a0191 --- /dev/null +++ b/src/test/recovery/t/050_standby_failover_slots_sync.pl @@ -0,0 +1,64 @@ + +# Copyright (c) 2023, PostgreSQL Global Development Group + +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +################################################## +# Test that when a subscription with failover enabled is created, it will alter +# the failover property of the corresponding slot on the publisher. +################################################## + +# Create publisher +my $publisher = PostgreSQL::Test::Cluster->new('publisher'); +$publisher->init(allows_streaming => 'logical'); +$publisher->start; + +$publisher->safe_psql( + 'postgres', qq[ + CREATE TABLE tab_int (a int PRIMARY KEY); + CREATE PUBLICATION regress_mypub FOR TABLE tab_int; +]); + +my $publisher_connstr = $publisher->connstr . ' dbname=postgres'; + +# Create a subscriber node, wait for sync to complete +my $subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1'); +$subscriber1->init; +$subscriber1->start; + +# Create a slot on the publisher with failover disabled +$publisher->safe_psql('postgres', + "SELECT 'init' FROM pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, false);" +); + +# Confirm that the failover flag on the slot is turned off +is( $publisher->safe_psql( + 'postgres', + q{SELECT failover from pg_replication_slots WHERE slot_name = 'lsub1_slot';} + ), + "f", + 'logical slot has failover false on the publisher'); + +# Create another subscription (using the same slot created above) that enables +# failover. +$subscriber1->safe_psql( + 'postgres', qq[ + CREATE TABLE tab_int (a int PRIMARY KEY); + CREATE SUBSCRIPTION regress_mysub1 CONNECTION '$publisher_connstr' PUBLICATION regress_mypub WITH (slot_name = lsub1_slot, copy_data=false, failover = true, create_slot = false); +]); + +# Confirm that the failover flag on the slot has now been turned on +is( $publisher->safe_psql( + 'postgres', + q{SELECT failover from pg_replication_slots WHERE slot_name = 'lsub1_slot';} + ), + "t", + 'logical slot has failover true on the publisher'); + +$subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION regress_mysub1"); + +done_testing(); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index f645e8486b..373b7e15af 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1473,8 +1473,9 @@ pg_replication_slots| SELECT l.slot_name, l.wal_status, l.safe_wal_size, l.two_phase, - l.conflicting - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting) + l.conflicting, + l.failover + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, failover) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index b15eddbff3..96c614332c 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname'); ALTER SUBSCRIPTION regress_testsub SET (password_required = false); ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | t | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | t | d | off | dbname=regress_doesnotexist2 | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (password_required = true); @@ -176,10 +176,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist2 | 0/12345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist2 | 0/12345 (1 row) -- ok - with lsn = NONE @@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist2 | 0/0 (1 row) BEGIN; @@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+------------------------------+---------- - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | local | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | d | local | dbname=regress_doesnotexist2 | 0/0 (1 row) -- rename back to keep the rest simple @@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication already exists @@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication used more than once @@ -332,10 +332,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -371,10 +371,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) --fail - alter of two_phase option not supported. @@ -383,10 +383,10 @@ ERROR: unrecognized subscription parameter: "two_phase" -- but can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -396,10 +396,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -412,18 +412,31 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; +-- test failover option +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, failover = true); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | p | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 444e563ff3..e4601158b3 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -290,6 +290,14 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- test failover option +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, failover = true); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; + -- let's do some tests with pg_create_subscription rather than superuser SET SESSION AUTHORIZATION regress_subscription_user3; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index e37ef9aa76..611deeaae5 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -85,6 +85,7 @@ AlterOwnerStmt AlterPolicyStmt AlterPublicationAction AlterPublicationStmt +AlterReplicationSlotCmd AlterRoleSetStmt AlterRoleStmt AlterSeqStmt @@ -3870,6 +3871,7 @@ varattrib_1b_e varattrib_4b vbits verifier_context +walrcv_alter_slot_fn walrcv_check_conninfo_fn walrcv_connect_fn walrcv_create_slot_fn -- 2.30.0.windows.2