From 75077a9567540dcf973b9f4dd7840e9bbf28dca3 Mon Sep 17 00:00:00 2001 From: Shveta Malik Date: Tue, 2 Jul 2024 11:06:28 +0530 Subject: [PATCH v3 4/5] Manage Clock skew and implement last_update_wins This patch attempts to manage clock skew between nodes by introducing two new GUCs: a) max_logical_rep_clock_skew b) max_logical_rep_clock_skew_action This patch also implements last_update_wins resolver. --- src/backend/replication/logical/conflict.c | 88 ++++++++++++-- src/backend/replication/logical/origin.c | 1 + src/backend/replication/logical/worker.c | 108 +++++++++++++++++- .../utils/activity/wait_event_names.txt | 1 + src/backend/utils/misc/guc_tables.c | 26 +++++ src/backend/utils/misc/postgresql.conf.sample | 6 +- src/include/catalog/pg_conflict.dat | 4 +- src/include/replication/conflict.h | 3 + src/include/replication/logicalworker.h | 17 +++ src/include/replication/origin.h | 1 + src/include/utils/timestamp.h | 1 + .../regress/expected/conflict_resolver.out | 22 ++-- src/tools/pgindent/typedefs.list | 1 + 13 files changed, 253 insertions(+), 26 deletions(-) diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index a622a0ea8b..cbd3a65a27 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -43,6 +43,7 @@ const char *const ConflictTypeNames[] = { const char *const ConflictResolverNames[] = { [CR_REMOTE_APPLY] = "remote_apply", [CR_KEEP_LOCAL] = "keep_local", + [CR_LAST_UPDATE_WINS] = "last_update_wins", [CR_APPLY_OR_SKIP] = "apply_or_skip", [CR_APPLY_OR_ERROR] = "apply_or_error", [CR_SKIP] = "skip", @@ -68,8 +69,8 @@ const char *const ConflictResolverNames[] = { * friendly name for a resolver and thus has been added here. */ const int ConflictTypeResolverMap[][CONFLICT_TYPE_MAX_RESOLVERS] = { - [CT_INSERT_EXISTS] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_ERROR}, - [CT_UPDATE_DIFFER] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_ERROR}, + [CT_INSERT_EXISTS] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_LAST_UPDATE_WINS, CR_ERROR}, + [CT_UPDATE_DIFFER] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_LAST_UPDATE_WINS, CR_ERROR}, [CT_UPDATE_MISSING] = {CR_APPLY_OR_SKIP, CR_APPLY_OR_ERROR, CR_SKIP, CR_ERROR}, [CT_DELETE_MISSING] = {CR_SKIP, CR_ERROR} }; @@ -79,8 +80,8 @@ const int ConflictTypeResolverMap[][CONFLICT_TYPE_MAX_RESOLVERS] = { * If this changes, change it in pg_conflict.dat as well. */ const int ConflictTypeDefaultResolvers[] = { - [CT_INSERT_EXISTS] = CR_REMOTE_APPLY, - [CT_UPDATE_DIFFER] = CR_REMOTE_APPLY, + [CT_INSERT_EXISTS] = CR_LAST_UPDATE_WINS, + [CT_UPDATE_DIFFER] = CR_LAST_UPDATE_WINS, [CT_UPDATE_MISSING] = CR_APPLY_OR_SKIP, [CT_DELETE_MISSING] = CR_SKIP }; @@ -211,15 +212,36 @@ errdetail_apply_conflict(ConflictType type, ConflictResolver resolver, get_rel_name(conflictidx)); } else - return errdetail("Key already exists. Applying resolution method \"%s\"", ConflictResolverNames[resolver]); + { + if (resolver == CR_LAST_UPDATE_WINS) + return errdetail("Key already exists. Applying resolution method \"%s\". The local tuple : origin=%u, timestamp=%s; The remote tuple : origin=%u, timestamp=%s.", + ConflictResolverNames[resolver], + localorigin, timestamptz_to_str(localts), + replorigin_session_origin, + timestamptz_to_str(replorigin_session_origin_timestamp)); + else + return errdetail("Key already exists. Applying resolution method \"%s\"", + ConflictResolverNames[resolver]); + } + } case CT_UPDATE_DIFFER: - return errdetail("Updating a row that was modified by a different origin %u in transaction %u at %s. Applying resolution method \"%s\"", - localorigin, localxmin, timestamptz_to_str(localts), ConflictResolverNames[resolver]); + if (resolver == CR_LAST_UPDATE_WINS) + return errdetail("Updating a row that was modified by a different origin %u in transaction %u at %s. Applying resolution method \"%s\". The remote tuple : origin=%u, timestamp=%s.", + localorigin, localxmin, timestamptz_to_str(localts), + ConflictResolverNames[resolver], + replorigin_session_origin, + timestamptz_to_str(replorigin_session_origin_timestamp)); + else + return errdetail("Updating a row that was modified by a different origin %u in transaction %u at %s. Applying resolution method \"%s\"", + localorigin, localxmin, timestamptz_to_str(localts), + ConflictResolverNames[resolver]); case CT_UPDATE_MISSING: - return errdetail("Did not find the row to be updated. Applying resolution method \"%s\"", ConflictResolverNames[resolver]); + return errdetail("Did not find the row to be updated. Applying resolution method \"%s\"", + ConflictResolverNames[resolver]); case CT_DELETE_MISSING: - return errdetail("Did not find the row to be deleted. Applying resolution method \"%s\"", ConflictResolverNames[resolver]); + return errdetail("Did not find the row to be deleted. Applying resolution method \"%s\"", + ConflictResolverNames[resolver]); } return 0; /* silence compiler warning */ @@ -330,6 +352,15 @@ validate_conflict_type_and_resolver(char *conflict_type, char *conflict_resolver errmsg("%s is not a valid conflict resolver for conflict type %s", conflict_resolver, conflict_type)); + + if ((resolver == CR_LAST_UPDATE_WINS) && !track_commit_timestamp) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("resolver %s requires %s to be enabled", + conflict_resolver, "track_commit_timestamp"), + errhint("Make sure the configuration parameter %s is set.", + "track_commit_timestamp")); + return type; } @@ -371,6 +402,42 @@ get_conflict_resolver_internal(ConflictType type) return resolver; } +/* + * Compare the timestamps of given local tuple and the remote tuple to + * resolve the conflict. + * + * Returns true if remote tuple has the latest timestamp, false otherwise. + */ +static bool +resolve_by_timestamp(TupleTableSlot *localslot) +{ + TransactionId local_xmin; + TimestampTz local_ts; + RepOriginId local_origin; + int ts_cmp; + uint64 local_system_identifier; + + /* Get origin and timestamp info of the local tuple */ + GetTupleCommitTs(localslot, &local_xmin, &local_origin, &local_ts); + + /* Compare the timestamps of remote & local tuple to decide the winner */ + ts_cmp = timestamptz_cmp_internal(replorigin_session_origin_timestamp, + local_ts); + + if (ts_cmp == 0) + { + elog(LOG, "Timestamps of remote and local tuple are equal, comparing remote and local system identifiers"); + + /* Get current system's identifier */ + local_system_identifier = GetSystemIdentifier(); + + return local_system_identifier <= replorigin_session_origin_sysid; + } + else + return (ts_cmp > 0); + +} + /* * Check if a full tuple can be created from the new tuple. * Return true if yes, false otherwise. @@ -493,6 +560,9 @@ GetConflictResolver(TupleTableSlot *localslot, Relation localrel, switch (resolver) { + case CR_LAST_UPDATE_WINS: + *apply_remote = resolve_by_timestamp(localslot); + break; case CR_REMOTE_APPLY: *apply_remote = true; break; diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index 419e4814f0..bd8e6f0024 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -155,6 +155,7 @@ typedef struct ReplicationStateCtl RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */ XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr; TimestampTz replorigin_session_origin_timestamp = 0; +uint64 replorigin_session_origin_sysid = 0; /* * Base address into a shared memory array of replication states of size diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c4dc03e964..55fb4f18f5 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -318,6 +318,19 @@ static uint32 parallel_stream_nchanges = 0; /* Are we initializing an apply worker? */ bool InitializingApplyWorker = false; +/* + * GUC support + */ +const struct config_enum_entry logical_rep_clock_skew_action_options[] = { + {"error", LR_CLOCK_SKEW_ACTION_ERROR, false}, + {"wait", LR_CLOCK_SKEW_ACTION_WAIT, false}, + {NULL, 0, false} +}; + +/* GUCs */ +int max_logical_rep_clock_skew = 0; +int max_logical_rep_clock_skew_action = LR_CLOCK_SKEW_ACTION_ERROR; + /* * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for * the subscription if the remote transaction's finish LSN matches the subskiplsn. @@ -987,6 +1000,86 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, ExecStoreVirtualTuple(slot); } +/* + * Manage clock skew between nodes. + * + * It checks if the remote commit timestamp is ahead of the local clock + * and if the difference exceeds max_logical_rep_clock_skew, it performs + * the action specified by the max_logical_rep_clock_skew_action. + */ +static void +manage_clock_skew() +{ + TimestampTz current; + TimestampTz delayUntil; + long msecs; + int rc; + + /* nothing to do if no max clock skew configured */ + if (max_logical_rep_clock_skew == LR_CLOCK_SKEW_DEFAULT) + return; + + current = GetCurrentTimestamp(); + + /* + * If the timestamp of the currently replayed transaction is in the future + * compared to the current time on the subscriber and the difference is + * larger than max_logical_rep_clock_skew, then perform the action + * specified by the max_logical_rep_clock_skew_action setting. + */ + if (replorigin_session_origin_timestamp > current && + TimestampDifferenceExceeds(current, replorigin_session_origin_timestamp, + max_logical_rep_clock_skew * 1000)) + { + if (max_logical_rep_clock_skew_action == LR_CLOCK_SKEW_ACTION_ERROR) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("logical replication clock skew exceeded max tolerated value of %d seconds", + max_logical_rep_clock_skew))); + + /* Perform the wait */ + while (true) + { + delayUntil = + TimestampTzMinusSeconds(replorigin_session_origin_timestamp, + max_logical_rep_clock_skew); + + /* Exit without waiting if it's already past 'delayUntil' time */ + msecs = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), + delayUntil); + if (msecs <= 0) + break; + + elog(LOG, "delaying apply for %ld milliseconds to bring clock skew " + "within permissible value of %d seconds", + msecs, max_logical_rep_clock_skew); + + /* Sleep until we are signaled or msecs have elapsed */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + msecs, + WAIT_EVENT_LOGICAL_CLOCK_SKEW); + + /* Exit the loop if msecs have elapsed */ + if (rc & WL_TIMEOUT) + break; + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + /* This might change max_logical_rep_clock_skew. */ + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } + } + } +} + /* * Handle BEGIN message. */ @@ -1008,6 +1101,12 @@ apply_handle_begin(StringInfo s) in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); + + /* Capture the commit timestamp of the remote transaction */ + replorigin_session_origin_timestamp = begin_data.committime; + + /* Check if there is any clock skew and take configured action */ + manage_clock_skew(); } /* @@ -4669,6 +4768,7 @@ run_apply_worker() TimeLineID startpointTLI; char *err; bool must_use_password; + char *replorigin_sysid; slotname = MySubscription->slotname; @@ -4708,10 +4808,12 @@ run_apply_worker() errmsg("could not connect to the publisher: %s", err))); /* - * We don't really use the output identify_system for anything but it does - * some initializations on the upstream so let's still call it. + * Call identify_system to do some initializations on the upstream and + * store the output as system identifier of the replication origin node. */ - (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI); + replorigin_sysid = walrcv_identify_system(LogRepWorkerWalRcvConn, + &startpointTLI); + replorigin_session_origin_sysid = strtoul(replorigin_sysid, NULL, 10); set_apply_error_context_origin(originname); diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index db37beeaae..a51f82169e 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -59,6 +59,7 @@ CHECKPOINTER_MAIN "Waiting in main loop of checkpointer process." LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process." LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process." LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process." +LOGICAL_CLOCK_SKEW "Waiting in apply-begin of logical replication apply process to bring clock skew in permissible range." RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery." REPLICATION_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker." REPLICATION_SLOTSYNC_SHUTDOWN "Waiting for slot sync worker to shut down." diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index d28b0bcb40..6dc23f28cf 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -67,6 +67,7 @@ #include "postmaster/walsummarizer.h" #include "postmaster/walwriter.h" #include "replication/logicallauncher.h" +#include "replication/logicalworker.h" #include "replication/slot.h" #include "replication/slotsync.h" #include "replication/syncrep.h" @@ -492,6 +493,7 @@ extern const struct config_enum_entry archive_mode_options[]; extern const struct config_enum_entry recovery_target_action_options[]; extern const struct config_enum_entry wal_sync_method_options[]; extern const struct config_enum_entry dynamic_shared_memory_options[]; +extern const struct config_enum_entry logical_rep_clock_skew_action_options[]; /* * GUC option variables that are exported from this module @@ -3649,6 +3651,19 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"max_logical_rep_clock_skew", PGC_SIGHUP, REPLICATION_SUBSCRIBERS, + gettext_noop("Sets maximum clock skew tolerance between logical " + "replication nodes beyond which action configured " + "in max_logical_rep_clock_skew_action is triggered."), + NULL, + GUC_UNIT_S + }, + &max_logical_rep_clock_skew, + LR_CLOCK_SKEW_DEFAULT, LR_CLOCK_SKEW_DEFAULT, INT_MAX, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL @@ -4915,6 +4930,17 @@ struct config_enum ConfigureNamesEnum[] = NULL, NULL, NULL }, + { + {"max_logical_rep_clock_skew_action", PGC_POSTMASTER, REPLICATION_SUBSCRIBERS, + gettext_noop("Sets the action to perform if a clock skew higher " + "than max_logical_rep_clock_skew is detected."), + NULL + }, + &max_logical_rep_clock_skew_action, + LR_CLOCK_SKEW_ACTION_ERROR, logical_rep_clock_skew_action_options, + NULL, NULL, NULL + }, + { {"track_functions", PGC_SUSET, STATS_CUMULATIVE, gettext_noop("Collects function-level statistics on database activity."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 9ec9f97e92..3a7fd70506 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -383,7 +383,11 @@ # (change requires restart) #max_sync_workers_per_subscription = 2 # taken from max_logical_replication_workers #max_parallel_apply_workers_per_subscription = 2 # taken from max_logical_replication_workers - +#max_logical_rep_clock_skew = -1 # maximum clock skew tolerance between logical + # replication nodes beyond which action configured in + # 'max_logical_rep_clock_skew_action' is triggered. +#max_logical_rep_clock_skew_action = error # error or wait + # (change requires restart) #------------------------------------------------------------------------------ # QUERY TUNING diff --git a/src/include/catalog/pg_conflict.dat b/src/include/catalog/pg_conflict.dat index a3e5e10d97..2f7fd2394a 100644 --- a/src/include/catalog/pg_conflict.dat +++ b/src/include/catalog/pg_conflict.dat @@ -12,8 +12,8 @@ [ -{ conftype => 'insert_exists', confres => 'remote_apply' }, -{ conftype => 'update_differ', confres => 'remote_apply' }, +{ conftype => 'insert_exists', confres => 'last_update_wins' }, +{ conftype => 'update_differ', confres => 'last_update_wins' }, { conftype => 'update_missing', confres => 'apply_or_skip' }, { conftype => 'delete_missing', confres => 'skip' } diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index aa21886ee7..82f4f5ec49 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -54,6 +54,9 @@ typedef enum ConflictResolver /* Keep the local change */ CR_KEEP_LOCAL, + /* Apply the change with latest timestamp */ + CR_LAST_UPDATE_WINS, + /* Apply the remote change; skip if it can not be applied */ CR_APPLY_OR_SKIP, diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index a18d79d1b2..2b922f9c62 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -14,7 +14,24 @@ #include +/* + * The default for max_logical_rep_clock_skew is -1, which means ignore clock + * skew (the check is turned off). + */ +#define LR_CLOCK_SKEW_DEFAULT -1 + +/* + * Worker Clock Skew Action. + */ +typedef enum +{ + LR_CLOCK_SKEW_ACTION_ERROR, + LR_CLOCK_SKEW_ACTION_WAIT, +} LogicalRepClockSkewAction; + extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending; +extern PGDLLIMPORT int max_logical_rep_clock_skew; +extern PGDLLIMPORT int max_logical_rep_clock_skew_action; extern void ApplyWorkerMain(Datum main_arg); extern void ParallelApplyWorkerMain(Datum main_arg); diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h index 7189ba9e76..dcbbbdf6ea 100644 --- a/src/include/replication/origin.h +++ b/src/include/replication/origin.h @@ -36,6 +36,7 @@ typedef struct xl_replorigin_drop extern PGDLLIMPORT RepOriginId replorigin_session_origin; extern PGDLLIMPORT XLogRecPtr replorigin_session_origin_lsn; extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp; +extern PGDLLIMPORT uint64 replorigin_session_origin_sysid; /* API for querying & manipulating replication origins */ extern RepOriginId replorigin_by_name(const char *roname, bool missing_ok); diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h index a6ce03ed46..53b828d89d 100644 --- a/src/include/utils/timestamp.h +++ b/src/include/utils/timestamp.h @@ -84,6 +84,7 @@ IntervalPGetDatum(const Interval *X) /* Macros for doing timestamp arithmetic without assuming timestamp's units */ #define TimestampTzPlusMilliseconds(tz,ms) ((tz) + ((ms) * (int64) 1000)) #define TimestampTzPlusSeconds(tz,s) ((tz) + ((s) * (int64) 1000000)) +#define TimestampTzMinusSeconds(tz,s) ((tz) - ((s) * (int64) 1000000)) /* Set at postmaster start */ diff --git a/src/test/regress/expected/conflict_resolver.out b/src/test/regress/expected/conflict_resolver.out index 15c8c0cdba..d0157ba830 100644 --- a/src/test/regress/expected/conflict_resolver.out +++ b/src/test/regress/expected/conflict_resolver.out @@ -1,23 +1,23 @@ --check default global resolvers in system catalog select * from pg_conflict order by conftype; - conftype | confres -----------------+--------------- + conftype | confres +----------------+------------------ delete_missing | skip - insert_exists | remote_apply - update_differ | remote_apply + insert_exists | last_update_wins + update_differ | last_update_wins update_missing | apply_or_skip (4 rows) -- -- Test of SET/RESET CONFLICT RESOLVER with invalid names -- -SET CONFLICT RESOLVER 'keep_local' for 'aaaa'; -- fail +SET CONFLICT RESOLVER 'keep_local' for 'aaaa'; -- fail ERROR: aaaa is not a valid conflict type -SET CONFLICT RESOLVER 'bbbbb' for 'delete_missing'; -- fail +SET CONFLICT RESOLVER 'bbbbb' for 'delete_missing'; -- fail ERROR: bbbbb is not a valid conflict resolver -SET CONFLICT RESOLVER 'remote_apply' for 'delete_missing'; -- fail +SET CONFLICT RESOLVER 'remote_apply' for 'delete_missing'; -- fail ERROR: remote_apply is not a valid conflict resolver for conflict type delete_missing -RESET CONFLICT RESOLVER for 'ct'; -- fail +RESET CONFLICT RESOLVER for 'ct'; -- fail ERROR: ct is not a valid conflict type -- -- Test of SET/RESET CONFLICT RESOLVER with valid names @@ -40,10 +40,10 @@ RESET CONFLICT RESOLVER for 'delete_missing'; RESET CONFLICT RESOLVER for 'insert_exists'; --check resolvers are reset to default for delete_missing and insert_exists select * from pg_conflict order by conftype; - conftype | confres -----------------+---------------- + conftype | confres +----------------+------------------ delete_missing | skip - insert_exists | remote_apply + insert_exists | last_update_wins update_differ | keep_local update_missing | apply_or_error (4 rows) diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 784194903a..15e5a494ba 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1565,6 +1565,7 @@ LogicalOutputPluginWriterPrepareWrite LogicalOutputPluginWriterUpdateProgress LogicalOutputPluginWriterWrite LogicalRepBeginData +LogicalRepClockSkewAction LogicalRepCommitData LogicalRepCommitPreparedTxnData LogicalRepCtxStruct -- 2.34.1