From 1a912000d9725cd23b5cdf7918bb6b7b3dac9a48 Mon Sep 17 00:00:00 2001 From: ChangAo Chen Date: Tue, 23 Jan 2024 21:10:26 +0800 Subject: [PATCH] Track transactions committed in BUILDING_SNAPSHOT. We previously didn't track transactions that begin after BUILDING_SNAPSHOT and commit before FULL_SNAPSHOT when building historic snapshot in logical decoding. This can cause transactions that begin after FULL_SNAPSHOT to take an incorrect snapshot because transactions committed in BUILDING_SNAPSHOT are not in the snapshot. To fix it, we track transactions that committed in BUILDING_SNAPSHOT and add them to historic snapshot if they have catalog changes. Note that there is no need to track transactions that begin before BUILDING_SNAPSHOT because they are all finished when we reach FULL_SNAPSHOT. --- src/backend/replication/logical/decode.c | 88 +++++++++++++++++++-- src/backend/replication/logical/snapbuild.c | 4 + 2 files changed, 85 insertions(+), 7 deletions(-) diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 6a0f22b209..ffb2d55983 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -51,6 +51,8 @@ static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeXactWhenBuildingSnapshot(LogicalDecodingContext *ctx, + XLogRecordBuffer *buf); static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_commit *parsed, TransactionId xid, bool two_phase); @@ -210,10 +212,16 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * If the snapshot isn't yet fully built, we cannot decode anything, so - * bail out. + * bail out. However, we need track some transactions committed in + * BUILDING_SNAPSHOT or we may have an incorrect historic snapshot + * when reaching FULL_SNAPSHOT. */ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) + { + if (SnapBuildCurrentState(builder) == SNAPBUILD_BUILDING_SNAPSHOT) + DecodeXactWhenBuildingSnapshot(ctx, buf); return; + } switch (info) { @@ -413,10 +421,11 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); /* - * If we don't have snapshot or we are just fast-forwarding, there is no - * point in decoding changes. + * Even though we don't have snapshot in BUILDING_SNAPSHOT, we need to + * handle XLOG_HEAP2_NEW_CID which mark a transaction has catalog changes. + * If we are just fast-forwarding, no need to handle anything. */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || + if (SnapBuildCurrentState(builder) < SNAPBUILD_BUILDING_SNAPSHOT || ctx->fast_forward) return; @@ -472,10 +481,11 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); /* - * If we don't have snapshot or we are just fast-forwarding, there is no - * point in decoding data changes. + * Even though we don't have snapshot in BUILDING_SNAPSHOT, we need to + * handle XLOG_HEAP_INPLACE which mark a transaction has catalog changes. + * If we are just fast-forwarding, no need to handle anything. */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || + if (SnapBuildCurrentState(builder) < SNAPBUILD_BUILDING_SNAPSHOT || ctx->fast_forward) return; @@ -657,6 +667,70 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) message->message + message->prefix_size); } +/* + * Handle rmgr XACT_ID records when we are in BUILDING_SNAPSHOT. + * Currently, we only handle XLOG_XACT_COMMIT(_PREPARED) and + * XLOG_XACT_INVALIDATIONS. + * + * XLOG_XACT_COMMIT(_PREPARED): + * we should add the transaction to historic snapshot if it + * has catalog changes and begins after BUILDING_SNAPSHOT. + * XLOG_XACT_INVALIDATIONS: + * we only track catalog modified transaction in historic + * snapshot, so we handle it which mark a transaction has + * catalog changes. + */ +static void +DecodeXactWhenBuildingSnapshot(LogicalDecodingContext *ctx, + XLogRecordBuffer *buf) +{ + XLogReaderState *r = buf->record; + uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK; + + switch (info) + { + case XLOG_XACT_COMMIT: + case XLOG_XACT_COMMIT_PREPARED: + { + xl_xact_commit *xlrec; + xl_xact_parsed_commit parsed; + TransactionId xid; + + xlrec = (xl_xact_commit *) XLogRecGetData(r); + ParseCommitRecord(XLogRecGetInfo(r), xlrec, &parsed); + + if (!TransactionIdIsValid(parsed.twophase_xid)) + xid = XLogRecGetXid(r); + else + xid = parsed.twophase_xid; + + DecodeCommit(ctx, buf, &parsed, xid, false); + } + break; + case XLOG_XACT_ABORT: + case XLOG_XACT_ABORT_PREPARED: + case XLOG_XACT_ASSIGNMENT: + break; + case XLOG_XACT_INVALIDATIONS: + { + TransactionId xid; + + xid = XLogRecGetXid(r); + + if (TransactionIdIsValid(xid)) + { + ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, + buf->origptr); + } + } + break; + case XLOG_XACT_PREPARE: + break; + default: + elog(ERROR, "unexpected RM_XACT_ID record type: %u", info); + } +} + /* * Consolidated commit record handling between the different form of commit * records. diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index a0b7947d2f..f47bddcecf 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -827,6 +827,10 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, */ ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn); + /* we just call ReorderBufferXidSetCatalogChanges() in BUILDING_SNAPSHOT */ + if (builder->state < SNAPBUILD_FULL_SNAPSHOT) + return; + ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn, xlrec->target_locator, xlrec->target_tid, xlrec->cmin, xlrec->cmax, -- 2.34.1