Thread: [HACKERS] PoC: Grouped base relation
Attached is a draft patch that lets partial aggregation happen at base
relation level. If the relations contain relatively small number of groups,
the number of input rows of the aggregation at the query level can be reduced
this way. Also, if append relation and postgres_fdw planning is enhanced
accordingly, patch like this can let us aggregate individual tables on remote
servers (e.g. shard nodes) and thus reduce the amount of rows subject to the
final aggregation.

For example, consider query

SELECT b.j, sum(a.x) FROM a, b WHERE a.i = b.j GROUP BY b.j;

and tables "a"

 i | x
-------
 1 | 3
 1 | 4

and "b"

 j
---
 1
 1

The base relations grouped look like

 i | sum(a.x)| count(*)
-----------------------
 1 | 7       | 2

and

 j | count(*)
-------------
 1 | 2


A few "footnotes":

* Equivalence class {a.i, b.j} tells that "a" can be grouped by "i", besides
the grouping of "b" which is explicitly required by GROUP BY b.j clause.

* To transfer the aggregate results to upper nodes, I introduced a concept of
"grouped variable". Base relation has special target which the planner uses
to generate "grouped paths". The grouped target contains one grouped variable
per aggregate that the relation computes. During final processing of the plan
(setrefs.c), the corresponding (partial) aggregate is restored in the query
target if needed - typically this happens to ensure that the final aggregate
references the output of the partial aggregate.

* So far the grouped variable is only used for aggregates, but it might be
useful for grouping expressions in the future as well. Currently the base
relation can only be grouped by a plain Var, but it might be worth grouping
it by generic grouping expressions of the GROUP BY clause, and using the
grouped var mechanism to propagate the expression value to the query target.


As for the example, the processing continues by joining the partially grouped
sets:

 i | sum(x)| count(i.*) | j | count(j.*)
----------------------------------------
 1 | 7     | 2          | 1 | 3


Before performing the final aggregation, we need to multiply sum(a.x) by
count(j.*) because w/o the aggregation at base relation level the input
of the query-level aggregation would look like

 a.i | a.x | b.j
----------------
 1   | 3   | 1
 1   | 4   | 1
 1   | 3   | 1
 1   | 4   | 1

In other words, grouping of the base relation "b" below the join prevents the
join from bringing per-group input set to the aggregate input multiple
times. To compensate for this effect, I've added a new field "aggtransmultifn"
to pg_aggregate catalog. It "multiplies" the aggregate state w/o repeated
processing of the same input set many times. sum() is an example of an
aggregate that needs such processing, avg() is one that does not.


The example query can eventually produce plans like

                      QUERY PLAN
------------------------------------------------------
 Finalize HashAggregate
   Group Key: b.j
   ->  Gather
         Workers Planned: 2
         ->  Hash Join
               Hash Cond: (a.i = b.j)
               ->  Partial HashAggregate
                     Group Key: a.i
                     ->  Parallel Seq Scan on a
               ->  Hash
                     ->  Partial HashAggregate
                           Group Key: b.j
                           ->  Parallel Seq Scan on b

or

                      QUERY PLAN
------------------------------------------------------
 Finalize HashAggregate
   Group Key: b.j
   ->  Hash Join
         Hash Cond: (a.i = b.j)
         ->  Gather
               Workers Planned: 2
               ->  Partial HashAggregate
                     Group Key: a.i
                     ->  Parallel Seq Scan on a
         ->  Hash
               ->  Gather
                     Workers Planned: 1
                     ->  Partial HashAggregate
                           Group Key: b.j
                           ->  Parallel Seq Scan on b


An obvious limitation is that neither grouping expression nor aggregate
argument can be below the nullable side of outer join. In such a case the
aggregate at the base relation level wouldn't receive the NULL values that it
does receive at the query level. Also, no aggregate can reference multiple
tables.

Does this concept seem worth to continue coding?

BTW, if anyone wants to play with the current version:

1. Don't forget to initialize a new cluster (initdb) first. I decided not to
bump CATALOG_VERSION_NO so far because it'd probably make the patch
incompatible with master branch quite soon.

2. Only hash aggregation is implemented so far at the base relation level.

3. As for sum() aggregate, only sum(float4) is supposed to work correctly so
far - this is related to the pg_aggregate changes mentioned above. avg()
should work in general, and I didn't care about the other ones so far.

4. As for joins, only hash join is able to process the grouped relations. I
didn't want to do too much coding until there's a consensus on the design.

-- 
Antonin Houska
Cybertec Schönig & Schönig GmbH
Gröhrmühlgasse 26
A-2700 Wiener Neustadt
Web: http://www.postgresql-support.de, http://www.cybertec.at
Attachment
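The following self-contained SQL sketch reproduces the example above and emulates by hand what the proposed plan shape computes. The column types are assumptions, and the explicit multiplication by count(b.*) merely stands in for the proposed aggtransmultifn; it is not the patch's actual code.

-- Example data from the message above (types assumed).
CREATE TABLE a (i int, x float4);
CREATE TABLE b (j int);
INSERT INTO a VALUES (1, 3), (1, 4);
INSERT INTO b VALUES (1), (1);

-- The original query; on the sample data it returns j = 1, sum = 14.
SELECT b.j, sum(a.x) FROM a, b WHERE a.i = b.j GROUP BY b.j;

-- Manual emulation of "grouped base relations": aggregate each side first,
-- join the small grouped sets, then scale sum(a.x) by count(b.*) because the
-- per-group input would otherwise have been fed to the aggregate that many
-- times. (In general a final aggregation over the join output would still be
-- needed; here each group appears exactly once after the join.)
SELECT gb.j, ga.sum_x * gb.cnt AS sum_x
FROM (SELECT i, sum(x) AS sum_x, count(*) AS cnt FROM a GROUP BY i) ga
JOIN (SELECT j, count(*) AS cnt FROM b GROUP BY j) gb ON ga.i = gb.j;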
On Mon, Jan 9, 2017 at 12:56 PM, Antonin Houska <ah@cybertec.at> wrote:
> Attached is a draft patch that lets partial aggregation happen at base
> relation level. If the relations contain relatively small number of groups,
> the number of input rows of the aggregation at the query level can be reduced
> this way. Also, if append relation and postgres_fdw planning is enhanced
> accordingly, patch like this can let us aggregate individual tables on remote
> servers (e.g. shard nodes) and thus reduce the amount of rows subject to the
> final aggregation.

Very interesting. I don't have time to study this in detail right now, but as
a concept it seems worthwhile. I think the trick is figuring out at which
levels of the path tree it makes sense to consider partial aggregation.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Mon, Jan 9, 2017 at 5:56 PM, Antonin Houska <ah@cybertec.at> wrote:
> Attached is a draft patch that lets partial aggregation happen at base
> relation level. If the relations contain relatively small number of groups,
> the number of input rows of the aggregation at the query level can be reduced
> this way. Also, if append relation and postgres_fdw planning is enhanced
> accordingly, patch like this can let us aggregate individual tables on remote
> servers (e.g. shard nodes) and thus reduce the amount of rows subject to the
> final aggregation.
>
> For example, consider query
>
> SELECT b.j, sum(a.x) FROM a, b WHERE a.i = b.j GROUP BY b.j;
>
> and tables "a"
>
>  i | x
> -------
>  1 | 3
>  1 | 4
>
> and "b"
>
>  j
> ---
>  1
>  1
The example should have j = 1, 2, right?
and "b"
j
---
1
2
> The base relations grouped look like
>
>  i | sum(a.x)| count(*)
> -----------------------
>  1 | 7       | 2
Otherwise, the sum and count would be 14 and 4.
> and
>
>  j | count(*)
> -------------
>  1 | 2
Pantelis
On Tue, Jan 10, 2017 at 6:52 PM, Pantelis Theodosiou <ypercube@gmail.com> wrote:
> [... snip ...]
Or perhaps I should have read the whole mail more carefully before posting. Ignore the previous.
On Mon, Jan 9, 2017 at 11:26 PM, Antonin Houska <ah@cybertec.at> wrote:
> Attached is a draft patch that lets partial aggregation happen at base
> relation level. If the relations contain relatively small number of groups,
> the number of input rows of the aggregation at the query level can be reduced
> this way. Also, if append relation and postgres_fdw planning is enhanced
> accordingly, patch like this can let us aggregate individual tables on remote
> servers (e.g. shard nodes) and thus reduce the amount of rows subject to the
> final aggregation.

For an appendrel probably you need an ability to switch group->append into
append->group. For postgres_fdw, we already support aggregate pushdown. But
we don't support fetching partial aggregates from foreign server. What other
enhancements do you need?

> For example, consider query
>
> SELECT b.j, sum(a.x) FROM a, b WHERE a.i = b.j GROUP BY b.j;
>
> [... snip ...]
>
> The base relations grouped look like
>
>  i | sum(a.x)| count(*)
> -----------------------
>  1 | 7       | 2
>
> and
>
>  j | count(*)
> -------------
>  1 | 2

Looks like an interesting technique.

> A few "footnotes":
>
> [... snip ...]
>
> In other words, grouping of the base relation "b" below the join prevents the
> join from bringing per-group input set to the aggregate input multiple
> times. To compensate for this effect, I've added a new field "aggtransmultifn"
> to pg_aggregate catalog. It "multiplies" the aggregate state w/o repeated
> processing of the same input set many times. sum() is an example of an
> aggregate that needs such processing, avg() is one that does not.

For something like product aggregation, where the product (or higher order
operations) across rows is accumulated instead of sum, mere multiplication
wouldn't help. We will need some higher order operation to "extrapolate" the
result based on count(j.*). In fact, the multiplication factor will depend
upon the number of relations being joined. E.g.

select b.j, sum(a.x) where a, b, c where a.i = b.j and a.i = c.k group by b.j

> The example query can eventually produce plans like
>
> [... snip ...]
>
> An obvious limitation is that neither grouping expression nor aggregate
> argument can be below the nullable side of outer join. In such a case the
> aggregate at the base relation level wouldn't receive the NULL values that it
> does receive at the query level. Also, no aggregate can reference multiple
> tables.
>
> Does this concept seem worth to continue coding?

May be we want to implement this technique without partial aggregation first
i.e. push down aggregation and grouping down the join tree and then add
partial aggregation steps. That might make it easy to review. Looking at the
changes in create_plain_partial_paths(), it looks like we use this technique
only in case of parallel query. I think the technique is useful otherwise as
well.

Also, if we could generalize this technique to push aggregation/grouping upto
any relation (not just base but join as well) where it can be calculated that
may be better. Trying that might lead us to a better design; which right now
is focused only on base relations.

> BTW, if anyone wants to play with the current version:
>
> [... snip ...]
>
> 4. As for joins, only hash join is able to process the grouped relations. I
> didn't want to do too much coding until there's a consensus on the design.

Probably it's too early to review code, but ...

+       /*
+        * If no join is expected, aggregation at base relation level makes no
+        * sense. XXX Is there simpler way to find out? (We're not interested in
+        * RELOPT_OTHER_MEMBER_REL, so simple_rel_array_size does not help.)
+        */
+       for (i = 1; i < root->simple_rel_array_size; i++)
+       {
+               RelOptInfo *rel;
+
+               rel = find_base_rel(root, i);
+               if (rel->reloptkind == RELOPT_BASEREL)
+               {
+                       nbaserels++;
+
+                       /*
+                        * We only want to know whether the number of relations is greater
+                        * than one.
+                        */
+                       if (nbaserels > 1)
+                               break;
+               }
+       }

You might want to check bms_membership(root->all_baserels), instead of this
loop.

-- 
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company
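The append->group switch mentioned above can be illustrated with a manual SQL rewrite. The parent/child table names below (a_parent, a_child1, a_child2) are hypothetical, and the rewrite only sketches the intended plan shape; it is not what the patch generates.

-- Instead of grouping the whole append relation ...
SELECT i, sum(x) FROM a_parent GROUP BY i;

-- ... each (hypothetical) child is partially grouped first and the partial
-- sums are combined above the append:
SELECT i, sum(sum_x) AS sum_x
FROM (
    SELECT i, sum(x) AS sum_x FROM a_child1 GROUP BY i
    UNION ALL
    SELECT i, sum(x) AS sum_x FROM a_child2 GROUP BY i
) AS per_child
GROUP BY i;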
Ashutosh Bapat <ashutosh.bapat@enterprisedb.com> wrote:

> On Mon, Jan 9, 2017 at 11:26 PM, Antonin Houska <ah@cybertec.at> wrote:
> > Attached is a draft patch that lets partial aggregation happen at base
> > relation level.
> > [... snip ...]
>
> For an appendrel probably you need an ability to switch group->append into
> append->group

Yes, like the new patch version (see attachment) does:

postgres=# EXPLAIN (COSTS false) SELECT b.j, sum(a.x) FROM a JOIN b ON a.i = b.j GROUP BY b.j;
                       QUERY PLAN
------------------------------------------------------
 Finalize HashAggregate
   Group Key: b.j
   ->  Hash Join
         Hash Cond: (a.i = b.j)
         ->  Append
               ->  Partial HashAggregate
                     Group Key: a.i
                     ->  Seq Scan on a
               ->  Partial HashAggregate
                     Group Key: a_1.i
                     ->  Seq Scan on a_1
               ->  Partial HashAggregate
                     Group Key: a_2.i
                     ->  Seq Scan on a_2
         ->  Hash
               ->  Gather
                     Workers Planned: 1
                     ->  Partial HashAggregate
                           Group Key: b.j
                           ->  Parallel Seq Scan on b

> For postgres_fdw, we already support aggregate pushdown.

My understanding is that currently it only works if the whole query can be
evaluated by the FDW. What I try to do is to push down aggregation of
individual table, and join the partially-aggregated set with other tables,
which are not necessarily remote or reside on different remote server.

> But we don't support fetching partial aggregates from foreign server. What
> other enhancements do you need?

Here I try to introduce infrastructure for aggregation pushdown and
propagation of the transient aggregate state values from base relations to
the final join. postgres_fdw can benefit from it but it's not the only use
case, so I'd prefer adjusting it in a separate patch.

Yes, an outstanding problem is that the remote nodes need to return transient
state values - probably using bytea type. I think this functionality should
even be separate from postgres_fdw (e.g. a new contrib module?), because the
remote nodes do not need postgres_fdw.

> > As for the example, the processing continues by joining the partially grouped
> > sets:
> >
> >  i | sum(x)| count(i.*) | j | count(j.*)
> > ----------------------------------------
> >  1 | 7     | 2          | 1 | 2

[ Sorry, count(j.*) should be 2, not 3 as I wrote in the initial email. ]

> > In other words, grouping of the base relation "b" below the join prevents the
> > join from bringing per-group input set to the aggregate input multiple
> > times. To compensate for this effect, I've added a new field "aggtransmultifn"
> > to pg_aggregate catalog. It "multiplies" the aggregate state w/o repeated
> > processing of the same input set many times. sum() is an example of an
> > aggregate that needs such processing, avg() is one that does not.
>
> For something like product aggregation, where the product (or higher order
> operations) across rows is accumulated instead of sum, mere multiplication
> wouldn't help. We will need some higher order operation to "extrapolate" the
> result based on count(j.*). In fact, the multiplication factor will depend
> upon the number of relations being joined. E.g. select b.j, sum(a.x) where
> a, b, c where a.i = b.j and a.i = c.k group by b.j

Maybe you're saying what I already try to do. Let me modify the example
accordingly (unlike the initial example, each table produces a group of
different size so the count() values are harder to confuse):

Table "a":

 i | x
-------
 1 | 3
 1 | 4

Table "b":

 j | y
-------
 1 | 10
 1 | 20
 1 | 30

Table "c":

 k | z
---------
 1 | 100
 1 | 200
 1 | 300
 1 | 400

Your query:

SELECT b.j, sum(a.x) FROM a, b, c WHERE a.i = b.j AND a.i = c.k GROUP BY b.j

The tables grouped:

 i | sum(a.x) | count(a.*)
---+-----------------------
 1 | 7        | 2

 j | sum(b.y) | count(b.*)
---+-----------------------
 1 | 60       | 3

 k | sum(c.z) | count(c.*)
---+-----------------------
 1 | 1000     | 4

Grouped (partially) and joined (table names omitted somewhere so that the
table width does not exceed 80 characters):

 i | sum(x) | count(a.*) | j | sum(y) | count(b.*) | k | sum(z) | count(c.*)
---+-------------------------------------------------------------------------
 1 | 7      | 2          | 1 | 60     | 3          | 1 | 1000   | 4

For sum(a.x), the input for final aggregation is the partial sum(x)
multiplied by the count(b.*) and also by count(c.*), because grouping of both
"b" and "c" reduced the number of times the input values of 3 and 4 arrived
to the aggregate node. Thus 7 * 3 * 4 = 84.

Likewise, sum(y) * count(a.*) * count(c.*) = 60 * 2 * 4 = 480

and finally, sum(z) * count(a.*) * count(b.*) = 1000 * 2 * 3 = 6000

I get exactly these values when I run your query on master branch w/o my
patch, so my theory could be correct :-)

> May be we want to implement this technique without partial aggregation first
> i.e. push down aggregation and grouping down the join tree and then add
> partial aggregation steps. That might make it easy to review. Looking at
> the changes in create_plain_partial_paths(), it looks like we use this
> technique only in case of parallel query. I think the technique is useful
> otherwise as well.

The term "partial" is actually ambiguous. Sometimes it refers to "partial
path", i.e. path that can be executed by multiple workers, sometimes "partial
aggregate", i.e. aggregate that produces the transient state instead of the
final values.

The fact that partial (i.e. parallel) path was necessary to push aggregation
down was rather a limitation of the initial version of the patch. The current
version can push the aggregation down even w/o parallelism - see
create_plain_partial_paths() -> create_plain_grouped_path().

> Also, if we could generalize this technique to push aggregation/grouping
> upto any relation (not just base but join as well) where it can be
> calculated that may be better. Trying that might lead us to a better design;
> which right now is focused only on base relations.

Yes, that's true. Currently this patch is unable to process queries where an
aggregate references multiple tables. It should be possible to apply the
"grouped path" to a join.

> Probably it's too early to review code, but ...

Thanks for doing so anyway!

> +     /*
> +      * If no join is expected, aggregation at base relation level makes no
> +      * sense. XXX Is there simpler way to find out? (We're not interested in
> +      * RELOPT_OTHER_MEMBER_REL, so simple_rel_array_size does not help.)
> +      */
> [... snip ...]
>
> You might want to check bms_membership(root->all_baserels), instead of
> this loop.

I liked this idea, but when tried it, I found out that all_baserels gets
initialized much later. And when I moved the initialization to
add_base_rels_to_query I found out that the result is not the same
(RELOPT_DEADREL relations make the difference). Thanks for your suggestion
anyway.

-- 
Antonin Houska
Cybertec Schönig & Schönig GmbH
Gröhrmühlgasse 26
A-2700 Wiener Neustadt
Web: http://www.postgresql-support.de, http://www.cybertec.at
Attachment
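The three-table arithmetic above can be checked with a manual SQL rewrite. This is only an illustration: it assumes table a from the earlier sketch and tables b and c defined with the columns shown in this message (types assumed).

-- Hypothetical definitions matching the example data in this message.
CREATE TABLE b (j int, y int);
CREATE TABLE c (k int, z int);
INSERT INTO b VALUES (1, 10), (1, 20), (1, 30);
INSERT INTO c VALUES (1, 100), (1, 200), (1, 300), (1, 400);

-- Partially group each base relation, join the grouped sets, and scale each
-- partial sum by the counts of the *other* two relations, as described above.
SELECT gb.j,
       ga.sum_x * gb.cnt * gc.cnt AS sum_x,   -- 7    * 3 * 4 = 84
       gb.sum_y * ga.cnt * gc.cnt AS sum_y,   -- 60   * 2 * 4 = 480
       gc.sum_z * ga.cnt * gb.cnt AS sum_z    -- 1000 * 2 * 3 = 6000
FROM (SELECT i, sum(x) AS sum_x, count(*) AS cnt FROM a GROUP BY i) ga
JOIN (SELECT j, sum(y) AS sum_y, count(*) AS cnt FROM b GROUP BY j) gb ON ga.i = gb.j
JOIN (SELECT k, sum(z) AS sum_z, count(*) AS cnt FROM c GROUP BY k) gc ON ga.i = gc.k;

-- The same numbers come out of the plain query on an unpatched server:
SELECT b.j, sum(a.x), sum(b.y), sum(c.z)
FROM a, b, c
WHERE a.i = b.j AND a.i = c.k
GROUP BY b.j;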
On Fri, Jan 13, 2017 at 10:12 PM, Antonin Houska <ah@cybertec.at> wrote:
> Ashutosh Bapat <ashutosh.bapat@enterprisedb.com> wrote:
>> For an appendrel probably you need an ability to switch group->append into
>> append->group
>
> Yes, like the new patch version (see attachment) does:
>
> [... snip ...]
>
>> For postgres_fdw, we already support aggregate pushdown.
>
> My understanding is that currently it only works if the whole query can be
> evaluated by the FDW. What I try to do is to push down aggregation of
> individual table, and join the partially-aggregated set with other tables,
> which are not necessarily remote or reside on different remote server.

You will need to invoke FDW's hook for aggregate pushdown for the base
relations. It would work as long as we don't ask it transient results. But I
guess, that can come later.

>> But we don't support fetching partial aggregates from foreign server. What
>> other enhancements do you need?
>
> Here I try to introduce infrastructure for aggregation pushdown and
> propagation of the transient aggregate state values from base relations to
> the final join. postgres_fdw can benefit from it but it's not the only use
> case, so I'd prefer adjusting it in a separate patch.
>
> Yes, an outstanding problem is that the remote nodes need to return transient
> state values - probably using bytea type. I think this functionality should
> even be separate from postgres_fdw (e.g. a new contrib module?), because the
> remote nodes do not need postgres_fdw.

Hmm, that's a missing piece. We need to work on it separately.

>> For something like product aggregation, where the product (or higher order
>> operations) across rows is accumulated instead of sum, mere multiplication
>> wouldn't help. We will need some higher order operation to "extrapolate" the
>> result based on count(j.*).
>
> Maybe you're saying what I already try to do. Let me modify the example
> accordingly (unlike the initial example, each table produces a group of
> different size so the count() values are harder to confuse):
>
> [... snip ...]

This all works well, as long as the aggregate is "summing" something across
rows. The method doesn't work when aggregation is say "multiplying" across
the rows or "concatenating" across the rows like array_agg() or string_agg().
They need a different strategy to combine aggregates across relations.

> I get exactly these values when I run your query on master branch w/o my
> patch, so my theory could be correct :-)
>
>> May be we want to implement this technique without partial aggregation first
>> i.e. push down aggregation and grouping down the join tree and then add
>> partial aggregation steps. That might make it easy to review. Looking at
>> the changes in create_plain_partial_paths(), it looks like we use this
>> technique only in case of parallel query. I think the technique is useful
>> otherwise as well.

IIUC, we are trying to solve multiple problems here:

1. Pushing down aggregates/groups down join tree, so that the number of rows
to be joined decreases. This might be a good optimization to have. However
there are problems in the current patch. Every path built for a relation
(join or base) returns the same result expressed by the relation or its
subset restricted by parameterization or unification. But this patch changes
that. It creates paths which represent grouping in the base relation. I
think, we need a separate relation to represent that result and hold paths
which produce that result. That itself would be a sizable patch.

2. Try to push down aggregates based on the equivalence classes, where
grouping properties can be transferred from one relation to the other using
EC mechanism. This seems to require solving the problem of combining
aggregates across the relations. But there might be some usecases which could
benefit without solving this problem.

3. If the relation to which we push the aggregate is an append relation, push
(partial) aggregation/grouping down into the child relations. - We don't do
that right now even for grouping aggregation on a single append table.
Parallel partial aggregation does that, but not exactly per relation. That
may be a sizable project in itself. Even without this piece the rest of the
optimizations proposed by this patch are important.

4. Additional goal: push down the aggregation to any relation (join/base)
where it can be computed.

If we break the problem down into smaller problems as above, 1. the resulting
patches will be easier to review 2. since those problems themselves produce
some usable feature, there is a chance that more people will be interested in
reviewing/testing/coding on those.

-- 
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company
On 10 January 2017 at 06:56, Antonin Houska <ah@cybertec.at> wrote:
> Before performing the final aggregation, we need to multiply sum(a.x) by
> count(j.*) because w/o the aggregation at base relation level the input
> of the query-level aggregation would look like
>
>  a.i | a.x | b.j
> ----------------
>  1   | 3   | 1
>  1   | 4   | 1
>  1   | 3   | 1
>  1   | 4   | 1
>
> In other words, grouping of the base relation "b" below the join prevents the
> join from bringing per-group input set to the aggregate input multiple
> times. To compensate for this effect, I've added a new field "aggtransmultifn"
> to pg_aggregate catalog. It "multiplies" the aggregate state w/o repeated
> processing of the same input set many times. sum() is an example of an
> aggregate that needs such processing, avg() is one that does not.

First off, I'd like to say that making improvements in this area sounds like
a great thing. I'm very keen to see progress here.

I've been thinking about this aggtransmultifn and I'm not sure if it's really
needed. Adding a whole series of new transition functions is quite a pain. At
least I think so, and I have a feeling Robert might agree with me.

Let's imagine some worst case (and somewhat silly) aggregate query:

SELECT count(*)
FROM million_row_table
CROSS JOIN another_million_row_table;

Today that's going to cause 1 TRILLION transitions! Performance will be
terrible.

If we pushed the aggregate down into one of those tables and performed a
Partial Aggregate on that, then a Finalize Aggregate on that single row
result (after the join), then that's 1 million transfn calls, and 1 million
combinefn calls, one for each row produced by the join.

If we did it your way (providing I understand your proposal correctly)
there's 1 million transfn calls on one relation, then 1 million on the other
and then 1 multiplyfn call, which does 1000000 * 1000000.

What did we save vs. using the existing aggcombinefn infrastructure which
went into 9.6? Using this actually costs us 1 extra function call, right? I'd
imagine the size of the patch to use aggcombinefn instead would be a fraction
of the size of the one which included all the new aggmultiplyfns and
pg_aggregate.h changes.

There's already a lot of infrastructure in there to help you detect when this
optimisation can be applied. For example, AggClauseCosts.hasNonPartial will
be true if any aggregates don't have a combinefn, or if there's any DISTINCT
or ORDER BY aggregates, which'll also mean you can't apply the optimisation.

It sounds like a much more manageable project by using aggcombinefn instead.
Then maybe one day when we can detect if a join did not cause any result
duplication (i.e Unique Joins), we could finalise the aggregates on the first
call, and completely skip the combine state altogether.

Thanks for your work on this.

Regards

David Rowley
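For reference, the 9.6 combine-function infrastructure David refers to is exposed through CREATE AGGREGATE. A minimal sketch follows; the aggregate name my_sum is purely illustrative and simply mirrors the built-in float4 sum, while the proposed aggtransmultifn would be a new pg_aggregate column with no corresponding CREATE AGGREGATE option yet.

-- A sum(float4)-style aggregate whose partial states can be combined above a
-- join or a Gather node; COMBINEFUNC is the existing 9.6 mechanism.
CREATE AGGREGATE my_sum (float4) (
    SFUNC       = float4pl,
    STYPE       = float4,
    COMBINEFUNC = float4pl,
    PARALLEL    = SAFE
);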
On 01/17/2017 12:42 AM, David Rowley wrote:
> On 10 January 2017 at 06:56, Antonin Houska <ah@cybertec.at> wrote:
>> [... snip ...]
>
> First off, I'd like to say that making improvements in this area
> sounds like a great thing. I'm very keen to see progress here.

+1

Pushing down aggregates would be very useful for analytical queries.

> I've been thinking about this aggtransmultifn and I'm not sure if it's
> really needed. Adding a whole series of new transition functions is
> quite a pain. At least I think so, and I have a feeling Robert might
> agree with me.
>
> Let's imagine some worst case (and somewhat silly) aggregate query:
>
> SELECT count(*)
> FROM million_row_table
> CROSS JOIN another_million_row_table;
>
> Today that's going to cause 1 TRILLION transitions! Performance will
> be terrible.
>
> If we pushed the aggregate down into one of those tables and performed
> a Partial Aggregate on that, then a Finalize Aggregate on that single
> row result (after the join), then that's 1 million transfn calls, and
> 1 million combinefn calls, one for each row produced by the join.
>
> If we did it your way (providing I understand your proposal correctly)
> there's 1 million transfn calls on one relation, then 1 million on the
> other and then 1 multiplyfn call, which does 1000000 * 1000000.
>
> What did we save vs. using the existing aggcombinefn infrastructure
> which went into 9.6? Using this actually costs us 1 extra function
> call, right? I'd imagine the size of the patch to use aggcombinefn
> instead would be a fraction of the size of the one which included all
> the new aggmultiplyfns and pg_aggregate.h changes.

I think the patch relies on the assumption that the grouping reduces
cardinality, so a CROSS JOIN without a GROUP BY clause may not be the best
counterexample.

Let's instead use an example similar to what Antonin mentioned in the initial
post - two tables, with two columns each.

CREATE TABLE t1 (a INT, b INT);
CREATE TABLE t2 (c INT, d INT);

And let's assume each table has 100.000 rows, but only 100 groups in the
first column, with 1000 rows per group. Something like

INSERT INTO t1
SELECT mod(i,100), i FROM generate_series(1, 1e5) s(i);

INSERT INTO t2
SELECT mod(i,100), i FROM generate_series(1, 1e5) s(i);

And let's assume this query

SELECT t1.a, count(t2.d) FROM t1 JOIN t2 ON (t1.a = t2.c) GROUP BY t1.a;

On master, EXPLAIN (COSTS OFF, TIMING OFF, ANALYZE) looks like this:

                            QUERY PLAN
-----------------------------------------------------------------
 HashAggregate (actual rows=100 loops=1)
   Group Key: t1.a
   ->  Hash Join (actual rows=100000000 loops=1)
         Hash Cond: (t2.c = t1.a)
         ->  Seq Scan on t2 (actual rows=100000 loops=1)
         ->  Hash (actual rows=100000 loops=1)
               Buckets: 131072  Batches: 2  Memory Usage: 2716kB
               ->  Seq Scan on t1 (actual rows=100000 loops=1)
 Planning time: 0.167 ms
 Execution time: 17005.300 ms
(10 rows)

while with the patch it looks like this

                             QUERY PLAN
---------------------------------------------------------------------
 Finalize HashAggregate (actual rows=100 loops=1)
   Group Key: t1.a
   ->  Hash Join (actual rows=100 loops=1)
         Hash Cond: (t1.a = t2.c)
         ->  Partial HashAggregate (actual rows=100 loops=1)
               Group Key: t1.a
               ->  Seq Scan on t1 (actual rows=100000 loops=1)
         ->  Hash (actual rows=100 loops=1)
               Buckets: 1024  Batches: 1  Memory Usage: 14kB
               ->  Partial HashAggregate (actual rows=100 loops=1)
                     Group Key: t2.c
                     ->  Seq Scan on t2 (actual rows=100000 loops=1)
 Planning time: 0.105 ms
 Execution time: 31.609 ms
(14 rows)

This of course happens because with the patch we run the transition function
200k-times (on each side of the join) and aggtransmultifn on the 100 rows
produced by the join, while on master the join produces 100.000.000 rows
(which already takes much more time), and then we have to run the transition
function on all those rows.

The performance difference is pretty obvious, I guess.

> There's already a lot of infrastructure in there to help you detect
> when this optimisation can be applied. For example,
> AggClauseCosts.hasNonPartial will be true if any aggregates don't have
> a combinefn, or if there's any DISTINCT or ORDER BY aggregates,
> which'll also mean you can't apply the optimisation.
>
> It sounds like a much more manageable project by using aggcombinefn
> instead. Then maybe one day when we can detect if a join did not cause
> any result duplication (i.e Unique Joins), we could finalise the
> aggregates on the first call, and completely skip the combine state
> altogether.

I don't quite see how the patch could use aggcombinefn without sacrificing a
lot of the benefits. Sure, it's possible to run the aggcombinefn in a loop
(with number of iterations determined by the group size on the other side of
the join), but that sounds pretty expensive and eliminates the reduction of
transition function calls. The join cardinality would still be reduced,
though.

I do have other question about the patch, however. It seems to rely on the
fact that the grouping and joins both reference the same columns. I wonder
how uncommon such queries are.

To give a reasonable example, imagine the typical star schema, which is
pretty standard for large analytical databases. A dimension table is
"products" and the fact table is "sales", and the schema might look like
this:

CREATE TABLE products (
    id           SERIAL PRIMARY KEY,
    name         TEXT,
    category_id  INT,
    producer_id  INT
);

CREATE TABLE sales (
    product_id   REFERENCES products (id),
    nitems       INT,
    price        NUMERIC
);

A typical query then looks like this:

SELECT category_id, SUM(nitems), SUM(price)
FROM products p JOIN sales s ON (p.id = s.product_id)
GROUP BY p.category_id;

which obviously uses different columns for the grouping and join, and so the
patch won't help with that. Of course, a query grouping by product_id would
allow the patch to work

SELECT category_id, SUM(nitems), SUM(price)
FROM products p JOIN sales s ON (p.id = s.product_id)
GROUP BY p.product_id;

Another thing is that in my experience most queries do joins on foreign keys
(so the PK side is unique by definition), so the benefit on practical
examples is likely much smaller.

But I guess my main question is if there are actual examples of queries the
patch is trying to improve, or whether the general benefit is allowing
parallel plans for queries where it would not be possible otherwise.

regards

-- 
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On 17 January 2017 at 16:30, Tomas Vondra <tomas.vondra@2ndquadrant.com> wrote:
> Let's instead use an example similar to what Antonin mentioned in the
> initial post - two tables, with two columns each.
>
> [... snip ...]
>
> This of course happens because with the patch we run the transition function
> 200k-times (on each side of the join) and aggtransmultifn on the 100 rows
> produced by the join, while on master the join produces 100.000.000 rows
> (which already takes much more time), and then we have to run the transition
> function on all those rows.
>
> The performance difference is pretty obvious, I guess.

An exceptional improvement.

For the combine aggregate example of this query, since no patch exists yet,
we could simply mock what the planner would do by rewriting the query. I'll
use SUM() in place of the combinefn for COUNT():

explain analyze
SELECT t1.a, sum(t2.d)
FROM t1
JOIN (SELECT c, count(d) d FROM t2 GROUP BY c) t2 ON t1.a = t2.c
GROUP BY t1.a;

This seems to be 100,000 aggtransfn calls (for t2), then 100,000 aggcombinefn
calls (for t1) (total = 200,000), whereas the patch would perform 100,000
aggtransfn calls (for t2), then 100,000 aggtransfn calls (for t1), then 100
aggtransmultifn calls (total = 200,100).

Is my maths ok?

> I don't quite see how the patch could use aggcombinefn without sacrificing a
> lot of the benefits. Sure, it's possible to run the aggcombinefn in a loop
> (with number of iterations determined by the group size on the other side of
> the join), but that sounds pretty expensive and eliminates the reduction of
> transition function calls. The join cardinality would still be reduced,
> though.

I'd be interested in seeing the run time of my example query above. I can't
quite see a reason for it to be slower, but please let me know.

> I do have other question about the patch, however. It seems to rely on the
> fact that the grouping and joins both reference the same columns. I wonder
> how uncommon such queries are.
>
> [... snip ...]
>
> A typical query then looks like this:
>
> SELECT category_id, SUM(nitems), SUM(price)
> FROM products p JOIN sales s ON (p.id = s.product_id)
> GROUP BY p.category_id;
>
> which obviously uses different columns for the grouping and join, and so the
> patch won't help with that. Of course, a query grouping by product_id would
> allow the patch to work
>
> SELECT category_id, SUM(nitems), SUM(price)
> FROM products p JOIN sales s ON (p.id = s.product_id)
> GROUP BY p.product_id;
>
> Another thing is that in my experience most queries do joins on foreign keys
> (so the PK side is unique by definition), so the benefit on practical
> examples is likely much smaller.
>
> But I guess my main question is if there are actual examples of queries the
> patch is trying to improve, or whether the general benefit is allowing
> parallel plans for queries where it would not be possible otherwise.

Using the combine function technique the planner could have performed this
query the same as if the query had been written as:

SELECT p.category_id, SUM(sum_nitems), SUM(sum_price)
FROM products p
JOIN (SELECT product_id, SUM(nitems) sum_nitems, SUM(price) sum_price
      FROM sales GROUP BY product_id) s ON p.product_id = s.product_id
GROUP BY p.category_id;

The outer SUM() would be the combine function for SUM() in the finalize
aggregate node.

Why's that less efficient?

I don't deny that there's cases that this multiple aggregate function won't
be able to optimise better than the combine function, but I'm just not that
convinced yet it'll be worth the trouble when combine functions, which are
already in core, could do most of what would be useful with a fraction of the
code.

-- 
 David Rowley                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
On 01/17/2017 06:39 AM, David Rowley wrote:
> On 17 January 2017 at 16:30, Tomas Vondra <tomas.vondra@2ndquadrant.com> wrote:
>> [... snip ...]
>>
>> The performance difference is pretty obvious, I guess.
>
> An exceptional improvement.

I'm not sure if you're using "exceptional" in the "excellent" sense, or "rare
to happen in practice". But I guess both meanings apply here, actually ;-)

> For the combine aggregate example of this query, since no patch exists yet,
> we could simply mock what the planner would do by rewriting the query. I'll
> use SUM() in place of the combinefn for COUNT():
>
> explain analyze SELECT t1.a, sum(t2.d) FROM t1 join (SELECT c, count(d)
> d from t2 group by c) t2 on t1.a = t2.c group by t1.a;

                                   QUERY PLAN
--------------------------------------------------------------------------------
 HashAggregate (actual rows=100 loops=1)
   Group Key: t1.a
   ->  Hash Join (actual rows=100000 loops=1)
         Hash Cond: (t1.a = t2.c)
         ->  Seq Scan on t1 (actual rows=100000 loops=1)
         ->  Hash (actual rows=100 loops=1)
               Buckets: 1024  Batches: 1  Memory Usage: 13kB
               ->  Subquery Scan on t2 (actual rows=100 loops=1)
                     ->  HashAggregate (actual rows=100 loops=1)
                           Group Key: t2_1.c
                           ->  Seq Scan on t2 t2_1 (actual rows=100000 loops=1)
 Planning time: 0.271 ms
 Execution time: 60.226 ms
(13 rows)

> this seems to be 100,000 aggtransfn calls (for t2), then 100,000
> aggcombinefn calls (for t1) (total = 200,000), whereas the patch
> would perform 100,000 aggtransfn calls (for t2), then 100,000
> aggtransfn calls (for t1), then 100 aggtransmultifn calls (total =
> 200,100)
>
> Is my maths ok?

Yes, I believe the math for agg function calls is correct.

>> I don't quite see how the patch could use aggcombinefn without sacrificing a
>> lot of the benefits. Sure, it's possible to run the aggcombinefn in a loop
>> (with number of iterations determined by the group size on the other side of
>> the join), but that sounds pretty expensive and eliminates the reduction of
>> transition function calls. The join cardinality would still be reduced,
>> though.
>
> I'd be interested in seeing the run time of my example query above.
> I can't quite see a reason for it to be slower, but please let me
> know.

It's a bit slower of course, because the join needs to process more rows from
t1. The patch reduces cardinalities on both sides of the join, if possible -
the example schema is constructed to benefit from this, of course.

>> I do have other question about the patch, however. It seems to rely on the
>> fact that the grouping and joins both reference the same columns. I wonder
>> how uncommon such queries are.
>>
>> [... snip ...]
>>
>> But I guess my main question is if there are actual examples of queries the
>> patch is trying to improve, or whether the general benefit is allowing
>> parallel plans for queries where it would not be possible otherwise.
>
> Using the combine function technique the planner could have performed
> this query the same as if the query had been written as:
>
> SELECT p.category_id, SUM(sum_nitems), SUM(sum_price) FROM products p
> JOIN (SELECT product_id,SUM(nitems) sum_nitems,SUM(price) sum_price
> FROM sales GROUP BY product_id) s ON p.product_id = s.product_id GROUP
> BY p.category_id;
>
> The outer SUM() would be the combine function for SUM() in the
> finalize aggregate node.
>
> Why's that less efficient?

I'm a bit confused. I wasn't talking about efficiency at all, but rather
about which cases the patch currently optimizes, and whether it can be
extended.

The patch currently does nothing for the "group by category_id" query,
because it only changes the case when the grouping and join happen on the
same columns. So my question is if this is inherent limitation, or if the
patch can be extended to such queries. Perhaps it's just a limitation of the
WIP patch version?

You're right the rewritten query performs better compared to master:

1) master

                            QUERY PLAN
----------------------------------------------------------------------
 HashAggregate (actual rows=100 loops=1)
   Group Key: p.category_id
   ->  Hash Join (actual rows=10000000 loops=1)
         Hash Cond: (s.product_id = p.id)
         ->  Seq Scan on sales s (actual rows=10000000 loops=1)
         ->  Hash (actual rows=10000 loops=1)
               Buckets: 16384  Batches: 1  Memory Usage: 519kB
               ->  Seq Scan on products p (actual rows=10000 loops=1)
 Planning time: 0.410 ms
 Execution time: 3577.070 ms
(10 rows)

2) rewritten

                            QUERY PLAN
----------------------------------------------------------------------
 HashAggregate (actual rows=100 loops=1)
   Group Key: p.category_id
   ->  Hash Join (actual rows=10000 loops=1)
         Hash Cond: (sales.product_id = p.id)
         ->  HashAggregate (actual rows=10000 loops=1)
               Group Key: sales.product_id
               ->  Seq Scan on sales (actual rows=10000000 loops=1)
         ->  Hash (actual rows=10000 loops=1)
               Buckets: 16384  Batches: 1  Memory Usage: 519kB
               ->  Seq Scan on products p (actual rows=10000 loops=1)
 Planning time: 0.555 ms
 Execution time: 2585.287 ms
(12 rows)

I can't really compare it to the patch, because that simply does the same
thing as master.

> I don't deny that there's cases that this multiple aggregate function
> won't be able to optimise better than the combine function, but I'm
> just not that convinced yet it'll be worth the trouble when combine
> functions, which are already in core could do most of what would be
> useful with a fraction of the code.

Sure, no problem with that. It's essentially a cost/benefit analysis, and
we're still trying to understand what the patch does/can do.

regards

-- 
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
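For anyone wanting to reproduce these numbers, here is a hypothetical data set matching the row counts in the plans above (10,000 products in 100 categories, 10,000,000 sales rows). The value distributions are assumptions, not something posted in the thread, and sales.product_id is assumed to be declared as an integer column.

-- Hypothetical data generation for the products/sales example.
INSERT INTO products (name, category_id, producer_id)
SELECT 'product ' || i, i % 100, i % 1000
FROM generate_series(1, 10000) AS s(i);

INSERT INTO sales (product_id, nitems, price)
SELECT (i % 10000) + 1, (i % 10) + 1, (i % 50) + 0.99
FROM generate_series(1, 10000000) AS s(i);

ANALYZE products;
ANALYZE sales;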
Ashutosh Bapat <ashutosh.bapat@enterprisedb.com> wrote: > [... snip ]] > > This all works well, as long as the aggregate is "summing" something > across rows. The method doesn't work when aggregation is say > "multiplying" across the rows or "concatenating" across the rows like > array_agg() or string_agg(). They need a different strategy to combine > aggregates across relations. Good point. The common characteristic of these seems to be that thay don't have aggcombinefn defined. > IIUC, we are trying to solve multiple problems here: > 1. Pushing down aggregates/groups down join tree, so that the number of rows > to be joined decreases. This might be a good optimization to have. However > there are problems in the current patch. Every path built for a relation > (join or base) returns the same result expressed by the relation or its > subset restricted by parameterization or unification. But this patch changes > that. It creates paths which represent grouping in the base relation. I > think, we need a separate relation to represent that result and hold paths > which produce that result. That itself would be a sizable patch. Whether a separate relation (RelOptInfo) should be created for grouped relation is an important design decision indeed. More important than your argument about the same result ("partial path", used to implement parallel nodes actually does not fit this criterion perfectly - it only returns part of the set) is the fact that the data type (target) differs. I even spent some time coding a prototype where separate RelOptInfo is created for the grouped relation but it was much more invasive. In particular, if only some relations are grouped, it's hard to join them with non-grouped ones w/o changing make_rel_from_joinlist and subroutines substantially. (Decision whether the plain or the grouped relation should be involved in joining makes little sense at the leaf level of the join tree.) So I took the approach that resembles the partial paths - separate pathlists within the same RelOptInfo. > 2. Try to push down aggregates based on the equivalence classes, where > grouping properties can be transferred from one relation to the other using > EC mechanism. I don't think the EC part should increase the patch complexity a lot. Unless I missed something, it's rather isolated to the part where target of the grouped paths is assembled. And I think it's important even for initial version of the patch. > This seems to require solving the problem of combining aggregates across the > relations. But there might be some usecases which could benefit without > solving this problem. If "combining aggregates ..." refers to joining grouped relations, then I insist on doing this in the initial version of the new feature too. Otherwise it'd only work if exactly one base relation of the query is grouped. > 3. If the relation to which we push the aggregate is an append relation, > push (partial) aggregation/grouping down into the child relations. - We > don't do that right now even for grouping aggregation on a single append > table. Parallel partial aggregation does that, but not exactly per > relation. That may be a sizable project in itself. Even without this piece > the rest of the optimizations proposed by this patch are important. Yes, this can be done in a separate patch. I'll consider it. > 4. Additional goal: push down the aggregation to any relation (join/base) > where it can be computed. I think this can be achieved by adding extra aggregation nodes to the join tree. 
As I still anticipate more important design changes, this part is not at the top of my TODO list.

--
Antonin Houska
Cybertec Schönig & Schönig GmbH
Gröhrmühlgasse 26
A-2700 Wiener Neustadt
Web: http://www.postgresql-support.de, http://www.cybertec.at
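For concreteness, the observation above about the missing combine support can be checked directly in the catalog on a server from the era of this thread (9.6/10); the query below is only an illustration and is not part of the patch:

    -- Illustrative only: aggregates whose aggcombinefn is not set (InvalidOid),
    -- the common trait pointed out above; on a 9.6-era server array_agg() and
    -- string_agg() appear in this list.
    SELECT p.proname, pg_get_function_identity_arguments(p.oid) AS args
    FROM pg_aggregate a
    JOIN pg_proc p ON p.oid = a.aggfnoid
    WHERE a.aggcombinefn = 0
    ORDER BY p.proname;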
[ Trying to respond to both Tomas and David. I'll check tomorrow if anything else of the thread needs my comment. ] Tomas Vondra <tomas.vondra@2ndquadrant.com> wrote: > On 01/17/2017 12:42 AM, David Rowley wrote: > > On 10 January 2017 at 06:56, Antonin Houska <ah@cybertec.at> wrote: > > I've been thinking about this aggtransmultifn and I'm not sure if it's > > really needed. Adding a whole series of new transition functions is > > quite a pain. At least I think so, and I have a feeling Robert might > > agree with me. > > > > Let's imagine some worst case (and somewhat silly) aggregate query: > > > > SELECT count(*) > > FROM million_row_table > > CROSS JOIN another_million_row_table; > > > > Today that's going to cause 1 TRILLION transitions! Performance will > > be terrible. > > > > If we pushed the aggregate down into one of those tables and performed > > a Partial Aggregate on that, then a Finalize Aggregate on that single > > row result (after the join), then that's 1 million transfn calls, and > > 1 million combinefn calls, one for each row produced by the join. > > > > If we did it your way (providing I understand your proposal correctly) > > there's 1 million transfn calls on one relation, then 1 million on the > > other and then 1 multiplyfn call. which does 1000000 * 1000000 > > > > What did we save vs. using the existing aggcombinefn infrastructure > > which went into 9.6? Using this actually costs us 1 extra function > > call, right? I'd imagine the size of the patch to use aggcombinefn > > instead would be a fraction of the size of the one which included all > > the new aggmultiplyfns and pg_aggregate.h changes. > > > I think the patch relies on the assumption that the grouping reduces > cardinality, Yes. > so a CROSS JOIN without a GROUP BY clause may not be the best > counterexample. Yet it tells me that my approach is not ideal in some cases ... > > It sounds like a much more manageable project by using aggcombinefn > > instead. Then maybe one day when we can detect if a join did not cause > > any result duplication (i.e Unique Joins), we could finalise the > > aggregates on the first call, and completely skip the combine state > > altogether. > > > I don't quite see how the patch could use aggcombinefn without sacrificing a > lot of the benefits. Sure, it's possible to run the aggcombinefn in a loop > (with number of iterations determined by the group size on the other side of > the join), but that sounds pretty expensive and eliminates the reduction of > transition function calls. The join cardinality would still be reduced, > though. That's what I think. The generic case is that neither side of the join is unique. If it appears that both relations should be aggregated below the join, aggcombinefn would have to be called multiple times on each output row of the join to achieve the same result as the calling aggmultiplyfn. > I do have other question about the patch, however. It seems to rely on the > fact that the grouping and joins both reference the same columns. I wonder how > uncommon such queries are. > > To give a reasonable example, imagine the typical start schema, which is > pretty standard for large analytical databases. 
A dimension table is
> "products" and the fact table is "sales", and the schema might look like this:
>
> CREATE TABLE products (
>     id SERIAL PRIMARY KEY,
>     name TEXT,
>     category_id INT,
>     producer_id INT
> );
>
> CREATE TABLE sales (
>     product_id REFERENCES products (id),
>     nitems INT,
>     price NUMERIC
> );
>
> A typical query then looks like this:
>
> SELECT category_id, SUM(nitems), SUM(price)
> FROM products p JOIN sales s ON (p.id = s.product_id)
> GROUP BY p.category_id;
>
> which obviously uses different columns for the grouping and join, and so the
> patch won't help with that. Of course, a query grouping by product_id would
> allow the patch to work

Right, the current version does not handle this. Thanks for the suggestion.

> Another thing is that in my experience most queries do joins on foreign keys
> (so the PK side is unique by definition), so the benefit on practical examples
> is likely much smaller.

ok. So in some cases David's approach might be better.

However I think the ability to join 2 grouped (originally non-unique) relations is still important. Consider a query involving "sales" as well as another table which also has a many-to-one relationship to "products".

> But I guess my main question is if there are actual examples of queries the
> patch is trying to improve, or whether the general benefit is allowing
> parallel plans for queries where it would not be possible otherwise.

In fact I did all this with postgres_fdw in mind.

From this perspective, David's approach can be slightly more efficient if all the tables are local, but aggregation of multiple base relations below the join can save a lot of effort if the tables are remote (as it reduces the amount of data transferred over the network).

I'm not terribly happy about changing the system catalog, but adding something like pg_aggregate(aggtransmultifn) is currently the best idea I have.

--
Antonin Houska
Cybertec Schönig & Schönig GmbH
Gröhrmühlgasse 26
A-2700 Wiener Neustadt
Web: http://www.postgresql-support.de, http://www.cybertec.at
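For concreteness, the arithmetic behind David's CROSS JOIN example quoted in the message above can be written out by hand: once each side is reduced to a single partial count, the final count is just the product of the two, which is the single "multiply" step an aggtransmultifn-style function would perform. The table names below are taken from the example, not from any real schema:

    -- count(*) over a cross join equals the product of the per-relation counts,
    -- so two partial aggregates plus one multiplication stand in for the
    -- trillion transition calls of the naive plan.
    SELECT (SELECT count(*) FROM million_row_table)
         * (SELECT count(*) FROM another_million_row_table) AS total_rows;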
On 01/17/2017 08:05 PM, Antonin Houska wrote: > [ Trying to respond to both Tomas and David. I'll check tomorrow if anything > else of the thread needs my comment. ] > > Tomas Vondra <tomas.vondra@2ndquadrant.com> wrote: > >> On 01/17/2017 12:42 AM, David Rowley wrote: >>> On 10 January 2017 at 06:56, Antonin Houska <ah@cybertec.at> wrote: > >>> I've been thinking about this aggtransmultifn and I'm not sure if it's >>> really needed. Adding a whole series of new transition functions is >>> quite a pain. At least I think so, and I have a feeling Robert might >>> agree with me. >>> >>> Let's imagine some worst case (and somewhat silly) aggregate query: >>> >>> SELECT count(*) >>> FROM million_row_table >>> CROSS JOIN another_million_row_table; >>> >>> Today that's going to cause 1 TRILLION transitions! Performance will >>> be terrible. >>> >>> If we pushed the aggregate down into one of those tables and performed >>> a Partial Aggregate on that, then a Finalize Aggregate on that single >>> row result (after the join), then that's 1 million transfn calls, and >>> 1 million combinefn calls, one for each row produced by the join. >>> >>> If we did it your way (providing I understand your proposal correctly) >>> there's 1 million transfn calls on one relation, then 1 million on the >>> other and then 1 multiplyfn call. which does 1000000 * 1000000 >>> >>> What did we save vs. using the existing aggcombinefn infrastructure >>> which went into 9.6? Using this actually costs us 1 extra function >>> call, right? I'd imagine the size of the patch to use aggcombinefn >>> instead would be a fraction of the size of the one which included all >>> the new aggmultiplyfns and pg_aggregate.h changes. >>> > >> I think the patch relies on the assumption that the grouping reduces >> cardinality, > > Yes. > >> so a CROSS JOIN without a GROUP BY clause may not be the best >> counterexample. > > Yet it tells me that my approach is not ideal in some cases ... > >>> It sounds like a much more manageable project by using aggcombinefn >>> instead. Then maybe one day when we can detect if a join did not cause >>> any result duplication (i.e Unique Joins), we could finalise the >>> aggregates on the first call, and completely skip the combine state >>> altogether. >>> > >> I don't quite see how the patch could use aggcombinefn without sacrificing a >> lot of the benefits. Sure, it's possible to run the aggcombinefn in a loop >> (with number of iterations determined by the group size on the other side of >> the join), but that sounds pretty expensive and eliminates the reduction of >> transition function calls. The join cardinality would still be reduced, >> though. > > That's what I think. The generic case is that neither side of the join is > unique. If it appears that both relations should be aggregated below the join, > aggcombinefn would have to be called multiple times on each output row of the > join to achieve the same result as the calling aggmultiplyfn. > >> I do have other question about the patch, however. It seems to rely on the >> fact that the grouping and joins both reference the same columns. I wonder how >> uncommon such queries are. >> >> To give a reasonable example, imagine the typical start schema, which is >> pretty standard for large analytical databases. 
A dimension table is
>> "products" and the fact table is "sales", and the schema might look like this:
>>
>> CREATE TABLE products (
>>     id SERIAL PRIMARY KEY,
>>     name TEXT,
>>     category_id INT,
>>     producer_id INT
>> );
>>
>> CREATE TABLE sales (
>>     product_id REFERENCES products (id),
>>     nitems INT,
>>     price NUMERIC
>> );
>>
>> A typical query then looks like this:
>>
>> SELECT category_id, SUM(nitems), SUM(price)
>> FROM products p JOIN sales s ON (p.id = s.product_id)
>> GROUP BY p.category_id;
>>
>> which obviously uses different columns for the grouping and join, and so the
>> patch won't help with that. Of course, a query grouping by product_id would
>> allow the patch to work
>
> Right, the current version does not handle this. Thanks for the suggestion.

So you're saying it's merely a limitation of the initial patch version and not an inherent limitation?

>> Another thing is that in my experience most queries do joins on foreign keys
>> (so the PK side is unique by definition), so the benefit on practical examples
>> is likely much smaller.
>
> ok. So in some cases David's approach might be better.

In which cases would David's approach be more efficient? But even if there are such cases, I assume we could generate both paths and decide based on cost, just like with all other alternative paths.

> However I think the ability to join 2 grouped (originally non-unique)
> relations is still important. Consider a query involving "sales" as well as
> another table which also has many-to-one relationship to "products".

Well, can you give a practical example? What you describe seems like a combination of two fact tables + a dimension, something like this:

CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name TEXT,
    category_id INT,
    producer_id INT
);

CREATE TABLE sales (
    product_id REFERENCES products (id),
    nitems INT,
    price NUMERIC
);

CREATE TABLE reviews (
    product_id REFERENCES products (id),
    stars INT
);

But how exactly do you join that together? Because

SELECT * FROM products p JOIN sales s ON (p.id = s.product_id)
                         JOIN reviews r ON (p.id = r.product_id)

is clearly wrong - it's essentially an M:N join between the two fact tables, increasing the number of rows.

It'd be helpful to have an example of a practical query optimized by the patch. I'm not claiming it does not exist, but I've been unable to come up with something reasonable at the moment.

>> But I guess my main question is if there are actual examples of queries the
>> patch is trying to improve, or whether the general benefit is allowing
>> parallel plans for queries where it would not be possible otherwise.
>
> In fact I did all this with postgres_fdw in mind.

I assume there's not much difference between pushing down aggregates to local workers and to remote nodes. There'll be costing differences, but are there any other differences?

> From this perspective, David's approach can be slightly more efficient if all
> the tables are local, but aggregation of multiple base relations below the
> join can save a lot of effort if the tables are remote (as it reduces the
> amount of data transferred over network).
>
> I'm not terribly happy about changing the system catalog, but adding something
> like pg_aggregate(aggtransmultifn) is currently the best idea I have.

I personally don't see that as a major problem; my impression is that it can be mostly copied from the partial aggregate patch - it's not trivial, but manageable. Propagating it to the optimizer will be less trivial, but well, if it's necessary ...
regards

--
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
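One way such a two-fact-table report is commonly written by hand is to aggregate each fact table to product granularity before joining, which side-steps the M:N blow-up described above (a single flat join with one GROUP BY would inflate the sums by the number of matching rows in the other fact table). The query below is only a sketch against the schema shown above and is not taken from the thread:

    -- Hand-written form: each fact table is grouped to one row per product
    -- before the join; products.id is unique, so the joins cannot duplicate
    -- any group.
    SELECT p.id, s.total_price, r.avg_stars
    FROM products p
    JOIN (SELECT product_id, sum(price) AS total_price
          FROM sales GROUP BY product_id) s ON s.product_id = p.id
    JOIN (SELECT product_id, avg(stars) AS avg_stars
          FROM reviews GROUP BY product_id) r ON r.product_id = p.id;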
On Tue, Jan 17, 2017 at 10:07 PM, Antonin Houska <ah@cybertec.at> wrote: > Ashutosh Bapat <ashutosh.bapat@enterprisedb.com> wrote: >> [... snip ]] >> >> This all works well, as long as the aggregate is "summing" something >> across rows. The method doesn't work when aggregation is say >> "multiplying" across the rows or "concatenating" across the rows like >> array_agg() or string_agg(). They need a different strategy to combine >> aggregates across relations. > > Good point. The common characteristic of these seems to be that thay don't > have aggcombinefn defined. I don't think aggcombinefn isn't there because we couldn't write it for array_agg() or string_agg(). I guess, it won't be efficient to have those aggregates combined across parallel workers. Also, the point is naming that kind of function as aggtransmultifn would mean that it's always supposed to multiply, which isn't true for all aggregates. > >> IIUC, we are trying to solve multiple problems here: > >> 1. Pushing down aggregates/groups down join tree, so that the number of rows >> to be joined decreases. This might be a good optimization to have. However >> there are problems in the current patch. Every path built for a relation >> (join or base) returns the same result expressed by the relation or its >> subset restricted by parameterization or unification. But this patch changes >> that. It creates paths which represent grouping in the base relation. I >> think, we need a separate relation to represent that result and hold paths >> which produce that result. That itself would be a sizable patch. > > Whether a separate relation (RelOptInfo) should be created for grouped > relation is an important design decision indeed. More important than your > argument about the same result ("partial path", used to implement parallel > nodes actually does not fit this criterion perfectly - it only returns part of > the set) is the fact that the data type (target) differs. Right! > > I even spent some time coding a prototype where separate RelOptInfo is created > for the grouped relation but it was much more invasive. In particular, if only > some relations are grouped, it's hard to join them with non-grouped ones w/o > changing make_rel_from_joinlist and subroutines substantially. (Decision > whether the plain or the grouped relation should be involved in joining makes > little sense at the leaf level of the join tree.) > > So I took the approach that resembles the partial paths - separate pathlists > within the same RelOptInfo. Yes, it's hard, but I think without having a separate RelOptInfo the design won't be complete. Is there a subset of problem that can be solved by using a separate RelOptInfo e.g. pushing aggregates down child relations or anything else. > >> 2. Try to push down aggregates based on the equivalence classes, where >> grouping properties can be transferred from one relation to the other using >> EC mechanism. > > I don't think the EC part should increase the patch complexity a lot. Unless I > missed something, it's rather isolated to the part where target of the grouped > paths is assembled. And I think it's important even for initial version of the > patch. > >> This seems to require solving the problem of combining aggregates across the >> relations. But there might be some usecases which could benefit without >> solving this problem. > > If "combining aggregates ..." refers to joining grouped relations, then I > insist on doing this in the initial version of the new feature too. 
Otherwise > it'd only work if exactly one base relation of the query is grouped. No. "combining aggregates" refers to what aggtransmultifn does. But, possibly that problem needs to be solved in the first step itself. > >> 3. If the relation to which we push the aggregate is an append relation, >> push (partial) aggregation/grouping down into the child relations. - We >> don't do that right now even for grouping aggregation on a single append >> table. Parallel partial aggregation does that, but not exactly per >> relation. That may be a sizable project in itself. Even without this piece >> the rest of the optimizations proposed by this patch are important. > > Yes, this can be done in a separate patch. I'll consider it. > >> 4. Additional goal: push down the aggregation to any relation (join/base) >> where it can be computed. > > I think this can be achieved by adding extra aggregation nodes to the join > tree. As I still anticipate more important design changes, this part is not at > the top of my TODO list. I guess, attempting this will reveal some more design changes required; it may actually simplify the design. -- Best Wishes, Ashutosh Bapat EnterpriseDB Corporation The Postgres Database Company
On Tue, Jan 17, 2017 at 11:33 PM, Ashutosh Bapat <ashutosh.bapat@enterprisedb.com> wrote:
> I don't think aggcombinefn isn't there because we couldn't write it
> for array_agg() or string_agg(). I guess, it won't be efficient to
> have those aggregates combined across parallel workers.

I think there are many cases where it would work fine. I assume that David just didn't make it a priority to write those functions because it wasn't important to the queries he wanted to optimize. But somebody can submit a patch for it any time and I bet it will have practical use cases. There might be some performance problems shoving large numbers of lengthy values through a shm_mq, but we won't know that until somebody tries it.

> Also, the point is naming that kind of function as aggtransmultifn
> would mean that it's always supposed to multiply, which isn't true for
> all aggregates.

TransValue * integer = newTransValue is well-defined for any aggregate. It's the result of aggregating that TransValue with itself a number of times defined by the integer. And that might well be significantly faster than using aggcombinefn many times. On the other hand, how many queries just sit there and re-aggregate the same TransValues over and over again? I am having trouble wrapping my head around that part of this.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
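To make that definition concrete: for an aggregate whose transition state scales linearly, the hypothetical multiply step is plain arithmetic. The sketch below shows what a support function for count(*) (whose transition state is a bigint) might look like if pg_aggregate grew the proposed aggtransmultifn column; the name and the SQL-level implementation are purely illustrative, a real version would be a C function registered in the catalog, and aggregates with composite states (avg(), for instance) would have to scale each component of the state:

    -- Hypothetical sketch, not part of the patch: "multiply" for count(*),
    -- i.e. the state that results from aggregating the same input set
    -- nrepeats times.
    CREATE FUNCTION count_multiply(trans bigint, nrepeats bigint)
    RETURNS bigint LANGUAGE sql IMMUTABLE
    AS 'SELECT trans * nrepeats';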
On 19 January 2017 at 07:32, Robert Haas <robertmhaas@gmail.com> wrote: > On Tue, Jan 17, 2017 at 11:33 PM, Ashutosh Bapat > <ashutosh.bapat@enterprisedb.com> wrote: >> I don't think aggcombinefn isn't there because we couldn't write it >> for array_agg() or string_agg(). I guess, it won't be efficient to >> have those aggregates combined across parallel workers. > > I think there are many cases where it would work fine. I assume that > David just didn't make it a priority to write those functions because > it wasn't important to the queries he wanted to optimize. But > somebody can submit a patch for it any time and I bet it will have > practical use cases. There might be some performance problems shoving > large numbers of lengthy values through a shm_mq, but we won't know > that until somebody tries it. I had assumed that the combine function which combines a large array or a large string would not be any cheaper than doing that incrementally with the transfn. Of course some of this would happen in parallel, but it still doubles up some of the memcpy()ing, so perhaps it would be slower? ... I didn't ever get a chance to test it. -- David Rowley http://www.2ndQuadrant.com/PostgreSQL Development, 24x7 Support, Training & Services
On Wed, Jan 18, 2017 at 5:14 PM, David Rowley <david.rowley@2ndquadrant.com> wrote: > On 19 January 2017 at 07:32, Robert Haas <robertmhaas@gmail.com> wrote: >> On Tue, Jan 17, 2017 at 11:33 PM, Ashutosh Bapat >> <ashutosh.bapat@enterprisedb.com> wrote: >>> I don't think aggcombinefn isn't there because we couldn't write it >>> for array_agg() or string_agg(). I guess, it won't be efficient to >>> have those aggregates combined across parallel workers. >> >> I think there are many cases where it would work fine. I assume that >> David just didn't make it a priority to write those functions because >> it wasn't important to the queries he wanted to optimize. But >> somebody can submit a patch for it any time and I bet it will have >> practical use cases. There might be some performance problems shoving >> large numbers of lengthy values through a shm_mq, but we won't know >> that until somebody tries it. > > I had assumed that the combine function which combines a large array > or a large string would not be any cheaper than doing that > incrementally with the transfn. Of course some of this would happen in > parallel, but it still doubles up some of the memcpy()ing, so perhaps > it would be slower? ... I didn't ever get a chance to test it. Even if that particular bit is not very much faster, it might have the advantage of letting other parts of the plan be parallelized, and you can still win that way. In the internal-to-EnterpriseDB experiments we've been doing over the last few months, we've seen that kind of thing a lot, and it informs a lot of the patches that my colleagues have been submitting. But I also wouldn't be surprised if there are cases where it wins big even without that. For example, if you're doing an aggregate with lots of groups and good physical-to-logical correlation, the normal case might be for all the rows in a group to be on the same page. So you parallel seq scan the table and have hardly any need to run the combine function in the leader (but of course you have to have it available just in case you do need it). -- Robert Haas EnterpriseDB: http://www.enterprisedb.com The Enterprise PostgreSQL Company
On Thu, Jan 19, 2017 at 12:02 AM, Robert Haas <robertmhaas@gmail.com> wrote:
> On Tue, Jan 17, 2017 at 11:33 PM, Ashutosh Bapat
> <ashutosh.bapat@enterprisedb.com> wrote:
>> I don't think aggcombinefn isn't there because we couldn't write it
>> for array_agg() or string_agg(). I guess, it won't be efficient to
>> have those aggregates combined across parallel workers.
>
> I think there are many cases where it would work fine. I assume that
> David just didn't make it a priority to write those functions because
> it wasn't important to the queries he wanted to optimize. But
> somebody can submit a patch for it any time and I bet it will have
> practical use cases. There might be some performance problems shoving
> large numbers of lengthy values through a shm_mq, but we won't know
> that until somebody tries it.
>
>> Also, the point is naming that kind of function as aggtransmultifn
>> would mean that it's always supposed to multiply, which isn't true for
>> all aggregates.
>
> TransValue * integer = newTransValue is well-defined for any
> aggregate. It's the result of aggregating that TransValue with itself
> a number of times defined by the integer. And that might well be
> significantly faster than using aggcombinefn many times. On the other
> hand, how many queries just sit there and re-aggregate the same
> TransValues over and over again? I am having trouble wrapping my head
> around that part of this.

Not all aggregates have TransValue * integer = newTransValue behaviour. Examples are array_agg() or string_agg(), which have "TransValue concatenated integer times" behaviour. Or an aggregate "multiplying" values across rows will have TransValue (raised to) integer behaviour. Labelling all of those as "multi" doesn't look good.

--
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company
On 01/19/2017 04:09 AM, Ashutosh Bapat wrote: > On Thu, Jan 19, 2017 at 12:02 AM, Robert Haas <robertmhaas@gmail.com> wrote: >> On Tue, Jan 17, 2017 at 11:33 PM, Ashutosh Bapat >> >>> Also, the point is naming that kind of function as aggtransmultifn >>> would mean that it's always supposed to multiply, which isn't true for >>> all aggregates. >> >> TransValue * integer = newTransValue is well-defined for any >> aggregate. It's the result of aggregating that TransValue with itself >> a number of times defined by the integer. And that might well be >> significantly faster than using aggcombinefn many times. On the other >> hand, how many queries just sit there are re-aggregate the same >> TransValues over and over again? I am having trouble wrapping my head >> around that part of this. > > Not all aggregates have TransValue * integer = newTransValue > behaviour. Example is array_agg() or string_agg() has "TransValue > concatenated integer time" behaviour. Or an aggregate "multiplying" > values across rows will have TransValue (raised to) integer behaviour. > Labelling all of those as "multi" doesn't look good. > All aggregates that have (or can have) a combine function have it, because in the worst case you can simply implement it by calling the combine function repeatedly. Also, if you treat the combine function as "+" then the "multiply" function is exactly what "*" is expected to do. So I find the naming quite appropriate, actually. But I think naming of the function is not the most important aspect of the patch, I believe. In the worst case, we can do s/multi/whatever/ sometime later. regards -- Tomas Vondra http://www.2ndQuadrant.com PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
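The fallback described above can be sanity-checked at the SQL level for a sum()-like aggregate: combining a partial sum with itself n times yields the same state as multiplying it by n, so the dedicated multiply function only saves the loop. The values 7 and 3 below are arbitrary:

    -- 'via_multiply' is the single aggtransmultifn-style step;
    -- 'via_repeated_combine' simulates combining the same partial state
    -- three times, i.e. the generic fallback based on the combine function.
    SELECT 7 * 3 AS via_multiply,
           (SELECT sum(s)
            FROM (SELECT 7 AS s FROM generate_series(1, 3)) t) AS via_repeated_combine;
    -- both columns yield 21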
Tomas Vondra <tomas.vondra@2ndquadrant.com> wrote:

> On 01/17/2017 08:05 PM, Antonin Houska wrote:
> > Tomas Vondra <tomas.vondra@2ndquadrant.com> wrote:
> >
> >> Another thing is that in my experience most queries do joins on foreign keys
> >> (so the PK side is unique by definition), so the benefit on practical examples
> >> is likely much smaller.
> >
> > ok. So in some cases David's approach might be better.
>
> In which cases would David's approach be more efficient? But even if there are
> such cases, I assume we could generate both paths and decide based on cost,
> just like with all other alternative paths.

Sorry, it was my thinko - I somehow confused David's CROSS JOIN example with this one. If one side of the join clause is unique and the other becomes unique due to aggregation (and if parallel processing is not engaged), then neither combinefn nor multiplyfn should be necessary before the finalfn.

> > However I think the ability to join 2 grouped (originally non-unique)
> > relations is still important. Consider a query involving "sales" as well as
> > another table which also has many-to-one relationship to "products".
>
> Well, can you give a practical example? What you describe seems like a
> combination of two fact tables + a dimension, something like this:
>
> CREATE TABLE products (
>     id SERIAL PRIMARY KEY,
>     name TEXT,
>     category_id INT,
>     producer_id INT
> );
>
> CREATE TABLE sales (
>     product_id REFERENCES products (id),
>     nitems INT,
>     price NUMERIC
> );
>
> CREATE TABLE reviews (
>     product_id REFERENCES products (id),
>     stars INT
> );
>
> But how exactly do you join that together? Because
>
> SELECT * FROM products p JOIN sales s ON (p.id = s.product_id)
> JOIN reviews r ON (p.id = r.product_id)
>
> is clearly wrong - it's essentially an M:N join between the two fact tables,
> increasing the number of rows.

Without elaborating the details, I imagined a join condition between the 2 fact tables which references their non-unique columns, but that does not make sense here. Sorry.

> It'd be helpful to have an example of a practical query optimized by the
> patch. I'm not claiming it does not exist, but I've been unable to come up
> with something reasonable at the moment.

As I mentioned elsewhere, remote aggregation is the primary use case. Besides that, I can imagine another one - a join of partitioned tables (costs are not displayed just to make the plans easier to read). For this query

SELECT p.id, sum(price)
FROM products AS p
JOIN sales AS s ON s.product_id = p.id
GROUP BY p.id

I get this plan under "normal circumstances":

 HashAggregate
   Group Key: p.id
   ->  Hash Join
         Hash Cond: (s.product_id = p.id)
         ->  Gather
               Workers Planned: 2
               ->  Append
                     ->  Parallel Seq Scan on sales s
                     ->  Parallel Seq Scan on sales_2015 s_1
                     ->  Parallel Seq Scan on sales_2016 s_2
                     ->  Parallel Seq Scan on sales_2017 s_3
         ->  Hash
               ->  Gather
                     Workers Planned: 2
                     ->  Append
                           ->  Parallel Seq Scan on products p
                           ->  Parallel Seq Scan on products_01 p_1
                           ->  Parallel Seq Scan on products_02 p_2
                           ->  Parallel Seq Scan on products_03 p_3
                           ->  Parallel Seq Scan on products_04 p_4

but if work_mem is sufficiently low for the hash join to be efficient, the aggregation can be moved to the individual partitions:

 Gather
   Workers Planned: 1
   Single Copy: true
   ->  Finalize HashAggregate
         Group Key: p.id
         ->  Hash Join
               Hash Cond: (p.id = s.product_id)
               ->  Append
                     ->  Partial HashAggregate
                           Group Key: p.id
                           ->  Seq Scan on products p
                     ->  Partial HashAggregate
                           Group Key: p_1.id
                           ->  Seq Scan on products_01 p_1
                     ->  Partial HashAggregate
                           Group Key: p_2.id
                           ->  Seq Scan on products_02 p_2
                     ->  Partial HashAggregate
                           Group Key: p_3.id
                           ->  Seq Scan on products_03 p_3
                     ->  Partial HashAggregate
                           Group Key: p_4.id
                           ->  Seq Scan on products_04 p_4
               ->  Hash
                     ->  Append
                           ->  Partial HashAggregate
                                 Group Key: s.product_id
                                 ->  Seq Scan on sales s
                           ->  Partial HashAggregate
                                 Group Key: s_1.product_id
                                 ->  Seq Scan on sales_2015 s_1
                           ->  Partial HashAggregate
                                 Group Key: s_2.product_id
                                 ->  Seq Scan on sales_2016 s_2
                           ->  Partial HashAggregate
                                 Group Key: s_3.product_id
                                 ->  Seq Scan on sales_2017 s_3

Finally, the patch should help if the aggregation argument is rather expensive to evaluate. I refer to this part of the example plan that I showed in the initial message of this thread (in which case the argument was actually simple ...):

 ->  Gather
       Workers Planned: 2
       ->  Partial HashAggregate
             Group Key: a.i
             ->  Parallel Seq Scan on a

The first 2 cases show where the base relation grouping can help alone, while the last one combines the base relation grouping with parallel query processing.

> >> But I guess my main question is if there are actual examples of queries the
> >> patch is trying to improve, or whether the general benefit is allowing
> >> parallel plans for queries where it would not be possible otherwise.
> >
> > In fact I did all this with postgres_fdw in mind.
>
> I assume there's not much difference between pushing down aggregates to local
> workers and to remote nodes. There'll be costing differences, but are there
> any other differences?

It's easier for me to see what these 2 things have in common than to point out differences. One common thing is that aggregation takes place in multiple stages. They can also be similar in terms of implementation (e.g. by keeping paths in separate lists), although there are still some choices to be made.

As for doing aggregation in parallel, I personally don't think that "pushing the aggregate down to a worker" is the exact description. If the parallel worker aggregates the base relation and if the nearest Gather node is at the top of the plan, then the worker actually performs the whole plan, except for the Gather node.

And yes, costing needs to be considered. Besides estimating the inputs (table width, but especially row count), some "policy decisions" might be necessary, similar to those that the planner applies to parameterized paths - see the comments of add_path(). The grouped path should not be used where a relatively small error in estimates of the base relation aggregation can lead to a significant error in the total plan costs.

>>> CREATE TABLE products (
>>>     id SERIAL PRIMARY KEY,
>>>     name TEXT,
>>>     category_id INT,
>>>     producer_id INT
>>> );
>>>
>>> CREATE TABLE sales (
>>>     product_id REFERENCES products (id),
>>>     nitems INT,
>>>     price NUMERIC
>>> );
>>>
>>> A typical query then looks like this:
>>>
>>> SELECT category_id, SUM(nitems), SUM(price)
>>> FROM products p JOIN sales s ON (p.id = s.product_id)
>>> GROUP BY p.category_id;
>>>
>>> which obviously uses different columns for the grouping and join, and so the
>>> patch won't help with that. Of course, a query grouping by product_id would
>>> allow the patch to work
>>
>> Right, the current version does not handle this. Thanks for the suggestion.
>
> So you're saying it's merely a limitation of the initial patch version and not
> an inherent limitation?

I just haven't thought about this so far, but now that I try, I seem to be missing something: p.category_id as the grouping column gives the patch no chance to do anything useful, because only one table of the join has that column. "product_id" does help, but gives a different result ...

--
Antonin Houska
Cybertec Schönig & Schönig GmbH
Gröhrmühlgasse 26
A-2700 Wiener Neustadt
Web: http://www.postgresql-support.de, http://www.cybertec.at
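The plans above never show the table layout they assume; a plausible reconstruction, guessed purely from the plan node names and using the inheritance-style child tables that were current at the time of the thread, would be:

    -- Assumed layout only - the thread does not include this DDL.
    CREATE TABLE sales_2015 () INHERITS (sales);
    CREATE TABLE sales_2016 () INHERITS (sales);
    CREATE TABLE sales_2017 () INHERITS (sales);
    CREATE TABLE products_01 () INHERITS (products);
    CREATE TABLE products_02 () INHERITS (products);
    CREATE TABLE products_03 () INHERITS (products);
    CREATE TABLE products_04 () INHERITS (products);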
Antonin Houska <ah@cybertec.at> wrote: Well, the following one does not seem to be a typical example. I could generate the plan, but now I think that the aggregation push down does not in general decrease the number of groups the final aggregation has to process. Maybe I just hit planner limitation to estimate the number of groups within append relation. > For this query > > SELECT > p.id, sum(price) > FROM > products AS p > JOIN sales AS s ON s.product_id = p.id > GROUP BY > p.id > > I get this plan at "normal circumstances" > > HashAggregate > Group Key: p.id > -> Hash Join > Hash Cond: (s.product_id = p.id) > -> Gather > Workers Planned: 2 > -> Append > -> Parallel Seq Scan on sales s > -> Parallel Seq Scan on sales_2015 s_1 > -> Parallel Seq Scan on sales_2016 s_2 > -> Parallel Seq Scan on sales_2017 s_3 > -> Hash > -> Gather > Workers Planned: 2 > -> Append > -> Parallel Seq Scan on products p > -> Parallel Seq Scan on products_01 p_1 > -> Parallel Seq Scan on products_02 p_2 > -> Parallel Seq Scan on products_03 p_3 > -> Parallel Seq Scan on products_04 p_4 > > > but if work_mem is sufficiently low for the hash join to be efficient, the > aggregation can be moved to individual partitions. > > Gather > Workers Planned: 1 > Single Copy: true > -> Finalize HashAggregate > Group Key: p.id > -> Hash Join > Hash Cond: (p.id = s.product_id) > -> Append > -> Partial HashAggregate > Group Key: p.id > -> Seq Scan on products p > -> Partial HashAggregate > Group Key: p_1.id > -> Seq Scan on products_01 p_1 > -> Partial HashAggregate > Group Key: p_2.id > -> Seq Scan on products_02 p_2 > -> Partial HashAggregate > Group Key: p_3.id > -> Seq Scan on products_03 p_3 > -> Partial HashAggregate > Group Key: p_4.id > -> Seq Scan on products_04 p_4 > -> Hash > -> Append > -> Partial HashAggregate > Group Key: s.product_id > -> Seq Scan on sales s > -> Partial HashAggregate > Group Key: s_1.product_id > -> Seq Scan on sales_2015 s_1 > -> Partial HashAggregate > Group Key: s_2.product_id > -> Seq Scan on sales_2016 s_2 > -> Partial HashAggregate > Group Key: s_3.product_id > -> Seq Scan on sales_2017 s_3 -- Antonin Houska Cybertec Schönig & Schönig GmbH Gröhrmühlgasse 26 A-2700 Wiener Neustadt Web: http://www.postgresql-support.de, http://www.cybertec.at
Ashutosh Bapat <ashutosh.bapat@enterprisedb.com> wrote: > >> 1. Pushing down aggregates/groups down join tree, so that the number of rows > >> to be joined decreases. This might be a good optimization to have. However > >> there are problems in the current patch. Every path built for a relation > >> (join or base) returns the same result expressed by the relation or its > >> subset restricted by parameterization or unification. But this patch changes > >> that. It creates paths which represent grouping in the base relation. I > >> think, we need a separate relation to represent that result and hold paths > >> which produce that result. That itself would be a sizable patch. > > > > Whether a separate relation (RelOptInfo) should be created for grouped > > relation is an important design decision indeed. More important than your > > argument about the same result ("partial path", used to implement parallel > > nodes actually does not fit this criterion perfectly - it only returns part of > > the set) is the fact that the data type (target) differs. > > I even spent some time coding a prototype where separate RelOptInfo is created > > for the grouped relation but it was much more invasive. In particular, if only > > some relations are grouped, it's hard to join them with non-grouped ones w/o > > changing make_rel_from_joinlist and subroutines substantially. (Decision > > whether the plain or the grouped relation should be involved in joining makes > > little sense at the leaf level of the join tree.) > > > > So I took the approach that resembles the partial paths - separate pathlists > > within the same RelOptInfo. > Yes, it's hard, but I think without having a separate RelOptInfo the > design won't be complete. Is there a subset of problem that can be > solved by using a separate RelOptInfo e.g. pushing aggregates down > child relations or anything else. I'm still not convinced that all the fields of RelOptInfo (typically relids) need to be duplicated. If the current concept should be improved, I'd move all the grouping related fields to a separate structure, e.g. GroupPathInfo, and let RelOptInfo point to it. Similar to ParamPathInfo, which contains parameterization-specific information, GroupPathInfo would conain the grouping-specific information: target, row count, width, maybe path lists too. > > > >> 2. Try to push down aggregates based on the equivalence classes, where > >> grouping properties can be transferred from one relation to the other using > >> EC mechanism. > > > > I don't think the EC part should increase the patch complexity a lot. Unless I > > missed something, it's rather isolated to the part where target of the grouped > > paths is assembled. And I think it's important even for initial version of the > > patch. > > > >> This seems to require solving the problem of combining aggregates across the > >> relations. But there might be some usecases which could benefit without > >> solving this problem. > > > > If "combining aggregates ..." refers to joining grouped relations, then I > > insist on doing this in the initial version of the new feature too. Otherwise > > it'd only work if exactly one base relation of the query is grouped. > > No. "combining aggregates" refers to what aggtransmultifn does. But, > possibly that problem needs to be solved in the first step itself. ok. As the discussion goes on, I see that this part could be more useful than I originally thought. I'll consider it. 
-- Antonin Houska Cybertec Schönig & Schönig GmbH Gröhrmühlgasse 26 A-2700 Wiener Neustadt Web: http://www.postgresql-support.de, http://www.cybertec.at
> >> Yes, it's hard, but I think without having a separate RelOptInfo the >> design won't be complete. Is there a subset of problem that can be >> solved by using a separate RelOptInfo e.g. pushing aggregates down >> child relations or anything else. > > I'm still not convinced that all the fields of RelOptInfo (typically relids) > need to be duplicated. If the current concept should be improved, I'd move all > the grouping related fields to a separate structure, e.g. GroupPathInfo, and > let RelOptInfo point to it. Similar to ParamPathInfo, which contains > parameterization-specific information, GroupPathInfo would conain the > grouping-specific information: target, row count, width, maybe path lists too. > I didn't think about this option. Still not very clean, but may be acceptable. -- Best Wishes, Ashutosh Bapat EnterpriseDB Corporation The Postgres Database Company
On Thu, Jan 19, 2017 at 2:19 AM, Tomas Vondra <tomas.vondra@2ndquadrant.com> wrote: >> Not all aggregates have TransValue * integer = newTransValue >> behaviour. Example is array_agg() or string_agg() has "TransValue >> concatenated integer time" behaviour. Or an aggregate "multiplying" >> values across rows will have TransValue (raised to) integer behaviour. >> Labelling all of those as "multi" doesn't look good. > > All aggregates that have (or can have) a combine function have it, because > in the worst case you can simply implement it by calling the combine > function repeatedly. +1. > Also, if you treat the combine function as "+" then the "multiply" function > is exactly what "*" is expected to do. So I find the naming quite > appropriate, actually. +1. > But I think naming of the function is not the most important aspect of the > patch, I believe. In the worst case, we can do s/multi/whatever/ sometime > later. Yeah. -- Robert Haas EnterpriseDB: http://www.enterprisedb.com The Enterprise PostgreSQL Company
On 20 January 2017 at 00:22, Antonin Houska <ah@cybertec.at> wrote:
> Sorry, it was my thinko - I somehow confused David's CROSS JOIN example with
> this one. If one side of the join clause is unique and the other becomes
> unique due to aggregation (and if parallel processing is not engaged) then
> neither combinefn nor multiplyfn should be necessary before the finalfn.

Yes, if the join can be detected not to duplicate the groups, then a normal aggregate node can be pushed below the join. No need for Partial Aggregate or Finalize Aggregate nodes.

I've a pending patch in the commitfest named "Unique Joins", which aims to teach the planner about the unique properties of joins. So you should just have both stages of aggregation occur for now, and that can be improved on once the planner is a bit smarter and knows about unique joins.

--
David Rowley                   http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
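For reference, the case described above can be written out as the equivalent manual rewrite: because products.id is unique, the join cannot duplicate the groups formed below it, so a single, already-final aggregate per product is enough and no combine or multiply step is needed. The query is only a sketch against the schema discussed earlier in the thread:

    -- One final aggregate below the join suffices when the other side of the
    -- join is unique on the join column.
    SELECT p.id, s.sum_price
    FROM products p
    JOIN (SELECT product_id, sum(price) AS sum_price
          FROM sales
          GROUP BY product_id) s ON s.product_id = p.id;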
David Rowley <david.rowley@2ndquadrant.com> wrote:

> On 20 January 2017 at 00:22, Antonin Houska <ah@cybertec.at> wrote:
> > Sorry, it was my thinko - I somehow confused David's CROSS JOIN example with
> > this one. If one side of the join clause is unique and the other becomes
> > unique due to aggregation (and if parallel processing is not engaged) then
> > neither combinefn nor multiplyfn should be necessary before the finalfn.
>
> Yes, if the join can be detected not to duplicate the groups, then a
> normal aggregate node can be pushed below the join. No need for
> Partial Aggregate or Finalize Aggregate nodes.
>
> I've a pending patch in the commitfest named "Unique Joins", which
> aims to teach the planner about the unique properties of joins. So you
> should just have both stages of aggregation occur for now, and that
> can be improved on once the planner is a bit smarter and knows about
> unique joins.

Thanks for the hint. I hadn't paid attention to the "Unique Joins" patch until today. Yes, that's definitely useful. Given the progress of your patch, I'm not worried about making the next version of my patch depend on it. Implementing a temporary solution for the aggregation push-down seems to me like wasted effort.

--
Antonin Houska
Cybertec Schönig & Schönig GmbH
Gröhrmühlgasse 26
A-2700 Wiener Neustadt
Web: http://www.postgresql-support.de, http://www.cybertec.at