From 49a0902648e609771502d17c6dde178331128a2f Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Wed, 13 Aug 2025 11:33:24 +0800 Subject: [PATCH v62 2/2] Resume retaining the information for conflict detection The patch allows the launcher to re-initialized invalidated slot, if at least one apply worker has confirmed that the retention duration is now within the max_conflict_retention_duration. --- doc/src/sgml/ref/create_subscription.sgml | 11 +- src/backend/replication/logical/launcher.c | 104 +++++++++++++---- src/backend/replication/logical/worker.c | 129 ++++++++++++++++++--- src/include/replication/worker_internal.h | 7 ++ 4 files changed, 208 insertions(+), 43 deletions(-) diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 708284dafdf..644d4fe5aaa 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -533,26 +533,31 @@ CREATE SUBSCRIPTION subscription_name + The information useful for conflict detection is no longer retained if all apply workers associated with the subscriptions, where retain_dead_tuples is enabled, confirm that the retention duration exceeded the max_conflict_retention_duration set within the - corresponding subscription. To re-enable retention, you can disable - retain_dead_tuples for all subscriptions and - re-enable it after confirming this replication slot has been dropped. + corresponding subscription. The retention will be automatically resumed + once at least one apply worker confirms that the retention duration is + within the specified limit, or if a new apply worker with + retain_dead_tuples enabled is started. + Note that overall retention will not stop if other subscriptions specify a greater value and have not exceeded it, or if they set this option to 0. + This option is effective only when retain_conflict_info is enabled and the apply worker associated with the subscription is active. + Note that setting a non-zero value for this option could lead to diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index f1dfb51ccf8..ae70e5441cb 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -102,9 +102,12 @@ static int logicalrep_pa_worker_count(Oid subid); static void logicalrep_launcher_attach_dshmem(void); static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); -static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin); +static void compute_min_nonremovable_xid(LogicalRepWorker *worker, + bool can_advance_xmin, + TransactionId *xmin); static bool acquire_conflict_slot_if_exists(void); static void update_conflict_slot_xmin(TransactionId new_xmin); +static void init_conflict_slot_xmin(void); /* @@ -464,6 +467,8 @@ retry: worker->stream_fileset = NULL; worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid; worker->parallel_apply = is_parallel_apply_worker; + worker->stop_conflict_info_retention = (retain_dead_tuples && + !TransactionIdIsValid(MyReplicationSlot->data.xmin)); worker->oldest_nonremovable_xid = retain_dead_tuples ? MyReplicationSlot->data.xmin : InvalidTransactionId; @@ -1258,21 +1263,30 @@ ApplyLauncherMain(Datum main_arg) * required for conflict detection among all running apply * workers that enables retain_dead_tuples. */ - if (sub->retaindeadtuples && can_advance_xmin) - compute_min_nonremovable_xid(w, &xmin); + if (sub->retaindeadtuples) + compute_min_nonremovable_xid(w, can_advance_xmin, &xmin); /* worker is running already */ continue; } - /* - * Can't advance xmin of the slot unless all the workers - * corresponding to subscriptions with retain_dead_tuples are - * running, disabling the further computation of the minimum - * nonremovable xid. - */ if (sub->retaindeadtuples) + { + /* + * Initialize slot.xmin for a new apply worker that requests the + * retention of information useful for conflict detection. + */ + if (!TransactionIdIsValid(MyReplicationSlot->data.xmin)) + init_conflict_slot_xmin(); + + /* + * Can't advance xmin of the slot unless all the workers + * corresponding to subscriptions with retain_dead_tuples are + * running, disabling the further computation of the minimum + * nonremovable xid. + */ can_advance_xmin = false; + } /* * If the worker is eligible to start now, launch it. Otherwise, @@ -1369,9 +1383,11 @@ ApplyLauncherMain(Datum main_arg) * in *xmin. */ static void -compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin) +compute_min_nonremovable_xid(LogicalRepWorker *worker, bool can_advance_xmin, + TransactionId *xmin) { TransactionId nonremovable_xid; + bool stop_retention; Assert(worker != NULL); @@ -1383,13 +1399,43 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin) SpinLockAcquire(&worker->mutex); nonremovable_xid = worker->oldest_nonremovable_xid; + stop_retention = worker->stop_conflict_info_retention; SpinLockRelease(&worker->mutex); /* * Skip collecting oldest_nonremovable_xid for workers that have stopped * conflict retention. */ + if (stop_retention) + return; + + /* + * Initialize slot.xmin as a apply worker resumes retention of information + * useful for conflict detection. + */ + if (!TransactionIdIsValid(MyReplicationSlot->data.xmin)) + init_conflict_slot_xmin(); + + /* + * Assign slot.xmin to the apply worker's oldest_nonremovable_xid if the + * latter is invalid. This ensures the apply worker continues to maintain + * the oldest_nonremovable_xid (see get_candidate_xid). + */ if (!TransactionIdIsValid(nonremovable_xid)) + { + nonremovable_xid = MyReplicationSlot->data.xmin; + + SpinLockAcquire(&worker->mutex); + worker->oldest_nonremovable_xid = nonremovable_xid; + SpinLockRelease(&worker->mutex); + + /* Notify the apply worker to start the next cycle of management */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + logicalrep_worker_wakeup_ptr(worker); + LWLockRelease(LogicalRepWorkerLock); + } + + if (!can_advance_xmin) return; if (!TransactionIdIsValid(*xmin) || @@ -1451,23 +1497,15 @@ update_conflict_slot_xmin(TransactionId new_xmin) } /* - * Create and acquire the replication slot used to retain information for - * conflict detection, if not yet. + * Initialize the xmin for the conflict detection slot. */ -void -CreateConflictDetectionSlot(void) +static void +init_conflict_slot_xmin(void) { TransactionId xmin_horizon; - /* Exit early, if the replication slot is already created and acquired */ - if (MyReplicationSlot) - return; - - ereport(LOG, - errmsg("creating replication conflict detection slot")); - - ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false, - false, false); + Assert(MyReplicationSlot && + !TransactionIdIsValid(MyReplicationSlot->data.xmin)); LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); @@ -1487,6 +1525,26 @@ CreateConflictDetectionSlot(void) ReplicationSlotSave(); } +/* + * Create and acquire the replication slot used to retain information for + * conflict detection, if not yet. + */ +void +CreateConflictDetectionSlot(void) +{ + /* Exit early, if the replication slot is already created and acquired */ + if (MyReplicationSlot) + return; + + ereport(LOG, + errmsg("creating replication conflict detection slot")); + + ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false, + false, false); + + init_conflict_slot_xmin(); +} + /* * Is current process the logical replication launcher? */ diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b4d2ab41a2d..8316db1caf7 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -418,6 +418,10 @@ typedef struct RetainDeadTuplesData long table_sync_wait_time; /* time spent waiting for table sync * to finish */ + bool wait_for_initial_xid; /* wait for the launcher to initialize + * the apply worker's + * oldest_nonremovable_xid */ + /* * The following fields are used to determine the timing for the next * round of transaction ID advancement. @@ -560,6 +564,7 @@ static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data, static void wait_for_local_flush(RetainDeadTuplesData *rdt_data); static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data); static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data); +static void resume_conflict_info_retention(RetainDeadTuplesData *rdt_data); static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found); @@ -4354,6 +4359,21 @@ maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data, if (!can_advance_nonremovable_xid(rdt_data)) return; + /* + * If retention has been stopped but max_conflict_retention_duration is now + * set to 0, resume retention immediately. + * + * Do not resume when waiting for publisher status, as doing so may result + * in the message being processed after the data and phase have been reset, + * potentially causing it to be mistakenly identified as a new message. This + * could lead to the premature advancement of oldest_nonremovable_xid. + */ + if (MyLogicalRepWorker->stop_conflict_info_retention && + !MySubscription->maxconflretention && + !(rdt_data->phase == RDT_WAIT_FOR_PUBLISHER_STATUS && + !status_received)) + resume_conflict_info_retention(rdt_data); + process_rdt_phase_transition(rdt_data, status_received); } @@ -4376,10 +4396,6 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data) if (!MySubscription->retaindeadtuples) return false; - /* No need to advance if we have already stopped retaining */ - if (!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid)) - return false; - return true; } @@ -4417,6 +4433,33 @@ get_candidate_xid(RetainDeadTuplesData *rdt_data) TransactionId oldest_running_xid; TimestampTz now; + /* + * No need to advance if the apply worker has resumed retention but the + * launcher has not yet initialized slot.xmin and assigned it to + * oldest_nonremovable_xid. + * + * It might seem feasible to directly check the conflict detection + * slot.xmin instead of relying on the launcher to assign the worker's + * oldest_nonremovable_xid; however, that could lead to a race condition + * where slot.xmin is set to InvalidTransactionId immediately after the + * check. In such cases, oldest_nonremovable_xid would no longer be + * protected by a replication slot and could become unreliable if a + * wraparound occurs. + */ + if (rdt_data->wait_for_initial_xid) + { + TransactionId nonremovable_xid; + + SpinLockAcquire(&MyLogicalRepWorker->mutex); + nonremovable_xid = MyLogicalRepWorker->oldest_nonremovable_xid; + SpinLockRelease(&MyLogicalRepWorker->mutex); + + if (!TransactionIdIsValid(nonremovable_xid)) + return; + + rdt_data->wait_for_initial_xid = false; + } + /* * Use last_recv_time when applying changes in the loop to avoid * unnecessary system time retrieval. If last_recv_time is not available, @@ -4661,6 +4704,17 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) if (last_flushpos < rdt_data->remote_lsn) return; + /* + * If conflict info retention was previously stopped due to a timeout, and + * the time required to advance the non-removable transaction ID has now + * decreased to within acceptable limits, resume the rentention. + */ + if (MyLogicalRepWorker->stop_conflict_info_retention) + { + resume_conflict_info_retention(rdt_data); + return; + } + /* * Reaching here means the remote WAL position has been received, and all * transactions up to that position on the publisher have been applied and @@ -4710,9 +4764,8 @@ reset_retention_data_fields(RetainDeadTuplesData *rdt_data) * to InvalidTransactionId, notify the launcher to set the slot.xmin to * InvalidTransactionId as well, and return true. Return false otherwise. * - * Currently, the retention will not resume automatically unless user manually - * disables retain_dead_tuples and re-enables it after confirming that the - * replication slot has been dropped. + * The retention will resume automatically if the worker has confirmed that the + * retention duration is now within the max_conflict_retention_duration. */ static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) @@ -4743,24 +4796,66 @@ should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) rdt_data->table_sync_wait_time)) return false; + /* + * Log a message and reset relevant data when the worker is about to stop + * retaining conflict information. + */ + if (!MyLogicalRepWorker->stop_conflict_info_retention) + { + ereport(LOG, + errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts", + MySubscription->name), + errdetail("The retention duration for information used in conflict detection has exceeded the maximum limit of %u ms.", + MySubscription->maxconflretention), + errhint("You might need to increase \"%s\".", + "max_conflict_retention_duration")); + + SpinLockAcquire(&MyLogicalRepWorker->mutex); + MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId; + MyLogicalRepWorker->stop_conflict_info_retention = true; + SpinLockRelease(&MyLogicalRepWorker->mutex); + + /* Notify launcher to update the conflict slot */ + ApplyLauncherWakeup(); + } + + reset_retention_data_fields(rdt_data); + + return true; +} + +/* + * Resume the retention if conflict info retention was previously stopped due to + * a timeout, and the time required to advance the non-removable transaction ID + * has now decreased to within acceptable limits. + */ +static void +resume_conflict_info_retention(RetainDeadTuplesData *rdt_data) +{ + Assert(MyLogicalRepWorker->stop_conflict_info_retention); + ereport(LOG, - errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts", + errmsg("logical replication worker for subscription \"%s\" will resume retaining the information for detecting conflicts", MySubscription->name), - errdetail("The retention duration for information used in conflict detection has exceeded the maximum limit of %u ms.", - MySubscription->maxconflretention), - errhint("You might need to increase \"%s\".", - "max_conflict_retention_duration")); + MySubscription->maxconflretention + ? errdetail("The retention duration for information used in conflict detection is now within the acceptable limit of %u ms.", + MySubscription->maxconflretention) + : errdetail("The retention duration for information used in conflict detection is now indefinite.")); + + /* + * The next step is to wait for the launcher to initialize the + * oldest_nonremovable_xid. + */ + rdt_data->wait_for_initial_xid = true; SpinLockAcquire(&MyLogicalRepWorker->mutex); - MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId; + MyLogicalRepWorker->stop_conflict_info_retention = false; SpinLockRelease(&MyLogicalRepWorker->mutex); - /* Notify launcher to update the conflict slot */ - ApplyLauncherWakeup(); - reset_retention_data_fields(rdt_data); - return true; + /* Notify launcher to update the xmin of the conflict slot */ + ApplyLauncherWakeup(); } /* diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 9c0c2b8050c..32a1cbb6528 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -105,6 +105,13 @@ typedef struct LogicalRepWorker */ TransactionId oldest_nonremovable_xid; + /* + * Indicates whether the apply worker has stopped retaining information + * useful for conflict detection. This is used only when + * retain_dead_tuples is enabled. + */ + bool stop_conflict_info_retention; + /* Stats. */ XLogRecPtr last_lsn; TimestampTz last_send_time; -- 2.50.1.windows.1