From 047c58df6aaae586efe58b6a4068b17f25976b0a Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy
Date: Mon, 26 Dec 2022 08:36:28 +0000
Subject: [PATCH v2] Improve WALRead() to suck data directly from WAL buffers when possible

---
 src/backend/access/transam/xlog.c       | 159 ++++++++++++++++++++++++
 src/backend/access/transam/xlogreader.c |  47 +++++++-
 src/include/access/xlog.h               |   6 +
 3 files changed, 210 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 91473b00d9..c3138493be 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -689,6 +689,7 @@ static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
 							  XLogRecPtr *PrevPtr);
 static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
 static char *GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli);
+static char *GetXLogBufferForRead(XLogRecPtr ptr, TimeLineID tli, char *page);
 static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
 static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
 static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr);
@@ -1639,6 +1640,164 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli)
 	return cachedPos + ptr % XLOG_BLCKSZ;
 }
 
+/*
+ * Copy the WAL buffer page containing 'ptr' into 'page' and return a pointer
+ * to the byte within 'page' that corresponds to 'ptr'.
+ *
+ * 'page' must point to a caller-supplied buffer of at least XLOG_BLCKSZ
+ * bytes. 'tli' is the timeline the copied page header is expected to carry.
+ *
+ * Returns NULL, so that the caller can fall back to reading from the WAL
+ * file, when WALBufMappingLock cannot be acquired immediately, when the
+ * buffer slot for 'ptr' currently holds a different page, or when the copied
+ * page header fails the basic checks below.
+ */
+static char *
+GetXLogBufferForRead(XLogRecPtr ptr, TimeLineID tli, char *page)
+{
+	XLogRecPtr	expectedEndPtr;
+	XLogRecPtr	endptr;
+	int			idx;
+	char	   *recptr = NULL;
+
+	idx = XLogRecPtrToBufIdx(ptr);
+
+	/* End address of the XLOG_BLCKSZ-sized page that contains 'ptr'. */
+	expectedEndPtr = ptr;
+	expectedEndPtr += XLOG_BLCKSZ - ptr % XLOG_BLCKSZ;
+
+	/*
+	 * Try to acquire WALBufMappingLock in shared mode so that the other
+	 * concurrent WAL readers are also allowed. We try to do as little work as
+	 * possible while holding the lock as it might impact concurrent WAL
+	 * writers.
+	 *
+	 * If we cannot immediately acquire the lock, meaning the lock was busy,
+	 * then exit quickly to not cause any contention. The caller can then
+	 * fall back to reading WAL from WAL file.
+	 */
+	if (!LWLockConditionalAcquire(WALBufMappingLock, LW_SHARED))
+		return recptr;
+
+	/*
+	 * Holding WALBufMappingLock ensures inserters don't overwrite this value
+	 * while we are reading it.
+	 */
+	endptr = XLogCtl->xlblocks[idx];
+
+	if (expectedEndPtr == endptr)
+	{
+		XLogPageHeader phdr;
+
+		/*
+		 * We have found the WAL buffer page holding the given LSN. Copy the
+		 * whole page into the caller's buffer while we still hold the lock.
+		 */
+		memcpy(page, (XLogCtl->pages + idx * (Size) XLOG_BLCKSZ),
+			   (Size) XLOG_BLCKSZ);
+
+		/*
+		 * Release the lock as early as possible to avoid creating any possible
+		 * contention.
+		 */
+		LWLockRelease(WALBufMappingLock);
+
+		/*
+		 * The fact that we acquire WALBufMappingLock while reading the WAL
+		 * buffer page itself guarantees that no one else initializes it or
+		 * makes it ready for next use in AdvanceXLInsertBuffer().
+		 *
+		 * However, we perform basic page header checks for ensuring that we
+		 * are not reading a page that got just initialized. The callers will
+		 * anyway perform extensive page-level and record-level checks.
+		 *
+		 * NOTE(review): the cast below assumes 'page' is suitably aligned for
+		 * XLogPageHeader; confirm that callers pass such a buffer.
+		 */
+		phdr = (XLogPageHeader) page;
+
+		if (phdr->xlp_magic == XLOG_PAGE_MAGIC &&
+			phdr->xlp_pageaddr == (ptr - (ptr % XLOG_BLCKSZ)) &&
+			phdr->xlp_tli == tli)
+		{
+			/*
+			 * Page looks valid, so return the location of 'ptr' within the
+			 * copied page.
+			 */
+			recptr = page + ptr % XLOG_BLCKSZ;
+		}
+	}
+	else
+	{
+		/* We have found nothing. */
+		LWLockRelease(WALBufMappingLock);
+	}
+
+	return recptr;
+}
+
+/*
+ * Read up to 'count' bytes of WAL starting at 'startptr' from the WAL buffers
+ * into the caller-supplied buffer 'buf', page by page.
+ *
+ * Reading stops at the first page that GetXLogBufferForRead() cannot supply;
+ * the caller is expected to read any remaining WAL from WAL files directly.
+ *
+ * '*read_bytes' is set to the number of bytes copied into 'buf', which may be
+ * anywhere from 0 to 'count'.
+ */
+void
+XLogReadFromBuffers(XLogRecPtr startptr,
+					TimeLineID tli,
+					Size count,
+					char *buf,
+					Size *read_bytes)
+{
+	XLogRecPtr	ptr;
+	char	   *dst;
+	Size		nbytes;
+
+	Assert(!XLogRecPtrIsInvalid(startptr));
+	Assert(count > 0);
+	Assert(startptr <= GetFlushRecPtr(NULL));
+	Assert(!RecoveryInProgress());
+
+	ptr = startptr;
+	nbytes = count;
+	dst = buf;
+	*read_bytes = 0;
+
+	while (nbytes > 0)
+	{
+		char		page[XLOG_BLCKSZ] = {0};
+		char	   *recptr;
+		Size		avail;
+		Size		ncopy;
+
+		recptr = GetXLogBufferForRead(ptr, tli, page);
+
+		if (recptr == NULL)
+			break;
+
+		/*
+		 * Copy no more than what is left on this page. Compare byte counts
+		 * instead of evaluating 'recptr + nbytes', which would form a pointer
+		 * beyond the end of 'page' whenever the request crosses the page.
+		 */
+		avail = XLOG_BLCKSZ - (Size) (recptr - page);
+		ncopy = (nbytes < avail) ? nbytes : avail;
+
+		memcpy(dst, recptr, ncopy);
+		dst += ncopy;
+		ptr += ncopy;
+		nbytes -= ncopy;
+		*read_bytes += ncopy;
+	}
+
+	elog(DEBUG1, "read %zu bytes out of %zu bytes from WAL buffers for given LSN %X/%X, Timeline ID %u",
+		 *read_bytes, count, LSN_FORMAT_ARGS(startptr), tli);
+}
+
 /*
  * Converts a "usable byte position" to XLogRecPtr. A usable byte position
  * is the position starting from the beginning of WAL, excluding all WAL
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index a38a80e049..4a2e7af169 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1485,8 +1485,7 @@ err:
  * Returns true if succeeded, false if an error occurs, in which case
  * 'errinfo' receives error details.
  *
- * XXX probably this should be improved to suck data directly from the
- * WAL buffers when possible.
+ * When possible, this function reads data directly from WAL buffers.
  */
 bool
 WALRead(XLogReaderState *state,
@@ -1497,6 +1496,50 @@ WALRead(XLogReaderState *state,
 	XLogRecPtr	recptr;
 	Size		nbytes;
 
+#ifndef FRONTEND
+	/* Frontend tools have no idea of WAL buffers. */
+	Size		read_bytes;
+
+	/*
+	 * When possible, read WAL from WAL buffers. We skip this step and continue
+	 * the usual way, that is to read from WAL file, either when the server is
+	 * in recovery (standby mode, archive or crash recovery), in which case the
+	 * WAL buffers are not used or when the server is inserting in a different
+	 * timeline from that of the timeline that we're trying to read WAL from.
+	 */
+	if (!RecoveryInProgress() &&
+		tli == GetWALInsertionTimeLine())
+	{
+		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
+		XLogReadFromBuffers(startptr, tli, count, buf, &read_bytes);
+		pgstat_report_wait_end();
+
+		/*
+		 * Check if we have read fully (hit), partially (partial hit) or
+		 * nothing (miss) from WAL buffers. If we have read either partially or
+		 * nothing, then continue to read the remaining bytes the usual way,
+		 * that is, read from WAL file.
+		 */
+		if (count == read_bytes)
+		{
+			/* Buffer hit, so return. */
+			return true;
+		}
+		else if (read_bytes > 0 && count > read_bytes)
+		{
+			/*
+			 * Buffer partial hit, so reset the state to count the read bytes
+			 * and continue.
+			 */
+			buf += read_bytes;
+			startptr += read_bytes;
+			count -= read_bytes;
+		}
+
+		/* Buffer miss i.e., read_bytes = 0, so continue */
+	}
+#endif	/* FRONTEND */
+
 	p = buf;
 	recptr = startptr;
 	nbytes = count;
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 1fbd48fbda..f4e1c46b23 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -247,6 +247,12 @@ extern XLogRecPtr GetLastImportantRecPtr(void);
 
 extern void SetWalWriterSleeping(bool sleeping);
 
+extern void XLogReadFromBuffers(XLogRecPtr startptr,
+								TimeLineID tli,
+								Size count,
+								char *buf,
+								Size *read_bytes);
+
 /*
  * Routines used by xlogrecovery.c to call back into xlog.c during recovery.
  */
-- 
2.34.1