From 2093a78191cacaeecae6b1bd095433dbdbf1eb3d Mon Sep 17 00:00:00 2001 From: Cary Huang Date: Mon, 29 Apr 2024 14:48:44 -0700 Subject: [PATCH] added parallel tid range scan feature --- src/backend/access/heap/heapam.c | 19 +++-- src/backend/access/table/tableam.c | 29 +++++++ src/backend/executor/execParallel.c | 20 +++++ src/backend/executor/nodeTidrangescan.c | 81 +++++++++++++++++++ src/backend/optimizer/path/allpaths.c | 5 ++ src/backend/optimizer/path/costsize.c | 22 +++++ src/backend/optimizer/path/tidpath.c | 31 ++++++- src/backend/optimizer/util/pathnode.c | 7 +- src/include/access/tableam.h | 12 +++ src/include/executor/nodeTidrangescan.h | 7 ++ src/include/nodes/execnodes.h | 1 + src/include/optimizer/pathnode.h | 3 +- src/include/optimizer/paths.h | 2 + src/test/regress/expected/select_parallel.out | 51 ++++++++++++ src/test/regress/sql/select_parallel.sql | 15 ++++ 15 files changed, 293 insertions(+), 12 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 4be0dee4de..2cbb058e96 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -1367,7 +1367,8 @@ heap_set_tidrange(TableScanDesc sscan, ItemPointer mintid, * Check for an empty range and protect from would be negative results * from the numBlks calculation below. */ - if (ItemPointerCompare(&highestItem, &lowestItem) < 0) + if (ItemPointerCompare(&highestItem, &lowestItem) < 0 && + sscan->rs_parallel == NULL) { /* Set an empty range of blocks to scan */ heap_setscanlimits(sscan, 0, 0); @@ -1381,15 +1382,19 @@ heap_set_tidrange(TableScanDesc sscan, ItemPointer mintid, * lowestItem has an offset above MaxOffsetNumber. In this case, we could * advance startBlk by one. Likewise, if highestItem has an offset of 0 * we could scan one fewer blocks. However, such an optimization does not - * seem worth troubling over, currently. + * seem worth troubling over, currently. This is set only in non-parallel + * case. 
*/ - startBlk = ItemPointerGetBlockNumberNoCheck(&lowestItem); + if (sscan->rs_parallel == NULL) + { + startBlk = ItemPointerGetBlockNumberNoCheck(&lowestItem); - numBlks = ItemPointerGetBlockNumberNoCheck(&highestItem) - - ItemPointerGetBlockNumberNoCheck(&lowestItem) + 1; + numBlks = ItemPointerGetBlockNumberNoCheck(&highestItem) - + ItemPointerGetBlockNumberNoCheck(&lowestItem) + 1; - /* Set the start block and number of blocks to scan */ - heap_setscanlimits(sscan, startBlk, numBlks); + /* Set the start block and number of blocks to scan */ + heap_setscanlimits(sscan, startBlk, numBlks); + } /* Finally, set the TID range in sscan */ ItemPointerCopy(&lowestItem, &sscan->rs_mintid); diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c index e57a0b7ea3..38253643e8 100644 --- a/src/backend/access/table/tableam.c +++ b/src/backend/access/table/tableam.c @@ -187,6 +187,35 @@ table_beginscan_parallel(Relation relation, ParallelTableScanDesc pscan) pscan, flags); } +TableScanDesc +table_beginscan_parallel_tidrange(Relation relation, ParallelTableScanDesc pscan, + ItemPointer mintid, ItemPointer maxtid) +{ + Snapshot snapshot; + uint32 flags = SO_TYPE_TIDRANGESCAN | SO_ALLOW_PAGEMODE; + TableScanDesc sscan; + + Assert(RelationGetRelid(relation) == pscan->phs_relid); + + if (!pscan->phs_snapshot_any) + { + /* Snapshot was serialized -- restore it */ + snapshot = RestoreSnapshot((char *) pscan + pscan->phs_snapshot_off); + RegisterSnapshot(snapshot); + flags |= SO_TEMP_SNAPSHOT; + } + else + { + /* SnapshotAny passed by caller (not serialized) */ + snapshot = SnapshotAny; + } + + sscan = relation->rd_tableam->scan_begin(relation, snapshot, 0, NULL, + pscan, flags); + + return sscan; +} + /* ---------------------------------------------------------------------------- * Index scan related functions. 
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 8c53d1834e..e4733ca5a3 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -40,6 +40,7 @@ #include "executor/nodeSort.h" #include "executor/nodeSubplan.h" #include "executor/tqueue.h" +#include "executor/nodeTidrangescan.h" #include "jit/jit.h" #include "nodes/nodeFuncs.h" #include "pgstat.h" @@ -296,6 +297,11 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecMemoizeEstimate((MemoizeState *) planstate, e->pcxt); break; + case T_TidRangeScanState: + if (planstate->plan->parallel_aware) + ExecTidRangeScanEstimate((TidRangeScanState *) planstate, + e->pcxt); + break; default: break; } @@ -520,6 +526,11 @@ ExecParallelInitializeDSM(PlanState *planstate, /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecMemoizeInitializeDSM((MemoizeState *) planstate, d->pcxt); break; + case T_TidRangeScanState: + if (planstate->plan->parallel_aware) + ExecTidRangeScanInitializeDSM((TidRangeScanState *) planstate, + d->pcxt); + break; default: break; } @@ -1006,6 +1017,11 @@ ExecParallelReInitializeDSM(PlanState *planstate, case T_MemoizeState: /* these nodes have DSM state, but no reinitialization is required */ break; + case T_TidRangeScanState: + if (planstate->plan->parallel_aware) + ExecTidRangeScanReInitializeDSM((TidRangeScanState *) planstate, + pcxt); + break; default: break; @@ -1372,6 +1388,10 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt) /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecMemoizeInitializeWorker((MemoizeState *) planstate, pwcxt); break; + case T_TidRangeScanState: + if (planstate->plan->parallel_aware) + ExecTidRangeScanInitializeWorker((TidRangeScanState *) planstate, pwcxt); + break; default: break; } diff --git a/src/backend/executor/nodeTidrangescan.c 
b/src/backend/executor/nodeTidrangescan.c index 9aa7683d7e..a353a17731 100644 --- a/src/backend/executor/nodeTidrangescan.c +++ b/src/backend/executor/nodeTidrangescan.c @@ -403,3 +403,84 @@ ExecInitTidRangeScan(TidRangeScan *node, EState *estate, int eflags) */ return tidrangestate; } +/* ---------------------------------------------------------------- + * Parallel Scan Support + * ---------------------------------------------------------------- + */ + +/* ---------------------------------------------------------------- + * ExecTidRangeScanEstimate + * + * Compute the amount of space we'll need in the parallel + * query DSM, and inform pcxt->estimator about our needs. + * ---------------------------------------------------------------- + */ +void +ExecTidRangeScanEstimate(TidRangeScanState *node, + ParallelContext *pcxt) +{ + EState *estate = node->ss.ps.state; + + node->pscan_len = table_parallelscan_estimate(node->ss.ss_currentRelation, + estate->es_snapshot); + shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + +/* ---------------------------------------------------------------- + * ExecTidRangeScanInitializeDSM + * + * Set up a parallel heap scan descriptor. 
+ * ----------------------------------------------------------------
+ */
+void
+ExecTidRangeScanInitializeDSM(TidRangeScanState *node,
+ ParallelContext *pcxt)
+{
+ EState *estate = node->ss.ps.state;
+ ParallelTableScanDesc pscan;
+
+ pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);
+ table_parallelscan_initialize(node->ss.ss_currentRelation,
+ pscan,
+ estate->es_snapshot);
+ shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
+ node->ss.ss_currentScanDesc =
+ table_beginscan_parallel_tidrange(node->ss.ss_currentRelation, pscan,
+ &node->trss_mintid, &node->trss_maxtid);
+}
+
+/* ----------------------------------------------------------------
+ * ExecTidRangeScanReInitializeDSM
+ *
+ * Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecTidRangeScanReInitializeDSM(TidRangeScanState *node,
+ ParallelContext *pcxt)
+{
+ ParallelTableScanDesc pscan;
+
+ pscan = node->ss.ss_currentScanDesc->rs_parallel;
+ table_parallelscan_reinitialize(node->ss.ss_currentRelation, pscan);
+
+}
+
+/* ----------------------------------------------------------------
+ * ExecTidRangeScanInitializeWorker
+ *
+ * Copy relevant information from TOC into planstate. 
+ * ---------------------------------------------------------------- + */ +void +ExecTidRangeScanInitializeWorker(TidRangeScanState *node, + ParallelWorkerContext *pwcxt) +{ + ParallelTableScanDesc pscan; + + pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false); + node->ss.ss_currentScanDesc = + table_beginscan_parallel_tidrange(node->ss.ss_currentRelation, pscan, + &node->trss_mintid, &node->trss_maxtid); +} diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index cc51ae1757..ab0d1d5fb7 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -794,6 +794,7 @@ static void create_plain_partial_paths(PlannerInfo *root, RelOptInfo *rel) { int parallel_workers; + Path *path = NULL; parallel_workers = compute_parallel_worker(rel, rel->pages, -1, max_parallel_workers_per_gather); @@ -804,6 +805,10 @@ create_plain_partial_paths(PlannerInfo *root, RelOptInfo *rel) /* Add an unordered partial path based on a parallel sequential scan. */ add_partial_path(rel, create_seqscan_path(root, rel, NULL, parallel_workers)); + + path = create_tidrangescan_subpaths(root, rel, parallel_workers); + if (path) + add_partial_path(rel, path); } /* diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index ee23ed7835..262b1ef02d 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -1435,6 +1435,28 @@ cost_tidrangescan(Path *path, PlannerInfo *root, startup_cost += path->pathtarget->cost.startup; run_cost += path->pathtarget->cost.per_tuple * path->rows; + /* Adjust costing for parallelism, if used. */ + if (path->parallel_workers > 0) + { + double parallel_divisor = get_parallel_divisor(path); + + /* The CPU cost is divided among all the workers. 
*/ + run_cost /= parallel_divisor; + + /* + * It may be possible to amortize some of the I/O cost, but probably + * not very much, because most operating systems already do aggressive + * prefetching. For now, we assume that the disk run cost can't be + * amortized at all. + */ + + /* + * In the case of a parallel plan, the row count needs to represent + * the number of tuples processed per worker. + */ + path->rows = clamp_row_est(path->rows / parallel_divisor); + } + path->startup_cost = startup_cost; path->total_cost = startup_cost + run_cost; } diff --git a/src/backend/optimizer/path/tidpath.c b/src/backend/optimizer/path/tidpath.c index 2ae5ddfe43..c5413be9e6 100644 --- a/src/backend/optimizer/path/tidpath.c +++ b/src/backend/optimizer/path/tidpath.c @@ -496,7 +496,8 @@ create_tidscan_paths(PlannerInfo *root, RelOptInfo *rel) add_path(rel, (Path *) create_tidrangescan_path(root, rel, tidrangequals, - required_outer)); + required_outer, + 0)); } /* @@ -526,3 +527,31 @@ create_tidscan_paths(PlannerInfo *root, RelOptInfo *rel) */ BuildParameterizedTidPaths(root, rel, rel->joininfo); } + +Path * +create_tidrangescan_subpaths(PlannerInfo *root, RelOptInfo *rel, int parallel_workers) +{ + List *tidrangequals; + Path *path; + /* + * If there are range quals in the baserestrict list, generate a + * TidRangePath. + */ + tidrangequals = TidRangeQualFromRestrictInfoList(rel->baserestrictinfo, + rel); + + if (tidrangequals != NIL) + { + /* + * This path uses no join clauses, but it could still have required + * parameterization due to LATERAL refs in its tlist. 
+ */ + Relids required_outer = rel->lateral_relids; + path = (Path *) create_tidrangescan_path(root, rel, + tidrangequals, + required_outer, + parallel_workers); + return path; + } + return NULL; +} diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 3cf1dac087..7ceeaf8688 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -1206,7 +1206,8 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals, */ TidRangePath * create_tidrangescan_path(PlannerInfo *root, RelOptInfo *rel, - List *tidrangequals, Relids required_outer) + List *tidrangequals, Relids required_outer, + int parallel_workers) { TidRangePath *pathnode = makeNode(TidRangePath); @@ -1215,9 +1216,9 @@ create_tidrangescan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->path.pathtarget = rel->reltarget; pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); - pathnode->path.parallel_aware = false; + pathnode->path.parallel_aware = (parallel_workers > 0); pathnode->path.parallel_safe = rel->consider_parallel; - pathnode->path.parallel_workers = 0; + pathnode->path.parallel_workers = parallel_workers; pathnode->path.pathkeys = NIL; /* always unordered */ pathnode->tidrangequals = tidrangequals; diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 8e583b45cd..14c4a694a1 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -1175,6 +1175,18 @@ extern void table_parallelscan_initialize(Relation rel, extern TableScanDesc table_beginscan_parallel(Relation relation, ParallelTableScanDesc pscan); +/* + * Begin a parallel tidrange scan. `pscan` needs to have been initialized with + * table_parallelscan_initialize(), for the same relation. The initialization + * does not need to have happened in this backend. + * + * Caller must hold a suitable lock on the relation. 
+ */ +extern TableScanDesc table_beginscan_parallel_tidrange(Relation relation, + ParallelTableScanDesc pscan, + ItemPointer mintid, + ItemPointer maxtid); + /* * Restart a parallel scan. Call this in the leader process. Caller is * responsible for making sure that all workers have finished the scan diff --git a/src/include/executor/nodeTidrangescan.h b/src/include/executor/nodeTidrangescan.h index 1cfc7a07be..977cb8eb6e 100644 --- a/src/include/executor/nodeTidrangescan.h +++ b/src/include/executor/nodeTidrangescan.h @@ -14,6 +14,7 @@ #ifndef NODETIDRANGESCAN_H #define NODETIDRANGESCAN_H +#include "access/parallel.h" #include "nodes/execnodes.h" extern TidRangeScanState *ExecInitTidRangeScan(TidRangeScan *node, @@ -21,4 +22,10 @@ extern TidRangeScanState *ExecInitTidRangeScan(TidRangeScan *node, extern void ExecEndTidRangeScan(TidRangeScanState *node); extern void ExecReScanTidRangeScan(TidRangeScanState *node); +/* parallel scan support */ +extern void ExecTidRangeScanEstimate(TidRangeScanState *node, ParallelContext *pcxt); +extern void ExecTidRangeScanInitializeDSM(TidRangeScanState *node, ParallelContext *pcxt); +extern void ExecTidRangeScanReInitializeDSM(TidRangeScanState *node, ParallelContext *pcxt); +extern void ExecTidRangeScanInitializeWorker(TidRangeScanState *node, ParallelWorkerContext *pwcxt); + #endif /* NODETIDRANGESCAN_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index d927ac44a8..81eec34730 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1862,6 +1862,7 @@ typedef struct TidRangeScanState ItemPointerData trss_mintid; ItemPointerData trss_maxtid; bool trss_inScan; + Size pscan_len; /* size of parallel tid range scan descriptor */ } TidRangeScanState; /* ---------------- diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index c5c4756b0f..d7683ec1c3 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -66,7 +66,8 @@ 
extern TidPath *create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, extern TidRangePath *create_tidrangescan_path(PlannerInfo *root, RelOptInfo *rel, List *tidrangequals, - Relids required_outer); + Relids required_outer, + int parallel_workers); extern AppendPath *create_append_path(PlannerInfo *root, RelOptInfo *rel, List *subpaths, List *partial_subpaths, List *pathkeys, Relids required_outer, diff --git a/src/include/optimizer/paths.h b/src/include/optimizer/paths.h index 39ba461548..c571354890 100644 --- a/src/include/optimizer/paths.h +++ b/src/include/optimizer/paths.h @@ -87,6 +87,8 @@ extern void check_index_predicates(PlannerInfo *root, RelOptInfo *rel); * routines to generate tid paths */ extern void create_tidscan_paths(PlannerInfo *root, RelOptInfo *rel); +extern Path *create_tidrangescan_subpaths(PlannerInfo *root, RelOptInfo *rel, + int parallel_workers); /* * joinpath.c diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out index 87273fa635..61e6700194 100644 --- a/src/test/regress/expected/select_parallel.out +++ b/src/test/regress/expected/select_parallel.out @@ -1293,4 +1293,55 @@ SELECT 1 FROM tenk1_vw_sec Filter: (f1 < tenk1_vw_sec.unique1) (9 rows) +-- test parallel tid range scan +EXPLAIN (COSTS OFF) +SELECT ctid FROM tenk1 WHERE ctid > '(0,1)' LIMIT 1; + QUERY PLAN +----------------------------------------------- + Limit + -> Gather + Workers Planned: 4 + -> Parallel Tid Range Scan on tenk1 + TID Cond: (ctid > '(0,1)'::tid) +(5 rows) + +EXPLAIN (COSTS OFF) +SELECT ctid FROM tenk1 WHERE ctid < '(400,1)' LIMIT 1; + QUERY PLAN +------------------------------------------------- + Limit + -> Gather + Workers Planned: 4 + -> Parallel Tid Range Scan on tenk1 + TID Cond: (ctid < '(400,1)'::tid) +(5 rows) + +EXPLAIN (COSTS OFF) +SELECT ctid FROM tenk1 WHERE ctid >= '(0,1)' AND ctid <= '(400,1)' LIMIT 1; + QUERY PLAN +------------------------------------------------------------------------------- 
+ Limit + -> Gather + Workers Planned: 4 + -> Parallel Tid Range Scan on tenk1 + TID Cond: ((ctid >= '(0,1)'::tid) AND (ctid <= '(400,1)'::tid)) +(5 rows) + +EXPLAIN (COSTS OFF) +SELECT t.ctid,t2.c FROM tenk1 t, +LATERAL (SELECT count(*) c FROM tenk1 t2 WHERE t2.ctid <= t.ctid) t2 +WHERE t.ctid < '(1,0)' LIMIT 1; + QUERY PLAN +------------------------------------------------------ + Limit + -> Nested Loop + -> Gather + Workers Planned: 4 + -> Parallel Tid Range Scan on tenk1 t + TID Cond: (ctid < '(1,0)'::tid) + -> Aggregate + -> Tid Range Scan on tenk1 t2 + TID Cond: (ctid <= t.ctid) +(9 rows) + rollback; diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql index 20376c03fa..1d4ef68790 100644 --- a/src/test/regress/sql/select_parallel.sql +++ b/src/test/regress/sql/select_parallel.sql @@ -495,4 +495,19 @@ EXPLAIN (COSTS OFF) SELECT 1 FROM tenk1_vw_sec WHERE (SELECT sum(f1) FROM int4_tbl WHERE f1 < unique1) < 100; +-- test parallel tid range scan +EXPLAIN (COSTS OFF) +SELECT ctid FROM tenk1 WHERE ctid > '(0,1)' LIMIT 1; + +EXPLAIN (COSTS OFF) +SELECT ctid FROM tenk1 WHERE ctid < '(400,1)' LIMIT 1; + +EXPLAIN (COSTS OFF) +SELECT ctid FROM tenk1 WHERE ctid >= '(0,1)' AND ctid <= '(400,1)' LIMIT 1; + +EXPLAIN (COSTS OFF) +SELECT t.ctid,t2.c FROM tenk1 t, +LATERAL (SELECT count(*) c FROM tenk1 t2 WHERE t2.ctid <= t.ctid) t2 +WHERE t.ctid < '(1,0)' LIMIT 1; + rollback; -- 2.17.1