Thread: [Fwd: dblink patch - Asynchronous queries and parallel execution]

[Fwd: dblink patch - Asynchronous queries and parallel execution]

From
Joe Conway
Date:
I just received this (offlist), and have not had a chance to review it
myself yet, but figured I should post it now in case others want to have
a look and comment or discuss before feature freeze.

If there are no major objections to the concept, I'll take
responsibility to review and commit once I'm through with the "Values
list-of-targetlists" stuff.

(I'm not sure where we finished off with the discussion of PATCHES vs
HACKERS list for this kind of stuff, so I'm going to send another copy
of this to HACKERS without the attachment)

Thanks,

Joe


-------- Original Message --------
Subject: dblink patch - Asynchronous queries and parallel execution
Date: Mon, 24 Jul 2006 12:47:51 +0200
From: Kai Londenberg <K.Londenberg@librics.de>
To: mail@joeconway.com

Hello,

I needed parallel query execution features for a project, and so I
modified the dblink module to add support for asynchronous query execution.

I thought others might find these features useful as well, so I'd like
to contribute them to the current PostgreSQL contrib/dblink codebase.

The code is based on the contrib/dblink code included with the current
8.1.4 version of PostgreSQL. I'm including the entire modified
contrib/dblink directory in archived form.

I modified dblink.c and dblink.sql.in and created the file README.async.

I hope you like it and will include it in a future version of dblink.

The code still needs some testing and code review. It works for me,
but I don't have any experience writing PostgreSQL extensions, and I
haven't touched C for a while.

The most important feature of this code is that it allows queries to
run in parallel on several backend databases and their results to be
joined back together locally. This solves a lot of scalability problems.
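To make the scalability point concrete, here is a back-of-the-envelope model. It is an idealized assumption, not a measurement of this patch. If each remote query takes some number of milliseconds, running them one after another with plain dblink() costs the sum of those times. Sending them all first with dblink_send_query() and then collecting them with dblink_get_result() costs roughly the slowest one. The helper names are hypothetical.

```c
#include <stddef.h>

/* Hypothetical helper: total wall-clock time (ms) when the remote queries
 * run one after another, as with synchronous dblink(). */
double sequential_ms(const double *remote_ms, size_t n)
{
    double total = 0.0;

    for (size_t i = 0; i < n; i++)
        total += remote_ms[i];
    return total;
}

/* Hypothetical helper: approximate wall-clock time (ms) when every query is
 * dispatched first and collected afterwards.  Idealized model: network round
 * trips and the local merge (e.g. the UNION) are ignored, so the slowest
 * server dominates. */
double parallel_ms(const double *remote_ms, size_t n)
{
    double slowest = 0.0;

    for (size_t i = 0; i < n; i++)
        if (remote_ms[i] > slowest)
            slowest = remote_ms[i];
    return slowest;
}
```

For three servers answering in 120, 80 and 200 ms, the model gives 400 ms sequentially versus about 200 ms when the queries overlap.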

Here is the corresponding README.async file, which describes my additions.

-----
dblink-async patch by Kai Londenberg (K.Londenberg@librics.de)
24.7.2006

All code is licensed under the same terms as the rest of the dblink
code.

SQL Function declarations have been added at the bottom of dblink.sql

Added functions:

int dblink_send_query(connstr text, sql text)
      Sends a query to a remote server for asynchronous execution.

      Returns immediately without waiting for results.

      Returns 1 on success, or 0 on failure.
      Results *must* be fetched by dblink_get_result(connstr).
      A running query may be cancelled by dblink_cancel_query(connstr).


setof record dblink_get_result(connstr text [, bool fail_on_error])
      Retrieves the result of a query started by dblink_send_query.

      Blocks until a result is available.

      This function *must* be called if dblink_send_query returned
      1, even on cancelled queries; otherwise the connection
      can't be used anymore.
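The send/fetch contract above can be modeled as a tiny state machine. This is a simplified model, not dblink code: mock_conn, mock_send_query and mock_get_result are hypothetical names. It encodes two rules. First, a new query is refused (0) while an earlier one still has work outstanding, which matches libpq's PQsendQuery refusing a command while another is in progress. Second, as the doc/query notes in the revised patch spell out, the connection only becomes reusable after one extra dblink_get_result call that returns the empty set.

```c
#include <stdbool.h>

/* Hypothetical model of one named connection: it only tracks how many
 * result sets are still waiting to be fetched and whether the final
 * "no more results" fetch is still owed. */
typedef struct
{
    int  pending_results;   /* result sets not yet fetched */
    bool awaiting_end;      /* final empty fetch not yet made */
} mock_conn;

/* Mirrors dblink_send_query: returns 1 if the query was dispatched,
 * 0 if the connection is still busy with an earlier async query. */
int mock_send_query(mock_conn *c, int n_statements)
{
    if (c->pending_results > 0 || c->awaiting_end)
        return 0;
    c->pending_results = n_statements;
    c->awaiting_end = true;
    return 1;
}

/* Mirrors dblink_get_result: returns true when a result set is handed back,
 * false for the final empty set that makes the connection reusable. */
bool mock_get_result(mock_conn *c)
{
    if (c->pending_results > 0)
    {
        c->pending_results--;
        return true;
    }
    c->awaiting_end = false;
    return false;
}
```

A query string holding two statements therefore needs three fetches (two result sets plus the empty one) before the next dblink_send_query succeeds.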

dblink_get_connections()
      List all open dblink connections by name.
      Returns a comma separated string of all connection names.
      Takes no params

      Example: SELECT string_to_array(dblink_get_connections(), ',');

int dblink_is_busy(connstr)
      returns 1 if connection is busy, 0 if it is not busy.
      If this function returns 0, it is guaranteed that dblink_get_result
      will not block.
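Because a 0 from dblink_is_busy guarantees that dblink_get_result will not block, a client juggling several connections can collect whichever one has finished first instead of waiting on them in send order. The sketch below shows only the selection step. It assumes the caller has already gathered one dblink_is_busy answer per connection. next_ready is a hypothetical helper.

```c
#include <stddef.h>

/* Hypothetical helper: busy[i] holds the latest dblink_is_busy() answer for
 * connection i (1 = busy, 0 = idle); collected[i] is nonzero once that
 * connection's results have been fetched.  Returns the index of a connection
 * that can be read with dblink_get_result() without blocking, or -1 if every
 * outstanding connection is still busy.  Scanning starts just after `last`,
 * so ready connections are served round-robin. */
int next_ready(const int *busy, const int *collected, size_t n, size_t last)
{
    for (size_t k = 1; k <= n; k++)
    {
        size_t i = (last + k) % n;

        if (!collected[i] && busy[i] == 0)
            return (int) i;
    }
    return -1;
}
```

When it returns -1, the caller would wait briefly and poll dblink_is_busy again. Once every connection is collected, the per-connection results can be combined, as in the UNION of Example 1.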

text dblink_cancel_query(connstr)
      Cancels a running query on a given connection.
      returns "OK" on success, or an error message on failure.
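The "OK" / error-message result maps onto libpq's PQcancel(), which returns 1 when the cancel request was dispatched and 0 otherwise, filling its error buffer in the failure case. A successful dispatch does not guarantee the remote query actually stops, and dblink_get_result must still be called afterwards. cancel_status_text and its fallback text are hypothetical.

```c
#include <string.h>

/* Hypothetical helper: turn a PQcancel() return code plus its error buffer
 * into the text that dblink_cancel_query reports.  PQcancel() returns 1 on
 * successful dispatch, 0 on failure. */
const char *cancel_status_text(int pqcancel_rc, const char *errbuf)
{
    if (pqcancel_rc == 1)
        return "OK";
    if (errbuf != NULL && strlen(errbuf) > 0)
        return errbuf;
    return "cancel request failed";   /* hypothetical fallback text */
}
```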


Examples:

---- Example 1 - Union over parallel executed remote queries --

SELECT dblink_connect('dtest1', 'host=server1 port=5432 dbname=dtest_1 user=duser password=pass');
SELECT * from
   dblink_send_query('dtest1', 'SELECT country_code, city from world_cities where city like ''fe%''') as t1;

SELECT dblink_connect('dtest2', 'host=server2 port=5432 dbname=dtest_2 user=duser password=pass');
SELECT * from
   dblink_send_query('dtest2', 'SELECT country_code, city from world_cities where city like ''fe%''') as t1;

SELECT dblink_connect('dtest3', 'host=server3 port=5432 dbname=dtest_3 user=duser password=pass');
SELECT * from
   dblink_send_query('dtest3', 'SELECT country_code, city from world_cities where city like ''fe%''') as t1;

CREATE TEMPORARY TABLE result AS
(SELECT * from dblink_get_result('dtest1') as t1(country_code text, city text))
UNION
(SELECT * from dblink_get_result('dtest2') as t2(country_code text, city text))
UNION
(SELECT * from dblink_get_result('dtest3') as t3(country_code text, city text))
ORDER by city DESC LIMIT 100;

SELECT dblink_disconnect('dtest1');
SELECT dblink_disconnect('dtest2');
SELECT dblink_disconnect('dtest3');
SELECT * from result;

--- End of Example 1 ---

best regards,

Kai Londenberg





Re: [Fwd: dblink patch - Asynchronous queries and parallel

From
Joe Conway
Date:
Joe Conway wrote:
> I just received this (offlist), and have not had a chance to review it
> myself yet, but figured I should post it now in case others want to have
> a look and comment or discuss before feature freeze.
>
> If there are no major objections to the concept, I'll take
> responsibility to review and commit once I'm through with the "Values
> list-of-targetlists" stuff.


Sorry for the delay. I've done some rework to the original code sent by
Kai, mainly to reduce duplication with the existing synchronous case,
and to better fit with the existing docs, regression script, etc. I also
changed the return type of dblink_get_connections() to text[], and added
dblink_error_message().
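The rework folds dblink(), dblink_send_query() and dblink_get_result() into one dblink_record_internal(fcinfo, is_async, do_get). The sketch below restates, as a standalone table of 0-based argument indexes, which positions each variant reads. arg_layout and layout_for are hypothetical names, not part of dblink. One detail it makes easy to check: for the two-argument dblink_get_result(text, bool), the flag is argument 1, so it has to be read with PG_GETARG_BOOL(1).

```c
#include <stdbool.h>

/* Hypothetical summary of the argument layout each caller passes to
 * dblink_record_internal().  Each field is a 0-based argument index
 * (as used with PG_GETARG_*), or -1 when that argument is absent. */
typedef struct
{
    int conn_arg;   /* connection name or string; -1 = unnamed connection */
    int sql_arg;    /* SQL text */
    int fail_arg;   /* fail_on_error flag */
} arg_layout;

/* Flags as in the patch:
 *   dblink()            -> is_async = false, do_get = false
 *   dblink_send_query() -> is_async = true,  do_get = false
 *   dblink_get_result() -> is_async = true,  do_get = true
 * second_is_bool says whether argument 1 was declared boolean.
 * Returns false for an argument count the SQL declarations never produce. */
bool layout_for(bool is_async, bool do_get, int nargs, bool second_is_bool,
                arg_layout *out)
{
    arg_layout none = {-1, -1, -1};

    *out = none;
    if (!is_async)
    {
        if (nargs == 3)                         /* text, text, bool */
            *out = (arg_layout) {0, 1, 2};
        else if (nargs == 2 && second_is_bool)  /* text, bool: unnamed conn */
            *out = (arg_layout) {-1, 0, 1};
        else if (nargs == 2)                    /* text, text */
            *out = (arg_layout) {0, 1, -1};
        else if (nargs == 1)                    /* text: unnamed conn */
            *out = (arg_layout) {-1, 0, -1};
        else
            return false;
    }
    else if (do_get)
    {
        if (nargs == 2)                         /* text, bool */
            *out = (arg_layout) {0, -1, 1};
        else if (nargs == 1)                    /* text */
            *out = (arg_layout) {0, -1, -1};
        else
            return false;
    }
    else
    {
        if (nargs == 2)                         /* text, text */
            *out = (arg_layout) {0, 1, -1};
        else
            return false;
    }
    return true;
}
```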

If it isn't already too late, and there are no objections, I'd like to
commit this in the next day or so.

Kai, please verify that this still works in a similar fashion to your
original.

Thanks,

Joe
? .deps
? dblink.sql
? libdblink.so.0.0
? results
Index: README.dblink
===================================================================
RCS file: /cvsroot/pgsql/contrib/dblink/README.dblink,v
retrieving revision 1.13
diff -c -r1.13 README.dblink
*** README.dblink    3 Jan 2006 23:45:52 -0000    1.13
--- README.dblink    27 Aug 2006 23:21:15 -0000
***************
*** 7,12 ****
--- 7,13 ----
   * And contributors:
   * Darko Prenosil <Darko.Prenosil@finteh.hr>
   * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
+  * Kai Londenberg (K.Londenberg@librics.de)
   *
   * Copyright (c) 2001-2006, PostgreSQL Global Development Group
   * ALL RIGHTS RESERVED;
***************
*** 31,36 ****
--- 32,40 ----
   */

  Release Notes:
+   27 August 2006
+     - Added async query capability. Original patch by
+       Kai Londenberg (K.Londenberg@librics.de), modified by Joe Conway
    Version 0.7 (as of 25 Feb, 2004)
      - Added new version of dblink, dblink_exec, dblink_open, dblink_close,
        and, dblink_fetch -- allows ERROR on remote side of connection to
***************
*** 85,159 ****

      psql template1 < dblink.sql

!   installs following functions into database template1:
!
!      connection
!      ------------
!      dblink_connect(text) RETURNS text
!        - opens an unnamed connection that will persist for duration of
!          current backend or until it is disconnected
!      dblink_connect(text,text) RETURNS text
!        - opens a named connection that will persist for duration of current
!          backend or until it is disconnected
!      dblink_disconnect() RETURNS text
!        - disconnects the unnamed persistent connection
!      dblink_disconnect(text) RETURNS text
!        - disconnects a named persistent connection
!
!      cursor
!      ------------
!      dblink_open(text,text [, bool fail_on_error]) RETURNS text
!        - opens a cursor using unnamed connection already opened with
!          dblink_connect() that will persist for duration of current backend
!          or until it is closed
!      dblink_open(text,text,text [, bool fail_on_error]) RETURNS text
!        - opens a cursor using a named connection already opened with
!          dblink_connect() that will persist for duration of current backend
!          or until it is closed
!      dblink_fetch(text, int [, bool fail_on_error]) RETURNS setof record
!        - fetches data from an already opened cursor on the unnamed connection
!      dblink_fetch(text, text, int [, bool fail_on_error]) RETURNS setof record
!        - fetches data from an already opened cursor on a named connection
!      dblink_close(text [, bool fail_on_error]) RETURNS text
!        - closes a cursor on the unnamed connection
!      dblink_close(text,text [, bool fail_on_error]) RETURNS text
!        - closes a cursor on a named connection
!
!      query
!      ------------
!      dblink(text,text [, bool fail_on_error]) RETURNS setof record
!        - returns a set of results from remote SELECT query; the first argument
!          is either a connection string, or the name of an already opened
!          persistant connection
!      dblink(text [, bool fail_on_error]) RETURNS setof record
!        - returns a set of results from remote SELECT query, using the unnamed
!          connection already opened with dblink_connect()
!
!      execute
!      ------------
!      dblink_exec(text, text [, bool fail_on_error]) RETURNS text
!        - executes an INSERT/UPDATE/DELETE query remotely; the first argument
!          is either a connection string, or the name of an already opened
!          persistant connection
!      dblink_exec(text [, bool fail_on_error]) RETURNS text
!        - executes an INSERT/UPDATE/DELETE query remotely, using connection
!          already opened with dblink_connect()
!
!      misc
!      ------------
!      dblink_current_query() RETURNS text
!        - returns the current query string
!      dblink_get_pkey(text) RETURNS setof text
!        - returns the field names of a relation's primary key fields
!      dblink_build_sql_insert(text,int2vector,int2,_text,_text) RETURNS text
!        - builds an insert statement using a local tuple, replacing the
!          selection key field values with alternate supplied values
!      dblink_build_sql_delete(text,int2vector,int2,_text) RETURNS text
!        - builds a delete statement using supplied values for selection
!          key field values
!      dblink_build_sql_update(text,int2vector,int2,_text,_text) RETURNS text
!        - builds an update statement using a local tuple, replacing the
!          selection key field values with alternate supplied values

  Documentation:

--- 89,95 ----

      psql template1 < dblink.sql

!   installs dblink functions into database template1

  Documentation:

Index: dblink.c
===================================================================
RCS file: /cvsroot/pgsql/contrib/dblink/dblink.c,v
retrieving revision 1.57
diff -c -r1.57 dblink.c
*** dblink.c    11 Jul 2006 16:35:31 -0000    1.57
--- dblink.c    27 Aug 2006 23:21:15 -0000
***************
*** 73,78 ****
--- 73,79 ----
  /*
   * Internal declarations
   */
+ static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get);
  static remoteConn *getConnectionByName(const char *name);
  static HTAB *createConnHash(void);
  static void createNewConnection(const char *name, remoteConn * rconn);
***************
*** 691,696 ****
--- 692,717 ----
  Datum
  dblink_record(PG_FUNCTION_ARGS)
  {
+     return dblink_record_internal(fcinfo, false, false);
+ }
+
+ PG_FUNCTION_INFO_V1(dblink_send_query);
+ Datum
+ dblink_send_query(PG_FUNCTION_ARGS)
+ {
+     return dblink_record_internal(fcinfo, true, false);
+ }
+
+ PG_FUNCTION_INFO_V1(dblink_get_result);
+ Datum
+ dblink_get_result(PG_FUNCTION_ARGS)
+ {
+     return dblink_record_internal(fcinfo, true, true);
+ }
+
+ static Datum
+ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get)
+ {
      FuncCallContext *funcctx;
      TupleDesc    tupdesc = NULL;
      int            call_cntr;
***************
*** 723,850 ****
           */
          oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

!         if (PG_NARGS() == 3)
          {
!             /* text,text,bool */
!             DBLINK_GET_CONN;
!             sql = GET_STR(PG_GETARG_TEXT_P(1));
!             fail = PG_GETARG_BOOL(2);
!         }
!         else if (PG_NARGS() == 2)
!         {
!             /* text,text or text,bool */
!             if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
              {
                  conn = pconn->conn;
                  sql = GET_STR(PG_GETARG_TEXT_P(0));
-                 fail = PG_GETARG_BOOL(1);
              }
              else
              {
                  DBLINK_GET_CONN;
!                 sql = GET_STR(PG_GETARG_TEXT_P(1));
              }
          }
!         else if (PG_NARGS() == 1)
          {
!             /* text */
!             conn = pconn->conn;
!             sql = GET_STR(PG_GETARG_TEXT_P(0));
          }
-         else
-             /* shouldn't happen */
-             elog(ERROR, "wrong number of arguments");

          if (!conn)
              DBLINK_CONN_NOT_AVAIL;

!         res = PQexec(conn, sql);
!         if (!res ||
!             (PQresultStatus(res) != PGRES_COMMAND_OK &&
!              PQresultStatus(res) != PGRES_TUPLES_OK))
          {
!             if (fail)
!                 DBLINK_RES_ERROR("sql error");
              else
              {
!                 DBLINK_RES_ERROR_AS_NOTICE("sql error");
!                 if (freeconn)
!                     PQfinish(conn);
!                 SRF_RETURN_DONE(funcctx);
              }
-         }
-
-         if (PQresultStatus(res) == PGRES_COMMAND_OK)
-         {
-             is_sql_cmd = true;
-
-             /* need a tuple descriptor representing one TEXT column */
-             tupdesc = CreateTemplateTupleDesc(1, false);
-             TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
-                                TEXTOID, -1, 0);

!             /*
!              * and save a copy of the command status string to return as our
!              * result tuple
!              */
!             sql_cmd_status = PQcmdStatus(res);
!             funcctx->max_calls = 1;
!         }
!         else
!             funcctx->max_calls = PQntuples(res);
!
!         /* got results, keep track of them */
!         funcctx->user_fctx = res;
!
!         /* if needed, close the connection to the database and cleanup */
!         if (freeconn)
!             PQfinish(conn);
!
!         if (!is_sql_cmd)
!         {
!             /* get a tuple descriptor for our result type */
!             switch (get_call_result_type(fcinfo, NULL, &tupdesc))
              {
!                 case TYPEFUNC_COMPOSITE:
!                     /* success */
!                     break;
!                 case TYPEFUNC_RECORD:
!                     /* failed to determine actual type of RECORD */
!                     ereport(ERROR,
!                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
!                         errmsg("function returning record called in context "
!                                "that cannot accept type record")));
!                     break;
!                 default:
!                     /* result type isn't composite */
!                     elog(ERROR, "return type must be a row type");
!                     break;
              }
!
!             /* make sure we have a persistent copy of the tupdesc */
!             tupdesc = CreateTupleDescCopy(tupdesc);
          }
!
!         /* check result and tuple descriptor have the same number of columns */
!         if (PQnfields(res) != tupdesc->natts)
!             ereport(ERROR,
!                     (errcode(ERRCODE_DATATYPE_MISMATCH),
!                 errmsg("remote query result rowtype does not match "
!                         "the specified FROM clause rowtype")));
!
!         /* fast track when no results */
!         if (funcctx->max_calls < 1)
          {
!             if (res)
!                 PQclear(res);
!             SRF_RETURN_DONE(funcctx);
          }

!         /* store needed metadata for subsequent calls */
!         attinmeta = TupleDescGetAttInMetadata(tupdesc);
!         funcctx->attinmeta = attinmeta;

-         MemoryContextSwitchTo(oldcontext);
      }

      /* stuff done on every call of the function */
--- 744,930 ----
           */
          oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

!         if (!is_async)
          {
!             if (PG_NARGS() == 3)
!             {
!                 /* text,text,bool */
!                 DBLINK_GET_CONN;
!                 sql = GET_STR(PG_GETARG_TEXT_P(1));
!                 fail = PG_GETARG_BOOL(2);
!             }
!             else if (PG_NARGS() == 2)
!             {
!                 /* text,text or text,bool */
!                 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
!                 {
!                     conn = pconn->conn;
!                     sql = GET_STR(PG_GETARG_TEXT_P(0));
!                     fail = PG_GETARG_BOOL(1);
!                 }
!                 else
!                 {
!                     DBLINK_GET_CONN;
!                     sql = GET_STR(PG_GETARG_TEXT_P(1));
!                 }
!             }
!             else if (PG_NARGS() == 1)
              {
+                 /* text */
                  conn = pconn->conn;
                  sql = GET_STR(PG_GETARG_TEXT_P(0));
              }
              else
+                 /* shouldn't happen */
+                 elog(ERROR, "wrong number of arguments");
+         }
+         else if (is_async && do_get)
+         {
+             /* get async result */
+             if (PG_NARGS() == 2)
              {
+                 /* text,bool */
                  DBLINK_GET_CONN;
!                 fail = PG_GETARG_BOOL(1);
              }
+             else if (PG_NARGS() == 1)
+             {
+                 /* text */
+                 DBLINK_GET_CONN;
+             }
+             else
+                 /* shouldn't happen */
+                 elog(ERROR, "wrong number of arguments");
          }
!         else
          {
!             /* send async query */
!             if (PG_NARGS() == 2)
!             {
!                 DBLINK_GET_CONN;
!                 sql = GET_STR(PG_GETARG_TEXT_P(1));
!             }
!             else
!                 /* shouldn't happen */
!                 elog(ERROR, "wrong number of arguments");
          }

          if (!conn)
              DBLINK_CONN_NOT_AVAIL;

!         if (!is_async || (is_async && do_get))
          {
!             /* synchronous query, or async result retrieval */
!             if (!is_async)
!                 res = PQexec(conn, sql);
              else
              {
!                 res = PQgetResult(conn);
!                 /* NULL means we're all done with the async results */
!                 if (!res)
!                     SRF_RETURN_DONE(funcctx);
              }

!             if (!res ||
!                 (PQresultStatus(res) != PGRES_COMMAND_OK &&
!                 PQresultStatus(res) != PGRES_TUPLES_OK))
              {
!                 if (fail)
!                     DBLINK_RES_ERROR("sql error");
!                 else
!                 {
!                     DBLINK_RES_ERROR_AS_NOTICE("sql error");
!                     if (freeconn)
!                         PQfinish(conn);
!                     SRF_RETURN_DONE(funcctx);
!                 }
              }
!
!             if (PQresultStatus(res) == PGRES_COMMAND_OK)
!             {
!                 is_sql_cmd = true;
!
!                 /* need a tuple descriptor representing one TEXT column */
!                 tupdesc = CreateTemplateTupleDesc(1, false);
!                 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
!                                 TEXTOID, -1, 0);
!
!                 /*
!                 * and save a copy of the command status string to return as our
!                 * result tuple
!                 */
!                 sql_cmd_status = PQcmdStatus(res);
!                 funcctx->max_calls = 1;
!             }
!             else
!                 funcctx->max_calls = PQntuples(res);
!
!             /* got results, keep track of them */
!             funcctx->user_fctx = res;
!
!             /* if needed, close the connection to the database and cleanup */
!             if (freeconn)
!                 PQfinish(conn);
!
!             if (!is_sql_cmd)
!             {
!                 /* get a tuple descriptor for our result type */
!                 switch (get_call_result_type(fcinfo, NULL, &tupdesc))
!                 {
!                     case TYPEFUNC_COMPOSITE:
!                         /* success */
!                         break;
!                     case TYPEFUNC_RECORD:
!                         /* failed to determine actual type of RECORD */
!                         ereport(ERROR,
!                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
!                             errmsg("function returning record called in context "
!                                 "that cannot accept type record")));
!                         break;
!                     default:
!                         /* result type isn't composite */
!                         elog(ERROR, "return type must be a row type");
!                         break;
!                 }
!
!                 /* make sure we have a persistent copy of the tupdesc */
!                 tupdesc = CreateTupleDescCopy(tupdesc);
!             }
!
!             /* check result and tuple descriptor have the same number of columns */
!             if (PQnfields(res) != tupdesc->natts)
!                 ereport(ERROR,
!                         (errcode(ERRCODE_DATATYPE_MISMATCH),
!                     errmsg("remote query result rowtype does not match "
!                             "the specified FROM clause rowtype")));
!
!             /* fast track when no results */
!             if (funcctx->max_calls < 1)
!             {
!                 if (res)
!                     PQclear(res);
!                 SRF_RETURN_DONE(funcctx);
!             }
!
!             /* store needed metadata for subsequent calls */
!             attinmeta = TupleDescGetAttInMetadata(tupdesc);
!             funcctx->attinmeta = attinmeta;
!
!             MemoryContextSwitchTo(oldcontext);
          }
!         else
          {
!             /* async query send */
!             MemoryContextSwitchTo(oldcontext);
!             PG_RETURN_INT32(PQsendQuery(conn, sql));
          }
+     }

!     if (is_async && !do_get)
!     {
!         /* async query send -- should not happen */
!         elog(ERROR, "async query send called more than once");

      }

      /* stuff done on every call of the function */
***************
*** 903,908 ****
--- 983,1122 ----
  }

  /*
+  * List all open dblink connections by name.
+  * Returns an array of all connection names.
+  * Takes no params
+  */
+ PG_FUNCTION_INFO_V1(dblink_get_connections);
+ Datum
+ dblink_get_connections(PG_FUNCTION_ARGS)
+ {
+     HASH_SEQ_STATUS            status;
+     remoteConnHashEnt       *hentry;
+     ArrayBuildState           *astate = NULL;
+
+     if (remoteConnHash)
+     {
+         hash_seq_init(&status, remoteConnHash);
+         while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
+         {
+             /* stash away current value */
+             astate = accumArrayResult(astate,
+                                       PointerGetDatum(GET_TEXT(hentry->name)),
+                                       false, TEXTOID, CurrentMemoryContext);
+         }
+     }
+
+     if (astate)
+         PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate,
+                                               CurrentMemoryContext));
+     else
+         PG_RETURN_NULL();
+ }
+
+ /*
+  * Checks if a given remote connection is busy
+  *
+  * Returns 1 if the connection is busy, 0 otherwise
+  * Params:
+  *     text connection_name - name of the connection to check
+  *
+  */
+ PG_FUNCTION_INFO_V1(dblink_is_busy);
+ Datum
+ dblink_is_busy(PG_FUNCTION_ARGS)
+ {
+     char           *msg;
+     PGconn           *conn = NULL;
+     char           *conname = NULL;
+     char           *connstr = NULL;
+     remoteConn       *rconn = NULL;
+     bool            freeconn = false;
+
+     DBLINK_INIT;
+     DBLINK_GET_CONN;
+     if (!conn)
+         DBLINK_CONN_NOT_AVAIL;
+
+     PQconsumeInput(conn);
+     PG_RETURN_INT32(PQisBusy(conn));
+ }
+
+ /*
+  * Cancels a running request on a connection
+  *
+  * Returns text:
+  *    "OK" if the cancel request has been sent correctly,
+  *      an error message otherwise
+  *
+  * Params:
+  *     text connection_name - name of the connection to cancel
+  *
+  */
+ PG_FUNCTION_INFO_V1(dblink_cancel_query);
+ Datum
+ dblink_cancel_query(PG_FUNCTION_ARGS)
+ {
+     char           *msg;
+     int                res = 0;
+     PGconn           *conn = NULL;
+     char           *conname = NULL;
+     char           *connstr = NULL;
+     remoteConn       *rconn = NULL;
+     bool            freeconn = false;
+     PGcancel       *cancel;
+     char            errbuf[256];
+
+     DBLINK_INIT;
+     DBLINK_GET_CONN;
+     if (!conn)
+         DBLINK_CONN_NOT_AVAIL;
+     cancel = PQgetCancel(conn);
+
+     res = PQcancel(cancel, errbuf, 256);
+     PQfreeCancel(cancel);
+
+     if (res == 1)
+         PG_RETURN_TEXT_P(GET_TEXT("OK"));
+     else
+         PG_RETURN_TEXT_P(GET_TEXT(errbuf));
+ }
+
+
+ /*
+  * Get error message from a connection
+  *
+  * Returns text:
+  *    "OK" if no error, an error message otherwise
+  *
+  * Params:
+  *     text connection_name - name of the connection to check
+  *
+  */
+ PG_FUNCTION_INFO_V1(dblink_error_message);
+ Datum
+ dblink_error_message(PG_FUNCTION_ARGS)
+ {
+     char           *msg;
+     PGconn           *conn = NULL;
+     char           *conname = NULL;
+     char           *connstr = NULL;
+     remoteConn       *rconn = NULL;
+     bool            freeconn = false;
+
+     DBLINK_INIT;
+     DBLINK_GET_CONN;
+     if (!conn)
+         DBLINK_CONN_NOT_AVAIL;
+
+     msg = PQerrorMessage(conn);
+     if (msg == NULL || msg[0] == '\0')
+         PG_RETURN_TEXT_P(GET_TEXT("OK"));
+     else
+         PG_RETURN_TEXT_P(GET_TEXT(msg));
+ }
+
+ /*
   * Execute an SQL non-SELECT command
   */
  PG_FUNCTION_INFO_V1(dblink_exec);
Index: dblink.h
===================================================================
RCS file: /cvsroot/pgsql/contrib/dblink/dblink.h,v
retrieving revision 1.16
diff -c -r1.16 dblink.h
*** dblink.h    10 Jul 2006 18:40:16 -0000    1.16
--- dblink.h    27 Aug 2006 23:21:15 -0000
***************
*** 45,50 ****
--- 45,56 ----
  extern Datum dblink_close(PG_FUNCTION_ARGS);
  extern Datum dblink_fetch(PG_FUNCTION_ARGS);
  extern Datum dblink_record(PG_FUNCTION_ARGS);
+ extern Datum dblink_send_query(PG_FUNCTION_ARGS);
+ extern Datum dblink_get_result(PG_FUNCTION_ARGS);
+ extern Datum dblink_get_connections(PG_FUNCTION_ARGS);
+ extern Datum dblink_is_busy(PG_FUNCTION_ARGS);
+ extern Datum dblink_cancel_query(PG_FUNCTION_ARGS);
+ extern Datum dblink_error_message(PG_FUNCTION_ARGS);
  extern Datum dblink_exec(PG_FUNCTION_ARGS);
  extern Datum dblink_get_pkey(PG_FUNCTION_ARGS);
  extern Datum dblink_build_sql_insert(PG_FUNCTION_ARGS);
Index: dblink.sql.in
===================================================================
RCS file: /cvsroot/pgsql/contrib/dblink/dblink.sql.in,v
retrieving revision 1.10
diff -c -r1.10 dblink.sql.in
*** dblink.sql.in    27 Feb 2006 16:09:48 -0000    1.10
--- dblink.sql.in    27 Aug 2006 23:21:15 -0000
***************
*** 144,146 ****
--- 144,181 ----
  RETURNS text
  AS 'MODULE_PATHNAME','dblink_current_query'
  LANGUAGE C;
+
+ CREATE OR REPLACE FUNCTION dblink_send_query(text, text)
+ RETURNS int4
+ AS 'MODULE_PATHNAME', 'dblink_send_query'
+ LANGUAGE C STRICT;
+
+ CREATE OR REPLACE FUNCTION dblink_is_busy(text)
+ RETURNS int4
+ AS 'MODULE_PATHNAME', 'dblink_is_busy'
+ LANGUAGE C STRICT;
+
+ CREATE OR REPLACE FUNCTION dblink_get_result(text)
+ RETURNS SETOF record
+ AS 'MODULE_PATHNAME', 'dblink_get_result'
+ LANGUAGE C STRICT;
+
+ CREATE OR REPLACE FUNCTION dblink_get_result(text, bool)
+ RETURNS SETOF record
+ AS 'MODULE_PATHNAME', 'dblink_get_result'
+ LANGUAGE C STRICT;
+
+ CREATE OR REPLACE FUNCTION dblink_get_connections()
+ RETURNS text[]
+ AS 'MODULE_PATHNAME', 'dblink_get_connections'
+ LANGUAGE C;
+
+ CREATE OR REPLACE FUNCTION dblink_cancel_query(text)
+ RETURNS text
+ AS 'MODULE_PATHNAME', 'dblink_cancel_query'
+ LANGUAGE C STRICT;
+
+ CREATE OR REPLACE FUNCTION dblink_error_message(text)
+ RETURNS text
+ AS 'MODULE_PATHNAME', 'dblink_error_message'
+ LANGUAGE C STRICT;
Index: doc/misc
===================================================================
RCS file: /cvsroot/pgsql/contrib/dblink/doc/misc,v
retrieving revision 1.3
diff -c -r1.3 misc
*** doc/misc    11 Mar 2006 04:38:29 -0000    1.3
--- doc/misc    27 Aug 2006 23:21:15 -0000
***************
*** 139,141 ****
--- 139,232 ----
   UPDATE foo SET f1='1',f2='b',f3='1' WHERE f1='1' AND f2='b'
  (1 row)

+
+ ==================================================================
+ Name
+
+ dblink_get_connections -- returns a text array of all active named
+                           dblink connections
+
+ Synopsis
+
+ dblink_get_connections() RETURNS text[]
+
+ Inputs
+
+   none
+
+ Outputs
+
+   Returns text array of all active named dblink connections
+
+ Example usage
+
+   SELECT dblink_get_connections();
+
+ ==================================================================
+ Name
+
+ dblink_is_busy -- checks to see if named connection is busy
+                   with an async query
+
+ Synopsis
+
+ dblink_is_busy(text connname) RETURNS int
+
+ Inputs
+
+   connname
+     The specific connection name to use.
+
+ Outputs
+
+   Returns 1 if connection is busy, 0 if it is not busy.
+   If this function returns 0, it is guaranteed that dblink_get_result
+   will not block.
+
+ Example usage
+
+   SELECT dblink_is_busy('dtest1');
+
+ ==================================================================
+ Name
+
+ dblink_cancel_query -- cancels any active query on the named connection
+
+ Synopsis
+
+ dblink_cancel_query(text connname) RETURNS text
+
+ Inputs
+
+   connname
+     The specific connection name to use.
+
+ Outputs
+
+   Returns "OK" on success, or an error message on failure.
+
+ Example usage
+
+   SELECT dblink_cancel_query('dtest1');
+
+ ==================================================================
+ Name
+
+ dblink_error_message -- gets last error message on the named connection
+
+ Synopsis
+
+ dblink_error_message(text connname) RETURNS text
+
+ Inputs
+
+   connname
+     The specific connection name to use.
+
+ Outputs
+
+   Returns last error message, or "OK" if there is none.
+
+ Example usage
+
+   SELECT dblink_error_message('dtest1');
Index: doc/query
===================================================================
RCS file: /cvsroot/pgsql/contrib/dblink/doc/query,v
retrieving revision 1.4
diff -c -r1.4 query
*** doc/query    21 Jun 2005 04:02:28 -0000    1.4
--- doc/query    27 Aug 2006 23:21:15 -0000
***************
*** 118,120 ****
--- 118,242 ----

     select * from myremote_pg_proc where proname like 'bytea%';

+
+ ==================================================================
+ Name
+
+ dblink_send_query -- Sends an async query to a remote database
+
+ Synopsis
+
+ dblink_send_query(text connname, text sql) RETURNS int
+
+ Inputs
+
+   connname
+     The specific connection name to use.
+
+   sql
+
+     sql statement that you wish to execute on the remote host
+     e.g. "select * from pg_class"
+
+ Outputs
+
+   Returns int: 1 if the query was successfully dispatched,
+   0 otherwise. If 1, results must be fetched by dblink_get_result(connname).
+   A running query may be cancelled by dblink_cancel_query(connname).
+
+ Example usage
+
+   SELECT dblink_connect('dtest1', 'dbname=contrib_regression');
+   SELECT * from
+    dblink_send_query('dtest1', 'select * from foo where f1 < 3') as t1;
+
+ ==================================================================
+ Name
+
+ dblink_get_result -- gets an asynchronous query result
+
+ Synopsis
+
+ dblink_get_result(text connname [, bool fail_on_error]) RETURNS setof record
+
+ Inputs
+
+   connname
+     The specific connection name to use. An asynchronous query must
+     have already been sent using dblink_send_query().
+
+   fail_on_error
+
+     If true (the default when omitted), an ERROR thrown on the remote side
+     of the connection causes an ERROR to also be thrown locally. If false, the
+     remote ERROR is locally treated as a NOTICE, and no rows are returned.
+
+ Outputs
+
+   Returns setof record
+
+ Notes
+   Blocks until a result is available.
+
+   This function *must* be called if dblink_send_query returned
+   1, even for cancelled queries; otherwise the connection becomes
+   unusable. It must be called once for each statement in the sent
+   query string, plus one additional time to obtain the final empty
+   result set, before the connection can be reused.
+
+ Example usage
+
+ contrib_regression=#   SELECT dblink_connect('dtest1', 'dbname=contrib_regression');
+  dblink_connect
+ ----------------
+  OK
+ (1 row)
+
+ contrib_regression=#   SELECT * from
+ contrib_regression-#    dblink_send_query('dtest1', 'select * from foo where f1 < 3') as t1;
+  t1
+ ----
+   1
+ (1 row)
+
+ contrib_regression=#   SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]);
+  f1 | f2 |     f3
+ ----+----+------------
+   0 | a  | {a0,b0,c0}
+   1 | b  | {a1,b1,c1}
+   2 | c  | {a2,b2,c2}
+ (3 rows)
+
+ contrib_regression=#   SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]);
+  f1 | f2 | f3
+ ----+----+----
+ (0 rows)
+
+ contrib_regression=#   SELECT * from
+ contrib_regression-#    dblink_send_query('dtest1', 'select * from foo where f1 < 3; select * from foo where f1 > 6') as t1;
+  t1
+ ----
+   1
+ (1 row)
+
+ contrib_regression=#   SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]);
+  f1 | f2 |     f3
+ ----+----+------------
+   0 | a  | {a0,b0,c0}
+   1 | b  | {a1,b1,c1}
+   2 | c  | {a2,b2,c2}
+ (3 rows)
+
+ contrib_regression=#   SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]);
+  f1 | f2 |      f3
+ ----+----+---------------
+   7 | h  | {a7,b7,c7}
+   8 | i  | {a8,b8,c8}
+   9 | j  | {a9,b9,c9}
+  10 | k  | {a10,b10,c10}
+ (4 rows)
+
+ contrib_regression=#   SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]);
+  f1 | f2 | f3
+ ----+----+----
+ (0 rows)
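
The point of the asynchronous interface is that the local session need not sit idle while the remote query runs. The sketch below assumes the same dtest1 connection and foo table as the example above. It checks the return value of dblink_send_query() and runs unrelated local work (a placeholder pg_class count) before collecting the results. It then drains the connection and reuses it for an ordinary synchronous dblink() call.

```sql
SELECT dblink_connect('dtest1', 'dbname=contrib_regression');

-- 1 means the query was dispatched and its results must be collected;
-- 0 means nothing was sent, so there is nothing to collect.
SELECT dblink_send_query('dtest1', 'select * from foo where f1 < 3') AS dispatched;

-- While the remote server works on the query, this session is free to run
-- unrelated local statements (a placeholder here).
SELECT count(*) FROM pg_class;

-- Collect the rows; this blocks only if they have not arrived yet.
SELECT * FROM dblink_get_result('dtest1') AS t1(f1 int, f2 text, f3 text[]);

-- Consume the final empty result so the connection can be reused ...
SELECT * FROM dblink_get_result('dtest1') AS t1(f1 int, f2 text, f3 text[]);

-- ... for example, for an ordinary synchronous dblink() call.
SELECT * FROM dblink('dtest1', 'select f1 from foo where f1 = 0') AS t(f1 int);

SELECT dblink_disconnect('dtest1');
```
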
Index: expected/dblink.out
===================================================================
RCS file: /cvsroot/pgsql/contrib/dblink/expected/dblink.out,v
retrieving revision 1.17
diff -c -r1.17 dblink.out
*** expected/dblink.out    21 Jun 2006 16:43:11 -0000    1.17
--- expected/dblink.out    27 Aug 2006 23:21:15 -0000
***************
*** 669,671 ****
--- 669,758 ----
  -- should get 'connection "myconn" not available' error
  SELECT dblink_disconnect('myconn');
  ERROR:  connection "myconn" not available
+ -- test asynchronous queries
+ SELECT dblink_connect('dtest1', 'dbname=contrib_regression');
+  dblink_connect
+ ----------------
+  OK
+ (1 row)
+
+ SELECT * from
+  dblink_send_query('dtest1', 'select * from foo where f1 < 3') as t1;
+  t1
+ ----
+   1
+ (1 row)
+
+ SELECT dblink_connect('dtest2', 'dbname=contrib_regression');
+  dblink_connect
+ ----------------
+  OK
+ (1 row)
+
+ SELECT * from
+  dblink_send_query('dtest2', 'select * from foo where f1 > 2 and f1 < 7') as t1;
+  t1
+ ----
+   1
+ (1 row)
+
+ SELECT dblink_connect('dtest3', 'dbname=contrib_regression');
+  dblink_connect
+ ----------------
+  OK
+ (1 row)
+
+ SELECT * from
+  dblink_send_query('dtest3', 'select * from foo where f1 > 6') as t1;
+  t1
+ ----
+   1
+ (1 row)
+
+ CREATE TEMPORARY TABLE result AS
+ (SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]))
+ UNION
+ (SELECT * from dblink_get_result('dtest2') as t2(f1 int, f2 text, f3 text[]))
+ UNION
+ (SELECT * from dblink_get_result('dtest3') as t3(f1 int, f2 text, f3 text[]))
+ ORDER by f1;
+ SELECT dblink_get_connections();
+  dblink_get_connections
+ ------------------------
+  {dtest1,dtest2,dtest3}
+ (1 row)
+
+ SELECT dblink_disconnect('dtest1');
+  dblink_disconnect
+ -------------------
+  OK
+ (1 row)
+
+ SELECT dblink_disconnect('dtest2');
+  dblink_disconnect
+ -------------------
+  OK
+ (1 row)
+
+ SELECT dblink_disconnect('dtest3');
+  dblink_disconnect
+ -------------------
+  OK
+ (1 row)
+
+ SELECT * from result;
+  f1 | f2 |      f3
+ ----+----+---------------
+   0 | a  | {a0,b0,c0}
+   1 | b  | {a1,b1,c1}
+   2 | c  | {a2,b2,c2}
+   3 | d  | {a3,b3,c3}
+   4 | e  | {a4,b4,c4}
+   5 | f  | {a5,b5,c5}
+   6 | g  | {a6,b6,c6}
+   7 | h  | {a7,b7,c7}
+   8 | i  | {a8,b8,c8}
+   9 | j  | {a9,b9,c9}
+  10 | k  | {a10,b10,c10}
+ (11 rows)
+
Index: sql/dblink.sql
===================================================================
RCS file: /cvsroot/pgsql/contrib/dblink/sql/dblink.sql,v
retrieving revision 1.15
diff -c -r1.15 dblink.sql
*** sql/dblink.sql    18 Oct 2005 02:55:49 -0000    1.15
--- sql/dblink.sql    27 Aug 2006 23:21:15 -0000
***************
*** 319,321 ****
--- 319,350 ----
  -- close the named persistent connection again
  -- should get 'connection "myconn" not available' error
  SELECT dblink_disconnect('myconn');
+
+ -- test asynchronous queries
+ SELECT dblink_connect('dtest1', 'dbname=contrib_regression');
+ SELECT * from
+  dblink_send_query('dtest1', 'select * from foo where f1 < 3') as t1;
+
+ SELECT dblink_connect('dtest2', 'dbname=contrib_regression');
+ SELECT * from
+  dblink_send_query('dtest2', 'select * from foo where f1 > 2 and f1 < 7') as t1;
+
+ SELECT dblink_connect('dtest3', 'dbname=contrib_regression');
+ SELECT * from
+  dblink_send_query('dtest3', 'select * from foo where f1 > 6') as t1;
+
+ CREATE TEMPORARY TABLE result AS
+ (SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]))
+ UNION
+ (SELECT * from dblink_get_result('dtest2') as t2(f1 int, f2 text, f3 text[]))
+ UNION
+ (SELECT * from dblink_get_result('dtest3') as t3(f1 int, f2 text, f3 text[]))
+ ORDER by f1;
+
+ SELECT dblink_get_connections();
+
+ SELECT dblink_disconnect('dtest1');
+ SELECT dblink_disconnect('dtest2');
+ SELECT dblink_disconnect('dtest3');
+ SELECT * from result;
+
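
The regression test above fans three queries out over three connections and merges the results with UNION. The same pattern could be packaged as a PL/pgSQL function. The sketch below is hypothetical: parallel_foo is not part of dblink. It assumes the connections are already open and distinct, that each query is a single statement, and that each query returns rows shaped like the regression table foo (f1 int, f2 text, f3 text[]).

```sql
-- Hypothetical helper (not part of dblink): dispatch one query per named
-- connection, then gather every result. Assumes open, distinct connections,
-- single-statement queries, and rows shaped like foo (f1 int, f2 text, f3 text[]).
CREATE OR REPLACE FUNCTION parallel_foo(conns text[], queries text[])
RETURNS SETOF foo AS $$
DECLARE
    r foo%ROWTYPE;
BEGIN
    -- Phase 1: dispatch every query before waiting on any of them, so the
    -- remote backends work concurrently.
    FOR i IN array_lower(conns, 1) .. array_upper(conns, 1) LOOP
        IF dblink_send_query(conns[i], queries[i]) <> 1 THEN
            -- Queries already dispatched on earlier connections are left
            -- pending here; a fuller version would drain or cancel them.
            RAISE EXCEPTION 'could not dispatch query on connection %', conns[i];
        END IF;
    END LOOP;

    -- Phase 2: gather results in order; each call blocks only until that
    -- connection's rows are available.
    FOR i IN array_lower(conns, 1) .. array_upper(conns, 1) LOOP
        FOR r IN SELECT * FROM dblink_get_result(conns[i])
                     AS t(f1 int, f2 text, f3 text[]) LOOP
            RETURN NEXT r;
        END LOOP;
        -- Consume the final empty result so the connection can be reused.
        PERFORM * FROM dblink_get_result(conns[i]) AS t(f1 int, f2 text, f3 text[]);
    END LOOP;
    RETURN;
END;
$$ LANGUAGE plpgsql;

-- Usage, with dtest1..dtest3 opened as in the regression test:
SELECT * FROM parallel_foo(
    ARRAY['dtest1', 'dtest2', 'dtest3'],
    ARRAY['select * from foo where f1 < 3',
          'select * from foo where f1 > 2 and f1 < 7',
          'select * from foo where f1 > 6'])
ORDER BY f1;
```

Because all rows are returned with RETURN NEXT, the function behaves like UNION ALL rather than UNION. That makes no difference here, since the three ranges of f1 do not overlap.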

Re: [Fwd: dblink patch - Asynchronous queries and parallel execution]

From
Joe Conway
Date:
Joe Conway wrote:
> Sorry for the delay. I've done some rework to the original code sent by
> Kai, mainly to reduce duplication with the existing synchronous case,
> and to better fit with the existing docs, regression script, etc. I also
> changed the return type of dblink_get_connections() to text[], and added
> dblink_error_message().
>
> If it isn't already too late, and there are no objections, I'd like to
> commit this in the next day or so.

Patch applied.

Joe