Re: Remove page-read callback from XLogReaderState. - Mailing list pgsql-hackers
From | Kyotaro Horiguchi |
---|---|
Subject | Re: Remove page-read callback from XLogReaderState. |
Date | |
Msg-id | 20200324.182413.1894433151525329801.horikyota.ntt@gmail.com Whole thread Raw |
In response to | Re: Remove page-read callback from XLogReaderState. (Kyotaro Horiguchi <horikyota.ntt@gmail.com>) |
Responses |
Re: Remove page-read callback from XLogReaderState.
|
List | pgsql-hackers |
Commit 5d0c2d5eba broke this patch. Rebased. Regards. -- Kyotaro Horiguchi NTT Open Source Software Center From 6837855f0c938b5e34039897158bc912c6619d2b Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp> Date: Thu, 5 Sep 2019 20:21:55 +0900 Subject: [PATCH v7 1/4] Move callback-call from ReadPageInternal to XLogReadRecord. The current WAL record reader reads page data using a callback function. Although that is not very problematic by itself, it becomes a problem if we are going to add tasks such as encryption that are performed on page data before the WAL reader reads it. To avoid the record reader facility needing a new code path for every new callback, this patch separates the page reader from the WAL record reading facility by turning the current WAL record reader into a state machine. As the first step of that change, this patch moves the page reader function out of ReadPageInternal; the remaining tasks of that function are taken over by the new function XLogNeedData. As a result, XLogReadRecord directly calls the page reader callback function according to the feedback from XLogNeedData. 
--- src/backend/access/transam/xlog.c | 16 +- src/backend/access/transam/xlogreader.c | 272 ++++++++++++++---------- src/backend/access/transam/xlogutils.c | 12 +- src/backend/replication/walsender.c | 10 +- src/bin/pg_rewind/parsexlog.c | 16 +- src/bin/pg_waldump/pg_waldump.c | 8 +- src/include/access/xlogreader.h | 23 +- src/include/access/xlogutils.h | 2 +- 8 files changed, 211 insertions(+), 148 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 7621fc05e2..51c409d00e 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -900,7 +900,7 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, XLogSource source, bool notfoundOk); static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source); -static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, +static bool XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf); static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, bool fetching_ckpt, XLogRecPtr tliRecPtr); @@ -4285,7 +4285,6 @@ ReadRecord(XLogReaderState *xlogreader, int emode, XLogRecord *record; XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; - /* Pass through parameters to XLogPageRead */ private->fetching_ckpt = fetching_ckpt; private->emode = emode; private->randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr); @@ -11660,7 +11659,7 @@ CancelBackup(void) * XLogPageRead() to try fetching the record from another source, or to * sleep and retry. 
*/ -static int +static bool XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf) { @@ -11719,7 +11718,8 @@ retry: readLen = 0; readSource = XLOG_FROM_ANY; - return -1; + xlogreader->readLen = -1; + return false; } } @@ -11814,7 +11814,8 @@ retry: goto next_record_is_invalid; } - return readLen; + xlogreader->readLen = readLen; + return true; next_record_is_invalid: lastSourceFailed = true; @@ -11828,8 +11829,9 @@ next_record_is_invalid: /* In standby-mode, keep trying */ if (StandbyMode) goto retry; - else - return -1; + + xlogreader->readLen = -1; + return false; } /* diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 32f02256ed..2c1500443e 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -36,8 +36,8 @@ static void report_invalid_record(XLogReaderState *state, const char *fmt,...) pg_attribute_printf(2, 3); static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength); -static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, - int reqLen); +static bool XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, + int reqLen, bool header_inclusive); static void XLogReaderInvalReadState(XLogReaderState *state); static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess); @@ -269,7 +269,6 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) uint32 targetRecOff; uint32 pageHeaderSize; bool gotheader; - int readOff; /* * randAccess indicates whether to verify the previous-record pointer of @@ -319,14 +318,20 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) * byte to cover the whole record header, or at least the part of it that * fits on the same page. 
*/ - readOff = ReadPageInternal(state, targetPagePtr, - Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ)); - if (readOff < 0) + while (XLogNeedData(state, targetPagePtr, + Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ), + targetRecOff != 0)) + { + if (!state->read_page(state, state->readPagePtr, state->readLen, + RecPtr, state->readBuf)) + break; + } + + if (!state->page_verified) goto err; /* - * ReadPageInternal always returns at least the page header, so we can - * examine it now. + * We have at least the page header, so we can examine it now. */ pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); if (targetRecOff == 0) @@ -352,8 +357,8 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) goto err; } - /* ReadPageInternal has verified the page header */ - Assert(pageHeaderSize <= readOff); + /* XLogNeedData has verified the page header */ + Assert(pageHeaderSize <= state->readLen); /* * Read the record length. @@ -426,18 +431,25 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) do { + int rest_len = total_len - gotlen; + /* Calculate pointer to beginning of next page */ targetPagePtr += XLOG_BLCKSZ; /* Wait for the next page to become available */ - readOff = ReadPageInternal(state, targetPagePtr, - Min(total_len - gotlen + SizeOfXLogShortPHD, - XLOG_BLCKSZ)); + while (XLogNeedData(state, targetPagePtr, + Min(rest_len, XLOG_BLCKSZ), + false)) + { + if (!state->read_page(state, state->readPagePtr, state->readLen, + state->ReadRecPtr, state->readBuf)) + break; + } - if (readOff < 0) + if (!state->page_verified) goto err; - Assert(SizeOfXLogShortPHD <= readOff); + Assert(SizeOfXLogShortPHD <= state->readLen); /* Check that the continuation on next page looks valid */ pageHeader = (XLogPageHeader) state->readBuf; @@ -466,21 +478,14 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) /* Append the continuation from this page to the buffer */ pageHeaderSize = XLogPageHeaderSize(pageHeader); - if (readOff < pageHeaderSize) - 
readOff = ReadPageInternal(state, targetPagePtr, - pageHeaderSize); - - Assert(pageHeaderSize <= readOff); + Assert(pageHeaderSize <= state->readLen); contdata = (char *) state->readBuf + pageHeaderSize; len = XLOG_BLCKSZ - pageHeaderSize; if (pageHeader->xlp_rem_len < len) len = pageHeader->xlp_rem_len; - if (readOff < pageHeaderSize + len) - readOff = ReadPageInternal(state, targetPagePtr, - pageHeaderSize + len); - + Assert (pageHeaderSize + len <= state->readLen); memcpy(buffer, (char *) contdata, len); buffer += len; gotlen += len; @@ -510,9 +515,15 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) else { /* Wait for the record data to become available */ - readOff = ReadPageInternal(state, targetPagePtr, - Min(targetRecOff + total_len, XLOG_BLCKSZ)); - if (readOff < 0) + while (XLogNeedData(state, targetPagePtr, + Min(targetRecOff + total_len, XLOG_BLCKSZ), true)) + { + if (!state->read_page(state, state->readPagePtr, state->readLen, + state->ReadRecPtr, state->readBuf)) + break; + } + + if (!state->page_verified) goto err; /* Record does not cross a page boundary */ @@ -555,109 +566,139 @@ err: } /* - * Read a single xlog page including at least [pageptr, reqLen] of valid data - * via the read_page() callback. + * Checks that an xlog page loaded in state->readBuf is including at least + * [pageptr, reqLen] and the page is valid. header_inclusive indicates that + * reqLen is calculated including page header length. * - * Returns -1 if the required page cannot be read for some reason; errormsg_buf - * is set in that case (unless the error occurs in the read_page callback). + * Returns false if the buffer already contains the requested data, or found + * error. state->page_verified is set to true for the former and false for the + * latter. * - * We fetch the page from a reader-local cache if we know we have the required - * data and if there hasn't been any error since caching the data. 
+ * Otherwise returns true and requests data loaded onto state->readBuf by + * state->readPagePtr and state->readLen. The caller shall call this function + * again after filling the buffer at least with that portion of data and set + * state->readLen to the length of actually loaded data. + * + * If header_inclusive is false, corrects reqLen internally by adding the + * actual page header length and may request caller for new data. */ -static int -ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) +static bool +XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen, + bool header_inclusive) { - int readLen; uint32 targetPageOff; XLogSegNo targetSegNo; - XLogPageHeader hdr; + uint32 addLen = 0; - Assert((pageptr % XLOG_BLCKSZ) == 0); + /* Some data is loaded, but page header is not verified yet. */ + if (!state->page_verified && + !XLogRecPtrIsInvalid(state->readPagePtr) && state->readLen >= 0) + { + uint32 pageHeaderSize; - XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize); - targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize); + /* just loaded new data so needs to verify page header */ - /* check whether we have all the requested data already */ - if (targetSegNo == state->seg.ws_segno && - targetPageOff == state->segoff && reqLen <= state->readLen) - return state->readLen; + /* The caller must have loaded at least page header */ + Assert (state->readLen >= SizeOfXLogShortPHD); - /* - * Data is not in our buffer. - * - * Every time we actually read the page, even if we looked at parts of it - * before, we need to do verification as the read_page callback might now - * be rereading data from a different source. - * - * Whenever switching to a new WAL segment, we read the first page of the - * file and validate its header, even if that's not where the target - * record is. This is so that we can check the additional identification - * info that is present in the first page's "long" header. 
- */ - if (targetSegNo != state->seg.ws_segno && targetPageOff != 0) - { - XLogRecPtr targetSegmentPtr = pageptr - targetPageOff; + /* + * We have enough data to check the header length. Recheck the loaded + * length against the actual header length. + */ + pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); - readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ, - state->currRecPtr, - state->readBuf); - if (readLen < 0) - goto err; + /* Request more data if we don't have the full header. */ + if (state->readLen < pageHeaderSize) + { + state->readLen = pageHeaderSize; + return true; + } + + /* Now that we know we have the full header, validate it. */ + if (!XLogReaderValidatePageHeader(state, state->readPagePtr, + (char *) state->readBuf)) + { + /* That's bad. Force reading the page again. */ + XLogReaderInvalReadState(state); - /* we can be sure to have enough WAL available, we scrolled back */ - Assert(readLen == XLOG_BLCKSZ); + return false; + } - if (!XLogReaderValidatePageHeader(state, targetSegmentPtr, - state->readBuf)) - goto err; + state->page_verified = true; + + XLByteToSeg(state->readPagePtr, state->seg.ws_segno, + state->segcxt.ws_segsize); } /* - * First, read the requested data length, but at least a short page header - * so that we can validate it. + * The loaded page may not be the one caller is supposing to read when we + * are verifying the first page of new segment. In that case, skip further + * verification and immediately load the target page. */ - readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD), - state->currRecPtr, - state->readBuf); - if (readLen < 0) - goto err; - - Assert(readLen <= XLOG_BLCKSZ); + if (state->page_verified && pageptr == state->readPagePtr) + { - /* Do we have enough data to check the header length? */ - if (readLen <= SizeOfXLogShortPHD) - goto err; + /* + * calculate additional length for page header keeping the total + * length within the block size. 
+ */ + if (!header_inclusive) + { + uint32 pageHeaderSize = + XLogPageHeaderSize((XLogPageHeader) state->readBuf); - Assert(readLen >= reqLen); + addLen = pageHeaderSize; + if (reqLen + pageHeaderSize <= XLOG_BLCKSZ) + addLen = pageHeaderSize; + else + addLen = XLOG_BLCKSZ - reqLen; - hdr = (XLogPageHeader) state->readBuf; + Assert(addLen >= 0); + } - /* still not enough */ - if (readLen < XLogPageHeaderSize(hdr)) - { - readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr), - state->currRecPtr, - state->readBuf); - if (readLen < 0) - goto err; + /* Return if we already have it. */ + if (reqLen + addLen <= state->readLen) + return false; } + /* Data is not in our buffer, request the caller for it. */ + XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize); + targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize); + Assert((pageptr % XLOG_BLCKSZ) == 0); + /* - * Now that we know we have the full header, validate it. + * Every time we request to load new data of a page to the caller, even if + * we looked at a part of it before, we need to do verification on the next + * invocation as the caller might now be rereading data from a different + * source. */ - if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr)) - goto err; - - /* update read state information */ - state->seg.ws_segno = targetSegNo; - state->segoff = targetPageOff; - state->readLen = readLen; + state->page_verified = false; - return readLen; + /* + * Whenever switching to a new WAL segment, we read the first page of the + * file and validate its header, even if that's not where the target + * record is. This is so that we can check the additional identification + * info that is present in the first page's "long" header. + * Don't do this if the caller requested the first page in the segment. 
+ */ + if (targetSegNo != state->seg.ws_segno && targetPageOff != 0) + { + /* + * Then we'll see that the targetSegNo now matches the ws_segno, and + * will not come back here, but will request the actual target page. + */ + state->readPagePtr = pageptr - targetPageOff; + state->readLen = XLOG_BLCKSZ; + return true; + } -err: - XLogReaderInvalReadState(state); - return -1; + /* + * Request the caller to load the page. We need at least a short page + * header so that we can validate it. + */ + state->readPagePtr = pageptr; + state->readLen = Max(reqLen + addLen, SizeOfXLogShortPHD); + return true; } /* @@ -666,9 +707,7 @@ err: static void XLogReaderInvalReadState(XLogReaderState *state) { - state->seg.ws_segno = 0; - state->segoff = 0; - state->readLen = 0; + state->readPagePtr = InvalidXLogRecPtr; } /* @@ -949,7 +988,6 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) XLogRecPtr targetPagePtr; int targetRecOff; uint32 pageHeaderSize; - int readLen; /* * Compute targetRecOff. It should typically be equal or greater than @@ -957,7 +995,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) * that, except when caller has explicitly specified the offset that * falls somewhere there or when we are skipping multi-page * continuation record. 
It doesn't matter though because - * ReadPageInternal() is prepared to handle that and will read at + * XLogNeedData() is prepared to handle that and will read at * least short page-header worth of data */ targetRecOff = tmpRecPtr % XLOG_BLCKSZ; @@ -965,19 +1003,23 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) /* scroll back to page boundary */ targetPagePtr = tmpRecPtr - targetRecOff; - /* Read the page containing the record */ - readLen = ReadPageInternal(state, targetPagePtr, targetRecOff); - if (readLen < 0) + while(XLogNeedData(state, targetPagePtr, targetRecOff, + targetRecOff != 0)) + { + if (!state->read_page(state, state->readPagePtr, state->readLen, + state->ReadRecPtr, state->readBuf)) + break; + } + + if (!state->page_verified) goto err; header = (XLogPageHeader) state->readBuf; pageHeaderSize = XLogPageHeaderSize(header); - /* make sure we have enough data for the page header */ - readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize); - if (readLen < 0) - goto err; + /* we should have read the page header */ + Assert (state->readLen >= pageHeaderSize); /* skip over potential continuation data */ if (header->xlp_info & XLP_FIRST_IS_CONTRECORD) diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index b217ffa52f..47676bf800 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -685,8 +685,8 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum, void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength) { - const XLogRecPtr lastReadPage = (state->seg.ws_segno * - state->segcxt.ws_segsize + state->segoff); + const XLogRecPtr lastReadPage = state->seg.ws_segno * + state->segcxt.ws_segsize + state->readLen; Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0); Assert(wantLength <= XLOG_BLCKSZ); @@ -818,7 +818,7 @@ wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt, * 
exists for normal backends, so we have to do a check/sleep/repeat style of * loop for now. */ -int +bool read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page) { @@ -920,7 +920,8 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, else if (targetPagePtr + reqLen > read_upto) { /* not enough data there */ - return -1; + state->readLen = -1; + return false; } else { @@ -938,7 +939,8 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, WALReadRaiseError(&errinfo); /* number of valid bytes in the buffer */ - return count; + state->readLen = count; + return true; } /* diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 76ec3c7dd0..c66ea308d8 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -762,7 +762,7 @@ StartReplication(StartReplicationCmd *cmd) * which has to do a plain sleep/busy loop, because the walsender's latch gets * set every time WAL is flushed. 
*/ -static int +static bool logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page) { @@ -782,7 +782,10 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req /* fail if not (implies we are going to shut down) */ if (flushptr < targetPagePtr + reqLen) - return -1; + { + state->readLen = -1; + return false; + } if (targetPagePtr + XLOG_BLCKSZ <= flushptr) count = XLOG_BLCKSZ; /* more than one block available */ @@ -812,7 +815,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req XLByteToSeg(targetPagePtr, segno, sendCxt->ws_segsize); CheckXLogRemoved(segno, sendSeg->ws_tli); - return count; + state->readLen = count; + return true; } /* diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index eb61cb8803..78ee9f3faa 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -44,7 +44,7 @@ typedef struct XLogPageReadPrivate int tliIndex; } XLogPageReadPrivate; -static int SimpleXLogPageRead(XLogReaderState *xlogreader, +static bool SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf); @@ -227,7 +227,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, } /* XLogReader callback function, to read a WAL page */ -static int +static bool SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf) { @@ -282,7 +282,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, if (xlogreadfd < 0) { pg_log_error("could not open file \"%s\": %m", xlogfpath); - return -1; + xlogreader->readLen = -1; + return false; } } @@ -295,7 +296,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, if (lseek(xlogreadfd, (off_t) targetPageOff, SEEK_SET) < 0) { pg_log_error("could not seek in file \"%s\": %m", 
xlogfpath); - return -1; + xlogreader->readLen = -1; + return false; } @@ -308,13 +310,15 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, pg_log_error("could not read file \"%s\": read %d of %zu", xlogfpath, r, (Size) XLOG_BLCKSZ); - return -1; + xlogreader->readLen = -1; + return false; } Assert(targetSegNo == xlogreadsegno); xlogreader->seg.ws_tli = targetHistory[private->tliIndex].tli; - return XLOG_BLCKSZ; + xlogreader->readLen = XLOG_BLCKSZ; + return true; } /* diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index 279acfa044..443fe33599 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -322,7 +322,7 @@ WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt, /* * XLogReader read_page callback */ -static int +static bool WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetPtr, char *readBuff) { @@ -339,7 +339,8 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, else { private->endptr_reached = true; - return -1; + state->readLen = -1; + return false; } } @@ -364,7 +365,8 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, (Size) errinfo.wre_req); } - return count; + state->readLen = count; + return true; } /* diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 4582196e18..6ad953eea3 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -51,7 +51,7 @@ typedef struct WALSegmentContext typedef struct XLogReaderState XLogReaderState; /* Function type definition for the read_page callback */ -typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader, +typedef bool (*XLogPageReadCB) (XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, @@ -134,6 +134,20 @@ struct XLogReaderState XLogRecPtr ReadRecPtr; /* start of last record read */ XLogRecPtr EndRecPtr; /* 
end+1 of last record read */ + /* ---------------------------------------- + * Communication with page reader + * readBuf is XLOG_BLCKSZ bytes, valid up to at least readLen bytes. + * ---------------------------------------- + */ + /* variables to communicate with page reader */ + XLogRecPtr readPagePtr; /* page pointer to read */ + int32 readLen; /* bytes requested to reader, or actual bytes + * read by reader, which must be larger than + * the request, or -1 on error */ + TimeLineID readPageTLI; /* TLI for data currently in readBuf */ + char *readBuf; /* buffer to store data */ + bool page_verified; /* is the page on the buffer verified? */ + /* ---------------------------------------- * Decoded representation of current record @@ -160,13 +174,6 @@ struct XLogReaderState * ---------------------------------------- */ - /* - * Buffer for currently read page (XLOG_BLCKSZ bytes, valid up to at least - * readLen bytes) - */ - char *readBuf; - uint32 readLen; - /* last read XLOG position for data currently in readBuf */ WALSegmentContext segcxt; WALOpenSegment seg; diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index 5181a077d9..dc7d894e5d 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -47,7 +47,7 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, extern Relation CreateFakeRelcacheEntry(RelFileNode rnode); extern void FreeFakeRelcacheEntry(Relation fakerel); -extern int read_local_xlog_page(XLogReaderState *state, +extern bool read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page); -- 2.18.2 From 915f8e4af7c2aff284166eedd9c5d53bee6c4853 Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp> Date: Tue, 10 Sep 2019 12:58:27 +0900 Subject: [PATCH v7 2/4] Move page-reader out of XLogReadRecord This is the second step of removing callbacks from WAL record reader. 
Since it is essential to take in additional data while reading a record, the function has to ask the caller for new data while keeping its working state. Thus the function is turned into a state machine. --- src/backend/access/transam/twophase.c | 11 +- src/backend/access/transam/xlog.c | 54 +- src/backend/access/transam/xlogreader.c | 647 +++++++++++------- src/backend/access/transam/xlogutils.c | 12 +- src/backend/replication/logical/logical.c | 17 +- .../replication/logical/logicalfuncs.c | 8 +- src/backend/replication/slotfuncs.c | 8 +- src/backend/replication/walsender.c | 13 +- src/bin/pg_rewind/parsexlog.c | 71 +- src/bin/pg_waldump/pg_waldump.c | 24 +- src/include/access/xlogreader.h | 90 +-- src/include/access/xlogutils.h | 4 +- src/include/replication/logical.h | 9 +- 13 files changed, 579 insertions(+), 389 deletions(-) diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 5adf956f41..f5f6278880 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1330,8 +1330,7 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) XLogReaderState *xlogreader; char *errormsg; - xlogreader = XLogReaderAllocate(wal_segment_size, NULL, - &read_local_xlog_page, NULL); + xlogreader = XLogReaderAllocate(wal_segment_size, NULL); if (!xlogreader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), @@ -1339,7 +1338,13 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) errdetail("Failed while allocating a WAL reading processor."))); XLogBeginRead(xlogreader, lsn); - record = XLogReadRecord(xlogreader, &errormsg); + while (XLogReadRecord(xlogreader, &record, &errormsg) == + XLREAD_NEED_DATA) + { + if (!read_local_xlog_page(xlogreader)) + break; + } + if (record == NULL) ereport(ERROR, (errcode_for_file_access(), diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 51c409d00e..c138504f0d 100644 --- a/src/backend/access/transam/xlog.c +++ 
b/src/backend/access/transam/xlog.c @@ -819,13 +819,6 @@ static XLogSource readSource = XLOG_FROM_ANY; static XLogSource currentSource = XLOG_FROM_ANY; static bool lastSourceFailed = false; -typedef struct XLogPageReadPrivate -{ - int emode; - bool fetching_ckpt; /* are we fetching a checkpoint record? */ - bool randAccess; -} XLogPageReadPrivate; - /* * These variables track when we last obtained some WAL data to process, * and where we got it from. (XLogReceiptSource is initially the same as @@ -900,8 +893,8 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, XLogSource source, bool notfoundOk); static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source); -static bool XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *readBuf); +static bool XLogPageRead(XLogReaderState *xlogreader, + bool fetching_ckpt, int emode, bool randAccess); static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, bool fetching_ckpt, XLogRecPtr tliRecPtr); static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr); @@ -1212,8 +1205,7 @@ XLogInsertRecord(XLogRecData *rdata, appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len); if (!debug_reader) - debug_reader = XLogReaderAllocate(wal_segment_size, NULL, - NULL, NULL); + debug_reader = XLogReaderAllocate(wal_segment_size, NULL); if (!debug_reader) { @@ -4279,15 +4271,10 @@ CleanupBackupHistory(void) * record is available. 
*/ static XLogRecord * -ReadRecord(XLogReaderState *xlogreader, int emode, - bool fetching_ckpt) +ReadRecord(XLogReaderState *xlogreader, int emode, bool fetching_ckpt) { XLogRecord *record; - XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; - - private->fetching_ckpt = fetching_ckpt; - private->emode = emode; - private->randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr); + bool randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr); /* This is the first attempt to read this page. */ lastSourceFailed = false; @@ -4295,8 +4282,16 @@ ReadRecord(XLogReaderState *xlogreader, int emode, for (;;) { char *errormsg; + XLogReadRecordResult result; + + while ((result = XLogReadRecord(xlogreader, &record, &errormsg)) + == XLREAD_NEED_DATA) + { + if (!XLogPageRead(xlogreader, fetching_ckpt, emode, randAccess)) + break; + + } - record = XLogReadRecord(xlogreader, &errormsg); ReadRecPtr = xlogreader->ReadRecPtr; EndRecPtr = xlogreader->EndRecPtr; if (record == NULL) @@ -6257,7 +6252,6 @@ StartupXLOG(void) bool backupFromStandby = false; DBState dbstate_at_startup; XLogReaderState *xlogreader; - XLogPageReadPrivate private; bool fast_promoted = false; struct stat st; @@ -6413,9 +6407,7 @@ StartupXLOG(void) OwnLatch(&XLogCtl->recoveryWakeupLatch); /* Set up XLOG reader facility */ - MemSet(&private, 0, sizeof(XLogPageReadPrivate)); - xlogreader = XLogReaderAllocate(wal_segment_size, NULL, - &XLogPageRead, &private); + xlogreader = XLogReaderAllocate(wal_segment_size, NULL); if (!xlogreader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), @@ -11660,12 +11652,13 @@ CancelBackup(void) * sleep and retry. 
*/ static bool -XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetRecPtr, char *readBuf) +XLogPageRead(XLogReaderState *xlogreader, + bool fetching_ckpt, int emode, bool randAccess) { - XLogPageReadPrivate *private = - (XLogPageReadPrivate *) xlogreader->private_data; - int emode = private->emode; + char *readBuf = xlogreader->readBuf; + XLogRecPtr targetPagePtr = xlogreader->readPagePtr; + int reqLen = xlogreader->readLen; + XLogRecPtr targetRecPtr = xlogreader->ReadRecPtr; uint32 targetPageOff; XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY; int r; @@ -11708,8 +11701,8 @@ retry: receivedUpto < targetPagePtr + reqLen)) { if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen, - private->randAccess, - private->fetching_ckpt, + randAccess, + fetching_ckpt, targetRecPtr)) { if (readFile >= 0) @@ -11814,6 +11807,7 @@ retry: goto next_record_is_invalid; } + Assert(xlogreader->readPagePtr == targetPagePtr); xlogreader->readLen = readLen; return true; diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 2c1500443e..ad85f50c77 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -40,7 +40,7 @@ static bool XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen, bool header_inclusive); static void XLogReaderInvalReadState(XLogReaderState *state); static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, - XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess); + XLogRecPtr PrevRecPtr, XLogRecord *record); static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr); static void ResetDecoder(XLogReaderState *state); @@ -70,8 +70,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...) * Returns NULL if the xlogreader couldn't be allocated. 
*/ XLogReaderState * -XLogReaderAllocate(int wal_segment_size, const char *waldir, - XLogPageReadCB pagereadfunc, void *private_data) +XLogReaderAllocate(int wal_segment_size, const char *waldir) { XLogReaderState *state; @@ -102,9 +101,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size, waldir); - state->read_page = pagereadfunc; - /* system_identifier initialized to zeroes above */ - state->private_data = private_data; /* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */ state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1, MCXT_ALLOC_NO_OOM); @@ -239,6 +235,7 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) /* Begin at the passed-in record pointer. */ state->EndRecPtr = RecPtr; state->ReadRecPtr = InvalidXLogRecPtr; + state->readRecordState = XLREAD_NEXT_RECORD; } /* @@ -247,314 +244,452 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) * XLogBeginRead() or XLogFindNextRecord() must be called before the first call * to XLogReadRecord(). * - * If the read_page callback fails to read the requested data, NULL is - * returned. The callback is expected to have reported the error; errormsg - * is set to NULL. + * This function may return XLREAD_NEED_DATA several times before returning a + * result record. The caller shall read in some new data then call this + * function again with the same parameters. * - * If the reading fails for some other reason, NULL is also returned, and - * *errormsg is set to a string with details of the failure. + * When a record is successfully read, returns XLREAD_SUCCESS with result + * record being stored in *record. Otherwise *record is NULL. * - * The returned pointer (or *errormsg) points to an internal buffer that's - * valid until the next call to XLogReadRecord. + * Returns XLREAD_NEED_DATA if more data is needed to finish reading the + * current record. 
In that case, state->readPagePtr and state->readLen inform + * the desired position and minimum length of data needed. The caller shall + * read in the requested data and set state->readBuf to point to a buffer + * containing it. The caller must also set state->readPageTLI and + * state->readLen to indicate the timeline that it was read from, and the + * length of data that is now available (which must be >= given readLen), + * respectively. + * + * If invalid data is encountered, returns XLREAD_FAIL with *record being set to + * NULL. *errormsg is set to a string with details of the failure. + * The returned pointer (or *errormsg) points to an internal buffer that's valid + * until the next call to XLogReadRecord. + * + * + * This function runs a state machine consisting of the following states. + * + * XLREAD_NEXT_RECORD : + * The initial state, if called with valid RecPtr, try to read a record at + * that position. If invalid RecPtr is given try to read a record just after + * the last one previously read. + * This state ends after setting ReadRecPtr. Then goes to XLREAD_TOT_LEN. + * + * XLREAD_TOT_LEN: + * Examining record header. Ends after reading record total + * length. recordRemainLen and recordGotLen are initialized. + * + * XLREAD_FIRST_FRAGMENT: + * Reading the first fragment. Ends with finishing reading a single + * record. Goes to XLREAD_NEXT_RECORD if that's all or + * XLREAD_CONTINUATION if we have continuation. + + * XLREAD_CONTINUATION: + * Reading continuation of record. Ends with finishing the whole record then + * goes to XLREAD_NEXT_RECORD. During this state, recordRemainLen indicates + * how much is left and readRecordBuf holds the partially assembled + * record. recordContRecPtr points to the beginning of the next page where to + * continue. + * + * If wrong data is found in any state, the state machine stays at the current + * state. This behavior allows reading of a record to continue while switching + * among different sources during streaming replication.
*/ -XLogRecord * -XLogReadRecord(XLogReaderState *state, char **errormsg) +XLogReadRecordResult +XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) { - XLogRecPtr RecPtr; - XLogRecord *record; - XLogRecPtr targetPagePtr; - bool randAccess; - uint32 len, - total_len; - uint32 targetRecOff; - uint32 pageHeaderSize; - bool gotheader; + XLogRecord *prec; - /* - * randAccess indicates whether to verify the previous-record pointer of - * the record we're reading. We only do this if we're reading - * sequentially, which is what we initially assume. - */ - randAccess = false; + *record = NULL; /* reset error state */ *errormsg = NULL; state->errormsg_buf[0] = '\0'; - ResetDecoder(state); + switch (state->readRecordState) + { + case XLREAD_NEXT_RECORD: + ResetDecoder(state); - RecPtr = state->EndRecPtr; + if (state->ReadRecPtr != InvalidXLogRecPtr) + { + /* read the record after the one we just read */ - if (state->ReadRecPtr != InvalidXLogRecPtr) - { - /* read the record after the one we just read */ + /* + * EndRecPtr is pointing to end+1 of the previous WAL record. + * If we're at a page boundary, no more records can fit on the + * current page. We must skip over the page header, but we + * can't do that until we've read in the page, since the header + * size is variable. + */ + state->PrevRecPtr = state->ReadRecPtr; + state->ReadRecPtr = state->EndRecPtr; + } + else + { + /* + * Caller supplied a position to start at. + * + * In this case, EndRecPtr should already be pointing to a + * valid record starting position. + */ + Assert(XRecOffIsValid(state->EndRecPtr)); + state->ReadRecPtr = state->EndRecPtr; - /* - * EndRecPtr is pointing to end+1 of the previous WAL record. If - * we're at a page boundary, no more records can fit on the current - * page. We must skip over the page header, but we can't do that until - * we've read in the page, since the header size is variable. - */ - } - else - { - /* - * Caller supplied a position to start at. 
- * - * In this case, EndRecPtr should already be pointing to a valid - * record starting position. - */ - Assert(XRecOffIsValid(RecPtr)); - randAccess = true; - } + /* + * We cannot verify the previous-record pointer when we're + * seeking to a particular record. Reset PrevRecPtr so that we + * won't try doing that. + */ + state->PrevRecPtr = InvalidXLogRecPtr; + state->EndRecPtr = InvalidXLogRecPtr; /* to be tidy */ + } - state->currRecPtr = RecPtr; + state->record_verified = false; + state->readRecordState = XLREAD_TOT_LEN; + /* fall through */ - targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ); - targetRecOff = RecPtr % XLOG_BLCKSZ; + case XLREAD_TOT_LEN: + { + uint32 total_len; + uint32 pageHeaderSize; + XLogRecPtr targetPagePtr; + uint32 targetRecOff; + XLogPageHeader pageHeader; - /* - * Read the page containing the record into state->readBuf. Request enough - * byte to cover the whole record header, or at least the part of it that - * fits on the same page. - */ - while (XLogNeedData(state, targetPagePtr, - Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ), - targetRecOff != 0)) - { - if (!state->read_page(state, state->readPagePtr, state->readLen, - RecPtr, state->readBuf)) - break; - } + targetPagePtr = + state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ); + targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ; - if (!state->page_verified) - goto err; + /* + * Check if we have enough data. For the first record in the page, + * the requesting length doesn't contain page header. + */ + if (XLogNeedData(state, targetPagePtr, + Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ), + targetRecOff != 0)) + return XLREAD_NEED_DATA; - /* - * We have at least the page header, so we can examine it now. - */ - pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); - if (targetRecOff == 0) - { - /* - * At page start, so skip over page header. 
- */ - RecPtr += pageHeaderSize; - targetRecOff = pageHeaderSize; - } - else if (targetRecOff < pageHeaderSize) - { - report_invalid_record(state, "invalid record offset at %X/%X", - (uint32) (RecPtr >> 32), (uint32) RecPtr); - goto err; - } + /* error out if caller supplied bogus page */ + if (!state->page_verified) + goto err; - if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) && - targetRecOff == pageHeaderSize) - { - report_invalid_record(state, "contrecord is requested by %X/%X", - (uint32) (RecPtr >> 32), (uint32) RecPtr); - goto err; - } + /* examine page header now. */ + pageHeaderSize = + XLogPageHeaderSize((XLogPageHeader) state->readBuf); + if (targetRecOff == 0) + { + /* At page start, so skip over page header. */ + state->ReadRecPtr += pageHeaderSize; + targetRecOff = pageHeaderSize; + } + else if (targetRecOff < pageHeaderSize) + { + report_invalid_record(state, "invalid record offset at %X/%X", + (uint32) (state->ReadRecPtr >> 32), + (uint32) state->ReadRecPtr); + goto err; + } - /* XLogNeedData has verified the page header */ - Assert(pageHeaderSize <= state->readLen); + pageHeader = (XLogPageHeader) state->readBuf; + if ((pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD) && + targetRecOff == pageHeaderSize) + { + report_invalid_record(state, "contrecord is requested by %X/%X", + (uint32) (state->ReadRecPtr >> 32), + (uint32) state->ReadRecPtr); + goto err; + } - /* - * Read the record length. - * - * NB: Even though we use an XLogRecord pointer here, the whole record - * header might not fit on this page. xl_tot_len is the first field of the - * struct, so it must be on this page (the records are MAXALIGNed), but we - * cannot access any other fields until we've verified that we got the - * whole header. 
- */ - record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ); - total_len = record->xl_tot_len; + /* XLogNeedData has verified the page header */ + Assert(pageHeaderSize <= state->readLen); - /* - * If the whole record header is on this page, validate it immediately. - * Otherwise do just a basic sanity check on xl_tot_len, and validate the - * rest of the header after reading it from the next page. The xl_tot_len - * check is necessary here to ensure that we enter the "Need to reassemble - * record" code path below; otherwise we might fail to apply - * ValidXLogRecordHeader at all. - */ - if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) - { - if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record, - randAccess)) - goto err; - gotheader = true; - } - else - { - /* XXX: more validation should be done here */ - if (total_len < SizeOfXLogRecord) - { - report_invalid_record(state, - "invalid record length at %X/%X: wanted %u, got %u", - (uint32) (RecPtr >> 32), (uint32) RecPtr, - (uint32) SizeOfXLogRecord, total_len); - goto err; - } - gotheader = false; - } + /* + * Read the record length. + * + * NB: Even though we use an XLogRecord pointer here, the whole + * record header might not fit on this page. xl_tot_len is the first + * field of the struct, so it must be on this page (the records are + * MAXALIGNed), but we cannot access any other fields until we've + * verified that we got the whole header. + */ + prec = (XLogRecord *) (state->readBuf + + state->ReadRecPtr % XLOG_BLCKSZ); + total_len = prec->xl_tot_len; - len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ; - if (total_len > len) - { - /* Need to reassemble record */ - char *contdata; - XLogPageHeader pageHeader; - char *buffer; - uint32 gotlen; + /* + * If the whole record header is on this page, validate it + * immediately. Otherwise do just a basic sanity check on + * xl_tot_len, and validate the rest of the header after reading it + * from the next page. 
The xl_tot_len check is necessary here to + * ensure that we enter the XLREAD_CONTINUATION state below; + * otherwise we might fail to apply ValidXLogRecordHeader at all. + */ + if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) + { + if (!ValidXLogRecordHeader(state, state->ReadRecPtr, + state->PrevRecPtr, prec)) + goto err; - /* - * Enlarge readRecordBuf as needed. - */ - if (total_len > state->readRecordBufSize && - !allocate_recordbuf(state, total_len)) - { - /* We treat this as a "bogus data" condition */ - report_invalid_record(state, "record length %u at %X/%X too long", - total_len, - (uint32) (RecPtr >> 32), (uint32) RecPtr); - goto err; - } + state->record_verified = true; + } + else + { + /* XXX: more validation should be done here */ + if (total_len < SizeOfXLogRecord) + { + report_invalid_record(state, + "invalid record length at %X/%X: wanted %u, got %u", + (uint32) (state->ReadRecPtr >> 32), + (uint32) state->ReadRecPtr, + (uint32) SizeOfXLogRecord, total_len); + goto err; + } + } - /* Copy the first fragment of the record from the first page. */ - memcpy(state->readRecordBuf, - state->readBuf + RecPtr % XLOG_BLCKSZ, len); - buffer = state->readRecordBuf + len; - gotlen = len; + /* + * Wait for the rest of the record, or the part of the record that + * fit on the first page if crossed a page boundary, to become + * available. 
+ */ + state->recordGotLen = 0; + state->recordRemainLen = total_len; + state->readRecordState = XLREAD_FIRST_FRAGMENT; + } + /* fall through */ - do + case XLREAD_FIRST_FRAGMENT: { - int rest_len = total_len - gotlen; + uint32 total_len = state->recordRemainLen; + uint32 request_len; + uint32 record_len; + XLogRecPtr targetPagePtr; + uint32 targetRecOff; - /* Calculate pointer to beginning of next page */ - targetPagePtr += XLOG_BLCKSZ; + /* + * Wait for the rest of the record on the first page to become + * available + */ + targetPagePtr = + state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ); + targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ; - /* Wait for the next page to become available */ - while (XLogNeedData(state, targetPagePtr, - Min(rest_len, XLOG_BLCKSZ), - false)) - { - if (!state->read_page(state, state->readPagePtr, state->readLen, - state->ReadRecPtr, state->readBuf)) - break; - } + request_len = Min(targetRecOff + total_len, XLOG_BLCKSZ); + record_len = request_len - targetRecOff; + + /* ReadRecPtr contains page header */ + Assert (targetRecOff != 0); + if (XLogNeedData(state, targetPagePtr, request_len, true)) + return XLREAD_NEED_DATA; + /* error out if caller supplied bogus page */ if (!state->page_verified) goto err; - Assert(SizeOfXLogShortPHD <= state->readLen); + prec = (XLogRecord *) (state->readBuf + targetRecOff); - /* Check that the continuation on next page looks valid */ - pageHeader = (XLogPageHeader) state->readBuf; - if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD)) + /* validate record header if not yet */ + if (!state->record_verified && record_len >= SizeOfXLogRecord) { - report_invalid_record(state, - "there is no contrecord flag at %X/%X", - (uint32) (RecPtr >> 32), (uint32) RecPtr); - goto err; + if (!ValidXLogRecordHeader(state, state->ReadRecPtr, + state->PrevRecPtr, prec)) + goto err; + + state->record_verified = true; + } + + + if (total_len == record_len) + { + /* Record does not cross a page boundary */ + 
Assert(state->record_verified); + + if (!ValidXLogRecord(state, prec, state->ReadRecPtr)) + goto err; + + state->record_verified = true; /* to be tidy */ + + /* We already checked the header earlier */ + state->EndRecPtr = state->ReadRecPtr + MAXALIGN(record_len); + + *record = prec; + state->readRecordState = XLREAD_NEXT_RECORD; + break; } /* - * Cross-check that xlp_rem_len agrees with how much of the record - * we expect there to be left. + * The record continues on the next page. Need to reassemble + * record */ - if (pageHeader->xlp_rem_len == 0 || - total_len != (pageHeader->xlp_rem_len + gotlen)) + Assert(total_len > record_len); + + /* Enlarge readRecordBuf as needed. */ + if (total_len > state->readRecordBufSize && + !allocate_recordbuf(state, total_len)) { + /* We treat this as a "bogus data" condition */ report_invalid_record(state, - "invalid contrecord length %u at %X/%X", - pageHeader->xlp_rem_len, - (uint32) (RecPtr >> 32), (uint32) RecPtr); + "record length %u at %X/%X too long", + total_len, + (uint32) (state->ReadRecPtr >> 32), + (uint32) state->ReadRecPtr); goto err; } - /* Append the continuation from this page to the buffer */ - pageHeaderSize = XLogPageHeaderSize(pageHeader); + /* Copy the first fragment of the record from the first page. 
*/ + memcpy(state->readRecordBuf, state->readBuf + targetRecOff, + record_len); + state->recordGotLen += record_len; + state->recordRemainLen -= record_len; - Assert(pageHeaderSize <= state->readLen); + /* Calculate pointer to beginning of next page */ + state->recordContRecPtr = state->ReadRecPtr + record_len; + Assert(state->recordContRecPtr % XLOG_BLCKSZ == 0); - contdata = (char *) state->readBuf + pageHeaderSize; - len = XLOG_BLCKSZ - pageHeaderSize; - if (pageHeader->xlp_rem_len < len) - len = pageHeader->xlp_rem_len; + state->readRecordState = XLREAD_CONTINUATION; + } + /* fall through */ - Assert (pageHeaderSize + len <= state->readLen); - memcpy(buffer, (char *) contdata, len); - buffer += len; - gotlen += len; + case XLREAD_CONTINUATION: + { + XLogPageHeader pageHeader; + uint32 pageHeaderSize; + XLogRecPtr targetPagePtr; + + /* we enter this state only if we haven't read the whole record. */ + Assert (state->recordRemainLen > 0); - /* If we just reassembled the record header, validate it. 
*/ - if (!gotheader) + while(state->recordRemainLen > 0) { - record = (XLogRecord *) state->readRecordBuf; - if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, - record, randAccess)) + char *contdata; + uint32 request_len; + uint32 record_len; + + /* Wait for the next page to become available */ + targetPagePtr = state->recordContRecPtr; + + /* this request contains page header */ + Assert (targetPagePtr != 0); + if (XLogNeedData(state, targetPagePtr, + Min(state->recordRemainLen, XLOG_BLCKSZ), + false)) + return XLREAD_NEED_DATA; + + if (!state->page_verified) goto err; - gotheader = true; - } - } while (gotlen < total_len); - Assert(gotheader); + Assert(SizeOfXLogShortPHD <= state->readLen); - record = (XLogRecord *) state->readRecordBuf; - if (!ValidXLogRecord(state, record, RecPtr)) - goto err; + /* Check that the continuation on next page looks valid */ + pageHeader = (XLogPageHeader) state->readBuf; + if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD)) + { + report_invalid_record( + state, + "there is no contrecord flag at %X/%X reading %X/%X", + (uint32) (state->recordContRecPtr >> 32), + (uint32) state->recordContRecPtr, + (uint32) (state->ReadRecPtr >> 32), + (uint32) state->ReadRecPtr); + goto err; + } - pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); - state->ReadRecPtr = RecPtr; - state->EndRecPtr = targetPagePtr + pageHeaderSize - + MAXALIGN(pageHeader->xlp_rem_len); - } - else - { - /* Wait for the record data to become available */ - while (XLogNeedData(state, targetPagePtr, - Min(targetRecOff + total_len, XLOG_BLCKSZ), true)) - { - if (!state->read_page(state, state->readPagePtr, state->readLen, - state->ReadRecPtr, state->readBuf)) - break; - } + /* + * Cross-check that xlp_rem_len agrees with how much of the + * record we expect there to be left. 
+ */ + if (pageHeader->xlp_rem_len == 0 || + pageHeader->xlp_rem_len != state->recordRemainLen) + { + report_invalid_record( + state, + "invalid contrecord length %u at %X/%X reading %X/%X, expected %u", + pageHeader->xlp_rem_len, + (uint32) (state->recordContRecPtr >> 32), + (uint32) state->recordContRecPtr, + (uint32) (state->ReadRecPtr >> 32), + (uint32) state->ReadRecPtr, + state->recordRemainLen); + goto err; + } - if (!state->page_verified) - goto err; + /* Append the continuation from this page to the buffer */ + pageHeaderSize = XLogPageHeaderSize(pageHeader); - /* Record does not cross a page boundary */ - if (!ValidXLogRecord(state, record, RecPtr)) - goto err; + /* + * XLogNeedData should have ensured that the whole page header + * was read + */ + Assert(state->readLen >= pageHeaderSize); + + contdata = (char *) state->readBuf + pageHeaderSize; + record_len = XLOG_BLCKSZ - pageHeaderSize; + if (pageHeader->xlp_rem_len < record_len) + record_len = pageHeader->xlp_rem_len; + + request_len = record_len + pageHeaderSize; - state->EndRecPtr = RecPtr + MAXALIGN(total_len); + /* XLogNeedData should have ensured all needed data was read */ + Assert (state->readLen >= request_len); - state->ReadRecPtr = RecPtr; + memcpy(state->readRecordBuf + state->recordGotLen, + (char *) contdata, record_len); + state->recordGotLen += record_len; + state->recordRemainLen -= record_len; + + /* If we just reassembled the record header, validate it. 
*/ + if (!state->record_verified) + { + Assert(state->recordGotLen >= SizeOfXLogRecord); + if (!ValidXLogRecordHeader(state, state->ReadRecPtr, + state->PrevRecPtr, + (XLogRecord *) state->readRecordBuf)) + goto err; + + state->record_verified = true; + } + + /* Calculate pointer to beginning of next page, and continue */ + state->recordContRecPtr += XLOG_BLCKSZ; + } + + /* targetPagePtr is pointing the last-read page here */ + prec = (XLogRecord *) state->readRecordBuf; + if (!ValidXLogRecord(state, prec, state->ReadRecPtr)) + goto err; + + pageHeaderSize = + XLogPageHeaderSize((XLogPageHeader) state->readBuf); + state->EndRecPtr = targetPagePtr + pageHeaderSize + + MAXALIGN(pageHeader->xlp_rem_len); + + *record = prec; + state->readRecordState = XLREAD_NEXT_RECORD; + break; + } } /* * Special processing if it's an XLOG SWITCH record */ - if (record->xl_rmid == RM_XLOG_ID && - (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH) + if ((*record)->xl_rmid == RM_XLOG_ID && + ((*record)->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH) { /* Pretend it extends to end of segment */ state->EndRecPtr += state->segcxt.ws_segsize - 1; state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize); } - if (DecodeXLogRecord(state, record, errormsg)) - return record; - else - return NULL; + Assert (!*record || state->readLen >= 0); + if (DecodeXLogRecord(state, *record, errormsg)) + return XLREAD_SUCCESS; + + *record = NULL; + return XLREAD_FAIL; err: /* - * Invalidate the read state. We might read from a different source after + * Invalidate the read page. We might read from a different source after * failure. */ XLogReaderInvalReadState(state); @@ -562,7 +697,8 @@ err: if (state->errormsg_buf[0] != '\0') *errormsg = state->errormsg_buf; - return NULL; + *record = NULL; + return XLREAD_FAIL; } /* @@ -715,11 +851,12 @@ XLogReaderInvalReadState(XLogReaderState *state) * * This is just a convenience subroutine to avoid duplicated code in * XLogReadRecord. 
It's not intended for use from anywhere else. + * + * If PrevRecPtr is valid, the xl_prev is cross-checked with it. */ static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, - XLogRecPtr PrevRecPtr, XLogRecord *record, - bool randAccess) + XLogRecPtr PrevRecPtr, XLogRecord *record) { if (record->xl_tot_len < SizeOfXLogRecord) { @@ -737,7 +874,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, (uint32) RecPtr); return false; } - if (randAccess) + if (PrevRecPtr == InvalidXLogRecPtr) { /* * We can't exactly verify the prev-link, but surely it should be less @@ -969,11 +1106,14 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, * XLogReadRecord() will read the next valid record. */ XLogRecPtr -XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) +XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr, + XLogFindNextRecordCB read_page, void *private) { XLogRecPtr tmpRecPtr; XLogRecPtr found = InvalidXLogRecPtr; XLogPageHeader header; + XLogRecord *record; + XLogReadRecordResult result; char *errormsg; Assert(!XLogRecPtrIsInvalid(RecPtr)); @@ -1006,8 +1146,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) while(XLogNeedData(state, targetPagePtr, targetRecOff, targetRecOff != 0)) { - if (!state->read_page(state, state->readPagePtr, state->readLen, - state->ReadRecPtr, state->readBuf)) + if (!read_page(state, private)) break; } @@ -1059,8 +1198,16 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) * or we just jumped over the remaining data of a continuation.
*/ XLogBeginRead(state, tmpRecPtr); - while (XLogReadRecord(state, &errormsg) != NULL) + while ((result = XLogReadRecord(state, &record, &errormsg)) != + XLREAD_FAIL) { + if (result == XLREAD_NEED_DATA) + { + if (!read_page(state, private)) + goto err; + continue; + } + /* past the record we've found, break out */ if (RecPtr <= state->ReadRecPtr) { diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 47676bf800..b2c2abc826 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -685,8 +685,7 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum, void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength) { - const XLogRecPtr lastReadPage = state->seg.ws_segno * - state->segcxt.ws_segsize + state->readLen; + const XLogRecPtr lastReadPage = state->readPagePtr; Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0); Assert(wantLength <= XLOG_BLCKSZ); @@ -701,7 +700,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa * current TLI has since become historical. */ if (lastReadPage == wantPage && - state->readLen != 0 && + state->page_verified && lastReadPage + state->readLen >= wantPage + Min(wantLength, XLOG_BLCKSZ - 1)) return; @@ -819,9 +818,11 @@ wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt, * loop for now. 
*/ bool -read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *cur_page) +read_local_xlog_page(XLogReaderState *state) { + XLogRecPtr targetPagePtr = state->readPagePtr; + int reqLen = state->readLen; + char *cur_page = state->readBuf; XLogRecPtr read_upto, loc; TimeLineID tli; @@ -939,6 +940,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, WALReadRaiseError(&errinfo); /* number of valid bytes in the buffer */ + state->readPagePtr = targetPagePtr; state->readLen = count; return true; } diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 5adf253583..7782e3ad2f 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -120,7 +120,7 @@ StartupDecodingContext(List *output_plugin_options, TransactionId xmin_horizon, bool need_full_snapshot, bool fast_forward, - XLogPageReadCB read_page, + LogicalDecodingXLogReadPageCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress) @@ -169,11 +169,12 @@ StartupDecodingContext(List *output_plugin_options, ctx->slot = slot; - ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, ctx); + ctx->reader = XLogReaderAllocate(wal_segment_size, NULL); if (!ctx->reader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"))); + ctx->read_page = read_page; ctx->reorder = ReorderBufferAllocate(); ctx->snapshot_builder = @@ -230,7 +231,7 @@ CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, - XLogPageReadCB read_page, + LogicalDecodingXLogReadPageCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress) @@ -372,7 +373,7 @@ LogicalDecodingContext * 
CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, - XLogPageReadCB read_page, + LogicalDecodingXLogReadPageCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress) @@ -479,7 +480,13 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) char *err = NULL; /* the read_page callback waits for new WAL */ - record = XLogReadRecord(ctx->reader, &err); + while (XLogReadRecord(ctx->reader, &record, &err) == + XLREAD_NEED_DATA) + { + if (!ctx->read_page(ctx->reader)) + break; + } + if (err) elog(ERROR, "%s", err); if (!record) diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 04510094a8..a28e05b8aa 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -269,7 +269,13 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin XLogRecord *record; char *errm = NULL; - record = XLogReadRecord(ctx->reader, &errm); + while (XLogReadRecord(ctx->reader, &record, &errm) == + XLREAD_NEED_DATA) + { + if (!ctx->read_page(ctx->reader)) + break; + } + if (errm) elog(ERROR, "%s", errm); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index ce0c9127bc..572c4fadda 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -449,7 +449,13 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) * Read records. No changes are generated in fast_forward mode, * but snapbuilder/slot statuses are updated properly. 
*/ - record = XLogReadRecord(ctx->reader, &errm); + while (XLogReadRecord(ctx->reader, &record, &errm) == + XLREAD_NEED_DATA) + { + if (!ctx->read_page(ctx->reader)) + break; + } + if (errm) elog(ERROR, "%s", errm); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index c66ea308d8..b165900d4f 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -763,9 +763,11 @@ StartReplication(StartReplicationCmd *cmd) * set every time WAL is flushed. */ static bool -logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetRecPtr, char *cur_page) +logical_read_xlog_page(XLogReaderState *state) { + XLogRecPtr targetPagePtr = state->readPagePtr; + int reqLen = state->readLen; + char *cur_page = state->readBuf; XLogRecPtr flushptr; int count; WALReadError errinfo; @@ -2801,7 +2803,12 @@ XLogSendLogical(void) */ WalSndCaughtUp = false; - record = XLogReadRecord(logical_decoding_ctx->reader, &errm); + while (XLogReadRecord(logical_decoding_ctx->reader, &record, &errm) == + XLREAD_NEED_DATA) + { + if (!logical_decoding_ctx->read_page(logical_decoding_ctx->reader)) + break; + } /* xlog record was invalid */ if (errm != NULL) diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 78ee9f3faa..55337b65f1 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -39,14 +39,8 @@ static int xlogreadfd = -1; static XLogSegNo xlogreadsegno = -1; static char xlogfpath[MAXPGPATH]; -typedef struct XLogPageReadPrivate -{ - int tliIndex; -} XLogPageReadPrivate; - -static bool SimpleXLogPageRead(XLogReaderState *xlogreader, - XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *readBuf); +static bool SimpleXLogPageRead(XLogReaderState *xlogreader, + const char *datadir, int *tliIndex); /* * Read WAL from the datadir/pg_wal, starting from 'startpoint' on timeline @@ -60,18 +54,20 @@ extractPageMap(const char 
*datadir, XLogRecPtr startpoint, int tliIndex, XLogRecord *record; XLogReaderState *xlogreader; char *errormsg; - XLogPageReadPrivate private; - private.tliIndex = tliIndex; - xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead, - &private); + xlogreader = XLogReaderAllocate(WalSegSz, datadir); if (xlogreader == NULL) pg_fatal("out of memory"); XLogBeginRead(xlogreader, startpoint); do { - record = XLogReadRecord(xlogreader, &errormsg); + while (XLogReadRecord(xlogreader, &record, &errormsg) == + XLREAD_NEED_DATA) + { + if (!SimpleXLogPageRead(xlogreader, datadir, &tliIndex)) + break; + } if (record == NULL) { @@ -108,17 +104,19 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex) XLogRecord *record; XLogReaderState *xlogreader; char *errormsg; - XLogPageReadPrivate private; XLogRecPtr endptr; - private.tliIndex = tliIndex; - xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead, - &private); + xlogreader = XLogReaderAllocate(WalSegSz, datadir); if (xlogreader == NULL) pg_fatal("out of memory"); XLogBeginRead(xlogreader, ptr); - record = XLogReadRecord(xlogreader, &errormsg); + while (XLogReadRecord(xlogreader, &record, &errormsg) == + XLREAD_NEED_DATA) + { + if (!SimpleXLogPageRead(xlogreader, datadir, &tliIndex)) + break; + } if (record == NULL) { if (errormsg) @@ -153,7 +151,6 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, XLogRecPtr searchptr; XLogReaderState *xlogreader; char *errormsg; - XLogPageReadPrivate private; /* * The given fork pointer points to the end of the last common record, @@ -169,9 +166,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, forkptr += SizeOfXLogShortPHD; } - private.tliIndex = tliIndex; - xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead, - &private); + xlogreader = XLogReaderAllocate(WalSegSz, datadir); if (xlogreader == NULL) pg_fatal("out of memory"); @@ -181,7 +176,12 @@ findLastCheckpoint(const 
char *datadir, XLogRecPtr forkptr, int tliIndex, uint8 info; XLogBeginRead(xlogreader, searchptr); - record = XLogReadRecord(xlogreader, &errormsg); + while (XLogReadRecord(xlogreader, &record, &errormsg) == + XLREAD_NEED_DATA) + { + if (!SimpleXLogPageRead(xlogreader, datadir, &tliIndex)) + break; + } if (record == NULL) { @@ -228,10 +228,11 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, /* XLogReader callback function, to read a WAL page */ static bool -SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *readBuf) +SimpleXLogPageRead(XLogReaderState *xlogreader, const char *datadir, + int *tliIndex) { - XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; + XLogRecPtr targetPagePtr = xlogreader->readPagePtr; + char *readBuf = xlogreader->readBuf; uint32 targetPageOff; XLogRecPtr targetSegEnd; XLogSegNo targetSegNo; @@ -264,14 +265,14 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, * be done both forward and backward, consider also switching timeline * accordingly. 
*/ - while (private->tliIndex < targetNentries - 1 && - targetHistory[private->tliIndex].end < targetSegEnd) - private->tliIndex++; - while (private->tliIndex > 0 && - targetHistory[private->tliIndex].begin >= targetSegEnd) - private->tliIndex--; - - XLogFileName(xlogfname, targetHistory[private->tliIndex].tli, + while (*tliIndex < targetNentries - 1 && + targetHistory[*tliIndex].end < targetSegEnd) + (*tliIndex)++; + while (*tliIndex > 0 && + targetHistory[*tliIndex].begin >= targetSegEnd) + (*tliIndex)--; + + XLogFileName(xlogfname, targetHistory[*tliIndex].tli, xlogreadsegno, WalSegSz); snprintf(xlogfpath, MAXPGPATH, "%s/" XLOGDIR "/%s", @@ -316,7 +317,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, Assert(targetSegNo == xlogreadsegno); - xlogreader->seg.ws_tli = targetHistory[private->tliIndex].tli; + xlogreader->seg.ws_tli = targetHistory[*tliIndex].tli; xlogreader->readLen = XLOG_BLCKSZ; return true; } diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index 443fe33599..0cd0d132af 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -323,10 +323,12 @@ WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt, * XLogReader read_page callback */ static bool -WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetPtr, char *readBuff) +WALDumpReadPage(XLogReaderState *state, void *priv) { - XLogDumpPrivate *private = state->private_data; + XLogRecPtr targetPagePtr = state->readPagePtr; + int reqLen = state->readLen; + char *readBuff = state->readBuf; + XLogDumpPrivate *private = (XLogDumpPrivate *) priv; int count = XLOG_BLCKSZ; WALReadError errinfo; @@ -365,6 +367,7 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, (Size) errinfo.wre_req); } + Assert(count >= state->readLen); state->readLen = count; return true; } @@ -1026,13 +1029,14 @@ main(int argc, char **argv) /* done with argument 
parsing, do the actual work */ /* we have everything we need, start reading */ - xlogreader_state = XLogReaderAllocate(WalSegSz, waldir, WALDumpReadPage, - &private); + xlogreader_state = XLogReaderAllocate(WalSegSz, waldir); + if (!xlogreader_state) fatal_error("out of memory"); /* first find a valid recptr to start from */ - first_record = XLogFindNextRecord(xlogreader_state, private.startptr); + first_record = XLogFindNextRecord(xlogreader_state, private.startptr, + &WALDumpReadPage, (void*) &private); if (first_record == InvalidXLogRecPtr) fatal_error("could not find a valid record after %X/%X", @@ -1056,7 +1060,13 @@ main(int argc, char **argv) for (;;) { /* try to read the next record */ - record = XLogReadRecord(xlogreader_state, &errormsg); + while (XLogReadRecord(xlogreader_state, &record, &errormsg) == + XLREAD_NEED_DATA) + { + if (!WALDumpReadPage(xlogreader_state, (void *) &private)) + break; + } + if (!record) { if (!config.follow || private.endptr_reached) diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 6ad953eea3..e59e42bee3 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -50,13 +50,6 @@ typedef struct WALSegmentContext typedef struct XLogReaderState XLogReaderState; -/* Function type definition for the read_page callback */ -typedef bool (*XLogPageReadCB) (XLogReaderState *xlogreader, - XLogRecPtr targetPagePtr, - int reqLen, - XLogRecPtr targetRecPtr, - char *readBuf); - typedef struct { /* Is this block ref in use? */ @@ -86,6 +79,29 @@ typedef struct uint16 data_bufsz; } DecodedBkpBlock; +/* Return code from XLogReadRecord */ +typedef enum XLogReadRecordResult +{ + XLREAD_SUCCESS, /* record is successfully read */ + XLREAD_NEED_DATA, /* need more data. see XLogReadRecord. */ + XLREAD_FAIL /* failed during reading a record */ +} XLogReadRecordResult; + +/* + * internal state of XLogReadRecord + * + * XLogReadState runs a state machine while reading a record. 
These states + * are not seen outside the function. Each state may repeat several times, + * exiting to request new data from the caller. See the comment of XLogReadRecord + * for details. + */ +typedef enum XLogReadRecordState { + XLREAD_NEXT_RECORD, + XLREAD_TOT_LEN, + XLREAD_FIRST_FRAGMENT, + XLREAD_CONTINUATION +} XLogReadRecordState; + struct XLogReaderState { /* ---------------------------------------- @@ -93,46 +109,20 @@ struct XLogReaderState * ---------------------------------------- */ - /* - * Data input callback (mandatory). - * - * This callback shall read at least reqLen valid bytes of the xlog page - * starting at targetPagePtr, and store them in readBuf. The callback - * shall return the number of bytes read (never more than XLOG_BLCKSZ), or - * -1 on failure. The callback shall sleep, if necessary, to wait for the - * requested bytes to become available. The callback will not be invoked - * again for the same page unless more than the returned number of bytes - * are needed. - * - * targetRecPtr is the position of the WAL record we're reading. Usually - * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs - * to read and verify the page or segment header, before it reads the - * actual WAL record it's interested in. In that case, targetRecPtr can - * be used to determine which timeline to read the page from. - * - * The callback shall set ->seg.ws_tli to the TLI of the file the page was - * read from. - */ - XLogPageReadCB read_page; - /* * System identifier of the xlog files we're about to read. Set to zero * (the default value) if unknown or unimportant. */ uint64 system_identifier; - /* - * Opaque data for callbacks to use. Not used by XLogReader. - */ - void *private_data; - /* * Start and end point of last record read. EndRecPtr is also used as the * position to read next. Calling XLogBeginRead() sets EndRecPtr to the * starting position and ReadRecPtr to invalid.
*/ - XLogRecPtr ReadRecPtr; /* start of last record read */ + XLogRecPtr ReadRecPtr; /* start of last record read or being read */ XLogRecPtr EndRecPtr; /* end+1 of last record read */ + XLogRecPtr PrevRecPtr; /* start of previous record read */ /* ---------------------------------------- * Communication with page reader @@ -146,7 +136,9 @@ struct XLogReaderState * the request, or -1 on error */ TimeLineID readPageTLI; /* TLI for data currently in readBuf */ char *readBuf; /* buffer to store data */ - bool page_verified; /* is the page on the buffer verified? */ + bool page_verified; /* is the page header on the buffer verified? */ + bool record_verified;/* is the current record header verified? */ + /* ---------------------------------------- @@ -186,8 +178,6 @@ struct XLogReaderState XLogRecPtr latestPagePtr; TimeLineID latestPageTLI; - /* beginning of the WAL record being read. */ - XLogRecPtr currRecPtr; /* timeline to read it from, 0 if a lookup is required */ TimeLineID currTLI; @@ -214,15 +204,22 @@ struct XLogReaderState char *readRecordBuf; uint32 readRecordBufSize; + /* + * XLogReadRecord() state + */ + XLogReadRecordState readRecordState;/* state machine state */ + int recordGotLen; /* amount of current record that has + * already been read */ + int recordRemainLen; /* length of current record that remains */ + XLogRecPtr recordContRecPtr; /* where the current record continues */ + /* Buffer to hold error message */ char *errormsg_buf; }; /* Get a new XLogReader */ extern XLogReaderState *XLogReaderAllocate(int wal_segment_size, - const char *waldir, - XLogPageReadCB pagereadfunc, - void *private_data); + const char *waldir); /* Free an XLogReader */ extern void XLogReaderFree(XLogReaderState *state); @@ -252,12 +249,17 @@ extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, /* Position the XLogReader to given record */ extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr); #ifdef FRONTEND -extern XLogRecPtr 
XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr); +/* Function type definition for the read_page callback */ +typedef bool (*XLogFindNextRecordCB) (XLogReaderState *xlogreader, + void *private); +extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr, + XLogFindNextRecordCB read_page, void *private); #endif /* FRONTEND */ /* Read the next XLog record. Returns NULL on end-of-WAL or failure */ -extern struct XLogRecord *XLogReadRecord(XLogReaderState *state, - char **errormsg); +extern XLogReadRecordResult XLogReadRecord(XLogReaderState *state, + XLogRecord **record, + char **errormsg); /* Validate a page */ extern bool XLogReaderValidatePageHeader(XLogReaderState *state, diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index dc7d894e5d..312acb4fa3 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -47,9 +47,7 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, extern Relation CreateFakeRelcacheEntry(RelFileNode rnode); extern void FreeFakeRelcacheEntry(Relation fakerel); -extern bool read_local_xlog_page(XLogReaderState *state, - XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetRecPtr, char *cur_page); +extern bool read_local_xlog_page(XLogReaderState *state); extern void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength); diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 3b7ca7f1da..cbe9ed751c 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -29,6 +29,10 @@ typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingC TransactionId xid ); +typedef struct LogicalDecodingContext LogicalDecodingContext; + +typedef bool (*LogicalDecodingXLogReadPageCB)(XLogReaderState *ctx); + typedef struct LogicalDecodingContext { /* memory context this is all allocated in */ @@ -39,6 +43,7 @@ typedef struct 
LogicalDecodingContext /* infrastructure pieces for decoding */ XLogReaderState *reader; + LogicalDecodingXLogReadPageCB read_page; struct ReorderBuffer *reorder; struct SnapBuild *snapshot_builder; @@ -95,14 +100,14 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, - XLogPageReadCB read_page, + LogicalDecodingXLogReadPageCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress); extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, - XLogPageReadCB read_page, + LogicalDecodingXLogReadPageCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress); -- 2.18.2 From fe15f23e40882b5b09a291a9fcb7b0b628b7304d Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp> Date: Tue, 10 Sep 2019 17:28:48 +0900 Subject: [PATCH v7 3/4] Remove globals readOff, readLen and readSegNo The first two variables are functionally duplicate with them in XLogReaderState. Remove the globals along with readSegNo, which behaves in the similar way. --- src/backend/access/transam/xlog.c | 79 ++++++++++++++----------------- src/include/access/xlogreader.h | 1 + 2 files changed, 37 insertions(+), 43 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index c138504f0d..a3eded30ad 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -796,18 +796,14 @@ static XLogSegNo openLogSegNo = 0; * These variables are used similarly to the ones above, but for reading * the XLOG. 
Note, however, that readOff generally represents the offset * of the page just read, not the seek position of the FD itself, which - * will be just past that page. readLen indicates how much of the current - * page has been read into readBuf, and readSource indicates where we got - * the currently open file from. + * will be just past that page. readSource indicates where we got the + * currently open file from. * Note: we could use Reserve/ReleaseExternalFD to track consumption of * this FD too; but it doesn't currently seem worthwhile, since the XLOG is * not read by general-purpose sessions. */ static int readFile = -1; -static XLogSegNo readSegNo = 0; -static uint32 readOff = 0; -static uint32 readLen = 0; -static XLogSource readSource = XLOG_FROM_ANY; +static XLogSource readSource = 0; /* XLOG_FROM_* code */ /* * Keeps track of which source we're currently reading from. This is @@ -893,10 +889,12 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, XLogSource source, bool notfoundOk); static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source); -static bool XLogPageRead(XLogReaderState *xlogreader, +static bool XLogPageRead(XLogReaderState *state, bool fetching_ckpt, int emode, bool randAccess); static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, - bool fetching_ckpt, XLogRecPtr tliRecPtr); + bool fetching_ckpt, + XLogRecPtr tliRecPtr, + XLogSegNo readSegNo); static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr); static void XLogFileClose(void); static void PreallocXlogFiles(XLogRecPtr endptr); @@ -7586,7 +7584,8 @@ StartupXLOG(void) XLogRecPtr pageBeginPtr; pageBeginPtr = EndOfLog - (EndOfLog % XLOG_BLCKSZ); - Assert(readOff == XLogSegmentOffset(pageBeginPtr, wal_segment_size)); + Assert(XLogSegmentOffset(xlogreader->readPagePtr, wal_segment_size) == + XLogSegmentOffset(pageBeginPtr, wal_segment_size)); firstIdx = 
XLogRecPtrToBufIdx(EndOfLog); @@ -11652,13 +11651,14 @@ CancelBackup(void) * sleep and retry. */ static bool -XLogPageRead(XLogReaderState *xlogreader, +XLogPageRead(XLogReaderState *state, bool fetching_ckpt, int emode, bool randAccess) { - char *readBuf = xlogreader->readBuf; - XLogRecPtr targetPagePtr = xlogreader->readPagePtr; - int reqLen = xlogreader->readLen; - XLogRecPtr targetRecPtr = xlogreader->ReadRecPtr; + char *readBuf = state->readBuf; + XLogRecPtr targetPagePtr = state->readPagePtr; + int reqLen = state->readLen; + int readLen = 0; + XLogRecPtr targetRecPtr = state->ReadRecPtr; uint32 targetPageOff; XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY; int r; @@ -11671,7 +11671,7 @@ XLogPageRead(XLogReaderState *xlogreader, * is not in the currently open one. */ if (readFile >= 0 && - !XLByteInSeg(targetPagePtr, readSegNo, wal_segment_size)) + !XLByteInSeg(targetPagePtr, state->readSegNo, wal_segment_size)) { /* * Request a restartpoint if we've replayed too much xlog since the @@ -11679,10 +11679,10 @@ XLogPageRead(XLogReaderState *xlogreader, */ if (bgwriterLaunched) { - if (XLogCheckpointNeeded(readSegNo)) + if (XLogCheckpointNeeded(state->readSegNo)) { (void) GetRedoRecPtr(); - if (XLogCheckpointNeeded(readSegNo)) + if (XLogCheckpointNeeded(state->readSegNo)) RequestCheckpoint(CHECKPOINT_CAUSE_XLOG); } } @@ -11692,7 +11692,7 @@ XLogPageRead(XLogReaderState *xlogreader, readSource = XLOG_FROM_ANY; } - XLByteToSeg(targetPagePtr, readSegNo, wal_segment_size); + XLByteToSeg(targetPagePtr, state->readSegNo, wal_segment_size); retry: /* See if we need to retrieve more data */ @@ -11701,17 +11701,14 @@ retry: receivedUpto < targetPagePtr + reqLen)) { if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen, - randAccess, - fetching_ckpt, - targetRecPtr)) + randAccess, fetching_ckpt, + targetRecPtr, state->readSegNo)) { if (readFile >= 0) close(readFile); readFile = -1; - readLen = 0; readSource = XLOG_FROM_ANY; - - xlogreader->readLen = -1; + state->readLen = 
-1; return false; } } @@ -11739,40 +11736,36 @@ retry: else readLen = XLOG_BLCKSZ; - /* Read the requested page */ - readOff = targetPageOff; - pgstat_report_wait_start(WAIT_EVENT_WAL_READ); - r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) readOff); + r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) targetPageOff); if (r != XLOG_BLCKSZ) { char fname[MAXFNAMELEN]; int save_errno = errno; pgstat_report_wait_end(); - XLogFileName(fname, curFileTLI, readSegNo, wal_segment_size); + XLogFileName(fname, curFileTLI, state->readSegNo, wal_segment_size); if (r < 0) { errno = save_errno; ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen), (errcode_for_file_access(), errmsg("could not read from log segment %s, offset %u: %m", - fname, readOff))); + fname, targetPageOff))); } else ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen), (errcode(ERRCODE_DATA_CORRUPTED), errmsg("could not read from log segment %s, offset %u: read %d of %zu", - fname, readOff, r, (Size) XLOG_BLCKSZ))); + fname, targetPageOff, r, (Size) XLOG_BLCKSZ))); goto next_record_is_invalid; } pgstat_report_wait_end(); - Assert(targetSegNo == readSegNo); - Assert(targetPageOff == readOff); + Assert(targetSegNo == state->readSegNo); Assert(reqLen <= readLen); - xlogreader->seg.ws_tli = curFileTLI; + state->seg.ws_tli = curFileTLI; /* * Check the page header immediately, so that we can retry immediately if @@ -11800,15 +11793,15 @@ retry: * Validating the page header is cheap enough that doing it twice * shouldn't be a big deal from a performance point of view. 
*/ - if (!XLogReaderValidatePageHeader(xlogreader, targetPagePtr, readBuf)) + if (!XLogReaderValidatePageHeader(state, targetPagePtr, readBuf)) { - /* reset any error XLogReaderValidatePageHeader() might have set */ - xlogreader->errormsg_buf[0] = '\0'; + /* reset any error XLogReaderValidatePageHeader() might have set */ + state->errormsg_buf[0] = '\0'; goto next_record_is_invalid; } - Assert(xlogreader->readPagePtr == targetPagePtr); - xlogreader->readLen = readLen; + Assert(state->readPagePtr == targetPagePtr); + state->readLen = readLen; return true; next_record_is_invalid: @@ -11817,14 +11810,13 @@ next_record_is_invalid: if (readFile >= 0) close(readFile); readFile = -1; - readLen = 0; readSource = XLOG_FROM_ANY; /* In standby-mode, keep trying */ if (StandbyMode) goto retry; - xlogreader->readLen = -1; + state->readLen = -1; return false; } @@ -11856,7 +11848,8 @@ next_record_is_invalid: */ static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, - bool fetching_ckpt, XLogRecPtr tliRecPtr) + bool fetching_ckpt, XLogRecPtr tliRecPtr, + XLogSegNo readSegNo) { static TimestampTz last_fail_time = 0; TimestampTz now; diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index e59e42bee3..a862db6d90 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -135,6 +135,7 @@ struct XLogReaderState * read by reader, which must be larger than * the request, or -1 on error */ TimeLineID readPageTLI; /* TLI for data currently in readBuf */ + XLogSegNo readSegNo; /* Segment # for data currently in readBuf */ char *readBuf; /* buffer to store data */ bool page_verified; /* is the page header on the buffer verified? */ bool record_verified;/* is the current record header verified?
*/ -- 2.18.2 From 3aacd6c3910f252b5c9d3323c3cf25b1d6fe2d85 Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp> Date: Thu, 5 Sep 2019 21:29:32 +0900 Subject: [PATCH v7 4/4] Change policy of XLog read-buffer allocation The page buffer in XLogReaderState was allocated by XLogReaderAllocate, but it should actually be the responsibility of the callers of XLogReadRecord, which now actually read in pages. This patch does that. --- src/backend/access/transam/twophase.c | 2 ++ src/backend/access/transam/xlog.c | 2 ++ src/backend/access/transam/xlogreader.c | 3 --- src/backend/replication/logical/logical.c | 2 ++ src/bin/pg_rewind/parsexlog.c | 6 ++++++ src/bin/pg_waldump/pg_waldump.c | 2 ++ 6 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index f5f6278880..6f5e6e5e56 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1336,6 +1336,7 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"), errdetail("Failed while allocating a WAL reading processor."))); + xlogreader->readBuf = palloc(XLOG_BLCKSZ); XLogBeginRead(xlogreader, lsn); while (XLogReadRecord(xlogreader, &record, &errormsg) == @@ -1366,6 +1367,7 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) *buf = palloc(sizeof(char) * XLogRecGetDataLen(xlogreader)); memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader)); + pfree(xlogreader->readBuf); XLogReaderFree(xlogreader); } diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index a3eded30ad..3f7f3e4d4f 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6411,6 +6411,7 @@ StartupXLOG(void) (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"), errdetail("Failed while allocating a WAL reading processor."))); + xlogreader->readBuf =
palloc(XLOG_BLCKSZ); xlogreader->system_identifier = ControlFile->system_identifier; /* @@ -7813,6 +7814,7 @@ StartupXLOG(void) close(readFile); readFile = -1; } + pfree(xlogreader->readBuf); XLogReaderFree(xlogreader); /* diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index ad85f50c77..6b9287d68a 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -106,7 +106,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir) MCXT_ALLOC_NO_OOM); if (!state->errormsg_buf) { - pfree(state->readBuf); pfree(state); return NULL; } @@ -119,7 +118,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir) if (!allocate_recordbuf(state, 0)) { pfree(state->errormsg_buf); - pfree(state->readBuf); pfree(state); return NULL; } @@ -143,7 +141,6 @@ XLogReaderFree(XLogReaderState *state) pfree(state->errormsg_buf); if (state->readRecordBuf) pfree(state->readRecordBuf); - pfree(state->readBuf); pfree(state); } diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 7782e3ad2f..858c537558 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -174,6 +174,7 @@ StartupDecodingContext(List *output_plugin_options, ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"))); + ctx->reader->readBuf = palloc(XLOG_BLCKSZ); ctx->read_page = read_page; ctx->reorder = ReorderBufferAllocate(); @@ -518,6 +519,7 @@ FreeDecodingContext(LogicalDecodingContext *ctx) ReorderBufferFree(ctx->reorder); FreeSnapshotBuilder(ctx->snapshot_builder); + pfree(ctx->reader->readBuf); XLogReaderFree(ctx->reader); MemoryContextDelete(ctx->context); } diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 55337b65f1..48502151a1 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -58,6 +58,7 @@ extractPageMap(const char *datadir, XLogRecPtr 
startpoint, int tliIndex, xlogreader = XLogReaderAllocate(WalSegSz, datadir); if (xlogreader == NULL) pg_fatal("out of memory"); + xlogreader->readBuf = pg_malloc(XLOG_BLCKSZ); XLogBeginRead(xlogreader, startpoint); do @@ -86,6 +87,7 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex, } while (xlogreader->ReadRecPtr != endpoint); + pg_free(xlogreader->readBuf); XLogReaderFree(xlogreader); if (xlogreadfd != -1) { @@ -109,6 +111,7 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex) xlogreader = XLogReaderAllocate(WalSegSz, datadir); if (xlogreader == NULL) pg_fatal("out of memory"); + xlogreader->readBuf = pg_malloc(XLOG_BLCKSZ); XLogBeginRead(xlogreader, ptr); while (XLogReadRecord(xlogreader, &record, &errormsg) == @@ -128,6 +131,7 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex) } endptr = xlogreader->EndRecPtr; + pg_free(xlogreader->readBuf); XLogReaderFree(xlogreader); if (xlogreadfd != -1) { @@ -169,6 +173,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, xlogreader = XLogReaderAllocate(WalSegSz, datadir); if (xlogreader == NULL) pg_fatal("out of memory"); + xlogreader->readBuf = pg_malloc(XLOG_BLCKSZ); searchptr = forkptr; for (;;) @@ -218,6 +223,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, searchptr = record->xl_prev; } + pg_free(xlogreader->readBuf); XLogReaderFree(xlogreader); if (xlogreadfd != -1) { diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index 0cd0d132af..ac6740f21d 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -1033,6 +1033,7 @@ main(int argc, char **argv) if (!xlogreader_state) fatal_error("out of memory"); + xlogreader_state->readBuf = palloc(XLOG_BLCKSZ); /* first find a valid recptr to start from */ first_record = XLogFindNextRecord(xlogreader_state, private.startptr, @@ -1109,6 +1110,7 @@ main(int argc, char **argv) (uint32) 
xlogreader_state->ReadRecPtr, errormsg); + pfree(xlogreader_state->readBuf); XLogReaderFree(xlogreader_state); return EXIT_SUCCESS; -- 2.18.2
pgsql-hackers by date: