From ac4879d814fc85bf59fd8a66e1d7b01f6d67d62c Mon Sep 17 00:00:00 2001 From: Nisha Moond Date: Thu, 25 Jul 2024 13:44:17 +0530 Subject: [PATCH v7 3/4] Conflict resolvers for insert, update, and delete This patch introduces support for handling conflicts with the following resolutions: - For INSERT conflicts: - insert_exists: remote_apply, keep_local, error - For UPDATE conflicts: - update_differ: remote_apply, keep_local, error - update_missing: apply_or_skip, apply_or_error, skip, error - For DELETE conflicts: - delete_missing: skip, error - delete_differ: remote_apply, keep_local, error --- src/backend/executor/execReplication.c | 56 +- src/backend/replication/logical/conflict.c | 207 ++++++- src/backend/replication/logical/worker.c | 366 ++++++++--- src/include/executor/executor.h | 5 +- src/include/replication/conflict.h | 13 +- src/test/subscription/meson.build | 1 + src/test/subscription/t/029_on_error.pl | 9 + .../subscription/t/034_conflict_resolver.pl | 576 ++++++++++++++++++ 8 files changed, 1104 insertions(+), 129 deletions(-) create mode 100755 src/test/subscription/t/034_conflict_resolver.pl diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 0680bc86fd..fb96bb7005 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -590,8 +590,9 @@ ReCheckConflictIndexes(ResultRelInfo *resultRelInfo, EState *estate, TransactionId xmin; GetTupleCommitTs(conflictslot, &xmin, &origin, &committs); - ReportApplyConflict(ERROR, type, resultRelInfo->ri_RelationDesc, uniqueidx, - xmin, origin, committs, conflictslot); + ReportApplyConflict(type, CR_ERROR, resultRelInfo->ri_RelationDesc, + uniqueidx, xmin, origin, committs, + conflictslot, false); } } } @@ -604,7 +605,8 @@ ReCheckConflictIndexes(ResultRelInfo *resultRelInfo, EState *estate, */ void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, - EState *estate, TupleTableSlot *slot) + EState *estate, TupleTableSlot *slot, + TupleTableSlot **conflictslot, Oid subid) { bool skip_tuple = false; Relation rel = resultRelInfo->ri_RelationDesc; @@ -640,11 +642,55 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, if (rel->rd_rel->relispartition) ExecPartitionCheck(resultRelInfo, slot, estate, true); + conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes; + + /* + * If caller has passed non null conflictslot, check all the unique + * indexes for potential conflicts. If the configured resolver is in + * favour of apply, give the conflicted tuple information in + * conflictslot. + */ + if (conflictslot) + { + foreach_oid(uniqueidx, conflictindexes) + { + /* + * Return if any conflict is found other than one with 'ERROR' + * resolver configured. In case of 'ERROR' resolver, emit + * error here; otherwise return to caller for resolutions. + */ + if (FindConflictTuple(resultRelInfo, estate, uniqueidx, + slot, &(*conflictslot))) + { + RepOriginId origin; + TimestampTz committs; + TransactionId xmin; + ConflictResolver resolver; + bool apply_remote = false; + + GetTupleCommitTs(*conflictslot, &xmin, &origin, &committs); + resolver = GetConflictResolver(rel, CT_INSERT_EXISTS, + &apply_remote, NULL, subid); + + ReportApplyConflict(CT_INSERT_EXISTS, resolver, rel, + uniqueidx, xmin, origin, committs, + *conflictslot, apply_remote); + + /* Nothing to apply, free the resources */ + if (!apply_remote) + { + ExecDropSingleTupleTableSlot(*conflictslot); + *conflictslot = NULL; + } + + return; + } + } + } + /* OK, store the tuple and create index entries for it */ simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot); - conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes; - if (resultRelInfo->ri_NumIndices > 0) recheckIndexes = ExecInsertIndexTuples(resultRelInfo, slot, estate, false, diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 104a8ba3cd..50f12a0efd 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -26,13 +26,17 @@ #include "catalog/pg_subscription_conflict_d.h" #include "catalog/pg_inherits.h" #include "commands/defrem.h" +#include "executor/executor.h" #include "replication/conflict.h" +#include "replication/logicalproto.h" +#include "replication/logicalrelation.h" #include "replication/origin.h" #include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/syscache.h" #include "utils/rel.h" +#include "utils/syscache.h" const char *const ConflictTypeNames[] = { [CT_INSERT_EXISTS] = "insert_exists", @@ -94,11 +98,13 @@ const int ConflictTypeDefaultResolvers[] = { }; static char *build_index_value_desc(Oid indexoid, TupleTableSlot *conflictslot); -static int errdetail_apply_conflict(ConflictType type, Oid conflictidx, - TransactionId localxmin, +static int errdetail_apply_conflict(ConflictType type, + ConflictResolver resolver, + Oid conflictidx, TransactionId localxmin, RepOriginId localorigin, TimestampTz localts, - TupleTableSlot *conflictslot); + TupleTableSlot *conflictslot, + bool apply_remote); /* * Get the xmin and commit timestamp data (origin and timestamp) associated @@ -133,22 +139,32 @@ GetTupleCommitTs(TupleTableSlot *localslot, TransactionId *xmin, } /* - * Report a conflict when applying remote changes. + * Report conflict and resolution applied while applying remote changes. */ void -ReportApplyConflict(int elevel, ConflictType type, Relation localrel, - Oid conflictidx, TransactionId localxmin, - RepOriginId localorigin, TimestampTz localts, - TupleTableSlot *conflictslot) +ReportApplyConflict(ConflictType type, ConflictResolver resolver, + Relation localrel, Oid conflictidx, + TransactionId localxmin, RepOriginId localorigin, + TimestampTz localts, TupleTableSlot *conflictslot, + bool apply_remote) { + int elevel; + + if (resolver == CR_ERROR || + (resolver == CR_APPLY_OR_ERROR && !apply_remote)) + elevel = ERROR; + else + elevel = LOG; + ereport(elevel, errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION), - errmsg("conflict %s detected on relation \"%s.%s\"", + errmsg("conflict %s detected on relation \"%s.%s\". Resolution: %s", ConflictTypeNames[type], get_namespace_name(RelationGetNamespace(localrel)), - RelationGetRelationName(localrel)), - errdetail_apply_conflict(type, conflictidx, localxmin, localorigin, - localts, conflictslot)); + RelationGetRelationName(localrel), + ConflictResolverNames[resolver]), + errdetail_apply_conflict(type, resolver, conflictidx, localxmin, localorigin, + localts, conflictslot, apply_remote)); } /* @@ -183,13 +199,21 @@ InitConflictIndexes(ResultRelInfo *relInfo) } /* - * Add an errdetail() line showing conflict detail. + * Add an errdetail() line showing conflict and resolution details. */ static int -errdetail_apply_conflict(ConflictType type, Oid conflictidx, - TransactionId localxmin, RepOriginId localorigin, - TimestampTz localts, TupleTableSlot *conflictslot) +errdetail_apply_conflict(ConflictType type, ConflictResolver resolver, + Oid conflictidx, TransactionId localxmin, + RepOriginId localorigin, TimestampTz localts, + TupleTableSlot *conflictslot, bool apply_remote) { + char *applymsg; + + if (apply_remote) + applymsg = "applying the remote changes."; + else + applymsg = "ignoring the remote changes."; + switch (type) { case CT_INSERT_EXISTS: @@ -203,27 +227,43 @@ errdetail_apply_conflict(ConflictType type, Oid conflictidx, char *index_value = build_index_value_desc(conflictidx, conflictslot); - if (index_value && localts) - return errdetail("Key %s already exists in unique index \"%s\", which was modified by origin %u in transaction %u at %s.", - index_value, get_rel_name(conflictidx), localorigin, - localxmin, timestamptz_to_str(localts)); - else if (index_value && !localts) - return errdetail("Key %s already exists in unique index \"%s\", which was modified in transaction %u.", - index_value, get_rel_name(conflictidx), localxmin); + if (resolver == CR_ERROR) + { + if (index_value && localts) + return errdetail("Key %s already exists in unique index \"%s\", which was modified by origin %u in transaction %u at %s.", + index_value, get_rel_name(conflictidx), localorigin, + localxmin, timestamptz_to_str(localts)); + else if (index_value && !localts) + return errdetail("Key %s already exists in unique index \"%s\", which was modified in transaction %u.", + index_value, get_rel_name(conflictidx), localxmin); + else + return errdetail("Key already exists in unique index \"%s\".", + get_rel_name(conflictidx)); + } else - return errdetail("Key already exists in unique index \"%s\".", - get_rel_name(conflictidx)); + return errdetail("Key already exists, %s", applymsg); } case CT_UPDATE_DIFFER: - return errdetail("Updating a row that was modified by a different origin %u in transaction %u at %s.", - localorigin, localxmin, timestamptz_to_str(localts)); + return errdetail("Updating a row that was modified by a different origin %u in transaction %u at %s, %s", + localorigin, localxmin, + timestamptz_to_str(localts), applymsg); case CT_UPDATE_MISSING: - return errdetail("Did not find the row to be updated."); + if (resolver == CR_APPLY_OR_SKIP && !apply_remote) + return errdetail("Did not find the row to be updated. UPDATE can not be converted to INSERT, hence SKIP the update."); + else if (resolver == CR_APPLY_OR_ERROR && !apply_remote) + return errdetail("Did not find the row to be updated. UPDATE can not be converted to INSERT, hence ERROR out."); + else if (apply_remote) + return errdetail("Did not find the row to be updated. Convert UPDATE to INSERT and %s", + applymsg); + else + return errdetail("Did not find the row to be updated, %s", + applymsg); case CT_DELETE_MISSING: return errdetail("Did not find the row to be deleted."); case CT_DELETE_DIFFER: - return errdetail("Deleting a row that was modified by a different origin %u in transaction %u at %s.", - localorigin, localxmin, timestamptz_to_str(localts)); + return errdetail("Deleting a row that was modified by a different origin %u in transaction %u at %s, %s", + localorigin, localxmin, + timestamptz_to_str(localts), applymsg); } return 0; /* silence compiler warning */ @@ -339,6 +379,69 @@ validate_conflict_type_and_resolver(const char *conflict_type, } +/* + * Get the conflict resolver configured at subscription level for + * for the given conflict type. + */ +static ConflictResolver +get_conflict_resolver_internal(ConflictType type, Oid subid) +{ + + ConflictResolver resolver; + HeapTuple tuple; + Datum datum; + char *conflict_res; + + /* + * XXX: Currently, we fetch the conflict resolver from cache for each + * conflict detection. If needed, we can keep the info in global variable + * and fetch from cache only once after cache invalidation. + */ + tuple = SearchSysCache2(SUBSCRIPTIONCONFLICTSUBOID, + ObjectIdGetDatum(subid), + CStringGetTextDatum(ConflictTypeNames[type])); + + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for conflict type %u", type); + + datum = SysCacheGetAttrNotNull(SUBSCRIPTIONCONFLICTSUBOID, + tuple, Anum_pg_subscription_conflict_confrres); + + conflict_res = TextDatumGetCString(datum); + + for (resolver = CR_MIN; resolver <= CR_MAX; resolver++) + { + if (strcmp(ConflictResolverNames[resolver], conflict_res) == 0) + break; + } + + ReleaseSysCache(tuple); + return resolver; +} + +/* + * Check if a full tuple can be created from the new tuple. + * Return true if yes, false otherwise. + */ +static bool +can_create_full_tuple(Relation localrel, + LogicalRepTupleData *newtup) +{ + int i; + int local_att = RelationGetNumberOfAttributes(localrel); + + if (newtup->ncols != local_att) + return false; + + for (i = 0; i < newtup->ncols; i++) + { + if (newtup->colstatus[i] == LOGICALREP_COLUMN_UNCHANGED) + return false; + } + + return true; +} + /* * Extract the conflict type and conflict resolvers from the * ALTER SUBSCRIPTION command and return a list of ConflictTypeResolver nodes. @@ -502,3 +605,47 @@ RemoveSubscriptionConflictBySubid(Oid subid) table_endscan(scan); table_close(rel, RowExclusiveLock); } + +/* + * Find the resolver of the conflict type set under the given subscription. + * + * Set 'apply_remote' to true if remote tuple should be applied, + * false otherwise. + */ +ConflictResolver +GetConflictResolver(Relation localrel, ConflictType type, bool *apply_remote, + LogicalRepTupleData *newtup, Oid subid) +{ + ConflictResolver resolver; + + resolver = get_conflict_resolver_internal(type, subid); + + switch (resolver) + { + case CR_REMOTE_APPLY: + *apply_remote = true; + break; + case CR_APPLY_OR_SKIP: + if (can_create_full_tuple(localrel, newtup)) + *apply_remote = true; + else + *apply_remote = false; + break; + case CR_APPLY_OR_ERROR: + if (can_create_full_tuple(localrel, newtup)) + *apply_remote = true; + else + *apply_remote = false; + break; + case CR_KEEP_LOCAL: + case CR_SKIP: + case CR_ERROR: + *apply_remote = false; + break; + default: + elog(ERROR, "Conflict %s is detected! Unrecogonized conflict resolution method", + ConflictTypeNames[type]); + } + + return resolver; +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index fc3f80e8b0..36172b2460 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -382,7 +382,9 @@ static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, - TupleTableSlot *remoteslot); + TupleTableSlot *remoteslot, + LogicalRepTupleData *newtup, + LogicalRepRelMapEntry *rel_entry); static void apply_handle_update_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, @@ -2428,10 +2430,10 @@ apply_handle_insert(StringInfo s) /* For a partitioned table, insert the tuple into a partition. */ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) apply_handle_tuple_routing(edata, - remoteslot, NULL, CMD_INSERT); + remoteslot, &newtup, CMD_INSERT); else apply_handle_insert_internal(edata, edata->targetRelInfo, - remoteslot); + remoteslot, &newtup, rel); finish_edata(edata); @@ -2454,9 +2456,13 @@ apply_handle_insert(StringInfo s) static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, - TupleTableSlot *remoteslot) + TupleTableSlot *remoteslot, + LogicalRepTupleData *newtup, + LogicalRepRelMapEntry *rel_entry) { EState *estate = edata->estate; + MemoryContext oldctx; + TupleTableSlot *conflictslot = NULL; /* We must open indexes here. */ ExecOpenIndices(relinfo, MySubscription->detectconflict); @@ -2466,7 +2472,36 @@ apply_handle_insert_internal(ApplyExecutionData *edata, /* Do the insert. */ TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT); - ExecSimpleRelationInsert(relinfo, estate, remoteslot); + + if (MySubscription->detectconflict) + ExecSimpleRelationInsert(relinfo, estate, remoteslot, &conflictslot, + MyLogicalRepWorker->subid); + else + ExecSimpleRelationInsert(relinfo, estate, remoteslot, NULL, InvalidOid); + + + /* Apply remote tuple by converting INSERT to UPDATE */ + if (conflictslot) + { + EPQState epqstate; + + EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); + + /* Process and store remote tuple in the slot */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + slot_modify_data(remoteslot, conflictslot, rel_entry, newtup); + MemoryContextSwitchTo(oldctx); + + EvalPlanQualSetSlot(&epqstate, remoteslot); + + /* Do the update */ + TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE); + ExecSimpleRelationUpdate(relinfo, estate, &epqstate, conflictslot, + remoteslot); + + EvalPlanQualEnd(&epqstate); + ExecDropSingleTupleTableSlot(conflictslot); + } /* Cleanup. */ ExecCloseIndices(relinfo); @@ -2648,6 +2683,8 @@ apply_handle_update_internal(ApplyExecutionData *edata, TupleTableSlot *localslot; bool found; MemoryContext oldctx; + bool apply_remote = true; + ConflictResolver resolver; EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); ExecOpenIndices(relinfo, MySubscription->detectconflict); @@ -2671,38 +2708,71 @@ apply_handle_update_internal(ApplyExecutionData *edata, /* * If conflict detection is enabled, check whether the local tuple was - * modified by a different origin. If detected, report the conflict. + * modified by a different origin. If detected, report the conflict + * and configured resolver. */ if (MySubscription->detectconflict && GetTupleCommitTs(localslot, &localxmin, &localorigin, &localts) && localorigin != replorigin_session_origin) - ReportApplyConflict(LOG, CT_UPDATE_DIFFER, localrel, InvalidOid, - localxmin, localorigin, localts, NULL); + { + resolver = GetConflictResolver(localrel, CT_UPDATE_DIFFER, + &apply_remote, NULL, + MyLogicalRepWorker->subid); - /* Process and store remote tuple in the slot */ - oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_modify_data(remoteslot, localslot, relmapentry, newtup); - MemoryContextSwitchTo(oldctx); + ReportApplyConflict(CT_UPDATE_DIFFER, resolver, localrel, + InvalidOid, localxmin, localorigin, localts, + NULL, apply_remote); + } - EvalPlanQualSetSlot(&epqstate, remoteslot); + /* + * Apply the change if configured resolver is in favor of that, else + * ignore the remote update. + */ + if (apply_remote) + { + /* Process and store remote tuple in the slot */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + slot_modify_data(remoteslot, localslot, relmapentry, newtup); + MemoryContextSwitchTo(oldctx); - if (MySubscription->detectconflict) - InitConflictIndexes(relinfo); + EvalPlanQualSetSlot(&epqstate, remoteslot); - /* Do the actual update. */ - TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE); - ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot, - remoteslot); + if (MySubscription->detectconflict) + InitConflictIndexes(relinfo); + + /* Do the actual update. */ + TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE); + ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot, + remoteslot); + } } else { /* - * The tuple to be updated could not be found. Do nothing except for - * emitting a log message. + * The tuple to be updated could not be found. Report the conflict. If + * the configured resolver is in favor of applying the change, convert + * UPDATE to INSERT and apply the change. */ if (MySubscription->detectconflict) - ReportApplyConflict(LOG, CT_UPDATE_MISSING, localrel, InvalidOid, - InvalidTransactionId, InvalidRepOriginId, 0, NULL); + { + resolver = GetConflictResolver(localrel, CT_UPDATE_MISSING, + &apply_remote, newtup, + MyLogicalRepWorker->subid); + + ReportApplyConflict(CT_UPDATE_MISSING, resolver, localrel, + InvalidOid, InvalidTransactionId, + InvalidRepOriginId, 0, NULL, apply_remote); + + /* Process and store remote tuple in the slot */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + slot_store_data(remoteslot, relmapentry, newtup); + slot_fill_defaults(relmapentry, estate, remoteslot); + MemoryContextSwitchTo(oldctx); + + if (apply_remote) + apply_handle_insert_internal(edata, relinfo, remoteslot, + newtup, relmapentry); + } } /* Cleanup. */ @@ -2815,6 +2885,8 @@ apply_handle_delete_internal(ApplyExecutionData *edata, EPQState epqstate; TupleTableSlot *localslot; bool found; + bool apply_remote = true; + ConflictResolver resolver; EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); ExecOpenIndices(relinfo, false); @@ -2836,24 +2908,44 @@ apply_handle_delete_internal(ApplyExecutionData *edata, if (MySubscription->detectconflict && GetTupleCommitTs(localslot, &localxmin, &localorigin, &localts) && localorigin != replorigin_session_origin) - ReportApplyConflict(LOG, CT_DELETE_DIFFER, localrel, InvalidOid, - localxmin, localorigin, localts, NULL); + { + resolver = GetConflictResolver(localrel, CT_DELETE_DIFFER, + &apply_remote, NULL, + MyLogicalRepWorker->subid); + + ReportApplyConflict(CT_DELETE_DIFFER, resolver, localrel, + InvalidOid, localxmin, localorigin, localts, + NULL, apply_remote); + } - EvalPlanQualSetSlot(&epqstate, localslot); + if (apply_remote) + { + EvalPlanQualSetSlot(&epqstate, localslot); + + /* Do the actual delete. */ + TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE); + ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot); + } - /* Do the actual delete. */ - TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE); - ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot); } else { /* - * The tuple to be deleted could not be found. Do nothing except for - * emitting a log message. + * The tuple to be deleted could not be found. Based on resolver + * configured, either skip and log a message or emit an error. */ if (MySubscription->detectconflict) - ReportApplyConflict(LOG, CT_DELETE_MISSING, localrel, InvalidOid, - InvalidTransactionId, InvalidRepOriginId, 0, NULL); + { + resolver = GetConflictResolver(localrel, CT_DELETE_MISSING, + &apply_remote, NULL, + MyLogicalRepWorker->subid); + + /* Resolver is set to skip, thus report the conflict and skip */ + if (!apply_remote) + ReportApplyConflict(CT_DELETE_MISSING, resolver, localrel, + InvalidOid, InvalidTransactionId, + InvalidRepOriginId, 0, NULL, apply_remote); + } } /* Cleanup. */ @@ -2986,19 +3078,21 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, } MemoryContextSwitchTo(oldctx); - /* Check if we can do the update or delete on the leaf partition. */ - if (operation == CMD_UPDATE || operation == CMD_DELETE) + if (operation == CMD_INSERT || operation == CMD_UPDATE || + operation == CMD_DELETE) { - part_entry = logicalrep_partition_open(relmapentry, partrel, - attrmap); - check_relation_updatable(part_entry); + part_entry = logicalrep_partition_open(relmapentry, partrel, attrmap); + + /* Check if we can do the update or delete on the leaf partition */ + if (operation != CMD_INSERT) + check_relation_updatable(part_entry); } switch (operation) { case CMD_INSERT: apply_handle_insert_internal(edata, partrelinfo, - remoteslot_part); + remoteslot_part, newtup, part_entry); break; case CMD_DELETE: @@ -3023,6 +3117,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, RepOriginId localorigin; TransactionId localxmin; TimestampTz localts; + LogicalRepRelMapEntry *part_entry_new = NULL; + ConflictResolver resolver; + bool apply_remote = true; /* Get the matching local tuple from the partition. */ found = FindReplTupleInLocalRel(edata, partrel, @@ -3032,38 +3129,81 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, if (!found) { /* - * The tuple to be updated could not be found. Do nothing - * except for emitting a log message. + * The tuple to be updated could not be found. Report the + * conflict and resolver. And take action based on the + * configured resolver. */ if (MySubscription->detectconflict) - ReportApplyConflict(LOG, CT_UPDATE_MISSING, - partrel, InvalidOid, - InvalidTransactionId, - InvalidRepOriginId, 0, NULL); - - return; + { + resolver = GetConflictResolver(partrel, CT_UPDATE_MISSING, + &apply_remote, newtup, + MyLogicalRepWorker->subid); + + ReportApplyConflict(CT_UPDATE_MISSING, resolver, partrel, + InvalidOid, InvalidTransactionId, + InvalidRepOriginId, 0, NULL, apply_remote); + + if (apply_remote) + { + /* + * Resolver is in favour of applying the remote + * changes. Prepare the slot for the INSERT. + */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + slot_store_data(remoteslot_part, part_entry, newtup); + slot_fill_defaults(part_entry, estate, remoteslot_part); + MemoryContextSwitchTo(oldctx); + } + } } + else + { + /* + * The tuple to be updated is found. If conflict detection + * is enabled, check whether the local tuple was modified + * by a different origin. If detected, report and resolve + * the conflict. + */ + if (MySubscription->detectconflict && + GetTupleCommitTs(localslot, &localxmin, &localorigin, &localts) && + localorigin != replorigin_session_origin) + { + resolver = GetConflictResolver(partrel, CT_UPDATE_DIFFER, + &apply_remote, NULL, + MyLogicalRepWorker->subid); - /* - * If conflict detection is enabled, check whether the local - * tuple was modified by a different origin. If detected, - * report the conflict. - */ - if (MySubscription->detectconflict && - GetTupleCommitTs(localslot, &localxmin, &localorigin, &localts) && - localorigin != replorigin_session_origin) - ReportApplyConflict(LOG, CT_UPDATE_DIFFER, partrel, - InvalidOid, localxmin, localorigin, - localts, NULL); + ReportApplyConflict(CT_UPDATE_DIFFER, resolver, partrel, + InvalidOid, localxmin, localorigin, localts, + NULL, apply_remote); + } - /* - * Apply the update to the local tuple, putting the result in - * remoteslot_part. - */ - oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_modify_data(remoteslot_part, localslot, part_entry, - newtup); - MemoryContextSwitchTo(oldctx); + if (apply_remote) + { + /* + * We can reach here in two cases: + * + * 1. If we found a tuple but conflict detection is + * OFF + * + * 2. If we found a tuple and resolver is in favor of + * applying the change when conflict detection is ON + * + * Putting the result in remoteslot_part. + */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + slot_modify_data(remoteslot_part, localslot, part_entry, + newtup); + MemoryContextSwitchTo(oldctx); + } + else + + /* + * apply_remote can be toggled if resolver for + * update_differ is set to skip. Ignore remote update. + */ + return; + + } /* * Does the updated tuple still satisfy the current @@ -3073,27 +3213,59 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, ExecPartitionCheck(partrelinfo, remoteslot_part, estate, false)) { - /* - * Yes, so simply UPDATE the partition. We don't call - * apply_handle_update_internal() here, which would - * normally do the following work, to avoid repeating some - * work already done above to find the local tuple in the - * partition. - */ - EPQState epqstate; - - EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); - ExecOpenIndices(partrelinfo, false); - - EvalPlanQualSetSlot(&epqstate, remoteslot_part); - TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, - ACL_UPDATE); - ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate, - localslot, remoteslot_part); - ExecCloseIndices(partrelinfo); - EvalPlanQualEnd(&epqstate); + if (found && apply_remote) + { + /* + * Yes, so simply UPDATE the partition. We don't call + * apply_handle_update_internal() here, which would + * normally do the following work, to avoid repeating + * some work already done above to find the local + * tuple in the partition. + * + * Do the update in cases - 1. conflict detection is + * OFF and found a tuple 2. conflict detection is ON, + * update_differ conflict is detected for the found + * tuple and the resolver is in favour of applying the + * update. + */ + + EPQState epqstate; + + EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); + ExecOpenIndices(partrelinfo, false); + + EvalPlanQualSetSlot(&epqstate, remoteslot_part); + TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, + ACL_UPDATE); + ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate, + localslot, remoteslot_part); + ExecCloseIndices(partrelinfo); + EvalPlanQualEnd(&epqstate); + + } + else if (apply_remote) + { + /* + * Tuple is not found but update_missing resolver is + * in favour of applying the change as INSERT. + */ + apply_handle_insert_internal(edata, partrelinfo, + remoteslot_part, newtup, + part_entry); + } } - else + + /* + * Updated tuple doesn't satisfy the current partition's + * constraint. + * + * If conflict detection is OFF, proceed by always applying + * the update (as 'apply_remote' is by default true). If + * conflict detection is ON, 'apply_remote' can be OFF as well + * if the resolver for update_missing conflict conveys to skip + * the update. + */ + else if (apply_remote) { /* Move the tuple into the new partition. */ @@ -3134,10 +3306,16 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, get_namespace_name(RelationGetNamespace(partrel_new)), RelationGetRelationName(partrel_new)); - /* DELETE old tuple found in the old partition. */ - apply_handle_delete_internal(edata, partrelinfo, - localslot, - part_entry->localindexoid); + /* + * If tuple is found, delete it from old partition. We can + * reach this flow even for the case when the 'found' flag + * is false for 'update_missing' conflict and resolver is + * in favor of inserting the tuple. + */ + if (found) + apply_handle_delete_internal(edata, partrelinfo, + localslot, + part_entry->localindexoid); /* INSERT new tuple into the new partition. */ @@ -3153,19 +3331,25 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, map = ExecGetRootToChildMap(partrelinfo_new, estate); if (map != NULL) { + attrmap = map->attrMap; remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot, remoteslot_part); } else { + attrmap = NULL; remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot); slot_getallattrs(remoteslot); } MemoryContextSwitchTo(oldctx); + part_entry_new = logicalrep_partition_open(part_entry, + partrel_new, + attrmap); apply_handle_insert_internal(edata, partrelinfo_new, - remoteslot_part); + remoteslot_part, newtup, + part_entry_new); } } break; diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 9770752ea3..7d4a698c83 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -18,6 +18,8 @@ #include "fmgr.h" #include "nodes/lockoptions.h" #include "nodes/parsenodes.h" +#include "replication/logicalproto.h" +#include "replication/logicalrelation.h" #include "utils/memutils.h" @@ -655,7 +657,8 @@ extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot); extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, - EState *estate, TupleTableSlot *slot); + EState *estate, TupleTableSlot *slot, + TupleTableSlot **conflictsloty, Oid subid); extern void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot); diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index 91ab4908b1..8ce20c5bb4 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -12,6 +12,8 @@ #include "access/xlogdefs.h" #include "executor/tuptable.h" #include "nodes/execnodes.h" +#include "replication/logicalproto.h" +#include "replication/logicalrelation.h" #include "utils/relcache.h" #include "utils/timestamp.h" @@ -82,10 +84,11 @@ typedef struct ConflictTypeResolver extern bool GetTupleCommitTs(TupleTableSlot *localslot, TransactionId *xmin, RepOriginId *localorigin, TimestampTz *localts); -extern void ReportApplyConflict(int elevel, ConflictType type, +extern void ReportApplyConflict(ConflictType type, ConflictResolver resolver, Relation localrel, Oid conflictidx, TransactionId localxmin, RepOriginId localorigin, - TimestampTz localts, TupleTableSlot *conflictslot); + TimestampTz localts, TupleTableSlot *conflictslot, + bool apply_remote); extern void InitConflictIndexes(ResultRelInfo *relInfo); extern void SetSubConflictResolver(Oid subId, ConflictTypeResolver * resolvers); extern void RemoveSubscriptionConflictById(Oid confid); @@ -95,5 +98,11 @@ extern void UpdateSubConflictResolvers(List *conflict_resolvers, Oid subid); extern void validate_conflict_type_and_resolver(const char *conflict_type, const char *conflict_resolver); extern void SetDefaultResolvers(ConflictTypeResolver * conflictResolvers); +extern ConflictResolver GetConflictResolver(Relation localrel, + ConflictType type, + bool *apply_remote, + LogicalRepTupleData *newtup, + Oid subid); +extern bool CanCreateFullTuple(Relation localrel, LogicalRepTupleData *newtup); #endif diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index c591cd7d61..00ade29b02 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -40,6 +40,7 @@ tests += { 't/031_column_list.pl', 't/032_subscribe_use_index.pl', 't/033_run_as_table_owner.pl', + 't/034_conflict_resolver.pl', 't/100_bugs.pl', ], }, diff --git a/src/test/subscription/t/029_on_error.pl b/src/test/subscription/t/029_on_error.pl index 496a3c6cd9..e6f07fac03 100644 --- a/src/test/subscription/t/029_on_error.pl +++ b/src/test/subscription/t/029_on_error.pl @@ -113,6 +113,11 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true, streaming = on, two_phase = on, detect_conflict = on)" ); +# Set 'ERROR' conflict resolver for 'insert_exist' conflict type +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION sub CONFLICT RESOLVER (insert_exists=error)"); + + # Initial synchronization failure causes the subscription to be disabled. $node_subscriber->poll_query_until('postgres', "SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'" @@ -177,6 +182,10 @@ $result = $node_subscriber->safe_psql('postgres', is($result, "0", "check all prepared transactions are resolved on the subscriber"); +# Reset conflict resolver for 'insert_exist' conflict type to default. +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION sub CONFLICT RESOLVER (insert_exists=remote_apply)"); + $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/034_conflict_resolver.pl b/src/test/subscription/t/034_conflict_resolver.pl new file mode 100755 index 0000000000..58751c5c24 --- /dev/null +++ b/src/test/subscription/t/034_conflict_resolver.pl @@ -0,0 +1,576 @@ +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +############################### +# Setup +############################### + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_publisher->start; + +# Create subscriber node with track_commit_timestamp_set +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init; +$node_subscriber->append_conf('postgresql.conf', + qq(track_commit_timestamp = on)); +$node_subscriber->start; + +# Create table on publisher +$node_publisher->safe_psql( + 'postgres', + "CREATE TABLE conf_tab(a int PRIMARY key, data text, comments text); + ALTER TABLE conf_tab ALTER COLUMN comments SET STORAGE EXTERNAL;"); + +# Create similar table on subscriber +$node_subscriber->safe_psql( + 'postgres', + "CREATE TABLE conf_tab(a int PRIMARY key, data text, comments text); + ALTER TABLE conf_tab ALTER COLUMN comments SET STORAGE EXTERNAL;"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR TABLE conf_tab"); + +# Create the subscription +my $appname = 'tap_sub'; +$node_subscriber->safe_psql( + 'postgres', + "CREATE SUBSCRIPTION tap_sub + CONNECTION '$publisher_connstr application_name=$appname' + PUBLICATION tap_pub + WITH (detect_conflict = on);"); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname); + +my $result = $node_subscriber->safe_psql('postgres', + "SELECT confrtype, confrres FROM pg_subscription_conflict ORDER BY confrtype" +); +is( $result, qq(delete_differ|remote_apply +delete_missing|skip +insert_exists|remote_apply +update_differ|remote_apply +update_exists|remote_apply +update_missing|apply_or_skip), + "confirm that the default conflict resolvers are in place"); + +############################################ +# Test 'remote_apply' for 'insert_exists' +############################################ + +# Create local data on the subscriber +$node_subscriber->safe_psql('postgres', + "INSERT INTO conf_tab(a, data) VALUES (1,'fromsub')"); + +# Create conflicting data on the publisher +my $log_offset = -s $node_subscriber->logfile; +$node_publisher->safe_psql('postgres', + "INSERT INTO conf_tab(a, data) VALUES (1,'frompub')"); + +$node_subscriber->wait_for_log( + qr/LOG: conflict insert_exists detected on relation \"public.conf_tab\"/, + $log_offset); + +# Confirm that remote insert is converted to an update and the remote data is updated. +$result = $node_subscriber->safe_psql('postgres', + "SELECT data FROM conf_tab WHERE (a=1);"); + +is($result, 'frompub', "remote data is kept"); + + +######################################## +# Test 'keep_local' for 'insert_exists' +######################################## + +# Change CONFLICT RESOLVER of insert_exists to keep_local +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub CONFLICT RESOLVER (insert_exists = 'keep_local');" +); + +# Create local data on the subscriber +$node_subscriber->safe_psql('postgres', + "INSERT INTO conf_tab(a, data) VALUES (2,'fromsub')"); + +# Confirm that row is updated +$result = $node_subscriber->safe_psql('postgres', + "SELECT data FROM conf_tab WHERE (a=2);"); + +is($result, 'fromsub', "data 2 from local is inserted"); + +$log_offset = -s $node_subscriber->logfile; + +# Create conflicting data on the publisher +$node_publisher->safe_psql('postgres', + "INSERT INTO conf_tab(a, data) VALUES (2,'frompub')"); + +$node_subscriber->wait_for_log( + qr/LOG: conflict insert_exists detected on relation \"public.conf_tab\"/, + $log_offset); + +# Confirm that remote insert is ignored and the local row is kept +$result = $node_subscriber->safe_psql('postgres', + "SELECT data FROM conf_tab WHERE (a=2);"); + +is($result, 'fromsub', "data from local is kept"); + +################################### +# Test 'error' for 'insert_exists' +################################### + +# Change CONFLICT RESOLVER of insert_exists to error +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub CONFLICT RESOLVER (insert_exists = 'error');" +); + +# Create local data on the subscriber +$node_subscriber->safe_psql('postgres', + "INSERT INTO conf_tab(a, data) VALUES (3,'fromsub')"); + +# Create conflicting data on the publisher +$node_publisher->safe_psql('postgres', + "INSERT INTO conf_tab(a, data) VALUES (3,'frompub')"); + +$log_offset = -s $node_subscriber->logfile; + +# Confirm that this causes an error on the subscriber +$node_subscriber->wait_for_log( + qr/ERROR: conflict insert_exists detected on relation \"public.conf_tab\"/, + $log_offset); + +# Truncate table on subscriber to get rid of the error +$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;"); + +################################### +# Test 'skip' for 'delete_missing' +################################### + +# Delete row on publisher that is not present on the subscriber and confirm that it is skipped +$log_offset = -s $node_subscriber->logfile; +$node_publisher->safe_psql('postgres', "DELETE FROM conf_tab WHERE (a=2);"); + +# Confirm that the missing row is skipped because 'delete_missing' is set to 'skip' +$node_subscriber->wait_for_log( + qr/LOG: conflict delete_missing detected on relation \"public.conf_tab\"/, + $log_offset); + +#################################### +# Test 'error' for 'delete_missing' +#################################### + +# Change CONFLICT RESOLVER of delete_missing to error +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub CONFLICT RESOLVER (delete_missing = 'error');" +); + +# Capture the log offset before performing the delete on the publisher +$log_offset = -s $node_subscriber->logfile; + +# Perform the delete on the publisher +$node_publisher->safe_psql('postgres', "DELETE FROM conf_tab WHERE (a=1);"); + +# Confirm that this causes an error on the subscriber +$node_subscriber->wait_for_log( + qr/ERROR: conflict delete_missing detected on relation \"public.conf_tab\"/, + $log_offset); + +# Drop the subscriber to remove error +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub;"); + +# Truncate table on subscriber +$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;"); + +# Truncate the table on the publisher +$node_publisher->safe_psql('postgres', "TRUNCATE conf_tab;"); + +# Create the subscription +$node_subscriber->safe_psql( + 'postgres', + "CREATE SUBSCRIPTION tap_sub + CONNECTION '$publisher_connstr application_name=$appname' + PUBLICATION tap_pub + WITH (detect_conflict = on)"); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname); + + +######################################### +# Test 'remote_apply' for 'delete_differ' +######################################### + +# Insert data in the publisher +$node_publisher->safe_psql( + 'postgres', + "INSERT INTO conf_tab(a, data) VALUES (1,'frompub'); + INSERT INTO conf_tab(a, data) VALUES (2,'frompub'); + INSERT INTO conf_tab(a, data) VALUES (3,'frompub');"); + +# Modify data on the subscriber +$node_subscriber->safe_psql('postgres', + "UPDATE conf_tab SET data = 'fromsub' WHERE (a=1);"); + +# Create a conflicting delete on the publisher +$log_offset = -s $node_subscriber->logfile; +$node_publisher->safe_psql('postgres', "DELETE FROM conf_tab WHERE (a=1);"); + +$node_subscriber->wait_for_log( + qr/LOG: conflict delete_differ detected on relation \"public.conf_tab\"/, + $log_offset); + +# Confirm that the remote delete the local updated row +$result = $node_subscriber->safe_psql('postgres', + "SELECT data from conf_tab WHERE (a=1);"); + +is($result, '', "delete from remote is applied"); + +######################################### +# Test 'keep_local' for 'delete_differ' +######################################### + +# Change CONFLICT RESOLVER of delete_differ to keep_local +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub CONFLICT RESOLVER (delete_differ = 'keep_local');" +); + +# Modify data on the subscriber +$node_subscriber->safe_psql('postgres', + "UPDATE conf_tab SET data = 'fromsub' WHERE (a=2);"); + +# Create a conflicting update on the publisher +$log_offset = -s $node_subscriber->logfile; +$node_publisher->safe_psql('postgres', "DELETE FROM conf_tab WHERE (a=2);"); + +$node_subscriber->wait_for_log( + qr/LOG: conflict delete_differ detected on relation \"public.conf_tab\"/, + $log_offset); + +# Confirm that the local data is untouched by the remote update +$result = $node_subscriber->safe_psql('postgres', + "SELECT data from conf_tab WHERE (a=2);"); + +is($result, 'fromsub', "update from local is kept"); + +######################################### +# Test 'error' for 'delete_differ' +######################################### + +# Change CONFLICT RESOLVER of delete_differ to error +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub CONFLICT RESOLVER (delete_differ = 'error');" +); + +# Modify data on the subscriber +$node_subscriber->safe_psql('postgres', + "UPDATE conf_tab SET data = 'fromsub' WHERE (a=3);"); + +# Create a conflicting update on the publisher +$log_offset = -s $node_subscriber->logfile; +$node_publisher->safe_psql('postgres', "DELETE FROM conf_tab WHERE (a=3);"); + +$node_subscriber->wait_for_log( + qr/ERROR: conflict delete_differ detected on relation \"public.conf_tab\"/, + $log_offset); + +# Drop the subscriber to remove error +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub;"); + +# Truncate the table on the publisher +$node_publisher->safe_psql('postgres', "TRUNCATE conf_tab;"); + +# Truncate the table on the subscriber +$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;"); + +# Create the subscription +$node_subscriber->safe_psql( + 'postgres', + "CREATE SUBSCRIPTION tap_sub + CONNECTION '$publisher_connstr application_name=$appname' + PUBLICATION tap_pub + WITH (detect_conflict = on)"); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname); + +######################################### +# Test 'remote_apply' for 'update_differ' +######################################### + +# Insert data in the publisher +$node_publisher->safe_psql( + 'postgres', + "INSERT INTO conf_tab(a, data) VALUES (1,'frompub'); + INSERT INTO conf_tab(a, data) VALUES (2,'frompub'); + INSERT INTO conf_tab(a, data) VALUES (3,'frompub');"); + +# Modify data on the subscriber +$node_subscriber->safe_psql('postgres', + "UPDATE conf_tab SET data = 'fromsub' WHERE (a=1);"); + +# Create a conflicting update on the publisher +$log_offset = -s $node_subscriber->logfile; +$node_publisher->safe_psql('postgres', + "UPDATE conf_tab SET data = 'frompubnew' WHERE (a=1);"); + +$node_subscriber->wait_for_log( + qr/LOG: conflict update_differ detected on relation \"public.conf_tab\"/, + $log_offset); + +# Confirm that the remote update overrides the local update +$result = $node_subscriber->safe_psql('postgres', + "SELECT data from conf_tab WHERE (a=1);"); + +is($result, 'frompubnew', "update from remote is kept"); + +######################################### +# Test 'keep_local' for 'update_differ' +######################################### + +# Change CONFLICT RESOLVER of update_differ to keep_local +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub CONFLICT RESOLVER (update_differ = 'keep_local');" +); + +# Modify data on the subscriber +$node_subscriber->safe_psql('postgres', + "UPDATE conf_tab SET data = 'fromsub' WHERE (a=2);"); + +# Create a conflicting update on the publisher +$log_offset = -s $node_subscriber->logfile; +$node_publisher->safe_psql('postgres', + "UPDATE conf_tab SET data = 'frompubnew' WHERE (a=2);"); + +$node_subscriber->wait_for_log( + qr/LOG: conflict update_differ detected on relation \"public.conf_tab\"/, + $log_offset); + +# Confirm that the local data is untouched by the remote update +$result = $node_subscriber->safe_psql('postgres', + "SELECT data from conf_tab WHERE (a=2);"); + +is($result, 'fromsub', "update from local is kept"); + +######################################### +# Test 'error' for 'update_differ' +######################################### + +# Change CONFLICT RESOLVER of update_differ to error +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub CONFLICT RESOLVER (update_differ = 'error');" +); + +# Modify data on the subscriber +$node_subscriber->safe_psql('postgres', + "UPDATE conf_tab SET data = 'fromsub' WHERE (a=3);"); + +# Create a conflicting update on the publisher +$log_offset = -s $node_subscriber->logfile; +$node_publisher->safe_psql('postgres', + "UPDATE conf_tab SET data = 'frompubnew' WHERE (a=3);"); + +$node_subscriber->wait_for_log( + qr/ERROR: conflict update_differ detected on relation \"public.conf_tab\"/, + $log_offset); + +# Drop the subscriber to remove error +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub;"); + +# Truncate the table on the publisher +$node_publisher->safe_psql('postgres', "TRUNCATE conf_tab;"); + +# Truncate the table on the subscriber +$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;"); + +# Create the subscription +$node_subscriber->safe_psql( + 'postgres', + "CREATE SUBSCRIPTION tap_sub + CONNECTION '$publisher_connstr application_name=$appname' + PUBLICATION tap_pub + WITH (detect_conflict = on)"); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname); + +# Insert data in the publisher +$node_publisher->safe_psql( + 'postgres', + "INSERT INTO conf_tab(a, data) VALUES (1,'frompub'); + INSERT INTO conf_tab(a, data) VALUES (2,'frompub'); + INSERT INTO conf_tab(a, data) VALUES (3,'frompub');"); + +########################################### +# Test 'apply_or_skip' for 'update_missing' +########################################### + +#test the apply part + +# Delete the row on the subscriber +$node_subscriber->safe_psql('postgres', "DELETE FROM conf_tab WHERE (a=3);"); + +# Create a conflicting update on the publisher +$log_offset = -s $node_subscriber->logfile; +$node_publisher->safe_psql('postgres', + "UPDATE conf_tab SET data = 'frompubnew' WHERE (a=3);"); + +$node_subscriber->wait_for_log( + qr/LOG: conflict update_missing detected on relation \"public.conf_tab\"/, + $log_offset); + +# Confirm that the remote update is converted to an insert and new row applied +$result = $node_subscriber->safe_psql('postgres', + "SELECT data from conf_tab WHERE (a=3);"); + +is($result, 'frompubnew', "update from remote is converted to insert"); + +#test the skip part + +# Create new row on publisher with toast data +$node_publisher->safe_psql('postgres', + "INSERT INTO conf_tab (a, data, comments) VALUES(4,'frompub',repeat('abcdefghij', 200));" +); + +# Delete the row on the subscriber +$node_subscriber->safe_psql('postgres', "DELETE FROM conf_tab WHERE (a=4);"); + +# Update the row on the publisher +$log_offset = -s $node_subscriber->logfile; +$node_publisher->safe_psql('postgres', + "UPDATE conf_tab SET data = 'frompubnew' WHERE (a=4);"); + +$node_subscriber->wait_for_log( + qr/DETAIL: Did not find the row to be updated. UPDATE can not be converted to INSERT, hence SKIP the update./, + $log_offset); + +########################################### +# Test 'apply_or_error' for 'update_missing' +########################################### + +# test the apply part + +#Change CONFLICT RESOLVER of update_missing to apply_or_error +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub CONFLICT RESOLVER (update_missing = 'apply_or_error');" +); + +# Create new row on the publisher +$node_publisher->safe_psql('postgres', + "INSERT INTO conf_tab(a, data) VALUES (5,'frompub');"); + +# Delete the row on the subscriber +$node_subscriber->safe_psql('postgres', "DELETE FROM conf_tab WHERE (a=5);"); + +# Create a conflicting update on the publisher +$log_offset = -s $node_subscriber->logfile; +$node_publisher->safe_psql('postgres', + "UPDATE conf_tab SET data = 'frompubnew' WHERE (a=5);"); + +$node_subscriber->wait_for_log( + qr/LOG: conflict update_missing detected on relation \"public.conf_tab\"/, + $log_offset); + +# Confirm that the remote update is converted to an insert and new row applied +$result = $node_subscriber->safe_psql('postgres', + "SELECT data from conf_tab WHERE (a=5);"); + +is($result, 'frompubnew', "update from remote is converted to insert"); + +#test the error part + +# Create new row on publisher with toast data +$node_publisher->safe_psql('postgres', + "INSERT INTO conf_tab (a, data, comments) VALUES(6,'frompub',repeat('abcdefghij', 200));" +); + +# Delete the row on the subscriber +$node_subscriber->safe_psql('postgres', "DELETE FROM conf_tab WHERE (a=6);"); + +# Update the row on the publisher +$log_offset = -s $node_subscriber->logfile; +$node_publisher->safe_psql('postgres', + "UPDATE conf_tab SET data = 'frompubnew' WHERE (a=6);"); + +$node_subscriber->wait_for_log( + qr/ERROR: conflict update_missing detected on relation \"public.conf_tab\"/, + $log_offset); + +# Drop the subscriber to remove error +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub;"); + +# Truncate the table on the publisher +$node_publisher->safe_psql('postgres', "TRUNCATE conf_tab;"); + +# Truncate the table on the subscriber +$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;"); + +# Create the subscription +$node_subscriber->safe_psql( + 'postgres', + "CREATE SUBSCRIPTION tap_sub + CONNECTION '$publisher_connstr application_name=$appname' + PUBLICATION tap_pub + WITH (detect_conflict = on)"); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname); + +# Insert data in the publisher +$node_publisher->safe_psql( + 'postgres', + "INSERT INTO conf_tab(a, data) VALUES (1,'frompub'); + INSERT INTO conf_tab(a, data) VALUES (2,'frompub'); + INSERT INTO conf_tab(a, data) VALUES (3,'frompub');"); + +################################### +# Test 'skip' for 'update_missing' +################################### + +#Change CONFLICT RESOLVER of update_missing to skip +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub CONFLICT RESOLVER (update_missing = 'skip');" +); + +# Delete the row on the subscriber +$node_subscriber->safe_psql('postgres', "DELETE FROM conf_tab WHERE (a=2);"); + +# Create a conflicting update on the publisher +$log_offset = -s $node_subscriber->logfile; +$node_publisher->safe_psql('postgres', + "UPDATE conf_tab SET data = 'frompubnew' WHERE (a=2);"); + +$node_subscriber->wait_for_log( + qr/LOG: conflict update_missing detected on relation \"public.conf_tab\"/, + $log_offset); + +# Confirm that the update does not change anything on the subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT data from conf_tab WHERE (a=2);"); + +is($result, '', "update from remote is skipped on the subscriber"); + +################################### +# Test 'error' for 'update_missing' +################################### + +#Change CONFLICT RESOLVER of update_missing to error +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub CONFLICT RESOLVER (update_missing = 'error');" +); + +# Create a conflicting update on the publisher +$log_offset = -s $node_subscriber->logfile; +$node_publisher->safe_psql('postgres', + "UPDATE conf_tab SET data = 'frompubnewer' WHERE (a=2);"); + +$node_subscriber->wait_for_log( + qr/ERROR: conflict update_missing detected on relation \"public.conf_tab\"/, + $log_offset); + +done_testing(); -- 2.34.1