From 496cc60401705a4512915db7ebf3358f7004014e Mon Sep 17 00:00:00 2001 From: "Chao Li (Evan)" Date: Wed, 24 Dec 2025 09:17:27 +0800 Subject: [PATCH v2] Refactor replication origin state reset helpers Factor out common logic for clearing per-transaction and per-session replication origin state into dedicated helper functions. This removes duplicated assignments of replorigin_session_origin, replorigin_session_origin_lsn, and replorigin_session_origin_timestamp across multiple call sites, and makes the intended scope of each reset (clear per-transaction state vs. clear per-session state) explicit. No functional change intended. Author: Chao Li --- .../replication/logical/applyparallelworker.c | 1 - src/backend/replication/logical/origin.c | 20 +++++++++++------- src/backend/replication/logical/tablesync.c | 5 ----- src/backend/replication/logical/worker.c | 15 ++++++------- src/include/replication/origin.h | 21 +++++++++++++++++++ 5 files changed, 39 insertions(+), 23 deletions(-) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index a4aafcf5b6e..b05279e0809 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -962,7 +962,6 @@ ParallelApplyWorkerMain(Datum main_arg) * origin which was already acquired by its leader process. */ replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid); - replorigin_session_origin = originid; CommitTransactionCommand(); /* diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index 2380f369578..45d7bc5abc8 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -1213,6 +1213,9 @@ replorigin_session_setup(RepOriginId node, int acquired_by) /* probably this one is pointless */ ConditionVariableBroadcast(&session_replication_state->origin_cv); + + /* set local state too */ + replorigin_session_origin = node; } /* @@ -1233,6 +1236,9 @@ replorigin_session_reset(void) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("no replication origin is configured"))); + /* + * Clear sessioin state in shared memory + */ LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); session_replication_state->acquired_by = 0; @@ -1242,6 +1248,11 @@ replorigin_session_reset(void) LWLockRelease(ReplicationOriginLock); ConditionVariableBroadcast(cv); + + /* + * Clear local session state + */ + replorigin_session_clear_state(); } /* @@ -1395,8 +1406,6 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS) pid = PG_GETARG_INT32(1); replorigin_session_setup(origin, pid); - replorigin_session_origin = origin; - pfree(name); PG_RETURN_VOID(); @@ -1412,10 +1421,6 @@ pg_replication_origin_session_reset(PG_FUNCTION_ARGS) replorigin_session_reset(); - replorigin_session_origin = InvalidRepOriginId; - replorigin_session_origin_lsn = InvalidXLogRecPtr; - replorigin_session_origin_timestamp = 0; - PG_RETURN_VOID(); } @@ -1482,8 +1487,7 @@ pg_replication_origin_xact_reset(PG_FUNCTION_ARGS) { replorigin_check_prerequisites(true, false); - replorigin_session_origin_lsn = InvalidXLogRecPtr; - replorigin_session_origin_timestamp = 0; + replorigin_xact_clear_state(); PG_RETURN_VOID(); } diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 2522e372036..6ac467a9e19 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -323,9 +323,6 @@ ProcessSyncingTablesForSync(XLogRecPtr current_lsn) * This is needed to allow the origin to be dropped. */ replorigin_session_reset(); - replorigin_session_origin = InvalidRepOriginId; - replorigin_session_origin_lsn = InvalidXLogRecPtr; - replorigin_session_origin_timestamp = 0; /* * Drop the tablesync's origin tracking if exists. @@ -1320,7 +1317,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) */ originid = replorigin_by_name(originname, false); replorigin_session_setup(originid, 0); - replorigin_session_origin = originid; *origin_startpos = replorigin_session_get_progress(false); CommitTransactionCommand(); @@ -1407,7 +1403,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); replorigin_session_setup(originid, 0); - replorigin_session_origin = originid; /* * If the user did not opt to run as the owner of the subscription diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 718408bb599..651045debee 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -627,7 +627,7 @@ static inline void reset_apply_error_context_info(void); static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo); -static void replorigin_reset(int code, Datum arg); +static void on_exit_clear_state(int code, Datum arg); /* * Form the origin name for the subscription. @@ -5594,7 +5594,7 @@ start_apply(XLogRecPtr origin_startpos) * transaction loss as that transaction won't be sent again by the * server. */ - replorigin_reset(0, (Datum) 0); + replorigin_session_clear_state(); if (MySubscription->disableonerr) DisableSubscriptionAndExit(); @@ -5652,7 +5652,6 @@ run_apply_worker(void) if (!OidIsValid(originid)) originid = replorigin_create(originname); replorigin_session_setup(originid, 0); - replorigin_session_origin = originid; origin_startpos = replorigin_session_get_progress(false); CommitTransactionCommand(); @@ -5865,18 +5864,16 @@ InitializeLogRepWorker(void) * replication workers that set up origins and apply remote transactions * are protected. */ - before_shmem_exit(replorigin_reset, (Datum) 0); + before_shmem_exit(on_exit_clear_state, (Datum) 0); } /* - * Reset the origin state. + * Callback on exit to reset the origin state. */ static void -replorigin_reset(int code, Datum arg) +on_exit_clear_state(int code, Datum arg) { - replorigin_session_origin = InvalidRepOriginId; - replorigin_session_origin_lsn = InvalidXLogRecPtr; - replorigin_session_origin_timestamp = 0; + replorigin_session_clear_state(); } /* diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h index 2a73f6aa492..288f9ff658f 100644 --- a/src/include/replication/origin.h +++ b/src/include/replication/origin.h @@ -44,6 +44,27 @@ extern PGDLLIMPORT RepOriginId replorigin_session_origin; extern PGDLLIMPORT XLogRecPtr replorigin_session_origin_lsn; extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp; +/* + * Clear per-transaction replication origin state. + */ +static inline void +replorigin_xact_clear_state(void) +{ + replorigin_session_origin_lsn = InvalidXLogRecPtr; + replorigin_session_origin_timestamp = 0; +} + +/* + * Clear per-session replication origin state. + */ +static inline void +replorigin_session_clear_state(void) +{ + replorigin_xact_clear_state(); + replorigin_session_origin = InvalidRepOriginId; +} + + /* GUCs */ extern PGDLLIMPORT int max_active_replication_origins; -- 2.39.5 (Apple Git-154)