From bb2c2c4ecca51d2675f2e5c4d6ac3490995be2b0 Mon Sep 17 00:00:00 2001
From: Andres Freund
Date: Wed, 7 Apr 2021 19:01:03 -0700
Subject: [PATCH 1/2] WIP: Sketch for a fix for InvalidateObsoleteReplicationSlots().

---
 src/backend/replication/slot.c | 250 +++++++++++++++++++++------------
 1 file changed, 163 insertions(+), 87 deletions(-)

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 75a087c2f9d..5864b9b0139 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1153,6 +1153,164 @@ ReplicationSlotReserveWal(void)
 	}
 }
 
+/*
+ * Helper for InvalidateObsoleteReplicationSlots. Returns whether
+ * ReplicationSlotControlLock was released.
+ *
+ * The caller must hold ReplicationSlotControlLock in shared mode. If true is
+ * returned the lock is no longer held on return, otherwise it still is.
+ */
+static bool
+InvalidateObsoleteReplicationSlot(ReplicationSlot *s, XLogRecPtr oldestLSN)
+{
+	int			last_signaled_pid = 0;
+	bool		released_lock = false;
+
+	while (true)
+	{
+		XLogRecPtr	restart_lsn = InvalidXLogRecPtr;
+		bool		slot_conflicts;
+		NameData	slotname;
+		int			active_pid = 0;
+
+		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
+
+		CHECK_FOR_INTERRUPTS();
+
+		slot_conflicts = false;
+
+		/*
+		 * The slot may have been dropped while we waited below without
+		 * holding ReplicationSlotControlLock. There is nothing left to
+		 * invalidate then, so stop rather than loop forever. Don't return
+		 * with the lock held if we report it as released.
+		 */
+		if (!s->in_use)
+		{
+			if (released_lock)
+				LWLockRelease(ReplicationSlotControlLock);
+			break;
+		}
+
+		/*
+		 * Check if the slot needs to be invalidated. If it needs to be
+		 * invalidated, and is not currently acquired, acquire it and mark it
+		 * as having been invalidated. We do all of this with the spinlock
+		 * held - otherwise there would be race conditions (e.g. the slot's
+		 * restart_lsn moving ahead, the slot concurrently being dropped after
+		 * we release ReplicationSlotControlLock, ...).
+		 */
+		SpinLockAcquire(&s->mutex);
+
+		restart_lsn = s->data.restart_lsn;
+
+		/* check if slot needs to be invalidated */
+		if (!XLogRecPtrIsInvalid(restart_lsn) && restart_lsn < oldestLSN)
+		{
+			slot_conflicts = true;
+			slotname = s->data.name;
+			active_pid = s->active_pid;
+
+			/* check if we can acquire it */
+			if (active_pid == 0)
+			{
+				MyReplicationSlot = s;
+				s->active_pid = MyProcPid;
+				s->data.invalidated_at = s->data.restart_lsn;
+				s->data.restart_lsn = InvalidXLogRecPtr;
+			}
+		}
+
+		SpinLockRelease(&s->mutex);
+
+		if (!slot_conflicts)
+		{
+			Assert(active_pid == 0);
+
+			/*
+			 * We may have re-acquired the lock after waiting for a previous
+			 * owner. In that case we report the lock as released, so make
+			 * sure it really is.
+			 */
+			if (released_lock)
+				LWLockRelease(ReplicationSlotControlLock);
+
+			break;
+		}
+		else if (active_pid != 0)
+		{
+			LWLockRelease(ReplicationSlotControlLock);
+			released_lock = true;
+
+			/*
+			 * Signal to terminate the process that owns the slot.
+			 *
+			 * There is the race condition where other process may own
+			 * the slot after the process using it was terminated and before
+			 * this process owns it. To handle this case, we signal again
+			 * if the PID of the owning process is changed than the last.
+			 *
+			 * XXX This logic assumes that the same PID is not reused
+			 * very quickly.
+			 */
+			if (last_signaled_pid != active_pid)
+			{
+				ereport(LOG,
+						(errmsg("terminating process %d because replication slot \"%s\" is too far behind",
+								active_pid, NameStr(slotname))));
+
+				(void) kill(active_pid, SIGTERM);
+				last_signaled_pid = active_pid;
+			}
+
+			/*
+			 * Wait until the slot is released.
+			 *
+			 * Will immediately return in the first iteration, so we can
+			 * recheck the condition before sleeping. That addresses the
+			 * otherwise possible race of the slot already having been
+			 * released.
+			 */
+			ConditionVariableTimedSleep(&s->active_cv, 10,
+										WAIT_EVENT_REPLICATION_SLOT_DROP);
+
+			/* re-acquire for next loop iteration */
+			LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+		}
+		else
+		{
+			/*
+			 * Don't want to hold ReplicationSlotControlLock across file
+			 * system operations. Now that we (temporarily) acquired the slot,
+			 * that's safe, as long as we afterwards restart the scan from
+			 * scratch.
+			 */
+			LWLockRelease(ReplicationSlotControlLock);
+			released_lock = true;
+
+			/* Make sure the invalidated state persists across server restart */
+			ReplicationSlotMarkDirty();
+			ReplicationSlotSave();
+			ReplicationSlotRelease();
+
+			ereport(LOG,
+					(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
+							NameStr(slotname),
+							LSN_FORMAT_ARGS(restart_lsn))));
+
+			break;
+		}
+
+	}
+
+	/* Stop waiting on the slot's condition variable, if we ever started to */
+	ConditionVariableCancelSleep();
+
+	Assert(!released_lock == LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
+
+	return released_lock;
+}
+
 /*
  * Mark any slot that points to an LSN older than the given segment
  * as invalid; it requires WAL that's about to be removed.
@@ -1171,99 +1329,17 @@ restart:
 	for (int i = 0; i < max_replication_slots; i++)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
-		XLogRecPtr	restart_lsn = InvalidXLogRecPtr;
-		NameData	slotname;
-		int			wspid;
-		int			last_signaled_pid = 0;
+
+		CHECK_FOR_INTERRUPTS();
 
 		if (!s->in_use)
 			continue;
 
-		SpinLockAcquire(&s->mutex);
-		slotname = s->data.name;
-		restart_lsn = s->data.restart_lsn;
-		SpinLockRelease(&s->mutex);
-
-		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
-			continue;
-		LWLockRelease(ReplicationSlotControlLock);
-		CHECK_FOR_INTERRUPTS();
-
-		/* Get ready to sleep on the slot in case it is active */
-		ConditionVariablePrepareToSleep(&s->active_cv);
-
-		for (;;)
+		if (InvalidateObsoleteReplicationSlot(s, oldestLSN))
 		{
-			/*
-			 * Try to mark this slot as used by this process.
-			 *
-			 * Note that ReplicationSlotAcquireInternal(SAB_Inquire)
-			 * should not cancel the prepared condition variable
-			 * if this slot is active in other process. Because in this case
-			 * we have to wait on that CV for the process owning
-			 * the slot to be terminated, later.
-			 */
-			wspid = ReplicationSlotAcquireInternal(s, NULL, SAB_Inquire);
-
-			/*
-			 * Exit the loop if we successfully acquired the slot or
-			 * the slot was dropped during waiting for the owning process
-			 * to be terminated. For example, the latter case is likely to
-			 * happen when the slot is temporary because it's automatically
-			 * dropped by the termination of the owning process.
-			 */
-			if (wspid <= 0)
-				break;
-
-			/*
-			 * Signal to terminate the process that owns the slot.
-			 *
-			 * There is the race condition where other process may own
-			 * the slot after the process using it was terminated and before
-			 * this process owns it. To handle this case, we signal again
-			 * if the PID of the owning process is changed than the last.
-			 *
-			 * XXX This logic assumes that the same PID is not reused
-			 * very quickly.
-			 */
-			if (last_signaled_pid != wspid)
-			{
-				ereport(LOG,
-						(errmsg("terminating process %d because replication slot \"%s\" is too far behind",
-								wspid, NameStr(slotname))));
-				(void) kill(wspid, SIGTERM);
-				last_signaled_pid = wspid;
-			}
-
-			ConditionVariableTimedSleep(&s->active_cv, 10,
-										WAIT_EVENT_REPLICATION_SLOT_DROP);
-		}
-		ConditionVariableCancelSleep();
-
-		/*
-		 * Do nothing here and start from scratch if the slot has
-		 * already been dropped.
-		 */
-		if (wspid == -1)
+			/* if the lock was released, we need to restart from scratch */
 			goto restart;
-
-		ereport(LOG,
-				(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
-						NameStr(slotname),
-						LSN_FORMAT_ARGS(restart_lsn))));
-
-		SpinLockAcquire(&s->mutex);
-		s->data.invalidated_at = s->data.restart_lsn;
-		s->data.restart_lsn = InvalidXLogRecPtr;
-		SpinLockRelease(&s->mutex);
-
-		/* Make sure the invalidated state persists across server restart */
-		ReplicationSlotMarkDirty();
-		ReplicationSlotSave();
-		ReplicationSlotRelease();
-
-		/* if we did anything, start from scratch */
-		goto restart;
+		}
 	}
 	LWLockRelease(ReplicationSlotControlLock);
 }
-- 
2.31.0.121.g9198c13e34