From 3cf0f657f4506f6710fe032c4f43553be262e89d Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Tue, 23 Dec 2025 13:38:37 +0800 Subject: [PATCH v2_PG16] Fix unexpected origin advancement during parallel apply failure The logical replication parallel apply worker may erroneously advance the origin progress during an error or unsuccessful apply. This can lead to transaction loss, as these transactions will not be resent by the server. Commit 3f28b2fc addressed a similar issue in both the apply worker and table sync worker, by registering a before_shmem_exit callback to reset the origin information, preventing the worker from advancing it during transaction abortion on shutdown. This commit registers the same callback for the parallel apply worker, ensuring consistent behavior across all workers. --- src/backend/replication/logical/worker.c | 30 +++++++------ .../subscription/t/023_twophase_stream.pl | 45 +++++++++++++++++++ 2 files changed, 62 insertions(+), 13 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 9b5c641941f..d6bbffd7c8d 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4544,6 +4544,23 @@ InitializeApplyWorker(void) MySubscription->name))); CommitTransactionCommand(); + + /* + * Register a callback to reset the origin state before aborting any + * pending transaction during shutdown (see ShutdownPostgres()). This will + * avoid origin advancement for an incomplete transaction which could + * otherwise lead to its loss as such a transaction won't be sent by the + * server again. + * + * Note that even a LOG or DEBUG statement placed after setting the origin + * state may process a shutdown signal before committing the current apply + * operation. So, it is important to register such a callback here. + * + * Register this callback here to ensure that all types of logical + * replication workers that set up origins and apply remote transactions + * are protected. + */ + before_shmem_exit(replorigin_reset, (Datum) 0); } /* Logical Replication Apply worker entry point */ @@ -4581,19 +4598,6 @@ ApplyWorkerMain(Datum main_arg) InitializeApplyWorker(); - /* - * Register a callback to reset the origin state before aborting any - * pending transaction during shutdown (see ShutdownPostgres()). This will - * avoid origin advancement for an in-complete transaction which could - * otherwise lead to its loss as such a transaction won't be sent by the - * server again. - * - * Note that even a LOG or DEBUG statement placed after setting the origin - * state may process a shutdown signal before committing the current apply - * operation. So, it is important to register such a callback here. - */ - before_shmem_exit(replorigin_reset, (Datum) 0); - InitializingApplyWorker = false; /* Connect to the origin and start the replication. */ diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl index 0303807846e..39ad688a7bd 100644 --- a/src/test/subscription/t/023_twophase_stream.pl +++ b/src/test/subscription/t/023_twophase_stream.pl @@ -429,6 +429,51 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); is($result, qq(1), 'transaction is committed on subscriber'); +# Test the ability to re-apply a transaction when a parallel apply worker fails +# to prepare the transaction due to insufficient max_prepared_transactions +# setting. +$node_subscriber->append_conf( + 'postgresql.conf', qq( +max_prepared_transactions = 0 +debug_logical_replication_streaming = buffered +)); +$node_subscriber->restart; + +$node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab_2 values(2); + PREPARE TRANSACTION 'xact'; + COMMIT PREPARED 'xact'; + }); + +$offset = -s $node_subscriber->logfile; + +# Confirm the ERROR is reported because max_prepared_transactions is zero +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/, + $offset); + +# Confirm that the parallel apply worker has encountered an error. The check +# focuses on the worker type as a keyword, since the error message content may +# differ based on whether the leader initially detected the parallel apply +# worker's failure or received a signal from it. +$node_subscriber->wait_for_log( + qr/ERROR: .*logical replication parallel apply worker.*/, + $offset); + +# Set max_prepared_transactions to correct value to resume the replication +$node_subscriber->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_subscriber->restart; + +$node_publisher->wait_for_catchup($appname); + +# Check that transaction is committed on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); +is($result, qq(2), 'transaction is committed on subscriber after retrying'); + ############################### # check all the cleanup ############################### -- 2.51.1.windows.1