From e37a1d648a6cb50a51eb97033d591be1d4170cbd Mon Sep 17 00:00:00 2001
From: Masahiko Sawada
Date: Thu, 27 Feb 2025 13:41:35 -0800
Subject: [PATCH v14 4/4] Support parallelism for collecting dead items during
 lazy vacuum.

This feature allows vacuum to leverage multiple CPUs in order to collect
dead items (i.e., the first pass over the heap table) with parallel
workers. The parallel degree for parallel heap vacuuming is determined
based on the number of blocks to vacuum unless the PARALLEL option of
the VACUUM command specifies it, and is further limited by
max_parallel_maintenance_workers.

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch-through:
---
 doc/src/sgml/ref/vacuum.sgml             |  54 +-
 src/backend/access/heap/heapam_handler.c |   4 +
 src/backend/access/heap/vacuumlazy.c     | 992 ++++++++++++++++++++---
 src/backend/commands/vacuumparallel.c    |  29 +
 src/include/access/heapam.h              |  11 +
 src/include/commands/vacuum.h            |   3 +
 src/test/regress/expected/vacuum.out     |   6 +
 src/test/regress/sql/vacuum.sql          |   7 +
 src/tools/pgindent/typedefs.list         |   4 +
 9 files changed, 989 insertions(+), 121 deletions(-)

diff --git a/doc/src/sgml/ref/vacuum.sgml b/doc/src/sgml/ref/vacuum.sgml
index bd5dcaf86a5..294494877d9 100644
--- a/doc/src/sgml/ref/vacuum.sgml
+++ b/doc/src/sgml/ref/vacuum.sgml
@@ -280,25 +280,41 @@ VACUUM [ ( option [, ...] ) ] [ PARALLEL
-      Perform index vacuum and index cleanup phases of VACUUM
-      in parallel using <replaceable class="parameter">integer</replaceable>
-      background workers (for the details of each vacuum phase, please
-      refer to <xref linkend="vacuum-phases"/>).  The number of workers used
-      to perform the operation is equal to the number of indexes on the
-      relation that support parallel vacuum which is limited by the number of
-      workers specified with PARALLEL option if any which is
-      further limited by <xref linkend="guc-max-parallel-maintenance-workers"/>.
-      An index can participate in parallel vacuum if and only if the size of the
-      index is more than <xref linkend="guc-min-parallel-index-scan-size"/>.
-      Please note that it is not guaranteed that the number of parallel workers
-      specified in <replaceable class="parameter">integer</replaceable> will be
-      used during execution.  It is possible for a vacuum to run with fewer
-      workers than specified, or even with no workers at all.  Only one worker
-      can be used per index.  So parallel workers are launched only when there
-      are at least 2 indexes in the table.  Workers for
-      vacuum are launched before the start of each phase and exit at the end of
-      the phase.  These behaviors might change in a future release.  This
-      option can't be used with the FULL option.
+      Perform the heap scanning, index vacuum, and index cleanup phases of
+      VACUUM in parallel using
+      <replaceable class="parameter">integer</replaceable> background workers
+      (for the details of each vacuum phase, please refer to
+      <xref linkend="vacuum-phases"/>).
+     </para>
+
+     <para>
+      For heap tables, the number of workers used for the heap scanning
+      phase is determined based on the size of the table.  A table can
+      participate in parallel heap scanning if and only if the size of the
+      table is more than <xref linkend="guc-min-parallel-table-scan-size"/>.
+      During the heap scan, the table's blocks are divided into ranges and
+      shared among the cooperating processes.  Each worker process completes
+      the scanning of its given range of blocks before requesting an
+      additional range of blocks.
+     </para>
+
+     <para>
+      The number of workers used to perform parallel index vacuum and index
+      cleanup is equal to the number of indexes on the relation that support
+      parallel vacuum.  An index can participate in parallel vacuum if and
+      only if the size of the index is more than
+      <xref linkend="guc-min-parallel-index-scan-size"/>.
+      Only one worker can be used per index.  So parallel workers for index
+      vacuum and index cleanup are launched only when there are at least
+      2 indexes in the table.
+     </para>
+
+     <para>
+      Workers for vacuum are launched before the start of each phase and exit
+      at the end of the phase.  The number of workers for each phase is
+      limited by the number of workers specified with the PARALLEL
+      option, if any, which is further limited by
+      <xref linkend="guc-max-parallel-maintenance-workers"/>.
+      Please note that in any parallel vacuum phase, it is not guaranteed
+      that the number of parallel workers specified in
+      <replaceable class="parameter">integer</replaceable> will be used
+      during execution.  It is possible for a vacuum to run with fewer
+      workers than specified, or even with no workers at all.  These
+      behaviors might change in a future release.  This option can't be used
+      with the FULL option.

diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index a534100692a..9de9f4637b2 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2713,6 +2713,10 @@ static const TableAmRoutine heapam_methods = {
 	.scan_sample_next_tuple = heapam_scan_sample_next_tuple,
 
 	.parallel_vacuum_compute_workers = heap_parallel_vacuum_compute_workers,
+	.parallel_vacuum_estimate = heap_parallel_vacuum_estimate,
+	.parallel_vacuum_initialize = heap_parallel_vacuum_initialize,
+	.parallel_vacuum_initialize_worker = heap_parallel_vacuum_initialize_worker,
+	.parallel_vacuum_collect_dead_items = heap_parallel_vacuum_collect_dead_items,
 };
 
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index c54cffdc399..bdb90c2e771 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -99,6 +99,46 @@
 * After pruning and freezing, pages that are newly all-visible and all-frozen
 * are marked as such in the visibility map.
 *
+ * Parallel Vacuum:
+ *
+ * Lazy vacuum on heap tables supports parallel processing for phase I and
+ * phase II.  Before starting phase I, we initialize the parallel vacuum
+ * state, ParallelVacuumState, and allocate the TID store in a DSA area if we
+ * can use parallel mode for either of these two phases.
+ *
+ * Each phase can require a different number of parallel workers, depending
+ * on factors such as the table size and the number of indexes.  Parallel
+ * workers are launched at the beginning of each phase and exit at the end of
+ * each phase.
+ *
+ * For the parallel lazy heap scan (i.e. parallel phase I), we employ a
+ * parallel block table scan, controlled by ParallelBlockTableScanDesc, in
+ * conjunction with the read stream.  The table is split into multiple
+ * chunks, which are then distributed among parallel workers.
+ *
+ * While vacuum cutoffs are shared between leader and worker processes, each
+ * individual process uses its own GlobalVisState, potentially causing some
+ * workers to remove fewer tuples than optimal.  During parallel lazy heap
+ * scans, each worker tracks the oldest existing XID and MXID.  After the
+ * parallel scan, the leader computes the globally oldest existing XID and
+ * MXID while gathering the workers' scan results.
+ *
+ * The workers' parallel scan descriptions, ParallelBlockTableScanWorkerData,
+ * are stored in the DSM space, enabling different parallel workers to resume
+ * phase I from their previous state.  However, due to the potential presence
+ * of pinned buffers loaded by the read stream's look-ahead mechanism, we
+ * cannot abruptly stop phase I even when the space of dead_items TIDs
+ * exceeds the limit.  Instead, once this threshold is surpassed, we begin
+ * processing pages without attempting to retrieve additional blocks until
+ * the read stream is exhausted.
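+ *
+ * A rough sketch of that throttling in the read stream callback (the actual
+ * logic lives in heap_vac_scan_next_block(); names are from this patch,
+ * details elided):
+ *
+ *    if (ParallelHeapVacuumIsActive(vacrel) &&
+ *        dead_items_check_memory_limit(vacrel))
+ *        return InvalidBlockNumber;      -- stop requesting new blocks
+ *    next_block = table_block_parallelscan_nextpage(...);
+ *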
+ * While this approach may increase the memory usage, it typically doesn't
+ * pose a significant problem, as processing a few tens to hundreds of
+ * buffers doesn't substantially increase the size of the dead_items TIDs.
+ *
+ * If the leader launches fewer workers than the previous time when resuming
+ * the parallel lazy heap scan, some blocks within chunks may remain
+ * unscanned.  To address this, the leader completes the workers' unfinished
+ * scans at the end of the parallel lazy heap scan (see
+ * complete_unfinished_lazy_scan_heap()).
+ *
 * Dead TID Storage:
 *
 * The major space usage for vacuuming is storage for the dead tuple IDs that
@@ -147,6 +187,7 @@
 #include "common/pg_prng.h"
 #include "executor/instrument.h"
 #include "miscadmin.h"
+#include "optimizer/paths.h"	/* for min_parallel_table_scan_size */
 #include "pgstat.h"
 #include "portability/instr_time.h"
 #include "postmaster/autovacuum.h"
@@ -214,11 +255,21 @@
 */
 #define PREFETCH_SIZE			((BlockNumber) 32)
 
+/*
+ * DSM keys for parallel lazy vacuum.  Unlike other parallel execution code,
+ * we don't need to worry about DSM keys conflicting with plan_node_id, but
+ * we do need to avoid conflicting with the DSM keys used in vacuumparallel.c.
+ */
+#define PARALLEL_LV_KEY_SHARED		0xFFFF0001
+#define PARALLEL_LV_KEY_SCANDESC	0xFFFF0002
+#define PARALLEL_LV_KEY_SCANWORKER	0xFFFF0003
+
 /*
 * Macro to check if we are in a parallel vacuum.  If true, we are in the
 * parallel mode and the DSM segment is initialized.
 */
 #define ParallelVacuumIsActive(vacrel) ((vacrel)->pvs != NULL)
+#define ParallelHeapVacuumIsActive(vacrel) ((vacrel)->plvstate != NULL)
 
 /* Phases of vacuum during which we report error context. */
 typedef enum
@@ -306,6 +357,80 @@ typedef struct LVScanData
 	bool		skippedallvis;
 } LVScanData;
 
+/*
+ * Struct for information that needs to be shared among parallel workers
+ * for parallel lazy vacuum.  All fields are static information, set by the
+ * leader process.
+ */
+typedef struct ParallelLVShared
+{
+	bool		aggressive;
+	bool		skipwithvm;
+
+	/* The current oldest existing XID/MXID shared by the leader process */
+	TransactionId NewRelfrozenXid;
+	MultiXactId NewRelminMxid;
+
+	/* VACUUM operation's cutoffs for freezing and pruning */
+	struct VacuumCutoffs cutoffs;
+} ParallelLVShared;
+
+/*
+ * Per-worker data: the scan description, statistics counters, and
+ * miscellaneous data that need to be shared with the leader.
+ */
+typedef struct ParallelLVScanWorker
+{
+	/* Are both last_blkno and pbscanworkdata initialized? */
+	bool		scan_inited;
+
+	/* The last processed block number */
+	pg_atomic_uint32 last_blkno;
+
+	/* per-worker parallel table scan state */
+	ParallelBlockTableScanWorkerData pbscanworkdata;
+
+	/* per-worker scan data and counters */
+	LVScanData	scandata;
+} ParallelLVScanWorker;
+
+/*
+ * Struct to store parallel lazy vacuum working state.
+ */
+typedef struct ParallelLVState
+{
+	/* Parallel scan description shared among parallel workers */
+	ParallelBlockTableScanDesc pbscan;
+
+	/* Per-worker parallel table scan state */
+	ParallelBlockTableScanWorker pbscanwork;
+
+	/* Shared static information */
+	ParallelLVShared *shared;
+
+	/* Per-worker scan data.  NULL for the leader process */
+	ParallelLVScanWorker *scanworker;
+} ParallelLVState;
+
+/*
+ * Struct for the leader process in parallel lazy vacuum.
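+ *
+ * The leader tracks the sizes of the three DSM chunks it allocates for
+ * parallel lazy vacuum (see heap_parallel_vacuum_estimate()): the static
+ * ParallelLVShared data, the shared ParallelBlockTableScanDesc, and the
+ * array of per-worker ParallelLVScanWorker states.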
+ */
+typedef struct ParallelLVLeader
+{
+	/* Shared memory size for each shared object */
+	Size		pbscan_len;
+	Size		shared_len;
+	Size		scanworker_len;
+
+	/* The number of workers launched for parallel lazy heap scan */
+	int			nworkers_launched;
+
+	/*
+	 * Points to the array of all per-worker scan states stored in the DSM
+	 * area.
+	 */
+	ParallelLVScanWorker *scanworkers;
+} ParallelLVLeader;
+
 typedef struct LVRelState
 {
 	/* Target heap relation and its indexes */
@@ -368,6 +493,12 @@ typedef struct LVRelState
 	/* Instrumentation counters */
 	int			num_index_scans;
 
+	/* Last processed block number */
+	BlockNumber last_blkno;
+
+	/* Next block to check for FSM vacuum */
+	BlockNumber next_fsm_block_to_vacuum;
+
 	/* State maintained by heap_vac_scan_next_block() */
 	BlockNumber current_block;	/* last block returned */
 	BlockNumber next_unskippable_block; /* next unskippable block */
@@ -375,6 +506,16 @@ typedef struct LVRelState
 	bool		next_unskippable_eager_scanned; /* if it was eagerly scanned */
 	Buffer		next_unskippable_vmbuffer;	/* buffer containing its VM bit */
 
+	/* Fields used for parallel lazy vacuum */
+
+	/* Parallel lazy vacuum working state */
+	ParallelLVState *plvstate;
+
+	/*
+	 * The leader state for parallel lazy vacuum.  NULL for parallel workers.
+	 */
+	ParallelLVLeader *leader;
+
 	/* State related to managing eager scanning of all-visible pages */
 
 	/*
@@ -434,12 +575,14 @@ typedef struct LVSavedErrInfo
 
 /* non-export function prototypes */
 static void lazy_scan_heap(LVRelState *vacrel);
+static void do_lazy_scan_heap(LVRelState *vacrel, bool do_vacuum);
 static void heap_vacuum_eager_scan_setup(LVRelState *vacrel,
										 VacuumParams *params);
 static BlockNumber heap_vac_scan_next_block(ReadStream *stream,
											void *callback_private_data,
											void *per_buffer_data);
-static void find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis);
+static bool find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis,
										BlockNumber start_blk, BlockNumber end_blk);
 static bool lazy_scan_new_or_empty(LVRelState *vacrel, Buffer buf,
								   BlockNumber blkno, Page page,
								   bool sharelock, Buffer vmbuffer);
@@ -450,6 +593,12 @@ static void lazy_scan_prune(LVRelState *vacrel, Buffer buf,
 static bool lazy_scan_noprune(LVRelState *vacrel, Buffer buf,
							  BlockNumber blkno, Page page,
							  bool *has_lpdead_items);
+static void do_parallel_lazy_scan_heap(LVRelState *vacrel);
+static BlockNumber parallel_lazy_scan_compute_min_scan_block(LVRelState *vacrel);
+static void complete_unfinished_lazy_scan_heap(LVRelState *vacrel);
+static void parallel_lazy_scan_heap_begin(LVRelState *vacrel);
+static void parallel_lazy_scan_heap_end(LVRelState *vacrel);
+static void parallel_lazy_scan_gather_scan_results(LVRelState *vacrel);
 static void lazy_vacuum(LVRelState *vacrel);
 static bool lazy_vacuum_all_indexes(LVRelState *vacrel);
 static void lazy_vacuum_heap_rel(LVRelState *vacrel);
@@ -474,6 +623,7 @@ static BlockNumber count_nondeletable_pages(LVRelState *vacrel,
 static void dead_items_alloc(LVRelState *vacrel, int nworkers);
 static void dead_items_add(LVRelState *vacrel, BlockNumber blkno, OffsetNumber *offsets,
						   int num_offsets);
+static bool dead_items_check_memory_limit(LVRelState *vacrel);
 static void dead_items_reset(LVRelState *vacrel);
 static void dead_items_cleanup(LVRelState *vacrel);
 static bool heap_page_is_all_visible(LVRelState *vacrel, Buffer buf,
@@ -529,6 +679,22 @@ heap_vacuum_eager_scan_setup(LVRelState *vacrel, VacuumParams *params)
 	if (vacrel->aggressive)
 		return;
 
+	/*
+	 * Disable eager scanning if parallel lazy vacuum is enabled.
+	 *
+	 * One might think it would make sense to use eager scanning even during
+	 * parallel lazy vacuum, but parallel vacuum is currently available only
+	 * via the VACUUM command and is not something that happens frequently,
+	 * which does not fit the purpose of eager scanning.  It would also make
+	 * the code more complex, so we disable it for now.
+	 *
+	 * XXX: this limitation might need to be lifted in the future, for
+	 * example when we use parallel vacuum also in autovacuum.
+	 */
+	if (ParallelHeapVacuumIsActive(vacrel))
+		return;
+
 	/*
 	 * Aggressively vacuuming a small relation shouldn't take long, so it
 	 * isn't worth amortizing.  We use two times the region size as the size
@@ -771,6 +937,7 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 
 	/* Initialize remaining counters (be tidy) */
 	vacrel->num_index_scans = 0;
+	vacrel->next_fsm_block_to_vacuum = 0;
 
 	/* dead_items_alloc allocates vacrel->dead_items later on */
 
@@ -815,13 +982,6 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 
 	vacrel->skipwithvm = skipwithvm;
 
-	/*
-	 * Set up eager scan tracking state.  This must happen after determining
-	 * whether or not the vacuum must be aggressive, because only normal
-	 * vacuums use the eager scan algorithm.
-	 */
-	heap_vacuum_eager_scan_setup(vacrel, params);
-
 	if (verbose)
 	{
 		if (vacrel->aggressive)
@@ -846,6 +1006,13 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 	lazy_check_wraparound_failsafe(vacrel);
 	dead_items_alloc(vacrel, params->nworkers);
 
+	/*
+	 * Set up eager scan tracking state.  This must happen after determining
+	 * whether or not the vacuum must be aggressive, because only normal
+	 * vacuums use the eager scan algorithm.
+	 */
+	heap_vacuum_eager_scan_setup(vacrel, params);
+
 	/*
 	 * Call lazy_scan_heap to perform all required heap pruning, index
 	 * vacuuming, and heap vacuuming (plus related processing)
@@ -1215,13 +1382,7 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 static void
 lazy_scan_heap(LVRelState *vacrel)
 {
-	ReadStream *stream;
-	BlockNumber rel_pages = vacrel->scan_data->rel_pages,
-				blkno = 0,
-				next_fsm_block_to_vacuum = 0;
-	BlockNumber orig_eager_scan_success_limit =
-		vacrel->eager_scan_remaining_successes; /* for logging */
-	Buffer		vmbuffer = InvalidBuffer;
+	BlockNumber rel_pages = vacrel->scan_data->rel_pages;
 	const int	initprog_index[] = {
 		PROGRESS_VACUUM_PHASE,
 		PROGRESS_VACUUM_TOTAL_HEAP_BLKS,
@@ -1242,6 +1403,73 @@ lazy_scan_heap(LVRelState *vacrel)
 	vacrel->next_unskippable_eager_scanned = false;
 	vacrel->next_unskippable_vmbuffer = InvalidBuffer;
 
+	/* Do the actual work */
+	if (ParallelHeapVacuumIsActive(vacrel))
+		do_parallel_lazy_scan_heap(vacrel);
+	else
+		do_lazy_scan_heap(vacrel, true);
+
+	/*
+	 * Report that everything is now scanned.  We never skip scanning the
+	 * last block in the relation, so we can pass rel_pages here.
+	 */
+	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED,
+								 rel_pages);
+
+	/* now we can compute the new value for pg_class.reltuples */
+	vacrel->new_live_tuples = vac_estimate_reltuples(vacrel->rel, rel_pages,
+													 vacrel->scan_data->scanned_pages,
+													 vacrel->scan_data->live_tuples);
+
+	/*
+	 * Also compute the total number of surviving heap entries.  In the
+	 * (unlikely) scenario that new_live_tuples is -1, take it as zero.
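+	 *
+	 * (In a parallel vacuum, the scan counters used below have already been
+	 * merged from all workers by parallel_lazy_scan_gather_scan_results().)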
+	 */
+	vacrel->new_rel_tuples =
+		Max(vacrel->new_live_tuples, 0) + vacrel->scan_data->recently_dead_tuples +
+		vacrel->scan_data->missed_dead_tuples;
+
+	/*
+	 * Do index vacuuming (call each index's ambulkdelete routine), then do
+	 * related heap vacuuming
+	 */
+	if (vacrel->dead_items_info->num_items > 0)
+		lazy_vacuum(vacrel);
+
+	/*
+	 * Vacuum the remainder of the Free Space Map.  We must do this whether
+	 * or not there were indexes, and whether or not we bypassed index
+	 * vacuuming.  We can pass rel_pages here because we never skip scanning
+	 * the last block of the relation.
+	 */
+	if (rel_pages > vacrel->next_fsm_block_to_vacuum)
+		FreeSpaceMapVacuumRange(vacrel->rel, vacrel->next_fsm_block_to_vacuum, rel_pages);
+
+	/* report all blocks vacuumed */
+	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, rel_pages);
+
+	/* Do final index cleanup (call each index's amvacuumcleanup routine) */
+	if (vacrel->nindexes > 0 && vacrel->do_index_cleanup)
+		lazy_cleanup_all_indexes(vacrel);
+}
+
+/*
+ * Workhorse for lazy_scan_heap().
+ *
+ * If do_vacuum is true, we pause the lazy heap scan whenever the space of
+ * dead_items TIDs exceeds the limit, invoke a cycle of index vacuuming and
+ * table vacuuming, and then resume the scan.  If it's false, we continue
+ * scanning until the read stream is exhausted.
+ */
+static void
+do_lazy_scan_heap(LVRelState *vacrel, bool do_vacuum)
+{
+	ReadStream *stream;
+	BlockNumber blkno = InvalidBlockNumber;
+	BlockNumber orig_eager_scan_success_limit =
+		vacrel->eager_scan_remaining_successes; /* for logging */
+	Buffer		vmbuffer = InvalidBuffer;
+
 	/* Set up the read stream for vacuum's first pass through the heap */
 	stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE,
 										vacrel->bstrategy,
@@ -1271,8 +1499,11 @@ lazy_scan_heap(LVRelState *vacrel)
 		 * that point.  This check also provides failsafe coverage for the
 		 * one-pass strategy, and the two-pass strategy with the index_cleanup
 		 * param set to 'off'.
+		 *
+		 * The failsafe check is done only by the leader process.
 		 */
-		if (vacrel->scan_data->scanned_pages > 0 &&
+		if (!IsParallelWorker() &&
+			vacrel->scan_data->scanned_pages > 0 &&
 			vacrel->scan_data->scanned_pages % FAILSAFE_EVERY_PAGES == 0)
 			lazy_check_wraparound_failsafe(vacrel);
 
@@ -1280,12 +1511,9 @@ lazy_scan_heap(LVRelState *vacrel)
 		 * Consider if we definitely have enough space to process TIDs on page
 		 * already.  If we are close to overrunning the available space for
 		 * dead_items TIDs, pause and do a cycle of vacuuming before we tackle
-		 * this page.  However, let's force at least one page-worth of tuples
-		 * to be stored as to ensure we do at least some work when the memory
-		 * configured is so low that we run out before storing anything.
+		 * this page.
		 */
-		if (vacrel->dead_items_info->num_items > 0 &&
-			TidStoreMemoryUsage(vacrel->dead_items) > vacrel->dead_items_info->max_bytes)
+		if (do_vacuum && dead_items_check_memory_limit(vacrel))
 		{
 			/*
 			 * Before beginning index vacuuming, we release any pin we may
@@ -1308,15 +1536,16 @@ lazy_scan_heap(LVRelState *vacrel)
 			 * upper-level FSM pages.  Note that blkno is the previously
 			 * processed block.
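			 *
			 * (With do_vacuum=false, as used by parallel workers and by the
			 * leader during a parallel scan round, this block is never
			 * reached; the index/heap vacuum cycles then happen between scan
			 * rounds in do_parallel_lazy_scan_heap() instead.)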
			 */
-			FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum,
+			FreeSpaceMapVacuumRange(vacrel->rel, vacrel->next_fsm_block_to_vacuum,
									blkno + 1);
-			next_fsm_block_to_vacuum = blkno;
+			vacrel->next_fsm_block_to_vacuum = blkno;
 
 			/* Report that we are once again scanning the heap */
 			pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
										 PROGRESS_VACUUM_PHASE_SCAN_HEAP);
 		}
 
+		/* Read the next block to process */
 		buf = read_stream_next_buffer(stream, &per_buffer_data);
 
 		/* The relation is exhausted. */
@@ -1326,7 +1555,7 @@ lazy_scan_heap(LVRelState *vacrel)
 		blk_info = *((uint8 *) per_buffer_data);
 		CheckBufferIsPinnedOnce(buf);
 		page = BufferGetPage(buf);
-		blkno = BufferGetBlockNumber(buf);
+		blkno = vacrel->last_blkno = BufferGetBlockNumber(buf);
 
 		vacrel->scan_data->scanned_pages++;
 		if (blk_info & VAC_BLK_WAS_EAGER_SCANNED)
@@ -1486,13 +1715,36 @@ lazy_scan_heap(LVRelState *vacrel)
 		 * visible on upper FSM pages.  This is done after vacuuming if the
 		 * table has indexes.  There will only be newly-freed space if we
 		 * held the cleanup lock and lazy_scan_prune() was called.
+		 *
+		 * During parallel lazy heap scanning, only the leader process
+		 * vacuums the FSM.  However, we cannot vacuum the FSM for blocks
+		 * up to 'blkno' because there may be unscanned blocks, or blocks
+		 * being processed by workers, before this point.  Instead, parallel
+		 * workers advertise the block numbers they have just processed,
+		 * and the leader vacuums the FSM up to the smallest block number
+		 * among them.  This approach ensures we vacuum the FSM only for
+		 * consecutively processed blocks.
 		 */
 		if (got_cleanup_lock && vacrel->nindexes == 0 && has_lpdead_items &&
-			blkno - next_fsm_block_to_vacuum >= VACUUM_FSM_EVERY_PAGES)
+			blkno - vacrel->next_fsm_block_to_vacuum >= VACUUM_FSM_EVERY_PAGES)
 		{
-			FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum,
+			if (IsParallelWorker())
+			{
+				pg_atomic_write_u32(&(vacrel->plvstate->scanworker->last_blkno),
 									blkno);
-			next_fsm_block_to_vacuum = blkno;
+			}
+			else
+			{
+				BlockNumber fsmvac_upto = blkno;
+
+				if (ParallelHeapVacuumIsActive(vacrel))
+					fsmvac_upto = parallel_lazy_scan_compute_min_scan_block(vacrel);
+
+				FreeSpaceMapVacuumRange(vacrel->rel, vacrel->next_fsm_block_to_vacuum,
										fsmvac_upto);
+			}
+
+			vacrel->next_fsm_block_to_vacuum = blkno;
 		}
 	}
 	else
@@ -1503,50 +1755,7 @@ lazy_scan_heap(LVRelState *vacrel)
 	if (BufferIsValid(vmbuffer))
 		ReleaseBuffer(vmbuffer);
 
-	/*
-	 * Report that everything is now scanned.  We never skip scanning the last
-	 * block in the relation, so we can pass rel_pages here.
-	 */
-	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED,
-								 rel_pages);
-
-	/* now we can compute the new value for pg_class.reltuples */
-	vacrel->new_live_tuples = vac_estimate_reltuples(vacrel->rel, rel_pages,
-													 vacrel->scan_data->scanned_pages,
-													 vacrel->scan_data->live_tuples);
-
-	/*
-	 * Also compute the total number of surviving heap entries.  In the
-	 * (unlikely) scenario that new_live_tuples is -1, take it as zero.
-	 */
-	vacrel->new_rel_tuples =
-		Max(vacrel->new_live_tuples, 0) + vacrel->scan_data->recently_dead_tuples +
-		vacrel->scan_data->missed_dead_tuples;
-
 	read_stream_end(stream);
-
-	/*
-	 * Do index vacuuming (call each index's ambulkdelete routine), then do
-	 * related heap vacuuming
-	 */
-	if (vacrel->dead_items_info->num_items > 0)
-		lazy_vacuum(vacrel);
-
-	/*
-	 * Vacuum the remainder of the Free Space Map.  We must do this whether or
-	 * not there were indexes, and whether or not we bypassed index vacuuming.
-	 * We can pass rel_pages here because we never skip scanning the last
-	 * block of the relation.
-	 */
-	if (rel_pages > next_fsm_block_to_vacuum)
-		FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum, rel_pages);
-
-	/* report all blocks vacuumed */
-	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, rel_pages);
-
-	/* Do final index cleanup (call each index's amvacuumcleanup routine) */
-	if (vacrel->nindexes > 0 && vacrel->do_index_cleanup)
-		lazy_cleanup_all_indexes(vacrel);
 }
 
 /*
@@ -1560,7 +1769,8 @@ lazy_scan_heap(LVRelState *vacrel)
 * heap_vac_scan_next_block() uses the visibility map, vacuum options, and
 * various thresholds to skip blocks which do not need to be processed and
 * returns the next block to process or InvalidBlockNumber if there are no
- * remaining blocks.
+ * remaining blocks, or if the space of dead_items TIDs has reached the limit
+ * (the latter only in parallel lazy vacuum cases).
 *
 * The visibility status of the next block to process and whether or not it
 * was eager scanned is set in the per_buffer_data.
@@ -1582,11 +1792,37 @@ heap_vac_scan_next_block(ReadStream *stream,
 	LVRelState *vacrel = callback_private_data;
 	uint8		blk_info = 0;
 
-	/* relies on InvalidBlockNumber + 1 overflowing to 0 on first call */
-	next_block = vacrel->current_block + 1;
+retry:
+	next_block = InvalidBlockNumber;
+
+	/* Get the next block to process */
+	if (ParallelHeapVacuumIsActive(vacrel))
+	{
+		/*
+		 * Stop returning the next block to the read stream if we are close
+		 * to overrunning the available space for dead_items TIDs, so that
+		 * the read stream keeps returning the already-pinned buffers in its
+		 * queue until the stream is exhausted.  See the comments atop this
+		 * file for details.
+		 */
+		if (!dead_items_check_memory_limit(vacrel))
+		{
+			/*
+			 * table_block_parallelscan_nextpage() returns InvalidBlockNumber
+			 * if there are no remaining blocks.
+			 */
+			next_block = table_block_parallelscan_nextpage(vacrel->rel,
														   vacrel->plvstate->pbscanwork,
														   vacrel->plvstate->pbscan);
+		}
+	}
+	else
+	{
+		/* relies on InvalidBlockNumber + 1 overflowing to 0 on first call */
+		next_block = vacrel->current_block + 1;
+	}
 
 	/* Have we reached the end of the relation? */
-	if (next_block >= vacrel->scan_data->rel_pages)
+	if (!BlockNumberIsValid(next_block) || next_block >= vacrel->scan_data->rel_pages)
 	{
 		if (BufferIsValid(vacrel->next_unskippable_vmbuffer))
 		{
@@ -1608,8 +1844,42 @@ heap_vac_scan_next_block(ReadStream *stream,
 		 * visibility map.
 		 */
 		bool		skipsallvis;
+		bool		found;
+		BlockNumber end_block;
+		BlockNumber nblocks_skip;
+
+		if (ParallelHeapVacuumIsActive(vacrel))
+		{
+			/* We look for the next unskippable block within the chunk */
+			end_block = next_block +
+				vacrel->plvstate->pbscanwork->phsw_chunk_remaining + 1;
+		}
+		else
+			end_block = vacrel->scan_data->rel_pages;
+
+		found = find_next_unskippable_block(vacrel, &skipsallvis, next_block, end_block);
+
+		/*
+		 * In non-parallel cases we must have found the next unskippable
+		 * block within the specified range, because end_block is always the
+		 * last block + 1 and we always scan the last block.
+		 */
+		Assert(found || ParallelHeapVacuumIsActive(vacrel));
 
-		find_next_unskippable_block(vacrel, &skipsallvis);
+		if (!found)
+		{
+			if (skipsallvis)
+				vacrel->scan_data->skippedallvis = true;
+
+			/*
+			 * Skip all remaining blocks in the current chunk, and retry with
+			 * the next chunk.
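			 *
			 * For example, if this worker's current chunk spans blocks
			 * 64-127 and they all turn out to be skippable, zeroing
			 * phsw_chunk_remaining makes the next
			 * table_block_parallelscan_nextpage() call allocate a fresh
			 * chunk.  (The block numbers are illustrative; chunk sizes are
			 * chosen by the parallel table scan code.)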
+			 */
+			vacrel->plvstate->pbscanwork->phsw_chunk_remaining = 0;
+			goto retry;
+		}
+
+		Assert(vacrel->next_unskippable_block < end_block);
 
 		/*
 		 * We now know the next block that we must process.  It can be the
@@ -1626,11 +1896,20 @@ heap_vac_scan_next_block(ReadStream *stream,
 		 * pages then skipping makes updating relfrozenxid unsafe, which is a
 		 * real downside.
 		 */
-		if (vacrel->next_unskippable_block - next_block >= SKIP_PAGES_THRESHOLD)
+		nblocks_skip = vacrel->next_unskippable_block - next_block;
+		if (nblocks_skip >= SKIP_PAGES_THRESHOLD)
 		{
-			next_block = vacrel->next_unskippable_block;
 			if (skipsallvis)
 				vacrel->scan_data->skippedallvis = true;
+
+			/* Tell the parallel scan to skip those blocks */
+			if (ParallelHeapVacuumIsActive(vacrel))
+			{
+				vacrel->plvstate->pbscanwork->phsw_chunk_remaining -= nblocks_skip;
+				Assert(vacrel->plvstate->pbscanwork->phsw_chunk_remaining > 0);
+			}
+
+			next_block = vacrel->next_unskippable_block;
 		}
 	}
 
@@ -1666,9 +1945,11 @@ heap_vac_scan_next_block(ReadStream *stream,
 }
 
 /*
- * Find the next unskippable block in a vacuum scan using the visibility map.
- * The next unskippable block and its visibility information is updated in
- * vacrel.
+ * Find the next unskippable block in a vacuum scan using the visibility map,
+ * within the range of 'start' (inclusive) and 'end' (exclusive).
+ *
+ * If found, the next unskippable block and its visibility information are
+ * updated in vacrel and true is returned.  Otherwise, return false and reset
+ * the related information in vacrel.
 *
 * Note: our opinion of which blocks can be skipped can go stale immediately.
 * It's okay if caller "misses" a page whose all-visible or all-frozen marking
@@ -1678,22 +1959,32 @@ heap_vac_scan_next_block(ReadStream *stream,
 * older XIDs/MXIDs.  The *skippedallvis flag will be set here when the choice
 * to skip such a range is actually made, making everything safe.)
 */
-static void
-find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis)
+static bool
+find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis,
+							BlockNumber start, BlockNumber end)
 {
 	BlockNumber rel_pages = vacrel->scan_data->rel_pages;
-	BlockNumber next_unskippable_block = vacrel->next_unskippable_block + 1;
+	BlockNumber next_unskippable_block = start;
 	Buffer		next_unskippable_vmbuffer = vacrel->next_unskippable_vmbuffer;
 	bool		next_unskippable_eager_scanned = false;
 	bool		next_unskippable_allvis;
+	bool		found = true;
 
 	*skipsallvis = false;
 
 	for (;; next_unskippable_block++)
 	{
-		uint8		mapbits = visibilitymap_get_status(vacrel->rel,
-													   next_unskippable_block,
-													   &next_unskippable_vmbuffer);
+		uint8		mapbits;
+
+		/* Reached the end of the range? */
+		if (next_unskippable_block >= end)
+		{
+			found = false;
+			break;
+		}
+
+		mapbits = visibilitymap_get_status(vacrel->rel, next_unskippable_block,
										   &next_unskippable_vmbuffer);
 
 		next_unskippable_allvis = (mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0;
@@ -1769,11 +2060,274 @@ find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis)
 			*skipsallvis = true;
 	}
 
-	/* write the local variables back to vacrel */
-	vacrel->next_unskippable_block = next_unskippable_block;
-	vacrel->next_unskippable_allvis = next_unskippable_allvis;
-	vacrel->next_unskippable_eager_scanned = next_unskippable_eager_scanned;
-	vacrel->next_unskippable_vmbuffer = next_unskippable_vmbuffer;
+	if (found)
+	{
+		/* write the local variables back to vacrel */
+		vacrel->next_unskippable_block = next_unskippable_block;
+		vacrel->next_unskippable_allvis = next_unskippable_allvis;
+		vacrel->next_unskippable_eager_scanned = next_unskippable_eager_scanned;
+		vacrel->next_unskippable_vmbuffer = next_unskippable_vmbuffer;
+	}
+	else
+	{
+		if (BufferIsValid(next_unskippable_vmbuffer))
+			ReleaseBuffer(next_unskippable_vmbuffer);
+
+		/*
+		 * There is no unskippable block in the specified range.  Reset the
+		 * related fields in vacrel.
+		 */
+		vacrel->next_unskippable_block = InvalidBlockNumber;
+		vacrel->next_unskippable_allvis = false;
+		vacrel->next_unskippable_eager_scanned = false;
+		vacrel->next_unskippable_vmbuffer = InvalidBuffer;
+	}
+
+	return found;
+}
+
+/*
+ * A parallel variant of do_lazy_scan_heap().  The leader process launches
+ * parallel workers to scan the heap in parallel.
+ */
+static void
+do_parallel_lazy_scan_heap(LVRelState *vacrel)
+{
+	ParallelBlockTableScanWorkerData pbscanworkdata;
+
+	Assert(ParallelHeapVacuumIsActive(vacrel));
+	Assert(!IsParallelWorker());
+
+	/*
+	 * Set up the parallel scan description so that the leader can join the
+	 * scan as a worker.
+	 */
+	table_block_parallelscan_startblock_init(vacrel->rel,
+											 &pbscanworkdata,
+											 vacrel->plvstate->pbscan);
+	vacrel->plvstate->pbscanwork = &pbscanworkdata;
+
+	for (;;)
+	{
+		BlockNumber fsmvac_upto;
+
+		/* Launch parallel workers */
+		parallel_lazy_scan_heap_begin(vacrel);
+
+		/*
+		 * Do the lazy heap scan until the read stream is exhausted.  We stop
+		 * retrieving new blocks for the read stream once the space of
+		 * dead_items TIDs exceeds the limit.
+		 */
+		do_lazy_scan_heap(vacrel, false);
+
+		/* Wait for parallel workers to finish and gather scan results */
+		parallel_lazy_scan_heap_end(vacrel);
+
+		if (!dead_items_check_memory_limit(vacrel))
+			break;
+
+		/* Perform a round of index and heap vacuuming */
+		vacrel->consider_bypass_optimization = false;
+		lazy_vacuum(vacrel);
+
+		/* Compute the smallest processed block number */
+		fsmvac_upto = parallel_lazy_scan_compute_min_scan_block(vacrel);
+
+		/*
+		 * Vacuum the Free Space Map to make newly-freed space visible on
+		 * upper-level FSM pages.
+		 */
+		if (fsmvac_upto > vacrel->next_fsm_block_to_vacuum)
+		{
+			FreeSpaceMapVacuumRange(vacrel->rel, vacrel->next_fsm_block_to_vacuum,
									fsmvac_upto);
+			vacrel->next_fsm_block_to_vacuum = fsmvac_upto;
+		}
+
+		/* Report that we are once again scanning the heap */
+		pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
									 PROGRESS_VACUUM_PHASE_SCAN_HEAP);
+	}
+
+	/*
+	 * The parallel heap scan finished, but it's possible that some workers
+	 * have allocated blocks but not processed them yet.  This can happen,
+	 * for example, when workers exit because the space of dead_items TIDs is
+	 * full and the leader process launches fewer workers in the next cycle.
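+	 *
+	 * For instance, if four workers each hold a partially-consumed chunk
+	 * when the scan pauses for index vacuuming, and only two workers are
+	 * relaunched afterwards, two chunks would be left with unprocessed
+	 * blocks; the leader finishes them off below.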
+ */ + complete_unfinished_lazy_scan_heap(vacrel); +} + +/* + * Return the smallest block number that the leader and workers have scanned. + */ +static BlockNumber +parallel_lazy_scan_compute_min_scan_block(LVRelState *vacrel) +{ + BlockNumber min_blk; + + Assert(ParallelHeapVacuumIsActive(vacrel)); + + /* Initialized with the leader's value */ + min_blk = vacrel->last_blkno; + + for (int i = 0; i < vacrel->leader->nworkers_launched; i++) + { + ParallelLVScanWorker *scanworker = &(vacrel->leader->scanworkers[i]); + BlockNumber blkno; + + /* Skip if no worker has been initialized the scan state */ + if (!scanworker->scan_inited) + continue; + + blkno = pg_atomic_read_u32(&(scanworker->last_blkno)); + + if (!BlockNumberIsValid(min_blk) || min_blk > blkno) + min_blk = blkno; + } + + Assert(BlockNumberIsValid(min_blk)); + + return min_blk; +} + +/* + * Complete parallel heaps scans that have remaining blocks in their + * chunks. + */ +static void +complete_unfinished_lazy_scan_heap(LVRelState *vacrel) +{ + int nworkers; + + Assert(!IsParallelWorker()); + + nworkers = parallel_vacuum_get_nworkers_table(vacrel->pvs); + + for (int i = 0; i < nworkers; i++) + { + ParallelLVScanWorker *scanworker = &(vacrel->leader->scanworkers[i]); + + if (!scanworker->scan_inited) + continue; + + if (scanworker->pbscanworkdata.phsw_chunk_remaining == 0) + continue; + + /* Attach the worker's scan state */ + vacrel->plvstate->pbscanwork = &(scanworker->pbscanworkdata); + + /* + * Complete the unfinished scan. Note that we might perform multiple + * cycles of index and heap vacuuming while completing the scans. + */ + vacrel->next_fsm_block_to_vacuum = pg_atomic_read_u32(&(scanworker->last_blkno)); + do_lazy_scan_heap(vacrel, true); + } + + /* + * We don't need to gather the scan results here because the leader's scan + * state got updated directly. + */ +} + +/* + * Helper routine to launch parallel workers for parallel lazy heap scan. + */ +static void +parallel_lazy_scan_heap_begin(LVRelState *vacrel) +{ + Assert(ParallelHeapVacuumIsActive(vacrel)); + Assert(!IsParallelWorker()); + + /* launcher workers */ + vacrel->leader->nworkers_launched = parallel_vacuum_collect_dead_items_begin(vacrel->pvs); + + ereport(vacrel->verbose ? INFO : DEBUG2, + (errmsg(ngettext("launched %d parallel vacuum worker for collecting dead tuples (planned: %d)", + "launched %d parallel vacuum workers for collecting dead tuples (planned: %d)", + vacrel->leader->nworkers_launched), + vacrel->leader->nworkers_launched, + parallel_vacuum_get_nworkers_table(vacrel->pvs)))); +} + +/* + * Helper routine to finish the parallel lazy heap scan. + */ +static void +parallel_lazy_scan_heap_end(LVRelState *vacrel) +{ + /* Wait for all parallel workers to finish */ + parallel_vacuum_scan_end(vacrel->pvs); + + /* Gather the workers' scan results */ + parallel_lazy_scan_gather_scan_results(vacrel); +} + +/* + * Accumulate each worker's scan results into the leader's. 
+ */
+static void
+parallel_lazy_scan_gather_scan_results(LVRelState *vacrel)
+{
+	Assert(ParallelHeapVacuumIsActive(vacrel));
+	Assert(!IsParallelWorker());
+
+	/* Gather the workers' scan results */
+	for (int i = 0; i < vacrel->leader->nworkers_launched; i++)
+	{
+		LVScanData *data = &(vacrel->leader->scanworkers[i].scandata);
+
+		/* Accumulate the counters collected by workers */
+#define ACCUM_COUNT(item) vacrel->scan_data->item += data->item
+		ACCUM_COUNT(scanned_pages);
+		ACCUM_COUNT(removed_pages);
+		ACCUM_COUNT(new_frozen_tuple_pages);
+		ACCUM_COUNT(vm_new_visible_pages);
+		ACCUM_COUNT(vm_new_visible_frozen_pages);
+		ACCUM_COUNT(vm_new_frozen_pages);
+		ACCUM_COUNT(lpdead_item_pages);
+		ACCUM_COUNT(missed_dead_pages);
+		ACCUM_COUNT(tuples_deleted);
+		ACCUM_COUNT(tuples_frozen);
+		ACCUM_COUNT(lpdead_items);
+		ACCUM_COUNT(live_tuples);
+		ACCUM_COUNT(recently_dead_tuples);
+		ACCUM_COUNT(missed_dead_tuples);
+#undef ACCUM_COUNT
+
+		/*
+		 * Track the greatest non-empty page among the values the workers
+		 * collected, as it's used as the cut-off point of heap truncation.
+		 */
+		if (vacrel->scan_data->nonempty_pages < data->nonempty_pages)
+			vacrel->scan_data->nonempty_pages = data->nonempty_pages;
+
+		/*
+		 * All workers must have initialized both values with the values
+		 * passed by the leader.
+		 */
+		Assert(TransactionIdIsValid(data->NewRelfrozenXid));
+		Assert(MultiXactIdIsValid(data->NewRelminMxid));
+
+		/*
+		 * During parallel lazy scanning, since different workers process
+		 * separate blocks, they may observe different existing XIDs and
+		 * MXIDs.  Therefore, we compute the oldest XID and MXID from the
+		 * values observed by each worker (including the leader).  These
+		 * computations are crucial for correctly advancing both relfrozenxid
+		 * and relminmxid values.
+		 */
+		if (TransactionIdPrecedes(data->NewRelfrozenXid, vacrel->scan_data->NewRelfrozenXid))
+			vacrel->scan_data->NewRelfrozenXid = data->NewRelfrozenXid;
+
+		if (MultiXactIdPrecedesOrEquals(data->NewRelminMxid, vacrel->scan_data->NewRelminMxid))
+			vacrel->scan_data->NewRelminMxid = data->NewRelminMxid;
+
+		/* Did any worker skip an all-visible page? */
+		vacrel->scan_data->skippedallvis |= data->skippedallvis;
+	}
+}
 
 /*
@@ -2062,7 +2616,8 @@ lazy_scan_prune(LVRelState *vacrel,
 
 	/* Can't truncate this page */
 	if (presult.hastup)
-		vacrel->scan_data->nonempty_pages = blkno + 1;
+		vacrel->scan_data->nonempty_pages =
+			Max(blkno + 1, vacrel->scan_data->nonempty_pages);
 
 	/* Did we find LP_DEAD items? */
 	*has_lpdead_items = (presult.lpdead_items > 0);
@@ -2435,7 +2990,8 @@ lazy_scan_noprune(LVRelState *vacrel,
 
 	/* Can't truncate this page */
 	if (hastup)
-		vacrel->scan_data->nonempty_pages = blkno + 1;
+		vacrel->scan_data->nonempty_pages =
+			Max(blkno + 1, vacrel->scan_data->nonempty_pages);
 
 	/* Did we find LP_DEAD items? */
 	*has_lpdead_items = (lpdead_items > 0);
@@ -3489,12 +4045,8 @@ dead_items_alloc(LVRelState *vacrel, int nworkers)
 		autovacuum_work_mem != -1 ?
 		autovacuum_work_mem : maintenance_work_mem;
 
-	/*
-	 * Initialize state for a parallel vacuum.  As of now, only one worker can
-	 * be used for an index, so we invoke parallelism only if there are at
-	 * least two indexes on a table.
-	 */
-	if (nworkers >= 0 && vacrel->nindexes > 1 && vacrel->do_index_vacuuming)
+	/* Initialize state for a parallel vacuum */
+	if (nworkers >= 0)
 	{
 		/*
 		 * Since parallel workers cannot access data in temporary tables, we
@@ -3512,11 +4064,17 @@ dead_items_alloc(LVRelState *vacrel, int nworkers)
 							vacrel->relname)));
 		}
 		else
+		{
+			/*
+			 * We initialize the parallel vacuum state for the lazy heap
+			 * scan, index vacuuming, or both.
+			 */
 			vacrel->pvs = parallel_vacuum_init(vacrel->rel, vacrel->indrels,
											   vacrel->nindexes, nworkers,
											   vac_work_mem,
											   vacrel->verbose ? INFO : DEBUG2,
											   vacrel->bstrategy, (void *) vacrel);
+		}
 
 	/*
 	 * If parallel mode started, dead_items and dead_items_info spaces are
@@ -3556,15 +4114,35 @@ dead_items_add(LVRelState *vacrel, BlockNumber blkno, OffsetNumber *offsets,
 	};
 	int64		prog_val[2];
 
+	if (ParallelHeapVacuumIsActive(vacrel))
+		TidStoreLockExclusive(vacrel->dead_items);
+
 	TidStoreSetBlockOffsets(vacrel->dead_items, blkno, offsets, num_offsets);
 	vacrel->dead_items_info->num_items += num_offsets;
 
+	if (ParallelHeapVacuumIsActive(vacrel))
+		TidStoreUnlock(vacrel->dead_items);
+
 	/* update the progress information */
 	prog_val[0] = vacrel->dead_items_info->num_items;
 	prog_val[1] = TidStoreMemoryUsage(vacrel->dead_items);
 	pgstat_progress_update_multi_param(2, prog_index, prog_val);
 }
 
+/*
+ * Check the memory usage of the collected dead items and return true
+ * if we are close to overrunning the available space for dead_items TIDs.
+ * However, we force at least one page-worth of tuples to be stored, to
+ * ensure we do at least some work when the configured memory is so low
+ * that we would run out before storing anything.
+ */
+static bool
+dead_items_check_memory_limit(LVRelState *vacrel)
+{
+	return vacrel->dead_items_info->num_items > 0 &&
+		TidStoreMemoryUsage(vacrel->dead_items) > vacrel->dead_items_info->max_bytes;
+}
+
 /*
 * Forget all collected dead items.
 */
@@ -3760,14 +4338,224 @@ update_relstats_all_indexes(LVRelState *vacrel)
 
 /*
 * Compute the number of workers for parallel heap vacuum.
- *
- * Return 0 to disable parallel vacuum.
 */
int
heap_parallel_vacuum_compute_workers(Relation rel, int nworkers_requested,
									 void *state)
{
-	return 0;
+	int			parallel_workers = 0;
+
+	if (nworkers_requested == 0)
+	{
+		LVRelState *vacrel = (LVRelState *) state;
+		int			heap_parallel_threshold;
+		int			heap_pages;
+		BlockNumber allvisible;
+		BlockNumber allfrozen;
+
+		/*
+		 * Estimate the number of blocks that we're going to scan during
+		 * lazy_scan_heap().
+		 */
+		visibilitymap_count(rel, &allvisible, &allfrozen);
+		heap_pages = RelationGetNumberOfBlocks(rel) -
+			(vacrel->aggressive ? allfrozen : allvisible);
+
+		Assert(heap_pages >= 0);
+
+		/*
+		 * Select the number of workers based on the log of the number of
+		 * pages to scan.  For example, with the default
+		 * min_parallel_table_scan_size of 1024 blocks (8MB), scanning 3,072
+		 * blocks gets one worker, 9,216 blocks two workers, 27,648 three,
+		 * and so on.  Note that the upper limit of the
+		 * min_parallel_table_scan_size GUC is chosen to prevent overflow
+		 * here.
+		 */
+		heap_parallel_threshold = Max(min_parallel_table_scan_size, 1);
+		while (heap_pages >= (BlockNumber) (heap_parallel_threshold * 3))
+		{
+			parallel_workers++;
+			heap_parallel_threshold *= 3;
+			if (heap_parallel_threshold > INT_MAX / 3)
+				break;
+		}
+	}
+	else
+		parallel_workers = nworkers_requested;
+
+	return parallel_workers;
+}
+
+/*
+ * Estimate shared memory size required for parallel heap vacuum.
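+ *
+ * Three DSM chunks are reserved here and filled in later by
+ * heap_parallel_vacuum_initialize(): the static ParallelLVShared data, the
+ * shared ParallelBlockTableScanDesc, and one ParallelLVScanWorker entry per
+ * worker.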
+ */ +void +heap_parallel_vacuum_estimate(Relation rel, ParallelContext *pcxt, int nworkers, + void *state) +{ + LVRelState *vacrel = (LVRelState *) state; + Size size = 0; + + vacrel->leader = palloc(sizeof(ParallelLVLeader)); + + /* Estimate space for ParallelLVShared */ + size = add_size(size, sizeof(ParallelLVShared)); + vacrel->leader->shared_len = size; + shm_toc_estimate_chunk(&pcxt->estimator, vacrel->leader->shared_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Estimate space for ParallelBlockTableScanDesc */ + vacrel->leader->pbscan_len = table_block_parallelscan_estimate(rel); + shm_toc_estimate_chunk(&pcxt->estimator, vacrel->leader->pbscan_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Estimate space for an array of ParallelLVScanWorker */ + vacrel->leader->scanworker_len = mul_size(sizeof(ParallelLVScanWorker), nworkers); + shm_toc_estimate_chunk(&pcxt->estimator, vacrel->leader->scanworker_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + +/* + * Set up shared memory for parallel heap vacuum. + */ +void +heap_parallel_vacuum_initialize(Relation rel, ParallelContext *pcxt, int nworkers, + void *state) +{ + LVRelState *vacrel = (LVRelState *) state; + ParallelLVShared *shared; + ParallelBlockTableScanDesc pbscan; + ParallelLVScanWorker *scanworkers; + + vacrel->plvstate = palloc0(sizeof(ParallelLVState)); + + /* Initialize ParallelLVShared */ + shared = shm_toc_allocate(pcxt->toc, vacrel->leader->shared_len); + MemSet(shared, 0, vacrel->leader->shared_len); + shared->aggressive = vacrel->aggressive; + shared->skipwithvm = vacrel->skipwithvm; + shared->cutoffs = vacrel->cutoffs; + shared->NewRelfrozenXid = vacrel->scan_data->NewRelfrozenXid; + shared->NewRelminMxid = vacrel->scan_data->NewRelminMxid; + shm_toc_insert(pcxt->toc, PARALLEL_LV_KEY_SHARED, shared); + vacrel->plvstate->shared = shared; + + /* Initialize ParallelBlockTableScanDesc */ + pbscan = shm_toc_allocate(pcxt->toc, vacrel->leader->pbscan_len); + table_block_parallelscan_initialize(rel, (ParallelTableScanDesc) pbscan); + pbscan->base.phs_syncscan = false; /* always start from the first block */ + shm_toc_insert(pcxt->toc, PARALLEL_LV_KEY_SCANDESC, pbscan); + vacrel->plvstate->pbscan = pbscan; + + /* Initialize the array of ParallelLVScanWorker */ + scanworkers = shm_toc_allocate(pcxt->toc, vacrel->leader->scanworker_len); + MemSet(scanworkers, 0, vacrel->leader->scanworker_len); + shm_toc_insert(pcxt->toc, PARALLEL_LV_KEY_SCANWORKER, scanworkers); + vacrel->leader->scanworkers = scanworkers; +} + +/* + * Initialize lazy vacuum state with the information retrieved from + * shared memory. 
+ */ +void +heap_parallel_vacuum_initialize_worker(Relation rel, ParallelVacuumState *pvs, + ParallelWorkerContext *pwcxt, + void **state_out) +{ + LVRelState *vacrel; + ParallelLVState *plvstate; + ParallelLVShared *shared; + ParallelLVScanWorker *scanworker; + ParallelBlockTableScanDesc pbscan; + + /* Initialize ParallelLVState and prepare the related objects */ + + plvstate = palloc0(sizeof(ParallelLVState)); + + /* Prepare ParallelLVShared */ + shared = (ParallelLVShared *) shm_toc_lookup(pwcxt->toc, PARALLEL_LV_KEY_SHARED, false); + plvstate->shared = shared; + + /* Prepare ParallelBlockTableScanWorkerData */ + pbscan = shm_toc_lookup(pwcxt->toc, PARALLEL_LV_KEY_SCANDESC, false); + plvstate->pbscan = pbscan; + + /* Prepare ParallelLVScanWorker */ + scanworker = shm_toc_lookup(pwcxt->toc, PARALLEL_LV_KEY_SCANWORKER, false); + plvstate->scanworker = &(scanworker[ParallelWorkerNumber]); + plvstate->pbscanwork = &(plvstate->scanworker->pbscanworkdata); + + /* Initialize LVRelState and prepare fields required by lazy scan heap */ + vacrel = palloc0(sizeof(LVRelState)); + vacrel->rel = rel; + vacrel->indrels = parallel_vacuum_get_table_indexes(pvs, + &vacrel->nindexes); + vacrel->bstrategy = parallel_vacuum_get_bstrategy(pvs); + vacrel->pvs = pvs; + vacrel->aggressive = shared->aggressive; + vacrel->skipwithvm = shared->skipwithvm; + vacrel->vistest = GlobalVisTestFor(rel); + vacrel->cutoffs = shared->cutoffs; + vacrel->dead_items = parallel_vacuum_get_dead_items(pvs, + &vacrel->dead_items_info); + vacrel->plvstate = plvstate; + vacrel->scan_data = &(plvstate->scanworker->scandata); + MemSet(vacrel->scan_data, 0, sizeof(LVScanData)); + vacrel->scan_data->NewRelfrozenXid = shared->NewRelfrozenXid; + vacrel->scan_data->NewRelminMxid = shared->NewRelminMxid; + vacrel->scan_data->skippedallvis = false; + vacrel->scan_data->rel_pages = RelationGetNumberOfBlocks(rel); + + /* + * Initialize the scan state if not yet. The chunk of blocks will be + * allocated when to get the scan block for the first time. + */ + if (!vacrel->plvstate->scanworker->scan_inited) + { + vacrel->plvstate->scanworker->scan_inited = true; + table_block_parallelscan_startblock_init(rel, + vacrel->plvstate->pbscanwork, + vacrel->plvstate->pbscan); + pg_atomic_init_u32(&(vacrel->plvstate->scanworker->last_blkno), + InvalidBlockNumber); + } + + *state_out = (void *) vacrel; +} + +/* + * Parallel heap vacuum callback for collecting dead items (i.e., lazy heap scan). 
+ */ +void +heap_parallel_vacuum_collect_dead_items(Relation rel, ParallelVacuumState *pvs, + void *state) +{ + LVRelState *vacrel = (LVRelState *) state; + ErrorContextCallback errcallback; + + Assert(ParallelHeapVacuumIsActive(vacrel)); + + /* + * Setup error traceback support for ereport() for parallel table vacuum + * workers + */ + vacrel->dbname = get_database_name(MyDatabaseId); + vacrel->relnamespace = get_database_name(RelationGetNamespace(rel)); + vacrel->relname = pstrdup(RelationGetRelationName(rel)); + vacrel->indname = NULL; + vacrel->phase = VACUUM_ERRCB_PHASE_SCAN_HEAP; + errcallback.callback = vacuum_error_callback; + errcallback.arg = &vacrel; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* Join the parallel heap vacuum */ + do_lazy_scan_heap(vacrel, false); + + /* Advertise the last processed block number */ + pg_atomic_write_u32(&(vacrel->plvstate->scanworker->last_blkno), vacrel->last_blkno); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; } /* diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c index bb0d690aed8..e6742b29b7b 100644 --- a/src/backend/commands/vacuumparallel.c +++ b/src/backend/commands/vacuumparallel.c @@ -501,6 +501,35 @@ parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats) pfree(pvs); } +/* + * Return the number of parallel workers initialized for parallel table vacuum. + */ +int +parallel_vacuum_get_nworkers_table(ParallelVacuumState *pvs) +{ + return pvs->nworkers_for_table; +} + +/* + * Return the array of indexes associated to the given table to be vacuumed. + */ +Relation * +parallel_vacuum_get_table_indexes(ParallelVacuumState *pvs, int *nindexes) +{ + *nindexes = pvs->nindexes; + + return pvs->indrels; +} + +/* + * Return the buffer strategy for parallel vacuum. + */ +BufferAccessStrategy +parallel_vacuum_get_bstrategy(ParallelVacuumState *pvs) +{ + return pvs->bstrategy; +} + /* * Returns the dead items space and dead items information. 
*/ diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 6a1ca5d5ca7..b94d783c31e 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -15,6 +15,7 @@ #define HEAPAM_H #include "access/heapam_xlog.h" +#include "access/parallel.h" #include "access/relation.h" /* for backward compatibility */ #include "access/relscan.h" #include "access/sdir.h" @@ -407,10 +408,20 @@ extern void log_heap_prune_and_freeze(Relation relation, Buffer buffer, /* in heap/vacuumlazy.c */ struct VacuumParams; +struct ParallelVacuumState; extern void heap_vacuum_rel(Relation rel, struct VacuumParams *params, BufferAccessStrategy bstrategy); extern int heap_parallel_vacuum_compute_workers(Relation rel, int nworkers_requested, void *state); +extern void heap_parallel_vacuum_estimate(Relation rel, ParallelContext *pcxt, int nworkers, + void *state); +extern void heap_parallel_vacuum_initialize(Relation rel, ParallelContext *pcxt, + int nworkers, void *state); +extern void heap_parallel_vacuum_initialize_worker(Relation rel, struct ParallelVacuumState *pvs, + ParallelWorkerContext *pwcxt, + void **state_out); +extern void heap_parallel_vacuum_collect_dead_items(Relation rel, struct ParallelVacuumState *pvs, + void *state); /* in heap/heapam_visibility.c */ extern bool HeapTupleSatisfiesVisibility(HeapTuple htup, Snapshot snapshot, diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h index e8f75fc67b1..b0b9547b4b8 100644 --- a/src/include/commands/vacuum.h +++ b/src/include/commands/vacuum.h @@ -385,6 +385,9 @@ extern ParallelVacuumState *parallel_vacuum_init(Relation rel, Relation *indrels BufferAccessStrategy bstrategy, void *state); extern void parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats); +extern int parallel_vacuum_get_nworkers_table(ParallelVacuumState *pvs); +extern Relation *parallel_vacuum_get_table_indexes(ParallelVacuumState *pvs, int *nindexes); +extern BufferAccessStrategy parallel_vacuum_get_bstrategy(ParallelVacuumState *pvs); extern TidStore *parallel_vacuum_get_dead_items(ParallelVacuumState *pvs, VacDeadItemsInfo **dead_items_info_p); extern void parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs); diff --git a/src/test/regress/expected/vacuum.out b/src/test/regress/expected/vacuum.out index 0abcc99989e..f92c3f73c29 100644 --- a/src/test/regress/expected/vacuum.out +++ b/src/test/regress/expected/vacuum.out @@ -160,6 +160,11 @@ UPDATE pvactst SET i = i WHERE i < 1000; VACUUM (PARALLEL 2) pvactst; UPDATE pvactst SET i = i WHERE i < 1000; VACUUM (PARALLEL 0) pvactst; -- disable parallel vacuum +-- VACUUM invokes parallel heap vacuum. 
+SET min_parallel_table_scan_size to 0; +VACUUM (PARALLEL 2, FREEZE) pvactst2; +UPDATE pvactst2 SET i = i WHERE i < 1000; +VACUUM (PARALLEL 1) pvactst2; VACUUM (PARALLEL -1) pvactst; -- error ERROR: parallel workers for vacuum must be between 0 and 1024 LINE 1: VACUUM (PARALLEL -1) pvactst; @@ -185,6 +190,7 @@ VACUUM (PARALLEL 1, FULL FALSE) tmp; -- parallel vacuum disabled for temp tables WARNING: disabling parallel option of vacuum on "tmp" --- cannot vacuum temporary tables in parallel VACUUM (PARALLEL 0, FULL TRUE) tmp; -- can specify parallel disabled (even though that's implied by FULL) RESET min_parallel_index_scan_size; +RESET min_parallel_table_scan_size; DROP TABLE pvactst; DROP TABLE pvactst2; -- INDEX_CLEANUP option diff --git a/src/test/regress/sql/vacuum.sql b/src/test/regress/sql/vacuum.sql index a72bdb5b619..b8abab28ea9 100644 --- a/src/test/regress/sql/vacuum.sql +++ b/src/test/regress/sql/vacuum.sql @@ -129,6 +129,12 @@ VACUUM (PARALLEL 2) pvactst; UPDATE pvactst SET i = i WHERE i < 1000; VACUUM (PARALLEL 0) pvactst; -- disable parallel vacuum +-- VACUUM invokes parallel heap vacuum. +SET min_parallel_table_scan_size to 0; +VACUUM (PARALLEL 2, FREEZE) pvactst2; +UPDATE pvactst2 SET i = i WHERE i < 1000; +VACUUM (PARALLEL 1) pvactst2; + VACUUM (PARALLEL -1) pvactst; -- error VACUUM (PARALLEL 2, INDEX_CLEANUP FALSE) pvactst; VACUUM (PARALLEL 2, FULL TRUE) pvactst; -- error, cannot use both PARALLEL and FULL @@ -148,6 +154,7 @@ CREATE INDEX tmp_idx1 ON tmp (a); VACUUM (PARALLEL 1, FULL FALSE) tmp; -- parallel vacuum disabled for temp tables VACUUM (PARALLEL 0, FULL TRUE) tmp; -- can specify parallel disabled (even though that's implied by FULL) RESET min_parallel_index_scan_size; +RESET min_parallel_table_scan_size; DROP TABLE pvactst; DROP TABLE pvactst2; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 0ad0fd90a38..3cdf038ecd2 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1937,6 +1937,10 @@ PLpgSQL_type PLpgSQL_type_type PLpgSQL_var PLpgSQL_variable +ParallelLVLeader +ParallelLVScanWorker +ParallelLVShared +ParallelLVState PLwdatum PLword PLyArrayToOb -- 2.43.5