From 17f441df6e04736aad844ff2c1ec6b9cdef8d6b3 Mon Sep 17 00:00:00 2001 From: Bertrand Drouvot Date: Wed, 29 Oct 2025 11:45:52 +0000 Subject: [PATCH v4 3/4] Replace InvalidXLogRecPtr comparisons with XLogRecPtrIsValid() This commit ensures the XLogRecPtrIsValid() macro is used instead of direct InvalidXLogRecPtr equality comparisons. --- src/backend/access/heap/rewriteheap.c | 4 +- src/backend/access/heap/vacuumlazy.c | 2 +- src/backend/access/transam/twophase.c | 2 +- src/backend/access/transam/xlog.c | 18 ++++----- src/backend/access/transam/xloginsert.c | 4 +- src/backend/access/transam/xlogreader.c | 2 +- src/backend/access/transam/xlogrecovery.c | 8 ++-- src/backend/access/transam/xlogutils.c | 8 ++-- src/backend/catalog/pg_subscription.c | 4 +- src/backend/replication/logical/logical.c | 30 +++++++-------- .../replication/logical/logicalfuncs.c | 4 +- src/backend/replication/logical/origin.c | 18 ++++----- src/backend/replication/logical/proto.c | 18 ++++----- .../replication/logical/reorderbuffer.c | 38 +++++++++---------- src/backend/replication/logical/snapbuild.c | 14 +++---- src/backend/replication/slot.c | 18 ++++----- src/backend/replication/slotfuncs.c | 6 +-- src/backend/replication/walsender.c | 6 +-- src/bin/pg_basebackup/pg_receivewal.c | 6 +-- src/bin/pg_basebackup/pg_recvlogical.c | 10 ++--- src/bin/pg_waldump/pg_waldump.c | 4 +- src/include/access/xlogdefs.h | 2 +- 22 files changed, 113 insertions(+), 113 deletions(-) 19.4% src/backend/access/transam/ 54.8% src/backend/replication/logical/ 12.3% src/backend/replication/ 3.6% src/backend/ 7.6% src/bin/pg_basebackup/ diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c index 8061e92f044..66ab48f0fe0 100644 --- a/src/backend/access/heap/rewriteheap.c +++ b/src/backend/access/heap/rewriteheap.c @@ -1169,7 +1169,7 @@ CheckPointLogicalRewriteHeap(void) cutoff = ReplicationSlotsComputeLogicalRestartLSN(); /* don't start earlier than the restart lsn */ - if (cutoff != InvalidXLogRecPtr && redo < cutoff) + if (XLogRecPtrIsValid(cutoff) && redo < cutoff) cutoff = redo; mappings_dir = AllocateDir(PG_LOGICAL_MAPPINGS_DIR); @@ -1204,7 +1204,7 @@ CheckPointLogicalRewriteHeap(void) lsn = ((uint64) hi) << 32 | lo; - if (lsn < cutoff || cutoff == InvalidXLogRecPtr) + if (lsn < cutoff || !XLogRecPtrIsValid(cutoff)) { elog(DEBUG1, "removing logical rewrite file \"%s\"", path); if (unlink(path) < 0) diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index d2b031fdd06..27fba1e3dd4 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -1900,7 +1900,7 @@ lazy_scan_new_or_empty(LVRelState *vacrel, Buffer buf, BlockNumber blkno, * WAL-logged, and if not, do that now. */ if (RelationNeedsWAL(vacrel->rel) && - PageGetLSN(page) == InvalidXLogRecPtr) + !XLogRecPtrIsValid(PageGetLSN(page))) log_newpage_buffer(buf, true); PageSetAllVisible(page); diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 8a92b292e56..89d0bfa7760 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -2198,7 +2198,7 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid, Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); if (!fromdisk) - Assert(prepare_start_lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(prepare_start_lsn)); /* Already processed? */ if (TransactionIdDidCommit(XidFromFullTransactionId(fxid)) || diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 4b827c17cfa..b2f53022805 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -848,7 +848,7 @@ XLogInsertRecord(XLogRecData *rdata, if (doPageWrites && (!prevDoPageWrites || - (fpw_lsn != InvalidXLogRecPtr && fpw_lsn <= RedoRecPtr))) + (XLogRecPtrIsValid(fpw_lsn) && fpw_lsn <= RedoRecPtr))) { /* * Oops, some buffer now needs to be backed up that the caller @@ -882,7 +882,7 @@ XLogInsertRecord(XLogRecData *rdata, * Those checks are only needed for records that can contain buffer * references, and an XLOG_SWITCH record never does. */ - Assert(fpw_lsn == InvalidXLogRecPtr); + Assert(!XLogRecPtrIsValid(fpw_lsn)); WALInsertLockAcquireExclusive(); inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev); } @@ -897,7 +897,7 @@ XLogInsertRecord(XLogRecData *rdata, * not check RedoRecPtr before inserting the record; we just need to * update it afterwards. */ - Assert(fpw_lsn == InvalidXLogRecPtr); + Assert(!XLogRecPtrIsValid(fpw_lsn)); WALInsertLockAcquireExclusive(); ReserveXLogInsertLocation(rechdr->xl_tot_len, &StartPos, &EndPos, &rechdr->xl_prev); @@ -1602,7 +1602,7 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto) */ } while (insertingat < upto); - if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto) + if (XLogRecPtrIsValid(insertingat) && insertingat < finishedUpto) finishedUpto = insertingat; } @@ -7356,7 +7356,7 @@ CreateCheckPoint(int flags) * Update the average distance between checkpoints if the prior checkpoint * exists. */ - if (PriorRedoPtr != InvalidXLogRecPtr) + if (XLogRecPtrIsValid(PriorRedoPtr)) UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr); INJECTION_POINT("checkpoint-before-old-wal-removal", NULL); @@ -7804,7 +7804,7 @@ CreateRestartPoint(int flags) * Update the average distance between checkpoints/restartpoints if the * prior checkpoint exists. */ - if (PriorRedoPtr != InvalidXLogRecPtr) + if (XLogRecPtrIsValid(PriorRedoPtr)) UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr); /* @@ -8011,7 +8011,7 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo) /* Calculate how many segments are kept by slots. */ keep = XLogGetReplicationSlotMinimumLSN(); - if (keep != InvalidXLogRecPtr && keep < recptr) + if (XLogRecPtrIsValid(keep) && keep < recptr) { XLByteToSeg(keep, segno, wal_segment_size); @@ -8038,7 +8038,7 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo) * summarized. */ keep = GetOldestUnsummarizedLSN(NULL, NULL); - if (keep != InvalidXLogRecPtr) + if (XLogRecPtrIsValid(keep)) { XLogSegNo unsummarized_segno; @@ -8596,7 +8596,7 @@ xlog_redo(XLogReaderState *record) LocalMinRecoveryPoint = ControlFile->minRecoveryPoint; LocalMinRecoveryPointTLI = ControlFile->minRecoveryPointTLI; } - if (LocalMinRecoveryPoint != InvalidXLogRecPtr && LocalMinRecoveryPoint < lsn) + if (XLogRecPtrIsValid(LocalMinRecoveryPoint) && LocalMinRecoveryPoint < lsn) { TimeLineID replayTLI; diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index 58cb4b1b00c..a56d5a55282 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -528,7 +528,7 @@ XLogInsert(RmgrId rmid, uint8 info) EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi, fpi_bytes, topxid_included); - } while (EndPos == InvalidXLogRecPtr); + } while (!XLogRecPtrIsValid(EndPos)); XLogResetInsertion(); @@ -639,7 +639,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, needs_backup = (page_lsn <= RedoRecPtr); if (!needs_backup) { - if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn) + if (!XLogRecPtrIsValid(*fpw_lsn) || page_lsn < *fpw_lsn) *fpw_lsn = page_lsn; } } diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index aad16127e60..755f351143a 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -558,7 +558,7 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking) RecPtr = state->NextRecPtr; - if (state->DecodeRecPtr != InvalidXLogRecPtr) + if (XLogRecPtrIsValid(state->DecodeRecPtr)) { /* read the record after the one we just read */ diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 84df67b07d2..f1e90076576 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -757,9 +757,9 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr, * end-of-backup record), and we can enter archive recovery directly. */ if (ArchiveRecoveryRequested && - (ControlFile->minRecoveryPoint != InvalidXLogRecPtr || + (XLogRecPtrIsValid(ControlFile->minRecoveryPoint) || ControlFile->backupEndRequired || - ControlFile->backupEndPoint != InvalidXLogRecPtr || + XLogRecPtrIsValid(ControlFile->backupEndPoint) || ControlFile->state == DB_SHUTDOWNED)) { InArchiveRecovery = true; @@ -3151,7 +3151,7 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, /* Pass through parameters to XLogPageRead */ private->fetching_ckpt = fetching_ckpt; private->emode = emode; - private->randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr); + private->randAccess = (!XLogRecPtrIsValid(xlogreader->ReadRecPtr)); private->replayTLI = replayTLI; /* This is the first attempt to read this page. */ @@ -4336,7 +4336,7 @@ XLogFileReadAnyTLI(XLogSegNo segno, XLogSource source) * Skip scanning the timeline ID that the logfile segment to read * doesn't belong to */ - if (hent->begin != InvalidXLogRecPtr) + if (XLogRecPtrIsValid(hent->begin)) { XLogSegNo beginseg = 0; diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 38176d9688e..ce2a3e42146 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -710,7 +710,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, const XLogRecPtr lastReadPage = (state->seg.ws_segno * state->segcxt.ws_segsize + state->segoff); - Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0); + Assert(XLogRecPtrIsValid(wantPage) && wantPage % XLOG_BLCKSZ == 0); Assert(wantLength <= XLOG_BLCKSZ); Assert(state->readLen == 0 || state->readLen <= XLOG_BLCKSZ); Assert(currTLI != 0); @@ -741,7 +741,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, */ if (state->currTLI == currTLI && wantPage >= lastReadPage) { - Assert(state->currTLIValidUntil == InvalidXLogRecPtr); + Assert(!XLogRecPtrIsValid(state->currTLIValidUntil)); return; } @@ -750,7 +750,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, * timeline and the timeline we're reading from is valid until the end of * the current segment we can just keep reading. */ - if (state->currTLIValidUntil != InvalidXLogRecPtr && + if (XLogRecPtrIsValid(state->currTLIValidUntil) && state->currTLI != currTLI && state->currTLI != 0 && ((wantPage + wantLength) / state->segcxt.ws_segsize) < @@ -790,7 +790,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, state->currTLIValidUntil = tliSwitchPoint(state->currTLI, timelineHistory, &state->nextTLI); - Assert(state->currTLIValidUntil == InvalidXLogRecPtr || + Assert(!XLogRecPtrIsValid(state->currTLIValidUntil) || wantPage + wantLength < state->currTLIValidUntil); list_free_deep(timelineHistory); diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 15b233a37d8..d836ea80b4d 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -293,7 +293,7 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state, values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid); values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid); values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state); - if (sublsn != InvalidXLogRecPtr) + if (XLogRecPtrIsValid(sublsn)) values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn); else nulls[Anum_pg_subscription_rel_srsublsn - 1] = true; @@ -366,7 +366,7 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state, values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state); replaces[Anum_pg_subscription_rel_srsublsn - 1] = true; - if (sublsn != InvalidXLogRecPtr) + if (XLogRecPtrIsValid(sublsn)) values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn); else nulls[Anum_pg_subscription_rel_srsublsn - 1] = true; diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 79717b52941..866f92cf799 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -546,9 +546,9 @@ CreateDecodingContext(XLogRecPtr start_lsn, /* slot must be valid to allow decoding */ Assert(slot->data.invalidated == RS_INVAL_NONE); - Assert(slot->data.restart_lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(slot->data.restart_lsn)); - if (start_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(start_lsn)) { /* continue from last position */ start_lsn = slot->data.confirmed_flush; @@ -757,7 +757,7 @@ output_plugin_error_callback(void *arg) LogicalErrorCallbackState *state = (LogicalErrorCallbackState *) arg; /* not all callbacks have an associated LSN */ - if (state->report_location != InvalidXLogRecPtr) + if (XLogRecPtrIsValid(state->report_location)) errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%08X", NameStr(state->ctx->slot->data.name), NameStr(state->ctx->slot->data.plugin), @@ -1711,7 +1711,7 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin) * Only increase if the previous values have been applied, otherwise we * might never end up updating if the receiver acks too slowly. */ - else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr) + else if (!XLogRecPtrIsValid(slot->candidate_xmin_lsn)) { slot->candidate_catalog_xmin = xmin; slot->candidate_xmin_lsn = current_lsn; @@ -1749,8 +1749,8 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart slot = MyReplicationSlot; Assert(slot != NULL); - Assert(restart_lsn != InvalidXLogRecPtr); - Assert(current_lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(restart_lsn)); + Assert(XLogRecPtrIsValid(current_lsn)); SpinLockAcquire(&slot->mutex); @@ -1779,7 +1779,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart * might never end up updating if the receiver acks too slowly. A missed * value here will just cause some extra effort after reconnecting. */ - else if (slot->candidate_restart_valid == InvalidXLogRecPtr) + else if (!XLogRecPtrIsValid(slot->candidate_restart_valid)) { slot->candidate_restart_valid = current_lsn; slot->candidate_restart_lsn = restart_lsn; @@ -1819,11 +1819,11 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart void LogicalConfirmReceivedLocation(XLogRecPtr lsn) { - Assert(lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(lsn)); /* Do an unlocked check for candidate_lsn first. */ - if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr || - MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr) + if (XLogRecPtrIsValid(MyReplicationSlot->candidate_xmin_lsn) || + XLogRecPtrIsValid(MyReplicationSlot->candidate_restart_valid)) { bool updated_xmin = false; bool updated_restart = false; @@ -1849,7 +1849,7 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) MyReplicationSlot->data.confirmed_flush = lsn; /* if we're past the location required for bumping xmin, do so */ - if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr && + if (XLogRecPtrIsValid(MyReplicationSlot->candidate_xmin_lsn) && MyReplicationSlot->candidate_xmin_lsn <= lsn) { /* @@ -1871,10 +1871,10 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) } } - if (MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr && + if (XLogRecPtrIsValid(MyReplicationSlot->candidate_restart_valid) && MyReplicationSlot->candidate_restart_valid <= lsn) { - Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(MyReplicationSlot->candidate_restart_lsn)); MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn; MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr; @@ -2089,7 +2089,7 @@ LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto, ResourceOwner old_resowner PG_USED_FOR_ASSERTS_ONLY = CurrentResourceOwner; XLogRecPtr retlsn; - Assert(moveto != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(moveto)); if (found_consistent_snapshot) *found_consistent_snapshot = false; @@ -2163,7 +2163,7 @@ LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto, if (found_consistent_snapshot && DecodingContextReady(ctx)) *found_consistent_snapshot = true; - if (ctx->reader->EndRecPtr != InvalidXLogRecPtr) + if (XLogRecPtrIsValid(ctx->reader->EndRecPtr)) { LogicalConfirmReceivedLocation(moveto); diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index d78e486bca6..49b2aef3c74 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -276,7 +276,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin } /* check limits */ - if (upto_lsn != InvalidXLogRecPtr && + if (XLogRecPtrIsValid(upto_lsn) && upto_lsn <= ctx->reader->EndRecPtr) break; if (upto_nchanges != 0 && @@ -289,7 +289,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin * Next time, start where we left off. (Hunting things, the family * business..) */ - if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm) + if (XLogRecPtrIsValid(ctx->reader->EndRecPtr) && confirm) { LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr); diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index bcd5d9aad62..4632aa8115d 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -984,8 +984,8 @@ replorigin_advance(RepOriginId node, /* initialize new slot */ LWLockAcquire(&free_state->lock, LW_EXCLUSIVE); replication_state = free_state; - Assert(replication_state->remote_lsn == InvalidXLogRecPtr); - Assert(replication_state->local_lsn == InvalidXLogRecPtr); + Assert(!XLogRecPtrIsValid(replication_state->remote_lsn)); + Assert(!XLogRecPtrIsValid(replication_state->local_lsn)); replication_state->roident = node; } @@ -1020,7 +1020,7 @@ replorigin_advance(RepOriginId node, */ if (go_backward || replication_state->remote_lsn < remote_commit) replication_state->remote_lsn = remote_commit; - if (local_commit != InvalidXLogRecPtr && + if (XLogRecPtrIsValid(local_commit) && (go_backward || replication_state->local_lsn < local_commit)) replication_state->local_lsn = local_commit; LWLockRelease(&replication_state->lock); @@ -1064,7 +1064,7 @@ replorigin_get_progress(RepOriginId node, bool flush) LWLockRelease(ReplicationOriginLock); - if (flush && local_lsn != InvalidXLogRecPtr) + if (flush && XLogRecPtrIsValid(local_lsn)) XLogFlush(local_lsn); return remote_lsn; @@ -1197,8 +1197,8 @@ replorigin_session_setup(RepOriginId node, int acquired_by) /* initialize new slot */ session_replication_state = &replication_states[free_slot]; - Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr); - Assert(session_replication_state->local_lsn == InvalidXLogRecPtr); + Assert(!XLogRecPtrIsValid(session_replication_state->remote_lsn)); + Assert(!XLogRecPtrIsValid(session_replication_state->local_lsn)); session_replication_state->roident = node; } @@ -1282,7 +1282,7 @@ replorigin_session_get_progress(bool flush) local_lsn = session_replication_state->local_lsn; LWLockRelease(&session_replication_state->lock); - if (flush && local_lsn != InvalidXLogRecPtr) + if (flush && XLogRecPtrIsValid(local_lsn)) XLogFlush(local_lsn); return remote_lsn; @@ -1454,7 +1454,7 @@ pg_replication_origin_session_progress(PG_FUNCTION_ARGS) remote_lsn = replorigin_session_get_progress(flush); - if (remote_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(remote_lsn)) PG_RETURN_NULL(); PG_RETURN_LSN(remote_lsn); @@ -1543,7 +1543,7 @@ pg_replication_origin_progress(PG_FUNCTION_ARGS) remote_lsn = replorigin_get_progress(roident, flush); - if (remote_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(remote_lsn)) PG_RETURN_NULL(); PG_RETURN_LSN(remote_lsn); diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index ed62888764c..f0a913892b9 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -64,7 +64,7 @@ logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data) { /* read fields */ begin_data->final_lsn = pq_getmsgint64(in); - if (begin_data->final_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(begin_data->final_lsn)) elog(ERROR, "final_lsn not set in begin message"); begin_data->committime = pq_getmsgint64(in); begin_data->xid = pq_getmsgint(in, 4); @@ -135,10 +135,10 @@ logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_da { /* read fields */ begin_data->prepare_lsn = pq_getmsgint64(in); - if (begin_data->prepare_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(begin_data->prepare_lsn)) elog(ERROR, "prepare_lsn not set in begin prepare message"); begin_data->end_lsn = pq_getmsgint64(in); - if (begin_data->end_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(begin_data->end_lsn)) elog(ERROR, "end_lsn not set in begin prepare message"); begin_data->prepare_time = pq_getmsgint64(in); begin_data->xid = pq_getmsgint(in, 4); @@ -207,10 +207,10 @@ logicalrep_read_prepare_common(StringInfo in, char *msgtype, /* read fields */ prepare_data->prepare_lsn = pq_getmsgint64(in); - if (prepare_data->prepare_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(prepare_data->prepare_lsn)) elog(ERROR, "prepare_lsn is not set in %s message", msgtype); prepare_data->end_lsn = pq_getmsgint64(in); - if (prepare_data->end_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(prepare_data->end_lsn)) elog(ERROR, "end_lsn is not set in %s message", msgtype); prepare_data->prepare_time = pq_getmsgint64(in); prepare_data->xid = pq_getmsgint(in, 4); @@ -274,10 +274,10 @@ logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData * /* read fields */ prepare_data->commit_lsn = pq_getmsgint64(in); - if (prepare_data->commit_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(prepare_data->commit_lsn)) elog(ERROR, "commit_lsn is not set in commit prepared message"); prepare_data->end_lsn = pq_getmsgint64(in); - if (prepare_data->end_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(prepare_data->end_lsn)) elog(ERROR, "end_lsn is not set in commit prepared message"); prepare_data->commit_time = pq_getmsgint64(in); prepare_data->xid = pq_getmsgint(in, 4); @@ -333,10 +333,10 @@ logicalrep_read_rollback_prepared(StringInfo in, /* read fields */ rollback_data->prepare_end_lsn = pq_getmsgint64(in); - if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(rollback_data->prepare_end_lsn)) elog(ERROR, "prepare_end_lsn is not set in rollback prepared message"); rollback_data->rollback_end_lsn = pq_getmsgint64(in); - if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(rollback_data->rollback_end_lsn)) elog(ERROR, "rollback_end_lsn is not set in rollback prepared message"); rollback_data->prepare_time = pq_getmsgint64(in); rollback_data->rollback_time = pq_getmsgint64(in); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index b57aef9916d..eb6a84554b7 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -701,7 +701,7 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, { /* initialize the new entry, if creation was requested */ Assert(ent != NULL); - Assert(lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(lsn)); ent->txn = ReorderBufferAllocTXN(rb); ent->txn->xid = xid; @@ -849,7 +849,7 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, change->lsn = lsn; change->txn = txn; - Assert(InvalidXLogRecPtr != lsn); + Assert(XLogRecPtrIsValid(lsn)); dlist_push_tail(&txn->changes, &change->node); txn->nentries++; txn->nentries_mem++; @@ -966,14 +966,14 @@ AssertTXNLsnOrder(ReorderBuffer *rb) iter.cur); /* start LSN must be set */ - Assert(cur_txn->first_lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(cur_txn->first_lsn)); /* If there is an end LSN, it must be higher than start LSN */ - if (cur_txn->end_lsn != InvalidXLogRecPtr) + if (XLogRecPtrIsValid(cur_txn->end_lsn)) Assert(cur_txn->first_lsn <= cur_txn->end_lsn); /* Current initial LSN must be strictly higher than previous */ - if (prev_first_lsn != InvalidXLogRecPtr) + if (XLogRecPtrIsValid(prev_first_lsn)) Assert(prev_first_lsn < cur_txn->first_lsn); /* known-as-subtxn txns must not be listed */ @@ -990,10 +990,10 @@ AssertTXNLsnOrder(ReorderBuffer *rb) /* base snapshot (and its LSN) must be set */ Assert(cur_txn->base_snapshot != NULL); - Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(cur_txn->base_snapshot_lsn)); /* current LSN must be strictly higher than previous */ - if (prev_base_snap_lsn != InvalidXLogRecPtr) + if (XLogRecPtrIsValid(prev_base_snap_lsn)) Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn); /* known-as-subtxn txns must not be listed */ @@ -1022,11 +1022,11 @@ AssertChangeLsnOrder(ReorderBufferTXN *txn) cur_change = dlist_container(ReorderBufferChange, node, iter.cur); - Assert(txn->first_lsn != InvalidXLogRecPtr); - Assert(cur_change->lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(txn->first_lsn)); + Assert(XLogRecPtrIsValid(cur_change->lsn)); Assert(txn->first_lsn <= cur_change->lsn); - if (txn->end_lsn != InvalidXLogRecPtr) + if (XLogRecPtrIsValid(txn->end_lsn)) Assert(cur_change->lsn <= txn->end_lsn); Assert(prev_lsn <= cur_change->lsn); @@ -1053,7 +1053,7 @@ ReorderBufferGetOldestTXN(ReorderBuffer *rb) txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn); Assert(!rbtxn_is_known_subxact(txn)); - Assert(txn->first_lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(txn->first_lsn)); return txn; } @@ -2276,7 +2276,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * We can't call start stream callback before processing first * change. */ - if (prev_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(prev_lsn)) { if (streaming) { @@ -2291,7 +2291,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * subtransactions. The changes may have the same LSN due to * MULTI_INSERT xlog records. */ - Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn); + Assert(!XLogRecPtrIsValid(prev_lsn) || prev_lsn <= change->lsn); prev_lsn = change->lsn; @@ -2975,7 +2975,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, * have been updated in it by now. */ Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == RBTXN_IS_PREPARED); - Assert(txn->final_lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(txn->final_lsn)); txn->gid = pstrdup(gid); @@ -3041,7 +3041,7 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, */ Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE)); - Assert(txn->final_lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(txn->final_lsn)); /* * By this time the txn has the prepare record information and it is @@ -4552,8 +4552,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, dlist_mutable_iter cleanup_iter; File *fd = &file->vfd; - Assert(txn->first_lsn != InvalidXLogRecPtr); - Assert(txn->final_lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(txn->first_lsn)); + Assert(XLogRecPtrIsValid(txn->final_lsn)); /* free current entries, so we have memory for more */ dlist_foreach_modify(cleanup_iter, &txn->changes) @@ -4860,8 +4860,8 @@ ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn) XLogSegNo cur; XLogSegNo last; - Assert(txn->first_lsn != InvalidXLogRecPtr); - Assert(txn->final_lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(txn->first_lsn)); + Assert(XLogRecPtrIsValid(txn->final_lsn)); XLByteToSeg(txn->first_lsn, first, wal_segment_size); XLByteToSeg(txn->final_lsn, last, wal_segment_size); diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 98ddee20929..6e18baa33cb 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -1210,7 +1210,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact * oldest ongoing txn might have started when we didn't yet serialize * anything because we hadn't reached a consistent state yet. */ - if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr) + if (txn != NULL && XLogRecPtrIsValid(txn->restart_decoding_lsn)) LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn); /* @@ -1218,8 +1218,8 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact * we have one. */ else if (txn == NULL && - builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr && - builder->last_serialized_snapshot != InvalidXLogRecPtr) + XLogRecPtrIsValid(builder->reorder->current_restart_decoding_lsn) && + XLogRecPtrIsValid(builder->last_serialized_snapshot)) LogicalIncreaseRestartDecodingForSlot(lsn, builder->last_serialized_snapshot); } @@ -1293,7 +1293,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn */ if (running->oldestRunningXid == running->nextXid) { - if (builder->start_decoding_at == InvalidXLogRecPtr || + if (!XLogRecPtrIsValid(builder->start_decoding_at) || builder->start_decoding_at <= lsn) /* can decode everything after this */ builder->start_decoding_at = lsn + 1; @@ -1509,8 +1509,8 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) struct stat stat_buf; Size sz; - Assert(lsn != InvalidXLogRecPtr); - Assert(builder->last_serialized_snapshot == InvalidXLogRecPtr || + Assert(XLogRecPtrIsValid(lsn)); + Assert(!XLogRecPtrIsValid(builder->last_serialized_snapshot) || builder->last_serialized_snapshot <= lsn); /* @@ -2029,7 +2029,7 @@ CheckPointSnapBuild(void) lsn = ((uint64) hi) << 32 | lo; /* check whether we still need it */ - if (lsn < cutoff || cutoff == InvalidXLogRecPtr) + if (lsn < cutoff || !XLogRecPtrIsValid(cutoff)) { elog(DEBUG1, "removing snapbuild snapshot %s", path); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 0cb93dc90f2..1ec1e997b27 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1270,15 +1270,15 @@ ReplicationSlotsComputeRequiredLSN(void) */ if (persistency == RS_PERSISTENT) { - if (last_saved_restart_lsn != InvalidXLogRecPtr && + if (XLogRecPtrIsValid(last_saved_restart_lsn) && restart_lsn > last_saved_restart_lsn) { restart_lsn = last_saved_restart_lsn; } } - if (restart_lsn != InvalidXLogRecPtr && - (min_required == InvalidXLogRecPtr || + if (XLogRecPtrIsValid(restart_lsn) && + (!XLogRecPtrIsValid(min_required) || restart_lsn < min_required)) min_required = restart_lsn; } @@ -1350,17 +1350,17 @@ ReplicationSlotsComputeLogicalRestartLSN(void) */ if (persistency == RS_PERSISTENT) { - if (last_saved_restart_lsn != InvalidXLogRecPtr && + if (XLogRecPtrIsValid(last_saved_restart_lsn) && restart_lsn > last_saved_restart_lsn) { restart_lsn = last_saved_restart_lsn; } } - if (restart_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(restart_lsn)) continue; - if (result == InvalidXLogRecPtr || + if (!XLogRecPtrIsValid(result) || restart_lsn < result) result = restart_lsn; } @@ -1573,8 +1573,8 @@ ReplicationSlotReserveWal(void) ReplicationSlot *slot = MyReplicationSlot; Assert(slot != NULL); - Assert(slot->data.restart_lsn == InvalidXLogRecPtr); - Assert(slot->last_saved_restart_lsn == InvalidXLogRecPtr); + Assert(!XLogRecPtrIsValid(slot->data.restart_lsn)); + Assert(!XLogRecPtrIsValid(slot->last_saved_restart_lsn)); /* * The replication slot mechanism is used to prevent removal of required @@ -1754,7 +1754,7 @@ DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s, { XLogRecPtr restart_lsn = s->data.restart_lsn; - if (restart_lsn != InvalidXLogRecPtr && + if (XLogRecPtrIsValid(restart_lsn) && restart_lsn < oldestLSN) return RS_INVAL_WAL_REMOVED; } diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 5aecd5c6a50..0478fc9c977 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -308,12 +308,12 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) else nulls[i++] = true; - if (slot_contents.data.restart_lsn != InvalidXLogRecPtr) + if (XLogRecPtrIsValid(slot_contents.data.restart_lsn)) values[i++] = LSNGetDatum(slot_contents.data.restart_lsn); else nulls[i++] = true; - if (slot_contents.data.confirmed_flush != InvalidXLogRecPtr) + if (XLogRecPtrIsValid(slot_contents.data.confirmed_flush)) values[i++] = LSNGetDatum(slot_contents.data.confirmed_flush); else nulls[i++] = true; @@ -467,7 +467,7 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto) XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn; XLogRecPtr retlsn = startlsn; - Assert(moveto != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(moveto)); if (startlsn < moveto) { diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index b6dfb230d0d..4a678e60b74 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2397,7 +2397,7 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn) bool changed = false; ReplicationSlot *slot = MyReplicationSlot; - Assert(lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(lsn)); SpinLockAcquire(&slot->mutex); if (slot->data.restart_lsn != lsn) { @@ -2519,7 +2519,7 @@ ProcessStandbyReplyMessage(void) /* * Advance our local xmin horizon when the client confirmed a flush. */ - if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr) + if (MyReplicationSlot && XLogRecPtrIsValid(flushPtr)) { if (SlotIsLogical(MyReplicationSlot)) LogicalConfirmReceivedLocation(flushPtr); @@ -3536,7 +3536,7 @@ XLogSendLogical(void) * If first time through in this session, initialize flushPtr. Otherwise, * we only need to update flushPtr if EndRecPtr is past it. */ - if (flushPtr == InvalidXLogRecPtr || + if (!XLogRecPtrIsValid(flushPtr) || logical_decoding_ctx->reader->EndRecPtr >= flushPtr) { /* diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c index 3e6f4a7fc48..46e553dce4b 100644 --- a/src/bin/pg_basebackup/pg_receivewal.c +++ b/src/bin/pg_basebackup/pg_receivewal.c @@ -535,7 +535,7 @@ StreamLog(void) * Figure out where to start streaming. First scan the local directory. */ stream.startpos = FindStreamingStart(&stream.timeline); - if (stream.startpos == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(stream.startpos)) { /* * Try to get the starting point from the slot if any. This is @@ -556,14 +556,14 @@ StreamLog(void) * If it the starting point is still not known, use the current WAL * flush value as last resort. */ - if (stream.startpos == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(stream.startpos)) { stream.startpos = serverpos; stream.timeline = servertli; } } - Assert(stream.startpos != InvalidXLogRecPtr && + Assert(XLogRecPtrIsValid(stream.startpos) && stream.timeline != 0); /* diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c index 6739784a993..14ad1504678 100644 --- a/src/bin/pg_basebackup/pg_recvlogical.c +++ b/src/bin/pg_basebackup/pg_recvlogical.c @@ -482,7 +482,7 @@ StreamLogicalLog(void) } replyRequested = copybuf[pos]; - if (endpos != InvalidXLogRecPtr && walEnd >= endpos) + if (XLogRecPtrIsValid(endpos) && walEnd >= endpos) { /* * If there's nothing to read on the socket until a keepalive @@ -535,7 +535,7 @@ StreamLogicalLog(void) /* Extract WAL location for this block */ cur_record_lsn = fe_recvint64(©buf[1]); - if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos) + if (XLogRecPtrIsValid(endpos) && cur_record_lsn > endpos) { /* * We've read past our endpoint, so prepare to go away being @@ -583,7 +583,7 @@ StreamLogicalLog(void) goto error; } - if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos) + if (XLogRecPtrIsValid(endpos) && cur_record_lsn == endpos) { /* endpos was exactly the record we just processed, we're done */ if (!flushAndSendFeedback(conn, &now)) @@ -913,14 +913,14 @@ main(int argc, char **argv) exit(1); } - if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot)) + if (XLogRecPtrIsValid(startpos) && (do_create_slot || do_drop_slot)) { pg_log_error("cannot use --create-slot or --drop-slot together with --startpos"); pg_log_error_hint("Try \"%s --help\" for more information.", progname); exit(1); } - if (endpos != InvalidXLogRecPtr && !do_start_slot) + if (XLogRecPtrIsValid(endpos) && !do_start_slot) { pg_log_error("--endpos may only be specified with --start"); pg_log_error_hint("Try \"%s --help\" for more information.", progname); diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index 19cf5461eac..c6d6ba79e44 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -393,7 +393,7 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, int count = XLOG_BLCKSZ; WALReadError errinfo; - if (private->endptr != InvalidXLogRecPtr) + if (XLogRecPtrIsValid(private->endptr)) { if (targetPagePtr + XLOG_BLCKSZ <= private->endptr) count = XLOG_BLCKSZ; @@ -1213,7 +1213,7 @@ main(int argc, char **argv) /* first find a valid recptr to start from */ first_record = XLogFindNextRecord(xlogreader_state, private.startptr); - if (first_record == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(first_record)) pg_fatal("could not find a valid record after %X/%08X", LSN_FORMAT_ARGS(private.startptr)); diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h index 2589a21083b..06fcfeeb9d0 100644 --- a/src/include/access/xlogdefs.h +++ b/src/include/access/xlogdefs.h @@ -41,7 +41,7 @@ PG_DEPRECATED("use XLogRecPtrIsValid() instead") static inline bool XLogRecPtrIsInvalid(XLogRecPtr ptr) { - return ptr == InvalidXLogRecPtr; + return !XLogRecPtrIsValid(ptr); } /* -- 2.34.1