From cb9f46957122238076a28713ecfde1844e639e9c Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Wed, 31 Jul 2024 03:27:18 -0400 Subject: [PATCH v8 4/4] Manage Clock skew and implement last_update_wins This patch attempts to manage clock skew between nodes by introducing three new GUCs: a) max_logical_rep_clock_skew b) max_logical_rep_clock_skew_action c) max_logical_rep_clock_skew_wait If the timestamp of the currently replayed transaction is in the future compared to the current time on the subscriber and the difference is larger than 'max_logical_rep_clock_skew', then the action configured in 'max_logical_rep_clock_skew_action' is performed by the apply worker. If the user configures 'wait' in 'max_logical_rep_clock_skew_action' and the actual clock skew is large while 'max_logical_rep_clock_skew' is small, the apply worker may have to wait for a longer period to manage the clock skew. To control this maximum wait time, a new GUC, 'max_logical_rep_clock_skew_wait', is provided. This allows the user to set a cap on how long the apply worker should wait. If the computed wait time exceeds this value, the apply worker will error out without waiting. This patch also implements the last_update_wins resolver. Since conflict resolution for two phase commit transactions using prepare-timestamp can result in data divergence, this patch also restricts enabling two_phase and detect_conflict together for a subscription. 
--- src/backend/commands/subscriptioncmds.c | 45 ++++++ src/backend/executor/execReplication.c | 2 +- .../replication/logical/applyparallelworker.c | 26 +++- src/backend/replication/logical/conflict.c | 146 ++++++++++++++----- src/backend/replication/logical/origin.c | 1 + src/backend/replication/logical/worker.c | 155 +++++++++++++++++++-- src/backend/utils/activity/wait_event_names.txt | 1 + src/backend/utils/misc/guc_tables.c | 40 ++++++ src/backend/utils/misc/postgresql.conf.sample | 9 +- src/include/replication/conflict.h | 6 +- src/include/replication/logicalworker.h | 18 +++ src/include/replication/origin.h | 1 + src/include/replication/worker_internal.h | 2 +- src/include/utils/timestamp.h | 1 + src/test/regress/expected/subscription.out | 46 +++--- src/test/subscription/t/029_on_error.pl | 53 +++++-- src/test/subscription/t/034_conflict_resolver.pl | 116 +++++++++++++-- src/tools/pgindent/typedefs.list | 1 + 18 files changed, 576 insertions(+), 93 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5c18c45..5a0c8b0 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -14,6 +14,7 @@ #include "postgres.h" +#include "access/commit_ts.h" #include "access/htup_details.h" #include "access/table.h" #include "access/twophase.h" @@ -452,6 +453,22 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, "slot_name = NONE", "create_slot = false"))); } } + + /* + * Time based conflict resolution for two phase transactions can result in + * data divergence, so disallow enabling both together. 
+ */ + if (opts->detectconflict && + IsSet(opts->specified_opts, SUBOPT_DETECT_CONFLICT)) + { + if (opts->twophase && + IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT)) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + /*- translator: both %s are strings of the form "option = value" */ + errmsg("%s and %s are mutually exclusive options", + "detect_conflict = true", "two_phase = true"))); + } } /* @@ -719,6 +736,13 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\""); #endif + /* Warn if detect_conflict is enabled and track_commit_timestamp is off */ + if (opts.detectconflict && !track_commit_timestamp) + ereport(WARNING, + (errmsg("detect_conflict is enabled but \"%s\" is OFF, the last_update_wins resolution may not work", + "track_commit_timestamp"), + errhint("Enable \"%s\".", "track_commit_timestamp"))); + rel = table_open(SubscriptionRelationId, RowExclusiveLock); /* Check if name is used */ @@ -1454,6 +1478,27 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, values[Anum_pg_subscription_subdetectconflict - 1] = BoolGetDatum(opts.detectconflict); replaces[Anum_pg_subscription_subdetectconflict - 1] = true; + + /* + * Time based conflict resolution for two phase + * transactions can result in data divergence, so disallow + * enabling it when two_phase is enabled. + */ + if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot set %s for a subscription that has two_phase enabled", + "detect_conflict"))); + + /* + * Warn if detect_conflict is enabled and + * track_commit_timestamp is off. 
+ */ + if (opts.detectconflict && !track_commit_timestamp) + ereport(WARNING, + (errmsg("detect_conflict is enabled but \"%s\" is OFF, the last_update_wins resolution may not work", + "track_commit_timestamp"), + errhint("Enable \"%s\".", "track_commit_timestamp"))); } if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index fb96bb7..93ba364 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -669,7 +669,7 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, bool apply_remote = false; GetTupleCommitTs(*conflictslot, &xmin, &origin, &committs); - resolver = GetConflictResolver(rel, CT_INSERT_EXISTS, + resolver = GetConflictResolver(*conflictslot, rel, CT_INSERT_EXISTS, &apply_remote, NULL, subid); ReportApplyConflict(CT_INSERT_EXISTS, resolver, rel, diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index e7f7d4c..10c7ca9 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -312,6 +312,20 @@ pa_can_start(void) if (!AllTablesyncsReady()) return false; + /* + * Don't start a new parallel worker if user has either configured max + * clock skew or if conflict detection and resolution is ON. In both cases + * we need commit timestamp in the beginning. + * + * XXX: For conflict resolution case, see if we can reduce the scope of + * this restriction to only such cases where time-based resolvers are + * actually being used. 
+ */ + if ((max_logical_rep_clock_skew > LR_CLOCK_SKEW_DEFAULT) || + MySubscription->detectconflict) + return false; + + return true; } @@ -696,9 +710,19 @@ pa_process_spooled_messages_if_required(void) } else if (fileset_state == FS_READY) { + /* + * Currently we do not support starting parallel apply worker when + * either clock skew is configured or conflict resolution is + * configured, thus it is okay to pass 0 as origin-timestamp here. + * + * XXX: If in future, we support starting pa worker even with conflict + * detection enabled, then here we need to pass remote's + * commit/prepare/abort timestamp; we can get that info from leader + * worker in shared memory. + */ apply_spooled_messages(&MyParallelShared->fileset, MyParallelShared->xid, - InvalidXLogRecPtr); + InvalidXLogRecPtr, 0); pa_set_fileset_state(MyParallelShared, FS_EMPTY); } diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 9ac707c..7af07b8 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -50,6 +50,7 @@ const char *const ConflictTypeNames[] = { const char *const ConflictResolverNames[] = { [CR_REMOTE_APPLY] = "remote_apply", [CR_KEEP_LOCAL] = "keep_local", + [CR_LAST_UPDATE_WINS] = "last_update_wins", [CR_APPLY_OR_SKIP] = "apply_or_skip", [CR_APPLY_OR_ERROR] = "apply_or_error", [CR_SKIP] = "skip", @@ -75,24 +76,24 @@ const char *const ConflictResolverNames[] = { * friendly name for a resolver and thus has been added here. 
*/ const int ConflictTypeResolverMap[][CONFLICT_TYPE_MAX_RESOLVERS] = { - [CT_INSERT_EXISTS] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_ERROR}, - [CT_UPDATE_EXISTS] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_ERROR}, - [CT_UPDATE_DIFFER] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_ERROR}, + [CT_INSERT_EXISTS] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_LAST_UPDATE_WINS, CR_ERROR}, + [CT_UPDATE_EXISTS] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_LAST_UPDATE_WINS, CR_ERROR}, + [CT_UPDATE_DIFFER] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_LAST_UPDATE_WINS, CR_ERROR}, [CT_UPDATE_MISSING] = {CR_APPLY_OR_SKIP, CR_APPLY_OR_ERROR, CR_SKIP, CR_ERROR}, [CT_DELETE_MISSING] = {CR_SKIP, CR_ERROR}, - [CT_DELETE_DIFFER] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_ERROR} + [CT_DELETE_DIFFER] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_LAST_UPDATE_WINS, CR_ERROR} }; /* * Default conflict resolver for each conflict type. */ const int ConflictTypeDefaultResolvers[] = { - [CT_INSERT_EXISTS] = CR_REMOTE_APPLY, - [CT_UPDATE_EXISTS] = CR_REMOTE_APPLY, - [CT_UPDATE_DIFFER] = CR_REMOTE_APPLY, + [CT_INSERT_EXISTS] = CR_LAST_UPDATE_WINS, + [CT_UPDATE_EXISTS] = CR_LAST_UPDATE_WINS, + [CT_UPDATE_DIFFER] = CR_LAST_UPDATE_WINS, [CT_UPDATE_MISSING] = CR_APPLY_OR_SKIP, [CT_DELETE_MISSING] = CR_SKIP, - [CT_DELETE_DIFFER] = CR_REMOTE_APPLY + [CT_DELETE_DIFFER] = CR_LAST_UPDATE_WINS }; @@ -207,6 +208,12 @@ errdetail_apply_conflict(ConflictType type, ConflictResolver resolver, TupleTableSlot *conflictslot, bool apply_remote) { char *applymsg; + int errdet = 0; + char *local_ts; + char *remote_ts; + + local_ts = pstrdup(timestamptz_to_str(localts)); + remote_ts = pstrdup(timestamptz_to_str(replorigin_session_origin_timestamp)); if (apply_remote) applymsg = "applying the remote changes."; @@ -229,43 +236,63 @@ errdetail_apply_conflict(ConflictType type, ConflictResolver resolver, if (resolver == CR_ERROR) { if (index_value && localts) - return errdetail("Key %s already exists in unique index \"%s\", which was modified by origin %u in transaction %u 
at %s.", - index_value, get_rel_name(conflictidx), localorigin, - localxmin, timestamptz_to_str(localts)); + errdet = errdetail("Key %s already exists in unique index \"%s\", which was modified by origin %u in transaction %u at %s.", + index_value, get_rel_name(conflictidx), + localorigin, localxmin, local_ts); else if (index_value && !localts) - return errdetail("Key %s already exists in unique index \"%s\", which was modified in transaction %u.", - index_value, get_rel_name(conflictidx), localxmin); + errdet = errdetail("Key %s already exists in unique index \"%s\", which was modified in transaction %u.", + index_value, get_rel_name(conflictidx), localxmin); else - return errdetail("Key already exists in unique index \"%s\".", - get_rel_name(conflictidx)); + errdet = errdetail("Key already exists in unique index \"%s\".", + get_rel_name(conflictidx)); } + else if (resolver == CR_LAST_UPDATE_WINS) + errdet = errdetail("Key already exists, %s. The local tuple : origin=%u, timestamp=%s; The remote tuple : origin=%u, timestamp=%s.", + applymsg, localorigin, local_ts, + replorigin_session_origin, remote_ts); else - return errdetail("Key already exists, %s", applymsg); + errdet = errdetail("Key already exists, %s", applymsg); } + break; case CT_UPDATE_DIFFER: - return errdetail("Updating a row that was modified by a different origin %u in transaction %u at %s, %s", - localorigin, localxmin, - timestamptz_to_str(localts), applymsg); + if (resolver == CR_LAST_UPDATE_WINS) + errdet = errdetail("Updating a row that was modified by a different origin %u in transaction %u at %s, %s The remote tuple : origin=%u, timestamp=%s.", + localorigin, localxmin, local_ts, applymsg, + replorigin_session_origin, remote_ts); + else + errdet = errdetail("Updating a row that was modified by a different origin %u in transaction %u at %s, %s", + localorigin, localxmin, local_ts, applymsg); + break; case CT_UPDATE_MISSING: if (resolver == CR_APPLY_OR_SKIP && !apply_remote) - return 
errdetail("Did not find the row to be updated. UPDATE can not be converted to INSERT, hence SKIP the update."); + errdet = errdetail("Did not find the row to be updated. UPDATE can not be converted to INSERT, hence SKIP the update."); else if (resolver == CR_APPLY_OR_ERROR && !apply_remote) - return errdetail("Did not find the row to be updated. UPDATE can not be converted to INSERT, hence ERROR out."); + errdet = errdetail("Did not find the row to be updated. UPDATE can not be converted to INSERT, hence ERROR out."); else if (apply_remote) - return errdetail("Did not find the row to be updated. Convert UPDATE to INSERT and %s", - applymsg); + errdet = errdetail("Did not find the row to be updated. Convert UPDATE to INSERT and %s", + applymsg); else - return errdetail("Did not find the row to be updated, %s", - applymsg); + errdet = errdetail("Did not find the row to be updated, %s", + applymsg); + break; case CT_DELETE_MISSING: - return errdetail("Did not find the row to be deleted."); + errdet = errdetail("Did not find the row to be deleted."); + break; case CT_DELETE_DIFFER: - return errdetail("Deleting a row that was modified by a different origin %u in transaction %u at %s, %s", - localorigin, localxmin, - timestamptz_to_str(localts), applymsg); + if (resolver == CR_LAST_UPDATE_WINS) + errdet = errdetail("Deleting a row that was modified by a different origin %u in transaction %u at %s, %s The remote tuple : origin=%u, timestamp=%s.", + localorigin, localxmin, local_ts, applymsg, + replorigin_session_origin, remote_ts); + else + errdet = errdetail("Deleting a row that was modified by a different origin %u in transaction %u at %s, %s", + localorigin, localxmin, local_ts, applymsg); + break; } - return 0; /* silence compiler warning */ + pfree(local_ts); + pfree(remote_ts); + + return errdet; } /* @@ -375,6 +402,15 @@ validate_conflict_type_and_resolver(const char *conflict_type, errmsg("%s is not a valid conflict resolver for conflict type %s", 
conflict_resolver, conflict_type)); + + if ((resolver == CR_LAST_UPDATE_WINS) && !track_commit_timestamp) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("resolver %s requires \"%s\" to be enabled", + conflict_resolver, "track_commit_timestamp"), + errhint("Make sure the configuration parameter \"%s\" is set.", + "track_commit_timestamp")); + return type; } @@ -420,6 +456,42 @@ get_conflict_resolver_internal(ConflictType type, Oid subid) } /* + * Compare the timestamps of given local tuple and the remote tuple to + * resolve the conflict. + * + * Returns true if remote tuple has the latest timestamp, false otherwise. + */ +static bool +resolve_by_timestamp(TupleTableSlot *localslot) +{ + TransactionId local_xmin; + TimestampTz local_ts; + RepOriginId local_origin; + int ts_cmp; + uint64 local_system_identifier; + + /* Get origin and timestamp info of the local tuple */ + GetTupleCommitTs(localslot, &local_xmin, &local_origin, &local_ts); + + /* Compare the timestamps of remote & local tuple to decide the winner */ + ts_cmp = timestamptz_cmp_internal(replorigin_session_origin_timestamp, + local_ts); + + if (ts_cmp == 0) + { + elog(LOG, "Timestamps of remote and local tuple are equal, comparing remote and local system identifiers"); + + /* Get current system's identifier */ + local_system_identifier = GetSystemIdentifier(); + + return local_system_identifier <= replorigin_session_origin_sysid; + } + else + return (ts_cmp > 0); + +} + +/* * Check if a full tuple can be created from the new tuple. * Return true if yes, false otherwise. */ @@ -610,7 +682,8 @@ RemoveSubscriptionConflictBySubid(Oid subid) * false otherwise. 
*/ ConflictResolver -GetConflictResolver(Relation localrel, ConflictType type, bool *apply_remote, +GetConflictResolver(TupleTableSlot *localslot, Relation localrel, + ConflictType type, bool *apply_remote, LogicalRepTupleData *newtup, Oid subid) { ConflictResolver resolver; @@ -619,6 +692,17 @@ GetConflictResolver(Relation localrel, ConflictType type, bool *apply_remote, switch (resolver) { + case CR_LAST_UPDATE_WINS: + if (!track_commit_timestamp) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("resolver %s requires \"%s\" to be enabled", + ConflictResolverNames[resolver], "track_commit_timestamp"), + errhint("Make sure the configuration parameter \"%s\" is set.", + "track_commit_timestamp")); + else + *apply_remote = resolve_by_timestamp(localslot); + break; case CR_REMOTE_APPLY: *apply_remote = true; break; diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index 419e481..bd8e6f0 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -155,6 +155,7 @@ typedef struct ReplicationStateCtl RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */ XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr; TimestampTz replorigin_session_origin_timestamp = 0; +uint64 replorigin_session_origin_sysid = 0; /* * Base address into a shared memory array of replication states of size diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 36172b2..5b3b89e 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -319,6 +319,20 @@ static uint32 parallel_stream_nchanges = 0; bool InitializingApplyWorker = false; /* + * GUC support + */ +const struct config_enum_entry logical_rep_clock_skew_action_options[] = { + {"error", LR_CLOCK_SKEW_ACTION_ERROR, false}, + {"wait", LR_CLOCK_SKEW_ACTION_WAIT, false}, + {NULL, 0, false} +}; + +/* GUCs */ +int 
max_logical_rep_clock_skew = LR_CLOCK_SKEW_DEFAULT; +int max_logical_rep_clock_skew_action = LR_CLOCK_SKEW_ACTION_ERROR; +int max_logical_rep_clock_skew_wait = 300; /* 5 mins */ + +/* * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for * the subscription if the remote transaction's finish LSN matches the subskiplsn. * Once we start skipping changes, we don't stop it until we skip all changes of @@ -985,6 +999,95 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, } /* + * Manage clock skew between nodes. + * + * It checks if the remote timestamp is ahead of the local clock + * and if the difference exceeds max_logical_rep_clock_skew, it performs + * the action specified by the max_logical_rep_clock_skew_action. + */ +static void +manage_clock_skew(TimestampTz origin_timestamp) +{ + TimestampTz current; + TimestampTz delayUntil; + long msecs; + int rc; + + /* nothing to do if no max clock skew configured */ + if (max_logical_rep_clock_skew == LR_CLOCK_SKEW_DEFAULT) + return; + + current = GetCurrentTimestamp(); + + /* + * If the timestamp of the currently replayed transaction is in the future + * compared to the current time on the subscriber and the difference is + * larger than max_logical_rep_clock_skew, then perform the action + * specified by the max_logical_rep_clock_skew_action setting. 
+ */ + if (origin_timestamp > current && + TimestampDifferenceExceeds(current, origin_timestamp, + max_logical_rep_clock_skew * 1000)) + { + if (max_logical_rep_clock_skew_action == LR_CLOCK_SKEW_ACTION_ERROR) + ereport(ERROR, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg_internal("clock skew exceeds max_logical_rep_clock_skew (%d seconds)", + max_logical_rep_clock_skew))); + + /* Perform the wait */ + while (true) + { + delayUntil = + TimestampTzMinusSeconds(origin_timestamp, + max_logical_rep_clock_skew); + + /* Exit without waiting if it's already past 'delayUntil' time */ + msecs = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), + delayUntil); + if (msecs <= 0) + break; + + /* The wait time should not exceed max_logical_rep_clock_skew_wait */ + if (msecs > (max_logical_rep_clock_skew_wait * 1000L)) + ereport(ERROR, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg_internal("clock skew wait time exceeds max_logical_rep_clock_skew_wait (%d seconds)", + max_logical_rep_clock_skew_wait))); + + elog(DEBUG2, "delaying apply for %ld milliseconds to manage clock skew", + msecs); + + /* Sleep until we are signaled or msecs have elapsed */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + msecs, + WAIT_EVENT_LOGICAL_CLOCK_SKEW); + + /* Exit the loop if msecs have elapsed */ + if (rc & WL_TIMEOUT) + break; + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + /* + * This might change max_logical_rep_clock_skew and + * max_logical_rep_clock_skew_wait. + */ + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } + } + } +} + +/* * Handle BEGIN message. 
*/ static void @@ -1005,6 +1108,15 @@ apply_handle_begin(StringInfo s) in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); + + /* Check if there is any clock skew and perform configured action */ + manage_clock_skew(begin_data.committime); + + /* + * Capture the commit timestamp of the remote transaction for time based + * conflict resolution purpose. + */ + replorigin_session_origin_timestamp = begin_data.committime; } /* @@ -1062,6 +1174,9 @@ apply_handle_begin_prepare(StringInfo s) in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); + + /* Check if there is any clock skew and perform configured action */ + manage_clock_skew(begin_data.prepare_time); } /* @@ -1302,7 +1417,8 @@ apply_handle_stream_prepare(StringInfo s) * spooled operations. */ apply_spooled_messages(MyLogicalRepWorker->stream_fileset, - prepare_data.xid, prepare_data.prepare_lsn); + prepare_data.xid, prepare_data.prepare_lsn, + prepare_data.prepare_time); /* Mark the transaction as prepared. */ apply_handle_prepare_internal(&prepare_data); @@ -1999,7 +2115,8 @@ ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno, */ void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, - XLogRecPtr lsn) + XLogRecPtr lsn, + TimestampTz origin_timestamp) { int nchanges; char path[MAXPGPATH]; @@ -2053,6 +2170,16 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, end_replication_step(); /* + * If origin_timestamp is provided by caller, then check clock skew with + * respect to the passed time and take configured action. + */ + if (origin_timestamp) + manage_clock_skew(origin_timestamp); + + /* Capture the timestamp (prepare or commit) of the remote transaction */ + replorigin_session_origin_timestamp = origin_timestamp; + + /* * Read the entries one by one and pass them through the same logic as in * apply_dispatch. */ @@ -2157,7 +2284,8 @@ apply_handle_stream_commit(StringInfo s) * spooled operations. 
*/ apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid, - commit_data.commit_lsn); + commit_data.commit_lsn, + commit_data.committime); apply_handle_commit_internal(&commit_data); @@ -2715,7 +2843,7 @@ apply_handle_update_internal(ApplyExecutionData *edata, GetTupleCommitTs(localslot, &localxmin, &localorigin, &localts) && localorigin != replorigin_session_origin) { - resolver = GetConflictResolver(localrel, CT_UPDATE_DIFFER, + resolver = GetConflictResolver(localslot, localrel, CT_UPDATE_DIFFER, &apply_remote, NULL, MyLogicalRepWorker->subid); @@ -2755,7 +2883,7 @@ apply_handle_update_internal(ApplyExecutionData *edata, */ if (MySubscription->detectconflict) { - resolver = GetConflictResolver(localrel, CT_UPDATE_MISSING, + resolver = GetConflictResolver(localslot, localrel, CT_UPDATE_MISSING, &apply_remote, newtup, MyLogicalRepWorker->subid); @@ -2909,7 +3037,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata, GetTupleCommitTs(localslot, &localxmin, &localorigin, &localts) && localorigin != replorigin_session_origin) { - resolver = GetConflictResolver(localrel, CT_DELETE_DIFFER, + resolver = GetConflictResolver(localslot, localrel, CT_DELETE_DIFFER, &apply_remote, NULL, MyLogicalRepWorker->subid); @@ -2936,7 +3064,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata, */ if (MySubscription->detectconflict) { - resolver = GetConflictResolver(localrel, CT_DELETE_MISSING, + resolver = GetConflictResolver(localslot, localrel, CT_DELETE_MISSING, &apply_remote, NULL, MyLogicalRepWorker->subid); @@ -3135,7 +3263,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, */ if (MySubscription->detectconflict) { - resolver = GetConflictResolver(partrel, CT_UPDATE_MISSING, + resolver = GetConflictResolver(localslot, partrel, CT_UPDATE_MISSING, &apply_remote, newtup, MyLogicalRepWorker->subid); @@ -3168,7 +3296,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, GetTupleCommitTs(localslot, &localxmin, &localorigin, &localts) && localorigin != 
replorigin_session_origin) { - resolver = GetConflictResolver(partrel, CT_UPDATE_DIFFER, + resolver = GetConflictResolver(localslot, partrel, CT_UPDATE_DIFFER, &apply_remote, NULL, MyLogicalRepWorker->subid); @@ -4671,6 +4799,7 @@ run_apply_worker() TimeLineID startpointTLI; char *err; bool must_use_password; + char *replorigin_sysid; slotname = MySubscription->slotname; @@ -4711,10 +4840,12 @@ run_apply_worker() MySubscription->name, err))); /* - * We don't really use the output identify_system for anything but it does - * some initializations on the upstream so let's still call it. + * Call identify_system to do some initializations on the upstream and + * store the output as system identifier of the replication origin node. */ - (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI); + replorigin_sysid = walrcv_identify_system(LogRepWorkerWalRcvConn, + &startpointTLI); + replorigin_session_origin_sysid = strtoul(replorigin_sysid, NULL, 10); set_apply_error_context_origin(originname); diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index db37bee..a51f821 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -59,6 +59,7 @@ CHECKPOINTER_MAIN "Waiting in main loop of checkpointer process." LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process." LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process." LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process." +LOGICAL_CLOCK_SKEW "Waiting in apply-begin of logical replication apply process to bring clock skew in permissible range." RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery." REPLICATION_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker." REPLICATION_SLOTSYNC_SHUTDOWN "Waiting for slot sync worker to shut down." 
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 6a623f5..c9fdfa1 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -68,6 +68,7 @@ #include "postmaster/walsummarizer.h" #include "postmaster/walwriter.h" #include "replication/logicallauncher.h" +#include "replication/logicalworker.h" #include "replication/slot.h" #include "replication/slotsync.h" #include "replication/syncrep.h" @@ -482,6 +483,7 @@ extern const struct config_enum_entry archive_mode_options[]; extern const struct config_enum_entry recovery_target_action_options[]; extern const struct config_enum_entry wal_sync_method_options[]; extern const struct config_enum_entry dynamic_shared_memory_options[]; +extern const struct config_enum_entry logical_rep_clock_skew_action_options[]; /* * GUC option variables that are exported from this module @@ -3651,6 +3653,33 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"max_logical_rep_clock_skew", PGC_SIGHUP, REPLICATION_SUBSCRIBERS, + gettext_noop("Sets maximum clock skew tolerance between logical " + "replication nodes beyond which action configured " + "in max_logical_rep_clock_skew_action is triggered."), + gettext_noop("-1 turns this check off."), + GUC_UNIT_S + }, + &max_logical_rep_clock_skew, + LR_CLOCK_SKEW_DEFAULT, LR_CLOCK_SKEW_DEFAULT, INT_MAX, + NULL, NULL, NULL + }, + + { + {"max_logical_rep_clock_skew_wait", PGC_SIGHUP, REPLICATION_SUBSCRIBERS, + gettext_noop("Sets max limit on how long apply worker shall wait to " + "bring clock skew within permissible range of max_logical_rep_clock_skew. 
" + "If the computed wait time is more than this value, " + "apply worker will error out without waiting."), + gettext_noop("0 turns this limit off."), + GUC_UNIT_S + }, + &max_logical_rep_clock_skew_wait, + 300, 0, 3600, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL @@ -4918,6 +4947,17 @@ struct config_enum ConfigureNamesEnum[] = }, { + {"max_logical_rep_clock_skew_action", PGC_POSTMASTER, REPLICATION_SUBSCRIBERS, + gettext_noop("Sets the action to perform if a clock skew higher " + "than max_logical_rep_clock_skew is detected."), + NULL + }, + &max_logical_rep_clock_skew_action, + LR_CLOCK_SKEW_ACTION_ERROR, logical_rep_clock_skew_action_options, + NULL, NULL, NULL + }, + + { {"track_functions", PGC_SUSET, STATS_CUMULATIVE, gettext_noop("Collects function-level statistics on database activity."), NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 9ec9f97..f7a664a 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -383,7 +383,14 @@ # (change requires restart) #max_sync_workers_per_subscription = 2 # taken from max_logical_replication_workers #max_parallel_apply_workers_per_subscription = 2 # taken from max_logical_replication_workers - +#max_logical_rep_clock_skew = -1 # maximum clock skew tolerance between logical + # replication nodes beyond which action configured in + # 'max_logical_rep_clock_skew_action' is triggered. +#max_logical_rep_clock_skew_action = error # error or wait + # (change requires restart) +#max_logical_rep_clock_skew_wait = 300 # max limit on how long apply worker + # shall wait to bring clock skew within permissible + # range of max_logical_rep_clock_skew. 
#------------------------------------------------------------------------------ # QUERY TUNING diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index 2793ad6..8791f1b 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -59,6 +59,9 @@ typedef enum ConflictResolver /* Keep the local change */ CR_KEEP_LOCAL, + /* Apply the change with latest timestamp */ + CR_LAST_UPDATE_WINS, + /* Apply the remote change; skip if it can not be applied */ CR_APPLY_OR_SKIP, @@ -98,7 +101,8 @@ extern void UpdateSubConflictResolvers(List *conflict_resolvers, Oid subid); extern ConflictType validate_conflict_type_and_resolver(const char *conflict_type, const char *conflict_resolver); extern void SetDefaultResolvers(ConflictTypeResolver * conflictResolvers); -extern ConflictResolver GetConflictResolver(Relation localrel, +extern ConflictResolver GetConflictResolver(TupleTableSlot *localslot, + Relation localrel, ConflictType type, bool *apply_remote, LogicalRepTupleData *newtup, diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index a18d79d..7cb0306 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -14,7 +14,25 @@ #include +/* + * The default for max_logical_rep_clock_skew is -1, which means ignore clock + * skew (the check is turned off). + */ +#define LR_CLOCK_SKEW_DEFAULT -1 + +/* + * Worker Clock Skew Action. 
+ */ +typedef enum +{ + LR_CLOCK_SKEW_ACTION_ERROR, + LR_CLOCK_SKEW_ACTION_WAIT, +} LogicalRepClockSkewAction; + extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending; +extern PGDLLIMPORT int max_logical_rep_clock_skew; +extern PGDLLIMPORT int max_logical_rep_clock_skew_action; +extern PGDLLIMPORT int max_logical_rep_clock_skew_wait; extern void ApplyWorkerMain(Datum main_arg); extern void ParallelApplyWorkerMain(Datum main_arg); diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h index 7189ba9..dcbbbdf 100644 --- a/src/include/replication/origin.h +++ b/src/include/replication/origin.h @@ -36,6 +36,7 @@ typedef struct xl_replorigin_drop extern PGDLLIMPORT RepOriginId replorigin_session_origin; extern PGDLLIMPORT XLogRecPtr replorigin_session_origin_lsn; extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp; +extern PGDLLIMPORT uint64 replorigin_session_origin_sysid; /* API for querying & manipulating replication origins */ extern RepOriginId replorigin_by_name(const char *roname, bool missing_ok); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 9646261..95b2a52 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -268,7 +268,7 @@ extern void stream_stop_internal(TransactionId xid); /* Common streaming function to apply all the spooled messages */ extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, - XLogRecPtr lsn); + XLogRecPtr lsn, TimestampTz origin_timestamp); extern void apply_dispatch(StringInfo s); diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h index a6ce03e..53b828d 100644 --- a/src/include/utils/timestamp.h +++ b/src/include/utils/timestamp.h @@ -84,6 +84,7 @@ IntervalPGetDatum(const Interval *X) /* Macros for doing timestamp arithmetic without assuming timestamp's units */ #define TimestampTzPlusMilliseconds(tz,ms) ((tz) + ((ms) * 
(int64) 1000)) #define TimestampTzPlusSeconds(tz,s) ((tz) + ((s) * (int64) 1000000)) +#define TimestampTzMinusSeconds(tz,s) ((tz) - ((s) * (int64) 1000000)) /* Set at postmaster start */ diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 7bd4e82..f46e922 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -431,6 +431,8 @@ ERROR: detect_conflict requires a Boolean value -- now it works CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = true); WARNING: Will use default resolvers configuration as detect_conflict is ON but resolvers are not given +WARNING: detect_conflict is enabled but "track_commit_timestamp" is OFF, the last_update_wins resolution may not work +HINT: Enable "track_commit_timestamp". WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. 
\dRs+ @@ -442,13 +444,13 @@ HINT: To initiate replication, you must manually create the replication slot, e -- confirm that the default conflict resolvers have been set SELECT confrtype, confrres FROM pg_subscription_conflict ORDER BY confrtype; - confrtype | confrres -----------------+--------------- - delete_differ | remote_apply + confrtype | confrres +----------------+------------------ + delete_differ | last_update_wins delete_missing | skip - insert_exists | remote_apply - update_differ | remote_apply - update_exists | remote_apply + insert_exists | last_update_wins + update_differ | last_update_wins + update_exists | last_update_wins update_missing | apply_or_skip (6 rows) @@ -487,18 +489,20 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; --try setting resolvers for few types CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = true) CONFLICT RESOLVER (insert_exists = 'keep_local', update_missing = 'skip', delete_differ = 'keep_local' ); +WARNING: detect_conflict is enabled but "track_commit_timestamp" is OFF, the last_update_wins resolution may not work +HINT: Enable "track_commit_timestamp". WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. 
--check if above are configured; for non specified conflict types, default resolvers should be seen SELECT confrtype, confrres FROM pg_subscription_conflict ORDER BY confrtype; - confrtype | confrres -----------------+-------------- + confrtype | confrres +----------------+------------------ delete_differ | keep_local delete_missing | skip insert_exists | keep_local - update_differ | remote_apply - update_exists | remote_apply + update_differ | last_update_wins + update_exists | last_update_wins update_missing | skip (6 rows) @@ -518,14 +522,16 @@ SELECT confrtype, confrres FROM pg_subscription_conflict ORDER BY confrtype; -- setting detect_conflict to true will set default conflict resolvers ALTER SUBSCRIPTION regress_testsub SET (detect_conflict = true); WARNING: Using default conflict resolvers +WARNING: detect_conflict is enabled but "track_commit_timestamp" is OFF, the last_update_wins resolution may not work +HINT: Enable "track_commit_timestamp". SELECT confrtype, confrres FROM pg_subscription_conflict ORDER BY confrtype; - confrtype | confrres -----------------+--------------- - delete_differ | remote_apply + confrtype | confrres +----------------+------------------ + delete_differ | last_update_wins delete_missing | skip - insert_exists | remote_apply - update_differ | remote_apply - update_exists | remote_apply + insert_exists | last_update_wins + update_differ | last_update_wins + update_exists | last_update_wins update_missing | apply_or_skip (6 rows) @@ -538,13 +544,13 @@ ERROR: foo is not a valid conflict resolver -- ok - valid conflict type and resolver ALTER SUBSCRIPTION regress_testsub CONFLICT RESOLVER (insert_exists = 'keep_local', update_missing = 'skip', delete_differ = 'keep_local' ); SELECT confrtype, confrres FROM pg_subscription_conflict ORDER BY confrtype; - confrtype | confrres -----------------+-------------- + confrtype | confrres +----------------+------------------ delete_differ | keep_local delete_missing | skip insert_exists | 
keep_local - update_differ | remote_apply - update_exists | remote_apply + update_differ | last_update_wins + update_exists | last_update_wins update_missing | skip (6 rows) diff --git a/src/test/subscription/t/029_on_error.pl b/src/test/subscription/t/029_on_error.pl index e6f07fa..9620a4b 100644 --- a/src/test/subscription/t/029_on_error.pl +++ b/src/test/subscription/t/029_on_error.pl @@ -18,7 +18,8 @@ my $offset = 0; # on the publisher. sub test_skip_lsn { - my ($node_publisher, $node_subscriber, $nonconflict_data, $expected, $msg) + my ($node_publisher, $node_subscriber, $nonconflict_data, $expected, + $msg, $conflict_detection) = @_; # Wait until a conflict occurs on the subscriber. @@ -26,13 +27,25 @@ sub test_skip_lsn "SELECT subenabled = FALSE FROM pg_subscription WHERE subname = 'sub'" ); + my $lsn; + my $contents = slurp_file($node_subscriber->logfile, $offset); + # Get the finish LSN of the error transaction, mapping the expected # ERROR with its CONTEXT when retrieving this information. 
- my $contents = slurp_file($node_subscriber->logfile, $offset); - $contents =~ - qr/conflict insert_exists detected on relation "public.tbl".*\n.*DETAIL:.* Key \(i\)=\(1\) already exists in unique index "tbl_pkey", which was modified by origin \d+ in transaction \d+ at .*\n.*CONTEXT:.* for replication target relation "public.tbl" in transaction \d+, finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/m - or die "could not get error-LSN"; - my $lsn = $1; + if ($conflict_detection) + { + $contents =~ + qr/conflict insert_exists detected on relation "public.tbl".*\n.*DETAIL:.* Key \(i\)=\(1\) already exists in unique index "tbl_pkey", which was modified by origin \d+ in transaction \d+ at .*\n.*CONTEXT:.* for replication target relation "public.tbl" in transaction \d+, finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/m + or die "could not get error-LSN"; + $lsn = $1; + } + else + { + $contents =~ + qr/duplicate key value violates unique constraint "tbl_pkey".*\n.*DETAIL:.*\n.*CONTEXT:.* for replication target relation "public.tbl" in transaction \d+, finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/m + or die "could not get error-LSN"; + $lsn = $1; + } # Set skip lsn. $node_subscriber->safe_psql('postgres', @@ -110,7 +123,7 @@ my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; $node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub FOR TABLE tbl"); $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true, streaming = on, two_phase = on, detect_conflict = on)" + "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true, streaming = on, detect_conflict = on)" ); # Set 'ERROR' conflict resolver for 'insert_exist' conflict type @@ -148,7 +161,22 @@ INSERT INTO tbl VALUES (1, NULL); COMMIT; ]); test_skip_lsn($node_publisher, $node_subscriber, - "(2, NULL)", "2", "test skipping transaction"); + "(2, NULL)", "2", "test skipping transaction", 1); + +# Cleanup before we start PREPARE AND COMMIT PREPARED tests +$node_subscriber->safe_psql('postgres', "TRUNCATE tbl"); +$node_publisher->safe_psql('postgres', "TRUNCATE tbl"); + +# Drop subscription and recreate with two_phase enabled +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true, streaming = on, two_phase = on)" +); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub'); + +$node_subscriber->safe_psql('postgres', "INSERT INTO tbl VALUES (1, NULL)"); # Test for PREPARE and COMMIT PREPARED. Insert the same data to tbl and # PREPARE the transaction, raising an error. Then skip the transaction. @@ -161,7 +189,7 @@ PREPARE TRANSACTION 'gtx'; COMMIT PREPARED 'gtx'; ]); test_skip_lsn($node_publisher, $node_subscriber, - "(3, NULL)", "3", "test skipping prepare and commit prepared "); + "(2, NULL)", "2", "test skipping prepare and commit prepared ", 0); # Test for STREAM COMMIT. 
Insert enough rows to tbl to exceed the 64kB # limit, also raising an error on the subscriber during applying spooled @@ -174,17 +202,14 @@ INSERT INTO tbl SELECT i, sha256(i::text::bytea) FROM generate_series(1, 10000) COMMIT; ]); test_skip_lsn($node_publisher, $node_subscriber, - "(4, sha256(4::text::bytea))", - "4", "test skipping stream-commit"); + "(3, sha256(4::text::bytea))", + "3", "test skipping stream-commit", 0); $result = $node_subscriber->safe_psql('postgres', "SELECT COUNT(*) FROM pg_prepared_xacts"); is($result, "0", "check all prepared transactions are resolved on the subscriber"); -# Reset conflict resolver for 'insert_exist' conflict type to default. -$node_subscriber->safe_psql('postgres', - "ALTER SUBSCRIPTION sub CONFLICT RESOLVER (insert_exists=remote_apply)"); $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/034_conflict_resolver.pl b/src/test/subscription/t/034_conflict_resolver.pl index 58751c5..e961809 100755 --- a/src/test/subscription/t/034_conflict_resolver.pl +++ b/src/test/subscription/t/034_conflict_resolver.pl @@ -54,11 +54,11 @@ $node_subscriber->wait_for_subscription_sync($node_publisher, $appname); my $result = $node_subscriber->safe_psql('postgres', "SELECT confrtype, confrres FROM pg_subscription_conflict ORDER BY confrtype" ); -is( $result, qq(delete_differ|remote_apply +is( $result, qq(delete_differ|last_update_wins delete_missing|skip -insert_exists|remote_apply -update_differ|remote_apply -update_exists|remote_apply +insert_exists|last_update_wins +update_differ|last_update_wins +update_exists|last_update_wins update_missing|apply_or_skip), "confirm that the default conflict resolvers are in place"); @@ -66,6 +66,11 @@ update_missing|apply_or_skip), # Test 'remote_apply' for 'insert_exists' ############################################ +# Change CONFLICT RESOLVER of insert_exists to remote_apply +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub CONFLICT RESOLVER 
(insert_exists = 'remote_apply');" +); + # Create local data on the subscriber $node_subscriber->safe_psql('postgres', "INSERT INTO conf_tab(a, data) VALUES (1,'fromsub')"); @@ -148,6 +153,34 @@ $node_subscriber->wait_for_log( # Truncate table on subscriber to get rid of the error $node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;"); +############################################ +# Test 'last_update_wins' for 'insert_exists' +############################################ + +# Change CONFLICT RESOLVER of insert_exists to last_update_wins +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub CONFLICT RESOLVER (insert_exists = 'last_update_wins');" +); + +# Create local data on the subscriber +$node_subscriber->safe_psql('postgres', + "INSERT INTO conf_tab(a, data) VALUES (4,'fromsub')"); + +# Create conflicting data on the publisher +$log_offset = -s $node_subscriber->logfile; +$node_publisher->safe_psql('postgres', + "INSERT INTO conf_tab(a, data) VALUES (4,'frompub')"); + +$node_subscriber->wait_for_log( + qr/LOG: conflict insert_exists detected on relation \"public.conf_tab\"/, + $log_offset); + +# Confirm that remote insert is converted to an update and the remote data is updated. 
+$result = $node_subscriber->safe_psql('postgres', + "SELECT data FROM conf_tab WHERE (a=4);"); + +is($result, 'frompub', "remote data wins"); + ################################### # Test 'skip' for 'delete_missing' ################################### @@ -202,16 +235,49 @@ $node_subscriber->safe_psql( $node_subscriber->wait_for_subscription_sync($node_publisher, $appname); -######################################### -# Test 'remote_apply' for 'delete_differ' -######################################### +############################################# +# Test 'last_update_wins' for 'delete_differ' +############################################# + +# Change CONFLICT RESOLVER of delete_differ to last_update_wins +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub CONFLICT RESOLVER (delete_differ = 'last_update_wins');" +); # Insert data in the publisher $node_publisher->safe_psql( 'postgres', "INSERT INTO conf_tab(a, data) VALUES (1,'frompub'); INSERT INTO conf_tab(a, data) VALUES (2,'frompub'); - INSERT INTO conf_tab(a, data) VALUES (3,'frompub');"); + INSERT INTO conf_tab(a, data) VALUES (3,'frompub'); + INSERT INTO conf_tab(a, data) VALUES (4,'frompub');"); + +# Modify data on the subscriber +$node_subscriber->safe_psql('postgres', + "UPDATE conf_tab SET data = 'fromsub' WHERE (a=4);"); + +# Create a conflicting delete on the publisher +$log_offset = -s $node_subscriber->logfile; +$node_publisher->safe_psql('postgres', "DELETE FROM conf_tab WHERE (a=4);"); + +$node_subscriber->wait_for_log( + qr/LOG: conflict delete_differ detected on relation \"public.conf_tab\"/, + $log_offset); + +# Confirm that the remote delete removes the locally updated row +$result = $node_subscriber->safe_psql('postgres', + "SELECT data from conf_tab WHERE (a=4);"); + +is($result, '', "delete from remote wins"); + +######################################### +# Test 'remote_apply' for 'delete_differ' +######################################### + +# Change CONFLICT RESOLVER of delete_differ
to remote_apply +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub CONFLICT RESOLVER (delete_differ = 'remote_apply');" +); # Modify data on the subscriber $node_subscriber->safe_psql('postgres', @@ -299,9 +365,9 @@ $node_subscriber->safe_psql( # Wait for initial table sync to finish $node_subscriber->wait_for_subscription_sync($node_publisher, $appname); -######################################### -# Test 'remote_apply' for 'update_differ' -######################################### +############################################# +# Test 'last_update_wins' for 'update_differ' +############################################# # Insert data in the publisher $node_publisher->safe_psql( @@ -330,6 +396,34 @@ $result = $node_subscriber->safe_psql('postgres', is($result, 'frompubnew', "update from remote is kept"); ######################################### +# Test 'remote_apply' for 'update_differ' +######################################### + +# Change CONFLICT RESOLVER of update_differ to remote_apply +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub CONFLICT RESOLVER (update_differ = 'remote_apply');" +); + +# Modify data on the subscriber +$node_subscriber->safe_psql('postgres', + "UPDATE conf_tab SET data = 'fromsub2' WHERE (a=1);"); + +# Create a conflicting update on the publisher +$log_offset = -s $node_subscriber->logfile; +$node_publisher->safe_psql('postgres', + "UPDATE conf_tab SET data = 'frompubnew2' WHERE (a=1);"); + +$node_subscriber->wait_for_log( + qr/LOG: conflict update_differ detected on relation \"public.conf_tab\"/, + $log_offset); + +# Confirm that the remote update overrides the local update +$result = $node_subscriber->safe_psql('postgres', + "SELECT data from conf_tab WHERE (a=1);"); + +is($result, 'frompubnew2', "update from remote is kept"); + +######################################### # Test 'keep_local' for 'update_differ' ######################################### diff --git 
a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index ae1db1a..77c6469 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1565,6 +1565,7 @@ LogicalOutputPluginWriterPrepareWrite LogicalOutputPluginWriterUpdateProgress LogicalOutputPluginWriterWrite LogicalRepBeginData +LogicalRepClockSkewAction LogicalRepCommitData LogicalRepCommitPreparedTxnData LogicalRepCtxStruct -- 1.8.3.1