From 55b76f15bbc3991b7457de6c1d6998d39b16292c Mon Sep 17 00:00:00 2001 From: Daniil Davidov Date: Fri, 16 May 2025 11:58:40 +0700 Subject: [PATCH v7 1/2] Parallel index autovacuum with bgworkers --- src/backend/access/common/reloptions.c | 12 ++ src/backend/access/heap/vacuumlazy.c | 6 +- src/backend/commands/vacuumparallel.c | 57 ++++++-- src/backend/postmaster/autovacuum.c | 135 +++++++++++++++++- src/backend/utils/init/globals.c | 1 + src/backend/utils/misc/guc_tables.c | 10 ++ src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/miscadmin.h | 1 + src/include/postmaster/autovacuum.h | 4 + src/include/utils/guc_hooks.h | 2 + src/include/utils/rel.h | 2 + 11 files changed, 220 insertions(+), 11 deletions(-) diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c index 50747c16396..e36d59f632b 100644 --- a/src/backend/access/common/reloptions.c +++ b/src/backend/access/common/reloptions.c @@ -222,6 +222,16 @@ static relopt_int intRelOpts[] = }, SPGIST_DEFAULT_FILLFACTOR, SPGIST_MIN_FILLFACTOR, 100 }, + { + { + "parallel_autovacuum_workers", + "Maximum number of parallel autovacuum workers that can be taken from bgworkers pool for processing this table. " + "If value is 0 then parallel degree will computed based on number of indexes.", + RELOPT_KIND_HEAP, + ShareUpdateExclusiveLock + }, + -1, -1, 1024 + }, { { "autovacuum_vacuum_threshold", @@ -1872,6 +1882,8 @@ default_reloptions(Datum reloptions, bool validate, relopt_kind kind) {"fillfactor", RELOPT_TYPE_INT, offsetof(StdRdOptions, fillfactor)}, {"autovacuum_enabled", RELOPT_TYPE_BOOL, offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, enabled)}, + {"parallel_autovacuum_workers", RELOPT_TYPE_INT, + offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, parallel_autovacuum_workers)}, {"autovacuum_vacuum_threshold", RELOPT_TYPE_INT, offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, vacuum_threshold)}, {"autovacuum_vacuum_max_threshold", RELOPT_TYPE_INT, diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index 14036c27e87..7e0ae0184aa 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -3477,6 +3477,10 @@ dead_items_alloc(LVRelState *vacrel, int nworkers) autovacuum_work_mem != -1 ? autovacuum_work_mem : maintenance_work_mem; + int elevel = AmAutoVacuumWorkerProcess() || + vacrel->verbose ? + INFO : DEBUG2; + /* * Initialize state for a parallel vacuum. As of now, only one worker can * be used for an index, so we invoke parallelism only if there are at @@ -3503,7 +3507,7 @@ dead_items_alloc(LVRelState *vacrel, int nworkers) vacrel->pvs = parallel_vacuum_init(vacrel->rel, vacrel->indrels, vacrel->nindexes, nworkers, vac_work_mem, - vacrel->verbose ? INFO : DEBUG2, + elevel, vacrel->bstrategy); /* diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c index 0feea1d30ec..6ec610e29e4 100644 --- a/src/backend/commands/vacuumparallel.c +++ b/src/backend/commands/vacuumparallel.c @@ -1,7 +1,9 @@ /*------------------------------------------------------------------------- * * vacuumparallel.c - * Support routines for parallel vacuum execution. + * Support routines for parallel vacuum and autovacuum execution. In the + * future comments, the word "vacuum" will refer to both vacuum and + * autovacuum. * * This file contains routines that are intended to support setting up, using, * and tearing down a ParallelVacuumState. @@ -34,6 +36,7 @@ #include "executor/instrument.h" #include "optimizer/paths.h" #include "pgstat.h" +#include "postmaster/autovacuum.h" #include "storage/bufmgr.h" #include "tcop/tcopprot.h" #include "utils/lsyscache.h" @@ -371,10 +374,12 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, shared->relid = RelationGetRelid(rel); shared->elevel = elevel; shared->queryid = pgstat_get_my_query_id(); + shared->maintenance_work_mem_worker = (nindexes_mwm > 0) ? - maintenance_work_mem / Min(parallel_workers, nindexes_mwm) : - maintenance_work_mem; + vac_work_mem / Min(parallel_workers, nindexes_mwm) : + vac_work_mem; + shared->dead_items_info.max_bytes = vac_work_mem * (size_t) 1024; /* Prepare DSA space for dead items */ @@ -435,6 +440,8 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, void parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats) { + int nlaunched_workers; + Assert(!IsParallelWorker()); /* Copy the updated statistics */ @@ -453,7 +460,13 @@ parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats) TidStoreDestroy(pvs->dead_items); + nlaunched_workers = pvs->pcxt->nworkers_launched; /* remember this value */ DestroyParallelContext(pvs->pcxt); + + /* Release all launched (i.e. reserved) parallel autovacuum workers. */ + if (AmAutoVacuumWorkerProcess()) + AutoVacuumReleaseParallelWorkers(nlaunched_workers); + ExitParallelMode(); pfree(pvs->will_parallel_vacuum); @@ -553,12 +566,17 @@ parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, int nindexes_parallel_bulkdel = 0; int nindexes_parallel_cleanup = 0; int parallel_workers; + int max_parallel_workers; + + max_parallel_workers = AmAutoVacuumWorkerProcess() ? + autovacuum_max_parallel_workers : + max_parallel_maintenance_workers; /* * We don't allow performing parallel operation in standalone backend or * when parallelism is disabled. */ - if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0) + if (!IsUnderPostmaster || max_parallel_workers == 0) return 0; /* @@ -597,8 +615,8 @@ parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, parallel_workers = (nrequested > 0) ? Min(nrequested, nindexes_parallel) : nindexes_parallel; - /* Cap by max_parallel_maintenance_workers */ - parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers); + /* Cap by GUC variable */ + parallel_workers = Min(parallel_workers, max_parallel_workers); return parallel_workers; } @@ -646,6 +664,13 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan */ nworkers = Min(nworkers, pvs->pcxt->nworkers); + /* + * Also reserve workers in autovacuum global state. Note, that we may be + * given fewer workers than we requested. + */ + if (AmAutoVacuumWorkerProcess() && nworkers > 0) + nworkers = AutoVacuumReserveParallelWorkers(nworkers); + /* * Set index vacuum status and mark whether parallel vacuum worker can * process it. @@ -690,6 +715,16 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan LaunchParallelWorkers(pvs->pcxt); + if (AmAutoVacuumWorkerProcess() && + pvs->pcxt->nworkers_launched < nworkers) + { + /* + * Tell autovacuum that we could not launch all the previously + * reserved workers. + */ + AutoVacuumReleaseParallelWorkers(nworkers - pvs->pcxt->nworkers_launched); + } + if (pvs->pcxt->nworkers_launched > 0) { /* @@ -709,13 +744,19 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)", "launched %d parallel vacuum workers for index vacuuming (planned: %d)", pvs->pcxt->nworkers_launched), - pvs->pcxt->nworkers_launched, nworkers))); + pvs->pcxt->nworkers_launched, nworkers), + AmAutoVacuumWorkerProcess() ? + errhint("workers were launched for parallel autovacuum") : + errhint("workers were launched for parallel vacuum"))); else ereport(pvs->shared->elevel, (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)", "launched %d parallel vacuum workers for index cleanup (planned: %d)", pvs->pcxt->nworkers_launched), - pvs->pcxt->nworkers_launched, nworkers))); + pvs->pcxt->nworkers_launched, nworkers), + AmAutoVacuumWorkerProcess() ? + errhint("workers were launched for parallel autovacuum") : + errhint("workers were launched for parallel vacuum"))); } /* Vacuum the indexes that can be processed by only leader process */ diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c index 9474095f271..98609ac8f8f 100644 --- a/src/backend/postmaster/autovacuum.c +++ b/src/backend/postmaster/autovacuum.c @@ -285,6 +285,7 @@ typedef struct AutoVacuumWorkItem * av_workItems work item array * av_nworkersForBalance the number of autovacuum workers to use when * calculating the per worker cost limit + * av_freeParallelWorkers the number of free parallel autovacuum workers * * This struct is protected by AutovacuumLock, except for av_signal and parts * of the worker list (see above). @@ -299,6 +300,7 @@ typedef struct WorkerInfo av_startingWorker; AutoVacuumWorkItem av_workItems[NUM_WORKITEMS]; pg_atomic_uint32 av_nworkersForBalance; + uint32 av_freeParallelWorkers; } AutoVacuumShmemStruct; static AutoVacuumShmemStruct *AutoVacuumShmem; @@ -354,6 +356,7 @@ static void autovac_report_workitem(AutoVacuumWorkItem *workitem, static void avl_sigusr2_handler(SIGNAL_ARGS); static bool av_worker_available(void); static void check_av_worker_gucs(void); +static void check_parallel_av_gucs(int prev_max_parallel_workers); @@ -753,7 +756,9 @@ ProcessAutoVacLauncherInterrupts(void) if (ConfigReloadPending) { int autovacuum_max_workers_prev = autovacuum_max_workers; + int autovacuum_max_parallel_workers_prev; + autovacuum_max_parallel_workers_prev = autovacuum_max_parallel_workers; ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); @@ -769,6 +774,14 @@ ProcessAutoVacLauncherInterrupts(void) if (autovacuum_max_workers_prev != autovacuum_max_workers) check_av_worker_gucs(); + /* + * If autovacuum_max_parallel_workers changed, we must take care of + * the correct value of available parallel autovacuum workers in shmem. + */ + if (autovacuum_max_parallel_workers_prev != + autovacuum_max_parallel_workers) + check_parallel_av_gucs(autovacuum_max_parallel_workers_prev); + /* rebuild the list in case the naptime changed */ rebuild_database_list(InvalidOid); } @@ -2847,8 +2860,12 @@ table_recheck_autovac(Oid relid, HTAB *table_toast_map, */ tab->at_params.index_cleanup = VACOPTVALUE_UNSPECIFIED; tab->at_params.truncate = VACOPTVALUE_UNSPECIFIED; - /* As of now, we don't support parallel vacuum for autovacuum */ - tab->at_params.nworkers = -1; + + /* Decide whether we need to process indexes of table in parallel. */ + tab->at_params.nworkers = avopts + ? avopts->parallel_autovacuum_workers + : -1; + tab->at_params.freeze_min_age = freeze_min_age; tab->at_params.freeze_table_age = freeze_table_age; tab->at_params.multixact_freeze_min_age = multixact_freeze_min_age; @@ -3329,6 +3346,64 @@ AutoVacuumRequestWork(AutoVacuumWorkItemType type, Oid relationId, return result; } +/* + * In order to meet the 'autovacuum_max_parallel_workers' limit, leader worker + * must call this function. It returns the number of parallel workers that + * actually can be launched and reserves (if any) these workers in global + * autovacuum state. + * + * NOTE: We will try to provide as many workers as requested, even if caller + * will occupy all available workers. + */ +int +AutoVacuumReserveParallelWorkers(int nworkers) +{ + int can_launch; + + /* Only leader worker can call this function. */ + Assert(AmAutoVacuumWorkerProcess() && !IsParallelWorker()); + + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + + /* Provide as many workers as we can. */ + can_launch = Min(AutoVacuumShmem->av_freeParallelWorkers, nworkers); + AutoVacuumShmem->av_freeParallelWorkers -= nworkers; + + LWLockRelease(AutovacuumLock); + return can_launch; +} + +/* + * When parallel autovacuum worker die, leader worker must call this function + * in order to refresh global autovacuum state. Thus, other leaders will be able + * to use these workers. + * + * 'nworkers' - how many workers caller wants to release. + */ +void +AutoVacuumReleaseParallelWorkers(int nworkers) +{ + /* Only leader worker can call this function. */ + Assert(AmAutoVacuumWorkerProcess() && !IsParallelWorker()); + + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + AutoVacuumShmem->av_freeParallelWorkers += nworkers; + + /* + * If autovacuum_max_parallel_workers variable was reduced during parallel + * autovacuum execution, we must cap available workers number by its new + * value. + */ + if (AutoVacuumShmem->av_freeParallelWorkers > + autovacuum_max_parallel_workers) + { + AutoVacuumShmem->av_freeParallelWorkers = + autovacuum_max_parallel_workers; + } + + LWLockRelease(AutovacuumLock); +} + /* * autovac_init * This is called at postmaster initialization. @@ -3389,6 +3464,8 @@ AutoVacuumShmemInit(void) Assert(!found); AutoVacuumShmem->av_launcherpid = 0; + AutoVacuumShmem->av_freeParallelWorkers = + autovacuum_max_parallel_workers; dclist_init(&AutoVacuumShmem->av_freeWorkers); dlist_init(&AutoVacuumShmem->av_runningWorkers); AutoVacuumShmem->av_startingWorker = NULL; @@ -3439,6 +3516,15 @@ check_autovacuum_work_mem(int *newval, void **extra, GucSource source) return true; } +bool +check_autovacuum_max_parallel_workers(int *newval, void **extra, + GucSource source) +{ + if (*newval >= max_worker_processes) + return false; + return true; +} + /* * Returns whether there is a free autovacuum worker slot available. */ @@ -3470,3 +3556,48 @@ check_av_worker_gucs(void) errdetail("The server will only start up to \"autovacuum_worker_slots\" (%d) autovacuum workers at a given time.", autovacuum_worker_slots))); } + +/* + * Make sure that number of available parallel workers corresponds to the + * autovacuum_max_parallel_workers parameter (after it was changed). + */ +static void +check_parallel_av_gucs(int prev_max_parallel_workers) +{ + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + + if (AutoVacuumShmem->av_freeParallelWorkers > + autovacuum_max_parallel_workers) + { + Assert(prev_max_parallel_workers > autovacuum_max_parallel_workers); + + /* + * Number of available workers must not exceed limit. + * + * Note, that if some parallel autovacuum workers are running at this + * moment, available workers number will not exceed limit after + * releasing them (see ParallelAutoVacuumReleaseWorkers). + */ + AutoVacuumShmem->av_freeParallelWorkers = + autovacuum_max_parallel_workers; + } + else if ((AutoVacuumShmem->av_freeParallelWorkers < + autovacuum_max_parallel_workers) && + (autovacuum_max_parallel_workers > prev_max_parallel_workers)) + { + /* + * If user wants to increase number of parallel autovacuum workers, we + * must increase number of available workers in shmem. + */ + AutoVacuumShmem->av_freeParallelWorkers += + (autovacuum_max_parallel_workers - prev_max_parallel_workers); + + /* + * Nothing to do when autovacuum_max_parallel_workers < + * prev_max_parallel_workers. Available workers number will be capped + * inside ParallelAutoVacuumReleaseWorkers. + */ + } + + LWLockRelease(AutovacuumLock); +} diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c index d31cb45a058..977644978c1 100644 --- a/src/backend/utils/init/globals.c +++ b/src/backend/utils/init/globals.c @@ -143,6 +143,7 @@ int NBuffers = 16384; int MaxConnections = 100; int max_worker_processes = 8; int max_parallel_workers = 8; +int autovacuum_max_parallel_workers = 0; int MaxBackends = 0; /* GUC parameters for vacuum */ diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index d14b1678e7f..b6a192af8f8 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -3604,6 +3604,16 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"autovacuum_max_parallel_workers", PGC_SIGHUP, VACUUM_AUTOVACUUM, + gettext_noop("Maximum number of parallel autovacuum workers, that can be taken from bgworkers pool."), + gettext_noop("This parameter is capped by \"max_worker_processes\" (not by \"autovacuum_max_workers\"!)."), + }, + &autovacuum_max_parallel_workers, + 0, 0, MAX_BACKENDS, + check_autovacuum_max_parallel_workers, NULL, NULL + }, + { {"max_parallel_maintenance_workers", PGC_USERSET, RESOURCES_WORKER_PROCESSES, gettext_noop("Sets the maximum number of parallel processes per maintenance operation."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index a9d8293474a..bbf5307000f 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -683,6 +683,7 @@ autovacuum_worker_slots = 16 # autovacuum worker slots to allocate # (change requires restart) #autovacuum_max_workers = 3 # max number of autovacuum subprocesses +#autovacuum_max_parallel_workers = 0 # disabled by default and limited by max_worker_processes #autovacuum_naptime = 1min # time between autovacuum runs #autovacuum_vacuum_threshold = 50 # min number of row updates before # vacuum diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 1bef98471c3..85926415657 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -177,6 +177,7 @@ extern PGDLLIMPORT int MaxBackends; extern PGDLLIMPORT int MaxConnections; extern PGDLLIMPORT int max_worker_processes; extern PGDLLIMPORT int max_parallel_workers; +extern PGDLLIMPORT int autovacuum_max_parallel_workers; extern PGDLLIMPORT int commit_timestamp_buffers; extern PGDLLIMPORT int multixact_member_buffers; diff --git a/src/include/postmaster/autovacuum.h b/src/include/postmaster/autovacuum.h index e8135f41a1c..863d206f2bd 100644 --- a/src/include/postmaster/autovacuum.h +++ b/src/include/postmaster/autovacuum.h @@ -64,6 +64,10 @@ pg_noreturn extern void AutoVacWorkerMain(const void *startup_data, size_t start extern bool AutoVacuumRequestWork(AutoVacuumWorkItemType type, Oid relationId, BlockNumber blkno); +/* parallel autovacuum stuff */ +extern int AutoVacuumReserveParallelWorkers(int nworkers); +extern void AutoVacuumReleaseParallelWorkers(int nworkers); + /* shared memory stuff */ extern Size AutoVacuumShmemSize(void); extern void AutoVacuumShmemInit(void); diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h index 82ac8646a8d..b45023a90b2 100644 --- a/src/include/utils/guc_hooks.h +++ b/src/include/utils/guc_hooks.h @@ -31,6 +31,8 @@ extern void assign_application_name(const char *newval, void *extra); extern const char *show_archive_command(void); extern bool check_autovacuum_work_mem(int *newval, void **extra, GucSource source); +extern bool check_autovacuum_max_parallel_workers(int *newval, void **extra, + GucSource source); extern bool check_vacuum_buffer_usage_limit(int *newval, void **extra, GucSource source); extern bool check_backtrace_functions(char **newval, void **extra, diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index b552359915f..29c32f75780 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -311,6 +311,8 @@ typedef struct ForeignKeyCacheInfo typedef struct AutoVacOpts { bool enabled; + int parallel_autovacuum_workers; /* max number of parallel + autovacuum workers */ int vacuum_threshold; int vacuum_max_threshold; int vacuum_ins_threshold; -- 2.43.0