From f2843d05463986d3aa704a7188cb51d89f7f5826 Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Wed, 18 Mar 2026 20:55:00 +0100 Subject: [PATCH v4 4/4] show prefetch stats for TidRangeScan - enable show_scan_io_usage for TidRangeScan - add infrastructure to allocate/collect instrumentation from parallel workers --- src/backend/commands/explain.c | 28 ++++++ src/backend/executor/execParallel.c | 3 + src/backend/executor/nodeTidrangescan.c | 110 ++++++++++++++++++++++-- src/include/executor/instrument_node.h | 11 +++ src/include/executor/nodeTidrangescan.h | 1 + src/include/nodes/execnodes.h | 1 + 6 files changed, 149 insertions(+), 5 deletions(-) diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 07a418e50cc..8f4e3b07c3e 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -2152,6 +2152,7 @@ ExplainNode(PlanState *planstate, List *ancestors, if (plan->qual) show_instrumentation_count("Rows Removed by Filter", 1, planstate, es); + show_scan_io_usage((ScanState *) planstate, es); } break; case T_ForeignScan: @@ -4114,6 +4115,23 @@ show_scan_io_usage(ScanState *planstate, ExplainState *es) } } + break; + } + case T_TidRangeScan: + { + SharedTidRangeScanInstrumentation *sinstrument + = ((TidRangeScanState *) planstate)->trss_sinstrument; + + /* get the sum of the counters set within each and every process */ + if (sinstrument) + { + for (int i = 0; i < sinstrument->num_workers; ++i) + { + TidRangeScanInstrumentation *winstrument = &sinstrument->sinstrument[i]; + ACCUMULATE_IO_STATS(&stats, &winstrument->io); + } + } + break; } default: @@ -4160,6 +4178,16 @@ show_io_usage(PlanState *planstate, ExplainState *es, int worker) stats = &instrument->io; + break; + } + case T_TidRangeScan: + { + TidRangeScanState *state = ((TidRangeScanState *) planstate); + SharedTidRangeScanInstrumentation *sinstrument = state->trss_sinstrument; + TidRangeScanInstrumentation *instrument = &sinstrument->sinstrument[worker]; + + 
stats = &instrument->io; + break; } default: diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index ffc708ed6be..62b26bfd18a 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -1122,6 +1122,9 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate, case T_SeqScanState: ExecSeqScanRetrieveInstrumentation((SeqScanState *) planstate); break; + case T_TidRangeScanState: + ExecTidRangeScanRetrieveInstrumentation((TidRangeScanState *) planstate); + break; default: break; } diff --git a/src/backend/executor/nodeTidrangescan.c b/src/backend/executor/nodeTidrangescan.c index 617713bde04..4ff6e333144 100644 --- a/src/backend/executor/nodeTidrangescan.c +++ b/src/backend/executor/nodeTidrangescan.c @@ -340,6 +340,29 @@ ExecEndTidRangeScan(TidRangeScanState *node) { TableScanDesc scan = node->ss.ss_currentScanDesc; + /* + * When ending a parallel worker, copy the statistics gathered by the + * worker back into shared memory so that they can be picked up by the main + * process to report in EXPLAIN ANALYZE. + */ + if (node->trss_sinstrument != NULL && IsParallelWorker()) + { + TidRangeScanInstrumentation *si; + + Assert(ParallelWorkerNumber < node->trss_sinstrument->num_workers); + si = &node->trss_sinstrument->sinstrument[ParallelWorkerNumber]; + + /* + * Here we accumulate the scan's I/O stats into si rather than + * overwriting si. When a Gather/GatherMerge node finishes it will + * shut down its workers. On rescan it will spin up new workers + * which will have a new state and zeroed stats. 
+ */ + + /* collect prefetch info for this process from the read_stream */ + ACCUMULATE_IO_STATS(&si->io, &node->ss.ss_currentScanDesc->rs_iostats); + } + if (scan != NULL) table_endscan(scan); } @@ -433,11 +456,23 @@ void ExecTidRangeScanEstimate(TidRangeScanState *node, ParallelContext *pcxt) { EState *estate = node->ss.ps.state; + Size size; + + size = table_parallelscan_estimate(node->ss.ss_currentRelation, + estate->es_snapshot); + node->trss_pscanlen = size; + + /* make sure the instrumentation is properly aligned */ + size = MAXALIGN(size); - node->trss_pscanlen = - table_parallelscan_estimate(node->ss.ss_currentRelation, - estate->es_snapshot); - shm_toc_estimate_chunk(&pcxt->estimator, node->trss_pscanlen); + /* account for instrumentation, if required */ + if (node->ss.ps.instrument && pcxt->nworkers > 0) + { + size = add_size(size, offsetof(SharedTidRangeScanInstrumentation, sinstrument)); + size = add_size(size, mul_size(pcxt->nworkers, sizeof(TidRangeScanInstrumentation))); + } + + shm_toc_estimate_chunk(&pcxt->estimator, size); shm_toc_estimate_keys(&pcxt->estimator, 1); } @@ -452,8 +487,18 @@ ExecTidRangeScanInitializeDSM(TidRangeScanState *node, ParallelContext *pcxt) { EState *estate = node->ss.ps.state; ParallelTableScanDesc pscan; + SharedTidRangeScanInstrumentation *sinstrument = NULL; + Size size; + char *ptr; - pscan = shm_toc_allocate(pcxt->toc, node->trss_pscanlen); + size = MAXALIGN(node->trss_pscanlen); + if (node->ss.ps.instrument && pcxt->nworkers > 0) + { + size = add_size(size, offsetof(SharedTidRangeScanInstrumentation, sinstrument)); + size = add_size(size, mul_size(pcxt->nworkers, sizeof(TidRangeScanInstrumentation))); + } + + pscan = shm_toc_allocate(pcxt->toc, size); table_parallelscan_initialize(node->ss.ss_currentRelation, pscan, estate->es_snapshot); @@ -461,6 +506,23 @@ ExecTidRangeScanInitializeDSM(TidRangeScanState *node, ParallelContext *pcxt) node->ss.ss_currentScanDesc = 
table_beginscan_parallel_tidrange(node->ss.ss_currentRelation, pscan); + + /* initialize the shared instrumentation (with correct alignment) */ + ptr = (char *) pscan; + ptr += MAXALIGN(node->trss_pscanlen); + if (node->ss.ps.instrument && pcxt->nworkers > 0) + sinstrument = (SharedTidRangeScanInstrumentation *) ptr; + + if (sinstrument) + { + sinstrument->num_workers = pcxt->nworkers; + + /* ensure any unfilled slots will contain zeroes */ + memset(sinstrument->sinstrument, 0, + pcxt->nworkers * sizeof(TidRangeScanInstrumentation)); + } + + node->trss_sinstrument = sinstrument; } /* ---------------------------------------------------------------- @@ -489,10 +551,48 @@ void ExecTidRangeScanInitializeWorker(TidRangeScanState *node, ParallelWorkerContext *pwcxt) { + EState *estate = node->ss.ps.state; ParallelTableScanDesc pscan; + char *ptr; + Size size; pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false); node->ss.ss_currentScanDesc = table_beginscan_parallel_tidrange(node->ss.ss_currentRelation, pscan); + + /* + * Workers don't get the trss_pscanlen value in scan descriptor, so use + * the TAM callback again. The result has to match the earlier result in + * ExecTidRangeScanEstimate. + */ + size = table_parallelscan_estimate(node->ss.ss_currentRelation, + estate->es_snapshot); + ptr = (char *) pscan; + ptr += MAXALIGN(size); + + if (node->ss.ps.instrument) + node->trss_sinstrument = (SharedTidRangeScanInstrumentation *) ptr; +} + +/* ---------------------------------------------------------------- + * ExecTidRangeScanRetrieveInstrumentation + * + * Transfer scan statistics from DSM to private memory. 
+ * ---------------------------------------------------------------- + */ +void +ExecTidRangeScanRetrieveInstrumentation(TidRangeScanState *node) +{ + SharedTidRangeScanInstrumentation *sinstrument = node->trss_sinstrument; + Size size; + + if (sinstrument == NULL) + return; + + size = offsetof(SharedTidRangeScanInstrumentation, sinstrument) + + sinstrument->num_workers * sizeof(TidRangeScanInstrumentation); + + node->trss_sinstrument = palloc(size); + memcpy(node->trss_sinstrument, sinstrument, size); } diff --git a/src/include/executor/instrument_node.h b/src/include/executor/instrument_node.h index ce643ef0635..b33b296ce03 100644 --- a/src/include/executor/instrument_node.h +++ b/src/include/executor/instrument_node.h @@ -99,6 +99,17 @@ typedef struct SharedSeqScanInstrumentation SeqScanInstrumentation sinstrument[FLEXIBLE_ARRAY_MEMBER]; } SharedSeqScanInstrumentation; +typedef struct TidRangeScanInstrumentation +{ + IOStats io; +} TidRangeScanInstrumentation; + +typedef struct SharedTidRangeScanInstrumentation +{ + int num_workers; + TidRangeScanInstrumentation sinstrument[FLEXIBLE_ARRAY_MEMBER]; +} SharedTidRangeScanInstrumentation; + typedef struct IndexScanInstrumentation { /* Index search count (incremented with pgstat_count_index_scan call) */ diff --git a/src/include/executor/nodeTidrangescan.h b/src/include/executor/nodeTidrangescan.h index 8752d1ea8c4..4f5dd74e0fc 100644 --- a/src/include/executor/nodeTidrangescan.h +++ b/src/include/executor/nodeTidrangescan.h @@ -27,5 +27,6 @@ extern void ExecTidRangeScanEstimate(TidRangeScanState *node, ParallelContext *p extern void ExecTidRangeScanInitializeDSM(TidRangeScanState *node, ParallelContext *pcxt); extern void ExecTidRangeScanReInitializeDSM(TidRangeScanState *node, ParallelContext *pcxt); extern void ExecTidRangeScanInitializeWorker(TidRangeScanState *node, ParallelWorkerContext *pwcxt); +extern void ExecTidRangeScanRetrieveInstrumentation(TidRangeScanState *node); #endif /* NODETIDRANGESCAN_H */ diff 
--git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index f287f8fc813..611a6280987 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1896,6 +1896,7 @@ typedef struct TidRangeScanState ItemPointerData trss_maxtid; bool trss_inScan; Size trss_pscanlen; + SharedTidRangeScanInstrumentation *trss_sinstrument; } TidRangeScanState; /* ---------------- -- 2.53.0