From 9abb6eaf79f233db323f8ed65fd69e0651c11c54 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Sat, 20 Dec 2025 15:20:09 +0530 Subject: [PATCH v14 2/2] Implement the conflict insertion infrastructure into the conflict log table Note: A single remote tuple may conflict with multiple local tuples when conflict type is CT_MULTIPLE_UNIQUE_CONFLICTS, so for handling this case we create a single row in conflict log table with respect to each remote conflict tuple even if it conflicts with multiple local tuples and we store the multiple conflict tuples as a single JSON array element in format as [ { "xid": "1001", "commit_ts": "...", "origin": "...", "tuple": {...} }, ... ] We can extract the elements of the local tuples from the conflict log table row as given in below example. SELECT remote_xid, relname, remote_origin, local_conflicts[1] ->> 'xid' AS local_xid, local_conflicts[1] ->> 'tuple' AS local_tuple FROM myschema.conflict_log_history2; remote_xid | relname | remote_origin | local_xid | local_tuple ------------+----------+---------------+-----------+--------------------- 760 | test | pg_16406 | 771 | {"a":1,"b":10} 765 | conf_tab | pg_16406 | 775 | {"a":2,"b":2,"c":2} --- src/backend/replication/logical/conflict.c | 619 +++++++++++++++++++-- src/backend/replication/logical/launcher.c | 1 + src/backend/replication/logical/worker.c | 38 +- src/include/replication/conflict.h | 3 + src/include/replication/worker_internal.h | 7 + 5 files changed, 635 insertions(+), 33 deletions(-) diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 16695592265..5f753cd8042 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -15,13 +15,22 @@ #include "postgres.h" #include "access/commit_ts.h" +#include "access/heapam.h" #include "access/tableam.h" +#include "commands/subscriptioncmds.h" #include "executor/executor.h" +#include "funcapi.h" #include "pgstat.h" #include "replication/conflict.h" #include "replication/worker_internal.h" #include "storage/lmgr.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" #include "utils/lsyscache.h" +#include "utils/pg_lsn.h" +#include "utils/jsonb.h" + +#define MAX_LOCAL_CONFLICT_INFO_ATTRS 5 static const char *const ConflictTypeNames[] = { [CT_INSERT_EXISTS] = "insert_exists", @@ -50,8 +59,27 @@ static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *localslot, TupleTableSlot *remoteslot, Oid indexoid); +static void build_index_datums_from_slot(EState *estate, Relation localrel, + TupleTableSlot *slot, + Relation indexDesc, Datum *values, + bool *isnull); static char *build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot, Oid indexoid); +static Datum tuple_table_slot_to_json_datum(TupleTableSlot *slot); +static Datum tuple_table_slot_to_indextup_json(EState *estate, + Relation localrel, + Oid replica_index, + TupleTableSlot *slot); +static TupleDesc build_conflict_tupledesc(void); +static Datum build_local_conflicts_json_array(EState *estate, Relation rel, + ConflictType conflict_type, + List *conflicttuples); +static void prepare_conflict_log_tuple(EState *estate, Relation rel, + Relation conflictlogrel, + ConflictType conflict_type, + TupleTableSlot *searchslot, + List *conflicttuples, + TupleTableSlot *remoteslot); /* * Get the xmin and commit timestamp data (origin and timestamp) associated @@ -105,11 +133,19 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *remoteslot, List *conflicttuples) { - Relation localrel = relinfo->ri_RelationDesc; - StringInfoData err_detail; + Relation localrel = relinfo->ri_RelationDesc; + StringInfoData err_detail; + ConflictLogDest dest; + Relation conflictlogrel; initStringInfo(&err_detail); + /* + * Get both the conflict log destination and the opened conflict log + * relation for insertion. + */ + conflictlogrel = GetConflictLogTableInfo(&dest); + /* Form errdetail message by combining conflicting tuples information. */ foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples) errdetail_apply_conflict(estate, relinfo, type, searchslot, @@ -120,15 +156,69 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, conflicttuple->ts, &err_detail); + /* Insert to table if destination is 'table' or 'all' */ + if (conflictlogrel) + { + Assert(dest == CONFLICT_LOG_DEST_TABLE || + dest == CONFLICT_LOG_DEST_ALL); + + if (ValidateConflictLogTable(conflictlogrel)) + { + /* + * Prepare the conflict log tuple. If the error level is below + * ERROR, insert it immediately. Otherwise, defer the insertion to + * a new transaction after the current one aborts, ensuring the + * insertion of the log tuple is not rolled back. + */ + prepare_conflict_log_tuple(estate, + relinfo->ri_RelationDesc, + conflictlogrel, + type, + searchslot, + conflicttuples, + remoteslot); + if (elevel < ERROR) + InsertConflictLogTuple(conflictlogrel); + } + else + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("conflict log table \"%s.%s\" structure changed, skipping insertion", + get_namespace_name(RelationGetNamespace(conflictlogrel)), + RelationGetRelationName(conflictlogrel))); + + table_close(conflictlogrel, RowExclusiveLock); + } + pgstat_report_subscription_conflict(MySubscription->oid, type); - ereport(elevel, - errcode_apply_conflict(type), - errmsg("conflict detected on relation \"%s.%s\": conflict=%s", - get_namespace_name(RelationGetNamespace(localrel)), - RelationGetRelationName(localrel), - ConflictTypeNames[type]), - errdetail_internal("%s", err_detail.data)); + /* Decide what detail to show in server logs. */ + if (dest == CONFLICT_LOG_DEST_LOG || dest == CONFLICT_LOG_DEST_ALL) + { + /* Standard reporting with full internal details. */ + ereport(elevel, + errcode_apply_conflict(type), + errmsg("conflict detected on relation \"%s.%s\": conflict=%s", + get_namespace_name(RelationGetNamespace(localrel)), + RelationGetRelationName(localrel), + ConflictTypeNames[type]), + errdetail_internal("%s", err_detail.data)); + } + else + { + /* + * 'table' only: Report the error msg but omit raw tuple data from + * server logs since it's already captured in the internal table. + */ + ereport(elevel, + errcode_apply_conflict(type), + errmsg("conflict detected on relation \"%s.%s\": conflict=%s", + get_namespace_name(RelationGetNamespace(localrel)), + RelationGetRelationName(localrel), + ConflictTypeNames[type]), + errdetail("Conflict details logged to internal table with OID %u.", + MySubscription->conflictrelid)); + } } /* @@ -162,6 +252,142 @@ InitConflictIndexes(ResultRelInfo *relInfo) relInfo->ri_onConflictArbiterIndexes = uniqueIndexes; } +/* + * GetConflictLogTableInfo + * + * Fetches conflict logging metadata from the cached MySubscription pointer. + * Sets the destination enum in *log_dest and, if applicable, opens and + * returns the relation handle for the internal log table. + */ +Relation +GetConflictLogTableInfo(ConflictLogDest *log_dest) +{ + Oid conflictlogrelid; + Relation conflictlogrel = NULL; + + /* + * Convert the text log destination to the internal enum. MySubscription + * already contains the data from pg_subscription. + */ + *log_dest = GetLogDestination(MySubscription->logdestination); + conflictlogrelid = MySubscription->conflictrelid; + + /* If destination is 'log' only, no table to open. */ + if (*log_dest == CONFLICT_LOG_DEST_LOG) + return NULL; + + Assert(OidIsValid(conflictlogrelid)); + + conflictlogrel = table_open(conflictlogrelid, RowExclusiveLock); + + /* Conflict log table is dropped or not accessible. */ + if (conflictlogrel == NULL) + ereport(WARNING, + (errcode(ERRCODE_UNDEFINED_TABLE), + errmsg("conflict log table with OID %u does not exist", + conflictlogrelid))); + + return conflictlogrel; +} + +/* + * InsertConflictLogTuple + * + * Insert conflict log tuple into the conflict log table. It uses + * HEAP_INSERT_NO_LOGICAL to explicitly block logical decoding of the tuple + * inserted into the conflict log table. + */ +void +InsertConflictLogTuple(Relation conflictlogrel) +{ + int options = HEAP_INSERT_NO_LOGICAL; + + /* A valid tuple must be prepared and stored in MyLogicalRepWorker. */ + Assert(MyLogicalRepWorker->conflict_log_tuple != NULL); + + heap_insert(conflictlogrel, MyLogicalRepWorker->conflict_log_tuple, + GetCurrentCommandId(true), options, NULL); + + /* Free conflict log tuple. */ + heap_freetuple(MyLogicalRepWorker->conflict_log_tuple); + MyLogicalRepWorker->conflict_log_tuple = NULL; +} + +/* + * ValidateConflictLogTable - Validate conflict log table + * + * Validate whether the conflict log table is still suitable for considering as + * conflict log table. + */ +bool +ValidateConflictLogTable(Relation rel) +{ + Relation pg_attribute; + HeapTuple atup; + ScanKeyData scankey; + SysScanDesc scan; + Form_pg_attribute attForm; + int attcnt = 0; + bool tbl_ok = true; + + /* + * Check whether the table definition including its column names, data + * types, and column ordering meets the requirements for conflict log + * table. + */ + pg_attribute = table_open(AttributeRelationId, AccessShareLock); + ScanKeyInit(&scankey, + Anum_pg_attribute_attrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(rel))); + + scan = systable_beginscan(pg_attribute, AttributeRelidNumIndexId, true, + SnapshotSelf, 1, &scankey); + + /* We only need to check up to MAX_CONFLICT_ATTR_NUM attributes */ + while (HeapTupleIsValid(atup = systable_getnext(scan))) + { + const ConflictLogColumnDef *expected; + int schema_idx; + + attForm = (Form_pg_attribute) GETSTRUCT(atup); + + /* Skip system columns and dropped columns */ + if (attForm->attnum < 1 || attForm->attisdropped) + continue; + + attcnt++; + + /* attnum 1 corresponds to index 0 in ConflictLogSchema */ + schema_idx = attForm->attnum - 1; + + /* Check against the central schema definition */ + if (schema_idx >= MAX_CONFLICT_ATTR_NUM) + { + /* Found an extra column beyond the required set */ + tbl_ok = false; + break; + } + + expected = &ConflictLogSchema[schema_idx]; + + if (attForm->atttypid != expected->atttypid || + strcmp(NameStr(attForm->attname), expected->attname) != 0) + { + tbl_ok = false; + break; + } + } + + systable_endscan(scan); + table_close(pg_attribute, AccessShareLock); + + if (attcnt != MAX_CONFLICT_ATTR_NUM || !tbl_ok) + return false; + + return true; +} + /* * Add SQLSTATE error code to the current conflict report. */ @@ -472,6 +698,40 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo, return tuple_value.data; } +/* + * Helper function to extract the "raw" index key Datums and their null flags + * from a TupleTableSlot, given an already open index descriptor. + * This is the reusable core logic. + */ +static void +build_index_datums_from_slot(EState *estate, Relation localrel, + TupleTableSlot *slot, + Relation indexDesc, Datum *values, + bool *isnull) +{ + TupleTableSlot *tableslot = slot; + + /* + * If the slot is a virtual slot, copy it into a heap tuple slot as + * FormIndexDatum only works with heap tuple slots. + */ + if (TTS_IS_VIRTUAL(slot)) + { + /* Slot is created within the EState's tuple table */ + tableslot = table_slot_create(localrel, &estate->es_tupleTable); + tableslot = ExecCopySlot(tableslot, slot); + } + + /* + * Initialize ecxt_scantuple for potential use in FormIndexDatum + */ + GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot; + + /* Form the index datums */ + FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, + isnull); +} + /* * Helper functions to construct a string describing the contents of an index * entry. See BuildIndexValueDescription for details. @@ -487,41 +747,336 @@ build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot, Relation indexDesc; Datum values[INDEX_MAX_KEYS]; bool isnull[INDEX_MAX_KEYS]; - TupleTableSlot *tableslot = slot; - if (!tableslot) + if (!slot) return NULL; Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true)); indexDesc = index_open(indexoid, NoLock); - /* - * If the slot is a virtual slot, copy it into a heap tuple slot as - * FormIndexDatum only works with heap tuple slots. - */ - if (TTS_IS_VIRTUAL(slot)) + build_index_datums_from_slot(estate, localrel, slot, indexDesc, values, + isnull); + + index_value = BuildIndexValueDescription(indexDesc, values, isnull); + + index_close(indexDesc, NoLock); + + return index_value; +} + +/* + * tuple_table_slot_to_json_datum + * + * Helper function to convert a TupleTableSlot to Jsonb. + */ +static Datum +tuple_table_slot_to_json_datum(TupleTableSlot *slot) +{ + HeapTuple tuple; + Datum datum; + Datum json; + + Assert(slot != NULL); + + tuple = ExecCopySlotHeapTuple(slot); + datum = heap_copy_tuple_as_datum(tuple, slot->tts_tupleDescriptor); + + json = DirectFunctionCall1(row_to_json, datum); + heap_freetuple(tuple); + + return json; +} + +/* + * tuple_table_slot_to_indextup_json + * + * Fetch replica identity key from the tuple table slot and convert into a + * jsonb datum. + */ +static Datum +tuple_table_slot_to_indextup_json(EState *estate, Relation localrel, + Oid indexid, TupleTableSlot *slot) +{ + Relation indexDesc; + Datum values[INDEX_MAX_KEYS]; + bool isnull[INDEX_MAX_KEYS]; + HeapTuple tuple; + TupleDesc tupdesc; + Datum datum; + + Assert(slot != NULL); + + Assert(CheckRelationOidLockedByMe(indexid, RowExclusiveLock, true)); + + indexDesc = index_open(indexid, NoLock); + + build_index_datums_from_slot(estate, localrel, slot, indexDesc, values, + isnull); + tupdesc = RelationGetDescr(indexDesc); + + /* Bless the tupdesc so it can be looked up by row_to_json. */ + BlessTupleDesc(tupdesc); + + /* Form the replica identity tuple. */ + tuple = heap_form_tuple(tupdesc, values, isnull); + datum = heap_copy_tuple_as_datum(tuple, tupdesc); + + index_close(indexDesc, NoLock); + + /* Convert to a JSONB datum. */ + return DirectFunctionCall1(row_to_json, datum); +} + +/* + * Initialize the tuple descriptor for local conflict info. + */ +static TupleDesc +build_conflict_tupledesc(void) +{ + TupleDesc tupdesc; + int attno = 1; + + tupdesc = CreateTemplateTupleDesc(MAX_LOCAL_CONFLICT_INFO_ATTRS); + + TupleDescInitEntry(tupdesc, (AttrNumber) attno++, "xid", + XIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) attno++, "commit_ts", + TIMESTAMPTZOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) attno++, "origin", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) attno++, "key", + JSONOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) attno, "tuple", + JSONOID, -1, 0); + + BlessTupleDesc(tupdesc); + + Assert(attno == MAX_LOCAL_CONFLICT_INFO_ATTRS); + + return tupdesc; +} + +/* + * Builds the local conflicts JSONB array column from the list of + * ConflictTupleInfo objects. + * + * Example output structure: + * [ { "xid": "1001", "commit_ts": "...", "origin": "...", "tuple": {...} }, ... ] + */ +static Datum +build_local_conflicts_json_array(EState *estate, Relation rel, + ConflictType conflict_type, + List *conflicttuples) +{ + ListCell *lc; + List *json_datums = NIL; /* List to hold the row_to_json results (type json) */ + Datum *json_datum_array; + bool *json_null_array; + Datum json_array_datum; + int num_conflicts; + int i; + int16 typlen; + bool typbyval; + char typalign; + TupleDesc tupdesc; + + /* Build local conflicts tuple descriptor. */ + tupdesc = build_conflict_tupledesc(); + + /* Process local conflict tuple list and prepare an array of JSON. */ + foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples) { - tableslot = table_slot_create(localrel, &estate->es_tupleTable); - tableslot = ExecCopySlot(tableslot, slot); + Datum values[MAX_LOCAL_CONFLICT_INFO_ATTRS]; + bool nulls[MAX_LOCAL_CONFLICT_INFO_ATTRS]; + char *origin_name = NULL; + HeapTuple tuple; + Datum json_datum; + int attno; + + memset(values, 0, sizeof(Datum) * MAX_LOCAL_CONFLICT_INFO_ATTRS); + memset(nulls, 0, sizeof(bool) * MAX_LOCAL_CONFLICT_INFO_ATTRS); + + attno = 0; + values[attno++] = TransactionIdGetDatum(conflicttuple->xmin); + + if (conflicttuple->ts) + values[attno++] = TimestampTzGetDatum(conflicttuple->ts); + else + nulls[attno++] = true; + + if (conflicttuple->origin != InvalidRepOriginId) + replorigin_by_oid(conflicttuple->origin, true, &origin_name); + + /* Store empty string if origin name for the tuple is NULL. */ + if (origin_name != NULL) + values[attno++] = CStringGetTextDatum(origin_name); + else + nulls[attno++] = true; + + /* + * Add the conflicting key values in the case of a unique constraint + * violation. + */ + if (conflict_type == CT_INSERT_EXISTS || + conflict_type == CT_UPDATE_EXISTS || + conflict_type == CT_MULTIPLE_UNIQUE_CONFLICTS) + { + Oid indexoid = conflicttuple->indexoid; + + Assert(OidIsValid(indexoid) && conflicttuple->slot && + CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, + true)); + values[attno++] = + tuple_table_slot_to_indextup_json(estate, rel, + indexoid, + conflicttuple->slot); + } + else + nulls[attno++] = true; + + /* Convert conflicting tuple to JSON datum. */ + if (conflicttuple->slot) + values[attno] = tuple_table_slot_to_json_datum(conflicttuple->slot); + else + nulls[attno] = true; + + Assert(attno + 1 == MAX_LOCAL_CONFLICT_INFO_ATTRS); + + tuple = heap_form_tuple(tupdesc, values, nulls); + + json_datum = heap_copy_tuple_as_datum(tuple, tupdesc); + + /* + * Build the higher level JSON datum in format described in function + * header. + */ + json_datum = DirectFunctionCall1(row_to_json, json_datum); + + /* Done with the temporary tuple. */ + heap_freetuple(tuple); + + /* Add to the array element. */ + json_datums = lappend(json_datums, (void *) json_datum); } - /* - * Initialize ecxt_scantuple for potential use in FormIndexDatum when - * index expressions are present. - */ - GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot; + num_conflicts = list_length(json_datums); - /* - * The values/nulls arrays passed to BuildIndexValueDescription should be - * the results of FormIndexDatum, which are the "raw" input to the index - * AM. - */ - FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull); + json_datum_array = (Datum *) palloc(num_conflicts * sizeof(Datum)); + json_null_array = (bool *) palloc0(num_conflicts * sizeof(bool)); - index_value = BuildIndexValueDescription(indexDesc, values, isnull); + i = 0; + foreach(lc, json_datums) + { + json_datum_array[i] = (Datum) lfirst(lc); + i++; + } - index_close(indexDesc, NoLock); + /* Construct the json[] array Datum. */ + get_typlenbyvalalign(JSONOID, &typlen, &typbyval, &typalign); + json_array_datum = PointerGetDatum(construct_array(json_datum_array, + num_conflicts, + JSONOID, + typlen, + typbyval, + typalign)); + pfree(json_datum_array); + pfree(json_null_array); + + return json_array_datum; +} - return index_value; +/* + * prepare_conflict_log_tuple + * + * This routine prepares a tuple detailing a conflict encountered during + * logical replication. The prepared tuple will be stored in + * MyLogicalRepWorker->conflict_log_tuple which should be inserted into the + * conflict log table by calling InsertConflictLogTuple. + */ +static void +prepare_conflict_log_tuple(EState *estate, Relation rel, + Relation conflictlogrel, + ConflictType conflict_type, + TupleTableSlot *searchslot, + List *conflicttuples, + TupleTableSlot *remoteslot) +{ + Datum values[MAX_CONFLICT_ATTR_NUM]; + bool nulls[MAX_CONFLICT_ATTR_NUM]; + int attno; + char *remote_origin = NULL; + MemoryContext oldctx; + + Assert(MyLogicalRepWorker->conflict_log_tuple == NULL); + + /* Initialize values and nulls arrays. */ + memset(values, 0, sizeof(Datum) * MAX_CONFLICT_ATTR_NUM); + memset(nulls, 0, sizeof(bool) * MAX_CONFLICT_ATTR_NUM); + + /* Populate the values and nulls arrays. */ + attno = 0; + values[attno++] = ObjectIdGetDatum(RelationGetRelid(rel)); + + values[attno++] = + CStringGetTextDatum(get_namespace_name(RelationGetNamespace(rel))); + + values[attno++] = CStringGetTextDatum(RelationGetRelationName(rel)); + + values[attno++] = CStringGetTextDatum(ConflictTypeNames[conflict_type]); + + if (TransactionIdIsValid(remote_xid)) + values[attno++] = TransactionIdGetDatum(remote_xid); + else + nulls[attno++] = true; + + values[attno++] = LSNGetDatum(remote_final_lsn); + + if (remote_commit_ts > 0) + values[attno++] = TimestampTzGetDatum(remote_commit_ts); + else + nulls[attno++] = true; + + if (replorigin_session_origin != InvalidRepOriginId) + replorigin_by_oid(replorigin_session_origin, true, &remote_origin); + + if (remote_origin != NULL) + values[attno++] = CStringGetTextDatum(remote_origin); + else + nulls[attno++] = true; + + if (!TupIsNull(searchslot)) + { + Oid replica_index = GetRelationIdentityOrPK(rel); + + /* + * If the table has a valid replica identity index, build the index + * json datum from key value. Otherwise, construct it from the complete + * tuple in REPLICA IDENTITY FULL cases. + */ + if (OidIsValid(replica_index)) + values[attno++] = tuple_table_slot_to_indextup_json(estate, rel, + replica_index, + searchslot); + else + values[attno++] = tuple_table_slot_to_json_datum(searchslot); + } + else + nulls[attno++] = true; + + if (!TupIsNull(remoteslot)) + values[attno++] = tuple_table_slot_to_json_datum(remoteslot); + else + nulls[attno++] = true; + + values[attno] = build_local_conflicts_json_array(estate, rel, + conflict_type, + conflicttuples); + + Assert(attno + 1 == MAX_CONFLICT_ATTR_NUM); + + oldctx = MemoryContextSwitchTo(ApplyContext); + MyLogicalRepWorker->conflict_log_tuple = + heap_form_tuple(RelationGetDescr(conflictlogrel), values, nulls); + MemoryContextSwitchTo(oldctx); } diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 3991e1495d4..bc7e1d9ebde 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -477,6 +477,7 @@ retry: worker->oldest_nonremovable_xid = retain_dead_tuples ? MyReplicationSlot->data.xmin : InvalidTransactionId; + worker->conflict_log_tuple = NULL; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index fc64476a9ef..ccd1b2c6e81 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -482,7 +482,9 @@ static bool MySubscriptionValid = false; static List *on_commit_wakeup_workers_subids = NIL; bool in_remote_transaction = false; -static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; +XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; +TransactionId remote_xid = InvalidTransactionId; +TimestampTz remote_commit_ts = 0; /* fields valid only when processing streamed transaction */ static bool in_streamed_transaction = false; @@ -1219,6 +1221,8 @@ apply_handle_begin(StringInfo s) set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn); remote_final_lsn = begin_data.final_lsn; + remote_commit_ts = begin_data.committime; + remote_xid = begin_data.xid; maybe_start_skipping_changes(begin_data.final_lsn); @@ -1745,6 +1749,10 @@ apply_handle_stream_start(StringInfo s) /* extract XID of the top-level transaction */ stream_xid = logicalrep_read_stream_start(s, &first_segment); + remote_xid = stream_xid; + remote_final_lsn = InvalidXLogRecPtr; + remote_commit_ts = 0; + if (!TransactionIdIsValid(stream_xid)) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -5609,6 +5617,34 @@ start_apply(XLogRecPtr origin_startpos) pgstat_report_subscription_error(MySubscription->oid, MyLogicalRepWorker->type); + /* + * Insert any pending conflict log tuple under a new transaction. + */ + if (MyLogicalRepWorker->conflict_log_tuple != NULL) + { + Relation conflictlogrel; + ConflictLogDest dest; + + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + + /* Open conflict log table and insert the tuple. */ + conflictlogrel = GetConflictLogTableInfo(&dest); + if (ValidateConflictLogTable(conflictlogrel)) + InsertConflictLogTuple(conflictlogrel); + else + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Conflict log table \"%s.%s\" structure changed, skipping insertion", + get_namespace_name(RelationGetNamespace(conflictlogrel)), + RelationGetRelationName(conflictlogrel))); + MyLogicalRepWorker->conflict_log_tuple = NULL; + table_close(conflictlogrel, RowExclusiveLock); + + PopActiveSnapshot(); + CommitTransactionCommand(); + } + PG_RE_THROW(); } } diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index 70f8744b381..226fed9e383 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -139,4 +139,7 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, List *conflicttuples); extern void InitConflictIndexes(ResultRelInfo *relInfo); +extern Relation GetConflictLogTableInfo(ConflictLogDest *log_dest); +extern void InsertConflictLogTuple(Relation conflictlogrel); +extern bool ValidateConflictLogTable(Relation rel); #endif diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index f081619f151..711c04c7297 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -101,6 +101,9 @@ typedef struct LogicalRepWorker */ TransactionId oldest_nonremovable_xid; + /* A conflict log tuple that is prepared but not yet inserted. */ + HeapTuple conflict_log_tuple; + /* Stats. */ XLogRecPtr last_lsn; TimestampTz last_send_time; @@ -256,6 +259,10 @@ extern PGDLLIMPORT bool InitializingApplyWorker; extern PGDLLIMPORT List *table_states_not_ready; +extern XLogRecPtr remote_final_lsn; +extern TimestampTz remote_commit_ts; +extern TransactionId remote_xid; + extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, -- 2.49.0