From 3f8b6f9ec4f853c1870a6b91d81829381937470d Mon Sep 17 00:00:00 2001 From: Richard Guo Date: Tue, 11 Jun 2019 07:48:29 +0000 Subject: [PATCH] Implementing parallel grouping sets. Parallel aggregation has already been supported in PostgreSQL and it is implemented by aggregating in two stages. First, each worker performs an aggregation step, producing a partial result for each group of which that process is aware. Second, the partial results are transferred to the leader via the Gather node. Finally, the leader merges the partial results and produces the final result for each group. We are implementing parallel grouping sets in the same way. The only difference is that in the final stage, the leader performs a grouping sets aggregation, rather than a normal aggregation. Co-authored-by: Richard Guo Co-authored-by: Paul Guo --- src/backend/optimizer/plan/createplan.c | 4 +- src/backend/optimizer/plan/planner.c | 59 ++++--- src/backend/optimizer/util/pathnode.c | 2 + src/include/nodes/pathnodes.h | 1 + src/include/optimizer/pathnode.h | 1 + src/test/regress/expected/parallelgroupingsets.out | 178 +++++++++++++++++++++ src/test/regress/sql/parallelgroupingsets.sql | 43 +++++ 7 files changed, 265 insertions(+), 23 deletions(-) create mode 100644 src/test/regress/expected/parallelgroupingsets.out create mode 100644 src/test/regress/sql/parallelgroupingsets.sql diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 608d5ad..6e9dfa5 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -2245,7 +2245,7 @@ create_groupingsets_plan(PlannerInfo *root, GroupingSetsPath *best_path) agg_plan = (Plan *) make_agg(NIL, NIL, strat, - AGGSPLIT_SIMPLE, + best_path->aggsplit, list_length((List *) linitial(rollup->gsets)), new_grpColIdx, extract_grouping_ops(rollup->groupClause), @@ -2283,7 +2283,7 @@ create_groupingsets_plan(PlannerInfo *root, GroupingSetsPath *best_path) plan = make_agg(build_path_tlist(root, &best_path->path), best_path->qual, best_path->aggstrategy, - AGGSPLIT_SIMPLE, + best_path->aggsplit, numGroupCols, top_grpColIdx, extract_grouping_ops(rollup->groupClause), diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index cb897cc..f6566f9 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -176,7 +176,8 @@ static void consider_groupingsets_paths(PlannerInfo *root, bool can_hash, grouping_sets_data *gd, const AggClauseCosts *agg_costs, - double dNumGroups); + double dNumGroups, + AggSplit aggsplit); static RelOptInfo *create_window_paths(PlannerInfo *root, RelOptInfo *input_rel, PathTarget *input_target, @@ -4183,7 +4184,8 @@ consider_groupingsets_paths(PlannerInfo *root, bool can_hash, grouping_sets_data *gd, const AggClauseCosts *agg_costs, - double dNumGroups) + double dNumGroups, + AggSplit aggsplit) { Query *parse = root->parse; @@ -4345,6 +4347,7 @@ consider_groupingsets_paths(PlannerInfo *root, path, (List *) parse->havingQual, strat, + aggsplit, new_rollups, agg_costs, dNumGroups)); @@ -4502,6 +4505,7 @@ consider_groupingsets_paths(PlannerInfo *root, path, (List *) parse->havingQual, AGG_MIXED, + aggsplit, rollups, agg_costs, dNumGroups)); @@ -4518,6 +4522,7 @@ consider_groupingsets_paths(PlannerInfo *root, path, (List *) parse->havingQual, AGG_SORTED, + aggsplit, gd->rollups, agg_costs, dNumGroups)); @@ -6406,7 +6411,7 @@ add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel, { consider_groupingsets_paths(root, grouped_rel, path, true, can_hash, - gd, agg_costs, dNumGroups); + gd, agg_costs, dNumGroups, AGGSPLIT_SIMPLE); } else if (parse->hasAggs) { @@ -6473,7 +6478,14 @@ add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel, -1.0); } - if (parse->hasAggs) + /* + * parallel grouping sets + */ + if (parse->groupingSets) + consider_groupingsets_paths(root, grouped_rel, + path, true, can_hash, + gd, agg_final_costs, dNumGroups, AGGSPLIT_FINAL_DESERIAL); + else if (parse->hasAggs) add_path(grouped_rel, (Path *) create_agg_path(root, grouped_rel, @@ -6508,7 +6520,7 @@ add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel, */ consider_groupingsets_paths(root, grouped_rel, cheapest_path, false, true, - gd, agg_costs, dNumGroups); + gd, agg_costs, dNumGroups, AGGSPLIT_SIMPLE); } else { @@ -6556,17 +6568,27 @@ add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel, dNumGroups); if (hashaggtablesize < work_mem * 1024L) - add_path(grouped_rel, (Path *) - create_agg_path(root, - grouped_rel, - path, - grouped_rel->reltarget, - AGG_HASHED, - AGGSPLIT_FINAL_DESERIAL, - parse->groupClause, - havingQual, - agg_final_costs, - dNumGroups)); + { + /* + * parallel grouping sets + */ + if (parse->groupingSets) + consider_groupingsets_paths(root, grouped_rel, + path, false, true, + gd, agg_final_costs, dNumGroups, AGGSPLIT_FINAL_DESERIAL); + else + add_path(grouped_rel, (Path *) + create_agg_path(root, + grouped_rel, + path, + grouped_rel->reltarget, + AGG_HASHED, + AGGSPLIT_FINAL_DESERIAL, + parse->groupClause, + havingQual, + agg_final_costs, + dNumGroups)); + } } } @@ -6952,11 +6974,6 @@ can_partial_agg(PlannerInfo *root, const AggClauseCosts *agg_costs) */ return false; } - else if (parse->groupingSets) - { - /* We don't know how to do grouping sets in parallel. */ - return false; - } else if (agg_costs->hasNonPartial || agg_costs->hasNonSerial) { /* Insufficient support for partial mode. */ diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index d884d2b..b5d79d2 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -3014,6 +3014,7 @@ create_groupingsets_path(PlannerInfo *root, Path *subpath, List *having_qual, AggStrategy aggstrategy, + AggSplit aggsplit, List *rollups, const AggClauseCosts *agg_costs, double numGroups) @@ -3059,6 +3060,7 @@ create_groupingsets_path(PlannerInfo *root, pathnode->path.pathkeys = NIL; pathnode->aggstrategy = aggstrategy; + pathnode->aggsplit = aggsplit; pathnode->rollups = rollups; pathnode->qual = having_qual; diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h index 4b7703d..739f279 100644 --- a/src/include/nodes/pathnodes.h +++ b/src/include/nodes/pathnodes.h @@ -1693,6 +1693,7 @@ typedef struct GroupingSetsPath Path path; Path *subpath; /* path representing input source */ AggStrategy aggstrategy; /* basic strategy */ + AggSplit aggsplit; /* agg-splitting mode, see nodes.h */ List *rollups; /* list of RollupData */ List *qual; /* quals (HAVING quals), if any */ } GroupingSetsPath; diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index e70d6a3..9d912fd 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -217,6 +217,7 @@ extern GroupingSetsPath *create_groupingsets_path(PlannerInfo *root, Path *subpath, List *having_qual, AggStrategy aggstrategy, + AggSplit aggsplit, List *rollups, const AggClauseCosts *agg_costs, double numGroups); diff --git a/src/test/regress/expected/parallelgroupingsets.out b/src/test/regress/expected/parallelgroupingsets.out new file mode 100644 index 0000000..52761e7 --- /dev/null +++ b/src/test/regress/expected/parallelgroupingsets.out @@ -0,0 +1,178 @@ +-- +-- grouping sets +-- +-- test data sources +create table gstest(c1 int, c2 int, c3 int); +insert into gstest select 1,10,100 from generate_series(1,1000000)i; +insert into gstest select 1,10,200 from generate_series(1,1000000)i; +insert into gstest select 1,20,30 from generate_series(1,1000000)i; +insert into gstest select 2,30,40 from generate_series(1,1000000)i; +insert into gstest select 2,40,50 from generate_series(1,1000000)i; +insert into gstest select 3,50,60 from generate_series(1,1000000)i; +insert into gstest select 1,NULL,000000 from generate_series(1,1000000)i; +analyze gstest; +SET parallel_tuple_cost=0; +SET parallel_setup_cost=0; +SET max_parallel_workers_per_gather=4; +-- test for hashagg +set enable_hashagg to on; +explain (costs off, verbose) +select c1, c2, avg(c3) from gstest group by grouping sets((c1,c2),(c1)); + QUERY PLAN +------------------------------------------------------------ + Finalize GroupAggregate + Output: c1, c2, avg(c3) + Group Key: gstest.c1, gstest.c2 + Group Key: gstest.c1 + -> Gather Merge + Output: c1, c2, (PARTIAL avg(c3)) + Workers Planned: 4 + -> Sort + Output: c1, c2, (PARTIAL avg(c3)) + Sort Key: gstest.c1, gstest.c2 + -> Partial HashAggregate + Output: c1, c2, PARTIAL avg(c3) + Group Key: gstest.c1, gstest.c2 + -> Parallel Seq Scan on public.gstest + Output: c1, c2, c3 +(15 rows) + +select c1, c2, avg(c3) from gstest group by grouping sets((c1,c2),(c1)) order by 1,2,3; + c1 | c2 | avg +----+----+---------------------------- + 1 | 10 | 150.0000000000000000 + 1 | 20 | 30.0000000000000000 + 1 | | 0.000000000000000000000000 + 1 | | 82.5000000000000000 + 2 | 30 | 40.0000000000000000 + 2 | 40 | 50.0000000000000000 + 2 | | 45.0000000000000000 + 3 | 50 | 60.0000000000000000 + 3 | | 60.0000000000000000 +(9 rows) + +explain (costs off, verbose) +select c1, c2, c3, avg(c3) from gstest group by grouping sets((c1,c2),(c1), (c2,c3)); + QUERY PLAN +---------------------------------------------------------------- + Finalize MixedAggregate + Output: c1, c2, c3, avg(c3) + Hash Key: gstest.c2, gstest.c3 + Group Key: gstest.c1, gstest.c2 + Group Key: gstest.c1 + -> Gather Merge + Output: c1, c2, c3, (PARTIAL avg(c3)) + Workers Planned: 4 + -> Sort + Output: c1, c2, c3, (PARTIAL avg(c3)) + Sort Key: gstest.c1, gstest.c2 + -> Partial HashAggregate + Output: c1, c2, c3, PARTIAL avg(c3) + Group Key: gstest.c1, gstest.c2, gstest.c3 + -> Parallel Seq Scan on public.gstest + Output: c1, c2, c3 +(16 rows) + +select c1, c2, c3, avg(c3) from gstest group by grouping sets((c1,c2),(c1), (c2,c3)) order by 1,2,3,4; + c1 | c2 | c3 | avg +----+----+-----+---------------------------- + 1 | 10 | | 150.0000000000000000 + 1 | 20 | | 30.0000000000000000 + 1 | | | 0.000000000000000000000000 + 1 | | | 82.5000000000000000 + 2 | 30 | | 40.0000000000000000 + 2 | 40 | | 50.0000000000000000 + 2 | | | 45.0000000000000000 + 3 | 50 | | 60.0000000000000000 + 3 | | | 60.0000000000000000 + | 10 | 100 | 100.0000000000000000 + | 10 | 200 | 200.0000000000000000 + | 20 | 30 | 30.0000000000000000 + | 30 | 40 | 40.0000000000000000 + | 40 | 50 | 50.0000000000000000 + | 50 | 60 | 60.0000000000000000 + | | 0 | 0.000000000000000000000000 +(16 rows) + +-- test for groupagg +set enable_hashagg to off; +explain (costs off, verbose) +select c1, c2, avg(c3) from gstest group by grouping sets((c1,c2),(c1)); + QUERY PLAN +------------------------------------------------------------ + Finalize GroupAggregate + Output: c1, c2, avg(c3) + Group Key: gstest.c1, gstest.c2 + Group Key: gstest.c1 + -> Gather Merge + Output: c1, c2, (PARTIAL avg(c3)) + Workers Planned: 4 + -> Partial GroupAggregate + Output: c1, c2, PARTIAL avg(c3) + Group Key: gstest.c1, gstest.c2 + -> Sort + Output: c1, c2, c3 + Sort Key: gstest.c1, gstest.c2 + -> Parallel Seq Scan on public.gstest + Output: c1, c2, c3 +(15 rows) + +select c1, c2, avg(c3) from gstest group by grouping sets((c1,c2),(c1)) order by 1,2,3; + c1 | c2 | avg +----+----+---------------------------- + 1 | 10 | 150.0000000000000000 + 1 | 20 | 30.0000000000000000 + 1 | | 0.000000000000000000000000 + 1 | | 82.5000000000000000 + 2 | 30 | 40.0000000000000000 + 2 | 40 | 50.0000000000000000 + 2 | | 45.0000000000000000 + 3 | 50 | 60.0000000000000000 + 3 | | 60.0000000000000000 +(9 rows) + +explain (costs off, verbose) +select c1, c2, c3, avg(c3) from gstest group by grouping sets((c1,c2),(c1), (c2,c3)); + QUERY PLAN +------------------------------------------------------------ + Finalize GroupAggregate + Output: c1, c2, c3, avg(c3) + Group Key: gstest.c1, gstest.c2 + Group Key: gstest.c1 + Sort Key: gstest.c2, gstest.c3 + Group Key: gstest.c2, gstest.c3 + -> Gather Merge + Output: c1, c2, c3, (PARTIAL avg(c3)) + Workers Planned: 4 + -> Partial GroupAggregate + Output: c1, c2, c3, PARTIAL avg(c3) + Group Key: gstest.c1, gstest.c2, gstest.c3 + -> Sort + Output: c1, c2, c3 + Sort Key: gstest.c1, gstest.c2 + -> Parallel Seq Scan on public.gstest + Output: c1, c2, c3 +(17 rows) + +select c1, c2, c3, avg(c3) from gstest group by grouping sets((c1,c2),(c1), (c2,c3)) order by 1,2,3,4; + c1 | c2 | c3 | avg +----+----+-----+---------------------------- + 1 | 10 | | 150.0000000000000000 + 1 | 20 | | 30.0000000000000000 + 1 | | | 0.000000000000000000000000 + 1 | | | 82.5000000000000000 + 2 | 30 | | 40.0000000000000000 + 2 | 40 | | 50.0000000000000000 + 2 | | | 45.0000000000000000 + 3 | 50 | | 60.0000000000000000 + 3 | | | 60.0000000000000000 + | 10 | 100 | 100.0000000000000000 + | 10 | 200 | 200.0000000000000000 + | 20 | 30 | 30.0000000000000000 + | 30 | 40 | 40.0000000000000000 + | 40 | 50 | 50.0000000000000000 + | 50 | 60 | 60.0000000000000000 + | | 0 | 0.000000000000000000000000 +(16 rows) + +drop table gstest; diff --git a/src/test/regress/sql/parallelgroupingsets.sql b/src/test/regress/sql/parallelgroupingsets.sql new file mode 100644 index 0000000..24cdb3b --- /dev/null +++ b/src/test/regress/sql/parallelgroupingsets.sql @@ -0,0 +1,43 @@ +-- +-- grouping sets +-- + +-- test data sources +create table gstest(c1 int, c2 int, c3 int); + +insert into gstest select 1,10,100 from generate_series(1,1000000)i; +insert into gstest select 1,10,200 from generate_series(1,1000000)i; +insert into gstest select 1,20,30 from generate_series(1,1000000)i; +insert into gstest select 2,30,40 from generate_series(1,1000000)i; +insert into gstest select 2,40,50 from generate_series(1,1000000)i; +insert into gstest select 3,50,60 from generate_series(1,1000000)i; +insert into gstest select 1,NULL,000000 from generate_series(1,1000000)i; +analyze gstest; + +SET parallel_tuple_cost=0; +SET parallel_setup_cost=0; +SET max_parallel_workers_per_gather=4; + +-- test for hashagg +set enable_hashagg to on; +explain (costs off, verbose) +select c1, c2, avg(c3) from gstest group by grouping sets((c1,c2),(c1)); +select c1, c2, avg(c3) from gstest group by grouping sets((c1,c2),(c1)) order by 1,2,3; + +explain (costs off, verbose) +select c1, c2, c3, avg(c3) from gstest group by grouping sets((c1,c2),(c1), (c2,c3)); +select c1, c2, c3, avg(c3) from gstest group by grouping sets((c1,c2),(c1), (c2,c3)) order by 1,2,3,4; + + +-- test for groupagg +set enable_hashagg to off; +explain (costs off, verbose) +select c1, c2, avg(c3) from gstest group by grouping sets((c1,c2),(c1)); +select c1, c2, avg(c3) from gstest group by grouping sets((c1,c2),(c1)) order by 1,2,3; + +explain (costs off, verbose) +select c1, c2, c3, avg(c3) from gstest group by grouping sets((c1,c2),(c1), (c2,c3)); +select c1, c2, c3, avg(c3) from gstest group by grouping sets((c1,c2),(c1), (c2,c3)) order by 1,2,3,4; + + +drop table gstest; -- 2.7.4