From e5c865867983282814c60723499546913fd2d8c1 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Fri, 5 Apr 2024 06:47:18 -0400 Subject: [PATCH 1/4] Allow altering of two_phase option of a SUBSCRIPTION This patch allows user to alter two_phase option of a subscriber provided no uncommitted prepared transactions are pending on that subscription. Author: Cherian Ajin, Hayato Kuroda --- src/backend/access/transam/twophase.c | 43 ++++++++++ src/backend/commands/subscriptioncmds.c | 83 ++++++++++++++++--- src/backend/nodes/gen_node_support.pl | 2 +- .../libpqwalreceiver/libpqwalreceiver.c | 30 +++++++ src/backend/replication/logical/launcher.c | 21 +++++ src/backend/replication/logical/worker.c | 2 +- src/backend/replication/repl_gram.y | 20 ++++- src/backend/replication/repl_scanner.l | 2 + src/backend/replication/slot.c | 25 ++++++ src/backend/replication/walsender.c | 47 +++++++++++ src/bin/psql/tab-complete.c | 2 +- src/include/access/twophase.h | 3 + src/include/nodes/replnodes.h | 12 +++ src/include/replication/slot.h | 2 +- src/include/replication/walreceiver.h | 10 +++ src/include/replication/worker_internal.h | 1 + src/test/regress/expected/subscription.out | 5 +- src/test/regress/sql/subscription.sql | 5 +- src/test/subscription/t/021_twophase.pl | 67 ++++++++++++++- 19 files changed, 354 insertions(+), 28 deletions(-) diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index c6af8cfd7e..2148daba3c 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -2658,3 +2658,46 @@ LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, LWLockRelease(TwoPhaseStateLock); return found; } + +/* + * checkGid + */ +static bool +checkGid(char *gid, Oid subid) +{ + int ret; + Oid subid_written, + xid; + + ret = sscanf(gid, "pg_gid_%u_%u", &subid_written, &xid); + + if (ret != 2 || subid != subid_written) + return false; + + return true; +} + +/* + * LookupGXactBySubid + * Check if the prepared transaction done by apply worker exists. + */ +bool +LookupGXactBySubid(Oid subid) +{ + bool found = false; + + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + for (int i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + + /* Ignore not-yet-valid GIDs. */ + if (gxact->valid && checkGid(gxact->gid, subid)) + { + found = true; + break; + } + } + LWLockRelease(TwoPhaseStateLock); + return found; +} diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 6fe111e98d..59852fd9af 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -16,6 +16,7 @@ #include "access/htup_details.h" #include "access/table.h" +#include "access/twophase.h" #include "access/xact.h" #include "catalog/catalog.h" #include "catalog/dependency.h" @@ -1130,13 +1131,49 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, { supported_opts = (SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | - SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR | + SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | + SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); + /* XXX */ + if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT)) + { + /* + * two_phase can be only changed for disabled + * subscriptions + */ + if (form->subenabled) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot set %s for enabled subscription", + "two_phase"))); + + /* + * Stop all the subscription workers, just in case. Workers + * may still survive even if the subscription is disabled. + */ + logicalrep_workers_stop(subid); + + /* Check whether the number of prepared transactions */ + if (!opts.twophase && + form->subtwophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && + LookupGXactBySubid(subid)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot disable two_phase when uncommitted prepared transactions present"))); + + /* Change system catalog acoordingly */ + values[Anum_pg_subscription_subtwophasestate - 1] = + CharGetDatum(opts.twophase ? + LOGICALREP_TWOPHASE_STATE_PENDING : + LOGICALREP_TWOPHASE_STATE_DISABLED); + replaces[Anum_pg_subscription_subtwophasestate - 1] = true; + } + if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME)) { /* @@ -1453,6 +1490,38 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, heap_freetuple(tup); } + /* + * Try to acquire the connection necessary for altering slot. + */ + if (replaces[Anum_pg_subscription_subtwophasestate - 1]) + { + bool must_use_password; + char *err; + WalReceiverConn *wrconn; + + /* Load the library providing us libpq calls. */ + load_file("libpqwalreceiver", false); + + /* Try to connect to the publisher */ + must_use_password = (!superuser() && opts.passwordrequired); + wrconn = walrcv_connect(sub->conninfo, true, must_use_password, + sub->name, &err); + if (!wrconn) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the publisher: %s", err))); + + PG_TRY(); + { + walrcv_alter_slot(wrconn, sub->slotname, opts.twophase); + } + PG_FINALLY(); + { + walrcv_disconnect(wrconn); + } + PG_END_TRY(); + } + table_close(rel, RowExclusiveLock); ObjectAddressSet(myself, SubscriptionRelationId, subid); @@ -1481,7 +1550,6 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) char *subname; char *conninfo; char *slotname; - List *subworkers; ListCell *lc; char originname[NAMEDATALEN]; char *err = NULL; @@ -1591,16 +1659,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) * New workers won't be started because we hold an exclusive lock on the * subscription till the end of the transaction. */ - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - subworkers = logicalrep_workers_find(subid, false); - LWLockRelease(LogicalRepWorkerLock); - foreach(lc, subworkers) - { - LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); - - logicalrep_worker_stop(w->subid, w->relid); - } - list_free(subworkers); + logicalrep_workers_stop(subid); /* * Remove the no-longer-useful entry in the launcher's table of apply diff --git a/src/backend/nodes/gen_node_support.pl b/src/backend/nodes/gen_node_support.pl index d67565b925..fcba6f986f 100644 --- a/src/backend/nodes/gen_node_support.pl +++ b/src/backend/nodes/gen_node_support.pl @@ -107,7 +107,7 @@ my @nodetag_only_files = qw( # ABI stability during development. my $last_nodetag = 'WindowObjectData'; -my $last_nodetag_no = 454; +my $last_nodetag_no = 455; # output file names my @output_files; diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index b4038e114d..815ab8f9df 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -75,6 +75,8 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn, bool two_phase, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); +static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, + bool two_phase); static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn); static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn, const char *query, @@ -95,6 +97,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { .walrcv_receive = libpqrcv_receive, .walrcv_send = libpqrcv_send, .walrcv_create_slot = libpqrcv_create_slot, + .walrcv_alter_slot = libpqrcv_alter_slot, .walrcv_get_backend_pid = libpqrcv_get_backend_pid, .walrcv_exec = libpqrcv_exec, .walrcv_disconnect = libpqrcv_disconnect @@ -1039,6 +1042,33 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, return snapshot; } +/* + * Change the definition of the replication slot. + */ +static void +libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, + bool two_phase) +{ + StringInfoData cmd; + PGresult *res; + + initStringInfo(&cmd); + appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( TWO_PHASE %s )", + quote_identifier(slotname), + two_phase ? "true" : "false"); + + res = libpqrcv_PQexec(conn->streamConn, cmd.data); + pfree(cmd.data); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("could not alter replication slot \"%s\": %s", + slotname, pchomp(PQerrorMessage(conn->streamConn))))); + + PQclear(res); +} + /* * Return PID of remote backend process. */ diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 8395ae7b23..295fe0f2a0 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -608,6 +608,27 @@ logicalrep_worker_stop(Oid subid, Oid relid) LWLockRelease(LogicalRepWorkerLock); } +/* + * Stop all the subscription workers. + */ +void +logicalrep_workers_stop(Oid subid) +{ + List *subworkers; + ListCell *lc; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + subworkers = logicalrep_workers_find(subid, false); + LWLockRelease(LogicalRepWorkerLock); + foreach(lc, subworkers) + { + LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); + + logicalrep_worker_stop(w->subid, w->relid); + } + list_free(subworkers); +} + /* * Stop the given logical replication parallel apply worker. * diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 832b1cf764..7da6d546dd 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3926,7 +3926,7 @@ maybe_reread_subscription(void) /* !slotname should never happen when enabled is true. */ Assert(newsub->slotname); - /* two-phase should not be altered */ + /* two-phase should not be altered while the worker exists */ Assert(newsub->twophasestate == MySubscription->twophasestate); /* diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index 0c874e33cf..bff501263e 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -64,6 +64,7 @@ Node *replication_parse_result; %token K_START_REPLICATION %token K_CREATE_REPLICATION_SLOT %token K_DROP_REPLICATION_SLOT +%token K_ALTER_REPLICATION_SLOT %token K_TIMELINE_HISTORY %token K_WAIT %token K_TIMELINE @@ -79,8 +80,9 @@ Node *replication_parse_result; %type command %type base_backup start_replication start_logical_replication - create_replication_slot drop_replication_slot identify_system - read_replication_slot timeline_history show + create_replication_slot drop_replication_slot + alter_replication_slot identify_system read_replication_slot + timeline_history show %type generic_option_list %type generic_option %type opt_timeline @@ -111,6 +113,7 @@ command: | start_logical_replication | create_replication_slot | drop_replication_slot + | alter_replication_slot | read_replication_slot | timeline_history | show @@ -257,6 +260,18 @@ drop_replication_slot: } ; +/* ALTER_REPLICATION_SLOT slot (options) */ +alter_replication_slot: + K_ALTER_REPLICATION_SLOT IDENT '(' generic_option_list ')' + { + AlterReplicationSlotCmd *cmd; + cmd = makeNode(AlterReplicationSlotCmd); + cmd->slotname = $2; + cmd->options = $4; + $$ = (Node *) cmd; + } + ; + /* * START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d] */ @@ -399,6 +414,7 @@ ident_or_keyword: | K_START_REPLICATION { $$ = "start_replication"; } | K_CREATE_REPLICATION_SLOT { $$ = "create_replication_slot"; } | K_DROP_REPLICATION_SLOT { $$ = "drop_replication_slot"; } + | K_ALTER_REPLICATION_SLOT { $$ = "alter_replication_slot"; } | K_TIMELINE_HISTORY { $$ = "timeline_history"; } | K_WAIT { $$ = "wait"; } | K_TIMELINE { $$ = "timeline"; } diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index cb467ca46f..6396463cf9 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -125,6 +125,7 @@ TIMELINE { return K_TIMELINE; } START_REPLICATION { return K_START_REPLICATION; } CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; } DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; } +ALTER_REPLICATION_SLOT { return K_ALTER_REPLICATION_SLOT; } TIMELINE_HISTORY { return K_TIMELINE_HISTORY; } PHYSICAL { return K_PHYSICAL; } RESERVE_WAL { return K_RESERVE_WAL; } @@ -301,6 +302,7 @@ replication_scanner_is_replication_command(void) case K_START_REPLICATION: case K_CREATE_REPLICATION_SLOT: case K_DROP_REPLICATION_SLOT: + case K_ALTER_REPLICATION_SLOT: case K_READ_REPLICATION_SLOT: case K_TIMELINE_HISTORY: case K_SHOW: diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 1f9d0ed9b3..04915590d2 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -648,6 +648,31 @@ ReplicationSlotDrop(const char *name, bool nowait) ReplicationSlotDropAcquired(); } +/* + * Change the definition of the slot identified by the specified name. + */ +void +ReplicationSlotAlter(const char *name, bool two_phase) +{ + Assert(MyReplicationSlot == NULL); + + ReplicationSlotAcquire(name, false); + + if (SlotIsPhysical(MyReplicationSlot)) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use %s with a physical replication slot", + "ALTER_REPLICATION_SLOT")); + + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->data.two_phase = two_phase; + SpinLockRelease(&MyReplicationSlot->mutex); + + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + ReplicationSlotRelease(); +} + /* * Permanently drop the currently acquired replication slot. */ diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 4c53de08b9..4c18dadaad 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1247,6 +1247,46 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd) ReplicationSlotDrop(cmd->slotname, !cmd->wait); } +/* + * Process extra options given to ALTER_REPLICATION_SLOT. + */ +static void +ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *two_phase) +{ + bool two_phase_given = false; + ListCell *lc; + + /* Parse options */ + foreach(lc, cmd->options) + { + DefElem *defel = (DefElem *) lfirst(lc); + + if (strcmp(defel->defname, "two_phase") == 0) + { + if (two_phase_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + two_phase_given = true; + *two_phase = defGetBoolean(defel); + } + else + elog(ERROR, "unrecognized option: %s", defel->defname); + } +} + +/* + * Change the definition of a replication slot. + */ +static void +AlterReplicationSlot(AlterReplicationSlotCmd *cmd) +{ + bool two_phase = false; + + ParseAlterReplSlotOptions(cmd, &two_phase); + ReplicationSlotAlter(cmd->slotname, two_phase); +} + /* * Load previously initiated logical slot and prepare for sending data (via * WalSndLoop). @@ -1820,6 +1860,13 @@ exec_replication_command(const char *cmd_string) EndReplicationCommand(cmdtag); break; + case T_AlterReplicationSlotCmd: + cmdtag = "ALTER_REPLICATION_SLOT"; + set_ps_display(cmdtag); + AlterReplicationSlot((AlterReplicationSlotCmd *) cmd_node); + EndReplicationCommand(cmdtag); + break; + case T_StartReplicationCmd: { StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node; diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index a1aa946b30..b689b6906a 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1927,7 +1927,7 @@ psql_completion(const char *text, int start, int end) else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "(")) COMPLETE_WITH("binary", "disable_on_error", "origin", "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit"); + "streaming", "synchronous_commit", "two_phase"); /* ALTER SUBSCRIPTION SKIP ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "(")) COMPLETE_WITH("lsn"); diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index 21e2af7387..dac3f27bc3 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -62,4 +62,7 @@ extern void PrepareRedoRemove(TransactionId xid, bool giveWarning); extern void restoreTwoPhaseData(void); extern bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp); + +extern bool LookupGXactBySubid(Oid subid); + #endif /* TWOPHASE_H */ diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index 4321ba8f86..5819276d19 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -72,6 +72,18 @@ typedef struct DropReplicationSlotCmd } DropReplicationSlotCmd; +/* ---------------------- + * ALTER_REPLICATION_SLOT command + * ---------------------- + */ +typedef struct AlterReplicationSlotCmd +{ + NodeTag type; + char *slotname; + List *options; +} AlterReplicationSlotCmd; + + /* ---------------------- * START_REPLICATION command * ---------------------- diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index a8a89dc784..24ab5117bd 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -214,7 +214,7 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific, bool two_phase); extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); - +extern void ReplicationSlotAlter(const char *name, bool two_phase); extern void ReplicationSlotAcquire(const char *name, bool nowait); extern void ReplicationSlotRelease(void); extern void ReplicationSlotCleanup(void); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 281626fa6f..8ba1599fad 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -359,6 +359,13 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); +/* + * walrcv_alter_slot_fn + */ +typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn, + const char *slotname, + bool two_phase); + /* * walrcv_get_backend_pid_fn * @@ -400,6 +407,7 @@ typedef struct WalReceiverFunctionsType walrcv_receive_fn walrcv_receive; walrcv_send_fn walrcv_send; walrcv_create_slot_fn walrcv_create_slot; + walrcv_alter_slot_fn walrcv_alter_slot; walrcv_get_backend_pid_fn walrcv_get_backend_pid; walrcv_exec_fn walrcv_exec; walrcv_disconnect_fn walrcv_disconnect; @@ -431,6 +439,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_send(conn, buffer, nbytes) #define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \ WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) +#define walrcv_alter_slot(conn, slotname, two_phase) \ + WalReceiverFunctions->walrcv_alter_slot(conn, slotname, two_phase) #define walrcv_get_backend_pid(conn) \ WalReceiverFunctions->walrcv_get_backend_pid(conn) #define walrcv_exec(conn, exec, nRetTypes, retTypes) \ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 343e781896..3f19ae4f5f 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -235,6 +235,7 @@ extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm); extern void logicalrep_worker_stop(Oid subid, Oid relid); +extern void logicalrep_workers_stop(Oid subid); extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo); extern void logicalrep_worker_wakeup(Oid subid, Oid relid); extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index b15eddbff3..51c466f42e 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -377,10 +377,7 @@ HINT: To initiate replication, you must manually create the replication slot, e regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ---fail - alter of two_phase option not supported. -ALTER SUBSCRIPTION regress_testsub SET (two_phase = false); -ERROR: unrecognized subscription parameter: "two_phase" --- but can alter streaming when two_phase enabled +-- We can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ List of subscriptions diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 444e563ff3..b7764c1074 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -256,10 +256,7 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true); \dRs+ ---fail - alter of two_phase option not supported. -ALTER SUBSCRIPTION regress_testsub SET (two_phase = false); - --- but can alter streaming when two_phase enabled +-- We can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl index 8ce4cfc983..b6ce59763a 100644 --- a/src/test/subscription/t/021_twophase.pl +++ b/src/test/subscription/t/021_twophase.pl @@ -367,6 +367,71 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;"); is($result, qq(2), 'replicated data in subscriber table'); +# Disable the subscription and alter it to two_phase = false, +# verify that the altered subscription reflects the two_phase option. + +# Alter subscription two_phase to false +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub_copy DISABLE"); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub_copy SET (two_phase = false)"); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub_copy ENABLE"); + +# Wait for subscription startup +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname_copy); + +# Make sure that the two-phase is disabled on the subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT subtwophasestate FROM pg_subscription WHERE subname = 'tap_sub_copy';" +); +is($result, qq(d), 'two-phase is disabled'); + +# Now do a prepare on publisher and make sure that it is not replicated. +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); +$node_publisher->safe_psql( + 'postgres', qq{ + BEGIN; + INSERT INTO tab_copy VALUES (100); + PREPARE TRANSACTION 'newgid'; + }); + +# Wait for the subscriber to catchup +$node_publisher->wait_for_catchup($appname_copy); + +# Make sure that there is 0 prepared transaction on the subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is prepared on subscriber'); + +# Now commit the insert and verify that it IS replicated +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'newgid';"); + +# Wait for the subscriber to catchup +$node_publisher->wait_for_catchup($appname_copy); + +# Made sure that the commited transaction is replicated. +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;"); +is($result, qq(3), 'replicated data in subscriber table'); + +# Alter subscription two_phase to true +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub_copy DISABLE"); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub_copy SET (two_phase = true)"); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub_copy ENABLE"); + +# Wait for subscription startup +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname_copy); + +# Make sure that the two-phase is enabled on the subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT subtwophasestate FROM pg_subscription WHERE subname = 'tap_sub_copy';" +); +is($result, qq(e), 'two-phase is disabled'); + $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_copy;"); $node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_copy;"); @@ -374,8 +439,6 @@ $node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_copy;"); # check all the cleanup ############################### -$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); - $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); is($result, qq(0), 'check subscription was dropped on subscriber'); -- 2.43.0