diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index e0f101784c7..005098932dc 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -100,9 +100,7 @@ static int logicalrep_pa_worker_count(Oid subid); static void logicalrep_launcher_attach_dshmem(void); static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); -static void compute_min_nonremovable_xid(LogicalRepWorker *worker, - bool retain_conflict_info, - TransactionId *xmin); +static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin); static bool acquire_conflict_slot_if_exists(void); static void advance_conflict_slot_xmin(TransactionId new_xmin); @@ -1193,7 +1191,14 @@ ApplyLauncherMain(Datum main_arg) ALLOCSET_DEFAULT_SIZES); oldctx = MemoryContextSwitchTo(subctx); - /* Start any missing workers for enabled subscriptions. */ + /* + * Start any missing workers for enabled subscriptions. + * + * Also, during the iteration through all subscriptions, we compute + * the minimum XID required to protect deleted tuples for conflict + * detection if one of the subscription enables retain_conflict_info + * option. + */ sublist = get_subscription_list(); foreach(lc, sublist) { @@ -1242,20 +1247,25 @@ ApplyLauncherMain(Datum main_arg) w = logicalrep_worker_find(sub->oid, InvalidOid, false); LWLockRelease(LogicalRepWorkerLock); - /* - * Compute the minimum xmin required to protect deleted tuples - * required for conflict detection. - */ - if (can_advance_xmin) - compute_min_nonremovable_xid(w, sub->retainconflictinfo, &xmin); - if (w != NULL) - continue; /* worker is running already */ + { + /* + * Compute the minimum xmin required to protect deleted tuples + * required for conflict detection among all running apply + * workers that enables retain_conflict_info. + */ + if (sub->retainconflictinfo && can_advance_xmin) + compute_min_nonremovable_xid(w, &xmin); + + /* worker is running already */ + continue; + } /* * Can't advance xmin of the slot unless all the workers * corresponding to subscriptions with retain_conflict_info are - * running. + * running, disabling the further computation of the minimum + * nonremovable xid. */ if (sub->retainconflictinfo) can_advance_xmin = false; @@ -1350,13 +1360,11 @@ ApplyLauncherMain(Datum main_arg) * in *xmin. */ static void -compute_min_nonremovable_xid(LogicalRepWorker *worker, bool retain_conflict_info, - TransactionId *xmin) +compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin) { TransactionId nonremovable_xid; - if (!retain_conflict_info || !worker) - return; + Assert(worker != NULL); /* * The replication slot for conflict detection must be created before the