From 07ccd5b8d1776b5109c54d20ed3dcaef22d752f9 Mon Sep 17 00:00:00 2001 From: jcoleman Date: Fri, 21 Jan 2022 22:38:46 -0500 Subject: [PATCH v5 2/3] Parallelize correlated subqueries When params are provided at the current query level (i.e., are generated within a single worker and not shared across workers) we can safely execute these in parallel. Alternative approach using just relids subset check --- doc/src/sgml/parallel.sgml | 3 +- src/backend/optimizer/path/allpaths.c | 18 ++- src/backend/optimizer/path/joinpath.c | 16 ++- src/backend/optimizer/util/clauses.c | 3 + src/backend/optimizer/util/pathnode.c | 2 + src/include/nodes/pathnodes.h | 2 +- .../regress/expected/incremental_sort.out | 28 ++-- src/test/regress/expected/partition_prune.out | 104 +++++++------- src/test/regress/expected/select_parallel.out | 128 ++++++++++-------- 9 files changed, 169 insertions(+), 135 deletions(-) diff --git a/doc/src/sgml/parallel.sgml b/doc/src/sgml/parallel.sgml index c37fb67065..d44325dd89 100644 --- a/doc/src/sgml/parallel.sgml +++ b/doc/src/sgml/parallel.sgml @@ -517,7 +517,8 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%'; - Plan nodes that reference a correlated SubPlan. + Plan nodes that reference a correlated SubPlan where + the result is shared between workers. diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 8fc28007f5..e1ad9ae372 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -558,7 +558,8 @@ set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, * the final scan/join targetlist is available (see grouping_planner). */ if (rel->reloptkind == RELOPT_BASEREL && - !bms_equal(rel->relids, root->all_baserels)) + !bms_equal(rel->relids, root->all_baserels) + && (rel->subplan_params == NIL || rte->rtekind != RTE_SUBQUERY)) generate_useful_gather_paths(root, rel, false); /* Now find the cheapest of the paths for this rel */ @@ -3025,7 +3026,7 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows) cheapest_partial_path->rows * cheapest_partial_path->parallel_workers; simple_gather_path = (Path *) create_gather_path(root, rel, cheapest_partial_path, rel->reltarget, - NULL, rowsp); + rel->lateral_relids, rowsp); add_path(rel, simple_gather_path); /* @@ -3042,7 +3043,7 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows) rows = subpath->rows * subpath->parallel_workers; path = create_gather_merge_path(root, rel, subpath, rel->reltarget, - subpath->pathkeys, NULL, rowsp); + subpath->pathkeys, rel->lateral_relids, rowsp); add_path(rel, &path->path); } } @@ -3144,11 +3145,15 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r double *rowsp = NULL; List *useful_pathkeys_list = NIL; Path *cheapest_partial_path = NULL; + Relids required_outer = rel->lateral_relids; /* If there are no partial paths, there's nothing to do here. */ if (rel->partial_pathlist == NIL) return; + if (!bms_is_subset(required_outer, rel->relids)) + return; + /* Should we override the rel's rowcount estimate? */ if (override_rows) rowsp = &rows; @@ -3220,7 +3225,7 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r tmp, rel->reltarget, tmp->pathkeys, - NULL, + required_outer, rowsp); add_path(rel, &path->path); @@ -3254,7 +3259,7 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r tmp, rel->reltarget, tmp->pathkeys, - NULL, + required_outer, rowsp); add_path(rel, &path->path); @@ -3433,7 +3438,8 @@ standard_join_search(PlannerInfo *root, int levels_needed, List *initial_rels) /* * Except for the topmost scan/join rel, consider gathering * partial paths. We'll do the same for the topmost scan/join rel - * once we know the final targetlist (see grouping_planner). + * once we know the final targetlist (see + * apply_scanjoin_target_to_paths). */ if (!bms_equal(rel->relids, root->all_baserels)) generate_useful_gather_paths(root, rel, false); diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c index 2a3f0ab7bf..21606f6e59 100644 --- a/src/backend/optimizer/path/joinpath.c +++ b/src/backend/optimizer/path/joinpath.c @@ -1791,16 +1791,24 @@ match_unsorted_outer(PlannerInfo *root, * partial path and the joinrel is parallel-safe. However, we can't * handle JOIN_UNIQUE_OUTER, because the outer path will be partial, and * therefore we won't be able to properly guarantee uniqueness. Nor can - * we handle joins needing lateral rels, since partial paths must not be - * parameterized. Similarly, we can't handle JOIN_FULL and JOIN_RIGHT, - * because they can produce false null extended rows. + * we handle JOIN_FULL and JOIN_RIGHT, because they can produce false null + * extended rows. + * + * Partial paths may only have parameters in limited cases + * where the parameterization is fully satisfied without sharing state + * between workers, so we only allow lateral rels on inputs to the join + * if the resulting join contains no lateral rels, the inner rel's laterals + * are fully satisfied by the outer rel, and the outer rel doesn't depend + * on the inner rel to produce any laterals. */ if (joinrel->consider_parallel && save_jointype != JOIN_UNIQUE_OUTER && save_jointype != JOIN_FULL && save_jointype != JOIN_RIGHT && outerrel->partial_pathlist != NIL && - bms_is_empty(joinrel->lateral_relids)) + bms_is_empty(joinrel->lateral_relids) && + bms_is_subset(innerrel->lateral_relids, outerrel->relids) && + (bms_is_empty(outerrel->lateral_relids) || !bms_is_subset(outerrel->lateral_relids, innerrel->relids))) { if (nestjoinOK) consider_parallel_nestloop(root, joinrel, outerrel, innerrel, diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index bf3a7cae60..11bab7fa7e 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -822,6 +822,9 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) if (param->paramkind == PARAM_EXTERN) return false; + if (param->paramkind == PARAM_EXEC) + return false; + if (param->paramkind != PARAM_EXEC || !list_member_int(context->safe_param_ids, param->paramid)) { diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index e10561d843..e8309342f0 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -2437,6 +2437,8 @@ create_nestloop_path(PlannerInfo *root, NestPath *pathnode = makeNode(NestPath); Relids inner_req_outer = PATH_REQ_OUTER(inner_path); + /* TODO: Assert lateral relids subset safety? */ + /* * If the inner path is parameterized by the outer, we must drop any * restrict_clauses that are due to be moved into the inner path. We have diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h index 294cfe9c47..b69d4e2692 100644 --- a/src/include/nodes/pathnodes.h +++ b/src/include/nodes/pathnodes.h @@ -2964,7 +2964,7 @@ typedef struct MinMaxAggInfo * for conflicting purposes. * * In addition, PARAM_EXEC slots are assigned for Params representing outputs - * from subplans (values that are setParam items for those subplans). These + * from subplans (values that are setParam items for those subplans). [TODO: is this true, or only for init plans?] These * IDs need not be tracked via PlannerParamItems, since we do not need any * duplicate-elimination nor later processing of the represented expressions. * Instead, we just record the assignment of the slot number by appending to diff --git a/src/test/regress/expected/incremental_sort.out b/src/test/regress/expected/incremental_sort.out index 49953eaade..66c09381f1 100644 --- a/src/test/regress/expected/incremental_sort.out +++ b/src/test/regress/expected/incremental_sort.out @@ -1612,16 +1612,16 @@ from tenk1 t, generate_series(1, 1000); QUERY PLAN --------------------------------------------------------------------------------- Unique - -> Sort - Sort Key: t.unique1, ((SubPlan 1)) - -> Gather - Workers Planned: 2 + -> Gather Merge + Workers Planned: 2 + -> Sort + Sort Key: t.unique1, ((SubPlan 1)) -> Nested Loop -> Parallel Index Only Scan using tenk1_unique1 on tenk1 t -> Function Scan on generate_series - SubPlan 1 - -> Index Only Scan using tenk1_unique1 on tenk1 - Index Cond: (unique1 = t.unique1) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on tenk1 + Index Cond: (unique1 = t.unique1) (11 rows) explain (costs off) select @@ -1631,16 +1631,16 @@ from tenk1 t, generate_series(1, 1000) order by 1, 2; QUERY PLAN --------------------------------------------------------------------------- - Sort - Sort Key: t.unique1, ((SubPlan 1)) - -> Gather - Workers Planned: 2 + Gather Merge + Workers Planned: 2 + -> Sort + Sort Key: t.unique1, ((SubPlan 1)) -> Nested Loop -> Parallel Index Only Scan using tenk1_unique1 on tenk1 t -> Function Scan on generate_series - SubPlan 1 - -> Index Only Scan using tenk1_unique1 on tenk1 - Index Cond: (unique1 = t.unique1) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on tenk1 + Index Cond: (unique1 = t.unique1) (10 rows) -- Parallel sort but with expression not available until the upper rel. diff --git a/src/test/regress/expected/partition_prune.out b/src/test/regress/expected/partition_prune.out index 7555764c77..5c45f9c0a5 100644 --- a/src/test/regress/expected/partition_prune.out +++ b/src/test/regress/expected/partition_prune.out @@ -1284,60 +1284,64 @@ EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM part p(x) ORDER BY x; -- -- pruning won't work for mc3p, because some keys are Params explain (costs off) select * from mc2p t1, lateral (select count(*) from mc3p t2 where t2.a = t1.b and abs(t2.b) = 1 and t2.c = 1) s where t1.a = 1; - QUERY PLAN ------------------------------------------------------------------------ - Nested Loop - -> Append - -> Seq Scan on mc2p1 t1_1 - Filter: (a = 1) - -> Seq Scan on mc2p2 t1_2 - Filter: (a = 1) - -> Seq Scan on mc2p_default t1_3 - Filter: (a = 1) - -> Aggregate - -> Append - -> Seq Scan on mc3p0 t2_1 - Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p1 t2_2 - Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p2 t2_3 - Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p3 t2_4 - Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p4 t2_5 - Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p5 t2_6 - Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p6 t2_7 - Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p7 t2_8 - Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p_default t2_9 - Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) -(28 rows) + QUERY PLAN +----------------------------------------------------------------------------- + Gather + Workers Planned: 2 + -> Nested Loop + -> Parallel Append + -> Parallel Seq Scan on mc2p1 t1_1 + Filter: (a = 1) + -> Parallel Seq Scan on mc2p2 t1_2 + Filter: (a = 1) + -> Parallel Seq Scan on mc2p_default t1_3 + Filter: (a = 1) + -> Aggregate + -> Append + -> Seq Scan on mc3p0 t2_1 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p1 t2_2 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p2 t2_3 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p3 t2_4 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p4 t2_5 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p5 t2_6 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p6 t2_7 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p7 t2_8 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p_default t2_9 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) +(30 rows) -- pruning should work fine, because values for a prefix of keys (a, b) are -- available explain (costs off) select * from mc2p t1, lateral (select count(*) from mc3p t2 where t2.c = t1.b and abs(t2.b) = 1 and t2.a = 1) s where t1.a = 1; - QUERY PLAN ------------------------------------------------------------------------ - Nested Loop - -> Append - -> Seq Scan on mc2p1 t1_1 - Filter: (a = 1) - -> Seq Scan on mc2p2 t1_2 - Filter: (a = 1) - -> Seq Scan on mc2p_default t1_3 - Filter: (a = 1) - -> Aggregate - -> Append - -> Seq Scan on mc3p0 t2_1 - Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p1 t2_2 - Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p_default t2_3 - Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1)) -(16 rows) + QUERY PLAN +----------------------------------------------------------------------------- + Gather + Workers Planned: 2 + -> Nested Loop + -> Parallel Append + -> Parallel Seq Scan on mc2p1 t1_1 + Filter: (a = 1) + -> Parallel Seq Scan on mc2p2 t1_2 + Filter: (a = 1) + -> Parallel Seq Scan on mc2p_default t1_3 + Filter: (a = 1) + -> Aggregate + -> Append + -> Seq Scan on mc3p0 t2_1 + Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p1 t2_2 + Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p_default t2_3 + Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1)) +(18 rows) -- also here, because values for all keys are provided explain (costs off) select * from mc2p t1, lateral (select count(*) from mc3p t2 where t2.a = 1 and abs(t2.b) = 1 and t2.c = 1) s where t1.a = 1; diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out index 9b4d7dd44a..01443e2ffb 100644 --- a/src/test/regress/expected/select_parallel.out +++ b/src/test/regress/expected/select_parallel.out @@ -137,8 +137,8 @@ create table part_pa_test_p2 partition of part_pa_test for values from (0) to (m explain (costs off) select (select max((select pa1.b from part_pa_test pa1 where pa1.a = pa2.a))) from part_pa_test pa2; - QUERY PLAN --------------------------------------------------------------- + QUERY PLAN +---------------------------------------------------------------- Aggregate -> Gather Workers Planned: 3 @@ -148,12 +148,14 @@ explain (costs off) SubPlan 2 -> Result SubPlan 1 - -> Append - -> Seq Scan on part_pa_test_p1 pa1_1 - Filter: (a = pa2.a) - -> Seq Scan on part_pa_test_p2 pa1_2 - Filter: (a = pa2.a) -(14 rows) + -> Gather + Workers Planned: 3 + -> Parallel Append + -> Parallel Seq Scan on part_pa_test_p1 pa1_1 + Filter: (a = pa2.a) + -> Parallel Seq Scan on part_pa_test_p2 pa1_2 + Filter: (a = pa2.a) +(16 rows) drop table part_pa_test; -- test with leader participation disabled @@ -320,19 +322,19 @@ explain (costs off, verbose) select QUERY PLAN ---------------------------------------------------------------------------- Gather - Output: (SubPlan 1) + Output: ((SubPlan 1)) Workers Planned: 4 -> Nested Loop - Output: t.unique1 + Output: (SubPlan 1) -> Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t Output: t.unique1 -> Function Scan on pg_catalog.generate_series Output: generate_series.generate_series Function Call: generate_series(1, 10) - SubPlan 1 - -> Index Only Scan using tenk1_unique1 on public.tenk1 - Output: t.unique1 - Index Cond: (tenk1.unique1 = t.unique1) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on public.tenk1 + Output: t.unique1 + Index Cond: (tenk1.unique1 = t.unique1) (14 rows) explain (costs off, verbose) select @@ -341,63 +343,69 @@ explain (costs off, verbose) select QUERY PLAN ---------------------------------------------------------------------- Gather - Output: (SubPlan 1) + Output: ((SubPlan 1)) Workers Planned: 4 -> Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t - Output: t.unique1 - SubPlan 1 - -> Index Only Scan using tenk1_unique1 on public.tenk1 - Output: t.unique1 - Index Cond: (tenk1.unique1 = t.unique1) + Output: (SubPlan 1) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on public.tenk1 + Output: t.unique1 + Index Cond: (tenk1.unique1 = t.unique1) (9 rows) explain (costs off, verbose) select (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1) from tenk1 t limit 1; - QUERY PLAN -------------------------------------------------------------------- + QUERY PLAN +---------------------------------------------------------------------------- Limit Output: ((SubPlan 1)) - -> Seq Scan on public.tenk1 t - Output: (SubPlan 1) - SubPlan 1 - -> Index Only Scan using tenk1_unique1 on public.tenk1 - Output: t.unique1 - Index Cond: (tenk1.unique1 = t.unique1) -(8 rows) + -> Gather + Output: ((SubPlan 1)) + Workers Planned: 4 + -> Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t + Output: (SubPlan 1) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on public.tenk1 + Output: t.unique1 + Index Cond: (tenk1.unique1 = t.unique1) +(11 rows) explain (costs off, verbose) select t.unique1 from tenk1 t where t.unique1 = (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1); - QUERY PLAN -------------------------------------------------------------- - Seq Scan on public.tenk1 t + QUERY PLAN +---------------------------------------------------------------------- + Gather Output: t.unique1 - Filter: (t.unique1 = (SubPlan 1)) - SubPlan 1 - -> Index Only Scan using tenk1_unique1 on public.tenk1 - Output: t.unique1 - Index Cond: (tenk1.unique1 = t.unique1) -(7 rows) + Workers Planned: 4 + -> Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t + Output: t.unique1 + Filter: (t.unique1 = (SubPlan 1)) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on public.tenk1 + Output: t.unique1 + Index Cond: (tenk1.unique1 = t.unique1) +(10 rows) explain (costs off, verbose) select * from tenk1 t order by (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1); - QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - Sort + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Merge Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, ((SubPlan 1)) - Sort Key: ((SubPlan 1)) - -> Gather - Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, (SubPlan 1) - Workers Planned: 4 + Workers Planned: 4 + -> Sort + Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, ((SubPlan 1)) + Sort Key: ((SubPlan 1)) -> Parallel Seq Scan on public.tenk1 t - Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4 - SubPlan 1 - -> Index Only Scan using tenk1_unique1 on public.tenk1 - Output: t.unique1 - Index Cond: (tenk1.unique1 = t.unique1) + Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, (SubPlan 1) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on public.tenk1 + Output: t.unique1 + Index Cond: (tenk1.unique1 = t.unique1) (12 rows) -- test subplan in join/lateral join @@ -409,14 +417,14 @@ explain (costs off, verbose, timing off) select t.unique1, l.* QUERY PLAN ---------------------------------------------------------------------- Gather - Output: t.unique1, (SubPlan 1) + Output: t.unique1, ((SubPlan 1)) Workers Planned: 4 -> Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t - Output: t.unique1 - SubPlan 1 - -> Index Only Scan using tenk1_unique1 on public.tenk1 - Output: t.unique1 - Index Cond: (tenk1.unique1 = t.unique1) + Output: t.unique1, (SubPlan 1) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on public.tenk1 + Output: t.unique1 + Index Cond: (tenk1.unique1 = t.unique1) (9 rows) -- this is not parallel-safe due to use of random() within SubLink's testexpr: @@ -1322,8 +1330,10 @@ SELECT 1 FROM tenk1_vw_sec -> Parallel Index Only Scan using tenk1_unique1 on tenk1 SubPlan 1 -> Aggregate - -> Seq Scan on int4_tbl - Filter: (f1 < tenk1_vw_sec.unique1) -(9 rows) + -> Gather + Workers Planned: 1 + -> Parallel Seq Scan on int4_tbl + Filter: (f1 < tenk1_vw_sec.unique1) +(11 rows) rollback; -- 2.32.1 (Apple Git-133)