From 5e4e79a5056bae374911d96eef6c4d945e23903e Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Wed, 17 Apr 2024 06:18:23 +0000 Subject: [PATCH v4 2/3] Alter slot option two_phase only when altering true to false --- src/backend/commands/subscriptioncmds.c | 21 +++++- .../libpqwalreceiver/libpqwalreceiver.c | 21 ++++-- src/include/replication/walreceiver.h | 8 +-- src/test/subscription/meson.build | 1 + src/test/subscription/t/099_twophase_added.pl | 72 +++++++++++++++++++ 5 files changed, 111 insertions(+), 12 deletions(-) create mode 100644 src/test/subscription/t/099_twophase_added.pl diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 3299a60fff..0d80d6e110 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -850,7 +850,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, else if (opts.slot_name && (opts.failover || walrcv_server_version(wrconn) >= 170000)) { - walrcv_alter_slot(wrconn, opts.slot_name, opts.twophase, opts.failover); + walrcv_alter_slot(wrconn, opts.slot_name, NULL, &opts.failover); } } PG_FINALLY(); @@ -1564,6 +1564,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool must_use_password; char *err; WalReceiverConn *wrconn; + bool two_phase_needs_to_be_updated; + bool failover_needs_to_be_updated; /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); @@ -1577,9 +1579,24 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("could not connect to the publisher: %s", err))); + /* + * Consider which slot option must be altered. + * + * We must alter the failover option whenever subfailover is updated. + * Two_phase, however, is altered only when changing true to false. + */ + two_phase_needs_to_be_updated = + (replaces[Anum_pg_subscription_subtwophasestate - 1] && + !opts.twophase); + failover_needs_to_be_updated = + replaces[Anum_pg_subscription_subfailover - 1]; + PG_TRY(); { - walrcv_alter_slot(wrconn, sub->slotname, opts.twophase, opts.failover); + if (two_phase_needs_to_be_updated || failover_needs_to_be_updated) + walrcv_alter_slot(wrconn, sub->slotname, + two_phase_needs_to_be_updated ? &opts.twophase : NULL, + failover_needs_to_be_updated ? &opts.failover : NULL); } PG_FINALLY(); { diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index baef3bdec0..546b599848 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -80,7 +80,7 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, - bool two_phase, bool failover); + const bool *two_phase, const bool *failover); static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn); static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn, const char *query, @@ -1121,16 +1121,25 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, */ static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, - bool two_phase, bool failover) + const bool *two_phase, const bool *failover) { StringInfoData cmd; PGresult *res; initStringInfo(&cmd); - appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( TWO_PHASE %s, FAILOVER %s )", - quote_identifier(slotname), - two_phase ? "true" : "false", - failover ? "true" : "false"); + appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( ", + quote_identifier(slotname)); + + if (two_phase) + appendStringInfo(&cmd, "TWO_PHASE %s%s ", + (*two_phase) ? "true" : "false", + failover ? ", " : ""); + + if (failover) + appendStringInfo(&cmd, "FAILOVER %s ", + (*failover) ? "true" : "false"); + + appendStringInfoString(&cmd, ");"); res = libpqrcv_PQexec(conn->streamConn, cmd.data); pfree(cmd.data); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index a443f402f5..f30637aa4a 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -372,13 +372,13 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, /* * walrcv_alter_slot_fn * - * Change the definition of a replication slot. Currently, it only supports - * changing the failover property of the slot. + * Change the definition of a replication slot. Currently, it supports + * changing the two_phase and the failover property of the slot. */ typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn, const char *slotname, - bool two_phase, - bool failover); + const bool *two_phase, + const bool *failover); /* * walrcv_get_backend_pid_fn diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index c591cd7d61..b4bd522c3d 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -40,6 +40,7 @@ tests += { 't/031_column_list.pl', 't/032_subscribe_use_index.pl', 't/033_run_as_table_owner.pl', + 't/099_twophase_added.pl', 't/100_bugs.pl', ], }, diff --git a/src/test/subscription/t/099_twophase_added.pl b/src/test/subscription/t/099_twophase_added.pl new file mode 100644 index 0000000000..c13a37675a --- /dev/null +++ b/src/test/subscription/t/099_twophase_added.pl @@ -0,0 +1,72 @@ +# Copyright (c) 2021-2024, PostgreSQL Global Development Group + +# Additional tests for altering two_phase option +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init; +$node_subscriber->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_subscriber->start; + +# Define pre-existing tables on both nodes +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_full (a int PRIMARY KEY);"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_full (a int PRIMARY KEY)"); + +# Setup logical replication, with two_phase = off +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub FOR ALL TABLES"); + +$node_subscriber->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION sub + CONNECTION '$publisher_connstr' PUBLICATION pub + WITH (two_phase = off, copy_data = off)"); + +###### +# Check the case that prepared transactions exist on publisher node +###### + +$node_publisher->safe_psql( + 'postgres', " + BEGIN; + INSERT INTO tab_full VALUES (generate_series(1, 5)); + PREPARE TRANSACTION 'test_prepared_tab_full';"); + +$node_publisher->wait_for_catchup('sub'); + +my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, q(0), "transaction is not prepared on subscriber"); + +$node_subscriber->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION sub DISABLE; + ALTER SUBSCRIPTION sub SET (two_phase = on); + ALTER SUBSCRIPTION sub ENABLE;"); + +$node_publisher->safe_psql( 'postgres', + "COMMIT PREPARED 'test_prepared_tab_full';"); +$node_publisher->wait_for_catchup('sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM tab_full;"); +is($result, q(5), + "prepared transactions done before altering can be replicated"); + +done_testing(); -- 2.43.0