Re: Re: parallel distinct union and aggregate support patch - Mailing list pgsql-hackers

From Dilip Kumar
Subject Re: Re: parallel distinct union and aggregate support patch
Date
Msg-id CAFiTN-uD5kRae3qGFU4Pud8-4djh27z93z5Zm2fC6uzJE0bnyg@mail.gmail.com
In response to Re: Re: parallel distinct union and aggregate support patch  (Dilip Kumar <dilipbalaut@gmail.com>)
List pgsql-hackers
On Sun, Nov 8, 2020 at 11:54 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
>
> On Tue, Nov 3, 2020 at 6:06 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:
> >
> > On Thu, Oct 29, 2020 at 12:53 PM bucoo@sohu.com <bucoo@sohu.com> wrote:
> > >
> > > > 1) It's better to always include the whole patch series - including the
> > > > parts that have not changed. Otherwise people have to scavenge the
> > > > thread and search for all the pieces, which may be a source of issues.
> > > > Also, it confuses the patch tester [1] which tries to apply patches from
> > > > a single message, so it will fail for this one.
> > >  Patches 3 and 4 do not rely on 1 and 2 in code.
> > >  But they will fail to apply if you apply patches 3 and 4 directly, because
> > >  they were written on top of 1 and 2.
> > >  I can generate a new single patch if you need.
> > >
> > > > 2) I suggest you try to describe the goal of these patches, using some
> > > > example queries, explain output etc. Right now the reviewers have to
> > > > reverse engineer the patches and deduce what the intention was, which
> > > > may be causing unnecessary confusion etc. If this was my patch, I'd try
> > > > to create a couple examples (CREATE TABLE + SELECT + EXPLAIN) showing
> > > > how the patch changes the query plan, showing speedup etc.
> > >  I have written some example queries into the regression tests, including "unique", "union",
> > >  "group by" and "group by grouping sets".
> > >  Here are my tests; they are not in the regression tests:
> > > ```sql
> > > begin;
> > > create table gtest(id integer, txt text);
> > > insert into gtest select t1.id,'txt'||t1.id from (select generate_series(1,1000*1000) id) t1,(select generate_series(1,10) id) t2;
> > > analyze gtest;
> > > commit;
> > > set jit = off;
> > > \timing on
> > > ```
> > > normal aggregate times
> > > ```
> > > set enable_batch_hashagg = off;
> > > explain (costs off,analyze,verbose)
> > > select sum(id),txt from gtest group by txt;
> > >                                                  QUERY PLAN
> > > -------------------------------------------------------------------------------------------------------------
> > >  Finalize GroupAggregate (actual time=6469.279..8947.024 rows=1000000 loops=1)
> > >    Output: sum(id), txt
> > >    Group Key: gtest.txt
> > >    ->  Gather Merge (actual time=6469.245..8165.930 rows=1000058 loops=1)
> > >          Output: txt, (PARTIAL sum(id))
> > >          Workers Planned: 2
> > >          Workers Launched: 2
> > >          ->  Sort (actual time=6356.471..7133.832 rows=333353 loops=3)
> > >                Output: txt, (PARTIAL sum(id))
> > >                Sort Key: gtest.txt
> > >                Sort Method: external merge  Disk: 11608kB
> > >                Worker 0:  actual time=6447.665..7349.431 rows=317512 loops=1
> > >                  Sort Method: external merge  Disk: 10576kB
> > >                Worker 1:  actual time=6302.882..7061.157 rows=333301 loops=1
> > >                  Sort Method: external merge  Disk: 11112kB
> > >                ->  Partial HashAggregate (actual time=2591.487..4430.437 rows=333353 loops=3)
> > >                      Output: txt, PARTIAL sum(id)
> > >                      Group Key: gtest.txt
> > >                      Batches: 17  Memory Usage: 4241kB  Disk Usage: 113152kB
> > >                      Worker 0:  actual time=2584.345..4486.407 rows=317512 loops=1
> > >                        Batches: 17  Memory Usage: 4241kB  Disk Usage: 101392kB
> > >                      Worker 1:  actual time=2584.369..4393.244 rows=333301 loops=1
> > >                        Batches: 17  Memory Usage: 4241kB  Disk Usage: 112832kB
> > >                      ->  Parallel Seq Scan on public.gtest (actual time=0.691..603.990 rows=3333333 loops=3)
> > >                            Output: id, txt
> > >                            Worker 0:  actual time=0.104..607.146 rows=3174970 loops=1
> > >                            Worker 1:  actual time=0.100..603.951 rows=3332785 loops=1
> > >  Planning Time: 0.226 ms
> > >  Execution Time: 9021.058 ms
> > > (29 rows)
> > >
> > > Time: 9022.251 ms (00:09.022)
> > >
> > > set enable_batch_hashagg = on;
> > > explain (costs off,analyze,verbose)
> > > select sum(id),txt from gtest group by txt;
> > >                                            QUERY PLAN
> > > -------------------------------------------------------------------------------------------------
> > >  Gather (actual time=3116.666..5740.826 rows=1000000 loops=1)
> > >    Output: (sum(id)), txt
> > >    Workers Planned: 2
> > >    Workers Launched: 2
> > >    ->  Parallel BatchHashAggregate (actual time=3103.181..5464.948 rows=333333 loops=3)
> > >          Output: sum(id), txt
> > >          Group Key: gtest.txt
> > >          Worker 0:  actual time=3094.550..5486.992 rows=326082 loops=1
> > >          Worker 1:  actual time=3099.562..5480.111 rows=324729 loops=1
> > >          ->  Parallel Seq Scan on public.gtest (actual time=0.791..656.601 rows=3333333 loops=3)
> > >                Output: id, txt
> > >                Worker 0:  actual time=0.080..646.053 rows=3057680 loops=1
> > >                Worker 1:  actual time=0.070..662.754 rows=3034370 loops=1
> > >  Planning Time: 0.243 ms
> > >  Execution Time: 5788.981 ms
> > > (15 rows)
> > >
> > > Time: 5790.143 ms (00:05.790)
> > > ```
> > >
> > > grouping sets times
> > > ```
> > > set enable_batch_hashagg = off;
> > > explain (costs off,analyze,verbose)
> > > select sum(id),txt from gtest group by grouping sets(id,txt,());
> > >                                         QUERY PLAN
> > > ------------------------------------------------------------------------------------------
> > >  GroupAggregate (actual time=9454.707..38921.885 rows=2000001 loops=1)
> > >    Output: sum(id), txt, id
> > >    Group Key: gtest.id
> > >    Group Key: ()
> > >    Sort Key: gtest.txt
> > >      Group Key: gtest.txt
> > >    ->  Sort (actual time=9454.679..11804.071 rows=10000000 loops=1)
> > >          Output: txt, id
> > >          Sort Key: gtest.id
> > >          Sort Method: external merge  Disk: 254056kB
> > >          ->  Seq Scan on public.gtest (actual time=2.250..2419.031 rows=10000000 loops=1)
> > >                Output: txt, id
> > >  Planning Time: 0.230 ms
> > >  Execution Time: 39203.883 ms
> > > (14 rows)
> > >
> > > Time: 39205.339 ms (00:39.205)
> > >
> > > set enable_batch_hashagg = on;
> > > explain (costs off,analyze,verbose)
> > > select sum(id),txt from gtest group by grouping sets(id,txt,());
> > >                                            QUERY PLAN
> > > -------------------------------------------------------------------------------------------------
> > >  Gather (actual time=5931.776..14353.957 rows=2000001 loops=1)
> > >    Output: (sum(id)), txt, id
> > >    Workers Planned: 2
> > >    Workers Launched: 2
> > >    ->  Parallel BatchHashAggregate (actual time=5920.963..13897.852 rows=666667 loops=3)
> > >          Output: sum(id), txt, id
> > >          Group Key: gtest.id
> > >          Group Key: ()
> > >          Group Key: gtest.txt
> > >          Worker 0:  actual time=5916.370..14062.461 rows=513810 loops=1
> > >          Worker 1:  actual time=5916.037..13932.847 rows=775901 loops=1
> > >          ->  Parallel Seq Scan on public.gtest (actual time=0.399..688.273 rows=3333333 loops=3)
> > >                Output: id, txt
> > >                Worker 0:  actual time=0.052..690.955 rows=3349990 loops=1
> > >                Worker 1:  actual time=0.050..691.595 rows=3297070 loops=1
> > >  Planning Time: 0.157 ms
> > >  Execution Time: 14598.416 ms
> > > (17 rows)
> > >
> > > Time: 14599.437 ms (00:14.599)
> > > ```
> >
> > I have done some performance testing with TPCH to see the impact on
> > the different query plans.  I could see a lot of plan changes across
> > various queries, but out of those there are a few queries where these
> > patches give a noticeable gain: query13 and query17 (I have attached
> > the plans for these 2 queries).
> >
> > Test details:
> > ----------------
> > TPCH scale factor 50 (database size 112GB)
> > work_mem: 20GB, shared_buffers: 20GB, max_parallel_workers_per_gather: 4
> >
> > Machine information:
> > Architecture:          x86_64
> > CPU(s):                56
> > Thread(s) per core:    2
> > Core(s) per socket:    14
> > Socket(s):             2
> > NUMA node(s):          2
> > Model name:            Intel(R) Xeon(R) CPU E5-2695 v3 @ 2.30GHz
> >
> > Observation:
> > In the TPCH test, I have noticed that the major gain we are getting in
> > this patch is because we are able to use the parallelism where we were
> > not able to use due to the limitation of the parallel aggregate.
> > Basically, for computing final aggregated results we need to break the
> > parallelism because the worker is only performing the partial
> > aggregate and after that, we had to gather all the partially
> > aggregated results and do the finalize aggregate.  Now, with this
> > patch, since we are batching the results we are able to compute the
> > final aggregate within the workers itself and that enables us to get
> > the parallelism in more cases.
> >
> > Example:
> > If we observe the output of plan 13 (13.explain_head.out), the subquery
> > performs the aggregate and the outer query does the grouping on the
> > aggregated value of the subquery.  On head we do not choose parallelism
> > here because the inner aggregation has a huge number of groups, so
> > parallelism would mean transferring a lot of tuples through the tuple
> > queue and serializing/deserializing that many transition values, and
> > the outer query needs the final aggregated results from the inner
> > query, so we cannot keep the parallelism.  Now with the batch
> > aggregate (13.explain_patch.out), we are able to compute the finalize
> > aggregation within the workers themselves, and that enables us to
> > continue the parallelism up to the top node.  The execution time for
> > this query is reduced from 238 sec to 57 sec, which is about 4X faster.
> >
> > I will perform some more tests with different scale factors and
> > analyze the behavior of this.
>
> I have started reviewing these patches; I have a couple of review comments.
>
> Some general comments to make the code more readable:
>
> 1. Comments are missing in the patch; there are not even function
> header comments to explain the overall idea of each function.
>    I think adding comments will make the patch easier to review.
>
> 2. The code is not written as per the Postgres coding guidelines; the
> common problems observed in the patch are:
>   a) There should be an empty line after the variable declaration section.
>   b) In a function definition, the function return type and the
> function name should not be on the same line.
>
> Change
>
> +static bool ExecNextParallelBatchSort(BatchSortState *state)
> {
> }
> to
> static bool
> ExecNextParallelBatchSort(BatchSortState *state)
> {
> }
>
> c) While typecasting a variable, the spacing is not used properly or
> uniformly; you can refer to the existing code and fix it.
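
To be concrete about (c), the spacing used elsewhere in the tree for casts
is a space before the pointer star and a space after the closing
parenthesis, e.g. (illustration only, not lines from the patch):

state->ps.plan = (Plan *) node;

rather than

state->ps.plan = (Plan*) node;

Running pgindent over the patch will normalize most of this automatically.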
>
> *Specific comments to patch 0001*
>
> 1.
> +#define BATCH_SORT_MAX_BATCHES 512
>
> Did you decide this number based on some experiment or is there some
> analysis behind selecting this number?
>
> 2.
> +BatchSortState* ExecInitBatchSort(BatchSort *node, EState *estate, int eflags)
> +{
> + BatchSortState *state;
> + TypeCacheEntry *typentry;
> ....
> + for (i=0;i<node->numGroupCols;++i)
> + {
> ...
> + InitFunctionCallInfoData(*fcinfo, flinfo, 1, attr->attcollation, NULL, NULL);
> + fcinfo->args[0].isnull = false;
> + state->groupFuns = lappend(state->groupFuns, fcinfo);
> + }
>
> From the variable naming, it appears that the batch sort depends upon
> the grouping node.  I think instead of using the names numGroupCols and
> groupFuns we should use names that are more relevant to the batch sort,
> something like numSortKey.
>
> 3.
> + if (eflags & (EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))
> + {
> + /* for now, we only using in group aggregate */
> + ereport(ERROR,
> + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
> + errmsg("not support execute flag(s) %d for group sort", eflags)));
> + }
>
> Instead of an ereport, you should just put an Assert for the unsupported
> flags, or use an elog.
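
Just to illustrate that point, something along these lines should be enough
(a sketch, not the patch's code):

/* BatchSort does not support rescan, backward scan or mark/restore for now */
Assert((eflags & (EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)) == 0);

or, if you prefer to keep a runtime check in non-assert builds:

if (eflags & (EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))
    elog(ERROR, "BatchSort does not support mark/restore or backward scans");

An elog is fine here because this is an internal "should not happen"
condition rather than a user-facing error.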
>
> 4.
> + state = makeNode(BatchSortState);
> + state->ps.plan = (Plan*) node;
> + state->ps.state = estate;
> + state->ps.ExecProcNode = ExecBatchSortPrepare;
>
> I think the main executor entry function should be named ExecBatchSort
> instead of ExecBatchSortPrepare, it will look more consistent with the
> other executor machinery.

1.
+void cost_batchsort(Path *path, PlannerInfo *root,
+                    List *batchkeys, Cost input_cost,
+                    double tuples, int width,
+                    Cost comparison_cost, int sort_mem,
+                    uint32 numGroupCols, uint32 numBatches)
+{
+    Cost        startup_cost = input_cost;
+    Cost        run_cost = 0;
+    double        input_bytes = relation_byte_size(tuples, width);
+    double        batch_bytes = input_bytes / numBatches;
+    double        batch_tuples = tuples / numBatches;
+    long        sort_mem_bytes = sort_mem * 1024L;
+
+    if (sort_mem_bytes < (64*1024))
+        sort_mem_bytes = (64*1024);
+
+    if (!enable_batch_sort)
+        startup_cost += disable_cost;

You don't need to write a duplicate function for this; you can reuse
the cost_tuplesort function with some minor changes.
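
Something like the sketch below is what I had in mind (illustrative only;
cost_tuplesort() is the existing helper in costsize.c, and the exact way the
per-batch costs are combined is up to you):

void
cost_batchsort(Path *path, PlannerInfo *root,
               List *batchkeys, Cost input_cost,
               double tuples, int width,
               Cost comparison_cost, int sort_mem,
               uint32 numGroupCols, uint32 numBatches)
{
    Cost        batch_startup;
    Cost        batch_run;

    /* cost of sorting a single batch of roughly tuples/numBatches rows */
    cost_tuplesort(&batch_startup, &batch_run,
                   tuples / numBatches, width,
                   comparison_cost, sort_mem, -1.0);

    if (!enable_batch_sort)
        batch_startup += disable_cost;

    /* all batches have to be filled and sorted before rows come out */
    path->startup_cost = input_cost + batch_startup * numBatches;
    path->total_cost = path->startup_cost + batch_run * numBatches;
}

That way the comparison-cost and spill-to-disk logic stays in one place.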


2. I have one more suggestion.  Currently, the batches are picked up by
the workers dynamically, and the benefit of that is that the work
distribution is quite flexible.  But one downside I see with this
approach is that it makes it hard to push this parallelism up to an
upper node, for example a merge join with a BatchSort node on each
side.  If a worker picks its batches dynamically, it needs to pick the
same batch on both sides, so the right-side node would have to know
which batch was picked on the left-side node, and for that we might
have to introduce a different join node, say BatchWiseMergeJoin.
Whereas if we assign the batches by worker number, each sort node can
be processed independently without knowing what is happening on the
other side.
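
To make the difference concrete, a purely illustrative sketch (the names
shared->nextBatch, numBatches, nworkers and ProcessBatch() are hypothetical):

/* dynamic picking: each worker grabs whatever batch is free next */
batchNo = pg_atomic_fetch_add_u32(&shared->nextBatch, 1);

/* per-worker assignment: worker N always owns batches N, N + nworkers, ... */
for (batchNo = ParallelWorkerNumber;
     batchNo < numBatches;
     batchNo += nworkers)
    ProcessBatch(batchNo);

With the second scheme, two BatchSort nodes under the same worker always
see the same batch numbers, so a batch-wise merge join would not need any
extra coordination between the two sides.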

3. I have also done some performance tests, especially with small group
sizes, i.e. the cases where the parallel aggregate is not picked on head
because of the small group size but becomes possible with the new patch.

Setup: I have used the TPCH database with scale factor 50 and executed an
aggregation query on the orders table.

Number of rows in the orders table: 75000000
Total table size: 18 GB

work_mem: 10GB

postgres=# explain (analyze, verbose) select sum(o_totalprice) from orders group by o_custkey;
                                                              QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------
 HashAggregate  (cost=2506201.00..2570706.04 rows=5160403 width=40) (actual time=94002.681..98733.002 rows=4999889 loops=1)
   Output: sum(o_totalprice), o_custkey
   Group Key: orders.o_custkey
   Batches: 1  Memory Usage: 2228241kB
   ->  Seq Scan on public.orders  (cost=0.00..2131201.00 rows=75000000 width=16) (actual time=0.042..12930.981 rows=75000000 loops=1)
         Output: o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment
 Planning Time: 0.317 ms
 Execution Time: 99230.242 ms


postgres=# set enable_batch_sort=on;
SET
postgres=# explain (analyze, verbose) select sum(o_totalprice) from orders group by o_custkey;
                                                                      QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------
 Gather  (cost=1616576.00..1761358.55 rows=40316 width=40) (actual time=18516.549..28811.164 rows=4999889 loops=1)
   Output: (sum(o_totalprice)), o_custkey
   Workers Planned: 4
   Workers Launched: 4
   ->  GroupAggregate  (cost=1615576.00..1756326.99 rows=10079 width=40) (actual time=18506.051..28131.650 rows=999978 loops=5)
         Output: sum(o_totalprice), o_custkey
         Group Key: orders.o_custkey
         Worker 0:  actual time=18502.746..28406.868 rows=995092 loops=1
         Worker 1:  actual time=18502.339..28518.559 rows=1114511 loops=1
         Worker 2:  actual time=18503.233..28461.975 rows=985574 loops=1
         Worker 3:  actual time=18506.026..28409.130 rows=1005414 loops=1
         ->  Parallel BatchSort  (cost=1615576.00..1662451.00 rows=18750000 width=16) (actual time=18505.982..21839.567 rows=15000000 loops=5)
               Output: o_custkey, o_totalprice
               Sort Key: orders.o_custkey
               batches: 512
               Worker 0:  actual time=18502.666..21945.442 rows=14925544 loops=1
               Worker 1:  actual time=18502.270..21979.350 rows=16714443 loops=1
               Worker 2:  actual time=18503.144..21933.151 rows=14784292 loops=1
               Worker 3:  actual time=18505.950..21943.312 rows=15081559 loops=1
               ->  Parallel Seq Scan on public.orders  (cost=0.00..1568701.00 rows=18750000 width=16) (actual time=0.082..4662.390 rows=15000000 loops=5)
                     Output: o_custkey, o_totalprice
                     Worker 0:  actual time=0.079..4720.424 rows=15012981 loops=1
                     Worker 1:  actual time=0.083..4710.919 rows=15675399 loops=1
                     Worker 2:  actual time=0.082..4663.096 rows=14558663 loops=1
                     Worker 3:  actual time=0.104..4625.940 rows=14496910 loops=1
 Planning Time: 0.281 ms
 Execution Time: 29504.248 ms


postgres=# set enable_batch_hashagg =on;
postgres=# set enable_batch_sort=off;
postgres=# explain (analyze, verbose) select sum(o_totalprice) from orders group by o_custkey;
                                                                      QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
 Gather  (cost=1755004.00..2287170.56 rows=5160403 width=40) (actual time=12935.338..27064.962 rows=4999889 loops=1)
   Output: (sum(o_totalprice)), o_custkey
   Workers Planned: 4
   Workers Launched: 4
   ->  Parallel BatchHashAggregate  (cost=1754004.00..1770130.26 rows=1290101 width=40) (actual time=12987.830..24726.348 rows=999978 loops=5)
         Output: sum(o_totalprice), o_custkey
         Group Key: orders.o_custkey
         Worker 0:  actual time=13013.228..25078.902 rows=999277 loops=1
         Worker 1:  actual time=12917.375..25456.751 rows=1100607 loops=1
         Worker 2:  actual time=13041.088..24022.445 rows=900562 loops=1
         Worker 3:  actual time=13032.732..25230.101 rows=1001386 loops=1
         ->  Parallel Seq Scan on public.orders  (cost=0.00..1568701.00 rows=18750000 width=16) (actual time=0.059..2764.881 rows=15000000 loops=5)
               Output: o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment
               Worker 0:  actual time=0.056..2754.621 rows=14924063 loops=1
               Worker 1:  actual time=0.063..2815.688 rows=16241825 loops=1
               Worker 2:  actual time=0.067..2750.927 rows=14064529 loops=1
               Worker 3:  actual time=0.055..2753.620 rows=14699841 loops=1
 Planning Time: 0.209 ms
 Execution Time: 27728.363 ms
(19 rows)


I think both the parallel batch-wise group aggregate and the batch-wise
hash aggregate give a very large improvement when the typical group
size is small.

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com


