Re: [HACKERS] PoC: Grouped base relation - Mailing list pgsql-hackers

From Antonin Houska
Subject Re: [HACKERS] PoC: Grouped base relation
Date
Msg-id 12311.1484325743@localhost
In response to Re: [HACKERS] PoC: Grouped base relation  (Ashutosh Bapat <ashutosh.bapat@enterprisedb.com>)
List pgsql-hackers
Ashutosh Bapat <ashutosh.bapat@enterprisedb.com> wrote:
> On Mon, Jan 9, 2017 at 11:26 PM, Antonin Houska <ah@cybertec.at> wrote:
> > Attached is a draft patch that lets partial aggregation happen at base
> > relation level. If the relations contain relatively small number of groups,
> > the number of input rows of the aggregation at the query level can be reduced
> > this way.  Also, if append relation and postgres_fdw planning is enhanced
> > accordingly, patch like this can let us aggregate individual tables on remote
> > servers (e.g. shard nodes) and thus reduce the amount of rows subject to the
> > final aggregation.

> For an appendrel probably you need an ability to switch group->append into
> append->group

Yes, like the new patch version (see attachment) does:

postgres=# EXPLAIN (COSTS false) SELECT b.j, sum(a.x) FROM a JOIN b ON a.i = b.j GROUP BY b.j;
                      QUERY PLAN
------------------------------------------------------
 Finalize HashAggregate
   Group Key: b.j
   ->  Hash Join
         Hash Cond: (a.i = b.j)
         ->  Append
               ->  Partial HashAggregate
                     Group Key: a.i
                     ->  Seq Scan on a
               ->  Partial HashAggregate
                     Group Key: a_1.i
                     ->  Seq Scan on a_1
               ->  Partial HashAggregate
                     Group Key: a_2.i
                     ->  Seq Scan on a_2
         ->  Hash
               ->  Gather
                     Workers Planned: 1
                     ->  Partial HashAggregate
                           Group Key: b.j
                           ->  Parallel Seq Scan on b
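
For anyone who wants to reproduce a similar plan shape, a hypothetical setup (the table and column names match the EXPLAIN output above, but the exact DDL is my assumption - any layout with inheritance children a_1 and a_2 would produce the Append node):

```sql
-- Hypothetical schema; assumes the patch is applied and that a has
-- inheritance children a_1 and a_2, matching the Append node above.
CREATE TABLE a (i int, x float4);   -- sum(float4) is the case the patch handles
CREATE TABLE a_1 () INHERITS (a);
CREATE TABLE a_2 () INHERITS (a);
CREATE TABLE b (j int, y int);

EXPLAIN (COSTS false)
SELECT b.j, sum(a.x) FROM a JOIN b ON a.i = b.j GROUP BY b.j;
```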

> For postgres_fdw, we already support aggregate pushdown.

My understanding is that it currently only works if the whole query can be
evaluated by the FDW. What I'm trying to do is push down the aggregation of an
individual table and join the partially aggregated set with other tables,
which are not necessarily remote, or which may reside on different remote
servers.

> But we don't support fetching partial aggregates from foreign server. What
> other enhancements do you need?

Here I try to introduce the infrastructure for aggregation pushdown and for
propagation of the transient aggregate state values from base relations to the
final join. postgres_fdw can benefit from it, but it's not the only use case,
so I'd prefer adjusting it in a separate patch.

Yes, an outstanding problem is that the remote nodes need to return transient
state values - probably using bytea type. I think this functionality should
even be separate from postgres_fdw (e.g. a new contrib module?), because the
remote nodes do not need postgres_fdw.

> > A few "footnotes":
> >
> >
> > As for the example, the processing continues by joining the partially grouped
> > sets:
> >
> >  i | sum(x)| count(i.*) | j | count(j.*)
> > ----------------------------------------
> >  1 |     7 |       2    | 1 |       2

[ Sorry, count(j.*) should be 2, not 3 as I wrote in the initial email. ]

> >
> >
> > Before performing the final aggregation, we need to multiply sum(a.x) by
> > count(j.*) because w/o the aggregation at base relation level the input
> > of the query-level aggregation would look like
> >
> >  a.i | a.x | b.j
> > ----------------
> >  1   |   3 |  1
> >  1   |   4 |  1
> >  1   |   3 |  1
> >  1   |   4 |  1
> >
> > In other words, grouping of the base relation "b" below the join prevents the
> > join from bringing per-group input set to the aggregate input multiple
> > times. To compensate for this effect, I've added a new field "aggtransmultifn"
> > to pg_aggregate catalog. It "multiplies" the aggregate state w/o repeated
> > processing of the same input set many times. sum() is an example of an
> > aggregate that needs such processing, avg() is one that does not.
>
> For something like product aggregation, where the product (or higher order
> operations) across rows is accumulated instead of sum, mere multiplication
> wouldn't help. We will need some higher order operation to "extrapolate" the
> result based on count(j.*). In fact, the multiplication factor will depend
> upon the number of relations being joined E.g.  select b.j, sum(a.x) where
> a, b, c where a.i = b.j and a.i = c.k group by b.j

Maybe you're describing what I already try to do. Let me modify the example
accordingly (unlike the initial example, each table produces a group of a
different size, so the count() values are harder to confuse):

Table "a":

 i | x
-------
 1 | 3
 1 | 4

Table "b":

 j | y
-------
 1 | 10
 1 | 20
 1 | 30

Table "c":

 k | z
---------
 1 | 100
 1 | 200
 1 | 300
 1 | 400

Your query:

SELECT b.j, sum(a.x)
FROM a, b, c
WHERE a.i = b.j AND a.i = c.k
GROUP BY b.j

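The sample data above can be recreated with something like this (a sketch; the column types are my guess):

```sql
-- Recreate the example tables; integer columns are an assumption.
CREATE TABLE a (i int, x int);
CREATE TABLE b (j int, y int);
CREATE TABLE c (k int, z int);

INSERT INTO a VALUES (1, 3), (1, 4);
INSERT INTO b VALUES (1, 10), (1, 20), (1, 30);
INSERT INTO c VALUES (1, 100), (1, 200), (1, 300), (1, 400);
```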
The tables grouped:

 i | sum(a.x) | count(a.*)
---+----------+-----------
 1 |        7 |          2

 j | sum(b.y) | count(b.*)
---+----------+-----------
 1 |       60 |          3

 k | sum(c.z) | count(c.*)
---+----------+-----------
 1 |     1000 |          4

Grouped (partially) and joined (table names omitted in places so that the
table width does not exceed 80 characters):

 i | sum(x) | count(a.*) | j | sum(y) | count(b.*) | k | sum(z) | count(c.*)
---+--------+------------+---+--------+------------+---+--------+-----------
 1 |      7 |          2 | 1 |     60 |          3 | 1 |   1000 |          4

For sum(a.x), the input for the final aggregation is the partial sum(x)
multiplied by count(b.*) and also by count(c.*), because the grouping of both
"b" and "c" reduced the number of times the input values 3 and 4 arrived at
the aggregate node. Thus 7 * 3 * 4 = 84.

Likewise, sum(y) * count(a.*) * count(c.*) = 60 * 2 * 4 = 480

and finally, sum(z) * count(a.*) * count(b.*) = 1000 * 2 * 3 = 6000

I get exactly these values when I run your query on the master branch w/o my
patch, so my theory could be correct :-)
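
That check can be spelled out in plain SQL (independent of the patch; in the unaggregated join, each row of one table is repeated once per matching row of each of the other two, which is exactly what the count() multiplications predict):

```sql
-- On the sample data above, the unmodified server computes the same
-- totals that the multiplication rule predicts:
--   sum(a.x): 7 * 3 * 4    (repeated by b's and c's group sizes)
--   sum(b.y): 60 * 2 * 4   (repeated by a's and c's group sizes)
--   sum(c.z): 1000 * 2 * 3 (repeated by a's and b's group sizes)
SELECT sum(a.x), sum(b.y), sum(c.z)
FROM a, b, c
WHERE a.i = b.j AND a.i = c.k;
```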

> May be we want to implement this technique without partial aggregation first
> i.e. push down aggregation and grouping down the join tree and then add
> partial aggregation steps. That might make it easy to review.  Looking at
> the changes in create_plain_partial_paths(), it looks like we use this
> technique only in case of parallel query. I think the technique is useful
> otherwise as well.

The term "partial" is actually ambiguous. Sometimes it refers to a "partial
path", i.e. a path that can be executed by multiple workers, and sometimes to
a "partial aggregate", i.e. an aggregate that produces the transient state
instead of the final values.

The fact that a partial (i.e. parallel) path was necessary to push the
aggregation down was rather a limitation of the initial version of the patch.
The current version can push the aggregation down even w/o parallelism - see
create_plain_partial_paths() -> create_plain_grouped_path().

> Also, if we could generalize this technique to push aggregation/grouping
> upto any relation (not just base but join as well) where it can be
> calculated that may be better. Trying that might lead us to a better design;
> which right now is focused only on base relations.

Yes, that's true. Currently this patch is unable to process queries where an
aggregate references multiple tables. It should be possible to apply the
"grouped path" to a join.

> >
> > BTW, if anyone wants to play with the current version:
> >
> > 1. Don't forget to initialize a new cluster (initdb) first. I decided not to
> > bump CATALOG_VERSION_NO so far because it'd probably make the patch
> > incompatible with master branch quite soon.
> >
> > 2. Only hash aggregation is implemented so far at the base relation level.
> >
> > 3. As for sum() aggregate, only sum(float4) is supposed to work correctly so
> > far - this is related to the pg_aggregate changes mentioned above. avg()
> > should work in general, and I didn't care about the other ones so far.
> >
> > 4. As for joins, only hash join is able to process the grouped relations. I
> > didn't want to do too much coding until there's a consensus on the design.
> >

> Probably it's too early to review code, but ...

Thanks for doing so anyway!

> +     /*
> +      * If no join is expected, aggregation at base relation level makes no
> +      * sense. XXX Is there simpler way to find out? (We're not interested in
> +      * RELOPT_OTHER_MEMBER_REL, so simple_rel_array_size does not help.)
> +      */
> +     for (i = 1; i < root->simple_rel_array_size; i++)
> +     {
> +         RelOptInfo *rel;
> +
> +         rel = find_base_rel(root, i);
> +         if (rel->reloptkind == RELOPT_BASEREL)
> +         {
> +             nbaserels++;
> +             /*
> +              * We only want to know whether the number of relations is greater
> +              * than one.
> +              */
> +             if (nbaserels > 1)
> +                 break;
> +         }
> +     }

> You might want to check bms_membership(root->all_baserels), instead of
> this loop.

I liked this idea, but when I tried it, I found out that all_baserels gets
initialized much later. And when I moved the initialization to
add_base_rels_to_query, I found out that the result is not the same
(RELOPT_DEADREL relations make a difference). Thanks for your suggestion
anyway.

--
Antonin Houska
Cybertec Schönig & Schönig GmbH
Gröhrmühlgasse 26
A-2700 Wiener Neustadt
Web: http://www.postgresql-support.de, http://www.cybertec.at



