Thread: parallel distinct union and aggregate support patch

parallel distinct union and aggregate support patch

From
"bucoo@sohu.com"
Date:
Hi hackers,
I wrote a patch to support parallel distinct, union and aggregate using a batch sort.
steps:
 1. generate a hash value for the group clause values, and save each tuple to a batch chosen by the hash value modulo the number of batches
 2. at the end of the outer plan, wait for all the other workers to finish writing to the batches
 3. each worker takes a unique batch number and calls tuplesort_performsort() to finish sorting that batch
 4. return the rows of this batch
 5. if not all batches are done, go to step 3

The BatchSort plan makes sure tuples with equal group clause values are returned in the same range (and by the same worker), so a Unique (or GroupAggregate) plan can work.
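As a rough sketch of the per-worker control flow described above (the state layout and the helpers compute_group_hash() and claim_next_batch() are hypothetical, and the worker/leader tuplesort handling discussed later in the thread is ignored; tuplesort_puttupleslot(), tuplesort_performsort(), tuplesort_gettupleslot() and BarrierArriveAndWait() are the existing core primitives this appears to build on):

```c
/* Hypothetical per-worker control flow for BatchSort; not the patch itself. */
TupleTableSlot *slot;
uint32		hash;
int			batchno;

for (;;)
{
	slot = ExecProcNode(outerPlanState(state));
	if (TupIsNull(slot))
		break;

	/* step 1: route the tuple to a batch by hash(group cols) % numBatches */
	hash = compute_group_hash(state, slot);		/* hypothetical helper */
	tuplesort_puttupleslot(state->batches[hash % state->numBatches], slot);
}

/* step 2: wait until every worker has finished writing to the batches */
BarrierArriveAndWait(&shared->barrier, 0);

/* steps 3-5: claim one batch at a time, sort it, and emit its rows */
while ((batchno = claim_next_batch(shared)) >= 0)	/* hypothetical helper */
{
	tuplesort_performsort(state->batches[batchno]);		/* step 3 */

	/* step 4: return this batch's rows to the parent node */
	while (tuplesort_gettupleslot(state->batches[batchno],
								  true, false, slot, NULL))
		;		/* emit slot */
}	/* step 5: loop until no unclaimed batch remains */
```

Because all tuples with equal group clause values hash into the same batch, each group is seen in full by exactly one worker, which is what lets Unique or GroupAggregate finish a group locally.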

Patch 2 adds parallel aggregate on top of this; it is a simple use of BatchSort,
but the regression tests fail for partitionwise aggregation because the plan changes
from GatherMerge->Sort->Append->...
to   Sort->Gather->Append->...
I have no idea how to fix that.

With the same idea I wrote a batched shared tuple store for HashAgg in our PostgreSQL fork; I will send a patch for PG14 when I finish it.

The following is the same description, originally written in Chinese:
My English is not good, so I am writing some Chinese here; please help correct anything wrong in the English above.
How BatchSort works:
 1. compute a hash value from the group clause and place each row into a batch chosen by the hash value modulo the batch count
 2. once the lower plan has returned all its rows, wait for all the other worker processes to finish
 3. each worker claims one unique batch and calls tuplesort_performsort() to complete the final sort
 4. return all rows of this batch
 5. if not all batches have been read, return to step 3
The BatchSort plan guarantees that equal rows (by the grouping expressions) are returned within the same cycle, so the deduplication- and grouping-related plans can work correctly.
The second patch supports parallel grouping: grouping is done only once, instead of each parallel worker doing its own grouping and the leader process then doing a second grouping pass.
This patch makes the partitionwise aggregation case in the regression tests fail; the original query plan has changed.
The patch only implements one simple way of using the BatchSort plan; other usages may still need to be added.

With the same idea I wrote a HashAgg using a shared tuple store in our AntDB version (the latest version is not yet open source); I will post it after porting it to PG14.
A small advertisement: please check out AntDB, AsiaInfo's PG-based distributed database product, open source at https://github.com/ADBSQL/AntDB

bucoo@sohu.com
Attachment

Re: parallel distinct union and aggregate support patch

From
Thomas Munro
Date:
On Tue, Oct 20, 2020 at 3:49 AM bucoo@sohu.com <bucoo@sohu.com> wrote:
> I wrote a patch to support parallel distinct, union and aggregate using a batch sort.
> steps:
>  1. generate a hash value for the group clause values, and save each tuple to a batch chosen by the hash value modulo the number of batches
>  2. at the end of the outer plan, wait for all the other workers to finish writing to the batches
>  3. each worker takes a unique batch number and calls tuplesort_performsort() to finish sorting that batch
>  4. return the rows of this batch
>  5. if not all batches are done, go to step 3
>
> The BatchSort plan makes sure tuples with equal group clause values are returned in the same range (and by the same worker), so a Unique (or GroupAggregate) plan can work.

Hi!

Interesting work!  In the past a few people have speculated about a
Parallel Repartition operator that could partition tuples a bit like
this, so that each process gets a different set of partitions.  Here
you combine that with a sort.  By doing both things in one node, you
avoid a lot of overheads (writing into a tuplestore once in the
repartitioning node, and then once again in the sort node, with tuples
being copied one-by-one between the two nodes).

If I understood correctly, the tuples emitted by Parallel Batch Sort
in each process are ordered by (hash(key, ...) % npartitions, key,
...), but the path is claiming to be ordered by (key, ...), no?
That's enough for Unique and Aggregate to give the correct answer,
because they really only require equal keys to be consecutive (and in
the same process), but maybe some other plan could break?



Re: parallel distinct union and aggregate support patch

From
Dilip Kumar
Date:
On Mon, Oct 19, 2020 at 8:19 PM bucoo@sohu.com <bucoo@sohu.com> wrote:
>
> Hi hackers,
> I wrote a patch to support parallel distinct, union and aggregate using a batch sort.
> steps:
>  1. generate a hash value for the group clause values, and save each tuple to a batch chosen by the hash value modulo the number of batches
>  2. at the end of the outer plan, wait for all the other workers to finish writing to the batches
>  3. each worker takes a unique batch number and calls tuplesort_performsort() to finish sorting that batch
>  4. return the rows of this batch
>  5. if not all batches are done, go to step 3
>
> The BatchSort plan makes sure tuples with equal group clause values are returned in the same range (and by the same worker), so a Unique (or GroupAggregate) plan can work.

Interesting idea.  So IIUC, whenever a worker scans a tuple it
will directly put it into the respective batch (shared tuple store),
based on the hash of the grouping columns, and once all the workers
are done preparing the batches, each worker will pick those batches
one by one, perform the sort and finish the aggregation.  I think
there is scope for improvement: instead of directly putting the tuple
into the batch, what if the worker does the partial aggregation and
then places the partially aggregated rows in the shared tuple store
based on the hash value, and then the workers can pick the batches
one by one.  Done this way, we can avoid doing large sorts.  And this
approach can then also be used with the hash aggregate, I mean the
data partially aggregated by the hash aggregate can be put into the
respective batch.
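A pseudocode sketch of that proposal (every name below is invented for illustration; it assumes the aggregates have serial/deserial and combine support):

```c
/* Hypothetical: batch partially aggregated rows instead of raw tuples. */
while ((slot = ExecProcNode(outerPlan)) && !TupIsNull(slot))
	advance_partial_agg(localtab, slot);	/* hypothetical local hash table */

/* flush partial groups, not raw tuples, into the shared batches */
foreach_partial_group(localtab, grp)		/* hypothetical iterator */
{
	serialize_transition_state(grp);		/* needs serial/deserial functions */
	put_in_batch(batches[grp->hash % nbatches], grp);
}

/* after the barrier, the worker that claims a batch combines and finalizes */
while ((batchno = claim_next_batch(shared)) >= 0)
	combine_and_finalize(batches[batchno]);
```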

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com



Re: Re: parallel distinct union and aggregate support patch

From
"bucoo@sohu.com"
Date:
> If I understood correctly, the tuples emitted by Parallel Batch Sort
> in each process are ordered by (hash(key, ...) % npartitions, key,
> ...), but the path is claiming to be ordered by (key, ...), no?
> That's enough for Unique and Aggregate to give the correct answer,
> because they really only require equal keys to be consecutive (and in
> the same process), but maybe some other plan could break?

The path is not claiming to be ordered by (key, ...); it saves the PathKey(s) in BatchSortPath::batchkeys, not in Path::pathkeys.
I don't understand "but maybe some other plan could break": do you mean some other plan using this path? No, a BatchSortPath is only created for some special paths (Unique, GroupAgg, ...).
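Roughly, the distinction is the following (batchkeys comes from the patch; the other fields are guesses for illustration):

```c
typedef struct BatchSortPath
{
	Path		path;		/* path.pathkeys stays NIL: no global order is claimed */
	Path	   *subpath;	/* assumed: the partial input path */
	List	   *batchkeys;	/* sort keys that hold only within one batch */
	int			numBatches;	/* assumed: number of batches */
} BatchSortPath;
```

So a consumer that needs a true global sort order is never offered this path; only the special consumers that merely need equal keys grouped together use batchkeys.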


bucoo@sohu.com
 
Date: 2020-10-21 12:27
Subject: Re: parallel distinct union and aggregate support patch
On Tue, Oct 20, 2020 at 3:49 AM bucoo@sohu.com <bucoo@sohu.com> wrote:
> I wrote a patch to support parallel distinct, union and aggregate using a batch sort.
> steps:
>  1. generate a hash value for the group clause values, and save each tuple to a batch chosen by the hash value modulo the number of batches
>  2. at the end of the outer plan, wait for all the other workers to finish writing to the batches
>  3. each worker takes a unique batch number and calls tuplesort_performsort() to finish sorting that batch
>  4. return the rows of this batch
>  5. if not all batches are done, go to step 3
>
> The BatchSort plan makes sure tuples with equal group clause values are returned in the same range (and by the same worker), so a Unique (or GroupAggregate) plan can work.
 
Hi!
 
Interesting work!  In the past a few people have speculated about a
Parallel Repartition operator that could partition tuples a bit like
this, so that each process gets a different set of partitions.  Here
you combine that with a sort.  By doing both things in one node, you
avoid a lot of overheads (writing into a tuplestore once in the
repartitioning node, and then once again in the sort node, with tuples
being copied one-by-one between the two nodes).
 
If I understood correctly, the tuples emitted by Parallel Batch Sort
in each process are ordered by (hash(key, ...) % npartitions, key,
...), but the path is claiming to be ordered by (key, ...), no?
That's enough for Unique and Aggregate to give the correct answer,
because they really only require equal keys to be consecutive (and in
the same process), but maybe some other plan could break?

Re: Re: parallel distinct union and aggregate support patch

From
"bucoo@sohu.com"
Date:
> Interesting idea.  So IIUC, whenever a worker scans a tuple it
> will directly put it into the respective batch (shared tuple store),
> based on the hash of the grouping columns, and once all the workers
> are done preparing the batches, each worker will pick those batches
> one by one, perform the sort and finish the aggregation.  I think
> there is scope for improvement: instead of directly putting the tuple
> into the batch, what if the worker does the partial aggregation and
> then places the partially aggregated rows in the shared tuple store
> based on the hash value, and then the workers can pick the batches
> one by one.  Done this way, we can avoid doing large sorts.  And this
> approach can then also be used with the hash aggregate, I mean the
> data partially aggregated by the hash aggregate can be put into the
> respective batch.

Good idea. Batch sort is suitable when the aggregate produces many result rows;
with a large aggregate result, partial aggregation may run out of memory,
and all aggregate functions must support partial aggregation (with batch sort that is unnecessary).

Actually, I have written a batched hash store for hash aggregate (for PG11) along these lines,
but it does not write partial aggregates to the shared tuple store; it writes the original tuple and its hash value
to the shared tuple store. However, it does not support parallel grouping sets.
I am trying to write parallel hash aggregate support using the batched shared tuple store for PG14,
and that needs to support parallel grouping sets in hash aggregate.

Re: Re: parallel distinct union and aggregate support patch

From
Dilip Kumar
Date:
On Fri, Oct 23, 2020 at 11:58 AM bucoo@sohu.com <bucoo@sohu.com> wrote:
>
> > Interesting idea.  So IIUC, whenever a worker scans a tuple it
> > will directly put it into the respective batch (shared tuple store),
> > based on the hash of the grouping columns, and once all the workers
> > are done preparing the batches, each worker will pick those batches
> > one by one, perform the sort and finish the aggregation.  I think
> > there is scope for improvement: instead of directly putting the tuple
> > into the batch, what if the worker does the partial aggregation and
> > then places the partially aggregated rows in the shared tuple store
> > based on the hash value, and then the workers can pick the batches
> > one by one.  Done this way, we can avoid doing large sorts.  And this
> > approach can then also be used with the hash aggregate, I mean the
> > data partially aggregated by the hash aggregate can be put into the
> > respective batch.
>
> Good idea. Batch sort is suitable when the aggregate produces many result rows;
> with a large aggregate result, partial aggregation may run out of memory,
> and all aggregate functions must support partial aggregation (with batch sort that is unnecessary).
>
> Actually, I have written a batched hash store for hash aggregate (for PG11) along these lines,
> but it does not write partial aggregates to the shared tuple store; it writes the original tuple and its hash value
> to the shared tuple store. However, it does not support parallel grouping sets.
> I am trying to write parallel hash aggregate support using the batched shared tuple store for PG14,
> and that needs to support parallel grouping sets in hash aggregate.

I was trying to look into this patch to understand the logic in more
detail.  Actually, there are no comments at all so it's really hard to
understand what the code is trying to do.

I was reading the function below, which is the main entry point for
the batch sort.

+static TupleTableSlot *ExecBatchSortPrepare(PlanState *pstate)
+{
...
+ for (;;)
+ {
...
+ tuplesort_puttupleslot(state->batches[hash%node->numBatches], slot);
+ }
+
+ for (i=node->numBatches;i>0;)
+ tuplesort_performsort(state->batches[--i]);
+build_already_done_:
+ if (parallel)
+ {
+ for (i=node->numBatches;i>0;)
+ {
+ --i;
+ if (state->batches[i])
+ {
+ tuplesort_end(state->batches[i]);
+ state->batches[i] = NULL;
+ }
+ }

I did not understand this part: once each worker has performed
its local batch-wise sort, why are we clearing the batches?  I mean,
individual workers have their own batches, so eventually they are
supposed to get merged.  Can you explain this part?  It would also
be better if you could add comments.

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com



Re: parallel distinct union and aggregate support patch

From
Robert Haas
Date:
On Thu, Oct 22, 2020 at 5:08 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
> Interesting idea.  So IIUC, whenever a worker scans a tuple it
> will directly put it into the respective batch (shared tuple store),
> based on the hash of the grouping columns, and once all the workers
> are done preparing the batches, each worker will pick those batches
> one by one, perform the sort and finish the aggregation.  I think
> there is scope for improvement: instead of directly putting the tuple
> into the batch, what if the worker does the partial aggregation and
> then places the partially aggregated rows in the shared tuple store
> based on the hash value, and then the workers can pick the batches
> one by one.  Done this way, we can avoid doing large sorts.  And this
> approach can then also be used with the hash aggregate, I mean the
> data partially aggregated by the hash aggregate can be put into the
> respective batch.

I am not sure if this would be a win if the typical group size is
small and the transition state has to be serialized/deserialized.
Possibly we need multiple strategies, but I guess we'd have to test
performance to be sure.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: Re: parallel distinct union and aggregate support patch

From
Dilip Kumar
Date:
On Tue, Oct 27, 2020 at 3:27 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:
>
> On Fri, Oct 23, 2020 at 11:58 AM bucoo@sohu.com <bucoo@sohu.com> wrote:
> >
> > > Interesting idea.  So IIUC, whenever a worker scans a tuple it
> > > will directly put it into the respective batch (shared tuple store),
> > > based on the hash of the grouping columns, and once all the workers
> > > are done preparing the batches, each worker will pick those batches
> > > one by one, perform the sort and finish the aggregation.  I think
> > > there is scope for improvement: instead of directly putting the tuple
> > > into the batch, what if the worker does the partial aggregation and
> > > then places the partially aggregated rows in the shared tuple store
> > > based on the hash value, and then the workers can pick the batches
> > > one by one.  Done this way, we can avoid doing large sorts.  And this
> > > approach can then also be used with the hash aggregate, I mean the
> > > data partially aggregated by the hash aggregate can be put into the
> > > respective batch.
> >
> > Good idea. Batch sort is suitable when the aggregate produces many result rows;
> > with a large aggregate result, partial aggregation may run out of memory,
> > and all aggregate functions must support partial aggregation (with batch sort that is unnecessary).
> >
> > Actually, I have written a batched hash store for hash aggregate (for PG11) along these lines,
> > but it does not write partial aggregates to the shared tuple store; it writes the original tuple and its hash value
> > to the shared tuple store. However, it does not support parallel grouping sets.
> > I am trying to write parallel hash aggregate support using the batched shared tuple store for PG14,
> > and that needs to support parallel grouping sets in hash aggregate.
>
> I was trying to look into this patch to understand the logic in more
> detail.  Actually, there are no comments at all so it's really hard to
> understand what the code is trying to do.
>
> I was reading the function below, which is the main entry point for
> the batch sort.
>
> +static TupleTableSlot *ExecBatchSortPrepare(PlanState *pstate)
> +{
> ...
> + for (;;)
> + {
> ...
> + tuplesort_puttupleslot(state->batches[hash%node->numBatches], slot);
> + }
> +
> + for (i=node->numBatches;i>0;)
> + tuplesort_performsort(state->batches[--i]);
> +build_already_done_:
> + if (parallel)
> + {
> + for (i=node->numBatches;i>0;)
> + {
> + --i;
> + if (state->batches[i])
> + {
> + tuplesort_end(state->batches[i]);
> + state->batches[i] = NULL;
> + }
> + }
>
> I did not understand this part: once each worker has performed
> its local batch-wise sort, why are we clearing the batches?  I mean,
> individual workers have their own batches, so eventually they are
> supposed to get merged.  Can you explain this part?  It would also
> be better if you could add comments.

I think I got it.  IIUC, each worker initializes the shared
sort and performs the batch-wise sorting, and we wait on a
barrier so that all the workers can finish their sorting.  Once
that is done, the workers coordinate to pick the batches one by
one and perform the final merge for each batch.

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com



Re: parallel distinct union and aggregate support patch

From
Dilip Kumar
Date:
On Tue, Oct 27, 2020 at 5:43 PM Robert Haas <robertmhaas@gmail.com> wrote:
>
> On Thu, Oct 22, 2020 at 5:08 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
> > Interesting idea.  So IIUC, whenever a worker scans a tuple it
> > will directly put it into the respective batch (shared tuple store),
> > based on the hash of the grouping columns, and once all the workers
> > are done preparing the batches, each worker will pick those batches
> > one by one, perform the sort and finish the aggregation.  I think
> > there is scope for improvement: instead of directly putting the tuple
> > into the batch, what if the worker does the partial aggregation and
> > then places the partially aggregated rows in the shared tuple store
> > based on the hash value, and then the workers can pick the batches
> > one by one.  Done this way, we can avoid doing large sorts.  And this
> > approach can then also be used with the hash aggregate, I mean the
> > data partially aggregated by the hash aggregate can be put into the
> > respective batch.
>
> I am not sure if this would be a win if the typical group size is
> small and the transition state has to be serialized/deserialized.
> Possibly we need multiple strategies, but I guess we'd have to test
> performance to be sure.

+1

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com



Re: Re: parallel distinct union and aggregate support patch

From
"bucoo@sohu.com"
Date:
> On Tue, Oct 27, 2020 at 3:27 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:
> >
> > On Fri, Oct 23, 2020 at 11:58 AM bucoo@sohu.com <bucoo@sohu.com> wrote:
> > >
> > > > Interesting idea.  So IIUC, whenever a worker scans a tuple it
> > > > will directly put it into the respective batch (shared tuple store),
> > > > based on the hash of the grouping columns, and once all the workers
> > > > are done preparing the batches, each worker will pick those batches
> > > > one by one, perform the sort and finish the aggregation.  I think
> > > > there is scope for improvement: instead of directly putting the tuple
> > > > into the batch, what if the worker does the partial aggregation and
> > > > then places the partially aggregated rows in the shared tuple store
> > > > based on the hash value, and then the workers can pick the batches
> > > > one by one.  Done this way, we can avoid doing large sorts.  And this
> > > > approach can then also be used with the hash aggregate, I mean the
> > > > data partially aggregated by the hash aggregate can be put into the
> > > > respective batch.
> > >
> > > Good idea. Batch sort is suitable when the aggregate produces many result rows;
> > > with a large aggregate result, partial aggregation may run out of memory,
> > > and all aggregate functions must support partial aggregation (with batch sort that is unnecessary).
> > >
> > > Actually, I have written a batched hash store for hash aggregate (for PG11) along these lines,
> > > but it does not write partial aggregates to the shared tuple store; it writes the original tuple and its hash value
> > > to the shared tuple store. However, it does not support parallel grouping sets.
> > > I am trying to write parallel hash aggregate support using the batched shared tuple store for PG14,
> > > and that needs to support parallel grouping sets in hash aggregate.
> >
> > I was trying to look into this patch to understand the logic in more
> > detail.  Actually, there are no comments at all so it's really hard to
> > understand what the code is trying to do.
> >
> > I was reading the function below, which is the main entry point for
> > the batch sort.
> >
> > +static TupleTableSlot *ExecBatchSortPrepare(PlanState *pstate)
> > +{
> > ...
> > + for (;;)
> > + {
> > ...
> > + tuplesort_puttupleslot(state->batches[hash%node->numBatches], slot);
> > + }
> > +
> > + for (i=node->numBatches;i>0;)
> > + tuplesort_performsort(state->batches[--i]);
> > +build_already_done_:
> > + if (parallel)
> > + {
> > + for (i=node->numBatches;i>0;)
> > + {
> > + --i;
> > + if (state->batches[i])
> > + {
> > + tuplesort_end(state->batches[i]);
> > + state->batches[i] = NULL;
> > + }
> > + }
> >
> > I did not understand this part: once each worker has performed
> > its local batch-wise sort, why are we clearing the batches?  I mean,
> > individual workers have their own batches, so eventually they are
> > supposed to get merged.  Can you explain this part?  It would also
> > be better if you could add comments.
>  
> I think I got it.  IIUC, each worker initializes the shared
> sort and performs the batch-wise sorting, and we wait on a
> barrier so that all the workers can finish their sorting.  Once
> that is done, the workers coordinate to pick the batches one by
> one and perform the final merge for each batch.

Yes, that's it. Each worker opens the shared sort as a "worker" (nodeBatchSort.c:134);
at the end, after all the workers have performed their sorts, each one picks a batch and opens it as the "leader" (nodeBatchSort.c:54).
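In terms of the parallel tuplesort API in tuplesort.h, that worker/leader dance presumably looks something like the following (the per-batch Sharedsort array is my guess at the shared-memory layout, not a quote from the patch):

```c
SortCoordinate	coordinate = (SortCoordinate) palloc0(sizeof(SortCoordinateData));
Tuplesortstate *ts;

/* each worker attaches to a batch's shared sort, feeds it, and sorts its run */
coordinate->isWorker = true;
coordinate->nParticipants = -1;
coordinate->sharedsort = shared->batchsorts[batchno];	/* hypothetical layout */
ts = tuplesort_begin_heap(tupDesc, nkeys, attNums, sortOperators,
						  collations, nullsFirst, work_mem,
						  coordinate, false);
/* ... tuplesort_puttupleslot() for this batch's rows ... */
tuplesort_performsort(ts);
tuplesort_end(ts);

/* the worker that later claims this batch reopens it as the "leader",
 * merging the per-worker runs while reading the rows back */
coordinate->isWorker = false;
coordinate->nParticipants = nworkers;
coordinate->sharedsort = shared->batchsorts[batchno];
ts = tuplesort_begin_heap(tupDesc, nkeys, attNums, sortOperators,
						  collations, nullsFirst, work_mem,
						  coordinate, false);
```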

Re: parallel distinct union and aggregate support patch

From
"bucoo@sohu.com"
Date:
Hi
Here is a patch for parallel distinct, union, aggregate and grouping sets support using a batched hash aggregate.
Please review.

how to use:
set enable_batch_hashagg = on

how to work:
like batch sort, but without sorting each batch; it just saves the hash value with each row
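Presumably this rides on the shared tuplestore API (sharedtuplestore.h), using the per-tuple meta-data slot to carry the hash value so that it never has to be recomputed. A sketch of that idea, with invented variable and helper names (it assumes the store was created with meta_data_size = sizeof(uint32)):

```c
uint32		hash;
bool		shouldFree;
MinimalTuple mintup;

/* write side: each worker appends (hash, tuple) to the batch the hash selects */
hash = compute_group_hash(slot);				/* hypothetical helper */
sts_puttuple(batches[hash % nbatches], &hash,	/* hash travels as meta-data */
			 ExecFetchSlotMinimalTuple(slot, &shouldFree));

/* read side: after all writers are done, the batch's owner scans the whole
 * batch and hands the saved hash straight to the hash-agg lookup */
sts_begin_parallel_scan(batches[batchno]);
while ((mintup = sts_parallel_scan_next(batches[batchno], &hash)) != NULL)
	agg_lookup_with_hash(aggstate, mintup, hash);	/* hypothetical */
```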

unfinished work:
rescan is not supported yet; you are welcome to add it. Actually, I don't really understand how rescan works in parallel mode.

other:
patch 1 is based on branch master (80f8eb79e24d9b7963eaf17ce846667e2c6b6e6f)
patch 1 and 2: see https://www.postgresql.org/message-id/2020101922424962544053%40sohu.com
patch 3:
 extend the shared tuple store and add a batch store module.
 By the way, it uses atomic operations instead of an LWLock for handing out the shared tuple store's next read page (see the sketch below).
patch 4:
 use the batched hash aggregate to support parallelism
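On the LWLock-to-atomics change mentioned under patch 3, the shared read-page hand-off presumably turns into something like this (pg_atomic_fetch_add_u32() is the real primitive; the field and constant names here are illustrative, and read_page would have to become a pg_atomic_uint32 for this to work):

```c
/* before: readers serialize on a lock to get the next chunk of pages */
LWLockAcquire(&participant->lock, LW_EXCLUSIVE);
page = participant->read_page;
participant->read_page += STS_CHUNK_PAGES;
LWLockRelease(&participant->lock);

/* after: one lock-free atomic fetch-and-add hands out the chunks */
page = pg_atomic_fetch_add_u32(&participant->read_page, STS_CHUNK_PAGES);
```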

 
From: bucoo@sohu.com
Date: 2020-10-19 22:42
To: pgsql-hackers
Subject: parallel distinct union and aggregate support patch
Hi hackers,
I wrote a patch to support parallel distinct, union and aggregate using a batch sort.
steps:
 1. generate a hash value for the group clause values, and save each tuple to a batch chosen by the hash value modulo the number of batches
 2. at the end of the outer plan, wait for all the other workers to finish writing to the batches
 3. each worker takes a unique batch number and calls tuplesort_performsort() to finish sorting that batch
 4. return the rows of this batch
 5. if not all batches are done, go to step 3

The BatchSort plan makes sure tuples with equal group clause values are returned in the same range (and by the same worker), so a Unique (or GroupAggregate) plan can work.

Patch 2 adds parallel aggregate on top of this; it is a simple use of BatchSort,
but the regression tests fail for partitionwise aggregation because the plan changes
from GatherMerge->Sort->Append->...
to   Sort->Gather->Append->...
I have no idea how to fix that.

With the same idea I wrote a batched shared tuple store for HashAgg in our PostgreSQL fork; I will send a patch for PG14 when I finish it.

The following is the same description, originally written in Chinese:
My English is not good, so I am writing some Chinese here; please help correct anything wrong in the English above.
How BatchSort works:
 1. compute a hash value from the group clause and place each row into a batch chosen by the hash value modulo the batch count
 2. once the lower plan has returned all its rows, wait for all the other worker processes to finish
 3. each worker claims one unique batch and calls tuplesort_performsort() to complete the final sort
 4. return all rows of this batch
 5. if not all batches have been read, return to step 3
The BatchSort plan guarantees that equal rows (by the grouping expressions) are returned within the same cycle, so the deduplication- and grouping-related plans can work correctly.
The second patch supports parallel grouping: grouping is done only once, instead of each parallel worker doing its own grouping and the leader process then doing a second grouping pass.
This patch makes the partitionwise aggregation case in the regression tests fail; the original query plan has changed.
The patch only implements one simple way of using the BatchSort plan; other usages may still need to be added.

With the same idea I wrote a HashAgg using a shared tuple store in our AntDB version (the latest version is not yet open source); I will post it after porting it to PG14.
A small advertisement: please check out AntDB, AsiaInfo's PG-based distributed database product, open source at https://github.com/ADBSQL/AntDB

bucoo@sohu.com
Attachment

Re: parallel distinct union and aggregate support patch

From
Tomas Vondra
Date:
Hi,

On Wed, Oct 28, 2020 at 05:37:40PM +0800, bucoo@sohu.com wrote:
>Hi
>Here is a patch for parallel distinct, union, aggregate and grouping sets support using a batched hash aggregate.
>Please review.
>
>how to use:
>set enable_batch_hashagg = on
>
>how to work:
>like batch sort, but without sorting each batch; it just saves the hash value with each row
>
>unfinished work:
>rescan is not supported yet; you are welcome to add it. Actually, I don't really understand how rescan works in parallel mode.
>
>other:
>patch 1 is based on branch master (80f8eb79e24d9b7963eaf17ce846667e2c6b6e6f)
>patch 1 and 2: see https://www.postgresql.org/message-id/2020101922424962544053%40sohu.com
>patch 3:
> extend the shared tuple store and add a batch store module.
> By the way, it uses atomic operations instead of an LWLock for handing out the shared tuple store's next read page.
>patch 4:
> use the batched hash aggregate to support parallelism
>

Thanks for the patch!

Two generic comments:

1) It's better to always include the whole patch series - including the
parts that have not changed. Otherwise people have to scavenge the
thread and search for all the pieces, which may be a source of issues.
Also, it confuses the patch tester [1] which tries to apply patches from
a single message, so it will fail for this one.

2) I suggest you try to describe the goal of these patches, using some
example queries, explain output etc. Right now the reviewers have to
reverse engineer the patches and deduce what the intention was, which
may be causing unnecessary confusion etc. If this was my patch, I'd try
to create a couple examples (CREATE TABLE + SELECT + EXPLAIN) showing
how the patch changes the query plan, showing speedup etc.


I'd like to do a review and some testing, and this would make it much
easier for me.


kind regards

-- 
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services 



Re: Re: parallel distinct union and aggregate support patch

From
"bucoo@sohu.com"
Date:
> 1) It's better to always include the whole patch series - including the
> parts that have not changed. Otherwise people have to scavenge the
> thread and search for all the pieces, which may be a source of issues.
> Also, it confuses the patch tester [1] which tries to apply patches from
> a single message, so it will fail for this one.
 Patches 3 and 4 do not depend on 1 and 2 in code.
 But applying patches 3 and 4 directly will fail, because
 they were written on top of 1 and 2.
 I can generate a new single patch if you need one.

> 2) I suggest you try to describe the goal of these patches, using some
> example queries, explain output etc. Right now the reviewers have to
> reverse engineer the patches and deduce what the intention was, which
> may be causing unnecessary confusion etc. If this was my patch, I'd try
> to create a couple examples (CREATE TABLE + SELECT + EXPLAIN) showing
> how the patch changes the query plan, showing speedup etc.
 I have written some example queries into the regression tests, covering "unique", "union",
 "group by" and "group by grouping sets".
 Here are my tests; they are not in the regression tests:
```sql
begin;
create table gtest(id integer, txt text);
insert into gtest select t1.id,'txt'||t1.id from (select generate_series(1,1000*1000) id) t1,(select generate_series(1,10) id) t2;
analyze gtest;
commit;
set jit = off;
\timing on
```
normal aggregate times
```
set enable_batch_hashagg = off;
explain (costs off,analyze,verbose)
select sum(id),txt from gtest group by txt;
                                                 QUERY PLAN
-------------------------------------------------------------------------------------------------------------
 Finalize GroupAggregate (actual time=6469.279..8947.024 rows=1000000 loops=1)
   Output: sum(id), txt
   Group Key: gtest.txt
   ->  Gather Merge (actual time=6469.245..8165.930 rows=1000058 loops=1)
         Output: txt, (PARTIAL sum(id))
         Workers Planned: 2
         Workers Launched: 2
         ->  Sort (actual time=6356.471..7133.832 rows=333353 loops=3)
               Output: txt, (PARTIAL sum(id))
               Sort Key: gtest.txt
               Sort Method: external merge  Disk: 11608kB
               Worker 0:  actual time=6447.665..7349.431 rows=317512 loops=1
                 Sort Method: external merge  Disk: 10576kB
               Worker 1:  actual time=6302.882..7061.157 rows=333301 loops=1
                 Sort Method: external merge  Disk: 11112kB
               ->  Partial HashAggregate (actual time=2591.487..4430.437 rows=333353 loops=3)
                     Output: txt, PARTIAL sum(id)
                     Group Key: gtest.txt
                     Batches: 17  Memory Usage: 4241kB  Disk Usage: 113152kB
                     Worker 0:  actual time=2584.345..4486.407 rows=317512 loops=1
                       Batches: 17  Memory Usage: 4241kB  Disk Usage: 101392kB
                     Worker 1:  actual time=2584.369..4393.244 rows=333301 loops=1
                       Batches: 17  Memory Usage: 4241kB  Disk Usage: 112832kB
                     ->  Parallel Seq Scan on public.gtest (actual time=0.691..603.990 rows=3333333 loops=3)
                           Output: id, txt
                           Worker 0:  actual time=0.104..607.146 rows=3174970 loops=1
                           Worker 1:  actual time=0.100..603.951 rows=3332785 loops=1
 Planning Time: 0.226 ms
 Execution Time: 9021.058 ms
(29 rows)

Time: 9022.251 ms (00:09.022)

set enable_batch_hashagg = on;
explain (costs off,analyze,verbose)
select sum(id),txt from gtest group by txt;
                                           QUERY PLAN
-------------------------------------------------------------------------------------------------
 Gather (actual time=3116.666..5740.826 rows=1000000 loops=1)
   Output: (sum(id)), txt
   Workers Planned: 2
   Workers Launched: 2
   ->  Parallel BatchHashAggregate (actual time=3103.181..5464.948 rows=333333 loops=3)
         Output: sum(id), txt
         Group Key: gtest.txt
         Worker 0:  actual time=3094.550..5486.992 rows=326082 loops=1
         Worker 1:  actual time=3099.562..5480.111 rows=324729 loops=1
         ->  Parallel Seq Scan on public.gtest (actual time=0.791..656.601 rows=3333333 loops=3)
               Output: id, txt
               Worker 0:  actual time=0.080..646.053 rows=3057680 loops=1
               Worker 1:  actual time=0.070..662.754 rows=3034370 loops=1
 Planning Time: 0.243 ms
 Execution Time: 5788.981 ms
(15 rows)

Time: 5790.143 ms (00:05.790)
```

grouping sets times
```
set enable_batch_hashagg = off;
explain (costs off,analyze,verbose)
select sum(id),txt from gtest group by grouping sets(id,txt,());
                                        QUERY PLAN
------------------------------------------------------------------------------------------
 GroupAggregate (actual time=9454.707..38921.885 rows=2000001 loops=1)
   Output: sum(id), txt, id
   Group Key: gtest.id
   Group Key: ()
   Sort Key: gtest.txt
     Group Key: gtest.txt
   ->  Sort (actual time=9454.679..11804.071 rows=10000000 loops=1)
         Output: txt, id
         Sort Key: gtest.id
         Sort Method: external merge  Disk: 254056kB
         ->  Seq Scan on public.gtest (actual time=2.250..2419.031 rows=10000000 loops=1)
               Output: txt, id
 Planning Time: 0.230 ms
 Execution Time: 39203.883 ms
(14 rows)

Time: 39205.339 ms (00:39.205)

set enable_batch_hashagg = on;
explain (costs off,analyze,verbose)
select sum(id),txt from gtest group by grouping sets(id,txt,());
                                           QUERY PLAN
-------------------------------------------------------------------------------------------------
 Gather (actual time=5931.776..14353.957 rows=2000001 loops=1)
   Output: (sum(id)), txt, id
   Workers Planned: 2
   Workers Launched: 2
   ->  Parallel BatchHashAggregate (actual time=5920.963..13897.852 rows=666667 loops=3)
         Output: sum(id), txt, id
         Group Key: gtest.id
         Group Key: ()
         Group Key: gtest.txt
         Worker 0:  actual time=5916.370..14062.461 rows=513810 loops=1
         Worker 1:  actual time=5916.037..13932.847 rows=775901 loops=1
         ->  Parallel Seq Scan on public.gtest (actual time=0.399..688.273 rows=3333333 loops=3)
               Output: id, txt
               Worker 0:  actual time=0.052..690.955 rows=3349990 loops=1
               Worker 1:  actual time=0.050..691.595 rows=3297070 loops=1
 Planning Time: 0.157 ms
 Execution Time: 14598.416 ms
(17 rows)

Time: 14599.437 ms (00:14.599)
```

Re: Re: parallel distinct union and aggregate support patch

From
Dilip Kumar
Date:
On Thu, Oct 29, 2020 at 12:53 PM bucoo@sohu.com <bucoo@sohu.com> wrote:
>
> > 1) It's better to always include the whole patch series - including the
> > parts that have not changed. Otherwise people have to scavenge the
> > thread and search for all the pieces, which may be a source of issues.
> > Also, it confuses the patch tester [1] which tries to apply patches from
> > a single message, so it will fail for this one.
>  Patches 3 and 4 do not depend on 1 and 2 in code.
>  But applying patches 3 and 4 directly will fail, because
>  they were written on top of 1 and 2.
>  I can generate a new single patch if you need one.
>
> > 2) I suggest you try to describe the goal of these patches, using some
> > example queries, explain output etc. Right now the reviewers have to
> > reverse engineer the patches and deduce what the intention was, which
> > may be causing unnecessary confusion etc. If this was my patch, I'd try
> > to create a couple examples (CREATE TABLE + SELECT + EXPLAIN) showing
> > how the patch changes the query plan, showing speedup etc.
>  I have written some example queries into the regression tests, covering "unique", "union",
>  "group by" and "group by grouping sets".
>  Here are my tests; they are not in the regression tests:
> ```sql
> begin;
> create table gtest(id integer, txt text);
> insert into gtest select t1.id,'txt'||t1.id from (select generate_series(1,1000*1000) id) t1,(select generate_series(1,10) id) t2;
> analyze gtest;
> commit;
> set jit = off;
> \timing on
> ```
> normal aggregate times
> ```
> set enable_batch_hashagg = off;
> explain (costs off,analyze,verbose)
> select sum(id),txt from gtest group by txt;
>                                                  QUERY PLAN
> -------------------------------------------------------------------------------------------------------------
>  Finalize GroupAggregate (actual time=6469.279..8947.024 rows=1000000 loops=1)
>    Output: sum(id), txt
>    Group Key: gtest.txt
>    ->  Gather Merge (actual time=6469.245..8165.930 rows=1000058 loops=1)
>          Output: txt, (PARTIAL sum(id))
>          Workers Planned: 2
>          Workers Launched: 2
>          ->  Sort (actual time=6356.471..7133.832 rows=333353 loops=3)
>                Output: txt, (PARTIAL sum(id))
>                Sort Key: gtest.txt
>                Sort Method: external merge  Disk: 11608kB
>                Worker 0:  actual time=6447.665..7349.431 rows=317512 loops=1
>                  Sort Method: external merge  Disk: 10576kB
>                Worker 1:  actual time=6302.882..7061.157 rows=333301 loops=1
>                  Sort Method: external merge  Disk: 11112kB
>                ->  Partial HashAggregate (actual time=2591.487..4430.437 rows=333353 loops=3)
>                      Output: txt, PARTIAL sum(id)
>                      Group Key: gtest.txt
>                      Batches: 17  Memory Usage: 4241kB  Disk Usage: 113152kB
>                      Worker 0:  actual time=2584.345..4486.407 rows=317512 loops=1
>                        Batches: 17  Memory Usage: 4241kB  Disk Usage: 101392kB
>                      Worker 1:  actual time=2584.369..4393.244 rows=333301 loops=1
>                        Batches: 17  Memory Usage: 4241kB  Disk Usage: 112832kB
>                      ->  Parallel Seq Scan on public.gtest (actual time=0.691..603.990 rows=3333333 loops=3)
>                            Output: id, txt
>                            Worker 0:  actual time=0.104..607.146 rows=3174970 loops=1
>                            Worker 1:  actual time=0.100..603.951 rows=3332785 loops=1
>  Planning Time: 0.226 ms
>  Execution Time: 9021.058 ms
> (29 rows)
>
> Time: 9022.251 ms (00:09.022)
>
> set enable_batch_hashagg = on;
> explain (costs off,analyze,verbose)
> select sum(id),txt from gtest group by txt;
>                                            QUERY PLAN
> -------------------------------------------------------------------------------------------------
>  Gather (actual time=3116.666..5740.826 rows=1000000 loops=1)
>    Output: (sum(id)), txt
>    Workers Planned: 2
>    Workers Launched: 2
>    ->  Parallel BatchHashAggregate (actual time=3103.181..5464.948 rows=333333 loops=3)
>          Output: sum(id), txt
>          Group Key: gtest.txt
>          Worker 0:  actual time=3094.550..5486.992 rows=326082 loops=1
>          Worker 1:  actual time=3099.562..5480.111 rows=324729 loops=1
>          ->  Parallel Seq Scan on public.gtest (actual time=0.791..656.601 rows=3333333 loops=3)
>                Output: id, txt
>                Worker 0:  actual time=0.080..646.053 rows=3057680 loops=1
>                Worker 1:  actual time=0.070..662.754 rows=3034370 loops=1
>  Planning Time: 0.243 ms
>  Execution Time: 5788.981 ms
> (15 rows)
>
> Time: 5790.143 ms (00:05.790)
> ```
>
> grouping sets times
> ```
> set enable_batch_hashagg = off;
> explain (costs off,analyze,verbose)
> select sum(id),txt from gtest group by grouping sets(id,txt,());
>                                         QUERY PLAN
> ------------------------------------------------------------------------------------------
>  GroupAggregate (actual time=9454.707..38921.885 rows=2000001 loops=1)
>    Output: sum(id), txt, id
>    Group Key: gtest.id
>    Group Key: ()
>    Sort Key: gtest.txt
>      Group Key: gtest.txt
>    ->  Sort (actual time=9454.679..11804.071 rows=10000000 loops=1)
>          Output: txt, id
>          Sort Key: gtest.id
>          Sort Method: external merge  Disk: 254056kB
>          ->  Seq Scan on public.gtest (actual time=2.250..2419.031 rows=10000000 loops=1)
>                Output: txt, id
>  Planning Time: 0.230 ms
>  Execution Time: 39203.883 ms
> (14 rows)
>
> Time: 39205.339 ms (00:39.205)
>
> set enable_batch_hashagg = on;
> explain (costs off,analyze,verbose)
> select sum(id),txt from gtest group by grouping sets(id,txt,());
>                                            QUERY PLAN
> -------------------------------------------------------------------------------------------------
>  Gather (actual time=5931.776..14353.957 rows=2000001 loops=1)
>    Output: (sum(id)), txt, id
>    Workers Planned: 2
>    Workers Launched: 2
>    ->  Parallel BatchHashAggregate (actual time=5920.963..13897.852 rows=666667 loops=3)
>          Output: sum(id), txt, id
>          Group Key: gtest.id
>          Group Key: ()
>          Group Key: gtest.txt
>          Worker 0:  actual time=5916.370..14062.461 rows=513810 loops=1
>          Worker 1:  actual time=5916.037..13932.847 rows=775901 loops=1
>          ->  Parallel Seq Scan on public.gtest (actual time=0.399..688.273 rows=3333333 loops=3)
>                Output: id, txt
>                Worker 0:  actual time=0.052..690.955 rows=3349990 loops=1
>                Worker 1:  actual time=0.050..691.595 rows=3297070 loops=1
>  Planning Time: 0.157 ms
>  Execution Time: 14598.416 ms
> (17 rows)
>
> Time: 14599.437 ms (00:14.599)
> ```

I have done some performance testing with TPCH to see the impact on
the different query plans.  I could see a lot of plan changes
across various queries, but out of those there are a few queries where
these patches gave a noticeable gain: query13 and query17 (I have
attached the plans for these 2 queries).

Test details:
----------------
TPCH scale factor 50 (database size 112GB)
work_mem 20GB, shared buffers: 20GB max_parallel_workers_per_gather=4

Machine information:
Architecture:          x86_64
CPU(s):                56
Thread(s) per core:    2
Core(s) per socket:    14
Socket(s):             2
NUMA node(s):          2
Model name:            Intel(R) Xeon(R) CPU E5-2695 v3 @ 2.30GHz

Observation:
In the TPCH test, I have noticed that the major gain we are getting
from this patch is that we are able to use parallelism where we could
not before due to the limitation of parallel aggregate.  Basically,
for computing the final aggregated result we need to break the
parallelism, because the workers only perform the partial aggregate;
after that, we have to gather all the partially aggregated results
and do the finalize aggregate.  Now, with this patch, since we are
batching the results, we are able to compute the final aggregate
within the workers themselves, and that enables us to use parallelism
in more cases.

Example:
If we observe the output of plan 13 (13.explain_head.out), the
subquery performs the aggregate and the outer query does the grouping
on the aggregated value of the subquery.  Because of this, we do not
choose parallelism at the head: in the inner aggregation the number
of groups is huge, so choosing parallelism would mean transferring a
lot of tuples through the tuple queue and serializing/deserializing
that many transition values.  And since the outer query needs the
final aggregated results from the inner query, we cannot choose
parallelism there.  Now with the batch aggregate
(13.explain_patch.out), we are able to compute the finalize
aggregation within the workers themselves, and that enables us to
continue the parallelism up to the top node.  The execution time for
this query is reduced from 238sec to 57sec, which is 4X faster.

I will perform some more tests with different scale factors and
analyze the behavior of this.






--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

Attachment

Re: Re: parallel distinct union and aggregate support patch

From
Dilip Kumar
Date:
On Tue, Nov 3, 2020 at 6:06 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:
>
> On Thu, Oct 29, 2020 at 12:53 PM bucoo@sohu.com <bucoo@sohu.com> wrote:
> >
> > > 1) It's better to always include the whole patch series - including the
> > > parts that have not changed. Otherwise people have to scavenge the
> > > thread and search for all the pieces, which may be a source of issues.
> > > Also, it confuses the patch tester [1] which tries to apply patches from
> > > a single message, so it will fail for this one.
> >  Patches 3 and 4 do not depend on 1 and 2 in code.
> >  But applying patches 3 and 4 directly will fail, because
> >  they were written on top of 1 and 2.
> >  I can generate a new single patch if you need one.
> >
> > > 2) I suggest you try to describe the goal of these patches, using some
> > > example queries, explain output etc. Right now the reviewers have to
> > > reverse engineer the patches and deduce what the intention was, which
> > > may be causing unnecessary confusion etc. If this was my patch, I'd try
> > > to create a couple examples (CREATE TABLE + SELECT + EXPLAIN) showing
> > > how the patch changes the query plan, showing speedup etc.
> >  I have written some example queries into the regression tests, covering "unique", "union",
> >  "group by" and "group by grouping sets".
> >  Here are my tests; they are not in the regression tests:
> > ```sql
> > begin;
> > create table gtest(id integer, txt text);
> > insert into gtest select t1.id,'txt'||t1.id from (select generate_series(1,1000*1000) id) t1,(select generate_series(1,10) id) t2;
> > analyze gtest;
> > commit;
> > set jit = off;
> > \timing on
> > ```
> > normal aggregate times
> > ```
> > set enable_batch_hashagg = off;
> > explain (costs off,analyze,verbose)
> > select sum(id),txt from gtest group by txt;
> >                                                  QUERY PLAN
> > -------------------------------------------------------------------------------------------------------------
> >  Finalize GroupAggregate (actual time=6469.279..8947.024 rows=1000000 loops=1)
> >    Output: sum(id), txt
> >    Group Key: gtest.txt
> >    ->  Gather Merge (actual time=6469.245..8165.930 rows=1000058 loops=1)
> >          Output: txt, (PARTIAL sum(id))
> >          Workers Planned: 2
> >          Workers Launched: 2
> >          ->  Sort (actual time=6356.471..7133.832 rows=333353 loops=3)
> >                Output: txt, (PARTIAL sum(id))
> >                Sort Key: gtest.txt
> >                Sort Method: external merge  Disk: 11608kB
> >                Worker 0:  actual time=6447.665..7349.431 rows=317512 loops=1
> >                  Sort Method: external merge  Disk: 10576kB
> >                Worker 1:  actual time=6302.882..7061.157 rows=333301 loops=1
> >                  Sort Method: external merge  Disk: 11112kB
> >                ->  Partial HashAggregate (actual time=2591.487..4430.437 rows=333353 loops=3)
> >                      Output: txt, PARTIAL sum(id)
> >                      Group Key: gtest.txt
> >                      Batches: 17  Memory Usage: 4241kB  Disk Usage: 113152kB
> >                      Worker 0:  actual time=2584.345..4486.407 rows=317512 loops=1
> >                        Batches: 17  Memory Usage: 4241kB  Disk Usage: 101392kB
> >                      Worker 1:  actual time=2584.369..4393.244 rows=333301 loops=1
> >                        Batches: 17  Memory Usage: 4241kB  Disk Usage: 112832kB
> >                      ->  Parallel Seq Scan on public.gtest (actual time=0.691..603.990 rows=3333333 loops=3)
> >                            Output: id, txt
> >                            Worker 0:  actual time=0.104..607.146 rows=3174970 loops=1
> >                            Worker 1:  actual time=0.100..603.951 rows=3332785 loops=1
> >  Planning Time: 0.226 ms
> >  Execution Time: 9021.058 ms
> > (29 rows)
> >
> > Time: 9022.251 ms (00:09.022)
> >
> > set enable_batch_hashagg = on;
> > explain (costs off,analyze,verbose)
> > select sum(id),txt from gtest group by txt;
> >                                            QUERY PLAN
> > -------------------------------------------------------------------------------------------------
> >  Gather (actual time=3116.666..5740.826 rows=1000000 loops=1)
> >    Output: (sum(id)), txt
> >    Workers Planned: 2
> >    Workers Launched: 2
> >    ->  Parallel BatchHashAggregate (actual time=3103.181..5464.948 rows=333333 loops=3)
> >          Output: sum(id), txt
> >          Group Key: gtest.txt
> >          Worker 0:  actual time=3094.550..5486.992 rows=326082 loops=1
> >          Worker 1:  actual time=3099.562..5480.111 rows=324729 loops=1
> >          ->  Parallel Seq Scan on public.gtest (actual time=0.791..656.601 rows=3333333 loops=3)
> >                Output: id, txt
> >                Worker 0:  actual time=0.080..646.053 rows=3057680 loops=1
> >                Worker 1:  actual time=0.070..662.754 rows=3034370 loops=1
> >  Planning Time: 0.243 ms
> >  Execution Time: 5788.981 ms
> > (15 rows)
> >
> > Time: 5790.143 ms (00:05.790)
> > ```
> >
> > grouping sets times
> > ```
> > set enable_batch_hashagg = off;
> > explain (costs off,analyze,verbose)
> > select sum(id),txt from gtest group by grouping sets(id,txt,());
> >                                         QUERY PLAN
> > ------------------------------------------------------------------------------------------
> >  GroupAggregate (actual time=9454.707..38921.885 rows=2000001 loops=1)
> >    Output: sum(id), txt, id
> >    Group Key: gtest.id
> >    Group Key: ()
> >    Sort Key: gtest.txt
> >      Group Key: gtest.txt
> >    ->  Sort (actual time=9454.679..11804.071 rows=10000000 loops=1)
> >          Output: txt, id
> >          Sort Key: gtest.id
> >          Sort Method: external merge  Disk: 254056kB
> >          ->  Seq Scan on public.gtest (actual time=2.250..2419.031 rows=10000000 loops=1)
> >                Output: txt, id
> >  Planning Time: 0.230 ms
> >  Execution Time: 39203.883 ms
> > (14 rows)
> >
> > Time: 39205.339 ms (00:39.205)
> >
> > set enable_batch_hashagg = on;
> > explain (costs off,analyze,verbose)
> > select sum(id),txt from gtest group by grouping sets(id,txt,());
> >                                            QUERY PLAN
> > -------------------------------------------------------------------------------------------------
> >  Gather (actual time=5931.776..14353.957 rows=2000001 loops=1)
> >    Output: (sum(id)), txt, id
> >    Workers Planned: 2
> >    Workers Launched: 2
> >    ->  Parallel BatchHashAggregate (actual time=5920.963..13897.852 rows=666667 loops=3)
> >          Output: sum(id), txt, id
> >          Group Key: gtest.id
> >          Group Key: ()
> >          Group Key: gtest.txt
> >          Worker 0:  actual time=5916.370..14062.461 rows=513810 loops=1
> >          Worker 1:  actual time=5916.037..13932.847 rows=775901 loops=1
> >          ->  Parallel Seq Scan on public.gtest (actual time=0.399..688.273 rows=3333333 loops=3)
> >                Output: id, txt
> >                Worker 0:  actual time=0.052..690.955 rows=3349990 loops=1
> >                Worker 1:  actual time=0.050..691.595 rows=3297070 loops=1
> >  Planning Time: 0.157 ms
> >  Execution Time: 14598.416 ms
> > (17 rows)
> >
> > Time: 14599.437 ms (00:14.599)
> > ```
>
> I have done some performance testing with TPCH to see the impact on
> the different query plans.  I could see a lot of plan changes
> across various queries, but out of those there are a few queries where
> these patches gave a noticeable gain: query13 and query17 (I have
> attached the plans for these 2 queries).
>
> Test details:
> ----------------
> TPCH scale factor 50 (database size 112GB)
> work_mem 20GB, shared buffers: 20GB max_parallel_workers_per_gather=4
>
> Machine information:
> Architecture:          x86_64
> CPU(s):                56
> Thread(s) per core:    2
> Core(s) per socket:    14
> Socket(s):             2
> NUMA node(s):          2
> Model name:            Intel(R) Xeon(R) CPU E5-2695 v3 @ 2.30GHz
>
> Observation:
> In the TPCH test, I have noticed that the major gain we are getting
> from this patch is that we are able to use parallelism where we could
> not before due to the limitation of parallel aggregate.  Basically,
> for computing the final aggregated result we need to break the
> parallelism, because the workers only perform the partial aggregate;
> after that, we have to gather all the partially aggregated results
> and do the finalize aggregate.  Now, with this patch, since we are
> batching the results, we are able to compute the final aggregate
> within the workers themselves, and that enables us to use parallelism
> in more cases.
>
> Example:
> If we observe the output of plan 13 (13.explain_head.out), the
> subquery performs the aggregate and the outer query does the grouping
> on the aggregated value of the subquery.  Because of this, we do not
> choose parallelism at the head: in the inner aggregation the number
> of groups is huge, so choosing parallelism would mean transferring a
> lot of tuples through the tuple queue and serializing/deserializing
> that many transition values.  And since the outer query needs the
> final aggregated results from the inner query, we cannot choose
> parallelism there.  Now with the batch aggregate
> (13.explain_patch.out), we are able to compute the finalize
> aggregation within the workers themselves, and that enables us to
> continue the parallelism up to the top node.  The execution time for
> this query is reduced from 238sec to 57sec, which is 4X faster.
>
> I will perform some more tests with different scale factors and
> analyze the behavior of this.

I have started reviewing these patches; I have a couple of review comments.

Some general comments to make the code more readable:

1. Comments are missing from the patch; there are not even function
header comments to explain the overall idea of each function.
   I think adding comments will make it easier to review the patch.

2. The code is not written as per the Postgres coding guidelines; the
common problems observed in the patch are
  a) There should be an empty line after the variable declaration section
  b) In a function definition, the function return type and the
function name should not be on the same line

Change

+static bool ExecNextParallelBatchSort(BatchSortState *state)
{
}
to
static bool
ExecNextParallelBatchSort(BatchSortState *state)
{
}

c) When typecasting variables, the spacing is not applied properly and
uniformly; you can refer to other code and fix it.

*Specific comments to patch 0001*

1.
+#define BATCH_SORT_MAX_BATCHES 512

Did you decide this number based on some experiment or is there some
analysis behind selecting this number?

2.
+BatchSortState* ExecInitBatchSort(BatchSort *node, EState *estate, int eflags)
+{
+ BatchSortState *state;
+ TypeCacheEntry *typentry;
....
+ for (i=0;i<node->numGroupCols;++i)
+ {
...
+ InitFunctionCallInfoData(*fcinfo, flinfo, 1, attr->attcollation, NULL, NULL);
+ fcinfo->args[0].isnull = false;
+ state->groupFuns = lappend(state->groupFuns, fcinfo);
+ }

From the variable naming, it appears as if the batch sort were dependent
upon a grouping node.  I think instead of using the names
numGroupCols and groupFuns we should use names that are more relevant
to the batch sort, something like numSortKey.

3.
+ if (eflags & (EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))
+ {
+ /* for now, we only using in group aggregate */
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("not support execute flag(s) %d for group sort", eflags)));
+ }

Instead of ereport, you should just put an Assert for the unsupported
flag or elog.
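That is, something along these lines (a user can never reach this path, so a can't-happen check is enough):

```c
/* the planner should never request rescan/backward/mark from BatchSort */
Assert((eflags & (EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)) == 0);
```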

4.
+ state = makeNode(BatchSortState);
+ state->ps.plan = (Plan*) node;
+ state->ps.state = estate;
+ state->ps.ExecProcNode = ExecBatchSortPrepare;

I think the main executor entry function should be named ExecBatchSort
instead of ExecBatchSortPrepare; that will look more consistent with the
rest of the executor machinery.

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com



Re: Re: parallel distinct union and aggregate support patch

From
Dilip Kumar
Date:
On Sun, Nov 8, 2020 at 11:54 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
>
> On Tue, Nov 3, 2020 at 6:06 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:
> >
> > On Thu, Oct 29, 2020 at 12:53 PM bucoo@sohu.com <bucoo@sohu.com> wrote:
> > >
> > > > 1) It's better to always include the whole patch series - including the
> > > > parts that have not changed. Otherwise people have to scavenge the
> > > > thread and search for all the pieces, which may be a source of issues.
> > > > Also, it confuses the patch tester [1] which tries to apply patches from
> > > > a single message, so it will fail for this one.
> > >  Patches 3 and 4 do not depend on 1 and 2 in code.
> > >  But applying patches 3 and 4 directly will fail, because
> > >  they were written on top of 1 and 2.
> > >  I can generate a new single patch if you need one.
> > >
> > > > 2) I suggest you try to describe the goal of these patches, using some
> > > > example queries, explain output etc. Right now the reviewers have to
> > > > reverse engineer the patches and deduce what the intention was, which
> > > > may be causing unnecessary confusion etc. If this was my patch, I'd try
> > > > to create a couple examples (CREATE TABLE + SELECT + EXPLAIN) showing
> > > > how the patch changes the query plan, showing speedup etc.
> > >  I have written some example queries into the regression tests, covering "unique", "union",
> > >  "group by" and "group by grouping sets".
> > >  Here are my tests; they are not in the regression tests:
> > > ```sql
> > > begin;
> > > create table gtest(id integer, txt text);
> > > insert into gtest select t1.id,'txt'||t1.id from (select generate_series(1,1000*1000) id) t1,(select generate_series(1,10) id) t2;
> > > analyze gtest;
> > > commit;
> > > set jit = off;
> > > \timing on
> > > ```
> > > normal aggregate times
> > > ```
> > > set enable_batch_hashagg = off;
> > > explain (costs off,analyze,verbose)
> > > select sum(id),txt from gtest group by txt;
> > >                                                  QUERY PLAN
> > > -------------------------------------------------------------------------------------------------------------
> > >  Finalize GroupAggregate (actual time=6469.279..8947.024 rows=1000000 loops=1)
> > >    Output: sum(id), txt
> > >    Group Key: gtest.txt
> > >    ->  Gather Merge (actual time=6469.245..8165.930 rows=1000058 loops=1)
> > >          Output: txt, (PARTIAL sum(id))
> > >          Workers Planned: 2
> > >          Workers Launched: 2
> > >          ->  Sort (actual time=6356.471..7133.832 rows=333353 loops=3)
> > >                Output: txt, (PARTIAL sum(id))
> > >                Sort Key: gtest.txt
> > >                Sort Method: external merge  Disk: 11608kB
> > >                Worker 0:  actual time=6447.665..7349.431 rows=317512 loops=1
> > >                  Sort Method: external merge  Disk: 10576kB
> > >                Worker 1:  actual time=6302.882..7061.157 rows=333301 loops=1
> > >                  Sort Method: external merge  Disk: 11112kB
> > >                ->  Partial HashAggregate (actual time=2591.487..4430.437 rows=333353 loops=3)
> > >                      Output: txt, PARTIAL sum(id)
> > >                      Group Key: gtest.txt
> > >                      Batches: 17  Memory Usage: 4241kB  Disk Usage: 113152kB
> > >                      Worker 0:  actual time=2584.345..4486.407 rows=317512 loops=1
> > >                        Batches: 17  Memory Usage: 4241kB  Disk Usage: 101392kB
> > >                      Worker 1:  actual time=2584.369..4393.244 rows=333301 loops=1
> > >                        Batches: 17  Memory Usage: 4241kB  Disk Usage: 112832kB
> > >                      ->  Parallel Seq Scan on public.gtest (actual time=0.691..603.990 rows=3333333 loops=3)
> > >                            Output: id, txt
> > >                            Worker 0:  actual time=0.104..607.146 rows=3174970 loops=1
> > >                            Worker 1:  actual time=0.100..603.951 rows=3332785 loops=1
> > >  Planning Time: 0.226 ms
> > >  Execution Time: 9021.058 ms
> > > (29 rows)
> > >
> > > Time: 9022.251 ms (00:09.022)
> > >
> > > set enable_batch_hashagg = on;
> > > explain (costs off,analyze,verbose)
> > > select sum(id),txt from gtest group by txt;
> > >                                            QUERY PLAN
> > > -------------------------------------------------------------------------------------------------
> > >  Gather (actual time=3116.666..5740.826 rows=1000000 loops=1)
> > >    Output: (sum(id)), txt
> > >    Workers Planned: 2
> > >    Workers Launched: 2
> > >    ->  Parallel BatchHashAggregate (actual time=3103.181..5464.948 rows=333333 loops=3)
> > >          Output: sum(id), txt
> > >          Group Key: gtest.txt
> > >          Worker 0:  actual time=3094.550..5486.992 rows=326082 loops=1
> > >          Worker 1:  actual time=3099.562..5480.111 rows=324729 loops=1
> > >          ->  Parallel Seq Scan on public.gtest (actual time=0.791..656.601 rows=3333333 loops=3)
> > >                Output: id, txt
> > >                Worker 0:  actual time=0.080..646.053 rows=3057680 loops=1
> > >                Worker 1:  actual time=0.070..662.754 rows=3034370 loops=1
> > >  Planning Time: 0.243 ms
> > >  Execution Time: 5788.981 ms
> > > (15 rows)
> > >
> > > Time: 5790.143 ms (00:05.790)
> > > ```
> > >
> > > grouping sets times
> > > ```
> > > set enable_batch_hashagg = off;
> > > explain (costs off,analyze,verbose)
> > > select sum(id),txt from gtest group by grouping sets(id,txt,());
> > >                                         QUERY PLAN
> > > ------------------------------------------------------------------------------------------
> > >  GroupAggregate (actual time=9454.707..38921.885 rows=2000001 loops=1)
> > >    Output: sum(id), txt, id
> > >    Group Key: gtest.id
> > >    Group Key: ()
> > >    Sort Key: gtest.txt
> > >      Group Key: gtest.txt
> > >    ->  Sort (actual time=9454.679..11804.071 rows=10000000 loops=1)
> > >          Output: txt, id
> > >          Sort Key: gtest.id
> > >          Sort Method: external merge  Disk: 254056kB
> > >          ->  Seq Scan on public.gtest (actual time=2.250..2419.031 rows=10000000 loops=1)
> > >                Output: txt, id
> > >  Planning Time: 0.230 ms
> > >  Execution Time: 39203.883 ms
> > > (14 rows)
> > >
> > > Time: 39205.339 ms (00:39.205)
> > >
> > > set enable_batch_hashagg = on;
> > > explain (costs off,analyze,verbose)
> > > select sum(id),txt from gtest group by grouping sets(id,txt,());
> > >                                            QUERY PLAN
> > > -------------------------------------------------------------------------------------------------
> > >  Gather (actual time=5931.776..14353.957 rows=2000001 loops=1)
> > >    Output: (sum(id)), txt, id
> > >    Workers Planned: 2
> > >    Workers Launched: 2
> > >    ->  Parallel BatchHashAggregate (actual time=5920.963..13897.852 rows=666667 loops=3)
> > >          Output: sum(id), txt, id
> > >          Group Key: gtest.id
> > >          Group Key: ()
> > >          Group Key: gtest.txt
> > >          Worker 0:  actual time=5916.370..14062.461 rows=513810 loops=1
> > >          Worker 1:  actual time=5916.037..13932.847 rows=775901 loops=1
> > >          ->  Parallel Seq Scan on public.gtest (actual time=0.399..688.273 rows=3333333 loops=3)
> > >                Output: id, txt
> > >                Worker 0:  actual time=0.052..690.955 rows=3349990 loops=1
> > >                Worker 1:  actual time=0.050..691.595 rows=3297070 loops=1
> > >  Planning Time: 0.157 ms
> > >  Execution Time: 14598.416 ms
> > > (17 rows)
> > >
> > > Time: 14599.437 ms (00:14.599)
> > > ```
> >
> > I have done some performance testing with TPCH to see the impact on
> > different query plans.  I could see a lot of plan changes across
> > various queries, but out of those there are a few queries where
> > these patches gave a noticeable gain: query13 and query17 (I have
> > attached the plans for these 2 queries).
> >
> > Test details:
> > ----------------
> > TPCH scale factor 50 (database size 112GB)
> > work_mem 20GB, shared buffers: 20GB max_parallel_workers_per_gather=4
> >
> > Machine information:
> > Architecture:          x86_64
> > CPU(s):                56
> > Thread(s) per core:    2
> > Core(s) per socket:    14
> > Socket(s):             2
> > NUMA node(s):          2
> > Model name:            Intel(R) Xeon(R) CPU E5-2695 v3 @ 2.30GHz
> >
> > Observation:
> > In the TPCH test, I have noticed that the major gain we are getting in
> > this patch is because we are able to use the parallelism where we were
> > not able to use due to the limitation of the parallel aggregate.
> > Basically, for computing final aggregated results we need to break the
> > parallelism because the worker is only performing the partial
> > aggregate and after that, we had to gather all the partially
> > aggregated results and do the finalize aggregate.  Now, with this
> > patch, since we are batching the results we are able to compute the
> > final aggregate within the workers itself and that enables us to get
> > the parallelism in more cases.
> >
> > Example:
> > If we observe the output of plan 13 (13.explain_head.out), the subquery
> > is performing the aggregate and the outer query is doing the grouping
> > on the aggregated value of the subquery.  Because of this, on head we
> > do not select parallelism: the inner aggregation has a huge number of
> > groups, so choosing parallelism would mean transferring a lot of tuples
> > through the tuple queue and serializing/deserializing that many
> > transition values, and the outer query needs the final aggregated
> > results from the inner query, so we cannot keep the parallelism going.
> > Now with the batch aggregate (13.explain_patch.out), we are able to
> > compute the finalize aggregation within the workers themselves, and
> > that enabled us to continue the parallelism up to the top node.  The
> > execution time for this query is reduced from 238sec to 57sec, which
> > is 4X faster.
> >
> > I will perform some more tests with different scale factors and
> > analyze the behavior of this.
>
> I have started reviewing these patches, I have a couple of review comments.
>
> Some general comments to make the code more readable
>
> 1. Comments are missing in the patch; there are not even function
> header comments to explain the overall idea of each function.
>    I think adding comments will make it easier to review the patch.
>
> 2. The code is not written as per the Postgres coding guidelines; the
> common problems observed with the patch are
>   a) There should be an empty line after the variable declaration section
>   b) In the function definition, the function return type and the
> function name should not be in the same line
>
> Change
>
> +static bool ExecNextParallelBatchSort(BatchSortState *state)
> {
> }
> to
> static bool
> ExecNextParallelBatchSort(BatchSortState *state)
> {
> }
>
> c) While typecasting the variable the spacing is not used properly and
> uniformly, you can refer to other code and fix it.
>
> *Specific comments to patch 0001*
>
> 1.
> +#define BATCH_SORT_MAX_BATCHES 512
>
> Did you decide this number based on some experiment or is there some
> analysis behind selecting this number?
>
> 2.
> +BatchSortState* ExecInitBatchSort(BatchSort *node, EState *estate, int eflags)
> +{
> + BatchSortState *state;
> + TypeCacheEntry *typentry;
> ....
> + for (i=0;i<node->numGroupCols;++i)
> + {
> ...
> + InitFunctionCallInfoData(*fcinfo, flinfo, 1, attr->attcollation, NULL, NULL);
> + fcinfo->args[0].isnull = false;
> + state->groupFuns = lappend(state->groupFuns, fcinfo);
> + }
>
> From the variable naming, it appeared like the batch sort is dependent
> upon the grouping node.  I think instead of using the name
> numGroupCols and groupFuns we need to use names that are more relevant
> to the batch sort something like numSortKey.
>
> 3.
> + if (eflags & (EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))
> + {
> + /* for now, we only using in group aggregate */
> + ereport(ERROR,
> + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
> + errmsg("not support execute flag(s) %d for group sort", eflags)));
> + }
>
> Instead of ereport, you should just put an Assert for the unsupported
> flag or elog.
>
> 4.
> + state = makeNode(BatchSortState);
> + state->ps.plan = (Plan*) node;
> + state->ps.state = estate;
> + state->ps.ExecProcNode = ExecBatchSortPrepare;
>
> I think the main executor entry function should be named ExecBatchSort
> instead of ExecBatchSortPrepare, it will look more consistent with the
> other executor machinery.

1.
+void cost_batchsort(Path *path, PlannerInfo *root,
+                    List *batchkeys, Cost input_cost,
+                    double tuples, int width,
+                    Cost comparison_cost, int sort_mem,
+                    uint32 numGroupCols, uint32 numBatches)
+{
+    Cost        startup_cost = input_cost;
+    Cost        run_cost = 0;
+    double        input_bytes = relation_byte_size(tuples, width);
+    double        batch_bytes = input_bytes / numBatches;
+    double        batch_tuples = tuples / numBatches;
+    long        sort_mem_bytes = sort_mem * 1024L;
+
+    if (sort_mem_bytes < (64*1024))
+        sort_mem_bytes = (64*1024);
+
+    if (!enable_batch_sort)
+        startup_cost += disable_cost;

You don't need to write a duplicate function for this, you can reuse
the cost_tuplesort function with some minor changes.
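
As a rough sketch of that reuse (cost_tuplesort() is currently static in
costsize.c, so it would have to be exported; the per-batch division below
is my reading of the intent, not the patch's code):

    /* Cost one batch with the existing tuplesort model.  Assumes
     * cost_tuplesort() is made non-static; variable names are mine. */
    Cost        batch_startup = 0;
    Cost        batch_run = 0;

    cost_tuplesort(&batch_startup, &batch_run,
                   tuples / numBatches,        /* tuples per batch */
                   width,
                   comparison_cost,
                   sort_mem / numBatches,      /* work_mem slice per batch */
                   -1.0);                      /* no LIMIT */

    startup_cost += batch_startup;
    run_cost += batch_run + cpu_operator_cost * tuples; /* batch hashing */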


2. I have one more suggestion.  Currently the batches are picked by the
workers dynamically, and the benefit of that is that the work
distribution is quite flexible.  But one downside I see with this
approach is that if we want to extend this parallelism to an upper
node, for example a merge join with a BatchSort on both sides, then if
a worker picks batches dynamically it needs to pick the same batch on
both sides.  For that, the right-side node would have to be aware of
which batch got picked on the left-side node, so we might have to
introduce a different join node, say BatchWiseMergeJoin.  Whereas if we
assign the batches by worker number, each sort node can be processed
independently without knowing what is happening on the other side.
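
For illustration, such a batch-wise merge join might look like this (a
hypothetical plan shape; neither patch produces this today):

Gather
  -> Merge Join
       -> Parallel BatchSort
            -> Parallel Seq Scan on t1
       -> Parallel BatchSort
            -> Parallel Seq Scan on t2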

3. I have also done some performance tests especially with the small
group size, basically, the cases where parallel aggregate is not
picked due to the small group size, and with the new patch the
parallel aggregate is possible now.

Setup:  I have used TPCH database with S.F 50 and executed an
aggregation query on the ORDER table

Number of rows in order table: 75000000
Total table size: 18 GB

Work_mem: 10GB

postgres=# explain (analyze, verbose) select sum(o_totalprice) from orders group by o_custkey;
                                                               QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------
 HashAggregate  (cost=2506201.00..2570706.04 rows=5160403 width=40) (actual time=94002.681..98733.002 rows=4999889 loops=1)
   Output: sum(o_totalprice), o_custkey
   Group Key: orders.o_custkey
   Batches: 1  Memory Usage: 2228241kB
   ->  Seq Scan on public.orders  (cost=0.00..2131201.00 rows=75000000 width=16) (actual time=0.042..12930.981 rows=75000000 loops=1)
         Output: o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment
 Planning Time: 0.317 ms
 Execution Time: 99230.242 ms


postgres=# set enable_batch_sort=on;
SET
postgres=# explain (analyze, verbose) select sum(o_totalprice) from orders group by o_custkey;
                                                                      QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------
 Gather  (cost=1616576.00..1761358.55 rows=40316 width=40) (actual time=18516.549..28811.164 rows=4999889 loops=1)
   Output: (sum(o_totalprice)), o_custkey
   Workers Planned: 4
   Workers Launched: 4
   ->  GroupAggregate  (cost=1615576.00..1756326.99 rows=10079 width=40) (actual time=18506.051..28131.650 rows=999978 loops=5)
         Output: sum(o_totalprice), o_custkey
         Group Key: orders.o_custkey
         Worker 0:  actual time=18502.746..28406.868 rows=995092 loops=1
         Worker 1:  actual time=18502.339..28518.559 rows=1114511 loops=1
         Worker 2:  actual time=18503.233..28461.975 rows=985574 loops=1
         Worker 3:  actual time=18506.026..28409.130 rows=1005414 loops=1
         ->  Parallel BatchSort  (cost=1615576.00..1662451.00 rows=18750000 width=16) (actual time=18505.982..21839.567 rows=15000000 loops=5)
               Output: o_custkey, o_totalprice
               Sort Key: orders.o_custkey
               batches: 512
               Worker 0:  actual time=18502.666..21945.442 rows=14925544 loops=1
               Worker 1:  actual time=18502.270..21979.350 rows=16714443 loops=1
               Worker 2:  actual time=18503.144..21933.151 rows=14784292 loops=1
               Worker 3:  actual time=18505.950..21943.312 rows=15081559 loops=1
               ->  Parallel Seq Scan on public.orders  (cost=0.00..1568701.00 rows=18750000 width=16) (actual time=0.082..4662.390 rows=15000000 loops=5)
                     Output: o_custkey, o_totalprice
                     Worker 0:  actual time=0.079..4720.424 rows=15012981 loops=1
                     Worker 1:  actual time=0.083..4710.919 rows=15675399 loops=1
                     Worker 2:  actual time=0.082..4663.096 rows=14558663 loops=1
                     Worker 3:  actual time=0.104..4625.940 rows=14496910 loops=1
 Planning Time: 0.281 ms
 Execution Time: 29504.248 ms


postgres=# set enable_batch_hashagg =on;
postgres=# set enable_batch_sort=off;
postgres=# explain (analyze, verbose) select sum(o_totalprice) from orders group by o_custkey;
                                                                  QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------
 Gather  (cost=1755004.00..2287170.56 rows=5160403 width=40) (actual time=12935.338..27064.962 rows=4999889 loops=1)
   Output: (sum(o_totalprice)), o_custkey
   Workers Planned: 4
   Workers Launched: 4
   ->  Parallel BatchHashAggregate  (cost=1754004.00..1770130.26 rows=1290101 width=40) (actual time=12987.830..24726.348 rows=999978 loops=5)
         Output: sum(o_totalprice), o_custkey
         Group Key: orders.o_custkey
         Worker 0:  actual time=13013.228..25078.902 rows=999277 loops=1
         Worker 1:  actual time=12917.375..25456.751 rows=1100607 loops=1
         Worker 2:  actual time=13041.088..24022.445 rows=900562 loops=1
         Worker 3:  actual time=13032.732..25230.101 rows=1001386 loops=1
         ->  Parallel Seq Scan on public.orders  (cost=0.00..1568701.00 rows=18750000 width=16) (actual time=0.059..2764.881 rows=15000000 loops=5)
               Output: o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment
               Worker 0:  actual time=0.056..2754.621 rows=14924063 loops=1
               Worker 1:  actual time=0.063..2815.688 rows=16241825 loops=1
               Worker 2:  actual time=0.067..2750.927 rows=14064529 loops=1
               Worker 3:  actual time=0.055..2753.620 rows=14699841 loops=1
 Planning Time: 0.209 ms
 Execution Time: 27728.363 ms
(19 rows)


I think both the parallel batch-wise group aggregate and the batch-wise
hash aggregate give a very large improvement when the typical group
size is small.

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com



Re: parallel distinct union and aggregate support patch

From
Heikki Linnakangas
Date:
I also had a quick look at the patch and the comments made so far. Summary:

1. The performance results are promising.

2. The code needs comments.

Regarding the design:

Thomas Munro mentioned the idea of a "Parallel Repartition" node that 
would redistribute tuples like this. As I understand it, the difference 
is that this BatchSort implementation collects all tuples in a tuplesort 
or a tuplestore, while a Parallel Repartition node would just 
redistribute the tuples to the workers, without buffering. The receiving 
worker could put the tuples to a tuplestore or sort if needed.

I think a non-buffering Repartition node would be simpler, and thus 
better. In these patches, you have a BatchSort node and a batchstore, but 
a simple Parallel Repartition node could do both. For example, to 
implement distinct:

Gather
  -> Unique
       -> Sort
            -> Parallel Redistribute
                 -> Parallel Seq Scan

And a Hash Agg would look like this:

Gather
  -> Hash Agg
       -> Parallel Redistribute
            -> Parallel Seq Scan


I'm marking this as Waiting on Author in the commitfest.

- Heikki



Re: parallel distinct union and aggregate support patch

From
Robert Haas
Date:
On Fri, Nov 27, 2020 at 10:55 AM Heikki Linnakangas <hlinnaka@iki.fi> wrote:
> I think a non-buffering Repartition node would be simpler, and thus
> better. In these patches, you have a BatchSort node and a batchstore, but
> a simple Parallel Repartition node could do both. For example, to
> implement distinct:
>
> Gather
>   -> Unique
>        -> Sort
>             -> Parallel Redistribute
>                  -> Parallel Seq Scan
>
> And a Hash Agg would look like this:
>
> Gather
>   -> Hash Agg
>        -> Parallel Redistribute
>             -> Parallel Seq Scan
>
> I'm marking this as Waiting on Author in the commitfest.

I'm also intrigued by the parallel redistribute operator -- it seems
like it might be more flexible than this approach. However, I'm
concerned that there may be deadlock risks. If there is no buffer, or
a fixed-size buffer, the buffer might be full, and process trying to
jam tuples into the parallel redistribute would have to wait. Now if A
can wait for B and at the same time B can wait for A, deadlock will
ensue. In a naive implementation, this could happen with a single
parallel redistribute operator: worker 1 is trying to send a tuple to
worker 2, which can't receive it because it's busy sending a tuple to
worker 1. That could probably be fixed by arranging for workers to try
to receive data whenever they block in the middle of sending
data. However, in general there can be multiple nodes that cause
waiting in the tree: any number of Parallel Redistribute nodes, plus a
Gather, plus maybe other stuff. The cheap way out of that problem is
to use a buffer that can grow arbitrarily large, but that's not
terribly satisfying either.

-- 
Robert Haas
EDB: http://www.enterprisedb.com



Re: parallel distinct union and aggregate support patch

From
Dilip Kumar
Date:
On Fri, Nov 27, 2020 at 9:25 PM Heikki Linnakangas <hlinnaka@iki.fi> wrote:
>
> I also had a quick look at the patch and the comments made so far. Summary:
>
> 1. The performance results are promising.
>
> 2. The code needs comments.
>
> Regarding the design:
>
> Thomas Munro mentioned the idea of a "Parallel Repartition" node that
> would redistribute tuples like this. As I understand it, the difference
> is that this BatchSort implementation collects all tuples in a tuplesort
> or a tuplestore, while a Parallel Repartition node would just
> redistribute the tuples to the workers, without buffering.

I think the advantage of "Parallel BatchSort" is that it gives the
workers the flexibility to pick batches dynamically after the
repartition.  OTOH, if we distribute batches directly based on the
worker number, the advantage is that the operator will be quite
flexible: e.g. if we want to implement a merge join, we can just
place the "Parallel Repartition" node above both scan nodes and we
simply get a batch-wise merge join, because each worker knows its
batch.  Whereas if we allow workers to pick batches dynamically, the
right-side node needs to know which batch to pick because it was
picked dynamically.  I mean, it is not as difficult because it is the
same worker, but it seems less flexible.

> The receiving
> worker could put the tuples to a tuplestore or sort if needed.

If we are using it without buffering then the sending worker can
directly put the tuple into the respective sort/tuplestore node.

> I think a non-buffering Repartition node would be simpler, and thus
> better. In these patches, you have a BatchSort node and a batchstore, but
> a simple Parallel Repartition node could do both. For example, to
> implement distinct:
>
> Gather
>   -> Unique
>        -> Sort
>             -> Parallel Redistribute
>                  -> Parallel Seq Scan
>
> And a Hash Agg would look like this:
>
> Gather
>   -> Hash Agg
>        -> Parallel Redistribute
>             -> Parallel Seq Scan
>
>
> I'm marking this as Waiting on Author in the commitfest.

I agree that a simple parallel redistribute/repartition node would be
flexible and could do both, but I see one problem.  Basically, if we
use the common operator, then first the Parallel Redistribute operator
will use the tuplestore for redistributing the data per worker, and
then each worker might use the disk again to sort its respective
data.  Instead of that, while redistributing the data itself we can
use the parallel sort, so that each worker gets its respective batch
in the form of sorted tapes.


-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com



Re: Re: parallel distinct union and aggregate support patch

From
"bucoo@sohu.com"
Date:

> 1.
> +#define BATCH_SORT_MAX_BATCHES 512
>  
> Did you decide this number based on some experiment or is there some
> analysis behind selecting this number?
With too few batches, a single slow process causes an unbalanced load.
With too many batches, file descriptors are opened and closed frequently.

> 2.
> +BatchSortState* ExecInitBatchSort(BatchSort *node, EState *estate, int eflags)
> +{
> + BatchSortState *state;
> + TypeCacheEntry *typentry;
> ....
> + for (i=0;i<node->numGroupCols;++i)
> + {
> ...
> + InitFunctionCallInfoData(*fcinfo, flinfo, 1, attr->attcollation, NULL, NULL);
> + fcinfo->args[0].isnull = false;
> + state->groupFuns = lappend(state->groupFuns, fcinfo);
> + }
>  
> From the variable naming, it appeared like the batch sort is dependent
> upon the grouping node.  I think instead of using the name
> numGroupCols and groupFuns we need to use names that are more relevant
> to the batch sort something like numSortKey.
Not all data types support both sorting and hashing calculations (for example, some user-defined data types).
When we batch, we only need the columns used for batching to support hash calculation, so I used two variables.
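
As a sketch of how those hash functions drive the batch assignment (field
names such as groupColIdx and numBatches are assumptions for illustration,
not the patch's actual code):

    /* Derive a batch number from the hashable grouping columns. */
    static uint32
    batchsort_batch_number(BatchSortState *state, TupleTableSlot *slot)
    {
        uint32      hashvalue = 0;
        int         i = 0;
        ListCell   *lc;

        foreach(lc, state->groupFuns)
        {
            FunctionCallInfo fcinfo = (FunctionCallInfo) lfirst(lc);
            AttrNumber  attno = state->groupColIdx[i++];    /* assumed field */

            fcinfo->args[0].value = slot_getattr(slot, attno,
                                                 &fcinfo->args[0].isnull);
            if (!fcinfo->args[0].isnull)
                hashvalue = hash_combine(hashvalue,
                                         DatumGetUInt32(FunctionCallInvoke(fcinfo)));
        }
        return hashvalue % state->numBatches;               /* assumed field */
    }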

> 3.
> + if (eflags & (EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))
> + {
> + /* for now, we only using in group aggregate */
> + ereport(ERROR,
> + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
> + errmsg("not support execute flag(s) %d for group sort", eflags)));
> + }
>  
> Instead of ereport, you should just put an Assert for the unsupported
> flag or elog.
In fact, this is an unfinished feature; BatchSort should eventually support these flags too. Additions are welcome.

> 4.
> + state = makeNode(BatchSortState);
> + state->ps.plan = (Plan*) node;
> + state->ps.state = estate;
> + state->ps.ExecProcNode = ExecBatchSortPrepare;
>  
> I think the main executor entry function should be named ExecBatchSort
> instead of ExecBatchSortPrepare, it will look more consistent with the
> other executor machinery.
The job of the ExecBatchSortPrepare function is to preprocess the data (batching and pre-sorting);
when that work ends, it calls "ExecSetExecProcNode(pstate, ExecBatchSort)" so that returning the data is handed over to the ExecBatchSort function.
There is another advantage to dividing this into two functions:
we do not have to check on every call whether the tuplesort is ready yet, which avoids a small per-call cost.
And I think this code is clearer.
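
The control flow looks roughly like this (a minimal sketch of the pattern,
not the patch's literal code):

    /* First call: batch and pre-sort everything, then swap the entry
     * point so later calls go straight to ExecBatchSort. */
    static TupleTableSlot *
    ExecBatchSortPrepare(PlanState *pstate)
    {
        BatchSortState *state = (BatchSortState *) pstate;
        TupleTableSlot *slot;

        /* drain the outer plan, hashing each tuple into its batch */
        while (!TupIsNull(slot = ExecProcNode(outerPlanState(state))))
        {
            /* ... compute batch number, tuplesort_puttupleslot() ... */
        }

        /* ... wait for other workers, claim a batch, tuplesort_performsort() ... */

        ExecSetExecProcNode(pstate, ExecBatchSort);
        return ExecBatchSort(pstate);
    }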

Re: Re: parallel distinct union and aggregate support patch

From
"bucoo@sohu.com"
Date:
Now I have rewritten batch hashagg and batch sort, added some comments, and combined the two patches, based on master 2ad78a87f018260d4474eee63187e1cc73c9b976.
They support rescan, and the GUCs enable_batch_hashagg/enable_batch_sort are changed to max_hashagg_batches/max_sort_batches, with default value "0" (meaning disabled).
For grouping sets, each chain uses the "max_hashagg_batches" value; maybe we need a better algorithm.
Do not set "max_sort_batches" too large, because each tuplesort's work memory is "work_mem/max_sort_batches".

As a next step, after this patch is committed, I want to use batch sort to add parallel merge join (thanks, Dilip Kumar) and EXCEPT/INTERSECT support; discussion is welcome.

Some test results:
hash group by: 17,974.797 ms -> 10,137.909 ms
sort group by: 117,475.380 ms -> 34,830.489 ms
grouping sets: 91,915.597 ms -> 24,585.103 ms
union: 95,765.297 ms -> 21,416.414 ms

---------------------------test details-------------------------------
Machine information:
Architecture:        x86_64
CPU(s):            88
Thread(s) per core:    2
Core(s) per socket:    22
Socket(s):          2
NUMA node(s):        2
Model name:          Intel(R) Xeon(R) CPU E5-2699 v4 @ 2.20GHz

prepare data:
begin;
create table gtest(id integer, txt text);
insert into gtest select t1.id,'txt'||t1.id from (select generate_series(1,10*1000*1000) id) t1,(select generate_series(1,10) id) t2;
analyze gtest;
commit;
set max_parallel_workers_per_gather=8;
set work_mem = '100MB';

hash aggregate:
explain (verbose,costs off,analyze)
select sum(id),txt from gtest group by txt;
                                               QUERY PLAN                                                
---------------------------------------------------------------------------------------------------------
 Finalize HashAggregate (actual time=10832.805..17403.671 rows=10000000 loops=1)
   Output: sum(id), txt
   Group Key: gtest.txt
   Batches: 29  Memory Usage: 102489kB  Disk Usage: 404696kB
   ->  Gather (actual time=4389.345..7227.279 rows=10000058 loops=1)
         Output: txt, (PARTIAL sum(id))
         Workers Planned: 6
         Workers Launched: 6
         ->  Partial HashAggregate (actual time=4353.147..5992.183 rows=1428580 loops=7)
               Output: txt, PARTIAL sum(id)
               Group Key: gtest.txt
               Batches: 5  Memory Usage: 110641kB  Disk Usage: 238424kB
               Worker 0:  actual time=4347.155..5954.088 rows=1398608 loops=1
                 Batches: 5  Memory Usage: 114737kB  Disk Usage: 203928kB
               Worker 1:  actual time=4347.061..6209.121 rows=1443046 loops=1
                 Batches: 5  Memory Usage: 114737kB  Disk Usage: 224384kB
               Worker 2:  actual time=4347.175..5882.065 rows=1408238 loops=1
                 Batches: 5  Memory Usage: 110641kB  Disk Usage: 216360kB
               Worker 3:  actual time=4347.193..6015.830 rows=1477568 loops=1
                 Batches: 5  Memory Usage: 110641kB  Disk Usage: 240824kB
               Worker 4:  actual time=4347.210..5950.730 rows=1404288 loops=1
                 Batches: 5  Memory Usage: 110641kB  Disk Usage: 214872kB
               Worker 5:  actual time=4347.482..6064.460 rows=1439454 loops=1
                 Batches: 5  Memory Usage: 110641kB  Disk Usage: 239400kB
               ->  Parallel Seq Scan on public.gtest (actual time=0.051..1216.378 rows=14285714 loops=7)
                     Output: id, txt
                     Worker 0:  actual time=0.048..1219.133 rows=13986000 loops=1
                     Worker 1:  actual time=0.047..1214.860 rows=14430370 loops=1
                     Worker 2:  actual time=0.051..1222.124 rows=14082300 loops=1
                     Worker 3:  actual time=0.061..1213.851 rows=14775580 loops=1
                     Worker 4:  actual time=0.073..1216.712 rows=14042795 loops=1
                     Worker 5:  actual time=0.049..1210.870 rows=14394480 loops=1
 Planning Time: 0.673 ms
 Execution Time: 17974.797 ms
batch hash aggregate:
set max_hashagg_batches = 100;
explain (verbose,costs off,analyze)
select sum(id),txt from gtest group by txt;
                                            QUERY PLAN                                             
---------------------------------------------------------------------------------------------------
 Gather (actual time=5050.110..9757.292 rows=10000000 loops=1)
   Output: (sum(id)), txt
   Workers Planned: 6
   Workers Launched: 6
   ->  Parallel BatchHashAggregate (actual time=5032.178..7810.979 rows=1428571 loops=7)
         Output: sum(id), txt
         Group Key: gtest.txt
         Worker 0:  actual time=5016.488..7694.715 rows=1399958 loops=1
         Worker 1:  actual time=5021.651..7942.628 rows=1501753 loops=1
         Worker 2:  actual time=5018.327..7944.842 rows=1400176 loops=1
         Worker 3:  actual time=5082.977..7973.635 rows=1400818 loops=1
         Worker 4:  actual time=5019.229..7847.522 rows=1499952 loops=1
         Worker 5:  actual time=5017.086..7667.116 rows=1398470 loops=1
         ->  Parallel Seq Scan on public.gtest (actual time=0.055..1378.237 rows=14285714 loops=7)
               Output: id, txt
               Worker 0:  actual time=0.057..1349.870 rows=14533515 loops=1
               Worker 1:  actual time=0.052..1376.305 rows=13847620 loops=1
               Worker 2:  actual time=0.068..1382.226 rows=13836705 loops=1
               Worker 3:  actual time=0.071..1405.669 rows=13856130 loops=1
               Worker 4:  actual time=0.055..1406.186 rows=14677345 loops=1
               Worker 5:  actual time=0.045..1351.142 rows=15344825 loops=1
 Planning Time: 0.250 ms
 Execution Time: 10137.909 ms

sort aggregate:
set enable_hashagg = off;
set max_hashagg_batches = 0;
explain (verbose,costs off,analyze)
select sum(id),txt from gtest group by txt;
                                                   QUERY PLAN                                                   
----------------------------------------------------------------------------------------------------------------
 Finalize GroupAggregate (actual time=10370.559..116494.922 rows=10000000 loops=1)
   Output: sum(id), txt
   Group Key: gtest.txt
   ->  Gather Merge (actual time=10370.487..112470.148 rows=10000059 loops=1)
         Output: txt, (PARTIAL sum(id))
         Workers Planned: 6
         Workers Launched: 6
         ->  Partial GroupAggregate (actual time=8608.563..24526.716 rows=1428580 loops=7)
               Output: txt, PARTIAL sum(id)
               Group Key: gtest.txt
               Worker 0:  actual time=8283.755..18641.475 rows=887626 loops=1
               Worker 1:  actual time=8303.984..26206.673 rows=1536832 loops=1
               Worker 2:  actual time=8290.611..28110.145 rows=1676544 loops=1
               Worker 3:  actual time=10347.326..29912.135 rows=1783536 loops=1
               Worker 4:  actual time=8329.604..20262.795 rows=980352 loops=1
               Worker 5:  actual time=8322.877..27957.446 rows=1758958 loops=1
               ->  Sort (actual time=8608.501..21752.009 rows=14285714 loops=7)
                     Output: txt, id
                     Sort Key: gtest.txt
                     Sort Method: external merge  Disk: 349760kB
                     Worker 0:  actual time=8283.648..16831.068 rows=8876115 loops=1
                       Sort Method: external merge  Disk: 225832kB
                     Worker 1:  actual time=8303.927..23053.078 rows=15368320 loops=1
                       Sort Method: external merge  Disk: 391008kB
                     Worker 2:  actual time=8290.556..24735.395 rows=16765440 loops=1
                       Sort Method: external merge  Disk: 426552kB
                     Worker 3:  actual time=10347.264..26438.333 rows=17835210 loops=1
                       Sort Method: external merge  Disk: 453768kB
                     Worker 4:  actual time=8329.534..18248.302 rows=9803520 loops=1
                       Sort Method: external merge  Disk: 249408kB
                     Worker 5:  actual time=8322.827..24480.383 rows=17589430 loops=1
                       Sort Method: external merge  Disk: 447520kB
                     ->  Parallel Seq Scan on public.gtest (actual time=51.618..1530.850 rows=14285714 loops=7)
                           Output: txt, id
                           Worker 0:  actual time=49.907..1001.606 rows=8876115 loops=1
                           Worker 1:  actual time=51.011..1665.980 rows=15368320 loops=1
                           Worker 2:  actual time=50.087..1812.426 rows=16765440 loops=1
                           Worker 3:  actual time=51.010..1828.299 rows=17835210 loops=1
                           Worker 4:  actual time=42.614..1077.896 rows=9803520 loops=1
                           Worker 5:  actual time=51.010..1790.012 rows=17589430 loops=1
 Planning Time: 0.119 ms
 Execution Time: 117475.380 ms
batch sort aggregate:
set max_sort_batches = 21;
explain (verbose,costs off,analyze)
select sum(id),txt from gtest group by txt;
                                                QUERY PLAN                                                
----------------------------------------------------------------------------------------------------------
 Gather (actual time=18699.622..34438.083 rows=10000000 loops=1)
   Output: (sum(id)), txt
   Workers Planned: 6
   Workers Launched: 6
   ->  GroupAggregate (actual time=18671.875..31121.607 rows=1428571 loops=7)
         Output: sum(id), txt
         Group Key: gtest.txt
         Worker 0:  actual time=18669.038..30913.680 rows=1427622 loops=1
         Worker 1:  actual time=18674.356..31045.516 rows=1430448 loops=1
         Worker 2:  actual time=18677.565..31375.340 rows=1427636 loops=1
         Worker 3:  actual time=18667.879..31359.458 rows=1427935 loops=1
         Worker 4:  actual time=18669.760..31263.414 rows=1430220 loops=1
         Worker 5:  actual time=18645.428..30813.141 rows=1427411 loops=1
         ->  Parallel BatchSort (actual time=18671.796..29348.606 rows=14285714 loops=7)
               Output: txt, id
               Sort Key: gtest.txt
               batches: 21
               Worker 0:  actual time=18668.856..29172.519 rows=14276220 loops=1
               Worker 1:  actual time=18674.287..29280.794 rows=14304480 loops=1
               Worker 2:  actual time=18677.501..29569.974 rows=14276360 loops=1
               Worker 3:  actual time=18667.801..29558.286 rows=14279350 loops=1
               Worker 4:  actual time=18669.689..29468.636 rows=14302200 loops=1
               Worker 5:  actual time=18645.367..29076.665 rows=14274110 loops=1
               ->  Parallel Seq Scan on public.gtest (actual time=50.164..1893.727 rows=14285714 loops=7)
                     Output: txt, id
                     Worker 0:  actual time=50.058..1818.959 rows=13953440 loops=1
                     Worker 1:  actual time=50.974..1723.268 rows=13066735 loops=1
                     Worker 2:  actual time=48.050..1855.469 rows=13985175 loops=1
                     Worker 3:  actual time=49.640..1791.897 rows=12673240 loops=1
                     Worker 4:  actual time=48.027..1932.927 rows=14586880 loops=1
                     Worker 5:  actual time=51.151..2094.981 rows=16360290 loops=1
 Planning Time: 0.160 ms
 Execution Time: 34830.489 ms

normal grouping sets:
set enable_hashagg = on;
set max_sort_batches = 0;
set max_hashagg_batches = 0;
explain (costs off,verbose,analyze)
select sum(id),txt from gtest group by grouping sets(id,txt,());
                                                QUERY PLAN                                                
----------------------------------------------------------------------------------------------------------
 MixedAggregate (actual time=4563.123..90348.608 rows=20000001 loops=1)
   Output: sum(id), txt, id
   Hash Key: gtest.txt
   Group Key: gtest.id
   Group Key: ()
   Batches: 29  Memory Usage: 114737kB  Disk Usage: 3241968kB
   ->  Gather Merge (actual time=4563.070..39429.593 rows=100000000 loops=1)
         Output: txt, id
         Workers Planned: 6
         Workers Launched: 6
         ->  Sort (actual time=4493.638..7532.910 rows=14285714 loops=7)
               Output: txt, id
               Sort Key: gtest.id
               Sort Method: external merge  Disk: 353080kB
               Worker 0:  actual time=4474.665..7853.595 rows=14327510 loops=1
                 Sort Method: external merge  Disk: 364528kB
               Worker 1:  actual time=4492.273..7796.141 rows=14613250 loops=1
                 Sort Method: external merge  Disk: 371776kB
               Worker 2:  actual time=4472.937..7626.318 rows=14339905 loops=1
                 Sort Method: external merge  Disk: 364840kB
               Worker 3:  actual time=4480.141..7730.419 rows=14406135 loops=1
                 Sort Method: external merge  Disk: 366528kB
               Worker 4:  actual time=4490.723..7581.102 rows=13971200 loops=1
                 Sort Method: external merge  Disk: 355096kB
               Worker 5:  actual time=4482.204..7894.434 rows=14464410 loops=1
                 Sort Method: external merge  Disk: 368008kB
               ->  Parallel Seq Scan on public.gtest (actual time=27.040..1514.516 rows=14285714 loops=7)
                     Output: txt, id
                     Worker 0:  actual time=23.111..1514.219 rows=14327510 loops=1
                     Worker 1:  actual time=22.696..1528.771 rows=14613250 loops=1
                     Worker 2:  actual time=23.119..1519.190 rows=14339905 loops=1
                     Worker 3:  actual time=22.705..1525.183 rows=14406135 loops=1
                     Worker 4:  actual time=23.134..1509.694 rows=13971200 loops=1
                     Worker 5:  actual time=23.652..1516.585 rows=14464410 loops=1
 Planning Time: 0.162 ms
 Execution Time: 91915.597 ms

batch grouping sets:
set max_hashagg_batches = 100;
explain (costs off,verbose,analyze)
select sum(id),txt from gtest group by grouping sets(id,txt,());
                                            QUERY PLAN                                             
---------------------------------------------------------------------------------------------------
 Gather (actual time=9082.581..23203.803 rows=20000001 loops=1)
   Output: (sum(id)), txt, id
   Workers Planned: 6
   Workers Launched: 6
   ->  Parallel BatchHashAggregate (actual time=9040.895..15911.190 rows=2857143 loops=7)
         Output: sum(id), txt, id
         Group Key: gtest.id
         Group Key: ()
         Group Key: gtest.txt
         Worker 0:  actual time=9031.714..15499.292 rows=3101124 loops=1
         Worker 1:  actual time=9038.217..15403.655 rows=3100997 loops=1
         Worker 2:  actual time=9030.557..15157.267 rows=3103320 loops=1
         Worker 3:  actual time=9034.391..15537.851 rows=3100505 loops=1
         Worker 4:  actual time=9037.079..19823.359 rows=1400191 loops=1
         Worker 5:  actual time=9032.359..15012.338 rows=3097137 loops=1
         ->  Parallel Seq Scan on public.gtest (actual time=0.052..1506.109 rows=14285714 loops=7)
               Output: id, txt
               Worker 0:  actual time=0.058..1521.705 rows=13759375 loops=1
               Worker 1:  actual time=0.054..1514.218 rows=13758635 loops=1
               Worker 2:  actual time=0.062..1531.244 rows=14456270 loops=1
               Worker 3:  actual time=0.050..1506.569 rows=14451930 loops=1
               Worker 4:  actual time=0.053..1495.908 rows=15411240 loops=1
               Worker 5:  actual time=0.055..1503.382 rows=14988885 loops=1
 Planning Time: 0.160 ms
 Execution Time: 24585.103 ms

normal union:
set max_hashagg_batches = 0;
set max_sort_batches = 0;
explain (verbose,costs false,analyze)
select * from gtest union select * from gtest;
                                               QUERY PLAN                                                
---------------------------------------------------------------------------------------------------------
 Unique (actual time=53939.294..94666.573 rows=10000000 loops=1)
   Output: gtest.id, gtest.txt
   ->  Sort (actual time=53939.292..76581.157 rows=200000000 loops=1)
         Output: gtest.id, gtest.txt
         Sort Key: gtest.id, gtest.txt
         Sort Method: external merge  Disk: 4871024kB
         ->  Append (actual time=0.020..25832.476 rows=200000000 loops=1)
               ->  Seq Scan on public.gtest (actual time=0.019..7074.113 rows=100000000 loops=1)
                     Output: gtest.id, gtest.txt
               ->  Seq Scan on public.gtest gtest_1 (actual time=0.006..7067.898 rows=100000000 loops=1)
                     Output: gtest_1.id, gtest_1.txt
 Planning Time: 0.152 ms
 Execution Time: 95765.297 ms

batch hash aggregate union:
set max_hashagg_batches = 100;
explain (verbose,costs false,analyze)
select * from gtest union select * from gtest;
                                                   QUERY PLAN                                                    
-----------------------------------------------------------------------------------------------------------------
 Gather (actual time=11623.986..21021.317 rows=10000000 loops=1)
   Output: gtest.id, gtest.txt
   Workers Planned: 6
   Workers Launched: 6
   ->  Parallel BatchHashAggregate (actual time=11636.753..16584.067 rows=1428571 loops=7)
         Output: gtest.id, gtest.txt
         Group Key: gtest.id, gtest.txt
         Worker 0:  actual time=11631.225..16846.376 rows=1500587 loops=1
         Worker 1:  actual time=11553.019..16233.006 rows=1397874 loops=1
         Worker 2:  actual time=11581.523..16807.962 rows=1499049 loops=1
         Worker 3:  actual time=11593.865..16416.381 rows=1399579 loops=1
         Worker 4:  actual time=11772.115..16783.605 rows=1400961 loops=1
         Worker 5:  actual time=11702.415..16571.841 rows=1400943 loops=1
         ->  Parallel Append (actual time=0.047..4339.450 rows=28571429 loops=7)
               Worker 0:  actual time=0.062..4396.130 rows=28591565 loops=1
               Worker 1:  actual time=0.053..4383.983 rows=29536360 loops=1
               Worker 2:  actual time=0.045..4305.253 rows=28282900 loops=1
               Worker 3:  actual time=0.053..4295.805 rows=28409625 loops=1
               Worker 4:  actual time=0.061..4314.450 rows=28363645 loops=1
               Worker 5:  actual time=0.015..4311.121 rows=29163585 loops=1
               ->  Parallel Seq Scan on public.gtest (actual time=0.030..1201.563 rows=14285714 loops=7)
                     Output: gtest.id, gtest.txt
                     Worker 0:  actual time=0.019..281.903 rows=3277090 loops=1
                     Worker 1:  actual time=0.050..2473.135 rows=29536360 loops=1
                     Worker 2:  actual time=0.021..273.766 rows=3252955 loops=1
                     Worker 3:  actual time=0.018..285.911 rows=3185145 loops=1
                     Worker 4:  actual time=0.058..2387.626 rows=28363645 loops=1
                     Worker 5:  actual time=0.013..2432.342 rows=29163585 loops=1
               ->  Parallel Seq Scan on public.gtest gtest_1 (actual time=0.048..2140.373 rows=25000000 loops=4)
                     Output: gtest_1.id, gtest_1.txt
                     Worker 0:  actual time=0.059..2173.690 rows=25314475 loops=1
                     Worker 2:  actual time=0.043..2114.314 rows=25029945 loops=1
                     Worker 3:  actual time=0.050..2142.670 rows=25224480 loops=1
 Planning Time: 0.137 ms
 Execution Time: 21416.414 ms

bucoo@sohu.com
Attachment

Re: parallel distinct union and aggregate support patch

From
David Steele
Date:
On 1/25/21 9:14 AM, bucoo@sohu.com wrote:
> Now I have rewritten batch hashagg and batch sort, added some comments, 
> and combined the two patches, based on master 2ad78a87f018260d4474eee63187e1cc73c9b976.
> They support rescan, and the GUCs 
> enable_batch_hashagg/enable_batch_sort are changed to 
> max_hashagg_batches/max_sort_batches, with default value "0" (meaning disabled).
> For grouping sets, each chain uses the "max_hashagg_batches" value; 
> maybe we need a better algorithm.
> Do not set "max_sort_batches" too large, because each tuplesort's work 
> memory is "work_mem/max_sort_batches".
> 
> As a next step, after this patch is committed, I want to use batch sort 
> to add parallel merge join (thanks, Dilip Kumar) and EXCEPT/INTERSECT 
> support; discussion is welcome.

This patch has not gotten any review in the last two CFs and is unlikely 
to be committed for PG14 so I have moved it to the 2021-07 CF. A rebase 
is also required so marked Waiting for Author.

I can see this is a work in progress, but you may want to consider the 
several suggestions that an unbuffered approach might be better.

Regards,
-- 
-David
david@pgmasters.net



Re: Re: parallel distinct union and aggregate support patch

From
"bucoo@sohu.com"
Date:
> This patch has not gotten any review in the last two CFs and is unlikely
> to be committed for PG14 so I have moved it to the 2021-07 CF. A rebase
> is also required so marked Waiting for Author.
>  
> I can see this is a work in progress, but you may want to consider the
> several suggestions that an unbuffered approach might be better.

I have written a plan with similar functionality. It is known that the following two situations do not work well.
1. Under a "Parallel Append" plan
  Gather
  -> Parallel Append
      -> Agg
          -> Parallel Redistribute(1)
              -> ...
      -> Agg
          -> Parallel Redistribute(2)
              -> ...
  When parallel worker 1 executes "Parallel Redistribute(1)" and worker 2 executes "Parallel Redistribute(2)",
  neither "Parallel Redistribute" plan can send tuples to the other worker (both workers are stuck),
  because the other worker's memory buffer soon runs out.

2. Under a "Nestloop" plan
  Gather
  -> Nestloop(1)
      -> Nestloop(2)
          -> Parallel Redistribute
              -> ...
          -> IndexScan
      -> Agg
  At some point it might be the case that parallel worker 1 is executing the Agg while the "Parallel Redistribute" plan's memory buffer is full,
  and worker 2 is executing "Parallel Redistribute", waiting for worker 1 to consume the "Parallel Redistribute" plan's memory buffer:
  it's stuck.



bucoo@sohu.com

Re: Re: parallel distinct union and aggregate support patch

From
David Rowley
Date:
On Tue, 30 Mar 2021 at 22:33, bucoo@sohu.com <bucoo@sohu.com> wrote:
> I have written a plan with similar functions, It is known that the following two situations do not work well.

I read through this thread and also wondered about a Parallel
Partition type operator.  It also seems to me that if it could be done
this way then you could just plug in existing nodes to get Sorting and
Aggregation rather than having to modify existing nodes to get them to
do what you need.

From what I've seen looking over the thread, a few people suggested
this and I didn't see anywhere where you responded to them about the
idea.  Just so you're aware, contributing to PostgreSQL is not a case
of throwing code at a wall and seeing which parts stick.  You need to
interact and respond to people reviewing your work. This is especially
true for the people who actually have the authority to merge any of
your work with the main code repo.

It seems to me you might be getting off to a bad start and you might
not be aware of this process. So I hope this email will help put you
on track.

Some of the people that you've not properly responded to include:

Thomas Munro:  PostgreSQL committer. Wrote Parallel Hash Join.
Robert Haas: PostgreSQL committer. Wrote much of the original parallel
query code
Heikki Linnakangas: PostgreSQL committer. Worked on many parts of the
planner and executor. Also works for the company that develops
Greenplum, a massively parallel processing RDBMS, based on Postgres.

You might find other information in [1].

If I wanted to do what you want to do, I think those 3 people might be
some of the last people I'd pick to ignore questions from! :-)

Also, I'd say copying in Tom Lane randomly when he's not shown
any interest in the patch here is likely not a good way of making
forward progress.  You might find that it might bubble up on his radar
if you start constructively interacting with the people who have
questioned your design.  I'd say that should be your next step.

The probability of anyone merging any of your code without properly
discussing the design with the appropriate people is either very
close to zero or actually zero.

I hope this email helps you get on track.

David

[1] https://www.postgresql.org/community/contributors/



Re: Re: parallel distinct union and aggregate support patch

From
"bucoo@sohu.com"
Date:
Sorry, this email was marked as spam by sohu, so I didn't notice it; for the last few months I have been working hard on merging PostgreSQL 14 into our cluster version (github.com/ADBSQL/AntDB).

I have an idea for how to make "Parallel Redistribute" work even under "Parallel Append" and "Nestloop", but "grouping sets" cannot work in parallel mode using "Parallel Redistribute".
Please wait a few days; a patch is coming soon.

 

Re: Re: parallel distinct union and aggregate support patch

From
"bucoo@sohu.com"
Date:
Those were busy days; sorry the patches are so late.
Here is an unbuffered Redistribute plan for parallel aggregate/distinct/union,
like this (when the new GUC param redistribute_query_size is larger than 0):
 Gather
   ->  Finalize HashAggregate
         ->  Parallel Redistribute
               ->  Partial HashAggregate
                     ->  Parallel Seq Scan on test
0001-xxx.patch:
Fix cost_subqueryscan() computing a wrong parallel cost; it was always the same as the non-parallel path.
Without this patch, parallel union can never be chosen.

How Redistribute works:
Each has N*MQ + 1*SharedTuplestore, where N is the number of parallel workers (including the leader).
1. Allocate shared memory for Redistribute (sized for the planned parallel worker number).
2. After all parallel workers are launched, the leader changes "final_worker_num" to the number of launched workers.
3. Each worker tries to get a unique part number. The number of parts is "final_worker_num".
4. If it gets an invalid part number, return a null tuple.
5. Try to read a tuple from the MQ; if one is available, return it, else go to the next step.
6-0. Get a tuple from the outer plan; if one is available, compute the mod as "hash value % final_worker_num", else go to step 7.
6-1. If the mod equals our part number, return this tuple.
6-2. Use the mod to find the part's MQ and try to write the tuple to it; if the write succeeds, go to step 6-0.
6-3. Otherwise, write the tuple to the part's SharedTuplestore.
7. Read tuples from the MQ; if one is available, return it, else close all opened MQs and go to the next step.
8. Read tuples from the SharedTuplestore; if one is available, return it, else close it and go to the next step.
9. Try to get the next unique part number; if it is invalid, return a null tuple, else go to step 7.

In step "6-2" we can't use shm_mq_send() function, because it maybe write partial data,
if this happend we must write remaining data to this MQ, so we must wait other worker read some date from this MQ.
However, we do't want to wait(this may cause all worker to wait for each other).
So, I write a new function named shm_mq_send_once(). It like shm_mq_send, but return would block immediately when
no space for write data and "do not write any data" to MQ.
This will cause a problem, when MQ ring size small then tuple size, it never write to MQ(write to SharedTuplestore).
So it's best to make sure that MQ has enough space for tuple(change GUC param "redistribute_query_size").
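
Step 6 then reduces to a dispatch like this (a sketch with assumed
variable names; shm_mq_send_once() is the new function described above,
sts_puttuple() is the existing shared tuplestore API):

    /* Route one outer tuple (step 6). */
    mod = hashvalue % final_worker_num;

    if (mod == my_part_number)
        return slot;                    /* 6-1: tuple belongs to our part */

    if (shm_mq_send_once(part_mq[mod], tuple->t_len, tuple) == SHM_MQ_SUCCESS)
        goto next_outer_tuple;          /* 6-2: whole tuple fit in the MQ */

    /* 6-3: MQ full (or tuple larger than the ring); spill to disk */
    sts_puttuple(part_sts[mod], NULL, tuple);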

Execution comparison
prepare data:
begin;
create table gtest(id integer, txt text);
insert into gtest select t1.id,'txt'||t1.id from (select generate_series(1,10*1000*1000) id) t1,(select generate_series(1,10) id) t2;
analyze gtest;
commit;
set max_parallel_workers_per_gather=8;
set work_mem = '256MB';

hash aggregate
explain (verbose,analyze,costs off)
select sum(id),txt from gtest group by txt;
                                               QUERY PLAN
---------------------------------------------------------------------------------------------------------
 Finalize HashAggregate (actual time=11733.519..19075.309 rows=10000000 loops=1)
   Output: sum(id), txt
   Group Key: gtest.txt
   Batches: 21  Memory Usage: 262201kB  Disk Usage: 359808kB
   ->  Gather (actual time=5540.052..8029.550 rows=10000056 loops=1)
         Output: txt, (PARTIAL sum(id))
         Workers Planned: 6
         Workers Launched: 6
         ->  Partial HashAggregate (actual time=5534.690..5914.643 rows=1428579 loops=7)
               Output: txt, PARTIAL sum(id)
               Group Key: gtest.txt
               Batches: 1  Memory Usage: 188433kB
               Worker 0:  actual time=5533.956..5913.461 rows=1443740 loops=1
                 Batches: 1  Memory Usage: 188433kB
               Worker 1:  actual time=5533.552..5913.595 rows=1400439 loops=1
                 Batches: 1  Memory Usage: 188433kB
               Worker 2:  actual time=5533.553..5913.357 rows=1451759 loops=1
                 Batches: 1  Memory Usage: 188433kB
               Worker 3:  actual time=5533.834..5907.952 rows=1379830 loops=1
                 Batches: 1  Memory Usage: 180241kB
               Worker 4:  actual time=5533.782..5912.408 rows=1428060 loops=1
                 Batches: 1  Memory Usage: 188433kB
               Worker 5:  actual time=5534.271..5910.458 rows=1426987 loops=1
                 Batches: 1  Memory Usage: 188433kB
               ->  Parallel Seq Scan on public.gtest (actual time=0.022..1523.231 rows=14285714 loops=7)
                     Output: id, txt
                     Worker 0:  actual time=0.032..1487.403 rows=14437315 loops=1
                     Worker 1:  actual time=0.016..1635.675 rows=14004315 loops=1
                     Worker 2:  actual time=0.015..1482.005 rows=14517505 loops=1
                     Worker 3:  actual time=0.017..1664.469 rows=13798225 loops=1
                     Worker 4:  actual time=0.018..1471.233 rows=14280520 loops=1
                     Worker 5:  actual time=0.030..1463.973 rows=14269790 loops=1
 Planning Time: 0.075 ms
 Execution Time: 19575.976 ms

parallel hash aggregate
set redistribute_query_size = '256kB';
explain (verbose,analyze,costs off)
select sum(id),txt from gtest group by txt;
                                                  QUERY PLAN
---------------------------------------------------------------------------------------------------------------
 Gather (actual time=9710.061..11372.560 rows=10000000 loops=1)
   Output: (sum(id)), txt
   Workers Planned: 6
   Workers Launched: 6
   ->  Finalize HashAggregate (actual time=9703.098..10082.575 rows=1428571 loops=7)
         Output: sum(id), txt
         Group Key: gtest.txt
         Batches: 1  Memory Usage: 188433kB
         Worker 0:  actual time=9701.365..10077.995 rows=1428857 loops=1
           Batches: 1  Memory Usage: 188433kB
         Worker 1:  actual time=9701.415..10095.876 rows=1430065 loops=1
           Batches: 1  Memory Usage: 188433kB
         Worker 2:  actual time=9701.315..10077.635 rows=1425811 loops=1
           Batches: 1  Memory Usage: 188433kB
         Worker 3:  actual time=9703.047..10088.985 rows=1427745 loops=1
           Batches: 1  Memory Usage: 188433kB
         Worker 4:  actual time=9703.166..10077.937 rows=1431644 loops=1
           Batches: 1  Memory Usage: 188433kB
         Worker 5:  actual time=9701.809..10076.922 rows=1426156 loops=1
           Batches: 1  Memory Usage: 188433kB
         ->  Parallel Redistribute (actual time=5593.440..9036.392 rows=1428579 loops=7)
               Output: txt, (PARTIAL sum(id))
               Hash Key: gtest.txt
               Parts: 1  Disk Usage: 0kB  Disk Rows: 0
               Worker 0:  actual time=5591.812..9036.394 rows=1428865 loops=1
                 Parts: 1  Disk Usage: 0kB  Disk Rows: 0
               Worker 1:  actual time=5591.773..9002.576 rows=1430072 loops=1
                 Parts: 1  Disk Usage: 0kB  Disk Rows: 0
               Worker 2:  actual time=5591.774..9039.341 rows=1425817 loops=1
                 Parts: 1  Disk Usage: 0kB  Disk Rows: 0
               Worker 3:  actual time=5593.635..9040.148 rows=1427753 loops=1
                 Parts: 1  Disk Usage: 0kB  Disk Rows: 0
               Worker 4:  actual time=5593.565..9044.528 rows=1431652 loops=1
                 Parts: 1  Disk Usage: 0kB  Disk Rows: 0
               Worker 5:  actual time=5592.220..9043.953 rows=1426167 loops=1
                 Parts: 1  Disk Usage: 0kB  Disk Rows: 0
               ->  Partial HashAggregate (actual time=5566.237..5990.671 rows=1428579 loops=7)
                     Output: txt, PARTIAL sum(id)
                     Group Key: gtest.txt
                     Batches: 1  Memory Usage: 188433kB
                     Worker 0:  actual time=5565.941..5997.635 rows=1449687 loops=1
                       Batches: 1  Memory Usage: 188433kB
                     Worker 1:  actual time=5565.930..6073.977 rows=1400013 loops=1
                       Batches: 1  Memory Usage: 188433kB
                     Worker 2:  actual time=5565.945..5975.454 rows=1446727 loops=1
                       Batches: 1  Memory Usage: 188433kB
                     Worker 3:  actual time=5567.673..5981.978 rows=1396379 loops=1
                       Batches: 1  Memory Usage: 180241kB
                     Worker 4:  actual time=5567.622..5972.500 rows=1415832 loops=1
                       Batches: 1  Memory Usage: 188433kB
                     Worker 5:  actual time=5566.148..5962.503 rows=1415665 loops=1
                       Batches: 1  Memory Usage: 188433kB
                     ->  Parallel Seq Scan on public.gtest (actual time=0.022..1520.647 rows=14285714 loops=7)
                           Output: id, txt
                           Worker 0:  actual time=0.021..1476.653 rows=14496785 loops=1
                           Worker 1:  actual time=0.020..1519.023 rows=14000060 loops=1
                           Worker 2:  actual time=0.020..1476.707 rows=14467185 loops=1
                           Worker 3:  actual time=0.019..1654.088 rows=13963715 loops=1
                           Worker 4:  actual time=0.027..1527.803 rows=14158235 loops=1
                           Worker 5:  actual time=0.030..1514.247 rows=14156570 loops=1
 Planning Time: 0.080 ms
 Execution Time: 11830.773 ms
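
With the same data and settings, the Parallel Redistribute plan finishes in about 11.8s versus about 19.6s for the stock plan (roughly 1.65x faster): the Finalize HashAggregate now runs inside every worker on its own partition of the groups, instead of being done serially above the Gather.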



Re: parallel distinct union and aggregate support patch

From
Daniel Gustafsson
Date:
> On 29 Mar 2021, at 15:36, David Steele <david@pgmasters.net> wrote:

> A rebase is also required so marked Waiting for Author.

Many months on and this patch still needs a rebase to apply, and the thread has
stalled.  I'm marking this Returned with Feedback.  Please feel free to open a
new entry if you return to this patch.

--
Daniel Gustafsson        https://vmware.com/