From aa6454d9abb9a70b728dbba7f40279108486a3e4 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Tue, 14 Mar 2023 07:30:09 +0000 Subject: [PATCH v10] Improve WALRead() to suck data directly from WAL buffers --- src/backend/access/transam/xlog.c | 171 ++++++++++++++++++++++++ src/backend/access/transam/xlogreader.c | 42 +++++- src/include/access/xlog.h | 6 + 3 files changed, 217 insertions(+), 2 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 543d4d897a..d40b9562e1 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -1639,6 +1639,177 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli) return cachedPos + ptr % XLOG_BLCKSZ; } +/* + * Read WAL from WAL buffers. + * + * Read 'count' bytes of WAL from WAL buffers into 'buf', starting at location + * 'startptr', on timeline 'tli' and return total read bytes. + * + * Note that this function reads as much as it can from WAL buffers, meaning, + * it may not read all the requested 'count' bytes. Caller must be aware of + * this and deal with it. + */ +Size +XLogReadFromBuffers(XLogReaderState *state PG_USED_FOR_ASSERTS_ONLY, + XLogRecPtr startptr, + TimeLineID tli, + Size count, + char *buf) +{ + XLogRecPtr ptr = startptr; + Size nbytes = count; /* total bytes requested to be read by caller */ + Size ntotal = 0; /* total bytes read */ + Size nbatch = 0; /* bytes to be read in single batch */ + char *batchstart = NULL; /* location to read from for single batch */ + + Assert(!XLogRecPtrIsInvalid(startptr)); + Assert(count > 0); + Assert(startptr <= GetFlushRecPtr(NULL)); + Assert(!RecoveryInProgress()); + Assert(tli == GetWALInsertionTimeLine()); + + /* + * Holding WALBufMappingLock ensures inserters don't overwrite this value + * while we are reading it. We try to acquire it in shared mode so that the + * concurrent WAL readers are also allowed. 
We try to do as little work as + * possible while holding the lock to not impact concurrent WAL writers + * much. We quickly exit to not cause any contention, if the lock isn't + * immediately available. + */ + if (!LWLockConditionalAcquire(WALBufMappingLock, LW_SHARED)) + return ntotal; + + while (nbytes > 0) + { + XLogRecPtr expectedEndPtr; + XLogRecPtr endptr; + int idx; + char *page; + char *data; + XLogPageHeader phdr; + + idx = XLogRecPtrToBufIdx(ptr); + expectedEndPtr = ptr; + expectedEndPtr += XLOG_BLCKSZ - ptr % XLOG_BLCKSZ; + endptr = XLogCtl->xlblocks[idx]; + + /* Requested WAL isn't available in WAL buffers. */ + if (expectedEndPtr != endptr) + break; + + /* + * We found WAL buffer page containing given XLogRecPtr. Get starting + * address of the page and a pointer to the right location of given + * XLogRecPtr in that page. + */ + page = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ; + data = page + ptr % XLOG_BLCKSZ; + + /* + * The fact that we acquire WALBufMappingLock while reading the WAL + * buffer page itself guarantees that no one else initializes it or + * makes it ready for next use in AdvanceXLInsertBuffer(). However, we + * need to ensure that we are not reading a page that just got + * initialized. For this, we look at the needed page header. + */ + phdr = (XLogPageHeader) page; + + /* Stop reading if the WAL buffer page doesn't look valid. */ + if (!(phdr->xlp_magic == XLOG_PAGE_MAGIC && + phdr->xlp_pageaddr == (ptr - (ptr % XLOG_BLCKSZ)) && + phdr->xlp_tli == tli)) + break; + + /* + * Note that we don't perform all page header checks here to avoid + * extra work in production builds, callers will anyway do those checks + * extensively. However, in an assert-enabled build, we perform all the + * checks here and raise an error if failed. 
+ */ +#ifdef USE_ASSERT_CHECKING + if (state != NULL && + !XLogReaderValidatePageHeader(state, (endptr - XLOG_BLCKSZ), + (char *) phdr)) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg_internal("error while reading WAL from WAL buffers: %s", state->errormsg_buf))); +#endif + + /* Count what is wanted, not the whole page. */ + if ((data + nbytes) <= (page + XLOG_BLCKSZ)) + { + /* All the bytes are in one page. */ + nbatch += nbytes; + ntotal += nbytes; + nbytes = 0; + } + else + { + Size navailable; + + /* + * Not all the bytes are in one page. Deduce available bytes on the + * current page, count them and continue to look for remaining + * bytes. + */ + navailable = XLOG_BLCKSZ - (data - page); + Assert(navailable > 0 && navailable <= nbytes); + ptr += navailable; + nbytes -= navailable; + nbatch += navailable; + ntotal += navailable; + } + + /* + * We avoid multiple memcpy calls while reading WAL. Note that we + * memcpy what we have counted so far whenever we are wrapping around + * WAL buffers (because WAL buffers are organized as a circular array of + * pages) and continue to look for remaining WAL. + */ + if (batchstart == NULL) + { + /* Mark where the data in WAL buffers starts from. */ + batchstart = data; + } + + /* + * We are wrapping around WAL buffers, so read what we have counted so + * far. + */ + if (idx == XLogCtl->XLogCacheBlck) + { + Assert(batchstart != NULL); + Assert(nbatch > 0); + + memcpy(buf, batchstart, nbatch); + buf += nbatch; + + /* Reset for next batch. */ + batchstart = NULL; + nbatch = 0; + } + } + + /* Read what we have counted so far. */ + Assert(nbatch <= ntotal); + if (batchstart != NULL && nbatch > 0) + memcpy(buf, batchstart, nbatch); + + LWLockRelease(WALBufMappingLock); + + /* We never read more than what the caller has asked for. 
*/ + Assert(ntotal <= count); + +#ifdef WAL_DEBUG + if (XLOG_DEBUG) + ereport(DEBUG1, + (errmsg_internal("read %zu bytes out of %zu bytes from WAL buffers for given LSN %X/%X, Timeline ID %u", + ntotal, count, LSN_FORMAT_ARGS(startptr), tli))); +#endif + + return ntotal; +} + /* * Converts a "usable byte position" to XLogRecPtr. A usable byte position * is the position starting from the beginning of WAL, excluding all WAL diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index cadea21b37..03f0cca1e6 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -1486,8 +1486,7 @@ err: * Returns true if succeeded, false if an error occurs, in which case * 'errinfo' receives error details. * - * XXX probably this should be improved to suck data directly from the - * WAL buffers when possible. + * When possible, this function reads data directly from WAL buffers. */ bool WALRead(XLogReaderState *state, @@ -1498,6 +1497,45 @@ WALRead(XLogReaderState *state, XLogRecPtr recptr; Size nbytes; +#ifndef FRONTEND + /* Frontend tools have no idea of WAL buffers. */ + Size nread; + + /* + * Try reading WAL from WAL buffers. We skip this step and continue the + * usual way, that is to read from WAL file, either when server is in + * recovery (standby mode, archive or crash recovery), in which case the + * WAL buffers are not used or when the server is inserting in a different + * timeline from that of the timeline that we're trying to read WAL from. + */ + if (!RecoveryInProgress() && + tli == GetWALInsertionTimeLine()) + { + nread = XLogReadFromBuffers(state, startptr, tli, count, buf); + + Assert(nread >= 0); + + /* + * Check if we have read fully (hit), partially (partial hit) or + * nothing (miss) from WAL buffers. If we have read either partially or + * nothing, then continue to read the remaining bytes the usual way, + * that is, read from WAL file. 
+ */ + if (count == nread) + return true; /* Buffer hit, so return. */ + else if (count > nread) + { + /* + * Buffer partial hit, so reset the state to count the read bytes + * and continue. + */ + buf += nread; + startptr += nread; + count -= nread; + } + } +#endif /* FRONTEND */ + p = buf; recptr = startptr; nbytes = count; diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index cfe5409738..4fdd8c8b17 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -247,6 +247,12 @@ extern XLogRecPtr GetLastImportantRecPtr(void); extern void SetWalWriterSleeping(bool sleeping); +extern Size XLogReadFromBuffers(struct XLogReaderState *state, + XLogRecPtr startptr, + TimeLineID tli, + Size count, + char *buf); + /* * Routines used by xlogrecovery.c to call back into xlog.c during recovery. */ -- 2.34.1