From 21d3b1ae822e82d87988d60b1d7c8a2e003565c0 Mon Sep 17 00:00:00 2001 From: nkey Date: Wed, 3 Sep 2025 19:08:55 +0200 Subject: [PATCH v18 2/2] Fix logical replication conflict detection during tuple lookup SNAPSHOT_DIRTY scans could miss conflict detection with concurrent transactions during logical replication. Replace SNAPSHOT_DIRTY scan with the GetLatestSnapshot in RelationFindReplTupleByIndex and RelationFindReplTupleSeq. --- src/backend/catalog/index.c | 2 +- src/backend/executor/execReplication.c | 64 ++++++++------------------ 2 files changed, 19 insertions(+), 47 deletions(-) diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 9407c357f27..05d21a3ccfe 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -1733,7 +1733,7 @@ index_concurrently_swap(Oid newIndexId, Oid oldIndexId, const char *oldName) tgForm->tgconstrindid = newIndexId; - CatalogTupleUpdate(pg_trigger, &triggerTuple->t_self, triggerTuple); + CatalogTupleUpdate(pg_trigger, &triggerTuple->t_self, triggerTuple); heap_freetuple(triggerTuple); } diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index b2ca5cbf117..82629aa33d6 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -187,8 +187,6 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid, ScanKeyData skey[INDEX_MAX_KEYS]; int skey_attoff; IndexScanDesc scan; - SnapshotData snap; - TransactionId xwait; Relation idxrel; bool found; TypeCacheEntry **eq = NULL; @@ -199,18 +197,18 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid, isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid); - InitDirtySnapshot(snap); - /* Build scan key. */ skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot); - /* Start an index scan. */ + /* Start an index scan. SnapshotAny will be replaced below. */ scan = index_beginscan(rel, idxrel, - &snap, NULL, skey_attoff, 0, SO_NONE); + SnapshotAny, NULL, skey_attoff, 0, SO_NONE); retry: found = false; - + PushActiveSnapshot(GetLatestSnapshot()); + /* Update the actual scan snapshot each retry */ + scan->xs_snapshot = GetActiveSnapshot(); index_rescan(scan, skey, skey_attoff, NULL, 0); /* Try to find the tuple */ @@ -231,19 +229,6 @@ retry: ExecMaterializeSlot(outslot); - xwait = TransactionIdIsValid(snap.xmin) ? - snap.xmin : snap.xmax; - - /* - * If the tuple is locked, wait for locking transaction to finish and - * retry. - */ - if (TransactionIdIsValid(xwait)) - { - XactLockTableWait(xwait, NULL, NULL, XLTW_None); - goto retry; - } - /* Found our tuple and it's not locked */ found = true; break; @@ -255,8 +240,6 @@ retry: TM_FailureData tmfd; TM_Result res; - PushActiveSnapshot(GetLatestSnapshot()); - res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(), outslot, GetCurrentCommandId(false), @@ -265,13 +248,15 @@ retry: 0 /* don't follow updates */ , &tmfd); - PopActiveSnapshot(); - if (should_refetch_tuple(res, &tmfd)) + { + PopActiveSnapshot(); goto retry; + } } index_endscan(scan); + PopActiveSnapshot(); /* Don't release lock until commit. */ index_close(idxrel, NoLock); @@ -372,9 +357,7 @@ RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, { TupleTableSlot *scanslot; TableScanDesc scan; - SnapshotData snap; TypeCacheEntry **eq; - TransactionId xwait; bool found; TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel); @@ -382,14 +365,15 @@ RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, eq = palloc0_array(TypeCacheEntry *, outslot->tts_tupleDescriptor->natts); - /* Start a heap scan. */ - InitDirtySnapshot(snap); - scan = table_beginscan(rel, &snap, 0, NULL, - SO_NONE); + /* Start a heap scan. SnapshotAny will be replaced below. */ + scan = table_beginscan(rel, SnapshotAny, 0, NULL, SO_NONE); scanslot = table_slot_create(rel, NULL); retry: found = false; + PushActiveSnapshot(GetLatestSnapshot()); + /* Update the actual scan snapshot each retry */ + scan->rs_snapshot = GetActiveSnapshot(); table_rescan(scan, NULL); @@ -402,19 +386,6 @@ retry: found = true; ExecCopySlot(outslot, scanslot); - xwait = TransactionIdIsValid(snap.xmin) ? - snap.xmin : snap.xmax; - - /* - * If the tuple is locked, wait for locking transaction to finish and - * retry. - */ - if (TransactionIdIsValid(xwait)) - { - XactLockTableWait(xwait, NULL, NULL, XLTW_None); - goto retry; - } - /* Found our tuple and it's not locked */ break; } @@ -425,8 +396,6 @@ retry: TM_FailureData tmfd; TM_Result res; - PushActiveSnapshot(GetLatestSnapshot()); - res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(), outslot, GetCurrentCommandId(false), @@ -435,13 +404,16 @@ retry: 0 /* don't follow updates */ , &tmfd); - PopActiveSnapshot(); if (should_refetch_tuple(res, &tmfd)) + { + PopActiveSnapshot(); goto retry; + } } table_endscan(scan); + PopActiveSnapshot(); ExecDropSingleTupleTableSlot(scanslot); return found; -- 2.43.0