From 197f319cf26f0f2265bb90a42df8ecfa91eb3927 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Thu, 26 Jun 2025 10:47:48 +0800 Subject: [PATCH v42 5/5] Allow altering retain_conflict_info for enabled subscription This patch removes the restriction on altering retain_conflict_info when the subscription is enabled, and resolves race condition issues caused by the new option value being asynchronously acknowledged by the launcher and apply workers. First, without this restriction, the oldest_nonremovable_xid maintained by the apply worker could become invalid during transaction ID wraparound if the slot does not yet exist but the worker has already started to maintain the value. For example, this race condition can occur when a user disables and immediately re-enables the retain_conflict_info option. In this case, the launcher might drop the slot upon noticing the disable action, while the apply worker may keep maintaining oldest_nonremovable_xid without noticing the option change. During this period, a transaction ID wraparound could falsely make this ID appear as if it originates from the future w.r.t. the transaction ID stored in the slot maintained by the launcher. To address this issue, we define the oldest_nonremovable_xid as FullTransactionId so that even if the wraparound happens, we can correctly identify whether the transaction ID is an old or a new one. Second, when the launcher collects a new xmin value for the conflict detection replication slot, the new xmin might be older than the current xmin. This can happen when: 1) retain_conflict_info is disabled and immediately re-enabled similar to the case mentioned in the first issue. In this case, if some transaction IDs are assigned before the slot creation, the newly created slot could have a newer xmin compared to the worker. 
2) the user enables retain_conflict_info concurrently with the launcher starting the worker, the apply worker may start calculating oldest_nonremovable_xid before the launcher notices the enable action. Consequently, the launcher may update slot.xmin to a newer value than that maintained by the worker. In subsequent cycles, upon integrating the worker's oldest_nonremovable_xid, the launcher might detect a regression in the calculated xmin, necessitating additional handling. We address this by adding a safeguard check when advancing slot.xmin to prevent backward movement, which should be sufficient since we don't guarantee protection for rows deleted before slot creation. --- doc/src/sgml/ref/alter_subscription.sgml | 5 +- src/backend/commands/subscriptioncmds.c | 70 +----------- .../replication/logical/applyparallelworker.c | 3 +- src/backend/replication/logical/launcher.c | 86 +++++++++------ src/backend/replication/logical/tablesync.c | 3 +- src/backend/replication/logical/worker.c | 101 +++++++----------- src/include/replication/logicalworker.h | 3 +- src/include/replication/worker_internal.h | 29 ++++- src/test/subscription/t/035_conflicts.pl | 17 +-- 9 files changed, 132 insertions(+), 185 deletions(-) diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index da9b559f18e..ac431034e0e 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -262,9 +262,8 @@ ALTER SUBSCRIPTION name RENAME TO < - The failover, - two_phase, and - retain_conflict_info + The failover + and two_phase parameters can only be altered when the subscription is disabled. 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 9e47909a30e..ca2fe71e781 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -649,7 +649,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, /* Ensure that we can enable retainconflictinfo. */ CheckSubConflictInfoRetention(opts.retainconflictinfo, true, - !opts.enabled, WARNING); + !opts.enabled); if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) && opts.slot_name == NULL) @@ -1065,22 +1065,14 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, } /* - * Common checks for altering failover, two_phase, and retain_conflict_info - * options. + * Common checks for altering failover and two_phase options. */ static void CheckAlterSubOption(Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel) { Assert(strcmp(option, "failover") == 0 || - strcmp(option, "two_phase") == 0 || - strcmp(option, "retain_conflict_info") == 0); - - /* - * Altering the retain_conflict_info option does not update the slot on - * the publisher. - */ - Assert(!slot_needs_update || strcmp(option, "retain_conflict_info") != 0); + strcmp(option, "two_phase") == 0); /* * Do not allow changing the option if the subscription is enabled. This @@ -1092,41 +1084,6 @@ CheckAlterSubOption(Subscription *sub, const char *option, * the publisher by the existing walsender, so we could have allowed that * even when the subscription is enabled. But we kept this restriction for * the sake of consistency and simplicity. - * - * Additionally, do not allow changing the retain_conflict_info option - * when the subscription is enabled to prevent race conditions arising - * from the new option value being acknowledged asynchronously by the - * launcher and apply workers. - * - * Without the restriction, a race condition may arise when a user - * disables and immediately re-enables the retain_conflict_info option. 
In - * this case, the launcher might drop the slot upon noticing the disabled - * action, while the apply worker may keep maintaining - * oldest_nonremovable_xid without noticing the option change. During this - * period, a transaction ID wraparound could falsely make this ID appear - * as if it originates from the future w.r.t the transaction ID stored in - * the slot maintained by launcher. - * - * Similarly, if the user enables retain_conflict_info concurrently with - * the launcher starting the worker, the apply worker may start - * calculating oldest_nonremovable_xid before the launcher notices the - * enable action. Consequently, the launcher may update slot.xmin to a - * newer value than that maintained by the worker. In subsequent cycles, - * upon integrating the worker's oldest_nonremovable_xid, the launcher - * might detect a retreat in the calculated xmin, necessitating additional - * handling. - * - * XXX To address the above race conditions, we can define - * oldest_nonremovable_xid as FullTransactionID and adds the check to - * disallow retreating the conflict slot's xmin. For now, we kept the - * implementation simple by disallowing change to the - * retain_conflict_info, but in the future we can change this after some - * more analysis. - * - * Note that we could restrict only the enabling of retain_conflict_info - * to avoid the race conditions described above, but we maintain the - * restriction for both enable and disable operations for the sake of - * consistency. */ if (sub->enabled) ereport(ERROR, @@ -1396,28 +1353,12 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, BoolGetDatum(opts.retainconflictinfo); replaces[Anum_pg_subscription_subretainconflictinfo - 1] = true; - CheckAlterSubOption(sub, "retain_conflict_info", false, isTopLevel); - - /* - * Workers may continue running even after the - * subscription has been disabled. 
- * - * To prevent race conditions (as described in - * CheckAlterSubOption()), ensure that all worker - * processes have already exited before proceeding. - */ - if (logicalrep_workers_find(subid, true, true)) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("cannot alter retain_conflict_info when logical replication worker is still running"), - errhint("Try again after some time."))); - /* * Remind the user that enabling subscription will prevent * the accumulation of dead tuples. */ CheckSubConflictInfoRetention(opts.retainconflictinfo, - true, !sub->enabled, NOTICE); + true, !sub->enabled); /* * Notify the launcher to manage the replication slot for @@ -1458,8 +1399,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, * comments atop CheckSubConflictInfoRetention() for details. */ CheckSubConflictInfoRetention(sub->retainconflictinfo, - opts.enabled, !opts.enabled, - WARNING); + opts.enabled, !opts.enabled); values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled); diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 1fa931a7422..d25085d3515 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -441,8 +441,7 @@ pa_launch_parallel_worker(void) MySubscription->name, MyLogicalRepWorker->userid, InvalidOid, - dsm_segment_handle(winfo->dsm_seg), - false); + dsm_segment_handle(winfo->dsm_seg)); if (launched) { diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 5ed280d57f2..414c12797cd 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -105,11 +105,11 @@ static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); static void 
compute_min_nonremovable_xid(LogicalRepWorker *worker, bool retain_conflict_info, - TransactionId *xmin, + FullTransactionId *xmin, bool *can_advance_xmin, bool *stop_retention); static bool acquire_conflict_slot_if_exists(void); -static void advance_conflict_slot_xmin(TransactionId new_xmin); +static void advance_conflict_slot_xmin(FullTransactionId new_xmin); static void invalidate_conflict_slot(void); @@ -321,8 +321,7 @@ logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock) bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, - Oid relid, dsm_handle subworker_dsm, - bool retain_conflict_info) + Oid relid, dsm_handle subworker_dsm) { BackgroundWorker bgw; BackgroundWorkerHandle *bgw_handle; @@ -341,13 +340,10 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, * - must be valid worker type * - tablesync workers are only ones to have relid * - parallel apply worker is the only kind of subworker - * - The replication slot used in conflict detection is created when - * retain_conflict_info is enabled */ Assert(wtype != WORKERTYPE_UNKNOWN); Assert(is_tablesync_worker == OidIsValid(relid)); Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID)); - Assert(!retain_conflict_info || MyReplicationSlot); ereport(DEBUG1, (errmsg_internal("starting logical replication worker for subscription \"%s\"", @@ -470,10 +466,8 @@ retry: worker->stream_fileset = NULL; worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid; worker->parallel_apply = is_parallel_apply_worker; - worker->oldest_nonremovable_xid = retain_conflict_info - ? 
MyReplicationSlot->data.xmin - : InvalidTransactionId; - worker->stop_conflict_info_retention = (retain_conflict_info && + worker->oldest_nonremovable_xid = InvalidFullTransactionId; + worker->stop_conflict_info_retention = (MyReplicationSlot && MyReplicationSlot->data.invalidated != RS_INVAL_NONE); worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); @@ -1194,7 +1188,7 @@ ApplyLauncherMain(Datum main_arg) bool can_advance_xmin = true; bool retain_conflict_info = false; bool stop_retention = true; - TransactionId xmin = InvalidTransactionId; + FullTransactionId xmin = InvalidFullTransactionId; CHECK_FOR_INTERRUPTS(); @@ -1267,8 +1261,7 @@ ApplyLauncherMain(Datum main_arg) if (!logicalrep_worker_launch(WORKERTYPE_APPLY, sub->dbid, sub->oid, sub->name, sub->owner, InvalidOid, - DSM_HANDLE_INVALID, - sub->retainconflictinfo)) + DSM_HANDLE_INVALID)) { /* * We get here either if we failed to launch a worker @@ -1348,7 +1341,7 @@ ApplyLauncherMain(Datum main_arg) */ static void compute_min_nonremovable_xid(LogicalRepWorker *worker, - bool retain_conflict_info, TransactionId *xmin, + bool retain_conflict_info, FullTransactionId *xmin, bool *can_advance_xmin, bool *stop_retention) { if (!retain_conflict_info) @@ -1357,13 +1350,7 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker, if (worker) { bool stop_conflict_info_retention; - TransactionId nonremovable_xid; - - /* - * The replication slot for conflict detection must be created before - * the worker starts. 
- */ - Assert(MyReplicationSlot); + FullTransactionId nonremovable_xid; SpinLockAcquire(&worker->relmutex); nonremovable_xid = worker->oldest_nonremovable_xid; @@ -1383,10 +1370,14 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker, if (stop_conflict_info_retention || !*can_advance_xmin) return; - Assert(TransactionIdIsValid(nonremovable_xid)); - - if (!TransactionIdIsValid(*xmin) || - TransactionIdPrecedes(nonremovable_xid, *xmin)) + /* + * Stop advancing xmin if an invalid non-removable transaction ID is + * found, otherwise update xmin. + */ + if (!FullTransactionIdIsValid(nonremovable_xid)) + *can_advance_xmin = false; + else if (!FullTransactionIdIsValid(*xmin) || + FullTransactionIdPrecedes(nonremovable_xid, *xmin)) *xmin = nonremovable_xid; } else @@ -1451,19 +1442,48 @@ acquire_conflict_slot_if_exists(void) * for conflict detection. */ static void -advance_conflict_slot_xmin(TransactionId new_xmin) +advance_conflict_slot_xmin(FullTransactionId new_xmin) { + FullTransactionId full_xmin; + FullTransactionId next_full_xid; + Assert(MyReplicationSlot); - Assert(TransactionIdIsValid(new_xmin)); - Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin)); + Assert(FullTransactionIdIsValid(new_xmin)); - /* Return if the xmin value of the slot cannot be advanced */ - if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin)) + next_full_xid = ReadNextFullTransactionId(); + + /* + * Compute FullTransactionId for the current xmin. This handles the case + * where transaction ID wraparound has occurred. + */ + full_xmin = FullTransactionIdFromAllowableAt(next_full_xid, + MyReplicationSlot->data.xmin); + + /* + * Do not allow the xmin to go backwards. The newly computed xmin might be + * older than the current xmin if the slot was created after the apply + * worker began maintaining oldest_nonremovable_xid. This can occur if a + * user disables and immediately re-enables the retain_conflict_info + * option. 
In this case, the launcher might drop the slot upon noticing the + * disable action, while the apply worker may keep running with an old + * oldest_nonremovable_xid without noticing the option change. During this + * period, if the some transaction IDs are assigned, the newly created slot + * will have a newer xmin compared to the worker. + * + * Similarily, if the user enables retain_conflict_info concurrently with + * the launcher starting the worker, the apply worker may start calculating + * oldest_nonremovable_xid before the launcher notices the enable action. + * Consequently, the launcher may update slot.xmin to a newer value than + * that maintained by the worker. In subsequent cycles, upon integrating + * the worker's oldest_nonremovable_xid, the launcher might detect a + * regression in the calculated xmin, necessitating additional handling. + */ + if (FullTransactionIdPrecedesOrEquals(new_xmin, full_xmin)) return; SpinLockAcquire(&MyReplicationSlot->mutex); - MyReplicationSlot->effective_xmin = MyReplicationSlot->data.xmin; - MyReplicationSlot->data.xmin = new_xmin; + MyReplicationSlot->effective_xmin = XidFromFullTransactionId(new_xmin); + MyReplicationSlot->data.xmin = XidFromFullTransactionId(new_xmin); SpinLockRelease(&MyReplicationSlot->mutex); elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index accfa94badd..c90f23ee5b0 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -615,8 +615,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) MySubscription->name, MyLogicalRepWorker->userid, rstate->relid, - DSM_HANDLE_INVALID, - false); + DSM_HANDLE_INVALID); } } } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b5eb608d21e..960192cc660 100644 --- a/src/backend/replication/logical/worker.c +++ 
b/src/backend/replication/logical/worker.c @@ -302,8 +302,8 @@ typedef struct RetainConflictInfoData * be awaited to complete before * entering the final phase * (RCI_WAIT_FOR_LOCAL_FLUSH) */ - TransactionId candidate_xid; /* candidate for the non-removable - * transaction ID */ + FullTransactionId candidate_xid; /* candidate for the non-removable + * transaction ID */ TimestampTz flushpos_update_time; /* when the remote flush position was * updated in final phase * (RCI_WAIT_FOR_LOCAL_FLUSH) */ @@ -476,8 +476,6 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata, LogicalRepTupleData *newtup, CmdType operation); -static void apply_worker_exit(void); - /* Functions for skipping changes */ static void maybe_start_skipping_changes(XLogRecPtr finish_lsn); static void stop_skipping_changes(void); @@ -4199,6 +4197,8 @@ static void get_candidate_xid(RetainConflictInfoData *rci_data) { TransactionId oldest_running_xid; + FullTransactionId next_full_xid; + FullTransactionId full_oldest_xid; TimestampTz now; /* @@ -4226,17 +4226,20 @@ get_candidate_xid(RetainConflictInfoData *rci_data) rci_data->candidate_xid_time = now; oldest_running_xid = GetOldestActiveTransactionId(false); + next_full_xid = ReadNextFullTransactionId(); /* - * Oldest active transaction ID (oldest_running_xid) can't be behind any - * of its previously computed value. + * Compute FullTransactionId for the oldest running transaction ID. This + * handles the case where transaction ID wraparound has occurred. 
*/ - Assert(TransactionIdPrecedesOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid, - oldest_running_xid)); + full_oldest_xid = FullTransactionIdFromAllowableAt(next_full_xid, oldest_running_xid); + + Assert(FullTransactionIdPrecedesOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid, + full_oldest_xid)); /* Return if the oldest_nonremovable_xid cannot be advanced */ - if (TransactionIdEquals(MyLogicalRepWorker->oldest_nonremovable_xid, - oldest_running_xid)) + if (FullTransactionIdEquals(MyLogicalRepWorker->oldest_nonremovable_xid, + full_oldest_xid)) { adjust_xid_advance_interval(rci_data, false); return; @@ -4244,7 +4247,7 @@ get_candidate_xid(RetainConflictInfoData *rci_data) adjust_xid_advance_interval(rci_data, true); - rci_data->candidate_xid = oldest_running_xid; + rci_data->candidate_xid = full_oldest_xid; rci_data->phase = RCI_REQUEST_PUBLISHER_STATUS; /* process the next phase */ @@ -4353,7 +4356,7 @@ static void wait_for_local_flush(RetainConflictInfoData *rci_data) { Assert(!XLogRecPtrIsInvalid(rci_data->remote_lsn) && - TransactionIdIsValid(rci_data->candidate_xid)); + FullTransactionIdIsValid(rci_data->candidate_xid)); /* * We expect the publisher and subscriber clocks to be in sync using time @@ -4445,22 +4448,16 @@ wait_for_local_flush(RetainConflictInfoData *rci_data) /* * If conflict info retention was previously stopped due to a timeout, and * the time required to advance the non-removable transaction ID has now - * decreased to within acceptable limits, log a message and exit. This - * allows the launcher to recreate the replication slot prior to - * restarting the worker. + * decreased to within acceptable limits, log a message. 
*/ if (MyLogicalRepWorker->stop_conflict_info_retention) - { ereport(LOG, - errmsg("logical replication worker for subscription \"%s\" will restart to resume retaining conflict information", + errmsg("logical replication worker for subscription \"%s\" will resume retaining conflict information", MySubscription->name), errdetail("The time spent applying changes up to LSN %X/%X is now within the maximum limit of %u ms.", LSN_FORMAT_ARGS(rci_data->remote_lsn), max_conflict_retention_duration)); - apply_worker_exit(); - } - /* * Reaching here means the remote WAL position has been received, and all * transactions up to that position on the publisher have been applied and @@ -4473,7 +4470,7 @@ wait_for_local_flush(RetainConflictInfoData *rci_data) elog(DEBUG2, "confirmed flush up to remote lsn %X/%X: new oldest_nonremovable_xid %u", LSN_FORMAT_ARGS(rci_data->remote_lsn), - rci_data->candidate_xid); + XidFromFullTransactionId(rci_data->candidate_xid)); /* Notify launcher to update the xmin of the conflict slot */ ApplyLauncherWakeup(); @@ -4499,7 +4496,7 @@ reset_conflict_info_fields(RetainConflictInfoData *rci_data) rci_data->remote_nextxid = InvalidFullTransactionId; rci_data->reply_time = 0; rci_data->remote_wait_for = InvalidFullTransactionId; - rci_data->candidate_xid = InvalidTransactionId; + rci_data->candidate_xid = InvalidFullTransactionId; } /* @@ -4518,7 +4515,7 @@ should_stop_conflict_info_retention(RetainConflictInfoData *rci_data) { TimestampTz now; - Assert(TransactionIdIsValid(rci_data->candidate_xid)); + Assert(FullTransactionIdIsValid(rci_data->candidate_xid)); Assert(rci_data->phase == RCI_WAIT_FOR_PUBLISHER_STATUS || rci_data->phase == RCI_WAIT_FOR_LOCAL_FLUSH); @@ -4553,7 +4550,7 @@ should_stop_conflict_info_retention(RetainConflictInfoData *rci_data) max_conflict_retention_duration)); SpinLockAcquire(&MyLogicalRepWorker->relmutex); - MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId; + MyLogicalRepWorker->oldest_nonremovable_xid = 
InvalidFullTransactionId; MyLogicalRepWorker->stop_conflict_info_retention = true; SpinLockRelease(&MyLogicalRepWorker->relmutex); @@ -4710,6 +4707,15 @@ maybe_reread_subscription(void) * worker won't restart if the streaming option's value is changed from * 'parallel' to any other value or the server decides not to stream the * in-progress transaction. + * + * Additionally, exit if the retain_conflict_info option was changed. This + * is necessary to reset the oldest non-removable transaction ID and the + * state of advancement. Direct resetting could not work without a + * restart, as the worker might be in an intermediate state (e.g., waiting + * publisher status). If the option is re-enabled before the old publisher + * status is received, it could incorrectly use the old status in a new + * transaction ID advancement cycle, leading to premature advancement of + * the non-removable transaction ID. */ if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 || strcmp(newsub->name, MySubscription->name) != 0 || @@ -4719,7 +4725,8 @@ maybe_reread_subscription(void) newsub->passwordrequired != MySubscription->passwordrequired || strcmp(newsub->origin, MySubscription->origin) != 0 || newsub->owner != MySubscription->owner || - !equal(newsub->publications, MySubscription->publications)) + !equal(newsub->publications, MySubscription->publications) || + newsub->retainconflictinfo != MySubscription->retainconflictinfo) { if (am_parallel_apply_worker()) ereport(LOG, @@ -5399,30 +5406,6 @@ InitializeLogRepWorker(void) apply_worker_exit(); } - /* - * Restart the worker if retain_conflict_info was enabled during startup. - * - * At this point, the replication slot used for conflict detection might - * not exist yet, or could be dropped soon if the launcher perceives - * retain_conflict_info as disabled. To avoid unnecessary tracking of - * oldest_nonremovable_xid when the slot is absent or at risk of being - * dropped, a restart is initiated. 
- * - * The oldest_nonremovable_xid should be initialized only when the - * retain_conflict_info is enabled before launching the worker. See - * logicalrep_worker_launch. - */ - if (am_leader_apply_worker() && - MySubscription->retainconflictinfo && - !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid)) - { - ereport(LOG, - errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup", - MySubscription->name, "retain_conflict_info")); - - apply_worker_exit(); - } - /* Setup synchronous commit according to the user's wishes */ SetConfigOption("synchronous_commit", MySubscription->synccommit, PGC_BACKEND, PGC_S_OVERRIDE); @@ -5585,7 +5568,7 @@ DisableSubscriptionAndExit(void) * context. */ CheckSubConflictInfoRetention(MySubscription->retainconflictinfo, false, - true, WARNING); + true); proc_exit(0); } @@ -5963,18 +5946,13 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo) * this setting can be adjusted after subscription creation. Without it, the * apply worker will simply skip conflict detection. * - * Issue a WARNING or NOTICE if the subscription is disabled. Do not raise an - * ERROR since users can only modify retain_conflict_info for disabled - * subscriptions. And as long as the subscription is enabled promptly, it will - * not pose issues. + * Issue a WARNING if the subscription is disabled. Do not raise an ERROR since + * as long as the subscription is enabled promptly, it will not pose issues. 
*/ void CheckSubConflictInfoRetention(bool retain_conflict_info, bool check_guc, - bool sub_disabled, int elevel_for_sub_disabled) + bool sub_disabled) { - Assert(elevel_for_sub_disabled == NOTICE || - elevel_for_sub_disabled == WARNING); - if (!retain_conflict_info) return; @@ -5992,10 +5970,9 @@ CheckSubConflictInfoRetention(bool retain_conflict_info, bool check_guc, "track_commit_timestamp")); if (sub_disabled) - ereport(elevel_for_sub_disabled, + ereport(WARNING, errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"), - (elevel_for_sub_disabled > NOTICE) - ? errhint("Consider setting %s to false.", - "retain_conflict_info") : 0); + errhint("Consider setting %s to false.", + "retain_conflict_info")); } diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index ebbb3ef09c7..2d006e7888e 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -32,7 +32,6 @@ extern void AtEOXact_LogicalRepWorkers(bool isCommit); extern void CheckSubConflictInfoRetention(bool retain_conflict_info, bool check_guc, - bool sub_disabled, - int elevel_for_sub_disabled); + bool sub_disabled); #endif /* LOGICALWORKER_H */ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 243164aada8..7cfc10d4052 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -94,8 +94,32 @@ typedef struct LogicalRepWorker * The logical replication launcher manages an internal replication slot * named "pg_conflict_detection". It asynchronously collects this ID to * decide when to advance the xmin value of the slot. + * + * It's necessary to use FullTransactionId here to mitigate potential race + * conditions. 
Such scenarios might occur if the replication slot is not + * yet created by the launcher while the apply worker has already + * initialized this field. During this period, a transaction ID wraparound + * could falsely make this ID appear as if it originates from the future + * w.r.t the transaction ID stored in the slot maintained by launcher. See + * advance_conflict_slot_xmin. + * + * Closing this race condition is complex. A potential solution involves + * the apply worker waiting for slot creation before updating + * oldest_nonremovable_xid. However, ensuring a race-free mechanism is + * difficult, especially when users concurrently toggle + * retain_conflict_info. We must prevent the launcher from prematurely + * recreating the slot if a subscription re-enables retain_conflict_info + * after it's been disabled and the slot dropped, and before workers reset + * their oldest_nonremovable_xid. + * + * Another approach could be to manage slot creation and deletion within + * subscription DDLs using strong locks to prevent race conditions. + * However, this method isn't entirely reliable. Commands can be rolled + * back, and even if slot creation is deferred until after all database + * modifications, transaction failures can still occur during the commit + * phase. 
*/ - TransactionId oldest_nonremovable_xid; + FullTransactionId oldest_nonremovable_xid; /* * Indicates whether the apply worker has stopped retaining conflict @@ -262,8 +286,7 @@ extern List *logicalrep_workers_find(Oid subid, bool only_running, extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, - dsm_handle subworker_dsm, - bool retain_conflict_info); + dsm_handle subworker_dsm); extern void logicalrep_worker_stop(Oid subid, Oid relid); extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo); extern void logicalrep_worker_wakeup(Oid subid, Oid relid); diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl index 51ad9b40258..d2284b857be 100644 --- a/src/test/subscription/t/035_conflicts.pl +++ b/src/test/subscription/t/035_conflicts.pl @@ -228,20 +228,11 @@ $result = $node_B->safe_psql('postgres', is($result, qq(t), 'worker on node B retains conflict information'); ################################################## -# Check that the retain_conflict_info option cannot be changed for enabled -# subscriptions, and validate the WARNING and NOTICE messages during -# subscription DDLs. +# Validate the WARNING and NOTICE messages during subscription DDLs. 
################################################## -# Alter retain_conflict_info for enabled subscription -my ($cmdret, $stdout, $stderr) = $node_A->psql('postgres', - "ALTER SUBSCRIPTION $subname_AB SET (retain_conflict_info = true)"); -ok( $stderr =~ - /ERROR: cannot set option \"retain_conflict_info\" for enabled subscription/, - "altering retain_conflict_info is not allowed for enabled subscription"); - # Disable the subscription -($cmdret, $stdout, $stderr) = $node_A->psql('postgres', +my ($cmdret, $stdout, $stderr) = $node_A->psql('postgres', "ALTER SUBSCRIPTION $subname_AB DISABLE;"); ok( $stderr =~ /WARNING: deleted rows to detect conflicts would not be removed until the subscription is enabled/, @@ -251,8 +242,8 @@ ok( $stderr =~ ($cmdret, $stdout, $stderr) = $node_A->psql('postgres', "ALTER SUBSCRIPTION $subname_AB SET (retain_conflict_info = true);"); ok( $stderr =~ - /NOTICE: deleted rows to detect conflicts would not be removed until the subscription is enabled/, - "altering retain_conflict_info is allowed for disabled subscription"); + /WARNING: deleted rows to detect conflicts would not be removed until the subscription is enabled/, + "A warning is raised on enabling retain_conflict_info for disabled subscription"); # Re-enable the subscription $node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;"); -- 2.31.1