From 7b5052f4b6add84e78b39ef59ac7cb26e315c238 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Fri, 31 Oct 2025 18:30:26 +0200 Subject: [PATCH 2/2] Hold SLRU bank lock across TransactionIdDidCommit in NOTIFY processing Per Tom Lane's idea --- src/backend/commands/async.c | 101 ++++++++++++++++------------------- 1 file changed, 47 insertions(+), 54 deletions(-) diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index ba06234dc8e..b5dee75af48 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -448,7 +448,6 @@ static void SignalBackends(void); static void asyncQueueReadAllNotifications(void); static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, - char *page_buffer, Snapshot snapshot); static void asyncQueueAdvanceTail(void); static void ProcessIncomingNotify(bool flush); @@ -1854,13 +1853,6 @@ asyncQueueReadAllNotifications(void) QueuePosition head; Snapshot snapshot; - /* page_buffer must be adequately aligned, so use a union */ - union - { - char buf[QUEUE_PAGESIZE]; - AsyncQueueEntry align; - } page_buffer; - /* Fetch current state */ LWLockAcquire(NotifyQueueLock, LW_SHARED); /* Assert checks that we have a valid state entry */ @@ -1932,37 +1924,6 @@ asyncQueueReadAllNotifications(void) do { - int64 curpage = QUEUE_POS_PAGE(pos); - int curoffset = QUEUE_POS_OFFSET(pos); - int slotno; - int copysize; - - /* - * We copy the data from SLRU into a local buffer, so as to avoid - * holding the SLRU lock while we are examining the entries and - * possibly transmitting them to our frontend. Copy only the part - * of the page we will actually inspect. - */ - slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage, - InvalidTransactionId); - if (curpage == QUEUE_POS_PAGE(head)) - { - /* we only want to read as far as head */ - copysize = QUEUE_POS_OFFSET(head) - curoffset; - if (copysize < 0) - copysize = 0; /* just for safety */ - } - else - { - /* fetch all the rest of the page */ - copysize = QUEUE_PAGESIZE - curoffset; - } - memcpy(page_buffer.buf + curoffset, - NotifyCtl->shared->page_buffer[slotno] + curoffset, - copysize); - /* Release lock that we got from SimpleLruReadPage_ReadOnly() */ - LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage)); - /* * Process messages up to the stop position, end of page, or an * uncommitted message. @@ -1978,9 +1939,7 @@ asyncQueueReadAllNotifications(void) * rewrite pages under us. Especially we don't want to hold a lock * while sending the notifications to the frontend. */ - reachedStop = asyncQueueProcessPageEntries(&pos, head, - page_buffer.buf, - snapshot); + reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot); } while (!reachedStop); } PG_FINALLY(); @@ -2000,12 +1959,9 @@ asyncQueueReadAllNotifications(void) * Fetch notifications from the shared queue, beginning at position current, * and deliver relevant ones to my frontend. * - * The current page must have been fetched into page_buffer from shared - * memory. (We could access the page right in shared memory, but that - * would imply holding the SLRU bank lock throughout this routine.) - * - * We stop if we reach the "stop" position, or reach a notification from an - * uncommitted transaction, or reach the end of the page. + * This function processes the notifications on one page, the page that + * 'current' points to. We stop if we reach the "stop" position, or reach a + * notification from an uncommitted transaction, or reach the end of the page. * * The function returns true once we have reached the stop position or an * uncommitted notification, and false if we have finished with the page. @@ -2015,16 +1971,35 @@ asyncQueueReadAllNotifications(void) static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, - char *page_buffer, Snapshot snapshot) { + int64 curpage = QUEUE_POS_PAGE(*current); + int slotno; + char *page_buffer; bool reachedStop = false; bool reachedEndOfPage; - AsyncQueueEntry *qe; + + /* + * We copy the entries into a local buffer, so as to avoid holding the + * SLRU lock while we transmit them to our frontend. + * + * The local buffer must be adequately aligned, so use a union. + */ + union + { + char buf[QUEUE_PAGESIZE]; + AsyncQueueEntry align; + } scratch; + char *scratch_end = scratch.buf; + + slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage, + InvalidTransactionId); + page_buffer = NotifyCtl->shared->page_buffer[slotno]; do { QueuePosition thisentry = *current; + AsyncQueueEntry *qe; if (QUEUE_POS_EQUAL(thisentry, stop)) break; @@ -2073,10 +2048,8 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, if (IsListeningOn(channel)) { - /* payload follows channel name */ - char *payload = qe->data + strlen(channel) + 1; - - NotifyMyFrontEnd(channel, payload, qe->srcPid); + memcpy(scratch_end, qe, qe->length); + scratch_end += qe->length; } } else @@ -2091,6 +2064,26 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, /* Loop back if we're not at end of page */ } while (!reachedEndOfPage); + /* Release lock that we got from SimpleLruReadPage_ReadOnly() */ + LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage)); + + /* + * Now that we have let go of the SLRU bank lock, send the notifications + * to our backend + */ + Assert(scratch_end - scratch.buf <= BLCKSZ); + for (char *p = scratch.buf; p < scratch_end;) + { + AsyncQueueEntry *qe = (AsyncQueueEntry *) p; + char *channel = qe->data; + /* payload follows channel name */ + char *payload = qe->data + strlen(channel) + 1; + + NotifyMyFrontEnd(channel, payload, qe->srcPid); + + p += qe->length; + } + if (QUEUE_POS_EQUAL(*current, stop)) reachedStop = true; -- 2.47.3