From 5df654692988e3880b6132c53f218e511cf6f51d Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Wed, 17 Apr 2024 06:18:23 +0000 Subject: [PATCH v17 2/3] Alter slot option two_phase only when altering "true" to "false" Since the two_phase option is controlled by both the publisher (as a slot option) and the subscriber (as a subscription option), the slot option must also be modified. Regarding the false->true case, the backend process alters the subtwophase to LOGICALREP_TWOPHASE_STATE_PENDING once. After the subscription is enabled, a new logical replication worker requests to change the two_phase option of its slot from pending to true after the initial data synchronization is done. The code path is the same as the case in which two_phase is initially set to true, so nothing special needs to be done. However, for the true->false case, the backend must connect to the publisher and explicitly change the parameter because the apply worker does not alter the option to false. Because this operation cannot be rolled back, altering the two_phase parameter from "true" to "false" within a transaction is prohibited. --- doc/src/sgml/ref/alter_subscription.sgml | 2 +- src/backend/commands/subscriptioncmds.c | 76 ++++++++++++++----- .../libpqwalreceiver/libpqwalreceiver.c | 23 ++++-- src/include/replication/walreceiver.h | 5 +- src/test/subscription/t/021_twophase.pl | 41 ++++++---- 5 files changed, 102 insertions(+), 45 deletions(-) diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 0b23df1b77..475a42a2e3 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -70,7 +70,7 @@ ALTER SUBSCRIPTION name RENAME TO < ALTER SUBSCRIPTION ... {SET|ADD|DROP} PUBLICATION ... with refresh option as true, ALTER SUBSCRIPTION ... SET (failover = true|false) and - ALTER SUBSCRIPTION ... SET (two_phase = true|false) + ALTER SUBSCRIPTION ... 
SET (two_phase = false) cannot be executed inside a transaction block. These commands also cannot be executed when the subscription has diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 255628c396..3729956147 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -112,6 +112,7 @@ static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); static void CommonChecksForFailoverAndTwophase(Subscription *sub, const char *option, + bool needs_update, bool isTopLevel); @@ -1074,11 +1075,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, */ static void CommonChecksForFailoverAndTwophase(Subscription *sub, const char *option, - bool isTopLevel) + bool needs_update, bool isTopLevel) { - StringInfoData cmd; - - if (!sub->slotname) + if (needs_update && !sub->slotname) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot set %s for a subscription that does not have a slot name", @@ -1096,16 +1095,20 @@ CommonChecksForFailoverAndTwophase(Subscription *sub, const char *option, errmsg("cannot set %s for enabled subscription", option))); - initStringInfo(&cmd); - appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... 
SET (%s)", option); + + PreventInTransactionBlock(isTopLevel, cmd.data); + pfree(cmd.data); + } } /* @@ -1127,6 +1130,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, Form_pg_subscription form; bits32 supported_opts; SubOpts opts = {0}; + bool update_failover = false; + bool update_two_phase = false; rel = table_open(SubscriptionRelationId, RowExclusiveLock); @@ -1259,8 +1264,16 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, if (IsSet(opts.specified_opts, SUBOPT_FAILOVER)) { + /* + * First mark the need to alter the replication slot. + * Failover option is controlled by both the publisher (as + * a slot option) and the subscriber (as a subscription + * option). + */ + update_failover = true; + CommonChecksForFailoverAndTwophase(sub, "failover", - isTopLevel); + update_failover, isTopLevel); values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover); @@ -1269,16 +1282,36 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT)) { + /* + * First check the need to alter the replication slot. + * Two_phase option is controlled by both the publisher + * (as a slot option) and the subscriber (as a + * subscription option). The slot option must be altered + * only when changing "true" to "false". + * + * Nothing special needs to be done for + * the "false" to "true" case; the backend process alters + * subtwophase to LOGICALREP_TWOPHASE_STATE_PENDING once. + * After the subscription is enabled, a new logical + * replication worker requests to change the two_phase + * option of its slot from pending to true when the + * initial data synchronization is done. The code path is + * the same as the case in which two_phase is initially + * set to true. 
+ */ + update_two_phase = !opts.twophase; + CommonChecksForFailoverAndTwophase(sub, "two_phase", - isTopLevel); + update_two_phase, isTopLevel); /* - * slot_name and two_phase cannot be altered - * simultaneously. The latter part refers to the pre-set - * slot name and tries to modify the slot option, so - * changing both does not make sense. + * If the two_phase slot option must be altered, this cannot + * be altered with slot_name simultaneously. The latter + * part refers to the pre-set slot name and tries to modify + * the slot option, so changing both does not make sense. */ - if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME)) + if (update_two_phase && + IsSet(opts.specified_opts, SUBOPT_SLOT_NAME)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("slot_name and two_phase cannot be altered at the same time"))); @@ -1300,7 +1333,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, * two_phase cannot be disabled if there are any * uncommitted prepared transactions present. */ - if (!opts.twophase && + if (update_two_phase && sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && LookupGXactBySubid(subid)) ereport(ERROR, @@ -1559,14 +1592,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, } /* - * Try to acquire the connection necessary for altering slot. + * Try to acquire the connection necessary for altering slot, if needed. * * This has to be at the end because otherwise if there is an error while * doing the database operations we won't be able to rollback altered * slot. */ - if (replaces[Anum_pg_subscription_subfailover - 1] || - replaces[Anum_pg_subscription_subtwophasestate - 1]) + if (update_failover || update_two_phase) { bool must_use_password; char *err; @@ -1586,7 +1618,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, PG_TRY(); { - walrcv_alter_slot(wrconn, sub->slotname, opts.failover, opts.twophase); + walrcv_alter_slot(wrconn, sub->slotname, + update_failover ? 
&opts.failover : NULL, + update_two_phase ? &opts.twophase : NULL); } PG_FINALLY(); { diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 2f035a0c3c..07dfec947d 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -80,7 +80,7 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, - bool failover, bool two_phase); + const bool *failover, const bool *two_phase); static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn); static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn, const char *query, @@ -1121,16 +1121,27 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, */ static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, - bool failover, bool two_phase) + const bool *failover, const bool *two_phase) { StringInfoData cmd; PGresult *res; initStringInfo(&cmd); - appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s, TWO_PHASE %s )", - quote_identifier(slotname), - failover ? "true" : "false", - two_phase ? "true" : "false"); + appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( ", + quote_identifier(slotname)); + + if (failover) + appendStringInfo(&cmd, "FAILOVER %s", + *failover ? "true" : "false"); + + if (failover && two_phase) + appendStringInfoString(&cmd, ", "); + + if (two_phase) + appendStringInfo(&cmd, "TWO_PHASE %s", + *two_phase ? 
"true" : "false"); + + appendStringInfoString(&cmd, " );"); res = libpqrcv_PQexec(conn->streamConn, cmd.data); pfree(cmd.data); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 31fa1257ec..7ffa5a58b3 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -377,8 +377,9 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, */ typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn, const char *slotname, - bool failover, - bool two_phase); + const bool *failover, + const bool *two_phase); + /* * walrcv_get_backend_pid_fn diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl index 4e8f627f7b..f56dff4b12 100644 --- a/src/test/subscription/t/021_twophase.pl +++ b/src/test/subscription/t/021_twophase.pl @@ -375,6 +375,12 @@ $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); # then verify that the altered subscription reflects the two_phase option. 
############################### +# Confirm two-phase slot option is enabled before altering +$result = $node_publisher->safe_psql('postgres', + "SELECT two_phase FROM pg_replication_slots WHERE slot_name = 'tap_sub_copy';" +); +is($result, qq(t), 'two-phase is enabled'); + # Alter subscription two_phase to false $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub_copy DISABLE;"); @@ -393,7 +399,13 @@ $node_subscriber->wait_for_subscription_sync($node_publisher, $appname_copy); $result = $node_subscriber->safe_psql('postgres', "SELECT subtwophasestate FROM pg_subscription WHERE subname = 'tap_sub_copy';" ); -is($result, qq(d), 'two-phase should be disabled'); +is($result, qq(d), 'two-phase subscription option should be disabled'); + +# Make sure that the two-phase slot option is also disabled +$result = $node_publisher->safe_psql('postgres', + "SELECT two_phase FROM pg_replication_slots WHERE slot_name = 'tap_sub_copy';" +); +is($result, qq(f), 'two-phase slot option should be disabled'); # Now do a prepare on the publisher and make sure that it is not replicated. $node_publisher->safe_psql( @@ -411,6 +423,19 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); is($result, qq(0), 'should be no prepared transactions on subscriber'); +# Toggle the two_phase to "true" *before* the COMMIT PREPARED. Since we are the +# special path for the case where both two_phase and failover are altered, it +# is also set to "true". 
+$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub_copy DISABLE;"); +$node_subscriber->poll_query_until('postgres', + "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication worker'" +); +$node_subscriber->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION tap_sub_copy SET (two_phase = true, failover = true); + ALTER SUBSCRIPTION tap_sub_copy ENABLE;"); + # Now commit the insert and verify that it is replicated $node_publisher->safe_psql('postgres', "COMMIT PREPARED 'newgid';"); @@ -422,20 +447,6 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;"); is($result, qq(3), 'replicated data in subscriber table'); -# Alter subscription two_phase to true -$node_subscriber->safe_psql('postgres', - "ALTER SUBSCRIPTION tap_sub_copy DISABLE;"); -$node_subscriber->poll_query_until('postgres', - "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication worker'" -); -$node_subscriber->safe_psql( - 'postgres', " - ALTER SUBSCRIPTION tap_sub_copy SET (two_phase = true); - ALTER SUBSCRIPTION tap_sub_copy ENABLE;"); - -# Wait for subscription startup -$node_subscriber->wait_for_subscription_sync($node_publisher, $appname_copy); - # Make sure that the two-phase is enabled on the subscriber $result = $node_subscriber->safe_psql('postgres', "SELECT subtwophasestate FROM pg_subscription WHERE subname = 'tap_sub_copy';" -- 2.43.0