From 1f65aaeacb63502cf6f2eb0d6f3889037a1bff9f Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Wed, 17 Apr 2024 06:18:23 +0000 Subject: [PATCH v7 2/4] Alter slot option two_phase only when altering true to false --- doc/src/sgml/ref/alter_subscription.sgml | 2 +- src/backend/commands/subscriptioncmds.c | 23 +++++- .../libpqwalreceiver/libpqwalreceiver.c | 21 ++++-- src/include/replication/walreceiver.h | 5 +- src/test/subscription/meson.build | 1 + src/test/subscription/t/099_twophase_added.pl | 72 +++++++++++++++++++ 6 files changed, 113 insertions(+), 11 deletions(-) create mode 100644 src/test/subscription/t/099_twophase_added.pl diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 88e9a72147..0c2894a94e 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -70,7 +70,7 @@ ALTER SUBSCRIPTION name RENAME TO < ALTER SUBSCRIPTION ... {SET|ADD|DROP} PUBLICATION ... with refresh option as true, ALTER SUBSCRIPTION ... SET (failover = on|off) and - ALTER SUBSCRIPTION ... SET (two_phase = on|off) + ALTER SUBSCRIPTION ... SET (two_phase = off) cannot be executed inside a transaction block. These commands also cannot be executed when the subscription has diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 1a2f0c1e64..71b058b385 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1190,7 +1190,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, * The changed two_phase option of the slot can't be rolled * back. */ - PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... SET (two_phase)"); + if (!opts.twophase) + PreventInTransactionBlock(isTopLevel, + "ALTER SUBSCRIPTION ... SET (two_phase = off)"); /* Change system catalog acoordingly */ values[Anum_pg_subscription_subtwophasestate - 1] = @@ -1560,6 +1562,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool must_use_password; char *err; WalReceiverConn *wrconn; + bool failover_needs_to_be_updated; + bool two_phase_needs_to_be_updated; /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); @@ -1573,9 +1577,24 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("could not connect to the publisher: %s", err))); + /* + * Consider which slot option must be altered. + * + * We must alter the failover option whenever subfailover is updated. + * Two_phase, however, is altered only when changing true to false. + */ + failover_needs_to_be_updated = + replaces[Anum_pg_subscription_subfailover - 1]; + two_phase_needs_to_be_updated = + (replaces[Anum_pg_subscription_subtwophasestate - 1] && + !opts.twophase); + PG_TRY(); { - walrcv_alter_slot(wrconn, sub->slotname, opts.failover, opts.twophase); + if (two_phase_needs_to_be_updated || failover_needs_to_be_updated) + walrcv_alter_slot(wrconn, sub->slotname, + failover_needs_to_be_updated ? &opts.failover : NULL, + two_phase_needs_to_be_updated ? &opts.twophase : NULL); } PG_FINALLY(); { diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 998bbd517a..c383767096 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -80,7 +80,7 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, - bool failover, bool two_phase); + const bool *failover, const bool *two_phase); static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn); static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn, const char *query, @@ -1121,16 +1121,25 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, */ static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, - bool failover, bool two_phase) + const bool *failover, const bool *two_phase) { StringInfoData cmd; PGresult *res; initStringInfo(&cmd); - appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s, TWO_PHASE %s )", - quote_identifier(slotname), - failover ? "true" : "false", - two_phase ? "true" : "false"); + appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( ", + quote_identifier(slotname)); + + if (failover) + appendStringInfo(&cmd, "FAILOVER %s ", + (*failover) ? "true" : "false"); + + if (two_phase) + appendStringInfo(&cmd, "TWO_PHASE %s%s ", + (*two_phase) ? "true" : "false", + failover ? ", " : ""); + + appendStringInfoString(&cmd, ");"); res = libpqrcv_PQexec(conn->streamConn, cmd.data); pfree(cmd.data); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 31fa1257ec..7ffa5a58b3 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -377,8 +377,9 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, */ typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn, const char *slotname, - bool failover, - bool two_phase); + const bool *failover, + const bool *two_phase); + /* * walrcv_get_backend_pid_fn diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index c591cd7d61..b4bd522c3d 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -40,6 +40,7 @@ tests += { 't/031_column_list.pl', 't/032_subscribe_use_index.pl', 't/033_run_as_table_owner.pl', + 't/099_twophase_added.pl', 't/100_bugs.pl', ], }, diff --git a/src/test/subscription/t/099_twophase_added.pl b/src/test/subscription/t/099_twophase_added.pl new file mode 100644 index 0000000000..c13a37675a --- /dev/null +++ b/src/test/subscription/t/099_twophase_added.pl @@ -0,0 +1,72 @@ +# Copyright (c) 2021-2024, PostgreSQL Global Development Group + +# Additional tests for altering two_phase option +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init; +$node_subscriber->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_subscriber->start; + +# Define pre-existing tables on both nodes +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_full (a int PRIMARY KEY);"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_full (a int PRIMARY KEY)"); + +# Setup logical replication, with two_phase = off +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub FOR ALL TABLES"); + +$node_subscriber->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION sub + CONNECTION '$publisher_connstr' PUBLICATION pub + WITH (two_phase = off, copy_data = off)"); + +###### +# Check the case that prepared transactions exist on publisher node +###### + +$node_publisher->safe_psql( + 'postgres', " + BEGIN; + INSERT INTO tab_full VALUES (generate_series(1, 5)); + PREPARE TRANSACTION 'test_prepared_tab_full';"); + +$node_publisher->wait_for_catchup('sub'); + +my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, q(0), "transaction is not prepared on subscriber"); + +$node_subscriber->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION sub DISABLE; + ALTER SUBSCRIPTION sub SET (two_phase = on); + ALTER SUBSCRIPTION sub ENABLE;"); + +$node_publisher->safe_psql( 'postgres', + "COMMIT PREPARED 'test_prepared_tab_full';"); +$node_publisher->wait_for_catchup('sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM tab_full;"); +is($result, q(5), + "prepared transactions done before altering can be replicated"); + +done_testing(); -- 2.43.0