diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out index b33e49c0b1..3310f6bd2c 100644 --- a/contrib/test_decoding/expected/catalog_change_snapshot.out +++ b/contrib/test_decoding/expected/catalog_change_snapshot.out @@ -88,6 +88,70 @@ stop (1 row) +starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_xid s1_checkpoint s1_get_changes s0_commit s1_get_changes s1_checkpoint s0_drop s0_init s1_checkpoint s1_get_changes +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_savepoint: SAVEPOINT sp1; +step s0_truncate: TRUNCATE tbl1; +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_commit: COMMIT; +step s0_begin: BEGIN; +step s0_xid: SELECT 'assigned' FROM pg_current_xact_id(); +?column? +-------- +assigned +(1 row) + +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +--------------------------------------- +BEGIN +table public.tbl1: TRUNCATE: (no-flags) +COMMIT +(3 rows) + +step s0_commit: COMMIT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s1_checkpoint: CHECKPOINT; +step s0_drop: SELECT 'drop' FROM pg_drop_replication_slot('isolation_slot'); +?column? +-------- +drop +(1 row) + +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +?column? +-------- +stop +(1 row) + + starting permutation: s0_init s0_begin s0_savepoint s0_insert s1_checkpoint s1_get_changes s0_truncate s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); ?column? diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec index 770dbd642d..cbf99f1320 100644 --- a/contrib/test_decoding/specs/catalog_change_snapshot.spec +++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec @@ -17,11 +17,13 @@ teardown session "s0" setup { SET synchronous_commit=on; } step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); } +step "s0_drop" { SELECT 'drop' FROM pg_drop_replication_slot('isolation_slot'); } step "s0_begin" { BEGIN; } step "s0_savepoint" { SAVEPOINT sp1; } step "s0_truncate" { TRUNCATE tbl1; } step "s0_insert" { INSERT INTO tbl1 VALUES (1); } step "s0_insert2" { INSERT INTO user_cat VALUES (1); } +step "s0_xid" { SELECT 'assigned' FROM pg_current_xact_id(); } step "s0_commit" { COMMIT; } session "s1" @@ -54,9 +56,25 @@ permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s # containing catalog changes. permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_insert2" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes" +# XXX: This scenario simulates the execution of both the first permutation and the +# part of the second permutation in the one scenario. +# +# The second from the last decoding restarts from the RUNNING_XACTS record that has +# one running transaction and is emitted during the second checkpoint. Then it setups +# the space and the count for the initial running transactions. +# +# The problem was that when it finishes the allocated, it forgot to reset them. The +# allocated space is freed along with its memory context but the counter is not +# reset. After that, since we recreate the replication with the same name by another +# session, the slot's restart_lsn is re-initialized to a new LSN and restarting the +# decoding from there (the last decoding) doesn't need to restore the snapshot. The +# session ends up using unreset data. +permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_xid" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes" "s1_checkpoint" "s0_drop" "s0_init" "s1_checkpoint" "s1_get_changes" + # The last decoding restarts from the first checkpoint and adds invalidation # messages generated by "s0_truncate" to the subtransaction. While # processing the commit record for the top-level transaction, we decide # to skip this xact but ensure that corresponding invalidation messages # get processed. permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_truncate" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes" + diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index cdf4aa01e9..d2fe2d222e 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -343,6 +343,9 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder, MemoryContextSwitchTo(oldcontext); + /* Make sure the initial running transactions array is empty */ + Assert(InitialRunningXacts == NULL && NInitialRunningXacts == 0); + return builder; } @@ -363,6 +366,10 @@ FreeSnapshotBuilder(SnapBuild *builder) /* other resources are deallocated via memory context reset */ MemoryContextDelete(context); + + /* InitialRunningXacts is freed along with the context */ + InitialRunningXacts = NULL; + NInitialRunningXacts = 0; } /*