Re: dblink vs SQL/MED - Mailing list pgsql-hackers

From Joe Conway
Subject Re: dblink vs SQL/MED
Date
Msg-id 495FE743.1040907@joeconway.com
Whole thread Raw
In response to Re: dblink vs SQL/MED  (Joe Conway <mail@joeconway.com>)
Responses Re: dblink vs SQL/MED  (Martin Pihlak <martin.pihlak@gmail.com>)
List pgsql-hackers
Joe Conway wrote:
> Peter Eisentraut wrote:
>> Martin had sent some code for that with his original patch series.  I
>> or someone can review that next.
>
> Here is what I would propose (still needs documentation and regression
> test changes, but wanted feedback first). I think it is better to keep
> the changes to dblink very simple.

The attached now includes documentation and regression test changes. It
also includes the refactoring to pull dblink_send_query() out of
dblink_record_internal() and the fix for the bug reported by Oleksiy
Shchukin.

> After looking for an already existing dblink rconn, the passed connstr
> is checked to see if it matches a valid foreign data server prior to
> being used as a connstr. If so, a connstr is constructed from the
> foreign server and user mapping options (for current user). The
> resulting connstr is then treated exactly as if it were one that was
> passed directly to dblink.
>
> Two specific questions on this approach:
> 1. This implies that the exact same dblink_connstr_check() is performed
>    on a predefined foreign server and user mapping as a raw connstr --
>    is this desirable? I'm not entirely clear on the intended purpose
>    and use of foreign data wrappers yet.

On the one hand, why be any less stringent on an fdw server than any
other connect string? But on the other hand, the fdw server definition
has supposedly been vetted by a superuser. Any thoughts on this?

> 2. It seems like get_connect_string() is generically useful to any
>    client of postgresql_fdw.c -- should it go there instead of dblink?

I'm pretty sure get_connect_string() should be moved to postgresql_fdw.c
-- objections?

Thanks,

Joe
Index: contrib/dblink/dblink.c
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.c,v
retrieving revision 1.77
diff -c -r1.77 dblink.c
*** contrib/dblink/dblink.c    1 Jan 2009 17:23:31 -0000    1.77
--- contrib/dblink/dblink.c    3 Jan 2009 21:25:34 -0000
***************
*** 46,51 ****
--- 46,52 ----
  #include "catalog/pg_type.h"
  #include "executor/executor.h"
  #include "executor/spi.h"
+ #include "foreign/foreign.h"
  #include "lib/stringinfo.h"
  #include "miscadmin.h"
  #include "nodes/execnodes.h"
***************
*** 77,83 ****
  /*
   * Internal declarations
   */
! static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get);
  static remoteConn *getConnectionByName(const char *name);
  static HTAB *createConnHash(void);
  static void createNewConnection(const char *name, remoteConn * rconn);
--- 78,84 ----
  /*
   * Internal declarations
   */
! static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
  static remoteConn *getConnectionByName(const char *name);
  static HTAB *createConnHash(void);
  static void createNewConnection(const char *name, remoteConn * rconn);
***************
*** 96,101 ****
--- 97,103 ----
  static void dblink_connstr_check(const char *connstr);
  static void dblink_security_check(PGconn *conn, remoteConn *rconn);
  static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail);
+ static char *get_connect_string(const char *servername);

  /* Global */
  static remoteConn *pconn = NULL;
***************
*** 165,171 ****
              } \
              else \
              { \
!                 connstr = conname_or_str; \
                  dblink_connstr_check(connstr); \
                  conn = PQconnectdb(connstr); \
                  if (PQstatus(conn) == CONNECTION_BAD) \
--- 167,175 ----
              } \
              else \
              { \
!                 connstr = get_connect_string(conname_or_str); \
!                 if (connstr == NULL) \
!                     connstr = conname_or_str; \
                  dblink_connstr_check(connstr); \
                  conn = PQconnectdb(connstr); \
                  if (PQstatus(conn) == CONNECTION_BAD) \
***************
*** 210,215 ****
--- 214,220 ----
  Datum
  dblink_connect(PG_FUNCTION_ARGS)
  {
+     char       *conname_or_str = NULL;
      char       *connstr = NULL;
      char       *connname = NULL;
      char       *msg;
***************
*** 220,235 ****

      if (PG_NARGS() == 2)
      {
!         connstr = text_to_cstring(PG_GETARG_TEXT_PP(1));
          connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      }
      else if (PG_NARGS() == 1)
!         connstr = text_to_cstring(PG_GETARG_TEXT_PP(0));

      if (connname)
          rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
                                                    sizeof(remoteConn));

      /* check password in connection string if not superuser */
      dblink_connstr_check(connstr);
      conn = PQconnectdb(connstr);
--- 225,245 ----

      if (PG_NARGS() == 2)
      {
!         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
          connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      }
      else if (PG_NARGS() == 1)
!         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));

      if (connname)
          rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
                                                    sizeof(remoteConn));

+     /* first check for valid foreign data server */
+     connstr = get_connect_string(conname_or_str);
+     if (connstr == NULL)
+         connstr = conname_or_str;
+
      /* check password in connection string if not superuser */
      dblink_connstr_check(connstr);
      conn = PQconnectdb(connstr);
***************
*** 689,713 ****
  Datum
  dblink_record(PG_FUNCTION_ARGS)
  {
!     return dblink_record_internal(fcinfo, false, false);
  }

  PG_FUNCTION_INFO_V1(dblink_send_query);
  Datum
  dblink_send_query(PG_FUNCTION_ARGS)
  {
!     return dblink_record_internal(fcinfo, true, false);
  }

  PG_FUNCTION_INFO_V1(dblink_get_result);
  Datum
  dblink_get_result(PG_FUNCTION_ARGS)
  {
!     return dblink_record_internal(fcinfo, true, true);
  }

  static Datum
! dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get)
  {
      FuncCallContext *funcctx;
      TupleDesc    tupdesc = NULL;
--- 699,745 ----
  Datum
  dblink_record(PG_FUNCTION_ARGS)
  {
!     return dblink_record_internal(fcinfo, false);    /* is_async = false: PQexec */
  }

  PG_FUNCTION_INFO_V1(dblink_send_query);
  Datum
  dblink_send_query(PG_FUNCTION_ARGS)
  {
!     PGconn       *conn = NULL;
!     char       *connstr = NULL;
!     char       *sql = NULL;
!     remoteConn *rconn = NULL;
!     char       *msg;
!     bool        freeconn = false;    /* see NOTE(review) below */
!     int            retval;
!     /* arg 0: connection name or connstr, resolved by DBLINK_GET_CONN */
!     if (PG_NARGS() == 2)
!     {
!         DBLINK_GET_CONN;
!         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
!     }
!     else
!         /* shouldn't happen */
!         elog(ERROR, "wrong number of arguments");
!     /* NOTE(review): freeconn is never checked, so a raw-connstr conn may leak */
!     /* async query send; results are read later by dblink_get_result() */
!     retval = PQsendQuery(conn, sql);
!     if (retval != 1)    /* anything but 1 means the send failed */
!         elog(NOTICE, "%s", PQerrorMessage(conn));
!
!     PG_RETURN_INT32(retval);
  }

  PG_FUNCTION_INFO_V1(dblink_get_result);
  Datum
  dblink_get_result(PG_FUNCTION_ARGS)
  {
!     return dblink_record_internal(fcinfo, true);    /* is_async = true: PQgetResult */
  }

  static Datum
! dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
  {
      FuncCallContext *funcctx;
      TupleDesc    tupdesc = NULL;
***************
*** 775,788 ****
                  /* shouldn't happen */
                  elog(ERROR, "wrong number of arguments");
          }
!         else if (is_async && do_get)
          {
              /* get async result */
              if (PG_NARGS() == 2)
              {
                  /* text,bool */
                  DBLINK_GET_CONN;
!                 fail = PG_GETARG_BOOL(2);
              }
              else if (PG_NARGS() == 1)
              {
--- 807,820 ----
                  /* shouldn't happen */
                  elog(ERROR, "wrong number of arguments");
          }
!         else /* is_async */
          {
              /* get async result */
              if (PG_NARGS() == 2)
              {
                  /* text,bool */
                  DBLINK_GET_CONN;
!                 fail = PG_GETARG_BOOL(1);
              }
              else if (PG_NARGS() == 1)
              {
***************
*** 793,929 ****
                  /* shouldn't happen */
                  elog(ERROR, "wrong number of arguments");
          }
-         else
-         {
-             /* send async query */
-             if (PG_NARGS() == 2)
-             {
-                 DBLINK_GET_CONN;
-                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
-             }
-             else
-                 /* shouldn't happen */
-                 elog(ERROR, "wrong number of arguments");
-         }

          if (!conn)
              DBLINK_CONN_NOT_AVAIL;

!         if (!is_async || (is_async && do_get))
          {
!             /* synchronous query, or async result retrieval */
!             if (!is_async)
!                 res = PQexec(conn, sql);
!             else
!             {
!                 res = PQgetResult(conn);
!                 /* NULL means we're all done with the async results */
!                 if (!res)
!                 {
!                     MemoryContextSwitchTo(oldcontext);
!                     SRF_RETURN_DONE(funcctx);
!                 }
!             }
!
!             if (!res ||
!                 (PQresultStatus(res) != PGRES_COMMAND_OK &&
!                  PQresultStatus(res) != PGRES_TUPLES_OK))
              {
-                 dblink_res_error(conname, res, "could not execute query", fail);
-                 if (freeconn)
-                     PQfinish(conn);
                  MemoryContextSwitchTo(oldcontext);
                  SRF_RETURN_DONE(funcctx);
              }

!             if (PQresultStatus(res) == PGRES_COMMAND_OK)
!             {
!                 is_sql_cmd = true;
!
!                 /* need a tuple descriptor representing one TEXT column */
!                 tupdesc = CreateTemplateTupleDesc(1, false);
!                 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
!                                    TEXTOID, -1, 0);
!
!                 /*
!                  * and save a copy of the command status string to return as
!                  * our result tuple
!                  */
!                 sql_cmd_status = PQcmdStatus(res);
!                 funcctx->max_calls = 1;
!             }
!             else
!                 funcctx->max_calls = PQntuples(res);
!
!             /* got results, keep track of them */
!             funcctx->user_fctx = res;
!
!             /* if needed, close the connection to the database and cleanup */
              if (freeconn)
                  PQfinish(conn);

!             if (!is_sql_cmd)
!             {
!                 /* get a tuple descriptor for our result type */
!                 switch (get_call_result_type(fcinfo, NULL, &tupdesc))
!                 {
!                     case TYPEFUNC_COMPOSITE:
!                         /* success */
!                         break;
!                     case TYPEFUNC_RECORD:
!                         /* failed to determine actual type of RECORD */
!                         ereport(ERROR,
!                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
!                         errmsg("function returning record called in context "
!                                "that cannot accept type record")));
!                         break;
!                     default:
!                         /* result type isn't composite */
!                         elog(ERROR, "return type must be a row type");
!                         break;
!                 }

!                 /* make sure we have a persistent copy of the tupdesc */
!                 tupdesc = CreateTupleDescCopy(tupdesc);
!             }

              /*
!              * check result and tuple descriptor have the same number of
!              * columns
               */
!             if (PQnfields(res) != tupdesc->natts)
!                 ereport(ERROR,
!                         (errcode(ERRCODE_DATATYPE_MISMATCH),
!                          errmsg("remote query result rowtype does not match "
!                                 "the specified FROM clause rowtype")));

!             /* fast track when no results */
!             if (funcctx->max_calls < 1)
              {
!                 if (res)
!                     PQclear(res);
!                 MemoryContextSwitchTo(oldcontext);
!                 SRF_RETURN_DONE(funcctx);
              }

!             /* store needed metadata for subsequent calls */
!             attinmeta = TupleDescGetAttInMetadata(tupdesc);
!             funcctx->attinmeta = attinmeta;
!
!             MemoryContextSwitchTo(oldcontext);
          }
!         else
          {
!             /* async query send */
              MemoryContextSwitchTo(oldcontext);
!             PG_RETURN_INT32(PQsendQuery(conn, sql));
          }
-     }

!     if (is_async && !do_get)
!     {
!         /* async query send -- should not happen */
!         elog(ERROR, "async query send called more than once");

      }

--- 825,934 ----
                  /* shouldn't happen */
                  elog(ERROR, "wrong number of arguments");
          }

          if (!conn)
              DBLINK_CONN_NOT_AVAIL;

!         /* synchronous query, or async result retrieval */
!         if (!is_async)
!             res = PQexec(conn, sql);
!         else
          {
!             res = PQgetResult(conn);
!             /* NULL means we're all done with the async results */
!             if (!res)
              {
                  MemoryContextSwitchTo(oldcontext);
                  SRF_RETURN_DONE(funcctx);
              }
+         }

!         if (!res ||
!             (PQresultStatus(res) != PGRES_COMMAND_OK &&
!              PQresultStatus(res) != PGRES_TUPLES_OK))
!         {
!             dblink_res_error(conname, res, "could not execute query", fail);
              if (freeconn)
                  PQfinish(conn);
+             MemoryContextSwitchTo(oldcontext);
+             SRF_RETURN_DONE(funcctx);
+         }

!         if (PQresultStatus(res) == PGRES_COMMAND_OK)
!         {
!             is_sql_cmd = true;

!             /* need a tuple descriptor representing one TEXT column */
!             tupdesc = CreateTemplateTupleDesc(1, false);
!             TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
!                                TEXTOID, -1, 0);

              /*
!              * and save a copy of the command status string to return as
!              * our result tuple
               */
!             sql_cmd_status = PQcmdStatus(res);
!             funcctx->max_calls = 1;
!         }
!         else
!             funcctx->max_calls = PQntuples(res);

!         /* got results, keep track of them */
!         funcctx->user_fctx = res;
!
!         /* if needed, close the connection to the database and cleanup */
!         if (freeconn)
!             PQfinish(conn);
!
!         if (!is_sql_cmd)
!         {
!             /* get a tuple descriptor for our result type */
!             switch (get_call_result_type(fcinfo, NULL, &tupdesc))
              {
!                 case TYPEFUNC_COMPOSITE:
!                     /* success */
!                     break;
!                 case TYPEFUNC_RECORD:
!                     /* failed to determine actual type of RECORD */
!                     ereport(ERROR,
!                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
!                     errmsg("function returning record called in context "
!                            "that cannot accept type record")));
!                     break;
!                 default:
!                     /* result type isn't composite */
!                     elog(ERROR, "return type must be a row type");
!                     break;
              }

!             /* make sure we have a persistent copy of the tupdesc */
!             tupdesc = CreateTupleDescCopy(tupdesc);
          }
!
!         /*
!          * check result and tuple descriptor have the same number of
!          * columns
!          */
!         if (PQnfields(res) != tupdesc->natts)
!             ereport(ERROR,
!                     (errcode(ERRCODE_DATATYPE_MISMATCH),
!                      errmsg("remote query result rowtype does not match "
!                             "the specified FROM clause rowtype")));
!
!         /* fast track when no results */
!         if (funcctx->max_calls < 1)
          {
!             if (res)
!                 PQclear(res);
              MemoryContextSwitchTo(oldcontext);
!             SRF_RETURN_DONE(funcctx);
          }

!         /* store needed metadata for subsequent calls */
!         attinmeta = TupleDescGetAttInMetadata(tupdesc);
!         funcctx->attinmeta = attinmeta;
!
!         MemoryContextSwitchTo(oldcontext);

      }

***************
*** 2358,2360 ****
--- 2363,2446 ----
           errcontext("Error occurred on dblink connection named \"%s\": %s.",
                      dblink_context_conname, dblink_context_msg)));
  }
+
+ /*
+  * Escape a string for use as a single-quoted libpq conninfo value.
+  *
+  * libpq requires single quotes and backslashes inside a quoted value to
+  * be preceded by a backslash.  Without this, an option value containing a
+  * quote (a password, say) would terminate the quoted value early and the
+  * remainder would be parsed as additional connection keywords.
+  *
+  * Returns a palloc'd copy; the input is not modified.
+  */
+ static char *
+ escape_param_str(const char *str)
+ {
+     const char *cp;
+     StringInfo    buf = makeStringInfo();
+
+     for (cp = str; *cp; cp++)
+     {
+         if (*cp == '\\' || *cp == '\'')
+             appendStringInfoChar(buf, '\\');
+         appendStringInfoChar(buf, *cp);
+     }
+
+     return buf->data;
+ }
+
+ /*
+  * Append a "keyword='value' " pair to buf for every DefElem in options,
+  * escaping each value with escape_param_str().
+  */
+ static void
+ append_conninfo_options(StringInfo buf, List *options)
+ {
+     ListCell   *cell;
+
+     foreach (cell, options)
+     {
+         DefElem    *def = lfirst(cell);
+
+         appendStringInfo(buf, "%s='%s' ", def->defname,
+                          escape_param_str(strVal(def->arg)));
+     }
+ }
+
+ /*
+  * Obtain connection string for a foreign server
+  *
+  * If servername names an existing foreign server, return a palloc'd libpq
+  * connection string built from the server's options followed by the
+  * options of the current user's user mapping for that server.  If no such
+  * server exists, return NULL so the caller can treat servername as a raw
+  * connection string instead.
+  *
+  * NOTE(review): GetUserMapping() presumably raises an ERROR when the
+  * current user has no mapping for the server -- confirm that is the
+  * desired behavior rather than falling back to server options only.
+  */
+ static char *
+ get_connect_string(const char *servername)
+ {
+     ForeignServer *foreign_server;
+     UserMapping   *user_mapping;
+     StringInfo    buf;
+
+     /* missing_ok: an unknown name yields NULL rather than an error */
+     foreign_server = GetForeignServerByName(servername, true);
+     if (foreign_server == NULL)
+         return NULL;
+
+     buf = makeStringInfo();
+
+     /* first gather the server connstr options */
+     append_conninfo_options(buf, foreign_server->options);
+
+     /* next get the user connstr options */
+     user_mapping = GetUserMapping(GetUserId(), foreign_server->serverid);
+     append_conninfo_options(buf, user_mapping->options);
+
+     return buf->data;
+ }
Index: contrib/dblink/expected/dblink.out
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/expected/dblink.out,v
retrieving revision 1.24
diff -c -r1.24 dblink.out
*** contrib/dblink/expected/dblink.out    3 Jul 2008 03:56:57 -0000    1.24
--- contrib/dblink/expected/dblink.out    3 Jan 2009 02:56:06 -0000
***************
*** 784,786 ****
--- 784,819 ----
   OK
  (1 row)

+ -- test foreign data wrapper functionality
+ CREATE USER dblink_regression_test;
+ CREATE FOREIGN DATA WRAPPER postgresql LIBRARY 'postgresql_fdw' LANGUAGE C;
+ CREATE SERVER fdtest FOREIGN DATA WRAPPER postgresql OPTIONS (dbname 'contrib_regression');
+ CREATE USER MAPPING FOR public SERVER fdtest;
+ GRANT EXECUTE ON FUNCTION dblink_connect_u(text, text) TO dblink_regression_test;
+ \set ORIGINAL_USER :USER
+ \c - dblink_regression_test
+ SELECT dblink_connect_u('myconn', 'fdtest');
+  dblink_connect_u
+ ------------------
+  OK
+ (1 row)
+
+ SELECT * FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]);
+  a  | b |       c
+ ----+---+---------------
+   0 | a | {a0,b0,c0}
+   1 | b | {a1,b1,c1}
+   2 | c | {a2,b2,c2}
+   3 | d | {a3,b3,c3}
+   4 | e | {a4,b4,c4}
+   5 | f | {a5,b5,c5}
+   6 | g | {a6,b6,c6}
+   7 | h | {a7,b7,c7}
+   8 | i | {a8,b8,c8}
+   9 | j | {a9,b9,c9}
+  10 | k | {a10,b10,c10}
+ (11 rows)
+
+ \c - :ORIGINAL_USER
+ REVOKE EXECUTE ON FUNCTION dblink_connect_u(text, text) FROM dblink_regression_test;
+ DROP USER dblink_regression_test;
Index: contrib/dblink/sql/dblink.sql
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/sql/dblink.sql,v
retrieving revision 1.20
diff -c -r1.20 dblink.sql
*** contrib/dblink/sql/dblink.sql    6 Apr 2008 16:54:48 -0000    1.20
--- contrib/dblink/sql/dblink.sql    3 Jan 2009 02:54:29 -0000
***************
*** 364,366 ****
--- 364,383 ----
  SELECT dblink_cancel_query('dtest1');
  SELECT dblink_error_message('dtest1');
  SELECT dblink_disconnect('dtest1');
+
+ -- test foreign data wrapper functionality
+ CREATE USER dblink_regression_test;
+
+ CREATE FOREIGN DATA WRAPPER postgresql LIBRARY 'postgresql_fdw' LANGUAGE C;
+ CREATE SERVER fdtest FOREIGN DATA WRAPPER postgresql OPTIONS (dbname 'contrib_regression');
+ CREATE USER MAPPING FOR public SERVER fdtest;
+ GRANT EXECUTE ON FUNCTION dblink_connect_u(text, text) TO dblink_regression_test;
+
+ \set ORIGINAL_USER :USER
+ \c - dblink_regression_test
+ SELECT dblink_connect_u('myconn', 'fdtest');
+ SELECT * FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]);
+
+ \c - :ORIGINAL_USER
+ REVOKE EXECUTE ON FUNCTION dblink_connect_u(text, text) FROM dblink_regression_test;
+ DROP USER dblink_regression_test;
Index: doc/src/sgml/dblink.sgml
===================================================================
RCS file: /opt/src/cvs/pgsql/doc/src/sgml/dblink.sgml,v
retrieving revision 1.6
diff -c -r1.6 dblink.sgml
*** doc/src/sgml/dblink.sgml    12 Nov 2008 15:52:44 -0000    1.6
--- doc/src/sgml/dblink.sgml    3 Jan 2009 06:25:10 -0000
***************
*** 42,47 ****
--- 42,59 ----
      only one unnamed connection is permitted at a time.  The connection
      will persist until closed or until the database session is ended.
     </para>
+
+    <para>
+     The connection string may also be the name of an existing foreign
+     server that utilizes the postgresql_fdw foreign data wrapper library.
+     See the example below, as well as the following:
+     <simplelist type="inline">
+      <member><xref linkend="sql-createforeigndatawrapper" endterm="sql-createforeigndatawrapper-title"></member>
+      <member><xref linkend="sql-createserver" endterm="sql-createserver-title"></member>
+      <member><xref linkend="sql-createusermapping" endterm="sql-createusermapping-title"></member>
+     </simplelist>
+    </para>
+
    </refsect1>

    <refsect1>
***************
*** 113,118 ****
--- 125,168 ----
   ----------------
    OK
   (1 row)
+
+  -- FOREIGN DATA WRAPPER functionality
+  -- Note: local connection must require authentication for this to work properly
+  CREATE USER dblink_regression_test WITH PASSWORD 'secret';
+  -- The next two statements are unneeded in the contrib_regression database as they
+  -- have already been performed. They will produce ERROR messages.
+  CREATE FOREIGN DATA WRAPPER postgresql LIBRARY 'postgresql_fdw' LANGUAGE C;
+  CREATE SERVER fdtest FOREIGN DATA WRAPPER postgresql OPTIONS (dbname 'contrib_regression');
+  CREATE USER MAPPING FOR dblink_regression_test SERVER fdtest OPTIONS (user 'dblink_regression_test', password 'secret');
+  GRANT SELECT ON TABLE foo TO dblink_regression_test;
+  \set ORIGINAL_USER :USER
+  \c - dblink_regression_test
+  SELECT dblink_connect('myconn', 'fdtest');
+   dblink_connect
+  ----------------
+   OK
+  (1 row)
+
+  SELECT * FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]);
+   a  | b |       c
+  ----+---+---------------
+    0 | a | {a0,b0,c0}
+    1 | b | {a1,b1,c1}
+    2 | c | {a2,b2,c2}
+    3 | d | {a3,b3,c3}
+    4 | e | {a4,b4,c4}
+    5 | f | {a5,b5,c5}
+    6 | g | {a6,b6,c6}
+    7 | h | {a7,b7,c7}
+    8 | i | {a8,b8,c8}
+    9 | j | {a9,b9,c9}
+   10 | k | {a10,b10,c10}
+  (11 rows)
+
+  \c - :ORIGINAL_USER
+  REVOKE SELECT ON TABLE foo FROM dblink_regression_test;
+  DROP USER MAPPING FOR dblink_regression_test SERVER fdtest;
+  DROP USER dblink_regression_test;
     </programlisting>
    </refsect1>
   </refentry>

pgsql-hackers by date:

Previous
From: Peter Eisentraut
Date:
Subject: Re: Frames vs partitions: is SQL2008 completely insane?
Next
From: Peter Eisentraut
Date:
Subject: Re: Significantly larger toast tables on 8.4?