Thread: asynchronous and vectorized execution

asynchronous and vectorized execution

From
Robert Haas
Date:
Hi,

I realize that we haven't gotten 9.6beta1 out the door yet, but I
think we can't really wait much longer to start having at least some
discussion of 9.7 topics, so I'm going to go ahead and put this one
out there.  I believe there are other people thinking about these
topics as well, including Andres Freund, Kyotaro Horiguchi, and
probably some folks at 2ndQuadrant (but I don't know exactly who).  To
make a long story short, I think there are several different areas
where we should consider major upgrades to our executor.  It's too
slow and it doesn't do everything we want it to do.  The main things
on my mind are:

1. asynchronous execution, by which I mean the ability of a node to
somehow say that it will generate a tuple eventually, but is not yet
ready, so that the executor can go run some other part of the plan
tree while it waits.  This case most obviously arises for foreign
tables, where it makes little sense to block on I/O if some other part
of the query tree could benefit from the CPU; consider SELECT * FROM
lt WHERE qual UNION SELECT * FROM ft WHERE qual.  It is also a problem
for parallel query: in a parallel sequential scan, the next worker can
begin reading the next block even if the current block hasn't yet been
received from the OS.  Whether or not this will be efficient is a
research question, but it can be done.  However, imagine a parallel
scan of a btree index: we don't know what page to scan next until we
read the previous page and examine the next-pointer.  In the meantime,
any worker that arrives at that scan node has no choice but to block.
It would be better if the scan node could instead say "hey, thanks for
coming but I'm really not ready to be on-CPU just at the moment" and
potentially allow the worker to go work in some other part of the
query tree.  For that worker to actually find useful work to do
elsewhere, we'll probably need it to be the case either that the table
is partitioned or the original query will need to involve UNION ALL,
but those are not silly cases to worry about, particularly if we get
native partitioning in 9.7.

2. vectorized execution, by which I mean the ability of a node to
return tuples in batches rather than one by one.  Andres has opined
more than once that repeated trips through ExecProcNode defeat the
ability of the CPU to do branch prediction correctly, slowing the
whole system down, and that they also result in poor CPU cache
behavior, since we jump all over the place executing a little bit of
code from each node before moving onto the next rather than running
one bit of code first, and then another later.  I think that's
probably right.   For example, consider a 5-table join where all of
the joins are implemented as hash tables.  If this query plan is going
to be run to completion, it would make much more sense to fetch, say,
100 tuples from the driving scan and then probe for all of those in
the first hash table, and then probe for all of those in the second
hash table, and so on.  What we do instead is fetch one tuple and
probe for it in all 5 hash tables, and then repeat.  If one of those
hash tables would fit in the CPU cache but all five together will not,
that seems likely to be a lot worse.   But even just ignoring the CPU
cache aspect of it for a minute, suppose you want to write a loop to
perform a hash join.  The inner loop fetches the next tuple from the
probe table and does a hash lookup.  Right now, fetching the next
tuple from the probe table means calling a function which in turn
calls another function which probably calls another function which
probably calls another function and now about 4 layers down we
actually get the next tuple.  If the scan returned a batch of tuples
to the hash join, fetching the next tuple from the batch would
probably be 0 or 1 function calls rather than ... more.  Admittedly,
you've got to consider the cost of marshaling the batches but I'm
optimistic that there are cycles to be squeezed out here.  We might
also want to consider storing batches of tuples in a column-optimized
rather than row-optimized format so that iterating through one or two
attributes across every tuple in the batch touches the minimal number
of cache lines.
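
To make the shape of that concrete, here is a rough pseudo-C sketch of
the two loop structures (fetch_next, fetch_batch, and probe are
placeholders for this sketch, not real executor functions):

/*
 * Tuple-at-a-time: every fetched tuple walks all five hash tables, so
 * all five tables compete for cache at once and every fetch pays a
 * full ExecProcNode round trip.
 */
while ((tup = fetch_next(scan)) != NULL)
{
    for (j = 0; j < 5; j++)
        tup = probe(hashtable[j], tup);
}

/*
 * Batched: fetch ~100 tuples, then sweep one hash table at a time over
 * the whole batch, keeping that table hot in cache while it is probed.
 */
while ((n = fetch_batch(scan, batch, 100)) > 0)
{
    for (j = 0; j < 5; j++)
        for (i = 0; i < n; i++)
            batch[i] = probe(hashtable[j], batch[i]);
}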

Obviously, both of these are big projects that could touch a large
amount of executor code, and there may be other ideas, in addition to
these, which some of you may be thinking about that could also touch a
large amount of executor code.  It would be nice to agree on a way
forward that minimizes code churn and maximizes everyone's attempt to
contribute without conflicting with each other.  Also, it seems
desirable to enable, as far as possible, incremental development - in
particular, it seems to me that it would be good to pick a design that
doesn't require massive changes to every node all at once.  A single
patch that adds some capability to every node in the executor in one
fell swoop is going to be too large to review effectively.

My proposal for how to do this is to make ExecProcNode function as a
backward-compatibility wrapper.  For asynchronous execution, a node
might return a not-ready-yet indication, but if that node is called
via ExecProcNode, it means the caller isn't prepared to receive such
an indication, so ExecProcNode will just wait for the node to become
ready and then return the tuple.  Similarly, for vectorized execution,
a node might return a bunch of tuples all at once.  ExecProcNode will
extract the first one and return it to the caller, and subsequent
calls to ExecProcNode will iterate through the rest of the batch, only
calling the underlying node-specific function when the batch is
exhausted.  In this way, code that doesn't know about the new stuff
can continue to work pretty much as it does today.  Also, and I think
this is important, nodes don't need the permission of their parent
node to use these new capabilities.  They can use them whenever they
wish, without worrying about whether the upper node is prepared to
deal with it.  If not, ExecProcNode will paper over the problem.  This
seems to me to be a good way to keep the code simple.
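
To sketch the control flow I have in mind (the result and result_ready
fields are the ones described in the next paragraph; ExecDispatchNode,
ExecEventLoop, ExecHasPendingBatch, and ExecNextTupleFromBatch are
made-up names just for this sketch, not settled API):

TupleTableSlot *
ExecProcNode(PlanState *node)
{
    /* Drain any batch left over from a previous call first. */
    if (ExecHasPendingBatch(node))
        return ExecNextTupleFromBatch(node);

    node->result = NULL;
    node->result_ready = false;

    ExecDispatchNode(node);     /* node-type-specific routine, as today */

    /*
     * Asynchronous node: the caller came in through the old API and
     * cannot cope with "not ready yet", so run the event loop until
     * this node has produced a result.
     */
    while (!node->result_ready)
        ExecEventLoop(node);

    /* Vectorized node: peel off one tuple; keep the rest for later calls. */
    if (node->result != NULL && !IsA(node->result, TupleTableSlot))
        return ExecNextTupleFromBatch(node);

    return (TupleTableSlot *) node->result;
}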

For asynchronous execution, I have gone so far as to mock up a bit of
what this might look like.  This shouldn't be taken very seriously at
this point, but I'm attaching a few very-much-WIP patches to show the
direction of my line of thinking.  Basically, I propose to have
ExecBlah (that is, ExecBitmapHeapScan, ExecAppend, etc.) return tuples
by putting them into a new PlanState member called "result", which is
just a Node * so that we can support multiple types of results,
instead of returning them.  There is also a result_ready boolean, so
that a node can return without setting this Boolean to engage
asynchronous behavior.  This triggers an "event loop", which
repeatedly waits for FDs chosen by waiting nodes to become readable
and/or writeable and then gives the node a chance to react.
Eventually, the waiting node will stop waiting and have a result
ready, at which point the event loop will give the parent of that node
a chance to run.  If that node consequently becomes ready, then its
parent gets a chance to run.  Eventually (we hope), the node for which
we're waiting becomes ready, and we can then read a result tuple.
With some more work, this seems like it can handle the FDW case, but I
haven't worked out how to make it handle the related parallel query
case.  What we want there is to wait not for the readiness of an FD
but rather for some other process involved in the parallel query to
reach a point where it can welcome assistance executing that node.  I
don't know exactly what the signaling for that should look like yet -
maybe setting the process latch or something.

By the way, one smaller executor project that I think we should also
look at has to do with this comment in nodeSeqScan.c:

static bool
SeqRecheck(SeqScanState *node, TupleTableSlot *slot)
{
        /*
         * Note that unlike IndexScan, SeqScan never use keys in heap_beginscan
         * (and this is very bad) - so, here we do not check are keys ok or not.
         */
        return true;
}

Some quick prototyping by my colleague Dilip Kumar suggests that, in
fact, there are cases where pushing down keys into heap_beginscan()
could be significantly faster.  Some care is required here because any
functions we execute as scan keys are run with the buffer locked, so
we had better not run anything very complicated.  But doing this for
simple things like integer equality operators seems like it could save
quite a few buffer lock/unlock cycles and some other executor overhead
as well.
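
For illustration, a pushed-down key for a qual like "x = 42" on an int4
column might boil down to roughly this (the attribute number, variable
names, and the exact integration point are made up for the example):

ScanKeyData key;

/* qual "x = 42", where x is the third attribute and an int4 column */
ScanKeyInit(&key,
            3,                      /* attribute number of x */
            BTEqualStrategyNumber,  /* plain equality */
            F_INT4EQ,               /* int4eq: cheap enough to run under the buffer lock */
            Int32GetDatum(42));

/*
 * heap_beginscan() already accepts scan keys; the heap scan code then
 * discards non-matching tuples while it still holds the buffer lock,
 * instead of bouncing every tuple up through the executor only to be
 * filtered there.
 */
scandesc = heap_beginscan(rel, estate->es_snapshot, 1, &key);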

Thoughts, ideas, suggestions, etc. very welcome.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

Attachment

Re: asynchronous and vectorized execution

From
Simon Riggs
Date:
On 9 May 2016 at 19:33, Robert Haas <robertmhaas@gmail.com> wrote:

> I believe there are other people thinking about these
> topics as well, including Andres Freund, Kyotaro Horiguchi, and
> probably some folks at 2ndQuadrant (but I don't know exactly who).
>
> 1. asynchronous execution

Not looking at that.

> 2. vectorized execution...
>
> We might also want to consider storing batches of tuples in a column-optimized
> rather than row-optimized format so that iterating through one or two
> attributes across every tuple in the batch touches the minimal number
> of cache lines.

The team is about 2 years into research and a coding prototype on those topics at this point, with time agreed for further work over the next 2 years.

I'll let my colleagues chime in with details since I'm not involved at that level any more.

--
Simon Riggs                http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

Re: asynchronous and vectorized execution

From
David Rowley
Date:
On 10 May 2016 at 05:33, Robert Haas <robertmhaas@gmail.com> wrote:
> 2. vectorized execution, by which I mean the ability of a node to
> return tuples in batches rather than one by one.  Andres has opined
> more than once that repeated trips through ExecProcNode defeat the
> ability of the CPU to do branch prediction correctly, slowing the
> whole system down, and that they also result in poor CPU cache
> behavior, since we jump all over the place executing a little bit of
> code from each node before moving onto the next rather than running
> one bit of code first, and then another later.  I think that's
> probably right.   For example, consider a 5-table join where all of
> the joins are implemented as hash tables.  If this query plan is going
> to be run to completion, it would make much more sense to fetch, say,
> 100 tuples from the driving scan and then probe for all of those in
> the first hash table, and then probe for all of those in the second
> hash table, and so on.  What we do instead is fetch one tuple and
> probe for it in all 5 hash tables, and then repeat.  If one of those
> hash tables would fit in the CPU cache but all five together will not,
> that seems likely to be a lot worse.   But even just ignoring the CPU
> cache aspect of it for a minute, suppose you want to write a loop to
> perform a hash join.  The inner loop fetches the next tuple from the
> probe table and does a hash lookup.  Right now, fetching the next
> tuple from the probe table means calling a function which in turn
> calls another function which probably calls another function which
> probably calls another function and now about 4 layers down we
> actually get the next tuple.  If the scan returned a batch of tuples
> to the hash join, fetching the next tuple from the batch would
> probably be 0 or 1 function calls rather than ... more.  Admittedly,
> you've got to consider the cost of marshaling the batches but I'm
> optimistic that there are cycles to be squeezed out here.  We might
> also want to consider storing batches of tuples in a column-optimized
> rather than row-optimized format so that iterating through one or two
> attributes across every tuple in the batch touches the minimal number
> of cache lines.

It's interesting that you mention this. We identified this as a pain
point during our work on column stores last year. Simply passing
single tuples around the executor is really unfriendly towards L1
instruction cache, plus also the points you mention about L3 cache and
hash tables and tuple stores. I really think that we're likely to see
significant gains by processing >1 tuple at a time, so this topic very
much interests me.

While researching this we found that other people's research does
indicate that there are gains to be had:
http://www.openlinksw.com/weblog/oerling/

In that blog there's a table that indicates that this row-store
database saw a 4.4x performance improvement from changing from a
tuple-at-a-time executor to a batch tuple executor.

Batch Size 1 tuple = 122 seconds
Batch Size 10k tuples = 27.7 seconds

When we start multiplying those increases with the gains from
something like parallel query, then we're starting to see very nice
gains in performance.

Alvaro, Tomas and I had been discussing this and late last year I did
look into what would be required to allow this to happen in Postgres.
Basically there are 2 sub-projects; I'll describe what I've managed to
learn so far about each, and the rough plan that I have for implementing
them:


1. Batch Execution:

a. Modify the ScanAPI to allow batch tuple fetching in predefined batch sizes.
b. Modify TupleTableSlot to allow > 1 tuple to be stored. Add a flag to
indicate whether the struct contains a single tuple or multiple tuples.
Multiple tuples may need to be deformed in a non-lazy fashion in order
to prevent too many buffers from having to be pinned at once. Tuples
will be deformed into arrays of each column rather than arrays for
each tuple (this part is important to support the next sub-project; a
rough sketch of what I mean appears after the next paragraph).
c. Modify some nodes (perhaps start with nodeAgg.c) to allow them to
process a batch TupleTableSlot. This will require some tight loop to
aggregate the entire TupleTableSlot at once before returning.
d. Add a function in execAmi.c which returns true or false depending on
whether the node supports batch TupleTableSlots.
e. At executor startup, determine whether the entire plan tree supports
batch TupleTableSlots; if so, enable batch scan mode.

Those, at least, are my ideas for stage 1. There's still more to work out,
e.g. should batch mode occur when the query has a LIMIT? We might not
want to waste time gathering up extra tuples when we're just going to
stop after the first one. So perhaps 'e' above should be up to the
planner instead. Further development work here might add a node type
that de-batches a TupleTableSlot to allow nodes which don't support
batching to be in the plan, i.e. "mixed execution mode". I'm less
excited about this as it may be difficult to cost that operation;
probably the time would be better spent just batch-enabling the other
node types, which *may* not be all that difficult. I'm also assuming
that batch mode (in all cases apart from queries with LIMIT or
cursors) will always be faster than tuple-at-a-time, so it requires no
costings from the planner.
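
As a strawman, the kind of thing I have in mind for (b) above might look
like this (the field names, the fixed batch size, and the column-major
layout are only illustrative; nothing is settled):

#define TUPLE_BATCH_SIZE 1000       /* predefined batch size from (a) */

typedef struct TupleBatch
{
    int         ntuples;            /* tuples currently held in the batch */
    int         natts;              /* attributes deformed per tuple */
    Datum     **values;             /* values[att][row]: one array per column */
    bool      **isnull;             /* isnull[att][row]: matching null flags */
} TupleBatch;

typedef struct BatchTupleTableSlot
{
    TupleTableSlot  base;           /* behaves as a normal slot when isbatch is false */
    bool            isbatch;        /* single tuple or a whole batch? */
    TupleBatch     *batch;          /* only valid when isbatch is true */
} BatchTupleTableSlot;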

2. Vector processing

(I admit that I've given this part much less thought so far, but
here's what I have in mind)

This depends on batch execution, and is intended to allow the executor
to perform function calls on an entire batch at once, rather than
tuple-at-a-time. For example, take the following query:

SELECT a+b FROM t;

here (as of now) we'd scan "t" one row at a time and perform a+b after
having deformed enough of the tuple to do that. We'd then go and get
another Tuple from the scan node and repeat until the scan gave us no
more Tuples.

With batch execution we'd fetch multiple Tuples from the scan and we'd
then perform the call to, say, int4_pl() multiple times, which still
kinda sucks as it means calling int4_pl() possibly millions of times
(once per tuple). The vector mode here would require that we modify
pg_operator to add a vector function for each operator, so that we can
call the function passing in an array of Datums and a length and have
SIMD operations perform the addition; we'd call something like
int4_pl_vector() only once per batch of tuples, allowing the CPU to
perform SIMD operations on those datum arrays. This could be done in
an incremental way, as the code could just fall back on the standard
function in cases where a vectorised version of it is not available.
Thought is needed here about when exactly this decision is made, as the
user may not have permission to execute the vector function, so it
can't simply be a run-time check.  These functions would simply return
another vector of the results.  Aggregates could be given a vector
transition function, where something like COUNT(*)'s vector_transfn
would simply do current_count += vector_length;
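
To give a feel for the kind of function I mean, an int4_pl_vector()
might be little more than this (the signature is only a guess at what
the vectorised calling convention could look like; overflow checking is
omitted):

static void
int4_pl_vector(Datum *a, Datum *b, Datum *result, bool *isnull, int n)
{
    int     i;

    /*
     * Plain loop over the column arrays; with no fmgr call per element
     * the compiler is free to emit SIMD instructions for it.
     */
    for (i = 0; i < n; i++)
    {
        if (isnull[i])
            continue;               /* strict: NULL in, NULL out */
        result[i] = Int32GetDatum(DatumGetInt32(a[i]) + DatumGetInt32(b[i]));
    }
}

/* and COUNT(*)'s vector transition function really would just be: */
/*     current_count += vector_length;                             */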

This project does appear to require that we bloat the code with 100's
of vector versions of each function. I'm not quite sure if there's a
better way to handle this. The problem is that the fmgr is pretty much
a barrier to SIMD operations, and this was the only idea that I've had
so far about breaking through that barrier. So further ideas here are
very welcome.

The idea here is that these 2 projects help pave the way to bringing
columnar storage into PostgreSQL. Without these we're unlikely to get
much benefit from columnar storage, as we'd still be stuck processing
rows one at a time.  Adding columnar storage on top of the above
should further increase performance, as we can skip the tuple-deform
step and pull columnar arrays/vectors directly into a TupleTableSlot,
although some trickery would be involved here when the scan has keys.

I just want to add that both of the above do require more thought. We
realised that this was required quite late in our column store work
(which we've all now taken a break from to work on other things), so
we've had little time to look much further into it. I should be
starting work on this again in the next few months, though, in the hope
of having something, even the most simple version of it, in 9.7.

Comments are welcome

--
 David Rowley                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services



Re: asynchronous and vectorized execution

From
Kouhei Kaigai
Date:
> -----Original Message-----
> From: pgsql-hackers-owner@postgresql.org
> [mailto:pgsql-hackers-owner@postgresql.org] On Behalf Of Robert Haas
> Sent: Tuesday, May 10, 2016 2:34 AM
> To: pgsql-hackers@postgresql.org
> Subject: [HACKERS] asynchronous and vectorized execution
> 
> Hi,
> 
> I realize that we haven't gotten 9.6beta1 out the door yet, but I
> think we can't really wait much longer to start having at least some
> discussion of 9.7 topics, so I'm going to go ahead and put this one
> out there.  I believe there are other people thinking about these
> topics as well, including Andres Freund, Kyotaro Horiguchi, and
> probably some folks at 2ndQuadrant (but I don't know exactly who).  To
> make a long story short, I think there are several different areas
> where we should consider major upgrades to our executor.  It's too
> slow and it doesn't do everything we want it to do.  The main things
> on my mind are:
> 
> 1. asynchronous execution, by which I mean the ability of a node to
> somehow say that it will generate a tuple eventually, but is not yet
> ready, so that the executor can go run some other part of the plan
> tree while it waits.  This case most obviously arises for foreign
> tables, where it makes little sense to block on I/O if some other part
> of the query tree could benefit from the CPU; consider SELECT * FROM
> lt WHERE qual UNION SELECT * FROM ft WHERE qual.  It is also a problem
> for parallel query: in a parallel sequential scan, the next worker can
> begin reading the next block even if the current block hasn't yet been
> received from the OS.  Whether or not this will be efficient is a
> research question, but it can be done.  However, imagine a parallel
> scan of a btree index: we don't know what page to scan next until we
> read the previous page and examine the next-pointer.  In the meantime,
> any worker that arrives at that scan node has no choice but to block.
> It would be better if the scan node could instead say "hey, thanks for
> coming but I'm really not ready to be on-CPU just at the moment" and
> potentially allow the worker to go work in some other part of the
> query tree.  For that worker to actually find useful work to do
> elsewhere, we'll probably need it to be the case either that the table
> is partitioned or the original query will need to involve UNION ALL,
> but those are not silly cases to worry about, particularly if we get
> native partitioning in 9.7.
>
Is the parallel-aware Append node sufficient to run multiple nodes
asynchronously? (Sorry, I haven't had enough time to code the feature
even though we discussed it before.)
If some of the child nodes are blocked by I/O or other heavy work, they
cannot enqueue results into the shm_mq; thus, the Gather node naturally
skips nodes that are not ready.
In the above example, the scan on the foreign table has a longer lead
time than the local scan. If Append can map every child node onto an
individual worker, the local-scan worker begins returning tuples first,
and mixed tuples shall be returned eventually.

However, process-internal asynchronous execution may also be beneficial
in cases where the cost of the shm_mq is not ignorable (e.g., when no
scan qualifiers are given to the worker process). I think it allows
pre-fetching to be implemented very naturally.

> 2. vectorized execution, by which I mean the ability of a node to
> return tuples in batches rather than one by one.  Andres has opined
> more than once that repeated trips through ExecProcNode defeat the
> ability of the CPU to do branch prediction correctly, slowing the
> whole system down, and that they also result in poor CPU cache
> behavior,

My concern about ExecProcNode is that it is constructed with a large
switch ... case statement, which involves a lot of comparison operations
at run time. If we replaced this switch ... case with a function
pointer, it would probably improve performance, especially for OLAP
workloads that process a large number of rows.

> since we jump all over the place executing a little bit of
> code from each node before moving onto the next rather than running
> one bit of code first, and then another later.  I think that's
> probably right.   For example, consider a 5-table join where all of
> the joins are implemented as hash tables.  If this query plan is going
> to be run to completion, it would make much more sense to fetch, say,
> 100 tuples from the driving scan and then probe for all of those in
> the first hash table, and then probe for all of those in the second
> hash table, and so on.  What we do instead is fetch one tuple and
> probe for it in all 5 hash tables, and then repeat.  If one of those
> hash tables would fit in the CPU cache but all five together will not,
> that seems likely to be a lot worse.
>
I can agree with the above concern from my experience. Each HashJoin
step needs to fill up a TupleTableSlot at each depth. Mostly, this is
just relocation of the attributes in the case of multi-table joins.

If HashJoin could gather the five underlying hash tables at once, it
could reduce the unnecessary setup of intermediate tuples.
A positive example is GpuHashJoin in PG-Strom. It constructs a multi-
relation hash table, then produces joined tuples at once.
Its performance is generally good.

> But even just ignoring the CPU
> cache aspect of it for a minute, suppose you want to write a loop to
> perform a hash join.  The inner loop fetches the next tuple from the
> probe table and does a hash lookup.  Right now, fetching the next
> tuple from the probe table means calling a function which in turn
> calls another function which probably calls another function which
> probably calls another function and now about 4 layers down we
> actually get the next tuple.  If the scan returned a batch of tuples
> to the hash join, fetching the next tuple from the batch would
> probably be 0 or 1 function calls rather than ... more.  Admittedly,
> you've got to consider the cost of marshaling the batches but I'm
> optimistic that there are cycles to be squeezed out here.  We might
> also want to consider storing batches of tuples in a column-optimized
> rather than row-optimized format so that iterating through one or two
> attributes across every tuple in the batch touches the minimal number
> of cache lines.
>
> Obviously, both of these are big projects that could touch a large
> amount of executor code, and there may be other ideas, in addition to
> these, which some of you may be thinking about that could also touch a
> large amount of executor code.  It would be nice to agree on a way
> forward that minimizes code churn and maximizes everyone's attempt to
> contribute without conflicting with each other.  Also, it seems
> desirable to enable, as far as possible, incremental development - in
> particular, it seems to me that it would be good to pick a design that
> doesn't require massive changes to every node all at once.  A single
> patch that adds some capability to every node in the executor in one
> fell swoop is going to be too large to review effectively.
> 
> My proposal for how to do this is to make ExecProcNode function as a
> backward-compatibility wrapper.  For asynchronous execution, a node
> might return a not-ready-yet indication, but if that node is called
> via ExecProcNode, it means the caller isn't prepared to receive such
> an indication, so ExecProcNode will just wait for the node to become
> ready and then return the tuple.
>
Backward compatibility is good. In addition, a child node may want to
know the context in which it is called. It may want to switch its
behavior according to the caller's expectations. For example, it may be
beneficial if SeqScan does more aggressive prefetching during
asynchronous execution.

Also, can we consider which data format will be returned from the child
node during the planning stage? It affects the cost of inter-node
data exchange. If a pair of parent node and child node supports a
special data format (as the existing HashJoin and Hash do), that should
be a discount factor in the cost estimation.

> Similarly, for vectorized execution,
> a node might return a bunch of tuples all at once.  ExecProcNode will
> extract the first one and return it to the caller, and subsequent
> calls to ExecProcNode will iterate through the rest of the batch, only
> calling the underlying node-specific function when the batch is
> exhausted.  In this way, code that doesn't know about the new stuff
> can continue to work pretty much as it does today.  Also, and I think
> this is important, nodes don't need the permission of their parent
> node to use these new capabilities.  They can use them whenever they
> wish, without worrying about whether the upper node is prepared to
> deal with it.  If not, ExecProcNode will paper over the problem.  This
> seems to me to be a good way to keep the code simple.
>
> For asynchronous execution, I have gone so far as to mock up a bit of
> what this might look like.  This shouldn't be taken very seriously at
> this point, but I'm attaching a few very-much-WIP patches to show the
> direction of my line of thinking.  Basically, I propose to have
> ExecBlah (that is, ExecBitmapHeapScan, ExecAppend, etc.) return tuples
> by putting them into a new PlanState member called "result", which is
> just a Node * so that we can support multiple types of results,
> instead of returning them.  There is also a result_ready boolean, so
> that a node can return without setting this Boolean to engage
> asynchronous behavior.  This triggers an "event loop", which
> repeatedly waits for FDs chosen by waiting nodes to become readable
> and/or writeable and then gives the node a chance to react.
> Eventually, the waiting node will stop waiting and have a result
> ready, at which point the event loop will give the parent of that node
> a chance to run.  If that node consequently becomes ready, then its
> parent gets a chance to run.  Eventually (we hope), the node for which
> we're waiting becomes ready, and we can then read a result tuple.
> With some more work, this seems like it can handle the FDW case, but I
> haven't worked out how to make it handle the related parallel query
> case.  What we want there is to wait not for the readiness of an FD
> but rather for some other process involved in the parallel query to
> reach a point where it can welcome assistance executing that node.  I
> don't know exactly what the signaling for that should look like yet -
> maybe setting the process latch or something.
> 
> By the way, one smaller executor project that I think we should also
> look at has to do with this comment in nodeSeqScan.c:
> 
> static bool
> SeqRecheck(SeqScanState *node, TupleTableSlot *slot)
> {
>         /*
>          * Note that unlike IndexScan, SeqScan never use keys in heap_beginscan
>          * (and this is very bad) - so, here we do not check are keys ok or not.
>          */
>         return true;
> }
> 
> Some quick prototyping by my colleague Dilip Kumar suggests that, in
> fact, there are cases where pushing down keys into heap_beginscan()
> could be significantly faster.  Some care is required here because any
> functions we execute as scan keys are run with the buffer locked, so
> we had better not run anything very complicated.  But doing this for
> simple things like integer equality operators seems like it could save
> quite a few buffer lock/unlock cycles and some other executor overhead
> as well.
> 
> Thoughts, ideas, suggestions, etc. very welcome.
>

Thanks,
--
NEC Business Creation Division / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>


Re: asynchronous and vectorized execution

From
Greg Stark
Date:
On 9 May 2016 8:34 pm, "David Rowley" <david.rowley@2ndquadrant.com> wrote:
>
> This project does appear to require that we bloat the code with 100's
> of vector versions of each function. I'm not quite sure if there's a
> better way to handle this. The problem is that the fmgr is pretty much
> a barrier to SIMD operations, and this was the only idea that I've had
> so far about breaking through that barrier. So further ideas here are
> very welcome.

Well yes and no. In practice I think you only need to worry about
vectorised versions of integer and possibly float. For other data types
there either aren't vectorised operators or there's little using them.

And I'll make a bold claim here that the only operators I think really
matter are =.

The reason is that using SIMD instructions is a minor win if you have any
further work to do per tuple. The only time it's a big win is if you're
eliminating entire tuples from consideration efficiently. = is going to
do that often; other btree operator classes might be somewhat useful, but
things like + really only would come up in odd examples.

But even that understates things. If you have column-oriented storage
then = becomes even more important, since every scan has a series of
implied equijoins to reconstruct the tuple. And the coup de grace is that
in column-oriented storage you try to store variable-length data as
integer indexes into a dictionary of common values, so *everything* is an
integer = operation.

How to do this without punching right through the executor as an
abstraction and still supporting extensible data types and operators was
puzzling me already. I do think it involves having these vector operators
in the catalogue and also some kind of compression mapping to integer
indexes. But I'm not sure that's all that would be needed.
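
For what it's worth, that last point is easy to see in miniature: once a
varlena column is dictionary-encoded, an equality scan is just this sort
of loop over integer codes, which compilers vectorise readily (a
standalone toy, nothing Postgres-specific):

#include <stddef.h>
#include <stdint.h>

/* Count matches of one dictionary code in a dictionary-encoded column. */
static size_t
count_equal(const int32_t *codes, size_t n, int32_t needle)
{
    size_t      i;
    size_t      hits = 0;

    for (i = 0; i < n; i++)
        hits += (codes[i] == needle);   /* branch-free, SIMD-friendly */

    return hits;
}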

Re: asynchronous and vectorized execution

From
David Rowley
Date:
On 10 May 2016 at 13:38, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:
> My concern about ExecProcNode is that it is constructed with a large
> switch ... case statement, which involves a lot of comparison operations
> at run time. If we replaced this switch ... case with a function
> pointer, it would probably improve performance, especially for OLAP
> workloads that process a large number of rows.

I imagined that any decent compiler would have built the code to use
jump tables for this. I have to say that I've never checked to make
sure though.


--
 David Rowley                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services



Re: asynchronous and vectorized execution

From
Kouhei Kaigai
Date:
> -----Original Message-----
> From: pgsql-hackers-owner@postgresql.org
> [mailto:pgsql-hackers-owner@postgresql.org] On Behalf Of David Rowley
> Sent: Tuesday, May 10, 2016 2:01 PM
> To: Kaigai Kouhei(海外 浩平)
> Cc: Robert Haas; pgsql-hackers@postgresql.org
> Subject: Re: [HACKERS] asynchronous and vectorized execution
> 
> On 10 May 2016 at 13:38, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:
> > My concern about ExecProcNode is that it is constructed with a large
> > switch ... case statement, which involves a lot of comparison operations
> > at run time. If we replaced this switch ... case with a function
> > pointer, it would probably improve performance, especially for OLAP
> > workloads that process a large number of rows.
> 
> I imagined that any decent compiler would have built the code to use
> jump tables for this. I have to say that I've never checked to make
> sure though.
>
Ah, indeed, you are right. Please forget the above part.

With GCC 4.8.5, the case labels between T_ResultState and T_LimitState
are handled using a jump table:

TupleTableSlot *
ExecProcNode(PlanState *node)
{
        :     <snip>     :
    switch (nodeTag(node))
  5ad361:   8b 03                   mov    (%rbx),%eax
  5ad363:   2d c9 00 00 00          sub    $0xc9,%eax
  5ad368:   83 f8 24                cmp    $0x24,%eax
  5ad36b:   0f 87 4f 02 00 00       ja     5ad5c0 <ExecProcNode+0x290>
  5ad371:   ff 24 c5 68 48 8b 00    jmpq   *0x8b4868(,%rax,8)
  5ad378:   0f 1f 84 00 00 00 00    nopl   0x0(%rax,%rax,1)
  5ad37f:   00
 

Thanks,
--
NEC Business Creation Division / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>

Re: asynchronous and vectorized execution

From
David Rowley
Date:
On 10 May 2016 at 16:34, Greg Stark <stark@mit.edu> wrote:
>
> On 9 May 2016 8:34 pm, "David Rowley" <david.rowley@2ndquadrant.com> wrote:
>>
>> This project does appear to require that we bloat the code with 100's
>> of vector versions of each function. I'm not quite sure if there's a
>> better way to handle this. The problem is that the fmgr is pretty much
>> a barrier to SIMD operations, and this was the only idea that I've had
>> so far about breaking through that barrier. So further ideas here are
>> very welcome.
>
> Well yes and no. In practice I think you only need to worry about vectorised
> versions of integer and possibly float. For other data types there either
> aren't vectorised operators or there's little using them.
>
> And I'll make a bold claim here that the only operators I think really
> matter are =
>
> The reason is that using SIMD instructions is a minor win if you have any
> further work to do per tuple. The only time it's a big win is if you're
> eliminating entire tuples from consideration efficiently. = is going to do
> that often, other btree operator classes might be somewhat useful, but
> things like + really only would come up in odd examples.
>
> But even that understates things. If you have column oriented storage then =
> becomes even more important since every scan has a series of implied
> equijoins to reconstruct the tuple. And the coup de grace is that in a
> column oriented storage you try to store variable length data as integer
> indexes into a dictionary of common values so *everything* is an integer =
> operation.
>
> How to do this without punching right through the executor as an abstraction
> and still supporting extensible data types and operators was puzzling me
> already. I do think it involves having these vector operators in the
> catalogue and also some kind of compression mapping to integer indexes. But
> I'm not sure that's all that would be needed.

Perhaps the first move to make on this front will be for aggregate
functions. Experimentation might make it quite simple to realise which
functions will bring enough benefit. I imagine that even Datums where
the type is not processor native might yield a small speedup, not from
SIMD, but just from fewer calls through fmgr. Perhaps we'll realise
that those are not worth the trouble; I've no idea at this stage.

--
 David Rowley                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services



Re: asynchronous and vectorized execution

From
Pavel Stehule
Date:


2016-05-10 8:05 GMT+02:00 David Rowley <david.rowley@2ndquadrant.com>:
> Perhaps the first move to make on this front will be for aggregate
> functions. Experimentation might make it quite simple to realise which
> functions will bring enough benefit. I imagine that even Datums where
> the type is not processor native might yield a small speedup, not from
> SIMD, but just from fewer calls through fmgr. Perhaps we'll realise
> that those are not worth the trouble; I've no idea at this stage.

It can be reduced to sum and count in a first iteration. On the other hand, a lot of OLAP reports are based on pretty complex expressions, and there compilation is probably the better way.

Regards

Pavel
 


Re: asynchronous and vectorized execution

From
konstantin knizhnik
Date:
Hi,

1. asynchronous execution,

It seems to me that asynchronous execution can be considered as an alternative to the multithreading model (in the case of PostgreSQL the role of threads is played by workers).
Asynchronous operations are used to get smaller overhead, but they have scalability problems (because in most implementations of cooperative multitasking there is just one processing thread, so it can not consume more than one core).

So I wonder whether asynchronous execution is trying to achieve the same goal as parallel query execution but using a slightly different mechanism.
You wrote:

> in the meantime, any worker that arrives at that scan node has no choice but to block.

What's wrong with the worker being blocked? You can just have more workers (more than CPU cores) to let others of them continue to do useful work.
But I agree that

> Whether or not this will be efficient is a research question



2. vectorized execution

Vector IO is very important for a columnar store. In the IMCS extension (in-memory columnar store), using vector operations allows speed to be increased 10-100 times depending on the size of the data set and the query. Obviously the best results are for grand aggregates.

But there is also research, for example:

http://www.vldb.org/pvldb/vol4/p539-neumann.pdf

showing that the same or an even better effect can be achieved by generating native code for the query execution plan (which is not so difficult now, thanks to LLVM).
It eliminates interpretation overhead and increases cache locality.
I got similar results in my own experiments with accelerating SparkSQL. Instead of native code generation I used conversion of query plans to C code and experimented with different data representations. A "horizontal model" with loading of columns on demand showed better performance than a columnar store.

As far as I know a native code generator is currently being developed for PostgreSQL by ISP RAN.
Sorry, slides in Russian:
https://pgconf.ru/media/2016/02/19/6%20Мельник%20Дмитрий%20Михайлович,%2005-02-2016.pdf

At this moment (February) they have implemented translation of only a few PostgreSQL operators used by ExecQual and do not support aggregates.
They get about a 2x speed increase on synthetic queries and a 25% increase on TPC-H Q1 (for Q1 the most critical part is generation of native code for aggregates, because ExecQual itself takes only 6% of the time for this query).
Actually these 25% for Q1 were achieved not by using dynamic code generation, but by switching from a PULL to a PUSH model in the executor.
It seems to be yet another interesting PostgreSQL executor transformation.
As far as I know, they are going to publish the results of their work as open source...




Re: asynchronous and vectorized execution

From
Kyotaro HORIGUCHI
Date:
Hello.

At Mon, 9 May 2016 13:33:55 -0400, Robert Haas <robertmhaas@gmail.com> wrote in
<CA+Tgmobx8su_bYtAa3DgrqB+R7xZG6kHRj0ccMUUshKAQVftww@mail.gmail.com>
> Hi,
> 
> I realize that we haven't gotten 9.6beta1 out the door yet, but I
> think we can't really wait much longer to start having at least some
> discussion of 9.7 topics, so I'm going to go ahead and put this one
> out there.  I believe there are other people thinking about these
> topics as well, including Andres Freund, Kyotaro Horiguchi, and
> probably some folks at 2ndQuadrant (but I don't know exactly who).  To
> make a long story short, I think there are several different areas
> where we should consider major upgrades to our executor.  It's too
> slow and it doesn't do everything we want it to do.  The main things
> on my mind are:

> 1. asynchronous execution, by which I mean the ability of a node to
> somehow say that it will generate a tuple eventually, but is not yet
> ready, so that the executor can go run some other part of the plan
> tree while it waits.  This case most obviously arises for foreign
> tables, where it makes little sense to block on I/O if some other part
> of the query tree could benefit from the CPU; consider SELECT * FROM
> lt WHERE qual UNION SELECT * FROM ft WHERE qual.

This is my main concern and what I wanted to solve.

> It is also a problem
> for parallel query: in a parallel sequential scan, the next worker can
> begin reading the next block even if the current block hasn't yet been
> received from the OS.  Whether or not this will be efficient is a
> research question, but it can be done.  However, imagine a parallel
> scan of a btree index: we don't know what page to scan next until we
> read the previous page and examine the next-pointer.  In the meantime,
> any worker that arrives at that scan node has no choice but to block.
> It would be better if the scan node could instead say "hey, thanks for
> coming but I'm really not ready to be on-CPU just at the moment" and
> potentially allow the worker to go work in some other part of the
> query tree.

Especially for foreign tables, there must be gaps between sending a
FETCH and getting the result. Visiting other tables is a very effective
way to fill those gaps. Using file descriptors greatly helps with this,
thanks to the new WaitEventSet API. The attached is a WIP PoC of that
(sorry for including some debug code and irrelevant code). It differs a
bit from the 0002 patch in the Exec* APIs, but it works, if only for
postgres_fdw and Append. It embeds the waiting code into ExecAppend,
but that is easily replaceable with the framework in Robert's 0003
patch.

Apart from the core part, for postgres_fdw, several scans may reside
together on one connection. These scans share the same FD, but there is
no way to identify for which scan node the FD was signalled. To handle
that situation, we might need a 'seemed to be ready but really not'
route.
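
For reference, the waiting side with WaitEventSet is roughly this shape
(only a sketch: nasync, the asyncstate array, its conn field, and
ExecAsyncNotify() are stand-ins of mine, not necessarily what the
attached PoC does):

WaitEventSet *set;
WaitEvent     occurred;
int           i;

set = CreateWaitEventSet(CurrentMemoryContext, nasync);

/* register the libpq socket of every child that said "not ready yet" */
for (i = 0; i < nasync; i++)
    AddWaitEventToSet(set, WL_SOCKET_READABLE,
                      PQsocket(asyncstate[i]->conn),
                      NULL, asyncstate[i]);

/* sleep until at least one FETCH result starts to arrive */
while (WaitEventSetWait(set, -1L, &occurred, 1) > 0)
{
    /* let the signalled node consume input; it may still not be ready */
    if (ExecAsyncNotify((PlanState *) occurred.user_data))
        break;
}

FreeWaitEventSet(set);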

> For that worker to actually find useful work to do
> elsewhere, we'll probably need it to be the case either that the table
> is partitioned or the original query will need to involve UNION ALL,
> but those are not silly cases to worry about, particularly if we get
> native partitioning in 9.7.

One annoyance of this method is having one FD with latch-like data
draining. Since we should provide FDs for such nodes, Gather may need
another data-passing channel on the FDs.

And I want to realize early execution of async nodes. This might
require that all types of node return 'not ready' on the first call,
even if they are async-capable.

> 2. vectorized execution, by which I mean the ability of a node to
> return tuples in batches rather than one by one.  Andres has opined
> more than once that repeated trips through ExecProcNode defeat the
> ability of the CPU to do branch prediction correctly, slowing the
> whole system down, and that they also result in poor CPU cache
> behavior, since we jump all over the place executing a little bit of
> code from each node before moving onto the next rather than running
> one bit of code first, and then another later.  I think that's
> probably right.   For example, consider a 5-table join where all of
> the joins are implemented as hash tables.  If this query plan is going
> to be run to completion, it would make much more sense to fetch, say,
> 100 tuples from the driving scan and then probe for all of those in
> the first hash table, and then probe for all of those in the second
> hash table, and so on.  What we do instead is fetch one tuple and
> probe for it in all 5 hash tables, and then repeat.  If one of those
> hash tables would fit in the CPU cache but all five together will not,
> that seems likely to be a lot worse.   But even just ignoring the CPU
> cache aspect of it for a minute, suppose you want to write a loop to
> perform a hash join.  The inner loop fetches the next tuple from the
> probe table and does a hash lookup.  Right now, fetching the next
> tuple from the probe table means calling a function which in turn
> calls another function which probably calls another function which
> probably calls another function and now about 4 layers down we
> actually get the next tuple.  If the scan returned a batch of tuples
> to the hash join, fetching the next tuple from the batch would
> probably be 0 or 1 function calls rather than ... more.  Admittedly,
> you've got to consider the cost of marshaling the batches but I'm
> optimistic that there are cycles to be squeezed out here.  We might
> also want to consider storing batches of tuples in a column-optimized
> rather than row-optimized format so that iterating through one or two
> attributes across every tuple in the batch touches the minimal number
> of cache lines.
> 
> Obviously, both of these are big projects that could touch a large
> amount of executor code, and there may be other ideas, in addition to
> these, which some of you may be thinking about that could also touch a
> large amount of executor code.  It would be nice to agree on a way
> forward that minimizes code churn and maximizes everyone's attempt to
> contribute without conflicting with each other.  Also, it seems
> desirable to enable, as far as possible, incremental development - in
> particular, it seems to me that it would be good to pick a design that
> doesn't require massive changes to every node all at once.  A single
> patch that adds some capability to every node in the executor in one
> fell swoop is going to be too large to review effectively.
> 
> My proposal for how to do this is to make ExecProcNode function as a
> backward-compatibility wrapper.  For asynchronous execution, a node
> might return a not-ready-yet indication, but if that node is called
> via ExecProcNode, it means the caller isn't prepared to receive such
> an indication, so ExecProcNode will just wait for the node to become
> ready and then return the tuple.  Similarly, for vectorized execution,
> a node might return a bunch of tuples all at once.  ExecProcNode will
> extract the first one and return it to the caller, and subsequent
> calls to ExecProcNode will iterate through the rest of the batch, only
> calling the underlying node-specific function when the batch is
> exhausted.  In this way, code that doesn't know about the new stuff
> can continue to work pretty much as it does today.  Also, and I think
> this is important, nodes don't need the permission of their parent
> node to use these new capabilities.  They can use them whenever they
> wish, without worrying about whether the upper node is prepared to
> deal with it.  If not, ExecProcNode will paper over the problem.  This
> seems to me to be a good way to keep the code simple.

Agreed on returning a not-ready state and wrapping nodes to emulate
the old-style API, but I suppose the Exec* functions could keep
returning a tuple as they do currently.
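
To restate that wrapper idea in code form, I imagine something roughly
like the following (just a sketch; ExecDispatchNode,
ExecExtractNextTuple and WaitForNodeResult are made-up names):

TupleTableSlot *
ExecProcNode(PlanState *node)
{
    for (;;)
    {
        /* Run the node-specific ExecBlah(), which fills node->result. */
        ExecDispatchNode(node);

        if (node->result_ready)
        {
            /* Old-style callers get tuples one at a time, even from a batch. */
            return ExecExtractNextTuple(node);
        }

        /* The node is waiting on an FD or latch; block here on its behalf. */
        WaitForNodeResult(node);
    }
}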

> For asynchronous execution, I have gone so far as to mock up a bit of
> what this might look like.  This shouldn't be taken very seriously at
> this point, but I'm attaching a few very-much-WIP patches to show the
> direction of my line of thinking.  Basically, I propose to have
> ExecBlah (that is, ExecBitmapHeapScan, ExecAppend, etc.) return tuples
> by putting them into a new PlanState member called "result", which is
> just a Node * so that we can support multiple types of results,
> instead of returning them.  There is also a result_ready boolean, so
> that a node can return without setting this Boolean to engage
> asynchronous behavior.  This triggers an "event loop", which
> repeatedly waits for FDs chosen by waiting nodes to become readable
> and/or writeable and then gives the node a chance to react.
> Eventually, the waiting node will stop waiting and have a result
> ready, at which point the event loop will give the parent of that node
> a chance to run.  If that node consequently becomes ready, then its
> parent gets a chance to run.  Eventually (we hope), the node for which
> we're waiting becomes ready, and we can then read a result tuple.

I had thought along almost the same lines, though only for the Append
node.

> With some more work, this seems like it can handle the FDW case, but I
> haven't worked out how to make it handle the related parallel query
> case.  What we want there is to wait not for the readiness of an FD
> but rather for some other process involved in the parallel query to
> reach a point where it can welcome assistance executing that node.  I
> don't know exactly what the signaling for that should look like yet -
> maybe setting the process latch or something.

Agreed as described above.

> By the way, one smaller executor project that I think we should also
> look at has to do with this comment in nodeSeqScan.c:
> 
> static bool
> SeqRecheck(SeqScanState *node, TupleTableSlot *slot)
> {
>         /*
>          * Note that unlike IndexScan, SeqScan never use keys in heap_beginscan
>          * (and this is very bad) - so, here we do not check are keys ok or not.
>          */
>         return true;
> }
> 
> Some quick prototyping by my colleague Dilip Kumar suggests that, in
> fact, there are cases where pushing down keys into heap_beginscan()
> could be significantly faster.  Some care is required here because any
> functions we execute as scan keys are run with the buffer locked, so
> we had better not run anything very complicated.  But doing this for
> simple things like integer equality operators seems like it could save
> quite a few buffer lock/unlock cycles and some other executor overhead
> as well.

The cost of pushing down keys into seqscans seems calculable with a
fairly small amount of computation, so I think it is promising.
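
For what it's worth, a minimal sketch of pushing a simple integer
equality key into heap_beginscan() might look like this (the relation,
snapshot, column number and constant are only placeholders):

ScanKeyData key;
HeapScanDesc scan;

/* column 1 of the scanned relation, compared for int4 equality with 42 */
ScanKeyInit(&key,
            1,
            BTEqualStrategyNumber,
            F_INT4EQ,
            Int32GetDatum(42));

/* heap_beginscan() then filters tuples while the buffer lock is held */
scan = heap_beginscan(rel, estate->es_snapshot, 1, &key);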

> Thoughts, ideas, suggestions, etc. very welcome.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 2f49268..49e334f 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -120,6 +120,14 @@ enum FdwDirectModifyPrivateIndex    FdwDirectModifyPrivateSetProcessed};
+typedef enum PgFdwFetchState
+{
+    PGFDWFETCH_IDLE,
+    PGFDWFETCH_WAITING,
+    PGFDWFETCH_READY,
+    PGFDWFETCH_EOF
+} PgFdwFetchState;
+/* * Execution state of a foreign scan using postgres_fdw. */
@@ -151,6 +159,8 @@ typedef struct PgFdwScanState    /* batch-level state, for optimizing rewinds and avoiding useless
fetch*/    int            fetch_ct_2;        /* Min(# of fetches done, 2) */    bool        eof_reached;    /* true if
lastfetch reached EOF */
 
+    bool        is_async;
+    PgFdwFetchState fetch_status;    /* working memory contexts */    MemoryContext batch_cxt;    /* context holding
currentbatch of tuples */
 
@@ -1248,7 +1258,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)     */    fsstate = (PgFdwScanState
*)palloc0(sizeof(PgFdwScanState));    node->fdw_state = (void *) fsstate;
 
-
+    fsstate->is_async = ((eflags & EXEC_FLAG_ASYNC) != 0);    /*     * Obtain the foreign server where to connect and
usermapping to use for     * connection. For base relations we obtain this information from
 
@@ -1287,6 +1297,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)     */    fsstate->conn =
GetConnection(user,false);
 
+    /* Set a waiting fd to allow asynchronous waiting in upper node */
+    node->ss.ps.fd = PQsocket(fsstate->conn);
+    /* Assign a unique ID for my cursor */    fsstate->cursor_number = GetCursorNumber(fsstate->conn);
fsstate->cursor_exists= false;
 
@@ -1359,12 +1372,22 @@ postgresIterateForeignScan(ForeignScanState *node)     */    if (fsstate->next_tuple >=
fsstate->num_tuples)   {
 
-        /* No point in another fetch if we already detected EOF, though. */
-        if (!fsstate->eof_reached)
-            fetch_more_data(node);
-        /* If we didn't get any tuples, must be end of data. */
-        if (fsstate->next_tuple >= fsstate->num_tuples)
+        fetch_more_data(node);
+        if (fsstate->fetch_status == PGFDWFETCH_WAITING)
+        {
+            /*
+             * fetch_more_data just sent the asynchronous query for next
+             * output, so ask the caller to visit the next table.
+             */
+            node->ss.ps.exec_status = EXEC_NOT_READY;
+            return ExecClearTuple(slot);
+        }
+        else if (fsstate->fetch_status == PGFDWFETCH_EOF)
+        {
+            /* fetch_more_data give no more tuples */
+            node->ss.ps.exec_status = EXEC_EOT;            return ExecClearTuple(slot);
+        }    }    /*
@@ -2872,7 +2895,9 @@ fetch_more_data(ForeignScanState *node)    PgFdwScanState *fsstate = (PgFdwScanState *)
node->fdw_state;   PGresult   *volatile res = NULL;    MemoryContext oldcontext;
 
-
+    PGconn       *conn = fsstate->conn;
+    char        sql[64];
+        /*     * We'll store the tuples in the batch_cxt.  First, flush the previous     * batch.
@@ -2881,18 +2906,51 @@ fetch_more_data(ForeignScanState *node)    MemoryContextReset(fsstate->batch_cxt);
oldcontext= MemoryContextSwitchTo(fsstate->batch_cxt);
 
+
+    snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+             fsstate->fetch_size, fsstate->cursor_number);
+    if (fsstate->fetch_status != PGFDWFETCH_WAITING)
+    {
+        /*
+         * If we reached the final tuple in previous call, no more tuple will
+         * be fetched this time.
+         */
+        if (fsstate->eof_reached)
+        {
+            fsstate->fetch_status = PGFDWFETCH_EOF;
+            return;
+        }
+
+        if (!PQsendQuery(conn, sql))
+            pgfdw_report_error(ERROR, NULL, conn, false, sql);
+        fsstate->fetch_status = PGFDWFETCH_WAITING;
+
+        /*
+         * When currently on a connection running asynchronous fetching, we
+         * return immediately here.
+         */
+        if (fsstate->is_async)
+            return;
+    }
+    else
+    {
+        Assert(fsstate->is_async);
+        if (!PQconsumeInput(conn))
+            pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+                
+        if (PQisBusy(conn))
+            return;
+    }
+    /* PGresult must be released before leaving this function. */    PG_TRY();    {
-        PGconn       *conn = fsstate->conn;
-        char        sql[64];
-        int            numrows;        int            i;
+        int            numrows;
-        snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
-                 fsstate->fetch_size, fsstate->cursor_number);
+        res = pgfdw_get_result(conn, sql);
+        fsstate->fetch_status = PGFDWFETCH_READY;
-        res = pgfdw_exec_query(conn, sql);        /* On error, report the original query, not the FETCH. */        if
(PQresultStatus(res)!= PGRES_TUPLES_OK)            pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
 
@@ -2923,6 +2981,10 @@ fetch_more_data(ForeignScanState *node)        /* Must be EOF if we didn't get as many tuples as
weasked for. */        fsstate->eof_reached = (numrows < fsstate->fetch_size);
 
+        /* But don't return EOF if any tuple available */
+        if (numrows == 0)
+            fsstate->fetch_status = PGFDWFETCH_EOF;
+        PQclear(res);        res = NULL;    }
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index ac02304..f76fc94 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1553,6 +1553,8 @@ ExecutePlan(EState *estate,    if (use_parallel_mode)        EnterParallelMode();
+    ExecStartNode(planstate);
+    /*     * Loop until we've processed the proper number of tuples from the plan.     */
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 554244f..590b28e 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -383,6 +383,8 @@ ExecProcNode(PlanState *node)    if (node->instrument)        InstrStartNode(node->instrument);
+    node->exec_status = EXEC_READY;
+    switch (nodeTag(node))    {            /*
@@ -540,6 +542,10 @@ ExecProcNode(PlanState *node)    if (node->instrument)        InstrStopNode(node->instrument,
TupIsNull(result)? 0.0 : 1.0);
 
+    if (TupIsNull(result) &&
+        node->exec_status == EXEC_READY)
+        node->exec_status = EXEC_EOT;
+    return result;}
@@ -786,6 +792,30 @@ ExecEndNode(PlanState *node)}/*
+ * ExecStartNode - execute registered early-startup callbacks
+ */
+bool
+ExecStartNode(PlanState *node)
+{
+    if (node == NULL)
+        return false;
+
+    switch (nodeTag(node))
+    {
+    case T_GatherState:
+        return ExecStartGather((GatherState *)node);
+        break;
+    case T_SeqScanState:
+        return ExecStartSeqScan((SeqScanState *)node);
+        break;
+    default:
+        break;    
+    }
+
+    return planstate_tree_walker(node, ExecStartNode, NULL);
+}
+
+/* * ExecShutdownNode * * Give execution nodes a chance to stop asynchronous resource consumption
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 0c1e4a3..95130b0 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -2344,6 +2344,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)    aggstate = makeNode(AggState);
aggstate->ss.ps.plan= (Plan *) node;    aggstate->ss.ps.state = estate;
 
+    aggstate->ss.ps.fd = -1;    aggstate->aggs = NIL;    aggstate->numaggs = 0;
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index a26bd63..004c621 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -121,9 +121,11 @@ ExecInitAppend(Append *node, EState *estate, int eflags){    AppendState *appendstate =
makeNode(AppendState);   PlanState **appendplanstates;
 
+    AppendAsyncState *asyncstates;    int            nplans;    int            i;    ListCell   *lc;
+    bool        has_async_child = false;    /* check for unsupported flags */    Assert(!(eflags & EXEC_FLAG_MARK));
@@ -134,14 +136,22 @@ ExecInitAppend(Append *node, EState *estate, int eflags)    nplans =
list_length(node->appendplans);   appendplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
 
+    asyncstates =
+        (AppendAsyncState *) palloc0(nplans * sizeof(AppendAsyncState));
+    for (i = 0 ; i < nplans ; i++)
+        asyncstates[i] = ASYNCCHILD_READY;    /*     * create new AppendState for our append node     */
appendstate->ps.plan= (Plan *) node;    appendstate->ps.state = estate; 
+    appendstate->ps.fd = -1;    appendstate->appendplans = appendplanstates;
+    appendstate->async_state = asyncstates;    appendstate->as_nplans = nplans;
+    appendstate->evset = CreateWaitEventSet(CurrentMemoryContext,
+                                            list_length(node->appendplans));    /*     * Miscellaneous initialization
@@ -165,9 +175,28 @@ ExecInitAppend(Append *node, EState *estate, int eflags)    {        Plan       *initNode = (Plan
*)lfirst(lc);
 
+        /* always request async execution for children */
+        eflags |= EXEC_FLAG_ASYNC;        appendplanstates[i] = ExecInitNode(initNode, estate, eflags);
+
+        /*
+         * A child that can scan asynchronously sets a file descriptor for
+         * polling on them during initialization.
+         */
+        if (appendplanstates[i]->fd >= 0)
+        {
+            AddWaitEventToSet(appendstate->evset, WL_SOCKET_READABLE,
+                              appendplanstates[i]->fd, NULL,
+                              (void *)i);
+            has_async_child = true;
+        }        i++;    }
+    if (!has_async_child)
+    {
+        FreeWaitEventSet(appendstate->evset);
+        appendstate->evset = NULL;
+    }    /*     * initialize output tuple type
@@ -193,45 +222,86 @@ ExecInitAppend(Append *node, EState *estate, int eflags)TupleTableSlot *ExecAppend(AppendState
*node){
-    for (;;)
+    int n_notready = 1;
+
+    while (n_notready > 0)    {
-        PlanState  *subnode;        TupleTableSlot *result;
+        PlanState  *subnode;
+        int i, n;
-        /*
-         * figure out which subplan we are currently processing
-         */
-        subnode = node->appendplans[node->as_whichplan];
+        /* Scan the children in a round-robin policy. */
+        n_notready = 0;
+        n = node->as_whichplan;
+        for (i = 0 ; i < node->as_nplans ; i++, n++)
+        {
+            if (n >= node->as_nplans) n = 0;
-        /*
-         * get a tuple from the subplan
-         */
-        result = ExecProcNode(subnode);
+            if (node->async_state[n] != ASYNCCHILD_READY)
+            {
+                if (node->async_state[n] == ASYNCCHILD_NOT_READY)
+                    n_notready++;
+                continue;
+            }
+
+            subnode = node->appendplans[n];
+
+            result = ExecProcNode(subnode);
-        if (!TupIsNull(result))
-        {            /*             * If the subplan gave us something then return it as-is. We do             * NOT
makeuse of the result slot that was set up in             * ExecInitAppend; there's no need for it.             */
 
-            return result;
+            switch (subnode->exec_status)
+            {
+            case  EXEC_READY:
+                node->as_whichplan = n;
+                return result;
+
+            case  EXEC_NOT_READY:
+                node->async_state[n] = ASYNCCHILD_NOT_READY;
+                n_notready++;
+                break;
+
+            case EXEC_EOT:
+                node->async_state[n] = ASYNCCHILD_DONE;
+                break;
+
+            default:
+                elog(ERROR, "Unkown node status: %d", subnode->exec_status);
+            }                        }        /*
-         * Go on to the "next" subplan in the appropriate direction. If no
-         * more subplans, return the empty slot set up for us by
-         * ExecInitAppend.
+         * If we have any "not ready "children after no children can return a
+         * tuple, wait any of them to be ready.         */
-        if (ScanDirectionIsForward(node->ps.state->es_direction))
-            node->as_whichplan++;
-        else
-            node->as_whichplan--;
-        if (!exec_append_initialize_next(node))
-            return ExecClearTuple(node->ps.ps_ResultTupleSlot);
-
-        /* Else loop back and try to get a tuple from the new subplan */
+        if (n_notready > 0)
+        {
+            WaitEvent occurred_events[5];
+            int nevents;
+            int i;
+
+            nevents = WaitEventSetWait(node->evset, -1, occurred_events, 5);
+            Assert(nevents > 0);
+            for (i = 0 ; i < nevents ; i++)
+            {
+                int plannum = (int)occurred_events[i].user_data;
+                node->async_state[plannum] = ASYNCCHILD_READY;
+            }
+            node->as_whichplan = (int)occurred_events[0].user_data;
+            continue;
+        }
+    }
+
+    /* All children exhausted. Free the wait event set if exists */
+    if (node->evset)
+    {
+        FreeWaitEventSet(node->evset);
+        node->evset = NULL;    }
+    return NULL;}/* ----------------------------------------------------------------
@@ -271,6 +341,7 @@ ExecReScanAppend(AppendState *node)    {        PlanState  *subnode = node->appendplans[i];
+        node->async_state[i] = ASYNCCHILD_READY;        /*         * ExecReScan doesn't know about my subplans, so I
haveto do         * changed-parameter signaling myself.
 
diff --git a/src/backend/executor/nodeBitmapAnd.c b/src/backend/executor/nodeBitmapAnd.c
index c39d790..3942285 100644
--- a/src/backend/executor/nodeBitmapAnd.c
+++ b/src/backend/executor/nodeBitmapAnd.c
@@ -63,6 +63,7 @@ ExecInitBitmapAnd(BitmapAnd *node, EState *estate, int eflags)     */    bitmapandstate->ps.plan =
(Plan*) node;    bitmapandstate->ps.state = estate;
 
+    bitmapandstate->ps.fd = -1;    bitmapandstate->bitmapplans = bitmapplanstates;    bitmapandstate->nplans =
nplans;
diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c
index 449aacb..cc89d56 100644
--- a/src/backend/executor/nodeBitmapHeapscan.c
+++ b/src/backend/executor/nodeBitmapHeapscan.c
@@ -556,6 +556,7 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags)    scanstate =
makeNode(BitmapHeapScanState);   scanstate->ss.ps.plan = (Plan *) node;    scanstate->ss.ps.state = estate;
 
+    scanstate->ss.ps.fd = -1;    scanstate->tbm = NULL;    scanstate->tbmiterator = NULL;
diff --git a/src/backend/executor/nodeBitmapIndexscan.c b/src/backend/executor/nodeBitmapIndexscan.c
index a364098..d799292 100644
--- a/src/backend/executor/nodeBitmapIndexscan.c
+++ b/src/backend/executor/nodeBitmapIndexscan.c
@@ -206,6 +206,7 @@ ExecInitBitmapIndexScan(BitmapIndexScan *node, EState *estate, int eflags)    indexstate =
makeNode(BitmapIndexScanState);   indexstate->ss.ps.plan = (Plan *) node;    indexstate->ss.ps.state = estate;
 
+    indexstate->ss.ps.fd = -1;    /* normally we don't make the result bitmap till runtime */
indexstate->biss_result= NULL;
 
diff --git a/src/backend/executor/nodeBitmapOr.c b/src/backend/executor/nodeBitmapOr.c
index 7e928eb..5f06ce9 100644
--- a/src/backend/executor/nodeBitmapOr.c
+++ b/src/backend/executor/nodeBitmapOr.c
@@ -64,6 +64,7 @@ ExecInitBitmapOr(BitmapOr *node, EState *estate, int eflags)     */    bitmaporstate->ps.plan = (Plan
*)node;    bitmaporstate->ps.state = estate;
 
+    bitmaporstate->ps.fd = -1;    bitmaporstate->bitmapplans = bitmapplanstates;    bitmaporstate->nplans = nplans;
diff --git a/src/backend/executor/nodeCtescan.c b/src/backend/executor/nodeCtescan.c
index 3c2f684..6f09853 100644
--- a/src/backend/executor/nodeCtescan.c
+++ b/src/backend/executor/nodeCtescan.c
@@ -191,6 +191,7 @@ ExecInitCteScan(CteScan *node, EState *estate, int eflags)    scanstate = makeNode(CteScanState);
scanstate->ss.ps.plan = (Plan *) node;    scanstate->ss.ps.state = estate;
 
+    scanstate->ss.ps.fd = -1;    scanstate->eflags = eflags;    scanstate->cte_table = NULL;    scanstate->eof_cte =
false;
diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c
index 322abca..e825001 100644
--- a/src/backend/executor/nodeCustom.c
+++ b/src/backend/executor/nodeCustom.c
@@ -44,6 +44,7 @@ ExecInitCustomScan(CustomScan *cscan, EState *estate, int eflags)    /* fill up fields of ScanState
*/   css->ss.ps.plan = &cscan->scan.plan;    css->ss.ps.state = estate;
 
+    css->ss.ps.fd = -1;    /* create expression context for node */    ExecAssignExprContext(estate, &css->ss.ps);
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 300f947..4079529 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -144,6 +144,7 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)    scanstate =
makeNode(ForeignScanState);   scanstate->ss.ps.plan = (Plan *) node;    scanstate->ss.ps.state = estate;
 
+    scanstate->ss.ps.fd = -1;    /*     * Miscellaneous initialization
diff --git a/src/backend/executor/nodeFunctionscan.c b/src/backend/executor/nodeFunctionscan.c
index a03f6e7..7d508da 100644
--- a/src/backend/executor/nodeFunctionscan.c
+++ b/src/backend/executor/nodeFunctionscan.c
@@ -299,6 +299,7 @@ ExecInitFunctionScan(FunctionScan *node, EState *estate, int eflags)    scanstate =
makeNode(FunctionScanState);   scanstate->ss.ps.plan = (Plan *) node;    scanstate->ss.ps.state = estate;
 
+    scanstate->ss.ps.fd = -1;    scanstate->eflags = eflags;    /*
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 3834ed6..60a1598 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -46,6 +46,88 @@ static TupleTableSlot *gather_getnext(GatherState *gatherstate);static HeapTuple
gather_readnext(GatherState*gatherstate);static void ExecShutdownGatherWorkers(GatherState *node);
 
+/* ----------------------------------------------------------------
+ *        StartGather
+ *
+ *        Gather node can have an advantage from asynchronous execution in most
+ *        cases because of its startup cost.
+ *        ----------------------------------------------------------------
+ */
+bool
+ExecStartGather(GatherState *node)
+{
+    EState       *estate = node->ps.state;
+    Gather       *gather = (Gather *) node->ps.plan;
+    TupleTableSlot *fslot = node->funnel_slot;
+    int i;
+
+    /* Don't start if already started or explicitly inhibited by the upper */
+    if (node->initialized || !node->early_start)
+        return false;
+
+    /*
+     * Initialize the parallel context and workers on first execution. We do
+     * this on first execution rather than during node initialization, as it
+     * needs to allocate large dynamic segment, so it is better to do if it
+     * is really needed.
+     */
+
+    /*
+     * Sometimes we might have to run without parallelism; but if
+     * parallel mode is active then we can try to fire up some workers.
+     */
+    if (gather->num_workers > 0 && IsInParallelMode())
+    {
+        ParallelContext *pcxt;
+        bool    got_any_worker = false;
+
+        /* Initialize the workers required to execute Gather node. */
+        if (!node->pei)
+            node->pei = ExecInitParallelPlan(node->ps.lefttree,
+                                             estate,
+                                             gather->num_workers);
+
+        /*
+         * Register backend workers. We might not get as many as we
+         * requested, or indeed any at all.
+         */
+        pcxt = node->pei->pcxt;
+        LaunchParallelWorkers(pcxt);
+
+        /* Set up tuple queue readers to read the results. */
+        if (pcxt->nworkers > 0)
+        {
+            node->nreaders = 0;
+            node->reader =
+                palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
+
+            for (i = 0; i < pcxt->nworkers; ++i)
+            {
+                if (pcxt->worker[i].bgwhandle == NULL)
+                    continue;
+
+                shm_mq_set_handle(node->pei->tqueue[i],
+                                  pcxt->worker[i].bgwhandle);
+                node->reader[node->nreaders++] =
+                    CreateTupleQueueReader(node->pei->tqueue[i],
+                                           fslot->tts_tupleDescriptor);
+                got_any_worker = true;
+            }
+        }
+
+        /* No workers?  Then never mind. */
+        if (!got_any_worker)
+            ExecShutdownGatherWorkers(node);
+    }
+
+    /* Run plan locally if no workers or not single-copy. */
+    node->need_to_scan_locally = (node->reader == NULL)
+        || !gather->single_copy;
+
+    node->early_start = false;
+    node->initialized = true;
+    return false;
+}/* ---------------------------------------------------------------- *        ExecInitGather
@@ -58,6 +140,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)    Plan       *outerNode;    bool
hasoid;   TupleDesc    tupDesc;
 
+    int            child_eflags;    /* Gather node doesn't have innerPlan node. */    Assert(innerPlan(node) ==
NULL);
@@ -68,6 +151,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)    gatherstate = makeNode(GatherState);
gatherstate->ps.plan= (Plan *) node;    gatherstate->ps.state = estate;
 
+    gatherstate->ps.fd = -1;    gatherstate->need_to_scan_locally = !node->single_copy;    /*
@@ -97,7 +181,12 @@ ExecInitGather(Gather *node, EState *estate, int eflags)     * now initialize outer plan     */
outerNode= outerPlan(node);
 
-    outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
+    /*
+     * This outer plan is executed in another process so don't start
+     * asynchronously in this process
+     */
+    child_eflags = eflags & ~EXEC_FLAG_ASYNC;
+    outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, child_eflags);    gatherstate->ps.ps_TupFromTlist =
false;
@@ -115,6 +204,16 @@ ExecInitGather(Gather *node, EState *estate, int eflags)    tupDesc =
ExecTypeFromTL(outerNode->targetlist,hasoid);    ExecSetSlotDescriptor(gatherstate->funnel_slot, tupDesc);
 
+    /*
+     * Register asynchronous execution callback for this node. Backend workers
+     * needs to allocate large dynamic segment, and it is better to execute
+     * them at the time of first execution from this aspect. So asynchronous
+     * execution should be decided considering that but we omit the aspect for
+     * now.
+     */
+    if (eflags & EXEC_FLAG_ASYNC)
+        gatherstate->early_start = true;
+    return gatherstate;}
@@ -128,74 +227,14 @@ ExecInitGather(Gather *node, EState *estate, int eflags)TupleTableSlot *ExecGather(GatherState
*node){
-    TupleTableSlot *fslot = node->funnel_slot;
-    int            i;    TupleTableSlot *slot;    TupleTableSlot *resultSlot;    ExprDoneCond isDone;    ExprContext
*econtext;
-    /*
-     * Initialize the parallel context and workers on first execution. We do
-     * this on first execution rather than during node initialization, as it
-     * needs to allocate large dynamic segment, so it is better to do if it
-     * is really needed.
-     */
+    /* Initialize workers if not yet. */    if (!node->initialized)
-    {
-        EState       *estate = node->ps.state;
-        Gather       *gather = (Gather *) node->ps.plan;
-
-        /*
-         * Sometimes we might have to run without parallelism; but if
-         * parallel mode is active then we can try to fire up some workers.
-         */
-        if (gather->num_workers > 0 && IsInParallelMode())
-        {
-            ParallelContext *pcxt;
-
-            /* Initialize the workers required to execute Gather node. */
-            if (!node->pei)
-                node->pei = ExecInitParallelPlan(node->ps.lefttree,
-                                                 estate,
-                                                 gather->num_workers);
-
-            /*
-             * Register backend workers. We might not get as many as we
-             * requested, or indeed any at all.
-             */
-            pcxt = node->pei->pcxt;
-            LaunchParallelWorkers(pcxt);
-            node->nworkers_launched = pcxt->nworkers_launched;
-
-            /* Set up tuple queue readers to read the results. */
-            if (pcxt->nworkers_launched > 0)
-            {
-                node->nreaders = 0;
-                node->reader =
-                    palloc(pcxt->nworkers_launched * sizeof(TupleQueueReader *));
-
-                for (i = 0; i < pcxt->nworkers_launched; ++i)
-                {
-                    shm_mq_set_handle(node->pei->tqueue[i],
-                                      pcxt->worker[i].bgwhandle);
-                    node->reader[node->nreaders++] =
-                        CreateTupleQueueReader(node->pei->tqueue[i],
-                                               fslot->tts_tupleDescriptor);
-                }
-            }
-            else
-            {
-                /* No workers?  Then never mind. */
-                ExecShutdownGatherWorkers(node);
-            }
-        }
-
-        /* Run plan locally if no workers or not single-copy. */
-        node->need_to_scan_locally = (node->reader == NULL)
-            || !gather->single_copy;
-        node->initialized = true;
-    }
+        ExecStartGather(node);    /*     * Check to see if we're still projecting out tuples from a previous scan
diff --git a/src/backend/executor/nodeGroup.c b/src/backend/executor/nodeGroup.c
index dcf5175..33093e7 100644
--- a/src/backend/executor/nodeGroup.c
+++ b/src/backend/executor/nodeGroup.c
@@ -207,6 +207,7 @@ ExecInitGroup(Group *node, EState *estate, int eflags)    grpstate = makeNode(GroupState);
grpstate->ss.ps.plan= (Plan *) node;    grpstate->ss.ps.state = estate;
 
+    grpstate->ss.ps.fd = -1;    grpstate->grp_done = FALSE;    /*
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 9ed09a7..f62b556 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -172,6 +172,7 @@ ExecInitHash(Hash *node, EState *estate, int eflags)    hashstate = makeNode(HashState);
hashstate->ps.plan= (Plan *) node;    hashstate->ps.state = estate;
 
+    hashstate->ps.fd = -1;    hashstate->hashtable = NULL;    hashstate->hashkeys = NIL;    /* will be set by parent
HashJoin*/
 
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 369e666..ec54570 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -451,6 +451,7 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)    hjstate = makeNode(HashJoinState);
 hjstate->js.ps.plan = (Plan *) node;    hjstate->js.ps.state = estate;
 
+    hjstate->js.ps.fd = -1;    /*     * Miscellaneous initialization
diff --git a/src/backend/executor/nodeIndexonlyscan.c b/src/backend/executor/nodeIndexonlyscan.c
index 4f6f91c..94b0193 100644
--- a/src/backend/executor/nodeIndexonlyscan.c
+++ b/src/backend/executor/nodeIndexonlyscan.c
@@ -403,6 +403,7 @@ ExecInitIndexOnlyScan(IndexOnlyScan *node, EState *estate, int eflags)    indexstate =
makeNode(IndexOnlyScanState);   indexstate->ss.ps.plan = (Plan *) node;    indexstate->ss.ps.state = estate;
 
+    indexstate->ss.ps.fd = -1;    indexstate->ioss_HeapFetches = 0;    /*
diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c
index bf16cb1..1beee6f 100644
--- a/src/backend/executor/nodeIndexscan.c
+++ b/src/backend/executor/nodeIndexscan.c
@@ -829,6 +829,7 @@ ExecInitIndexScan(IndexScan *node, EState *estate, int eflags)    indexstate =
makeNode(IndexScanState);   indexstate->ss.ps.plan = (Plan *) node;    indexstate->ss.ps.state = estate;
 
+    indexstate->ss.ps.fd = -1;    /*     * Miscellaneous initialization
diff --git a/src/backend/executor/nodeLimit.c b/src/backend/executor/nodeLimit.c
index faf32e1..6baf1c0 100644
--- a/src/backend/executor/nodeLimit.c
+++ b/src/backend/executor/nodeLimit.c
@@ -384,6 +384,7 @@ ExecInitLimit(Limit *node, EState *estate, int eflags)    limitstate = makeNode(LimitState);
limitstate->ps.plan= (Plan *) node;    limitstate->ps.state = estate;
 
+    limitstate->ps.fd = -1;    limitstate->lstate = LIMIT_INITIAL;
diff --git a/src/backend/executor/nodeLockRows.c b/src/backend/executor/nodeLockRows.c
index 4ebcaff..42b2ff5 100644
--- a/src/backend/executor/nodeLockRows.c
+++ b/src/backend/executor/nodeLockRows.c
@@ -361,6 +361,7 @@ ExecInitLockRows(LockRows *node, EState *estate, int eflags)    lrstate = makeNode(LockRowsState);
 lrstate->ps.plan = (Plan *) node;    lrstate->ps.state = estate;
 
+    lrstate->ps.fd = -1;    /*     * Miscellaneous initialization
diff --git a/src/backend/executor/nodeMaterial.c b/src/backend/executor/nodeMaterial.c
index 9ab03f3..db8279a 100644
--- a/src/backend/executor/nodeMaterial.c
+++ b/src/backend/executor/nodeMaterial.c
@@ -171,6 +171,7 @@ ExecInitMaterial(Material *node, EState *estate, int eflags)    matstate = makeNode(MaterialState);
  matstate->ss.ps.plan = (Plan *) node;    matstate->ss.ps.state = estate;
 
+    matstate->ss.ps.fd = -1;    /*     * We must have a tuplestore buffering the subplan output to do backward
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index e271927..c5323d7 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -83,6 +83,7 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)     */    mergestate->ps.plan =
(Plan*) node;    mergestate->ps.state = estate;
 
+    mergestate->ps.fd = -1;    mergestate->mergeplans = mergeplanstates;    mergestate->ms_nplans = nplans;
@@ -112,6 +113,9 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)    {        Plan       *initNode
=(Plan *) lfirst(lc);
 
+        /* always request async execution for now */
+        eflags = eflags | EXEC_FLAG_ASYNC;
+        mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);        i++;    }
diff --git a/src/backend/executor/nodeMergejoin.c b/src/backend/executor/nodeMergejoin.c
index 6db09b8..27ac84e 100644
--- a/src/backend/executor/nodeMergejoin.c
+++ b/src/backend/executor/nodeMergejoin.c
@@ -1485,6 +1485,7 @@ ExecInitMergeJoin(MergeJoin *node, EState *estate, int eflags)    mergestate =
makeNode(MergeJoinState);   mergestate->js.ps.plan = (Plan *) node;    mergestate->js.ps.state = estate;
 
+    mergestate->js.ps.fd = -1;    /*     * Miscellaneous initialization
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index e62c8aa..78df2e4 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -1561,6 +1561,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)    mtstate->ps.plan = (Plan *)
node;   mtstate->ps.state = estate;    mtstate->ps.targetlist = NIL;        /* not actually used */
 
+    mtstate->ps.fd = -1;    mtstate->operation = operation;    mtstate->canSetTag = node->canSetTag;
diff --git a/src/backend/executor/nodeNestloop.c b/src/backend/executor/nodeNestloop.c
index 555fa09..c262d7f 100644
--- a/src/backend/executor/nodeNestloop.c
+++ b/src/backend/executor/nodeNestloop.c
@@ -309,6 +309,7 @@ ExecInitNestLoop(NestLoop *node, EState *estate, int eflags)    nlstate = makeNode(NestLoopState);
 nlstate->js.ps.plan = (Plan *) node;    nlstate->js.ps.state = estate;
 
+    nlstate->js.ps.fd = -1;    /*     * Miscellaneous initialization
@@ -340,11 +341,24 @@ ExecInitNestLoop(NestLoop *node, EState *estate, int eflags)     * inner child, because it will
alwaysbe rescanned with fresh parameter     * values.     */
 
+
+    /*
+     * Async execution of the outer plan is beneficial if this join is
+     * requested as async.
+     */    outerPlanState(nlstate) = ExecInitNode(outerPlan(node), estate, eflags);    if (node->nestParams == NIL)
   eflags |= EXEC_FLAG_REWIND;    else        eflags &= ~EXEC_FLAG_REWIND;
 
+
+    /*
+     * Async execution of the inner is inhibited if parameterized by the
+     * outer
+     */
+    if (list_length(node->nestParams) > 0)
+        eflags &= ~ EXEC_FLAG_ASYNC;
+    innerPlanState(nlstate) = ExecInitNode(innerPlan(node), estate, eflags);    /*
diff --git a/src/backend/executor/nodeRecursiveunion.c b/src/backend/executor/nodeRecursiveunion.c
index e76405a..48a70cb 100644
--- a/src/backend/executor/nodeRecursiveunion.c
+++ b/src/backend/executor/nodeRecursiveunion.c
@@ -176,6 +176,7 @@ ExecInitRecursiveUnion(RecursiveUnion *node, EState *estate, int eflags)    rustate =
makeNode(RecursiveUnionState);   rustate->ps.plan = (Plan *) node;    rustate->ps.state = estate;
 
+    rustate->ps.fd = -1;    rustate->eqfunctions = NULL;    rustate->hashfunctions = NULL;
diff --git a/src/backend/executor/nodeResult.c b/src/backend/executor/nodeResult.c
index 4007b76..027b64e 100644
--- a/src/backend/executor/nodeResult.c
+++ b/src/backend/executor/nodeResult.c
@@ -217,6 +217,7 @@ ExecInitResult(Result *node, EState *estate, int eflags)    resstate = makeNode(ResultState);
resstate->ps.plan= (Plan *) node;    resstate->ps.state = estate;
 
+    resstate->ps.fd = -1;    resstate->rs_done = false;    resstate->rs_checkqual = (node->resconstantqual == NULL) ?
false: true;
 
diff --git a/src/backend/executor/nodeSamplescan.c b/src/backend/executor/nodeSamplescan.c
index 9ce7c02..a670e77 100644
--- a/src/backend/executor/nodeSamplescan.c
+++ b/src/backend/executor/nodeSamplescan.c
@@ -152,6 +152,7 @@ ExecInitSampleScan(SampleScan *node, EState *estate, int eflags)    scanstate =
makeNode(SampleScanState);   scanstate->ss.ps.plan = (Plan *) node;    scanstate->ss.ps.state = estate;
 
+    scanstate->ss.ps.fd = -1;    /*     * Miscellaneous initialization
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index f12921d..86a3015 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -39,6 +39,20 @@ static TupleTableSlot *SeqNext(SeqScanState *node); *
----------------------------------------------------------------*/
 
+bool
+ExecStartSeqScan(SeqScanState *node)
+{
+    if (node->early_start)
+    {
+        ereport(LOG,
+                (errmsg("dummy_async_cb is called for %p@ExecStartSeqScan", node),
+                 errhidestmt(true)));
+        node->early_start = false;
+    }
+
+    return false;
+}
+/* ---------------------------------------------------------------- *        SeqNext *
@@ -177,6 +191,7 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)    scanstate = makeNode(SeqScanState);
scanstate->ss.ps.plan = (Plan *) node;    scanstate->ss.ps.state = estate;
 
+    scanstate->ss.ps.fd = -1;    /*     * Miscellaneous initialization
@@ -214,6 +229,10 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
ExecAssignResultTypeFromTL(&scanstate->ss.ps);   ExecAssignScanProjectionInfo(&scanstate->ss);
 
+    /*  Do early-start when requested */
+    if (eflags & EXEC_FLAG_ASYNC)
+        scanstate->early_start = true;
+    return scanstate;}
diff --git a/src/backend/executor/nodeSetOp.c b/src/backend/executor/nodeSetOp.c
index 2d81d46..8eafd91 100644
--- a/src/backend/executor/nodeSetOp.c
+++ b/src/backend/executor/nodeSetOp.c
@@ -487,6 +487,7 @@ ExecInitSetOp(SetOp *node, EState *estate, int eflags)    setopstate = makeNode(SetOpState);
setopstate->ps.plan= (Plan *) node;    setopstate->ps.state = estate;
 
+    setopstate->ps.fd = -1;    setopstate->eqfunctions = NULL;    setopstate->hashfunctions = NULL;
diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c
index a34dcc5..f28dc2d 100644
--- a/src/backend/executor/nodeSort.c
+++ b/src/backend/executor/nodeSort.c
@@ -162,6 +162,7 @@ ExecInitSort(Sort *node, EState *estate, int eflags)    sortstate = makeNode(SortState);
sortstate->ss.ps.plan= (Plan *) node;    sortstate->ss.ps.state = estate;
 
+    sortstate->ss.ps.fd = -1;    /*     * We must have random access to the sort output to do backward scan or
diff --git a/src/backend/executor/nodeSubqueryscan.c b/src/backend/executor/nodeSubqueryscan.c
index 0304b15..c2b9bb0 100644
--- a/src/backend/executor/nodeSubqueryscan.c
+++ b/src/backend/executor/nodeSubqueryscan.c
@@ -117,6 +117,7 @@ ExecInitSubqueryScan(SubqueryScan *node, EState *estate, int eflags)    subquerystate =
makeNode(SubqueryScanState);   subquerystate->ss.ps.plan = (Plan *) node;    subquerystate->ss.ps.state = estate;
 
+    subquerystate->ss.ps.fd = -1;    /*     * Miscellaneous initialization
diff --git a/src/backend/executor/nodeTidscan.c b/src/backend/executor/nodeTidscan.c
index 2604103..41d69c3 100644
--- a/src/backend/executor/nodeTidscan.c
+++ b/src/backend/executor/nodeTidscan.c
@@ -461,6 +461,7 @@ ExecInitTidScan(TidScan *node, EState *estate, int eflags)    tidstate = makeNode(TidScanState);
tidstate->ss.ps.plan= (Plan *) node;    tidstate->ss.ps.state = estate;
 
+    tidstate->ss.ps.fd = -1;    /*     * Miscellaneous initialization
diff --git a/src/backend/executor/nodeUnique.c b/src/backend/executor/nodeUnique.c
index 4caae34..56c21e8 100644
--- a/src/backend/executor/nodeUnique.c
+++ b/src/backend/executor/nodeUnique.c
@@ -122,6 +122,7 @@ ExecInitUnique(Unique *node, EState *estate, int eflags)    uniquestate = makeNode(UniqueState);
uniquestate->ps.plan= (Plan *) node;    uniquestate->ps.state = estate;
 
+    uniquestate->ps.fd = -1;    /*     * Miscellaneous initialization
diff --git a/src/backend/executor/nodeValuesscan.c b/src/backend/executor/nodeValuesscan.c
index 2c4bd9c..2ec3ed7 100644
--- a/src/backend/executor/nodeValuesscan.c
+++ b/src/backend/executor/nodeValuesscan.c
@@ -205,6 +205,7 @@ ExecInitValuesScan(ValuesScan *node, EState *estate, int eflags)    scanstate =
makeNode(ValuesScanState);   scanstate->ss.ps.plan = (Plan *) node;    scanstate->ss.ps.state = estate;
 
+    scanstate->ss.ps.fd = -1;    /*     * Miscellaneous initialization
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index f06eebe..bc5b9ce 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -1787,6 +1787,7 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)    winstate =
makeNode(WindowAggState);   winstate->ss.ps.plan = (Plan *) node;    winstate->ss.ps.state = estate;
 
+    winstate->ss.ps.fd = -1;    /*     * Create expression contexts.  We need two, one for per-input-tuple
diff --git a/src/backend/executor/nodeWorktablescan.c b/src/backend/executor/nodeWorktablescan.c
index cfed6e6..230c849 100644
--- a/src/backend/executor/nodeWorktablescan.c
+++ b/src/backend/executor/nodeWorktablescan.c
@@ -144,6 +144,7 @@ ExecInitWorkTableScan(WorkTableScan *node, EState *estate, int eflags)    scanstate =
makeNode(WorkTableScanState);   scanstate->ss.ps.plan = (Plan *) node;    scanstate->ss.ps.state = estate;
 
+    scanstate->ss.ps.fd = -1;    scanstate->rustate = NULL;    /* we'll set this later */    /*
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 44fac27..de78d04 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -62,6 +62,7 @@#define EXEC_FLAG_WITH_OIDS        0x0020    /* force OIDs in returned tuples */#define
EXEC_FLAG_WITHOUT_OIDS   0x0040    /* force no OIDs in returned tuples */#define EXEC_FLAG_WITH_NO_DATA    0x0080    /*
relscannability doesn't matter */
 
+#define EXEC_FLAG_ASYNC            0x0100    /* request asynchronous execution *//*
@@ -224,6 +225,7 @@ extern void EvalPlanQualEnd(EPQState *epqstate);extern PlanState *ExecInitNode(Plan *node, EState
*estate,int eflags);extern TupleTableSlot *ExecProcNode(PlanState *node);extern Node *MultiExecProcNode(PlanState
*node);
+extern bool ExecStartNode(PlanState *node);extern void ExecEndNode(PlanState *node);extern bool
ExecShutdownNode(PlanState*node);
 
diff --git a/src/include/executor/nodeGather.h b/src/include/executor/nodeGather.h
index f76d9be..0a48a03 100644
--- a/src/include/executor/nodeGather.h
+++ b/src/include/executor/nodeGather.h
@@ -18,6 +18,7 @@extern GatherState *ExecInitGather(Gather *node, EState *estate, int eflags);extern TupleTableSlot
*ExecGather(GatherState*node);
 
+extern bool ExecStartGather(GatherState *node);extern void ExecEndGather(GatherState *node);extern void
ExecShutdownGather(GatherState*node);extern void ExecReScanGather(GatherState *node);
 
diff --git a/src/include/executor/nodeSeqscan.h b/src/include/executor/nodeSeqscan.h
index f2e61ff..daf54ac 100644
--- a/src/include/executor/nodeSeqscan.h
+++ b/src/include/executor/nodeSeqscan.h
@@ -19,6 +19,7 @@extern SeqScanState *ExecInitSeqScan(SeqScan *node, EState *estate, int eflags);extern TupleTableSlot
*ExecSeqScan(SeqScanState*node);
 
+extern bool ExecStartSeqScan(SeqScanState *node);extern void ExecEndSeqScan(SeqScanState *node);extern void
ExecReScanSeqScan(SeqScanState*node);
 
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index ee4e189..205a2c8 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -20,6 +20,7 @@#include "lib/pairingheap.h"#include "nodes/params.h"#include "nodes/plannodes.h"
+#include "storage/latch.h"#include "utils/reltrigger.h"#include "utils/sortsupport.h"#include "utils/tuplestore.h"
@@ -345,6 +346,14 @@ typedef struct ResultRelInfo    List       *ri_onConflictSetWhere;} ResultRelInfo;
+/* Enum for async awareness */
+typedef enum NodeStatus
+{
+    EXEC_NOT_READY,
+    EXEC_READY,
+    EXEC_EOT
+} NodeStatus;
+/* ---------------- *      EState information *
@@ -1059,6 +1068,9 @@ typedef struct PlanState    ProjectionInfo *ps_ProjInfo;    /* info for doing tuple projection */
  bool        ps_TupFromTlist;/* state flag for processing set-valued                                 * functions in
targetlist*/
 
+
+    NodeStatus    exec_status;
+    int            fd;} PlanState;/* ----------------
@@ -1138,6 +1150,14 @@ typedef struct ModifyTableState                                         * target */}
ModifyTableState;
+
+typedef enum AppendAsyncState
+{
+    ASYNCCHILD_READY,
+    ASYNCCHILD_NOT_READY,
+    ASYNCCHILD_DONE
+} AppendAsyncState;
+/* ---------------- *     AppendState information *
@@ -1149,8 +1169,10 @@ typedef struct AppendState{    PlanState    ps;                /* its first field is NodeTag */
 PlanState **appendplans;    /* array of PlanStates for my inputs */
 
+    AppendAsyncState   *async_state;    int            as_nplans;    int            as_whichplan;
+    WaitEventSet *evset;} AppendState;/* ----------------
@@ -1259,6 +1281,7 @@ typedef struct SeqScanState{    ScanState    ss;                /* its first field is NodeTag */
 Size        pscan_len;        /* size of parallel heap scan descriptor */ 
+    bool        early_start;} SeqScanState;/* ----------------
@@ -1952,6 +1975,7 @@ typedef struct UniqueStatetypedef struct GatherState{    PlanState    ps;                /* its
firstfield is NodeTag */
 
+    bool        early_start;    bool        initialized;    struct ParallelExecutorInfo *pei;    int
nreaders;

Re: asynchronous and vectorized execution

From
Rajeev rastogi
Date:
On 09 May 2016 23:04, Robert Haas Wrote:

>2. vectorized execution, by which I mean the ability of a node to return
>tuples in batches rather than one by one.  Andres has opined more than
>once that repeated trips through ExecProcNode defeat the ability of the
>CPU to do branch prediction correctly, slowing the whole system down,
>and that they also result in poor CPU cache behavior, since we jump all
>over the place executing a little bit of code from each node before
>moving onto the next rather than running one bit of code first, and then
>another later.  I think that's
>probably right.   For example, consider a 5-table join where all of
>the joins are implemented as hash tables.  If this query plan is going
>to be run to completion, it would make much more sense to fetch, say,
>100 tuples from the driving scan and then probe for all of those in the
>first hash table, and then probe for all of those in the second hash
>table, and so on.  What we do instead is fetch one tuple and probe for
>it in all 5 hash tables, and then repeat.  If one of those hash tables
>would fit in the CPU cache but all five together will not,
>that seems likely to be a lot worse.   But even just ignoring the CPU
>cache aspect of it for a minute, suppose you want to write a loop to
>perform a hash join.  The inner loop fetches the next tuple from the
>probe table and does a hash lookup.  Right now, fetching the next tuple
>from the probe table means calling a function which in turn calls
>another function which probably calls another function which probably
>calls another function and now about 4 layers down we actually get the
>next tuple.  If the scan returned a batch of tuples to the hash join,
>fetching the next tuple from the batch would probably be 0 or 1 function
>calls rather than ... more.  Admittedly, you've got to consider the cost
>of marshaling the batches but I'm optimistic that there are cycles to be
>squeezed out here.  We might also want to consider storing batches of
>tuples in a column-optimized rather than row-optimized format so that
>iterating through one or two attributes across every tuple in the batch
>touches the minimal number of cache lines.


This sounds like a really great idea for improving performance.
I would like to share my thoughts based on our research work in a similar area (mostly along the lines you have
already mentioned).
Our goals with this work were to:
1. Make the processing data-centric instead of operator-centric.
2. Instead of pulling each tuple from its immediate child operator, let an operator push the tuple to its parent.
Pushing can continue until it reaches an operator that cannot be processed without results from another operator.

3. Through the above two points, achieve better data locality.

E.g. we had done some quick prototyping (take it just as a thought provoker) as shown below:
Query: select * from tbl1, tbl2, tbl3 where tbl1.a=tbl2.b and tbl2.b=tbl3.c;
For hash join:
    For each tuple t2 of tbl2
        Materialize a hash on tbl1.a = tbl2.b
    For each tuple t3 of tbl3
        Materialize a hash on tbl2.b = tbl3.c
    For each tuple t1 of tbl1
        Search in hash tbl1.a = tbl2.b
        Search in hash tbl2.b = tbl3.c
            Output t1*t2*t3

Of course, at each level, if there is any additional qual for the table, the same can be applied.

Similarly for Nested Loop Join, the plan tree can be processed as a post-order traversal of the tree:
    Scan the first relation (leftmost relation), store all tuples --> Outer
    Loop through all scan-node relations (or some part of the total tuples), starting from the second one:
        Scan the current relation
        For each tuple, match with all tuples of the Outer result and build the combined tuple
        Save all combined satisfying tuples --> Outer
 

The results we got were really impressive.

There is a paper by Thomas Neumann on this idea: http://www.vldb.org/pvldb/vol4/p539-neumann.pdf 

Note: VitesseDB has also implemented this approach for Hash Join along with compilation, and their results are
really impressive.

Thanks and Regards,
Kumar Rajeev Rastogi.
http://rajeevrastogi.blogspot.in/   

Re: asynchronous and vectorized execution

From
Robert Haas
Date:
On Mon, May 9, 2016 at 8:34 PM, David Rowley
<david.rowley@2ndquadrant.com> wrote:
> It's interesting that you mention this. We identified this as a pain
> point during our work on column stores last year. Simply passing
> single tuples around the executor is really unfriendly towards L1
> instruction cache, plus also the points you mention about L3 cache and
> hash tables and tuple stores. I really think that we're likely to see
> significant gains by processing >1 tuple at a time, so this topic very
> much interests me.

Cool.  I hope we can work together on it, and with anyone else who is
interested.

> When we start multiplying those increases with the increases with
> something like parallel query then we're starting to see very nice
> gains in performance.

Check.

> Alvaro, Tomas and I had been discussing this and late last year I did
> look into what would be required to allow this to happen in Postgres.
> Basically there's 2 sub-projects, I'll describe what I've managed to
> learn so far about each, and the rough plan that I have to implement
> them:
>
> 1. Batch Execution:
>
> a. Modify ScanAPI to allow batch tuple fetching in predefined batch sizes.
> b. Modify TupleTableSlot to allow > 1 tuple to be stored. Add flag to
> indicate if the struct contains a single or a multiple tuples.
> Multiple tuples may need to be deformed in a non-lazy fashion in order
> to prevent too many buffers from having to be pinned at once. Tuples
> will be deformed into arrays of each column rather than arrays for
> each tuple (this part is important to support the next sub-project)
> c. Modify some nodes (perhaps start with nodeAgg.c) to allow them to
> process a batch TupleTableSlot. This will require some tight loop to
> aggregate the entire TupleTableSlot at once before returning.
> d. Add function in execAmi.c which returns true or false depending on
> if the node supports batch TupleTableSlots or not.
> e. At executor startup determine if the entire plan tree supports
> batch TupleTableSlots, if so enable batch scan mode.

I'm wondering if we should instead have a whole new kind of node, a
TupleTableVector, say.  Things that want to work one tuple at a time
can continue to do so with no additional overhead.  Things that want
to return batches can do it via this new result type.
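
Just to make that concrete, I'm imagining something vaguely like this
(purely hypothetical, and column-major to go with the
column-optimized-batch idea):

typedef struct TupleTableVector
{
    NodeTag     type;
    TupleDesc   tupdesc;        /* descriptor shared by every tuple in the batch */
    int         ntuples;        /* number of tuples currently stored */
    Datum     **values;         /* values[attno][row], column-major */
    bool      **isnull;         /* isnull[attno][row] */
} TupleTableVector;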

> node types, which *may* not be all that difficult. I'm also assuming
> that batch mode (in all cases apart from queries with LIMIT or
> cursors) will always be faster than tuple-at-a-time, so requires no
> costings from the planner.

I definitely agree that we need to consider cases with and without
LIMIT separately, but there's more than one way to get a LIMIT.  For
example, a subquery returns only one row; a semijoin returns only one
row.  I don't think you are going to escape planner considerations.

Nested Loop Semi Join
-> Seq Scan
-> Index Scan on dont_batch_here

> 2. Vector processing
>
> (I admit that I've given this part much less thought so far, but
> here's what I have in mind)
>
> This depends on batch execution, and is intended to allow the executor
> to perform function calls to an entire batch at once, rather than
> tuple-at-a-time. For example, let's take the following example;
>
> SELECT a+b FROM t;
>
> here (as of now) we'd scan "t" one row at a time and perform a+b after
> having deformed enough of the tuple to do that. We'd then go and get
> another Tuple from the scan node and repeat until the scan gave us no
> more Tuples.
>
> With batch execution we'd fetch multiple Tuples from the scan and we'd
> then perform the call to say int4_pl() multiple times, which still
> kinda sucks as it means calling int4_pl() possibly millions of times
> (once per tuple). The vector mode here would require that we modify
> pg_operator to add a vector function for each operator so that we can
> call the function passing in an array of Datums and a length to have
> SIMD operations perform the addition, so we'd call something like
> int4_pl_vector() only once per batch of tuples allowing the CPU to
> perform SIMD operations on those datum arrays. This could be done in
> an incremental way as the code could just callback on the standard
> function in cases where a vectorised version of it is not available.
> Thought is needed here about when exactly this decision is made as the
> user may not have permissions to execute the vector function, so it
> can't simply be a run time check.  These functions would simply return
> another vector of the results.  Aggregates could be given a vector
> transition function, where something like COUNT(*)'s vector_transfn
> would simply just current_count += vector_length;
>
> This project does appear to require that we bloat the code with 100's
> of vector versions of each function. I'm not quite sure if there's a
> better way to handle this. The problem is that the fmgr is pretty much
> a barrier to SIMD operations, and this was the only idea that I've had
> so far about breaking through that barrier. So further ideas here are
> very welcome.

I don't have any at the moment, but I'm not keen on hundreds of new
vector functions that can all have bugs or behavior differences versus
the unvectorized versions of the same code.  That's a substantial tax
on future development.  I think it's important to understand what
sorts of queries we are targeting here.  KaiGai's GPU-acceleration
stuff does great on queries with complex WHERE clauses, but most
people don't care not only because it's out-of-core but because who
actually looks for the records where (a + b) % c > (d + e) * f / g?
This seems like it has the same issue.  If we can speed up common
queries people are actually likely to run, OK, that's interesting.

By the way, I think KaiGai's GPU-acceleration stuff points to another
pitfall here.  There's other stuff somebody might legitimately want to
do that requires another copy of each function. For example, run-time
code generation likely needs that (a function to tell the code
generator what to generate for each of our functions), and
GPU-acceleration probably does, too.  If fixing a bug in numeric_lt
requires changing not only the regular version and the vectorized
version but also the GPU-accelerated version and the codegen version,
Tom and Dean are going to kill us.  And justifiably so!  Granted,
nobody is proposing those other features in core right now, but
they're totally reasonable things to want to do.

I suspect the number of queries that are being hurt by fmgr overhead
is really large, and I think it would be nice to attack that problem
more directly.  It's a bit hard to discuss what's worthwhile in the
abstract, without performance numbers, but when you vectorize, how
much is the benefit from using SIMD instructions and how much is the
benefit from just not going through the fmgr every time?
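
To make that question concrete, the comparison I have in mind is roughly
between the two loops below.  FunctionCall2 and int4pl are the real
things; the batched variant and its name are invented, and overflow
checking is omitted.

/* Per-row path: one fmgr trip per tuple (simplified sketch). */
for (i = 0; i < n; i++)
    results[i] = DatumGetInt32(FunctionCall2(&flinfo, arg_a[i], arg_b[i]));

/* Hypothetical per-batch path: one call per batch, with a loop simple
 * enough for the compiler to auto-vectorize. */
static void
int4_pl_vector(const int32 *a, const int32 *b, int32 *result, int n)
{
    int     i;

    for (i = 0; i < n; i++)
        result[i] = a[i] + b[i];
}

Comparing the first loop against the second, with and without the
compiler's auto-vectorization, would separate the two effects.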

> The idea here is that these 2 projects help pave the way to bring
> columnar storage into PostgreSQL. Without these we're unlikely to get
> much benefit of columnar storage as we'd be stuck processing rows one
> at a time still.  Adding columnar storage on the top of the above
> should further increase performance as we can skip the tuple-deform
> step and pull columnar array/vectors directly into a TupleTableSlot,
> although some trickery would be involved here when the scan has keys.

I'm a bit mystified by this.  It seems to me that you could push down
the optimizable quals into the AM, just like what index AMs do for
Index Quals and what postgres_fdw does for pushdown-safe quals.  Then
those quals get executed on the optimized representation, and you only
have to fill TupleTableSlots for the surviving tuples.  AFAICS,
vectorizing the core executor only helps if you want to keep the data
in vectorized form for longer, e.g. to somehow optimize joins or aggs,
or if the data starts out in row-oriented form and we convert it to
columnar form before doing vector ops.  Evidently I'm confused.
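
(The same kind of pushdown already has an API on the heap side today --
heap_beginscan() accepts ScanKeys, SeqScan just never passes any -- so,
as a heap-flavoured sketch of the idea, assuming the 9.6-era heapam API
and a hard-coded int4 equality key on attribute 1:)

#include "postgres.h"
#include "access/heapam.h"
#include "access/skey.h"
#include "access/stratnum.h"
#include "utils/fmgroids.h"

/* Sketch: push an equality qual down into the scan as a ScanKey, so
 * non-matching tuples are filtered inside the AM. */
static void
scan_with_pushed_down_key(Relation rel, Snapshot snap, int32 value)
{
    ScanKeyData key;
    HeapScanDesc scan;
    HeapTuple   tup;

    ScanKeyInit(&key,
                1,                     /* attribute number */
                BTEqualStrategyNumber, /* strategy number */
                F_INT4EQ,              /* comparison function */
                Int32GetDatum(value));

    scan = heap_beginscan(rel, snap, 1, &key);
    while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
    {
        /* only tuples satisfying the key get here */
    }
    heap_endscan(scan);
}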

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: asynchronous and vectorized execution

From
Robert Haas
Date:
On Mon, May 9, 2016 at 9:38 PM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:
> Is the parallel aware Append node sufficient to run multiple nodes
> asynchronously? (Sorry, I couldn't have enough time to code the feature
> even though we had discussion before.)

It's tempting to think that parallel query and asynchronous query are
the same thing, but I think that they are actually quite different.
Parallel query involves using multiple processes to service a query.
Asynchronous query involves using each individual process as
efficiently as possible by not having it block any more than
necessary.  You can want these things together or separately.  For
example, consider this query plan:

Append
-> ForeignScan
-> ForeignScan

Here, you do not want parallel query; the queries must both be
launched by the user backend, not some worker process, else you will
not get the right transaction semantics.  The parallel-aware Append
node we talked about before does not help here.  On the other hand,
consider this:

Append
-> Seq Scan
     Filter: lots_of_cpu()
-> Seq Scan
     Filter: lots_of_cpu()

Here, asynchronous query is of no help, but parallel query is great.
Now consider this hypothetical plan:

Gather
-> Hash Join
   -> Parallel Bitmap Heap Scan
      -> Bitmap Index Scan
   -> Parallel Hash
      -> Parallel Seq Scan

Let's assume that the bitmap *heap* scan can be performed in parallel
but the bitmap *index* scan can't.  That's pretty reasonable for a
first cut, actually, because the index accesses are probably touching
much less data, and we're doing little CPU work for each index page -
any delay here is likely to be I/O.

So in that world what you want is for the first worker to begin
performing the bitmap index scan and building a shared TIDBitmap that
the workers can use to scan the heap.  The other workers,
meanwhile, could begin building the shared hash table (which is what I
intend to denote by saying that it's a *Parallel* Hash).  If the
process building the bitmap finishes before the hash table is built,
it can help build the hash table as well.  Once both operations are
done, each process can scan a subset of the rows from the bitmap heap
scan and do the hash table probes for just those rows.  To make all of
this work, you need both *parallel* query, so that you have workers,
and also *asynchronous* query, so that workers which see that the
bitmap index scan is in progress don't get stuck waiting for it but
can look around for other work.

> In the above example, scan on foreign-table takes longer lead time than
> local scan. If Append can map every child nodes on individual workers,
> local scan worker begins to return tuples at first, then, mixed tuples
> shall be returned eventually.

This is getting at an issue I'm somewhat worried about, which is
scheduling.  In my prototype, we only check for events if there are no
nodes ready for the CPU now, but sometimes that might be a loser.  One
probably needs to check for events periodically even when there are
still nodes waiting for the CPU, but "how often?" seems like an
unanswerable question.
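
To spell out the trade-off, the policy in question is roughly the
fragment below; the helper names are invented here, not what the
prototype actually calls them.

/* Sketch only; every name below is invented. */
if (num_ready_nodes == 0)
    ExecAsyncWaitForEvents(estate);     /* nothing runnable: block on events */
else if (++nodes_run % POLL_EVERY_N_NODES == 0)
    ExecAsyncCheckEvents(estate);       /* cheap nonblocking check */
/* ...and there is no obviously right value for POLL_EVERY_N_NODES. */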

> However, the process internal asynchronous execution may be also beneficial
> in case when cost of shm_mq is not ignorable (e.g, no scan qualifiers
> are given to worker process). I think it allows to implement pre-fetching
> very naturally.

Yes.

>> My proposal for how to do this is to make ExecProcNode function as a
>> backward-compatibility wrapper.  For asynchronous execution, a node
>> might return a not-ready-yet indication, but if that node is called
>> via ExecProcNode, it means the caller isn't prepared to receive such
>> an indication, so ExecProcNode will just wait for the node to become
>> ready and then return the tuple.
>>
> Backward compatibility is good. In addition, child node may want to
> know the context when it is called. It may want to switch the behavior
> according to the caller's expectation. For example, it may be beneficial
> if SeqScan makes more aggressive prefetching on asynchronous execution.

Maybe, but I'm a bit doubtful.  I'm not seeing a lot of advantage in
that sort of thing, and it will make the code a lot more complicated.

> Also, can we consider which data format will be returned from the child
> node during the planning stage? It affects to the cost of inter-node
> data exchange. If a pair of parent-node and child-node supports its
> special data format (like as existing HashJoin and Hash doing), it shall
> be a discount factor of cost estimation.

I'm not sure.  The costing aspects of this need a lot more thought
than I have given them so far.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: asynchronous and vectorized execution

From
Robert Haas
Date:
On Tue, May 10, 2016 at 3:00 AM, konstantin knizhnik
<k.knizhnik@postgrespro.ru> wrote:
> What's wrong with it that worker is blocked? You can just have more workers
> (more than CPU cores) to let other of them continue to do useful work.

Not really.  The workers are all running the same plan, so they'll all
make the same decision about which node needs to be executed next.  If
that node can't accommodate multiple processes trying to execute it at
the same time, it will have to block all of them but the first one.
Adding more processes just increases the number of processes sitting
around doing nothing.

> But there are some researches, for example:
>
> http://www.vldb.org/pvldb/vol4/p539-neumann.pdf
>
> showing that the same or even better effect can be achieved by generation
> native code for query execution plan (which is not so difficult now, thanks
> to LLVM).
> It eliminates interpretation overhead and increase cache locality.
> I get similar results with my own experiments of accelerating SparkSQL.
> Instead of native code generation I used conversion of query plans to C code
> and experiment with different data representation. "Horisontal model" with
> loading columns on demands shows better performance than columnar store.

Yes, I think this approach should also be considered.

> At this moment (February) them have implemented translation of only few
> PostgreSQL operators used by ExecQuals  and do not support aggregates.
> Them get about 2 times increase of speed at synthetic queries and 25%
> increase at TPC-H Q1 (for Q1 most critical is generation of native code for
> aggregates, because ExecQual itself takes only 6% of time for this query).
> Actually these 25% for Q1 were achieved not by using dynamic code
> generation, but switching from PULL to PUSH model in executor.
> It seems to be yet another interesting PostgreSQL executor transformation.
> As far as I know, them are going to publish result of their work to open
> source...

Interesting.  You may notice that in "asynchronous mode" my prototype
works using a push model of sorts.  Maybe that should be taken
further.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: asynchronous and vectorized execution

From
Konstantin Knizhnik
Date:
On 05/10/2016 08:26 PM, Robert Haas wrote:
> On Tue, May 10, 2016 at 3:00 AM, konstantin knizhnik
> <k.knizhnik@postgrespro.ru> wrote:
>> What's wrong with it that worker is blocked? You can just have more workers
>> (more than CPU cores) to let other of them continue to do useful work.
> Not really.  The workers are all running the same plan, so they'll all
> make the same decision about which node needs to be executed next.  If
> that node can't accommodate multiple processes trying to execute it at
> the same time, it will have to block all of them but the first one.
> Adding more processes just increases the number of processes sitting
> around doing nothing.

Doesn't this actually mean that we need to have a normal job scheduler
which is given a queue of jobs and, having some pool of threads, will be
able to organize efficient execution of queries? The optimizer can build
a pipeline (graph) of tasks which corresponds to execution plan nodes,
i.e. SeqScan, Sort, ... Each task is split into several jobs which can be
concurrently scheduled by a task dispatcher.  So you will not have a
blocked worker waiting for something, and all system resources will be
utilized. Such an approach with a dispatcher also makes it possible to
implement quotas, priorities, ... And the dispatcher can care about NUMA
and cache optimizations, which is especially critical on modern
architectures. One more reference: http://db.in.tum.de/~leis/papers/morsels.pdf

Sorry, maybe I am wrong, but I still think that async ops are
"multitasking for the poor" :)
Yes, maintaining threads and especially separate processes adds
significant overhead. It leads to extra resource consumption, and context
switches are quite expensive. And I know from my own experience that
replacing several concurrent processes performing some IO (for example
with sockets) with just one process using multiplexing can increase
performance. But still, async ops are just a way to make the programmer
responsible for managing the state machine instead of relying on the OS
to make context switches.
Manual transmission is still more efficient than automatic transmission.
But still, most drivers prefer the latter ;)

Seriously, I carefully read your response to Kouhei, but I am still not
convinced that async ops are what we need.  Or maybe we just understand
different things by this notion.
 


>
>> But there are some researches, for example:
>>
>> http://www.vldb.org/pvldb/vol4/p539-neumann.pdf
>>
>> showing that the same or even better effect can be achieved by generation
>> native code for query execution plan (which is not so difficult now, thanks
>> to LLVM).
>> It eliminates interpretation overhead and increase cache locality.
>> I get similar results with my own experiments of accelerating SparkSQL.
>> Instead of native code generation I used conversion of query plans to C code
>> and experiment with different data representation. "Horisontal model" with
>> loading columns on demands shows better performance than columnar store.
> Yes, I think this approach should also be considered.
>
>> At this moment (February) them have implemented translation of only few
>> PostgreSQL operators used by ExecQuals  and do not support aggregates.
>> Them get about 2 times increase of speed at synthetic queries and 25%
>> increase at TPC-H Q1 (for Q1 most critical is generation of native code for
>> aggregates, because ExecQual itself takes only 6% of time for this query).
>> Actually these 25% for Q1 were achieved not by using dynamic code
>> generation, but switching from PULL to PUSH model in executor.
>> It seems to be yet another interesting PostgreSQL executor transformation.
>> As far as I know, them are going to publish result of their work to open
>> source...
> Interesting.  You may notice that in "asynchronous mode" my prototype
> works using a push model of sorts.  Maybe that should be taken
> further.
>


-- 
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company




Re: asynchronous and vectorized execution

From
Jim Nasby
Date:
On 5/10/16 12:47 AM, Kouhei Kaigai wrote:
>> > On 10 May 2016 at 13:38, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:
>>> > > My concern about ExecProcNode is, it is constructed with a large switch
>>> > > ... case statement. It involves tons of comparison operation at run-time.
>>> > > If we replace this switch ... case by function pointer, probably, it make
>>> > > performance improvement. Especially, OLAP workloads that process large
>>> > > amount of rows.
>> >
>> > I imagined that any decent compiler would have built the code to use
>> > jump tables for this. I have to say that I've never checked to make
>> > sure though.
>> >
> Ah, indeed, you are right. Please forget above part.

Even so, I would think that the simplification in the executor would be 
worth it. If you need to add a new node there's dozens of places where 
you might have to mess with these giant case statements.

In python (for example), types (equivalent to nodes in this case) have 
data structures that define function pointers for a core set of 
operations (such as doing garbage collection, or generating a string 
representation). That means that to add a new type at the C level you 
only need to define a C structure that has the expected members, and an 
initializer function that will properly set everything up when you 
create a new instance. There's no messing around with the rest of the 
guts of python.

*Even more important, everything you need to know about the type is 
contained in one place, not spread throughout the code.*
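
In executor terms the analogue would be something like a per-node-type
table of callbacks.  This is only a sketch to illustrate the idea; no
such struct exists today.

/* Hypothetical per-node-type "methods" table; purely illustrative. */
typedef struct ExecutorNodeMethods
{
    const char *name;                    /* e.g. "SeqScan" */
    PlanState  *(*init) (Plan *plan, EState *estate, int eflags);
    TupleTableSlot *(*exec) (PlanState *node);
    void        (*rescan) (PlanState *node);
    void        (*end) (PlanState *node);
} ExecutorNodeMethods;

/* Adding a new node type would then mean supplying one such table,
 * rather than editing every big switch statement in the executor. */
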
-- 
Jim Nasby, Data Architect, Blue Treble Consulting, Austin TX
Experts in Analytics, Data Architecture and PostgreSQL
Data in Trouble? Get it in Treble! http://BlueTreble.com
855-TREBLE2 (855-873-2532)   mobile: 512-569-9461



Re: asynchronous and vectorized execution

From
Bert
Date:
hmm, the morsels paper looks really interesting at first sight.
Let's see if we can get a poc working in PostgreSQL? :-)

On Tue, May 10, 2016 at 9:42 PM, Konstantin Knizhnik <k.knizhnik@postgrespro.ru> wrote:
> On 05/10/2016 08:26 PM, Robert Haas wrote:
>> On Tue, May 10, 2016 at 3:00 AM, konstantin knizhnik
>> <k.knizhnik@postgrespro.ru> wrote:
>>> What's wrong with it that worker is blocked? You can just have more workers
>>> (more than CPU cores) to let other of them continue to do useful work.
>> Not really.  The workers are all running the same plan, so they'll all
>> make the same decision about which node needs to be executed next.  If
>> that node can't accommodate multiple processes trying to execute it at
>> the same time, it will have to block all of them but the first one.
>> Adding more processes just increases the number of processes sitting
>> around doing nothing.
>
> Doesn't this actually mean that we need to have normal job scheduler which is given queue of jobs and having some pool of threads will be able to orginize efficient execution of queries? Optimizer can build pipeline (graph) of tasks, which corresponds to execution plan nodes, i.e. SeqScan, Sort, ... Each task is splitted into several jobs which can be concurretly scheduled by task dispatcher.  So you will not have blocked worker waiting for something and all system resources will be utilized. Such approach with dispatcher allows to implement quotas, priorities,... Also dispatches can care about NUMA and cache optimizations which is especially critical on modern architectures. One more reference: http://db.in.tum.de/~leis/papers/morsels.pdf
>
> Sorry, may be I wrong, but I still think that async.ops is "multitasking for poor":)
> Yes, maintaining threads and especially separate processes adds significant overhead. It leads to extra resources consumption and context switches are quite expensive. And I know from my own experience that replacing several concurrent processes performing some IO (for example with sockets) with just one process using multiplexing allows to increase performance. But still async. ops. is just a way to make programmer responsible for managing state machine instead of relying on OS to make context switches. Manual transmission is still more efficient than automatic transmission. But still most drives prefer last one;)
>
> Seriously, I carefully read your response to Kochei, but still not convinced that async. ops. is what we need.  Or may be we just understand different thing by this notion.
>
>
>
>>> But there are some researches, for example:
>>>
>>> http://www.vldb.org/pvldb/vol4/p539-neumann.pdf
>>>
>>> showing that the same or even better effect can be achieved by generation
>>> native code for query execution plan (which is not so difficult now, thanks
>>> to LLVM).
>>> It eliminates interpretation overhead and increase cache locality.
>>> I get similar results with my own experiments of accelerating SparkSQL.
>>> Instead of native code generation I used conversion of query plans to C code
>>> and experiment with different data representation. "Horisontal model" with
>>> loading columns on demands shows better performance than columnar store.
>> Yes, I think this approach should also be considered.
>>
>>> At this moment (February) them have implemented translation of only few
>>> PostgreSQL operators used by ExecQuals  and do not support aggregates.
>>> Them get about 2 times increase of speed at synthetic queries and 25%
>>> increase at TPC-H Q1 (for Q1 most critical is generation of native code for
>>> aggregates, because ExecQual itself takes only 6% of time for this query).
>>> Actually these 25% for Q1 were achieved not by using dynamic code
>>> generation, but switching from PULL to PUSH model in executor.
>>> It seems to be yet another interesting PostgreSQL executor transformation.
>>> As far as I know, them are going to publish result of their work to open
>>> source...
>> Interesting.  You may notice that in "asynchronous mode" my prototype
>> works using a push model of sorts.  Maybe that should be taken
>> further.
>
>
> --
> Konstantin Knizhnik
> Postgres Professional: http://www.postgrespro.com
> The Russian Postgres Company







--
Bert Desmet
0477/305361

Re: asynchronous and vectorized execution

From
Andres Freund
Date:
Hi,

On 2016-05-09 13:33:55 -0400, Robert Haas wrote:
> I think there are several different areas
> where we should consider major upgrades to our executor.  It's too
> slow and it doesn't do everything we want it to do.  The main things
> on my mind are:


3) We use a lot of very cache-inefficient datastructures.

Especially the pervasive use of linked lists in the executor is pretty
bad for performance. Every element is likely to incur cache misses,
every list element pretty much has its own cacheline (thereby reducing
overall cache hit ratio), they have a horrible allocation overhead (both
space and palloc runtime).
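
For example, qual evaluation today walks a List of expression states,
one separately palloc'd ListCell per clause; an array layout would keep
the same loop on contiguous memory.  (The second form is only a sketch.)

/* Today: pointer-chasing through palloc'd ListCells (as in ExecQual). */
foreach(l, qual)
{
    ExprState  *clause = (ExprState *) lfirst(l);

    /* evaluate clause ... */
}

/* Sketch of an array-based alternative (hypothetical layout). */
for (i = 0; i < nquals; i++)
{
    ExprState  *clause = quals[i];   /* contiguous array of pointers */

    /* evaluate clause ... */
}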


> 1. asynchronous execution, by which I mean the ability of a node to
> somehow say that it will generate a tuple eventually, but is not yet
> ready, so that the executor can go run some other part of the plan
> tree while it waits.  [...].  It is also a problem
> for parallel query: in a parallel sequential scan, the next worker can
> begin reading the next block even if the current block hasn't yet been
> received from the OS.  Whether or not this will be efficient is a
> research question, but it can be done.  However, imagine a parallel
> scan of a btree index: we don't know what page to scan next until we
> read the previous page and examine the next-pointer.  In the meantime,
> any worker that arrives at that scan node has no choice but to block.
> It would be better if the scan node could instead say "hey, thanks for
> coming but I'm really not ready to be on-CPU just at the moment" and
> potentially allow the worker to go work in some other part of the
> query tree.  For that worker to actually find useful work to do
> elsewhere, we'll probably need it to be the case either that the table
> is partitioned or the original query will need to involve UNION ALL,
> but those are not silly cases to worry about, particularly if we get
> native partitioning in 9.7.

I have to admit I'm not that convinced about the speedups in the !fdw
case. There seem to be a lot of easier avenues for performance
improvements.


> 2. vectorized execution, by which I mean the ability of a node to
> return tuples in batches rather than one by one.  Andres has opined
> more than once that repeated trips through ExecProcNode defeat the
> ability of the CPU to do branch prediction correctly, slowing the
> whole system down, and that they also result in poor CPU cache
> behavior, since we jump all over the place executing a little bit of
> code from each node before moving onto the next rather than running
> one bit of code first, and then another later.

FWIW, I've even hacked something up for a bunch of simple queries, and
the performance improvements were significant.  Besides it only being a
weekend hack project, the big thing I got stuck on was considering how
to exactly determine when to batch and not to batch.


I'd personally say that the CPU pipeline defeating aspect is worse than
the effect of the cache/branch misses themselves. Today's CPUs are
heavily superscalar, and our instruction-per-cycle throughput shows
pretty clearly that we're not good at employing (micro-)instruction
parallelism. We're quite frequently at well below one instruction/cycle.





> My proposal for how to do this is to make ExecProcNode function as a
> backward-compatibility wrapper.  For asynchronous execution, a node
> might return a not-ready-yet indication, but if that node is called
> via ExecProcNode, it means the caller isn't prepared to receive such
> an indication, so ExecProcNode will just wait for the node to become
> ready and then return the tuple.  Similarly, for vectorized execution,
> a node might return a bunch of tuples all at once.  ExecProcNode will
> extract the first one and return it to the caller, and subsequent
> calls to ExecProcNode will iterate through the rest of the batch, only
> calling the underlying node-specific function when the batch is
> exhausted.  In this way, code that doesn't know about the new stuff
> can continue to work pretty much as it does today.

I agree that that generally is a reasonable way forward.


> Also, and I think
> this is important, nodes don't need the permission of their parent
> node to use these new capabilities.  They can use them whenever they
> wish, without worrying about whether the upper node is prepared to
> deal with it.  If not, ExecProcNode will paper over the problem.  This
> seems to me to be a good way to keep the code simple.

Maybe not permission, but for some cases it seems to be important to
hint to *not* prefetch a lot of rows. E.g. anti joins come to mind. Just
using batching with force seems likely to regress some queries quite
badly (e.g an expensive join inside an EXISTS() which returns many
tuples).


> For asynchronous execution, I have gone so far as to mock up a bit of
> what this might look like.  This shouldn't be taken very seriously at
> this point, but I'm attaching a few very-much-WIP patches to show the
> direction of my line of thinking.  Basically, I propose to have
> ExecBlah (that is, ExecBitmapHeapScan, ExecAppend, etc.) return tuples
> by putting them into a new PlanState member called "result", which is
> just a Node * so that we can support multiple types of results,
> instead of returning them.

What different types of results are you envisioning?



> By the way, one smaller executor project that I think we should also
> look at has to do with this comment in nodeSeqScan.c:
> 
> static bool
> SeqRecheck(SeqScanState *node, TupleTableSlot *slot)
> {
>         /*
>          * Note that unlike IndexScan, SeqScan never use keys in heap_beginscan
>          * (and this is very bad) - so, here we do not check are keys ok or not.
>          */
>         return true;
> }
> 
> Some quick prototyping by my colleague Dilip Kumar suggests that, in
> fact, there are cases where pushing down keys into heap_beginscan()
> could be significantly faster.

I can immediately believe that.


> Some care is required here because any
> functions we execute as scan keys are run with the buffer locked, so
> we had better not run anything very complicated.  But doing this for
> simple things like integer equality operators seems like it could save
> quite a few buffer lock/unlock cycles and some other executor overhead
> as well.

Hm. Do we really have to keep the page locked in the page-at-a-time
mode? Shouldn't the pin suffice?

Greetings,

Andres Freund



Re: asynchronous and vectorized execution

From
Ants Aasma
Date:
On Tue, May 10, 2016 at 7:56 PM, Robert Haas <robertmhaas@gmail.com> wrote:
> On Mon, May 9, 2016 at 8:34 PM, David Rowley
> <david.rowley@2ndquadrant.com> wrote:
> I don't have any at the moment, but I'm not keen on hundreds of new
> vector functions that can all have bugs or behavior differences versus
> the unvectorized versions of the same code.  That's a substantial tax
> on future development.  I think it's important to understand what
> sorts of queries we are targeting here.  KaiGai's GPU-acceleration
> stuff does great on queries with complex WHERE clauses, but most
> people don't care not only because it's out-of-core but because who
> actually looks for the records where (a + b) % c > (d + e) * f / g?
> This seems like it has the same issue.  If we can speed up common
> queries people are actually likely to run, OK, that's interesting.

I have seen pretty complex expressions in the projection and
aggregation: a couple dozen SUM(CASE WHEN a THEN b*c ELSE MIN(d,e)*f
END) type of expressions. In critical places I had to replace them with
a C-coded function that processed a row at a time to avoid the
executor dispatch overhead.

> By the way, I think KaiGai's GPU-acceleration stuff points to another
> pitfall here.  There's other stuff somebody might legitimately want to
> do that requires another copy of each function. For example, run-time
> code generation likely needs that (a function to tell the code
> generator what to generate for each of our functions), and
> GPU-acceleration probably does, too.  If fixing a bug in numeric_lt
> requires changing not only the regular version and the vectorized
> version but also the GPU-accelerated version and the codegen version,
> Tom and Dean are going to kill us.  And justifiably so!  Granted,
> nobody is proposing those other features in core right now, but
> they're totally reasonable things to want to do.

My thoughts in this area have been circling around getting LLVM to do
the heavy lifting. LLVM/clang could compile existing C functions to IR
and bundle those with the DB. At query planning time or maybe even
during execution the functions can be inlined into the compiled query
plan, LLVM can then be coaxed to copy propagate, constant fold and
dead code eliminate the bejeezus out of the expression tree. This way
duplication of the specialized code can be kept to a minimum while at
least the common cases can completely avoid the fmgr overhead.

This approach would also mesh together fine with batching. Given
suitably regular data structures and simple functions, LLVM will be
able to vectorize the code. If not it will still end up with a nice
tight loop that is an order of magnitude or two faster than the
current executor.

The first cut could take care of ExecQual, ExecTargetList and friends.
Later improvements could let execution nodes provide basic blocks that
would then be threaded together into the main execution loop. If some
node does not implement the basic block interface a default
implementation is used that calls the current interface. It gets a bit
handwavy at this point, but the main idea would be to enable data
marshaling so that values can be routed directly to the code that
needs them without being written to intermediate storage.

> I suspect the number of queries that are being hurt by fmgr overhead
> is really large, and I think it would be nice to attack that problem
> more directly.  It's a bit hard to discuss what's worthwhile in the
> abstract, without performance numbers, but when you vectorize, how
> much is the benefit from using SIMD instructions and how much is the
> benefit from just not going through the fmgr every time?

My feeling is the same: fmgr overhead, data marshaling, and dynamic
dispatch through the executor are the big issue. This is corroborated
by what I have seen reported from other VM implementations. Once you get
the data into a uniform format where vectorized execution can be
used, the CPU execution resources are no longer the bottleneck. Memory
bandwidth gets in the way, unless each input value is used in multiple
calculations. And even then, we are looking at a 4x speedup at best.

Regards,
Ants Aasma



Re: asynchronous and vectorized execution

From
Andres Freund
Date:
On 2016-05-10 12:34:19 +1200, David Rowley wrote:
> a. Modify ScanAPI to allow batch tuple fetching in predefined batch sizes.
> b. Modify TupleTableSlot to allow > 1 tuple to be stored. Add flag to
> indicate if the struct contains a single or a multiple tuples.
> Multiple tuples may need to be deformed in a non-lazy fashion in order
> to prevent too many buffers from having to be pinned at once. Tuples
> will be deformed into arrays of each column rather than arrays for
> each tuple (this part is important to support the next sub-project)

FWIW, I don't think that's necessarily required, and it has the
potential to slow down some operations (like target list
processing/projections) considerably. By the time vectored execution for
postgres is ready, gather instructions should be common and fast enough
(IIRC they started to be ok with broadwells, and are better in skylake;
other archs had them for longer).


> c. Modify some nodes (perhaps start with nodeAgg.c) to allow them to
> process a batch TupleTableSlot. This will require some tight loop to
> aggregate the entire TupleTableSlot at once before returning.
> d. Add function in execAmi.c which returns true or false depending on
> if the node supports batch TupleTableSlots or not.
> e. At executor startup determine if the entire plan tree supports
> batch TupleTableSlots, if so enable batch scan mode.

It doesn't really need to be the entire tree. Even if you have a subtree
(say a parametrized index nested loop join) which doesn't support batch
mode, you'll likely still see performance benefits by building a batch
one layer above the non-batch-supporting node.


Greetings,

Andres Freund



Re: asynchronous and vectorized execution

From
Andres Freund
Date:
On 2016-05-10 12:56:17 -0400, Robert Haas wrote:
> I suspect the number of queries that are being hurt by fmgr overhead
> is really large, and I think it would be nice to attack that problem
> more directly.  It's a bit hard to discuss what's worthwhile in the
> abstract, without performance numbers, but when you vectorize, how
> much is the benefit from using SIMD instructions and how much is the
> benefit from just not going through the fmgr every time?

I think fmgr overhead is an issue, but in most profiles of execution
heavy loads I've seen the bottlenecks are elsewhere. They often seem to
roughly look like
+   15.47%  postgres  postgres           [.] slot_deform_tuple
+   12.99%  postgres  postgres           [.] slot_getattr
+   10.36%  postgres  postgres           [.] ExecMakeFunctionResultNoSets
+    9.76%  postgres  postgres           [.] heap_getnext
+    6.34%  postgres  postgres           [.] HeapTupleSatisfiesMVCC
+    5.09%  postgres  postgres           [.] heapgetpage
+    4.59%  postgres  postgres           [.] hash_search_with_hash_value
+    4.36%  postgres  postgres           [.] ExecQual
+    3.30%  postgres  postgres           [.] ExecStoreTuple
+    3.29%  postgres  postgres           [.] ExecScan

or

-   33.67%  postgres  postgres           [.] ExecMakeFunctionResultNoSets
   - ExecMakeFunctionResultNoSets
      + 99.11% ExecEvalOr
      + 0.89% ExecQual
+   14.32%  postgres  postgres           [.] slot_getattr
+    5.66%  postgres  postgres           [.] ExecEvalOr
+    5.06%  postgres  postgres           [.] check_stack_depth
+    5.02%  postgres  postgres           [.] slot_deform_tuple
+    4.05%  postgres  postgres           [.] pgstat_end_function_usage
+    3.69%  postgres  postgres           [.] heap_getnext
+    3.41%  postgres  postgres           [.] ExecEvalScalarVarFast
+    3.36%  postgres  postgres           [.] ExecEvalConst


with a healthy dose of _bt_compare, heap_hot_search_buffer in more index
heavy workloads.

(yes, I just pulled these example profiles from somewhere, but I've more
often seen them look like this, than very fmgr heavy).


That seems to suggest that we need to restructure how we get to calling
fmgr functions, before worrying about the actual fmgr call.


Tomas, Mark, IIRC you'd both generated perf profiles for TPC-H (IIRC?)
queries at some point. Any chance the results are online somewhere?

Greetings,

Andres Freund



Re: asynchronous and vectorized execution

From
Andres Freund
Date:
On 2016-05-11 03:20:12 +0300, Ants Aasma wrote:
> On Tue, May 10, 2016 at 7:56 PM, Robert Haas <robertmhaas@gmail.com> wrote:
> > On Mon, May 9, 2016 at 8:34 PM, David Rowley
> > <david.rowley@2ndquadrant.com> wrote:
> > I don't have any at the moment, but I'm not keen on hundreds of new
> > vector functions that can all have bugs or behavior differences versus
> > the unvectorized versions of the same code.  That's a substantial tax
> > on future development.  I think it's important to understand what
> > sorts of queries we are targeting here.  KaiGai's GPU-acceleration
> > stuff does great on queries with complex WHERE clauses, but most
> > people don't care not only because it's out-of-core but because who
> > actually looks for the records where (a + b) % c > (d + e) * f / g?
> > This seems like it has the same issue.  If we can speed up common
> > queries people are actually likely to run, OK, that's interesting.
> 
> I have seen pretty complex expressions in the projection and
> aggregation. Couple dozen SUM(CASE WHEN a THEN b*c ELSE MIN(d,e)*f
> END) type of expressions. In critical places had to replace them with
> a C coded function that processed a row at a time to avoid the
> executor dispatch overhead.

I've seen that as well, but was it the actual fmgr indirection causing
the overhead, or was it ExecQual/ExecMakeFunctionResultNoSets et al?

Greetings,

Andres Freund



Re: asynchronous and vectorized execution

From
Ants Aasma
Date:
On Wed, May 11, 2016 at 3:52 AM, Andres Freund <andres@anarazel.de> wrote:
> On 2016-05-11 03:20:12 +0300, Ants Aasma wrote:
>> On Tue, May 10, 2016 at 7:56 PM, Robert Haas <robertmhaas@gmail.com> wrote:
>> > On Mon, May 9, 2016 at 8:34 PM, David Rowley
>> > <david.rowley@2ndquadrant.com> wrote:
>> > I don't have any at the moment, but I'm not keen on hundreds of new
>> > vector functions that can all have bugs or behavior differences versus
>> > the unvectorized versions of the same code.  That's a substantial tax
>> > on future development.  I think it's important to understand what
>> > sorts of queries we are targeting here.  KaiGai's GPU-acceleration
>> > stuff does great on queries with complex WHERE clauses, but most
>> > people don't care not only because it's out-of-core but because who
>> > actually looks for the records where (a + b) % c > (d + e) * f / g?
>> > This seems like it has the same issue.  If we can speed up common
>> > queries people are actually likely to run, OK, that's interesting.
>>
>> I have seen pretty complex expressions in the projection and
>> aggregation. Couple dozen SUM(CASE WHEN a THEN b*c ELSE MIN(d,e)*f
>> END) type of expressions. In critical places had to replace them with
>> a C coded function that processed a row at a time to avoid the
>> executor dispatch overhead.
>
> I've seen that as well, but Was it the actual fmgr indirection causing
> the overhead, or was it ExecQual/ExecMakeFunctionResultNoSets et al?

I don't remember what the exact profile looked like, but IIRC it was
mostly Exec* stuff with advance_aggregates also up there.

Regards,
Ants Aasma



Re: asynchronous and vectorized execution

From
Robert Haas
Date:
On Tue, May 10, 2016 at 4:57 PM, Jim Nasby <Jim.Nasby@bluetreble.com> wrote:
> Even so, I would think that the simplification in the executor would be
> worth it. If you need to add a new node there's dozens of places where you
> might have to mess with these giant case statements.

Dozens? I think the number is in the single digits.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: asynchronous and vectorized execution

From
Robert Haas
Date:
On Tue, May 10, 2016 at 3:42 PM, Konstantin Knizhnik
<k.knizhnik@postgrespro.ru> wrote:
> Doesn't this actually mean that we need to have normal job scheduler which
> is given queue of jobs and having some pool of threads will be able to
> orginize efficient execution of queries? Optimizer can build pipeline
> (graph) of tasks, which corresponds to execution plan nodes, i.e. SeqScan,
> Sort, ... Each task is splitted into several jobs which can be concurretly
> scheduled by task dispatcher.  So you will not have blocked worker waiting
> for something and all system resources will be utilized. Such approach with
> dispatcher allows to implement quotas, priorities,... Also dispatches can
> care about NUMA and cache optimizations which is especially critical on
> modern architectures. One more reference:
> http://db.in.tum.de/~leis/papers/morsels.pdf

I read this as a proposal to redesign the entire optimizer and
executor to use some new kind of plan.  That's not a project I'm
willing to entertain; it is hard to imagine we could do it in a
reasonable period of time without introducing bugs and performance
regressions.  I think there is a great deal of performance benefit
that we can get by changing things incrementally.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: asynchronous and vectorized execution

From
Robert Haas
Date:
On Tue, May 10, 2016 at 7:57 PM, Andres Freund <andres@anarazel.de> wrote:
>> 1. asynchronous execution, by which I mean the ability of a node to
>> somehow say that it will generate a tuple eventually, but is not yet
>> ready, so that the executor can go run some other part of the plan
>> tree while it waits.  [...].  It is also a problem
>> for parallel query: in a parallel sequential scan, the next worker can
>> begin reading the next block even if the current block hasn't yet been
>> received from the OS.  Whether or not this will be efficient is a
>> research question, but it can be done.  However, imagine a parallel
>> scan of a btree index: we don't know what page to scan next until we
>> read the previous page and examine the next-pointer.  In the meantime,
>> any worker that arrives at that scan node has no choice but to block.
>> It would be better if the scan node could instead say "hey, thanks for
>> coming but I'm really not ready to be on-CPU just at the moment" and
>> potentially allow the worker to go work in some other part of the
>> query tree.  For that worker to actually find useful work to do
>> elsewhere, we'll probably need it to be the case either that the table
>> is partitioned or the original query will need to involve UNION ALL,
>> but those are not silly cases to worry about, particularly if we get
>> native partitioning in 9.7.
>
> I've to admit I'm not that convinced about the speedups in the !fdw
> case. There seems to be a lot easier avenues for performance
> improvements.

What I'm talking about is a query like this:

SELECT * FROM inheritance_tree_of_foreign_tables WHERE very_rarely_true;

What we do today is run the remote query on the first child table to
completion, then start it on the second child table, and so on.
Sending all the queries at once can bring a speed-up of a factor of N
to a query with N children, and it's completely independent of every
other speed-up that we might attempt.  This has been under discussion
for years on FDW-related threads as a huge problem that we need to fix
someday, and I really don't see how it's sane not to try.  The shape
of what that looks like is of course arguable, but saying the
optimization isn't valuable blows my mind.
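
(The low-level machinery for this already exists in libpq; the FDW just
doesn't use it that way today.  Roughly, instead of running each remote
scan to completion with PQexec, you fire them all and then drain them.
A sketch, with the connection and query arrays assumed to be set up
elsewhere:)

#include <libpq-fe.h>

/* Sketch: start the remote scans on every child foreign table at once,
 * then drain the results.  A real implementation would wait on the
 * sockets with select()/poll() instead of spinning. */
static void
run_remote_scans_async(PGconn **conns, const char **queries, int nconns)
{
    int         i;

    for (i = 0; i < nconns; i++)
        (void) PQsendQuery(conns[i], queries[i]);   /* returns immediately */

    for (i = 0; i < nconns; i++)
    {
        PGresult   *res;

        while (PQconsumeInput(conns[i]) && PQisBusy(conns[i]))
            ;                                       /* spin, for brevity */

        while ((res = PQgetResult(conns[i])) != NULL)
            PQclear(res);                           /* process rows here */
    }
}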

Whether you care about this case or not, this is also important for
parallel query.

> FWIW, I've even hacked something up for a bunch of simple queries, and
> the performance improvements were significant.  Besides it only being a
> weekend hack project, the big thing I got stuck on was considering how
> to exactly determine when to batch and not to batch.

Yeah.  I think we need a system for signalling nodes as to when they
will be run to completion.  But a Boolean is somehow unsatisfying;
LIMIT 1000000 is more like no LIMIT than it is like LIMIT 1.  I'm
tempted to add a numTuples field to every ExecutorState and give upper
nodes some way to set it, as a hint.

>> For asynchronous execution, I have gone so far as to mock up a bit of
>> what this might look like.  This shouldn't be taken very seriously at
>> this point, but I'm attaching a few very-much-WIP patches to show the
>> direction of my line of thinking.  Basically, I propose to have
>> ExecBlah (that is, ExecBitmapHeapScan, ExecAppend, etc.) return tuples
>> by putting them into a new PlanState member called "result", which is
>> just a Node * so that we can support multiple types of results,
>> instead of returning them.
>
> What different types of results are you envisioning?

TupleTableSlots and TupleTableVectors, mostly.  I think the stuff that
is currently going through MultiExecProcNode() could probably be
folded in as just another type of result.

>> Some care is required here because any
>> functions we execute as scan keys are run with the buffer locked, so
>> we had better not run anything very complicated.  But doing this for
>> simple things like integer equality operators seems like it could save
>> quite a few buffer lock/unlock cycles and some other executor overhead
>> as well.
>
> Hm. Do we really have to keep the page locked in the page-at-a-time
> mode? Shouldn't the pin suffice?

I think we need a lock to examine MVCC visibility information.  A pin
is enough to prevent a tuple from being removed, but not from having
its xmax and cmax overwritten at almost but not quite exactly the same
time.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: asynchronous and vectorized execution

From
Konstantin Knizhnik
Date:

On 11.05.2016 17:00, Robert Haas wrote:
> On Tue, May 10, 2016 at 3:42 PM, Konstantin Knizhnik
> <k.knizhnik@postgrespro.ru> wrote:
>> Doesn't this actually mean that we need to have normal job scheduler which
>> is given queue of jobs and having some pool of threads will be able to
>> orginize efficient execution of queries? Optimizer can build pipeline
>> (graph) of tasks, which corresponds to execution plan nodes, i.e. SeqScan,
>> Sort, ... Each task is splitted into several jobs which can be concurretly
>> scheduled by task dispatcher.  So you will not have blocked worker waiting
>> for something and all system resources will be utilized. Such approach with
>> dispatcher allows to implement quotas, priorities,... Also dispatches can
>> care about NUMA and cache optimizations which is especially critical on
>> modern architectures. One more reference:
>> http://db.in.tum.de/~leis/papers/morsels.pdf
> I read this as a proposal to redesign the entire optimizer and
> executor to use some new kind of plan.  That's not a project I'm
> willing to entertain; it is hard to imagine we could do it in a
> reasonable period of time without introducing bugs and performance
> regressions.  I think there is a great deal of performance benefit
> that we can get by changing things incrementally.
>
Yes, I agree with you that a complete rewrite of the optimizer is a huge
project with unpredictable influence on the performance of some queries.
Changing things incrementally is a good approach, but only if we are
moving in the right direction.
I am still not sure that the introduction of async operations is a step
in the right direction. Async ops significantly complicate code (since
you have to maintain state yourself). It will be bad if the
implementation of each node has to deal with async state itself in its
own manner.

My suggestion is to try to provide some generic mechanism for managing
state transitions and have some scheduler which controls this process.
It should not be the responsibility of a node implementation to organize
asynchronous/parallel execution. Instead, it should just produce a set
of jobs whose execution is controlled by the scheduler. The first
implementation of the scheduler can be quite simple, but later it can
become more clever: try to bind data to processors and do many other
optimizations.



-- 
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company




Re: asynchronous and vectorized execution

From
Robert Haas
Date:
On Tue, May 10, 2016 at 8:23 PM, Andres Freund <andres@anarazel.de> wrote:
>> c. Modify some nodes (perhaps start with nodeAgg.c) to allow them to
>> process a batch TupleTableSlot. This will require some tight loop to
>> aggregate the entire TupleTableSlot at once before returning.
>> d. Add function in execAmi.c which returns true or false depending on
>> if the node supports batch TupleTableSlots or not.
>> e. At executor startup determine if the entire plan tree supports
>> batch TupleTableSlots, if so enable batch scan mode.
>
> It doesn't really need to be the entire tree. Even if you have a subtree
> (say a parametrized index nested loop join) which doesn't support batch
> mode, you'll likely still see performance benefits by building a batch
> one layer above the non-batch-supporting node.

+1.

I've also wondered about building a new executor node that is sort of
a combination of Nested Loop and Hash Join, but capable of performing
multiple joins in a single operation. (Merge Join is different,
because it's actually matching up the two sides, not just doing
probing once per outer tuple.) So the plan tree would look something
like this:

Multiway Join
-> Seq Scan on driving_table
-> Index Scan on something
-> Index Scan on something_else
-> Hash
   -> Seq Scan on other_thing
-> Hash
   -> Seq Scan on other_thing_2
-> Index Scan on another_one

With the current structure, every level of the plan tree has its own
TupleTableSlot and we have to project into each new slot.  Every level
has to go through ExecProcNode.  So it seems to me that this sort of
structure might save quite a few cycles on deep join nests.  I haven't
tried it, though.

With batching, things get even better for this sort of thing.
Assuming the joins are all basically semi-joins, either because they
were written that way or because they are probing unique indexes or
whatever, you can fetch a batch of tuples from the driving table, do
the first join for each tuple to create a matching batch of tuples,
and repeat for each join step.  Then at the end you project.
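
In rough pseudo-C, the batched form of that loop would be as below;
every name here is invented, it is only meant to spell out the shape.

/* Sketch only; all names invented. */
batch = FetchBatch(driving_scan, BATCH_SIZE);
for (j = 0; j < njoins; j++)
    batch = ProbeBatch(&joins[j], batch);   /* keep only rows with a match */
ProjectBatch(targetlist, batch);            /* project once, at the very end */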

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: asynchronous and vectorized execution

From
Konstantin Knizhnik
Date:

On 10.05.2016 20:26, Robert Haas wrote:
>> At this moment (February) them have implemented translation of only few
>> PostgreSQL operators used by ExecQuals  and do not support aggregates.
>> Them get about 2 times increase of speed at synthetic queries and 25%
>> increase at TPC-H Q1 (for Q1 most critical is generation of native code for
>> aggregates, because ExecQual itself takes only 6% of time for this query).
>> Actually these 25% for Q1 were achieved not by using dynamic code
>> generation, but switching from PULL to PUSH model in executor.
>> It seems to be yet another interesting PostgreSQL executor transformation.
>> As far as I know, them are going to publish result of their work to open
>> source...
> Interesting.  You may notice that in "asynchronous mode" my prototype
> works using a push model of sorts.  Maybe that should be taken
> further.
>
Latest information from the ISP RAS guys: they have made good progress
since February. They have rewritten most of the methods of Scan,
Aggregate and Join using the LLVM API, and have also implemented
automatic translation of PostgreSQL backend functions to the LLVM API.
As a result, the time of the TPC-H Q1 query is reduced four times.

-- 
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company




Re: asynchronous and vectorized execution

From
Robert Haas
Date:
On Wed, May 11, 2016 at 10:17 AM, Konstantin Knizhnik
<k.knizhnik@postgrespro.ru> wrote:
> Yes, I agree with you that complete rewriting of optimizer is huge project
> with unpredictable influence on performance of some queries.
> Changing things incrementally is good approach, but only if we are moving in
> right direction.
> I still not sure that introduction of async. operations is step in right
> direction. Async.ops are used to significantly complicate code (since you
> have to maintain state yourself). It will be bad if implementation of each
> node has to deal with async state itself in its own manner.

I don't really think so.  The design I've proposed makes adding
asynchronous capability to a node pretty easy, with only minor
changes.

> My suggestion is to try to provide some generic mechanism for managing state
> transition and have some scheduler which controls this process. It should
> not be responsibility of node implementation to organize
> asynchronous/parallel execution. Instead of this it should just produce set
> of jobs which execution should  be controlled by scheduler. First
> implementation of scheduler can be quite simple. But later in can become
> more clever: try to bind data to processors and do many other optimizations.

Whereas this would require a massive rewrite.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: asynchronous and vectorized execution

From
Robert Haas
Date:
On Tue, May 10, 2016 at 8:50 PM, Andres Freund <andres@anarazel.de> wrote:
> That seems to suggest that we need to restructure how we get to calling
> fmgr functions, before worrying about the actual fmgr call.

Any ideas on how to do that?  ExecMakeFunctionResultNoSets() isn't
really doing a heck of a lot.  Changing FuncExprState to use an array
rather than a linked list to store its arguments might help some.   We
could also consider having an optimized path that skips the fn_strict
stuff if we can somehow deduce that no NULLs can occur in this
context, but that's a lot of work and new infrastructure.  I feel like
maybe there's something higher-level we could do that would help more,
but I don't know what it is.
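
For reference, the strict-argument handling in question is essentially
this per-call loop (paraphrased from memory, not verbatim from
execQual.c):

if (fcinfo->flinfo->fn_strict)
{
    for (i = 0; i < fcinfo->nargs; i++)
    {
        if (fcinfo->argnull[i])
        {
            *isNull = true;
            return (Datum) 0;
        }
    }
}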

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: asynchronous and vectorized execution

From
Andres Freund
Date:
On 2016-05-11 10:12:26 -0400, Robert Haas wrote:
> > I've to admit I'm not that convinced about the speedups in the !fdw
> > case. There seems to be a lot easier avenues for performance
> > improvements.
> 
> What I'm talking about is a query like this:
> 
> SELECT * FROM inheritance_tree_of_foreign_tables WHERE very_rarely_true;

Note that I said "!fdw case".


> > FWIW, I've even hacked something up for a bunch of simple queries, and
> > the performance improvements were significant.  Besides it only being a
> > weekend hack project, the big thing I got stuck on was considering how
> > to exactly determine when to batch and not to batch.
> 
> Yeah.  I think we need a system for signalling nodes as to when they
> will be run to completion.  But a Boolean is somehow unsatisfying;
> LIMIT 1000000 is more like no LIMIT than it is like LIMIT 1.  I'm
> tempted to add a numTuples field to every ExecutorState and give upper
> nodes some way to set it, as a hint.

I was wondering whether we should hand down TupleVectorStates to lower
nodes, and their size determines the max batch size...

> >> Some care is required here because any
> >> functions we execute as scan keys are run with the buffer locked, so
> >> we had better not run anything very complicated.  But doing this for
> >> simple things like integer equality operators seems like it could save
> >> quite a few buffer lock/unlock cycles and some other executor overhead
> >> as well.
> >
> > Hm. Do we really have to keep the page locked in the page-at-a-time
> > mode? Shouldn't the pin suffice?
> 
> I think we need a lock to examine MVCC visibility information.  A pin
> is enough to prevent a tuple from being removed, but not from having
> its xmax and cmax overwritten at almost but not quite exactly the same
> time.

We already batch visibility lookups in page-at-a-time
mode. Cf. heapgetpage() / scan->rs_vistuples. So we can evaluate quals
after releasing the lock, but before the pin is released, without that
much effort.  IIRC that isn't used for index lookups, but that's
probably a good idea.
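
(For anyone following along, the batching referred to is the array of
already-checked visible offsets that heapgetpage() fills in; roughly
these fields of HeapScanDescData in access/relscan.h, quoted from
memory:)

int          rs_cindex;      /* current tuple's index in vistuples */
int          rs_ntuples;     /* number of visible tuples on page */
OffsetNumber rs_vistuples[MaxHeapTuplesPerPage];   /* their offsets */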

Greetings,

Andres Freund



Re: asynchronous and vectorized execution

From
Andres Freund
Date:
On 2016-05-11 10:32:20 -0400, Robert Haas wrote:
> On Tue, May 10, 2016 at 8:50 PM, Andres Freund <andres@anarazel.de> wrote:
> > That seems to suggest that we need to restructure how we get to calling
> > fmgr functions, before worrying about the actual fmgr call.
> 
> Any ideas on how to do that?  ExecMakeFunctionResultNoSets() isn't
> really doing a heck of a lot.  Changing FuncExprState to use an array
> rather than a linked list to store its arguments might help some.   We
> could also consider having an optimized path that skips the fn_strict
> stuff if we can somehow deduce that no NULLs can occur in this
> context, but that's a lot of work and new infrastructure.  I feel like
> maybe there's something higher-level we could do that would help more,
> but I don't know what it is.

I think it's not just ExecMakeFunctionResultNoSets, it's the whole
call-stack which needs to be optimized together.

E.g. look at a few performance metrics for a simple seqscan query with a
bunch of ORed equality constraints:
SELECT count(*) FROM pgbench_accounts WHERE abalance = -1 OR abalance = -2 OR abalance = -3 OR abalance = -4 OR
abalance = -5 OR abalance = -6 OR abalance = -7 OR abalance = -8 OR abalance = -9 OR abalance = -10;
 

perf record -g -p 27286 -F 5000 \
  -e cycles:ppp,branch-misses,L1-icache-load-misses,iTLB-load-misses,L1-dcache-load-misses,dTLB-load-misses,LLC-load-misses \
  sleep 3
 
6K cycles:ppp
6K branch-misses
1K L1-icache-load-misses
472 iTLB-load-misses
5K L1-dcache-load-misses
6K dTLB-load-misses
6K LLC-load-misses

You can see that a number of events sample at a high rate, especially
when you take the cycle samples into account.

cycles:
+   32.35%  postgres  postgres           [.] ExecMakeFunctionResultNoSets
+   14.51%  postgres  postgres           [.] slot_getattr
+    5.50%  postgres  postgres           [.] ExecEvalOr
+    5.22%  postgres  postgres           [.] check_stack_depth

branch-misses:
+   73.77%  postgres  postgres           [.] ExecQual
+   17.83%  postgres  postgres           [.] ExecEvalOr
+    1.49%  postgres  postgres           [.] heap_getnext

L1-icache-load-misses:
+    4.71%  postgres  [kernel.kallsyms]  [k] update_curr
+    4.37%  postgres  postgres           [.] hash_search_with_hash_value
+    3.91%  postgres  postgres           [.] heap_getnext
+    3.81%  postgres  [kernel.kallsyms]  [k] task_tick_fair

iTLB-load-misses:
+   27.57%  postgres  postgres           [.] LWLockAcquire
+   18.32%  postgres  postgres           [.] hash_search_with_hash_value
+    7.09%  postgres  postgres           [.] ExecMakeFunctionResultNoSets
+    3.06%  postgres  postgres           [.] ExecEvalConst

L1-dcache-load-misses:
+   20.35%  postgres  postgres           [.] ExecMakeFunctionResultNoSets
+   12.31%  postgres  postgres           [.] check_stack_depth
+    8.84%  postgres  postgres           [.] heap_getnext
+    8.00%  postgres  postgres           [.] slot_deform_tuple
+    7.15%  postgres  postgres           [.] HeapTupleSatisfiesMVCC

dTLB-load-misses:
+   50.13%  postgres  postgres           [.] ExecQual
+   41.36%  postgres  postgres           [.] ExecEvalOr
+    2.96%  postgres  postgres           [.] hash_search_with_hash_value
+    1.30%  postgres  postgres           [.] PinBuffer.isra.3
+    1.19%  postgres  postgres           [.] heap_page_prune_op

LLC-load-misses:
+   24.25%  postgres  postgres           [.] slot_deform_tuple
+   17.45%  postgres  postgres           [.] CheckForSerializableConflictOut
+   10.52%  postgres  postgres           [.] heapgetpage
+    9.55%  postgres  postgres           [.] HeapTupleSatisfiesMVCC
+    7.52%  postgres  postgres           [.] ExecMakeFunctionResultNoSets


For this workload, we expect a lot of LLC-load-misses as the workload is a
lot bigger than memory, and it makes sense that they're in
slot_deform_tuple(), heapgetpage(), HeapTupleSatisfiesMVCC() (but uh,
CheckForSerializableConflictOut?).  One avenue to optimize is to make
those accesses easier to predict/prefetch, which at the moment they're
likely not.

But leaving that aside, we can see that a lot of the cost is distributed
over ExecQual, ExecEvalOr and ExecMakeFunctionResultNoSets - all of which
make liberal use of linked lists.  I suspect that by simplifying these
functions / data structures *AND* by calling them over a batch of tuples
instead of one-by-one, we'd limit the time spent in them considerably.
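
To illustrate the batching half of that (nothing like this exists; ExecQual
here is the current three-argument form):

static int
ExecQualBatchSketch(List *qual, ExprContext *econtext,
                    TupleTableSlot **slots, int ntuples, bool *keep)
{
    int         i;
    int         nkept = 0;

    for (i = 0; i < ntuples; i++)
    {
        /* same expression state reused tuple after tuple, staying cache-hot */
        econtext->ecxt_scantuple = slots[i];
        keep[i] = ExecQual(qual, econtext, false);
        if (keep[i])
            nkept++;
    }
    return nkept;
}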

Greetings,

Andres Freund



Re: asynchronous and vectorized execution

From
Robert Haas
Date:
On Wed, May 11, 2016 at 11:49 AM, Andres Freund <andres@anarazel.de> wrote:
> On 2016-05-11 10:12:26 -0400, Robert Haas wrote:
>> > I've to admit I'm not that convinced about the speedups in the !fdw
>> > case. There seems to be a lot easier avenues for performance
>> > improvements.
>>
>> What I'm talking about is a query like this:
>>
>> SELECT * FROM inheritance_tree_of_foreign_tables WHERE very_rarely_true;
>
> Note that I said "!fdw case".

Oh, wow, I totally missed that exclamation point.

>> > FWIW, I've even hacked something up for a bunch of simple queries, and
>> > the performance improvements were significant.  Besides it only being a
>> > weekend hack project, the big thing I got stuck on was considering how
>> > to exactly determine when to batch and not to batch.
>>
>> Yeah.  I think we need a system for signalling nodes as to when they
>> will be run to completion.  But a Boolean is somehow unsatisfying;
>> LIMIT 1000000 is more like no LIMIT than it is like LIMIT 1.  I'm
>> tempted to add a numTuples field to every ExecutorState and give upper
>> nodes some way to set it, as a hint.
>
> I was wondering whether we should hand down TupleVectorStates to lower
> nodes, and their size determines the max batch size...

There's some appeal to that, but it seems complicated to make work.

>> >> Some care is required here because any
>> >> functions we execute as scan keys are run with the buffer locked, so
>> >> we had better not run anything very complicated.  But doing this for
>> >> simple things like integer equality operators seems like it could save
>> >> quite a few buffer lock/unlock cycles and some other executor overhead
>> >> as well.
>> >
>> > Hm. Do we really have to keep the page locked in the page-at-a-time
>> > mode? Shouldn't the pin suffice?
>>
>> I think we need a lock to examine MVCC visibility information.  A pin
>> is enough to prevent a tuple from being removed, but not from having
>> its xmax and cmax overwritten at almost but not quite exactly the same
>> time.
>
> We already batch visibility lookups in page-at-a-time
> mode. Cf. heapgetpage() / scan->rs_vistuples. So we can evaluate quals
> after releasing the lock, but before the pin is released, without that
> much effort.  IIRC that isn't used for index lookups, but that's
> probably a good idea.

The trouble with that is that if you fail the qual, you have to relock
the page.  Which kinda sucks, if the qual is really simple.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: asynchronous and vectorized execution

From
Andres Freund
Date:
On 2016-05-11 12:27:55 -0400, Robert Haas wrote:
> On Wed, May 11, 2016 at 11:49 AM, Andres Freund <andres@anarazel.de> wrote:
> > On 2016-05-11 10:12:26 -0400, Robert Haas wrote:
> >> > Hm. Do we really have to keep the page locked in the page-at-a-time
> >> > mode? Shouldn't the pin suffice?
> >>
> >> I think we need a lock to examine MVCC visibility information.  A pin
> >> is enough to prevent a tuple from being removed, but not from having
> >> its xmax and cmax overwritten at almost but not quite exactly the same
> >> time.
> >
> > We already batch visibility lookups in page-at-a-time
> > mode. Cf. heapgetpage() / scan->rs_vistuples. So we can evaluate quals
> > after releasing the lock, but before the pin is released, without that
> > much effort.  IIRC that isn't used for index lookups, but that's
> > probably a good idea.
> 
> The trouble with that is that if you fail the qual, you have to relock
> the page.  Which kinda sucks, if the qual is really simple.

Hm? I'm missing something here? We currently do the visibility checks in
bulk for the whole page. After that we release the page lock. What
prevents us from executing the quals directly after that? And why would
you need to relock the page?



Re: asynchronous and vectorized execution

From
Robert Haas
Date:
On Wed, May 11, 2016 at 12:30 PM, Andres Freund <andres@anarazel.de> wrote:
> On 2016-05-11 12:27:55 -0400, Robert Haas wrote:
>> On Wed, May 11, 2016 at 11:49 AM, Andres Freund <andres@anarazel.de> wrote:
>> > On 2016-05-11 10:12:26 -0400, Robert Haas wrote:
>> >> > Hm. Do we really have to keep the page locked in the page-at-a-time
>> >> > mode? Shouldn't the pin suffice?
>> >>
>> >> I think we need a lock to examine MVCC visibility information.  A pin
>> >> is enough to prevent a tuple from being removed, but not from having
>> >> its xmax and cmax overwritten at almost but not quite exactly the same
>> >> time.
>> >
>> > We already batch visibility lookups in page-at-a-time
>> > mode. Cf. heapgetpage() / scan->rs_vistuples. So we can evaluate quals
>> > after releasing the lock, but before the pin is released, without that
>> > much effort.  IIRC that isn't used for index lookups, but that's
>> > probably a good idea.
>>
>> The trouble with that is that if you fail the qual, you have to relock
>> the page.  Which kinda sucks, if the qual is really simple.
>
> Hm? I'm missing something here? We currently do the visibility checks in
> bulk for the whole page. After that we release the page lock. What
> prevents us from executing the quals directly after that? And why would
> you need to relock the page?

Oh, yeah, in page-at-a-time mode we can release the lock first.  I was
thinking about what to do in tuple-at-a-time mode (i.e. when the page is
not all-visible).

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Just-in-time compiling things (was: asynchronous and vectorized execution)

From
Andreas Seltenreich
Date:
Konstantin Knizhnik writes:

> Latest information from the ISP RAS guys: they have made good progress
> since February: they have rewritten most of the methods of Scan, Aggregate
> and Join to the LLVM API.

Is their work available somewhere?  I'm experimenting in that area as
well, although I'm using libFirm instead of LLVM.  I wonder what their
motivation to rewrite backend code in LLVM IR was, since I am following
the approach of keeping the IR around when compiling the vanilla
postgres C code, possibly inlining it during JIT and then doing
optimizations on this IR.  That way the logic doesn't have to be
duplicated.

regards
Andreas



Re: Just-in-time compiling things

From
Konstantin Knizhnik
Date:
On 05/14/2016 12:10 PM, Andreas Seltenreich wrote:
> Konstantin Knizhnik writes:
>
>> Latest information from the ISP RAS guys: they have made good progress
>> since February: they have rewritten most of the methods of Scan, Aggregate
>> and Join to the LLVM API.
> Is their work available somewhere?  I'm experimenting in that area as
> well, although I'm using libFirm instead of LLVM.  I wonder what their
> motivation to rewrite backend code in LLVM IR was, since I am following
> the approach of keeping the IR around when compiling the vanilla
> postgres C code, possibly inlining it during JIT and then doing
> optimizations on this IR.  That way the logic doesn't have to be
> duplicated.

The work is not yet completed, but it will definitely be put into open source eventually.
I am going to talk a little bit about this project at PGCon in Ottawa during the lightning
talks, although I do not know the details of the project myself.

The main difference of their approach compared with Vitesse DB is that they implement a way
of automatically converting PostgreSQL operators to LLVM IR.

So instead of rewriting ~2000 operators manually (a lot of work and errors), they implemented
a converter which transforms the code of these operators into ... C++ code producing LLVM IR.
So they only need to rewrite the plan nodes manually. They have already implemented most of
the nodes (SeqScan, Sort, HashJoin, ...), which allows all TPC-H queries to be executed.
Results will be published soon. The largest advantage is definitely at Q1 - about 4 times.
That is worse than Vitesse DB (8 times) and than manually written operators (7 times). The
most probable cause of this performance penalty is overflow checking: in manually written
LLVM code it can be done more efficiently, using the corresponding assembler instruction,
than in code automatically converted from standard C.

But the ISP RAS guys are going to fix this problem and improve the automatic conversion
quality.

I include in CC members of the ISP RAS team - you can ask them questions directly.




> regards
> Andreas


-- 
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company




On Sat, May 14, 2016 at 12:10 PM, Andreas Seltenreich
<seltenreich@gmx.de> wrote:
> Konstantin Knizhnik writes:
>
>> Latest information from the ISP RAS guys: they have made good progress
>> since February: they have rewritten most of the methods of Scan, Aggregate
>> and Join to the LLVM API.
>
> Is their work available somewhere?  I'm experimenting in that area as
> well, although I'm using libFirm instead of LLVM.  I wonder what their
> motivation to rewrite backend code in LLVM IR was, since I am following
> the approach of keeping the IR around when compiling the vanilla
> postgres C code, possibly inlining it during JIT and then doing
> optimizations on this IR.  That way the logic doesn't have to be
> duplicated.

I have discussed the availability of their work and the consensus was that
eventually their code will be open source, but not right now, since it is
not ready to be published. I'll meet their management (after PGCon) and
discuss how we can work together.

>
> regards
> Andreas
>
>



Re: asynchronous and vectorized execution

From
Amit Khandekar
Date:
We may also want to consider handling abstract events such as
"tuples-are-available-at-plan-node-X".

One benefit is that we can combine this with batch processing. For
example, in the case of an Append node containing foreign scans, its
parent node may not want to process the Append node's result until
Append is ready with at least 1000 rows. In that case, the Append node
needs to raise an "n-tuples-are-ready" event; we cannot just rely on
fd-ready events. An fd-ready event will wake up the foreign scan, but it
may not eventually cause its parent Append node to in turn wake up its
parent.

The other benefit (which I am not sure how significant it is) is the
"at-plan-node-X" part of the event. For example, for an Append node
having 10 foreign scans, when a foreign scan wakes up and becomes ready
with tuple(s), its parent node (i.e. Append) will be executed. But it
would speed things up if it knew which of its foreign scan nodes are
ready. From Robert's prototype, I can see that it can find that out by
checking the result_ready field of each foreign scan plan state. But if
it knows from the event that node X is the one that is ready, it can
directly take tuples from there. Another thing is, we may want to give
the Append node a chance to know all the nodes that are ready, instead
of just one node.

How we can do this event abstraction is the other question. We can have
one latch for each event, and each node would raise its own event by
setting the corresponding latch. But I am not sure about latches within
a single process, as opposed to one process waking up another process.
Or else, some internal event structures need to be present (in estate?),
which ExecProcNode would then use when it does the event looping to wake
up (i.e. execute) the required nodes.

Also, the WaitEvent.user_data field can have some more info besides the
plan state. It can have its parent PlanState stored, so that we don't
have to have a parent field in the plan state. It can also have some
more data such as "n-tuples-available".
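
Just to illustrate the kind of thing I mean (no such structure exists; the
fields simply mirror the points above):

typedef struct ExecNodeEvent
{
    PlanState  *node;               /* plan node X that became ready */
    PlanState  *parent;             /* parent, instead of a field in PlanState */
    int         ntuples_available;  /* the "n-tuples-available" payload */
} ExecNodeEvent;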


On 9 May 2016 at 23:03, Robert Haas <robertmhaas@gmail.com> wrote:
Hi,

I realize that we haven't gotten 9.6beta1 out the door yet, but I
think we can't really wait much longer to start having at least some
discussion of 9.7 topics, so I'm going to go ahead and put this one
out there.  I believe there are other people thinking about these
topics as well, including Andres Freund, Kyotaro Horiguchi, and
probably some folks at 2ndQuadrant (but I don't know exactly who).  To
make a long story short, I think there are several different areas
where we should consider major upgrades to our executor.  It's too
slow and it doesn't do everything we want it to do.  The main things
on my mind are:

1. asynchronous execution, by which I mean the ability of a node to
somehow say that it will generate a tuple eventually, but is not yet
ready, so that the executor can go run some other part of the plan
tree while it waits.  This case most obviously arises for foreign
tables, where it makes little sense to block on I/O if some other part
of the query tree could benefit from the CPU; consider SELECT * FROM
lt WHERE qual UNION SELECT * FROM ft WHERE qual.  It is also a problem
for parallel query: in a parallel sequential scan, the next worker can
begin reading the next block even if the current block hasn't yet been
received from the OS.  Whether or not this will be efficient is a
research question, but it can be done.  However, imagine a parallel
scan of a btree index: we don't know what page to scan next until we
read the previous page and examine the next-pointer.  In the meantime,
any worker that arrives at that scan node has no choice but to block.
It would be better if the scan node could instead say "hey, thanks for
coming but I'm really not ready to be on-CPU just at the moment" and
potentially allow the worker to go work in some other part of the
query tree.  For that worker to actually find useful work to do
elsewhere, we'll probably need it to be the case either that the table
is partitioned or the original query will need to involve UNION ALL,
but those are not silly cases to worry about, particularly if we get
native partitioning in 9.7.

2. vectorized execution, by which I mean the ability of a node to
return tuples in batches rather than one by one.  Andres has opined
more than once that repeated trips through ExecProcNode defeat the
ability of the CPU to do branch prediction correctly, slowing the
whole system down, and that they also result in poor CPU cache
behavior, since we jump all over the place executing a little bit of
code from each node before moving onto the next rather than running
one bit of code first, and then another later.  I think that's
probably right.   For example, consider a 5-table join where all of
the joins are implemented as hash tables.  If this query plan is going
to be run to completion, it would make much more sense to fetch, say,
100 tuples from the driving scan and then probe for all of those in
the first hash table, and then probe for all of those in the second
hash table, and so on.  What we do instead is fetch one tuple and
probe for it in all 5 hash tables, and then repeat.  If one of those
hash tables would fit in the CPU cache but all five together will not,
that seems likely to be a lot worse.   But even just ignoring the CPU
cache aspect of it for a minute, suppose you want to write a loop to
perform a hash join.  The inner loop fetches the next tuple from the
probe table and does a hash lookup.  Right now, fetching the next
tuple from the probe table means calling a function which in turn
calls another function which probably calls another function which
probably calls another function and now about 4 layers down we
actually get the next tuple.  If the scan returned a batch of tuples
to the hash join, fetching the next tuple from the batch would
probably be 0 or 1 function calls rather than ... more.  Admittedly,
you've got to consider the cost of marshaling the batches but I'm
optimistic that there are cycles to be squeezed out here.  We might
also want to consider storing batches of tuples in a column-optimized
rather than row-optimized format so that iterating through one or two
attributes across every tuple in the batch touches the minimal number
of cache lines.

Obviously, both of these are big projects that could touch a large
amount of executor code, and there may be other ideas, in addition to
these, which some of you may be thinking about that could also touch a
large amount of executor code.  It would be nice to agree on a way
forward that minimizes code churn and maximizes everyone's attempt to
contribute without conflicting with each other.  Also, it seems
desirable to enable, as far as possible, incremental development - in
particular, it seems to me that it would be good to pick a design that
doesn't require massive changes to every node all at once.  A single
patch that adds some capability to every node in the executor in one
fell swoop is going to be too large to review effectively.

My proposal for how to do this is to make ExecProcNode function as a
backward-compatibility wrapper.  For asynchronous execution, a node
might return a not-ready-yet indication, but if that node is called
via ExecProcNode, it means the caller isn't prepared to receive such
an indication, so ExecProcNode will just wait for the node to become
ready and then return the tuple.  Similarly, for vectorized execution,
a node might return a bunch of tuples all at once.  ExecProcNode will
extract the first one and return it to the caller, and subsequent
calls to ExecProcNode will iterate through the rest of the batch, only
calling the underlying node-specific function when the batch is
exhausted.  In this way, code that doesn't know about the new stuff
can continue to work pretty much as it does today.  Also, and I think
this is important, nodes don't need the permission of their parent
node to use these new capabilities.  They can use them whenever they
wish, without worrying about whether the upper node is prepared to
deal with it.  If not, ExecProcNode will paper over the problem.  This
seems to me to be a good way to keep the code simple.

For asynchronous execution, I have gone so far as to mock up a bit of
what this might look like.  This shouldn't be taken very seriously at
this point, but I'm attaching a few very-much-WIP patches to show the
direction of my line of thinking.  Basically, I propose to have
ExecBlah (that is, ExecBitmapHeapScan, ExecAppend, etc.) return tuples
by putting them into a new PlanState member called "result", which is
just a Node * so that we can support multiple types of results,
instead of returning them.  There is also a result_ready boolean, so
that a node can return without setting this Boolean to engage
asynchronous behavior.  This triggers an "event loop", which
repeatedly waits for FDs chosen by waiting nodes to become readable
and/or writeable and then gives the node a chance to react.
Eventually, the waiting node will stop waiting and have a result
ready, at which point the event loop will give the parent of that node
a chance to run.  If that node consequently becomes ready, then its
parent gets a chance to run.  Eventually (we hope), the node for which
we're waiting becomes ready, and we can then read a result tuple.
With some more work, this seems like it can handle the FDW case, but I
haven't worked out how to make it handle the related parallel query
case.  What we want there is to wait not for the readiness of an FD
but rather for some other process involved in the parallel query to
reach a point where it can welcome assistance executing that node.  I
don't know exactly what the signaling for that should look like yet -
maybe setting the process latch or something.

By the way, one smaller executor project that I think we should also
look at has to do with this comment in nodeSeqScan.c:

static bool
SeqRecheck(SeqScanState *node, TupleTableSlot *slot)
{
        /*
         * Note that unlike IndexScan, SeqScan never use keys in heap_beginscan
         * (and this is very bad) - so, here we do not check are keys ok or not.
         */
        return true;
}

Some quick prototyping by my colleague Dilip Kumar suggests that, in
fact, there are cases where pushing down keys into heap_beginscan()
could be significantly faster.  Some care is required here because any
functions we execute as scan keys are run with the buffer locked, so
we had better not run anything very complicated.  But doing this for
simple things like integer equality operators seems like it could save
quite a few buffer lock/unlock cycles and some other executor overhead
as well.

Thoughts, ideas, suggestions, etc. very welcome.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company




Re: asynchronous and vectorized execution

From
Robert Haas
Date:
On Wed, Jun 29, 2016 at 11:00 AM, Amit Khandekar <amitdkhan.pg@gmail.com> wrote:
> We may also want to consider handling abstract events such as
> "tuples-are-available-at-plan-node-X".
>
> One benefit is : we can combine this with batch processing. For e.g. in case
> of an Append node containing foreign scans, its parent node may not want to
> process the Append node result until Append is ready with at least 1000
> rows. In that case, Append node needs to raise an event
> "n-tuples-are-ready"; we cannot just rely on fd-ready events. fd-ready event
> will wake up the foreign scan, but it may not eventually cause its parent
> Append node to in turn wake up it's parent.

Right, I agree.  I think this case only arises in parallel query.  In
serial execution, there's not really any way for a plan node to just
become ready other than an FD or latch event or the parent becoming
ready.  But in parallel query it can happen, of course, because some
other backend can do work that makes that node ready to produce
tuples.

It's not necessarily the case that we have to deal with this in the
initial patches for this feature, because the most obvious win for
this sort of thing is when we have an Append of ForeignScan plans.
Sure, parallel query has interesting cases, too, but a prototype that
just handles Append over a bunch of postgres_fdw ForeignScans would be
pretty cool.  I suggest that we make that the initial goal here.

> How we can do this event abstraction is the other question. We can have one
> latch for each of the event, and each node would raise its own event by
> setting the corresponding latch. But I am not sure about latches within a
> single process as against one process waking up another process. Or else,
> some internal event structures needs to be present (in estate ?), which then
> ExecProcNode would use when it does the event looping to wake up (i.e.
> execute) required nodes.

I think adding more latches would be a bad idea.  What I think we
should do instead is add two additional data structures to dynamic
shared memory:

1. An array of PGPROC * pointers for all of the workers.  Processes
add their PGPROC * to this array as they start up.  Then, parallel.h
can expose new API ParallelWorkerSetLatchesForGroup(void).  In the
leader, this sets the latch for every worker process for every
parallel context with which the leader is associated; in a worker, it
sets the latch for other processes in the parallel group, and the
leader also.

2. An array of executor nodes where one process might do something
that allows other processes to make progress on that node.  This would
be set up somehow by execParallel.c, which would need to somehow
figure out which plan nodes want to be included in the list.  When an
executor node does something that might unblock other workers, it
calls ParallelWorkerSetLatchesForGroup() and the async stuff then
tries calling all of the nodes in this array again to see if any of
them now think that they've got tuples to return (or just to let them
do additional work without returning tuples).
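
To sketch the shape of this (none of it exists yet; the shared structure and
the variable names are only illustrative):

typedef struct ParallelGroupProcs
{
    int         nprocs;                         /* lives in dynamic shared memory */
    PGPROC     *procs[FLEXIBLE_ARRAY_MEMBER];   /* each process adds itself here */
} ParallelGroupProcs;

static ParallelGroupProcs *MyGroupProcs;        /* attached at startup */

void
ParallelWorkerSetLatchesForGroup(void)
{
    int         i;

    for (i = 0; i < MyGroupProcs->nprocs; i++)
    {
        PGPROC     *proc = MyGroupProcs->procs[i];

        if (proc != NULL && proc != MyProc)
            SetLatch(&proc->procLatch);         /* poke everyone else in the group */
    }
}

Storing the PGPROC pointers works because PGPROC itself lives in shared
memory, so the pointers are meaningful in every backend.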

> Also, the WaitEvent.user_data field can have some more info besides the plan
> state. It can have its parent PlanState stored, so that we don't have to
> have parent field in plan state. It also can have some more data such as
> "n-tuples-available".

I don't think that works, because execution may need to flow
arbitrarily far up the tree.  Just knowing the immediate parent isn't
good enough.  If it generates a tuple, then you have to in turn call
its parent, and if that one then produces a tuple, you have to continue
on even further up the tree.  I think it's going to be very awkward to
make this work without those parent pointers.

BTW, we also need to benchmark those changes to add the parent
pointers and change the return conventions and see if they have any
measurable impact on performance.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: asynchronous and vectorized execution

From
Kyotaro HORIGUCHI
Date:
Hello,

At Tue, 5 Jul 2016 11:39:41 -0400, Robert Haas <robertmhaas@gmail.com> wrote in
<CA+TgmobnQ6ZpsubttBYC=pSLQ6d=0GuSgBsUFoaARMrie_75BA@mail.gmail.com>
> On Wed, Jun 29, 2016 at 11:00 AM, Amit Khandekar <amitdkhan.pg@gmail.com> wrote:
> > We may also want to consider handling abstract events such as
> > "tuples-are-available-at-plan-node-X".
> >
> > One benefit is : we can combine this with batch processing. For e.g. in case
> > of an Append node containing foreign scans, its parent node may not want to
> > process the Append node result until Append is ready with at least 1000
> > rows. In that case, Append node needs to raise an event
> > "n-tuples-are-ready"; we cannot just rely on fd-ready events. fd-ready event
> > will wake up the foreign scan, but it may not eventually cause its parent
> > Append node to in turn wake up it's parent.
> 
> Right, I agree.  I think this case only arises in parallel query.  In
> serial execution, there's not really any way for a plan node to just
> become ready other than an FD or latch event or the parent becoming
> ready.  But in parallel query it can happen, of course, because some
> other backend can do work that makes that node ready to produce
> tuples.
> 
> It's not necessarily the case that we have to deal with this in the
> initial patches for this feature, because the most obvious win for
> this sort of thing is when we have an Append of ForeignScan plans.
> Sure, parallel query has interesting cases, too, but a prototype that
> just handles Append over a bunch of postgres_fdw ForeignScans would be
> pretty cool.  I suggest that we make that the initial goal here.

This seems to be a good opportunity to show this patch. The
attched patch set does async execution of foreignscan
(postgres_fdw) on the Robert's first infrastructure, with some
modification.

ExecAsyncWaitForNode can get into an inifite-waiting by recursive
calls of ExecAsyncWaitForNode caused by ExecProcNode called from
async-unaware nodes. Such recursive calls cause a wait on
already-ready nodes.

I solved that in the patch set by allocating a separate
async-execution context for every async-execution subtrees, which
is made by ExecProcNode, instead of one async-exec context for
the whole execution tree. This works fine but the way switching
contexts seems ugly.  This may also be solved by make
ExecAsyncWaitForNode return when no node to wait even if the
waiting node is not ready. This might keep the async-exec context
(state) simpler so I'll try this.


> > How we can do this event abstraction is the other question. We can have one
> > latch for each of the event, and each node would raise its own event by
> > setting the corresponding latch. But I am not sure about latches within a
> > single process as against one process waking up another process. Or else,
> > some internal event structures needs to be present (in estate ?), which then
> > ExecProcNode would use when it does the event looping to wake up (i.e.
> > execute) required nodes.
> 
> I think adding more latches would be a bad idea.  What I think we
> should do instead is add two additional data structures to dynamic
> shared memory:
> 
> 1. An array of PGPROC * pointers for all of the workers.  Processes
> add their PGPROC * to this array as they start up.  Then, parallel.h
> can expose new API ParallelWorkerSetLatchesForGroup(void).  In the
> leader, this sets the latch for every worker process for every
> parallel context with which the leader is associated; in a worker, it
> sets the latch for other processes in the parallel group, and the
> leader also.
> 
> 2. An array of executor nodes where one process might do something
> that allows other processes to make progress on that node.  This would
> be set up somehow by execParallel.c, which would need to somehow
> figure out which plan nodes want to be included in the list.  When an
> executor node does something that might unblock other workers, it
> calls ParallelWorkerSetLatchesForGroup() and the async stuff then
> tries calling all of the nodes in this array again to see if any of
> them now think that they've got tuples to return (or just to let them
> do additional work without returning tuples).

Does the ParallelWorkerSetLatchesForGroup use mutex or semaphore
or something like instead of latches?

> > Also, the WaitEvent.user_data field can have some more info besides the plan
> > state. It can have its parent PlanState stored, so that we don't have to
> > have parent field in plan state. It also can have some more data such as
> > "n-tuples-available".
> 
> I don't think that works, because execution may need to flow
> arbitrarily far up the tree.  Just knowing the immediate parent isn't
> good enough.  If it generates a tuple, then you have to in turn call
> its parent, and if that one then produces a tuple, you have to continue
> on even further up the tree.  I think it's going to be very awkward to
> make this work without those parent pointers.

Basically agreed, but going up too far was bad for the reason
above.

> BTW, we also need to benchmark those changes to add the parent
> pointers and change the return conventions and see if they have any
> measurable impact on performance.

I have to bring you a bad news.

With the attached patch, an append on four foreign scans on one
server (at local) performs faster by about 10% and by twice for
three or four foreign scns on separate foreign servers
(connections) respectively, but only when compiled with -O0. I
found that it can take hopelessly small amount of advantage from
compiler optimization, while unpatched version gets faster.

Anyway, the current state of this patch is attached.

For binaries compiled with both -O0 and -O2, I ran a simple query
"select sum(a) from <table>" on tables generated by the attached
script: t0, pl, pf0 and pf1, which are a local table, an append on
local tables, an append on foreign tables on the same foreign
server, and an append on foreign tables on different foreign
servers, respectively. The numbers are the mean values of ten
runs.
       average(ms)     stddev
patched-O0        
t0    891.3934    18.74902154
pl    416.3298    47.38902802
pf0    13523.0777    87.45769903
pf1    3376.6415    183.3578028

patched-O2:
t0    891.4309    5.245807775
pl    408.2932    1.04260004
pf0    13640.3551    52.52211814
pf1    3470.1739    262.3522963

not-patched-O0        
t0    845.3927    18.98379876
pl    363.4933    4.142091341
pf0    14986.1284    23.07288416
pf1    14961.0596    127.2587286

not-patched-O2        
t0    429.8462    31.51970532
pl    176.3959    0.237832551
pf0    8129.3762    44.68774182
pf1    8211.6319    97.93206675

By the way, running the attached testrun.sh, the result for the
first one or two runs of pf0 is faster by about 30%-50% than the
rest for some reason unknown to me...

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center

Re: asynchronous and vectorized execution

From
Robert Haas
Date:
On Wed, Jul 6, 2016 at 3:29 AM, Kyotaro HORIGUCHI
<horiguchi.kyotaro@lab.ntt.co.jp> wrote:
> This seems to be a good opportunity to show this patch. The
> attched patch set does async execution of foreignscan
> (postgres_fdw) on the Robert's first infrastructure, with some
> modification.

Cool.

> ExecAsyncWaitForNode can get into an inifite-waiting by recursive
> calls of ExecAsyncWaitForNode caused by ExecProcNode called from
> async-unaware nodes. Such recursive calls cause a wait on
> already-ready nodes.

Hmm, that's annoying.

> I solved that in the patch set by allocating a separate
> async-execution context for every async-execution subtrees, which
> is made by ExecProcNode, instead of one async-exec context for
> the whole execution tree. This works fine but the way switching
> contexts seems ugly.  This may also be solved by make
> ExecAsyncWaitForNode return when no node to wait even if the
> waiting node is not ready. This might keep the async-exec context
> (state) simpler so I'll try this.

I think you should instead try to make ExecAsyncWaitForNode properly reentrant.

> Does the ParallelWorkerSetLatchesForGroup use mutex or semaphore
> or something like instead of latches?

Why would it do that?

>> BTW, we also need to benchmark those changes to add the parent
>> pointers and change the return conventions and see if they have any
>> measurable impact on performance.
>
> I have to bring you a bad news.
>
> With the attached patch, an append on four foreign scans on one
> server (at local) performs faster by about 10% and by twice for
> three or four foreign scns on separate foreign servers
> (connections) respectively, but only when compiled with -O0. I
> found that it can take hopelessly small amount of advantage from
> compiler optimization, while unpatched version gets faster.

Two things:

1. That's not the scenario I'm talking about.  I'm concerned about
making sure that query plans that don't use asynchronous execution
don't get slower.

2. I have to believe that's a defect in your implementation rather
than something intrinsic, or maybe your test scenario is bad.  It's
very hard - really impossible -  to believe that all queries involving
FDW pushdown are locally CPU-bound.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: asynchronous and vectorized execution

From
Kyotaro HORIGUCHI
Date:
Hello,

At Thu, 7 Jul 2016 13:59:54 -0400, Robert Haas <robertmhaas@gmail.com> wrote in
<CA+TgmobD9uM9=zVz+jvTyEM_o9rwDP3RBJkJPzb0HCpR9-085A@mail.gmail.com>
> On Wed, Jul 6, 2016 at 3:29 AM, Kyotaro HORIGUCHI
> <horiguchi.kyotaro@lab.ntt.co.jp> wrote:
> > This seems to be a good opportunity to show this patch. The
> > attched patch set does async execution of foreignscan
> > (postgres_fdw) on the Robert's first infrastructure, with some
> > modification.
> 
> Cool.

Thank you.

> > ExecAsyncWaitForNode can get into an inifite-waiting by recursive
> > calls of ExecAsyncWaitForNode caused by ExecProcNode called from
> > async-unaware nodes. Such recursive calls cause a wait on
> > already-ready nodes.
> 
> Hmm, that's annoying.
> 
> > I solved that in the patch set by allocating a separate
> > async-execution context for every async-execution subtrees, which
> > is made by ExecProcNode, instead of one async-exec context for
> > the whole execution tree. This works fine but the way switching
> > contexts seems ugly.  This may also be solved by make
> > ExecAsyncWaitForNode return when no node to wait even if the
> > waiting node is not ready. This might keep the async-exec context
> > (state) simpler so I'll try this.
> 
> I think you should instead try to make ExecAsyncWaitForNode properly reentrant.

I feel the same way. Will try to do that.

> > Does the ParallelWorkerSetLatchesForGroup use mutex or semaphore
> > or something like instead of latches?
> 
> Why would it do that?

I might have misunderstood the original sentence, but the reason for my
question there is that I didn't see the connection between "When
an executor node does something that might unblock other workers,
it calls ParallelWorkerSetLatchesForGroup()" and "and the async
stuff then tries calling all of the nodes in this array". I
supposed that the former takes place on each worker and the
latter should take place on the leader. So I asked about the means
to signal the leader to do the latter thing. I am probably wrong,
because I feel uneasy (or confused) with this statement..


> >> BTW, we also need to benchmark those changes to add the parent
> >> pointers and change the return conventions and see if they have any
> >> measurable impact on performance.
> >
> > I have to bring you a bad news.
> >
> > With the attached patch, an append on four foreign scans on one
> > server (at local) performs faster by about 10% and by twice for
> > three or four foreign scns on separate foreign servers
> > (connections) respectively, but only when compiled with -O0. I
> > found that it can take hopelessly small amount of advantage from
> > compiler optimization, while unpatched version gets faster.
> 
> Two things:
> 
> 1. That's not the scenario I'm talking about.  I'm concerned about
> making sure that query plans that don't use asynchronous execution
> don't get slower.

The first one (the select on t0) is that scenario: it doesn't use
asynchronous execution. It has no relation to the asynchronous stuff
except the result_ready flag, a branch caused by it, and calling
ExecDispatchNode. The difference from the original is that ExecProcNode
uses ExecDispatchNode. ExecAsyncWaitForNode is not even called.

> 2. I have to believe that's a defect in your implementation rather
> than something intrinsic, or maybe your test scenario is bad.  It's
> very hard - really impossible -  to believe that all queries involving
> FDW pushdown are locally CPU-bound.

Sorry for the hard-to-read result, but the problem is not in a query
involving FDW, but in a query on a local table (which runs a parallel
seqscan).  The reason for the difference in the tests involving
FDW should be the local scans on the remote side.

Just reverting ExecProcNode and the other related parts doesn't change
the situation. I will proceed with the confirmation, reverting part by
part.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center





Re: asynchronous and vectorized execution

From
Kyotaro HORIGUCHI
Date:
At Mon, 11 Jul 2016 17:10:11 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in
<20160711.171011.133133724.horiguchi.kyotaro@lab.ntt.co.jp>
> > Two things:
> > 
> > 1. That's not the scenario I'm talking about.  I'm concerned about
> > making sure that query plans that don't use asynchronous execution
> > don't get slower.
> 
> The first one (the select on t0) is that scenario: it doesn't use
> asynchronous execution. It has no relation to the asynchronous stuff
> except the result_ready flag, a branch caused by it, and calling
> ExecDispatchNode. The difference from the original is that ExecProcNode
> uses ExecDispatchNode. ExecAsyncWaitForNode is not even called.
> 
> > 2. I have to believe that's a defect in your implementation rather
> > than something intrinsic, or maybe your test scenario is bad.  It's
> > very hard - really impossible -  to believe that all queries involving
> > FDW pushdown are locally CPU-bound.
> 
> Sorry for the hard-to-read result, but the problem is not in a query
> involving FDW, but in a query on a local table (which runs a parallel
> seqscan).  The reason for the difference in the tests involving
> FDW should be the local scans on the remote side.
> 
> Just reverting ExecProcNode and the other related parts doesn't change
> the situation. I will proceed with the confirmation, reverting part by
> part.

Uggg. I saw no difference even after finally getting back to master.
What is more strange, a binary built from what should be the same
"master" but extracted by "git archive | tar" finishes the query
(select sum(a) from t0) in half the time of the master in my git
repository with -O2. In short, the patch doesn't seem to be the
cause of the difference.

I should investigate the difference between them, or begin again
with a clean environment..

Anyway I need some time to cool down..

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center





Re: asynchronous and vectorized execution

From
Kyotaro HORIGUCHI
Date:
Cooled down then measured performance again.

I show you the true result briefly for now.

At Mon, 11 Jul 2016 19:07:22 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in
<20160711.190722.145849861.horiguchi.kyotaro@lab.ntt.co.jp>
> Anyway I need some time to cool down..

I recalled that I had put in a Makefile.custom that contains
CFLAGS="-O0". Removing that gave me a saner result.


patched- -O2

table   10-average(ms)  stddev  runtime-diff from unpatched(%) 
t0       441.78         0.32         3.4
pl       201.77         0.32        13.6
pf0     6619.22        18.99       -19.7
pf1     1800.72        32.72       -78.0
---
unpatched- -O2

t0       427.21         0.42
pl       177.54         0.25
pf0     8250.42        23.29
pf1     8206.02        12.91

==========
  3% slower for local 1*seqscan (2-parallel)
 14% slower for append-4*seqscan (no-parallel)
 19% faster for append-4*foreignscan (all scans on one connection)
 78% faster for append-4*foreignscan (scans have dedicated connections)
 

ExecProcNode might be able to be optimized a bit.
ExecAppend seems to need some fix.

Addition to the aboves, I will try reentrant ExecAsyncWaitForNode
or something.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center





Re: asynchronous and vectorized execution

From
Kyotaro HORIGUCHI
Date:
I forgot to mention.

At Tue, 12 Jul 2016 11:04:17 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in
<20160712.110417.145469826.horiguchi.kyotaro@lab.ntt.co.jp>
> Cooled down then measured performance again.
> 
> I show you the true result briefly for now.
> 
> At Mon, 11 Jul 2016 19:07:22 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote
in<20160711.190722.145849861.horiguchi.kyotaro@lab.ntt.co.jp>
 
> > Anyway I need some time to cool down..
> 
> I recalled that I had put in a Makefile.custom that contains
> CFLAGS="-O0". Removing that gave me a saner result.

Unlike the previous measurements, the remote side in
these measurements is unpatched-O2 postgres, so the differences
come only from the local-side changes.

> patched- -O2
> 
> table   10-average(ms)  stddev  runtime-diff from unpatched(%) 
> t0       441.78         0.32         3.4
> pl       201.77         0.32        13.6
> pf0     6619.22        18.99       -19.7
> pf1     1800.72        32.72       -78.0
> ---
> unpatched- -O2
> 
> t0       427.21         0.42
> pl       177.54         0.25
> pf0     8250.42        23.29
> pf1     8206.02        12.91
> 
> ==========
> 
> >   3% slower for local 1*seqscan (2-parallel)
> >  14% slower for append-4*seqscan (no-parallel)
> >  19% faster for append-4*foreignscan (all scans on one connection)
> >  78% faster for append-4*foreignscan (scans have dedicated connections)
> 
> ExecProcNode might be able to be optimized a bit.
> ExecAppend seems to need some fix.
> 
> Addition to the aboves, I will try reentrant ExecAsyncWaitForNode
> or something.

regards,
-- 
Kyotaro Horiguchi
NTT Open Source Software Center





Re: asynchronous and vectorized execution

From
Kyotaro HORIGUCHI
Date:
Hello,

At Tue, 12 Jul 2016 11:42:55 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in
<20160712.114255.156540680.horiguchi.kyotaro@lab.ntt.co.jp>
> >   3% slower for local 1*seqscan (2-parallel)
> >  14% slower for append-4*seqscan (no-parallel)
> >  19% faster for append-4*foreignscan (all scans on one connection)
> >  78% faster for append-4*foreignscan (scans have dedicated connections)
> > 
> > ExecProcNode might be able to be optimized a bit.
> > ExecAppend seems to need some fix.

After some refactoring, degradation for a simple seqscan is
reduced to 1.4% and that of "Append(SeqScan())" is reduced to
1.7%. The gains are the same to the previous measurement. Scale
has been changed from previous measurement in some test cases.

t0- (SeqScan()) (2 parallel)
pl- (Append(4 * SeqScan()))
pf0 (Append(4 * ForeignScan())) all ForeignScans are on the same connection.
pf1 (Append(4 * ForeignScan())) all ForeignScans have their own connections.
                            
patched-O2    time(ms)  stddev(ms)  gain from unpatched (%)
        t0      4121.27      1.1      -1.44
        pl      1757.41      0.94     -1.73
        pf0     6458.99    192.4      20.26
        pf1     1747.4      24.81     78.39

unpatched-O2
        t0      4062.6       1.95
        pl      1727.45      9.41
        pf0     8100.47     24.51
        pf1     8086.52     33.53
 

> > Addition to the aboves, I will try reentrant ExecAsyncWaitForNode
> > or something.

After some consideration, I found that ExecAsyncWaitForNode
cannot be reentrant because it means that the control goes into
async-unaware nodes while having not-ready nodes, that is
inconsistent state. To inhibit such reentering, I allocated node
identifiers in depth-first order so that ascendant-descendant
relationship can be checked (nested-set model) in simple way and
call ExecAsyncConfigureWait only for the descendant nodes of the
parameter planstate.
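
In other words, something along these lines (the field names here are only
for illustration):

static bool
IsDescendantOf(PlanState *node, PlanState *maybe_ancestor)
{
    /*
     * ids are assigned in depth-first order; each node also records the
     * largest id appearing in its own subtree
     */
    return node->async_node_id >  maybe_ancestor->async_node_id &&
           node->async_node_id <= maybe_ancestor->async_max_descendant_id;
}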

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center

Re: asynchronous and vectorized execution

From
Kyotaro HORIGUCHI
Date:
The previous patch set doesn't work with --enable-cassert. The
attached additional patch fixes that. It theoretically won't cause
any degradation, but I'll measure the performance change.

At Thu, 21 Jul 2016 18:50:07 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in
<20160721.185007.268388411.horiguchi.kyotaro@lab.ntt.co.jp>
> Hello,
> 
> At Tue, 12 Jul 2016 11:42:55 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote
in<20160712.114255.156540680.horiguchi.kyotaro@lab.ntt.co.jp>
 
> After some refactoring, degradation for a simple seqscan is
> reduced to 1.4% and that of "Append(SeqScan())" is reduced to
> 1.7%. The gains are the same to the previous measurement. Scale
> has been changed from previous measurement in some test cases.
> 
> t0- (SeqScan()) (2 parallel)
> pl- (Append(4 * SeqScan()))
> pf0 (Append(4 * ForeignScan())) all ForeignScans are on the same connection.
> pf1 (Append(4 * ForeignScan())) all ForeignScans have their own connections.
> 
>                              
> patched-O2    time(ms)  stddev(ms)  gain from unpatched (%)
>         t0      4121.27      1.1      -1.44
>         pl      1757.41      0.94     -1.73
>         pf0     6458.99    192.4      20.26
>         pf1     1747.4      24.81     78.39
>                           
> unpatched-O2                                    
>         t0      4062.6       1.95    
>         pl      1727.45      9.41    
>         pf0     8100.47     24.51   
>         pf1     8086.52     33.53   
> 
> > > Addition to the aboves, I will try reentrant ExecAsyncWaitForNode
> > > or something.
> 
> After some consideration, I found that ExecAsyncWaitForNode
> cannot be reentrant because it means that the control goes into
> async-unaware nodes while having not-ready nodes, that is
> inconsistent state. To inhibit such reentering, I allocated node
> identifiers in depth-first order so that ascendant-descendant
> relationship can be checked (nested-set model) in simple way and
> call ExecAsyncConfigureWait only for the descendant nodes of the
> parameter planstate.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center

Re: asynchronous and vectorized execution

From
Amit Khandekar
Date:


On 21 July 2016 at 15:20, Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote:

After some consideration, I found that ExecAsyncWaitForNode
cannot be reentrant because it means that the control goes into
async-unaware nodes while having not-ready nodes, that is
inconsistent state. To inhibit such reentering, I allocated node
identifiers in depth-first order so that ascendant-descendant
relationship can be checked (nested-set model) in simple way and
call ExecAsyncConfigureWait only for the descendant nodes of the
parameter planstate.


We have estate->waiting_nodes containing a mix of async-aware and
non-async-aware nodes. I was thinking an asynchrony tree would have only
async-aware nodes, with possibly multiple asynchrony sub-trees in a
tree. Somehow, if we restrict the bubbling up of events only up to the
root of the asynchrony subtree, do you think we can simplify some of the
complexities? For example, ExecAsyncWaitForNode() has become a bit
complex, seemingly because it has to handle non-async nodes also, and
that's the reason I believe you have introduced modes such as
ASYNCCONF_FORCE_ADD.
 
regards,

--
Kyotaro Horiguchi
NTT Open Source Software Center

Re: asynchronous and vectorized execution

From
Kyotaro HORIGUCHI
Date:
Thank you for the comment.

At Mon, 1 Aug 2016 10:44:56 +0530, Amit Khandekar <amitdkhan.pg@gmail.com> wrote in
<CAJ3gD9ek4Y4SGTSuc_pzkGYwLMbrc9QOM7m1D8bj99JNW16o0g@mail.gmail.com>
> On 21 July 2016 at 15:20, Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp
> > wrote:
> 
> >
> > After some consideration, I found that ExecAsyncWaitForNode
> > cannot be reentrant because it means that the control goes into
> > async-unaware nodes while having not-ready nodes, that is
> > inconsistent state. To inhibit such reentering, I allocated node
> > identifiers in depth-first order so that ascendant-descendant
> > relationship can be checked (nested-set model) in simple way and
> > call ExecAsyncConfigureWait only for the descendant nodes of the
> > parameter planstate.
> >
> >
> We have estate->waiting_nodes containing a mix of async-aware and
> non-async-aware nodes. I was thinking, an asynchrony tree would have only
> async-aware nodes, with possible multiple asynchrony sub-trees in a tree.
> Somehow, if we restrict the bubbling up of events only upto the root of the
> asynchrony subtree, do you think we can simplify some of the complexities ?

The current code prohibits registration of nodes outside the
current subtree to avoid the reentering disaster.

Indeed, leaving a "waiting node" mark or something like that on every
root node at the first visit would enable the propagation to stop
at the root of any async-subtree. Nevertheless, when an
async-child in an inactive async-root fires, the new tuple is
loaded but not consumed, and then a subsequent firing on the same
child leads to a deadlock (without result queueing). However,
that can be avoided if ExecAsyncConfigureWait doesn't register
nodes in the ready state.

On the other hand, two or more asynchronous nodes can share a
synchronization object. For instance, multiple postgres_fdw scan
nodes can share one server connection, and only one of them can get
into a waitable state at a time. If no async-child in the current
async subtree is waitable, it must be stuck. So I think it is
crucial for ExecAsyncWaitForNode to force at least one child *in
the current async subtree* to get into the waiting state in such a
situation. The ascendant-descendant relationship is necessary to
do that anyway.

Since we should have the node-id to detect the ascendant-descendant
relationship anyway, and should finally restrict async-nodes with
it, activating only descendant nodes from the start would make
things rather simpler than avoiding the possible deadlock later as
described above.

# It is implemented as per-subtree waiting-node list but it was
# fragile and too ugly..


> For e.g. ExecAsyncWaitForNode() has become a bit complex seemingly because
> it has to handle non-async-nodes also, and that's the reason I believe you
> have introduced modes such as ASYNCCONF_FORCE_ADD.

As explained above, ASYNCCONF_FORCE_ADD is not for
non-async nodes, but for sets of async nodes that share a
synchronization object. We could let ExecAsyncConfigureWait
acquire the async-object forcibly from the start, but that in turn
causes possibly unnecessary transfers of the sync-object among the
nodes sharing it.



I hope the above sentences are readable enough, but any questions
are welcome, even about the meaning of a sentence.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center





Re: asynchronous and vectorized execution

From
Kyotaro HORIGUCHI
Date:
Hello,

I considered applying the async infrastructure to nodeGather, but
since parallel workers hardly ever make Gather (or the leader) wait,
it is really useless, at least for simple cases. Furthermore, as
several people may have said before, unlike foreign scans, Gather (or
other kinds of parallel) nodes usually have only a few workers, at
most a two-digit number even on so-called many-core boxes. I finally
gave up applying this to nodeGather.

As a result, the attached patchset is functionally the same as the
last version, but replaces a misused Assert with AssertMacro.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center

Re: asynchronous and vectorized execution

From
Kyotaro HORIGUCHI
Date:
No, it was wrong.

At Mon, 29 Aug 2016 17:08:36 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in
<20160829.170836.161449399.horiguchi.kyotaro@lab.ntt.co.jp>
> Hello,
> 
> I considered applying the async infrastructure to nodeGather, but
> since parallel workers hardly ever make Gather (or the leader) wait,
> it is really useless, at least for simple cases. Furthermore, as
> several people may have said before, unlike foreign scans, Gather (or
> other kinds of parallel) nodes usually have only a few workers, at
> most a two-digit number even on so-called many-core boxes. I finally
> gave up applying this to nodeGather.

I overlooked that a local scan takes place instead of waiting for
workers to become ready. I will reconsider, taking that into account.

> As a result, the attached patchset is functionally the same as the
> last version, but replaces a misused Assert with AssertMacro.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center





Re: asynchronous and vectorized execution

From
Kyotaro HORIGUCHI
Date:
These are random thoughts on this patch.

At Tue, 30 Aug 2016 12:17:52 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in
<20160830.121752.100817694.horiguchi.kyotaro@lab.ntt.co.jp>
> > As a result, the attached patchset is functionally the same as the
> > last version, but replaces a misused Assert with AssertMacro.

There's performance degradation for non-asynchronous nodes, shown as
't0' below.

The patch adds two "if-then" tests and one additional function call
for asynchronous support into ExecProcNode, which is called very
heavily and formerly consisted of only five meaningful lines. That
overhead slows performance by about 1% for the simple seqscan case.
The following are the performance numbers previously shown upthread.
(Or the difference might be too small to be meaningful..)

===
t0- (SeqScan()) (2 parallel)
pl- (Append(4 * SeqScan()))
pf0 (Append(4 * ForeignScan())) all ForeignScans are on the same connection.
pf1 (Append(4 * ForeignScan())) all ForeignScans have their own connections.
                            
patched-O2    time(ms)  stddev(ms)  gain from unpatched (%)
t0            4121.27      1.1      -1.44
pl            1757.41      0.94     -1.73
pf0           6458.99    192.4      20.26
pf1           1747.4      24.81     78.39

unpatched-O2  time(ms)  stddev(ms)
t0            4062.6       1.95
pl            1727.45      9.41
pf0           8100.47     24.51
pf1           8086.52     33.53
===

So, finally, it seems that the infrastructure should not live in
ExecProcNode, or else we need to redesign the executor.  I tried a
jump table to dispatch nodes, but in vain. Having a flag in EState
might be able to keep the async stuff off the non-async path
(similar to, but maybe different from, my first patch). JIT
compilation seems promising, but that is a different topic.


As for nodeGather, it expects the leader process to act as one of the
workers; the leader should be freed from that so it can behave as an
async node. But still, the expected number of workers seems too small
to gain a meaningful benefit from async execution.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center





Re: asynchronous and vectorized execution

From
Kyotaro HORIGUCHI
Date:
Hello,

At Thu, 01 Sep 2016 16:12:31 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in
<20160901.161231.110068639.horiguchi.kyotaro@lab.ntt.co.jp>
> There's performance degradation for non-asynchronous nodes, shown as
> 't0' below.
> 
> The patch adds two "if-then" tests and one additional function call
> for asynchronous support into ExecProcNode, which is called very
> heavily and formerly consisted of only five meaningful lines. That
> overhead slows performance by about 1% for the simple seqscan case.
> The following are the performance numbers previously shown upthread.
> (Or the difference might be too small to be meaningful..)

I tried __builtin_expect before moving the stuff out of
execProcNode (attached patch). I found a conversation about this
construct in a past discussion:

https://www.postgresql.org/message-id/CA+TgmoYknejCgWMb8Tg63qA67JoUG2uCc0DZc5mm9=e18AmigA@mail.gmail.com

> If we can show cases where it reliably produces a significant
> speedup, then I would think it would be worthwhile

I got the following results.

master(67e1e2a)-O2                    time(ms)  stddev(ms)
t0: 3928.22 (  0.40)   # simple SeqScan only
pl: 1665.14 (  0.53)   # Append(SeqScan)

Patched-O2 / NOT using __builtin_expect
t0: 4042.69 (  0.92)   degradation from master is 2.9%
pl: 1698.46 (  0.44)   degradation from master is 2.0%

Patched-O2 / using __builtin_expect
t0: 3886.69 (  1.93)   *gain* over master is 1.06%
pl: 1671.66 (  0.67)   degradation from master is 0.39%

I haven't directly observed what the builtin implies for optimization
of the surrounding code, but I suspect there is some effect. I also
tried it on ExecAppend, but saw no difference. The numbers fluctuate
easily with any change in the machine's state, so the lower digits
aren't trustworthy, but several successive repetitions showed
fluctuations of only up to a few milliseconds.

execProcNode can stay as it is if __builtin_expect is usable, but
ExecAppend still needs an improvement.
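
For what it's worth, here is the general shape of such an annotation
(a sketch only; the macro, struct, and field names below are made up
for illustration and are not what the patch uses). With GCC,
__builtin_expect tells the compiler which branch is the common case,
so the rarely taken async path can be kept off the hot path of an
ExecProcNode-style dispatcher:

#include <stdbool.h>

#if defined(__GNUC__)
#define pg_unlikely_sketch(x)  __builtin_expect((x) != 0, 0)
#else
#define pg_unlikely_sketch(x)  ((x) != 0)
#endif

struct TupleSlotSketch;                 /* stand-in for TupleTableSlot */

typedef struct NodeStateSketch
{
    bool        async_pending;          /* hypothetical flag */
    struct TupleSlotSketch *(*exec_sync) (struct NodeStateSketch *);
    struct TupleSlotSketch *(*exec_async) (struct NodeStateSketch *);
} NodeStateSketch;

static struct TupleSlotSketch *
exec_proc_node_sketch(NodeStateSketch *node)
{
    /* The async branch is expected to be rare for plain plans. */
    if (pg_unlikely_sketch(node->async_pending))
        return node->exec_async(node);  /* rarely taken slow path */

    return node->exec_sync(node);       /* common synchronous path */
}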

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center

Re: asynchronous and vectorized execution

From
Robert Haas
Date:
On Mon, Aug 29, 2016 at 4:08 AM, Kyotaro HORIGUCHI
<horiguchi.kyotaro@lab.ntt.co.jp> wrote:
> [ new patches ]

+            /*
+             * We assume that few nodes are async-aware and async-unaware
+             * nodes cannot be revserse-dispatched from lower nodes that is
+             * async-aware. Firing of an async node that is not a descendant
+             * of the planstate will cause such reverse-diaptching to
+             * async-aware nodes, which is unexpected behavior for them.
+             *
+             * For instance, consider an async-unaware Hashjoin(OUTER, INNER)
+             * where the OUTER is running asynchronously but the Hashjoin is
+             * waiting on the async INNER during inner-hash creation. If the
+             * OUTER fires for the case, since anyone is waiting on it,
+             * ExecAsyncWaitForNode finally dispatches to the Hashjoin which
+             * is now in the middle of thing its work.
+             */
+            if (!IsParent(planstate, node))
+                continue;

I'm not entirely sure that I understand this comment, but I don't
think it's going in the right direction.  Let's start with the example
in the second paragraph. If the hash join is async-unaware, then it
isn't possible for the hash join to be both running the outer side of
the join asynchronously and at the same time waiting on the inner
side.  Once it tries to pull the first tuple from the outer side, it's
waiting for that to finish and can't do anything else.  So, the inner
side can't possibly get touched in any way until the outer side
finishes.  For anything else to happen, the hash join would have to be
async-aware.  Even if we did that, I don't think it would be right to
kick off both sides of the hash join at the same time.  Right now, if
the outer side turns out to be empty, we never need to build the hash
table, and that's good.

I don't think it's a good idea to wait for only nodes that are in the
current subtree.  For example, consider a plan like this:

Append
-> Foreign Scan on a
-> Hash Join
  -> Foreign Scan on b
  -> Hash
    -> Seq Scan on x

Suppose Append and Foreign Scan are parallel-aware but the other nodes
are not.  Append kicks off the Foreign Scan on a and then waits for
the hash join to produce a tuple; the hash join kicks off the Foreign
Scan on b and waits for it to return a tuple.  If, while we're waiting
for the foreign scan on b, the foreign scan on a needs some attention
- either to produce tuples, or maybe just to call PQconsumeInput() so
that more data can be sent from the other side, I think we need to be
able to do that.  There's no real problem here; even if the Append
becomes result-ready before the hash join returns, that is fine.  We
will not actually be able to return from the append until the hash
join returns because of what's on the call stack, but that doesn't
mean that the Append can't be marked result-ready sooner than that.
The situation can be improved by making the hash join node
parallel-aware, but if we don't do that it's still not broken.

I think the reason that you originally got backed into this design was
because of problems with reentrancy.  I don't think I really
understand in any detail exactly what problem you hit there, but it
seems to me that the following problem could occur:
ExecAsyncWaitForNode finds two events and schedules two callbacks.  It
calls the first of those two callbacks.  Before that callback returns,
it again calls ExecAsyncWaitForNode.  But the new invocation of
ExecAsyncWaitForNode doesn't know that there is a second callback
pending, so it somehow gets confused.  However, I think this problem
can fixed using a different method.  The occurred_event and callbacks
arrays defined by ExecAsyncWaitForNode can be made part of the EState
rather than being local variables.  When ExecAsyncWaitForNode is
called, it checks whether there are any pending callbacks; if so, it
removes and calls the first one.  Only if there are no pending
callbacks does it actually wait; when a wait event fires, one or more
new callbacks are generated.  This is very similar to the reason why
ReceiveSharedInvalidMessages uses a static messages array rather than
a normal local variable.  That function is solving a problem which I
suspect is very similar to the one we have here.  However, it would be
helpful if you could provide some more details on what you think the
reentrancy problems are, because I'm not able to figure them out from
your messages so far.
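
A minimal sketch of that scheme might look like the following
(hypothetical names and sizes; the real thing would live in the EState
and carry the wait-event results as well):

#define MAX_PENDING_CALLBACKS_SKETCH 64

typedef void (*AsyncCallbackSketch) (void *node);

/* Imagined as a field of EState, so it survives recursive calls. */
typedef struct PendingCallbackQueueSketch
{
    int                   ncallbacks;
    AsyncCallbackSketch   callbacks[MAX_PENDING_CALLBACKS_SKETCH];
    void                 *args[MAX_PENDING_CALLBACKS_SKETCH];
} PendingCallbackQueueSketch;

static void
wait_for_async_node_sketch(PendingCallbackQueueSketch *q)
{
    /* First drain callbacks queued by any invocation, outer or inner. */
    while (q->ncallbacks > 0)
    {
        int                 i = --q->ncallbacks;
        AsyncCallbackSketch cb = q->callbacks[i];

        cb(q->args[i]);         /* may recurse and queue more callbacks */
    }

    /*
     * Only when nothing is pending would we actually wait; a fired wait
     * event would append new entries to q->callbacks before returning.
     */
}

Because the queue is shared state rather than a local variable, a
recursive call started from inside one of the callbacks sees, and can
drain, whatever the outer call had not yet processed.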

The most mysterious part of this hunk to me is the comment that
"Firing of an async node that is not a descendant of the planstate
will cause such reverse-diaptching to async-aware nodes, which is
unexpected behavior for them."  It is the async-unaware nodes which
might have a problem.  The nodes that have been taught about the new
system should know what to expect.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: asynchronous and vectorized execution

From
Robert Haas
Date:
On Tue, Aug 2, 2016 at 3:41 AM, Kyotaro HORIGUCHI
<horiguchi.kyotaro@lab.ntt.co.jp> wrote:
> Thank you for the comment.
>
> At Mon, 1 Aug 2016 10:44:56 +0530, Amit Khandekar <amitdkhan.pg@gmail.com> wrote in
<CAJ3gD9ek4Y4SGTSuc_pzkGYwLMbrc9QOM7m1D8bj99JNW16o0g@mail.gmail.com>
>> On 21 July 2016 at 15:20, Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp
>> > wrote:
>>
>> >
>> > After some consideration, I found that ExecAsyncWaitForNode
>> > cannot be reentrant because it means that the control goes into
>> > async-unaware nodes while having not-ready nodes, that is
>> > inconsistent state. To inhibit such reentering, I allocated node
>> > identifiers in depth-first order so that ascendant-descendant
>> > relationship can be checked (nested-set model) in simple way and
>> > call ExecAsyncConfigureWait only for the descendant nodes of the
>> > parameter planstate.
>> >
>> >
>> We have estate->waiting_nodes containing a mix of async-aware and
>> non-async-aware nodes. I was thinking, an asynchrony tree would have only
>> async-aware nodes, with possible multiple asynchrony sub-trees in a tree.
>> Somehow, if we restrict the bubbling up of events only upto the root of the
>> asynchrony subtree, do you think we can simplify some of the complexities ?
>
> The current code prohibits registration of nodes outside the
> current subtree to avoid the reentrance disaster.
>
> Indeed, leaving a "waiting node" mark or something like it on every
> root node at the first visit would let the propagation stop at the
> root of any async-subtree. Nevertheless, when an async-child under an
> inactive async-root fires, the new tuple is loaded but not consumed;
> a succeeding firing on the same child then leads to a deadlock
> (without result queueing). However, that can be avoided if
> ExecAsyncConfigureWait doesn't register nodes that are already in the
> ready state.

Why would a node call ExecAsyncConfigureWait in the first place if it
already had a result ready?  I think it shouldn't do that.

> On the other hand, two or more asynchronous nodes can share a
> synchronization object. For instance, multiple postgres_fdw scan
> nodes can share one server connection, and only one of them can be in
> a waitable state at a time. If no async-child in the current async
> subtree is waitable, the subtree must be stuck. So I think it is
> crucial for ExecAsyncWaitForNode to force at least one child *in the
> current async subtree* into the waiting state in such a situation.
> The ascendant-descendant relationship is necessary to do that anyway.

This is another example of a situation where waiting only for nodes
within a subtree causes problems.

Suppose there are two Foreign Scans in completely different parts of
the plan tree that are going to use, in alternation, the same
connection to the same remote server.  When we encounter the first
one, it kicks off the query, uses ExecAsyncConfigureWait to register
itself as waiting, and returns without becoming ready.  When we
encounter the second one, it can't kick off the query and therefore
has no chance of becoming ready until after the first one has finished
with the connection.  Suppose we then wait for the second Foreign
Scan.  Well, we had better wait for the first one, too!  If we don't,
it will never finish with the connection, so the second node will
never get to use it, and now we're in trouble.

I think what we need is for the ConnCacheEntry to have a place to note
the ForeignScanState that is using the connection and any other
PlanState objects that would like to use it.  When one
ForeignScanState is done with the ConnCacheEntry, it activates the
next one, which then takes over.  That seems simple enough, but
there's a problem here for suspended queries: if we stop executing a
plan while some scan within that plan is holding onto a
ConnCacheEntry, and then we run some other query that wants to use the
same one, we've got a problem.  Maybe we can get by with letting the
other query finish running and then executing our own query, but that
might be messy to implement.  Another idea is to somehow let any
in-progress query finish running before allowing the first query to be
suspended; that would need some new infrastructure.
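
A sketch of that hand-off (hypothetical fields; not postgres_fdw's
actual ConnCacheEntry) could be as simple as tracking the current user
of the connection plus a queue of would-be users, and activating the
next one on release:

#include <stddef.h>

/* Imagined additions to a connection cache entry. */
typedef struct ConnWaitQueueSketch
{
    void   *current_user;       /* scan state now using the connection */
    void   *waiters[8];         /* other plan states wanting it (fixed-size
                                 * here only to keep the sketch short) */
    int     nwaiters;
} ConnWaitQueueSketch;

/* Called when the current user is done with the connection. */
static void
release_connection_sketch(ConnWaitQueueSketch *entry)
{
    entry->current_user = NULL;

    if (entry->nwaiters > 0)
    {
        /* Hand the connection to the next waiter... */
        entry->current_user = entry->waiters[0];
        for (int i = 1; i < entry->nwaiters; i++)
            entry->waiters[i - 1] = entry->waiters[i];
        entry->nwaiters--;
        /* ...and this is where its remote query would be kicked off. */
    }
}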

My main point here is that I think waiting for only a subtree is an
idea that cannot work out well.  Whatever problems are pushing you
into that design, we need to confront those problems directly and fix
them.  There shouldn't be any unsolvable problems in waiting for
everything in the whole query, and I'm pretty sure that's going to be
a more elegant and better-performing design.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: asynchronous and vectorized execution

From
Amit Khandekar
Date:


On 13 September 2016 at 20:20, Robert Haas <robertmhaas@gmail.com> wrote:
On Mon, Aug 29, 2016 at 4:08 AM, Kyotaro HORIGUCHI
<horiguchi.kyotaro@lab.ntt.co.jp> wrote:
> [ new patches ]

+            /*
+             * We assume that few nodes are async-aware and async-unaware
+             * nodes cannot be revserse-dispatched from lower nodes that is
+             * async-aware. Firing of an async node that is not a descendant
+             * of the planstate will cause such reverse-diaptching to
+             * async-aware nodes, which is unexpected behavior for them.
+             *
+             * For instance, consider an async-unaware Hashjoin(OUTER, INNER)
+             * where the OUTER is running asynchronously but the Hashjoin is
+             * waiting on the async INNER during inner-hash creation. If the
+             * OUTER fires for the case, since anyone is waiting on it,
+             * ExecAsyncWaitForNode finally dispatches to the Hashjoin which
+             * is now in the middle of thing its work.
+             */
+            if (!IsParent(planstate, node))
+                continue;

I'm not entirely sure that I understand this comment, but I don't
think it's going in the right direction.  Let's start with the example
in the second paragraph. If the hash join is async-unaware, then it
isn't possible for the hash join to be both running the outer side of
the join asynchronously and at the same time waiting on the inner
side.  Once it tries to pull the first tuple from the outer side, it's
waiting for that to finish and can't do anything else.  So, the inner
side can't possibly get touched in any way until the outer side
finishes.  For anything else to happen, the hash join would have to be
async-aware.  Even if we did that, I don't think it would be right to
kick off both sides of the hash join at the same time.  Right now, if
the outer side turns out to be empty, we never need to build the hash
table, and that's good.

I feel the !IsParent() condition is actually there to prevent the infinite wait
caused by a re-entrancy issue in ExecAsyncWaitForNode() that Kyotaro mentioned
earlier. But yes, the comments don't explain exactly how the hash join can
cause the re-entrancy issue.

I attempted to come up with a testcase that might reproduce the
infinite wait in ExecAsyncWaitForNode() after removing the !IsParent() check,
so that nodes from other subtrees are also included, but I couldn't reproduce it.
Kyotaro, is it possible for you to give a testcase that consistently hangs if
we revert the !IsParent() check?

I was also thinking about another possibility where the same plan state node is
re-entered, as explained below. 

I don't think it's a good idea to wait for only nodes that are in the
current subtree.  For example, consider a plan like this:

Append
-> Foreign Scan on a
-> Hash Join
  -> Foreign Scan on b
  -> Hash
    -> Seq Scan on x

Suppose Append and Foreign Scan are parallel-aware but the other nodes
are not.  Append kicks off the Foreign Scan on a and then waits for
the hash join to produce a tuple; the hash join kicks off the Foreign
Scan on b and waits for it to return a tuple.  If, while we're waiting
for the foreign scan on b, the foreign scan on a needs some attention
- either to produce tuples, or maybe just to call PQconsumeInput() so
that more data can be sent from the other side, I think we need to be
able to do that.  There's no real problem here; even if the Append
becomes result-ready before the hash join returns, that is fine. 

Yes, I agree: we should be able to do this. Since we have all the waiting events
in a common estate, there's no harm in starting to execute nodes of another
sub-tree if we get an event from there.

But I am thinking about what would happen when this node from the other sub-tree
returns result_ready, and then its parents are called, and then the result
gets bubbled up to the node which had already caused us to call
ExecAsyncWaitForNode() in the first place.

For example, in the above plan which you specified, suppose:
1. Hash Join has called ExecProcNode() for the child foreign scan b, and so is
waiting in ExecAsyncWaitForNode(foreign_scan_on_b).
2. The event wait list already has foreign scan on a, which is in a different
subtree.
3. This foreign scan a happens to be ready, so in
ExecAsyncWaitForNode(), ExecDispatchNode(foreign_scan_a) is called,
which returns with result_ready.
4. Since it returns result_ready, its parent node is now inserted in the
callbacks array, and so its parent (Append) is executed.
5. But this Append planstate is already in the middle of executing Hash
Join, and is waiting for the HashJoin.

Is it safe to execute the same plan state when it is already in the middle of
its execution? In other words, is the plan state re-entrant? I suspect the new
execution may even corrupt the structures with which it was already executing.

In usual cases, a tree can contain multiple plan state nodes belonging to the
same plan node, but in this case, we have the same plan state node being
executed again while it is already executing.

I suspect this may be one reason why Kyotaro might be getting infinite
recursion issues. Maybe we need to prevent a plan state node from being
re-entered, but allow nodes from any subtree to execute. So propagate the
result upwards until we reach a node which is already executing.

 

Re: asynchronous and vectorized execution

From
Robert Haas
Date:
On Fri, Sep 23, 2016 at 8:45 AM, Amit Khandekar <amitdkhan.pg@gmail.com> wrote:
> For e.g., in the above plan which you specified, suppose :
> 1. Hash Join has called ExecProcNode() for the child foreign scan b, and so
> is
> waiting in ExecAsyncWaitForNode(foreign_scan_on_b).
> 2. The event wait list already has foreign scan on a that is on a different
> subtree.
> 3. This foreign scan a happens to be ready, so in
> ExecAsyncWaitForNode (), ExecDispatchNode(foreign_scan_a) is called,
> which returns with result_ready.
> 4. Since it returns result_ready, it's parent node is now inserted in the
> callbacks array, and so it's parent (Append) is executed.
> 5. But, this Append planstate is already in the middle of executing Hash
> join, and is waiting for HashJoin.

Ah, yeah, something like that could happen.  I've spent much of this
week working on a new design for this feature which I think will avoid
this problem.  It doesn't work yet - in fact I can't even really test
it yet.  But I'll post what I've got by the end of the day today so
that anyone who is interested can look at it and critique.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: asynchronous and vectorized execution

From
Kyotaro HORIGUCHI
Date:
Hello, thank you for the comment.

At Fri, 23 Sep 2016 18:15:40 +0530, Amit Khandekar <amitdkhan.pg@gmail.com> wrote in
<CAJ3gD9fZ=rtBZ0i1_pxycbkgxi=OzTgv1n0ojkmK318Mcc921A@mail.gmail.com>
> On 13 September 2016 at 20:20, Robert Haas <robertmhaas@gmail.com> wrote:
> 
> > On Mon, Aug 29, 2016 at 4:08 AM, Kyotaro HORIGUCHI
> > <horiguchi.kyotaro@lab.ntt.co.jp> wrote:
> > > [ new patches ]
> >
> > +            /*
> > +             * We assume that few nodes are async-aware and async-unaware
> > +             * nodes cannot be revserse-dispatched from lower nodes that
> > is
> > +             * async-aware. Firing of an async node that is not a
> > descendant
> > +             * of the planstate will cause such reverse-diaptching to
> > +             * async-aware nodes, which is unexpected behavior for them.
> > +             *
> > +             * For instance, consider an async-unaware Hashjoin(OUTER,
> > INNER)
> > +             * where the OUTER is running asynchronously but the Hashjoin
> > is
> > +             * waiting on the async INNER during inner-hash creation. If
> > the
> > +             * OUTER fires for the case, since anyone is waiting on it,
> > +             * ExecAsyncWaitForNode finally dispatches to the Hashjoin
> > which
> > +             * is now in the middle of thing its work.
> > +             */
> > +            if (!IsParent(planstate, node))
> > +                continue;
> >
> > I'm not entirely sure that I understand this comment, but I don't

Sorry for the hard-to-read comment...

> > think it's going in the right direction.  Let's start with the example
> > in the second paragraph. If the hash join is async-unaware, then it
> > isn't possible for the hash join to be both running the outer side of
> > the join asynchronously and at the same time waiting on the inner
> > side.  Once it tries to pull the first tuple from the outer side, it's
> > waiting for that to finish and can't do anything else.  So, the inner
> > side can't possibly get touched in any way until the outer side
> > finishes.  For anything else to happen, the hash join would have to be
> > async-aware.  Even if we did that, I don't think it would be right to
> > kick off both sides of the hash join at the same time.  Right now, if
> > the outer side turns out to be empty, we never need to build the hash
> > table, and that's good.
> >
> 
> I feel the !IsParent() condition is actually to prevent the infinite wait
> caused by a re-entrant issue in ExecAsuncWaitForNode() that Kyotaro
> mentioned
> earlier. But yes, the comments don't explain exactly how the hash join can
> cause the re-entrant issue.
> 
> But I attempted to come up with some testcase which might reproduce the
> infinite-waiting in ExecAsyncWaitForNode() after removing the !IsParent()
> check
> so that the other subtree nodes are also included, but I couldn't reproduce.
> Kyotaro, is it possible for you to give a testcase that consistently hangs
> if
> we revert back the !IsParent() check ?

I dug out of my memory that it happened during the postgres_fdw
regression test, and it is still reproducible in the same manner.

postgres_fdw> make check
...
============== running regression test queries        ==============
test postgres_fdw             ... FAILED (test process exited with exit code 2)
...


And in server log,

== contrib/postgres_fdw/log/postmaster.log
TRAP: FailedAssertion("!(hashtable == ((void *)0))", File: "nodeHashjoin.c", Line: 123)
LOG:  could not receive data from client: Connection reset by peer
LOG:  unexpected EOF on client connection with an open transaction
LOG:  server process (PID 9130) was terminated by signal 6: Aborted
DETAIL:  Failed process was running: SELECT * FROM ft1 t1 WHERE t1.c3 IN (SELECT c3 FROM ft2 t2 WHERE c1 <= 10) ORDER BY c1;

nodeHashjoin.c:116:
>     switch (node->hj_JoinState)
>     {
>       case HJ_BUILD_HASHTABLE:
> 
>         /*
>          * First time through: build hash table for inner relation.
>          */
>         Assert(hashtable == NULL);

This is ExecHashJoin being reentered.

Alternatively, after running installcheck and then connecting to the
database "contrib_regression" after the failure, we can see what plan
was being tried.

contrib_regression=# explain SELECT * FROM ft1 t1 WHERE t1.c3 IN (SELECT c3 FROM ft2 t2 WHERE c1 <= 10) ORDER BY c1;
                                   QUERY PLAN
---------------------------------------------------------------------------------------
 Sort  (cost=275.96..277.21 rows=500 width=47)
   Sort Key: t1.c1
   ->  Hash Join  (cost=208.78..253.54 rows=500 width=47)
         Hash Cond: (t1.c3 = t2.c3)
         ->  Foreign Scan on ft1 t1  (cost=100.00..141.00 rows=1000 width=47)
         ->  Hash  (cost=108.77..108.77 rows=1 width=6)
               ->  HashAggregate  (cost=108.76..108.77 rows=1 width=6)
                     Group Key: t2.c3
                     ->  Foreign Scan on ft2 t2  (cost=100.28..108.73 rows=12 width=6)
(9 rows)



> I was also thinking about another possibility where the same plan state
> node is
> re-entered, as explained below.
> 
> >
> > I don't think it's a good idea to wait for only nodes that are in the
> > current subtree.  For example, consider a plan like this:
> >
> > Append
> > -> Foreign Scan on a
> > -> Hash Join
> >   -> Foreign Scan on b
> >   -> Hash
> >     -> Seq Scan on x
> >
> > Suppose Append and Foreign Scan are parallel-aware but the other nodes
> > are not.  Append kicks off the Foreign Scan on a and then waits for
> > the hash join to produce a tuple; the hash join kicks off the Foreign
> > Scan on b and waits for it to return a tuple.  If, while we're waiting
> > for the foreign scan on b, the foreign scan on a needs some attention
> > - either to produce tuples, or maybe just to call PQconsumeInput() so
> > that more data can be sent from the other side, I think we need to be
> > able to do that.  There's no real problem here; even if the Append
> > becomes result-ready before the hash join returns, that is fine.
> 
> 
> Yes I agree : we should be able to do this. Sine we have all the waiting
> events
> in a common estate, there's no harm if we start executing nodes of another
> sub-tree if we get an event from there.
> 
> But I am thinking about what would happen when this node from other sub-tree
> returns result_ready, and then it's parents are called, and then the result
> gets bubbled up upto the node which had already caused us to call
> ExecAsyncWaitForNode() in the first place.
>
> For e.g., in the above plan which you specified, suppose :
> 1. Hash Join has called ExecProcNode() for the child foreign scan b, and so
> is
> waiting in ExecAsyncWaitForNode(foreign_scan_on_b).
> 2. The event wait list already has foreign scan on a that is on a different
> subtree.
> 3. This foreign scan a happens to be ready, so in
> ExecAsyncWaitForNode (), ExecDispatchNode(foreign_scan_a) is called,
> which returns with result_ready.
> 4. Since it returns result_ready, it's parent node is now inserted in the
> callbacks array, and so it's parent (Append) is executed.
> 5. But, this Append planstate is already in the middle of executing Hash
> join, and is waiting for HashJoin.

This is what I wanted to explain with that cryptic comment :(

> Is this safe to execute the same plan state when it is already inside its
> execution ? In other words, is the plan state re-entrant ? I suspect, the
> new
> execution may even corrupt the structures with which it was already
> executing.

It should be safe in most cases, but HashJoin and some other nodes
have internal state beyond their descendant nodes. Such nodes cannot
be reentered.

> In usual cases, a tree can contain multiple plan state nodes belonging to
> the
> same plan node, but in this case, we are having the same plan state node
> being
> executed again while it is already executing.
> 
> I suspect this can be one reason why Kyotaro might be getting infinite
> recursion issues. May be we need to prevent a plan state node to re-enter,
> but allow nodes from any subtree to execute. So propagate the result upwards
> until we get a node which is already executing.

Sorry for the slow response, but the answer is yes. We should be able
to avoid the problem by managing an execution state for every node.
But it needs an additional flag in the *State structs, manipulated on
the way up and down the execution tree.
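
For concreteness, the flag-based guard could look roughly like this
(hypothetical names; only a sketch of the idea, not an actual patch):

#include <stdbool.h>

/* Imagined extra field in each *State struct. */
typedef struct NodeExecGuardSketch
{
    bool    executing;          /* true while the node is on the call stack */
} NodeExecGuardSketch;

/* Dispatch into a node only if it is not already being executed. */
static bool
try_dispatch_sketch(NodeExecGuardSketch *node,
                    void (*exec_fn) (NodeExecGuardSketch *))
{
    if (node->executing)
        return false;           /* already executing; don't reenter it */

    node->executing = true;     /* set on the way down... */
    exec_fn(node);
    node->executing = false;    /* ...and cleared on the way back up */

    return true;
}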

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center





Re: asynchronous and vectorized execution

From
Michael Paquier
Date:
On Thu, Sep 29, 2016 at 5:50 PM, Kyotaro HORIGUCHI
<horiguchi.kyotaro@lab.ntt.co.jp> wrote:
> Sorry for the slow response, but the answer is yes. We should be able
> to avoid the problem by managing an execution state for every node.
> But it needs an additional flag in the *State structs, manipulated on
> the way up and down the execution tree.

Moved to next CF.
-- 
Michael



Re: asynchronous and vectorized execution

From
Kyotaro HORIGUCHI
Date:
At Mon, 3 Oct 2016 13:14:23 +0900, Michael Paquier <michael.paquier@gmail.com> wrote in
<CAB7nPqSf8dBndoKT5DeR6FpzDUSuXN_g7uWNPQuN_A_sEwB-uw@mail.gmail.com>
> On Thu, Sep 29, 2016 at 5:50 PM, Kyotaro HORIGUCHI
> <horiguchi.kyotaro@lab.ntt.co.jp> wrote:
> > Sorry for the slow response, but the answer is yes. We should be able
> > to avoid the problem by managing an execution state for every node.
> > But it needs an additional flag in the *State structs, manipulated on
> > the way up and down the execution tree.
> 
> Moved to next CF.

Thank you.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center





Re: asynchronous and vectorized execution

From
Haribabu Kommi
Date:


On Mon, Oct 3, 2016 at 3:25 PM, Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote:
At Mon, 3 Oct 2016 13:14:23 +0900, Michael Paquier <michael.paquier@gmail.com> wrote in <CAB7nPqSf8dBndoKT5DeR6FpzDUSuXN_g7uWNPQuN_A_sEwB-uw@mail.gmail.com>
> On Thu, Sep 29, 2016 at 5:50 PM, Kyotaro HORIGUCHI
> <horiguchi.kyotaro@lab.ntt.co.jp> wrote:
> > Sorry for the slow response, but the answer is yes. We should be able
> > to avoid the problem by managing an execution state for every node.
> > But it needs an additional flag in the *State structs, manipulated on
> > the way up and down the execution tree.
>
> Moved to next CF.

Thank you.


Closed in 2016-11 commitfest with "returned with feedback" status.
This is as per my understanding of the recent mails on the thread.
Please feel free to update the status if the current status doesn't
reflect the exact status of the patch.

Regards,
Hari Babu
Fujitsu Australia