From 16ae28e3b3b4c10bd82ad679194325eb56aa3965 Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Sun, 27 Apr 2025 14:53:35 +0800 Subject: [PATCH v2] Fix premature xmin advancement during fast forward decoding Currently, fast-forward decoding could prematurely advance catalog_xmin, resulting in required catalog data being removed by vacuum. Commit f49a80c4 fixed a similar issue where the logical slot's catalog_xmin was advanced prematurely during non-fast-forward mode, which involved using the minimum transaction ID remaining visible in the base snapshot when determining candidates for catalog_xmin. However, this fix did not apply to fast-forward mode, as a base snapshot is not built when decoding changes in this mode. Consequently, catalog_xmin was directly advanced to the oldest running transaction ID found in the latest running_xacts record. To resolve this, the commit allows the base snapshot to be built during fast forward decoding. --- .../test_decoding/expected/oldest_xmin.out | 41 +++++++++++++++++++ contrib/test_decoding/specs/oldest_xmin.spec | 5 +++ src/backend/replication/logical/decode.c | 38 +++++++++++------ 3 files changed, 71 insertions(+), 13 deletions(-) diff --git a/contrib/test_decoding/expected/oldest_xmin.out b/contrib/test_decoding/expected/oldest_xmin.out index dd6053f9c1f..57268b38d33 100644 --- a/contrib/test_decoding/expected/oldest_xmin.out +++ b/contrib/test_decoding/expected/oldest_xmin.out @@ -38,3 +38,44 @@ COMMIT stop (1 row) + +starting permutation: s0_begin s0_getxid s1_begin s1_insert s0_alter s0_commit s0_checkpoint s0_advance_slot s0_advance_slot s1_commit s0_vacuum s0_get_changes +step s0_begin: BEGIN; +step s0_getxid: SELECT pg_current_xact_id() IS NULL; +?column? +-------- +f +(1 row) + +step s1_begin: BEGIN; +step s1_insert: INSERT INTO harvest VALUES ((1, 2, 3)); +step s0_alter: ALTER TYPE basket DROP ATTRIBUTE mangos; +step s0_commit: COMMIT; +step s0_checkpoint: CHECKPOINT; +step s0_advance_slot: SELECT slot_name FROM pg_replication_slot_advance('isolation_slot', pg_current_wal_lsn()); +slot_name +-------------- +isolation_slot +(1 row) + +step s0_advance_slot: SELECT slot_name FROM pg_replication_slot_advance('isolation_slot', pg_current_wal_lsn()); +slot_name +-------------- +isolation_slot +(1 row) + +step s1_commit: COMMIT; +step s0_vacuum: VACUUM pg_attribute; +step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +data +------------------------------------------------------ +BEGIN +table public.harvest: INSERT: fruits[basket]:'(1,2,3)' +COMMIT +(3 rows) + +?column? +-------- +stop +(1 row) + diff --git a/contrib/test_decoding/specs/oldest_xmin.spec b/contrib/test_decoding/specs/oldest_xmin.spec index 88bd30f5ff7..7f2fe3d7ed7 100644 --- a/contrib/test_decoding/specs/oldest_xmin.spec +++ b/contrib/test_decoding/specs/oldest_xmin.spec @@ -25,6 +25,7 @@ step "s0_commit" { COMMIT; } step "s0_checkpoint" { CHECKPOINT; } step "s0_vacuum" { VACUUM pg_attribute; } step "s0_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); } +step "s0_advance_slot" { SELECT slot_name FROM pg_replication_slot_advance('isolation_slot', pg_current_wal_lsn()); } session "s1" setup { SET synchronous_commit=on; } @@ -40,3 +41,7 @@ step "s1_commit" { COMMIT; } # will be removed (xmax set) before T1 commits. That is, interlocking doesn't # forbid modifying catalog after someone read it (and didn't commit yet). permutation "s0_begin" "s0_getxid" "s1_begin" "s1_insert" "s0_alter" "s0_commit" "s0_checkpoint" "s0_get_changes" "s0_get_changes" "s1_commit" "s0_vacuum" "s0_get_changes" + +# Perform the same testing process as described above, but use advance_slot to +# forces xmin advancement during fast forward decoding. +permutation "s0_begin" "s0_getxid" "s1_begin" "s1_insert" "s0_alter" "s0_commit" "s0_checkpoint" "s0_advance_slot" "s0_advance_slot" "s1_commit" "s0_vacuum" "s0_get_changes" diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 297eb11b5a8..9c2f1f17c7f 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -362,20 +362,24 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * If we don't have snapshot or we are just fast-forwarding, there is no - * point in decoding changes. + * point in decoding data changes. However, it's crucial to build the base + * snapshot during fast-forward mode (as is done in + * SnapBuildProcessChange()) because we require the snapshot's xmin when + * determining the candidate catalog_xmin for the replication slot. See + * SnapBuildProcessRunningXacts(). */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || - ctx->fast_forward) + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; switch (info) { case XLOG_HEAP2_MULTI_INSERT: - if (!ctx->fast_forward && - SnapBuildProcessChange(builder, xid, buf->origptr)) + if (SnapBuildProcessChange(builder, xid, buf->origptr) && + !ctx->fast_forward) DecodeMultiInsert(ctx, buf); break; case XLOG_HEAP2_NEW_CID: + if (!ctx->fast_forward) { xl_heap_new_cid *xlrec; @@ -422,16 +426,20 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * If we don't have snapshot or we are just fast-forwarding, there is no - * point in decoding data changes. + * point in decoding data changes. However, it's crucial to build the base + * snapshot during fast-forward mode (as is done in + * SnapBuildProcessChange()) because we require the snapshot's xmin when + * determining the candidate catalog_xmin for the replication slot. See + * SnapBuildProcessRunningXacts(). */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || - ctx->fast_forward) + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; switch (info) { case XLOG_HEAP_INSERT: - if (SnapBuildProcessChange(builder, xid, buf->origptr)) + if (SnapBuildProcessChange(builder, xid, buf->origptr) && + !ctx->fast_forward) DecodeInsert(ctx, buf); break; @@ -442,17 +450,20 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) */ case XLOG_HEAP_HOT_UPDATE: case XLOG_HEAP_UPDATE: - if (SnapBuildProcessChange(builder, xid, buf->origptr)) + if (SnapBuildProcessChange(builder, xid, buf->origptr) && + !ctx->fast_forward) DecodeUpdate(ctx, buf); break; case XLOG_HEAP_DELETE: - if (SnapBuildProcessChange(builder, xid, buf->origptr)) + if (SnapBuildProcessChange(builder, xid, buf->origptr) && + !ctx->fast_forward) DecodeDelete(ctx, buf); break; case XLOG_HEAP_TRUNCATE: - if (SnapBuildProcessChange(builder, xid, buf->origptr)) + if (SnapBuildProcessChange(builder, xid, buf->origptr) && + !ctx->fast_forward) DecodeTruncate(ctx, buf); break; @@ -480,7 +491,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) break; case XLOG_HEAP_CONFIRM: - if (SnapBuildProcessChange(builder, xid, buf->origptr)) + if (SnapBuildProcessChange(builder, xid, buf->origptr) && + !ctx->fast_forward) DecodeSpecConfirm(ctx, buf); break; -- 2.30.0.windows.2