From 6011d15f89f450fd753d8dd4252e90b992523f42 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada
Date: Thu, 27 Feb 2025 13:41:35 -0800
Subject: [PATCH v11 5/5] Support parallelism for collecting dead items during
 lazy vacuum.

This feature allows the vacuum to leverage multiple CPUs in order to
collect dead items (i.e., the first pass over the heap table) with
parallel workers. The parallel degree for parallel heap vacuuming is
determined based on the number of blocks to vacuum unless the PARALLEL
option of the VACUUM command is specified, and is further limited by
max_parallel_maintenance_workers.

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch-through:
---
 doc/src/sgml/ref/vacuum.sgml             |  54 +-
 src/backend/access/heap/heapam_handler.c |   4 +
 src/backend/access/heap/vacuumlazy.c     | 875 ++++++++++++++++++++---
 src/backend/access/table/tableam.c       |  15 +
 src/backend/commands/vacuumparallel.c    |  23 +-
 src/include/access/heapam.h              |  11 +
 src/include/access/tableam.h             |   4 +
 src/include/commands/vacuum.h            |   2 +
 src/test/regress/expected/vacuum.out     |   9 +
 src/test/regress/sql/vacuum.sql          |  11 +
 src/tools/pgindent/typedefs.list         |   4 +
 11 files changed, 909 insertions(+), 103 deletions(-)

diff --git a/doc/src/sgml/ref/vacuum.sgml b/doc/src/sgml/ref/vacuum.sgml
index 971b1237d47..9d73f6074de 100644
--- a/doc/src/sgml/ref/vacuum.sgml
+++ b/doc/src/sgml/ref/vacuum.sgml
@@ -279,25 +279,41 @@ VACUUM [ ( option [, ...] ) ] [
     PARALLEL
-     Perform index vacuum and index cleanup phases of VACUUM
-     in parallel using integer
-     background workers (for the details of each vacuum phase, please
-     refer to <xref linkend="vacuum-phases"/>). The number of workers used
-     to perform the operation is equal to the number of indexes on the
-     relation that support parallel vacuum which is limited by the number of
-     workers specified with PARALLEL option if any which is
-     further limited by <xref linkend="guc-max-parallel-maintenance-workers"/>.
-     An index can participate in parallel vacuum if and only if the size of the
-     index is more than <xref linkend="guc-min-parallel-index-scan-size"/>.
-     Please note that it is not guaranteed that the number of parallel workers
-     specified in integer will be
-     used during execution. It is possible for a vacuum to run with fewer
-     workers than specified, or even with no workers at all. Only one worker
-     can be used per index. So parallel workers are launched only when there
-     are at least 2 indexes in the table. Workers for
-     vacuum are launched before the start of each phase and exit at the end of
-     the phase. These behaviors might change in a future release. This
-     option can't be used with the FULL option.
+     Perform the heap scanning, index vacuum, and index cleanup phases of
+     VACUUM in parallel using
+     integer background workers
+     (for the details of each vacuum phase, please refer to
+     <xref linkend="vacuum-phases"/>).
+
+     For heap tables, the number of workers used to perform the heap
+     scanning is determined based on the size of the table. A table can
+     participate in parallel heap scanning if and only if the size of the
+     table is more than <xref linkend="guc-min-parallel-table-scan-size"/>.
+     During the heap scan, the table's blocks will be divided into ranges
+     and shared among the cooperating processes. Each worker process will
+     complete the scanning of its given range of blocks before requesting
+     an additional range of blocks.
+
+     The number of workers used to perform parallel index vacuum and index
+     cleanup is equal to the number of indexes on the relation that support
+     parallel vacuum. An index can participate in parallel vacuum if and
+     only if the size of the index is more than
+     <xref linkend="guc-min-parallel-index-scan-size"/>.
+     Only one worker can be used per index.
+     So parallel workers for index vacuum
+     and index cleanup are launched only when there are at least 2
+     indexes in the table.
+
+     Workers for vacuum are launched before the start of each phase and exit
+     at the end of the phase. The number of workers for each phase is limited
+     by the number of workers specified with the PARALLEL option, if
+     any, which is further limited by <xref linkend="guc-max-parallel-maintenance-workers"/>.
+     Please note that in any parallel vacuum phase, it is not guaranteed that
+     the number of parallel workers specified in integer
+     will be used during execution. It is possible for a vacuum to run with
+     fewer workers than specified, or even with no workers at all. These
+     behaviors might change in a future release. This option can't be used
+     with the FULL option.
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index b5a756802e9..a337b847997 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2691,6 +2691,10 @@ static const TableAmRoutine heapam_methods = {
 	.scan_sample_next_tuple = heapam_scan_sample_next_tuple,
 
 	.parallel_vacuum_compute_workers = heap_parallel_vacuum_compute_workers,
+	.parallel_vacuum_estimate = heap_parallel_vacuum_estimate,
+	.parallel_vacuum_initialize = heap_parallel_vacuum_initialize,
+	.parallel_vacuum_initialize_worker = heap_parallel_vacuum_initialize_worker,
+	.parallel_vacuum_collect_dead_items = heap_parallel_vacuum_collect_dead_items,
 };
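For context, these four callbacks plug into the table AM interface next to the
parallel_vacuum_compute_workers callback wired up above. Below is a minimal,
hypothetical sketch of how another block-based table AM might wire up the same
callbacks; the my_am_* functions are illustrative placeholders, not part of
this patch:

#include "access/tableam.h"

/* Hypothetical callback implementations; placeholders, not in this patch */
static int	my_am_compute_workers(Relation rel, int nworkers_requested);
static void my_am_estimate(Relation rel, ParallelContext *pcxt,
						   int nworkers, void *state);
static void my_am_initialize(Relation rel, ParallelContext *pcxt,
							 int nworkers, void *state);
static void my_am_initialize_worker(Relation rel, struct ParallelVacuumState *pvs,
									ParallelWorkerContext *pwcxt, void **state_out);
static void my_am_collect_dead_items(Relation rel, struct ParallelVacuumState *pvs,
									 void *state);

static const TableAmRoutine my_am_methods = {
	.type = T_TableAmRoutine,
	/* ... all other required callbacks elided ... */
	.parallel_vacuum_compute_workers = my_am_compute_workers,
	.parallel_vacuum_estimate = my_am_estimate,
	.parallel_vacuum_initialize = my_am_initialize,
	.parallel_vacuum_initialize_worker = my_am_initialize_worker,
	.parallel_vacuum_collect_dead_items = my_am_collect_dead_items,
};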
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 8632ee9cc43..6c54305545f 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -99,6 +99,34 @@
  * After pruning and freezing, pages that are newly all-visible and all-frozen
  * are marked as such in the visibility map.
  *
+ * Parallel Lazy Heap Scanning:
+ *
+ * Lazy vacuum on heap tables supports parallel processing for phase I and
+ * phase II. Before starting phase I, we initialize the parallel vacuum state,
+ * ParallelVacuumState, and allocate the TID store in a DSA area if we can
+ * use parallel mode for either of these two phases.
+ *
+ * Each phase can require a different number of parallel vacuum workers,
+ * depending on factors such as the table size and the number of indexes.
+ * Parallel workers are launched at the beginning of each phase and exit at
+ * the end of each phase.
+ *
+ * For scanning the heap table with parallel workers, we utilize the
+ * table_block_parallelscan_xxx facility, which splits the table into several
+ * chunks that parallel workers allocate and scan. If the TID store is
+ * close to overrunning the available space during phase I, parallel workers
+ * exit and the leader process gathers the scan results. Then, it performs
+ * index vacuuming, which can also use parallelism. After vacuuming both the
+ * indexes and the heap table, the leader process vacuums the FSM to make
+ * newly-freed space visible. Then, it relaunches parallel workers to resume
+ * the heap scanning phase. In order to resume the parallel heap scan from
+ * the previous state, the workers' parallel scan descriptions are stored in
+ * the shared memory (DSM) space to share among parallel workers. If the
+ * leader launches fewer workers when resuming the parallel heap scan, some
+ * blocks may remain un-scanned. The leader process serially deals with such
+ * blocks at the end of the heap scanning phase (see
+ * complete_unfinished_lazy_scan_heap()).
+ *
  * Dead TID Storage:
 *
 * The major space usage for vacuuming is storage for the dead tuple IDs that
@@ -147,6 +175,7 @@
 #include "common/pg_prng.h"
 #include "executor/instrument.h"
 #include "miscadmin.h"
+#include "optimizer/paths.h"
 #include "pgstat.h"
 #include "portability/instr_time.h"
 #include "postmaster/autovacuum.h"
@@ -214,11 +243,21 @@
  */
 #define PREFETCH_SIZE			((BlockNumber) 32)
 
+/*
+ * DSM keys for parallel heap vacuum. Unlike other parallel execution code,
+ * we don't need to worry about DSM keys conflicting with plan_node_id, but
+ * we do need to avoid conflicting with the DSM keys used in vacuumparallel.c.
+ */
+#define LV_PARALLEL_KEY_SHARED				0xFFFF0001
+#define LV_PARALLEL_KEY_SCANDESC			0xFFFF0002
+#define LV_PARALLEL_KEY_WORKER_SCANSTATE	0xFFFF0003
+
 /*
  * Macro to check if we are in a parallel vacuum. If true, we are in the
  * parallel mode and the DSM segment is initialized.
  */
 #define ParallelVacuumIsActive(vacrel) ((vacrel)->pvs != NULL)
+#define ParallelHeapVacuumIsActive(vacrel) ((vacrel)->plvstate != NULL)
 
 /* Phases of vacuum during which we report error context. */
 typedef enum
@@ -307,6 +346,87 @@ typedef struct LVScanData
 	bool		skippedallvis;
 } LVScanData;
 
+/*
+ * Struct for information that needs to be shared among parallel workers
+ * for parallel heap vacuum.
+ */
+typedef struct PLVShared
+{
+	bool		aggressive;
+	bool		skipwithvm;
+
+	/* The current oldest extant XID/MXID shared by the leader process */
+	TransactionId NewRelfrozenXid;
+	MultiXactId NewRelminMxid;
+
+	/* VACUUM operation's cutoffs for freezing and pruning */
+	struct VacuumCutoffs cutoffs;
+	GlobalVisState vistest;
+
+	/* Per-worker scan data for parallel lazy heap scan */
+	LVScanData	worker_scandata[FLEXIBLE_ARRAY_MEMBER];
+} PLVShared;
+#define SizeOfPLVShared (offsetof(PLVShared, worker_scandata))
+
+/* Per-worker scan state for parallel heap vacuum */
+typedef struct PLVScanWorkerState
+{
+	/* Has this worker state been initialized? */
+	bool		inited;
+
+	/* per-worker parallel table scan state */
+	ParallelBlockTableScanWorkerData pbscanwork;
+
+	/*
+	 * True if a parallel vacuum scan worker allocated blocks in this state
+	 * but might not have scanned all of them. The leader process will take
+	 * over scanning these remaining blocks.
+	 */
+	bool		maybe_have_unprocessed_blocks;
+
+	/* Last block number the worker scanned */
+	BlockNumber last_blkno;
+} PLVScanWorkerState;
+
+/*
+ * Struct to store the parallel lazy scan state.
+ */
+typedef struct PLVState
+{
+	/* Parallel scan description shared among parallel workers */
+	ParallelBlockTableScanDesc pbscan;
+
+	/* Shared information */
+	PLVShared  *shared;
+
+	/* Scan state for parallel heap vacuum */
+	PLVScanWorkerState *scanstate;
+} PLVState;
+
+/*
+ * Struct for the leader in parallel heap vacuum.
+ */
+typedef struct PLVLeader
+{
+	/* Shared memory size for each shared object */
+	Size		pbscan_len;
+	Size		shared_len;
+	Size		scanstates_len;
+
+	int			nworkers_launched;
+
+	/*
+	 * Points to all per-worker scan states stored in the DSM area.
+	 *
+	 * During a parallel heap scan, each worker allocates some chunks of
+	 * blocks to scan in its scan state, and could exit while leaving some
+	 * chunks un-scanned if the size of dead_items TIDs is close to
+	 * overrunning the available space. We store the scan states in the
+	 * shared memory area so that workers can resume their heap scans from
+	 * the previous point.
+	 */
+	PLVScanWorkerState *scanstates;
+} PLVLeader;
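The PLVScanWorkerState above embeds a ParallelBlockTableScanWorkerData, which
is driven through the standard block table parallel scan API also used later
in this patch. As a rough sketch (assuming rel, pbscan, and a hypothetical
per-block helper process_block() are already set up), each participant
consumes chunks of blocks like this:

	ParallelBlockTableScanWorkerData pbscanwork;
	BlockNumber blkno;

	table_block_parallelscan_startblock_init(rel, &pbscanwork, pbscan);
	while ((blkno = table_block_parallelscan_nextpage(rel, &pbscanwork,
													  pbscan)) != InvalidBlockNumber)
		process_block(rel, blkno);	/* hypothetical per-block processing */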
+
 typedef struct LVRelState
 {
 	/* Target heap relation and its indexes */
@@ -369,6 +489,9 @@ typedef struct LVRelState
 	/* Instrumentation counters */
 	int			num_index_scans;
 
+	/* Next block to check for FSM vacuum */
+	BlockNumber next_fsm_block_to_vacuum;
+
 	/* State maintained by heap_vac_scan_next_block() */
 	BlockNumber current_block;	/* last block returned */
 	BlockNumber next_unskippable_block; /* next unskippable block */
@@ -376,6 +499,16 @@ typedef struct LVRelState
 	bool		next_unskippable_eager_scanned; /* if it was eagerly scanned */
 	Buffer		next_unskippable_vmbuffer;	/* buffer containing its VM bit */
 
+	/* Fields used for parallel heap vacuum */
+
+	/* Parallel lazy vacuum working state */
+	PLVState   *plvstate;
+
+	/*
+	 * The leader state for parallel heap vacuum. NULL for parallel workers.
+	 */
+	PLVLeader  *leader;
+
 	/* State related to managing eager scanning of all-visible pages */
 
 	/*
@@ -435,12 +568,14 @@ typedef struct LVSavedErrInfo
 
 /* non-export function prototypes */
 static void lazy_scan_heap(LVRelState *vacrel);
+static bool do_lazy_scan_heap(LVRelState *vacrel);
 static void heap_vacuum_eager_scan_setup(LVRelState *vacrel,
 										 VacuumParams *params);
 static BlockNumber heap_vac_scan_next_block(ReadStream *stream,
 											void *callback_private_data,
 											void *per_buffer_data);
-static void find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis);
+static void find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis,
+										BlockNumber start_blk, BlockNumber end_blk);
 static bool lazy_scan_new_or_empty(LVRelState *vacrel, Buffer buf,
 								   BlockNumber blkno, Page page,
 								   bool sharelock, Buffer vmbuffer);
@@ -451,6 +586,12 @@ static void lazy_scan_prune(LVRelState *vacrel, Buffer buf,
 static bool lazy_scan_noprune(LVRelState *vacrel, Buffer buf,
 							  BlockNumber blkno, Page page,
 							  bool *has_lpdead_items);
+static void do_parallel_lazy_scan_heap(LVRelState *vacrel);
+static BlockNumber parallel_lazy_scan_compute_min_scan_block(LVRelState *vacrel);
+static void complete_unfinished_lazy_scan_heap(LVRelState *vacrel);
+static void parallel_lazy_scan_heap_begin(LVRelState *vacrel);
+static void parallel_lazy_scan_heap_end(LVRelState *vacrel);
+static void parallel_lazy_scan_gather_scan_results(LVRelState *vacrel);
 static void lazy_vacuum(LVRelState *vacrel);
 static bool lazy_vacuum_all_indexes(LVRelState *vacrel);
 static void lazy_vacuum_heap_rel(LVRelState *vacrel);
@@ -530,6 +671,22 @@ heap_vacuum_eager_scan_setup(LVRelState *vacrel, VacuumParams *params)
 	if (vacrel->aggressive)
 		return;
 
+	/*
+	 * Disable eager scanning if parallel heap vacuum is enabled.
+	 *
+	 * One might think that it would make sense to use eager scanning even
+	 * during parallel heap scanning, but parallel vacuum is available only
+	 * via the VACUUM command and is not something that happens frequently,
+	 * which does not fit the purpose of eager scanning. It would also
+	 * complicate the code. So we disable it for now.
+	 *
+	 * XXX: this limitation might be eliminated in the future, for example
+	 * when we use parallel vacuum also in autovacuum.
+	 */
+	if (ParallelHeapVacuumIsActive(vacrel))
+		return;
+
 	/*
 	 * Aggressively vacuuming a small relation shouldn't take long, so it
 	 * isn't worth amortizing. We use two times the region size as the size
@@ -773,6 +930,7 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 
 	/* Initialize remaining counters (be tidy) */
 	vacrel->num_index_scans = 0;
+	vacrel->next_fsm_block_to_vacuum = 0;
 
 	/*
 	 * Get cutoffs that determine which deleted tuples are considered DEAD,
@@ -815,13 +973,6 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 
 	vacrel->skipwithvm = skipwithvm;
 
-	/*
-	 * Set up eager scan tracking state. This must happen after determining
-	 * whether or not the vacuum must be aggressive, because only normal
-	 * vacuums use the eager scan algorithm.
-	 */
-	heap_vacuum_eager_scan_setup(vacrel, params);
-
 	if (verbose)
 	{
 		if (vacrel->aggressive)
@@ -846,6 +997,13 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 	lazy_check_wraparound_failsafe(vacrel);
 	dead_items_alloc(vacrel, params->nworkers);
 
+	/*
+	 * Set up eager scan tracking state. This must happen after determining
+	 * whether or not the vacuum must be aggressive and after dead_items_alloc(),
+	 * because only normal, non-parallel vacuums use the eager scan algorithm.
+	 */
+	heap_vacuum_eager_scan_setup(vacrel, params);
+
 	/*
 	 * Call lazy_scan_heap to perform all required heap pruning, index
 	 * vacuuming, and heap vacuuming (plus related processing)
@@ -1215,13 +1373,7 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 static void
 lazy_scan_heap(LVRelState *vacrel)
 {
-	ReadStream *stream;
-	BlockNumber rel_pages = vacrel->scan_data->rel_pages,
-				blkno = 0,
-				next_fsm_block_to_vacuum = 0;
-	BlockNumber orig_eager_scan_success_limit =
-		vacrel->eager_scan_remaining_successes; /* for logging */
-	Buffer		vmbuffer = InvalidBuffer;
+	BlockNumber rel_pages = vacrel->scan_data->rel_pages;
 	const int	initprog_index[] = {
 		PROGRESS_VACUUM_PHASE,
 		PROGRESS_VACUUM_TOTAL_HEAP_BLKS,
@@ -1242,6 +1394,76 @@ lazy_scan_heap(LVRelState *vacrel)
 	vacrel->next_unskippable_eager_scanned = false;
 	vacrel->next_unskippable_vmbuffer = InvalidBuffer;
 
+	/* Do the actual work */
+	if (ParallelHeapVacuumIsActive(vacrel))
+		do_parallel_lazy_scan_heap(vacrel);
+	else
+		do_lazy_scan_heap(vacrel);
+
+	/*
+	 * Report that everything is now scanned. We never skip scanning the last
+	 * block in the relation, so we can pass rel_pages here.
+	 */
+	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED,
+								 rel_pages);
+
+	/* now we can compute the new value for pg_class.reltuples */
+	vacrel->new_live_tuples = vac_estimate_reltuples(vacrel->rel, rel_pages,
+													 vacrel->scan_data->scanned_pages,
+													 vacrel->scan_data->live_tuples);
+
+	/*
+	 * Also compute the total number of surviving heap entries. In the
+	 * (unlikely) scenario that new_live_tuples is -1, take it as zero.
+	 */
+	vacrel->new_rel_tuples =
+		Max(vacrel->new_live_tuples, 0) + vacrel->scan_data->recently_dead_tuples +
+		vacrel->scan_data->missed_dead_tuples;
+
+	/*
+	 * Do index vacuuming (call each index's ambulkdelete routine), then do
+	 * related heap vacuuming
+	 */
+	if (vacrel->dead_items_info->num_items > 0)
+		lazy_vacuum(vacrel);
+
+	/*
+	 * Vacuum the remainder of the Free Space Map. We must do this whether or
+	 * not there were indexes, and whether or not we bypassed index vacuuming.
+	 * We can pass rel_pages here because we never skip scanning the last
+	 * block of the relation.
+ */ + if (rel_pages > vacrel->next_fsm_block_to_vacuum) + FreeSpaceMapVacuumRange(vacrel->rel, vacrel->next_fsm_block_to_vacuum, rel_pages); + + /* report all blocks vacuumed */ + pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, rel_pages); + + /* Do final index cleanup (call each index's amvacuumcleanup routine) */ + if (vacrel->nindexes > 0 && vacrel->do_index_cleanup) + lazy_cleanup_all_indexes(vacrel); +} + +/* + * Workhorse for lazy_scan_heap(). + * + * Return true if the scan reaches the end of the table, otherwise false. In single + * process vacuum, since we loop the cycle of heap scanning, vacuuming and heap + * vacuuming until we reach to the end of table, it always returns true. On the other + * hand, in parallel vacuum case, if the dead items space is overrunning the available + * space, we exit from this function without invoking a cycle of index and heap vacuuming. + * In this case, we return false. + */ +static bool +do_lazy_scan_heap(LVRelState *vacrel) +{ + ReadStream *stream; + BlockNumber blkno = InvalidBlockNumber; + BlockNumber orig_eager_scan_success_limit = + vacrel->eager_scan_remaining_successes; /* for logging */ + Buffer vmbuffer = InvalidBuffer; + bool reach_eot = true; + /* Set up the read stream for vacuum's first pass through the heap */ stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE, vacrel->bstrategy, @@ -1271,8 +1493,11 @@ lazy_scan_heap(LVRelState *vacrel) * that point. This check also provides failsafe coverage for the * one-pass strategy, and the two-pass strategy with the index_cleanup * param set to 'off'. + * + * The failsafe check should be done only by the leader process. */ - if (vacrel->scan_data->scanned_pages > 0 && + if (!IsParallelWorker() && + vacrel->scan_data->scanned_pages > 0 && vacrel->scan_data->scanned_pages % FAILSAFE_EVERY_PAGES == 0) lazy_check_wraparound_failsafe(vacrel); @@ -1296,6 +1521,19 @@ lazy_scan_heap(LVRelState *vacrel) vmbuffer = InvalidBuffer; } + /* + * In parallel heap vacuum, we return false to the caller without + * index and heap vacuuming. The parallel vacuum workers will exit + * and the leader process will perform both index and heap + * vacuuming. + */ + if (ParallelHeapVacuumIsActive(vacrel)) + { + vacrel->plvstate->scanstate->last_blkno = blkno; + reach_eot = false; + break; + } + /* Perform a round of index and heap vacuuming */ vacrel->consider_bypass_optimization = false; lazy_vacuum(vacrel); @@ -1305,9 +1543,9 @@ lazy_scan_heap(LVRelState *vacrel) * upper-level FSM pages. Note that blkno is the previously * processed block. */ - FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum, + FreeSpaceMapVacuumRange(vacrel->rel, vacrel->next_fsm_block_to_vacuum, blkno + 1); - next_fsm_block_to_vacuum = blkno; + vacrel->next_fsm_block_to_vacuum = blkno; /* Report that we are once again scanning the heap */ pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, @@ -1468,10 +1706,13 @@ lazy_scan_heap(LVRelState *vacrel) * also be no opportunity to update the FSM later, because we'll never * revisit this page. Since updating the FSM is desirable but not * absolutely required, that's OK. + * + * FSM vacuum should be done only by the leader process. 
 		 */
-		if (vacrel->nindexes == 0
-			|| !vacrel->do_index_vacuuming
-			|| !has_lpdead_items)
+		if (!IsParallelWorker() &&
+			(vacrel->nindexes == 0
+			 || !vacrel->do_index_vacuuming
+			 || !has_lpdead_items))
 		{
 			Size		freespace = PageGetHeapFreeSpace(page);
 
@@ -1485,11 +1726,17 @@ lazy_scan_heap(LVRelState *vacrel)
 			 * held the cleanup lock and lazy_scan_prune() was called.
 			 */
 			if (got_cleanup_lock && vacrel->nindexes == 0 && has_lpdead_items &&
-				blkno - next_fsm_block_to_vacuum >= VACUUM_FSM_EVERY_PAGES)
+				blkno - vacrel->next_fsm_block_to_vacuum >= VACUUM_FSM_EVERY_PAGES)
 			{
-				FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum,
+				/*
+				 * XXX: Since the following logic doesn't consider the
+				 * progress of workers' scan processes, there might be
+				 * unprocessed pages between next_fsm_block_to_vacuum and
+				 * blkno.
+				 */
+				FreeSpaceMapVacuumRange(vacrel->rel, vacrel->next_fsm_block_to_vacuum,
 										blkno);
-				next_fsm_block_to_vacuum = blkno;
+				vacrel->next_fsm_block_to_vacuum = blkno;
 			}
 		}
 		else
@@ -1500,50 +1747,10 @@ lazy_scan_heap(LVRelState *vacrel)
 	if (BufferIsValid(vmbuffer))
 		ReleaseBuffer(vmbuffer);
 
-	/*
-	 * Report that everything is now scanned. We never skip scanning the last
-	 * block in the relation, so we can pass rel_pages here.
-	 */
-	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED,
-								 rel_pages);
-
-	/* now we can compute the new value for pg_class.reltuples */
-	vacrel->new_live_tuples = vac_estimate_reltuples(vacrel->rel, rel_pages,
-													 vacrel->scan_data->scanned_pages,
-													 vacrel->scan_data->live_tuples);
-
-	/*
-	 * Also compute the total number of surviving heap entries. In the
-	 * (unlikely) scenario that new_live_tuples is -1, take it as zero.
-	 */
-	vacrel->new_rel_tuples =
-		Max(vacrel->new_live_tuples, 0) + vacrel->scan_data->recently_dead_tuples +
-		vacrel->scan_data->missed_dead_tuples;
-
 	read_stream_end(stream);
 
-	/*
-	 * Do index vacuuming (call each index's ambulkdelete routine), then do
-	 * related heap vacuuming
-	 */
-	if (vacrel->dead_items_info->num_items > 0)
-		lazy_vacuum(vacrel);
-
-	/*
-	 * Vacuum the remainder of the Free Space Map. We must do this whether or
-	 * not there were indexes, and whether or not we bypassed index vacuuming.
-	 * We can pass rel_pages here because we never skip scanning the last
-	 * block of the relation.
-	 */
-	if (rel_pages > next_fsm_block_to_vacuum)
-		FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum, rel_pages);
-
-	/* report all blocks vacuumed */
-	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, rel_pages);
-
-	/* Do final index cleanup (call each index's amvacuumcleanup routine) */
-	if (vacrel->nindexes > 0 && vacrel->do_index_cleanup)
-		lazy_cleanup_all_indexes(vacrel);
+	Assert(reach_eot || ParallelHeapVacuumIsActive(vacrel));
+	return reach_eot;
 }
 
 /*
@@ -1577,10 +1784,28 @@ heap_vac_scan_next_block(ReadStream *stream,
 {
 	BlockNumber next_block;
 	LVRelState *vacrel = callback_private_data;
+	PLVState   *plvstate = vacrel->plvstate;
 	uint8		blk_info = 0;
 
-	/* relies on InvalidBlockNumber + 1 overflowing to 0 on first call */
-	next_block = vacrel->current_block + 1;
+retry:
+
+	if (ParallelHeapVacuumIsActive(vacrel))
+	{
+		/*
+		 * Get the next block to scan using the parallel scan.
+		 *
+		 * If we reach the end of the relation,
+		 * table_block_parallelscan_nextpage returns InvalidBlockNumber.
+		 */
+		next_block = table_block_parallelscan_nextpage(vacrel->rel,
+													   &(plvstate->scanstate->pbscanwork),
+													   plvstate->pbscan);
+	}
+	else
+	{
+		/* relies on InvalidBlockNumber + 1 overflowing to 0 on first call */
+		next_block = vacrel->current_block + 1;
+	}
 
 	/* Have we reached the end of the relation? */
 	if (next_block >= vacrel->scan_data->rel_pages)
@@ -1605,8 +1830,22 @@ heap_vac_scan_next_block(ReadStream *stream,
 		 * visibility map.
 		 */
 		bool		skipsallvis;
+		BlockNumber end_block;
+		BlockNumber nblocks_skip;
+
+		/*
+		 * In a parallel heap vacuum, compute how many blocks are remaining
+		 * in the current chunk. We look for the next unskippable block
+		 * within the chunk.
+		 */
+		if (ParallelHeapVacuumIsActive(vacrel))
+			end_block = next_block +
+				plvstate->scanstate->pbscanwork.phsw_chunk_remaining + 1;
+		else
+			end_block = vacrel->scan_data->rel_pages;
 
-		find_next_unskippable_block(vacrel, &skipsallvis);
+		find_next_unskippable_block(vacrel, &skipsallvis, next_block,
+									end_block);
 
 		/*
 		 * We now know the next block that we must process. It can be the
@@ -1623,11 +1862,33 @@ heap_vac_scan_next_block(ReadStream *stream,
 		 * pages then skipping makes updating relfrozenxid unsafe, which is a
 		 * real downside.
 		 */
-		if (vacrel->next_unskippable_block - next_block >= SKIP_PAGES_THRESHOLD)
+		nblocks_skip = vacrel->next_unskippable_block - next_block;
+		if (nblocks_skip >= SKIP_PAGES_THRESHOLD)
 		{
-			next_block = vacrel->next_unskippable_block;
 			if (skipsallvis)
 				vacrel->scan_data->skippedallvis = true;
+
+			if (ParallelHeapVacuumIsActive(vacrel))
+			{
+				/* Tell the parallel scan to skip blocks */
+				table_block_parallelscan_skip_pages_in_chunk(vacrel->rel,
+															 &(plvstate->scanstate->pbscanwork),
+															 plvstate->pbscan,
+															 nblocks_skip);
+
+				/* Did we consume all blocks in the chunk? */
+				if (plvstate->scanstate->pbscanwork.phsw_chunk_remaining == 0)
+				{
+					/*
+					 * Reset next_unskippable_block and try to find an
+					 * unskippable block in the next chunk.
+					 */
+					vacrel->next_unskippable_block = InvalidBlockNumber;
+					goto retry;
+				}
+			}
+
+			next_block = vacrel->next_unskippable_block;
 		}
 	}
 
@@ -1663,7 +1924,9 @@ heap_vac_scan_next_block(ReadStream *stream,
 }
 
 /*
- * Find the next unskippable block in a vacuum scan using the visibility map.
+ * Find the next unskippable block in a vacuum scan using the visibility map,
+ * in a range of start_blk (inclusive) and end_blk (exclusive).
+ *
  * The next unskippable block and its visibility information is updated in
  * vacrel.
 *
@@ -1676,17 +1939,20 @@ heap_vac_scan_next_block(ReadStream *stream,
  * to skip such a range is actually made, making everything safe.)
  */
 static void
-find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis)
+find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis,
+							BlockNumber start_blk, BlockNumber end_blk)
 {
 	BlockNumber rel_pages = vacrel->scan_data->rel_pages;
-	BlockNumber next_unskippable_block = vacrel->next_unskippable_block + 1;
+	BlockNumber next_unskippable_block;
 	Buffer		next_unskippable_vmbuffer = vacrel->next_unskippable_vmbuffer;
 	bool		next_unskippable_eager_scanned = false;
 	bool		next_unskippable_allvis;
 
 	*skipsallvis = false;
 
-	for (;; next_unskippable_block++)
+	for (next_unskippable_block = start_blk;
+		 next_unskippable_block < end_blk;
+		 next_unskippable_block++)
 	{
 		uint8		mapbits = visibilitymap_get_status(vacrel->rel,
 												   next_unskippable_block,
@@ -1773,6 +2039,236 @@ find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis)
 	vacrel->next_unskippable_vmbuffer = next_unskippable_vmbuffer;
 }
 
+/*
+ * A parallel variant of do_lazy_scan_heap(). The leader process launches
+ * parallel workers to scan the heap in parallel.
+ */
+static void
+do_parallel_lazy_scan_heap(LVRelState *vacrel)
+{
+	PLVScanWorkerState *scanstate;
+
+	Assert(ParallelHeapVacuumIsActive(vacrel));
+	Assert(!IsParallelWorker());
+
+	/* Launch parallel workers */
+	parallel_lazy_scan_heap_begin(vacrel);
+
+	/*
+	 * Set up the leader's own scan state so that it can join the parallel
+	 * heap scan as a worker.
+	 */
+	scanstate = palloc0(sizeof(PLVScanWorkerState));
+	scanstate->last_blkno = InvalidBlockNumber;
+	table_block_parallelscan_startblock_init(vacrel->rel,
+											 &(scanstate->pbscanwork),
+											 vacrel->plvstate->pbscan);
+	vacrel->plvstate->scanstate = scanstate;
+
+	for (;;)
+	{
+		bool		reach_eot;
+		BlockNumber min_scan_blk;
+
+		/*
+		 * Scan the table until either we are close to overrunning the
+		 * available space for dead_items TIDs or we reach the end of the
+		 * relation.
+		 */
+		reach_eot = do_lazy_scan_heap(vacrel);
+
+		/*
+		 * The parallel lazy heap scan finished. Wait for the parallel
+		 * workers to finish, and gather the scan results.
+		 */
+		parallel_lazy_scan_heap_end(vacrel);
+
+		/* We reached the end of the table */
+		if (reach_eot)
+			break;
+
+		/* Perform a round of index and heap vacuuming */
+		vacrel->consider_bypass_optimization = false;
+		lazy_vacuum(vacrel);
+
+		min_scan_blk = parallel_lazy_scan_compute_min_scan_block(vacrel);
+
+		/*
+		 * Vacuum the Free Space Map to make newly-freed space visible on
+		 * upper-level FSM pages.
+		 */
+		if (min_scan_blk > vacrel->next_fsm_block_to_vacuum)
+		{
+			/*
+			 * min_scan_blk takes the workers' scan results, gathered above,
+			 * into account.
+			 */
+			FreeSpaceMapVacuumRange(vacrel->rel, vacrel->next_fsm_block_to_vacuum,
+									min_scan_blk + 1);
+			vacrel->next_fsm_block_to_vacuum = min_scan_blk;
+		}
+
+		/* Report that we are once again scanning the heap */
+		pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
+									 PROGRESS_VACUUM_PHASE_SCAN_HEAP);
+
+		/* Re-launch workers to restart the parallel heap scan */
+		parallel_lazy_scan_heap_begin(vacrel);
+	}
+
+	/*
+	 * The parallel heap scan finished, but it's possible that some workers
+	 * have allocated blocks but not processed them yet. This can happen, for
+	 * example, when workers exit because the dead_items TID space filled up
+	 * and the leader process launched fewer workers in the next cycle.
+	 */
+	complete_unfinished_lazy_scan_heap(vacrel);
+}
+
+/*
+ * Return the minimum block number the leader and workers have scanned so far.
+ */ +static BlockNumber +parallel_lazy_scan_compute_min_scan_block(LVRelState *vacrel) +{ + BlockNumber min_blk; + + Assert(ParallelHeapVacuumIsActive(vacrel)); + + min_blk = vacrel->plvstate->scanstate->last_blkno; + + /* + * We check all worker scan states here to compute the minimum block + * number among all scan states. + */ + for (int i = 0; i < vacrel->leader->nworkers_launched; i++) + { + PLVScanWorkerState *scanstate = &(vacrel->leader->scanstates[i]); + + /* Skip if no worker has been initialized the scan state */ + if (!scanstate->inited) + continue; + + if (!BlockNumberIsValid(min_blk) || scanstate->last_blkno < min_blk) + min_blk = scanstate->last_blkno; + } + + Assert(BlockNumberIsValid(min_blk)); + return min_blk; +} + +/* + * Complete parallel heaps scans that have remaining blocks in their + * chunks. + */ +static void +complete_unfinihsed_lazy_scan_heap(LVRelState *vacrel) +{ + int nworkers; + + Assert(!IsParallelWorker()); + + nworkers = parallel_vacuum_get_nworkers_table(vacrel->pvs); + + for (int i = 0; i < nworkers; i++) + { + PLVScanWorkerState *scanstate = &(vacrel->leader->scanstates[i]); + + /* + * Skip if this worker's scan has not been used or doesn't have + * unprocessed block in chunks. + */ + if (!scanstate->inited || !scanstate->maybe_have_unprocessed_blocks) + continue; + + /* Attach the worker's scan state and do heap scan */ + vacrel->plvstate->scanstate = scanstate; + do_lazy_scan_heap(vacrel); + } + + /* + * We don't need to gather the scan results here because the leader's scan + * state got updated directly. + */ +} + +/* + * Helper routine to launch parallel workers for parallel lazy heap scan. + */ +static void +parallel_lazy_scan_heap_begin(LVRelState *vacrel) +{ + Assert(ParallelHeapVacuumIsActive(vacrel)); + Assert(!IsParallelWorker()); + + /* launcher workers */ + vacrel->leader->nworkers_launched = parallel_vacuum_collect_dead_items_begin(vacrel->pvs); + + ereport(vacrel->verbose ? INFO : DEBUG2, + (errmsg(ngettext("launched %d parallel vacuum worker for collecting dead tuples (planned: %d)", + "launched %d parallel vacuum workers for collecting dead tuples (planned: %d)", + vacrel->leader->nworkers_launched), + vacrel->leader->nworkers_launched, + parallel_vacuum_get_nworkers_table(vacrel->pvs)))); +} + +/* + * Helper routine to finish the parallel lazy heap scan. 
+ */ +static void +parallel_lazy_scan_heap_end(LVRelState *vacrel) +{ + /* Wait for all parallel workers to finish */ + parallel_vacuum_scan_end(vacrel->pvs); + + /* Gather the workers' scan results */ + parallel_lazy_scan_gather_scan_results(vacrel); +} + +/* Accumulate each worker's scan results into the leader's */ +static void +parallel_lazy_scan_gather_scan_results(LVRelState *vacrel) +{ + Assert(ParallelHeapVacuumIsActive(vacrel)); + Assert(!IsParallelWorker()); + + /* Gather the workers' scan results */ + for (int i = 0; i < vacrel->leader->nworkers_launched; i++) + { + LVScanData *data = &(vacrel->plvstate->shared->worker_scandata[i]); + +#define ACCUM_COUNT(item) vacrel->scan_data->item += data->item + ACCUM_COUNT(scanned_pages); + ACCUM_COUNT(removed_pages); + ACCUM_COUNT(new_frozen_tuple_pages); + ACCUM_COUNT(vm_new_visible_pages); + ACCUM_COUNT(vm_new_visible_frozen_pages); + ACCUM_COUNT(vm_new_frozen_pages); + ACCUM_COUNT(lpdead_item_pages); + ACCUM_COUNT(missed_dead_pages); + ACCUM_COUNT(tuples_deleted); + ACCUM_COUNT(tuples_frozen); + ACCUM_COUNT(lpdead_items); + ACCUM_COUNT(live_tuples); + ACCUM_COUNT(recently_dead_tuples); + ACCUM_COUNT(missed_dead_tuples); +#undef ACCUM_COUNT + + Assert(TransactionIdIsValid(data->NewRelfrozenXid)); + Assert(MultiXactIdIsValid(data->NewRelminMxid)); + + if (TransactionIdPrecedes(data->NewRelfrozenXid, vacrel->scan_data->NewRelfrozenXid)) + vacrel->scan_data->NewRelfrozenXid = data->NewRelfrozenXid; + + if (MultiXactIdPrecedesOrEquals(data->NewRelminMxid, vacrel->scan_data->NewRelminMxid)) + vacrel->scan_data->NewRelminMxid = data->NewRelminMxid; + + if (data->nonempty_pages < vacrel->scan_data->nonempty_pages) + vacrel->scan_data->nonempty_pages = data->nonempty_pages; + + vacrel->scan_data->skippedallvis |= data->skippedallvis; + } +} + /* * lazy_scan_new_or_empty() -- lazy_scan_heap() new/empty page handling. * @@ -3486,12 +3982,8 @@ dead_items_alloc(LVRelState *vacrel, int nworkers) autovacuum_work_mem != -1 ? autovacuum_work_mem : maintenance_work_mem; - /* - * Initialize state for a parallel vacuum. As of now, only one worker can - * be used for an index, so we invoke parallelism only if there are at - * least two indexes on a table. - */ - if (nworkers >= 0 && vacrel->nindexes > 1 && vacrel->do_index_vacuuming) + /* Initialize state for a parallel vacuum */ + if (nworkers >= 0) { /* * Since parallel workers cannot access data in temporary tables, we @@ -3509,11 +4001,17 @@ dead_items_alloc(LVRelState *vacrel, int nworkers) vacrel->relname))); } else + { + /* + * We initialize the parallel vacuum state for either lazy heap + * scan or index vacuuming, or both. + */ vacrel->pvs = parallel_vacuum_init(vacrel->rel, vacrel->indrels, vacrel->nindexes, nworkers, vac_work_mem, vacrel->verbose ? 
 											   vacrel->bstrategy, (void *) vacrel);
+		}
 
 	/*
 	 * If parallel mode started, dead_items and dead_items_info spaces are
@@ -3553,9 +4051,15 @@ dead_items_add(LVRelState *vacrel, BlockNumber blkno, OffsetNumber *offsets,
 	};
 	int64		prog_val[2];
 
+	if (ParallelHeapVacuumIsActive(vacrel))
+		TidStoreLockExclusive(vacrel->dead_items);
+
 	TidStoreSetBlockOffsets(vacrel->dead_items, blkno, offsets, num_offsets);
 	vacrel->dead_items_info->num_items += num_offsets;
 
+	if (ParallelHeapVacuumIsActive(vacrel))
+		TidStoreUnlock(vacrel->dead_items);
+
 	/* update the progress information */
 	prog_val[0] = vacrel->dead_items_info->num_items;
 	prog_val[1] = TidStoreMemoryUsage(vacrel->dead_items);
@@ -3758,12 +4262,217 @@ update_relstats_all_indexes(LVRelState *vacrel)
 /*
  * Compute the number of workers for parallel heap vacuum.
  *
- * Return 0 to disable parallel vacuum so far.
+ * The calculation logic is borrowed from compute_parallel_worker().
  */
 int
 heap_parallel_vacuum_compute_workers(Relation rel, int nworkers_requested)
 {
-	return 0;
+	int			parallel_workers = 0;
+	int			heap_parallel_threshold;
+	int			heap_pages;
+
+	if (nworkers_requested == 0)
+	{
+		/*
+		 * Select the number of workers based on the log of the size of the
+		 * relation. Note that the upper limit of the
+		 * min_parallel_table_scan_size GUC is chosen to prevent overflow
+		 * here.
+		 */
+		heap_parallel_threshold = Max(min_parallel_table_scan_size, 1);
+		heap_pages = RelationGetNumberOfBlocks(rel);
+		while (heap_pages >= (BlockNumber) (heap_parallel_threshold * 3))
+		{
+			parallel_workers++;
+			heap_parallel_threshold *= 3;
+			if (heap_parallel_threshold > INT_MAX / 3)
+				break;
+		}
+	}
+	else
+		parallel_workers = nworkers_requested;
+
+	return parallel_workers;
+}
+
+/*
+ * Estimate shared memory size required for parallel heap vacuum.
+ */
+void
+heap_parallel_vacuum_estimate(Relation rel, ParallelContext *pcxt, int nworkers,
+							  void *state)
+{
+	LVRelState *vacrel = (LVRelState *) state;
+	Size		size = 0;
+
+	vacrel->leader = palloc(sizeof(PLVLeader));
+
+	/* Estimate space for PLVShared */
+	size = add_size(size, SizeOfPLVShared);
+	size = add_size(size, mul_size(sizeof(LVScanData), nworkers));
+	vacrel->leader->shared_len = size;
+	shm_toc_estimate_chunk(&pcxt->estimator, vacrel->leader->shared_len);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/* Estimate space for ParallelBlockTableScanDesc */
+	vacrel->leader->pbscan_len = table_block_parallelscan_estimate(rel);
+	shm_toc_estimate_chunk(&pcxt->estimator, vacrel->leader->pbscan_len);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/* Estimate space for an array of PLVScanWorkerState */
+	vacrel->leader->scanstates_len = mul_size(sizeof(PLVScanWorkerState), nworkers);
+	shm_toc_estimate_chunk(&pcxt->estimator, vacrel->leader->scanstates_len);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
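As a worked illustration of the heuristic in heap_parallel_vacuum_compute_workers()
above (not part of the patch): with min_parallel_table_scan_size at its default
of 1024 pages (8MB with 8kB blocks), tables of at least 3072, 9216, and 27648
pages get 1, 2, and 3 workers respectively. The standalone sketch below
re-implements just that loop with plain ints:

#include <stdio.h>

/* Standalone re-implementation of the log-3 worker heuristic above */
static int
compute_workers(int heap_pages, int threshold)
{
	int			workers = 0;

	while (heap_pages >= threshold * 3)
	{
		workers++;
		threshold *= 3;
	}
	return workers;
}

int
main(void)
{
	/* prints "0 1 2 3" for 8MB, 24MB, 72MB, and 216MB tables */
	printf("%d %d %d %d\n",
		   compute_workers(1024, 1024),
		   compute_workers(3072, 1024),
		   compute_workers(9216, 1024),
		   compute_workers(27648, 1024));
	return 0;
}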
+ */ +void +heap_parallel_vacuum_initialize(Relation rel, ParallelContext *pcxt, int nworkers, + void *state) +{ + LVRelState *vacrel = (LVRelState *) state; + PLVShared *shared; + ParallelBlockTableScanDesc pbscan; + PLVScanWorkerState *scanstates; + + vacrel->plvstate = palloc0(sizeof(PLVState)); + + /* Initialize PLVShared */ + shared = shm_toc_allocate(pcxt->toc, vacrel->leader->shared_len); + MemSet(shared, 0, vacrel->leader->shared_len); + shared->aggressive = vacrel->aggressive; + shared->skipwithvm = vacrel->skipwithvm; + shared->cutoffs = vacrel->cutoffs; + shared->NewRelfrozenXid = vacrel->scan_data->NewRelfrozenXid; + shared->NewRelminMxid = vacrel->scan_data->NewRelminMxid; + shared->vistest = *vacrel->vistest; + shm_toc_insert(pcxt->toc, LV_PARALLEL_KEY_SHARED, shared); + vacrel->plvstate->shared = shared; + + /* Initialize ParallelBlockTableScanDesc */ + pbscan = shm_toc_allocate(pcxt->toc, vacrel->leader->pbscan_len); + table_block_parallelscan_initialize(rel, (ParallelTableScanDesc) pbscan); + pbscan->base.phs_syncscan = false; /* always start from the first block */ + shm_toc_insert(pcxt->toc, LV_PARALLEL_KEY_SCANDESC, pbscan); + vacrel->plvstate->pbscan = pbscan; + + /* Initialize the array of PLVScanWorkerState */ + scanstates = shm_toc_allocate(pcxt->toc, vacrel->leader->scanstates_len); + MemSet(scanstates, 0, vacrel->leader->scanstates_len); + shm_toc_insert(pcxt->toc, LV_PARALLEL_KEY_WORKER_SCANSTATE, scanstates); + vacrel->leader->scanstates = scanstates; +} + +/* + * Initialize lazy vacuum state with the information retrieved from + * shared memory. + */ +void +heap_parallel_vacuum_initialize_worker(Relation rel, ParallelVacuumState *pvs, + ParallelWorkerContext *pwcxt, + void **state_out) +{ + LVRelState *vacrel; + PLVState *plvstate; + PLVShared *shared; + PLVScanWorkerState *scanstates; + ParallelBlockTableScanDesc pbscan; + + /* Initialize PLVState and prepare the related objects */ + + plvstate = palloc0(sizeof(PLVState)); + + /* Prepare PLVShared */ + shared = (PLVShared *) shm_toc_lookup(pwcxt->toc, LV_PARALLEL_KEY_SHARED, false); + plvstate->shared = shared; + + /* Prepare ParallelBlockTableScanWorkerData */ + pbscan = shm_toc_lookup(pwcxt->toc, LV_PARALLEL_KEY_SCANDESC, false); + plvstate->pbscan = pbscan; + + /* Prepare PLVScanWorkerState */ + scanstates = shm_toc_lookup(pwcxt->toc, LV_PARALLEL_KEY_WORKER_SCANSTATE, false); + plvstate->scanstate = &(scanstates[ParallelWorkerNumber]); + + /* Initialize LVRelState and prepare fields required by lazy scan heap */ + vacrel = palloc0(sizeof(LVRelState)); + vacrel->rel = rel; + vacrel->indrels = parallel_vacuum_get_table_indexes(pvs, + &vacrel->nindexes); + vacrel->pvs = pvs; + vacrel->aggressive = shared->aggressive; + vacrel->skipwithvm = shared->skipwithvm; + vacrel->cutoffs = shared->cutoffs; + vacrel->vistest = &(shared->vistest); + vacrel->dead_items = parallel_vacuum_get_dead_items(pvs, + &vacrel->dead_items_info); + vacrel->plvstate = plvstate; + vacrel->scan_data = &(shared->worker_scandata[ParallelWorkerNumber]); + MemSet(vacrel->scan_data, 0, sizeof(LVScanData)); + vacrel->scan_data->NewRelfrozenXid = shared->NewRelfrozenXid; + vacrel->scan_data->NewRelminMxid = shared->NewRelminMxid; + vacrel->scan_data->skippedallvis = false; + vacrel->scan_data->rel_pages = RelationGetNumberOfBlocks(rel); + + /* + * Initialize the scan state if not yet. The chunk of blocks will be + * allocated when to get the scan block for the first time. 
+ */ + if (!vacrel->plvstate->scanstate->inited) + { + vacrel->plvstate->scanstate->inited = true; + table_block_parallelscan_startblock_init(rel, + &(vacrel->plvstate->scanstate->pbscanwork), + vacrel->plvstate->pbscan); + vacrel->plvstate->scanstate->maybe_have_unprocessed_blocks = false; + } + + *state_out = (void *) vacrel; +} + +/* + * Parallel heap vacuum callback for collecting dead items (i.e., lazy heap scan). + */ +void +heap_parallel_vacuum_collect_dead_items(Relation rel, ParallelVacuumState *pvs, + void *state) +{ + LVRelState *vacrel = (LVRelState *) state; + ErrorContextCallback errcallback; + bool reach_eot; + + Assert(ParallelHeapVacuumIsActive(vacrel)); + + /* + * Setup error traceback support for ereport() for parallel table vacuum + * workers + */ + vacrel->dbname = get_database_name(MyDatabaseId); + vacrel->relnamespace = get_database_name(RelationGetNamespace(rel)); + vacrel->relname = pstrdup(RelationGetRelationName(rel)); + vacrel->indname = NULL; + vacrel->phase = VACUUM_ERRCB_PHASE_SCAN_HEAP; + errcallback.callback = vacuum_error_callback; + errcallback.arg = &vacrel; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* Join the parallel heap vacuum */ + reach_eot = do_lazy_scan_heap(vacrel); + + /* + * If the leader or a worker finishes the heap scan because dead_items + * TIDs is close to the limit, lazy heap scan stops while it might have + * some unscanned blocks in the allocated chunk. Since this scan state + * could not be used in the next heap scan, we remember that it might have + * some unconsumed blocks so that the leader complete the scans after the + * heap scan phase finishes. + */ + vacrel->plvstate->scanstate->maybe_have_unprocessed_blocks = !reach_eot; + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; } /* diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c index a56c5eceb14..a0a92dc8be5 100644 --- a/src/backend/access/table/tableam.c +++ b/src/backend/access/table/tableam.c @@ -600,6 +600,21 @@ table_block_parallelscan_nextpage(Relation rel, return page; } +/* + * skip some blocks to scan. + * + * Consume the given number of blocks in the current chunk. It doesn't skip blocks + * beyond the current chunk. + */ +void +table_block_parallelscan_skip_pages_in_chunk(Relation rel, + ParallelBlockTableScanWorker pbscanwork, + ParallelBlockTableScanDesc pbscan, + BlockNumber nblocks_skip) +{ + pbscanwork->phsw_chunk_remaining -= Min(nblocks_skip, pbscanwork->phsw_chunk_remaining); +} + /* ---------------------------------------------------------------------------- * Helper functions to implement relation sizing for block oriented AMs. * ---------------------------------------------------------------------------- diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c index 0acd7fa9144..ea8ffa4efaf 100644 --- a/src/backend/commands/vacuumparallel.c +++ b/src/backend/commands/vacuumparallel.c @@ -508,6 +508,26 @@ parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats) pfree(pvs); } +/* + * Return the number of parallel workers initialized for parallel table vacuum. + */ +int +parallel_vacuum_get_nworkers_table(ParallelVacuumState *pvs) +{ + return pvs->nworkers_for_table; +} + +/* + * Return the array of indexes associated to the given table to be vacuumed. 
+ */ +Relation * +parallel_vacuum_get_table_indexes(ParallelVacuumState *pvs, int *nindexes) +{ + *nindexes = pvs->nindexes; + + return pvs->indrels; +} + /* * Returns the dead items space and dead items information. */ @@ -1113,7 +1133,8 @@ parallel_vacuum_scan_end(ParallelVacuumState *pvs) WaitForParallelWorkersToFinish(pvs->pcxt); /* Decrement the worker count for the leader itself */ - pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1); + if (VacuumActiveNWorkers) + pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1); for (int i = 0; i < pvs->pcxt->nworkers_launched; i++) InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]); diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index c80c6b16143..7848e4621b7 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -15,6 +15,7 @@ #define HEAPAM_H #include "access/heapam_xlog.h" +#include "access/parallel.h" #include "access/relation.h" /* for backward compatibility */ #include "access/relscan.h" #include "access/sdir.h" @@ -407,9 +408,20 @@ extern void log_heap_prune_and_freeze(Relation relation, Buffer buffer, /* in heap/vacuumlazy.c */ struct VacuumParams; +struct ParallelVacuumState; extern void heap_vacuum_rel(Relation rel, struct VacuumParams *params, BufferAccessStrategy bstrategy); extern int heap_parallel_vacuum_compute_workers(Relation rel, int nworkers_requested); +extern int heap_parallel_vacuum_compute_workers(Relation rel, int max_workers); +extern void heap_parallel_vacuum_estimate(Relation rel, ParallelContext *pcxt, int nworkers, + void *state); +extern void heap_parallel_vacuum_initialize(Relation rel, ParallelContext *pcxt, + int nworkers, void *state); +extern void heap_parallel_vacuum_initialize_worker(Relation rel, struct ParallelVacuumState *pvs, + ParallelWorkerContext *pwcxt, + void **state_out); +extern void heap_parallel_vacuum_collect_dead_items(Relation rel, struct ParallelVacuumState *pvs, + void *state); /* in heap/heapam_visibility.c */ extern bool HeapTupleSatisfiesVisibility(HeapTuple htup, Snapshot snapshot, diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 3e1e6aefeb5..34c2b7856e5 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -2231,6 +2231,10 @@ extern BlockNumber table_block_parallelscan_nextpage(Relation rel, extern void table_block_parallelscan_startblock_init(Relation rel, ParallelBlockTableScanWorker pbscanwork, ParallelBlockTableScanDesc pbscan); +extern void table_block_parallelscan_skip_pages_in_chunk(Relation rel, + ParallelBlockTableScanWorker pbscanwork, + ParallelBlockTableScanDesc pbscan, + BlockNumber nblocks_skip); /* ---------------------------------------------------------------------------- diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h index 596e901c207..20eecad4ec4 100644 --- a/src/include/commands/vacuum.h +++ b/src/include/commands/vacuum.h @@ -384,6 +384,8 @@ extern ParallelVacuumState *parallel_vacuum_init(Relation rel, Relation *indrels BufferAccessStrategy bstrategy, void *state); extern void parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats); +extern int parallel_vacuum_get_nworkers_table(ParallelVacuumState *pvs); +extern Relation *parallel_vacuum_get_table_indexes(ParallelVacuumState *pvs, int *nindexes); extern TidStore *parallel_vacuum_get_dead_items(ParallelVacuumState *pvs, VacDeadItemsInfo **dead_items_info_p); extern void parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs); diff --git 
diff --git a/src/test/regress/expected/vacuum.out b/src/test/regress/expected/vacuum.out
index 1a07dbf67d6..f3ddaaad081 100644
--- a/src/test/regress/expected/vacuum.out
+++ b/src/test/regress/expected/vacuum.out
@@ -156,6 +156,13 @@ UPDATE pvactst SET i = i WHERE i < 1000;
 VACUUM (PARALLEL 2) pvactst;
 UPDATE pvactst SET i = i WHERE i < 1000;
 VACUUM (PARALLEL 0) pvactst; -- disable parallel vacuum
+CREATE TABLE pvactst2 (i INT) with (autovacuum_enabled = off);
+INSERT INTO pvactst2 SELECT generate_series(1,1000) i;
+-- VACUUM invokes parallel heap vacuum.
+SET min_parallel_table_scan_size to 0;
+VACUUM (PARALLEL 2, FREEZE) pvactst2;
+UPDATE pvactst2 SET i = i WHERE i < 1000;
+VACUUM (PARALLEL 1) pvactst2;
 VACUUM (PARALLEL -1) pvactst; -- error
 ERROR:  parallel workers for vacuum must be between 0 and 1024
 LINE 1: VACUUM (PARALLEL -1) pvactst;
@@ -174,7 +181,9 @@ VACUUM (PARALLEL 1, FULL FALSE) tmp; -- parallel vacuum disabled for temp tables
 WARNING:  disabling parallel option of vacuum on "tmp" --- cannot vacuum temporary tables in parallel
 VACUUM (PARALLEL 0, FULL TRUE) tmp; -- can specify parallel disabled (even though that's implied by FULL)
 RESET min_parallel_index_scan_size;
+RESET min_parallel_table_scan_size;
 DROP TABLE pvactst;
+DROP TABLE pvactst2;
 -- INDEX_CLEANUP option
 CREATE TABLE no_index_cleanup (i INT PRIMARY KEY, t TEXT);
 -- Use uncompressed data stored in toast.
diff --git a/src/test/regress/sql/vacuum.sql b/src/test/regress/sql/vacuum.sql
index 5e55079e718..c28bb6b831e 100644
--- a/src/test/regress/sql/vacuum.sql
+++ b/src/test/regress/sql/vacuum.sql
@@ -125,6 +125,15 @@ VACUUM (PARALLEL 2) pvactst;
 UPDATE pvactst SET i = i WHERE i < 1000;
 VACUUM (PARALLEL 0) pvactst; -- disable parallel vacuum
 
+CREATE TABLE pvactst2 (i INT) with (autovacuum_enabled = off);
+INSERT INTO pvactst2 SELECT generate_series(1,1000) i;
+
+-- VACUUM invokes parallel heap vacuum.
+SET min_parallel_table_scan_size to 0;
+VACUUM (PARALLEL 2, FREEZE) pvactst2;
+UPDATE pvactst2 SET i = i WHERE i < 1000;
+VACUUM (PARALLEL 1) pvactst2;
+
 VACUUM (PARALLEL -1) pvactst; -- error
 VACUUM (PARALLEL 2, INDEX_CLEANUP FALSE) pvactst;
 VACUUM (PARALLEL 2, FULL TRUE) pvactst; -- error, cannot use both PARALLEL and FULL
@@ -136,7 +145,9 @@ CREATE INDEX tmp_idx1 ON tmp (a);
 VACUUM (PARALLEL 1, FULL FALSE) tmp; -- parallel vacuum disabled for temp tables
 VACUUM (PARALLEL 0, FULL TRUE) tmp; -- can specify parallel disabled (even though that's implied by FULL)
 RESET min_parallel_index_scan_size;
+RESET min_parallel_table_scan_size;
 DROP TABLE pvactst;
+DROP TABLE pvactst2;
 
 -- INDEX_CLEANUP option
 CREATE TABLE no_index_cleanup (i INT PRIMARY KEY, t TEXT);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 5000d029add..916862153ab 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1927,6 +1927,10 @@ PLpgSQL_type
 PLpgSQL_type_type
 PLpgSQL_var
 PLpgSQL_variable
+PLVLeader
+PLVScanWorkerState
+PLVShared
+PLVState
 PLwdatum
 PLword
 PLyArrayToOb
-- 
2.43.5