From 2a9649307e8537835b507e53e5a791811020d6e0 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Tue, 23 Jul 2024 08:40:29 +0000 Subject: [PATCH 2/2] Prevent origin progress advancement if failed to apply a transaction The origin progress is advanced when aborting a transaction, intented to signify the successful streaming and application of the ROLLBACK from the publisher to the subscriber in streaming parallel mode. But when an error occurred during the commit or prepare after setting replorigin_session_origin_lsn, the origin progress was advanced as well which is unexpected. This led to skipped transactions that were not replicated again. Fix it by resetting replorigin_session_origin_lsn in case of error. Originally reported by Hou Zhijie --- src/backend/replication/logical/worker.c | 28 +++++++++++++++++++++++ src/include/replication/worker_internal.h | 3 ++- src/test/subscription/t/021_twophase.pl | 14 +++++++++++- 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 46f7a5c3a5..9a1dbead60 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4430,6 +4430,14 @@ start_apply(XLogRecPtr origin_startpos) } PG_CATCH(); { + /* + * Reset the origin data to prevent the advancement of origin progress + * if the transaction failed to apply. + */ + replorigin_session_origin = InvalidRepOriginId; + replorigin_session_origin_lsn = InvalidXLogRecPtr; + replorigin_session_origin_timestamp = 0; + if (MySubscription->disableonerr) DisableSubscriptionAndExit(); else @@ -4640,6 +4648,19 @@ InitializeLogRepWorker(void) CommitTransactionCommand(); } +/* + * Reset the origin state. This is needed to prevent the advancement of origin + * progress if the transaction failed to apply. + */ +static void +replorigin_reset(int code, Datum arg) +{ + replorigin_session_origin = InvalidRepOriginId; + replorigin_session_origin_lsn = InvalidXLogRecPtr; + replorigin_session_origin_timestamp = 0; +} + + /* Common function to setup the leader apply or tablesync worker. */ void SetupApplyOrSyncWorker(int worker_slot) @@ -4668,6 +4689,13 @@ SetupApplyOrSyncWorker(int worker_slot) InitializeLogRepWorker(); + /* + * Register a callback to reset the origin state before aborting the + * transaction in ShutdownPostgres(). This is to prevent the advancement + * of origin progress if the transaction failed to apply. + */ + before_shmem_exit(replorigin_reset, (Datum) 0); + /* Connect to the origin and start the replication. */ elog(DEBUG1, "connecting to publisher using connection string \"%s\"", MySubscription->conninfo); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index bdc73d2374..0bf6f737f4 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -164,7 +164,8 @@ typedef struct ParallelApplyWorkerShared /* * XactLastCommitEnd or XactLastPrepareEnd from the parallel apply worker. - * This is required by the leader worker so it can update the lsn_mappings. + * This is required by the leader worker so it can update the + * lsn_mappings. */ XLogRecPtr last_commit_end; diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl index 9437cd4c3b..e635be74c6 100644 --- a/src/test/subscription/t/021_twophase.pl +++ b/src/test/subscription/t/021_twophase.pl @@ -23,7 +23,7 @@ $node_publisher->start; my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_subscriber->init; $node_subscriber->append_conf('postgresql.conf', - qq(max_prepared_transactions = 10)); + qq(max_prepared_transactions = 0)); $node_subscriber->start; # Create some pre-existing content on publisher @@ -67,12 +67,24 @@ $node_subscriber->poll_query_until('postgres', $twophase_query) # then COMMIT PREPARED ############################### +# Save the log location, to see the failure of the application +my $log_location = -s $node_subscriber->logfile; + $node_publisher->safe_psql( 'postgres', " BEGIN; INSERT INTO tab_full VALUES (11); PREPARE TRANSACTION 'test_prepared_tab_full';"); +# Confirm the ERROR is reported becasue max_prepared_transactions is zero +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/); + +# Set max_prepared_transactions to correct value to resume the replication +$node_subscriber->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_subscriber->restart; + $node_publisher->wait_for_catchup($appname); # check that transaction is in prepared state on subscriber -- 2.43.0