From af6a25eab3b9e829c312b776c1720c9b81cc159e Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Tue, 3 Mar 2026 11:44:38 +0800 Subject: [PATCH v3] Advance restart_lsn when reaching consistency without waiting Currently, the replication slot's restart_lsn is not advanced when first time building a consistent snapshot, even when it's safe to do so. This can lead to unnecessary retention of WAL segments, though the impact is rare. This commit advances restart_lsn at the consistency point if either: a serialized snapshot from a previous decoding session is available and no tranasctions have been decoded yet, or there were no running transactions when reaching consistency In both cases, it's safe and efficient to restart decoding from this LSN, reducing WAL retention without affecting decoding capabilities. --- src/backend/replication/logical/snapbuild.c | 85 +++++++++++++-------- 1 file changed, 54 insertions(+), 31 deletions(-) diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 7f79621b57e..30ab873b37b 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -869,31 +869,34 @@ SnapBuildPurgeOlderTxn(SnapBuild *builder) if (!TransactionIdIsNormal(builder->xmin)) return; - /* TODO: Neater algorithm than just copying and iterating? */ - workspace = - MemoryContextAlloc(builder->context, - builder->committed.xcnt * sizeof(TransactionId)); - - /* copy xids that still are interesting to workspace */ - for (off = 0; off < builder->committed.xcnt; off++) + if (builder->committed.xcnt > 0) { - if (NormalTransactionIdPrecedes(builder->committed.xip[off], - builder->xmin)) - ; /* remove */ - else - workspace[surviving_xids++] = builder->committed.xip[off]; - } + /* TODO: Neater algorithm than just copying and iterating? */ + workspace = + MemoryContextAlloc(builder->context, + builder->committed.xcnt * sizeof(TransactionId)); + + /* copy xids that still are interesting to workspace */ + for (off = 0; off < builder->committed.xcnt; off++) + { + if (NormalTransactionIdPrecedes(builder->committed.xip[off], + builder->xmin)) + ; /* remove */ + else + workspace[surviving_xids++] = builder->committed.xip[off]; + } - /* copy workspace back to persistent state */ - memcpy(builder->committed.xip, workspace, - surviving_xids * sizeof(TransactionId)); + /* copy workspace back to persistent state */ + memcpy(builder->committed.xip, workspace, + surviving_xids * sizeof(TransactionId)); - elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u", - (uint32) builder->committed.xcnt, (uint32) surviving_xids, - builder->xmin, builder->xmax); - builder->committed.xcnt = surviving_xids; + elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u", + (uint32) builder->committed.xcnt, (uint32) surviving_xids, + builder->xmin, builder->xmax); + builder->committed.xcnt = surviving_xids; - pfree(workspace); + pfree(workspace); + } /* * Purge xids in ->catchange as well. The purged array must also be sorted @@ -1136,6 +1139,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact { ReorderBufferTXN *txn; TransactionId xmin; + bool incremental_build = false; /* * If we're not consistent yet, inspect the record to see whether it @@ -1143,12 +1147,18 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact * our snapshot so others or we, after a restart, can use it. */ if (builder->state < SNAPBUILD_CONSISTENT) - { - /* returns false if there's no point in performing cleanup just yet */ - if (!SnapBuildFindSnapshot(builder, lsn, running)) - return; - } - else + incremental_build = SnapBuildFindSnapshot(builder, lsn, running); + + /* + * Serialize the snapshot only when it was built incrementally. + * + * If we built a consistent snapshot immediately at this LSN, either a + * serialized snapshot from a previous decoding session already exists, or + * there were no running transactions. In both cases, any future decoding + * session can also build a consistent snapshot at this point, so + * serialization is unnecessary. + */ + if (incremental_build && builder->state == SNAPBUILD_CONSISTENT) SnapBuildSerialize(builder, lsn); /* @@ -1197,8 +1207,10 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact */ /* - * Can't know about a serialized snapshot's location if we're not - * consistent. + * Cannot advance restart_lsn to a point where a consistent snapshot cannot + * be built immediately in the next decoding round (either by restoring a + * serialized snapshot or by confirming there are no running transactions). + * Doing so could cause data prior to reaching consistency to be lost. */ if (builder->state < SNAPBUILD_CONSISTENT) return; @@ -1221,6 +1233,15 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact XLogRecPtrIsValid(builder->last_serialized_snapshot)) LogicalIncreaseRestartDecodingForSlot(lsn, builder->last_serialized_snapshot); + + /* + * With no active transactions, we reached consistency either because the + * xl_running_xacts record showed no running transactions or because we + * restored a serialized snapshot from another decoding session. In either + * case, it's safe to restart from this LSN. + */ + else if (txn == NULL) + LogicalIncreaseRestartDecodingForSlot(lsn, lsn); } @@ -1230,8 +1251,10 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact * Helper function for SnapBuildProcessRunningXacts() while we're not yet * consistent. * - * Returns true if there is a point in performing internal maintenance/cleanup - * using the xl_running_xacts record. + * Returns true when the snapshot is built incrementally (whether still in + * progress or just completed). Returns false when the snapshot is built + * immediately either by restoring a serialized snapshot from disk or because + * there were no running transactions. */ static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running) -- 2.51.1.windows.1