From 2b5c82677f8e40b9071180c78b708260fa084678 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Wed, 11 Jun 2025 15:48:09 +0800 Subject: [PATCH v41 3/6] Re-create the replication slot if the conflict retention duration is reduced The patch allows the launcher to drop and re-create the invalidated slot if at least one apply worker has confirmed that the retention duration is now within max_conflict_retention_duration. --- doc/src/sgml/config.sgml | 5 +- src/backend/replication/logical/launcher.c | 10 ++++ src/backend/replication/logical/worker.c | 64 +++++++++++++++------- 3 files changed, 58 insertions(+), 21 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 4c870b5e806..504f7a01ef0 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -5423,7 +5423,10 @@ ANY num_sync ( name), + errdetail("The time spent applying changes up to LSN %X/%X is now within the maximum limit of %u ms.", + LSN_FORMAT_ARGS(rci_data->remote_lsn), + max_conflict_retention_duration)); + + apply_worker_exit(); + } + /* * Reaching here means the remote WAL position has been received, and all * transactions up to that position on the publisher have been applied and @@ -4429,6 +4446,7 @@ wait_for_local_flush(RetainConflictInfoData *rci_data) */ SpinLockAcquire(&MyLogicalRepWorker->relmutex); MyLogicalRepWorker->oldest_nonremovable_xid = rci_data->candidate_xid; + MyLogicalRepWorker->stop_conflict_info_retention = false; SpinLockRelease(&MyLogicalRepWorker->relmutex); elog(DEBUG2, "confirmed flush up to remote lsn %X/%X: new oldest_nonremovable_xid %u", @@ -4470,9 +4488,8 @@ reset_conflict_info_fields(RetainConflictInfoData *rci_data) * LogicalRepWorker->stop_conflict_info_retention to true, notify the launcher to * invalidate the slot, and return true. Return false otherwise. 
* - * Currently, the retention will not resume automatically unless user manually - * disables retain_conflict_info and re-enables it after confirming that the - * replication slot has been dropped. + * The retention will resume automatically if the worker has confirmed that the + * retention duration is now within the max_conflict_retention_duration. */ static bool should_stop_conflict_info_retention(RetainConflictInfoData *rci_data) @@ -4501,19 +4518,26 @@ should_stop_conflict_info_retention(RetainConflictInfoData *rci_data) max_conflict_retention_duration)) return false; - ereport(LOG, - errmsg("logical replication worker for subscription \"%s\" will stop retaining conflict information", - MySubscription->name), - errdetail("The time spent advancing the non-removable transaction ID has exceeded the maximum limit of %u ms.", - max_conflict_retention_duration)); - - SpinLockAcquire(&MyLogicalRepWorker->relmutex); - MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId; - MyLogicalRepWorker->stop_conflict_info_retention = true; - SpinLockRelease(&MyLogicalRepWorker->relmutex); - - /* Notify launcher to invalidate the conflict slot */ - ApplyLauncherWakeup(); + /* + * Log a message and reset relevant data when the worker is about to stop + * retaining conflict information. 
+ */ + if (!MyLogicalRepWorker->stop_conflict_info_retention) + { + ereport(LOG, + errmsg("logical replication worker for subscription \"%s\" will stop retaining conflict information", + MySubscription->name), + errdetail("The time spent advancing the non-removable transaction ID has exceeded the maximum limit of %u ms.", + max_conflict_retention_duration)); + + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId; + MyLogicalRepWorker->stop_conflict_info_retention = true; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + /* Notify launcher to invalidate the conflict slot */ + ApplyLauncherWakeup(); + } reset_conflict_info_fields(rci_data); -- 2.30.0.windows.2