From 50f339b5790ed0a19c1464ff6955a89292613d8c Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Mon, 21 Apr 2025 15:16:31 +0900 Subject: [PATCH v5-HEAD] Fix oversight 3f28b2f 3f28b2f introduced replorigin_reset() in apply_error_callback() in case of elevel >= ERROR to ensure the replication origin is released from the session before exiting the process. However, in some cases, apply_error_callback() can be called with ERROR, but the apply worker can continue applying the logical replication. If this happens, the worker applies changes without setting the replication origin, which can lead to data duplication. This commit moves replorigin_reset() to PG_CATCH() part in start_apply(). The path can be reached only by the exiting workers. --- src/backend/replication/logical/worker.c | 21 ++++--- src/backend/utils/error/elog.c | 17 ------ src/include/utils/elog.h | 1 - src/test/subscription/t/100_bugs.pl | 73 ++++++++++++++++++++++++ 4 files changed, 83 insertions(+), 29 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 5ce596f4576..4151a4b2a96 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -414,6 +414,8 @@ static inline void reset_apply_error_context_info(void); static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo); +static void replorigin_reset(int code, Datum arg); + /* * Form the origin name for the subscription. * @@ -4516,6 +4518,14 @@ start_apply(XLogRecPtr origin_startpos) } PG_CATCH(); { + /* + * Reset the origin state to prevent the advancement of origin + * progress if we fail to apply. Otherwise, this will result in + * transaction loss as that transaction won't be sent again by the + * server. + */ + replorigin_reset(0, (Datum) 0); + if (MySubscription->disableonerr) DisableSubscriptionAndExit(); else @@ -5004,23 +5014,12 @@ void apply_error_callback(void *arg) { ApplyErrorCallbackArg *errarg = &apply_error_callback_arg; - int elevel; if (apply_error_callback_arg.command == 0) return; Assert(errarg->origin_name); - elevel = geterrlevel(); - - /* - * Reset the origin state to prevent the advancement of origin progress if - * we fail to apply. Otherwise, this will result in transaction loss as - * that transaction won't be sent again by the server. - */ - if (elevel >= ERROR) - replorigin_reset(0, (Datum) 0); - if (errarg->rel == NULL) { if (!TransactionIdIsValid(errarg->remote_xid)) diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c index d6299633ab7..47af743990f 100644 --- a/src/backend/utils/error/elog.c +++ b/src/backend/utils/error/elog.c @@ -1590,23 +1590,6 @@ geterrcode(void) return edata->sqlerrcode; } -/* - * geterrlevel --- return the currently set error level - * - * This is only intended for use in error callback subroutines, since there - * is no other place outside elog.c where the concept is meaningful. - */ -int -geterrlevel(void) -{ - ErrorData *edata = &errordata[errordata_stack_depth]; - - /* we don't bother incrementing recursion_depth */ - CHECK_STACK_DEPTH(); - - return edata->elevel; -} - /* * geterrposition --- return the currently set error position (0 if none) * diff --git a/src/include/utils/elog.h b/src/include/utils/elog.h index a5313c5d2d5..5eac0e16970 100644 --- a/src/include/utils/elog.h +++ b/src/include/utils/elog.h @@ -227,7 +227,6 @@ extern int internalerrquery(const char *query); extern int err_generic_string(int field, const char *str); extern int geterrcode(void); -extern int geterrlevel(void); extern int geterrposition(void); extern int getinternalerrposition(void); diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl index b3924ca4b09..137bc375bcc 100644 --- a/src/test/subscription/t/100_bugs.pl +++ b/src/test/subscription/t/100_bugs.pl @@ -498,4 +498,77 @@ is( $node_publisher->psql( $node_publisher->stop('fast'); $node_subscriber->stop('fast'); +# The bug was that the replication origin wasn’t updated whe +# apply_error_callback() was called with elevel >= ERROR, and the apply worker +# continued running afterward. + +$node_publisher->rotate_logfile(); +$node_publisher->start(); + +$node_subscriber->rotate_logfile(); +$node_subscriber->start(); + +# Set up a publication with a table +$node_publisher->safe_psql( + 'postgres', qq( + CREATE TABLE t1 (a int); + CREATE PUBLICATION regress_pub FOR TABLE t1; +)); + +# Set up a subscription which subscribes the publication +$node_subscriber->safe_psql( + 'postgres', qq( + CREATE TABLE t1 (a int); + CREATE SUBSCRIPTION regress_sub CONNECTION '$publisher_connstr' PUBLICATION regress_pub; +)); + +$node_subscriber->wait_for_subscription_sync($node_publisher, 'regress_sub'); + +# Obtain current remote_lsn value to check its advancement later +my $remote_lsn = $node_subscriber->safe_psql('postgres', + "SELECT remote_lsn FROM pg_catalog.pg_replication_origin_status os, pg_catalog.pg_subscription s WHERE os.external_id = 'pg_' || s.oid AND s.subname = 'regress_sub'" +); + +# Define an after-trigger function for the table insert. It can be fired even +# by the apply worker and always raises an exception. This situation allows +# worker continue after apply_error_callback() is called with elevel = ERROR. +$node_subscriber->safe_psql( + 'postgres', q{ +CREATE FUNCTION handle_exception_trigger() +RETURNS TRIGGER AS $$ +BEGIN + BEGIN + -- Raise an exception + RAISE EXCEPTION 'This is a test exception'; + EXCEPTION + WHEN OTHERS THEN + RETURN NEW; + END; + + RETURN NEW; +END; +$$ LANGUAGE plpgsql; +}); + +$node_subscriber->safe_psql( + 'postgres', q{ +CREATE TRIGGER silent_exception_trigger +AFTER INSERT OR UPDATE ON t1 +FOR EACH ROW +EXECUTE FUNCTION handle_exception_trigger(); + +ALTER TABLE t1 ENABLE ALWAYS TRIGGER silent_exception_trigger; +}); + +# Insert a tuple to replicate changes +$node_publisher->safe_psql('postgres', "INSERT INTO t1 VALUES (1);"); +$node_publisher->wait_for_catchup('regress_sub'); + +# Confirms the origin can be advanced +$result = $node_subscriber->safe_psql('postgres', + "SELECT remote_lsn > '$remote_lsn' FROM pg_catalog.pg_replication_origin_status os, pg_catalog.pg_subscription s WHERE os.external_id = 'pg_' || s.oid AND s.subname = 'regress_sub'" +); +is($result, 't', + 'remote_lsn has advanced for apply worker raising an exception'); + done_testing(); -- 2.43.5