From 3a7125c89de3d6fd6c1081e1100476e99d74c9c1 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Mon, 15 Apr 2024 04:58:29 +0000 Subject: [PATCH v3 4/5] Prohibit altering from false to true if there are prepared transactions on publisher --- doc/src/sgml/ref/alter_subscription.sgml | 9 +-- src/backend/commands/subscriptioncmds.c | 73 ++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 4 deletions(-) diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 4f33769858..12d6ca2f5e 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -254,10 +254,11 @@ ALTER SUBSCRIPTION name RENAME TO < - two_phase can be altered only for disabled - subscriptions. Prepared transactions done by the logical replication - worker must not be existed. If found, the ALTER - SUBSCRIPTION command will fail. + On the publisher side, any prepared transactions must not exist. On the + subscriber side, prepared transactions done by the logical replication + worker must not exist. Prepared transactions done by users are allowed. + The ALTER SUBSCRIPTION command will fail if they are + found. diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 563c757be5..57d2e615c6 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -110,6 +110,7 @@ static void check_publications_origin(WalReceiverConn *wrconn, static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); +static bool IsPreparedTransactionExistsOnPublisher(Subscription *sub); /* @@ -1202,6 +1203,24 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, errmsg("cannot alter %s to false if there are prepared transactions by the subscription", "two_phase"))); + /* + * Suppose the two_phase is altering from false to true, + * and there have been prepared transactions on the + * publisher. In that case, only the COMMIT PREPARED + * record may be decoded and sent to the subscriber. It + * occurs because confirmed_flush_lsn can be ahead of the + * PREPARE record, so decoding all the transactions might + * be skipped after enabling the subscription. + * + * We prohibit the existing prepared transactions on the + * publisher to avoid the issue. + */ + else if (opts.twophase && + IsPreparedTransactionExistsOnPublisher(sub)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot alter %s to true if there are prepared transactions on publisher", + "two_phase"))); /* Change system catalog acoordingly */ values[Anum_pg_subscription_subtwophasestate - 1] = @@ -2489,3 +2508,57 @@ defGetStreamingMode(DefElem *def) def->defname))); return LOGICALREP_STREAM_OFF; /* keep compiler quiet */ } + +/* + * Check whether there are prepared transactions on the publisher node. Returns + * true if exists, otherwise false. + */ +static bool +IsPreparedTransactionExistsOnPublisher(Subscription *sub) +{ + bool must_use_password; + bool found = false; + WalReceiverConn *wrconn; + WalRcvExecResult *res; + char *err; + StringInfo cmd; + Oid tableRow[1] = {INT4OID}; + + /* Load the library providing us libpq calls. */ + load_file("libpqwalreceiver", false); + /* Try to connect to the publisher. */ + must_use_password = (!superuser_arg(GetUserId()) && + sub->passwordrequired); + wrconn = walrcv_connect(sub->conninfo, true, true, + must_use_password, + sub->name, &err); + if (!wrconn) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the publisher: %s", err))); + + /* Construct a query and execute it */ + cmd = makeStringInfo(); + appendStringInfo(cmd, + "SELECT 1 FROM pg_prepared_xacts WHERE database = '%s'", + get_database_name(sub->dbid)); + + res = walrcv_exec(wrconn, cmd->data, 1, tableRow); + destroyStringInfo(cmd); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + errmsg("could not fetch number of prepared transactions from the primary server: %s", + res->err)); + + /* + * We are only interested in the existence of prepared transactions. + * Hence, it is sufficient to check the number of returned rows. + */ + if (tuplestore_tuple_count(res->tuplestore) > 1) + found = true; + + walrcv_clear_result(res); + + return found; +} -- 2.43.0