From 11b2f444ea56f82c59a2e29e10b037c7dd3ee96d Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Wed, 5 Feb 2025 11:37:49 +0800 Subject: [PATCH v28 7/7] Support the conflict detection for update_deleted This patch supports detecting update_deleted conflicts during update operations. If the target row cannot be found when applying update operations, we perform an additional scan of the table using snapshotAny. This scan aims to locate the most recently deleted row that matches the old column values from the remote update operation and has not yet been removed by VACUUM. If any such tuples are found, we report the update_deleted conflict along with the origin and transaction information that deleted the tuple. --- doc/src/sgml/logical-replication.sgml | 15 ++ doc/src/sgml/monitoring.sgml | 11 ++ src/backend/catalog/system_views.sql | 1 + src/backend/executor/execReplication.c | 138 +++++++++++++++++- src/backend/replication/logical/conflict.c | 22 +++ src/backend/replication/logical/worker.c | 61 +++++--- src/backend/utils/adt/pgstatfuncs.c | 16 +- src/include/catalog/pg_proc.dat | 6 +- src/include/executor/executor.h | 7 +- src/include/replication/conflict.h | 3 + src/test/regress/expected/rules.out | 3 +- .../t/035_retain_conflict_info.pl | 23 ++- 12 files changed, 265 insertions(+), 41 deletions(-) diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 632c102c2ae..9d40d8e1677 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -1818,6 +1818,21 @@ test_sub=# SELECT * from tab_gen_to_gen; + + update_deleted + + + The tuple to be updated was deleted by another origin. The update will + simply be skipped in this scenario. + Note that this conflict can only be detected when + + and retain_conflict_info + are enabled. Note that if a tuple cannot be found due to the table being + truncated only a update_missing conflict will arise. + arise + + + update_origin_differs diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index f5c1bba496a..7f6ee9719e0 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -2199,6 +2199,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage + + + confl_update_deleted bigint + + + Number of times the tuple to be updated was deleted by another origin + during the application of changes. See + for details about this conflict. + + + confl_update_origin_differs bigint diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index a0695ac7247..9f2708dd9a7 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1385,6 +1385,7 @@ CREATE VIEW pg_stat_subscription_stats AS ss.apply_error_count, ss.sync_error_count, ss.confl_insert_exists, + ss.confl_update_deleted, ss.confl_update_origin_differs, ss.confl_update_exists, ss.confl_update_missing, diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 5f7613cc831..1e768f08c1b 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -14,12 +14,14 @@ #include "postgres.h" +#include "access/commit_ts.h" #include "access/genam.h" #include "access/gist.h" #include "access/relscan.h" #include "access/tableam.h" #include "access/transam.h" #include "access/xact.h" +#include "access/heapam.h" #include "catalog/pg_am_d.h" #include "commands/trigger.h" #include "executor/executor.h" @@ -27,6 +29,7 @@ #include "replication/conflict.h" #include "replication/logicalrelation.h" #include "storage/lmgr.h" +#include "storage/procarray.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/rel.h" @@ -36,7 +39,7 @@ static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, - TypeCacheEntry **eq); + TypeCacheEntry **eq, Bitmapset *columns); /* * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that @@ -221,7 +224,7 @@ retry: if (eq == NULL) eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts); - if (!tuples_equal(outslot, searchslot, eq)) + if (!tuples_equal(outslot, searchslot, eq, NULL)) continue; } @@ -277,10 +280,13 @@ retry: /* * Compare the tuples in the slots by checking if they have equal values. + * + * If 'columns' is not null, only the columns specified within it will be + * considered for the equality check, ignoring all other columns. */ static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, - TypeCacheEntry **eq) + TypeCacheEntry **eq, Bitmapset *columns) { int attrnum; @@ -305,6 +311,14 @@ tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, if (att->attisdropped || att->attgenerated) continue; + /* + * Ignore columns that are not listed for checking. + */ + if (columns && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + columns)) + continue; + /* * If one value is NULL and other is not, then they are certainly not * equal @@ -380,7 +394,7 @@ retry: /* Try to find the tuple */ while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot)) { - if (!tuples_equal(scanslot, searchslot, eq)) + if (!tuples_equal(scanslot, searchslot, eq, NULL)) continue; found = true; @@ -431,6 +445,122 @@ retry: return found; } +/* + * Searches the relation 'rel' for the most recently deleted tuple that matches + * the values in 'searchslot' and is not yet removable by VACUUM. The function + * returns the transaction ID, origin, and commit timestamp of the transaction + * that deleted this tuple. + * + * The commit timestamp of the transaction that deleted the tuple is used to + * determine whether the tuple is the most recently deleted one. + * + * This function performs a full table scan instead of using indexes because + * index scans could miss deleted tuples if an index has been re-indexed or + * re-created during change applications. While this approach may be slow on + * large tables, it is considered acceptable because it is only used in rare + * conflict cases where the target row for an update cannot be found. + */ +bool +FindMostRecentlyDeletedTupleInfo(Relation rel, TupleTableSlot *searchslot, + TransactionId *delete_xid, + RepOriginId *delete_origin, + TimestampTz *delete_time) +{ + TupleTableSlot *scanslot; + TableScanDesc scan; + TypeCacheEntry **eq; + TransactionId oldestXmin; + BufferHeapTupleTableSlot *hslot; + HeapTuple tuple; + Buffer buf; + Bitmapset *indexbitmap; + TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel); + + Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor)); + + *delete_xid = InvalidTransactionId; + *delete_origin = InvalidRepOriginId; + *delete_time = 0; + + /* Exit early if the commit timestamp data is not available */ + if (!track_commit_timestamp) + return false; + + /* Get the cutoff xmin for HeapTupleSatisfiesVacuum */ + oldestXmin = GetOldestNonRemovableTransactionId(rel); + + /* Get the index column bitmap for tuples_equal */ + indexbitmap = RelationGetIndexAttrBitmap(rel, + INDEX_ATTR_BITMAP_IDENTITY_KEY); + + /* fallback to PK if no replica identity */ + if (!indexbitmap) + indexbitmap = RelationGetIndexAttrBitmap(rel, + INDEX_ATTR_BITMAP_PRIMARY_KEY); + + eq = palloc0(sizeof(*eq) * searchslot->tts_tupleDescriptor->natts); + + /* + * Start a heap scan using SnapshotAny to identify dead tuples that are + * not visible under a standard MVCC snapshot. + */ + scan = table_beginscan(rel, SnapshotAny, 0, NULL); + scanslot = table_slot_create(rel, NULL); + hslot = (BufferHeapTupleTableSlot *) scanslot; + + table_rescan(scan, NULL); + + /* Try to find the tuple */ + while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot)) + { + bool dead = false; + TransactionId xmax; + TimestampTz localts; + RepOriginId localorigin; + + if (!tuples_equal(scanslot, searchslot, eq, indexbitmap)) + continue; + + tuple = ExecFetchSlotHeapTuple(scanslot, false, NULL); + buf = hslot->buffer; + + LockBuffer(buf, BUFFER_LOCK_SHARE); + + /* + * We do not consider HEAPTUPLE_DEAD status because it indicates + * either tuples whose inserting transaction was aborted, meaning + * there is no commit timestamp or origin, or tuples deleted by a + * transaction older than oldestXmin, making it safe to ignore them + * during conflict detection (See comments atop + * maybe_advance_nonremovable_xid() for details). + */ + if (HeapTupleSatisfiesVacuum(tuple, oldestXmin, buf) == HEAPTUPLE_RECENTLY_DEAD) + dead = true; + + LockBuffer(buf, BUFFER_LOCK_UNLOCK); + + if (!dead) + continue; + + xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data); + + /* Select the dead tuple with the most recent commit timestamp */ + if (TransactionIdGetCommitTsData(xmax, &localts, &localorigin) && + (TimestampDifferenceExceeds(*delete_time, localts, 0) || + *delete_time == 0)) + { + *delete_xid = xmax; + *delete_time = localts; + *delete_origin = localorigin; + } + } + + table_endscan(scan); + ExecDropSingleTupleTableSlot(scanslot); + + return *delete_time != 0; +} + /* * Find the tuple that violates the passed unique index (conflictindex). * diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 772fc83e88b..0eedbd2b163 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -25,6 +25,7 @@ static const char *const ConflictTypeNames[] = { [CT_INSERT_EXISTS] = "insert_exists", + [CT_UPDATE_DELETED] = "update_deleted", [CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs", [CT_UPDATE_EXISTS] = "update_exists", [CT_UPDATE_MISSING] = "update_missing", @@ -170,6 +171,7 @@ errcode_apply_conflict(ConflictType type) case CT_INSERT_EXISTS: case CT_UPDATE_EXISTS: return errcode(ERRCODE_UNIQUE_VIOLATION); + case CT_UPDATE_DELETED: case CT_UPDATE_ORIGIN_DIFFERS: case CT_UPDATE_MISSING: case CT_DELETE_ORIGIN_DIFFERS: @@ -240,6 +242,26 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, break; + case CT_UPDATE_DELETED: + if (localts) + { + if (localorigin == InvalidRepOriginId) + appendStringInfo(&err_detail, _("The row to be updated was deleted locally in transaction %u at %s."), + localxmin, timestamptz_to_str(localts)); + else if (replorigin_by_oid(localorigin, true, &origin_name)) + appendStringInfo(&err_detail, _("The row to be updated was deleted by a different origin \"%s\" in transaction %u at %s."), + origin_name, localxmin, timestamptz_to_str(localts)); + + /* The origin that modified this row has been removed. */ + else + appendStringInfo(&err_detail, _("The row to be updated was deleted by a non-existent origin in transaction %u at %s."), + localxmin, timestamptz_to_str(localts)); + } + else + appendStringInfo(&err_detail, _("The row to be updated was deleted.")); + + break; + case CT_UPDATE_ORIGIN_DIFFERS: if (localorigin == InvalidRepOriginId) appendStringInfo(&err_detail, _("Updating the row that was modified locally in transaction %u at %s."), diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 548ebb37593..c1abd1f6fbd 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2753,6 +2753,9 @@ apply_handle_update_internal(ApplyExecutionData *edata, TupleTableSlot *localslot; bool found; MemoryContext oldctx; + RepOriginId localorigin = InvalidRepOriginId; + TransactionId localxid = InvalidTransactionId; + TimestampTz localts = 0; EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); ExecOpenIndices(relinfo, true); @@ -2769,15 +2772,11 @@ apply_handle_update_internal(ApplyExecutionData *edata, */ if (found) { - RepOriginId localorigin; - TransactionId localxmin; - TimestampTz localts; - /* * Report the conflict if the tuple was modified by a different * origin. */ - if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) && + if (GetTupleTransactionInfo(localslot, &localxid, &localorigin, &localts) && localorigin != replorigin_session_origin) { TupleTableSlot *newslot; @@ -2788,7 +2787,7 @@ apply_handle_update_internal(ApplyExecutionData *edata, ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS, remoteslot, localslot, newslot, - InvalidOid, localxmin, localorigin, localts); + InvalidOid, localxid, localorigin, localts); } /* Process and store remote tuple in the slot */ @@ -2807,19 +2806,28 @@ apply_handle_update_internal(ApplyExecutionData *edata, } else { + ConflictType type; TupleTableSlot *newslot = localslot; + if (MySubscription->retainconflictinfo && + FindMostRecentlyDeletedTupleInfo(localrel, remoteslot, + &localxid, &localorigin, + &localts) && + localorigin != replorigin_session_origin) + type = CT_UPDATE_DELETED; + else + type = CT_UPDATE_MISSING; + /* Store the new tuple for conflict reporting */ slot_store_data(newslot, relmapentry, newtup); /* - * The tuple to be updated could not be found. Do nothing except for - * emitting a log message. + * The tuple to be updated could not be found or was deleted. Do + * nothing except for emitting a log message. */ - ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING, + ReportApplyConflict(estate, relinfo, LOG, type, remoteslot, NULL, newslot, - InvalidOid, InvalidTransactionId, - InvalidRepOriginId, 0); + InvalidOid, localxid, localorigin, localts); } /* Cleanup. */ @@ -3149,9 +3157,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, Relation partrel_new; bool found; EPQState epqstate; - RepOriginId localorigin; - TransactionId localxmin; - TimestampTz localts; + RepOriginId localorigin = InvalidRepOriginId; + TransactionId localxid = InvalidTransactionId; + TimestampTz localts = 0; /* Get the matching local tuple from the partition. */ found = FindReplTupleInLocalRel(edata, partrel, @@ -3160,20 +3168,29 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, remoteslot_part, &localslot); if (!found) { + ConflictType type; TupleTableSlot *newslot = localslot; + if (MySubscription->retainconflictinfo && + FindMostRecentlyDeletedTupleInfo(partrel, remoteslot_part, + &localxid, &localorigin, + &localts) && + localorigin != replorigin_session_origin) + type = CT_UPDATE_DELETED; + else + type = CT_UPDATE_MISSING; + /* Store the new tuple for conflict reporting */ slot_store_data(newslot, part_entry, newtup); /* - * The tuple to be updated could not be found. Do nothing - * except for emitting a log message. + * The tuple to be updated could not be found or was + * deleted. Do nothing except for emitting a log message. */ - ReportApplyConflict(estate, partrelinfo, - LOG, CT_UPDATE_MISSING, + ReportApplyConflict(estate, partrelinfo, LOG, type, remoteslot_part, NULL, newslot, - InvalidOid, InvalidTransactionId, - InvalidRepOriginId, 0); + InvalidOid, localxid, + localorigin, localts); return; } @@ -3182,7 +3199,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, * Report the conflict if the tuple was modified by a * different origin. */ - if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) && + if (GetTupleTransactionInfo(localslot, &localxid, &localorigin, &localts) && localorigin != replorigin_session_origin) { TupleTableSlot *newslot; @@ -3193,7 +3210,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS, remoteslot_part, localslot, newslot, - InvalidOid, localxmin, localorigin, + InvalidOid, localxid, localorigin, localts); } diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index e9096a88492..b6e79f554bd 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2159,7 +2159,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) Datum pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 10 +#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 11 Oid subid = PG_GETARG_OID(0); TupleDesc tupdesc; Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0}; @@ -2181,17 +2181,19 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 4, "confl_insert_exists", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_update_origin_differs", + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_update_deleted", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists", + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_origin_differs", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_missing", + TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_exists", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_delete_origin_differs", + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_missing", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing", + TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_origin_differs", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset", + TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_missing", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset", TIMESTAMPTZOID, -1, 0); BlessTupleDesc(tupdesc); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index f388205d741..90dace4d421 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5625,9 +5625,9 @@ { oid => '6231', descr => 'statistics: information about subscription stats', proname => 'pg_stat_get_subscription_stats', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'oid', - proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,stats_reset}', + proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_deleted,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,stats_reset}', prosrc => 'pg_stat_get_subscription_stats' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 30e2a82346f..f614f35db55 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -14,6 +14,7 @@ #ifndef EXECUTOR_H #define EXECUTOR_H +#include "datatype/timestamp.h" #include "executor/execdesc.h" #include "fmgr.h" #include "nodes/lockoptions.h" @@ -671,7 +672,11 @@ extern bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, TupleTableSlot *outslot); extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot); - +extern bool FindMostRecentlyDeletedTupleInfo(Relation rel, + TupleTableSlot *searchslot, + TransactionId *delete_xid, + RepOriginId *delete_origin, + TimestampTz *delete_time); extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot); extern void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index 37454dc9513..91224b6ea60 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -26,6 +26,9 @@ typedef enum /* The row to be inserted violates unique constraint */ CT_INSERT_EXISTS, + /* The row to be updated was deleted by a different origin */ + CT_UPDATE_DELETED, + /* The row to be updated was modified by a different origin */ CT_UPDATE_ORIGIN_DIFFERS, diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 892ebb1136e..a6388eb9405 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2153,6 +2153,7 @@ pg_stat_subscription_stats| SELECT ss.subid, ss.apply_error_count, ss.sync_error_count, ss.confl_insert_exists, + ss.confl_update_deleted, ss.confl_update_origin_differs, ss.confl_update_exists, ss.confl_update_missing, @@ -2160,7 +2161,7 @@ pg_stat_subscription_stats| SELECT ss.subid, ss.confl_delete_missing, ss.stats_reset FROM pg_subscription s, - LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, stats_reset); + LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_deleted, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, stats_reset); pg_stat_sys_indexes| SELECT relid, indexrelid, schemaname, diff --git a/src/test/subscription/t/035_retain_conflict_info.pl b/src/test/subscription/t/035_retain_conflict_info.pl index 75539b2cba9..ec446191e95 100644 --- a/src/test/subscription/t/035_retain_conflict_info.pl +++ b/src/test/subscription/t/035_retain_conflict_info.pl @@ -1,7 +1,9 @@ # Copyright (c) 2025, PostgreSQL Global Development Group -# Test the management of the replication slot 'pg_conflict_detection'. +# Test the CREATE SUBSCRIPTION 'retain_conflict_info' parameter and its +# interaction with the management of the replication slot +# 'pg_conflict_detection'. use strict; use warnings FATAL => 'all'; use PostgreSQL::Test::Cluster; @@ -15,7 +17,9 @@ my $subname_BA = 'tap_sub_b_a'; # Setup a bidirectional logical replication between node_A & node_B ############################################################################### -# Initialize nodes. +# Initialize nodes. Enable the track_commit_timestamp on both nodes to detect +# the conflict when attempting to update a row that was previously modified by +# a different origin. # node_A. Increase the log_min_messages setting to DEBUG2 to debug test # failures. Disable autovacuum to avoid generating xid that could affect the @@ -24,7 +28,8 @@ my $node_A = PostgreSQL::Test::Cluster->new('node_A'); $node_A->init(allows_streaming => 'logical'); $node_A->append_conf( 'postgresql.conf', - qq{autovacuum = off + qq{track_commit_timestamp = on + autovacuum = off log_min_messages = 'debug2'}); $node_A->start; @@ -120,6 +125,13 @@ ok( $stderr =~ qr/1 are dead but not yet removable/, 'the deleted column is non-removable'); +my $logfile = slurp_file($node_B->logfile(), $log_location); +ok( $logfile =~ + qr/conflict detected on relation "public.tab": conflict=delete_origin_differs.*\n.*DETAIL:.* Deleting the row that was modified locally in transaction [0-9]+ at .*\n.*Existing local tuple \(1, 3\); replica identity \(a\)=\(1\)/, + 'delete target row was modified in tab'); + +$log_location = -s $node_A->logfile; + # Remember the next transaction ID to be assigned my $next_xid = $node_A->safe_psql('postgres', "SELECT txid_current() + 1;"); @@ -127,6 +139,11 @@ $node_A->safe_psql( 'postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;"); $node_B->wait_for_catchup($subname_AB); +$logfile = slurp_file($node_A->logfile(), $log_location); +ok( $logfile =~ + qr/conflict detected on relation "public.tab": conflict=update_deleted.*\n.*DETAIL:.* The row to be updated was deleted locally in transaction [0-9]+ at .*\n.*Remote tuple \(1, 3\); replica identity \(a\)=\(1\)/, + 'update target row was deleted in tab'); + # Account for the transaction ID increment caused by enabling the subscription $next_xid++; -- 2.30.0.windows.2