From 539d97b47c4535314c23df22e5e87ecc43149f3a Mon Sep 17 00:00:00 2001 From: Martijn van Oosterhout Date: Sat, 14 Sep 2019 11:01:11 +0200 Subject: [PATCH 1/2] Improve performance of async notifications Advancing the tail pointer requires an exclusive lock which can block backends from other databases, so it's worth keeping these attempts to a minimum. Instead of tracking the slowest backend exactly we update the queue more lazily, only checking when we switch to a new SLRU page. Additionally, instead of waking up every slow backend at once, we do them one at a time. --- src/backend/commands/async.c | 167 ++++++++++++++++++++++++++--------- 1 file changed, 124 insertions(+), 43 deletions(-) diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index f26269b5ea..ffd7c7e90b 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -73,10 +73,11 @@ * Finally, after we are out of the transaction altogether, we check if * we need to signal listening backends. In SignalBackends() we scan the * list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal - * to every listening backend (we don't know which backend is listening on - * which channel so we must signal them all). We can exclude backends that - * are already up to date, though. We don't bother with a self-signal - * either, but just process the queue directly. + * to every listening backend for the relavent database (we don't know + * which backend is listening on which channel so we must signal them + * all). We can exclude backends that are already up to date, though. + * We don't bother with a self-signal either, but just process the queue + * directly. * * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler * sets the process's latch, which triggers the event to be processed @@ -89,13 +90,25 @@ * Inbound-notify processing consists of reading all of the notifications * that have arrived since scanning last time. We read every notification * until we reach either a notification from an uncommitted transaction or - * the head pointer's position. Then we check if we were the laziest - * backend: if our pointer is set to the same position as the global tail - * pointer is set, then we move the global tail pointer ahead to where the - * second-laziest backend is (in general, we take the MIN of the current - * head position and all active backends' new tail pointers). Whenever we - * move the global tail pointer we also truncate now-unused pages (i.e., - * delete files in pg_notify/ that are no longer used). + * the head pointer's position. + * + * 6. To avoid SLRU wraparound and minimize disk space the tail pointer + * needs to be advanced so that old pages can be truncated. This + * however requires an exclusive lock and as such should be done + * infrequently. + * + * When a new notification is added, the writer checks to see if the + * tail pointer is more than QUEUE_CLEANUP_DELAY pages behind. If + * so, it attempts to advance the tail, and if there are slow + * backends (perhaps because all the notifications were for other + * databases), wake the slowest by sending it a signal. + * + * When the slow backend processes the queue it notes it was behind + * and so also tries to advance the tail, possibly waking up another + * slow backend. Eventually all backends will have processed the + * queue and the global tail pointer is move to a new page and we + * also truncate now-unused pages (i.e., delete files in pg_notify/ + * that are no longer used). * * An application that listens on the same channel it notifies will get * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful, @@ -199,6 +212,12 @@ typedef struct QueuePosition #define QUEUE_POS_EQUAL(x,y) \ ((x).page == (y).page && (x).offset == (y).offset) +/* compare QueuePositions */ +#define QUEUE_POS_LT(x,y) \ + (asyncQueuePagePrecedes((x).page, (y).page) ? (1) : \ + (x).page != (y).page ? (0) : \ + (x).offset < (y).offset ? (1) : (0)) + /* choose logically smaller QueuePosition */ #define QUEUE_POS_MIN(x,y) \ (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \ @@ -211,6 +230,12 @@ typedef struct QueuePosition (x).page != (y).page ? (x) : \ (x).offset > (y).offset ? (x) : (y)) +/* how many pages does a backend need to be behind before it needs to be signalled */ +#define QUEUE_CLEANUP_DELAY 4 + +/* is a backend so far behind it needs to be signalled? */ +#define QUEUE_SLOW_BACKEND(i) \ + (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD), QUEUE_POS_PAGE(QUEUE_BACKEND_POS(i))) > QUEUE_CLEANUP_DELAY) /* * Struct describing a listening backend's status */ @@ -252,7 +277,7 @@ typedef struct QueueBackendStatus typedef struct AsyncQueueControl { QueuePosition head; /* head points to the next free location */ - QueuePosition tail; /* the global tail is equivalent to the pos of + QueuePosition tail; /* the global tail is some place older than the * the "slowest" backend */ BackendId firstListener; /* id of first listener, or InvalidBackendId */ TimestampTz lastQueueFillWarn; /* time of last queue-full msg */ @@ -402,10 +427,15 @@ static bool amRegisteredListener = false; /* has this backend sent notifications in the current transaction? */ static bool backendHasSentNotifications = false; +/* has this backend switched to new page, and so should attempt to advance + * the queue tail? */ +static bool backendTryAdvanceTail = false; + /* GUC parameter */ bool Trace_notify = false; /* local function prototypes */ +static int asyncQueuePageDiff(int p, int q); static bool asyncQueuePagePrecedes(int p, int q); static void queue_listen(ListenActionKind action, const char *channel); static void Async_UnlistenOnExit(int code, Datum arg); @@ -421,7 +451,7 @@ static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe); static ListCell *asyncQueueAddEntries(ListCell *nextNotify); static double asyncQueueUsage(void); static void asyncQueueFillWarning(void); -static bool SignalBackends(void); +static bool SignalMyDBBackends(void); static void asyncQueueReadAllNotifications(void); static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, @@ -438,8 +468,8 @@ static void ClearPendingActionsAndNotifies(void); /* * We will work on the page range of 0..QUEUE_MAX_PAGE. */ -static bool -asyncQueuePagePrecedes(int p, int q) +static int +asyncQueuePageDiff(int p, int q) { int diff; @@ -455,7 +485,13 @@ asyncQueuePagePrecedes(int p, int q) diff -= QUEUE_MAX_PAGE + 1; else if (diff < -((QUEUE_MAX_PAGE + 1) / 2)) diff += QUEUE_MAX_PAGE + 1; - return diff < 0; + return diff; +} + +static bool +asyncQueuePagePrecedes(int p, int q) +{ + return asyncQueuePageDiff(p, q) < 0; } /* @@ -905,6 +941,12 @@ PreCommit_Notify(void) (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg("too many notifications in the NOTIFY queue"))); nextNotify = asyncQueueAddEntries(nextNotify); + + /* If we are advancing to a new page, remember this so after the + * transaction commits we can attempt to advance the tail + * pointer, see ProcessCompletedNotifies() */ + if (QUEUE_POS_OFFSET(QUEUE_HEAD) == 0) + backendTryAdvanceTail = true; LWLockRelease(AsyncQueueLock); } } @@ -1051,8 +1093,6 @@ Exec_ListenPreCommit(void) * notification to the frontend. Also, although our transaction might * have executed NOTIFY, those message(s) aren't queued yet so we can't * see them in the queue. - * - * This will also advance the global tail pointer if possible. */ if (!QUEUE_POS_EQUAL(max, head)) asyncQueueReadAllNotifications(); @@ -1185,7 +1225,7 @@ ProcessCompletedNotifies(void) StartTransactionCommand(); /* Send signals to other backends */ - signalled = SignalBackends(); + signalled = SignalMyDBBackends(); if (listenChannels != NIL) { @@ -1203,6 +1243,16 @@ ProcessCompletedNotifies(void) * harmless.) */ asyncQueueAdvanceTail(); + backendTryAdvanceTail = false; + } + + if (backendTryAdvanceTail) + { + /* We switched to a new page while writing our notifies to the + * queue, so we try to advance the tail ourselves, possibly waking + * up another backend if it is running behind */ + backendTryAdvanceTail = false; + asyncQueueAdvanceTail(); } CommitTransactionCommand(); @@ -1242,8 +1292,6 @@ IsListeningOn(const char *channel) static void asyncQueueUnregister(void) { - bool advanceTail; - Assert(listenChannels == NIL); /* else caller error */ if (!amRegisteredListener) /* nothing to do */ @@ -1253,10 +1301,7 @@ asyncQueueUnregister(void) * Need exclusive lock here to manipulate list links. */ LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE); - /* check if entry is valid and oldest ... */ - advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) && - QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL); - /* ... then mark it invalid */ + /* Mark our entry as invalid */ QUEUE_BACKEND_PID(MyBackendId) = InvalidPid; QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid; /* and remove it from the list */ @@ -1279,9 +1324,9 @@ asyncQueueUnregister(void) /* mark ourselves as no longer listed in the global array */ amRegisteredListener = false; - /* If we were the laziest backend, try to advance the tail pointer */ - if (advanceTail) - asyncQueueAdvanceTail(); + /* Don't try to advance the tail. We're possibly not in a + transaction to handle errors, and it'll get cleaned up later + anyway. */ } /* @@ -1570,7 +1615,7 @@ asyncQueueFillWarning(void) } /* - * Send signals to all listening backends (except our own). + * Send signals to all listening backends (except our own) for our database. * * Returns true if we sent at least one signal. * @@ -1583,7 +1628,7 @@ asyncQueueFillWarning(void) * Since we know the BackendId and the Pid the signalling is quite cheap. */ static bool -SignalBackends(void) +SignalMyDBBackends(void) { bool signalled = false; int32 *pids; @@ -1592,9 +1637,9 @@ SignalBackends(void) int32 pid; /* - * Identify all backends that are listening and not already up-to-date. We - * don't want to send signals while holding the AsyncQueueLock, so we just - * build a list of target PIDs. + * Identify all backends with MyDatabaseId that are listening and not + * already up-to-date. We don't want to send signals while holding the + * AsyncQueueLock, so we just build a list of target PIDs. * * XXX in principle these pallocs could fail, which would be bad. Maybe * preallocate the arrays? But in practice this is only run in trivial @@ -1609,7 +1654,7 @@ SignalBackends(void) { pid = QUEUE_BACKEND_PID(i); Assert(pid != InvalidPid); - if (pid != MyProcPid) + if (pid != MyProcPid && QUEUE_BACKEND_DBOID(i) == MyDatabaseId) { QueuePosition pos = QUEUE_BACKEND_POS(i); @@ -1859,6 +1904,9 @@ asyncQueueReadAllNotifications(void) Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId)); pos = oldpos = QUEUE_BACKEND_POS(MyBackendId); head = QUEUE_HEAD; + /* If we're behind, we possibly got signalled to catchup. Remember + * this so we attempt to advance the tail later */ + advanceTail = QUEUE_SLOW_BACKEND(MyBackendId); LWLockRelease(AsyncQueueLock); if (QUEUE_POS_EQUAL(pos, head)) @@ -1966,12 +2014,9 @@ asyncQueueReadAllNotifications(void) /* Update shared state */ LWLockAcquire(AsyncQueueLock, LW_SHARED); QUEUE_BACKEND_POS(MyBackendId) = pos; - advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL); LWLockRelease(AsyncQueueLock); - /* If we were the laziest backend, try to advance the tail pointer */ - if (advanceTail) - asyncQueueAdvanceTail(); + /* We don't try to advance the tail here. */ PG_RE_THROW(); } @@ -1980,10 +2025,10 @@ asyncQueueReadAllNotifications(void) /* Update shared state */ LWLockAcquire(AsyncQueueLock, LW_SHARED); QUEUE_BACKEND_POS(MyBackendId) = pos; - advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL); LWLockRelease(AsyncQueueLock); - /* If we were the laziest backend, try to advance the tail pointer */ + /* We were behind, so try to advance the tail pointer, possibly + * signalling another backend if necessary */ if (advanceTail) asyncQueueAdvanceTail(); @@ -2093,8 +2138,8 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, } /* - * Advance the shared queue tail variable to the minimum of all the - * per-backend tail pointers. Truncate pg_notify space if possible. + * Advance the shared queue tail variable if possible. If a slow backend is + * holding everything up, signal it. Truncate pg_notify space if possible. */ static void asyncQueueAdvanceTail(void) @@ -2103,18 +2148,54 @@ asyncQueueAdvanceTail(void) int oldtailpage; int newtailpage; int boundary; + int slowestbackendid = InvalidBackendId; + int slowestbackendpid; + /* Advance the tail as far as possible, noting if there is a slow + * backend we could kick */ LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE); min = QUEUE_HEAD; for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i)) { Assert(QUEUE_BACKEND_PID(i) != InvalidPid); - min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i)); + if (QUEUE_POS_LT(QUEUE_BACKEND_POS(i), min)) + { + /* this finds the tail of the queue and remembers who */ + min = QUEUE_BACKEND_POS(i); + slowestbackendid = i; + } } oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL); QUEUE_TAIL = min; + /* if the we weren't the slowest, get the pid so we can kick it */ + if (slowestbackendid != InvalidBackendId) + { + if (QUEUE_SLOW_BACKEND(slowestbackendid) && + QUEUE_BACKEND_PID(slowestbackendid) != MyProcPid) + { + slowestbackendpid = QUEUE_BACKEND_PID(slowestbackendid); + } + else + { + slowestbackendid = InvalidBackendId; + } + } LWLockRelease(AsyncQueueLock); + /* Wake up the backend furthest behind, if it is considered "slow". + * It should in turn call this function to signal the next, see + * asyncQueueReadAllNotifications() */ + if (slowestbackendid != InvalidBackendId) { + + /* Note: assuming things aren't broken, a signal failure here could + * only occur if the target backend exited since we released + * AsyncQueueLock; which is unlikely but certainly possible. So we + * just log a low-level debug message if it happens. + */ + if (SendProcSignal(slowestbackendpid, PROCSIG_NOTIFY_INTERRUPT, slowestbackendid) < 0) + elog(DEBUG3, "could not signal backend with PID %d: %m", slowestbackendpid); + } + /* * We can truncate something if the global tail advanced across an SLRU * segment boundary. -- 2.17.1