From 4b66fe543e1cc68102313f891914d023a5748ac7 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Fri, 31 May 2024 14:20:45 +0530 Subject: [PATCH v3 1/5] Detect and log conflicts in logical replication This patch adds a new parameter detect_conflict for CREATE and ALTER subscription commands. This new parameter will decide if subscription will go for confict detection. By default, conflict detection will be off for a subscription. When conflict detection is enabled, additional logging is triggered in the following conflict scenarios: insert_exists: Inserting a row that iolates a NOT DEFERRABLE unique constraint. update_differ: updating a row that was previously modified by another origin. update_missing: The tuple to be updated is missing. delete_missing: The tuple to be deleted is missing. For insert_exists conflict, the log can include origin and commit timestamp details of the conflicting key with track_commit_timestamp enabled. update_differ conflict can only be detected when track_commit_timestamp is enabled. --- doc/src/sgml/catalogs.sgml | 9 + doc/src/sgml/ref/alter_subscription.sgml | 5 +- doc/src/sgml/ref/create_subscription.sgml | 55 ++++++ src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 3 +- src/backend/commands/subscriptioncmds.c | 31 +++- src/backend/executor/execIndexing.c | 14 +- src/backend/executor/execReplication.c | 117 +++++++++++- src/backend/replication/logical/Makefile | 1 + src/backend/replication/logical/conflict.c | 187 ++++++++++++++++++++ src/backend/replication/logical/meson.build | 1 + src/backend/replication/logical/worker.c | 50 ++++-- src/bin/pg_dump/pg_dump.c | 17 +- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 6 +- src/bin/psql/tab-complete.c | 14 +- src/include/catalog/pg_subscription.h | 4 + src/include/replication/conflict.h | 44 +++++ src/test/regress/expected/subscription.out | 176 ++++++++++-------- src/test/regress/sql/subscription.sql | 15 ++ src/test/subscription/t/001_rep_changes.pl | 15 +- src/test/subscription/t/013_partition.pl | 48 ++--- src/test/subscription/t/029_on_error.pl | 5 +- src/test/subscription/t/030_origin.pl | 26 +++ src/tools/pgindent/typedefs.list | 1 + 25 files changed, 695 insertions(+), 151 deletions(-) create mode 100644 src/backend/replication/logical/conflict.c create mode 100644 src/include/replication/conflict.h diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index b654fae1b2..b042a5a94a 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -8035,6 +8035,15 @@ SCRAM-SHA-256$<iteration count>:&l + + + subdetectconflict bool + + + If true, the subscription is enabled for conflict detection. + + + subconninfo text diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 476f195622..5f6b83e415 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -228,8 +228,9 @@ ALTER SUBSCRIPTION name RENAME TO < disable_on_error, password_required, run_as_owner, - origin, and - failover. + origin, + failover, and + detect_conflict. Only a superuser can set password_required = false. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 740b7d9421..ce37fa6490 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -428,6 +428,61 @@ CREATE SUBSCRIPTION subscription_name + + + detect_conflict (boolean) + + + Specifies whether the subscription is enabled for conflict detection. + The default is false. + + + When conflict detection is enabled, additional logging is triggered + in the following scenarios: + + + insert_exists + + + Inserting a row that violates a NOT DEFERRABLE + unique constraint. Note that to obtain the origin and commit + timestamp details of the conflicting key in the log, ensure that + track_commit_timestamp + is enabled. + + + + + update_differ + + + Updating a row that was previously modified by another origin. Note that this + conflict can only be detected when + track_commit_timestamp + is enabled. + + + + + update_missing + + + The tuple to be updated is not found. + + + + + delete_missing + + + The tuple to be deleted is not found. + + + + + + + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 9efc9159f2..5a423f4fb0 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -72,6 +72,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->passwordrequired = subform->subpasswordrequired; sub->runasowner = subform->subrunasowner; sub->failover = subform->subfailover; + sub->detectconflict = subform->subdetectconflict; /* Get conninfo */ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 19cabc9a47..d084bfc48a 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1356,7 +1356,8 @@ REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, subpasswordrequired, subrunasowner, subfailover, - subslotname, subsynccommit, subpublications, suborigin) + subdetectconflict, subslotname, subsynccommit, + subpublications, suborigin) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index e407428dbc..e670d72708 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -70,8 +70,9 @@ #define SUBOPT_PASSWORD_REQUIRED 0x00000800 #define SUBOPT_RUN_AS_OWNER 0x00001000 #define SUBOPT_FAILOVER 0x00002000 -#define SUBOPT_LSN 0x00004000 -#define SUBOPT_ORIGIN 0x00008000 +#define SUBOPT_DETECT_CONFLICT 0x00004000 +#define SUBOPT_LSN 0x00008000 +#define SUBOPT_ORIGIN 0x00010000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -97,6 +98,7 @@ typedef struct SubOpts bool passwordrequired; bool runasowner; bool failover; + bool detectconflict; char *origin; XLogRecPtr lsn; } SubOpts; @@ -159,6 +161,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->runasowner = false; if (IsSet(supported_opts, SUBOPT_FAILOVER)) opts->failover = false; + if (IsSet(supported_opts, SUBOPT_DETECT_CONFLICT)) + opts->detectconflict = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); @@ -316,6 +320,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_FAILOVER; opts->failover = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_DETECT_CONFLICT) && + strcmp(defel->defname, "detect_conflict") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_DETECT_CONFLICT)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_DETECT_CONFLICT; + opts->detectconflict = defGetBoolean(defel); + } else if (IsSet(supported_opts, SUBOPT_ORIGIN) && strcmp(defel->defname, "origin") == 0) { @@ -603,7 +616,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | + SUBOPT_DETECT_CONFLICT | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -710,6 +724,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired); values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner); values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover); + values[Anum_pg_subscription_subdetectconflict - 1] = + BoolGetDatum(opts.detectconflict); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -1146,7 +1162,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | - SUBOPT_ORIGIN); + SUBOPT_DETECT_CONFLICT | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1256,6 +1272,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_subfailover - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_DETECT_CONFLICT)) + { + values[Anum_pg_subscription_subdetectconflict - 1] = + BoolGetDatum(opts.detectconflict); + replaces[Anum_pg_subscription_subdetectconflict - 1] = true; + } + if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) { values[Anum_pg_subscription_suborigin - 1] = diff --git a/src/backend/executor/execIndexing.c b/src/backend/executor/execIndexing.c index 9f05b3654c..2a2d188b19 100644 --- a/src/backend/executor/execIndexing.c +++ b/src/backend/executor/execIndexing.c @@ -207,8 +207,9 @@ ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative) ii = BuildIndexInfo(indexDesc); /* - * If the indexes are to be used for speculative insertion, add extra - * information required by unique index entries. + * If the indexes are to be used for speculative insertion or conflict + * detection in logical replication, add extra information required by + * unique index entries. */ if (speculative && ii->ii_Unique) BuildSpeculativeIndexInfo(indexDesc, ii); @@ -521,6 +522,11 @@ ExecInsertIndexTuples(ResultRelInfo *resultRelInfo, * possible that a conflicting tuple is inserted immediately * after this returns. But this can be used for a pre-check * before insertion. + * + * If the 'slot' holds a tuple with valid tid, this tuple will + * be ingored when checking conflict. This can help in scenarios + * where we want to re-check for conflicts after inserting a + * tuple. * ---------------------------------------------------------------- */ bool @@ -536,11 +542,9 @@ ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, ExprContext *econtext; Datum values[INDEX_MAX_KEYS]; bool isnull[INDEX_MAX_KEYS]; - ItemPointerData invalidItemPtr; bool checkedIndex = false; ItemPointerSetInvalid(conflictTid); - ItemPointerSetInvalid(&invalidItemPtr); /* * Get information from the result relation info structure. @@ -629,7 +633,7 @@ ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, satisfiesConstraint = check_exclusion_or_unique_constraint(heapRelation, indexRelation, - indexInfo, &invalidItemPtr, + indexInfo, &slot->tts_tid, values, isnull, estate, false, CEOUC_WAIT, true, conflictTid); diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index d0a89cd577..f01927a933 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -23,6 +23,7 @@ #include "commands/trigger.h" #include "executor/executor.h" #include "executor/nodeModifyTable.h" +#include "replication/conflict.h" #include "replication/logicalrelation.h" #include "storage/lmgr.h" #include "utils/builtins.h" @@ -480,6 +481,83 @@ retry: return found; } +/* + * Find the tuple that violates the passed in unique index constraint + * (conflictindex). + * + * Return false if there is no conflict and *conflictslot is set to NULL. + * Otherwise return true, and the conflicting tuple is locked and returned + * in *conflictslot. + */ +static bool +FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate, + Oid conflictindex, TupleTableSlot *slot, + TupleTableSlot **conflictslot) +{ + Relation rel = resultRelInfo->ri_RelationDesc; + ItemPointerData conflictTid; + TM_FailureData tmfd; + TM_Result res; + + *conflictslot = NULL; + +retry: + if (ExecCheckIndexConstraints(resultRelInfo, slot, estate, + &conflictTid, list_make1_oid(conflictindex))) + { + if (*conflictslot) + ExecDropSingleTupleTableSlot(*conflictslot); + + *conflictslot = NULL; + return false; + } + + *conflictslot = table_slot_create(rel, NULL); + + PushActiveSnapshot(GetLatestSnapshot()); + + res = table_tuple_lock(rel, &conflictTid, GetLatestSnapshot(), + *conflictslot, + GetCurrentCommandId(false), + LockTupleShare, + LockWaitBlock, + 0 /* don't follow updates */ , + &tmfd); + + PopActiveSnapshot(); + + switch (res) + { + case TM_Ok: + break; + case TM_Updated: + /* XXX: Improve handling here */ + if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid)) + ereport(LOG, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying"))); + else + ereport(LOG, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("concurrent update, retrying"))); + goto retry; + case TM_Deleted: + /* XXX: Improve handling here */ + ereport(LOG, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("concurrent delete, retrying"))); + goto retry; + case TM_Invisible: + elog(ERROR, "attempted to lock invisible tuple"); + break; + default: + elog(ERROR, "unexpected table_tuple_lock status: %u", res); + break; + } + + return true; +} + /* * Insert tuple represented in the slot to the relation, update the indexes, * and execute any constraints and per-row triggers. @@ -509,6 +587,8 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, if (!skip_tuple) { List *recheckIndexes = NIL; + List *conflictindexes; + bool conflict = false; /* Compute stored generated columns */ if (rel->rd_att->constr && @@ -525,10 +605,43 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, /* OK, store the tuple and create index entries for it */ simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot); + conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes; + if (resultRelInfo->ri_NumIndices > 0) recheckIndexes = ExecInsertIndexTuples(resultRelInfo, - slot, estate, false, false, - NULL, NIL, false); + slot, estate, false, + conflictindexes, &conflict, + conflictindexes, false); + + /* Re-check all the unique indexes for potential conflicts */ + foreach_oid(uniqueidx, (conflict ? conflictindexes : NIL)) + { + TupleTableSlot *conflictslot; + + /* + * Reports the conflict if any. + * + * Here, we attempt to find the conflict tuple. This operation may + * seem redundant with the unique violation check of indexam, but + * since we perform this only when we are detecting conflict in + * logical replication and encountering potential conflicts with + * any unique index constraints (which should not be frequent), so + * it's ok. Moreover, upon detecting a conflict, we will report an + * ERROR and restart the logical replication, so the additional + * cost of finding the tuple should be acceptable in this case. + */ + if (list_member_oid(recheckIndexes, uniqueidx) && + FindConflictTuple(resultRelInfo, estate, uniqueidx, slot, &conflictslot)) + { + RepOriginId origin; + TimestampTz committs; + TransactionId xmin; + + GetTupleCommitTs(conflictslot, &xmin, &origin, &committs); + ReportApplyConflict(ERROR, CT_INSERT_EXISTS, rel, uniqueidx, + xmin, origin, committs, conflictslot); + } + } /* AFTER ROW INSERT Triggers */ ExecARInsertTriggers(estate, resultRelInfo, slot, diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index ba03eeff1c..1e08bbbd4e 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -16,6 +16,7 @@ override CPPFLAGS := -I$(srcdir) $(CPPFLAGS) OBJS = \ applyparallelworker.o \ + conflict.o \ decode.o \ launcher.o \ logical.o \ diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c new file mode 100644 index 0000000000..f24f048054 --- /dev/null +++ b/src/backend/replication/logical/conflict.c @@ -0,0 +1,187 @@ +/*------------------------------------------------------------------------- + * conflict.c + * Functionality for detecting and logging conflicts. + * + * Copyright (c) 2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/conflict.c + * + * This file contains the code for detecting and logging conflicts on + * the subscriber during logical replication. + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/commit_ts.h" +#include "replication/conflict.h" +#include "replication/origin.h" +#include "utils/lsyscache.h" +#include "utils/rel.h" + +const char *const ConflictTypeNames[] = { + [CT_INSERT_EXISTS] = "insert_exists", + [CT_UPDATE_DIFFER] = "update_differ", + [CT_UPDATE_MISSING] = "update_missing", + [CT_DELETE_MISSING] = "delete_missing" +}; + +static char *build_index_value_desc(Oid indexoid, TupleTableSlot *conflictslot); +static int errdetail_apply_conflict(ConflictType type, Oid conflictidx, + TransactionId localxmin, + RepOriginId localorigin, + TimestampTz localts, + TupleTableSlot *conflictslot); + +/* + * Get the xmin and commit timestamp data (origin and timestamp) associated + * with the provided local tuple. + * + * Return true if the commit timestamp data was found, false otherwise. + */ +bool +GetTupleCommitTs(TupleTableSlot *localslot, TransactionId *xmin, + RepOriginId *localorigin, TimestampTz *localts) +{ + Datum xminDatum; + bool isnull; + + xminDatum = slot_getsysattr(localslot, MinTransactionIdAttributeNumber, + &isnull); + *xmin = DatumGetTransactionId(xminDatum); + Assert(!isnull); + + /* + * The commit timestamp data is not available if track_commit_timestamp is + * disabled. + */ + if (!track_commit_timestamp) + { + *localorigin = InvalidRepOriginId; + *localts = 0; + return false; + } + + return TransactionIdGetCommitTsData(*xmin, localts, localorigin); +} + +/* + * Report a conflict when applying remote changes. + */ +void +ReportApplyConflict(int elevel, ConflictType type, Relation localrel, + Oid conflictidx, TransactionId localxmin, + RepOriginId localorigin, TimestampTz localts, + TupleTableSlot *conflictslot) +{ + ereport(elevel, + errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION), + errmsg("conflict %s detected on relation \"%s.%s\"", + ConflictTypeNames[type], + get_namespace_name(RelationGetNamespace(localrel)), + RelationGetRelationName(localrel)), + errdetail_apply_conflict(type, conflictidx, localxmin, localorigin, + localts, conflictslot)); +} + +/* + * Find all unique indexes to check for a conflict and store them into + * ResultRelInfo. + */ +void +InitConflictIndexes(ResultRelInfo *relInfo) +{ + List *uniqueIndexes = NIL; + + for (int i = 0; i < relInfo->ri_NumIndices; i++) + { + Relation indexRelation = relInfo->ri_IndexRelationDescs[i]; + + if (indexRelation == NULL) + continue; + + /* Detect conflict only for unique indexes */ + if (!relInfo->ri_IndexRelationInfo[i]->ii_Unique) + continue; + + /* Don't support conflict detection for deferrable index */ + if (!indexRelation->rd_index->indimmediate) + continue; + + uniqueIndexes = lappend_oid(uniqueIndexes, + RelationGetRelid(indexRelation)); + } + + relInfo->ri_onConflictArbiterIndexes = uniqueIndexes; +} + +/* + * Add an errdetail() line showing conflict detail. + */ +static int +errdetail_apply_conflict(ConflictType type, Oid conflictidx, + TransactionId localxmin, RepOriginId localorigin, + TimestampTz localts, TupleTableSlot *conflictslot) +{ + switch (type) + { + case CT_INSERT_EXISTS: + { + /* + * Bulid the index value string. If the return value is NULL, + * it indicates that the current user lacks permissions to + * view all the columns involved. + */ + char *index_value = build_index_value_desc(conflictidx, + conflictslot); + + if (index_value && localts) + return errdetail("Key %s already exists in unique index \"%s\", which was modified by origin %u in transaction %u at %s.", + index_value, get_rel_name(conflictidx), localorigin, + localxmin, timestamptz_to_str(localts)); + else if (index_value && !localts) + return errdetail("Key %s already exists in unique index \"%s\", which was modified in transaction %u.", + index_value, get_rel_name(conflictidx), localxmin); + else + return errdetail("Key already exists in unique index \"%s\".", + get_rel_name(conflictidx)); + } + case CT_UPDATE_DIFFER: + return errdetail("Updating a row that was modified by a different origin %u in transaction %u at %s.", + localorigin, localxmin, timestamptz_to_str(localts)); + case CT_UPDATE_MISSING: + return errdetail("Did not find the row to be updated."); + case CT_DELETE_MISSING: + return errdetail("Did not find the row to be deleted."); + } + + return 0; /* silence compiler warning */ +} + +/* + * Helper functions to construct a string describing the contents of an index + * entry. See BuildIndexValueDescription for details. + */ +static char * +build_index_value_desc(Oid indexoid, TupleTableSlot *conflictslot) +{ + char *conflict_row; + Relation indexDesc; + + if (!conflictslot) + return NULL; + + /* Assume the index has been locked */ + indexDesc = index_open(indexoid, NoLock); + + slot_getallattrs(conflictslot); + + conflict_row = BuildIndexValueDescription(indexDesc, + conflictslot->tts_values, + conflictslot->tts_isnull); + + index_close(indexDesc, NoLock); + + return conflict_row; +} diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index 3dec36a6de..3d36249d8a 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -2,6 +2,7 @@ backend_sources += files( 'applyparallelworker.c', + 'conflict.c', 'decode.c', 'launcher.c', 'logical.c', diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 3b285894db..06ee46886b 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -167,6 +167,7 @@ #include "postmaster/bgworker.h" #include "postmaster/interrupt.h" #include "postmaster/walwriter.h" +#include "replication/conflict.h" #include "replication/logicallauncher.h" #include "replication/logicalproto.h" #include "replication/logicalrelation.h" @@ -2461,7 +2462,10 @@ apply_handle_insert_internal(ApplyExecutionData *edata, EState *estate = edata->estate; /* We must open indexes here. */ - ExecOpenIndices(relinfo, false); + ExecOpenIndices(relinfo, MySubscription->detectconflict); + + if (MySubscription->detectconflict) + InitConflictIndexes(relinfo); /* Do the insert. */ TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT); @@ -2664,6 +2668,20 @@ apply_handle_update_internal(ApplyExecutionData *edata, */ if (found) { + RepOriginId localorigin; + TransactionId localxmin; + TimestampTz localts; + + /* + * If conflict detection is enabled, check whether the local tuple was + * modified by a different origin. If detected, report the conflict. + */ + if (MySubscription->detectconflict && + GetTupleCommitTs(localslot, &localxmin, &localorigin, &localts) && + localorigin != replorigin_session_origin) + ReportApplyConflict(LOG, CT_UPDATE_DIFFER, localrel, InvalidOid, + localxmin, localorigin, localts, NULL); + /* Process and store remote tuple in the slot */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); slot_modify_data(remoteslot, localslot, relmapentry, newtup); @@ -2681,13 +2699,10 @@ apply_handle_update_internal(ApplyExecutionData *edata, /* * The tuple to be updated could not be found. Do nothing except for * emitting a log message. - * - * XXX should this be promoted to ereport(LOG) perhaps? */ - elog(DEBUG1, - "logical replication did not find row to be updated " - "in replication target relation \"%s\"", - RelationGetRelationName(localrel)); + if (MySubscription->detectconflict) + ReportApplyConflict(LOG, CT_UPDATE_MISSING, localrel, InvalidOid, + InvalidTransactionId, InvalidRepOriginId, 0, NULL); } /* Cleanup. */ @@ -2821,13 +2836,10 @@ apply_handle_delete_internal(ApplyExecutionData *edata, /* * The tuple to be deleted could not be found. Do nothing except for * emitting a log message. - * - * XXX should this be promoted to ereport(LOG) perhaps? */ - elog(DEBUG1, - "logical replication did not find row to be deleted " - "in replication target relation \"%s\"", - RelationGetRelationName(localrel)); + if (MySubscription->detectconflict) + ReportApplyConflict(LOG, CT_DELETE_MISSING, localrel, InvalidOid, + InvalidTransactionId, InvalidRepOriginId, 0, NULL); } /* Cleanup. */ @@ -3005,13 +3017,13 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, /* * The tuple to be updated could not be found. Do nothing * except for emitting a log message. - * - * XXX should this be promoted to ereport(LOG) perhaps? */ - elog(DEBUG1, - "logical replication did not find row to be updated " - "in replication target relation's partition \"%s\"", - RelationGetRelationName(partrel)); + if (MySubscription->detectconflict) + ReportApplyConflict(LOG, CT_UPDATE_MISSING, + partrel, InvalidOid, + InvalidTransactionId, + InvalidRepOriginId, 0, NULL); + return; } diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 5426f1177c..e0ecadd081 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4754,6 +4754,7 @@ getSubscriptions(Archive *fout) int i_suboriginremotelsn; int i_subenabled; int i_subfailover; + int i_subdetectconflict; int i, ntups; @@ -4826,11 +4827,17 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 170000) appendPQExpBufferStr(query, - " s.subfailover\n"); + " s.subfailover,\n"); else appendPQExpBuffer(query, - " false AS subfailover\n"); + " false AS subfailover,\n"); + if (fout->remoteVersion >= 170000) + appendPQExpBufferStr(query, + " s.subdetectconflict\n"); + else + appendPQExpBuffer(query, + " false AS subdetectconflict\n"); appendPQExpBufferStr(query, "FROM pg_subscription s\n"); @@ -4869,6 +4876,7 @@ getSubscriptions(Archive *fout) i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn"); i_subenabled = PQfnumber(res, "subenabled"); i_subfailover = PQfnumber(res, "subfailover"); + i_subdetectconflict = PQfnumber(res, "subdetectconflict"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4915,6 +4923,8 @@ getSubscriptions(Archive *fout) pg_strdup(PQgetvalue(res, i, i_subenabled)); subinfo[i].subfailover = pg_strdup(PQgetvalue(res, i, i_subfailover)); + subinfo[i].subdetectconflict = + pg_strdup(PQgetvalue(res, i, i_subdetectconflict)); /* Decide whether we want to dump it */ selectDumpableObject(&(subinfo[i].dobj), fout); @@ -5155,6 +5165,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (strcmp(subinfo->subfailover, "t") == 0) appendPQExpBufferStr(query, ", failover = true"); + if (strcmp(subinfo->subdetectconflict, "t") == 0) + appendPQExpBufferStr(query, ", detect_conflict = true"); + if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 4b2e5870a9..bbd7cbeff6 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -671,6 +671,7 @@ typedef struct _SubscriptionInfo char *suborigin; char *suboriginremotelsn; char *subfailover; + char *subdetectconflict; } SubscriptionInfo; /* diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 7c9a1f234c..fef1ad0d70 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6539,7 +6539,7 @@ describeSubscriptions(const char *pattern, bool verbose) printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false, false, false, false, false, false, - false}; + false, false}; if (pset.sversion < 100000) { @@ -6607,6 +6607,10 @@ describeSubscriptions(const char *pattern, bool verbose) appendPQExpBuffer(&buf, ", subfailover AS \"%s\"\n", gettext_noop("Failover")); + if (pset.sversion >= 170000) + appendPQExpBuffer(&buf, + ", subdetectconflict AS \"%s\"\n", + gettext_noop("Detect conflict")); appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index d453e224d9..219fac7e71 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1946,9 +1946,10 @@ psql_completion(const char *text, int start, int end) COMPLETE_WITH("(", "PUBLICATION"); /* ALTER SUBSCRIPTION SET ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "(")) - COMPLETE_WITH("binary", "disable_on_error", "failover", "origin", - "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit"); + COMPLETE_WITH("binary", "detect_conflict", "disable_on_error", + "failover", "origin", "password_required", + "run_as_owner", "slot_name", "streaming", + "synchronous_commit"); /* ALTER SUBSCRIPTION SKIP ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "(")) COMPLETE_WITH("lsn"); @@ -3363,9 +3364,10 @@ psql_completion(const char *text, int start, int end) /* Complete "CREATE SUBSCRIPTION ... WITH ( " */ else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", - "disable_on_error", "enabled", "failover", "origin", - "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit", "two_phase"); + "detect_conflict", "disable_on_error", "enabled", + "failover", "origin", "password_required", + "run_as_owner", "slot_name", "streaming", + "synchronous_commit", "two_phase"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 0aa14ec4a2..17daf11dc7 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -98,6 +98,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW * slots) in the upstream database are enabled * to be synchronized to the standbys. */ + bool subdetectconflict; /* True if replication should perform + * conflict detection */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -151,6 +154,7 @@ typedef struct Subscription * (i.e. the main slot and the table sync * slots) in the upstream database are enabled * to be synchronized to the standbys. */ + bool detectconflict; /* True if conflict detection is enabled */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h new file mode 100644 index 0000000000..0bc9db991e --- /dev/null +++ b/src/include/replication/conflict.h @@ -0,0 +1,44 @@ +/*------------------------------------------------------------------------- + * conflict.h + * Exports for conflict detection and log + * + * Copyright (c) 2012-2024, PostgreSQL Global Development Group + * + *------------------------------------------------------------------------- + */ +#ifndef CONFLICT_H +#define CONFLICT_H + +#include "access/xlogdefs.h" +#include "executor/tuptable.h" +#include "nodes/execnodes.h" +#include "utils/relcache.h" +#include "utils/timestamp.h" + +/* + * Conflict types that could be encountered when applying remote changes. + */ +typedef enum +{ + /* The row to be inserted violates unique constraint */ + CT_INSERT_EXISTS, + + /* The row to be updated was modified by a different origin */ + CT_UPDATE_DIFFER, + + /* The row to be updated is missing */ + CT_UPDATE_MISSING, + + /* The row to be deleted is missing */ + CT_DELETE_MISSING, +} ConflictType; + +extern bool GetTupleCommitTs(TupleTableSlot *localslot, TransactionId *xmin, + RepOriginId *localorigin, TimestampTz *localts); +extern void ReportApplyConflict(int elevel, ConflictType type, + Relation localrel, Oid conflictidx, + TransactionId localxmin, RepOriginId localorigin, + TimestampTz localts, TupleTableSlot *conflictslot); +extern void InitConflictIndexes(ResultRelInfo *relInfo); + +#endif diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 0f2a25cdc1..a8b0086dd9 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname'); ALTER SUBSCRIPTION regress_testsub SET (password_required = false); ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | t | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | t | f | f | off | dbname=regress_doesnotexist2 | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (password_required = true); @@ -176,10 +176,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/12345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/12345 (1 row) -- ok - with lsn = NONE @@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/0 (1 row) BEGIN; @@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | local | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+---------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | f | local | dbname=regress_doesnotexist2 | 0/0 (1 row) -- rename back to keep the rest simple @@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication already exists @@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication used more than once @@ -332,10 +332,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -371,10 +371,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) --fail - alter of two_phase option not supported. @@ -383,10 +383,10 @@ ERROR: unrecognized subscription parameter: "two_phase" -- but can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -396,10 +396,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -412,18 +412,42 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; +-- fail - detect_conflict must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = foo); +ERROR: detect_conflict requires a Boolean value +-- now it works +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = true); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | t | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (detect_conflict = false); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 3e5ba4cb8c..a77b196704 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -290,6 +290,21 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- fail - detect_conflict must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = foo); + +-- now it works +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = true); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (detect_conflict = false); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; + -- let's do some tests with pg_create_subscription rather than superuser SET SESSION AUTHORIZATION regress_subscription_user3; diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index 471e981962..78c0307165 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -331,11 +331,12 @@ is( $result, qq(1|bar 2|baz), 'update works with REPLICA IDENTITY FULL and a primary key'); -# Check that subscriber handles cases where update/delete target tuple -# is missing. We have to look for the DEBUG1 log messages about that, -# so temporarily bump up the log verbosity. -$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1"); -$node_subscriber->reload; +# To check that subscriber handles cases where update/delete target tuple +# is missing, detect_conflict is temporarily enabled to log conflicts +# related to missing tuples. +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET (detect_conflict = true)" +); $node_subscriber->safe_psql('postgres', "DELETE FROM tab_full_pk"); @@ -352,10 +353,10 @@ $node_publisher->wait_for_catchup('tap_sub'); my $logfile = slurp_file($node_subscriber->logfile, $log_location); ok( $logfile =~ - qr/logical replication did not find row to be updated in replication target relation "tab_full_pk"/, + qr/conflict update_missing detected on relation "public.tab_full_pk".*\n.*DETAIL:.* Did not find the row to be updated./m, 'update target row is missing'); ok( $logfile =~ - qr/logical replication did not find row to be deleted in replication target relation "tab_full_pk"/, + qr/conflict delete_missing detected on relation "public.tab_full_pk".*\n.*DETAIL:.* Did not find the row to be deleted./m, 'delete target row is missing'); $node_subscriber->append_conf('postgresql.conf', diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl index 29580525a9..c64f17211d 100644 --- a/src/test/subscription/t/013_partition.pl +++ b/src/test/subscription/t/013_partition.pl @@ -343,12 +343,12 @@ $result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1 ORDER BY 1"); is($result, qq(), 'truncate of tab1 replicated'); -# Check that subscriber handles cases where update/delete target tuple -# is missing. We have to look for the DEBUG1 log messages about that, -# so temporarily bump up the log verbosity. -$node_subscriber1->append_conf('postgresql.conf', - "log_min_messages = debug1"); -$node_subscriber1->reload; +# To check that subscriber handles cases where update/delete target tuple +# is missing, detect_conflict is temporarily enabled to log conflicts +# related to missing tuples. +$node_subscriber1->safe_psql('postgres', + "ALTER SUBSCRIPTION sub1 SET (detect_conflict = true)" +); $node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (1, 'foo'), (4, 'bar'), (10, 'baz')"); @@ -372,21 +372,21 @@ $node_publisher->wait_for_catchup('sub2'); my $logfile = slurp_file($node_subscriber1->logfile(), $log_location); ok( $logfile =~ - qr/logical replication did not find row to be updated in replication target relation's partition "tab1_2_2"/, + qr/conflict update_missing detected on relation "public.tab1_2_2".*\n.*DETAIL:.* Did not find the row to be updated./, 'update target row is missing in tab1_2_2'); ok( $logfile =~ - qr/logical replication did not find row to be deleted in replication target relation "tab1_1"/, + qr/conflict delete_missing detected on relation "public.tab1_1".*\n.*DETAIL:.* Did not find the row to be deleted./, 'delete target row is missing in tab1_1'); ok( $logfile =~ - qr/logical replication did not find row to be deleted in replication target relation "tab1_2_2"/, + qr/conflict delete_missing detected on relation "public.tab1_2_2".*\n.*DETAIL:.* Did not find the row to be deleted./, 'delete target row is missing in tab1_2_2'); ok( $logfile =~ - qr/logical replication did not find row to be deleted in replication target relation "tab1_def"/, + qr/conflict delete_missing detected on relation "public.tab1_def".*\n.*DETAIL:.* Did not find the row to be deleted./, 'delete target row is missing in tab1_def'); -$node_subscriber1->append_conf('postgresql.conf', - "log_min_messages = warning"); -$node_subscriber1->reload; +$node_subscriber1->safe_psql('postgres', + "ALTER SUBSCRIPTION sub1 SET (detect_conflict = false)" +); # Tests for replication using root table identity and schema @@ -773,12 +773,12 @@ pub_tab2|3|yyy pub_tab2|5|zzz xxx_c|6|aaa), 'inserts into tab2 replicated'); -# Check that subscriber handles cases where update/delete target tuple -# is missing. We have to look for the DEBUG1 log messages about that, -# so temporarily bump up the log verbosity. -$node_subscriber1->append_conf('postgresql.conf', - "log_min_messages = debug1"); -$node_subscriber1->reload; +# To check that subscriber handles cases where update/delete target tuple +# is missing, detect_conflict is temporarily enabled to log conflicts +# related to missing tuples. +$node_subscriber1->safe_psql('postgres', + "ALTER SUBSCRIPTION sub_viaroot SET (detect_conflict = true)" +); $node_subscriber1->safe_psql('postgres', "DELETE FROM tab2"); @@ -796,15 +796,15 @@ $node_publisher->wait_for_catchup('sub2'); $logfile = slurp_file($node_subscriber1->logfile(), $log_location); ok( $logfile =~ - qr/logical replication did not find row to be updated in replication target relation's partition "tab2_1"/, + qr/conflict update_missing detected on relation "public.tab2_1".*\n.*DETAIL:.* Did not find the row to be updated./, 'update target row is missing in tab2_1'); ok( $logfile =~ - qr/logical replication did not find row to be deleted in replication target relation "tab2_1"/, + qr/conflict delete_missing detected on relation "public.tab2_1".*\n.*DETAIL:.* Did not find the row to be deleted./, 'delete target row is missing in tab2_1'); -$node_subscriber1->append_conf('postgresql.conf', - "log_min_messages = warning"); -$node_subscriber1->reload; +$node_subscriber1->safe_psql('postgres', + "ALTER SUBSCRIPTION sub_viaroot SET (detect_conflict = false)" +); # Test that replication continues to work correctly after altering the # partition of a partitioned target table. diff --git a/src/test/subscription/t/029_on_error.pl b/src/test/subscription/t/029_on_error.pl index 0ab57a4b5b..496a3c6cd9 100644 --- a/src/test/subscription/t/029_on_error.pl +++ b/src/test/subscription/t/029_on_error.pl @@ -30,7 +30,7 @@ sub test_skip_lsn # ERROR with its CONTEXT when retrieving this information. my $contents = slurp_file($node_subscriber->logfile, $offset); $contents =~ - qr/duplicate key value violates unique constraint "tbl_pkey".*\n.*DETAIL:.*\n.*CONTEXT:.* for replication target relation "public.tbl" in transaction \d+, finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/m + qr/conflict insert_exists detected on relation "public.tbl".*\n.*DETAIL:.* Key \(i\)=\(1\) already exists in unique index "tbl_pkey", which was modified by origin \d+ in transaction \d+ at .*\n.*CONTEXT:.* for replication target relation "public.tbl" in transaction \d+, finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/m or die "could not get error-LSN"; my $lsn = $1; @@ -83,6 +83,7 @@ $node_subscriber->append_conf( 'postgresql.conf', qq[ max_prepared_transactions = 10 +track_commit_timestamp = on ]); $node_subscriber->start; @@ -109,7 +110,7 @@ my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; $node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub FOR TABLE tbl"); $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true, streaming = on, two_phase = on)" + "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true, streaming = on, two_phase = on, detect_conflict = on)" ); # Initial synchronization failure causes the subscription to be disabled. diff --git a/src/test/subscription/t/030_origin.pl b/src/test/subscription/t/030_origin.pl index 056561f008..03dabfeb72 100644 --- a/src/test/subscription/t/030_origin.pl +++ b/src/test/subscription/t/030_origin.pl @@ -26,7 +26,12 @@ my $stderr; # node_A my $node_A = PostgreSQL::Test::Cluster->new('node_A'); $node_A->init(allows_streaming => 'logical'); + +# Enable the track_commit_timestamp to detect the conflict when attempting to +# update a row that was previously modified by a different origin. +$node_A->append_conf('postgresql.conf', 'track_commit_timestamp = on'); $node_A->start; + # node_B my $node_B = PostgreSQL::Test::Cluster->new('node_B'); $node_B->init(allows_streaming => 'logical'); @@ -89,11 +94,32 @@ is( $result, qq(11 'Inserted successfully without leading to infinite recursion in bidirectional replication setup' ); +############################################################################### +# Check that the conflict can be detected when attempting to update a row that +# was previously modified by a different source. +############################################################################### + +$node_A->safe_psql('postgres', + "ALTER SUBSCRIPTION $subname_AB SET (detect_conflict = true);"); + +$node_B->safe_psql('postgres', "UPDATE tab SET a = 10 WHERE a = 11;"); + +$node_A->wait_for_log( + qr/Updating a row that was modified by a different origin [0-9]+ in transaction [0-9]+ at .*/ +); + $node_A->safe_psql('postgres', "DELETE FROM tab;"); $node_A->wait_for_catchup($subname_BA); $node_B->wait_for_catchup($subname_AB); +# The remaining tests no longer test conflict detection. +$node_A->safe_psql('postgres', + "ALTER SUBSCRIPTION $subname_AB SET (detect_conflict = false);"); + +$node_A->append_conf('postgresql.conf', 'track_commit_timestamp = off'); +$node_A->restart; + ############################################################################### # Check that remote data of node_B (that originated from node_C) is not # published to node_A. diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index e710fa48e5..2485a80833 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -466,6 +466,7 @@ ConditionVariableMinimallyPadded ConditionalStack ConfigData ConfigVariable +ConflictType ConnCacheEntry ConnCacheKey ConnParams -- 2.34.1