From 31812fa9b922bad041ceb90a6ff6e0814a5e1f77 Mon Sep 17 00:00:00 2001 From: Daniil Davidov Date: Thu, 15 Jan 2026 23:15:48 +0700 Subject: [PATCH v30 3/5] Cost based parameters propagation for parallel autovacuum --- src/backend/commands/vacuum.c | 21 +++- src/backend/commands/vacuumparallel.c | 163 ++++++++++++++++++++++++++ src/backend/postmaster/autovacuum.c | 2 +- src/include/commands/vacuum.h | 2 + src/tools/pgindent/typedefs.list | 1 + 5 files changed, 186 insertions(+), 3 deletions(-) diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index bce3a2daa24..1b5ba3ce1ef 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -2435,8 +2435,19 @@ vacuum_delay_point(bool is_analyze) /* Always check for interrupts */ CHECK_FOR_INTERRUPTS(); - if (InterruptPending || - (!VacuumCostActive && !ConfigReloadPending)) + if (InterruptPending) + return; + + if (IsParallelWorker()) + { + /* + * Update cost-based vacuum delay parameters for a parallel autovacuum + * worker if any changes are detected. + */ + parallel_vacuum_update_shared_delay_params(); + } + + if (!VacuumCostActive && !ConfigReloadPending) return; /* @@ -2450,6 +2461,12 @@ vacuum_delay_point(bool is_analyze) ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); VacuumUpdateCosts(); + + /* + * Propagate cost-based vacuum delay parameters to shared memory if + * any of them have changed during the config reload. + */ + parallel_vacuum_propagate_shared_delay_params(); } /* diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c index b7ffd854009..98aeb66eec4 100644 --- a/src/backend/commands/vacuumparallel.c +++ b/src/backend/commands/vacuumparallel.c @@ -18,6 +18,13 @@ * the parallel context is re-initialized so that the same DSM can be used for * multiple passes of index bulk-deletion and index cleanup. * + * For parallel autovacuum, we need to propagate cost-based vacuum delay + * parameters from the leader to its workers, as the leader's parameters can + * change even while processing a table (e.g., due to a config reload). + * The PVSharedCostParams struct manages these parameters using a + * generation counter. Each parallel worker polls this shared state and + * refreshes its local delay parameters whenever a change is detected. + * * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * @@ -53,6 +60,31 @@ #define PARALLEL_VACUUM_KEY_WAL_USAGE 4 #define PARALLEL_VACUUM_KEY_INDEX_STATS 5 +/* + * Struct for cost-based vacuum delay related parameters to share among an + * autovacuum worker and its parallel vacuum workers. + */ +typedef struct PVSharedCostParams +{ + /* + * The generation counter is incremented by the leader process each time + * it updates the shared cost-based vacuum delay parameters. Paralell + * vacuum workers compares it with their local generation, + * shared_params_generation_local, to detect whether they need to refresh + * their local parameters. + */ + pg_atomic_uint32 generation; + + slock_t mutex; /* protects all fields below */ + + /* Parameters to share with parallel workers */ + double cost_delay; + int cost_limit; + int cost_page_dirty; + int cost_page_hit; + int cost_page_miss; +} PVSharedCostParams; + /* * Shared information among parallel workers. So this is allocated in the DSM * segment. @@ -122,6 +154,18 @@ typedef struct PVShared /* Statistics of shared dead items */ VacDeadItemsInfo dead_items_info; + + /* + * If 'true' then we are running parallel autovacuum. Otherwise, we are + * running parallel maintenence VACUUM. + */ + bool is_autovacuum; + + /* + * Struct for syncing cost-based vacuum delay parameters between + * supportive parallel autovacuum workers with leader worker. + */ + PVSharedCostParams cost_params; } PVShared; /* Status used during parallel index vacuum or cleanup */ @@ -224,6 +268,11 @@ struct ParallelVacuumState PVIndVacStatus status; }; +static PVSharedCostParams *pv_shared_cost_params = NULL; + +/* See comments in the PVSharedCostParams for the details */ +static uint32 shared_params_generation_local = 0; + static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, bool *will_parallel_vacuum); static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans, @@ -235,6 +284,7 @@ static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans, bool vacuum); static void parallel_vacuum_error_callback(void *arg); +static inline void parallel_vacuum_set_cost_parameters(PVSharedCostParams *params); /* * Try to enter parallel mode and create a parallel context. Then initialize @@ -395,6 +445,21 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, pg_atomic_init_u32(&(shared->active_nworkers), 0); pg_atomic_init_u32(&(shared->idx), 0); + shared->is_autovacuum = AmAutoVacuumWorkerProcess(); + + /* + * Initialize shared cost-based vacuum delay parameters if it's for + * autovacuum. + */ + if (shared->is_autovacuum) + { + parallel_vacuum_set_cost_parameters(&shared->cost_params); + pg_atomic_init_u32(&shared->cost_params.generation, 0); + SpinLockInit(&shared->cost_params.mutex); + + pv_shared_cost_params = &(shared->cost_params); + } + shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared); pvs->shared = shared; @@ -460,6 +525,9 @@ parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats) DestroyParallelContext(pvs->pcxt); ExitParallelMode(); + if (AmAutoVacuumWorkerProcess()) + pv_shared_cost_params = NULL; + pfree(pvs->will_parallel_vacuum); pfree(pvs); } @@ -537,6 +605,95 @@ parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tup parallel_vacuum_process_all_indexes(pvs, num_index_scans, false, wstats); } +/* + * Fill in the given structure with cost-based vacuum delay parameter values. + */ +static inline void +parallel_vacuum_set_cost_parameters(PVSharedCostParams *params) +{ + params->cost_delay = vacuum_cost_delay; + params->cost_limit = vacuum_cost_limit; + params->cost_page_dirty = VacuumCostPageDirty; + params->cost_page_hit = VacuumCostPageHit; + params->cost_page_miss = VacuumCostPageMiss; +} + +/* + * Updates the cost-based vacuum delay parameters for parallel autovacuum + * workers. + * + * For non-autovacuum parallel worker this function will have no effect. + */ +void +parallel_vacuum_update_shared_delay_params(void) +{ + uint32 params_generation; + + Assert(IsParallelWorker()); + + /* Quick return if the wokrer is not running for the autovacuum */ + if (pv_shared_cost_params == NULL) + return; + + params_generation = pg_atomic_read_u32(&pv_shared_cost_params->generation); + Assert(shared_params_generation_local <= params_generation); + + /* Return if parameters had not changed in the leader */ + if (params_generation == shared_params_generation_local) + return; + + SpinLockAcquire(&pv_shared_cost_params->mutex); + VacuumCostDelay = pv_shared_cost_params->cost_delay; + VacuumCostLimit = pv_shared_cost_params->cost_limit; + VacuumCostPageDirty = pv_shared_cost_params->cost_page_dirty; + VacuumCostPageHit = pv_shared_cost_params->cost_page_hit; + VacuumCostPageMiss = pv_shared_cost_params->cost_page_miss; + SpinLockRelease(&pv_shared_cost_params->mutex); + + VacuumUpdateCosts(); + + shared_params_generation_local = params_generation; +} + +/* + * Store the cost-based vacuum delay parameters in the shared memory so that + * parallel vacuum workers can consume them (see + * parallel_vacuum_update_shared_delay_params()). + */ +void +parallel_vacuum_propagate_shared_delay_params(void) +{ + Assert(AmAutoVacuumWorkerProcess()); + + /* + * Quick return if the leader process is not sharing the delay parameters. + */ + if (pv_shared_cost_params == NULL) + return; + + /* + * Check if any delay parameters has changed. We can read them without + * locks as only the leader can modify them. + */ + if (vacuum_cost_delay == pv_shared_cost_params->cost_delay && + vacuum_cost_limit == pv_shared_cost_params->cost_limit && + VacuumCostPageDirty == pv_shared_cost_params->cost_page_dirty && + VacuumCostPageHit == pv_shared_cost_params->cost_page_hit && + VacuumCostPageMiss == pv_shared_cost_params->cost_page_miss) + return; + + /* Update the shared delay parameters */ + SpinLockAcquire(&pv_shared_cost_params->mutex); + parallel_vacuum_set_cost_parameters(pv_shared_cost_params); + SpinLockRelease(&pv_shared_cost_params->mutex); + + /* + * Increment the generation of the parameters, i.e. let parallel workers + * know that they should re-read shared cost params. + */ + pg_atomic_fetch_add_u32(&pv_shared_cost_params->generation, 1); +} + /* * Compute the number of parallel worker processes to request. Both index * vacuum and index cleanup can be executed with parallel workers. @@ -1078,6 +1235,9 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) VacuumSharedCostBalance = &(shared->cost_balance); VacuumActiveNWorkers = &(shared->active_nworkers); + if (shared->is_autovacuum) + pv_shared_cost_params = &(shared->cost_params); + /* Set parallel vacuum state */ pvs.indrels = indrels; pvs.nindexes = nindexes; @@ -1127,6 +1287,9 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) vac_close_indexes(nindexes, indrels, RowExclusiveLock); table_close(rel, ShareUpdateExclusiveLock); FreeAccessStrategy(pvs.bstrategy); + + if (shared->is_autovacuum) + pv_shared_cost_params = NULL; } /* diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c index e810e1303db..f0535a0997f 100644 --- a/src/backend/postmaster/autovacuum.c +++ b/src/backend/postmaster/autovacuum.c @@ -1659,7 +1659,7 @@ VacuumUpdateCosts(void) } else { - /* Must be explicit VACUUM or ANALYZE */ + /* Must be explicit VACUUM or ANALYZE or parallel autovacuum worker */ vacuum_cost_delay = VacuumCostDelay; vacuum_cost_limit = VacuumCostLimit; } diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h index 953a506181e..cc154737115 100644 --- a/src/include/commands/vacuum.h +++ b/src/include/commands/vacuum.h @@ -423,6 +423,8 @@ extern void parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, int num_index_scans, bool estimated_count, PVWorkerStats *wstats); +extern void parallel_vacuum_update_shared_delay_params(void); +extern void parallel_vacuum_propagate_shared_delay_params(void); extern void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc); /* in commands/analyze.c */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index a4a2ed07816..d5c7b91e167 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2090,6 +2090,7 @@ PVIndStats PVIndVacStatus PVOID PVShared +PVSharedCostParams PVWorkerUsage PVWorkerStats PX_Alias -- 2.43.0