From d22eef501989c96d7a49b9f7fd8b60f3e749c22d Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Wed, 17 Apr 2024 06:18:23 +0000 Subject: [PATCH v14 2/4] Alter slot option two_phase only when altering "true" to "false" MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Since the two_phase option is controlled by both the publisher (as a slot option) and the subscriber (as a subscription option), the slot option must also be modified. Regarding the false->true case, the backend process alters the subtwophase to LOGICALREP_TWOPHASE_STATE_PENDING once. After the subscription is enabled, a new logical replication worker requests to change the two_phase option of its slot from pending to true after the initial data synchronization is done. The code path is the same as the case in which two_phase is initially set to true, so there is no need to do something remarkable. However, for the true->false case, the backend must connect to the publisher and expressly change the parameter because the apply worker does not alter the option to false. Because this operation cannot be rolled back, altering the two_phase parameter from "true" to "false" within a transaction is prohibited. --- doc/src/sgml/ref/alter_subscription.sgml | 2 +- src/backend/commands/subscriptioncmds.c | 43 ++++++-- .../libpqwalreceiver/libpqwalreceiver.c | 23 +++-- src/include/replication/walreceiver.h | 5 +- src/test/subscription/meson.build | 1 + src/test/subscription/t/099_twophase_added.pl | 99 +++++++++++++++++++ 6 files changed, 157 insertions(+), 16 deletions(-) create mode 100644 src/test/subscription/t/099_twophase_added.pl diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 0b23df1b77..475a42a2e3 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -70,7 +70,7 @@ ALTER SUBSCRIPTION name RENAME TO < ALTER SUBSCRIPTION ... {SET|ADD|DROP} PUBLICATION ... with refresh option as true, ALTER SUBSCRIPTION ... SET (failover = true|false) and - ALTER SUBSCRIPTION ... SET (two_phase = true|false) + ALTER SUBSCRIPTION ... SET (two_phase = off) cannot be executed inside a transaction block. These commands also cannot be executed when the subscription has diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index e925158a4d..996ea6b6de 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1097,6 +1097,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, Form_pg_subscription form; bits32 supported_opts; SubOpts opts = {0}; + bool update_failover; + bool update_two_phase; rel = table_open(SubscriptionRelationId, RowExclusiveLock); @@ -1187,10 +1189,25 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, errhint("Resolve these transactions and try again"))); /* - * The changed two_phase option of the slot can't be - * rolled back. + * Altering the parameter from "true" to "false" within a + * transaction is prohibited. Since the apply worker does + * not alter the slot option to false, the backend must + * connect to the publisher and expressly change the + * parameter. + * + * There is no need to do something remarkable regarding + * the "false" to "true" case; the backend process alters + * subtwophase to LOGICALREP_TWOPHASE_STATE_PENDING once. + * After the subscription is enabled, a new logical + * replication worker requests to change the two_phase + * option of its slot from pending to true when the + * initial data synchronization is done. The code path is + * the same as the case in which two_phase is initially + * set to true. */ - PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... SET (two_phase)"); + if (!opts.twophase) + PreventInTransactionBlock(isTopLevel, + "ALTER SUBSCRIPTION ... SET (two_phase = false)"); /* Change system catalog acoordingly */ values[Anum_pg_subscription_subtwophasestate - 1] = @@ -1548,14 +1565,24 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, } /* - * Try to acquire the connection necessary for altering slot. + * Check the need to alter the replication slot. Failover and two_phase + * options are controlled by both the publisher (as a slot option) and the + * subscriber (as a subscription option). The slot option must be altered + * only when changing "true" to "false". The reason has already been + * described in the ALTER_SUBSCRIPTION_OPTIONS section of this function. + */ + update_failover = replaces[Anum_pg_subscription_subfailover - 1]; + update_two_phase = (replaces[Anum_pg_subscription_subtwophasestate - 1] && + !opts.twophase); + + /* + * Try to acquire the connection necessary for altering slot, if needed. * * This has to be at the end because otherwise if there is an error while * doing the database operations we won't be able to rollback altered * slot. */ - if (replaces[Anum_pg_subscription_subfailover - 1] || - replaces[Anum_pg_subscription_subtwophasestate - 1]) + if (update_failover || update_two_phase) { bool must_use_password; char *err; @@ -1575,7 +1602,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, PG_TRY(); { - walrcv_alter_slot(wrconn, sub->slotname, opts.failover, opts.twophase); + walrcv_alter_slot(wrconn, sub->slotname, + update_failover ? &opts.failover : NULL, + update_two_phase ? &opts.twophase : NULL); } PG_FINALLY(); { diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 2f035a0c3c..07dfec947d 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -80,7 +80,7 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, - bool failover, bool two_phase); + const bool *failover, const bool *two_phase); static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn); static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn, const char *query, @@ -1121,16 +1121,27 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, */ static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, - bool failover, bool two_phase) + const bool *failover, const bool *two_phase) { StringInfoData cmd; PGresult *res; initStringInfo(&cmd); - appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s, TWO_PHASE %s )", - quote_identifier(slotname), - failover ? "true" : "false", - two_phase ? "true" : "false"); + appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( ", + quote_identifier(slotname)); + + if (failover) + appendStringInfo(&cmd, "FAILOVER %s", + *failover ? "true" : "false"); + + if (failover && two_phase) + appendStringInfo(&cmd, ", "); + + if (two_phase) + appendStringInfo(&cmd, "TWO_PHASE %s", + *two_phase ? "true" : "false"); + + appendStringInfoString(&cmd, " );"); res = libpqrcv_PQexec(conn->streamConn, cmd.data); pfree(cmd.data); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 31fa1257ec..7ffa5a58b3 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -377,8 +377,9 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, */ typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn, const char *slotname, - bool failover, - bool two_phase); + const bool *failover, + const bool *two_phase); + /* * walrcv_get_backend_pid_fn diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index c591cd7d61..b4bd522c3d 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -40,6 +40,7 @@ tests += { 't/031_column_list.pl', 't/032_subscribe_use_index.pl', 't/033_run_as_table_owner.pl', + 't/099_twophase_added.pl', 't/100_bugs.pl', ], }, diff --git a/src/test/subscription/t/099_twophase_added.pl b/src/test/subscription/t/099_twophase_added.pl new file mode 100644 index 0000000000..1124f7fa00 --- /dev/null +++ b/src/test/subscription/t/099_twophase_added.pl @@ -0,0 +1,99 @@ +# Copyright (c) 2024, PostgreSQL Global Development Group + +# Additional tests for altering two_phase option +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init; +$node_subscriber->append_conf( + 'postgresql.conf', + qq(max_prepared_transactions = 10 + log_min_messages = debug1)); +$node_subscriber->start; + +# Define tables on both nodes +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_full (a int PRIMARY KEY);"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_full (a int PRIMARY KEY)"); + +# Setup logical replication, with two_phase = "false" +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub FOR ALL TABLES"); + +my $log_offset = -s $node_subscriber->logfile; + +$node_subscriber->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION regress_sub + CONNECTION '$publisher_connstr' PUBLICATION pub + WITH (two_phase = false, copy_data = false, failover = false)"); + +# Verify the started worker recognized two_phase was disabled +$node_subscriber->wait_for_log( + 'logical replication apply worker for subscription "regress_sub" two_phase is DISABLED', + $log_offset); + +##################### +# Check the case that prepared transactions exist on the publisher node. +# +# Since the two_phase is "false", then normally, this PREPARE will do nothing +# until the COMMIT PREPARED, but in this test, we toggle the two_phase to +# "true" again before the COMMIT PREPARED happens. + +# Prepare a transaction to insert some tuples into the table +$node_publisher->safe_psql( + 'postgres', " + BEGIN; + INSERT INTO tab_full VALUES (generate_series(1, 5)); + PREPARE TRANSACTION 'test_prepared_tab_full';"); + +$node_publisher->wait_for_catchup('regress_sub'); + +# Verify the prepared transaction is not yet replicated to the subscriber +# because two_phase is set to "false". +my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, q(0), "transaction is not prepared on subscriber"); + +$log_offset = -s $node_subscriber->logfile; + +# Toggle the two_phase to "true" *before* the COMMIT PREPARED. Since we are the +# special path for the case where both two_phase and failover are altered, it +# is also set to "true". +$node_subscriber->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION regress_sub DISABLE; + ALTER SUBSCRIPTION regress_sub SET (two_phase = true, failover = true); + ALTER SUBSCRIPTION regress_sub ENABLE;"); + +# Verify the started worker recognized two_phase was enabled +$node_subscriber->wait_for_log( + 'logical replication apply worker for subscription "regress_sub" two_phase is ENABLED', + $log_offset); + +# And do COMMIT PREPARED the prepared transaction +$node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab_full';"); +$node_publisher->wait_for_catchup('regress_sub'); + +# Verify inserted tuples are replicated +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full;"); +is($result, q(5), + "prepared transactions done before altering can be replicated"); + +done_testing(); -- 2.43.0