From fa8c3494720935dc37497f07f28c4468eddd8535 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Mon, 21 Apr 2025 17:05:29 +0900 Subject: [PATCH v2-PG16-PG17] Fix oversight 3f28b2f 3f28b2f introduced replorigin_reset() in apply_error_callback() in case of elevel >= ERROR to ensure the replication origin is released from the session before exiting the process. However, in some cases, apply_error_callback() can be called with ERROR, but the apply worker can continue applying the logical replication. If this happens, the worker applies changes without setting the replication origin, which can lead to data duplication. This commit moves replorigin_reset() to PG_CATCH() part in start_apply(). The path can be reached only by the exiting workers. --- src/backend/replication/logical/worker.c | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 65e22306c48..1f7fa1949a9 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -416,6 +416,8 @@ static inline void reset_apply_error_context_info(void); static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo); +static void replorigin_reset(int code, Datum arg); + /* * Form the origin name for the subscription. * @@ -4441,6 +4443,13 @@ start_apply(XLogRecPtr origin_startpos) } PG_CATCH(); { + /* + * Reset the origin state to prevent the advancement of origin progress + * if we fail to apply. Otherwise, this will result in transaction loss + * as that transaction won't be sent again by the server. + */ + replorigin_reset(0, (Datum) 0); + if (MySubscription->disableonerr) DisableSubscriptionAndExit(); else @@ -4928,23 +4937,12 @@ void apply_error_callback(void *arg) { ApplyErrorCallbackArg *errarg = &apply_error_callback_arg; - int elevel; if (apply_error_callback_arg.command == 0) return; Assert(errarg->origin_name); - elevel = geterrlevel(); - - /* - * Reset the origin state to prevent the advancement of origin progress if - * we fail to apply. Otherwise, this will result in transaction loss as - * that transaction won't be sent again by the server. - */ - if (elevel >= ERROR) - replorigin_reset(0, (Datum) 0); - if (errarg->rel == NULL) { if (!TransactionIdIsValid(errarg->remote_xid)) -- 2.43.5