From 8d8c9208193cb93170b5b0423d52c5459b45890c Mon Sep 17 00:00:00 2001 From: Nisha Moond Date: Tue, 18 Mar 2025 12:27:48 +0530 Subject: [PATCH v5] Implement the conflict detection for multiple_unique_conflicts in logical replication Introduce a new conflict type, multiple_unique_conflicts, to handle cases where an incoming row during logical replication violates multiple UNIQUE constraints. Previously, the apply worker detected and reported only the first encountered key conflict (insert_exists/update_exists), causing repeated failures as each constraint violation needs to be handled one by one, making the process slow and error-prone. Now, the apply worker checks all unique constraints upfront and reports multiple_unique_conflicts if multiple violations exist. This allows users to resolve all conflicts at once by deleting all conflicting tuples rather than dealing with them individually or skipping the transaction. --- doc/src/sgml/logical-replication.sgml | 13 ++ src/backend/executor/execReplication.c | 29 ++-- src/backend/replication/logical/conflict.c | 76 +++++++--- src/backend/replication/logical/worker.c | 18 +-- src/include/replication/conflict.h | 13 +- src/test/subscription/meson.build | 1 + .../t/035_multiple_unique_conflicts.pl | 133 ++++++++++++++++++ 7 files changed, 239 insertions(+), 44 deletions(-) create mode 100644 src/test/subscription/t/035_multiple_unique_conflicts.pl diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 3d18e507bbc..4817206af7d 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -1877,6 +1877,19 @@ test_sub=# SELECT * from tab_gen_to_gen; + + multiple_unique_conflicts + + + Inserting a row or updating values of a row violates more than one + NOT DEFERRABLE unique constraint. Note that to log + the origin and commit timestamp details of the conflicting key, + track_commit_timestamp + should be enabled on the subscriber. 
In this case, an error will be + raised until the conflict is resolved manually. + + + Note that there are other conflict scenarios, such as exclusion constraint violations. Currently, we do not provide additional details for them in the diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 0a9b880d250..c5b31cd728b 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -493,25 +493,32 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate, ConflictType type, List *recheckIndexes, TupleTableSlot *searchslot, TupleTableSlot *remoteslot) { - /* Check all the unique indexes for a conflict */ + int conflicts = 0; + List *conflictSlots = NIL; + List *conflictIndexes = NIL; + TupleTableSlot *conflictslot; + + /* Check all the unique indexes for conflicts */ foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes) { - TupleTableSlot *conflictslot; - if (list_member_oid(recheckIndexes, uniqueidx) && FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot, &conflictslot)) { - RepOriginId origin; - TimestampTz committs; - TransactionId xmin; - - GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs); - ReportApplyConflict(estate, resultRelInfo, ERROR, type, - searchslot, conflictslot, remoteslot, - uniqueidx, xmin, origin, committs); + conflicts++; + + /* Add the conflict slot and index to their respective lists */ + conflictSlots = lappend(conflictSlots, conflictslot); + conflictIndexes = lappend_oid(conflictIndexes, uniqueidx); } } + + /* Report the conflict if found */ + if (conflicts) + ReportApplyConflict(estate, resultRelInfo, ERROR, + conflicts > 1 ? 
CT_MULTIPLE_UNIQUE_CONFLICTS : type, + searchslot, conflictSlots, remoteslot, + conflictIndexes, InvalidTransactionId, 0, 0); } /* diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 772fc83e88b..cba85c16888 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -29,11 +29,12 @@ static const char *const ConflictTypeNames[] = { [CT_UPDATE_EXISTS] = "update_exists", [CT_UPDATE_MISSING] = "update_missing", [CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs", - [CT_DELETE_MISSING] = "delete_missing" + [CT_DELETE_MISSING] = "delete_missing", + [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts" }; static int errcode_apply_conflict(ConflictType type); -static int errdetail_apply_conflict(EState *estate, +static void errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, ConflictType type, TupleTableSlot *searchslot, @@ -41,7 +42,7 @@ static int errdetail_apply_conflict(EState *estate, TupleTableSlot *remoteslot, Oid indexoid, TransactionId localxmin, RepOriginId localorigin, - TimestampTz localts); + TimestampTz localts, StringInfo err_msg); static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo, ConflictType type, TupleTableSlot *searchslot, @@ -90,15 +91,15 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, * 'searchslot' should contain the tuple used to search the local tuple to be * updated or deleted. * - * 'localslot' should contain the existing local tuple, if any, that conflicts - * with the remote tuple. 'localxmin', 'localorigin', and 'localts' provide the - * transaction information related to this existing local tuple. + * 'conflictSlots' list contains the existing local tuples, if any, that + * conflict with the remote tuple. 'localxmin', 'localorigin', and 'localts' + * provide the transaction information related to these existing local tuples. 
* * 'remoteslot' should contain the remote new tuple, if any. * - * The 'indexoid' represents the OID of the unique index that triggered the - * constraint violation error. We use this to report the key values for - * conflicting tuple. + * The 'conflictIndexes' list represents the OIDs of the unique indexes that + * triggered the constraint violation error. We use this to report the key + * values for the conflicting tuples. * * The caller must ensure that the index with the OID 'indexoid' is locked so * that we can fetch and display the conflicting key value. @@ -106,16 +107,47 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, - TupleTableSlot *localslot, TupleTableSlot *remoteslot, - Oid indexoid, TransactionId localxmin, + List *conflictSlots, TupleTableSlot *remoteslot, + List *conflictIndexes, TransactionId localxmin, RepOriginId localorigin, TimestampTz localts) { + int conflictNum = 0; + Oid indexoid; Relation localrel = relinfo->ri_RelationDesc; + StringInfoData err_detail; + + initStringInfo(&err_detail); - Assert(!OidIsValid(indexoid) || - CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true)); + if (!conflictSlots) + errdetail_apply_conflict(estate, relinfo, type, searchslot, + NULL, remoteslot, InvalidOid, + localxmin, localorigin, localts, &err_detail); - pgstat_report_subscription_conflict(MySubscription->oid, type); + foreach_ptr(TupleTableSlot, slot, conflictSlots) + { + indexoid = lfirst_oid(list_nth_cell(conflictIndexes, conflictNum)); + + Assert(!OidIsValid(indexoid) || + CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true)); + + if (!localorigin) + GetTupleTransactionInfo(slot, &localxmin, &localorigin, &localts); + + /* + * Build the error detail message containing the conflicting key and + * tuple information. The details for each conflict will be appended + * to err_detail. 
+ */ + errdetail_apply_conflict(estate, relinfo, type, searchslot, + slot, remoteslot, indexoid, + localxmin, localorigin, localts, &err_detail); + + conflictNum++; + } + + /* XXX: stats not supported for multiple_unique_conflicts in this patch */ + if (type != CT_MULTIPLE_UNIQUE_CONFLICTS) + pgstat_report_subscription_conflict(MySubscription->oid, type); ereport(elevel, errcode_apply_conflict(type), @@ -123,9 +155,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, get_namespace_name(RelationGetNamespace(localrel)), RelationGetRelationName(localrel), ConflictTypeNames[type]), - errdetail_apply_conflict(estate, relinfo, type, searchslot, - localslot, remoteslot, indexoid, - localxmin, localorigin, localts)); + errdetail_internal("%s", err_detail.data)); } /* @@ -169,6 +199,7 @@ errcode_apply_conflict(ConflictType type) { case CT_INSERT_EXISTS: case CT_UPDATE_EXISTS: + case CT_MULTIPLE_UNIQUE_CONFLICTS: return errcode(ERRCODE_UNIQUE_VIOLATION); case CT_UPDATE_ORIGIN_DIFFERS: case CT_UPDATE_MISSING: @@ -191,12 +222,13 @@ errcode_apply_conflict(ConflictType type) * replica identity columns, if any. The remote old tuple is excluded as its * information is covered in the replica identity columns. 
*/ -static int +static void errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *localslot, TupleTableSlot *remoteslot, Oid indexoid, TransactionId localxmin, - RepOriginId localorigin, TimestampTz localts) + RepOriginId localorigin, TimestampTz localts, + StringInfo err_msg) { StringInfoData err_detail; char *val_desc; @@ -209,6 +241,7 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, { case CT_INSERT_EXISTS: case CT_UPDATE_EXISTS: + case CT_MULTIPLE_UNIQUE_CONFLICTS: Assert(OidIsValid(indexoid)); if (localts) @@ -291,7 +324,7 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, if (val_desc) appendStringInfo(&err_detail, "\n%s", val_desc); - return errdetail_internal("%s", err_detail.data); + appendStringInfo(err_msg, "%s", err_detail.data); } /* @@ -323,7 +356,8 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo, * Report the conflicting key values in the case of a unique constraint * violation. 
*/ - if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS) + if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS || + type == CT_MULTIPLE_UNIQUE_CONFLICTS) { Assert(OidIsValid(indexoid) && localslot); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 31ab69ea13a..6a8d8843b61 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2711,8 +2711,8 @@ apply_handle_update_internal(ApplyExecutionData *edata, slot_store_data(newslot, relmapentry, newtup); ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS, - remoteslot, localslot, newslot, - InvalidOid, localxmin, localorigin, localts); + remoteslot, list_make1(localslot), newslot, + list_make1(InvalidOid), localxmin, localorigin, localts); } /* Process and store remote tuple in the slot */ @@ -2742,7 +2742,7 @@ apply_handle_update_internal(ApplyExecutionData *edata, */ ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING, remoteslot, NULL, newslot, - InvalidOid, InvalidTransactionId, + NULL, InvalidTransactionId, InvalidRepOriginId, 0); } @@ -2887,8 +2887,8 @@ apply_handle_delete_internal(ApplyExecutionData *edata, if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) && localorigin != replorigin_session_origin) ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS, - remoteslot, localslot, NULL, - InvalidOid, localxmin, localorigin, localts); + remoteslot, list_make1(localslot), NULL, + list_make1(InvalidOid), localxmin, localorigin, localts); EvalPlanQualSetSlot(&epqstate, localslot); @@ -2904,7 +2904,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata, */ ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING, remoteslot, NULL, NULL, - InvalidOid, InvalidTransactionId, + NULL, InvalidTransactionId, InvalidRepOriginId, 0); } @@ -3096,7 +3096,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, ReportApplyConflict(estate, partrelinfo, 
LOG, CT_UPDATE_MISSING, remoteslot_part, NULL, newslot, - InvalidOid, InvalidTransactionId, + NULL, InvalidTransactionId, InvalidRepOriginId, 0); return; @@ -3116,8 +3116,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, slot_store_data(newslot, part_entry, newtup); ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS, - remoteslot_part, localslot, newslot, - InvalidOid, localxmin, localorigin, + remoteslot_part, list_make1(localslot), newslot, + list_make1(InvalidOid), localxmin, localorigin, localts); } diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index 37454dc9513..2c5a129959d 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -41,6 +41,9 @@ typedef enum /* The row to be deleted is missing */ CT_DELETE_MISSING, + /* The row to be inserted/updated violates multiple unique constraints */ + CT_MULTIPLE_UNIQUE_CONFLICTS, + /* * Other conflicts, such as exclusion constraint violations, involve more * complex rules than simple equality checks. These conflicts are left for * @@ -48,6 +51,11 @@ typedef enum */ } ConflictType; +/* + * XXX: Don't increment the CONFLICT_NUM_TYPES, as it is used by subscription + * stats module and this patch does not support stats for + * multiple_unique_conflicts. 
+ */ #define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1) extern bool GetTupleTransactionInfo(TupleTableSlot *localslot, @@ -57,10 +65,9 @@ extern bool GetTupleTransactionInfo(TupleTableSlot *localslot, extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, - TupleTableSlot *localslot, + List *conflictSlots, TupleTableSlot *remoteslot, - Oid indexoid, TransactionId localxmin, + List *conflictIndexes, TransactionId localxmin, RepOriginId localorigin, TimestampTz localts); extern void InitConflictIndexes(ResultRelInfo *relInfo); - #endif diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index d40b49714f6..05fcdd08f57 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -41,6 +41,7 @@ tests += { 't/032_subscribe_use_index.pl', 't/033_run_as_table_owner.pl', 't/034_temporal.pl', + 't/035_multiple_unique_conflicts.pl', 't/100_bugs.pl', ], }, diff --git a/src/test/subscription/t/035_multiple_unique_conflicts.pl b/src/test/subscription/t/035_multiple_unique_conflicts.pl new file mode 100644 index 00000000000..f1417e313db --- /dev/null +++ b/src/test/subscription/t/035_multiple_unique_conflicts.pl @@ -0,0 +1,133 @@ +# Copyright (c) 2025, PostgreSQL Global Development Group + +# Test the conflict detection of conflict type 'multiple_unique_conflicts'. 
+use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +############################### +# Setup +############################### + +# Create a publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create a subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init; +$node_subscriber->start; + +# Create a table on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE conf_tab (a int PRIMARY KEY, b int UNIQUE, c int UNIQUE);"); + +# Create same table on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE conf_tab (a int PRIMARY key, b int unique, c int unique);"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub_tab FOR TABLE conf_tab"); + +# Create the subscription +my $appname = 'sub_tab'; +$node_subscriber->safe_psql( + 'postgres', + "CREATE SUBSCRIPTION sub_tab + CONNECTION '$publisher_connstr application_name=$appname' + PUBLICATION pub_tab;"); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname); + +################################################## +# INSERT data on Pub and Sub +################################################## + +# Insert data in the publisher table +$node_publisher->safe_psql('postgres', + "INSERT INTO conf_tab VALUES (1,1,1);"); + +# Insert data in the subscriber table +$node_subscriber->safe_psql('postgres', + "INSERT INTO conf_tab VALUES (2,2,2), (3,3,3), (4,4,4);"); + +################################################## +# Test multiple_unique_conflicts due to INSERT +################################################## +my $log_offset = -s $node_subscriber->logfile; + 
+$node_publisher->safe_psql('postgres', + "INSERT INTO conf_tab VALUES (2,3,4);"); + +# Confirm that this causes an error on the subscriber +$node_subscriber->wait_for_log( + qr/ERROR: conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts/, + $log_offset); + +ok( $node_subscriber->log_contains( + qr/Key already exists in unique index \"conf_tab_pkey\".*\n.*Key \(a\)=\(2\); existing local tuple \(2, 2, 2\); remote tuple \(2, 3, 4\)./, + $log_offset), + 'multiple_unique_conflicts detected during insertion for conf_tab_pkey (a) = (2)' +); + +ok( $node_subscriber->log_contains( + qr/Key already exists in unique index \"conf_tab_b_key\".*\n.*Key \(b\)=\(3\); existing local tuple \(3, 3, 3\); remote tuple \(2, 3, 4\)./, + $log_offset), + 'multiple_unique_conflicts detected during insertion for conf_tab_b_key (b) = (3)' +); + +ok( $node_subscriber->log_contains( + qr/Key already exists in unique index \"conf_tab_c_key\".*\n.*Key \(c\)=\(4\); existing local tuple \(4, 4, 4\); remote tuple \(2, 3, 4\)./, + $log_offset), + 'multiple_unique_conflicts detected during insertion for conf_tab_c_key (c) = (4)' +); + +# Truncate table to get rid of the error +$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;"); + +################################################## +# Test multiple_unique_conflicts due to UPDATE +################################################## +$log_offset = -s $node_subscriber->logfile; + +# Insert data in the publisher table +$node_publisher->safe_psql('postgres', + "INSERT INTO conf_tab VALUES (5,5,5);"); + +# Insert data in the subscriber table +$node_subscriber->safe_psql('postgres', + "INSERT INTO conf_tab VALUES (6,6,6), (7,7,7), (8,8,8);"); + +$node_publisher->safe_psql('postgres', + "UPDATE conf_tab set a=6, b=7, c=8 where a=5;"); + +# Confirm that this causes an error on the subscriber +$node_subscriber->wait_for_log( + qr/ERROR: conflict detected on relation \"public.conf_tab\": 
conflict=multiple_unique_conflicts/, + $log_offset); + +ok( $node_subscriber->log_contains( + qr/Key already exists in unique index \"conf_tab_pkey\".*\n.*Key \(a\)=\(6\); existing local tuple \(6, 6, 6\); remote tuple \(6, 7, 8\)./, + $log_offset), + 'multiple_unique_conflicts detected during update for conf_tab_pkey (a) = (6)' +); + +ok( $node_subscriber->log_contains( + qr/Key already exists in unique index \"conf_tab_b_key\".*\n.*Key \(b\)=\(7\); existing local tuple \(7, 7, 7\); remote tuple \(6, 7, 8\)./, + $log_offset), + 'multiple_unique_conflicts detected during update for conf_tab_b_key (b) = (7)' +); + +ok( $node_subscriber->log_contains( + qr/Key already exists in unique index \"conf_tab_c_key\".*\n.*Key \(c\)=\(8\); existing local tuple \(8, 8, 8\); remote tuple \(6, 7, 8\)./, + $log_offset), + 'multiple_unique_conflicts detected during update for conf_tab_c_key (c) = (8)' +); + +done_testing(); -- 2.34.1