Re: [Fwd: dblink patch - Asynchronous queries and parallel - Mailing list pgsql-patches

From Joe Conway
Subject Re: [Fwd: dblink patch - Asynchronous queries and parallel
Date
Msg-id 44F229A0.90902@joeconway.com
Whole thread Raw
In response to [Fwd: dblink patch - Asynchronous queries and parallel execution]  (Joe Conway <mail@joeconway.com>)
Responses Re: [Fwd: dblink patch - Asynchronous queries and parallel
List pgsql-patches
Joe Conway wrote:
> I just received this (offlist), and have not had a chance to review it
> myself yet, but figured I should post it now in case others want to have
> a look and comment or discuss before feature freeze.
>
> If there are no major objections to the concept, I'll take
> responsibility to review and commit once I'm through with the "Values
> list-of-targetlists" stuff.


Sorry for the delay. I've done some rework to the original code sent by
Kai, mainly to reduce duplication with the existing synchronous case,
and to better fit with the existing docs, regression script, etc. I also
changed the return type of dblink_get_connections() to text[], and added
dblink_error_message().

If it isn't already too late, and there are no objections, I'd like to
commit this in the next day or so.

Kai, please verify that this still works in a similar fashion as your
original.

Thanks,

Joe
? .deps
? dblink.sql
? libdblink.so.0.0
? results
Index: README.dblink
===================================================================
RCS file: /cvsroot/pgsql/contrib/dblink/README.dblink,v
retrieving revision 1.13
diff -c -r1.13 README.dblink
*** README.dblink    3 Jan 2006 23:45:52 -0000    1.13
--- README.dblink    27 Aug 2006 23:21:15 -0000
***************
*** 7,12 ****
--- 7,13 ----
   * And contributors:
   * Darko Prenosil <Darko.Prenosil@finteh.hr>
   * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
+  * Kai Londenberg (K.Londenberg@librics.de)
   *
   * Copyright (c) 2001-2006, PostgreSQL Global Development Group
   * ALL RIGHTS RESERVED;
***************
*** 31,36 ****
--- 32,40 ----
   */

  Release Notes:
+   27 August 2006
+     - Added async query capability. Original patch by
+       Kai Londenberg (K.Londenberg@librics.de), modified by Joe Conway
    Version 0.7 (as of 25 Feb, 2004)
      - Added new version of dblink, dblink_exec, dblink_open, dblink_close,
        and, dblink_fetch -- allows ERROR on remote side of connection to
***************
*** 85,159 ****

      psql template1 < dblink.sql

!   installs following functions into database template1:
!
!      connection
!      ------------
!      dblink_connect(text) RETURNS text
!        - opens an unnamed connection that will persist for duration of
!          current backend or until it is disconnected
!      dblink_connect(text,text) RETURNS text
!        - opens a named connection that will persist for duration of current
!          backend or until it is disconnected
!      dblink_disconnect() RETURNS text
!        - disconnects the unnamed persistent connection
!      dblink_disconnect(text) RETURNS text
!        - disconnects a named persistent connection
!
!      cursor
!      ------------
!      dblink_open(text,text [, bool fail_on_error]) RETURNS text
!        - opens a cursor using unnamed connection already opened with
!          dblink_connect() that will persist for duration of current backend
!          or until it is closed
!      dblink_open(text,text,text [, bool fail_on_error]) RETURNS text
!        - opens a cursor using a named connection already opened with
!          dblink_connect() that will persist for duration of current backend
!          or until it is closed
!      dblink_fetch(text, int [, bool fail_on_error]) RETURNS setof record
!        - fetches data from an already opened cursor on the unnamed connection
!      dblink_fetch(text, text, int [, bool fail_on_error]) RETURNS setof record
!        - fetches data from an already opened cursor on a named connection
!      dblink_close(text [, bool fail_on_error]) RETURNS text
!        - closes a cursor on the unnamed connection
!      dblink_close(text,text [, bool fail_on_error]) RETURNS text
!        - closes a cursor on a named connection
!
!      query
!      ------------
!      dblink(text,text [, bool fail_on_error]) RETURNS setof record
!        - returns a set of results from remote SELECT query; the first argument
!          is either a connection string, or the name of an already opened
!          persistant connection
!      dblink(text [, bool fail_on_error]) RETURNS setof record
!        - returns a set of results from remote SELECT query, using the unnamed
!          connection already opened with dblink_connect()
!
!      execute
!      ------------
!      dblink_exec(text, text [, bool fail_on_error]) RETURNS text
!        - executes an INSERT/UPDATE/DELETE query remotely; the first argument
!          is either a connection string, or the name of an already opened
!          persistant connection
!      dblink_exec(text [, bool fail_on_error]) RETURNS text
!        - executes an INSERT/UPDATE/DELETE query remotely, using connection
!          already opened with dblink_connect()
!
!      misc
!      ------------
!      dblink_current_query() RETURNS text
!        - returns the current query string
!      dblink_get_pkey(text) RETURNS setof text
!        - returns the field names of a relation's primary key fields
!      dblink_build_sql_insert(text,int2vector,int2,_text,_text) RETURNS text
!        - builds an insert statement using a local tuple, replacing the
!          selection key field values with alternate supplied values
!      dblink_build_sql_delete(text,int2vector,int2,_text) RETURNS text
!        - builds a delete statement using supplied values for selection
!          key field values
!      dblink_build_sql_update(text,int2vector,int2,_text,_text) RETURNS text
!        - builds an update statement using a local tuple, replacing the
!          selection key field values with alternate supplied values

  Documentation:

--- 89,95 ----

      psql template1 < dblink.sql

!   installs dblink functions into database template1

  Documentation:

Index: dblink.c
===================================================================
RCS file: /cvsroot/pgsql/contrib/dblink/dblink.c,v
retrieving revision 1.57
diff -c -r1.57 dblink.c
*** dblink.c    11 Jul 2006 16:35:31 -0000    1.57
--- dblink.c    27 Aug 2006 23:21:15 -0000
***************
*** 73,78 ****
--- 73,79 ----
  /*
   * Internal declarations
   */
+ static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get);
  static remoteConn *getConnectionByName(const char *name);
  static HTAB *createConnHash(void);
  static void createNewConnection(const char *name, remoteConn * rconn);
***************
*** 691,696 ****
--- 692,717 ----
  Datum
  dblink_record(PG_FUNCTION_ARGS)
  {
+     return dblink_record_internal(fcinfo, false, false);
+ }
+
+ PG_FUNCTION_INFO_V1(dblink_send_query);
+ Datum
+ dblink_send_query(PG_FUNCTION_ARGS)
+ {
+     return dblink_record_internal(fcinfo, true, false);
+ }
+
+ PG_FUNCTION_INFO_V1(dblink_get_result);
+ Datum
+ dblink_get_result(PG_FUNCTION_ARGS)
+ {
+     return dblink_record_internal(fcinfo, true, true);
+ }
+
+ static Datum
+ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get)
+ {
      FuncCallContext *funcctx;
      TupleDesc    tupdesc = NULL;
      int            call_cntr;
***************
*** 723,850 ****
           */
          oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

!         if (PG_NARGS() == 3)
          {
!             /* text,text,bool */
!             DBLINK_GET_CONN;
!             sql = GET_STR(PG_GETARG_TEXT_P(1));
!             fail = PG_GETARG_BOOL(2);
!         }
!         else if (PG_NARGS() == 2)
!         {
!             /* text,text or text,bool */
!             if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
              {
                  conn = pconn->conn;
                  sql = GET_STR(PG_GETARG_TEXT_P(0));
-                 fail = PG_GETARG_BOOL(1);
              }
              else
              {
                  DBLINK_GET_CONN;
!                 sql = GET_STR(PG_GETARG_TEXT_P(1));
              }
          }
!         else if (PG_NARGS() == 1)
          {
!             /* text */
!             conn = pconn->conn;
!             sql = GET_STR(PG_GETARG_TEXT_P(0));
          }
-         else
-             /* shouldn't happen */
-             elog(ERROR, "wrong number of arguments");

          if (!conn)
              DBLINK_CONN_NOT_AVAIL;

!         res = PQexec(conn, sql);
!         if (!res ||
!             (PQresultStatus(res) != PGRES_COMMAND_OK &&
!              PQresultStatus(res) != PGRES_TUPLES_OK))
          {
!             if (fail)
!                 DBLINK_RES_ERROR("sql error");
              else
              {
!                 DBLINK_RES_ERROR_AS_NOTICE("sql error");
!                 if (freeconn)
!                     PQfinish(conn);
!                 SRF_RETURN_DONE(funcctx);
              }
-         }
-
-         if (PQresultStatus(res) == PGRES_COMMAND_OK)
-         {
-             is_sql_cmd = true;
-
-             /* need a tuple descriptor representing one TEXT column */
-             tupdesc = CreateTemplateTupleDesc(1, false);
-             TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
-                                TEXTOID, -1, 0);

!             /*
!              * and save a copy of the command status string to return as our
!              * result tuple
!              */
!             sql_cmd_status = PQcmdStatus(res);
!             funcctx->max_calls = 1;
!         }
!         else
!             funcctx->max_calls = PQntuples(res);
!
!         /* got results, keep track of them */
!         funcctx->user_fctx = res;
!
!         /* if needed, close the connection to the database and cleanup */
!         if (freeconn)
!             PQfinish(conn);
!
!         if (!is_sql_cmd)
!         {
!             /* get a tuple descriptor for our result type */
!             switch (get_call_result_type(fcinfo, NULL, &tupdesc))
              {
!                 case TYPEFUNC_COMPOSITE:
!                     /* success */
!                     break;
!                 case TYPEFUNC_RECORD:
!                     /* failed to determine actual type of RECORD */
!                     ereport(ERROR,
!                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
!                         errmsg("function returning record called in context "
!                                "that cannot accept type record")));
!                     break;
!                 default:
!                     /* result type isn't composite */
!                     elog(ERROR, "return type must be a row type");
!                     break;
              }
!
!             /* make sure we have a persistent copy of the tupdesc */
!             tupdesc = CreateTupleDescCopy(tupdesc);
          }
!
!         /* check result and tuple descriptor have the same number of columns */
!         if (PQnfields(res) != tupdesc->natts)
!             ereport(ERROR,
!                     (errcode(ERRCODE_DATATYPE_MISMATCH),
!                 errmsg("remote query result rowtype does not match "
!                         "the specified FROM clause rowtype")));
!
!         /* fast track when no results */
!         if (funcctx->max_calls < 1)
          {
!             if (res)
!                 PQclear(res);
!             SRF_RETURN_DONE(funcctx);
          }

!         /* store needed metadata for subsequent calls */
!         attinmeta = TupleDescGetAttInMetadata(tupdesc);
!         funcctx->attinmeta = attinmeta;

-         MemoryContextSwitchTo(oldcontext);
      }

      /* stuff done on every call of the function */
--- 744,930 ----
           */
          oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

!         if (!is_async)
          {
!             if (PG_NARGS() == 3)
!             {
!                 /* text,text,bool */
!                 DBLINK_GET_CONN;
!                 sql = GET_STR(PG_GETARG_TEXT_P(1));
!                 fail = PG_GETARG_BOOL(2);
!             }
!             else if (PG_NARGS() == 2)
!             {
!                 /* text,text or text,bool */
!                 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
!                 {
!                     conn = pconn->conn;
!                     sql = GET_STR(PG_GETARG_TEXT_P(0));
!                     fail = PG_GETARG_BOOL(1);
!                 }
!                 else
!                 {
!                     DBLINK_GET_CONN;
!                     sql = GET_STR(PG_GETARG_TEXT_P(1));
!                 }
!             }
!             else if (PG_NARGS() == 1)
              {
+                 /* text */
                  conn = pconn->conn;
                  sql = GET_STR(PG_GETARG_TEXT_P(0));
              }
              else
+                 /* shouldn't happen */
+                 elog(ERROR, "wrong number of arguments");
+         }
+         else if (is_async && do_get)
+         {
+             /* get async result */
+             if (PG_NARGS() == 2)
              {
+                 /* text,bool */
                  DBLINK_GET_CONN;
!                 fail = PG_GETARG_BOOL(2);
              }
+             else if (PG_NARGS() == 1)
+             {
+                 /* text */
+                 DBLINK_GET_CONN;
+             }
+             else
+                 /* shouldn't happen */
+                 elog(ERROR, "wrong number of arguments");
          }
!         else
          {
!             /* send async query */
!             if (PG_NARGS() == 2)
!             {
!                 DBLINK_GET_CONN;
!                 sql = GET_STR(PG_GETARG_TEXT_P(1));
!             }
!             else
!                 /* shouldn't happen */
!                 elog(ERROR, "wrong number of arguments");
          }

          if (!conn)
              DBLINK_CONN_NOT_AVAIL;

!         if (!is_async || (is_async && do_get))
          {
!             /* synchronous query, or async result retrieval */
!             if (!is_async)
!                 res = PQexec(conn, sql);
              else
              {
!                 res = PQgetResult(conn);
!                 /* NULL means we're all done with the async results */
!                 if (!res)
!                     SRF_RETURN_DONE(funcctx);
              }

!             if (!res ||
!                 (PQresultStatus(res) != PGRES_COMMAND_OK &&
!                 PQresultStatus(res) != PGRES_TUPLES_OK))
              {
!                 if (fail)
!                     DBLINK_RES_ERROR("sql error");
!                 else
!                 {
!                     DBLINK_RES_ERROR_AS_NOTICE("sql error");
!                     if (freeconn)
!                         PQfinish(conn);
!                     SRF_RETURN_DONE(funcctx);
!                 }
              }
!
!             if (PQresultStatus(res) == PGRES_COMMAND_OK)
!             {
!                 is_sql_cmd = true;
!
!                 /* need a tuple descriptor representing one TEXT column */
!                 tupdesc = CreateTemplateTupleDesc(1, false);
!                 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
!                                 TEXTOID, -1, 0);
!
!                 /*
!                 * and save a copy of the command status string to return as our
!                 * result tuple
!                 */
!                 sql_cmd_status = PQcmdStatus(res);
!                 funcctx->max_calls = 1;
!             }
!             else
!                 funcctx->max_calls = PQntuples(res);
!
!             /* got results, keep track of them */
!             funcctx->user_fctx = res;
!
!             /* if needed, close the connection to the database and cleanup */
!             if (freeconn)
!                 PQfinish(conn);
!
!             if (!is_sql_cmd)
!             {
!                 /* get a tuple descriptor for our result type */
!                 switch (get_call_result_type(fcinfo, NULL, &tupdesc))
!                 {
!                     case TYPEFUNC_COMPOSITE:
!                         /* success */
!                         break;
!                     case TYPEFUNC_RECORD:
!                         /* failed to determine actual type of RECORD */
!                         ereport(ERROR,
!                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
!                             errmsg("function returning record called in context "
!                                 "that cannot accept type record")));
!                         break;
!                     default:
!                         /* result type isn't composite */
!                         elog(ERROR, "return type must be a row type");
!                         break;
!                 }
!
!                 /* make sure we have a persistent copy of the tupdesc */
!                 tupdesc = CreateTupleDescCopy(tupdesc);
!             }
!
!             /* check result and tuple descriptor have the same number of columns */
!             if (PQnfields(res) != tupdesc->natts)
!                 ereport(ERROR,
!                         (errcode(ERRCODE_DATATYPE_MISMATCH),
!                     errmsg("remote query result rowtype does not match "
!                             "the specified FROM clause rowtype")));
!
!             /* fast track when no results */
!             if (funcctx->max_calls < 1)
!             {
!                 if (res)
!                     PQclear(res);
!                 SRF_RETURN_DONE(funcctx);
!             }
!
!             /* store needed metadata for subsequent calls */
!             attinmeta = TupleDescGetAttInMetadata(tupdesc);
!             funcctx->attinmeta = attinmeta;
!
!             MemoryContextSwitchTo(oldcontext);
          }
!         else
          {
!             /* async query send */
!             MemoryContextSwitchTo(oldcontext);
!             PG_RETURN_INT32(PQsendQuery(conn, sql));
          }
+     }

!     if (is_async && !do_get)
!     {
!         /* async query send -- should not happen */
!         elog(ERROR, "async query send called more than once");

      }

      /* stuff done on every call of the function */
***************
*** 903,908 ****
--- 983,1122 ----
  }

  /*
+  * List all open dblink connections by name.
+  * Returns an array of all connection names.
+  * Takes no params
+  */
+ PG_FUNCTION_INFO_V1(dblink_get_connections);
+ Datum
+ dblink_get_connections(PG_FUNCTION_ARGS)
+ {
+     HASH_SEQ_STATUS            status;
+     remoteConnHashEnt       *hentry;
+     ArrayBuildState           *astate = NULL;
+
+     if (remoteConnHash)
+     {
+         hash_seq_init(&status, remoteConnHash);
+         while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
+         {
+             /* stash away current value */
+             astate = accumArrayResult(astate,
+                                       PointerGetDatum(GET_TEXT(hentry->name)),
+                                       false, TEXTOID, CurrentMemoryContext);
+         }
+     }
+
+     if (astate)
+         PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate,
+                                               CurrentMemoryContext));
+     else
+         PG_RETURN_NULL();
+ }
+
+ /*
+  * Checks if a given remote connection is busy
+  *
+  * Returns 1 if the connection is busy, 0 otherwise
+  * Params:
+  *     text connection_name - name of the connection to check
+  *
+  */
+ PG_FUNCTION_INFO_V1(dblink_is_busy);
+ Datum
+ dblink_is_busy(PG_FUNCTION_ARGS)
+ {
+     char           *msg;
+     PGconn           *conn = NULL;
+     char           *conname = NULL;
+     char           *connstr = NULL;
+     remoteConn       *rconn = NULL;
+     bool            freeconn = false;
+
+     DBLINK_INIT;
+     DBLINK_GET_CONN;
+     if (!conn)
+         DBLINK_CONN_NOT_AVAIL;
+
+     PQconsumeInput(conn);
+     PG_RETURN_INT32(PQisBusy(conn));
+ }
+
+ /*
+  * Cancels a running request on a connection
+  *
+  * Returns text:
+  *    "OK" if the cancel request has been sent correctly,
+  *      an error message otherwise
+  *
+  * Params:
+  *     text connection_name - name of the connection to check
+  *
+  */
+ PG_FUNCTION_INFO_V1(dblink_cancel_query);
+ Datum
+ dblink_cancel_query(PG_FUNCTION_ARGS)
+ {
+     char           *msg;
+     int                res = 0;
+     PGconn           *conn = NULL;
+     char           *conname = NULL;
+     char           *connstr = NULL;
+     remoteConn       *rconn = NULL;
+     bool            freeconn = false;
+     PGcancel       *cancel;
+     char            errbuf[256];
+
+     DBLINK_INIT;
+     DBLINK_GET_CONN;
+     if (!conn)
+         DBLINK_CONN_NOT_AVAIL;
+     cancel = PQgetCancel(conn);
+
+     res = PQcancel(cancel, errbuf, 256);
+     PQfreeCancel(cancel);
+
+     if (res == 0)
+         PG_RETURN_TEXT_P(GET_TEXT("OK"));
+     else
+         PG_RETURN_TEXT_P(GET_TEXT(errbuf));
+ }
+
+
+ /*
+  * Get error message from a connection
+  *
+  * Returns text:
+  *    "OK" if no error, an error message otherwise
+  *
+  * Params:
+  *     text connection_name - name of the connection to check
+  *
+  */
+ PG_FUNCTION_INFO_V1(dblink_error_message);
+ Datum
+ dblink_error_message(PG_FUNCTION_ARGS)
+ {
+     char           *msg;
+     PGconn           *conn = NULL;
+     char           *conname = NULL;
+     char           *connstr = NULL;
+     remoteConn       *rconn = NULL;
+     bool            freeconn = false;
+
+     DBLINK_INIT;
+     DBLINK_GET_CONN;
+     if (!conn)
+         DBLINK_CONN_NOT_AVAIL;
+
+     msg = PQerrorMessage(conn);
+     if (!msg)
+         PG_RETURN_TEXT_P(GET_TEXT("OK"));
+     else
+         PG_RETURN_TEXT_P(GET_TEXT(msg));
+ }
+
+ /*
   * Execute an SQL non-SELECT command
   */
  PG_FUNCTION_INFO_V1(dblink_exec);
Index: dblink.h
===================================================================
RCS file: /cvsroot/pgsql/contrib/dblink/dblink.h,v
retrieving revision 1.16
diff -c -r1.16 dblink.h
*** dblink.h    10 Jul 2006 18:40:16 -0000    1.16
--- dblink.h    27 Aug 2006 23:21:15 -0000
***************
*** 45,50 ****
--- 45,56 ----
  extern Datum dblink_close(PG_FUNCTION_ARGS);
  extern Datum dblink_fetch(PG_FUNCTION_ARGS);
  extern Datum dblink_record(PG_FUNCTION_ARGS);
+ extern Datum dblink_send_query(PG_FUNCTION_ARGS);
+ extern Datum dblink_get_result(PG_FUNCTION_ARGS);
+ extern Datum dblink_get_connections(PG_FUNCTION_ARGS);
+ extern Datum dblink_is_busy(PG_FUNCTION_ARGS);
+ extern Datum dblink_cancel_query(PG_FUNCTION_ARGS);
+ extern Datum dblink_error_message(PG_FUNCTION_ARGS);
  extern Datum dblink_exec(PG_FUNCTION_ARGS);
  extern Datum dblink_get_pkey(PG_FUNCTION_ARGS);
  extern Datum dblink_build_sql_insert(PG_FUNCTION_ARGS);
Index: dblink.sql.in
===================================================================
RCS file: /cvsroot/pgsql/contrib/dblink/dblink.sql.in,v
retrieving revision 1.10
diff -c -r1.10 dblink.sql.in
*** dblink.sql.in    27 Feb 2006 16:09:48 -0000    1.10
--- dblink.sql.in    27 Aug 2006 23:21:15 -0000
***************
*** 144,146 ****
--- 144,181 ----
  RETURNS text
  AS 'MODULE_PATHNAME','dblink_current_query'
  LANGUAGE C;
+
+ CREATE OR REPLACE FUNCTION dblink_send_query(text, text)
+ RETURNS int4
+ AS 'MODULE_PATHNAME', 'dblink_send_query'
+ LANGUAGE C STRICT;
+
+ CREATE OR REPLACE FUNCTION dblink_is_busy(text)
+ RETURNS int4
+ AS 'MODULE_PATHNAME', 'dblink_is_busy'
+ LANGUAGE C STRICT;
+
+ CREATE OR REPLACE FUNCTION dblink_get_result(text)
+ RETURNS SETOF record
+ AS 'MODULE_PATHNAME', 'dblink_get_result'
+ LANGUAGE C STRICT;
+
+ CREATE OR REPLACE FUNCTION dblink_get_result(text, bool)
+ RETURNS SETOF record
+ AS 'MODULE_PATHNAME', 'dblink_get_result'
+ LANGUAGE C STRICT;
+
+ CREATE OR REPLACE FUNCTION dblink_get_connections()
+ RETURNS text[]
+ AS 'MODULE_PATHNAME', 'dblink_get_connections'
+ LANGUAGE C;
+
+ CREATE OR REPLACE FUNCTION dblink_cancel_query(text)
+ RETURNS text
+ AS 'MODULE_PATHNAME', 'dblink_cancel_query'
+ LANGUAGE C STRICT;
+
+ CREATE OR REPLACE FUNCTION dblink_error_message(text)
+ RETURNS text
+ AS 'MODULE_PATHNAME', 'dblink_error_message'
+ LANGUAGE C STRICT;
Index: doc/misc
===================================================================
RCS file: /cvsroot/pgsql/contrib/dblink/doc/misc,v
retrieving revision 1.3
diff -c -r1.3 misc
*** doc/misc    11 Mar 2006 04:38:29 -0000    1.3
--- doc/misc    27 Aug 2006 23:21:15 -0000
***************
*** 139,141 ****
--- 139,232 ----
   UPDATE foo SET f1='1',f2='b',f3='1' WHERE f1='1' AND f2='b'
  (1 row)

+
+ ==================================================================
+ Name
+
+ dblink_get_connections -- returns a text array of all active named
+                           dblink connections
+
+ Synopsis
+
+ dblink_get_connections() RETURNS text[]
+
+ Inputs
+
+   none
+
+ Outputs
+
+   Returns text array of all active named dblink connections
+
+ Example usage
+
+   SELECT dblink_get_connections();
+
+ ==================================================================
+ Name
+
+ dblink_is_busy -- checks to see if named connection is busy
+                   with an async query
+
+ Synopsis
+
+ dblink_is_busy(text connname) RETURNS int
+
+ Inputs
+
+   connname
+     The specific connection name to use.
+
+ Outputs
+
+   Returns 1 if connection is busy, 0 if it is not busy.
+   If this function returns 0, it is guaranteed that dblink_get_result
+   will not block.
+
+ Example usage
+
+   SELECT dblink_is_busy('dtest1');
+
+ ==================================================================
+ Name
+
+ dblink_cancel_query -- cancels any active query on the named connection
+
+ Synopsis
+
+ dblink_cancel_query(text connname) RETURNS text
+
+ Inputs
+
+   connname
+     The specific connection name to use.
+
+ Outputs
+
+   Returns "OK" on success, or an error message on failure.
+
+ Example usage
+
+   SELECT dblink_cancel_query('dtest1');
+
+ ==================================================================
+ Name
+
+ dblink_error_message -- gets last error message on the named connection
+
+ Synopsis
+
+ dblink_error_message(text connname) RETURNS text
+
+ Inputs
+
+   connname
+     The specific connection name to use.
+
+ Outputs
+
+   Returns last error message.
+
+ Example usage
+
+   SELECT dblink_error_message('dtest1');
Index: doc/query
===================================================================
RCS file: /cvsroot/pgsql/contrib/dblink/doc/query,v
retrieving revision 1.4
diff -c -r1.4 query
*** doc/query    21 Jun 2005 04:02:28 -0000    1.4
--- doc/query    27 Aug 2006 23:21:15 -0000
***************
*** 118,120 ****
--- 118,242 ----

     select * from myremote_pg_proc where proname like 'bytea%';

+
+ ==================================================================
+ Name
+
+ dblink_send_query -- Sends an async query to a remote database
+
+ Synopsis
+
+ dblink_send_query(text connname, text sql)
+
+ Inputs
+
+   connname
+     The specific connection name to use.
+
+   sql
+
+     sql statement that you wish to execute on the remote host
+     e.g. "select * from pg_class"
+
+ Outputs
+
+   Returns int. A return value of 1 if the query was successfully dispatched,
+   0 otherwise. If 1, results must be fetched by dblink_get_result(connname).
+   A running query may be cancelled by dblink_cancel_query(connname).
+
+ Example usage
+
+   SELECT dblink_connect('dtest1', 'dbname=contrib_regression');
+   SELECT * from
+    dblink_send_query('dtest1', 'select * from foo where f1 < 3') as t1;
+
+ ==================================================================
+ Name
+
+ dblink_get_result -- Gets an async query result
+
+ Synopsis
+
+ dblink_get_result(text connname [, bool fail_on_error])
+
+ Inputs
+
+   connname
+     The specific connection name to use. An asynchronous query must
+     have already been sent using dblink_send_query()
+
+   fail_on_error
+
+     If true (default when not present) then an ERROR thrown on the remote side
+     of the connection causes an ERROR to also be thrown locally. If false, the
+     remote ERROR is locally treated as a NOTICE, and no rows are returned.
+
+ Outputs
+
+   Returns setof record
+
+ Notes
+   Blocks until a result gets available.
+
+   This function *must* be called if dblink_send_query returned
+   a 1, even on cancelled queries - otherwise the connection
+   can't be used anymore. It must be called once for each query
+   sent, and one additional time to obtain an empty set result,
+   prior to using the connection again.
+
+ Example usage
+
+ contrib_regression=#   SELECT dblink_connect('dtest1', 'dbname=contrib_regression');
+  dblink_connect
+ ----------------
+  OK
+ (1 row)
+
+ contrib_regression=#   SELECT * from
+ contrib_regression-#    dblink_send_query('dtest1', 'select * from foo where f1 < 3') as t1;
+  t1
+ ----
+   1
+ (1 row)
+
+ contrib_regression=#   SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]);
+  f1 | f2 |     f3
+ ----+----+------------
+   0 | a  | {a0,b0,c0}
+   1 | b  | {a1,b1,c1}
+   2 | c  | {a2,b2,c2}
+ (3 rows)
+
+ contrib_regression=#   SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]);
+  f1 | f2 | f3
+ ----+----+----
+ (0 rows)
+
+ contrib_regression=#   SELECT * from
+    dblink_send_query('dtest1', 'select * from foo where f1 < 3; select * from foo where f1 > 6') as t1;
+  t1
+ ----
+   1
+ (1 row)
+
+ contrib_regression=#   SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]);
+  f1 | f2 |     f3
+ ----+----+------------
+   0 | a  | {a0,b0,c0}
+   1 | b  | {a1,b1,c1}
+   2 | c  | {a2,b2,c2}
+ (3 rows)
+
+ contrib_regression=#   SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]);
+  f1 | f2 |      f3
+ ----+----+---------------
+   7 | h  | {a7,b7,c7}
+   8 | i  | {a8,b8,c8}
+   9 | j  | {a9,b9,c9}
+  10 | k  | {a10,b10,c10}
+ (4 rows)
+
+ contrib_regression=#   SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]);
+  f1 | f2 | f3
+ ----+----+----
+ (0 rows)
Index: expected/dblink.out
===================================================================
RCS file: /cvsroot/pgsql/contrib/dblink/expected/dblink.out,v
retrieving revision 1.17
diff -c -r1.17 dblink.out
*** expected/dblink.out    21 Jun 2006 16:43:11 -0000    1.17
--- expected/dblink.out    27 Aug 2006 23:21:15 -0000
***************
*** 669,671 ****
--- 669,758 ----
  -- should get 'connection "myconn" not available' error
  SELECT dblink_disconnect('myconn');
  ERROR:  connection "myconn" not available
+ -- test asynchronous queries
+ SELECT dblink_connect('dtest1', 'dbname=contrib_regression');
+  dblink_connect
+ ----------------
+  OK
+ (1 row)
+
+ SELECT * from
+  dblink_send_query('dtest1', 'select * from foo where f1 < 3') as t1;
+  t1
+ ----
+   1
+ (1 row)
+
+ SELECT dblink_connect('dtest2', 'dbname=contrib_regression');
+  dblink_connect
+ ----------------
+  OK
+ (1 row)
+
+ SELECT * from
+  dblink_send_query('dtest2', 'select * from foo where f1 > 2 and f1 < 7') as t1;
+  t1
+ ----
+   1
+ (1 row)
+
+ SELECT dblink_connect('dtest3', 'dbname=contrib_regression');
+  dblink_connect
+ ----------------
+  OK
+ (1 row)
+
+ SELECT * from
+  dblink_send_query('dtest3', 'select * from foo where f1 > 6') as t1;
+  t1
+ ----
+   1
+ (1 row)
+
+ CREATE TEMPORARY TABLE result AS
+ (SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]))
+ UNION
+ (SELECT * from dblink_get_result('dtest2') as t2(f1 int, f2 text, f3 text[]))
+ UNION
+ (SELECT * from dblink_get_result('dtest3') as t3(f1 int, f2 text, f3 text[]))
+ ORDER by f1;
+ SELECT dblink_get_connections();
+  dblink_get_connections
+ ------------------------
+  {dtest1,dtest2,dtest3}
+ (1 row)
+
+ SELECT dblink_disconnect('dtest1');
+  dblink_disconnect
+ -------------------
+  OK
+ (1 row)
+
+ SELECT dblink_disconnect('dtest2');
+  dblink_disconnect
+ -------------------
+  OK
+ (1 row)
+
+ SELECT dblink_disconnect('dtest3');
+  dblink_disconnect
+ -------------------
+  OK
+ (1 row)
+
+ SELECT * from result;
+  f1 | f2 |      f3
+ ----+----+---------------
+   0 | a  | {a0,b0,c0}
+   1 | b  | {a1,b1,c1}
+   2 | c  | {a2,b2,c2}
+   3 | d  | {a3,b3,c3}
+   4 | e  | {a4,b4,c4}
+   5 | f  | {a5,b5,c5}
+   6 | g  | {a6,b6,c6}
+   7 | h  | {a7,b7,c7}
+   8 | i  | {a8,b8,c8}
+   9 | j  | {a9,b9,c9}
+  10 | k  | {a10,b10,c10}
+ (11 rows)
+
Index: sql/dblink.sql
===================================================================
RCS file: /cvsroot/pgsql/contrib/dblink/sql/dblink.sql,v
retrieving revision 1.15
diff -c -r1.15 dblink.sql
*** sql/dblink.sql    18 Oct 2005 02:55:49 -0000    1.15
--- sql/dblink.sql    27 Aug 2006 23:21:15 -0000
***************
*** 319,321 ****
--- 319,350 ----
  -- close the named persistent connection again
  -- should get 'connection "myconn" not available' error
  SELECT dblink_disconnect('myconn');
+
+ -- test asynchronous queries
+ SELECT dblink_connect('dtest1', 'dbname=contrib_regression');
+ SELECT * from
+  dblink_send_query('dtest1', 'select * from foo where f1 < 3') as t1;
+
+ SELECT dblink_connect('dtest2', 'dbname=contrib_regression');
+ SELECT * from
+  dblink_send_query('dtest2', 'select * from foo where f1 > 2 and f1 < 7') as t1;
+
+ SELECT dblink_connect('dtest3', 'dbname=contrib_regression');
+ SELECT * from
+  dblink_send_query('dtest3', 'select * from foo where f1 > 6') as t1;
+
+ CREATE TEMPORARY TABLE result AS
+ (SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]))
+ UNION
+ (SELECT * from dblink_get_result('dtest2') as t2(f1 int, f2 text, f3 text[]))
+ UNION
+ (SELECT * from dblink_get_result('dtest3') as t3(f1 int, f2 text, f3 text[]))
+ ORDER by f1;
+
+ SELECT dblink_get_connections();
+
+ SELECT dblink_disconnect('dtest1');
+ SELECT dblink_disconnect('dtest2');
+ SELECT dblink_disconnect('dtest3');
+ SELECT * from result;
+

pgsql-patches by date:

Previous
From: Sven Suursoho
Date:
Subject: Re: plpython improvements
Next
From: Michael Glaesemann
Date:
Subject: Interval month, week -> day