From 725249d8ec56a7ea3219df801dc7b93f93eea145 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Mon, 11 Jul 2022 21:49:06 +0900 Subject: [PATCH v7] Fix catalog lookup with the wrong snapshot during logical decoding. Previously, we relied on HEAP2_NEW_CID records and XACT_INVALIDATION records to know if the transaction has modified the catalog, and that information is not serialized to snapshot. Therefore, if the logical decoding decodes only the commit record of the transaction that actually has modified a catalog, we missed adding its XID to the snapshot. We ended up looking at catalogs with the wrong snapshot. To fix this problem, this changes the snapshot builder so that it remembers the initial running transaction written in the xl_running_xacts record that we decoded first, and mark the transaction as containing catalog changes if it's in the list of the initial running transactions and its commit record has XACT_XINFO_HAS_INVALS. This has false positive; we could end up adding the transaction that didn't change catalog to the snapshot since we cannot distinguish whether the transaction has catalog changes only by checking the COMMIT record. It doesn't have the information on which (sub) transaction has catalog changes, and XACT_XINFO_HAS_INVALS doesn't necessarily indicate that the transaction has catalog change. But it doesn't become a problem since we use historic snapshot only for reading system catalogs. On the master branch, we took a more future-proof approach -- writing catalog modifying transactions to the serialized snapshot. But we cannot backpatch it because of change in SnapBuild. Also, to avoid ABI breakage, we store the array of the initial running transactions in the static variables InitialRunningXacts and NInitialRunningXacts, but not in ReorderBuffer. Back-patch to all supported released. Reported-by: Mike Oh Author: Masahiko Sawada Reviewed-by: Amit Kapila Reviewed-by: Takamichi Osumi Reviewed-by: Kyotaro Horiguchi Reviewed-by: Bertrand Drouvot Reviewed-by: Shi yu Reviewed-by: Ahsan Hadi Discussion: https://postgr.es/m/81D0D8B0-E7C4-4999-B616-1E5004DBDCD2%40amazon.com Backpatch-through: 10 --- contrib/test_decoding/Makefile | 2 +- .../expected/catalog_change_snapshot.out | 44 ++++++ .../specs/catalog_change_snapshot.spec | 39 +++++ src/backend/replication/logical/decode.c | 15 ++ src/backend/replication/logical/snapbuild.c | 143 +++++++++++++++++- src/include/replication/snapbuild.h | 5 + 6 files changed, 240 insertions(+), 8 deletions(-) create mode 100644 contrib/test_decoding/expected/catalog_change_snapshot.out create mode 100644 contrib/test_decoding/specs/catalog_change_snapshot.spec diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 9a31e0b879..4553252d75 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -8,7 +8,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ spill slot truncate stream stats twophase twophase_stream ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ - twophase_snapshot + twophase_snapshot catalog_change_snapshot REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out new file mode 100644 index 0000000000..dc4f9b7018 --- /dev/null +++ b/contrib/test_decoding/expected/catalog_change_snapshot.out @@ -0,0 +1,44 @@ +Parsed test spec with 2 sessions + +starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_savepoint: SAVEPOINT sp1; +step s0_truncate: TRUNCATE tbl1; +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_commit: COMMIT; +step s0_begin: BEGIN; +step s0_insert: INSERT INTO tbl1 VALUES (1); +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +--------------------------------------- +BEGIN +table public.tbl1: TRUNCATE: (no-flags) +COMMIT +(3 rows) + +step s0_commit: COMMIT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +------------------------------------------------------------- +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null +COMMIT +(3 rows) + +?column? +-------- +stop +(1 row) + diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec new file mode 100644 index 0000000000..662760fbcf --- /dev/null +++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec @@ -0,0 +1,39 @@ +# Test decoding only the commit record of the transaction that have +# modified catalogs. +setup +{ + DROP TABLE IF EXISTS tbl1; + CREATE TABLE tbl1 (val1 integer, val2 integer); +} + +teardown +{ + DROP TABLE tbl1; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s0" +setup { SET synchronous_commit=on; } +step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); } +step "s0_begin" { BEGIN; } +step "s0_savepoint" { SAVEPOINT sp1; } +step "s0_truncate" { TRUNCATE tbl1; } +step "s0_insert" { INSERT INTO tbl1 VALUES (1); } +step "s0_commit" { COMMIT; } + +session "s1" +setup { SET synchronous_commit=on; } +step "s1_checkpoint" { CHECKPOINT; } +step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } + +# For the transaction that TRUNCATEd the table tbl1, the last decoding decodes +# only its COMMIT record, because it starts from the RUNNING_XACT record emitted +# during the first checkpoint execution. This transaction must be marked as +# containing catalog changes while decoding the COMMIT record and the decoding +# of the INSERT record must read the pg_class with the correct historic snapshot. +# +# Note that in a case where bgwriter wrote the XACT_RUNNING record between "s0_commit" +# and "s0_begin", this doesn't happen as the decoding starts from the RUNNING_XACT +# record written by bgwriter. One might think we can either stop the bgwriter or +# increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests. +permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes" diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 92dfafc632..a164442436 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -691,6 +691,21 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, commit_time = parsed->origin_timestamp; } + /* + * If the COMMIT record has invalidation messages, it could have catalog + * changes. We check if it's in the list of the initial running transactions + * and then mark it as containing catalog change. + * + * This must be done before SnapBuildCommitTxn() so that we can include + * catalog change transactions to the historic snapshot. + */ + if (parsed->xinfo & XACT_XINFO_HAS_INVALS) + SnapBuildInitialXactSetCatalogChanges(ctx->snapshot_builder, + xid, + parsed->nsubxacts, + parsed->subxacts, + buf->origptr); + SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid, parsed->nsubxacts, parsed->subxacts); diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 6df602485b..3e9d1a7931 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -250,8 +250,38 @@ struct SnapBuild static ResourceOwner SavedResourceOwnerDuringExport = NULL; static bool ExportInProgress = false; -/* ->committed manipulation */ -static void SnapBuildPurgeCommittedTxn(SnapBuild *builder); +/* + * Array of transactions and subtransactions that were running when + * the xl_running_xacts record that we decoded first was written. + * The array is sorted in xidComparator order. Xids are only removed + * from the array when decoding xl_running_xacts record, and then + * the array eventually becomes an empty. This array is allocated in + * builder->context so its lifetime is the same as the snapshot builder. + * + * We rely on HEAP2_NEW_CID records and XACT_INVALIDATIONS to know + * if the transaction has changed the catalog, and that information + * is not serialized to SnapBuilder. Therefore, if the logical + * decoding decodes the commit record of the transaction that actually + * has done catalog changes without these records, we miss to add + * the xid to the snapshot, and end up looking at catalogs with the + * wrong snapshot. To avoid this problem, if the COMMIT record of + * the xid listed in InitialRunningXacts has XACT_XINFO_HAS_INVALS + * flag, we mark both the top transaction and its substransactions + * as containing catalog changes. + * + * We could end up adding the transaction that didn't change catalog + * to the snapshot since we cannot distinguish whether the transaction + * has catalog changes only by checking the COMMIT record. It doesn't + * have the information on which (sub) transaction has catalog changes, + * and XACT_XINFO_HAS_INVALS doesn't necessarily indicate that the + * transaction has catalog change. But it doesn't become a problem since + * we use historic snapshot only for reading system catalogs. + */ +static TransactionId *InitialRunningXacts = NULL; +static int NInitialRunningXacts = 0; + +/* ->committed and InitailRunningXacts manipulation */ +static void SnapBuildPurgeOlderTxn(SnapBuild *builder); /* snapshot building/manipulation/distribution functions */ static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder); @@ -879,12 +909,17 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid) } /* - * Remove knowledge about transactions we treat as committed that are smaller - * than ->xmin. Those won't ever get checked via the ->committed array but via - * the clog machinery, so we don't need to waste memory on them. + * Remove knowledge about transactions we treat as committed and the initial + * running transactions that are smaller than ->xmin. Those won't ever get + * checked via the ->committed or InitialRunningXacts array, respectively. + * The committed xids will get checked via the clog machinery. + * + * We can ideally remove the transaction from InitialRunningXacts array + * once it is finished (committed/aborted) but that could be costly as we need + * to maintain the xids order in the array. */ static void -SnapBuildPurgeCommittedTxn(SnapBuild *builder) +SnapBuildPurgeOlderTxn(SnapBuild *builder) { int off; TransactionId *workspace; @@ -919,6 +954,49 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder) builder->committed.xcnt = surviving_xids; pfree(workspace); + + /* Quick exit if there is no initial running transactions */ + if (likely(NInitialRunningXacts == 0)) + return; + + /* bound check if there is at least one transaction to remove */ + if (!NormalTransactionIdPrecedes(InitialRunningXacts[0], + builder->xmin)) + return; + + /* + * purge xids in InitialRunningXacts as well. The purged array must also be + * sorted in xidComparator order. + */ + workspace = + MemoryContextAlloc(builder->context, + NInitialRunningXacts * sizeof(TransactionId)); + surviving_xids = 0; + for (off = 0; off < NInitialRunningXacts; off++) + { + if (NormalTransactionIdPrecedes(InitialRunningXacts[off], + builder->xmin)) + ; /* remove */ + else + workspace[surviving_xids++] = InitialRunningXacts[off]; + } + + if (surviving_xids > 0) + memcpy(InitialRunningXacts, workspace, + sizeof(TransactionId) * surviving_xids); + else + { + pfree(InitialRunningXacts); + InitialRunningXacts = NULL; + } + + elog(DEBUG3, "purged initial running transactions from %u to %u, oldest running xid %u", + (uint32) NInitialRunningXacts, + (uint32) surviving_xids, + builder->xmin); + + NInitialRunningXacts = surviving_xids; + pfree(workspace); } /* @@ -1104,6 +1182,21 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact */ if (builder->state < SNAPBUILD_CONSISTENT) { + /* + * Remember the transactions and subtransactions that were running + * when xl_running_xacts record that we decoded first was written. + */ + if (builder->state == SNAPBUILD_START) + { + int nxacts = running->subxcnt + running->xcnt; + Size sz = sizeof(TransactionId) * nxacts; + + NInitialRunningXacts = nxacts; + InitialRunningXacts = MemoryContextAlloc(builder->context, sz); + memcpy(InitialRunningXacts, running->xids, sz); + qsort(InitialRunningXacts, nxacts, sizeof(TransactionId), xidComparator); + } + /* returns false if there's no point in performing cleanup just yet */ if (!SnapBuildFindSnapshot(builder, lsn, running)) return; @@ -1126,7 +1219,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact builder->xmin = running->oldestRunningXid; /* Remove transactions we don't need to keep track off anymore */ - SnapBuildPurgeCommittedTxn(builder); + SnapBuildPurgeOlderTxn(builder); /* * Advance the xmin limit for the current replication slot, to allow @@ -1993,3 +2086,39 @@ CheckPointSnapBuild(void) } FreeDir(snap_dir); } + +/* + * If the given xid is in the list of the initial running xacts, we mark both it + * and its subtransactions as containing catalog changes if not yet. + */ +void +SnapBuildInitialXactSetCatalogChanges(SnapBuild *builder, TransactionId xid, + int subxcnt, TransactionId *subxacts, + XLogRecPtr lsn) +{ + /* + * Skip if there is no initial running xacts information or the + * transaction is already marked as containing catalog changes. + */ + if (likely((NInitialRunningXacts == 0) || + ReorderBufferXidHasCatalogChanges(builder->reorder, xid))) + return; + + /* + * If this committed transaction is the one that was running at the time + * when decoding the first RUNNING_XACTS record and have done catalog + * changes, we can mark both the top transaction and its subtransactions + * as containing catalog changes. + */ + if (bsearch(&xid, InitialRunningXacts, NInitialRunningXacts, + sizeof(TransactionId), xidComparator) != NULL) + { + ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn); + + for (int i = 0; i < subxcnt; i++) + { + ReorderBufferAssignChild(builder->reorder, xid, subxacts[i], lsn); + ReorderBufferXidSetCatalogChanges(builder->reorder, subxacts[i], lsn); + } + } +} diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index 3604621e88..d8f15d70e0 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -90,4 +90,9 @@ extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, struct xl_running_xacts *running); extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn); +extern void SnapBuildInitialXactSetCatalogChanges(SnapBuild *builder, + TransactionId xid, + int subxcnt, + TransactionId *subxacts, + XLogRecPtr lsn); #endif /* SNAPBUILD_H */ -- 2.24.3 (Apple Git-128)