Re: Asynchronous Append on postgres_fdw nodes. - Mailing list pgsql-hackers
| From | Kyotaro Horiguchi |
|---|---|
| Subject | Re: Asynchronous Append on postgres_fdw nodes. |
| Date | |
| Msg-id | 20201001.134331.402539330073512128.horikyota.ntt@gmail.com |
| In response to | Asynchronous Append on postgres_fdw nodes. (Kyotaro Horiguchi <horikyota.ntt@gmail.com>) |
| List | pgsql-hackers |
At Thu, 1 Oct 2020 12:56:02 +0900, Michael Paquier <michael@paquier.xyz> wrote in
> On Thu, Oct 01, 2020 at 11:16:53AM +0900, Kyotaro Horiguchi wrote:
> > Thanks. Since it starts all remote nodes before local ones, the
> > startup gain would be the shorter of the startup time of the fastest
> > remote node and the time required to start all local nodes. On top of
> > that, remote transfers benefit from asynchronous fetching.
>
> The patch fails to apply per the CF bot. For now, I have moved it to
> next CF, waiting on author.
Thanks! Rebased.
--
Kyotaro Horiguchi
NTT Open Source Software Center
From 09a38c30aed31673d3f9360a1853f5f99948f016 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Mon, 22 May 2017 12:42:58 +0900
Subject: [PATCH v7 1/3] Allow wait event set to be registered to resource
owner
In certain cases a WaitEventSet needs to be released via a resource
owner. This change adds resource owner tracking to WaitEventSet and
allows the creator of a WaitEventSet to specify a resource owner.
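As a minimal usage sketch against this patch's API (only "sock", an
already-connected socket, is hypothetical here):

    WaitEvent     event;
    WaitEventSet *wes;

    /* Tie the set's lifetime to the current resource owner. */
    wes = CreateWaitEventSet(CurrentMemoryContext, CurrentResourceOwner, 1);
    AddWaitEventToSet(wes, WL_SOCKET_READABLE, sock, NULL, NULL);
    (void) WaitEventSetWait(wes, -1 /* no timeout */, &event, 1, 0);
    FreeWaitEventSet(wes);      /* also unregisters the set from the owner */

If an error aborts the transaction before FreeWaitEventSet() is reached,
resource owner cleanup frees the set automatically; a set still registered
at commit draws a "wait event set leak" warning before being freed.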
---
src/backend/libpq/pqcomm.c | 2 +-
src/backend/postmaster/pgstat.c | 2 +-
src/backend/postmaster/syslogger.c | 2 +-
src/backend/storage/ipc/latch.c | 20 ++++++--
src/backend/utils/resowner/resowner.c | 67 +++++++++++++++++++++++++++
src/include/storage/latch.h | 4 +-
src/include/utils/resowner_private.h | 8 ++++
7 files changed, 98 insertions(+), 7 deletions(-)
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index ac986c0505..799fa5006d 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -218,7 +218,7 @@ pq_init(void)
(errmsg("could not set socket to nonblocking mode: %m")));
#endif
- FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
+ FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, NULL, 3);
AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
NULL, NULL);
AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index e6be2b7836..30020f8cda 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -4503,7 +4503,7 @@ PgstatCollectorMain(int argc, char *argv[])
pgStatDBHash = pgstat_read_statsfiles(InvalidOid, true, true);
/* Prepare to wait for our latch or data in our socket. */
- wes = CreateWaitEventSet(CurrentMemoryContext, 3);
+ wes = CreateWaitEventSet(CurrentMemoryContext, NULL, 3);
AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
AddWaitEventToSet(wes, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
AddWaitEventToSet(wes, WL_SOCKET_READABLE, pgStatSock, NULL, NULL);
diff --git a/src/backend/postmaster/syslogger.c b/src/backend/postmaster/syslogger.c
index ffcb54968f..a4de6d90e2 100644
--- a/src/backend/postmaster/syslogger.c
+++ b/src/backend/postmaster/syslogger.c
@@ -300,7 +300,7 @@ SysLoggerMain(int argc, char *argv[])
* syslog pipe, which implies that all other backends have exited
* (including the postmaster).
*/
- wes = CreateWaitEventSet(CurrentMemoryContext, 2);
+ wes = CreateWaitEventSet(CurrentMemoryContext, NULL, 2);
AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
#ifndef WIN32
AddWaitEventToSet(wes, WL_SOCKET_READABLE, syslogPipe[0], NULL, NULL);
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 63c6c97536..108a6127e9 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -57,6 +57,7 @@
#include "storage/pmsignal.h"
#include "storage/shmem.h"
#include "utils/memutils.h"
+#include "utils/resowner_private.h"
/*
* Select the fd readiness primitive to use. Normally the "most modern"
@@ -85,6 +86,8 @@ struct WaitEventSet
int nevents; /* number of registered events */
int nevents_space; /* maximum number of events in this set */
+ ResourceOwner resowner; /* Resource owner */
+
/*
* Array, of nevents_space length, storing the definition of events this
* set is waiting for.
@@ -257,7 +260,7 @@ InitializeLatchWaitSet(void)
Assert(LatchWaitSet == NULL);
/* Set up the WaitEventSet used by WaitLatch(). */
- LatchWaitSet = CreateWaitEventSet(TopMemoryContext, 2);
+ LatchWaitSet = CreateWaitEventSet(TopMemoryContext, NULL, 2);
latch_pos = AddWaitEventToSet(LatchWaitSet, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
if (IsUnderPostmaster)
@@ -441,7 +444,7 @@ WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock,
int ret = 0;
int rc;
WaitEvent event;
- WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, 3);
+ WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, NULL, 3);
if (wakeEvents & WL_TIMEOUT)
Assert(timeout >= 0);
@@ -608,12 +611,15 @@ ResetLatch(Latch *latch)
* WaitEventSetWait().
*/
WaitEventSet *
-CreateWaitEventSet(MemoryContext context, int nevents)
+CreateWaitEventSet(MemoryContext context, ResourceOwner res, int nevents)
{
WaitEventSet *set;
char *data;
Size sz = 0;
+ if (res)
+ ResourceOwnerEnlargeWESs(res);
+
/*
* Use MAXALIGN size/alignment to guarantee that later uses of memory are
* aligned correctly. E.g. epoll_event might need 8 byte alignment on some
@@ -728,6 +734,11 @@ CreateWaitEventSet(MemoryContext context, int nevents)
StaticAssertStmt(WSA_INVALID_EVENT == NULL, "");
#endif
+ /* Register this wait event set if requested */
+ set->resowner = res;
+ if (res)
+ ResourceOwnerRememberWES(set->resowner, set);
+
return set;
}
@@ -773,6 +784,9 @@ FreeWaitEventSet(WaitEventSet *set)
}
#endif
+ if (set->resowner != NULL)
+ ResourceOwnerForgetWES(set->resowner, set);
+
pfree(set);
}
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index 8bc2c4e9ea..237ca9fa30 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -128,6 +128,7 @@ typedef struct ResourceOwnerData
ResourceArray filearr; /* open temporary files */
ResourceArray dsmarr; /* dynamic shmem segments */
ResourceArray jitarr; /* JIT contexts */
+ ResourceArray wesarr; /* wait event sets */
/* We can remember up to MAX_RESOWNER_LOCKS references to local locks. */
int nlocks; /* number of owned locks */
@@ -175,6 +176,7 @@ static void PrintTupleDescLeakWarning(TupleDesc tupdesc);
static void PrintSnapshotLeakWarning(Snapshot snapshot);
static void PrintFileLeakWarning(File file);
static void PrintDSMLeakWarning(dsm_segment *seg);
+static void PrintWESLeakWarning(WaitEventSet *events);
/*****************************************************************************
@@ -444,6 +446,7 @@ ResourceOwnerCreate(ResourceOwner parent, const char *name)
ResourceArrayInit(&(owner->filearr), FileGetDatum(-1));
ResourceArrayInit(&(owner->dsmarr), PointerGetDatum(NULL));
ResourceArrayInit(&(owner->jitarr), PointerGetDatum(NULL));
+ ResourceArrayInit(&(owner->wesarr), PointerGetDatum(NULL));
return owner;
}
@@ -553,6 +556,16 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
jit_release_context(context);
}
+
+ /* Ditto for wait event sets */
+ while (ResourceArrayGetAny(&(owner->wesarr), &foundres))
+ {
+ WaitEventSet *event = (WaitEventSet *) DatumGetPointer(foundres);
+
+ if (isCommit)
+ PrintWESLeakWarning(event);
+ FreeWaitEventSet(event);
+ }
}
else if (phase == RESOURCE_RELEASE_LOCKS)
{
@@ -725,6 +738,7 @@ ResourceOwnerDelete(ResourceOwner owner)
Assert(owner->filearr.nitems == 0);
Assert(owner->dsmarr.nitems == 0);
Assert(owner->jitarr.nitems == 0);
+ Assert(owner->wesarr.nitems == 0);
Assert(owner->nlocks == 0 || owner->nlocks == MAX_RESOWNER_LOCKS + 1);
/*
@@ -752,6 +766,7 @@ ResourceOwnerDelete(ResourceOwner owner)
ResourceArrayFree(&(owner->filearr));
ResourceArrayFree(&(owner->dsmarr));
ResourceArrayFree(&(owner->jitarr));
+ ResourceArrayFree(&(owner->wesarr));
pfree(owner);
}
@@ -1370,3 +1385,55 @@ ResourceOwnerForgetJIT(ResourceOwner owner, Datum handle)
elog(ERROR, "JIT context %p is not owned by resource owner %s",
DatumGetPointer(handle), owner->name);
}
+
+/*
+ * wait event set reference array.
+ *
+ * This is separate from actually inserting an entry because if we run out
+ * of memory, it's critical to do so *before* acquiring the resource.
+ */
+void
+ResourceOwnerEnlargeWESs(ResourceOwner owner)
+{
+ ResourceArrayEnlarge(&(owner->wesarr));
+}
+
+/*
+ * Remember that a wait event set is owned by a ResourceOwner
+ *
+ * Caller must have previously done ResourceOwnerEnlargeWESs()
+ */
+void
+ResourceOwnerRememberWES(ResourceOwner owner, WaitEventSet *events)
+{
+ ResourceArrayAdd(&(owner->wesarr), PointerGetDatum(events));
+}
+
+/*
+ * Forget that a wait event set is owned by a ResourceOwner
+ */
+void
+ResourceOwnerForgetWES(ResourceOwner owner, WaitEventSet *events)
+{
+ /*
+ * XXXX: There's no property to show as an identier of a wait event set,
+ * use its pointer instead.
+ */
+ if (!ResourceArrayRemove(&(owner->wesarr), PointerGetDatum(events)))
+ elog(ERROR, "wait event set %p is not owned by resource owner %s",
+ events, owner->name);
+}
+
+/*
+ * Debugging subroutine
+ */
+static void
+PrintWESLeakWarning(WaitEventSet *events)
+{
+ /*
+ * XXXX: There's no property to show as an identier of a wait event set,
+ * use its pointer instead.
+ */
+ elog(WARNING, "wait event set leak: %p still referenced",
+ events);
+}
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index 7c742021fb..ae13d4c08d 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -101,6 +101,7 @@
#define LATCH_H
#include <signal.h>
+#include "utils/resowner.h"
/*
* Latch structure should be treated as opaque and only accessed through
@@ -163,7 +164,8 @@ extern void DisownLatch(Latch *latch);
extern void SetLatch(Latch *latch);
extern void ResetLatch(Latch *latch);
-extern WaitEventSet *CreateWaitEventSet(MemoryContext context, int nevents);
+extern WaitEventSet *CreateWaitEventSet(MemoryContext context,
+ ResourceOwner res, int nevents);
extern void FreeWaitEventSet(WaitEventSet *set);
extern int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd,
Latch *latch, void *user_data);
diff --git a/src/include/utils/resowner_private.h b/src/include/utils/resowner_private.h
index a781a7a2aa..7d19dadd57 100644
--- a/src/include/utils/resowner_private.h
+++ b/src/include/utils/resowner_private.h
@@ -18,6 +18,7 @@
#include "storage/dsm.h"
#include "storage/fd.h"
+#include "storage/latch.h"
#include "storage/lock.h"
#include "utils/catcache.h"
#include "utils/plancache.h"
@@ -95,4 +96,11 @@ extern void ResourceOwnerRememberJIT(ResourceOwner owner,
extern void ResourceOwnerForgetJIT(ResourceOwner owner,
Datum handle);
+/* support for wait event set management */
+extern void ResourceOwnerEnlargeWESs(ResourceOwner owner);
+extern void ResourceOwnerRememberWES(ResourceOwner owner,
+ WaitEventSet *);
+extern void ResourceOwnerForgetWES(ResourceOwner owner,
+ WaitEventSet *);
+
#endif /* RESOWNER_PRIVATE_H */
--
2.18.4
From c47ea326f3557d7ac03886c91fda5ebf689ae068 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Tue, 15 May 2018 20:21:32 +0900
Subject: [PATCH v7 2/3] Infrastructure for asynchronous execution
This patch adds infrastructure for asynchronous execution. As a PoC,
it makes only Append capable of handling asynchronously executable
subnodes.
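As a hedged sketch of the FDW side of this interface: the callback
signatures below are the ones added to fdwapi.h by this patch, while the
example state struct and function bodies are hypothetical.

    #include "postgres.h"
    #include "foreign/fdwapi.h"
    #include "storage/latch.h"

    /* Hypothetical FDW-private state holding the remote connection's socket. */
    typedef struct ExampleFdwState
    {
        pgsocket    sock;
    } ExampleFdwState;

    static bool
    exampleIsForeignPathAsyncCapable(ForeignPath *path)
    {
        return true;            /* all of this FDW's paths may run async */
    }

    static bool
    exampleForeignAsyncConfigureWait(ForeignScanState *node, WaitEventSet *wes,
                                     void *caller_data, bool reinit)
    {
        ExampleFdwState *st = (ExampleFdwState *) node->fdw_state;

        /*
         * caller_data comes back in WaitEvent->user_data; that is how
         * ExecAsyncEventWait maps a fired event back to its subnode.
         */
        AddWaitEventToSet(wes, WL_SOCKET_READABLE, st->sock, NULL, caller_data);
        return true;            /* an event was registered */
    }

The executor drives these through ExecAsyncConfigureWait() and
ExecAsyncEventWait() below, and subnodes report readiness via the new
PlanState->asyncstate (AS_AVAILABLE vs. AS_WAITING).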
---
src/backend/commands/explain.c | 17 ++
src/backend/executor/Makefile | 1 +
src/backend/executor/execAsync.c | 152 +++++++++++
src/backend/executor/nodeAppend.c | 342 ++++++++++++++++++++----
src/backend/executor/nodeForeignscan.c | 21 ++
src/backend/nodes/bitmapset.c | 72 +++++
src/backend/nodes/copyfuncs.c | 3 +
src/backend/nodes/outfuncs.c | 3 +
src/backend/nodes/readfuncs.c | 3 +
src/backend/optimizer/path/allpaths.c | 24 ++
src/backend/optimizer/path/costsize.c | 55 +++-
src/backend/optimizer/plan/createplan.c | 45 +++-
src/backend/postmaster/pgstat.c | 3 +
src/backend/utils/adt/ruleutils.c | 8 +-
src/backend/utils/resowner/resowner.c | 4 +-
src/include/executor/execAsync.h | 22 ++
src/include/executor/executor.h | 1 +
src/include/executor/nodeForeignscan.h | 3 +
src/include/foreign/fdwapi.h | 11 +
src/include/nodes/bitmapset.h | 1 +
src/include/nodes/execnodes.h | 23 +-
src/include/nodes/plannodes.h | 9 +
src/include/optimizer/paths.h | 2 +
src/include/pgstat.h | 3 +-
24 files changed, 756 insertions(+), 72 deletions(-)
create mode 100644 src/backend/executor/execAsync.c
create mode 100644 src/include/executor/execAsync.h
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index c98c9b5547..097355f6f9 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -86,6 +86,7 @@ static void show_incremental_sort_keys(IncrementalSortState *incrsortstate,
List *ancestors, ExplainState *es);
static void show_merge_append_keys(MergeAppendState *mstate, List *ancestors,
ExplainState *es);
+static void show_append_info(AppendState *astate, ExplainState *es);
static void show_agg_keys(AggState *astate, List *ancestors,
ExplainState *es);
static void show_grouping_sets(PlanState *planstate, Agg *agg,
@@ -1377,6 +1378,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
}
if (plan->parallel_aware)
appendStringInfoString(es->str, "Parallel ");
+ if (plan->async_capable)
+ appendStringInfoString(es->str, "Async ");
appendStringInfoString(es->str, pname);
es->indent++;
}
@@ -1958,6 +1961,11 @@ ExplainNode(PlanState *planstate, List *ancestors,
case T_Hash:
show_hash_info(castNode(HashState, planstate), es);
break;
+
+ case T_Append:
+ show_append_info(castNode(AppendState, planstate), es);
+ break;
+
default:
break;
}
@@ -2311,6 +2319,15 @@ show_merge_append_keys(MergeAppendState *mstate, List *ancestors,
ancestors, es);
}
+static void
+show_append_info(AppendState *astate, ExplainState *es)
+{
+ Append *plan = (Append *) astate->ps.plan;
+
+ if (plan->nasyncplans > 0)
+ ExplainPropertyInteger("Async subplans", "", plan->nasyncplans, es);
+}
+
/*
* Show the grouping keys for an Agg node.
*/
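With the explain.c changes above, text-format EXPLAIN marks async-capable
children and reports their count. A hypothetical sketch of the output shape
(relation names invented):

    Append
      Async subplans: 2
      ->  Async Foreign Scan on ft1
      ->  Async Foreign Scan on ft2
      ->  Seq Scan on local_part

The "Async " prefix comes from plan->async_capable and the count line from
show_append_info().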
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index f990c6473a..1004647d4f 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global
OBJS = \
execAmi.o \
+ execAsync.o \
execCurrent.o \
execExpr.o \
execExprInterp.o \
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
new file mode 100644
index 0000000000..2b7d1877e0
--- /dev/null
+++ b/src/backend/executor/execAsync.c
@@ -0,0 +1,152 @@
+/*-------------------------------------------------------------------------
+ *
+ * execAsync.c
+ * Support routines for asynchronous execution.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/executor/execAsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "executor/execAsync.h"
+#include "executor/nodeAppend.h"
+#include "executor/nodeForeignscan.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+
+/*
+ * ExecAsyncConfigureWait: Add wait event to the WaitEventSet if needed.
+ *
+ * If reinit is true, the caller is not reusing an existing WaitEventSet.
+ */
+bool
+ExecAsyncConfigureWait(WaitEventSet *wes, PlanState *node,
+ void *data, bool reinit)
+{
+ switch (nodeTag(node))
+ {
+ case T_ForeignScanState:
+ return ExecForeignAsyncConfigureWait((ForeignScanState *)node,
+ wes, data, reinit);
+ break;
+ default:
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(node));
+ }
+}
+
+/*
+ * struct for memory context callback argument used in ExecAsyncEventWait
+ */
+typedef struct {
+ int **p_refind;
+ int *p_refindsize;
+} ExecAsync_mcbarg;
+
+/*
+ * callback function to reset static variables pointing to the memory in
+ * TopTransactionContext in ExecAsyncEventWait.
+ */
+static void ExecAsyncMemoryContextCallback(void *arg)
+{
+ /* arg is the address of the variable refind in ExecAsyncEventWait */
+ ExecAsync_mcbarg *mcbarg = (ExecAsync_mcbarg *) arg;
+ *mcbarg->p_refind = NULL;
+ *mcbarg->p_refindsize = 0;
+}
+
+#define EVENT_BUFFER_SIZE 16
+
+/*
+ * ExecAsyncEventWait:
+ *
+ * Wait for async events to fire. Returns the Bitmapset of fired events.
+ */
+Bitmapset *
+ExecAsyncEventWait(PlanState **nodes, Bitmapset *waitnodes, long timeout)
+{
+ static int *refind = NULL;
+ static int refindsize = 0;
+ WaitEventSet *wes;
+ WaitEvent occurred_event[EVENT_BUFFER_SIZE];
+ int noccurred = 0;
+ Bitmapset *fired_events = NULL;
+ int i;
+ int n;
+
+ n = bms_num_members(waitnodes);
+ wes = CreateWaitEventSet(TopTransactionContext,
+ TopTransactionResourceOwner, n);
+ if (refindsize < n)
+ {
+ if (refindsize == 0)
+ refindsize = EVENT_BUFFER_SIZE; /* XXX */
+ while (refindsize < n)
+ refindsize *= 2;
+ if (refind)
+ refind = (int *) repalloc(refind, refindsize * sizeof(int));
+ else
+ {
+ static ExecAsync_mcbarg mcb_arg =
+ { &refind, &refindsize };
+ static MemoryContextCallback mcb =
+ { ExecAsyncMemoryContextCallback, (void *)&mcb_arg, NULL };
+ MemoryContext oldctxt =
+ MemoryContextSwitchTo(TopTransactionContext);
+
+ /*
+ * refind points to a memory block in
+ * TopTransactionContext. Register a callback to reset it.
+ */
+ MemoryContextRegisterResetCallback(TopTransactionContext, &mcb);
+ refind = (int *) palloc(refindsize * sizeof(int));
+ MemoryContextSwitchTo(oldctxt);
+ }
+ }
+
+ /* Prepare WaitEventSet for waiting on the waitnodes. */
+ n = 0;
+ for (i = bms_next_member(waitnodes, -1) ; i >= 0 ;
+ i = bms_next_member(waitnodes, i))
+ {
+ refind[i] = i;
+ if (ExecAsyncConfigureWait(wes, nodes[i], refind + i, true))
+ n++;
+ }
+
+ /* Return immediately if there is no node to wait for. */
+ if (n == 0)
+ {
+ FreeWaitEventSet(wes);
+ return NULL;
+ }
+
+ noccurred = WaitEventSetWait(wes, timeout, occurred_event,
+ EVENT_BUFFER_SIZE,
+ WAIT_EVENT_ASYNC_WAIT);
+ FreeWaitEventSet(wes);
+ if (noccurred == 0)
+ return NULL;
+
+ for (i = 0 ; i < noccurred ; i++)
+ {
+ WaitEvent *w = &occurred_event[i];
+
+ if ((w->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) != 0)
+ {
+ int n = *(int*)w->user_data;
+
+ fired_events = bms_add_member(fired_events, n);
+ }
+ }
+
+ return fired_events;
+}
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 88919e62fa..60c36ee048 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -60,6 +60,7 @@
#include "executor/execdebug.h"
#include "executor/execPartition.h"
#include "executor/nodeAppend.h"
+#include "executor/execAsync.h"
#include "miscadmin.h"
/* Shared state for parallel-aware Append. */
@@ -80,6 +81,7 @@ struct ParallelAppendState
#define INVALID_SUBPLAN_INDEX -1
static TupleTableSlot *ExecAppend(PlanState *pstate);
+static TupleTableSlot *ExecAppendAsync(PlanState *pstate);
static bool choose_next_subplan_locally(AppendState *node);
static bool choose_next_subplan_for_leader(AppendState *node);
static bool choose_next_subplan_for_worker(AppendState *node);
@@ -103,22 +105,22 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
PlanState **appendplanstates;
Bitmapset *validsubplans;
int nplans;
+ int nasyncplans;
int firstvalid;
int i,
j;
/* check for unsupported flags */
- Assert(!(eflags & EXEC_FLAG_MARK));
+ Assert(!(eflags & (EXEC_FLAG_MARK | EXEC_FLAG_ASYNC)));
/*
* create new AppendState for our append node
*/
appendstate->ps.plan = (Plan *) node;
appendstate->ps.state = estate;
- appendstate->ps.ExecProcNode = ExecAppend;
/* Let choose_next_subplan_* function handle setting the first subplan */
- appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
+ appendstate->as_whichsyncplan = INVALID_SUBPLAN_INDEX;
/* If run-time partition pruning is enabled, then set that up now */
if (node->part_prune_info != NULL)
@@ -152,11 +154,12 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
/*
* When no run-time pruning is required and there's at least one
- * subplan, we can fill as_valid_subplans immediately, preventing
+ * subplan, we can fill as_valid_syncsubplans immediately, preventing
* later calls to ExecFindMatchingSubPlans.
*/
if (!prunestate->do_exec_prune && nplans > 0)
- appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
+ appendstate->as_valid_syncsubplans =
+ bms_add_range(NULL, node->nasyncplans, nplans - 1);
}
else
{
@@ -167,8 +170,9 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
* subplans as valid; they must also all be initialized.
*/
Assert(nplans > 0);
- appendstate->as_valid_subplans = validsubplans =
- bms_add_range(NULL, 0, nplans - 1);
+ validsubplans = bms_add_range(NULL, 0, nplans - 1);
+ appendstate->as_valid_syncsubplans =
+ bms_add_range(NULL, node->nasyncplans, nplans - 1);
appendstate->as_prune_state = NULL;
}
@@ -192,10 +196,20 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
*/
j = 0;
firstvalid = nplans;
+ nasyncplans = 0;
+
i = -1;
while ((i = bms_next_member(validsubplans, i)) >= 0)
{
Plan *initNode = (Plan *) list_nth(node->appendplans, i);
+ int sub_eflags = eflags;
+
+ /* Let async-capable subplans run asynchronously */
+ if (i < node->nasyncplans)
+ {
+ sub_eflags |= EXEC_FLAG_ASYNC;
+ nasyncplans++;
+ }
/*
* Record the lowest appendplans index which is a valid partial plan.
@@ -203,13 +217,46 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
if (i >= node->first_partial_plan && j < firstvalid)
firstvalid = j;
- appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
+ appendplanstates[j++] = ExecInitNode(initNode, estate, sub_eflags);
}
appendstate->as_first_partial_plan = firstvalid;
appendstate->appendplans = appendplanstates;
appendstate->as_nplans = nplans;
+ /* fill in async stuff */
+ appendstate->as_nasyncplans = nasyncplans;
+ appendstate->as_syncdone = (nasyncplans == nplans);
+ appendstate->as_exec_prune = false;
+
+ /* choose appropriate version of Exec function */
+ if (appendstate->as_nasyncplans == 0)
+ appendstate->ps.ExecProcNode = ExecAppend;
+ else
+ appendstate->ps.ExecProcNode = ExecAppendAsync;
+
+ if (appendstate->as_nasyncplans)
+ {
+ appendstate->as_asyncresult = (TupleTableSlot **)
+ palloc0(appendstate->as_nasyncplans * sizeof(TupleTableSlot *));
+
+ /* initially, all async subplans need a request */
+ appendstate->as_needrequest =
+ bms_add_range(NULL, 0, appendstate->as_nasyncplans - 1);
+
+ /*
+ * ExecAppendAsync needs as_valid_syncsubplans to handle async
+ * subnodes.
+ */
+ if (appendstate->as_prune_state != NULL &&
+ appendstate->as_prune_state->do_exec_prune)
+ {
+ Assert(appendstate->as_valid_syncsubplans == NULL);
+
+ appendstate->as_exec_prune = true;
+ }
+ }
+
/*
* Miscellaneous initialization
*/
@@ -233,7 +280,7 @@ ExecAppend(PlanState *pstate)
{
AppendState *node = castNode(AppendState, pstate);
- if (node->as_whichplan < 0)
+ if (node->as_whichsyncplan < 0)
{
/* Nothing to do if there are no subplans */
if (node->as_nplans == 0)
@@ -243,11 +290,13 @@ ExecAppend(PlanState *pstate)
* If no subplan has been chosen, we must choose one before
* proceeding.
*/
- if (node->as_whichplan == INVALID_SUBPLAN_INDEX &&
+ if (node->as_whichsyncplan == INVALID_SUBPLAN_INDEX &&
!node->choose_next_subplan(node))
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
}
+ Assert(node->as_nasyncplans == 0);
+
for (;;)
{
PlanState *subnode;
@@ -258,8 +307,9 @@ ExecAppend(PlanState *pstate)
/*
* figure out which subplan we are currently processing
*/
- Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
- subnode = node->appendplans[node->as_whichplan];
+ Assert(node->as_whichsyncplan >= 0 &&
+ node->as_whichsyncplan < node->as_nplans);
+ subnode = node->appendplans[node->as_whichsyncplan];
/*
* get a tuple from the subplan
@@ -282,6 +332,172 @@ ExecAppend(PlanState *pstate)
}
}
+static TupleTableSlot *
+ExecAppendAsync(PlanState *pstate)
+{
+ AppendState *node = castNode(AppendState, pstate);
+ Bitmapset *needrequest;
+ int i;
+
+ Assert(node->as_nasyncplans > 0);
+
+restart:
+ if (node->as_nasyncresult > 0)
+ {
+ --node->as_nasyncresult;
+ return node->as_asyncresult[node->as_nasyncresult];
+ }
+
+ if (node->as_exec_prune)
+ {
+ Bitmapset *valid_subplans =
+ ExecFindMatchingSubPlans(node->as_prune_state);
+
+ /* Distribute valid subplans into sync and async */
+ node->as_needrequest =
+ bms_intersect(node->as_needrequest, valid_subplans);
+ node->as_valid_syncsubplans =
+ bms_difference(valid_subplans, node->as_needrequest);
+
+ node->as_exec_prune = false;
+ }
+
+ needrequest = node->as_needrequest;
+ node->as_needrequest = NULL;
+ while ((i = bms_first_member(needrequest)) >= 0)
+ {
+ TupleTableSlot *slot;
+ PlanState *subnode = node->appendplans[i];
+
+ slot = ExecProcNode(subnode);
+ if (subnode->asyncstate == AS_AVAILABLE)
+ {
+ if (!TupIsNull(slot))
+ {
+ node->as_asyncresult[node->as_nasyncresult++] = slot;
+ node->as_needrequest = bms_add_member(node->as_needrequest, i);
+ }
+ }
+ else
+ node->as_pending_async = bms_add_member(node->as_pending_async, i);
+ }
+ bms_free(needrequest);
+
+ for (;;)
+ {
+ TupleTableSlot *result;
+
+ /* return now if a result is available */
+ if (node->as_nasyncresult > 0)
+ {
+ --node->as_nasyncresult;
+ return node->as_asyncresult[node->as_nasyncresult];
+ }
+
+ while (!bms_is_empty(node->as_pending_async))
+ {
+ /* Don't wait for async nodes if any sync node exists. */
+ long timeout = node->as_syncdone ? -1 : 0;
+ Bitmapset *fired;
+ int i;
+
+ fired = ExecAsyncEventWait(node->appendplans,
+ node->as_pending_async,
+ timeout);
+
+ if (bms_is_empty(fired) && node->as_syncdone)
+ {
+ /*
+ * We get here when all the subnodes have fired before we waited.
+ * Retry fetching from them.
+ */
+ node->as_needrequest = node->as_pending_async;
+ node->as_pending_async = NULL;
+ goto restart;
+ }
+
+ while ((i = bms_first_member(fired)) >= 0)
+ {
+ TupleTableSlot *slot;
+ PlanState *subnode = node->appendplans[i];
+ slot = ExecProcNode(subnode);
+
+ Assert(subnode->asyncstate == AS_AVAILABLE);
+
+ if (!TupIsNull(slot))
+ {
+ node->as_asyncresult[node->as_nasyncresult++] = slot;
+ node->as_needrequest =
+ bms_add_member(node->as_needrequest, i);
+ }
+
+ node->as_pending_async =
+ bms_del_member(node->as_pending_async, i);
+ }
+ bms_free(fired);
+
+ /* return now if a result is available */
+ if (node->as_nasyncresult > 0)
+ {
+ --node->as_nasyncresult;
+ return node->as_asyncresult[node->as_nasyncresult];
+ }
+
+ if (!node->as_syncdone)
+ break;
+ }
+
+ /*
+ * If there is no asynchronous activity still pending and the
+ * synchronous activity is also complete, we're totally done scanning
+ * this node. Otherwise, we're done with the asynchronous stuff but
+ * must continue scanning the synchronous children.
+ */
+
+ if (!node->as_syncdone &&
+ node->as_whichsyncplan == INVALID_SUBPLAN_INDEX)
+ node->as_syncdone = !node->choose_next_subplan(node);
+
+ if (node->as_syncdone)
+ {
+ Assert(bms_is_empty(node->as_pending_async));
+ return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ }
+
+ /*
+ * get a tuple from the subplan
+ */
+ result = ExecProcNode(node->appendplans[node->as_whichsyncplan]);
+
+ if (!TupIsNull(result))
+ {
+ /*
+ * If the subplan gave us something then return it as-is. We do
+ * NOT make use of the result slot that was set up in
+ * ExecInitAppend; there's no need for it.
+ */
+ return result;
+ }
+
+ /*
+ * Go on to the "next" subplan. If no more subplans, return the empty
+ * slot set up for us by ExecInitAppend, unless there are async plans
+ * we have yet to finish.
+ */
+ if (!node->choose_next_subplan(node))
+ {
+ node->as_syncdone = true;
+ if (bms_is_empty(node->as_pending_async))
+ {
+ Assert(bms_is_empty(node->as_needrequest));
+ return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ }
+ }
+
+ /* Else loop back and try to get a tuple from the new subplan */
+ }
+}
+
/* ----------------------------------------------------------------
* ExecEndAppend
*
@@ -324,10 +540,18 @@ ExecReScanAppend(AppendState *node)
bms_overlap(node->ps.chgParam,
node->as_prune_state->execparamids))
{
- bms_free(node->as_valid_subplans);
- node->as_valid_subplans = NULL;
+ bms_free(node->as_valid_syncsubplans);
+ node->as_valid_syncsubplans = NULL;
}
+ /* Reset async state. */
+ for (i = 0; i < node->as_nasyncplans; ++i)
+ ExecShutdownNode(node->appendplans[i]);
+
+ node->as_nasyncresult = 0;
+ node->as_needrequest = bms_add_range(NULL, 0, node->as_nasyncplans - 1);
+ node->as_syncdone = (node->as_nasyncplans == node->as_nplans);
+
for (i = 0; i < node->as_nplans; i++)
{
PlanState *subnode = node->appendplans[i];
@@ -348,7 +572,7 @@ ExecReScanAppend(AppendState *node)
}
/* Let choose_next_subplan_* function handle setting the first subplan */
- node->as_whichplan = INVALID_SUBPLAN_INDEX;
+ node->as_whichsyncplan = INVALID_SUBPLAN_INDEX;
}
/* ----------------------------------------------------------------
@@ -436,7 +660,7 @@ ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
static bool
choose_next_subplan_locally(AppendState *node)
{
- int whichplan = node->as_whichplan;
+ int whichplan = node->as_whichsyncplan;
int nextplan;
/* We should never be called when there are no subplans */
@@ -451,10 +675,18 @@ choose_next_subplan_locally(AppendState *node)
*/
if (whichplan == INVALID_SUBPLAN_INDEX)
{
- if (node->as_valid_subplans == NULL)
- node->as_valid_subplans =
+ /* Shouldn't have an active async node */
+ Assert(bms_is_empty(node->as_needrequest));
+
+ if (node->as_valid_syncsubplans == NULL)
+ node->as_valid_syncsubplans =
ExecFindMatchingSubPlans(node->as_prune_state);
+ /* Exclude async plans */
+ if (node->as_nasyncplans > 0)
+ bms_del_range(node->as_valid_syncsubplans,
+ 0, node->as_nasyncplans - 1);
+
whichplan = -1;
}
@@ -462,14 +694,14 @@ choose_next_subplan_locally(AppendState *node)
Assert(whichplan >= -1 && whichplan <= node->as_nplans);
if (ScanDirectionIsForward(node->ps.state->es_direction))
- nextplan = bms_next_member(node->as_valid_subplans, whichplan);
+ nextplan = bms_next_member(node->as_valid_syncsubplans, whichplan);
else
- nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
+ nextplan = bms_prev_member(node->as_valid_syncsubplans, whichplan);
if (nextplan < 0)
return false;
- node->as_whichplan = nextplan;
+ node->as_whichsyncplan = nextplan;
return true;
}
@@ -490,29 +722,29 @@ choose_next_subplan_for_leader(AppendState *node)
/* Backward scan is not supported by parallel-aware plans */
Assert(ScanDirectionIsForward(node->ps.state->es_direction));
- /* We should never be called when there are no subplans */
- Assert(node->as_nplans > 0);
+ /* We should never be called when there are no sync subplans */
+ Assert(node->as_nplans > node->as_nasyncplans);
LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
- if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
+ if (node->as_whichsyncplan != INVALID_SUBPLAN_INDEX)
{
/* Mark just-completed subplan as finished. */
- node->as_pstate->pa_finished[node->as_whichplan] = true;
+ node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
}
else
{
/* Start with last subplan. */
- node->as_whichplan = node->as_nplans - 1;
+ node->as_whichsyncplan = node->as_nplans - 1;
/*
* If we've yet to determine the valid subplans then do so now. If
* run-time pruning is disabled then the valid subplans will always be
* set to all subplans.
*/
- if (node->as_valid_subplans == NULL)
+ if (node->as_valid_syncsubplans == NULL)
{
- node->as_valid_subplans =
+ node->as_valid_syncsubplans =
ExecFindMatchingSubPlans(node->as_prune_state);
/*
@@ -524,26 +756,26 @@ choose_next_subplan_for_leader(AppendState *node)
}
/* Loop until we find a subplan to execute. */
- while (pstate->pa_finished[node->as_whichplan])
+ while (pstate->pa_finished[node->as_whichsyncplan])
{
- if (node->as_whichplan == 0)
+ if (node->as_whichsyncplan == 0)
{
pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
- node->as_whichplan = INVALID_SUBPLAN_INDEX;
+ node->as_whichsyncplan = INVALID_SUBPLAN_INDEX;
LWLockRelease(&pstate->pa_lock);
return false;
}
/*
- * We needn't pay attention to as_valid_subplans here as all invalid
+ * We needn't pay attention to as_valid_syncsubplans here as all invalid
* plans have been marked as finished.
*/
- node->as_whichplan--;
+ node->as_whichsyncplan--;
}
/* If non-partial, immediately mark as finished. */
- if (node->as_whichplan < node->as_first_partial_plan)
- node->as_pstate->pa_finished[node->as_whichplan] = true;
+ if (node->as_whichsyncplan < node->as_first_partial_plan)
+ node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
LWLockRelease(&pstate->pa_lock);
@@ -571,23 +803,23 @@ choose_next_subplan_for_worker(AppendState *node)
/* Backward scan is not supported by parallel-aware plans */
Assert(ScanDirectionIsForward(node->ps.state->es_direction));
- /* We should never be called when there are no subplans */
- Assert(node->as_nplans > 0);
+ /* We should never be called when there are no sync subplans */
+ Assert(node->as_nplans > node->as_nasyncplans);
LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
/* Mark just-completed subplan as finished. */
- if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
- node->as_pstate->pa_finished[node->as_whichplan] = true;
+ if (node->as_whichsyncplan != INVALID_SUBPLAN_INDEX)
+ node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
/*
* If we've yet to determine the valid subplans then do so now. If
* run-time pruning is disabled then the valid subplans will always be set
* to all subplans.
*/
- else if (node->as_valid_subplans == NULL)
+ else if (node->as_valid_syncsubplans == NULL)
{
- node->as_valid_subplans =
+ node->as_valid_syncsubplans =
ExecFindMatchingSubPlans(node->as_prune_state);
mark_invalid_subplans_as_finished(node);
}
@@ -600,30 +832,30 @@ choose_next_subplan_for_worker(AppendState *node)
}
/* Save the plan from which we are starting the search. */
- node->as_whichplan = pstate->pa_next_plan;
+ node->as_whichsyncplan = pstate->pa_next_plan;
/* Loop until we find a valid subplan to execute. */
while (pstate->pa_finished[pstate->pa_next_plan])
{
int nextplan;
- nextplan = bms_next_member(node->as_valid_subplans,
+ nextplan = bms_next_member(node->as_valid_syncsubplans,
pstate->pa_next_plan);
if (nextplan >= 0)
{
/* Advance to the next valid plan. */
pstate->pa_next_plan = nextplan;
}
- else if (node->as_whichplan > node->as_first_partial_plan)
+ else if (node->as_whichsyncplan > node->as_first_partial_plan)
{
/*
* Try looping back to the first valid partial plan, if there is
* one. If there isn't, arrange to bail out below.
*/
- nextplan = bms_next_member(node->as_valid_subplans,
+ nextplan = bms_next_member(node->as_valid_syncsubplans,
node->as_first_partial_plan - 1);
pstate->pa_next_plan =
- nextplan < 0 ? node->as_whichplan : nextplan;
+ nextplan < 0 ? node->as_whichsyncplan : nextplan;
}
else
{
@@ -631,10 +863,10 @@ choose_next_subplan_for_worker(AppendState *node)
* At last plan, and either there are no partial plans or we've
* tried them all. Arrange to bail out.
*/
- pstate->pa_next_plan = node->as_whichplan;
+ pstate->pa_next_plan = node->as_whichsyncplan;
}
- if (pstate->pa_next_plan == node->as_whichplan)
+ if (pstate->pa_next_plan == node->as_whichsyncplan)
{
/* We've tried everything! */
pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
@@ -644,8 +876,8 @@ choose_next_subplan_for_worker(AppendState *node)
}
/* Pick the plan we found, and advance pa_next_plan one more time. */
- node->as_whichplan = pstate->pa_next_plan;
- pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
+ node->as_whichsyncplan = pstate->pa_next_plan;
+ pstate->pa_next_plan = bms_next_member(node->as_valid_syncsubplans,
pstate->pa_next_plan);
/*
@@ -654,7 +886,7 @@ choose_next_subplan_for_worker(AppendState *node)
*/
if (pstate->pa_next_plan < 0)
{
- int nextplan = bms_next_member(node->as_valid_subplans,
+ int nextplan = bms_next_member(node->as_valid_syncsubplans,
node->as_first_partial_plan - 1);
if (nextplan >= 0)
@@ -671,8 +903,8 @@ choose_next_subplan_for_worker(AppendState *node)
}
/* If non-partial, immediately mark as finished. */
- if (node->as_whichplan < node->as_first_partial_plan)
- node->as_pstate->pa_finished[node->as_whichplan] = true;
+ if (node->as_whichsyncplan < node->as_first_partial_plan)
+ node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
LWLockRelease(&pstate->pa_lock);
@@ -699,13 +931,13 @@ mark_invalid_subplans_as_finished(AppendState *node)
Assert(node->as_prune_state);
/* Nothing to do if all plans are valid */
- if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
+ if (bms_num_members(node->as_valid_syncsubplans) == node->as_nplans)
return;
/* Mark all non-valid plans as finished */
for (i = 0; i < node->as_nplans; i++)
{
- if (!bms_is_member(i, node->as_valid_subplans))
+ if (!bms_is_member(i, node->as_valid_syncsubplans))
node->as_pstate->pa_finished[i] = true;
}
}
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 513471ab9b..3bf4aaa63d 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -141,6 +141,10 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
scanstate->ss.ps.plan = (Plan *) node;
scanstate->ss.ps.state = estate;
scanstate->ss.ps.ExecProcNode = ExecForeignScan;
+ scanstate->ss.ps.asyncstate = AS_AVAILABLE;
+
+ if ((eflags & EXEC_FLAG_ASYNC) != 0)
+ scanstate->fs_async = true;
/*
* Miscellaneous initialization
@@ -384,3 +388,20 @@ ExecShutdownForeignScan(ForeignScanState *node)
if (fdwroutine->ShutdownForeignScan)
fdwroutine->ShutdownForeignScan(node);
}
+
+/* ----------------------------------------------------------------
+ * ExecForeignAsyncConfigureWait
+ *
+ * In async mode, configure for a wait
+ * ----------------------------------------------------------------
+ */
+bool
+ExecForeignAsyncConfigureWait(ForeignScanState *node, WaitEventSet *wes,
+ void *caller_data, bool reinit)
+{
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ Assert(fdwroutine->ForeignAsyncConfigureWait != NULL);
+ return fdwroutine->ForeignAsyncConfigureWait(node, wes,
+ caller_data, reinit);
+}
diff --git a/src/backend/nodes/bitmapset.c b/src/backend/nodes/bitmapset.c
index 2719ea45a3..05b625783b 100644
--- a/src/backend/nodes/bitmapset.c
+++ b/src/backend/nodes/bitmapset.c
@@ -895,6 +895,78 @@ bms_add_range(Bitmapset *a, int lower, int upper)
return a;
}
+/*
+ * bms_del_range
+ * Delete members in the range of 'lower' to 'upper' from the set.
+ *
+ * Note this could also be done by calling bms_del_member in a loop, however,
+ * using this function will be faster when the range is large as we work at
+ * the bitmapword level rather than at bit level.
+ */
+Bitmapset *
+bms_del_range(Bitmapset *a, int lower, int upper)
+{
+ int lwordnum,
+ lbitnum,
+ uwordnum,
+ ushiftbits,
+ wordnum;
+
+ if (lower < 0 || upper < 0)
+ elog(ERROR, "negative bitmapset member not allowed");
+ if (lower > upper)
+ elog(ERROR, "lower range must not be above upper range");
+ uwordnum = WORDNUM(upper);
+
+ if (a == NULL)
+ {
+ a = (Bitmapset *) palloc0(BITMAPSET_SIZE(uwordnum + 1));
+ a->nwords = uwordnum + 1;
+ }
+
+ /* ensure we have enough words to store the upper bit */
+ else if (uwordnum >= a->nwords)
+ {
+ int oldnwords = a->nwords;
+ int i;
+
+ a = (Bitmapset *) repalloc(a, BITMAPSET_SIZE(uwordnum + 1));
+ a->nwords = uwordnum + 1;
+ /* zero out the enlarged portion */
+ for (i = oldnwords; i < a->nwords; i++)
+ a->words[i] = 0;
+ }
+
+ wordnum = lwordnum = WORDNUM(lower);
+
+ lbitnum = BITNUM(lower);
+ ushiftbits = BITNUM(upper) + 1;
+
+ /*
+ * In the special case when lwordnum is the same as uwordnum, we must
+ * perform both the upper and lower masking on the same word.
+ */
+ if (lwordnum == uwordnum)
+ {
+ a->words[lwordnum] &= ((bitmapword) (((bitmapword) 1 << lbitnum) - 1)
+ | (~(bitmapword) 0) << ushiftbits);
+ }
+ else
+ {
+ /* turn off lbitnum and all bits left of it */
+ a->words[wordnum++] &= (bitmapword) (((bitmapword) 1 << lbitnum) - 1);
+
+ /* turn off all bits for any intermediate words */
+ while (wordnum < uwordnum)
+ a->words[wordnum++] = (bitmapword) 0;
+
+ /* turn off upper's bit and all bits right of it. */
+ a->words[uwordnum] &= (~(bitmapword) 0) << ushiftbits;
+ }
+
+ return a;
+}
+
/*
* bms_int_members - like bms_intersect, but left input is recycled
*/
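As a quick worked example of bms_del_range with made-up values: starting
from the set {0, 1, 2, 5, 9}, bms_del_range(a, 0, 2) yields {5, 9}. This is
exactly how choose_next_subplan_locally above strips the async plan indexes
0 .. nasyncplans - 1 out of as_valid_syncsubplans.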
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 0409a40b82..4eff3712b7 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -121,6 +121,7 @@ CopyPlanFields(const Plan *from, Plan *newnode)
COPY_SCALAR_FIELD(plan_width);
COPY_SCALAR_FIELD(parallel_aware);
COPY_SCALAR_FIELD(parallel_safe);
+ COPY_SCALAR_FIELD(async_capable);
COPY_SCALAR_FIELD(plan_node_id);
COPY_NODE_FIELD(targetlist);
COPY_NODE_FIELD(qual);
@@ -246,6 +247,8 @@ _copyAppend(const Append *from)
COPY_NODE_FIELD(appendplans);
COPY_SCALAR_FIELD(first_partial_plan);
COPY_NODE_FIELD(part_prune_info);
+ COPY_SCALAR_FIELD(nasyncplans);
+ COPY_SCALAR_FIELD(referent);
return newnode;
}
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index f0386480ab..2b1b0e9141 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -334,6 +334,7 @@ _outPlanInfo(StringInfo str, const Plan *node)
WRITE_INT_FIELD(plan_width);
WRITE_BOOL_FIELD(parallel_aware);
WRITE_BOOL_FIELD(parallel_safe);
+ WRITE_BOOL_FIELD(async_capable);
WRITE_INT_FIELD(plan_node_id);
WRITE_NODE_FIELD(targetlist);
WRITE_NODE_FIELD(qual);
@@ -436,6 +437,8 @@ _outAppend(StringInfo str, const Append *node)
WRITE_NODE_FIELD(appendplans);
WRITE_INT_FIELD(first_partial_plan);
WRITE_NODE_FIELD(part_prune_info);
+ WRITE_INT_FIELD(nasyncplans);
+ WRITE_INT_FIELD(referent);
}
static void
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 42050ab719..63af7c02d8 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -1572,6 +1572,7 @@ ReadCommonPlan(Plan *local_node)
READ_INT_FIELD(plan_width);
READ_BOOL_FIELD(parallel_aware);
READ_BOOL_FIELD(parallel_safe);
+ READ_BOOL_FIELD(async_capable);
READ_INT_FIELD(plan_node_id);
READ_NODE_FIELD(targetlist);
READ_NODE_FIELD(qual);
@@ -1672,6 +1673,8 @@ _readAppend(void)
READ_NODE_FIELD(appendplans);
READ_INT_FIELD(first_partial_plan);
READ_NODE_FIELD(part_prune_info);
+ READ_INT_FIELD(nasyncplans);
+ READ_INT_FIELD(referent);
READ_DONE();
}
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index b399592ff8..17e9a7a897 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -3973,6 +3973,30 @@ generate_partitionwise_join_paths(PlannerInfo *root, RelOptInfo *rel)
list_free(live_children);
}
+/*
+ * is_async_capable_path
+ * Check whether a given Path node is async-capable.
+ */
+bool
+is_async_capable_path(Path *path)
+{
+ switch (nodeTag(path))
+ {
+ case T_ForeignPath:
+ {
+ FdwRoutine *fdwroutine = path->parent->fdwroutine;
+
+ Assert(fdwroutine != NULL);
+ if (fdwroutine->IsForeignPathAsyncCapable != NULL &&
+ fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path))
+ return true;
+ }
+ default:
+ break;
+ }
+ return false;
+}
+
/*****************************************************************************
* DEBUG SUPPORT
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index cd3716d494..143e00b13e 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -2048,22 +2048,59 @@ cost_append(AppendPath *apath)
if (pathkeys == NIL)
{
- Path *subpath = (Path *) linitial(apath->subpaths);
-
- /*
- * For an unordered, non-parallel-aware Append we take the startup
- * cost as the startup cost of the first subpath.
- */
- apath->path.startup_cost = subpath->startup_cost;
+ Cost first_nonasync_startup_cost = -1.0;
+ Cost async_min_startup_cost = -1.0;
+ Cost async_max_cost = 0.0;
/* Compute rows and costs as sums of subplan rows and costs. */
foreach(l, apath->subpaths)
{
Path *subpath = (Path *) lfirst(l);
+ /*
+ * For an unordered, non-parallel-aware Append we take the
+ * startup cost as the startup cost of the first
+ * nonasync-capable subpath or the minimum startup cost of
+ * async-capable subpaths.
+ */
+ if (!is_async_capable_path(subpath))
+ {
+ if (first_nonasync_startup_cost < 0.0)
+ first_nonasync_startup_cost = subpath->startup_cost;
+
+ apath->path.total_cost += subpath->total_cost;
+ }
+ else
+ {
+ if (async_min_startup_cost < 0.0 ||
+ async_min_startup_cost > subpath->startup_cost)
+ async_min_startup_cost = subpath->startup_cost;
+
+ /*
+ * It's not obvious how to determine the total cost of
+ * async subnodes. Although it is not always true, we
+ * assume it is the maximum cost among all async subnodes.
+ */
+ if (async_max_cost < subpath->total_cost)
+ async_max_cost = subpath->total_cost;
+ }
+
apath->path.rows += subpath->rows;
- apath->path.total_cost += subpath->total_cost;
}
+
+ /*
+ * If there are any sync subnodes, the startup cost is the startup
+ * cost of the first sync subnode. Otherwise it's the minimum
+ * startup cost of the async subnodes.
+ */
+ if (first_nonasync_startup_cost >= 0.0)
+ apath->path.startup_cost = first_nonasync_startup_cost;
+ else
+ apath->path.startup_cost = async_min_startup_cost;
+
+ /* Use async maximum cost if it exceeds the sync total cost */
+ if (async_max_cost > apath->path.total_cost)
+ apath->path.total_cost = async_max_cost;
}
else
{
@@ -2084,6 +2121,8 @@ cost_append(AppendPath *apath)
* This case is also different from the above in that we have to
* account for possibly injecting sorts into subpaths that aren't
* natively ordered.
+ *
+ * Note: An ordered append won't be run asynchronously.
*/
foreach(l, apath->subpaths)
{
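To make the above cost rule concrete with made-up numbers: given sync
subpaths with total costs 100 and 120 and async subpaths with total costs
180 and 90, the sync sum is 220 and the async maximum is 180, so the Append
keeps total_cost = 220; were the async maximum 300 instead, total_cost would
be raised to 300. The startup cost is the first sync subpath's startup cost
when any sync subpath exists, and otherwise the cheapest async startup cost.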
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 3d7a4e373f..3ae46ed6f1 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -1082,6 +1082,11 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
bool tlist_was_changed = false;
List *pathkeys = best_path->path.pathkeys;
List *subplans = NIL;
+ List *asyncplans = NIL;
+ List *syncplans = NIL;
+ List *asyncpaths = NIL;
+ List *syncpaths = NIL;
+ List *newsubpaths = NIL;
ListCell *subpaths;
RelOptInfo *rel = best_path->path.parent;
PartitionPruneInfo *partpruneinfo = NULL;
@@ -1090,6 +1095,9 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
Oid *nodeSortOperators = NULL;
Oid *nodeCollations = NULL;
bool *nodeNullsFirst = NULL;
+ int nasyncplans = 0;
+ bool first = true;
+ bool referent_is_sync = true;
/*
* The subpaths list could be empty, if every child was proven empty by
@@ -1219,9 +1227,40 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
}
}
- subplans = lappend(subplans, subplan);
+ /*
+ * Classify the subplan as async-capable or not. If we have decided to
+ * run the children in parallel, none of them may run asynchronously.
+ * The planner assumes that all subnodes are executed in order if this
+ * Append is ordered, so no subpath can run asynchronously in that case
+ * either.
+ */
+ if (pathkeys == NIL &&
+ !best_path->path.parallel_safe && is_async_capable_path(subpath))
+ {
+ subplan->async_capable = true;
+ asyncplans = lappend(asyncplans, subplan);
+ asyncpaths = lappend(asyncpaths, subpath);
+ ++nasyncplans;
+ if (first)
+ referent_is_sync = false;
+ }
+ else
+ {
+ syncplans = lappend(syncplans, subplan);
+ syncpaths = lappend(syncpaths, subpath);
+ }
+
+ first = false;
}
+ /*
+ * subplans contains the async plans first, if any, followed by the sync
+ * plans, if any. The subpaths must be ordered the same way so that the
+ * partition pruning information stays in sync with the subplans.
+ */
+ subplans = list_concat(asyncplans, syncplans);
+ newsubpaths = list_concat(asyncpaths, syncpaths);
+
/*
* If any quals exist, they may be useful to perform further partition
* pruning during execution. Gather information needed by the executor to
@@ -1249,7 +1288,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
if (prunequal != NIL)
partpruneinfo =
make_partition_pruneinfo(root, rel,
- best_path->subpaths,
+ newsubpaths,
best_path->partitioned_rels,
prunequal);
}
@@ -1257,6 +1296,8 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
plan->appendplans = subplans;
plan->first_partial_plan = best_path->first_partial_path;
plan->part_prune_info = partpruneinfo;
+ plan->nasyncplans = nasyncplans;
+ plan->referent = referent_is_sync ? nasyncplans : 0;
copy_generic_path_info(&plan->plan, (Path *) best_path);
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 30020f8cda..faed3f2442 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3878,6 +3878,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_XACT_GROUP_UPDATE:
event_name = "XactGroupUpdate";
break;
+ case WAIT_EVENT_ASYNC_WAIT:
+ event_name = "AsyncExecWait";
+ break;
/* no default case, so that compiler will warn */
}
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 62023c20b2..07aeb43a7f 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -4574,10 +4574,14 @@ set_deparse_plan(deparse_namespace *dpns, Plan *plan)
* tlists according to one of the children, and the first one is the most
* natural choice. Likewise special-case ModifyTable to pretend that the
* first child plan is the OUTER referent; this is to support RETURNING
- * lists containing references to non-target relations.
+ * lists containing references to non-target relations. For Append, use the
+ * explicitly specified referent.
*/
if (IsA(plan, Append))
- dpns->outer_plan = linitial(((Append *) plan)->appendplans);
+ {
+ Append *app = (Append *) plan;
+ dpns->outer_plan = list_nth(app->appendplans, app->referent);
+ }
else if (IsA(plan, MergeAppend))
dpns->outer_plan = linitial(((MergeAppend *) plan)->mergeplans);
else if (IsA(plan, ModifyTable))
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index 237ca9fa30..27742a1641 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -1416,7 +1416,7 @@ void
ResourceOwnerForgetWES(ResourceOwner owner, WaitEventSet *events)
{
/*
- * XXXX: There's no property to show as an identier of a wait event set,
+ * XXXX: There's no property to show as an identifier of a wait event set,
* use its pointer instead.
*/
if (!ResourceArrayRemove(&(owner->wesarr), PointerGetDatum(events)))
@@ -1431,7 +1431,7 @@ static void
PrintWESLeakWarning(WaitEventSet *events)
{
/*
- * XXXX: There's no property to show as an identier of a wait event set,
+ * XXXX: There's no property to show as an identifier of a wait event set,
* use its pointer instead.
*/
elog(WARNING, "wait event set leak: %p still referenced",
diff --git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h
new file mode 100644
index 0000000000..3b6bf4a516
--- /dev/null
+++ b/src/include/executor/execAsync.h
@@ -0,0 +1,22 @@
+/*--------------------------------------------------------------------
+ * execAsync.h
+ * Support functions for asynchronous query execution
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/include/executor/execAsync.h
+ *--------------------------------------------------------------------
+ */
+#ifndef EXECASYNC_H
+#define EXECASYNC_H
+
+#include "nodes/execnodes.h"
+#include "storage/latch.h"
+
+extern bool ExecAsyncConfigureWait(WaitEventSet *wes, PlanState *node,
+ void *data, bool reinit);
+extern Bitmapset *ExecAsyncEventWait(PlanState **nodes, Bitmapset *waitnodes,
+ long timeout);
+#endif /* EXECASYNC_H */
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 415e117407..9cf2c1f676 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -59,6 +59,7 @@
#define EXEC_FLAG_MARK 0x0008 /* need mark/restore */
#define EXEC_FLAG_SKIP_TRIGGERS 0x0010 /* skip AfterTrigger calls */
#define EXEC_FLAG_WITH_NO_DATA 0x0020 /* rel scannability doesn't matter */
+#define EXEC_FLAG_ASYNC 0x0040 /* request async execution */
/* Hook for plugins to get control in ExecutorStart() */
diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h
index 326d713ebf..71a233b41f 100644
--- a/src/include/executor/nodeForeignscan.h
+++ b/src/include/executor/nodeForeignscan.h
@@ -30,5 +30,8 @@ extern void ExecForeignScanReInitializeDSM(ForeignScanState *node,
extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
ParallelWorkerContext *pwcxt);
extern void ExecShutdownForeignScan(ForeignScanState *node);
+extern bool ExecForeignAsyncConfigureWait(ForeignScanState *node,
+ WaitEventSet *wes,
+ void *caller_data, bool reinit);
#endif /* NODEFOREIGNSCAN_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 95556dfb15..853ba2b5ad 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -169,6 +169,11 @@ typedef bool (*IsForeignScanParallelSafe_function) (PlannerInfo *root,
typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root,
List *fdw_private,
RelOptInfo *child_rel);
+typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path);
+typedef bool (*ForeignAsyncConfigureWait_function) (ForeignScanState *node,
+ WaitEventSet *wes,
+ void *caller_data,
+ bool reinit);
/*
* FdwRoutine is the struct returned by a foreign-data wrapper's handler
@@ -190,6 +195,7 @@ typedef struct FdwRoutine
GetForeignPlan_function GetForeignPlan;
BeginForeignScan_function BeginForeignScan;
IterateForeignScan_function IterateForeignScan;
+ IterateForeignScan_function IterateForeignScanAsync;
ReScanForeignScan_function ReScanForeignScan;
EndForeignScan_function EndForeignScan;
@@ -242,6 +248,11 @@ typedef struct FdwRoutine
InitializeDSMForeignScan_function InitializeDSMForeignScan;
ReInitializeDSMForeignScan_function ReInitializeDSMForeignScan;
InitializeWorkerForeignScan_function InitializeWorkerForeignScan;
+
+ /* Support functions for asynchronous execution */
+ IsForeignPathAsyncCapable_function IsForeignPathAsyncCapable;
+ ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait;
+
ShutdownForeignScan_function ShutdownForeignScan;
/* Support functions for path reparameterization. */
diff --git a/src/include/nodes/bitmapset.h b/src/include/nodes/bitmapset.h
index d113c271ee..177e6218cb 100644
--- a/src/include/nodes/bitmapset.h
+++ b/src/include/nodes/bitmapset.h
@@ -107,6 +107,7 @@ extern Bitmapset *bms_add_members(Bitmapset *a, const Bitmapset *b);
extern Bitmapset *bms_add_range(Bitmapset *a, int lower, int upper);
extern Bitmapset *bms_int_members(Bitmapset *a, const Bitmapset *b);
extern Bitmapset *bms_del_members(Bitmapset *a, const Bitmapset *b);
+extern Bitmapset *bms_del_range(Bitmapset *a, int lower, int upper);
extern Bitmapset *bms_join(Bitmapset *a, Bitmapset *b);
/* support for iterating through the integer elements of a set: */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index ef448d67c7..dce7fb0e07 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -925,6 +925,12 @@ typedef TupleTableSlot *(*ExecProcNodeMtd) (struct PlanState *pstate);
* abstract superclass for all PlanState-type nodes.
* ----------------
*/
+typedef enum AsyncState
+{
+ AS_AVAILABLE,
+ AS_WAITING
+} AsyncState;
+
typedef struct PlanState
{
NodeTag type;
@@ -1013,6 +1019,11 @@ typedef struct PlanState
bool outeropsset;
bool inneropsset;
bool resultopsset;
+
+ /* Async subnode execution stuff */
+ AsyncState asyncstate;
+
+ int32 padding; /* to keep alignment of derived types */
} PlanState;
/* ----------------
@@ -1208,14 +1219,21 @@ struct AppendState
PlanState ps; /* its first field is NodeTag */
PlanState **appendplans; /* array of PlanStates for my inputs */
int as_nplans;
- int as_whichplan;
+ int as_whichsyncplan; /* which sync plan is being executed */
int as_first_partial_plan; /* Index of 'appendplans' containing
* the first partial plan */
+ int as_nasyncplans; /* # of async-capable children */
ParallelAppendState *as_pstate; /* parallel coordination info */
Size pstate_len; /* size of parallel coordination info */
struct PartitionPruneState *as_prune_state;
- Bitmapset *as_valid_subplans;
+ Bitmapset *as_valid_syncsubplans;
bool (*choose_next_subplan) (AppendState *);
+ bool as_syncdone; /* all synchronous plans done? */
+ Bitmapset *as_needrequest; /* async plans needing a new request */
+ Bitmapset *as_pending_async; /* pending async plans */
+ TupleTableSlot **as_asyncresult; /* results of each async plan */
+ int as_nasyncresult; /* # of valid entries in as_asyncresult */
+ bool as_exec_prune; /* runtime pruning needed for async exec? */
};
/* ----------------
@@ -1783,6 +1801,7 @@ typedef struct ForeignScanState
Size pscan_len; /* size of parallel coordination information */
/* use struct pointer to avoid including fdwapi.h here */
struct FdwRoutine *fdwroutine;
+ bool fs_async;
void *fdw_state; /* foreign-data wrapper can keep state here */
} ForeignScanState;
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 83e01074ed..abad89b327 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -135,6 +135,11 @@ typedef struct Plan
bool parallel_aware; /* engage parallel-aware logic? */
bool parallel_safe; /* OK to use as part of parallel plan? */
+ /*
+ * information needed for asynchronous execution
+ */
+ bool async_capable; /* engage asynchronous execution logic? */
+
/*
* Common structural data for all Plan types.
*/
@@ -262,6 +267,10 @@ typedef struct Append
/* Info for run-time subplan pruning; NULL if we're not doing that */
struct PartitionPruneInfo *part_prune_info;
+
+ /* Async child node execution stuff */
+ int nasyncplans; /* # async subplans, always at start of list */
+ int referent; /* index of inheritance tree referent */
} Append;
/* ----------------
diff --git a/src/include/optimizer/paths.h b/src/include/optimizer/paths.h
index 10b6e81079..53876b2d8b 100644
--- a/src/include/optimizer/paths.h
+++ b/src/include/optimizer/paths.h
@@ -241,4 +241,6 @@ extern PathKey *make_canonical_pathkey(PlannerInfo *root,
extern void add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
List *live_childrels);
+extern bool is_async_capable_path(Path *path);
+
#endif /* PATHS_H */
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 0dfbac46b4..d673f9da6b 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -887,7 +887,8 @@ typedef enum
WAIT_EVENT_REPLICATION_SLOT_DROP,
WAIT_EVENT_SAFE_SNAPSHOT,
WAIT_EVENT_SYNC_REP,
- WAIT_EVENT_XACT_GROUP_UPDATE
+ WAIT_EVENT_XACT_GROUP_UPDATE,
+ WAIT_EVENT_ASYNC_WAIT
} WaitEventIPC;
/* ----------
--
2.18.4
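For FDW authors, the core of the new API above is just the two optional callbacks added to FdwRoutine. As a minimal sketch (not part of the patch; the FDW name, MyFdwScanState, and its sock field are invented for illustration), an async-capable FDW could wire them up roughly like this:

    /* Hypothetical per-scan state; only the socket matters for waiting. */
    typedef struct MyFdwScanState
    {
        pgsocket    sock;       /* socket of the in-flight remote query */
        /* ... other per-scan state ... */
    } MyFdwScanState;

    static bool
    myfdwIsForeignPathAsyncCapable(ForeignPath *path)
    {
        /* Claim every path is async-capable, as postgres_fdw does below. */
        return true;
    }

    static bool
    myfdwForeignAsyncConfigureWait(ForeignScanState *node, WaitEventSet *wes,
                                   void *caller_data, bool reinit)
    {
        MyFdwScanState *fsstate = (MyFdwScanState *) node->fdw_state;

        /* Register the remote socket; Append waits until it is readable. */
        AddWaitEventToSet(wes, WL_SOCKET_READABLE, fsstate->sock,
                          NULL, caller_data);
        return true;            /* we added an event */
    }

The handler function then only needs to set routine->IsForeignPathAsyncCapable and routine->ForeignAsyncConfigureWait, as the third patch does for postgres_fdw.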
From 4df7f9b34ad8d9fd9b415459e2673ebe27f72343 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 19 Oct 2017 17:24:07 +0900
Subject: [PATCH v7 3/3] async postgres_fdw
---
contrib/postgres_fdw/connection.c | 28 +
.../postgres_fdw/expected/postgres_fdw.out | 272 ++++----
contrib/postgres_fdw/postgres_fdw.c | 601 +++++++++++++++---
contrib/postgres_fdw/postgres_fdw.h | 2 +
contrib/postgres_fdw/sql/postgres_fdw.sql | 20 +-
5 files changed, 710 insertions(+), 213 deletions(-)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 08daf26fdf..be5948f613 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -59,6 +59,7 @@ typedef struct ConnCacheEntry
bool invalidated; /* true if reconnect is pending */
uint32 server_hashvalue; /* hash value of foreign server OID */
uint32 mapping_hashvalue; /* hash value of user mapping OID */
+ void *storage; /* connection specific storage */
} ConnCacheEntry;
/*
@@ -203,6 +204,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
entry->conn, server->servername, user->umid, user->userid);
+ entry->storage = NULL;
}
/*
@@ -216,6 +218,32 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
return entry->conn;
}
+/*
+ * Return the connection-specific storage for this user, allocating
+ * initsize bytes if it doesn't exist yet.
+ */
+void *
+GetConnectionSpecificStorage(UserMapping *user, size_t initsize)
+{
+ bool found;
+ ConnCacheEntry *entry;
+ ConnCacheKey key;
+
+ /* Find storage using the same key as GetConnection */
+ key = user->umid;
+ entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
+ Assert(found);
+
+ /* Create one if it doesn't exist yet. */
+ if (entry->storage == NULL)
+ {
+ entry->storage = MemoryContextAlloc(CacheMemoryContext, initsize);
+ memset(entry->storage, 0, initsize);
+ }
+
+ return entry->storage;
+}
+
/*
* Connect to remote server using specified server and user mapping properties.
*/
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 10e23d02ed..0634ab9f6a 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -6986,7 +6986,7 @@ INSERT INTO a(aa) VALUES('aaaaa');
INSERT INTO b(aa) VALUES('bbb');
INSERT INTO b(aa) VALUES('bbbb');
INSERT INTO b(aa) VALUES('bbbbb');
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+-------
a | aaa
@@ -7014,7 +7014,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
(3 rows)
UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+--------
a | aaa
@@ -7042,7 +7042,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
(3 rows)
UPDATE b SET aa = 'new';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+--------
a | aaa
@@ -7070,7 +7070,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
(3 rows)
UPDATE a SET aa = 'newtoo';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+--------
a | newtoo
@@ -7140,35 +7140,41 @@ insert into bar2 values(3,33,33);
insert into bar2 values(4,44,44);
insert into bar2 values(7,77,77);
explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for update;
- QUERY PLAN
-----------------------------------------------------------------------------------------------
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
+ QUERY PLAN
+-----------------------------------------------------------------------------------------------------------------
LockRows
Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
- -> Hash Join
+ -> Merge Join
Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
Inner Unique: true
- Hash Cond: (bar.f1 = foo.f1)
- -> Append
- -> Seq Scan on public.bar bar_1
+ Merge Cond: (bar.f1 = foo.f1)
+ -> Merge Append
+ Sort Key: bar.f1
+ -> Sort
Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
+ Sort Key: bar_1.f1
+ -> Seq Scan on public.bar bar_1
+ Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
-> Foreign Scan on public.bar2 bar_2
Output: bar_2.f1, bar_2.f2, bar_2.ctid, bar_2.*, bar_2.tableoid
- Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE
- -> Hash
+ Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR UPDATE
+ -> Sort
Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+ Sort Key: foo.f1
-> HashAggregate
Output: foo.ctid, foo.f1, foo.*, foo.tableoid
Group Key: foo.f1
-> Append
- -> Seq Scan on public.foo foo_1
- Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
- -> Foreign Scan on public.foo2 foo_2
+ Async subplans: 1
+ -> Async Foreign Scan on public.foo2 foo_2
Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid
Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(23 rows)
+ -> Seq Scan on public.foo foo_1
+ Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
+(29 rows)
-select * from bar where f1 in (select f1 from foo) for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
f1 | f2
----+----
1 | 11
@@ -7178,35 +7184,41 @@ select * from bar where f1 in (select f1 from foo) for update;
(4 rows)
explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for share;
- QUERY PLAN
-----------------------------------------------------------------------------------------------
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
+ QUERY PLAN
+----------------------------------------------------------------------------------------------------------------
LockRows
Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
- -> Hash Join
+ -> Merge Join
Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
Inner Unique: true
- Hash Cond: (bar.f1 = foo.f1)
- -> Append
- -> Seq Scan on public.bar bar_1
+ Merge Cond: (bar.f1 = foo.f1)
+ -> Merge Append
+ Sort Key: bar.f1
+ -> Sort
Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
+ Sort Key: bar_1.f1
+ -> Seq Scan on public.bar bar_1
+ Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
-> Foreign Scan on public.bar2 bar_2
Output: bar_2.f1, bar_2.f2, bar_2.ctid, bar_2.*, bar_2.tableoid
- Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR SHARE
- -> Hash
+ Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR SHARE
+ -> Sort
Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+ Sort Key: foo.f1
-> HashAggregate
Output: foo.ctid, foo.f1, foo.*, foo.tableoid
Group Key: foo.f1
-> Append
- -> Seq Scan on public.foo foo_1
- Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
- -> Foreign Scan on public.foo2 foo_2
+ Async subplans: 1
+ -> Async Foreign Scan on public.foo2 foo_2
Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid
Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(23 rows)
+ -> Seq Scan on public.foo foo_1
+ Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
+(29 rows)
-select * from bar where f1 in (select f1 from foo) for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
f1 | f2
----+----
1 | 11
@@ -7236,11 +7248,12 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
Output: foo.ctid, foo.f1, foo.*, foo.tableoid
Group Key: foo.f1
-> Append
- -> Seq Scan on public.foo foo_1
- Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
- -> Foreign Scan on public.foo2 foo_2
+ Async subplans: 1
+ -> Async Foreign Scan on public.foo2 foo_2
Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid
Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
+ -> Seq Scan on public.foo foo_1
+ Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
-> Hash Join
Output: bar_1.f1, (bar_1.f2 + 100), bar_1.f3, bar_1.ctid, foo.ctid, foo.*, foo.tableoid
Inner Unique: true
@@ -7254,12 +7267,13 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
Output: foo.ctid, foo.f1, foo.*, foo.tableoid
Group Key: foo.f1
-> Append
- -> Seq Scan on public.foo foo_1
- Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
- -> Foreign Scan on public.foo2 foo_2
+ Async subplans: 1
+ -> Async Foreign Scan on public.foo2 foo_2
Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid
Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(39 rows)
+ -> Seq Scan on public.foo foo_1
+ Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
+(41 rows)
update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
select tableoid::regclass, * from bar order by 1,2;
@@ -7289,16 +7303,17 @@ where bar.f1 = ss.f1;
Output: bar.f1, (bar.f2 + 100), bar.ctid, (ROW(foo.f1))
Hash Cond: (foo.f1 = bar.f1)
-> Append
+ Async subplans: 2
+ -> Async Foreign Scan on public.foo2 foo_1
+ Output: ROW(foo_1.f1), foo_1.f1
+ Remote SQL: SELECT f1 FROM public.loct1
+ -> Async Foreign Scan on public.foo2 foo_3
+ Output: ROW((foo_3.f1 + 3)), (foo_3.f1 + 3)
+ Remote SQL: SELECT f1 FROM public.loct1
-> Seq Scan on public.foo
Output: ROW(foo.f1), foo.f1
- -> Foreign Scan on public.foo2 foo_1
- Output: ROW(foo_1.f1), foo_1.f1
- Remote SQL: SELECT f1 FROM public.loct1
-> Seq Scan on public.foo foo_2
Output: ROW((foo_2.f1 + 3)), (foo_2.f1 + 3)
- -> Foreign Scan on public.foo2 foo_3
- Output: ROW((foo_3.f1 + 3)), (foo_3.f1 + 3)
- Remote SQL: SELECT f1 FROM public.loct1
-> Hash
Output: bar.f1, bar.f2, bar.ctid
-> Seq Scan on public.bar
@@ -7316,17 +7331,18 @@ where bar.f1 = ss.f1;
Output: (ROW(foo.f1)), foo.f1
Sort Key: foo.f1
-> Append
+ Async subplans: 2
+ -> Async Foreign Scan on public.foo2 foo_1
+ Output: ROW(foo_1.f1), foo_1.f1
+ Remote SQL: SELECT f1 FROM public.loct1
+ -> Async Foreign Scan on public.foo2 foo_3
+ Output: ROW((foo_3.f1 + 3)), (foo_3.f1 + 3)
+ Remote SQL: SELECT f1 FROM public.loct1
-> Seq Scan on public.foo
Output: ROW(foo.f1), foo.f1
- -> Foreign Scan on public.foo2 foo_1
- Output: ROW(foo_1.f1), foo_1.f1
- Remote SQL: SELECT f1 FROM public.loct1
-> Seq Scan on public.foo foo_2
Output: ROW((foo_2.f1 + 3)), (foo_2.f1 + 3)
- -> Foreign Scan on public.foo2 foo_3
- Output: ROW((foo_3.f1 + 3)), (foo_3.f1 + 3)
- Remote SQL: SELECT f1 FROM public.loct1
-(45 rows)
+(47 rows)
update bar set f2 = f2 + 100
from
@@ -7476,27 +7492,33 @@ delete from foo where f1 < 5 returning *;
(5 rows)
explain (verbose, costs off)
-update bar set f2 = f2 + 100 returning *;
- QUERY PLAN
-------------------------------------------------------------------------------
- Update on public.bar
- Output: bar.f1, bar.f2
- Update on public.bar
- Foreign Update on public.bar2 bar_1
- -> Seq Scan on public.bar
- Output: bar.f1, (bar.f2 + 100), bar.ctid
- -> Foreign Update on public.bar2 bar_1
- Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2
-(8 rows)
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
+ QUERY PLAN
+--------------------------------------------------------------------------------------
+ Sort
+ Output: u.f1, u.f2
+ Sort Key: u.f1
+ CTE u
+ -> Update on public.bar
+ Output: bar.f1, bar.f2
+ Update on public.bar
+ Foreign Update on public.bar2 bar_1
+ -> Seq Scan on public.bar
+ Output: bar.f1, (bar.f2 + 100), bar.ctid
+ -> Foreign Update on public.bar2 bar_1
+ Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2
+ -> CTE Scan on u
+ Output: u.f1, u.f2
+(14 rows)
-update bar set f2 = f2 + 100 returning *;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
f1 | f2
----+-----
1 | 311
2 | 322
- 6 | 266
3 | 333
4 | 344
+ 6 | 266
7 | 277
(6 rows)
@@ -8571,11 +8593,12 @@ SELECT t1.a,t2.b,t3.c FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) INNER J
Sort
Sort Key: t1.a, t3.c
-> Append
- -> Foreign Scan
+ Async subplans: 2
+ -> Async Foreign Scan
Relations: ((ftprt1_p1 t1_1) INNER JOIN (ftprt2_p1 t2_1)) INNER JOIN (ftprt1_p1 t3_1)
- -> Foreign Scan
+ -> Async Foreign Scan
Relations: ((ftprt1_p2 t1_2) INNER JOIN (ftprt2_p2 t2_2)) INNER JOIN (ftprt1_p2 t3_2)
-(7 rows)
+(8 rows)
SELECT t1.a,t2.b,t3.c FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) INNER JOIN fprt1 t3 ON (t2.b = t3.a) WHERE t1.a % 25 = 0 ORDER BY 1,2,3;
a | b | c
@@ -8610,20 +8633,22 @@ SELECT t1.a,t2.b,t2.c FROM fprt1 t1 LEFT JOIN (SELECT * FROM fprt2 WHERE a < 10)
-- with whole-row reference; partitionwise join does not apply
EXPLAIN (COSTS OFF)
SELECT t1.wr, t2.wr FROM (SELECT t1 wr, a FROM fprt1 t1 WHERE t1.a % 25 = 0) t1 FULL JOIN (SELECT t2 wr, b FROM fprt2 t2 WHERE t2.b % 25 = 0) t2 ON (t1.a = t2.b) ORDER BY 1,2;
- QUERY PLAN
---------------------------------------------------------
+ QUERY PLAN
+--------------------------------------------------------------
Sort
Sort Key: ((t1.*)::fprt1), ((t2.*)::fprt2)
-> Hash Full Join
Hash Cond: (t1.a = t2.b)
-> Append
- -> Foreign Scan on ftprt1_p1 t1_1
- -> Foreign Scan on ftprt1_p2 t1_2
+ Async subplans: 2
+ -> Async Foreign Scan on ftprt1_p1 t1_1
+ -> Async Foreign Scan on ftprt1_p2 t1_2
-> Hash
-> Append
- -> Foreign Scan on ftprt2_p1 t2_1
- -> Foreign Scan on ftprt2_p2 t2_2
-(11 rows)
+ Async subplans: 2
+ -> Async Foreign Scan on ftprt2_p1 t2_1
+ -> Async Foreign Scan on ftprt2_p2 t2_2
+(13 rows)
SELECT t1.wr, t2.wr FROM (SELECT t1 wr, a FROM fprt1 t1 WHERE t1.a % 25 = 0) t1 FULL JOIN (SELECT t2 wr, b FROM fprt2 t2 WHERE t2.b % 25 = 0) t2 ON (t1.a = t2.b) ORDER BY 1,2;
wr | wr
@@ -8652,11 +8677,12 @@ SELECT t1.a,t1.b FROM fprt1 t1, LATERAL (SELECT t2.a, t2.b FROM fprt2 t2 WHERE t
Sort
Sort Key: t1.a, t1.b
-> Append
- -> Foreign Scan
+ Async subplans: 2
+ -> Async Foreign Scan
Relations: (ftprt1_p1 t1_1) INNER JOIN (ftprt2_p1 t2_1)
- -> Foreign Scan
+ -> Async Foreign Scan
Relations: (ftprt1_p2 t1_2) INNER JOIN (ftprt2_p2 t2_2)
-(7 rows)
+(8 rows)
SELECT t1.a,t1.b FROM fprt1 t1, LATERAL (SELECT t2.a, t2.b FROM fprt2 t2 WHERE t1.a = t2.b AND t1.b = t2.a) q WHERE t1.a%25 = 0 ORDER BY 1,2;
a | b
@@ -8709,21 +8735,23 @@ SELECT t1.a, t1.phv, t2.b, t2.phv FROM (SELECT 't1_phv' phv, * FROM fprt1 WHERE
-- test FOR UPDATE; partitionwise join does not apply
EXPLAIN (COSTS OFF)
SELECT t1.a, t2.b FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) WHERE t1.a % 25 = 0 ORDER BY 1,2 FOR UPDATE OF t1;
- QUERY PLAN
---------------------------------------------------------------
+ QUERY PLAN
+--------------------------------------------------------------------
LockRows
-> Sort
Sort Key: t1.a
-> Hash Join
Hash Cond: (t2.b = t1.a)
-> Append
- -> Foreign Scan on ftprt2_p1 t2_1
- -> Foreign Scan on ftprt2_p2 t2_2
+ Async subplans: 2
+ -> Async Foreign Scan on ftprt2_p1 t2_1
+ -> Async Foreign Scan on ftprt2_p2 t2_2
-> Hash
-> Append
- -> Foreign Scan on ftprt1_p1 t1_1
- -> Foreign Scan on ftprt1_p2 t1_2
-(12 rows)
+ Async subplans: 2
+ -> Async Foreign Scan on ftprt1_p1 t1_1
+ -> Async Foreign Scan on ftprt1_p2 t1_2
+(14 rows)
SELECT t1.a, t2.b FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) WHERE t1.a % 25 = 0 ORDER BY 1,2 FOR UPDATE OF t1;
a | b
@@ -8758,18 +8786,19 @@ ANALYZE fpagg_tab_p3;
SET enable_partitionwise_aggregate TO false;
EXPLAIN (COSTS OFF)
SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
- QUERY PLAN
------------------------------------------------------------
+ QUERY PLAN
+-----------------------------------------------------------------
Sort
Sort Key: pagg_tab.a
-> HashAggregate
Group Key: pagg_tab.a
Filter: (avg(pagg_tab.b) < '22'::numeric)
-> Append
- -> Foreign Scan on fpagg_tab_p1 pagg_tab_1
- -> Foreign Scan on fpagg_tab_p2 pagg_tab_2
- -> Foreign Scan on fpagg_tab_p3 pagg_tab_3
-(9 rows)
+ Async subplans: 3
+ -> Async Foreign Scan on fpagg_tab_p1 pagg_tab_1
+ -> Async Foreign Scan on fpagg_tab_p2 pagg_tab_2
+ -> Async Foreign Scan on fpagg_tab_p3 pagg_tab_3
+(10 rows)
-- Plan with partitionwise aggregates is enabled
SET enable_partitionwise_aggregate TO true;
@@ -8780,13 +8809,14 @@ SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 O
Sort
Sort Key: pagg_tab.a
-> Append
- -> Foreign Scan
+ Async subplans: 3
+ -> Async Foreign Scan
Relations: Aggregate on (fpagg_tab_p1 pagg_tab)
- -> Foreign Scan
+ -> Async Foreign Scan
Relations: Aggregate on (fpagg_tab_p2 pagg_tab_1)
- -> Foreign Scan
+ -> Async Foreign Scan
Relations: Aggregate on (fpagg_tab_p3 pagg_tab_2)
-(9 rows)
+(10 rows)
SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
a | sum | min | count
@@ -8808,29 +8838,22 @@ SELECT a, count(t1) FROM pagg_tab t1 GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
Sort
Output: t1.a, (count(((t1.*)::pagg_tab)))
Sort Key: t1.a
- -> Append
- -> HashAggregate
- Output: t1.a, count(((t1.*)::pagg_tab))
- Group Key: t1.a
- Filter: (avg(t1.b) < '22'::numeric)
- -> Foreign Scan on public.fpagg_tab_p1 t1
- Output: t1.a, t1.*, t1.b
- Remote SQL: SELECT a, b, c FROM public.pagg_tab_p1
- -> HashAggregate
- Output: t1_1.a, count(((t1_1.*)::pagg_tab))
- Group Key: t1_1.a
- Filter: (avg(t1_1.b) < '22'::numeric)
- -> Foreign Scan on public.fpagg_tab_p2 t1_1
+ -> HashAggregate
+ Output: t1.a, count(((t1.*)::pagg_tab))
+ Group Key: t1.a
+ Filter: (avg(t1.b) < '22'::numeric)
+ -> Append
+ Async subplans: 3
+ -> Async Foreign Scan on public.fpagg_tab_p1 t1_1
Output: t1_1.a, t1_1.*, t1_1.b
- Remote SQL: SELECT a, b, c FROM public.pagg_tab_p2
- -> HashAggregate
- Output: t1_2.a, count(((t1_2.*)::pagg_tab))
- Group Key: t1_2.a
- Filter: (avg(t1_2.b) < '22'::numeric)
- -> Foreign Scan on public.fpagg_tab_p3 t1_2
+ Remote SQL: SELECT a, b, c FROM public.pagg_tab_p1
+ -> Async Foreign Scan on public.fpagg_tab_p2 t1_2
Output: t1_2.a, t1_2.*, t1_2.b
+ Remote SQL: SELECT a, b, c FROM public.pagg_tab_p2
+ -> Async Foreign Scan on public.fpagg_tab_p3 t1_3
+ Output: t1_3.a, t1_3.*, t1_3.b
Remote SQL: SELECT a, b, c FROM public.pagg_tab_p3
-(25 rows)
+(18 rows)
SELECT a, count(t1) FROM pagg_tab t1 GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
a | count
@@ -8850,20 +8873,15 @@ SELECT b, avg(a), max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700
-----------------------------------------------------------------
Sort
Sort Key: pagg_tab.b
- -> Finalize HashAggregate
+ -> HashAggregate
Group Key: pagg_tab.b
Filter: (sum(pagg_tab.a) < 700)
-> Append
- -> Partial HashAggregate
- Group Key: pagg_tab.b
- -> Foreign Scan on fpagg_tab_p1 pagg_tab
- -> Partial HashAggregate
- Group Key: pagg_tab_1.b
- -> Foreign Scan on fpagg_tab_p2 pagg_tab_1
- -> Partial HashAggregate
- Group Key: pagg_tab_2.b
- -> Foreign Scan on fpagg_tab_p3 pagg_tab_2
-(15 rows)
+ Async subplans: 3
+ -> Async Foreign Scan on fpagg_tab_p1 pagg_tab_1
+ -> Async Foreign Scan on fpagg_tab_p2 pagg_tab_2
+ -> Async Foreign Scan on fpagg_tab_p3 pagg_tab_3
+(10 rows)
-- ===================================================================
-- access rights and superuser
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index a31abce7c9..14824368cc 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -21,6 +21,8 @@
#include "commands/defrem.h"
#include "commands/explain.h"
#include "commands/vacuum.h"
+#include "executor/execAsync.h"
+#include "executor/nodeForeignscan.h"
#include "foreign/fdwapi.h"
#include "funcapi.h"
#include "miscadmin.h"
@@ -35,6 +37,7 @@
#include "optimizer/restrictinfo.h"
#include "optimizer/tlist.h"
#include "parser/parsetree.h"
+#include "pgstat.h"
#include "postgres_fdw.h"
#include "utils/builtins.h"
#include "utils/float.h"
@@ -56,6 +59,9 @@ PG_MODULE_MAGIC;
/* If no remote estimates, assume a sort costs 20% extra */
#define DEFAULT_FDW_SORT_MULTIPLIER 1.2
+/* Retrieve PgFdwScanState struct from ForeignScanState */
+#define GetPgFdwScanState(n) ((PgFdwScanState *)(n)->fdw_state)
+
/*
* Indexes of FDW-private information stored in fdw_private lists.
*
@@ -122,11 +128,29 @@ enum FdwDirectModifyPrivateIndex
FdwDirectModifyPrivateSetProcessed
};
+/*
+ * Connection common state - shared among all PgFdwState instances using the
+ * same connection.
+ */
+typedef struct PgFdwConnCommonState
+{
+ ForeignScanState *leader; /* leader node of this connection */
+ bool busy; /* true if this connection is busy */
+} PgFdwConnCommonState;
+
+/* Execution state base type */
+typedef struct PgFdwState
+{
+ PGconn *conn; /* connection for the scan */
+ PgFdwConnCommonState *commonstate; /* connection common state */
+} PgFdwState;
+
/*
* Execution state of a foreign scan using postgres_fdw.
*/
typedef struct PgFdwScanState
{
+ PgFdwState s; /* common structure */
Relation rel; /* relcache entry for the foreign table. NULL
* for a foreign join scan. */
TupleDesc tupdesc; /* tuple descriptor of scan */
@@ -137,7 +161,6 @@ typedef struct PgFdwScanState
List *retrieved_attrs; /* list of retrieved attribute numbers */
/* for remote query execution */
- PGconn *conn; /* connection for the scan */
unsigned int cursor_number; /* quasi-unique ID for my cursor */
bool cursor_exists; /* have we created the cursor? */
int numParams; /* number of parameters passed to query */
@@ -153,6 +176,12 @@ typedef struct PgFdwScanState
/* batch-level state, for optimizing rewinds and avoiding useless fetch */
int fetch_ct_2; /* Min(# of fetches done, 2) */
bool eof_reached; /* true if last fetch reached EOF */
+ bool async; /* true if run asynchronously */
+ bool queued; /* true if this node is in waiter queue */
+ ForeignScanState *waiter; /* Next node to run a query among nodes
+ * sharing the same connection */
+ ForeignScanState *last_waiter; /* last element in waiter queue.
+ * valid only on the leader node */
/* working memory contexts */
MemoryContext batch_cxt; /* context holding current batch of tuples */
@@ -166,11 +195,11 @@ typedef struct PgFdwScanState
*/
typedef struct PgFdwModifyState
{
+ PgFdwState s; /* common structure */
Relation rel; /* relcache entry for the foreign table */
AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
/* for remote query execution */
- PGconn *conn; /* connection for the scan */
char *p_name; /* name of prepared statement, if created */
/* extracted fdw_private data */
@@ -197,6 +226,7 @@ typedef struct PgFdwModifyState
*/
typedef struct PgFdwDirectModifyState
{
+ PgFdwState s; /* common structure */
Relation rel; /* relcache entry for the foreign table */
AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
@@ -326,6 +356,7 @@ static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
static void postgresReScanForeignScan(ForeignScanState *node);
static void postgresEndForeignScan(ForeignScanState *node);
+static void postgresShutdownForeignScan(ForeignScanState *node);
static void postgresAddForeignUpdateTargets(Query *parsetree,
RangeTblEntry *target_rte,
Relation target_relation);
@@ -391,6 +422,10 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
RelOptInfo *input_rel,
RelOptInfo *output_rel,
void *extra);
+static bool postgresIsForeignPathAsyncCapable(ForeignPath *path);
+static bool postgresForeignAsyncConfigureWait(ForeignScanState *node,
+ WaitEventSet *wes,
+ void *caller_data, bool reinit);
/*
* Helper functions
@@ -419,7 +454,9 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
EquivalenceClass *ec, EquivalenceMember *em,
void *arg);
static void create_cursor(ForeignScanState *node);
-static void fetch_more_data(ForeignScanState *node);
+static void request_more_data(ForeignScanState *node);
+static void fetch_received_data(ForeignScanState *node);
+static void vacate_connection(PgFdwState *fdwconn, bool clear_queue);
static void close_cursor(PGconn *conn, unsigned int cursor_number);
static PgFdwModifyState *create_foreign_modify(EState *estate,
RangeTblEntry *rte,
@@ -522,6 +559,7 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
routine->IterateForeignScan = postgresIterateForeignScan;
routine->ReScanForeignScan = postgresReScanForeignScan;
routine->EndForeignScan = postgresEndForeignScan;
+ routine->ShutdownForeignScan = postgresShutdownForeignScan;
/* Functions for updating foreign tables */
routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
@@ -558,6 +596,10 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
/* Support functions for upper relation push-down */
routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
+ /* Support functions for async execution */
+ routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable;
+ routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait;
+
PG_RETURN_POINTER(routine);
}
@@ -1433,12 +1475,22 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
- fsstate->conn = GetConnection(user, false);
+ fsstate->s.conn = GetConnection(user, false);
+ fsstate->s.commonstate = (PgFdwConnCommonState *)
+ GetConnectionSpecificStorage(user, sizeof(PgFdwConnCommonState));
+ fsstate->s.commonstate->leader = NULL;
+ fsstate->s.commonstate->busy = false;
+ fsstate->waiter = NULL;
+ fsstate->last_waiter = node;
/* Assign a unique ID for my cursor */
- fsstate->cursor_number = GetCursorNumber(fsstate->conn);
+ fsstate->cursor_number = GetCursorNumber(fsstate->s.conn);
fsstate->cursor_exists = false;
+ /* Initialize async execution status */
+ fsstate->async = false;
+ fsstate->queued = false;
+
/* Get private info created by planner functions. */
fsstate->query = strVal(list_nth(fsplan->fdw_private,
FdwScanPrivateSelectSql));
@@ -1486,40 +1538,241 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
&fsstate->param_values);
}
+/*
+ * Async queue manipulation functions
+ */
+
+/*
+ * add_async_waiter:
+ *
+ * Enqueue the node if it isn't already in the queue, or immediately send
+ * its request if the underlying connection is not busy.
+ */
+static inline void
+add_async_waiter(ForeignScanState *node)
+{
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
+ ForeignScanState *leader = fsstate->s.commonstate->leader;
+
+ /*
+ * Do nothing if the node is already in the queue or already eof'ed.
+ * Note: leader node is not marked as queued.
+ */
+ if (leader == node || fsstate->queued || fsstate->eof_reached)
+ return;
+
+ if (leader == NULL)
+ {
+ /* no leader means not busy, send request immediately */
+ request_more_data(node);
+ }
+ else
+ {
+ /* the connection is busy, queue the node */
+ PgFdwScanState *leader_state = GetPgFdwScanState(leader);
+ PgFdwScanState *last_waiter_state
+ = GetPgFdwScanState(leader_state->last_waiter);
+
+ last_waiter_state->waiter = node;
+ leader_state->last_waiter = node;
+ fsstate->queued = true;
+ }
+}
+
+/*
+ * move_to_next_waiter:
+ *
+ * Make the first waiter the next leader.
+ * Returns the new leader, or NULL if there's no waiter.
+ */
+static inline ForeignScanState *
+move_to_next_waiter(ForeignScanState *node)
+{
+ PgFdwScanState *leader_state = GetPgFdwScanState(node);
+ ForeignScanState *next_leader = leader_state->waiter;
+
+ Assert(leader_state->s.commonstate->leader == node);
+
+ if (next_leader)
+ {
+ /* the first waiter becomes the next leader */
+ PgFdwScanState *next_leader_state = GetPgFdwScanState(next_leader);
+ next_leader_state->last_waiter = leader_state->last_waiter;
+ next_leader_state->queued = false;
+ }
+
+ leader_state->waiter = NULL;
+ leader_state->s.commonstate->leader = next_leader;
+
+ return next_leader;
+}
+
+/*
+ * Remove the node from the waiter queue.
+ *
+ * Remaining results are cleared if the node is a busy leader.
+ * This is intended to be used during node shutdown.
+ */
+static inline void
+remove_async_node(ForeignScanState *node)
+{
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
+ ForeignScanState *leader = fsstate->s.commonstate->leader;
+ PgFdwScanState *leader_state;
+ ForeignScanState *prev;
+ PgFdwScanState *prev_state;
+ ForeignScanState *cur;
+
+ /* no need to remove me */
+ if (!leader || !fsstate->queued)
+ return;
+
+ leader_state = GetPgFdwScanState(leader);
+
+ if (leader == node)
+ {
+ if (leader_state->s.commonstate->busy)
+ {
+ /*
+ * This node is waiting for a result; absorb it first so that
+ * the following commands can be sent on the connection.
+ */
+ PgFdwScanState *leader_state = GetPgFdwScanState(leader);
+ PGconn *conn = leader_state->s.conn;
+
+ while(PQisBusy(conn))
+ PQclear(PQgetResult(conn));
+
+ leader_state->s.commonstate->busy = false;
+ }
+
+ move_to_next_waiter(node);
+
+ return;
+ }
+
+ /*
+ * Just remove the node from the queue.
+ *
+ * Nodes don't have a link to the previous node, but this function is
+ * called only on the shutdown path, so we don't bother looking for a
+ * faster way to do this.
+ */
+ prev = leader;
+ prev_state = leader_state;
+ cur = GetPgFdwScanState(prev)->waiter;
+ while (cur)
+ {
+ PgFdwScanState *curstate = GetPgFdwScanState(cur);
+
+ if (cur == node)
+ {
+ prev_state->waiter = curstate->waiter;
+
+ /* relink to the previous node if the last node was removed */
+ if (leader_state->last_waiter == cur)
+ leader_state->last_waiter = prev;
+
+ fsstate->queued = false;
+
+ return;
+ }
+ prev = cur;
+ prev_state = curstate;
+ cur = curstate->waiter;
+ }
+}
+
/*
* postgresIterateForeignScan
- * Retrieve next row from the result set, or clear tuple slot to indicate
- * EOF.
+ * Retrieve next row from the result set.
+ *
+ * For synchronous nodes, returning an empty tuple slot means EOF.
+ *
+ * For asynchronous nodes, if an empty tuple slot is returned, the caller
+ * needs to check the async state to tell whether all tuples have been
+ * received (AS_AVAILABLE) or more data is yet to come (AS_WAITING).
*/
static TupleTableSlot *
postgresIterateForeignScan(ForeignScanState *node)
{
- PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
- /*
- * If this is the first call after Begin or ReScan, we need to create the
- * cursor on the remote side.
- */
- if (!fsstate->cursor_exists)
- create_cursor(node);
-
- /*
- * Get some more tuples, if we've run out.
- */
+ if (fsstate->next_tuple >= fsstate->num_tuples && !fsstate->eof_reached)
+ {
+ /* we've run out, get some more tuples */
+ if (!node->fs_async)
+ {
+ /*
+ * Finish the running query before sending the next command for
+ * this node.
+ */
+ if (!fsstate->s.commonstate->busy)
+ vacate_connection((PgFdwState *)fsstate, false);
+
+ request_more_data(node);
+
+ /* Fetch the result immediately. */
+ fetch_received_data(node);
+ }
+ else if (!fsstate->s.commonstate->busy)
+ {
+ /* If the connection is not busy, just send the request. */
+ request_more_data(node);
+ }
+ else
+ {
+ /* The connection is busy, queue the request */
+ bool available = true;
+ ForeignScanState *leader = fsstate->s.commonstate->leader;
+ PgFdwScanState *leader_state = GetPgFdwScanState(leader);
+
+ /* queue the requested node */
+ add_async_waiter(node);
+
+ /*
+ * The request for the next node cannot be sent before the leader
+ * responds. Finish the current leader if possible.
+ */
+ if (PQisBusy(leader_state->s.conn))
+ {
+ int rc = WaitLatchOrSocket(NULL,
+ WL_SOCKET_READABLE | WL_TIMEOUT |
+ WL_EXIT_ON_PM_DEATH,
+ PQsocket(leader_state->s.conn), 0,
+ WAIT_EVENT_ASYNC_WAIT);
+ if (!(rc & WL_SOCKET_READABLE))
+ available = false;
+ }
+
+ /* fetch the leader's data and re-enqueue the leader for its next request */
+ if (available)
+ {
+ fetch_received_data(leader);
+ add_async_waiter(leader);
+ }
+ }
+ }
+
if (fsstate->next_tuple >= fsstate->num_tuples)
{
- /* No point in another fetch if we already detected EOF, though. */
- if (!fsstate->eof_reached)
- fetch_more_data(node);
- /* If we didn't get any tuples, must be end of data. */
- if (fsstate->next_tuple >= fsstate->num_tuples)
- return ExecClearTuple(slot);
+ /*
+ * We haven't received a result for the given node this time; return
+ * with no tuple to let another node run.
+ */
+ if (fsstate->eof_reached)
+ node->ss.ps.asyncstate = AS_AVAILABLE;
+ else
+ node->ss.ps.asyncstate = AS_WAITING;
+
+ return ExecClearTuple(slot);
}
/*
* Return the next tuple.
*/
+ node->ss.ps.asyncstate = AS_AVAILABLE;
ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++],
slot,
false);
@@ -1534,7 +1787,7 @@ postgresIterateForeignScan(ForeignScanState *node)
static void
postgresReScanForeignScan(ForeignScanState *node)
{
- PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
char sql[64];
PGresult *res;
@@ -1542,6 +1795,8 @@ postgresReScanForeignScan(ForeignScanState *node)
if (!fsstate->cursor_exists)
return;
+ vacate_connection((PgFdwState *)fsstate, true);
+
/*
* If any internal parameters affecting this node have changed, we'd
* better destroy and recreate the cursor. Otherwise, rewinding it should
@@ -1570,9 +1825,9 @@ postgresReScanForeignScan(ForeignScanState *node)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = pgfdw_exec_query(fsstate->conn, sql);
+ res = pgfdw_exec_query(fsstate->s.conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
+ pgfdw_report_error(ERROR, res, fsstate->s.conn, true, sql);
PQclear(res);
/* Now force a fresh FETCH. */
@@ -1590,7 +1845,7 @@ postgresReScanForeignScan(ForeignScanState *node)
static void
postgresEndForeignScan(ForeignScanState *node)
{
- PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
/* if fsstate is NULL, we are in EXPLAIN; nothing to do */
if (fsstate == NULL)
@@ -1598,15 +1853,31 @@ postgresEndForeignScan(ForeignScanState *node)
/* Close the cursor if open, to prevent accumulation of cursors */
if (fsstate->cursor_exists)
- close_cursor(fsstate->conn, fsstate->cursor_number);
+ close_cursor(fsstate->s.conn, fsstate->cursor_number);
/* Release remote connection */
- ReleaseConnection(fsstate->conn);
- fsstate->conn = NULL;
+ ReleaseConnection(fsstate->s.conn);
+ fsstate->s.conn = NULL;
/* MemoryContexts will be deleted automatically. */
}
+/*
+ * postgresShutdownForeignScan
+ * Remove the node from the async machinery and clean up the connection.
+ */
+static void
+postgresShutdownForeignScan(ForeignScanState *node)
+{
+ ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
+
+ if (plan->operation != CMD_SELECT)
+ return;
+
+ /* remove the node from the waiter queue */
+ remove_async_node(node);
+}
+
/*
* postgresAddForeignUpdateTargets
* Add resjunk column(s) needed for update/delete on a foreign table
@@ -2371,7 +2642,9 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
- dmstate->conn = GetConnection(user, false);
+ dmstate->s.conn = GetConnection(user, false);
+ dmstate->s.commonstate = (PgFdwConnCommonState *)
+ GetConnectionSpecificStorage(user, sizeof(PgFdwConnCommonState));
/* Update the foreign-join-related fields. */
if (fsplan->scan.scanrelid == 0)
@@ -2456,7 +2729,11 @@ postgresIterateDirectModify(ForeignScanState *node)
* If this is the first call after Begin, execute the statement.
*/
if (dmstate->num_tuples == -1)
+ {
+ /* finish the running query before sending our command */
+ vacate_connection((PgFdwState *)dmstate, true);
execute_dml_stmt(node);
+ }
/*
* If the local query doesn't specify RETURNING, just clear tuple slot.
@@ -2503,8 +2780,8 @@ postgresEndDirectModify(ForeignScanState *node)
PQclear(dmstate->result);
/* Release remote connection */
- ReleaseConnection(dmstate->conn);
- dmstate->conn = NULL;
+ ReleaseConnection(dmstate->s.conn);
+ dmstate->s.conn = NULL;
/* MemoryContext will be deleted automatically. */
}
@@ -2702,6 +2979,7 @@ estimate_path_cost_size(PlannerInfo *root,
List *local_param_join_conds;
StringInfoData sql;
PGconn *conn;
+ PgFdwConnCommonState *commonstate;
Selectivity local_sel;
QualCost local_cost;
List *fdw_scan_tlist = NIL;
@@ -2746,6 +3024,18 @@ estimate_path_cost_size(PlannerInfo *root,
/* Get the remote estimate */
conn = GetConnection(fpinfo->user, false);
+ commonstate = GetConnectionSpecificStorage(fpinfo->user,
+ sizeof(PgFdwConnCommonState));
+ if (commonstate)
+ {
+ PgFdwState tmpstate;
+ tmpstate.conn = conn;
+ tmpstate.commonstate = commonstate;
+
+ /* finish the running query before sending our command */
+ vacate_connection(&tmpstate, true);
+ }
+
get_remote_estimate(sql.data, conn, &rows, &width,
&startup_cost, &total_cost);
ReleaseConnection(conn);
@@ -3316,11 +3606,11 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
static void
create_cursor(ForeignScanState *node)
{
- PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
ExprContext *econtext = node->ss.ps.ps_ExprContext;
int numParams = fsstate->numParams;
const char **values = fsstate->param_values;
- PGconn *conn = fsstate->conn;
+ PGconn *conn = fsstate->s.conn;
StringInfoData buf;
PGresult *res;
@@ -3383,50 +3673,119 @@ create_cursor(ForeignScanState *node)
}
/*
- * Fetch some more rows from the node's cursor.
+ * Send the next request for the node. If the given node is not the current
+ * connection leader, push the previous leader back onto the waiter queue and
+ * make the given node the leader.
*/
static void
-fetch_more_data(ForeignScanState *node)
+request_more_data(ForeignScanState *node)
{
- PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
+ ForeignScanState *leader = fsstate->s.commonstate->leader;
+ PGconn *conn = fsstate->s.conn;
+ char sql[64];
+
+ /* must be non-busy */
+ Assert(!fsstate->s.commonstate->busy);
+ /* must be not-eof'ed */
+ Assert(!fsstate->eof_reached);
+
+ /*
+ * If this is the first call after Begin or ReScan, we need to create the
+ * cursor on the remote side.
+ */
+ if (!fsstate->cursor_exists)
+ create_cursor(node);
+
+ snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+ fsstate->fetch_size, fsstate->cursor_number);
+
+ if (!PQsendQuery(conn, sql))
+ pgfdw_report_error(ERROR, NULL, conn, false, sql);
+
+ fsstate->s.commonstate->busy = true;
+
+ /* The node is already the leader; just return. */
+ if (leader == node)
+ return;
+
+ /* Let the node be the leader */
+ if (leader != NULL)
+ {
+ remove_async_node(node);
+ fsstate->last_waiter = GetPgFdwScanState(leader)->last_waiter;
+ fsstate->waiter = leader;
+ }
+ else
+ {
+ fsstate->last_waiter = node;
+ fsstate->waiter = NULL;
+ }
+
+ fsstate->s.commonstate->leader = node;
+}
+
+/*
+ * Fetch the received data and automatically send the next waiter's request.
+ */
+static void
+fetch_received_data(ForeignScanState *node)
+{
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
PGresult *volatile res = NULL;
MemoryContext oldcontext;
+ ForeignScanState *waiter;
+
+ /* I should be the current connection leader */
+ Assert(fsstate->s.commonstate->leader == node);
/*
* We'll store the tuples in the batch_cxt. First, flush the previous
- * batch.
+ * batch if no tuples remain.
*/
- fsstate->tuples = NULL;
- MemoryContextReset(fsstate->batch_cxt);
+ if (fsstate->next_tuple >= fsstate->num_tuples)
+ {
+ fsstate->tuples = NULL;
+ fsstate->num_tuples = 0;
+ MemoryContextReset(fsstate->batch_cxt);
+ }
+ else if (fsstate->next_tuple > 0)
+ {
+ /* Some tuples remain. Move them to the beginning of the array. */
+ int n = 0;
+
+ while(fsstate->next_tuple < fsstate->num_tuples)
+ fsstate->tuples[n++] = fsstate->tuples[fsstate->next_tuple++];
+ fsstate->num_tuples = n;
+ }
+
oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
/* PGresult must be released before leaving this function. */
PG_TRY();
{
- PGconn *conn = fsstate->conn;
- char sql[64];
- int numrows;
+ PGconn *conn = fsstate->s.conn;
+ int addrows;
+ size_t newsize;
int i;
- snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
- fsstate->fetch_size, fsstate->cursor_number);
-
- res = pgfdw_exec_query(conn, sql);
- /* On error, report the original query, not the FETCH. */
+ res = pgfdw_get_result(conn, fsstate->query);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
/* Convert the data into HeapTuples */
- numrows = PQntuples(res);
- fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
- fsstate->num_tuples = numrows;
- fsstate->next_tuple = 0;
+ addrows = PQntuples(res);
+ newsize = (fsstate->num_tuples + addrows) * sizeof(HeapTuple);
+ if (fsstate->tuples)
+ fsstate->tuples = (HeapTuple *) repalloc(fsstate->tuples, newsize);
+ else
+ fsstate->tuples = (HeapTuple *) palloc(newsize);
- for (i = 0; i < numrows; i++)
+ for (i = 0; i < addrows; i++)
{
Assert(IsA(node->ss.ps.plan, ForeignScan));
- fsstate->tuples[i] =
+ fsstate->tuples[fsstate->num_tuples + i] =
make_tuple_from_result_row(res, i,
fsstate->rel,
fsstate->attinmeta,
@@ -3436,22 +3795,73 @@ fetch_more_data(ForeignScanState *node)
}
/* Update fetch_ct_2 */
- if (fsstate->fetch_ct_2 < 2)
+ if (fsstate->fetch_ct_2 < 2 && fsstate->next_tuple == 0)
fsstate->fetch_ct_2++;
+ fsstate->next_tuple = 0;
+ fsstate->num_tuples += addrows;
+
/* Must be EOF if we didn't get as many tuples as we asked for. */
- fsstate->eof_reached = (numrows < fsstate->fetch_size);
+ fsstate->eof_reached = (addrows < fsstate->fetch_size);
}
PG_FINALLY();
{
+ fsstate->s.commonstate->busy = false;
+
if (res)
PQclear(res);
}
PG_END_TRY();
+ /* let the first waiter be the next leader of this connection */
+ waiter = move_to_next_waiter(node);
+
+ /* send the next request if any */
+ if (waiter)
+ request_more_data(waiter);
+
MemoryContextSwitchTo(oldcontext);
}
+/*
+ * Vacate the underlying connection so that this node can send the next query.
+ */
+static void
+vacate_connection(PgFdwState *fdwstate, bool clear_queue)
+{
+ PgFdwConnCommonState *commonstate = fdwstate->commonstate;
+ ForeignScanState *leader;
+
+ Assert(commonstate != NULL);
+
+ /* just return if the connection is already available */
+ if (commonstate->leader == NULL || !commonstate->busy)
+ return;
+
+ /*
+ * Let the current connection leader read all of the results of the
+ * running query.
+ */
+ leader = commonstate->leader;
+ fetch_received_data(leader);
+
+ /* let the first waiter be the next leader of this connection */
+ move_to_next_waiter(leader);
+
+ if (!clear_queue)
+ return;
+
+ /* Clear the waiting list */
+ while (leader)
+ {
+ PgFdwScanState *fsstate = GetPgFdwScanState(leader);
+
+ fsstate->last_waiter = NULL;
+ leader = fsstate->waiter;
+ fsstate->waiter = NULL;
+ }
+}
+
/*
* Force assorted GUC parameters to settings that ensure that we'll output
* data values in a form that is unambiguous to the remote server.
@@ -3565,7 +3975,9 @@ create_foreign_modify(EState *estate,
user = GetUserMapping(userid, table->serverid);
/* Open connection; report that we'll create a prepared statement. */
- fmstate->conn = GetConnection(user, true);
+ fmstate->s.conn = GetConnection(user, true);
+ fmstate->s.commonstate = (PgFdwConnCommonState *)
+ GetConnectionSpecificStorage(user, sizeof(PgFdwConnCommonState));
fmstate->p_name = NULL; /* prepared statement not made yet */
/* Set up remote query information. */
@@ -3652,6 +4064,9 @@ execute_foreign_modify(EState *estate,
operation == CMD_UPDATE ||
operation == CMD_DELETE);
+ /* finish the running query before sending our command */
+ vacate_connection((PgFdwState *)fmstate, true);
+
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
prepare_foreign_modify(fmstate);
@@ -3679,14 +4094,14 @@ execute_foreign_modify(EState *estate,
/*
* Execute the prepared statement.
*/
- if (!PQsendQueryPrepared(fmstate->conn,
+ if (!PQsendQueryPrepared(fmstate->s.conn,
fmstate->p_name,
fmstate->p_nums,
p_values,
NULL,
NULL,
0))
- pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+ pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
/*
* Get the result, and check for success.
@@ -3694,10 +4109,10 @@ execute_foreign_modify(EState *estate,
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = pgfdw_get_result(fmstate->conn, fmstate->query);
+ res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
- pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+ pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
/* Check number of rows affected, and fetch RETURNING tuple if any */
if (fmstate->has_returning)
@@ -3733,7 +4148,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
/* Construct name we'll use for the prepared statement. */
snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
- GetPrepStmtNumber(fmstate->conn));
+ GetPrepStmtNumber(fmstate->s.conn));
p_name = pstrdup(prep_name);
/*
@@ -3743,12 +4158,12 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
* the prepared statements we use in this module are simple enough that
* the remote server will make the right choices.
*/
- if (!PQsendPrepare(fmstate->conn,
+ if (!PQsendPrepare(fmstate->s.conn,
p_name,
fmstate->query,
0,
NULL))
- pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+ pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
/*
* Get the result, and check for success.
@@ -3756,9 +4171,9 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = pgfdw_get_result(fmstate->conn, fmstate->query);
+ res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+ pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
PQclear(res);
/* This action shows that the prepare has been done. */
@@ -3887,16 +4302,16 @@ finish_foreign_modify(PgFdwModifyState *fmstate)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = pgfdw_exec_query(fmstate->conn, sql);
+ res = pgfdw_exec_query(fmstate->s.conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
+ pgfdw_report_error(ERROR, res, fmstate->s.conn, true, sql);
PQclear(res);
fmstate->p_name = NULL;
}
/* Release remote connection */
- ReleaseConnection(fmstate->conn);
- fmstate->conn = NULL;
+ ReleaseConnection(fmstate->s.conn);
+ fmstate->s.conn = NULL;
}
/*
@@ -4055,9 +4470,9 @@ execute_dml_stmt(ForeignScanState *node)
* the desired result. This allows us to avoid assuming that the remote
* server has the same OIDs we do for the parameters' types.
*/
- if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
+ if (!PQsendQueryParams(dmstate->s.conn, dmstate->query, numParams,
NULL, values, NULL, NULL, 0))
- pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
+ pgfdw_report_error(ERROR, NULL, dmstate->s.conn, false, dmstate->query);
/*
* Get the result, and check for success.
@@ -4065,10 +4480,10 @@ execute_dml_stmt(ForeignScanState *node)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
+ dmstate->result = pgfdw_get_result(dmstate->s.conn, dmstate->query);
if (PQresultStatus(dmstate->result) !=
(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
- pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
+ pgfdw_report_error(ERROR, dmstate->result, dmstate->s.conn, true,
dmstate->query);
/* Get the number of rows affected. */
@@ -5559,6 +5974,40 @@ postgresGetForeignJoinPaths(PlannerInfo *root,
/* XXX Consider parameterized paths for the join relation */
}
+static bool
+postgresIsForeignPathAsyncCapable(ForeignPath *path)
+{
+ return true;
+}
+
+
+/*
+ * Configure wait events.
+ *
+ * Add the wait event that the ForeignScan node is going to wait on.
+ */
+static bool
+postgresForeignAsyncConfigureWait(ForeignScanState *node, WaitEventSet *wes,
+ void *caller_data, bool reinit)
+{
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
+
+
+ /* Only full reinitialization is supported for now. */
+ Assert(reinit);
+
+ if (fsstate->s.commonstate->leader == node)
+ {
+ AddWaitEventToSet(wes,
+ WL_SOCKET_READABLE, PQsocket(fsstate->s.conn),
+ NULL, caller_data);
+ return true;
+ }
+
+ return false;
+}
+
+
/*
* Assess whether the aggregation, grouping and having operations can be pushed
* down to the foreign server. As a side effect, save information we obtain in
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index eef410db39..96af75a33e 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -85,6 +85,7 @@ typedef struct PgFdwRelationInfo
UserMapping *user; /* only set in use_remote_estimate mode */
int fetch_size; /* fetch size for this remote table */
+ bool allow_prefetch; /* true to allow overlapped fetching */
/*
* Name of the relation, for use while EXPLAINing ForeignScan. It is used
@@ -130,6 +131,7 @@ extern void reset_transmission_modes(int nestlevel);
/* in connection.c */
extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
+extern void *GetConnectionSpecificStorage(UserMapping *user, size_t initsize);
extern void ReleaseConnection(PGconn *conn);
extern unsigned int GetCursorNumber(PGconn *conn);
extern unsigned int GetPrepStmtNumber(PGconn *conn);
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 78156d10b4..17d461b1a4 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -1799,25 +1799,25 @@ INSERT INTO b(aa) VALUES('bbb');
INSERT INTO b(aa) VALUES('bbbb');
INSERT INTO b(aa) VALUES('bbbbb');
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
UPDATE b SET aa = 'new';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
UPDATE a SET aa = 'newtoo';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
@@ -1859,12 +1859,12 @@ insert into bar2 values(4,44,44);
insert into bar2 values(7,77,77);
explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for update;
-select * from bar where f1 in (select f1 from foo) for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for share;
-select * from bar where f1 in (select f1 from foo) for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
-- Check UPDATE with inherited target and an inherited source table
explain (verbose, costs off)
@@ -1923,8 +1923,8 @@ explain (verbose, costs off)
delete from foo where f1 < 5 returning *;
delete from foo where f1 < 5 returning *;
explain (verbose, costs off)
-update bar set f2 = f2 + 100 returning *;
-update bar set f2 = f2 + 100 returning *;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
-- Test that UPDATE/DELETE with inherited target works with row-level triggers
CREATE TRIGGER trig_row_before
--
2.18.4
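For anyone who wants to see the new plan shape outside the regression tests, here is a minimal reproduction sketch; the server name "loopback" and the remote table names rt_p1/rt_p2 are placeholders, assuming a postgres_fdw setup like the one in the tests:

    CREATE TABLE pt (a int) PARTITION BY RANGE (a);
    CREATE FOREIGN TABLE pt_p1 PARTITION OF pt FOR VALUES FROM (0) TO (100)
      SERVER loopback OPTIONS (table_name 'rt_p1');
    CREATE FOREIGN TABLE pt_p2 PARTITION OF pt FOR VALUES FROM (100) TO (200)
      SERVER loopback OPTIONS (table_name 'rt_p2');
    -- With the patch applied, the plan should show an Append with
    -- "Async subplans: 2" and two "Async Foreign Scan" children,
    -- matching the expected output changes above.
    EXPLAIN (COSTS OFF) SELECT * FROM pt;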