From 66ef4d2906f1afd1c9b2d8f81b10f3bc67175709 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Fri, 20 Jun 2025 17:53:17 +0800 Subject: [PATCH v41 2/6] Introduce a new GUC 'max_conflict_retention_duration' This commit introduces a GUC option max_conflict_retention_duration, designed to prevent excessive accumulation of dead tuples when a subscription with retain_conflict_info enabled is present and the apply worker cannot catch up with the publisher's workload. If the time spent advancing the non-removable transaction ID surpasses the max_conflict_retention_duration threshold, the apply worker stops retaining information for conflict detection. The replication slot pg_conflict_detection will be invalidated if all apply workers associated with the subscription, where retain_conflict_info is enabled, confirm that the retention duration exceeded the max_conflict_retention_duration. In this patch, a replication slot will not be automatically re-created if it becomes invalidated. Users can disable retain_conflict_info and re-enable it after confirming that the replication slot has been dropped. An upcoming patch will include support for automatic slot recreation once at least one apply worker confirms that the retention duration is within the max_conflict_retention_duration limit. To monitor a worker's conflict retention status, this patch also introduces a new column 'retain_conflict_info' in the pg_stat_subscription view. This column indicates whether the apply worker is effectively retaining conflict information. The value is set to true only if retain_conflict_info is enabled for the associated subscription, and the retention duration for conflict detection by the apply worker has not exceeded max_conflict_retention_duration. 
--- doc/src/sgml/config.sgml | 43 ++++++ doc/src/sgml/monitoring.sgml | 13 ++ doc/src/sgml/system-views.sgml | 11 ++ src/backend/catalog/system_views.sql | 3 +- src/backend/replication/logical/launcher.c | 117 +++++++++++++++-- src/backend/replication/logical/worker.c | 124 ++++++++++++++++-- src/backend/replication/slot.c | 13 ++ src/backend/utils/misc/guc_tables.c | 13 ++ src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/catalog/pg_proc.dat | 6 +- src/include/replication/logicallauncher.h | 1 + src/include/replication/slot.h | 4 +- src/include/replication/worker_internal.h | 6 + src/test/regress/expected/rules.out | 5 +- 14 files changed, 332 insertions(+), 28 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 0908f2e32f8..4c870b5e806 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -5397,6 +5397,49 @@ ANY num_sync ( + max_conflict_retention_duration (integer) + + max_conflict_retention_duration configuration parameter + + + + + Maximum duration for which each apply worker can request to retain the + information useful for conflict detection when + retain_conflict_info is enabled for the associated + subscriptions. The default value is 0, indicating + that conflict information is retained until it is no longer needed for + detection purposes. If this value is specified without units, it is + taken as milliseconds. + + + The replication slot + pg_conflict_detection that is used to + retain conflict information will be invalidated if all apply workers + associated with the subscriptions, where + retain_conflict_info is enabled, confirm that the + retention duration has exceeded the + max_conflict_retention_duration. If the replication + slot is invalidated, you can disable + retain_conflict_info and re-enable it after + confirming that this replication slot has been dropped. 
+ + + This option is effective only if a subscription with + retain_conflict_info enabled is present, and the + associated apply worker is active. + + + + Note that setting a non-zero value for this option could lead to + conflict information being removed prematurely, potentially missing + some conflict detections. + + + + + diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 4265a22d4de..84e5a48181c 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -2114,6 +2114,19 @@ description | Waiting for a newly initialized WAL file to reach durable storage sender; NULL for parallel apply workers + + + + retain_conflict_info boolean + + + True if retain_conflict_info + is enabled and the duration for which conflict information is + retained for conflict detection by this apply worker does not exceed + max_conflict_retention_duration; NULL for + parallel apply workers and table synchronization workers. + + diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index 986ae1f543d..1d3ee5a9ed7 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2936,6 +2936,17 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx duration. + + + conflict_retention_exceeds_max_duration means that + the duration for retaining conflict information, which is used + in logical replication conflict detection, has exceeded the maximum + allowable limit. It is set only for the slot + pg_conflict_detection, which is created when + retain_conflict_info + is enabled. 
+ + diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index ec4aa9ea7b4..4ef1ec6e668 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -988,7 +988,8 @@ CREATE VIEW pg_stat_subscription AS st.last_msg_send_time, st.last_msg_receipt_time, st.latest_end_lsn, - st.latest_end_time + st.latest_end_time, + st.retain_conflict_info FROM pg_subscription su LEFT JOIN pg_stat_get_subscription(NULL) st ON (st.subid = su.oid); diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 727508c0894..a988969636e 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -23,6 +23,7 @@ #include "access/tableam.h" #include "access/xact.h" #include "catalog/pg_subscription.h" +#include "catalog/pg_subscription_d.h" #include "catalog/pg_subscription_rel.h" #include "funcapi.h" #include "lib/dshash.h" @@ -43,6 +44,7 @@ #include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/snapmgr.h" +#include "utils/syscache.h" /* max sleep time between cycles (3min) */ #define DEFAULT_NAPTIME_PER_CYCLE 180000L @@ -51,6 +53,7 @@ int max_logical_replication_workers = 4; int max_sync_workers_per_subscription = 2; int max_parallel_apply_workers_per_subscription = 2; +int max_conflict_retention_duration = 0; LogicalRepWorker *MyLogicalRepWorker = NULL; @@ -103,9 +106,11 @@ static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); static void compute_min_nonremovable_xid(LogicalRepWorker *worker, bool retain_conflict_info, TransactionId *xmin, - bool *can_advance_xmin); + bool *can_advance_xmin, + bool *stop_retention); static bool acquire_conflict_slot_if_exists(void); static void advance_conflict_slot_xmin(TransactionId new_xmin); +static void invalidate_conflict_slot(void); /* @@ -468,6 +473,8 @@ retry: worker->oldest_nonremovable_xid = retain_conflict_info ? 
MyReplicationSlot->data.xmin : InvalidTransactionId; + worker->stop_conflict_info_retention = (retain_conflict_info && + MyReplicationSlot->data.invalidated != RS_INVAL_NONE); worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); @@ -1186,6 +1193,7 @@ ApplyLauncherMain(Datum main_arg) long wait_time = DEFAULT_NAPTIME_PER_CYCLE; bool can_advance_xmin = true; bool retain_conflict_info = false; + bool stop_retention = true; TransactionId xmin = InvalidTransactionId; CHECK_FOR_INTERRUPTS(); @@ -1218,7 +1226,8 @@ ApplyLauncherMain(Datum main_arg) * has set the retain_conflict_info option. */ compute_min_nonremovable_xid(NULL, sub->retainconflictinfo, - &xmin, &can_advance_xmin); + &xmin, &can_advance_xmin, + &stop_retention); continue; } @@ -1231,7 +1240,7 @@ ApplyLauncherMain(Datum main_arg) * required for conflict detection. */ compute_min_nonremovable_xid(w, sub->retainconflictinfo, &xmin, - &can_advance_xmin); + &can_advance_xmin, &stop_retention); if (w != NULL) continue; /* worker is running already */ @@ -1280,14 +1289,19 @@ ApplyLauncherMain(Datum main_arg) } /* - * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription - * that requires us to retain the conflict information. Otherwise, if - * required, advance the slot's xmin to protect deleted tuples - * required for the conflict detection. + * Manage the replication slot based on requirements: + * - Invalidate the slot only if all workers for subscriptions with + * retain_conflict_info enabled have requested it. + * - Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription + * that requires us to retain the conflict information. + * - if required, advance the slot's xmin to protect deleted tuples + * required for the conflict detection. 
*/ if (MyReplicationSlot) { - if (!retain_conflict_info) + if (retain_conflict_info && stop_retention) + invalidate_conflict_slot(); + else if (!retain_conflict_info) ReplicationSlotDropAcquired(); else if (can_advance_xmin) advance_conflict_slot_xmin(xmin); @@ -1328,17 +1342,21 @@ ApplyLauncherMain(Datum main_arg) * If the replication slot cannot be advanced during this cycle, due to either * a disabled subscription or an inactive worker, set *can_advance_xmin to * false. + * + * if any worker continues retaining conflict information, *stop_retention is + * set to false. */ static void compute_min_nonremovable_xid(LogicalRepWorker *worker, bool retain_conflict_info, TransactionId *xmin, - bool *can_advance_xmin) + bool *can_advance_xmin, bool *stop_retention) { - if (!retain_conflict_info || !*can_advance_xmin) + if (!retain_conflict_info) return; if (worker) { + bool stop_conflict_info_retention; TransactionId nonremovable_xid; /* @@ -1349,8 +1367,22 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker, SpinLockAcquire(&worker->relmutex); nonremovable_xid = worker->oldest_nonremovable_xid; + stop_conflict_info_retention = worker->stop_conflict_info_retention; SpinLockRelease(&worker->relmutex); + /* + * Stop the conflict information retention only if all workers for + * subscriptions with retain_conflict_info enabled have requested it. + */ + *stop_retention &= stop_conflict_info_retention; + + /* + * Skip collecting oldest_nonremovable_xid for workers that have + * stopped conflict retention or if advancement is not possible. + */ + if (stop_conflict_info_retention || !*can_advance_xmin) + return; + Assert(TransactionIdIsValid(nonremovable_xid)); if (!TransactionIdIsValid(*xmin) || @@ -1379,6 +1411,12 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker, * running. */ *can_advance_xmin = false; + + /* + * Stop the conflict information retention only if all workers for + * subscriptions with retain_conflict_info enabled have requested it. 
+ */ + *stop_retention = false; } } @@ -1469,6 +1507,37 @@ advance_conflict_slot_xmin(TransactionId new_xmin) return; } +/* + * Invalidate the replication slot used to retain information useful for + * conflict detection. + */ +static void +invalidate_conflict_slot(void) +{ + Assert(MyReplicationSlot); + + /* + * Do nothing if the replication slot has already been invalidated due to + * conflict retention duration. + */ + if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE) + { + Assert(MyReplicationSlot->data.invalidated == RS_INVAL_CONFLICT_RETENTION_DURATION); + return; + } + + ReplicationSlotRelease(); + InvalidateObsoleteReplicationSlots(RS_INVAL_CONFLICT_RETENTION_DURATION, + InvalidXLogRecPtr, InvalidOid, + InvalidTransactionId); + + /* + * Acquire the invalidated slot to allow the launcher to drop it in the + * next cycle if no subscriptions have retain_conflict_info enabled. + */ + ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false); +} + /* * Is current process the logical replication launcher? */ @@ -1512,7 +1581,7 @@ GetLeaderApplyWorkerPid(pid_t pid) Datum pg_stat_get_subscription(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_COLS 10 +#define PG_STAT_GET_SUBSCRIPTION_COLS 11 Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0); int i; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; @@ -1589,6 +1658,32 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) elog(ERROR, "unknown worker type"); } + /* + * Only the leader apply worker manages conflict retention (see + * maybe_advance_nonremovable_xid() for details). 
+ */ + if (!isParallelApplyWorker(&worker) && !isTablesyncWorker(&worker)) + { + HeapTuple tup; + Form_pg_subscription subform; + + tup = SearchSysCache1(SUBSCRIPTIONOID, + ObjectIdGetDatum(worker.subid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for subscription %u", + worker.subid); + + subform = (Form_pg_subscription) GETSTRUCT(tup); + + values[10] = subform->subretainconflictinfo && + !worker.stop_conflict_info_retention; + + ReleaseSysCache(tup); + } + else + nulls[10] = true; + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 49029638c47..b80e028eb9d 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -448,6 +448,8 @@ static void request_publisher_status(RetainConflictInfoData *rci_data); static void wait_for_publisher_status(RetainConflictInfoData *rci_data, bool status_received); static void wait_for_local_flush(RetainConflictInfoData *rci_data); +static void reset_conflict_info_fields(RetainConflictInfoData *rci_data); +static bool should_stop_conflict_info_retention(RetainConflictInfoData *rci_data); static void adjust_xid_advance_interval(RetainConflictInfoData *rci_data, bool new_xid_found); @@ -3860,7 +3862,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) * Ensure to wake up when it's possible to advance the non-removable * transaction ID. 
*/ - if (rci_data.phase == RCI_GET_CANDIDATE_XID && + if (!MyLogicalRepWorker->stop_conflict_info_retention && + rci_data.phase == RCI_GET_CANDIDATE_XID && rci_data.xid_advance_interval) wait_time = Min(wait_time, rci_data.xid_advance_interval); @@ -4136,6 +4139,10 @@ can_advance_nonremovable_xid(RetainConflictInfoData *rci_data) if (!MySubscription->retainconflictinfo) return false; + /* No need to advance if we have already stopped retaining */ + if (MyLogicalRepWorker->stop_conflict_info_retention) + return false; + return true; } @@ -4276,6 +4283,13 @@ wait_for_publisher_status(RetainConflictInfoData *rci_data, if (!status_received) return; + /* + * Stop retaining conflict information if required (See + * should_stop_conflict_info_retention() for details). + */ + if (should_stop_conflict_info_retention(rci_data)) + return; + if (!FullTransactionIdIsValid(rci_data->remote_wait_for)) rci_data->remote_wait_for = rci_data->remote_nextxid; @@ -4357,6 +4371,25 @@ wait_for_local_flush(RetainConflictInfoData *rci_data) * have a WAL position greater than the rci_data->remote_lsn. */ if (!AllTablesyncsReady()) + { + /* + * Reset the timer to prevent stopping conflict info retention due to + * time consumed during table synchronization. Given that table sync + * is an infrequent operation, it merits a time adjustment. + */ + if (max_conflict_retention_duration) + rci_data->candidate_xid_time = rci_data->last_recv_time + ? rci_data->last_recv_time + : GetCurrentTimestamp(); + + return; + } + + /* + * Stop retaining conflict information if required (See + * should_stop_conflict_info_retention() for details). + */ + if (should_stop_conflict_info_retention(rci_data)) return; /* @@ -4382,7 +4415,10 @@ wait_for_local_flush(RetainConflictInfoData *rci_data) rci_data->flushpos_update_time = rci_data->last_recv_time; } - /* Return to wait for the changes to be applied */ + /* + * Return if changes up to the remote_lsn have not been applied and + * flushed. 
+ */ if (last_flushpos < rci_data->remote_lsn) return; @@ -4402,12 +4438,21 @@ wait_for_local_flush(RetainConflictInfoData *rci_data) /* Notify launcher to update the xmin of the conflict slot */ ApplyLauncherWakeup(); - /* - * Reset all data fields except those used to determine the timing for the - * next round of transaction ID advancement. We can even use - * flushpos_update_time in the next round to decide whether to get the - * latest flush position. - */ + reset_conflict_info_fields(rci_data); + + /* process the next phase */ + process_rci_phase_transition(rci_data, false); +} + +/* + * Reset all data fields of RetainConflictInfoData except those used to + * determine the timing for the next round of transaction ID advancement. We + * can even use flushpos_update_time in the next round to decide whether to get + * the latest flush position. + */ +static void +reset_conflict_info_fields(RetainConflictInfoData *rci_data) +{ rci_data->phase = RCI_GET_CANDIDATE_XID; rci_data->remote_lsn = InvalidXLogRecPtr; rci_data->remote_oldestxid = InvalidFullTransactionId; @@ -4415,9 +4460,64 @@ wait_for_local_flush(RetainConflictInfoData *rci_data) rci_data->reply_time = 0; rci_data->remote_wait_for = InvalidFullTransactionId; rci_data->candidate_xid = InvalidTransactionId; +} - /* process the next phase */ - process_rci_phase_transition(rci_data, false); +/* + * Check whether conflict information retention should be stopped because the + * wait time has exceeded the maximum limit (max_conflict_retention_duration). + * + * If retention should be stopped, set + * LogicalRepWorker->stop_conflict_info_retention to true, notify the launcher to + * invalidate the slot, and return true. Return false otherwise. + * + * Currently, the retention will not resume automatically unless user manually + * disables retain_conflict_info and re-enables it after confirming that the + * replication slot has been dropped. 
+ */ +static bool +should_stop_conflict_info_retention(RetainConflictInfoData *rci_data) +{ + TimestampTz now; + + Assert(TransactionIdIsValid(rci_data->candidate_xid)); + Assert(rci_data->phase == RCI_WAIT_FOR_PUBLISHER_STATUS || + rci_data->phase == RCI_WAIT_FOR_LOCAL_FLUSH); + + if (!max_conflict_retention_duration) + return false; + + /* + * Use last_recv_time when applying changes in the loop to avoid + * unnecessary system time retrieval. If last_recv_time is not available, + * obtain the current timestamp. + */ + now = rci_data->last_recv_time ? rci_data->last_recv_time : GetCurrentTimestamp(); + + /* + * Return if the wait time has not exceeded the maximum limit + * (max_conflict_retention_duration). + */ + if (!TimestampDifferenceExceeds(rci_data->candidate_xid_time, now, + max_conflict_retention_duration)) + return false; + + ereport(LOG, + errmsg("logical replication worker for subscription \"%s\" will stop retaining conflict information", + MySubscription->name), + errdetail("The time spent advancing the non-removable transaction ID has exceeded the maximum limit of %u ms.", + max_conflict_retention_duration)); + + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId; + MyLogicalRepWorker->stop_conflict_info_retention = true; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + /* Notify launcher to invalidate the conflict slot */ + ApplyLauncherWakeup(); + + reset_conflict_info_fields(rci_data); + + return true; } /* @@ -4450,6 +4550,10 @@ adjust_xid_advance_interval(RetainConflictInfoData *rci_data, bool new_xid_found */ rci_data->xid_advance_interval = Min(rci_data->xid_advance_interval * 2, max_interval); + + /* Ensure the wait time remains within the maximum limit */ + rci_data->xid_advance_interval = Min(rci_data->xid_advance_interval, + max_conflict_retention_duration); } else { diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 
f07ef2b87a3..935828b08ac 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -116,6 +116,7 @@ static const SlotInvalidationCauseMap SlotInvalidationCauses[] = { {RS_INVAL_HORIZON, "rows_removed"}, {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"}, {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"}, + {RS_INVAL_CONFLICT_RETENTION_DURATION, "conflict_retention_exceeds_max_duration"}, }; /* @@ -1663,6 +1664,11 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, "idle_replication_slot_timeout"); break; } + case RS_INVAL_CONFLICT_RETENTION_DURATION: + appendStringInfo(&err_detail, + _("The duration for retaining conflict information exceeds the configured \"%s\" limit of %d milliseconds"), + "max_conflict_retention_duration", max_conflict_retention_duration); + break; case RS_INVAL_NONE: pg_unreachable(); } @@ -1781,6 +1787,12 @@ DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s, } } + if (possible_causes & RS_INVAL_CONFLICT_RETENTION_DURATION) + { + if (IsReservedSlotName(NameStr(s->data.name))) + return RS_INVAL_CONFLICT_RETENTION_DURATION; + } + return RS_INVAL_NONE; } @@ -2051,6 +2063,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, * - RS_INVAL_WAL_LEVEL: is logical and wal_level is insufficient * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured * "idle_replication_slot_timeout" duration. + * - RS_INVAL_CONFLICT_RETENTION_DURATION: is "pg_conflict_detection" * * Note: This function attempts to invalidate the slot for multiple possible * causes in a single pass, minimizing redundant iterations. 
The "cause" diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index f04bfedb2fd..bd4e73099af 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -3388,6 +3388,19 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"max_conflict_retention_duration", + PGC_SIGHUP, + REPLICATION_SUBSCRIBERS, + gettext_noop("Maximum duration for retaining information used in logical replication conflict detection."), + NULL, + GUC_UNIT_MS + }, + &max_conflict_retention_duration, + 0, 0, INT_MAX, + NULL, NULL, NULL + }, + { {"log_rotation_age", PGC_SIGHUP, LOGGING_WHERE, gettext_noop("Sets the amount of time to wait before forcing " diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 341f88adc87..bb6e880f2ae 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -396,6 +396,7 @@ # (change requires restart) #max_sync_workers_per_subscription = 2 # taken from max_logical_replication_workers #max_parallel_apply_workers_per_subscription = 2 # taken from max_logical_replication_workers +#max_conflict_retention_duration = 0 # in milliseconds; 0 disables #------------------------------------------------------------------------------ diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 359f03cd331..f566e23f785 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5695,9 +5695,9 @@ proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'oid', - proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz,text}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}', - proargnames => 
'{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,worker_type}', + proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz,text,bool}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,worker_type,retain_conflict_info}', prosrc => 'pg_stat_get_subscription' }, { oid => '2026', descr => 'statistics: current backend PID', proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r', diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index b29453e8e4f..6e3007db5f0 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -15,6 +15,7 @@ extern PGDLLIMPORT int max_logical_replication_workers; extern PGDLLIMPORT int max_sync_workers_per_subscription; extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription; +extern PGDLLIMPORT int max_conflict_retention_duration; extern void ApplyLauncherRegister(void); extern void ApplyLauncherMain(Datum main_arg); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 9c8c7b9840b..52b5d7aa815 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -66,10 +66,12 @@ typedef enum ReplicationSlotInvalidationCause RS_INVAL_WAL_LEVEL = (1 << 2), /* idle slot timeout has occurred */ RS_INVAL_IDLE_TIMEOUT = (1 << 3), + /* duration of conflict info retention exceeds the maximum limit */ + RS_INVAL_CONFLICT_RETENTION_DURATION = (1 << 4), } ReplicationSlotInvalidationCause; /* Maximum number of invalidation causes */ -#define RS_INVAL_MAX_CAUSES 4 +#define RS_INVAL_MAX_CAUSES 5 /* * On-Disk data of a replication slot, preserved across restarts. 
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 4fb317b3f85..436fe445d64 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -96,6 +96,12 @@ typedef struct LogicalRepWorker */ TransactionId oldest_nonremovable_xid; + /* + * Indicates whether the apply worker has stopped retaining conflict + * information. This is used only when retain_conflict_info is enabled. + */ + bool stop_conflict_info_retention; + /* Stats. */ XLogRecPtr last_lsn; TimestampTz last_send_time; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 6cf828ca8d0..79ed5233edb 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2165,9 +2165,10 @@ pg_stat_subscription| SELECT su.oid AS subid, st.last_msg_send_time, st.last_msg_receipt_time, st.latest_end_lsn, - st.latest_end_time + st.latest_end_time, + st.retain_conflict_info FROM (pg_subscription su - LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, worker_type) ON ((st.subid = su.oid))); + LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, worker_type, retain_conflict_info) ON ((st.subid = su.oid))); pg_stat_subscription_stats| SELECT ss.subid, s.subname, ss.apply_error_count, -- 2.30.0.windows.2