Re: Introducing coarse grain parallelism by postgres_fdw. - Mailing list pgsql-hackers

From Kyotaro HORIGUCHI
Subject Re: Introducing coarse grain parallelism by postgres_fdw.
Date
Msg-id 20140801.181055.143272972.horiguchi.kyotaro@lab.ntt.co.jp
Whole thread Raw
In response to Re: Introducing coarse grain parallelism by postgres_fdw.  (Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>)
Responses Re: Introducing coarse grain parallelism by postgres_fdw.
List pgsql-hackers
Hello, this is the new version which is complete to some extent
of parallelism based on postgres_fdw.

This compares the costs for parallel and non-parallel execution
and choose parallel one if it is faster by some extent specified
by GUCs. The attached files are,
0001_parallel_exec_planning_v0.patch:  - PostgreSQL body stuff for parallel execution planning.
0002_enable_postgres_fdw_to_run_in_parallel_v0.patch:  - postgres_fdw parallelization.
0003_file_fdw_changes_to_avoid_error.patch:  - error avoiding stuff for file_fdw (not necessary for this patch)
env.sql:  - simple test script to try this patch.

=====
- planner stuff to handle cost of parallel execution. Including  indication of parallel execution.
- GUCs to control how easy to go parallel.
  parallel_cost_threshold is the threshold of path total cost  where to enable parallel execution.
  parallel_ratio_threshold is the threshold of the ratio of  parallel cost to non-parallel cost where to choose the
parallel path.
 
- postgres_fdw which can run in multiple sessions using snapshot  export and fetches in parallel for foreign scans on
dedicated connections.
 
  foreign server has a new option 'max_aux_connections', which  limits the number of connections for parallel execution
per (server, user) pairs.
 
- change file_fdw to follow the changes of planner stuff.


With the patch attached, the attached sql script shows the
following result (after some line breaks are added).

postgres=# EXPLAIN ANALYZE SELECT a.a, a.b, b.c          FROM fvs1 a join fvs1_2 b on (a.a = b.a);
        QUERY PLAN
 
----------------------------------------------------------------------------
Hash Join  (cost=9573392.96..9573393.34 rows=1 width=40 parallel)          (actual time=2213.400..2213.407 rows=12
loops=1)HashCond: (a.a = b.a)->  Foreign Scan on fvs1 a          (cost=9573392.96..9573393.29 rows=10 width=8 parallel)
        (actual time=2199.992..2199.993 rows=10 loops=1)->  Hash  (cost=9573393.29..9573393.29 rows=10 width=36)
 (actual time=13.388..13.388 rows=10 loops=1)      Buckets: 1024  Batches: 1  Memory Usage: 6kB      ->  Foreign Scan
onfvs1_2 b                   (cost=9573392.96..9573393.29 rows=10 width=36 parallel)                  (actual
time=13.376..13.379rows=10 loops=1)Planning time: 4.761 msExecution time: 2227.462 ms
 
(8 rows)
postgres=# SET parallel_ratio_threshold to 0.0;
postgres=# EXPLAIN ANALYZE SELECT a.a, a.b, b.c          FROM fvs1 a join fvs1 b on (a.a = b.a);
      QUERY PLAN
 
------------------------------------------------------------------------------Hash Join  (cost=318084.32..318084.69
rows=1width=40)           (actual time=4302.913..4302.928 rows=12 loops=1)  Hash Cond: (a.a = b.a)  ->  Foreign Scan on
fvs1a  (cost=159041.93..159042.26 rows=10 width=8)                              (actual time=2122.989..2122.992 rows=10
loops=1) ->  Hash  (cost=159042.26..159042.26 rows=10 width=500)            (actual time=2179.900..2179.900 rows=10
loops=1)       Buckets: 1024  Batches: 1  Memory Usage: 6kB        ->  Foreign Scan on fvs1 b
(cost=159041.93..159042.26rows=10 width=500)                  (actual time=2179.856..2179.864 rows=10 loops=1)Planning
time:5.085 msExecution time: 4303.728 ms
 
(8 rows)

Where, "parallel" indicates that the node includes nodes run in
parallel. The latter EXPLAIN shows the result when parallel
execution is inhibited.

Due to lack of time, sorry that the details for this patch are
coming later.

Are there any suggestions or opinions?

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 781a736..d810c3c 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -1168,16 +1168,18 @@ ExplainNode(PlanState *planstate, List *ancestors,    {        if (es->format ==
EXPLAIN_FORMAT_TEXT)       {
 
-            appendStringInfo(es->str, "  (cost=%.2f..%.2f rows=%.0f width=%d)",
+            appendStringInfo(es->str, "  (cost=%.2f..%.2f rows=%.0f width=%d%s)",
plan->startup_cost,plan->total_cost,
 
-                             plan->plan_rows, plan->plan_width);
+                             plan->plan_rows, plan->plan_width,
+                             plan->parallel_start ? " parallel" : "");        }        else        {
ExplainPropertyFloat("StartupCost", plan->startup_cost, 2, es);            ExplainPropertyFloat("Total Cost",
plan->total_cost,2, es);            ExplainPropertyFloat("Plan Rows", plan->plan_rows, 0, es);
 
-            ExplainPropertyInteger("Plan Width", plan->plan_width, es);
+            ExplainPropertyText("Parallel Exec",
+                                plan->parallel_start ? "true" : "false", es);        }    }
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index c81efe9..85d78b4 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -53,6 +53,8 @@ typedef struct pushdown_safety_info/* These parameters are set by GUC */bool        enable_geqo =
false;   /* just in case GUC doesn't set it */int            geqo_threshold;
 
+double        parallel_cost_threshold = 10000;
+double        parallel_ratio_threshold = 0.6;/* Hook for plugins to replace standard_join_search()
*/join_search_hook_typejoin_search_hook = NULL;
 
@@ -112,6 +114,104 @@ static void recurse_push_qual(Node *setOp, Query *topquery,                  RangeTblEntry *rte,
Indexrti, Node *qual);static void remove_unused_subquery_outputs(Query *subquery, RelOptInfo *rel);
 
+/*
+ * choose_parallel_walker -- Walk the path tree and fix each node's decision
+ * on whether to execute in parallel or not.
+ *
+ * If run_parallel is true, the parallel cost estimates become the node's
+ * effective costs so later planning can ignore the parallel/non-parallel
+ * distinction; otherwise the node's parallel flag is simply cleared.
+ */
+static
+void choose_parallel_walker(Path *path, bool run_parallel)
+{
+    ListCell *lc;
+    List *subpaths = NIL;
+    Path *left = NULL, *right = NULL;
+
+    switch (nodeTag(path))
+    {
+    case T_Path:
+    case T_IndexPath:
+    case T_BitmapHeapPath:
+    case T_TidPath:
+    case T_BitmapAndPath:
+    case T_BitmapOrPath:
+    case T_ResultPath:
+        /* These paths are unparallelizable so far, return immediately. */
+        return;
+    case T_ForeignPath: /* leaf node, nothing further to walk */
+        break;
+    case T_NestPath:
+        left  = ((NestPath*)path)->outerjoinpath;
+        right = ((NestPath*)path)->innerjoinpath;
+        break;
+    case T_MergePath:
+        left  = ((MergePath*)path)->jpath.outerjoinpath;
+        right = ((MergePath*)path)->jpath.innerjoinpath;
+        break;
+    case T_HashPath:
+        left  = ((HashPath*)path)->jpath.outerjoinpath;
+        right = ((HashPath*)path)->jpath.innerjoinpath;
+        break;
+    case T_AppendPath:
+        subpaths = ((AppendPath*)path)->subpaths;
+        break;
+    case T_MergeAppendPath:
+        subpaths = ((MergeAppendPath*)path)->subpaths;
+        break;
+    case T_MaterialPath:
+        left = ((MaterialPath*)path)->subpath;
+        break;    /* BUGFIX: was missing, fell through into T_UniquePath and
+                   * overwrote "left" via a wrong-typed cast */
+    case T_UniquePath:
+        left = ((UniquePath*)path)->subpath;
+        break;
+    default:
+        elog(ERROR, "unrecognized path node type: %d",
+             (int) nodeTag(path));
+        break;
+    }
+
+    /* Make this node treatable ignoring parallel costs hereafter. */
+    if (run_parallel)
+    {
+        path->startup_cost = path->pstartup_cost;
+        path->total_cost = path->ptotal_cost;
+    }
+    else
+    {
+        path->parallel = false;
+    }
+
+    /* Walk into lower level */
+    if (left)  choose_parallel_walker(left, run_parallel);
+    if (right) choose_parallel_walker(right, run_parallel);
+    foreach (lc, subpaths)
+    {
+        choose_parallel_walker((Path*) lfirst(lc), run_parallel);
+    }
+}
+
+/*
+ * choose_parallel_scans --- Decide whether this relation should run in
+ * parallel or not.
+ * 
+ * The relation has path tree having costs for both parallel and non-parallel
+ * execution and we can decide only by looking into the top node of the path
+ * tree to decide whether to do parallel or not. Then make the whole tree to
+ * be handled ignoring parallel execution stuff.
+ */
+void
+choose_parallel_scans(RelOptInfo *rel)
+{
+    ListCell *lc;
+
+    foreach (lc, rel->pathlist)
+    {
+        Path *path = (Path*) lfirst(lc);
+        bool run_parallel = (path->parallel &&
+                 path->total_cost > parallel_cost_threshold &&
+                 path->ptotal_cost < path->total_cost * parallel_ratio_threshold);
+
+        choose_parallel_walker(path, run_parallel);
+    }
+}/* * make_one_rel
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 0cdb790..1f5bcd8 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -1334,6 +1334,74 @@ cost_sort(Path *path, PlannerInfo *root,}/*
+ * cost_append
+ *      Determines and returns the cost of an Append node.
+ *
+ * We charge nothing extra for the Append itself, which perhaps is too
+ * optimistic, but since it doesn't do any selection or projection, it is a
+ * pretty cheap node.  If you change this, see also make_append().
+ */
+void
+cost_append(Path *path, List *subpaths)
+{
+    double max_par_startup = 0.0;
+    double par_running = 0.0;
+    double nonpar_startup = -1.0;    /* -1 means "no non-parallel child seen yet" */
+    double nonpar_running = 0.0;
+    ListCell *lc;
+
+    /*
+     * First pass: ordinary (serial) costs.  Rows and total cost are summed
+     * over all children; startup cost is that of the first child, since a
+     * serial Append starts returning tuples as soon as its first child does.
+     */
+    path->rows = 0;
+    path->startup_cost = 0;
+    path->total_cost = 0;
+    foreach(lc, subpaths)
+    {
+        Path       *subpath = (Path *) lfirst(lc);
+        
+        path->rows += subpath->rows;
+        
+        if (lc == list_head(subpaths)) /* first node? */
+        {
+            path->startup_cost = subpath->startup_cost;
+            path->parallel = subpath->parallel;
+        }
+
+        path->total_cost += subpath->total_cost;
+    }
+
+    /*
+     * NOTE(review): path->parallel comes from the FIRST subpath only, so a
+     * parallel child after a non-parallel first child is ignored here,
+     * unlike create_merge_append_path() which checks every child — confirm
+     * this asymmetry is intended.
+     */
+    if (! path->parallel) return;
+
+    /* calculate cost for parallel execution */
+    foreach (lc, subpaths)
+    {
+        Path *subpath = (Path*)lfirst (lc);
+
+        if (subpath->parallel)
+        {
+            /* parallel children overlap: take max startup, sum run costs */
+            if (max_par_startup < subpath->pstartup_cost)
+                max_par_startup = subpath->pstartup_cost;
+            par_running += subpath->ptotal_cost - subpath->pstartup_cost;
+        }
+        else
+        {
+            /*
+             * Non-parallel children run serially: the first one contributes
+             * its startup cost, the rest count fully as run cost.
+             */
+            if (nonpar_startup < 0)
+            {
+                nonpar_startup = subpath->startup_cost;
+                nonpar_running = subpath->total_cost - subpath->startup_cost;
+            }
+            else
+            {
+                nonpar_running += subpath->total_cost;
+            }
+        }
+    }
+    
+    /*
+     * Parallel startup is bounded by the slower of the two groups; the run
+     * phases are assumed to overlap, so only the larger one is charged.
+     */
+    path->pstartup_cost = (max_par_startup < nonpar_startup ?
+                         nonpar_startup : max_par_startup);
+    path->ptotal_cost = path->pstartup_cost +
+        (par_running < nonpar_running ? nonpar_running : par_running);
+}
+
+/* * cost_merge_append *      Determines and returns the cost of a MergeAppend node. *
@@ -1356,12 +1424,17 @@ cost_sort(Path *path, PlannerInfo *root, * 'n_streams' is the number of input streams *
'input_startup_cost'is the sum of the input streams' startup costs * 'input_total_cost' is the sum of the input
streams'total costs
 
+ * 'parallel_estimate' is whether to estimate for parallel execution
+ * 'input_pstart_cost' is the sum of the input streams' parallel startup costs
+ * 'input_ptotal_cost' is the sum of the input streams' parallel total costs * 'tuples' is the number of tuples in all
thestreams */voidcost_merge_append(Path *path, PlannerInfo *root,                  List *pathkeys, int n_streams,
          Cost input_startup_cost, Cost input_total_cost,
 
+                  bool parallel_estimate,
+                  Cost input_pstartup_cost, Cost input_ptotal_cost,                  double tuples){    Cost
startup_cost= 0;
 
@@ -1395,6 +1468,13 @@ cost_merge_append(Path *path, PlannerInfo *root,    path->startup_cost = startup_cost +
input_startup_cost;   path->total_cost = startup_cost + run_cost + input_total_cost;
 
+
+    if (parallel_estimate)
+    {
+        path->pstartup_cost = startup_cost + input_pstartup_cost;
+        path->ptotal_cost = startup_cost + run_cost + input_ptotal_cost;
+        path->parallel = true;
+    }}/*
@@ -1648,6 +1728,54 @@ cost_group(Path *path, PlannerInfo *root,    path->total_cost = total_cost;}
+
+/*
+ * clear_cost_parallel
+ *      Initialize the workspace's parallel cost fields as for a
+ *      non-parallel node: the parallel costs mirror the ordinary costs and
+ *      the parallel flag is cleared.  Callers invoke this before
+ *      conditionally calling initial_cost_parallel().
+ */
+static void
+clear_cost_parallel(JoinCostWorkspace *workspace)
+{
+    workspace->pstartup_cost = workspace->startup_cost;
+    workspace->ptotal_cost = workspace->total_cost;
+    workspace->parallel = false;
+}
+
+/*
+ * Set the initial cost numbers of a parallel node for join estimation.
+ * This should be called only if the path node can be executed in parallel.
+ *
+ * The parallel startup cost is the larger of the two children's parallel
+ * startup costs (they are assumed to start up concurrently); the parallel
+ * total cost is the serial total adjusted by the resulting startup delta.
+ */
+static void
+initial_cost_parallel(JoinCostWorkspace *workspace,
+                      Cost outer_pstartup_cost,
+                      Cost inner_pstartup_cost)
+{
+    double pstartup =
+        (outer_pstartup_cost > inner_pstartup_cost ?
+         outer_pstartup_cost : inner_pstartup_cost);
+    workspace->ptotal_cost = 
+        workspace->total_cost -    (workspace->startup_cost - pstartup);
+    workspace->pstartup_cost = pstartup;
+    workspace->parallel = true;
+}
+
+/*
+ * Set the final cost numbers of a parallel node for join estimation.
+ * This does nothing if this node won't be executed in parallel.
+ *
+ * 'startup_cost' and 'run_cost' are the extra costs computed by the
+ * caller's final_cost_* phase; they are added on top of the workspace's
+ * initial parallel estimates, mirroring how the caller combines them for
+ * the ordinary (serial) costs.
+ */
+static void
+final_cost_parallel(Path *path, JoinCostWorkspace *workspace,
+                    Cost startup_cost, Cost run_cost)
+{
+    if (workspace->parallel)
+    {
+        path->pstartup_cost =
+            startup_cost + workspace->pstartup_cost;
+        path->ptotal_cost =
+            path->pstartup_cost + run_cost;
+        path->parallel = true;
+    }
+}
+/* * initial_cost_nestloop *      Preliminary estimate of the cost of a nestloop join path.
@@ -1765,6 +1893,14 @@ initial_cost_nestloop(PlannerInfo *root, JoinCostWorkspace *workspace,    /* Save private data
forfinal_cost_nestloop */    workspace->run_cost = run_cost;    workspace->inner_rescan_run_cost =
inner_rescan_run_cost;
+
+    clear_cost_parallel(workspace);
+    if (outer_path->parallel)
+    {
+        initial_cost_parallel(workspace,
+                              outer_path->pstartup_cost,
+                              inner_path->pstartup_cost);
+    }}/*
@@ -1786,7 +1922,7 @@ final_cost_nestloop(PlannerInfo *root, NestPath *path,    Path       *inner_path =
path->innerjoinpath;   double        outer_path_rows = outer_path->rows;    double        inner_path_rows =
inner_path->rows;
-    Cost        startup_cost = workspace->startup_cost;
+    Cost        startup_cost = 0.0;    Cost        run_cost = workspace->run_cost;    Cost
inner_rescan_run_cost= workspace->inner_rescan_run_cost;    Cost        cpu_per_tuple;
 
@@ -1861,8 +1997,10 @@ final_cost_nestloop(PlannerInfo *root, NestPath *path,    cpu_per_tuple = cpu_tuple_cost +
restrict_qual_cost.per_tuple;   run_cost += cpu_per_tuple * ntuples;
 
-    path->path.startup_cost = startup_cost;
-    path->path.total_cost = startup_cost + run_cost;
+    path->path.startup_cost = startup_cost + workspace->startup_cost;
+    path->path.total_cost = startup_cost + workspace->startup_cost + run_cost;
+
+    final_cost_parallel(&path->path, workspace, startup_cost, run_cost);}/*
@@ -1917,6 +2055,8 @@ initial_cost_mergejoin(PlannerInfo *root, JoinCostWorkspace *workspace,
innerstartsel,               innerendsel;    Path        sort_path;        /* dummy for result of cost_sort */
 
+    double         outer_startup_cost,
+                inner_startup_cost;    /* Protect some assumptions below that rowcounts aren't zero or NaN */    if
(outer_path_rows<= 0 || isnan(outer_path_rows))
 
@@ -2035,6 +2175,7 @@ initial_cost_mergejoin(PlannerInfo *root, JoinCostWorkspace *workspace,                  0.0,
            work_mem,                  -1.0);
 
+        outer_startup_cost = sort_path.startup_cost;        startup_cost += sort_path.startup_cost;
startup_cost+= (sort_path.total_cost - sort_path.startup_cost)            * outerstartsel;
 
@@ -2043,6 +2184,7 @@ initial_cost_mergejoin(PlannerInfo *root, JoinCostWorkspace *workspace,    }    else    {
+        outer_startup_cost = outer_path->startup_cost;        startup_cost += outer_path->startup_cost;
startup_cost+= (outer_path->total_cost - outer_path->startup_cost)            * outerstartsel;
 
@@ -2061,6 +2203,7 @@ initial_cost_mergejoin(PlannerInfo *root, JoinCostWorkspace *workspace,                  0.0,
            work_mem,                  -1.0);
 
+        inner_startup_cost = sort_path.startup_cost;        startup_cost += sort_path.startup_cost;
startup_cost+= (sort_path.total_cost - sort_path.startup_cost)            * innerstartsel;
 
@@ -2069,6 +2212,7 @@ initial_cost_mergejoin(PlannerInfo *root, JoinCostWorkspace *workspace,    }    else    {
+        inner_startup_cost = inner_path->startup_cost;        startup_cost += inner_path->startup_cost;
startup_cost+= (inner_path->total_cost - inner_path->startup_cost)            * innerstartsel;
 
@@ -2096,6 +2240,16 @@ initial_cost_mergejoin(PlannerInfo *root, JoinCostWorkspace *workspace,    workspace->inner_rows
=inner_rows;    workspace->outer_skip_rows = outer_skip_rows;    workspace->inner_skip_rows = inner_skip_rows;
 
+
+    clear_cost_parallel(workspace);
+    if (outer_path->parallel)
+    {
+        initial_cost_parallel(workspace, 
+                              outer_startup_cost - outer_path->total_cost + 
+                              outer_path->ptotal_cost,
+                              inner_startup_cost - inner_path->total_cost + 
+                              inner_path->ptotal_cost);
+    }}/*
@@ -2128,7 +2282,7 @@ final_cost_mergejoin(PlannerInfo *root, MergePath *path,    double        inner_path_rows =
inner_path->rows;   List       *mergeclauses = path->path_mergeclauses;    List       *innersortkeys =
path->innersortkeys;
-    Cost        startup_cost = workspace->startup_cost;
+    Cost        startup_cost = 0;    Cost        run_cost = workspace->run_cost;    Cost        inner_run_cost =
workspace->inner_run_cost;   double        outer_rows = workspace->outer_rows;
 
@@ -2320,8 +2474,10 @@ final_cost_mergejoin(PlannerInfo *root, MergePath *path,    cpu_per_tuple = cpu_tuple_cost +
qp_qual_cost.per_tuple;   run_cost += cpu_per_tuple * mergejointuples;
 
-    path->jpath.path.startup_cost = startup_cost;
-    path->jpath.path.total_cost = startup_cost + run_cost;
+    path->jpath.path.startup_cost = startup_cost + workspace->startup_cost;
+    path->jpath.path.total_cost = path->jpath.path.startup_cost + run_cost;
+
+    final_cost_parallel(&path->jpath.path, workspace, startup_cost, run_cost);}/*
@@ -2485,6 +2641,17 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,    workspace->run_cost =
run_cost;   workspace->numbuckets = numbuckets;    workspace->numbatches = numbatches;
 
+
+    /* costs for parallel execution */
+    clear_cost_parallel(workspace);
+
+    /* Any side is executed first in hash join... */
+    if (inner_path->parallel || outer_path->parallel)
+    {
+        initial_cost_parallel(workspace,
+                              outer_path->pstartup_cost,
+                              inner_path->pstartup_cost);
+    }}/*
@@ -2510,7 +2677,7 @@ final_cost_hashjoin(PlannerInfo *root, HashPath *path,    double        outer_path_rows =
outer_path->rows;   double        inner_path_rows = inner_path->rows;    List       *hashclauses =
path->path_hashclauses;
-    Cost        startup_cost = workspace->startup_cost;
+    Cost        startup_cost = 0;    Cost        run_cost = workspace->run_cost;    int            numbuckets =
workspace->numbuckets;   int            numbatches = workspace->numbatches;
 
@@ -2700,8 +2867,10 @@ final_cost_hashjoin(PlannerInfo *root, HashPath *path,    cpu_per_tuple = cpu_tuple_cost +
qp_qual_cost.per_tuple;   run_cost += cpu_per_tuple * hashjointuples;
 
-    path->jpath.path.startup_cost = startup_cost;
-    path->jpath.path.total_cost = startup_cost + run_cost;
+    path->jpath.path.startup_cost = startup_cost + workspace->startup_cost;
+    path->jpath.path.total_cost = path->jpath.path.startup_cost + run_cost;
+
+    final_cost_parallel(&path->jpath.path, workspace, startup_cost, run_cost);}
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 4b641a2..67d801a 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -3163,6 +3163,9 @@ copy_path_costsize(Plan *dest, Path *src)    {        dest->startup_cost = src->startup_cost;
  dest->total_cost = src->total_cost;
 
+        dest->parallel_start = src->parallel;
+        dest->pstartup_cost = src->pstartup_cost;
+        dest->ptotal_cost = src->ptotal_cost;        dest->plan_rows = src->rows;        dest->plan_width =
src->parent->width;   }
 
@@ -3170,6 +3173,9 @@ copy_path_costsize(Plan *dest, Path *src)    {        dest->startup_cost = 0;
dest->total_cost= 0;
 
+        dest->parallel_start = false;
+        dest->pstartup_cost = 0;
+        dest->ptotal_cost = 0;        dest->plan_rows = 0;        dest->plan_width = 0;    }
@@ -3462,6 +3468,7 @@ ForeignScan *make_foreignscan(List *qptlist,                 List *qpqual,                 Index
scanrelid,
+                 bool  parallel,                 List *fdw_exprs,                 List *fdw_private){
@@ -3474,6 +3481,7 @@ make_foreignscan(List *qptlist,    plan->lefttree = NULL;    plan->righttree = NULL;
node->scan.scanrelid= scanrelid;
 
+    node->scan.parallel = parallel;    node->fdw_exprs = fdw_exprs;    node->fdw_private = fdw_private;    /*
fsSystemColwill be filled in by create_foreignscan_plan */
 
@@ -3489,6 +3497,10 @@ make_append(List *appendplans, List *tlist)    Plan       *plan = &node->plan;    double
total_size;   ListCell   *subnode;
 
+    double      max_par_startup = 0;
+    double      par_running = 0;
+    double      nonpar_startup = 0;
+    double      nonpar_running = 0;    /*     * Compute cost as sum of subplan costs.  We charge nothing extra for
the
@@ -3509,12 +3521,27 @@ make_append(List *appendplans, List *tlist)    {        Plan       *subplan = (Plan *)
lfirst(subnode);
-        if (subnode == list_head(appendplans))    /* first node? */
-            plan->startup_cost = subplan->startup_cost;
-        plan->total_cost += subplan->total_cost;
+        if (subplan->parallel_start)
+        {
+            if (max_par_startup < subplan->startup_cost)
+                max_par_startup = subplan->startup_cost;
+            par_running += subplan->total_cost - subplan->startup_cost;
+        }
+        else
+        {
+            nonpar_startup += subplan->startup_cost;
+            nonpar_running += (subplan->total_cost - subplan->startup_cost);
+        }
+        plan->plan_rows += subplan->plan_rows;        total_size += subplan->plan_width * subplan->plan_rows;    }
+
+    plan->startup_cost = 
+        (max_par_startup < nonpar_startup ? nonpar_startup : max_par_startup);
+    plan->total_cost = 
+        plan->startup_cost + par_running + nonpar_running;
+    if (plan->plan_rows > 0)        plan->plan_width = rint(total_size / plan->plan_rows);    else
diff --git a/src/backend/optimizer/plan/planmain.c b/src/backend/optimizer/plan/planmain.c
index 93484a0..9d75dd7 100644
--- a/src/backend/optimizer/plan/planmain.c
+++ b/src/backend/optimizer/plan/planmain.c
@@ -240,5 +240,8 @@ query_planner(PlannerInfo *root, List *tlist,        final_rel->cheapest_total_path->param_info !=
NULL)       elog(ERROR, "failed to construct the join relation");
 
+    /* Decide each parallelizable path should be run in parallel */
+    choose_parallel_scans(final_rel);
+    return final_rel;}
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index 319e8b2..d716dcb 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -900,27 +900,11 @@ create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer)
                * unsorted */    pathnode->subpaths = subpaths;
 
-    /*
-     * We don't bother with inventing a cost_append(), but just do it here.
-     *
-     * Compute rows and costs as sums of subplan rows and costs.  We charge
-     * nothing extra for the Append itself, which perhaps is too optimistic,
-     * but since it doesn't do any selection or projection, it is a pretty
-     * cheap node.  If you change this, see also make_append().
-     */
-    pathnode->path.rows = 0;
-    pathnode->path.startup_cost = 0;
-    pathnode->path.total_cost = 0;
+    cost_append(&pathnode->path, subpaths);
+    foreach(l, subpaths)    {
-        Path       *subpath = (Path *) lfirst(l);
-
-        pathnode->path.rows += subpath->rows;
-
-        if (l == list_head(subpaths))    /* first node? */
-            pathnode->path.startup_cost = subpath->startup_cost;
-        pathnode->path.total_cost += subpath->total_cost;
-
+        Path *subpath = (Path*)lfirst(l);        /* All child paths must have same parameterization */
Assert(bms_equal(PATH_REQ_OUTER(subpath),required_outer));    }
 
@@ -943,7 +927,14 @@ create_merge_append_path(PlannerInfo *root,    MergeAppendPath *pathnode =
makeNode(MergeAppendPath);   Cost        input_startup_cost;    Cost        input_total_cost;
 
+    Cost        input_pstartup_cost;
+    Cost        input_ptotal_cost;    ListCell   *l;
+    Cost        max_par_startup = 0.0;
+    Cost        par_running = 0.0;
+    Cost        nonpar_startup = 0.0;
+    Cost         nonpar_running = 0.0;
+    bool        parallel_exists = false;    pathnode->path.pathtype = T_MergeAppend;    pathnode->path.parent = rel;
@@ -970,14 +961,16 @@ create_merge_append_path(PlannerInfo *root,    foreach(l, subpaths)    {        Path
*subpath= (Path *) lfirst(l);
 
+        double additional_startup_cost = 0;
+        double additional_total_cost = 0;        pathnode->path.rows += subpath->rows;        if
(pathkeys_contained_in(pathkeys,subpath->pathkeys))        {            /* Subpath is adequately ordered, we won't need
tosort it */
 
-            input_startup_cost += subpath->startup_cost;
-            input_total_cost += subpath->total_cost;
+            additional_startup_cost = subpath->startup_cost;
+            additional_total_cost = subpath->total_cost;        }        else        {
@@ -993,18 +986,55 @@ create_merge_append_path(PlannerInfo *root,                      0.0,
work_mem,                     pathnode->limit_tuples);
 
-            input_startup_cost += sort_path.startup_cost;
-            input_total_cost += sort_path.total_cost;
+            additional_startup_cost = sort_path.startup_cost;
+            additional_total_cost = sort_path.total_cost;
+        }
+
+        input_startup_cost += additional_startup_cost;
+        input_total_cost += additional_total_cost;
+
+        /* cost calculation for parallel execution  */
+        if (subpath->parallel)
+        {
+            Cost thiscost =
+                subpath->pstartup_cost + additional_startup_cost;
+            if (max_par_startup < thiscost)
+                max_par_startup = thiscost;
+            par_running +=
+                (subpath->ptotal_cost - subpath->pstartup_cost) +
+                (additional_total_cost - additional_startup_cost) -
+                (subpath->total_cost - subpath->startup_cost);
+            parallel_exists = true;
+        }
+        else
+        {
+            nonpar_startup +=
+                subpath->startup_cost + additional_startup_cost;
+            nonpar_running +=
+                additional_total_cost - additional_startup_cost;        }        /* All child paths must have same
parameterization*/        Assert(bms_equal(PATH_REQ_OUTER(subpath), required_outer));    }
 
+    
+
+    if (parallel_exists)
+    {
+        input_pstartup_cost =
+            (max_par_startup < nonpar_startup ?
+             nonpar_startup : max_par_startup);
+        input_ptotal_cost = 
+            input_pstartup_cost +
+            (par_running < nonpar_running ? nonpar_running : par_running);
+    }    /* Now we can compute total costs of the MergeAppend */    cost_merge_append(&pathnode->path, root,
          pathkeys, list_length(subpaths),                      input_startup_cost, input_total_cost,
 
+                      parallel_exists,
+                      input_pstartup_cost, input_ptotal_cost,                      rel->tuples);    return pathnode;
@@ -1066,6 +1096,12 @@ create_material_path(RelOptInfo *rel, Path *subpath)                  subpath->rows,
    rel->width);
 
+    if (subpath->parallel)
+    {
+        Cost diff = subpath->pstartup_cost - subpath->startup_cost;
+        pathnode->path.pstartup_cost = pathnode->path.startup_cost + diff;
+        pathnode->path.ptotal_cost   = pathnode->path.total_cost + diff;
+    }    return pathnode;}
@@ -1292,6 +1328,12 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
pathnode->path.total_cost= subpath->total_cost;        pathnode->path.pathkeys = subpath->pathkeys;
 
+        if (subpath->parallel)
+        {
+            pathnode->path.startup_cost = subpath->startup_cost;
+            pathnode->path.total_cost = subpath->total_cost;
+            pathnode->path.parallel = true;
+        }        rel->cheapest_unique_path = (Path *) pathnode;        MemoryContextSwitchTo(oldcontext);
@@ -1328,6 +1370,12 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
pathnode->path.total_cost= subpath->total_cost;                pathnode->path.pathkeys = subpath->pathkeys;
 
+                if (subpath->parallel)
+                {
+                    pathnode->path.startup_cost = subpath->startup_cost;
+                    pathnode->path.total_cost = subpath->total_cost;
+                    pathnode->path.parallel = true;
+                }                rel->cheapest_unique_path = (Path *) pathnode;
MemoryContextSwitchTo(oldcontext);
@@ -1407,6 +1455,19 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
pathnode->path.total_cost= sort_path.total_cost;    }
 
+    /* calculate parallel execution costs */
+    if (subpath->parallel)
+    {
+        pathnode->path.pstartup_cost = 
+            sort_path.startup_cost - subpath->startup_cost
+            + subpath->pstartup_cost;
+        /*
+         * BUGFIX: was "- subpath->total_cost + subpath->total_cost", which
+         * cancels out and loses the parallel cost entirely.  Use the
+         * subpath's parallel total cost, symmetric with pstartup_cost above.
+         */
+        pathnode->path.ptotal_cost = 
+            sort_path.total_cost - subpath->total_cost
+            + subpath->ptotal_cost;
+        pathnode->path.parallel = true;
+    }
+        
+    rel->cheapest_unique_path = (Path *) pathnode;    MemoryContextSwitchTo(oldcontext);
@@ -1576,9 +1637,8 @@ create_worktablescan_path(PlannerInfo *root, RelOptInfo *rel,ForeignPath
*create_foreignscan_path(PlannerInfo*root, RelOptInfo *rel,                        double rows, Cost startup_cost, Cost
total_cost,
-                        List *pathkeys,
-                        Relids required_outer,
-                        List *fdw_private)
+                        List *pathkeys, Relids required_outer,
+                        bool parallel, List *fdw_private){    ForeignPath *pathnode = makeNode(ForeignPath);
@@ -1589,6 +1649,9 @@ create_foreignscan_path(PlannerInfo *root, RelOptInfo *rel,    pathnode->path.rows = rows;
pathnode->path.startup_cost= startup_cost;    pathnode->path.total_cost = total_cost;
 
+    pathnode->path.parallel = parallel;
+    pathnode->path.pstartup_cost = startup_cost;
+    pathnode->path.ptotal_cost = total_cost;    pathnode->path.pathkeys = pathkeys;    pathnode->fdw_private =
fdw_private;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 6c52db8..cf3f104 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2691,6 +2691,24 @@ static struct config_real ConfigureNamesReal[] =        0.5, 0.0, 1.0,        NULL, NULL, NULL
},
 
+    {
+        {"parallel_cost_threshold", PGC_USERSET, QUERY_TUNING_METHOD,
+             gettext_noop("Set the minimum total cost to allow parallel plans."),
+            NULL
+        },
+        &parallel_cost_threshold,
+        10000.0, 0.0, 1.0e10,    /* disable_cost */
+        NULL, NULL, NULL
+    },
+    {
+        {"parallel_ratio_threshold", PGC_USERSET, QUERY_TUNING_METHOD,
+             gettext_noop("Set threshold to select parallel plans, as the ratio of parallel cost to non-parallel
cost."),
+            NULL
+        },
+        &parallel_ratio_threshold,
+        0.8, 0.0, 10.0,
+        NULL, NULL, NULL
+    },    /* End-of-list marker */    {
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 3b9c683..f4f9deb 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -95,6 +95,9 @@ typedef struct Plan     */    Cost        startup_cost;    /* cost expended before fetching any
tuples*/    Cost        total_cost;        /* total cost (assuming all tuples fetched) */
 
+    bool        parallel_start; /* this node will run in parallel */
+    Cost        pstartup_cost;  /* startup cost for parallel execution */
+    Cost        ptotal_cost;    /* total cost for parallel execution */    /*     * planner's estimate of result size
ofthis plan step
 
@@ -264,6 +267,7 @@ typedef struct Scan{    Plan        plan;    Index        scanrelid;        /* relid is index into
therange table */
 
+    bool        parallel;        /* This scan will run in parallel */} Scan;/* ----------------
diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h
index dacbe9c..08d59aa 100644
--- a/src/include/nodes/relation.h
+++ b/src/include/nodes/relation.h
@@ -729,6 +729,10 @@ typedef struct Path    Cost        startup_cost;    /* cost expended before fetching any tuples */
  Cost        total_cost;        /* total cost (assuming all tuples fetched) */
 
+    bool        parallel;        /* can run in parallel ? */
+    Cost        pstartup_cost;    /* startup cost for parallel execution */
+    Cost        ptotal_cost;    /* total cost for parallel execution */
+    List       *pathkeys;        /* sort ordering of path's output */    /* pathkeys is a List of PathKey nodes; see
above*/} Path;
 
@@ -1626,6 +1630,10 @@ typedef struct JoinCostWorkspace    Cost        startup_cost;    /* cost expended before
fetchingany tuples */    Cost        total_cost;        /* total cost (assuming all tuples fetched) */
 
+    bool        parallel;        /* can run in parallel ? */
+    Cost        pstartup_cost;    /* startup cost for parallel execution */
+    Cost        ptotal_cost;    /* total cost for parallel execution */
+        /* Fields below here should be treated as private to costsize.c */    Cost        run_cost;        /*
non-startupcost components */
 
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 75e2afb..30b9858 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -91,9 +91,12 @@ extern void cost_sort(Path *path, PlannerInfo *root,          List *pathkeys, Cost input_cost,
doubletuples, int width,          Cost comparison_cost, int sort_mem,          double limit_tuples);
 
+extern void cost_append(Path *path, List *subpaths);extern void cost_merge_append(Path *path, PlannerInfo *root,
          List *pathkeys, int n_streams,                  Cost input_startup_cost, Cost input_total_cost,
 
+                  bool parallel_estimate,
+                  Cost input_pstartup_cost, Cost input_ptotal_cost,                  double tuples);extern void
cost_material(Path*path,              Cost input_startup_cost, Cost input_total_cost,
 
diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h
index a0bcc82..b84d176 100644
--- a/src/include/optimizer/pathnode.h
+++ b/src/include/optimizer/pathnode.h
@@ -79,9 +79,8 @@ extern Path *create_worktablescan_path(PlannerInfo *root, RelOptInfo *rel,
Relidsrequired_outer);extern ForeignPath *create_foreignscan_path(PlannerInfo *root, RelOptInfo *rel,
    double rows, Cost startup_cost, Cost total_cost,
 
-                        List *pathkeys,
-                        Relids required_outer,
-                        List *fdw_private);
+                        List *pathkeys,    Relids required_outer,
+                        bool parallel,    List *fdw_private);extern Relids calc_nestloop_required_outer(Path
*outer_path,Path *inner_path);extern Relids calc_non_nestloop_required_outer(Path *outer_path, Path *inner_path);
 
diff --git a/src/include/optimizer/paths.h b/src/include/optimizer/paths.h
index 9b22fda..36879f1 100644
--- a/src/include/optimizer/paths.h
+++ b/src/include/optimizer/paths.h
@@ -20,8 +20,10 @@/* * allpaths.c */
-extern bool enable_geqo;
-extern int    geqo_threshold;
+extern bool     enable_geqo;
+extern int        geqo_threshold;
+extern double    parallel_cost_threshold;
+extern double    parallel_ratio_threshold;/* Hook for plugins to replace standard_join_search() */typedef RelOptInfo
*(*join_search_hook_type)(PlannerInfo *root,
 
@@ -30,6 +32,7 @@ typedef RelOptInfo *(*join_search_hook_type) (PlannerInfo *root,extern PGDLLIMPORT
join_search_hook_typejoin_search_hook;
 
+extern void choose_parallel_scans(RelOptInfo *rel);extern RelOptInfo *make_one_rel(PlannerInfo *root, List
*joinlist);externRelOptInfo *standard_join_search(PlannerInfo *root, int levels_needed,                     List
*initial_rels);
diff --git a/src/include/optimizer/planmain.h b/src/include/optimizer/planmain.h
index 4504250..e72ad72 100644
--- a/src/include/optimizer/planmain.h
+++ b/src/include/optimizer/planmain.h
@@ -44,7 +44,7 @@ extern Plan *create_plan(PlannerInfo *root, Path *best_path);extern SubqueryScan
*make_subqueryscan(List*qptlist, List *qpqual,                  Index scanrelid, Plan *subplan);extern ForeignScan
*make_foreignscan(List*qptlist, List *qpqual,
 
-                 Index scanrelid, List *fdw_exprs, List *fdw_private);
+             Index scanrelid, bool parallel, List *fdw_exprs, List *fdw_private);extern Append *make_append(List
*appendplans,List *tlist);extern RecursiveUnion *make_recursive_union(List *tlist,                     Plan *lefttree,
Plan*righttree, int wtParam, 
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 116be7d..d252eb1 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -44,11 +44,15 @@ typedef struct ConnCacheKeytypedef struct ConnCacheEntry{    ConnCacheKey key;            /* hash
key(must be first) */
 
-    PGconn       *conn;            /* connection to foreign server, or NULL */
+    PGconn       *conn;            /* primary connection to foreign server */
+    List       *extraconns;        /* Other connection list of AdditionalConns */    int            xact_depth;
/*0 = no xact open, 1 = main xact open, 2 =                                 * one level of subxact open, etc */
 
+    char        *snapshot_id;    /* snapshot id for this server */    bool        have_prep_stmt; /* have we prepared
anystmts in this xact? */    bool        have_error;        /* have any subxacts aborted in this xact? */
 
+    int            reserved_aux_conns; /* Number of reserved rooms for parallel
+                                     * connection */} ConnCacheEntry;/*
@@ -75,31 +79,16 @@ static void pgfdw_subxact_callback(SubXactEvent event,                       SubTransactionId
parentSubid,                      void *arg);
 
-/*
- * Get a PGconn which can be used to execute queries on the remote PostgreSQL
- * server with the user's authorization.  A new connection is established
- * if we don't already have a suitable one, and a transaction is opened at
- * the right subtransaction nesting depth if we didn't do that already.
- *
- * will_prep_stmt must be true if caller intends to create any prepared
- * statements.  Since those don't go away automatically at transaction end
- * (not even on error), we need this flag to cue manual cleanup.
- *
- * XXX Note that caching connections theoretically requires a mechanism to
- * detect change of FDW objects to invalidate already established connections.
- * We could manage that by watching for invalidation events on the relevant
- * syscaches.  For the moment, though, it's not clear that this would really
- * be useful and not mere pedantry.  We could not flush any active connections
- * mid-transaction anyway.
+ * Search connection cache entry for specified server and user. Creates new
+ * one if not exists. */
-PGconn *
-GetConnection(ForeignServer *server, UserMapping *user,
-              bool will_prep_stmt)
+static ConnCacheEntry*
+search_cache_entry(ForeignServer *server, UserMapping *user){
-    bool        found;
-    ConnCacheEntry *entry;    ConnCacheKey key;
+    ConnCacheEntry *entry;
+    bool    found;    /* First time through, initialize connection cache hashtable */    if (ConnectionHash == NULL)
@@ -124,9 +113,6 @@ GetConnection(ForeignServer *server, UserMapping *user,
RegisterSubXactCallback(pgfdw_subxact_callback,NULL);    }
 
-    /* Set flag that we did GetConnection during the current transaction */
-    xact_got_connection = true;
-    /* Create hash key for the entry.  Assume no pad bytes in key struct */    key.serverid = server->serverid;
key.userid= user->userid;
 
@@ -139,11 +125,52 @@ GetConnection(ForeignServer *server, UserMapping *user,    {        /* initialize new hashtable
entry(key is already filled in) */        entry->conn = NULL;
 
+        entry->extraconns = NIL;        entry->xact_depth = 0;        entry->have_prep_stmt = false;
entry->have_error= false;
 
+        entry->reserved_aux_conns = 0;
+        entry->snapshot_id = NULL;    }
+    return entry;
+}
+
+/*
+ * Get a PGconn which can be used to execute queries on the remote PostgreSQL
+ * server with the user's authorization.  A new connection is established
+ * if we don't already have a suitable one, and a transaction is opened at
+ * the right subtransaction nesting depth if we didn't do that already.
+ *
+ * will_prep_stmt must be true if caller intends to create any prepared
+ * statements.  Since those don't go away automatically at transaction end
+ * (not even on error), we need this flag to cue manual cleanup.
+ *
+ * Create new auxiliary connection to be used for parallel query if parallel
+ * is true. The number of auxiliary connections assumed to be limited by the
+ * caller using ReserveParallelConnection().
+ *
+ * XXX Note that caching connections theoretically requires a mechanism to
+ * detect change of FDW objects to invalidate already established connections.
+ * We could manage that by watching for invalidation events on the relevant
+ * syscaches.  For the moment, though, it's not clear that this would really
+ * be useful and not mere pedantry.  We could not flush any active connections
+ * mid-transaction anyway.
+ */
+PGconn *
+GetConnection(ForeignServer *server, UserMapping *user,
+              bool will_prep_stmt, bool parallel)
+{
+    ConnCacheEntry *entry;
+    PGconn        *retconn;
+
+    Assert(!(will_prep_stmt && parallel));
+
+    entry = search_cache_entry(server, user);
+
+    /* Set flag that we did GetConnection during the current transaction */
+    xact_got_connection = true;
+    /*     * We don't check the health of cached connection here, because it would     * require some overhead.
Brokenconnection will be detected when the
 
@@ -160,20 +187,102 @@ GetConnection(ForeignServer *server, UserMapping *user,        entry->xact_depth = 0;    /* just
tobe sure */        entry->have_prep_stmt = false;        entry->have_error = false;
 
+        entry->snapshot_id = NULL;        entry->conn = connect_pg_server(server, user);        elog(DEBUG3, "new
postgres_fdwconnection %p for server \"%s\"",             entry->conn, server->servername);
 
+    }
-    /*
-     * Start a new transaction or subtransaction if needed.
-     */    begin_remote_xact(entry);    /* Remember if caller will prepare statements */    entry->have_prep_stmt |=
will_prep_stmt;
-    return entry->conn;
+    retconn = entry->conn;
+    if (parallel)
+    {
+        PGconn        *conn;
+        PGresult    *res;
+        char         sql[64];
+        MemoryContext oldcontext;
+
+        /* Parallel query needs exported snapshot. */
+        if (entry->snapshot_id == NULL)
+            return NULL;
+
+        /* Make new connection and setup it. */
+        conn = connect_pg_server(server, user);
+        if (IsolationIsSerializable())
+            strncpy(sql, "START TRANSACTION ISOLATION LEVEL SERIALIZABLE", 64);
+        else
+            strncpy(sql, "START TRANSACTION ISOLATION LEVEL REPEATABLE READ", 64);
+        res = PQexec(conn, sql);
+        if (PQresultStatus(res) != PGRES_COMMAND_OK)
+            pgfdw_report_error(ERROR, res, conn, true, sql);
+        PQclear(res);
+
+        /*
+         * Snapshot setup for parallel query. If the snapshot id could be get
+         * from this server, this should succeed on the same server.
+         */
+        snprintf(sql, 64, "SET TRANSACTION SNAPSHOT \'%s\'", entry->snapshot_id);
+        res = PQexec(conn, sql);
+        if (PQresultStatus(res) != PGRES_COMMAND_OK)
+            pgfdw_report_error(ERROR, res, conn, true, sql);
+        PQclear(res);
+
+        /* This list should be in the same context with connection cache */
+        oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
+        entry->extraconns = lappend(entry->extraconns, conn);
+        MemoryContextSwitchTo(oldcontext);
+        retconn = conn;
+    }
+
+    elog(DEBUG3, "Get %s connection (parallels %d): %p",
+         parallel ? "parallel" : "base", list_length(entry->extraconns),
+         retconn);
+
+    return retconn;
+}
+
+
+/* 
+ * Reserve a room for parallel connection up to aux_conn_limit.
+ */
+bool
+ReserveParallelConnection(ForeignServer *server, UserMapping *user,
+                          int aux_conn_limit)
+{
+    ConnCacheEntry *entry;
+
+    if (!user)
+        return false;
+
+    entry = search_cache_entry(server, user);
+
+    if (entry->reserved_aux_conns >= aux_conn_limit)
+        return false;
+
+    entry->reserved_aux_conns++;
+    return true;
+}
+
+/*
+ * Reset reserved number of parallel connections
+ */
+void
+ResetReservedParallelConnections(ForeignServer *server, UserMapping *user)
+{
+    ConnCacheEntry *entry;
+
+    if (!user)
+        return false;
+
+    entry = search_cache_entry(server, user);
+
+    entry->reserved_aux_conns = 0;
+    return;}/*
@@ -378,6 +487,7 @@ begin_remote_xact(ConnCacheEntry *entry)    if (entry->xact_depth <= 0)    {        const char
*sql;
+        PGresult *res;        elog(DEBUG3, "starting remote transaction on connection %p",             entry->conn);
@@ -388,6 +498,30 @@ begin_remote_xact(ConnCacheEntry *entry)            sql = "START TRANSACTION ISOLATION LEVEL
REPEATABLEREAD";        do_sql_command(entry->conn, sql);        entry->xact_depth = 1;
 
+
+        /* 
+         * To avoid error on the remote server, check if we can call
+         * pg_export_snapshot() previously.
+         */
+        sql = "SELECT count(*) FROM pg_proc p JOIN pg_namespace n ON "
+                     "(n.oid = p.pronamespace AND n.nspname = 'pg_catalog')"
+            "WHERE p.proname = 'pg_export_snapshot' AND pronargs = 0";
+        res = PQexec(entry->conn, sql);
+        if (PQresultStatus(res) != PGRES_TUPLES_OK)
+            pgfdw_report_error(ERROR, res, entry->conn, true, sql);
+        if (strcmp(PQgetvalue(res, 0, 0), "1") == 0)
+        {
+            /* Get transaction snapshot if we can */
+            PQclear(res);
+            sql = "SELECT pg_export_snapshot()";
+            res = PQexec(entry->conn, sql);
+            if (PQresultStatus(res) != PGRES_TUPLES_OK)
+                pgfdw_report_error(ERROR, res, entry->conn, true, sql);
+            entry->snapshot_id = strdup(PQgetvalue(res, 0, 0));
+            PQclear(res);
+        }
+        else
+            PQclear(res);    }    /*
@@ -406,16 +540,92 @@ begin_remote_xact(ConnCacheEntry *entry)}/*
- * Release connection reference count created by calling GetConnection.
+ * Close all auxiliary connections if any.
+ */
+static void
+ReleaseAllAuxConnections(ConnCacheEntry *entry)
+{
+    ListCell *lc;
+
+    foreach(lc, entry->extraconns)
+    {
+        PGconn *c = (PGconn *) lfirst(lc);
+        PQfinish(c);
+    }
+    list_free(entry->extraconns);
+    entry->extraconns = NIL;
+}
+
+/*
+ * Release connection if required. All connections but the base connection
+ * should be release immediately. */void
-ReleaseConnection(PGconn *conn)
+ReleaseConnection(ForeignServer *server, UserMapping *user, PGconn *conn){
+    ConnCacheEntry *entry;
+    ConnCacheKey key;
+
+    /* Clean up current asynchronous query if any */
+    while (PQtransactionStatus(conn) == PQTRANS_ACTIVE)
+    {
+        PGresult *res = PQgetResult(conn);
+        if (res)
+            PQclear(res);
+    }        
+
+    /* Create hash key for the entry.  Assume no pad bytes in key struct */
+    key.serverid = server->serverid;
+    key.userid = user->userid;
+    /*
-     * Currently, we don't actually track connection references because all
-     * cleanup is managed on a transaction or subtransaction basis instead. So
-     * there's nothing to do here.
+     * Find cached entry for the connection.     */
+    entry = hash_search(ConnectionHash, &key, HASH_FIND, NULL);
+
+    if (!entry)
+        elog(WARNING, "Inconsistent connection release in postgres_fdw.");
+    else
+    {
+        if (entry->conn == conn) {
+            ReleaseAllAuxConnections(entry);
+
+            /*
+             * Don't reset reserved number before auxiliary connections are
+             * actually made
+             */
+            if (entry->extraconns)
+                entry->reserved_aux_conns = 0;
+        }
+        else
+        {
+            ListCell *lc;
+            PGconn   *c = NULL;
+
+            /* Find  */
+            foreach(lc, entry->extraconns)
+            {
+                c = (PGconn *) lfirst(lc);
+                if (conn == c) break;
+            }
+
+            if (!lc)
+            {
+                /* XXXX: This is basically a bug, but simply ignore it... */
+                elog(WARNING, "Unknown connection is tried to release. Ignore it.");
+            }
+            else
+            {
+                /* Close the found connection and remove it from the list */
+                do_sql_command(c, "COMMIT TRANSACTION");
+                PQfinish(c);
+                elog(DEBUG3, "Parallel connection closed: %p", c);
+                entry->extraconns = list_delete_ptr(entry->extraconns, c);
+                if (entry->reserved_aux_conns > 0) 
+                    entry->reserved_aux_conns--;
+            }
+        }
+    }}/*
@@ -571,6 +781,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)                        res = PQexec(entry->conn,
"DEALLOCATEALL");                        PQclear(res);                    }
 
+                    entry->snapshot_id = NULL;                    entry->have_prep_stmt = false;
entry->have_error= false;                    break;
 
@@ -612,6 +823,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)                            res =
PQexec(entry->conn,"DEALLOCATE ALL");                            PQclear(res);                        }
 
+                        entry->snapshot_id = NULL;                        entry->have_prep_stmt = false;
        entry->have_error = false;                    }
 
@@ -619,6 +831,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)            }        }
+        ReleaseAllAuxConnections(entry);        /* Reset state to show we're out of a transaction */
entry->xact_depth= 0;
 
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index 65e7b89..5323d10 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -150,6 +150,7 @@ InitPgFdwOptions(void)        /* cost factors */        {"fdw_startup_cost",
ForeignServerRelationId,false},        {"fdw_tuple_cost", ForeignServerRelationId, false},
 
+        {"max_aux_connections", ForeignServerRelationId, false},        /* updatable is available on both server and
table*/        {"updatable", ForeignServerRelationId, false},        {"updatable", ForeignTableRelationId, false},
 
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 4c49776..3fd60e1 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -76,7 +76,8 @@ typedef struct PgFdwRelationInfo    /* Cached catalog information. */    ForeignTable *table;
ForeignServer*server;
 
-    UserMapping *user;            /* only set in use_remote_estimate mode */
+    UserMapping  *user;            /* only set in use_remote_estimate mode */
+    bool          parallel;        /* true if this rel can be scanned in parallel */} PgFdwRelationInfo;/*
@@ -136,6 +137,10 @@ typedef struct PgFdwScanState    /* for remote query execution */    PGconn       *conn;
/* connection for the scan */
 
+    ForeignServer *server;        /* The foreign server this scan based on */
+    UserMapping      *user;        /* The user this scan done by */
+    bool        parallel;        /* true if this scan is allowed to run in
+                                 * parallel */    unsigned int cursor_number; /* quasi-unique ID for my cursor */
bool       cursor_exists;    /* have we created the cursor? */    int            numParams;        /* number of
parameterspassed to query */
 
@@ -145,6 +150,7 @@ typedef struct PgFdwScanState    /* for storing result tuples */    HeapTuple  *tuples;
/*array of currently-retrieved tuples */
 
+    int            tupplane;        /* Which tuples currently read */    int            num_tuples;        /* # of
tuplesin array */    int            next_tuple;        /* index of next one to return */
 
@@ -168,6 +174,8 @@ typedef struct PgFdwModifyState    /* for remote query execution */    PGconn       *conn;
 /* connection for the scan */    char       *p_name;            /* name of prepared statement, if created */
 
+    ForeignServer *server;        /* The foreign server this scan based on */
+    UserMapping      *user;        /* The user this scan done by */    /* extracted fdw_private data */    char
*query;           /* text of INSERT/UPDATE/DELETE command */
 
@@ -306,7 +314,7 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
EquivalenceClass*ec, EquivalenceMember *em,                          void *arg);static void
create_cursor(ForeignScanState*node);
 
-static void fetch_more_data(ForeignScanState *node);
+static void fetch_more_data(ForeignScanState *node, bool async);static void close_cursor(PGconn *conn, unsigned int
cursor_number);staticvoid prepare_foreign_modify(PgFdwModifyState *fmstate);static const char
**convert_prep_stmt_params(PgFdwModifyState*fmstate,
 
@@ -328,6 +336,10 @@ static HeapTuple make_tuple_from_result_row(PGresult *res,                           MemoryContext
temp_context);staticvoid conversion_error_callback(void *arg);
 
+/* PQtransactionStatus returns 2 state for IDLE condition */
+#define CONN_IS_IDLE(conn) \
+    (PQtransactionStatus(conn) == PQTRANS_IDLE || \
+     PQtransactionStatus(conn) == PQTRANS_INTRANS)/* * Foreign-data wrapper handler function: return a struct with
pointers
@@ -384,6 +396,7 @@ postgresGetForeignRelSize(PlannerInfo *root,{    PgFdwRelationInfo *fpinfo;    ListCell   *lc;
+    unsigned long max_aux_connections = 0;    /*     * We use PgFdwRelationInfo to pass various information to
subsequent
@@ -414,6 +427,8 @@ postgresGetForeignRelSize(PlannerInfo *root,            fpinfo->fdw_startup_cost =
strtod(defGetString(def),NULL);        else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
fpinfo->fdw_tuple_cost= strtod(defGetString(def), NULL); 
+        else if (strcmp(def->defname, "max_aux_connections") == 0)
+            max_aux_connections = strtol(defGetString(def), NULL, 10);    }    foreach(lc, fpinfo->table->options)
{
@@ -442,6 +457,11 @@ postgresGetForeignRelSize(PlannerInfo *root,    else        fpinfo->user = NULL;
+    /* Check if this foreign relation has the auxiliary connection available. */
+    fpinfo->parallel =
+        ReserveParallelConnection(fpinfo->server, fpinfo->user,
+                                  max_aux_connections);
+    /*     * Identify which baserestrictinfo clauses can be sent to the remote     * server and which can't.
@@ -558,6 +578,7 @@ postgresGetForeignPaths(PlannerInfo *root,                                   fpinfo->total_cost,
                              NIL, /* no pathkeys */                                   NULL,        /* no outer rel
either*/
 
+                                   fpinfo->parallel,                                   NIL);        /* no fdw_private
list*/    add_path(baserel, (Path *) path);
 
@@ -725,6 +746,7 @@ postgresGetForeignPaths(PlannerInfo *root,                                       total_cost,
                              NIL,        /* no pathkeys */
param_info->ppi_req_outer,
+                                       fpinfo->parallel,                                       NIL);    /* no
fdw_privatelist */        add_path(baserel, (Path *) path);    }
 
@@ -752,6 +774,9 @@ postgresGetForeignPlan(PlannerInfo *root,    StringInfoData sql;    ListCell   *lc;
+    /* reset reserved number of auxiliary connections */
+    ResetReservedParallelConnections(fpinfo->server, fpinfo->user);
+    /*     * Separate the scan_clauses into those that can be executed remotely and     * those that can't.
baserestrictinfoclauses that were previously
 
@@ -866,6 +891,7 @@ postgresGetForeignPlan(PlannerInfo *root,    return make_foreignscan(tlist,
  local_exprs,                            scan_relid,
 
+                            best_path->path.parallel,                            params_list,
 fdw_private);}
 
@@ -888,6 +914,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)    int            numParams;    int
       i;    ListCell   *lc;
 
+    bool        parallel = fsplan->scan.parallel;    /*     * Do nothing in EXPLAIN (no ANALYZE) case.
node->fdw_statestays NULL.
 
@@ -918,7 +945,20 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)     * Get connection to the foreign
server. Connection manager will     * establish new connection if necessary.     */
 
-    fsstate->conn = GetConnection(server, user, false);
+    fsstate->conn = GetConnection(server, user, false, parallel);
+    if (parallel && fsstate->conn == NULL)
+    {
+        /*
+         * Somehow no more auxiliary connection available, so this relation is
+         * scanned in the base connection sequentially.
+         */
+        elog(WARNING, "Failed to get parallel connection for foreign relation %d. Fall back to base connection.",
+             RelationGetRelid(fsstate->rel));
+        parallel = false;
+        fsstate->conn = GetConnection(server, user, false, false);        
+    }
+    fsstate->server = server;
+    fsstate->user = user;    /* Assign a unique ID for my cursor */    fsstate->cursor_number =
GetCursorNumber(fsstate->conn);
@@ -981,6 +1021,17 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)        fsstate->param_values =
(constchar **) palloc0(numParams * sizeof(char *));    else        fsstate->param_values = NULL;
 
+
+    fsstate->parallel = parallel;
+    if (parallel)
+    {
+        /* 
+         * This connection is allowed asynchronous query execution, so start
+         * it just now. 
+         */
+        create_cursor(node);
+        fetch_more_data(node, true);
+    }}/*
@@ -1008,7 +1059,7 @@ postgresIterateForeignScan(ForeignScanState *node)    {        /* No point in another fetch if we
alreadydetected EOF, though. */        if (!fsstate->eof_reached)
 
-            fetch_more_data(node);
+            fetch_more_data(node, fsstate->parallel);        /* If we didn't get any tuples, must be end of data. */
    if (fsstate->next_tuple >= fsstate->num_tuples)            return ExecClearTuple(slot);
 
@@ -1099,7 +1150,7 @@ postgresEndForeignScan(ForeignScanState *node)        close_cursor(fsstate->conn,
fsstate->cursor_number);   /* Release remote connection */
 
-    ReleaseConnection(fsstate->conn);
+    ReleaseConnection(fsstate->server, fsstate->user, fsstate->conn);    fsstate->conn = NULL;    /* MemoryContexts
willbe deleted automatically. */
 
@@ -1301,7 +1352,9 @@ postgresBeginForeignModify(ModifyTableState *mtstate,    user = GetUserMapping(userid,
server->serverid);   /* Open connection; report that we'll create a prepared statement. */
 
-    fmstate->conn = GetConnection(server, user, true);
+    fmstate->conn = GetConnection(server, user, true, false);
+    fmstate->server = server;
+    fmstate->user = user;    fmstate->p_name = NULL;        /* prepared statement not made yet */    /* Deconstruct
fdw_privatedata. */
 
@@ -1599,7 +1652,7 @@ postgresEndForeignModify(EState *estate,    }    /* Release remote connection */
-    ReleaseConnection(fmstate->conn);
+    ReleaseConnection(fmstate->server, fmstate->user, fmstate->conn);    fmstate->conn = NULL;}
@@ -1751,10 +1804,11 @@ estimate_path_cost_size(PlannerInfo *root,                              (fpinfo->remote_conds
==NIL), NULL);        /* Get the remote estimate */
 
-        conn = GetConnection(fpinfo->server, fpinfo->user, false);
+        conn = GetConnection(fpinfo->server, fpinfo->user, false, false);
+        get_remote_estimate(sql.data, conn, &rows, &width,                            &startup_cost, &total_cost);
-        ReleaseConnection(conn);
+        ReleaseConnection(fpinfo->server, fpinfo->user, conn);        retrieved_rows = rows;
@@ -2001,10 +2055,12 @@ create_cursor(ForeignScanState *node)}/*
- * Fetch some more rows from the node's cursor.
+ * Fetch some more rows from the node's cursor. async indicates that this
+ * query runs on the dedicated connection and requested asynchronous query
+ * execution. */static void
-fetch_more_data(ForeignScanState *node)
+fetch_more_data(ForeignScanState *node, bool async){    PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
 PGresult   *volatile res = NULL;
 
@@ -2026,6 +2082,7 @@ fetch_more_data(ForeignScanState *node)        int            fetch_size;        int
numrows;       int            i;
 
+        bool        skip_get_result = false;        /* The fetch size is arbitrary, but shouldn't be enormous. */
 fetch_size = 100;
 
@@ -2033,36 +2090,81 @@ fetch_more_data(ForeignScanState *node)        snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
              fetch_size, fsstate->cursor_number);
 
-        res = PQexec(conn, sql);
-        /* On error, report the original query, not the FETCH. */
-        if (PQresultStatus(res) != PGRES_TUPLES_OK)
-            pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+        if (async && CONN_IS_IDLE(conn))
+        {
+            if (!PQsendQuery(conn, sql))
+                pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
-        /* Convert the data into HeapTuples */
-        numrows = PQntuples(res);
-        fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
-        fsstate->num_tuples = numrows;
-        fsstate->next_tuple = 0;
+            /*
+             * IDLE connection state on async query means that this is the
+             * first call for this query, so return immediately. See
+             * postgresBeginForeignScan()
+             */
+            skip_get_result = true;
+        }
-        for (i = 0; i < numrows; i++)
+        if (!skip_get_result)        {
-            fsstate->tuples[i] =
-                make_tuple_from_result_row(res, i,
-                                           fsstate->rel,
-                                           fsstate->attinmeta,
-                                           fsstate->retrieved_attrs,
-                                           fsstate->temp_cxt);
-        }
+            if (async)
+            {
+                res = PQgetResult(conn);
+                if (PQntuples(res) == fetch_size)
+                {
+                    /*
+                     * Connection state doesn't go to IDLE even if all data
+                     * has been sent to client for asynchronous query. One
+                     * more PQgetResult() is needed to reset the state to
+                     * IDLE.  See PQexecFinish() for details.
+                     */
+                    if (PQgetResult(conn) != NULL)
+                        elog(ERROR, "Connection status error.");
+                        
+                }
+            }
+            else
+                res = PQexec(conn, sql);
-        /* Update fetch_ct_2 */
-        if (fsstate->fetch_ct_2 < 2)
-            fsstate->fetch_ct_2++;
+            /* On error, report the original query, not the FETCH. */
+            if (PQresultStatus(res) != PGRES_TUPLES_OK)
+                pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
-        /* Must be EOF if we didn't get as many tuples as we asked for. */
-        fsstate->eof_reached = (numrows < fetch_size);
+            /* Convert the data into HeapTuples */
+            numrows = PQntuples(res);
+            fsstate->tuples =
+                    (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+            fsstate->num_tuples = numrows;
+            fsstate->next_tuple = 0;
-        PQclear(res);
-        res = NULL;
+            for (i = 0; i < numrows; i++)
+            {
+                fsstate->tuples[i] =
+                    make_tuple_from_result_row(res, i,
+                                               fsstate->rel,
+                                               fsstate->attinmeta,
+                                               fsstate->retrieved_attrs,
+                                               fsstate->temp_cxt);
+            }
+
+            /* Update fetch_ct_2 */
+            if (fsstate->fetch_ct_2 < 2)
+                fsstate->fetch_ct_2++;
+
+            /* Must be EOF if we didn't get as many tuples as we asked for. */
+            fsstate->eof_reached = (numrows < fetch_size);
+
+            PQclear(res);
+            res = NULL;
+        }
+
+        if (async && !skip_get_result && !fsstate->eof_reached)
+        {
+            /*
+             * We can immediately request the next bunch of tuples if we're on
+             * asynchronous connection.
+             */
+            if (!PQsendQuery(conn, sql))
+                pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+        }    }    PG_CATCH();    {
@@ -2315,8 +2417,7 @@ postgresAnalyzeForeignTable(Relation relation,    table =
GetForeignTable(RelationGetRelid(relation));   server = GetForeignServer(table->serverid);    user =
GetUserMapping(relation->rd_rel->relowner,server->serverid);
 
-    conn = GetConnection(server, user, false);
-
+    conn = GetConnection(server, user, false, false);    /*     * Construct command to get page count for relation.
*/
 
@@ -2345,7 +2446,7 @@ postgresAnalyzeForeignTable(Relation relation,    }    PG_END_TRY();
-    ReleaseConnection(conn);
+    ReleaseConnection(server, user, conn);    return true;}
@@ -2407,7 +2508,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,    table =
GetForeignTable(RelationGetRelid(relation));   server = GetForeignServer(table->serverid);    user =
GetUserMapping(relation->rd_rel->relowner,server->serverid);
 
-    conn = GetConnection(server, user, false);
+    conn = GetConnection(server, user, false, false);    /*     * Construct cursor that retrieves whole rows from
remote.
@@ -2479,7 +2580,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,    }    PG_END_TRY();
-    ReleaseConnection(conn);
+    ReleaseConnection(server, user, conn);    /* We assume that we have no dead tuple. */    *totaldeadrows = 0.0;
@@ -2609,7 +2710,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)     */    server =
GetForeignServer(serverOid);   mapping = GetUserMapping(GetUserId(), server->serverid);
 
-    conn = GetConnection(server, mapping, false);
+    conn = GetConnection(server, mapping, false, false);    /* Don't attempt to import collation if remote server
hasn'tgot it */    if (PQserverVersion(conn) < 90100)
 
@@ -2826,7 +2927,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)    }    PG_END_TRY();
-    ReleaseConnection(conn);
+    ReleaseConnection(server, mapping, conn);    return commands;}
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 94eadae..387b2ff 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -25,10 +25,16 @@ extern int    set_transmission_modes(void);extern void reset_transmission_modes(int nestlevel);/*
inconnection.c */
 
+extern bool ReserveParallelConnection(ForeignServer *server, UserMapping *user,
+                                      int aux_conn_limit);
+extern void ResetReservedParallelConnections(ForeignServer *server,
+                                             UserMapping *user);extern PGconn *GetConnection(ForeignServer *server,
UserMapping*user,
 
-              bool will_prep_stmt);
-extern void ReleaseConnection(PGconn *conn);
+                             bool will_prep_stmt, bool parallel);
+extern void ReleaseConnection(ForeignServer *server, UserMapping *user,
+                              PGconn *conn);extern unsigned int GetCursorNumber(PGconn *conn);
+extern int GetServerOrdinate(Oid serverid);extern unsigned int GetPrepStmtNumber(PGconn *conn);extern void
pgfdw_report_error(intelevel, PGresult *res, PGconn *conn,                   bool clear, const char *sql); 
diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c
index 5a4d5aa..7a12542 100644
--- a/contrib/file_fdw/file_fdw.c
+++ b/contrib/file_fdw/file_fdw.c
@@ -524,6 +524,7 @@ fileGetForeignPaths(PlannerInfo *root,                                     total_cost,
                      NIL,        /* no pathkeys */                                     NULL,        /* no outer rel
either*/
 
+                                     false,                                     coptions));    /*
@@ -560,6 +561,7 @@ fileGetForeignPlan(PlannerInfo *root,    return make_foreignscan(tlist,
scan_clauses,                           scan_relid,
 
+                            best_path->path.parallel,                            NIL,    /* no expressions to evaluate
*/                           best_path->fdw_private);} 
-- env.sql: exercise coarse-grain parallel foreign scan in postgres_fdw.
-- Creates a loopback foreign server, a deliberately expensive remote view,
-- and runs the same join under two parallel_ratio_threshold settings so the
-- planner's parallel / non-parallel choice can be compared in EXPLAIN output.

-- Uncomment to reset objects from a previous run:
-- DROP SERVER IF EXISTS pgs1 CASCADE;
-- DROP VIEW IF EXISTS v CASCADE;
-- DROP TABLE IF EXISTS t CASCADE;

-- Loopback server over the unix socket in /tmp.  max_aux_connections is the
-- new server option added by the patch: it caps the number of auxiliary
-- (parallel) connections per (server, user) pair.
CREATE SERVER pgs1 FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (host '/tmp', dbname 'postgres',
             use_remote_estimate 'true', max_aux_connections '3');

CREATE USER MAPPING FOR CURRENT_USER SERVER pgs1;

CREATE TABLE t (a int, b int, c text);
-- PLAIN storage keeps the wide text column untoasted/uncompressed so the
-- foreign scan transfers real data volume.
ALTER TABLE t ALTER COLUMN c SET STORAGE PLAIN;
INSERT INTO t
    (SELECT random() * 10000,
            random() * 10000,
            repeat('X', (random() * 1000)::int)
       FROM generate_series(0, 2999));

-- Self-join view: expensive enough on the remote side that the parallel
-- path's cost ratio can cross the GUC thresholds below.
CREATE VIEW v AS
    SELECT a.a, a.b, a.c, b.a AS a2, b.b AS b2, b.c AS c2
      FROM t a
      INNER JOIN t b ON a.b + b.b = 1000
     ORDER BY a.b
     LIMIT 10;

CREATE FOREIGN TABLE fts1 (a int, b int, c text)
    SERVER pgs1 OPTIONS (table_name 't');
CREATE FOREIGN TABLE fvs1 (a int, b int, c text, a2 int, b2 int, c2 text)
    SERVER pgs1 OPTIONS (table_name 'v');

-- At ratio 1.0 the parallel path is chosen only if strictly cheaper.
SET parallel_ratio_threshold TO 1.0;

EXPLAIN ANALYZE SELECT a.a, a.b, b.c FROM fvs1 a JOIN fvs1 b ON (a.a = b.a);

-- At ratio 0.0 the parallel path is effectively forced, for comparison.
SET parallel_ratio_threshold TO 0.0;

EXPLAIN ANALYZE SELECT a.a, a.b, b.c FROM fvs1 a JOIN fvs1 b ON (a.a = b.a);

pgsql-hackers by date:

Previous
From: Anastasia Lubennikova
Date:
Subject: Index-only scans for GIST
Next
From: Kyotaro HORIGUCHI
Date:
Subject: Re: Introducing coarse grain parallelism by postgres_fdw.