Re: Remove page-read callback from XLogReaderState. - Mailing list pgsql-hackers
From | Kyotaro Horiguchi |
---|---|
Subject | Re: Remove page-read callback from XLogReaderState. |
Date | |
Msg-id | 20190906.163318.225608207.horikyota.ntt@gmail.com Whole thread Raw |
In response to | Re: Remove page-read callback from XLogReaderState. (Kyotaro Horiguchi <horikyota.ntt@gmail.com>) |
Responses |
Re: Remove page-read callback from XLogReaderState.
|
List | pgsql-hackers |
At Thu, 22 Aug 2019 10:43:52 +0900 (Tokyo Standard Time), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in <20190822.104352.26342272.horikyota.ntt@gmail.com> > I think you diff is intelligible enough for me. I'll take this if > you haven't done. Anyway I'm staring on this. - Reducing state variables It was a problem for me that there seems to be many state variables than required. So first I tried to reduce them. Now readPagePtr and readLen are used bidirectionally. XLogNeedData sets it as request and page reader set readLen to the actual length. Similarly verified* changes only when page header is verified, so I introduced page_verified instead of the variables. - Changed calling convention of XLogReadRecord To make caller loop simple, XLogReadRecord now allows to specify the same valid value while reading the a record. No longer need to change lsn to invalid after the first call in the following reader loop. while (XLogReadRecord(state, lsn, &record, &errormsg) == XLREAD_NEED_DATA) { if (!page_reader(state)) break; } - Frequent data request caused by seeing long page header. XLogNeedData now takes the fourth parameter includes_page_header. True means the caller is requesting with reqLen that is not counting page header length. But it makes the function a bit too complex than expected. Blindly requsting anticipating long page header for a new page may prevent page-reader from returning the bytes already at hand by waiting for bytes that won't come. To prevent such a case the funtion should request anticipating short page header first for a new page, then make a re-request using SizeOfLongPHD if needed. Of course it is unlikely to happen for file sources, and unlikely to harm physical replication (and the behavior is not changed). Finally, the outcome is more or less the same with just stashing the seemingly bogus retry from XLogReadRecord to XLogNeedData. If we are allowed to utilize the knowlege that long page header is attached to only the first page of a segment, such complexitly could be eliminated. - Moving page buffer allocation As for page buffer allocation, I'm not sure it is meaningful, as the reader assumes the buffer is in the same with page size, which is immutable system-wide. It would be surely meanintful if it were on the caller to decide its own block size, or loading unit. Anyway it is in the third patch. - Restored early check-out of record header The current record reader code seems to be designed to bail-out by broken record header as earlier as possible, perhaps in order to prevent impossible size of read in. So I restored the behavior. The attched are the current status, it is separated to two significant parts plus one for readability. v5-0001-Move-callback-call-from-ReadPageInternal-to-XLogR.patch: ReadPageInternal part of the patch. Moves callback calls from ReadPageInternal up to XLogReadRecord. Some of recovery tests fail applyin only this one but I don't want to put more efforts to make this state perfecgt. v5-0002-Move-page-reader-out-of-XLogReadRecord.patch The remaining part of the main work. Eliminates callback calls from XLogReadRecord. Applies to current master. Passes all regression and TAP tests. v5-0003-Change-policy-of-XLog-read-buffer-allocation.patch Separate patch to move page buffer allocation from XLogReaderAllocation from allers of XLogReadRecord. regards. -- Kyotaro Horiguchi NTT Open Source Software Center From 067c3bbb9105c391b7c19cc9602a5df6db5fb434 Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp> Date: Thu, 5 Sep 2019 20:21:55 +0900 Subject: [PATCH v5 1/3] Move callback-call from ReadPageInternal to XLogReadRecord. WAL reader facility used to call given page-reader callback function in the ReadPageInternal, which is two levels below from ReadRecord. That makes things a bit complex, but furthermore we are going to have additional callbacks for encryption, compression or something like that works before or after reading pages. Just adding them as new callback makes things messier. If the caller of the current ReadRecord could call page fetching function directly, things would get quite easier. As the first step of that change, this patch moves the place where that callbacks are called by 1 level above ReadPageInternal. XLogPageRead uses a loop over new function XLogNeedData and the callback directly instead of ReadPageInternal. --- src/backend/access/transam/xlog.c | 15 +- src/backend/access/transam/xlogreader.c | 355 ++++++++++++++++--------- src/backend/access/transam/xlogutils.c | 10 +- src/backend/replication/logical/logicalfuncs.c | 4 +- src/backend/replication/walsender.c | 10 +- src/bin/pg_rewind/parsexlog.c | 17 +- src/bin/pg_waldump/pg_waldump.c | 8 +- src/include/access/xlogreader.h | 30 ++- src/include/access/xlogutils.h | 2 +- src/include/replication/logicalfuncs.h | 2 +- 10 files changed, 292 insertions(+), 161 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index e651a841bb..acd4ed89c4 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -884,7 +884,7 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, int source, bool notfoundOk); static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source); -static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, +static bool XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI); static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, @@ -11521,7 +11521,7 @@ CancelBackup(void) * XLogPageRead() to try fetching the record from another source, or to * sleep and retry. */ -static int +static bool XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI) { @@ -11580,7 +11580,8 @@ retry: readLen = 0; readSource = 0; - return -1; + xlogreader->readLen = -1; + return false; } } @@ -11675,7 +11676,8 @@ retry: goto next_record_is_invalid; } - return readLen; + xlogreader->readLen = readLen; + return true; next_record_is_invalid: lastSourceFailed = true; @@ -11689,8 +11691,9 @@ next_record_is_invalid: /* In standby-mode, keep trying */ if (StandbyMode) goto retry; - else - return -1; + + xlogreader->readLen = -1; + return false; } /* diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index c6faf48d24..1d8d470158 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -36,8 +36,8 @@ static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess); static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr); -static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, - int reqLen); +static bool XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, + int reqLen, bool includes_page_header); static void report_invalid_record(XLogReaderState *state, const char *fmt,...) pg_attribute_printf(2, 3); static void ResetDecoder(XLogReaderState *state); @@ -100,7 +100,7 @@ XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc, /* system_identifier initialized to zeroes above */ state->private_data = private_data; /* ReadRecPtr and EndRecPtr initialized to zeroes above */ - /* readSegNo, readOff, readLen, readPageTLI initialized to zeroes above */ + /* readSegNo, readLen, readPageTLI initialized to zeroes above */ state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1, MCXT_ALLOC_NO_OOM); if (!state->errormsg_buf) @@ -224,7 +224,6 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) uint32 targetRecOff; uint32 pageHeaderSize; bool gotheader; - int readOff; /* * randAccess indicates whether to verify the previous-record pointer of @@ -276,15 +275,21 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) * byte to cover the whole record header, or at least the part of it that * fits on the same page. */ - readOff = ReadPageInternal(state, - targetPagePtr, - Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ)); - if (readOff < 0) + while (XLogNeedData(state, targetPagePtr, + Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ), + true)) + { + if (!state->read_page(state, state->readPagePtr, state->readLen, + state->ReadRecPtr, state->readBuf, + &state->readPageTLI)) + break; + } + + if (!state->page_verified) goto err; /* - * ReadPageInternal always returns at least the page header, so we can - * examine it now. + * We have loaded at least the page header, so we can examine it now. */ pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); if (targetRecOff == 0) @@ -310,8 +315,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) goto err; } - /* ReadPageInternal has verified the page header */ - Assert(pageHeaderSize <= readOff); + /* XLogNeedData has verified the page header */ + Assert(pageHeaderSize <= state->readLen); /* * Read the record length. @@ -388,14 +393,21 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) targetPagePtr += XLOG_BLCKSZ; /* Wait for the next page to become available */ - readOff = ReadPageInternal(state, targetPagePtr, - Min(total_len - gotlen + SizeOfXLogShortPHD, - XLOG_BLCKSZ)); + while (XLogNeedData(state, targetPagePtr, + Min(total_len - gotlen + SizeOfXLogShortPHD, + XLOG_BLCKSZ), + false)) + { + if (!state->read_page(state, state->readPagePtr, state->readLen, + state->ReadRecPtr, state->readBuf, + &state->readPageTLI)) + break; + } - if (readOff < 0) + if (!state->page_verified) goto err; - Assert(SizeOfXLogShortPHD <= readOff); + Assert(SizeOfXLogShortPHD <= state->readLen); /* Check that the continuation on next page looks valid */ pageHeader = (XLogPageHeader) state->readBuf; @@ -424,20 +436,38 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) /* Append the continuation from this page to the buffer */ pageHeaderSize = XLogPageHeaderSize(pageHeader); - if (readOff < pageHeaderSize) - readOff = ReadPageInternal(state, targetPagePtr, - pageHeaderSize); + if (state->readLen < pageHeaderSize) + { + while (XLogNeedData(state, targetPagePtr, pageHeaderSize, + false)) + { + if (!state->read_page(state, + state->readPagePtr, state->readLen, + state->ReadRecPtr, state->readBuf, + &state->readPageTLI)) + break; + } + } - Assert(pageHeaderSize <= readOff); + Assert(pageHeaderSize <= state->readLen); contdata = (char *) state->readBuf + pageHeaderSize; len = XLOG_BLCKSZ - pageHeaderSize; if (pageHeader->xlp_rem_len < len) len = pageHeader->xlp_rem_len; - if (readOff < pageHeaderSize + len) - readOff = ReadPageInternal(state, targetPagePtr, - pageHeaderSize + len); + if (state->readLen < pageHeaderSize + len) + { + if (XLogNeedData(state, targetPagePtr, pageHeaderSize + len, + true)) + { + if (!state->read_page(state, + state->readPagePtr, state->readLen, + state->ReadRecPtr, state->readBuf, + &state->readPageTLI)) + break; + } + } memcpy(buffer, (char *) contdata, len); buffer += len; @@ -468,9 +498,16 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) else { /* Wait for the record data to become available */ - readOff = ReadPageInternal(state, targetPagePtr, - Min(targetRecOff + total_len, XLOG_BLCKSZ)); - if (readOff < 0) + while (XLogNeedData(state, targetPagePtr, + Min(targetRecOff + total_len, XLOG_BLCKSZ), true)) + { + if (!state->read_page(state, state->readPagePtr, state->readLen, + state->ReadRecPtr, state->readBuf, + &state->readPageTLI)) + break; + } + + if (!state->page_verified) goto err; /* Record does not cross a page boundary */ @@ -504,7 +541,7 @@ err: * Invalidate the read state. We might read from a different source after * failure. */ - XLogReaderInvalReadState(state); + XLogReaderDiscardReadingPage(state); if (state->errormsg_buf[0] != '\0') *errormsg = state->errormsg_buf; @@ -513,120 +550,181 @@ err: } /* - * Read a single xlog page including at least [pageptr, reqLen] of valid data - * via the read_page() callback. + * Checks that an xlog page loaded in state->readBuf is including at least + * [pageptr, reqLen] and the page is valid. includes_page_header indicates that + * the requested region contains page header. * - * Returns -1 if the required page cannot be read for some reason; errormsg_buf - * is set in that case (unless the error occurs in the read_page callback). + * Returns false if the buffer already contains the requested data, or error is + * found. state->page_verified is set to true for the former and false for the + * latter. * - * We fetch the page from a reader-local cache if we know we have the required - * data and if there hasn't been any error since caching the data. + * Otherwise returns true and requests data loaded onto state->readBuf by + * state->readPagePtr and state->readLen. The caller shall fill the buffer at + * least with that portion of data and set state->readLen to the actual length + * of loaded data before the next call to this function. + * + * If reqLen does not contain page header, includes_page_header should be + * true. This function internally adds page header to reqLen. */ -static int -ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) +static bool +XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen, + bool includes_page_header) { - int readLen; uint32 targetPageOff; XLogSegNo targetSegNo; - XLogPageHeader hdr; - - Assert((pageptr % XLOG_BLCKSZ) == 0); - - XLByteToSeg(pageptr, targetSegNo, state->wal_segment_size); - targetPageOff = XLogSegmentOffset(pageptr, state->wal_segment_size); + uint32 addLen = 0; /* check whether we have all the requested data already */ - if (targetSegNo == state->readSegNo && targetPageOff == state->readOff && - reqLen <= state->readLen) - return state->readLen; + if (state->page_verified && pageptr == state->readPagePtr) + { + if (includes_page_header) + { + /* + * Include page header length in request, but the total shoudn't + * exceed the block size. + */ + uint32 headerLen = + XLogPageHeaderSize((XLogPageHeader) state->readBuf); + + if (reqLen + headerLen <= XLOG_BLCKSZ) + addLen = headerLen; + else + addLen = XLOG_BLCKSZ - reqLen; + } + + if (reqLen + addLen <= state->readLen) + return false; + } + + /* Haven't loaded any data yet? Then request it. */ + if (XLogRecPtrIsInvalid(state->readPagePtr) || state->readLen < 0) + { + state->readPagePtr = pageptr; + state->readLen = Max(reqLen, SizeOfXLogShortPHD); + state->page_verified = false; + return true; + } + + if (!state->page_verified) + { + uint32 pageHeaderSize; + uint32 addLen = 0; + + /* just loaded new data so needs to verify page header */ + + /* The caller must have loaded at least page header */ + Assert(state->readLen >= SizeOfXLogShortPHD); + + /* + * We have enough data to check the header length. Recheck the loaded + * length if it is a long header if any. + */ + pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); + + /* + * If we have not loaded a page so far, readLen is zero, which is + * shorter than pageHeaderSize here. + */ + if (state->readLen < pageHeaderSize) + { + state->readPagePtr = pageptr; + state->readLen = pageHeaderSize; + return true; + } + + /* + * Now that we know we have the full header, validate it. + */ + if (!XLogReaderValidatePageHeader(state, state->readPagePtr, + (char *) state->readBuf)) + { + /* force reading the page again. */ + XLogReaderDiscardReadingPage(state); + + return false; + } + + state->page_verified = true; + + XLByteToSeg(state->readPagePtr, state->readSegNo, + state->wal_segment_size); + + /* + * calculate additional length for page header so that the total length + * doesn't exceed the block size. + */ + if (includes_page_header) + { + addLen = pageHeaderSize; + if (reqLen + pageHeaderSize <= XLOG_BLCKSZ) + addLen = pageHeaderSize; + else + addLen = XLOG_BLCKSZ - reqLen; + + Assert(addLen >= 0); + } + + /* Usually we have requested data loaded in buffer here. */ + if (pageptr == state->readPagePtr && reqLen + addLen <= state->readLen) + return false; + + /* + * In the case the page is requested for the first record in the page, + * the previous request have been made not counting page header + * length. Request again for the same page with the length known to be + * needed. Otherwise we don't know the header length of the new page. + */ + if (pageptr != state->readPagePtr) + addLen = 0; + } + + /* Data is not in our buffer, make a new load request to the caller. */ + XLByteToSeg(pageptr, targetSegNo, state->wal_segment_size); + targetPageOff = XLogSegmentOffset(pageptr, state->wal_segment_size); + Assert((pageptr % XLOG_BLCKSZ) == 0); + + /* + * Every time we request to load new data of a page to the caller, even if + * we looked at a part of it before, we need to do verification on the next + * invocation as the caller might now be rereading data from a different + * source. + */ + state->page_verified = false; /* - * Data is not in our buffer. - * - * Every time we actually read the page, even if we looked at parts of it - * before, we need to do verification as the read_page callback might now - * be rereading data from a different source. - * * Whenever switching to a new WAL segment, we read the first page of the * file and validate its header, even if that's not where the target * record is. This is so that we can check the additional identification * info that is present in the first page's "long" header. + * Don't do this if the caller requested the first page in the segment. */ if (targetSegNo != state->readSegNo && targetPageOff != 0) { - XLogRecPtr targetSegmentPtr = pageptr - targetPageOff; - - readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ, - state->currRecPtr, - state->readBuf, &state->readPageTLI); - if (readLen < 0) - goto err; - - /* we can be sure to have enough WAL available, we scrolled back */ - Assert(readLen == XLOG_BLCKSZ); - - if (!XLogReaderValidatePageHeader(state, targetSegmentPtr, - state->readBuf)) - goto err; + /* + * Then we'll see that the targetSegNo now matches the readSegNo, and + * will not come back here, but will request the actual target page. + */ + state->readPagePtr = pageptr - targetPageOff; + state->readLen = XLOG_BLCKSZ; + return true; } /* - * First, read the requested data length, but at least a short page header - * so that we can validate it. + * Request the caller to load the requested page. We need at least a short + * page header so that we can validate it. */ - readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD), - state->currRecPtr, - state->readBuf, &state->readPageTLI); - if (readLen < 0) - goto err; - - Assert(readLen <= XLOG_BLCKSZ); - - /* Do we have enough data to check the header length? */ - if (readLen <= SizeOfXLogShortPHD) - goto err; - - Assert(readLen >= reqLen); - - hdr = (XLogPageHeader) state->readBuf; - - /* still not enough */ - if (readLen < XLogPageHeaderSize(hdr)) - { - readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr), - state->currRecPtr, - state->readBuf, &state->readPageTLI); - if (readLen < 0) - goto err; - } - - /* - * Now that we know we have the full header, validate it. - */ - if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr)) - goto err; - - /* update read state information */ - state->readSegNo = targetSegNo; - state->readOff = targetPageOff; - state->readLen = readLen; - - return readLen; - -err: - XLogReaderInvalReadState(state); - return -1; + state->readPagePtr = pageptr; + state->readLen = Max(reqLen + addLen, SizeOfXLogShortPHD); + return true; } /* - * Invalidate the xlogreader's read state to force a re-read. + * Invalidate current reading page buffer */ void -XLogReaderInvalReadState(XLogReaderState *state) +XLogReaderDiscardReadingPage(XLogReaderState *state) { - state->readSegNo = 0; - state->readOff = 0; - state->readLen = 0; + state->readPagePtr = InvalidXLogRecPtr; } /* @@ -904,7 +1002,6 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) XLogRecPtr targetPagePtr; int targetRecOff; uint32 pageHeaderSize; - int readLen; /* * Compute targetRecOff. It should typically be equal or greater than @@ -912,7 +1009,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) * that, except when caller has explicitly specified the offset that * falls somewhere there or when we are skipping multi-page * continuation record. It doesn't matter though because - * ReadPageInternal() is prepared to handle that and will read at + * CheckPage() is prepared to handle that and will read at * least short page-header worth of data */ targetRecOff = tmpRecPtr % XLOG_BLCKSZ; @@ -921,8 +1018,15 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) targetPagePtr = tmpRecPtr - targetRecOff; /* Read the page containing the record */ - readLen = ReadPageInternal(state, targetPagePtr, targetRecOff); - if (readLen < 0) + while(XLogNeedData(state, targetPagePtr, targetRecOff, true)) + { + if (!state->read_page(state, state->readPagePtr, state->readLen, + state->ReadRecPtr, state->readBuf, + &state->readPageTLI)) + break; + } + + if (!state->page_verified) goto err; header = (XLogPageHeader) state->readBuf; @@ -930,8 +1034,15 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) pageHeaderSize = XLogPageHeaderSize(header); /* make sure we have enough data for the page header */ - readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize); - if (readLen < 0) + while (XLogNeedData(state, targetPagePtr, pageHeaderSize, false)) + { + if (!state->read_page(state, state->readPagePtr, state->readLen, + state->ReadRecPtr, state->readBuf, + &state->readPageTLI)) + break; + } + + if (!state->page_verified) goto err; /* skip over potential continuation data */ @@ -989,7 +1100,7 @@ out: /* Reset state to what we had before finding the record */ state->ReadRecPtr = saved_state.ReadRecPtr; state->EndRecPtr = saved_state.EndRecPtr; - XLogReaderInvalReadState(state); + XLogReaderDiscardReadingPage(state); return found; } diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 1fc39333f1..dad9074b9f 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -803,7 +803,7 @@ void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength) { const XLogRecPtr lastReadPage = state->readSegNo * - state->wal_segment_size + state->readOff; + state->wal_segment_size + state->readLen; Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0); Assert(wantLength <= XLOG_BLCKSZ); @@ -907,7 +907,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa * exists for normal backends, so we have to do a check/sleep/repeat style of * loop for now. */ -int +bool read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI) @@ -1009,7 +1009,8 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, else if (targetPagePtr + reqLen > read_upto) { /* not enough data there */ - return -1; + state->readLen = -1; + return false; } else { @@ -1026,5 +1027,6 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, XLOG_BLCKSZ); /* number of valid bytes in the buffer */ - return count; + state->readLen = count; + return true; } diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index d974400d6e..7210a940bd 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -114,12 +114,12 @@ check_permissions(void) (errmsg("must be superuser or replication role to use replication slots")))); } -int +bool logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI) { return read_local_xlog_page(state, targetPagePtr, reqLen, - targetRecPtr, cur_page, pageTLI); + targetRecPtr, cur_page, pageTLI); } /* diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 23870a25a5..cc35e2a04d 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -761,7 +761,7 @@ StartReplication(StartReplicationCmd *cmd) * which has to do a plain sleep/busy loop, because the walsender's latch gets * set every time WAL is flushed. */ -static int +static bool logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI) { @@ -779,7 +779,10 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req /* fail if not (implies we are going to shut down) */ if (flushptr < targetPagePtr + reqLen) - return -1; + { + state->readLen = -1; + return false; + } if (targetPagePtr + XLOG_BLCKSZ <= flushptr) count = XLOG_BLCKSZ; /* more than one block available */ @@ -789,7 +792,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req /* now actually read the data, we know it's there */ XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ); - return count; + state->readLen = count; + return true; } /* diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 63c3879ead..4df53964e4 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -47,7 +47,7 @@ typedef struct XLogPageReadPrivate int tliIndex; } XLogPageReadPrivate; -static int SimpleXLogPageRead(XLogReaderState *xlogreader, +static bool SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *pageTLI); @@ -235,7 +235,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, } /* XLogReader callback function, to read a WAL page */ -static int +static bool SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *pageTLI) @@ -290,7 +290,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, if (xlogreadfd < 0) { pg_log_error("could not open file \"%s\": %m", xlogfpath); - return -1; + xlogreader->readLen = -1; + return false; } } @@ -303,7 +304,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, if (lseek(xlogreadfd, (off_t) targetPageOff, SEEK_SET) < 0) { pg_log_error("could not seek in file \"%s\": %m", xlogfpath); - return -1; + xlogreader->readLen = -1; + return false; } @@ -316,13 +318,16 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, pg_log_error("could not read file \"%s\": read %d of %zu", xlogfpath, r, (Size) XLOG_BLCKSZ); - return -1; + xlogreader->readLen = -1; + return false; } Assert(targetSegNo == xlogreadsegno); *pageTLI = targetHistory[private->tliIndex].tli; - return XLOG_BLCKSZ; + + xlogreader->readLen = XLOG_BLCKSZ; + return true; } /* diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index b95d467805..96d1f36ebc 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -421,7 +421,7 @@ XLogDumpXLogRead(const char *directory, TimeLineID timeline_id, /* * XLogReader read_page callback */ -static int +static bool XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetPtr, char *readBuff, TimeLineID *curFileTLI) { @@ -437,14 +437,16 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, else { private->endptr_reached = true; - return -1; + state->readLen = -1; + return false; } } XLogDumpXLogRead(private->inpath, private->timeline, targetPagePtr, readBuff, count); - return count; + state->readLen = count; + return true; } /* diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index aa9bc63725..030f56802b 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -34,7 +34,7 @@ typedef struct XLogReaderState XLogReaderState; /* Function type definition for the read_page callback */ -typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader, +typedef bool (*XLogPageReadCB) (XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, @@ -123,6 +123,18 @@ struct XLogReaderState XLogRecPtr ReadRecPtr; /* start of last record read */ XLogRecPtr EndRecPtr; /* end+1 of last record read */ + /* ---------------------------------------- + * Communication with page reader + * readBuf is XLOG_BLCKSZ bytes, valid up to at least readLen bytes. + * ---------------------------------------- + */ + /* variables to communicate with page reader */ + XLogRecPtr readPagePtr; /* page pointer to read */ + int32 readLen; /* bytes requested to reader, or actual bytes + * read by reader, which must be larger than + * the request, or -1 on error */ + TimeLineID readPageTLI; /* TLI for data currently in readBuf */ + char *readBuf; /* buffer to store data */ /* ---------------------------------------- * Decoded representation of current record @@ -149,17 +161,9 @@ struct XLogReaderState * ---------------------------------------- */ - /* - * Buffer for currently read page (XLOG_BLCKSZ bytes, valid up to at least - * readLen bytes) - */ - char *readBuf; - uint32 readLen; - - /* last read segment, segment offset, TLI for data currently in readBuf */ + /* last read segment and segment offset for data currently in readBuf */ + bool page_verified; XLogSegNo readSegNo; - uint32 readOff; - TimeLineID readPageTLI; /* * beginning of prior page read, and its TLI. Doesn't necessarily @@ -216,8 +220,8 @@ extern struct XLogRecord *XLogReadRecord(XLogReaderState *state, extern bool XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, char *phdr); -/* Invalidate read state */ -extern void XLogReaderInvalReadState(XLogReaderState *state); +/* Discard bufferd page */ +extern void XLogReaderDiscardReadingPage(XLogReaderState *state); #ifdef FRONTEND extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr); diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index 4105b59904..0842af9f95 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -47,7 +47,7 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, extern Relation CreateFakeRelcacheEntry(RelFileNode rnode); extern void FreeFakeRelcacheEntry(Relation fakerel); -extern int read_local_xlog_page(XLogReaderState *state, +extern bool read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI); diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h index a9c178a9e6..8e52b1f4aa 100644 --- a/src/include/replication/logicalfuncs.h +++ b/src/include/replication/logicalfuncs.h @@ -11,7 +11,7 @@ #include "replication/logical.h" -extern int logical_read_local_xlog_page(XLogReaderState *state, +extern bool logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI); -- 2.16.3 From 6228af08158ac05950fb11e2c27e703ec87b6d58 Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp> Date: Thu, 5 Sep 2019 21:20:18 +0900 Subject: [PATCH v5 2/3] Move page-reader out of XLogReadRecord This is the second step of removing callbacks from WAL reading. Moves page-reader calls out of XLogReadRecord. Since it is essential to reading additional data while reading a record, the function necessarily ask caller to do so while keeping working state. Thus the function now works as a state machine. --- src/backend/access/transam/twophase.c | 13 +- src/backend/access/transam/xlog.c | 53 +- src/backend/access/transam/xlogreader.c | 678 +++++++++++++++---------- src/backend/access/transam/xlogutils.c | 14 +- src/backend/replication/logical/logical.c | 17 +- src/backend/replication/logical/logicalfuncs.c | 14 +- src/backend/replication/slotfuncs.c | 8 +- src/backend/replication/walsender.c | 14 +- src/bin/pg_rewind/parsexlog.c | 83 ++- src/bin/pg_waldump/pg_waldump.c | 26 +- src/include/access/xlogreader.h | 90 ++-- src/include/access/xlogutils.h | 5 +- src/include/replication/logical.h | 9 +- src/include/replication/logicalfuncs.h | 5 +- 14 files changed, 597 insertions(+), 432 deletions(-) diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 477709bbc2..929da4eef2 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1382,19 +1382,24 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed) static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) { - XLogRecord *record; + XLogRecord *record = NULL; XLogReaderState *xlogreader; char *errormsg; - xlogreader = XLogReaderAllocate(wal_segment_size, &read_local_xlog_page, - NULL); + xlogreader = XLogReaderAllocate(wal_segment_size); if (!xlogreader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"), errdetail("Failed while allocating a WAL reading processor."))); - record = XLogReadRecord(xlogreader, lsn, &errormsg); + while (XLogReadRecord(xlogreader, lsn, &record, &errormsg) == + XLREAD_NEED_DATA) + { + if (!read_local_xlog_page(xlogreader)) + break; + } + if (record == NULL) ereport(ERROR, (errcode_for_file_access(), diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index acd4ed89c4..96e2115aad 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -803,13 +803,6 @@ static XLogSource readSource = 0; /* XLOG_FROM_* code */ static XLogSource currentSource = 0; /* XLOG_FROM_* code */ static bool lastSourceFailed = false; -typedef struct XLogPageReadPrivate -{ - int emode; - bool fetching_ckpt; /* are we fetching a checkpoint record? */ - bool randAccess; -} XLogPageReadPrivate; - /* * These variables track when we last obtained some WAL data to process, * and where we got it from. (XLogReceiptSource is initially the same as @@ -884,9 +877,8 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, int source, bool notfoundOk); static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source); -static bool XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *readBuf, - TimeLineID *readTLI); +static bool XLogPageRead(XLogReaderState *xlogreader, + bool fetching_ckpt, int emode, bool randAccess); static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, bool fetching_ckpt, XLogRecPtr tliRecPtr); static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr); @@ -1195,7 +1187,7 @@ XLogInsertRecord(XLogRecData *rdata, appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len); if (!debug_reader) - debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL); + debug_reader = XLogReaderAllocate(wal_segment_size); if (!debug_reader) { @@ -4246,13 +4238,8 @@ static XLogRecord * ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode, bool fetching_ckpt) { - XLogRecord *record; - XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; - - /* Pass through parameters to XLogPageRead */ - private->fetching_ckpt = fetching_ckpt; - private->emode = emode; - private->randAccess = (RecPtr != InvalidXLogRecPtr); + XLogRecord *record = NULL; + bool randAccess = (RecPtr != InvalidXLogRecPtr); /* This is the first attempt to read this page. */ lastSourceFailed = false; @@ -4260,8 +4247,15 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode, for (;;) { char *errormsg; + XLogReadRecordResult result; + + while ((result = XLogReadRecord(xlogreader, RecPtr, &record, &errormsg)) + == XLREAD_NEED_DATA) + { + if (!XLogPageRead(xlogreader, fetching_ckpt, emode, randAccess)) + break; + } - record = XLogReadRecord(xlogreader, RecPtr, &errormsg); ReadRecPtr = xlogreader->ReadRecPtr; EndRecPtr = xlogreader->EndRecPtr; if (record == NULL) @@ -6211,7 +6205,6 @@ StartupXLOG(void) bool backupFromStandby = false; DBState dbstate_at_startup; XLogReaderState *xlogreader; - XLogPageReadPrivate private; bool fast_promoted = false; struct stat st; @@ -6352,8 +6345,7 @@ StartupXLOG(void) OwnLatch(&XLogCtl->recoveryWakeupLatch); /* Set up XLOG reader facility */ - MemSet(&private, 0, sizeof(XLogPageReadPrivate)); - xlogreader = XLogReaderAllocate(wal_segment_size, &XLogPageRead, &private); + xlogreader = XLogReaderAllocate(wal_segment_size); if (!xlogreader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), @@ -11522,12 +11514,14 @@ CancelBackup(void) * sleep and retry. */ static bool -XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI) +XLogPageRead(XLogReaderState *xlogreader, + bool fetching_ckpt, int emode, bool randAccess) { - XLogPageReadPrivate *private = - (XLogPageReadPrivate *) xlogreader->private_data; - int emode = private->emode; + char *readBuf = xlogreader->readBuf; + XLogRecPtr targetPagePtr = xlogreader->readPagePtr; + int reqLen = xlogreader->readLen; + XLogRecPtr targetRecPtr = xlogreader->ReadRecPtr; + TimeLineID *readTLI = &xlogreader->readPageTLI; uint32 targetPageOff; XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY; int r; @@ -11570,8 +11564,8 @@ retry: receivedUpto < targetPagePtr + reqLen)) { if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen, - private->randAccess, - private->fetching_ckpt, + randAccess, + fetching_ckpt, targetRecPtr)) { if (readFile >= 0) @@ -11676,6 +11670,7 @@ retry: goto next_record_is_invalid; } + Assert(xlogreader->readPagePtr == targetPagePtr); xlogreader->readLen = readLen; return true; diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 1d8d470158..66bd9eb8d7 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -33,7 +33,7 @@ static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength); static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, - XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess); + XLogRecPtr PrevRecPtr, XLogRecord *record); static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr); static bool XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, @@ -67,8 +67,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...) * Returns NULL if the xlogreader couldn't be allocated. */ XLogReaderState * -XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc, - void *private_data) +XLogReaderAllocate(int wal_segment_size) { XLogReaderState *state; @@ -96,11 +95,6 @@ XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc, } state->wal_segment_size = wal_segment_size; - state->read_page = pagereadfunc; - /* system_identifier initialized to zeroes above */ - state->private_data = private_data; - /* ReadRecPtr and EndRecPtr initialized to zeroes above */ - /* readSegNo, readLen, readPageTLI initialized to zeroes above */ state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1, MCXT_ALLOC_NO_OOM); if (!state->errormsg_buf) @@ -200,209 +194,339 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength) /* * Attempt to read an XLOG record. * - * If RecPtr is valid, try to read a record at that position. Otherwise - * try to read a record just after the last one previously read. + * When starting to read a new record, valid RecPtr starts reading the record + * at that position. If invalid RecPtr is given try to start reading a record + * just after the last one previously read. Anytime (means in any internal + * state) when valid new RecPtr is given, starts reading the record at that + * position. This function may return XLREAD_NEED_DATA several times before + * returning a result record. The caller shall read in some new data then call + * this function again with the same parameters. * - * If the read_page callback fails to read the requested data, NULL is - * returned. The callback is expected to have reported the error; errormsg - * is set to NULL. + * When a record is successfully read, returns XLREAD_SUCCESS with result + * record being stored in *record. * - * If the reading fails for some other reason, NULL is also returned, and - * *errormsg is set to a string with details of the failure. + * Returns XLREAD_NEED_DATA if more data is needed to finish reading the + * current record. In that case, state->readPagePtr and state->readLen inform + * the desired position and minimum length of data needed. The caller shall + * read in the requested data and set state->readBuf to point to a buffer + * containing it. The caller must also set state->readPageTLI and + * state->readLen to indicate the timeline that it was read from, and the + * length of data that is now available (which must be >= given readLen), + * respectively. * - * The returned pointer (or *errormsg) points to an internal buffer that's - * valid until the next call to XLogReadRecord. - */ -XLogRecord * -XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) -{ - XLogRecord *record; - XLogRecPtr targetPagePtr; - bool randAccess; - uint32 len, - total_len; - uint32 targetRecOff; - uint32 pageHeaderSize; - bool gotheader; + * If invalid data is encountered, returns XLREAD_FAIL with *record being set to + * NULL. *errormsg is set to a string with details of the failure. + * The returned pointer (or *errormsg) points to an internal buffer that's valid + * until the next call to XLogReadRecord. + * + * + * This function runs a state machine consists of the following states. + * + * XLREAD_NEXT_RECORD : + * The initial state, if called with valid RecPtr, try to read a record at + * that position. If invalid RecPtr is given try to read a record just after + * the last one previously read. + * This state ens after setting ReadRecPtr. The goes to XLREAD_NEED_TOT_LEN. + * + * XLREAD_NEED_TOT_LEN: + * Examining record header. Ends after reading record total + * length. recordRemainLen and recordGotLen are initialized. + * + * XLREAD_NEED_FIRST_FRAGMENT: + * Reading the first fragment. Ends with finishing reading a single + * record. Goes to XLREAD_NEXT_RECORD if that's all or + * XLREAD_NEED_CONTINUATION if we have continuation. - /* - * randAccess indicates whether to verify the previous-record pointer of - * the record we're reading. We only do this if we're reading - * sequentially, which is what we initially assume. - */ - randAccess = false; + * XLREAD_NEED_CONTINUATION: + * Reading continuation of record. Ends with finishing the whole record then + * goes to XLREAD_NEXT_RECORD. During this state, recordRemainLen indicates + * how much is left and readRecordBuf holds the partially assert + * record.recordContRecPtr points to the beginning of the next page where to + * continue. + * + * If wrong data found in any state, the state machine stays at the current + * state. This behavior allows to continue reading a reacord switching among + * different souces, while streaming replication. + */ +XLogReadRecordResult +XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, XLogRecord **record, + char **errormsg) +{ + XLogRecord *prec; /* reset error state */ *errormsg = NULL; state->errormsg_buf[0] = '\0'; - ResetDecoder(state); - - if (RecPtr == InvalidXLogRecPtr) - { - /* No explicit start point; read the record after the one we just read */ - RecPtr = state->EndRecPtr; - - if (state->ReadRecPtr == InvalidXLogRecPtr) - randAccess = true; - - /* - * RecPtr is pointing to end+1 of the previous WAL record. If we're - * at a page boundary, no more records can fit on the current page. We - * must skip over the page header, but we can't do that until we've - * read in the page, since the header size is variable. - */ - } - else - { - /* - * Caller supplied a position to start at. - * - * In this case, the passed-in record pointer should already be - * pointing to a valid record starting position. - */ - Assert(XRecOffIsValid(RecPtr)); - randAccess = true; - } - - state->currRecPtr = RecPtr; - - targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ); - targetRecOff = RecPtr % XLOG_BLCKSZ; - /* - * Read the page containing the record into state->readBuf. Request enough - * byte to cover the whole record header, or at least the part of it that - * fits on the same page. + * Reset to the initial state anytime the caller requested new record. */ - while (XLogNeedData(state, targetPagePtr, - Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ), - true)) - { - if (!state->read_page(state, state->readPagePtr, state->readLen, - state->ReadRecPtr, state->readBuf, - &state->readPageTLI)) - break; - } + if (RecPtr != InvalidXLogRecPtr && RecPtr != state->ReadRecPtr) + state->readRecordState = XLREAD_NEXT_RECORD; - if (!state->page_verified) - goto err; - - /* - * We have loaded at least the page header, so we can examine it now. - */ - pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); - if (targetRecOff == 0) +again: + switch (state->readRecordState) { - /* - * At page start, so skip over page header. - */ - RecPtr += pageHeaderSize; - targetRecOff = pageHeaderSize; - } - else if (targetRecOff < pageHeaderSize) - { - report_invalid_record(state, "invalid record offset at %X/%X", - (uint32) (RecPtr >> 32), (uint32) RecPtr); - goto err; - } + case XLREAD_NEXT_RECORD: + ResetDecoder(state); - if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) && - targetRecOff == pageHeaderSize) - { - report_invalid_record(state, "contrecord is requested by %X/%X", - (uint32) (RecPtr >> 32), (uint32) RecPtr); - goto err; - } + if (RecPtr != InvalidXLogRecPtr) + { + /* + * Caller supplied a position to start at. + * + * In this case, the passed-in record pointer should already be + * pointing to a valid record starting position. + */ + state->ReadRecPtr = RecPtr; - /* XLogNeedData has verified the page header */ - Assert(pageHeaderSize <= state->readLen); + /* + * We cannot verify the previous-record pointer when we're + * seeking to a particular record. Reset ReadRecPtr so that we + * won't try doing that. + */ + state->PrevRecPtr = InvalidXLogRecPtr; + state->EndRecPtr = InvalidXLogRecPtr; /* to be tidy */ + } + else + { + /* + * Otherwise, read the record after the one we just read. (Or + * the first record, if this is the first call. In that case, + * EndRecPtr was set to the desired starting point above.) + * + * EndRecPtr is pointing to end+1 of the previous WAL record. + * If we're at a page boundary, no more records can fit on the + * current page. We must skip over the page header on the next + * page, but we can't do that until we've read in the page, + * since the header size is variable. + */ + state->PrevRecPtr = state->ReadRecPtr; + state->ReadRecPtr = state->EndRecPtr; + } - /* - * Read the record length. - * - * NB: Even though we use an XLogRecord pointer here, the whole record - * header might not fit on this page. xl_tot_len is the first field of the - * struct, so it must be on this page (the records are MAXALIGNed), but we - * cannot access any other fields until we've verified that we got the - * whole header. - */ - record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ); - total_len = record->xl_tot_len; + state->record_verified = false; + state->readRecordState = XLREAD_NEED_TOT_LEN; + /* fall through */ - /* - * If the whole record header is on this page, validate it immediately. - * Otherwise do just a basic sanity check on xl_tot_len, and validate the - * rest of the header after reading it from the next page. The xl_tot_len - * check is necessary here to ensure that we enter the "Need to reassemble - * record" code path below; otherwise we might fail to apply - * ValidXLogRecordHeader at all. - */ - if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) - { - if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record, - randAccess)) - goto err; - gotheader = true; - } - else - { - /* XXX: more validation should be done here */ - if (total_len < SizeOfXLogRecord) + case XLREAD_NEED_TOT_LEN: { - report_invalid_record(state, - "invalid record length at %X/%X: wanted %u, got %u", - (uint32) (RecPtr >> 32), (uint32) RecPtr, - (uint32) SizeOfXLogRecord, total_len); - goto err; + uint32 total_len; + uint32 pageHeaderSize; + XLogRecPtr targetPagePtr; + uint32 targetRecOff; + XLogPageHeader pageHeader; + + targetPagePtr = + state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ); + targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ; + + /* + * Check if we have enough data. For the first record in the page, + * the requesting data length doesn't contain page header. Even in + * the case we tell the fucntion to check counting it. We have at + * least the page header and required part of the first record when + * this function returns true. + */ + if (XLogNeedData(state, targetPagePtr, + Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ), + targetRecOff == 0)) + return XLREAD_NEED_DATA; + + /* error out if caller supplied bogus page */ + if (!state->page_verified) + goto err; + + /* examine page header now. */ + pageHeaderSize = + XLogPageHeaderSize((XLogPageHeader) state->readBuf); + if (targetRecOff == 0) + { + /* At page start, so skip over page header. */ + state->ReadRecPtr += pageHeaderSize; + } + else if (targetRecOff < pageHeaderSize) + { + report_invalid_record(state, "invalid record offset at %X/%X", + (uint32) (state->ReadRecPtr >> 32), + (uint32) state->ReadRecPtr); + goto err; + } + + pageHeader = (XLogPageHeader) state->readBuf; + if ((pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD) && + targetRecOff == pageHeaderSize) + { + report_invalid_record(state, "contrecord is requested by %X/%X", + (uint32) (state->ReadRecPtr >> 32), + (uint32) state->ReadRecPtr); + goto err; + } + + /* XLogNeedData has verified the page header */ + Assert(pageHeaderSize <= state->readLen); + + /* + * Read the record length. + * + * NB: Even though we use an XLogRecord pointer here, the whole + * record header might not fit on this page. xl_tot_len is the first + * field of the struct, so it must be on this page (the records are + * MAXALIGNed), but we cannot access any other fields until we've + * verified that we got the whole header. + */ + prec = (XLogRecord *) (state->readBuf + state->ReadRecPtr % XLOG_BLCKSZ); + total_len = prec->xl_tot_len; + + /* + * If the whole record header is on this page, validate it + * immediately. Otherwise do just a basic sanity check on + * xl_tot_len, and validate the rest of the header after reading it + * from the next page. The xl_tot_len check is necessary here to + * ensure that we enter the XLREAD_NEED_CONTINUATION state below; + * otherwise we might fail to apply ValidXLogRecordHeader at all. + */ + if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) + { + if (!ValidXLogRecordHeader(state, state->ReadRecPtr, + state->PrevRecPtr, prec)) + goto err; + + state->record_verified = true; + } + else + { + /* XXX: more validation should be done here */ + if (total_len < SizeOfXLogRecord) + { + report_invalid_record(state, + "invalid record length at %X/%X: wanted %u, got %u", + (uint32) (state->ReadRecPtr >> 32), + (uint32) state->ReadRecPtr, + (uint32) SizeOfXLogRecord, total_len); + goto err; + } + } + + /* + * Wait for the rest of the record, or the part of the record that + * fit on the first page if crossed a page boundary, to become + * available. + */ + state->recordGotLen = 0; + state->recordRemainLen = total_len; + state->readRecordState = XLREAD_NEED_FIRST_FRAGMENT; } - gotheader = false; - } + /* fall through */ - len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ; - if (total_len > len) - { - /* Need to reassemble record */ - char *contdata; - XLogPageHeader pageHeader; - char *buffer; - uint32 gotlen; - - /* - * Enlarge readRecordBuf as needed. - */ - if (total_len > state->readRecordBufSize && - !allocate_recordbuf(state, total_len)) + case XLREAD_NEED_FIRST_FRAGMENT: { - /* We treat this as a "bogus data" condition */ - report_invalid_record(state, "record length %u at %X/%X too long", - total_len, - (uint32) (RecPtr >> 32), (uint32) RecPtr); - goto err; - } + uint32 total_len = state->recordRemainLen; + uint32 request_len; + uint32 record_len; + XLogRecPtr targetPagePtr; + uint32 targetRecOff; - /* Copy the first fragment of the record from the first page. */ - memcpy(state->readRecordBuf, - state->readBuf + RecPtr % XLOG_BLCKSZ, len); - buffer = state->readRecordBuf + len; - gotlen = len; + /* + * Wait for the rest of the record on the first page to become + * available + */ + targetPagePtr = + state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ); + targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ; + + request_len = Min(targetRecOff + total_len, XLOG_BLCKSZ); + record_len = request_len - targetRecOff; + + /* ReadRecPtr was corrected in the previous state */ + if (XLogNeedData(state, targetPagePtr, request_len, false)) + return XLREAD_NEED_DATA; + + /* error out if caller supplied bogus page */ + if (!state->page_verified) + goto err; + + prec = (XLogRecord *) (state->readBuf + targetRecOff); + + /* validate record header if not yet */ + if (!state->record_verified && record_len >= SizeOfXLogRecord) + { + if (!ValidXLogRecordHeader(state, state->ReadRecPtr, + state->PrevRecPtr, prec)) + goto err; + + state->record_verified = true; + } + + + if (total_len == record_len) + { + /* Record does not cross a page boundary */ + Assert(state->record_verified); + + if (!ValidXLogRecord(state, prec, state->ReadRecPtr)) + goto err; + + state->record_verified = true; /* to be tidy */ + + /* We already checked the header earlier */ + state->EndRecPtr = state->ReadRecPtr + MAXALIGN(record_len); + + *record = prec; + state->readRecordState = XLREAD_NEXT_RECORD; + break; + } + + /* + * The record continues on the next page. Need to reassemble + * record + */ + Assert(total_len > record_len); + + /* Enlarge readRecordBuf as needed. */ + if (total_len > state->readRecordBufSize && + !allocate_recordbuf(state, total_len)) + { + /* We treat this as a "bogus data" condition */ + report_invalid_record(state, + "record length %u at %X/%X too long", + total_len, + (uint32) (state->ReadRecPtr >> 32), + (uint32) state->ReadRecPtr); + goto err; + } + + /* Copy the first fragment of the record from the first page. */ + memcpy(state->readRecordBuf, state->readBuf + targetRecOff, + record_len); + state->recordGotLen += record_len; + state->recordRemainLen -= record_len; - do - { /* Calculate pointer to beginning of next page */ - targetPagePtr += XLOG_BLCKSZ; + state->recordContRecPtr = state->ReadRecPtr + record_len; + Assert(state->recordContRecPtr % XLOG_BLCKSZ == 0); + + state->readRecordState = XLREAD_NEED_CONTINUATION; + } + /* fall through */ + + case XLREAD_NEED_CONTINUATION: + { + XLogRecPtr targetPagePtr; + char *contdata; + XLogPageHeader pageHeader; + uint32 pageHeaderSize; + uint32 request_len; + uint32 record_len; /* Wait for the next page to become available */ - while (XLogNeedData(state, targetPagePtr, - Min(total_len - gotlen + SizeOfXLogShortPHD, - XLOG_BLCKSZ), - false)) - { - if (!state->read_page(state, state->readPagePtr, state->readLen, - state->ReadRecPtr, state->readBuf, - &state->readPageTLI)) - break; - } + targetPagePtr = state->recordContRecPtr; + + /* this request contains page header */ + if (XLogNeedData(state, targetPagePtr, + Min(state->recordRemainLen, XLOG_BLCKSZ), + true)) + return XLREAD_NEED_DATA; if (!state->page_verified) goto err; @@ -414,8 +538,11 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD)) { report_invalid_record(state, - "there is no contrecord flag at %X/%X", - (uint32) (RecPtr >> 32), (uint32) RecPtr); + "there is no contrecord flag at %X/%X reading %X/%X", + (uint32) (state->recordContRecPtr >> 32), + (uint32) state->recordContRecPtr, + (uint32) (state->ReadRecPtr >> 32), + (uint32) state->ReadRecPtr); goto err; } @@ -424,121 +551,115 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) * we expect there to be left. */ if (pageHeader->xlp_rem_len == 0 || - total_len != (pageHeader->xlp_rem_len + gotlen)) + pageHeader->xlp_rem_len != state->recordRemainLen) { report_invalid_record(state, - "invalid contrecord length %u at %X/%X", + "invalid contrecord length %u at %X/%X reading %X/%X, expected %u", pageHeader->xlp_rem_len, - (uint32) (RecPtr >> 32), (uint32) RecPtr); + (uint32) (state->recordContRecPtr >> 32), + (uint32) state->recordContRecPtr, + (uint32) (state->ReadRecPtr >> 32), + (uint32) state->ReadRecPtr, + state->recordRemainLen); goto err; } /* Append the continuation from this page to the buffer */ pageHeaderSize = XLogPageHeaderSize(pageHeader); - if (state->readLen < pageHeaderSize) - { - while (XLogNeedData(state, targetPagePtr, pageHeaderSize, - false)) - { - if (!state->read_page(state, - state->readPagePtr, state->readLen, - state->ReadRecPtr, state->readBuf, - &state->readPageTLI)) - break; - } - } - + /* + * XLogNeedData should have ensured that the whole page header was + * read + */ Assert(pageHeaderSize <= state->readLen); contdata = (char *) state->readBuf + pageHeaderSize; - len = XLOG_BLCKSZ - pageHeaderSize; - if (pageHeader->xlp_rem_len < len) - len = pageHeader->xlp_rem_len; + record_len = XLOG_BLCKSZ - pageHeaderSize; + if (pageHeader->xlp_rem_len < record_len) + record_len = pageHeader->xlp_rem_len; - if (state->readLen < pageHeaderSize + len) + request_len = record_len + pageHeaderSize; + + if (state->readLen < request_len) { - if (XLogNeedData(state, targetPagePtr, pageHeaderSize + len, - true)) - { - if (!state->read_page(state, - state->readPagePtr, state->readLen, - state->ReadRecPtr, state->readBuf, - &state->readPageTLI)) - break; - } + /* + * Read the rest of the page containing the record, if we didn't + * get it already. (If we didn't get it yet, we'll read the page + * header again on next invocation. In practice, this should + * happen very rarely, assuming that the caller makes the whole + * page available to us even when we request just a part of + * it. request_len already contains page header. + */ + if (XLogNeedData(state, targetPagePtr, request_len, false)) + return XLREAD_NEED_DATA; + + if (!state->page_verified) + goto err; } - memcpy(buffer, (char *) contdata, len); - buffer += len; - gotlen += len; + memcpy(state->readRecordBuf + state->recordGotLen, + (char *) contdata, record_len); + state->recordGotLen += record_len; + state->recordRemainLen -= record_len; /* If we just reassembled the record header, validate it. */ - if (!gotheader) + if (!state->record_verified) { - record = (XLogRecord *) state->readRecordBuf; - if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, - record, randAccess)) + Assert(state->recordGotLen >= SizeOfXLogRecord); + if (!ValidXLogRecordHeader(state, state->ReadRecPtr, + state->PrevRecPtr, + (XLogRecord *) state->readRecordBuf)) goto err; - gotheader = true; + + state->record_verified = true; } - } while (gotlen < total_len); - Assert(gotheader); + if (state->recordRemainLen > 0) + { + /* Calculate pointer to beginning of next page, and continue */ + state->recordContRecPtr += XLOG_BLCKSZ; + goto again; + } + else + { + prec = (XLogRecord *) state->readRecordBuf; + if (!ValidXLogRecord(state, prec, state->ReadRecPtr)) + goto err; - record = (XLogRecord *) state->readRecordBuf; - if (!ValidXLogRecord(state, record, RecPtr)) - goto err; + pageHeaderSize = + XLogPageHeaderSize((XLogPageHeader) state->readBuf); + state->EndRecPtr = targetPagePtr + pageHeaderSize + + MAXALIGN(pageHeader->xlp_rem_len); - pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); - state->ReadRecPtr = RecPtr; - state->EndRecPtr = targetPagePtr + pageHeaderSize - + MAXALIGN(pageHeader->xlp_rem_len); - } - else - { - /* Wait for the record data to become available */ - while (XLogNeedData(state, targetPagePtr, - Min(targetRecOff + total_len, XLOG_BLCKSZ), true)) - { - if (!state->read_page(state, state->readPagePtr, state->readLen, - state->ReadRecPtr, state->readBuf, - &state->readPageTLI)) + *record = prec; + state->readRecordState = XLREAD_NEXT_RECORD; break; + } } - - if (!state->page_verified) - goto err; - - /* Record does not cross a page boundary */ - if (!ValidXLogRecord(state, record, RecPtr)) - goto err; - - state->EndRecPtr = RecPtr + MAXALIGN(total_len); - - state->ReadRecPtr = RecPtr; } /* * Special processing if it's an XLOG SWITCH record */ - if (record->xl_rmid == RM_XLOG_ID && - (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH) + if ((*record)->xl_rmid == RM_XLOG_ID && + ((*record)->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH) { /* Pretend it extends to end of segment */ state->EndRecPtr += state->wal_segment_size - 1; state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->wal_segment_size); } - if (DecodeXLogRecord(state, record, errormsg)) - return record; - else - return NULL; + Assert (!*record || state->readLen >= 0); + if (DecodeXLogRecord(state, *record, errormsg)) + return XLREAD_SUCCESS; + + *record = NULL; + return XLREAD_FAIL; err: /* - * Invalidate the read state. We might read from a different source after + * Invalidate the read page. We might read from a different source after * failure. */ XLogReaderDiscardReadingPage(state); @@ -546,7 +667,8 @@ err: if (state->errormsg_buf[0] != '\0') *errormsg = state->errormsg_buf; - return NULL; + *record = NULL; + return XLREAD_FAIL; } /* @@ -732,11 +854,12 @@ XLogReaderDiscardReadingPage(XLogReaderState *state) * * This is just a convenience subroutine to avoid duplicated code in * XLogReadRecord. It's not intended for use from anywhere else. + * + * If PrevRecPtr is valid, the xl_prev is is cross-checked with it. */ static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, - XLogRecPtr PrevRecPtr, XLogRecord *record, - bool randAccess) + XLogRecPtr PrevRecPtr, XLogRecord *record) { if (record->xl_tot_len < SizeOfXLogRecord) { @@ -754,7 +877,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, (uint32) RecPtr); return false; } - if (randAccess) + if (PrevRecPtr == InvalidXLogRecPtr) { /* * We can't exactly verify the prev-link, but surely it should be less @@ -982,12 +1105,15 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, * debugging purposes. */ XLogRecPtr -XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) +XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr, + XLogFindNextRecordCB read_page, void *private) { XLogReaderState saved_state = *state; XLogRecPtr tmpRecPtr; XLogRecPtr found = InvalidXLogRecPtr; XLogPageHeader header; + XLogRecord *record; + XLogReadRecordResult result; char *errormsg; Assert(!XLogRecPtrIsInvalid(RecPtr)); @@ -1018,11 +1144,9 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) targetPagePtr = tmpRecPtr - targetRecOff; /* Read the page containing the record */ - while(XLogNeedData(state, targetPagePtr, targetRecOff, true)) + while(XLogNeedData(state, targetPagePtr, targetRecOff, false)) { - if (!state->read_page(state, state->readPagePtr, state->readLen, - state->ReadRecPtr, state->readBuf, - &state->readPageTLI)) + if (!read_page(state, private)) break; } @@ -1036,9 +1160,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) /* make sure we have enough data for the page header */ while (XLogNeedData(state, targetPagePtr, pageHeaderSize, false)) { - if (!state->read_page(state, state->readPagePtr, state->readLen, - state->ReadRecPtr, state->readBuf, - &state->readPageTLI)) + if (!read_page(state, private)) break; } @@ -1082,11 +1204,19 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) * because either we're at the first record after the beginning of a page * or we just jumped over the remaining data of a continuation. */ - while (XLogReadRecord(state, tmpRecPtr, &errormsg) != NULL) + while ((result = XLogReadRecord(state, tmpRecPtr, &record, &errormsg)) != + XLREAD_FAIL) { /* continue after the record */ tmpRecPtr = InvalidXLogRecPtr; + if (result == XLREAD_NEED_DATA) + { + if (!read_page(state, private)) + goto err; + continue; + } + /* past the record we've found, break out */ if (RecPtr <= state->ReadRecPtr) { diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index dad9074b9f..d50a0ca187 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -802,8 +802,7 @@ XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr, void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength) { - const XLogRecPtr lastReadPage = state->readSegNo * - state->wal_segment_size + state->readLen; + const XLogRecPtr lastReadPage = state->readPagePtr; Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0); Assert(wantLength <= XLOG_BLCKSZ); @@ -818,7 +817,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa * current TLI has since become historical. */ if (lastReadPage == wantPage && - state->readLen != 0 && + state->page_verified && lastReadPage + state->readLen >= wantPage + Min(wantLength, XLOG_BLCKSZ - 1)) return; @@ -908,10 +907,12 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa * loop for now. */ bool -read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *cur_page, - TimeLineID *pageTLI) +read_local_xlog_page(XLogReaderState *state) { + XLogRecPtr targetPagePtr = state->readPagePtr; + int reqLen = state->readLen; + char *cur_page = state->readBuf; + TimeLineID *pageTLI = &state->readPageTLI; XLogRecPtr read_upto, loc; int count; @@ -1027,6 +1028,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, XLOG_BLCKSZ); /* number of valid bytes in the buffer */ + state->readPagePtr = targetPagePtr; state->readLen = count; return true; } diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index f8b9020081..11e52e4c01 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -124,7 +124,7 @@ StartupDecodingContext(List *output_plugin_options, TransactionId xmin_horizon, bool need_full_snapshot, bool fast_forward, - XLogPageReadCB read_page, + LogicalDecodingXLogReadPageCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress) @@ -173,11 +173,12 @@ StartupDecodingContext(List *output_plugin_options, ctx->slot = slot; - ctx->reader = XLogReaderAllocate(wal_segment_size, read_page, ctx); + ctx->reader = XLogReaderAllocate(wal_segment_size); if (!ctx->reader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"))); + ctx->read_page = read_page; ctx->reorder = ReorderBufferAllocate(); ctx->snapshot_builder = @@ -232,7 +233,7 @@ CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, - XLogPageReadCB read_page, + LogicalDecodingXLogReadPageCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress) @@ -374,7 +375,7 @@ LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, - XLogPageReadCB read_page, + LogicalDecodingXLogReadPageCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress) @@ -482,7 +483,13 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) char *err = NULL; /* the read_page callback waits for new WAL */ - record = XLogReadRecord(ctx->reader, startptr, &err); + while (XLogReadRecord(ctx->reader, startptr, &record, &err) == + XLREAD_NEED_DATA) + { + if (!ctx->read_page(ctx)) + break; + } + if (err) elog(ERROR, "%s", err); if (!record) diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 7210a940bd..5270f646bd 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -115,11 +115,9 @@ check_permissions(void) } bool -logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI) +logical_read_local_xlog_page(LogicalDecodingContext *ctx) { - return read_local_xlog_page(state, targetPagePtr, reqLen, - targetRecPtr, cur_page, pageTLI); + return read_local_xlog_page(ctx->reader); } /* @@ -289,7 +287,13 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin XLogRecord *record; char *errm = NULL; - record = XLogReadRecord(ctx->reader, startptr, &errm); + while (XLogReadRecord(ctx->reader, startptr, &record, &errm) == + XLREAD_NEED_DATA) + { + if (!ctx->read_page(ctx)) + break; + } + if (errm) elog(ERROR, "%s", errm); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 808a6f5b83..fb5c0a702d 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -433,7 +433,13 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) * Read records. No changes are generated in fast_forward mode, * but snapbuilder/slot statuses are updated properly. */ - record = XLogReadRecord(ctx->reader, startlsn, &errm); + while (XLogReadRecord(ctx->reader, startlsn, &record, &errm) == + XLREAD_NEED_DATA) + { + if (!ctx->read_page(ctx)) + break; + } + if (errm) elog(ERROR, "%s", errm); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index cc35e2a04d..a4518f5b55 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -762,9 +762,12 @@ StartReplication(StartReplicationCmd *cmd) * set every time WAL is flushed. */ static bool -logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI) +logical_read_xlog_page(LogicalDecodingContext *ctx) { + XLogReaderState *state = ctx->reader; + XLogRecPtr targetPagePtr = state->readPagePtr; + int reqLen = state->readLen; + char *cur_page = state->readBuf; XLogRecPtr flushptr; int count; @@ -2827,7 +2830,12 @@ XLogSendLogical(void) */ WalSndCaughtUp = false; - record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm); + while (XLogReadRecord(logical_decoding_ctx->reader, + logical_startptr, &record, &errm) == XLREAD_NEED_DATA) + { + if (!logical_decoding_ctx->read_page(logical_decoding_ctx)) + break; + } logical_startptr = InvalidXLogRecPtr; /* xlog record was invalid */ diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 4df53964e4..ff26b30f82 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -41,16 +41,8 @@ static int xlogreadfd = -1; static XLogSegNo xlogreadsegno = -1; static char xlogfpath[MAXPGPATH]; -typedef struct XLogPageReadPrivate -{ - const char *datadir; - int tliIndex; -} XLogPageReadPrivate; - -static bool SimpleXLogPageRead(XLogReaderState *xlogreader, - XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *readBuf, - TimeLineID *pageTLI); +static bool SimpleXLogPageRead(XLogReaderState *xlogreader, + const char *datadir, int *tliIndex); /* * Read WAL from the datadir/pg_wal, starting from 'startpoint' on timeline @@ -64,24 +56,26 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex, XLogRecord *record; XLogReaderState *xlogreader; char *errormsg; - XLogPageReadPrivate private; - private.datadir = datadir; - private.tliIndex = tliIndex; - xlogreader = XLogReaderAllocate(WalSegSz, &SimpleXLogPageRead, - &private); + xlogreader = XLogReaderAllocate(WalSegSz); if (xlogreader == NULL) pg_fatal("out of memory"); do { - record = XLogReadRecord(xlogreader, startpoint, &errormsg); + while (XLogReadRecord(xlogreader, startpoint, &record, &errormsg) == + XLREAD_NEED_DATA) + { + if (!SimpleXLogPageRead(xlogreader, datadir, &tliIndex)) + break; + } if (record == NULL) { - XLogRecPtr errptr; + XLogRecPtr errptr = xlogreader->EndRecPtr; - errptr = startpoint ? startpoint : xlogreader->EndRecPtr; + if (startpoint) + errptr = startpoint; if (errormsg) pg_fatal("could not read WAL record at %X/%X: %s", @@ -116,17 +110,18 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex) XLogRecord *record; XLogReaderState *xlogreader; char *errormsg; - XLogPageReadPrivate private; XLogRecPtr endptr; - private.datadir = datadir; - private.tliIndex = tliIndex; - xlogreader = XLogReaderAllocate(WalSegSz, &SimpleXLogPageRead, - &private); + xlogreader = XLogReaderAllocate(WalSegSz); if (xlogreader == NULL) pg_fatal("out of memory"); - record = XLogReadRecord(xlogreader, ptr, &errormsg); + while (XLogReadRecord(xlogreader, ptr, &record, &errormsg) == + XLREAD_NEED_DATA) + { + if (!SimpleXLogPageRead(xlogreader, datadir, &tliIndex)) + break; + } if (record == NULL) { if (errormsg) @@ -161,7 +156,6 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, XLogRecPtr searchptr; XLogReaderState *xlogreader; char *errormsg; - XLogPageReadPrivate private; /* * The given fork pointer points to the end of the last common record, @@ -177,10 +171,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, forkptr += SizeOfXLogShortPHD; } - private.datadir = datadir; - private.tliIndex = tliIndex; - xlogreader = XLogReaderAllocate(WalSegSz, &SimpleXLogPageRead, - &private); + xlogreader = XLogReaderAllocate(WalSegSz); if (xlogreader == NULL) pg_fatal("out of memory"); @@ -189,7 +180,12 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, { uint8 info; - record = XLogReadRecord(xlogreader, searchptr, &errormsg); + while (XLogReadRecord(xlogreader, searchptr, &record, &errormsg) == + XLREAD_NEED_DATA) + { + if (!SimpleXLogPageRead(xlogreader, datadir, &tliIndex)) + break; + } if (record == NULL) { @@ -236,11 +232,12 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, /* XLogReader callback function, to read a WAL page */ static bool -SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *readBuf, - TimeLineID *pageTLI) +SimpleXLogPageRead(XLogReaderState *xlogreader, + const char*datadir, int *tliIndex) { - XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; + XLogRecPtr targetPagePtr = xlogreader->readPagePtr; + char *readBuf = xlogreader->readBuf; + TimeLineID *pageTLI = &xlogreader->readPageTLI; uint32 targetPageOff; XLogRecPtr targetSegEnd; XLogSegNo targetSegNo; @@ -273,17 +270,17 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, * be done both forward and backward, consider also switching timeline * accordingly. */ - while (private->tliIndex < targetNentries - 1 && - targetHistory[private->tliIndex].end < targetSegEnd) - private->tliIndex++; - while (private->tliIndex > 0 && - targetHistory[private->tliIndex].begin >= targetSegEnd) - private->tliIndex--; + while (*tliIndex < targetNentries - 1 && + targetHistory[*tliIndex].end < targetSegEnd) + (*tliIndex)++; + while (*tliIndex > 0 && + targetHistory[*tliIndex].begin >= targetSegEnd) + (*tliIndex)--; - XLogFileName(xlogfname, targetHistory[private->tliIndex].tli, + XLogFileName(xlogfname, targetHistory[*tliIndex].tli, xlogreadsegno, WalSegSz); - snprintf(xlogfpath, MAXPGPATH, "%s/" XLOGDIR "/%s", private->datadir, xlogfname); + snprintf(xlogfpath, MAXPGPATH, "%s/" XLOGDIR "/%s", datadir, xlogfname); xlogreadfd = open(xlogfpath, O_RDONLY | PG_BINARY, 0); @@ -324,7 +321,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, Assert(targetSegNo == xlogreadsegno); - *pageTLI = targetHistory[private->tliIndex].tli; + *pageTLI = targetHistory[*tliIndex].tli; xlogreader->readLen = XLOG_BLCKSZ; return true; diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index 96d1f36ebc..56e2f8b0b0 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -422,10 +422,12 @@ XLogDumpXLogRead(const char *directory, TimeLineID timeline_id, * XLogReader read_page callback */ static bool -XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetPtr, char *readBuff, TimeLineID *curFileTLI) +XLogDumpReadPage(XLogReaderState *state, void *priv) { - XLogDumpPrivate *private = state->private_data; + XLogRecPtr targetPagePtr = state->readPagePtr; + int reqLen = state->readLen; + char *readBuff = state->readBuf; + XLogDumpPrivate *private = (XLogDumpPrivate *) priv; int count = XLOG_BLCKSZ; if (private->endptr != InvalidXLogRecPtr) @@ -445,6 +447,7 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogDumpXLogRead(private->inpath, private->timeline, targetPagePtr, readBuff, count); + Assert(count >= state->readLen); state->readLen = count; return true; } @@ -1102,13 +1105,13 @@ main(int argc, char **argv) /* done with argument parsing, do the actual work */ /* we have everything we need, start reading */ - xlogreader_state = XLogReaderAllocate(WalSegSz, XLogDumpReadPage, - &private); + xlogreader_state = XLogReaderAllocate(WalSegSz); if (!xlogreader_state) fatal_error("out of memory"); /* first find a valid recptr to start from */ - first_record = XLogFindNextRecord(xlogreader_state, private.startptr); + first_record = XLogFindNextRecord(xlogreader_state, private.startptr, + &XLogDumpReadPage, (void*) &private); if (first_record == InvalidXLogRecPtr) fatal_error("could not find a valid record after %X/%X", @@ -1131,8 +1134,17 @@ main(int argc, char **argv) for (;;) { + record = NULL; + /* try to read the next record */ - record = XLogReadRecord(xlogreader_state, first_record, &errormsg); + while (XLogReadRecord(xlogreader_state, + first_record, &record, &errormsg) == + XLREAD_NEED_DATA) + { + if (!XLogDumpReadPage(xlogreader_state, (void *) &private)) + break; + } + if (!record) { if (!config.follow || private.endptr_reached) diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 030f56802b..c2b55ce743 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -33,14 +33,6 @@ typedef struct XLogReaderState XLogReaderState; -/* Function type definition for the read_page callback */ -typedef bool (*XLogPageReadCB) (XLogReaderState *xlogreader, - XLogRecPtr targetPagePtr, - int reqLen, - XLogRecPtr targetRecPtr, - char *readBuf, - TimeLineID *pageTLI); - typedef struct { /* Is this block ref in use? */ @@ -70,6 +62,29 @@ typedef struct uint16 data_bufsz; } DecodedBkpBlock; +/* Return code from XLogReadRecord */ +typedef enum XLogReadRecordResult +{ + XLREAD_SUCCESS, /* record is successfully read */ + XLREAD_NEED_DATA, /* need more data. see XLogReadRecord. */ + XLREAD_FAIL /* failed during reading a record */ +} XLogReadRecordResult; + +/* + * internal state of XLogReadRecord + * + * XLogReadState runs a state machine while reading a record. Theses states + * are not seen outside the function. Each state may repeat several times + * exiting requesting caller for new data. See the comment of XLogReadRecrod + * for details. + */ +typedef enum XLogReadRecordState { + XLREAD_NEXT_RECORD, + XLREAD_NEED_TOT_LEN, + XLREAD_NEED_FIRST_FRAGMENT, + XLREAD_NEED_CONTINUATION +} XLogReadRecordState; + struct XLogReaderState { /* ---------------------------------------- @@ -82,46 +97,19 @@ struct XLogReaderState */ int wal_segment_size; - /* - * Data input callback (mandatory). - * - * This callback shall read at least reqLen valid bytes of the xlog page - * starting at targetPagePtr, and store them in readBuf. The callback - * shall return the number of bytes read (never more than XLOG_BLCKSZ), or - * -1 on failure. The callback shall sleep, if necessary, to wait for the - * requested bytes to become available. The callback will not be invoked - * again for the same page unless more than the returned number of bytes - * are needed. - * - * targetRecPtr is the position of the WAL record we're reading. Usually - * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs - * to read and verify the page or segment header, before it reads the - * actual WAL record it's interested in. In that case, targetRecPtr can - * be used to determine which timeline to read the page from. - * - * The callback shall set *pageTLI to the TLI of the file the page was - * read from. It is currently used only for error reporting purposes, to - * reconstruct the name of the WAL file where an error occurred. - */ - XLogPageReadCB read_page; - /* * System identifier of the xlog files we're about to read. Set to zero * (the default value) if unknown or unimportant. */ uint64 system_identifier; - /* - * Opaque data for callbacks to use. Not used by XLogReader. - */ - void *private_data; - /* * Start and end point of last record read. EndRecPtr is also used as the * position to read next, if XLogReadRecord receives an invalid recptr. */ - XLogRecPtr ReadRecPtr; /* start of last record read */ + XLogRecPtr ReadRecPtr; /* start of last record read or being read */ XLogRecPtr EndRecPtr; /* end+1 of last record read */ + XLogRecPtr PrevRecPtr; /* start of previous record read */ /* ---------------------------------------- * Communication with page reader @@ -163,6 +151,7 @@ struct XLogReaderState /* last read segment and segment offset for data currently in readBuf */ bool page_verified; + bool record_verified; XLogSegNo readSegNo; /* @@ -172,8 +161,6 @@ struct XLogReaderState XLogRecPtr latestPagePtr; TimeLineID latestPageTLI; - /* beginning of the WAL record being read. */ - XLogRecPtr currRecPtr; /* timeline to read it from, 0 if a lookup is required */ TimeLineID currTLI; @@ -200,21 +187,30 @@ struct XLogReaderState char *readRecordBuf; uint32 readRecordBufSize; + /* + * XLogReadRecord() state + */ + XLogReadRecordState readRecordState;/* state machine state */ + int recordGotLen; /* amount of current record that has + * already been read */ + int recordRemainLen; /* length of current record that remains */ + XLogRecPtr recordContRecPtr; /* where the current record continues */ + /* Buffer to hold error message */ char *errormsg_buf; }; /* Get a new XLogReader */ -extern XLogReaderState *XLogReaderAllocate(int wal_segment_size, - XLogPageReadCB pagereadfunc, - void *private_data); +extern XLogReaderState *XLogReaderAllocate(int wal_segment_size); /* Free an XLogReader */ extern void XLogReaderFree(XLogReaderState *state); /* Read the next XLog record. Returns NULL on end-of-WAL or failure */ -extern struct XLogRecord *XLogReadRecord(XLogReaderState *state, - XLogRecPtr recptr, char **errormsg); +extern XLogReadRecordResult XLogReadRecord(XLogReaderState *state, + XLogRecPtr recptr, + XLogRecord **record, + char **errormsg); /* Validate a page */ extern bool XLogReaderValidatePageHeader(XLogReaderState *state, @@ -224,7 +220,11 @@ extern bool XLogReaderValidatePageHeader(XLogReaderState *state, extern void XLogReaderDiscardReadingPage(XLogReaderState *state); #ifdef FRONTEND -extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr); +/* Function type definition for the read_page callback */ +typedef bool (*XLogFindNextRecordCB) (XLogReaderState *xlogreader, + void *private); +extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr, + XLogFindNextRecordCB read_page, void *private); #endif /* FRONTEND */ /* Functions for decoding an XLogRecord */ diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index 0842af9f95..55a9b6237a 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -47,10 +47,7 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, extern Relation CreateFakeRelcacheEntry(RelFileNode rnode); extern void FreeFakeRelcacheEntry(Relation fakerel); -extern bool read_local_xlog_page(XLogReaderState *state, - XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetRecPtr, char *cur_page, - TimeLineID *pageTLI); +extern bool read_local_xlog_page(XLogReaderState *state); extern void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength); diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 31c796b765..482d3d311c 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -30,6 +30,10 @@ typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingC TransactionId xid ); +typedef struct LogicalDecodingContext LogicalDecodingContext; + +typedef bool (*LogicalDecodingXLogReadPageCB)(LogicalDecodingContext *ctx); + typedef struct LogicalDecodingContext { /* memory context this is all allocated in */ @@ -40,6 +44,7 @@ typedef struct LogicalDecodingContext /* infrastructure pieces for decoding */ XLogReaderState *reader; + LogicalDecodingXLogReadPageCB read_page; struct ReorderBuffer *reorder; struct SnapBuild *snapshot_builder; @@ -96,14 +101,14 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, - XLogPageReadCB read_page, + LogicalDecodingXLogReadPageCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress); extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, - XLogPageReadCB read_page, + LogicalDecodingXLogReadPageCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress); diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h index 8e52b1f4aa..25fa68d5b9 100644 --- a/src/include/replication/logicalfuncs.h +++ b/src/include/replication/logicalfuncs.h @@ -11,9 +11,6 @@ #include "replication/logical.h" -extern bool logical_read_local_xlog_page(XLogReaderState *state, - XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, - char *cur_page, TimeLineID *pageTLI); +extern bool logical_read_local_xlog_page(LogicalDecodingContext *ctx); #endif -- 2.16.3 From c3956235807a343e0ef3bf3e4ed4c2ae8ef882e4 Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp> Date: Thu, 5 Sep 2019 21:29:32 +0900 Subject: [PATCH v5 3/3] Change policy of XLog read-buffer allocation Page buffer in XLogReaderState was allocated by XLogReaderAllcoate but actually it'd be the responsibility to the callers of XLogReadRecord, which now actually reads in pages. This patch does that. --- src/backend/access/transam/twophase.c | 2 ++ src/backend/access/transam/xlog.c | 2 ++ src/backend/access/transam/xlogreader.c | 18 ------------------ src/backend/replication/logical/logical.c | 2 ++ src/bin/pg_rewind/parsexlog.c | 6 ++++++ src/bin/pg_waldump/pg_waldump.c | 2 ++ 6 files changed, 14 insertions(+), 18 deletions(-) diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 929da4eef2..9ec88b35ef 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1392,6 +1392,7 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"), errdetail("Failed while allocating a WAL reading processor."))); + xlogreader->readBuf = palloc(XLOG_BLCKSZ); while (XLogReadRecord(xlogreader, lsn, &record, &errormsg) == XLREAD_NEED_DATA) @@ -1421,6 +1422,7 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) *buf = palloc(sizeof(char) * XLogRecGetDataLen(xlogreader)); memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader)); + pfree(xlogreader->readBuf); XLogReaderFree(xlogreader); } diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 96e2115aad..425f7a12bd 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6351,6 +6351,7 @@ StartupXLOG(void) (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"), errdetail("Failed while allocating a WAL reading processor."))); + xlogreader->readBuf = palloc(XLOG_BLCKSZ); xlogreader->system_identifier = ControlFile->system_identifier; /* @@ -7722,6 +7723,7 @@ StartupXLOG(void) close(readFile); readFile = -1; } + pfree(xlogreader->readBuf); XLogReaderFree(xlogreader); /* diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 66bd9eb8d7..26f6834b8f 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -79,27 +79,11 @@ XLogReaderAllocate(int wal_segment_size) state->max_block_id = -1; - /* - * Permanently allocate readBuf. We do it this way, rather than just - * making a static array, for two reasons: (1) no need to waste the - * storage in most instantiations of the backend; (2) a static char array - * isn't guaranteed to have any particular alignment, whereas - * palloc_extended() will provide MAXALIGN'd storage. - */ - state->readBuf = (char *) palloc_extended(XLOG_BLCKSZ, - MCXT_ALLOC_NO_OOM); - if (!state->readBuf) - { - pfree(state); - return NULL; - } - state->wal_segment_size = wal_segment_size; state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1, MCXT_ALLOC_NO_OOM); if (!state->errormsg_buf) { - pfree(state->readBuf); pfree(state); return NULL; } @@ -112,7 +96,6 @@ XLogReaderAllocate(int wal_segment_size) if (!allocate_recordbuf(state, 0)) { pfree(state->errormsg_buf); - pfree(state->readBuf); pfree(state); return NULL; } @@ -136,7 +119,6 @@ XLogReaderFree(XLogReaderState *state) pfree(state->errormsg_buf); if (state->readRecordBuf) pfree(state->readRecordBuf); - pfree(state->readBuf); pfree(state); } diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 11e52e4c01..ea027caa69 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -178,6 +178,7 @@ StartupDecodingContext(List *output_plugin_options, ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"))); + ctx->reader->readBuf = palloc(XLOG_BLCKSZ); ctx->read_page = read_page; ctx->reorder = ReorderBufferAllocate(); @@ -523,6 +524,7 @@ FreeDecodingContext(LogicalDecodingContext *ctx) ReorderBufferFree(ctx->reorder); FreeSnapshotBuilder(ctx->snapshot_builder); + pfree(ctx->reader->readBuf); XLogReaderFree(ctx->reader); MemoryContextDelete(ctx->context); } diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index ff26b30f82..c60267e87e 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -60,6 +60,7 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex, xlogreader = XLogReaderAllocate(WalSegSz); if (xlogreader == NULL) pg_fatal("out of memory"); + xlogreader->readBuf = pg_malloc(XLOG_BLCKSZ); do { @@ -92,6 +93,7 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex, } while (xlogreader->ReadRecPtr != endpoint); + pg_free(xlogreader->readBuf); XLogReaderFree(xlogreader); if (xlogreadfd != -1) { @@ -115,6 +117,7 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex) xlogreader = XLogReaderAllocate(WalSegSz); if (xlogreader == NULL) pg_fatal("out of memory"); + xlogreader->readBuf = pg_malloc(XLOG_BLCKSZ); while (XLogReadRecord(xlogreader, ptr, &record, &errormsg) == XLREAD_NEED_DATA) @@ -133,6 +136,7 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex) } endptr = xlogreader->EndRecPtr; + pg_free(xlogreader->readBuf); XLogReaderFree(xlogreader); if (xlogreadfd != -1) { @@ -174,6 +178,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, xlogreader = XLogReaderAllocate(WalSegSz); if (xlogreader == NULL) pg_fatal("out of memory"); + xlogreader->readBuf = pg_malloc(XLOG_BLCKSZ); searchptr = forkptr; for (;;) @@ -222,6 +227,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, searchptr = record->xl_prev; } + pg_free(xlogreader->readBuf); XLogReaderFree(xlogreader); if (xlogreadfd != -1) { diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index 56e2f8b0b0..bdf3c81b03 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -1108,6 +1108,7 @@ main(int argc, char **argv) xlogreader_state = XLogReaderAllocate(WalSegSz); if (!xlogreader_state) fatal_error("out of memory"); + xlogreader_state->readBuf = palloc(XLOG_BLCKSZ); /* first find a valid recptr to start from */ first_record = XLogFindNextRecord(xlogreader_state, private.startptr, @@ -1190,6 +1191,7 @@ main(int argc, char **argv) (uint32) xlogreader_state->ReadRecPtr, errormsg); + pfree(xlogreader_state->readBuf); XLogReaderFree(xlogreader_state); return EXIT_SUCCESS; -- 2.16.3
pgsql-hackers by date: