From ac42a4080e002627af2c221591d7c872612a7ce7 Mon Sep 17 00:00:00 2001 From: jcoleman Date: Sun, 2 Jul 2023 15:23:49 -0400 Subject: [PATCH v12 2/2] Parallelize correlated subqueries When params are provided at the current query level (i.e., are generated within a single worker and not shared across workers) we can safely execute these in parallel. We accomplish this by tracking the PARAM_EXEC params that are needed for a rel to be safely executed in parallel and only inserting a Gather node if that set is empty. --- doc/src/sgml/parallel.sgml | 3 +- src/backend/optimizer/path/allpaths.c | 24 ++- src/backend/optimizer/util/clauses.c | 73 ++++++-- src/backend/optimizer/util/relnode.c | 1 + src/include/nodes/pathnodes.h | 6 + src/include/optimizer/clauses.h | 1 + .../regress/expected/incremental_sort.out | 41 ++--- src/test/regress/expected/partition_prune.out | 104 +++++------ src/test/regress/expected/select_parallel.out | 161 ++++++++++-------- src/test/regress/sql/select_parallel.sql | 8 +- 10 files changed, 266 insertions(+), 156 deletions(-) diff --git a/doc/src/sgml/parallel.sgml b/doc/src/sgml/parallel.sgml index 5acc9537d6..fd32572ec8 100644 --- a/doc/src/sgml/parallel.sgml +++ b/doc/src/sgml/parallel.sgml @@ -518,7 +518,8 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%'; - Plan nodes that reference a correlated SubPlan. + Plan nodes that reference a correlated SubPlan where + the result is shared between workers. 
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 84c4de488a..37c44d393b 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -632,7 +632,7 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, if (proparallel != PROPARALLEL_SAFE) return; - if (!is_parallel_safe(root, (Node *) rte->tablesample->args)) + if (!is_parallel_safe_with_params(root, (Node *) rte->tablesample->args, &rel->params_req_for_parallel)) return; } @@ -698,7 +698,7 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, case RTE_FUNCTION: /* Check for parallel-restricted functions. */ - if (!is_parallel_safe(root, (Node *) rte->functions)) + if (!is_parallel_safe_with_params(root, (Node *) rte->functions, &rel->params_req_for_parallel)) return; break; @@ -708,7 +708,7 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, case RTE_VALUES: /* Check for parallel-restricted functions. */ - if (!is_parallel_safe(root, (Node *) rte->values_lists)) + if (!is_parallel_safe_with_params(root, (Node *) rte->values_lists, &rel->params_req_for_parallel)) return; break; @@ -745,14 +745,14 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, * outer join clauses work correctly. It would likely break equivalence * classes, too. */ - if (!is_parallel_safe(root, (Node *) rel->baserestrictinfo)) + if (!is_parallel_safe_with_params(root, (Node *) rel->baserestrictinfo, &rel->params_req_for_parallel)) return; /* * Likewise, if the relation's outputs are not parallel-safe, give up. * (Usually, they're just Vars, but sometimes they're not.) */ - if (!is_parallel_safe(root, (Node *) rel->reltarget->exprs)) + if (!is_parallel_safe_with_params(root, (Node *) rel->reltarget->exprs, &rel->params_req_for_parallel)) return; /* We have a winner. 
*/ @@ -3071,6 +3071,13 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows) if (rel->partial_pathlist == NIL) return; + /* + * Wait to insert Gather nodes until all PARAM_EXEC params are provided + * within the current rel since we can't pass them to workers. + */ + if (!bms_is_empty(rel->params_req_for_parallel)) + return; + /* Should we override the rel's rowcount estimate? */ if (override_rows) rowsp = &rows; @@ -3209,6 +3216,13 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r if (rel->partial_pathlist == NIL) return; + /* + * Wait to insert Gather nodes until all PARAM_EXEC params are provided + * within the current rel since we can't pass them to workers. + */ + if (!bms_is_empty(rel->params_req_for_parallel)) + return; + /* Should we override the rel's rowcount estimate? */ if (override_rows) rowsp = &rows; diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index 94eb56a1e7..9acad18154 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -91,6 +91,8 @@ typedef struct { char max_hazard; /* worst proparallel hazard found so far */ char max_interesting; /* worst proparallel hazard of interest */ + bool check_params; + Bitmapset **required_params; List *safe_param_ids; /* PARAM_EXEC Param IDs to treat as safe */ } max_parallel_hazard_context; @@ -720,6 +722,7 @@ max_parallel_hazard(Query *parse) context.max_hazard = PROPARALLEL_SAFE; context.max_interesting = PROPARALLEL_UNSAFE; + context.check_params = true; context.safe_param_ids = NIL; (void) max_parallel_hazard_walker((Node *) parse, &context); return context.max_hazard; @@ -736,8 +739,6 @@ bool is_parallel_safe(PlannerInfo *root, Node *node) { max_parallel_hazard_context context; - PlannerInfo *proot; - ListCell *l; /* * Even if the original querytree contained nothing unsafe, we need to @@ -751,6 +752,43 @@ is_parallel_safe(PlannerInfo *root, Node *node) /* Else use 
max_parallel_hazard's search logic, but stop on RESTRICTED */ context.max_hazard = PROPARALLEL_SAFE; context.max_interesting = PROPARALLEL_RESTRICTED; + context.check_params = false; + context.required_params = NULL; + + return !max_parallel_hazard_walker(node, &context); +} + +/* + * is_parallel_safe_with_params + * As above, but additionally tracking what PARAM_EXEC params are required + * to be provided within a worker for a gather to be inserted at this level + * of the query. Those required params are passed to the caller through + * the required_params argument. + * + * Note: required_params is only valid if node is otherwise parallel safe. + */ +bool +is_parallel_safe_with_params(PlannerInfo *root, Node *node, Bitmapset **required_params) +{ + max_parallel_hazard_context context; + PlannerInfo *proot; + ListCell *l; + + /* + * Even if the original querytree contained nothing unsafe, we need to + * search the expression if we have generated any PARAM_EXEC Params while + * planning, because those will have to be provided for the expression to + * remain parallel safe and there might be one in this expression. But + * otherwise we don't need to look. 
+ */ + if (root->glob->maxParallelHazard == PROPARALLEL_SAFE && + root->glob->paramExecTypes == NIL) + return true; + /* Else use max_parallel_hazard's search logic, but stop on RESTRICTED */ + context.max_hazard = PROPARALLEL_SAFE; + context.max_interesting = PROPARALLEL_RESTRICTED; + context.check_params = false; + context.required_params = required_params; context.safe_param_ids = NIL; /* @@ -890,13 +928,22 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) if (!subplan->parallel_safe && max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context)) return true; - save_safe_param_ids = context->safe_param_ids; - context->safe_param_ids = list_concat_copy(context->safe_param_ids, - subplan->paramIds); + + if (context->check_params || context->required_params != NULL) + { + save_safe_param_ids = context->safe_param_ids; + context->safe_param_ids = list_concat_copy(context->safe_param_ids, + subplan->paramIds); + } if (max_parallel_hazard_walker(subplan->testexpr, context)) return true; /* no need to restore safe_param_ids */ - list_free(context->safe_param_ids); - context->safe_param_ids = save_safe_param_ids; + + if (context->check_params || context->required_params != NULL) + { + list_free(context->safe_param_ids); + context->safe_param_ids = save_safe_param_ids; + } + /* we must also check args, but no special Param treatment there */ if (max_parallel_hazard_walker((Node *) subplan->args, context)) return true; @@ -915,14 +962,18 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) { Param *param = (Param *) node; - if (param->paramkind == PARAM_EXTERN) + if (param->paramkind != PARAM_EXEC || !(context->check_params || context->required_params != NULL)) return false; - if (param->paramkind != PARAM_EXEC || - !list_member_int(context->safe_param_ids, param->paramid)) + if (!list_member_int(context->safe_param_ids, param->paramid)) { if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context)) - return true; + 
{ + if (context->required_params != NULL) + *context->required_params = bms_add_member(*context->required_params, param->paramid); + if (context->check_params) + return true; + } } return false; /* nothing to recurse to */ } diff --git a/src/backend/optimizer/util/relnode.c b/src/backend/optimizer/util/relnode.c index 22d01cef5b..58228ff269 100644 --- a/src/backend/optimizer/util/relnode.c +++ b/src/backend/optimizer/util/relnode.c @@ -699,6 +699,7 @@ build_join_rel(PlannerInfo *root, joinrel->consider_startup = (root->tuple_fraction > 0); joinrel->consider_param_startup = false; joinrel->consider_parallel = false; + joinrel->params_req_for_parallel = bms_union(outer_rel->params_req_for_parallel, inner_rel->params_req_for_parallel); joinrel->reltarget = create_empty_pathtarget(); joinrel->pathlist = NIL; joinrel->ppilist = NIL; diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h index 137da178dc..cb5721963d 100644 --- a/src/include/nodes/pathnodes.h +++ b/src/include/nodes/pathnodes.h @@ -871,6 +871,12 @@ typedef struct RelOptInfo /* consider parallel paths? */ bool consider_parallel; + /* + * Params, if any, required to be provided when consider_parallel is true. + * Note: if consider_parallel is false then this is not meaningful. 
+ */ + Bitmapset *params_req_for_parallel; + /* * default result targetlist for Paths scanning this relation; list of * Vars/Exprs, cost, width diff --git a/src/include/optimizer/clauses.h b/src/include/optimizer/clauses.h index 34b301e537..eb4334f565 100644 --- a/src/include/optimizer/clauses.h +++ b/src/include/optimizer/clauses.h @@ -34,6 +34,7 @@ extern bool contain_subplans(Node *clause); extern char max_parallel_hazard(Query *parse); extern bool is_parallel_safe(PlannerInfo *root, Node *node); +extern bool is_parallel_safe_with_params(PlannerInfo *root, Node *node, Bitmapset **required_params); extern bool contain_nonstrict_functions(Node *clause); extern bool contain_exec_param(Node *clause, List *param_ids); extern bool contain_leaked_vars(Node *clause); diff --git a/src/test/regress/expected/incremental_sort.out b/src/test/regress/expected/incremental_sort.out index 7fdb685313..df1c52414e 100644 --- a/src/test/regress/expected/incremental_sort.out +++ b/src/test/regress/expected/incremental_sort.out @@ -1597,20 +1597,21 @@ explain (costs off) select distinct unique1, (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1) from tenk1 t, generate_series(1, 1000); - QUERY PLAN ---------------------------------------------------------------------------------- + QUERY PLAN +--------------------------------------------------------------------------------------- Unique - -> Sort - Sort Key: t.unique1, ((SubPlan 1)) - -> Gather - Workers Planned: 2 - -> Nested Loop - -> Parallel Index Only Scan using tenk1_unique1 on tenk1 t - -> Function Scan on generate_series - SubPlan 1 - -> Index Only Scan using tenk1_unique1 on tenk1 - Index Cond: (unique1 = t.unique1) -(11 rows) + -> Gather Merge + Workers Planned: 2 + -> Unique + -> Sort + Sort Key: t.unique1, ((SubPlan 1)) + -> Nested Loop + -> Parallel Index Only Scan using tenk1_unique1 on tenk1 t + -> Function Scan on generate_series + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on tenk1 + Index Cond: 
(unique1 = t.unique1) +(12 rows) explain (costs off) select unique1, @@ -1619,16 +1620,16 @@ from tenk1 t, generate_series(1, 1000) order by 1, 2; QUERY PLAN --------------------------------------------------------------------------- - Sort - Sort Key: t.unique1, ((SubPlan 1)) - -> Gather - Workers Planned: 2 + Gather Merge + Workers Planned: 2 + -> Sort + Sort Key: t.unique1, ((SubPlan 1)) -> Nested Loop -> Parallel Index Only Scan using tenk1_unique1 on tenk1 t -> Function Scan on generate_series - SubPlan 1 - -> Index Only Scan using tenk1_unique1 on tenk1 - Index Cond: (unique1 = t.unique1) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on tenk1 + Index Cond: (unique1 = t.unique1) (10 rows) -- Parallel sort but with expression not available until the upper rel. diff --git a/src/test/regress/expected/partition_prune.out b/src/test/regress/expected/partition_prune.out index 9a4c48c055..c45590fdfe 100644 --- a/src/test/regress/expected/partition_prune.out +++ b/src/test/regress/expected/partition_prune.out @@ -1489,60 +1489,64 @@ EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM part p(x) ORDER BY x; -- -- pruning won't work for mc3p, because some keys are Params explain (costs off) select * from mc2p t1, lateral (select count(*) from mc3p t2 where t2.a = t1.b and abs(t2.b) = 1 and t2.c = 1) s where t1.a = 1; - QUERY PLAN ------------------------------------------------------------------------ - Nested Loop - -> Append - -> Seq Scan on mc2p1 t1_1 - Filter: (a = 1) - -> Seq Scan on mc2p2 t1_2 - Filter: (a = 1) - -> Seq Scan on mc2p_default t1_3 - Filter: (a = 1) - -> Aggregate - -> Append - -> Seq Scan on mc3p0 t2_1 - Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p1 t2_2 - Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p2 t2_3 - Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p3 t2_4 - Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p4 t2_5 - Filter: ((a = t1.b) AND (c = 1) 
AND (abs(b) = 1)) - -> Seq Scan on mc3p5 t2_6 - Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p6 t2_7 - Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p7 t2_8 - Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p_default t2_9 - Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) -(28 rows) + QUERY PLAN +----------------------------------------------------------------------------- + Gather + Workers Planned: 2 + -> Nested Loop + -> Parallel Append + -> Parallel Seq Scan on mc2p1 t1_1 + Filter: (a = 1) + -> Parallel Seq Scan on mc2p2 t1_2 + Filter: (a = 1) + -> Parallel Seq Scan on mc2p_default t1_3 + Filter: (a = 1) + -> Aggregate + -> Append + -> Seq Scan on mc3p0 t2_1 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p1 t2_2 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p2 t2_3 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p3 t2_4 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p4 t2_5 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p5 t2_6 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p6 t2_7 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p7 t2_8 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p_default t2_9 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) +(30 rows) -- pruning should work fine, because values for a prefix of keys (a, b) are -- available explain (costs off) select * from mc2p t1, lateral (select count(*) from mc3p t2 where t2.c = t1.b and abs(t2.b) = 1 and t2.a = 1) s where t1.a = 1; - QUERY PLAN ------------------------------------------------------------------------ - Nested Loop - -> Append - -> Seq Scan on mc2p1 t1_1 - Filter: (a = 1) - -> Seq Scan on mc2p2 t1_2 - Filter: (a = 1) - -> Seq Scan on mc2p_default t1_3 - Filter: (a = 1) - -> Aggregate - -> Append - -> Seq Scan on mc3p0 
t2_1 - Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p1 t2_2 - Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p_default t2_3 - Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1)) -(16 rows) + QUERY PLAN +----------------------------------------------------------------------------- + Gather + Workers Planned: 2 + -> Nested Loop + -> Parallel Append + -> Parallel Seq Scan on mc2p1 t1_1 + Filter: (a = 1) + -> Parallel Seq Scan on mc2p2 t1_2 + Filter: (a = 1) + -> Parallel Seq Scan on mc2p_default t1_3 + Filter: (a = 1) + -> Aggregate + -> Append + -> Seq Scan on mc3p0 t2_1 + Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p1 t2_2 + Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p_default t2_3 + Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1)) +(18 rows) -- also here, because values for all keys are provided explain (costs off) select * from mc2p t1, lateral (select count(*) from mc3p t2 where t2.a = 1 and abs(t2.b) = 1 and t2.c = 1) s where t1.a = 1; diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out index b76132ffe4..a6c4d6bf3f 100644 --- a/src/test/regress/expected/select_parallel.out +++ b/src/test/regress/expected/select_parallel.out @@ -130,30 +130,48 @@ select sp_test_func() order by 1; foo (2 rows) --- Parallel Append is not to be used when the subpath depends on the outer param +-- Parallel Append can be used when the subpath depends on the outer params +-- when those params are consumed within the worker that generates them. 
create table part_pa_test(a int, b int) partition by range(a); create table part_pa_test_p1 partition of part_pa_test for values from (minvalue) to (0); create table part_pa_test_p2 partition of part_pa_test for values from (0) to (maxvalue); -explain (costs off) +explain (costs off, verbose) select (select max((select pa1.b from part_pa_test pa1 where pa1.a = pa2.a))) from part_pa_test pa2; - QUERY PLAN --------------------------------------------------------------- - Aggregate + QUERY PLAN +--------------------------------------------------------------------------- + Finalize Aggregate + Output: (SubPlan 2) -> Gather + Output: (PARTIAL max((SubPlan 1))) Workers Planned: 3 - -> Parallel Append - -> Parallel Seq Scan on part_pa_test_p1 pa2_1 - -> Parallel Seq Scan on part_pa_test_p2 pa2_2 + -> Partial Aggregate + Output: PARTIAL max((SubPlan 1)) + -> Parallel Append + -> Parallel Seq Scan on public.part_pa_test_p1 pa2_1 + Output: pa2_1.a + -> Parallel Seq Scan on public.part_pa_test_p2 pa2_2 + Output: pa2_2.a + SubPlan 1 + -> Append + -> Seq Scan on public.part_pa_test_p1 pa1_1 + Output: pa1_1.b + Filter: (pa1_1.a = pa2.a) + -> Seq Scan on public.part_pa_test_p2 pa1_2 + Output: pa1_2.b + Filter: (pa1_2.a = pa2.a) SubPlan 2 -> Result - SubPlan 1 - -> Append - -> Seq Scan on part_pa_test_p1 pa1_1 - Filter: (a = pa2.a) - -> Seq Scan on part_pa_test_p2 pa1_2 - Filter: (a = pa2.a) -(14 rows) + Output: max((SubPlan 1)) +(23 rows) + +insert into part_pa_test(a, b) values (-1, 1), (1, 3); +select (select max((select pa1.b from part_pa_test pa1 where pa1.a = pa2.a))) +from part_pa_test pa2; + max +----- + 3 +(1 row) drop table part_pa_test; -- test with leader participation disabled @@ -320,19 +338,19 @@ explain (costs off, verbose) select QUERY PLAN ---------------------------------------------------------------------------- Gather - Output: (SubPlan 1) + Output: ((SubPlan 1)) Workers Planned: 4 -> Nested Loop - Output: t.unique1 + Output: (SubPlan 1) -> Parallel Index 
Only Scan using tenk1_unique1 on public.tenk1 t Output: t.unique1 -> Function Scan on pg_catalog.generate_series Output: generate_series.generate_series Function Call: generate_series(1, 10) - SubPlan 1 - -> Index Only Scan using tenk1_unique1 on public.tenk1 - Output: t.unique1 - Index Cond: (tenk1.unique1 = t.unique1) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on public.tenk1 + Output: t.unique1 + Index Cond: (tenk1.unique1 = t.unique1) (14 rows) explain (costs off, verbose) select @@ -341,63 +359,69 @@ explain (costs off, verbose) select QUERY PLAN ---------------------------------------------------------------------- Gather - Output: (SubPlan 1) + Output: ((SubPlan 1)) Workers Planned: 4 -> Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t - Output: t.unique1 - SubPlan 1 - -> Index Only Scan using tenk1_unique1 on public.tenk1 - Output: t.unique1 - Index Cond: (tenk1.unique1 = t.unique1) + Output: (SubPlan 1) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on public.tenk1 + Output: t.unique1 + Index Cond: (tenk1.unique1 = t.unique1) (9 rows) explain (costs off, verbose) select (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1) from tenk1 t limit 1; - QUERY PLAN -------------------------------------------------------------------- + QUERY PLAN +---------------------------------------------------------------------------- Limit Output: ((SubPlan 1)) - -> Seq Scan on public.tenk1 t - Output: (SubPlan 1) - SubPlan 1 - -> Index Only Scan using tenk1_unique1 on public.tenk1 - Output: t.unique1 - Index Cond: (tenk1.unique1 = t.unique1) -(8 rows) + -> Gather + Output: ((SubPlan 1)) + Workers Planned: 4 + -> Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t + Output: (SubPlan 1) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on public.tenk1 + Output: t.unique1 + Index Cond: (tenk1.unique1 = t.unique1) +(11 rows) explain (costs off, verbose) select t.unique1 from tenk1 t where t.unique1 = (select t.unique1 from 
tenk1 where tenk1.unique1 = t.unique1); - QUERY PLAN -------------------------------------------------------------- - Seq Scan on public.tenk1 t + QUERY PLAN +---------------------------------------------------------------------- + Gather Output: t.unique1 - Filter: (t.unique1 = (SubPlan 1)) - SubPlan 1 - -> Index Only Scan using tenk1_unique1 on public.tenk1 - Output: t.unique1 - Index Cond: (tenk1.unique1 = t.unique1) -(7 rows) + Workers Planned: 4 + -> Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t + Output: t.unique1 + Filter: (t.unique1 = (SubPlan 1)) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on public.tenk1 + Output: t.unique1 + Index Cond: (tenk1.unique1 = t.unique1) +(10 rows) explain (costs off, verbose) select * from tenk1 t order by (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1); - QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - Sort + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Merge Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, ((SubPlan 1)) - Sort Key: ((SubPlan 1)) - -> Gather - Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, (SubPlan 1) - Workers Planned: 4 + Workers Planned: 4 + -> Sort + Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, ((SubPlan 1)) + Sort Key: ((SubPlan 1)) -> Parallel Seq Scan 
on public.tenk1 t - Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4 - SubPlan 1 - -> Index Only Scan using tenk1_unique1 on public.tenk1 - Output: t.unique1 - Index Cond: (tenk1.unique1 = t.unique1) + Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, (SubPlan 1) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on public.tenk1 + Output: t.unique1 + Index Cond: (tenk1.unique1 = t.unique1) (12 rows) -- test subplan in join/lateral join @@ -409,14 +433,14 @@ explain (costs off, verbose, timing off) select t.unique1, l.* QUERY PLAN ---------------------------------------------------------------------- Gather - Output: t.unique1, (SubPlan 1) + Output: t.unique1, ((SubPlan 1)) Workers Planned: 4 -> Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t - Output: t.unique1 - SubPlan 1 - -> Index Only Scan using tenk1_unique1 on public.tenk1 - Output: t.unique1 - Index Cond: (tenk1.unique1 = t.unique1) + Output: t.unique1, (SubPlan 1) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on public.tenk1 + Output: t.unique1 + Index Cond: (tenk1.unique1 = t.unique1) (9 rows) -- can't put a gather at the top of a subplan that takes a param @@ -1350,9 +1374,12 @@ SELECT 1 FROM tenk1_vw_sec Workers Planned: 4 -> Parallel Index Only Scan using tenk1_unique1 on tenk1 SubPlan 1 - -> Aggregate - -> Seq Scan on int4_tbl - Filter: (f1 < tenk1_vw_sec.unique1) -(9 rows) + -> Finalize Aggregate + -> Gather + Workers Planned: 1 + -> Partial Aggregate + -> Parallel Seq Scan on int4_tbl + Filter: (f1 < tenk1_vw_sec.unique1) +(12 rows) rollback; diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql index cc752a7d45..3fc24b2b94 100644 --- a/src/test/regress/sql/select_parallel.sql +++ 
b/src/test/regress/sql/select_parallel.sql @@ -53,13 +53,17 @@ $$ select 'foo'::varchar union all select 'bar'::varchar $$ language sql stable; select sp_test_func() order by 1; --- Parallel Append is not to be used when the subpath depends on the outer param +-- Parallel Append can be used when the subpath depends on the outer params +-- when those params are consumed within the worker that generates them. create table part_pa_test(a int, b int) partition by range(a); create table part_pa_test_p1 partition of part_pa_test for values from (minvalue) to (0); create table part_pa_test_p2 partition of part_pa_test for values from (0) to (maxvalue); -explain (costs off) +explain (costs off, verbose) select (select max((select pa1.b from part_pa_test pa1 where pa1.a = pa2.a))) from part_pa_test pa2; +insert into part_pa_test(a, b) values (-1, 1), (1, 3); +select (select max((select pa1.b from part_pa_test pa1 where pa1.a = pa2.a))) +from part_pa_test pa2; drop table part_pa_test; -- test with leader participation disabled -- 2.39.3 (Apple Git-145)