From 01bab5f7526af5f06b0c1ea4b96cdf547149c3eb Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Thu, 8 Apr 2021 19:59:03 +1200 Subject: [PATCH v20 2/5] fixup --- src/backend/access/transam/xlog.c | 9 ++-- src/backend/access/transam/xlogreader.c | 59 +++++++++++-------------- src/backend/access/transam/xlogutils.c | 4 +- src/backend/replication/walsender.c | 4 +- src/bin/pg_rewind/parsexlog.c | 10 ++--- src/bin/pg_waldump/pg_waldump.c | 7 ++- src/include/access/xlogreader.h | 20 ++++----- 7 files changed, 54 insertions(+), 59 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index d3d6fb4643..7faac01bf2 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -4359,7 +4359,8 @@ CleanupBackupHistory(void) * record is available. */ static XLogRecord * -ReadRecord(XLogReaderState *xlogreader, int emode, bool fetching_ckpt) +ReadRecord(XLogReaderState *xlogreader, int emode, + bool fetching_ckpt) { XLogRecord *record; bool randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr); @@ -12153,7 +12154,7 @@ retry: close(readFile); readFile = -1; readSource = XLOG_FROM_ANY; - XLogReaderNotifySize(state, -1); + XLogReaderSetInputData(state, -1); return false; } } @@ -12246,7 +12247,7 @@ retry: } Assert(state->readPagePtr == targetPagePtr); - XLogReaderNotifySize(state, readLen); + XLogReaderSetInputData(state, readLen); return true; next_record_is_invalid: @@ -12261,7 +12262,7 @@ next_record_is_invalid: if (StandbyMode) goto retry; - XLogReaderNotifySize(state, -1); + XLogReaderSetInputData(state, -1); return false; } diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index f34f053230..9b08ef5d5f 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -260,48 +260,45 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) * When a record is successfully read, returns XLREAD_SUCCESS with result * record being stored in *record. Otherwise *record is NULL. * - * Returns XLREAD_NEED_DATA if more data is needed to finish reading the + * Returns XLREAD_NEED_DATA if more data is needed to finish decoding the * current record. In that case, state->readPagePtr and state->reqLen inform - * the desired position and minimum length of data needed. The caller shall + * the desired position and minimum length of data needed. The caller shall * read in the requested data and set state->readBuf to point to a buffer - * containing it. The caller must also set state->seg->ws_tli and + * containing it. The caller must also set state->seg->ws_tli and * state->readLen to indicate the timeline that it was read from, and the * length of data that is now available (which must be >= given reqLen), * respectively. * - * If invalid data is encountered, returns XLREAD_FAIL with *record being set to - * NULL. *errormsg is set to a string with details of the failure. - * The returned pointer (or *errormsg) points to an internal buffer that's - * valid until the next call to XLogReadRecord. + * If invalid data is encountered, returns XLREAD_FAIL and sets *record to + * NULL. *errormsg is set to a string with details of the failure. The + * returned pointer (or *errormsg) points to an internal buffer that's valid + * until the next call to XLogReadRecord. * * - * This function runs a state machine consists of the following states. + * This function runs a state machine consisting of the following states. * - * XLREAD_NEXT_RECORD : - * The initial state, if called with valid RecPtr, try to read a record at - * that position. If invalid RecPtr is given try to read a record just after - * the last one previously read. - * This state ens after setting ReadRecPtr. Then goes to XLREAD_TOT_LEN. + * XLREAD_NEXT_RECORD: + * The initial state. If called with a valid XLogRecPtr, try to read a + * record at that position. If invalid RecPtr is given try to read a record + * just after the last one read. The next state is XLREAD_TOT_LEN. * * XLREAD_TOT_LEN: - * Examining record header. Ends after reading record total - * length. recordRemainLen and recordGotLen are initialized. + * Examining record header. Ends after reading record length. + * recordRemainLen and recordGotLen are initialized. The next state is + * XLREAD_FIRST_FRAGMENT. * * XLREAD_FIRST_FRAGMENT: - * Reading the first fragment. Ends with finishing reading a single - * record. Goes to XLREAD_NEXT_RECORD if that's all or - * XLREAD_CONTINUATION if we have continuation. + * Reading the first fragment. Goes to XLREAD_NEXT_RECORD if that's all or + * XLREAD_CONTINUATION if we need more data. * XLREAD_CONTINUATION: - * Reading continuation of record. Ends with finishing the whole record then - * goes to XLREAD_NEXT_RECORD. During this state, recordRemainLen indicates - * how much is left and readRecordBuf holds the partially assert - * record.recordContRecPtr points to the beginning of the next page where to - * continue. + * Reading continuation of record. If the whole record is now decoded, goes + * to XLREAD_NEXT_RECORD. During this state, recordRemainLen indicates how + * much is left. * - * If wrong data found in any state, the state machine stays at the current - * state. This behavior allows to continue reading a reacord switching among - * different souces, while streaming replication. + * If bad data found in any state, the state machine stays at the current + * state. This behavior allows us to continue reading a reacord switching + * among different souces, during streaming replication. */ XLogReadRecordResult XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) @@ -559,9 +556,9 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) case XLREAD_CONTINUATION: { - XLogPageHeader pageHeader; + XLogPageHeader pageHeader = NULL; uint32 pageHeaderSize; - XLogRecPtr targetPagePtr; + XLogRecPtr targetPagePtr = InvalidXLogRecPtr; /* * we enter this state only if we haven't read the whole @@ -572,7 +569,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) while (state->recordRemainLen > 0) { char *contdata; - uint32 request_len; + uint32 request_len PG_USED_FOR_ASSERTS_ONLY; uint32 record_len; /* Wait for the next page to become available */ @@ -803,8 +800,6 @@ XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen, addLen = pageHeaderSize; else addLen = XLOG_BLCKSZ - reqLen; - - Assert(addLen >= 0); } /* Return if we already have it. */ @@ -1245,7 +1240,7 @@ err: #endif /* FRONTEND */ /* - * Helper function to ease writing of page_read callback. + * Helper function to ease writing of routines that read raw WAL data. * If this function is used, caller must supply a segment_open callback and * segment_close callback as that is used here. * diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 61361192e7..08b9bd8b73 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -928,7 +928,7 @@ read_local_xlog_page(XLogReaderState *state) else if (targetPagePtr + reqLen > read_upto) { /* not enough data there */ - XLogReaderNotifySize(state, -1); + XLogReaderSetInputData(state, -1); return false; } else @@ -948,7 +948,7 @@ read_local_xlog_page(XLogReaderState *state) /* number of valid bytes in the buffer */ state->readPagePtr = targetPagePtr; - XLogReaderNotifySize(state, count); + XLogReaderSetInputData(state, count); return true; } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 31522c8cc2..52fe9aba66 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -826,7 +826,7 @@ logical_read_xlog_page(XLogReaderState *state) /* fail if not (implies we are going to shut down) */ if (flushptr < targetPagePtr + reqLen) { - XLogReaderNotifySize(state, -1); + XLogReaderSetInputData(state, -1); return false; } @@ -856,7 +856,7 @@ logical_read_xlog_page(XLogReaderState *state) XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize); CheckXLogRemoved(segno, state->seg.ws_tli); - XLogReaderNotifySize(state, count); + XLogReaderSetInputData(state, count); return true; } diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 707493dddf..79f71c0477 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -306,7 +306,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, const char *datadir, if (restoreCommand == NULL) { pg_log_error("could not open file \"%s\": %m", xlogfpath); - XLogReaderNotifySize(xlogreader, -1); + XLogReaderSetInputData(xlogreader, -1); return false; } @@ -321,7 +321,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, const char *datadir, if (xlogreadfd < 0) { - XLogReaderNotifySize(xlogreader, -1); + XLogReaderSetInputData(xlogreader, -1); return false; } else @@ -339,7 +339,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, const char *datadir, if (lseek(xlogreadfd, (off_t) targetPageOff, SEEK_SET) < 0) { pg_log_error("could not seek in file \"%s\": %m", xlogfpath); - XLogReaderNotifySize(xlogreader, -1); + XLogReaderSetInputData(xlogreader, -1); return false; } @@ -353,14 +353,14 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, const char *datadir, pg_log_error("could not read file \"%s\": read %d of %zu", xlogfpath, r, (Size) XLOG_BLCKSZ); - XLogReaderNotifySize(xlogreader, -1); + XLogReaderSetInputData(xlogreader, -1); return false; } Assert(targetSegNo == xlogreadsegno); xlogreader->seg.ws_tli = targetHistory[*tliIndex].tli; - XLogReaderNotifySize(xlogreader, XLOG_BLCKSZ); + XLogReaderSetInputData(xlogreader, XLOG_BLCKSZ); return true; } diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index 82af398d20..5db389aa2d 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -347,7 +347,7 @@ WALDumpReadPage(XLogReaderState *state, TimeLineID timeline, else { /* Notify xlogreader that we didn't read at all */ - XLogReaderNotifySize(state, -1); + XLogReaderSetInputData(state, -1); return false; } } @@ -377,7 +377,7 @@ WALDumpReadPage(XLogReaderState *state, TimeLineID timeline, } /* Notify xlogreader of how many bytes we have read */ - XLogReaderNotifySize(state, count); + XLogReaderSetInputData(state, count); return true; } @@ -808,6 +808,9 @@ main(int argc, char **argv) } } + memset(&config, 0, sizeof(XLogDumpConfig)); + memset(&stats, 0, sizeof(XLogDumpStats)); + timeline = 1; startptr = InvalidXLogRecPtr; endptr = InvalidXLogRecPtr; diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 836ca7fce8..d27c0cd281 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -150,20 +150,19 @@ struct XLogReaderState /* ---------------------------------------- * Communication with page reader * readBuf is XLOG_BLCKSZ bytes, valid up to at least reqLen bytes. - * ---------------------------------------- + * ---------------------------------------- */ - /* variables to inform to the callers from xlogreader */ + /* variables the clients of xlogreader can examine */ XLogRecPtr readPagePtr; /* page pointer to read */ int32 reqLen; /* bytes requested to the caller */ char *readBuf; /* buffer to store data */ bool page_verified; /* is the page header on the buffer verified? */ bool record_verified;/* is the current record header verified? */ - /* variables to respond from the callers to xlogreader */ - int32 readLen; /* actual bytes read by reader, which must be - * larger than the request, or -1 on error. - * Use XLogReaderNotifyLength() to set a - * value. */ + /* variables set by the client of xlogreader */ + int32 readLen; /* actual bytes copied into readBuf by client, + * which should be >= reqLen. Client should + * use XLogReaderSetInputData() to set. */ /* ---------------------------------------- * Decoded representation of current record @@ -250,9 +249,9 @@ struct XLogFindNextRecordState XLogRecPtr currRecPtr; }; -/* setter functions of XLogReaderState used by other modules */ +/* Report that data is available for decoding. */ static inline void -XLogReaderNotifySize(XLogReaderState *state, int32 len) +XLogReaderSetInputData(XLogReaderState *state, int32 len) { state->readLen = len; } @@ -268,9 +267,6 @@ extern void XLogReaderFree(XLogReaderState *state); /* Position the XLogReader to given record */ extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr); #ifdef FRONTEND -/* Function type definition for the read_page callback */ -typedef bool (*XLogFindNextRecordCB) (XLogReaderState *xlogreader, - void *private); extern XLogFindNextRecordState *InitXLogFindNextRecord(XLogReaderState *reader_state, XLogRecPtr start_ptr); extern bool XLogFindNextRecord(XLogFindNextRecordState *state); #endif /* FRONTEND */ -- 2.30.1