From 44889addf51f8e0a3491c17ae454305e2a5e1376 Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Wed, 23 Jul 2025 13:38:12 +0800 Subject: [PATCH v59] Introduce a new GUC 'max_conflict_retention_duration' This commit introduces a GUC option max_conflict_retention_duration, designed to prevent excessive accumulation of dead tuples when a subscription with retain_dead_tuples enabled is present and the apply worker cannot catch up with the publisher's workload. If the time spent advancing non-removable transaction ID surpasses the max_conflict_retention_duration threshold, the apply worker would stop retaining information for conflict detection. The replication slot pg_conflict_detection.xmin will be set to InvalidTransactionId if all apply workers associated with the subscription, where retain_dead_tuples is enabled, confirm that the retention duration exceeded the max_conflict_retention_duration. In this patch, a replication slot will not be automatically re-initialized. Users can disable retain_dead_tuples and re-enable it after confirming that the replication slot has been dropped. An upcoming patch will include support for automatic slot re-initialization once at least one apply worker confirms that the retention duration is within the max_conflict_retention_duration limit. To monitor the worker's conflict retention status, this patch also introduces a new column 'dead_tuple_retention_active' in the pg_stat_subscription view. This column indicates whether the apply worker is effectively retaining conflict information. The value is set to true only if retain_dead_tuples is enabled for the associated subscription, and the retention duration for conflict detection by the apply worker has not exceeded max_conflict_retention_duration. 
--- doc/src/sgml/config.sgml | 41 ++++ doc/src/sgml/monitoring.sgml | 13 ++ src/backend/catalog/system_views.sql | 3 +- src/backend/replication/logical/launcher.c | 46 +++-- src/backend/replication/logical/tablesync.c | 22 +-- src/backend/replication/logical/worker.c | 177 ++++++++++++++++-- src/backend/utils/misc/guc_tables.c | 13 ++ src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/catalog/pg_proc.dat | 6 +- src/include/replication/logicallauncher.h | 1 + src/include/replication/worker_internal.h | 10 +- src/test/regress/expected/rules.out | 5 +- src/test/subscription/t/035_conflicts.pl | 10 +- 13 files changed, 305 insertions(+), 43 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 20ccb2d6b54..9b59c0fd5b9 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -5399,6 +5399,47 @@ ANY num_sync ( + max_conflict_retention_duration (integer) + + max_conflict_retention_duration configuration parameter + + + + + Maximum duration for which each apply worker is allowed to retain the + information useful for conflict detection when + retain_dead_tuples is enabled for the associated + subscriptions. The default value is 0, indicating + that the information is retained until it is no longer needed for + detection purposes. If this value is specified without units, it is + taken as milliseconds. + + + The information useful for conflict detection is no longer retained if + all apply workers associated with the subscriptions, where + retain_dead_tuples is enabled, confirm that the + retention duration exceeded the + max_conflict_retention_duration. To re-enable + retention, you can disable retain_dead_tuples for all + subscriptions and re-enable it after confirming this replication slot has + been dropped. + + + This option is effective only if a subscription with + retain_dead_tuples enabled is present, and the + associated apply worker is active. 
+ + + + Note that setting a non-zero value for this option could lead to + conflict information being removed prematurely, potentially missing + some conflict detections. + + + + + diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 3f4a27a736e..e103bfd732d 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -2114,6 +2114,19 @@ description | Waiting for a newly initialized WAL file to reach durable storage sender; NULL for parallel apply workers + + + + dead_tuple_retention_active boolean + + + True if retain_dead_tuples + is enabled and the duration for which information useful for conflict + detection is retained by this apply worker does not exceed + ; NULL for + parallel apply workers and table synchronization workers. + + diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 1b3c5a55882..f654af0717e 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -996,7 +996,8 @@ CREATE VIEW pg_stat_subscription AS st.last_msg_send_time, st.last_msg_receipt_time, st.latest_end_lsn, - st.latest_end_time + st.latest_end_time, + st.dead_tuple_retention_active FROM pg_subscription su LEFT JOIN pg_stat_get_subscription(NULL) st ON (st.subid = su.oid); diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 37377f7eb63..c5002effaee 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -23,6 +23,7 @@ #include "access/tableam.h" #include "access/xact.h" #include "catalog/pg_subscription.h" +#include "catalog/pg_subscription_d.h" #include "catalog/pg_subscription_rel.h" #include "funcapi.h" #include "lib/dshash.h" @@ -43,6 +44,7 @@ #include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/snapmgr.h" +#include "utils/syscache.h" /* max sleep time between cycles (3min) */ #define DEFAULT_NAPTIME_PER_CYCLE 180000L @@ -51,6 +53,7 @@ 
int max_logical_replication_workers = 4; int max_sync_workers_per_subscription = 2; int max_parallel_apply_workers_per_subscription = 2; +int max_conflict_retention_duration = 0; LogicalRepWorker *MyLogicalRepWorker = NULL; @@ -102,7 +105,7 @@ static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin); static bool acquire_conflict_slot_if_exists(void); -static void advance_conflict_slot_xmin(TransactionId new_xmin); +static void update_conflict_slot_xmin(TransactionId new_xmin); /* @@ -998,7 +1001,7 @@ ApplyLauncherShmemInit(void) LogicalRepWorker *worker = &LogicalRepCtx->workers[slot]; memset(worker, 0, sizeof(LogicalRepWorker)); - SpinLockInit(&worker->relmutex); + SpinLockInit(&worker->mutex); } } } @@ -1320,13 +1323,18 @@ ApplyLauncherMain(Datum main_arg) * that requires us to retain dead tuples. Otherwise, if required, * advance the slot's xmin to protect dead tuples required for the * conflict detection. + * + * However, if all apply workers for subscriptions with + * retain_dead_tuples enabled have requested to cease retention, the + * new xmin will be set to InvalidTransactionId. We then update + * slot.xmin accordingly to permit the removal of dead tuples. */ if (MyReplicationSlot) { if (!retain_dead_tuples) ReplicationSlotDropAcquired(); else if (can_advance_xmin) - advance_conflict_slot_xmin(xmin); + update_conflict_slot_xmin(xmin); } /* Switch back to original memory context. 
*/ @@ -1374,11 +1382,16 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin) */ Assert(MyReplicationSlot); - SpinLockAcquire(&worker->relmutex); + SpinLockAcquire(&worker->mutex); nonremovable_xid = worker->oldest_nonremovable_xid; - SpinLockRelease(&worker->relmutex); + SpinLockRelease(&worker->mutex); - Assert(TransactionIdIsValid(nonremovable_xid)); + /* + * Skip collecting oldest_nonremovable_xid for workers that have stopped + * conflict retention. + */ + if (!TransactionIdIsValid(nonremovable_xid)) + return; if (!TransactionIdIsValid(*xmin) || TransactionIdPrecedes(nonremovable_xid, *xmin)) @@ -1402,17 +1415,17 @@ acquire_conflict_slot_if_exists(void) } /* - * Advance the xmin the replication slot used to retain information required + * Update the xmin the replication slot used to retain information required * for conflict detection. */ static void -advance_conflict_slot_xmin(TransactionId new_xmin) +update_conflict_slot_xmin(TransactionId new_xmin) { Assert(MyReplicationSlot); - Assert(TransactionIdIsValid(new_xmin)); - Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin)); + Assert(!TransactionIdIsValid(new_xmin) || + TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin)); - /* Return if the xmin value of the slot cannot be advanced */ + /* Return if the xmin value of the slot cannot be updated */ if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin)) return; @@ -1518,7 +1531,7 @@ GetLeaderApplyWorkerPid(pid_t pid) Datum pg_stat_get_subscription(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_COLS 10 +#define PG_STAT_GET_SUBSCRIPTION_COLS 11 Oid subid = PG_ARGISNULL(0) ? 
InvalidOid : PG_GETARG_OID(0); int i; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; @@ -1595,6 +1608,15 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) elog(ERROR, "unknown worker type"); } + /* + * Only the leader apply worker manages conflict retention (see + * maybe_advance_nonremovable_xid() for details). + */ + if (!isParallelApplyWorker(&worker) && !isTablesyncWorker(&worker)) + values[10] = TransactionIdIsValid(worker.oldest_nonremovable_xid); + else + nulls[10] = true; + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index d3356bc84ee..1ab5496f63f 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -293,7 +293,7 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) static void process_syncing_tables_for_sync(XLogRecPtr current_lsn) { - SpinLockAcquire(&MyLogicalRepWorker->relmutex); + SpinLockAcquire(&MyLogicalRepWorker->mutex); if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP && current_lsn >= MyLogicalRepWorker->relstate_lsn) @@ -305,7 +305,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE; MyLogicalRepWorker->relstate_lsn = current_lsn; - SpinLockRelease(&MyLogicalRepWorker->relmutex); + SpinLockRelease(&MyLogicalRepWorker->mutex); /* * UpdateSubscriptionRelState must be called within a transaction. 
@@ -390,7 +390,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) finish_sync_worker(); } else - SpinLockRelease(&MyLogicalRepWorker->relmutex); + SpinLockRelease(&MyLogicalRepWorker->mutex); } /* @@ -534,7 +534,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) if (syncworker) { /* Found one, update our copy of its state */ - SpinLockAcquire(&syncworker->relmutex); + SpinLockAcquire(&syncworker->mutex); rstate->state = syncworker->relstate; rstate->lsn = syncworker->relstate_lsn; if (rstate->state == SUBREL_STATE_SYNCWAIT) @@ -547,7 +547,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) syncworker->relstate_lsn = Max(syncworker->relstate_lsn, current_lsn); } - SpinLockRelease(&syncworker->relmutex); + SpinLockRelease(&syncworker->mutex); /* If we told worker to catch up, wait for it. */ if (rstate->state == SUBREL_STATE_SYNCWAIT) @@ -1342,10 +1342,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) must_use_password = MySubscription->passwordrequired && !MySubscription->ownersuperuser; - SpinLockAcquire(&MyLogicalRepWorker->relmutex); + SpinLockAcquire(&MyLogicalRepWorker->mutex); MyLogicalRepWorker->relstate = relstate; MyLogicalRepWorker->relstate_lsn = relstate_lsn; - SpinLockRelease(&MyLogicalRepWorker->relmutex); + SpinLockRelease(&MyLogicalRepWorker->mutex); /* * If synchronization is already done or no longer necessary, exit now @@ -1428,10 +1428,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) goto copy_table_done; } - SpinLockAcquire(&MyLogicalRepWorker->relmutex); + SpinLockAcquire(&MyLogicalRepWorker->mutex); MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC; MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr; - SpinLockRelease(&MyLogicalRepWorker->relmutex); + SpinLockRelease(&MyLogicalRepWorker->mutex); /* Update the state and make it visible to others. */ StartTransactionCommand(); @@ -1586,10 +1586,10 @@ copy_table_done: /* * We are done with the initial data synchronization, update the state. 
*/ - SpinLockAcquire(&MyLogicalRepWorker->relmutex); + SpinLockAcquire(&MyLogicalRepWorker->mutex); MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT; MyLogicalRepWorker->relstate_lsn = *origin_startpos; - SpinLockRelease(&MyLogicalRepWorker->relmutex); + SpinLockRelease(&MyLogicalRepWorker->mutex); /* * Finally, wait until the leader apply worker tells us to catch up and diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 0fdc5de57ba..6b5e218f603 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -415,6 +415,9 @@ typedef struct RetainDeadTuplesData * updated in final phase * (RDT_WAIT_FOR_LOCAL_FLUSH) */ + long table_sync_wait_time; /* time spent waiting for table sync + * to finish */ + /* * The following fields are used to determine the timing for the next * round of transaction ID advancement. @@ -555,6 +558,8 @@ static void request_publisher_status(RetainDeadTuplesData *rdt_data); static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data, bool status_received); static void wait_for_local_flush(RetainDeadTuplesData *rdt_data); +static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data); +static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data); static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found); @@ -3220,6 +3225,7 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, { TransactionId oldestxmin; ReplicationSlot *slot; + bool stop_retention; /* * Return false if either dead tuples are not retained or commit timestamp @@ -3228,6 +3234,42 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, if (!MySubscription->retaindeadtuples || !track_commit_timestamp) return false; + /* + * Check whether the leader apply worker has stopped retaining information + * for detecting conflicts. 
+ */ + if (am_leader_apply_worker()) + { + stop_retention = + !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid); + } + else + { + LogicalRepWorker *leader; + + /* + * Obtain the information from the leader apply worker as only the + * leader manages conflict retention (see + * maybe_advance_nonremovable_xid() for details). + */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + leader = logicalrep_worker_find(MyLogicalRepWorker->subid, + InvalidOid, false); + + SpinLockAcquire(&leader->mutex); + stop_retention = !TransactionIdIsValid(leader->oldest_nonremovable_xid); + SpinLockRelease(&leader->mutex); + LWLockRelease(LogicalRepWorkerLock); + } + + /* + * Return false if the leader apply worker has stopped retaining + * information for detecting conflicts. This implies that update_deleted + * can no longer be reliably detected. + */ + if (stop_retention) + return false; + /* * For conflict detection, we use the conflict slot's xmin value instead * of invoking GetOldestNonRemovableTransactionId(). The slot.xmin acts as @@ -3254,7 +3296,15 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, oldestxmin = slot->data.xmin; SpinLockRelease(&slot->mutex); - Assert(TransactionIdIsValid(oldestxmin)); + /* + * Return false if the conflict detection slot.xmin is set to + * InvalidTransactionId. This situation arises if the current worker is + * either a table synchronization or parallel apply worker, and the leader + * stopped retention immediately after checking the + * oldest_nonremovable_xid above. + */ + if (!TransactionIdIsValid(oldestxmin)) + return false; if (OidIsValid(localidxoid) && IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin)) @@ -4110,7 +4160,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) * Ensure to wake up when it's possible to advance the non-removable * transaction ID. 
*/ - if (rdt_data.phase == RDT_GET_CANDIDATE_XID && + if (TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid) && + rdt_data.phase == RDT_GET_CANDIDATE_XID && rdt_data.xid_advance_interval) wait_time = Min(wait_time, rdt_data.xid_advance_interval); @@ -4325,6 +4376,10 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data) if (!MySubscription->retaindeadtuples) return false; + /* No need to advance if we have already stopped retaining */ + if (!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid)) + return false; + return true; } @@ -4468,6 +4523,13 @@ wait_for_publisher_status(RetainDeadTuplesData *rdt_data, if (!status_received) return; + /* + * Stop retaining conflict information if required (See + * should_stop_conflict_info_retention() for details). + */ + if (should_stop_conflict_info_retention(rdt_data)) + return; + if (!FullTransactionIdIsValid(rdt_data->remote_wait_for)) rdt_data->remote_wait_for = rdt_data->remote_nextxid; @@ -4549,6 +4611,27 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) * have a WAL position greater than the rdt_data->remote_lsn. */ if (!AllTablesyncsReady()) + { + TimestampTz now; + + now = rdt_data->last_recv_time + ? rdt_data->last_recv_time : GetCurrentTimestamp(); + + /* + * Record the time spent waiting for table sync, it is needed for the + * timeout check in should_stop_conflict_info_retention(). + */ + rdt_data->table_sync_wait_time = + TimestampDifferenceMilliseconds(rdt_data->candidate_xid_time, now); + + return; + } + + /* + * Stop retaining conflict information if required (See + * should_stop_conflict_info_retention() for details). + */ + if (should_stop_conflict_info_retention(rdt_data)) return; /* @@ -4583,9 +4666,9 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) * transactions up to that position on the publisher have been applied and * flushed locally. So, we can advance the non-removable transaction ID. 
*/ - SpinLockAcquire(&MyLogicalRepWorker->relmutex); + SpinLockAcquire(&MyLogicalRepWorker->mutex); MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid; - SpinLockRelease(&MyLogicalRepWorker->relmutex); + SpinLockRelease(&MyLogicalRepWorker->mutex); elog(DEBUG2, "confirmed flush up to remote lsn %X/%X: new oldest_nonremovable_xid %u", LSN_FORMAT_ARGS(rdt_data->remote_lsn), @@ -4594,12 +4677,21 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) /* Notify launcher to update the xmin of the conflict slot */ ApplyLauncherWakeup(); - /* - * Reset all data fields except those used to determine the timing for the - * next round of transaction ID advancement. We can even use - * flushpos_update_time in the next round to decide whether to get the - * latest flush position. - */ + reset_retention_data_fields(rdt_data); + + /* process the next phase */ + process_rdt_phase_transition(rdt_data, false); +} + +/* + * Reset all data fields of RetainDeadTuplesData except those used to + * determine the timing for the next round of transaction ID advancement. We + * can even use flushpos_update_time in the next round to decide whether to get + * the latest flush position. + */ +static void +reset_retention_data_fields(RetainDeadTuplesData *rdt_data) +{ rdt_data->phase = RDT_GET_CANDIDATE_XID; rdt_data->remote_lsn = InvalidXLogRecPtr; rdt_data->remote_oldestxid = InvalidFullTransactionId; @@ -4607,9 +4699,66 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) rdt_data->reply_time = 0; rdt_data->remote_wait_for = InvalidFullTransactionId; rdt_data->candidate_xid = InvalidTransactionId; + rdt_data->table_sync_wait_time = 0; +} - /* process the next phase */ - process_rdt_phase_transition(rdt_data, false); +/* + * Check whether conflict information retention should be stopped because the + * wait time has exceeded the maximum limit (max_conflict_retention_duration). 
+ * + * If retention should be stopped, set LogicalRepWorker->oldest_nonremovable_xid + * to InvalidTransactionId, notify the launcher to set the slot.xmin to + * InvalidTransactionId as well, and return true. Return false otherwise. + * + * Currently, the retention will not resume automatically unless user manually + * disables retain_dead_tuples and re-enables it after confirming that the + * replication slot has been dropped. + */ +static bool +should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) +{ + TimestampTz now; + + Assert(TransactionIdIsValid(rdt_data->candidate_xid)); + Assert(rdt_data->phase == RDT_WAIT_FOR_PUBLISHER_STATUS || + rdt_data->phase == RDT_WAIT_FOR_LOCAL_FLUSH); + + if (!max_conflict_retention_duration) + return false; + + /* + * Use last_recv_time when applying changes in the loop to avoid + * unnecessary system time retrieval. If last_recv_time is not available, + * obtain the current timestamp. + */ + now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp(); + + /* + * Return if the wait time has not exceeded the maximum limit + * (max_conflict_retention_duration). The time spent waiting for table + * synchronization is not counted, as it's an infrequent operation. 
+ */ + if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now, + max_conflict_retention_duration + + rdt_data->table_sync_wait_time)) + return false; + + ereport(LOG, + errmsg("logical replication worker for subscription \"%s\" will stop retaining conflict information", + MySubscription->name), + errdetail("The time spent advancing the non-removable transaction ID has exceeded the maximum limit of %u ms.", + max_conflict_retention_duration)); + + SpinLockAcquire(&MyLogicalRepWorker->mutex); + MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId; + SpinLockRelease(&MyLogicalRepWorker->mutex); + + /* Notify launcher to update the conflict slot */ + ApplyLauncherWakeup(); + + reset_retention_data_fields(rdt_data); + + return true; } /* @@ -4642,6 +4791,10 @@ adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found) */ rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2, max_interval); + + /* Ensure the wait time remains within the maximum limit */ + rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval, + max_conflict_retention_duration); } else { diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index d14b1678e7f..580762c6a00 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -3388,6 +3388,19 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"max_conflict_retention_duration", + PGC_SIGHUP, + REPLICATION_SUBSCRIBERS, + gettext_noop("Maximum duration for retaining information used in logical replication conflict detection."), + NULL, + GUC_UNIT_MS + }, + &max_conflict_retention_duration, + 0, 0, INT_MAX, + NULL, NULL, NULL + }, + { {"log_rotation_age", PGC_SIGHUP, LOGGING_WHERE, gettext_noop("Sets the amount of time to wait before forcing " diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index a9d8293474a..45c9e0a16a0 100644 --- 
a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -396,6 +396,7 @@ # (change requires restart) #max_sync_workers_per_subscription = 2 # taken from max_logical_replication_workers #max_parallel_apply_workers_per_subscription = 2 # taken from max_logical_replication_workers +#max_conflict_retention_duration = 0 # in milliseconds; 0 disables #------------------------------------------------------------------------------ diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 118d6da1ace..3810f3883b7 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5696,9 +5696,9 @@ proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'oid', - proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz,text}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,worker_type}', + proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz,text,bool}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,worker_type,dead_tuple_retention_active}', prosrc => 'pg_stat_get_subscription' }, { oid => '2026', descr => 'statistics: current backend PID', proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r', diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index b29453e8e4f..6e3007db5f0 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -15,6 +15,7 @@ extern PGDLLIMPORT int max_logical_replication_workers; extern PGDLLIMPORT int 
max_sync_workers_per_subscription; extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription; +extern PGDLLIMPORT int max_conflict_retention_duration; extern void ApplyLauncherRegister(void); extern void ApplyLauncherMain(Datum main_arg); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 7c0204dd6f4..9c0c2b8050c 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -64,7 +64,12 @@ typedef struct LogicalRepWorker Oid relid; char relstate; XLogRecPtr relstate_lsn; - slock_t relmutex; + + /* + * Spinlock used to protect table synchronization information and the + * oldest_nonremovable_xid. + */ + slock_t mutex; /* * Used to create the changes and subxact files for the streaming @@ -94,6 +99,9 @@ typedef struct LogicalRepWorker * The logical replication launcher manages an internal replication slot * named "pg_conflict_detection". It asynchronously collects this ID to * decide when to advance the xmin value of the slot. + * + * This ID would be set to InvalidTransactionId if the apply worker has + * stopped retaining information useful for conflict detection. 
*/ TransactionId oldest_nonremovable_xid; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 35e8aad7701..183fc193ad3 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2174,9 +2174,10 @@ pg_stat_subscription| SELECT su.oid AS subid, st.last_msg_send_time, st.last_msg_receipt_time, st.latest_end_lsn, - st.latest_end_time + st.latest_end_time, + st.dead_tuple_retention_active FROM (pg_subscription su - LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, worker_type) ON ((st.subid = su.oid))); + LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, worker_type, dead_tuple_retention_active) ON ((st.subid = su.oid))); pg_stat_subscription_stats| SELECT ss.subid, s.subname, ss.apply_error_count, diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl index 36aeb14c563..c1b8ede81cb 100644 --- a/src/test/subscription/t/035_conflicts.pl +++ b/src/test/subscription/t/035_conflicts.pl @@ -214,6 +214,10 @@ ok( $node_B->poll_query_until( ), "the xmin value of slot 'pg_conflict_detection' is valid on Node B"); +my $result = $node_B->safe_psql('postgres', + "SELECT dead_tuple_retention_active FROM pg_stat_subscription WHERE subname='$subname_BA';"); +is($result, qq(t), 'worker on node B retains conflict information'); + ################################################## # Check that the retain_dead_tuples option can be enabled only for disabled # subscriptions. 
Validate the NOTICE message during the subscription DDL, and @@ -254,6 +258,10 @@ ok( $node_A->poll_query_until( ), "the xmin value of slot 'pg_conflict_detection' is valid on Node A"); +$result = $node_A->safe_psql('postgres', + "SELECT dead_tuple_retention_active FROM pg_stat_subscription WHERE subname='$subname_AB';"); +is($result, qq(t), 'worker on node A retains conflict information'); + ################################################## # Check the WARNING when changing the origin to ANY, if retain_dead_tuples is # enabled. This warns of the possibility of receiving changes from origins @@ -281,7 +289,7 @@ $node_A->psql('postgres', $node_A->safe_psql('postgres', "INSERT INTO tab VALUES (1, 1), (2, 2);"); $node_A->wait_for_catchup($subname_BA); -my $result = $node_B->safe_psql('postgres', "SELECT * FROM tab;"); +$result = $node_B->safe_psql('postgres', "SELECT * FROM tab;"); is($result, qq(1|1 2|2), 'check replicated insert on node B'); -- 2.50.1.windows.1