Thread: Parallel grouping sets
Hi all,
Paul and I have been hacking recently to implement parallel grouping
sets, and here we have two implementations.
Implementation 1
================
Attached is the patch and also there is a github branch [1] for this
work.
Parallel aggregation is already supported in PostgreSQL, implemented by
aggregating in two stages. First, each worker performs an aggregation
step, producing a partial result for each group it is aware of. The
partial results are then transferred to the leader via the Gather node.
Finally, the leader merges the partial results and produces the final
result for each group.
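For reference, a plain two-stage parallel aggregate (the case that is already supported) produces a plan of roughly the following shape. This is only an illustrative sketch on a hypothetical query; the exact plan depends on the data, costs and settings:
# explain (costs off) select c1, avg(c3) from t2 group by c1;
 Finalize HashAggregate
   Group Key: c1
   ->  Gather
         Workers Planned: 2
         ->  Partial HashAggregate
               Group Key: c1
               ->  Parallel Seq Scan on t2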
We are implementing parallel grouping sets in the same way. The only
difference is that in the final stage, the leader performs a grouping
sets aggregation, rather than a normal aggregation.
The plan looks like:
# explain (costs off, verbose) select c1, c2, avg(c3) from t2 group by grouping sets((c1,c2), (c1), (c2,c3));
QUERY PLAN
---------------------------------------------------------
Finalize MixedAggregate
Output: c1, c2, avg(c3), c3
Hash Key: t2.c2, t2.c3
Group Key: t2.c1, t2.c2
Group Key: t2.c1
-> Gather Merge
Output: c1, c2, c3, (PARTIAL avg(c3))
Workers Planned: 2
-> Sort
Output: c1, c2, c3, (PARTIAL avg(c3))
Sort Key: t2.c1, t2.c2
-> Partial HashAggregate
Output: c1, c2, c3, PARTIAL avg(c3)
Group Key: t2.c1, t2.c2, t2.c3
-> Parallel Seq Scan on public.t2
Output: c1, c2, c3
(16 rows)
As the partial aggregation can be performed in parallel, we can expect a
speedup when the number of groups seen by the Finalize Aggregate node is
noticeably smaller than the number of input rows.
For example, for the table provided in the test case within the patch,
running the above query on my Linux box gives:
# explain analyze select c1, c2, avg(c3) from t2 group by grouping sets((c1,c2), (c1), (c2,c3)); -- without patch
Planning Time: 0.123 ms
Execution Time: 9459.362 ms
# explain analyze select c1, c2, avg(c3) from t2 group by grouping sets((c1,c2), (c1), (c2,c3)); -- with patch
Planning Time: 0.204 ms
Execution Time: 1077.654 ms
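(The actual test table ships with the patch and is not reproduced here; as a rough, assumed sketch, a data set of the following kind, where the grouping columns have low joint cardinality relative to the row count, is the sort of case that shows this speedup:)
create table t2 (c1 int, c2 int, c3 int);
insert into t2 select i % 100, i % 10, i % 10 from generate_series(1, 10000000) i;
analyze t2;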
But sometimes we may not benefit from this patch. In the worst case, the
number of groups seen by the Finalize Aggregate node can be as large as
the number of input rows seen by all the worker processes in the Partial
Aggregate stage. This is prone to happen with this patch, because the
group key for the Partial Aggregate is the union of all the columns
involved in the grouping sets; in the above query, that is (c1, c2, c3).
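As a hedged illustration (not the patch's test data), a table where each grouping column is low-cardinality on its own but their combination is nearly unique hits this worst case: each grouping set alone has few groups, yet the Partial Aggregate groups on (c1, c2, c3), which is close to one group per input row, so the leader ends up doing nearly all of the work again.
create table t3 (c1 int, c2 int, c3 int);
insert into t3 select i % 1000, (i / 1000) % 1000, i % 7 from generate_series(1, 1000000) i;
analyze t3;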
So, we have been working on another way to implement parallel grouping
sets.
Implementation 2
================
This work can be found in github branch [2]. As it contains some hacky
code and a list of TODO items, it is far from being a patch, so please
consider it a PoC.
The idea is that instead of performing the grouping sets aggregation in
the Finalize Aggregate, we perform it in the Partial Aggregate.
The plan looks like:
# explain (costs off, verbose) select c1, c2, avg(c3) from t2 group by grouping sets((c1,c2), (c1));
QUERY PLAN
--------------------------------------------------------------
Finalize GroupAggregate
Output: c1, c2, avg(c3), (gset_id)
Group Key: t2.c1, t2.c2, (gset_id)
-> Gather Merge
Output: c1, c2, (gset_id), (PARTIAL avg(c3))
Workers Planned: 2
-> Sort
Output: c1, c2, (gset_id), (PARTIAL avg(c3))
Sort Key: t2.c1, t2.c2, (gset_id)
-> Partial HashAggregate
Output: c1, c2, gset_id, PARTIAL avg(c3)
Hash Key: t2.c1, t2.c2
Hash Key: t2.c1
-> Parallel Seq Scan on public.t2
Output: c1, c2, c3
(15 rows)
This method has a problem, though: in the final stage of aggregation,
the leader has no way to tell which tuple comes from which grouping set,
and it needs that information to merge the partial results correctly.
For instance, suppose we have a table t(c1, c2, c3) containing one row
(1, NULL, 3), and we are computing agg(c3) group by grouping sets
((c1,c2), (c1)). The leader would then get two tuples via the Gather
node for that row, both of which are (1, NULL, agg(3)): one from group
by (c1,c2) and one from group by (c1). If the leader cannot tell that
the two tuples come from two different grouping sets, it will merge
them incorrectly.
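A minimal reproduction of that ambiguity (the table name and data are hypothetical, and a serial plan is enough to see the indistinguishable rows):
create table t (c1 int, c2 int, c3 int);
insert into t values (1, NULL, 3);
select c1, c2, sum(c3) from t group by grouping sets ((c1, c2), (c1));
--  c1 | c2 | sum
-- ----+----+-----
--   1 |    |   3     <- from grouping set (c1, c2)
--   1 |    |   3     <- from grouping set (c1), row order not guaranteed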
So we add a hidden column 'gset_id', representing the grouping set id,
to the targetlist of the Partial Aggregate node, as well as to the group
key of the Finalize Aggregate node. That way, only tuples coming from
the same grouping set can get merged in the final stage of aggregation.
With this method, to simplify the implementation for grouping sets
containing multiple rollups, we generate a separate aggregation path for
each rollup and then Append them to form the final path.
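For example, a query with multiple rollups such as the one below would, under this PoC, be planned as separate per-rollup aggregations that are then appended (the query is just an illustration, not taken from the branch):
select c1, c2, avg(c3) from t2 group by grouping sets (rollup(c1, c2), rollup(c3));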
References:
[1] https://github.com/greenplum-db/postgres/tree/parallel_groupingsets
[2] https://github.com/greenplum-db/postgres/tree/parallel_groupingsets_2
Any comments and feedback are welcome.
Thanks
Richard
Attachment
On Wed, 12 Jun 2019 at 14:59, Richard Guo <riguo@pivotal.io> wrote:
> Implementation 1
> Parallel aggregation has already been supported in PostgreSQL and it is
> implemented by aggregating in two stages. First, each worker performs an
> aggregation step, producing a partial result for each group of which
> that process is aware. Second, the partial results are transferred to
> the leader via the Gather node. Finally, the leader merges the partial
> results and produces the final result for each group.
>
> We are implementing parallel grouping sets in the same way. The only
> difference is that in the final stage, the leader performs a grouping
> sets aggregation, rather than a normal aggregation.
Hi Richard,
I think it was you and I that discussed #1 at the unconference at PGCon 2
weeks ago. The good thing about #1 is that it can be implemented as
planner-only changes just by adding some additional paths and some
costing. #2 will be useful when we're unable to reduce the number of
inputs to the final aggregate node by doing the initial grouping.
However, since #1 is easier, I'd suggest going with it first, since it's
the path of least resistance. #1 should be fine as long as you properly
cost the parallel agg and don't choose it when the number of groups going
into the final agg isn't reduced by the partial agg node. Which brings me
to:
You'll need to do further work with the dNumGroups value. Since you're
grouping by all the columns/exprs in the grouping sets you'll need the
number of groups to be an estimate of that.
Here's a quick test I did that shows the problem:
create table abc(a int, b int, c int);
insert into abc select a,b,1 from generate_Series(1,1000) a,generate_Series(1,1000) b;
create statistics abc_a_b_stats (ndistinct) on a,b from abc;
analyze abc;
-- Here the Partial HashAggregate really should estimate that there will be 1 million rows.
explain analyze select a,b,sum(c) from abc group by grouping sets ((a),(b));
                                                               QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------
 Finalize HashAggregate  (cost=14137.67..14177.67 rows=2000 width=16) (actual time=1482.746..1483.203 rows=2000 loops=1)
   Hash Key: a
   Hash Key: b
   ->  Gather  (cost=13697.67..14117.67 rows=4000 width=16) (actual time=442.140..765.931 rows=1000000 loops=1)
         Workers Planned: 2
         Workers Launched: 2
         ->  Partial HashAggregate  (cost=12697.67..12717.67 rows=2000 width=16) (actual time=402.917..526.045 rows=333333 loops=3)
               Group Key: a, b
               ->  Parallel Seq Scan on abc  (cost=0.00..9572.67 rows=416667 width=12) (actual time=0.036..50.275 rows=333333 loops=3)
 Planning Time: 0.140 ms
 Execution Time: 1489.734 ms
(11 rows)
but really, likely the parallel plan should not be chosen in this case
since we're not really reducing the number of groups going into the
finalize aggregate node. That'll need to be factored into the costing so
that we don't choose the parallel plan when we're not going to reduce the
work in the finalize aggregate node. I'm unsure exactly how that'll look.
Logically, I think the choice to parallelize or not needs to be
if (cost_partial_agg + cost_gather + cost_final_agg < cost_agg) { do it
in parallel } else { do it in serial }. If you build both a serial and
parallel set of paths then you should see which one is cheaper without
actually constructing an "if" test like the one above.
Here's a simple group by with the same group by clause items as you have
in the plan above that does get the estimated number of groups perfectly.
The plan above should have the same estimate.
explain analyze select a,b,sum(c) from abc group by a,b;
                                                          QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------
 GroupAggregate  (cost=132154.34..152154.34 rows=1000000 width=16) (actual time=404.304..1383.343 rows=1000000 loops=1)
   Group Key: a, b
   ->  Sort  (cost=132154.34..134654.34 rows=1000000 width=12) (actual time=404.291..620.774 rows=1000000 loops=1)
         Sort Key: a, b
         Sort Method: external merge  Disk: 21584kB
         ->  Seq Scan on abc  (cost=0.00..15406.00 rows=1000000 width=12) (actual time=0.017..100.299 rows=1000000 loops=1)
 Planning Time: 0.115 ms
 Execution Time: 1412.034 ms
(8 rows)
Also, in the tests:
> insert into gstest select 1,10,100 from generate_series(1,1000000)i;
> insert into gstest select 1,10,200 from generate_series(1,1000000)i;
> insert into gstest select 1,20,30 from generate_series(1,1000000)i;
> insert into gstest select 2,30,40 from generate_series(1,1000000)i;
> insert into gstest select 2,40,50 from generate_series(1,1000000)i;
> insert into gstest select 3,50,60 from generate_series(1,1000000)i;
> insert into gstest select 1,NULL,000000 from generate_series(1,1000000)i;
> analyze gstest;
You'll likely want to reduce the number of rows being used just to stop
the regression tests becoming slow on older machines. I think some of the
other parallel aggregate tests use much fewer rows than what you're using
there. You might be able to use the standard set of regression test
tables too, tenk, tenk1 etc. That'll save the test having to build and
populate one of its own.
--
 David Rowley                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
On Thu, Jun 13, 2019 at 12:29 PM David Rowley <david.rowley@2ndquadrant.com> wrote:
On Wed, 12 Jun 2019 at 14:59, Richard Guo <riguo@pivotal.io> wrote:
> Implementation 1
> Parallel aggregation has already been supported in PostgreSQL and it is
> implemented by aggregating in two stages. First, each worker performs an
> aggregation step, producing a partial result for each group of which
> that process is aware. Second, the partial results are transferred to
> the leader via the Gather node. Finally, the leader merges the partial
> results and produces the final result for each group.
>
> We are implementing parallel grouping sets in the same way. The only
> difference is that in the final stage, the leader performs a grouping
> sets aggregation, rather than a normal aggregation.
Hi Richard,
I think it was you an I that discussed #1 at unconference at PGCon 2
weeks ago. The good thing about #1 is that it can be implemented as
planner-only changes just by adding some additional paths and some
costing. #2 will be useful when we're unable to reduce the number of
inputs to the final aggregate node by doing the initial grouping.
However, since #1 is easier, then I'd suggest going with it first,
since it's the path of least resistance. #1 should be fine as long as
you properly cost the parallel agg and don't choose it when the number
of groups going into the final agg isn't reduced by the partial agg
node. Which brings me to:
Hi David,
Yes. Thank you for the discussion at PGCon. I learned a lot from that.
And glad to meet you here. :)
I agree with you on going with #1 first.
You'll need to do further work with the dNumGroups value. Since you're
grouping by all the columns/exprs in the grouping sets you'll need the
number of groups to be an estimate of that.
Exactly. The v1 patch estimates the number of partial groups
incorrectly: it calculates the number of groups for each grouping set
and then adds them up as dNumPartialPartialGroups, while it should
actually calculate the number of groups over all the columns involved in
the grouping sets. I have fixed this issue in the v2 patch.
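For concreteness, using David's abc example from upthread (a and b each have 1000 distinct values and are independent), the difference between the two ways of estimating can be checked directly; these queries are only an illustration, not part of the patch:
select count(distinct a) from abc;                        -- 1000, groups for grouping set (a)
select count(distinct b) from abc;                        -- 1000, groups for grouping set (b)
select count(*) from (select distinct a, b from abc) s;   -- 1000000, groups the Partial Aggregate actually produces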
Here's a quick test I did that shows the problem:
create table abc(a int, b int, c int);
insert into abc select a,b,1 from generate_Series(1,1000)
a,generate_Series(1,1000) b;
create statistics abc_a_b_stats (ndistinct) on a,b from abc;
analyze abc;
-- Here the Partial HashAggregate really should estimate that there
will be 1 million rows.
explain analyze select a,b,sum(c) from abc group by grouping sets ((a),(b));
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------
Finalize HashAggregate (cost=14137.67..14177.67 rows=2000 width=16)
(actual time=1482.746..1483.203 rows=2000 loops=1)
Hash Key: a
Hash Key: b
-> Gather (cost=13697.67..14117.67 rows=4000 width=16) (actual
time=442.140..765.931 rows=1000000 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial HashAggregate (cost=12697.67..12717.67 rows=2000
width=16) (actual time=402.917..526.045 rows=333333 loops=3)
Group Key: a, b
-> Parallel Seq Scan on abc (cost=0.00..9572.67
rows=416667 width=12) (actual time=0.036..50.275 rows=333333 loops=3)
Planning Time: 0.140 ms
Execution Time: 1489.734 ms
(11 rows)
but really, likely the parallel plan should not be chosen in this case
since we're not really reducing the number of groups going into the
finalize aggregate node. That'll need to be factored into the costing
so that we don't choose the parallel plan when we're not going to
reduce the work in the finalize aggregate node. I'm unsure exactly how
that'll look. Logically, I think the choice parallelize or not to
parallelize needs to be if (cost_partial_agg + cost_gather +
cost_final_agg < cost_agg) { do it in parallel } else { do it in
serial }. If you build both a serial and parallel set of paths then
you should see which one is cheaper without actually constructing an
"if" test like the one above.
Both the serial and parallel set of paths would be built and the cheaper
one will be selected. So we don't need the 'if' test.
With v2 patch, the parallel plan will not be chosen for the above query:
# explain analyze select a,b,sum(c) from abc group by grouping sets ((a),(b));
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------
HashAggregate (cost=20406.00..25426.00 rows=2000 width=16) (actual time=935.048..935.697 rows=2000 loops=1)
Hash Key: a
Hash Key: b
-> Seq Scan on abc (cost=0.00..15406.00 rows=1000000 width=12) (actual time=0.041..170.906 rows=1000000 loops=1)
Planning Time: 0.240 ms
Execution Time: 935.978 ms
(6 rows)
Here's a simple group by with the same group by clause items as you
have in the plan above that does get the estimated number of groups
perfectly. The plan above should have the same estimate.
explain analyze select a,b,sum(c) from abc group by a,b;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------
GroupAggregate (cost=132154.34..152154.34 rows=1000000 width=16)
(actual time=404.304..1383.343 rows=1000000 loops=1)
Group Key: a, b
-> Sort (cost=132154.34..134654.34 rows=1000000 width=12) (actual
time=404.291..620.774 rows=1000000 loops=1)
Sort Key: a, b
Sort Method: external merge Disk: 21584kB
-> Seq Scan on abc (cost=0.00..15406.00 rows=1000000
width=12) (actual time=0.017..100.299 rows=1000000 loops=1)
Planning Time: 0.115 ms
Execution Time: 1412.034 ms
(8 rows)
Also, in the tests:
> insert into gstest select 1,10,100 from generate_series(1,1000000)i;
> insert into gstest select 1,10,200 from generate_series(1,1000000)i;
> insert into gstest select 1,20,30 from generate_series(1,1000000)i;
> insert into gstest select 2,30,40 from generate_series(1,1000000)i;
> insert into gstest select 2,40,50 from generate_series(1,1000000)i;
> insert into gstest select 3,50,60 from generate_series(1,1000000)i;
> insert into gstest select 1,NULL,000000 from generate_series(1,1000000)i;
> analyze gstest;
You'll likely want to reduce the number of rows being used just to
stop the regression tests becoming slow on older machines. I think
some of the other parallel aggregate tests use must fewer rows than
what you're using there. You might be able to use the standard set of
regression test tables too, tenk, tenk1 etc. That'll save the test
having to build and populate one of its own.
Yes, that makes sense. Table size has been reduced in v2 patch.
Currently I do not use the standard regression test tables as I'd like
to customize the table with some specific data for correctness
verification. But we may switch to the standard test table later.
Besides, two other issues are fixed in the v2 patch. One is about the
group key for sort-based grouping sets in Partial Aggregate, which
should be all the columns in parse->groupClause. The other one is about
GroupingFunc: since Partial Aggregate will not handle multiple grouping
sets at once, it does not need to evaluate GroupingFunc, so GroupingFunc
is removed from the targetlists of Partial Aggregate.
Thanks
Richard
Attachment
On Wed, Jun 12, 2019 at 10:58:44AM +0800, Richard Guo wrote:
># explain analyze select c1, c2, avg(c3) from t2 group by grouping
>sets((c1,c2), (c1), (c2,c3)); -- without patch
> Planning Time: 0.123 ms
> Execution Time: 9459.362 ms
>
># explain analyze select c1, c2, avg(c3) from t2 group by grouping
>sets((c1,c2), (c1), (c2,c3)); -- with patch
> Planning Time: 0.204 ms
> Execution Time: 1077.654 ms
>
Very nice. That's pretty much exactly how I imagined it'd work.
>Implementation 2
>================
>
>The idea is instead of performing grouping sets aggregation in Finalize
>Aggregate, we perform it in Partial Aggregate.
>
>The plan looks like:
>
># explain (costs off, verbose) select c1, c2, avg(c3) from t2 group by
>grouping sets((c1,c2), (c1));
>                           QUERY PLAN
>--------------------------------------------------------------
> Finalize GroupAggregate
>   Output: c1, c2, avg(c3), (gset_id)
>   Group Key: t2.c1, t2.c2, (gset_id)
>   ->  Gather Merge
>         Output: c1, c2, (gset_id), (PARTIAL avg(c3))
>         Workers Planned: 2
>         ->  Sort
>               Output: c1, c2, (gset_id), (PARTIAL avg(c3))
>               Sort Key: t2.c1, t2.c2, (gset_id)
>               ->  Partial HashAggregate
>                     Output: c1, c2, gset_id, PARTIAL avg(c3)
>                     Hash Key: t2.c1, t2.c2
>                     Hash Key: t2.c1
>                     ->  Parallel Seq Scan on public.t2
>                           Output: c1, c2, c3
>(15 rows)
>
OK, I'm not sure I understand the point of this - can you give an
example which is supposed to benefit from this? Where does the speedup
come from?
regards
--
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On Fri, 14 Jun 2019 at 11:45, Tomas Vondra <tomas.vondra@2ndquadrant.com> wrote:
> OK, I'm not sure I understand the point of this - can you give an
> example which is supposed to benefit from this? Where does the speedup
> come from?
I think this is a bad example since the first grouping set is a
superset of the 2nd. If those were independent and each grouping set
produced a reasonable number of groups then it may be better to do it
this way instead of grouping by all exprs in all grouping sets in the
first phase, as is done by #1. To do #2 would require that we tag the
aggregate state with the grouping set it belongs to, which seems to be
what gset_id is in Richard's output.
In my example upthread the first phase of aggregation produced a group
per input row. Method #2 would work better for that case since it
would only produce 2000 groups instead of 1 million.
Likely both methods would be good to consider, but since #1 seems much
easier than #2, to me it seems to make sense to start there.
--
 David Rowley                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
On Fri, Jun 14, 2019 at 12:02:52PM +1200, David Rowley wrote:
>I think this is a bad example since the first grouping set is a
>superset of the 2nd. If those were independent and each grouping set
>produced a reasonable number of groups then it may be better to do it
>this way instead of grouping by all exprs in all grouping sets in the
>first phase, as is done by #1. To do #2 would require that we tag
>the aggregate state with the grouping set that belong to, which seem
>to be what gset_id is in Richard's output.
>
Aha! So if we have grouping sets (a,b) and (c,d), then with the first
approach we'd do partial aggregate on (a,b,c,d) - which may produce
quite a few distinct groups, making it inefficient. But with the second
approach, we'd do just (a,b) and (c,d) and mark the rows with gset_id.
Neat!
>In my example upthread the first phase of aggregation produced a group
>per input row. Method #2 would work better for that case since it
>would only produce 2000 groups instead of 1 million.
>
>Likely both methods would be good to consider, but since #1 seems much
>easier than #2, then to me it seems to make sense to start there.
>
Yep. Thanks for the explanation.
regards
--
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On Wed, Jun 12, 2019 at 10:58 AM Richard Guo <riguo@pivotal.io> wrote:
Hi all,
Paul and I have been hacking recently to implement parallel grouping
sets, and here we have two implementations.
Implementation 1
================
Attached is the patch and also there is a github branch [1] for this
work.
Rebased with the latest master.
Thanks
Richard
Attachment
On Tue, Jul 30, 2019 at 03:50:32PM +0800, Richard Guo wrote:
>Rebased with the latest master.
>
Hi Richard,
thanks for the rebased patch. I think the patch is mostly fine (at least I
don't see any serious issues). A couple minor comments:
1) I think get_number_of_groups() would deserve a short explanation why
it's OK to handle (non-partial) grouping sets and regular GROUP BY in the
same branch. Before these cases were clearly separated, now it seems a bit
mixed up and it may not be immediately obvious why it's OK.
2) There are new regression tests, but they are not added to any schedule
(parallel or serial), and so are not executed as part of "make check". I
suppose this is a mistake.
3) The regression tests do check plan and results like this:
EXPLAIN (COSTS OFF, VERBOSE) SELECT ...;
SELECT ... ORDER BY 1, 2, 3;
which however means that the query might easily use a different plan than
what's verified in the explain (thanks to the additional ORDER BY clause).
So I think this should explain and execute the same query.
(In this case the plans seem to be the same, but that may easily change
in the future, and we could miss it here, failing to verify the results.)
4) It might be a good idea to check the negative case too, i.e. a query on
a data set that we should not parallelize (because the number of partial
groups would be too high).
Do you have any plans to hack on the second approach too? AFAICS those two
approaches are complementary (address different data sets / queries), and
it would be nice to have both. One of the things I've been wondering is if
we need to invent gset_id as a new concept, or if we could simply use the
existing GROUPING() function - that uniquely identifies the grouping set.
regards
--
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On Tue, Jul 30, 2019 at 11:05 PM Tomas Vondra <tomas.vondra@2ndquadrant.com> wrote:
On Tue, Jul 30, 2019 at 03:50:32PM +0800, Richard Guo wrote:
>On Wed, Jun 12, 2019 at 10:58 AM Richard Guo <riguo@pivotal.io> wrote:
>
>> Hi all,
>>
>> Paul and I have been hacking recently to implement parallel grouping
>> sets, and here we have two implementations.
>>
>> Implementation 1
>> ================
>>
>> Attached is the patch and also there is a github branch [1] for this
>> work.
>>
>
>Rebased with the latest master.
>
Hi Richard,
thanks for the rebased patch. I think the patch is mostly fine (at least I
don't see any serious issues). A couple minor comments:
Hi Tomas,
Thank you for reviewing this patch.
1) I think get_number_of_groups() would deserve a short explanation why
it's OK to handle (non-partial) grouping sets and regular GROUP BY in the
same branch. Before these cases were clearly separated, now it seems a bit
mixed up and it may not be immediately obvious why it's OK.
Added a short comment in get_number_of_groups() explaining the behavior
when doing partial aggregation for grouping sets.
2) There are new regression tests, but they are not added to any schedule
(parallel or serial), and so are not executed as part of "make check". I
suppose this is a mistake.
Yes, thanks. Added the new regression test to parallel_schedule and
serial_schedule.
3) The regression tests do check plan and results like this:
EXPLAIN (COSTS OFF, VERBOSE) SELECT ...;
SELECT ... ORDER BY 1, 2, 3;
which however means that the query might easily use a different plan than
what's verified in the eplain (thanks to the additional ORDER BY clause).
So I think this should explain and execute the same query.
(In this case the plans seems to be the same, but that may easily change
in the future, and we could miss it here, failing to verify the results.)
Thank you for pointing this out. Fixed it in V4 patch.
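(For illustration, the fixed pattern ends up looking something like the following, with the very same statement both explained and executed; the concrete query here is an assumption rather than the exact one in the regression test:)
explain (costs off, verbose)
  select c1, c2, avg(c3) from gstest group by grouping sets ((c1, c2), (c1)) order by 1, 2, 3;
select c1, c2, avg(c3) from gstest group by grouping sets ((c1, c2), (c1)) order by 1, 2, 3;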
4) It might be a good idea to check the negative case too, i.e. a query on
data set that we should not parallelize (because the number of partial
groups would be too high).
Yes, agree. Added a negative case.
Do you have any plans to hack on the second approach too? AFAICS those two
approaches are complementary (address different data sets / queries), and
it would be nice to have both. One of the things I've been wondering is if
we need to invent gset_id as a new concept, or if we could simply use the
existing GROUPING() function - that uniquely identifies the grouping set.
Yes, I'm planning to hack on the second approach in the near future. I'm
also reconsidering the gset_id stuff, since it brings a lot of
complexity to the second approach. I agree with you that we can try the
GROUPING() function to see if it can replace gset_id.
Thanks
Richard
Attachment
Hi Richard & Tomas:
I followed the idea of the second approach: add a gset_id to the targetlist of the first stage of
the grouping sets aggregation and use it to combine the aggregates in the final stage. The gset_id
stuff is still kept because GROUPING() cannot uniquely identify a grouping set; grouping sets may
contain duplicated sets, e.g. group by grouping sets((c1, c2), (c1, c2)).
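To illustrate the point (the query below is made up for this purpose), both copies of a duplicated grouping set produce the same GROUPING() value, so GROUPING() alone cannot tell their output rows apart:
select c1, c2, grouping(c1, c2) as g
from gstest
group by grouping sets ((c1, c2), (c1, c2));
-- every group appears twice, once per copy of the duplicated set,
-- and g is 0 for both copies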
This implementation differs from Richard's original idea in that gset_id is not used as an
additional group key in the final stage; instead, we use it to dispatch each input tuple directly
to the specified grouping set and then do the aggregation. One advantage of this is that we can
handle multiple rollups with better performance, without an Append node.
The plan now looks like:
gpadmin=# explain select c1, c2 from gstest group by grouping sets(rollup(c1, c2), rollup(c3));
QUERY PLAN
--------------------------------------------------------------------------------------------
Finalize MixedAggregate (cost=1000.00..73108.57 rows=8842 width=12)
Dispatched by: (GROUPINGSETID())
Hash Key: c1, c2
Hash Key: c1
Hash Key: c3
Group Key: ()
Group Key: ()
-> Gather (cost=1000.00..71551.48 rows=17684 width=16)
Workers Planned: 2
-> Partial MixedAggregate (cost=0.00..68783.08 rows=8842 width=16)
Hash Key: c1, c2
Hash Key: c1
Hash Key: c3
Group Key: ()
Group Key: ()
-> Parallel Seq Scan on gstest (cost=0.00..47861.33 rows=2083333 width=12)
(16 rows)
gpadmin=# set enable_hashagg to off;
gpadmin=# explain select c1, c2 from gstest group by grouping sets(rollup(c1, c2), rollup(c3));
QUERY PLAN
--------------------------------------------------------------------------------------------------------
Finalize GroupAggregate (cost=657730.66..663207.45 rows=8842 width=12)
Dispatched by: (GROUPINGSETID())
Group Key: c1, c2
Sort Key: c1
Group Key: c1
Group Key: ()
Group Key: ()
Sort Key: c3
Group Key: c3
-> Sort (cost=657730.66..657774.87 rows=17684 width=16)
Sort Key: c1, c2
-> Gather (cost=338722.94..656483.04 rows=17684 width=16)
Workers Planned: 2
-> Partial GroupAggregate (cost=337722.94..653714.64 rows=8842 width=16)
Group Key: c1, c2
Group Key: c1
Group Key: ()
Group Key: ()
Sort Key: c3
Group Key: c3
-> Sort (cost=337722.94..342931.28 rows=2083333 width=12)
Sort Key: c1, c2
-> Parallel Seq Scan on gstest (cost=0.00..47861.33 rows=2083333 width=12)
Attachment
Hi Hackers,
Richard pointed out that he got incorrect results with the patch I attached; there were bugs
somewhere. I have fixed them now and attached the newest version; please refer to [1] for the fix.
Thanks,
Pengzhou
Attachment
On Thu, Nov 28, 2019 at 07:07:22PM +0800, Pengzhou Tang wrote:
> Richard pointed out that he get incorrect results with the patch I
> attached, there are bugs somewhere,
> I fixed them now and attached the newest version, please refer to [1] for
> the fix.
Mr Robot is reporting that the latest patch fails to build at least on
Windows.  Could you please send a rebase?  I have moved for now the
patch to next CF, waiting on author.
--
Michael
Attachment
On Sun, Dec 1, 2019 at 10:03 AM Michael Paquier <michael@paquier.xyz> wrote:
On Thu, Nov 28, 2019 at 07:07:22PM +0800, Pengzhou Tang wrote:
> Richard pointed out that he get incorrect results with the patch I
> attached, there are bugs somewhere,
> I fixed them now and attached the newest version, please refer to [1] for
> the fix.
Mr Robot is reporting that the latest patch fails to build at least on
Windows. Could you please send a rebase? I have moved for now the
patch to next CF, waiting on author.
Thanks for reporting this issue. Here is the rebase.
Thanks
Richard
Attachment
I realized that there are two patches in this thread that are
implemented according to different methods, which causes confusion. So I
have decided to update this thread with only one patch, i.e. the patch
for 'Implementation 1' as described in the first email, and to move the
other patch to a separate thread.
With this in mind, here is the patch for 'Implementation 1', rebased on
the latest master.
Thanks
Richard
Attachment
On Sun, Jan 19, 2020 at 2:23 PM Richard Guo <riguo@pivotal.io> wrote:
>
> I realized that there are two patches in this thread that are
> implemented according to different methods, which causes confusion.
>
Both the ideas seem to be different.  Is the second approach [1]
inferior for any case as compared to the first approach?  Can we keep
both approaches for parallel grouping sets, if so how?  If not, then
won't the code by the first approach be useless once we commit the
second approach?
[1] - https://www.postgresql.org/message-id/CAN_9JTwtTTnxhbr5AHuqVcriz3HxvPpx1JWE--DCSdJYuHrLtA%40mail.gmail.com
--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
On Thu, Jan 23, 2020 at 2:47 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> Both the idea seems to be different. Is the second approach [1]
> inferior for any case as compared to the first approach? Can we keep
> both approaches for parallel grouping sets, if so how? If not, then
> won't the code by the first approach be useless once we commit second
> approach?
>
> [1] - https://www.postgresql.org/message-id/CAN_9JTwtTTnxhbr5AHuqVcriz3HxvPpx1JWE--DCSdJYuHrLtA%40mail.gmail.com
I glanced over both patches. Just the opposite, I have a hunch that v3
is always better than v5. Here's my 6-minute understanding of both.
v5 (the one with a simple partial aggregate) works by pushing a little
bit of partial aggregate onto workers, and perform grouping aggregate
above gather. This has two interesting outcomes: we can execute
unmodified partial aggregate on the workers, and execute almost
unmodified rollup aggregate once the trans values are gathered. A
parallel plan for a query like
SELECT count(*) FROM foo GROUP BY GROUPING SETS (a), (b), (c), ();
can be
Finalize GroupAggregate
Output: count(*)
Group Key: a
Group Key: b
Group Key: c
Group Key: ()
Gather Merge
Partial GroupAggregate
Output: PARTIAL count(*)
Group Key: a, b, c
Sort
Sort Key: a, b, c
Parallel Seq Scan on foo
v3 ("the one with grouping set id") really turns the plan from a tree to
a multiplexed pipe: we can execute grouping aggregate on the workers,
but only partially. When we emit the trans values, also tag the tuple
with a group id. After gather, finalize the aggregates with a modified
grouping aggregate. Unlike a non-split grouping aggregate, the finalize
grouping aggregate does not "flow" the results from one rollup to the
next one. Instead, each group only advances on partial inputs tagged for
the group.
Finalize HashAggregate
Output: count(*)
Dispatched by: (GroupingSetID())
Group Key: a
Group Key: b
Group Key: c
Gather
Partial GroupAggregate
Output: PARTIAL count(*), GroupingSetID()
Group Key: a
Sort Key: b
Group Key: b
Sort Key: c
Group Key: c
Sort
Sort Key: a
Parallel Seq Scan on foo
Note that for the first approach to be viable, the partial aggregate
*has to* use a group key that's the union of all grouping sets. In cases
where individual columns have a low cardinality but joint cardinality is
high (say columns a, b, c each has 16 distinct values, but they are
independent, so there are 4096 distinct values on (a,b,c)), this results
in fairly high traffic through the shm tuple queue.
Cheers,
Jesse
On Sat, Jan 25, 2020 at 4:22 AM Jesse Zhang <sbjesse@gmail.com> wrote:
>
> I glanced over both patches. Just the opposite, I have a hunch that v3
> is always better than v5.
>
This is what I also understood after reading this thread. So, my
question is why not just review v3 and commit something on those lines
even though it would take a bit more time. It is possible that if we
decide to go with v5, we can make it happen earlier, but later when we
try to get v3, the code committed as part of v5 might not be of any use
or if it is useful, then in which cases?
--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
Hi Jesse,
Thanks for reviewing these two patches.
On Sat, Jan 25, 2020 at 6:52 AM Jesse Zhang <sbjesse@gmail.com> wrote:
I glanced over both patches. Just the opposite, I have a hunch that v3
is always better than v5. Here's my 6-minute understanding of both.
v5 (the one with a simple partial aggregate) works by pushing a little
bit of partial aggregate onto workers, and perform grouping aggregate
above gather. This has two interesting outcomes: we can execute
unmodified partial aggregate on the workers, and execute almost
unmodified rollup aggreegate once the trans values are gathered. A
parallel plan for a query like
SELECT count(*) FROM foo GROUP BY GROUPING SETS (a), (b), (c), ();
can be
Finalize GroupAggregate
Output: count(*)
Group Key: a
Group Key: b
Group Key: c
Group Key: ()
Gather Merge
Partial GroupAggregate
Output: PARTIAL count(*)
Group Key: a, b, c
Sort
Sort Key: a, b, c
Parallel Seq Scan on foo
Yes, this is the idea of v5 patch.
v3 ("the one with grouping set id") really turns the plan from a tree to
a multiplexed pipe: we can execute grouping aggregate on the workers,
but only partially. When we emit the trans values, also tag the tuple
with a group id. After gather, finalize the aggregates with a modified
grouping aggregate. Unlike a non-split grouping aggregate, the finalize
grouping aggregate does not "flow" the results from one rollup to the
next one. Instead, each group only advances on partial inputs tagged for
the group.
Finalize HashAggregate
Output: count(*)
Dispatched by: (GroupingSetID())
Group Key: a
Group Key: b
Group Key: c
Gather
Partial GroupAggregate
Output: PARTIAL count(*), GroupingSetID()
Group Key: a
Sort Key: b
Group Key: b
Sort Key: c
Group Key: c
Sort
Sort Key: a
Parallel Seq Scan on foo
Yes, this is what v3 patch does.
We (Pengzhou and I) had an offline discussion on this plan and we have
some other idea. Since we have tagged 'GroupingSetId' for each tuple
produced by partial aggregate, why not then perform a normal grouping
sets aggregation in the final phase, with the 'GroupingSetId' included
in the group keys? The plan looks like:
# explain (costs off, verbose)
select c1, c2, c3, avg(c3) from gstest group by grouping sets((c1,c2),(c1),(c2,c3));
QUERY PLAN
------------------------------------------------------------------
Finalize GroupAggregate
Output: c1, c2, c3, avg(c3)
Group Key: (gset_id), gstest.c1, gstest.c2, gstest.c3
-> Sort
Output: c1, c2, c3, (gset_id), (PARTIAL avg(c3))
Sort Key: (gset_id), gstest.c1, gstest.c2, gstest.c3
-> Gather
Output: c1, c2, c3, (gset_id), (PARTIAL avg(c3))
Workers Planned: 4
-> Partial HashAggregate
Output: c1, c2, c3, gset_id, PARTIAL avg(c3)
Hash Key: gstest.c1, gstest.c2
Hash Key: gstest.c1
Hash Key: gstest.c2, gstest.c3
-> Parallel Seq Scan on public.gstest
Output: c1, c2, c3
This plan should be able to give the correct results. We are still
thinking if it is a better plan than the 'multiplexed pipe' plan as in
v3. Any thoughts here would be appreciated.
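(For anyone who wants to reproduce a plan of this shape: the gstest table
definition does not appear in this thread, so the schema and data volume below
are only assumptions; with the patch applied, and depending on costing and the
parallel settings, a setup along these lines should produce a similar plan.)

create table gstest (c1 int, c2 int, c3 int);
insert into gstest
  select i % 100, i % 10, i % 500 from generate_series(1, 1000000) i;
analyze gstest;
set max_parallel_workers_per_gather = 4;
explain (costs off, verbose)
select c1, c2, c3, avg(c3) from gstest group by grouping sets((c1,c2),(c1),(c2,c3));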
Note that for the first approach to be viable, the partial aggregate
*has to* use a group key that's the union of all grouping sets. In cases
where individual columns have a low cardinality but joint cardinality is
high (say columns a, b, c each has 16 distinct values, but they are
independent, so there are 4096 distinct values on (a,b,c)), this results
in fairly high traffic through the shm tuple queue.
Yes, you are right. This is the case mentioned by David earlier in [1].
In this case, ideally the parallel plan would fail when competing with
non-parallel plan in add_path() and so not be chosen.
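(To make that worst case concrete, here is an illustrative sketch; the table,
data distribution and row count are made up to match the example above, not
taken from the thread.)

create table foo (a int, b int, c int);
insert into foo
  select (random()*15)::int, (random()*15)::int, (random()*15)::int
  from generate_series(1, 1000000);
analyze foo;

-- Each column alone has only ~16 distinct values, but since the columns are
-- independent there are up to 16 * 16 * 16 = 4096 combinations of (a, b, c).
-- A partial aggregate grouped by (a, b, c) therefore has to push up to ~4096
-- groups per worker through the shared-memory tuple queue to the leader.
select count(distinct a)         as distinct_a,
       count(distinct (a, b, c)) as distinct_abc
from foo;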
Thanks
Richard
Hi Amit,
Thanks for reviewing these two patches.
On Sat, Jan 25, 2020 at 6:31 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
This is what I also understood after reading this thread. So, my
question is why not just review v3 and commit something on those lines
even though it would take a bit more time. It is possible that if we
decide to go with v5, we can make it happen earlier, but later when we
try to get v3, the code committed as part of v5 might not be of any
use or if it is useful, then in which cases?
Yes, approach #2 (v3) would be generally better than approach #1 (v5) in
performance. I started with approach #1 because it is much easier. If we decide to go with approach #2, I think we can now concentrate on the
v3 patch.
For the v3 patch, we have another idea, which is to perform a normal
grouping sets aggregation in the final phase, with 'GroupingSetId'
included in the group keys (as described in the previous email). With
this idea, we can avoid a lot of hacky code in the current v3 patch.
Thanks
Richard
On Mon, Feb 3, 2020 at 12:07 AM Richard Guo <riguo@pivotal.io> wrote:
>
> Hi Jesse,
>
> Thanks for reviewing these two patches.

I enjoyed it!

> On Sat, Jan 25, 2020 at 6:52 AM Jesse Zhang <sbjesse@gmail.com> wrote:
>>
>> I glanced over both patches. Just the opposite, I have a hunch that v3
>> is always better than v5. Here's my 6-minute understanding of both.
>>
>> v3 ("the one with grouping set id") really turns the plan from a tree to
>> a multiplexed pipe: we can execute grouping aggregate on the workers,
>> but only partially. When we emit the trans values, also tag the tuple
>> with a group id. After gather, finalize the aggregates with a modified
>> grouping aggregate. Unlike a non-split grouping aggregate, the finalize
>> grouping aggregate does not "flow" the results from one rollup to the
>> next one. Instead, each group only advances on partial inputs tagged for
>> the group.
>>
>
> Yes, this is what v3 patch does.
>
> We (Pengzhou and I) had an offline discussion on this plan and we have
> some other idea. Since we have tagged 'GroupingSetId' for each tuple
> produced by partial aggregate, why not then perform a normal grouping
> sets aggregation in the final phase, with the 'GroupingSetId' included
> in the group keys? The plan looks like:
>
> # explain (costs off, verbose)
> select c1, c2, c3, avg(c3) from gstest group by grouping sets((c1,c2),(c1),(c2,c3));
> QUERY PLAN
> ------------------------------------------------------------------
> Finalize GroupAggregate
> Output: c1, c2, c3, avg(c3)
> Group Key: (gset_id), gstest.c1, gstest.c2, gstest.c3
> -> Sort
> Output: c1, c2, c3, (gset_id), (PARTIAL avg(c3))
> Sort Key: (gset_id), gstest.c1, gstest.c2, gstest.c3
> -> Gather
> Output: c1, c2, c3, (gset_id), (PARTIAL avg(c3))
> Workers Planned: 4
> -> Partial HashAggregate
> Output: c1, c2, c3, gset_id, PARTIAL avg(c3)
> Hash Key: gstest.c1, gstest.c2
> Hash Key: gstest.c1
> Hash Key: gstest.c2, gstest.c3
> -> Parallel Seq Scan on public.gstest
> Output: c1, c2, c3
>
> This plan should be able to give the correct results. We are still
> thinking if it is a better plan than the 'multiplexed pipe' plan as in
> v3. Any thoughts here would be appreciated.

Ha, I believe you meant to say a "normal aggregate", because what's
performed above gather is no longer "grouping sets", right?

The group key idea is clever in that it helps "discriminate" tuples by
their grouping set id. I haven't completely thought this through, but my
hunch is that this leaves some money on the table, for example, won't it
also lead to more expensive (and unnecessary) sorting and hashing? The
groupings with a few partials are now sharing the same tuplesort with
the groupings with a lot of groups even though we only want to tell
grouping 1 *apart from* grouping 10, not necessarily that grouping 1
needs to come before grouping 10. That's why I like the multiplexed pipe
/ "dispatched by grouping set id" idea: we only pay for sorting (or
hashing) within each grouping. That said, I'm open to the criticism that
keeping multiple tuplesorts and agg hash tables running is expensive in
itself, memory-wise ...

Cheers,
Jesse
Thanks for reviewing those patches.
Ha, I believe you meant to say a "normal aggregate", because what's
performed above gather is no longer "grouping sets", right?
The group key idea is clever in that it helps "discriminate" tuples by
their grouping set id. I haven't completely thought this through, but my
hunch is that this leaves some money on the table, for example, won't it
also lead to more expensive (and unnecessary) sorting and hashing? The
groupings with a few partials are now sharing the same tuplesort with
the groupings with a lot of groups even though we only want to tell
grouping 1 *apart from* grouping 10, not necessarily that grouping 1
needs to come before grouping 10. That's why I like the multiplexed pipe
/ "dispatched by grouping set id" idea: we only pay for sorting (or
hashing) within each grouping. That said, I'm open to the criticism that
keeping multiple tuplesorts and agg hash tables running is expensive in
itself, memory-wise ...
Cheers,
Jesse
That's something we need to test, thanks. Meanwhile, for the approach of
using a "normal aggregate" with the grouping set id, one concern is that it
cannot use mixed aggregation, which means that if the grouping sets contain
both non-hashable and non-sortable sets, it will fall back to a one-phase
aggregate.
To summarize the current state of parallel grouping sets, we now have
two available implementations for it.
1) Each worker performs an aggregation step, producing a partial result
for each group of which that process is aware. Then the partial results
are gathered to the leader, which then performs a grouping sets
aggregation, as in patch [1].
This implementation is sometimes not very efficient, because the group
key for Partial Aggregate has to be all the columns involved in the
grouping sets.
2) Each worker performs a grouping sets aggregation on its partial
data, and tags 'GroupingSetId' for each tuple produced by partial
aggregate. Then the partial results are gathered to the leader, and the
leader performs a modified grouping aggregate, which dispatches the
partial results into different pipes according to 'GroupingSetId', as in
patch [2], or instead as another method, the leader performs a normal
aggregation, with 'GroupingSetId' included in the group keys, as
discussed in [3].
The second implementation would be generally better than the first one
in performance, and we have decided to concentrate on it.
[1] https://www.postgresql.org/message-id/CAN_9JTx3NM12ZDzEYcOVLFiCBvwMHyM0gENvtTpKBoOOgcs=kw@mail.gmail.com
[2] https://www.postgresql.org/message-id/CAN_9JTwtTTnxhbr5AHuqVcriz3HxvPpx1JWE--DCSdJYuHrLtA@mail.gmail.com
[3] https://www.postgresql.org/message-id/CAN_9JTwtzttEmdXvMbJqXt=51kXiBTCKEPKq6kk2PZ6Xz6m5ig@mail.gmail.com
Thanks
Richard
Hi there,
We want to give an update on our work on parallel grouping sets. The attached
patchset implements parallel grouping sets with the strategy proposed earlier
in this thread (tagging each partial-aggregate output tuple with a grouping set ID).
It contains some refinement of our code and adds LLVM support. It also
contains a few patches refactoring the grouping sets code to make the
parallel grouping sets implementation cleaner.
Like simple parallel aggregate, we separate the process of grouping sets
into two stages:
The partial stage:
The partial stage is much the same as the current grouping sets
implementation; the differences are:
- In the partial stage, like in regular parallel aggregation, only partial
aggregate results (e.g. transvalues) are produced.
- The output of the partial stage includes a grouping set ID to allow for
disambiguation during the final stage (a conceptual sketch of this output
follows below).
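To illustrate what the partial stage conceptually emits, the tagging can be
emulated in plain SQL (purely illustrative: the table, the column names and
the gsetid values are made up, and the real partial stage emits transition
values rather than finished aggregates):

-- what a partial stage for GROUP BY GROUPING SETS ((a), (b)) conceptually
-- produces: one row stream per grouping set, each row tagged with a set id
select a, null::int as b, 0 as gsetid, count(*) as partial_count
from foo group by a
union all
select null::int as a, b, 1 as gsetid, count(*) as partial_count
from foo group by b;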
The optimizations of the existing grouping sets implementation are
preserved during the partial stage, like:
- Grouping sets that can be combined in one rollup are still grouped
together (for group agg).
- Hashaggs can be performed concurrently with the first group agg.
- All hash transitions can be done in one expression state.
The final stage:
In the final stage, the partial aggregate results are combined according to
the grouping set id. None of the optimizations of the partial stage can be
leveraged in the final stage, so all rollups are extracted such that each
rollup contains only one grouping set and each aggregate phase processes a
single grouping set. In this stage, tuples are multiplexed into the different
phases according to the grouping set id before we actually aggregate them.
An alternative approach to the final stage implementation that we considered
was using a single AGG with grouping clause: gsetid + all grouping columns.
In the end, we decided against it because it doesn't support mixed aggregation:
firstly, once the grouping columns are a mix of unsortable and unhashable
columns, it cannot produce a path in the final stage; secondly, mixed aggregation
is the cheapest path in some cases and this approach cannot support it. Meanwhile,
if the union of all the grouping columns is large, this parallel implementation
will incur undue costs.
The patches included in this patchset are as follows:
0001-All-grouping-sets-do-their-own-sorting.patch
This is a refactoring patch for the existing code. It moves the phase 0 SORT
into the AGG instead of assuming that the input is already sorted.
Postgres used to add a SORT path explicitly beneath the AGG for a sort group
aggregate. The grouping sets path also adds a SORT path for the first sort
aggregate phase, but the following sort aggregate phases do their own sorting
using a tuplesort. This commit unifies the way grouping sets paths do sorting:
all sort aggregate phases now do their own sorting using a tuplesort.
We did this refactoring to support the final stage of parallel grouping sets.
Adding a SORT path underneath the AGG in the final stage is wasteful. With
this patch, all non-hashed aggregate phases can do their own sorting after
the tuples are redirected.
Unpatched:
tpch=# explain (costs off) select count(*) from customer group by grouping sets (c_custkey, c_name);
QUERY PLAN
----------------------------------
GroupAggregate
Group Key: c_custkey
Sort Key: c_name
Group Key: c_name
-> Sort
Sort Key: c_custkey
-> Seq Scan on customer
Patched:
tpch=# explain (costs off) select count(*) from customer group by grouping sets (c_custkey, c_name);
QUERY PLAN
----------------------------
GroupAggregate
Sort Key: c_custkey
Group Key: c_custkey
Sort Key: c_name
Group Key: c_name
-> Seq Scan on customer
0002-fix-a-numtrans-bug.patch
Bug fix for the additional size of the hash table for hash aggregate: the additional size was always computed as zero.
0003-Reorganise-the-aggregate-phases.patch
Planner used to organize the grouping sets in [HASHED]->[SORTED] order.
HASHED aggregates were always located before SORTED aggregate. And
ExecInitAgg() organized the aggregate phases in [HASHED]->[SORTED] order.
All HASHED grouping sets are squeezed into phase 0 when executing the
AGG node. For AGG_HASHED or AGG_MIXED strategies, however, the executor
will start by executing phases 1-3, assuming they are all groupaggs, and then
return to phase 0 to execute hashaggs if it is AGG_MIXED.
When adding support for parallel grouping sets, this was a big barrier.
Firstly, we needed complicated logic to locate the first sort rollup/phase and
handle the special order for a different strategy in many places.
Secondly, squeezing all hashed grouping sets to phase 0 doesn't work for the
final stage. We can't put all transition functions into one expression state in the
final stage. ExecEvalExpr() is optimized to evaluate all the hashed grouping
sets for the same tuple; however, each input to the final stage is a trans value,
so you inherently should not evaluate more than one grouping set for the same input.
This commit organizes the grouping sets in a more natural way:
[SORTED]->[HASHED].
The executor now starts execution from phase 0 for all strategies; the HASHED
sets are no longer squeezed into a single phase. Instead, a HASHED set has its
own phase and we use other ways to put all hash transitions in one expression
state for the partial stage.
This commit also moves 'sort_in' from the AggState to the AggStatePerPhase*
structure; this helps to handle more complicated cases necessitated by the
introduction of parallel grouping sets. For example, we might then need to add
a tuplestore 'store_in' to store partial aggregate results for PLAIN sets.
It also gives us a chance to keep the first TupleSortState, so we do not need
to re-sort when rescanning.
0004-Parallel-grouping-sets.patch
This is the main logic. Patch 0001 and 0003 allow it to be pretty simple.
Here is an example plan with the patch applied:
tpch=# explain (costs off) select sum(l_quantity) as sum_qty, count(*) as count_order from lineitem group by grouping sets((l_returnflag, l_linestatus), (), l_suppkey);
QUERY PLAN
----------------------------------------------------
Finalize MixedAggregate
Filtered by: (GROUPINGSETID())
Sort Key: l_suppkey
Group Key: l_suppkey
Group Key: ()
Hash Key: l_returnflag, l_linestatus
-> Gather
Workers Planned: 7
-> Partial MixedAggregate
Sort Key: l_suppkey
Group Key: l_suppkey
Group Key: ()
Hash Key: l_returnflag, l_linestatus
-> Parallel Seq Scan on lineitem
(14 rows)
We have done some performance tests as well using a groupingsets-enhanced
subset of TPCH. TPCH didn't contain grouping sets queries, so we changed all
"group by" clauses to "group by rollup" clauses. We chose 14 queries the test.
We noticed no performance regressions. 3 queries showed performance improvements
due to parallelism: (tpch scale is 10 and max_parallel_workers_per_gather is 8)
1.sql: 16150.780 ms vs 116093.550 ms
13.sql: 5288.635 ms vs 19541.981 ms
18.sql: 52985.084 ms vs 67980.856 ms
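As an illustration of the kind of change we made to the queries (this is only
a simplified sketch in the spirit of Q1, not one of the exact test queries):

-- original form
select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, count(*) as count_order
from lineitem
group by l_returnflag, l_linestatus;

-- modified form used for the test
select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, count(*) as count_order
from lineitem
group by rollup (l_returnflag, l_linestatus);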
Thanks,
Pengzhou & Melanie & Jesse
Hi,

unfortunately this got a bit broken by the disk-based hash aggregation,
committed today, and so it needs a rebase. I've started looking at the
patch before that, and I have it rebased on e00912e11a9e (i.e. the
commit before the one that breaks it).

Attached is the rebased patch series (now broken), with a couple of
commits with some minor cosmetic changes I propose to make (easier than
explaining it on a list, it's mostly about whitespace, comments etc).
Feel free to reject the changes, it's up to you.

I'll continue doing the review, but it'd be good to have a fully rebased
version.

regards

--
Tomas Vondra http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
Thank you for reviewing this patch.
On Thu, Mar 19, 2020 at 10:09 AM Tomas Vondra <tomas.vondra@2ndquadrant.com> wrote:
Hi,
unfortunately this got a bit broken by the disk-based hash aggregation,
committed today, and so it needs a rebase. I've started looking at the
patch before that, and I have it rebased on e00912e11a9e (i.e. the
commit before the one that breaks it).
I spent the day looking into the details of the hash spill patch and finally
managed to rebase it successfully. I tested the first 5 patches and they all
passed installcheck. The 0006-parallel-xxx patch is not tested yet, and I also
need to make hash spill work in the final stage of parallel grouping sets; I
will do that tomorrow.
The conflicts were mainly located in the handling of hash spill for grouping sets;
the 0004-reorganise-xxxx patch also makes the stage that refills the hash table
easier and can avoid the null check in that stage.
Attached is the rebased patch series (now broken), with a couple of
commits with some minor cosmetic changes I propose to make (easier than
explaining it on a list, it's mostly about whitespace, comments etc).
Feel free to reject the changes, it's up to you.
Thanks, I will enhance the comments and take care of the whitespace.
I'll continue doing the review, but it'd be good to have a fully rebased
version.
Very appreciate it.
Thanks,
Pengzhou
Hi Tomas,
I rebased the code and resolved the comments you attached; some unresolved
comments are explained in 0002-fixes.patch, please take a look.
I also made hash spill work for parallel grouping sets; the plan looks like:
gpadmin=# explain select g100, g10, sum(g::numeric), count(*), max(g::text)
from gstest_p group by cube (g100,g10);
QUERY PLAN
-------------------------------------------------------------------------------------------
Finalize MixedAggregate (cost=1000.00..7639.95 rows=1111 width=80)
Filtered by: (GROUPINGSETID())
Group Key: ()
Hash Key: g100, g10
Hash Key: g100
Hash Key: g10
Planned Partitions: 4
-> Gather (cost=1000.00..6554.34 rows=7777 width=84)
Workers Planned: 7
-> Partial MixedAggregate (cost=0.00..4776.64 rows=1111 width=84)
Group Key: ()
Hash Key: g100, g10
Hash Key: g100
Hash Key: g10
Planned Partitions: 4
-> Parallel Seq Scan on gstest_p (cost=0.00..1367.71 rows=28571 width=12)
(16 rows)
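(For anyone wanting to see the spill machinery behind the "Planned Partitions"
lines above: a plan like this can be provoked simply by lowering work_mem
before planning the query. The settings below are only illustrative, and
gstest_p is the table used in the plan above.)

set work_mem = '4MB';
set max_parallel_workers_per_gather = 7;
explain (analyze, costs off)
select g100, g10, sum(g::numeric), count(*), max(g::text)
from gstest_p group by cube (g100, g10);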
Thanks,
Pengzhou
On Thu, Mar 19, 2020 at 10:09 AM Tomas Vondra <tomas.vondra@2ndquadrant.com> wrote:
Hi,
unfortunately this got a bit broken by the disk-based hash aggregation,
committed today, and so it needs a rebase. I've started looking at the
patch before that, and I have it rebased on e00912e11a9e (i.e. the
commit before the one that breaks it).
Attached is the rebased patch series (now broken), with a couple of
commits with some minor cosmetic changes I propose to make (easier than
explaining it on a list, it's mostly about whitespace, comments etc).
Feel free to reject the changes, it's up to you.
I'll continue doing the review, but it'd be good to have a fully rebased
version.
regards
--
Tomas Vondra http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On Fri, Mar 20, 2020 at 07:57:02PM +0800, Pengzhou Tang wrote:
>Hi Tomas,
>
>I rebased the code and resolved the comments you attached; some unresolved
>comments are explained in 0002-fixes.patch, please take a look.
>
>I also made hash spill work for parallel grouping sets; the plan
>looks like:
>
>gpadmin=# explain select g100, g10, sum(g::numeric), count(*), max(g::text)
>from gstest_p group by cube (g100,g10);
> QUERY PLAN
>-------------------------------------------------------------------------------------------
> Finalize MixedAggregate (cost=1000.00..7639.95 rows=1111 width=80)
> Filtered by: (GROUPINGSETID())
> Group Key: ()
> Hash Key: g100, g10
> Hash Key: g100
> Hash Key: g10
> Planned Partitions: 4
> -> Gather (cost=1000.00..6554.34 rows=7777 width=84)
> Workers Planned: 7
> -> Partial MixedAggregate (cost=0.00..4776.64 rows=1111 width=84)
> Group Key: ()
> Hash Key: g100, g10
> Hash Key: g100
> Hash Key: g10
> Planned Partitions: 4
> -> Parallel Seq Scan on gstest_p (cost=0.00..1367.71 rows=28571 width=12)
>(16 rows)
>

Hmmm, OK. I think there's some sort of memory leak, though. I've tried
running a simple grouping set query on the catalog_sales table from the
TPC-DS scale 100GB test. The query is pretty simple:

select count(*) from catalog_sales group by cube (cs_warehouse_sk,
cs_ship_mode_sk, cs_call_center_sk);

with a partial MixedAggregate plan (attached). When executed, it however
allocates more and more memory, and eventually gets killed by an OOM
killer. This is on a machine with 8GB of RAM, work_mem=4MB (and 4
parallel workers).

The memory context stats from a running process before it gets killed by
OOM look like this

TopMemoryContext: 101560 total in 6 blocks; 7336 free (6 chunks); 94224 used
TopTransactionContext: 73816 total in 4 blocks; 11624 free (0 chunks); 62192 used
ExecutorState: 1375731712 total in 174 blocks; 5391392 free (382 chunks); 1370340320 used
HashAgg meta context: 315784 total in 10 blocks; 15400 free (2 chunks); 300384 used
ExprContext: 8192 total in 1 blocks; 7928 free (0 chunks); 264 used
ExprContext: 8192 total in 1 blocks; 7928 free (0 chunks); 264 used
ExprContext: 8192 total in 1 blocks; 7928 free (0 chunks); 264 used
...

That's 1.3GB allocated in ExecutorState - that doesn't seem right.

FWIW there are only very few groups (each attribute has fewer than 30
distinct values, so there are only about ~1000 groups). On master it
works just fine, of course.

regards

--
Tomas Vondra http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
The memory context stats from a running process before it gets killed by
OOM look like this
TopMemoryContext: 101560 total in 6 blocks; 7336 free (6 chunks); 94224 used
TopTransactionContext: 73816 total in 4 blocks; 11624 free (0 chunks); 62192 used
ExecutorState: 1375731712 total in 174 blocks; 5391392 free (382 chunks); 1370340320 used
HashAgg meta context: 315784 total in 10 blocks; 15400 free (2 chunks); 300384 used
ExprContext: 8192 total in 1 blocks; 7928 free (0 chunks); 264 used
ExprContext: 8192 total in 1 blocks; 7928 free (0 chunks); 264 used
ExprContext: 8192 total in 1 blocks; 7928 free (0 chunks); 264 used
...
That's 1.3GB allocated in ExecutorState - that doesn't seem right.
FWIW there are only very few groups (each attribute has fewer than 30
distinct values, so there are only about ~1000 groups). On master it works
just fine, of course.
Thanks a lot. The patch had a memory leak in lookup_hash_entries: it used a
list_concat there, which caused a 64-byte leak for every tuple. That has been fixed.
Also, resolved conflicts and rebased the code.
Thanks,
Pengzhou
> On 25 Mar 2020, at 15:35, Pengzhou Tang <ptang@pivotal.io> wrote:
> Thanks a lot. The patch had a memory leak in lookup_hash_entries: it used a
> list_concat there, which caused a 64-byte leak for every tuple. That has been fixed.
>
> Also, resolved conflicts and rebased the code.

While there hasn't been a review of this version, it no longer applies
to HEAD. There was also considerable discussion in a (virtual)
hallway-track session during PGCon which reviewed the approach (for lack
of a better description), deeming that nodeAgg.c needs a refactoring
before complicating it further.

Based on that, and an off-list discussion with Melanie who had picked up
the patch, I'm marking this Returned with Feedback.

cheers ./daniel