From bc638e2220d5b82ea3b289f646617100a44adab6 Mon Sep 17 00:00:00 2001 From: rbagga Date: Thu, 28 Aug 2025 16:09:10 -0700 Subject: [PATCH] Implement WAL-based async notifications for improved throughput - Added WAL logging for async notifications to improve scalability - Implemented async resource manager for WAL-based notification handling - Added new async descriptor files for pg_waldump support - Updated makefiles and build configuration for new components --- src/backend/access/rmgrdesc/Makefile | 1 + src/backend/access/rmgrdesc/asyncdesc.c | 58 +++ src/backend/access/rmgrdesc/meson.build | 1 + src/backend/access/transam/rmgr.c | 1 + src/backend/commands/async.c | 568 ++++++++++++++++++------ src/bin/pg_rewind/parsexlog.c | 1 + src/bin/pg_waldump/asyncdesc.c | 1 + src/bin/pg_waldump/rmgrdesc.c | 1 + src/include/access/async_xlog.h | 56 +++ src/include/access/rmgrlist.h | 1 + src/include/commands/async.h | 19 + src/include/storage/proc.h | 3 + 12 files changed, 587 insertions(+), 124 deletions(-) create mode 100644 src/backend/access/rmgrdesc/asyncdesc.c create mode 120000 src/bin/pg_waldump/asyncdesc.c create mode 100644 src/include/access/async_xlog.h diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile index cd95eec37f1..6e6e75b12bd 100644 --- a/src/backend/access/rmgrdesc/Makefile +++ b/src/backend/access/rmgrdesc/Makefile @@ -9,6 +9,7 @@ top_builddir = ../../../.. 
include $(top_builddir)/src/Makefile.global
 
 OBJS = \
+	asyncdesc.o \
 	brindesc.o \
 	clogdesc.o \
 	committsdesc.o \
diff --git a/src/backend/access/rmgrdesc/asyncdesc.c b/src/backend/access/rmgrdesc/asyncdesc.c
new file mode 100644
index 00000000000..b110457431f
--- /dev/null
+++ b/src/backend/access/rmgrdesc/asyncdesc.c
@@ -0,0 +1,66 @@
+/*-------------------------------------------------------------------------
+ *
+ * asyncdesc.c
+ *	  rmgr descriptor routines for commands/async.c
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/rmgrdesc/asyncdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/async_xlog.h"
+
+/*
+ * Append a human-readable description of an async-notification WAL record
+ * to buf.  Record types other than NOTIFY_DATA and NOTIFY_COMMIT add nothing.
+ */
+void
+async_desc(StringInfo buf, XLogReaderState *record)
+{
+	char	   *rec = XLogRecGetData(record);
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (info == XLOG_ASYNC_NOTIFY_DATA)
+	{
+		xl_async_notify_data *xlrec = (xl_async_notify_data *) rec;
+
+		appendStringInfo(buf, "notify data: db %u xid %u pid %d notifications %u",
+						 xlrec->dbid, xlrec->xid, xlrec->srcPid, xlrec->nnotifications);
+	}
+	else if (info == XLOG_ASYNC_NOTIFY_COMMIT)
+	{
+		xl_async_notify_commit *xlrec = (xl_async_notify_commit *) rec;
+
+		appendStringInfo(buf, "notify commit: db %u xid %u notify_lsn %X/%X",
+						 xlrec->dbid, xlrec->xid,
+						 LSN_FORMAT_ARGS(xlrec->notify_lsn));
+	}
+}
+
+/*
+ * Return the symbolic name of an async-notification record type, or NULL
+ * when the info bits match neither known type.
+ */
+const char *
+async_identify(uint8 info)
+{
+	const char *id = NULL;
+
+	switch (info & ~XLR_INFO_MASK)
+	{
+		case XLOG_ASYNC_NOTIFY_DATA:
+			id = "NOTIFY_DATA";
+			break;
+		case XLOG_ASYNC_NOTIFY_COMMIT:
+			id = "NOTIFY_COMMIT";
+			break;
+	}
+
+	return id;
+}
diff --git a/src/backend/access/rmgrdesc/meson.build b/src/backend/access/rmgrdesc/meson.build
index 96c98e800c2..38bef2e87f6 100644
--- a/src/backend/access/rmgrdesc/meson.build
+++ b/src/backend/access/rmgrdesc/meson.build @@ -2,6 +2,7 @@ # used by frontend programs like pg_waldump rmgr_desc_sources = files( + 'asyncdesc.c', 'brindesc.c', 'clogdesc.c', 'committsdesc.c', diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 1b7499726eb..f8c25e6597a 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -19,6 +19,7 @@ /* includes needed for "access/rmgrlist.h" */ /* IWYU pragma: begin_keep */ +#include "access/async_xlog.h" #include "access/brin_xlog.h" #include "access/clog.h" #include "access/commit_ts.h" diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 4bd37d5beb5..8520dbe8920 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -129,10 +129,16 @@ #include #include +#include "access/async_xlog.h" #include "access/parallel.h" #include "access/slru.h" #include "access/transam.h" #include "access/xact.h" +#include "access/xlog.h" +#include "access/xloginsert.h" +#include "access/xlogreader.h" +#include "access/xlogutils.h" +#include "access/xlogrecovery.h" #include "catalog/pg_database.h" #include "commands/async.h" #include "common/hashfn.h" @@ -151,6 +157,16 @@ #include "utils/snapmgr.h" #include "utils/timestamp.h" +/* Missing definitions for WAL-based notification system */ +#define AsyncQueueEntryEmptySize ASYNC_QUEUE_ENTRY_SIZE +#define SLRU_PAGE_SIZE BLCKSZ +#define AsyncCtl NotifyCtl + +/* WAL record types */ +#define XLOG_ASYNC_NOTIFY_DATA 0x00 +#define XLOG_ASYNC_NOTIFY_COMMIT 0x10 + + /* * Maximum size of a NOTIFY payload, including terminating NULL. This @@ -163,30 +179,13 @@ #define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128) /* - * Struct representing an entry in the global notify queue - * - * This struct declaration has the maximal length, but in a real queue entry - * the data area is only big enough for the actual channel and payload strings - * (each null-terminated). 
AsyncQueueEntryEmptySize is the minimum possible - * entry size, if both channel and payload strings are empty (but note it - * doesn't include alignment padding). - * - * The "length" field should always be rounded up to the next QUEUEALIGN - * multiple so that all fields are properly aligned. + * NOTE: The AsyncQueueEntry structure is now defined in commands/async.h + * as a compact metadata-only structure for the new WAL-based notification system. + * The old variable-length structure with full notification content is no longer used. */ -typedef struct AsyncQueueEntry -{ - int length; /* total allocated length of entry */ - Oid dboid; /* sender's database OID */ - TransactionId xid; /* sender's XID */ - int32 srcPid; /* sender's PID */ - char data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH]; -} AsyncQueueEntry; - -/* Currently, no field of AsyncQueueEntry requires more than int alignment */ -#define QUEUEALIGN(len) INTALIGN(len) -#define AsyncQueueEntryEmptySize (offsetof(AsyncQueueEntry, data) + 2) +/* Queue alignment is still needed for SLRU page management */ +#define QUEUEALIGN(len) INTALIGN(len) /* * Struct describing a queue position, and assorted macros for working with it @@ -440,8 +439,6 @@ static bool IsListeningOn(const char *channel); static void asyncQueueUnregister(void); static bool asyncQueueIsFull(void); static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength); -static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe); -static ListCell *asyncQueueAddEntries(ListCell *nextNotify); static double asyncQueueUsage(void); static void asyncQueueFillWarning(void); static void SignalBackends(void); @@ -457,6 +454,8 @@ static void AddEventToPendingNotifies(Notification *n); static uint32 notification_hash(const void *key, Size keysize); static int notification_match(const void *key1, const void *key2, Size keysize); static void ClearPendingActionsAndNotifies(void); +static void asyncQueueAddCompactEntry(Oid 
dbid, TransactionId xid, XLogRecPtr notify_lsn); +static void processNotificationFromWAL(XLogRecPtr notify_lsn); /* * Compute the difference between two queue page numbers. @@ -890,65 +889,81 @@ PreCommit_Notify(void) } } - /* Queue any pending notifies (must happen after the above) */ + /* Write notification data to WAL if we have any */ if (pendingNotifies) { - ListCell *nextNotify; + TransactionId currentXid; + ListCell *l; + size_t total_size = 0; + uint32 nnotifications = 0; + char *notifications_data; + char *ptr; + XLogRecPtr notify_lsn; /* * Make sure that we have an XID assigned to the current transaction. * GetCurrentTransactionId is cheap if we already have an XID, but not - * so cheap if we don't, and we'd prefer not to do that work while - * holding NotifyQueueLock. + * so cheap if we don't. */ - (void) GetCurrentTransactionId(); + currentXid = GetCurrentTransactionId(); /* - * Serialize writers by acquiring a special lock that we hold till - * after commit. This ensures that queue entries appear in commit - * order, and in particular that there are never uncommitted queue - * entries ahead of committed ones, so an uncommitted transaction - * can't block delivery of deliverable notifications. - * - * We use a heavyweight lock so that it'll automatically be released - * after either commit or abort. This also allows deadlocks to be - * detected, though really a deadlock shouldn't be possible here. - * - * The lock is on "database 0", which is pretty ugly but it doesn't - * seem worth inventing a special locktag category just for this. - * (Historical note: before PG 9.0, a similar lock on "database 0" was - * used by the flatfiles mechanism.) + * Step 1: Write notification data to WAL. + * This can be done in parallel with other transactions since we're + * not holding any global locks yet. 
*/ - LockSharedObject(DatabaseRelationId, InvalidOid, 0, - AccessExclusiveLock); + + /* First pass: calculate total size needed for serialization */ + foreach(l, pendingNotifies->events) + { + Notification *n = (Notification *) lfirst(l); + + /* Size: 2 bytes for channel_len + 2 bytes for payload_len + strings */ + total_size += 4 + n->channel_len + 1 + n->payload_len + 1; + nnotifications++; + } + + /* Allocate buffer for notification data */ + notifications_data = palloc(total_size); + ptr = notifications_data; - /* Now push the notifications into the queue */ - nextNotify = list_head(pendingNotifies->events); - while (nextNotify != NULL) + /* Second pass: serialize all notifications */ + foreach(l, pendingNotifies->events) { - /* - * Add the pending notifications to the queue. We acquire and - * release NotifyQueueLock once per page, which might be overkill - * but it does allow readers to get in while we're doing this. - * - * A full queue is very uncommon and should really not happen, - * given that we have so much space available in the SLRU pages. - * Nevertheless we need to deal with this possibility. Note that - * when we get here we are in the process of committing our - * transaction, but we have not yet committed to clog, so at this - * point in time we can still roll the transaction back. 
- */ - LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); - asyncQueueFillWarning(); - if (asyncQueueIsFull()) - ereport(ERROR, - (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), - errmsg("too many notifications in the NOTIFY queue"))); - nextNotify = asyncQueueAddEntries(nextNotify); - LWLockRelease(NotifyQueueLock); + Notification *n = (Notification *) lfirst(l); + char *channel = n->data; + char *payload = n->data + n->channel_len + 1; + + /* Write channel length, payload length, channel, and payload */ + memcpy(ptr, &n->channel_len, 2); + ptr += 2; + memcpy(ptr, &n->payload_len, 2); + ptr += 2; + memcpy(ptr, channel, n->channel_len + 1); + ptr += n->channel_len + 1; + memcpy(ptr, payload, n->payload_len + 1); + ptr += n->payload_len + 1; } - /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */ + /* Write notification data to WAL */ + notify_lsn = LogAsyncNotifyData(MyDatabaseId, currentXid, MyProcPid, + nnotifications, total_size, + notifications_data); + + pfree(notifications_data); + + /* + * Step 2: Record the notification LSN in transaction state. + * This will be included in the commit record later. + */ + MyProc->notifyCommitLsn = notify_lsn; + + /* + * Note: We don't add to the traditional SLRU queue here anymore. + * Instead, AtCommit_Notify will add a compact entry to the queue + * pointing to the WAL data after the transaction commits. + * We also don't clear pendingNotifies here; AtCommit_Notify will. + */ } } @@ -1006,13 +1021,34 @@ AtCommit_Notify(void) asyncQueueUnregister(); /* - * Send signals to listening backends. We need do this only if there are - * pending notifies, which were previously added to the shared queue by - * PreCommit_Notify(). + * Step 3: If we have notifications, add compact metadata to SLRU queue + * and signal listeners. This happens after transaction commit so the + * notification LSN in our commit record is now durable. 
*/ - if (pendingNotifies != NULL) + if (pendingNotifies != NULL && !XLogRecPtrIsInvalid(MyProc->notifyCommitLsn)) + { + /* + * Write commit record with reference to notification data. + * This establishes the connection between commit and notifications. + */ + LogAsyncNotifyCommit(MyDatabaseId, GetCurrentTransactionId(), MyProc->notifyCommitLsn); + + /* + * Add compact entry to SLRU queue pointing to WAL data. + * This is much faster than the old approach since we're only + * writing metadata, not the full notification content. + */ + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + asyncQueueAddCompactEntry(MyDatabaseId, GetCurrentTransactionId(), MyProc->notifyCommitLsn); + LWLockRelease(NotifyQueueLock); + + /* Signal listening backends to check the queue */ SignalBackends(); + /* Clear the notification LSN now that we're done with it */ + MyProc->notifyCommitLsn = InvalidXLogRecPtr; + } + /* * If it's time to try to advance the global tail pointer, do that. * @@ -1319,21 +1355,11 @@ asyncQueueAdvance(volatile QueuePosition *position, int entryLength) static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe) { - size_t channellen = n->channel_len; - size_t payloadlen = n->payload_len; - int entryLength; - - Assert(channellen < NAMEDATALEN); - Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH); - - /* The terminators are already included in AsyncQueueEntryEmptySize */ - entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen; - entryLength = QUEUEALIGN(entryLength); - qe->length = entryLength; - qe->dboid = MyDatabaseId; + /* For the new WAL-based system, we create a compact entry with metadata only */ + qe->dbid = MyDatabaseId; qe->xid = GetCurrentTransactionId(); - qe->srcPid = MyProcPid; - memcpy(qe->data, n->data, channellen + payloadlen + 2); + /* notify_lsn will be set later when we write to WAL */ + qe->notify_lsn = InvalidXLogRecPtr; } /* @@ -1405,7 +1431,7 @@ asyncQueueAddEntries(ListCell *nextNotify) offset = 
QUEUE_POS_OFFSET(queue_head); /* Check whether the entry really fits on the current page */ - if (offset + qe.length <= QUEUE_PAGESIZE) + if (offset + ASYNC_QUEUE_ENTRY_SIZE <= QUEUE_PAGESIZE) { /* OK, so advance nextNotify past this item */ nextNotify = lnext(pendingNotifies->events, nextNotify); @@ -1414,22 +1440,21 @@ asyncQueueAddEntries(ListCell *nextNotify) { /* * Write a dummy entry to fill up the page. Actually readers will - * only check dboid and since it won't match any reader's database + * only check dbid and since it won't match any reader's database * OID, they will ignore this entry and move on. */ - qe.length = QUEUE_PAGESIZE - offset; - qe.dboid = InvalidOid; - qe.data[0] = '\0'; /* empty channel */ - qe.data[1] = '\0'; /* empty payload */ + qe.dbid = InvalidOid; + qe.xid = InvalidTransactionId; + qe.notify_lsn = InvalidXLogRecPtr; } /* Now copy qe into the shared buffer page */ memcpy(NotifyCtl->shared->page_buffer[slotno] + offset, &qe, - qe.length); + ASYNC_QUEUE_ENTRY_SIZE); /* Advance queue_head appropriately, and detect if page is full */ - if (asyncQueueAdvance(&(queue_head), qe.length)) + if (asyncQueueAdvance(&(queue_head), ASYNC_QUEUE_ENTRY_SIZE)) { LWLock *lock; @@ -2032,14 +2057,13 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry)); /* - * Advance *current over this message, possibly to the next page. As - * noted in the comments for asyncQueueReadAllNotifications, we must - * do this before possibly failing while processing the message. + * Advance *current over this compact entry. The new compact entries are + * fixed-size, making this much simpler than the old variable-length entries. 
*/ - reachedEndOfPage = asyncQueueAdvance(current, qe->length); + reachedEndOfPage = asyncQueueAdvance(current, sizeof(AsyncQueueEntry)); /* Ignore messages destined for other databases */ - if (qe->dboid == MyDatabaseId) + if (qe->dbid == MyDatabaseId) { if (XidInMVCCSnapshot(qe->xid, snapshot)) { @@ -2047,20 +2071,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, * The source transaction is still in progress, so we can't * process this message yet. Break out of the loop, but first * back up *current so we will reprocess the message next - * time. (Note: it is unlikely but not impossible for - * TransactionIdDidCommit to fail, so we can't really avoid - * this advance-then-back-up behavior when dealing with an - * uncommitted message.) - * - * Note that we must test XidInMVCCSnapshot before we test - * TransactionIdDidCommit, else we might return a message from - * a transaction that is not yet visible to snapshots; compare - * the comments at the head of heapam_visibility.c. - * - * Also, while our own xact won't be listed in the snapshot, - * we need not check for TransactionIdIsCurrentTransactionId - * because our transaction cannot (yet) have queued any - * messages. + * time. */ *current = thisentry; reachedStop = true; @@ -2068,16 +2079,12 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, } else if (TransactionIdDidCommit(qe->xid)) { - /* qe->data is the null-terminated channel name */ - char *channel = qe->data; - - if (IsListeningOn(channel)) - { - /* payload follows channel name */ - char *payload = qe->data + strlen(channel) + 1; - - NotifyMyFrontEnd(channel, payload, qe->srcPid); - } + /* + * Step 5: Read notification data from WAL using stored LSN. + * The compact entry only contains metadata; actual notification + * content is retrieved from WAL on demand. 
+				 */
+				processNotificationFromWAL(qe->notify_lsn);
 			}
 			else
 			{
@@ -2097,6 +2104,245 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 	return reachedStop;
 }
 
+/*
+ * processNotificationFromWAL
+ *
+ * Read the NOTIFY_DATA WAL record that a queue entry points at and pass
+ * every notification in it to our frontend, provided this backend is
+ * listening on the notification's channel.
+ *
+ * notify_lsn is the WAL location stored in the queue entry.
+ *
+ * Raises ERROR if the record cannot be located or read, if it is not a
+ * NOTIFY_DATA record, or if it is too short to hold the fixed header.  If
+ * the serialized notification list turns out to be cut short or not
+ * NUL-terminated where expected, a WARNING is raised and the rest of the
+ * list is skipped.
+ *
+ * NOTE(review): the lookup below lands on the intended record only when
+ * notify_lsn is that record's start.  XLogInsert() returns the end of the
+ * record it inserted, so confirm which of the two the producer stores.
+ * Also confirm that something keeps the referenced WAL from being removed
+ * or recycled before every listener has read it.
+ */
+static void
+processNotificationFromWAL(XLogRecPtr notify_lsn)
+{
+	XLogReaderState *xlogreader;
+
+	/*
+	 * A throwaway reader is sufficient: this runs during normal notification
+	 * processing, not during recovery.
+	 */
+	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+									XL_ROUTINE(.page_read = &read_local_xlog_page,
+											   .segment_open = &wal_segment_open,
+											   .segment_close = &wal_segment_close),
+									NULL);
+	if (!xlogreader)
+		elog(ERROR, "failed to allocate XLog reader for notification data");
+
+	/*
+	 * Everything below can throw, and the reader may be holding an open WAL
+	 * segment, so release it on the error path as well.
+	 */
+	PG_TRY();
+	{
+		xl_async_notify_data *xlrec;
+		XLogRecPtr	startptr;
+		char	   *errormsg;
+		char	   *ptr;
+		uint32		datalen;
+		uint32		remaining;
+		uint32		i;
+		int32		srcPid;
+
+		/* Back up one byte and take the first record at or after it */
+		startptr = XLogFindNextRecord(xlogreader, notify_lsn - 1);
+		if (XLogRecPtrIsInvalid(startptr))
+			elog(ERROR, "could not locate WAL record preceding %X/%X",
+				 LSN_FORMAT_ARGS(notify_lsn));
+		XLogBeginRead(xlogreader, startptr);
+
+		if (XLogReadRecord(xlogreader, &errormsg) == NULL)
+			elog(ERROR, "failed to read notification data from WAL at %X/%X: %s",
+				 LSN_FORMAT_ARGS(notify_lsn),
+				 errormsg ? errormsg : "no error message");
+
+		/*
+		 * Refuse anything but a NOTIFY_DATA record: decoding some other
+		 * record's bytes as a notification list would deliver garbage.
+		 */
+		if (XLogRecGetRmid(xlogreader) != RM_ASYNC_ID ||
+			(XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK) != XLOG_ASYNC_NOTIFY_DATA)
+			elog(ERROR, "unexpected WAL record (rmgr %d, info %d) at %X/%X while reading notification data",
+				 XLogRecGetRmid(xlogreader), XLogRecGetInfo(xlogreader),
+				 LSN_FORMAT_ARGS(startptr));
+
+		/* The fixed header must be present before we look inside it */
+		datalen = XLogRecGetDataLen(xlogreader);
+		if (datalen < SizeOfAsyncNotifyData)
+			elog(ERROR, "notification data record at %X/%X is too short",
+				 LSN_FORMAT_ARGS(startptr));
+
+		xlrec = (xl_async_notify_data *) XLogRecGetData(xlogreader);
+		srcPid = xlrec->srcPid;
+		ptr = (char *) xlrec + SizeOfAsyncNotifyData;
+		remaining = datalen - SizeOfAsyncNotifyData;
+
+		/*
+		 * Each serialized notification is: 2-byte channel_len, 2-byte
+		 * payload_len, NUL-terminated channel, NUL-terminated payload.
+		 */
+		for (i = 0; i < xlrec->nnotifications; i++)
+		{
+			uint16		channel_len;
+			uint16		payload_len;
+			uint32		entry_len;
+			char	   *channel;
+			char	   *payload;
+
+			if (remaining < 2 * sizeof(uint16))
+				break;
+			memcpy(&channel_len, ptr, sizeof(uint16));
+			memcpy(&payload_len, ptr + sizeof(uint16), sizeof(uint16));
+
+			entry_len = 2 * sizeof(uint16) + channel_len + 1 + payload_len + 1;
+			if (remaining < entry_len)
+				break;
+
+			channel = ptr + 2 * sizeof(uint16);
+			payload = channel + channel_len + 1;
+
+			/* The strings must end exactly where the lengths say they do */
+			if (channel[channel_len] != '\0' || payload[payload_len] != '\0')
+				break;
+
+			ptr += entry_len;
+			remaining -= entry_len;
+
+			if (IsListeningOn(channel))
+				NotifyMyFrontEnd(channel, payload, srcPid);
+		}
+
+		/* Leaving the loop early means the list was malformed */
+		if (i < xlrec->nnotifications)
+			elog(WARNING, "malformed notification data at %X/%X: processed %u of %u notifications",
+				 LSN_FORMAT_ARGS(startptr), i, xlrec->nnotifications);
+	}
+	PG_FINALLY();
+	{
+		XLogReaderFree(xlogreader);
+	}
+	PG_END_TRY();
+}
+
+/*
+ * asyncQueueStartNewPage
+ *
+ * Helper for asyncQueueAddCompactEntry: the queue head has just moved onto
+ * page "pageno", so zero that page before anything tries to read it.
+ *
+ * "heldlock" is the SLRU bank lock the caller holds; it may be dropped and
+ * re-taken here when the new page lives in a different bank, and is held
+ * again on return.
+ */
+static void
+asyncQueueStartNewPage(int64 pageno, LWLock *heldlock)
+{
+	LWLock	   *nextlock = SimpleLruGetBankLock(NotifyCtl, pageno);
+
+	if (nextlock != heldlock)
+	{
+		LWLockRelease(heldlock);
+		LWLockAcquire(nextlock, LW_EXCLUSIVE);
+	}
+	SimpleLruZeroPage(NotifyCtl, pageno);
+	if (nextlock != heldlock)
+	{
+		LWLockRelease(nextlock);
+		LWLockAcquire(heldlock, LW_EXCLUSIVE);
+	}
+
+	/* Ask for a tail-advance attempt every QUEUE_CLEANUP_DELAY pages */
+	if (pageno % QUEUE_CLEANUP_DELAY == 0)
+		tryAdvanceTail = true;
+}
+
+/*
+ * asyncQueueAddCompactEntry
+ *
+ * Append one fixed-size entry (dbid, xid, notify_lsn) to the notification
+ * SLRU queue and advance the shared queue head past it.  The entry holds
+ * no channel or payload text, only the WAL location passed in.
+ *
+ * The caller must hold NotifyQueueLock in exclusive mode.
+ *
+ * NOTE(review): this function does not check whether the queue is full
+ * before moving the head; confirm the caller guarantees free space.
+ */
+static void
+asyncQueueAddCompactEntry(Oid dbid, TransactionId xid, XLogRecPtr notify_lsn)
+{
+	AsyncQueueEntry entry;
+	QueuePosition queue_head;
+	int64		pageno;
+	int			offset;
+	int			slotno;
+	LWLock	   *banklock;
+
+	entry.dbid = dbid;
+	entry.xid = xid;
+	entry.notify_lsn = notify_lsn;
+
+	queue_head = QUEUE_HEAD;
+	pageno = QUEUE_POS_PAGE(queue_head);
+	banklock = SimpleLruGetBankLock(NotifyCtl, pageno);
+
+	LWLockAcquire(banklock, LW_EXCLUSIVE);
+
+	/* The very first write since startup has to create page zero */
+	if (QUEUE_POS_IS_ZERO(queue_head))
+		slotno = SimpleLruZeroPage(NotifyCtl, pageno);
+	else
+		slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
+								   InvalidTransactionId);
+
+	/* Mark the page dirty before writing */
+	NotifyCtl->shared->page_dirty[slotno] = true;
+
+	offset = QUEUE_POS_OFFSET(queue_head);
+
+	if (offset + sizeof(AsyncQueueEntry) > QUEUE_PAGESIZE)
+	{
+		/*
+		 * No room left on this page.  Zero-fill the tail, step the head onto
+		 * the next page, and create that page before retrying: the retry
+		 * reads the page through SimpleLruReadPage, which needs it to exist.
+		 */
+		memset(NotifyCtl->shared->page_buffer[slotno] + offset, 0,
+			   QUEUE_PAGESIZE - offset);
+		if (asyncQueueAdvance(&queue_head, QUEUE_PAGESIZE - offset))
+			asyncQueueStartNewPage(QUEUE_POS_PAGE(queue_head), banklock);
+
+		QUEUE_HEAD = queue_head;
+		LWLockRelease(banklock);
+		asyncQueueAddCompactEntry(dbid, xid, notify_lsn);
+		return;
+	}
+
+	memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
+		   &entry,
+		   sizeof(AsyncQueueEntry));
+
+	/* Advancing reports whether the head crossed onto a fresh page */
+	if (asyncQueueAdvance(&queue_head, sizeof(AsyncQueueEntry)))
+		asyncQueueStartNewPage(QUEUE_POS_PAGE(queue_head), banklock);
+
+	QUEUE_HEAD = queue_head;
+	LWLockRelease(banklock);
+}
+
 /*
  * Advance the shared queue tail variable to the minimum of all the
  * per-backend tail pointers.  Truncate pg_notify space if possible.
@@ -2395,3 +2641,94 @@ check_notify_buffers(int *newval, void **extra, GucSource source)
 {
 	return check_slru_buffers("notify_buffers", newval);
 }
+
+/*
+ * Write a WAL record containing async notification data
+ *
+ * This logs notification data to WAL, allowing us to release locks earlier
+ * and maintain commit ordering through WAL's natural ordering guarantees.
+ */
+XLogRecPtr
+LogAsyncNotifyData(Oid dboid, TransactionId xid, int32 srcPid,
+				   uint32 nnotifications, Size data_len, char *data)
+{
+	xl_async_notify_data xlrec;
+
+	xlrec.dbid = dboid;
+	xlrec.xid = xid;
+	xlrec.srcPid = srcPid;
+	xlrec.nnotifications = nnotifications;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfAsyncNotifyData);
+	XLogRegisterData(data, data_len);
+	(void) XLogInsert(RM_ASYNC_ID, XLOG_ASYNC_NOTIFY_DATA);
+
+	/*
+	 * XLogInsert() returns the END of the record.  Whoever reads the record
+	 * back needs to know where it STARTS, which is ProcLastRecPtr.
+	 */
+	return ProcLastRecPtr;
+}
+
+/*
+ * Write a WAL record tying a transaction to its notification data
+ *
+ * dboid, xid and notify_lsn are copied verbatim into an
+ * XLOG_ASYNC_NOTIFY_COMMIT record.
+ *
+ * Returns the value XLogInsert() gives back for that record.
+ */
+XLogRecPtr
+LogAsyncNotifyCommit(Oid dboid, TransactionId xid, XLogRecPtr notify_lsn)
+{
+	xl_async_notify_commit xlrec;
+
+	xlrec.dbid = dboid;
+	xlrec.xid = xid;
+	xlrec.notify_lsn = notify_lsn;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfAsyncNotifyCommit);
+
+	return XLogInsert(RM_ASYNC_ID, XLOG_ASYNC_NOTIFY_COMMIT);
+}
+
+/*
+ * Redo function for async notification WAL records
+ *
+ * Replay is deliberately a no-op for both record types: nothing is added to
+ * the notification queue during recovery.  Any info code other than the two
+ * known ones is treated as corruption and raises PANIC.
+ *
+ * NOTE(review): this means queue entries written before a crash are not
+ * rebuilt from WAL -- confirm that is the intended recovery behaviour.
+ */
+void
+async_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	switch (info)
+	{
+		case XLOG_ASYNC_NOTIFY_DATA:
+			/*
+			 * For notification data records, we don't need to do anything
+			 * during recovery since listeners will read directly from WAL.
+			 * The data is already durably stored in the WAL record itself.
+			 */
+			break;
+
+		case XLOG_ASYNC_NOTIFY_COMMIT:
+			/*
+			 * For commit records, we could add the compact entry to the
+			 * SLRU queue during recovery, but it's not strictly necessary
+			 * since recovery typically happens with no active listeners.
+			 * The important thing is that the WAL data is preserved.
+			 */
+			break;
+
+		default:
+			elog(PANIC, "async_redo: unknown op code %u", info);
+	}
+}
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 8f4b282c6b1..a2e536cc910 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -13,6 +13,7 @@
 
 #include <unistd.h>
 
+#include "access/async_xlog.h"
 #include "access/rmgr.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
diff --git a/src/bin/pg_waldump/asyncdesc.c b/src/bin/pg_waldump/asyncdesc.c
new file mode 120000
index 00000000000..0f6512e98ef
--- /dev/null
+++ b/src/bin/pg_waldump/asyncdesc.c
@@ -0,0 +1 @@
+../../../src/backend/access/rmgrdesc/asyncdesc.c
\ No newline at end of file
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index fac509ed134..b06c85bf0e7 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -8,6 +8,7 @@
 #define FRONTEND 1
 #include "postgres.h"
 
+#include "access/async_xlog.h"
 #include "access/brin_xlog.h"
 #include "access/clog.h"
 #include "access/commit_ts.h"
diff --git a/src/include/access/async_xlog.h b/src/include/access/async_xlog.h
new file mode 100644
index 00000000000..1214be82099
--- /dev/null
+++ b/src/include/access/async_xlog.h
@@ -0,0 +1,56 @@
+/*-------------------------------------------------------------------------
+ *
+ * async_xlog.h
+ *	  Async notification WAL definitions
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/async_xlog.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef ASYNC_XLOG_H
+#define
ASYNC_XLOG_H
+
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+#include "lib/stringinfo.h"
+
+/*
+ * WAL record types for async notifications
+ */
+#define XLOG_ASYNC_NOTIFY_DATA	0x00	/* notification data */
+#define XLOG_ASYNC_NOTIFY_COMMIT	0x10	/* commit with notify reference */
+
+/*
+ * WAL record for notification data (written in PreCommit_Notify)
+ */
+typedef struct xl_async_notify_data
+{
+	Oid			dbid;			/* database ID */
+	TransactionId xid;			/* transaction ID */
+	int32		srcPid;			/* source backend PID */
+	uint32		nnotifications; /* number of notifications */
+	/* followed by serialized notification data */
+} xl_async_notify_data;
+
+#define SizeOfAsyncNotifyData (offsetof(xl_async_notify_data, nnotifications) + sizeof(uint32))
+
+/*
+ * WAL record for commit with notification reference
+ */
+typedef struct xl_async_notify_commit
+{
+	Oid			dbid;			/* database ID */
+	TransactionId xid;			/* transaction ID */
+	XLogRecPtr	notify_lsn;		/* LSN of corresponding notify data record */
+} xl_async_notify_commit;
+
+#define SizeOfAsyncNotifyCommit (offsetof(xl_async_notify_commit, notify_lsn) + sizeof(XLogRecPtr))
+
+extern void async_redo(XLogReaderState *record);
+extern void async_desc(StringInfo buf, XLogReaderState *record);
+extern const char *async_identify(uint8 info);
+
+#endif							/* ASYNC_XLOG_H */
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index 8e7fc9db877..58293e05165 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
 PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
 PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
+PG_RMGR(RM_ASYNC_ID, "Async", async_redo, async_desc, async_identify, NULL, NULL, NULL, NULL)
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..1d204542840 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -14,11 +14,24 @@
 #define ASYNC_H
 
 #include <signal.h>
+#include "access/xlogdefs.h"
 
 extern PGDLLIMPORT bool Trace_notify;
 extern PGDLLIMPORT int max_notify_queue_pages;
 extern PGDLLIMPORT volatile sig_atomic_t notifyInterruptPending;
 
+/*
+ * Compact SLRU queue entry - stores metadata pointing to WAL data
+ */
+typedef struct AsyncQueueEntry
+{
+	Oid			dbid;			/* database ID for quick filtering */
+	TransactionId xid;			/* transaction ID */
+	XLogRecPtr	notify_lsn;		/* LSN of notification data in WAL */
+} AsyncQueueEntry;
+
+#define ASYNC_QUEUE_ENTRY_SIZE (sizeof(AsyncQueueEntry))
+
 extern Size AsyncShmemSize(void);
 extern void AsyncShmemInit(void);
 
@@ -46,4 +59,9 @@ extern void HandleNotifyInterrupt(void);
 /* process interrupts */
 extern void ProcessNotifyInterrupt(bool flush);
 
+/* WAL-based notification functions; async_redo is declared in access/async_xlog.h */
+extern XLogRecPtr LogAsyncNotifyData(Oid dboid, TransactionId xid, int32 srcPid,
+									 uint32 nnotifications, Size data_len, char *data);
+extern XLogRecPtr LogAsyncNotifyCommit(Oid dboid, TransactionId xid, XLogRecPtr notify_lsn);
+
 #endif							/* ASYNC_H */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index c6f5ebceefd..71459fe5529 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -301,6 +301,9 @@ struct PGPROC
 
 	uint32		wait_event_info;	/* proc's wait information */
 
+	/* Support for async notifications */
+	XLogRecPtr	notifyCommitLsn;	/* LSN of notification data for current xact */
+
 	/* Support for group transaction status update. */
 	bool		clogGroupMember;	/* true, if member of clog group */
 	pg_atomic_uint32 clogGroupNext; /* next clog group member */
-- 
2.47.1