Re: Re: parallel distinct union and aggregate support patch - Mailing list pgsql-hackers

From Dilip Kumar
Subject Re: Re: parallel distinct union and aggregate support patch
Msg-id CAFiTN-vpk-uxBw8pOfG5so8Jtra_vqL8c_AxsfKcRCz+d0TCDQ@mail.gmail.com
In response to Re: Re: parallel distinct union and aggregate support patch  (Dilip Kumar <dilipbalaut@gmail.com>)
Responses Re: Re: parallel distinct union and aggregate support patch  (Dilip Kumar <dilipbalaut@gmail.com>)
Re: parallel distinct union and aggregate support patch  (Heikki Linnakangas <hlinnaka@iki.fi>)
List pgsql-hackers
On Tue, Nov 3, 2020 at 6:06 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:
>
> On Thu, Oct 29, 2020 at 12:53 PM bucoo@sohu.com <bucoo@sohu.com> wrote:
> >
> > > 1) It's better to always include the whole patch series - including the
> > > parts that have not changed. Otherwise people have to scavenge the
> > > thread and search for all the pieces, which may be a source of issues.
> > > Also, it confuses the patch tester [1] which tries to apply patches from
> > > a single message, so it will fail for this one.
> >  Patches 3 and 4 do not depend on 1 and 2 in code.
> >  But applying patches 3 and 4 directly will fail, because
> >  they were written on top of 1 and 2.
> >  I can generate a new single patch if you need it.
> >
> > > 2) I suggest you try to describe the goal of these patches, using some
> > > example queries, explain output etc. Right now the reviewers have to
> > > reverse engineer the patches and deduce what the intention was, which
> > > may be causing unnecessary confusion etc. If this was my patch, I'd try
> > > to create a couple examples (CREATE TABLE + SELECT + EXPLAIN) showing
> > > how the patch changes the query plan, showing speedup etc.
> >  I wrote some example queries into the regression tests, including "unique", "union",
> >  "group by" and "group by grouping sets".
> >  Here are my tests; they are not in the regression suite:
> > ```sql
> > begin;
> > create table gtest(id integer, txt text);
> > insert into gtest select t1.id,'txt'||t1.id from (select generate_series(1,1000*1000) id) t1,(select generate_series(1,10) id) t2;
> > analyze gtest;
> > commit;
> > set jit = off;
> > \timing on
> > ```
> > normal aggregate times
> > ```
> > set enable_batch_hashagg = off;
> > explain (costs off,analyze,verbose)
> > select sum(id),txt from gtest group by txt;
> >                                                  QUERY PLAN
> > -------------------------------------------------------------------------------------------------------------
> >  Finalize GroupAggregate (actual time=6469.279..8947.024 rows=1000000 loops=1)
> >    Output: sum(id), txt
> >    Group Key: gtest.txt
> >    ->  Gather Merge (actual time=6469.245..8165.930 rows=1000058 loops=1)
> >          Output: txt, (PARTIAL sum(id))
> >          Workers Planned: 2
> >          Workers Launched: 2
> >          ->  Sort (actual time=6356.471..7133.832 rows=333353 loops=3)
> >                Output: txt, (PARTIAL sum(id))
> >                Sort Key: gtest.txt
> >                Sort Method: external merge  Disk: 11608kB
> >                Worker 0:  actual time=6447.665..7349.431 rows=317512 loops=1
> >                  Sort Method: external merge  Disk: 10576kB
> >                Worker 1:  actual time=6302.882..7061.157 rows=333301 loops=1
> >                  Sort Method: external merge  Disk: 11112kB
> >                ->  Partial HashAggregate (actual time=2591.487..4430.437 rows=333353 loops=3)
> >                      Output: txt, PARTIAL sum(id)
> >                      Group Key: gtest.txt
> >                      Batches: 17  Memory Usage: 4241kB  Disk Usage: 113152kB
> >                      Worker 0:  actual time=2584.345..4486.407 rows=317512 loops=1
> >                        Batches: 17  Memory Usage: 4241kB  Disk Usage: 101392kB
> >                      Worker 1:  actual time=2584.369..4393.244 rows=333301 loops=1
> >                        Batches: 17  Memory Usage: 4241kB  Disk Usage: 112832kB
> >                      ->  Parallel Seq Scan on public.gtest (actual time=0.691..603.990 rows=3333333 loops=3)
> >                            Output: id, txt
> >                            Worker 0:  actual time=0.104..607.146 rows=3174970 loops=1
> >                            Worker 1:  actual time=0.100..603.951 rows=3332785 loops=1
> >  Planning Time: 0.226 ms
> >  Execution Time: 9021.058 ms
> > (29 rows)
> >
> > Time: 9022.251 ms (00:09.022)
> >
> > set enable_batch_hashagg = on;
> > explain (costs off,analyze,verbose)
> > select sum(id),txt from gtest group by txt;
> >                                            QUERY PLAN
> > -------------------------------------------------------------------------------------------------
> >  Gather (actual time=3116.666..5740.826 rows=1000000 loops=1)
> >    Output: (sum(id)), txt
> >    Workers Planned: 2
> >    Workers Launched: 2
> >    ->  Parallel BatchHashAggregate (actual time=3103.181..5464.948 rows=333333 loops=3)
> >          Output: sum(id), txt
> >          Group Key: gtest.txt
> >          Worker 0:  actual time=3094.550..5486.992 rows=326082 loops=1
> >          Worker 1:  actual time=3099.562..5480.111 rows=324729 loops=1
> >          ->  Parallel Seq Scan on public.gtest (actual time=0.791..656.601 rows=3333333 loops=3)
> >                Output: id, txt
> >                Worker 0:  actual time=0.080..646.053 rows=3057680 loops=1
> >                Worker 1:  actual time=0.070..662.754 rows=3034370 loops=1
> >  Planning Time: 0.243 ms
> >  Execution Time: 5788.981 ms
> > (15 rows)
> >
> > Time: 5790.143 ms (00:05.790)
> > ```
> >
> > grouping sets times
> > ```
> > set enable_batch_hashagg = off;
> > explain (costs off,analyze,verbose)
> > select sum(id),txt from gtest group by grouping sets(id,txt,());
> >                                         QUERY PLAN
> > ------------------------------------------------------------------------------------------
> >  GroupAggregate (actual time=9454.707..38921.885 rows=2000001 loops=1)
> >    Output: sum(id), txt, id
> >    Group Key: gtest.id
> >    Group Key: ()
> >    Sort Key: gtest.txt
> >      Group Key: gtest.txt
> >    ->  Sort (actual time=9454.679..11804.071 rows=10000000 loops=1)
> >          Output: txt, id
> >          Sort Key: gtest.id
> >          Sort Method: external merge  Disk: 254056kB
> >          ->  Seq Scan on public.gtest (actual time=2.250..2419.031 rows=10000000 loops=1)
> >                Output: txt, id
> >  Planning Time: 0.230 ms
> >  Execution Time: 39203.883 ms
> > (14 rows)
> >
> > Time: 39205.339 ms (00:39.205)
> >
> > set enable_batch_hashagg = on;
> > explain (costs off,analyze,verbose)
> > select sum(id),txt from gtest group by grouping sets(id,txt,());
> >                                            QUERY PLAN
> > -------------------------------------------------------------------------------------------------
> >  Gather (actual time=5931.776..14353.957 rows=2000001 loops=1)
> >    Output: (sum(id)), txt, id
> >    Workers Planned: 2
> >    Workers Launched: 2
> >    ->  Parallel BatchHashAggregate (actual time=5920.963..13897.852 rows=666667 loops=3)
> >          Output: sum(id), txt, id
> >          Group Key: gtest.id
> >          Group Key: ()
> >          Group Key: gtest.txt
> >          Worker 0:  actual time=5916.370..14062.461 rows=513810 loops=1
> >          Worker 1:  actual time=5916.037..13932.847 rows=775901 loops=1
> >          ->  Parallel Seq Scan on public.gtest (actual time=0.399..688.273 rows=3333333 loops=3)
> >                Output: id, txt
> >                Worker 0:  actual time=0.052..690.955 rows=3349990 loops=1
> >                Worker 1:  actual time=0.050..691.595 rows=3297070 loops=1
> >  Planning Time: 0.157 ms
> >  Execution Time: 14598.416 ms
> > (17 rows)
> >
> > Time: 14599.437 ms (00:14.599)
> > ```
>
> I have done some performance testing with TPCH to see the impact on
> the query plans.  I could see a lot of plan changes across various
> queries, but out of those, there are a few queries where these
> patches gave a noticeable gain: query13 and query17 (I have
> attached the plans for these 2 queries).
>
> Test details:
> ----------------
> TPCH scale factor 50 (database size 112GB)
> work_mem=20GB, shared_buffers=20GB, max_parallel_workers_per_gather=4
>
> Machine information:
> Architecture:          x86_64
> CPU(s):                56
> Thread(s) per core:    2
> Core(s) per socket:    14
> Socket(s):             2
> NUMA node(s):          2
> Model name:            Intel(R) Xeon(R) CPU E5-2695 v3 @ 2.30GHz
>
> Observation:
> In the TPCH test, I noticed that the major gain from this patch
> comes from being able to use parallelism where we previously could
> not, due to the limitation of parallel aggregation.
> Basically, to compute the final aggregated results we have to break
> the parallelism, because the workers only perform the partial
> aggregate; after that, we have to gather all the partially
> aggregated results and do the finalize aggregate in the leader.
> Now, with this patch, since we are batching the results, we are able
> to compute the final aggregate within the workers themselves, and
> that lets us use parallelism in more cases.
>
> Example:
> If we look at the plan for query 13 (13.explain_head.out), the
> subquery performs an aggregate and the outer query groups on the
> aggregated value from the subquery.  On head we do not choose a
> parallel plan, because the inner aggregation produces a huge number
> of groups: with parallelism we would have to transfer a lot of
> tuples through the tuple queue and also serialize/deserialize that
> many transition values.  And since the outer query needs the final
> aggregated results of the inner query, we cannot keep the
> parallelism above it.  Now with batch aggregation
> (13.explain_patch.out), we are able to compute the finalized
> aggregate within the workers themselves, which lets us continue the
> parallelism up to the top node.  The execution time for this query
> is now reduced from 238 sec to 57 sec, which is about 4x faster.
>
> I will perform some more tests with different scale factors and
> analyze the behavior of this.
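
To make the observation above concrete, here is a minimal single-process sketch of why batching lets the workers finalize: rows are partitioned by a hash of the group key, so every row of a given group lands in the same batch, and whoever processes that batch holds the group's complete state.  The hash function, the batch count, and the names Group and aggregate_batch are illustrative assumptions, not how the patch itself is written.

```c
#include <assert.h>
#include <stdint.h>

#define NBATCHES	4			/* illustrative; unrelated to the patch's limit */
#define MAX_GROUPS	16

/* Final state of one group: its key and the running sum(id). */
typedef struct
{
	int32_t		key;
	int64_t		sum;
} Group;

/* Toy integer hash; any deterministic function of the key works. */
static uint32_t
hash_key(int32_t key)
{
	uint32_t	h = (uint32_t) key;

	h ^= h >> 16;
	h *= 0x45d9f3bu;
	h ^= h >> 16;
	return h;
}

/*
 * Aggregate only the rows whose key hashes into "batch".  Every row of a
 * given key lands in the same batch, so the sums produced here are already
 * final: no other batch can hold a partial state for these keys.
 * Returns the number of groups written to "out".
 */
static int
aggregate_batch(const int32_t *ids, const int32_t *keys, int nrows,
				int batch, Group *out)
{
	int			ngroups = 0;

	for (int i = 0; i < nrows; i++)
	{
		int			g;

		if (hash_key(keys[i]) % NBATCHES != (uint32_t) batch)
			continue;
		for (g = 0; g < ngroups; g++)
			if (out[g].key == keys[i])
				break;
		if (g == ngroups)
		{
			assert(ngroups < MAX_GROUPS);
			out[ngroups].key = keys[i];
			out[ngroups].sum = 0;
			ngroups++;
		}
		out[g].sum += ids[i];
	}
	return ngroups;
}
```

A worker assigned batch b can emit that batch's groups directly as final results, which is what allows the plan to continue above the aggregate without a Gather in between.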

I have started reviewing these patches, and I have a couple of review comments.

Some general comments to make the code more readable:

1. Comments are missing in the patch; there are not even function
header comments explaining the overall idea of each function.
   I think adding comments will make the patch easier to review.

2. The code does not follow the PostgreSQL coding conventions; the
common problems observed in the patch are:
  a) There should be an empty line after the variable declaration section.
  b) In a function definition, the return type and the
function name should not be on the same line.

Change

+static bool ExecNextParallelBatchSort(BatchSortState *state)
{
}
to
static bool
ExecNextParallelBatchSort(BatchSortState *state)
{
}

c) Spacing in type casts is not consistent, e.g. "(Plan*) node" should
be written as "(Plan *) node"; please refer to the rest of the code and fix it.
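
For reference, a small self-contained function laid out the way points a) to c) describe.  Plan, BatchSort and batch_sort_num_batches are hypothetical stand-ins, not the real executor definitions.

```c
#include <assert.h>

/* Hypothetical stand-ins for the executor types, for illustration only. */
typedef struct Plan
{
	int			plan_node_id;
} Plan;

typedef struct BatchSort
{
	Plan		plan;			/* must be the first field */
	int			numBatches;
} BatchSort;

/*
 * batch_sort_num_batches
 *		Return the number of batches configured for a BatchSort plan node.
 *
 * Layout follows points a) to c): a header comment, the return type on
 * its own line, a blank line after the declarations, and casts written
 * as "(Type *) expr".
 */
static int
batch_sort_num_batches(Plan *plan)
{
	BatchSort  *node = (BatchSort *) plan;

	return node->numBatches;
}
```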

*Specific comments on patch 0001*

1.
+#define BATCH_SORT_MAX_BATCHES 512

Did you choose this number based on some experiment, or is there some
analysis behind it?
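
One way such a constant could be justified is to derive the batch count from the estimated input size and the memory budget, and keep the constant only as an upper bound, similar in spirit to how hash join picks a power-of-two number of batches.  This is a hypothetical sketch; choose_num_batches and its inputs are not from the patch.

```c
#include <assert.h>

#define BATCH_SORT_MAX_BATCHES 512	/* the value under discussion */

/*
 * Hypothetical helper: the smallest power-of-two number of batches such
 * that each batch is expected to fit in mem_limit bytes, clamped to
 * [1, BATCH_SORT_MAX_BATCHES].
 */
static int
choose_num_batches(double input_bytes, double mem_limit)
{
	int			nbatches = 1;

	while (nbatches < BATCH_SORT_MAX_BATCHES &&
		   input_bytes / nbatches > mem_limit)
		nbatches *= 2;
	return nbatches;
}
```

With this shape, the interesting question for the review becomes whether 512 is a sensible ceiling, rather than a fixed batch count.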

2.
+BatchSortState* ExecInitBatchSort(BatchSort *node, EState *estate, int eflags)
+{
+ BatchSortState *state;
+ TypeCacheEntry *typentry;
....
+ for (i=0;i<node->numGroupCols;++i)
+ {
...
+ InitFunctionCallInfoData(*fcinfo, flinfo, 1, attr->attcollation, NULL, NULL);
+ fcinfo->args[0].isnull = false;
+ state->groupFuns = lappend(state->groupFuns, fcinfo);
+ }

From the variable naming, it appears that batch sort depends on the
grouping node.  I think instead of numGroupCols and groupFuns we should
use names that are more relevant to batch sort, something like
numSortKey.
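
Assuming, as the TypeCacheEntry lookup suggests, that these per-column functions are hash functions used to choose a batch, the state can be described purely in sort-key terms.  A sketch with hypothetical names (BatchSortKeys, numSortKeys, sortKeyHashFuncs, batch_for_row); the mixing step is modeled on PostgreSQL's hash_combine().

```c
#include <assert.h>
#include <stdint.h>

#define MAX_SORT_KEYS 4

/* Hypothetical per-key hash callback, standing in for the fcinfo list. */
typedef uint32_t (*SortKeyHashFunc) (int64_t value);

/* Sort-key oriented names instead of numGroupCols/groupFuns. */
typedef struct BatchSortKeys
{
	int			numSortKeys;
	SortKeyHashFunc sortKeyHashFuncs[MAX_SORT_KEYS];
} BatchSortKeys;

/* Mixing step modeled on hash_combine() in common/hashfn.h. */
static uint32_t
combine_hash(uint32_t a, uint32_t b)
{
	a ^= b + 0x9e3779b9 + (a << 6) + (a >> 2);
	return a;
}

/* Toy hash for an int64 key. */
static uint32_t
hash_int64(int64_t value)
{
	uint64_t	v = (uint64_t) value;

	return (uint32_t) (v ^ (v >> 32));
}

/* Pick the batch for one row from the hashes of all of its sort keys. */
static int
batch_for_row(const BatchSortKeys *keys, const int64_t *values, int nbatches)
{
	uint32_t	hash = 0;

	for (int i = 0; i < keys->numSortKeys; i++)
		hash = combine_hash(hash, keys->sortKeyHashFuncs[i] (values[i]));
	return (int) (hash % (uint32_t) nbatches);
}
```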

3.
+ if (eflags & (EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))
+ {
+ /* for now, we only using in group aggregate */
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("not support execute flag(s) %d for group sort", eflags)));
+ }

Instead of ereport, you should just use an Assert or elog for the
unsupported flags, since this is an internal error rather than one a
user can trigger.
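
Inside PostgreSQL this would be a single line, Assert(!(eflags & (EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));, the same kind of check several existing ExecInit* functions make for flags they do not support.  A standalone version follows, using assert() and stand-in flag values; the real EXEC_FLAG_* macros live in executor/executor.h, and batch_sort_eflags_ok / init_batch_sort are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>

/* Stand-in values for illustration; the real ones are in executor.h. */
#define EXEC_FLAG_REWIND	0x0002
#define EXEC_FLAG_BACKWARD	0x0004
#define EXEC_FLAG_MARK		0x0008

/* The patch only uses batch sort under a group aggregate, so none of
 * these flags are supported. */
static bool
batch_sort_eflags_ok(int eflags)
{
	return (eflags & (EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)) == 0;
}

static void
init_batch_sort(int eflags)
{
	/* A caller bug, not a user-facing error: check it in assert builds. */
	assert(batch_sort_eflags_ok(eflags));

	/* ... node initialization would follow here ... */
}
```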

4.
+ state = makeNode(BatchSortState);
+ state->ps.plan = (Plan*) node;
+ state->ps.state = estate;
+ state->ps.ExecProcNode = ExecBatchSortPrepare;

I think the main executor entry function should be named ExecBatchSort
instead of ExecBatchSortPrepare; that would be more consistent with the
other executor nodes.
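
One common shape for this, similar to how ExecSort in nodeSort.c performs its sort on the first call, is a single ExecBatchSort entry point that prepares lazily and then returns tuples.  The structures below are heavily simplified and hypothetical (tuples are plain ints, NULL marks the end of the scan), and batch_sort_prepare is an invented helper name.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, heavily simplified stand-ins for the executor structs. */
typedef struct BatchSortState BatchSortState;
typedef const int *(*ExecProcNodeMtd) (BatchSortState *state);

struct BatchSortState
{
	ExecProcNodeMtd ExecProcNode;
	bool		prepared;		/* have the batches been built yet? */
	int			prepare_calls;	/* counts preparation work, for the demo */
	int			next;
	int			ntuples;
	int			tuples[8];
};

static void
batch_sort_prepare(BatchSortState *state)
{
	/* Real code would read the outer plan and fill the batches here. */
	state->ntuples = 3;
	for (int i = 0; i < state->ntuples; i++)
		state->tuples[i] = i * 10;
	state->next = 0;
	state->prepare_calls++;
	state->prepared = true;
}

/* Single entry point: prepare on the first call, then return tuples. */
static const int *
ExecBatchSort(BatchSortState *state)
{
	if (!state->prepared)
		batch_sort_prepare(state);
	if (state->next >= state->ntuples)
		return NULL;			/* end of scan */
	return &state->tuples[state->next++];
}
```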

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com


