From e5da6b142c6bcfb2943cdc24c6d63bf110dcf75e Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Mon, 8 Apr 2024 02:32:38 +0000 Subject: [PATCH v3 2/5] Mandate the subscription has been disabled --- doc/src/sgml/ref/alter_subscription.sgml | 6 ++++-- src/backend/commands/subscriptioncmds.c | 20 ++++++++++++-------- src/backend/replication/logical/launcher.c | 21 +++------------------ src/backend/replication/logical/worker.c | 3 +++ src/include/replication/logicallauncher.h | 2 +- 5 files changed, 23 insertions(+), 29 deletions(-) diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 413ce68ce2..20b45e36e0 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -227,9 +227,11 @@ ALTER SUBSCRIPTION name RENAME TO < disable_on_error, password_required, run_as_owner, - origin, and - failover. + origin, + failover, and + two_phase. Only a superuser can set password_required = false. + two_phase can be altered only for disabled subscription. diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 6643fc08a6..bfbb2873b1 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -869,7 +869,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, pgstat_create_subscription(subid); if (opts.enabled) - ApplyLauncherWakeupAtEOXact(true); + ApplyLauncherWakeupAtCommit(); ObjectAddressSet(myself, SubscriptionRelationId, subid); @@ -1178,11 +1178,15 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* XXX */ if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT)) { - /* Stop corresponding worker */ - logicalrep_worker_stop(subid, InvalidOid); - - /* Request to start worker at the end of transaction */ - ApplyLauncherWakeupAtEOXact(false); + /* + * two_phase can be only changed for disabled + * subscriptions + */ + if (form->subenabled) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot set %s for enabled subscription", + "two_phase"))); /* Check whether the number of prepared transactions */ if (!opts.twophase && @@ -1326,7 +1330,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_subenabled - 1] = true; if (opts.enabled) - ApplyLauncherWakeupAtEOXact(true); + ApplyLauncherWakeupAtCommit(); update_tuple = true; break; @@ -1990,7 +1994,7 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId) form->oid, 0); /* Wake up related background processes to handle this change quickly. */ - ApplyLauncherWakeupAtEOXact(true); + ApplyLauncherWakeupAtCommit(); LogicalRepWorkersWakeupAtCommit(form->oid); } diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 3e0e5a77e0..66070e9131 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -89,7 +89,6 @@ static dsa_area *last_start_times_dsa = NULL; static dshash_table *last_start_times = NULL; static bool on_commit_launcher_wakeup = false; -static bool launcher_wakeup = false; static void ApplyLauncherWakeup(void); @@ -1086,22 +1085,13 @@ ApplyLauncherForgetWorkerStartTime(Oid subid) void AtEOXact_ApplyLauncher(bool isCommit) { - bool kicked = false; - if (isCommit) { if (on_commit_launcher_wakeup) - { ApplyLauncherWakeup(); - kicked = true; - } } - if (!kicked && launcher_wakeup) - ApplyLauncherWakeup(); - on_commit_launcher_wakeup = false; - launcher_wakeup = false; } /* @@ -1112,15 +1102,10 @@ AtEOXact_ApplyLauncher(bool isCommit) * tuple was added to the pg_subscription catalog. */ void -ApplyLauncherWakeupAtEOXact(bool on_commit) +ApplyLauncherWakeupAtCommit(void) { - if (on_commit) - { - if (!on_commit_launcher_wakeup) - on_commit_launcher_wakeup = true; - } - else if (!launcher_wakeup) - launcher_wakeup = true; + if (!on_commit_launcher_wakeup) + on_commit_launcher_wakeup = true; } static void diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ca3d260fc3..374aa22091 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3911,6 +3911,9 @@ maybe_reread_subscription(void) /* !slotname should never happen when enabled is true. */ Assert(newsub->slotname); + /* two-phase should not be altered while the worker exists */ + Assert(newsub->twophasestate == MySubscription->twophasestate); + /* * Exit if any parameter that affects the remote connection was changed. * The launcher will start a new worker but note that the parallel apply diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index 075842c67e..ff0438b5bb 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -24,7 +24,7 @@ extern void ApplyLauncherShmemInit(void); extern void ApplyLauncherForgetWorkerStartTime(Oid subid); -extern void ApplyLauncherWakeupAtEOXact(bool on_commit); +extern void ApplyLauncherWakeupAtCommit(void); extern void AtEOXact_ApplyLauncher(bool isCommit); extern bool IsLogicalLauncher(void); -- 2.43.0