Re: [HACKERS] asynchronous execution - Mailing list pgsql-hackers

From Kyotaro HORIGUCHI
Subject Re: [HACKERS] asynchronous execution
Date
Msg-id 20180511.174520.188681124.horiguchi.kyotaro@lab.ntt.co.jp
Whole thread Raw
In response to Re: [HACKERS] asynchronous execution  (Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>)
Responses Re: [HACKERS] asynchronous execution  (Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>)
List pgsql-hackers
Hello. This is the new version of $Subject.

But, this is not just a rebased version. On the way fixing
serious conflicts, I refactored patch and I believe this becomes
way readable than the previous shape.

# 0003 lacks changes of postgres_fdw.out now.

- Waiting queue manipulation is moved into new functions. It had
  a bug that the same node can be inserted in the queue more than
  once and it is fixed.

- postgresIterateForeginScan had somewhat a tricky strcuture to
  merge similar procedures thus it cannot be said easy-to-read at
  all. Now it is far simpler and straight-forward looking.

- Still this works only on Append/ForeignScan.

> > The attached PoC patch theoretically has no impact on the normal
> > code paths and just brings gain in async cases.

I performed almost the same test as before but with:

- partition tables
  (There should be no difference with inheritance.)

- added test for fetch_size of 200 and 1000 as long as 100.

  100 unreasonably magnifies the lag by context switching on
  single poor box and the test D/F below. (They became faster by
  about twice by adding a small delay (1000 times of
  clock_gettime()(*1)) just before epoll_wait so that it doesn't
  sleep (I suppose)...)

- Table size of test B is one tenth of the previous size, the
  same to one partition.

*1: The reason for the function is that first I found that the
    queries get way faster by just prefixing by "explain analyze"..

>                            patched(ms)  unpatched(ms)   gain(%)
> A: simple table scan     :  3562.32      3444.81        -3.4
> B: local partitioning    :  1451.25      1604.38         9.5
> C: single remote table   :  8818.92      9297.76         5.1
> D: sharding (single con) :  5966.14      6646.73        10.2
> E: sharding (multi con)  :  1802.25      6515.49        72.3

fetch_size = 100
                            patched(ms)  unpatched(ms)   gain(%)
A: simple table scan     :   3033.48       2997.44        -1.2
B: local partitioning    :   1405.52       1426.66         1.5
C: single remote table   :   8335.50       8463.22         1.5
D: sharding (single con) :   6862.92       6820.97        -0.6
E: sharding (multi con)  :   2185.84       6733.63        67.5
F: partition (single con):   6818.13       6741.01        -1.1
G: partition (multi con) :   2150.58       6407.46        66.4

fetch_size = 200
                            patched(ms)  unpatched(ms)   gain(%)
A: simple table scan     :   
B: local partitioning    :   
C: single remote table   :   
D: sharding (single con) :   
E: sharding (multi con)  :   
F: partition (single con):   
G: partition (multi con) :   

fetch_size = 1000
                            patched(ms)  unpatched(ms)   gain(%)
A: simple table scan     :   3050.31       2980.29        -2.3
B: local partitioning    :   1401.34       1419.54         1.3
C: single remote table   :   8375.4        8445.27         0.8
D: sharding (single con) :   3935.97       4737.84        16.9
E: sharding (multi con)  :   1330.44       4752.87        72.0
F: partition (single con):   3997.63       4747.44        15.8
G: partition (multi con) :   1323.02       4807.72        72.5

Async append doesn't affect non-async path at all so B is
expected to get no degradation. It seems within error.

C and F are the gain when all foreign tables share one connection
and D and G are the gain when every foreign tables has dedicate
connection.

I will repost after filling the blank portion of the tables and
complete regression of the patch next week. Sorry for the
incomplete post.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From 7ad4210dd20b6672367255492e2b1d95cd90b122 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Mon, 22 May 2017 12:42:58 +0900
Subject: [PATCH 1/3] Allow wait event set to be registered to resource owner

WaitEventSet needs to be released using resource owner for a certain
case. This change adds WaitEventSet reowner and allow the creator of a
WaitEventSet to specify a resource owner.
---
 src/backend/libpq/pqcomm.c                    |  2 +-
 src/backend/storage/ipc/latch.c               | 18 ++++++-
 src/backend/storage/lmgr/condition_variable.c |  2 +-
 src/backend/utils/resowner/resowner.c         | 67 +++++++++++++++++++++++++++
 src/include/storage/latch.h                   |  4 +-
 src/include/utils/resowner_private.h          |  8 ++++
 6 files changed, 96 insertions(+), 5 deletions(-)

diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index a4f6d4deeb..890972b9b8 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -220,7 +220,7 @@ pq_init(void)
                 (errmsg("could not set socket to nonblocking mode: %m")));
 #endif
 
-    FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
+    FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, NULL, 3);
     AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
                       NULL, NULL);
     AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index e6706f7fb8..5457899f2d 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -51,6 +51,7 @@
 #include "storage/latch.h"
 #include "storage/pmsignal.h"
 #include "storage/shmem.h"
+#include "utils/resowner_private.h"
 
 /*
  * Select the fd readiness primitive to use. Normally the "most modern"
@@ -77,6 +78,8 @@ struct WaitEventSet
     int            nevents;        /* number of registered events */
     int            nevents_space;    /* maximum number of events in this set */
 
+    ResourceOwner    resowner;    /* Resource owner */
+
     /*
      * Array, of nevents_space length, storing the definition of events this
      * set is waiting for.
@@ -359,7 +362,7 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
     int            ret = 0;
     int            rc;
     WaitEvent    event;
-    WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, 3);
+    WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, NULL, 3);
 
     if (wakeEvents & WL_TIMEOUT)
         Assert(timeout >= 0);
@@ -517,12 +520,15 @@ ResetLatch(volatile Latch *latch)
  * WaitEventSetWait().
  */
 WaitEventSet *
-CreateWaitEventSet(MemoryContext context, int nevents)
+CreateWaitEventSet(MemoryContext context, ResourceOwner res, int nevents)
 {
     WaitEventSet *set;
     char       *data;
     Size        sz = 0;
 
+    if (res)
+        ResourceOwnerEnlargeWESs(res);
+
     /*
      * Use MAXALIGN size/alignment to guarantee that later uses of memory are
      * aligned correctly. E.g. epoll_event might need 8 byte alignment on some
@@ -591,6 +597,11 @@ CreateWaitEventSet(MemoryContext context, int nevents)
     StaticAssertStmt(WSA_INVALID_EVENT == NULL, "");
 #endif
 
+    /* Register this wait event set if requested */
+    set->resowner = res;
+    if (res)
+        ResourceOwnerRememberWES(set->resowner, set);
+
     return set;
 }
 
@@ -632,6 +643,9 @@ FreeWaitEventSet(WaitEventSet *set)
     }
 #endif
 
+    if (set->resowner != NULL)
+        ResourceOwnerForgetWES(set->resowner, set);
+
     pfree(set);
 }
 
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index ef1d5baf01..30edc8e83a 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -69,7 +69,7 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
     {
         WaitEventSet *new_event_set;
 
-        new_event_set = CreateWaitEventSet(TopMemoryContext, 2);
+        new_event_set = CreateWaitEventSet(TopMemoryContext, NULL, 2);
         AddWaitEventToSet(new_event_set, WL_LATCH_SET, PGINVALID_SOCKET,
                           MyLatch, NULL);
         AddWaitEventToSet(new_event_set, WL_POSTMASTER_DEATH, PGINVALID_SOCKET,
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index bce021e100..802b79a660 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -126,6 +126,7 @@ typedef struct ResourceOwnerData
     ResourceArray filearr;        /* open temporary files */
     ResourceArray dsmarr;        /* dynamic shmem segments */
     ResourceArray jitarr;        /* JIT contexts */
+    ResourceArray wesarr;        /* wait event sets */
 
     /* We can remember up to MAX_RESOWNER_LOCKS references to local locks. */
     int            nlocks;            /* number of owned locks */
@@ -171,6 +172,7 @@ static void PrintTupleDescLeakWarning(TupleDesc tupdesc);
 static void PrintSnapshotLeakWarning(Snapshot snapshot);
 static void PrintFileLeakWarning(File file);
 static void PrintDSMLeakWarning(dsm_segment *seg);
+static void PrintWESLeakWarning(WaitEventSet *events);
 
 
 /*****************************************************************************
@@ -440,6 +442,7 @@ ResourceOwnerCreate(ResourceOwner parent, const char *name)
     ResourceArrayInit(&(owner->filearr), FileGetDatum(-1));
     ResourceArrayInit(&(owner->dsmarr), PointerGetDatum(NULL));
     ResourceArrayInit(&(owner->jitarr), PointerGetDatum(NULL));
+    ResourceArrayInit(&(owner->wesarr), PointerGetDatum(NULL));
 
     return owner;
 }
@@ -549,6 +552,16 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
 
             jit_release_context(context);
         }
+
+        /* Ditto for wait event sets */
+        while (ResourceArrayGetAny(&(owner->wesarr), &foundres))
+        {
+            WaitEventSet *event = (WaitEventSet *) DatumGetPointer(foundres);
+
+            if (isCommit)
+                PrintWESLeakWarning(event);
+            FreeWaitEventSet(event);
+        }
     }
     else if (phase == RESOURCE_RELEASE_LOCKS)
     {
@@ -697,6 +710,7 @@ ResourceOwnerDelete(ResourceOwner owner)
     Assert(owner->filearr.nitems == 0);
     Assert(owner->dsmarr.nitems == 0);
     Assert(owner->jitarr.nitems == 0);
+    Assert(owner->wesarr.nitems == 0);
     Assert(owner->nlocks == 0 || owner->nlocks == MAX_RESOWNER_LOCKS + 1);
 
     /*
@@ -724,6 +738,7 @@ ResourceOwnerDelete(ResourceOwner owner)
     ResourceArrayFree(&(owner->filearr));
     ResourceArrayFree(&(owner->dsmarr));
     ResourceArrayFree(&(owner->jitarr));
+    ResourceArrayFree(&(owner->wesarr));
 
     pfree(owner);
 }
@@ -1301,3 +1316,55 @@ ResourceOwnerForgetJIT(ResourceOwner owner, Datum handle)
         elog(ERROR, "JIT context %p is not owned by resource owner %s",
              DatumGetPointer(handle), owner->name);
 }
+
+/*
+ * wait event set reference array.
+ *
+ * This is separate from actually inserting an entry because if we run out
+ * of memory, it's critical to do so *before* acquiring the resource.
+ */
+void
+ResourceOwnerEnlargeWESs(ResourceOwner owner)
+{
+    ResourceArrayEnlarge(&(owner->wesarr));
+}
+
+/*
+ * Remember that a wait event set is owned by a ResourceOwner
+ *
+ * Caller must have previously done ResourceOwnerEnlargeWESs()
+ */
+void
+ResourceOwnerRememberWES(ResourceOwner owner, WaitEventSet *events)
+{
+    ResourceArrayAdd(&(owner->wesarr), PointerGetDatum(events));
+}
+
+/*
+ * Forget that a wait event set is owned by a ResourceOwner
+ */
+void
+ResourceOwnerForgetWES(ResourceOwner owner, WaitEventSet *events)
+{
+    /*
+     * XXXX: There's no property to show as an identier of a wait event set,
+     * use its pointer instead.
+     */
+    if (!ResourceArrayRemove(&(owner->wesarr), PointerGetDatum(events)))
+        elog(ERROR, "wait event set %p is not owned by resource owner %s",
+             events, owner->name);
+}
+
+/*
+ * Debugging subroutine
+ */
+static void
+PrintWESLeakWarning(WaitEventSet *events)
+{
+    /*
+     * XXXX: There's no property to show as an identier of a wait event set,
+     * use its pointer instead.
+     */
+    elog(WARNING, "wait event set leak: %p still referenced",
+         events);
+}
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index a4bcb48874..838845af01 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -101,6 +101,7 @@
 #define LATCH_H
 
 #include <signal.h>
+#include "utils/resowner.h"
 
 /*
  * Latch structure should be treated as opaque and only accessed through
@@ -162,7 +163,8 @@ extern void DisownLatch(volatile Latch *latch);
 extern void SetLatch(volatile Latch *latch);
 extern void ResetLatch(volatile Latch *latch);
 
-extern WaitEventSet *CreateWaitEventSet(MemoryContext context, int nevents);
+extern WaitEventSet *CreateWaitEventSet(MemoryContext context,
+                                        ResourceOwner res, int nevents);
 extern void FreeWaitEventSet(WaitEventSet *set);
 extern int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd,
                   Latch *latch, void *user_data);
diff --git a/src/include/utils/resowner_private.h b/src/include/utils/resowner_private.h
index a6e8eb71ab..3c06e4c3f8 100644
--- a/src/include/utils/resowner_private.h
+++ b/src/include/utils/resowner_private.h
@@ -18,6 +18,7 @@
 
 #include "storage/dsm.h"
 #include "storage/fd.h"
+#include "storage/latch.h"
 #include "storage/lock.h"
 #include "utils/catcache.h"
 #include "utils/plancache.h"
@@ -95,4 +96,11 @@ extern void ResourceOwnerRememberJIT(ResourceOwner owner,
 extern void ResourceOwnerForgetJIT(ResourceOwner owner,
                        Datum handle);
 
+/* support for wait event set management */
+extern void ResourceOwnerEnlargeWESs(ResourceOwner owner);
+extern void ResourceOwnerRememberWES(ResourceOwner owner,
+                         WaitEventSet *);
+extern void ResourceOwnerForgetWES(ResourceOwner owner,
+                       WaitEventSet *);
+
 #endif                            /* RESOWNER_PRIVATE_H */
-- 
2.16.3

From 0b3b692e677f7fd19f618582412acf9d12231bb2 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 19 Oct 2017 17:23:51 +0900
Subject: [PATCH 2/3] core side modification

---
 contrib/postgres_fdw/expected/postgres_fdw.out | 100 +++++-----
 src/backend/commands/explain.c                 |  17 ++
 src/backend/executor/Makefile                  |   2 +-
 src/backend/executor/execAsync.c               | 145 ++++++++++++++
 src/backend/executor/nodeAppend.c              | 262 +++++++++++++++++++++----
 src/backend/executor/nodeForeignscan.c         |  22 ++-
 src/backend/nodes/copyfuncs.c                  |   2 +
 src/backend/nodes/outfuncs.c                   |   2 +
 src/backend/nodes/readfuncs.c                  |   2 +
 src/backend/optimizer/plan/createplan.c        |  68 ++++++-
 src/backend/postmaster/pgstat.c                |   3 +
 src/backend/utils/adt/ruleutils.c              |   8 +-
 src/include/executor/execAsync.h               |  23 +++
 src/include/executor/executor.h                |   1 +
 src/include/executor/nodeForeignscan.h         |   3 +
 src/include/foreign/fdwapi.h                   |  11 ++
 src/include/nodes/execnodes.h                  |  18 +-
 src/include/nodes/plannodes.h                  |   7 +
 src/include/pgstat.h                           |   3 +-
 19 files changed, 603 insertions(+), 96 deletions(-)
 create mode 100644 src/backend/executor/execAsync.c
 create mode 100644 src/include/executor/execAsync.h

diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index bb6b1a8fdf..248aa73c0b 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -6968,12 +6968,13 @@ select * from bar where f1 in (select f1 from foo) for update;
                      Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                      Group Key: foo.f1
                      ->  Append
-                           ->  Seq Scan on public.foo
-                                 Output: foo.ctid, foo.*, foo.tableoid, foo.f1
-                           ->  Foreign Scan on public.foo2
+                           Async subplans: 1 
+                           ->  Async Foreign Scan on public.foo2
                                  Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(23 rows)
+                           ->  Seq Scan on public.foo
+                                 Output: foo.ctid, foo.*, foo.tableoid, foo.f1
+(29 rows)
 
 select * from bar where f1 in (select f1 from foo) for update;
  f1 | f2 
@@ -7006,12 +7007,13 @@ select * from bar where f1 in (select f1 from foo) for share;
                      Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                      Group Key: foo.f1
                      ->  Append
-                           ->  Seq Scan on public.foo
-                                 Output: foo.ctid, foo.*, foo.tableoid, foo.f1
-                           ->  Foreign Scan on public.foo2
+                           Async subplans: 1 
+                           ->  Async Foreign Scan on public.foo2
                                  Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(23 rows)
+                           ->  Seq Scan on public.foo
+                                 Output: foo.ctid, foo.*, foo.tableoid, foo.f1
+(29 rows)
 
 select * from bar where f1 in (select f1 from foo) for share;
  f1 | f2 
@@ -7043,9 +7045,8 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
                      Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                      Group Key: foo.f1
                      ->  Append
-                           ->  Seq Scan on public.foo
-                                 Output: foo.ctid, foo.*, foo.tableoid, foo.f1
-                           ->  Foreign Scan on public.foo2
+                           Async subplans: 1 
+                           ->  Async Foreign Scan on public.foo2
                                  Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
    ->  Hash Join
@@ -7061,12 +7062,13 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
                      Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                      Group Key: foo.f1
                      ->  Append
-                           ->  Seq Scan on public.foo
-                                 Output: foo.ctid, foo.*, foo.tableoid, foo.f1
-                           ->  Foreign Scan on public.foo2
+                           Async subplans: 1 
+                           ->  Async Foreign Scan on public.foo2
                                  Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(39 rows)
+                           ->  Seq Scan on public.foo
+                                 Output: foo.ctid, foo.*, foo.tableoid, foo.f1
+(41 rows)
 
 update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
 select tableoid::regclass, * from bar order by 1,2;
@@ -7096,14 +7098,11 @@ where bar.f1 = ss.f1;
          Output: bar.f1, (bar.f2 + 100), bar.ctid, (ROW(foo.f1))
          Hash Cond: (foo.f1 = bar.f1)
          ->  Append
-               ->  Seq Scan on public.foo
-                     Output: ROW(foo.f1), foo.f1
-               ->  Foreign Scan on public.foo2
+               Async subplans: 2 
+               ->  Async Foreign Scan on public.foo2
                      Output: ROW(foo2.f1), foo2.f1
                      Remote SQL: SELECT f1 FROM public.loct1
-               ->  Seq Scan on public.foo foo_1
-                     Output: ROW((foo_1.f1 + 3)), (foo_1.f1 + 3)
-               ->  Foreign Scan on public.foo2 foo2_1
+               ->  Async Foreign Scan on public.foo2 foo2_1
                      Output: ROW((foo2_1.f1 + 3)), (foo2_1.f1 + 3)
                      Remote SQL: SELECT f1 FROM public.loct1
          ->  Hash
@@ -7123,17 +7122,18 @@ where bar.f1 = ss.f1;
                Output: (ROW(foo.f1)), foo.f1
                Sort Key: foo.f1
                ->  Append
-                     ->  Seq Scan on public.foo
-                           Output: ROW(foo.f1), foo.f1
-                     ->  Foreign Scan on public.foo2
+                     Async subplans: 2 
+                     ->  Async Foreign Scan on public.foo2
                            Output: ROW(foo2.f1), foo2.f1
                            Remote SQL: SELECT f1 FROM public.loct1
-                     ->  Seq Scan on public.foo foo_1
-                           Output: ROW((foo_1.f1 + 3)), (foo_1.f1 + 3)
-                     ->  Foreign Scan on public.foo2 foo2_1
+                     ->  Async Foreign Scan on public.foo2 foo2_1
                            Output: ROW((foo2_1.f1 + 3)), (foo2_1.f1 + 3)
                            Remote SQL: SELECT f1 FROM public.loct1
-(45 rows)
+                     ->  Seq Scan on public.foo
+                           Output: ROW(foo.f1), foo.f1
+                     ->  Seq Scan on public.foo foo_1
+                           Output: ROW((foo_1.f1 + 3)), (foo_1.f1 + 3)
+(47 rows)
 
 update bar set f2 = f2 + 100
 from
@@ -8155,11 +8155,12 @@ SELECT t1.a,t2.b,t3.c FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) INNER J
  Sort
    Sort Key: t1.a, t3.c
    ->  Append
-         ->  Foreign Scan
+         Async subplans: 2 
+         ->  Async Foreign Scan
                Relations: ((public.ftprt1_p1 t1) INNER JOIN (public.ftprt2_p1 t2)) INNER JOIN (public.ftprt1_p1 t3)
-         ->  Foreign Scan
+         ->  Async Foreign Scan
                Relations: ((public.ftprt1_p2 t1) INNER JOIN (public.ftprt2_p2 t2)) INNER JOIN (public.ftprt1_p2 t3)
-(7 rows)
+(8 rows)
 
 SELECT t1.a,t2.b,t3.c FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) INNER JOIN fprt1 t3 ON (t2.b = t3.a) WHERE
t1.a% 25 =0 ORDER BY 1,2,3; 
   a  |  b  |  c   
@@ -8178,9 +8179,10 @@ SELECT t1.a,t2.b,t2.c FROM fprt1 t1 LEFT JOIN (SELECT * FROM fprt2 WHERE a < 10)
  Sort
    Sort Key: t1.a, ftprt2_p1.b, ftprt2_p1.c
    ->  Append
-         ->  Foreign Scan
+         Async subplans: 1 
+         ->  Async Foreign Scan
                Relations: (public.ftprt1_p1 t1) LEFT JOIN (public.ftprt2_p1 fprt2)
-(5 rows)
+(6 rows)
 
 SELECT t1.a,t2.b,t2.c FROM fprt1 t1 LEFT JOIN (SELECT * FROM fprt2 WHERE a < 10) t2 ON (t1.a = t2.b and t1.b = t2.a)
WHEREt1.a < 10 ORDER BY 1,2,3;
 
  a | b |  c   
@@ -8200,11 +8202,12 @@ SELECT t1,t2 FROM fprt1 t1 JOIN fprt2 t2 ON (t1.a = t2.b and t1.b = t2.a) WHERE
  Sort
    Sort Key: ((t1.*)::fprt1), ((t2.*)::fprt2)
    ->  Append
-         ->  Foreign Scan
+         Async subplans: 2 
+         ->  Async Foreign Scan
                Relations: (public.ftprt1_p1 t1) INNER JOIN (public.ftprt2_p1 t2)
-         ->  Foreign Scan
+         ->  Async Foreign Scan
                Relations: (public.ftprt1_p2 t1) INNER JOIN (public.ftprt2_p2 t2)
-(7 rows)
+(8 rows)
 
 SELECT t1,t2 FROM fprt1 t1 JOIN fprt2 t2 ON (t1.a = t2.b and t1.b = t2.a) WHERE t1.a % 25 =0 ORDER BY 1,2;
        t1       |       t2       
@@ -8223,11 +8226,12 @@ SELECT t1.a,t1.b FROM fprt1 t1, LATERAL (SELECT t2.a, t2.b FROM fprt2 t2 WHERE t
  Sort
    Sort Key: t1.a, t1.b
    ->  Append
-         ->  Foreign Scan
+         Async subplans: 2 
+         ->  Async Foreign Scan
                Relations: (public.ftprt1_p1 t1) INNER JOIN (public.ftprt2_p1 t2)
-         ->  Foreign Scan
+         ->  Async Foreign Scan
                Relations: (public.ftprt1_p2 t1) INNER JOIN (public.ftprt2_p2 t2)
-(7 rows)
+(8 rows)
 
 SELECT t1.a,t1.b FROM fprt1 t1, LATERAL (SELECT t2.a, t2.b FROM fprt2 t2 WHERE t1.a = t2.b AND t1.b = t2.a) q WHERE
t1.a%25= 0 ORDER BY 1,2;
 
   a  |  b  
@@ -8309,10 +8313,11 @@ SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 O
          Group Key: fpagg_tab_p1.a
          Filter: (avg(fpagg_tab_p1.b) < '22'::numeric)
          ->  Append
-               ->  Foreign Scan on fpagg_tab_p1
-               ->  Foreign Scan on fpagg_tab_p2
-               ->  Foreign Scan on fpagg_tab_p3
-(9 rows)
+               Async subplans: 3 
+               ->  Async Foreign Scan on fpagg_tab_p1
+               ->  Async Foreign Scan on fpagg_tab_p2
+               ->  Async Foreign Scan on fpagg_tab_p3
+(10 rows)
 
 -- Plan with partitionwise aggregates is enabled
 SET enable_partitionwise_aggregate TO true;
@@ -8323,13 +8328,14 @@ SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 O
  Sort
    Sort Key: fpagg_tab_p1.a
    ->  Append
-         ->  Foreign Scan
+         Async subplans: 3 
+         ->  Async Foreign Scan
                Relations: Aggregate on (public.fpagg_tab_p1 pagg_tab)
-         ->  Foreign Scan
+         ->  Async Foreign Scan
                Relations: Aggregate on (public.fpagg_tab_p2 pagg_tab)
-         ->  Foreign Scan
+         ->  Async Foreign Scan
                Relations: Aggregate on (public.fpagg_tab_p3 pagg_tab)
-(9 rows)
+(10 rows)
 
 SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
  a  | sum  | min | count 
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 73d94b7235..09c5327cb4 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -83,6 +83,7 @@ static void show_sort_keys(SortState *sortstate, List *ancestors,
                ExplainState *es);
 static void show_merge_append_keys(MergeAppendState *mstate, List *ancestors,
                        ExplainState *es);
+static void show_append_info(AppendState *astate, ExplainState *es);
 static void show_agg_keys(AggState *astate, List *ancestors,
               ExplainState *es);
 static void show_grouping_sets(PlanState *planstate, Agg *agg,
@@ -1168,6 +1169,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
         }
         if (plan->parallel_aware)
             appendStringInfoString(es->str, "Parallel ");
+        if (plan->async_capable)
+            appendStringInfoString(es->str, "Async ");
         appendStringInfoString(es->str, pname);
         es->indent++;
     }
@@ -1690,6 +1693,11 @@ ExplainNode(PlanState *planstate, List *ancestors,
         case T_Hash:
             show_hash_info(castNode(HashState, planstate), es);
             break;
+
+        case T_Append:
+            show_append_info(castNode(AppendState, planstate), es);
+            break;
+
         default:
             break;
     }
@@ -2027,6 +2035,15 @@ show_merge_append_keys(MergeAppendState *mstate, List *ancestors,
                          ancestors, es);
 }
 
+static void
+show_append_info(AppendState *astate, ExplainState *es)
+{
+    Append *plan = (Append *) astate->ps.plan;
+
+    if (plan->nasyncplans > 0)
+        ExplainPropertyInteger("Async subplans", "", plan->nasyncplans, es);
+}
+
 /*
  * Show the grouping keys for an Agg node.
  */
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index cc09895fa5..8ad2adfe1c 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -12,7 +12,7 @@ subdir = src/backend/executor
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = execAmi.o execCurrent.o execExpr.o execExprInterp.o \
+OBJS = execAmi.o execAsync.o execCurrent.o execExpr.o execExprInterp.o \
        execGrouping.o execIndexing.o execJunk.o \
        execMain.o execParallel.o execPartition.o execProcnode.o \
        execReplication.o execScan.o execSRF.o execTuples.o \
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
new file mode 100644
index 0000000000..db477e2cf6
--- /dev/null
+++ b/src/backend/executor/execAsync.c
@@ -0,0 +1,145 @@
+/*-------------------------------------------------------------------------
+ *
+ * execAsync.c
+ *      Support routines for asynchronous execution.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *      src/backend/executor/execAsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "executor/execAsync.h"
+#include "executor/nodeAppend.h"
+#include "executor/nodeForeignscan.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+
+void ExecAsyncSetState(PlanState *pstate, AsyncState status)
+{
+    pstate->asyncstate = status;
+}
+
+bool
+ExecAsyncConfigureWait(WaitEventSet *wes, PlanState *node,
+                       void *data, bool reinit)
+{
+    switch (nodeTag(node))
+    {
+    case T_ForeignScanState:
+        return ExecForeignAsyncConfigureWait((ForeignScanState *)node,
+                                             wes, data, reinit);
+        break;
+    default:
+            elog(ERROR, "unrecognized node type: %d",
+                (int) nodeTag(node));
+    }
+}
+
+/*
+ * struct for memory context callback argument used in ExecAsyncEventWait
+ */
+typedef struct {
+    int **p_refind;
+    int *p_refindsize;
+} ExecAsync_mcbarg;
+
+/*
+ * callback function to reset static variables pointing to the memory in
+ * TopTransactionContext in ExecAsyncEventWait.
+ */
+static void ExecAsyncMemoryContextCallback(void *arg)
+{
+    /* arg is the address of the variable refind in ExecAsyncEventWait */
+    ExecAsync_mcbarg *mcbarg = (ExecAsync_mcbarg *) arg;
+    *mcbarg->p_refind = NULL;
+    *mcbarg->p_refindsize = 0;
+}
+
+#define EVENT_BUFFER_SIZE 16
+
+Bitmapset *
+ExecAsyncEventWait(PlanState **nodes, Bitmapset *waitnodes, long timeout)
+{
+    static int *refind = NULL;
+    static int refindsize = 0;
+    WaitEventSet *wes;
+    WaitEvent   occurred_event[EVENT_BUFFER_SIZE];
+    int noccurred = 0;
+    Bitmapset *fired_events = NULL;
+    int i;
+    int n;
+
+    n = bms_num_members(waitnodes);
+    wes = CreateWaitEventSet(TopTransactionContext,
+                             TopTransactionResourceOwner, n);
+    if (refindsize < n)
+    {
+        if (refindsize == 0)
+            refindsize = EVENT_BUFFER_SIZE; /* XXX */
+        while (refindsize < n)
+            refindsize *= 2;
+        if (refind)
+            refind = (int *) repalloc(refind, refindsize * sizeof(int));
+        else
+        {
+            static ExecAsync_mcbarg mcb_arg =
+                { &refind, &refindsize };
+            static MemoryContextCallback mcb =
+                { ExecAsyncMemoryContextCallback, (void *)&mcb_arg, NULL };
+            MemoryContext oldctxt =
+                MemoryContextSwitchTo(TopTransactionContext);
+
+            /*
+             * refind points to a memory block in
+             * TopTransactionContext. Register a callback to reset it.
+             */
+            MemoryContextRegisterResetCallback(TopTransactionContext, &mcb);
+            refind = (int *) palloc(refindsize * sizeof(int));
+            MemoryContextSwitchTo(oldctxt);
+        }
+    }
+
+    n = 0;
+    for (i = bms_next_member(waitnodes, -1) ; i >= 0 ;
+         i = bms_next_member(waitnodes, i))
+    {
+        refind[i] = i;
+        if (ExecAsyncConfigureWait(wes, nodes[i], refind + i, true))
+            n++;
+    }
+
+    if (n == 0)
+    {
+        FreeWaitEventSet(wes);
+        return NULL;
+    }
+
+    noccurred = WaitEventSetWait(wes, timeout, occurred_event,
+                                 EVENT_BUFFER_SIZE,
+                                 WAIT_EVENT_ASYNC_WAIT);
+    FreeWaitEventSet(wes);
+    if (noccurred == 0)
+        return NULL;
+
+    for (i = 0 ; i < noccurred ; i++)
+    {
+        WaitEvent *w = &occurred_event[i];
+
+        if ((w->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) != 0)
+        {
+            int n = *(int*)w->user_data;
+
+            fired_events = bms_add_member(fired_events, n);
+        }
+    }
+
+    return fired_events;
+}
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 6bc3e470bf..ed8612dd37 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -60,6 +60,7 @@
 #include "executor/execdebug.h"
 #include "executor/execPartition.h"
 #include "executor/nodeAppend.h"
+#include "executor/execAsync.h"
 #include "miscadmin.h"
 
 /* Shared state for parallel-aware Append. */
@@ -81,6 +82,7 @@ struct ParallelAppendState
 #define NO_MATCHING_SUBPLANS        -2
 
 static TupleTableSlot *ExecAppend(PlanState *pstate);
+static TupleTableSlot *ExecAppendAsync(PlanState *pstate);
 static bool choose_next_subplan_locally(AppendState *node);
 static bool choose_next_subplan_for_leader(AppendState *node);
 static bool choose_next_subplan_for_worker(AppendState *node);
@@ -104,13 +106,14 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
     PlanState **appendplanstates;
     Bitmapset  *validsubplans;
     int            nplans;
+    int            nasyncplans;
     int            firstvalid;
     int            i,
                 j;
     ListCell   *lc;
 
     /* check for unsupported flags */
-    Assert(!(eflags & EXEC_FLAG_MARK));
+    Assert(!(eflags & (EXEC_FLAG_MARK | EXEC_FLAG_ASYNC)));
 
     /*
      * Lock the non-leaf tables in the partition tree controlled by this node.
@@ -123,10 +126,15 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
      */
     appendstate->ps.plan = (Plan *) node;
     appendstate->ps.state = estate;
-    appendstate->ps.ExecProcNode = ExecAppend;
+
+    /* choose appropriate version of Exec function */
+    if (node->nasyncplans == 0)
+        appendstate->ps.ExecProcNode = ExecAppend;
+    else
+        appendstate->ps.ExecProcNode = ExecAppendAsync;
 
     /* Let choose_next_subplan_* function handle setting the first subplan */
-    appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
+    appendstate->as_whichsyncplan = INVALID_SUBPLAN_INDEX;
 
     /* If run-time partition pruning is enabled, then set that up now */
     if (node->part_prune_infos != NIL)
@@ -159,7 +167,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
              */
             if (bms_is_empty(validsubplans))
             {
-                appendstate->as_whichplan = NO_MATCHING_SUBPLANS;
+                appendstate->as_whichsyncplan = NO_MATCHING_SUBPLANS;
 
                 /* Mark the first as valid so that it's initialized below */
                 validsubplans = bms_make_singleton(0);
@@ -213,11 +221,20 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
      */
     j = i = 0;
     firstvalid = nplans;
+    nasyncplans = 0;
     foreach(lc, node->appendplans)
     {
         if (bms_is_member(i, validsubplans))
         {
             Plan       *initNode = (Plan *) lfirst(lc);
+            int            sub_eflags = eflags;
+
+            /* Let async-capable subplans run asynchronously */
+            if (i < node->nasyncplans)
+            {
+                sub_eflags |= EXEC_FLAG_ASYNC;
+                nasyncplans++;
+            }
 
             /*
              * Record the lowest appendplans index which is a valid partial
@@ -226,7 +243,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
             if (i >= node->first_partial_plan && j < firstvalid)
                 firstvalid = j;
 
-            appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
+            appendplanstates[j++] = ExecInitNode(initNode, estate, sub_eflags);
         }
         i++;
     }
@@ -235,6 +252,21 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
     appendstate->appendplans = appendplanstates;
     appendstate->as_nplans = nplans;
 
+    /* fill in async stuff */
+    appendstate->as_nasyncplans = nasyncplans;
+    appendstate->as_syncdone = (nasyncplans == nplans);
+
+    if (appendstate->as_nasyncplans)
+    {
+        appendstate->as_asyncresult = (TupleTableSlot **)
+            palloc0(node->nasyncplans * sizeof(TupleTableSlot *));
+
+        /* initially, all async requests need a request */
+        for (i = 0; i < appendstate->as_nasyncplans; ++i)
+            appendstate->as_needrequest =
+                bms_add_member(appendstate->as_needrequest, i);
+    }
+
     /*
      * Miscellaneous initialization
      */
@@ -258,21 +290,23 @@ ExecAppend(PlanState *pstate)
 {
     AppendState *node = castNode(AppendState, pstate);
 
-    if (node->as_whichplan < 0)
+    if (node->as_whichsyncplan < 0)
     {
         /*
          * If no subplan has been chosen, we must choose one before
          * proceeding.
          */
-        if (node->as_whichplan == INVALID_SUBPLAN_INDEX &&
+        if (node->as_whichsyncplan == INVALID_SUBPLAN_INDEX &&
             !node->choose_next_subplan(node))
             return ExecClearTuple(node->ps.ps_ResultTupleSlot);
 
         /* Nothing to do if there are no matching subplans */
-        else if (node->as_whichplan == NO_MATCHING_SUBPLANS)
+        else if (node->as_whichsyncplan == NO_MATCHING_SUBPLANS)
             return ExecClearTuple(node->ps.ps_ResultTupleSlot);
     }
 
+    Assert(node->as_nasyncplans == 0);
+
     for (;;)
     {
         PlanState  *subnode;
@@ -283,8 +317,9 @@ ExecAppend(PlanState *pstate)
         /*
          * figure out which subplan we are currently processing
          */
-        Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
-        subnode = node->appendplans[node->as_whichplan];
+        Assert(node->as_whichsyncplan >= 0 &&
+               node->as_whichsyncplan < node->as_nplans);
+        subnode = node->appendplans[node->as_whichsyncplan];
 
         /*
          * get a tuple from the subplan
@@ -307,6 +342,156 @@ ExecAppend(PlanState *pstate)
     }
 }
 
+static TupleTableSlot *
+ExecAppendAsync(PlanState *pstate)
+{
+    AppendState *node = castNode(AppendState, pstate);
+    Bitmapset *needrequest;
+    int    i;
+
+    Assert(node->as_nasyncplans > 0);
+
+    if (node->as_nasyncresult > 0)
+    {
+        --node->as_nasyncresult;
+        return node->as_asyncresult[node->as_nasyncresult];
+    }
+
+    needrequest = node->as_needrequest;
+    node->as_needrequest = NULL;
+    while ((i = bms_first_member(needrequest)) >= 0)
+    {
+        TupleTableSlot *slot;
+        PlanState *subnode = node->appendplans[i];
+
+        slot = ExecProcNode(subnode);
+        if (subnode->asyncstate == AS_AVAILABLE)
+        {
+            if (!TupIsNull(slot))
+            {
+                node->as_asyncresult[node->as_nasyncresult++] = slot;
+                node->as_needrequest = bms_add_member(node->as_needrequest, i);
+            }
+        }
+        else
+            node->as_pending_async = bms_add_member(node->as_pending_async, i);
+    }
+    bms_free(needrequest);
+
+    for (;;)
+    {
+        TupleTableSlot *result;
+
+        /* return now if a result is available */
+        if (node->as_nasyncresult > 0)
+        {
+            --node->as_nasyncresult;
+            return node->as_asyncresult[node->as_nasyncresult];
+        }
+
+        while (!bms_is_empty(node->as_pending_async))
+        {
+            long timeout = node->as_syncdone ? -1 : 0;
+            Bitmapset *fired;
+            int i;
+
+            fired = ExecAsyncEventWait(node->appendplans,
+                                       node->as_pending_async,
+                                       timeout);
+            Assert(!node->as_syncdone || !bms_is_empty(fired));
+
+            while ((i = bms_first_member(fired)) >= 0)
+            {
+                TupleTableSlot *slot;
+                PlanState *subnode = node->appendplans[i];
+                slot = ExecProcNode(subnode);
+                if (subnode->asyncstate == AS_AVAILABLE)
+                {
+                    if (!TupIsNull(slot))
+                    {
+                        node->as_asyncresult[node->as_nasyncresult++] = slot;
+                        node->as_needrequest =
+                            bms_add_member(node->as_needrequest, i);
+                    }
+                    node->as_pending_async =
+                        bms_del_member(node->as_pending_async, i);
+                }
+            }
+            bms_free(fired);
+
+            /* return now if a result is available */
+            if (node->as_nasyncresult > 0)
+            {
+                --node->as_nasyncresult;
+                return node->as_asyncresult[node->as_nasyncresult];
+            }
+
+            if (!node->as_syncdone)
+                break;
+        }
+
+        /*
+         * If there is no asynchronous activity still pending and the
+         * synchronous activity is also complete, we're totally done scanning
+         * this node.  Otherwise, we're done with the asynchronous stuff but
+         * must continue scanning the synchronous children.
+         */
+        if (node->as_syncdone)
+        {
+            Assert(bms_is_empty(node->as_pending_async));
+            return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+        }
+
+        /*
+         * get a tuple from the subplan
+         */
+        
+        if (node->as_whichsyncplan < 0)
+        {
+            /*
+             * If no subplan has been chosen, we must choose one before
+             * proceeding.
+             */
+            if (node->as_whichsyncplan == INVALID_SUBPLAN_INDEX &&
+                !node->choose_next_subplan(node))
+                return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+
+            /* Nothing to do if there are no matching subplans */
+            else if (node->as_whichsyncplan == NO_MATCHING_SUBPLANS)
+                return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+        }
+
+        result = ExecProcNode(node->appendplans[node->as_whichsyncplan]);
+
+        if (!TupIsNull(result))
+        {
+            /*
+             * If the subplan gave us something then return it as-is. We do
+             * NOT make use of the result slot that was set up in
+             * ExecInitAppend; there's no need for it.
+             */
+            return result;
+        }
+
+        /*
+         * Go on to the "next" subplan in the appropriate direction. If no
+         * more subplans, return the empty slot set up for us by
+         * ExecInitAppend, unless there are async plans we have yet to finish.
+         */
+        if (!node->choose_next_subplan(node))
+        {
+            node->as_syncdone = true;
+            if (bms_is_empty(node->as_pending_async))
+            {
+                Assert(bms_is_empty(node->as_needrequest));
+                return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+            }
+        }
+
+        /* Else loop back and try to get a tuple from the new subplan */
+    }
+}
+
 /* ----------------------------------------------------------------
  *        ExecEndAppend
  *
@@ -353,6 +538,15 @@ ExecReScanAppend(AppendState *node)
         node->as_valid_subplans = NULL;
     }
 
+    /* Reset async state. */
+    for (i = 0; i < node->as_nasyncplans; ++i)
+    {
+        ExecShutdownNode(node->appendplans[i]);
+        node->as_needrequest = bms_add_member(node->as_needrequest, i);
+    }
+    node->as_nasyncresult = 0;
+    node->as_syncdone = (node->as_nasyncplans == node->as_nplans);
+
     for (i = 0; i < node->as_nplans; i++)
     {
         PlanState  *subnode = node->appendplans[i];
@@ -373,7 +567,7 @@ ExecReScanAppend(AppendState *node)
     }
 
     /* Let choose_next_subplan_* function handle setting the first subplan */
-    node->as_whichplan = INVALID_SUBPLAN_INDEX;
+    node->as_whichsyncplan = INVALID_SUBPLAN_INDEX;
 }
 
 /* ----------------------------------------------------------------
@@ -461,7 +655,7 @@ ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
 static bool
 choose_next_subplan_locally(AppendState *node)
 {
-    int            whichplan = node->as_whichplan;
+    int            whichplan = node->as_whichsyncplan;
     int            nextplan;
 
     /* We should never be called when there are no subplans */
@@ -494,7 +688,7 @@ choose_next_subplan_locally(AppendState *node)
     if (nextplan < 0)
         return false;
 
-    node->as_whichplan = nextplan;
+    node->as_whichsyncplan = nextplan;
 
     return true;
 }
@@ -516,19 +710,19 @@ choose_next_subplan_for_leader(AppendState *node)
     Assert(ScanDirectionIsForward(node->ps.state->es_direction));
 
     /* We should never be called when there are no subplans */
-    Assert(node->as_whichplan != NO_MATCHING_SUBPLANS);
+    Assert(node->as_whichsyncplan != NO_MATCHING_SUBPLANS);
 
     LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
 
-    if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
+    if (node->as_whichsyncplan != INVALID_SUBPLAN_INDEX)
     {
         /* Mark just-completed subplan as finished. */
-        node->as_pstate->pa_finished[node->as_whichplan] = true;
+        node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
     }
     else
     {
         /* Start with last subplan. */
-        node->as_whichplan = node->as_nplans - 1;
+        node->as_whichsyncplan = node->as_nplans - 1;
 
         /*
          * If we've yet to determine the valid subplans for these parameters
@@ -549,12 +743,12 @@ choose_next_subplan_for_leader(AppendState *node)
     }
 
     /* Loop until we find a subplan to execute. */
-    while (pstate->pa_finished[node->as_whichplan])
+    while (pstate->pa_finished[node->as_whichsyncplan])
     {
-        if (node->as_whichplan == 0)
+        if (node->as_whichsyncplan == 0)
         {
             pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
-            node->as_whichplan = INVALID_SUBPLAN_INDEX;
+            node->as_whichsyncplan = INVALID_SUBPLAN_INDEX;
             LWLockRelease(&pstate->pa_lock);
             return false;
         }
@@ -563,12 +757,12 @@ choose_next_subplan_for_leader(AppendState *node)
          * We needn't pay attention to as_valid_subplans here as all invalid
          * plans have been marked as finished.
          */
-        node->as_whichplan--;
+        node->as_whichsyncplan--;
     }
 
     /* If non-partial, immediately mark as finished. */
-    if (node->as_whichplan < node->as_first_partial_plan)
-        node->as_pstate->pa_finished[node->as_whichplan] = true;
+    if (node->as_whichsyncplan < node->as_first_partial_plan)
+        node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
 
     LWLockRelease(&pstate->pa_lock);
 
@@ -597,13 +791,13 @@ choose_next_subplan_for_worker(AppendState *node)
     Assert(ScanDirectionIsForward(node->ps.state->es_direction));
 
     /* We should never be called when there are no subplans */
-    Assert(node->as_whichplan != NO_MATCHING_SUBPLANS);
+    Assert(node->as_whichsyncplan != NO_MATCHING_SUBPLANS);
 
     LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
 
     /* Mark just-completed subplan as finished. */
-    if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
-        node->as_pstate->pa_finished[node->as_whichplan] = true;
+    if (node->as_whichsyncplan != INVALID_SUBPLAN_INDEX)
+        node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
 
     /*
      * If we've yet to determine the valid subplans for these parameters then
@@ -625,7 +819,7 @@ choose_next_subplan_for_worker(AppendState *node)
     }
 
     /* Save the plan from which we are starting the search. */
-    node->as_whichplan = pstate->pa_next_plan;
+    node->as_whichsyncplan = pstate->pa_next_plan;
 
     /* Loop until we find a valid subplan to execute. */
     while (pstate->pa_finished[pstate->pa_next_plan])
@@ -639,7 +833,7 @@ choose_next_subplan_for_worker(AppendState *node)
             /* Advance to the next valid plan. */
             pstate->pa_next_plan = nextplan;
         }
-        else if (node->as_whichplan > node->as_first_partial_plan)
+        else if (node->as_whichsyncplan > node->as_first_partial_plan)
         {
             /*
              * Try looping back to the first valid partial plan, if there is
@@ -648,7 +842,7 @@ choose_next_subplan_for_worker(AppendState *node)
             nextplan = bms_next_member(node->as_valid_subplans,
                                        node->as_first_partial_plan - 1);
             pstate->pa_next_plan =
-                nextplan < 0 ? node->as_whichplan : nextplan;
+                nextplan < 0 ? node->as_whichsyncplan : nextplan;
         }
         else
         {
@@ -656,10 +850,10 @@ choose_next_subplan_for_worker(AppendState *node)
              * At last plan, and either there are no partial plans or we've
              * tried them all.  Arrange to bail out.
              */
-            pstate->pa_next_plan = node->as_whichplan;
+            pstate->pa_next_plan = node->as_whichsyncplan;
         }
 
-        if (pstate->pa_next_plan == node->as_whichplan)
+        if (pstate->pa_next_plan == node->as_whichsyncplan)
         {
             /* We've tried everything! */
             pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
@@ -669,7 +863,7 @@ choose_next_subplan_for_worker(AppendState *node)
     }
 
     /* Pick the plan we found, and advance pa_next_plan one more time. */
-    node->as_whichplan = pstate->pa_next_plan;
+    node->as_whichsyncplan = pstate->pa_next_plan;
     pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
                                            pstate->pa_next_plan);
 
@@ -696,8 +890,8 @@ choose_next_subplan_for_worker(AppendState *node)
     }
 
     /* If non-partial, immediately mark as finished. */
-    if (node->as_whichplan < node->as_first_partial_plan)
-        node->as_pstate->pa_finished[node->as_whichplan] = true;
+    if (node->as_whichsyncplan < node->as_first_partial_plan)
+        node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
 
     LWLockRelease(&pstate->pa_lock);
 
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index a2a28b7ec2..915deb7080 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -123,7 +123,6 @@ ExecForeignScan(PlanState *pstate)
                     (ExecScanRecheckMtd) ForeignRecheck);
 }
 
-
 /* ----------------------------------------------------------------
  *        ExecInitForeignScan
  * ----------------------------------------------------------------
@@ -147,6 +146,10 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
     scanstate->ss.ps.plan = (Plan *) node;
     scanstate->ss.ps.state = estate;
     scanstate->ss.ps.ExecProcNode = ExecForeignScan;
+    scanstate->ss.ps.asyncstate = AS_AVAILABLE;
+
+    if ((eflags & EXEC_FLAG_ASYNC) != 0)
+        scanstate->fs_async = true;
 
     /*
      * Miscellaneous initialization
@@ -387,3 +390,20 @@ ExecShutdownForeignScan(ForeignScanState *node)
     if (fdwroutine->ShutdownForeignScan)
         fdwroutine->ShutdownForeignScan(node);
 }
+
+/* ----------------------------------------------------------------
+ *        ExecAsyncForeignScanConfigureWait
+ *
+ *        In async mode, configure for a wait
+ * ----------------------------------------------------------------
+ */
+bool
+ExecForeignAsyncConfigureWait(ForeignScanState *node, WaitEventSet *wes,
+                              void *caller_data, bool reinit)
+{
+    FdwRoutine *fdwroutine = node->fdwroutine;
+
+    Assert(fdwroutine->ForeignAsyncConfigureWait != NULL);
+    return fdwroutine->ForeignAsyncConfigureWait(node, wes,
+                                                 caller_data, reinit);
+}
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 7c045a7afe..8304dd5b17 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -246,6 +246,8 @@ _copyAppend(const Append *from)
     COPY_NODE_FIELD(appendplans);
     COPY_SCALAR_FIELD(first_partial_plan);
     COPY_NODE_FIELD(part_prune_infos);
+    COPY_SCALAR_FIELD(nasyncplans);
+    COPY_SCALAR_FIELD(referent);
 
     return newnode;
 }
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 1da9d7ed15..ed655f4ccb 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -403,6 +403,8 @@ _outAppend(StringInfo str, const Append *node)
     WRITE_NODE_FIELD(appendplans);
     WRITE_INT_FIELD(first_partial_plan);
     WRITE_NODE_FIELD(part_prune_infos);
+    WRITE_INT_FIELD(nasyncplans);
+    WRITE_INT_FIELD(referent);
 }
 
 static void
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 2826cec2f8..fb4ae251de 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -1652,6 +1652,8 @@ _readAppend(void)
     READ_NODE_FIELD(appendplans);
     READ_INT_FIELD(first_partial_plan);
     READ_NODE_FIELD(part_prune_infos);
+    READ_INT_FIELD(nasyncplans);
+    READ_INT_FIELD(referent);
 
     READ_DONE();
 }
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 0317763f43..eda3420d02 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -211,7 +211,9 @@ static NamedTuplestoreScan *make_namedtuplestorescan(List *qptlist, List *qpqual
 static WorkTableScan *make_worktablescan(List *qptlist, List *qpqual,
                    Index scanrelid, int wtParam);
 static Append *make_append(List *appendplans, int first_partial_plan,
-            List *tlist, List *partitioned_rels, List *partpruneinfos);
+                           int nasyncplans,    int referent,
+                           List *tlist,
+                           List *partitioned_rels, List *partpruneinfos);
 static RecursiveUnion *make_recursive_union(List *tlist,
                      Plan *lefttree,
                      Plan *righttree,
@@ -294,6 +296,7 @@ static ModifyTable *make_modifytable(PlannerInfo *root,
                  List *rowMarks, OnConflictExpr *onconflict, int epqParam);
 static GatherMerge *create_gather_merge_plan(PlannerInfo *root,
                          GatherMergePath *best_path);
+static bool is_async_capable_path(Path *path);
 
 
 /*
@@ -1036,10 +1039,14 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path)
 {
     Append       *plan;
     List       *tlist = build_path_tlist(root, &best_path->path);
-    List       *subplans = NIL;
+    List       *asyncplans = NIL;
+    List       *syncplans = NIL;
     ListCell   *subpaths;
     RelOptInfo *rel = best_path->path.parent;
     List       *partpruneinfos = NIL;
+    int            nasyncplans = 0;
+    bool        first = true;
+    bool        referent_is_sync = true;
 
     /*
      * The subpaths list could be empty, if every child was proven empty by
@@ -1074,7 +1081,22 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path)
         /* Must insist that all children return the same tlist */
         subplan = create_plan_recurse(root, subpath, CP_EXACT_TLIST);
 
-        subplans = lappend(subplans, subplan);
+        /*
+         * Classify as async-capable or not. If we have decided to run the
+         * chidlren in parallel, we cannot any one of them run asynchronously.
+         */
+        if (!best_path->path.parallel_safe && is_async_capable_path(subpath))
+        {
+            subplan->async_capable = true;
+            asyncplans = lappend(asyncplans, subplan);
+            ++nasyncplans;
+            if (first)
+                referent_is_sync = false;
+        }
+        else
+            syncplans = lappend(syncplans, subplan);
+
+        first = false;
     }
 
     if (enable_partition_pruning &&
@@ -1117,9 +1139,10 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path)
      * parent-rel Vars it'll be asked to emit.
      */
 
-    plan = make_append(subplans, best_path->first_partial_path,
-                       tlist, best_path->partitioned_rels,
-                       partpruneinfos);
+    plan = make_append(list_concat(asyncplans, syncplans),
+                       best_path->first_partial_path, nasyncplans,
+                       referent_is_sync ? nasyncplans : 0, tlist,
+                       best_path->partitioned_rels, partpruneinfos);
 
     copy_generic_path_info(&plan->plan, (Path *) best_path);
 
@@ -5414,9 +5437,9 @@ make_foreignscan(List *qptlist,
 }
 
 static Append *
-make_append(List *appendplans, int first_partial_plan,
-            List *tlist, List *partitioned_rels,
-            List *partpruneinfos)
+make_append(List *appendplans, int first_partial_plan, int nasyncplans,
+            int referent, List *tlist,
+            List *partitioned_rels, List *partpruneinfos)
 {
     Append       *node = makeNode(Append);
     Plan       *plan = &node->plan;
@@ -5429,6 +5452,9 @@ make_append(List *appendplans, int first_partial_plan,
     node->appendplans = appendplans;
     node->first_partial_plan = first_partial_plan;
     node->part_prune_infos = partpruneinfos;
+    node->nasyncplans = nasyncplans;
+    node->referent = referent;
+
     return node;
 }
 
@@ -6773,3 +6799,27 @@ is_projection_capable_plan(Plan *plan)
     }
     return true;
 }
+
+/*
+ * is_projection_capable_path
+ *        Check whether a given Path node is async-capable.
+ */
+static bool
+is_async_capable_path(Path *path)
+{
+    switch (nodeTag(path))
+    {
+        case T_ForeignPath:
+            {
+                FdwRoutine *fdwroutine = path->parent->fdwroutine;
+
+                Assert(fdwroutine != NULL);
+                if (fdwroutine->IsForeignPathAsyncCapable != NULL &&
+                    fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path))
+                    return true;
+            }
+        default:
+            break;
+    }
+    return false;
+}
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 084573e77c..7aef97ca97 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3683,6 +3683,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
         case WAIT_EVENT_SYNC_REP:
             event_name = "SyncRep";
             break;
+        case WAIT_EVENT_ASYNC_WAIT:
+            event_name = "AsyncExecWait";
+            break;
             /* no default case, so that compiler will warn */
     }
 
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 065238b0fe..fe202cbfea 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -4513,7 +4513,7 @@ set_deparse_planstate(deparse_namespace *dpns, PlanState *ps)
     dpns->planstate = ps;
 
     /*
-     * We special-case Append and MergeAppend to pretend that the first child
+     * We special-case Append and MergeAppend to pretend that a specific child
      * plan is the OUTER referent; we have to interpret OUTER Vars in their
      * tlists according to one of the children, and the first one is the most
      * natural choice.  Likewise special-case ModifyTable to pretend that the
@@ -4521,7 +4521,11 @@ set_deparse_planstate(deparse_namespace *dpns, PlanState *ps)
      * lists containing references to non-target relations.
      */
     if (IsA(ps, AppendState))
-        dpns->outer_planstate = ((AppendState *) ps)->appendplans[0];
+    {
+        AppendState *aps = (AppendState *) ps;
+        Append *app = (Append *) ps->plan;
+        dpns->outer_planstate = aps->appendplans[app->referent];
+    }
     else if (IsA(ps, MergeAppendState))
         dpns->outer_planstate = ((MergeAppendState *) ps)->mergeplans[0];
     else if (IsA(ps, ModifyTableState))
diff --git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h
new file mode 100644
index 0000000000..5fd67d9004
--- /dev/null
+++ b/src/include/executor/execAsync.h
@@ -0,0 +1,23 @@
+/*--------------------------------------------------------------------
+ * execAsync.c
+ *        Support functions for asynchronous query execution
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *        src/backend/executor/execAsync.c
+ *--------------------------------------------------------------------
+ */
+#ifndef EXECASYNC_H
+#define EXECASYNC_H
+
+#include "nodes/execnodes.h"
+#include "storage/latch.h"
+
+extern void ExecAsyncSetState(PlanState *pstate, AsyncState status);
+extern bool ExecAsyncConfigureWait(WaitEventSet *wes, PlanState *node,
+                                   void *data, bool reinit);
+extern Bitmapset *ExecAsyncEventWait(PlanState **nodes, Bitmapset *waitnodes,
+                                     long timeout);
+#endif   /* EXECASYNC_H */
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index a7ea3c7d10..8e9d87669f 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -63,6 +63,7 @@
 #define EXEC_FLAG_WITH_OIDS        0x0020    /* force OIDs in returned tuples */
 #define EXEC_FLAG_WITHOUT_OIDS    0x0040    /* force no OIDs in returned tuples */
 #define EXEC_FLAG_WITH_NO_DATA    0x0080    /* rel scannability doesn't matter */
+#define EXEC_FLAG_ASYNC            0x0100    /* request async execution */
 
 
 /* Hook for plugins to get control in ExecutorStart() */
diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h
index ccb66be733..67abf8e52e 100644
--- a/src/include/executor/nodeForeignscan.h
+++ b/src/include/executor/nodeForeignscan.h
@@ -30,5 +30,8 @@ extern void ExecForeignScanReInitializeDSM(ForeignScanState *node,
 extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
                                 ParallelWorkerContext *pwcxt);
 extern void ExecShutdownForeignScan(ForeignScanState *node);
+extern bool ExecForeignAsyncConfigureWait(ForeignScanState *node,
+                                          WaitEventSet *wes,
+                                          void *caller_data, bool reinit);
 
 #endif                            /* NODEFOREIGNSCAN_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index c14eb546c6..c00e9621fb 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -168,6 +168,11 @@ typedef bool (*IsForeignScanParallelSafe_function) (PlannerInfo *root,
 typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root,
                                                             List *fdw_private,
                                                             RelOptInfo *child_rel);
+typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path);
+typedef bool (*ForeignAsyncConfigureWait_function) (ForeignScanState *node,
+                                                    WaitEventSet *wes,
+                                                    void *caller_data,
+                                                    bool reinit);
 
 /*
  * FdwRoutine is the struct returned by a foreign-data wrapper's handler
@@ -189,6 +194,7 @@ typedef struct FdwRoutine
     GetForeignPlan_function GetForeignPlan;
     BeginForeignScan_function BeginForeignScan;
     IterateForeignScan_function IterateForeignScan;
+    IterateForeignScan_function IterateForeignScanAsync;
     ReScanForeignScan_function ReScanForeignScan;
     EndForeignScan_function EndForeignScan;
 
@@ -241,6 +247,11 @@ typedef struct FdwRoutine
     InitializeDSMForeignScan_function InitializeDSMForeignScan;
     ReInitializeDSMForeignScan_function ReInitializeDSMForeignScan;
     InitializeWorkerForeignScan_function InitializeWorkerForeignScan;
+
+    /* Support functions for asynchronous execution */
+    IsForeignPathAsyncCapable_function IsForeignPathAsyncCapable;
+    ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait;
+
     ShutdownForeignScan_function ShutdownForeignScan;
 
     /* Support functions for path reparameterization. */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index da7f52cab0..56bfe3f442 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -905,6 +905,12 @@ typedef TupleTableSlot *(*ExecProcNodeMtd) (struct PlanState *pstate);
  * abstract superclass for all PlanState-type nodes.
  * ----------------
  */
+typedef enum AsyncState
+{
+    AS_AVAILABLE,
+    AS_WAITING
+} AsyncState;
+
 typedef struct PlanState
 {
     NodeTag        type;
@@ -953,6 +959,9 @@ typedef struct PlanState
      * descriptor, without encoding knowledge about all executor nodes.
      */
     TupleDesc    scandesc;
+
+    AsyncState    asyncstate;
+    int32        padding;            /* to keep alignment of derived types */
 } PlanState;
 
 /* ----------------
@@ -1087,14 +1096,20 @@ struct AppendState
     PlanState    ps;                /* its first field is NodeTag */
     PlanState **appendplans;    /* array of PlanStates for my inputs */
     int            as_nplans;
-    int            as_whichplan;
+    int            as_whichsyncplan; /* which sync plan is being executed  */
     int            as_first_partial_plan;    /* Index of 'appendplans' containing
                                          * the first partial plan */
+    int            as_nasyncplans;    /* # of async-capable children */
     ParallelAppendState *as_pstate; /* parallel coordination info */
     Size        pstate_len;        /* size of parallel coordination info */
     struct PartitionPruneState *as_prune_state;
     Bitmapset  *as_valid_subplans;
     bool        (*choose_next_subplan) (AppendState *);
+    bool        as_syncdone;    /* all synchronous plans done? */
+    Bitmapset  *as_needrequest;    /* async plans needing a new request */
+    Bitmapset  *as_pending_async;    /* pending async plans */
+    TupleTableSlot **as_asyncresult;    /* unreturned results of async plans */
+    int            as_nasyncresult;    /* # of valid entries in as_asyncresult */
 };
 
 /* ----------------
@@ -1643,6 +1658,7 @@ typedef struct ForeignScanState
     Size        pscan_len;        /* size of parallel coordination information */
     /* use struct pointer to avoid including fdwapi.h here */
     struct FdwRoutine *fdwroutine;
+    bool        fs_async;
     void       *fdw_state;        /* foreign-data wrapper can keep state here */
 } ForeignScanState;
 
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index f2dda82e66..8a64c037c9 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -139,6 +139,11 @@ typedef struct Plan
     bool        parallel_aware; /* engage parallel-aware logic? */
     bool        parallel_safe;    /* OK to use as part of parallel plan? */
 
+    /*
+     * information needed for asynchronous execution
+     */
+    bool        async_capable;  /* engage asyncronous execution logic? */
+
     /*
      * Common structural data for all Plan types.
      */
@@ -262,6 +267,8 @@ typedef struct Append
      * Mapping details for run-time subplan pruning, one per partitioned_rels
      */
     List       *part_prune_infos;
+    int            nasyncplans;    /* # of async plans, always at start of list */
+    int            referent;         /* index of inheritance tree referent */
 } Append;
 
 /* ----------------
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index be2f59239b..6f4583b46c 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -832,7 +832,8 @@ typedef enum
     WAIT_EVENT_REPLICATION_ORIGIN_DROP,
     WAIT_EVENT_REPLICATION_SLOT_DROP,
     WAIT_EVENT_SAFE_SNAPSHOT,
-    WAIT_EVENT_SYNC_REP
+    WAIT_EVENT_SYNC_REP,
+    WAIT_EVENT_ASYNC_WAIT
 } WaitEventIPC;
 
 /* ----------
-- 
2.16.3

From 072f6af8a2b394402e753a65569d64668e2cfe86 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 19 Oct 2017 17:24:07 +0900
Subject: [PATCH 3/3] async postgres_fdw

---
 contrib/postgres_fdw/connection.c              |  26 ++
 contrib/postgres_fdw/expected/postgres_fdw.out | 100 ++--
 contrib/postgres_fdw/postgres_fdw.c            | 619 ++++++++++++++++++++++---
 contrib/postgres_fdw/postgres_fdw.h            |   2 +
 contrib/postgres_fdw/sql/postgres_fdw.sql      |  20 +-
 5 files changed, 628 insertions(+), 139 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index fe4893a8e0..da7c826e4f 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -58,6 +58,7 @@ typedef struct ConnCacheEntry
     bool        invalidated;    /* true if reconnect is pending */
     uint32        server_hashvalue;    /* hash value of foreign server OID */
     uint32        mapping_hashvalue;    /* hash value of user mapping OID */
+    void        *storage;        /* connection specific storage */
 } ConnCacheEntry;
 
 /*
@@ -202,6 +203,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 
         elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
              entry->conn, server->servername, user->umid, user->userid);
+        entry->storage = NULL;
     }
 
     /*
@@ -215,6 +217,30 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
     return entry->conn;
 }
 
+/*
+ * Rerturns the connection specific storage for this user. Allocate with
+ * initsize if not exists.
+ */
+void *
+GetConnectionSpecificStorage(UserMapping *user, size_t initsize)
+{
+    bool        found;
+    ConnCacheEntry *entry;
+    ConnCacheKey key;
+
+    key = user->umid;
+    entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
+    Assert(found);
+
+    if (entry->storage == NULL)
+    {
+        entry->storage = MemoryContextAlloc(CacheMemoryContext, initsize);
+        memset(entry->storage, 0, initsize);
+    }
+
+    return entry->storage;
+}
+
 /*
  * Connect to remote server using specified server and user mapping properties.
  */
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 248aa73c0b..bb6b1a8fdf 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -6968,13 +6968,12 @@ select * from bar where f1 in (select f1 from foo) for update;
                      Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                      Group Key: foo.f1
                      ->  Append
-                           Async subplans: 1 
-                           ->  Async Foreign Scan on public.foo2
-                                 Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
-                                 Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
                            ->  Seq Scan on public.foo
                                  Output: foo.ctid, foo.*, foo.tableoid, foo.f1
-(29 rows)
+                           ->  Foreign Scan on public.foo2
+                                 Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
+                                 Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
+(23 rows)
 
 select * from bar where f1 in (select f1 from foo) for update;
  f1 | f2 
@@ -7007,13 +7006,12 @@ select * from bar where f1 in (select f1 from foo) for share;
                      Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                      Group Key: foo.f1
                      ->  Append
-                           Async subplans: 1 
-                           ->  Async Foreign Scan on public.foo2
-                                 Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
-                                 Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
                            ->  Seq Scan on public.foo
                                  Output: foo.ctid, foo.*, foo.tableoid, foo.f1
-(29 rows)
+                           ->  Foreign Scan on public.foo2
+                                 Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
+                                 Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
+(23 rows)
 
 select * from bar where f1 in (select f1 from foo) for share;
  f1 | f2 
@@ -7045,8 +7043,9 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
                      Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                      Group Key: foo.f1
                      ->  Append
-                           Async subplans: 1 
-                           ->  Async Foreign Scan on public.foo2
+                           ->  Seq Scan on public.foo
+                                 Output: foo.ctid, foo.*, foo.tableoid, foo.f1
+                           ->  Foreign Scan on public.foo2
                                  Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
    ->  Hash Join
@@ -7062,13 +7061,12 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
                      Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                      Group Key: foo.f1
                      ->  Append
-                           Async subplans: 1 
-                           ->  Async Foreign Scan on public.foo2
-                                 Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
-                                 Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
                            ->  Seq Scan on public.foo
                                  Output: foo.ctid, foo.*, foo.tableoid, foo.f1
-(41 rows)
+                           ->  Foreign Scan on public.foo2
+                                 Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
+                                 Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
+(39 rows)
 
 update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
 select tableoid::regclass, * from bar order by 1,2;
@@ -7098,11 +7096,14 @@ where bar.f1 = ss.f1;
          Output: bar.f1, (bar.f2 + 100), bar.ctid, (ROW(foo.f1))
          Hash Cond: (foo.f1 = bar.f1)
          ->  Append
-               Async subplans: 2 
-               ->  Async Foreign Scan on public.foo2
+               ->  Seq Scan on public.foo
+                     Output: ROW(foo.f1), foo.f1
+               ->  Foreign Scan on public.foo2
                      Output: ROW(foo2.f1), foo2.f1
                      Remote SQL: SELECT f1 FROM public.loct1
-               ->  Async Foreign Scan on public.foo2 foo2_1
+               ->  Seq Scan on public.foo foo_1
+                     Output: ROW((foo_1.f1 + 3)), (foo_1.f1 + 3)
+               ->  Foreign Scan on public.foo2 foo2_1
                      Output: ROW((foo2_1.f1 + 3)), (foo2_1.f1 + 3)
                      Remote SQL: SELECT f1 FROM public.loct1
          ->  Hash
@@ -7122,18 +7123,17 @@ where bar.f1 = ss.f1;
                Output: (ROW(foo.f1)), foo.f1
                Sort Key: foo.f1
                ->  Append
-                     Async subplans: 2 
-                     ->  Async Foreign Scan on public.foo2
-                           Output: ROW(foo2.f1), foo2.f1
-                           Remote SQL: SELECT f1 FROM public.loct1
-                     ->  Async Foreign Scan on public.foo2 foo2_1
-                           Output: ROW((foo2_1.f1 + 3)), (foo2_1.f1 + 3)
-                           Remote SQL: SELECT f1 FROM public.loct1
                      ->  Seq Scan on public.foo
                            Output: ROW(foo.f1), foo.f1
+                     ->  Foreign Scan on public.foo2
+                           Output: ROW(foo2.f1), foo2.f1
+                           Remote SQL: SELECT f1 FROM public.loct1
                      ->  Seq Scan on public.foo foo_1
                            Output: ROW((foo_1.f1 + 3)), (foo_1.f1 + 3)
-(47 rows)
+                     ->  Foreign Scan on public.foo2 foo2_1
+                           Output: ROW((foo2_1.f1 + 3)), (foo2_1.f1 + 3)
+                           Remote SQL: SELECT f1 FROM public.loct1
+(45 rows)
 
 update bar set f2 = f2 + 100
 from
@@ -8155,12 +8155,11 @@ SELECT t1.a,t2.b,t3.c FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) INNER J
  Sort
    Sort Key: t1.a, t3.c
    ->  Append
-         Async subplans: 2 
-         ->  Async Foreign Scan
+         ->  Foreign Scan
                Relations: ((public.ftprt1_p1 t1) INNER JOIN (public.ftprt2_p1 t2)) INNER JOIN (public.ftprt1_p1 t3)
-         ->  Async Foreign Scan
+         ->  Foreign Scan
                Relations: ((public.ftprt1_p2 t1) INNER JOIN (public.ftprt2_p2 t2)) INNER JOIN (public.ftprt1_p2 t3)
-(8 rows)
+(7 rows)
 
 SELECT t1.a,t2.b,t3.c FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) INNER JOIN fprt1 t3 ON (t2.b = t3.a) WHERE
t1.a% 25 =0 ORDER BY 1,2,3;
 
   a  |  b  |  c   
@@ -8179,10 +8178,9 @@ SELECT t1.a,t2.b,t2.c FROM fprt1 t1 LEFT JOIN (SELECT * FROM fprt2 WHERE a < 10)
  Sort
    Sort Key: t1.a, ftprt2_p1.b, ftprt2_p1.c
    ->  Append
-         Async subplans: 1 
-         ->  Async Foreign Scan
+         ->  Foreign Scan
                Relations: (public.ftprt1_p1 t1) LEFT JOIN (public.ftprt2_p1 fprt2)
-(6 rows)
+(5 rows)
 
 SELECT t1.a,t2.b,t2.c FROM fprt1 t1 LEFT JOIN (SELECT * FROM fprt2 WHERE a < 10) t2 ON (t1.a = t2.b and t1.b = t2.a)
WHEREt1.a < 10 ORDER BY 1,2,3;
 
  a | b |  c   
@@ -8202,12 +8200,11 @@ SELECT t1,t2 FROM fprt1 t1 JOIN fprt2 t2 ON (t1.a = t2.b and t1.b = t2.a) WHERE
  Sort
    Sort Key: ((t1.*)::fprt1), ((t2.*)::fprt2)
    ->  Append
-         Async subplans: 2 
-         ->  Async Foreign Scan
+         ->  Foreign Scan
                Relations: (public.ftprt1_p1 t1) INNER JOIN (public.ftprt2_p1 t2)
-         ->  Async Foreign Scan
+         ->  Foreign Scan
                Relations: (public.ftprt1_p2 t1) INNER JOIN (public.ftprt2_p2 t2)
-(8 rows)
+(7 rows)
 
 SELECT t1,t2 FROM fprt1 t1 JOIN fprt2 t2 ON (t1.a = t2.b and t1.b = t2.a) WHERE t1.a % 25 =0 ORDER BY 1,2;
        t1       |       t2       
@@ -8226,12 +8223,11 @@ SELECT t1.a,t1.b FROM fprt1 t1, LATERAL (SELECT t2.a, t2.b FROM fprt2 t2 WHERE t
  Sort
    Sort Key: t1.a, t1.b
    ->  Append
-         Async subplans: 2 
-         ->  Async Foreign Scan
+         ->  Foreign Scan
                Relations: (public.ftprt1_p1 t1) INNER JOIN (public.ftprt2_p1 t2)
-         ->  Async Foreign Scan
+         ->  Foreign Scan
                Relations: (public.ftprt1_p2 t1) INNER JOIN (public.ftprt2_p2 t2)
-(8 rows)
+(7 rows)
 
 SELECT t1.a,t1.b FROM fprt1 t1, LATERAL (SELECT t2.a, t2.b FROM fprt2 t2 WHERE t1.a = t2.b AND t1.b = t2.a) q WHERE
t1.a%25= 0 ORDER BY 1,2;
 
   a  |  b  
@@ -8313,11 +8309,10 @@ SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 O
          Group Key: fpagg_tab_p1.a
          Filter: (avg(fpagg_tab_p1.b) < '22'::numeric)
          ->  Append
-               Async subplans: 3 
-               ->  Async Foreign Scan on fpagg_tab_p1
-               ->  Async Foreign Scan on fpagg_tab_p2
-               ->  Async Foreign Scan on fpagg_tab_p3
-(10 rows)
+               ->  Foreign Scan on fpagg_tab_p1
+               ->  Foreign Scan on fpagg_tab_p2
+               ->  Foreign Scan on fpagg_tab_p3
+(9 rows)
 
 -- Plan with partitionwise aggregates is enabled
 SET enable_partitionwise_aggregate TO true;
@@ -8328,14 +8323,13 @@ SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 O
  Sort
    Sort Key: fpagg_tab_p1.a
    ->  Append
-         Async subplans: 3 
-         ->  Async Foreign Scan
+         ->  Foreign Scan
                Relations: Aggregate on (public.fpagg_tab_p1 pagg_tab)
-         ->  Async Foreign Scan
+         ->  Foreign Scan
                Relations: Aggregate on (public.fpagg_tab_p2 pagg_tab)
-         ->  Async Foreign Scan
+         ->  Foreign Scan
                Relations: Aggregate on (public.fpagg_tab_p3 pagg_tab)
-(10 rows)
+(9 rows)
 
 SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
  a  | sum  | min | count 
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 78b0f43ca8..8efbbf95a8 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -20,6 +20,8 @@
 #include "commands/defrem.h"
 #include "commands/explain.h"
 #include "commands/vacuum.h"
+#include "executor/execAsync.h"
+#include "executor/nodeForeignscan.h"
 #include "foreign/fdwapi.h"
 #include "funcapi.h"
 #include "miscadmin.h"
@@ -34,6 +36,7 @@
 #include "optimizer/var.h"
 #include "optimizer/tlist.h"
 #include "parser/parsetree.h"
+#include "pgstat.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
 #include "utils/lsyscache.h"
@@ -53,6 +56,9 @@ PG_MODULE_MAGIC;
 /* If no remote estimates, assume a sort costs 20% extra */
 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
 
+/* Retrive PgFdwScanState struct from ForeginScanState */
+#define GetPgFdwScanState(n) ((PgFdwScanState *)(n)->fdw_state)
+
 /*
  * Indexes of FDW-private information stored in fdw_private lists.
  *
@@ -119,11 +125,28 @@ enum FdwDirectModifyPrivateIndex
     FdwDirectModifyPrivateSetProcessed
 };
 
+/*
+ * Connection private area structure.
+ */
+typedef struct PgFdwConnpriv
+{
+    ForeignScanState   *leader;        /* leader node of this connection */
+    bool                busy;        /* true if this connection is busy */
+} PgFdwConnpriv;
+
+/* Execution state base type */
+typedef struct PgFdwState
+{
+    PGconn       *conn;            /* connection for the scan */
+    PgFdwConnpriv *connpriv;    /* connection private memory */
+} PgFdwState;
+
 /*
  * Execution state of a foreign scan using postgres_fdw.
  */
 typedef struct PgFdwScanState
 {
+    PgFdwState    s;                /* common structure */
     Relation    rel;            /* relcache entry for the foreign table. NULL
                                  * for a foreign join scan. */
     TupleDesc    tupdesc;        /* tuple descriptor of scan */
@@ -134,7 +157,7 @@ typedef struct PgFdwScanState
     List       *retrieved_attrs;    /* list of retrieved attribute numbers */
 
     /* for remote query execution */
-    PGconn       *conn;            /* connection for the scan */
+    bool        result_ready;
     unsigned int cursor_number; /* quasi-unique ID for my cursor */
     bool        cursor_exists;    /* have we created the cursor? */
     int            numParams;        /* number of parameters passed to query */
@@ -150,6 +173,12 @@ typedef struct PgFdwScanState
     /* batch-level state, for optimizing rewinds and avoiding useless fetch */
     int            fetch_ct_2;        /* Min(# of fetches done, 2) */
     bool        eof_reached;    /* true if last fetch reached EOF */
+    bool        run_async;        /* true if run asynchronously */
+    bool        inqueue;        /* true if this node is in waiter queue */
+    ForeignScanState *waiter;    /* Next node to run a query among nodes
+                                 * sharing the same connection */
+    ForeignScanState *last_waiter;    /* last waiting node in waiting queue.
+                                     * valid only on the leader node */
 
     /* working memory contexts */
     MemoryContext batch_cxt;    /* context holding current batch of tuples */
@@ -163,11 +192,11 @@ typedef struct PgFdwScanState
  */
 typedef struct PgFdwModifyState
 {
+    PgFdwState    s;                /* common structure */
     Relation    rel;            /* relcache entry for the foreign table */
     AttInMetadata *attinmeta;    /* attribute datatype conversion metadata */
 
     /* for remote query execution */
-    PGconn       *conn;            /* connection for the scan */
     char       *p_name;            /* name of prepared statement, if created */
 
     /* extracted fdw_private data */
@@ -190,6 +219,7 @@ typedef struct PgFdwModifyState
  */
 typedef struct PgFdwDirectModifyState
 {
+    PgFdwState    s;                /* common structure */
     Relation    rel;            /* relcache entry for the foreign table */
     AttInMetadata *attinmeta;    /* attribute datatype conversion metadata */
 
@@ -293,6 +323,7 @@ static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
 static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
 static void postgresReScanForeignScan(ForeignScanState *node);
 static void postgresEndForeignScan(ForeignScanState *node);
+static void postgresShutdownForeignScan(ForeignScanState *node);
 static void postgresAddForeignUpdateTargets(Query *parsetree,
                                 RangeTblEntry *target_rte,
                                 Relation target_relation);
@@ -358,6 +389,10 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
                              RelOptInfo *input_rel,
                              RelOptInfo *output_rel,
                              void *extra);
+static bool postgresIsForeignPathAsyncCapable(ForeignPath *path);
+static bool postgresForeignAsyncConfigureWait(ForeignScanState *node,
+                                              WaitEventSet *wes,
+                                              void *caller_data, bool reinit);
 
 /*
  * Helper functions
@@ -378,7 +413,9 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
                           EquivalenceClass *ec, EquivalenceMember *em,
                           void *arg);
 static void create_cursor(ForeignScanState *node);
-static void fetch_more_data(ForeignScanState *node);
+static void request_more_data(ForeignScanState *node);
+static void fetch_received_data(ForeignScanState *node);
+static void vacate_connection(PgFdwState *fdwconn, bool clear_queue);
 static void close_cursor(PGconn *conn, unsigned int cursor_number);
 static PgFdwModifyState *create_foreign_modify(EState *estate,
                       RangeTblEntry *rte,
@@ -469,6 +506,7 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
     routine->IterateForeignScan = postgresIterateForeignScan;
     routine->ReScanForeignScan = postgresReScanForeignScan;
     routine->EndForeignScan = postgresEndForeignScan;
+    routine->ShutdownForeignScan = postgresShutdownForeignScan;
 
     /* Functions for updating foreign tables */
     routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
@@ -505,6 +543,10 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
     /* Support functions for upper relation push-down */
     routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
 
+    /* Support functions for async execution */
+    routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable;
+    routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait;
+
     PG_RETURN_POINTER(routine);
 }
 
@@ -1355,12 +1397,22 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
      * Get connection to the foreign server.  Connection manager will
      * establish new connection if necessary.
      */
-    fsstate->conn = GetConnection(user, false);
+    fsstate->s.conn = GetConnection(user, false);
+    fsstate->s.connpriv = (PgFdwConnpriv *)
+        GetConnectionSpecificStorage(user, sizeof(PgFdwConnpriv));
+    fsstate->s.connpriv->leader = NULL;
+    fsstate->s.connpriv->busy = false;
+    fsstate->waiter = NULL;
+    fsstate->last_waiter = node;
 
     /* Assign a unique ID for my cursor */
-    fsstate->cursor_number = GetCursorNumber(fsstate->conn);
+    fsstate->cursor_number = GetCursorNumber(fsstate->s.conn);
     fsstate->cursor_exists = false;
 
+    /* Initialize async execution status */
+    fsstate->run_async = false;
+    fsstate->inqueue = false;
+
     /* Get private info created by planner functions. */
     fsstate->query = strVal(list_nth(fsplan->fdw_private,
                                      FdwScanPrivateSelectSql));
@@ -1408,40 +1460,250 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
                              &fsstate->param_values);
 }
 
+/*
+ * Async queue manipuration functions
+ */
+
+/*
+ * add_async_waiter:
+ *
+ * adds the node to the end of waiter queue
+ */
+static inline void
+add_async_waiter(ForeignScanState *node)
+{
+    PgFdwScanState   *fsstate = GetPgFdwScanState(node);
+    ForeignScanState *leader = fsstate->s.connpriv->leader;
+    PgFdwScanState   *leader_state;
+    PgFdwScanState   *last_waiter_state;
+
+    Assert(leader && leader != node);
+
+    /* do nothing if the node is already in the queue */
+    if (fsstate->inqueue)
+        return;
+
+    leader_state = GetPgFdwScanState(leader);
+    last_waiter_state = GetPgFdwScanState(leader_state->last_waiter);
+    last_waiter_state->waiter = node;
+    leader_state->last_waiter = node;
+    fsstate->inqueue = true;
+}
+
+/*
+ * move_to_next_waiter:
+ *
+ * Makes the first waiter be next leader
+ * Returns the new leader or NULL if there's no waiter.
+ */
+static inline ForeignScanState *
+move_to_next_waiter(ForeignScanState *node)
+{
+    PgFdwScanState *fsstate = GetPgFdwScanState(node);
+    ForeignScanState *ret = fsstate->waiter;
+
+    Assert(fsstate->s.connpriv->leader = node);
+    
+    if (ret)
+    {
+        PgFdwScanState *retstate = GetPgFdwScanState(ret);
+        fsstate->waiter = NULL;
+        retstate->last_waiter = fsstate->last_waiter;
+        retstate->inqueue = false;
+    }
+
+    fsstate->s.connpriv->leader = ret;
+
+    return ret;
+}
+
+/*
+ * remove the node from waiter queue
+ *
+ * This is a bit different from the two above in the sense that this can
+ * operate on connection leader. The result is absorbed when this is called on
+ * active leader.
+ *
+ * Returns true if the node was found.
+ */
+static inline bool
+remove_async_node(ForeignScanState *node)
+{
+    PgFdwScanState        *fsstate = GetPgFdwScanState(node);
+    ForeignScanState    *leader = fsstate->s.connpriv->leader;
+    PgFdwScanState        *leader_state;
+    ForeignScanState    *prev;
+    PgFdwScanState        *prev_state;
+    ForeignScanState    *cur;
+
+    /* no need to remove me */
+    if (!leader || !fsstate->inqueue)
+        return false;
+
+    leader_state = GetPgFdwScanState(leader);
+
+    /* Remove the leader node */
+    if (leader == node)
+    {
+        ForeignScanState    *next_leader;
+
+        if (leader_state->s.connpriv->busy)
+        {
+            /*
+             * this node is waiting for result, absorb the result first so
+             * that the following commands can be sent on the connection.
+             */
+            PgFdwScanState *leader_state = GetPgFdwScanState(leader);
+            PGconn *conn = leader_state->s.conn;
+
+            while(PQisBusy(conn))
+                PQclear(PQgetResult(conn));
+            
+            leader_state->s.connpriv->busy = false;
+        }
+
+        /* Make the first waiter the leader */
+        if (leader_state->waiter)
+        {
+            PgFdwScanState *next_leader_state;
+
+            next_leader = leader_state->waiter;
+            next_leader_state = GetPgFdwScanState(next_leader);
+
+            leader_state->s.connpriv->leader = next_leader;
+            next_leader_state->last_waiter = leader_state->last_waiter;
+        }
+        leader_state->waiter = NULL;
+
+        return true;
+    }
+
+    /*
+     * Just remove the node in queue
+     *
+     * This function is called on the shutdown path. We don't bother
+     * considering faster way to do this.
+     */
+    prev = leader;
+    prev_state = leader_state;
+    cur =  GetPgFdwScanState(prev)->waiter;
+    while (cur)
+    {
+        PgFdwScanState *curstate = GetPgFdwScanState(cur);
+
+        if (cur == node)
+        {
+            prev_state->waiter = curstate->waiter;
+            if (leader_state->last_waiter == cur)
+                leader_state->last_waiter = prev;
+            else
+                leader_state->last_waiter = cur;
+
+            fsstate->inqueue = false;
+
+            return true;
+        }
+        prev = cur;
+        prev_state = curstate;
+        cur = curstate->waiter;
+    }
+
+    return false;
+}
+
 /*
  * postgresIterateForeignScan
- *        Retrieve next row from the result set, or clear tuple slot to indicate
- *        EOF.
+ *        Retrieve next row from the result set.
+ *
+ *        For synchronous nodes, returns clear tuples slot to indicte EOF.
+ *
+ *        If the node is asynchronous one, clear tuple slot has two meanings.
+ *        If the caller receives clear tuple slot, asyncstate indicates wheter
+ *        the node is EOF (AS_AVAILABLE) or waiting for data to
+ *        come(AS_WAITING).
  */
 static TupleTableSlot *
 postgresIterateForeignScan(ForeignScanState *node)
 {
-    PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+    PgFdwScanState *fsstate = GetPgFdwScanState(node);
     TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
 
-    /*
-     * If this is the first call after Begin or ReScan, we need to create the
-     * cursor on the remote side.
-     */
-    if (!fsstate->cursor_exists)
-        create_cursor(node);
+    if (fsstate->next_tuple >= fsstate->num_tuples && !fsstate->eof_reached)
+    {
+        /* we've run out, get some more tuples */
+        if (!node->fs_async)
+        {
+            /* finish running query to send my command */
+            if (!fsstate->s.connpriv->busy)
+                vacate_connection((PgFdwState *)fsstate, false);
+                
+            request_more_data(node);
+
+            /*
+             * Fetch the result immediately. This executes the next waiter if
+             * any.
+             */
+            fetch_received_data(node);
+        }
+        else if (!fsstate->s.connpriv->busy)
+        {
+            /* If the connection is not busy, just send the request. */
+            request_more_data(node);
+        }
+        else if (fsstate->s.connpriv->leader == node)
+        {
+            bool available = true;
+
+            /* Check if the result is available */
+            if (PQisBusy(fsstate->s.conn))
+            {
+                int rc = WaitLatchOrSocket(NULL,
+                                           WL_SOCKET_READABLE | WL_TIMEOUT,
+                                           PQsocket(fsstate->s.conn), 0,
+                                           WAIT_EVENT_ASYNC_WAIT);
+                if (!(rc & WL_SOCKET_READABLE))
+                    available = false;
+            }
+
+            /* The next waiter is executed automatcically */
+            if (available)
+                fetch_received_data(node);
+        }
+        else if (fsstate->s.connpriv->leader)
+        {
+            /*
+             * Anyone else is waiting on this connection then add this node to
+             * waiting queue.
+             */
+            add_async_waiter(node);
+        }
+    }
 
     /*
-     * Get some more tuples, if we've run out.
+     * If we haven't received a result for the given node this time,
+     * return with no tuple to give way to another node.
      */
     if (fsstate->next_tuple >= fsstate->num_tuples)
     {
-        /* No point in another fetch if we already detected EOF, though. */
-        if (!fsstate->eof_reached)
-            fetch_more_data(node);
-        /* If we didn't get any tuples, must be end of data. */
-        if (fsstate->next_tuple >= fsstate->num_tuples)
-            return ExecClearTuple(slot);
+        if (fsstate->eof_reached)
+        {
+            fsstate->result_ready = true;
+            node->ss.ps.asyncstate = AS_AVAILABLE;
+        }
+        else
+        {
+            fsstate->result_ready = false;
+            node->ss.ps.asyncstate = AS_WAITING;
+        }
+            
+        return ExecClearTuple(slot);
     }
 
     /*
      * Return the next tuple.
      */
+    fsstate->result_ready = true;
+    node->ss.ps.asyncstate = AS_AVAILABLE;
     ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
                    slot,
                    InvalidBuffer,
@@ -1457,7 +1719,7 @@ postgresIterateForeignScan(ForeignScanState *node)
 static void
 postgresReScanForeignScan(ForeignScanState *node)
 {
-    PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+    PgFdwScanState *fsstate = GetPgFdwScanState(node);
     char        sql[64];
     PGresult   *res;
 
@@ -1465,6 +1727,8 @@ postgresReScanForeignScan(ForeignScanState *node)
     if (!fsstate->cursor_exists)
         return;
 
+    vacate_connection((PgFdwState *)fsstate, true);
+
     /*
      * If any internal parameters affecting this node have changed, we'd
      * better destroy and recreate the cursor.  Otherwise, rewinding it should
@@ -1493,9 +1757,9 @@ postgresReScanForeignScan(ForeignScanState *node)
      * We don't use a PG_TRY block here, so be careful not to throw error
      * without releasing the PGresult.
      */
-    res = pgfdw_exec_query(fsstate->conn, sql);
+    res = pgfdw_exec_query(fsstate->s.conn, sql);
     if (PQresultStatus(res) != PGRES_COMMAND_OK)
-        pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
+        pgfdw_report_error(ERROR, res, fsstate->s.conn, true, sql);
     PQclear(res);
 
     /* Now force a fresh FETCH. */
@@ -1513,7 +1777,7 @@ postgresReScanForeignScan(ForeignScanState *node)
 static void
 postgresEndForeignScan(ForeignScanState *node)
 {
-    PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+    PgFdwScanState *fsstate = GetPgFdwScanState(node);
 
     /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
     if (fsstate == NULL)
@@ -1521,15 +1785,31 @@ postgresEndForeignScan(ForeignScanState *node)
 
     /* Close the cursor if open, to prevent accumulation of cursors */
     if (fsstate->cursor_exists)
-        close_cursor(fsstate->conn, fsstate->cursor_number);
+        close_cursor(fsstate->s.conn, fsstate->cursor_number);
 
     /* Release remote connection */
-    ReleaseConnection(fsstate->conn);
-    fsstate->conn = NULL;
+    ReleaseConnection(fsstate->s.conn);
+    fsstate->s.conn = NULL;
 
     /* MemoryContexts will be deleted automatically. */
 }
 
+/*
+ * postgresShutdownForeignScan
+ *        Remove asynchrony stuff and cleanup garbage on the connection.
+ */
+static void
+postgresShutdownForeignScan(ForeignScanState *node)
+{
+    ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
+
+    if (plan->operation != CMD_SELECT)
+        return;
+
+    /* remove the node from waiting queue */
+    remove_async_node(node);
+}
+
 /*
  * postgresAddForeignUpdateTargets
  *        Add resjunk column(s) needed for update/delete on a foreign table
@@ -1753,6 +2033,9 @@ postgresExecForeignInsert(EState *estate,
     PGresult   *res;
     int            n_rows;
 
+    /* finish running query to send my command */
+    vacate_connection((PgFdwState *)fmstate, true);
+
     /* Set up the prepared statement on the remote server, if we didn't yet */
     if (!fmstate->p_name)
         prepare_foreign_modify(fmstate);
@@ -1763,14 +2046,14 @@ postgresExecForeignInsert(EState *estate,
     /*
      * Execute the prepared statement.
      */
-    if (!PQsendQueryPrepared(fmstate->conn,
+    if (!PQsendQueryPrepared(fmstate->s.conn,
                              fmstate->p_name,
                              fmstate->p_nums,
                              p_values,
                              NULL,
                              NULL,
                              0))
-        pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+        pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
 
     /*
      * Get the result, and check for success.
@@ -1778,10 +2061,10 @@ postgresExecForeignInsert(EState *estate,
      * We don't use a PG_TRY block here, so be careful not to throw error
      * without releasing the PGresult.
      */
-    res = pgfdw_get_result(fmstate->conn, fmstate->query);
+    res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
     if (PQresultStatus(res) !=
         (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-        pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+        pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
 
     /* Check number of rows affected, and fetch RETURNING tuple if any */
     if (fmstate->has_returning)
@@ -1819,6 +2102,9 @@ postgresExecForeignUpdate(EState *estate,
     PGresult   *res;
     int            n_rows;
 
+    /* finish running query to send my command */
+    vacate_connection((PgFdwState *)fmstate, true);
+
     /* Set up the prepared statement on the remote server, if we didn't yet */
     if (!fmstate->p_name)
         prepare_foreign_modify(fmstate);
@@ -1839,14 +2125,14 @@ postgresExecForeignUpdate(EState *estate,
     /*
      * Execute the prepared statement.
      */
-    if (!PQsendQueryPrepared(fmstate->conn,
+    if (!PQsendQueryPrepared(fmstate->s.conn,
                              fmstate->p_name,
                              fmstate->p_nums,
                              p_values,
                              NULL,
                              NULL,
                              0))
-        pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+        pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
 
     /*
      * Get the result, and check for success.
@@ -1854,10 +2140,10 @@ postgresExecForeignUpdate(EState *estate,
      * We don't use a PG_TRY block here, so be careful not to throw error
      * without releasing the PGresult.
      */
-    res = pgfdw_get_result(fmstate->conn, fmstate->query);
+    res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
     if (PQresultStatus(res) !=
         (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-        pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+        pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
 
     /* Check number of rows affected, and fetch RETURNING tuple if any */
     if (fmstate->has_returning)
@@ -1895,6 +2181,9 @@ postgresExecForeignDelete(EState *estate,
     PGresult   *res;
     int            n_rows;
 
+    /* finish running query to send my command */
+    vacate_connection((PgFdwState *)fmstate, true);
+
     /* Set up the prepared statement on the remote server, if we didn't yet */
     if (!fmstate->p_name)
         prepare_foreign_modify(fmstate);
@@ -1915,14 +2204,14 @@ postgresExecForeignDelete(EState *estate,
     /*
      * Execute the prepared statement.
      */
-    if (!PQsendQueryPrepared(fmstate->conn,
+    if (!PQsendQueryPrepared(fmstate->s.conn,
                              fmstate->p_name,
                              fmstate->p_nums,
                              p_values,
                              NULL,
                              NULL,
                              0))
-        pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+        pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
 
     /*
      * Get the result, and check for success.
@@ -1930,10 +2219,10 @@ postgresExecForeignDelete(EState *estate,
      * We don't use a PG_TRY block here, so be careful not to throw error
      * without releasing the PGresult.
      */
-    res = pgfdw_get_result(fmstate->conn, fmstate->query);
+    res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
     if (PQresultStatus(res) !=
         (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-        pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+        pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
 
     /* Check number of rows affected, and fetch RETURNING tuple if any */
     if (fmstate->has_returning)
@@ -2400,7 +2689,9 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
      * Get connection to the foreign server.  Connection manager will
      * establish new connection if necessary.
      */
-    dmstate->conn = GetConnection(user, false);
+    dmstate->s.conn = GetConnection(user, false);
+    dmstate->s.connpriv = (PgFdwConnpriv *)
+        GetConnectionSpecificStorage(user, sizeof(PgFdwConnpriv));
 
     /* Update the foreign-join-related fields. */
     if (fsplan->scan.scanrelid == 0)
@@ -2485,7 +2776,11 @@ postgresIterateDirectModify(ForeignScanState *node)
      * If this is the first call after Begin, execute the statement.
      */
     if (dmstate->num_tuples == -1)
+    {
+        /* finish running query to send my command */
+        vacate_connection((PgFdwState *)dmstate, true);
         execute_dml_stmt(node);
+    }
 
     /*
      * If the local query doesn't specify RETURNING, just clear tuple slot.
@@ -2532,8 +2827,8 @@ postgresEndDirectModify(ForeignScanState *node)
         PQclear(dmstate->result);
 
     /* Release remote connection */
-    ReleaseConnection(dmstate->conn);
-    dmstate->conn = NULL;
+    ReleaseConnection(dmstate->s.conn);
+    dmstate->s.conn = NULL;
 
     /* close the target relation. */
     if (dmstate->resultRel)
@@ -2656,6 +2951,7 @@ estimate_path_cost_size(PlannerInfo *root,
         List       *local_param_join_conds;
         StringInfoData sql;
         PGconn       *conn;
+        PgFdwConnpriv *connpriv;
         Selectivity local_sel;
         QualCost    local_cost;
         List       *fdw_scan_tlist = NIL;
@@ -2698,6 +2994,18 @@ estimate_path_cost_size(PlannerInfo *root,
 
         /* Get the remote estimate */
         conn = GetConnection(fpinfo->user, false);
+        connpriv = GetConnectionSpecificStorage(fpinfo->user,
+                                                sizeof(PgFdwConnpriv));
+        if (connpriv)
+        {
+            PgFdwState tmpstate;
+            tmpstate.conn = conn;
+            tmpstate.connpriv = connpriv;
+
+            /* finish running query to send my command */
+            vacate_connection(&tmpstate, true);
+        }
+
         get_remote_estimate(sql.data, conn, &rows, &width,
                             &startup_cost, &total_cost);
         ReleaseConnection(conn);
@@ -3061,11 +3369,11 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 static void
 create_cursor(ForeignScanState *node)
 {
-    PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+    PgFdwScanState *fsstate = GetPgFdwScanState(node);
     ExprContext *econtext = node->ss.ps.ps_ExprContext;
     int            numParams = fsstate->numParams;
     const char **values = fsstate->param_values;
-    PGconn       *conn = fsstate->conn;
+    PGconn       *conn = fsstate->s.conn;
     StringInfoData buf;
     PGresult   *res;
 
@@ -3128,50 +3436,121 @@ create_cursor(ForeignScanState *node)
 }
 
 /*
- * Fetch some more rows from the node's cursor.
+ * Sends the next request of the node. If the given node is different from the
+ * current connection leader, pushes it back to waiter queue and let the given
+ * node be the leader.
  */
 static void
-fetch_more_data(ForeignScanState *node)
+request_more_data(ForeignScanState *node)
 {
-    PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+    PgFdwScanState *fsstate = GetPgFdwScanState(node);
+    ForeignScanState *leader = fsstate->s.connpriv->leader;
+    PGconn       *conn = fsstate->s.conn;
+    char        sql[64];
+
+    /* must be non-busy */
+    Assert(!fsstate->s.connpriv->busy);
+    /* must be not-eof */
+    Assert(!fsstate->eof_reached);
+
+    /*
+     * If this is the first call after Begin or ReScan, we need to create the
+     * cursor on the remote side.
+     */
+    if (!fsstate->cursor_exists)
+        create_cursor(node);
+
+    snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+             fsstate->fetch_size, fsstate->cursor_number);
+
+    if (!PQsendQuery(conn, sql))
+        pgfdw_report_error(ERROR, NULL, conn, false, sql);
+
+    fsstate->s.connpriv->busy = true;
+
+    /* Let the node be the leader if it is different from current one */
+    if (leader != node)
+    {
+        /*
+         * If the connection leader exists, insert the node as the connection
+         * leader making the current leader be the first waiter.
+         */
+        if (leader != NULL)
+        {
+            remove_async_node(node);
+            fsstate->last_waiter = GetPgFdwScanState(leader)->last_waiter;
+            fsstate->waiter = leader;
+        }
+        fsstate->s.connpriv->leader = node;
+    }
+}
+
+/*
+ * Fetches received data and automatically send requests of the next waiter.
+ */
+static void
+fetch_received_data(ForeignScanState *node)
+{
+    PgFdwScanState *fsstate = GetPgFdwScanState(node);
     PGresult   *volatile res = NULL;
     MemoryContext oldcontext;
+    ForeignScanState *waiter;
+
+    /* I should be the current connection leader */
+    Assert(fsstate->s.connpriv->leader == node);
 
     /*
      * We'll store the tuples in the batch_cxt.  First, flush the previous
-     * batch.
+     * batch if no tuple is remaining
      */
-    fsstate->tuples = NULL;
-    MemoryContextReset(fsstate->batch_cxt);
+    if (fsstate->next_tuple >= fsstate->num_tuples)
+    {
+        fsstate->tuples = NULL;
+        fsstate->num_tuples = 0;
+        MemoryContextReset(fsstate->batch_cxt);
+    }
+    else if (fsstate->next_tuple > 0)
+    {
+        /* move the remaining tuples to the beginning of the store */
+        int n = 0;
+
+        while(fsstate->next_tuple < fsstate->num_tuples)
+            fsstate->tuples[n++] = fsstate->tuples[fsstate->next_tuple++];
+        fsstate->num_tuples = n;
+    }
+
     oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
 
     /* PGresult must be released before leaving this function. */
     PG_TRY();
     {
-        PGconn       *conn = fsstate->conn;
+        PGconn       *conn = fsstate->s.conn;
         char        sql[64];
-        int            numrows;
+        int            addrows;
+        size_t        newsize;
         int            i;
 
         snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
                  fsstate->fetch_size, fsstate->cursor_number);
 
-        res = pgfdw_exec_query(conn, sql);
+        res = pgfdw_get_result(conn, sql);
         /* On error, report the original query, not the FETCH. */
         if (PQresultStatus(res) != PGRES_TUPLES_OK)
             pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
 
         /* Convert the data into HeapTuples */
-        numrows = PQntuples(res);
-        fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
-        fsstate->num_tuples = numrows;
-        fsstate->next_tuple = 0;
+        addrows = PQntuples(res);
+        newsize = (fsstate->num_tuples + addrows) * sizeof(HeapTuple);
+        if (fsstate->tuples)
+            fsstate->tuples = (HeapTuple *) repalloc(fsstate->tuples, newsize);
+        else
+            fsstate->tuples = (HeapTuple *) palloc(newsize);
 
-        for (i = 0; i < numrows; i++)
+        for (i = 0; i < addrows; i++)
         {
             Assert(IsA(node->ss.ps.plan, ForeignScan));
 
-            fsstate->tuples[i] =
+            fsstate->tuples[fsstate->num_tuples + i] =
                 make_tuple_from_result_row(res, i,
                                            fsstate->rel,
                                            fsstate->attinmeta,
@@ -3181,26 +3560,76 @@ fetch_more_data(ForeignScanState *node)
         }
 
         /* Update fetch_ct_2 */
-        if (fsstate->fetch_ct_2 < 2)
+        if (fsstate->fetch_ct_2 < 2 && fsstate->next_tuple == 0)
             fsstate->fetch_ct_2++;
 
+        fsstate->next_tuple = 0;
+        fsstate->num_tuples += addrows;
+
         /* Must be EOF if we didn't get as many tuples as we asked for. */
-        fsstate->eof_reached = (numrows < fsstate->fetch_size);
+        fsstate->eof_reached = (addrows < fsstate->fetch_size);
 
         PQclear(res);
         res = NULL;
     }
     PG_CATCH();
     {
+        fsstate->s.connpriv->busy = false;
+
         if (res)
             PQclear(res);
         PG_RE_THROW();
     }
     PG_END_TRY();
 
+    fsstate->s.connpriv->busy = false;
+
+    /* let the first waiter be the next leader of this connection */
+    waiter = move_to_next_waiter(node);
+
+    /* send the next request if any */
+    if (waiter)
+        request_more_data(waiter);
+
     MemoryContextSwitchTo(oldcontext);
 }
 
+/*
+ * Vacate a connection so that this node can send the next query
+ */
+static void
+vacate_connection(PgFdwState *fdwstate, bool clear_queue)
+{
+    PgFdwConnpriv *connpriv = fdwstate->connpriv;
+    ForeignScanState *leader;
+
+    /* the connection is alrady available */
+    if (connpriv == NULL || connpriv->leader == NULL || !connpriv->busy)
+        return;
+
+    /*
+     * let the current connection leader read the result for the running query
+     */
+    leader = connpriv->leader;
+    fetch_received_data(leader);
+
+    /* let the first waiter be the next leader of this connection */
+    move_to_next_waiter(leader);
+
+    if (!clear_queue)
+        return;
+
+    /* Clear the waiting list */
+    while (leader)
+    {
+        PgFdwScanState *fsstate = GetPgFdwScanState(leader);
+
+        fsstate->last_waiter = NULL;
+        leader = fsstate->waiter;
+        fsstate->waiter = NULL;
+    }
+}
+
 /*
  * Force assorted GUC parameters to settings that ensure that we'll output
  * data values in a form that is unambiguous to the remote server.
@@ -3314,7 +3743,9 @@ create_foreign_modify(EState *estate,
     user = GetUserMapping(userid, table->serverid);
 
     /* Open connection; report that we'll create a prepared statement. */
-    fmstate->conn = GetConnection(user, true);
+    fmstate->s.conn = GetConnection(user, true);
+    fmstate->s.connpriv = (PgFdwConnpriv *)
+        GetConnectionSpecificStorage(user, sizeof(PgFdwConnpriv));
     fmstate->p_name = NULL;        /* prepared statement not made yet */
 
     /* Set up remote query information. */
@@ -3387,7 +3818,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 
     /* Construct name we'll use for the prepared statement. */
     snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
-             GetPrepStmtNumber(fmstate->conn));
+             GetPrepStmtNumber(fmstate->s.conn));
     p_name = pstrdup(prep_name);
 
     /*
@@ -3397,12 +3828,12 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
      * the prepared statements we use in this module are simple enough that
      * the remote server will make the right choices.
      */
-    if (!PQsendPrepare(fmstate->conn,
+    if (!PQsendPrepare(fmstate->s.conn,
                        p_name,
                        fmstate->query,
                        0,
                        NULL))
-        pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+        pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
 
     /*
      * Get the result, and check for success.
@@ -3410,9 +3841,9 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
      * We don't use a PG_TRY block here, so be careful not to throw error
      * without releasing the PGresult.
      */
-    res = pgfdw_get_result(fmstate->conn, fmstate->query);
+    res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
     if (PQresultStatus(res) != PGRES_COMMAND_OK)
-        pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+        pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
     PQclear(res);
 
     /* This action shows that the prepare has been done. */
@@ -3537,16 +3968,16 @@ finish_foreign_modify(PgFdwModifyState *fmstate)
          * We don't use a PG_TRY block here, so be careful not to throw error
          * without releasing the PGresult.
          */
-        res = pgfdw_exec_query(fmstate->conn, sql);
+        res = pgfdw_exec_query(fmstate->s.conn, sql);
         if (PQresultStatus(res) != PGRES_COMMAND_OK)
-            pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
+            pgfdw_report_error(ERROR, res, fmstate->s.conn, true, sql);
         PQclear(res);
         fmstate->p_name = NULL;
     }
 
     /* Release remote connection */
-    ReleaseConnection(fmstate->conn);
-    fmstate->conn = NULL;
+    ReleaseConnection(fmstate->s.conn);
+    fmstate->s.conn = NULL;
 }
 
 /*
@@ -3706,9 +4137,9 @@ execute_dml_stmt(ForeignScanState *node)
      * the desired result.  This allows us to avoid assuming that the remote
      * server has the same OIDs we do for the parameters' types.
      */
-    if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
+    if (!PQsendQueryParams(dmstate->s.conn, dmstate->query, numParams,
                            NULL, values, NULL, NULL, 0))
-        pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
+        pgfdw_report_error(ERROR, NULL, dmstate->s.conn, false, dmstate->query);
 
     /*
      * Get the result, and check for success.
@@ -3716,10 +4147,10 @@ execute_dml_stmt(ForeignScanState *node)
      * We don't use a PG_TRY block here, so be careful not to throw error
      * without releasing the PGresult.
      */
-    dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
+    dmstate->result = pgfdw_get_result(dmstate->s.conn, dmstate->query);
     if (PQresultStatus(dmstate->result) !=
         (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-        pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
+        pgfdw_report_error(ERROR, dmstate->result, dmstate->s.conn, true,
                            dmstate->query);
 
     /* Get the number of rows affected. */
@@ -5203,6 +5634,42 @@ postgresGetForeignJoinPaths(PlannerInfo *root,
     /* XXX Consider parameterized paths for the join relation */
 }
 
+static bool
+postgresIsForeignPathAsyncCapable(ForeignPath *path)
+{
+    return true;
+}
+
+
+/*
+ * Configure waiting event.
+ *
+ * Add an wait event only when the node is the connection leader. Elsewise
+ * another node on this connection is the leader.
+ */
+static bool
+postgresForeignAsyncConfigureWait(ForeignScanState *node, WaitEventSet *wes,
+                                  void *caller_data, bool reinit)
+{
+    PgFdwScanState *fsstate = GetPgFdwScanState(node);
+
+
+    /* If the caller didn't reinit, this event is already in event set */
+    if (!reinit)
+        return true;
+
+    if (fsstate->s.connpriv->leader == node)
+    {
+        AddWaitEventToSet(wes,
+                          WL_SOCKET_READABLE, PQsocket(fsstate->s.conn),
+                          NULL, caller_data);
+        return true;
+    }
+
+    return false;
+}
+
+
 /*
  * Assess whether the aggregation, grouping and having operations can be pushed
  * down to the foreign server.  As a side effect, save information we obtain in
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index a5d4011e8d..f344fb7f66 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -77,6 +77,7 @@ typedef struct PgFdwRelationInfo
     UserMapping *user;            /* only set in use_remote_estimate mode */
 
     int            fetch_size;        /* fetch size for this remote table */
+    bool        allow_prefetch;    /* true to allow overlapped fetching  */
 
     /*
      * Name of the relation while EXPLAINing ForeignScan. It is used for join
@@ -116,6 +117,7 @@ extern void reset_transmission_modes(int nestlevel);
 
 /* in connection.c */
 extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
+void *GetConnectionSpecificStorage(UserMapping *user, size_t initsize);
 extern void ReleaseConnection(PGconn *conn);
 extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 231b1e01a5..8ecc903c20 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -1617,25 +1617,25 @@ INSERT INTO b(aa) VALUES('bbb');
 INSERT INTO b(aa) VALUES('bbbb');
 INSERT INTO b(aa) VALUES('bbbbb');
 
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
 SELECT tableoid::regclass, * FROM b;
 SELECT tableoid::regclass, * FROM ONLY a;
 
 UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%';
 
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
 SELECT tableoid::regclass, * FROM b;
 SELECT tableoid::regclass, * FROM ONLY a;
 
 UPDATE b SET aa = 'new';
 
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
 SELECT tableoid::regclass, * FROM b;
 SELECT tableoid::regclass, * FROM ONLY a;
 
 UPDATE a SET aa = 'newtoo';
 
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
 SELECT tableoid::regclass, * FROM b;
 SELECT tableoid::regclass, * FROM ONLY a;
 
@@ -1677,12 +1677,12 @@ insert into bar2 values(4,44,44);
 insert into bar2 values(7,77,77);
 
 explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for update;
-select * from bar where f1 in (select f1 from foo) for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
 
 explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for share;
-select * from bar where f1 in (select f1 from foo) for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
 
 -- Check UPDATE with inherited target and an inherited source table
 explain (verbose, costs off)
@@ -1741,8 +1741,8 @@ explain (verbose, costs off)
 delete from foo where f1 < 5 returning *;
 delete from foo where f1 < 5 returning *;
 explain (verbose, costs off)
-update bar set f2 = f2 + 100 returning *;
-update bar set f2 = f2 + 100 returning *;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
 
 -- Test that UPDATE/DELETE with inherited target works with row-level triggers
 CREATE TRIGGER trig_row_before
-- 
2.16.3


pgsql-hackers by date:

Previous
From: Amit Langote
Date:
Subject: Re: postgres_fdw: Oddity in pushing down inherited UPDATE/DELETEjoins to remote servers
Next
From: Tatsuo Ishii
Date:
Subject: Re: Having query cache in core