From 76d8a1bfd5207d28a4e9fe98a0e1ea7c096d70aa Mon Sep 17 00:00:00 2001 From: "Sami Imseih (AWS)" Date: Thu, 17 Feb 2022 04:21:04 +0000 Subject: [PATCH 1/1] Expose progress for the "vacuuming indexes" and "cleaning up indexes" phase of a VACUUM operation. Add 2 new columns to pg_stat_progress_vacuum. The columns are indexes_total as the total indexes to be vacuumed or cleaned and indexes_processed as the number of indexes vacuumed or cleaned up so far. Author: Sami Imseih, based on suggestions by Nathan Bossart, Peter Geoghegan and Masahiko Sawada Reviewed by: Nathan Bossart, Justin Pryzby --- doc/src/sgml/monitoring.sgml | 22 +++ src/backend/access/heap/vacuumlazy.c | 207 +++++++++++++++++++++++++- src/backend/catalog/system_views.sql | 3 +- src/backend/commands/vacuumparallel.c | 7 + src/backend/storage/ipc/ipci.c | 3 + src/backend/utils/adt/pgstatfuncs.c | 12 ++ src/include/commands/progress.h | 2 + src/include/commands/vacuum.h | 8 + src/test/regress/expected/rules.out | 4 +- 9 files changed, 261 insertions(+), 7 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index bf7625d988..04440dfa88 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -6278,6 +6278,28 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid, Number of dead tuples collected since the last index vacuum cycle. + + + + indexes_total bigint + + + The number of indexes to be processed in the + vacuuming indexes or cleaning up indexes phase + of the vacuum. + + + + + + indexes_processed bigint + + + The number of indexes processed in the + vacuuming indexes or cleaning up indexes phase. + At the start of an index vacuum cycle, this value is set to 0. 
+ + diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index 242511a235..1198677bc0 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -58,6 +58,7 @@ #include "postmaster/autovacuum.h" #include "storage/bufmgr.h" #include "storage/freespace.h" +#include "storage/ipc.h" #include "storage/lmgr.h" #include "tcop/tcopprot.h" #include "utils/lsyscache.h" @@ -245,6 +246,26 @@ typedef struct LVSavedErrInfo VacErrPhase phase; } LVSavedErrInfo; +/* + * Structs for tracking shared Progress information + * amongst worker ( and leader ) processes of a vacuum. + */ +typedef struct VacOneWorkerProgressInfo +{ + int leader_pid; + int indexes_processed; +} VacOneWorkerProgressInfo; + +typedef struct VacWorkerProgressInfo +{ + int num_vacuums; /* number of active VACUUMS with parallel workers */ + int max_vacuums; /* max number of VACUUMS with parallel workers */ + slock_t mutex; + VacOneWorkerProgressInfo vacuums[FLEXIBLE_ARRAY_MEMBER]; +} VacWorkerProgressInfo; + +static VacWorkerProgressInfo *vacworkerprogress; + /* non-export function prototypes */ static void lazy_scan_heap(LVRelState *vacrel, int nworkers); @@ -420,6 +441,8 @@ heap_vacuum_rel(Relation rel, VacuumParams *params, vacrel->rel = rel; vac_open_indexes(vacrel->rel, RowExclusiveLock, &vacrel->nindexes, &vacrel->indrels); + /* Advertise the number of indexes we are vacuuming */ + pgstat_progress_update_param(PROGRESS_VACUUM_TOTAL_INDEXES, vacrel->nindexes); if (instrument && vacrel->nindexes > 0) { /* Copy index names used by instrumentation (not error reporting) */ @@ -2328,6 +2351,13 @@ lazy_vacuum_all_indexes(LVRelState *vacrel) lazy_vacuum_one_index(indrel, istat, vacrel->old_live_tuples, vacrel); + /* + * For the non-parallel variant of a vacuum, the array position + * of the index determines how many indexes are processed so far. + * Add 1 to the posititon as vacrel->nindexes is a 0-based array. 
+ */ + pgstat_progress_update_param(PROGRESS_VACUUM_INDEXES_COMPLETED, idx + 1); + if (lazy_check_wraparound_failsafe(vacrel)) { /* Wraparound emergency -- end current index scan */ @@ -2338,9 +2368,20 @@ lazy_vacuum_all_indexes(LVRelState *vacrel) } else { - /* Outsource everything to parallel variant */ - parallel_vacuum_bulkdel_all_indexes(vacrel->pvs, vacrel->old_live_tuples, - vacrel->num_index_scans); + /* + * parallel_vacuum_bulkdel_all_indexes will call vacuum_worker_update + * which updates shared memory for the index progress. To ensure shared + * memory cleanup, do the work with PG_ENSURE_ERROR_CLEANUP. + */ + PG_ENSURE_ERROR_CLEANUP(vacuum_worker_end_callback, Int32GetDatum(MyProcPid)); + { + + parallel_vacuum_bulkdel_all_indexes(vacrel->pvs, + vacrel->old_live_tuples, + vacrel->num_index_scans); + } + PG_END_ENSURE_ERROR_CLEANUP(vacuum_worker_end_callback, Int32GetDatum(MyProcPid)); + vacuum_worker_end(MyProcPid); /* * Do a postcheck to consider applying wraparound failsafe now. Note @@ -2350,6 +2391,12 @@ lazy_vacuum_all_indexes(LVRelState *vacrel) allindexes = false; } + /* + * We're done with index vacuuming. + * Set the total number of indexes completed as the total number of indexes + */ + pgstat_progress_update_param(PROGRESS_VACUUM_INDEXES_COMPLETED, vacrel->nindexes) ; + /* * We delete all LP_DEAD items from the first heap pass in all indexes on * each call here (except calls where we choose to do the failsafe). 
This @@ -2675,15 +2722,34 @@ lazy_cleanup_all_indexes(LVRelState *vacrel) vacrel->indstats[idx] = lazy_cleanup_one_index(indrel, istat, reltuples, estimated_count, vacrel); + + /* See the lazy_vacuum_all_indexes comments */ + pgstat_progress_update_param(PROGRESS_VACUUM_INDEXES_COMPLETED, idx + 1); } } else { - /* Outsource everything to parallel variant */ - parallel_vacuum_cleanup_all_indexes(vacrel->pvs, vacrel->new_rel_tuples, + /* + * Outsource everything to parallel variant + * + * See the lazy_vacuum_all_indexes comments + */ + PG_ENSURE_ERROR_CLEANUP(vacuum_worker_end_callback, Int32GetDatum(MyProcPid)); + { + parallel_vacuum_cleanup_all_indexes(vacrel->pvs, vacrel->new_rel_tuples, vacrel->num_index_scans, (vacrel->scanned_pages < vacrel->rel_pages)); + } + PG_END_ENSURE_ERROR_CLEANUP(vacuum_worker_end_callback, Int32GetDatum(MyProcPid)); + vacuum_worker_end(MyProcPid); } + + /* + * We're done with index cleanup. + * Set the total number of indexes completed as the total number of indexes + */ + pgstat_progress_update_param(PROGRESS_VACUUM_INDEXES_COMPLETED, vacrel->nindexes); + } /* @@ -3464,3 +3530,134 @@ restore_vacuum_error_info(LVRelState *vacrel, vacrel->offnum = saved_vacrel->offnum; vacrel->phase = saved_vacrel->phase; } + +void +vacuum_worker_end(int leader_pid) +{ + SpinLockAcquire(&vacworkerprogress->mutex); + for (int i = 0; i < vacworkerprogress->num_vacuums; i++) + { + VacOneWorkerProgressInfo *vac = &vacworkerprogress->vacuums[i]; + + if (vac->leader_pid == leader_pid) + { + *vac = vacworkerprogress->vacuums[vacworkerprogress->num_vacuums - 1]; + vacworkerprogress->num_vacuums--; + SpinLockRelease(&vacworkerprogress->mutex); + return; + } + } + SpinLockRelease(&vacworkerprogress->mutex); +} + +/* + * vacuum_worker_end wrapped as an on_shmem_exit callback function + */ +void +vacuum_worker_end_callback(int code, Datum arg) +{ + vacuum_worker_end(DatumGetInt32(arg)); +} + +/* + * vacuum_worker_update sets the number of indexes processed so far
+ * in a parallel vacuum. + */ +void +vacuum_worker_update(int leader_pid) +{ + VacOneWorkerProgressInfo *vac; + + SpinLockAcquire(&vacworkerprogress->mutex); + + for (int i = 0; i < vacworkerprogress->num_vacuums; i++) + { + int next_leader_pid; + + vac = &vacworkerprogress->vacuums[i]; + + next_leader_pid = vac->leader_pid; + + if (next_leader_pid == leader_pid) + { + vac->indexes_processed++; + SpinLockRelease(&vacworkerprogress->mutex); + return; + } + } + + if (vacworkerprogress->num_vacuums >= vacworkerprogress->max_vacuums) + { + SpinLockRelease(&vacworkerprogress->mutex); + elog(ERROR, "out of vacuum worker progress slots"); + } + + vac = &vacworkerprogress->vacuums[vacworkerprogress->num_vacuums]; + vac->leader_pid = leader_pid; + vac->indexes_processed = 1; + vacworkerprogress->num_vacuums++; + SpinLockRelease(&vacworkerprogress->mutex); +} + +/* + * vacuum_progress_cb updates the number of indexes that have been + * vacuumed or cleaned up in a parallel vacuum. + */ +void +vacuum_progress_cb(Datum *values, int offset) +{ + VacOneWorkerProgressInfo *vac; + int leader_pid = values[0]; + + /* If we are vacuuming in parallel, set the number of indexes vacuumed + * from the shared memory counter. 
+ * */ + for (int i = 0; i < vacworkerprogress->num_vacuums; i++) + { + int next_leader_pid; + + vac = &vacworkerprogress->vacuums[i]; + + next_leader_pid = vac->leader_pid; + + if (next_leader_pid == leader_pid) + values[PROGRESS_VACUUM_INDEXES_COMPLETED + offset] = vac->indexes_processed; + } +} + +/* + * VacuumWorkerProgressShmemSize --- report amount of shared memory space needed + */ +Size +VacuumWorkerProgressShmemSize(void) +{ + Size size; + + size = offsetof(VacWorkerProgressInfo, vacuums); + size = add_size(size, mul_size(GetMaxBackends(), sizeof(VacOneWorkerProgressInfo))); + return size; +} + +/* + * VacuumWorkerProgressShmemInit --- initialize this module's shared memory + */ +void +VacuumWorkerProgressShmemInit(void) +{ + bool found; + + vacworkerprogress = (VacWorkerProgressInfo *) ShmemInitStruct("Vacuum Worker Progress Stats", + VacuumWorkerProgressShmemSize(), + &found); + + if (!IsUnderPostmaster) + { + /* Initialize shared memory area */ + Assert(!found); + + vacworkerprogress->max_vacuums = GetMaxBackends(); + SpinLockInit(&vacworkerprogress->mutex); + } + else + Assert(found); +} diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 3cb69b1f87..eaa0508c0b 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1126,7 +1126,8 @@ CREATE VIEW pg_stat_progress_vacuum AS END AS phase, S.param2 AS heap_blks_total, S.param3 AS heap_blks_scanned, S.param4 AS heap_blks_vacuumed, S.param5 AS index_vacuum_count, - S.param6 AS max_dead_tuples, S.param7 AS num_dead_tuples + S.param6 AS max_dead_tuples, S.param7 AS num_dead_tuples, + S.param8 AS indexes_total, S.param9 AS indexes_processed FROM pg_stat_get_progress_info('VACUUM') AS S LEFT JOIN pg_database D ON S.datid = D.oid; diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c index 974a29e7a9..9b465e12cc 100644 --- a/src/backend/commands/vacuumparallel.c +++ 
b/src/backend/commands/vacuumparallel.c @@ -29,6 +29,7 @@ #include "access/amapi.h" #include "access/table.h" #include "catalog/index.h" +#include "commands/progress.h" #include "commands/vacuum.h" #include "optimizer/paths.h" #include "pgstat.h" @@ -101,6 +102,9 @@ typedef struct PVShared /* Counter for vacuuming and cleanup */ pg_atomic_uint32 idx; + + /* Leader PID of the vacuum */ + int leader_pid; } PVShared; /* Status used during parallel index vacuum or cleanup */ @@ -357,6 +361,7 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, (nindexes_mwm > 0) ? maintenance_work_mem / Min(parallel_workers, nindexes_mwm) : maintenance_work_mem; + shared->leader_pid = MyProcPid; pg_atomic_init_u32(&(shared->cost_balance), 0); pg_atomic_init_u32(&(shared->active_nworkers), 0); @@ -844,9 +849,11 @@ parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel, { case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE: istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items); + vacuum_worker_update(pvs->shared->leader_pid); break; case PARALLEL_INDVAC_STATUS_NEED_CLEANUP: istat_res = vac_cleanup_one_index(&ivinfo, istat); + vacuum_worker_update(pvs->shared->leader_pid); break; default: elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"", diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index cd4ebe2fc5..a4bd6a14a3 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -24,6 +24,7 @@ #include "access/twophase.h" #include "access/xlogrecovery.h" #include "commands/async.h" +#include "commands/vacuum.h" #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" @@ -145,6 +146,7 @@ CalculateShmemSize(int *num_semaphores) size = add_size(size, BTreeShmemSize()); size = add_size(size, SyncScanShmemSize()); size = add_size(size, AsyncShmemSize()); + size = add_size(size, VacuumWorkerProgressShmemSize()); #ifdef EXEC_BACKEND size = add_size(size, 
ShmemBackendArraySize()); #endif @@ -296,6 +298,7 @@ CreateSharedMemoryAndSemaphores(void) BTreeShmemInit(); SyncScanShmemInit(); AsyncShmemInit(); + VacuumWorkerProgressShmemInit(); #ifdef EXEC_BACKEND diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 15cb17ace4..3d90b23ce4 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -18,6 +18,7 @@ #include "access/xlog.h" #include "catalog/pg_authid.h" #include "catalog/pg_type.h" +#include "commands/vacuum.h" #include "common/ip.h" #include "funcapi.h" #include "miscadmin.h" @@ -452,6 +453,10 @@ pg_stat_get_backend_idset(PG_FUNCTION_ARGS) /* * Returns command progress information for the named command. + * + * A command type can optionally define a callback function + * which will derive Datum values rather than use values + * directly from the backends progress array. */ Datum pg_stat_get_progress_info(PG_FUNCTION_ARGS) @@ -466,6 +471,7 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS) ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; MemoryContext per_query_ctx; MemoryContext oldcontext; + void (*callback)(Datum *, int) = NULL; /* check to see if caller supports us returning a tuplestore */ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) @@ -483,7 +489,10 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS) /* Translate command name into command type code. 
*/ if (pg_strcasecmp(cmd, "VACUUM") == 0) + { cmdtype = PROGRESS_COMMAND_VACUUM; + callback = vacuum_progress_cb; + } else if (pg_strcasecmp(cmd, "ANALYZE") == 0) cmdtype = PROGRESS_COMMAND_ANALYZE; else if (pg_strcasecmp(cmd, "CLUSTER") == 0) @@ -552,6 +561,9 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS) nulls[i + 3] = true; } + if (callback) + callback(values, 3); + tuplestore_putvalues(tupstore, tupdesc, values, nulls); } diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h index a28938caf4..e4f3cd9133 100644 --- a/src/include/commands/progress.h +++ b/src/include/commands/progress.h @@ -25,6 +25,8 @@ #define PROGRESS_VACUUM_NUM_INDEX_VACUUMS 4 #define PROGRESS_VACUUM_MAX_DEAD_TUPLES 5 #define PROGRESS_VACUUM_NUM_DEAD_TUPLES 6 +#define PROGRESS_VACUUM_TOTAL_INDEXES 7 +#define PROGRESS_VACUUM_INDEXES_COMPLETED 8 /* Phases of vacuum (as advertised via PROGRESS_VACUUM_PHASE) */ #define PROGRESS_VACUUM_PHASE_SCAN_HEAP 1 diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h index d64f6268f2..5642fae0cd 100644 --- a/src/include/commands/vacuum.h +++ b/src/include/commands/vacuum.h @@ -336,4 +336,12 @@ extern double anl_random_fract(void); extern double anl_init_selection_state(int n); extern double anl_get_next_S(double t, int n, double *stateptr); +/* in commands/vacuumparallel.c */ +extern Size VacuumWorkerProgressShmemSize(void); +extern void VacuumWorkerProgressShmemInit(void); +extern void vacuum_worker_end(int leader_pid); +extern void vacuum_worker_update(int leader_pid); +extern void vacuum_progress_cb(Datum *values, int offset); +extern void vacuum_worker_end_callback(int code, Datum arg); + #endif /* VACUUM_H */ diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 1420288d67..dc27b8614e 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2002,7 +2002,9 @@ pg_stat_progress_vacuum| SELECT s.pid, s.param4 AS heap_blks_vacuumed, 
s.param5 AS index_vacuum_count, s.param6 AS max_dead_tuples, - s.param7 AS num_dead_tuples + s.param7 AS num_dead_tuples, + s.param8 AS indexes_total, + s.param9 AS indexes_processed FROM (pg_stat_get_progress_info('VACUUM'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20) LEFT JOIN pg_database d ON ((s.datid = d.oid))); pg_stat_replication| SELECT s.pid, -- 2.32.0