From ff479604292adfd1ac569335d07caa0609a10a40 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Wed, 26 Mar 2025 16:18:27 +0800 Subject: [PATCH v29 7/7] Support the conflict detection for update_deleted This patch supports detecting update_deleted conflicts during update operations. If the target row cannot be found when applying update operations, we perform an additional scan of the table using SnapshotAny. This scan aims to locate the most recently deleted row that matches the old column values from the remote update operation and has not yet been removed by VACUUM. If any such tuples are found, we report the update_deleted conflict along with the origin and transaction information that deleted the tuple. --- doc/src/sgml/logical-replication.sgml | 15 +++ doc/src/sgml/monitoring.sgml | 11 ++ src/backend/catalog/system_views.sql | 1 + src/backend/executor/execReplication.c | 138 ++++++++++++++++++++- src/backend/replication/logical/conflict.c | 22 ++++ src/backend/replication/logical/worker.c | 38 ++++-- src/backend/utils/adt/pgstatfuncs.c | 18 +-- src/include/catalog/pg_proc.dat | 6 +- src/include/executor/executor.h | 7 +- src/include/replication/conflict.h | 3 + src/test/regress/expected/rules.out | 3 +- src/test/subscription/t/035_conflicts.pl | 25 +++- 12 files changed, 260 insertions(+), 27 deletions(-) diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 9fcb7c0ff73..9342272fd17 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -1818,6 +1818,21 @@ test_sub=# SELECT * from tab_gen_to_gen; + + update_deleted + + + The tuple to be updated was deleted by another origin. The update will + simply be skipped in this scenario. + Note that this conflict can only be detected when + <xref linkend="guc-track-commit-timestamp"/> + and retain_conflict_info + are enabled. Note that if a tuple cannot be found due to the table being + truncated, only an update_missing conflict will
+ arise. + + + update_origin_differs diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 6636459b870..823d11ecf63 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -2205,6 +2205,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage + + + confl_update_deleted bigint + + + Number of times the tuple to be updated was deleted by another origin + during the application of changes. See <xref linkend="conflict-update-deleted"/> + for details about this conflict. + + + confl_update_origin_differs bigint diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 5c30ac999ba..498a85804fa 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1381,6 +1381,7 @@ CREATE VIEW pg_stat_subscription_stats AS ss.apply_error_count, ss.sync_error_count, ss.confl_insert_exists, + ss.confl_update_deleted, ss.confl_update_origin_differs, ss.confl_update_exists, ss.confl_update_missing, diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index ede89ea3cf9..2c0bb395c00 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -14,12 +14,14 @@ #include "postgres.h" +#include "access/commit_ts.h" #include "access/genam.h" #include "access/gist.h" #include "access/relscan.h" #include "access/tableam.h" #include "access/transam.h" #include "access/xact.h" +#include "access/heapam.h" #include "catalog/pg_am_d.h" #include "commands/trigger.h" #include "executor/executor.h" @@ -27,6 +29,7 @@ #include "replication/conflict.h" #include "replication/logicalrelation.h" #include "storage/lmgr.h" +#include "storage/procarray.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/rel.h" @@ -36,7 +39,7 @@ static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, - TypeCacheEntry **eq); + TypeCacheEntry **eq, Bitmapset *columns); /* * Setup a ScanKey for a search in the
relation 'rel' for a tuple 'key' that @@ -221,7 +224,7 @@ retry: if (eq == NULL) eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts); - if (!tuples_equal(outslot, searchslot, eq)) + if (!tuples_equal(outslot, searchslot, eq, NULL)) continue; } @@ -277,10 +280,13 @@ retry: /* * Compare the tuples in the slots by checking if they have equal values. + * + * If 'columns' is not null, only the columns specified within it will be + * considered for the equality check, ignoring all other columns. */ static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, - TypeCacheEntry **eq) + TypeCacheEntry **eq, Bitmapset *columns) { int attrnum; @@ -305,6 +311,14 @@ tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, if (att->attisdropped || att->attgenerated) continue; + /* + * Ignore columns that are not listed for checking. + */ + if (columns && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + columns)) + continue; + /* * If one value is NULL and other is not, then they are certainly not * equal @@ -380,7 +394,7 @@ retry: /* Try to find the tuple */ while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot)) { - if (!tuples_equal(scanslot, searchslot, eq)) + if (!tuples_equal(scanslot, searchslot, eq, NULL)) continue; found = true; @@ -431,6 +445,122 @@ retry: return found; } +/* + * Searches the relation 'rel' for the most recently deleted tuple that matches + * the values in 'searchslot' and is not yet removable by VACUUM. The function + * returns the transaction ID, origin, and commit timestamp of the transaction + * that deleted this tuple. + * + * The commit timestamp of the transaction that deleted the tuple is used to + * determine whether the tuple is the most recently deleted one. + * + * This function performs a full table scan instead of using indexes because + * index scans could miss deleted tuples if an index has been re-indexed or + * re-created during change applications. 
While this approach may be slow on + * large tables, it is considered acceptable because it is only used in rare + * conflict cases where the target row for an update cannot be found. + */ +bool +FindMostRecentlyDeletedTupleInfo(Relation rel, TupleTableSlot *searchslot, + TransactionId *delete_xid, + RepOriginId *delete_origin, + TimestampTz *delete_time) +{ + TupleTableSlot *scanslot; + TableScanDesc scan; + TypeCacheEntry **eq; + TransactionId oldestXmin; + BufferHeapTupleTableSlot *hslot; + HeapTuple tuple; + Buffer buf; + Bitmapset *indexbitmap; + TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel); + + Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor)); + + *delete_xid = InvalidTransactionId; + *delete_origin = InvalidRepOriginId; + *delete_time = 0; + + /* Exit early if the commit timestamp data is not available */ + if (!track_commit_timestamp) + return false; + + /* Get the cutoff xmin for HeapTupleSatisfiesVacuum */ + oldestXmin = GetOldestNonRemovableTransactionId(rel); + + /* Get the index column bitmap for tuples_equal */ + indexbitmap = RelationGetIndexAttrBitmap(rel, + INDEX_ATTR_BITMAP_IDENTITY_KEY); + + /* fallback to PK if no replica identity */ + if (!indexbitmap) + indexbitmap = RelationGetIndexAttrBitmap(rel, + INDEX_ATTR_BITMAP_PRIMARY_KEY); + + eq = palloc0(sizeof(*eq) * searchslot->tts_tupleDescriptor->natts); + + /* + * Start a heap scan using SnapshotAny to identify dead tuples that are + * not visible under a standard MVCC snapshot. 
+ */ + scan = table_beginscan(rel, SnapshotAny, 0, NULL); + scanslot = table_slot_create(rel, NULL); + hslot = (BufferHeapTupleTableSlot *) scanslot; + + table_rescan(scan, NULL); + + /* Try to find the tuple */ + while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot)) + { + bool dead = false; + TransactionId xmax; + TimestampTz localts; + RepOriginId localorigin; + + if (!tuples_equal(scanslot, searchslot, eq, indexbitmap)) + continue; + + tuple = ExecFetchSlotHeapTuple(scanslot, false, NULL); + buf = hslot->buffer; + + LockBuffer(buf, BUFFER_LOCK_SHARE); + + /* + * We do not consider HEAPTUPLE_DEAD status because it indicates + * either tuples whose inserting transaction was aborted, meaning + * there is no commit timestamp or origin, or tuples deleted by a + * transaction older than oldestXmin, making it safe to ignore them + * during conflict detection (See comments atop + * maybe_advance_nonremovable_xid() for details). + */ + if (HeapTupleSatisfiesVacuum(tuple, oldestXmin, buf) == HEAPTUPLE_RECENTLY_DEAD) + dead = true; + + LockBuffer(buf, BUFFER_LOCK_UNLOCK); + + if (!dead) + continue; + + xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data); + + /* Select the dead tuple with the most recent commit timestamp */ + if (TransactionIdGetCommitTsData(xmax, &localts, &localorigin) && + (TimestampDifferenceExceeds(*delete_time, localts, 0) || + *delete_time == 0)) + { + *delete_xid = xmax; + *delete_time = localts; + *delete_origin = localorigin; + } + } + + table_endscan(scan); + ExecDropSingleTupleTableSlot(scanslot); + + return *delete_time != 0; +} + /* * Find the tuple that violates the passed unique index (conflictindex). 
* diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index f1e92f2fc1a..504ba27209a 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -25,6 +25,7 @@ static const char *const ConflictTypeNames[] = { [CT_INSERT_EXISTS] = "insert_exists", + [CT_UPDATE_DELETED] = "update_deleted", [CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs", [CT_UPDATE_EXISTS] = "update_exists", [CT_UPDATE_MISSING] = "update_missing", @@ -173,6 +174,7 @@ errcode_apply_conflict(ConflictType type) case CT_UPDATE_EXISTS: case CT_MULTIPLE_UNIQUE_CONFLICTS: return errcode(ERRCODE_UNIQUE_VIOLATION); + case CT_UPDATE_DELETED: case CT_UPDATE_ORIGIN_DIFFERS: case CT_UPDATE_MISSING: case CT_DELETE_ORIGIN_DIFFERS: @@ -246,6 +248,26 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, break; + case CT_UPDATE_DELETED: + if (localts) + { + if (localorigin == InvalidRepOriginId) + appendStringInfo(&err_detail, _("The row to be updated was deleted locally in transaction %u at %s."), + localxmin, timestamptz_to_str(localts)); + else if (replorigin_by_oid(localorigin, true, &origin_name)) + appendStringInfo(&err_detail, _("The row to be updated was deleted by a different origin \"%s\" in transaction %u at %s."), + origin_name, localxmin, timestamptz_to_str(localts)); + + /* The origin that deleted this row has been removed.
*/ + else + appendStringInfo(&err_detail, _("The row to be updated was deleted by a non-existent origin in transaction %u at %s."), + localxmin, timestamptz_to_str(localts)); + } + else + appendStringInfo(&err_detail, _("The row to be updated was deleted.")); + + break; + case CT_UPDATE_ORIGIN_DIFFERS: if (localorigin == InvalidRepOriginId) appendStringInfo(&err_detail, _("Updating the row that was modified locally in transaction %u at %s."), diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 4bb669fa95f..7ce53f16e23 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2807,17 +2807,28 @@ apply_handle_update_internal(ApplyExecutionData *edata, } else { + ConflictType type; TupleTableSlot *newslot = localslot; + if (MySubscription->retainconflictinfo && + FindMostRecentlyDeletedTupleInfo(localrel, remoteslot, + &conflicttuple.xmin, + &conflicttuple.origin, + &conflicttuple.ts) && + conflicttuple.origin != replorigin_session_origin) + type = CT_UPDATE_DELETED; + else + type = CT_UPDATE_MISSING; + /* Store the new tuple for conflict reporting */ slot_store_data(newslot, relmapentry, newtup); /* - * The tuple to be updated could not be found. Do nothing except for - * emitting a log message. + * The tuple to be updated could not be found or was deleted. Do + * nothing except for emitting a log message. */ - ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING, - remoteslot, newslot, list_make1(&conflicttuple)); + ReportApplyConflict(estate, relinfo, LOG, type, remoteslot, newslot, + list_make1(&conflicttuple)); } /* Cleanup. 
*/ @@ -3155,18 +3166,29 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, remoteslot_part, &localslot); if (!found) { + ConflictType type; TupleTableSlot *newslot = localslot; + if (MySubscription->retainconflictinfo && + FindMostRecentlyDeletedTupleInfo(partrel, remoteslot_part, + &conflicttuple.xmin, + &conflicttuple.origin, + &conflicttuple.ts) && + conflicttuple.origin != replorigin_session_origin) + type = CT_UPDATE_DELETED; + else + type = CT_UPDATE_MISSING; + /* Store the new tuple for conflict reporting */ slot_store_data(newslot, part_entry, newtup); /* - * The tuple to be updated could not be found. Do nothing - * except for emitting a log message. + * The tuple to be updated could not be found or was + * deleted. Do nothing except for emitting a log message. */ ReportApplyConflict(estate, partrelinfo, LOG, - CT_UPDATE_MISSING, remoteslot_part, - newslot, list_make1(&conflicttuple)); + type, remoteslot_part, newslot, + list_make1(&conflicttuple)); return; } diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 97af7c6554f..9f07b2a4b56 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2171,7 +2171,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) Datum pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 11 +#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 12 Oid subid = PG_GETARG_OID(0); TupleDesc tupdesc; Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0}; @@ -2193,19 +2193,21 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 4, "confl_insert_exists", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_update_origin_differs", + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_update_deleted", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists", + TupleDescInitEntry(tupdesc, (AttrNumber) 6, 
"confl_update_origin_differs", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_missing", + TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_exists", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_delete_origin_differs", + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_missing", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing", + TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_origin_differs", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_multiple_unique_conflicts", + TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_missing", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset", + TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_multiple_unique_conflicts", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 12, "stats_reset", TIMESTAMPTZOID, -1, 0); BlessTupleDesc(tupdesc); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 8a61ed40f6d..71d42eb06c0 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5647,9 +5647,9 @@ { oid => '6231', descr => 'statistics: information about subscription stats', proname => 'pg_stat_get_subscription_stats', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'oid', - proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}', + proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => 
'{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_deleted,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}', prosrc => 'pg_stat_get_subscription_stats' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 6a1fec88928..c15e04e392c 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -14,6 +14,7 @@ #ifndef EXECUTOR_H #define EXECUTOR_H +#include "datatype/timestamp.h" #include "executor/execdesc.h" #include "fmgr.h" #include "nodes/lockoptions.h" @@ -783,7 +784,11 @@ extern bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, TupleTableSlot *outslot); extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot); - +extern bool FindMostRecentlyDeletedTupleInfo(Relation rel, + TupleTableSlot *searchslot, + TransactionId *delete_xid, + RepOriginId *delete_origin, + TimestampTz *delete_time); extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot); extern void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index 6c59125f256..cbd9656a60a 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -26,6 +26,9 @@ typedef enum /* The row to be inserted violates unique constraint */ CT_INSERT_EXISTS, + /* The row to be updated was deleted by a different origin */ + CT_UPDATE_DELETED, + /* The row to be updated was modified by a different origin */ CT_UPDATE_ORIGIN_DIFFERS, diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 
e6a2eff3961..82e93d32eb3 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2153,6 +2153,7 @@ pg_stat_subscription_stats| SELECT ss.subid, ss.apply_error_count, ss.sync_error_count, ss.confl_insert_exists, + ss.confl_update_deleted, ss.confl_update_origin_differs, ss.confl_update_exists, ss.confl_update_missing, @@ -2161,7 +2162,7 @@ pg_stat_subscription_stats| SELECT ss.subid, ss.confl_multiple_unique_conflicts, ss.stats_reset FROM pg_subscription s, - LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset); + LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_deleted, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset); pg_stat_sys_indexes| SELECT relid, indexrelid, schemaname, diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl index 1be8383faa5..c3b52d09652 100644 --- a/src/test/subscription/t/035_conflicts.pl +++ b/src/test/subscription/t/035_conflicts.pl @@ -114,7 +114,9 @@ pass('multiple_unique_conflicts detected during update'); # Setup a bidirectional logical replication between node_A & node_B ############################################################################### -# Initialize nodes. +# Initialize nodes. Enable the track_commit_timestamp on both nodes to detect +# the conflict when attempting to update a row that was previously modified by +# a different origin. # node_A. Increase the log_min_messages setting to DEBUG2 to debug test # failures. 
Disable autovacuum to avoid generating xid that could affect the @@ -122,7 +124,8 @@ pass('multiple_unique_conflicts detected during update'); my $node_A = $node_publisher; $node_A->append_conf( 'postgresql.conf', - qq{autovacuum = off + qq{track_commit_timestamp = on + autovacuum = off log_min_messages = 'debug2'}); $node_A->restart; @@ -192,6 +195,8 @@ is($result, qq(t), 'worker on node B retains conflict information'); ############################################################################### # Check that dead tuples on node A cannot be cleaned by VACUUM until the # concurrent transactions on Node B have been applied and flushed on Node A. +# And check that an update_deleted conflict is detected when updating a row +# that was deleted by a different origin. ############################################################################### # Insert a record @@ -220,6 +225,15 @@ ok( $stderr =~ qr/1 are dead but not yet removable/, 'the deleted column is non-removable'); +my $logfile = slurp_file($node_B->logfile(), $log_location); +ok( $logfile =~ + qr/conflict detected on relation "public.tab": conflict=delete_origin_differs.* +.*DETAIL:.* Deleting the row that was modified locally in transaction [0-9]+ at .* +.*Existing local tuple \(1, 3\); replica identity \(a\)=\(1\)/, + 'delete target row was modified in tab'); + +$log_location = -s $node_A->logfile; + # Remember the next transaction ID to be assigned my $next_xid = $node_A->safe_psql('postgres', "SELECT txid_current() + 1;"); @@ -227,6 +241,13 @@ $node_A->safe_psql( 'postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;"); $node_B->wait_for_catchup($subname_AB); +$logfile = slurp_file($node_A->logfile(), $log_location); +ok( $logfile =~ + qr/conflict detected on relation "public.tab": conflict=update_deleted.* +.*DETAIL:.* The row to be updated was deleted locally in transaction [0-9]+ at .* +.*Remote tuple \(1, 3\); replica identity \(a\)=\(1\)/, + 'update target row was deleted in tab'); + # Account for 
the transaction ID increment caused by enabling the subscription $next_xid++; -- 2.30.0.windows.2