Re: Remove page-read callback from XLogReaderState. - Mailing list pgsql-hackers

From Kyotaro Horiguchi
Subject Re: Remove page-read callback from XLogReaderState.
Date
Msg-id 20211007.172820.1874635561738958207.horikyota.ntt@gmail.com
In response to Re: Remove page-read callback from XLogReaderState.  (Kyotaro Horiguchi <horikyota.ntt@gmail.com>)
List pgsql-hackers
At Thu, 30 Sep 2021 09:40:06 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in 
> At Mon, 27 Sep 2021 17:31:03 +1300, Thomas Munro <thomas.munro@gmail.com> wrote in 
> > On Thu, Jul 15, 2021 at 4:48 PM Kyotaro Horiguchi
> > <horikyota.ntt@gmail.com> wrote:
> > > Gah... Thank you for noticing me.  I thought that I have sent the
> > > rebased version. This is the rebased version on the current master.
> > 
> > Hi Kyotaro,
> > 
> > Did you see this?
> > 
> > https://www.postgresql.org/message-id/20210429022553.4h5qii5jb5eclu4i%40alap3.anarazel.de
> 
> Thank you for pinging me. I haven't noticed of that.
> I'll check on that line.

It looks like XLogFindNextRecord was left unfinished. It should have
been turned into a state machine.

This version (v18) contains only the page-reader refactoring.

- Rebased onto the current master, including an additional change for
  XLOG_OVERWRITE_CONTRECORD. (This needed the new function
  XLogTerminateRead.)

- Finished XLogFindNextRecord including the fixup from Thomas' v17.

- Added a test for XLogFindNextRecord, covering the page-skipping
  behavior when seeking the first record.
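
As a rough illustration of what "turned into a state machine" means
here, below is a toy sketch in C. It is not code from the patch:
ToyDecoder, toy_decode and the one-byte-length record format are all
made up for the example. The decoder keeps its progress in a struct, so
it can return to the caller when input runs out and resume on the next
call.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef enum { TOY_SUCCESS, TOY_NEED_DATA, TOY_FAIL } ToyResult;
typedef enum { TOY_ST_TOT_LEN, TOY_ST_BODY } ToyState;

/* All names here are hypothetical; this is not the XLogReaderState layout. */
typedef struct ToyDecoder
{
    ToyState    state;      /* where to resume on the next call */
    size_t      remain;     /* payload bytes still missing */
    size_t      got;        /* payload bytes assembled so far */
    char        rec[64];    /* partially assembled record */
} ToyDecoder;

/*
 * Feed len bytes to the decoder.  A toy record is one length byte followed
 * by that many payload bytes.  *used is set to the number of bytes consumed.
 */
static ToyResult
toy_decode(ToyDecoder *d, const char *in, size_t len, size_t *used)
{
    size_t      pos = 0;
    size_t      n;

    *used = 0;

    if (d->state == TOY_ST_TOT_LEN)
    {
        if (len == 0)
            return TOY_NEED_DATA;
        if ((unsigned char) in[0] > sizeof(d->rec))
            return TOY_FAIL;    /* bad length: stay in the current state */
        d->remain = (unsigned char) in[0];
        d->got = 0;
        pos = 1;
        d->state = TOY_ST_BODY;
    }

    /* TOY_ST_BODY: copy as much payload as this chunk provides */
    n = len - pos;
    if (n > d->remain)
        n = d->remain;
    assert(d->got + n <= sizeof(d->rec));
    memcpy(d->rec + d->got, in + pos, n);
    d->got += n;
    d->remain -= n;
    *used = pos + n;

    if (d->remain > 0)
        return TOY_NEED_DATA;   /* caller must supply the next chunk */

    d->state = TOY_ST_TOT_LEN;  /* record complete, ready for the next one */
    return TOY_SUCCESS;
}
```

A caller would start with the state set to TOY_ST_TOT_LEN and keep
calling toy_decode() with new chunks for as long as it returns
TOY_NEED_DATA.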

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From 2da06a9de78e4f2125bcc889d12e7e3ffdf1d4d4 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Date: Thu, 30 Sep 2021 11:48:40 +0900
Subject: [PATCH v18 1/5] Move callback-call from ReadPageInternal to
 XLogReadRecord.

The current WAL record reader reads page data using a callback
function.  Redesign the interface so that it asks the caller for more
data when required.  This model works better for proposed projects such
as encryption, prefetching and other new features that would otherwise
require extending the callback interface for each case.

As the first step of that change, this patch moves the page reader
function out of ReadPageInternal(), then the remaining tasks of the
function are taken over by the new function XLogNeedData().

Author: Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>
Author: Heikki Linnakangas <hlinnaka@iki.fi>
Reviewed-by: Antonin Houska <ah@cybertec.at>
Reviewed-by: Alvaro Herrera <alvherre@2ndquadrant.com>
Reviewed-by: Takashi Menjo <takashi.menjo@gmail.com>
Reviewed-by: Thomas Munro <thomas.munro@gmail.com>
Discussion: https://postgr.es/m/20190418.210257.43726183.horiguchi.kyotaro%40lab.ntt.co.jp
---
 src/backend/access/transam/xlog.c       |  16 +-
 src/backend/access/transam/xlogreader.c | 325 +++++++++++++++---------
 src/backend/access/transam/xlogutils.c  |  12 +-
 src/backend/replication/walsender.c     |  10 +-
 src/bin/pg_rewind/parsexlog.c           |  21 +-
 src/bin/pg_waldump/pg_waldump.c         |   8 +-
 src/include/access/xlogreader.h         |  31 ++-
 src/include/access/xlogutils.h          |   2 +-
 8 files changed, 266 insertions(+), 159 deletions(-)
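
Note for readers (not part of the patch): the calling convention that
this step introduces can be sketched with a toy model. Everything below
is hypothetical and heavily reduced: ToyReader, toy_need_data,
toy_page_read, the 16-byte page size and the 'P' magic byte are made up
for illustration, and segment handling and header_inclusive are left
out. The point is only the shape of the loop: the reader says what it
needs via readPagePtr/readLen, the callback returns a bool and reports
the loaded length (or -1) through readLen, and success is judged by
page_verified.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define TOY_BLCKSZ 16           /* toy page size, not XLOG_BLCKSZ */

/* Hypothetical stand-in for the relevant XLogReaderState fields. */
typedef struct ToyReader
{
    long        readPagePtr;    /* page wanted by the reader, -1 if none */
    int         readLen;        /* bytes requested, then bytes loaded, or -1 */
    char        readBuf[TOY_BLCKSZ];
    bool        page_verified;
} ToyReader;

/* Returns true while the caller must load data; false when done or failed. */
static bool
toy_need_data(ToyReader *r, long pageptr, int reqLen)
{
    /* Data was just loaded: verify it (toy check: page starts with 'P'). */
    if (!r->page_verified && r->readPagePtr >= 0 && r->readLen >= 0)
    {
        if (r->readBuf[0] != 'P')
        {
            r->readPagePtr = -1;    /* force a reread next time */
            return false;           /* page_verified stays false */
        }
        r->page_verified = true;
    }

    /* Return if we already have the requested data. */
    if (r->page_verified && r->readPagePtr == pageptr && reqLen <= r->readLen)
        return false;

    /* Otherwise ask the caller for it; it must be verified again later. */
    r->page_verified = false;
    r->readPagePtr = pageptr;
    r->readLen = reqLen;
    return true;
}

/* Toy page source: "WAL" is a flat byte array of walLen bytes. */
static bool
toy_page_read(ToyReader *r, const char *wal, long walLen)
{
    long        avail = walLen - r->readPagePtr;
    int         count;

    if (avail < r->readLen)
    {
        r->readLen = -1;            /* not enough data there */
        return false;
    }
    count = (avail < TOY_BLCKSZ) ? (int) avail : TOY_BLCKSZ;
    memcpy(r->readBuf, wal + r->readPagePtr, count);
    r->readLen = count;             /* number of valid bytes in the buffer */
    return true;
}

/* The loop shape used at each former ReadPageInternal() call site. */
static bool
toy_load_page(ToyReader *r, const char *wal, long walLen,
              long pageptr, int reqLen)
{
    while (toy_need_data(r, pageptr, reqLen))
    {
        if (!toy_page_read(r, wal, walLen))
            break;
    }
    return r->page_verified;
}
```

Initialize the reader with readPagePtr = -1 and page_verified = false
before the first call to toy_load_page().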

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 26dcc00ac0..1557ceb8c1 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -918,7 +918,7 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
 static int    XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
                          XLogSource source, bool notfoundOk);
 static int    XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source);
-static int    XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
+static bool    XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
                          int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
 static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                                         bool fetching_ckpt, XLogRecPtr tliRecPtr);
@@ -4401,7 +4401,6 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
     XLogRecord *record;
     XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
 
-    /* Pass through parameters to XLogPageRead */
     private->fetching_ckpt = fetching_ckpt;
     private->emode = emode;
     private->randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr);
@@ -12300,7 +12299,7 @@ CancelBackup(void)
  * XLogPageRead() to try fetching the record from another source, or to
  * sleep and retry.
  */
-static int
+static bool
 XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
              XLogRecPtr targetRecPtr, char *readBuf)
 {
@@ -12359,7 +12358,8 @@ retry:
             readLen = 0;
             readSource = XLOG_FROM_ANY;
 
-            return -1;
+            xlogreader->readLen = -1;
+            return false;
         }
     }
 
@@ -12469,7 +12469,8 @@ retry:
         goto next_record_is_invalid;
     }
 
-    return readLen;
+    xlogreader->readLen = readLen;
+    return true;
 
 next_record_is_invalid:
     lastSourceFailed = true;
@@ -12483,8 +12484,9 @@ next_record_is_invalid:
     /* In standby-mode, keep trying */
     if (StandbyMode)
         goto retry;
-    else
-        return -1;
+
+    xlogreader->readLen = -1;
+    return false;
 }
 
 /*
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 4b03577dcc..1d9976ecf4 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -39,8 +39,8 @@
 static void report_invalid_record(XLogReaderState *state, const char *fmt,...)
             pg_attribute_printf(2, 3);
 static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
-static int    ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
-                             int reqLen);
+static bool XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr,
+                         int reqLen, bool header_inclusive);
 static void XLogReaderInvalReadState(XLogReaderState *state);
 static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
                                   XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
@@ -264,8 +264,48 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
  * If the reading fails for some other reason, NULL is also returned, and
  * *errormsg is set to a string with details of the failure.
  *
+ * Returns XLREAD_NEED_DATA if more data is needed to finish reading the
+ * current record.  In that case, state->readPagePtr and state->readLen
+ * indicate the desired position and minimum length of data needed. The
+ * caller shall read in the requested data and set state->readBuf to point to
+ * a buffer containing it. The caller must also set state->seg.ws_tli and
+ * state->readLen to indicate the timeline that it was read from, and the
+ * length of data that is now available (which must be >= given readLen),
+ * respectively.
+ *
+ * If invalid data is encountered, returns XLREAD_FAIL with *record being set to
+ * NULL. *errormsg is set to a string with details of the failure.
  * The returned pointer (or *errormsg) points to an internal buffer that's
  * valid until the next call to XLogReadRecord.
+ *
+ *
+ * This function runs a state machine consisting of the following states.
+ *
+ * XLREAD_NEXT_RECORD:
+ *    The initial state.  If called with a valid RecPtr, try to read a record
+ *    at that position.  If an invalid RecPtr is given, try to read the record
+ *    just after the last one previously read.
+ *    This state ends after setting ReadRecPtr, then goes to XLREAD_TOT_LEN.
+ *
+ * XLREAD_TOT_LEN:
+ *    Examining the record header. Ends after reading the record's total
+ *    length. recordRemainLen and recordGotLen are initialized.
+ *
+ * XLREAD_FIRST_FRAGMENT:
+ *    Reading the first fragment. Ends when that fragment has been read.
+ *    Goes to XLREAD_NEXT_RECORD if it is the whole record, or to
+ *    XLREAD_CONTINUATION if the record continues on the next page.
+ *
+ * XLREAD_CONTINUATION:
+ *    Reading the continuation of a record. Ends when the whole record has
+ *    been read, then goes to XLREAD_NEXT_RECORD. During this state,
+ *    recordRemainLen indicates how much is left and readRecordBuf holds the
+ *    partially assembled record. recordContRecPtr points to the beginning of
+ *    the next page, where reading continues.
+ *
+ * If wrong data is found in any state, the state machine stays in the current
+ * state. This behavior allows reading of a record to continue while switching
+ * among different sources, as happens during streaming replication.
  */
 XLogRecord *
 XLogReadRecord(XLogReaderState *state, char **errormsg)
@@ -280,7 +320,6 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
     uint32        pageHeaderSize;
     bool        assembled;
     bool        gotheader;
-    int            readOff;
 
     /*
      * randAccess indicates whether to verify the previous-record pointer of
@@ -334,14 +373,20 @@ restart:
      * byte to cover the whole record header, or at least the part of it that
      * fits on the same page.
      */
-    readOff = ReadPageInternal(state, targetPagePtr,
-                               Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
-    if (readOff < 0)
+    while (XLogNeedData(state, targetPagePtr,
+                        Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ),
+                        targetRecOff != 0))
+    {
+        if (!state->routine.page_read(state, state->readPagePtr, state->readLen,
+                                      RecPtr, state->readBuf))
+            break;
+    }
+
+    if (!state->page_verified)
         goto err;
 
     /*
-     * ReadPageInternal always returns at least the page header, so we can
-     * examine it now.
+     * We have at least the page header, so we can examine it now.
      */
     pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
     if (targetRecOff == 0)
@@ -367,8 +412,8 @@ restart:
         goto err;
     }
 
-    /* ReadPageInternal has verified the page header */
-    Assert(pageHeaderSize <= readOff);
+    /* XLogNeedData has verified the page header */
+    Assert(pageHeaderSize <= state->readLen);
 
     /*
      * Read the record length.
@@ -442,18 +487,27 @@ restart:
 
         do
         {
+            int            rest_len = total_len - gotlen;
+
             /* Calculate pointer to beginning of next page */
             targetPagePtr += XLOG_BLCKSZ;
 
             /* Wait for the next page to become available */
-            readOff = ReadPageInternal(state, targetPagePtr,
-                                       Min(total_len - gotlen + SizeOfXLogShortPHD,
-                                           XLOG_BLCKSZ));
+            while (XLogNeedData(state, targetPagePtr,
+                                Min(rest_len, XLOG_BLCKSZ),
+                                false))
+            {
+                if (!state->routine.page_read(state, state->readPagePtr,
+                                              state->readLen,
+                                              state->ReadRecPtr,
+                                              state->readBuf))
+                    break;
+            }
 
-            if (readOff < 0)
+            if (!state->page_verified)
                 goto err;
 
-            Assert(SizeOfXLogShortPHD <= readOff);
+            Assert(SizeOfXLogShortPHD <= state->readLen);
 
             pageHeader = (XLogPageHeader) state->readBuf;
 
@@ -500,21 +554,14 @@ restart:
             /* Append the continuation from this page to the buffer */
             pageHeaderSize = XLogPageHeaderSize(pageHeader);
 
-            if (readOff < pageHeaderSize)
-                readOff = ReadPageInternal(state, targetPagePtr,
-                                           pageHeaderSize);
-
-            Assert(pageHeaderSize <= readOff);
+            Assert(pageHeaderSize <= state->readLen);
 
             contdata = (char *) state->readBuf + pageHeaderSize;
             len = XLOG_BLCKSZ - pageHeaderSize;
             if (pageHeader->xlp_rem_len < len)
                 len = pageHeader->xlp_rem_len;
 
-            if (readOff < pageHeaderSize + len)
-                readOff = ReadPageInternal(state, targetPagePtr,
-                                           pageHeaderSize + len);
-
+            Assert(pageHeaderSize + len <= state->readLen);
             memcpy(buffer, (char *) contdata, len);
             buffer += len;
             gotlen += len;
@@ -544,9 +591,16 @@ restart:
     else
     {
         /* Wait for the record data to become available */
-        readOff = ReadPageInternal(state, targetPagePtr,
-                                   Min(targetRecOff + total_len, XLOG_BLCKSZ));
-        if (readOff < 0)
+        while (XLogNeedData(state, targetPagePtr,
+                            Min(targetRecOff + total_len, XLOG_BLCKSZ), true))
+        {
+            if (!state->routine.page_read(state, state->readPagePtr,
+                                          state->readLen,
+                                          state->ReadRecPtr, state->readBuf))
+                break;
+        }
+
+        if (!state->page_verified)
             goto err;
 
         /* Record does not cross a page boundary */
@@ -603,109 +657,138 @@ err:
 }
 
 /*
- * Read a single xlog page including at least [pageptr, reqLen] of valid data
- * via the page_read() callback.
+ * Checks that the xlog page loaded in state->readBuf includes at least
+ * [pageptr, reqLen] and that the page is valid. header_inclusive indicates
+ * that reqLen is calculated including the page header length.
  *
- * Returns -1 if the required page cannot be read for some reason; errormsg_buf
- * is set in that case (unless the error occurs in the page_read callback).
+ * Returns false if the buffer already contains the requested data, or if an
+ * error was found. state->page_verified is set to true for the former and
+ * false for the latter.
  *
- * We fetch the page from a reader-local cache if we know we have the required
- * data and if there hasn't been any error since caching the data.
+ * Otherwise returns true and requests data to be loaded into state->readBuf,
+ * as described by state->readPagePtr and state->readLen. The caller shall call
+ * this function again after filling the buffer with at least that much data
+ * and setting state->readLen to the length of the data actually loaded.
+ *
+ * If header_inclusive is false, corrects reqLen internally by adding the
+ * actual page header length and may request new data from the caller.
  */
-static int
-ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
+static bool
+XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen,
+             bool header_inclusive)
 {
-    int            readLen;
     uint32        targetPageOff;
     XLogSegNo    targetSegNo;
-    XLogPageHeader hdr;
+    uint32        addLen = 0;
 
-    Assert((pageptr % XLOG_BLCKSZ) == 0);
+    /* Some data is loaded, but page header is not verified yet. */
+    if (!state->page_verified &&
+        !XLogRecPtrIsInvalid(state->readPagePtr) && state->readLen >= 0)
+    {
+        uint32        pageHeaderSize;
 
+        /* just loaded new data so needs to verify page header */
+
+        /* The caller must have loaded at least page header */
+        Assert(state->readLen >= SizeOfXLogShortPHD);
+
+        /*
+         * We have enough data to check the header length. Recheck the loaded
+         * length against the actual header length.
+         */
+        pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+
+        /* Request more data if we don't have the full header. */
+        if (state->readLen < pageHeaderSize)
+        {
+            state->readLen = pageHeaderSize;
+            return true;
+        }
+
+        /* Now that we know we have the full header, validate it. */
+        if (!XLogReaderValidatePageHeader(state, state->readPagePtr,
+                                          (char *) state->readBuf))
+        {
+            /* That's bad. Force reading the page again. */
+            XLogReaderInvalReadState(state);
+
+            return false;
+        }
+
+        state->page_verified = true;
+
+        XLByteToSeg(state->readPagePtr, state->seg.ws_segno,
+                    state->segcxt.ws_segsize);
+    }
+
+    /*
+     * The loaded page may not be the one the caller intends to read when we
+     * are verifying the first page of a new segment. In that case, skip further
+     * verification and immediately load the target page.
+     */
+    if (state->page_verified && pageptr == state->readPagePtr)
+    {
+        /*
+         * calculate additional length for page header keeping the total
+         * length within the block size.
+         */
+        if (!header_inclusive)
+        {
+            uint32        pageHeaderSize =
+            XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+
+            addLen = pageHeaderSize;
+            if (reqLen + pageHeaderSize <= XLOG_BLCKSZ)
+                addLen = pageHeaderSize;
+            else
+                addLen = XLOG_BLCKSZ - reqLen;
+
+            Assert(addLen >= 0);
+        }
+
+        /* Return if we already have it. */
+        if (reqLen + addLen <= state->readLen)
+            return false;
+    }
+
+    /* Data is not in our buffer, request it from the caller. */
     XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize);
     targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize);
+    Assert((pageptr % XLOG_BLCKSZ) == 0);
 
-    /* check whether we have all the requested data already */
-    if (targetSegNo == state->seg.ws_segno &&
-        targetPageOff == state->segoff && reqLen <= state->readLen)
-        return state->readLen;
+    /*
+     * Every time we ask the caller to load new data for a page, even if we
+     * looked at a part of it before, we need to do verification on the next
+     * invocation, as the caller might now be rereading data from a different
+     * source.
+     */
+    state->page_verified = false;
 
     /*
-     * Data is not in our buffer.
-     *
-     * Every time we actually read the segment, even if we looked at parts of
-     * it before, we need to do verification as the page_read callback might
-     * now be rereading data from a different source.
-     *
      * Whenever switching to a new WAL segment, we read the first page of the
      * file and validate its header, even if that's not where the target
      * record is.  This is so that we can check the additional identification
-     * info that is present in the first page's "long" header.
+     * info that is present in the first page's "long" header. Don't do this
+     * if the caller requested the first page in the segment.
      */
     if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
     {
-        XLogRecPtr    targetSegmentPtr = pageptr - targetPageOff;
-
-        readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ,
-                                           state->currRecPtr,
-                                           state->readBuf);
-        if (readLen < 0)
-            goto err;
-
-        /* we can be sure to have enough WAL available, we scrolled back */
-        Assert(readLen == XLOG_BLCKSZ);
-
-        if (!XLogReaderValidatePageHeader(state, targetSegmentPtr,
-                                          state->readBuf))
-            goto err;
+        /*
+         * Then we'll see that the targetSegNo now matches the ws_segno, and
+         * will not come back here, but will request the actual target page.
+         */
+        state->readPagePtr = pageptr - targetPageOff;
+        state->readLen = XLOG_BLCKSZ;
+        return true;
     }
 
     /*
-     * First, read the requested data length, but at least a short page header
-     * so that we can validate it.
+     * Request the caller to load the page. We need at least a short page
+     * header so that we can validate it.
      */
-    readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
-                                       state->currRecPtr,
-                                       state->readBuf);
-    if (readLen < 0)
-        goto err;
-
-    Assert(readLen <= XLOG_BLCKSZ);
-
-    /* Do we have enough data to check the header length? */
-    if (readLen <= SizeOfXLogShortPHD)
-        goto err;
-
-    Assert(readLen >= reqLen);
-
-    hdr = (XLogPageHeader) state->readBuf;
-
-    /* still not enough */
-    if (readLen < XLogPageHeaderSize(hdr))
-    {
-        readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr),
-                                           state->currRecPtr,
-                                           state->readBuf);
-        if (readLen < 0)
-            goto err;
-    }
-
-    /*
-     * Now that we know we have the full header, validate it.
-     */
-    if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr))
-        goto err;
-
-    /* update read state information */
-    state->seg.ws_segno = targetSegNo;
-    state->segoff = targetPageOff;
-    state->readLen = readLen;
-
-    return readLen;
-
-err:
-    XLogReaderInvalReadState(state);
-    return -1;
+    state->readPagePtr = pageptr;
+    state->readLen = Max(reqLen + addLen, SizeOfXLogShortPHD);
+    return true;
 }
 
 /*
@@ -714,9 +797,7 @@ err:
 static void
 XLogReaderInvalReadState(XLogReaderState *state)
 {
-    state->seg.ws_segno = 0;
-    state->segoff = 0;
-    state->readLen = 0;
+    state->readPagePtr = InvalidXLogRecPtr;
 }
 
 /*
@@ -994,7 +1075,6 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
         XLogRecPtr    targetPagePtr;
         int            targetRecOff;
         uint32        pageHeaderSize;
-        int            readLen;
 
         /*
          * Compute targetRecOff. It should typically be equal or greater than
@@ -1002,27 +1082,32 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
          * that, except when caller has explicitly specified the offset that
          * falls somewhere there or when we are skipping multi-page
          * continuation record. It doesn't matter though because
-         * ReadPageInternal() is prepared to handle that and will read at
-         * least short page-header worth of data
+         * XLogNeedData() is prepared to handle that and will read at least
+         * short page-header worth of data
          */
         targetRecOff = tmpRecPtr % XLOG_BLCKSZ;
 
         /* scroll back to page boundary */
         targetPagePtr = tmpRecPtr - targetRecOff;
 
-        /* Read the page containing the record */
-        readLen = ReadPageInternal(state, targetPagePtr, targetRecOff);
-        if (readLen < 0)
+        while (XLogNeedData(state, targetPagePtr, targetRecOff,
+                            targetRecOff != 0))
+        {
+            if (!state->routine.page_read(state, state->readPagePtr,
+                                          state->readLen,
+                                          state->ReadRecPtr, state->readBuf))
+                break;
+        }
+
+        if (!state->page_verified)
             goto err;
 
         header = (XLogPageHeader) state->readBuf;
 
         pageHeaderSize = XLogPageHeaderSize(header);
 
-        /* make sure we have enough data for the page header */
-        readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize);
-        if (readLen < 0)
-            goto err;
+        /* we should have read the page header */
+        Assert(state->readLen >= pageHeaderSize);
 
         /* skip over potential continuation data */
         if (header->xlp_info & XLP_FIRST_IS_CONTRECORD)
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 88a1bfd939..421040f391 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -706,8 +706,8 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
 void
 XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
 {
-    const XLogRecPtr lastReadPage = (state->seg.ws_segno *
-                                     state->segcxt.ws_segsize + state->segoff);
+    const XLogRecPtr lastReadPage = state->seg.ws_segno *
+    state->segcxt.ws_segsize + state->readLen;
 
     Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
     Assert(wantLength <= XLOG_BLCKSZ);
@@ -844,7 +844,7 @@ wal_segment_close(XLogReaderState *state)
  * exists for normal backends, so we have to do a check/sleep/repeat style of
  * loop for now.
  */
-int
+bool
 read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
                      int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 {
@@ -946,7 +946,8 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
     else if (targetPagePtr + reqLen > read_upto)
     {
         /* not enough data there */
-        return -1;
+        state->readLen = -1;
+        return false;
     }
     else
     {
@@ -964,7 +965,8 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
         WALReadRaiseError(&errinfo);
 
     /* number of valid bytes in the buffer */
-    return count;
+    state->readLen = count;
+    return true;
 }
 
 /*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index b811a5c0ef..ddba340653 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -802,7 +802,7 @@ StartReplication(StartReplicationCmd *cmd)
  * which has to do a plain sleep/busy loop, because the walsender's latch gets
  * set every time WAL is flushed.
  */
-static int
+static bool
 logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
                        XLogRecPtr targetRecPtr, char *cur_page)
 {
@@ -822,7 +822,10 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 
     /* fail if not (implies we are going to shut down) */
     if (flushptr < targetPagePtr + reqLen)
-        return -1;
+    {
+        state->readLen = -1;
+        return false;
+    }
 
     if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
         count = XLOG_BLCKSZ;    /* more than one block available */
@@ -850,7 +853,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
     XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
     CheckXLogRemoved(segno, state->seg.ws_tli);
 
-    return count;
+    state->readLen = count;
+    return true;
 }
 
 /*
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 59ebac7d6a..cf119848b0 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -47,7 +47,7 @@ typedef struct XLogPageReadPrivate
     int            tliIndex;
 } XLogPageReadPrivate;
 
-static int    SimpleXLogPageRead(XLogReaderState *xlogreader,
+static bool    SimpleXLogPageRead(XLogReaderState *xlogreader,
                                XLogRecPtr targetPagePtr,
                                int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
 
@@ -246,7 +246,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 }
 
 /* XLogReader callback function, to read a WAL page */
-static int
+static bool
 SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
                    int reqLen, XLogRecPtr targetRecPtr, char *readBuf)
 {
@@ -306,7 +306,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
             if (private->restoreCommand == NULL)
             {
                 pg_log_error("could not open file \"%s\": %m", xlogfpath);
-                return -1;
+                xlogreader->readLen = -1;
+                return false;
             }
 
             /*
@@ -319,7 +320,10 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
                                              private->restoreCommand);
 
             if (xlogreadfd < 0)
-                return -1;
+            {
+                xlogreader->readLen = -1;
+                return false;
+            }
             else
                 pg_log_debug("using file \"%s\" restored from archive",
                              xlogfpath);
@@ -335,7 +339,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
     if (lseek(xlogreadfd, (off_t) targetPageOff, SEEK_SET) < 0)
     {
         pg_log_error("could not seek in file \"%s\": %m", xlogfpath);
-        return -1;
+        xlogreader->readLen = -1;
+        return false;
     }
 
 
@@ -348,13 +353,15 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
             pg_log_error("could not read file \"%s\": read %d of %zu",
                          xlogfpath, r, (Size) XLOG_BLCKSZ);
 
-        return -1;
+        xlogreader->readLen = -1;
+        return false;
     }
 
     Assert(targetSegNo == xlogreadsegno);
 
     xlogreader->seg.ws_tli = targetHistory[private->tliIndex].tli;
-    return XLOG_BLCKSZ;
+    xlogreader->readLen = XLOG_BLCKSZ;
+    return true;
 }
 
 /*
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index 1e3894b9c4..833a64210b 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -332,7 +332,7 @@ WALDumpCloseSegment(XLogReaderState *state)
 }
 
 /* pg_waldump's XLogReaderRoutine->page_read callback */
-static int
+static bool
 WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
                 XLogRecPtr targetPtr, char *readBuff)
 {
@@ -349,7 +349,8 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
         else
         {
             private->endptr_reached = true;
-            return -1;
+            state->readLen = -1;
+            return false;
         }
     }
 
@@ -374,7 +375,8 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
                         errinfo.wre_req);
     }
 
-    return count;
+    state->readLen = count;
+    return true;
 }
 
 /*
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index de6fd791fe..f3cf4f2f49 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -57,12 +57,12 @@ typedef struct WALSegmentContext
 
 typedef struct XLogReaderState XLogReaderState;
 
-/* Function type definitions for various xlogreader interactions */
-typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
-                               XLogRecPtr targetPagePtr,
-                               int reqLen,
-                               XLogRecPtr targetRecPtr,
-                               char *readBuf);
+/* Function type definition for the page_read callback */
+typedef bool (*XLogPageReadCB) (XLogReaderState *xlogreader,
+                                XLogRecPtr targetPagePtr,
+                                int reqLen,
+                                XLogRecPtr targetRecPtr,
+                                char *readBuf);
 typedef void (*WALSegmentOpenCB) (XLogReaderState *xlogreader,
                                   XLogSegNo nextSegNo,
                                   TimeLineID *tli_p);
@@ -185,6 +185,18 @@ struct XLogReaderState
     /* Set when XLP_FIRST_IS_OVERWRITE_CONTRECORD is found */
     XLogRecPtr    overwrittenRecPtr;
 
+    /* ----------------------------------------
+     * Communication with page reader
+     * readBuf is XLOG_BLCKSZ bytes, valid up to at least readLen bytes.
+     *  ----------------------------------------
+     */
+    /* variables to communicate with page reader */
+    XLogRecPtr    readPagePtr;    /* page pointer to read */
+    int32        readLen;        /* bytes requested from reader, or actual
+                                 * bytes read by reader, which must be at
+                                 * least the requested length, or -1 on error */
+    char       *readBuf;        /* buffer to store data */
+    bool        page_verified;    /* is the page on the buffer verified? */
 
     /* ----------------------------------------
      * Decoded representation of current record
@@ -213,13 +225,6 @@ struct XLogReaderState
      * ----------------------------------------
      */
 
-    /*
-     * Buffer for currently read page (XLOG_BLCKSZ bytes, valid up to at least
-     * readLen bytes)
-     */
-    char       *readBuf;
-    uint32        readLen;
-
     /* last read XLOG position for data currently in readBuf */
     WALSegmentContext segcxt;
     WALOpenSegment seg;
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index a5cb3d322c..8669f7eeb3 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -89,7 +89,7 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
 extern void FreeFakeRelcacheEntry(Relation fakerel);
 
-extern int    read_local_xlog_page(XLogReaderState *state,
+extern bool    read_local_xlog_page(XLogReaderState *state,
                                  XLogRecPtr targetPagePtr, int reqLen,
                                  XLogRecPtr targetRecPtr, char *cur_page);
 extern void wal_segment_open(XLogReaderState *state,
-- 
2.27.0

From 320e6d1cc8d65c455069fb32390039601839c667 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Date: Thu, 30 Sep 2021 12:04:37 +0900
Subject: [PATCH v18 2/5] Move page-reader out of XLogReadRecord().

This is the second step of removing callbacks from the WAL decoder.
XLogReadRecord() returns XLREAD_NEED_DATA to indicate that the caller
should supply new data, and the decoder works as a state machine.

Author: Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>
Author: Heikki Linnakangas <hlinnaka@iki.fi>
Reviewed-by: Antonin Houska <ah@cybertec.at>
Reviewed-by: Alvaro Herrera <alvherre@2ndquadrant.com>
Reviewed-by: Takashi Menjo <takashi.menjo@gmail.com>
Reviewed-by: Thomas Munro <thomas.munro@gmail.com>
Discussion: https://postgr.es/m/20190418.210257.43726183.horiguchi.kyotaro%40lab.ntt.co.jp
---
 src/backend/access/transam/twophase.c         |  16 +-
 src/backend/access/transam/xlog.c             |  60 +-
 src/backend/access/transam/xlogreader.c       | 760 ++++++++++--------
 src/backend/access/transam/xlogutils.c        |  17 +-
 src/backend/replication/logical/logical.c     |  29 +-
 .../replication/logical/logicalfuncs.c        |  16 +-
 src/backend/replication/slotfuncs.c           |  21 +-
 src/backend/replication/walsender.c           |  35 +-
 src/bin/pg_rewind/parsexlog.c                 |  93 ++-
 src/bin/pg_waldump/pg_waldump.c               |  39 +-
 src/include/access/xlogreader.h               | 124 ++-
 src/include/access/xlogutils.h                |   4 +-
 src/include/pg_config_manual.h                |   2 +-
 src/include/replication/logical.h             |  11 +-
 14 files changed, 703 insertions(+), 524 deletions(-)
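
The caller-driven loop described in the commit message above can be
illustrated outside PostgreSQL with a toy model. This is only a sketch under
loud assumptions: every name here (ToyReader, toy_read_record, toy_page_read,
TOY_NEED_DATA, ...) is hypothetical and is not part of xlogreader. The
"record format" is a one-byte total length followed by payload, with no page
headers and no CRC. It shows only the control flow: the decoder keeps its
progress in the state struct, returns NEED_DATA naming the page it wants, and
the caller loads that page and calls again.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TOY_PAGE_SIZE 8

typedef enum { TOY_SUCCESS, TOY_NEED_DATA, TOY_FAIL } ToyResult;

typedef struct ToyReader
{
    size_t      next_rec;       /* stream offset of the next record */
    size_t      want_page;      /* page the caller must load (cf. readPagePtr) */
    size_t      loaded_page;    /* page currently in page_buf, (size_t) -1 if none */
    int         read_len;       /* valid bytes in page_buf, -1 on error (cf. readLen) */
    size_t      got;            /* bytes of the current record assembled so far */
    size_t      total;          /* total length of the current record, 0 if unknown */
    unsigned char page_buf[TOY_PAGE_SIZE];
    unsigned char rec_buf[256];
} ToyReader;

/* Decoder: never reads by itself; asks the caller for pages instead. */
static ToyResult
toy_read_record(ToyReader *r, const unsigned char **rec, size_t *len)
{
    *rec = NULL;
    for (;;)
    {
        size_t      pos = r->next_rec + r->got;
        size_t      page = pos / TOY_PAGE_SIZE;
        size_t      off = pos % TOY_PAGE_SIZE;
        size_t      avail, n;

        if (r->loaded_page != page)
        {
            r->want_page = page;    /* tell the caller what to load */
            return TOY_NEED_DATA;   /* progress stays in r->got / r->total */
        }
        if (r->read_len < 0 || (size_t) r->read_len <= off)
            return TOY_FAIL;        /* short page: end of stream */
        if (r->total == 0 && (r->total = r->page_buf[off]) == 0)
            return TOY_FAIL;        /* zero length: treat as bogus data */

        avail = (size_t) r->read_len - off;
        n = (r->total - r->got < avail) ? r->total - r->got : avail;
        memcpy(r->rec_buf + r->got, r->page_buf + off, n);
        r->got += n;

        if (r->got == r->total)
        {
            *rec = r->rec_buf;
            *len = r->total;
            r->next_rec += r->total;
            r->got = r->total = 0;  /* back to the "next record" state */
            return TOY_SUCCESS;
        }
        /* record continues on the next page; loop to request it */
    }
}

/* Caller-side page reader: fills page_buf from an in-memory stream. */
static int
toy_page_read(ToyReader *r, const unsigned char *stream, size_t stream_len)
{
    size_t      start = r->want_page * TOY_PAGE_SIZE;
    size_t      n;

    if (start >= stream_len)
    {
        r->read_len = -1;
        return 0;
    }
    n = stream_len - start;
    if (n > TOY_PAGE_SIZE)
        n = TOY_PAGE_SIZE;
    memcpy(r->page_buf, stream + start, n);
    r->read_len = (int) n;
    r->loaded_page = r->want_page;
    return 1;
}

/* Driver loop, shaped like the callers changed in this patch. */
static int
toy_count_records(const unsigned char *stream, size_t stream_len)
{
    ToyReader   r;
    const unsigned char *rec;
    size_t      len;
    int         count = 0;

    memset(&r, 0, sizeof(r));
    r.loaded_page = (size_t) -1;
    for (;;)
    {
        ToyResult   res;

        while ((res = toy_read_record(&r, &rec, &len)) == TOY_NEED_DATA)
        {
            if (!toy_page_read(&r, stream, stream_len))
                break;              /* no more data: give up on this record */
        }
        if (res != TOY_SUCCESS)
            break;
        count++;
    }
    return count;
}
```

Because the decoder never blocks inside a callback, the caller is free to
decide per request where the page comes from, which is the property the
patch series is after.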

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 2156de187c..9b42a935a7 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1330,11 +1330,8 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
     char       *errormsg;
     TimeLineID    save_currtli = ThisTimeLineID;
 
-    xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
-                                    XL_ROUTINE(.page_read = &read_local_xlog_page,
-                                               .segment_open = &wal_segment_open,
-                                               .segment_close = &wal_segment_close),
-                                    NULL);
+    xlogreader = XLogReaderAllocate(wal_segment_size, NULL, wal_segment_close);
+
     if (!xlogreader)
         ereport(ERROR,
                 (errcode(ERRCODE_OUT_OF_MEMORY),
@@ -1342,7 +1339,14 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
                  errdetail("Failed while allocating a WAL reading processor.")));
 
     XLogBeginRead(xlogreader, lsn);
-    record = XLogReadRecord(xlogreader, &errormsg);
+    while (XLogReadRecord(xlogreader, &record, &errormsg) == XLREAD_NEED_DATA)
+    {
+        if (!read_local_xlog_page(xlogreader))
+        {
+            XLogTerminateRead(xlogreader);
+            break;
+        }
+    }
 
     /*
      * Restore immediately the timeline where it was previously, as
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 1557ceb8c1..316ba256f6 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -836,13 +836,6 @@ static XLogSource currentSource = XLOG_FROM_ANY;
 static bool lastSourceFailed = false;
 static bool pendingWalRcvRestart = false;
 
-typedef struct XLogPageReadPrivate
-{
-    int            emode;
-    bool        fetching_ckpt;    /* are we fetching a checkpoint record? */
-    bool        randAccess;
-} XLogPageReadPrivate;
-
 /*
  * These variables track when we last obtained some WAL data to process,
  * and where we got it from.  (XLogReceiptSource is initially the same as
@@ -918,8 +911,8 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
 static int    XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
                          XLogSource source, bool notfoundOk);
 static int    XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source);
-static bool    XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
-                         int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
+static bool XLogPageRead(XLogReaderState *xlogreader,
+                         bool fetching_ckpt, int emode, bool randAccess);
 static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                                         bool fetching_ckpt, XLogRecPtr tliRecPtr);
 static void XLogShutdownWalRcv(void);
@@ -1233,8 +1226,7 @@ XLogInsertRecord(XLogRecData *rdata,
             appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len);
 
         if (!debug_reader)
-            debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
-                                              XL_ROUTINE(), NULL);
+            debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL);
 
         if (!debug_reader)
         {
@@ -4395,15 +4387,10 @@ CleanupBackupHistory(void)
  * record is available.
  */
 static XLogRecord *
-ReadRecord(XLogReaderState *xlogreader, int emode,
-           bool fetching_ckpt)
+ReadRecord(XLogReaderState *xlogreader, int emode, bool fetching_ckpt)
 {
     XLogRecord *record;
-    XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
-
-    private->fetching_ckpt = fetching_ckpt;
-    private->emode = emode;
-    private->randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr);
+    bool        randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr);
 
     /* This is the first attempt to read this page. */
     lastSourceFailed = false;
@@ -4411,8 +4398,18 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
     for (;;)
     {
         char       *errormsg;
+        XLogReadRecordResult result;
+
+        while ((result = XLogReadRecord(xlogreader, &record, &errormsg))
+               == XLREAD_NEED_DATA)
+        {
+            if (!XLogPageRead(xlogreader, fetching_ckpt, emode, randAccess))
+            {
+                XLogTerminateRead(xlogreader);
+                break;
+            }
+        }
 
-        record = XLogReadRecord(xlogreader, &errormsg);
         ReadRecPtr = xlogreader->ReadRecPtr;
         EndRecPtr = xlogreader->EndRecPtr;
         if (record == NULL)
@@ -6547,7 +6544,6 @@ StartupXLOG(void)
     bool        backupFromStandby = false;
     DBState        dbstate_at_startup;
     XLogReaderState *xlogreader;
-    XLogPageReadPrivate private;
     bool        promoted = false;
     struct stat st;
 
@@ -6706,13 +6702,9 @@ StartupXLOG(void)
         OwnLatch(&XLogCtl->recoveryWakeupLatch);
 
     /* Set up XLOG reader facility */
-    MemSet(&private, 0, sizeof(XLogPageReadPrivate));
     xlogreader =
-        XLogReaderAllocate(wal_segment_size, NULL,
-                           XL_ROUTINE(.page_read = &XLogPageRead,
-                                      .segment_open = NULL,
-                                      .segment_close = wal_segment_close),
-                           &private);
+        XLogReaderAllocate(wal_segment_size, NULL, wal_segment_close);
+
     if (!xlogreader)
         ereport(ERROR,
                 (errcode(ERRCODE_OUT_OF_MEMORY),
@@ -12300,12 +12292,13 @@ CancelBackup(void)
  * sleep and retry.
  */
 static bool
-XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
-             XLogRecPtr targetRecPtr, char *readBuf)
+XLogPageRead(XLogReaderState *xlogreader,
+             bool fetching_ckpt, int emode, bool randAccess)
 {
-    XLogPageReadPrivate *private =
-    (XLogPageReadPrivate *) xlogreader->private_data;
-    int            emode = private->emode;
+    char *readBuf                = xlogreader->readBuf;
+    XLogRecPtr targetPagePtr    = xlogreader->readPagePtr;
+    int reqLen                    = xlogreader->readLen;
+    XLogRecPtr targetRecPtr        = xlogreader->ReadRecPtr;
     uint32        targetPageOff;
     XLogSegNo    targetSegNo PG_USED_FOR_ASSERTS_ONLY;
     int            r;
@@ -12348,8 +12341,8 @@ retry:
          flushedUpto < targetPagePtr + reqLen))
     {
         if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
-                                         private->randAccess,
-                                         private->fetching_ckpt,
+                                         randAccess,
+                                         fetching_ckpt,
                                          targetRecPtr))
         {
             if (readFile >= 0)
@@ -12469,6 +12462,7 @@ retry:
         goto next_record_is_invalid;
     }
 
+    Assert(xlogreader->readPagePtr == targetPagePtr);
     xlogreader->readLen = readLen;
     return true;
 
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 1d9976ecf4..4501a09245 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -43,7 +43,7 @@ static bool XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr,
                          int reqLen, bool header_inclusive);
 static void XLogReaderInvalReadState(XLogReaderState *state);
 static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
-                                  XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
+                                  XLogRecPtr PrevRecPtr, XLogRecord *record);
 static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
                             XLogRecPtr recptr);
 static void ResetDecoder(XLogReaderState *state);
@@ -76,7 +76,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
  */
 XLogReaderState *
 XLogReaderAllocate(int wal_segment_size, const char *waldir,
-                   XLogReaderRoutine *routine, void *private_data)
+                   WALSegmentCleanupCB cleanup_cb)
 {
     XLogReaderState *state;
 
@@ -87,7 +87,7 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
         return NULL;
 
     /* initialize caller-provided support functions */
-    state->routine = *routine;
+    state->cleanup_cb = cleanup_cb;
 
     state->max_block_id = -1;
 
@@ -110,8 +110,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
     WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
                        waldir);
 
-    /* system_identifier initialized to zeroes above */
-    state->private_data = private_data;
     /* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */
     state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
                                           MCXT_ALLOC_NO_OOM);
@@ -143,8 +141,8 @@ XLogReaderFree(XLogReaderState *state)
 {
     int            block_id;
 
-    if (state->seg.ws_file != -1)
-        state->routine.segment_close(state);
+    if (state->seg.ws_file >= 0)
+        state->cleanup_cb(state);
 
     for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
     {
@@ -249,6 +247,7 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
     /* Begin at the passed-in record pointer. */
     state->EndRecPtr = RecPtr;
     state->ReadRecPtr = InvalidXLogRecPtr;
+    state->readRecordState = XLREAD_NEXT_RECORD;
 }
 
 /*
@@ -257,12 +256,12 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
  * XLogBeginRead() or XLogFindNextRecord() must be called before the first call
  * to XLogReadRecord().
  *
- * If the page_read callback fails to read the requested data, NULL is
- * returned.  The callback is expected to have reported the error; errormsg
- * is set to NULL.
+ * This function may return XLREAD_NEED_DATA several times before returning a
+ * result record. Each time, the caller must read in the requested data and
+ * call this function again with the same parameters.
  *
- * If the reading fails for some other reason, NULL is also returned, and
- * *errormsg is set to a string with details of the failure.
+ * When a record is successfully read, returns XLREAD_SUCCESS and stores the
+ * record in *record. Otherwise *record is set to NULL.
  *
  * Returns XLREAD_NEED_DATA if more data is needed to finish reading the
  * current record.  In that case, state->readPagePtr and state->readLen inform
@@ -307,329 +306,458 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
  * state. This behavior allows to continue reading a reacord switching among
  * different souces, while streaming replication.
  */
-XLogRecord *
-XLogReadRecord(XLogReaderState *state, char **errormsg)
+XLogReadRecordResult
+XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
 {
-    XLogRecPtr    RecPtr;
-    XLogRecord *record;
-    XLogRecPtr    targetPagePtr;
-    bool        randAccess;
-    uint32        len,
-                total_len;
-    uint32        targetRecOff;
-    uint32        pageHeaderSize;
-    bool        assembled;
-    bool        gotheader;
+    XLogRecord *prec;
 
-    /*
-     * randAccess indicates whether to verify the previous-record pointer of
-     * the record we're reading.  We only do this if we're reading
-     * sequentially, which is what we initially assume.
-     */
-    randAccess = false;
+    *record = NULL;
 
     /* reset error state */
     *errormsg = NULL;
     state->errormsg_buf[0] = '\0';
 
-    ResetDecoder(state);
     state->abortedRecPtr = InvalidXLogRecPtr;
     state->missingContrecPtr = InvalidXLogRecPtr;
 
-    RecPtr = state->EndRecPtr;
-
-    if (state->ReadRecPtr != InvalidXLogRecPtr)
-    {
-        /* read the record after the one we just read */
-
-        /*
-         * EndRecPtr is pointing to end+1 of the previous WAL record.  If
-         * we're at a page boundary, no more records can fit on the current
-         * page. We must skip over the page header, but we can't do that until
-         * we've read in the page, since the header size is variable.
-         */
-    }
-    else
-    {
-        /*
-         * Caller supplied a position to start at.
-         *
-         * In this case, EndRecPtr should already be pointing to a valid
-         * record starting position.
-         */
-        Assert(XRecOffIsValid(RecPtr));
-        randAccess = true;
-    }
-
 restart:
-    state->currRecPtr = RecPtr;
-    assembled = false;
-
-    targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
-    targetRecOff = RecPtr % XLOG_BLCKSZ;
-
-    /*
-     * Read the page containing the record into state->readBuf. Request enough
-     * byte to cover the whole record header, or at least the part of it that
-     * fits on the same page.
-     */
-    while (XLogNeedData(state, targetPagePtr,
-                        Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ),
-                        targetRecOff != 0))
+    switch (state->readRecordState)
     {
-        if (!state->routine.page_read(state, state->readPagePtr, state->readLen,
-                                      RecPtr, state->readBuf))
-            break;
-    }
-
-    if (!state->page_verified)
-        goto err;
-
-    /*
-     * We have at least the page header, so we can examine it now.
-     */
-    pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
-    if (targetRecOff == 0)
-    {
-        /*
-         * At page start, so skip over page header.
-         */
-        RecPtr += pageHeaderSize;
-        targetRecOff = pageHeaderSize;
-    }
-    else if (targetRecOff < pageHeaderSize)
-    {
-        report_invalid_record(state, "invalid record offset at %X/%X",
-                              LSN_FORMAT_ARGS(RecPtr));
-        goto err;
-    }
+        case XLREAD_NEXT_RECORD:
+            ResetDecoder(state);
 
-    if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
-        targetRecOff == pageHeaderSize)
-    {
-        report_invalid_record(state, "contrecord is requested by %X/%X",
-                              LSN_FORMAT_ARGS(RecPtr));
-        goto err;
-    }
-
-    /* XLogNeedData has verified the page header */
-    Assert(pageHeaderSize <= state->readLen);
-
-    /*
-     * Read the record length.
-     *
-     * NB: Even though we use an XLogRecord pointer here, the whole record
-     * header might not fit on this page. xl_tot_len is the first field of the
-     * struct, so it must be on this page (the records are MAXALIGNed), but we
-     * cannot access any other fields until we've verified that we got the
-     * whole header.
-     */
-    record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
-    total_len = record->xl_tot_len;
-
-    /*
-     * If the whole record header is on this page, validate it immediately.
-     * Otherwise do just a basic sanity check on xl_tot_len, and validate the
-     * rest of the header after reading it from the next page.  The xl_tot_len
-     * check is necessary here to ensure that we enter the "Need to reassemble
-     * record" code path below; otherwise we might fail to apply
-     * ValidXLogRecordHeader at all.
-     */
-    if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
-    {
-        if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
-                                   randAccess))
-            goto err;
-        gotheader = true;
-    }
-    else
-    {
-        /* XXX: more validation should be done here */
-        if (total_len < SizeOfXLogRecord)
-        {
-            report_invalid_record(state,
-                                  "invalid record length at %X/%X: wanted %u, got %u",
-                                  LSN_FORMAT_ARGS(RecPtr),
-                                  (uint32) SizeOfXLogRecord, total_len);
-            goto err;
-        }
-        gotheader = false;
-    }
-
-    len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
-    if (total_len > len)
-    {
-        /* Need to reassemble record */
-        char       *contdata;
-        XLogPageHeader pageHeader;
-        char       *buffer;
-        uint32        gotlen;
-
-        assembled = true;
-
-        /*
-         * Enlarge readRecordBuf as needed.
-         */
-        if (total_len > state->readRecordBufSize &&
-            !allocate_recordbuf(state, total_len))
-        {
-            /* We treat this as a "bogus data" condition */
-            report_invalid_record(state, "record length %u at %X/%X too long",
-                                  total_len, LSN_FORMAT_ARGS(RecPtr));
-            goto err;
-        }
-
-        /* Copy the first fragment of the record from the first page. */
-        memcpy(state->readRecordBuf,
-               state->readBuf + RecPtr % XLOG_BLCKSZ, len);
-        buffer = state->readRecordBuf + len;
-        gotlen = len;
-
-        do
-        {
-            int            rest_len = total_len - gotlen;
-
-            /* Calculate pointer to beginning of next page */
-            targetPagePtr += XLOG_BLCKSZ;
-
-            /* Wait for the next page to become available */
-            while (XLogNeedData(state, targetPagePtr,
-                                Min(rest_len, XLOG_BLCKSZ),
-                                false))
+            if (state->ReadRecPtr != InvalidXLogRecPtr)
+            {
+                /* read the record after the one we just read */
+
+                /*
+                 * EndRecPtr is pointing to end+1 of the previous WAL record.
+                 * If we're at a page boundary, no more records can fit on the
+                 * current page. We must skip over the page header, but we
+                 * can't do that until we've read in the page, since the
+                 * header size is variable.
+                 */
+                state->PrevRecPtr = state->ReadRecPtr;
+                state->ReadRecPtr = state->EndRecPtr;
+            }
+            else
             {
-                if (!state->routine.page_read(state, state->readPagePtr,
-                                              state->readLen,
-                                              state->ReadRecPtr,
-                                              state->readBuf))
+                /*
+                 * Caller supplied a position to start at.
+                 *
+                 * In this case, EndRecPtr should already be pointing to a
+                 * valid record starting position.
+                 */
+                Assert(XRecOffIsValid(state->EndRecPtr));
+                state->ReadRecPtr = state->EndRecPtr;
+
+                /*
+                 * We cannot verify the previous-record pointer when we're
+                 * seeking to a particular record. Reset PrevRecPtr so that we
+                 * won't try doing that.
+                 */
+                state->PrevRecPtr = InvalidXLogRecPtr;
+                state->EndRecPtr = InvalidXLogRecPtr;    /* to be tidy */
+            }
+
+            state->record_verified = false;
+            state->readRecordState = XLREAD_TOT_LEN;
+            /* fall through */
+
+        case XLREAD_TOT_LEN:
+            {
+                uint32        total_len;
+                uint32        pageHeaderSize;
+                XLogRecPtr    targetPagePtr;
+                uint32        targetRecOff;
+                XLogPageHeader pageHeader;
+
+                targetPagePtr =
+                    state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ);
+                targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ;
+
+                /*
+                 * Check if we have enough data. For the first record in the
+                 * page, the requested length doesn't include the page header.
+                 */
+                if (XLogNeedData(state, targetPagePtr,
+                                 Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ),
+                                 targetRecOff != 0))
+                    return XLREAD_NEED_DATA;
+
+                /* error out if caller supplied bogus page */
+                if (!state->page_verified)
+                    goto err;
+
+                /* examine page header now. */
+                pageHeaderSize =
+                    XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+                if (targetRecOff == 0)
+                {
+                    /* At page start, so skip over page header. */
+                    state->ReadRecPtr += pageHeaderSize;
+                    targetRecOff = pageHeaderSize;
+                }
+                else if (targetRecOff < pageHeaderSize)
+                {
+                    report_invalid_record(state, "invalid record offset at %X/%X",
+                                          LSN_FORMAT_ARGS(state->ReadRecPtr));
+                    goto err;
+                }
+
+                pageHeader = (XLogPageHeader) state->readBuf;
+                if ((pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
+                    targetRecOff == pageHeaderSize)
+                {
+                    report_invalid_record(state,
+                                          "contrecord is requested by %X/%X",
+                                          LSN_FORMAT_ARGS(state->ReadRecPtr));
+                    goto err;
+                }
+
+                /* XLogNeedData has verified the page header */
+                Assert(pageHeaderSize <= state->readLen);
+
+                /*
+                 * Read the record length.
+                 *
+                 * NB: Even though we use an XLogRecord pointer here, the
+                 * whole record header might not fit on this page. xl_tot_len
+                 * is the first field of the struct, so it must be on this
+                 * page (the records are MAXALIGNed), but we cannot access any
+                 * other fields until we've verified that we got the whole
+                 * header.
+                 */
+                prec = (XLogRecord *) (state->readBuf +
+                                       state->ReadRecPtr % XLOG_BLCKSZ);
+                total_len = prec->xl_tot_len;
+
+                /*
+                 * If the whole record header is on this page, validate it
+                 * immediately.  Otherwise do just a basic sanity check on
+                 * xl_tot_len, and validate the rest of the header after
+                 * reading it from the next page.  The xl_tot_len check is
+                 * necessary here to ensure that we enter the
+                 * XLREAD_CONTINUATION state below; otherwise we might fail to
+                 * apply ValidXLogRecordHeader at all.
+                 */
+                if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
+                {
+                    if (!ValidXLogRecordHeader(state, state->ReadRecPtr,
+                                               state->PrevRecPtr, prec))
+                        goto err;
+
+                    state->record_verified = true;
+                }
+                else
+                {
+                    /* XXX: more validation should be done here */
+                    if (total_len < SizeOfXLogRecord)
+                    {
+                        report_invalid_record(state,
+                                              "invalid record length at %X/%X: wanted %u, got %u",
+                                              LSN_FORMAT_ARGS(state->ReadRecPtr),
+                                              (uint32) SizeOfXLogRecord, total_len);
+                        goto err;
+                    }
+                }
+
+                /*
+                 * Wait for the rest of the record, or the part of the record
+                 * that fits on the first page if it crosses a page boundary,
+                 * to become available.
+                 */
+                state->recordGotLen = 0;
+                state->recordRemainLen = total_len;
+                state->readRecordState = XLREAD_FIRST_FRAGMENT;
+            }
+            /* fall through */
+
+        case XLREAD_FIRST_FRAGMENT:
+            {
+                uint32        total_len = state->recordRemainLen;
+                uint32        request_len;
+                uint32        record_len;
+                XLogRecPtr    targetPagePtr;
+                uint32        targetRecOff;
+
+                /*
+                 * Wait for the rest of the record on the first page to become
+                 * available
+                 */
+                targetPagePtr =
+                    state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ);
+                targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ;
+
+                request_len = Min(targetRecOff + total_len, XLOG_BLCKSZ);
+                record_len = request_len - targetRecOff;
+
+                /* ReadRecPtr already points past the page header */
+                Assert(targetRecOff != 0);
+                if (XLogNeedData(state, targetPagePtr, request_len, true))
+                    return XLREAD_NEED_DATA;
+
+                /* error out if caller supplied bogus page */
+                if (!state->page_verified)
+                    goto err;
+
+                prec = (XLogRecord *) (state->readBuf + targetRecOff);
+
+                /* validate the record header if not done yet */
+                if (!state->record_verified && record_len >= SizeOfXLogRecord)
+                {
+                    if (!ValidXLogRecordHeader(state, state->ReadRecPtr,
+                                               state->PrevRecPtr, prec))
+                        goto err;
+
+                    state->record_verified = true;
+                }
+
+
+                if (total_len == record_len)
+                {
+                    /* Record does not cross a page boundary */
+                    Assert(state->record_verified);
+
+                    if (!ValidXLogRecord(state, prec, state->ReadRecPtr))
+                        goto err;
+
+                    state->record_verified = true;    /* to be tidy */
+
+                    /* We already checked the header earlier */
+                    state->EndRecPtr = state->ReadRecPtr + MAXALIGN(record_len);
+
+                    *record = prec;
+                    state->readRecordState = XLREAD_NEXT_RECORD;
                     break;
-            }
+                }
 
-            if (!state->page_verified)
-                goto err;
+                /*
+                 * The record continues on the next page, so it needs to be
+                 * reassembled.
+                 */
+                Assert(total_len > record_len);
 
-            Assert(SizeOfXLogShortPHD <= state->readLen);
+                /* Enlarge readRecordBuf as needed. */
+                if (total_len > state->readRecordBufSize &&
+                    !allocate_recordbuf(state, total_len))
+                {
+                    /* We treat this as a "bogus data" condition */
+                    report_invalid_record(state,
+                                          "record length %u at %X/%X too long",
+                                          total_len,
+                                          LSN_FORMAT_ARGS(state->ReadRecPtr));
+                    goto err;
+                }
 
-            pageHeader = (XLogPageHeader) state->readBuf;
+                /* Copy the first fragment of the record from the first page. */
+                memcpy(state->readRecordBuf, state->readBuf + targetRecOff,
+                       record_len);
+                state->recordGotLen += record_len;
+                state->recordRemainLen -= record_len;
 
-            /*
-             * If we were expecting a continuation record and got an
-             * "overwrite contrecord" flag, that means the continuation record
-             * was overwritten with a different record.  Restart the read by
-             * assuming the address to read is the location where we found
-             * this flag; but keep track of the LSN of the record we were
-             * reading, for later verification.
-             */
-            if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD)
-            {
-                state->overwrittenRecPtr = state->currRecPtr;
-                ResetDecoder(state);
-                RecPtr = targetPagePtr;
-                goto restart;
-            }
+                /* Calculate pointer to beginning of next page */
+                state->recordContRecPtr = state->ReadRecPtr + record_len;
+                Assert(state->recordContRecPtr % XLOG_BLCKSZ == 0);
 
-            /* Check that the continuation on next page looks valid */
-            if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
-            {
-                report_invalid_record(state,
-                                      "there is no contrecord flag at %X/%X",
-                                      LSN_FORMAT_ARGS(RecPtr));
-                goto err;
+                state->readRecordState = XLREAD_CONTINUATION;
             }
+            /* fall through */
 
-            /*
-             * Cross-check that xlp_rem_len agrees with how much of the record
-             * we expect there to be left.
-             */
-            if (pageHeader->xlp_rem_len == 0 ||
-                total_len != (pageHeader->xlp_rem_len + gotlen))
+        case XLREAD_CONTINUATION:
             {
-                report_invalid_record(state,
-                                      "invalid contrecord length %u (expected %lld) at %X/%X",
-                                      pageHeader->xlp_rem_len,
-                                      ((long long) total_len) - gotlen,
-                                      LSN_FORMAT_ARGS(RecPtr));
-                goto err;
-            }
+                XLogPageHeader pageHeader;
+                uint32        pageHeaderSize;
+                XLogRecPtr    targetPagePtr;
 
-            /* Append the continuation from this page to the buffer */
-            pageHeaderSize = XLogPageHeaderSize(pageHeader);
+                /*
+                 * We enter this state only if we haven't read the whole
+                 * record.
+                 */
+                Assert(state->recordRemainLen > 0);
 
-            Assert(pageHeaderSize <= state->readLen);
+                while (state->recordRemainLen > 0)
+                {
+                    char       *contdata;
+                    uint32        request_len;
+                    uint32        record_len;
 
-            contdata = (char *) state->readBuf + pageHeaderSize;
-            len = XLOG_BLCKSZ - pageHeaderSize;
-            if (pageHeader->xlp_rem_len < len)
-                len = pageHeader->xlp_rem_len;
+                    /* Wait for the next page to become available */
+                    targetPagePtr = state->recordContRecPtr;
 
-            Assert(pageHeaderSize + len <= state->readLen);
-            memcpy(buffer, (char *) contdata, len);
-            buffer += len;
-            gotlen += len;
+                    /* This request includes the page header */
+                    Assert(targetPagePtr != 0);
+                    if (XLogNeedData(state, targetPagePtr,
+                                     Min(state->recordRemainLen, XLOG_BLCKSZ),
+                                     false))
+                        return XLREAD_NEED_DATA;
 
-            /* If we just reassembled the record header, validate it. */
-            if (!gotheader)
-            {
-                record = (XLogRecord *) state->readRecordBuf;
-                if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
-                                           record, randAccess))
+                    if (!state->page_verified)
+                        goto err;
+
+                    Assert(SizeOfXLogShortPHD <= state->readLen);
+
+                    /* Check that the continuation on next page looks valid */
+                    pageHeader = (XLogPageHeader) state->readBuf;
+
+                    /*
+                     * If we were expecting a continuation record and got an
+                     * "overwrite contrecord" flag, that means the continuation
+                     * record was overwritten with a different record.  Restart
+                     * the read by assuming the address to read is the location
+                     * where we found this flag; but keep track of the LSN of
+                     * the record we were reading, for later verification.
+                     */
+                    if (pageHeader->xlp_info &
+                        XLP_FIRST_IS_OVERWRITE_CONTRECORD)
+                    {
+                        state->overwrittenRecPtr = state->ReadRecPtr;
+                        state->EndRecPtr = targetPagePtr;
+                        state->readRecordState = XLREAD_NEXT_RECORD;
+
+                        /*
+                         * ReadRecPtr becomes the PrevRecPtr of the next
+                         * record, so restore it to keep that LSN on retry.
+                         */
+                        state->ReadRecPtr = state->PrevRecPtr;
+
+                        goto restart;
+                    }
+
+                    if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
+                    {
+                        report_invalid_record(
+                                              state,
+                                              "there is no contrecord flag at %X/%X reading %X/%X",
+                                              (uint32) (state->recordContRecPtr >> 32),
+                                              (uint32) state->recordContRecPtr,
+                                              (uint32) (state->ReadRecPtr >> 32),
+                                              (uint32) state->ReadRecPtr);
+                        goto err;
+                    }
+
+                    /*
+                     * Cross-check that xlp_rem_len agrees with how much of
+                     * the record we expect there to be left.
+                     */
+                    if (pageHeader->xlp_rem_len == 0 ||
+                        pageHeader->xlp_rem_len != state->recordRemainLen)
+                    {
+                        report_invalid_record(
+                                              state,
+                                              "invalid contrecord length %u at %X/%X reading %X/%X, expected %u",
+                                              pageHeader->xlp_rem_len,
+                                              (uint32) (state->recordContRecPtr >> 32),
+                                              (uint32) state->recordContRecPtr,
+                                              (uint32) (state->ReadRecPtr >> 32),
+                                              (uint32) state->ReadRecPtr,
+                                              state->recordRemainLen);
+                        goto err;
+                    }
+
+                    /* Append the continuation from this page to the buffer */
+                    pageHeaderSize = XLogPageHeaderSize(pageHeader);
+
+                    /*
+                     * XLogNeedData should have ensured that the whole page
+                     * header was read
+                     */
+                    Assert(state->readLen >= pageHeaderSize);
+
+                    contdata = (char *) state->readBuf + pageHeaderSize;
+                    record_len = XLOG_BLCKSZ - pageHeaderSize;
+                    if (pageHeader->xlp_rem_len < record_len)
+                        record_len = pageHeader->xlp_rem_len;
+
+                    request_len = record_len + pageHeaderSize;
+
+                    /*
+                     * XLogNeedData should have ensured all needed data was
+                     * read
+                     */
+                    Assert(state->readLen >= request_len);
+
+                    memcpy(state->readRecordBuf + state->recordGotLen,
+                           (char *) contdata, record_len);
+                    state->recordGotLen += record_len;
+                    state->recordRemainLen -= record_len;
+
+                    /* If we just reassembled the record header, validate it. */
+                    if (!state->record_verified)
+                    {
+                        Assert(state->recordGotLen >= SizeOfXLogRecord);
+                        if (!ValidXLogRecordHeader(state, state->ReadRecPtr,
+                                                   state->PrevRecPtr,
+                                                   (XLogRecord *) state->readRecordBuf))
+                            goto err;
+
+                        state->record_verified = true;
+                    }
+
+                    /*
+                     * Calculate pointer to beginning of next page, and
+                     * continue
+                     */
+                    state->recordContRecPtr += XLOG_BLCKSZ;
+                }
+
+                /* targetPagePtr points to the last-read page here */
+                prec = (XLogRecord *) state->readRecordBuf;
+                if (!ValidXLogRecord(state, prec, state->ReadRecPtr))
                     goto err;
-                gotheader = true;
-            }
-        } while (gotlen < total_len);
-
-        Assert(gotheader);
-
-        record = (XLogRecord *) state->readRecordBuf;
-        if (!ValidXLogRecord(state, record, RecPtr))
-            goto err;
-
-        pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
-        state->ReadRecPtr = RecPtr;
-        state->EndRecPtr = targetPagePtr + pageHeaderSize
-            + MAXALIGN(pageHeader->xlp_rem_len);
-    }
-    else
-    {
-        /* Wait for the record data to become available */
-        while (XLogNeedData(state, targetPagePtr,
-                            Min(targetRecOff + total_len, XLOG_BLCKSZ), true))
-        {
-            if (!state->routine.page_read(state, state->readPagePtr,
-                                          state->readLen,
-                                          state->ReadRecPtr, state->readBuf))
+
+                pageHeaderSize =
+                    XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+                state->EndRecPtr = targetPagePtr + pageHeaderSize
+                    + MAXALIGN(pageHeader->xlp_rem_len);
+
+                *record = prec;
+                state->readRecordState = XLREAD_NEXT_RECORD;
                 break;
-        }
-
-        if (!state->page_verified)
-            goto err;
-
-        /* Record does not cross a page boundary */
-        if (!ValidXLogRecord(state, record, RecPtr))
-            goto err;
-
-        state->EndRecPtr = RecPtr + MAXALIGN(total_len);
-
-        state->ReadRecPtr = RecPtr;
+            }
     }
 
     /*
      * Special processing if it's an XLOG SWITCH record
      */
-    if (record->xl_rmid == RM_XLOG_ID &&
-        (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
+    if ((*record)->xl_rmid == RM_XLOG_ID &&
+        ((*record)->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
     {
         /* Pretend it extends to end of segment */
         state->EndRecPtr += state->segcxt.ws_segsize - 1;
         state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize);
     }
 
-    if (DecodeXLogRecord(state, record, errormsg))
-        return record;
-    else
-        return NULL;
+    Assert(!*record || state->readLen >= 0);
+    if (DecodeXLogRecord(state, *record, errormsg))
+        return XLREAD_SUCCESS;
+
+    *record = NULL;
+    return XLREAD_FAIL;
 
 err:
-    if (assembled)
+    Assert(state->readRecordState != XLREAD_NEXT_RECORD);
+    XLogTerminateRead(state);
+
+    if (state->errormsg_buf[0] != '\0')
+        *errormsg = state->errormsg_buf;
+
+    *record = NULL;
+
+    return XLREAD_FAIL;
+}
+
+/*
+ * Terminate reading WAL.
+ *
+ * When the caller fails to read the data requested by XLogReadRecord, it is
+ * supposed to call this function to set the correct reader state to reflect
+ * the failure.
+ */
+void
+XLogTerminateRead(XLogReaderState *state)
+{
+    if (state->readRecordState == XLREAD_CONTINUATION)
     {
         /*
          * We get here when a record that spans multiple pages needs to be
@@ -640,20 +768,15 @@ err:
          * in turn signal downstream WAL consumers that the broken WAL record
          * is to be ignored.
          */
-        state->abortedRecPtr = RecPtr;
-        state->missingContrecPtr = targetPagePtr;
+        state->abortedRecPtr = state->ReadRecPtr;
+        state->missingContrecPtr = state->recordContRecPtr;
     }
 
     /*
-     * Invalidate the read state. We might read from a different source after
+     * Invalidate the read page. We might read from a different source after
      * failure.
      */
     XLogReaderInvalReadState(state);
-
-    if (state->errormsg_buf[0] != '\0')
-        *errormsg = state->errormsg_buf;
-
-    return NULL;
 }
 
 /*
@@ -805,11 +928,12 @@ XLogReaderInvalReadState(XLogReaderState *state)
  *
  * This is just a convenience subroutine to avoid duplicated code in
  * XLogReadRecord.  It's not intended for use from anywhere else.
+ *
+ * If PrevRecPtr is valid, xl_prev is cross-checked against it.
  */
 static bool
 ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
-                      XLogRecPtr PrevRecPtr, XLogRecord *record,
-                      bool randAccess)
+                      XLogRecPtr PrevRecPtr, XLogRecord *record)
 {
     if (record->xl_tot_len < SizeOfXLogRecord)
     {
@@ -826,7 +950,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
                               record->xl_rmid, LSN_FORMAT_ARGS(RecPtr));
         return false;
     }
-    if (randAccess)
+    if (PrevRecPtr == InvalidXLogRecPtr)
     {
         /*
          * We can't exactly verify the prev-link, but surely it should be less
@@ -1056,11 +1180,14 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
  * XLogReadRecord() will read the next valid record.
  */
 XLogRecPtr
-XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
+XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr,
+                   XLogFindNextRecordCB read_page, void *private)
 {
     XLogRecPtr    tmpRecPtr;
     XLogRecPtr    found = InvalidXLogRecPtr;
     XLogPageHeader header;
+    XLogRecord *record;
+    XLogReadRecordResult result;
     char       *errormsg;
 
     Assert(!XLogRecPtrIsInvalid(RecPtr));
@@ -1093,9 +1220,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
         while (XLogNeedData(state, targetPagePtr, targetRecOff,
                             targetRecOff != 0))
         {
-            if (!state->routine.page_read(state, state->readPagePtr,
-                                          state->readLen,
-                                          state->ReadRecPtr, state->readBuf))
+            if (!read_page(state, private))
                 break;
         }
 
@@ -1147,8 +1272,16 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
      * or we just jumped over the remaining data of a continuation.
      */
     XLogBeginRead(state, tmpRecPtr);
-    while (XLogReadRecord(state, &errormsg) != NULL)
+    while ((result = XLogReadRecord(state, &record, &errormsg)) !=
+           XLREAD_FAIL)
     {
+        if (result == XLREAD_NEED_DATA)
+        {
+            if (!read_page(state, private))
+                goto err;
+            continue;
+        }
+
         /* past the record we've found, break out */
         if (RecPtr <= state->ReadRecPtr)
         {
@@ -1168,9 +1301,9 @@ err:
 #endif                            /* FRONTEND */
 
 /*
- * Helper function to ease writing of XLogRoutine->page_read callbacks.
- * If this function is used, caller must supply a segment_open callback in
- * 'state', as that is used here.
+ * Helper function to ease writing of page_read callbacks.
+ * If this function is used, the caller must supply segment_open and
+ * segment_close callbacks, as they are used here.
  *
  * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
  * fetched from timeline 'tli'.
@@ -1183,6 +1316,7 @@ err:
  */
 bool
 WALRead(XLogReaderState *state,
+        WALSegmentOpenCB segopenfn, WALSegmentCloseCB segclosefn,
         char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
         WALReadError *errinfo)
 {
@@ -1214,10 +1348,10 @@ WALRead(XLogReaderState *state,
             XLogSegNo    nextSegNo;
 
             if (state->seg.ws_file >= 0)
-                state->routine.segment_close(state);
+                segclosefn(state);
 
             XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
-            state->routine.segment_open(state, nextSegNo, &tli);
+            segopenfn(state, nextSegNo, &tli);
 
             /* This shouldn't happen -- indicates a bug in segment_open */
             Assert(state->seg.ws_file >= 0);
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 421040f391..fc09a72b8b 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -706,8 +706,7 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
 void
 XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
 {
-    const XLogRecPtr lastReadPage = state->seg.ws_segno *
-    state->segcxt.ws_segsize + state->readLen;
+    const XLogRecPtr lastReadPage = state->readPagePtr;
 
     Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
     Assert(wantLength <= XLOG_BLCKSZ);
@@ -722,7 +721,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
      * current TLI has since become historical.
      */
     if (lastReadPage == wantPage &&
-        state->readLen != 0 &&
+        state->page_verified &&
         lastReadPage + state->readLen >= wantPage + Min(wantLength, XLOG_BLCKSZ - 1))
         return;
 
@@ -808,6 +807,7 @@ wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo,
     char        path[MAXPGPATH];
 
     XLogFilePath(path, tli, nextSegNo, state->segcxt.ws_segsize);
+    elog(LOG, "HOGE: %lu, %d => %s", nextSegNo, tli, path);
     state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
     if (state->seg.ws_file >= 0)
         return;
@@ -845,9 +845,11 @@ wal_segment_close(XLogReaderState *state)
  * loop for now.
  */
 bool
-read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
-                     int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
+read_local_xlog_page(XLogReaderState *state)
 {
+    XLogRecPtr    targetPagePtr = state->readPagePtr;
+    int            reqLen          = state->readLen;
+    char       *cur_page      = state->readBuf;
     XLogRecPtr    read_upto,
                 loc;
     TimeLineID    tli;
@@ -960,11 +962,12 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
      * as 'count', read the whole page anyway. It's guaranteed to be
      * zero-padded up to the page boundary if it's incomplete.
      */
-    if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli,
-                 &errinfo))
+    if (!WALRead(state, wal_segment_open, wal_segment_close,
+                 cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &errinfo))
         WALReadRaiseError(&errinfo);
 
     /* number of valid bytes in the buffer */
+    state->readPagePtr = targetPagePtr;
     state->readLen = count;
     return true;
 }
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index aae0ae5b8a..2a88aa063a 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -148,7 +148,8 @@ StartupDecodingContext(List *output_plugin_options,
                        TransactionId xmin_horizon,
                        bool need_full_snapshot,
                        bool fast_forward,
-                       XLogReaderRoutine *xl_routine,
+                       LogicalDecodingXLogPageReadCB page_read,
+                       WALSegmentCleanupCB cleanup_cb,
                        LogicalOutputPluginWriterPrepareWrite prepare_write,
                        LogicalOutputPluginWriterWrite do_write,
                        LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -198,11 +199,12 @@ StartupDecodingContext(List *output_plugin_options,
 
     ctx->slot = slot;
 
-    ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx);
+    ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, cleanup_cb);
     if (!ctx->reader)
         ereport(ERROR,
                 (errcode(ERRCODE_OUT_OF_MEMORY),
                  errmsg("out of memory")));
+    ctx->page_read = page_read;
 
     ctx->reorder = ReorderBufferAllocate();
     ctx->snapshot_builder =
@@ -319,7 +321,8 @@ CreateInitDecodingContext(const char *plugin,
                           List *output_plugin_options,
                           bool need_full_snapshot,
                           XLogRecPtr restart_lsn,
-                          XLogReaderRoutine *xl_routine,
+                          LogicalDecodingXLogPageReadCB page_read,
+                          WALSegmentCleanupCB cleanup_cb,
                           LogicalOutputPluginWriterPrepareWrite prepare_write,
                           LogicalOutputPluginWriterWrite do_write,
                           LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -422,7 +425,7 @@ CreateInitDecodingContext(const char *plugin,
 
     ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
                                  need_full_snapshot, false,
-                                 xl_routine, prepare_write, do_write,
+                                 page_read, cleanup_cb, prepare_write, do_write,
                                  update_progress);
 
     /* call output plugin initialization callback */
@@ -478,7 +481,8 @@ LogicalDecodingContext *
 CreateDecodingContext(XLogRecPtr start_lsn,
                       List *output_plugin_options,
                       bool fast_forward,
-                      XLogReaderRoutine *xl_routine,
+                      LogicalDecodingXLogPageReadCB page_read,
+                      WALSegmentCleanupCB cleanup_cb,
                       LogicalOutputPluginWriterPrepareWrite prepare_write,
                       LogicalOutputPluginWriterWrite do_write,
                       LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -534,8 +538,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
     ctx = StartupDecodingContext(output_plugin_options,
                                  start_lsn, InvalidTransactionId, false,
-                                 fast_forward, xl_routine, prepare_write,
-                                 do_write, update_progress);
+                                 fast_forward, page_read, cleanup_cb,
+                                 prepare_write, do_write, update_progress);
 
     /* call output plugin initialization callback */
     old_context = MemoryContextSwitchTo(ctx->context);
@@ -603,7 +607,16 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
         char       *err = NULL;
 
         /* the read_page callback waits for new WAL */
-        record = XLogReadRecord(ctx->reader, &err);
+        while (XLogReadRecord(ctx->reader, &record, &err) ==
+               XLREAD_NEED_DATA)
+        {
+            if (!ctx->page_read(ctx->reader))
+            {
+                XLogTerminateRead(ctx->reader);
+                break;
+            }
+        }
+
         if (err)
             elog(ERROR, "%s", err);
         if (!record)
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index e59939aad1..f68d08fdee 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -224,9 +224,8 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
         ctx = CreateDecodingContext(InvalidXLogRecPtr,
                                     options,
                                     false,
-                                    XL_ROUTINE(.page_read = read_local_xlog_page,
-                                               .segment_open = wal_segment_open,
-                                               .segment_close = wal_segment_close),
+                                    read_local_xlog_page,
+                                    wal_segment_close,
                                     LogicalOutputPrepareWrite,
                                     LogicalOutputWrite, NULL);
 
@@ -275,7 +274,16 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
             XLogRecord *record;
             char       *errm = NULL;
 
-            record = XLogReadRecord(ctx->reader, &errm);
+            while (XLogReadRecord(ctx->reader, &record, &errm) ==
+                   XLREAD_NEED_DATA)
+            {
+                if (!ctx->page_read(ctx->reader))
+                {
+                    XLogTerminateRead(ctx->reader);
+                    break;
+                }
+            }
+
             if (errm)
                 elog(ERROR, "%s", errm);
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 17df99c2ac..57426aa9d7 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -144,9 +144,8 @@ create_logical_replication_slot(char *name, char *plugin,
     ctx = CreateInitDecodingContext(plugin, NIL,
                                     false,    /* just catalogs is OK */
                                     restart_lsn,
-                                    XL_ROUTINE(.page_read = read_local_xlog_page,
-                                               .segment_open = wal_segment_open,
-                                               .segment_close = wal_segment_close),
+                                    read_local_xlog_page,
+                                    wal_segment_close,
                                     NULL, NULL, NULL);
 
     /*
@@ -503,9 +502,8 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
         ctx = CreateDecodingContext(InvalidXLogRecPtr,
                                     NIL,
                                     true,    /* fast_forward */
-                                    XL_ROUTINE(.page_read = read_local_xlog_page,
-                                               .segment_open = wal_segment_open,
-                                               .segment_close = wal_segment_close),
+                                    read_local_xlog_page,
+                                    wal_segment_close,
                                     NULL, NULL, NULL);
 
         /*
@@ -527,7 +525,16 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
              * Read records.  No changes are generated in fast_forward mode,
              * but snapbuilder/slot statuses are updated properly.
              */
-            record = XLogReadRecord(ctx->reader, &errm);
+            while (XLogReadRecord(ctx->reader, &record, &errm) ==
+                   XLREAD_NEED_DATA)
+            {
+                if (!ctx->page_read(ctx->reader))
+                {
+                    XLogTerminateRead(ctx->reader);
+                    break;
+                }
+            }
+
             if (errm)
                 elog(ERROR, "%s", errm);
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index ddba340653..78d805ab80 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -575,10 +575,7 @@ StartReplication(StartReplicationCmd *cmd)
 
     /* create xlogreader for physical replication */
     xlogreader =
-        XLogReaderAllocate(wal_segment_size, NULL,
-                           XL_ROUTINE(.segment_open = WalSndSegmentOpen,
-                                      .segment_close = wal_segment_close),
-                           NULL);
+        XLogReaderAllocate(wal_segment_size, NULL, wal_segment_close);
 
     if (!xlogreader)
         ereport(ERROR,
@@ -803,9 +800,11 @@ StartReplication(StartReplicationCmd *cmd)
  * set every time WAL is flushed.
  */
 static bool
-logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
-                       XLogRecPtr targetRecPtr, char *cur_page)
+logical_read_xlog_page(XLogReaderState *state)
 {
+    XLogRecPtr        targetPagePtr = state->readPagePtr;
+    int                reqLen          = state->readLen;
+    char           *cur_page      = state->readBuf;
     XLogRecPtr    flushptr;
     int            count;
     WALReadError errinfo;
@@ -833,7 +832,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
         count = flushptr - targetPagePtr;    /* part of the page available */
 
     /* now actually read the data, we know it's there */
-    if (!WALRead(state,
+    if (!WALRead(state, WalSndSegmentOpen, wal_segment_close,
                  cur_page,
                  targetPagePtr,
                  XLOG_BLCKSZ,
@@ -1023,9 +1022,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
         ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
                                         InvalidXLogRecPtr,
-                                        XL_ROUTINE(.page_read = logical_read_xlog_page,
-                                                   .segment_open = WalSndSegmentOpen,
-                                                   .segment_close = wal_segment_close),
+                                        logical_read_xlog_page,
+                                        wal_segment_close,
                                         WalSndPrepareWrite, WalSndWriteData,
                                         WalSndUpdateProgress);
 
@@ -1183,9 +1181,8 @@ StartLogicalReplication(StartReplicationCmd *cmd)
      */
     logical_decoding_ctx =
         CreateDecodingContext(cmd->startpoint, cmd->options, false,
-                              XL_ROUTINE(.page_read = logical_read_xlog_page,
-                                         .segment_open = WalSndSegmentOpen,
-                                         .segment_close = wal_segment_close),
+                              logical_read_xlog_page,
+                              wal_segment_close,
                               WalSndPrepareWrite, WalSndWriteData,
                               WalSndUpdateProgress);
     xlogreader = logical_decoding_ctx->reader;
@@ -2778,7 +2775,7 @@ XLogSendPhysical(void)
     enlargeStringInfo(&output_message, nbytes);
 
 retry:
-    if (!WALRead(xlogreader,
+    if (!WALRead(xlogreader, WalSndSegmentOpen, wal_segment_close,
                  &output_message.data[output_message.len],
                  startptr,
                  nbytes,
@@ -2876,7 +2873,15 @@ XLogSendLogical(void)
      */
     WalSndCaughtUp = false;
 
-    record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
+    while (XLogReadRecord(logical_decoding_ctx->reader, &record, &errm) ==
+           XLREAD_NEED_DATA)
+    {
+        if (!logical_decoding_ctx->page_read(logical_decoding_ctx->reader))
+        {
+            XLogTerminateRead(logical_decoding_ctx->reader);
+            break;
+        }
+    }
 
     /* xlog record was invalid */
     if (errm != NULL)
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index cf119848b0..da723e5340 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -41,15 +41,9 @@ static int    xlogreadfd = -1;
 static XLogSegNo xlogreadsegno = -1;
 static char xlogfpath[MAXPGPATH];
 
-typedef struct XLogPageReadPrivate
-{
-    const char *restoreCommand;
-    int            tliIndex;
-} XLogPageReadPrivate;
-
-static bool    SimpleXLogPageRead(XLogReaderState *xlogreader,
-                               XLogRecPtr targetPagePtr,
-                               int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
+static bool SimpleXLogPageRead(XLogReaderState *xlogreader,
+                               const char *datadir, int *tliIndex,
+                               const char *restoreCommand);
 
 /*
  * Read WAL from the datadir/pg_wal, starting from 'startpoint' on timeline
@@ -66,20 +60,25 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
     XLogRecord *record;
     XLogReaderState *xlogreader;
     char       *errormsg;
-    XLogPageReadPrivate private;
 
-    private.tliIndex = tliIndex;
-    private.restoreCommand = restoreCommand;
-    xlogreader = XLogReaderAllocate(WalSegSz, datadir,
-                                    XL_ROUTINE(.page_read = &SimpleXLogPageRead),
-                                    &private);
+    xlogreader = XLogReaderAllocate(WalSegSz, datadir, NULL);
+
     if (xlogreader == NULL)
         pg_fatal("out of memory");
 
     XLogBeginRead(xlogreader, startpoint);
     do
     {
-        record = XLogReadRecord(xlogreader, &errormsg);
+        while (XLogReadRecord(xlogreader, &record, &errormsg) ==
+               XLREAD_NEED_DATA)
+        {
+            if (!SimpleXLogPageRead(xlogreader, datadir,
+                                    &tliIndex, restoreCommand))
+            {
+                XLogTerminateRead(xlogreader);
+                break;
+            }
+        }
 
         if (record == NULL)
         {
@@ -123,19 +122,22 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex,
     XLogRecord *record;
     XLogReaderState *xlogreader;
     char       *errormsg;
-    XLogPageReadPrivate private;
     XLogRecPtr    endptr;
 
-    private.tliIndex = tliIndex;
-    private.restoreCommand = restoreCommand;
-    xlogreader = XLogReaderAllocate(WalSegSz, datadir,
-                                    XL_ROUTINE(.page_read = &SimpleXLogPageRead),
-                                    &private);
+    xlogreader = XLogReaderAllocate(WalSegSz, datadir, NULL);
     if (xlogreader == NULL)
         pg_fatal("out of memory");
 
     XLogBeginRead(xlogreader, ptr);
-    record = XLogReadRecord(xlogreader, &errormsg);
+    while (XLogReadRecord(xlogreader, &record, &errormsg) ==
+           XLREAD_NEED_DATA)
+    {
+        if (!SimpleXLogPageRead(xlogreader, datadir, &tliIndex, restoreCommand))
+        {
+            XLogTerminateRead(xlogreader);
+            break;
+        }
+    }
     if (record == NULL)
     {
         if (errormsg)
@@ -170,7 +172,6 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
     XLogRecPtr    searchptr;
     XLogReaderState *xlogreader;
     char       *errormsg;
-    XLogPageReadPrivate private;
 
     /*
      * The given fork pointer points to the end of the last common record,
@@ -186,11 +187,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
             forkptr += SizeOfXLogShortPHD;
     }
 
-    private.tliIndex = tliIndex;
-    private.restoreCommand = restoreCommand;
-    xlogreader = XLogReaderAllocate(WalSegSz, datadir,
-                                    XL_ROUTINE(.page_read = &SimpleXLogPageRead),
-                                    &private);
+    xlogreader = XLogReaderAllocate(WalSegSz, datadir, NULL);
     if (xlogreader == NULL)
         pg_fatal("out of memory");
 
@@ -200,7 +197,16 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
         uint8        info;
 
         XLogBeginRead(xlogreader, searchptr);
-        record = XLogReadRecord(xlogreader, &errormsg);
+        while (XLogReadRecord(xlogreader, &record, &errormsg) ==
+               XLREAD_NEED_DATA)
+        {
+            if (!SimpleXLogPageRead(xlogreader, datadir,
+                                    &tliIndex, restoreCommand))
+            {
+                XLogTerminateRead(xlogreader);
+                break;
+            }
+        }
 
         if (record == NULL)
         {
@@ -247,10 +253,11 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 
 /* XLogReader callback function, to read a WAL page */
 static bool
-SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
-                   int reqLen, XLogRecPtr targetRecPtr, char *readBuf)
+SimpleXLogPageRead(XLogReaderState *xlogreader, const char *datadir,
+                   int *tliIndex, const char *restoreCommand)
 {
-    XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
+    XLogRecPtr    targetPagePtr = xlogreader->readPagePtr;
+    char       *readBuf          = xlogreader->readBuf;
     uint32        targetPageOff;
     XLogRecPtr    targetSegEnd;
     XLogSegNo    targetSegNo;
@@ -283,14 +290,14 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
          * be done both forward and backward, consider also switching timeline
          * accordingly.
          */
-        while (private->tliIndex < targetNentries - 1 &&
-               targetHistory[private->tliIndex].end < targetSegEnd)
-            private->tliIndex++;
-        while (private->tliIndex > 0 &&
-               targetHistory[private->tliIndex].begin >= targetSegEnd)
-            private->tliIndex--;
+        while (*tliIndex < targetNentries - 1 &&
+               targetHistory[*tliIndex].end < targetSegEnd)
+            (*tliIndex)++;
+        while (*tliIndex > 0 &&
+               targetHistory[*tliIndex].begin >= targetSegEnd)
+            (*tliIndex)--;
 
-        XLogFileName(xlogfname, targetHistory[private->tliIndex].tli,
+        XLogFileName(xlogfname, targetHistory[*tliIndex].tli,
                      xlogreadsegno, WalSegSz);
 
         snprintf(xlogfpath, MAXPGPATH, "%s/" XLOGDIR "/%s",
@@ -303,7 +310,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
             /*
              * If we have no restore_command to execute, then exit.
              */
-            if (private->restoreCommand == NULL)
+            if (restoreCommand == NULL)
             {
                 pg_log_error("could not open file \"%s\": %m", xlogfpath);
                 xlogreader->readLen = -1;
@@ -317,7 +324,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
             xlogreadfd = RestoreArchivedFile(xlogreader->segcxt.ws_dir,
                                              xlogfname,
                                              WalSegSz,
-                                             private->restoreCommand);
+                                             restoreCommand);
 
             if (xlogreadfd < 0)
             {
@@ -359,7 +366,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
 
     Assert(targetSegNo == xlogreadsegno);
 
-    xlogreader->seg.ws_tli = targetHistory[private->tliIndex].tli;
+    xlogreader->seg.ws_tli = targetHistory[*tliIndex].tli;
     xlogreader->readLen = XLOG_BLCKSZ;
     return true;
 }
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index 833a64210b..80182621f8 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -331,12 +331,17 @@ WALDumpCloseSegment(XLogReaderState *state)
     state->seg.ws_file = -1;
 }
 
-/* pg_waldump's XLogReaderRoutine->page_read callback */
+/*
+ * pg_waldump's WAL page reader, also used as page_read callback for
+ * XLogFindNextRecord
+ */
 static bool
-WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
-                XLogRecPtr targetPtr, char *readBuff)
+WALDumpReadPage(XLogReaderState *state, void *priv)
 {
-    XLogDumpPrivate *private = state->private_data;
+    XLogRecPtr    targetPagePtr = state->readPagePtr;
+    int            reqLen          = state->readLen;
+    char       *readBuff      = state->readBuf;
+    XLogDumpPrivate *private  = (XLogDumpPrivate *) priv;
     int            count = XLOG_BLCKSZ;
     WALReadError errinfo;
 
@@ -354,8 +359,8 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
         }
     }
 
-    if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline,
-                 &errinfo))
+    if (!WALRead(state, WALDumpOpenSegment, WALDumpCloseSegment,
+                 readBuff, targetPagePtr, count, private->timeline, &errinfo))
     {
         WALOpenSegment *seg = &errinfo.wre_seg;
         char        fname[MAXPGPATH];
@@ -375,6 +380,7 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
                         errinfo.wre_req);
     }
 
+    Assert(count >= state->readLen);
     state->readLen = count;
     return true;
 }
@@ -1057,16 +1063,14 @@ main(int argc, char **argv)
 
     /* we have everything we need, start reading */
     xlogreader_state =
-        XLogReaderAllocate(WalSegSz, waldir,
-                           XL_ROUTINE(.page_read = WALDumpReadPage,
-                                      .segment_open = WALDumpOpenSegment,
-                                      .segment_close = WALDumpCloseSegment),
-                           &private);
+        XLogReaderAllocate(WalSegSz, waldir, WALDumpCloseSegment);
+
     if (!xlogreader_state)
         fatal_error("out of memory");
 
     /* first find a valid recptr to start from */
-    first_record = XLogFindNextRecord(xlogreader_state, private.startptr);
+    first_record = XLogFindNextRecord(xlogreader_state, private.startptr,
+                                      &WALDumpReadPage, (void*) &private);
 
     if (first_record == InvalidXLogRecPtr)
         fatal_error("could not find a valid record after %X/%X",
@@ -1089,7 +1093,16 @@ main(int argc, char **argv)
     for (;;)
     {
         /* try to read the next record */
-        record = XLogReadRecord(xlogreader_state, &errormsg);
+        while (XLogReadRecord(xlogreader_state, &record, &errormsg) ==
+               XLREAD_NEED_DATA)
+        {
+            if (!WALDumpReadPage(xlogreader_state, (void *) &private))
+            {
+                XLogTerminateRead(xlogreader_state);
+                break;
+            }
+        }
+
         if (!record)
         {
             if (!config.follow || private.endptr_reached)
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index f3cf4f2f49..ff1aca719b 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -57,64 +57,15 @@ typedef struct WALSegmentContext
 
 typedef struct XLogReaderState XLogReaderState;
 
-/* Function type definition for the read_page callback */
-typedef bool (*XLogPageReadCB) (XLogReaderState *xlogreader,
-                                XLogRecPtr targetPagePtr,
-                                int reqLen,
-                                XLogRecPtr targetRecPtr,
-                                char *readBuf);
+/* Function type definition for the segment cleanup callback */
+typedef void (*WALSegmentCleanupCB) (XLogReaderState *xlogreader);
+
+/* Function type definition for the open/close callbacks for WALRead() */
 typedef void (*WALSegmentOpenCB) (XLogReaderState *xlogreader,
                                   XLogSegNo nextSegNo,
                                   TimeLineID *tli_p);
 typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader);
 
-typedef struct XLogReaderRoutine
-{
-    /*
-     * Data input callback
-     *
-     * This callback shall read at least reqLen valid bytes of the xlog page
-     * starting at targetPagePtr, and store them in readBuf.  The callback
-     * shall return the number of bytes read (never more than XLOG_BLCKSZ), or
-     * -1 on failure.  The callback shall sleep, if necessary, to wait for the
-     * requested bytes to become available.  The callback will not be invoked
-     * again for the same page unless more than the returned number of bytes
-     * are needed.
-     *
-     * targetRecPtr is the position of the WAL record we're reading.  Usually
-     * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs
-     * to read and verify the page or segment header, before it reads the
-     * actual WAL record it's interested in.  In that case, targetRecPtr can
-     * be used to determine which timeline to read the page from.
-     *
-     * The callback shall set ->seg.ws_tli to the TLI of the file the page was
-     * read from.
-     */
-    XLogPageReadCB page_read;
-
-    /*
-     * Callback to open the specified WAL segment for reading.  ->seg.ws_file
-     * shall be set to the file descriptor of the opened segment.  In case of
-     * failure, an error shall be raised by the callback and it shall not
-     * return.
-     *
-     * "nextSegNo" is the number of the segment to be opened.
-     *
-     * "tli_p" is an input/output argument. WALRead() uses it to pass the
-     * timeline in which the new segment should be found, but the callback can
-     * use it to return the TLI that it actually opened.
-     */
-    WALSegmentOpenCB segment_open;
-
-    /*
-     * WAL segment close callback.  ->seg.ws_file shall be set to a negative
-     * number.
-     */
-    WALSegmentCloseCB segment_close;
-} XLogReaderRoutine;
-
-#define XL_ROUTINE(...) &(XLogReaderRoutine){__VA_ARGS__}
-
 typedef struct
 {
     /* Is this block ref in use? */
@@ -144,12 +95,36 @@ typedef struct
     uint16        data_bufsz;
 } DecodedBkpBlock;
 
+/* Return code from XLogReadRecord */
+typedef enum XLogReadRecordResult
+{
+    XLREAD_SUCCESS,                /* record is successfully read */
+    XLREAD_NEED_DATA,            /* need more data. see XLogReadRecord. */
+    XLREAD_FAIL                    /* failed during reading a record */
+}            XLogReadRecordResult;
+
+/*
+ * internal state of XLogReadRecord
+ *
+ * XLogReadRecord runs a state machine while reading a record. These states
+ * are not seen outside the function. Each state may repeat several times,
+ * exiting to request new data from the caller. See the comment of
+ * XLogReadRecord for details.
+ */
+typedef enum XLogReadRecordState
+{
+    XLREAD_NEXT_RECORD,
+    XLREAD_TOT_LEN,
+    XLREAD_FIRST_FRAGMENT,
+    XLREAD_CONTINUATION
+}            XLogReadRecordState;
+
 struct XLogReaderState
 {
     /*
      * Operational callbacks
      */
-    XLogReaderRoutine routine;
+    WALSegmentCleanupCB cleanup_cb;
 
     /* ----------------------------------------
      * Public parameters
@@ -162,18 +137,14 @@ struct XLogReaderState
      */
     uint64        system_identifier;
 
-    /*
-     * Opaque data for callbacks to use.  Not used by XLogReader.
-     */
-    void       *private_data;
-
     /*
      * Start and end point of last record read.  EndRecPtr is also used as the
      * position to read next.  Calling XLogBeginRead() sets EndRecPtr to the
      * starting position and ReadRecPtr to invalid.
      */
-    XLogRecPtr    ReadRecPtr;        /* start of last record read */
+    XLogRecPtr    ReadRecPtr;        /* start of last record read or being read */
     XLogRecPtr    EndRecPtr;        /* end+1 of last record read */
+    XLogRecPtr    PrevRecPtr;        /* start of previous record read */
 
     /*
      * Set at the end of recovery: the start point of a partial record at the
@@ -196,7 +167,9 @@ struct XLogReaderState
                                  * read by reader, which must be larger than
                                  * the request, or -1 on error */
     char       *readBuf;        /* buffer to store data */
-    bool        page_verified;    /* is the page on the buffer verified? */
+    bool        page_verified;    /* is the page header on the buffer verified? */
+    bool        record_verified;    /* is the current record header verified? */
+
 
     /* ----------------------------------------
      * Decoded representation of current record
@@ -237,8 +210,6 @@ struct XLogReaderState
     XLogRecPtr    latestPagePtr;
     TimeLineID    latestPageTLI;

-    /* beginning of the WAL record being read. */
-    XLogRecPtr    currRecPtr;
     /* timeline to read it from, 0 if a lookup is required */
     TimeLineID    currTLI;
 
@@ -265,6 +236,15 @@ struct XLogReaderState
     char       *readRecordBuf;
     uint32        readRecordBufSize;
 
+    /*
+     * XLogReadRecord() state
+     */
+    XLogReadRecordState readRecordState;    /* state machine state */
+    int            recordGotLen;    /* amount of current record that has already
+                                 * been read */
+    int            recordRemainLen;    /* length of current record that remains */
+    XLogRecPtr    recordContRecPtr;    /* where the current record continues */
+
     /* Buffer to hold error message */
     char       *errormsg_buf;
 };
@@ -272,9 +252,7 @@ struct XLogReaderState
 /* Get a new XLogReader */
 extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
                                            const char *waldir,
-                                           XLogReaderRoutine *routine,
-                                           void *private_data);
-extern XLogReaderRoutine *LocalXLogReaderRoutine(void);
+                                           WALSegmentCleanupCB cleanup_cb);
 
 /* Free an XLogReader */
 extern void XLogReaderFree(XLogReaderState *state);
@@ -282,12 +260,19 @@ extern void XLogReaderFree(XLogReaderState *state);
 /* Position the XLogReader to given record */
 extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr);
 #ifdef FRONTEND
-extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
+/* Function type definition for the read_page callback */
+typedef bool (*XLogFindNextRecordCB) (XLogReaderState *xlogreader,
+                                      void *private);
+extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr,
+                                     XLogFindNextRecordCB read_page, void *private);
 #endif                            /* FRONTEND */
 
 /* Read the next XLog record. Returns NULL on end-of-WAL or failure */
-extern struct XLogRecord *XLogReadRecord(XLogReaderState *state,
-                                         char **errormsg);
+extern XLogReadRecordResult XLogReadRecord(XLogReaderState *state,
+                                           XLogRecord **record,
+                                           char **errormsg);
+/* Finalize function when the caller of XLogReadRecord failed */
+extern void XLogTerminateRead(XLogReaderState *state);
 
 /* Validate a page */
 extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
@@ -307,6 +292,7 @@ typedef struct WALReadError
 } WALReadError;
 
 extern bool WALRead(XLogReaderState *state,
+                    WALSegmentOpenCB segopenfn, WALSegmentCloseCB sgclosefn,
                     char *buf, XLogRecPtr startptr, Size count,
                     TimeLineID tli, WALReadError *errinfo);
 
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 8669f7eeb3..e403d253d6 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -89,9 +89,7 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
 extern void FreeFakeRelcacheEntry(Relation fakerel);
 
-extern bool    read_local_xlog_page(XLogReaderState *state,
-                                 XLogRecPtr targetPagePtr, int reqLen,
-                                 XLogRecPtr targetRecPtr, char *cur_page);
+extern bool read_local_xlog_page(XLogReaderState *state);
 extern void wal_segment_open(XLogReaderState *state,
                              XLogSegNo nextSegNo,
                              TimeLineID *tli_p);
diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h
index 614035e215..fecffdb3f6 100644
--- a/src/include/pg_config_manual.h
+++ b/src/include/pg_config_manual.h
@@ -390,7 +390,7 @@
  * Enable debugging print statements for WAL-related operations; see
  * also the wal_debug GUC var.
  */
-/* #define WAL_DEBUG */
+#define WAL_DEBUG
 
 /*
  * Enable tracing of resource consumption during sort operations;
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index e0f513b773..3346188b48 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -29,6 +29,10 @@ typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingC
                                                          TransactionId xid
 );
 
+typedef struct LogicalDecodingContext LogicalDecodingContext;
+
+typedef bool (*LogicalDecodingXLogPageReadCB)(XLogReaderState *ctx);
+
 typedef struct LogicalDecodingContext
 {
     /* memory context this is all allocated in */
@@ -39,6 +43,7 @@ typedef struct LogicalDecodingContext
 
     /* infrastructure pieces for decoding */
     XLogReaderState *reader;
+    LogicalDecodingXLogPageReadCB page_read;
     struct ReorderBuffer *reorder;
     struct SnapBuild *snapshot_builder;
 
@@ -115,14 +120,16 @@ extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin,
                                                          List *output_plugin_options,
                                                          bool need_full_snapshot,
                                                          XLogRecPtr restart_lsn,
-                                                         XLogReaderRoutine *xl_routine,
+                                                         LogicalDecodingXLogPageReadCB page_read,
+                                                         WALSegmentCleanupCB cleanup_cb,
                                                          LogicalOutputPluginWriterPrepareWrite prepare_write,
                                                          LogicalOutputPluginWriterWrite do_write,
                                                          LogicalOutputPluginWriterUpdateProgress update_progress);
 extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn,
                                                      List *output_plugin_options,
                                                      bool fast_forward,
-                                                     XLogReaderRoutine *xl_routine,
+                                                     LogicalDecodingXLogPageReadCB page_read,
+                                                     WALSegmentCleanupCB cleanup_cb,
                                                      LogicalOutputPluginWriterPrepareWrite prepare_write,
                                                      LogicalOutputPluginWriterWrite do_write,
                                                      LogicalOutputPluginWriterUpdateProgress update_progress);
-- 
2.27.0

From ee9199ce27cb11b5d0fc03f5dd4fdbee7a2a6d6d Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Date: Thu, 30 Sep 2021 13:16:51 +0900
Subject: [PATCH v18 3/5] Remove globals readOff, readLen and readSegNo.

The first two global variables are duplicated in XLogReaderState.
Remove them, and also readSegNo, which should move into that struct too.

Author: Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>
Discussion: https://postgr.es/m/20190418.210257.43726183.horiguchi.kyotaro%40lab.ntt.co.jp
---
 src/backend/access/transam/xlog.c | 77 ++++++++++++++-----------------
 1 file changed, 35 insertions(+), 42 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 316ba256f6..c4fe006776 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -809,17 +809,13 @@ static XLogSegNo openLogSegNo = 0;
 
 /*
  * These variables are used similarly to the ones above, but for reading
- * the XLOG.  readOff is the offset of the page just read, readLen
- * indicates how much of it has been read into readBuf, and readSource
+ * the XLOG.  readSource
  * indicates where we got the currently open file from.
  * Note: we could use Reserve/ReleaseExternalFD to track consumption of
  * this FD too; but it doesn't currently seem worthwhile, since the XLOG is
  * not read by general-purpose sessions.
  */
 static int    readFile = -1;
-static XLogSegNo readSegNo = 0;
-static uint32 readOff = 0;
-static uint32 readLen = 0;
 static XLogSource readSource = XLOG_FROM_ANY;
 
 /*
@@ -911,10 +907,12 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
 static int    XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
                          XLogSource source, bool notfoundOk);
 static int    XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source);
-static bool XLogPageRead(XLogReaderState *xlogreader,
+static bool XLogPageRead(XLogReaderState *state,
                          bool fetching_ckpt, int emode, bool randAccess);
 static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
-                                        bool fetching_ckpt, XLogRecPtr tliRecPtr);
+                                        bool fetching_ckpt,
+                                        XLogRecPtr tliRecPtr,
+                                        XLogSegNo readSegNo);
 static void XLogShutdownWalRcv(void);
 static int    emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
 static void XLogFileClose(void);
@@ -7894,7 +7892,8 @@ StartupXLOG(void)
         XLogRecPtr    pageBeginPtr;
 
         pageBeginPtr = EndOfLog - (EndOfLog % XLOG_BLCKSZ);
-        Assert(readOff == XLogSegmentOffset(pageBeginPtr, wal_segment_size));
+        Assert(XLogSegmentOffset(xlogreader->readPagePtr, wal_segment_size) ==
+               XLogSegmentOffset(pageBeginPtr, wal_segment_size));
 
         firstIdx = XLogRecPtrToBufIdx(EndOfLog);
 
@@ -12292,13 +12291,14 @@ CancelBackup(void)
  * sleep and retry.
  */
 static bool
-XLogPageRead(XLogReaderState *xlogreader,
+XLogPageRead(XLogReaderState *state,
              bool fetching_ckpt, int emode, bool randAccess)
 {
-    char *readBuf                = xlogreader->readBuf;
-    XLogRecPtr targetPagePtr    = xlogreader->readPagePtr;
-    int reqLen                    = xlogreader->readLen;
-    XLogRecPtr targetRecPtr        = xlogreader->ReadRecPtr;
+    char *readBuf                = state->readBuf;
+    XLogRecPtr    targetPagePtr    = state->readPagePtr;
+    int            reqLen            = state->readLen;
+    int            readLen            = 0;
+    XLogRecPtr    targetRecPtr    = state->ReadRecPtr;
     uint32        targetPageOff;
     XLogSegNo    targetSegNo PG_USED_FOR_ASSERTS_ONLY;
     int            r;
@@ -12311,7 +12311,7 @@ XLogPageRead(XLogReaderState *xlogreader,
      * is not in the currently open one.
      */
     if (readFile >= 0 &&
-        !XLByteInSeg(targetPagePtr, readSegNo, wal_segment_size))
+        !XLByteInSeg(targetPagePtr, state->seg.ws_segno, wal_segment_size))
     {
         /*
          * Request a restartpoint if we've replayed too much xlog since the
@@ -12319,10 +12319,10 @@ XLogPageRead(XLogReaderState *xlogreader,
          */
         if (ArchiveRecoveryRequested && IsUnderPostmaster)
         {
-            if (XLogCheckpointNeeded(readSegNo))
+            if (XLogCheckpointNeeded(state->seg.ws_segno))
             {
                 (void) GetRedoRecPtr();
-                if (XLogCheckpointNeeded(readSegNo))
+                if (XLogCheckpointNeeded(state->seg.ws_segno))
                     RequestCheckpoint(CHECKPOINT_CAUSE_XLOG);
             }
         }
@@ -12332,7 +12332,7 @@ XLogPageRead(XLogReaderState *xlogreader,
         readSource = XLOG_FROM_ANY;
     }
 
-    XLByteToSeg(targetPagePtr, readSegNo, wal_segment_size);
+    XLByteToSeg(targetPagePtr, state->seg.ws_segno, wal_segment_size);
 
 retry:
     /* See if we need to retrieve more data */
@@ -12341,17 +12341,14 @@ retry:
          flushedUpto < targetPagePtr + reqLen))
     {
         if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
-                                         randAccess,
-                                         fetching_ckpt,
-                                         targetRecPtr))
+                                         randAccess, fetching_ckpt,
+                                         targetRecPtr, state->seg.ws_segno))
         {
             if (readFile >= 0)
                 close(readFile);
             readFile = -1;
-            readLen = 0;
             readSource = XLOG_FROM_ANY;
-
-            xlogreader->readLen = -1;
+            state->readLen = -1;
             return false;
         }
     }
@@ -12379,40 +12376,36 @@ retry:
     else
         readLen = XLOG_BLCKSZ;
 
-    /* Read the requested page */
-    readOff = targetPageOff;
-
     pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
-    r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) readOff);
+    r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) targetPageOff);
     if (r != XLOG_BLCKSZ)
     {
         char        fname[MAXFNAMELEN];
         int            save_errno = errno;
 
         pgstat_report_wait_end();
-        XLogFileName(fname, curFileTLI, readSegNo, wal_segment_size);
+        XLogFileName(fname, curFileTLI, state->seg.ws_segno, wal_segment_size);
         if (r < 0)
         {
             errno = save_errno;
             ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
                     (errcode_for_file_access(),
                      errmsg("could not read from log segment %s, offset %u: %m",
-                            fname, readOff)));
+                            fname, targetPageOff)));
         }
         else
             ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
                     (errcode(ERRCODE_DATA_CORRUPTED),
                      errmsg("could not read from log segment %s, offset %u: read %d of %zu",
-                            fname, readOff, r, (Size) XLOG_BLCKSZ)));
+                            fname, targetPageOff, r, (Size) XLOG_BLCKSZ)));
         goto next_record_is_invalid;
     }
     pgstat_report_wait_end();
 
-    Assert(targetSegNo == readSegNo);
-    Assert(targetPageOff == readOff);
+    Assert(targetSegNo == state->seg.ws_segno);
     Assert(reqLen <= readLen);
 
-    xlogreader->seg.ws_tli = curFileTLI;
+    state->seg.ws_tli = curFileTLI;
 
     /*
      * Check the page header immediately, so that we can retry immediately if
@@ -12447,23 +12440,23 @@ retry:
      * responsible for the validation.
      */
     if (StandbyMode &&
-        !XLogReaderValidatePageHeader(xlogreader, targetPagePtr, readBuf))
+        !XLogReaderValidatePageHeader(state, targetPagePtr, readBuf))
     {
         /*
          * Emit this error right now then retry this page immediately. Use
          * errmsg_internal() because the message was already translated.
          */
-        if (xlogreader->errormsg_buf[0])
+        if (state->errormsg_buf[0])
             ereport(emode_for_corrupt_record(emode, EndRecPtr),
-                    (errmsg_internal("%s", xlogreader->errormsg_buf)));
+                    (errmsg_internal("%s", state->errormsg_buf)));
 
         /* reset any error XLogReaderValidatePageHeader() might have set */
-        xlogreader->errormsg_buf[0] = '\0';
+        state->errormsg_buf[0] = '\0';
         goto next_record_is_invalid;
     }
 
-    Assert(xlogreader->readPagePtr == targetPagePtr);
-    xlogreader->readLen = readLen;
+    Assert(state->readPagePtr == targetPagePtr);
+    state->readLen = readLen;
     return true;
 
 next_record_is_invalid:
@@ -12472,14 +12465,13 @@ next_record_is_invalid:
     if (readFile >= 0)
         close(readFile);
     readFile = -1;
-    readLen = 0;
     readSource = XLOG_FROM_ANY;
 
     /* In standby-mode, keep trying */
     if (StandbyMode)
         goto retry;
 
-    xlogreader->readLen = -1;
+    state->readLen = -1;
     return false;
 }
 
@@ -12511,7 +12503,8 @@ next_record_is_invalid:
  */
 static bool
 WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
-                            bool fetching_ckpt, XLogRecPtr tliRecPtr)
+                            bool fetching_ckpt, XLogRecPtr tliRecPtr,
+                            XLogSegNo readSegNo)
 {
     static TimestampTz last_fail_time = 0;
     TimestampTz now;
-- 
2.27.0

From 8c6d4b5da61aeca9b12f8f8ea4b93ecc3f8aa769 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Date: Thu, 30 Sep 2021 14:14:23 +0900
Subject: [PATCH v18 4/5] Make XLogFindNextRecord not use callback function

The last function that uses the page-read callback is
XLogFindNextRecord.  Make it free of the callback as well.  This also
simplifies the interface of WALDumpReadPage.
---
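For illustration only, the calling convention this patch gives
XLogFindNextRecord (return true when the caller must supply more data
and call again, return false when finished, with the result left in the
state struct) can be sketched with a toy model that does not depend on
the PostgreSQL sources.  Every name below (ToyFinder, toy_find_next,
toy_run) is hypothetical, and "pages" are reduced to single integers.

```c
/* Toy model of a caller-driven finder; all names are hypothetical. */
#include <assert.h>
#include <stdbool.h>

typedef struct ToyFinder
{
    int         target;     /* look for the first value >= target */
    int         want_page;  /* index of the page the finder needs next */
    bool        have_page;  /* set by the caller after filling page_value */
    int         page_value; /* data supplied by the caller */
    int         result;     /* valid once toy_find_next() returns false */
} ToyFinder;

static void
toy_init(ToyFinder *f, int target)
{
    f->target = target;
    f->want_page = 0;
    f->have_page = false;
    f->page_value = 0;
    f->result = -1;
}

/*
 * Returns true if the caller must supply page f->want_page and call again,
 * false once the search has finished (the answer is in f->result).
 */
static bool
toy_find_next(ToyFinder *f)
{
    if (!f->have_page)
        return true;            /* nothing to look at yet: ask for data */

    f->have_page = false;       /* consume the supplied page */

    if (f->page_value >= f->target)
    {
        f->result = f->page_value;
        return false;           /* found */
    }

    f->want_page++;             /* keep searching on the next page */
    return true;
}

/* Caller-side loop: the finder never reads data by itself. */
static int
toy_run(const int *pages, int npages, int target)
{
    ToyFinder   f;

    toy_init(&f, target);
    while (toy_find_next(&f))
    {
        if (f.want_page >= npages)
            return -1;          /* caller cannot supply the data: give up */
        f.page_value = pages[f.want_page];
        f.have_page = true;
    }
    assert(f.result >= target);
    return f.result;
}
```

The point of the shape is that all I/O stays in the caller's loop, so
the same finder can be driven by any data source without passing a
callback or a private pointer.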
 src/backend/access/transam/xlogreader.c | 161 +++++++++++++-----------
 src/bin/pg_waldump/pg_waldump.c         | 118 +++++++++--------
 src/bin/pg_waldump/t/001_basic.pl       |  35 +++++-
 src/include/access/xlogreader.h         |  13 +-
 4 files changed, 193 insertions(+), 134 deletions(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 4501a09245..e20c4dc4c7 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -750,9 +750,9 @@ err:
 /*
  * Terminate read WAL.
  *
- * When the caller failed to read the data requested from XLogReadRecord, it is
- * supposed to call this function to set the correct reader state to reflect
- * the failure.
+ * When the caller failed to read the data requested from XLogReadRecord or
+ * XLogFindNextRecord, it is supposed to call this function to set the correct
+ * reader state to reflect the failure.
  */
 void
 XLogTerminateRead(XLogReaderState *state)
@@ -1168,6 +1168,23 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
  * here.
  */
 
+XLogFindNextRecordState *
+InitXLogFindNextRecord(XLogReaderState *reader_state, XLogRecPtr start_ptr)
+{
+    XLogFindNextRecordState *state = (XLogFindNextRecordState *)
+        palloc_extended(sizeof(XLogFindNextRecordState),
+                        MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
+    if (!state)
+        return NULL;
+
+    state->reader_state = reader_state;
+    state->targetRecPtr = start_ptr;
+    state->currRecPtr = start_ptr;
+    state->page_found = false;
+
+    return state;
+}
+
 /*
  * Find the first record with an lsn >= RecPtr.
  *
@@ -1179,123 +1196,115 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
  * This positions the reader, like XLogBeginRead(), so that the next call to
  * XLogReadRecord() will read the next valid record.
  */
-XLogRecPtr
-XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr,
-                   XLogFindNextRecordCB read_page, void *private)
+bool
+XLogFindNextRecord(XLogFindNextRecordState *state)
 {
-    XLogRecPtr    tmpRecPtr;
-    XLogRecPtr    found = InvalidXLogRecPtr;
     XLogPageHeader header;
     XLogRecord *record;
     XLogReadRecordResult result;
     char       *errormsg;
 
-    Assert(!XLogRecPtrIsInvalid(RecPtr));
+    XLogRecPtr    targetPagePtr;
+    int            targetRecOff;
+    uint32        pageHeaderSize;
 
     /*
      * skip over potential continuation data, keeping in mind that it may span
      * multiple pages
      */
-    tmpRecPtr = RecPtr;
-    while (true)
+restart:
+    if (!state->page_found)
     {
-        XLogRecPtr    targetPagePtr;
-        int            targetRecOff;
-        uint32        pageHeaderSize;
+        Assert(!XLogRecPtrIsInvalid(state->currRecPtr));
 
         /*
          * Compute targetRecOff. It should typically be equal or greater than
          * short page-header since a valid record can't start anywhere before
          * that, except when caller has explicitly specified the offset that
          * falls somewhere there or when we are skipping multi-page
-         * continuation record. It doesn't matter though because
-         * XLogNeedData() is prepared to handle that and will read at least
-         * short page-header worth of data
+         * continuation record. It doesn't matter though because XLogNeedData()
+         * is prepared to handle that and will read at least short page-header
+         * worth of data
          */
-        targetRecOff = tmpRecPtr % XLOG_BLCKSZ;
+        targetRecOff = state->currRecPtr % XLOG_BLCKSZ;
 
         /* scroll back to page boundary */
-        targetPagePtr = tmpRecPtr - targetRecOff;
+        targetPagePtr = state->currRecPtr - targetRecOff;
 
-        while (XLogNeedData(state, targetPagePtr, targetRecOff,
-                            targetRecOff != 0))
-        {
-            if (!read_page(state, private))
-                break;
-        }
+        if (XLogNeedData(state->reader_state, targetPagePtr, targetRecOff,
+                         targetRecOff != 0))
+            return true;
 
-        if (!state->page_verified)
+        if (!state->reader_state->page_verified)
             goto err;
 
-        header = (XLogPageHeader) state->readBuf;
+        header = (XLogPageHeader) state->reader_state->readBuf;
 
         pageHeaderSize = XLogPageHeaderSize(header);
 
         /* we should have read the page header */
-        Assert(state->readLen >= pageHeaderSize);
+        Assert(state->reader_state->readLen >= pageHeaderSize);
 
-        /* skip over potential continuation data */
         if (header->xlp_info & XLP_FIRST_IS_CONTRECORD)
         {
-            /*
-             * If the length of the remaining continuation data is more than
-             * what can fit in this page, the continuation record crosses over
-             * this page. Read the next page and try again. xlp_rem_len in the
-             * next page header will contain the remaining length of the
-             * continuation data
-             *
-             * Note that record headers are MAXALIGN'ed
-             */
-            if (MAXALIGN(header->xlp_rem_len) >= (XLOG_BLCKSZ - pageHeaderSize))
-                tmpRecPtr = targetPagePtr + XLOG_BLCKSZ;
-            else
+            if (MAXALIGN(header->xlp_rem_len) >=
+                (XLOG_BLCKSZ - pageHeaderSize))
             {
                 /*
-                 * The previous continuation record ends in this page. Set
-                 * tmpRecPtr to point to the first valid record
+                 * If the length of the remaining continuation data is more
+                 * than what can fit in this page, the continuation record
+                 * crosses over this page. Read the next page and try
+                 * again. xlp_rem_len in the next page header will contain the
+                 * remaining length of the continuation data
+                 *
+                 * Note that record headers are MAXALIGN'ed
                  */
-                tmpRecPtr = targetPagePtr + pageHeaderSize
-                    + MAXALIGN(header->xlp_rem_len);
-                break;
+                state->currRecPtr = targetPagePtr + XLOG_BLCKSZ;
+                goto restart;
             }
+
+            /*
+             * The previous continuation record ends in this page. Set
+             * currRecPtr to point to the first valid record
+             */
+            state->currRecPtr = targetPagePtr + pageHeaderSize
+                + MAXALIGN(header->xlp_rem_len);
         }
         else
-        {
-            tmpRecPtr = targetPagePtr + pageHeaderSize;
-            break;
-        }
+            state->currRecPtr = targetPagePtr + pageHeaderSize;
+
+        /*
+         * we know now that currRecPtr is an address pointing to a valid
+         * XLogRecord because either we're at the first record after the
+         * beginning of a page or we just jumped over the remaining data of
+         * a continuation.
+         */
+        XLogBeginRead(state->reader_state, state->currRecPtr);
+        state->page_found = true;
     }
 
-    /*
-     * we know now that tmpRecPtr is an address pointing to a valid XLogRecord
-     * because either we're at the first record after the beginning of a page
-     * or we just jumped over the remaining data of a continuation.
-     */
-    XLogBeginRead(state, tmpRecPtr);
-    while ((result = XLogReadRecord(state, &record, &errormsg)) !=
-           XLREAD_FAIL)
-    {
-        if (result == XLREAD_NEED_DATA)
-        {
-            if (!read_page(state, private))
-                goto err;
-            continue;
-        }
+    result = XLogReadRecord(state->reader_state, &record, &errormsg);
 
-        /* past the record we've found, break out */
-        if (RecPtr <= state->ReadRecPtr)
-        {
-            /* Rewind the reader to the beginning of the last record. */
-            found = state->ReadRecPtr;
-            XLogBeginRead(state, found);
-            return found;
-        }
-    }
+    if (result == XLREAD_FAIL)
+        goto err;
+
+    if (result == XLREAD_NEED_DATA)
+        return true;
+
+    /* not yet at the target record, keep reading */
+    if (state->reader_state->ReadRecPtr < state->targetRecPtr)
+        goto restart;
+
+    /* Rewind the reader to the beginning of the last record. */
+    state->currRecPtr = state->reader_state->ReadRecPtr;
+    XLogBeginRead(state->reader_state, state->currRecPtr);
+    return false;
 
 err:
-    XLogReaderInvalReadState(state);
+    XLogReaderInvalReadState(state->reader_state);
 
-    return InvalidXLogRecPtr;
+    state->currRecPtr = InvalidXLogRecPtr;
+    return false;
 }
 
 #endif                            /* FRONTEND */
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index 80182621f8..d1d7427db0 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -29,14 +29,6 @@ static const char *progname;
 
 static int    WalSegSz;
 
-typedef struct XLogDumpPrivate
-{
-    TimeLineID    timeline;
-    XLogRecPtr    startptr;
-    XLogRecPtr    endptr;
-    bool        endptr_reached;
-} XLogDumpPrivate;
-
 typedef struct XLogDumpConfig
 {
     /* display options */
@@ -332,35 +324,40 @@ WALDumpCloseSegment(XLogReaderState *state)
 }
 
 /*
- * pg_waldump's WAL page rader, also used as page_read callback for
- * XLogFindNextRecord
+ * pg_waldump's WAL page reader
+ *
+ * timeline and startptr specify the start position; reads up to endptr.
  */
 static bool
-WALDumpReadPage(XLogReaderState *state, void *priv)
+WALDumpReadPage(XLogReaderState *state, TimeLineID timeline,
+                XLogRecPtr startptr, XLogRecPtr endptr)
 {
     XLogRecPtr    targetPagePtr = state->readPagePtr;
     int            reqLen          = state->readLen;
     char       *readBuff      = state->readBuf;
-    XLogDumpPrivate *private  = (XLogDumpPrivate *) priv;
     int            count = XLOG_BLCKSZ;
     WALReadError errinfo;
 
-    if (private->endptr != InvalidXLogRecPtr)
+    /* determine the number of bytes to read on the page */
+    if (endptr != InvalidXLogRecPtr)
     {
-        if (targetPagePtr + XLOG_BLCKSZ <= private->endptr)
+        if (targetPagePtr + XLOG_BLCKSZ <= endptr)
             count = XLOG_BLCKSZ;
-        else if (targetPagePtr + reqLen <= private->endptr)
-            count = private->endptr - targetPagePtr;
+        else if (targetPagePtr + reqLen <= endptr)
+            count = endptr - targetPagePtr;
         else
         {
-            private->endptr_reached = true;
+            /* Notify xlogreader that we didn't read at all */
             state->readLen = -1;
             return false;
         }
     }
 
+    /* We should read at least as much as xlogreader requested */
+    Assert(count >= state->readLen);
+
     if (!WALRead(state, WALDumpOpenSegment, WALDumpCloseSegment,
-                 readBuff, targetPagePtr, count, private->timeline, &errinfo))
+                 readBuff, targetPagePtr, count, timeline, &errinfo))
     {
         WALOpenSegment *seg = &errinfo.wre_seg;
         char        fname[MAXPGPATH];
@@ -380,7 +377,7 @@ WALDumpReadPage(XLogReaderState *state, void *priv)
                         errinfo.wre_req);
     }
 
-    Assert(count >= state->readLen);
+    /* Notify xlogreader of how many bytes we have read */
     state->readLen = count;
     return true;
 }
@@ -774,7 +771,10 @@ main(int argc, char **argv)
     uint32        xlogid;
     uint32        xrecoff;
     XLogReaderState *xlogreader_state;
-    XLogDumpPrivate private;
+    XLogFindNextRecordState *findnext_state;
+    TimeLineID    timeline;
+    XLogRecPtr    startptr;
+    XLogRecPtr    endptr;
     XLogDumpConfig config;
     XLogDumpStats stats;
     XLogRecord *record;
@@ -820,14 +820,9 @@ main(int argc, char **argv)
         }
     }
 
-    memset(&private, 0, sizeof(XLogDumpPrivate));
-    memset(&config, 0, sizeof(XLogDumpConfig));
-    memset(&stats, 0, sizeof(XLogDumpStats));
-
-    private.timeline = 1;
-    private.startptr = InvalidXLogRecPtr;
-    private.endptr = InvalidXLogRecPtr;
-    private.endptr_reached = false;
+    timeline = 1;
+    startptr = InvalidXLogRecPtr;
+    endptr = InvalidXLogRecPtr;
 
     config.quiet = false;
     config.bkp_details = false;
@@ -862,7 +857,7 @@ main(int argc, char **argv)
                                  optarg);
                     goto bad_argument;
                 }
-                private.endptr = (uint64) xlogid << 32 | xrecoff;
+                endptr = (uint64) xlogid << 32 | xrecoff;
                 break;
             case 'f':
                 config.follow = true;
@@ -915,10 +910,10 @@ main(int argc, char **argv)
                     goto bad_argument;
                 }
                 else
-                    private.startptr = (uint64) xlogid << 32 | xrecoff;
+                    startptr = (uint64) xlogid << 32 | xrecoff;
                 break;
             case 't':
-                if (sscanf(optarg, "%d", &private.timeline) != 1)
+                if (sscanf(optarg, "%d", &timeline) != 1)
                 {
                     pg_log_error("could not parse timeline \"%s\"", optarg);
                     goto bad_argument;
@@ -995,21 +990,21 @@ main(int argc, char **argv)
         close(fd);
 
         /* parse position from file */
-        XLogFromFileName(fname, &private.timeline, &segno, WalSegSz);
+        XLogFromFileName(fname, &timeline, &segno, WalSegSz);
 
-        if (XLogRecPtrIsInvalid(private.startptr))
-            XLogSegNoOffsetToRecPtr(segno, 0, WalSegSz, private.startptr);
-        else if (!XLByteInSeg(private.startptr, segno, WalSegSz))
+        if (XLogRecPtrIsInvalid(startptr))
+            XLogSegNoOffsetToRecPtr(segno, 0, WalSegSz, startptr);
+        else if (!XLByteInSeg(startptr, segno, WalSegSz))
         {
             pg_log_error("start WAL location %X/%X is not inside file \"%s\"",
-                         LSN_FORMAT_ARGS(private.startptr),
+                         LSN_FORMAT_ARGS(startptr),
                          fname);
             goto bad_argument;
         }
 
         /* no second file specified, set end position */
-        if (!(optind + 1 < argc) && XLogRecPtrIsInvalid(private.endptr))
-            XLogSegNoOffsetToRecPtr(segno + 1, 0, WalSegSz, private.endptr);
+        if (!(optind + 1 < argc) && XLogRecPtrIsInvalid(endptr))
+            XLogSegNoOffsetToRecPtr(segno + 1, 0, WalSegSz, endptr);
 
         /* parse ENDSEG if passed */
         if (optind + 1 < argc)
@@ -1025,26 +1020,26 @@ main(int argc, char **argv)
             close(fd);
 
             /* parse position from file */
-            XLogFromFileName(fname, &private.timeline, &endsegno, WalSegSz);
+            XLogFromFileName(fname, &timeline, &endsegno, WalSegSz);
 
             if (endsegno < segno)
                 fatal_error("ENDSEG %s is before STARTSEG %s",
                             argv[optind + 1], argv[optind]);
 
-            if (XLogRecPtrIsInvalid(private.endptr))
+            if (XLogRecPtrIsInvalid(endptr))
                 XLogSegNoOffsetToRecPtr(endsegno + 1, 0, WalSegSz,
-                                        private.endptr);
+                                        endptr);
 
             /* set segno to endsegno for check of --end */
             segno = endsegno;
         }
 
 
-        if (!XLByteInSeg(private.endptr, segno, WalSegSz) &&
-            private.endptr != (segno + 1) * WalSegSz)
+        if (!XLByteInSeg(endptr, segno, WalSegSz) &&
+            endptr != (segno + 1) * WalSegSz)
         {
             pg_log_error("end WAL location %X/%X is not inside file \"%s\"",
-                         LSN_FORMAT_ARGS(private.endptr),
+                         LSN_FORMAT_ARGS(endptr),
                          argv[argc - 1]);
             goto bad_argument;
         }
@@ -1053,7 +1048,7 @@ main(int argc, char **argv)
         waldir = identify_target_directory(waldir, NULL);
 
     /* we don't know what to print */
-    if (XLogRecPtrIsInvalid(private.startptr))
+    if (XLogRecPtrIsInvalid(startptr))
     {
         pg_log_error("no start WAL location given");
         goto bad_argument;
@@ -1068,27 +1063,40 @@ main(int argc, char **argv)
     if (!xlogreader_state)
         fatal_error("out of memory");
 
+    findnext_state =
+        InitXLogFindNextRecord(xlogreader_state, startptr);
+
+    if (!findnext_state)
+        fatal_error("out of memory");
+
     /* first find a valid recptr to start from */
-    first_record = XLogFindNextRecord(xlogreader_state, private.startptr,
-                                      &WALDumpReadPage, (void*) &private);
+    while (XLogFindNextRecord(findnext_state))
+    {
+        if (!WALDumpReadPage(xlogreader_state, timeline, startptr, endptr))
+        {
+            XLogTerminateRead(xlogreader_state);
+            break;
+        }
+    }
 
+    first_record = findnext_state->currRecPtr;
     if (first_record == InvalidXLogRecPtr)
         fatal_error("could not find a valid record after %X/%X",
-                    LSN_FORMAT_ARGS(private.startptr));
+                    LSN_FORMAT_ARGS(startptr));
 
     /*
      * Display a message that we're skipping data if `from` wasn't a pointer
      * to the start of a record and also wasn't a pointer to the beginning of
      * a segment (e.g. we were used in file mode).
      */
-    if (first_record != private.startptr &&
-        XLogSegmentOffset(private.startptr, WalSegSz) != 0)
+    if (first_record != startptr &&
+        XLogSegmentOffset(startptr, WalSegSz) != 0)
         printf(ngettext("first record is after %X/%X, at %X/%X, skipping over %u byte\n",
                         "first record is after %X/%X, at %X/%X, skipping over %u bytes\n",
-                        (first_record - private.startptr)),
-               LSN_FORMAT_ARGS(private.startptr),
+                        (first_record - startptr)),
+               LSN_FORMAT_ARGS(startptr),
                LSN_FORMAT_ARGS(first_record),
-               (uint32) (first_record - private.startptr));
+               (uint32) (first_record - startptr));
 
     for (;;)
     {
@@ -1096,7 +1104,7 @@ main(int argc, char **argv)
         while (XLogReadRecord(xlogreader_state, &record, &errormsg) ==
                XLREAD_NEED_DATA)
         {
-            if (!WALDumpReadPage(xlogreader_state, (void *) &private))
+            if (!WALDumpReadPage(xlogreader_state, timeline, startptr, endptr))
             {
                 XLogTerminateRead(xlogreader_state);
                 break;
@@ -1105,7 +1113,7 @@ main(int argc, char **argv)
 
         if (!record)
         {
-            if (!config.follow || private.endptr_reached)
+            if (!config.follow)
                 break;
             else
             {
diff --git a/src/bin/pg_waldump/t/001_basic.pl b/src/bin/pg_waldump/t/001_basic.pl
index fb2f807dc3..def8e03250 100644
--- a/src/bin/pg_waldump/t/001_basic.pl
+++ b/src/bin/pg_waldump/t/001_basic.pl
@@ -3,9 +3,42 @@
 
 use strict;
 use warnings;
+use PostgresNode;
 use TestLib;
-use Test::More tests => 8;
+use Test::More tests => 11;
 
 program_help_ok('pg_waldump');
 program_version_ok('pg_waldump');
 program_options_handling_ok('pg_waldump');
+
+# Test: check that pg_waldump correctly skips over the continuation
+# pages while seeking the first record.
+my $node = PostgresNode->new('primary');
+$node->init(allows_streaming => 1);
+$node->append_conf('postgresql.conf', 'wal_keep_size=1GB');
+$node->start;
+
+my $start_lsn = $node->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my $start_file = $node->safe_psql('postgres', "SELECT pg_walfile_name(pg_current_wal_lsn())");
+
+# insert a record that spans multiple pages
+$node->safe_psql('postgres',
+    qq{SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))}
+);
+
+# run pg_waldump from the second byte of a record
+my ($file, $off) = split(/\//, $start_lsn);
+my $target_lsn = sprintf("%X/%X", hex($file), hex($off) + 1);
+my ($stdout, $stderr) =
+  run_command(["pg_waldump", '-s', $target_lsn,
+              $node->basedir . "/pgdata/pg_wal/" . $start_file]);
+
+ok ($stdout =~
+    /first record is after ([0-9A-F\/]+), at ([0-9A-F\/]+), skipping over ([0-9]+) bytes/,
+    'output contains required information');
+my $echoed_target = $1;
+my $first_record = $2;
+my $skipped_bytes = $3;
+
+ok ($echoed_target eq $target_lsn, 'target LSN is correct');
+ok ($skipped_bytes > 8192, 'skipped more than a page');
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index ff1aca719b..1a1e1939e0 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -56,6 +56,7 @@ typedef struct WALSegmentContext
 } WALSegmentContext;
 
 typedef struct XLogReaderState XLogReaderState;
+typedef struct XLogFindNextRecordState XLogFindNextRecordState;
 
 /* Function type definition for the segment cleanup callback */
 typedef void (*WALSegmentCleanupCB) (XLogReaderState *xlogreader);
@@ -249,6 +250,14 @@ struct XLogReaderState
     char       *errormsg_buf;
 };
 
+struct XLogFindNextRecordState
+{
+    XLogReaderState *reader_state;
+    XLogRecPtr        targetRecPtr;
+    XLogRecPtr        currRecPtr;
+    bool            page_found;
+};
+
 /* Get a new XLogReader */
 extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
                                            const char *waldir,
@@ -263,8 +272,8 @@ extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr);
 /* Function type definition for the read_page callback */
 typedef bool (*XLogFindNextRecordCB) (XLogReaderState *xlogreader,
                                       void *private);
-extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr,
-                                     XLogFindNextRecordCB read_page, void *private);
+extern XLogFindNextRecordState *InitXLogFindNextRecord(XLogReaderState *reader_state, XLogRecPtr start_ptr);
+extern bool XLogFindNextRecord(XLogFindNextRecordState *state);
 #endif                            /* FRONTEND */
 
 /* Read the next XLog record. Returns NULL on end-of-WAL or failure */
-- 
2.27.0

From 44abe155352c549631ed71ceb4179572d7ded7b3 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Date: Thu, 7 Oct 2021 09:56:45 +0900
Subject: [PATCH v18 5/5] Split readLen and reqLen of XLogReaderState.

readLen is used as both an input and an output parameter of page-read
functions.  Split the variable according to these roles.  To avoid
confusion between the two variables, provide a setter function for
page-read functions to set readLen.
---
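As a rough illustration of the split (not the patch's actual code; the
definition of XLogReaderNotifySize lives in xlogreader.h and is not
reproduced here), the idea is that the reader writes only the request
field and a page-read function writes only the result field, through a
setter.  ToyReader, toy_notify_size and toy_read_page are hypothetical
stand-ins.

```c
/* Toy model of separate "requested" and "read" lengths; names are hypothetical. */
#include <assert.h>
#include <stdbool.h>

typedef struct ToyReader
{
    int         reqLen;     /* in: minimum number of bytes the reader needs */
    int         readLen;    /* out: bytes made available, or -1 on failure */
} ToyReader;

/* Page-read functions report their result only through this setter. */
static inline void
toy_notify_size(ToyReader *r, int len)
{
    r->readLen = len;
}

/*
 * Stand-in for a page-read function: "available" is how many bytes the
 * data source could provide for the requested page.
 */
static bool
toy_read_page(ToyReader *r, int available)
{
    if (available < r->reqLen)
    {
        toy_notify_size(r, -1); /* could not satisfy the request */
        return false;
    }

    toy_notify_size(r, available);
    return true;
}
```

With two fields the request survives a failed read, so the caller can
still see what was asked for after readLen has been set to -1.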
 src/backend/access/transam/xlog.c       | 10 +++++-----
 src/backend/access/transam/xlogreader.c | 17 ++++++++---------
 src/backend/access/transam/xlogutils.c  |  6 +++---
 src/backend/replication/walsender.c     |  6 +++---
 src/bin/pg_rewind/parsexlog.c           | 12 +++++++-----
 src/bin/pg_waldump/pg_waldump.c         |  6 +++---
 src/include/access/xlogreader.h         | 20 +++++++++++++++-----
 7 files changed, 44 insertions(+), 33 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index c4fe006776..4bbc4d3c6a 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -12296,7 +12296,7 @@ XLogPageRead(XLogReaderState *state,
 {
     char *readBuf                = state->readBuf;
     XLogRecPtr    targetPagePtr    = state->readPagePtr;
-    int            reqLen            = state->readLen;
+    int            reqLen            = state->reqLen;
     int            readLen            = 0;
     XLogRecPtr    targetRecPtr    = state->ReadRecPtr;
     uint32        targetPageOff;
@@ -12348,7 +12348,7 @@ retry:
                 close(readFile);
             readFile = -1;
             readSource = XLOG_FROM_ANY;
-            state->readLen = -1;
+            XLogReaderNotifySize(state, -1);
             return false;
         }
     }
@@ -12403,7 +12403,7 @@ retry:
     pgstat_report_wait_end();
 
     Assert(targetSegNo == state->seg.ws_segno);
-    Assert(reqLen <= readLen);
+    Assert(readLen >= reqLen);
 
     state->seg.ws_tli = curFileTLI;
 
@@ -12456,7 +12456,7 @@ retry:
     }
 
     Assert(state->readPagePtr == targetPagePtr);
-    state->readLen = readLen;
+    XLogReaderNotifySize(state, readLen);
     return true;
 
 next_record_is_invalid:
@@ -12471,7 +12471,7 @@ next_record_is_invalid:
     if (StandbyMode)
         goto retry;
 
-    state->readLen = -1;
+    XLogReaderNotifySize(state, -1);
     return false;
 }
 
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index e20c4dc4c7..96fa1baf98 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -110,7 +110,7 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
     WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
                        waldir);
 
-    /* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */
+    /* ReadRecPtr, EndRecPtr, reqLen and readLen initialized to zeroes above */
     state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
                                           MCXT_ALLOC_NO_OOM);
     if (!state->errormsg_buf)
@@ -264,12 +264,12 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
  * record being stored in *record. Otherwise *record is NULL.
  *
  * Returns XLREAD_NEED_DATA if more data is needed to finish reading the
- * current record.  In that case, state->readPagePtr and state->readLen inform
+ * current record.  In that case, state->readPagePtr and state->reqLen inform
  * the desired position and minimum length of data needed. The caller shall
  * read in the requested data and set state->readBuf to point to a buffer
  * containing it. The caller must also set state->seg->ws_tli and
  * state->readLen to indicate the timeline that it was read from, and the
- * length of data that is now available (which must be >= given readLen),
+ * length of data that is now available (which must be >= given reqLen),
  * respectively.
  *
  * If invalid data is encountered, returns XLREAD_FAIL with *record being set to
@@ -662,7 +662,7 @@ restart:
                      * XLogNeedData should have ensured that the whole page
                      * header was read
                      */
-                    Assert(state->readLen >= pageHeaderSize);
+                    Assert(pageHeaderSize <= state->readLen);
 
                     contdata = (char *) state->readBuf + pageHeaderSize;
                     record_len = XLOG_BLCKSZ - pageHeaderSize;
@@ -675,7 +675,7 @@ restart:
                      * XLogNeedData should have ensured all needed data was
                      * read
                      */
-                    Assert(state->readLen >= request_len);
+                    Assert(request_len <= state->readLen);
 
                     memcpy(state->readRecordBuf + state->recordGotLen,
                            (char *) contdata, record_len);
@@ -728,7 +728,6 @@ restart:
         state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize);
     }
 
-    Assert(!*record || state->readLen >= 0);
     if (DecodeXLogRecord(state, *record, errormsg))
         return XLREAD_SUCCESS;
 
@@ -824,7 +823,7 @@ XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen,
         /* Request more data if we don't have the full header. */
         if (state->readLen < pageHeaderSize)
         {
-            state->readLen = pageHeaderSize;
+            state->reqLen = pageHeaderSize;
             return true;
         }
 
@@ -901,7 +900,7 @@ XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen,
          * will not come back here, but will request the actual target page.
          */
         state->readPagePtr = pageptr - targetPageOff;
-        state->readLen = XLOG_BLCKSZ;
+        state->reqLen = XLOG_BLCKSZ;
         return true;
     }
 
@@ -910,7 +909,7 @@ XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen,
      * header so that we can validate it.
      */
     state->readPagePtr = pageptr;
-    state->readLen = Max(reqLen + addLen, SizeOfXLogShortPHD);
+    state->reqLen = Max(reqLen + addLen, SizeOfXLogShortPHD);
     return true;
 }
 
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index fc09a72b8b..e69fd46099 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -848,7 +848,7 @@ bool
 read_local_xlog_page(XLogReaderState *state)
 {
     XLogRecPtr    targetPagePtr = state->readPagePtr;
-    int            reqLen          = state->readLen;
+    int            reqLen          = state->reqLen;
     char       *cur_page      = state->readBuf;
     XLogRecPtr    read_upto,
                 loc;
@@ -948,7 +948,7 @@ read_local_xlog_page(XLogReaderState *state)
     else if (targetPagePtr + reqLen > read_upto)
     {
         /* not enough data there */
-        state->readLen = -1;
+        XLogReaderNotifySize(state, -1);
         return false;
     }
     else
@@ -968,7 +968,7 @@ read_local_xlog_page(XLogReaderState *state)
 
     /* number of valid bytes in the buffer */
     state->readPagePtr = targetPagePtr;
-    state->readLen = count;
+    XLogReaderNotifySize(state, count);
     return true;
 }
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 78d805ab80..81baeaded1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -803,7 +803,7 @@ static bool
 logical_read_xlog_page(XLogReaderState *state)
 {
     XLogRecPtr        targetPagePtr = state->readPagePtr;
-    int                reqLen          = state->readLen;
+    int                reqLen          = state->reqLen;
     char           *cur_page      = state->readBuf;
     XLogRecPtr    flushptr;
     int            count;
@@ -822,7 +822,7 @@ logical_read_xlog_page(XLogReaderState *state)
     /* fail if not (implies we are going to shut down) */
     if (flushptr < targetPagePtr + reqLen)
     {
-        state->readLen = -1;
+        XLogReaderNotifySize(state, -1);
         return false;
     }
 
@@ -852,7 +852,7 @@ logical_read_xlog_page(XLogReaderState *state)
     XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
     CheckXLogRemoved(segno, state->seg.ws_tli);
 
-    state->readLen = count;
+    XLogReaderNotifySize(state, count);
     return true;
 }
 
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index da723e5340..119d9c5cd2 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -263,6 +263,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, const char *datadir,
     XLogSegNo    targetSegNo;
     int            r;
 
+    Assert(xlogreader->reqLen <= XLOG_BLCKSZ);
+
     XLByteToSeg(targetPagePtr, targetSegNo, WalSegSz);
     XLogSegNoOffsetToRecPtr(targetSegNo + 1, 0, WalSegSz, targetSegEnd);
     targetPageOff = XLogSegmentOffset(targetPagePtr, WalSegSz);
@@ -313,7 +315,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, const char *datadir,
             if (restoreCommand == NULL)
             {
                 pg_log_error("could not open file \"%s\": %m", xlogfpath);
-                xlogreader->readLen = -1;
+                XLogReaderNotifySize(xlogreader, -1);
                 return false;
             }
 
@@ -328,7 +330,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, const char *datadir,
 
             if (xlogreadfd < 0)
             {
-                xlogreader->readLen = -1;
+                XLogReaderNotifySize(xlogreader, -1);
                 return false;
             }
             else
@@ -346,7 +348,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, const char *datadir,
     if (lseek(xlogreadfd, (off_t) targetPageOff, SEEK_SET) < 0)
     {
         pg_log_error("could not seek in file \"%s\": %m", xlogfpath);
-        xlogreader->readLen = -1;
+        XLogReaderNotifySize(xlogreader, -1);
         return false;
     }
 
@@ -360,14 +362,14 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, const char *datadir,
             pg_log_error("could not read file \"%s\": read %d of %zu",
                          xlogfpath, r, (Size) XLOG_BLCKSZ);
 
-        xlogreader->readLen = -1;
+        XLogReaderNotifySize(xlogreader, -1);
         return false;
     }
 
     Assert(targetSegNo == xlogreadsegno);
 
     xlogreader->seg.ws_tli = targetHistory[*tliIndex].tli;
-    xlogreader->readLen = XLOG_BLCKSZ;
+    XLogReaderNotifySize(xlogreader, XLOG_BLCKSZ);
     return true;
 }
 
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index d1d7427db0..7d782271fe 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -333,7 +333,7 @@ WALDumpReadPage(XLogReaderState *state, TimeLineID timeline,
                 XLogRecPtr startptr, XLogRecPtr endptr)
 {
     XLogRecPtr    targetPagePtr = state->readPagePtr;
-    int            reqLen          = state->readLen;
+    int            reqLen          = state->reqLen;
     char       *readBuff      = state->readBuf;
     int            count = XLOG_BLCKSZ;
     WALReadError errinfo;
@@ -348,7 +348,7 @@ WALDumpReadPage(XLogReaderState *state, TimeLineID timeline,
         else
         {
             /* Notify xlogreader that we didn't read at all */
-            state->readLen = -1;
+            XLogReaderNotifySize(state, -1);
             return false;
         }
     }
@@ -378,7 +378,7 @@ WALDumpReadPage(XLogReaderState *state, TimeLineID timeline,
     }
 
     /* Notify xlogreader of how many bytes we have read */
-    state->readLen = count;
+    XLogReaderNotifySize(state, count);
     return true;
 }
 
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 1a1e1939e0..9efc04a4ad 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -159,18 +159,21 @@ struct XLogReaderState
 
     /* ----------------------------------------
      * Communication with page reader
-     * readBuf is XLOG_BLCKSZ bytes, valid up to at least readLen bytes.
+     * readBuf is XLOG_BLCKSZ bytes, valid up to at least reqLen bytes.
      *  ----------------------------------------
      */
     /* variables to communicate with page reader */
     XLogRecPtr    readPagePtr;    /* page pointer to read */
-    int32        readLen;        /* bytes requested to reader, or actual bytes
-                                 * read by reader, which must be larger than
-                                 * the request, or -1 on error */
+    int32        reqLen;            /* bytes requested from the caller */
     char       *readBuf;        /* buffer to store data */
     bool        page_verified;    /* is the page header on the buffer verified? */
-    bool        record_verified;    /* is the current record header verified? */
+    bool        record_verified;/* is the current record header verified? */
 
+    /* variables the caller uses to respond to xlogreader */
+    int32        readLen;        /* actual bytes read by reader, which must be
+                                 * at least the request, or -1 on error.
+                                 * Use XLogReaderNotifySize() to set a
+                                 * value. */
 
     /* ----------------------------------------
      * Decoded representation of current record
@@ -258,6 +261,13 @@ struct XLogFindNextRecordState
     bool            page_found;
 };
 
+/* setter functions of XLogReaderState used by other modules */
+static inline void
+XLogReaderNotifySize(XLogReaderState *state, int32 len)
+{
+    state->readLen = len;
+}
+
 /* Get a new XLogReader */
 extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
                                            const char *waldir,
-- 
2.27.0
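
For readers skimming the patch, here is a minimal standalone sketch of the handshake it sets up: the reader states its need in reqLen, and the page-read routine answers only through the setter, which stores the result in readLen. This is not PostgreSQL code. MockReaderState, MockReaderNotifySize, mock_read_page and MOCK_BLCKSZ are hypothetical stand-ins invented for illustration, and the in-memory "WAL" array is an assumption made so the example compiles on its own.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define MOCK_BLCKSZ 8192		/* hypothetical stand-in for XLOG_BLCKSZ */

/* Hypothetical stand-in for XLogReaderState; only the fields the patch touches. */
typedef struct MockReaderState
{
	uint64_t	readPagePtr;	/* page pointer to read */
	int32_t		reqLen;			/* bytes requested from the caller (input) */
	char	   *readBuf;		/* buffer to store data */
	int32_t		readLen;		/* bytes actually read, or -1 on error (output) */
} MockReaderState;

/* Mirrors the shape of the setter added by the patch: the only writer of readLen. */
static inline void
MockReaderNotifySize(MockReaderState *state, int32_t len)
{
	state->readLen = len;
}

/*
 * Hypothetical page reader serving pages from an in-memory byte array.
 * It reads the request from reqLen and never modifies it.
 */
static bool
mock_read_page(MockReaderState *state, const char *wal, uint64_t wal_len)
{
	uint64_t	avail;
	int32_t		count;

	assert(state->reqLen <= MOCK_BLCKSZ);

	/* Fail if the requested bytes are not available yet. */
	if (state->readPagePtr >= wal_len ||
		wal_len - state->readPagePtr < (uint64_t) state->reqLen)
	{
		MockReaderNotifySize(state, -1);
		return false;
	}

	/* Hand back as much of the page as exists, up to one full block. */
	avail = wal_len - state->readPagePtr;
	count = avail < MOCK_BLCKSZ ? (int32_t) avail : MOCK_BLCKSZ;
	memcpy(state->readBuf, wal + state->readPagePtr, (size_t) count);

	MockReaderNotifySize(state, count);
	return true;
}
```

Because the request and the response live in separate fields, the request is still intact after the call, so a caller can compare readLen against reqLen without having saved the request beforehand.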

