Re: Asynchronous Append on postgres_fdw nodes. - Mailing list pgsql-hackers
From:            Kyotaro Horiguchi
Subject:         Re: Asynchronous Append on postgres_fdw nodes.
Msg-id:          20200604.150015.2305436578805492574.horikyota.ntt@gmail.com
In response to:  Re: Asynchronous Append on postgres_fdw nodes. (Andrey Lepikhov <a.lepikhov@postgrespro.ru>)
Responses:       Re: Asynchronous Append on postgres_fdw nodes.
List:            pgsql-hackers
Hello, Andrey.

At Wed, 3 Jun 2020 15:00:06 +0500, Andrey Lepikhov <a.lepikhov@postgrespro.ru> wrote in
> This patch no longer applies cleanly.
> In addition, code comments contain spelling errors.

Sure. Thanks for noticing them, and sorry for the many typos. An
additional item in WaitEventIPC conflicted with this.

I found the following typos.

connection.c:
  s/Rerturns/Returns/
postgres-fdw.c:
  s/Retrive/Retrieve/
  s/ForeginScanState/ForeignScanState/
  s/manipuration/manipulation/
  s/asyncstate/async state/
  s/alrady/already/
nodeAppend.c:
  s/Rery/Retry/
createplan.c:
  s/chidlren/children/
resowner.c:
  s/identier/identifier/ X 2
execnodes.h:
  s/sutff/stuff/
plannodes.h:
  s/asyncronous/asynchronous/

Removed a useless variable PgFdwScanState.result_ready.

Removed duplicate code from remove_async_node() by using
move_to_next_waiter().

Done some minor cleanups.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center

From db231fa99da5954b52e195f6af800c0f9b991ed4 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Mon, 22 May 2017 12:42:58 +0900
Subject: [PATCH v3 1/3] Allow wait event set to be registered to resource owner

A WaitEventSet needs to be released via a resource owner in certain
cases. This change adds a resowner field to WaitEventSet and allows the
creator of a WaitEventSet to specify a resource owner.
---
 src/backend/libpq/pqcomm.c                    |  2 +-
 src/backend/storage/ipc/latch.c               | 18 ++++-
 src/backend/storage/lmgr/condition_variable.c |  2 +-
 src/backend/utils/resowner/resowner.c         | 67 +++++++++++++++++++
 src/include/storage/latch.h                   |  4 +-
 src/include/utils/resowner_private.h          |  8 +++
 6 files changed, 96 insertions(+), 5 deletions(-)

diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 7717bb2719..16aefb03ee 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -218,7 +218,7 @@ pq_init(void)
 				(errmsg("could not set socket to nonblocking mode: %m")));
 #endif
 
-	FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
+	FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, NULL, 3);
 	AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
 					  NULL, NULL);
 	AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 05df5017c4..a8b52cd381 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -56,6 +56,7 @@
 #include "storage/latch.h"
 #include "storage/pmsignal.h"
 #include "storage/shmem.h"
+#include "utils/resowner_private.h"
 
 /*
  * Select the fd readiness primitive to use. Normally the "most modern"
@@ -84,6 +85,8 @@ struct WaitEventSet
 	int			nevents;		/* number of registered events */
 	int			nevents_space;	/* maximum number of events in this set */
 
+	ResourceOwner resowner;		/* Resource owner */
+
 	/*
 	 * Array, of nevents_space length, storing the definition of events this
 	 * set is waiting for.
@@ -393,7 +396,7 @@ WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock,
 	int			ret = 0;
 	int			rc;
 	WaitEvent	event;
-	WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, 3);
+	WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, NULL, 3);
 
 	if (wakeEvents & WL_TIMEOUT)
 		Assert(timeout >= 0);
@@ -560,12 +563,15 @@ ResetLatch(Latch *latch)
 * WaitEventSetWait().
*/ WaitEventSet * -CreateWaitEventSet(MemoryContext context, int nevents) +CreateWaitEventSet(MemoryContext context, ResourceOwner res, int nevents) { WaitEventSet *set; char *data; Size sz = 0; + if (res) + ResourceOwnerEnlargeWESs(res); + /* * Use MAXALIGN size/alignment to guarantee that later uses of memory are * aligned correctly. E.g. epoll_event might need 8 byte alignment on some @@ -680,6 +686,11 @@ CreateWaitEventSet(MemoryContext context, int nevents) StaticAssertStmt(WSA_INVALID_EVENT == NULL, ""); #endif + /* Register this wait event set if requested */ + set->resowner = res; + if (res) + ResourceOwnerRememberWES(set->resowner, set); + return set; } @@ -725,6 +736,9 @@ FreeWaitEventSet(WaitEventSet *set) } #endif + if (set->resowner != NULL) + ResourceOwnerForgetWES(set->resowner, set); + pfree(set); } diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c index 37b6a4eecd..fcc92138fe 100644 --- a/src/backend/storage/lmgr/condition_variable.c +++ b/src/backend/storage/lmgr/condition_variable.c @@ -70,7 +70,7 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv) { WaitEventSet *new_event_set; - new_event_set = CreateWaitEventSet(TopMemoryContext, 2); + new_event_set = CreateWaitEventSet(TopMemoryContext, NULL, 2); AddWaitEventToSet(new_event_set, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); AddWaitEventToSet(new_event_set, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c index 8bc2c4e9ea..237ca9fa30 100644 --- a/src/backend/utils/resowner/resowner.c +++ b/src/backend/utils/resowner/resowner.c @@ -128,6 +128,7 @@ typedef struct ResourceOwnerData ResourceArray filearr; /* open temporary files */ ResourceArray dsmarr; /* dynamic shmem segments */ ResourceArray jitarr; /* JIT contexts */ + ResourceArray wesarr; /* wait event sets */ /* We can remember up to MAX_RESOWNER_LOCKS references to local locks. 
*/ int nlocks; /* number of owned locks */ @@ -175,6 +176,7 @@ static void PrintTupleDescLeakWarning(TupleDesc tupdesc); static void PrintSnapshotLeakWarning(Snapshot snapshot); static void PrintFileLeakWarning(File file); static void PrintDSMLeakWarning(dsm_segment *seg); +static void PrintWESLeakWarning(WaitEventSet *events); /***************************************************************************** @@ -444,6 +446,7 @@ ResourceOwnerCreate(ResourceOwner parent, const char *name) ResourceArrayInit(&(owner->filearr), FileGetDatum(-1)); ResourceArrayInit(&(owner->dsmarr), PointerGetDatum(NULL)); ResourceArrayInit(&(owner->jitarr), PointerGetDatum(NULL)); + ResourceArrayInit(&(owner->wesarr), PointerGetDatum(NULL)); return owner; } @@ -553,6 +556,16 @@ ResourceOwnerReleaseInternal(ResourceOwner owner, jit_release_context(context); } + + /* Ditto for wait event sets */ + while (ResourceArrayGetAny(&(owner->wesarr), &foundres)) + { + WaitEventSet *event = (WaitEventSet *) DatumGetPointer(foundres); + + if (isCommit) + PrintWESLeakWarning(event); + FreeWaitEventSet(event); + } } else if (phase == RESOURCE_RELEASE_LOCKS) { @@ -725,6 +738,7 @@ ResourceOwnerDelete(ResourceOwner owner) Assert(owner->filearr.nitems == 0); Assert(owner->dsmarr.nitems == 0); Assert(owner->jitarr.nitems == 0); + Assert(owner->wesarr.nitems == 0); Assert(owner->nlocks == 0 || owner->nlocks == MAX_RESOWNER_LOCKS + 1); /* @@ -752,6 +766,7 @@ ResourceOwnerDelete(ResourceOwner owner) ResourceArrayFree(&(owner->filearr)); ResourceArrayFree(&(owner->dsmarr)); ResourceArrayFree(&(owner->jitarr)); + ResourceArrayFree(&(owner->wesarr)); pfree(owner); } @@ -1370,3 +1385,55 @@ ResourceOwnerForgetJIT(ResourceOwner owner, Datum handle) elog(ERROR, "JIT context %p is not owned by resource owner %s", DatumGetPointer(handle), owner->name); } + +/* + * wait event set reference array. + * + * This is separate from actually inserting an entry because if we run out + * of memory, it's critical to do so *before* acquiring the resource. + */ +void +ResourceOwnerEnlargeWESs(ResourceOwner owner) +{ + ResourceArrayEnlarge(&(owner->wesarr)); +} + +/* + * Remember that a wait event set is owned by a ResourceOwner + * + * Caller must have previously done ResourceOwnerEnlargeWESs() + */ +void +ResourceOwnerRememberWES(ResourceOwner owner, WaitEventSet *events) +{ + ResourceArrayAdd(&(owner->wesarr), PointerGetDatum(events)); +} + +/* + * Forget that a wait event set is owned by a ResourceOwner + */ +void +ResourceOwnerForgetWES(ResourceOwner owner, WaitEventSet *events) +{ + /* + * XXXX: There's no property to show as an identier of a wait event set, + * use its pointer instead. + */ + if (!ResourceArrayRemove(&(owner->wesarr), PointerGetDatum(events))) + elog(ERROR, "wait event set %p is not owned by resource owner %s", + events, owner->name); +} + +/* + * Debugging subroutine + */ +static void +PrintWESLeakWarning(WaitEventSet *events) +{ + /* + * XXXX: There's no property to show as an identier of a wait event set, + * use its pointer instead. 
+ */
+	elog(WARNING, "wait event set leak: %p still referenced",
+		 events);
+}
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index 46ae56cae3..b1b8375768 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -101,6 +101,7 @@
 #define LATCH_H
 
 #include <signal.h>
+#include "utils/resowner.h"
 
 /*
  * Latch structure should be treated as opaque and only accessed through
@@ -163,7 +164,8 @@ extern void DisownLatch(Latch *latch);
 extern void SetLatch(Latch *latch);
 extern void ResetLatch(Latch *latch);
-extern WaitEventSet *CreateWaitEventSet(MemoryContext context, int nevents);
+extern WaitEventSet *CreateWaitEventSet(MemoryContext context,
+										ResourceOwner res, int nevents);
 extern void FreeWaitEventSet(WaitEventSet *set);
 extern int	AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd,
 							  Latch *latch, void *user_data);
diff --git a/src/include/utils/resowner_private.h b/src/include/utils/resowner_private.h
index a781a7a2aa..7d19dadd57 100644
--- a/src/include/utils/resowner_private.h
+++ b/src/include/utils/resowner_private.h
@@ -18,6 +18,7 @@
 
 #include "storage/dsm.h"
 #include "storage/fd.h"
+#include "storage/latch.h"
 #include "storage/lock.h"
 #include "utils/catcache.h"
 #include "utils/plancache.h"
@@ -95,4 +96,11 @@ extern void ResourceOwnerRememberJIT(ResourceOwner owner,
 extern void ResourceOwnerForgetJIT(ResourceOwner owner,
 								   Datum handle);
 
+/* support for wait event set management */
+extern void ResourceOwnerEnlargeWESs(ResourceOwner owner);
+extern void ResourceOwnerRememberWES(ResourceOwner owner,
+									 WaitEventSet *);
+extern void ResourceOwnerForgetWES(ResourceOwner owner,
+								   WaitEventSet *);
+
 #endif							/* RESOWNER_PRIVATE_H */
-- 
2.18.2

From ced3307f27f01e657499ae6ef4436efaa5e350e5 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Tue, 15 May 2018 20:21:32 +0900
Subject: [PATCH v3 2/3] infrastructure for asynchronous execution

This patch adds infrastructure for asynchronous execution. As a PoC,
it makes only Append capable of handling asynchronously executable
subnodes.
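To picture what this gives Append: each async-capable child is asked
for a tuple without blocking, children that cannot answer yet are
parked in a wait set, and synchronous children are scanned while
nothing asynchronous is ready. The standalone sketch below models that
scheduling with poll() and pipes standing in for the patch's
WaitEventSet and foreign scans; every name in it is invented for the
illustration, and it is not executor code.

/*
 * Sketch of the Append scheduling idea: drain whichever "async
 * children" (pipes) have results ready, spend the time in between on
 * the "sync children", and block in poll() only once the synchronous
 * side is exhausted.
 */
#include <poll.h>
#include <stdio.h>
#include <unistd.h>

#define NASYNC 2

int
main(void)
{
	int			fds[NASYNC][2];
	struct pollfd pfd[NASYNC];
	int			npending = NASYNC;
	int			syncrows = 3;	/* rows left in synchronous children */

	for (int i = 0; i < NASYNC; i++)
	{
		if (pipe(fds[i]) != 0)
			return 1;
		dprintf(fds[i][1], "row from async child %d\n", i);
		close(fds[i][1]);		/* one row, then EOF */
		pfd[i].fd = fds[i][0];
		pfd[i].events = POLLIN;
	}

	while (npending > 0 || syncrows > 0)
	{
		/* Block waiting for async results only when no sync work remains. */
		int			timeout = (syncrows > 0) ? 0 : -1;

		if (npending > 0 && poll(pfd, NASYNC, timeout) > 0)
		{
			for (int i = 0; i < NASYNC; i++)
			{
				char		buf[64];
				ssize_t		n;

				if (pfd[i].fd < 0 || pfd[i].revents == 0)
					continue;
				n = read(pfd[i].fd, buf, sizeof(buf) - 1);
				if (n > 0)
				{
					buf[n] = '\0';
					fputs(buf, stdout);	/* emit an async tuple */
				}
				else
				{
					close(pfd[i].fd);
					pfd[i].fd = -1;		/* this child is exhausted */
					npending--;
				}
			}
		}
		else if (syncrows > 0)
			printf("row from sync child (%d left)\n", --syncrows);
	}
	return 0;
}

In the patch itself this role is played by ExecAppendAsync() driving
ExecAsyncEventWait(); a child that returned a tuple is put back into
as_needrequest so that it is asked again on the next cycle.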
--- src/backend/commands/explain.c | 17 ++ src/backend/executor/Makefile | 1 + src/backend/executor/execAsync.c | 152 +++++++++++ src/backend/executor/nodeAppend.c | 342 ++++++++++++++++++++---- src/backend/executor/nodeForeignscan.c | 21 ++ src/backend/nodes/bitmapset.c | 72 +++++ src/backend/nodes/copyfuncs.c | 3 + src/backend/nodes/outfuncs.c | 3 + src/backend/nodes/readfuncs.c | 3 + src/backend/optimizer/plan/createplan.c | 66 ++++- src/backend/postmaster/pgstat.c | 3 + src/backend/postmaster/syslogger.c | 2 +- src/backend/utils/adt/ruleutils.c | 8 +- src/backend/utils/resowner/resowner.c | 4 +- src/include/executor/execAsync.h | 22 ++ src/include/executor/executor.h | 1 + src/include/executor/nodeForeignscan.h | 3 + src/include/foreign/fdwapi.h | 11 + src/include/nodes/bitmapset.h | 1 + src/include/nodes/execnodes.h | 23 +- src/include/nodes/plannodes.h | 9 + src/include/pgstat.h | 3 +- 22 files changed, 705 insertions(+), 65 deletions(-) create mode 100644 src/backend/executor/execAsync.c create mode 100644 src/include/executor/execAsync.h diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index efd7201d61..708e9ed546 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -86,6 +86,7 @@ static void show_incremental_sort_keys(IncrementalSortState *incrsortstate, List *ancestors, ExplainState *es); static void show_merge_append_keys(MergeAppendState *mstate, List *ancestors, ExplainState *es); +static void show_append_info(AppendState *astate, ExplainState *es); static void show_agg_keys(AggState *astate, List *ancestors, ExplainState *es); static void show_grouping_sets(PlanState *planstate, Agg *agg, @@ -1389,6 +1390,8 @@ ExplainNode(PlanState *planstate, List *ancestors, } if (plan->parallel_aware) appendStringInfoString(es->str, "Parallel "); + if (plan->async_capable) + appendStringInfoString(es->str, "Async "); appendStringInfoString(es->str, pname); es->indent++; } @@ -1969,6 +1972,11 @@ ExplainNode(PlanState *planstate, List *ancestors, case T_Hash: show_hash_info(castNode(HashState, planstate), es); break; + + case T_Append: + show_append_info(castNode(AppendState, planstate), es); + break; + default: break; } @@ -2322,6 +2330,15 @@ show_merge_append_keys(MergeAppendState *mstate, List *ancestors, ancestors, es); } +static void +show_append_info(AppendState *astate, ExplainState *es) +{ + Append *plan = (Append *) astate->ps.plan; + + if (plan->nasyncplans > 0) + ExplainPropertyInteger("Async subplans", "", plan->nasyncplans, es); +} + /* * Show the grouping keys for an Agg node. */ diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile index f990c6473a..1004647d4f 100644 --- a/src/backend/executor/Makefile +++ b/src/backend/executor/Makefile @@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global OBJS = \ execAmi.o \ + execAsync.o \ execCurrent.o \ execExpr.o \ execExprInterp.o \ diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c new file mode 100644 index 0000000000..2b7d1877e0 --- /dev/null +++ b/src/backend/executor/execAsync.c @@ -0,0 +1,152 @@ +/*------------------------------------------------------------------------- + * + * execAsync.c + * Support routines for asynchronous execution. 
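+ *
+ * The core entry points are ExecAsyncConfigureWait(), with which an
+ * async-capable node registers the events it wants to wait for, and
+ * ExecAsyncEventWait(), which waits on those events and returns the
+ * set of nodes that fired.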
+ * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/executor/execAsync.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "executor/execAsync.h" +#include "executor/nodeAppend.h" +#include "executor/nodeForeignscan.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "utils/memutils.h" +#include "utils/resowner.h" + +/* + * ExecAsyncConfigureWait: Add wait event to the WaitEventSet if needed. + * + * If reinit is true, the caller didn't reuse existing WaitEventSet. + */ +bool +ExecAsyncConfigureWait(WaitEventSet *wes, PlanState *node, + void *data, bool reinit) +{ + switch (nodeTag(node)) + { + case T_ForeignScanState: + return ExecForeignAsyncConfigureWait((ForeignScanState *)node, + wes, data, reinit); + break; + default: + elog(ERROR, "unrecognized node type: %d", + (int) nodeTag(node)); + } +} + +/* + * struct for memory context callback argument used in ExecAsyncEventWait + */ +typedef struct { + int **p_refind; + int *p_refindsize; +} ExecAsync_mcbarg; + +/* + * callback function to reset static variables pointing to the memory in + * TopTransactionContext in ExecAsyncEventWait. + */ +static void ExecAsyncMemoryContextCallback(void *arg) +{ + /* arg is the address of the variable refind in ExecAsyncEventWait */ + ExecAsync_mcbarg *mcbarg = (ExecAsync_mcbarg *) arg; + *mcbarg->p_refind = NULL; + *mcbarg->p_refindsize = 0; +} + +#define EVENT_BUFFER_SIZE 16 + +/* + * ExecAsyncEventWait: + * + * Wait for async events to fire. Returns the Bitmapset of fired events. + */ +Bitmapset * +ExecAsyncEventWait(PlanState **nodes, Bitmapset *waitnodes, long timeout) +{ + static int *refind = NULL; + static int refindsize = 0; + WaitEventSet *wes; + WaitEvent occurred_event[EVENT_BUFFER_SIZE]; + int noccurred = 0; + Bitmapset *fired_events = NULL; + int i; + int n; + + n = bms_num_members(waitnodes); + wes = CreateWaitEventSet(TopTransactionContext, + TopTransactionResourceOwner, n); + if (refindsize < n) + { + if (refindsize == 0) + refindsize = EVENT_BUFFER_SIZE; /* XXX */ + while (refindsize < n) + refindsize *= 2; + if (refind) + refind = (int *) repalloc(refind, refindsize * sizeof(int)); + else + { + static ExecAsync_mcbarg mcb_arg = + { &refind, &refindsize }; + static MemoryContextCallback mcb = + { ExecAsyncMemoryContextCallback, (void *)&mcb_arg, NULL }; + MemoryContext oldctxt = + MemoryContextSwitchTo(TopTransactionContext); + + /* + * refind points to a memory block in + * TopTransactionContext. Register a callback to reset it. + */ + MemoryContextRegisterResetCallback(TopTransactionContext, &mcb); + refind = (int *) palloc(refindsize * sizeof(int)); + MemoryContextSwitchTo(oldctxt); + } + } + + /* Prepare WaitEventSet for waiting on the waitnodes. */ + n = 0; + for (i = bms_next_member(waitnodes, -1) ; i >= 0 ; + i = bms_next_member(waitnodes, i)) + { + refind[i] = i; + if (ExecAsyncConfigureWait(wes, nodes[i], refind + i, true)) + n++; + } + + /* Return immediately if no node to wait. 
*/ + if (n == 0) + { + FreeWaitEventSet(wes); + return NULL; + } + + noccurred = WaitEventSetWait(wes, timeout, occurred_event, + EVENT_BUFFER_SIZE, + WAIT_EVENT_ASYNC_WAIT); + FreeWaitEventSet(wes); + if (noccurred == 0) + return NULL; + + for (i = 0 ; i < noccurred ; i++) + { + WaitEvent *w = &occurred_event[i]; + + if ((w->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) != 0) + { + int n = *(int*)w->user_data; + + fired_events = bms_add_member(fired_events, n); + } + } + + return fired_events; +} diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c index 88919e62fa..60c36ee048 100644 --- a/src/backend/executor/nodeAppend.c +++ b/src/backend/executor/nodeAppend.c @@ -60,6 +60,7 @@ #include "executor/execdebug.h" #include "executor/execPartition.h" #include "executor/nodeAppend.h" +#include "executor/execAsync.h" #include "miscadmin.h" /* Shared state for parallel-aware Append. */ @@ -80,6 +81,7 @@ struct ParallelAppendState #define INVALID_SUBPLAN_INDEX -1 static TupleTableSlot *ExecAppend(PlanState *pstate); +static TupleTableSlot *ExecAppendAsync(PlanState *pstate); static bool choose_next_subplan_locally(AppendState *node); static bool choose_next_subplan_for_leader(AppendState *node); static bool choose_next_subplan_for_worker(AppendState *node); @@ -103,22 +105,22 @@ ExecInitAppend(Append *node, EState *estate, int eflags) PlanState **appendplanstates; Bitmapset *validsubplans; int nplans; + int nasyncplans; int firstvalid; int i, j; /* check for unsupported flags */ - Assert(!(eflags & EXEC_FLAG_MARK)); + Assert(!(eflags & (EXEC_FLAG_MARK | EXEC_FLAG_ASYNC))); /* * create new AppendState for our append node */ appendstate->ps.plan = (Plan *) node; appendstate->ps.state = estate; - appendstate->ps.ExecProcNode = ExecAppend; /* Let choose_next_subplan_* function handle setting the first subplan */ - appendstate->as_whichplan = INVALID_SUBPLAN_INDEX; + appendstate->as_whichsyncplan = INVALID_SUBPLAN_INDEX; /* If run-time partition pruning is enabled, then set that up now */ if (node->part_prune_info != NULL) @@ -152,11 +154,12 @@ ExecInitAppend(Append *node, EState *estate, int eflags) /* * When no run-time pruning is required and there's at least one - * subplan, we can fill as_valid_subplans immediately, preventing + * subplan, we can fill as_valid_syncsubplans immediately, preventing * later calls to ExecFindMatchingSubPlans. */ if (!prunestate->do_exec_prune && nplans > 0) - appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1); + appendstate->as_valid_syncsubplans = + bms_add_range(NULL, node->nasyncplans, nplans - 1); } else { @@ -167,8 +170,9 @@ ExecInitAppend(Append *node, EState *estate, int eflags) * subplans as valid; they must also all be initialized. 
*/ Assert(nplans > 0); - appendstate->as_valid_subplans = validsubplans = - bms_add_range(NULL, 0, nplans - 1); + validsubplans = bms_add_range(NULL, 0, nplans - 1); + appendstate->as_valid_syncsubplans = + bms_add_range(NULL, node->nasyncplans, nplans - 1); appendstate->as_prune_state = NULL; } @@ -192,10 +196,20 @@ ExecInitAppend(Append *node, EState *estate, int eflags) */ j = 0; firstvalid = nplans; + nasyncplans = 0; + i = -1; while ((i = bms_next_member(validsubplans, i)) >= 0) { Plan *initNode = (Plan *) list_nth(node->appendplans, i); + int sub_eflags = eflags; + + /* Let async-capable subplans run asynchronously */ + if (i < node->nasyncplans) + { + sub_eflags |= EXEC_FLAG_ASYNC; + nasyncplans++; + } /* * Record the lowest appendplans index which is a valid partial plan. @@ -203,13 +217,46 @@ ExecInitAppend(Append *node, EState *estate, int eflags) if (i >= node->first_partial_plan && j < firstvalid) firstvalid = j; - appendplanstates[j++] = ExecInitNode(initNode, estate, eflags); + appendplanstates[j++] = ExecInitNode(initNode, estate, sub_eflags); } appendstate->as_first_partial_plan = firstvalid; appendstate->appendplans = appendplanstates; appendstate->as_nplans = nplans; + /* fill in async stuff */ + appendstate->as_nasyncplans = nasyncplans; + appendstate->as_syncdone = (nasyncplans == nplans); + appendstate->as_exec_prune = false; + + /* choose appropriate version of Exec function */ + if (appendstate->as_nasyncplans == 0) + appendstate->ps.ExecProcNode = ExecAppend; + else + appendstate->ps.ExecProcNode = ExecAppendAsync; + + if (appendstate->as_nasyncplans) + { + appendstate->as_asyncresult = (TupleTableSlot **) + palloc0(appendstate->as_nasyncplans * sizeof(TupleTableSlot *)); + + /* initially, all async requests need a request */ + appendstate->as_needrequest = + bms_add_range(NULL, 0, appendstate->as_nasyncplans - 1); + + /* + * ExecAppendAsync needs as_valid_syncsubplans to handle async + * subnodes. + */ + if (appendstate->as_prune_state != NULL && + appendstate->as_prune_state->do_exec_prune) + { + Assert(appendstate->as_valid_syncsubplans == NULL); + + appendstate->as_exec_prune = true; + } + } + /* * Miscellaneous initialization */ @@ -233,7 +280,7 @@ ExecAppend(PlanState *pstate) { AppendState *node = castNode(AppendState, pstate); - if (node->as_whichplan < 0) + if (node->as_whichsyncplan < 0) { /* Nothing to do if there are no subplans */ if (node->as_nplans == 0) @@ -243,11 +290,13 @@ ExecAppend(PlanState *pstate) * If no subplan has been chosen, we must choose one before * proceeding. 
*/ - if (node->as_whichplan == INVALID_SUBPLAN_INDEX && + if (node->as_whichsyncplan == INVALID_SUBPLAN_INDEX && !node->choose_next_subplan(node)) return ExecClearTuple(node->ps.ps_ResultTupleSlot); } + Assert(node->as_nasyncplans == 0); + for (;;) { PlanState *subnode; @@ -258,8 +307,9 @@ ExecAppend(PlanState *pstate) /* * figure out which subplan we are currently processing */ - Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans); - subnode = node->appendplans[node->as_whichplan]; + Assert(node->as_whichsyncplan >= 0 && + node->as_whichsyncplan < node->as_nplans); + subnode = node->appendplans[node->as_whichsyncplan]; /* * get a tuple from the subplan @@ -282,6 +332,172 @@ ExecAppend(PlanState *pstate) } } +static TupleTableSlot * +ExecAppendAsync(PlanState *pstate) +{ + AppendState *node = castNode(AppendState, pstate); + Bitmapset *needrequest; + int i; + + Assert(node->as_nasyncplans > 0); + +restart: + if (node->as_nasyncresult > 0) + { + --node->as_nasyncresult; + return node->as_asyncresult[node->as_nasyncresult]; + } + + if (node->as_exec_prune) + { + Bitmapset *valid_subplans = + ExecFindMatchingSubPlans(node->as_prune_state); + + /* Distribute valid subplans into sync and async */ + node->as_needrequest = + bms_intersect(node->as_needrequest, valid_subplans); + node->as_valid_syncsubplans = + bms_difference(valid_subplans, node->as_needrequest); + + node->as_exec_prune = false; + } + + needrequest = node->as_needrequest; + node->as_needrequest = NULL; + while ((i = bms_first_member(needrequest)) >= 0) + { + TupleTableSlot *slot; + PlanState *subnode = node->appendplans[i]; + + slot = ExecProcNode(subnode); + if (subnode->asyncstate == AS_AVAILABLE) + { + if (!TupIsNull(slot)) + { + node->as_asyncresult[node->as_nasyncresult++] = slot; + node->as_needrequest = bms_add_member(node->as_needrequest, i); + } + } + else + node->as_pending_async = bms_add_member(node->as_pending_async, i); + } + bms_free(needrequest); + + for (;;) + { + TupleTableSlot *result; + + /* return now if a result is available */ + if (node->as_nasyncresult > 0) + { + --node->as_nasyncresult; + return node->as_asyncresult[node->as_nasyncresult]; + } + + while (!bms_is_empty(node->as_pending_async)) + { + /* Don't wait for async nodes if any sync node exists. */ + long timeout = node->as_syncdone ? -1 : 0; + Bitmapset *fired; + int i; + + fired = ExecAsyncEventWait(node->appendplans, + node->as_pending_async, + timeout); + + if (bms_is_empty(fired) && node->as_syncdone) + { + /* + * We come here when all the subnodes had fired before + * waiting. Retry fetching from the nodes. + */ + node->as_needrequest = node->as_pending_async; + node->as_pending_async = NULL; + goto restart; + } + + while ((i = bms_first_member(fired)) >= 0) + { + TupleTableSlot *slot; + PlanState *subnode = node->appendplans[i]; + slot = ExecProcNode(subnode); + + Assert(subnode->asyncstate == AS_AVAILABLE); + + if (!TupIsNull(slot)) + { + node->as_asyncresult[node->as_nasyncresult++] = slot; + node->as_needrequest = + bms_add_member(node->as_needrequest, i); + } + + node->as_pending_async = + bms_del_member(node->as_pending_async, i); + } + bms_free(fired); + + /* return now if a result is available */ + if (node->as_nasyncresult > 0) + { + --node->as_nasyncresult; + return node->as_asyncresult[node->as_nasyncresult]; + } + + if (!node->as_syncdone) + break; + } + + /* + * If there is no asynchronous activity still pending and the + * synchronous activity is also complete, we're totally done scanning + * this node. 
Otherwise, we're done with the asynchronous stuff but + * must continue scanning the synchronous children. + */ + + if (!node->as_syncdone && + node->as_whichsyncplan == INVALID_SUBPLAN_INDEX) + node->as_syncdone = !node->choose_next_subplan(node); + + if (node->as_syncdone) + { + Assert(bms_is_empty(node->as_pending_async)); + return ExecClearTuple(node->ps.ps_ResultTupleSlot); + } + + /* + * get a tuple from the subplan + */ + result = ExecProcNode(node->appendplans[node->as_whichsyncplan]); + + if (!TupIsNull(result)) + { + /* + * If the subplan gave us something then return it as-is. We do + * NOT make use of the result slot that was set up in + * ExecInitAppend; there's no need for it. + */ + return result; + } + + /* + * Go on to the "next" subplan. If no more subplans, return the empty + * slot set up for us by ExecInitAppend, unless there are async plans + * we have yet to finish. + */ + if (!node->choose_next_subplan(node)) + { + node->as_syncdone = true; + if (bms_is_empty(node->as_pending_async)) + { + Assert(bms_is_empty(node->as_needrequest)); + return ExecClearTuple(node->ps.ps_ResultTupleSlot); + } + } + + /* Else loop back and try to get a tuple from the new subplan */ + } +} + /* ---------------------------------------------------------------- * ExecEndAppend * @@ -324,10 +540,18 @@ ExecReScanAppend(AppendState *node) bms_overlap(node->ps.chgParam, node->as_prune_state->execparamids)) { - bms_free(node->as_valid_subplans); - node->as_valid_subplans = NULL; + bms_free(node->as_valid_syncsubplans); + node->as_valid_syncsubplans = NULL; } + /* Reset async state. */ + for (i = 0; i < node->as_nasyncplans; ++i) + ExecShutdownNode(node->appendplans[i]); + + node->as_nasyncresult = 0; + node->as_needrequest = bms_add_range(NULL, 0, node->as_nasyncplans - 1); + node->as_syncdone = (node->as_nasyncplans == node->as_nplans); + for (i = 0; i < node->as_nplans; i++) { PlanState *subnode = node->appendplans[i]; @@ -348,7 +572,7 @@ ExecReScanAppend(AppendState *node) } /* Let choose_next_subplan_* function handle setting the first subplan */ - node->as_whichplan = INVALID_SUBPLAN_INDEX; + node->as_whichsyncplan = INVALID_SUBPLAN_INDEX; } /* ---------------------------------------------------------------- @@ -436,7 +660,7 @@ ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt) static bool choose_next_subplan_locally(AppendState *node) { - int whichplan = node->as_whichplan; + int whichplan = node->as_whichsyncplan; int nextplan; /* We should never be called when there are no subplans */ @@ -451,10 +675,18 @@ choose_next_subplan_locally(AppendState *node) */ if (whichplan == INVALID_SUBPLAN_INDEX) { - if (node->as_valid_subplans == NULL) - node->as_valid_subplans = + /* Shouldn't have an active async node */ + Assert(bms_is_empty(node->as_needrequest)); + + if (node->as_valid_syncsubplans == NULL) + node->as_valid_syncsubplans = ExecFindMatchingSubPlans(node->as_prune_state); + /* Exclude async plans */ + if (node->as_nasyncplans > 0) + bms_del_range(node->as_valid_syncsubplans, + 0, node->as_nasyncplans - 1); + whichplan = -1; } @@ -462,14 +694,14 @@ choose_next_subplan_locally(AppendState *node) Assert(whichplan >= -1 && whichplan <= node->as_nplans); if (ScanDirectionIsForward(node->ps.state->es_direction)) - nextplan = bms_next_member(node->as_valid_subplans, whichplan); + nextplan = bms_next_member(node->as_valid_syncsubplans, whichplan); else - nextplan = bms_prev_member(node->as_valid_subplans, whichplan); + nextplan = 
bms_prev_member(node->as_valid_syncsubplans, whichplan); if (nextplan < 0) return false; - node->as_whichplan = nextplan; + node->as_whichsyncplan = nextplan; return true; } @@ -490,29 +722,29 @@ choose_next_subplan_for_leader(AppendState *node) /* Backward scan is not supported by parallel-aware plans */ Assert(ScanDirectionIsForward(node->ps.state->es_direction)); - /* We should never be called when there are no subplans */ - Assert(node->as_nplans > 0); + /* We should never be called when there are no sync subplans */ + Assert(node->as_nplans > node->as_nasyncplans); LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE); - if (node->as_whichplan != INVALID_SUBPLAN_INDEX) + if (node->as_whichsyncplan != INVALID_SUBPLAN_INDEX) { /* Mark just-completed subplan as finished. */ - node->as_pstate->pa_finished[node->as_whichplan] = true; + node->as_pstate->pa_finished[node->as_whichsyncplan] = true; } else { /* Start with last subplan. */ - node->as_whichplan = node->as_nplans - 1; + node->as_whichsyncplan = node->as_nplans - 1; /* * If we've yet to determine the valid subplans then do so now. If * run-time pruning is disabled then the valid subplans will always be * set to all subplans. */ - if (node->as_valid_subplans == NULL) + if (node->as_valid_syncsubplans == NULL) { - node->as_valid_subplans = + node->as_valid_syncsubplans = ExecFindMatchingSubPlans(node->as_prune_state); /* @@ -524,26 +756,26 @@ choose_next_subplan_for_leader(AppendState *node) } /* Loop until we find a subplan to execute. */ - while (pstate->pa_finished[node->as_whichplan]) + while (pstate->pa_finished[node->as_whichsyncplan]) { - if (node->as_whichplan == 0) + if (node->as_whichsyncplan == 0) { pstate->pa_next_plan = INVALID_SUBPLAN_INDEX; - node->as_whichplan = INVALID_SUBPLAN_INDEX; + node->as_whichsyncplan = INVALID_SUBPLAN_INDEX; LWLockRelease(&pstate->pa_lock); return false; } /* - * We needn't pay attention to as_valid_subplans here as all invalid + * We needn't pay attention to as_valid_syncsubplans here as all invalid * plans have been marked as finished. */ - node->as_whichplan--; + node->as_whichsyncplan--; } /* If non-partial, immediately mark as finished. */ - if (node->as_whichplan < node->as_first_partial_plan) - node->as_pstate->pa_finished[node->as_whichplan] = true; + if (node->as_whichsyncplan < node->as_first_partial_plan) + node->as_pstate->pa_finished[node->as_whichsyncplan] = true; LWLockRelease(&pstate->pa_lock); @@ -571,23 +803,23 @@ choose_next_subplan_for_worker(AppendState *node) /* Backward scan is not supported by parallel-aware plans */ Assert(ScanDirectionIsForward(node->ps.state->es_direction)); - /* We should never be called when there are no subplans */ - Assert(node->as_nplans > 0); + /* We should never be called when there are no sync subplans */ + Assert(node->as_nplans > node->as_nasyncplans); LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE); /* Mark just-completed subplan as finished. */ - if (node->as_whichplan != INVALID_SUBPLAN_INDEX) - node->as_pstate->pa_finished[node->as_whichplan] = true; + if (node->as_whichsyncplan != INVALID_SUBPLAN_INDEX) + node->as_pstate->pa_finished[node->as_whichsyncplan] = true; /* * If we've yet to determine the valid subplans then do so now. If * run-time pruning is disabled then the valid subplans will always be set * to all subplans. 
*/ - else if (node->as_valid_subplans == NULL) + else if (node->as_valid_syncsubplans == NULL) { - node->as_valid_subplans = + node->as_valid_syncsubplans = ExecFindMatchingSubPlans(node->as_prune_state); mark_invalid_subplans_as_finished(node); } @@ -600,30 +832,30 @@ choose_next_subplan_for_worker(AppendState *node) } /* Save the plan from which we are starting the search. */ - node->as_whichplan = pstate->pa_next_plan; + node->as_whichsyncplan = pstate->pa_next_plan; /* Loop until we find a valid subplan to execute. */ while (pstate->pa_finished[pstate->pa_next_plan]) { int nextplan; - nextplan = bms_next_member(node->as_valid_subplans, + nextplan = bms_next_member(node->as_valid_syncsubplans, pstate->pa_next_plan); if (nextplan >= 0) { /* Advance to the next valid plan. */ pstate->pa_next_plan = nextplan; } - else if (node->as_whichplan > node->as_first_partial_plan) + else if (node->as_whichsyncplan > node->as_first_partial_plan) { /* * Try looping back to the first valid partial plan, if there is * one. If there isn't, arrange to bail out below. */ - nextplan = bms_next_member(node->as_valid_subplans, + nextplan = bms_next_member(node->as_valid_syncsubplans, node->as_first_partial_plan - 1); pstate->pa_next_plan = - nextplan < 0 ? node->as_whichplan : nextplan; + nextplan < 0 ? node->as_whichsyncplan : nextplan; } else { @@ -631,10 +863,10 @@ choose_next_subplan_for_worker(AppendState *node) * At last plan, and either there are no partial plans or we've * tried them all. Arrange to bail out. */ - pstate->pa_next_plan = node->as_whichplan; + pstate->pa_next_plan = node->as_whichsyncplan; } - if (pstate->pa_next_plan == node->as_whichplan) + if (pstate->pa_next_plan == node->as_whichsyncplan) { /* We've tried everything! */ pstate->pa_next_plan = INVALID_SUBPLAN_INDEX; @@ -644,8 +876,8 @@ choose_next_subplan_for_worker(AppendState *node) } /* Pick the plan we found, and advance pa_next_plan one more time. */ - node->as_whichplan = pstate->pa_next_plan; - pstate->pa_next_plan = bms_next_member(node->as_valid_subplans, + node->as_whichsyncplan = pstate->pa_next_plan; + pstate->pa_next_plan = bms_next_member(node->as_valid_syncsubplans, pstate->pa_next_plan); /* @@ -654,7 +886,7 @@ choose_next_subplan_for_worker(AppendState *node) */ if (pstate->pa_next_plan < 0) { - int nextplan = bms_next_member(node->as_valid_subplans, + int nextplan = bms_next_member(node->as_valid_syncsubplans, node->as_first_partial_plan - 1); if (nextplan >= 0) @@ -671,8 +903,8 @@ choose_next_subplan_for_worker(AppendState *node) } /* If non-partial, immediately mark as finished. 
*/ - if (node->as_whichplan < node->as_first_partial_plan) - node->as_pstate->pa_finished[node->as_whichplan] = true; + if (node->as_whichsyncplan < node->as_first_partial_plan) + node->as_pstate->pa_finished[node->as_whichsyncplan] = true; LWLockRelease(&pstate->pa_lock); @@ -699,13 +931,13 @@ mark_invalid_subplans_as_finished(AppendState *node) Assert(node->as_prune_state); /* Nothing to do if all plans are valid */ - if (bms_num_members(node->as_valid_subplans) == node->as_nplans) + if (bms_num_members(node->as_valid_syncsubplans) == node->as_nplans) return; /* Mark all non-valid plans as finished */ for (i = 0; i < node->as_nplans; i++) { - if (!bms_is_member(i, node->as_valid_subplans)) + if (!bms_is_member(i, node->as_valid_syncsubplans)) node->as_pstate->pa_finished[i] = true; } } diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c index 513471ab9b..3bf4aaa63d 100644 --- a/src/backend/executor/nodeForeignscan.c +++ b/src/backend/executor/nodeForeignscan.c @@ -141,6 +141,10 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags) scanstate->ss.ps.plan = (Plan *) node; scanstate->ss.ps.state = estate; scanstate->ss.ps.ExecProcNode = ExecForeignScan; + scanstate->ss.ps.asyncstate = AS_AVAILABLE; + + if ((eflags & EXEC_FLAG_ASYNC) != 0) + scanstate->fs_async = true; /* * Miscellaneous initialization @@ -384,3 +388,20 @@ ExecShutdownForeignScan(ForeignScanState *node) if (fdwroutine->ShutdownForeignScan) fdwroutine->ShutdownForeignScan(node); } + +/* ---------------------------------------------------------------- + * ExecAsyncForeignScanConfigureWait + * + * In async mode, configure for a wait + * ---------------------------------------------------------------- + */ +bool +ExecForeignAsyncConfigureWait(ForeignScanState *node, WaitEventSet *wes, + void *caller_data, bool reinit) +{ + FdwRoutine *fdwroutine = node->fdwroutine; + + Assert(fdwroutine->ForeignAsyncConfigureWait != NULL); + return fdwroutine->ForeignAsyncConfigureWait(node, wes, + caller_data, reinit); +} diff --git a/src/backend/nodes/bitmapset.c b/src/backend/nodes/bitmapset.c index 2719ea45a3..05b625783b 100644 --- a/src/backend/nodes/bitmapset.c +++ b/src/backend/nodes/bitmapset.c @@ -895,6 +895,78 @@ bms_add_range(Bitmapset *a, int lower, int upper) return a; } +/* + * bms_del_range + * Delete members in the range of 'lower' to 'upper' from the set. + * + * Note this could also be done by calling bms_del_member in a loop, however, + * using this function will be faster when the range is large as we work at + * the bitmapword level rather than at bit level. 
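+ *
+ * For example, if 'a' contains the members 0..9, bms_del_range(a, 3, 5)
+ * leaves {0, 1, 2, 6, 7, 8, 9}.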
+ */ +Bitmapset * +bms_del_range(Bitmapset *a, int lower, int upper) +{ + int lwordnum, + lbitnum, + uwordnum, + ushiftbits, + wordnum; + + if (lower < 0 || upper < 0) + elog(ERROR, "negative bitmapset member not allowed"); + if (lower > upper) + elog(ERROR, "lower range must not be above upper range"); + uwordnum = WORDNUM(upper); + + if (a == NULL) + { + a = (Bitmapset *) palloc0(BITMAPSET_SIZE(uwordnum + 1)); + a->nwords = uwordnum + 1; + } + + /* ensure we have enough words to store the upper bit */ + else if (uwordnum >= a->nwords) + { + int oldnwords = a->nwords; + int i; + + a = (Bitmapset *) repalloc(a, BITMAPSET_SIZE(uwordnum + 1)); + a->nwords = uwordnum + 1; + /* zero out the enlarged portion */ + for (i = oldnwords; i < a->nwords; i++) + a->words[i] = 0; + } + + wordnum = lwordnum = WORDNUM(lower); + + lbitnum = BITNUM(lower); + ushiftbits = BITNUM(upper) + 1; + + /* + * Special case when lwordnum is the same as uwordnum we must perform the + * upper and lower masking on the word. + */ + if (lwordnum == uwordnum) + { + a->words[lwordnum] &= ((bitmapword) (((bitmapword) 1 << lbitnum) - 1) + | (~(bitmapword) 0) << ushiftbits); + } + else + { + /* turn off lbitnum and all bits left of it */ + a->words[wordnum++] &= (bitmapword) (((bitmapword) 1 << lbitnum) - 1); + + /* turn off all bits for any intermediate words */ + while (wordnum < uwordnum) + a->words[wordnum++] = (bitmapword) 0; + + /* turn off upper's bit and all bits right of it. */ + a->words[uwordnum] &= (~(bitmapword) 0) << ushiftbits; + } + + return a; +} + /* * bms_int_members - like bms_intersect, but left input is recycled */ diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index d8cf87e6d0..89a49e2fdc 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -121,6 +121,7 @@ CopyPlanFields(const Plan *from, Plan *newnode) COPY_SCALAR_FIELD(plan_width); COPY_SCALAR_FIELD(parallel_aware); COPY_SCALAR_FIELD(parallel_safe); + COPY_SCALAR_FIELD(async_capable); COPY_SCALAR_FIELD(plan_node_id); COPY_NODE_FIELD(targetlist); COPY_NODE_FIELD(qual); @@ -246,6 +247,8 @@ _copyAppend(const Append *from) COPY_NODE_FIELD(appendplans); COPY_SCALAR_FIELD(first_partial_plan); COPY_NODE_FIELD(part_prune_info); + COPY_SCALAR_FIELD(nasyncplans); + COPY_SCALAR_FIELD(referent); return newnode; } diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index e2f177515d..d4bb44b268 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -334,6 +334,7 @@ _outPlanInfo(StringInfo str, const Plan *node) WRITE_INT_FIELD(plan_width); WRITE_BOOL_FIELD(parallel_aware); WRITE_BOOL_FIELD(parallel_safe); + WRITE_BOOL_FIELD(async_capable); WRITE_INT_FIELD(plan_node_id); WRITE_NODE_FIELD(targetlist); WRITE_NODE_FIELD(qual); @@ -436,6 +437,8 @@ _outAppend(StringInfo str, const Append *node) WRITE_NODE_FIELD(appendplans); WRITE_INT_FIELD(first_partial_plan); WRITE_NODE_FIELD(part_prune_info); + WRITE_INT_FIELD(nasyncplans); + WRITE_INT_FIELD(referent); } static void diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 42050ab719..63af7c02d8 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -1572,6 +1572,7 @@ ReadCommonPlan(Plan *local_node) READ_INT_FIELD(plan_width); READ_BOOL_FIELD(parallel_aware); READ_BOOL_FIELD(parallel_safe); + READ_BOOL_FIELD(async_capable); READ_INT_FIELD(plan_node_id); READ_NODE_FIELD(targetlist); READ_NODE_FIELD(qual); @@ -1672,6 +1673,8 @@ _readAppend(void) 
READ_NODE_FIELD(appendplans); READ_INT_FIELD(first_partial_plan); READ_NODE_FIELD(part_prune_info); + READ_INT_FIELD(nasyncplans); + READ_INT_FIELD(referent); READ_DONE(); } diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 744eed187d..ba18dd88a8 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -300,6 +300,7 @@ static ModifyTable *make_modifytable(PlannerInfo *root, List *rowMarks, OnConflictExpr *onconflict, int epqParam); static GatherMerge *create_gather_merge_plan(PlannerInfo *root, GatherMergePath *best_path); +static bool is_async_capable_path(Path *path); /* @@ -1082,6 +1083,11 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) bool tlist_was_changed = false; List *pathkeys = best_path->path.pathkeys; List *subplans = NIL; + List *asyncplans = NIL; + List *syncplans = NIL; + List *asyncpaths = NIL; + List *syncpaths = NIL; + List *newsubpaths = NIL; ListCell *subpaths; RelOptInfo *rel = best_path->path.parent; PartitionPruneInfo *partpruneinfo = NULL; @@ -1090,6 +1096,9 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) Oid *nodeSortOperators = NULL; Oid *nodeCollations = NULL; bool *nodeNullsFirst = NULL; + int nasyncplans = 0; + bool first = true; + bool referent_is_sync = true; /* * The subpaths list could be empty, if every child was proven empty by @@ -1219,9 +1228,36 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) } } - subplans = lappend(subplans, subplan); + /* + * Classify as async-capable or not. If we have decided to run the + * children in parallel, we cannot any one of them run asynchronously. + */ + if (!best_path->path.parallel_safe && is_async_capable_path(subpath)) + { + subplan->async_capable = true; + asyncplans = lappend(asyncplans, subplan); + asyncpaths = lappend(asyncpaths, subpath); + ++nasyncplans; + if (first) + referent_is_sync = false; + } + else + { + syncplans = lappend(syncplans, subplan); + syncpaths = lappend(syncpaths, subpath); + } + + first = false; } + /* + * subplan contains asyncplans in the first half, if any, and sync plans in + * another half, if any. We need that the same for subpaths to make + * partition pruning information in sync with subplans. + */ + subplans = list_concat(asyncplans, syncplans); + newsubpaths = list_concat(asyncpaths, syncpaths); + /* * If any quals exist, they may be useful to perform further partition * pruning during execution. Gather information needed by the executor to @@ -1249,7 +1285,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) if (prunequal != NIL) partpruneinfo = make_partition_pruneinfo(root, rel, - best_path->subpaths, + newsubpaths, best_path->partitioned_rels, prunequal); } @@ -1257,6 +1293,8 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) plan->appendplans = subplans; plan->first_partial_plan = best_path->first_partial_path; plan->part_prune_info = partpruneinfo; + plan->nasyncplans = nasyncplans; + plan->referent = referent_is_sync ? nasyncplans : 0; copy_generic_path_info(&plan->plan, (Path *) best_path); @@ -7016,3 +7054,27 @@ is_projection_capable_plan(Plan *plan) } return true; } + +/* + * is_projection_capable_path + * Check whether a given Path node is async-capable. 
+ */ +static bool +is_async_capable_path(Path *path) +{ + switch (nodeTag(path)) + { + case T_ForeignPath: + { + FdwRoutine *fdwroutine = path->parent->fdwroutine; + + Assert(fdwroutine != NULL); + if (fdwroutine->IsForeignPathAsyncCapable != NULL && + fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path)) + return true; + } + default: + break; + } + return false; +} diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index d7f99d9944..79a2562454 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3882,6 +3882,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_XACT_GROUP_UPDATE: event_name = "XactGroupUpdate"; break; + case WAIT_EVENT_ASYNC_WAIT: + event_name = "AsyncExecWait"; + break; /* no default case, so that compiler will warn */ } diff --git a/src/backend/postmaster/syslogger.c b/src/backend/postmaster/syslogger.c index ffcb54968f..a4de6d90e2 100644 --- a/src/backend/postmaster/syslogger.c +++ b/src/backend/postmaster/syslogger.c @@ -300,7 +300,7 @@ SysLoggerMain(int argc, char *argv[]) * syslog pipe, which implies that all other backends have exited * (including the postmaster). */ - wes = CreateWaitEventSet(CurrentMemoryContext, 2); + wes = CreateWaitEventSet(CurrentMemoryContext, NULL, 2); AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); #ifndef WIN32 AddWaitEventToSet(wes, WL_SOCKET_READABLE, syslogPipe[0], NULL, NULL); diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index 076c3c019f..f7b5587d7f 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -4584,10 +4584,14 @@ set_deparse_plan(deparse_namespace *dpns, Plan *plan) * tlists according to one of the children, and the first one is the most * natural choice. Likewise special-case ModifyTable to pretend that the * first child plan is the OUTER referent; this is to support RETURNING - * lists containing references to non-target relations. + * lists containing references to non-target relations. For Append, use the + * explicitly specified referent. */ if (IsA(plan, Append)) - dpns->outer_plan = linitial(((Append *) plan)->appendplans); + { + Append *app = (Append *) plan; + dpns->outer_plan = list_nth(app->appendplans, app->referent); + } else if (IsA(plan, MergeAppend)) dpns->outer_plan = linitial(((MergeAppend *) plan)->mergeplans); else if (IsA(plan, ModifyTable)) diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c index 237ca9fa30..27742a1641 100644 --- a/src/backend/utils/resowner/resowner.c +++ b/src/backend/utils/resowner/resowner.c @@ -1416,7 +1416,7 @@ void ResourceOwnerForgetWES(ResourceOwner owner, WaitEventSet *events) { /* - * XXXX: There's no property to show as an identier of a wait event set, + * XXXX: There's no property to show as an identifier of a wait event set, * use its pointer instead. */ if (!ResourceArrayRemove(&(owner->wesarr), PointerGetDatum(events))) @@ -1431,7 +1431,7 @@ static void PrintWESLeakWarning(WaitEventSet *events) { /* - * XXXX: There's no property to show as an identier of a wait event set, + * XXXX: There's no property to show as an identifier of a wait event set, * use its pointer instead. 
*/ elog(WARNING, "wait event set leak: %p still referenced", diff --git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h new file mode 100644 index 0000000000..3b6bf4a516 --- /dev/null +++ b/src/include/executor/execAsync.h @@ -0,0 +1,22 @@ +/*-------------------------------------------------------------------- + * execAsync.c + * Support functions for asynchronous query execution + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/executor/execAsync.c + *-------------------------------------------------------------------- + */ +#ifndef EXECASYNC_H +#define EXECASYNC_H + +#include "nodes/execnodes.h" +#include "storage/latch.h" + +extern bool ExecAsyncConfigureWait(WaitEventSet *wes, PlanState *node, + void *data, bool reinit); +extern Bitmapset *ExecAsyncEventWait(PlanState **nodes, Bitmapset *waitnodes, + long timeout); +#endif /* EXECASYNC_H */ diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index c7deeac662..aca9e2bddd 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -59,6 +59,7 @@ #define EXEC_FLAG_MARK 0x0008 /* need mark/restore */ #define EXEC_FLAG_SKIP_TRIGGERS 0x0010 /* skip AfterTrigger calls */ #define EXEC_FLAG_WITH_NO_DATA 0x0020 /* rel scannability doesn't matter */ +#define EXEC_FLAG_ASYNC 0x0040 /* request async execution */ /* Hook for plugins to get control in ExecutorStart() */ diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h index 326d713ebf..71a233b41f 100644 --- a/src/include/executor/nodeForeignscan.h +++ b/src/include/executor/nodeForeignscan.h @@ -30,5 +30,8 @@ extern void ExecForeignScanReInitializeDSM(ForeignScanState *node, extern void ExecForeignScanInitializeWorker(ForeignScanState *node, ParallelWorkerContext *pwcxt); extern void ExecShutdownForeignScan(ForeignScanState *node); +extern bool ExecForeignAsyncConfigureWait(ForeignScanState *node, + WaitEventSet *wes, + void *caller_data, bool reinit); #endif /* NODEFOREIGNSCAN_H */ diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 95556dfb15..853ba2b5ad 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -169,6 +169,11 @@ typedef bool (*IsForeignScanParallelSafe_function) (PlannerInfo *root, typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root, List *fdw_private, RelOptInfo *child_rel); +typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path); +typedef bool (*ForeignAsyncConfigureWait_function) (ForeignScanState *node, + WaitEventSet *wes, + void *caller_data, + bool reinit); /* * FdwRoutine is the struct returned by a foreign-data wrapper's handler @@ -190,6 +195,7 @@ typedef struct FdwRoutine GetForeignPlan_function GetForeignPlan; BeginForeignScan_function BeginForeignScan; IterateForeignScan_function IterateForeignScan; + IterateForeignScan_function IterateForeignScanAsync; ReScanForeignScan_function ReScanForeignScan; EndForeignScan_function EndForeignScan; @@ -242,6 +248,11 @@ typedef struct FdwRoutine InitializeDSMForeignScan_function InitializeDSMForeignScan; ReInitializeDSMForeignScan_function ReInitializeDSMForeignScan; InitializeWorkerForeignScan_function InitializeWorkerForeignScan; + + /* Support functions for asynchronous execution */ + IsForeignPathAsyncCapable_function IsForeignPathAsyncCapable; + ForeignAsyncConfigureWait_function 
ForeignAsyncConfigureWait; + ShutdownForeignScan_function ShutdownForeignScan; /* Support functions for path reparameterization. */ diff --git a/src/include/nodes/bitmapset.h b/src/include/nodes/bitmapset.h index d113c271ee..177e6218cb 100644 --- a/src/include/nodes/bitmapset.h +++ b/src/include/nodes/bitmapset.h @@ -107,6 +107,7 @@ extern Bitmapset *bms_add_members(Bitmapset *a, const Bitmapset *b); extern Bitmapset *bms_add_range(Bitmapset *a, int lower, int upper); extern Bitmapset *bms_int_members(Bitmapset *a, const Bitmapset *b); extern Bitmapset *bms_del_members(Bitmapset *a, const Bitmapset *b); +extern Bitmapset *bms_del_range(Bitmapset *a, int lower, int upper); extern Bitmapset *bms_join(Bitmapset *a, Bitmapset *b); /* support for iterating through the integer elements of a set: */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 98e0072b8a..cd50494c74 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -938,6 +938,12 @@ typedef TupleTableSlot *(*ExecProcNodeMtd) (struct PlanState *pstate); * abstract superclass for all PlanState-type nodes. * ---------------- */ +typedef enum AsyncState +{ + AS_AVAILABLE, + AS_WAITING +} AsyncState; + typedef struct PlanState { NodeTag type; @@ -1026,6 +1032,11 @@ typedef struct PlanState bool outeropsset; bool inneropsset; bool resultopsset; + + /* Async subnode execution stuff */ + AsyncState asyncstate; + + int32 padding; /* to keep alignment of derived types */ } PlanState; /* ---------------- @@ -1221,14 +1232,21 @@ struct AppendState PlanState ps; /* its first field is NodeTag */ PlanState **appendplans; /* array of PlanStates for my inputs */ int as_nplans; - int as_whichplan; + int as_whichsyncplan; /* which sync plan is being executed */ int as_first_partial_plan; /* Index of 'appendplans' containing * the first partial plan */ + int as_nasyncplans; /* # of async-capable children */ ParallelAppendState *as_pstate; /* parallel coordination info */ Size pstate_len; /* size of parallel coordination info */ struct PartitionPruneState *as_prune_state; - Bitmapset *as_valid_subplans; + Bitmapset *as_valid_syncsubplans; bool (*choose_next_subplan) (AppendState *); + bool as_syncdone; /* all synchronous plans done? */ + Bitmapset *as_needrequest; /* async plans needing a new request */ + Bitmapset *as_pending_async; /* pending async plans */ + TupleTableSlot **as_asyncresult; /* results of each async plan */ + int as_nasyncresult; /* # of valid entries in as_asyncresult */ + bool as_exec_prune; /* runtime pruning needed for async exec? */ }; /* ---------------- @@ -1796,6 +1814,7 @@ typedef struct ForeignScanState Size pscan_len; /* size of parallel coordination information */ /* use struct pointer to avoid including fdwapi.h here */ struct FdwRoutine *fdwroutine; + bool fs_async; void *fdw_state; /* foreign-data wrapper can keep state here */ } ForeignScanState; diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 83e01074ed..abad89b327 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -135,6 +135,11 @@ typedef struct Plan bool parallel_aware; /* engage parallel-aware logic? */ bool parallel_safe; /* OK to use as part of parallel plan? */ + /* + * information needed for asynchronous execution + */ + bool async_capable; /* engage asynchronous execution logic? */ + /* * Common structural data for all Plan types. 
*/ @@ -262,6 +267,10 @@ typedef struct Append /* Info for run-time subplan pruning; NULL if we're not doing that */ struct PartitionPruneInfo *part_prune_info; + + /* Async child node execution stuff */ + int nasyncplans; /* # async subplans, always at start of list */ + int referent; /* index of inheritance tree referent */ } Append; /* ---------------- diff --git a/src/include/pgstat.h b/src/include/pgstat.h index c55dc1481c..2259910637 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -887,7 +887,8 @@ typedef enum WAIT_EVENT_REPLICATION_SLOT_DROP, WAIT_EVENT_SAFE_SNAPSHOT, WAIT_EVENT_SYNC_REP, - WAIT_EVENT_XACT_GROUP_UPDATE + WAIT_EVENT_XACT_GROUP_UPDATE, + WAIT_EVENT_ASYNC_WAIT } WaitEventIPC; /* ---------- -- 2.18.2 From 4c207a7901f0a9d05aacb5ce46a7f1daa83ce474 Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp> Date: Thu, 19 Oct 2017 17:24:07 +0900 Subject: [PATCH v3 3/3] async postgres_fdw --- contrib/postgres_fdw/connection.c | 28 + .../postgres_fdw/expected/postgres_fdw.out | 222 ++++--- contrib/postgres_fdw/postgres_fdw.c | 603 +++++++++++++++--- contrib/postgres_fdw/postgres_fdw.h | 2 + contrib/postgres_fdw/sql/postgres_fdw.sql | 20 +- 5 files changed, 694 insertions(+), 181 deletions(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 52d1fe3563..d9edc5e4de 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -58,6 +58,7 @@ typedef struct ConnCacheEntry bool invalidated; /* true if reconnect is pending */ uint32 server_hashvalue; /* hash value of foreign server OID */ uint32 mapping_hashvalue; /* hash value of user mapping OID */ + void *storage; /* connection specific storage */ } ConnCacheEntry; /* @@ -202,6 +203,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt) elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)", entry->conn, server->servername, user->umid, user->userid); + entry->storage = NULL; } /* @@ -215,6 +217,32 @@ GetConnection(UserMapping *user, bool will_prep_stmt) return entry->conn; } +/* + * Returns the connection specific storage for this user. Allocate with + * initsize if not exists. + */ +void * +GetConnectionSpecificStorage(UserMapping *user, size_t initsize) +{ + bool found; + ConnCacheEntry *entry; + ConnCacheKey key; + + /* Find storage using the same key with GetConnection */ + key = user->umid; + entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found); + Assert(found); + + /* Create one if not yet. */ + if (entry->storage == NULL) + { + entry->storage = MemoryContextAlloc(CacheMemoryContext, initsize); + memset(entry->storage, 0, initsize); + } + + return entry->storage; +} + /* * Connect to remote server using specified server and user mapping properties. 
*/ diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 82fc1290ef..29aa09db8e 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -6973,7 +6973,7 @@ INSERT INTO a(aa) VALUES('aaaaa'); INSERT INTO b(aa) VALUES('bbb'); INSERT INTO b(aa) VALUES('bbbb'); INSERT INTO b(aa) VALUES('bbbbb'); -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+------- a | aaa @@ -7001,7 +7001,7 @@ SELECT tableoid::regclass, * FROM ONLY a; (3 rows) UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+-------- a | aaa @@ -7029,7 +7029,7 @@ SELECT tableoid::regclass, * FROM ONLY a; (3 rows) UPDATE b SET aa = 'new'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+-------- a | aaa @@ -7057,7 +7057,7 @@ SELECT tableoid::regclass, * FROM ONLY a; (3 rows) UPDATE a SET aa = 'newtoo'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+-------- a | newtoo @@ -7127,35 +7127,41 @@ insert into bar2 values(3,33,33); insert into bar2 values(4,44,44); insert into bar2 values(7,77,77); explain (verbose, costs off) -select * from bar where f1 in (select f1 from foo) for update; - QUERY PLAN ----------------------------------------------------------------------------------------------- +select * from bar where f1 in (select f1 from foo) order by 1 for update; + QUERY PLAN +----------------------------------------------------------------------------------------------------------------- LockRows Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid - -> Hash Join + -> Merge Join Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid Inner Unique: true - Hash Cond: (bar.f1 = foo.f1) - -> Append - -> Seq Scan on public.bar bar_1 + Merge Cond: (bar.f1 = foo.f1) + -> Merge Append + Sort Key: bar.f1 + -> Sort Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid + Sort Key: bar_1.f1 + -> Seq Scan on public.bar bar_1 + Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid -> Foreign Scan on public.bar2 bar_2 Output: bar_2.f1, bar_2.f2, bar_2.ctid, bar_2.*, bar_2.tableoid - Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE - -> Hash + Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR UPDATE + -> Sort Output: foo.ctid, foo.f1, foo.*, foo.tableoid + Sort Key: foo.f1 -> HashAggregate Output: foo.ctid, foo.f1, foo.*, foo.tableoid Group Key: foo.f1 -> Append - -> Seq Scan on public.foo foo_1 - Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid - -> Foreign Scan on public.foo2 foo_2 + Async subplans: 1 + -> Async Foreign Scan on public.foo2 foo_2 Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1 -(23 rows) + -> Seq Scan on public.foo foo_1 + Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid +(29 rows) -select * from bar where f1 in (select f1 from foo) for update; +select * from bar where f1 in (select f1 from foo) order by 1 for update; f1 | f2 ----+---- 1 | 11 @@ -7165,35 +7171,41 @@ select * from bar where f1 in (select f1 from foo) for update; (4 rows) explain (verbose, costs off) -select * from bar where f1 in (select f1 from 
foo) for share; - QUERY PLAN ----------------------------------------------------------------------------------------------- +select * from bar where f1 in (select f1 from foo) order by 1 for share; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------- LockRows Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid - -> Hash Join + -> Merge Join Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid Inner Unique: true - Hash Cond: (bar.f1 = foo.f1) - -> Append - -> Seq Scan on public.bar bar_1 + Merge Cond: (bar.f1 = foo.f1) + -> Merge Append + Sort Key: bar.f1 + -> Sort Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid + Sort Key: bar_1.f1 + -> Seq Scan on public.bar bar_1 + Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid -> Foreign Scan on public.bar2 bar_2 Output: bar_2.f1, bar_2.f2, bar_2.ctid, bar_2.*, bar_2.tableoid - Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR SHARE - -> Hash + Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR SHARE + -> Sort Output: foo.ctid, foo.f1, foo.*, foo.tableoid + Sort Key: foo.f1 -> HashAggregate Output: foo.ctid, foo.f1, foo.*, foo.tableoid Group Key: foo.f1 -> Append - -> Seq Scan on public.foo foo_1 - Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid - -> Foreign Scan on public.foo2 foo_2 + Async subplans: 1 + -> Async Foreign Scan on public.foo2 foo_2 Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1 -(23 rows) + -> Seq Scan on public.foo foo_1 + Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid +(29 rows) -select * from bar where f1 in (select f1 from foo) for share; +select * from bar where f1 in (select f1 from foo) order by 1 for share; f1 | f2 ----+---- 1 | 11 @@ -7223,11 +7235,12 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo); Output: foo.ctid, foo.f1, foo.*, foo.tableoid Group Key: foo.f1 -> Append - -> Seq Scan on public.foo foo_1 - Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid - -> Foreign Scan on public.foo2 foo_2 + Async subplans: 1 + -> Async Foreign Scan on public.foo2 foo_2 Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1 + -> Seq Scan on public.foo foo_1 + Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid -> Hash Join Output: bar_1.f1, (bar_1.f2 + 100), bar_1.f3, bar_1.ctid, foo.ctid, foo.*, foo.tableoid Inner Unique: true @@ -7241,12 +7254,13 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo); Output: foo.ctid, foo.f1, foo.*, foo.tableoid Group Key: foo.f1 -> Append - -> Seq Scan on public.foo foo_1 - Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid - -> Foreign Scan on public.foo2 foo_2 + Async subplans: 1 + -> Async Foreign Scan on public.foo2 foo_2 Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1 -(39 rows) + -> Seq Scan on public.foo foo_1 + Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid +(41 rows) update bar set f2 = f2 + 100 where f1 in (select f1 from foo); select tableoid::regclass, * from bar order by 1,2; @@ -7276,16 +7290,17 @@ where bar.f1 = ss.f1; Output: bar.f1, (bar.f2 + 100), bar.ctid, (ROW(foo.f1)) Hash Cond: (foo.f1 = bar.f1) -> Append + Async subplans: 2 + -> Async Foreign Scan on public.foo2 foo_1 + Output: ROW(foo_1.f1), foo_1.f1 + Remote SQL: SELECT f1 FROM public.loct1 + 
-> Async Foreign Scan on public.foo2 foo_3 + Output: ROW((foo_3.f1 + 3)), (foo_3.f1 + 3) + Remote SQL: SELECT f1 FROM public.loct1 -> Seq Scan on public.foo Output: ROW(foo.f1), foo.f1 - -> Foreign Scan on public.foo2 foo_1 - Output: ROW(foo_1.f1), foo_1.f1 - Remote SQL: SELECT f1 FROM public.loct1 -> Seq Scan on public.foo foo_2 Output: ROW((foo_2.f1 + 3)), (foo_2.f1 + 3) - -> Foreign Scan on public.foo2 foo_3 - Output: ROW((foo_3.f1 + 3)), (foo_3.f1 + 3) - Remote SQL: SELECT f1 FROM public.loct1 -> Hash Output: bar.f1, bar.f2, bar.ctid -> Seq Scan on public.bar @@ -7303,17 +7318,18 @@ where bar.f1 = ss.f1; Output: (ROW(foo.f1)), foo.f1 Sort Key: foo.f1 -> Append + Async subplans: 2 + -> Async Foreign Scan on public.foo2 foo_1 + Output: ROW(foo_1.f1), foo_1.f1 + Remote SQL: SELECT f1 FROM public.loct1 + -> Async Foreign Scan on public.foo2 foo_3 + Output: ROW((foo_3.f1 + 3)), (foo_3.f1 + 3) + Remote SQL: SELECT f1 FROM public.loct1 -> Seq Scan on public.foo Output: ROW(foo.f1), foo.f1 - -> Foreign Scan on public.foo2 foo_1 - Output: ROW(foo_1.f1), foo_1.f1 - Remote SQL: SELECT f1 FROM public.loct1 -> Seq Scan on public.foo foo_2 Output: ROW((foo_2.f1 + 3)), (foo_2.f1 + 3) - -> Foreign Scan on public.foo2 foo_3 - Output: ROW((foo_3.f1 + 3)), (foo_3.f1 + 3) - Remote SQL: SELECT f1 FROM public.loct1 -(45 rows) +(47 rows) update bar set f2 = f2 + 100 from @@ -7463,27 +7479,33 @@ delete from foo where f1 < 5 returning *; (5 rows) explain (verbose, costs off) -update bar set f2 = f2 + 100 returning *; - QUERY PLAN ------------------------------------------------------------------------------- - Update on public.bar - Output: bar.f1, bar.f2 - Update on public.bar - Foreign Update on public.bar2 bar_1 - -> Seq Scan on public.bar - Output: bar.f1, (bar.f2 + 100), bar.ctid - -> Foreign Update on public.bar2 bar_1 - Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2 -(8 rows) +with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1; + QUERY PLAN +-------------------------------------------------------------------------------------- + Sort + Output: u.f1, u.f2 + Sort Key: u.f1 + CTE u + -> Update on public.bar + Output: bar.f1, bar.f2 + Update on public.bar + Foreign Update on public.bar2 bar_1 + -> Seq Scan on public.bar + Output: bar.f1, (bar.f2 + 100), bar.ctid + -> Foreign Update on public.bar2 bar_1 + Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2 + -> CTE Scan on u + Output: u.f1, u.f2 +(14 rows) -update bar set f2 = f2 + 100 returning *; +with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1; f1 | f2 ----+----- 1 | 311 2 | 322 - 6 | 266 3 | 333 4 | 344 + 6 | 266 7 | 277 (6 rows) @@ -8558,11 +8580,12 @@ SELECT t1.a,t2.b,t3.c FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) INNER J Sort Sort Key: t1.a, t3.c -> Append - -> Foreign Scan + Async subplans: 2 + -> Async Foreign Scan Relations: ((ftprt1_p1 t1_1) INNER JOIN (ftprt2_p1 t2_1)) INNER JOIN (ftprt1_p1 t3_1) - -> Foreign Scan + -> Async Foreign Scan Relations: ((ftprt1_p2 t1_2) INNER JOIN (ftprt2_p2 t2_2)) INNER JOIN (ftprt1_p2 t3_2) -(7 rows) +(8 rows) SELECT t1.a,t2.b,t3.c FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) INNER JOIN fprt1 t3 ON (t2.b = t3.a) WHERE t1.a % 25 = 0 ORDER BY 1,2,3; a | b | c @@ -8597,20 +8620,22 @@ SELECT t1.a,t2.b,t2.c FROM fprt1 t1 LEFT JOIN (SELECT * FROM fprt2 WHERE a < 10) -- with whole-row reference; partitionwise join does not apply EXPLAIN (COSTS OFF) SELECT t1.wr, t2.wr FROM (SELECT t1 wr, a FROM fprt1 t1 WHERE
t1.a % 25 = 0) t1 FULL JOIN (SELECT t2 wr, b FROM fprt2 t2 WHERE t2.b % 25 = 0) t2 ON (t1.a = t2.b) ORDER BY 1,2; - QUERY PLAN --------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------- Sort Sort Key: ((t1.*)::fprt1), ((t2.*)::fprt2) -> Hash Full Join Hash Cond: (t1.a = t2.b) -> Append - -> Foreign Scan on ftprt1_p1 t1_1 - -> Foreign Scan on ftprt1_p2 t1_2 + Async subplans: 2 + -> Async Foreign Scan on ftprt1_p1 t1_1 + -> Async Foreign Scan on ftprt1_p2 t1_2 -> Hash -> Append - -> Foreign Scan on ftprt2_p1 t2_1 - -> Foreign Scan on ftprt2_p2 t2_2 -(11 rows) + Async subplans: 2 + -> Async Foreign Scan on ftprt2_p1 t2_1 + -> Async Foreign Scan on ftprt2_p2 t2_2 +(13 rows) SELECT t1.wr, t2.wr FROM (SELECT t1 wr, a FROM fprt1 t1 WHERE t1.a % 25 = 0) t1 FULL JOIN (SELECT t2 wr, b FROM fprt2 t2 WHERE t2.b % 25 = 0) t2 ON (t1.a = t2.b) ORDER BY 1,2; wr | wr @@ -8639,11 +8664,12 @@ SELECT t1.a,t1.b FROM fprt1 t1, LATERAL (SELECT t2.a, t2.b FROM fprt2 t2 WHERE t Sort Sort Key: t1.a, t1.b -> Append - -> Foreign Scan + Async subplans: 2 + -> Async Foreign Scan Relations: (ftprt1_p1 t1_1) INNER JOIN (ftprt2_p1 t2_1) - -> Foreign Scan + -> Async Foreign Scan Relations: (ftprt1_p2 t1_2) INNER JOIN (ftprt2_p2 t2_2) -(7 rows) +(8 rows) SELECT t1.a,t1.b FROM fprt1 t1, LATERAL (SELECT t2.a, t2.b FROM fprt2 t2 WHERE t1.a = t2.b AND t1.b = t2.a) q WHERE t1.a % 25 = 0 ORDER BY 1,2; a | b @@ -8696,21 +8722,23 @@ SELECT t1.a, t1.phv, t2.b, t2.phv FROM (SELECT 't1_phv' phv, * FROM fprt1 WHERE -- test FOR UPDATE; partitionwise join does not apply EXPLAIN (COSTS OFF) SELECT t1.a, t2.b FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) WHERE t1.a % 25 = 0 ORDER BY 1,2 FOR UPDATE OF t1; - QUERY PLAN --------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------- LockRows -> Sort Sort Key: t1.a -> Hash Join Hash Cond: (t2.b = t1.a) -> Append - -> Foreign Scan on ftprt2_p1 t2_1 - -> Foreign Scan on ftprt2_p2 t2_2 + Async subplans: 2 + -> Async Foreign Scan on ftprt2_p1 t2_1 + -> Async Foreign Scan on ftprt2_p2 t2_2 -> Hash -> Append - -> Foreign Scan on ftprt1_p1 t1_1 - -> Foreign Scan on ftprt1_p2 t1_2 -(12 rows) + Async subplans: 2 + -> Async Foreign Scan on ftprt1_p1 t1_1 + -> Async Foreign Scan on ftprt1_p2 t1_2 +(14 rows) SELECT t1.a, t2.b FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) WHERE t1.a % 25 = 0 ORDER BY 1,2 FOR UPDATE OF t1; a | b @@ -8745,18 +8773,19 @@ ANALYZE fpagg_tab_p3; SET enable_partitionwise_aggregate TO false; EXPLAIN (COSTS OFF) SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 ORDER BY 1; - QUERY PLAN ------------------------------------------------------------ + QUERY PLAN +----------------------------------------------------------------- Sort Sort Key: pagg_tab.a -> HashAggregate Group Key: pagg_tab.a Filter: (avg(pagg_tab.b) < '22'::numeric) -> Append - -> Foreign Scan on fpagg_tab_p1 pagg_tab_1 - -> Foreign Scan on fpagg_tab_p2 pagg_tab_2 - -> Foreign Scan on fpagg_tab_p3 pagg_tab_3 -(9 rows) + Async subplans: 3 + -> Async Foreign Scan on fpagg_tab_p1 pagg_tab_1 + -> Async Foreign Scan on fpagg_tab_p2 pagg_tab_2 + -> Async Foreign Scan on fpagg_tab_p3 pagg_tab_3 +(10 rows) -- Plan with partitionwise aggregates is enabled SET enable_partitionwise_aggregate TO true; @@ -8767,13 +8796,14 @@ SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 O Sort Sort Key: pagg_tab.a -> Append -
-> Foreign Scan + Async subplans: 3 + -> Async Foreign Scan Relations: Aggregate on (fpagg_tab_p1 pagg_tab) - -> Foreign Scan + -> Async Foreign Scan Relations: Aggregate on (fpagg_tab_p2 pagg_tab_1) - -> Foreign Scan + -> Async Foreign Scan Relations: Aggregate on (fpagg_tab_p3 pagg_tab_2) -(9 rows) +(10 rows) SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 ORDER BY 1; a | sum | min | count diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 9fc53cad68..b04b6a0e54 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -21,6 +21,8 @@ #include "commands/defrem.h" #include "commands/explain.h" #include "commands/vacuum.h" +#include "executor/execAsync.h" +#include "executor/nodeForeignscan.h" #include "foreign/fdwapi.h" #include "funcapi.h" #include "miscadmin.h" @@ -35,6 +37,7 @@ #include "optimizer/restrictinfo.h" #include "optimizer/tlist.h" #include "parser/parsetree.h" +#include "pgstat.h" #include "postgres_fdw.h" #include "utils/builtins.h" #include "utils/float.h" @@ -56,6 +59,9 @@ PG_MODULE_MAGIC; /* If no remote estimates, assume a sort costs 20% extra */ #define DEFAULT_FDW_SORT_MULTIPLIER 1.2 +/* Retrieve PgFdwScanState struct from ForeignScanState */ +#define GetPgFdwScanState(n) ((PgFdwScanState *)(n)->fdw_state) + /* * Indexes of FDW-private information stored in fdw_private lists. * @@ -122,11 +128,29 @@ enum FdwDirectModifyPrivateIndex FdwDirectModifyPrivateSetProcessed }; +/* + * Connection common state - shared among all PgFdwState instances using the + * same connection. + */ +typedef struct PgFdwConnCommonState +{ + ForeignScanState *leader; /* leader node of this connection */ + bool busy; /* true if this connection is busy */ +} PgFdwConnCommonState; + +/* Execution state base type */ +typedef struct PgFdwState +{ + PGconn *conn; /* connection for the scan */ + PgFdwConnCommonState *commonstate; /* connection common state */ +} PgFdwState; + /* * Execution state of a foreign scan using postgres_fdw. */ typedef struct PgFdwScanState { + PgFdwState s; /* common structure */ Relation rel; /* relcache entry for the foreign table. NULL * for a foreign join scan. */ TupleDesc tupdesc; /* tuple descriptor of scan */ @@ -137,7 +161,6 @@ typedef struct PgFdwScanState List *retrieved_attrs; /* list of retrieved attribute numbers */ /* for remote query execution */ - PGconn *conn; /* connection for the scan */ unsigned int cursor_number; /* quasi-unique ID for my cursor */ bool cursor_exists; /* have we created the cursor? */ int numParams; /* number of parameters passed to query */ @@ -153,6 +176,12 @@ typedef struct PgFdwScanState /* batch-level state, for optimizing rewinds and avoiding useless fetch */ int fetch_ct_2; /* Min(# of fetches done, 2) */ bool eof_reached; /* true if last fetch reached EOF */ + bool async; /* true if run asynchronously */ + bool queued; /* true if this node is in waiter queue */ + ForeignScanState *waiter; /* Next node to run a query among nodes + * sharing the same connection */ + ForeignScanState *last_waiter; /* last element in waiter queue. 
+ * valid only on the leader node */ /* working memory contexts */ MemoryContext batch_cxt; /* context holding current batch of tuples */ @@ -166,11 +195,11 @@ typedef struct PgFdwScanState */ typedef struct PgFdwModifyState { + PgFdwState s; /* common structure */ Relation rel; /* relcache entry for the foreign table */ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ /* for remote query execution */ - PGconn *conn; /* connection for the scan */ char *p_name; /* name of prepared statement, if created */ /* extracted fdw_private data */ @@ -197,6 +226,7 @@ typedef struct PgFdwModifyState */ typedef struct PgFdwDirectModifyState { + PgFdwState s; /* common structure */ Relation rel; /* relcache entry for the foreign table */ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ @@ -326,6 +356,7 @@ static void postgresBeginForeignScan(ForeignScanState *node, int eflags); static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node); static void postgresReScanForeignScan(ForeignScanState *node); static void postgresEndForeignScan(ForeignScanState *node); +static void postgresShutdownForeignScan(ForeignScanState *node); static void postgresAddForeignUpdateTargets(Query *parsetree, RangeTblEntry *target_rte, Relation target_relation); @@ -391,6 +422,10 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *output_rel, void *extra); +static bool postgresIsForeignPathAsyncCapable(ForeignPath *path); +static bool postgresForeignAsyncConfigureWait(ForeignScanState *node, + WaitEventSet *wes, + void *caller_data, bool reinit); /* * Helper functions @@ -419,7 +454,9 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, EquivalenceClass *ec, EquivalenceMember *em, void *arg); static void create_cursor(ForeignScanState *node); -static void fetch_more_data(ForeignScanState *node); +static void request_more_data(ForeignScanState *node); +static void fetch_received_data(ForeignScanState *node); +static void vacate_connection(PgFdwState *fdwconn, bool clear_queue); static void close_cursor(PGconn *conn, unsigned int cursor_number); static PgFdwModifyState *create_foreign_modify(EState *estate, RangeTblEntry *rte, @@ -522,6 +559,7 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) routine->IterateForeignScan = postgresIterateForeignScan; routine->ReScanForeignScan = postgresReScanForeignScan; routine->EndForeignScan = postgresEndForeignScan; + routine->ShutdownForeignScan = postgresShutdownForeignScan; /* Functions for updating foreign tables */ routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets; @@ -558,6 +596,10 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) /* Support functions for upper relation push-down */ routine->GetForeignUpperPaths = postgresGetForeignUpperPaths; + /* Support functions for async execution */ + routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable; + routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait; + PG_RETURN_POINTER(routine); } @@ -1434,12 +1476,22 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. 
*/ - fsstate->conn = GetConnection(user, false); + fsstate->s.conn = GetConnection(user, false); + fsstate->s.commonstate = (PgFdwConnCommonState *) + GetConnectionSpecificStorage(user, sizeof(PgFdwConnCommonState)); + fsstate->s.commonstate->leader = NULL; + fsstate->s.commonstate->busy = false; + fsstate->waiter = NULL; + fsstate->last_waiter = node; /* Assign a unique ID for my cursor */ - fsstate->cursor_number = GetCursorNumber(fsstate->conn); + fsstate->cursor_number = GetCursorNumber(fsstate->s.conn); fsstate->cursor_exists = false; + /* Initialize async execution status */ + fsstate->async = false; + fsstate->queued = false; + /* Get private info created by planner functions. */ fsstate->query = strVal(list_nth(fsplan->fdw_private, FdwScanPrivateSelectSql)); @@ -1487,40 +1539,244 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) &fsstate->param_values); } +/* + * Async queue manipulation functions + */ + +/* + * add_async_waiter: + * + * Enqueue the node if it isn't already in the queue. Immediately send a + * request instead if the underlying connection is not busy. + */ +static inline void +add_async_waiter(ForeignScanState *node) +{ + PgFdwScanState *fsstate = GetPgFdwScanState(node); + ForeignScanState *leader = fsstate->s.commonstate->leader; + + /* + * Do nothing if the node is already in the queue or already eof'ed. + * Note: the leader node is not marked as queued. + */ + if (leader == node || fsstate->queued || fsstate->eof_reached) + return; + + if (leader == NULL) + { + /* no leader means the connection is not busy, send request immediately */ + request_more_data(node); + } + else + { + /* the connection is busy, queue the node */ + PgFdwScanState *leader_state = GetPgFdwScanState(leader); + PgFdwScanState *last_waiter_state + = GetPgFdwScanState(leader_state->last_waiter); + + last_waiter_state->waiter = node; + leader_state->last_waiter = node; + fsstate->queued = true; + } +} + +/* + * move_to_next_waiter: + * + * Make the first waiter the next leader. + * Returns the new leader, or NULL if there is no waiter. + */ +static inline ForeignScanState * +move_to_next_waiter(ForeignScanState *node) +{ + PgFdwScanState *leader_state = GetPgFdwScanState(node); + ForeignScanState *next_leader = leader_state->waiter; + + Assert(leader_state->s.commonstate->leader == node); + + if (next_leader) + { + /* the first waiter becomes the next leader */ + PgFdwScanState *next_leader_state = GetPgFdwScanState(next_leader); + next_leader_state->last_waiter = leader_state->last_waiter; + next_leader_state->queued = false; + } + + leader_state->waiter = NULL; + leader_state->s.commonstate->leader = next_leader; + + return next_leader; +} + +/* + * Remove the node from the waiter queue. + * + * Remaining results are cleared if the node is a busy leader. + * This is intended to be used during node shutdown. + */ +static inline void +remove_async_node(ForeignScanState *node) +{ + PgFdwScanState *fsstate = GetPgFdwScanState(node); + ForeignScanState *leader = fsstate->s.commonstate->leader; + PgFdwScanState *leader_state; + ForeignScanState *prev; + PgFdwScanState *prev_state; + ForeignScanState *cur; + + /* no need to remove me */ + if (!leader || !fsstate->queued) + return; + + leader_state = GetPgFdwScanState(leader); + + if (leader == node) + { + /* It's the leader */ + ForeignScanState *next_leader; + + if (leader_state->s.commonstate->busy) + { + /* + * This node is waiting for a result; absorb the result first so + * that the following commands can be sent on the connection.
+ */ + PGconn *conn = leader_state->s.conn; + + while (PQisBusy(conn)) + PQclear(PQgetResult(conn)); + + leader_state->s.commonstate->busy = false; + } + + move_to_next_waiter(node); + + return; + } + + /* + * Just remove the node from the queue. + * + * Nodes don't have a link to the previous node, but this function is only + * called on the shutdown path, so we don't bother implementing a faster + * way to do this. + */ + prev = leader; + prev_state = leader_state; + cur = GetPgFdwScanState(prev)->waiter; + while (cur) + { + PgFdwScanState *curstate = GetPgFdwScanState(cur); + + if (cur == node) + { + prev_state->waiter = curstate->waiter; + + /* relink to the previous node if the last node was removed */ + if (leader_state->last_waiter == cur) + leader_state->last_waiter = prev; + + fsstate->queued = false; + + return; + } + prev = cur; + prev_state = curstate; + cur = curstate->waiter; + } +} + /* * postgresIterateForeignScan - * Retrieve next row from the result set, or clear tuple slot to indicate - * EOF. + * Retrieve next row from the result set. + * + * For synchronous nodes, returning an empty tuple slot means EOF. + * + * For asynchronous nodes, if an empty tuple slot is returned, the caller + * needs to check the async state to tell whether all tuples have been + * received (AS_AVAILABLE) or the node is still waiting for data to come + * (AS_WAITING). */ static TupleTableSlot * postgresIterateForeignScan(ForeignScanState *node) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + PgFdwScanState *fsstate = GetPgFdwScanState(node); TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; - /* - * If this is the first call after Begin or ReScan, we need to create the - * cursor on the remote side. - */ - if (!fsstate->cursor_exists) - create_cursor(node); - - /* - * Get some more tuples, if we've run out. - */ + if (fsstate->next_tuple >= fsstate->num_tuples && !fsstate->eof_reached) + { + /* we've run out, get some more tuples */ + if (!node->fs_async) + { + /* + * finish the running query before sending the next command for + * this node + */ + if (fsstate->s.commonstate->busy) + vacate_connection((PgFdwState *)fsstate, false); + + request_more_data(node); + + /* Fetch the result immediately. */ + fetch_received_data(node); + } + else if (!fsstate->s.commonstate->busy) + { + /* If the connection is not busy, just send the request. */ + request_more_data(node); + } + else + { + /* The connection is busy, queue the request */ + bool available = true; + ForeignScanState *leader = fsstate->s.commonstate->leader; + PgFdwScanState *leader_state = GetPgFdwScanState(leader); + + /* queue the requested node */ + add_async_waiter(node); + + /* + * The request for the next node cannot be sent before the leader + * responds. Finish the current leader if possible. + */ + if (PQisBusy(leader_state->s.conn)) + { + int rc = WaitLatchOrSocket(NULL, + WL_SOCKET_READABLE | WL_TIMEOUT | + WL_EXIT_ON_PM_DEATH, + PQsocket(leader_state->s.conn), 0, + WAIT_EVENT_ASYNC_WAIT); + if (!(rc & WL_SOCKET_READABLE)) + available = false; + } + + /* fetch the leader's data and enqueue it for the next request */ + if (available) + { + fetch_received_data(leader); + add_async_waiter(leader); + } + } + } + if (fsstate->next_tuple >= fsstate->num_tuples) { - /* No point in another fetch if we already detected EOF, though. */ - if (!fsstate->eof_reached) - fetch_more_data(node); - /* If we didn't get any tuples, must be end of data.
*/ - if (fsstate->next_tuple >= fsstate->num_tuples) - return ExecClearTuple(slot); + /* + * We haven't received a result for the given node this time; return + * with no tuple to give way to another node. + */ + if (fsstate->eof_reached) + node->ss.ps.asyncstate = AS_AVAILABLE; + else + node->ss.ps.asyncstate = AS_WAITING; + + return ExecClearTuple(slot); } /* * Return the next tuple. */ + node->ss.ps.asyncstate = AS_AVAILABLE; ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++], slot, false); @@ -1535,7 +1791,7 @@ postgresIterateForeignScan(ForeignScanState *node) static void postgresReScanForeignScan(ForeignScanState *node) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + PgFdwScanState *fsstate = GetPgFdwScanState(node); char sql[64]; PGresult *res; @@ -1543,6 +1799,8 @@ postgresReScanForeignScan(ForeignScanState *node) if (!fsstate->cursor_exists) return; + vacate_connection((PgFdwState *)fsstate, true); + /* * If any internal parameters affecting this node have changed, we'd * better destroy and recreate the cursor. Otherwise, rewinding it should @@ -1571,9 +1829,9 @@ postgresReScanForeignScan(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_exec_query(fsstate->conn, sql); + res = pgfdw_exec_query(fsstate->s.conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, fsstate->conn, true, sql); + pgfdw_report_error(ERROR, res, fsstate->s.conn, true, sql); PQclear(res); /* Now force a fresh FETCH. */ @@ -1591,7 +1849,7 @@ postgresReScanForeignScan(ForeignScanState *node) static void postgresEndForeignScan(ForeignScanState *node) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + PgFdwScanState *fsstate = GetPgFdwScanState(node); /* if fsstate is NULL, we are in EXPLAIN; nothing to do */ if (fsstate == NULL) @@ -1599,15 +1857,31 @@ postgresEndForeignScan(ForeignScanState *node) /* Close the cursor if open, to prevent accumulation of cursors */ if (fsstate->cursor_exists) - close_cursor(fsstate->conn, fsstate->cursor_number); + close_cursor(fsstate->s.conn, fsstate->cursor_number); /* Release remote connection */ - ReleaseConnection(fsstate->conn); - fsstate->conn = NULL; + ReleaseConnection(fsstate->s.conn); + fsstate->s.conn = NULL; /* MemoryContexts will be deleted automatically. */ } +/* + * postgresShutdownForeignScan + * Remove asynchrony stuff and clean up garbage on the connection. + */ +static void +postgresShutdownForeignScan(ForeignScanState *node) +{ + ForeignScan *plan = (ForeignScan *) node->ss.ps.plan; + + if (plan->operation != CMD_SELECT) + return; + + /* remove the node from the waiter queue */ + remove_async_node(node); +} + /* * postgresAddForeignUpdateTargets * Add resjunk column(s) needed for update/delete on a foreign table @@ -2372,7 +2646,9 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - dmstate->conn = GetConnection(user, false); + dmstate->s.conn = GetConnection(user, false); + dmstate->s.commonstate = (PgFdwConnCommonState *) + GetConnectionSpecificStorage(user, sizeof(PgFdwConnCommonState)); /* Update the foreign-join-related fields. */ if (fsplan->scan.scanrelid == 0) @@ -2457,7 +2733,11 @@ postgresIterateDirectModify(ForeignScanState *node) * If this is the first call after Begin, execute the statement.
*/ if (dmstate->num_tuples == -1) + { + /* finish the running query before sending our command */ + vacate_connection((PgFdwState *)dmstate, true); execute_dml_stmt(node); + } /* * If the local query doesn't specify RETURNING, just clear tuple slot. @@ -2504,8 +2784,8 @@ postgresEndDirectModify(ForeignScanState *node) PQclear(dmstate->result); /* Release remote connection */ - ReleaseConnection(dmstate->conn); - dmstate->conn = NULL; + ReleaseConnection(dmstate->s.conn); + dmstate->s.conn = NULL; /* MemoryContext will be deleted automatically. */ } @@ -2703,6 +2983,7 @@ estimate_path_cost_size(PlannerInfo *root, List *local_param_join_conds; StringInfoData sql; PGconn *conn; + PgFdwConnCommonState *commonstate; Selectivity local_sel; QualCost local_cost; List *fdw_scan_tlist = NIL; @@ -2747,6 +3028,18 @@ estimate_path_cost_size(PlannerInfo *root, /* Get the remote estimate */ conn = GetConnection(fpinfo->user, false); + commonstate = GetConnectionSpecificStorage(fpinfo->user, + sizeof(PgFdwConnCommonState)); + if (commonstate) + { + PgFdwState tmpstate; + tmpstate.conn = conn; + tmpstate.commonstate = commonstate; + + /* finish the running query before sending our command */ + vacate_connection(&tmpstate, true); + } + get_remote_estimate(sql.data, conn, &rows, &width, &startup_cost, &total_cost); ReleaseConnection(conn); @@ -3317,11 +3610,11 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, static void create_cursor(ForeignScanState *node) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + PgFdwScanState *fsstate = GetPgFdwScanState(node); ExprContext *econtext = node->ss.ps.ps_ExprContext; int numParams = fsstate->numParams; const char **values = fsstate->param_values; - PGconn *conn = fsstate->conn; + PGconn *conn = fsstate->s.conn; StringInfoData buf; PGresult *res; @@ -3384,50 +3677,120 @@ create_cursor(ForeignScanState *node) } /* - * Fetch some more rows from the node's cursor. + * Send the next request for the node. If the given node is different from + * the current connection leader, push the old leader back to the waiter + * queue and let the given node become the leader. */ static void -fetch_more_data(ForeignScanState *node) +request_more_data(ForeignScanState *node) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + PgFdwScanState *fsstate = GetPgFdwScanState(node); + ForeignScanState *leader = fsstate->s.commonstate->leader; + PGconn *conn = fsstate->s.conn; + char sql[64]; + + /* must be non-busy */ + Assert(!fsstate->s.commonstate->busy); + /* must not be eof'ed yet */ + Assert(!fsstate->eof_reached); + + /* + * If this is the first call after Begin or ReScan, we need to create the + * cursor on the remote side. + */ + if (!fsstate->cursor_exists) + create_cursor(node); + + snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", + fsstate->fetch_size, fsstate->cursor_number); + + if (!PQsendQuery(conn, sql)) + pgfdw_report_error(ERROR, NULL, conn, false, sql); + + fsstate->s.commonstate->busy = true; + + /* If the node is already the current leader, just return. */ + if (leader == node) + return; + + /* Let the node be the leader */ + if (leader != NULL) + { + remove_async_node(node); + fsstate->last_waiter = GetPgFdwScanState(leader)->last_waiter; + fsstate->waiter = leader; + } + else + { + fsstate->last_waiter = node; + fsstate->waiter = NULL; + } + + fsstate->s.commonstate->leader = node; +} + +/* + * Fetch the received data and automatically send a request for the next waiter.
+ */ +static void +fetch_received_data(ForeignScanState *node) +{ + PgFdwScanState *fsstate = GetPgFdwScanState(node); PGresult *volatile res = NULL; MemoryContext oldcontext; + ForeignScanState *waiter; + + /* I should be the current connection leader */ + Assert(fsstate->s.commonstate->leader == node); /* * We'll store the tuples in the batch_cxt. First, flush the previous - * batch. + * batch if no tuples remain in it. */ - fsstate->tuples = NULL; - MemoryContextReset(fsstate->batch_cxt); + if (fsstate->next_tuple >= fsstate->num_tuples) + { + fsstate->tuples = NULL; + fsstate->num_tuples = 0; + MemoryContextReset(fsstate->batch_cxt); + } + else if (fsstate->next_tuple > 0) + { + /* Some tuples remain; move them to the beginning of the array */ + int n = 0; + + while (fsstate->next_tuple < fsstate->num_tuples) + fsstate->tuples[n++] = fsstate->tuples[fsstate->next_tuple++]; + fsstate->num_tuples = n; + } + oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt); /* PGresult must be released before leaving this function. */ PG_TRY(); { - PGconn *conn = fsstate->conn; + PGconn *conn = fsstate->s.conn; char sql[64]; - int numrows; + int addrows; + size_t newsize; int i; - snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", - fsstate->fetch_size, fsstate->cursor_number); - - res = pgfdw_exec_query(conn, sql); - /* On error, report the original query, not the FETCH. */ + res = pgfdw_get_result(conn, fsstate->query); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, fsstate->query); /* Convert the data into HeapTuples */ - numrows = PQntuples(res); - fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple)); - fsstate->num_tuples = numrows; - fsstate->next_tuple = 0; + addrows = PQntuples(res); + newsize = (fsstate->num_tuples + addrows) * sizeof(HeapTuple); + if (fsstate->tuples) + fsstate->tuples = (HeapTuple *) repalloc(fsstate->tuples, newsize); + else + fsstate->tuples = (HeapTuple *) palloc(newsize); - for (i = 0; i < numrows; i++) + for (i = 0; i < addrows; i++) { Assert(IsA(node->ss.ps.plan, ForeignScan)); fsstate->tuples[fsstate->num_tuples + i] = make_tuple_from_result_row(res, i, fsstate->rel, fsstate->attinmeta, @@ -3437,22 +3800,73 @@ fetch_more_data(ForeignScanState *node) } /* Update fetch_ct_2 */ - if (fsstate->fetch_ct_2 < 2) + if (fsstate->fetch_ct_2 < 2 && fsstate->next_tuple == 0) fsstate->fetch_ct_2++; + fsstate->next_tuple = 0; + fsstate->num_tuples += addrows; + /* Must be EOF if we didn't get as many tuples as we asked for. */ - fsstate->eof_reached = (numrows < fsstate->fetch_size); + fsstate->eof_reached = (addrows < fsstate->fetch_size); } PG_FINALLY(); { + fsstate->s.commonstate->busy = false; + if (res) PQclear(res); } PG_END_TRY(); + /* let the first waiter be the next leader of this connection */ + waiter = move_to_next_waiter(node); + + /* send the next request if any */ + if (waiter) + request_more_data(waiter); + MemoryContextSwitchTo(oldcontext); } +/* + * Vacate the underlying connection so that this node can send the next query.
+ */ +static void +vacate_connection(PgFdwState *fdwstate, bool clear_queue) +{ + PgFdwConnCommonState *commonstate = fdwstate->commonstate; + ForeignScanState *leader; + + Assert(commonstate != NULL); + + /* just return if the connection is already available */ + if (commonstate->leader == NULL || !commonstate->busy) + return; + + /* + * Let the current connection leader read all of the result of the + * running query. + */ + leader = commonstate->leader; + fetch_received_data(leader); + + /* let the first waiter be the next leader of this connection */ + move_to_next_waiter(leader); + + if (!clear_queue) + return; + + /* Clear the waiter queue */ + while (leader) + { + PgFdwScanState *fsstate = GetPgFdwScanState(leader); + + fsstate->last_waiter = NULL; + leader = fsstate->waiter; + fsstate->waiter = NULL; + } +} + /* * Force assorted GUC parameters to settings that ensure that we'll output * data values in a form that is unambiguous to the remote server. @@ -3566,7 +3980,9 @@ create_foreign_modify(EState *estate, user = GetUserMapping(userid, table->serverid); /* Open connection; report that we'll create a prepared statement. */ - fmstate->conn = GetConnection(user, true); + fmstate->s.conn = GetConnection(user, true); + fmstate->s.commonstate = (PgFdwConnCommonState *) + GetConnectionSpecificStorage(user, sizeof(PgFdwConnCommonState)); fmstate->p_name = NULL; /* prepared statement not made yet */ /* Set up remote query information. */ @@ -3653,6 +4069,9 @@ execute_foreign_modify(EState *estate, operation == CMD_UPDATE || operation == CMD_DELETE); + /* finish the running query before sending our command */ + vacate_connection((PgFdwState *)fmstate, true); + /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) prepare_foreign_modify(fmstate); @@ -3680,14 +4099,14 @@ execute_foreign_modify(EState *estate, /* * Execute the prepared statement. */ - if (!PQsendQueryPrepared(fmstate->conn, + if (!PQsendQueryPrepared(fmstate->s.conn, fmstate->p_name, fmstate->p_nums, p_values, NULL, NULL, 0)) - pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query); /* * Get the result, and check for success. @@ -3695,10 +4114,10 @@ execute_foreign_modify(EState *estate, * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_get_result(fmstate->conn, fmstate->query); + res = pgfdw_get_result(fmstate->s.conn, fmstate->query); if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) - pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query); /* Check number of rows affected, and fetch RETURNING tuple if any */ if (fmstate->has_returning) @@ -3734,7 +4153,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) /* Construct name we'll use for the prepared statement. */ snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u", - GetPrepStmtNumber(fmstate->conn)); + GetPrepStmtNumber(fmstate->s.conn)); p_name = pstrdup(prep_name); /* @@ -3744,12 +4163,12 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) * the prepared statements we use in this module are simple enough that * the remote server will make the right choices.
*/ - if (!PQsendPrepare(fmstate->conn, + if (!PQsendPrepare(fmstate->s.conn, p_name, fmstate->query, 0, NULL)) - pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query); /* * Get the result, and check for success. @@ -3757,9 +4176,9 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_get_result(fmstate->conn, fmstate->query); + res = pgfdw_get_result(fmstate->s.conn, fmstate->query); if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query); PQclear(res); /* This action shows that the prepare has been done. */ @@ -3888,16 +4307,16 @@ finish_foreign_modify(PgFdwModifyState *fmstate) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_exec_query(fmstate->conn, sql); + res = pgfdw_exec_query(fmstate->s.conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); + pgfdw_report_error(ERROR, res, fmstate->s.conn, true, sql); PQclear(res); fmstate->p_name = NULL; } /* Release remote connection */ - ReleaseConnection(fmstate->conn); - fmstate->conn = NULL; + ReleaseConnection(fmstate->s.conn); + fmstate->s.conn = NULL; } /* @@ -4056,9 +4475,9 @@ execute_dml_stmt(ForeignScanState *node) * the desired result. This allows us to avoid assuming that the remote * server has the same OIDs we do for the parameters' types. */ - if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams, + if (!PQsendQueryParams(dmstate->s.conn, dmstate->query, numParams, NULL, values, NULL, NULL, 0)) - pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query); + pgfdw_report_error(ERROR, NULL, dmstate->s.conn, false, dmstate->query); /* * Get the result, and check for success. @@ -4066,10 +4485,10 @@ execute_dml_stmt(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query); + dmstate->result = pgfdw_get_result(dmstate->s.conn, dmstate->query); if (PQresultStatus(dmstate->result) != (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) - pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true, + pgfdw_report_error(ERROR, dmstate->result, dmstate->s.conn, true, dmstate->query); /* Get the number of rows affected. */ @@ -5560,6 +5979,40 @@ postgresGetForeignJoinPaths(PlannerInfo *root, /* XXX Consider parameterized paths for the join relation */ } +static bool +postgresIsForeignPathAsyncCapable(ForeignPath *path) +{ + return true; +} + + +/* + * Configure the wait event. + * + * Add the wait event that the ForeignScan node is going to wait for. + */ +static bool +postgresForeignAsyncConfigureWait(ForeignScanState *node, WaitEventSet *wes, + void *caller_data, bool reinit) +{ + PgFdwScanState *fsstate = GetPgFdwScanState(node); + + /* Reinit is not supported for now. */ + Assert(reinit); + + if (fsstate->s.commonstate->leader == node) + { + AddWaitEventToSet(wes, + WL_SOCKET_READABLE, PQsocket(fsstate->s.conn), + NULL, caller_data); + return true; + } + + return false; +} + + /* * Assess whether the aggregation, grouping and having operations can be pushed * down to the foreign server.
As a side effect, save information we obtain in diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index eef410db39..96af75a33e 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -85,6 +85,7 @@ typedef struct PgFdwRelationInfo UserMapping *user; /* only set in use_remote_estimate mode */ int fetch_size; /* fetch size for this remote table */ + bool allow_prefetch; /* true to allow overlapped fetching */ /* * Name of the relation, for use while EXPLAINing ForeignScan. It is used @@ -130,6 +131,7 @@ extern void reset_transmission_modes(int nestlevel); /* in connection.c */ extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt); +void *GetConnectionSpecificStorage(UserMapping *user, size_t initsize); extern void ReleaseConnection(PGconn *conn); extern unsigned int GetCursorNumber(PGconn *conn); extern unsigned int GetPrepStmtNumber(PGconn *conn); diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 83971665e3..359208a12a 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -1780,25 +1780,25 @@ INSERT INTO b(aa) VALUES('bbb'); INSERT INTO b(aa) VALUES('bbbb'); INSERT INTO b(aa) VALUES('bbbbb'); -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; UPDATE b SET aa = 'new'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; UPDATE a SET aa = 'newtoo'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; @@ -1840,12 +1840,12 @@ insert into bar2 values(4,44,44); insert into bar2 values(7,77,77); explain (verbose, costs off) -select * from bar where f1 in (select f1 from foo) for update; -select * from bar where f1 in (select f1 from foo) for update; +select * from bar where f1 in (select f1 from foo) order by 1 for update; +select * from bar where f1 in (select f1 from foo) order by 1 for update; explain (verbose, costs off) -select * from bar where f1 in (select f1 from foo) for share; -select * from bar where f1 in (select f1 from foo) for share; +select * from bar where f1 in (select f1 from foo) order by 1 for share; +select * from bar where f1 in (select f1 from foo) order by 1 for share; -- Check UPDATE with inherited target and an inherited source table explain (verbose, costs off) @@ -1904,8 +1904,8 @@ explain (verbose, costs off) delete from foo where f1 < 5 returning *; delete from foo where f1 < 5 returning *; explain (verbose, costs off) -update bar set f2 = f2 + 100 returning *; -update bar set f2 = f2 + 100 returning *; +with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1; +with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1; -- Test that UPDATE/DELETE with inherited target works with row-level triggers CREATE TRIGGER trig_row_before -- 2.18.2
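For review purposes, the core of the patch is the per-connection waiter queue: only one node (the leader) may have a FETCH in flight on a given connection, and the other scans sharing that connection wait in a FIFO linked through per-node pointers. Below is a minimal standalone sketch of just that bookkeeping, with all libpq and executor details stripped out. It is an illustration only: Conn and Node are simplified stand-ins for PgFdwConnCommonState and PgFdwScanState, and this request_more_data covers only the two call paths the sketch exercises (idle connection and promoted waiter), not the leader-takeover path in the patch.

#include <stdio.h>
#include <stdbool.h>

typedef struct Conn
{
    struct Node *leader;        /* node with a request in flight, or NULL */
    bool         busy;          /* true while a request is in flight */
} Conn;

typedef struct Node
{
    const char  *name;
    Conn        *conn;
    struct Node *waiter;        /* next node in this connection's queue */
    struct Node *last_waiter;   /* tail of the queue; valid on the leader */
    bool         queued;
} Node;

/* Pretend to send a FETCH; the node becomes (or stays) the leader. */
static void
request_more_data(Node *node)
{
    Conn *conn = node->conn;

    conn->busy = true;
    printf("%s: request sent\n", node->name);

    if (conn->leader == node)
        return;                 /* promoted waiter: queue already linked */

    /* taking an idle connection: become leader of an empty queue */
    conn->leader = node;
    node->last_waiter = node;
    node->waiter = NULL;
}

/* Enqueue the node, or send immediately if the connection is idle. */
static void
add_async_waiter(Node *node)
{
    Conn *conn = node->conn;

    if (conn->leader == node || node->queued)
        return;

    if (conn->leader == NULL)
        request_more_data(node);
    else
    {
        conn->leader->last_waiter->waiter = node;
        conn->leader->last_waiter = node;
        node->queued = true;
        printf("%s: queued behind leader %s\n", node->name, conn->leader->name);
    }
}

/* The leader's result has been absorbed; promote the first waiter. */
static Node *
move_to_next_waiter(Node *leader)
{
    Node *next = leader->waiter;

    if (next)
    {
        next->last_waiter = leader->last_waiter;    /* inherit the tail */
        next->queued = false;
    }
    leader->waiter = NULL;
    leader->conn->leader = next;
    leader->conn->busy = false;
    return next;
}

int
main(void)
{
    Conn conn = {NULL, false};
    Node a = {"A", &conn}, b = {"B", &conn}, c = {"C", &conn};

    add_async_waiter(&a);       /* idle connection: A sends at once */
    add_async_waiter(&b);       /* busy: B queues behind A */
    add_async_waiter(&c);       /* busy: C queues behind B */

    /* A's result arrives: B becomes leader and sends its own request */
    Node *next = move_to_next_waiter(&a);
    if (next)
        request_more_data(next);
    return 0;
}

Compiled with any C99 compiler, this prints the request/queue transitions for three scans sharing one connection, which mirrors the sequencing that fetch_received_data() and vacate_connection() rely on.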