From ea45e57d44cbe15d9166e157e7601e83184ee477 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Thu, 13 Feb 2025 14:08:11 +0800 Subject: [PATCH v28 5/7] Re-create the replication slot if the conflict retention duration reduced The patch allows the launcher to drop and re-create the invalidated slot, if at least one apply worker has confirmed that the retention duration is now within the max_conflict_retention_duration. --- doc/src/sgml/config.sgml | 5 +- src/backend/replication/logical/launcher.c | 37 +++--- src/backend/replication/logical/worker.c | 140 ++++++++------------- 3 files changed, 77 insertions(+), 105 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 6decdc89eef..04067b0d437 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -5239,7 +5239,10 @@ ANY num_sync ( name), + errdetail("The time spent applying changes up to LSN %X/%X is now within the maximum limit of %u ms.", + LSN_FORMAT_ARGS(data->remote_lsn), + max_conflict_retention_duration)); + elog(DEBUG2, "confirmed remote flush up to %X/%X: new oldest_nonremovable_xid %u", LSN_FORMAT_ARGS(data->remote_lsn), XidFromFullTransactionId(data->candidate_xid)); + + /* Notify launcher to update the xmin of the conflict slot */ + ApplyLauncherWakeup(); } - else + else if (status_changed) { /* * Reaching here means the time spent applying changes up to the @@ -4405,9 +4419,9 @@ wait_for_local_flush(RetainConflictInfoData *data) * (max_conflict_retention_duration). So, we will stop retaining * conflict information. * - * Currently, the retention will not resume automatically unless user - * manually disable retain_conflict_info and re-enable it after - * confirming that the replication slot has been dropped. + * The retention will resume automatically if the worker has confirmed + * that the retention duration is now within the + * max_conflict_retention_duration. */ SpinLockAcquire(&MyLogicalRepWorker->relmutex); MyLogicalRepWorker->oldest_nonremovable_xid = InvalidFullTransactionId; @@ -4420,6 +4434,13 @@ wait_for_local_flush(RetainConflictInfoData *data) errdetail("The time spent applying changes up to LSN %X/%X has exceeded the maximum limit of %u ms.", LSN_FORMAT_ARGS(data->remote_lsn), max_conflict_retention_duration)); + + /* + * Notify launcher to either update the xmin of the conflict slot or + * invalidate the slot if no other workers are retaining conflict + * information. + */ + ApplyLauncherWakeup(); } /* Notify launcher to update the xmin of the conflict slot */ @@ -4484,51 +4505,6 @@ adjust_xid_advance_interval(RetainConflictInfoData *data, bool new_xid_found) } } -/* - * Update the conflict retention status for the current apply worker. It checks - * whether the worker should stop retaining conflict information due to - * invalidation of the replication slot ("pg_conflict_detection"). - * - * Currently, the replication slot is invalidated only if the duration for - * retaining conflict information exceeds the allowed maximum. - */ -static void -update_conflict_retention_status(void) -{ - ReplicationSlotInvalidationCause cause = RS_INVAL_NONE; - ReplicationSlot *slot; - - /* Exit early if retaining conflict information is not required */ - if (!MySubscription->retainconflictinfo) - return; - - /* - * Only the leader apply worker manages conflict retention (see - * maybe_advance_nonremovable_xid() for details). - */ - if (!am_leader_apply_worker()) - return; - - LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); - - slot = SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, false); - - if (slot) - { - SpinLockAcquire(&slot->mutex); - cause = slot->data.invalidated; - SpinLockRelease(&slot->mutex); - - Assert(cause == RS_INVAL_NONE || cause == RS_INVAL_CONFLICT_RETENTION_DURATION); - } - - LWLockRelease(ReplicationSlotControlLock); - - SpinLockAcquire(&MyLogicalRepWorker->relmutex); - MyLogicalRepWorker->stop_conflict_retention = cause != RS_INVAL_NONE; - SpinLockRelease(&MyLogicalRepWorker->relmutex); -} - /* * Exit routine for apply workers due to subscription parameter changes. */ @@ -4700,16 +4676,6 @@ maybe_reread_subscription(void) CommitTransactionCommand(); MySubscriptionValid = true; - - /* - * Update worker status to avoid unnecessary conflict retention if the - * replication slot ("pg_conflict_detection") was invalidated prior to - * enabling the retain_conflict_info option. This is also necessary to - * restart conflict retention if the user has disabled and subsequently - * re-enabled the retain_conflict_info option, resulting in the - * replication slot being recreated. - */ - update_conflict_retention_status(); } /* @@ -5348,8 +5314,6 @@ InitializeLogRepWorker(void) MySubscription->name))); CommitTransactionCommand(); - - update_conflict_retention_status(); } /* -- 2.30.0.windows.2