From 47f0fb869551667ccb2de3e9155a6adba9c11fa1 Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Tue, 9 Sep 2025 11:03:36 +0800 Subject: [PATCH v72] Allow conflict-relevant data retention to resume This commit enables automatic recovery of conflict-relevant data retention for a subscription. If the retention duration for a subscription previously exceeded the max_retention_duration and caused retention to stop, the retention can resume once the duration falls within the acceptable limits. --- doc/src/sgml/ref/create_subscription.sgml | 9 +- src/backend/replication/logical/launcher.c | 49 +++- src/backend/replication/logical/worker.c | 262 ++++++++++++++++++--- src/include/replication/worker_internal.h | 6 + src/test/subscription/t/035_conflicts.pl | 27 +++ 5 files changed, 302 insertions(+), 51 deletions(-) diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index fc314437311..ed82cf1809e 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -538,10 +538,11 @@ CREATE SUBSCRIPTION subscription_nameretain_dead_tuples is enabled, confirm that the retention duration has exceeded the max_retention_duration set within the corresponding - subscription. The retention will not be automatically resumed unless a - new subscription is created with retain_dead_tuples = - true, or the user manually re-enables - retain_dead_tuples. + subscription. The retention will automatically resume when at least one + apply worker confirms that the retention duration is within the + specified limit, or when a new subscription is created with + retain_dead_tuples = true. Alternatively, retention + can be manually resumed by re-enabling retain_dead_tuples. Note that overall retention will not stop if other subscriptions that diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index add2e2e066c..7bccc2ee796 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -101,7 +101,9 @@ static int logicalrep_pa_worker_count(Oid subid); static void logicalrep_launcher_attach_dshmem(void); static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); -static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin); +static void compute_min_nonremovable_xid(LogicalRepWorker *worker, + bool can_update_xmin, + TransactionId *xmin); static bool acquire_conflict_slot_if_exists(void); static void update_conflict_slot_xmin(TransactionId new_xmin); static void init_conflict_slot_xmin(void); @@ -468,6 +470,7 @@ retry: worker->oldest_nonremovable_xid = retain_dead_tuples ? MyReplicationSlot->data.xmin : InvalidTransactionId; + worker->wait_for_initial_xid = false; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); @@ -1270,10 +1273,8 @@ ApplyLauncherMain(Datum main_arg) * required for conflict detection among all running apply * workers. */ - if (sub->retaindeadtuples && - sub->retentionactive && - can_update_xmin) - compute_min_nonremovable_xid(w, &xmin); + if (sub->retaindeadtuples && sub->retentionactive) + compute_min_nonremovable_xid(w, can_update_xmin, &xmin); /* worker is running already */ continue; @@ -1382,11 +1383,16 @@ ApplyLauncherMain(Datum main_arg) * Determine the minimum non-removable transaction ID across all apply workers * for subscriptions that have retain_dead_tuples enabled. Store the result * in *xmin. + * + * Additionally, if an apply worker has an invalid XID and is requesting to + * resume retention, assign the slot's xmin value to it. */ static void -compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin) +compute_min_nonremovable_xid(LogicalRepWorker *worker, bool can_update_xmin, + TransactionId *xmin) { TransactionId nonremovable_xid; + bool wait_for_xid; Assert(worker != NULL); @@ -1398,16 +1404,43 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin) SpinLockAcquire(&worker->relmutex); nonremovable_xid = worker->oldest_nonremovable_xid; + wait_for_xid = worker->wait_for_initial_xid; SpinLockRelease(&worker->relmutex); /* - * Return if the apply worker has stopped retention concurrently. + * Assign slot.xmin to the apply worker's oldest_nonremovable_xid if + * requested. This ensures the apply worker continues to maintain the + * oldest_nonremovable_xid (see resume_conflict_info_retention). + */ + if (wait_for_xid) + { + Assert(!TransactionIdIsValid(nonremovable_xid) && + TransactionIdIsValid(MyReplicationSlot->data.xmin)); + + nonremovable_xid = MyReplicationSlot->data.xmin; + + SpinLockAcquire(&worker->relmutex); + worker->oldest_nonremovable_xid = nonremovable_xid; + SpinLockRelease(&worker->relmutex); + + /* Notify the apply worker to start the next cycle of management */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + logicalrep_worker_wakeup_ptr(worker); + LWLockRelease(LogicalRepWorkerLock); + } + + /* + * Return if the apply worker has stopped retention concurrently and has + * not yet resumed. * * Although this function is invoked only when retentionactive is true, * the apply worker might stop retention after the launcher fetches the * retentionactive flag. */ - if (!TransactionIdIsValid(nonremovable_xid)) + else if (!TransactionIdIsValid(nonremovable_xid)) + return; + + if (!can_update_xmin) return; if (!TransactionIdIsValid(*xmin) || diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ee6ac22329f..c5e9b1bd4a1 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -181,6 +181,19 @@ * pg_subscription.subretentionactive is updated to false within a new * transaction, and oldest_nonremovable_xid is set to InvalidTransactionId. * + * - RDT_RESUME_CONFLICT_INFO_RETENTION: + * This phase is required only when max_retention_duration is defined. We + * enter this phase if the retention was previously stopped, and the time + * required to advance the non-removable transaction ID in the + * RDT_WAIT_FOR_LOCAL_FLUSH phase has decreased to within acceptable limits + * (or if max_retention_duration is set to 0). During this phase, + * pg_subscription.subretentionactive is updated to true within a new + * transaction, and we wait for the launcher to initialize the + * oldest_nonremovable_xid before proceeding to RDT_GET_CANDIDATE_XID phase. + * Note that the state could transition to RDT_RESUME_CONFLICT_INFO_RETENTION + * at any phase if the retention has been stopped, but max_retention_duration + * is now set to 0. + * * The overall state progression is: GET_CANDIDATE_XID -> * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) -> @@ -381,7 +394,8 @@ typedef enum RDT_REQUEST_PUBLISHER_STATUS, RDT_WAIT_FOR_PUBLISHER_STATUS, RDT_WAIT_FOR_LOCAL_FLUSH, - RDT_STOP_CONFLICT_INFO_RETENTION + RDT_STOP_CONFLICT_INFO_RETENTION, + RDT_RESUME_CONFLICT_INFO_RETENTION, } RetainDeadTuplesPhase; /* @@ -568,6 +582,10 @@ static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data, static void wait_for_local_flush(RetainDeadTuplesData *rdt_data); static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data); static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data); +static bool should_resume_retention_immediately(RetainDeadTuplesData *rdt_data, + bool status_received); +static void resume_conflict_info_retention(RetainDeadTuplesData *rdt_data); +static bool update_retention_status(bool active); static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data); static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found); @@ -4345,6 +4363,13 @@ maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data, if (!can_advance_nonremovable_xid(rdt_data)) return; + /* + * Resume retention immediately if required. (See + * should_resume_retention_immediately() for details). + */ + if (should_resume_retention_immediately(rdt_data, status_received)) + rdt_data->phase = RDT_RESUME_CONFLICT_INFO_RETENTION; + process_rdt_phase_transition(rdt_data, status_received); } @@ -4367,10 +4392,6 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data) if (!MySubscription->retaindeadtuples) return false; - /* No need to advance if we have already stopped retaining */ - if (!MySubscription->retentionactive) - return false; - return true; } @@ -4399,6 +4420,9 @@ process_rdt_phase_transition(RetainDeadTuplesData *rdt_data, case RDT_STOP_CONFLICT_INFO_RETENTION: stop_conflict_info_retention(rdt_data); break; + case RDT_RESUME_CONFLICT_INFO_RETENTION: + resume_conflict_info_retention(rdt_data); + break; } } @@ -4672,6 +4696,22 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) if (last_flushpos < rdt_data->remote_lsn) return; + /* + * Reaching this point implies should_stop_conflict_info_retention() + * returned false earlier, indicating that the most recent duration for + * advancing the non-removable transaction ID is within the + * max_retention_duration. + * + * Therefore, if conflict info retention was previously halted due to a + * timeout, proceed to resume retention now. + */ + if (!MySubscription->retentionactive) + { + rdt_data->phase = RDT_RESUME_CONFLICT_INFO_RETENTION; + process_rdt_phase_transition(rdt_data, false); + return; + } + /* * Reaching here means the remote WAL position has been received, and all * transactions up to that position on the publisher have been applied and @@ -4701,10 +4741,6 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) * If retention should be stopped, transition to the * RDT_STOP_CONFLICT_INFO_RETENTION phase and return true. Otherwise, return * false. - * - * Note: Retention won't be resumed automatically. The user must manually - * disable retain_dead_tuples and re-enable it after confirming that the - * replication slot maintained by the launcher has been dropped. */ static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) @@ -4735,10 +4771,20 @@ should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) rdt_data->table_sync_wait_time)) return false; - rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION; + /* Stop retention if not yet */ + if (MySubscription->retentionactive) + { + rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION; + process_rdt_phase_transition(rdt_data, false); + return true; + } - /* process the next phase */ - process_rdt_phase_transition(rdt_data, false); + /* + * If retention has been stopped, reset to the initial phase to retry all + * phases. This is required to recalculate the current wait time and + * resume retention if the time falls within max_retention_duration. + */ + reset_retention_data_fields(rdt_data); return true; } @@ -4748,6 +4794,146 @@ should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) */ static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) +{ + /* + * Return if unable to update subretentionactive (see + * update_retention_status). + */ + if (!update_retention_status(false)) + return; + + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + ereport(LOG, + errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts", + MySubscription->name), + errdetail("Retention of information used for conflict detection has exceeded max_retention_duration of %u ms.", + MySubscription->maxretention)); + + reset_retention_data_fields(rdt_data); +} + +/* + * Check whether retention should be resumed immediately if it has been + * previously stopped, but max_retention_duration is now set to 0. + */ +static bool +should_resume_retention_immediately(RetainDeadTuplesData *rdt_data, bool status_received) +{ + /* Return false if retention is already being resumed */ + if (rdt_data->phase == RDT_RESUME_CONFLICT_INFO_RETENTION) + return false; + + /* Return false if max_retention_duration is not 0 */ + if (MySubscription->maxretention) + return false; + + /* + * Do not resume when waiting for publisher status, as doing so may result + * in the message being processed after the data and phase have been + * reset, potentially causing it to be mistakenly identified as a new + * message. This could lead to the premature advancement of + * oldest_nonremovable_xid. + */ + if (rdt_data->phase == RDT_WAIT_FOR_PUBLISHER_STATUS && + !status_received) + return false; + + /* + * Resume retention if we are in the process of stopping or have already + * stopped retention. + */ + return rdt_data->phase == RDT_STOP_CONFLICT_INFO_RETENTION || + !MySubscription->retentionactive; +} + +/* + * Workhorse for the RDT_RESUME_CONFLICT_INFO_RETENTION phase. + */ +static void +resume_conflict_info_retention(RetainDeadTuplesData *rdt_data) +{ + TransactionId nonremovable_xid; + + /* Update the pg_subscription.retentionactive if not yet */ + if (!MySubscription->retentionactive) + { + /* + * Return if unable to update subretentionactive (see + * update_retention_status). + */ + if (!update_retention_status(true)) + return; + + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->wait_for_initial_xid = true; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + ereport(LOG, + errmsg("logical replication worker for subscription \"%s\" will resume retaining the information for detecting conflicts", + MySubscription->name), + MySubscription->maxretention + ? errdetail("Retention of information used for conflict detection is now within the max_retention_duration of %u ms.", + MySubscription->maxretention) + : errdetail("Retention of information used for conflict detection is now indefinite.")); + } + + Assert(MyLogicalRepWorker->wait_for_initial_xid); + + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + nonremovable_xid = MyLogicalRepWorker->oldest_nonremovable_xid; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + /* + * Return if the launcher has not initialized oldest_nonremovable_xid. + * + * It might seem feasible to directly check the conflict detection + * slot.xmin instead of relying on the launcher to assign the worker's + * oldest_nonremovable_xid; however, that could lead to a race condition + * where slot.xmin is set to InvalidTransactionId immediately after the + * check. In such cases, oldest_nonremovable_xid would no longer be + * protected by a replication slot and could become unreliable if a + * wraparound occurs. + * + * XXX An alternative could be directly restarting the worker to ensure + * the launcher initializes oldest_nonremovable_xid prior to starting. + * However, restarting may not be preferable if initialization can be + * managed on-the-fly. + */ + if (!TransactionIdIsValid(nonremovable_xid)) + return; + + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->wait_for_initial_xid = false; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + /* + * Proceed to the next phase if either the launcher has initialized + * slot.xmin and assigned it to oldest_nonremovable_xid, or retention has + * not been stopped yet. The latter situation arises when transitioning + * from the RDT_STOP_CONFLICT_INFO_RETENTION phase but subretentionactive + * has not been updated due to the inability to start a new transaction + * (see stop_conflict_info_retention). + */ + Assert(MySubscription->retentionactive); + + reset_retention_data_fields(rdt_data); + + /* process the next phase */ + process_rdt_phase_transition(rdt_data, false); +} + +/* + * Update pg_subscription.subretentionactive to the given value within a new + * transaction. + * + * Returns true upon successful update; however, if currently within an active + * transaction, skip the update and return false. + */ +static bool +update_retention_status(bool active) { /* * Do not update the catalog during an active transaction. The transaction @@ -4755,7 +4941,7 @@ stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) * rollback of catalog updates if the application fails subsequently. */ if (IsTransactionState()) - return; + return false; StartTransactionCommand(); @@ -4765,26 +4951,18 @@ stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) */ PushActiveSnapshot(GetTransactionSnapshot()); - /* Set pg_subscription.subretentionactive to false */ - UpdateDeadTupleRetentionStatus(MySubscription->oid, false); + /* Update pg_subscription.subretentionactive */ + UpdateDeadTupleRetentionStatus(MySubscription->oid, active); PopActiveSnapshot(); CommitTransactionCommand(); - SpinLockAcquire(&MyLogicalRepWorker->relmutex); - MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId; - SpinLockRelease(&MyLogicalRepWorker->relmutex); - - ereport(LOG, - errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts", - MySubscription->name), - errdetail("Retention of information used for conflict detection has exceeded max_retention_duration of %u ms.", - MySubscription->maxretention)); - - /* Notify launcher to update the conflict slot */ + /* Notify launcher to update the xmin of the conflict slot */ ApplyLauncherWakeup(); - reset_retention_data_fields(rdt_data); + MySubscription->retentionactive = active; + + return true; } /* @@ -4809,19 +4987,20 @@ reset_retention_data_fields(RetainDeadTuplesData *rdt_data) /* * Adjust the interval for advancing non-removable transaction IDs. * - * If there is no activity on the node, we progressively double the interval - * used to advance non-removable transaction ID. This helps conserve CPU - * and network resources when there's little benefit to frequent updates. + * If there is no activity on the node or retention has been stopped, we + * progressively double the interval used to advance non-removable transaction + * ID. This helps conserve CPU and network resources when there's little benefit + * to frequent updates. * * The interval is capped by the lowest of the following: * - wal_receiver_status_interval (if set), * - a default maximum of 3 minutes, - * - max_retention_duration. + * - max_retention_duration (if retention is active). * - * This ensures the interval never exceeds the retention boundary, even if - * other limits are higher. Once activity resumes on the node, the interval - * is reset to lesser of 100ms and max_retention_duration, allowing timely - * advancement of non-removable transaction ID. + * This ensures the interval never exceeds the retention boundary, even if other + * limits are higher. Once activity resumes on the node and the retention is + * active, the interval is reset to lesser of 100ms and max_retention_duration, + * allowing timely advancement of non-removable transaction ID. * * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can * consider the other interval or a separate GUC if the need arises. @@ -4829,7 +5008,8 @@ reset_retention_data_fields(RetainDeadTuplesData *rdt_data) static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found) { - if (!new_xid_found && rdt_data->xid_advance_interval) + if (rdt_data->xid_advance_interval && + (!new_xid_found || !MySubscription->retentionactive)) { int max_interval = wal_receiver_status_interval ? wal_receiver_status_interval * 1000 @@ -4851,9 +5031,13 @@ adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found) rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL; } - /* Ensure the wait time remains within the maximum limit */ - rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval, - MySubscription->maxretention); + /* + * Ensure the wait time remains within the maximum limit when retention is + * active. + */ + if (MySubscription->retentionactive) + rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval, + MySubscription->maxretention); } /* diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index de003802612..d776949a04e 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -100,6 +100,12 @@ typedef struct LogicalRepWorker */ TransactionId oldest_nonremovable_xid; + /* + * Indicates whether the apply worker is resuming retention and is waiting + * for the launcher to initialize oldest_nonremovable_xid. + */ + bool wait_for_initial_xid; + /* Stats. */ XLogRecPtr last_lsn; TimestampTz last_send_time; diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl index db0d5b464e8..947ea131c4d 100644 --- a/src/test/subscription/t/035_conflicts.pl +++ b/src/test/subscription/t/035_conflicts.pl @@ -628,6 +628,33 @@ $node_B->safe_psql('postgres', $node_B->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''"); $node_B->reload; +############################################################################### +# Check that dead tuple retention resumes when the max_retention_duration is set +# 0. +############################################################################### + +$log_offset = -s $node_A->logfile; + +# Set max_retention_duration to 0 +$node_A->safe_psql('postgres', + "ALTER SUBSCRIPTION $subname_AB SET (max_retention_duration = 0);"); + +# Confirm that the retention resumes +$node_A->wait_for_log( + qr/logical replication worker for subscription "tap_sub_a_b" will resume retaining the information for detecting conflicts +.*DETAIL:.* Retention of information used for conflict detection is now indefinite.*/, + $log_offset); + +ok( $node_A->poll_query_until( + 'postgres', + "SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" + ), + "the xmin value of slot 'pg_conflict_detection' is valid on Node A"); + +$result = $node_A->safe_psql('postgres', + "SELECT subretentionactive FROM pg_subscription WHERE subname='$subname_AB';"); +is($result, qq(t), 'retention is active'); + ############################################################################### # Check that the replication slot pg_conflict_detection is dropped after # removing all the subscriptions. -- 2.51.0.windows.1