diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 7c9d7831c9f..c8c0ab66b66 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -2188,6 +2188,164 @@ GetOldestQueuedNotifyXid(void) return oldestXid; } +/* + * Check if there are any active listeners in the notification queue. + * + * Returns true if at least one backend is registered as a listener, + * false otherwise. + */ +bool +asyncQueueHasListeners(void) +{ + bool hasListeners; + + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + hasListeners = (QUEUE_FIRST_LISTENER != INVALID_PROC_NUMBER); + LWLockRelease(NotifyQueueLock); + + return hasListeners; +} + +/* + * Wake all listening backends to process notifications. + * + * This is called by VACUUM when it needs to advance datfrozenxid but the + * notification queue has old XIDs. We signal all listeners across all + * databases that aren't already caught up, so they can process their + * pending notifications and advance the queue tail. + */ +void +asyncQueueWakeAllListeners(void) +{ + int32 *pids; + ProcNumber *procnos; + int count; + + /* + * Identify backends that we need to signal. We don't want to send + * signals while holding the NotifyQueueLock, so this loop just builds a + * list of target PIDs. + * + * XXX in principle these pallocs could fail, which would be bad. Maybe + * preallocate the arrays? They're not that large, though. + */ + pids = (int32 *) palloc(MaxBackends * sizeof(int32)); + procnos = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber)); + count = 0; + + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) + { + int32 pid = QUEUE_BACKEND_PID(i); + QueuePosition pos; + + Assert(pid != InvalidPid); + pos = QUEUE_BACKEND_POS(i); + + /* + * Signal listeners unless they're already caught up. + */ + if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD)) + continue; + + /* OK, need to signal this one */ + pids[count] = pid; + procnos[count] = i; + count++; + } + LWLockRelease(NotifyQueueLock); + + /* Now send signals */ + for (int i = 0; i < count; i++) + { + int32 pid = pids[i]; + + /* + * If we are signaling our own process, no need to involve the kernel; + * just set the flag directly. + */ + if (pid == MyProcPid) + { + notifyInterruptPending = true; + continue; + } + + /* + * Note: assuming things aren't broken, a signal failure here could + * only occur if the target backend exited since we released + * NotifyQueueLock; which is unlikely but certainly possible. So we + * just log a low-level debug message if it happens. + */ + if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0) + elog(DEBUG3, "could not signal backend with PID %d: %m", pid); + } + + pfree(pids); + pfree(procnos); +} + +/* + * Discard all notifications in the queue when there are no listeners. + * + * This is called by VACUUM when the notification queue has old XIDs but no + * active listeners exist. We advance the tail to the head, effectively + * discarding all queued notifications, and truncate the SLRU segments. + */ +void +asyncQueueAdvanceTailNoListeners(void) +{ + QueuePosition min; + int64 oldtailpage; + int64 newtailpage; + int64 boundary; + + /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */ + LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE); + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + + /* + * Verify that there are still no listeners. + */ + if (QUEUE_FIRST_LISTENER != INVALID_PROC_NUMBER) + { + LWLockRelease(NotifyQueueLock); + LWLockRelease(NotifyQueueTailLock); + return; + } + + /* + * Advance the logical tail to the head, discarding all notifications. + */ + min = QUEUE_HEAD; + QUEUE_TAIL = min; + oldtailpage = QUEUE_STOP_PAGE; + LWLockRelease(NotifyQueueLock); + + /* + * We can truncate something if the global tail advanced across an SLRU + * segment boundary. + * + * XXX it might be better to truncate only once every several segments, to + * reduce the number of directory scans. + */ + newtailpage = QUEUE_POS_PAGE(min); + boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT); + if (asyncQueuePagePrecedes(oldtailpage, boundary)) + { + /* + * SimpleLruTruncate() will ask for SLRU bank locks but will also + * release the lock again. + */ + SimpleLruTruncate(NotifyCtl, newtailpage); + + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + QUEUE_STOP_PAGE = newtailpage; + LWLockRelease(NotifyQueueLock); + } + + LWLockRelease(NotifyQueueTailLock); +} + /* * Advance the shared queue tail variable to the minimum of all the * per-backend tail pointers. Truncate pg_notify space if possible. diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index 6c601ce81aa..f93f82e9040 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -1739,11 +1739,24 @@ vac_update_datfrozenxid(void) * Also consider the oldest XID in the notification queue, since backends * will need to call TransactionIdDidCommit() on those XIDs when * processing the notifications. + * + * If the queue is blocking datfrozenxid advancement, attempt to clean it + * up. If listeners exist, wake them to process their pending + * notifications. If no listeners exist, discard all notifications. + * Either way, we back off datfrozenxid for this VACUUM cycle; the next + * VACUUM will benefit from the cleanup we've triggered. */ oldestNotifyXid = GetOldestQueuedNotifyXid(); if (TransactionIdIsValid(oldestNotifyXid) && TransactionIdPrecedes(oldestNotifyXid, newFrozenXid)) + { + if (asyncQueueHasListeners()) + asyncQueueWakeAllListeners(); + else + asyncQueueAdvanceTailNoListeners(); + newFrozenXid = oldestNotifyXid; + } Assert(TransactionIdIsNormal(newFrozenXid)); Assert(MultiXactIdIsValid(newMinMulti)); diff --git a/src/include/commands/async.h b/src/include/commands/async.h index ac323ada492..f9dccf342b5 100644 --- a/src/include/commands/async.h +++ b/src/include/commands/async.h @@ -29,6 +29,11 @@ extern void NotifyMyFrontEnd(const char *channel, /* get oldest XID in the notification queue for vacuum freeze */ extern TransactionId GetOldestQueuedNotifyXid(void); +/* functions for vacuum to manage notification queue */ +extern bool asyncQueueHasListeners(void); +extern void asyncQueueWakeAllListeners(void); +extern void asyncQueueAdvanceTailNoListeners(void); + /* notify-related SQL statements */ extern void Async_Notify(const char *channel, const char *payload); extern void Async_Listen(const char *channel);