From 82c2eadc602ae4f01dfa09dd943bad2ab600cdcd Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Fri, 26 Dec 2025 20:08:11 +0530 Subject: [PATCH v17 2/6] Implement the conflict insertion infrastructure for the conflict log table This patch introduces the core logic to populate the conflict log table whenever a logical replication conflict is detected. It captures the remote transaction details along with the corresponding local state at the time of the conflict. Handling Multi-row Conflicts: A single remote tuple may conflict with multiple local tuples (e.g., in the case of multiple_unique_conflicts). To handle this, the infrastructure creates a single row in the conflict log table for each remote tuple. The details of all conflicting local rows are aggregated into a single JSON array in the local_conflicts column. The JSON array uses the following structured format: [ { "xid": "1001", "commit_ts": "2025-12-25 10:00:00+05:30", "origin": "node_1", "key": {"id": 1}, "tuple": {"id": 1, "val": "old_data"} }, ... ] Example of querying the structured conflict data: SELECT remote_xid, relname, remote_origin, local_conflicts[1] ->> 'xid' AS local_xid, local_conflicts[1] ->> 'tuple' AS local_tuple FROM myschema.conflict_log_history2; remote_xid | relname | remote_origin | local_xid | local_tuple ------------+----------+---------------+-----------+--------------------- 760 | test | pg_16406 | 771 | {"a":1,"b":10} 765 | conf_tab | pg_16406 | 775 | {"a":2,"b":2,"c":2} --- src/backend/replication/logical/conflict.c | 603 ++++++++++++++++++- src/backend/replication/logical/launcher.c | 1 + src/backend/replication/logical/worker.c | 32 +- src/include/replication/conflict.h | 4 +- src/include/replication/worker_internal.h | 7 + src/test/subscription/t/037_conflict_dest.pl | 190 ++++++ 6 files changed, 805 insertions(+), 32 deletions(-) create mode 100644 src/test/subscription/t/037_conflict_dest.pl diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 16695592265..98080fd2db0 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -15,13 +15,20 @@ #include "postgres.h" #include "access/commit_ts.h" +#include "access/heapam.h" #include "access/tableam.h" +#include "commands/subscriptioncmds.h" #include "executor/executor.h" +#include "funcapi.h" #include "pgstat.h" #include "replication/conflict.h" #include "replication/worker_internal.h" #include "storage/lmgr.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" #include "utils/lsyscache.h" +#include "utils/pg_lsn.h" +#include "utils/jsonb.h" static const char *const ConflictTypeNames[] = { [CT_INSERT_EXISTS] = "insert_exists", @@ -34,6 +41,19 @@ static const char *const ConflictTypeNames[] = { [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts" }; +/* Schema for the elements within the 'local_conflicts' JSON array */ +static const ConflictLogColumnDef LocalConflictSchema[] = +{ + {.attname = "xid",.atttypid = XIDOID}, + {.attname = "commit_ts",.atttypid = TIMESTAMPTZOID}, + {.attname = "origin",.atttypid = TEXTOID}, + {.attname = "key",.atttypid = JSONOID}, + {.attname = "tuple",.atttypid = JSONOID} +}; + +#define MAX_LOCAL_CONFLICT_INFO_ATTRS \ + (sizeof(LocalConflictSchema) / sizeof(LocalConflictSchema[0])) + static int errcode_apply_conflict(ConflictType type); static void errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, @@ -50,8 +70,27 @@ static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *localslot, TupleTableSlot *remoteslot, Oid indexoid); +static void build_index_datums_from_slot(EState *estate, Relation localrel, + TupleTableSlot *slot, + Relation indexDesc, Datum *values, + bool *isnull); static char *build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot, Oid indexoid); +static Datum tuple_table_slot_to_json_datum(TupleTableSlot *slot); +static Datum tuple_table_slot_to_indextup_json(EState *estate, + Relation localrel, + Oid replica_index, + TupleTableSlot *slot); +static TupleDesc build_conflict_tupledesc(void); +static Datum build_local_conflicts_json_array(EState *estate, Relation rel, + ConflictType conflict_type, + List *conflicttuples); +static void prepare_conflict_log_tuple(EState *estate, Relation rel, + Relation conflictlogrel, + ConflictType conflict_type, + TupleTableSlot *searchslot, + List *conflicttuples, + TupleTableSlot *remoteslot); /* * Get the xmin and commit timestamp data (origin and timestamp) associated @@ -107,9 +146,17 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, { Relation localrel = relinfo->ri_RelationDesc; StringInfoData err_detail; + ConflictLogDest dest; + Relation conflictlogrel; initStringInfo(&err_detail); + /* + * Get both the conflict log destination and the opened conflict log + * relation for insertion. + */ + conflictlogrel = GetConflictLogTableInfo(&dest); + /* Form errdetail message by combining conflicting tuples information. */ foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples) errdetail_apply_conflict(estate, relinfo, type, searchslot, @@ -120,15 +167,62 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, conflicttuple->ts, &err_detail); + /* Insert to table if destination is 'table' or 'all' */ + if (conflictlogrel) + { + Assert((dest & CONFLICT_LOG_DEST_TABLE) != 0); + + if (ValidateConflictLogTable(conflictlogrel)) + { + /* + * Prepare the conflict log tuple. If the error level is below + * ERROR, insert it immediately. Otherwise, defer the insertion to + * a new transaction after the current one aborts, ensuring the + * insertion of the log tuple is not rolled back. + */ + prepare_conflict_log_tuple(estate, + relinfo->ri_RelationDesc, + conflictlogrel, + type, + searchslot, + conflicttuples, + remoteslot); + if (elevel < ERROR) + InsertConflictLogTuple(conflictlogrel); + } + + table_close(conflictlogrel, RowExclusiveLock); + } + pgstat_report_subscription_conflict(MySubscription->oid, type); - ereport(elevel, - errcode_apply_conflict(type), - errmsg("conflict detected on relation \"%s.%s\": conflict=%s", - get_namespace_name(RelationGetNamespace(localrel)), - RelationGetRelationName(localrel), - ConflictTypeNames[type]), - errdetail_internal("%s", err_detail.data)); + /* Decide what detail to show in server logs. */ + if (dest == CONFLICT_LOG_DEST_LOG || dest == CONFLICT_LOG_DEST_ALL) + { + /* Standard reporting with full internal details. */ + ereport(elevel, + errcode_apply_conflict(type), + errmsg("conflict detected on relation \"%s.%s\": conflict=%s", + get_namespace_name(RelationGetNamespace(localrel)), + RelationGetRelationName(localrel), + ConflictTypeNames[type]), + errdetail_internal("%s", err_detail.data)); + } + else + { + /* + * 'table' only: Report the error msg but omit raw tuple data from + * server logs since it's already captured in the internal table. + */ + ereport(elevel, + errcode_apply_conflict(type), + errmsg("conflict detected on relation \"%s.%s\": conflict=%s", + get_namespace_name(RelationGetNamespace(localrel)), + RelationGetRelationName(localrel), + ConflictTypeNames[type]), + errdetail("Conflict details logged to internal table with OID %u.", + MySubscription->conflictlogrelid)); + } } /* @@ -162,6 +256,143 @@ InitConflictIndexes(ResultRelInfo *relInfo) relInfo->ri_onConflictArbiterIndexes = uniqueIndexes; } +/* + * GetConflictLogTableInfo + * + * Fetches conflict logging metadata from the cached MySubscription pointer. + * Sets the destination enum in *log_dest and, if applicable, opens and + * returns the relation handle for the internal log table. + */ +Relation +GetConflictLogTableInfo(ConflictLogDest *log_dest) +{ + Oid conflictlogrelid; + Relation conflictlogrel = NULL; + + /* + * Convert the text log destination to the internal enum. MySubscription + * already contains the data from pg_subscription. + */ + *log_dest = GetLogDestination(MySubscription->conflictlogdest); + conflictlogrelid = MySubscription->conflictlogrelid; + + /* If destination is 'log' only, no table to open. */ + if (*log_dest == CONFLICT_LOG_DEST_LOG) + return NULL; + + Assert(OidIsValid(conflictlogrelid)); + + conflictlogrel = table_open(conflictlogrelid, RowExclusiveLock); + + /* Conflict log table is dropped or not accessible. */ + if (conflictlogrel == NULL) + ereport(WARNING, + (errcode(ERRCODE_UNDEFINED_TABLE), + errmsg("conflict log table with OID %u does not exist", + conflictlogrelid))); + + return conflictlogrel; +} + +/* + * InsertConflictLogTuple + * + * Insert conflict log tuple into the conflict log table. It uses + * HEAP_INSERT_NO_LOGICAL to explicitly block logical decoding of the tuple + * inserted into the conflict log table. + */ +void +InsertConflictLogTuple(Relation conflictlogrel) +{ + int options = HEAP_INSERT_NO_LOGICAL; + + /* A valid tuple must be prepared and stored in MyLogicalRepWorker. */ + Assert(MyLogicalRepWorker->conflict_log_tuple != NULL); + + heap_insert(conflictlogrel, MyLogicalRepWorker->conflict_log_tuple, + GetCurrentCommandId(true), options, NULL); + + /* Free conflict log tuple. */ + heap_freetuple(MyLogicalRepWorker->conflict_log_tuple); + MyLogicalRepWorker->conflict_log_tuple = NULL; +} + +/* + * ValidateConflictLogTable - Validate conflict log table schema + * + * Checks whether the table definition including its column names, data + * types, and column ordering meet the requirements for conflict log + * table. + */ +bool +ValidateConflictLogTable(Relation rel) +{ + Relation pg_attribute; + HeapTuple atup; + ScanKeyData scankey; + SysScanDesc scan; + int attcnt = 0; + bool tbl_ok = true; + + pg_attribute = table_open(AttributeRelationId, AccessShareLock); + ScanKeyInit(&scankey, + Anum_pg_attribute_attrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(rel))); + + scan = systable_beginscan(pg_attribute, AttributeRelidNumIndexId, true, + SnapshotSelf, 1, &scankey); + + /* We only need to check up to MAX_CONFLICT_ATTR_NUM attributes */ + while (HeapTupleIsValid(atup = systable_getnext(scan))) + { + const ConflictLogColumnDef *expected; + int schema_idx; + Form_pg_attribute attForm = (Form_pg_attribute) GETSTRUCT(atup); + + /* Skip system columns and dropped columns */ + if (attForm->attnum < 1 || attForm->attisdropped) + continue; + + attcnt++; + + /* attnum 1 corresponds to index 0 in ConflictLogSchema */ + schema_idx = attForm->attnum - 1; + + /* Check against the central schema definition */ + if (schema_idx >= MAX_CONFLICT_ATTR_NUM) + { + /* Found an extra column beyond the required set */ + tbl_ok = false; + break; + } + + expected = &ConflictLogSchema[schema_idx]; + + if (attForm->atttypid != expected->atttypid || + strcmp(NameStr(attForm->attname), expected->attname) != 0) + { + tbl_ok = false; + break; + } + } + + systable_endscan(scan); + table_close(pg_attribute, AccessShareLock); + + if (attcnt != MAX_CONFLICT_ATTR_NUM || !tbl_ok) + { + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("conflict log table \"%s.%s\" structure changed, skipping insertion", + get_namespace_name(RelationGetNamespace(rel)), + RelationGetRelationName(rel))); + return false; + } + + return true; +} + /* * Add SQLSTATE error code to the current conflict report. */ @@ -472,6 +703,40 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo, return tuple_value.data; } +/* + * Helper function to extract the "raw" index key Datums and their null flags + * from a TupleTableSlot, given an already open index descriptor. + * This is the reusable core logic. + */ +static void +build_index_datums_from_slot(EState *estate, Relation localrel, + TupleTableSlot *slot, + Relation indexDesc, Datum *values, + bool *isnull) +{ + TupleTableSlot *tableslot = slot; + + /* + * If the slot is a virtual slot, copy it into a heap tuple slot as + * FormIndexDatum only works with heap tuple slots. + */ + if (TTS_IS_VIRTUAL(slot)) + { + /* Slot is created within the EState's tuple table */ + tableslot = table_slot_create(localrel, &estate->es_tupleTable); + tableslot = ExecCopySlot(tableslot, slot); + } + + /* + * Initialize ecxt_scantuple for potential use in FormIndexDatum + */ + GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot; + + /* Form the index datums */ + FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, + isnull); +} + /* * Helper functions to construct a string describing the contents of an index * entry. See BuildIndexValueDescription for details. @@ -487,41 +752,319 @@ build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot, Relation indexDesc; Datum values[INDEX_MAX_KEYS]; bool isnull[INDEX_MAX_KEYS]; - TupleTableSlot *tableslot = slot; - if (!tableslot) + if (!slot) return NULL; Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true)); indexDesc = index_open(indexoid, NoLock); - /* - * If the slot is a virtual slot, copy it into a heap tuple slot as - * FormIndexDatum only works with heap tuple slots. - */ - if (TTS_IS_VIRTUAL(slot)) + build_index_datums_from_slot(estate, localrel, slot, indexDesc, values, + isnull); + + index_value = BuildIndexValueDescription(indexDesc, values, isnull); + + index_close(indexDesc, NoLock); + + return index_value; +} + +/* + * tuple_table_slot_to_json_datum + * + * Helper function to convert a TupleTableSlot to Jsonb. + */ +static Datum +tuple_table_slot_to_json_datum(TupleTableSlot *slot) +{ + HeapTuple tuple; + Datum datum; + Datum json; + + Assert(slot != NULL); + + tuple = ExecCopySlotHeapTuple(slot); + datum = heap_copy_tuple_as_datum(tuple, slot->tts_tupleDescriptor); + + json = DirectFunctionCall1(row_to_json, datum); + heap_freetuple(tuple); + + return json; +} + +/* + * tuple_table_slot_to_indextup_json + * + * Fetch replica identity key from the tuple table slot and convert into a + * jsonb datum. + */ +static Datum +tuple_table_slot_to_indextup_json(EState *estate, Relation localrel, + Oid indexid, TupleTableSlot *slot) +{ + Relation indexDesc; + Datum values[INDEX_MAX_KEYS]; + bool isnull[INDEX_MAX_KEYS]; + HeapTuple tuple; + TupleDesc tupdesc; + Datum datum; + + Assert(slot != NULL); + + Assert(CheckRelationOidLockedByMe(indexid, RowExclusiveLock, true)); + + indexDesc = index_open(indexid, NoLock); + + build_index_datums_from_slot(estate, localrel, slot, indexDesc, values, + isnull); + tupdesc = RelationGetDescr(indexDesc); + + /* Bless the tupdesc so it can be looked up by row_to_json. */ + BlessTupleDesc(tupdesc); + + /* Form the replica identity tuple. */ + tuple = heap_form_tuple(tupdesc, values, isnull); + datum = heap_copy_tuple_as_datum(tuple, tupdesc); + + index_close(indexDesc, NoLock); + + /* Convert to a JSONB datum. */ + return DirectFunctionCall1(row_to_json, datum); +} + +static TupleDesc +build_conflict_tupledesc(void) +{ + TupleDesc tupdesc; + + tupdesc = CreateTemplateTupleDesc(MAX_LOCAL_CONFLICT_INFO_ATTRS); + + for (int i = 0; i < MAX_LOCAL_CONFLICT_INFO_ATTRS; i++) + TupleDescInitEntry(tupdesc, (AttrNumber) (i + 1), + LocalConflictSchema[i].attname, + LocalConflictSchema[i].atttypid, + -1, 0); + + BlessTupleDesc(tupdesc); + + return tupdesc; +} + +/* + * Builds the local conflicts JSONB array column from the list of + * ConflictTupleInfo objects. + * + * Example output structure: + * [ { "xid": "1001", "commit_ts": "...", "origin": "...", "tuple": {...} }, ... ] + */ +static Datum +build_local_conflicts_json_array(EState *estate, Relation rel, + ConflictType conflict_type, + List *conflicttuples) +{ + ListCell *lc; + List *json_datums = NIL; /* List to hold the row_to_json results + * (type json) */ + Datum *json_datum_array; + bool *json_null_array; + Datum json_array_datum; + int num_conflicts; + int i; + int16 typlen; + bool typbyval; + char typalign; + TupleDesc tupdesc; + + /* Build local conflicts tuple descriptor. */ + tupdesc = build_conflict_tupledesc(); + + /* Process local conflict tuple list and prepare an array of JSON. */ + foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples) { - tableslot = table_slot_create(localrel, &estate->es_tupleTable); - tableslot = ExecCopySlot(tableslot, slot); + Datum values[MAX_LOCAL_CONFLICT_INFO_ATTRS] = {0}; + bool nulls[MAX_LOCAL_CONFLICT_INFO_ATTRS] = {0}; + char *origin_name = NULL; + HeapTuple tuple; + Datum json_datum; + int attno; + + attno = 0; + values[attno++] = TransactionIdGetDatum(conflicttuple->xmin); + + if (conflicttuple->ts) + values[attno++] = TimestampTzGetDatum(conflicttuple->ts); + else + nulls[attno++] = true; + + if (conflicttuple->origin != InvalidRepOriginId) + replorigin_by_oid(conflicttuple->origin, true, &origin_name); + + /* Store empty string if origin name for the tuple is NULL. */ + if (origin_name != NULL) + values[attno++] = CStringGetTextDatum(origin_name); + else + nulls[attno++] = true; + + /* + * Add the conflicting key values in the case of a unique constraint + * violation. + */ + if (conflict_type == CT_INSERT_EXISTS || + conflict_type == CT_UPDATE_EXISTS || + conflict_type == CT_MULTIPLE_UNIQUE_CONFLICTS) + { + Oid indexoid = conflicttuple->indexoid; + + Assert(OidIsValid(indexoid) && conflicttuple->slot && + CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, + true)); + values[attno++] = + tuple_table_slot_to_indextup_json(estate, rel, + indexoid, + conflicttuple->slot); + } + else + nulls[attno++] = true; + + /* Convert conflicting tuple to JSON datum. */ + if (conflicttuple->slot) + values[attno] = tuple_table_slot_to_json_datum(conflicttuple->slot); + else + nulls[attno] = true; + + Assert(attno + 1 == MAX_LOCAL_CONFLICT_INFO_ATTRS); + + tuple = heap_form_tuple(tupdesc, values, nulls); + + json_datum = heap_copy_tuple_as_datum(tuple, tupdesc); + + /* + * Build the higher level JSON datum in format described in function + * header. + */ + json_datum = DirectFunctionCall1(row_to_json, json_datum); + + /* Done with the temporary tuple. */ + heap_freetuple(tuple); + + /* Add to the array element. */ + json_datums = lappend(json_datums, (void *) json_datum); } - /* - * Initialize ecxt_scantuple for potential use in FormIndexDatum when - * index expressions are present. - */ - GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot; + num_conflicts = list_length(json_datums); - /* - * The values/nulls arrays passed to BuildIndexValueDescription should be - * the results of FormIndexDatum, which are the "raw" input to the index - * AM. - */ - FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull); + json_datum_array = (Datum *) palloc(num_conflicts * sizeof(Datum)); + json_null_array = (bool *) palloc0(num_conflicts * sizeof(bool)); - index_value = BuildIndexValueDescription(indexDesc, values, isnull); + i = 0; + foreach(lc, json_datums) + { + json_datum_array[i] = (Datum) lfirst(lc); + i++; + } - index_close(indexDesc, NoLock); + /* Construct the json[] array Datum. */ + get_typlenbyvalalign(JSONOID, &typlen, &typbyval, &typalign); + json_array_datum = PointerGetDatum(construct_array(json_datum_array, + num_conflicts, + JSONOID, + typlen, + typbyval, + typalign)); + pfree(json_datum_array); + pfree(json_null_array); + + return json_array_datum; +} - return index_value; +/* + * prepare_conflict_log_tuple + * + * This routine prepares a tuple detailing a conflict encountered during + * logical replication. The prepared tuple will be stored in + * MyLogicalRepWorker->conflict_log_tuple which should be inserted into the + * conflict log table by calling InsertConflictLogTuple. + */ +static void +prepare_conflict_log_tuple(EState *estate, Relation rel, + Relation conflictlogrel, + ConflictType conflict_type, + TupleTableSlot *searchslot, + List *conflicttuples, + TupleTableSlot *remoteslot) +{ + Datum values[MAX_CONFLICT_ATTR_NUM] = {0}; + bool nulls[MAX_CONFLICT_ATTR_NUM] = {0}; + int attno; + char *remote_origin = NULL; + MemoryContext oldctx; + + Assert(MyLogicalRepWorker->conflict_log_tuple == NULL); + + /* Populate the values and nulls arrays. */ + attno = 0; + values[attno++] = ObjectIdGetDatum(RelationGetRelid(rel)); + + values[attno++] = + CStringGetTextDatum(get_namespace_name(RelationGetNamespace(rel))); + + values[attno++] = CStringGetTextDatum(RelationGetRelationName(rel)); + + values[attno++] = CStringGetTextDatum(ConflictTypeNames[conflict_type]); + + if (TransactionIdIsValid(remote_xid)) + values[attno++] = TransactionIdGetDatum(remote_xid); + else + nulls[attno++] = true; + + values[attno++] = LSNGetDatum(remote_final_lsn); + + if (remote_commit_ts > 0) + values[attno++] = TimestampTzGetDatum(remote_commit_ts); + else + nulls[attno++] = true; + + if (replorigin_session_origin != InvalidRepOriginId) + replorigin_by_oid(replorigin_session_origin, true, &remote_origin); + + if (remote_origin != NULL) + values[attno++] = CStringGetTextDatum(remote_origin); + else + nulls[attno++] = true; + + if (!TupIsNull(searchslot)) + { + Oid replica_index = GetRelationIdentityOrPK(rel); + + /* + * If the table has a valid replica identity index, build the index + * json datum from key value. Otherwise, construct it from the + * complete tuple in REPLICA IDENTITY FULL cases. + */ + if (OidIsValid(replica_index)) + values[attno++] = tuple_table_slot_to_indextup_json(estate, rel, + replica_index, + searchslot); + else + values[attno++] = tuple_table_slot_to_json_datum(searchslot); + } + else + nulls[attno++] = true; + + if (!TupIsNull(remoteslot)) + values[attno++] = tuple_table_slot_to_json_datum(remoteslot); + else + nulls[attno++] = true; + + values[attno] = build_local_conflicts_json_array(estate, rel, + conflict_type, + conflicttuples); + + Assert(attno + 1 == MAX_CONFLICT_ATTR_NUM); + + oldctx = MemoryContextSwitchTo(ApplyContext); + MyLogicalRepWorker->conflict_log_tuple = + heap_form_tuple(RelationGetDescr(conflictlogrel), values, nulls); + MemoryContextSwitchTo(oldctx); } diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 3991e1495d4..bc7e1d9ebde 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -477,6 +477,7 @@ retry: worker->oldest_nonremovable_xid = retain_dead_tuples ? MyReplicationSlot->data.xmin : InvalidTransactionId; + worker->conflict_log_tuple = NULL; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 718408bb599..cf1008c93dc 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -482,7 +482,9 @@ static bool MySubscriptionValid = false; static List *on_commit_wakeup_workers_subids = NIL; bool in_remote_transaction = false; -static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; +XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; +TransactionId remote_xid = InvalidTransactionId; +TimestampTz remote_commit_ts = 0; /* fields valid only when processing streamed transaction */ static bool in_streamed_transaction = false; @@ -1219,6 +1221,8 @@ apply_handle_begin(StringInfo s) set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn); remote_final_lsn = begin_data.final_lsn; + remote_commit_ts = begin_data.committime; + remote_xid = begin_data.xid; maybe_start_skipping_changes(begin_data.final_lsn); @@ -1745,6 +1749,10 @@ apply_handle_stream_start(StringInfo s) /* extract XID of the top-level transaction */ stream_xid = logicalrep_read_stream_start(s, &first_segment); + remote_xid = stream_xid; + remote_final_lsn = InvalidXLogRecPtr; + remote_commit_ts = 0; + if (!TransactionIdIsValid(stream_xid)) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -5609,6 +5617,28 @@ start_apply(XLogRecPtr origin_startpos) pgstat_report_subscription_error(MySubscription->oid, MyLogicalRepWorker->type); + /* + * Insert any pending conflict log tuple under a new transaction. + */ + if (MyLogicalRepWorker->conflict_log_tuple != NULL) + { + Relation conflictlogrel; + ConflictLogDest dest; + + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + + /* Open conflict log table and insert the tuple. */ + conflictlogrel = GetConflictLogTableInfo(&dest); + if (ValidateConflictLogTable(conflictlogrel)) + InsertConflictLogTuple(conflictlogrel); + MyLogicalRepWorker->conflict_log_tuple = NULL; + table_close(conflictlogrel, RowExclusiveLock); + + PopActiveSnapshot(); + CommitTransactionCommand(); + } + PG_RE_THROW(); } } diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index e722df2d015..694e0ba26ee 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -132,7 +132,6 @@ static const ConflictLogColumnDef ConflictLogSchema[] = {.attname = "local_conflicts",.atttypid = JSONARRAYOID} }; -/* Define the count using the array size */ #define MAX_CONFLICT_ATTR_NUM (sizeof(ConflictLogSchema) / sizeof(ConflictLogSchema[0])) extern bool GetTupleTransactionInfo(TupleTableSlot *localslot, @@ -145,4 +144,7 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, List *conflicttuples); extern void InitConflictIndexes(ResultRelInfo *relInfo); +extern Relation GetConflictLogTableInfo(ConflictLogDest *log_dest); +extern void InsertConflictLogTuple(Relation conflictlogrel); +extern bool ValidateConflictLogTable(Relation rel); #endif diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index f081619f151..8ff556cc558 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -101,6 +101,9 @@ typedef struct LogicalRepWorker */ TransactionId oldest_nonremovable_xid; + /* A conflict log tuple that is prepared but not yet inserted. */ + HeapTuple conflict_log_tuple; + /* Stats. */ XLogRecPtr last_lsn; TimestampTz last_send_time; @@ -256,6 +259,10 @@ extern PGDLLIMPORT bool InitializingApplyWorker; extern PGDLLIMPORT List *table_states_not_ready; +extern XLogRecPtr remote_final_lsn; +extern TimestampTz remote_commit_ts; +extern TransactionId remote_xid; + extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, diff --git a/src/test/subscription/t/037_conflict_dest.pl b/src/test/subscription/t/037_conflict_dest.pl new file mode 100644 index 00000000000..ceccd74b34a --- /dev/null +++ b/src/test/subscription/t/037_conflict_dest.pl @@ -0,0 +1,190 @@ +# Copyright (c) 2025, PostgreSQL Global Development Group + +# Test conflicts in logical replication +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +############################### +# Setup +############################### + +# Create a publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create a subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create a table on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE conf_tab (a int PRIMARY KEY, b int UNIQUE, c int UNIQUE);"); + +$node_publisher->safe_psql('postgres', + "CREATE TABLE conf_tab_2 (a int PRIMARY KEY, b int UNIQUE, c int UNIQUE);" +); + +# Create same table on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE conf_tab (a int PRIMARY key, b int UNIQUE, c int UNIQUE);"); + +$node_subscriber->safe_psql( + 'postgres', qq[ + CREATE TABLE conf_tab_2 (a int PRIMARY KEY, b int, c int, unique(a,b)) PARTITION BY RANGE (a); + CREATE TABLE conf_tab_2_p1 PARTITION OF conf_tab_2 FOR VALUES FROM (MINVALUE) TO (100); +]); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub_tab FOR TABLE conf_tab, conf_tab_2"); + +# Create the subscription +my $appname = 'sub_tab'; +$node_subscriber->safe_psql( + 'postgres', + "CREATE SUBSCRIPTION sub_tab + CONNECTION '$publisher_connstr application_name=$appname' + PUBLICATION pub_tab WITH (conflict_log_destination=table)"); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname); + +################################################## +# INSERT data on Pub and Sub +################################################## + +# Insert data in the publisher table +$node_publisher->safe_psql('postgres', + "INSERT INTO conf_tab VALUES (1,1,1);"); + +# Insert data in the subscriber table +$node_subscriber->safe_psql('postgres', + "INSERT INTO conf_tab VALUES (2,2,2), (3,3,3), (4,4,4);"); + +############################################################################### +# Test conflict insertion into the internal conflict log table +############################################################################### + +$node_subscriber->safe_psql('postgres', + "INSERT INTO conf_tab VALUES (10, 10, 10);"); + +# Get the internally generated table name +my $subid = $node_subscriber->safe_psql('postgres', + "SELECT oid FROM pg_subscription WHERE subname = 'sub_tab';"); +my $conflict_table = "pg_conflict.conflict_log_table_$subid"; + +$node_publisher->safe_psql('postgres', + "INSERT INTO conf_tab VALUES (10, 20, 30);"); + +# Wait for the conflict to be logged +my $log_check = $node_subscriber->poll_query_until('postgres', + "SELECT count(*) > 0 FROM $conflict_table;"); + +is($log_check, 1, 'Conflict was successfully logged to the internal table'); + +my $json_query = qq[ + SELECT string_agg((unnested.j::json)->'key'->>'a', ',') + FROM ( + SELECT unnest(local_conflicts) AS j + FROM $conflict_table + ) AS unnested; +]; + +my $all_keys = $node_subscriber->safe_psql('postgres', $json_query); + +# Verify that '10' is present in the resulting string +like($all_keys, qr/10/, + 'Verified that key 10 exists in the local_conflicts log'); + +pass('Conflict type and data successfully validated in internal table'); + +# Final cleanup for subsequent bidirectional tests in the script +$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;"); + +############################################################################### +# Test Case: update_missing +############################################################################### + +# Sync a row, then delete it locally on subscriber +$node_publisher->safe_psql('postgres', + "INSERT INTO conf_tab VALUES (50, 50, 50);"); +$node_publisher->wait_for_catchup($appname); +$node_subscriber->safe_psql('postgres', "DELETE FROM conf_tab WHERE a = 50;"); + +# Trigger conflict by updating that row on publisher +$node_publisher->safe_psql('postgres', + "UPDATE conf_tab SET b = 500 WHERE a = 50;"); + +# Wait for the apply worker to detect the missing row and log it +$node_subscriber->poll_query_until('postgres', + "SELECT count(*) > 0 FROM $conflict_table WHERE conflict_type = 'update_missing';" +) or die "Timed out waiting for update_missing conflict"; + +my $upd_miss_check = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM $conflict_table WHERE conflict_type = 'update_missing';" +); +is($upd_miss_check, 1, + 'Verified update_missing conflict logged to internal table'); + +$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;"); + +############################################################################### +# Test Case: insert_exists (via secondary unique index) +############################################################################### + +# 1. Subscriber has a row with b=100 +$node_subscriber->safe_psql('postgres', + "INSERT INTO conf_tab VALUES (100, 100, 100);"); + +# 2. Publisher inserts a NEW PK (101) but a DUPLICATE 'b' (100) +$node_publisher->safe_psql('postgres', + "INSERT INTO conf_tab VALUES (101, 100, 101);"); + +# 3. Verify it appears as 'insert_exists' in your log table +$node_subscriber->poll_query_until('postgres', + "SELECT count(*) > 0 FROM $conflict_table WHERE conflict_type = 'insert_exists' AND local_conflicts::text LIKE '%100%';" +) or die "Timed out waiting for secondary index insert_exists conflict"; + +pass('Logged insert_exists triggered by secondary unique index violation'); + +$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;"); + +############################################################################### +# CASE 3: Switching Destination to 'log' (Server Log Verification) +############################################################################### + +# Switch destination +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION sub_tab SET (conflict_log_destination = 'all');"); + +$node_subscriber->safe_psql('postgres', "DELETE FROM $conflict_table;"); +# Trigger a conflict for server log (insert_exists) +$node_subscriber->safe_psql('postgres', + "INSERT INTO conf_tab VALUES (600, 600, 600);"); +$node_publisher->safe_psql('postgres', + "INSERT INTO conf_tab VALUES (600, 700, 700);"); + +# Wait for table log +$node_subscriber->poll_query_until('postgres', + "SELECT count(*) > 0 FROM $conflict_table;") + or die "Timed out waiting for insert_exists conflict"; + +# Check subscriber server log +my $log_found = $node_subscriber->wait_for_log( + qr/conflict detected on relation "public.conf_tab": conflict=insert_exists/ +); +ok($log_found, 'Conflict correctly directed to server stderr log'); + +# Verify table count DID NOT increase for this conflict +my $table_check = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM $conflict_table WHERE local_conflicts::text LIKE '%600%';" +); +is($table_check, 1, 'Table log was bypassed when destination set to log'); + +done_testing(); -- 2.43.0