From 578b693020848e0dfca05b8dac9b7dec5934339b Mon Sep 17 00:00:00 2001
From: Masahiko Sawada
Date: Fri, 10 Aug 2018 14:38:21 +0900
Subject: [PATCH v8 3/3] Add parallel option to lazy vacuum.

---
 doc/src/sgml/config.sgml               |    9 +-
 doc/src/sgml/ref/create_table.sgml     |   11 +-
 doc/src/sgml/ref/vacuum.sgml           |   16 +
 src/backend/access/common/reloptions.c |   10 +
 src/backend/access/transam/parallel.c  |    7 +
 src/backend/catalog/system_views.sql   |   23 +-
 src/backend/commands/Makefile          |    2 +-
 src/backend/commands/vacuum.c          |   71 +-
 src/backend/commands/vacuumlazy.c      | 2087 ++++++++++++++++++++++++--------
 src/backend/commands/vacuumworker.c    |  327 +++++
 src/backend/nodes/equalfuncs.c         |    7 +-
 src/backend/optimizer/plan/planner.c   |  133 ++
 src/backend/parser/gram.y              |   90 +-
 src/backend/postmaster/autovacuum.c    |   38 +-
 src/backend/postmaster/pgstat.c        |   25 +-
 src/backend/tcop/utility.c             |    4 +-
 src/backend/utils/adt/pgstatfuncs.c    |    8 +-
 src/include/catalog/pg_proc.dat        |    6 +-
 src/include/commands/vacuum.h          |   11 +-
 src/include/commands/vacuum_internal.h |  191 +++
 src/include/nodes/parsenodes.h         |   18 +-
 src/include/optimizer/planner.h        |    1 +
 src/include/pgstat.h                   |    9 +-
 src/include/storage/lwlock.h           |    1 +
 src/include/utils/rel.h                |    1 +
 src/test/regress/expected/rules.out    |   20 +-
 src/test/regress/expected/vacuum.out   |    2 +
 src/test/regress/sql/vacuum.sql        |    3 +
 28 files changed, 2487 insertions(+), 644 deletions(-)
 create mode 100644 src/backend/commands/vacuumworker.c
 create mode 100644 src/include/commands/vacuum_internal.h

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 7554cba..da8c8d3 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2142,10 +2142,11 @@ include_dir 'conf.d'
         Sets the maximum number of parallel workers that can be
-        started by a single utility command.  Currently, the only
-        parallel utility command that supports the use of parallel
-        workers is CREATE INDEX, and only when
-        building a B-tree index.  Parallel workers are taken from the
+        started by a single utility command.  Currently, the parallel
+        utility commands that support the use of parallel workers are
+        CREATE INDEX (only when
+        building a B-tree index) and VACUUM without
+        FULL.  Parallel workers are taken from the
         pool of processes established by
         , limited by
         .  Note that the requested
diff --git a/doc/src/sgml/ref/create_table.sgml b/doc/src/sgml/ref/create_table.sgml
index 10428f8..d4d1106 100644
--- a/doc/src/sgml/ref/create_table.sgml
+++ b/doc/src/sgml/ref/create_table.sgml
@@ -1423,7 +1423,16 @@ WITH ( MODULUS numeric_literal, REM
-
+
+    autovacuum_vacuum_parallel_workers, toast.autovacuum_vacuum_parallel_workers (integer)
+
+
+     This sets the number of workers that can be used to vacuum this table.
+     If not set, autovacuum performs the vacuum with no workers (non-parallel).
+
+
+
+
     autovacuum_freeze_min_age, toast.autovacuum_freeze_min_age (integer)
diff --git a/doc/src/sgml/ref/vacuum.sgml b/doc/src/sgml/ref/vacuum.sgml
index fd911f5..a742107 100644
--- a/doc/src/sgml/ref/vacuum.sgml
+++ b/doc/src/sgml/ref/vacuum.sgml
@@ -30,6 +30,7 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ N
+
     DISABLE_PAGE_SKIPPING
     SKIP_LOCKED
@@ -143,6 +144,21 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ N

+
+
+     Execute VACUUM in parallel with N
+     background workers.  Garbage collection on the table is processed
+     in parallel at the block level.  For tables with indexes, parallel
+     vacuum assigns each index to a parallel vacuum worker, and all garbage
+     in an index is processed by that particular parallel vacuum worker.
+     The maximum number of parallel workers
+     is .  This option cannot
+     be used with the FULL option.
+
+
+
+
+
     DISABLE_PAGE_SKIPPING
diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c
index db84da0..45e2bca 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -348,6 +348,14 @@ static relopt_int intRelOpts[] =
 		}, -1, 0, 1024
 	},
+	{
+		{
+			"autovacuum_vacuum_parallel_workers",
+			"Number of parallel processes that can be used to vacuum this relation",
+			RELOPT_KIND_HEAP | RELOPT_KIND_TOAST,
+			ShareUpdateExclusiveLock
+		}, -1, 0, 1024
+	},

 	/* list terminator */
 	{{NULL}}
@@ -1377,6 +1385,8 @@ default_reloptions(Datum reloptions, bool validate, relopt_kind kind)
 		offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, vacuum_scale_factor)},
 		{"autovacuum_analyze_scale_factor", RELOPT_TYPE_REAL,
 		offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, analyze_scale_factor)},
+		{"autovacuum_vacuum_parallel_workers", RELOPT_TYPE_INT,
+		offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, vacuum_parallel_workers)},
 		{"user_catalog_table", RELOPT_TYPE_BOOL,
 		offsetof(StdRdOptions, user_catalog_table)},
 		{"parallel_workers", RELOPT_TYPE_INT,
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 8419719..dbb3e5d 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -23,6 +23,7 @@
 #include "catalog/index.h"
 #include "catalog/namespace.h"
 #include "commands/async.h"
+#include "commands/vacuum.h"
 #include "executor/execParallel.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
@@ -138,6 +139,9 @@ static const struct
 	},
 	{
 		"_bt_parallel_build_main", _bt_parallel_build_main
+	},
+	{
+		"lazy_parallel_vacuum_main", lazy_parallel_vacuum_main
 	}
 };
@@ -1283,6 +1287,9 @@ ParallelWorkerMain(Datum main_arg)
 	ParallelMasterBackendId = fps->parallel_master_backend_id;
 	on_shmem_exit(ParallelWorkerShutdown, (Datum) 0);

+	/* Report pid of master process for progress information */
+	pgstat_report_leader_pid(fps->parallel_master_pid);
+
 	/*
 	 * Now we can find and attach to the error queue provided for us.
That's * good, because until we do that, any errors that happen here will not be diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 53ddc59..a74b426 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -897,11 +897,24 @@ CREATE VIEW pg_stat_progress_vacuum AS WHEN 5 THEN 'truncating heap' WHEN 6 THEN 'performing final cleanup' END AS phase, - S.param2 AS heap_blks_total, S.param3 AS heap_blks_scanned, - S.param4 AS heap_blks_vacuumed, S.param5 AS index_vacuum_count, - S.param6 AS max_dead_tuples, S.param7 AS num_dead_tuples - FROM pg_stat_get_progress_info('VACUUM') AS S - LEFT JOIN pg_database D ON S.datid = D.oid; + S.param2 AS heap_blks_total, + W.heap_blks_scanned, + W.heap_blks_vacuumed, + W.index_vacuum_count, + S.param6 AS max_dead_tuples, + W.num_dead_tuples + FROM pg_stat_get_progress_info('VACUUM') AS S + LEFT JOIN pg_database D ON S.datid = D.oid + LEFT JOIN + (SELECT leader_pid, + max(param3) AS heap_blks_scanned, + max(param4) AS heap_blks_vacuumed, + max(param5) AS index_vacuum_count, + max(param7) AS num_dead_tuples + FROM pg_stat_get_progress_info('VACUUM') + GROUP BY leader_pid) AS W ON S.pid = W.leader_pid + WHERE + S.pid = S.leader_pid; CREATE VIEW pg_user_mappings AS SELECT diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile index 4a6c99e..c3623da 100644 --- a/src/backend/commands/Makefile +++ b/src/backend/commands/Makefile @@ -20,6 +20,6 @@ OBJS = amcmds.o aggregatecmds.o alter.o analyze.o async.o cluster.o comment.o \ policy.o portalcmds.o prepare.o proclang.o publicationcmds.o \ schemacmds.o seclabel.o sequence.o statscmds.o subscriptioncmds.o \ tablecmds.o tablespace.o trigger.o tsearchcmds.o typecmds.o user.o \ - vacuum.o vacuumlazy.o variable.o view.o + vacuum.o vacuumlazy.o vacuumworker.o variable.o view.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index a86963f..0eb38b2 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -38,6 +38,7 @@ #include "commands/vacuum.h" #include "miscadmin.h" #include "nodes/makefuncs.h" +#include "optimizer/planner.h" #include "pgstat.h" #include "postmaster/autovacuum.h" #include "storage/bufmgr.h" @@ -68,13 +69,13 @@ static BufferAccessStrategy vac_strategy; /* non-export function prototypes */ -static List *expand_vacuum_rel(VacuumRelation *vrel, int options); -static List *get_all_vacuum_rels(int options); +static List *expand_vacuum_rel(VacuumRelation *vrel, VacuumOption options); +static List *get_all_vacuum_rels(VacuumOption options); static void vac_truncate_clog(TransactionId frozenXID, MultiXactId minMulti, TransactionId lastSaneFrozenXid, MultiXactId lastSaneMinMulti); -static bool vacuum_rel(Oid relid, RangeVar *relation, int options, +static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumOption options, VacuumParams *params); /* @@ -89,15 +90,15 @@ ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel) VacuumParams params; /* sanity checks on options */ - Assert(vacstmt->options & (VACOPT_VACUUM | VACOPT_ANALYZE)); - Assert((vacstmt->options & VACOPT_VACUUM) || - !(vacstmt->options & (VACOPT_FULL | VACOPT_FREEZE))); - Assert(!(vacstmt->options & VACOPT_SKIPTOAST)); + Assert(vacstmt->options.flags & (VACOPT_VACUUM | VACOPT_ANALYZE)); + Assert((vacstmt->options.flags & VACOPT_VACUUM) || + !(vacstmt->options.flags & (VACOPT_FULL | VACOPT_FREEZE))); + Assert(!(vacstmt->options.flags & VACOPT_SKIPTOAST)); /* * 
Make sure VACOPT_ANALYZE is specified if any column lists are present. */ - if (!(vacstmt->options & VACOPT_ANALYZE)) + if (!(vacstmt->options.flags & VACOPT_ANALYZE)) { ListCell *lc; @@ -116,7 +117,7 @@ ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel) * All freeze ages are zero if the FREEZE option is given; otherwise pass * them as -1 which means to use the default values. */ - if (vacstmt->options & VACOPT_FREEZE) + if (vacstmt->options.flags & VACOPT_FREEZE) { params.freeze_min_age = 0; params.freeze_table_age = 0; @@ -163,7 +164,7 @@ ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel) * memory context that will not disappear at transaction commit. */ void -vacuum(int options, List *relations, VacuumParams *params, +vacuum(VacuumOption options, List *relations, VacuumParams *params, BufferAccessStrategy bstrategy, bool isTopLevel) { static bool in_vacuum = false; @@ -174,7 +175,7 @@ vacuum(int options, List *relations, VacuumParams *params, Assert(params != NULL); - stmttype = (options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE"; + stmttype = (options.flags & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE"; /* * We cannot run VACUUM inside a user transaction block; if we were inside @@ -184,7 +185,7 @@ vacuum(int options, List *relations, VacuumParams *params, * * ANALYZE (without VACUUM) can run either way. */ - if (options & VACOPT_VACUUM) + if (options.flags & VACOPT_VACUUM) { PreventInTransactionBlock(isTopLevel, stmttype); in_outer_xact = false; @@ -206,8 +207,8 @@ vacuum(int options, List *relations, VacuumParams *params, /* * Sanity check DISABLE_PAGE_SKIPPING option. */ - if ((options & VACOPT_FULL) != 0 && - (options & VACOPT_DISABLE_PAGE_SKIPPING) != 0) + if ((options.flags & VACOPT_FULL) != 0 && + (options.flags & VACOPT_DISABLE_PAGE_SKIPPING) != 0) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("VACUUM option DISABLE_PAGE_SKIPPING cannot be used with FULL"))); @@ -216,7 +217,7 @@ vacuum(int options, List *relations, VacuumParams *params, * Send info about dead objects to the statistics collector, unless we are * in autovacuum --- autovacuum.c does this for itself. */ - if ((options & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess()) + if ((options.flags & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess()) pgstat_vacuum_stat(); /* @@ -281,11 +282,11 @@ vacuum(int options, List *relations, VacuumParams *params, * transaction block, and also in an autovacuum worker, use own * transactions so we can release locks sooner. */ - if (options & VACOPT_VACUUM) + if (options.flags & VACOPT_VACUUM) use_own_xacts = true; else { - Assert(options & VACOPT_ANALYZE); + Assert(options.flags & VACOPT_ANALYZE); if (IsAutoVacuumWorkerProcess()) use_own_xacts = true; else if (in_outer_xact) @@ -335,13 +336,13 @@ vacuum(int options, List *relations, VacuumParams *params, { VacuumRelation *vrel = lfirst_node(VacuumRelation, cur); - if (options & VACOPT_VACUUM) + if (options.flags & VACOPT_VACUUM) { if (!vacuum_rel(vrel->oid, vrel->relation, options, params)) continue; } - if (options & VACOPT_ANALYZE) + if (options.flags & VACOPT_ANALYZE) { /* * If using separate xacts, start one for analyze. 
Otherwise, @@ -354,7 +355,7 @@ vacuum(int options, List *relations, VacuumParams *params, PushActiveSnapshot(GetTransactionSnapshot()); } - analyze_rel(vrel->oid, vrel->relation, options, params, + analyze_rel(vrel->oid, vrel->relation, options.flags, params, vrel->va_cols, in_outer_xact, vac_strategy); if (use_own_xacts) @@ -390,7 +391,7 @@ vacuum(int options, List *relations, VacuumParams *params, StartTransactionCommand(); } - if ((options & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess()) + if ((options.flags & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess()) { /* * Update pg_database.datfrozenxid, and truncate pg_xact if possible. @@ -603,7 +604,7 @@ vacuum_open_relation(Oid relid, RangeVar *relation, VacuumParams *params, * are made in vac_context. */ static List * -expand_vacuum_rel(VacuumRelation *vrel, int options) +expand_vacuum_rel(VacuumRelation *vrel, VacuumOption options) { List *vacrels = NIL; MemoryContext oldcontext; @@ -635,7 +636,7 @@ expand_vacuum_rel(VacuumRelation *vrel, int options) * below, as well as find_all_inheritors's expectation that the caller * holds some lock on the starting relation. */ - rvr_opts = (options & VACOPT_SKIP_LOCKED) ? RVR_SKIP_LOCKED : 0; + rvr_opts = (options.flags & VACOPT_SKIP_LOCKED) ? RVR_SKIP_LOCKED : 0; relid = RangeVarGetRelidExtended(vrel->relation, AccessShareLock, rvr_opts, @@ -647,7 +648,7 @@ expand_vacuum_rel(VacuumRelation *vrel, int options) */ if (!OidIsValid(relid)) { - if (options & VACOPT_VACUUM) + if (options.flags & VACOPT_VACUUM) ereport(WARNING, (errcode(ERRCODE_LOCK_NOT_AVAILABLE), errmsg("skipping vacuum of \"%s\" --- lock not available", @@ -673,7 +674,7 @@ expand_vacuum_rel(VacuumRelation *vrel, int options) * Make a returnable VacuumRelation for this rel if user is a proper * owner. */ - if (vacuum_is_relation_owner(relid, classForm, options)) + if (vacuum_is_relation_owner(relid, classForm, options.flags)) { oldcontext = MemoryContextSwitchTo(vac_context); vacrels = lappend(vacrels, makeVacuumRelation(vrel->relation, @@ -742,7 +743,7 @@ expand_vacuum_rel(VacuumRelation *vrel, int options) * the current database. The list is built in vac_context. */ static List * -get_all_vacuum_rels(int options) +get_all_vacuum_rels(VacuumOption options) { List *vacrels = NIL; Relation pgclass; @@ -760,7 +761,7 @@ get_all_vacuum_rels(int options) Oid relid = HeapTupleGetOid(tuple); /* check permissions of relation */ - if (!vacuum_is_relation_owner(relid, classForm, options)) + if (!vacuum_is_relation_owner(relid, classForm, options.flags)) continue; /* @@ -1521,7 +1522,7 @@ vac_truncate_clog(TransactionId frozenXID, * At entry and exit, we are not inside a transaction. */ static bool -vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params) +vacuum_rel(Oid relid, RangeVar *relation, VacuumOption options, VacuumParams *params) { LOCKMODE lmode; Relation onerel; @@ -1542,7 +1543,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params) */ PushActiveSnapshot(GetTransactionSnapshot()); - if (!(options & VACOPT_FULL)) + if (!(options.flags & VACOPT_FULL)) { /* * In lazy vacuum, we can set the PROC_IN_VACUUM flag, which lets @@ -1582,10 +1583,10 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params) * vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either * way, we can be sure that no other backend is vacuuming the same table. */ - lmode = (options & VACOPT_FULL) ? AccessExclusiveLock : ShareUpdateExclusiveLock; + lmode = (options.flags & VACOPT_FULL) ? 
AccessExclusiveLock : ShareUpdateExclusiveLock;

 	/* open the relation and get the appropriate lock on it */
-	onerel = vacuum_open_relation(relid, relation, params, options, lmode);
+	onerel = vacuum_open_relation(relid, relation, params, options.flags, lmode);

 	/* leave if relation could not be opened or locked */
 	if (!onerel)
@@ -1605,7 +1606,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
 	 */
 	if (!vacuum_is_relation_owner(RelationGetRelid(onerel),
 								  onerel->rd_rel,
-								  options & VACOPT_VACUUM))
+								  options.flags & VACOPT_VACUUM))
 	{
 		relation_close(onerel, lmode);
 		PopActiveSnapshot();
@@ -1677,7 +1678,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
 	 * us to process it.  In VACUUM FULL, though, the toast table is
 	 * automatically rebuilt by cluster_rel so we shouldn't recurse to it.
 	 */
-	if (!(options & VACOPT_SKIPTOAST) && !(options & VACOPT_FULL))
+	if (!(options.flags & VACOPT_SKIPTOAST) && !(options.flags & VACOPT_FULL))
 		toast_relid = onerel->rd_rel->reltoastrelid;
 	else
 		toast_relid = InvalidOid;
@@ -1696,7 +1697,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
 	/*
 	 * Do the actual work --- either FULL or "lazy" vacuum
 	 */
-	if (options & VACOPT_FULL)
+	if (options.flags & VACOPT_FULL)
 	{
 		int			cluster_options = 0;
@@ -1704,7 +1705,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
 		relation_close(onerel, NoLock);
 		onerel = NULL;

-		if ((options & VACOPT_VERBOSE) != 0)
+		if ((options.flags & VACOPT_VERBOSE) != 0)
 			cluster_options |= CLUOPT_VERBOSE;

 		/* VACUUM FULL is now a variant of CLUSTER; see cluster.c */
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index 8996d36..e4f4183 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -22,6 +22,17 @@
 * of index scans performed.  So we don't use maintenance_work_mem memory for
 * the TID array, just enough to hold as many heap tuples as fit on one page.
 *
+ * Lazy vacuum can be performed with parallel workers.  In parallel lazy vacuum,
+ * multiple vacuum worker processes get blocks in parallel using the parallel
+ * heap scan and process each of them.  If the table has indexes, the parallel
+ * vacuum workers vacuum the heap and indexes in parallel.  Also, the dead
+ * tuple TIDs are shared among all vacuum processes, including the leader
+ * process.  Before entering each state, such as scanning the heap or vacuuming
+ * indexes, the leader process does some preparation work and asks all vacuum
+ * worker processes to run the same state.  If the table has no indexes, all
+ * vacuum processes just vacuum each page as they go, so the dead tuple TIDs
+ * are not shared.  The information required by parallel lazy vacuum, such as
+ * the vacuum statistics and the parallel heap scan description, is also
+ * shared among vacuum processes.
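+ *
+ * As a simplified sketch only (not the exact code; see lazy_scan_heap()
+ * below for the real control flow), the leader drives the two-pass cycle
+ * like this:
+ *
+ *    while (!isFinished)
+ *    {
+ *        ndeadtuples = do_lazy_scan_heap(lvstate, &isFinished);
+ *        lazy_prepare_next_state(lvstate, lvleader, VACSTATE_VACUUM_INDEX);
+ *        lazy_vacuum_all_indexes(lvstate);
+ *        lazy_prepare_next_state(lvstate, lvleader, VACSTATE_VACUUM_HEAP);
+ *        lazy_vacuum_heap(onerel, lvstate);
+ *    }
+ *
+ * with the workers running the same states in vacuumworker.c.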
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
@@ -38,23 +49,32 @@
 #include "access/genam.h"
 #include "access/heapam.h"
-#include "access/heapam_xlog.h"
 #include "access/htup_details.h"
 #include "access/multixact.h"
+#include "access/parallel.h"
 #include "access/transam.h"
 #include "access/visibilitymap.h"
+#include "access/xact.h"
 #include "access/xlog.h"
 #include "catalog/storage.h"
 #include "commands/dbcommands.h"
 #include "commands/progress.h"
 #include "commands/vacuum.h"
+#include "commands/vacuum_internal.h"
 #include "miscadmin.h"
+#include "optimizer/paths.h"
+#include "optimizer/pathnode.h"
+#include "optimizer/planmain.h"
+#include "optimizer/planner.h"
 #include "pgstat.h"
 #include "portability/instr_time.h"
 #include "postmaster/autovacuum.h"
 #include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
 #include "storage/freespace.h"
 #include "storage/lmgr.h"
+#include "storage/ipc.h"
+#include "tcop/tcopprot.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/pg_rusage.h"
@@ -111,70 +131,148 @@
 */
 #define PREFETCH_SIZE			((BlockNumber) 32)

-typedef struct LVRelStats
+/* See note in lazy_scan_get_nextpage about forcing scanning of last page */
+#define FORCE_CHECK_PAGE(nblocks, blkno, vacrelstats) \
+	((blkno) == (nblocks) - 1 && should_attempt_truncation((vacrelstats)))
+
+/* Macros for checking the status of vacuum worker slot */
+#define IsVacuumWorkerStopped(pid)	((pid) == 0)
+#define IsVacuumWorkerInvalid(pid)	(((pid) == InvalidPid) || ((pid) == 0))
+
+/*
+ * LVTidMap controls the dead tuple TIDs collected during the heap scan.  The
+ * 'shared' field indicates whether the LVTidMap is shared among vacuum
+ * workers; when it is true, the map exists in shared memory.
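+ *
+ * When shared, access is synchronized by 'mutex'; lazy_record_dead_tuple()
+ * appends a TID roughly as follows (simplified sketch of the code below):
+ *
+ *    SpinLockAcquire(&dead_tuples->mutex);
+ *    if (dead_tuples->num_items < dead_tuples->max_items)
+ *        dead_tuples->itemptrs[dead_tuples->num_items++] = *itemptr;
+ *    SpinLockRelease(&dead_tuples->mutex);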
+ */
+struct LVTidMap
 {
-	/* hasindex = true means two-pass strategy; false means one-pass */
-	bool		hasindex;
-	/* Overall statistics about rel */
-	BlockNumber old_rel_pages;	/* previous value of pg_class.relpages */
-	BlockNumber rel_pages;		/* total number of pages */
-	BlockNumber scanned_pages;	/* number of pages we examined */
-	BlockNumber pinskipped_pages;	/* # of pages we skipped due to a pin */
-	BlockNumber frozenskipped_pages;	/* # of frozen pages we skipped */
-	BlockNumber tupcount_pages; /* pages whose tuples we counted */
-	double		old_live_tuples;	/* previous value of pg_class.reltuples */
-	double		new_rel_tuples; /* new estimated total # of tuples */
-	double		new_live_tuples;	/* new estimated total # of live tuples */
-	double		new_dead_tuples;	/* new estimated total # of dead tuples */
-	BlockNumber pages_removed;
-	double		tuples_deleted;
-	BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
+	int			max_items;		/* # slots allocated in itemptrs */
+	int			num_items;		/* current # of entries */
+
+	/* The fields used for vacuum heap */
+	int			item_idx;
+	int			vacuumed_pages;	/* # pages vacuumed in a heap vacuum cycle */
+
+	/* The fields used only for parallel lazy vacuum */
+	bool		shared;			/* dead tuples are shared among vacuum workers */
+	slock_t		mutex;
+
 	/* List of TIDs of tuples we intend to delete */
 	/* NB: this list is ordered by TID address */
-	int			num_dead_tuples;	/* current # of entries */
-	int			max_dead_tuples;	/* # slots allocated in array */
-	ItemPointer dead_tuples;	/* array of ItemPointerData */
-	int			num_index_scans;
-	TransactionId latestRemovedXid;
-	bool		lock_waiter_detected;
-} LVRelStats;
+	ItemPointerData itemptrs[FLEXIBLE_ARRAY_MEMBER];
+};
+#define SizeOfLVTidMap offsetof(LVTidMap, itemptrs) + sizeof(ItemPointerData)
+
+/*
+ * Struct for index statistics that are used for parallel lazy vacuum.
+ * In single lazy vacuum, we update the index statistics after cleaning
+ * them up.  However, since no updates are allowed during parallel mode,
+ * we store all index statistics in LVIndStats and update them after
+ * exiting parallel mode.
+ */
+typedef struct IndexStats
+{
+	bool		need_update;
+	BlockNumber num_pages;
+	BlockNumber num_tuples;
+} IndexStats;
+
+struct LVIndStats
+{
+	/*
+	 * nindexes is the length of stats.  nprocessed and mutex are used
+	 * only for parallel lazy vacuum, when each index is processed by
+	 * the workers and the leader.
+	 */
+	int			nindexes;		/* total # of indexes */
+	int			nprocessed;		/* used for vacuum/cleanup index */
+	slock_t		mutex;			/* protect nprocessed */
+	IndexStats	stats[FLEXIBLE_ARRAY_MEMBER];
+};
+#define SizeOfLVIndStats offsetof(LVIndStats, stats) + sizeof(IndexStats)
+
+/* Scan description data for lazy vacuum */
+struct LVScanDescData
+{
+	/* Common information for scanning heap */
+	Relation	lv_rel;
+	bool		disable_page_skipping;	/* enable DISABLE_PAGE_SKIPPING option */
+	bool		aggressive;		/* aggressive vacuum */
+
+	/* Used for single lazy vacuum, otherwise NULL */
+	HeapScanDesc lv_heapscan;
+
+	/* Used for parallel lazy vacuum, otherwise invalid values */
+	BlockNumber	lv_cblock;
+	BlockNumber	lv_next_unskippable_block;
+	BlockNumber	lv_nblocks;
+};
+
+/*
+ * Status for leader in parallel lazy vacuum.  LVLeader is only present
+ * in the leader process.
+ */
+typedef struct LVLeader
+{
+	/*
+	 * allrelstats points to a shared memory space that stores all the
+	 * index statistics.
+ */ + LVRelStats *allrelstats; + ParallelContext *pcxt; +} LVLeader; + +/* Global variables for lazy vacuum */ +LVWorkerState *WorkerState = NULL; /* A few variables that don't seem worth passing around as parameters */ static int elevel = -1; - +static BufferAccessStrategy vac_strategy; static TransactionId OldestXmin; static TransactionId FreezeLimit; static MultiXactId MultiXactCutoff; -static BufferAccessStrategy vac_strategy; - - /* non-export function prototypes */ -static void lazy_scan_heap(Relation onerel, int options, - LVRelStats *vacrelstats, Relation *Irel, int nindexes, - bool aggressive); -static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats); +static void lazy_scan_heap(LVState *lvstate); static bool lazy_check_needs_freeze(Buffer buf, bool *hastup); static void lazy_vacuum_index(Relation indrel, IndexBulkDeleteResult **stats, - LVRelStats *vacrelstats); + LVRelStats *vacrelstats, + LVTidMap *dead_tuples); static void lazy_cleanup_index(Relation indrel, - IndexBulkDeleteResult *stats, - LVRelStats *vacrelstats); -static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer, - int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer); + IndexBulkDeleteResult *stats, + LVRelStats *vacrelstats, + IndexStats *indstas); +static int lazy_vacuum_page(LVState *lvstate, Relation onerel, BlockNumber blkno, Buffer buffer, + int tupindex, Buffer *vmbuffer); static bool should_attempt_truncation(LVRelStats *vacrelstats); static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats); static BlockNumber count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats); -static void lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks); -static void lazy_record_dead_tuple(LVRelStats *vacrelstats, +static void lazy_record_dead_tuple(LVTidMap *dead_tuples, ItemPointer itemptr); -static bool lazy_tid_reaped(ItemPointer itemptr, void *state); +static bool lazy_tid_reaped(ItemPointer itemptr, void *dt); static int vac_cmp_itemptr(const void *left, const void *right); static bool heap_page_is_all_visible(Relation rel, Buffer buf, TransactionId *visibility_cutoff_xid, bool *all_frozen); +static BlockNumber lazy_get_next_vacuum_page(LVState *lvstate, int *tupindex_p, + int *npages_p); +static bool lazy_dead_tuples_is_full(LVTidMap *tidmap); +static int lazy_get_dead_tuple_count(LVTidMap *dead_tuples); +static BlockNumber lazy_scan_get_nextpage(LVScanDesc lvscan, LVRelStats *vacrelstats, + bool *all_visible_according_to_vm_p, + Buffer *vmbuffer_p); +static long lazy_get_max_dead_tuples(LVRelStats *vacrelstats, BlockNumber relblocks); + +/* function prototypes for parallel vacuum */ +static LVLeader *lazy_vacuum_begin_parallel(LVState *lvstate, int request); +static void lazy_vacuum_end_parallel(LVState *lvstate, LVLeader *lvleader, + bool update_stats); +static void lazy_prepare_next_state(LVState *lvstate, LVLeader *lvleader, + int next_state); +static void lazy_gather_worker_stats(LVLeader *lvleader, LVRelStats *vacrelstats); +static void lazy_wait_for_vacuum_workers_to_be_done(void); +static void lazy_set_workers_state(VacWorkerState new_state); +static void lazy_wait_for_vacuum_workers_attach(ParallelContext *pcxt); /* @@ -187,12 +285,11 @@ static bool heap_page_is_all_visible(Relation rel, Buffer buf, * and locked the relation. 
*/ void -lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params, +lazy_vacuum_rel(Relation onerel, VacuumOption options, VacuumParams *params, BufferAccessStrategy bstrategy) { + LVState *lvstate; LVRelStats *vacrelstats; - Relation *Irel; - int nindexes; PGRUsage ru0; TimestampTz starttime = 0; long secs; @@ -218,7 +315,7 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params, starttime = GetCurrentTimestamp(); } - if (options & VACOPT_VERBOSE) + if (options.flags & VACOPT_VERBOSE) elevel = INFO; else elevel = DEBUG2; @@ -246,26 +343,34 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params, xidFullScanLimit); aggressive |= MultiXactIdPrecedesOrEquals(onerel->rd_rel->relminmxid, mxactFullScanLimit); - if (options & VACOPT_DISABLE_PAGE_SKIPPING) + if (options.flags & VACOPT_DISABLE_PAGE_SKIPPING) aggressive = true; - vacrelstats = (LVRelStats *) palloc0(sizeof(LVRelStats)); + /* Create lazy vacuum state and statistics */ + lvstate = (LVState *) palloc0(sizeof(LVState)); + lvstate->options = options; + lvstate->aggressive = aggressive; + lvstate->relid = RelationGetRelid(onerel); + lvstate->relation = onerel; + lvstate->is_wraparound = params->is_wraparound; + lvstate->indstats = NULL; + lvstate->dead_tuples = NULL; + lvstate->lvshared = NULL; + vacrelstats = (LVRelStats *) palloc0(sizeof(LVRelStats)); vacrelstats->old_rel_pages = onerel->rd_rel->relpages; vacrelstats->old_live_tuples = onerel->rd_rel->reltuples; - vacrelstats->num_index_scans = 0; - vacrelstats->pages_removed = 0; vacrelstats->lock_waiter_detected = false; + lvstate->vacrelstats = vacrelstats; /* Open all indexes of the relation */ - vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel); - vacrelstats->hasindex = (nindexes > 0); + vac_open_indexes(onerel, RowExclusiveLock, &lvstate->nindexes, &lvstate->indRels); + vacrelstats->hasindex = (lvstate->nindexes > 0); - /* Do the vacuuming */ - lazy_scan_heap(onerel, options, vacrelstats, Irel, nindexes, aggressive); + lazy_scan_heap(lvstate); /* Done with indexes */ - vac_close_indexes(nindexes, Irel, NoLock); + vac_close_indexes(lvstate->nindexes, lvstate->indRels, NoLock); /* * Compute whether we actually scanned the all unfrozen pages. If we did, @@ -454,7 +559,7 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats) } /* - * lazy_scan_heap() -- scan an open heap relation + * do_lazy_scan_heap() -- scan an open heap relation * * This routine prunes each page in the heap, which will among other * things truncate dead tuples to dead line pointers, defragment the @@ -469,32 +574,19 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats) * dead line pointers need only be retained until all index pointers that * reference them have been killed. 
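+ *
+ * Unlike the original lazy_scan_heap(), this function may return before
+ * the whole heap has been scanned: when the table has indexes it stops as
+ * soon as the dead tuple space fills up, returns the number of dead tuple
+ * TIDs collected so far, and sets *isFinished once the end of the table
+ * has been reached, so the caller vacuums the indexes and the heap and
+ * then calls it again.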
*/ -static void -lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, - Relation *Irel, int nindexes, bool aggressive) +int +do_lazy_scan_heap(LVState *lvstate, bool *isFinished) { - BlockNumber nblocks, - blkno; + Relation onerel = lvstate->relation; + LVRelStats *vacrelstats = lvstate->vacrelstats; + BlockNumber blkno; HeapTupleData tuple; char *relname; TransactionId relfrozenxid = onerel->rd_rel->relfrozenxid; TransactionId relminmxid = onerel->rd_rel->relminmxid; - BlockNumber empty_pages, - vacuumed_pages, - next_fsm_block_to_vacuum; - double num_tuples, /* total number of nonremovable tuples */ - live_tuples, /* live tuples (reltuples estimate) */ - tups_vacuumed, /* tuples cleaned up by vacuum */ - nkeep, /* dead-but-not-removable tuples */ - nunused; /* unused item pointers */ - IndexBulkDeleteResult **indstats; int i; - PGRUsage ru0; Buffer vmbuffer = InvalidBuffer; - BlockNumber next_unskippable_block; - bool skipping_blocks; - xl_heap_freeze_tuple *frozen; - StringInfoData buf; + bool all_visible_accroding_to_vm; const int initprog_index[] = { PROGRESS_VACUUM_PHASE, PROGRESS_VACUUM_TOTAL_HEAP_BLKS, @@ -502,117 +594,17 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, }; int64 initprog_val[3]; - pg_rusage_init(&ru0); - relname = RelationGetRelationName(onerel); - if (aggressive) - ereport(elevel, - (errmsg("aggressively vacuuming \"%s.%s\"", - get_namespace_name(RelationGetNamespace(onerel)), - relname))); - else - ereport(elevel, - (errmsg("vacuuming \"%s.%s\"", - get_namespace_name(RelationGetNamespace(onerel)), - relname))); - - empty_pages = vacuumed_pages = 0; - next_fsm_block_to_vacuum = (BlockNumber) 0; - num_tuples = live_tuples = tups_vacuumed = nkeep = nunused = 0; - - indstats = (IndexBulkDeleteResult **) - palloc0(nindexes * sizeof(IndexBulkDeleteResult *)); - - nblocks = RelationGetNumberOfBlocks(onerel); - vacrelstats->rel_pages = nblocks; - vacrelstats->scanned_pages = 0; - vacrelstats->tupcount_pages = 0; - vacrelstats->nonempty_pages = 0; - vacrelstats->latestRemovedXid = InvalidTransactionId; - - lazy_space_alloc(vacrelstats, nblocks); - frozen = palloc(sizeof(xl_heap_freeze_tuple) * MaxHeapTuplesPerPage); /* Report that we're scanning the heap, advertising total # of blocks */ initprog_val[0] = PROGRESS_VACUUM_PHASE_SCAN_HEAP; - initprog_val[1] = nblocks; - initprog_val[2] = vacrelstats->max_dead_tuples; + initprog_val[1] = lvstate->lvscan->lv_nblocks; + initprog_val[2] = lvstate->dead_tuples->max_items; pgstat_progress_update_multi_param(3, initprog_index, initprog_val); - /* - * Except when aggressive is set, we want to skip pages that are - * all-visible according to the visibility map, but only when we can skip - * at least SKIP_PAGES_THRESHOLD consecutive pages. Since we're reading - * sequentially, the OS should be doing readahead for us, so there's no - * gain in skipping a page now and then; that's likely to disable - * readahead and so be counterproductive. Also, skipping even a single - * page means that we can't update relfrozenxid, so we only want to do it - * if we can skip a goodly number of pages. - * - * When aggressive is set, we can't skip pages just because they are - * all-visible, but we can still skip pages that are all-frozen, since - * such pages do not need freezing and do not affect the value that we can - * safely set for relfrozenxid or relminmxid. 
- * - * Before entering the main loop, establish the invariant that - * next_unskippable_block is the next block number >= blkno that we can't - * skip based on the visibility map, either all-visible for a regular scan - * or all-frozen for an aggressive scan. We set it to nblocks if there's - * no such block. We also set up the skipping_blocks flag correctly at - * this stage. - * - * Note: The value returned by visibilitymap_get_status could be slightly - * out-of-date, since we make this test before reading the corresponding - * heap page or locking the buffer. This is OK. If we mistakenly think - * that the page is all-visible or all-frozen when in fact the flag's just - * been cleared, we might fail to vacuum the page. It's easy to see that - * skipping a page when aggressive is not set is not a very big deal; we - * might leave some dead tuples lying around, but the next vacuum will - * find them. But even when aggressive *is* set, it's still OK if we miss - * a page whose all-frozen marking has just been cleared. Any new XIDs - * just added to that page are necessarily newer than the GlobalXmin we - * computed, so they'll have no effect on the value to which we can safely - * set relfrozenxid. A similar argument applies for MXIDs and relminmxid. - * - * We will scan the table's last page, at least to the extent of - * determining whether it has tuples or not, even if it should be skipped - * according to the above rules; except when we've already determined that - * it's not worth trying to truncate the table. This avoids having - * lazy_truncate_heap() take access-exclusive lock on the table to attempt - * a truncation that just fails immediately because there are tuples in - * the last page. This is worth avoiding mainly because such a lock must - * be replayed on any hot standby, where it can be disruptive. 
- */ - next_unskippable_block = 0; - if ((options & VACOPT_DISABLE_PAGE_SKIPPING) == 0) - { - while (next_unskippable_block < nblocks) - { - uint8 vmstatus; - - vmstatus = visibilitymap_get_status(onerel, next_unskippable_block, - &vmbuffer); - if (aggressive) - { - if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) == 0) - break; - } - else - { - if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) == 0) - break; - } - vacuum_delay_point(); - next_unskippable_block++; - } - } - - if (next_unskippable_block >= SKIP_PAGES_THRESHOLD) - skipping_blocks = true; - else - skipping_blocks = false; - - for (blkno = 0; blkno < nblocks; blkno++) + while ((blkno = lazy_scan_get_nextpage(lvstate->lvscan, lvstate->vacrelstats, + &all_visible_accroding_to_vm, &vmbuffer)) + != InvalidBlockNumber) { Buffer buf; Page page; @@ -629,159 +621,9 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, bool has_dead_tuples; TransactionId visibility_cutoff_xid = InvalidTransactionId; - /* see note above about forcing scanning of last page */ -#define FORCE_CHECK_PAGE() \ - (blkno == nblocks - 1 && should_attempt_truncation(vacrelstats)) - - pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, blkno); - - if (blkno == next_unskippable_block) - { - /* Time to advance next_unskippable_block */ - next_unskippable_block++; - if ((options & VACOPT_DISABLE_PAGE_SKIPPING) == 0) - { - while (next_unskippable_block < nblocks) - { - uint8 vmskipflags; - - vmskipflags = visibilitymap_get_status(onerel, - next_unskippable_block, - &vmbuffer); - if (aggressive) - { - if ((vmskipflags & VISIBILITYMAP_ALL_FROZEN) == 0) - break; - } - else - { - if ((vmskipflags & VISIBILITYMAP_ALL_VISIBLE) == 0) - break; - } - vacuum_delay_point(); - next_unskippable_block++; - } - } - - /* - * We know we can't skip the current block. But set up - * skipping_blocks to do the right thing at the following blocks. - */ - if (next_unskippable_block - blkno > SKIP_PAGES_THRESHOLD) - skipping_blocks = true; - else - skipping_blocks = false; - - /* - * Normally, the fact that we can't skip this block must mean that - * it's not all-visible. But in an aggressive vacuum we know only - * that it's not all-frozen, so it might still be all-visible. - */ - if (aggressive && VM_ALL_VISIBLE(onerel, blkno, &vmbuffer)) - all_visible_according_to_vm = true; - } - else - { - /* - * The current block is potentially skippable; if we've seen a - * long enough run of skippable blocks to justify skipping it, and - * we're not forced to check it, then go ahead and skip. - * Otherwise, the page must be at least all-visible if not - * all-frozen, so we can set all_visible_according_to_vm = true. - */ - if (skipping_blocks && !FORCE_CHECK_PAGE()) - { - /* - * Tricky, tricky. If this is in aggressive vacuum, the page - * must have been all-frozen at the time we checked whether it - * was skippable, but it might not be any more. We must be - * careful to count it as a skipped all-frozen page in that - * case, or else we'll think we can't update relfrozenxid and - * relminmxid. If it's not an aggressive vacuum, we don't - * know whether it was all-frozen, so we have to recheck; but - * in this case an approximate answer is OK. - */ - if (aggressive || VM_ALL_FROZEN(onerel, blkno, &vmbuffer)) - vacrelstats->frozenskipped_pages++; - continue; - } - all_visible_according_to_vm = true; - } - vacuum_delay_point(); /* - * If we are close to overrunning the available space for dead-tuple - * TIDs, pause and do a cycle of vacuuming before we tackle this page. 
- */ - if ((vacrelstats->max_dead_tuples - vacrelstats->num_dead_tuples) < MaxHeapTuplesPerPage && - vacrelstats->num_dead_tuples > 0) - { - const int hvp_index[] = { - PROGRESS_VACUUM_PHASE, - PROGRESS_VACUUM_NUM_INDEX_VACUUMS - }; - int64 hvp_val[2]; - - /* - * Before beginning index vacuuming, we release any pin we may - * hold on the visibility map page. This isn't necessary for - * correctness, but we do it anyway to avoid holding the pin - * across a lengthy, unrelated operation. - */ - if (BufferIsValid(vmbuffer)) - { - ReleaseBuffer(vmbuffer); - vmbuffer = InvalidBuffer; - } - - /* Log cleanup info before we touch indexes */ - vacuum_log_cleanup_info(onerel, vacrelstats); - - /* Report that we are now vacuuming indexes */ - pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, - PROGRESS_VACUUM_PHASE_VACUUM_INDEX); - - /* Remove index entries */ - for (i = 0; i < nindexes; i++) - lazy_vacuum_index(Irel[i], - &indstats[i], - vacrelstats); - - /* - * Report that we are now vacuuming the heap. We also increase - * the number of index scans here; note that by using - * pgstat_progress_update_multi_param we can update both - * parameters atomically. - */ - hvp_val[0] = PROGRESS_VACUUM_PHASE_VACUUM_HEAP; - hvp_val[1] = vacrelstats->num_index_scans + 1; - pgstat_progress_update_multi_param(2, hvp_index, hvp_val); - - /* Remove tuples from heap */ - lazy_vacuum_heap(onerel, vacrelstats); - - /* - * Forget the now-vacuumed tuples, and press on, but be careful - * not to reset latestRemovedXid since we want that value to be - * valid. - */ - vacrelstats->num_dead_tuples = 0; - vacrelstats->num_index_scans++; - - /* - * Vacuum the Free Space Map to make newly-freed space visible on - * upper-level FSM pages. Note we have not yet processed blkno. - */ - FreeSpaceMapVacuumRange(onerel, next_fsm_block_to_vacuum, blkno); - next_fsm_block_to_vacuum = blkno; - - /* Report that we are once again scanning the heap */ - pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, - PROGRESS_VACUUM_PHASE_SCAN_HEAP); - } - - /* * Pin the visibility map page in case we need to mark the page * all-visible. In most cases this will be very cheap, because we'll * already have the correct page pinned anyway. However, it's @@ -804,7 +646,8 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, * it's OK to skip vacuuming pages we get a lock conflict on. They * will be dealt with in some future vacuum. 
*/ - if (!aggressive && !FORCE_CHECK_PAGE()) + if (!lvstate->aggressive && + !FORCE_CHECK_PAGE(vacrelstats->rel_pages, blkno, vacrelstats)) { ReleaseBuffer(buf); vacrelstats->pinskipped_pages++; @@ -837,7 +680,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, vacrelstats->nonempty_pages = blkno + 1; continue; } - if (!aggressive) + if (!lvstate->aggressive) { /* * Here, we must not advance scanned_pages; that would amount @@ -891,7 +734,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, (errmsg("relation \"%s\" page %u is uninitialized --- fixing", relname, blkno))); PageInit(page, BufferGetPageSize(buf), 0); - empty_pages++; + vacrelstats->empty_pages++; } freespace = PageGetHeapFreeSpace(page); MarkBufferDirty(buf); @@ -903,7 +746,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, if (PageIsEmpty(page)) { - empty_pages++; + vacrelstats->empty_pages++; freespace = PageGetHeapFreeSpace(page); /* empty pages are always all-visible and all-frozen */ @@ -945,7 +788,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, * * We count tuples removed by the pruning step as removed by VACUUM. */ - tups_vacuumed += heap_page_prune(onerel, buf, OldestXmin, false, + vacrelstats->tuples_deleted += heap_page_prune(onerel, buf, OldestXmin, false, &vacrelstats->latestRemovedXid); /* @@ -956,7 +799,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, has_dead_tuples = false; nfrozen = 0; hastup = false; - prev_dead_count = vacrelstats->num_dead_tuples; + prev_dead_count = lazy_get_dead_tuple_count(lvstate->dead_tuples); maxoff = PageGetMaxOffsetNumber(page); /* @@ -974,7 +817,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, /* Unused items require no processing, but we count 'em */ if (!ItemIdIsUsed(itemid)) { - nunused += 1; + vacrelstats->unused_tuples += 1; continue; } @@ -989,13 +832,13 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, /* * DEAD item pointers are to be vacuumed normally; but we don't - * count them in tups_vacuumed, else we'd be double-counting (at + * count them in vacrelstats->tuples_deleted, else we'd be double-counting (at * least in the common case where heap_page_prune() just freed up * a non-HOT tuple). */ if (ItemIdIsDead(itemid)) { - lazy_record_dead_tuple(vacrelstats, &(tuple.t_self)); + lazy_record_dead_tuple(lvstate->dead_tuples, &(tuple.t_self)); all_visible = false; continue; } @@ -1047,7 +890,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, */ if (HeapTupleIsHotUpdated(&tuple) || HeapTupleIsHeapOnly(&tuple)) - nkeep += 1; + vacrelstats->new_dead_tuples += 1; else tupgone = true; /* we can delete the tuple */ all_visible = false; @@ -1063,7 +906,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, * Count it as live. Not only is this natural, but it's * also what acquire_sample_rows() does. */ - live_tuples += 1; + vacrelstats->live_tuples += 1; /* * Is the tuple definitely visible to all transactions? @@ -1106,7 +949,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, * If tuple is recently deleted then we must not remove it * from relation. */ - nkeep += 1; + vacrelstats->new_dead_tuples += 1; all_visible = false; break; case HEAPTUPLE_INSERT_IN_PROGRESS: @@ -1132,7 +975,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, * deleting transaction will commit and update the * counters after we report. 
				 */
-				live_tuples += 1;
+				vacrelstats->live_tuples += 1;
				break;
			default:
				elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
@@ -1141,17 +984,17 @@
			if (tupgone)
			{
-				lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
+				lazy_record_dead_tuple(lvstate->dead_tuples, &(tuple.t_self));
				HeapTupleHeaderAdvanceLatestRemovedXid(tuple.t_data,
												 &vacrelstats->latestRemovedXid);
-				tups_vacuumed += 1;
+				vacrelstats->tuples_deleted += 1;
				has_dead_tuples = true;
			}
			else
			{
				bool		tuple_totally_frozen;

-				num_tuples += 1;
+				vacrelstats->num_tuples += 1;
				hastup = true;

				/*
@@ -1161,9 +1004,9 @@
				if (heap_prepare_freeze_tuple(tuple.t_data,
											  relfrozenxid, relminmxid,
											  FreezeLimit, MultiXactCutoff,
-											  &frozen[nfrozen],
+											  &(lvstate->frozen[nfrozen]),
											  &tuple_totally_frozen))
-					frozen[nfrozen++].offset = offnum;
+					lvstate->frozen[nfrozen++].offset = offnum;

				if (!tuple_totally_frozen)
					all_frozen = false;
@@ -1187,10 +1030,10 @@
				ItemId		itemid;
				HeapTupleHeader htup;

-				itemid = PageGetItemId(page, frozen[i].offset);
+				itemid = PageGetItemId(page, lvstate->frozen[i].offset);
				htup = (HeapTupleHeader) PageGetItem(page, itemid);

-				heap_execute_freeze_tuple(htup, &frozen[i]);
+				heap_execute_freeze_tuple(htup, &(lvstate->frozen[i]));
			}

			/* Now WAL-log freezing if necessary */
@@ -1199,7 +1042,7 @@
				XLogRecPtr	recptr;

				recptr = log_heap_freeze(onerel, buf, FreezeLimit,
-										 frozen, nfrozen);
+										 lvstate->frozen, nfrozen);
				PageSetLSN(page, recptr);
			}

@@ -1210,20 +1053,24 @@
		 * If there are no indexes then we can vacuum the page right now
		 * instead of doing a second scan.
		 */
-		if (nindexes == 0 &&
-			vacrelstats->num_dead_tuples > 0)
+		if (lvstate->nindexes == 0 &&
+			lazy_get_dead_tuple_count(lvstate->dead_tuples) > 0)
		{
			/* Remove tuples from heap */
-			lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats, &vmbuffer);
+			lazy_vacuum_page(lvstate, onerel, blkno, buf, 0, &vmbuffer);
			has_dead_tuples = false;

			/*
			 * Forget the now-vacuumed tuples, and press on, but be careful
			 * not to reset latestRemovedXid since we want that value to be
			 * valid.
+			 *
+			 * If the table has no indexes, the dead tuple space is in local
+			 * memory regardless of parallel or non-parallel lazy vacuum,
+			 * so we don't need to acquire the lock to modify it.
			 */
-			vacrelstats->num_dead_tuples = 0;
-			vacuumed_pages++;
+			lvstate->dead_tuples->num_items = 0;
+			vacrelstats->vacuumed_pages++;

			/*
			 * Periodically do incremental FSM vacuuming to make newly-freed
			 * space visible on upper-level FSM pages.  Note: although we've
			 * cleaned the current block, we haven't yet updated its FSM entry
			 * (that happens further down), so passing end == blkno is correct.
			 */
-			if (blkno - next_fsm_block_to_vacuum >= VACUUM_FSM_EVERY_PAGES)
+			if (blkno - lvstate->next_fsm_block_to_vacuum >= VACUUM_FSM_EVERY_PAGES)
			{
-				FreeSpaceMapVacuumRange(onerel, next_fsm_block_to_vacuum,
+				FreeSpaceMapVacuumRange(onerel, lvstate->next_fsm_block_to_vacuum,
										blkno);
-				next_fsm_block_to_vacuum = blkno;
+				lvstate->next_fsm_block_to_vacuum = blkno;
			}
		}

@@ -1338,127 +1185,31 @@
		 * page, so remember its free space as-is.  (This path will always be
		 * taken if there are no indexes.)
*/ - if (vacrelstats->num_dead_tuples == prev_dead_count) + if (lazy_get_dead_tuple_count(lvstate->dead_tuples) == prev_dead_count) RecordPageWithFreeSpace(onerel, blkno, freespace); - } - - /* report that everything is scanned and vacuumed */ - pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, blkno); - - pfree(frozen); - /* save stats for use later */ - vacrelstats->tuples_deleted = tups_vacuumed; - vacrelstats->new_dead_tuples = nkeep; - - /* now we can compute the new value for pg_class.reltuples */ - vacrelstats->new_live_tuples = vac_estimate_reltuples(onerel, - nblocks, - vacrelstats->tupcount_pages, - live_tuples); - - /* also compute total number of surviving heap entries */ - vacrelstats->new_rel_tuples = - vacrelstats->new_live_tuples + vacrelstats->new_dead_tuples; + /* Dead tuple space is full, exit scanning */ + if (lvstate->nindexes > 0 && lazy_dead_tuples_is_full(lvstate->dead_tuples)) + break; + } /* - * Release any remaining pin on visibility map page. + * Before beginning index vacuuming, we release any pin we may + * hold on the visibility map page. This isn't necessary for + * correctness, but we do it anyway to avoid holding the pin + * across a lengthy, unrelated operation. */ if (BufferIsValid(vmbuffer)) - { ReleaseBuffer(vmbuffer); - vmbuffer = InvalidBuffer; - } - - /* If any tuples need to be deleted, perform final vacuum cycle */ - /* XXX put a threshold on min number of tuples here? */ - if (vacrelstats->num_dead_tuples > 0) - { - const int hvp_index[] = { - PROGRESS_VACUUM_PHASE, - PROGRESS_VACUUM_NUM_INDEX_VACUUMS - }; - int64 hvp_val[2]; - - /* Log cleanup info before we touch indexes */ - vacuum_log_cleanup_info(onerel, vacrelstats); - - /* Report that we are now vacuuming indexes */ - pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, - PROGRESS_VACUUM_PHASE_VACUUM_INDEX); - - /* Remove index entries */ - for (i = 0; i < nindexes; i++) - lazy_vacuum_index(Irel[i], - &indstats[i], - vacrelstats); - - /* Report that we are now vacuuming the heap */ - hvp_val[0] = PROGRESS_VACUUM_PHASE_VACUUM_HEAP; - hvp_val[1] = vacrelstats->num_index_scans + 1; - pgstat_progress_update_multi_param(2, hvp_index, hvp_val); - - /* Remove tuples from heap */ - pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, - PROGRESS_VACUUM_PHASE_VACUUM_HEAP); - lazy_vacuum_heap(onerel, vacrelstats); - vacrelstats->num_index_scans++; - } - - /* - * Vacuum the remainder of the Free Space Map. We must do this whether or - * not there were indexes. - */ - if (blkno > next_fsm_block_to_vacuum) - FreeSpaceMapVacuumRange(onerel, next_fsm_block_to_vacuum, blkno); - - /* report all blocks vacuumed; and that we're cleaning up */ - pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, blkno); - pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, - PROGRESS_VACUUM_PHASE_INDEX_CLEANUP); - - /* Do post-vacuum cleanup and statistics update for each index */ - for (i = 0; i < nindexes; i++) - lazy_cleanup_index(Irel[i], indstats[i], vacrelstats); - /* If no indexes, make log report that lazy_vacuum_heap would've made */ - if (vacuumed_pages) - ereport(elevel, - (errmsg("\"%s\": removed %.0f row versions in %u pages", - RelationGetRelationName(onerel), - tups_vacuumed, vacuumed_pages))); + /* Reached the end of the table */ + if (!BlockNumberIsValid(blkno)) + *isFinished = true; - /* - * This is pretty messy, but we split it up so that we can skip emitting - * individual parts of the message when not applicable. 
- */ - initStringInfo(&buf); - appendStringInfo(&buf, - _("%.0f dead row versions cannot be removed yet, oldest xmin: %u\n"), - nkeep, OldestXmin); - appendStringInfo(&buf, _("There were %.0f unused item pointers.\n"), - nunused); - appendStringInfo(&buf, ngettext("Skipped %u page due to buffer pins, ", - "Skipped %u pages due to buffer pins, ", - vacrelstats->pinskipped_pages), - vacrelstats->pinskipped_pages); - appendStringInfo(&buf, ngettext("%u frozen page.\n", - "%u frozen pages.\n", - vacrelstats->frozenskipped_pages), - vacrelstats->frozenskipped_pages); - appendStringInfo(&buf, ngettext("%u page is entirely empty.\n", - "%u pages are entirely empty.\n", - empty_pages), - empty_pages); - appendStringInfo(&buf, _("%s."), pg_rusage_show(&ru0)); + /* Remember the just scanned block before leaving */ + lvstate->current_block = blkno; - ereport(elevel, - (errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u out of %u pages", - RelationGetRelationName(onerel), - tups_vacuumed, num_tuples, - vacrelstats->scanned_pages, nblocks), - errdetail_internal("%s", buf.data))); - pfree(buf.data); + return lazy_get_dead_tuple_count(lvstate->dead_tuples); } @@ -1473,38 +1224,36 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, * the tuples until we've removed their index entries, and we want to * process index entry removal in batches as large as possible. */ -static void -lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats) +void +lazy_vacuum_heap(Relation onerel, LVState *lvstate) { - int tupindex; + int tupindex = 0; int npages; PGRUsage ru0; Buffer vmbuffer = InvalidBuffer; + BlockNumber tblk; pg_rusage_init(&ru0); npages = 0; - tupindex = 0; - while (tupindex < vacrelstats->num_dead_tuples) + while ((tblk = lazy_get_next_vacuum_page(lvstate, &tupindex, &npages)) + != InvalidBlockNumber) { - BlockNumber tblk; Buffer buf; Page page; Size freespace; vacuum_delay_point(); - tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]); buf = ReadBufferExtended(onerel, MAIN_FORKNUM, tblk, RBM_NORMAL, vac_strategy); if (!ConditionalLockBufferForCleanup(buf)) { ReleaseBuffer(buf); - ++tupindex; continue; } - tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats, - &vmbuffer); + + lazy_vacuum_page(lvstate, onerel, tblk, buf, tupindex, &vmbuffer); /* Now that we've compacted the page, record its available space */ page = BufferGetPage(buf); @@ -1512,7 +1261,6 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats) UnlockReleaseBuffer(buf); RecordPageWithFreeSpace(onerel, tblk, freespace); - npages++; } if (BufferIsValid(vmbuffer)) @@ -1521,11 +1269,13 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats) vmbuffer = InvalidBuffer; } - ereport(elevel, - (errmsg("\"%s\": removed %d row versions in %d pages", - RelationGetRelationName(onerel), - tupindex, npages), - errdetail_internal("%s", pg_rusage_show(&ru0)))); + /* Report by only the vacuum leader */ + if (!IsParallelWorker()) + ereport(elevel, + (errmsg("\"%s\": removed %d row versions in %d pages", + RelationGetRelationName(onerel), + tupindex, npages), + errdetail_internal("%s", pg_rusage_show(&ru0)))); } /* @@ -1539,29 +1289,32 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats) * The return value is the first tupindex after the tuples of this page. 
 */
 static int
-lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
-				 int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer)
+lazy_vacuum_page(LVState *lvstate, Relation onerel, BlockNumber blkno,
+				 Buffer buffer, int tupindex, Buffer *vmbuffer)
 {
+	LVRelStats *vacrelstats = lvstate->vacrelstats;
+	LVTidMap   *dead_tuples = lvstate->dead_tuples;
 	Page		page = BufferGetPage(buffer);
 	OffsetNumber unused[MaxOffsetNumber];
 	int			uncnt = 0;
 	TransactionId visibility_cutoff_xid;
 	bool		all_frozen;
+	int			num_items = lazy_get_dead_tuple_count(dead_tuples);

 	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, blkno);

 	START_CRIT_SECTION();

-	for (; tupindex < vacrelstats->num_dead_tuples; tupindex++)
+	for (; tupindex < num_items; tupindex++)
 	{
 		BlockNumber tblk;
 		OffsetNumber toff;
 		ItemId		itemid;

-		tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
+		tblk = ItemPointerGetBlockNumber(&dead_tuples->itemptrs[tupindex]);
 		if (tblk != blkno)
 			break;				/* past end of tuples for this block */
-		toff = ItemPointerGetOffsetNumber(&vacrelstats->dead_tuples[tupindex]);
+		toff = ItemPointerGetOffsetNumber(&dead_tuples->itemptrs[tupindex]);
 		itemid = PageGetItemId(page, toff);
 		ItemIdSetUnused(itemid);
 		unused[uncnt++] = toff;
@@ -1691,7 +1444,8 @@ lazy_check_needs_freeze(Buffer buf, bool *hastup)
 static void
 lazy_vacuum_index(Relation indrel,
 				  IndexBulkDeleteResult **stats,
-				  LVRelStats *vacrelstats)
+				  LVRelStats *vacrelstats,
+				  LVTidMap *dead_tuples)
 {
 	IndexVacuumInfo ivinfo;
 	PGRUsage	ru0;
@@ -1708,12 +1462,13 @@ lazy_vacuum_index(Relation indrel,
 	/* Do bulk deletion */
 	*stats = index_bulk_delete(&ivinfo, *stats,
-							   lazy_tid_reaped, (void *) vacrelstats);
+							   lazy_tid_reaped, (void *) dead_tuples);

+	/* Report by both the leader and workers */
 	ereport(elevel,
 			(errmsg("scanned index \"%s\" to remove %d row versions",
 					RelationGetRelationName(indrel),
-					vacrelstats->num_dead_tuples),
+					lazy_get_dead_tuple_count(dead_tuples)),
 			 errdetail_internal("%s", pg_rusage_show(&ru0))));
 }

@@ -1723,7 +1478,8 @@
 static void
 lazy_cleanup_index(Relation indrel,
 				   IndexBulkDeleteResult *stats,
-				   LVRelStats *vacrelstats)
+				   LVRelStats *vacrelstats,
+				   IndexStats *indstats)
 {
 	IndexVacuumInfo ivinfo;
 	PGRUsage	ru0;
@@ -1750,18 +1506,29 @@ lazy_cleanup_index(Relation indrel,
 	/*
 	 * Now update statistics in pg_class, but only if the index says the count
-	 * is accurate.
+	 * is accurate and we are not in parallel lazy vacuum.
 	 */
 	if (!stats->estimated_count)
-		vac_update_relstats(indrel,
-							stats->num_pages,
-							stats->num_index_tuples,
-							0,
-							false,
-							InvalidTransactionId,
-							InvalidMultiXactId,
-							false);
+	{
+		if (indstats)
+		{
+			/* In parallel lazy vacuum, remember them and update later */
+			indstats->need_update = true;
+			indstats->num_pages = stats->num_pages;
+			indstats->num_tuples = stats->num_index_tuples;
+		}
+		else
+			vac_update_relstats(indrel,
+								stats->num_pages,
+								stats->num_index_tuples,
+								0,
+								false,
+								InvalidTransactionId,
+								InvalidMultiXactId,
+								false);
+	}

+	/* Report by both the leader and workers */
 	ereport(elevel,
 			(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
 					RelationGetRelationName(indrel),
@@ -2084,57 +1851,51 @@ count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
 *
 * See the comments at the head of this file for rationale.
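+ *
+ * The cap on the number of dead tuple TIDs now comes from
+ * lazy_get_max_dead_tuples(); judging from the logic it replaces, it is
+ * presumably derived along these lines (sketch):
+ *
+ *    vac_work_mem = (IsAutoVacuumWorkerProcess() &&
+ *                    autovacuum_work_mem != -1) ?
+ *                   autovacuum_work_mem : maintenance_work_mem;
+ *    maxtuples = (vac_work_mem * 1024L) / sizeof(ItemPointerData);
+ *
+ * clamped to INT_MAX, MaxAllocSize / sizeof(ItemPointerData) and
+ * relblocks * LAZY_ALLOC_TUPLES, with a floor of MaxHeapTuplesPerPage
+ * (and just MaxHeapTuplesPerPage when the table has no indexes).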
*/ -static void -lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks) +void +lazy_space_alloc(LVState *lvstate, BlockNumber relblocks) { long maxtuples; - int vac_work_mem = IsAutoVacuumWorkerProcess() && - autovacuum_work_mem != -1 ? - autovacuum_work_mem : maintenance_work_mem; + LVTidMap *dead_tuples; - if (vacrelstats->hasindex) - { - maxtuples = (vac_work_mem * 1024L) / sizeof(ItemPointerData); - maxtuples = Min(maxtuples, INT_MAX); - maxtuples = Min(maxtuples, MaxAllocSize / sizeof(ItemPointerData)); + Assert(lvstate->dead_tuples == NULL); - /* curious coding here to ensure the multiplication can't overflow */ - if ((BlockNumber) (maxtuples / LAZY_ALLOC_TUPLES) > relblocks) - maxtuples = relblocks * LAZY_ALLOC_TUPLES; + maxtuples = lazy_get_max_dead_tuples(lvstate->vacrelstats, + relblocks); - /* stay sane if small maintenance_work_mem */ - maxtuples = Max(maxtuples, MaxHeapTuplesPerPage); - } - else - { - maxtuples = MaxHeapTuplesPerPage; - } + dead_tuples = (LVTidMap *) palloc(SizeOfLVTidMap + + sizeof(ItemPointerData) * (int) maxtuples); + dead_tuples->max_items = maxtuples; + dead_tuples->num_items = 0; + dead_tuples->shared = false; + dead_tuples->item_idx = 0; - vacrelstats->num_dead_tuples = 0; - vacrelstats->max_dead_tuples = (int) maxtuples; - vacrelstats->dead_tuples = (ItemPointer) - palloc(maxtuples * sizeof(ItemPointerData)); + lvstate->dead_tuples = dead_tuples; } /* * lazy_record_dead_tuple - remember one deletable tuple */ static void -lazy_record_dead_tuple(LVRelStats *vacrelstats, - ItemPointer itemptr) +lazy_record_dead_tuple(LVTidMap *dead_tuples, ItemPointer itemptr) { + if (dead_tuples->shared) + SpinLockAcquire(&(dead_tuples->mutex)); + /* * The array shouldn't overflow under normal behavior, but perhaps it * could if we are given a really small maintenance_work_mem. In that * case, just forget the last few tuples (we'll get 'em next time). */ - if (vacrelstats->num_dead_tuples < vacrelstats->max_dead_tuples) + if (dead_tuples->num_items < dead_tuples->max_items) { - vacrelstats->dead_tuples[vacrelstats->num_dead_tuples] = *itemptr; - vacrelstats->num_dead_tuples++; + dead_tuples->itemptrs[dead_tuples->num_items] = *itemptr; + (dead_tuples->num_items)++; pgstat_progress_update_param(PROGRESS_VACUUM_NUM_DEAD_TUPLES, - vacrelstats->num_dead_tuples); + dead_tuples->num_items); } + + if (dead_tuples->shared) + SpinLockRelease(&(dead_tuples->mutex)); } /* @@ -2145,14 +1906,14 @@ lazy_record_dead_tuple(LVRelStats *vacrelstats, * Assumes dead_tuples array is in sorted order. */ static bool -lazy_tid_reaped(ItemPointer itemptr, void *state) +lazy_tid_reaped(ItemPointer itemptr, void *dt) { - LVRelStats *vacrelstats = (LVRelStats *) state; + LVTidMap *dead_tuples = (LVTidMap *) dt; ItemPointer res; res = (ItemPointer) bsearch((void *) itemptr, - (void *) vacrelstats->dead_tuples, - vacrelstats->num_dead_tuples, + (void *) dead_tuples->itemptrs, + lazy_get_dead_tuple_count(dead_tuples), sizeof(ItemPointerData), vac_cmp_itemptr); @@ -2300,3 +2061,1255 @@ heap_page_is_all_visible(Relation rel, Buffer buf, return all_visible; } + +/* + * Perform single or parallel lazy vacuum. 
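+ *
+ * A rough sketch of the flow (see lazy_prepare_next_state for the actual
+ * transitions): the leader and any launched workers repeat
+ *
+ *    SCAN -> VACUUM_INDEX -> VACUUM_HEAP -> SCAN -> ...
+ *
+ * until the whole table has been scanned, then finish with
+ * CLEANUP_INDEX -> COMPLETED.  For example, VACUUM (PARALLEL 2) tbl runs
+ * this cycle with the leader plus up to two workers.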
+ */ +static void +lazy_scan_heap(LVState *lvstate) +{ + LVRelStats *vacrelstats = lvstate->vacrelstats; + LVLeader *lvleader = NULL; + Relation onerel = lvstate->relation; + bool isFinished = false; + int nworkers = 0; + BlockNumber nblocks; + char *relname; + StringInfoData buf; + PGRUsage ru0; + + /* Reset parallel vacuum worker stats */ + if (WorkerState) + WorkerState = NULL; + + relname = RelationGetRelationName(onerel); + + /* Plan the number of parallel workers */ + if ((lvstate->options.flags & VACOPT_PARALLEL) != 0) + nworkers = plan_lazy_vacuum_workers(RelationGetRelid(lvstate->relation), + lvstate->options.nworkers); + + if (nworkers > 0) + { + /* Set parallel context and attempt to launch parallel workers */ + lvleader = lazy_vacuum_begin_parallel(lvstate, nworkers); + } + else + { + /* Prepare dead tuple space for the single lazy scan heap */ + lazy_space_alloc(lvstate, RelationGetNumberOfBlocks(lvstate->relation)); + } + + pg_rusage_init(&ru0); + + if (lvstate->aggressive) + ereport(elevel, + (errmsg("aggressively vacuuming \"%s.%s\"", + get_namespace_name(RelationGetNamespace(onerel)), + relname))); + else + ereport(elevel, + (errmsg("vacuuming \"%s.%s\"", + get_namespace_name(RelationGetNamespace(onerel)), + relname))); + + nblocks = RelationGetNumberOfBlocks(onerel); + vacrelstats->rel_pages = nblocks; + + vacrelstats->scanned_pages = 0; + vacrelstats->tupcount_pages = 0; + vacrelstats->nonempty_pages = 0; + vacrelstats->empty_pages = 0; + vacrelstats->latestRemovedXid = InvalidTransactionId; + + lvstate->lvscan = lv_beginscan(onerel, lvstate->lvshared, lvstate->aggressive, + (lvstate->options.flags & VACOPT_DISABLE_PAGE_SKIPPING) != 0); + lvstate->indbulkstats = (IndexBulkDeleteResult **) + palloc0(lvstate->nindexes * sizeof(IndexBulkDeleteResult *)); + lvstate->frozen = palloc(sizeof(xl_heap_freeze_tuple) * MaxHeapTuplesPerPage); + + /* Do the actual lazy vacuum */ + while (!isFinished) + { + int ndeadtuples; + const int hvp_index[] = { + PROGRESS_VACUUM_PHASE, + PROGRESS_VACUUM_NUM_INDEX_VACUUMS + }; + int64 hvp_val[2]; + + /* + * Scan heap until the end of the table or dead tuple space is full if the + * table with indexes. + */ + ndeadtuples = do_lazy_scan_heap(lvstate, &isFinished); + + /* Reached the end of table with no garbage */ + if (isFinished && ndeadtuples == 0) + break; + + /* Log cleanup info before we touch indexes */ + vacuum_log_cleanup_info(onerel, vacrelstats); + + /* Prepare the index vacuum */ + lazy_prepare_next_state(lvstate, lvleader, VACSTATE_VACUUM_INDEX); + + /* Report that we are now vacuuming indexes */ + pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, + PROGRESS_VACUUM_PHASE_VACUUM_INDEX); + + /* Remove index entries */ + lazy_vacuum_all_indexes(lvstate); + + /* Prepare the heap vacuum */ + lazy_prepare_next_state(lvstate, lvleader, VACSTATE_VACUUM_HEAP); + + /* + * Report that we are now vacuuming the heap. We also increase + * the number of index scans here; note that by using + * pgstat_progress_update_multi_param we can update both + * parameters atomically. + */ + hvp_val[0] = PROGRESS_VACUUM_PHASE_VACUUM_HEAP; + hvp_val[1] = vacrelstats->num_index_scans + 1; + pgstat_progress_update_multi_param(2, hvp_index, hvp_val); + + /* Remove tuples from heap */ + lazy_vacuum_heap(onerel, lvstate); + + /* + * Vacuum the Free Space Map to make newly-freed space visible on + * upper-level FSM pages. Note we have not yet processed blkno. 
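+		 * (The range covered here is [next_fsm_block_to_vacuum,
+		 * current_block); FreeSpaceMapVacuumRange treats the end block
+		 * as exclusive.)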
+ */ + FreeSpaceMapVacuumRange(onerel, lvstate->next_fsm_block_to_vacuum, + lvstate->current_block); + lvstate->next_fsm_block_to_vacuum = lvstate->current_block; + + vacrelstats->num_index_scans++; + + if (!isFinished) + { + /* + * Prepare for the next heap scan. Forget the now-vacuumed tuples, + * and press on, but be careful not to reset latestRemovedXid since + * we want that value to be valid. + */ + lazy_prepare_next_state(lvstate, lvleader, VACSTATE_SCAN); + pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, + PROGRESS_VACUUM_PHASE_SCAN_HEAP); + } + else + pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, nblocks); + } + + /* report that everything is scanned and vacuumed */ + pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, nblocks); + + /* End heap scan */ + lv_endscan(lvstate->lvscan); + + pfree(lvstate->frozen); + + /* + * Vacuum the remainder of the Free Space Map. We must do this whether or + * not there were indexes. + */ + if (lvstate->current_block > lvstate->next_fsm_block_to_vacuum) + FreeSpaceMapVacuumRange(onerel, lvstate->next_fsm_block_to_vacuum, + lvstate->current_block); + + /* Prepare the cleanup index */ + lazy_prepare_next_state(lvstate, lvleader, VACSTATE_CLEANUP_INDEX); + + /* report all blocks vacuumed; and that we're cleaning up */ + pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, + PROGRESS_VACUUM_PHASE_INDEX_CLEANUP); + + /* Do post-vacuum cleanup and statistics update for each index */ + lazy_cleanup_all_indexes(lvstate); + + /* Shut down all vacuum workers */ + lazy_prepare_next_state(lvstate, lvleader, VACSTATE_COMPLETED); + + /* If no indexes, make log report that lazy_vacuum_heap would've made */ + if (vacrelstats->vacuumed_pages) + ereport(elevel, + (errmsg("\"%s\": removed %.0f row versions in %u pages", + RelationGetRelationName(onerel), + vacrelstats->tuples_deleted, vacrelstats->vacuumed_pages))); + + /* Finish parallel lazy vacuum and update index statistics */ + if (nworkers > 0) + lazy_vacuum_end_parallel(lvstate, lvleader, true); + + /* + * This is pretty messy, but we split it up so that we can skip emitting + * individual parts of the message when not applicable. + */ + initStringInfo(&buf); + appendStringInfo(&buf, + _("%.0f dead row versions cannot be removed yet, oldest xmin: %u\n"), + vacrelstats->new_dead_tuples, OldestXmin); + appendStringInfo(&buf, _("There were %.0f unused item pointers.\n"), + vacrelstats->unused_tuples); + appendStringInfo(&buf, ngettext("Skipped %u page due to buffer pins, ", + "Skipped %u pages due to buffer pins, ", + vacrelstats->pinskipped_pages), + vacrelstats->pinskipped_pages); + appendStringInfo(&buf, ngettext("%u frozen page.\n", + "%u frozen pages.\n", + vacrelstats->frozenskipped_pages), + vacrelstats->frozenskipped_pages); + appendStringInfo(&buf, ngettext("%u page is entirely empty.\n", + "%u pages are entirely empty.\n", + vacrelstats->empty_pages), + vacrelstats->empty_pages); + appendStringInfo(&buf, _("%s."), pg_rusage_show(&ru0)); + + ereport(elevel, + (errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u out of %u pages", + RelationGetRelationName(lvstate->relation), + vacrelstats->tuples_deleted, vacrelstats->num_tuples, + vacrelstats->scanned_pages, nblocks), + errdetail_internal("%s", buf.data))); + pfree(buf.data); +} + +/* + * Create parallel context, and launch workers for lazy vacuum. + * Also this function constructs the leader's lvstate. 
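+ *
+ * A sketch of the DSM layout (one toc entry per key; sizes are the
+ * estimates computed below):
+ *
+ *   VACUUM_KEY_SHARED        LVShared: options, cutoffs, parallel heap
+ *                            scan descriptor
+ *   VACUUM_KEY_VACUUM_STATS  per-worker LVRelStats array
+ *   VACUUM_KEY_WORKERS       LVWorkerState: per-worker pid/state slots
+ *   VACUUM_KEY_INDEX_STATS   LVIndStats (only if the table has indexes)
+ *   VACUUM_KEY_DEAD_TUPLES   LVTidMap (only if the table has indexes)
+ *   VACUUM_KEY_QUERY_TEXT    query string, when available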
+ */
+static LVLeader *
+lazy_vacuum_begin_parallel(LVState *lvstate, int request)
+{
+	LVLeader   *lvleader = palloc(sizeof(LVLeader));
+	ParallelContext *pcxt;
+	Size		estshared,
+				estvacstats,
+				estindstats,
+				estdt,
+				estworker;
+	LVRelStats *vacrelstats;
+	LVShared   *lvshared;
+	int			querylen;
+	int			keys = 0;
+	char	   *sharedquery;
+	long		maxtuples;
+	int			nparticipants = request + 1;
+	int			i;
+
+	EnterParallelMode();
+	pcxt = CreateParallelContext("postgres", "lazy_parallel_vacuum_main",
+								 request, true);
+	lvleader->pcxt = pcxt;
+
+	/* Calculate the maximum number of dead tuples we can store */
+	maxtuples = lazy_get_max_dead_tuples(lvstate->vacrelstats,
+										 RelationGetNumberOfBlocks(lvstate->relation));
+
+	/* Estimate size for shared state -- VACUUM_KEY_SHARED */
+	estshared = MAXALIGN(sizeof(LVShared));
+	shm_toc_estimate_chunk(&pcxt->estimator, estshared);
+	keys++;
+
+	/* Estimate size for vacuum statistics for workers only -- VACUUM_KEY_VACUUM_STATS */
+	estvacstats = MAXALIGN(mul_size(sizeof(LVRelStats), request));
+	shm_toc_estimate_chunk(&pcxt->estimator, estvacstats);
+	keys++;
+
+	/* Estimate size for parallel worker status including the leader -- VACUUM_KEY_WORKERS */
+	estworker = MAXALIGN(SizeOfLVWorkerState +
+						 mul_size(sizeof(VacuumWorker), request));
+	shm_toc_estimate_chunk(&pcxt->estimator, estworker);
+	keys++;
+
+	/* We need the dead tuple information only when the table has indexes */
+	if (lvstate->nindexes > 0)
+	{
+		/* Estimate size for index statistics -- VACUUM_KEY_INDEX_STATS */
+		estindstats = MAXALIGN(SizeOfLVIndStats +
+							   mul_size(sizeof(IndexStats), lvstate->nindexes));
+		shm_toc_estimate_chunk(&pcxt->estimator, estindstats);
+		keys++;
+
+		/* Estimate size for dead tuple control -- VACUUM_KEY_DEAD_TUPLES */
+		estdt = MAXALIGN(SizeOfLVTidMap +
+						 mul_size(sizeof(ItemPointerData), maxtuples));
+		shm_toc_estimate_chunk(&pcxt->estimator, estdt);
+		keys++;
+	}
+	else
+	{
+		/* Dead tuples are stored in local memory if the table has no indexes */
+		lazy_space_alloc(lvstate, RelationGetNumberOfBlocks(lvstate->relation));
+		lvstate->indstats = NULL;
+	}
+
+	shm_toc_estimate_keys(&pcxt->estimator, keys);
+
+	/*
+	 * Finally, estimate VACUUM_KEY_QUERY_TEXT space. Autovacuum doesn't have
+	 * a debug_query_string.
+	 */
+	if (debug_query_string)
+	{
+		querylen = strlen(debug_query_string);
+		shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
+	}
+
+	InitializeParallelDSM(pcxt);
+
+	/*
+	 * Initialize the dynamic shared memory for parallel lazy vacuum. We store
+	 * relevant information for the parallel heap scan, the dead tuple array
+	 * and the vacuum statistics for each worker, plus some parameters for
+	 * lazy vacuum.
+	 */
+	lvshared = shm_toc_allocate(pcxt->toc, estshared);
+	lvshared->relid = lvstate->relid;
+	lvshared->aggressive = lvstate->aggressive;
+	lvshared->options = lvstate->options;
+	lvshared->oldestXmin = OldestXmin;
+	lvshared->freezeLimit = FreezeLimit;
+	lvshared->multiXactCutoff = MultiXactCutoff;
+	lvshared->elevel = elevel;
+	lvshared->is_wraparound = lvstate->is_wraparound;
+	lvshared->cost_delay = VacuumCostDelay;
+	lvshared->cost_limit = VacuumCostLimit;
+	lvshared->max_dead_tuples_per_worker = maxtuples / nparticipants;
+	heap_parallelscan_initialize(&lvshared->heapdesc, lvstate->relation, SnapshotAny);
+	shm_toc_insert(pcxt->toc, VACUUM_KEY_SHARED, lvshared);
+	lvstate->lvshared = lvshared;
+
+	/* Prepare vacuum relation statistics */
+	vacrelstats = (LVRelStats *) shm_toc_allocate(pcxt->toc, estvacstats);
+	for (i = 0; i < request; i++)
+		memcpy(&vacrelstats[i], lvstate->vacrelstats, sizeof(LVRelStats));
+	shm_toc_insert(pcxt->toc, VACUUM_KEY_VACUUM_STATS, vacrelstats);
+	lvleader->allrelstats = vacrelstats;
+
+	/* Prepare worker status */
+	WorkerState = (LVWorkerState *) shm_toc_allocate(pcxt->toc, estworker);
+	ConditionVariableInit(&WorkerState->cv);
+	LWLockInitialize(&WorkerState->vacuumlock, LWTRANCHE_PARALLEL_VACUUM);
+	WorkerState->nparticipantvacuum = request;
+	for (i = 0; i < request; i++)
+	{
+		VacuumWorker *worker = &(WorkerState->workers[i]);
+
+		worker->pid = InvalidPid;
+		worker->state = VACSTATE_INVALID;	/* initial state */
+		SpinLockInit(&worker->mutex);
+	}
+	shm_toc_insert(pcxt->toc, VACUUM_KEY_WORKERS, WorkerState);
+
+	/* Prepare index statistics and dead tuple space if the table has indexes */
+	if (lvstate->nindexes > 0)
+	{
+		LVIndStats *indstats;
+		LVTidMap   *dead_tuples;
+
+		/* Prepare index statistics */
+		indstats = shm_toc_allocate(pcxt->toc, estindstats);
+		indstats->nindexes = lvstate->nindexes;
+		indstats->nprocessed = 0;
+		MemSet(indstats->stats, 0, sizeof(IndexStats) * indstats->nindexes);
+		SpinLockInit(&indstats->mutex);
+		shm_toc_insert(pcxt->toc, VACUUM_KEY_INDEX_STATS, indstats);
+		lvstate->indstats = indstats;
+
+		/* Prepare the shared dead tuple space */
+		dead_tuples = (LVTidMap *) shm_toc_allocate(pcxt->toc, estdt);
+		dead_tuples->max_items = maxtuples;
+		dead_tuples->num_items = 0;
+		dead_tuples->item_idx = 0;
+		dead_tuples->shared = true;
+		SpinLockInit(&dead_tuples->mutex);
+		shm_toc_insert(pcxt->toc, VACUUM_KEY_DEAD_TUPLES, dead_tuples);
+		lvstate->dead_tuples = dead_tuples;
+	}
+
+	/* Store the query string for workers */
+	if (debug_query_string)
+	{
+		sharedquery = shm_toc_allocate(pcxt->toc, querylen + 1);
+		memcpy(sharedquery, debug_query_string, querylen + 1);
+		shm_toc_insert(pcxt->toc, VACUUM_KEY_QUERY_TEXT, sharedquery);
+	}
+
+	/* Report ourselves as the leader process */
+	pgstat_report_leader_pid(MyProcPid);
+
+	/* Launch workers */
+	LaunchParallelWorkers(pcxt);
+
+	if (pcxt->nworkers_launched == 0)
+	{
+		lazy_vacuum_end_parallel(lvstate, lvleader, false);
+		pfree(lvleader);
+		return NULL;
+	}
+
+	/* Update the number of workers participating */
+	WorkerState->nparticipantvacuum_launched = pcxt->nworkers_launched;
+
+	lazy_wait_for_vacuum_workers_attach(pcxt);
+
+	return lvleader;
+}
+
+/*
+ * Wait for all workers to finish, and exit parallel vacuum. If update_stats
+ * is true, gather the vacuum statistics of all parallel workers and
+ * update the index statistics.
+ */
+static void
+lazy_vacuum_end_parallel(LVState *lvstate, LVLeader *lvleader, bool update_stats)
+{
+	IndexStats *copied_indstats = NULL;
+
+	if (update_stats)
+	{
+		/* Copy the index stats before destroying the parallel context */
+		copied_indstats = palloc(sizeof(IndexStats) * lvstate->nindexes);
+		memcpy(copied_indstats, lvstate->indstats->stats,
+			   sizeof(IndexStats) * lvstate->nindexes);
+	}
+
+	/* Wait for the workers to finish vacuuming */
+	WaitForParallelWorkersToFinish(lvleader->pcxt);
+
+	/* End parallel mode */
+	DestroyParallelContext(lvleader->pcxt);
+	ExitParallelMode();
+
+	/*
+	 * Since we cannot do any updates in parallel mode, we update the index
+	 * statistics after exiting parallel mode.
+	 */
+	if (update_stats)
+	{
+		int			i;
+
+		for (i = 0; i < lvstate->nindexes; i++)
+		{
+			Relation	ind = lvstate->indRels[i];
+			IndexStats *istat = (IndexStats *) &(copied_indstats[i]);
+
+			/* Update index statistics */
+			if (istat->need_update)
+				vac_update_relstats(ind,
+									istat->num_pages,
+									istat->num_tuples,
+									0,
+									false,
+									InvalidTransactionId,
+									InvalidMultiXactId,
+									false);
+		}
+	}
+
+	/* Reset shared fields */
+	lvstate->indstats = NULL;
+	lvstate->dead_tuples = NULL;
+	WorkerState = NULL;
+}
+
+/*
+ * lazy_gather_worker_stats() -- Gather vacuum statistics from workers
+ */
+static void
+lazy_gather_worker_stats(LVLeader *lvleader, LVRelStats *vacrelstats)
+{
+	int			i;
+
+	if (!IsInParallelMode())
+		return;
+
+	/* Gather worker stats */
+	for (i = 0; i < (WorkerState->nparticipantvacuum_launched); i++)
+	{
+		LVRelStats *wstats = (LVRelStats *) &lvleader->allrelstats[i];
+
+		vacrelstats->scanned_pages += wstats->scanned_pages;
+		vacrelstats->pinskipped_pages += wstats->pinskipped_pages;
+		vacrelstats->frozenskipped_pages += wstats->frozenskipped_pages;
+		vacrelstats->tupcount_pages += wstats->tupcount_pages;
+		vacrelstats->empty_pages += wstats->empty_pages;
+		vacrelstats->vacuumed_pages += wstats->vacuumed_pages;
+		vacrelstats->num_tuples += wstats->num_tuples;
+		vacrelstats->live_tuples += wstats->live_tuples;
+		vacrelstats->tuples_deleted += wstats->tuples_deleted;
+		vacrelstats->unused_tuples += wstats->unused_tuples;
+		vacrelstats->pages_removed += wstats->pages_removed;
+		vacrelstats->new_dead_tuples += wstats->new_dead_tuples;
+		vacrelstats->new_live_tuples += wstats->new_live_tuples;
+		vacrelstats->nonempty_pages += wstats->nonempty_pages;
+	}
+}
+
+/*
+ * Return the maximum number of dead tuples that can be stored according
+ * to vac_work_mem.
+ */
+static long
+lazy_get_max_dead_tuples(LVRelStats *vacrelstats, BlockNumber relblocks)
+{
+	long		maxtuples;
+	int			vac_work_mem = IsAutoVacuumWorkerProcess() &&
+	autovacuum_work_mem != -1 ?
+	autovacuum_work_mem : maintenance_work_mem;
+
+	if (vacrelstats->hasindex)
+	{
+		maxtuples = (vac_work_mem * 1024L) / sizeof(ItemPointerData);
+		maxtuples = Min(maxtuples, INT_MAX);
+		maxtuples = Min(maxtuples, MaxAllocSize / sizeof(ItemPointerData));
+
+		/* curious coding here to ensure the multiplication can't overflow */
+		if ((BlockNumber) (maxtuples / LAZY_ALLOC_TUPLES) > relblocks)
+			maxtuples = relblocks * LAZY_ALLOC_TUPLES;
+
+		/* stay sane if small maintenance_work_mem */
+		maxtuples = Max(maxtuples, MaxHeapTuplesPerPage);
+	}
+	else
+		maxtuples = MaxHeapTuplesPerPage;
+
+	return maxtuples;
+}
+
+/*
+ * lazy_prepare_next_state
+ *
+ * Prepare for the next state before entering it. In parallel lazy vacuum,
+ * we must wait for all vacuum workers to finish the previous state before
+ * the preparation.
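+ * For example (an illustrative case), the shared dead tuple array is
+ * sorted for index vacuum only after every worker has reported
+ * VACSTATE_WORKER_DONE for the scan phase, so no worker can still be
+ * appending TIDs while the leader sorts the array.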
+ * Also, after the preparation we change the state of all vacuum workers
+ * and wake them up.
+ */
+static void
+lazy_prepare_next_state(LVState *lvstate, LVLeader *lvleader, int next_state)
+{
+	/* Wait for the vacuum workers to finish the previous state */
+	if (IsInParallelMode())
+		lazy_wait_for_vacuum_workers_to_be_done();
+
+	switch (next_state)
+	{
+		/*
+		 * Do the preparation work before entering the next state. Since we
+		 * can guarantee that no vacuum worker touches or modifies the
+		 * parallel vacuum shared data during the preparation, we don't need
+		 * to take any lazy-vacuum-related locks to modify the shared data.
+		 */
+		case VACSTATE_SCAN:
+			{
+				LVTidMap *dead_tuples = lvstate->dead_tuples;
+
+				/* Before scanning the heap, clear the dead tuples */
+				MemSet(dead_tuples->itemptrs, 0,
+					   sizeof(ItemPointerData) * dead_tuples->max_items);
+				dead_tuples->num_items = 0;
+				dead_tuples->item_idx = 0;
+				dead_tuples->vacuumed_pages = 0;
+				break;
+			}
+		case VACSTATE_VACUUM_INDEX:
+			{
+				LVTidMap *dead_tuples = lvstate->dead_tuples;
+
+				/* Before vacuuming indexes, sort the dead tuple array */
+				qsort((void *) dead_tuples->itemptrs,
+					  dead_tuples->num_items,
+					  sizeof(ItemPointerData), vac_cmp_itemptr);
+
+				/* Reset the processed counter of index vacuum */
+				if (lvstate->indstats)
+					lvstate->indstats->nprocessed = 0;
+				break;
+			}
+		case VACSTATE_CLEANUP_INDEX:
+			{
+				LVRelStats *vacrelstats = lvstate->vacrelstats;
+
+				/* Gather the vacuum statistics of all vacuum workers */
+				lazy_gather_worker_stats(lvleader, lvstate->vacrelstats);
+
+				/* now we can compute the new value for pg_class.reltuples */
+				vacrelstats->new_live_tuples = vac_estimate_reltuples(lvstate->relation,
+																	  vacrelstats->rel_pages,
+																	  vacrelstats->tupcount_pages,
+																	  vacrelstats->live_tuples);
+
+				/* also compute total number of surviving heap entries */
+				vacrelstats->new_rel_tuples =
+					vacrelstats->new_live_tuples + vacrelstats->new_dead_tuples;
+
+				/* Reset the processed counter of index vacuum */
+				if (lvstate->indstats)
+					lvstate->indstats->nprocessed = 0;
+				break;
+			}
+		case VACSTATE_COMPLETED:
+		case VACSTATE_VACUUM_HEAP:
+			/* No preparation work is needed before vacuuming the heap or before exiting */
+			break;
+		case VACSTATE_INVALID:
+			elog(ERROR, "unexpected vacuum state %d", next_state);
+			break;
+		default:
+			elog(ERROR, "invalid vacuum state %d", next_state);
+	}
+
+	/* Advance to the next state and wake up the vacuum workers */
+	if (IsInParallelMode())
+	{
+		lazy_set_workers_state(next_state);
+		ConditionVariableBroadcast(&WorkerState->cv);
+	}
+}
+
+/*
+ * lazy_dead_tuples_is_full - is the dead tuple space full?
+ *
+ * Return true if the dead tuple space is full.
+ */
+static bool
+lazy_dead_tuples_is_full(LVTidMap *dead_tuples)
+{
+	bool		isfull;
+
+	if (dead_tuples->shared)
+		SpinLockAcquire(&(dead_tuples->mutex));
+
+	isfull = ((dead_tuples->num_items > 0) &&
+			  ((dead_tuples->max_items - dead_tuples->num_items) < MaxHeapTuplesPerPage));
+
+	if (dead_tuples->shared)
+		SpinLockRelease(&(dead_tuples->mutex));
+
+	return isfull;
+}
+
+/*
+ * lazy_get_dead_tuple_count
+ *
+ * Get the current number of collected dead tuples.
+ */
+static int
+lazy_get_dead_tuple_count(LVTidMap *dead_tuples)
+{
+	int			num_items;
+
+	if (dead_tuples->shared)
+		SpinLockAcquire(&dead_tuples->mutex);
+
+	num_items = dead_tuples->num_items;
+
+	if (dead_tuples->shared)
+		SpinLockRelease(&dead_tuples->mutex);
+
+	return num_items;
+}
+
+/*
+ * lazy_get_next_vacuum_page
+ *
+ * For vacuuming heap pages, return the next block number to vacuum, taken
+ * from the dead tuple space.  We also advance the dead tuple index until
+ * the next different block number appears, ready for the next call.
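+ * For example (an illustrative case), in the shared case, with collected
+ * TIDs (10,1), (10,3) and (11,2), the first call returns block 10 and
+ * advances item_idx past both entries for block 10.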
+ *
+ * NB: the dead_tuples array must be sorted in TID order.
+ */
+static BlockNumber
+lazy_get_next_vacuum_page(LVState *lvstate, int *tupindex_p, int *npages_p)
+{
+	LVTidMap   *dead_tuples = lvstate->dead_tuples;
+	BlockNumber tblk;
+	BlockNumber prev_tblk = InvalidBlockNumber;
+	BlockNumber vacuum_tblk;
+
+	Assert(tupindex_p != NULL && npages_p != NULL);
+
+	if (!dead_tuples->shared)
+	{
+		/* Reached the end of the dead tuples */
+		if (dead_tuples->item_idx >= dead_tuples->num_items)
+			return InvalidBlockNumber;
+
+		tblk = ItemPointerGetBlockNumber(&(dead_tuples->itemptrs[dead_tuples->item_idx]));
+		*tupindex_p = dead_tuples->item_idx++;
+		*npages_p = tblk;
+		return tblk;
+	}
+
+	/*
+	 * For parallel vacuum, we need locks.
+	 *
+	 * XXX: The maximum number of tuples we need to advance past is not
+	 * large, at most MaxHeapTuplesPerPage, so we use a spinlock here.
+	 */
+	if (dead_tuples->shared)
+		SpinLockAcquire(&(dead_tuples->mutex));
+
+	if (dead_tuples->item_idx >= dead_tuples->num_items)
+	{
+		/* Reached the end of the dead tuple array */
+		vacuum_tblk = InvalidBlockNumber;
+		*tupindex_p = dead_tuples->num_items;
+		*npages_p = dead_tuples->vacuumed_pages;
+		goto done;
+	}
+
+	/* Set the block number being vacuumed next */
+	vacuum_tblk = ItemPointerGetBlockNumber(&(dead_tuples->itemptrs[dead_tuples->item_idx]));
+
+	/* Set the output arguments */
+	*tupindex_p = dead_tuples->item_idx;
+	*npages_p = ++(dead_tuples->vacuumed_pages);
+
+	/* Advance the index to the beginning of the next different block */
+	while (dead_tuples->item_idx < dead_tuples->num_items)
+	{
+		tblk = ItemPointerGetBlockNumber(&(dead_tuples->itemptrs[dead_tuples->item_idx]));
+
+		if (BlockNumberIsValid(prev_tblk) && prev_tblk != tblk)
+			break;
+
+		prev_tblk = tblk;
+		dead_tuples->item_idx++;
+	}
+
+done:
+	if (dead_tuples->shared)
+		SpinLockRelease(&(dead_tuples->mutex));
+
+	return vacuum_tblk;
+}
+
+/*
+ * Vacuum all indexes. In parallel vacuum, each worker takes indexes one
+ * by one, and marks each index as done after vacuuming it. This marking
+ * is necessary to guarantee that all indexes are vacuumed based on the
+ * currently collected dead tuples. The leader process continues to
+ * vacuum even if an index is not vacuumed completely due to the failure
+ * of a parallel worker for whatever reason. The mark will be checked
+ * before entering the next state.
+ */
+void
+lazy_vacuum_all_indexes(LVState *lvstate)
+{
+	int			idx;
+	int			nprocessed = 0;
+	LVIndStats *sharedstats = lvstate->indstats;
+
+	/* Take the number of the index we vacuum */
+	if (IsInParallelMode())
+	{
+		Assert(sharedstats != NULL);
+		SpinLockAcquire(&(sharedstats->mutex));
+		idx = (sharedstats->nprocessed)++;
+		SpinLockRelease(&sharedstats->mutex);
+	}
+	else
+		idx = nprocessed++;
+
+	while (idx < lvstate->nindexes)
+	{
+		/* Remove index entries */
+		lazy_vacuum_index(lvstate->indRels[idx], &lvstate->indbulkstats[idx],
+						  lvstate->vacrelstats, lvstate->dead_tuples);
+
+		/* Take the number of the next index we vacuum */
+		if (IsInParallelMode())
+		{
+			SpinLockAcquire(&(sharedstats->mutex));
+			idx = (sharedstats->nprocessed)++;
+			SpinLockRelease(&sharedstats->mutex);
+		}
+		else
+			idx = nprocessed++;
+	}
+}
+
+/*
+ * Cleanup all indexes.
+ * This function is similar to lazy_vacuum_all_indexes.
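+ *
+ * For example, with a leader, two workers and five indexes, each
+ * participant claims the next index number from the shared nprocessed
+ * counter under the spinlock, so each of the five indexes is processed
+ * exactly once, by whichever participant becomes free first.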
+ */
+void
+lazy_cleanup_all_indexes(LVState *lvstate)
+{
+	int			idx;
+	int			nprocessed = 0;
+	LVIndStats *sharedstats = lvstate->indstats;
+
+	/* Return if there are no indexes */
+	if (lvstate->nindexes == 0)
+		return;
+
+	/* Get the target index number */
+	if (IsInParallelMode())
+	{
+		Assert(sharedstats != NULL);
+		SpinLockAcquire(&(sharedstats->mutex));
+		idx = (sharedstats->nprocessed)++;
+		SpinLockRelease(&sharedstats->mutex);
+	}
+	else
+		idx = nprocessed++;
+
+	while (idx < lvstate->nindexes)
+	{
+		/*
+		 * Do post-vacuum cleanup. Update statistics for each index if not
+		 * in parallel vacuum.
+		 */
+		lazy_cleanup_index(lvstate->indRels[idx],
+						   lvstate->indbulkstats[idx],
+						   lvstate->vacrelstats,
+						   (lvstate->indstats) ? &(sharedstats->stats[idx]) : NULL);
+
+		/* Get the next target index number */
+		if (IsInParallelMode())
+		{
+			SpinLockAcquire(&(sharedstats->mutex));
+			idx = (sharedstats->nprocessed)++;
+			SpinLockRelease(&sharedstats->mutex);
+		}
+		else
+			idx = nprocessed++;
+	}
+}
+
+/*
+ * Set the xid limits. This function is for parallel vacuum workers.
+ */
+void
+vacuum_set_xid_limits_for_worker(TransactionId oldestxmin, TransactionId freezelimit,
+								 MultiXactId multixactcutoff)
+{
+	OldestXmin = oldestxmin;
+	FreezeLimit = freezelimit;
+	MultiXactCutoff = multixactcutoff;
+}
+
+/*
+ * Set the error level during lazy vacuum for vacuum workers.
+ */
+void
+vacuum_set_elevel_for_worker(int worker_elevel)
+{
+	elevel = worker_elevel;
+}
+
+/*
+ * lazy_set_workers_state - set a new state for all parallel workers
+ */
+static void
+lazy_set_workers_state(VacWorkerState new_state)
+{
+	int			i;
+
+	Assert(!IsParallelWorker());
+
+	for (i = 0; i < WorkerState->nparticipantvacuum_launched; i++)
+	{
+		VacuumWorker *w = &WorkerState->workers[i];
+
+		SpinLockAcquire(&w->mutex);
+		if (!IsVacuumWorkerInvalid(w->pid))
+			w->state = new_state;
+		SpinLockRelease(&w->mutex);
+	}
+}
+
+/*
+ * Wait for the parallel vacuum workers to attach to both the shmem context
+ * and a worker slot. This is needed to ensure that the leader can see
+ * the states of all launched workers when checking them.
+ */
+static void
+lazy_wait_for_vacuum_workers_attach(ParallelContext *pcxt)
+{
+	int			i;
+
+	/* Wait for workers to attach to the shmem context */
+	WaitForParallelWorkersToAttach(pcxt);
+
+	/* Also, wait for workers to attach to the vacuum worker slots */
+	for (i = 0; i < pcxt->nworkers_launched; i++)
+	{
+		VacuumWorker *worker = &WorkerState->workers[i];
+		int			rc;
+
+		for (;;)
+		{
+			pid_t		pid;
+
+			CHECK_FOR_INTERRUPTS();
+
+			/*
+			 * If the worker stopped without attaching to the vacuum worker
+			 * slot, throw an error.
+			 */
+			if (IsVacuumWorkerStopped(worker->pid))
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("parallel vacuum worker failed to initialize")));
+
+			SpinLockAcquire(&worker->mutex);
+			pid = worker->pid;
+			SpinLockRelease(&worker->mutex);
+
+			/* The worker attached successfully */
+			if (pid != InvalidPid)
+				break;
+
+			rc = WaitLatch(MyLatch,
+						   WL_TIMEOUT | WL_POSTMASTER_DEATH,
+						   10L, WAIT_EVENT_PARALLEL_VACUUM_STARTUP);
+
+			if (rc & WL_POSTMASTER_DEATH)
+				return;
+
+			ResetLatch(MyLatch);
+		}
+	}
+}
+
+/*
+ * lazy_wait_for_vacuum_workers_to_be_done - have all workers finished the
+ * previous work?
+ *
+ * Wait for all parallel workers to change their states to
+ * VACSTATE_WORKER_DONE.
+ */
+static void
+lazy_wait_for_vacuum_workers_to_be_done(void)
+{
+	while (true)
+	{
+		int			i;
+		bool		all_finished = true;
+
+		CHECK_FOR_INTERRUPTS();
+
+		for (i = 0; i < WorkerState->nparticipantvacuum_launched; i++)
+		{
+			VacuumWorker *w = &WorkerState->workers[i];
+			pid_t		pid;
+			int			state;
+
+			SpinLockAcquire(&w->mutex);
+			pid = w->pid;
+			state = w->state;
+			SpinLockRelease(&w->mutex);
+
+			/* Skip unused slots */
+			if (IsVacuumWorkerInvalid(pid))
+				continue;
+
+			if (state != VACSTATE_WORKER_DONE)
+			{
+				/* Not finished */
+				all_finished = false;
+				break;
+			}
+		}
+
+		/* All vacuum workers are done */
+		if (all_finished)
+			break;
+
+		ConditionVariableSleep(&WorkerState->cv, WAIT_EVENT_PARALLEL_VACUUM);
+	}
+
+	ConditionVariableCancelSleep();
+}
+
+/*
+ * lv_beginscan() -- begin lazy vacuum heap scan
+ *
+ * In parallel vacuum we use a parallel heap scan, so also initialize the
+ * parallel heap scan descriptor.
+ */
+LVScanDesc
+lv_beginscan(Relation onerel, LVShared *lvshared, bool aggressive,
+			 bool disable_page_skipping)
+{
+	LVScanDesc	lvscan = (LVScanDesc) palloc(sizeof(LVScanDescData));
+
+	/* Scan target relation */
+	lvscan->lv_rel = onerel;
+	lvscan->lv_nblocks = RelationGetNumberOfBlocks(onerel);
+
+	/* Set scan options */
+	lvscan->aggressive = aggressive;
+	lvscan->disable_page_skipping = disable_page_skipping;
+
+	/* Initialize other fields */
+	lvscan->lv_heapscan = NULL;
+	lvscan->lv_cblock = 0;
+	lvscan->lv_next_unskippable_block = 0;
+
+	/* For parallel lazy vacuum */
+	if (lvshared)
+	{
+		Assert(!IsBootstrapProcessingMode());
+		lvscan->lv_heapscan = heap_beginscan_parallel(onerel, &lvshared->heapdesc);
+		heap_parallelscan_startblock_init(lvscan->lv_heapscan);
+	}
+
+	return lvscan;
+}
+
+/*
+ * lv_endscan() -- end lazy vacuum heap scan
+ */
+void
+lv_endscan(LVScanDesc lvscan)
+{
+	if (lvscan->lv_heapscan != NULL)
+		heap_endscan(lvscan->lv_heapscan);
+	pfree(lvscan);
+}
+
+/*
+ * Return the block number we need to scan next, or InvalidBlockNumber if
+ * the scan is finished.
+ *
+ * Except when aggressive is set, we want to skip pages that are
+ * all-visible according to the visibility map, but only when we can skip
+ * at least SKIP_PAGES_THRESHOLD consecutive pages.  Since we're reading
+ * sequentially, the OS should be doing readahead for us, so there's no
+ * gain in skipping a page now and then; that's likely to disable
+ * readahead and so be counterproductive. Also, skipping even a single
+ * page means that we can't update relfrozenxid, so we only want to do it
+ * if we can skip a goodly number of pages.
+ *
+ * When aggressive is set, we can't skip pages just because they are
+ * all-visible, but we can still skip pages that are all-frozen, since
+ * such pages do not need freezing and do not affect the value that we can
+ * safely set for relfrozenxid or relminmxid.
+ *
+ * In a single lazy scan, before entering the main loop, we establish the
+ * invariant that next_unskippable_block is the next block number >= blkno
+ * that we can't skip based on the visibility map, either all-visible for
+ * a regular scan or all-frozen for an aggressive scan.  We set it to
+ * nblocks if there's no such block.
+ * We also set up the skipping_blocks flag correctly at this stage.
+ *
+ * In a parallel lazy scan, we scan heap pages using the parallel heap
+ * scan. Each worker calls heap_parallelscan_nextpage() in order to get
+ * the next block number exclusively. Unlike the single-process lazy scan,
+ * we skip all-visible blocks immediately.
+ *
+ * Note: The value returned by visibilitymap_get_status could be slightly
+ * out-of-date, since we make this test before reading the corresponding
+ * heap page or locking the buffer.  This is OK.  If we mistakenly think
+ * that the page is all-visible or all-frozen when in fact the flag's just
+ * been cleared, we might fail to vacuum the page.  It's easy to see that
+ * skipping a page when aggressive is not set is not a very big deal; we
+ * might leave some dead tuples lying around, but the next vacuum will
+ * find them.  But even when aggressive *is* set, it's still OK if we miss
+ * a page whose all-frozen marking has just been cleared.  Any new XIDs
+ * just added to that page are necessarily newer than the GlobalXmin we
+ * computed, so they'll have no effect on the value to which we can safely
+ * set relfrozenxid.  A similar argument applies for MXIDs and relminmxid.
+ *
+ * We will scan the table's last page, at least to the extent of
+ * determining whether it has tuples or not, even if it should be skipped
+ * according to the above rules; except when we've already determined that
+ * it's not worth trying to truncate the table.  This avoids having
+ * lazy_truncate_heap() take access-exclusive lock on the table to attempt
+ * a truncation that just fails immediately because there are tuples in
+ * the last page.  This is worth avoiding mainly because such a lock must
+ * be replayed on any hot standby, where it can be disruptive.
+ */
+static BlockNumber
+lazy_scan_get_nextpage(LVScanDesc lvscan, LVRelStats *vacrelstats,
+					   bool *all_visible_according_to_vm_p, Buffer *vmbuffer_p)
+{
+	BlockNumber blkno;
+
+	/* Parallel lazy scan mode */
+	if (lvscan->lv_heapscan)
+	{
+		/*
+		 * In parallel lazy vacuum, since we cannot know how many
+		 * consecutive all-visible pages exist in the table, we decide for
+		 * each page immediately whether to skip it.
+ */ + while ((blkno = heap_parallelscan_nextpage(lvscan->lv_heapscan)) != InvalidBlockNumber) + { + *all_visible_according_to_vm_p = false; + vacuum_delay_point(); + + pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, blkno); + + /* Consider to skip scanning the page according visibility map */ + if (!lvscan->disable_page_skipping && + !FORCE_CHECK_PAGE(vacrelstats->rel_pages, blkno, vacrelstats)) + { + uint8 vmstatus; + + vmstatus = visibilitymap_get_status(lvscan->lv_rel, blkno, vmbuffer_p); + + if (lvscan->aggressive) + { + if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) != 0) + { + vacrelstats->frozenskipped_pages++; + continue; + } + else if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) != 0) + *all_visible_according_to_vm_p = true; + } + else + { + if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) != 0) + { + if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) != 0) + vacrelstats->frozenskipped_pages++; + continue; + } + } + } + + /* Okay, need to scan current blkno, break */ + break; + } + } + else /* Single lazy scan mode */ + { + bool skipping_blocks = false; + + pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, lvscan->lv_cblock); + + /* Initialize lv_nextunskippable_page if needed */ + if (lvscan->lv_cblock == 0 && !lvscan->disable_page_skipping) + { + while (lvscan->lv_next_unskippable_block < lvscan->lv_nblocks) + { + uint8 vmstatus; + + vmstatus = visibilitymap_get_status(lvscan->lv_rel, + lvscan->lv_next_unskippable_block, + vmbuffer_p); + if (lvscan->aggressive) + { + if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) == 0) + break; + } + else + { + if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) == 0) + break; + } + vacuum_delay_point(); + lvscan->lv_next_unskippable_block++; + } + + if (lvscan->lv_next_unskippable_block >= SKIP_PAGES_THRESHOLD) + skipping_blocks = true; + else + skipping_blocks = false; + } + + /* Decide the block number we need to scan */ + for (blkno = lvscan->lv_cblock; blkno < lvscan->lv_nblocks; blkno++) + { + if (blkno == lvscan->lv_next_unskippable_block) + { + /* Time to advance next_unskippable_block */ + lvscan->lv_next_unskippable_block++; + if (!lvscan->disable_page_skipping) + { + while (lvscan->lv_next_unskippable_block < lvscan->lv_nblocks) + { + uint8 vmstatus; + + vmstatus = visibilitymap_get_status(lvscan->lv_rel, + lvscan->lv_next_unskippable_block, + vmbuffer_p); + if (lvscan->aggressive) + { + if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) == 0) + break; + } + else + { + if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) == 0) + break; + } + vacuum_delay_point(); + lvscan->lv_next_unskippable_block++; + } + } + + /* + * We know we can't skip the current block. But set up + * skipping_all_visible_blocks to do the right thing at the + * following blocks. + */ + if (lvscan->lv_next_unskippable_block - blkno > SKIP_PAGES_THRESHOLD) + skipping_blocks = true; + else + skipping_blocks = false; + + /* + * Normally, the fact that we can't skip this block must mean that + * it's not all-visible. But in an aggressive vacuum we know only + * that it's not all-frozen, so it might still be all-visible. + */ + if (lvscan->aggressive && VM_ALL_VISIBLE(lvscan->lv_rel, blkno, vmbuffer_p)) + *all_visible_according_to_vm_p = true; + + /* Found out that next unskippable block number */ + break; + } + else + { + /* + * The current block is potentially skippable; if we've seen a + * long enough run of skippable blocks to justify skipping it, and + * we're not forced to check it, then go ahead and skip. 
+			 * Otherwise, the page must be at least all-visible if not
+			 * all-frozen, so we can set *all_visible_according_to_vm_p = true.
+			 */
+			if (skipping_blocks &&
+				!FORCE_CHECK_PAGE(vacrelstats->rel_pages, blkno, vacrelstats))
+			{
+				/*
+				 * Tricky, tricky.  If this is in aggressive vacuum, the page
+				 * must have been all-frozen at the time we checked whether it
+				 * was skippable, but it might not be any more.  We must be
+				 * careful to count it as a skipped all-frozen page in that
+				 * case, or else we'll think we can't update relfrozenxid and
+				 * relminmxid.  If it's not an aggressive vacuum, we don't
+				 * know whether it was all-frozen, so we have to recheck; but
+				 * in this case an approximate answer is OK.
+				 */
+				if (lvscan->aggressive || VM_ALL_FROZEN(lvscan->lv_rel, blkno, vmbuffer_p))
+					vacrelstats->frozenskipped_pages++;
+				continue;
+			}
+
+			*all_visible_according_to_vm_p = true;
+
+			/* We need to scan the current blkno, break */
+			break;
+			}
+		}						/* for */
+
+		/* Advance the current block number for the next scan */
+		lvscan->lv_cblock = blkno + 1;
+	}
+
+	return (blkno == lvscan->lv_nblocks) ? InvalidBlockNumber : blkno;
+}
diff --git a/src/backend/commands/vacuumworker.c b/src/backend/commands/vacuumworker.c
new file mode 100644
index 0000000..ccdc7b1
--- /dev/null
+++ b/src/backend/commands/vacuumworker.c
@@ -0,0 +1,327 @@
+/*-------------------------------------------------------------------------
+ *
+ * vacuumworker.c
+ *	  Parallel lazy vacuum worker.
+ *
+ * A parallel vacuum worker is a background process that assists lazy
+ * vacuum. It waits for its state to be changed by the vacuum leader
+ * process, and after finishing the work for a state it marks its own
+ * state as done. Normal termination is also requested by the leader
+ * process.
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/commands/vacuumworker.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/parallel.h"
+#include "access/xact.h"
+#include "commands/vacuum.h"
+#include "commands/vacuum_internal.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
+#include "storage/ipc.h"
+#include "tcop/tcopprot.h"
+
+static VacuumWorker *MyVacuumWorker = NULL;
+
+/* Parallel vacuum worker function prototypes */
+static void lvworker_set_state(VacWorkerState new_state);
+static VacWorkerState lvworker_get_state(void);
+static void lvworker_mainloop(LVState *lvstate);
+static void lvworker_wait_for_next_work(void);
+static void lvworker_attach(void);
+static void lvworker_detach(void);
+static void lvworker_onexit(int code, Datum arg);
+
+/*
+ * Perform work within a launched parallel process.
+ */
+void
+lazy_parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
+{
+	LVState    *lvstate = (LVState *) palloc(sizeof(LVState));
+	LVShared   *lvshared;
+	LVRelStats *vacrelstats;
+	char	   *sharedquery;
+
+	ereport(DEBUG1,
+			(errmsg("starting parallel lazy vacuum worker")));
+
+	/* Register the callback function */
+	before_shmem_exit(lvworker_onexit, (Datum) 0);
+
+	/* Look up the worker state and attach to a vacuum worker slot */
+	WorkerState = (LVWorkerState *) shm_toc_lookup(toc, VACUUM_KEY_WORKERS, false);
+	lvworker_attach();
+
+	/* Set the shared state */
+	lvshared = (LVShared *) shm_toc_lookup(toc, VACUUM_KEY_SHARED, false);
+
+	/*
+	 * Set debug_query_string.
+	 * It is not available in the autovacuum case.
+	 */
+	sharedquery = shm_toc_lookup(toc, VACUUM_KEY_QUERY_TEXT, true);
+	if (sharedquery)
+	{
+		debug_query_string = sharedquery;
+		pgstat_report_activity(STATE_RUNNING, debug_query_string);
+	}
+	else
+		pgstat_report_activity(STATE_RUNNING, lvshared->is_wraparound ?
+							   "autovacuum: parallel worker (to prevent wraparound)" :
+							   "autovacuum: parallel worker");
+
+	/* Set the individual vacuum statistics */
+	vacrelstats = (LVRelStats *) shm_toc_lookup(toc, VACUUM_KEY_VACUUM_STATS, false);
+
+	/* Set the lazy vacuum state */
+	lvstate->relid = lvshared->relid;
+	lvstate->aggressive = lvshared->aggressive;
+	lvstate->options = lvshared->options;
+	lvstate->vacrelstats = vacrelstats + ParallelWorkerNumber;
+	lvstate->relation = relation_open(lvstate->relid, ShareUpdateExclusiveLock);
+	vac_open_indexes(lvstate->relation, RowExclusiveLock, &lvstate->nindexes,
+					 &lvstate->indRels);
+	lvstate->lvshared = lvshared;
+	lvstate->indstats = NULL;
+	lvstate->dead_tuples = NULL;
+
+	/*
+	 * Set the PROC_IN_VACUUM flag, which lets other concurrent VACUUMs know that
+	 * they can ignore this one while determining their OldestXmin. Also set the
+	 * PROC_VACUUM_FOR_WRAPAROUND flag. Please see the comment in vacuum_rel for
+	 * details.
+	 */
+	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+	MyPgXact->vacuumFlags |= PROC_IN_VACUUM;
+	if (lvshared->is_wraparound)
+		MyPgXact->vacuumFlags |= PROC_VACUUM_FOR_WRAPAROUND;
+	LWLockRelease(ProcArrayLock);
+
+	/* Set the space for both index statistics and dead tuples if the table has indexes */
+	if (lvstate->nindexes > 0)
+	{
+		LVTidMap   *dead_tuples;
+		LVIndStats *indstats;
+
+		/* Attach the shared dead tuple space */
+		dead_tuples = (LVTidMap *) shm_toc_lookup(toc, VACUUM_KEY_DEAD_TUPLES, false);
+		lvstate->dead_tuples = dead_tuples;
+
+		/* Attach the shared index stats */
+		indstats = (LVIndStats *) shm_toc_lookup(toc, VACUUM_KEY_INDEX_STATS, false);
+		lvstate->indstats = indstats;
+
+		/* Prepare for index bulkdelete */
+		lvstate->indbulkstats = (IndexBulkDeleteResult **)
+			palloc0(lvstate->nindexes * sizeof(IndexBulkDeleteResult *));
+	}
+	else
+	{
+		/* Dead tuples are stored in local memory if the table has no indexes */
+		lazy_space_alloc(lvstate, RelationGetNumberOfBlocks(lvstate->relation));
+		lvstate->indstats = NULL;
+	}
+
+	/* Restore the vacuum xid limits and elevel */
+	vacuum_set_xid_limits_for_worker(lvshared->oldestXmin, lvshared->freezeLimit,
+									 lvshared->multiXactCutoff);
+	vacuum_set_elevel_for_worker(lvshared->elevel);
+
+	pgstat_progress_start_command(PROGRESS_COMMAND_VACUUM,
+								  lvshared->relid);
+
+	/* Restore the vacuum delay */
+	VacuumCostDelay = lvshared->cost_delay;
+	VacuumCostLimit = lvshared->cost_limit;
+	VacuumCostActive = (VacuumCostDelay > 0);
+	VacuumCostBalance = 0;
+	VacuumPageHit = 0;
+	VacuumPageMiss = 0;
+	VacuumPageDirty = 0;
+
+	/* Begin the lazy heap scan */
+	lvstate->lvscan = lv_beginscan(lvstate->relation, lvstate->lvshared, lvshared->aggressive,
+								   (lvstate->options.flags & VACOPT_DISABLE_PAGE_SKIPPING) != 0);
+
+	/* Prepare other fields */
+	lvstate->frozen = palloc(sizeof(xl_heap_freeze_tuple) * MaxHeapTuplesPerPage);
+
+	/* Enter the main loop */
+	lvworker_mainloop(lvstate);
+
+	/* The lazy vacuum is done; do the post-processing */
+	lv_endscan(lvstate->lvscan);
+	pgstat_progress_end_command();
+	lvworker_detach();
+	cancel_before_shmem_exit(lvworker_onexit, (Datum) 0);
+
+	vac_close_indexes(lvstate->nindexes, lvstate->indRels, RowExclusiveLock);
+	heap_close(lvstate->relation, ShareUpdateExclusiveLock);
+}
+
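+/*
+ * A sketch of the leader/worker handshake (assuming the workers launch
+ * successfully):
+ *
+ *   leader                                worker
+ *   ------                                ------
+ *   LaunchParallelWorkers()        --->   lazy_parallel_vacuum_main()
+ *   lazy_wait_for_vacuum_workers_attach()
+ *                                  <---   lvworker_attach(), state = SCAN
+ *   do_lazy_scan_heap()                   do_lazy_scan_heap()
+ *   lazy_wait_for_vacuum_workers_to_be_done()
+ *                                  <---   lvworker_set_state(WORKER_DONE)
+ *   lazy_prepare_next_state(), broadcast
+ *                                  --->   lvworker_wait_for_next_work()
+ */
+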
+/*
+ * Main loop for vacuum workers.
+ */
+static void
+lvworker_mainloop(LVState *lvstate)
+{
+	bool		exit = false;
+
+	/*
+	 * Loop until the leader tells us to exit.
+	 */
+	while (!exit)
+	{
+		VacWorkerState mystate;
+
+		/* Wait for the state to be changed by the leader */
+		lvworker_wait_for_next_work();
+
+		/* Get my new state */
+		mystate = lvworker_get_state();
+
+		/* Dispatch the work according to the state */
+		switch (mystate)
+		{
+			case VACSTATE_SCAN:
+				{
+					bool		dummy;
+
+					do_lazy_scan_heap(lvstate, &dummy);
+					break;
+				}
+			case VACSTATE_VACUUM_INDEX:
+				{
+					lazy_vacuum_all_indexes(lvstate);
+					break;
+				}
+			case VACSTATE_VACUUM_HEAP:
+				{
+					lazy_vacuum_heap(lvstate->relation, lvstate);
+					break;
+				}
+			case VACSTATE_CLEANUP_INDEX:
+				{
+					lazy_cleanup_all_indexes(lvstate);
+					break;
+				}
+			case VACSTATE_COMPLETED:
+				{
+					/* The leader asked us to exit */
+					exit = true;
+					break;
+				}
+			case VACSTATE_INVALID:
+			case VACSTATE_WORKER_DONE:
+				{
+					elog(ERROR, "unexpected vacuum state %d", mystate);
+					break;
+				}
+		}
+
+		/* Mark my state as done after finishing the work */
+		lvworker_set_state(VACSTATE_WORKER_DONE);
+	}
+}
+
+/*
+ * Wait for my state to be changed by the vacuum leader.
+ */
+static void
+lvworker_wait_for_next_work(void)
+{
+	VacWorkerState mystate;
+
+	for (;;)
+	{
+		mystate = lvworker_get_state();
+
+		/* Got the next valid state from the vacuum leader */
+		if (mystate != VACSTATE_WORKER_DONE && mystate != VACSTATE_INVALID)
+			break;
+
+		/* Sleep until the next notification */
+		ConditionVariableSleep(&WorkerState->cv, WAIT_EVENT_PARALLEL_VACUUM);
+	}
+
+	ConditionVariableCancelSleep();
+}
+
+/*
+ * lvworker_get_state - get my current state
+ */
+static VacWorkerState
+lvworker_get_state(void)
+{
+	VacWorkerState state;
+
+	SpinLockAcquire(&MyVacuumWorker->mutex);
+	state = MyVacuumWorker->state;
+	SpinLockRelease(&MyVacuumWorker->mutex);
+
+	return state;
+}
+
+/*
+ * lvworker_set_state - set my state to new_state
+ */
+static void
+lvworker_set_state(VacWorkerState new_state)
+{
+	SpinLockAcquire(&MyVacuumWorker->mutex);
+	MyVacuumWorker->state = new_state;
+	SpinLockRelease(&MyVacuumWorker->mutex);
+
+	ConditionVariableBroadcast(&WorkerState->cv);
+}
+
+/*
+ * Cleanup function for a parallel vacuum worker
+ */
+static void
+lvworker_onexit(int code, Datum arg)
+{
+	if (IsInParallelMode() && MyVacuumWorker)
+		lvworker_detach();
+}
+
+/*
+ * Detach from the worker slot and clean up the worker information.
+ */
+static void
+lvworker_detach(void)
+{
+	SpinLockAcquire(&MyVacuumWorker->mutex);
+	MyVacuumWorker->state = VACSTATE_INVALID;
+	MyVacuumWorker->pid = 0;	/* the worker is dead */
+	SpinLockRelease(&MyVacuumWorker->mutex);
+
+	MyVacuumWorker = NULL;
+}
+
+/*
+ * Attach to a worker slot according to our ParallelWorkerNumber.
+ */
+static void
+lvworker_attach(void)
+{
+	VacuumWorker *vworker;
+
+	LWLockAcquire(&WorkerState->vacuumlock, LW_EXCLUSIVE);
+	vworker = &WorkerState->workers[ParallelWorkerNumber];
+	vworker->pid = MyProcPid;
+	vworker->state = VACSTATE_SCAN; /* first state */
+	LWLockRelease(&WorkerState->vacuumlock);
+
+	MyVacuumWorker = vworker;
+}
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 3bb91c9..e0e9d6d 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1667,7 +1667,12 @@ _equalDropdbStmt(const DropdbStmt *a, const DropdbStmt *b)
 static bool
 _equalVacuumStmt(const VacuumStmt *a, const VacuumStmt *b)
 {
-	COMPARE_SCALAR_FIELD(options);
+	if (a->options.flags != b->options.flags)
+		return false;
+
+	if (a->options.nworkers != b->options.nworkers)
+		return false;
+
 	COMPARE_NODE_FIELD(rels);
 
 	return true;
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index c729a99..c33af66 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -52,6 +52,7 @@
 #include "parser/parsetree.h"
 #include "parser/parse_agg.h"
 #include "rewrite/rewriteManip.h"
+#include "storage/bufmgr.h"
 #include "storage/dsm_impl.h"
 #include "utils/rel.h"
 #include "utils/selfuncs.h"
@@ -6060,6 +6061,138 @@ plan_cluster_use_sort(Oid tableOid, Oid indexOid)
 }
 
 /*
+ * plan_lazy_vacuum_workers
+ *		Use the planner to decide how many parallel worker processes
+ *		VACUUM and autovacuum should request for use
+ *
+ * tableOid is the table being vacuumed; it must not be a non-table or a
+ * special system table.
+ * nworkers_requested is the number of workers requested by the user in the
+ * VACUUM options; it's 0 if not requested.
+ *
+ * The return value is the number of parallel worker processes to request. It
+ * may be unsafe to proceed if this is 0. Note that this does not include the
+ * leader participating as a worker (the value is always a number of parallel
+ * worker processes).
+ *
+ * Note: the caller had better already hold some type of lock on the table and
+ * index.
+ */
+int
+plan_lazy_vacuum_workers(Oid tableOid, int nworkers_requested)
+{
+	int			parallel_workers;
+	PlannerInfo *root;
+	Query	   *query;
+	PlannerGlobal *glob;
+	RangeTblEntry *rte;
+	RelOptInfo *rel;
+	Relation	heap;
+	BlockNumber nblocks;
+
+	/* Return immediately when parallelism is disabled */
+	if (max_parallel_maintenance_workers == 0)
+		return 0;
+
+	/* Set up largely-dummy planner state */
+	query = makeNode(Query);
+	query->commandType = CMD_SELECT;
+
+	glob = makeNode(PlannerGlobal);
+
+	root = makeNode(PlannerInfo);
+	root->parse = query;
+	root->glob = glob;
+	root->query_level = 1;
+	root->planner_cxt = CurrentMemoryContext;
+	root->wt_param_id = -1;
+
+	/*
+	 * Build a minimal RTE.
+	 *
+	 * Set the target's table to be an inheritance parent. This is a kludge
+	 * that prevents problems within get_relation_info(), which does not
+	 * expect that any IndexOptInfo is currently undergoing REINDEX.
+	 */
+	rte = makeNode(RangeTblEntry);
+	rte->rtekind = RTE_RELATION;
+	rte->relid = tableOid;
+	rte->relkind = RELKIND_RELATION;	/* Don't be too picky. */
+	rte->lateral = false;
+	rte->inh = true;
+	rte->inFromCl = true;
+	query->rtable = list_make1(rte);
+
+	/* Set up RTE/RelOptInfo arrays */
+	setup_simple_rel_arrays(root);
+
+	/* Build RelOptInfo */
+	rel = build_simple_rel(root, 1, NULL);
+
+	heap = heap_open(tableOid, NoLock);
+	nblocks = RelationGetNumberOfBlocks(heap);
+
+	/*
+	 * If a number of workers was requested, accept it (though still cap it
+	 * at max_parallel_maintenance_workers).
+	 */
+	if (nworkers_requested > 0)
+	{
+		parallel_workers = Min(nworkers_requested,
+							   max_parallel_maintenance_workers);
+
+		if (parallel_workers != nworkers_requested)
+			ereport(NOTICE,
+					(errmsg("%d parallel vacuum workers requested, but capped by max_parallel_maintenance_workers",
+							nworkers_requested),
+					 errhint("Increase max_parallel_maintenance_workers.")));
+
+		goto done;
+	}
+
+	/*
+	 * If the parallel_workers storage parameter is set for the table, accept
+	 * that as the number of parallel worker processes to launch (though
+	 * still cap it at max_parallel_maintenance_workers). Note that we
+	 * deliberately do not consider any other factor when parallel_workers is
+	 * set (e.g., memory use by workers).
+	 */
+	if (rel->rel_parallel_workers != -1)
+	{
+		parallel_workers = Min(rel->rel_parallel_workers,
+							   max_parallel_maintenance_workers);
+		goto done;
+	}
+
+	/*
+	 * Determine the number of workers to scan the heap relation using the
+	 * generic model.
+	 */
+	parallel_workers = compute_parallel_worker(rel,
+											   nblocks,
+											   -1,
+											   max_parallel_maintenance_workers);
+
+	/*
+	 * Cap workers based on available maintenance_work_mem as needed.
+	 *
+	 * Note that each participant receives an even share of the total
+	 * maintenance_work_mem budget. Aim to leave participants (including the
+	 * leader as a participant) with no less than 32MB of memory. This leaves
+	 * cases where maintenance_work_mem is set to 64MB immediately past the
+	 * threshold of being capable of launching a single parallel worker.
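+	 *
+	 * A worked example (illustrative only): with maintenance_work_mem set
+	 * to 256MB, a request for 8 workers drops to 7, since 262144kB / 9
+	 * participants is under the 32768kB floor, while 262144kB / 8 is not.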
+ */ + while (parallel_workers > 0 && + maintenance_work_mem / (parallel_workers + 1) < 32768L) + parallel_workers--; + +done: + heap_close(heap, NoLock); + + return parallel_workers; +} + +/* * plan_create_index_workers * Use the planner to decide how many parallel worker processes * CREATE INDEX should request for use diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 6d23bfb..47fff29 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -187,6 +187,7 @@ static void processCASbits(int cas_bits, int location, const char *constrType, bool *deferrable, bool *initdeferred, bool *not_valid, bool *no_inherit, core_yyscan_t yyscanner); static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); +static VacuumOption *makeVacOpt(VacuumOptionFlag flag, int nworkers); %} @@ -237,6 +238,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); struct ImportQual *importqual; InsertStmt *istmt; VariableSetStmt *vsetstmt; + VacuumOption *vacopt; PartitionElem *partelem; PartitionSpec *partspec; PartitionBoundSpec *partboundspec; @@ -305,8 +307,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); create_extension_opt_item alter_extension_opt_item %type opt_lock lock_type cast_context -%type vacuum_option_list vacuum_option_elem - analyze_option_list analyze_option_elem +%type vacuum_option_list vacuum_option_elem +%type analyze_option_list analyze_option_elem %type opt_or_replace opt_grant_grant_option opt_grant_admin_option opt_nowait opt_if_exists opt_with_data @@ -10503,22 +10505,29 @@ cluster_index_specification: VacuumStmt: VACUUM opt_full opt_freeze opt_verbose opt_analyze opt_vacuum_relation_list { VacuumStmt *n = makeNode(VacuumStmt); - n->options = VACOPT_VACUUM; + VacuumOption *vacopt = makeVacOpt(VACOPT_VACUUM, 0); if ($2) - n->options |= VACOPT_FULL; + vacopt->flags |= VACOPT_FULL; if ($3) - n->options |= VACOPT_FREEZE; + vacopt->flags |= VACOPT_FREEZE; if ($4) - n->options |= VACOPT_VERBOSE; + vacopt->flags |= VACOPT_VERBOSE; if ($5) - n->options |= VACOPT_ANALYZE; + vacopt->flags |= VACOPT_ANALYZE; + + n->options.flags = vacopt->flags; + n->options.nworkers = 0; n->rels = $6; $$ = (Node *)n; + pfree(vacopt); } | VACUUM '(' vacuum_option_list ')' opt_vacuum_relation_list { - VacuumStmt *n = makeNode(VacuumStmt); - n->options = VACOPT_VACUUM | $3; + VacuumStmt *n = makeNode(VacuumStmt); + VacuumOption *vacopt = $3; + + n->options.flags = vacopt->flags | VACOPT_VACUUM; + n->options.nworkers = vacopt->nworkers; n->rels = $5; $$ = (Node *) n; } @@ -10526,20 +10535,44 @@ VacuumStmt: VACUUM opt_full opt_freeze opt_verbose opt_analyze opt_vacuum_relati vacuum_option_list: vacuum_option_elem { $$ = $1; } - | vacuum_option_list ',' vacuum_option_elem { $$ = $1 | $3; } + | vacuum_option_list ',' vacuum_option_elem + { + VacuumOption *vacopt1 = (VacuumOption *) $1; + VacuumOption *vacopt2 = (VacuumOption *) $3; + + /* OR flags */ + vacopt1->flags |= vacopt2->flags; + + /* Set requested parallel worker number */ + if (vacopt2->flags == VACOPT_PARALLEL) + vacopt1->nworkers = vacopt2->nworkers; + + $$ = vacopt1; + pfree(vacopt2); + } ; vacuum_option_elem: - analyze_keyword { $$ = VACOPT_ANALYZE; } - | VERBOSE { $$ = VACOPT_VERBOSE; } - | FREEZE { $$ = VACOPT_FREEZE; } - | FULL { $$ = VACOPT_FULL; } + analyze_keyword { $$ = makeVacOpt(VACOPT_ANALYZE, 0); } + | VERBOSE { $$ = makeVacOpt(VACOPT_VERBOSE, 0); } + | FREEZE { $$ = makeVacOpt(VACOPT_FREEZE, 0); } + | FULL { $$ = 
+			| PARALLEL			{ $$ = makeVacOpt(VACOPT_PARALLEL, 0); }
+			| PARALLEL ICONST
+				{
+					if ($2 < 1)
+						ereport(ERROR,
+								(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("parallel vacuum degree must be at least 1"),
+								 parser_errposition(@1)));
+					$$ = makeVacOpt(VACOPT_PARALLEL, $2);
+				}
 			| IDENT
 				{
 					if (strcmp($1, "disable_page_skipping") == 0)
-						$$ = VACOPT_DISABLE_PAGE_SKIPPING;
+						$$ = makeVacOpt(VACOPT_DISABLE_PAGE_SKIPPING, 0);
 					else if (strcmp($1, "skip_locked") == 0)
-						$$ = VACOPT_SKIP_LOCKED;
+						$$ = makeVacOpt(VACOPT_SKIP_LOCKED, 0);
 					else
 						ereport(ERROR,
 								(errcode(ERRCODE_SYNTAX_ERROR),
@@ -10551,16 +10584,23 @@ vacuum_option_elem:
 
 AnalyzeStmt: analyze_keyword opt_verbose opt_vacuum_relation_list
 				{
 					VacuumStmt *n = makeNode(VacuumStmt);
-					n->options = VACOPT_ANALYZE;
+					VacuumOption *vacopt = makeVacOpt(VACOPT_ANALYZE, 0);
+
 					if ($2)
-						n->options |= VACOPT_VERBOSE;
+						vacopt->flags |= VACOPT_VERBOSE;
+
+					n->options.flags = vacopt->flags;
+					n->options.nworkers = 0;
 					n->rels = $3;
 					$$ = (Node *)n;
+					pfree(vacopt);
 				}
 			| analyze_keyword '(' analyze_option_list ')' opt_vacuum_relation_list
 				{
-					VacuumStmt *n = makeNode(VacuumStmt);
-					n->options = VACOPT_ANALYZE | $3;
+					VacuumStmt *n = makeNode(VacuumStmt);
+
+					n->options.flags = $3 | VACOPT_ANALYZE;
+					n->options.nworkers = 0;
 					n->rels = $5;
 					$$ = (Node *) n;
 				}
@@ -16319,6 +16359,16 @@ makeRecursiveViewSelect(char *relname, List *aliases, Node *query)
 	return (Node *) s;
 }
 
+static VacuumOption *
+makeVacOpt(VacuumOptionFlag flag, int nworkers)
+{
+	VacuumOption *vacopt = palloc(sizeof(VacuumOption));
+
+	vacopt->flags = flag;
+	vacopt->nworkers = nworkers;
+	return vacopt;
+}
+
 /* parser_init()
  * Initialize to parse one query string
  */
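The new grammar in action (illustrative only; vactst is the regression-test table exercised at the end of this patch):

    VACUUM (PARALLEL 2, VERBOSE) vactst;  -- flags are OR'd together; nworkers = 2
    VACUUM (PARALLEL 0) vactst;           -- ERROR: parallel vacuum degree must be at least 1
    VACUUM (FULL, PARALLEL 2) vactst;     -- rejected: per the documented restriction,
                                          -- PARALLEL cannot be combined with FULL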
diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c
index 9780895..c117d7c 100644
--- a/src/backend/postmaster/autovacuum.c
+++ b/src/backend/postmaster/autovacuum.c
@@ -187,16 +187,16 @@ typedef struct av_relation
 /* struct to keep track of tables to vacuum and/or analyze, after rechecking */
 typedef struct autovac_table
 {
-	Oid			at_relid;
-	int			at_vacoptions;	/* bitmask of VacuumOption */
-	VacuumParams at_params;
-	int			at_vacuum_cost_delay;
-	int			at_vacuum_cost_limit;
-	bool		at_dobalance;
-	bool		at_sharedrel;
-	char	   *at_relname;
-	char	   *at_nspname;
-	char	   *at_datname;
+	Oid			at_relid;
+	VacuumOption at_vacoptions;	/* VACUUM options and parallel degree */
+	VacuumParams at_params;
+	int			at_vacuum_cost_delay;
+	int			at_vacuum_cost_limit;
+	bool		at_dobalance;
+	bool		at_sharedrel;
+	char	   *at_relname;
+	char	   *at_nspname;
+	char	   *at_datname;
 } autovac_table;
 
 /*-------------
@@ -2490,7 +2490,7 @@ do_autovacuum(void)
 		 * next table in our list.
 		 */
 		HOLD_INTERRUPTS();
-		if (tab->at_vacoptions & VACOPT_VACUUM)
+		if (tab->at_vacoptions.flags & VACOPT_VACUUM)
 			errcontext("automatic vacuum of table \"%s.%s.%s\"",
 					   tab->at_datname, tab->at_nspname, tab->at_relname);
 		else
@@ -2842,6 +2842,7 @@ table_recheck_autovac(Oid relid, HTAB *table_toast_map,
 		int			vac_cost_limit;
 		int			vac_cost_delay;
 		int			log_min_duration;
+		int			parallel_workers;
 
 		/*
 		 * Calculate the vacuum cost parameters and the freeze ages.  If there
@@ -2888,13 +2889,20 @@ table_recheck_autovac(Oid relid, HTAB *table_toast_map,
 			? avopts->multixact_freeze_table_age
 			: default_multixact_freeze_table_age;
 
+		parallel_workers = (avopts &&
+							avopts->vacuum_parallel_workers >= 0)
+			? avopts->vacuum_parallel_workers
+			: 0;
+
 		tab = palloc(sizeof(autovac_table));
 		tab->at_relid = relid;
 		tab->at_sharedrel = classForm->relisshared;
-		tab->at_vacoptions = VACOPT_SKIPTOAST |
+		tab->at_vacoptions.flags = VACOPT_SKIPTOAST |
 			(dovacuum ? VACOPT_VACUUM : 0) |
 			(doanalyze ? VACOPT_ANALYZE : 0) |
-			(!wraparound ? VACOPT_SKIP_LOCKED : 0);
+			(!wraparound ? VACOPT_SKIP_LOCKED : 0) |
+			(dovacuum ? VACOPT_PARALLEL : 0);	/* always consider parallel */
+		tab->at_vacoptions.nworkers = parallel_workers;
 		tab->at_params.freeze_min_age = freeze_min_age;
 		tab->at_params.freeze_table_age = freeze_table_age;
 		tab->at_params.multixact_freeze_min_age = multixact_freeze_min_age;
@@ -3140,10 +3148,10 @@ autovac_report_activity(autovac_table *tab)
 	int			len;
 
 	/* Report the command and possible options */
-	if (tab->at_vacoptions & VACOPT_VACUUM)
+	if (tab->at_vacoptions.flags & VACOPT_VACUUM)
 		snprintf(activity, MAX_AUTOVAC_ACTIV_LEN,
 				 "autovacuum: VACUUM%s",
-				 tab->at_vacoptions & VACOPT_ANALYZE ? " ANALYZE" : "");
+				 tab->at_vacoptions.flags & VACOPT_ANALYZE ? " ANALYZE" : "");
 	else
 		snprintf(activity, MAX_AUTOVAC_ACTIV_LEN,
 				 "autovacuum: ANALYZE");
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 42bccce..9ffdecb 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -2487,7 +2487,6 @@ pgstat_fetch_stat_funcentry(Oid func_id)
 	return funcentry;
 }
 
-
 /* ----------
  * pgstat_fetch_stat_beentry() -
  *
@@ -3054,6 +3053,25 @@ pgstat_report_activity(BackendState state, const char *cmd_str)
 }
 
 /*-----------
+ * pgstat_report_leader_pid() -
+ *
+ * Report the process ID of the parallel-operation leader that this backend
+ * is working for.
+ */
+void
+pgstat_report_leader_pid(int pid)
+{
+	volatile PgBackendStatus *beentry = MyBEEntry;
+
+	if (!beentry)
+		return;
+
+	pgstat_increment_changecount_before(beentry);
+	beentry->st_leader_pid = pid;
+	pgstat_increment_changecount_after(beentry);
+}
+
+/*-----------
 * pgstat_progress_start_command() -
 *
 * Set st_progress_command (and st_progress_command_target) in own backend
@@ -3659,6 +3677,12 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 			break;
 		case WAIT_EVENT_PARALLEL_FINISH:
 			event_name = "ParallelFinish";
+			break;
+		case WAIT_EVENT_PARALLEL_VACUUM_STARTUP:
+			event_name = "ParallelVacuumStartup";
+			break;
+		case WAIT_EVENT_PARALLEL_VACUUM:
+			event_name = "ParallelVacuum";
 			break;
 		case WAIT_EVENT_PROCARRAY_GROUP_UPDATE:
 			event_name = "ProcArrayGroupUpdate";
"VACUUM" : "ANALYZE"); /* forbidden in parallel mode due to CommandIsReadOnly */ ExecVacuum(stmt, isTopLevel); @@ -2570,7 +2570,7 @@ CreateCommandTag(Node *parsetree) break; case T_VacuumStmt: - if (((VacuumStmt *) parsetree)->options & VACOPT_VACUUM) + if (((VacuumStmt *) parsetree)->options.flags & VACOPT_VACUUM) tag = "VACUUM"; else tag = "ANALYZE"; diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index e95e347..67aaabf 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -439,7 +439,7 @@ pg_stat_get_backend_idset(PG_FUNCTION_ARGS) Datum pg_stat_get_progress_info(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_PROGRESS_COLS PGSTAT_NUM_PROGRESS_PARAM + 3 +#define PG_STAT_GET_PROGRESS_COLS PGSTAT_NUM_PROGRESS_PARAM + 4 int num_backends = pgstat_fetch_stat_numbackends(); int curr_backend; char *cmd = text_to_cstring(PG_GETARG_TEXT_PP(0)); @@ -516,14 +516,16 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS) if (has_privs_of_role(GetUserId(), beentry->st_userid)) { values[2] = ObjectIdGetDatum(beentry->st_progress_command_target); + values[3] = Int32GetDatum(beentry->st_leader_pid); for (i = 0; i < PGSTAT_NUM_PROGRESS_PARAM; i++) - values[i + 3] = Int64GetDatum(beentry->st_progress_param[i]); + values[i + 4] = Int64GetDatum(beentry->st_progress_param[i]); } else { nulls[2] = true; + nulls[3] = true; for (i = 0; i < PGSTAT_NUM_PROGRESS_PARAM; i++) - nulls[i + 3] = true; + nulls[i + 4] = true; } tuplestore_putvalues(tupstore, tupdesc, values, nulls); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 4026018..c30d791 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5005,9 +5005,9 @@ proname => 'pg_stat_get_progress_info', prorows => '100', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'text', - proallargtypes => '{text,int4,oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{cmdtype,pid,datid,relid,param1,param2,param3,param4,param5,param6,param7,param8,param9,param10}', + proallargtypes => '{text,int4,oid,oid,int4,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{cmdtype,pid,datid,relid,leader_pid,param1,param2,param3,param4,param5,param6,param7,param8,param9,param10}', prosrc => 'pg_stat_get_progress_info' }, { oid => '3099', descr => 'statistics: information about currently active replication', diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h index 2f4303e..ea71d60 100644 --- a/src/include/commands/vacuum.h +++ b/src/include/commands/vacuum.h @@ -14,16 +14,19 @@ #ifndef VACUUM_H #define VACUUM_H +#include "access/heapam_xlog.h" #include "access/htup.h" #include "catalog/pg_class.h" +#include "access/parallel.h" +#include "access/relscan.h" #include "catalog/pg_statistic.h" #include "catalog/pg_type.h" #include "nodes/parsenodes.h" #include "storage/buf.h" +#include "storage/condition_variable.h" #include "storage/lock.h" #include "utils/relcache.h" - /*---------- * ANALYZE builds one of these structs for each attribute (column) that is * to be analyzed. 
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 2f4303e..ea71d60 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -14,16 +14,19 @@
 #ifndef VACUUM_H
 #define VACUUM_H
 
+#include "access/heapam_xlog.h"
 #include "access/htup.h"
 #include "catalog/pg_class.h"
+#include "access/parallel.h"
+#include "access/relscan.h"
 #include "catalog/pg_statistic.h"
 #include "catalog/pg_type.h"
 #include "nodes/parsenodes.h"
 #include "storage/buf.h"
+#include "storage/condition_variable.h"
 #include "storage/lock.h"
 #include "utils/relcache.h"
 
-
 /*----------
  * ANALYZE builds one of these structs for each attribute (column) that is
  * to be analyzed.  The struct and subsidiary data are in anl_context,
@@ -155,10 +158,9 @@ extern int	vacuum_freeze_table_age;
 extern int	vacuum_multixact_freeze_min_age;
 extern int	vacuum_multixact_freeze_table_age;
 
-
 /* in commands/vacuum.c */
 extern void ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel);
-extern void vacuum(int options, List *relations, VacuumParams *params,
+extern void vacuum(VacuumOption options, List *relations, VacuumParams *params,
 		   BufferAccessStrategy bstrategy, bool isTopLevel);
 extern void vac_open_indexes(Relation relation, LOCKMODE lockmode,
 				 int *nindexes, Relation **Irel);
@@ -192,8 +194,9 @@ extern Relation vacuum_open_relation(Oid relid, RangeVar *relation,
 					 VacuumParams *params, int options, LOCKMODE lmode);
 
 /* in commands/vacuumlazy.c */
-extern void lazy_vacuum_rel(Relation onerel, int options,
+extern void lazy_vacuum_rel(Relation onerel, VacuumOption options,
 			   VacuumParams *params, BufferAccessStrategy bstrategy);
+extern void lazy_parallel_vacuum_main(dsm_segment *seg, shm_toc *toc);
 
 /* in commands/analyze.c */
 extern void analyze_rel(Oid relid, RangeVar *relation, int options,
diff --git a/src/include/commands/vacuum_internal.h b/src/include/commands/vacuum_internal.h
new file mode 100644
index 0000000..8a132f9
--- /dev/null
+++ b/src/include/commands/vacuum_internal.h
@@ -0,0 +1,191 @@
+/*-------------------------------------------------------------------------
+ *
+ * vacuum_internal.h
+ *	  Internal declarations for lazy vacuum
+ *
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/commands/vacuum_internal.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef VACUUM_INTERNAL_H
+#define VACUUM_INTERNAL_H
+
+/* DSM keys for parallel lazy vacuum */
+#define VACUUM_KEY_SHARED			UINT64CONST(0xFFFFFFFFFFF00001)
+#define VACUUM_KEY_VACUUM_STATS		UINT64CONST(0xFFFFFFFFFFF00002)
+#define VACUUM_KEY_INDEX_STATS		UINT64CONST(0xFFFFFFFFFFF00003)
+#define VACUUM_KEY_DEAD_TUPLES		UINT64CONST(0xFFFFFFFFFFF00004)
+#define VACUUM_KEY_WORKERS			UINT64CONST(0xFFFFFFFFFFF00005)
+#define VACUUM_KEY_QUERY_TEXT		UINT64CONST(0xFFFFFFFFFFF00006)
+
+/*
+ * Type definitions for lazy vacuum.  The fields of these structs are
+ * accessed only by the vacuum leader.
+ */
+typedef struct LVScanDescData LVScanDescData;
+typedef struct LVScanDescData *LVScanDesc;
+typedef struct LVTidMap LVTidMap;
+typedef struct LVIndStats LVIndStats;
+
+/* Vacuum worker state for parallel lazy vacuum */
+typedef enum VacWorkerState
+{
+	VACSTATE_INVALID = 0,
+	VACSTATE_SCAN,
+	VACSTATE_VACUUM_INDEX,
+	VACSTATE_VACUUM_HEAP,
+	VACSTATE_CLEANUP_INDEX,
+	VACSTATE_WORKER_DONE,
+	VACSTATE_COMPLETED
+} VacWorkerState;
+
+/*
+ * 'pid' always starts out as InvalidPid, which means the vacuum worker is
+ * starting up; the worker itself sets it during startup.  When the vacuum
+ * worker exits or detaches from its worker slot, 'pid' is set to 0, which
+ * means the vacuum worker is dead.
+ */
+typedef struct VacuumWorker
+{
+	pid_t		pid;			/* parallel worker's pid;
+								 * InvalidPid = not started yet;
+								 * 0 = dead */
+	VacWorkerState state;		/* current worker's state */
+	slock_t		mutex;			/* protects the above fields */
+} VacuumWorker;
+
+/* Struct to control parallel vacuum workers */
+typedef struct LVWorkerState
+{
+	int			nparticipantvacuum;	/* parallel workers only, not
+									 * including the leader */
+	int			nparticipantvacuum_launched;	/* how many of
+												 * nparticipantvacuum were
+												 * actually launched */
+
+	/* condition variable signaled when a worker's status changes */
+	ConditionVariable cv;
+
+	/* protects the workers array */
+	LWLock		vacuumlock;
+
+	VacuumWorker workers[FLEXIBLE_ARRAY_MEMBER];
+} LVWorkerState;
+#define SizeOfLVWorkerState (offsetof(LVWorkerState, workers) + sizeof(VacuumWorker))
+
+typedef struct LVRelStats
+{
+	/* hasindex = true means two-pass strategy; false means one-pass */
+	bool		hasindex;
+	/* Overall statistics about rel */
+	BlockNumber old_rel_pages;	/* previous value of pg_class.relpages */
+	BlockNumber rel_pages;		/* total number of pages */
+	BlockNumber scanned_pages;	/* number of pages we examined */
+	BlockNumber pinskipped_pages;	/* # of pages we skipped due to a pin */
+	BlockNumber frozenskipped_pages;	/* # of frozen pages we skipped */
+	BlockNumber tupcount_pages;	/* pages whose tuples we counted */
+	BlockNumber empty_pages;	/* # of empty pages */
+	BlockNumber vacuumed_pages;	/* # of pages we vacuumed */
+	double		num_tuples;		/* total number of nonremovable tuples */
+	double		live_tuples;	/* live tuples (reltuples estimate) */
+	double		tuples_deleted;	/* tuples cleaned up by vacuum */
+	double		unused_tuples;	/* unused item pointers */
+	double		old_live_tuples;	/* previous value of pg_class.reltuples */
+	double		new_rel_tuples;	/* new estimated total # of tuples */
+	double		new_live_tuples;	/* new estimated total # of live tuples */
+	double		new_dead_tuples;	/* new estimated total # of dead tuples */
+	BlockNumber pages_removed;
+	BlockNumber nonempty_pages;	/* actually, last nonempty page + 1 */
+	int			num_index_scans;
+	TransactionId latestRemovedXid;
+	bool		lock_waiter_detected;
+} LVRelStats;
+
+/*
+ * Shared information among parallel workers.
+ */
+typedef struct LVShared
+{
+	/* Target relation's OID */
+	Oid			relid;
+
+	/* Options and thresholds used for lazy vacuum */
+	VacuumOption options;
+	bool		aggressive;		/* is this an aggressive vacuum? */
+	bool		is_wraparound;	/* for anti-wraparound purposes? */
+	int			elevel;			/* verbose logging */
+	TransactionId oldestXmin;
+	TransactionId freezeLimit;
+	MultiXactId multiXactCutoff;
+
+	/* Vacuum delay */
+	int			cost_delay;
+	int			cost_limit;
+
+	int			max_dead_tuples_per_worker;	/* maximum # of dead tuples
+											 * each worker can keep */
+	ParallelHeapScanDescData heapdesc;	/* for heap scan */
+} LVShared;
+
+/*
+ * Working state for lazy vacuum execution.  LVState is used by both the
+ * vacuum workers and the vacuum leader.  In parallel lazy vacuum, the
+ * workers' 'vacrelstats' and the 'dead_tuples' live in shared memory, in
+ * addition to the fields used only for parallel lazy vacuum: 'lvshared'
+ * and 'indstats'.
+ */
+ */ +typedef struct LVState +{ + /* Vacuum target relation and indexes */ + Oid relid; + Relation relation; + Relation *indRels; + int nindexes; + + /* Used during scanning heap */ + IndexBulkDeleteResult **indbulkstats; + xl_heap_freeze_tuple *frozen; + BlockNumber next_fsm_block_to_vacuum; + BlockNumber current_block; /* block number being scanned */ + VacuumOption options; + bool is_wraparound; + + /* Scan description for lazy vacuum */ + LVScanDesc lvscan; + bool aggressive; + + /* Vacuum statistics for the target table */ + LVRelStats *vacrelstats; + + /* Dead tuple array */ + LVTidMap *dead_tuples; + + /* + * The following fields are only present when a parallel lazy vacuum + * is performed. + */ + LVShared *lvshared; /* shared information among vacuum workers */ + LVIndStats *indstats; /* shared index statistics */ + +} LVState; + +extern LVWorkerState *WorkerState; + +extern LVScanDesc lv_beginscan(Relation relation, LVShared *lvshared, + bool aggressive, bool disable_page_skipping); +extern void lv_endscan(LVScanDesc lvscan); +extern int do_lazy_scan_heap(LVState *lvstate, bool *isFinished); +extern void lazy_cleanup_all_indexes(LVState *lvstate); +extern void lazy_vacuum_all_indexes(LVState *lvstate); +extern void lazy_vacuum_heap(Relation onerel, LVState *lvstate); +extern void vacuum_set_xid_limits_for_worker(TransactionId oldestxmin, + TransactionId freezelimit, + MultiXactId multixactcutoff); +extern void vacuum_set_elevel_for_worker(int elevel); +extern void lazy_space_alloc(LVState *lvstate, BlockNumber relblocks); + + +#endif /* VACUUM_INTERNAL_H */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index aa4a0db..a0f9578 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3147,7 +3147,7 @@ typedef struct ClusterStmt * and VACOPT_ANALYZE must be set in options. * ---------------------- */ -typedef enum VacuumOption +typedef enum VacuumOptionFlag { VACOPT_VACUUM = 1 << 0, /* do VACUUM */ VACOPT_ANALYZE = 1 << 1, /* do ANALYZE */ @@ -3156,9 +3156,17 @@ typedef enum VacuumOption VACOPT_FULL = 1 << 4, /* FULL (non-concurrent) vacuum */ VACOPT_SKIP_LOCKED = 1 << 5, /* skip if cannot get lock */ VACOPT_SKIPTOAST = 1 << 6, /* don't process the TOAST table, if any */ - VACOPT_DISABLE_PAGE_SKIPPING = 1 << 7 /* don't skip any pages */ + VACOPT_DISABLE_PAGE_SKIPPING = 1 << 7, /* don't skip any pages */ + VACOPT_PARALLEL = 1 << 8 /* do VACUUM in parallel */ +} VacuumOptionFlag; + +typedef struct VacuumOption +{ + VacuumOptionFlag flags; /* OR of VacuumOptionFlag */ + int nworkers; /* # of parallel vacuum workers */ } VacuumOption; + /* * Info about a single target table of VACUUM/ANALYZE. 
 *
@@ -3176,9 +3184,9 @@ typedef struct VacuumRelation
 
 typedef struct VacuumStmt
 {
-	NodeTag		type;
-	int			options;		/* OR of VacuumOption flags */
-	List	   *rels;			/* list of VacuumRelation, or NIL for all */
+	NodeTag		type;
+	VacuumOption options;		/* VACUUM options and parallel degree */
+	List	   *rels;			/* list of VacuumRelation, or NIL for all */
 } VacuumStmt;
 
 /* ----------------------
diff --git a/src/include/optimizer/planner.h b/src/include/optimizer/planner.h
index 3e733b3..20270ef 100644
--- a/src/include/optimizer/planner.h
+++ b/src/include/optimizer/planner.h
@@ -60,5 +60,6 @@ extern Expr *preprocess_phv_expression(PlannerInfo *root, Expr *expr);
 
 extern bool plan_cluster_use_sort(Oid tableOid, Oid indexOid);
 extern int	plan_create_index_workers(Oid tableOid, Oid indexOid);
+extern int	plan_lazy_vacuum_workers(Oid tableOid, int nworkers_requested);
 
 #endif							/* PLANNER_H */
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index f1c10d1..9c1d3fc 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -828,6 +828,8 @@ typedef enum
 	WAIT_EVENT_PARALLEL_BITMAP_SCAN,
 	WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN,
 	WAIT_EVENT_PARALLEL_FINISH,
+	WAIT_EVENT_PARALLEL_VACUUM_STARTUP,
+	WAIT_EVENT_PARALLEL_VACUUM,
 	WAIT_EVENT_PROCARRAY_GROUP_UPDATE,
 	WAIT_EVENT_PROMOTE,
 	WAIT_EVENT_REPLICATION_ORIGIN_DROP,
@@ -1032,13 +1034,17 @@ typedef struct PgBackendStatus
 
 	/*
 	 * Command progress reporting.  Any command which wishes can advertise
-	 * that it is running by setting st_progress_command,
+	 * that it is running by setting st_leader_pid, st_progress_command,
 	 * st_progress_command_target, and st_progress_param[].
 	 * st_progress_command_target should be the OID of the relation which the
 	 * command targets (we assume there's just one, as this is meant for
 	 * utility commands), but the meaning of each element in the
 	 * st_progress_param array is command-specific.
+	 * st_leader_pid supports progress reporting for parallel operations: by
+	 * setting it to the leader's PID, the rows of the leader and of its
+	 * workers can be grouped together in progress-reporting SQL.
 	 */
+	int			st_leader_pid;
 	ProgressCommandType st_progress_command;
 	Oid			st_progress_command_target;
 	int64		st_progress_param[PGSTAT_NUM_PROGRESS_PARAM];
@@ -1205,6 +1211,7 @@ extern const char *pgstat_get_crashed_backend_activity(int pid, char *buffer,
 									int buflen);
 extern const char *pgstat_get_backend_desc(BackendType backendType);
 
+extern void pgstat_report_leader_pid(int pid);
 extern void pgstat_progress_start_command(ProgressCommandType cmdtype,
 							  Oid relid);
 extern void pgstat_progress_update_param(int index, int64 val);
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index b2dcb73..fddcb54 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -219,6 +219,7 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_SHARED_TUPLESTORE,
 	LWTRANCHE_TBM,
 	LWTRANCHE_PARALLEL_APPEND,
+	LWTRANCHE_PARALLEL_VACUUM,
 	LWTRANCHE_FIRST_USER_DEFINED
 } BuiltinTrancheIds;
 
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 84469f5..c3715e4 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -250,6 +250,7 @@ typedef struct AutoVacOpts
 	int			multixact_freeze_max_age;
 	int			multixact_freeze_table_age;
 	int			log_min_duration;
+	int			vacuum_parallel_workers;
 	float8		vacuum_scale_factor;
 	float8		analyze_scale_factor;
 } AutoVacOpts;
 
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 735dd37..e2655fd 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1836,13 +1836,21 @@ pg_stat_progress_vacuum| SELECT s.pid,
             ELSE NULL::text
         END AS phase,
     s.param2 AS heap_blks_total,
-    s.param3 AS heap_blks_scanned,
-    s.param4 AS heap_blks_vacuumed,
-    s.param5 AS index_vacuum_count,
+    w.heap_blks_scanned,
+    w.heap_blks_vacuumed,
+    w.index_vacuum_count,
     s.param6 AS max_dead_tuples,
-    s.param7 AS num_dead_tuples
-   FROM (pg_stat_get_progress_info('VACUUM'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10)
-     LEFT JOIN pg_database d ON ((s.datid = d.oid)));
+    w.num_dead_tuples
+   FROM ((pg_stat_get_progress_info('VACUUM'::text) s(pid, datid, relid, leader_pid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10)
+     LEFT JOIN pg_database d ON ((s.datid = d.oid)))
+     LEFT JOIN ( SELECT pg_stat_get_progress_info.leader_pid,
+            max(pg_stat_get_progress_info.param3) AS heap_blks_scanned,
+            max(pg_stat_get_progress_info.param4) AS heap_blks_vacuumed,
+            max(pg_stat_get_progress_info.param5) AS index_vacuum_count,
+            max(pg_stat_get_progress_info.param7) AS num_dead_tuples
+           FROM pg_stat_get_progress_info('VACUUM'::text) pg_stat_get_progress_info(pid, datid, relid, leader_pid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10)
+          GROUP BY pg_stat_get_progress_info.leader_pid) w ON ((s.pid = w.leader_pid)))
+  WHERE (s.pid = s.leader_pid);
 pg_stat_replication| SELECT s.pid,
     s.usesysid,
     u.rolname AS usename,
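Given the revised view definition above, an ordinary query still returns one row per VACUUM leader, with the per-worker counters folded in via max() (illustrative):

    SELECT pid, relid::regclass AS relname, phase,
           heap_blks_scanned, heap_blks_vacuumed, index_vacuum_count
      FROM pg_stat_progress_vacuum;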
diff --git a/src/test/regress/expected/vacuum.out b/src/test/regress/expected/vacuum.out
index fa9d663..b8805b2 100644
--- a/src/test/regress/expected/vacuum.out
+++ b/src/test/regress/expected/vacuum.out
@@ -129,6 +129,8 @@ ERROR:  relation "does_not_exist" does not exist
 VACUUM (SKIP_LOCKED) vactst;
 VACUUM (SKIP_LOCKED, FULL) vactst;
 ANALYZE (SKIP_LOCKED) vactst;
+-- parallel option
+VACUUM (PARALLEL 1) vactst;
 DROP TABLE vaccluster;
 DROP TABLE vactst;
 DROP TABLE vacparted;
diff --git a/src/test/regress/sql/vacuum.sql b/src/test/regress/sql/vacuum.sql
index 9defa0d..8cb8f64 100644
--- a/src/test/regress/sql/vacuum.sql
+++ b/src/test/regress/sql/vacuum.sql
@@ -103,6 +103,9 @@ VACUUM (SKIP_LOCKED) vactst;
 VACUUM (SKIP_LOCKED, FULL) vactst;
 ANALYZE (SKIP_LOCKED) vactst;
 
+-- parallel option
+VACUUM (PARALLEL 1) vactst;
+
 DROP TABLE vaccluster;
 DROP TABLE vactst;
 DROP TABLE vacparted;
-- 
2.10.5
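Finally, a usage sketch for the new storage parameter wired up through rel.h and the reloptions code above (illustrative; per this patch, autovacuum runs without workers when the parameter is unset):

    -- Let autovacuum use up to two parallel workers for this table:
    ALTER TABLE vactst SET (autovacuum_vacuum_parallel_workers = 2);
    -- Back to the default, single-process autovacuum behavior:
    ALTER TABLE vactst RESET (autovacuum_vacuum_parallel_workers);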