From aa3b35f14895ce1e9e184432eefd4ddf73d04090 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Mon, 29 Jul 2024 11:14:37 +0800 Subject: [PATCH v8] Detect and log conflicts in logical replication This patch enables the logical replication worker to provide additional logging information in the following conflict scenarios while applying changes: insert_exists: Inserting a row that violates a NOT DEFERRABLE unique constraint. update_exists: The updated row value violates a NOT DEFERRABLE unique constraint. update_differ: Updating a row that was previously modified by another origin. update_missing: The tuple to be updated is missing. delete_missing: The tuple to be deleted is missing. delete_differ: Deleting a row that was previously modified by another origin. For insert_exists and update_exists conflicts, the log can include origin and commit timestamp details of the conflicting key with track_commit_timestamp enabled. update_differ and delete_differ conflicts can only be detected when track_commit_timestamp is enabled. 
--- doc/src/sgml/logical-replication.sgml | 93 +++++++- src/backend/executor/execIndexing.c | 13 +- src/backend/executor/execReplication.c | 238 ++++++++++++++----- src/backend/executor/nodeModifyTable.c | 5 +- src/backend/replication/logical/Makefile | 1 + src/backend/replication/logical/conflict.c | 247 ++++++++++++++++++++ src/backend/replication/logical/meson.build | 1 + src/backend/replication/logical/worker.c | 82 +++++-- src/include/executor/executor.h | 1 + src/include/replication/conflict.h | 50 ++++ src/test/subscription/t/001_rep_changes.pl | 10 +- src/test/subscription/t/013_partition.pl | 51 ++-- src/test/subscription/t/029_on_error.pl | 11 +- src/test/subscription/t/030_origin.pl | 47 ++++ src/tools/pgindent/typedefs.list | 1 + 15 files changed, 724 insertions(+), 127 deletions(-) create mode 100644 src/backend/replication/logical/conflict.c create mode 100644 src/include/replication/conflict.h diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index a23a3d57e2..8f69844b6a 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -1583,6 +1583,86 @@ test_sub=# SELECT * FROM t1 ORDER BY id; will simply be skipped. + + Additional logging is triggered in the following conflict + scenarios: + + + insert_exists + + + Inserting a row that violates a NOT DEFERRABLE + unique constraint. Note that to obtain the origin and commit + timestamp details of the conflicting key in the log, ensure that + track_commit_timestamp + is enabled. In this scenario, an error will be raised until the + conflict is resolved manually. + + + + + update_exists + + + The updated value of a row violates a NOT DEFERRABLE + unique constraint. Note that to obtain the origin and commit + timestamp details of the conflicting key in the log, ensure that + track_commit_timestamp + is enabled. In this scenario, an error will be raised until the + conflict is resolved manually. 
Note that when updating a + partitioned table, if the updated row value satisfies another + partition constraint resulting in the row being inserted into a + new partition, the insert_exists conflict may + arise if the new row violates a NOT DEFERRABLE + unique constraint. + + + + + update_differ + + + Updating a row that was previously modified by another origin. + Note that this conflict can only be detected when + track_commit_timestamp + is enabled. Currently, the update is always applied regardless of + the origin of the local row. + + + + + update_missing + + + The tuple to be updated was not found. The update will simply be + skipped in this scenario. + + + + + delete_missing + + + The tuple to be deleted was not found. The delete will simply be + skipped in this scenario. + + + + + delete_differ + + + Deleting a row that was previously modified by another origin. Note that this + conflict can only be detected when + track_commit_timestamp + is enabled. Currently, the delete is always applied regardless of + the origin of the local row. + + + + + + Logical replication operations are performed with the privileges of the role which owns the subscription. Permissions failures on target tables will @@ -1609,8 +1689,8 @@ test_sub=# SELECT * FROM t1 ORDER BY id; an error, the replication won't proceed, and the logical replication worker will emit the following kind of message to the subscriber's server log: -ERROR: duplicate key value violates unique constraint "test_pkey" -DETAIL: Key (c)=(1) already exists. +ERROR: conflict insert_exists detected on relation "public.test" +DETAIL: Key (c)=(1) already exists in unique index "t_pkey", which was modified locally in transaction 740 at 2024-06-26 10:47:04.727375+08. 
CONTEXT: processing remote data for replication origin "pg_16395" during "INSERT" for replication target relation "public.test" in transaction 725 finished at 0/14C0378 The LSN of the transaction that contains the change violating the constraint and @@ -1636,6 +1716,15 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER Please note that skipping the whole transaction includes skipping changes that might not violate any constraint. This can easily make the subscriber inconsistent. + The additional details regarding conflicting rows, such as their origin and + commit timestamp can be seen in the DETAIL line of the + log. But note that this information is only available when + track_commit_timestamp + is enabled. Users can use this information to make decisions on whether to + retain the local change or adopt the remote alteration. For instance, the + origin in the above log indicates that the existing row was modified by a local + change, users can manually perform a remote-change-win resolution by + deleting the local row. diff --git a/src/backend/executor/execIndexing.c b/src/backend/executor/execIndexing.c index 9f05b3654c..bdfc7cf7a6 100644 --- a/src/backend/executor/execIndexing.c +++ b/src/backend/executor/execIndexing.c @@ -207,8 +207,9 @@ ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative) ii = BuildIndexInfo(indexDesc); /* - * If the indexes are to be used for speculative insertion, add extra - * information required by unique index entries. + * If the indexes are to be used for speculative insertion or conflict + * detection in logical replication, add extra information required by + * unique index entries. */ if (speculative && ii->ii_Unique) BuildSpeculativeIndexInfo(indexDesc, ii); @@ -521,12 +522,16 @@ ExecInsertIndexTuples(ResultRelInfo *resultRelInfo, * possible that a conflicting tuple is inserted immediately * after this returns. But this can be used for a pre-check * before insertion. 
+ * + * 'tupleid' should be the TID of the tuple that has been recently + * inserted (or can be invalid if we haven't inserted a new tuple yet). + * This tuple will be excluded from conflict checking. * ---------------------------------------------------------------- */ bool ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, ItemPointer conflictTid, - List *arbiterIndexes) + ItemPointer tupleid, List *arbiterIndexes) { int i; int numIndices; @@ -629,7 +634,7 @@ ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, satisfiesConstraint = check_exclusion_or_unique_constraint(heapRelation, indexRelation, - indexInfo, &invalidItemPtr, + indexInfo, tupleid, values, isnull, estate, false, CEOUC_WAIT, true, conflictTid); diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index d0a89cd577..f4b605970d 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -23,6 +23,7 @@ #include "commands/trigger.h" #include "executor/executor.h" #include "executor/nodeModifyTable.h" +#include "replication/conflict.h" #include "replication/logicalrelation.h" #include "storage/lmgr.h" #include "utils/builtins.h" @@ -166,6 +167,51 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, return skey_attoff; } + +/* + * Helper function to check if it is necessary to re-fetch and lock the tuple + * due to concurrent modifications. This function should be called after + * invoking table_tuple_lock. 
+ */ +static bool +should_refetch_tuple(TM_Result res, TM_FailureData *tmfd) +{ + bool refetch = false; + + switch (res) + { + case TM_Ok: + break; + case TM_Updated: + /* XXX: Improve handling here */ + if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid)) + ereport(LOG, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying"))); + else + ereport(LOG, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("concurrent update, retrying"))); + refetch = true; + break; + case TM_Deleted: + /* XXX: Improve handling here */ + ereport(LOG, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("concurrent delete, retrying"))); + refetch = true; + break; + case TM_Invisible: + elog(ERROR, "attempted to lock invisible tuple"); + break; + default: + elog(ERROR, "unexpected table_tuple_lock status: %u", res); + break; + } + + return refetch; +} + /* * Search the relation 'rel' for tuple using the index. 
* @@ -260,34 +306,8 @@ retry: PopActiveSnapshot(); - switch (res) - { - case TM_Ok: - break; - case TM_Updated: - /* XXX: Improve handling here */ - if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid)) - ereport(LOG, - (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), - errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying"))); - else - ereport(LOG, - (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), - errmsg("concurrent update, retrying"))); - goto retry; - case TM_Deleted: - /* XXX: Improve handling here */ - ereport(LOG, - (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), - errmsg("concurrent delete, retrying"))); - goto retry; - case TM_Invisible: - elog(ERROR, "attempted to lock invisible tuple"); - break; - default: - elog(ERROR, "unexpected table_tuple_lock status: %u", res); - break; - } + if (should_refetch_tuple(res, &tmfd)) + goto retry; } index_endscan(scan); @@ -444,34 +464,8 @@ retry: PopActiveSnapshot(); - switch (res) - { - case TM_Ok: - break; - case TM_Updated: - /* XXX: Improve handling here */ - if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid)) - ereport(LOG, - (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), - errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying"))); - else - ereport(LOG, - (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), - errmsg("concurrent update, retrying"))); - goto retry; - case TM_Deleted: - /* XXX: Improve handling here */ - ereport(LOG, - (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), - errmsg("concurrent delete, retrying"))); - goto retry; - case TM_Invisible: - elog(ERROR, "attempted to lock invisible tuple"); - break; - default: - elog(ERROR, "unexpected table_tuple_lock status: %u", res); - break; - } + if (should_refetch_tuple(res, &tmfd)) + goto retry; } table_endscan(scan); @@ -480,6 +474,96 @@ retry: return found; } +/* + * Find the tuple that violates the passed in unique index constraint + * (conflictindex). 
+ * + * If no conflict is found, return false and set *conflictslot to NULL. + * Otherwise return true, and the conflicting tuple is locked and returned in + * *conflictslot. + */ +static bool +FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate, + Oid conflictindex, TupleTableSlot *slot, + TupleTableSlot **conflictslot) +{ + Relation rel = resultRelInfo->ri_RelationDesc; + ItemPointerData conflictTid; + TM_FailureData tmfd; + TM_Result res; + + *conflictslot = NULL; + +retry: + if (ExecCheckIndexConstraints(resultRelInfo, slot, estate, + &conflictTid, &slot->tts_tid, + list_make1_oid(conflictindex))) + { + if (*conflictslot) + ExecDropSingleTupleTableSlot(*conflictslot); + + *conflictslot = NULL; + return false; + } + + *conflictslot = table_slot_create(rel, NULL); + + PushActiveSnapshot(GetLatestSnapshot()); + + res = table_tuple_lock(rel, &conflictTid, GetLatestSnapshot(), + *conflictslot, + GetCurrentCommandId(false), + LockTupleShare, + LockWaitBlock, + 0 /* don't follow updates */ , + &tmfd); + + PopActiveSnapshot(); + + if (should_refetch_tuple(res, &tmfd)) + goto retry; + + return true; +} + +/* + * Re-check all the unique indexes in 'recheckIndexes' to see if there are + * potential conflicts with the tuple in 'slot'. + * + * This function is invoked after inserting or updating a tuple that detected + * potential conflict tuples. It attempts to find the tuple that conflicts with + * the provided tuple. This operation may seem redundant with the unique + * violation check of indexam, but since we call this function only when we are + * detecting conflict in logical replication and encountering potential + * conflicts with any unique index constraints (which should not be frequent), + * so it's ok. Moreover, upon detecting a conflict, we will report an ERROR and + * restart the logical replication, so the additional cost of finding the tuple + * should be acceptable. 
+ */ +static void +ReCheckConflictIndexes(ResultRelInfo *resultRelInfo, EState *estate, + ConflictType type, List *recheckIndexes, + TupleTableSlot *slot) +{ + /* Re-check all the unique indexes for potential conflicts */ + foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes) + { + TupleTableSlot *conflictslot; + + if (list_member_oid(recheckIndexes, uniqueidx) && + FindConflictTuple(resultRelInfo, estate, uniqueidx, slot, &conflictslot)) + { + RepOriginId origin; + TimestampTz committs; + TransactionId xmin; + + GetTupleCommitTs(conflictslot, &xmin, &origin, &committs); + ReportApplyConflict(ERROR, type, resultRelInfo->ri_RelationDesc, uniqueidx, + xmin, origin, committs, conflictslot); + } + } +} + /* * Insert tuple represented in the slot to the relation, update the indexes, * and execute any constraints and per-row triggers. @@ -509,6 +593,8 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, if (!skip_tuple) { List *recheckIndexes = NIL; + List *conflictindexes; + bool conflict = false; /* Compute stored generated columns */ if (rel->rd_att->constr && @@ -525,10 +611,28 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, /* OK, store the tuple and create index entries for it */ simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot); + conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes; + if (resultRelInfo->ri_NumIndices > 0) recheckIndexes = ExecInsertIndexTuples(resultRelInfo, - slot, estate, false, false, - NULL, NIL, false); + slot, estate, false, + conflictindexes ? true : false, + &conflict, + conflictindexes, false); + + /* + * Rechecks the conflict indexes to fetch the conflicting local tuple + * and reports the conflict. We perform this check here, instead of + * performing an additional index scan before the actual insertion and + * reporting the conflict if any conflicting tuples are found. 
This is + * to avoid the overhead of executing the extra scan for each INSERT + * operation, even when no conflict arises, which could introduce + * significant overhead to replication, particularly in cases where + * conflicts are rare. + */ + if (conflict) + ReCheckConflictIndexes(resultRelInfo, estate, CT_INSERT_EXISTS, + recheckIndexes, slot); /* AFTER ROW INSERT Triggers */ ExecARInsertTriggers(estate, resultRelInfo, slot, @@ -577,6 +681,8 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, { List *recheckIndexes = NIL; TU_UpdateIndexes update_indexes; + List *conflictindexes; + bool conflict = false; /* Compute stored generated columns */ if (rel->rd_att->constr && @@ -593,12 +699,24 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, simple_table_tuple_update(rel, tid, slot, estate->es_snapshot, &update_indexes); + conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes; + if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None)) recheckIndexes = ExecInsertIndexTuples(resultRelInfo, - slot, estate, true, false, - NULL, NIL, + slot, estate, true, + conflictindexes ? true : false, + &conflict, conflictindexes, (update_indexes == TU_Summarizing)); + /* + * Refer to the comments above the call to ReCheckConflictIndexes() in + * ExecSimpleRelationInsert to understand why this check is done at + * this point. + */ + if (conflict) + ReCheckConflictIndexes(resultRelInfo, estate, CT_UPDATE_EXISTS, + recheckIndexes, slot); + /* AFTER ROW UPDATE Triggers */ ExecARUpdateTriggers(estate, resultRelInfo, NULL, NULL, diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 4913e49319..8bf4c80d4a 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -1019,9 +1019,11 @@ ExecInsert(ModifyTableContext *context, /* Perform a speculative insertion. 
*/ uint32 specToken; ItemPointerData conflictTid; + ItemPointerData invalidItemPtr; bool specConflict; List *arbiterIndexes; + ItemPointerSetInvalid(&invalidItemPtr); arbiterIndexes = resultRelInfo->ri_onConflictArbiterIndexes; /* @@ -1041,7 +1043,8 @@ ExecInsert(ModifyTableContext *context, CHECK_FOR_INTERRUPTS(); specConflict = false; if (!ExecCheckIndexConstraints(resultRelInfo, slot, estate, - &conflictTid, arbiterIndexes)) + &conflictTid, &invalidItemPtr, + arbiterIndexes)) { /* committed conflict tuple found */ if (onconflict == ONCONFLICT_UPDATE) diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index ba03eeff1c..1e08bbbd4e 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -16,6 +16,7 @@ override CPPFLAGS := -I$(srcdir) $(CPPFLAGS) OBJS = \ applyparallelworker.o \ + conflict.o \ decode.o \ launcher.o \ logical.o \ diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c new file mode 100644 index 0000000000..287f62f3ba --- /dev/null +++ b/src/backend/replication/logical/conflict.c @@ -0,0 +1,247 @@ +/*------------------------------------------------------------------------- + * conflict.c + * Functionality for detecting and logging conflicts. + * + * Copyright (c) 2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/conflict.c + * + * This file contains the code for detecting and logging conflicts on + * the subscriber during logical replication. 
+ *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/commit_ts.h" +#include "replication/conflict.h" +#include "replication/origin.h" +#include "utils/lsyscache.h" +#include "utils/rel.h" + +const char *const ConflictTypeNames[] = { + [CT_INSERT_EXISTS] = "insert_exists", + [CT_UPDATE_EXISTS] = "update_exists", + [CT_UPDATE_DIFFER] = "update_differ", + [CT_UPDATE_MISSING] = "update_missing", + [CT_DELETE_MISSING] = "delete_missing", + [CT_DELETE_DIFFER] = "delete_differ" +}; + +static char *build_index_value_desc(Oid indexoid, TupleTableSlot *conflictslot); +static int errdetail_apply_conflict(ConflictType type, Oid conflictidx, + TransactionId localxmin, + RepOriginId localorigin, + TimestampTz localts, + TupleTableSlot *conflictslot); + +/* + * Get the xmin and commit timestamp data (origin and timestamp) associated + * with the provided local tuple. + * + * Return true if the commit timestamp data was found, false otherwise. + */ +bool +GetTupleCommitTs(TupleTableSlot *localslot, TransactionId *xmin, + RepOriginId *localorigin, TimestampTz *localts) +{ + Datum xminDatum; + bool isnull; + + xminDatum = slot_getsysattr(localslot, MinTransactionIdAttributeNumber, + &isnull); + *xmin = DatumGetTransactionId(xminDatum); + Assert(!isnull); + + /* + * The commit timestamp data is not available if track_commit_timestamp is + * disabled. + */ + if (!track_commit_timestamp) + { + *localorigin = InvalidRepOriginId; + *localts = 0; + return false; + } + + return TransactionIdGetCommitTsData(*xmin, localts, localorigin); +} + +/* + * Report a conflict when applying remote changes. + * + * The caller should ensure that the index with the OID 'conflictidx' is + * locked. 
+ */ +void +ReportApplyConflict(int elevel, ConflictType type, Relation localrel, + Oid conflictidx, TransactionId localxmin, + RepOriginId localorigin, TimestampTz localts, + TupleTableSlot *conflictslot) +{ + ereport(elevel, + errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION), + errmsg("conflict %s detected on relation \"%s.%s\"", + ConflictTypeNames[type], + get_namespace_name(RelationGetNamespace(localrel)), + RelationGetRelationName(localrel)), + errdetail_apply_conflict(type, conflictidx, localxmin, localorigin, + localts, conflictslot)); +} + +/* + * Find all unique indexes to check for a conflict and store them into + * ResultRelInfo. + */ +void +InitConflictIndexes(ResultRelInfo *relInfo) +{ + List *uniqueIndexes = NIL; + + for (int i = 0; i < relInfo->ri_NumIndices; i++) + { + Relation indexRelation = relInfo->ri_IndexRelationDescs[i]; + + if (indexRelation == NULL) + continue; + + /* Detect conflict only for unique indexes */ + if (!relInfo->ri_IndexRelationInfo[i]->ii_Unique) + continue; + + /* Don't support conflict detection for deferrable index */ + if (!indexRelation->rd_index->indimmediate) + continue; + + uniqueIndexes = lappend_oid(uniqueIndexes, + RelationGetRelid(indexRelation)); + } + + relInfo->ri_onConflictArbiterIndexes = uniqueIndexes; +} + +/* + * Add an errdetail() line showing conflict detail. + */ +static int +errdetail_apply_conflict(ConflictType type, Oid conflictidx, + TransactionId localxmin, RepOriginId localorigin, + TimestampTz localts, TupleTableSlot *conflictslot) +{ + switch (type) + { + case CT_INSERT_EXISTS: + case CT_UPDATE_EXISTS: + { + /* + * Build the index value string. If the return value is NULL, + * it indicates that the current user lacks permissions to + * view all the columns involved. 
+ */ + char *index_value = build_index_value_desc(conflictidx, + conflictslot); + + if (index_value && localts) + { + char *origin_name; + + if (localorigin == InvalidRepOriginId) + return errdetail("Key %s already exists in unique index \"%s\", which was modified locally in transaction %u at %s.", + index_value, get_rel_name(conflictidx), + localxmin, timestamptz_to_str(localts)); + else if (replorigin_by_oid(localorigin, true, &origin_name)) + return errdetail("Key %s already exists in unique index \"%s\", which was modified by origin \"%s\" in transaction %u at %s.", + index_value, get_rel_name(conflictidx), origin_name, + localxmin, timestamptz_to_str(localts)); + + /* + * The origin which modified the row has been dropped. + * This situation may occur if the origin was created by a + * different apply worker, but its associated subscription + * and origin were dropped after updating the row, or if + * the origin was manually dropped by the user. + */ + else + return errdetail("Key %s already exists in unique index \"%s\", which was modified by a non-existent origin in transaction %u at %s.", + index_value, get_rel_name(conflictidx), + localxmin, timestamptz_to_str(localts)); + } + else if (index_value && !localts) + return errdetail("Key %s already exists in unique index \"%s\", which was modified in transaction %u.", + index_value, get_rel_name(conflictidx), localxmin); + else + return errdetail("Key already exists in unique index \"%s\".", + get_rel_name(conflictidx)); + } + case CT_UPDATE_DIFFER: + { + char *origin_name; + + if (localorigin == InvalidRepOriginId) + return errdetail("Updating a row that was modified locally in transaction %u at %s.", + localxmin, timestamptz_to_str(localts)); + else if (replorigin_by_oid(localorigin, true, &origin_name)) + return errdetail("Updating a row that was modified by a different origin \"%s\" in transaction %u at %s.", + origin_name, localxmin, timestamptz_to_str(localts)); + + /* The origin which modified the row 
has been dropped */ + else + return errdetail("Updating a row that was modified by a non-existent origin in transaction %u at %s.", + localxmin, timestamptz_to_str(localts)); + } + + case CT_UPDATE_MISSING: + return errdetail("Did not find the row to be updated."); + case CT_DELETE_MISSING: + return errdetail("Did not find the row to be deleted."); + case CT_DELETE_DIFFER: + { + char *origin_name; + + if (localorigin == InvalidRepOriginId) + return errdetail("Deleting a row that was modified locally in transaction %u at %s.", + localxmin, timestamptz_to_str(localts)); + else if (replorigin_by_oid(localorigin, true, &origin_name)) + return errdetail("Deleting a row that was modified by a different origin \"%s\" in transaction %u at %s.", + origin_name, localxmin, timestamptz_to_str(localts)); + + /* The origin which modified the row has been dropped */ + else + return errdetail("Deleting a row that was modified by a non-existent origin in transaction %u at %s.", + localxmin, timestamptz_to_str(localts)); + } + + } + + return 0; /* silence compiler warning */ +} + +/* + * Helper function to construct a string describing the contents of an index + * entry. See BuildIndexValueDescription for details. + * + * The caller should ensure that the index with the OID 'conflictidx' is + * locked. 
+ */ +static char * +build_index_value_desc(Oid indexoid, TupleTableSlot *conflictslot) +{ + char *conflict_row; + Relation indexDesc; + + if (!conflictslot) + return NULL; + + indexDesc = index_open(indexoid, NoLock); + + slot_getallattrs(conflictslot); + + conflict_row = BuildIndexValueDescription(indexDesc, + conflictslot->tts_values, + conflictslot->tts_isnull); + + index_close(indexDesc, NoLock); + + return conflict_row; +} diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index 3dec36a6de..3d36249d8a 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -2,6 +2,7 @@ backend_sources += files( 'applyparallelworker.c', + 'conflict.c', 'decode.c', 'launcher.c', 'logical.c', diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ec96b5fe85..0541e8a165 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -167,6 +167,7 @@ #include "postmaster/bgworker.h" #include "postmaster/interrupt.h" #include "postmaster/walwriter.h" +#include "replication/conflict.h" #include "replication/logicallauncher.h" #include "replication/logicalproto.h" #include "replication/logicalrelation.h" @@ -2458,7 +2459,8 @@ apply_handle_insert_internal(ApplyExecutionData *edata, EState *estate = edata->estate; /* We must open indexes here. */ - ExecOpenIndices(relinfo, false); + ExecOpenIndices(relinfo, true); + InitConflictIndexes(relinfo); /* Do the insert. 
*/ TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT); @@ -2646,7 +2648,7 @@ apply_handle_update_internal(ApplyExecutionData *edata, MemoryContext oldctx; EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); - ExecOpenIndices(relinfo, false); + ExecOpenIndices(relinfo, true); found = FindReplTupleInLocalRel(edata, localrel, &relmapentry->remoterel, @@ -2661,6 +2663,19 @@ apply_handle_update_internal(ApplyExecutionData *edata, */ if (found) { + RepOriginId localorigin; + TransactionId localxmin; + TimestampTz localts; + + /* + * Check whether the local tuple was modified by a different origin. + * If detected, report the conflict. + */ + if (GetTupleCommitTs(localslot, &localxmin, &localorigin, &localts) && + localorigin != replorigin_session_origin) + ReportApplyConflict(LOG, CT_UPDATE_DIFFER, localrel, InvalidOid, + localxmin, localorigin, localts, NULL); + /* Process and store remote tuple in the slot */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); slot_modify_data(remoteslot, localslot, relmapentry, newtup); @@ -2668,6 +2683,8 @@ apply_handle_update_internal(ApplyExecutionData *edata, EvalPlanQualSetSlot(&epqstate, remoteslot); + InitConflictIndexes(relinfo); + /* Do the actual update. */ TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE); ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot, @@ -2678,13 +2695,9 @@ apply_handle_update_internal(ApplyExecutionData *edata, /* * The tuple to be updated could not be found. Do nothing except for * emitting a log message. - * - * XXX should this be promoted to ereport(LOG) perhaps? */ - elog(DEBUG1, - "logical replication did not find row to be updated " - "in replication target relation \"%s\"", - RelationGetRelationName(localrel)); + ReportApplyConflict(LOG, CT_UPDATE_MISSING, localrel, InvalidOid, + InvalidTransactionId, InvalidRepOriginId, 0, NULL); } /* Cleanup. 
*/ @@ -2807,6 +2820,24 @@ apply_handle_delete_internal(ApplyExecutionData *edata, /* If found delete it. */ if (found) { + RepOriginId localorigin; + TransactionId localxmin; + TimestampTz localts; + + /* + * Check whether the local tuple was modified by a different origin. + * If detected, report the conflict. + * + * For cross-partition update, we skip detecting the delete_differ + * conflict since it should have been done in + * apply_handle_tuple_routing(). + */ + if ((!edata->mtstate || edata->mtstate->operation != CMD_UPDATE) && + GetTupleCommitTs(localslot, &localxmin, &localorigin, &localts) && + localorigin != replorigin_session_origin) + ReportApplyConflict(LOG, CT_DELETE_DIFFER, localrel, InvalidOid, + localxmin, localorigin, localts, NULL); + EvalPlanQualSetSlot(&epqstate, localslot); /* Do the actual delete. */ @@ -2818,13 +2849,9 @@ apply_handle_delete_internal(ApplyExecutionData *edata, /* * The tuple to be deleted could not be found. Do nothing except for * emitting a log message. - * - * XXX should this be promoted to ereport(LOG) perhaps? */ - elog(DEBUG1, - "logical replication did not find row to be deleted " - "in replication target relation \"%s\"", - RelationGetRelationName(localrel)); + ReportApplyConflict(LOG, CT_DELETE_MISSING, localrel, InvalidOid, + InvalidTransactionId, InvalidRepOriginId, 0, NULL); } /* Cleanup. */ @@ -2991,6 +3018,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, ResultRelInfo *partrelinfo_new; Relation partrel_new; bool found; + RepOriginId localorigin; + TransactionId localxmin; + TimestampTz localts; /* Get the matching local tuple from the partition. */ found = FindReplTupleInLocalRel(edata, partrel, @@ -3002,16 +3032,25 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, /* * The tuple to be updated could not be found. Do nothing * except for emitting a log message. - * - * XXX should this be promoted to ereport(LOG) perhaps? 
*/ - elog(DEBUG1, - "logical replication did not find row to be updated " - "in replication target relation's partition \"%s\"", - RelationGetRelationName(partrel)); + ReportApplyConflict(LOG, CT_UPDATE_MISSING, + partrel, InvalidOid, + InvalidTransactionId, + InvalidRepOriginId, 0, NULL); + return; } + /* + * Check whether the local tuple was modified by a different + * origin. If detected, report the conflict. + */ + if (GetTupleCommitTs(localslot, &localxmin, &localorigin, &localts) && + localorigin != replorigin_session_origin) + ReportApplyConflict(LOG, CT_UPDATE_DIFFER, partrel, + InvalidOid, localxmin, localorigin, + localts, NULL); + /* * Apply the update to the local tuple, putting the result in * remoteslot_part. @@ -3039,7 +3078,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, EPQState epqstate; EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); - ExecOpenIndices(partrelinfo, false); + ExecOpenIndices(partrelinfo, true); + InitConflictIndexes(partrelinfo); EvalPlanQualSetSlot(&epqstate, remoteslot_part); TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 9770752ea3..3d5383c056 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -636,6 +636,7 @@ extern List *ExecInsertIndexTuples(ResultRelInfo *resultRelInfo, extern bool ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, ItemPointer conflictTid, + ItemPointer tupleid, List *arbiterIndexes); extern void check_exclusion_constraint(Relation heap, Relation index, IndexInfo *indexInfo, diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h new file mode 100644 index 0000000000..3a7260d3c1 --- /dev/null +++ b/src/include/replication/conflict.h @@ -0,0 +1,50 @@ +/*------------------------------------------------------------------------- + * conflict.h + * Exports for conflict detection and log + * + * 
Copyright (c) 2012-2024, PostgreSQL Global Development Group + * + *------------------------------------------------------------------------- + */ +#ifndef CONFLICT_H +#define CONFLICT_H + +#include "access/xlogdefs.h" +#include "executor/tuptable.h" +#include "nodes/execnodes.h" +#include "utils/relcache.h" +#include "utils/timestamp.h" + +/* + * Conflict types that could be encountered when applying remote changes. + */ +typedef enum +{ + /* The row to be inserted violates unique constraint */ + CT_INSERT_EXISTS, + + /* The updated row value violates unique constraint */ + CT_UPDATE_EXISTS, + + /* The row to be updated was modified by a different origin */ + CT_UPDATE_DIFFER, + + /* The row to be updated is missing */ + CT_UPDATE_MISSING, + + /* The row to be deleted is missing */ + CT_DELETE_MISSING, + + /* The row to be deleted was modified by a different origin */ + CT_DELETE_DIFFER, +} ConflictType; + +extern bool GetTupleCommitTs(TupleTableSlot *localslot, TransactionId *xmin, + RepOriginId *localorigin, TimestampTz *localts); +extern void ReportApplyConflict(int elevel, ConflictType type, + Relation localrel, Oid conflictidx, + TransactionId localxmin, RepOriginId localorigin, + TimestampTz localts, TupleTableSlot *conflictslot); +extern void InitConflictIndexes(ResultRelInfo *relInfo); + +#endif diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index 471e981962..79cbed2e5b 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -331,12 +331,6 @@ is( $result, qq(1|bar 2|baz), 'update works with REPLICA IDENTITY FULL and a primary key'); -# Check that subscriber handles cases where update/delete target tuple -# is missing. We have to look for the DEBUG1 log messages about that, -# so temporarily bump up the log verbosity. 
-$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1"); -$node_subscriber->reload; - $node_subscriber->safe_psql('postgres', "DELETE FROM tab_full_pk"); # Note that the current location of the log file is not grabbed immediately @@ -352,10 +346,10 @@ $node_publisher->wait_for_catchup('tap_sub'); my $logfile = slurp_file($node_subscriber->logfile, $log_location); ok( $logfile =~ - qr/logical replication did not find row to be updated in replication target relation "tab_full_pk"/, + qr/conflict update_missing detected on relation "public.tab_full_pk".*\n.*DETAIL:.* Did not find the row to be updated./m, 'update target row is missing'); ok( $logfile =~ - qr/logical replication did not find row to be deleted in replication target relation "tab_full_pk"/, + qr/conflict delete_missing detected on relation "public.tab_full_pk".*\n.*DETAIL:.* Did not find the row to be deleted./m, 'delete target row is missing'); $node_subscriber->append_conf('postgresql.conf', diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl index 29580525a9..896985d85b 100644 --- a/src/test/subscription/t/013_partition.pl +++ b/src/test/subscription/t/013_partition.pl @@ -343,13 +343,6 @@ $result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1 ORDER BY 1"); is($result, qq(), 'truncate of tab1 replicated'); -# Check that subscriber handles cases where update/delete target tuple -# is missing. We have to look for the DEBUG1 log messages about that, -# so temporarily bump up the log verbosity. 
-$node_subscriber1->append_conf('postgresql.conf', - "log_min_messages = debug1"); -$node_subscriber1->reload; - $node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (1, 'foo'), (4, 'bar'), (10, 'baz')"); @@ -372,22 +365,18 @@ $node_publisher->wait_for_catchup('sub2'); my $logfile = slurp_file($node_subscriber1->logfile(), $log_location); ok( $logfile =~ - qr/logical replication did not find row to be updated in replication target relation's partition "tab1_2_2"/, + qr/conflict update_missing detected on relation "public.tab1_2_2".*\n.*DETAIL:.* Did not find the row to be updated./, 'update target row is missing in tab1_2_2'); ok( $logfile =~ - qr/logical replication did not find row to be deleted in replication target relation "tab1_1"/, + qr/conflict delete_missing detected on relation "public.tab1_1".*\n.*DETAIL:.* Did not find the row to be deleted./, 'delete target row is missing in tab1_1'); ok( $logfile =~ - qr/logical replication did not find row to be deleted in replication target relation "tab1_2_2"/, + qr/conflict delete_missing detected on relation "public.tab1_2_2".*\n.*DETAIL:.* Did not find the row to be deleted./, 'delete target row is missing in tab1_2_2'); ok( $logfile =~ - qr/logical replication did not find row to be deleted in replication target relation "tab1_def"/, + qr/conflict delete_missing detected on relation "public.tab1_def".*\n.*DETAIL:.* Did not find the row to be deleted./, 'delete target row is missing in tab1_def'); -$node_subscriber1->append_conf('postgresql.conf', - "log_min_messages = warning"); -$node_subscriber1->reload; - # Tests for replication using root table identity and schema # publisher @@ -773,13 +762,6 @@ pub_tab2|3|yyy pub_tab2|5|zzz xxx_c|6|aaa), 'inserts into tab2 replicated'); -# Check that subscriber handles cases where update/delete target tuple -# is missing. We have to look for the DEBUG1 log messages about that, -# so temporarily bump up the log verbosity. 
-$node_subscriber1->append_conf('postgresql.conf', - "log_min_messages = debug1"); -$node_subscriber1->reload; - $node_subscriber1->safe_psql('postgres', "DELETE FROM tab2"); # Note that the current location of the log file is not grabbed immediately @@ -796,15 +778,30 @@ $node_publisher->wait_for_catchup('sub2'); $logfile = slurp_file($node_subscriber1->logfile(), $log_location); ok( $logfile =~ - qr/logical replication did not find row to be updated in replication target relation's partition "tab2_1"/, + qr/conflict update_missing detected on relation "public.tab2_1".*\n.*DETAIL:.* Did not find the row to be updated./, 'update target row is missing in tab2_1'); ok( $logfile =~ - qr/logical replication did not find row to be deleted in replication target relation "tab2_1"/, + qr/conflict delete_missing detected on relation "public.tab2_1".*\n.*DETAIL:.* Did not find the row to be deleted./, 'delete target row is missing in tab2_1'); -$node_subscriber1->append_conf('postgresql.conf', - "log_min_messages = warning"); -$node_subscriber1->reload; +# Enable the track_commit_timestamp to detect the conflict when attempting +# to update a row that was previously modified by a different origin. 
+$node_subscriber1->append_conf('postgresql.conf', 'track_commit_timestamp = on'); +$node_subscriber1->restart; + +$node_subscriber1->safe_psql('postgres', "INSERT INTO tab2 VALUES (3, 'yyy')"); +$node_publisher->safe_psql('postgres', + "UPDATE tab2 SET b = 'quux' WHERE a = 3"); + +$node_publisher->wait_for_catchup('sub_viaroot'); + +$logfile = slurp_file($node_subscriber1->logfile(), $log_location); +ok( $logfile =~ + qr/conflict update_differ detected on relation "public.tab2_1".*\n.*DETAIL:.* Updating a row that was modified locally in transaction [0-9]+ at .*/, + 'updating a tuple that was modified by a different origin'); + +$node_subscriber1->append_conf('postgresql.conf', 'track_commit_timestamp = off'); +$node_subscriber1->restart; # Test that replication continues to work correctly after altering the # partition of a partitioned target table. diff --git a/src/test/subscription/t/029_on_error.pl b/src/test/subscription/t/029_on_error.pl index 0ab57a4b5b..ac7c8bbe7c 100644 --- a/src/test/subscription/t/029_on_error.pl +++ b/src/test/subscription/t/029_on_error.pl @@ -30,7 +30,7 @@ sub test_skip_lsn # ERROR with its CONTEXT when retrieving this information. 
my $contents = slurp_file($node_subscriber->logfile, $offset); $contents =~ - qr/duplicate key value violates unique constraint "tbl_pkey".*\n.*DETAIL:.*\n.*CONTEXT:.* for replication target relation "public.tbl" in transaction \d+, finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/m + qr/conflict .* detected on relation "public.tbl".*\n.*DETAIL:.* Key \(i\)=\(\d+\) already exists in unique index "tbl_pkey", which was modified by .*origin.* transaction \d+ at .*\n.*CONTEXT:.* for replication target relation "public.tbl" in transaction \d+, finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/m or die "could not get error-LSN"; my $lsn = $1; @@ -83,6 +83,7 @@ $node_subscriber->append_conf( 'postgresql.conf', qq[ max_prepared_transactions = 10 +track_commit_timestamp = on ]); $node_subscriber->start; @@ -93,6 +94,7 @@ $node_publisher->safe_psql( 'postgres', qq[ CREATE TABLE tbl (i INT, t BYTEA); +ALTER TABLE tbl REPLICA IDENTITY FULL; INSERT INTO tbl VALUES (1, NULL); ]); $node_subscriber->safe_psql( @@ -144,13 +146,14 @@ COMMIT; test_skip_lsn($node_publisher, $node_subscriber, "(2, NULL)", "2", "test skipping transaction"); -# Test for PREPARE and COMMIT PREPARED. Insert the same data to tbl and -# PREPARE the transaction, raising an error. Then skip the transaction. +# Test for PREPARE and COMMIT PREPARED. Update the data and PREPARE the +# transaction, raising an error on the subscriber due to violation of the +# unique constraint on tbl. Then skip the transaction. 
$node_publisher->safe_psql( 'postgres', qq[ BEGIN; -INSERT INTO tbl VALUES (1, NULL); +UPDATE tbl SET i = 2; PREPARE TRANSACTION 'gtx'; COMMIT PREPARED 'gtx'; ]); diff --git a/src/test/subscription/t/030_origin.pl b/src/test/subscription/t/030_origin.pl index 056561f008..5a22413464 100644 --- a/src/test/subscription/t/030_origin.pl +++ b/src/test/subscription/t/030_origin.pl @@ -27,9 +27,14 @@ my $stderr; my $node_A = PostgreSQL::Test::Cluster->new('node_A'); $node_A->init(allows_streaming => 'logical'); $node_A->start; + # node_B my $node_B = PostgreSQL::Test::Cluster->new('node_B'); $node_B->init(allows_streaming => 'logical'); + +# Enable the track_commit_timestamp to detect the conflict when attempting to +# update a row that was previously modified by a different origin. +$node_B->append_conf('postgresql.conf', 'track_commit_timestamp = on'); $node_B->start; # Create table on node_A @@ -139,6 +144,48 @@ is($result, qq(), 'Remote data originating from another node (not the publisher) is not replicated when origin parameter is none' ); +############################################################################### +# Check that the conflict can be detected when attempting to update or +# delete a row that was previously modified by a different origin. +############################################################################### + +$node_B->safe_psql('postgres', "DELETE FROM tab;"); + +$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (32);"); + +$node_A->wait_for_catchup($subname_BA); +$node_B->wait_for_catchup($subname_AB); + +$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); +is($result, qq(32), 'The node_A data replicated to node_B'); + +# The update should update the row on node B that was inserted by node A.
+$node_C->safe_psql('postgres', "UPDATE tab SET a = 33 WHERE a = 32;"); + +$node_B->wait_for_log( + qr/conflict update_differ detected on relation "public.tab".*\n.*DETAIL:.* Updating a row that was modified by a different origin ".*" in transaction [0-9]+ at .*/ +); + +$node_B->safe_psql('postgres', "DELETE FROM tab;"); +$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (33);"); + +$node_A->wait_for_catchup($subname_BA); +$node_B->wait_for_catchup($subname_AB); + +$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); +is($result, qq(33), 'The node_A data replicated to node_B'); + +# The delete should remove the row on node B that was inserted by node A. +$node_C->safe_psql('postgres', "DELETE FROM tab WHERE a = 33;"); + +$node_B->wait_for_log( + qr/conflict delete_differ detected on relation "public.tab".*\n.*DETAIL:.* Deleting a row that was modified by a different origin ".*" in transaction [0-9]+ at .*/ +); + +# The remaining tests no longer test conflict detection. +$node_B->append_conf('postgresql.conf', 'track_commit_timestamp = off'); +$node_B->restart; + ############################################################################### # Specifying origin = NONE indicates that the publisher should only replicate the # changes that are generated locally from node_B, but in this case since the diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index b4d7f9217c..2098ed7467 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -467,6 +467,7 @@ ConditionVariableMinimallyPadded ConditionalStack ConfigData ConfigVariable +ConflictType ConnCacheEntry ConnCacheKey ConnParams -- 2.31.1