From 19612c969d68089d6ddca6f78910ccefc556f35a Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Tue, 3 Mar 2026 11:44:38 +0800 Subject: [PATCH v3] Advance restart_lsn when reaching consistency without waiting Currently, the replication slot's restart_lsn is not advanced when first time building a consistent snapshot, even when it's safe to do so. This can lead to unnecessary retention of WAL segments, though the impact is rare. This commit advances restart_lsn at the consistency point if either: a serialized snapshot from a previous decoding session is available, or there were no running transactions when reaching consistency In both cases, it's safe and efficient to restart decoding from this LSN, reducing WAL retention without affecting decoding capabilities. --- src/backend/replication/logical/snapbuild.c | 86 ++++++++++----------- 1 file changed, 41 insertions(+), 45 deletions(-) diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 7f79621b57e..252894dc090 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -167,7 +167,7 @@ static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, Transaction uint32 xinfo); /* xlog reading helper functions for SnapBuildProcessRunningXacts */ -static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running); +static void SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running); static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff); /* serialization functions */ @@ -869,31 +869,34 @@ SnapBuildPurgeOlderTxn(SnapBuild *builder) if (!TransactionIdIsNormal(builder->xmin)) return; - /* TODO: Neater algorithm than just copying and iterating? */ - workspace = - MemoryContextAlloc(builder->context, - builder->committed.xcnt * sizeof(TransactionId)); - - /* copy xids that still are interesting to workspace */ - for (off = 0; off < builder->committed.xcnt; off++) + if (builder->committed.xcnt > 0) { - if (NormalTransactionIdPrecedes(builder->committed.xip[off], - builder->xmin)) - ; /* remove */ - else - workspace[surviving_xids++] = builder->committed.xip[off]; - } + /* TODO: Neater algorithm than just copying and iterating? */ + workspace = + MemoryContextAlloc(builder->context, + builder->committed.xcnt * sizeof(TransactionId)); + + /* copy xids that still are interesting to workspace */ + for (off = 0; off < builder->committed.xcnt; off++) + { + if (NormalTransactionIdPrecedes(builder->committed.xip[off], + builder->xmin)) + ; /* remove */ + else + workspace[surviving_xids++] = builder->committed.xip[off]; + } - /* copy workspace back to persistent state */ - memcpy(builder->committed.xip, workspace, - surviving_xids * sizeof(TransactionId)); + /* copy workspace back to persistent state */ + memcpy(builder->committed.xip, workspace, + surviving_xids * sizeof(TransactionId)); - elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u", - (uint32) builder->committed.xcnt, (uint32) surviving_xids, - builder->xmin, builder->xmax); - builder->committed.xcnt = surviving_xids; + elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u", + (uint32) builder->committed.xcnt, (uint32) surviving_xids, + builder->xmin, builder->xmax); + builder->committed.xcnt = surviving_xids; - pfree(workspace); + pfree(workspace); + } /* * Purge xids in ->catchange as well. The purged array must also be sorted @@ -1143,11 +1146,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact * our snapshot so others or we, after a restart, can use it. */ if (builder->state < SNAPBUILD_CONSISTENT) - { - /* returns false if there's no point in performing cleanup just yet */ - if (!SnapBuildFindSnapshot(builder, lsn, running)) - return; - } + SnapBuildFindSnapshot(builder, lsn, running); else SnapBuildSerialize(builder, lsn); @@ -1197,8 +1196,10 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact */ /* - * Can't know about a serialized snapshot's location if we're not - * consistent. + * Cannot advance restart_lsn to a point where a consistent snapshot cannot + * be built immediately in the next decoding round (either by restoring a + * serialized snapshot or by confirming there are no running transactions). + * Doing so could cause data prior to reaching consistency to be lost. */ if (builder->state < SNAPBUILD_CONSISTENT) return; @@ -1221,6 +1222,15 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact XLogRecPtrIsValid(builder->last_serialized_snapshot)) LogicalIncreaseRestartDecodingForSlot(lsn, builder->last_serialized_snapshot); + + /* + * Reaching here indicates we built the snapshot either by restoring a + * serialized snapshot from a previous decoding session or because there + * were no running transactions. In either case, it's safe and efficient to + * restart from this LSN next time. + */ + else + LogicalIncreaseRestartDecodingForSlot(lsn, lsn); } @@ -1229,11 +1239,8 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact * * Helper function for SnapBuildProcessRunningXacts() while we're not yet * consistent. - * - * Returns true if there is a point in performing internal maintenance/cleanup - * using the xl_running_xacts record. */ -static bool +static void SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running) { /* --- @@ -1278,7 +1285,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon); - return true; + return; } /* @@ -1312,8 +1319,6 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn errmsg("logical decoding found consistent point at %X/%08X", LSN_FORMAT_ARGS(lsn)), errdetail("There are no running transactions.")); - - return false; } /* @@ -1324,8 +1329,6 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn !builder->in_slot_creation && SnapBuildRestore(builder, lsn)) { - /* there won't be any state to cleanup */ - return false; } /* @@ -1410,13 +1413,6 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn LSN_FORMAT_ARGS(lsn)), errdetail("There are no old transactions anymore.")); } - - /* - * We already started to track running xacts and need to wait for all - * in-progress ones to finish. We fall through to the normal processing of - * records so incremental cleanup can be performed. - */ - return true; } /* --- -- 2.51.1.windows.1