From 7780d27210c3211eb33fafd66c44a2a9f3f91368 Mon Sep 17 00:00:00 2001 From: Nisha Moond Date: Tue, 25 Jun 2024 15:45:37 +0530 Subject: [PATCH v1 3/4] Implement conflict resolution for INSERT, UPDATE, and DELETE operations. This patch introduces support for handling conflicts with the following resolutions: - For INSERT conflicts: - insert_exists: remote_apply, keep_local, error - For UPDATE conflicts: - update_differ: remote_apply, keep_local, error - update_missing: apply_or_skip, apply_or_error, skip, error - For DELETE conflicts: - delete_missing: skip, error Each of the conflict type now has defined actions to manage data synchronization between Publisher and subscriber databases. --- src/backend/executor/execReplication.c | 80 ++++- src/backend/replication/logical/conflict.c | 190 ++++++++++-- src/backend/replication/logical/worker.c | 333 ++++++++++++++++----- src/include/executor/executor.h | 5 +- src/include/replication/conflict.h | 19 +- src/test/subscription/t/029_on_error.pl | 7 + 6 files changed, 525 insertions(+), 109 deletions(-) diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index f01927a933..3e8d768214 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -566,7 +566,8 @@ retry: */ void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, - EState *estate, TupleTableSlot *slot) + EState *estate, TupleTableSlot *slot, + TupleTableSlot **conflictslot) { bool skip_tuple = false; Relation rel = resultRelInfo->ri_RelationDesc; @@ -602,21 +603,82 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, if (rel->rd_rel->relispartition) ExecPartitionCheck(resultRelInfo, slot, estate, true); + conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes; + + /* + * If caller has passed non null conflictslot, check all the unique + * indexes for potential conflicts. If the configured resolver is in + * favour of apply, give the conflicted tuple information in + * conflictslot. + */ + if (conflictslot) + { + foreach_oid(uniqueidx, conflictindexes) + { + /* + * Return if any conflict is found other than one with 'ERROR' + * resolver configured. In case of 'ERROR' resolver, emit + * error here; otherwise return to caller for resolutions. + */ + if (FindConflictTuple(resultRelInfo, estate, uniqueidx, + slot, &(*conflictslot))) + { + RepOriginId origin; + TimestampTz committs; + TransactionId xmin; + ConflictResolver resolver; + bool apply_remote = false; + + GetTupleCommitTs(*conflictslot, &xmin, &origin, &committs); + resolver = GetConflictResolver(*conflictslot, rel, + CT_INSERT_EXISTS, + &apply_remote, NULL, uniqueidx, + xmin, origin, committs); + + ReportApplyConflict(LOG, CT_INSERT_EXISTS, resolver, rel, + uniqueidx, xmin, origin, committs, + *conflictslot); + + /* Nothing to apply, free the resources */ + if (!apply_remote) + { + ExecDropSingleTupleTableSlot(*conflictslot); + *conflictslot = NULL; + } + + return; + } + } + } + + /* OK, store the tuple and create index entries for it */ simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot); - conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes; - if (resultRelInfo->ri_NumIndices > 0) recheckIndexes = ExecInsertIndexTuples(resultRelInfo, slot, estate, false, conflictindexes, &conflict, conflictindexes, false); - /* Re-check all the unique indexes for potential conflicts */ + /* + * Re-check all the unique indexes for potential conflicts. + * + * The Reason for doing the check again is: + * + * 1. If the remote change violated multiple unique constraints, the + * current resolution cannot solve it, so we still need to report this + * conflict after resolution. + * + * 2. If the local data is changed immediately after converting the + * insert to update but before updating the data, then the conflict + * can still happen, and we may need to report it again. + * + * XXX: Needs further review and discussion on usefulness of this + * repeated call. + */ foreach_oid(uniqueidx, (conflict ? conflictindexes : NIL)) { - TupleTableSlot *conflictslot; /* * Reports the conflict if any. @@ -631,15 +693,15 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, * cost of finding the tuple should be acceptable in this case. */ if (list_member_oid(recheckIndexes, uniqueidx) && - FindConflictTuple(resultRelInfo, estate, uniqueidx, slot, &conflictslot)) + FindConflictTuple(resultRelInfo, estate, uniqueidx, slot, &(*conflictslot))) { RepOriginId origin; TimestampTz committs; TransactionId xmin; - GetTupleCommitTs(conflictslot, &xmin, &origin, &committs); - ReportApplyConflict(ERROR, CT_INSERT_EXISTS, rel, uniqueidx, - xmin, origin, committs, conflictslot); + GetTupleCommitTs(*conflictslot, &xmin, &origin, &committs); + ReportApplyConflict(ERROR, CT_INSERT_EXISTS, 0, rel, uniqueidx, + xmin, origin, committs, *conflictslot); } } diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 87b1ea0ab3..a622a0ea8b 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -21,13 +21,17 @@ #include "access/table.h" #include "catalog/indexing.h" #include "catalog/pg_conflict.h" +#include "executor/executor.h" #include "replication/conflict.h" +#include "replication/logicalproto.h" +#include "replication/logicalrelation.h" #include "replication/origin.h" #include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/rel.h" #include "utils/timestamp.h" +#include "utils/syscache.h" const char *const ConflictTypeNames[] = { [CT_INSERT_EXISTS] = "insert_exists", @@ -82,8 +86,9 @@ const int ConflictTypeDefaultResolvers[] = { }; static char *build_index_value_desc(Oid indexoid, TupleTableSlot *conflictslot); -static int errdetail_apply_conflict(ConflictType type, Oid conflictidx, - TransactionId localxmin, +static int errdetail_apply_conflict(ConflictType type, + ConflictResolver resolver, + Oid conflictidx, TransactionId localxmin, RepOriginId localorigin, TimestampTz localts, TupleTableSlot *conflictslot); @@ -122,13 +127,13 @@ GetTupleCommitTs(TupleTableSlot *localslot, TransactionId *xmin, } /* - * Report a conflict when applying remote changes. + * Report conflict and resolution applied while applying remote changes. */ void -ReportApplyConflict(int elevel, ConflictType type, Relation localrel, - Oid conflictidx, TransactionId localxmin, - RepOriginId localorigin, TimestampTz localts, - TupleTableSlot *conflictslot) +ReportApplyConflict(int elevel, ConflictType type, ConflictResolver resolver, + Relation localrel, Oid conflictidx, + TransactionId localxmin, RepOriginId localorigin, + TimestampTz localts, TupleTableSlot *conflictslot) { ereport(elevel, errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION), @@ -136,7 +141,7 @@ ReportApplyConflict(int elevel, ConflictType type, Relation localrel, ConflictTypeNames[type], get_namespace_name(RelationGetNamespace(localrel)), RelationGetRelationName(localrel)), - errdetail_apply_conflict(type, conflictidx, localxmin, localorigin, + errdetail_apply_conflict(type, resolver, conflictidx, localxmin, localorigin, localts, conflictslot)); } @@ -172,12 +177,13 @@ InitConflictIndexes(ResultRelInfo *relInfo) } /* - * Add an errdetail() line showing conflict detail. + * Add an errdetail() line showing conflict and resolution details. */ static int -errdetail_apply_conflict(ConflictType type, Oid conflictidx, - TransactionId localxmin, RepOriginId localorigin, - TimestampTz localts, TupleTableSlot *conflictslot) +errdetail_apply_conflict(ConflictType type, ConflictResolver resolver, + Oid conflictidx, TransactionId localxmin, + RepOriginId localorigin, TimestampTz localts, + TupleTableSlot *conflictslot) { switch (type) { @@ -191,24 +197,29 @@ errdetail_apply_conflict(ConflictType type, Oid conflictidx, char *index_value = build_index_value_desc(conflictidx, conflictslot); - if (index_value && localts) - return errdetail("Key %s already exists in unique index \"%s\", which was modified by origin %u in transaction %u at %s.", - index_value, get_rel_name(conflictidx), localorigin, - localxmin, timestamptz_to_str(localts)); - else if (index_value && !localts) - return errdetail("Key %s already exists in unique index \"%s\", which was modified in transaction %u.", - index_value, get_rel_name(conflictidx), localxmin); + if (!resolver || (resolver == CR_ERROR)) + { + if (index_value && localts) + return errdetail("Key %s already exists in unique index \"%s\", which was modified by origin %u in transaction %u at %s.", + index_value, get_rel_name(conflictidx), localorigin, + localxmin, timestamptz_to_str(localts)); + else if (index_value && !localts) + return errdetail("Key %s already exists in unique index \"%s\", which was modified in transaction %u.", + index_value, get_rel_name(conflictidx), localxmin); + else + return errdetail("Key already exists in unique index \"%s\".", + get_rel_name(conflictidx)); + } else - return errdetail("Key already exists in unique index \"%s\".", - get_rel_name(conflictidx)); + return errdetail("Key already exists. Applying resolution method \"%s\"", ConflictResolverNames[resolver]); } case CT_UPDATE_DIFFER: - return errdetail("Updating a row that was modified by a different origin %u in transaction %u at %s.", - localorigin, localxmin, timestamptz_to_str(localts)); + return errdetail("Updating a row that was modified by a different origin %u in transaction %u at %s. Applying resolution method \"%s\"", + localorigin, localxmin, timestamptz_to_str(localts), ConflictResolverNames[resolver]); case CT_UPDATE_MISSING: - return errdetail("Did not find the row to be updated."); + return errdetail("Did not find the row to be updated. Applying resolution method \"%s\"", ConflictResolverNames[resolver]); case CT_DELETE_MISSING: - return errdetail("Did not find the row to be deleted."); + return errdetail("Did not find the row to be deleted. Applying resolution method \"%s\"", ConflictResolverNames[resolver]); } return 0; /* silence compiler warning */ @@ -322,6 +333,71 @@ validate_conflict_type_and_resolver(char *conflict_type, char *conflict_resolver return type; } +/* + * Get the global conflict resolver configured for given conflict type + */ +static ConflictResolver +get_conflict_resolver_internal(ConflictType type) +{ + + ConflictResolver resolver; + HeapTuple tuple; + Datum datum; + char *conflict_res; + + /* + * XXX: Currently, we fetch the conflict resolver from cache for each + * conflict detection. If needed, we can keep the info in global variable + * and fetch from cache only once after cache invalidation. + */ + tuple = SearchSysCache1(CONFLICTTYPE, + CStringGetTextDatum(ConflictTypeNames[type])); + + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for conflict type %u", type); + + datum = SysCacheGetAttrNotNull(CONFLICTTYPE, + tuple, Anum_pg_conflict_confres); + + conflict_res = TextDatumGetCString(datum); + + for (resolver = CR_MIN; resolver <= CR_MAX; resolver++) + { + if (strcmp(ConflictResolverNames[resolver], conflict_res) == 0) + break; + } + + ReleaseSysCache(tuple); + return resolver; +} + +/* + * Check if a full tuple can be created from the new tuple. + * Return true if yes, false otherwise. + * + * XXX: Currently, it only handles the simple case of identical table + * structures on both Publisher and subscriber. Need to analyze if more + * cases can be supported. + */ +static bool +can_create_full_tuple(Relation localrel, + LogicalRepTupleData *newtup) +{ + int i; + int local_att = RelationGetNumberOfAttributes(localrel); + + if (newtup->ncols != local_att) + return false; + + for (i = 0; i < newtup->ncols; i++) + { + if (newtup->colstatus[i] == LOGICALREP_COLUMN_UNCHANGED) + return false; + } + + return true; +} + /* * Execute Conflict Resolver Stmt * @@ -352,7 +428,7 @@ ExecConflictResolverStmt(ConflictResolverStmt *stmt) memset(replaces, false, sizeof(replaces)); values[Anum_pg_conflict_conftype - 1] = - CStringGetTextDatum(stmt->conflict_type); + CStringGetTextDatum(stmt->conflict_type); if (stmt->isReset) { @@ -396,3 +472,65 @@ ExecConflictResolverStmt(ConflictResolverStmt *stmt) table_close(pg_conflict, RowExclusiveLock); } + +/* + * Find the global resolver for the given conflict type. + * It emits error in case the resolver is set to 'ERROR'. + * + * Set 'apply_remote' to true if remote tuple should be applied, + * false otherwise. + */ +ConflictResolver +GetConflictResolver(TupleTableSlot *localslot, Relation localrel, + ConflictType type, bool *apply_remote, + LogicalRepTupleData *newtup, + Oid conflictidx, TransactionId xmin, + RepOriginId origin, TimestampTz committs) +{ + ConflictResolver resolver; + + resolver = get_conflict_resolver_internal(type); + + switch (resolver) + { + case CR_REMOTE_APPLY: + *apply_remote = true; + break; + case CR_APPLY_OR_SKIP: + if (can_create_full_tuple(localrel, newtup)) + { + *apply_remote = true; + break; + } + else + { + *apply_remote = false; + elog(LOG, "UPDATE can not be converted to INSERT, hence SKIP the update!"); + break; + } + case CR_APPLY_OR_ERROR: + if (can_create_full_tuple(localrel, newtup)) + { + *apply_remote = true; + break; + } + else + { + ReportApplyConflict(ERROR, type, resolver, localrel, conflictidx, + xmin, origin, committs, localslot); + break; + } + case CR_KEEP_LOCAL: + case CR_SKIP: + *apply_remote = false; + break; + case CR_ERROR: + ReportApplyConflict(ERROR, type, resolver, localrel, conflictidx, + xmin, origin, committs, localslot); + break; + default: + elog(ERROR, "Conflict %s is detected! Unrecogonized conflict resolution method", ConflictTypeNames[type]); + } + + return resolver; +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index af73e09b01..6e2fda28ec 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -382,7 +382,9 @@ static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, - TupleTableSlot *remoteslot); + TupleTableSlot *remoteslot, + LogicalRepTupleData *newtup, + LogicalRepRelMapEntry *rel_entry); static void apply_handle_update_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, @@ -2431,10 +2433,10 @@ apply_handle_insert(StringInfo s) /* For a partitioned table, insert the tuple into a partition. */ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) apply_handle_tuple_routing(edata, - remoteslot, NULL, CMD_INSERT); + remoteslot, &newtup, CMD_INSERT); else apply_handle_insert_internal(edata, edata->targetRelInfo, - remoteslot); + remoteslot, &newtup, rel); finish_edata(edata); @@ -2457,9 +2459,13 @@ apply_handle_insert(StringInfo s) static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, - TupleTableSlot *remoteslot) + TupleTableSlot *remoteslot, + LogicalRepTupleData *newtup, + LogicalRepRelMapEntry *rel_entry) { EState *estate = edata->estate; + MemoryContext oldctx; + TupleTableSlot *conflictslot = NULL; /* We must open indexes here. */ ExecOpenIndices(relinfo, MySubscription->detectconflict); @@ -2469,7 +2475,33 @@ apply_handle_insert_internal(ApplyExecutionData *edata, /* Do the insert. */ TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT); - ExecSimpleRelationInsert(relinfo, estate, remoteslot); + + if (MySubscription->detectconflict) + ExecSimpleRelationInsert(relinfo, estate, remoteslot, &conflictslot); + else + ExecSimpleRelationInsert(relinfo, estate, remoteslot, NULL); + + + /* Apply remote tuple by converting INSERT to UPDATE */ + if (conflictslot) + { + EPQState epqstate; + + EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); + + /* Process and store remote tuple in the slot */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + slot_modify_data(remoteslot, conflictslot, rel_entry, newtup); + MemoryContextSwitchTo(oldctx); + + EvalPlanQualSetSlot(&epqstate, remoteslot); + + /* Do the update */ + TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE); + ExecSimpleRelationUpdate(relinfo, estate, &epqstate, conflictslot, + remoteslot); + } + /* Cleanup. */ ExecCloseIndices(relinfo); @@ -2651,6 +2683,8 @@ apply_handle_update_internal(ApplyExecutionData *edata, TupleTableSlot *localslot; bool found; MemoryContext oldctx; + bool apply_remote = true; + ConflictResolver resolver; EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); ExecOpenIndices(relinfo, false); @@ -2674,35 +2708,73 @@ apply_handle_update_internal(ApplyExecutionData *edata, /* * If conflict detection is enabled, check whether the local tuple was - * modified by a different origin. If detected, report the conflict. + * modified by a different origin. If detected, report the conflict + * and configured resolver. */ if (MySubscription->detectconflict && GetTupleCommitTs(localslot, &localxmin, &localorigin, &localts) && localorigin != replorigin_session_origin) - ReportApplyConflict(LOG, CT_UPDATE_DIFFER, localrel, InvalidOid, - localxmin, localorigin, localts, NULL); + { + resolver = GetConflictResolver(localslot, localrel, CT_UPDATE_DIFFER, + &apply_remote, NULL, InvalidOid, + localxmin, localorigin, localts); - /* Process and store remote tuple in the slot */ - oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_modify_data(remoteslot, localslot, relmapentry, newtup); - MemoryContextSwitchTo(oldctx); + ReportApplyConflict(LOG, CT_UPDATE_DIFFER, resolver, localrel, + InvalidOid, localxmin, localorigin, localts, + NULL); + } - EvalPlanQualSetSlot(&epqstate, remoteslot); + /* + * Apply the change if configured resolver is in favor of that, else + * ignore the remote update. + */ + if (apply_remote) + { + /* Process and store remote tuple in the slot */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + slot_modify_data(remoteslot, localslot, relmapentry, newtup); + MemoryContextSwitchTo(oldctx); - /* Do the actual update. */ - TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE); - ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot, - remoteslot); + EvalPlanQualSetSlot(&epqstate, remoteslot); + + /* Do the actual update. */ + TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE); + ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot, + remoteslot); + } } else { /* - * The tuple to be updated could not be found. Do nothing except for - * emitting a log message. + * The tuple to be updated could not be found. Report the conflict. If + * the configured resolver is in favor of applying the change, convert + * UPDATE to INSERT and apply the change. */ if (MySubscription->detectconflict) - ReportApplyConflict(LOG, CT_UPDATE_MISSING, localrel, InvalidOid, - InvalidTransactionId, InvalidRepOriginId, 0, NULL); + { + resolver = GetConflictResolver(localslot, localrel, CT_UPDATE_MISSING, + &apply_remote, newtup, InvalidOid, + InvalidTransactionId, + InvalidRepOriginId, 0); + + ReportApplyConflict(LOG, CT_UPDATE_MISSING, resolver, localrel, + InvalidOid, InvalidTransactionId, + InvalidRepOriginId, 0, NULL); + + /* Process and store remote tuple in the slot */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + slot_store_data(remoteslot, relmapentry, newtup); + slot_fill_defaults(relmapentry, estate, remoteslot); + MemoryContextSwitchTo(oldctx); + + if (apply_remote) + { + elog(LOG, "Converting UPDATE to INSERT and applying!"); + apply_handle_insert_internal(edata, relinfo, remoteslot, + newtup, relmapentry); + } + + } } /* Cleanup. */ @@ -2834,12 +2906,25 @@ apply_handle_delete_internal(ApplyExecutionData *edata, else { /* - * The tuple to be deleted could not be found. Do nothing except for - * emitting a log message. + * The tuple to be deleted could not be found. Based on resolver + * configured, either skip and log a message or emit an error. */ if (MySubscription->detectconflict) - ReportApplyConflict(LOG, CT_DELETE_MISSING, localrel, InvalidOid, - InvalidTransactionId, InvalidRepOriginId, 0, NULL); + { + ConflictResolver resolver; + bool apply_remote = false; + + resolver = GetConflictResolver(localslot, localrel, CT_DELETE_MISSING, + &apply_remote, NULL, InvalidOid, + InvalidTransactionId, + InvalidRepOriginId, 0); + + /* Resolver is set to skip, thus report the conflict and skip */ + if (!apply_remote) + ReportApplyConflict(LOG, CT_DELETE_MISSING, resolver, localrel, + InvalidOid, InvalidTransactionId, + InvalidRepOriginId, 0, NULL); + } } /* Cleanup. */ @@ -2972,19 +3057,21 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, } MemoryContextSwitchTo(oldctx); - /* Check if we can do the update or delete on the leaf partition. */ - if (operation == CMD_UPDATE || operation == CMD_DELETE) + if (operation == CMD_INSERT || operation == CMD_UPDATE || + operation == CMD_DELETE) { - part_entry = logicalrep_partition_open(relmapentry, partrel, - attrmap); - check_relation_updatable(part_entry); + part_entry = logicalrep_partition_open(relmapentry, partrel, attrmap); + + /* Check if we can do the update or delete on the leaf partition */ + if (operation != CMD_INSERT) + check_relation_updatable(part_entry); } switch (operation) { case CMD_INSERT: apply_handle_insert_internal(edata, partrelinfo, - remoteslot_part); + remoteslot_part, newtup, part_entry); break; case CMD_DELETE: @@ -3006,6 +3093,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, ResultRelInfo *partrelinfo_new; Relation partrel_new; bool found; + LogicalRepRelMapEntry *part_entry_new = NULL; + ConflictResolver resolver; + bool apply_remote = true; /* Get the matching local tuple from the partition. */ found = FindReplTupleInLocalRel(edata, partrel, @@ -3015,26 +3105,85 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, if (!found) { /* - * The tuple to be updated could not be found. Do nothing - * except for emitting a log message. + * The tuple to be updated could not be found. Report the + * conflict and resolver. And take action based on the + * configured resolver. */ if (MySubscription->detectconflict) - ReportApplyConflict(LOG, CT_UPDATE_MISSING, - partrel, InvalidOid, - InvalidTransactionId, + { + resolver = GetConflictResolver(localslot, partrel, CT_UPDATE_MISSING, + &apply_remote, newtup, InvalidOid, + InvalidTransactionId, InvalidRepOriginId, 0); + + ReportApplyConflict(LOG, CT_UPDATE_MISSING, resolver, partrel, + InvalidOid, InvalidTransactionId, InvalidRepOriginId, 0, NULL); - return; + if (apply_remote) + { + /* + * Resolver is in favour of applying the remote + * changes. Prepare the slot for the INSERT. + */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + slot_store_data(remoteslot_part, part_entry, newtup); + slot_fill_defaults(part_entry, estate, remoteslot_part); + MemoryContextSwitchTo(oldctx); + } + } } + else + { + RepOriginId localorigin; + TransactionId localxmin; + TimestampTz localts; - /* - * Apply the update to the local tuple, putting the result in - * remoteslot_part. - */ - oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_modify_data(remoteslot_part, localslot, part_entry, - newtup); - MemoryContextSwitchTo(oldctx); + /* + * The tuple to be updated is found. If conflict detection + * is enabled, check whether the local tuple was modified + * by a different origin. If detected, report and resolve + * the conflict. + */ + if (MySubscription->detectconflict && + GetTupleCommitTs(localslot, &localxmin, &localorigin, &localts) && + localorigin != replorigin_session_origin) + { + resolver = GetConflictResolver(localslot, partrel, CT_UPDATE_DIFFER, + &apply_remote, NULL, InvalidOid, + localxmin, localorigin, localts); + + ReportApplyConflict(LOG, CT_UPDATE_DIFFER, resolver, partrel, + InvalidOid, localxmin, localorigin, localts, + NULL); + } + + if (apply_remote) + { + /* + * We can reach here in two cases: + * + * 1. If we found a tuple but conflict detection is + * OFF + * + * 2. If we found a tuple and resolver is in favor of + * applying the change when conflict detection is ON + * + * Putting the result in remoteslot_part. + */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + slot_modify_data(remoteslot_part, localslot, part_entry, + newtup); + MemoryContextSwitchTo(oldctx); + } + else + + /* + * apply_remote can be toggled if resolver for + * update_differ is set to skip. Ignore remote update. + */ + return; + + } /* * Does the updated tuple still satisfy the current @@ -3044,27 +3193,59 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, ExecPartitionCheck(partrelinfo, remoteslot_part, estate, false)) { - /* - * Yes, so simply UPDATE the partition. We don't call - * apply_handle_update_internal() here, which would - * normally do the following work, to avoid repeating some - * work already done above to find the local tuple in the - * partition. - */ - EPQState epqstate; - - EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); - ExecOpenIndices(partrelinfo, false); - - EvalPlanQualSetSlot(&epqstate, remoteslot_part); - TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, - ACL_UPDATE); - ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate, - localslot, remoteslot_part); - ExecCloseIndices(partrelinfo); - EvalPlanQualEnd(&epqstate); + if (found && apply_remote) + { + /* + * Yes, so simply UPDATE the partition. We don't call + * apply_handle_update_internal() here, which would + * normally do the following work, to avoid repeating + * some work already done above to find the local + * tuple in the partition. + * + * Do the update in cases - 1. conflict detection is + * OFF and found a tuple 2. conflict detection is ON, + * update_differ conflict is detected for the found + * tuple and the resolver is in favour of applying the + * update. + */ + + EPQState epqstate; + + EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); + ExecOpenIndices(partrelinfo, false); + + EvalPlanQualSetSlot(&epqstate, remoteslot_part); + TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, + ACL_UPDATE); + ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate, + localslot, remoteslot_part); + ExecCloseIndices(partrelinfo); + EvalPlanQualEnd(&epqstate); + + } + else if (apply_remote) + { + /* + * Tuple is not found but update_missing resolver is + * in favour of applying the change as INSERT. + */ + apply_handle_insert_internal(edata, partrelinfo, + remoteslot_part, newtup, + part_entry); + } } - else + + /* + * Updated tuple doesn't satisfy the current partition's + * constraint. + * + * If conflict detection is OFF, proceed by always applying + * the update (as 'apply_remote' is by default true). If + * conflict detection is ON, 'apply_remote' can be OFF as well + * if the resolver for update_missing conflict conveys to skip + * the update. + */ + else if (apply_remote) { /* Move the tuple into the new partition. */ @@ -3105,10 +3286,16 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, get_namespace_name(RelationGetNamespace(partrel_new)), RelationGetRelationName(partrel_new)); - /* DELETE old tuple found in the old partition. */ - apply_handle_delete_internal(edata, partrelinfo, - localslot, - part_entry->localindexoid); + /* + * If tuple is found, delete it from old partition. We can + * reach this flow even for the case when the 'found' flag + * is false for 'update_missing' conflict and resolver is + * in favor of inserting the tuple. + */ + if (found) + apply_handle_delete_internal(edata, partrelinfo, + localslot, + part_entry->localindexoid); /* INSERT new tuple into the new partition. */ @@ -3124,19 +3311,25 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, map = ExecGetRootToChildMap(partrelinfo_new, estate); if (map != NULL) { + attrmap = map->attrMap; remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot, remoteslot_part); } else { + attrmap = NULL; remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot); slot_getallattrs(remoteslot); } MemoryContextSwitchTo(oldctx); + part_entry_new = logicalrep_partition_open(part_entry, + partrel_new, + attrmap); apply_handle_insert_internal(edata, partrelinfo_new, - remoteslot_part); + remoteslot_part, newtup, + part_entry_new); } } break; diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 9770752ea3..bc72b1f4fd 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -18,6 +18,8 @@ #include "fmgr.h" #include "nodes/lockoptions.h" #include "nodes/parsenodes.h" +#include "replication/logicalproto.h" +#include "replication/logicalrelation.h" #include "utils/memutils.h" @@ -655,7 +657,8 @@ extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot); extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, - EState *estate, TupleTableSlot *slot); + EState *estate, TupleTableSlot *slot, + TupleTableSlot **conflictsloty); extern void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot); diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index 7eff93eff3..aa21886ee7 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -13,6 +13,8 @@ #include "catalog/objectaddress.h" #include "executor/tuptable.h" #include "nodes/execnodes.h" +#include "replication/logicalproto.h" +#include "replication/logicalrelation.h" #include "utils/relcache.h" #include "utils/timestamp.h" @@ -72,9 +74,20 @@ typedef enum ConflictResolver extern bool GetTupleCommitTs(TupleTableSlot *localslot, TransactionId *xmin, RepOriginId *localorigin, TimestampTz *localts); extern void ReportApplyConflict(int elevel, ConflictType type, - Relation localrel, Oid conflictidx, - TransactionId localxmin, RepOriginId localorigin, - TimestampTz localts, TupleTableSlot *conflictslot); + ConflictResolver resolver, Relation localrel, + Oid conflictidx, TransactionId localxmin, + RepOriginId localorigin, TimestampTz localts, + TupleTableSlot *conflictslot); extern void InitConflictIndexes(ResultRelInfo *relInfo); extern void ExecConflictResolverStmt(ConflictResolverStmt *stmt); +extern ConflictResolver GetConflictResolver(TupleTableSlot *localslot, + Relation localrel, + ConflictType type, + bool *apply_remote, + LogicalRepTupleData *newtup, + Oid conflictidx, TransactionId xmin, + RepOriginId origin, + TimestampTz committs); +extern bool CanCreateFullTuple(Relation localrel, + LogicalRepTupleData *newtup); #endif diff --git a/src/test/subscription/t/029_on_error.pl b/src/test/subscription/t/029_on_error.pl index 496a3c6cd9..c746191601 100644 --- a/src/test/subscription/t/029_on_error.pl +++ b/src/test/subscription/t/029_on_error.pl @@ -113,6 +113,10 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true, streaming = on, two_phase = on, detect_conflict = on)" ); +# Set 'ERROR' conflict resolver for 'insert_exist' conflict type +$node_subscriber->safe_psql('postgres',"set conflict resolver 'error' for 'insert_exists'"); + + # Initial synchronization failure causes the subscription to be disabled. $node_subscriber->poll_query_until('postgres', "SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'" @@ -177,6 +181,9 @@ $result = $node_subscriber->safe_psql('postgres', is($result, "0", "check all prepared transactions are resolved on the subscriber"); +# Reset conflict resolver for 'insert_exist' conflict type to default. +$node_subscriber->safe_psql('postgres',"reset conflict resolver for 'insert_exists'"); + $node_subscriber->stop; $node_publisher->stop; -- 2.34.1