From be5727394811456e5b1dc3e38c6faccfc390452b Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Mon, 8 Apr 2024 12:39:12 +0000 Subject: [PATCH v14 3/4] Abort prepared transactions while altering two_phase to off If we alter the two_phase parameter from "on" to "off" and there are prepared transactions on the subscriber, they won't be resolved. To avoid this issue, we allow the backend to abort all prepared transactions while altering the subscription. --- doc/src/sgml/ref/alter_subscription.sgml | 11 ++- src/backend/access/transam/twophase.c | 17 ++--- src/backend/commands/subscriptioncmds.c | 75 ++++++++++++------- src/include/access/twophase.h | 3 +- src/test/subscription/t/099_twophase_added.pl | 44 +++++++++++ 5 files changed, 110 insertions(+), 40 deletions(-) diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 475a42a2e3..8801f37f0e 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -233,8 +233,6 @@ ALTER SUBSCRIPTION name RENAME TO < failover, and two_phase. Only a superuser can set password_required = false. - The two_phase parameter can only be altered when the - subscription is disabled. @@ -256,6 +254,15 @@ ALTER SUBSCRIPTION name RENAME TO < failover option is enabled. + + + The two_phase parameter can only be altered when the + subscription is disabled. When altering the parameter from true + to false, the backend process checks for any incomplete + prepared transactions done by the logical replication worker (from when + two_phase parameter was still true) + and, if any are found, those are aborted. + diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 35bce6809d..0be8a15367 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -2719,13 +2719,13 @@ IsTwoPhaseTransactionGidForSubid(Oid subid, char *gid) } /* - * LookupGXactBySubid - * Check if the prepared transaction done by apply worker exists. + * GetGidListBySubid + * Get a list of GIDs which is PREPARE'd by the given subscription. */ -bool -LookupGXactBySubid(Oid subid) +List * +GetGidListBySubid(Oid subid) { - bool found = false; + List *list = NIL; LWLockAcquire(TwoPhaseStateLock, LW_SHARED); for (int i = 0; i < TwoPhaseState->numPrepXacts; i++) @@ -2735,11 +2735,8 @@ LookupGXactBySubid(Oid subid) /* Ignore not-yet-valid GIDs. */ if (gxact->valid && IsTwoPhaseTransactionGidForSubid(subid, gxact->gid)) - { - found = true; - break; - } + list = lappend(list, pstrdup(gxact->gid)); } LWLockRelease(TwoPhaseStateLock); - return found; + return list; } diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 996ea6b6de..54a2c76f37 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1177,38 +1177,59 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, logicalrep_workers_stop(subid); /* - * two_phase cannot be disabled if there are any - * uncommitted prepared transactions present. - */ - if (!opts.twophase && - form->subtwophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && - LookupGXactBySubid(subid)) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("cannot disable two_phase when uncommitted prepared transactions present"), - errhint("Resolve these transactions and try again"))); - - /* - * Altering the parameter from "true" to "false" within a - * transaction is prohibited. Since the apply worker does - * not alter the slot option to false, the backend must - * connect to the publisher and expressly change the - * parameter. - * - * There is no need to do something remarkable regarding - * the "false" to "true" case; the backend process alters - * subtwophase to LOGICALREP_TWOPHASE_STATE_PENDING once. - * After the subscription is enabled, a new logical - * replication worker requests to change the two_phase - * option of its slot from pending to true when the - * initial data synchronization is done. The code path is - * the same as the case in which two_phase is initially - * set to true. + * If two_phase was previously enabled, there is a + * possibility that transactions have already been + * PREPARE'd. They must be checked and rolled back. */ if (!opts.twophase) + { + List *prepared_xacts; + + /* + * Altering the parameter from "true" to "false" + * within a transaction is prohibited. Since the apply + * worker does not alter the slot option to false, the + * backend must connect to the publisher and expressly + * change the parameter. + * + * There is no need to do something remarkable + * regarding the "false" to "true" case; the backend + * process alters subtwophase to + * LOGICALREP_TWOPHASE_STATE_PENDING once. After the + * subscription is enabled, a new logical replication + * worker requests to change the two_phase option of + * its slot from pending to true when the initial data + * synchronization is done. The code path is the same + * as the case in which two_phase is initially + * set to true. + */ PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... SET (two_phase = false)"); + /* + * To prevent prepared transactions from being + * isolated, they must manually be aborted. + */ + if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && + (prepared_xacts = GetGidListBySubid(subid)) != NIL) + { + ereport(WARNING, + (errmsg_plural("requested altering to %s but there is prepared transaction done by the subscription", + "requested altering to %s but there are prepared transactions done by the subscription", + list_length(prepared_xacts), + "two_phase = false"), + errdetail_plural("Such a transaction is being rollbacked.", + "Such transactions are being rollbacked.", + list_length(prepared_xacts)))); + + /* Abort all listed transactions */ + foreach_ptr(char, gid, prepared_xacts) + FinishPreparedTransaction(gid, false); + + list_free_deep(prepared_xacts); + } + } + /* Change system catalog acoordingly */ values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(opts.twophase ? diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index d37e06fdee..f7a5cf0c12 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -18,6 +18,7 @@ #include "access/xlogdefs.h" #include "datatype/timestamp.h" #include "storage/lock.h" +#include "nodes/pg_list.h" /* * GlobalTransactionData is defined in twophase.c; other places have no @@ -65,6 +66,6 @@ extern bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, extern void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid); -extern bool LookupGXactBySubid(Oid subid); +extern List *GetGidListBySubid(Oid subid); #endif /* TWOPHASE_H */ diff --git a/src/test/subscription/t/099_twophase_added.pl b/src/test/subscription/t/099_twophase_added.pl index 1124f7fa00..635daf7a78 100644 --- a/src/test/subscription/t/099_twophase_added.pl +++ b/src/test/subscription/t/099_twophase_added.pl @@ -96,4 +96,48 @@ $result = is($result, q(5), "prepared transactions done before altering can be replicated"); +##################### +# Check the case that prepared transactions exist on the subscriber node +# +# If the two_phase is altering from "true" to "false" and there are prepared +# transactions on the subscriber, they must be aborted. This test checks it. + +# Prepare a transaction to insert some tuples into the table +$node_publisher->safe_psql( + 'postgres', " + BEGIN; + INSERT INTO tab_full VALUES (generate_series(6, 10)); + PREPARE TRANSACTION 'test_prepared_tab_full';"); + +$node_publisher->wait_for_catchup('regress_sub'); + +# Verify the prepared transaction has been replicated to the subscriber because +# two_phase is set to "true". +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, q(1), "transaction has been prepared on subscriber"); + +# Toggle the two_phase to "false" before the COMMIT PREPARED +$node_subscriber->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION regress_sub DISABLE; + ALTER SUBSCRIPTION regress_sub SET (two_phase = false); + ALTER SUBSCRIPTION regress_sub ENABLE;"); + +# Verify any prepared transactions are aborted because two_phase is changed to +# "false". +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, q(0), "prepared transaction done by worker is aborted"); + +# Do COMMIT PREPARED the prepared transaction +$node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab_full';"); +$node_publisher->wait_for_catchup('regress_sub'); + +# Verify inserted tuples are replicated +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(10) FROM tab_full;"); +is($result, q(10), "prepared transactions on publisher can be replicated"); + done_testing(); -- 2.43.0