Thread: Introducing coarse grain parallelism by postgres_fdw.

Introducing coarse grain parallelism by postgres_fdw.

From
Kyotaro HORIGUCHI
Date:
Hello, 

I noticed that postgres_fdw can run in parallel with a very small
change. The attached patch lets scans by postgres_fdw on
different foreign servers run simultaneously. This seems a
convenient entry point to parallel execution.
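
For reference, here is a minimal sketch of the libpq pattern this
relies on (my illustration only, not the patch itself; error handling
omitted): the remote query is sent without blocking when the scan
starts, and the backend waits for the result only at the first fetch,
so scans on different connections overlap their startup work.

#include <libpq-fe.h>

/* Start the remote query without waiting for its result. */
static void
scan_start_async(PGconn *conn, const char *sql)
{
    PQsendQuery(conn, sql);        /* returns immediately */
}

/* Later, at the first fetch, block until the result arrives. */
static PGresult *
scan_first_result(PGconn *conn)
{
    return PQgetResult(conn);      /* NULL once all results are consumed */
}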

For the testing configuration which the attached sql script creates,
it almost halves the response time, because the remote queries
take far longer to start up than to run. The three foreign
tables fvs1, fvs1_2 and fvs2 are defined on the same table, but
fvs1 and fvs1_2 are on the same foreign server pgs1 while fvs2 is
on another foreign server pgs2.

=# EXPLAIN (ANALYZE on, COSTS off) SELECT a.a, a.b, b.c FROM fvs1 a join fvs1_2 b on (a.a = b.a);
                               QUERY PLAN
-----------------------------------------------------------------------
 Hash Join (actual time=12083.640..12083.657 rows=16 loops=1)
   Hash Cond: (a.a = b.a)
   ->  Foreign Scan on fvs1 a (actual time=6091.405..6091.407 rows=10 loops=1)
   ->  Hash (actual time=5992.212..5992.212 rows=10 loops=1)
         Buckets: 1024  Batches: 1  Memory Usage: 7kB
         ->  Foreign Scan on fvs1_2 b (actual time=5992.191..5992.198 rows=10 loops=1)
 Execution time: 12085.330 ms
(7 rows)

=# EXPLAIN (ANALYZE on, COSTS off) SELECT a.a, a.b, b.c FROM fvs1 a join fvs2 b on (a.a = b.a);
                               QUERY PLAN
-----------------------------------------------------------------------
 Hash Join (actual time=6325.004..6325.019 rows=16 loops=1)
   Hash Cond: (a.a = b.a)
   ->  Foreign Scan on fvs1 a (actual time=6324.910..6324.913 rows=10 loops=1)
   ->  Hash (actual time=0.073..0.073 rows=10 loops=1)
         Buckets: 1024  Batches: 1  Memory Usage: 7kB
         ->  Foreign Scan on fvs2 b (actual time=0.048..0.052 rows=10 loops=1)
 Execution time: 6327.708 ms
(7 rows)

By contrast, a purely local query executes as below.

=# EXPLAIN (ANALYZE on, COSTS off) SELECT a.a, a.b, b.c FROM v a join v b on (a.a = b.a);
                                  QUERY PLAN
------------------------------------------------------------------------------
 Hash Join (actual time=15757.915..15757.925 rows=16 loops=1)
   Hash Cond: (a.a = b.a)
   ->  Limit (actual time=7795.919..7795.922 rows=10 loops=1)
         ->  Sort (actual time=7795.915..7795.915 rows=10 loops=1)
               ->  Nested Loop (actual time=54.769..7795.618 rows=252 loops=1)
                     ->  Seq Scan on t a (actual time=0.010..2.117 rows=5000 loops=1)
                     ->  Materialize (actual time=0.000..0.358 rows=5000 loops=5000)
                           ->  Seq Scan on t b_1 (actual time=0.004..2.829 rows=5000 ...
   ->  Hash (actual time=7961.969..7961.969 rows=10 loops=1)
         ->  Subquery Scan on b (actual time=7961.948..7961.952 rows=10 loops=1)
               ->  Limit (actual time=7961.946..7961.950 rows=10 loops=1)
                     ->  Sort (actual time=7961.946..7961.948 rows=10 loops=1)
                           ->  Nested Loop (actual time=53.518..7961.611 rows=252 loops=1)
                                 ->  Seq Scan on t a_1 (actual time=0.004..2.247 rows=5000...
                                 ->  Materialize (actual time=0.000..0.357 rows=5000...
                                       ->  Seq Scan on t b_2 (actual time=0.001..1.565 rows=500..
 Execution time: 15758.629 ms
(26 rows)


I will try this way for the present.

Any opinions or suggestions?

- Is this a correct entry point?

- Parallel postgres_fdw is of course an intermediate shape. It
  should go toward a more intrinsic form.

- The planner should be aware of parallelism. The first step seems
  to be doable since postgres_fdw can get correct startup and
  running costs. But they might have to be calculated locally for
  loopback connections eventually. A dedicated node would be
  needed.

- A more effective means of intercommunication between backends,
  including backend workers (which seems to be discussed in
  another thread), is needed, and this could be the test bench for
  it.

- This patch is the minimal implementation to make parallel scans
  available. A facility for exporting/importing execution trees
  may promise far more flexible parallelism. Is deparsing usable
  to reconstruct a partial query?

- A means of resource management, especially of the number of
  backends, is required. This could be done on the foreign server
  in a simple form for the present. Finally, should this be moved
  into an intrinsic loopback connection manager?

- Any other points to consider?


regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 116be7d..399ca31 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -411,11 +411,13 @@ begin_remote_xact(ConnCacheEntry *entry)
 void
 ReleaseConnection(PGconn *conn)
 {
-    /*
-     * Currently, we don't actually track connection references because all
-     * cleanup is managed on a transaction or subtransaction basis instead. So
-     * there's nothing to do here.
-     */
+    /* Clean up current asynchronous query if any */
+    while (PQtransactionStatus(conn) == PQTRANS_ACTIVE)
+    {
+        PGresult *res = PQgetResult(conn);
+        if (res)
+            PQclear(res);
+    }
 }
 
 /*
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 4c49776..eec299e 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -306,7 +306,7 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
                           EquivalenceClass *ec, EquivalenceMember *em,
                           void *arg);
 static void create_cursor(ForeignScanState *node);
-static void fetch_more_data(ForeignScanState *node);
+static void fetch_more_data(ForeignScanState *node, bool async_start);
 static void close_cursor(PGconn *conn, unsigned int cursor_number);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,

@@ -328,6 +328,9 @@ static HeapTuple make_tuple_from_result_row(PGresult *res,
                             MemoryContext temp_context);
 static void conversion_error_callback(void *arg);
 
+#define CONN_IS_IDLE(conn) \
+    (PQtransactionStatus(conn) == PQTRANS_IDLE || \
+     PQtransactionStatus(conn) == PQTRANS_INTRANS)
 
 /*
  * Foreign-data wrapper handler function: return a struct with pointers

@@ -981,6 +984,18 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
         fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *));
     else
         fsstate->param_values = NULL;
+
+    if (CONN_IS_IDLE(fsstate->conn))
+    {
+        /*
+         * This connection is allowed asynchronous query execution, so start
+         * it now. This relies on the fact that ExecInitForeignScans that
+         * share the same foreign server are executed in the same order as
+         * the corresponding ExecForeignScans.
+         */
+        create_cursor(node);
+        fetch_more_data(node, true);
+    }
 }
 
 /*

@@ -1008,7 +1023,7 @@ postgresIterateForeignScan(ForeignScanState *node)
     {
         /* No point in another fetch if we already detected EOF, though. */
         if (!fsstate->eof_reached)
-            fetch_more_data(node);
+            fetch_more_data(node, false);
         /* If we didn't get any tuples, must be end of data. */
         if (fsstate->next_tuple >= fsstate->num_tuples)
             return ExecClearTuple(slot);

@@ -2001,10 +2016,11 @@ create_cursor(ForeignScanState *node)
 }
 
 /*
- * Fetch some more rows from the node's cursor.
+ * Fetch some more rows from the node's cursor. It starts asynchronous query
+ * execution then immediately returns if async_start is true.
  */
 static void
-fetch_more_data(ForeignScanState *node)
+fetch_more_data(ForeignScanState *node, bool async_start)
 {
     PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
     PGresult   *volatile res = NULL;
@@ -2033,36 +2049,56 @@ fetch_more_data(ForeignScanState *node)
         snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
                  fetch_size, fsstate->cursor_number);
 
-        res = PQexec(conn, sql);
-        /* On error, report the original query, not the FETCH. */
-        if (PQresultStatus(res) != PGRES_TUPLES_OK)
-            pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
-
-        /* Convert the data into HeapTuples */
-        numrows = PQntuples(res);
-        fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
-        fsstate->num_tuples = numrows;
-        fsstate->next_tuple = 0;
-
-        for (i = 0; i < numrows; i++)
+        if (async_start)
         {
-            fsstate->tuples[i] =
-                make_tuple_from_result_row(res, i,
-                                           fsstate->rel,
-                                           fsstate->attinmeta,
-                                           fsstate->retrieved_attrs,
-                                           fsstate->temp_cxt);
+            Assert(CONN_IS_IDLE(conn));
+
+            if (!PQsendQuery(conn, sql))
+                pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
         }
+        else
+        {
+            if (!CONN_IS_IDLE(conn))
+                res = PQgetResult(conn);
+            /*
+             * Transaction status won't be INTRANS or IDLE before calling
+             * PQgetResult() after all result is received. PQgetResult()
+             * returns NULL for the case.
+             */
-        /* Update fetch_ct_2 */
-        if (fsstate->fetch_ct_2 < 2)
-            fsstate->fetch_ct_2++;
+            if (!res)
+                res = PQexec(conn, sql);
-        /* Must be EOF if we didn't get as many tuples as we asked for. */
-        fsstate->eof_reached = (numrows < fetch_size);
+            /* On error, report the original query, not the FETCH. */
+            if (PQresultStatus(res) != PGRES_TUPLES_OK)
+                pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
-        PQclear(res);
-        res = NULL;
+            /* Convert the data into HeapTuples */
+            numrows = PQntuples(res);
+            fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+            fsstate->num_tuples = numrows;
+            fsstate->next_tuple = 0;
+
+            for (i = 0; i < numrows; i++)
+            {
+                fsstate->tuples[i] =
+                    make_tuple_from_result_row(res, i,
+                                               fsstate->rel,
+                                               fsstate->attinmeta,
+                                               fsstate->retrieved_attrs,
+                                               fsstate->temp_cxt);
+            }
+
+            /* Update fetch_ct_2 */
+            if (fsstate->fetch_ct_2 < 2)
+                fsstate->fetch_ct_2++;
+
+            /* Must be EOF if we didn't get as many tuples as we asked for. */
+            fsstate->eof_reached = (numrows < fetch_size);
+
+            PQclear(res);
+            res = NULL;
+        }
     }
     PG_CATCH();
     {
DROP SERVER IF EXISTS pgs1 CASCADE;
DROP SERVER IF EXISTS pgs2 CASCADE;
DROP VIEW IF EXISTS v CASCADE;
DROP TABLE IF EXISTS t CASCADE;

CREATE SERVER pgs1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '/tmp', dbname 'postgres', use_remote_estimate 'true');
CREATE SERVER pgs2 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '/tmp', dbname 'postgres', use_remote_estimate 'true');

CREATE USER MAPPING FOR CURRENT_USER SERVER pgs1;
CREATE USER MAPPING FOR CURRENT_USER SERVER pgs2;

CREATE TABLE t (a int, b int, c text);
ALTER TABLE t ALTER COLUMN c SET STORAGE PLAIN;
INSERT INTO t (SELECT random() * 10000, random() * 10000, repeat('X', (random() * 1000)::int) FROM generate_series(0, 4999));
-- EXPLAIN ANALYZE SELECT * FROM t a, t b WHERE a.b + b.b = 1000 ORDER BY a.b LIMIT 10;
CREATE VIEW v AS SELECT a.a, a.b, a.c, b.a AS a2, b.b AS b2, b.c AS c2 FROM t a, t b WHERE a.b + b.b = 1000 ORDER BY a.b LIMIT 10;

CREATE FOREIGN TABLE fvs1 (a int, b int, c text, a2 int, b2 int, c2 text) SERVER pgs1 OPTIONS (table_name 'v');
CREATE FOREIGN TABLE fvs1_2 (a int, b int, c text, a2 int, b2 int, c2 text) SERVER pgs1 OPTIONS (table_name 'v');
CREATE FOREIGN TABLE fvs2 (a int, b int, c text, a2 int, b2 int, c2 text) SERVER pgs2 OPTIONS (table_name 'v');


EXPLAIN ANALYZE SELECT a.a, a.b, b.c FROM fvs1 a join fvs2 b on (a.a = b.a);
EXPLAIN ANALYZE SELECT a.a, a.b, b.c FROM fvs1 a join fvs1_2 b on (a.a = b.a);


Re: Introducing coarse grain parallelism by postgres_fdw.

From
Ashutosh Bapat
Date:
Hi Kyotaro,
fetch_more_data() always runs "FETCH 100 FROM <cursor_name>" on the foreign server to get the next set of rows. The changes you have made seem to run only the first FETCHes from all the nodes in parallel, but not the subsequent ones. The optimization will be helpful only when there are fewer than 100 rows per connection in the query. If there are more than 100 rows from a single foreign server, the second and subsequent FETCHes will be serialized.

Is my understanding correct?
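
A minimal illustration of that point (simplified by me from fetch_more_data(), not the actual code; cursor name invented): each batch after the first is obtained with a blocking call, so scans on different connections take turns waiting instead of overlapping.

    /* The pre-patch per-batch fetch: blocks until the whole batch arrives. */
    res = PQexec(conn, "FETCH 100 FROM c1");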


On Fri, Jul 25, 2014 at 2:05 PM, Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote:
Hello,

I noticed that postgres_fdw can run in parallel with a very small
change. The attached patch lets scans by postgres_fdw on
different foreign servers run simultaneously. This seems a
convenient entry point to parallel execution.



--
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company

Re: Introducing coarse grain parallelism by postgres_fdw.

From
Ashutosh Bapat
Date:
In order to minimize the impact, what can be done is to execute fetch_more_data() in asynchronous mode every time there are only a few rows left to be consumed. So in the current code below
1019     /*
1020      * Get some more tuples, if we've run out.
1021      */
1022     if (fsstate->next_tuple >= fsstate->num_tuples)
1023     {
1024         /* No point in another fetch if we already detected EOF, though. */
1025         if (!fsstate->eof_reached)
1026             fetch_more_data(node, false);
1027         /* If we didn't get any tuples, must be end of data. */
1028         if (fsstate->next_tuple >= fsstate->num_tuples)
1029             return ExecClearTuple(slot);
1030     }

replace the condition on line 1022, if (fsstate->next_tuple >= fsstate->num_tuples),
with if (fsstate->next_tuple >= fsstate->num_tuples - SOME_BUFFER_NUMBER_ROWS)

Another possibility is to call PQsendQuery(conn, sql) after line 2100, if eof_reached is false.

2096             /* Must be EOF if we didn't get as many tuples as we asked for. */
2097             fsstate->eof_reached = (numrows < fetch_size);
2098
2099             PQclear(res);
2100             res = NULL;
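
A minimal sketch of that second suggestion (my illustration with a hypothetical helper, not the actual patch): right after a batch has been materialized and eof_reached is still false, the next FETCH is sent without waiting, so the remote server produces it while the local node consumes the current batch; the following fetch_more_data() call then only needs to collect it with PQgetResult().

#include <libpq-fe.h>

/* Hypothetical helper: send the next FETCH asynchronously. */
static void
prefetch_next_batch(PGconn *conn, const char *fetch_sql, bool eof_reached)
{
    if (!eof_reached)
        PQsendQuery(conn, fetch_sql);    /* does not block */
}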



On Fri, Jul 25, 2014 at 3:37 PM, Ashutosh Bapat <ashutosh.bapat@enterprisedb.com> wrote:
Hi Kyotaro,
fetch_more_data() always runs "FETCH 100 FROM <cursor_name>" on the foreign server to get the next set of rows. The changes you have made seem to run only the first FETCHes from all the nodes in parallel, but not the subsequent ones. The optimization will be helpful only when there are fewer than 100 rows per connection in the query. If there are more than 100 rows from a single foreign server, the second and subsequent FETCHes will be serialized.

Is my understanding correct?





--
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company

Re: Introducing coarse grain parallelism by postgres_fdw.

From
Kyotaro HORIGUCHI
Date:
Hello, thank you for the comment.

> Hi Kyotaro,
> fetch_more_data() always runs "FETCH 100 FROM <cursor_name>" on the foreign
> server to get the next set of rows. The changes you have made seem to run
> only the first FETCHes from all the nodes in parallel, but not the
> subsequent ones. The optimization will be helpful only when there are fewer
> than 100 rows per connection in the query. If there are more than 100 rows
> from a single foreign server, the second and subsequent FETCHes will be
> serialized.
> 
> Is my understanding correct?

Yes, you're right. That is why I wrote the following:

Me> it almost halves the response time, because the remote queries
Me> take far longer to start up than to run.

Parallelizing all FETCHes would be effective if the connection
transfers bytes at a speed close to the row fetch speed, but I
excluded that case on the assumption that the chance of a gain
there is relatively low, and for simplicity as a PoC. If this
approach is not inappropriate and gets no objections, I will work
on a more complete implementation, including cost estimation.
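
To put rough numbers on that, using the startup times from the
EXPLAIN output in my first mail (both remote queries take about
6 seconds to start up):

    serialized:  ~6.1 s + ~6.0 s     = ~12.1 s  (cf. execution time 12085 ms)
    overlapped:  max(~6.1 s, ~6.0 s) = ~6.1 s   (cf. execution time 6328 ms)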


-- 
Kyotaro Horiguchi
NTT Open Source Software Center



Re: Introducing coarse grain parallelism by postgres_fdw.

From
Kyotaro HORIGUCHI
Date:
Hello,

> In order to minimize the impact, what can be done is to execute
> fetch_more_data() in asynchronous mode every time there are only a few rows
> left to be consumed. So in the current code below
> 1019     /*
> 1020      * Get some more tuples, if we've run out.
> 1021      */
> 1022     if (fsstate->next_tuple >= fsstate->num_tuples)
> 1023     {
> 1024         /* No point in another fetch if we already detected EOF,
> though. */
> 1025         if (!fsstate->eof_reached)
> 1026             fetch_more_data(node, false);
> 1027         /* If we didn't get any tuples, must be end of data. */
> 1028         if (fsstate->next_tuple >= fsstate->num_tuples)
> 1029             return ExecClearTuple(slot);
> 1030     }
> 
> replace the condition on line 1022, if (fsstate->next_tuple >= fsstate->num_tuples),
> with if (fsstate->next_tuple >= fsstate->num_tuples -
> SOME_BUFFER_NUMBER_ROWS)
> Another possibility is to call PQsendQuery(conn, sql) after line 2100, if
> eof_reached is false.
> 
> 2096             /* Must be EOF if we didn't get as many tuples as we asked
> for. */
> 2097             fsstate->eof_reached = (numrows < fetch_size);
> 2098
> 2099             PQclear(res);
> 2100             res = NULL;

I see, I'll consider it. If late (lazy) error detection is
allowed, single-row mode seems usable, too.
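
For reference, a minimal sketch of libpq single-row mode (my
illustration; the lazy error detection comes from the fact that an
error may only surface after some rows have already been returned):

#include <libpq-fe.h>

static void
consume_single_rows(PGconn *conn, const char *sql)
{
    PGresult   *res;

    PQsendQuery(conn, sql);
    PQsetSingleRowMode(conn);      /* must directly follow PQsendQuery() */
    while ((res = PQgetResult(conn)) != NULL)
    {
        /* One PGRES_SINGLE_TUPLE result per row; the stream ends with
         * PGRES_TUPLES_OK, or with an error detected only here. */
        if (PQresultStatus(res) == PGRES_SINGLE_TUPLE)
        {
            /* consume the single row here */
        }
        PQclear(res);
    }
}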

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center



Re: Introducing coarse grain parallelism by postgres_fdw.

From
Kyotaro HORIGUCHI
Date:
Hello, this is the new version, which implements parallelism based
on postgres_fdw to a certain extent of completeness.

It compares the costs of parallel and non-parallel execution and
chooses the parallel one if it is faster by some margin specified
by GUCs. The attached files are:

0001_parallel_exec_planning_v0.patch:
  - PostgreSQL core stuff for parallel execution planning.
0002_enable_postgres_fdw_to_run_in_parallel_v0.patch:
  - postgres_fdw parallelization.
0003_file_fdw_changes_to_avoid_error.patch:
  - error-avoiding stuff for file_fdw (not necessary for this patch)
env.sql:
  - simple test script to try this patch.

=====
- Planner stuff to handle the cost of parallel execution, including
  an indication of parallel execution.

- GUCs to control how easily execution goes parallel.
  parallel_cost_threshold is the path total cost above which
  parallel execution is considered.
  parallel_ratio_threshold is the ratio of parallel cost to
  non-parallel cost below which the parallel path is chosen.

- postgres_fdw can run in multiple sessions using snapshot export
  and can fetch in parallel for foreign scans on dedicated
  connections.
  A foreign server gets a new option, 'max_aux_connections', which
  limits the number of connections for parallel execution per
  (server, user) pair.

- file_fdw is changed to follow the planner changes.
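
As an illustration of how the two GUCs interact, here is the test
applied per path (taken from choose_parallel_scans() in the 0001
patch), with invented numbers: a path with non-parallel total cost
20000 and parallel total cost 11000 is run in parallel under the
defaults, since 20000 > parallel_cost_threshold (10000) and
11000 < 20000 * parallel_ratio_threshold (0.6) = 12000.

    bool run_parallel = (path->parallel &&
             path->total_cost > parallel_cost_threshold &&
             path->ptotal_cost < path->total_cost * parallel_ratio_threshold);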


With the patches applied, the attached sql script gives the
following results (some line breaks added).

postgres=# EXPLAIN ANALYZE SELECT a.a, a.b, b.c
           FROM fvs1 a join fvs1_2 b on (a.a = b.a);
                                QUERY PLAN
----------------------------------------------------------------------------
 Hash Join  (cost=9573392.96..9573393.34 rows=1 width=40 parallel)
            (actual time=2213.400..2213.407 rows=12 loops=1)
   Hash Cond: (a.a = b.a)
   ->  Foreign Scan on fvs1 a
            (cost=9573392.96..9573393.29 rows=10 width=8 parallel)
            (actual time=2199.992..2199.993 rows=10 loops=1)
   ->  Hash  (cost=9573393.29..9573393.29 rows=10 width=36)
             (actual time=13.388..13.388 rows=10 loops=1)
         Buckets: 1024  Batches: 1  Memory Usage: 6kB
         ->  Foreign Scan on fvs1_2 b
                  (cost=9573392.96..9573393.29 rows=10 width=36 parallel)
                  (actual time=13.376..13.379 rows=10 loops=1)
 Planning time: 4.761 ms
 Execution time: 2227.462 ms
(8 rows)

postgres=# SET parallel_ratio_threshold to 0.0;
postgres=# EXPLAIN ANALYZE SELECT a.a, a.b, b.c
           FROM fvs1 a join fvs1 b on (a.a = b.a);
                              QUERY PLAN
------------------------------------------------------------------------------
 Hash Join  (cost=318084.32..318084.69 rows=1 width=40)
            (actual time=4302.913..4302.928 rows=12 loops=1)
   Hash Cond: (a.a = b.a)
   ->  Foreign Scan on fvs1 a  (cost=159041.93..159042.26 rows=10 width=8)
                               (actual time=2122.989..2122.992 rows=10 loops=1)
   ->  Hash  (cost=159042.26..159042.26 rows=10 width=500)
             (actual time=2179.900..2179.900 rows=10 loops=1)
         Buckets: 1024  Batches: 1  Memory Usage: 6kB
         ->  Foreign Scan on fvs1 b  (cost=159041.93..159042.26 rows=10 width=500)
                                     (actual time=2179.856..2179.864 rows=10 loops=1)
 Planning time: 5.085 ms
 Execution time: 4303.728 ms
(8 rows)

Here, "parallel" indicates that the node contains nodes run in
parallel. The latter EXPLAIN shows the result when parallel
execution is inhibited.

For lack of time, I am sorry that the details of this patch will
come later.

Are there any suggestions or opinions?

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 781a736..d810c3c 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -1168,16 +1168,18 @@ ExplainNode(PlanState *planstate, List *ancestors,
     {
         if (es->format == EXPLAIN_FORMAT_TEXT)
         {
-            appendStringInfo(es->str, "  (cost=%.2f..%.2f rows=%.0f width=%d)",
+            appendStringInfo(es->str, "  (cost=%.2f..%.2f rows=%.0f width=%d%s)",
                              plan->startup_cost, plan->total_cost,
-                             plan->plan_rows, plan->plan_width);
+                             plan->plan_rows, plan->plan_width,
+                             plan->parallel_start ? " parallel" : "");
         }
         else
         {
             ExplainPropertyFloat("Startup Cost", plan->startup_cost, 2, es);
             ExplainPropertyFloat("Total Cost", plan->total_cost, 2, es);
             ExplainPropertyFloat("Plan Rows", plan->plan_rows, 0, es);
-            ExplainPropertyInteger("Plan Width", plan->plan_width, es);
+            ExplainPropertyText("Parallel Exec",
+                                plan->parallel_start ? "true" : "false", es);
         }
     }
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index c81efe9..85d78b4 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -53,6 +53,8 @@ typedef struct pushdown_safety_info
 /* These parameters are set by GUC */
 bool        enable_geqo = false;    /* just in case GUC doesn't set it */
 int            geqo_threshold;
+double        parallel_cost_threshold = 10000;
+double        parallel_ratio_threshold = 0.6;
 
 /* Hook for plugins to replace standard_join_search() */
 join_search_hook_type join_search_hook = NULL;

@@ -112,6 +114,104 @@ static void recurse_push_qual(Node *setOp, Query *topquery,
                   RangeTblEntry *rte, Index rti, Node *qual);
 static void remove_unused_subquery_outputs(Query *subquery, RelOptInfo *rel);
 
 
+/*
+ * choose_parallel_walker -- Walk the whole path tree and decide for each
+ * node whether to do parallel execution or not.
+ */
+static void
+choose_parallel_walker(Path *path, bool run_parallel)
+{
+    ListCell *lc;
+    List *subpaths = NIL;
+    Path *left = NULL, *right = NULL;
+
+    switch (nodeTag(path))
+    {
+    case T_Path:
+    case T_IndexPath:
+    case T_BitmapHeapPath:
+    case T_TidPath:
+    case T_BitmapAndPath:
+    case T_BitmapOrPath:
+    case T_ResultPath:
+        /* These paths are unparallelizable so far, so return immediately. */
+        return;
+    case T_ForeignPath: /**/
+        break;
+    case T_NestPath: /**/
+        left  = ((NestPath*)path)->outerjoinpath;
+        right = ((NestPath*)path)->innerjoinpath;
+        break;
+    case T_MergePath: /**/
+        left  = ((MergePath*)path)->jpath.outerjoinpath;
+        right = ((MergePath*)path)->jpath.innerjoinpath;
+        break;
+    case T_HashPath: /**/
+        left  = ((HashPath*)path)->jpath.outerjoinpath;
+        right = ((HashPath*)path)->jpath.innerjoinpath;
+        break;
+    case T_AppendPath: /**/
+        subpaths = ((AppendPath*)path)->subpaths;
+        break;
+    case T_MergeAppendPath: /**/
+        subpaths = ((MergeAppendPath*)path)->subpaths;
+        break;
+    case T_MaterialPath: /**/
+        left = ((MaterialPath*)path)->subpath;
+        break;
+    case T_UniquePath: /**/
+        left = ((UniquePath*)path)->subpath;
+        break;
+    default:
+        elog(ERROR, "unrecognized path node type: %d",
+             (int) nodeTag(path));
+        break;
+    }
+
+    /* Make this node be treated ignoring parallel costs hereafter. */
+    if (run_parallel)
+    {
+        path->startup_cost = path->pstartup_cost;
+        path->total_cost = path->ptotal_cost;
+    }
+    else
+    {
+        path->parallel = false;
+    }
+
+    /* Walk into lower level */
+    if (left)  choose_parallel_walker(left, run_parallel);
+    if (right) choose_parallel_walker(right, run_parallel);
+    foreach (lc, subpaths)
+    {
+        choose_parallel_walker((Path*) lfirst(lc), run_parallel);
+    }
+
+}
+
+/*
+ * choose_parallel_scans --- Decide whether this relation should run in
+ * parallel or not.
+ *
+ * The relation has a path tree carrying costs for both parallel and
+ * non-parallel execution, and looking into the top node of the path tree
+ * is enough to decide whether to do parallel or not. Then make the whole
+ * tree be handled ignoring parallel execution stuff.
+ */
+void
+choose_parallel_scans(RelOptInfo *rel)
+{
+    ListCell *lc;
+
+    foreach (lc, rel->pathlist)
+    {
+        Path *path = (Path*) lfirst(lc);
+        bool run_parallel = (path->parallel &&
+                 path->total_cost > parallel_cost_threshold &&
+                 path->ptotal_cost < path->total_cost * parallel_ratio_threshold);
+
+        choose_parallel_walker(path, run_parallel);
+    }
+}
 
 /*
  * make_one_rel
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 0cdb790..1f5bcd8 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -1334,6 +1334,74 @@ cost_sort(Path *path, PlannerInfo *root,
 }
 
 /*
+ * cost_append
+ *      Determines and returns the cost of a Append node.
+ *
+ * We charge nothing extra for the Append itself, which perhaps is too
+ * optimistic, but since it doesn't do any selection or projection, it is a
+ * pretty cheap node.  If you change this, see also make_append().
+ */
+void
+cost_append(Path *path, List *subpaths)
+{
+    double max_par_startup = 0.0;
+    double par_running = 0.0;
+    double nonpar_startup = -1.0;
+    double nonpar_running = 0.0;
+    ListCell *lc;
+
+    path->rows = 0;
+    path->startup_cost = 0;
+    path->total_cost = 0;
+    foreach(lc, subpaths)
+    {
+        Path       *subpath = (Path *) lfirst(lc);
+        
+        path->rows += subpath->rows;
+        
+        if (lc == list_head(subpaths)) /* first node? */
+        {
+            path->startup_cost = subpath->startup_cost;
+            path->parallel = subpath->parallel;
+        }
+
+        path->total_cost += subpath->total_cost;
+    }
+
+    if (! path->parallel) return;
+
+    /* calculate cost for parallel execution */
+    foreach (lc, subpaths)
+    {
+        Path *subpath = (Path*)lfirst (lc);
+
+        if (subpath->parallel)
+        {
+            if (max_par_startup < subpath->pstartup_cost)
+                max_par_startup = subpath->pstartup_cost;
+            par_running += subpath->ptotal_cost - subpath->pstartup_cost;
+        }
+        else
+        {
+            if (nonpar_startup < 0)
+            {
+                nonpar_startup = subpath->startup_cost;
+                nonpar_running = subpath->total_cost - subpath->startup_cost;
+            }
+            else
+            {
+                nonpar_running += subpath->total_cost;
+            }
+        }
+    }
+    
+    path->pstartup_cost = (max_par_startup < nonpar_startup ?
+                         nonpar_startup : max_par_startup);
+    path->ptotal_cost = path->pstartup_cost +
+        (par_running < nonpar_running ? nonpar_running : par_running);
+}
+
+/*
  * cost_merge_append
  *      Determines and returns the cost of a MergeAppend node.
  *

@@ -1356,12 +1424,17 @@ cost_sort(Path *path, PlannerInfo *root,
  * 'n_streams' is the number of input streams
  * 'input_startup_cost' is the sum of the input streams' startup costs
  * 'input_total_cost' is the sum of the input streams' total costs
+ * 'parallel_estimate' is whether to estimate for parallel execution
+ * 'input_pstartup_cost' is the sum of the input streams' parallel startup costs
+ * 'input_ptotal_cost' is the sum of the input streams' parallel total costs
  * 'tuples' is the number of tuples in all the streams
  */
 void
 cost_merge_append(Path *path, PlannerInfo *root,
                   List *pathkeys, int n_streams,
                   Cost input_startup_cost, Cost input_total_cost,
+                  bool parallel_estimate,
+                  Cost input_pstartup_cost, Cost input_ptotal_cost,
                   double tuples)
 {
     Cost        startup_cost = 0;

@@ -1395,6 +1468,13 @@ cost_merge_append(Path *path, PlannerInfo *root,
     path->startup_cost = startup_cost + input_startup_cost;
     path->total_cost = startup_cost + run_cost + input_total_cost;
+
+    if (parallel_estimate)
+    {
+        path->pstartup_cost = startup_cost + input_pstartup_cost;
+        path->ptotal_cost = startup_cost + run_cost + input_ptotal_cost;
+        path->parallel = true;
+    }
 }
 
 /*

@@ -1648,6 +1728,54 @@ cost_group(Path *path, PlannerInfo *root,
     path->total_cost = total_cost;
 }
+
+/*
+ * Set the parallel costs for the workspace as non-parallel node.
+ */
+static void
+clear_cost_parallel(JoinCostWorkspace *workspace)
+{
+    workspace->pstartup_cost = workspace->startup_cost;
+    workspace->ptotal_cost = workspace->total_cost;
+    workspace->parallel = false;
+}
+
+/*
+ * Set the initial cost numbers of parallel node for join estimation.
+ * This should be called only if the path node can executed in parallel.
+ */
+static void
+initial_cost_parallel(JoinCostWorkspace *workspace,
+                      Cost outer_pstartup_cost,
+                      Cost inner_pstartup_cost)
+{
+    double pstartup =
+        (outer_pstartup_cost > inner_pstartup_cost ?
+         outer_pstartup_cost : inner_pstartup_cost);
+    workspace->ptotal_cost = 
+        workspace->total_cost -    (workspace->startup_cost - pstartup);
+    workspace->pstartup_cost = pstartup;
+    workspace->parallel = true;
+}
+
+/*
+ * Set the final cost numbers of parallel node for join estimation.
+ * This does nothing if this node won't be executed in parallel.
+ */
+static void
+final_cost_parallel(Path *path, JoinCostWorkspace *workspace,
+                    Cost startup_cost, Cost run_cost)
+{
+    if (workspace->parallel)
+    {
+        path->pstartup_cost =
+            startup_cost + workspace->pstartup_cost;
+        path->ptotal_cost =
+            path->pstartup_cost + run_cost;
+        path->parallel = true;
+    }
+}
+/* * initial_cost_nestloop *      Preliminary estimate of the cost of a nestloop join path.
@@ -1765,6 +1893,14 @@ initial_cost_nestloop(PlannerInfo *root, JoinCostWorkspace *workspace,
     /* Save private data for final_cost_nestloop */
     workspace->run_cost = run_cost;
     workspace->inner_rescan_run_cost = inner_rescan_run_cost;
+
+    clear_cost_parallel(workspace);
+    if (outer_path->parallel)
+    {
+        initial_cost_parallel(workspace,
+                              outer_path->pstartup_cost,
+                              inner_path->pstartup_cost);
+    }
 }
 
 /*
@@ -1786,7 +1922,7 @@ final_cost_nestloop(PlannerInfo *root, NestPath *path,
     Path       *inner_path = path->innerjoinpath;
     double        outer_path_rows = outer_path->rows;
     double        inner_path_rows = inner_path->rows;
-    Cost        startup_cost = workspace->startup_cost;
+    Cost        startup_cost = 0.0;
     Cost        run_cost = workspace->run_cost;
     Cost        inner_rescan_run_cost = workspace->inner_rescan_run_cost;
     Cost        cpu_per_tuple;

@@ -1861,8 +1997,10 @@ final_cost_nestloop(PlannerInfo *root, NestPath *path,
     cpu_per_tuple = cpu_tuple_cost + restrict_qual_cost.per_tuple;
     run_cost += cpu_per_tuple * ntuples;
 
-    path->path.startup_cost = startup_cost;
-    path->path.total_cost = startup_cost + run_cost;
+    path->path.startup_cost = startup_cost + workspace->startup_cost;
+    path->path.total_cost = startup_cost + workspace->startup_cost + run_cost;
+
+    final_cost_parallel(&path->path, workspace, startup_cost, run_cost);
 }
 
 /*
@@ -1917,6 +2055,8 @@ initial_cost_mergejoin(PlannerInfo *root, JoinCostWorkspace *workspace,
                 innerstartsel,
                 innerendsel;
     Path        sort_path;        /* dummy for result of cost_sort */
+    double        outer_startup_cost,
+                inner_startup_cost;
 
     /* Protect some assumptions below that rowcounts aren't zero or NaN */
     if (outer_path_rows <= 0 || isnan(outer_path_rows))
@@ -2035,6 +2175,7 @@ initial_cost_mergejoin(PlannerInfo *root, JoinCostWorkspace *workspace,
                   0.0,
                   work_mem,
                   -1.0);
+        outer_startup_cost = sort_path.startup_cost;
         startup_cost += sort_path.startup_cost;
         startup_cost += (sort_path.total_cost - sort_path.startup_cost)
             * outerstartsel;

@@ -2043,6 +2184,7 @@ initial_cost_mergejoin(PlannerInfo *root, JoinCostWorkspace *workspace,
     }
     else
     {
+        outer_startup_cost = outer_path->startup_cost;
         startup_cost += outer_path->startup_cost;
         startup_cost += (outer_path->total_cost - outer_path->startup_cost)
             * outerstartsel;

@@ -2061,6 +2203,7 @@ initial_cost_mergejoin(PlannerInfo *root, JoinCostWorkspace *workspace,
                   0.0,
                   work_mem,
                   -1.0);
+        inner_startup_cost = sort_path.startup_cost;
         startup_cost += sort_path.startup_cost;
         startup_cost += (sort_path.total_cost - sort_path.startup_cost)
             * innerstartsel;

@@ -2069,6 +2212,7 @@ initial_cost_mergejoin(PlannerInfo *root, JoinCostWorkspace *workspace,
     }
     else
     {
+        inner_startup_cost = inner_path->startup_cost;
         startup_cost += inner_path->startup_cost;
         startup_cost += (inner_path->total_cost - inner_path->startup_cost)
             * innerstartsel;
@@ -2096,6 +2240,16 @@ initial_cost_mergejoin(PlannerInfo *root, JoinCostWorkspace *workspace,
     workspace->inner_rows = inner_rows;
     workspace->outer_skip_rows = outer_skip_rows;
     workspace->inner_skip_rows = inner_skip_rows;
+
+    clear_cost_parallel(workspace);
+    if (outer_path->parallel)
+    {
+        initial_cost_parallel(workspace,
+                              outer_startup_cost - outer_path->total_cost +
+                              outer_path->ptotal_cost,
+                              inner_startup_cost - inner_path->total_cost +
+                              inner_path->ptotal_cost);
+    }
 }
 
 /*
@@ -2128,7 +2282,7 @@ final_cost_mergejoin(PlannerInfo *root, MergePath *path,
     double        inner_path_rows = inner_path->rows;
     List       *mergeclauses = path->path_mergeclauses;
     List       *innersortkeys = path->innersortkeys;
-    Cost        startup_cost = workspace->startup_cost;
+    Cost        startup_cost = 0;
     Cost        run_cost = workspace->run_cost;
     Cost        inner_run_cost = workspace->inner_run_cost;
     double        outer_rows = workspace->outer_rows;

@@ -2320,8 +2474,10 @@ final_cost_mergejoin(PlannerInfo *root, MergePath *path,
     cpu_per_tuple = cpu_tuple_cost + qp_qual_cost.per_tuple;
     run_cost += cpu_per_tuple * mergejointuples;
 
-    path->jpath.path.startup_cost = startup_cost;
-    path->jpath.path.total_cost = startup_cost + run_cost;
+    path->jpath.path.startup_cost = startup_cost + workspace->startup_cost;
+    path->jpath.path.total_cost = path->jpath.path.startup_cost + run_cost;
+
+    final_cost_parallel(&path->jpath.path, workspace, startup_cost, run_cost);
 }
 
 /*
@@ -2485,6 +2641,17 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
     workspace->run_cost = run_cost;
     workspace->numbuckets = numbuckets;
     workspace->numbatches = numbatches;
+
+    /* costs for parallel execution */
+    clear_cost_parallel(workspace);
+
+    /* Any side is executed first in hash join... */
+    if (inner_path->parallel || outer_path->parallel)
+    {
+        initial_cost_parallel(workspace,
+                              outer_path->pstartup_cost,
+                              inner_path->pstartup_cost);
+    }
 }
 
 /*
@@ -2510,7 +2677,7 @@ final_cost_hashjoin(PlannerInfo *root, HashPath *path,
     double        outer_path_rows = outer_path->rows;
     double        inner_path_rows = inner_path->rows;
     List       *hashclauses = path->path_hashclauses;
-    Cost        startup_cost = workspace->startup_cost;
+    Cost        startup_cost = 0;
     Cost        run_cost = workspace->run_cost;
     int            numbuckets = workspace->numbuckets;
     int            numbatches = workspace->numbatches;

@@ -2700,8 +2867,10 @@ final_cost_hashjoin(PlannerInfo *root, HashPath *path,
     cpu_per_tuple = cpu_tuple_cost + qp_qual_cost.per_tuple;
     run_cost += cpu_per_tuple * hashjointuples;
 
-    path->jpath.path.startup_cost = startup_cost;
-    path->jpath.path.total_cost = startup_cost + run_cost;
+    path->jpath.path.startup_cost = startup_cost + workspace->startup_cost;
+    path->jpath.path.total_cost = path->jpath.path.startup_cost + run_cost;
+
+    final_cost_parallel(&path->jpath.path, workspace, startup_cost, run_cost);
 }
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 4b641a2..67d801a 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -3163,6 +3163,9 @@ copy_path_costsize(Plan *dest, Path *src)
     {
         dest->startup_cost = src->startup_cost;
         dest->total_cost = src->total_cost;
+        dest->parallel_start = src->parallel;
+        dest->pstartup_cost = src->pstartup_cost;
+        dest->ptotal_cost = src->ptotal_cost;
         dest->plan_rows = src->rows;
         dest->plan_width = src->parent->width;
     }

@@ -3170,6 +3173,9 @@ copy_path_costsize(Plan *dest, Path *src)
     {
         dest->startup_cost = 0;
         dest->total_cost = 0;
+        dest->parallel_start = false;
+        dest->pstartup_cost = 0;
+        dest->ptotal_cost = 0;
         dest->plan_rows = 0;
         dest->plan_width = 0;
     }

@@ -3462,6 +3468,7 @@ ForeignScan *
 make_foreignscan(List *qptlist,
                  List *qpqual,
                  Index scanrelid,
+                 bool  parallel,
                  List *fdw_exprs,
                  List *fdw_private)
 {

@@ -3474,6 +3481,7 @@ make_foreignscan(List *qptlist,
     plan->lefttree = NULL;
     plan->righttree = NULL;
     node->scan.scanrelid = scanrelid;
+    node->scan.parallel = parallel;
     node->fdw_exprs = fdw_exprs;
     node->fdw_private = fdw_private;
     /* fsSystemCol will be filled in by create_foreignscan_plan */
@@ -3489,6 +3497,10 @@ make_append(List *appendplans, List *tlist)
     Plan       *plan = &node->plan;
     double        total_size;
     ListCell   *subnode;
+    double      max_par_startup = 0;
+    double      par_running = 0;
+    double      nonpar_startup = 0;
+    double      nonpar_running = 0;
 
     /*
      * Compute cost as sum of subplan costs.  We charge nothing extra for the

@@ -3509,12 +3521,27 @@ make_append(List *appendplans, List *tlist)
     {
         Plan       *subplan = (Plan *) lfirst(subnode);
 
-        if (subnode == list_head(appendplans))    /* first node? */
-            plan->startup_cost = subplan->startup_cost;
-        plan->total_cost += subplan->total_cost;
+        if (subplan->parallel_start)
+        {
+            if (max_par_startup < subplan->startup_cost)
+                max_par_startup = subplan->startup_cost;
+            par_running += subplan->total_cost - subplan->startup_cost;
+        }
+        else
+        {
+            nonpar_startup += subplan->startup_cost;
+            nonpar_running += (subplan->total_cost - subplan->startup_cost);
+        }
+
         plan->plan_rows += subplan->plan_rows;
         total_size += subplan->plan_width * subplan->plan_rows;
     }
+
+    plan->startup_cost =
+        (max_par_startup < nonpar_startup ? nonpar_startup : max_par_startup);
+    plan->total_cost =
+        plan->startup_cost + par_running + nonpar_running;
+
     if (plan->plan_rows > 0)
         plan->plan_width = rint(total_size / plan->plan_rows);
     else
diff --git a/src/backend/optimizer/plan/planmain.c b/src/backend/optimizer/plan/planmain.c
index 93484a0..9d75dd7 100644
--- a/src/backend/optimizer/plan/planmain.c
+++ b/src/backend/optimizer/plan/planmain.c
@@ -240,5 +240,8 @@ query_planner(PlannerInfo *root, List *tlist,
         final_rel->cheapest_total_path->param_info != NULL)
         elog(ERROR, "failed to construct the join relation");
 
+    /* Decide whether each parallelizable path should be run in parallel */
+    choose_parallel_scans(final_rel);
+
     return final_rel;
 }
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index 319e8b2..d716dcb 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -900,27 +900,11 @@ create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer)
                                              * unsorted */
     pathnode->subpaths = subpaths;
 
-    /*
-     * We don't bother with inventing a cost_append(), but just do it here.
-     *
-     * Compute rows and costs as sums of subplan rows and costs.  We charge
-     * nothing extra for the Append itself, which perhaps is too optimistic,
-     * but since it doesn't do any selection or projection, it is a pretty
-     * cheap node.  If you change this, see also make_append().
-     */
-    pathnode->path.rows = 0;
-    pathnode->path.startup_cost = 0;
-    pathnode->path.total_cost = 0;
+    cost_append(&pathnode->path, subpaths);
+
     foreach(l, subpaths)
     {
-        Path       *subpath = (Path *) lfirst(l);
-
-        pathnode->path.rows += subpath->rows;
-
-        if (l == list_head(subpaths))    /* first node? */
-            pathnode->path.startup_cost = subpath->startup_cost;
-        pathnode->path.total_cost += subpath->total_cost;
-
+        Path *subpath = (Path*) lfirst(l);
         /* All child paths must have same parameterization */
         Assert(bms_equal(PATH_REQ_OUTER(subpath), required_outer));
     }
 
@@ -943,7 +927,14 @@ create_merge_append_path(PlannerInfo *root,
     MergeAppendPath *pathnode = makeNode(MergeAppendPath);
     Cost        input_startup_cost;
     Cost        input_total_cost;
+    Cost        input_pstartup_cost;
+    Cost        input_ptotal_cost;
     ListCell   *l;
+    Cost        max_par_startup = 0.0;
+    Cost        par_running = 0.0;
+    Cost        nonpar_startup = 0.0;
+    Cost        nonpar_running = 0.0;
+    bool        parallel_exists = false;
 
     pathnode->path.pathtype = T_MergeAppend;
     pathnode->path.parent = rel;

@@ -970,14 +961,16 @@ create_merge_append_path(PlannerInfo *root,
     foreach(l, subpaths)
     {
         Path       *subpath = (Path *) lfirst(l);
+        double additional_startup_cost = 0;
+        double additional_total_cost = 0;
 
         pathnode->path.rows += subpath->rows;
 
         if (pathkeys_contained_in(pathkeys, subpath->pathkeys))
         {
             /* Subpath is adequately ordered, we won't need to sort it */
-            input_startup_cost += subpath->startup_cost;
-            input_total_cost += subpath->total_cost;
+            additional_startup_cost = subpath->startup_cost;
+            additional_total_cost = subpath->total_cost;
         }
         else
         {
@@ -993,18 +986,55 @@ create_merge_append_path(PlannerInfo *root,
                       0.0,
                       work_mem,
                       pathnode->limit_tuples);
 
-            input_startup_cost += sort_path.startup_cost;
-            input_total_cost += sort_path.total_cost;
+            additional_startup_cost = sort_path.startup_cost;
+            additional_total_cost = sort_path.total_cost;
+        }
+
+        input_startup_cost += additional_startup_cost;
+        input_total_cost += additional_total_cost;
+
+        /* cost calculation for parallel execution  */
+        if (subpath->parallel)
+        {
+            Cost thiscost =
+                subpath->pstartup_cost + additional_startup_cost;
+            if (max_par_startup < thiscost)
+                max_par_startup = thiscost;
+            par_running +=
+                (subpath->ptotal_cost - subpath->pstartup_cost) +
+                (additional_total_cost - additional_startup_cost) -
+                (subpath->total_cost - subpath->startup_cost);
+            parallel_exists = true;
+        }
+        else
+        {
+            nonpar_startup +=
+                subpath->startup_cost + additional_startup_cost;
+            nonpar_running +=
+                additional_total_cost - additional_startup_cost;
+        }
 
         /* All child paths must have same parameterization */
         Assert(bms_equal(PATH_REQ_OUTER(subpath), required_outer));
     }
 
+
+    if (parallel_exists)
+    {
+        input_pstartup_cost =
+            (max_par_startup < nonpar_startup ?
+             nonpar_startup : max_par_startup);
+        input_ptotal_cost = 
+            input_pstartup_cost +
+            (par_running < nonpar_running ? nonpar_running : par_running);
+    }
 
     /* Now we can compute total costs of the MergeAppend */
     cost_merge_append(&pathnode->path, root,
                       pathkeys, list_length(subpaths),
                       input_startup_cost, input_total_cost,
+                      parallel_exists,
+                      input_pstartup_cost, input_ptotal_cost,
                       rel->tuples);
 
     return pathnode;
@@ -1066,6 +1096,12 @@ create_material_path(RelOptInfo *rel, Path *subpath)
                   subpath->rows,
                   rel->width);
 
+    if (subpath->parallel)
+    {
+        Cost diff = subpath->pstartup_cost - subpath->startup_cost;
+        pathnode->path.pstartup_cost = pathnode->path.startup_cost + diff;
+        pathnode->path.ptotal_cost   = pathnode->path.total_cost + diff;
+    }
 
     return pathnode;
 }
@@ -1292,6 +1328,12 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
         pathnode->path.total_cost = subpath->total_cost;
         pathnode->path.pathkeys = subpath->pathkeys;
 
+        if (subpath->parallel)
+        {
+            pathnode->path.pstartup_cost = subpath->pstartup_cost;
+            pathnode->path.ptotal_cost = subpath->ptotal_cost;
+            pathnode->path.parallel = true;
+        }
 
         rel->cheapest_unique_path = (Path *) pathnode;
 
         MemoryContextSwitchTo(oldcontext);
@@ -1328,6 +1370,12 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
                 pathnode->path.total_cost = subpath->total_cost;
                 pathnode->path.pathkeys = subpath->pathkeys;
 
+                if (subpath->parallel)
+                {
+                    pathnode->path.pstartup_cost = subpath->pstartup_cost;
+                    pathnode->path.ptotal_cost = subpath->ptotal_cost;
+                    pathnode->path.parallel = true;
+                }
 
                 rel->cheapest_unique_path = (Path *) pathnode;
 
                 MemoryContextSwitchTo(oldcontext);
@@ -1407,6 +1455,19 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
         pathnode->path.total_cost = sort_path.total_cost;
     }
 
+    /* calculate parallel execution costs */
+    if (subpath->parallel)
+    {
+        pathnode->path.pstartup_cost = 
+            sort_path.startup_cost - subpath->startup_cost
+            + subpath->pstartup_cost;
+        pathnode->path.ptotal_cost =
+            sort_path.total_cost - subpath->total_cost
+            + subpath->ptotal_cost;
+        pathnode->path.parallel = true;
+    }
+
     rel->cheapest_unique_path = (Path *) pathnode;
 
     MemoryContextSwitchTo(oldcontext);
@@ -1576,9 +1637,8 @@ create_worktablescan_path(PlannerInfo *root, RelOptInfo *rel,
 ForeignPath *
 create_foreignscan_path(PlannerInfo *root, RelOptInfo *rel,
                         double rows, Cost startup_cost, Cost total_cost,
-                        List *pathkeys,
-                        Relids required_outer,
-                        List *fdw_private)
+                        List *pathkeys, Relids required_outer,
+                        bool parallel, List *fdw_private)
 {
     ForeignPath *pathnode = makeNode(ForeignPath);
@@ -1589,6 +1649,9 @@ create_foreignscan_path(PlannerInfo *root, RelOptInfo *rel,
     pathnode->path.rows = rows;
     pathnode->path.startup_cost = startup_cost;
     pathnode->path.total_cost = total_cost;
+    pathnode->path.parallel = parallel;
+    pathnode->path.pstartup_cost = startup_cost;
+    pathnode->path.ptotal_cost = total_cost;
     pathnode->path.pathkeys = pathkeys;
 
     pathnode->fdw_private = fdw_private;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 6c52db8..cf3f104 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2691,6 +2691,24 @@ static struct config_real ConfigureNamesReal[] =
         0.5, 0.0, 1.0,
         NULL, NULL, NULL
     },
 
+    {
+        {"parallel_cost_threshold", PGC_USERSET, QUERY_TUNING_METHOD,
+             gettext_noop("Set the minimum total cost to allow parallel plans."),
+            NULL
+        },
+        &parallel_cost_threshold,
+        10000.0, 0.0, 1.0e10,    /* disable_cost */
+        NULL, NULL, NULL
+    },
+    {
+        {"parallel_ratio_threshold", PGC_USERSET, QUERY_TUNING_METHOD,
+             gettext_noop("Set threshold to select parallel plans, as the ratio of parallel cost to non-parallel
cost."),
+            NULL
+        },
+        &parallel_ratio_threshold,
+        0.8, 0.0, 10.0,
+        NULL, NULL, NULL
+    },
 
     /* End-of-list marker */
     {
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 3b9c683..f4f9deb 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -95,6 +95,9 @@ typedef struct Plan
      */
     Cost        startup_cost;    /* cost expended before fetching any tuples */
     Cost        total_cost;        /* total cost (assuming all tuples fetched) */
+    bool        parallel_start; /* this node will run in parallel */
+    Cost        pstartup_cost;  /* startup cost for parallel execution */
+    Cost        ptotal_cost;    /* total cost for parallel execution */
 
     /*
      * planner's estimate of result size of this plan step
 
@@ -264,6 +267,7 @@ typedef struct Scan
 {
     Plan        plan;
     Index        scanrelid;        /* relid is index into the range table */
+    bool        parallel;        /* this scan will run in parallel */
 } Scan;
 
 /* ----------------
diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h
index dacbe9c..08d59aa 100644
--- a/src/include/nodes/relation.h
+++ b/src/include/nodes/relation.h
@@ -729,6 +729,10 @@ typedef struct Path
     Cost        startup_cost;    /* cost expended before fetching any tuples */
     Cost        total_cost;        /* total cost (assuming all tuples fetched) */
+    bool        parallel;        /* can run in parallel ? */
+    Cost        pstartup_cost;    /* startup cost for parallel execution */
+    Cost        ptotal_cost;    /* total cost for parallel execution */
+
     List       *pathkeys;        /* sort ordering of path's output */
     /* pathkeys is a List of PathKey nodes; see above */
 } Path;
 
@@ -1626,6 +1630,10 @@ typedef struct JoinCostWorkspace
     Cost        startup_cost;    /* cost expended before fetching any tuples */
     Cost        total_cost;        /* total cost (assuming all tuples fetched) */
+    bool        parallel;        /* can run in parallel ? */
+    Cost        pstartup_cost;    /* startup cost for parallel execution */
+    Cost        ptotal_cost;    /* total cost for parallel execution */
+
     /* Fields below here should be treated as private to costsize.c */
     Cost        run_cost;        /* non-startup cost components */
 
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 75e2afb..30b9858 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -91,9 +91,12 @@ extern void cost_sort(Path *path, PlannerInfo *root,
           List *pathkeys, Cost input_cost, double tuples, int width,
           Cost comparison_cost, int sort_mem,
           double limit_tuples);
+extern void cost_append(Path *path, List *subpaths);
 extern void cost_merge_append(Path *path, PlannerInfo *root,
                   List *pathkeys, int n_streams,
                   Cost input_startup_cost, Cost input_total_cost,
+                  bool parallel_estimate,
+                  Cost input_pstartup_cost, Cost input_ptotal_cost,
                   double tuples);
 extern void cost_material(Path *path,
               Cost input_startup_cost, Cost input_total_cost,
 
diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h
index a0bcc82..b84d176 100644
--- a/src/include/optimizer/pathnode.h
+++ b/src/include/optimizer/pathnode.h
@@ -79,9 +79,8 @@ extern Path *create_worktablescan_path(PlannerInfo *root, RelOptInfo *rel,
                         Relids required_outer);
 extern ForeignPath *create_foreignscan_path(PlannerInfo *root, RelOptInfo *rel,
                         double rows, Cost startup_cost, Cost total_cost,
-                        List *pathkeys,
-                        Relids required_outer,
-                        List *fdw_private);
+                        List *pathkeys, Relids required_outer,
+                        bool parallel, List *fdw_private);
 extern Relids calc_nestloop_required_outer(Path *outer_path, Path *inner_path);
 extern Relids calc_non_nestloop_required_outer(Path *outer_path, Path *inner_path);
 
diff --git a/src/include/optimizer/paths.h b/src/include/optimizer/paths.h
index 9b22fda..36879f1 100644
--- a/src/include/optimizer/paths.h
+++ b/src/include/optimizer/paths.h
@@ -20,8 +20,10 @@
 /*
  * allpaths.c
  */
-extern bool enable_geqo;
-extern int    geqo_threshold;
+extern bool     enable_geqo;
+extern int        geqo_threshold;
+extern double    parallel_cost_threshold;
+extern double    parallel_ratio_threshold;
 
 /* Hook for plugins to replace standard_join_search() */
 typedef RelOptInfo *(*join_search_hook_type) (PlannerInfo *root,
 
@@ -30,6 +32,7 @@ typedef RelOptInfo *(*join_search_hook_type) (PlannerInfo *root,
 extern PGDLLIMPORT join_search_hook_type join_search_hook;
 
+extern void choose_parallel_scans(RelOptInfo *rel);
 extern RelOptInfo *make_one_rel(PlannerInfo *root, List *joinlist);
 extern RelOptInfo *standard_join_search(PlannerInfo *root, int levels_needed,
                      List *initial_rels);
diff --git a/src/include/optimizer/planmain.h b/src/include/optimizer/planmain.h
index 4504250..e72ad72 100644
--- a/src/include/optimizer/planmain.h
+++ b/src/include/optimizer/planmain.h
@@ -44,7 +44,7 @@ extern Plan *create_plan(PlannerInfo *root, Path *best_path);
 extern SubqueryScan *make_subqueryscan(List *qptlist, List *qpqual,
                   Index scanrelid, Plan *subplan);
 extern ForeignScan *make_foreignscan(List *qptlist, List *qpqual,
-                 Index scanrelid, List *fdw_exprs, List *fdw_private);
+                 Index scanrelid, bool parallel, List *fdw_exprs, List *fdw_private);
 extern Append *make_append(List *appendplans, List *tlist);
 extern RecursiveUnion *make_recursive_union(List *tlist,
                      Plan *lefttree, Plan *righttree, int wtParam,
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 116be7d..d252eb1 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -44,11 +44,15 @@ typedef struct ConnCacheKey
 typedef struct ConnCacheEntry
 {
     ConnCacheKey key;            /* hash key (must be first) */
-    PGconn       *conn;            /* connection to foreign server, or NULL */
+    PGconn       *conn;            /* primary connection to foreign server */
+    List       *extraconns;        /* list of auxiliary connections */
     int            xact_depth;        /* 0 = no xact open, 1 = main xact open, 2 =
                                  * one level of subxact open, etc */
+    char        *snapshot_id;    /* snapshot id for this server */
     bool        have_prep_stmt; /* have we prepared any stmts in this xact? */
     bool        have_error;        /* have any subxacts aborted in this xact? */
+    int            reserved_aux_conns; /* number of reserved rooms for parallel
+                                     * connections */
 } ConnCacheEntry;
 
 /*
@@ -75,31 +79,16 @@ static void pgfdw_subxact_callback(SubXactEvent event,
                        SubTransactionId parentSubid,
                        void *arg);
 
-/*
- * Get a PGconn which can be used to execute queries on the remote PostgreSQL
- * server with the user's authorization.  A new connection is established
- * if we don't already have a suitable one, and a transaction is opened at
- * the right subtransaction nesting depth if we didn't do that already.
- *
- * will_prep_stmt must be true if caller intends to create any prepared
- * statements.  Since those don't go away automatically at transaction end
- * (not even on error), we need this flag to cue manual cleanup.
- *
- * XXX Note that caching connections theoretically requires a mechanism to
- * detect change of FDW objects to invalidate already established connections.
- * We could manage that by watching for invalidation events on the relevant
- * syscaches.  For the moment, though, it's not clear that this would really
- * be useful and not mere pedantry.  We could not flush any active connections
- * mid-transaction anyway.
+ * Search the connection cache for an entry for the specified server and
+ * user, creating a new one if none exists.
  */
-PGconn *
-GetConnection(ForeignServer *server, UserMapping *user,
-              bool will_prep_stmt)
+static ConnCacheEntry *
+search_cache_entry(ForeignServer *server, UserMapping *user)
 {
-    bool        found;
-    ConnCacheEntry *entry;
     ConnCacheKey key;
+    ConnCacheEntry *entry;
+    bool        found;
 
     /* First time through, initialize connection cache hashtable */
     if (ConnectionHash == NULL)
@@ -124,9 +113,6 @@ GetConnection(ForeignServer *server, UserMapping *user,
         RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
     }
 
-    /* Set flag that we did GetConnection during the current transaction */
-    xact_got_connection = true;
-
     /* Create hash key for the entry.  Assume no pad bytes in key struct */
     key.serverid = server->serverid;
     key.userid = user->userid;
 
@@ -139,11 +125,52 @@ GetConnection(ForeignServer *server, UserMapping *user,
     {
         /* initialize new hashtable entry (key is already filled in) */
         entry->conn = NULL;
+        entry->extraconns = NIL;
         entry->xact_depth = 0;
         entry->have_prep_stmt = false;
         entry->have_error = false;
+        entry->reserved_aux_conns = 0;
+        entry->snapshot_id = NULL;
     }
+    return entry;
+}
+
+/*
+ * Get a PGconn which can be used to execute queries on the remote PostgreSQL
+ * server with the user's authorization.  A new connection is established
+ * if we don't already have a suitable one, and a transaction is opened at
+ * the right subtransaction nesting depth if we didn't do that already.
+ *
+ * will_prep_stmt must be true if caller intends to create any prepared
+ * statements.  Since those don't go away automatically at transaction end
+ * (not even on error), we need this flag to cue manual cleanup.
+ *
+ * If parallel is true, create a new auxiliary connection to be used for a
+ * parallel query. The number of auxiliary connections is assumed to be
+ * limited by the caller using ReserveParallelConnection().
+ *
+ * XXX Note that caching connections theoretically requires a mechanism to
+ * detect change of FDW objects to invalidate already established connections.
+ * We could manage that by watching for invalidation events on the relevant
+ * syscaches.  For the moment, though, it's not clear that this would really
+ * be useful and not mere pedantry.  We could not flush any active connections
+ * mid-transaction anyway.
+ */
+PGconn *
+GetConnection(ForeignServer *server, UserMapping *user,
+              bool will_prep_stmt, bool parallel)
+{
+    ConnCacheEntry *entry;
+    PGconn        *retconn;
+
+    Assert(!(will_prep_stmt && parallel));
+
+    entry = search_cache_entry(server, user);
+
+    /* Set flag that we did GetConnection during the current transaction */
+    xact_got_connection = true;
+
     /*
      * We don't check the health of cached connection here, because it would
      * require some overhead.  Broken connection will be detected when the
 
@@ -160,20 +187,102 @@ GetConnection(ForeignServer *server, UserMapping *user,
         entry->xact_depth = 0;    /* just to be sure */
         entry->have_prep_stmt = false;
         entry->have_error = false;
+        entry->snapshot_id = NULL;
         entry->conn = connect_pg_server(server, user);
         elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
              entry->conn, server->servername);
 
+    }
-    /*
-     * Start a new transaction or subtransaction if needed.
-     */
     begin_remote_xact(entry);
 
     /* Remember if caller will prepare statements */
     entry->have_prep_stmt |= will_prep_stmt;
-    return entry->conn;
+    retconn = entry->conn;
+    if (parallel)
+    {
+        PGconn        *conn;
+        PGresult    *res;
+        char         sql[64];
+        MemoryContext oldcontext;
+
+        /* Parallel query needs exported snapshot. */
+        if (entry->snapshot_id == NULL)
+            return NULL;
+
+        /* Make a new connection and set it up. */
+        conn = connect_pg_server(server, user);
+        if (IsolationIsSerializable())
+            strncpy(sql, "START TRANSACTION ISOLATION LEVEL SERIALIZABLE", 64);
+        else
+            strncpy(sql, "START TRANSACTION ISOLATION LEVEL REPEATABLE READ", 64);
+        res = PQexec(conn, sql);
+        if (PQresultStatus(res) != PGRES_COMMAND_OK)
+            pgfdw_report_error(ERROR, res, conn, true, sql);
+        PQclear(res);
+
+        /*
+         * Snapshot setup for parallel query. If the snapshot id could be
+         * obtained from this server, importing it should succeed on the
+         * same server.
+         */
+        snprintf(sql, 64, "SET TRANSACTION SNAPSHOT \'%s\'", entry->snapshot_id);
+        res = PQexec(conn, sql);
+        if (PQresultStatus(res) != PGRES_COMMAND_OK)
+            pgfdw_report_error(ERROR, res, conn, true, sql);
+        PQclear(res);
+
+        /* This list should be in the same context with connection cache */
+        oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
+        entry->extraconns = lappend(entry->extraconns, conn);
+        MemoryContextSwitchTo(oldcontext);
+        retconn = conn;
+    }
+
+    elog(DEBUG3, "Get %s connection (parallels %d): %p",
+         parallel ? "parallel" : "base", list_length(entry->extraconns),
+         retconn);
+
+    return retconn;
+}
+
+
+/*
+ * Reserve room for an auxiliary (parallel) connection, up to aux_conn_limit.
+ */
+bool
+ReserveParallelConnection(ForeignServer *server, UserMapping *user,
+                          int aux_conn_limit)
+{
+    ConnCacheEntry *entry;
+
+    if (!user)
+        return false;
+
+    entry = search_cache_entry(server, user);
+
+    if (entry->reserved_aux_conns >= aux_conn_limit)
+        return false;
+
+    entry->reserved_aux_conns++;
+    return true;
+}
+
+/*
+ * Reset reserved number of parallel connections
+ */
+void
+ResetReservedParallelConnections(ForeignServer *server, UserMapping *user)
+{
+    ConnCacheEntry *entry;
+
+    if (!user)
+        return;
+
+    entry = search_cache_entry(server, user);
+
+    entry->reserved_aux_conns = 0;
+    return;
+}
 
 /*
@@ -378,6 +487,7 @@ begin_remote_xact(ConnCacheEntry *entry)
     if (entry->xact_depth <= 0)
     {
         const char *sql;
+        PGresult   *res;
 
         elog(DEBUG3, "starting remote transaction on connection %p",
              entry->conn);
@@ -388,6 +498,30 @@ begin_remote_xact(ConnCacheEntry *entry)
             sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
         do_sql_command(entry->conn, sql);
         entry->xact_depth = 1;
 
+
+        /* 
+         * To avoid an error on the remote server, check beforehand whether
+         * pg_export_snapshot() is available there.
+         */
+        sql = "SELECT count(*) FROM pg_proc p JOIN pg_namespace n ON "
+                     "(n.oid = p.pronamespace AND n.nspname = 'pg_catalog')"
+            "WHERE p.proname = 'pg_export_snapshot' AND pronargs = 0";
+        res = PQexec(entry->conn, sql);
+        if (PQresultStatus(res) != PGRES_TUPLES_OK)
+            pgfdw_report_error(ERROR, res, entry->conn, true, sql);
+        if (strcmp(PQgetvalue(res, 0, 0), "1") == 0)
+        {
+            /* Get transaction snapshot if we can */
+            PQclear(res);
+            sql = "SELECT pg_export_snapshot()";
+            res = PQexec(entry->conn, sql);
+            if (PQresultStatus(res) != PGRES_TUPLES_OK)
+                pgfdw_report_error(ERROR, res, entry->conn, true, sql);
+            entry->snapshot_id = strdup(PQgetvalue(res, 0, 0));
+            PQclear(res);
+        }
+        else
+            PQclear(res);
     }
 
     /*
@@ -406,16 +540,92 @@ begin_remote_xact(ConnCacheEntry *entry)
 }
 
 /*
- * Release connection reference count created by calling GetConnection.
+ * Close all auxiliary connections if any.
+ */
+static void
+ReleaseAllAuxConnections(ConnCacheEntry *entry)
+{
+    ListCell *lc;
+
+    foreach(lc, entry->extraconns)
+    {
+        PGconn *c = (PGconn *) lfirst(lc);
+        PQfinish(c);
+    }
+    list_free(entry->extraconns);
+    entry->extraconns = NIL;
+}
+
+/*
+ * Release a connection if required. All connections but the base connection
+ * should be released immediately.
  */
 void
-ReleaseConnection(PGconn *conn)
+ReleaseConnection(ForeignServer *server, UserMapping *user, PGconn *conn)
 {
+    ConnCacheEntry *entry;
+    ConnCacheKey key;
+
+    /* Clean up current asynchronous query if any */
+    while (PQtransactionStatus(conn) == PQTRANS_ACTIVE)
+    {
+        PGresult *res = PQgetResult(conn);
+        if (res)
+            PQclear(res);
+    }        
+
+    /* Create hash key for the entry.  Assume no pad bytes in key struct */
+    key.serverid = server->serverid;
+    key.userid = user->userid;
+    /*
-     * Currently, we don't actually track connection references because all
-     * cleanup is managed on a transaction or subtransaction basis instead. So
-     * there's nothing to do here.
+     * Find cached entry for the connection.
      */
+    entry = hash_search(ConnectionHash, &key, HASH_FIND, NULL);
+
+    if (!entry)
+        elog(WARNING, "Inconsistent connection release in postgres_fdw.");
+    else
+    {
+        if (entry->conn == conn)
+        {
+            /*
+             * Don't reset the reserved number before auxiliary connections
+             * have actually been made, so check the list before closing it.
+             */
+            bool        had_aux_conns = (entry->extraconns != NIL);
+
+            ReleaseAllAuxConnections(entry);
+
+            if (had_aux_conns)
+                entry->reserved_aux_conns = 0;
+        }
+        else
+        {
+            ListCell *lc;
+            PGconn   *c = NULL;
+
+            /* Find the connection in the auxiliary connection list */
+            foreach(lc, entry->extraconns)
+            {
+                c = (PGconn *) lfirst(lc);
+                if (conn == c) break;
+            }
+
+            if (!lc)
+            {
+                /* XXXX: This is basically a bug, but simply ignore it... */
+                elog(WARNING, "Tried to release an unknown connection. Ignoring it.");
+            }
+            else
+            {
+                /* Close the found connection and remove it from the list */
+                do_sql_command(c, "COMMIT TRANSACTION");
+                PQfinish(c);
+                elog(DEBUG3, "Parallel connection closed: %p", c);
+                entry->extraconns = list_delete_ptr(entry->extraconns, c);
+                if (entry->reserved_aux_conns > 0) 
+                    entry->reserved_aux_conns--;
+            }
+        }
+    }
 }
 
 /*
@@ -571,6 +781,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
                         res = PQexec(entry->conn, "DEALLOCATE ALL");
                         PQclear(res);
                     }
 
+                    entry->snapshot_id = NULL;
                     entry->have_prep_stmt = false;
                     entry->have_error = false;
                     break;
 
@@ -612,6 +823,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
                             res = PQexec(entry->conn, "DEALLOCATE ALL");
                             PQclear(res);
                         }
+                        entry->snapshot_id = NULL;
                         entry->have_prep_stmt = false;
                         entry->have_error = false;
                     }
 
@@ -619,6 +831,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
             }
         }
 
+        ReleaseAllAuxConnections(entry);
 
         /* Reset state to show we're out of a transaction */
         entry->xact_depth = 0;
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index 65e7b89..5323d10 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -150,6 +150,7 @@ InitPgFdwOptions(void)
         /* cost factors */
         {"fdw_startup_cost", ForeignServerRelationId, false},
         {"fdw_tuple_cost", ForeignServerRelationId, false},
+        {"max_aux_connections", ForeignServerRelationId, false},
         /* updatable is available on both server and table */
         {"updatable", ForeignServerRelationId, false},
         {"updatable", ForeignTableRelationId, false},
 
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 4c49776..3fd60e1 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -76,7 +76,8 @@ typedef struct PgFdwRelationInfo
     /* Cached catalog information. */
     ForeignTable *table;
     ForeignServer *server;
-    UserMapping *user;            /* only set in use_remote_estimate mode */
+    UserMapping  *user;            /* only set in use_remote_estimate mode */
+    bool          parallel;        /* true if this rel can be scanned in parallel */
 } PgFdwRelationInfo;
 
 /*
@@ -136,6 +137,10 @@ typedef struct PgFdwScanState
     /* for remote query execution */
     PGconn       *conn;            /* connection for the scan */
+    ForeignServer *server;        /* the foreign server this scan is based on */
+    UserMapping   *user;        /* the user this scan is done by */
+    bool        parallel;        /* true if this scan is allowed to run in
+                                 * parallel */
     unsigned int cursor_number; /* quasi-unique ID for my cursor */
     bool        cursor_exists;    /* have we created the cursor? */
     int            numParams;        /* number of parameters passed to query */
 
@@ -145,6 +150,7 @@ typedef struct PgFdwScanState
     /* for storing result tuples */
     HeapTuple  *tuples;            /* array of currently-retrieved tuples */
+    int            tupplane;        /* which tuples are currently read */
     int            num_tuples;        /* # of tuples in array */
     int            next_tuple;        /* index of next one to return */
 
@@ -168,6 +174,8 @@ typedef struct PgFdwModifyState
     /* for remote query execution */
     PGconn       *conn;            /* connection for the scan */
     char       *p_name;            /* name of prepared statement, if created */
+    ForeignServer *server;        /* the foreign server this scan is based on */
+    UserMapping   *user;        /* the user this scan is done by */
 
     /* extracted fdw_private data */
     char       *query;            /* text of INSERT/UPDATE/DELETE command */
 
@@ -306,7 +314,7 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
                           EquivalenceClass *ec, EquivalenceMember *em,
                           void *arg);
 static void create_cursor(ForeignScanState *node);
-static void fetch_more_data(ForeignScanState *node);
+static void fetch_more_data(ForeignScanState *node, bool async);
 static void close_cursor(PGconn *conn, unsigned int cursor_number);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
 
@@ -328,6 +336,10 @@ static HeapTuple make_tuple_from_result_row(PGresult *res,
                            MemoryContext temp_context);
 static void conversion_error_callback(void *arg);
 
+/* PQtransactionStatus returns two states that count as idle here */
+#define CONN_IS_IDLE(conn) \
+    (PQtransactionStatus(conn) == PQTRANS_IDLE || \
+     PQtransactionStatus(conn) == PQTRANS_INTRANS)
 
 /*
  * Foreign-data wrapper handler function: return a struct with pointers
@@ -384,6 +396,7 @@ postgresGetForeignRelSize(PlannerInfo *root,
 {
     PgFdwRelationInfo *fpinfo;
     ListCell   *lc;
+    unsigned long max_aux_connections = 0;
 
     /*
      * We use PgFdwRelationInfo to pass various information to subsequent
@@ -414,6 +427,8 @@ postgresGetForeignRelSize(PlannerInfo *root,
             fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL);
         else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
             fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL);
+        else if (strcmp(def->defname, "max_aux_connections") == 0)
+            max_aux_connections = strtol(defGetString(def), NULL, 10);
     }
 
     foreach(lc, fpinfo->table->options)
     {
@@ -442,6 +457,11 @@ postgresGetForeignRelSize(PlannerInfo *root,
     else
         fpinfo->user = NULL;
 
+    /* Check if this foreign relation has an auxiliary connection available. */
+    fpinfo->parallel =
+        ReserveParallelConnection(fpinfo->server, fpinfo->user,
+                                  max_aux_connections);
+
     /*
      * Identify which baserestrictinfo clauses can be sent to the remote
      * server and which can't.
@@ -558,6 +578,7 @@ postgresGetForeignPaths(PlannerInfo *root,
                                    fpinfo->total_cost,
                                    NIL, /* no pathkeys */
                                    NULL,        /* no outer rel either */
+                                   fpinfo->parallel,
                                    NIL);        /* no fdw_private list */
     add_path(baserel, (Path *) path);
 
@@ -725,6 +746,7 @@ postgresGetForeignPaths(PlannerInfo *root,
                                        total_cost,
                                        NIL,        /* no pathkeys */
                                        param_info->ppi_req_outer,
+                                       fpinfo->parallel,
                                        NIL);    /* no fdw_private list */
         add_path(baserel, (Path *) path);
     }
 
@@ -752,6 +774,9 @@ postgresGetForeignPlan(PlannerInfo *root,
     StringInfoData sql;
     ListCell   *lc;
 
+    /* reset reserved number of auxiliary connections */
+    ResetReservedParallelConnections(fpinfo->server, fpinfo->user);
+
     /*
      * Separate the scan_clauses into those that can be executed remotely and
      * those that can't.  baserestrictinfo clauses that were previously
 
@@ -866,6 +891,7 @@ postgresGetForeignPlan(PlannerInfo *root,
     return make_foreignscan(tlist,
                             local_exprs,
                             scan_relid,
+                            best_path->path.parallel,
                             params_list,
                             fdw_private);
 }
 
@@ -888,6 +914,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
     int            numParams;
     int            i;
     ListCell   *lc;
+    bool        parallel = fsplan->scan.parallel;
 
     /*
      * Do nothing in EXPLAIN (no ANALYZE) case.  node->fdw_state stays NULL.
 
@@ -918,7 +945,20 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
      * Get connection to the foreign server.  Connection manager will
      * establish new connection if necessary.
      */
 
-    fsstate->conn = GetConnection(server, user, false);
+    fsstate->conn = GetConnection(server, user, false, parallel);
+    if (parallel && fsstate->conn == NULL)
+    {
+        /*
+         * Somehow no more auxiliary connections are available, so this
+         * relation is scanned sequentially over the base connection.
+         */
+        elog(WARNING, "Failed to get parallel connection for foreign relation %d. Falling back to the base connection.",
+             RelationGetRelid(fsstate->rel));
+        parallel = false;
+        fsstate->conn = GetConnection(server, user, false, false);
+    }
+    fsstate->server = server;
+    fsstate->user = user;
 
     /* Assign a unique ID for my cursor */
     fsstate->cursor_number = GetCursorNumber(fsstate->conn);
@@ -981,6 +1021,17 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
         fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *));
     else
         fsstate->param_values = NULL;
 
+
+    fsstate->parallel = parallel;
+    if (parallel)
+    {
+        /*
+         * This connection allows asynchronous query execution, so start the
+         * query right away.
+         */
+        create_cursor(node);
+        fetch_more_data(node, true);
+    }
 }
 
 /*
@@ -1008,7 +1059,7 @@ postgresIterateForeignScan(ForeignScanState *node)
     {
         /* No point in another fetch if we already detected EOF, though. */
         if (!fsstate->eof_reached)
-            fetch_more_data(node);
+            fetch_more_data(node, fsstate->parallel);
 
         /* If we didn't get any tuples, must be end of data. */
         if (fsstate->next_tuple >= fsstate->num_tuples)
             return ExecClearTuple(slot);
 
@@ -1099,7 +1150,7 @@ postgresEndForeignScan(ForeignScanState *node)
         close_cursor(fsstate->conn, fsstate->cursor_number);
 
     /* Release remote connection */
-    ReleaseConnection(fsstate->conn);
+    ReleaseConnection(fsstate->server, fsstate->user, fsstate->conn);
     fsstate->conn = NULL;
 
     /* MemoryContexts will be deleted automatically. */
 
@@ -1301,7 +1352,9 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
     user = GetUserMapping(userid, server->serverid);
 
     /* Open connection; report that we'll create a prepared statement. */
-    fmstate->conn = GetConnection(server, user, true);
+    fmstate->conn = GetConnection(server, user, true, false);
+    fmstate->server = server;
+    fmstate->user = user;
     fmstate->p_name = NULL;        /* prepared statement not made yet */
 
     /* Deconstruct fdw_private data. */
 
@@ -1599,7 +1652,7 @@ postgresEndForeignModify(EState *estate,
     }
 
     /* Release remote connection */
-    ReleaseConnection(fmstate->conn);
+    ReleaseConnection(fmstate->server, fmstate->user, fmstate->conn);
     fmstate->conn = NULL;
 }
@@ -1751,10 +1804,11 @@ estimate_path_cost_size(PlannerInfo *root,
                               (fpinfo->remote_conds == NIL), NULL);
 
         /* Get the remote estimate */
-        conn = GetConnection(fpinfo->server, fpinfo->user, false);
+        conn = GetConnection(fpinfo->server, fpinfo->user, false, false);
         get_remote_estimate(sql.data, conn, &rows, &width,
                             &startup_cost, &total_cost);
-        ReleaseConnection(conn);
+        ReleaseConnection(fpinfo->server, fpinfo->user, conn);
 
         retrieved_rows = rows;
@@ -2001,10 +2055,12 @@ create_cursor(ForeignScanState *node)
 }
 
 /*
- * Fetch some more rows from the node's cursor.
+ * Fetch some more rows from the node's cursor. async indicates that this
+ * query runs on a dedicated connection and has requested asynchronous query
+ * execution.
  */
 static void
-fetch_more_data(ForeignScanState *node)
+fetch_more_data(ForeignScanState *node, bool async)
 {
     PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
     PGresult   *volatile res = NULL;
 
@@ -2026,6 +2082,7 @@ fetch_more_data(ForeignScanState *node)
         int            fetch_size;
         int            numrows;
         int            i;
+        bool        skip_get_result = false;
 
         /* The fetch size is arbitrary, but shouldn't be enormous. */
         fetch_size = 100;
 
@@ -2033,36 +2090,81 @@ fetch_more_data(ForeignScanState *node)
         snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
                  fetch_size, fsstate->cursor_number);
 
-        res = PQexec(conn, sql);
-        /* On error, report the original query, not the FETCH. */
-        if (PQresultStatus(res) != PGRES_TUPLES_OK)
-            pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+        if (async && CONN_IS_IDLE(conn))
+        {
+            if (!PQsendQuery(conn, sql))
+                pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
-        /* Convert the data into HeapTuples */
-        numrows = PQntuples(res);
-        fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
-        fsstate->num_tuples = numrows;
-        fsstate->next_tuple = 0;
+            /*
+             * IDLE connection state on async query means that this is the
+             * first call for this query, so return immediately. See
+             * postgresBeginForeignScan()
+             */
+            skip_get_result = true;
+        }
-        for (i = 0; i < numrows; i++)
+        if (!skip_get_result)
+        {
-            fsstate->tuples[i] =
-                make_tuple_from_result_row(res, i,
-                                           fsstate->rel,
-                                           fsstate->attinmeta,
-                                           fsstate->retrieved_attrs,
-                                           fsstate->temp_cxt);
-        }
+            if (async)
+            {
+                res = PQgetResult(conn);
+                if (PQntuples(res) == fetch_size)
+                {
+                    /*
+                     * Connection state doesn't go to IDLE even if all data
+                     * has been sent to client for asynchronous query. One
+                     * more PQgetResult() is needed to reset the state to
+                     * IDLE.  See PQexecFinish() for details.
+                     */
+                    if (PQgetResult(conn) != NULL)
+                        elog(ERROR, "Connection status error.");
+                }
+            }
+            else
+                res = PQexec(conn, sql);
-        /* Update fetch_ct_2 */
-        if (fsstate->fetch_ct_2 < 2)
-            fsstate->fetch_ct_2++;
+            /* On error, report the original query, not the FETCH. */
+            if (PQresultStatus(res) != PGRES_TUPLES_OK)
+                pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
-        /* Must be EOF if we didn't get as many tuples as we asked for. */
-        fsstate->eof_reached = (numrows < fetch_size);
+            /* Convert the data into HeapTuples */
+            numrows = PQntuples(res);
+            fsstate->tuples =
+                    (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+            fsstate->num_tuples = numrows;
+            fsstate->next_tuple = 0;
-        PQclear(res);
-        res = NULL;
+            for (i = 0; i < numrows; i++)
+            {
+                fsstate->tuples[i] =
+                    make_tuple_from_result_row(res, i,
+                                               fsstate->rel,
+                                               fsstate->attinmeta,
+                                               fsstate->retrieved_attrs,
+                                               fsstate->temp_cxt);
+            }
+
+            /* Update fetch_ct_2 */
+            if (fsstate->fetch_ct_2 < 2)
+                fsstate->fetch_ct_2++;
+
+            /* Must be EOF if we didn't get as many tuples as we asked for. */
+            fsstate->eof_reached = (numrows < fetch_size);
+
+            PQclear(res);
+            res = NULL;
+        }
+
+        if (async && !skip_get_result && !fsstate->eof_reached)
+        {
+            /*
+             * We can immediately request the next bunch of tuples if we're on
+             * asynchronous connection.
+             */
+            if (!PQsendQuery(conn, sql))
+                pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+        }
     }
     PG_CATCH();
     {
@@ -2315,8 +2417,7 @@ postgresAnalyzeForeignTable(Relation relation,
     table = GetForeignTable(RelationGetRelid(relation));
     server = GetForeignServer(table->serverid);
     user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
 
-    conn = GetConnection(server, user, false);
-
+    conn = GetConnection(server, user, false, false);
 
     /*
      * Construct command to get page count for relation.
      */
 
@@ -2345,7 +2446,7 @@ postgresAnalyzeForeignTable(Relation relation,
     }
     PG_END_TRY();
 
-    ReleaseConnection(conn);
+    ReleaseConnection(server, user, conn);
 
     return true;
 }
@@ -2407,7 +2508,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
     table = GetForeignTable(RelationGetRelid(relation));
     server = GetForeignServer(table->serverid);
     user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
 
-    conn = GetConnection(server, user, false);
+    conn = GetConnection(server, user, false, false);
 
     /*
      * Construct cursor that retrieves whole rows from remote.
@@ -2479,7 +2580,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
     }
     PG_END_TRY();
 
-    ReleaseConnection(conn);
+    ReleaseConnection(server, user, conn);
 
     /* We assume that we have no dead tuple. */
     *totaldeadrows = 0.0;
@@ -2609,7 +2710,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
      */
     server = GetForeignServer(serverOid);
     mapping = GetUserMapping(GetUserId(), server->serverid);
 
-    conn = GetConnection(server, mapping, false);
+    conn = GetConnection(server, mapping, false, false);
 
     /* Don't attempt to import collation if remote server hasn't got it */
     if (PQserverVersion(conn) < 90100)
 
@@ -2826,7 +2927,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
     }
     PG_END_TRY();
 
-    ReleaseConnection(conn);
+    ReleaseConnection(server, mapping, conn);
 
     return commands;
 }
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 94eadae..387b2ff 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -25,10 +25,16 @@ extern int    set_transmission_modes(void);
 extern void reset_transmission_modes(int nestlevel);
 
 /* in connection.c */
+extern bool ReserveParallelConnection(ForeignServer *server, UserMapping *user,
+                                      int aux_conn_limit);
+extern void ResetReservedParallelConnections(ForeignServer *server,
+                                             UserMapping *user);
 extern PGconn *GetConnection(ForeignServer *server, UserMapping *user,
-              bool will_prep_stmt);
-extern void ReleaseConnection(PGconn *conn);
+                             bool will_prep_stmt, bool parallel);
+extern void ReleaseConnection(ForeignServer *server, UserMapping *user,
+                              PGconn *conn);
 extern unsigned int GetCursorNumber(PGconn *conn);
+extern int GetServerOrdinate(Oid serverid);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
 extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
                    bool clear, const char *sql);
diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c
index 5a4d5aa..7a12542 100644
--- a/contrib/file_fdw/file_fdw.c
+++ b/contrib/file_fdw/file_fdw.c
@@ -524,6 +524,7 @@ fileGetForeignPaths(PlannerInfo *root,
                                      total_cost,
                                      NIL,        /* no pathkeys */
                                      NULL,        /* no outer rel either */
+                                     false,
                                      coptions));
 
     /*
@@ -560,6 +561,7 @@ fileGetForeignPlan(PlannerInfo *root,
     return make_foreignscan(tlist,
                             scan_clauses,
                             scan_relid,
+                            best_path->path.parallel,
                             NIL,    /* no expressions to evaluate */
                             best_path->fdw_private);
 }
-- DROP SERVER IF EXISTS pgs1 CASCADE;
-- DROP VIEW IF EXISTS v CASCADE;
-- DROP TABLE IF EXISTS t CASCADE;

CREATE SERVER pgs1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '/tmp', dbname 'postgres', use_remote_estimate 'true', max_aux_connections '3');
 

CREATE USER MAPPING FOR CURRENT_USER SERVER pgs1;

CREATE TABLE t (a int, b int, c text);
ALTER TABLE t ALTER COLUMN c SET STORAGE PLAIN;
INSERT INTO t (SELECT random() * 10000, random() * 10000, repeat('X', (random() * 1000)::int) FROM generate_series(0, 2999));

CREATE VIEW v AS SELECT a.a, a.b, a.c, b.a AS a2, b.b AS b2, b.c AS c2 FROM t a, t b WHERE a.b + b.b = 1000 ORDER BY a.b LIMIT 10;
 

CREATE FOREIGN TABLE fts1 (a int, b int, c text) SERVER pgs1 OPTIONS (table_name 't');
CREATE FOREIGN TABLE fvs1 (a int, b int, c text, a2 int, b2 int, c2 text) SERVER pgs1 OPTIONS (table_name 'v');

SET parallel_ratio_threshold to 1.0;

EXPLAIN ANALYZE SELECT a.a, a.b, b.c FROM fvs1 a join fvs1 b on (a.a = b.a);

SET parallel_ratio_threshold to 0.0;

EXPLAIN ANALYZE SELECT a.a, a.b, b.c FROM fvs1 a join fvs1 b on (a.a = b.a);

Re: Introducing coarse grain parallelism by postgres_fdw.

From
Kyotaro HORIGUCHI
Date:
Hello,

> Hello, this is the new version which is complete to some extent
> of parallelism based on postgres_fdw.
> 
> This compares the costs for parallel and non-parallel execution
> and choose parallel one if it is faster by some extent specified
> by GUCs. The attached files are,
> 
>  0001_parallel_exec_planning_v0.patch:
>    - PostgreSQL body stuff for parallel execution planning.
> 
>  0002_enable_postgres_fdw_to_run_in_parallel_v0.patch:
>    - postgres_fdw parallelization.
> 
>  0003_file_fdw_changes_to_avoid_error.patch:
>    - error avoiding stuff for file_fdw (not necessary for this patch)
> 
>  env.sql:
>    - simple test script to try this patch.
> 
> =====
> 
>  - planner stuff to handle cost of parallel execution. Including
>    indication of parallel execution.
> 
>  - GUCs to control how easy to go parallel.
> 
>    parallel_cost_threshold is the threshold of path total cost
>    where to enable parallel execution.
> 
>    parallel_ratio_threshold is the threshold of the ratio of
>    parallel cost to non-parallel cost where to choose the
>    parallel path.
> 
>  - postgres_fdw which can run in multiple sessions using snapshot
>    export and fetches in parallel for foreign scans on dedicated
>    connections.

For now, though, the effect of asynchronous execution of FETCHes
is not accounted for during planning.

>    foreign server has a new option 'max_aux_connections', which
>    limits the number of connections for parallel execution per
>    (server, user) pairs.
> 
>  - change file_fdw to follow the changes of planner stuff.
> 
> 
> With the patch attached, the attached sql script shows the
> following result (after some line breaks are added).
> 
> postgres=# EXPLAIN ANALYZE SELECT a.a, a.b, b.c
>            FROM fvs1 a join fvs1_2 b on (a.a = b.a);
>                                 QUERY PLAN
> ----------------------------------------------------------------------------
> Hash Join  (cost=9573392.96..9573393.34 rows=1 width=40 parallel)
>            (actual time=2213.400..2213.407 rows=12 loops=1)
>  Hash Cond: (a.a = b.a)
>  ->  Foreign Scan on fvs1 a
>            (cost=9573392.96..9573393.29 rows=10 width=8 parallel)
>            (actual time=2199.992..2199.993 rows=10 loops=1)
>  ->  Hash  (cost=9573393.29..9573393.29 rows=10 width=36)
>            (actual time=13.388..13.388 rows=10 loops=1)
>        Buckets: 1024  Batches: 1  Memory Usage: 6kB
>        ->  Foreign Scan on fvs1_2 b 
>                    (cost=9573392.96..9573393.29 rows=10 width=36 parallel)
>                    (actual time=13.376..13.379 rows=10 loops=1)
>  Planning time: 4.761 ms
>  Execution time: 2227.462 ms
> (8 rows)
> postgres=# SET parallel_ratio_threshold to 0.0;
> postgres=# EXPLAIN ANALYZE SELECT a.a, a.b, b.c
>            FROM fvs1 a join fvs1 b on (a.a = b.a);
>                                 QUERY PLAN
> ------------------------------------------------------------------------------
>  Hash Join  (cost=318084.32..318084.69 rows=1 width=40)
>             (actual time=4302.913..4302.928 rows=12 loops=1)
>    Hash Cond: (a.a = b.a)
>    ->  Foreign Scan on fvs1 a  (cost=159041.93..159042.26 rows=10 width=8)
>                                (actual time=2122.989..2122.992 rows=10 loops=1)
>    ->  Hash  (cost=159042.26..159042.26 rows=10 width=500)
>              (actual time=2179.900..2179.900 rows=10 loops=1)
>          Buckets: 1024  Batches: 1  Memory Usage: 6kB
>          ->  Foreign Scan on fvs1 b
>                    (cost=159041.93..159042.26 rows=10 width=500)
>                    (actual time=2179.856..2179.864 rows=10 loops=1)
>  Planning time: 5.085 ms
>  Execution time: 4303.728 ms
> (8 rows)
> 
> Where, "parallel" indicates that the node includes nodes run in
> parallel. The latter EXPLAIN shows the result when parallel
> execution is inhibited.
> 
> For lack of time, sorry that the details of this patch are
> coming later.
> 
> Are there any suggestions or opinions?

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center



Re: Introducing coarse grain parallelism by postgres_fdw.

From
Ashutosh Bapat
Date:
Hi Kyotaro,
I looked at the patches and felt that the approach taken here is too intrusive, considering that the feature is only for foreign scans.

There are quite a few members added to the generic Path and Plan structures, whose use is induced only through foreign scans. Each path now stores two sets of costs, one with parallelism and one without. The parallel values will make sense only when the plan tree contains a foreign scan that uses parallelism. So those costs are maintained unnecessarily, or the memory for those members is wasted, in most cases, where the tables involved are not foreign. Also, not many foreign tables will be able to use the parallelism, e.g. file_fdw. That's my opinion, though; I would like to hear from others.

Instead, an FDW which can use parallelism can add two paths, one with and one without parallelism, with appropriate costs, and let the logic that chooses the cheapest path take care of the actual choice. In fact, I thought parallelism would always be faster than the non-parallel alternative, except when the foreign server is too heavily loaded. But we won't be able to check that anyway. Can you point out a case where parallelism may not win over serial execution?
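To make that concrete, here is a minimal sketch of the suggestion, assuming the extended create_foreignscan_path() signature from this patch; the callback name and the cost numbers are hypothetical placeholders, not real estimates:

/* Hypothetical GetForeignPaths callback: add both variants and let
 * add_path() keep whichever costs out cheaper. */
static void
exampleGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel,
                       Oid foreigntableid)
{
    /* The ordinary serial foreign scan. */
    add_path(baserel, (Path *)
             create_foreignscan_path(root, baserel, baserel->rows,
                                     100.0,   /* placeholder startup cost */
                                     1000.0,  /* placeholder total cost */
                                     NIL,     /* no pathkeys */
                                     NULL,    /* no required_outer */
                                     false,   /* not parallel */
                                     NIL));   /* no fdw_private */

    /* The same scan on an auxiliary connection: higher startup cost
     * (extra connection, snapshot import), lower total cost because
     * the fetches overlap with other work. */
    add_path(baserel, (Path *)
             create_foreignscan_path(root, baserel, baserel->rows,
                                     150.0, 600.0,
                                     NIL, NULL,
                                     true,    /* parallel */
                                     NIL));
}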

BTW, the name parallelism seems to be misleading here. All it will be able to do is fire the queries (or data fetch requests) asynchronously. So we might want to change the naming appropriately.



--
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company

Re: Introducing coarse grain parallelism by postgres_fdw.

From
Robert Haas
Date:
On Tue, Aug 5, 2014 at 7:05 AM, Ashutosh Bapat
<ashutosh.bapat@enterprisedb.com> wrote:
> There are quite a few members added to the generic Path, Plan structures,
> whose use is is induced only through foreign scans. Each path now stores two
> sets of costs, one with parallelism and one without. The parallel values
> will make sense only when there is a foreign scan, which uses parallelism,
> in the plan tree. So, those costs are maintained unnecessarily or the memory
> for those members is wasted in most of the cases, where the tables involved
> are not foreign.

Yeah, I don't think that's going to be acceptable.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: Introducing coarse grain parallelism by postgres_fdw.

From
Kyotaro HORIGUCHI
Date:
Hi, thank you for the comment.

> Hi Kyotaro,
> I looked at the patches and felt that the approach taken here is too
> intrusive, considering that the feature is only for foreign scans.

I agree with you that it's only for foreign scans, but I regard
it as an example of parallel execution planning.

> There are quite a few members added to the generic Path, Plan structures,
> whose use is induced only through foreign scans. Each path now stores
> two sets of costs, one with parallelism and one without. The parallel
> values will make sense only when there is a foreign scan, which uses
> parallelism, in the plan tree. So, those costs are maintained unnecessarily
> or the memory for those members is wasted in most of the cases, where the
> tables involved are not foreign. Also, not many foreign tables will be able
> to use the parallelism, e.g. file_fdw. Although, that's my opinion; I would
> like to hear from others.

I intended to discuss what estimation and planning for parallel
execution (not limited to foreign scans) would look like.
Background workers would be able to take on executing some
portion of the path tree in 'parallel'. The postgres_fdw in this
patch is simply one case of planning parallel execution.
Although, as you can see, it only chooses whether to go parallel
for a path constructed without regard to parallel execution,
since considering every possible alternative parallel path would
cost too much.

Limiting this discussion to parallel scans, the overall gain from
multiple simultaneous scans distributed in the path/plan tree is
not known until cost counting is done up to the root node (more
precisely, up to their common parent). This patch foolishly does
a bucket brigade of the parallel cost up to the root node, but
there should be a smarter way to shortcut that: for example,
simply pick up the parallelizable nodes by scanning the completed
path/plan tree, calculate the probably-eliminable costs from
them, then subtract that from (or compare it to) the total
non-parallel cost. That might be more acceptable to everyone than
the current implementation.
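
To illustrate, a rough sketch of such a scan might look like the
following. This is not part of the posted patch:
is_parallelizable_scan() is a hypothetical helper, and counting
each such node's whole startup cost as the overlappable portion
is only a first approximation.

#include "postgres.h"
#include "nodes/plannodes.h"

/*
 * Hypothetical: true for nodes whose startup can overlap with
 * siblings, e.g. a ForeignScan with a connection of its own.
 */
extern bool is_parallelizable_scan(Plan *plan);

/*
 * Walk a completed plan tree and sum the startup costs of
 * parallelizable scans.  Overlapping their startup is where this
 * patch wins, so the sum approximates the cost that simultaneous
 * execution could eliminate; it can then be subtracted from (or
 * compared to) the plan's total non-parallel cost.  A full
 * version would also walk special children such as Append
 * subplans.
 */
static Cost
estimate_eliminable_cost(Plan *plan)
{
    Cost    eliminable = 0.0;

    if (plan == NULL)
        return 0.0;

    if (is_parallelizable_scan(plan))
        eliminable += plan->startup_cost;

    eliminable += estimate_eliminable_cost(plan->lefttree);
    eliminable += estimate_eliminable_cost(plan->righttree);

    return eliminable;
}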

> Instead, an FDW which can use parallelism can add two paths, one with and
> one without parallelism, with appropriate costs, and let the logic choosing
> the cheapest path take care of the actual choice. In fact, I thought
> parallelism would always be faster than the non-parallel one, except when
> the foreign server is too heavily loaded. But we won't be able to check that
> anyway. Can you point out a case where parallelism may not win over
> serial execution?

It always wins against serial execution if parallel execution can
be launched at no extra cost. But it actually consumes extra
resources, so I thought parallel execution should be curbed when
the gain is small. That is what the two GUCs added by this patch
and choose_parallel_scans() do, although in a non-automated way.
The overloading issue is not confined to parallel execution, but
it will surely be more severe here since it is less visible to
and controllable by users. In any case, it would ultimately have
to be left to manual configuration.
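
For illustration, the check could look like the following. Only
the names parallel_ratio_threshold and choose_parallel_scans()
come from the patch; the body is a guessed shape. Judging from
the EXPLAIN above, setting the GUC to 0.0 inhibits parallelism,
so it plausibly caps the parallel/serial cost ratio.

#include "postgres.h"
#include "nodes/nodes.h"

/* GUC from the patch; the default shown here is made up. */
static double parallel_ratio_threshold = 0.5;

/*
 * Assumed shape of the test inside choose_parallel_scans():
 * go parallel only when the parallel cost is at most
 * threshold times the serial cost, so a threshold of 0.0
 * disables parallelism entirely.
 */
static bool
parallel_is_worthwhile(Cost serial_cost, Cost parallel_cost)
{
    if (serial_cost <= 0.0)
        return false;
    return parallel_cost <= serial_cost * parallel_ratio_threshold;
}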

> BTW, the name parallelism seems to be misleading here. All it will be able
> to do is fire the queries (or data fetch requests) asynchronously. So, we
> might want to change the naming appropriately.

That is accurate as to what I actually did to postgres_fdw. But
unless we allow all the intermediate tuples from child execution
nodes running in parallel to pile up in memory without
restriction, I suppose every 'parallel' execution is a kind of
this 'asynchronous' startup/fetching. As for postgres_fdw, it
would look more 'parallel' (and perhaps be more efficient) if it
processed queries using libpq's single-row mode instead of a
cursor, though similar processing still takes place at the
system-call level even in that case.
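
For reference, a minimal standalone client using libpq's
single-row mode would look like this (connection string, table,
and columns are placeholders): the query is sent once and rows
stream back one PGresult at a time, with no per-row FETCH round
trip.

#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
    PGconn     *conn = PQconnectdb("dbname=postgres");  /* placeholder */
    PGresult   *res;

    if (PQstatus(conn) != CONNECTION_OK)
    {
        fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
        PQfinish(conn);
        return 1;
    }

    /* Send the query once, then put this result set in single-row mode. */
    if (!PQsendQuery(conn, "SELECT a, b FROM t") ||
        !PQsetSingleRowMode(conn))
    {
        fprintf(stderr, "send failed: %s", PQerrorMessage(conn));
        PQfinish(conn);
        return 1;
    }

    /*
     * Each PGRES_SINGLE_TUPLE result carries one row; the final
     * PGRES_TUPLES_OK result (zero rows) ends the set.
     */
    while ((res = PQgetResult(conn)) != NULL)
    {
        if (PQresultStatus(res) == PGRES_SINGLE_TUPLE)
            printf("a=%s b=%s\n",
                   PQgetvalue(res, 0, 0), PQgetvalue(res, 0, 1));
        PQclear(res);
    }

    PQfinish(conn);
    return 0;
}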


Well, I will try to make a version that does not keep parallel
costs in the plan/path structs and that uses single-row mode for
postgres_fdw. I hope it goes somewhere.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center



Re: Introducing coarse grain parallelism by postgres_fdw.

From
Ashutosh Bapat
Date:



On Fri, Aug 8, 2014 at 8:54 AM, Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote:
> Hi, thank you for the comment.
>
> > Hi Kyotaro,
> > I looked at the patches and felt that the approach taken here is too
> > intrusive, considering that the feature is only for foreign scans.
>
> I agree with you, given that it's only for foreign scans, but I
> regard it as an example of parallel execution planning.
>
> > There are quite a few members added to the generic Path, Plan structures,
> > whose use is induced only through foreign scans. Each path now stores
> > two sets of costs, one with parallelism and one without. The parallel
> > values will make sense only when there is a foreign scan, which uses
> > parallelism, in the plan tree. So, those costs are maintained unnecessarily
> > or the memory for those members is wasted in most of the cases, where the
> > tables involved are not foreign. Also, not many foreign tables will be able
> > to use the parallelism, e.g. file_fdw. Although, that's my opinion; I would
> > like to hear from others.
>
> I intended to discuss what estimation and planning for parallel
> execution (not limited to foreign scans) would look like.
> Background workers would be able to take on executing some
> portion of the path tree in 'parallel'. The postgres_fdw in this
> patch is simply one case of planning parallel execution.
> Although, as you can see, it only chooses whether to go parallel
> for a path constructed without regard to parallel execution,
> since considering every possible alternative parallel path would
> cost too much.
>
> Limiting this discussion to parallel scans, the overall gain from
> multiple simultaneous scans distributed in the path/plan tree is
> not known until cost counting is done up to the root node (more
> precisely, up to their common parent). This patch foolishly does
> a bucket brigade of the parallel cost up to the root node, but
> there should be a smarter way to shortcut that: for example,
> simply pick up the parallelizable nodes by scanning the completed
> path/plan tree, calculate the probably-eliminable costs from
> them, then subtract that from (or compare it to) the total
> non-parallel cost. That might be more acceptable to everyone than
> the current implementation.


Planning for parallel execution would be a much harder problem to
solve. Just to give a glimpse: how many worker backends can be
spawned depends entirely on the load at the time of execution. For
prepared queries, the load can change between planning and
execution, so the number of parallel backends, which determines the
actual execution time and hence the cost, cannot be estimated at
planning time. Mixing that kind of parallelism with FDW parallelism
would make things even more complicated. I think those two problems
should be solved in different ways.
 
> > Instead, an FDW which can use parallelism can add two paths, one with and
> > one without parallelism, with appropriate costs, and let the logic choosing
> > the cheapest path take care of the actual choice. In fact, I thought
> > parallelism would always be faster than the non-parallel one, except when
> > the foreign server is too heavily loaded. But we won't be able to check
> > that anyway. Can you point out a case where parallelism may not win over
> > serial execution?
>
> It always wins against serial execution if parallel execution can
> be launched at no extra cost. But it actually consumes extra
> resources, so I thought parallel execution should be curbed when
> the gain is small. That is what the two GUCs added by this patch
> and choose_parallel_scans() do, although in a non-automated way.
> The overloading issue is not confined to parallel execution, but
> it will surely be more severe here since it is less visible to
> and controllable by users. In any case, it would ultimately have
> to be left to manual configuration.

I am not sure whether the manual control this patch provides is
really effective without understanding the full impact. Do we have
any numbers showing the cases where parallelism is effective and
where it is not, and how those GUCs help choose the effective one?
 

> > BTW, the name parallelism seems to be misleading here. All it will be able
> > to do is fire the queries (or data fetch requests) asynchronously. So, we
> > might want to change the naming appropriately.
>
> That is accurate as to what I actually did to postgres_fdw. But
> unless we allow all the intermediate tuples from child execution
> nodes running in parallel to pile up in memory without
> restriction, I suppose every 'parallel' execution is a kind of
> this 'asynchronous' startup/fetching. As for postgres_fdw, it
> would look more 'parallel' (and perhaps be more efficient) if it
> processed queries using libpq's single-row mode instead of a
> cursor, though similar processing still takes place at the
> system-call level even in that case.


By single-row mode, do you mean executing FETCH for every row? That
wouldn't be efficient, since each row would then incur messaging
costs between the local and foreign servers, which cannot be
neglected, for libpq at least.
 

> Well, I will try to make a version that does not keep parallel
> costs in the plan/path structs and that uses single-row mode for
> postgres_fdw. I hope it goes somewhere.
>
> regards,
>
> --
> Kyotaro Horiguchi
> NTT Open Source Software Center



--
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company