From cbd5b287964e9527963f9cc9547dfbe7ed99fa2e Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Thu, 20 Feb 2025 14:53:43 +0800 Subject: [PATCH v28 4/4] Introduce a new GUC 'max_conflict_retention_duration' This commit introduces a GUC option max_conflict_retention_duration, designed to prevent excessive accumulation of dead tuples when subscription with retain_conflict_info enabled is present and the the apply worker cannot catch up with the publisher's workload. If the time spent advancing non-removable transaction ID surpasses the max_conflict_retention_duration threshold, the apply worker would stop retaining information for conflict detection. The replication slot pg_conflict_detection will be invalidated if all apply workers associated with the subscription, where retain_conflict_info is enabled, confirm that the retention duration exceeded the max_conflict_retention_duration. In this patch, a replication slot will not be automatically re-created if it becomes invalidated. Users can disable retain_conflict_info and re-enable it after confirming that the replication slot has been dropped. An upcoming patch will include support for automatic slot recreation once at least one apply worker confirms that the retention duration is within the max_conflict_retention_duration limit. To monitor worker's conflict retention status, this patch also introduces a new column 'retain_conflict_info' in the pg_stat_subscription view. This column indicates whether the apply worker is effectively retaining conflict information. The value is set to true only if retain_conflict_info is enabled for the associated subscription, and the retention duration for conflict detection by the apply worker has not exceeded max_conflict_retention_duration. --- doc/src/sgml/config.sgml | 41 +++++ doc/src/sgml/monitoring.sgml | 13 ++ doc/src/sgml/system-views.sgml | 11 ++ src/backend/catalog/system_views.sql | 3 +- src/backend/replication/logical/launcher.c | 80 +++++++++- src/backend/replication/logical/worker.c | 144 ++++++++++++++++-- src/backend/replication/slot.c | 13 ++ src/backend/utils/misc/guc_tables.c | 13 ++ src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/catalog/pg_proc.dat | 6 +- src/include/replication/logicallauncher.h | 1 + src/include/replication/slot.h | 4 +- src/include/replication/worker_internal.h | 6 + src/test/regress/expected/rules.out | 5 +- 14 files changed, 312 insertions(+), 29 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 1041819b500..6decdc89eef 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -5215,6 +5215,47 @@ ANY num_sync ( + max_conflict_retention_duration (integer) + + max_conflict_retention_duration configuration parameter + + + + + Maximum duration (in milliseconds) for which conflict + information can be retained for conflict detection by the apply worker. + The default value is 0, indicating that conflict + information is retained until it is no longer needed for detection + purposes. + + + The replication slot + pg_conflict_detection that used to + retain conflict information will be invalidated if all apply workers + associated with the subscription, where + retain_conflict_info is enabled, confirm that the + retention duration exceeded the + max_conflict_retention_duration. If the replication + slot is invalidated, you can disable + retain_conflict_info and re-enable it after + confirming this replication slot has been dropped. + + + This option is effective only if a subscription with + retain_conflict_info enabled is present, and the + associated apply worker is active. + + + + Note that setting a non-zero value for this option could lead to + conflict information being removed prematurely, potentially missing + some conflict detections. + + + + + diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 71c4f96d054..f5c1bba496a 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -2101,6 +2101,19 @@ description | Waiting for a newly initialized WAL file to reach durable storage sender; NULL for parallel apply workers + + + + retain_conflict_info boolean + + + True if retain_conflict_info + is enabled and the duration for which conflict information is + retained for conflict detection by this apply worker does not exceed + max_conflict_retention_duration; NULL for + parallel apply workers and table synchronization workers. + + diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index 3f5a306247e..8e8bef7c801 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2626,6 +2626,17 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx duration. + + + conflict_retention_exceeds_max_duration means that + the duration for retaining conflict information, which is used + in logical replication conflict detection, has exceeded the maximum + allowable limit. It is set only for the slot + pg_conflict_detection, which is created when + retain_conflict_info + is enabled. + + diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 8afbba16f8b..a0695ac7247 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -980,7 +980,8 @@ CREATE VIEW pg_stat_subscription AS st.last_msg_send_time, st.last_msg_receipt_time, st.latest_end_lsn, - st.latest_end_time + st.latest_end_time, + st.retain_conflict_info FROM pg_subscription su LEFT JOIN pg_stat_get_subscription(NULL) st ON (st.subid = su.oid); diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index c176bc0d87d..af4cd864d97 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -23,6 +23,7 @@ #include "access/tableam.h" #include "access/xact.h" #include "catalog/pg_subscription.h" +#include "catalog/pg_subscription_d.h" #include "catalog/pg_subscription_rel.h" #include "funcapi.h" #include "lib/dshash.h" @@ -42,6 +43,7 @@ #include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/snapmgr.h" +#include "utils/syscache.h" /* max sleep time between cycles (3min) */ #define DEFAULT_NAPTIME_PER_CYCLE 180000L @@ -62,6 +64,7 @@ int max_logical_replication_workers = 4; int max_sync_workers_per_subscription = 2; int max_parallel_apply_workers_per_subscription = 2; +int max_conflict_retention_duration = 0; LogicalRepWorker *MyLogicalRepWorker = NULL; @@ -458,6 +461,7 @@ retry: worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid; worker->parallel_apply = is_parallel_apply_worker; worker->oldest_nonremovable_xid = InvalidFullTransactionId; + worker->stop_conflict_retention = false; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); @@ -1170,7 +1174,8 @@ ApplyLauncherMain(Datum main_arg) MemoryContext oldctx; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; bool can_advance_xmin = true; - bool retain_conflict_info = false; + int nretain_conflict_info = 0; + int nstop_retention = 0; FullTransactionId xmin = InvalidFullTransactionId; CHECK_FOR_INTERRUPTS(); @@ -1199,7 +1204,7 @@ ApplyLauncherMain(Datum main_arg) */ if (sub->retainconflictinfo) { - retain_conflict_info = true; + nretain_conflict_info++; can_advance_xmin &= sub->enabled; /* @@ -1225,22 +1230,32 @@ ApplyLauncherMain(Datum main_arg) * the new xmin for advancing the replication slot used in * conflict detection. */ - if (sub->retainconflictinfo && can_advance_xmin) + if (sub->retainconflictinfo) { FullTransactionId nonremovable_xid; + bool stop_conflict_retention; SpinLockAcquire(&w->relmutex); nonremovable_xid = w->oldest_nonremovable_xid; + stop_conflict_retention = w->stop_conflict_retention; SpinLockRelease(&w->relmutex); + /* + * Skip collecting oldest_nonremovable_xid for workers + * that have stopped conflict retention. + */ + if (stop_conflict_retention) + nstop_retention++; + /* * Stop advancing xmin if an invalid non-removable * transaction ID is found, otherwise update xmin. */ - if (!FullTransactionIdIsValid(nonremovable_xid)) + else if (!FullTransactionIdIsValid(nonremovable_xid)) can_advance_xmin = false; - else if (!FullTransactionIdIsValid(xmin) || - FullTransactionIdPrecedes(nonremovable_xid, xmin)) + else if (can_advance_xmin && + (!FullTransactionIdIsValid(xmin) || + FullTransactionIdPrecedes(nonremovable_xid, xmin))) xmin = nonremovable_xid; } @@ -1285,12 +1300,35 @@ ApplyLauncherMain(Datum main_arg) } } + /* + * Do nothing if the replication slot is invalidated due to conflict + * retention duration. + */ + if (nretain_conflict_info && + MyReplicationSlot->data.invalidated != RS_INVAL_NONE) + { + Assert(MyReplicationSlot->data.invalidated == RS_INVAL_CONFLICT_RETENTION_DURATION); + } + + /* + * Invalidate the conflict slot if all workers with + * retain_conflict_info enabled have stopped further conflict + * retention. + */ + else if (nstop_retention && nretain_conflict_info == nstop_retention) + { + ReplicationSlotRelease(); + InvalidateObsoleteReplicationSlots(RS_INVAL_CONFLICT_RETENTION_DURATION, + InvalidXLogRecPtr, InvalidOid, + InvalidTransactionId); + } + /* * Maintain the xmin value of the replication slot for conflict * detection if needed, and update the sleep time before the next * attempt. */ - if (retain_conflict_info) + else if (nretain_conflict_info) { bool updated = false; @@ -1518,7 +1556,7 @@ GetLeaderApplyWorkerPid(pid_t pid) Datum pg_stat_get_subscription(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_COLS 10 +#define PG_STAT_GET_SUBSCRIPTION_COLS 11 Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0); int i; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; @@ -1595,6 +1633,32 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) elog(ERROR, "unknown worker type"); } + /* + * Only the leader apply worker manages conflict retention (see + * maybe_advance_nonremovable_xid() for details). + */ + if (!isParallelApplyWorker(&worker) && !isTablesyncWorker(&worker)) + { + HeapTuple tup; + Form_pg_subscription subform; + + tup = SearchSysCache1(SUBSCRIPTIONOID, + ObjectIdGetDatum(worker.subid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for subscription %u", + worker.subid); + + subform = (Form_pg_subscription) GETSTRUCT(tup); + + values[10] = subform->subretainconflictinfo && + !worker.stop_conflict_retention; + + ReleaseSysCache(tup); + } + else + nulls[10] = true; + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 923df059ff5..6b1746e0e2b 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -173,6 +173,7 @@ #include "replication/logicalrelation.h" #include "replication/logicalworker.h" #include "replication/origin.h" +#include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/worker_internal.h" #include "rewrite/rewriteHandler.h" @@ -453,6 +454,7 @@ static void wait_for_publisher_status(RetainConflictInfoData *data, static void wait_for_local_flush(RetainConflictInfoData *data); static void adjust_xid_advance_interval(RetainConflictInfoData *data, bool new_xid_found); +static void update_conflict_retention_status(void); static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); static void apply_handle_insert_internal(ApplyExecutionData *edata, @@ -3866,7 +3868,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) * Ensure to wake up when it's possible to attempt to advance the * non-removable transaction ID. */ - if (data.phase == RCI_GET_CANDIDATE_XID && data.xid_advance_interval) + if (!MyLogicalRepWorker->stop_conflict_retention && + data.phase == RCI_GET_CANDIDATE_XID && data.xid_advance_interval) wait_time = Min(wait_time, data.xid_advance_interval); rc = WaitLatchOrSocket(MyLatch, @@ -4123,6 +4126,10 @@ maybe_advance_nonremovable_xid(RetainConflictInfoData *data, if (!am_leader_apply_worker()) return; + /* Exit early if we have already stopped retaining */ + if (MyLogicalRepWorker->stop_conflict_retention) + return; + switch (data->phase) { case RCI_GET_CANDIDATE_XID: @@ -4285,6 +4292,8 @@ wait_for_publisher_status(RetainConflictInfoData *data, bool status_received) static void wait_for_local_flush(RetainConflictInfoData *data) { + bool stop_conflict_retention = false; + Assert(!XLogRecPtrIsInvalid(data->remote_lsn) && FullTransactionIdIsValid(data->candidate_xid)); @@ -4346,22 +4355,72 @@ wait_for_local_flush(RetainConflictInfoData *data) data->flushpos_update_time = data->last_recv_time; } - /* Return to wait for the changes to be applied */ + /* Check if changes up to the remote_lsn have been applied and flushed */ if (last_flushpos < data->remote_lsn) - return; + { + TimestampTz now; - /* - * Reaching here means the remote WAL position has been received, and all - * transactions up to that position on the publisher have been applied and - * flushed locally. So, we can advance the non-removable transaction ID. - */ - SpinLockAcquire(&MyLogicalRepWorker->relmutex); - MyLogicalRepWorker->oldest_nonremovable_xid = data->candidate_xid; - SpinLockRelease(&MyLogicalRepWorker->relmutex); + /* + * Use last_recv_time when applying changes in the loop to avoid + * unnecessary system time retrieval. If last_recv_time is not + * available, obtain the current timestamp. + */ + now = data->last_recv_time ? data->last_recv_time : GetCurrentTimestamp(); + + /* + * If the wait time has not exceeded the maximum limit + * (max_conflict_retention_duration), continue waiting for the changes + * to be applied. Otherwise, stop tracking the non-removable + * transaction ID by this apply worker. + */ + if (!max_conflict_retention_duration || + !TimestampDifferenceExceeds(data->candidate_xid_time, now, + max_conflict_retention_duration)) + return; - elog(DEBUG2, "confirmed remote flush up to %X/%X: new oldest_nonremovable_xid %u", - LSN_FORMAT_ARGS(data->remote_lsn), - XidFromFullTransactionId(data->candidate_xid)); + stop_conflict_retention = true; + } + + if (!stop_conflict_retention) + { + /* + * Reaching here means the remote WAL position has been received, and + * all transactions up to that position on the publisher have been + * applied and flushed locally. So, we can advance the non-removable + * transaction ID. + */ + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->oldest_nonremovable_xid = data->candidate_xid; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + elog(DEBUG2, "confirmed remote flush up to %X/%X: new oldest_nonremovable_xid %u", + LSN_FORMAT_ARGS(data->remote_lsn), + XidFromFullTransactionId(data->candidate_xid)); + } + else + { + /* + * Reaching here means the time spent applying changes up to the + * remote_lsn has exceeded the maximum allowed limit + * (max_conflict_retention_duration). So, we will stop retaining + * conflict information. + * + * Currently, the retention will not resume automatically unless user + * manually disable retain_conflict_info and re-enable it after + * confirming that the replication slot has been dropped. + */ + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->oldest_nonremovable_xid = InvalidFullTransactionId; + MyLogicalRepWorker->stop_conflict_retention = true; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + ereport(LOG, + errmsg("logical replication worker for subscription \"%s\" will stop retaining conflict information", + MySubscription->name), + errdetail("The time spent applying changes up to LSN %X/%X has exceeded the maximum limit of %u ms.", + LSN_FORMAT_ARGS(data->remote_lsn), + max_conflict_retention_duration)); + } /* Notify launcher to update the xmin of the conflict slot */ ApplyLauncherWakeup(); @@ -4425,6 +4484,51 @@ adjust_xid_advance_interval(RetainConflictInfoData *data, bool new_xid_found) } } +/* + * Update the conflict retention status for the current apply worker. It checks + * whether the worker should stop retaining conflict information due to + * invalidation of the replication slot ("pg_conflict_detection"). + * + * Currently, the replication slot is invalidated only if the duration for + * retaining conflict information exceeds the allowed maximum. + */ +static void +update_conflict_retention_status(void) +{ + ReplicationSlotInvalidationCause cause = RS_INVAL_NONE; + ReplicationSlot *slot; + + /* Exit early if retaining conflict information is not required */ + if (!MySubscription->retainconflictinfo) + return; + + /* + * Only the leader apply worker manages conflict retention (see + * maybe_advance_nonremovable_xid() for details). + */ + if (!am_leader_apply_worker()) + return; + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + slot = SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, false); + + if (slot) + { + SpinLockAcquire(&slot->mutex); + cause = slot->data.invalidated; + SpinLockRelease(&slot->mutex); + + Assert(cause == RS_INVAL_NONE || cause == RS_INVAL_CONFLICT_RETENTION_DURATION); + } + + LWLockRelease(ReplicationSlotControlLock); + + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->stop_conflict_retention = cause != RS_INVAL_NONE; + SpinLockRelease(&MyLogicalRepWorker->relmutex); +} + /* * Exit routine for apply workers due to subscription parameter changes. */ @@ -4596,6 +4700,16 @@ maybe_reread_subscription(void) CommitTransactionCommand(); MySubscriptionValid = true; + + /* + * Update worker status to avoid unnecessary conflict retention if the + * replication slot ("pg_conflict_detection") was invalidated prior to + * enabling the retain_conflict_info option. This is also necessary to + * restart conflict retention if the user has disabled and subsequently + * re-enabled the retain_conflict_info option, resulting in the + * replication slot being recreated. + */ + update_conflict_retention_status(); } /* @@ -5234,6 +5348,8 @@ InitializeLogRepWorker(void) MySubscription->name))); CommitTransactionCommand(); + + update_conflict_retention_status(); } /* diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index e0a53031895..0eedc3016bf 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -116,6 +116,7 @@ static const SlotInvalidationCauseMap SlotInvalidationCauses[] = { {RS_INVAL_HORIZON, "rows_removed"}, {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"}, {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"}, + {RS_INVAL_CONFLICT_RETENTION_DURATION, "conflict_retention_exceeds_max_duration"}, }; /* @@ -1610,6 +1611,11 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, "idle_replication_slot_timeout"); break; } + case RS_INVAL_CONFLICT_RETENTION_DURATION: + appendStringInfo(&err_detail, + _("The duration for retaining conflict information exceeds the configured \"%s\" limit of %d milliseconds"), + "max_conflict_retention_duration", max_conflict_retention_duration); + break; case RS_INVAL_NONE: pg_unreachable(); } @@ -1728,6 +1734,12 @@ DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s, } } + if (possible_causes & RS_INVAL_CONFLICT_RETENTION_DURATION) + { + if (IsReservedSlotName(NameStr(s->data.name))) + return RS_INVAL_CONFLICT_RETENTION_DURATION; + } + return RS_INVAL_NONE; } @@ -1995,6 +2007,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, * - RS_INVAL_WAL_LEVEL: is logical and wal_level is insufficient * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured * "idle_replication_slot_timeout" duration. + * - RS_INVAL_CONFLICT_RETENTION_DURATION: is "pg_conflict_detection" * * Note: This function attempts to invalidate the slot for multiple possible * causes in a single pass, minimizing redundant iterations. The "cause" diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 3cde94a1759..745a46a989e 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -3311,6 +3311,19 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"max_conflict_retention_duration", + PGC_SIGHUP, + REPLICATION_SUBSCRIBERS, + gettext_noop("Maximum duration for retaining information used in logical replication conflict detection."), + NULL, + GUC_UNIT_MS + }, + &max_conflict_retention_duration, + 0, 0, INT_MAX, + NULL, NULL, NULL + }, + { {"log_rotation_age", PGC_SIGHUP, LOGGING_WHERE, gettext_noop("Sets the amount of time to wait before forcing " diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 415f253096c..5b008e43556 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -381,6 +381,7 @@ # (change requires restart) #max_sync_workers_per_subscription = 2 # taken from max_logical_replication_workers #max_parallel_apply_workers_per_subscription = 2 # taken from max_logical_replication_workers +#max_conflict_retention_duration = 0 # in milliseconds; 0 disables #------------------------------------------------------------------------------ diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 9e803d610d7..f388205d741 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5633,9 +5633,9 @@ proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'oid', - proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz,text}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,worker_type}', + proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz,text,bool}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,worker_type,retain_conflict_info}', prosrc => 'pg_stat_get_subscription' }, { oid => '2026', descr => 'statistics: current backend PID', proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r', diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index 7b29f1814db..d347d3d67f6 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -15,6 +15,7 @@ extern PGDLLIMPORT int max_logical_replication_workers; extern PGDLLIMPORT int max_sync_workers_per_subscription; extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription; +extern PGDLLIMPORT int max_conflict_retention_duration; extern void ApplyLauncherRegister(void); extern void ApplyLauncherMain(Datum main_arg); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index e4f1e69cb6b..155ee176e0b 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -66,10 +66,12 @@ typedef enum ReplicationSlotInvalidationCause RS_INVAL_WAL_LEVEL = (1 << 2), /* idle slot timeout has occurred */ RS_INVAL_IDLE_TIMEOUT = (1 << 3), + /* duration of conflict info retention exceeds the maximum limit */ + RS_INVAL_CONFLICT_RETENTION_DURATION = (1 << 4), } ReplicationSlotInvalidationCause; /* Maximum number of invalidation causes */ -#define RS_INVAL_MAX_CAUSES 4 +#define RS_INVAL_MAX_CAUSES 5 /* * On-Disk data of a replication slot, preserved across restarts. diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index b09486017f4..40469f2df28 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -103,6 +103,12 @@ typedef struct LogicalRepWorker */ FullTransactionId oldest_nonremovable_xid; + /* + * Indicates whether the apply worker has stopped retaining conflict + * information. This is used only when retain_conflict_info is enabled. + */ + bool stop_conflict_retention; + /* Stats. */ XLogRecPtr last_lsn; TimestampTz last_send_time; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 5baba8d39ff..892ebb1136e 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2144,9 +2144,10 @@ pg_stat_subscription| SELECT su.oid AS subid, st.last_msg_send_time, st.last_msg_receipt_time, st.latest_end_lsn, - st.latest_end_time + st.latest_end_time, + st.retain_conflict_info FROM (pg_subscription su - LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, worker_type) ON ((st.subid = su.oid))); + LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, worker_type, retain_conflict_info) ON ((st.subid = su.oid))); pg_stat_subscription_stats| SELECT ss.subid, s.subname, ss.apply_error_count, -- 2.30.0.windows.2