From 3346ed95bff15607e90a69741b0fcc5b90aff9c9 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Wed, 23 Oct 2019 10:46:49 +0530 Subject: [PATCH v1] divide vacuum cost limit --- src/backend/access/heap/vacuumlazy.c | 119 ++++++++++++++++++++++++++++++++++- 1 file changed, 118 insertions(+), 1 deletion(-) diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index 2faa4e9..73b6af7 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -137,6 +137,7 @@ #define PARALLEL_VACUUM_KEY_SHARED 1 #define PARALLEL_VACUUM_KEY_DEAD_TUPLES 2 #define PARALLEL_VACUUM_KEY_QUERY_TEXT 3 +#define PARALLEL_VACUUM_KEY_COST_BALANCE 4 /* * PARALLEL_VACUUM_DISABLE_LEADER_PARTICIPATION disables the leader's @@ -227,6 +228,14 @@ typedef struct LVShared } LVShared; #define SizeOfLVShared offsetof(LVShared, indstats) + sizeof(LVSharedIndStats) +typedef struct LVCostBalance +{ + pg_atomic_uint32 nslot; + int nworkers; + int vaccostbalance[FLEXIBLE_ARRAY_MEMBER]; +} LVCostBalance; +#define SizeOfLVCostBalance offsetof(LVCostBalance, vaccostbalance) + sizeof(int) + /* Struct for parallel lazy vacuum */ typedef struct LVParallelState { @@ -235,6 +244,8 @@ typedef struct LVParallelState /* Shared information among parallel vacuum workers */ LVShared *lvshared; + /* Shared cost balance. */ + LVCostBalance *lvcostbalance; /* * Always true except for a debugging case where * PARALLEL_VACUUM_DISABLE_LEADER_PARTICIPATION are defined. @@ -1927,6 +1938,31 @@ lazy_check_needs_freeze(Buffer buf, bool *hastup) return false; } +static void +compute_cost_balance(LVParallelState *lps) +{ + int i; + + /* + * Share the estimated worker count so that each worker can compute its + * cost limit. Include the leader if it is participating in the index + * vacuum phase. + * XXX: The number of workers actually launched might be less than estimated, so + * in that case each worker might operate with a smaller vacuum cost limit. 
+ */ + lps->lvcostbalance->nworkers = lps->pcxt->nworkers; + if (lps->leaderparticipates) + lps->lvcostbalance->nworkers += 1; + + /* + * Divide the current cost balance among the workers so that we don't lose + * the accounting of the I/O balance accumulated so far. + */ + for (i = 0; i < lps->pcxt->nworkers; i++) + lps->lvcostbalance->vaccostbalance[i] = + VacuumCostBalance / lps->lvcostbalance->nworkers; +} + /* * Vacuuming indexes with parallel vacuum workers. This function must be used * by the parallel vacuum leader process. @@ -1936,6 +1972,8 @@ lazy_parallel_vacuum_indexes(LVRelStats *vacrelstats, Relation *Irel, int nindexes, IndexBulkDeleteResult **stats, LVParallelState *lps) { + int i; + Assert(!IsParallelWorker()); Assert(ParallelVacuumIsActive(lps)); Assert(nindexes > 0); @@ -1950,6 +1988,9 @@ lazy_parallel_vacuum_indexes(LVRelStats *vacrelstats, Relation *Irel, lps->lvshared->reltuples = vacrelstats->old_live_tuples; lps->lvshared->estimated_count = true; + /* Compute cost balance for the workers. */ + compute_cost_balance(lps); + LaunchParallelWorkers(lps->pcxt); ereport(elevel, @@ -1963,14 +2004,38 @@ lazy_parallel_vacuum_indexes(LVRelStats *vacrelstats, Relation *Irel, * does that in case where no workers launched. */ if (lps->leaderparticipates || lps->pcxt->nworkers_launched == 0) + { + int base_cost_limit = VacuumCostLimit; + + /* + * If the leader is participating and we have launched the parallel workers, + * then compute the leader's share of the cost limit and cost balance. 
+ */ + if (lps->pcxt->nworkers_launched > 0) + { + VacuumCostLimit /= lps->lvcostbalance->nworkers; + VacuumCostBalance /= lps->lvcostbalance->nworkers; + } + vacuum_or_cleanup_indexes_worker(Irel, nindexes, stats, lps->lvshared, vacrelstats->dead_tuples); + VacuumCostLimit = base_cost_limit; + } /* Wait for all vacuum workers to finish */ WaitForParallelWorkersToFinish(lps->pcxt); /* Reset the processing count */ pg_atomic_write_u32(&(lps->lvshared->nprocessed), 0); + pg_atomic_write_u32(&(lps->lvcostbalance->nslot), 0); + + /* + * Index vacuuming phase is complete, so collect the remaining balance from + * all the workers and add it to the current balance of the leader, so that we + * don't lose the accounting for the extra I/O balance of the workers. + */ + for (i = 0; i < lps->pcxt->nworkers_launched; i++) + VacuumCostBalance += lps->lvcostbalance->vaccostbalance[i]; /* * Reinitialize the parallel context to relaunch parallel workers @@ -1988,6 +2053,8 @@ lazy_parallel_cleanup_indexes(LVRelStats *vacrelstats, Relation *Irel, int nindexes, IndexBulkDeleteResult **stats, LVParallelState *lps) { + int i; + Assert(!IsParallelWorker()); Assert(ParallelVacuumIsActive(lps)); Assert(nindexes > 0); @@ -2004,6 +2071,9 @@ lazy_parallel_cleanup_indexes(LVRelStats *vacrelstats, Relation *Irel, lps->lvshared->estimated_count = (vacrelstats->tupcount_pages < vacrelstats->rel_pages); + /* Compute cost balance for the workers. */ + compute_cost_balance(lps); + LaunchParallelWorkers(lps->pcxt); ereport(elevel, @@ -2017,12 +2087,34 @@ lazy_parallel_cleanup_indexes(LVRelStats *vacrelstats, Relation *Irel, * that in case where no workers launched. */ if (lps->leaderparticipates || lps->pcxt->nworkers_launched == 0) + { + int base_cost_limit = VacuumCostLimit; + + /* + * If the leader is participating and we have launched the parallel workers, + * then compute the leader's share of the cost limit and cost balance. 
+ */ + if (lps->pcxt->nworkers_launched > 0) + { + VacuumCostLimit /= lps->lvcostbalance->nworkers; + VacuumCostBalance /= lps->lvcostbalance->nworkers; + } + vacuum_or_cleanup_indexes_worker(Irel, nindexes, stats, lps->lvshared, vacrelstats->dead_tuples); + VacuumCostLimit = base_cost_limit; + } /* Wait for all vacuum workers to finish */ WaitForParallelWorkersToFinish(lps->pcxt); + /* + * Index cleanup phase is complete, so collect the remaining balance from + * all the workers and add it to the current balance of the leader, so that we + * don't lose the accounting for the extra I/O balance of the workers. + */ + for (i = 0; i < lps->pcxt->nworkers_launched; i++) + VacuumCostBalance += lps->lvcostbalance->vaccostbalance[i]; /* * We don't need to reinitialize the parallel context unlike parallel index @@ -2937,10 +3029,12 @@ begin_parallel_vacuum(LVRelStats *vacrelstats, Oid relid, BlockNumber nblocks, LVShared *shared; ParallelContext *pcxt; LVDeadTuples *tidmap; + LVCostBalance *costbalance; long maxtuples; char *sharedquery; Size est_shared; Size est_deadtuples; + Size est_costbalance; int querylen; int i; @@ -3019,6 +3113,14 @@ begin_parallel_vacuum(LVRelStats *vacrelstats, Oid relid, BlockNumber nblocks, memcpy(sharedquery, debug_query_string, querylen + 1); sharedquery[querylen] = '\0'; shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery); + + /* Vacuum cost balance. 
*/ + est_costbalance = MAXALIGN(add_size(SizeOfLVCostBalance, + mul_size(sizeof(int), nrequested))); + costbalance = (LVCostBalance *) shm_toc_allocate(pcxt->toc, est_costbalance); + pg_atomic_init_u32(&(costbalance->nslot), 0); + shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_COST_BALANCE, costbalance); + lps->lvcostbalance = costbalance; return lps; } @@ -3084,8 +3186,10 @@ heap_parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) Relation *indrels; LVShared *lvshared; LVDeadTuples *dead_tuples; + LVCostBalance *costbalance; int nindexes; char *sharedquery; + int slot; IndexBulkDeleteResult **stats; lvshared = (LVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, @@ -3118,6 +3222,11 @@ heap_parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) dead_tuples = (LVDeadTuples *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_DEAD_TUPLES, false); + + costbalance = (LVCostBalance *) shm_toc_lookup(toc, + PARALLEL_VACUUM_KEY_COST_BALANCE, + false); + slot = pg_atomic_fetch_add_u32(&(costbalance->nslot), 1); /* Set cost-based vacuum delay */ VacuumCostActive = (VacuumCostDelay > 0); @@ -3126,13 +3235,21 @@ heap_parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) VacuumPageMiss = 0; VacuumPageDirty = 0; + /* Compute the vacuum cost limit for the worker. */ + VacuumCostLimit = VacuumCostLimit / costbalance->nworkers; + VacuumCostBalance = costbalance->vaccostbalance[slot]; + stats = (IndexBulkDeleteResult **) palloc0(nindexes * sizeof(IndexBulkDeleteResult *)); /* Do either vacuuming indexes or cleaning indexes */ vacuum_or_cleanup_indexes_worker(indrels, nindexes, stats, lvshared, dead_tuples); - + /* + * Share the remaining balance with the leader so that we don't lose + * the accounting for it. + */ + costbalance->vaccostbalance[slot] = VacuumCostBalance; vac_close_indexes(nindexes, indrels, RowExclusiveLock); table_close(onerel, ShareUpdateExclusiveLock); } -- 1.8.3.1