From 65e60fa35df180ccf781363d6c2d363c0be36637 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Wed, 31 Jul 2024 03:01:17 -0400 Subject: [PATCH v8 1/4] Detect and log conflicts in logical replication This patch adds a new parameter detect_conflict for CREATE and ALTER subscription commands. This new parameter will decide if subscription will go for confict detection. By default, conflict detection will be off for a subscription. When conflict detection is enabled, additional logging is triggered in the following conflict scenarios: insert_exists: Inserting a row that violates a NOT DEFERRABLE unique constraint. update_exists: The updated row value violates a NOT DEFERRABLE unique constraint. update_differ: Updating a row that was previously modified by another origin. update_missing: The tuple to be updated is missing. delete_missing: The tuple to be deleted is missing. delete_differ: Deleting a row that was previously modified by another origin. For insert_exists conflict, the log can include origin and commit timestamp details of the conflicting key with track_commit_timestamp enabled. update_differ and delete_differ conflicts can only be detected when track_commit_timestamp is enabled. --- doc/src/sgml/catalogs.sgml | 9 ++ doc/src/sgml/logical-replication.sgml | 21 ++- doc/src/sgml/ref/alter_subscription.sgml | 5 +- doc/src/sgml/ref/create_subscription.sgml | 84 ++++++++++++ src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 3 +- src/backend/commands/subscriptioncmds.c | 31 ++++- src/backend/executor/execIndexing.c | 14 +- src/backend/executor/execReplication.c | 142 +++++++++++++++++++- src/backend/replication/logical/Makefile | 1 + src/backend/replication/logical/conflict.c | 193 ++++++++++++++++++++++++++++ src/backend/replication/logical/meson.build | 1 + src/backend/replication/logical/worker.c | 84 +++++++++--- src/bin/pg_dump/pg_dump.c | 17 ++- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 6 +- src/bin/psql/tab-complete.c | 14 +- src/include/catalog/pg_subscription.h | 4 + src/include/replication/conflict.h | 50 +++++++ src/test/regress/expected/subscription.out | 176 ++++++++++++++----------- src/test/regress/sql/subscription.sql | 15 +++ src/test/subscription/t/001_rep_changes.pl | 15 ++- src/test/subscription/t/013_partition.pl | 68 ++++++---- src/test/subscription/t/029_on_error.pl | 5 +- src/test/subscription/t/030_origin.pl | 43 +++++++ src/tools/pgindent/typedefs.list | 1 + 26 files changed, 849 insertions(+), 155 deletions(-) create mode 100644 src/backend/replication/logical/conflict.c create mode 100644 src/include/replication/conflict.h diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index b654fae..b042a5a 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -8037,6 +8037,15 @@ SCRAM-SHA-256$<iteration count>:&l + subdetectconflict bool + + + If true, the subscription is enabled for conflict detection. + + + + + subconninfo text diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index a23a3d5..d236d85 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -1580,7 +1580,8 @@ test_sub=# SELECT * FROM t1 ORDER BY id; stop. This is referred to as a conflict. When replicating UPDATE or DELETE operations, missing data will not produce a conflict and such operations - will simply be skipped. + will simply be skipped. Please refer to detect_conflict + for all the conflicts that will be logged when enabling detect_conflict. @@ -1649,6 +1650,24 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER linkend="sql-altersubscription">ALTER SUBSCRIPTION ... SKIP. + + + Enabling both detect_conflict + and track_commit_timestamp + on the subscriber can provide additional details regarding conflicting + rows, such as their origin and commit timestamp, in case of a unique + constraint violation conflict: + +ERROR: conflict insert_exists detected on relation "public.t" +DETAIL: Key (a)=(1) already exists in unique index "t_pkey", which was modified by origin 0 in transaction 740 at 2024-06-26 10:47:04.727375+08. +CONTEXT: processing remote data for replication origin "pg_16389" during message type "INSERT" for replication target relation "public.t" in transaction 740, finished at 0/14F7EC0 + + Users can use these information to make decisions on whether to retain + the local change or adopt the remote alteration. For instance, the + origin in above log indicates that the existing row was modified by a + local change, users can manually perform a remote-change-win resolution + by deleting the local row. + diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index fdc648d..dfbe25b 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -235,8 +235,9 @@ ALTER SUBSCRIPTION name RENAME TO < password_required, run_as_owner, origin, - failover, and - two_phase. + failover, + two_phase, and + detect_conflict. Only a superuser can set password_required = false. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 740b7d94..eb51805 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -428,6 +428,90 @@ CREATE SUBSCRIPTION subscription_name + + + detect_conflict (boolean) + + + Specifies whether the subscription is enabled for conflict detection. + The default is false. + + + When conflict detection is enabled, additional logging is triggered + in the following scenarios: + + + insert_exists + + + Inserting a row that violates a NOT DEFERRABLE + unique constraint. Note that to obtain the origin and commit + timestamp details of the conflicting key in the log, ensure that + track_commit_timestamp + is enabled. In this scenario, an error will be raised until the + conflict is resolved manually. + + + + + update_exists + + + The updated value of a row violates a NOT DEFERRABLE + unique constraint. Note that to obtain the origin and commit + timestamp details of the conflicting key in the log, ensure that + track_commit_timestamp + is enabled. In this scenario, an error will be raised until the + conflict is resolved manually. + + + + + update_differ + + + Updating a row that was previously modified by another origin. + Note that this conflict can only be detected when + track_commit_timestamp + is enabled. Currenly, the update is always applied regardless of + the origin of the local row. + + + + + update_missing + + + The tuple to be updated was not found. The update will simply be + skipped in this scenario. + + + + + delete_missing + + + The tuple to be deleted was not found. The delete will simply be + skipped in this scenario. + + + + + delete_differ + + + Deleting a row that was previously modified by another origin. Note that this + conflict can only be detected when + track_commit_timestamp + is enabled. Currenly, the delete is always applied regardless of + the origin of the local row. + + + + + + + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 9efc915..5a423f4 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -72,6 +72,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->passwordrequired = subform->subpasswordrequired; sub->runasowner = subform->subrunasowner; sub->failover = subform->subfailover; + sub->detectconflict = subform->subdetectconflict; /* Get conninfo */ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 19cabc9..d084bfc 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1356,7 +1356,8 @@ REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, subpasswordrequired, subrunasowner, subfailover, - subslotname, subsynccommit, subpublications, suborigin) + subdetectconflict, subslotname, subsynccommit, + subpublications, suborigin) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index d124bfe..b2bc095 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -71,8 +71,9 @@ #define SUBOPT_PASSWORD_REQUIRED 0x00000800 #define SUBOPT_RUN_AS_OWNER 0x00001000 #define SUBOPT_FAILOVER 0x00002000 -#define SUBOPT_LSN 0x00004000 -#define SUBOPT_ORIGIN 0x00008000 +#define SUBOPT_DETECT_CONFLICT 0x00004000 +#define SUBOPT_LSN 0x00008000 +#define SUBOPT_ORIGIN 0x00010000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -98,6 +99,7 @@ typedef struct SubOpts bool passwordrequired; bool runasowner; bool failover; + bool detectconflict; char *origin; XLogRecPtr lsn; } SubOpts; @@ -162,6 +164,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->runasowner = false; if (IsSet(supported_opts, SUBOPT_FAILOVER)) opts->failover = false; + if (IsSet(supported_opts, SUBOPT_DETECT_CONFLICT)) + opts->detectconflict = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); @@ -307,6 +311,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_FAILOVER; opts->failover = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_DETECT_CONFLICT) && + strcmp(defel->defname, "detect_conflict") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_DETECT_CONFLICT)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_DETECT_CONFLICT; + opts->detectconflict = defGetBoolean(defel); + } else if (IsSet(supported_opts, SUBOPT_ORIGIN) && strcmp(defel->defname, "origin") == 0) { @@ -594,7 +607,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | + SUBOPT_DETECT_CONFLICT | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -701,6 +715,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired); values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner); values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover); + values[Anum_pg_subscription_subdetectconflict - 1] = + BoolGetDatum(opts.detectconflict); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -1196,7 +1212,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | - SUBOPT_ORIGIN); + SUBOPT_DETECT_CONFLICT | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1356,6 +1372,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_subfailover - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_DETECT_CONFLICT)) + { + values[Anum_pg_subscription_subdetectconflict - 1] = + BoolGetDatum(opts.detectconflict); + replaces[Anum_pg_subscription_subdetectconflict - 1] = true; + } + if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) { values[Anum_pg_subscription_suborigin - 1] = diff --git a/src/backend/executor/execIndexing.c b/src/backend/executor/execIndexing.c index 9f05b36..ef52277 100644 --- a/src/backend/executor/execIndexing.c +++ b/src/backend/executor/execIndexing.c @@ -207,8 +207,9 @@ ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative) ii = BuildIndexInfo(indexDesc); /* - * If the indexes are to be used for speculative insertion, add extra - * information required by unique index entries. + * If the indexes are to be used for speculative insertion or conflict + * detection in logical replication, add extra information required by + * unique index entries. */ if (speculative && ii->ii_Unique) BuildSpeculativeIndexInfo(indexDesc, ii); @@ -521,6 +522,11 @@ ExecInsertIndexTuples(ResultRelInfo *resultRelInfo, * possible that a conflicting tuple is inserted immediately * after this returns. But this can be used for a pre-check * before insertion. + * + * If the 'slot' holds a tuple with valid tid, this tuple will + * be ignored when checking conflict. This can help in scenarios + * where we want to re-check for conflicts after inserting a + * tuple. * ---------------------------------------------------------------- */ bool @@ -536,11 +542,9 @@ ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, ExprContext *econtext; Datum values[INDEX_MAX_KEYS]; bool isnull[INDEX_MAX_KEYS]; - ItemPointerData invalidItemPtr; bool checkedIndex = false; ItemPointerSetInvalid(conflictTid); - ItemPointerSetInvalid(&invalidItemPtr); /* * Get information from the result relation info structure. @@ -629,7 +633,7 @@ ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, satisfiesConstraint = check_exclusion_or_unique_constraint(heapRelation, indexRelation, - indexInfo, &invalidItemPtr, + indexInfo, &slot->tts_tid, values, isnull, estate, false, CEOUC_WAIT, true, conflictTid); diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index d0a89cd..0680bc8 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -23,6 +23,7 @@ #include "commands/trigger.h" #include "executor/executor.h" #include "executor/nodeModifyTable.h" +#include "replication/conflict.h" #include "replication/logicalrelation.h" #include "storage/lmgr.h" #include "utils/builtins.h" @@ -481,6 +482,121 @@ retry: } /* + * Find the tuple that violates the passed in unique index constraint + * (conflictindex). + * + * Return false if there is no conflict and *conflictslot is set to NULL. + * Otherwise return true, and the conflicting tuple is locked and returned + * in *conflictslot. + */ +static bool +FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate, + Oid conflictindex, TupleTableSlot *slot, + TupleTableSlot **conflictslot) +{ + Relation rel = resultRelInfo->ri_RelationDesc; + ItemPointerData conflictTid; + TM_FailureData tmfd; + TM_Result res; + + *conflictslot = NULL; + +retry: + if (ExecCheckIndexConstraints(resultRelInfo, slot, estate, + &conflictTid, list_make1_oid(conflictindex))) + { + if (*conflictslot) + ExecDropSingleTupleTableSlot(*conflictslot); + + *conflictslot = NULL; + return false; + } + + *conflictslot = table_slot_create(rel, NULL); + + PushActiveSnapshot(GetLatestSnapshot()); + + res = table_tuple_lock(rel, &conflictTid, GetLatestSnapshot(), + *conflictslot, + GetCurrentCommandId(false), + LockTupleShare, + LockWaitBlock, + 0 /* don't follow updates */ , + &tmfd); + + PopActiveSnapshot(); + + switch (res) + { + case TM_Ok: + break; + case TM_Updated: + /* XXX: Improve handling here */ + if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid)) + ereport(LOG, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying"))); + else + ereport(LOG, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("concurrent update, retrying"))); + goto retry; + case TM_Deleted: + /* XXX: Improve handling here */ + ereport(LOG, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("concurrent delete, retrying"))); + goto retry; + case TM_Invisible: + elog(ERROR, "attempted to lock invisible tuple"); + break; + default: + elog(ERROR, "unexpected table_tuple_lock status: %u", res); + break; + } + + return true; +} + +/* + * Re-check all the unique indexes in 'recheckIndexes' to see if there are + * potential conflicts with the tuple in 'slot'. + * + * This function is invoked after inserting or updating a tuple that detected + * potential conflict tuples. It attempts to find the tuple that conflicts with + * the provided tuple. This operation may seem redundant with the unique + * violation check of indexam, but since we call this function only when we are + * detecting conflict in logical replication and encountering potential + * conflicts with any unique index constraints (which should not be frequent), + * so it's ok. Moreover, upon detecting a conflict, we will report an ERROR and + * restart the logical replication, so the additional cost of finding the tuple + * should be acceptable. + */ +static void +ReCheckConflictIndexes(ResultRelInfo *resultRelInfo, EState *estate, + ConflictType type, List *recheckIndexes, + TupleTableSlot *slot) +{ + /* Re-check all the unique indexes for potential conflicts */ + foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes) + { + TupleTableSlot *conflictslot; + + if (list_member_oid(recheckIndexes, uniqueidx) && + FindConflictTuple(resultRelInfo, estate, uniqueidx, slot, &conflictslot)) + { + RepOriginId origin; + TimestampTz committs; + TransactionId xmin; + + GetTupleCommitTs(conflictslot, &xmin, &origin, &committs); + ReportApplyConflict(ERROR, type, resultRelInfo->ri_RelationDesc, uniqueidx, + xmin, origin, committs, conflictslot); + } + } +} + +/* * Insert tuple represented in the slot to the relation, update the indexes, * and execute any constraints and per-row triggers. * @@ -509,6 +625,8 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, if (!skip_tuple) { List *recheckIndexes = NIL; + List *conflictindexes; + bool conflict = false; /* Compute stored generated columns */ if (rel->rd_att->constr && @@ -525,10 +643,17 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, /* OK, store the tuple and create index entries for it */ simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot); + conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes; + if (resultRelInfo->ri_NumIndices > 0) recheckIndexes = ExecInsertIndexTuples(resultRelInfo, - slot, estate, false, false, - NULL, NIL, false); + slot, estate, false, + conflictindexes, &conflict, + conflictindexes, false); + + if (conflict) + ReCheckConflictIndexes(resultRelInfo, estate, CT_INSERT_EXISTS, + recheckIndexes, slot); /* AFTER ROW INSERT Triggers */ ExecARInsertTriggers(estate, resultRelInfo, slot, @@ -577,6 +702,8 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, { List *recheckIndexes = NIL; TU_UpdateIndexes update_indexes; + List *conflictindexes; + bool conflict = false; /* Compute stored generated columns */ if (rel->rd_att->constr && @@ -593,12 +720,19 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, simple_table_tuple_update(rel, tid, slot, estate->es_snapshot, &update_indexes); + conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes; + if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None)) recheckIndexes = ExecInsertIndexTuples(resultRelInfo, - slot, estate, true, false, - NULL, NIL, + slot, estate, true, + conflictindexes, + &conflict, conflictindexes, (update_indexes == TU_Summarizing)); + if (conflict) + ReCheckConflictIndexes(resultRelInfo, estate, CT_UPDATE_EXISTS, + recheckIndexes, slot); + /* AFTER ROW UPDATE Triggers */ ExecARUpdateTriggers(estate, resultRelInfo, NULL, NULL, diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index ba03eef..1e08bbb 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -16,6 +16,7 @@ override CPPFLAGS := -I$(srcdir) $(CPPFLAGS) OBJS = \ applyparallelworker.o \ + conflict.o \ decode.o \ launcher.o \ logical.o \ diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c new file mode 100644 index 0000000..4918011 --- /dev/null +++ b/src/backend/replication/logical/conflict.c @@ -0,0 +1,193 @@ +/*------------------------------------------------------------------------- + * conflict.c + * Functionality for detecting and logging conflicts. + * + * Copyright (c) 2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/conflict.c + * + * This file contains the code for detecting and logging conflicts on + * the subscriber during logical replication. + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/commit_ts.h" +#include "replication/conflict.h" +#include "replication/origin.h" +#include "utils/lsyscache.h" +#include "utils/rel.h" + +const char *const ConflictTypeNames[] = { + [CT_INSERT_EXISTS] = "insert_exists", + [CT_UPDATE_EXISTS] = "update_exists", + [CT_UPDATE_DIFFER] = "update_differ", + [CT_UPDATE_MISSING] = "update_missing", + [CT_DELETE_MISSING] = "delete_missing", + [CT_DELETE_DIFFER] = "delete_differ" +}; + +static char *build_index_value_desc(Oid indexoid, TupleTableSlot *conflictslot); +static int errdetail_apply_conflict(ConflictType type, Oid conflictidx, + TransactionId localxmin, + RepOriginId localorigin, + TimestampTz localts, + TupleTableSlot *conflictslot); + +/* + * Get the xmin and commit timestamp data (origin and timestamp) associated + * with the provided local tuple. + * + * Return true if the commit timestamp data was found, false otherwise. + */ +bool +GetTupleCommitTs(TupleTableSlot *localslot, TransactionId *xmin, + RepOriginId *localorigin, TimestampTz *localts) +{ + Datum xminDatum; + bool isnull; + + xminDatum = slot_getsysattr(localslot, MinTransactionIdAttributeNumber, + &isnull); + *xmin = DatumGetTransactionId(xminDatum); + Assert(!isnull); + + /* + * The commit timestamp data is not available if track_commit_timestamp is + * disabled. + */ + if (!track_commit_timestamp) + { + *localorigin = InvalidRepOriginId; + *localts = 0; + return false; + } + + return TransactionIdGetCommitTsData(*xmin, localts, localorigin); +} + +/* + * Report a conflict when applying remote changes. + */ +void +ReportApplyConflict(int elevel, ConflictType type, Relation localrel, + Oid conflictidx, TransactionId localxmin, + RepOriginId localorigin, TimestampTz localts, + TupleTableSlot *conflictslot) +{ + ereport(elevel, + errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION), + errmsg("conflict %s detected on relation \"%s.%s\"", + ConflictTypeNames[type], + get_namespace_name(RelationGetNamespace(localrel)), + RelationGetRelationName(localrel)), + errdetail_apply_conflict(type, conflictidx, localxmin, localorigin, + localts, conflictslot)); +} + +/* + * Find all unique indexes to check for a conflict and store them into + * ResultRelInfo. + */ +void +InitConflictIndexes(ResultRelInfo *relInfo) +{ + List *uniqueIndexes = NIL; + + for (int i = 0; i < relInfo->ri_NumIndices; i++) + { + Relation indexRelation = relInfo->ri_IndexRelationDescs[i]; + + if (indexRelation == NULL) + continue; + + /* Detect conflict only for unique indexes */ + if (!relInfo->ri_IndexRelationInfo[i]->ii_Unique) + continue; + + /* Don't support conflict detection for deferrable index */ + if (!indexRelation->rd_index->indimmediate) + continue; + + uniqueIndexes = lappend_oid(uniqueIndexes, + RelationGetRelid(indexRelation)); + } + + relInfo->ri_onConflictArbiterIndexes = uniqueIndexes; +} + +/* + * Add an errdetail() line showing conflict detail. + */ +static int +errdetail_apply_conflict(ConflictType type, Oid conflictidx, + TransactionId localxmin, RepOriginId localorigin, + TimestampTz localts, TupleTableSlot *conflictslot) +{ + switch (type) + { + case CT_INSERT_EXISTS: + case CT_UPDATE_EXISTS: + { + /* + * Bulid the index value string. If the return value is NULL, + * it indicates that the current user lacks permissions to + * view all the columns involved. + */ + char *index_value = build_index_value_desc(conflictidx, + conflictslot); + + if (index_value && localts) + return errdetail("Key %s already exists in unique index \"%s\", which was modified by origin %u in transaction %u at %s.", + index_value, get_rel_name(conflictidx), localorigin, + localxmin, timestamptz_to_str(localts)); + else if (index_value && !localts) + return errdetail("Key %s already exists in unique index \"%s\", which was modified in transaction %u.", + index_value, get_rel_name(conflictidx), localxmin); + else + return errdetail("Key already exists in unique index \"%s\".", + get_rel_name(conflictidx)); + } + case CT_UPDATE_DIFFER: + return errdetail("Updating a row that was modified by a different origin %u in transaction %u at %s.", + localorigin, localxmin, timestamptz_to_str(localts)); + case CT_UPDATE_MISSING: + return errdetail("Did not find the row to be updated."); + case CT_DELETE_MISSING: + return errdetail("Did not find the row to be deleted."); + case CT_DELETE_DIFFER: + return errdetail("Deleting a row that was modified by a different origin %u in transaction %u at %s.", + localorigin, localxmin, timestamptz_to_str(localts)); + } + + return 0; /* silence compiler warning */ +} + +/* + * Helper functions to construct a string describing the contents of an index + * entry. See BuildIndexValueDescription for details. + */ +static char * +build_index_value_desc(Oid indexoid, TupleTableSlot *conflictslot) +{ + char *conflict_row; + Relation indexDesc; + + if (!conflictslot) + return NULL; + + /* Assume the index has been locked */ + indexDesc = index_open(indexoid, NoLock); + + slot_getallattrs(conflictslot); + + conflict_row = BuildIndexValueDescription(indexDesc, + conflictslot->tts_values, + conflictslot->tts_isnull); + + index_close(indexDesc, NoLock); + + return conflict_row; +} diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index 3dec36a..3d36249 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -2,6 +2,7 @@ backend_sources += files( 'applyparallelworker.c', + 'conflict.c', 'decode.c', 'launcher.c', 'logical.c', diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ec96b5f..fc3f80e 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -167,6 +167,7 @@ #include "postmaster/bgworker.h" #include "postmaster/interrupt.h" #include "postmaster/walwriter.h" +#include "replication/conflict.h" #include "replication/logicallauncher.h" #include "replication/logicalproto.h" #include "replication/logicalrelation.h" @@ -2458,7 +2459,10 @@ apply_handle_insert_internal(ApplyExecutionData *edata, EState *estate = edata->estate; /* We must open indexes here. */ - ExecOpenIndices(relinfo, false); + ExecOpenIndices(relinfo, MySubscription->detectconflict); + + if (MySubscription->detectconflict) + InitConflictIndexes(relinfo); /* Do the insert. */ TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT); @@ -2646,7 +2650,7 @@ apply_handle_update_internal(ApplyExecutionData *edata, MemoryContext oldctx; EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); - ExecOpenIndices(relinfo, false); + ExecOpenIndices(relinfo, MySubscription->detectconflict); found = FindReplTupleInLocalRel(edata, localrel, &relmapentry->remoterel, @@ -2661,6 +2665,20 @@ apply_handle_update_internal(ApplyExecutionData *edata, */ if (found) { + RepOriginId localorigin; + TransactionId localxmin; + TimestampTz localts; + + /* + * If conflict detection is enabled, check whether the local tuple was + * modified by a different origin. If detected, report the conflict. + */ + if (MySubscription->detectconflict && + GetTupleCommitTs(localslot, &localxmin, &localorigin, &localts) && + localorigin != replorigin_session_origin) + ReportApplyConflict(LOG, CT_UPDATE_DIFFER, localrel, InvalidOid, + localxmin, localorigin, localts, NULL); + /* Process and store remote tuple in the slot */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); slot_modify_data(remoteslot, localslot, relmapentry, newtup); @@ -2668,6 +2686,9 @@ apply_handle_update_internal(ApplyExecutionData *edata, EvalPlanQualSetSlot(&epqstate, remoteslot); + if (MySubscription->detectconflict) + InitConflictIndexes(relinfo); + /* Do the actual update. */ TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE); ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot, @@ -2678,13 +2699,10 @@ apply_handle_update_internal(ApplyExecutionData *edata, /* * The tuple to be updated could not be found. Do nothing except for * emitting a log message. - * - * XXX should this be promoted to ereport(LOG) perhaps? */ - elog(DEBUG1, - "logical replication did not find row to be updated " - "in replication target relation \"%s\"", - RelationGetRelationName(localrel)); + if (MySubscription->detectconflict) + ReportApplyConflict(LOG, CT_UPDATE_MISSING, localrel, InvalidOid, + InvalidTransactionId, InvalidRepOriginId, 0, NULL); } /* Cleanup. */ @@ -2807,6 +2825,20 @@ apply_handle_delete_internal(ApplyExecutionData *edata, /* If found delete it. */ if (found) { + RepOriginId localorigin; + TransactionId localxmin; + TimestampTz localts; + + /* + * If conflict detection is enabled, check whether the local tuple was + * modified by a different origin. If detected, report the conflict. + */ + if (MySubscription->detectconflict && + GetTupleCommitTs(localslot, &localxmin, &localorigin, &localts) && + localorigin != replorigin_session_origin) + ReportApplyConflict(LOG, CT_DELETE_DIFFER, localrel, InvalidOid, + localxmin, localorigin, localts, NULL); + EvalPlanQualSetSlot(&epqstate, localslot); /* Do the actual delete. */ @@ -2818,13 +2850,10 @@ apply_handle_delete_internal(ApplyExecutionData *edata, /* * The tuple to be deleted could not be found. Do nothing except for * emitting a log message. - * - * XXX should this be promoted to ereport(LOG) perhaps? */ - elog(DEBUG1, - "logical replication did not find row to be deleted " - "in replication target relation \"%s\"", - RelationGetRelationName(localrel)); + if (MySubscription->detectconflict) + ReportApplyConflict(LOG, CT_DELETE_MISSING, localrel, InvalidOid, + InvalidTransactionId, InvalidRepOriginId, 0, NULL); } /* Cleanup. */ @@ -2991,6 +3020,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, ResultRelInfo *partrelinfo_new; Relation partrel_new; bool found; + RepOriginId localorigin; + TransactionId localxmin; + TimestampTz localts; /* Get the matching local tuple from the partition. */ found = FindReplTupleInLocalRel(edata, partrel, @@ -3002,17 +3034,29 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, /* * The tuple to be updated could not be found. Do nothing * except for emitting a log message. - * - * XXX should this be promoted to ereport(LOG) perhaps? */ - elog(DEBUG1, - "logical replication did not find row to be updated " - "in replication target relation's partition \"%s\"", - RelationGetRelationName(partrel)); + if (MySubscription->detectconflict) + ReportApplyConflict(LOG, CT_UPDATE_MISSING, + partrel, InvalidOid, + InvalidTransactionId, + InvalidRepOriginId, 0, NULL); + return; } /* + * If conflict detection is enabled, check whether the local + * tuple was modified by a different origin. If detected, + * report the conflict. + */ + if (MySubscription->detectconflict && + GetTupleCommitTs(localslot, &localxmin, &localorigin, &localts) && + localorigin != replorigin_session_origin) + ReportApplyConflict(LOG, CT_UPDATE_DIFFER, partrel, + InvalidOid, localxmin, localorigin, + localts, NULL); + + /* * Apply the update to the local tuple, putting the result in * remoteslot_part. */ diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 2b02148..0f72d28 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4760,6 +4760,7 @@ getSubscriptions(Archive *fout) int i_suboriginremotelsn; int i_subenabled; int i_subfailover; + int i_subdetectconflict; int i, ntups; @@ -4832,11 +4833,17 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 170000) appendPQExpBufferStr(query, - " s.subfailover\n"); + " s.subfailover,\n"); else appendPQExpBuffer(query, - " false AS subfailover\n"); + " false AS subfailover,\n"); + if (fout->remoteVersion >= 170000) + appendPQExpBufferStr(query, + " s.subdetectconflict\n"); + else + appendPQExpBuffer(query, + " false AS subdetectconflict\n"); appendPQExpBufferStr(query, "FROM pg_subscription s\n"); @@ -4875,6 +4882,7 @@ getSubscriptions(Archive *fout) i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn"); i_subenabled = PQfnumber(res, "subenabled"); i_subfailover = PQfnumber(res, "subfailover"); + i_subdetectconflict = PQfnumber(res, "subdetectconflict"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4921,6 +4929,8 @@ getSubscriptions(Archive *fout) pg_strdup(PQgetvalue(res, i, i_subenabled)); subinfo[i].subfailover = pg_strdup(PQgetvalue(res, i, i_subfailover)); + subinfo[i].subdetectconflict = + pg_strdup(PQgetvalue(res, i, i_subdetectconflict)); /* Decide whether we want to dump it */ selectDumpableObject(&(subinfo[i].dobj), fout); @@ -5161,6 +5171,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (strcmp(subinfo->subfailover, "t") == 0) appendPQExpBufferStr(query, ", failover = true"); + if (strcmp(subinfo->subdetectconflict, "t") == 0) + appendPQExpBufferStr(query, ", detect_conflict = true"); + if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 4b2e587..bbd7cbe 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -671,6 +671,7 @@ typedef struct _SubscriptionInfo char *suborigin; char *suboriginremotelsn; char *subfailover; + char *subdetectconflict; } SubscriptionInfo; /* diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 7c9a1f2..fef1ad0 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6539,7 +6539,7 @@ describeSubscriptions(const char *pattern, bool verbose) printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false, false, false, false, false, false, - false}; + false, false}; if (pset.sversion < 100000) { @@ -6607,6 +6607,10 @@ describeSubscriptions(const char *pattern, bool verbose) appendPQExpBuffer(&buf, ", subfailover AS \"%s\"\n", gettext_noop("Failover")); + if (pset.sversion >= 170000) + appendPQExpBuffer(&buf, + ", subdetectconflict AS \"%s\"\n", + gettext_noop("Detect conflict")); appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 0244694..1c416cf 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1946,9 +1946,10 @@ psql_completion(const char *text, int start, int end) COMPLETE_WITH("(", "PUBLICATION"); /* ALTER SUBSCRIPTION SET ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "(")) - COMPLETE_WITH("binary", "disable_on_error", "failover", "origin", - "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit", "two_phase"); + COMPLETE_WITH("binary", "detect_conflict", "disable_on_error", + "failover", "origin", "password_required", + "run_as_owner", "slot_name", "streaming", + "synchronous_commit", "two_phase"); /* ALTER SUBSCRIPTION SKIP ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "(")) COMPLETE_WITH("lsn"); @@ -3357,9 +3358,10 @@ psql_completion(const char *text, int start, int end) /* Complete "CREATE SUBSCRIPTION ... WITH ( " */ else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", - "disable_on_error", "enabled", "failover", "origin", - "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit", "two_phase"); + "detect_conflict", "disable_on_error", "enabled", + "failover", "origin", "password_required", + "run_as_owner", "slot_name", "streaming", + "synchronous_commit", "two_phase"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 0aa14ec..17daf11 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -98,6 +98,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW * slots) in the upstream database are enabled * to be synchronized to the standbys. */ + bool subdetectconflict; /* True if replication should perform + * conflict detection */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -151,6 +154,7 @@ typedef struct Subscription * (i.e. the main slot and the table sync * slots) in the upstream database are enabled * to be synchronized to the standbys. */ + bool detectconflict; /* True if conflict detection is enabled */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h new file mode 100644 index 0000000..3a7260d --- /dev/null +++ b/src/include/replication/conflict.h @@ -0,0 +1,50 @@ +/*------------------------------------------------------------------------- + * conflict.h + * Exports for conflict detection and log + * + * Copyright (c) 2012-2024, PostgreSQL Global Development Group + * + *------------------------------------------------------------------------- + */ +#ifndef CONFLICT_H +#define CONFLICT_H + +#include "access/xlogdefs.h" +#include "executor/tuptable.h" +#include "nodes/execnodes.h" +#include "utils/relcache.h" +#include "utils/timestamp.h" + +/* + * Conflict types that could be encountered when applying remote changes. + */ +typedef enum +{ + /* The row to be inserted violates unique constraint */ + CT_INSERT_EXISTS, + + /* The updated row value violates unique constraint */ + CT_UPDATE_EXISTS, + + /* The row to be updated was modified by a different origin */ + CT_UPDATE_DIFFER, + + /* The row to be updated is missing */ + CT_UPDATE_MISSING, + + /* The row to be deleted is missing */ + CT_DELETE_MISSING, + + /* The row to be deleted was modified by a different origin */ + CT_DELETE_DIFFER, +} ConflictType; + +extern bool GetTupleCommitTs(TupleTableSlot *localslot, TransactionId *xmin, + RepOriginId *localorigin, TimestampTz *localts); +extern void ReportApplyConflict(int elevel, ConflictType type, + Relation localrel, Oid conflictidx, + TransactionId localxmin, RepOriginId localorigin, + TimestampTz localts, TupleTableSlot *conflictslot); +extern void InitConflictIndexes(ResultRelInfo *relInfo); + +#endif diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 17d48b1..cc9337c 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname'); ALTER SUBSCRIPTION regress_testsub SET (password_required = false); ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | t | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | t | f | f | off | dbname=regress_doesnotexist2 | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (password_required = true); @@ -176,10 +176,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/12345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/12345 (1 row) -- ok - with lsn = NONE @@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/0 (1 row) BEGIN; @@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | local | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+---------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | f | local | dbname=regress_doesnotexist2 | 0/0 (1 row) -- rename back to keep the rest simple @@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication already exists @@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication used more than once @@ -332,10 +332,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -371,19 +371,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- we can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -393,10 +393,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -409,18 +409,42 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; +-- fail - detect_conflict must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = foo); +ERROR: detect_conflict requires a Boolean value +-- now it works +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = true); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | t | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (detect_conflict = false); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 007c9e7..5c740fd 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -287,6 +287,21 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- fail - detect_conflict must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = foo); + +-- now it works +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = true); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (detect_conflict = false); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; + -- let's do some tests with pg_create_subscription rather than superuser SET SESSION AUTHORIZATION regress_subscription_user3; diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index 471e981..78c0307 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -331,11 +331,12 @@ is( $result, qq(1|bar 2|baz), 'update works with REPLICA IDENTITY FULL and a primary key'); -# Check that subscriber handles cases where update/delete target tuple -# is missing. We have to look for the DEBUG1 log messages about that, -# so temporarily bump up the log verbosity. -$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1"); -$node_subscriber->reload; +# To check that subscriber handles cases where update/delete target tuple +# is missing, detect_conflict is temporarily enabled to log conflicts +# related to missing tuples. +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET (detect_conflict = true)" +); $node_subscriber->safe_psql('postgres', "DELETE FROM tab_full_pk"); @@ -352,10 +353,10 @@ $node_publisher->wait_for_catchup('tap_sub'); my $logfile = slurp_file($node_subscriber->logfile, $log_location); ok( $logfile =~ - qr/logical replication did not find row to be updated in replication target relation "tab_full_pk"/, + qr/conflict update_missing detected on relation "public.tab_full_pk".*\n.*DETAIL:.* Did not find the row to be updated./m, 'update target row is missing'); ok( $logfile =~ - qr/logical replication did not find row to be deleted in replication target relation "tab_full_pk"/, + qr/conflict delete_missing detected on relation "public.tab_full_pk".*\n.*DETAIL:.* Did not find the row to be deleted./m, 'delete target row is missing'); $node_subscriber->append_conf('postgresql.conf', diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl index 2958052..7a66a06 100644 --- a/src/test/subscription/t/013_partition.pl +++ b/src/test/subscription/t/013_partition.pl @@ -343,12 +343,12 @@ $result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1 ORDER BY 1"); is($result, qq(), 'truncate of tab1 replicated'); -# Check that subscriber handles cases where update/delete target tuple -# is missing. We have to look for the DEBUG1 log messages about that, -# so temporarily bump up the log verbosity. -$node_subscriber1->append_conf('postgresql.conf', - "log_min_messages = debug1"); -$node_subscriber1->reload; +# To check that subscriber handles cases where update/delete target tuple +# is missing, detect_conflict is temporarily enabled to log conflicts +# related to missing tuples. +$node_subscriber1->safe_psql('postgres', + "ALTER SUBSCRIPTION sub1 SET (detect_conflict = true)" +); $node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (1, 'foo'), (4, 'bar'), (10, 'baz')"); @@ -372,21 +372,21 @@ $node_publisher->wait_for_catchup('sub2'); my $logfile = slurp_file($node_subscriber1->logfile(), $log_location); ok( $logfile =~ - qr/logical replication did not find row to be updated in replication target relation's partition "tab1_2_2"/, + qr/conflict update_missing detected on relation "public.tab1_2_2".*\n.*DETAIL:.* Did not find the row to be updated./, 'update target row is missing in tab1_2_2'); ok( $logfile =~ - qr/logical replication did not find row to be deleted in replication target relation "tab1_1"/, + qr/conflict delete_missing detected on relation "public.tab1_1".*\n.*DETAIL:.* Did not find the row to be deleted./, 'delete target row is missing in tab1_1'); ok( $logfile =~ - qr/logical replication did not find row to be deleted in replication target relation "tab1_2_2"/, + qr/conflict delete_missing detected on relation "public.tab1_2_2".*\n.*DETAIL:.* Did not find the row to be deleted./, 'delete target row is missing in tab1_2_2'); ok( $logfile =~ - qr/logical replication did not find row to be deleted in replication target relation "tab1_def"/, + qr/conflict delete_missing detected on relation "public.tab1_def".*\n.*DETAIL:.* Did not find the row to be deleted./, 'delete target row is missing in tab1_def'); -$node_subscriber1->append_conf('postgresql.conf', - "log_min_messages = warning"); -$node_subscriber1->reload; +$node_subscriber1->safe_psql('postgres', + "ALTER SUBSCRIPTION sub1 SET (detect_conflict = false)" +); # Tests for replication using root table identity and schema @@ -773,12 +773,12 @@ pub_tab2|3|yyy pub_tab2|5|zzz xxx_c|6|aaa), 'inserts into tab2 replicated'); -# Check that subscriber handles cases where update/delete target tuple -# is missing. We have to look for the DEBUG1 log messages about that, -# so temporarily bump up the log verbosity. -$node_subscriber1->append_conf('postgresql.conf', - "log_min_messages = debug1"); -$node_subscriber1->reload; +# To check that subscriber handles cases where update/delete target tuple +# is missing, detect_conflict is temporarily enabled to log conflicts +# related to missing tuples. +$node_subscriber1->safe_psql('postgres', + "ALTER SUBSCRIPTION sub_viaroot SET (detect_conflict = true)" +); $node_subscriber1->safe_psql('postgres', "DELETE FROM tab2"); @@ -796,15 +796,35 @@ $node_publisher->wait_for_catchup('sub2'); $logfile = slurp_file($node_subscriber1->logfile(), $log_location); ok( $logfile =~ - qr/logical replication did not find row to be updated in replication target relation's partition "tab2_1"/, + qr/conflict update_missing detected on relation "public.tab2_1".*\n.*DETAIL:.* Did not find the row to be updated./, 'update target row is missing in tab2_1'); ok( $logfile =~ - qr/logical replication did not find row to be deleted in replication target relation "tab2_1"/, + qr/conflict delete_missing detected on relation "public.tab2_1".*\n.*DETAIL:.* Did not find the row to be deleted./, 'delete target row is missing in tab2_1'); -$node_subscriber1->append_conf('postgresql.conf', - "log_min_messages = warning"); -$node_subscriber1->reload; +# Enable the track_commit_timestamp to detect the conflict when attempting +# to update a row that was previously modified by a different origin. +$node_subscriber1->append_conf('postgresql.conf', 'track_commit_timestamp = on'); +$node_subscriber1->restart; + +$node_subscriber1->safe_psql('postgres', "INSERT INTO tab2 VALUES (3, 'yyy')"); +$node_publisher->safe_psql('postgres', + "UPDATE tab2 SET b = 'quux' WHERE a = 3"); + +$node_publisher->wait_for_catchup('sub_viaroot'); + +$logfile = slurp_file($node_subscriber1->logfile(), $log_location); +ok( $logfile =~ + qr/Updating a row that was modified by a different origin [0-9]+ in transaction [0-9]+ at .*/, + 'updating a tuple that was modified by a different origin'); + +# The remaining tests no longer test conflict detection. +$node_subscriber1->safe_psql('postgres', + "ALTER SUBSCRIPTION sub_viaroot SET (detect_conflict = false)" +); + +$node_subscriber1->append_conf('postgresql.conf', 'track_commit_timestamp = off'); +$node_subscriber1->restart; # Test that replication continues to work correctly after altering the # partition of a partitioned target table. diff --git a/src/test/subscription/t/029_on_error.pl b/src/test/subscription/t/029_on_error.pl index 0ab57a4..496a3c6 100644 --- a/src/test/subscription/t/029_on_error.pl +++ b/src/test/subscription/t/029_on_error.pl @@ -30,7 +30,7 @@ sub test_skip_lsn # ERROR with its CONTEXT when retrieving this information. my $contents = slurp_file($node_subscriber->logfile, $offset); $contents =~ - qr/duplicate key value violates unique constraint "tbl_pkey".*\n.*DETAIL:.*\n.*CONTEXT:.* for replication target relation "public.tbl" in transaction \d+, finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/m + qr/conflict insert_exists detected on relation "public.tbl".*\n.*DETAIL:.* Key \(i\)=\(1\) already exists in unique index "tbl_pkey", which was modified by origin \d+ in transaction \d+ at .*\n.*CONTEXT:.* for replication target relation "public.tbl" in transaction \d+, finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/m or die "could not get error-LSN"; my $lsn = $1; @@ -83,6 +83,7 @@ $node_subscriber->append_conf( 'postgresql.conf', qq[ max_prepared_transactions = 10 +track_commit_timestamp = on ]); $node_subscriber->start; @@ -109,7 +110,7 @@ my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; $node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub FOR TABLE tbl"); $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true, streaming = on, two_phase = on)" + "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true, streaming = on, two_phase = on, detect_conflict = on)" ); # Initial synchronization failure causes the subscription to be disabled. diff --git a/src/test/subscription/t/030_origin.pl b/src/test/subscription/t/030_origin.pl index 056561f..8c929c0 100644 --- a/src/test/subscription/t/030_origin.pl +++ b/src/test/subscription/t/030_origin.pl @@ -27,9 +27,14 @@ my $stderr; my $node_A = PostgreSQL::Test::Cluster->new('node_A'); $node_A->init(allows_streaming => 'logical'); $node_A->start; + # node_B my $node_B = PostgreSQL::Test::Cluster->new('node_B'); $node_B->init(allows_streaming => 'logical'); + +# Enable the track_commit_timestamp to detect the conflict when attempting to +# update a row that was previously modified by a different origin. +$node_B->append_conf('postgresql.conf', 'track_commit_timestamp = on'); $node_B->start; # Create table on node_A @@ -140,6 +145,44 @@ is($result, qq(), ); ############################################################################### +# Check that the conflict can be detected when attempting to update or +# delete a row that was previously modified by a different source. +############################################################################### + +$node_B->safe_psql('postgres', + "ALTER SUBSCRIPTION $subname_BC SET (detect_conflict = true); + DELETE FROM tab;"); + +$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (32);"); + +# The update should update the row on node B that was inserted by node A. +$node_C->safe_psql('postgres', "UPDATE tab SET a = 33 WHERE a = 32;"); + +$node_B->wait_for_log( + qr/Updating a row that was modified by a different origin [0-9]+ in transaction [0-9]+ at .*/ +); + +$node_B->safe_psql('postgres', "DELETE FROM tab;"); +$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (33);"); + +# The delete should remove the row on node B that was inserted by node A. +$node_C->safe_psql('postgres', "DELETE FROM tab WHERE a = 33;"); + +$node_B->wait_for_log( + qr/Deleting a row that was modified by a different origin [0-9]+ in transaction [0-9]+ at .*/ +); + +$node_A->wait_for_catchup($subname_BA); +$node_B->wait_for_catchup($subname_AB); + +# The remaining tests no longer test conflict detection. +$node_B->safe_psql('postgres', + "ALTER SUBSCRIPTION $subname_BC SET (detect_conflict = false);"); + +$node_B->append_conf('postgresql.conf', 'track_commit_timestamp = off'); +$node_B->restart; + +############################################################################### # Specifying origin = NONE indicates that the publisher should only replicate the # changes that are generated locally from node_B, but in this case since the # node_B is also subscribing data from node_A, node_B can have remotely diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 3deb611..ae1db1a 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -467,6 +467,7 @@ ConditionVariableMinimallyPadded ConditionalStack ConfigData ConfigVariable +ConflictType ConnCacheEntry ConnCacheKey ConnParams -- 1.8.3.1