Thread: Fix slot's xmin advancement and subxact's lost snapshots in decoding.
Hello,

I've discovered a couple of bugs in logical decoding code, both leading
to incorrect decoding results in somewhat rare cases. First, xmin of
slots is advanced too early. This affects the results only when
interlocking allows to perform DDL concurrently with looking at the
schema. In fact, I was not aware about such DDL until at
https://www.postgresql.org/message-id/flat/87tvu0p0jm.fsf%40ars-thinkpad#87tvu0p0jm.fsf@ars-thinkpad
I raised this question and Andres pointed out ALTER of composite
types. Probably there are others, I am not sure; it would be interesting
to know them.

Another problem is that new snapshots are never queued to known
subxacts. It means decoding results can be wrong if toplevel doesn't
write anything while subxact does.

Please see detailed description of the issues, tests which reproduce
them and fixes in the attached patch.

--
Arseny Sher
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

From 0d03ef172a29ce64a6c5c26f484f0f3186895125 Mon Sep 17 00:00:00 2001
From: Arseny Sher <sher-ars@yandex.ru>
Date: Sat, 7 Apr 2018 08:22:30 +0300
Subject: [PATCH] Fix slot's xmin advancement and subxact's lost snapshots in decoding.

There are two somewhat related bugs here.

First, xmin of logical slots was advanced too early. During
xl_running_xacts processing, xmin of the slot was set to the oldest
running xid in the record, which is wrong: snapshots which will be used
for xacts not yet replayed at this moment might consider older xacts as
running too. The problem wasn't noticed earlier because DDL which allows
deleting a tuple (setting xmax) while some other not-yet-committed xact
looks at it is pretty rare, if not unique: e.g. all forms of ALTER TABLE
which change schema acquire an ACCESS EXCLUSIVE lock conflicting with any
inserts. The included test case uses ALTER of a composite type, which
doesn't have such interlocking.

To deal with this, we must be able to quickly retrieve the oldest xmin
(oldest running xid among all assigned snapshots) from ReorderBuffer. The
proposed fix adds yet another list of ReorderBufferTXNs to it, where
xacts are sorted by LSN of the base snapshot, because

 * Plain usage of the current ReorderBufferGetOldestTXN (to get the base
   snapshot's xmin) is wrong, because order by LSN of the first record is
   *not* the same as order by LSN of the record on which the base snapshot
   was assigned: many record types don't need the snapshot, e.g.
   xl_xact_assignment. We could remember snapbuilder's xmin during
   processing of the first record, but that's too conservative and we
   would keep more tuples from vacuuming than needed.
 * On the other hand, we can't fully replace the existing order by LSN of
   the first change with LSN of the base snapshot, because if we do that,
   WAL records which didn't force receipt of the base snap might be
   recycled before we replay the xact, as ReorderBufferGetOldestTXN
   determines which LSN still must be kept.

The second issue concerns subxacts. Currently
SnapBuildDistributeNewCatalogSnapshot never queues a snapshot to a known
subxact. It means that if the toplevel doesn't have a base snapshot
(never did writes), there was an assignment record (the subxact is known)
and the subxact is writing, no new snapshots are being queued. Probably
the simplest fix would be to employ the freshly baked
by_base_snapshot_lsn list and just distribute the snapshot to all xacts
with a base snapshot, whether they are subxacts or not. However, in this
case we would queue an unnecessary snapshot to all already known
subxacts. To avoid this, in this patch the base snapshot of a known
subxact is immediately passed to the toplevel as soon as we learn the
kinship / base snap assigned -- we have to do that anyway in
ReorderBufferCommitChild; new snaps are distributed only to top-level
xacts (or unknown subxacts) as before.

Also, minor memory leak is fixed: counter of toplevel's old base snapshot was not decremented when snap has been passed from child. --- contrib/test_decoding/Makefile | 3 +- contrib/test_decoding/expected/oldest_xmin.out | 27 +++ .../test_decoding/expected/subxact_snap_pass.out | 49 ++++++ contrib/test_decoding/specs/oldest_xmin.spec | 37 +++++ contrib/test_decoding/specs/subxact_snap_pass.spec | 42 +++++ src/backend/replication/logical/reorderbuffer.c | 183 ++++++++++++++------- src/backend/replication/logical/snapbuild.c | 15 +- src/include/replication/reorderbuffer.h | 15 +- 8 files changed, 304 insertions(+), 67 deletions(-) create mode 100644 contrib/test_decoding/expected/oldest_xmin.out create mode 100644 contrib/test_decoding/expected/subxact_snap_pass.out create mode 100644 contrib/test_decoding/specs/oldest_xmin.spec create mode 100644 contrib/test_decoding/specs/subxact_snap_pass.spec diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 6c18189d9d..c696288d67 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -50,7 +50,8 @@ regresscheck-install-force: | submake-regress submake-test_decoding temp-install $(pg_regress_installcheck) \ $(REGRESSCHECKS) -ISOLATIONCHECKS=mxact delayed_startup ondisk_startup concurrent_ddl_dml +ISOLATIONCHECKS=mxact delayed_startup ondisk_startup concurrent_ddl_dml \ + oldest_xmin subxact_snap_pass isolationcheck: | submake-isolation submake-test_decoding temp-install $(pg_isolation_regress_check) \ diff --git a/contrib/test_decoding/expected/oldest_xmin.out b/contrib/test_decoding/expected/oldest_xmin.out new file mode 100644 index 0000000000..d09342c4be --- /dev/null +++ b/contrib/test_decoding/expected/oldest_xmin.out @@ -0,0 +1,27 @@ +Parsed test spec with 2 sessions + +starting permutation: s0_begin s0_getxid s1_begin s1_insert s0_alter s0_commit s0_checkpoint s0_get_changes s1_commit s0_vacuums0_get_changes +step s0_begin: BEGIN; +step s0_getxid: SELECT 
txid_current() IS NULL; +?column? + +f +step s1_begin: BEGIN; +step s1_insert: INSERT INTO harvest VALUES ((1, 2, 3)); +step s0_alter: ALTER TYPE basket DROP ATTRIBUTE mangos; +step s0_commit: COMMIT; +step s0_checkpoint: CHECKPOINT; +step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts','1'); +data + +step s1_commit: COMMIT; +step s0_vacuum: VACUUM FULL; +step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts','1'); +data + +BEGIN +table public.harvest: INSERT: fruits[basket]:'(1,2,3)' +COMMIT +?column? + +stop diff --git a/contrib/test_decoding/expected/subxact_snap_pass.out b/contrib/test_decoding/expected/subxact_snap_pass.out new file mode 100644 index 0000000000..87bed03f76 --- /dev/null +++ b/contrib/test_decoding/expected/subxact_snap_pass.out @@ -0,0 +1,49 @@ +Parsed test spec with 2 sessions + +starting permutation: s0_begin s0_begin_sub0 s0_log_assignment s0_sub_get_base_snap s1_produce_new_snap s0_insert s0_end_sub0s0_commit s0_get_changes +step s0_begin: BEGIN; +step s0_begin_sub0: SAVEPOINT s0; +step s0_log_assignment: SELECT txid_current() IS NULL; +?column? + +f +step s0_sub_get_base_snap: INSERT INTO dummy VALUES (0); +step s1_produce_new_snap: ALTER TABLE harvest ADD COLUMN mangos int; +step s0_insert: INSERT INTO harvest VALUES (1, 2, 3); +step s0_end_sub0: RELEASE SAVEPOINT s0; +step s0_commit: COMMIT; +step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts','1'); +data + +BEGIN +table public.dummy: INSERT: i[integer]:0 +table public.harvest: INSERT: apples[integer]:1 pears[integer]:2 mangos[integer]:3 +COMMIT +?column? 
+ +stop + +starting permutation: s0_begin s0_begin_sub0 s0_log_assignment s0_begin_sub1 s0_sub_get_base_snap s1_produce_new_snap s0_inserts0_end_sub1 s0_end_sub0 s0_commit s0_get_changes +step s0_begin: BEGIN; +step s0_begin_sub0: SAVEPOINT s0; +step s0_log_assignment: SELECT txid_current() IS NULL; +?column? + +f +step s0_begin_sub1: SAVEPOINT s1; +step s0_sub_get_base_snap: INSERT INTO dummy VALUES (0); +step s1_produce_new_snap: ALTER TABLE harvest ADD COLUMN mangos int; +step s0_insert: INSERT INTO harvest VALUES (1, 2, 3); +step s0_end_sub1: RELEASE SAVEPOINT s1; +step s0_end_sub0: RELEASE SAVEPOINT s0; +step s0_commit: COMMIT; +step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts','1'); +data + +BEGIN +table public.dummy: INSERT: i[integer]:0 +table public.harvest: INSERT: apples[integer]:1 pears[integer]:2 mangos[integer]:3 +COMMIT +?column? + +stop diff --git a/contrib/test_decoding/specs/oldest_xmin.spec b/contrib/test_decoding/specs/oldest_xmin.spec new file mode 100644 index 0000000000..4f8af70aa2 --- /dev/null +++ b/contrib/test_decoding/specs/oldest_xmin.spec @@ -0,0 +1,37 @@ +# Test advancement of the slot's oldest xmin + +setup +{ + SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); -- must be first write inxact + DROP TYPE IF EXISTS basket; + CREATE TYPE basket AS (apples integer, pears integer, mangos integer); + DROP TABLE IF EXISTS harvest; + CREATE TABLE harvest(fruits basket); +} + +teardown +{ + DROP TABLE IF EXISTS harvest; + DROP TYPE IF EXISTS basket; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s0" +step "s0_begin" { BEGIN; } +step "s0_getxid" { SELECT txid_current() IS NULL; } +step "s0_alter" { ALTER TYPE basket DROP ATTRIBUTE mangos; } +step "s0_commit" { COMMIT; } +step "s0_checkpoint" { CHECKPOINT; } +step "s0_vacuum" { VACUUM FULL; } +step "s0_get_changes" { SELECT data FROM 
pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0','skip-empty-xacts', '1'); } + +session "s1" +step "s1_begin" { BEGIN; } +step "s1_insert" { INSERT INTO harvest VALUES ((1, 2, 3)); } +step "s1_commit" { COMMIT; } + +# Checkpoint with following get_changes forces to advance xmin. ALTER of a +# composite type is a rare form of DDL which allows T1 to see the tuple which +# will be removed (xmax set) before T1 commits. That is, interlocking doesn't +# forbid modifying catalog after someone read it (and didn't commit yet). +permutation "s0_begin" "s0_getxid" "s1_begin" "s1_insert" "s0_alter" "s0_commit" "s0_checkpoint" "s0_get_changes" "s1_commit""s0_vacuum" "s0_get_changes" diff --git a/contrib/test_decoding/specs/subxact_snap_pass.spec b/contrib/test_decoding/specs/subxact_snap_pass.spec new file mode 100644 index 0000000000..d2d88176ac --- /dev/null +++ b/contrib/test_decoding/specs/subxact_snap_pass.spec @@ -0,0 +1,42 @@ +# Test passing snapshot from subxact to top-level and receival of later snaps. 
+ +setup +{ + SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); -- must be first write inxact + DROP TABLE IF EXISTS dummy; + CREATE TABLE dummy(i int); + DROP TABLE IF EXISTS harvest; + CREATE TABLE harvest(apples int, pears int); +} + +teardown +{ + DROP TABLE IF EXISTS harvest; + DROP TABLE IF EXISTS dummy; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s0" +step "s0_begin" { BEGIN; } +step "s0_begin_sub0" { SAVEPOINT s0; } +step "s0_log_assignment" { SELECT txid_current() IS NULL; } +step "s0_begin_sub1" { SAVEPOINT s1; } +step "s0_sub_get_base_snap" { INSERT INTO dummy VALUES (0); } +step "s0_insert" { INSERT INTO harvest VALUES (1, 2, 3); } +step "s0_end_sub0" { RELEASE SAVEPOINT s0; } +step "s0_end_sub1" { RELEASE SAVEPOINT s1; } +step "s0_insert2" { INSERT INTO harvest VALUES (1, 2, 3, 4); } +step "s0_commit" { COMMIT; } +step "s0_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0','skip-empty-xacts', '1'); } + +session "s1" +step "s1_produce_new_snap" { ALTER TABLE harvest ADD COLUMN mangos int; } + +# start top-level without base snap, get base snap in subxact, then create new +# snap and make sure it is queued. +permutation "s0_begin" "s0_begin_sub0" "s0_log_assignment" "s0_sub_get_base_snap" "s1_produce_new_snap" "s0_insert" "s0_end_sub0""s0_commit" "s0_get_changes" + +# In previous test, we firstly associated subxact with xact and only then got +# base snap; now nest one more subxact to get snap first and only then (at +# commit) associate it with toplevel. 
+permutation "s0_begin" "s0_begin_sub0" "s0_log_assignment" "s0_begin_sub1" "s0_sub_get_base_snap" "s1_produce_new_snap""s0_insert" "s0_end_sub1" "s0_end_sub0" "s0_commit" "s0_get_changes" diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index b4016ed52b..1f99697dbb 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -165,6 +165,8 @@ static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top); +static void ReorderBufferPassBaseSnapshot(ReorderBufferTXN *txn, + ReorderBufferTXN *subtxn); static void AssertTXNLsnOrder(ReorderBuffer *rb); @@ -272,6 +274,8 @@ ReorderBufferAllocate(void) dlist_init(&buffer->toplevel_by_lsn); + dlist_init(&buffer->by_base_snapshot_lsn); + /* * Ensure there's no stale data from prior uses of this slot, in case some * prior exit avoided calling ReorderBufferFree. Failure to do this can @@ -470,7 +474,6 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool found; Assert(TransactionIdIsValid(xid)); - Assert(!create || lsn != InvalidXLogRecPtr); /* * Check the one-entry lookup cache first @@ -514,6 +517,7 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, { /* initialize the new entry, if creation was requested */ Assert(ent != NULL); + Assert(lsn != InvalidXLogRecPtr); ent->txn = ReorderBufferGetTXN(rb); ent->txn->xid = xid; @@ -659,6 +663,27 @@ ReorderBufferGetOldestTXN(ReorderBuffer *rb) return txn; } +/* + * Returns oldest possibly running xid from POV of snapshots used in xacts + * ReorderBuffer currently keeps, or InvalidTransactionId if there are none. + * Since snapshots are assigned monotonically, it is enough to check xmin of + * base snapshot with minimal base_snapshot_lsn. 
+ */ +TransactionId +ReorderBufferGetOldestXmin(ReorderBuffer *rb) +{ + ReorderBufferTXN *txn; + + if (dlist_is_empty(&rb->by_base_snapshot_lsn)) + return InvalidTransactionId; + + txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node, + &rb->by_base_snapshot_lsn); + + Assert(txn->base_snapshot != NULL); + return txn->base_snapshot->xmin; +} + void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr) { @@ -677,32 +702,71 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true); subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false); - if (new_sub) + if (new_top && !new_sub) + elog(ERROR, "subxact logged without previous toplevel record"); + + if (!new_sub) { - /* - * we assign subtransactions to top level transaction even if we don't - * have data for it yet, assignment records frequently reference xids - * that have not yet produced any records. Knowing those aren't top - * level xids allows us to make processing cheaper in some places. - */ - dlist_push_tail(&txn->subtxns, &subtxn->node); - txn->nsubtxns++; + if (subtxn->is_known_as_subxact) + /* already associated, nothing to do */ + return; + else + { + /* + * We have seen this subxact previously, but created it as a top. + * Now remove it from lsn order list of top-level transactions. 
+ */ + dlist_delete(&subtxn->node); + } } - else if (!subtxn->is_known_as_subxact) - { - subtxn->is_known_as_subxact = true; - Assert(subtxn->nsubtxns == 0); - /* remove from lsn order list of top-level transactions */ - dlist_delete(&subtxn->node); + subtxn->is_known_as_subxact = true; + subtxn->toplevel_xid = xid; + Assert(subtxn->nsubtxns == 0); - /* add to toplevel transaction */ - dlist_push_tail(&txn->subtxns, &subtxn->node); - txn->nsubtxns++; - } - else if (new_top) + /* add to subtransaction list */ + dlist_push_tail(&txn->subtxns, &subtxn->node); + txn->nsubtxns++; + + /* + * If subxact already got base snapshot, pass it to toplevel. + * This allows to queue further base snapshots only to toplevel. + */ + ReorderBufferPassBaseSnapshot(txn, subtxn); +} + +/* + * Pass base snapshot of subtxn to the toplevel txn if it doesn't have one, or + * subtxn's is earlier. That can happen if there are no changes in the + * toplevel transaction but in one of the child transactions (or first change + * in subxact has earlier lsn than first change in xact and we learned about + * xacts relationship only now). This is required for correct decoding as we + * replay all subxacts in one shot with toplevel. We do that as soon as we + * become aware of the kinship to avoid queueing extra snapshots to known + * subxacts -- only toplevel will receive further snaps. 
+ */ +static void +ReorderBufferPassBaseSnapshot(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn) +{ + if (subtxn->base_snapshot != NULL && + (txn->base_snapshot == NULL || + txn->base_snapshot_lsn > subtxn->base_snapshot_lsn)) { - elog(ERROR, "existing subxact assigned to unknown toplevel xact"); + /* toplevel has base snapshot, but it's too fresh; purge it */ + if (txn->base_snapshot != NULL) + { + SnapBuildSnapDecRefcount(txn->base_snapshot); + dlist_delete(&txn->base_snapshot_node); + } + + txn->base_snapshot = subtxn->base_snapshot; + txn->base_snapshot_lsn = subtxn->base_snapshot_lsn; + subtxn->base_snapshot = NULL; + subtxn->base_snapshot_lsn = InvalidXLogRecPtr; + /* We must preserve the correct place in by_base_snapshot_lsn list */ + dlist_insert_before(&subtxn->base_snapshot_node, &txn->base_snapshot_node); + /* Now remove subtxn from it */ + dlist_delete(&subtxn->base_snapshot_node); } } @@ -715,7 +779,6 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn) { - ReorderBufferTXN *txn; ReorderBufferTXN *subtxn; subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL, @@ -727,42 +790,14 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, if (!subtxn) return; - txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true); - - if (txn == NULL) - elog(ERROR, "subxact logged without previous toplevel record"); - - /* - * Pass our base snapshot to the parent transaction if it doesn't have - * one, or ours is older. That can happen if there are no changes in the - * toplevel transaction but in one of the child transactions. This allows - * the parent to simply use its base snapshot initially. 
- */ - if (subtxn->base_snapshot != NULL && - (txn->base_snapshot == NULL || - txn->base_snapshot_lsn > subtxn->base_snapshot_lsn)) - { - txn->base_snapshot = subtxn->base_snapshot; - txn->base_snapshot_lsn = subtxn->base_snapshot_lsn; - subtxn->base_snapshot = NULL; - subtxn->base_snapshot_lsn = InvalidXLogRecPtr; - } - subtxn->final_lsn = commit_lsn; subtxn->end_lsn = end_lsn; - if (!subtxn->is_known_as_subxact) - { - subtxn->is_known_as_subxact = true; - Assert(subtxn->nsubtxns == 0); - - /* remove from lsn order list of top-level transactions */ - dlist_delete(&subtxn->node); - - /* add to subtransaction list */ - dlist_push_tail(&txn->subtxns, &subtxn->node); - txn->nsubtxns++; - } + /* + * We already checked subtxn existence and no ReorderBufferTXNs will be + * created in this call, so lsn doesn't matter at all. + */ + ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr); } @@ -1091,6 +1126,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) SnapBuildSnapDecRefcount(txn->base_snapshot); txn->base_snapshot = NULL; txn->base_snapshot_lsn = InvalidXLogRecPtr; + dlist_delete(&txn->base_snapshot_node); } /* @@ -1840,6 +1876,9 @@ ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, * Setup the base snapshot of a transaction. The base snapshot is the snapshot * that is used to decode all changes until either this transaction modifies * the catalog or another catalog modifying transaction commits. + * If we know that xid is subxact, we just make sure that toplevel has + * the base snapshot. If toplevel already has some base snapshot, it + * definitely has also passed snap as its base or queued in its change queue. * * Needs to be called before any changes are added with * ReorderBufferQueueChange(). 
@@ -1855,8 +1894,18 @@ ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, Assert(txn->base_snapshot == NULL); Assert(snap != NULL); + if (txn->is_known_as_subxact) + { + /* Will operate on toplevel */ + txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false, + NULL, InvalidXLogRecPtr, false); + Assert(txn != NULL); + Assert(txn->base_snapshot == NULL); + } + txn->base_snapshot = snap; txn->base_snapshot_lsn = lsn; + dlist_push_tail(&rb->by_base_snapshot_lsn, &txn->base_snapshot_node); } /* @@ -1976,6 +2025,9 @@ ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid) /* * Have we already added the first snapshot? + * If xid is known subxact, we check toplevel, as known subxacts never keep + * base snapshot themselves. This allows to queue new snapshots only to the + * toplevel instead of duplicating them in change queues of subxacts. */ bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid) @@ -1989,12 +2041,19 @@ ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid) if (txn == NULL) return false; - /* - * TODO: It would be a nice improvement if we would check the toplevel - * transaction in subtransactions, but we'd need to keep track of a bit - * more state. - */ - return txn->base_snapshot != NULL; + if (txn->is_known_as_subxact) + { + ReorderBufferTXN *toplevel_txn; + + Assert(txn->base_snapshot == NULL); + toplevel_txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false, + NULL, InvalidXLogRecPtr, false); + Assert(toplevel_txn != NULL); + return toplevel_txn->base_snapshot != NULL; + + } + else + return txn->base_snapshot != NULL; } diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 4123cdebcf..7f344bdced 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -830,7 +830,7 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) * all. 
We'll add a snapshot when the first change gets queued. * * NB: This works correctly even for subtransactions because - * ReorderBufferCommitChild() takes care to pass the parent the base + * ReorderBufferAssignChild() takes care to pass the parent the base * snapshot, and while iterating the changequeue we'll get the change * from the subtxn. */ @@ -1094,6 +1094,7 @@ void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running) { ReorderBufferTXN *txn; + TransactionId oldest_xmin; /* * If we're not consistent yet, inspect the record to see whether it @@ -1132,9 +1133,17 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact /* * Increase shared memory limits, so vacuum can work on tuples we - * prevented from being pruned till now. + * prevented from being pruned till now. We ask reorderbuffer which + * minimal xid might still be running in snapshots of xacts currently + * being reordered. If rb doesn't have any xacts with snapshots, we need + * to care only about xids which will be considered as running by + * snapshots we will produce later, and min of them can't be less than + * oldest running xid in the record we are reading. */ - LogicalIncreaseXminForSlot(lsn, running->oldestRunningXid); + oldest_xmin = ReorderBufferGetOldestXmin(builder->reorder); + if (oldest_xmin == InvalidTransactionId) + oldest_xmin = running->oldestRunningXid; + LogicalIncreaseXminForSlot(lsn, oldest_xmin); /* * Also tell the slot where we can restart decoding from. We don't want to diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index aa430c843c..47178adf68 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -148,9 +148,10 @@ typedef struct ReorderBufferTXN bool has_catalog_changes; /* - * Do we know this is a subxact? + * Do we know this is a subxact? Xid of top-level, if yes. 
*/ bool is_known_as_subxact; + TransactionId toplevel_xid; /* * LSN of the first data carrying, WAL record with knowledge about this @@ -200,6 +201,7 @@ typedef struct ReorderBufferTXN */ Snapshot base_snapshot; XLogRecPtr base_snapshot_lsn; + dlist_node base_snapshot_node; /* position in by_base_snapshot_lsn list */ /* * How many ReorderBufferChange's do we have in this txn. @@ -317,6 +319,16 @@ struct ReorderBuffer dlist_head toplevel_by_lsn; /* + * Transactions (or subxacts) that have base snapshot, ordered by LSN of + * the first record which forced us to set the base snapshot. Used for + * advancing slot's xmin: xmin of the first snapshot defines which xid we + * still need to consider as running. This is not the same as + * toplevel_by_lsn: we set snapshot only on data-carrying record, e.g. + * heap insert/update/delete, not just the first. + */ + dlist_head by_base_snapshot_lsn; + + /* * one-entry sized cache for by_txn. Very frequently the same txn gets * looked up over and over again. */ @@ -400,6 +412,7 @@ bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid); ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *); +TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb); void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr); -- 2.11.0
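
For readers following the thread, the xmin selection described in the commit message can be sketched in isolation. This is a minimal sketch under stated assumptions, not the patch's code: `SketchTxn`, `SketchXid`, `sketch_get_oldest_xmin` and `sketch_slot_xmin_candidate` are hypothetical names, a plain singly linked list stands in for the `by_base_snapshot_lsn` dlist, and 0 stands in for `InvalidTransactionId`. It only shows the idea that the head of a list ordered by base snapshot LSN carries the oldest xmin, with the running-xacts record's oldest running xid used as a fallback when no base snapshots are kept.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for TransactionId / InvalidTransactionId. */
typedef uint32_t SketchXid;
#define SKETCH_INVALID_XID ((SketchXid) 0)

/*
 * Hypothetical, simplified stand-in for a ReorderBufferTXN that already has
 * a base snapshot. Entries are linked in order of base_snapshot_lsn.
 */
typedef struct SketchTxn
{
    uint64_t    base_snapshot_lsn;   /* LSN at which the base snapshot was set */
    SketchXid   base_snapshot_xmin;  /* xmin of that base snapshot */
    struct SketchTxn *next;          /* next entry, with a later base_snapshot_lsn */
} SketchTxn;

/*
 * Oldest xid that some kept snapshot may still regard as running, or
 * SKETCH_INVALID_XID if no transaction has a base snapshot. Because the list
 * is ordered by base_snapshot_lsn, only the head needs to be inspected.
 */
SketchXid
sketch_get_oldest_xmin(const SketchTxn *by_base_snapshot_lsn)
{
    if (by_base_snapshot_lsn == NULL)
        return SKETCH_INVALID_XID;

    assert(by_base_snapshot_lsn->base_snapshot_xmin != SKETCH_INVALID_XID);
    return by_base_snapshot_lsn->base_snapshot_xmin;
}

/*
 * Value to offer as the slot's new xmin while processing a running-xacts
 * record: fall back to the record's oldest running xid only when no base
 * snapshots are kept.
 */
SketchXid
sketch_slot_xmin_candidate(const SketchTxn *by_base_snapshot_lsn,
                           SketchXid oldest_running_xid)
{
    SketchXid   xmin = sketch_get_oldest_xmin(by_base_snapshot_lsn);

    if (xmin == SKETCH_INVALID_XID)
        xmin = oldest_running_xid;
    return xmin;
}
```

With two kept transactions whose base snapshots have xmin 90 (earlier LSN) and 95 (later LSN), and a running-xacts record reporting 120 as the oldest running xid, the candidate is 90 rather than 120; with an empty list it is 120. The sketch never compares xids directly, so it sidesteps xid wraparound, which real code has to handle.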
(delicate ping)
On Sun, Apr 08, 2018 at 08:46:04AM +0300, Arseny Sher wrote:
> I've discovered a couple of bugs in logical decoding code, both leading
> to incorrect decoding results in somewhat rare cases. First, xmin of
> slots is advanced too early. This affects the results only when
> interlocking allows to perform DDL concurrently with looking at the
> schema. In fact, I was not aware about such DDL until at
> https://www.postgresql.org/message-id/flat/87tvu0p0jm.fsf%40ars-thinkpad#87tvu0p0jm.fsf@ars-thinkpad
> I raised this question and Andres pointed out ALTER of composite
> types. Probably there are others, I am not sure; it would be interesting
> to know them.
>
> Another problem is that new snapshots are never queued to known
> subxacts. It means decoding results can be wrong if toplevel doesn't
> write anything while subxact does.

Most people are recovering from the last commit fest, where many patches
have been discussed and dealt with. I have not looked in detail at your
patch so I cannot say whether it is legit or not, but for now I would
recommend to register this patch to the next commit fest under the
category "Bug Fixes" so as it does not fall into the cracks:
https://commitfest.postgresql.org/18/

I have added an entry to the open items in the section for older bugs:
https://wiki.postgresql.org/wiki/PostgreSQL_11_Open_Items#Older_Bugs
However this list tends to be... Er... Ignored.

> Please see detailed description of the issues, tests which reproduce
> them and fixes in the attached patch.

Those are always good things to have in a patch.
--
Michael
Michael Paquier <michael@paquier.xyz> writes:

> but for now I would recommend to register this patch to the next
> commit fest under the category "Bug Fixes" so as it does not fall into
> the cracks: https://commitfest.postgresql.org/18/
>
> I have added an entry to the open items in the section for older bugs:
> https://wiki.postgresql.org/wiki/PostgreSQL_11_Open_Items#Older_Bugs
> However this list tends to be... Er... Ignored.
>

Thank you, Michael. I have created the commitfest entry.

--
Arseny Sher
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
Arseny Sher <a.sher@postgrespro.ru> wrote:

> Please see detailed description of the issues, tests which reproduce
> them and fixes in the attached patch.

I've played with your tests and gdb for a while, both w/o and with your
patch. I think I can understand both problems. I could not invent simpler way
to fix them.

One comment about the coding: since you introduce a new transaction list and
it's sorted by LSN, I think you should make the function AssertTXNLsnOrder()
more generic and use it to check the by_base_snapshot_lsn list too. For
example call it after the list insertion and deletion in
ReorderBufferPassBaseSnapshot().

Also I think it makes no harm if you split the diff into two, although both
fixes use the by_base_snapshot_lsn field.

--
Antonin Houska
Cybertec Schönig & Schönig GmbH
Gröhrmühlgasse 26, A-2700 Wiener Neustadt
Web: https://www.cybertec-postgresql.com
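
The suggestion above, one ordering check reusable for the by_base_snapshot_lsn list, could look roughly like the following. This is only a sketch under stated assumptions, not the code from any version of the patch: `SketchSnapNode` and `sketch_lsn_order_ok` are hypothetical names, a plain singly linked list replaces the dlist, 0 plays the role of `InvalidXLogRecPtr`, and the check accepts non-decreasing LSNs (whether the real assertion should be strict is left open here).

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical stand-in for a list entry ordered by base snapshot LSN.
 * An LSN of 0 plays the role of InvalidXLogRecPtr in this sketch.
 */
typedef struct SketchSnapNode
{
    uint64_t    base_snapshot_lsn;
    struct SketchSnapNode *next;
} SketchSnapNode;

/*
 * Return 1 if every entry has a valid LSN and the LSNs never decrease
 * along the list, 0 otherwise. An empty list is trivially ordered.
 */
int
sketch_lsn_order_ok(const SketchSnapNode *head)
{
    uint64_t    prev_lsn = 0;
    const SketchSnapNode *cur;

    for (cur = head; cur != NULL; cur = cur->next)
    {
        /* every listed entry must carry a valid base snapshot LSN */
        if (cur->base_snapshot_lsn == 0)
            return 0;

        /* an entry must not precede its predecessor in LSN terms */
        if (prev_lsn != 0 && prev_lsn > cur->base_snapshot_lsn)
            return 0;

        prev_lsn = cur->base_snapshot_lsn;
    }
    return 1;
}
```

In an assert-enabled build such a check would typically be wrapped as `assert(sketch_lsn_order_ok(head))` and called right after each insertion into or deletion from the list, which is the spot the review comment points at.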
On 2018-Jun-11, Antonin Houska wrote:

> Arseny Sher <a.sher@postgrespro.ru> wrote:
> > Please see detailed description of the issues, tests which reproduce
> > them and fixes in the attached patch.
>
> I've played with your tests and gdb for a while, both w/o and with your
> patch. I think I can understand both problems. I could not invent simpler way
> to fix them.
>
> One comment about the coding: since you introduce a new transaction list and
> it's sorted by LSN, I think you should make the function AssertTXNLsnOrder()
> more generic and use it to check the by_base_snapshot_lsn list too. For
> example call it after the list insertion and deletion in
> ReorderBufferPassBaseSnapshot().

I've been going over this patch also, and I've made a few minor edits of
the patch and the existing code as I come to understand what it's all
about.

> Also I think it makes no harm if you split the diff into two, although both
> fixes use the by_base_snapshot_lsn field.

Please don't.

--
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
Hello,

Thank you for your time.

Alvaro Herrera <alvherre@2ndquadrant.com> writes:

> On 2018-Jun-11, Antonin Houska wrote:
>
>>
>> One comment about the coding: since you introduce a new transaction list and
>> it's sorted by LSN, I think you should make the function AssertTXNLsnOrder()
>> more generic and use it to check the by_base_snapshot_lsn list too. For
>> example call it after the list insertion and deletion in
>> ReorderBufferPassBaseSnapshot().

Ok, probably this makes sense. An updated patch is attached.

>> Also I think it makes no harm if you split the diff into two, although both
>> fixes use the by_base_snapshot_lsn field.
>
> Please don't.

Yeah, I don't think we should bother with that.

--
Arseny Sher
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

From 61b4b0a89c95f8912729c54c013b64033f88fccf Mon Sep 17 00:00:00 2001
From: Arseny Sher <sher-ars@yandex.ru>
Date: Thu, 21 Jun 2018 10:29:07 +0300
Subject: [PATCH] Fix slot's xmin advancement and subxact's lost snapshots in decoding

There are two somewhat related bugs here.

First, xmin of logical slots was advanced too early. During
xl_running_xacts processing, xmin of the slot was set to the oldest
running xid in the record, which is wrong: snapshots which will be used
for xacts not yet replayed at this moment might consider older xacts as
running too. The problem wasn't noticed earlier because DDL which allows
deleting a tuple (setting xmax) while some other not-yet-committed xact
looks at it is pretty rare, if not unique: e.g. all forms of ALTER TABLE
which change schema acquire an ACCESS EXCLUSIVE lock conflicting with any
inserts. The included test case uses ALTER of a composite type, which
doesn't have such interlocking.

To deal with this, we must be able to quickly retrieve the oldest xmin
(oldest running xid among all assigned snapshots) from ReorderBuffer. The
proposed fix adds yet another list of ReorderBufferTXNs to it, where
xacts are sorted by LSN of the base snapshot, because

 * Plain usage of the current ReorderBufferGetOldestTXN (to get the base
   snapshot's xmin) is wrong, because order by LSN of the first record is
   *not* the same as order by LSN of the record on which the base snapshot
   was assigned: many record types don't need the snapshot, e.g.
   xl_xact_assignment. We could remember snapbuilder's xmin during
   processing of the first record, but that's too conservative and we
   would keep more tuples from vacuuming than needed.
 * On the other hand, we can't fully replace the existing order by LSN of
   the first change with LSN of the base snapshot, because if we do that,
   WAL records which didn't force receipt of the base snap might be
   recycled before we replay the xact, as ReorderBufferGetOldestTXN
   determines which LSN still must be kept.

The second issue concerns subxacts. Currently
SnapBuildDistributeNewCatalogSnapshot never queues a snapshot to a known
subxact. It means that if the toplevel doesn't have a base snapshot
(never did writes), there was an assignment record (the subxact is known)
and the subxact is writing, no new snapshots are being queued. Probably
the simplest fix would be to employ the freshly baked
by_base_snapshot_lsn list and just distribute the snapshot to all xacts
with a base snapshot, whether they are subxacts or not. However, in this
case we would queue an unnecessary snapshot to all already known
subxacts. To avoid this, in this patch the base snapshot of a known
subxact is immediately passed to the toplevel as soon as we learn the
kinship / base snap assigned -- we have to do that anyway in
ReorderBufferCommitChild; new snaps are distributed only to top-level
xacts (or unknown subxacts) as before.

Also, a minor memory leak is fixed: the counter of the toplevel's old base
snapshot was not decremented when a snap was passed from the child.
--- contrib/test_decoding/Makefile | 3 +- contrib/test_decoding/expected/oldest_xmin.out | 27 +++ .../test_decoding/expected/subxact_snap_pass.out | 49 +++++ contrib/test_decoding/specs/oldest_xmin.spec | 37 ++++ contrib/test_decoding/specs/subxact_snap_pass.spec | 42 +++++ src/backend/replication/logical/reorderbuffer.c | 208 +++++++++++++++------ src/backend/replication/logical/snapbuild.c | 15 +- src/include/replication/reorderbuffer.h | 15 +- 8 files changed, 330 insertions(+), 66 deletions(-) create mode 100644 contrib/test_decoding/expected/oldest_xmin.out create mode 100644 contrib/test_decoding/expected/subxact_snap_pass.out create mode 100644 contrib/test_decoding/specs/oldest_xmin.spec create mode 100644 contrib/test_decoding/specs/subxact_snap_pass.spec diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 1d601d8144..05d7766cd5 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -50,7 +50,8 @@ regresscheck-install-force: | submake-regress submake-test_decoding temp-install $(pg_regress_installcheck) \ $(REGRESSCHECKS) -ISOLATIONCHECKS=mxact delayed_startup ondisk_startup concurrent_ddl_dml +ISOLATIONCHECKS=mxact delayed_startup ondisk_startup concurrent_ddl_dml \ + oldest_xmin subxact_snap_pass isolationcheck: | submake-isolation submake-test_decoding temp-install $(pg_isolation_regress_check) \ diff --git a/contrib/test_decoding/expected/oldest_xmin.out b/contrib/test_decoding/expected/oldest_xmin.out new file mode 100644 index 0000000000..d09342c4be --- /dev/null +++ b/contrib/test_decoding/expected/oldest_xmin.out @@ -0,0 +1,27 @@ +Parsed test spec with 2 sessions + +starting permutation: s0_begin s0_getxid s1_begin s1_insert s0_alter s0_commit s0_checkpoint s0_get_changes s1_commit s0_vacuums0_get_changes +step s0_begin: BEGIN; +step s0_getxid: SELECT txid_current() IS NULL; +?column? 
+ +f +step s1_begin: BEGIN; +step s1_insert: INSERT INTO harvest VALUES ((1, 2, 3)); +step s0_alter: ALTER TYPE basket DROP ATTRIBUTE mangos; +step s0_commit: COMMIT; +step s0_checkpoint: CHECKPOINT; +step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts','1'); +data + +step s1_commit: COMMIT; +step s0_vacuum: VACUUM FULL; +step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts','1'); +data + +BEGIN +table public.harvest: INSERT: fruits[basket]:'(1,2,3)' +COMMIT +?column? + +stop diff --git a/contrib/test_decoding/expected/subxact_snap_pass.out b/contrib/test_decoding/expected/subxact_snap_pass.out new file mode 100644 index 0000000000..87bed03f76 --- /dev/null +++ b/contrib/test_decoding/expected/subxact_snap_pass.out @@ -0,0 +1,49 @@ +Parsed test spec with 2 sessions + +starting permutation: s0_begin s0_begin_sub0 s0_log_assignment s0_sub_get_base_snap s1_produce_new_snap s0_insert s0_end_sub0s0_commit s0_get_changes +step s0_begin: BEGIN; +step s0_begin_sub0: SAVEPOINT s0; +step s0_log_assignment: SELECT txid_current() IS NULL; +?column? + +f +step s0_sub_get_base_snap: INSERT INTO dummy VALUES (0); +step s1_produce_new_snap: ALTER TABLE harvest ADD COLUMN mangos int; +step s0_insert: INSERT INTO harvest VALUES (1, 2, 3); +step s0_end_sub0: RELEASE SAVEPOINT s0; +step s0_commit: COMMIT; +step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts','1'); +data + +BEGIN +table public.dummy: INSERT: i[integer]:0 +table public.harvest: INSERT: apples[integer]:1 pears[integer]:2 mangos[integer]:3 +COMMIT +?column? 
+ +stop + +starting permutation: s0_begin s0_begin_sub0 s0_log_assignment s0_begin_sub1 s0_sub_get_base_snap s1_produce_new_snap s0_inserts0_end_sub1 s0_end_sub0 s0_commit s0_get_changes +step s0_begin: BEGIN; +step s0_begin_sub0: SAVEPOINT s0; +step s0_log_assignment: SELECT txid_current() IS NULL; +?column? + +f +step s0_begin_sub1: SAVEPOINT s1; +step s0_sub_get_base_snap: INSERT INTO dummy VALUES (0); +step s1_produce_new_snap: ALTER TABLE harvest ADD COLUMN mangos int; +step s0_insert: INSERT INTO harvest VALUES (1, 2, 3); +step s0_end_sub1: RELEASE SAVEPOINT s1; +step s0_end_sub0: RELEASE SAVEPOINT s0; +step s0_commit: COMMIT; +step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts','1'); +data + +BEGIN +table public.dummy: INSERT: i[integer]:0 +table public.harvest: INSERT: apples[integer]:1 pears[integer]:2 mangos[integer]:3 +COMMIT +?column? + +stop diff --git a/contrib/test_decoding/specs/oldest_xmin.spec b/contrib/test_decoding/specs/oldest_xmin.spec new file mode 100644 index 0000000000..4f8af70aa2 --- /dev/null +++ b/contrib/test_decoding/specs/oldest_xmin.spec @@ -0,0 +1,37 @@ +# Test advancement of the slot's oldest xmin + +setup +{ + SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); -- must be first write inxact + DROP TYPE IF EXISTS basket; + CREATE TYPE basket AS (apples integer, pears integer, mangos integer); + DROP TABLE IF EXISTS harvest; + CREATE TABLE harvest(fruits basket); +} + +teardown +{ + DROP TABLE IF EXISTS harvest; + DROP TYPE IF EXISTS basket; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s0" +step "s0_begin" { BEGIN; } +step "s0_getxid" { SELECT txid_current() IS NULL; } +step "s0_alter" { ALTER TYPE basket DROP ATTRIBUTE mangos; } +step "s0_commit" { COMMIT; } +step "s0_checkpoint" { CHECKPOINT; } +step "s0_vacuum" { VACUUM FULL; } +step "s0_get_changes" { SELECT data FROM 
pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0','skip-empty-xacts', '1'); } + +session "s1" +step "s1_begin" { BEGIN; } +step "s1_insert" { INSERT INTO harvest VALUES ((1, 2, 3)); } +step "s1_commit" { COMMIT; } + +# Checkpoint with following get_changes forces to advance xmin. ALTER of a +# composite type is a rare form of DDL which allows T1 to see the tuple which +# will be removed (xmax set) before T1 commits. That is, interlocking doesn't +# forbid modifying catalog after someone read it (and didn't commit yet). +permutation "s0_begin" "s0_getxid" "s1_begin" "s1_insert" "s0_alter" "s0_commit" "s0_checkpoint" "s0_get_changes" "s1_commit""s0_vacuum" "s0_get_changes" diff --git a/contrib/test_decoding/specs/subxact_snap_pass.spec b/contrib/test_decoding/specs/subxact_snap_pass.spec new file mode 100644 index 0000000000..d2d88176ac --- /dev/null +++ b/contrib/test_decoding/specs/subxact_snap_pass.spec @@ -0,0 +1,42 @@ +# Test passing snapshot from subxact to top-level and receival of later snaps. 
+ +setup +{ + SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); -- must be first write inxact + DROP TABLE IF EXISTS dummy; + CREATE TABLE dummy(i int); + DROP TABLE IF EXISTS harvest; + CREATE TABLE harvest(apples int, pears int); +} + +teardown +{ + DROP TABLE IF EXISTS harvest; + DROP TABLE IF EXISTS dummy; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s0" +step "s0_begin" { BEGIN; } +step "s0_begin_sub0" { SAVEPOINT s0; } +step "s0_log_assignment" { SELECT txid_current() IS NULL; } +step "s0_begin_sub1" { SAVEPOINT s1; } +step "s0_sub_get_base_snap" { INSERT INTO dummy VALUES (0); } +step "s0_insert" { INSERT INTO harvest VALUES (1, 2, 3); } +step "s0_end_sub0" { RELEASE SAVEPOINT s0; } +step "s0_end_sub1" { RELEASE SAVEPOINT s1; } +step "s0_insert2" { INSERT INTO harvest VALUES (1, 2, 3, 4); } +step "s0_commit" { COMMIT; } +step "s0_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0','skip-empty-xacts', '1'); } + +session "s1" +step "s1_produce_new_snap" { ALTER TABLE harvest ADD COLUMN mangos int; } + +# start top-level without base snap, get base snap in subxact, then create new +# snap and make sure it is queued. +permutation "s0_begin" "s0_begin_sub0" "s0_log_assignment" "s0_sub_get_base_snap" "s1_produce_new_snap" "s0_insert" "s0_end_sub0""s0_commit" "s0_get_changes" + +# In previous test, we firstly associated subxact with xact and only then got +# base snap; now nest one more subxact to get snap first and only then (at +# commit) associate it with toplevel. 
+permutation "s0_begin" "s0_begin_sub0" "s0_log_assignment" "s0_begin_sub1" "s0_sub_get_base_snap" "s1_produce_new_snap""s0_insert" "s0_end_sub1" "s0_end_sub0" "s0_commit" "s0_get_changes" diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index e2f59bf580..7293286161 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -165,6 +165,8 @@ static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top); +static void ReorderBufferPassBaseSnapshot(ReorderBufferTXN *txn, + ReorderBufferTXN *subtxn); static void AssertTXNLsnOrder(ReorderBuffer *rb); @@ -272,6 +274,8 @@ ReorderBufferAllocate(void) dlist_init(&buffer->toplevel_by_lsn); + dlist_init(&buffer->by_base_snapshot_lsn); + /* * Ensure there's no stale data from prior uses of this slot, in case some * prior exit avoided calling ReorderBufferFree. 
Failure to do this can @@ -471,7 +475,6 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool found; Assert(TransactionIdIsValid(xid)); - Assert(!create || lsn != InvalidXLogRecPtr); /* * Check the one-entry lookup cache first @@ -515,6 +518,7 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, { /* initialize the new entry, if creation was requested */ Assert(ent != NULL); + Assert(lsn != InvalidXLogRecPtr); ent->txn = ReorderBufferGetTXN(rb); ent->txn->xid = xid; @@ -623,6 +627,7 @@ AssertTXNLsnOrder(ReorderBuffer *rb) #ifdef USE_ASSERT_CHECKING dlist_iter iter; XLogRecPtr prev_first_lsn = InvalidXLogRecPtr; + XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr; dlist_foreach(iter, &rb->toplevel_by_lsn) { @@ -640,6 +645,20 @@ AssertTXNLsnOrder(ReorderBuffer *rb) Assert(!cur_txn->is_known_as_subxact); prev_first_lsn = cur_txn->first_lsn; } + + dlist_foreach(iter, &rb->by_base_snapshot_lsn) + { + ReorderBufferTXN *cur_txn; + + cur_txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur); + Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr); + + if (prev_base_snap_lsn != InvalidXLogRecPtr) + Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn); + + Assert(!cur_txn->is_known_as_subxact); + prev_base_snap_lsn = cur_txn->base_snapshot_lsn; + } #endif } @@ -660,6 +679,29 @@ ReorderBufferGetOldestTXN(ReorderBuffer *rb) return txn; } +/* + * Returns oldest possibly running xid from POV of snapshots used in xacts + * ReorderBuffer currently keeps, or InvalidTransactionId if there are none. + * Since snapshots are assigned monotonically, it is enough to check xmin of + * base snapshot with minimal base_snapshot_lsn. 
+ */ +TransactionId +ReorderBufferGetOldestXmin(ReorderBuffer *rb) +{ + ReorderBufferTXN *txn; + + if (dlist_is_empty(&rb->by_base_snapshot_lsn)) + return InvalidTransactionId; + + AssertTXNLsnOrder(rb); + + txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node, + &rb->by_base_snapshot_lsn); + + Assert(txn->base_snapshot != NULL); + return txn->base_snapshot->xmin; +} + void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr) { @@ -678,32 +720,80 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true); subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false); - if (new_sub) + if (new_top && !new_sub) + elog(ERROR, "subxact logged without previous toplevel record"); + + if (!new_sub) { - /* - * we assign subtransactions to top level transaction even if we don't - * have data for it yet, assignment records frequently reference xids - * that have not yet produced any records. Knowing those aren't top - * level xids allows us to make processing cheaper in some places. - */ - dlist_push_tail(&txn->subtxns, &subtxn->node); - txn->nsubtxns++; + if (subtxn->is_known_as_subxact) + /* already associated, nothing to do */ + return; + else + { + /* + * We have seen this subxact previously, but created it as a top. + * Now remove it from lsn order list of top-level transactions. + */ + dlist_delete(&subtxn->node); + } } - else if (!subtxn->is_known_as_subxact) - { - subtxn->is_known_as_subxact = true; - Assert(subtxn->nsubtxns == 0); - /* remove from lsn order list of top-level transactions */ - dlist_delete(&subtxn->node); + subtxn->is_known_as_subxact = true; + subtxn->toplevel_xid = xid; + Assert(subtxn->nsubtxns == 0); + + /* add to subtransaction list */ + dlist_push_tail(&txn->subtxns, &subtxn->node); + txn->nsubtxns++; + + /* + * If subxact already got base snapshot, pass it to toplevel. + * This allows to queue further base snapshots only to toplevel. 
+ */ + ReorderBufferPassBaseSnapshot(txn, subtxn); + AssertTXNLsnOrder(rb); +} + +/* + * Pass base snapshot of subtxn to the toplevel txn if it doesn't have one, or + * subtxn's is earlier. That can happen if there are no changes in the + * toplevel transaction but in one of the child transactions (or first change + * in subxact has earlier lsn than first change in xact and we learned about + * xacts relationship only now). This is required for correct decoding as we + * replay all subxacts in one shot with toplevel. We do that as soon as we + * become aware of the kinship to avoid queueing extra snapshots to known + * subxacts -- only toplevel will receive further snaps. + */ +static void +ReorderBufferPassBaseSnapshot(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn) +{ + if (subtxn->base_snapshot != NULL && + (txn->base_snapshot == NULL || + txn->base_snapshot_lsn > subtxn->base_snapshot_lsn)) + { + /* toplevel has base snapshot, but it's too fresh; purge it */ + if (txn->base_snapshot != NULL) + { + SnapBuildSnapDecRefcount(txn->base_snapshot); + dlist_delete(&txn->base_snapshot_node); + } - /* add to toplevel transaction */ - dlist_push_tail(&txn->subtxns, &subtxn->node); - txn->nsubtxns++; + txn->base_snapshot = subtxn->base_snapshot; + txn->base_snapshot_lsn = subtxn->base_snapshot_lsn; + subtxn->base_snapshot = NULL; + subtxn->base_snapshot_lsn = InvalidXLogRecPtr; + /* We must preserve the correct place in by_base_snapshot_lsn list */ + dlist_insert_before(&subtxn->base_snapshot_node, &txn->base_snapshot_node); + /* Now remove subtxn from it */ + dlist_delete(&subtxn->base_snapshot_node); } - else if (new_top) + else if (subtxn->base_snapshot != NULL) { - elog(ERROR, "existing subxact assigned to unknown toplevel xact"); + /* Base snap of toplevel is fine, so subxact's one is not needed */ + SnapBuildSnapDecRefcount(subtxn->base_snapshot); + dlist_delete(&subtxn->base_snapshot_node); + subtxn->base_snapshot = NULL; + subtxn->base_snapshot_lsn = 
InvalidXLogRecPtr; } } @@ -716,7 +806,6 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn) { - ReorderBufferTXN *txn; ReorderBufferTXN *subtxn; subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL, @@ -728,42 +817,14 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, if (!subtxn) return; - txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true); - - if (txn == NULL) - elog(ERROR, "subxact logged without previous toplevel record"); - - /* - * Pass our base snapshot to the parent transaction if it doesn't have - * one, or ours is older. That can happen if there are no changes in the - * toplevel transaction but in one of the child transactions. This allows - * the parent to simply use its base snapshot initially. - */ - if (subtxn->base_snapshot != NULL && - (txn->base_snapshot == NULL || - txn->base_snapshot_lsn > subtxn->base_snapshot_lsn)) - { - txn->base_snapshot = subtxn->base_snapshot; - txn->base_snapshot_lsn = subtxn->base_snapshot_lsn; - subtxn->base_snapshot = NULL; - subtxn->base_snapshot_lsn = InvalidXLogRecPtr; - } - subtxn->final_lsn = commit_lsn; subtxn->end_lsn = end_lsn; - if (!subtxn->is_known_as_subxact) - { - subtxn->is_known_as_subxact = true; - Assert(subtxn->nsubtxns == 0); - - /* remove from lsn order list of top-level transactions */ - dlist_delete(&subtxn->node); - - /* add to subtransaction list */ - dlist_push_tail(&txn->subtxns, &subtxn->node); - txn->nsubtxns++; - } + /* + * We already checked subtxn existence and no ReorderBufferTXNs will be + * created in this call, so lsn doesn't matter at all. 
+ */ + ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr); } @@ -1092,6 +1153,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) SnapBuildSnapDecRefcount(txn->base_snapshot); txn->base_snapshot = NULL; txn->base_snapshot_lsn = InvalidXLogRecPtr; + dlist_delete(&txn->base_snapshot_node); } /* @@ -1873,6 +1935,9 @@ ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, * Setup the base snapshot of a transaction. The base snapshot is the snapshot * that is used to decode all changes until either this transaction modifies * the catalog or another catalog modifying transaction commits. + * If we know that xid is subxact, we just make sure that toplevel has + * the base snapshot. If toplevel already has some base snapshot, it + * definitely has also passed snap as its base or queued in its change queue. * * Needs to be called before any changes are added with * ReorderBufferQueueChange(). @@ -1888,8 +1953,19 @@ ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, Assert(txn->base_snapshot == NULL); Assert(snap != NULL); + if (txn->is_known_as_subxact) + { + /* Will operate on toplevel */ + txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false, + NULL, InvalidXLogRecPtr, false); + Assert(txn != NULL); + Assert(txn->base_snapshot == NULL); + } + txn->base_snapshot = snap; txn->base_snapshot_lsn = lsn; + dlist_push_tail(&rb->by_base_snapshot_lsn, &txn->base_snapshot_node); + AssertTXNLsnOrder(rb); } /* @@ -2009,6 +2085,9 @@ ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid) /* * Have we already added the first snapshot? + * If xid is known subxact, we check toplevel, as known subxacts never keep + * base snapshot themselves. This allows to queue new snapshots only to the + * toplevel instead of duplicating them in change queues of subxacts. 
*/ bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid) @@ -2022,12 +2101,19 @@ ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid) if (txn == NULL) return false; - /* - * TODO: It would be a nice improvement if we would check the toplevel - * transaction in subtransactions, but we'd need to keep track of a bit - * more state. - */ - return txn->base_snapshot != NULL; + if (txn->is_known_as_subxact) + { + ReorderBufferTXN *toplevel_txn; + + Assert(txn->base_snapshot == NULL); + toplevel_txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false, + NULL, InvalidXLogRecPtr, false); + Assert(toplevel_txn != NULL); + return toplevel_txn->base_snapshot != NULL; + + } + else + return txn->base_snapshot != NULL; } diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 4123cdebcf..7f344bdced 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -830,7 +830,7 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) * all. We'll add a snapshot when the first change gets queued. * * NB: This works correctly even for subtransactions because - * ReorderBufferCommitChild() takes care to pass the parent the base + * ReorderBufferAssignChild() takes care to pass the parent the base * snapshot, and while iterating the changequeue we'll get the change * from the subtxn. */ @@ -1094,6 +1094,7 @@ void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running) { ReorderBufferTXN *txn; + TransactionId oldest_xmin; /* * If we're not consistent yet, inspect the record to see whether it @@ -1132,9 +1133,17 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact /* * Increase shared memory limits, so vacuum can work on tuples we - * prevented from being pruned till now. + * prevented from being pruned till now. 
We ask reorderbuffer which + * minimal xid might still be running in snapshots of xacts currently + * being reordered. If rb doesn't have any xacts with snapshots, we need + * to care only about xids which will be considered as running by + * snapshots we will produce later, and min of them can't be less than + * oldest running xid in the record we are reading. */ - LogicalIncreaseXminForSlot(lsn, running->oldestRunningXid); + oldest_xmin = ReorderBufferGetOldestXmin(builder->reorder); + if (oldest_xmin == InvalidTransactionId) + oldest_xmin = running->oldestRunningXid; + LogicalIncreaseXminForSlot(lsn, oldest_xmin); /* * Also tell the slot where we can restart decoding from. We don't want to diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 1c7982958e..92c767181c 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -161,9 +161,10 @@ typedef struct ReorderBufferTXN bool has_catalog_changes; /* - * Do we know this is a subxact? + * Do we know this is a subxact? Xid of top-level, if yes. */ bool is_known_as_subxact; + TransactionId toplevel_xid; /* * LSN of the first data carrying, WAL record with knowledge about this @@ -213,6 +214,7 @@ typedef struct ReorderBufferTXN */ Snapshot base_snapshot; XLogRecPtr base_snapshot_lsn; + dlist_node base_snapshot_node; /* position in by_base_snapshot_lsn list */ /* * How many ReorderBufferChange's do we have in this txn. @@ -338,6 +340,16 @@ struct ReorderBuffer dlist_head toplevel_by_lsn; /* + * Transactions (or subxacts) that have base snapshot, ordered by LSN of + * the first record which forced us to set the base snapshot. Used for + * advancing slot's xmin: xmin of the first snapshot defines which xid we + * still need to consider as running. This is not the same as + * toplevel_by_lsn: we set snapshot only on data-carrying record, e.g. + * heap insert/update/delete, not just the first. 
+ */ + dlist_head by_base_snapshot_lsn; + + /* * one-entry sized cache for by_txn. Very frequently the same txn gets * looked up over and over again. */ @@ -422,6 +434,7 @@ bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid); ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *); +TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb); void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr); -- 2.11.0
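
For readers following the thread, the xmin-advancement rule described in the commit message above can be sketched in isolation. This is a simplified illustration under stated assumptions, not the patch's code: SketchTxn, SketchBuffer and the sketch_* functions are hypothetical stand-ins for ReorderBufferTXN, ReorderBuffer, ReorderBufferSetBaseSnapshot and ReorderBufferGetOldestXmin, and a plain singly linked list replaces the dlist. It shows only that the head of a list kept in base-snapshot-LSN order supplies the xmin, with a fallback to the record's oldest running xid when no transaction holds a base snapshot.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;
typedef uint64_t XLogRecPtr;
#define InvalidTransactionId ((TransactionId) 0)

/* Hypothetical, simplified stand-in for ReorderBufferTXN. */
typedef struct SketchTxn
{
	TransactionId base_snapshot_xmin;	/* xmin of the txn's base snapshot */
	XLogRecPtr	base_snapshot_lsn;		/* LSN at which it was assigned */
	struct SketchTxn *next;				/* next txn in base-snapshot-LSN order */
} SketchTxn;

/* Hypothetical stand-in for the by_base_snapshot_lsn list. */
typedef struct SketchBuffer
{
	SketchTxn  *head;
	SketchTxn  *tail;
} SketchBuffer;

/*
 * Record a base snapshot for txn. Base snapshots are assigned at increasing
 * LSNs, so appending at the tail keeps the list sorted.
 */
void
sketch_set_base_snapshot(SketchBuffer *rb, SketchTxn *txn,
						 TransactionId xmin, XLogRecPtr lsn)
{
	assert(rb->tail == NULL || rb->tail->base_snapshot_lsn < lsn);

	txn->base_snapshot_xmin = xmin;
	txn->base_snapshot_lsn = lsn;
	txn->next = NULL;

	if (rb->tail != NULL)
		rb->tail->next = txn;
	else
		rb->head = txn;
	rb->tail = txn;
}

/* xmin of the earliest base snapshot, or InvalidTransactionId if none. */
TransactionId
sketch_oldest_xmin(const SketchBuffer *rb)
{
	if (rb->head == NULL)
		return InvalidTransactionId;
	return rb->head->base_snapshot_xmin;
}

/*
 * Value to advance the slot's xmin to: the buffer's oldest xmin if any txn
 * holds a base snapshot, otherwise the oldest running xid from the record.
 */
TransactionId
sketch_xmin_for_slot(const SketchBuffer *rb, TransactionId oldest_running_xid)
{
	TransactionId oldest_xmin = sketch_oldest_xmin(rb);

	if (oldest_xmin == InvalidTransactionId)
		oldest_xmin = oldest_running_xid;
	return oldest_xmin;
}
```

Note that the sketch never compares xids with each other; it relies, as the patch comment does, on snapshots being assigned monotonically, so only the list head has to be inspected.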
Hello

Firstly -- this is top-notch detective work, kudos and thanks for the
patch and test cases. (I verified that both fail before the code fix.)

Here's a v3. I applied a lot of makeup in order to try to understand
what's going on. I *think* I have a grasp on the original code and your
bugfix, not terribly firm I admit.

Some comments

* you don't need to Assert that things are not NULL if you're
  immediately going to dereference them. The assert is there to make
  the code crash in case it's a NULL pointer, but the subsequent
  dereference is going to have the same effect, so the assert is
  redundant.

* I think setting base_snapshot to NULL in ReorderBufferCleanupTXN is
  pointless, since the struct is gonna be freed shortly afterwards.

* I rewrote many comments (both existing and some of the ones your patch
  adds), and added lots of comments where there were none.

* I'm a bit unsure about the comment atop ReorderBufferSetBaseSnapshot.
  Obviously, the bit within the #if 0/#endif I'm going to remove before
  push. I don't understand why it says "Needs to be called before any
  changes are added with ReorderBufferQueueChange"; but if you edit that
  function and add an assert that the base snapshot is set, it crashes
  pretty quickly in the test_decoding tests. (The supposedly bogus
  comment was there before your patch -- I'm not saying your comment
  addition is faulty.)

* I also noticed that we're doing subtxn cleanup one by one in both
  ReorderBufferAssignChild and ReorderBufferCommitChild, which means the
  top-level txn is sought in the hash table over and over, which seems a
  bit silly. Not this patch's problem to fix ...

--
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
Alvaro Herrera <alvherre@2ndquadrant.com> writes:

> Firstly -- this is top-notch detective work, kudos and thanks for the
> patch and test cases. (I verified that both fail before the code fix.)

Thank you!

> Here's a v3. I applied a lot of makeup in order to try to understand
> what's going on. I *think* I have a grasp on the original code and your
> bugfix, not terribly firm I admit.

Well, when I started digging into it, I found the logical decoding code
somewhat underdocumented. If you or any other committer would consider a
patch that improves the comments, I can prepare one.

> * you don't need to Assert that things are not NULL if you're
>   immediately going to dereference them.

Indeed.

> * I think setting base_snapshot to NULL in ReorderBufferCleanupTXN is
>   pointless, since the struct is gonna be freed shortly afterwards.

Yeah, but it is the general style of the original code, e.g. see the
/* cosmetic... */ comments. It probably makes the picture a bit more
aesthetic and consistent, a kind of final chord, though I don't mind
removing it.

> * I rewrote many comments (both existing and some of the ones your patch
>   adds), and added lots of comments where there were none.

Now the code is nicer, thanks.

> * I'm a bit unsure about the comment atop ReorderBufferSetBaseSnapshot.
>   Obviously, the bit within the #if 0/#endif I'm going to remove before
>   push.

It looks like you've started editing that bit and didn't finish.

>   I don't understand why it says "Needs to be called before any
>   changes are added with ReorderBufferQueueChange"; but if you edit that
>   function and add an assert that the base snapshot is set, it crashes
>   pretty quickly in the test_decoding tests. (The supposedly bogus
>   comment was there before your patch -- I'm not saying your comment
>   addition is faulty.)

That's because REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID changes are
queued whenever we have read that the xact has modified the catalog,
regardless of base snapshot existence. Even if there are no changes yet,
we will need it for correct visibility of the catalog, so I am inclined
to remove the assert and either remove the comment or rephrase it with
'any *data-carrying* changes'.

> * I also noticed that we're doing subtxn cleanup one by one in both
>   ReorderBufferAssignChild and ReorderBufferCommitChild, which means the
>   top-level txn is sought in the hash table over and over, which seems a
>   bit silly. Not this patch's problem to fix ...

There is already a one-entry cache in ReorderBufferTXNByXid. We could add
a 'don't cache me' flag to it and use it with subxacts, or simply pull
the lookup of the top-level txn out of the loops.

I'm fine with the rest of your edits. One more little comment:

@@ -543,9 +546,10 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 	ReorderBufferTXN *txn;

 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+	Assert(txn->base_snapshot != NULL || txn->is_known_as_subxact);

 	change->lsn = lsn;
-	Assert(InvalidXLogRecPtr != lsn);
+	Assert(!XLogRecPtrIsInvalid(lsn));
 	dlist_push_tail(&txn->changes, &change->node);
 	txn->nentries++;
 	txn->nentries_mem++;

Since we do that, probably we should replace all lsn validity checks
with XLogRecPtrIsInvalid?

--
Arseny Sher
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
On 2018-Jun-26, Arseny Sher wrote:

> Alvaro Herrera <alvherre@2ndquadrant.com> writes:

> > * I'm a bit unsure about the comment atop ReorderBufferSetBaseSnapshot.
> >   Obviously, the bit within the #if 0/#endif I'm going to remove before
> >   push.
>
> It looks like you've started editing that bit and didn't finish.

Yeah, I left those lines in place as a reminder that I need to finish
editing, wondering if there's any nuance I'm missing that I need to add
to the final version.

> >   I don't understand why it says "Needs to be called before any
> >   changes are added with ReorderBufferQueueChange"; but if you edit that
> >   function and add an assert that the base snapshot is set, it crashes
> >   pretty quickly in the test_decoding tests. (The supposedly bogus
> >   comment was there before your patch -- I'm not saying your comment
> >   addition is faulty.)
>
> That's because REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID changes are
> queued whenever we have read that xact has modified the catalog,
> regardless of base snapshot existence. Even if there are no changes yet,
> we will need it for correct visibility of catalog, so I am inclined to
> remove the assert and comment or rephrase the latter with 'any
> *data-carrying* changes'.

I'm struggling with this assert. I find that test_decoding passes tests
with this assertion in branch master, but fails in 9.4 - 10. Maybe I'm
running the tests wrong (no assertions in master?) but I don't see it.
It *should* fail ...

> > * I also noticed that we're doing subtxn cleanup one by one in both
> >   ReorderBufferAssignChild and ReorderBufferCommitChild, which means the
> >   top-level txn is sought in the hash table over and over, which seems a
> >   bit silly. Not this patch's problem to fix ...
>
> There is already one-entry cache in ReorderBufferTXNByXid.

That's useless, because we use two xids: first the parent; then the
subxact. Looking up the subxact evicts the parent from the one-entry
cache, so when we loop to process next subxact, the parent is no longer
cached :-)

> We could add 'don't cache me' flag to it and use it with subxacts, or
> simply pull top-level reorderbuffer out of loops.

Yeah, but that's an API change.

> I'm fine with the rest of your edits. One more little comment:
>
> @@ -543,9 +546,10 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
>  	ReorderBufferTXN *txn;
>
>  	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
> +	Assert(txn->base_snapshot != NULL || txn->is_known_as_subxact);
>
>  	change->lsn = lsn;
> -	Assert(InvalidXLogRecPtr != lsn);
> +	Assert(!XLogRecPtrIsInvalid(lsn));
>  	dlist_push_tail(&txn->changes, &change->node);
>  	txn->nentries++;
>  	txn->nentries_mem++;
>
> Since we do that, probably we should replace all lsn validity checks
> with XLogRecPtrIsInvalid?

I reverted this back to how it was originally. We can change it as a
style item later in all places where it appears (branch master only),
but modifying only one in a backpatched commit seems poor practice.

--
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
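
The cache-eviction point above can be made concrete with a toy model. This is only a sketch under stated assumptions: SketchCache and the sketch_* functions are hypothetical and model nothing but a one-entry cache in front of a lookup counter; the real ReorderBufferTXNByXid is more involved. It counts how many lookups fall through to the hash table when the parent is looked up inside the per-subxact loop versus once before it.

```c
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Hypothetical one-entry cache in front of a hash table. */
typedef struct SketchCache
{
	TransactionId cached_xid;	/* 0 means "nothing cached" */
	unsigned	hash_lookups;	/* lookups that missed the cache */
} SketchCache;

/* Look up xid: a cache hit is free, a miss probes the "hash table". */
void
sketch_lookup(SketchCache *cache, TransactionId xid)
{
	if (cache->cached_xid == xid)
		return;
	cache->hash_lookups++;
	cache->cached_xid = xid;	/* the new entry evicts the old one */
}

/* Parent is looked up on every iteration, alternating with the subxact. */
unsigned
sketch_lookups_in_loop(TransactionId parent,
					   const TransactionId *subxids, size_t nsubxids)
{
	SketchCache cache = {0, 0};
	size_t		i;

	for (i = 0; i < nsubxids; i++)
	{
		sketch_lookup(&cache, parent);
		sketch_lookup(&cache, subxids[i]);
	}
	return cache.hash_lookups;
}

/* Parent is looked up once, before the loop. */
unsigned
sketch_lookups_hoisted(TransactionId parent,
					   const TransactionId *subxids, size_t nsubxids)
{
	SketchCache cache = {0, 0};
	size_t		i;

	sketch_lookup(&cache, parent);
	for (i = 0; i < nsubxids; i++)
		sketch_lookup(&cache, subxids[i]);
	return cache.hash_lookups;
}
```

With a parent and three distinct subxacts, the in-loop variant misses the cache on all six lookups, while the hoisted variant needs four; in this model the one-entry cache never helps the alternating pattern.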
Alvaro Herrera <alvherre@2ndquadrant.com> writes:

> I'm struggling with this assert. I find that test_decoding passes tests
> with this assertion in branch master, but fails in 9.4 - 10. Maybe I'm
> running the tests wrong (no assertions in master?) but I don't see it.
> It *should* fail ...

Your v3 patch fails for me on freshest master (4d54543efa) in exactly
this assert, so it looks like there is something wrong in your setup.

--
Arseny Sher
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
Pushed this to 9.4 - master after some more tinkering.

It occurred to me that it might be better to have
ReorderBufferSetBaseSnapshot do the IncrRefCount instead of expecting
the caller to do it. But I wouldn't backpatch that change, so I
refrained.

Thanks for the patch.

--
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services