Thread: [PoC] Asynchronous execution again (which is not parallel)

[PoC] Asynchronous execution again (which is not parallel)

From
Kyotaro HORIGUCHI
Date:
Hello, now that parallel scan works, I'd like to repropose
'asynchronous execution' (or 'early execution').

In the previous proposal, foreign scan was my only workable
example, but now I can use parallel execution instead, which
makes this feature distinct from parallel execution itself.

I could have put more work into this before proposing it, but I'd
like to show it now so that we can judge whether it deserves
further work.


==== Overview of asynchronous execution

"Asynchronous execution" is a feature to start substantial work
of nodes before doing Exec*. This can reduce total startup time
by folding startup time of multiple execution nodes. Especially
effective for the combination of joins or appends and their
multiple children that needs long time to startup.

This patch does that by inserting another phase, "Start*", between
ExecInit* and Exec* to launch parallel processing, including
bgworkers and FDWs, before requesting the very first tuple of the
result.
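To illustrate why folding startup times helps, here is a toy model in Python. It is purely illustrative: the node names and timings are made up, and the real patch works in C inside the executor.

```python
# Toy model of the proposed ExecInit* -> Start* -> Exec* phasing.
# Times are in milliseconds and entirely hypothetical.

class Node:
    def __init__(self, name, startup_ms):
        self.name = name
        self.startup_ms = startup_ms  # e.g. time a bgworker or FDW needs


def serial_startup(nodes):
    # Conventional executor: each child starts its real work only when
    # first asked for a tuple, so the startup times add up.
    return sum(n.startup_ms for n in nodes)


def async_startup(nodes):
    # With a Start* phase, every child kicks off its startup before the
    # first tuple is requested; the waits overlap, so only the slowest
    # child's startup is paid.
    return max(n.startup_ms for n in nodes)


children = [Node("sort-t1", 1200), Node("sort-t2", 1100), Node("sort-t3", 1300)]
print(serial_startup(children))  # 3600
print(async_startup(children))   # 1300
```

The point is only the sum-versus-max arithmetic; real savings depend on how much of each child's startup can actually proceed concurrently.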

==== About this patch

As a proof of concept, the first three patches add such a start
phase to the executor and add a facility to track node status for
almost every kind of executor node (part of this may turn out to be
unnecessary). The last two patches implement an example usage of
the infrastructure.

The two new GUCs, enable_parasortmerge and enable_asyncexec,
respectively control whether to use Gather for sorts under a merge
join and whether to enable asynchronous execution.

For evaluation, as an example, I made merge join use a bgworker
under certain conditions. It is a mere mock implementation, but it is
enough to show the difference between parallel execution and async
execution (more appropriate names are welcome) and its
effectiveness. Thanks for Amit's great work.

==== Performance test

Apply all the patches, then run the following in order. Of course,
this test is artificially constructed so that this patch wins :)

CREATE TABLE t1 (a int, b int);
CREATE TABLE t2 (a int, b int);
CREATE TABLE t3 (a int, b int);
INSERT INTO t1 (SELECT (a / 1000) + (a % 1000) * 1000, a FROM generate_series(0, 999999) a);
INSERT INTO t2 (SELECT (a / 1000) + (a % 1000) * 1000, a FROM generate_series(0, 999999) a);
INSERT INTO t3 (SELECT (a / 1000) + (a % 1000) * 1000, a FROM generate_series(0, 999999) a);
ANALYZE t1;
ANALYZE t2;
ANALYZE t3;
SET enable_nestloop TO true;
SET enable_hashjoin TO true;
SET enable_material TO true;
SET enable_parasortmerge TO false;
SET enable_asyncexec TO false;
EXPLAIN (COSTS off, ANALYZE) SELECT * FROM t1 JOIN t2 ON (t1.a = t2.a) JOIN t3 ON (t1.a = t3.a) ORDER BY t1.a LIMIT 10;
SET enable_nestloop TO false;
SET enable_hashjoin TO false;
SET enable_material TO false;
EXPLAIN (COSTS off, ANALYZE) SELECT * FROM t1 JOIN t2 ON (t1.a = t2.a) JOIN t3 ON (t1.a = t3.a) ORDER BY t1.a LIMIT 10;
SET enable_parasortmerge TO true;
EXPLAIN (COSTS off, ANALYZE) SELECT * FROM t1 JOIN t2 ON (t1.a = t2.a) JOIN t3 ON (t1.a = t3.a) ORDER BY t1.a LIMIT 10;
SET enable_asyncexec TO true;
EXPLAIN (COSTS off, ANALYZE) SELECT * FROM t1 JOIN t2 ON (t1.a = t2.a) JOIN t3 ON (t1.a = t3.a) ORDER BY t1.a LIMIT 10;

==== Test results

In my environment, I got the following results.

- The first run: the planner chooses a hash join plan; it takes about 3.3s.

- The second: the merge joins run in a single backend; about 5.1s.

- The third: plain parallel execution of the merge joins; about 5.8s.

- The fourth: asynchronously started execution of the merge joins; about 3.0s.

So asynchronous execution at least accelerates parallel execution
in this case, and is even faster than what is probably the current
fastest plan.


====== TODO and random thoughts, not restricted to this patch.

- This patch doesn't contain the planner part; the planner must be aware of async execution for this to be
effective.

- Some measure to control execution on a bgworker would be needed. At least merge join requires position mark/reset
functions.

- Currently, larger tuple counts reduce the effectiveness of parallel execution; some method to transfer tuples in
larger units would be needed. Or would it be good to have shared work_mem?

- The term "asynchronous execution" is a little easy to confuse with parallel execution. "Early execution/start" might
be usable, but I'm not so confident.


Any suggestions or thoughts?

I must apologize for the incomplete proposal and cluttered thoughts.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center

Re: [PoC] Asynchronous execution again (which is not parallel)

From
Amit Kapila
Date:
On Mon, Nov 30, 2015 at 6:17 PM, Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote:
>
>
> ====== TODO or random thoughts, not restricted on this patch.
>
> - This patch doesn't contain planner part, it must be aware of
>   async execution in order that this can be  in effective.
>

How will you decide whether sync execution is cheaper than parallel
execution?  Do you have some specific cases in mind where async
execution will be more useful than parallel execution?
 
>
> - Some measture to control execution on bgworker would be
>   needed. At least merge join requires position mark/reset
>   functions.
>
> - Currently, more tuples make reduce effectiveness of parallel
>   execution, some method to transfer tuples in larger unit would
>   be needed, or would be good to have shared workmem?
>

Yeah, I think one thing we need to figure out here is whether the
performance bottleneck is the amount of data transferred between
worker and master, or something else. One idea could be to pass the
TID and maybe keep the buffer pin (which would be released by the
master backend), but on the other hand, if the bgworker has to
perform costly target-list evaluation, it might be beneficial to pass
the projected list back.


With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

Re: [PoC] Asynchronous execution again (which is not parallel)

From
Kyotaro HORIGUCHI
Date:
Thank you for picking this up.

At Tue, 1 Dec 2015 20:33:02 +0530, Amit Kapila <amit.kapila16@gmail.com> wrote in
<CAA4eK1LBwj7heY8pxRmMCOLhuMFr81TLHck-+ByBFuUADgeu+A@mail.gmail.com>
> On Mon, Nov 30, 2015 at 6:17 PM, Kyotaro HORIGUCHI <
> horiguchi.kyotaro@lab.ntt.co.jp> wrote:
> > ====== TODO or random thoughts, not restricted on this patch.
> >
> > - This patch doesn't contain planner part, it must be aware of
> >   async execution in order that this can be  in effective.
> >
> 
> How will you decide whether sync-execution is cheaper than parallel
> execution.  Do you have some specific cases in mind where async
> execution will be more useful than parallel execution?

Mmm... some confusion in wording? Sync vs. async distinguishes
when a node (and its descendants) starts executing. Parallel
vs. serial (sequential) distinguishes whether multiple nodes can
execute simultaneously. Async execution presupposes parallel
execution in some form, whether bgworker or FDW.

As I wrote in the previous mail, async execution reduces the
startup time of parallel execution. So async execution is not an
alternative to parallel execution; it accelerates it. It is
effective when the startup time of every parallel execution node
is rather long. We have enough numbers to cost it.

> > - Some measture to control execution on bgworker would be
> >   needed. At least merge join requires position mark/reset
> >   functions.
> >
> > - Currently, more tuples make reduce effectiveness of parallel
> >   execution, some method to transfer tuples in larger unit would
> >   be needed, or would be good to have shared workmem?
> >
> 
> Yeah, I think here one thing we need to figure out is whether the
> performance bottleneck is due to the amount of data that is transferred
> between worker and master or something else. One idea could be to pass
> TID and may be keep the buffer pin (which will be released by master
> backend), but on the other hand if we have to perform costly target list
> evaluation by bgworker, then it might be beneficial to pass the projected
> list back.

One possible bottleneck is signalling between backends. Current
parallel execution uses signals to make the producer-consumer loop
go round. Conveying TIDs won't make it faster if the bottleneck is
the inter-process communication. I brought up bulk transfer and
shared work_mem as example means of reducing IPC frequency.
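As a back-of-the-envelope illustration of why batching tuple transfer reduces IPC frequency, here is a toy Python calculation; the tuple count and batch size are made-up numbers, not measurements from the patch.

```python
# Toy arithmetic for the "transfer tuples in larger units" idea: if the
# producer wakes the consumer once per batch instead of once per tuple,
# the number of signalling round trips drops by roughly the batch size.
import math


def signals_per_tuple(ntuples):
    # One wakeup per tuple, as in a naive producer-consumer loop.
    return ntuples


def signals_batched(ntuples, batch_size):
    # One wakeup per full (or final partial) batch.
    return math.ceil(ntuples / batch_size)


n = 1_000_000
print(signals_per_tuple(n))      # 1000000 wakeups
print(signals_batched(n, 1024))  # 977 wakeups
```

The arithmetic ignores the cost of the shared buffer itself; the actual win depends on how expensive each signal round trip is relative to copying a batch.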

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center





Re: [PoC] Asynchronous execution again (which is not parallel)

From
Amit Kapila
Date:
On Wed, Dec 2, 2015 at 7:45 AM, Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote:
>
> Thank you for picking this up.
>
> At Tue, 1 Dec 2015 20:33:02 +0530, Amit Kapila <amit.kapila16@gmail.com> wrote in <CAA4eK1LBwj7heY8pxRmMCOLhuMFr81TLHck-+ByBFuUADgeu+A@mail.gmail.com>
> > On Mon, Nov 30, 2015 at 6:17 PM, Kyotaro HORIGUCHI <
> > horiguchi.kyotaro@lab.ntt.co.jp> wrote:
> > > ====== TODO or random thoughts, not restricted on this patch.
> > >
> > > - This patch doesn't contain planner part, it must be aware of
> > >   async execution in order that this can be  in effective.
> > >
> >
> > How will you decide whether sync-execution is cheaper than parallel
> > execution.  Do you have some specific cases in mind where async
> > execution will be more useful than parallel execution?
>
> Mmm.. Some confusion in wording? Sync-async is a discrimination
> about when to start execution of a node (and its
> descendents). Parallel-serial(sequential) is that of whether
> multiple nodes can execute simultaneously. Async execution
> premises parallel execution in any terms, bgworker or FDW.
>
> As I wrote in the previous mail, async execution reduces startup
> time of execution of parallel execution.
>

Could you please explain in more detail how async execution reduces
the startup time of parallel execution? Could you share both
execution plans (with and without async execution)? That might help
in understanding the reason.


With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

Re: [PoC] Asynchronous execution again (which is not parallel)

From
Robert Haas
Date:
On Mon, Nov 30, 2015 at 7:47 AM, Kyotaro HORIGUCHI
<horiguchi.kyotaro@lab.ntt.co.jp> wrote:
> "Asynchronous execution" is a feature to start substantial work
> of nodes before doing Exec*. This can reduce total startup time
> by folding startup time of multiple execution nodes. Especially
> effective for the combination of joins or appends and their
> multiple children that needs long time to startup.
>
> This patch does that by inserting another phase "Start*" between
> ExecInit* and Exec* to launch parallel processing including
> pgworker and FDWs before requesting the very first tuple of the
> result.

I have thought about this, too, but I'm not very convinced that this
is the right model.  In a typical case involving parallelism, you hope
to have the Gather node as close to the top of the plan tree as
possible.  Therefore, the start phase will not happen much before the
first execution of the node, and you don't get much benefit.
Moreover, I think that prefetching can be useful not only at the start
of the query - which is the only thing that your model supports - but
also in mid-query.  For example, consider an Append of two ForeignScan
nodes.  Ideally we'd like to return the results in the order that they
become available, rather than serially.  This model might help with
that for the first batch of rows you fetch, but not after that.

There are a couple of other problems here that are specific to this
example.  You get a benefit here because you've got two Gather nodes
that both get kicked off before we try to read tuples from either, but
that's generally something to avoid - you can only use 3 processes and
typically at most 2 of those will actually be running (as opposed to
sleeping) at the same time: the workers will run to completion, and
then the leader will wake up and do its thing.   I'm not saying our
current implementation of parallel query scales well to a large number
of workers (it doesn't) but I think that's more about improving the
implementation than any theoretical problem, so this seems a little
worse.  Also, currently, both merge and hash joins have an
optimization wherein if the outer side of the join turns out to be
empty, we avoid paying the startup cost for the inner side of the
join; kicking off the work on the inner side of the merge join
asynchronously before we've gotten any tuples from the outer side
loses the benefit of that optimization.

I suspect there is no single paradigm that will help with all of the
cases where asynchronous execution is useful.  We're going to need a
series of changes that are targeted at specific problems.  For
example, here it would be useful to have one side of the join confirm
at the earliest possible stage that it will definitely return at least
one tuple eventually, but then return control to the caller so that we
can kick off the other side of the join.  The sort node never
eliminates anything, so as soon as the sequential scan underneath it
coughs up a tuple, we're definitely getting a return value eventually.
At that point it's safe to kick off the other Gather node.  I don't
quite know how to design a signalling system for that, but it could be
done.

But is it important enough to be worthwhile?  Maybe, maybe not.  I
think we should be working toward a world where the Gather is at the
top of the plan tree as often as possible, in which case
asynchronously kicking off a Gather node won't be that exciting any
more - see notes on the "parallelism + sorting" thread where I talk
about primitives that would allow massively parallel merge joins,
rather than 2 or 3 way parallel.  From my point of view, the case
where we really need some kind of asynchronous execution solution is a
ForeignScan, and in particular a ForeignScan which is the child of an
Append.  In that case it's obviously really useful to be able to kick
off all the foreign scans and then return a tuple from whichever one
coughs it up first.  Is that the ONLY case where asynchronous
execution is useful?  Probably not, but I bet it's the big one.
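The ordering benefit for an Append over ForeignScans can be sketched with a toy Python model. The shard names and latencies are invented for illustration; the point is only the difference in when each child's first tuple becomes available.

```python
# Toy model of an Append over several ForeignScans.  A plain Append drains
# children in declaration order, so a slow first child delays everyone;
# an async Append kicks off all remote queries at once and returns
# whichever child's tuple arrives first.  Latencies are hypothetical.


def serial_first_tuples(latencies):
    # Children are visited one after another, so child i's first tuple is
    # delayed by every earlier child's full latency.
    elapsed, arrivals = 0, []
    for name, latency in latencies:
        elapsed += latency
        arrivals.append((elapsed, name))
    return arrivals


def async_first_tuples(latencies):
    # All remote queries are started together; each first tuple surfaces
    # as soon as its own server responds.
    return sorted((latency, name) for name, latency in latencies)


remote = [("shard-a", 30), ("shard-b", 5), ("shard-c", 12)]
print(serial_first_tuples(remote))  # [(30, 'shard-a'), (35, 'shard-b'), (47, 'shard-c')]
print(async_first_tuples(remote))   # [(5, 'shard-b'), (12, 'shard-c'), (30, 'shard-a')]
```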

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: [PoC] Asynchronous execution again (which is not parallel)

From
Amit Kapila
Date:
On Tue, Dec 8, 2015 at 9:10 PM, Robert Haas <robertmhaas@gmail.com> wrote:
>
> On Mon, Nov 30, 2015 at 7:47 AM, Kyotaro HORIGUCHI
> <horiguchi.kyotaro@lab.ntt.co.jp> wrote:
>
> But is it important enough to be worthwhile?  Maybe, maybe not.  I
> think we should be working toward a world where the Gather is at the
> top of the plan tree as often as possible, in which case
> asynchronously kicking off a Gather node won't be that exciting any
> more - see notes on the "parallelism + sorting" thread where I talk
> about primitives that would allow massively parallel merge joins,
> rather than 2 or 3 way parallel.  From my point of view, the case
> where we really need some kind of asynchronous execution solution is a
> ForeignScan, and in particular a ForeignScan which is the child of an
> Append.  In that case it's obviously really useful to be able to kick
> off all the foreign scans and then return a tuple from whichever one
> coughs it up first.
>

How will this be better than doing the same thing the way we have done
Parallel Sequential Scan, at ExecutorRun() time?



With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

Re: [PoC] Asynchronous execution again (which is not parallel)

From
Kyotaro HORIGUCHI
Date:
Hello, thank you for the comment.

At Tue, 8 Dec 2015 10:40:20 -0500, Robert Haas <robertmhaas@gmail.com> wrote in
<CA+TgmobLEaho40e9puy3pLbeUx_a6hKBoDUqDNQO4rwORUM-eA@mail.gmail.com>
> On Mon, Nov 30, 2015 at 7:47 AM, Kyotaro HORIGUCHI
> <horiguchi.kyotaro@lab.ntt.co.jp> wrote:
> > "Asynchronous execution" is a feature to start substantial work
> > of nodes before doing Exec*. This can reduce total startup time
> > by folding startup time of multiple execution nodes. Especially
> > effective for the combination of joins or appends and their
> > multiple children that needs long time to startup.
> >
> > This patch does that by inserting another phase "Start*" between
> > ExecInit* and Exec* to launch parallel processing including
> > pgworker and FDWs before requesting the very first tuple of the
> > result.
> 
> I have thought about this, too, but I'm not very convinced that this
> is the right model.  In a typical case involving parallelism, you hope
> to have the Gather node as close to the top of the plan tree as
> possible.  Therefore, the start phase will not happen much before the
> first execution of the node, and you don't get much benefit.

Under the Init-Exec semantics, a Gather node cannot execute its
underlying node, say a Sort, before the upper node requests the
first tuple. Async execution potentially helps in that case, too.

On the other hand, the patch currently treats Gather as an
always-driven node. Since Gather shares a characteristic with
Append and MergeAppend, in that it potentially executes multiple
(and various kinds of) underlying nodes, the patch should be
redesigned accordingly. But as far as I can see, Gather currently
only executes multiple copies of the same (or partitioned) scan
node, so I haven't made Gather "async-aware" (if I haven't
misunderstood).

And if necessary, we can mark the query as 'async requested' in
the planning phase.

> Moreover, I think that prefetching can be useful not only at the start
> of the query - which is the only thing that your model supports - but
> also in mid-query.  For example, consider an Append of two ForeignScan
> nodes.  Ideally we'd like to return the results in the order that they
> become available, rather than serially.  This model might help with
> that for the first batch of rows you fetch, but not after that.

Yeah, async-exec could have a mechanism similar to Gather's for
fetching tuples from underlying nodes.

> There are a couple of other problems here that are specific to this
> example.  You get a benefit here because you've got two Gather nodes
> that both get kicked off before we try to read tuples from either, but
> that's generally something to avoid - you can only use 3 processes and
> typically at most 2 of those will actually be running (as opposed to

Yes, that is one of the reasons I called the example artificial.

> sleeping) at the same time: the workers will run to completion, and
> then the leader will wake up and do its thing.   I'm not saying our
> current implementation of parallel query scales well to a large number
> of workers (it doesn't) but I think that's more about improving the
> implementation than any theoretical problem, so this seems a little
> worse.  Also, currently, both merge and hash joins have an
> optimization wherein if the outer side of the join turns out to be
> empty, we avoid paying the startup cost for the inner side of the
> join; kicking off the work on the inner side of the merge join
> asynchronously before we've gotten any tuples from the outer side
> loses the benefit of that optimization.

It is a matter of comparison: async wins if the startup time of
the outer side is (sufficiently) longer than the time to build the
inner hash. But that requires the planner part. I'll take it into
account if async exec itself is found to be useful.

> I suspect there is no single paradigm that will help with all of the
> cases where asynchronous execution is useful.  We're going to need a
> series of changes that are targeted at specific problems.  For
> example, here it would be useful to have one side of the join confirm
> at the earliest possible stage that it will definitely return at least
> one tuple eventually, but then return control to the caller so that we
> can kick off the other side of the join.  The sort node never
> eliminates anything, so as soon as the sequential scan underneath it
> coughs up a tuple, we're definitely getting a return value eventually.

That's quite impressive, but it might be the planner's business.

> At that point it's safe to kick off the other Gather node.  I don't
> quite know how to design a signalling system for that, but it could be
> done.

I agree. I'll give that further consideration.

> But is it important enough to be worthwhile?  Maybe, maybe not.  I
> think we should be working toward a world where the Gather is at the
> top of the plan tree as often as possible, in which case
> asynchronously kicking off a Gather node won't be that exciting any
> more - see notes on the "parallelism + sorting" thread where I talk
> about primitives that would allow massively parallel merge joins,
> rather than 2 or 3 way parallel. 

Could you give me the subject of that thread, or a key message
from it?

> From my point of view, the case
> where we really need some kind of asynchronous execution solution is a
> ForeignScan, and in particular a ForeignScan which is the child of an
> Append.  In that case it's obviously really useful to be able to kick
> off all the foreign scans and then return a tuple from whichever one
> coughs it up first.  Is that the ONLY case where asynchronous
> execution is useful?  Probably not, but I bet it's the big one.

Yes, the most significant and obvious target of async execution
(though its benefit is hard to estimate) is (Merge)Append over
ForeignScan, which is narrow but frequently used.  This patch
started from that case.

That is because of the startup-heavy nature of FDWs. I later added
sort as a target and then redesigned the patch to give the
capability to all nodes.  If that is clearly overdone for the
(currently) expected benefit, and if it is preferable to shrink this
patch to touch only the portions where async exec has a benefit,
I'll do so.


regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center





Re: [PoC] Asynchronous execution again (which is not parallel)

From
Amit Langote
Date:
Hi,

On 2015/12/14 17:34, Kyotaro HORIGUCHI wrote:
> At Tue, 8 Dec 2015 10:40:20 -0500, Robert Haas <robertmhaas@gmail.com> wrote
>> But is it important enough to be worthwhile?  Maybe, maybe not.  I
>> think we should be working toward a world where the Gather is at the
>> top of the plan tree as often as possible, in which case
>> asynchronously kicking off a Gather node won't be that exciting any
>> more - see notes on the "parallelism + sorting" thread where I talk
>> about primitives that would allow massively parallel merge joins,
>> rather than 2 or 3 way parallel. 
> 
> Could you give me the subject of the thread? Or important message
> of that.

I think that would be the following thread:

* parallelism and sorting *
http://www.postgresql.org/message-id/CA+TgmoYh4zsQMgqiyra7zO1RBBvG1qHn1fJT5q0Fpw+Q0xAjrg@mail.gmail.com

Thanks,
Amit





Re: [PoC] Asynchronous execution again (which is not parallel)

From
Kyotaro HORIGUCHI
Date:
Thank you a lot!

At Mon, 14 Dec 2015 17:51:41 +0900, Amit Langote <Langote_Amit_f8@lab.ntt.co.jp> wrote in
<566E831D.1050703@lab.ntt.co.jp>
> 
> Hi,
> 
> On 2015/12/14 17:34, Kyotaro HORIGUCHI wrote:
> > At Tue, 8 Dec 2015 10:40:20 -0500, Robert Haas <robertmhaas@gmail.com> wrote
> >> But is it important enough to be worthwhile?  Maybe, maybe not.  I
> >> think we should be working toward a world where the Gather is at the
> >> top of the plan tree as often as possible, in which case
> >> asynchronously kicking off a Gather node won't be that exciting any
> >> more - see notes on the "parallelism + sorting" thread where I talk
> >> about primitives that would allow massively parallel merge joins,
> >> rather than 2 or 3 way parallel. 
> > 
> > Could you give me the subject of the thread? Or important message
> > of that.
> 
> I think that would be the following thread:
> 
> * parallelism and sorting *
> http://www.postgresql.org/message-id/CA+TgmoYh4zsQMgqiyra7zO1RBBvG1qHn1fJT5q0Fpw+Q0xAjrg@mail.gmail.com

Thank you for the pointer. I'll read it.

# It's hard for me to eyeball-grep English text...

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center





Re: [PoC] Asynchronous execution again (which is not parallel)

From
Robert Haas
Date:
On Fri, Dec 11, 2015 at 11:49 PM, Amit Kapila <amit.kapila16@gmail.com> wrote:
>> But is it important enough to be worthwhile?  Maybe, maybe not.  I
>> think we should be working toward a world where the Gather is at the
>> top of the plan tree as often as possible, in which case
>> asynchronously kicking off a Gather node won't be that exciting any
>> more - see notes on the "parallelism + sorting" thread where I talk
>> about primitives that would allow massively parallel merge joins,
>> rather than 2 or 3 way parallel.  From my point of view, the case
>> where we really need some kind of asynchronous execution solution is a
>> ForeignScan, and in particular a ForeignScan which is the child of an
>> Append.  In that case it's obviously really useful to be able to kick
>> off all the foreign scans and then return a tuple from whichever one
>> coughs it up first.
>
> How will this be better than doing the same thing in a way we have done
> Parallel Sequential Scan at ExecutorRun() time?

I'm not sure if this is what you are asking, but I think it probably
should be done at ExecutorRun() time, rather than a separate phase.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: [PoC] Asynchronous execution again (which is not parallel)

From
Robert Haas
Date:
On Mon, Dec 14, 2015 at 3:34 AM, Kyotaro HORIGUCHI
<horiguchi.kyotaro@lab.ntt.co.jp> wrote:
> Yes, the most significant and obvious (but hard to estimate the
> benefit) target of async execution is (Merge)Append-ForeignScan,
> which is narrow but freuquently used.  And this patch has started
> from it.
>
> It is because of the startup-heavy nature of FDW. So I involved
> sort as a target later then redesigned to give the ability on all
> nodes.  If it is obviously over-done for the (currently) expected
> benefit and if it is preferable to shrink this patch so as to
> touch only the portion where async-exec has a benefit, I'll do
> so.

Suppose we equip each EState with the ability to fire "callbacks".
Callbacks have the signature:

typedef bool (*ExecCallback)(PlanState *planstate, TupleTableSlot
*slot, void *context);

Executor nodes can register immediate callbacks to be run at the
earliest possible opportunity using a function like
ExecRegisterCallback(estate, callback, planstate, slot, context).
They can register deferred callbacks that will be called when a file
descriptor becomes ready for I/O, or when the process latch is set,
using a call like ExecRegisterFileCallback(estate, fd, event,
callback, planstate, slot, context) or
ExecRegisterLatchCallback(estate, callback, planstate, slot, context).

To execute callbacks, an executor node can call ExecFireCallbacks(),
which will fire immediate callbacks in order of registration, and wait
for the file descriptors for which callbacks have been registered and
for the process latch when no immediate callbacks remain but there are
still deferred callbacks.  It will return when (1) there are no
remaining immediate or deferred callbacks or (2) one of the callbacks
returns "true".

Then, suppose we add a function bool ExecStartAsync(PlanState *target,
ExecCallback callback, PlanState *cb_planstate, void *cb_context).
For non-async-aware plan nodes, this just returns false.  async-aware
plan nodes should initiate some work, register some callbacks, and
return.  The callback that get registered should arrange in turn to
register the callback passed as an argument when a tuple becomes
available, passing the planstate and context provided by
ExecStartAsync's caller, plus the TupleTableSlot containing the tuple.

So, in response to ExecStartAsync, if there's no tuple currently
available, postgres_fdw can send a query to the remote server and
request a callback when the fd becomes read-ready.  It must save the
callback passed to ExecStartAsync inside the PlanState someplace so
that when a tuple becomes available it can register that callback.

ExecAppend can call ExecStartAsync on each of its subplans.  For any
subplan where ExecStartAsync returns false, ExecAppend will just
execute it normally, by calling ExecProcNode repeatedly until no more
tuples are returned.  But for async-capable subplans, it can call
ExecStartAsync on all of them, and then call ExecFireCallbacks.  The
tuple-ready callback it passes to its child plans will take the tuple
provided by the child plan and store it into the Append node's slot.
It will then return true if, and only if, ExecFireCallbacks is being
invoked from ExecAppend (which it can figure out via some kind of
signalling either through its own PlanState or centralized signalling
through the EState).  That way, if ExecAppend were itself invoked
asynchronously, its tuple-ready callback could simply populate a slot
appropriately and register its invoker's tuple-ready callback.
Whether called synchronously or asynchronously, each invocation of an
asynchronous append after the first would just need to call
ExecStartAsync again on the child that last returned a tuple.

It seems pretty straightforward to fit Gather into this infrastructure.

It is unclear to me how useful this is beyond ForeignScan, Gather, and
Append.  MergeAppend's ordering constraint makes it less useful; we
can asynchronously kick off the request for the next tuple before
returning the previous one, but we're going to need to have that tuple
before we can return the next one.  But it could be done.  It could
potentially even be applied to seq scans or index scans using some set
of asynchronous I/O interfaces, but I don't see how it could be
applied to joins or aggregates, which typically can't really proceed
until they get the next tuple.  They could be plugged into this
interface easily enough but it would only help to the extent that it
enabled asynchrony elsewhere in the plan tree to be pulled up towards
the root.

Thoughts?

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: [PoC] Asynchronous execution again (which is not parallel)

From
Amit Kapila
Date:
On Wed, Dec 16, 2015 at 4:54 AM, Robert Haas <robertmhaas@gmail.com> wrote:
>
> On Fri, Dec 11, 2015 at 11:49 PM, Amit Kapila <amit.kapila16@gmail.com> wrote:
> >> But is it important enough to be worthwhile?  Maybe, maybe not.  I
> >> think we should be working toward a world where the Gather is at the
> >> top of the plan tree as often as possible, in which case
> >> asynchronously kicking off a Gather node won't be that exciting any
> >> more - see notes on the "parallelism + sorting" thread where I talk
> >> about primitives that would allow massively parallel merge joins,
> >> rather than 2 or 3 way parallel.  From my point of view, the case
> >> where we really need some kind of asynchronous execution solution is a
> >> ForeignScan, and in particular a ForeignScan which is the child of an
> >> Append.  In that case it's obviously really useful to be able to kick
> >> off all the foreign scans and then return a tuple from whichever one
> >> coughs it up first.
> >
> > How will this be better than doing the same thing in a way we have done
> > Parallel Sequential Scan at ExecutorRun() time?
>
> I'm not sure if this is what you are asking, but I think it probably
> should be done at ExecutorRun() time, rather than a separate phase.
>

Yes, that's one thing I wanted to know. Yet another point that is not
clear to me about this async infrastructure is why the current
parallelism infrastructure can't be used to achieve the async benefits
of ForeignScan.


With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

Re: [PoC] Asynchronous execution again (which is not parallel)

From
Robert Haas
Date:
On Wed, Dec 16, 2015 at 1:34 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:
> Yes, thats one thing I wanted to know, yet another point which is not
> clear to me about this Async infrastructure is why the current
> infrastructure
> of Parallelism can't be used to achieve the Async benefits of ForeignScan?

Well, all a ForeignScan by postgres_fdw does is read the tuples that
are generated remotely.  Turning around and sticking those into a
Funnel doesn't seem like it gains much: now instead of having to read
tuples from someplace, the leader has to read tuples from some other
place.  Yeah, there are cases where it could win, like when there's a
selective nonpushable qual, but that's not that exciting.

There's another, more serious problem: if the leader has a connection
open to the remote server and that connection is in mid-transaction,
you can't have a worker open a new connection without changing the
semantics.  Working around that problem looks hard to me.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: [PoC] Asynchronous execution again (which is not parallel)

From
Amit Kapila
Date:
On Wed, Dec 16, 2015 at 6:04 PM, Robert Haas <robertmhaas@gmail.com> wrote:
>
> On Wed, Dec 16, 2015 at 1:34 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:
> > Yes, thats one thing I wanted to know, yet another point which is not
> > clear to me about this Async infrastructure is why the current
> > infrastructure
> > of Parallelism can't be used to achieve the Async benefits of ForeignScan?
>
> Well, all a ForeignScan by postgres_fdw does is read the tuples that
> are generated remotely.  Turning around and sticking those into a
> Funnel doesn't seem like it gains much: now instead of having to read
> tuples from someplace, the leader has to read tuples from some other
> place.  Yeah, there are cases where it could win, like when there's a
> selective nonpushable qual, but that's not that exciting.
>
> There's another, more serious problem: if the leader has a connection
> open to the remote server and that connection is in mid-transaction,
> you can't have a worker open a new connection without changing the
> semantics.  Working around that problem looks hard to me.
>

Okay.  It seems there are cases that could benefit from an Async
execution infrastructure rather than directly using Gather-style
infrastructure.  I am not the right person to judge whether there
are enough such cases to justify new infrastructure, but I think it
is a point to consider and I am sure you will make the right move.


With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

Re: [PoC] Asynchronous execution again (which is not parallel)

From
Mart Kelder
Date:
Hi Robert and others,

First, I don't know the PostgreSQL code well enough yet. I still
hope my thoughts are useful.

Robert Haas wrote:
> It is unclear to me how useful this is beyond ForeignScan, Gather, and
> Append.  MergeAppend's ordering constraint makes it less useful; we
> can asynchronously kick off the request for the next tuple before
> returning the previous one, but we're going to need to have that tuple
> before we can return the next one.  But it could be done.  It could
> potentially even be applied to seq scans or index scans using some set
> of asynchronous I/O interfaces, but I don't see how it could be
> applied to joins or aggregates, which typically can't really proceed
> until they get the next tuple.  They could be plugged into this
> interface easily enough but it would only help to the extent that it
> enabled asynchrony elsewhere in the plan tree to be pulled up towards
> the root.

As far as I understand, this comes down to a number of subplans. A subplan
can be asked to return a tuple either directly or at some later point in
time (asynchronously). Conceptually, the subplan goes into a "tuples wanted"
state and starts the work that needs to be done to produce that tuple.
Later, it either returns the tuple or a message that there are no more
tuples.

I see opportunities to use this feature to make some query plans less
memory-intensive without increasing the total amount of work to be done. I
think the same subplan can be placed at several places in the execution
plan. If the subplan ends with a tuple store, then when a row is requested
it can either be returned from the store or generated by the subplan. If
both outputs of the subplan request tuples at around the same rate, the
tuple store stays small, whereas currently we need to save all the tuples
or generate them multiple times. I think this can be potentially beneficial
for CTEs.

I also think the planner can predict the chances that a subplan can be
reused. If the two consumers are on different sides of a hash join, it
knows that one output will be exhausted before the second output requests
its first row, which means that everything needs to be stored. Also, not
every callback needs to be invoked at the same rate: tuple storage might
be avoided by choosing the callback to invoke wisely.

I am a little bit worried about the performance cost of context
switching. I think it is a good idea to register the callback only when
execution actually hits a point where the tuple can't be generated directly.

> Thoughts?

Regards,

Mart




Re: [PoC] Asynchronous execution again (which is not parallel)

From
Kyotaro HORIGUCHI
Date:
Thank you for the comment.

At Tue, 15 Dec 2015 21:01:27 -0500, Robert Haas <robertmhaas@gmail.com> wrote in
<CA+TgmoZuAqVDJQ14YHCa3izbdaaaUSuwrG1YbtJD0rKO5EmeKQ@mail.gmail.com>
> On Mon, Dec 14, 2015 at 3:34 AM, Kyotaro HORIGUCHI
> <horiguchi.kyotaro@lab.ntt.co.jp> wrote:
> > Yes, the most significant and obvious (but hard to estimate the
> > benefit) target of async execution is (Merge)Append-ForeignScan,
> > which is narrow but freuquently used.  And this patch has started
> > from it.
> >
> > It is because of the startup-heavy nature of FDW. So I involved
> > sort as a target later then redesigned to give the ability on all
> > nodes.  If it is obviously over-done for the (currently) expected
> > benefit and if it is preferable to shrink this patch so as to
> > touch only the portion where async-exec has a benefit, I'll do
> > so.
> 
> Suppose we equip each EState with the ability to fire "callbacks".
> Callbacks have the signature:
> 
> typedef bool (*ExecCallback)(PlanState *planstate, TupleTableSlot
> *slot, void *context);
> 
> Executor nodes can register immediate callbacks to be run at the
> earliest possible opportunity using a function like
> ExecRegisterCallback(estate, callback, planstate, slot, context).
> They can registered deferred callbacks that will be called when a file
> descriptor becomes ready for I/O, or when the process latch is set,
> using a call like ExecRegisterFileCallback(estate, fd, event,
> callback, planstate, slot, context) or
> ExecRegisterLatchCallback(estate, callback, planstate, slot, context).
> 
> To execute callbacks, an executor node can call ExecFireCallbacks(),
> which will fire immediate callbacks in order of registration, and wait
> for the file descriptors for which callbacks have been registered and
> for the process latch when no immediate callbacks remain but there are
> still deferred callbacks.  It will return when (1) there are no
> remaining immediate or deferred callbacks or (2) one of the callbacks
> returns "true".

Excellent! I unconsciously excluded the case of callbacks because
I supposed (without firm grounds) that all executor nodes could have a
chance to win from this. Such a callback is a good choice for doing
what Start*Node did in the latest patch.

> Then, suppose we add a function bool ExecStartAsync(PlanState *target,
> ExecCallback callback, PlanState *cb_planstate, void *cb_context).
> For non-async-aware plan nodes, this just returns false.  async-aware
> plan nodes should initiate some work, register some callbacks, and
> return.  The callback that get registered should arrange in turn to
> register the callback passed as an argument when a tuple becomes
> available, passing the planstate and context provided by
> ExecStartAsync's caller, plus the TupleTableSlot containing the tuple.

Although I don't have a clear picture of the case of async-aware
nodes under non-aware nodes, it seems to have a high affinity with
the (true) parallel execution framework.

> So, in response to ExecStartAsync, if there's no tuple currently
> available, postgres_fdw can send a query to the remote server and
> request a callback when the fd becomes ready-ready.  It must save the
> callback passed to ExecStartAsync inside the PlanState someplace so
> that when a tuple becomes available it can register that callback.
> 
> ExecAppend can call ExecStartAsync on each of its subplans.  For any
> subplan where ExecStartAsync returns false, ExecAppend will just
> execute it normally, by calling ExecProcNode repeatedly until no more
> tuples are returned.  But for async-capable subplans, it can call
> ExecStartAsync on all of them, and then call ExecFireCallbacks.  The
> tuple-ready callback it passes to its child plans will take the tuple
> provided by the child plan and store it into the Append node's slot.
> It will then return true if, and only if, ExecFireCallbacks is being
> invoked from ExecAppend (which it can figure out via some kind of
> signalling either through its own PlanState or centralized signalling
> through the EState).  That way, if ExecAppend were itself invoked
> asynchronously, its tuple-ready callback could simply populate a slot
> appropriately register its invoker's tuple-ready callback.  Whether
> called synchronously or asynchronously, each invocation of as
> asynchronous append after the first would just need to again
> ExecStartAsync on the child that last returned a tuple.

Thanks for the attentive explanation. My concern about this is the
latency caused by synchronizing the producer and the consumer for
every tuple. My previous patch is not asynchronous on every tuple,
so it can give a pure gain without the loss from tuple-wise
synchronization. But this looks clean and I like it, so I'll
consider it.

> It seems pretty straightforward to fit Gather into this infrastructure.

Yes.

> It is unclear to me how useful this is beyond ForeignScan, Gather, and
> Append.  MergeAppend's ordering constraint makes it less useful; we
> can asynchronously kick off the request for the next tuple before
> returning the previous one, but we're going to need to have that tuple
> before we can return the next one.  But it could be done.  It could
> potentially even be applied to seq scans or index scans using some set
> of asynchronous I/O interfaces, but I don't see how it could be
> applied to joins or aggregates, which typically can't really proceed
> until they get the next tuple.  They could be plugged into this
> interface easily enough but it would only help to the extent that it
> enabled asynchrony elsewhere in the plan tree to be pulled up towards
> the root.

This is mainly an argument not about "asynchronous execution/start"
but about "asynchronous tuple-passing". As I showed before, a merge
join on asynchronous and parallel children running sorts *can* win
over a hash join (if the planner foresees that). If asynchronous
tuple-passing is not so effective, as for MergeAppend, we can
simply refrain from doing it. But cost modeling for it is a
difficult problem.

> Thoughts?

I'll try the callback framework and in-process asynchronous
tuple-passing (like select(2)). Please wait for a while.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center





Re: [PoC] Asynchronous execution again (which is not parallel)

From
Kyotaro HORIGUCHI
Date:
Hello,

I have given some consideration to, and experimented with,
callbacks as a means of async (early) execution.

> > Suppose we equip each EState with the ability to fire "callbacks".
> > Callbacks have the signature:
> > 
> > typedef bool (*ExecCallback)(PlanState *planstate, TupleTableSlot
> > *slot, void *context);
> > 
> > Executor nodes can register immediate callbacks to be run at the
> > earliest possible opportunity using a function like
> > ExecRegisterCallback(estate, callback, planstate, slot, context).
> > They can registered deferred callbacks that will be called when a file
> > descriptor becomes ready for I/O, or when the process latch is set,
> > using a call like ExecRegisterFileCallback(estate, fd, event,
> > callback, planstate, slot, context) or
> > ExecRegisterLatchCallback(estate, callback, planstate, slot, context).

I thought about this. The immediate callbacks seem fine, but using
a latch or fds to signal tuple availability doesn't seem to fit
callbacks stored in the EState. Such events are deferrable until
the parent's tuple request and can be handled at that time, as
ExecGather does now. However, some kind of synchronization/waiting
mechanism like a latch or select() is needed anyway.

> > To execute callbacks, an executor node can call ExecFireCallbacks(),
> > which will fire immediate callbacks in order of registration, and wait
> > for the file descriptors for which callbacks have been registered and
> > for the process latch when no immediate callbacks remain but there are
> > still deferred callbacks.  It will return when (1) there are no
> > remaining immediate or deferred callbacks or (2) one of the callbacks
> > returns "true".
> 
> Excellent! I unconsciously excluded the case of callbacks because
> I supposed (without certain ground) all executor nodes can have a
> chance to win from this. Such callback is a good choice to do
> what Start*Node did in the lastest patch.

The previous code added a large amount of clutter: the
async-execution mechanism included additional code for an
ExecStartNode phase, in the same manner as ExecProcNode and
ExecEndNode. Most of that additional code is useless for most node
types.

Callbacks are usable for not-so-common, once-per-event operations
such as error handling. In this case, the operation is the async
execution of a node and the event is the point just before
ExecProcNode on the topmost node. The first patch attached allows
async-capable nodes to register callbacks in the Init phase and
executes them just before the Exec phase on the topmost node. This
greatly reduces the additional code as a result. This matches my
first impression of the word "callbacks".

With this patch, the following operations yield LOG messages from a
dummy callback:

CREATE TABLE t1 (a int, b int);
INSERT INTO t1 (SELECT a, 1 FROM generate_series(0, 99) a);
CREATE TABLE t2 (a int, b int);
INSERT INTO t2 (SELECT a, 2 FROM generate_series(0, 99) a);
CREATE TABLE t3 (a int, b int);
INSERT INTO t3 (SELECT a, 3 FROM generate_series(0, 99) a);
SELECT * FROM t1 UNION ALL SELECT * FROM t2 UNION ALL SELECT * FROM t3;
===
LOG:  dummy_async_cb is called for 0x2783a98
LOG:  dummy_async_cb is called for 0x2784248
LOG:  dummy_async_cb is called for 0x2784ad0

What my previous patch could do is doable with this first patch,
with a far smaller diff.

If this design is not bad, I'll do the postgres_fdw part.


Next is a discussion of async tuple fetching.

> > Then, suppose we add a function bool ExecStartAsync(PlanState *target,
> > ExecCallback callback, PlanState *cb_planstate, void *cb_context).
> > For non-async-aware plan nodes, this just returns false.  async-aware
> > plan nodes should initiate some work, register some callbacks, and
> > return.  The callback that get registered should arrange in turn to
> > register the callback passed as an argument when a tuple becomes
> > available, passing the planstate and context provided by
> > ExecStartAsync's caller, plus the TupleTableSlot containing the tuple.
> 
> Although I don't imagine clearly about the case of
> async-aware-nodes under non-aware-nodes, it seems to have a high
> affinity with (true) parallel execution framework.

ExecStartAsync is similar to ExecStartNode in my old patch. One of
the most annoying things about that approach is that it needs to
walk down to the descendants, and in turn needs corresponding
boilerplate code for every type of node that can have children.

Instead, in the second patch, I modified ExecProcNode to return an
async status in the EState. It will be EXEC_READY or EXEC_EOT (end
of table/no more tuples) for non-async-capable nodes, and
async-capable nodes can set it to EXEC_NOT_READY, which indicates
that there could be more tuples but none is available yet.

Async-aware nodes such as Append can go on to the next child if the
preceding child returned EXEC_NOT_READY. If all !EXEC_EOT children
returned EXEC_NOT_READY, Append will wait using some signaling
mechanism (it busy-waits for now). As an example, the second patch
modifies ExecAppend to handle this and modifies ExecSeqScan to
return EXEC_NOT_READY with a certain probability, as an emulation
of asynchronous tuple fetching. The UNION ALL query above then
returns results interleaved among the three tables.

> > So, in response to ExecStartAsync, if there's no tuple currently
> > available, postgres_fdw can send a query to the remote server and
> > request a callback when the fd becomes ready-ready.  It must save the
> > callback passed to ExecStartAsync inside the PlanState someplace so
> > that when a tuple becomes available it can register that callback.
> > 
> > ExecAppend can call ExecStartAsync on each of its subplans.  For any
> > subplan where ExecStartAsync returns false, ExecAppend will just
> > execute it normally, by calling ExecProcNode repeatedly until no more
> > tuples are returned.  But for async-capable subplans, it can call
> > ExecStartAsync on all of them, and then call ExecFireCallbacks.  The
> > tuple-ready callback it passes to its child plans will take the tuple
> > provided by the child plan and store it into the Append node's slot.
> > It will then return true if, and only if, ExecFireCallbacks is being
> > invoked from ExecAppend (which it can figure out via some kind of
> > signalling either through its own PlanState or centralized signalling
> > through the EState).  That way, if ExecAppend were itself invoked
> > asynchronously, its tuple-ready callback could simply populate a slot
> > appropriately register its invoker's tuple-ready callback.  Whether
> > called synchronously or asynchronously, each invocation of as
> > asynchronous append after the first would just need to again
> > ExecStartAsync on the child that last returned a tuple.
> 
> Thanks for the attentive explanation. My concern about this is
> that the latency by synchronizing one by one for every tuple
> between the producer and the consumer. My previous patch is not
> asynchronous on every tuple so it can give a pure gain without
> loss from tuple-wise synchronization. But it looks clean and I
> like it so I'll consider this.
> 
> > It seems pretty straightforward to fit Gather into this infrastructure.
>
> Yes.

If Gather's children become a regular node struct with a name like
Worker(Node), instead of the non-Node structure they are now, we
can generalize the tuple-synchronization mechanism so that it can
be used by other nodes such as ForeignScan. Append(ForeignScan,
ForeignScan, ...) with async tuple passing can balance the load
across multiple foreign servers, so I suppose that it is preferable
if no penalty exists.

> > It is unclear to me how useful this is beyond ForeignScan, Gather, and
> > Append.  MergeAppend's ordering constraint makes it less useful; we
> > can asynchronously kick off the request for the next tuple before
> > returning the previous one, but we're going to need to have that tuple
> > before we can return the next one.  But it could be done.  It could
> > potentially even be applied to seq scans or index scans using some set
> > of asynchronous I/O interfaces, but I don't see how it could be
> > applied to joins or aggregates, which typically can't really proceed
> > until they get the next tuple.  They could be plugged into this
> > interface easily enough but it would only help to the extent that it
> > enabled asynchrony elsewhere in the plan tree to be pulled up towards
> > the root.
> 
> This is mainly not an argument on "asynchronous execution/start"
> but "asynchronous tuple-passing". As I showed before, a merge
> join on asynchronous and parallel children running sort *can* win
> over a hash join (if planner foresees that). If asynchronous
> tuple-passing is not so effective like MergeAppend, we can
> simplly refrain from doing that. But cost modeling for it is a
> difficult problem.
> 
> > Thoughts?
> 
> I'll try the callback framework and in-process asynchronous
> tuple-passing (like select(2)). Please wait for a while.

Finally, the two patches attached became somewhat different from
Robert's suggestion and lack the synchronization feature. However,
if this approach is not too bad, I'll build the feature this way.

Suggestions? Thoughts?

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center

Re: [PoC] Asynchronous execution again (which is not parallel)

From
Amit Langote
Date:
Hi!

On 2016/01/21 18:26, Kyotaro HORIGUCHI wrote:
>>> Then, suppose we add a function bool ExecStartAsync(PlanState *target,
>>> ExecCallback callback, PlanState *cb_planstate, void *cb_context).
>>> For non-async-aware plan nodes, this just returns false.  async-aware
>>> plan nodes should initiate some work, register some callbacks, and
>>> return.  The callback that get registered should arrange in turn to
>>> register the callback passed as an argument when a tuple becomes
>>> available, passing the planstate and context provided by
>>> ExecStartAsync's caller, plus the TupleTableSlot containing the tuple.
>>
>> Although I don't imagine clearly about the case of
>> async-aware-nodes under non-aware-nodes, it seems to have a high
>> affinity with (true) parallel execution framework.
> 
> The ExecStartAsync is similar to ExecStartNode of my old
> patch. One of the most annoying things of that is that it needs
> to walk down to their descendents and in turn it needs garbageous
> corresponding additional codes for all type of nodes which can
> have children.

Unless I am missing something, I wonder if this is where
planstate_tree_walker() introduced by commit 8dd401aa is useful. For
example, I see that it's used in ExecShutdownNode() in a manner that looks
interesting for this discussion.

Thanks,
Amit





Re: [PoC] Asynchronous execution again (which is not parallel)

From
Kyotaro HORIGUCHI
Date:
Hi.

At Thu, 21 Jan 2016 19:09:19 +0900, Amit Langote <Langote_Amit_f8@lab.ntt.co.jp> wrote in
<56A0AE4F.9000508@lab.ntt.co.jp>
> 
> Hi!
> 
> On 2016/01/21 18:26, Kyotaro HORIGUCHI wrote:
> >>> Then, suppose we add a function bool ExecStartAsync(PlanState *target,
> >>> ExecCallback callback, PlanState *cb_planstate, void *cb_context).
> >>> For non-async-aware plan nodes, this just returns false.  async-aware
> >>> plan nodes should initiate some work, register some callbacks, and
> >>> return.  The callback that get registered should arrange in turn to
> >>> register the callback passed as an argument when a tuple becomes
> >>> available, passing the planstate and context provided by
> >>> ExecStartAsync's caller, plus the TupleTableSlot containing the tuple.
> >>
> >> Although I don't imagine clearly about the case of
> >> async-aware-nodes under non-aware-nodes, it seems to have a high
> >> affinity with (true) parallel execution framework.
> > 
> > The ExecStartAsync is similar to ExecStartNode of my old
> > patch. One of the most annoying things of that is that it needs
> > to walk down to their descendents and in turn it needs garbageous
> > corresponding additional codes for all type of nodes which can
> > have children.
> 
> Unless I am missing something, I wonder if this is where
> planstate_tree_walker() introduced by commit 8dd401aa is useful. For
> example, I see that it's used in ExecShutdownNode() in a manner that looks
> interesting for this discussion.

Oh, that's part of the parallel execution stuff. Thanks for letting
me know about it. The walker approach also fits kicking functions
that most node types are unrelated to; only one type of node (or
two, including ForeignScan) is involved.

The attached patches have the same functionality but use
planstate_tree_walker instead of callbacks. This seems even simpler
than the callbacks.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center


Re: [PoC] Asynchronous execution again (which is not parallel)

From
Robert Haas
Date:
On Thu, Jan 21, 2016 at 4:26 AM, Kyotaro HORIGUCHI
<horiguchi.kyotaro@lab.ntt.co.jp> wrote:
> I put some consideration and trial on callbacks as a means to
> async(early)-execution.

Thanks for working on this.

>> > Suppose we equip each EState with the ability to fire "callbacks".
>> > Callbacks have the signature:
>> >
>> > typedef bool (*ExecCallback)(PlanState *planstate, TupleTableSlot
>> > *slot, void *context);
>> >
>> > Executor nodes can register immediate callbacks to be run at the
>> > earliest possible opportunity using a function like
>> > ExecRegisterCallback(estate, callback, planstate, slot, context).
>> > They can registered deferred callbacks that will be called when a file
>> > descriptor becomes ready for I/O, or when the process latch is set,
>> > using a call like ExecRegisterFileCallback(estate, fd, event,
>> > callback, planstate, slot, context) or
>> > ExecRegisterLatchCallback(estate, callback, planstate, slot, context).
>
> I considered on this. The immediate callbacks seems fine but
> using latch or fds to signal tuple availability doesn't seem to
> fit callbacks stored in estate. They are deferrable until
> parent's tuple request and such kind of events can be handled at
> the time as ExecGather does now. However some kind of
> synchronize/waiting mechanism like latch or select() is needed
> anyway.

I am not entirely sure I understand what you are trying to say here,
but if I do understand it then I disagree.  Consider an Append node
with 1000 children, each a ForeignScan.  What we want to do is fire
off all 1000 remote queries and then return a tuple from whichever
ForeignScan becomes ready first.  What we do NOT want to do is fire
off all 1000 remote queries and have to either (a) iterate through all
1000 subplans checking repeatedly whether each is ready or (b) pick
one of those 1000 subplans to wait for and ignore the other 999 until
the one we pick is ready.  I don't see any way to achieve what we need
here without some way to convince the executor to do a select() across
all 1000 fds and take some action based on which one becomes
read-ready.

> Callback is usable for not-so-common invoked-for-a-event-at-once
> operations such like error-handling. For this case, the
> operations can be asynch-execution of a node and the event can be
> just before ExecProcNode on the topmost node. The first patch
> attached allows async-capable nodes to register callbacks on Init
> phase and executes them just before Exec phase on the topmost
> node. It grately reduces the additional code as the result. My
> first impression from the word "callbacks" is this.

This strikes me as pretty much uninteresting.  I bet if you test this
you'll find that it doesn't make anything faster.  You're kicking
things off asynchronously only a tiny fraction of a second before you
would have started them anyway.  What we need is a way to proceed
asynchronously through the entire execution of the query, not just at
the beginning.  I understand that your merge-join-double-gather
example can benefit from this, but it does so by kicking off both
subplans before we know for sure that we'll want to read from both
subplans.  I admit that optimization rarely kicks in, but it's a
potentially huge savings of effort when it does.

> Instead, in the second patch, I modified ExecProcNode to return
> async status in EState. It will be EXEC_READY or EXEC_EOT(End of
> table/No more tuple?) for non-async-capable nodes and
> async-capable nodes can set it EXEC_NOT_READY, which indicates
> that there could be more tuple but not available yet.
>
> Async-aware nodes such as Append can go to the next child if the
> predecessor returned EXEC_NOT_READY. If all !EXEC_EOT nodes
> returned EXEC_NOT_READY, Append will wait using some signaling
> mechanism (it runs busily now instead.). As an example, the
> second patch modifies ExecAppend to handle it and modified
> ExecSeqScan to return EXEC_NOT_READY by certain probability as an
> emulation of asynchronous tuple fetching. The UNION ALL query
> above returns results stirred among the tree tables as the result.

I think the idea of distinguishing between "end of tuples" and "no
tuples currently ready" is a good one.  I am not particularly excited
about this particular signalling mechanism.  I am not sure why you
want to put this in the EState - isn't that shared between all nodes
in the plan tree, and doesn't that create therefore the risk of
confusion?  What I might imagine is giving ExecProcNode a second
argument that functions as an out parameter and is only meaningful
when TupIsNull(result).  It could for example be a boolean indicating
whether more tuples might become available later.  Maybe an enum is
better, but let's suppose a Boolean for now.  At the top of
ExecProcNode we would do *async_pending = false.  Then, we'd pass
async_pending to ExecWhatever function that is potentially
async-capable and it could set *async_pending = true if it wants.

>> Thanks for the attentive explanation. My concern about this is
>> that the latency by synchronizing one by one for every tuple
>> between the producer and the consumer. My previous patch is not
>> asynchronous on every tuple so it can give a pure gain without
>> loss from tuple-wise synchronization. But it looks clean and I
>> like it so I'll consider this.
>>
>> > It seems pretty straightforward to fit Gather into this infrastructure.
>>
>> Yes.
>
> If Gather's children become a regular node struct with a name
> like Worker(Node), instead of non-Node structure as it is now, we
> can generalize the tuple-synchronization mecanism so that it can
> be used by other nodes such as ForeginScan. Append(ForegnScan,
> ForegnScan,...) with async tuple passing can average multiple
> foreign servers so I suppose that it is preferable if no penalty
> exists.

I don't quite understand what you mean by saying that Gather's
children are not a "regular node struct".  Which struct are you
talking about?

I do agree that transferring tuples one by one seems like it might be
inefficient.  Gather suffered from a problem of this nature which was
repaired, at least partially, by commit
bc7fcab5e36b9597857fa7e3fa6d9ba54aaea167.  I'm wondering if we need
some kind of tuple-buffering abstraction.  A tuple-buffer could have
an on-receipt-of-tuples callback; if it doesn't, then the tuples can
be read out synchronously one by one.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company