From 98a6ea9d18e0d7a864549db4e388f6e767be0edb Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Wed, 6 Jul 2022 12:53:36 +0900 Subject: [PATCH] Add catalog modifying transactions to logical decoding serialized snapshot. Previously, we relied on HEAP2_NEW_CID records and XACT_INVALIDATION records to know if the transaction has modified the catalog, and that information is not serialized to snapshot. Therefore, if the logical decoding decodes only the commit record of the transaction that actually has modified a catalog, we missed adding its XID to the snapshot. We ended up looking at catalogs with the wrong snapshot. To fix this problem, this change adds the list of transaction IDs and sub-transaction IDs, that have modified catalogs and are running when snapshot serialization, to the serialized snapshot. When decoding a COMMIT record, we check both the list and the ReorderBuffer to see if if the transaction has modified catalogs. Since this adds additional information to the serialized snapshot, we cannot backpatch it. For back branches, we take another approach; remember the last-running-xacts list of the first decoded RUNNING_XACTS record and check if the transaction whose commit record has XACT_XINFO_HAS_INVALIS and whose XID is in the list. This doesn't require any file format changes but the transaction will end up being added to the snapshot even if it has only relcache invalidations. --- contrib/test_decoding/Makefile | 2 +- .../expected/catalog_change_snapshot.out | 44 ++++ .../specs/catalog_change_snapshot.spec | 39 ++++ .../replication/logical/reorderbuffer.c | 41 ++++ src/backend/replication/logical/snapbuild.c | 193 +++++++++++------- src/include/replication/reorderbuffer.h | 1 + 6 files changed, 248 insertions(+), 72 deletions(-) create mode 100644 contrib/test_decoding/expected/catalog_change_snapshot.out create mode 100644 contrib/test_decoding/specs/catalog_change_snapshot.spec diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index b220906479..c7ce603706 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -8,7 +8,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ spill slot truncate stream stats twophase twophase_stream ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ - twophase_snapshot slot_creation_error + twophase_snapshot slot_creation_error catalog_change_snapshot REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out new file mode 100644 index 0000000000..dc4f9b7018 --- /dev/null +++ b/contrib/test_decoding/expected/catalog_change_snapshot.out @@ -0,0 +1,44 @@ +Parsed test spec with 2 sessions + +starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_savepoint: SAVEPOINT sp1; +step s0_truncate: TRUNCATE tbl1; +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_commit: COMMIT; +step s0_begin: BEGIN; +step s0_insert: INSERT INTO tbl1 VALUES (1); +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +--------------------------------------- +BEGIN +table public.tbl1: TRUNCATE: (no-flags) +COMMIT +(3 rows) + +step s0_commit: COMMIT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +------------------------------------------------------------- +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null +COMMIT +(3 rows) + +?column? +-------- +stop +(1 row) + diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec new file mode 100644 index 0000000000..bffd856bbb --- /dev/null +++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec @@ -0,0 +1,39 @@ +# Test that decoding only the commit record of the transaction that have +# catalog-changed. +setup +{ + DROP TABLE IF EXISTS tbl1; + CREATE TABLE tbl1 (val1 integer, val2 integer); +} + +teardown +{ + DROP TABLE tbl1; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s0" +setup { SET synchronous_commit=on; } +step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); } +step "s0_begin" { BEGIN; } +step "s0_savepoint" { SAVEPOINT sp1; } +step "s0_truncate" { TRUNCATE tbl1; } +step "s0_insert" { INSERT INTO tbl1 VALUES (1); } +step "s0_commit" { COMMIT; } + +session "s1" +setup { SET synchronous_commit=on; } +step "s1_checkpoint" { CHECKPOINT; } +step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } + +# For the transaction that TRUNCATEd the table tbl1, the last decoding decodes +# only its COMMIT record, because it starts from the RUNNING_XACT record emitted +# during the first checkpoint execution. This transaction must be marked as +# containing catalog changes while decoding the COMMIT record and the decoding +# of the INSERT record must read the pg_class with the correct historic snapshot. +# +# Note that in a case where bgwriter wrote the XACT_RUNNING record between "s0_commit" +# and "s0_begin", this doesn't happen as the decoding starts from the RUNNING_XACT +# record written by bgwriter. One might think we can either stop the bgwriter or +# increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests. +permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes" diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 8da5f9089c..266376583f 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -4821,6 +4821,47 @@ ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn) txn->toast_hash = NULL; } +/* + * Return palloc'ed array of the transactions that have changed catalogs. + * The returned array is sorted in xidComparator order. + * + * The caller must free the returned array when done with it. + */ +TransactionId * +ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb, size_t *xcnt_p) +{ + HASH_SEQ_STATUS hash_seq; + ReorderBufferTXNByIdEnt *ent; + TransactionId *xids = NULL; + size_t xcnt = 0; + size_t xcnt_space = 64; /* arbitrary number */ + + hash_seq_init(&hash_seq, rb->by_txn); + while ((ent = hash_seq_search(&hash_seq)) != NULL) + { + ReorderBufferTXN *txn = ent->txn; + + if (!rbtxn_has_catalog_changes(txn)) + continue; + + /* Initialize XID array */ + if (xcnt == 0) + xids = (TransactionId *) palloc(sizeof(TransactionId) * xcnt_space); + + if (xcnt >= xcnt_space) + { + xcnt_space *= 2; + xids = repalloc(xids, sizeof(TransactionId) * xcnt_space); + } + + xids[xcnt++] = txn->xid; + } + + qsort(xids, xcnt, sizeof(TransactionId), xidComparator); + + *xcnt_p = xcnt; + return xids; +} /* --------------------------------------- * Visibility support for logical decoding diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 1119a12db9..e3e7c3dd23 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -241,6 +241,28 @@ struct SnapBuild */ TransactionId *xip; } committed; + + /* + * Array of transactions and subtransactions that had modified catalogs + * and were running when the snapshot was serialized. + * + * We normally rely on HEAP2_NEW_CID records and XLOG_XACT_INVALIDATIONS to + * know if the transaction has changed the catalog. But it could happen that + * the logical decoding decodes only the commit record of the transaction. + * This array keeps track of the transactions that have modified catalogs + * and were running when serializing a snapshot, and this array is used to + * add such transactions to the snapshot. + * + * This field is updated when restoring a serialized snapshot. + */ + struct + { + /* number of transactions */ + size_t xcnt; + + /* This array must be sorted in xidComparator order */ + TransactionId *xip; + } catchanges; }; /* @@ -262,6 +284,8 @@ static void SnapBuildSnapIncRefcount(Snapshot snap); static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn); +static inline bool SnapBuildXidHasCatalogChange(SnapBuild *builder, TransactionId xid); + /* xlog reading helper functions for SnapBuildProcessRunningXacts */ static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running); static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff); @@ -269,6 +293,7 @@ static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutof /* serialization functions */ static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn); static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn); +static void SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path); /* * Allocate a new snapshot builder. @@ -306,6 +331,9 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder, palloc0(builder->committed.xcnt_space * sizeof(TransactionId)); builder->committed.includes_all_transactions = true; + builder->catchanges.xcnt = 0; + builder->catchanges.xip = NULL; + builder->initial_xmin_horizon = xmin_horizon; builder->start_decoding_at = start_lsn; builder->building_full_snapshot = need_full_snapshot; @@ -983,7 +1011,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, * Add subtransaction to base snapshot if catalog modifying, we don't * distinguish to toplevel transactions there. */ - if (ReorderBufferXidHasCatalogChanges(builder->reorder, subxid)) + if (SnapBuildXidHasCatalogChange(builder, subxid)) { sub_needs_timetravel = true; needs_snapshot = true; @@ -1012,7 +1040,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, } /* if top-level modified catalog, it'll need a snapshot */ - if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid)) + if (SnapBuildXidHasCatalogChange(builder, xid)) { elog(DEBUG2, "found top level transaction %u, with catalog changes", xid); @@ -1089,6 +1117,25 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, } } +/* + * Check both the snapshot and the reorder buffer to see if the given + * transaction has modified catalogs. + */ +static inline bool +SnapBuildXidHasCatalogChange(SnapBuild *builder, TransactionId xid) +{ + if (builder->catchanges.xcnt > 0) + { + if (bsearch(&xid, builder->catchanges.xip, builder->catchanges.xcnt, + sizeof(TransactionId), xidComparator) != NULL) + return true; + + /* fall through to check the reorder buffer */ + } + + return ReorderBufferXidHasCatalogChanges(builder->reorder, xid); +} + /* ----------------------------------- * Snapshot building functions dealing with xlog records @@ -1438,6 +1485,7 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff) * * struct SnapBuildOnDisk; * TransactionId * committed.xcnt; (*not xcnt_space*) + * TransactionId * catchanges.xcnt; * */ typedef struct SnapBuildOnDisk @@ -1493,6 +1541,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) { Size needed_length; SnapBuildOnDisk *ondisk = NULL; + MemoryContext old_ctx; char *ondisk_c; int fd; char tmppath[MAXPGPATH]; @@ -1578,8 +1627,22 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) (errcode_for_file_access(), errmsg("could not remove file \"%s\": %m", tmppath))); + /* + * Update the catalog modifying transactions that are yet not committed. + * + * We switch the memory context in order to make sure that the space for + * catalog modifying transactions are allocated in the snapshot builder + * context. + */ + if (builder->catchanges.xip) + pfree(builder->catchanges.xip); + old_ctx = MemoryContextSwitchTo(builder->context); + builder->catchanges.xip = ReorderBufferGetCatalogChangesXacts(builder->reorder, + &builder->catchanges.xcnt); + MemoryContextSwitchTo(old_ctx); + needed_length = sizeof(SnapBuildOnDisk) + - sizeof(TransactionId) * builder->committed.xcnt; + sizeof(TransactionId) * (builder->committed.xcnt + builder->catchanges.xcnt); ondisk_c = MemoryContextAllocZero(builder->context, needed_length); ondisk = (SnapBuildOnDisk *) ondisk_c; @@ -1598,6 +1661,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) ondisk->builder.snapshot = NULL; ondisk->builder.reorder = NULL; ondisk->builder.committed.xip = NULL; + ondisk->builder.catchanges.xip = NULL; COMP_CRC32C(ondisk->checksum, &ondisk->builder, @@ -1609,6 +1673,12 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) COMP_CRC32C(ondisk->checksum, ondisk_c, sz); ondisk_c += sz; + /* copy catalog modifying xacts */ + sz = sizeof(TransactionId) * builder->catchanges.xcnt; + memcpy(ondisk_c, builder->catchanges.xip, sz); + COMP_CRC32C(ondisk->checksum, ondisk_c, sz); + ondisk_c += sz; + FIN_CRC32C(ondisk->checksum); /* we have valid data now, open tempfile and write it there */ @@ -1707,7 +1777,6 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) int fd; char path[MAXPGPATH]; Size sz; - int readBytes; pg_crc32c checksum; /* no point in loading a snapshot if we're already there */ @@ -1739,29 +1808,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) /* read statically sized portion of snapshot */ - pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ); - readBytes = read(fd, &ondisk, SnapBuildOnDiskConstantSize); - pgstat_report_wait_end(); - if (readBytes != SnapBuildOnDiskConstantSize) - { - int save_errno = errno; - - CloseTransientFile(fd); - - if (readBytes < 0) - { - errno = save_errno; - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read file \"%s\": %m", path))); - } - else - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("could not read file \"%s\": read %d of %zu", - path, readBytes, - (Size) SnapBuildOnDiskConstantSize))); - } + SnapBuildRestoreContents(fd, (char *) &ondisk, SnapBuildOnDiskConstantSize, path); if (ondisk.magic != SNAPBUILD_MAGIC) ereport(ERROR, @@ -1781,57 +1828,21 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize); /* read SnapBuild */ - pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ); - readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild)); - pgstat_report_wait_end(); - if (readBytes != sizeof(SnapBuild)) - { - int save_errno = errno; - - CloseTransientFile(fd); - - if (readBytes < 0) - { - errno = save_errno; - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read file \"%s\": %m", path))); - } - else - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("could not read file \"%s\": read %d of %zu", - path, readBytes, sizeof(SnapBuild)))); - } + SnapBuildRestoreContents(fd, (char *) &ondisk.builder, sizeof(SnapBuild), path); COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild)); /* restore committed xacts information */ sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt; ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz); - pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ); - readBytes = read(fd, ondisk.builder.committed.xip, sz); - pgstat_report_wait_end(); - if (readBytes != sz) - { - int save_errno = errno; - - CloseTransientFile(fd); - - if (readBytes < 0) - { - errno = save_errno; - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read file \"%s\": %m", path))); - } - else - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("could not read file \"%s\": read %d of %zu", - path, readBytes, sz))); - } + SnapBuildRestoreContents(fd, (char *) ondisk.builder.committed.xip, sz, path); COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz); + /* restore catalog modifying xacts information */ + sz = sizeof(TransactionId) * ondisk.builder.catchanges.xcnt; + ondisk.builder.catchanges.xip = MemoryContextAllocZero(builder->context, sz); + SnapBuildRestoreContents(fd, (char *) ondisk.builder.catchanges.xip, sz, path); + COMP_CRC32C(checksum, ondisk.builder.catchanges.xip, sz); + if (CloseTransientFile(fd) != 0) ereport(ERROR, (errcode_for_file_access(), @@ -1885,6 +1896,14 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) } ondisk.builder.committed.xip = NULL; + /* set catalog modifying transactions */ + if (builder->catchanges.xip) + pfree(builder->catchanges.xip); + builder->catchanges.xcnt = ondisk.builder.catchanges.xcnt; + builder->catchanges.xip = ondisk.builder.catchanges.xip; + + ondisk.builder.catchanges.xip = NULL; + /* our snapshot is not interesting anymore, build a new one */ if (builder->snapshot != NULL) { @@ -1909,6 +1928,38 @@ snapshot_not_interesting: return false; } +/* + * Read the contents of the serialized snapshot to the dest. + */ +static void +SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path) +{ + int readBytes; + + pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ); + readBytes = read(fd, dest, size); + pgstat_report_wait_end(); + if (readBytes != size) + { + int save_errno = errno; + + CloseTransientFile(fd); + + if (readBytes < 0) + { + errno = save_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", path))); + } + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("could not read file \"%s\": read %d of %zu", + path, readBytes, sizeof(SnapBuild)))); + } +} + /* * Remove all serialized snapshots that are not required anymore because no * slot can need them. This doesn't actually have to run during a checkpoint, diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 4a01f877e5..07e378d3ef 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -677,6 +677,7 @@ extern void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid); extern void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid); extern ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *); extern TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb); +extern TransactionId *ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb, size_t *xcnt_p); extern void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr); -- 2.24.3 (Apple Git-128)