dblink patches for comment - Mailing list pgsql-hackers

From Joe Conway
Subject dblink patches for comment
Date
Msg-id 4A1C9C8C.6030405@joeconway.com
Whole thread Raw
Responses Re: dblink patches for comment  (Tom Lane <tgl@sss.pgh.pa.us>)
List pgsql-hackers
The attached patch addresses items #2 and #3 as listed by Bruce here:
   http://momjian.us/cgi-bin/pgsql/joe

I think it is consistent with the discussions we had at PGCon last week.
Any objections to me committing this for 8.4?

On a side note, should I try to address items #1 & #4 for 8.4 as well?
Perhaps #4 yes since it is arguably a bug fix, but no to #1?

Joe

Index: dblink.c
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.c,v
retrieving revision 1.77
diff -c -r1.77 dblink.c
*** dblink.c    1 Jan 2009 17:23:31 -0000    1.77
--- dblink.c    25 May 2009 22:57:22 -0000
***************
*** 46,51 ****
--- 46,52 ----
  #include "catalog/pg_type.h"
  #include "executor/executor.h"
  #include "executor/spi.h"
+ #include "foreign/foreign.h"
  #include "lib/stringinfo.h"
  #include "miscadmin.h"
  #include "nodes/execnodes.h"
***************
*** 77,83 ****
  /*
   * Internal declarations
   */
! static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get);
  static remoteConn *getConnectionByName(const char *name);
  static HTAB *createConnHash(void);
  static void createNewConnection(const char *name, remoteConn * rconn);
--- 78,84 ----
  /*
   * Internal declarations
   */
! static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
  static remoteConn *getConnectionByName(const char *name);
  static HTAB *createConnHash(void);
  static void createNewConnection(const char *name, remoteConn * rconn);
***************
*** 93,101 ****
  static HeapTuple get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals);
  static Oid    get_relid_from_relname(text *relname_text);
  static char *generate_relation_name(Oid relid);
! static void dblink_connstr_check(const char *connstr);
! static void dblink_security_check(PGconn *conn, remoteConn *rconn);
  static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail);

  /* Global */
  static remoteConn *pconn = NULL;
--- 94,103 ----
  static HeapTuple get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals);
  static Oid    get_relid_from_relname(text *relname_text);
  static char *generate_relation_name(Oid relid);
! static void dblink_connstr_check(const char *connstr, bool is_fdw);
! static void dblink_security_check(PGconn *conn, remoteConn *rconn, bool is_fdw);
  static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail);
+ static char *get_connect_string(const char *servername);

  /* Global */
  static remoteConn *pconn = NULL;
***************
*** 165,172 ****
              } \
              else \
              { \
!                 connstr = conname_or_str; \
!                 dblink_connstr_check(connstr); \
                  conn = PQconnectdb(connstr); \
                  if (PQstatus(conn) == CONNECTION_BAD) \
                  { \
--- 167,180 ----
              } \
              else \
              { \
!                 bool is_fdw = true; \
!                 connstr = get_connect_string(conname_or_str); \
!                 if (connstr == NULL) \
!                 { \
!                     is_fdw = false; \
!                     connstr = conname_or_str; \
!                 } \
!                 dblink_connstr_check(connstr, is_fdw); \
                  conn = PQconnectdb(connstr); \
                  if (PQstatus(conn) == CONNECTION_BAD) \
                  { \
***************
*** 177,183 ****
                               errmsg("could not establish connection"), \
                               errdetail("%s", msg))); \
                  } \
!                 dblink_security_check(conn, rconn); \
                  freeconn = true; \
              } \
      } while (0)
--- 185,191 ----
                               errmsg("could not establish connection"), \
                               errdetail("%s", msg))); \
                  } \
!                 dblink_security_check(conn, rconn, is_fdw); \
                  freeconn = true; \
              } \
      } while (0)
***************
*** 210,237 ****
  Datum
  dblink_connect(PG_FUNCTION_ARGS)
  {
      char       *connstr = NULL;
      char       *connname = NULL;
      char       *msg;
      PGconn       *conn = NULL;
      remoteConn *rconn = NULL;

      DBLINK_INIT;

      if (PG_NARGS() == 2)
      {
!         connstr = text_to_cstring(PG_GETARG_TEXT_PP(1));
          connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      }
      else if (PG_NARGS() == 1)
!         connstr = text_to_cstring(PG_GETARG_TEXT_PP(0));

      if (connname)
          rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
                                                    sizeof(remoteConn));

      /* check password in connection string if not superuser */
!     dblink_connstr_check(connstr);
      conn = PQconnectdb(connstr);

      if (PQstatus(conn) == CONNECTION_BAD)
--- 218,255 ----
  Datum
  dblink_connect(PG_FUNCTION_ARGS)
  {
+     char       *conname_or_str = NULL;
      char       *connstr = NULL;
      char       *connname = NULL;
      char       *msg;
      PGconn       *conn = NULL;
      remoteConn *rconn = NULL;
+     bool        is_fdw = true;

      DBLINK_INIT;

      if (PG_NARGS() == 2)
      {
!         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
          connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      }
      else if (PG_NARGS() == 1)
!         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));

      if (connname)
          rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
                                                    sizeof(remoteConn));

+     /* first check for valid foreign data server */
+     connstr = get_connect_string(conname_or_str);
+     if (connstr == NULL)
+     {
+         is_fdw = false;
+         connstr = conname_or_str;
+     }
+
      /* check password in connection string if not superuser */
!     dblink_connstr_check(connstr, is_fdw);
      conn = PQconnectdb(connstr);

      if (PQstatus(conn) == CONNECTION_BAD)
***************
*** 248,254 ****
      }

      /* check password actually used if not superuser */
!     dblink_security_check(conn, rconn);

      if (connname)
      {
--- 266,272 ----
      }

      /* check password actually used if not superuser */
!     dblink_security_check(conn, rconn, is_fdw);

      if (connname)
      {
***************
*** 689,713 ****
  Datum
  dblink_record(PG_FUNCTION_ARGS)
  {
!     return dblink_record_internal(fcinfo, false, false);
  }

  PG_FUNCTION_INFO_V1(dblink_send_query);
  Datum
  dblink_send_query(PG_FUNCTION_ARGS)
  {
!     return dblink_record_internal(fcinfo, true, false);
  }

  PG_FUNCTION_INFO_V1(dblink_get_result);
  Datum
  dblink_get_result(PG_FUNCTION_ARGS)
  {
!     return dblink_record_internal(fcinfo, true, true);
  }

  static Datum
! dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get)
  {
      FuncCallContext *funcctx;
      TupleDesc    tupdesc = NULL;
--- 707,753 ----
  Datum
  dblink_record(PG_FUNCTION_ARGS)
  {
!     return dblink_record_internal(fcinfo, false);
  }

  PG_FUNCTION_INFO_V1(dblink_send_query);
  Datum
  dblink_send_query(PG_FUNCTION_ARGS)
  {
!     PGconn       *conn = NULL;
!     char       *connstr = NULL;
!     char       *sql = NULL;
!     remoteConn *rconn = NULL;
!     char       *msg;
!     bool        freeconn = false;
!     int            retval;
!
!     if (PG_NARGS() == 2)
!     {
!         DBLINK_GET_CONN;
!         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
!     }
!     else
!         /* shouldn't happen */
!         elog(ERROR, "wrong number of arguments");
!
!     /* async query send */
!     retval = PQsendQuery(conn, sql);
!     if (retval != 1)
!         elog(NOTICE, "%s", PQerrorMessage(conn));
!
!     PG_RETURN_INT32(retval);
  }

  PG_FUNCTION_INFO_V1(dblink_get_result);
  Datum
  dblink_get_result(PG_FUNCTION_ARGS)
  {
!     return dblink_record_internal(fcinfo, true);
  }

  static Datum
! dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
  {
      FuncCallContext *funcctx;
      TupleDesc    tupdesc = NULL;
***************
*** 775,788 ****
                  /* shouldn't happen */
                  elog(ERROR, "wrong number of arguments");
          }
!         else if (is_async && do_get)
          {
              /* get async result */
              if (PG_NARGS() == 2)
              {
                  /* text,bool */
                  DBLINK_GET_CONN;
!                 fail = PG_GETARG_BOOL(2);
              }
              else if (PG_NARGS() == 1)
              {
--- 815,828 ----
                  /* shouldn't happen */
                  elog(ERROR, "wrong number of arguments");
          }
!         else /* is_async */
          {
              /* get async result */
              if (PG_NARGS() == 2)
              {
                  /* text,bool */
                  DBLINK_GET_CONN;
!                 fail = PG_GETARG_BOOL(1);
              }
              else if (PG_NARGS() == 1)
              {
***************
*** 793,929 ****
                  /* shouldn't happen */
                  elog(ERROR, "wrong number of arguments");
          }
-         else
-         {
-             /* send async query */
-             if (PG_NARGS() == 2)
-             {
-                 DBLINK_GET_CONN;
-                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
-             }
-             else
-                 /* shouldn't happen */
-                 elog(ERROR, "wrong number of arguments");
-         }

          if (!conn)
              DBLINK_CONN_NOT_AVAIL;

!         if (!is_async || (is_async && do_get))
          {
!             /* synchronous query, or async result retrieval */
!             if (!is_async)
!                 res = PQexec(conn, sql);
!             else
              {
-                 res = PQgetResult(conn);
-                 /* NULL means we're all done with the async results */
-                 if (!res)
-                 {
-                     MemoryContextSwitchTo(oldcontext);
-                     SRF_RETURN_DONE(funcctx);
-                 }
-             }
-
-             if (!res ||
-                 (PQresultStatus(res) != PGRES_COMMAND_OK &&
-                  PQresultStatus(res) != PGRES_TUPLES_OK))
-             {
-                 dblink_res_error(conname, res, "could not execute query", fail);
-                 if (freeconn)
-                     PQfinish(conn);
                  MemoryContextSwitchTo(oldcontext);
                  SRF_RETURN_DONE(funcctx);
              }

!             if (PQresultStatus(res) == PGRES_COMMAND_OK)
!             {
!                 is_sql_cmd = true;
!
!                 /* need a tuple descriptor representing one TEXT column */
!                 tupdesc = CreateTemplateTupleDesc(1, false);
!                 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
!                                    TEXTOID, -1, 0);
!
!                 /*
!                  * and save a copy of the command status string to return as
!                  * our result tuple
!                  */
!                 sql_cmd_status = PQcmdStatus(res);
!                 funcctx->max_calls = 1;
!             }
!             else
!                 funcctx->max_calls = PQntuples(res);
!
!             /* got results, keep track of them */
!             funcctx->user_fctx = res;
!
!             /* if needed, close the connection to the database and cleanup */
              if (freeconn)
                  PQfinish(conn);

!             if (!is_sql_cmd)
!             {
!                 /* get a tuple descriptor for our result type */
!                 switch (get_call_result_type(fcinfo, NULL, &tupdesc))
!                 {
!                     case TYPEFUNC_COMPOSITE:
!                         /* success */
!                         break;
!                     case TYPEFUNC_RECORD:
!                         /* failed to determine actual type of RECORD */
!                         ereport(ERROR,
!                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
!                         errmsg("function returning record called in context "
!                                "that cannot accept type record")));
!                         break;
!                     default:
!                         /* result type isn't composite */
!                         elog(ERROR, "return type must be a row type");
!                         break;
!                 }

!                 /* make sure we have a persistent copy of the tupdesc */
!                 tupdesc = CreateTupleDescCopy(tupdesc);
!             }

              /*
!              * check result and tuple descriptor have the same number of
!              * columns
               */
!             if (PQnfields(res) != tupdesc->natts)
!                 ereport(ERROR,
!                         (errcode(ERRCODE_DATATYPE_MISMATCH),
!                          errmsg("remote query result rowtype does not match "
!                                 "the specified FROM clause rowtype")));

!             /* fast track when no results */
!             if (funcctx->max_calls < 1)
              {
!                 if (res)
!                     PQclear(res);
!                 MemoryContextSwitchTo(oldcontext);
!                 SRF_RETURN_DONE(funcctx);
              }

!             /* store needed metadata for subsequent calls */
!             attinmeta = TupleDescGetAttInMetadata(tupdesc);
!             funcctx->attinmeta = attinmeta;
!
!             MemoryContextSwitchTo(oldcontext);
          }
!         else
          {
!             /* async query send */
              MemoryContextSwitchTo(oldcontext);
!             PG_RETURN_INT32(PQsendQuery(conn, sql));
          }
-     }

!     if (is_async && !do_get)
!     {
!         /* async query send -- should not happen */
!         elog(ERROR, "async query send called more than once");

      }

--- 833,942 ----
                  /* shouldn't happen */
                  elog(ERROR, "wrong number of arguments");
          }

          if (!conn)
              DBLINK_CONN_NOT_AVAIL;

!         /* synchronous query, or async result retrieval */
!         if (!is_async)
!             res = PQexec(conn, sql);
!         else
          {
!             res = PQgetResult(conn);
!             /* NULL means we're all done with the async results */
!             if (!res)
              {
                  MemoryContextSwitchTo(oldcontext);
                  SRF_RETURN_DONE(funcctx);
              }
+         }

!         if (!res ||
!             (PQresultStatus(res) != PGRES_COMMAND_OK &&
!              PQresultStatus(res) != PGRES_TUPLES_OK))
!         {
!             dblink_res_error(conname, res, "could not execute query", fail);
              if (freeconn)
                  PQfinish(conn);
+             MemoryContextSwitchTo(oldcontext);
+             SRF_RETURN_DONE(funcctx);
+         }

!         if (PQresultStatus(res) == PGRES_COMMAND_OK)
!         {
!             is_sql_cmd = true;

!             /* need a tuple descriptor representing one TEXT column */
!             tupdesc = CreateTemplateTupleDesc(1, false);
!             TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
!                                TEXTOID, -1, 0);

              /*
!              * and save a copy of the command status string to return as
!              * our result tuple
               */
!             sql_cmd_status = PQcmdStatus(res);
!             funcctx->max_calls = 1;
!         }
!         else
!             funcctx->max_calls = PQntuples(res);

!         /* got results, keep track of them */
!         funcctx->user_fctx = res;
!
!         /* if needed, close the connection to the database and cleanup */
!         if (freeconn)
!             PQfinish(conn);
!
!         if (!is_sql_cmd)
!         {
!             /* get a tuple descriptor for our result type */
!             switch (get_call_result_type(fcinfo, NULL, &tupdesc))
              {
!                 case TYPEFUNC_COMPOSITE:
!                     /* success */
!                     break;
!                 case TYPEFUNC_RECORD:
!                     /* failed to determine actual type of RECORD */
!                     ereport(ERROR,
!                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
!                     errmsg("function returning record called in context "
!                            "that cannot accept type record")));
!                     break;
!                 default:
!                     /* result type isn't composite */
!                     elog(ERROR, "return type must be a row type");
!                     break;
              }

!             /* make sure we have a persistent copy of the tupdesc */
!             tupdesc = CreateTupleDescCopy(tupdesc);
          }
!
!         /*
!          * check result and tuple descriptor have the same number of
!          * columns
!          */
!         if (PQnfields(res) != tupdesc->natts)
!             ereport(ERROR,
!                     (errcode(ERRCODE_DATATYPE_MISMATCH),
!                      errmsg("remote query result rowtype does not match "
!                             "the specified FROM clause rowtype")));
!
!         /* fast track when no results */
!         if (funcctx->max_calls < 1)
          {
!             if (res)
!                 PQclear(res);
              MemoryContextSwitchTo(oldcontext);
!             SRF_RETURN_DONE(funcctx);
          }

!         /* store needed metadata for subsequent calls */
!         attinmeta = TupleDescGetAttInMetadata(tupdesc);
!         funcctx->attinmeta = attinmeta;
!
!         MemoryContextSwitchTo(oldcontext);

      }

***************
*** 2249,2257 ****
  }

  static void
! dblink_security_check(PGconn *conn, remoteConn *rconn)
  {
!     if (!superuser())
      {
          if (!PQconnectionUsedPassword(conn))
          {
--- 2262,2270 ----
  }

  static void
! dblink_security_check(PGconn *conn, remoteConn *rconn, bool is_fdw)
  {
!     if (!superuser() && !is_fdw)
      {
          if (!PQconnectionUsedPassword(conn))
          {
***************
*** 2275,2283 ****
   * to be accessible to non-superusers.
   */
  static void
! dblink_connstr_check(const char *connstr)
  {
!     if (!superuser())
      {
          PQconninfoOption   *options;
          PQconninfoOption   *option;
--- 2288,2296 ----
   * to be accessible to non-superusers.
   */
  static void
! dblink_connstr_check(const char *connstr, bool is_fdw)
  {
!     if (!superuser() && !is_fdw)
      {
          PQconninfoOption   *options;
          PQconninfoOption   *option;
***************
*** 2358,2360 ****
--- 2371,2431 ----
           errcontext("Error occurred on dblink connection named \"%s\": %s.",
                      dblink_context_conname, dblink_context_msg)));
  }
+
+ /*
+  * Obtain connection string for a foreign server
+  */
+ static char *
+ get_connect_string(const char *servername)
+ {
+     ForeignServer       *foreign_server;
+     UserMapping           *user_mapping;
+     ListCell           *cell;
+     StringInfo            buf = makeStringInfo();
+     ForeignDataWrapper *fdw;
+     AclResult            aclresult;
+
+     /* first gather the server connstr options */
+     foreign_server = GetForeignServerByName(servername, true);
+
+     if (foreign_server)
+     {
+         Oid        serverid = foreign_server->serverid;
+         Oid        fdwid = foreign_server->fdwid;
+         Oid        userid = GetUserId();
+
+         user_mapping = GetUserMapping(userid, serverid);
+         fdw    = GetForeignDataWrapper(fdwid);
+
+         /* Check permissions, user must have usage on the server. */
+         aclresult = pg_foreign_server_aclcheck(serverid, userid, ACL_USAGE);
+         if (aclresult != ACLCHECK_OK)
+             aclcheck_error(aclresult, ACL_KIND_FOREIGN_SERVER, foreign_server->servername);
+
+         foreach (cell, fdw->options)
+         {
+             DefElem           *def = lfirst(cell);
+
+             appendStringInfo(buf, "%s='%s' ", def->defname, strVal(def->arg));
+         }
+
+         foreach (cell, foreign_server->options)
+         {
+             DefElem           *def = lfirst(cell);
+
+             appendStringInfo(buf, "%s='%s' ", def->defname, strVal(def->arg));
+         }
+
+         foreach (cell, user_mapping->options)
+         {
+
+             DefElem           *def = lfirst(cell);
+
+             appendStringInfo(buf, "%s='%s' ", def->defname, strVal(def->arg));
+         }
+
+         return pstrdup(buf->data);
+     }
+     else
+         return NULL;
+ }
Index: expected/dblink.out
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/expected/dblink.out,v
retrieving revision 1.24
diff -c -r1.24 dblink.out
*** expected/dblink.out    3 Jul 2008 03:56:57 -0000    1.24
--- expected/dblink.out    25 May 2009 23:14:03 -0000
***************
*** 784,786 ****
--- 784,819 ----
   OK
  (1 row)

+ -- test foreign data wrapper functionality
+ CREATE USER dblink_regression_test;
+ CREATE FOREIGN DATA WRAPPER postgresql;
+ CREATE SERVER fdtest FOREIGN DATA WRAPPER postgresql OPTIONS (dbname 'contrib_regression');
+ CREATE USER MAPPING FOR public SERVER fdtest;
+ GRANT EXECUTE ON FUNCTION dblink_connect_u(text, text) TO dblink_regression_test;
+ \set ORIGINAL_USER :USER
+ \c - dblink_regression_test
+ SELECT dblink_connect_u('myconn', 'fdtest');
+  dblink_connect_u
+ ------------------
+  OK
+ (1 row)
+
+ SELECT * FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]);
+  a  | b |       c
+ ----+---+---------------
+   0 | a | {a0,b0,c0}
+   1 | b | {a1,b1,c1}
+   2 | c | {a2,b2,c2}
+   3 | d | {a3,b3,c3}
+   4 | e | {a4,b4,c4}
+   5 | f | {a5,b5,c5}
+   6 | g | {a6,b6,c6}
+   7 | h | {a7,b7,c7}
+   8 | i | {a8,b8,c8}
+   9 | j | {a9,b9,c9}
+  10 | k | {a10,b10,c10}
+ (11 rows)
+
+ \c - :ORIGINAL_USER
+ REVOKE EXECUTE ON FUNCTION dblink_connect_u(text, text) FROM dblink_regression_test;
+ DROP USER dblink_regression_test;
Index: sql/dblink.sql
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/sql/dblink.sql,v
retrieving revision 1.20
diff -c -r1.20 dblink.sql
*** sql/dblink.sql    6 Apr 2008 16:54:48 -0000    1.20
--- sql/dblink.sql    25 May 2009 23:13:49 -0000
***************
*** 364,366 ****
--- 364,383 ----
  SELECT dblink_cancel_query('dtest1');
  SELECT dblink_error_message('dtest1');
  SELECT dblink_disconnect('dtest1');
+
+ -- test foreign data wrapper functionality
+ CREATE USER dblink_regression_test;
+
+ CREATE FOREIGN DATA WRAPPER postgresql;
+ CREATE SERVER fdtest FOREIGN DATA WRAPPER postgresql OPTIONS (dbname 'contrib_regression');
+ CREATE USER MAPPING FOR public SERVER fdtest;
+ GRANT EXECUTE ON FUNCTION dblink_connect_u(text, text) TO dblink_regression_test;
+
+ \set ORIGINAL_USER :USER
+ \c - dblink_regression_test
+ SELECT dblink_connect_u('myconn', 'fdtest');
+ SELECT * FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]);
+
+ \c - :ORIGINAL_USER
+ REVOKE EXECUTE ON FUNCTION dblink_connect_u(text, text) FROM dblink_regression_test;
+ DROP USER dblink_regression_test;

pgsql-hackers by date:

Previous
From: Tom Lane
Date:
Subject: Re: [pgsql-www] commitfest management webapp
Next
From: Mark Wong
Date:
Subject: effects of posix_fadvise on WAL logs