Introducing coarse grain parallelism by postgres_fdw. - Mailing list pgsql-hackers

From Kyotaro HORIGUCHI
Subject Introducing coarse grain parallelism by postgres_fdw.
Date
Msg-id 20140725.173504.267457780.horiguchi.kyotaro@lab.ntt.co.jp
Responses Re: Introducing coarse grain parallelism by postgres_fdw.  (Ashutosh Bapat <ashutosh.bapat@enterprisedb.com>)
List pgsql-hackers
Hello, 

I noticed that postgres_fdw can run in parallel with a very small
change. The attached patch lets scans by postgres_fdw on different
foreign servers run simultaneously. This seems a convenient entry
point to parallel execution.

For the test configuration created by the attached SQL script, it
almost halves the response time, because the remote queries take far
longer to start up than to run. The three foreign tables fvs1, fvs1_2
and fvs2 are defined on the same table, but fvs1 and fvs1_2 are on the
same foreign server pgs1 while fvs2 is on another foreign server pgs2.

=# EXPLAIN (ANALYZE on, COSTS off) SELECT a.a, a.b, b.c FROM fvs1 a join fvs1_2 b on (a.a = b.a);
                              QUERY PLAN
-----------------------------------------------------------------------
 Hash Join (actual time=12083.640..12083.657 rows=16 loops=1)
   Hash Cond: (a.a = b.a)
   ->  Foreign Scan on fvs1 a (actual time=6091.405..6091.407 rows=10 loops=1)
   ->  Hash (actual time=5992.212..5992.212 rows=10 loops=1)
         Buckets: 1024  Batches: 1  Memory Usage: 7kB
         ->  Foreign Scan on fvs1_2 b (actual time=5992.191..5992.198 rows=10 loops=1)
 Execution time: 12085.330 ms
(7 rows)

=# EXPLAIN (ANALYZE on, COSTS off) SELECT a.a, a.b, b.c FROM fvs1 a join fvs2 b on (a.a = b.a);
                              QUERY PLAN
-----------------------------------------------------------------------
 Hash Join (actual time=6325.004..6325.019 rows=16 loops=1)
   Hash Cond: (a.a = b.a)
   ->  Foreign Scan on fvs1 a (actual time=6324.910..6324.913 rows=10 loops=1)
   ->  Hash (actual time=0.073..0.073 rows=10 loops=1)
         Buckets: 1024  Batches: 1  Memory Usage: 7kB
         ->  Foreign Scan on fvs2 b (actual time=0.048..0.052 rows=10 loops=1)
 Execution time: 6327.708 ms
(7 rows)

For comparison, the purely local query is executed as below.

=# EXPLAIN (ANALYZE on, COSTS off) SELECT a.a, a.b, b.c FROM v a join v b on (a.a = b.a);
                              QUERY PLAN
------------------------------------------------------------------------------
 Hash Join (actual time=15757.915..15757.925 rows=16 loops=1)
   Hash Cond: (a.a = b.a)
   ->  Limit (actual time=7795.919..7795.922 rows=10 loops=1)
         ->  Sort (actual time=7795.915..7795.915 rows=10 loops=1)
               ->  Nested Loop (actual time=54.769..7795.618 rows=252 loops=1)
                     ->  Seq Scan on t a (actual time=0.010..2.117 rows=5000 loops=1)
                     ->  Materialize (actual time=0.000..0.358 rows=5000 loops=5000)
                           ->  Seq Scan on t b_1 (actual time=0.004..2.829 rows=5000 ...
   ->  Hash (actual time=7961.969..7961.969 rows=10 loops=1)
         ->  Subquery Scan on b (actual time=7961.948..7961.952 rows=10 loops=1)
               ->  Limit (actual time=7961.946..7961.950 rows=10 loops=1)
                     ->  Sort (actual time=7961.946..7961.948 rows=10 loops=1)
                           ->  Nested Loop (actual time=53.518..7961.611 rows=252 loops=1)
                                 ->  Seq Scan on t a_1 (actual time=0.004..2.247 rows=5000...
                                 ->  Materialize (actual time=0.000..0.357 rows=5000...
                                       ->  Seq Scan on t b_2 (actual time=0.001..1.565 rows=500..
 Execution time: 15758.629 ms
(26 rows)


I will try this way for the present.

Any opinions or suggestions?

- Is this a correct entry point?

- Parallel postgres_fdw is of course an intermediate shape. It should evolve toward a more intrinsic form.

- The planner should be aware of parallelism. The first step seems to be doable, since postgres_fdw can get correct startup and running costs. But they might have to be calculated locally for loopback connections in the end. A dedicated node would be needed.

- A far more effective means of intercommunication between backends, including background workers (which seems to be discussed in another thread), is needed, and this could be the test bench for it.

- This patch is the minimal implementation to make parallel scans available. A facility for exporting/importing execution trees may promise far more flexible parallelism. Is deparsing usable to reconstruct partial queries?

- A means of resource management, especially of the number of backends, is required. This could be done on the foreign server in a simple form for the present. Eventually this would be moved into an intrinsic loopback connection manager?

- Any other points to consider?


regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 116be7d..399ca31 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -411,11 +411,13 @@ begin_remote_xact(ConnCacheEntry *entry)
 void
 ReleaseConnection(PGconn *conn)
 {
-    /*
-     * Currently, we don't actually track connection references because all
-     * cleanup is managed on a transaction or subtransaction basis instead. So
-     * there's nothing to do here.
-     */
+    /* Clean up current asynchronous query if any */
+    while (PQtransactionStatus(conn) == PQTRANS_ACTIVE)
+    {
+        PGresult *res = PQgetResult(conn);
+        if (res)
+            PQclear(res);
+    }
 }
 
 /*
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 4c49776..eec299e 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -306,7 +306,7 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
                                       EquivalenceClass *ec, EquivalenceMember *em,
                                       void *arg);
 static void create_cursor(ForeignScanState *node);
-static void fetch_more_data(ForeignScanState *node);
+static void fetch_more_data(ForeignScanState *node, bool async_start);
 static void close_cursor(PGconn *conn, unsigned int cursor_number);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
 
@@ -328,6 +328,9 @@ static HeapTuple make_tuple_from_result_row(PGresult *res,
                                             MemoryContext temp_context);
 static void conversion_error_callback(void *arg);
 
+#define CONN_IS_IDLE(conn) \
+    (PQtransactionStatus(conn) == PQTRANS_IDLE || \
+     PQtransactionStatus(conn) == PQTRANS_INTRANS)
 
 /*
  * Foreign-data wrapper handler function: return a struct with pointers
@@ -981,6 +984,18 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
         fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *));
     else
         fsstate->param_values = NULL;
+
+    if (CONN_IS_IDLE(fsstate->conn))
+    {
+        /*
+         * This connection is allowed asynchronous query execution, so start
+         * it now. This relies on the fact that ExecInitForeignScan's that
+         * share the same foreign server are executed in the same order as
+         * ExecForeignScan's.
+         */
+        create_cursor(node);
+        fetch_more_data(node, true);
+    }
 }
 
 /*
@@ -1008,7 +1023,7 @@ postgresIterateForeignScan(ForeignScanState *node)
     {
         /* No point in another fetch if we already detected EOF, though. */
         if (!fsstate->eof_reached)
-            fetch_more_data(node);
+            fetch_more_data(node, false);
 
         /* If we didn't get any tuples, must be end of data. */
         if (fsstate->next_tuple >= fsstate->num_tuples)
             return ExecClearTuple(slot);
@@ -2001,10 +2016,11 @@ create_cursor(ForeignScanState *node)
 }
 
 /*
- * Fetch some more rows from the node's cursor.
+ * Fetch some more rows from the node's cursor. It starts asynchronous query
+ * execution then immediately returns if async_start is true.
  */
 static void
-fetch_more_data(ForeignScanState *node)
+fetch_more_data(ForeignScanState *node, bool async_start)
 {
     PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
     PGresult   *volatile res = NULL;
@@ -2033,36 +2049,56 @@ fetch_more_data(ForeignScanState *node)
         snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
                  fetch_size, fsstate->cursor_number);
 
-        res = PQexec(conn, sql);
-        /* On error, report the original query, not the FETCH. */
-        if (PQresultStatus(res) != PGRES_TUPLES_OK)
-            pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
-
-        /* Convert the data into HeapTuples */
-        numrows = PQntuples(res);
-        fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
-        fsstate->num_tuples = numrows;
-        fsstate->next_tuple = 0;
-
-        for (i = 0; i < numrows; i++)
+        if (async_start)
         {
-            fsstate->tuples[i] =
-                make_tuple_from_result_row(res, i,
-                                           fsstate->rel,
-                                           fsstate->attinmeta,
-                                           fsstate->retrieved_attrs,
-                                           fsstate->temp_cxt);
+            Assert(CONN_IS_IDLE(conn));
+
+            if (!PQsendQuery(conn, sql))
+                pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
         }
+        else
+        {
+            if (!CONN_IS_IDLE(conn))
+                res = PQgetResult(conn);
+            /*
+             * Transaction status won't be INTRANS or IDLE before calling
+             * PQgetResult() after all result is received. PQgetResult()
+             * returns NULL for the case.
+             */
-        /* Update fetch_ct_2 */
-        if (fsstate->fetch_ct_2 < 2)
-            fsstate->fetch_ct_2++;
+            if (!res)
+                res = PQexec(conn, sql);
-        /* Must be EOF if we didn't get as many tuples as we asked for. */
-        fsstate->eof_reached = (numrows < fetch_size);
+            /* On error, report the original query, not the FETCH. */
+            if (PQresultStatus(res) != PGRES_TUPLES_OK)
+                pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
-        PQclear(res);
-        res = NULL;
+            /* Convert the data into HeapTuples */
+            numrows = PQntuples(res);
+            fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+            fsstate->num_tuples = numrows;
+            fsstate->next_tuple = 0;
+
+            for (i = 0; i < numrows; i++)
+            {
+                fsstate->tuples[i] =
+                    make_tuple_from_result_row(res, i,
+                                               fsstate->rel,
+                                               fsstate->attinmeta,
+                                               fsstate->retrieved_attrs,
+                                               fsstate->temp_cxt);
+            }
+
+            /* Update fetch_ct_2 */
+            if (fsstate->fetch_ct_2 < 2)
+                fsstate->fetch_ct_2++;
+
+            /* Must be EOF if we didn't get as many tuples as we asked for. */
+            fsstate->eof_reached = (numrows < fetch_size);
+
+            PQclear(res);
+            res = NULL;
+        }
     }
     PG_CATCH();
     {
DROP SERVER IF EXISTS pgs1 CASCADE;
DROP SERVER IF EXISTS pgs2 CASCADE;
DROP VIEW IF EXISTS v CASCADE;
DROP TABLE IF EXISTS t CASCADE;

CREATE SERVER pgs1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '/tmp', dbname 'postgres', use_remote_estimate 'true');
CREATE SERVER pgs2 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '/tmp', dbname 'postgres', use_remote_estimate 'true');

CREATE USER MAPPING FOR CURRENT_USER SERVER pgs1;
CREATE USER MAPPING FOR CURRENT_USER SERVER pgs2;

CREATE TABLE t (a int, b int, c text);
ALTER TABLE t ALTER COLUMN c SET STORAGE PLAIN;
INSERT INTO t (SELECT random() * 10000, random() * 10000, repeat('X', (random() * 1000)::int) FROM generate_series(0, 4999));
-- EXPLAIN ANALYZE SELECT * FROM t a, t b WHERE a.b + b.b = 1000 ORDER BY a.b LIMIT 10;
CREATE VIEW v AS SELECT a.a, a.b, a.c, b.a AS a2, b.b AS b2, b.c AS c2 FROM t a, t b WHERE a.b + b.b = 1000 ORDER BY a.b LIMIT 10;
 

CREATE FOREIGN TABLE fvs1 (a int, b int, c text, a2 int, b2 int, c2 text) SERVER pgs1 OPTIONS (table_name 'v');
CREATE FOREIGN TABLE fvs1_2 (a int, b int, c text, a2 int, b2 int, c2 text) SERVER pgs1 OPTIONS (table_name 'v');
CREATE FOREIGN TABLE fvs2 (a int, b int, c text, a2 int, b2 int, c2 text) SERVER pgs2 OPTIONS (table_name 'v');


EXPLAIN ANALYZE SELECT a.a, a.b, b.c FROM fvs1 a join fvs2 b on (a.a = b.a);
EXPLAIN ANALYZE SELECT a.a, a.b, b.c FROM fvs1 a join fvs1_2 b on (a.a = b.a);

