From d6d4b8ac93c60dcaadb53c6cb9a446ae0882511b Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Fri, 5 Apr 2024 06:47:18 -0400 Subject: [PATCH v3 1/5] Allow altering of two_phase option of a SUBSCRIPTION This patch allows user to alter two_phase option of a subscriber provided no uncommitted prepared transactions are pending on that subscription. Author: Cherian Ajin, Hayato Kuroda --- src/backend/access/transam/twophase.c | 43 +++++++++++++++++++ src/backend/commands/subscriptioncmds.c | 42 +++++++++++++++--- .../libpqwalreceiver/libpqwalreceiver.c | 7 +-- src/backend/replication/logical/launcher.c | 21 +++++++-- src/backend/replication/logical/worker.c | 3 -- src/backend/replication/slot.c | 19 +++++++- src/backend/replication/walsender.c | 20 +++++++-- src/bin/psql/tab-complete.c | 2 +- src/include/access/twophase.h | 3 ++ src/include/replication/logicallauncher.h | 2 +- src/include/replication/slot.h | 3 +- src/include/replication/walreceiver.h | 5 ++- src/test/regress/expected/subscription.out | 5 +-- src/test/regress/sql/subscription.sql | 5 +-- 14 files changed, 146 insertions(+), 34 deletions(-) diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 8090ac9fc1..495f99a357 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -2682,3 +2682,46 @@ LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, LWLockRelease(TwoPhaseStateLock); return found; } + +/* + * checkGid + */ +static bool +checkGid(char *gid, Oid subid) +{ + int ret; + Oid subid_written, + xid; + + ret = sscanf(gid, "pg_gid_%u_%u", &subid_written, &xid); + + if (ret != 2 || subid != subid_written) + return false; + + return true; +} + +/* + * LookupGXactBySubid + * Check if the prepared transaction done by apply worker exists. + */ +bool +LookupGXactBySubid(Oid subid) +{ + bool found = false; + + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + for (int i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + + /* Ignore not-yet-valid GIDs. */ + if (gxact->valid && checkGid(gxact->gid, subid)) + { + found = true; + break; + } + } + LWLockRelease(TwoPhaseStateLock); + return found; +} diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5a47fa984d..6643fc08a6 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -16,6 +16,7 @@ #include "access/htup_details.h" #include "access/table.h" +#include "access/twophase.h" #include "access/xact.h" #include "catalog/catalog.h" #include "catalog/dependency.h" @@ -849,7 +850,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, else if (opts.slot_name && (opts.failover || walrcv_server_version(wrconn) >= 170000)) { - walrcv_alter_slot(wrconn, opts.slot_name, opts.failover); + walrcv_alter_slot(wrconn, opts.slot_name, opts.twophase, opts.failover); } } PG_FINALLY(); @@ -868,7 +869,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, pgstat_create_subscription(subid); if (opts.enabled) - ApplyLauncherWakeupAtCommit(); + ApplyLauncherWakeupAtEOXact(true); ObjectAddressSet(myself, SubscriptionRelationId, subid); @@ -1165,7 +1166,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, { supported_opts = (SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | - SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR | + SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | + SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN); @@ -1173,6 +1175,31 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, parse_subscription_options(pstate, stmt->options, supported_opts, &opts); + /* XXX */ + if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT)) + { + /* Stop corresponding worker */ + logicalrep_worker_stop(subid, InvalidOid); + + /* Request to start worker at the end of transaction */ + ApplyLauncherWakeupAtEOXact(false); + + /* Check whether the number of prepared transactions */ + if (!opts.twophase && + form->subtwophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && + LookupGXactBySubid(subid)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot disable two_phase when uncommitted prepared transactions present"))); + + /* Change system catalog acoordingly */ + values[Anum_pg_subscription_subtwophasestate - 1] = + CharGetDatum(opts.twophase ? + LOGICALREP_TWOPHASE_STATE_PENDING : + LOGICALREP_TWOPHASE_STATE_DISABLED); + replaces[Anum_pg_subscription_subtwophasestate - 1] = true; + } + if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME)) { /* @@ -1299,7 +1326,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_subenabled - 1] = true; if (opts.enabled) - ApplyLauncherWakeupAtCommit(); + ApplyLauncherWakeupAtEOXact(true); update_tuple = true; break; @@ -1521,7 +1548,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, * doing the database operations we won't be able to rollback altered * slot. */ - if (replaces[Anum_pg_subscription_subfailover - 1]) + if (replaces[Anum_pg_subscription_subtwophasestate - 1] || + replaces[Anum_pg_subscription_subfailover - 1]) { bool must_use_password; char *err; @@ -1541,7 +1569,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, PG_TRY(); { - walrcv_alter_slot(wrconn, sub->slotname, opts.failover); + walrcv_alter_slot(wrconn, sub->slotname, opts.twophase, opts.failover); } PG_FINALLY(); { @@ -1962,7 +1990,7 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId) form->oid, 0); /* Wake up related background processes to handle this change quickly. */ - ApplyLauncherWakeupAtCommit(); + ApplyLauncherWakeupAtEOXact(true); LogicalRepWorkersWakeupAtCommit(form->oid); } diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 3c2b1bb496..baef3bdec0 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -80,7 +80,7 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, - bool failover); + bool two_phase, bool failover); static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn); static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn, const char *query, @@ -1121,14 +1121,15 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, */ static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, - bool failover) + bool two_phase, bool failover) { StringInfoData cmd; PGresult *res; initStringInfo(&cmd); - appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s )", + appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( TWO_PHASE %s, FAILOVER %s )", quote_identifier(slotname), + two_phase ? "true" : "false", failover ? "true" : "false"); res = libpqrcv_PQexec(conn->streamConn, cmd.data); diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 66070e9131..3e0e5a77e0 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -89,6 +89,7 @@ static dsa_area *last_start_times_dsa = NULL; static dshash_table *last_start_times = NULL; static bool on_commit_launcher_wakeup = false; +static bool launcher_wakeup = false; static void ApplyLauncherWakeup(void); @@ -1085,13 +1086,22 @@ ApplyLauncherForgetWorkerStartTime(Oid subid) void AtEOXact_ApplyLauncher(bool isCommit) { + bool kicked = false; + if (isCommit) { if (on_commit_launcher_wakeup) + { ApplyLauncherWakeup(); + kicked = true; + } } + if (!kicked && launcher_wakeup) + ApplyLauncherWakeup(); + on_commit_launcher_wakeup = false; + launcher_wakeup = false; } /* @@ -1102,10 +1112,15 @@ AtEOXact_ApplyLauncher(bool isCommit) * tuple was added to the pg_subscription catalog. */ void -ApplyLauncherWakeupAtCommit(void) +ApplyLauncherWakeupAtEOXact(bool on_commit) { - if (!on_commit_launcher_wakeup) - on_commit_launcher_wakeup = true; + if (on_commit) + { + if (!on_commit_launcher_wakeup) + on_commit_launcher_wakeup = true; + } + else if (!launcher_wakeup) + launcher_wakeup = true; } static void diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b5a80fe3e8..ca3d260fc3 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3911,9 +3911,6 @@ maybe_reread_subscription(void) /* !slotname should never happen when enabled is true. */ Assert(newsub->slotname); - /* two-phase should not be altered */ - Assert(newsub->twophasestate == MySubscription->twophasestate); - /* * Exit if any parameter that affects the remote connection was changed. * The launcher will start a new worker but note that the parallel apply diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index cebf44bb0f..621f35ab1e 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -800,8 +800,10 @@ ReplicationSlotDrop(const char *name, bool nowait) * Change the definition of the slot identified by the specified name. */ void -ReplicationSlotAlter(const char *name, bool failover) +ReplicationSlotAlter(const char *name, bool two_phase, bool failover) { + bool update_slot = false; + Assert(MyReplicationSlot == NULL); ReplicationSlotAcquire(name, false); @@ -844,12 +846,27 @@ ReplicationSlotAlter(const char *name, bool failover) errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot enable failover for a temporary replication slot")); + if (MyReplicationSlot->data.two_phase != two_phase) + { + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->data.two_phase = two_phase; + SpinLockRelease(&MyReplicationSlot->mutex); + + update_slot = true; + } + + if (MyReplicationSlot->data.failover != failover) { SpinLockAcquire(&MyReplicationSlot->mutex); MyReplicationSlot->data.failover = failover; SpinLockRelease(&MyReplicationSlot->mutex); + update_slot = true; + } + + if (update_slot) + { ReplicationSlotMarkDirty(); ReplicationSlotSave(); } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index bc40c454de..be155067ce 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1411,14 +1411,25 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd) * Process extra options given to ALTER_REPLICATION_SLOT. */ static void -ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover) +ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, + bool *two_phase, bool *failover) { + bool two_phase_given = false; bool failover_given = false; /* Parse options */ foreach_ptr(DefElem, defel, cmd->options) { - if (strcmp(defel->defname, "failover") == 0) + if (strcmp(defel->defname, "two_phase") == 0) + { + if (two_phase_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + two_phase_given = true; + *two_phase = defGetBoolean(defel); + } + else if (strcmp(defel->defname, "failover") == 0) { if (failover_given) ereport(ERROR, @@ -1438,10 +1449,11 @@ ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover) static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd) { + bool two_phase = false; bool failover = false; - ParseAlterReplSlotOptions(cmd, &failover); - ReplicationSlotAlter(cmd->slotname, failover); + ParseAlterReplSlotOptions(cmd, &two_phase, &failover); + ReplicationSlotAlter(cmd->slotname, two_phase, failover); } /* diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 6fee3160f0..5ff84301cd 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1948,7 +1948,7 @@ psql_completion(const char *text, int start, int end) else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "(")) COMPLETE_WITH("binary", "disable_on_error", "failover", "origin", "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit"); + "streaming", "synchronous_commit", "two_phase"); /* ALTER SUBSCRIPTION SKIP ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "(")) COMPLETE_WITH("lsn"); diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index 56248c0006..d493ed24c5 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -62,4 +62,7 @@ extern void PrepareRedoRemove(TransactionId xid, bool giveWarning); extern void restoreTwoPhaseData(void); extern bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp); + +extern bool LookupGXactBySubid(Oid subid); + #endif /* TWOPHASE_H */ diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index ff0438b5bb..075842c67e 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -24,7 +24,7 @@ extern void ApplyLauncherShmemInit(void); extern void ApplyLauncherForgetWorkerStartTime(Oid subid); -extern void ApplyLauncherWakeupAtCommit(void); +extern void ApplyLauncherWakeupAtEOXact(bool on_commit); extern void AtEOXact_ApplyLauncher(bool isCommit); extern bool IsLogicalLauncher(void); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 7b937d1a0c..2fcb11418f 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -243,7 +243,8 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific, extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); extern void ReplicationSlotDropAcquired(void); -extern void ReplicationSlotAlter(const char *name, bool failover); +extern void ReplicationSlotAlter(const char *name, bool two_phase, + bool failover); extern void ReplicationSlotAcquire(const char *name, bool nowait); extern void ReplicationSlotRelease(void); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 12f71fa99b..a443f402f5 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -377,6 +377,7 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, */ typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn, const char *slotname, + bool two_phase, bool failover); /* @@ -455,8 +456,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_send(conn, buffer, nbytes) #define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) \ WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) -#define walrcv_alter_slot(conn, slotname, failover) \ - WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover) +#define walrcv_alter_slot(conn, slotname, two_phase, failover) \ + WalReceiverFunctions->walrcv_alter_slot(conn, slotname, two_phase, failover) #define walrcv_get_backend_pid(conn) \ WalReceiverFunctions->walrcv_get_backend_pid(conn) #define walrcv_exec(conn, exec, nRetTypes, retTypes) \ diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 1eee6b17b8..9bba656e00 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -379,10 +379,7 @@ HINT: To initiate replication, you must manually create the replication slot, e regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ---fail - alter of two_phase option not supported. -ALTER SUBSCRIPTION regress_testsub SET (two_phase = false); -ERROR: unrecognized subscription parameter: "two_phase" --- but can alter streaming when two_phase enabled +-- We can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ List of subscriptions diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 1b2a23ba7b..9ff151f806 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -257,10 +257,7 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true); \dRs+ ---fail - alter of two_phase option not supported. -ALTER SUBSCRIPTION regress_testsub SET (two_phase = false); - --- but can alter streaming when two_phase enabled +-- We can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ -- 2.43.0