From 605c2759160fce0fcdb8c38babf0da8f8bed10c5 Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Thu, 21 Aug 2025 15:23:02 +0800 Subject: [PATCH v64 2/2] Resume retaining the information for conflict detection The patch allows the launcher to re-initialized invalidated slot, if at least one apply worker has confirmed that the retention duration is now within the max_conflict_retention_duration. --- doc/src/sgml/ref/create_subscription.sgml | 7 +- src/backend/replication/logical/launcher.c | 81 ++++++-- src/backend/replication/logical/worker.c | 204 +++++++++++++++++++-- 3 files changed, 260 insertions(+), 32 deletions(-) diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 8de0cd0d53f..f032dc2df59 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -540,8 +540,11 @@ CREATE SUBSCRIPTION subscription_namemax_conflict_retention_duration set within the corresponding subscription. To re-enable retention manually, you can - disable retain_dead_tuples for all subscriptions and - re-enable it after confirming this replication slot has been dropped. + disable retain_dead_tuples and re-enable it. + Alternatively, the retention will be automatically resumed once at + least one apply worker confirms that the retention duration is within + the specified limit, or if a new subscription with retain_dead_tuples + enabled is created. Note that overall retention will not stop if other subscriptions diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 81cae445cc0..3a86300a4fa 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -102,9 +102,12 @@ static int logicalrep_pa_worker_count(Oid subid); static void logicalrep_launcher_attach_dshmem(void); static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); -static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin); +static void compute_min_nonremovable_xid(LogicalRepWorker *worker, + bool can_advance_xmin, + TransactionId *xmin); static bool acquire_conflict_slot_if_exists(void); static void update_conflict_slot_xmin(TransactionId new_xmin); +static void init_conflict_slot_xmin(void); /* @@ -1252,6 +1255,14 @@ ApplyLauncherMain(Datum main_arg) * subscription was enabled. */ CreateConflictDetectionSlot(); + + /* + * Initialize slot.xmin as a subscription resumes retention of + * information useful for conflict detection. + */ + if (sub->retentionactive && + !TransactionIdIsValid(MyReplicationSlot->data.xmin)) + init_conflict_slot_xmin(); } if (!sub->enabled) @@ -1268,9 +1279,8 @@ ApplyLauncherMain(Datum main_arg) * required for conflict detection among all running apply * workers that enables retain_dead_tuples. */ - if (sub->retaindeadtuples && sub->retentionactive && - can_advance_xmin) - compute_min_nonremovable_xid(w, &xmin); + if (sub->retaindeadtuples && sub->retentionactive) + compute_min_nonremovable_xid(w, can_advance_xmin, &xmin); /* worker is running already */ continue; @@ -1381,7 +1391,8 @@ ApplyLauncherMain(Datum main_arg) * in *xmin. */ static void -compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin) +compute_min_nonremovable_xid(LogicalRepWorker *worker, bool can_advance_xmin, + TransactionId *xmin) { TransactionId nonremovable_xid; @@ -1397,7 +1408,27 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin) nonremovable_xid = worker->oldest_nonremovable_xid; SpinLockRelease(&worker->mutex); - Assert(TransactionIdIsValid(nonremovable_xid)); + /* + * Assign slot.xmin to the apply worker's oldest_nonremovable_xid if the + * latter is invalid. This ensures the apply worker continues to maintain + * the oldest_nonremovable_xid (see resume_conflict_info_retention). + */ + if (!TransactionIdIsValid(nonremovable_xid)) + { + nonremovable_xid = MyReplicationSlot->data.xmin; + + SpinLockAcquire(&worker->mutex); + worker->oldest_nonremovable_xid = nonremovable_xid; + SpinLockRelease(&worker->mutex); + + /* Notify the apply worker to start the next cycle of management */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + logicalrep_worker_wakeup_ptr(worker); + LWLockRelease(LogicalRepWorkerLock); + } + + if (!can_advance_xmin) + return; if (!TransactionIdIsValid(*xmin) || TransactionIdPrecedes(nonremovable_xid, *xmin)) @@ -1458,23 +1489,15 @@ update_conflict_slot_xmin(TransactionId new_xmin) } /* - * Create and acquire the replication slot used to retain information for - * conflict detection, if not yet. + * Initialize the xmin for the conflict detection slot. */ -void -CreateConflictDetectionSlot(void) +static void +init_conflict_slot_xmin(void) { TransactionId xmin_horizon; - /* Exit early, if the replication slot is already created and acquired */ - if (MyReplicationSlot) - return; - - ereport(LOG, - errmsg("creating replication conflict detection slot")); - - ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false, - false, false); + Assert(MyReplicationSlot && + !TransactionIdIsValid(MyReplicationSlot->data.xmin)); LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); @@ -1494,6 +1517,26 @@ CreateConflictDetectionSlot(void) ReplicationSlotSave(); } +/* + * Create and acquire the replication slot used to retain information for + * conflict detection, if not yet. + */ +void +CreateConflictDetectionSlot(void) +{ + /* Exit early, if the replication slot is already created and acquired */ + if (MyReplicationSlot) + return; + + ereport(LOG, + errmsg("creating replication conflict detection slot")); + + ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false, + false, false); + + init_conflict_slot_xmin(); +} + /* * Is current process the logical replication launcher? */ diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c7c9a0be356..0c2dcf4a0e3 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -194,7 +194,7 @@ * update_deleted is necessary, as the UPDATEs in remote transactions should be * ignored if their timestamp is earlier than that of the dead tuples. * - * If max_conflict_retention_duration is defined, one additional phase is + * If max_conflict_retention_duration is defined, two additional phases are * involved: * * - RDT_STOP_CONFLICT_INFO_RETENTION: @@ -204,6 +204,20 @@ * pg_subscription.subretentionactive is updated to false within a new * transaction, and oldest_nonremovable_xid is set to InvalidTransactionId. * + * - RDT_RESUME_CONFLICT_INFO_RETENTION: + * This phase is triggered when retention was previously stopped, and the time + * required to advance the non-removable transaction ID in the + * RDT_WAIT_FOR_LOCAL_FLUSH phase has decreased to within acceptable limits + * (or if max_conflict_retention_duration is set to 0). Additionally, it + * initiates if conditions improve allowing more efficient advancement. During + * this phase, pg_subscription.subretentionactive is updated to true within a + * new transaction, and we wait for the launcher to initialize the + * oldest_nonremovable_xid before proceeding to RDT_GET_CANDIDATE_XID phase. + * + * Note that the state could transition to RDT_RESUME_CONFLICT_INFO_RETENTION at + * any phase if the retention has been stopped, but + * max_conflict_retention_duration is now set to 0. + * * Note that advancing the non-removable transaction ID is not supported if the * publisher is also a physical standby. This is because the logical walsender * on the standby can only get the WAL replay position but there may be more @@ -389,6 +403,7 @@ typedef enum RDT_WAIT_FOR_PUBLISHER_STATUS, RDT_WAIT_FOR_LOCAL_FLUSH, RDT_STOP_CONFLICT_INFO_RETENTION, + RDT_RESUME_CONFLICT_INFO_RETENTION, } RetainDeadTuplesPhase; /* @@ -433,6 +448,10 @@ typedef struct RetainDeadTuplesData long table_sync_wait_time; /* time spent waiting for table sync * to finish */ + bool wait_for_initial_xid; /* wait for the launcher to initialize + * the apply worker's + * oldest_nonremovable_xid */ + /* * The following fields are used to determine the timing for the next * round of transaction ID advancement. @@ -576,6 +595,9 @@ static void wait_for_local_flush(RetainDeadTuplesData *rdt_data); static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data); static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data); static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data); +static bool should_resume_retention_immediately(RetainDeadTuplesData *rdt_data, + bool status_received); +static void resume_conflict_info_retention(RetainDeadTuplesData *rdt_data); static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found); @@ -4377,6 +4399,13 @@ maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data, if (!can_advance_nonremovable_xid(rdt_data)) return; + /* + * Resume retention immediately if required. (See + * should_resume_retention_immediately() for details). + */ + if (should_resume_retention_immediately(rdt_data, status_received)) + rdt_data->phase = RDT_RESUME_CONFLICT_INFO_RETENTION; + process_rdt_phase_transition(rdt_data, status_received); } @@ -4399,10 +4428,6 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data) if (!MySubscription->retaindeadtuples) return false; - /* No need to advance if we have already stopped retaining */ - if (!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid)) - return false; - return true; } @@ -4431,6 +4456,9 @@ process_rdt_phase_transition(RetainDeadTuplesData *rdt_data, case RDT_STOP_CONFLICT_INFO_RETENTION: stop_conflict_info_retention(rdt_data); break; + case RDT_RESUME_CONFLICT_INFO_RETENTION: + resume_conflict_info_retention(rdt_data); + break; } } @@ -4687,6 +4715,18 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) if (last_flushpos < rdt_data->remote_lsn) return; + /* + * If conflict info retention was previously stopped due to a timeout, and + * the time required to advance the non-removable transaction ID has now + * decreased to within acceptable limits, resume the rentention. + */ + if (!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid)) + { + rdt_data->phase = RDT_RESUME_CONFLICT_INFO_RETENTION; + process_rdt_phase_transition(rdt_data, false); + return; + } + /* * Reaching here means the remote WAL position has been received, and all * transactions up to that position on the publisher have been applied and @@ -4753,6 +4793,109 @@ stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) ApplyLauncherWakeup(); reset_retention_data_fields(rdt_data); + + /* process the next phase */ + process_rdt_phase_transition(rdt_data, false); +} + +/* + * Workhorse for the RDT_RESUME_CONFLICT_INFO_RETENTION phase. + */ +static void +resume_conflict_info_retention(RetainDeadTuplesData *rdt_data) +{ + if (rdt_data->wait_for_initial_xid) + { + TransactionId nonremovable_xid; + + SpinLockAcquire(&MyLogicalRepWorker->mutex); + nonremovable_xid = MyLogicalRepWorker->oldest_nonremovable_xid; + SpinLockRelease(&MyLogicalRepWorker->mutex); + + /* + * Proceed to the next phase if the catalog has been updated and the + * launcher has initialized slot.xmin and assigned it to + * oldest_nonremovable_xid. + * + * It might seem feasible to directly check the conflict detection + * slot.xmin instead of relying on the launcher to assign the worker's + * oldest_nonremovable_xid; however, that could lead to a race + * condition where slot.xmin is set to InvalidTransactionId + * immediately after the check. In such cases, oldest_nonremovable_xid + * would no longer be protected by a replication slot and could become + * unreliable if a wraparound occurs. + */ + if (TransactionIdIsValid(nonremovable_xid)) + { + rdt_data->wait_for_initial_xid = false; + + reset_retention_data_fields(rdt_data); + + /* process the next phase */ + process_rdt_phase_transition(rdt_data, false); + } + + /* + * Return early since the catalog has been updated and we are waiting + * for oldest_nonremovable_xid to be initialized. + */ + return; + } + + /* + * Proceed to the next phase if retention has not been stopped yet. This + * occurs when transitioning from the RDT_STOP_CONFLICT_INFO_RETENTION + * phase but subretentionactive has not been updated due to the inability + * to start a new transaction (see stop_conflict_info_retention). + */ + if (TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid)) + { + reset_retention_data_fields(rdt_data); + + /* process the next phase */ + process_rdt_phase_transition(rdt_data, false); + + return; + } + + /* + * Do not update the catalog during an active transaction. The transaction + * may be started during change application, leading to a possible + * rollback of catalog updates if the application fails subsequently. + */ + if (IsTransactionState()) + return; + + StartTransactionCommand(); + + /* + * Updating pg_subscription might involve TOAST table access, so ensure we + * have a valid snapshot. + */ + PushActiveSnapshot(GetTransactionSnapshot()); + + /* Set pg_subscription.subretentionactive to true */ + UpdateDeadTupleRetentionStatus(MySubscription->oid, true); + + PopActiveSnapshot(); + CommitTransactionCommand(); + + ereport(LOG, + errmsg("logical replication worker for subscription \"%s\" will resume retaining the information for detecting conflicts", + MySubscription->name), + MySubscription->maxconflretention + ? errdetail("The retention duration for information used in conflict detection is now within the acceptable limit of %u ms.", + MySubscription->maxconflretention) + : errdetail("The retention duration for information used in conflict detection is now indefinite.")); + + /* Notify launcher to update the xmin of the conflict slot */ + ApplyLauncherWakeup(); + + /* + * The next step is to wait for the launcher to initialize the + * oldest_nonremovable_xid. + */ + rdt_data->wait_for_initial_xid = true; } /* @@ -4782,9 +4925,8 @@ reset_retention_data_fields(RetainDeadTuplesData *rdt_data) * RDT_STOP_CONFLICT_INFO_RETENTION phase and return true. Otherwise, return * false. * - * Currently, the retention will not resume automatically unless user manually - * disables retain_dead_tuples and re-enables it after confirming that the - * replication slot has been dropped. + * The retention will resume automatically if the worker has confirmed that the + * retention duration is now within the max_conflict_retention_duration. */ static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) @@ -4815,14 +4957,54 @@ should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) rdt_data->table_sync_wait_time)) return false; - rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION; + /* Stop retention if not yet */ + if (TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid)) + { + rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION; - /* process the next phase */ - process_rdt_phase_transition(rdt_data, false); + /* process the next phase */ + process_rdt_phase_transition(rdt_data, false); + } + + reset_retention_data_fields(rdt_data); return true; } +/* + * Check whether retention should be resumed immediately if it has been + * previously stopped, but max_conflict_retention_duration is now set to 0. + */ +static bool +should_resume_retention_immediately(RetainDeadTuplesData *rdt_data, bool status_received) +{ + /* Return false if retention is already being resumed */ + if (rdt_data->phase == RDT_RESUME_CONFLICT_INFO_RETENTION) + return false; + + /* Return false if max_conflict_retention_duration is not 0 */ + if (MySubscription->maxconflretention) + return false; + + /* + * Do not resume when waiting for publisher status, as doing so may result + * in the message being processed after the data and phase have been + * reset, potentially causing it to be mistakenly identified as a new + * message. This could lead to the premature advancement of + * oldest_nonremovable_xid. + */ + if (rdt_data->phase == RDT_WAIT_FOR_PUBLISHER_STATUS && + !status_received) + return false; + + /* + * Resume retention if we are in the process of stopping or have already + * stopped retention. + */ + return rdt_data->phase == RDT_STOP_CONFLICT_INFO_RETENTION || + !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid); +} + /* * Adjust the interval for advancing non-removable transaction IDs. * -- 2.31.1