From 946862e2a4dbfd91ac6802c2e8da104dce81c43a Mon Sep 17 00:00:00 2001 From: Mikhail Nikalayeu Date: Tue, 2 Sep 2025 11:30:55 +0200 Subject: [PATCH v21 6/6] Preserve visibility information of the concurrent data changes. As explained in the commit message of the preceding patch of the series, the data changes done by applications while REPACK CONCURRENTLY is copying the table contents to a new file are decoded from WAL and eventually also applied to the new file. To reduce the complexity a little bit, the preceding patch uses the current transaction (i.e. transaction opened by the REPACK command) to execute those INSERT, UPDATE and DELETE commands. However, REPACK is not expected to change visibility of tuples. Therefore, this patch fixes the handling of the "concurrent data changes". It ensures that tuples written into the new table have the same XID and command ID (CID) as they had in the old table. To "replay" an UPDATE or DELETE command on the new table, we use SnapshotSelf to find the last alive version of the tuple and update it, stamping it with the XID of the original transaction. It is safe because: * all transactions we are replaying are committed * the apply worker is working without any concurrent modifiers of the table As long as we preserve the tuple visibility information (which includes XID), it's important to avoid logical decoding of the WAL generated by DMLs on the new table: the logical decoding subsystem probably does not expect that the incoming WAL records contain XIDs of already decoded transactions. (And of course, repeated decoding would be wasted effort.) Author: Antonin Houska with changes from Mikhail Nikalayeu 'all'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; + +use Test::More; + +my $node; + +# +# Test set-up +# +$node = PostgreSQL::Test::Cluster->new('CIC_test'); +$node->init; +$node->append_conf('postgresql.conf', + 'lock_timeout = ' . 
(1000 * $PostgreSQL::Test::Utils::timeout_default)); +$node->append_conf( + 'postgresql.conf', qq( +wal_level = logical +)); +$node->start; +$node->safe_psql('postgres', q(CREATE TABLE tbl1(i int PRIMARY KEY, j int))); +$node->safe_psql('postgres', q(CREATE TABLE tbl2(i int PRIMARY KEY, j int))); + + +# Insert 100 rows into tbl1 +$node->safe_psql('postgres', q( + INSERT INTO tbl1 SELECT i, i % 100 FROM generate_series(1,100) i +)); + +# Insert 100 rows into tbl2 +$node->safe_psql('postgres', q( + INSERT INTO tbl2 SELECT i, i % 100 FROM generate_series(1,100) i +)); + + +# Create a helper function that reports a mismatching row via NOTICE +$node->safe_psql('postgres', q( + CREATE OR REPLACE FUNCTION log_raise(i int, j1 int, j2 int) RETURNS VOID AS $$ + BEGIN + RAISE NOTICE 'ERROR i=% j1=% j2=%', i, j1, j2; + END;$$ LANGUAGE plpgsql; +)); + +$node->safe_psql('postgres', q(CREATE UNLOGGED SEQUENCE in_row_rebuild START 1 INCREMENT 1;)); +$node->safe_psql('postgres', q(SELECT nextval('in_row_rebuild');)); + + +$node->pgbench( +'--no-vacuum --client=10 --jobs=4 --exit-on-abort --transactions=2500', +0, +[qr{actually processed}], +[qr{^$}], +'concurrent operations with REINDEX/CREATE INDEX CONCURRENTLY', +{ + 'concurrent_ops' => q( + SELECT pg_try_advisory_lock(42)::integer AS gotlock \gset + \if :gotlock + SELECT nextval('in_row_rebuild') AS last_value \gset + \if :last_value = 2 + REPACK (CONCURRENTLY) tbl1 USING INDEX tbl1_pkey; + \sleep 10 ms + REPACK (CONCURRENTLY) tbl2 USING INDEX tbl2_pkey; + \sleep 10 ms + \endif + SELECT pg_advisory_unlock(42); + \else + \set num random(1, 100) + BEGIN; + UPDATE tbl1 SET j = j + 1 WHERE i = :num; + \sleep 1 ms + UPDATE tbl1 SET j = j + 2 WHERE i = :num; + \sleep 1 ms + UPDATE tbl1 SET j = j + 3 WHERE i = :num; + \sleep 1 ms + UPDATE tbl1 SET j = j + 4 WHERE i = :num; + \sleep 1 ms + + UPDATE tbl2 SET j = j + 1 WHERE i = :num; + \sleep 1 ms + UPDATE tbl2 SET j = j + 2 WHERE i = :num; + \sleep 1 ms + UPDATE tbl2 SET j = j + 3 WHERE i = :num; + \sleep 1 ms + UPDATE tbl2 
SET j = j + 4 WHERE i = :num; + + COMMIT; + SELECT setval('in_row_rebuild', 1); + + BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; + SELECT COALESCE(SUM(j), 0) AS t1 FROM tbl1 WHERE i = :num \gset p_ + \sleep 10 ms + SELECT COALESCE(SUM(j), 0) AS t2 FROM tbl2 WHERE i = :num \gset p_ + \if :p_t1 != :p_t2 + COMMIT; + SELECT log_raise(tbl1.i, tbl1.j, tbl2.j) FROM tbl1 LEFT OUTER JOIN tbl2 ON tbl1.i = tbl2.i WHERE tbl1.j != tbl2.j; + \sleep 10 ms + SELECT log_raise(tbl1.i, tbl1.j, tbl2.j) FROM tbl1 LEFT OUTER JOIN tbl2 ON tbl1.i = tbl2.i WHERE tbl1.j != tbl2.j; + SELECT (:p_t1 + :p_t2) / 0; + \endif + + COMMIT; + \endif + ) +}); + +$node->stop; +done_testing(); diff --git a/doc/src/sgml/mvcc.sgml b/doc/src/sgml/mvcc.sgml index 0f5c34af542..049ee75a4ba 100644 --- a/doc/src/sgml/mvcc.sgml +++ b/doc/src/sgml/mvcc.sgml @@ -1833,17 +1833,15 @@ SELECT pg_advisory_lock(q.id) FROM Caveats - Some commands, currently only TRUNCATE, the - table-rewriting forms of ALTER - TABLE and REPACK with - the CONCURRENTLY option, are not + Some DDL commands, currently only TRUNCATE and the + table-rewriting forms of ALTER TABLE, are not MVCC-safe. This means that after the truncation or rewrite commits, the table will appear empty to concurrent transactions, if they are using a - snapshot taken before the command committed. This will only be an + snapshot taken before the DDL command committed. This will only be an issue for a transaction that did not access the table in question - before the command started — any transaction that has done so + before the DDL command started — any transaction that has done so would hold at least an ACCESS SHARE table lock, - which would block the truncating or rewriting command until that transaction completes. + which would block the DDL command until that transaction completes. 
So these commands will not cause any apparent inconsistency in the table contents for successive queries on the target table, but they could cause visible inconsistency between the contents of the target diff --git a/doc/src/sgml/ref/repack.sgml b/doc/src/sgml/ref/repack.sgml index ff5ce48de55..271923a5a60 100644 --- a/doc/src/sgml/ref/repack.sgml +++ b/doc/src/sgml/ref/repack.sgml @@ -292,15 +292,6 @@ REPACK [ ( option [, ...] ) ] [ - - - - REPACK with the CONCURRENTLY - option is not MVCC-safe, see for - details. - - - diff --git a/src/backend/access/common/toast_internals.c b/src/backend/access/common/toast_internals.c index a1d0eed8953..586eb42a137 100644 --- a/src/backend/access/common/toast_internals.c +++ b/src/backend/access/common/toast_internals.c @@ -320,7 +320,8 @@ toast_save_datum(Relation rel, Datum value, memcpy(VARDATA(&chunk_data), data_p, chunk_size); toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull); - heap_insert(toastrel, toasttup, mycid, options, NULL); + heap_insert(toastrel, toasttup, GetCurrentTransactionId(), mycid, + options, NULL); /* * Create the index entry. We cheat a little here by not using diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index f9a4fe3faed..45da5902de0 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -2070,7 +2070,7 @@ ReleaseBulkInsertStatePin(BulkInsertState bistate) /* * heap_insert - insert tuple into a heap * - * The new tuple is stamped with current transaction ID and the specified + * The new tuple is stamped with specified transaction ID and the specified * command ID. * * See table_tuple_insert for comments about most of the input flags, except @@ -2086,15 +2086,16 @@ ReleaseBulkInsertStatePin(BulkInsertState bistate) * reflected into *tup. 
*/ void -heap_insert(Relation relation, HeapTuple tup, CommandId cid, - int options, BulkInsertState bistate) +heap_insert(Relation relation, HeapTuple tup, TransactionId xid, + CommandId cid, int options, BulkInsertState bistate) { - TransactionId xid = GetCurrentTransactionId(); HeapTuple heaptup; Buffer buffer; Buffer vmbuffer = InvalidBuffer; bool all_visible_cleared = false; + Assert(TransactionIdIsValid(xid)); + /* Cheap, simplistic check that the tuple matches the rel's rowtype. */ Assert(HeapTupleHeaderGetNatts(tup->t_data) <= RelationGetNumberOfAttributes(relation)); @@ -2176,8 +2177,15 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, /* * If this is a catalog, we need to transmit combo CIDs to properly * decode, so log that as well. + * + * HEAP_INSERT_NO_LOGICAL should be set when applying data changes + * done by other transactions during REPACK CONCURRENTLY. In such a + * case, the insertion should not be decoded at all - see + * heap_decode(). (It's also set by raw_heap_insert() for TOAST, but + * TOAST does not pass this test anyway.) 
*/ - if (RelationIsAccessibleInLogicalDecoding(relation)) + if ((options & HEAP_INSERT_NO_LOGICAL) == 0 && + RelationIsAccessibleInLogicalDecoding(relation)) log_heap_new_cid(relation, heaptup); /* @@ -2723,7 +2731,8 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, void simple_heap_insert(Relation relation, HeapTuple tup) { - heap_insert(relation, tup, GetCurrentCommandId(true), 0, NULL); + heap_insert(relation, tup, GetCurrentTransactionId(), + GetCurrentCommandId(true), 0, NULL); } /* @@ -2780,11 +2789,11 @@ xmax_infomask_changed(uint16 new_infomask, uint16 old_infomask) */ TM_Result heap_delete(Relation relation, ItemPointer tid, - CommandId cid, Snapshot crosscheck, bool wait, - TM_FailureData *tmfd, bool changingPart, bool wal_logical) + TransactionId xid, CommandId cid, Snapshot crosscheck, bool wait, + TM_FailureData *tmfd, bool changingPart, + bool wal_logical) { TM_Result result; - TransactionId xid = GetCurrentTransactionId(); ItemId lp; HeapTupleData tp; Page page; @@ -2801,6 +2810,7 @@ heap_delete(Relation relation, ItemPointer tid, bool old_key_copied = false; Assert(ItemPointerIsValid(tid)); + Assert(TransactionIdIsValid(xid)); AssertHasSnapshotForToast(relation); @@ -3217,7 +3227,7 @@ simple_heap_delete(Relation relation, ItemPointer tid) TM_Result result; TM_FailureData tmfd; - result = heap_delete(relation, tid, + result = heap_delete(relation, tid, GetCurrentTransactionId(), GetCurrentCommandId(true), InvalidSnapshot, true /* wait for commit */ , &tmfd, false, /* changingPart */ @@ -3260,12 +3270,11 @@ simple_heap_delete(Relation relation, ItemPointer tid) */ TM_Result heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, - CommandId cid, Snapshot crosscheck, bool wait, - TM_FailureData *tmfd, LockTupleMode *lockmode, + TransactionId xid, CommandId cid, Snapshot crosscheck, + bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode, TU_UpdateIndexes *update_indexes, bool wal_logical) { TM_Result result; - 
TransactionId xid = GetCurrentTransactionId(); Bitmapset *hot_attrs; Bitmapset *sum_attrs; Bitmapset *key_attrs; @@ -3305,6 +3314,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, infomask2_new_tuple; Assert(ItemPointerIsValid(otid)); + Assert(TransactionIdIsValid(xid)); /* Cheap, simplistic check that the tuple matches the rel's rowtype. */ Assert(HeapTupleHeaderGetNatts(newtup->t_data) <= @@ -4144,8 +4154,12 @@ l2: /* * For logical decoding we need combo CIDs to properly decode the * catalog. + * + * Like in heap_insert(), visibility is unchanged when called from + * VACUUM FULL / CLUSTER. */ - if (RelationIsAccessibleInLogicalDecoding(relation)) + if (wal_logical && + RelationIsAccessibleInLogicalDecoding(relation)) { log_heap_new_cid(relation, &oldtup); log_heap_new_cid(relation, heaptup); @@ -4511,7 +4525,7 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup, TM_FailureData tmfd; LockTupleMode lockmode; - result = heap_update(relation, otid, tup, + result = heap_update(relation, otid, tup, GetCurrentTransactionId(), GetCurrentCommandId(true), InvalidSnapshot, true /* wait for commit */ , &tmfd, &lockmode, update_indexes, @@ -5351,8 +5365,6 @@ compute_new_xmax_infomask(TransactionId xmax, uint16 old_infomask, uint16 new_infomask, new_infomask2; - Assert(TransactionIdIsCurrentTransactionId(add_to_xmax)); - l5: new_infomask = 0; new_infomask2 = 0; diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index d03084768e0..6733e5fdda6 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -253,7 +253,8 @@ heapam_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid, tuple->t_tableOid = slot->tts_tableOid; /* Perform the insertion, and copy the resulting ItemPointer */ - heap_insert(relation, tuple, cid, options, bistate); + heap_insert(relation, tuple, GetCurrentTransactionId(), cid, options, + bistate); 
ItemPointerCopy(&tuple->t_self, &slot->tts_tid); if (shouldFree) @@ -276,7 +277,8 @@ heapam_tuple_insert_speculative(Relation relation, TupleTableSlot *slot, options |= HEAP_INSERT_SPECULATIVE; /* Perform the insertion, and copy the resulting ItemPointer */ - heap_insert(relation, tuple, cid, options, bistate); + heap_insert(relation, tuple, GetCurrentTransactionId(), cid, options, + bistate); ItemPointerCopy(&tuple->t_self, &slot->tts_tid); if (shouldFree) @@ -310,8 +312,8 @@ heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid, * the storage itself is cleaning the dead tuples by itself, it is the * time to call the index tuple deletion also. */ - return heap_delete(relation, tid, cid, crosscheck, wait, tmfd, changingPart, - true); + return heap_delete(relation, tid, GetCurrentTransactionId(), cid, + crosscheck, wait, tmfd, changingPart, true); } @@ -329,7 +331,8 @@ heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot, slot->tts_tableOid = RelationGetRelid(relation); tuple->t_tableOid = slot->tts_tableOid; - result = heap_update(relation, otid, tuple, cid, crosscheck, wait, + result = heap_update(relation, otid, tuple, GetCurrentTransactionId(), + cid, crosscheck, wait, tmfd, lockmode, update_indexes, true); ItemPointerCopy(&tuple->t_self, &slot->tts_tid); @@ -2477,9 +2480,16 @@ reform_and_rewrite_tuple(HeapTuple tuple, * flag to skip logical decoding: as soon as REPACK CONCURRENTLY swaps * the relation files, it drops this relation, so no logical * replication subscription should need the data. + * + * It is also crucial to stamp the new record with the exact same xid + * and cid, because the tuple must be visible to the snapshots of the + * concurrent transactions later. 
*/ - heap_insert(NewHeap, copiedTuple, GetCurrentCommandId(true), - HEAP_INSERT_NO_LOGICAL, NULL); + // TODO: looks like cid is not required + CommandId cid = HeapTupleHeaderGetRawCommandId(tuple->t_data); + TransactionId xid = HeapTupleHeaderGetXmin(tuple->t_data); + + heap_insert(NewHeap, copiedTuple, xid, cid, HEAP_INSERT_NO_LOGICAL, NULL); } heap_freetuple(copiedTuple); diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c index 61224a3adf2..936cb0ae429 100644 --- a/src/backend/commands/cluster.c +++ b/src/backend/commands/cluster.c @@ -55,6 +55,7 @@ #include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/predicate.h" +#include "storage/procarray.h" #include "utils/acl.h" #include "utils/fmgroids.h" #include "utils/guc.h" @@ -146,6 +147,7 @@ static void apply_concurrent_delete(Relation rel, HeapTuple tup_target, ConcurrentChange *change); static HeapTuple find_target_tuple(Relation rel, ScanKey key, int nkeys, HeapTuple tup_key, + Snapshot snapshot, IndexInsertState *iistate, TupleTableSlot *ident_slot, IndexScanDesc *scan_p); @@ -1008,7 +1010,14 @@ rebuild_relation(RepackCommand cmd, bool usingindex, /* The historic snapshot won't be needed anymore. */ if (snapshot) + { + TransactionId xmin = snapshot->xmin; PopActiveSnapshot(); + Assert(concurrent); + // TODO: seems like it not required: need to check SnapBuildInitialSnapshotForRepack + WaitForOlderSnapshots(xmin, false); + } + if (concurrent) { @@ -1299,30 +1308,35 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, * not to be aggressive about this. */ memset(&params, 0, sizeof(VacuumParams)); - vacuum_get_cutoffs(OldHeap, params, &cutoffs); - - /* - * FreezeXid will become the table's new relfrozenxid, and that mustn't go - * backwards, so take the max. 
- */ + if (!concurrent) { TransactionId relfrozenxid = OldHeap->rd_rel->relfrozenxid; + MultiXactId relminmxid = OldHeap->rd_rel->relminmxid; + vacuum_get_cutoffs(OldHeap, params, &cutoffs); + /* + * FreezeXid will become the table's new relfrozenxid, and that mustn't go + * backwards, so take the max. + */ if (TransactionIdIsValid(relfrozenxid) && TransactionIdPrecedes(cutoffs.FreezeLimit, relfrozenxid)) cutoffs.FreezeLimit = relfrozenxid; - } - - /* - * MultiXactCutoff, similarly, shouldn't go backwards either. - */ - { - MultiXactId relminmxid = OldHeap->rd_rel->relminmxid; - + /* + * MultiXactCutoff, similarly, shouldn't go backwards either. + */ if (MultiXactIdIsValid(relminmxid) && MultiXactIdPrecedes(cutoffs.MultiXactCutoff, relminmxid)) cutoffs.MultiXactCutoff = relminmxid; } + else + { + /* + * In concurrent mode we reuse all the xmin/xmax, + * so just use current values for simplicity. + */ + cutoffs.FreezeLimit = OldHeap->rd_rel->relfrozenxid; + cutoffs.MultiXactCutoff = OldHeap->rd_rel->relminmxid; + } /* * Decide whether to use an indexscan or seqscan-and-optional-sort to scan @@ -2675,6 +2689,16 @@ apply_concurrent_changes(RepackDecodingState *dstate, Relation rel, continue; } + if (TransactionIdIsInProgress(change.xid)) + { + /* xid is committed for sure because we got that update from reorderbuffer. + * but there is a possibility procarray is not yet updated and current backend still see it as + * in-progress. Let's wait for procarray to be updated. */ + XactLockTableWait(change.xid, NULL, NULL, XLTW_None); + Assert(!TransactionIdIsInProgress(change.xid)); + Assert(TransactionIdDidCommit(change.xid)); + } + /* * Extract the tuple from the change. The tuple is copied here because * it might be assigned to 'tup_old', in which case it needs to @@ -2712,9 +2736,13 @@ apply_concurrent_changes(RepackDecodingState *dstate, Relation rel, } /* - * Find the tuple to be updated or deleted. + * Find the tuple to be updated or deleted using SnapshotSelf. 
+ * That way we receive the last alive version in case of HOT chain. + * It is guaranteed there is no any non-yet committed, but updated version + * because we here replaying all-committed transactions without any concurrency + * involved. */ - tup_exist = find_target_tuple(rel, key, nkeys, tup_key, + tup_exist = find_target_tuple(rel, key, nkeys, tup_key, SnapshotSelf, iistate, ident_slot, &ind_scan); if (tup_exist == NULL) elog(ERROR, "Failed to find target tuple"); @@ -2743,6 +2771,7 @@ apply_concurrent_changes(RepackDecodingState *dstate, Relation rel, */ if (change.kind != CHANGE_UPDATE_OLD) { + // TODO: not sure it is required at all: we are replaying committed transactions stamping them with committed XID CommandCounterIncrement(); UpdateActiveSnapshotCommandId(); } @@ -2771,9 +2800,11 @@ apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup, * Like simple_heap_insert(), but make sure that the INSERT is not * logically decoded - see reform_and_rewrite_tuple() for more * information. + * + * Use already committed xid to stamp the tuple. */ - heap_insert(rel, tup, GetCurrentCommandId(true), HEAP_INSERT_NO_LOGICAL, - NULL); + heap_insert(rel, tup, change->xid, GetCurrentCommandId(true), + HEAP_INSERT_NO_LOGICAL, NULL); /* * Update indexes. @@ -2781,6 +2812,7 @@ apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup, * In case functions in the index need the active snapshot and caller * hasn't set one. 
*/ + PushActiveSnapshot(GetLatestSnapshot()); ExecStoreHeapTuple(tup, index_slot, false); recheck = ExecInsertIndexTuples(iistate->rri, index_slot, @@ -2791,6 +2823,7 @@ apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup, NIL, /* arbiterIndexes */ false /* onlySummarizing */ ); + PopActiveSnapshot(); /* * If recheck is required, it must have been preformed on the source @@ -2819,9 +2852,11 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target, * * Do it like in simple_heap_update(), except for 'wal_logical' (and * except for 'wait'). + * + * Use already committed xid to stamp the tuple. */ res = heap_update(rel, &tup_target->t_self, tup, - GetCurrentCommandId(true), + change->xid, GetCurrentCommandId(true), InvalidSnapshot, false, /* no wait - only we are doing changes */ &tmfd, &lockmode, &update_indexes, @@ -2833,6 +2868,7 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target, if (update_indexes != TU_None) { + PushActiveSnapshot(GetLatestSnapshot()); recheck = ExecInsertIndexTuples(iistate->rri, index_slot, iistate->estate, @@ -2842,6 +2878,7 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target, NIL, /* arbiterIndexes */ /* onlySummarizing */ update_indexes == TU_Summarizing); + PopActiveSnapshot(); list_free(recheck); } @@ -2860,9 +2897,11 @@ apply_concurrent_delete(Relation rel, HeapTuple tup_target, * * Do it like in simple_heap_delete(), except for 'wal_logical' (and * except for 'wait'). + * + * Use already committed xid to stamp the tuple. 
*/ - res = heap_delete(rel, &tup_target->t_self, GetCurrentCommandId(true), - InvalidSnapshot, false, + res = heap_delete(rel, &tup_target->t_self, change->xid, + GetCurrentCommandId(true), InvalidSnapshot, false, &tmfd, false, /* no wait - only we are doing changes */ false /* wal_logical */ ); @@ -2886,7 +2925,7 @@ apply_concurrent_delete(Relation rel, HeapTuple tup_target, */ static HeapTuple find_target_tuple(Relation rel, ScanKey key, int nkeys, HeapTuple tup_key, - IndexInsertState *iistate, + Snapshot snapshot, IndexInsertState *iistate, TupleTableSlot *ident_slot, IndexScanDesc *scan_p) { IndexScanDesc scan; @@ -2895,7 +2934,7 @@ find_target_tuple(Relation rel, ScanKey key, int nkeys, HeapTuple tup_key, HeapTuple result = NULL; /* XXX no instrumentation for now */ - scan = index_beginscan(rel, iistate->ident_index, GetActiveSnapshot(), + scan = index_beginscan(rel, iistate->ident_index, snapshot, NULL, nkeys, 0); *scan_p = scan; index_rescan(scan, key, nkeys, NULL, 0); diff --git a/src/backend/replication/pgoutput_repack/pgoutput_repack.c b/src/backend/replication/pgoutput_repack/pgoutput_repack.c index 687fbbc59bb..020ff7b7c80 100644 --- a/src/backend/replication/pgoutput_repack/pgoutput_repack.c +++ b/src/backend/replication/pgoutput_repack/pgoutput_repack.c @@ -32,7 +32,8 @@ static void plugin_truncate(struct LogicalDecodingContext *ctx, Relation relations[], ReorderBufferChange *change); static void store_change(LogicalDecodingContext *ctx, - ConcurrentChangeKind kind, HeapTuple tuple); + ConcurrentChangeKind kind, HeapTuple tuple, + TransactionId xid); void _PG_output_plugin_init(OutputPluginCallbacks *cb) @@ -124,7 +125,7 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (newtuple == NULL) elog(ERROR, "Incomplete insert info."); - store_change(ctx, CHANGE_INSERT, newtuple); + store_change(ctx, CHANGE_INSERT, newtuple, change->txn->xid); } break; case REORDER_BUFFER_CHANGE_UPDATE: @@ -141,9 +142,11 @@ 
plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, elog(ERROR, "Incomplete update info."); if (oldtuple != NULL) - store_change(ctx, CHANGE_UPDATE_OLD, oldtuple); + store_change(ctx, CHANGE_UPDATE_OLD, oldtuple, + change->txn->xid); - store_change(ctx, CHANGE_UPDATE_NEW, newtuple); + store_change(ctx, CHANGE_UPDATE_NEW, newtuple, + change->txn->xid); } break; case REORDER_BUFFER_CHANGE_DELETE: @@ -156,7 +159,7 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (oldtuple == NULL) elog(ERROR, "Incomplete delete info."); - store_change(ctx, CHANGE_DELETE, oldtuple); + store_change(ctx, CHANGE_DELETE, oldtuple, change->txn->xid); } break; default: @@ -190,13 +193,13 @@ plugin_truncate(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (i == nrelations) return; - store_change(ctx, CHANGE_TRUNCATE, NULL); + store_change(ctx, CHANGE_TRUNCATE, NULL, InvalidTransactionId); } /* Store concurrent data change. */ static void store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind, - HeapTuple tuple) + HeapTuple tuple, TransactionId xid) { RepackDecodingState *dstate; char *change_raw; @@ -266,6 +269,7 @@ store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind, dst = dst_start + SizeOfConcurrentChange; memcpy(dst, tuple->t_data, tuple->t_len); + change.xid = xid; /* The data has been copied. 
*/ if (flattened) pfree(tuple); diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index b82dd17a966..981425f23b6 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -316,22 +316,24 @@ extern BulkInsertState GetBulkInsertState(void); extern void FreeBulkInsertState(BulkInsertState); extern void ReleaseBulkInsertStatePin(BulkInsertState bistate); -extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid, - int options, BulkInsertState bistate); +extern void heap_insert(Relation relation, HeapTuple tup, TransactionId xid, + CommandId cid, int options, BulkInsertState bistate); extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, int ntuples, CommandId cid, int options, BulkInsertState bistate); extern TM_Result heap_delete(Relation relation, ItemPointer tid, - CommandId cid, Snapshot crosscheck, bool wait, + TransactionId xid, CommandId cid, + Snapshot crosscheck, bool wait, struct TM_FailureData *tmfd, bool changingPart, bool wal_logical); extern void heap_finish_speculative(Relation relation, ItemPointer tid); extern void heap_abort_speculative(Relation relation, ItemPointer tid); extern TM_Result heap_update(Relation relation, ItemPointer otid, - HeapTuple newtup, + HeapTuple newtup, TransactionId xid, CommandId cid, Snapshot crosscheck, bool wait, struct TM_FailureData *tmfd, LockTupleMode *lockmode, - TU_UpdateIndexes *update_indexes, bool wal_logical); + TU_UpdateIndexes *update_indexes, + bool wal_logical); extern TM_Result heap_lock_tuple(Relation relation, HeapTuple tuple, CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, bool follow_updates, diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h index 4a508c57a50..242f8da770a 100644 --- a/src/include/commands/cluster.h +++ b/src/include/commands/cluster.h @@ -61,6 +61,8 @@ typedef struct ConcurrentChange /* See the enum above. 
*/ ConcurrentChangeKind kind; + /* Transaction that changes the data. */ + TransactionId xid; /* * The actual tuple. * diff --git a/src/test/modules/injection_points/specs/repack.spec b/src/test/modules/injection_points/specs/repack.spec index 75850334986..3711a7c92b9 100644 --- a/src/test/modules/injection_points/specs/repack.spec +++ b/src/test/modules/injection_points/specs/repack.spec @@ -86,9 +86,6 @@ step change_new # When applying concurrent data changes, we should see the effects of an # in-progress subtransaction. # -# XXX Not sure this test is useful now - it was designed for the patch that -# preserves tuple visibility and which therefore modifies -# TransactionIdIsCurrentTransactionId(). step change_subxact1 { BEGIN; @@ -103,7 +100,6 @@ step change_subxact1 # When applying concurrent data changes, we should not see the effects of a # rolled back subtransaction. # -# XXX Is this test useful? See above. step change_subxact2 { BEGIN; -- 2.43.0