From e4ec647fc5114440b1061d1376caca73c03f7936 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Mon, 8 Apr 2024 12:39:12 +0000 Subject: [PATCH 3/4] Abort prepared transactions while altering two_phase to false --- src/backend/access/transam/twophase.c | 19 +++++----- src/backend/commands/subscriptioncmds.c | 27 +++++++++++--- src/include/access/twophase.h | 3 +- src/test/subscription/t/099_twophase_added.pl | 35 +++++++++++++++++++ 4 files changed, 68 insertions(+), 16 deletions(-) diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 2148daba3c..e18e80cf4e 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -2678,13 +2678,13 @@ checkGid(char *gid, Oid subid) } /* - * LookupGXactBySubid - * Check if the prepared transaction done by apply worker exists. + * GetGidListBySubid + * Get a list of GIDs which is PREPARE'd by the given subscription. */ -bool -LookupGXactBySubid(Oid subid) +List * +GetGidListBySubid(Oid subid) { - bool found = false; + List *list = NIL; LWLockAcquire(TwoPhaseStateLock, LW_SHARED); for (int i = 0; i < TwoPhaseState->numPrepXacts; i++) @@ -2693,11 +2693,10 @@ LookupGXactBySubid(Oid subid) /* Ignore not-yet-valid GIDs. */ if (gxact->valid && checkGid(gxact->gid, subid)) - { - found = true; - break; - } + list = lappend(list, pstrdup(gxact->gid)); + } LWLockRelease(TwoPhaseStateLock); - return found; + + return list; } diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 955d5e4899..b1c00e36db 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1142,6 +1142,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* XXX */ if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT)) { + List *prepared_xacts = NIL; + /* * two_phase can be only changed for disabled * subscriptions @@ -1158,13 +1160,28 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, */ logicalrep_workers_stop(subid); - /* Check whether the number of prepared transactions */ + /* + * If two phase was enabled, there is a possibility the + * transactions has already been PREPARE'd. + */ if (!opts.twophase && form->subtwophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && - LookupGXactBySubid(subid)) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("cannot disable two_phase when uncommitted prepared transactions present"))); + (prepared_xacts = GetGidListBySubid(subid)) != NIL) + { + ListCell *cell; + + /* Must not be in the transaction */ + PreventInTransactionBlock(isTopLevel, + "ALTER SUBSCRIPTION ... SET (two_phase = ...)"); + + /* Abort all listed transactions */ + foreach(cell, prepared_xacts) + { + FinishPreparedTransaction((char *) lfirst(cell), + false); + prepared_xacts = list_delete_cell(prepared_xacts, cell); + } + } /* Change system catalog acoordingly */ values[Anum_pg_subscription_subtwophasestate - 1] = diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index dac3f27bc3..8eebfa7267 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -18,6 +18,7 @@ #include "access/xlogdefs.h" #include "datatype/timestamp.h" #include "storage/lock.h" +#include "nodes/pg_list.h" /* * GlobalTransactionData is defined in twophase.c; other places have no @@ -63,6 +64,6 @@ extern void restoreTwoPhaseData(void); extern bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp); -extern bool LookupGXactBySubid(Oid subid); +extern List *GetGidListBySubid(Oid subid); #endif /* TWOPHASE_H */ diff --git a/src/test/subscription/t/099_twophase_added.pl b/src/test/subscription/t/099_twophase_added.pl index c13a37675a..a8135b671c 100644 --- a/src/test/subscription/t/099_twophase_added.pl +++ b/src/test/subscription/t/099_twophase_added.pl @@ -69,4 +69,39 @@ $result = $node_subscriber->safe_psql('postgres', is($result, q(5), "prepared transactions done before altering can be replicated"); +###### +# Check the case that prepared transactions exist on subscriber node +###### + +$node_publisher->safe_psql( + 'postgres', " + BEGIN; + INSERT INTO tab_full VALUES (generate_series(6, 10)); + PREPARE TRANSACTION 'test_prepared_tab_full';"); + +$node_publisher->wait_for_catchup('sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, q(1), "transaction has been prepared on subscriber"); + +$node_subscriber->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION sub DISABLE; + ALTER SUBSCRIPTION sub SET (two_phase = off); + ALTER SUBSCRIPTION sub ENABLE;"); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, q(0), "prepared transaction done by worker is aborted"); + +$node_publisher->safe_psql( 'postgres', + "COMMIT PREPARED 'test_prepared_tab_full';"); +$node_publisher->wait_for_catchup('sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(10) FROM tab_full;"); +is($result, q(10), + "prepared transactions on publisher can be replicated"); + done_testing(); -- 2.43.0