From 860c2b19b783c39b5d68d523e4e7b542da5db978 Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi Date: Wed, 7 Apr 2021 16:38:21 +0900 Subject: [PATCH v17 05/10] Split readLen and reqLen of XLogReaderState. The variable is used as both out and in parameter of page-read functions. Separate the varialbe according to the roles. To avoid confusion between the two variables, provide a setter function for page-read functions to set readLen. --- src/backend/access/transam/xlog.c | 10 +++++----- src/backend/access/transam/xlogreader.c | 17 ++++++++--------- src/backend/access/transam/xlogutils.c | 6 +++--- src/backend/replication/walsender.c | 6 +++--- src/bin/pg_rewind/parsexlog.c | 12 +++++++----- src/bin/pg_waldump/pg_waldump.c | 6 +++--- src/include/access/xlogreader.h | 23 ++++++++++++++++------- 7 files changed, 45 insertions(+), 35 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 9a9835f05d..d3d6fb4643 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -12101,7 +12101,7 @@ XLogPageRead(XLogReaderState *state, { char *readBuf = state->readBuf; XLogRecPtr targetPagePtr = state->readPagePtr; - int reqLen = state->readLen; + int reqLen = state->reqLen; int readLen = 0; XLogRecPtr targetRecPtr = state->ReadRecPtr; uint32 targetPageOff; @@ -12153,7 +12153,7 @@ retry: close(readFile); readFile = -1; readSource = XLOG_FROM_ANY; - state->readLen = -1; + XLogReaderNotifySize(state, -1); return false; } } @@ -12208,7 +12208,7 @@ retry: pgstat_report_wait_end(); Assert(targetSegNo == state->seg.ws_segno); - Assert(reqLen <= readLen); + Assert(readLen >= reqLen); state->seg.ws_tli = curFileTLI; @@ -12246,7 +12246,7 @@ retry: } Assert(state->readPagePtr == targetPagePtr); - state->readLen = readLen; + XLogReaderNotifySize(state, readLen); return true; next_record_is_invalid: @@ -12261,7 +12261,7 @@ next_record_is_invalid: if (StandbyMode) goto retry; - state->readLen = -1; + XLogReaderNotifySize(state, -1); return false; } diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 89c59843b9..7d1b5b50e6 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -107,7 +107,7 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size, waldir); - /* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */ + /* ReadRecPtr, EndRecPtr, reqLen and readLen initialized to zeroes above */ state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1, MCXT_ALLOC_NO_OOM); if (!state->errormsg_buf) @@ -261,12 +261,12 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) * record being stored in *record. Otherwise *record is NULL. * * Returns XLREAD_NEED_DATA if more data is needed to finish reading the - * current record. In that case, state->readPagePtr and state->readLen inform + * current record. In that case, state->readPagePtr and state->reqLen inform * the desired position and minimum length of data needed. The caller shall * read in the requested data and set state->readBuf to point to a buffer * containing it. The caller must also set state->seg->ws_tli and * state->readLen to indicate the timeline that it was read from, and the - * length of data that is now available (which must be >= given readLen), + * length of data that is now available (which must be >= given reqLen), * respectively. * * If invalid data is encountered, returns XLREAD_FAIL with *record being set to @@ -630,7 +630,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) * XLogNeedData should have ensured that the whole page * header was read */ - Assert(state->readLen >= pageHeaderSize); + Assert(pageHeaderSize <= state->readLen); contdata = (char *) state->readBuf + pageHeaderSize; record_len = XLOG_BLCKSZ - pageHeaderSize; @@ -643,7 +643,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) * XLogNeedData should have ensured all needed data was * read */ - Assert(state->readLen >= request_len); + Assert(request_len <= state->readLen); memcpy(state->readRecordBuf + state->recordGotLen, (char *) contdata, record_len); @@ -696,7 +696,6 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize); } - Assert(!*record || state->readLen >= 0); if (DecodeXLogRecord(state, *record, errormsg)) return XLREAD_SUCCESS; @@ -763,7 +762,7 @@ XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen, /* Request more data if we don't have the full header. */ if (state->readLen < pageHeaderSize) { - state->readLen = pageHeaderSize; + state->reqLen = pageHeaderSize; return true; } @@ -840,7 +839,7 @@ XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen, * will not come back here, but will request the actual target page. */ state->readPagePtr = pageptr - targetPageOff; - state->readLen = XLOG_BLCKSZ; + state->reqLen = XLOG_BLCKSZ; return true; } @@ -849,7 +848,7 @@ XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen, * header so that we can validate it. */ state->readPagePtr = pageptr; - state->readLen = Max(reqLen + addLen, SizeOfXLogShortPHD); + state->reqLen = Max(reqLen + addLen, SizeOfXLogShortPHD); return true; } diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index b003990745..61361192e7 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -828,7 +828,7 @@ bool read_local_xlog_page(XLogReaderState *state) { XLogRecPtr targetPagePtr = state->readPagePtr; - int reqLen = state->readLen; + int reqLen = state->reqLen; char *cur_page = state->readBuf; XLogRecPtr read_upto, loc; @@ -928,7 +928,7 @@ read_local_xlog_page(XLogReaderState *state) else if (targetPagePtr + reqLen > read_upto) { /* not enough data there */ - state->readLen = -1; + XLogReaderNotifySize(state, -1); return false; } else @@ -948,7 +948,7 @@ read_local_xlog_page(XLogReaderState *state) /* number of valid bytes in the buffer */ state->readPagePtr = targetPagePtr; - state->readLen = count; + XLogReaderNotifySize(state, count); return true; } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index b024bbc3cd..31522c8cc2 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -807,7 +807,7 @@ static bool logical_read_xlog_page(XLogReaderState *state) { XLogRecPtr targetPagePtr = state->readPagePtr; - int reqLen = state->readLen; + int reqLen = state->reqLen; char *cur_page = state->readBuf; XLogRecPtr flushptr; int count; @@ -826,7 +826,7 @@ logical_read_xlog_page(XLogReaderState *state) /* fail if not (implies we are going to shut down) */ if (flushptr < targetPagePtr + reqLen) { - state->readLen = -1; + XLogReaderNotifySize(state, -1); return false; } @@ -856,7 +856,7 @@ logical_read_xlog_page(XLogReaderState *state) XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize); CheckXLogRemoved(segno, state->seg.ws_tli); - state->readLen = count; + XLogReaderNotifySize(state, count); return true; } diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 712c85281c..707493dddf 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -254,6 +254,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, const char *datadir, XLogSegNo targetSegNo; int r; + Assert(xlogreader->reqLen <= XLOG_BLCKSZ); + XLByteToSeg(targetPagePtr, targetSegNo, WalSegSz); XLogSegNoOffsetToRecPtr(targetSegNo + 1, 0, WalSegSz, targetSegEnd); targetPageOff = XLogSegmentOffset(targetPagePtr, WalSegSz); @@ -304,7 +306,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, const char *datadir, if (restoreCommand == NULL) { pg_log_error("could not open file \"%s\": %m", xlogfpath); - xlogreader->readLen = -1; + XLogReaderNotifySize(xlogreader, -1); return false; } @@ -319,7 +321,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, const char *datadir, if (xlogreadfd < 0) { - xlogreader->readLen = -1; + XLogReaderNotifySize(xlogreader, -1); return false; } else @@ -337,7 +339,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, const char *datadir, if (lseek(xlogreadfd, (off_t) targetPageOff, SEEK_SET) < 0) { pg_log_error("could not seek in file \"%s\": %m", xlogfpath); - xlogreader->readLen = -1; + XLogReaderNotifySize(xlogreader, -1); return false; } @@ -351,14 +353,14 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, const char *datadir, pg_log_error("could not read file \"%s\": read %d of %zu", xlogfpath, r, (Size) XLOG_BLCKSZ); - xlogreader->readLen = -1; + XLogReaderNotifySize(xlogreader, -1); return false; } Assert(targetSegNo == xlogreadsegno); xlogreader->seg.ws_tli = targetHistory[*tliIndex].tli; - xlogreader->readLen = XLOG_BLCKSZ; + XLogReaderNotifySize(xlogreader, XLOG_BLCKSZ); return true; } diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index ab2d079bdb..82af398d20 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -332,7 +332,7 @@ WALDumpReadPage(XLogReaderState *state, TimeLineID timeline, XLogRecPtr startptr, XLogRecPtr endptr) { XLogRecPtr targetPagePtr = state->readPagePtr; - int reqLen = state->readLen; + int reqLen = state->reqLen; char *readBuff = state->readBuf; int count = XLOG_BLCKSZ; WALReadError errinfo; @@ -347,7 +347,7 @@ WALDumpReadPage(XLogReaderState *state, TimeLineID timeline, else { /* Notify xlogreader that we didn't read at all */ - state->readLen = -1; + XLogReaderNotifySize(state, -1); return false; } } @@ -377,7 +377,7 @@ WALDumpReadPage(XLogReaderState *state, TimeLineID timeline, } /* Notify xlogreader of how many bytes we have read */ - state->readLen = count; + XLogReaderNotifySize(state, count); return true; } diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index d8cb488820..836ca7fce8 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -149,19 +149,21 @@ struct XLogReaderState /* ---------------------------------------- * Communication with page reader - * readBuf is XLOG_BLCKSZ bytes, valid up to at least readLen bytes. + * readBuf is XLOG_BLCKSZ bytes, valid up to at least reqLen bytes. * ---------------------------------------- */ - /* variables to communicate with page reader */ + /* variables to inform to the callers from xlogreader */ XLogRecPtr readPagePtr; /* page pointer to read */ - int32 readLen; /* bytes requested to reader, or actual bytes - * read by reader, which must be larger than - * the request, or -1 on error */ + int32 reqLen; /* bytes requested to the caller */ char *readBuf; /* buffer to store data */ bool page_verified; /* is the page header on the buffer verified? */ - bool record_verified; /* is the current record header verified? */ - + bool record_verified;/* is the current record header verified? */ + /* variables to respond from the callers to xlogreader */ + int32 readLen; /* actual bytes read by reader, which must be + * larger than the request, or -1 on error. + * Use XLogReaderNotifyLength() to set a + * value. */ /* ---------------------------------------- * Decoded representation of current record @@ -248,6 +250,13 @@ struct XLogFindNextRecordState XLogRecPtr currRecPtr; }; +/* setter functions of XLogReaderState used by other modules */ +static inline void +XLogReaderNotifySize(XLogReaderState *state, int32 len) +{ + state->readLen = len; +} + /* Get a new XLogReader */ extern XLogReaderState *XLogReaderAllocate(int wal_segment_size, const char *waldir, -- 2.30.1