Thread: dblink patches for comment

dblink patches for comment

From
Joe Conway
Date:
The attached patch addresses items #2 and #3 as listed by Bruce here:
   http://momjian.us/cgi-bin/pgsql/joe

I think it is consistent with the discussions we had at PGCon last week.
Any objections to me committing this for 8.4?

On a side note, should I try to address items #1 & #4 for 8.4 as well?
Perhaps #4 yes since it is arguably a bug fix, but no to #1?

Joe

Index: dblink.c
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.c,v
retrieving revision 1.77
diff -c -r1.77 dblink.c
*** dblink.c    1 Jan 2009 17:23:31 -0000    1.77
--- dblink.c    25 May 2009 22:57:22 -0000
***************
*** 46,51 ****
--- 46,52 ----
  #include "catalog/pg_type.h"
  #include "executor/executor.h"
  #include "executor/spi.h"
+ #include "foreign/foreign.h"
  #include "lib/stringinfo.h"
  #include "miscadmin.h"
  #include "nodes/execnodes.h"
***************
*** 77,83 ****
  /*
   * Internal declarations
   */
! static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get);
  static remoteConn *getConnectionByName(const char *name);
  static HTAB *createConnHash(void);
  static void createNewConnection(const char *name, remoteConn * rconn);
--- 78,84 ----
  /*
   * Internal declarations
   */
! static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
  static remoteConn *getConnectionByName(const char *name);
  static HTAB *createConnHash(void);
  static void createNewConnection(const char *name, remoteConn * rconn);
***************
*** 93,101 ****
  static HeapTuple get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals);
  static Oid    get_relid_from_relname(text *relname_text);
  static char *generate_relation_name(Oid relid);
! static void dblink_connstr_check(const char *connstr);
! static void dblink_security_check(PGconn *conn, remoteConn *rconn);
  static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail);

  /* Global */
  static remoteConn *pconn = NULL;
--- 94,103 ----
  static HeapTuple get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals);
  static Oid    get_relid_from_relname(text *relname_text);
  static char *generate_relation_name(Oid relid);
! static void dblink_connstr_check(const char *connstr, bool is_fdw);
! static void dblink_security_check(PGconn *conn, remoteConn *rconn, bool is_fdw);
  static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail);
+ static char *get_connect_string(const char *servername);

  /* Global */
  static remoteConn *pconn = NULL;
***************
*** 165,172 ****
              } \
              else \
              { \
!                 connstr = conname_or_str; \
!                 dblink_connstr_check(connstr); \
                  conn = PQconnectdb(connstr); \
                  if (PQstatus(conn) == CONNECTION_BAD) \
                  { \
--- 167,180 ----
              } \
              else \
              { \
!                 bool is_fdw = true; \
!                 connstr = get_connect_string(conname_or_str); \
!                 if (connstr == NULL) \
!                 { \
!                     is_fdw = false; \
!                     connstr = conname_or_str; \
!                 } \
!                 dblink_connstr_check(connstr, is_fdw); \
                  conn = PQconnectdb(connstr); \
                  if (PQstatus(conn) == CONNECTION_BAD) \
                  { \
***************
*** 177,183 ****
                               errmsg("could not establish connection"), \
                               errdetail("%s", msg))); \
                  } \
!                 dblink_security_check(conn, rconn); \
                  freeconn = true; \
              } \
      } while (0)
--- 185,191 ----
                               errmsg("could not establish connection"), \
                               errdetail("%s", msg))); \
                  } \
!                 dblink_security_check(conn, rconn, is_fdw); \
                  freeconn = true; \
              } \
      } while (0)
***************
*** 210,237 ****
  Datum
  dblink_connect(PG_FUNCTION_ARGS)
  {
      char       *connstr = NULL;
      char       *connname = NULL;
      char       *msg;
      PGconn       *conn = NULL;
      remoteConn *rconn = NULL;

      DBLINK_INIT;

      if (PG_NARGS() == 2)
      {
!         connstr = text_to_cstring(PG_GETARG_TEXT_PP(1));
          connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      }
      else if (PG_NARGS() == 1)
!         connstr = text_to_cstring(PG_GETARG_TEXT_PP(0));

      if (connname)
          rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
                                                    sizeof(remoteConn));

      /* check password in connection string if not superuser */
!     dblink_connstr_check(connstr);
      conn = PQconnectdb(connstr);

      if (PQstatus(conn) == CONNECTION_BAD)
--- 218,255 ----
  Datum
  dblink_connect(PG_FUNCTION_ARGS)
  {
+     char       *conname_or_str = NULL;
      char       *connstr = NULL;
      char       *connname = NULL;
      char       *msg;
      PGconn       *conn = NULL;
      remoteConn *rconn = NULL;
+     bool        is_fdw = true;

      DBLINK_INIT;

      if (PG_NARGS() == 2)
      {
!         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
          connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      }
      else if (PG_NARGS() == 1)
!         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));

      if (connname)
          rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
                                                    sizeof(remoteConn));

+     /* first check for valid foreign data server */
+     connstr = get_connect_string(conname_or_str);
+     if (connstr == NULL)
+     {
+         is_fdw = false;
+         connstr = conname_or_str;
+     }
+
      /* check password in connection string if not superuser */
!     dblink_connstr_check(connstr, is_fdw);
      conn = PQconnectdb(connstr);

      if (PQstatus(conn) == CONNECTION_BAD)
***************
*** 248,254 ****
      }

      /* check password actually used if not superuser */
!     dblink_security_check(conn, rconn);

      if (connname)
      {
--- 266,272 ----
      }

      /* check password actually used if not superuser */
!     dblink_security_check(conn, rconn, is_fdw);

      if (connname)
      {
***************
*** 689,713 ****
  Datum
  dblink_record(PG_FUNCTION_ARGS)
  {
!     return dblink_record_internal(fcinfo, false, false);
  }

  PG_FUNCTION_INFO_V1(dblink_send_query);
  Datum
  dblink_send_query(PG_FUNCTION_ARGS)
  {
!     return dblink_record_internal(fcinfo, true, false);
  }

  PG_FUNCTION_INFO_V1(dblink_get_result);
  Datum
  dblink_get_result(PG_FUNCTION_ARGS)
  {
!     return dblink_record_internal(fcinfo, true, true);
  }

  static Datum
! dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get)
  {
      FuncCallContext *funcctx;
      TupleDesc    tupdesc = NULL;
--- 707,753 ----
  Datum
  dblink_record(PG_FUNCTION_ARGS)
  {
!     return dblink_record_internal(fcinfo, false);
  }

  PG_FUNCTION_INFO_V1(dblink_send_query);
  Datum
  dblink_send_query(PG_FUNCTION_ARGS)
  {
!     PGconn       *conn = NULL;
!     char       *connstr = NULL;
!     char       *sql = NULL;
!     remoteConn *rconn = NULL;
!     char       *msg;
!     bool        freeconn = false;
!     int            retval;
!
!     if (PG_NARGS() == 2)
!     {
!         DBLINK_GET_CONN;
!         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
!     }
!     else
!         /* shouldn't happen */
!         elog(ERROR, "wrong number of arguments");
!
!     /* async query send */
!     retval = PQsendQuery(conn, sql);
!     if (retval != 1)
!         elog(NOTICE, "%s", PQerrorMessage(conn));
!
!     PG_RETURN_INT32(retval);
  }

  PG_FUNCTION_INFO_V1(dblink_get_result);
  Datum
  dblink_get_result(PG_FUNCTION_ARGS)
  {
!     return dblink_record_internal(fcinfo, true);
  }

  static Datum
! dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
  {
      FuncCallContext *funcctx;
      TupleDesc    tupdesc = NULL;
***************
*** 775,788 ****
                  /* shouldn't happen */
                  elog(ERROR, "wrong number of arguments");
          }
!         else if (is_async && do_get)
          {
              /* get async result */
              if (PG_NARGS() == 2)
              {
                  /* text,bool */
                  DBLINK_GET_CONN;
!                 fail = PG_GETARG_BOOL(2);
              }
              else if (PG_NARGS() == 1)
              {
--- 815,828 ----
                  /* shouldn't happen */
                  elog(ERROR, "wrong number of arguments");
          }
!         else /* is_async */
          {
              /* get async result */
              if (PG_NARGS() == 2)
              {
                  /* text,bool */
                  DBLINK_GET_CONN;
!                 fail = PG_GETARG_BOOL(1);
              }
              else if (PG_NARGS() == 1)
              {
***************
*** 793,929 ****
                  /* shouldn't happen */
                  elog(ERROR, "wrong number of arguments");
          }
-         else
-         {
-             /* send async query */
-             if (PG_NARGS() == 2)
-             {
-                 DBLINK_GET_CONN;
-                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
-             }
-             else
-                 /* shouldn't happen */
-                 elog(ERROR, "wrong number of arguments");
-         }

          if (!conn)
              DBLINK_CONN_NOT_AVAIL;

!         if (!is_async || (is_async && do_get))
          {
!             /* synchronous query, or async result retrieval */
!             if (!is_async)
!                 res = PQexec(conn, sql);
!             else
              {
-                 res = PQgetResult(conn);
-                 /* NULL means we're all done with the async results */
-                 if (!res)
-                 {
-                     MemoryContextSwitchTo(oldcontext);
-                     SRF_RETURN_DONE(funcctx);
-                 }
-             }
-
-             if (!res ||
-                 (PQresultStatus(res) != PGRES_COMMAND_OK &&
-                  PQresultStatus(res) != PGRES_TUPLES_OK))
-             {
-                 dblink_res_error(conname, res, "could not execute query", fail);
-                 if (freeconn)
-                     PQfinish(conn);
                  MemoryContextSwitchTo(oldcontext);
                  SRF_RETURN_DONE(funcctx);
              }

!             if (PQresultStatus(res) == PGRES_COMMAND_OK)
!             {
!                 is_sql_cmd = true;
!
!                 /* need a tuple descriptor representing one TEXT column */
!                 tupdesc = CreateTemplateTupleDesc(1, false);
!                 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
!                                    TEXTOID, -1, 0);
!
!                 /*
!                  * and save a copy of the command status string to return as
!                  * our result tuple
!                  */
!                 sql_cmd_status = PQcmdStatus(res);
!                 funcctx->max_calls = 1;
!             }
!             else
!                 funcctx->max_calls = PQntuples(res);
!
!             /* got results, keep track of them */
!             funcctx->user_fctx = res;
!
!             /* if needed, close the connection to the database and cleanup */
              if (freeconn)
                  PQfinish(conn);

!             if (!is_sql_cmd)
!             {
!                 /* get a tuple descriptor for our result type */
!                 switch (get_call_result_type(fcinfo, NULL, &tupdesc))
!                 {
!                     case TYPEFUNC_COMPOSITE:
!                         /* success */
!                         break;
!                     case TYPEFUNC_RECORD:
!                         /* failed to determine actual type of RECORD */
!                         ereport(ERROR,
!                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
!                         errmsg("function returning record called in context "
!                                "that cannot accept type record")));
!                         break;
!                     default:
!                         /* result type isn't composite */
!                         elog(ERROR, "return type must be a row type");
!                         break;
!                 }

!                 /* make sure we have a persistent copy of the tupdesc */
!                 tupdesc = CreateTupleDescCopy(tupdesc);
!             }

              /*
!              * check result and tuple descriptor have the same number of
!              * columns
               */
!             if (PQnfields(res) != tupdesc->natts)
!                 ereport(ERROR,
!                         (errcode(ERRCODE_DATATYPE_MISMATCH),
!                          errmsg("remote query result rowtype does not match "
!                                 "the specified FROM clause rowtype")));

!             /* fast track when no results */
!             if (funcctx->max_calls < 1)
              {
!                 if (res)
!                     PQclear(res);
!                 MemoryContextSwitchTo(oldcontext);
!                 SRF_RETURN_DONE(funcctx);
              }

!             /* store needed metadata for subsequent calls */
!             attinmeta = TupleDescGetAttInMetadata(tupdesc);
!             funcctx->attinmeta = attinmeta;
!
!             MemoryContextSwitchTo(oldcontext);
          }
!         else
          {
!             /* async query send */
              MemoryContextSwitchTo(oldcontext);
!             PG_RETURN_INT32(PQsendQuery(conn, sql));
          }
-     }

!     if (is_async && !do_get)
!     {
!         /* async query send -- should not happen */
!         elog(ERROR, "async query send called more than once");

      }

--- 833,942 ----
                  /* shouldn't happen */
                  elog(ERROR, "wrong number of arguments");
          }

          if (!conn)
              DBLINK_CONN_NOT_AVAIL;

!         /* synchronous query, or async result retrieval */
!         if (!is_async)
!             res = PQexec(conn, sql);
!         else
          {
!             res = PQgetResult(conn);
!             /* NULL means we're all done with the async results */
!             if (!res)
              {
                  MemoryContextSwitchTo(oldcontext);
                  SRF_RETURN_DONE(funcctx);
              }
+         }

!         if (!res ||
!             (PQresultStatus(res) != PGRES_COMMAND_OK &&
!              PQresultStatus(res) != PGRES_TUPLES_OK))
!         {
!             dblink_res_error(conname, res, "could not execute query", fail);
              if (freeconn)
                  PQfinish(conn);
+             MemoryContextSwitchTo(oldcontext);
+             SRF_RETURN_DONE(funcctx);
+         }

!         if (PQresultStatus(res) == PGRES_COMMAND_OK)
!         {
!             is_sql_cmd = true;

!             /* need a tuple descriptor representing one TEXT column */
!             tupdesc = CreateTemplateTupleDesc(1, false);
!             TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
!                                TEXTOID, -1, 0);

              /*
!              * and save a copy of the command status string to return as
!              * our result tuple
               */
!             sql_cmd_status = PQcmdStatus(res);
!             funcctx->max_calls = 1;
!         }
!         else
!             funcctx->max_calls = PQntuples(res);

!         /* got results, keep track of them */
!         funcctx->user_fctx = res;
!
!         /* if needed, close the connection to the database and cleanup */
!         if (freeconn)
!             PQfinish(conn);
!
!         if (!is_sql_cmd)
!         {
!             /* get a tuple descriptor for our result type */
!             switch (get_call_result_type(fcinfo, NULL, &tupdesc))
              {
!                 case TYPEFUNC_COMPOSITE:
!                     /* success */
!                     break;
!                 case TYPEFUNC_RECORD:
!                     /* failed to determine actual type of RECORD */
!                     ereport(ERROR,
!                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
!                     errmsg("function returning record called in context "
!                            "that cannot accept type record")));
!                     break;
!                 default:
!                     /* result type isn't composite */
!                     elog(ERROR, "return type must be a row type");
!                     break;
              }

!             /* make sure we have a persistent copy of the tupdesc */
!             tupdesc = CreateTupleDescCopy(tupdesc);
          }
!
!         /*
!          * check result and tuple descriptor have the same number of
!          * columns
!          */
!         if (PQnfields(res) != tupdesc->natts)
!             ereport(ERROR,
!                     (errcode(ERRCODE_DATATYPE_MISMATCH),
!                      errmsg("remote query result rowtype does not match "
!                             "the specified FROM clause rowtype")));
!
!         /* fast track when no results */
!         if (funcctx->max_calls < 1)
          {
!             if (res)
!                 PQclear(res);
              MemoryContextSwitchTo(oldcontext);
!             SRF_RETURN_DONE(funcctx);
          }

!         /* store needed metadata for subsequent calls */
!         attinmeta = TupleDescGetAttInMetadata(tupdesc);
!         funcctx->attinmeta = attinmeta;
!
!         MemoryContextSwitchTo(oldcontext);

      }

***************
*** 2249,2257 ****
  }

  static void
! dblink_security_check(PGconn *conn, remoteConn *rconn)
  {
!     if (!superuser())
      {
          if (!PQconnectionUsedPassword(conn))
          {
--- 2262,2270 ----
  }

  static void
! dblink_security_check(PGconn *conn, remoteConn *rconn, bool is_fdw)
  {
!     if (!superuser() && !is_fdw)
      {
          if (!PQconnectionUsedPassword(conn))
          {
***************
*** 2275,2283 ****
   * to be accessible to non-superusers.
   */
  static void
! dblink_connstr_check(const char *connstr)
  {
!     if (!superuser())
      {
          PQconninfoOption   *options;
          PQconninfoOption   *option;
--- 2288,2296 ----
   * to be accessible to non-superusers.
   */
  static void
! dblink_connstr_check(const char *connstr, bool is_fdw)
  {
!     if (!superuser() && !is_fdw)
      {
          PQconninfoOption   *options;
          PQconninfoOption   *option;
***************
*** 2358,2360 ****
--- 2371,2431 ----
           errcontext("Error occurred on dblink connection named \"%s\": %s.",
                      dblink_context_conname, dblink_context_msg)));
  }
+
+ /*
+  * Obtain connection string for a foreign server
+  */
+ static char *
+ get_connect_string(const char *servername)
+ {
+     ForeignServer       *foreign_server;
+     UserMapping           *user_mapping;
+     ListCell           *cell;
+     StringInfo            buf = makeStringInfo();
+     ForeignDataWrapper *fdw;
+     AclResult            aclresult;
+
+     /* first gather the server connstr options */
+     foreign_server = GetForeignServerByName(servername, true);
+
+     if (foreign_server)
+     {
+         Oid        serverid = foreign_server->serverid;
+         Oid        fdwid = foreign_server->fdwid;
+         Oid        userid = GetUserId();
+
+         user_mapping = GetUserMapping(userid, serverid);
+         fdw    = GetForeignDataWrapper(fdwid);
+
+         /* Check permissions, user must have usage on the server. */
+         aclresult = pg_foreign_server_aclcheck(serverid, userid, ACL_USAGE);
+         if (aclresult != ACLCHECK_OK)
+             aclcheck_error(aclresult, ACL_KIND_FOREIGN_SERVER, foreign_server->servername);
+
+         foreach (cell, fdw->options)
+         {
+             DefElem           *def = lfirst(cell);
+
+             appendStringInfo(buf, "%s='%s' ", def->defname, strVal(def->arg));
+         }
+
+         foreach (cell, foreign_server->options)
+         {
+             DefElem           *def = lfirst(cell);
+
+             appendStringInfo(buf, "%s='%s' ", def->defname, strVal(def->arg));
+         }
+
+         foreach (cell, user_mapping->options)
+         {
+
+             DefElem           *def = lfirst(cell);
+
+             appendStringInfo(buf, "%s='%s' ", def->defname, strVal(def->arg));
+         }
+
+         return pstrdup(buf->data);
+     }
+     else
+         return NULL;
+ }
Index: expected/dblink.out
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/expected/dblink.out,v
retrieving revision 1.24
diff -c -r1.24 dblink.out
*** expected/dblink.out    3 Jul 2008 03:56:57 -0000    1.24
--- expected/dblink.out    25 May 2009 23:14:03 -0000
***************
*** 784,786 ****
--- 784,819 ----
   OK
  (1 row)

+ -- test foreign data wrapper functionality
+ CREATE USER dblink_regression_test;
+ CREATE FOREIGN DATA WRAPPER postgresql;
+ CREATE SERVER fdtest FOREIGN DATA WRAPPER postgresql OPTIONS (dbname 'contrib_regression');
+ CREATE USER MAPPING FOR public SERVER fdtest;
+ GRANT EXECUTE ON FUNCTION dblink_connect_u(text, text) TO dblink_regression_test;
+ \set ORIGINAL_USER :USER
+ \c - dblink_regression_test
+ SELECT dblink_connect_u('myconn', 'fdtest');
+  dblink_connect_u
+ ------------------
+  OK
+ (1 row)
+
+ SELECT * FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]);
+  a  | b |       c
+ ----+---+---------------
+   0 | a | {a0,b0,c0}
+   1 | b | {a1,b1,c1}
+   2 | c | {a2,b2,c2}
+   3 | d | {a3,b3,c3}
+   4 | e | {a4,b4,c4}
+   5 | f | {a5,b5,c5}
+   6 | g | {a6,b6,c6}
+   7 | h | {a7,b7,c7}
+   8 | i | {a8,b8,c8}
+   9 | j | {a9,b9,c9}
+  10 | k | {a10,b10,c10}
+ (11 rows)
+
+ \c - :ORIGINAL_USER
+ REVOKE EXECUTE ON FUNCTION dblink_connect_u(text, text) FROM dblink_regression_test;
+ DROP USER dblink_regression_test;
Index: sql/dblink.sql
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/sql/dblink.sql,v
retrieving revision 1.20
diff -c -r1.20 dblink.sql
*** sql/dblink.sql    6 Apr 2008 16:54:48 -0000    1.20
--- sql/dblink.sql    25 May 2009 23:13:49 -0000
***************
*** 364,366 ****
--- 364,383 ----
  SELECT dblink_cancel_query('dtest1');
  SELECT dblink_error_message('dtest1');
  SELECT dblink_disconnect('dtest1');
+
+ -- test foreign data wrapper functionality
+ CREATE USER dblink_regression_test;
+
+ CREATE FOREIGN DATA WRAPPER postgresql;
+ CREATE SERVER fdtest FOREIGN DATA WRAPPER postgresql OPTIONS (dbname 'contrib_regression');
+ CREATE USER MAPPING FOR public SERVER fdtest;
+ GRANT EXECUTE ON FUNCTION dblink_connect_u(text, text) TO dblink_regression_test;
+
+ \set ORIGINAL_USER :USER
+ \c - dblink_regression_test
+ SELECT dblink_connect_u('myconn', 'fdtest');
+ SELECT * FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]);
+
+ \c - :ORIGINAL_USER
+ REVOKE EXECUTE ON FUNCTION dblink_connect_u(text, text) FROM dblink_regression_test;
+ DROP USER dblink_regression_test;

Re: dblink patches for comment

From
Tom Lane
Date:
Joe Conway <mail@joeconway.com> writes:
> The attached addresses items#2 and 3 as listed by Bruce here:
>    http://momjian.us/cgi-bin/pgsql/joe

> I think it is consistent with the discussions we had a PGCon last week. 
> Any objections to me committing this for 8.4?

It's hard to review it without any docs that say what it's supposed to do.
(And you'd need to patch the docs anyway, eh?)

> On a side note, should I try to address items #1 & #4 for 8.4 as well? 
> Perhaps #4 yes since it is arguably a bug fix, but no to #1?

Yeah, my feeling too.  #1 is a new feature that was submitted too late
for 8.4.  I wouldn't have argued if you'd committed it anyway during
the commitfest, but it's definitely too late now.  But #4 seems like
a bugfix.
        regards, tom lane


Re: dblink patches for comment

From
Joe Conway
Date:
Tom Lane wrote:
> It's hard to review it without any docs that say what it's supposed to do.
> (And you'd need to patch the docs anyway, eh?)

Fair enough :-)

Probably better if I break this up into logical chunks too. This patch
only addresses the refactoring you requested here:
http://archives.postgresql.org/message-id/28719.1230996378@sss.pgh.pa.us

I'll follow up later today with a SQL/MED only patch which includes docs.

Joe
Index: dblink.c
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.c,v
retrieving revision 1.77
diff -c -r1.77 dblink.c
*** dblink.c    1 Jan 2009 17:23:31 -0000    1.77
--- dblink.c    25 May 2009 22:57:22 -0000
***************
*** 77,83 ****
  /*
   * Internal declarations
   */
! static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get);
  static remoteConn *getConnectionByName(const char *name);
  static HTAB *createConnHash(void);
  static void createNewConnection(const char *name, remoteConn * rconn);
--- 78,84 ----
  /*
   * Internal declarations
   */
! static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
  static remoteConn *getConnectionByName(const char *name);
  static HTAB *createConnHash(void);
  static void createNewConnection(const char *name, remoteConn * rconn);
***************
*** 689,713 ****
  Datum
  dblink_record(PG_FUNCTION_ARGS)
  {
!     return dblink_record_internal(fcinfo, false, false);
  }

  PG_FUNCTION_INFO_V1(dblink_send_query);
  Datum
  dblink_send_query(PG_FUNCTION_ARGS)
  {
!     return dblink_record_internal(fcinfo, true, false);
  }

  PG_FUNCTION_INFO_V1(dblink_get_result);
  Datum
  dblink_get_result(PG_FUNCTION_ARGS)
  {
!     return dblink_record_internal(fcinfo, true, true);
  }

  static Datum
! dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get)
  {
      FuncCallContext *funcctx;
      TupleDesc    tupdesc = NULL;
--- 707,753 ----
  Datum
  dblink_record(PG_FUNCTION_ARGS)
  {
!     return dblink_record_internal(fcinfo, false);
  }

  PG_FUNCTION_INFO_V1(dblink_send_query);
  Datum
  dblink_send_query(PG_FUNCTION_ARGS)
  {
!     PGconn       *conn = NULL;
!     char       *connstr = NULL;
!     char       *sql = NULL;
!     remoteConn *rconn = NULL;
!     char       *msg;
!     bool        freeconn = false;
!     int            retval;
!
!     if (PG_NARGS() == 2)
!     {
!         DBLINK_GET_CONN;
!         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
!     }
!     else
!         /* shouldn't happen */
!         elog(ERROR, "wrong number of arguments");
!
!     /* async query send */
!     retval = PQsendQuery(conn, sql);
!     if (retval != 1)
!         elog(NOTICE, "%s", PQerrorMessage(conn));
!
!     PG_RETURN_INT32(retval);
  }

  PG_FUNCTION_INFO_V1(dblink_get_result);
  Datum
  dblink_get_result(PG_FUNCTION_ARGS)
  {
!     return dblink_record_internal(fcinfo, true);
  }

  static Datum
! dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
  {
      FuncCallContext *funcctx;
      TupleDesc    tupdesc = NULL;
***************
*** 775,788 ****
                  /* shouldn't happen */
                  elog(ERROR, "wrong number of arguments");
          }
!         else if (is_async && do_get)
          {
              /* get async result */
              if (PG_NARGS() == 2)
              {
                  /* text,bool */
                  DBLINK_GET_CONN;
!                 fail = PG_GETARG_BOOL(2);
              }
              else if (PG_NARGS() == 1)
              {
--- 815,828 ----
                  /* shouldn't happen */
                  elog(ERROR, "wrong number of arguments");
          }
!         else /* is_async */
          {
              /* get async result */
              if (PG_NARGS() == 2)
              {
                  /* text,bool */
                  DBLINK_GET_CONN;
!                 fail = PG_GETARG_BOOL(1);
              }
              else if (PG_NARGS() == 1)
              {
***************
*** 793,929 ****
                  /* shouldn't happen */
                  elog(ERROR, "wrong number of arguments");
          }
-         else
-         {
-             /* send async query */
-             if (PG_NARGS() == 2)
-             {
-                 DBLINK_GET_CONN;
-                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
-             }
-             else
-                 /* shouldn't happen */
-                 elog(ERROR, "wrong number of arguments");
-         }

          if (!conn)
              DBLINK_CONN_NOT_AVAIL;

!         if (!is_async || (is_async && do_get))
          {
!             /* synchronous query, or async result retrieval */
!             if (!is_async)
!                 res = PQexec(conn, sql);
!             else
              {
-                 res = PQgetResult(conn);
-                 /* NULL means we're all done with the async results */
-                 if (!res)
-                 {
-                     MemoryContextSwitchTo(oldcontext);
-                     SRF_RETURN_DONE(funcctx);
-                 }
-             }
-
-             if (!res ||
-                 (PQresultStatus(res) != PGRES_COMMAND_OK &&
-                  PQresultStatus(res) != PGRES_TUPLES_OK))
-             {
-                 dblink_res_error(conname, res, "could not execute query", fail);
-                 if (freeconn)
-                     PQfinish(conn);
                  MemoryContextSwitchTo(oldcontext);
                  SRF_RETURN_DONE(funcctx);
              }

!             if (PQresultStatus(res) == PGRES_COMMAND_OK)
!             {
!                 is_sql_cmd = true;
!
!                 /* need a tuple descriptor representing one TEXT column */
!                 tupdesc = CreateTemplateTupleDesc(1, false);
!                 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
!                                    TEXTOID, -1, 0);
!
!                 /*
!                  * and save a copy of the command status string to return as
!                  * our result tuple
!                  */
!                 sql_cmd_status = PQcmdStatus(res);
!                 funcctx->max_calls = 1;
!             }
!             else
!                 funcctx->max_calls = PQntuples(res);
!
!             /* got results, keep track of them */
!             funcctx->user_fctx = res;
!
!             /* if needed, close the connection to the database and cleanup */
              if (freeconn)
                  PQfinish(conn);

!             if (!is_sql_cmd)
!             {
!                 /* get a tuple descriptor for our result type */
!                 switch (get_call_result_type(fcinfo, NULL, &tupdesc))
!                 {
!                     case TYPEFUNC_COMPOSITE:
!                         /* success */
!                         break;
!                     case TYPEFUNC_RECORD:
!                         /* failed to determine actual type of RECORD */
!                         ereport(ERROR,
!                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
!                         errmsg("function returning record called in context "
!                                "that cannot accept type record")));
!                         break;
!                     default:
!                         /* result type isn't composite */
!                         elog(ERROR, "return type must be a row type");
!                         break;
!                 }

!                 /* make sure we have a persistent copy of the tupdesc */
!                 tupdesc = CreateTupleDescCopy(tupdesc);
!             }

              /*
!              * check result and tuple descriptor have the same number of
!              * columns
               */
!             if (PQnfields(res) != tupdesc->natts)
!                 ereport(ERROR,
!                         (errcode(ERRCODE_DATATYPE_MISMATCH),
!                          errmsg("remote query result rowtype does not match "
!                                 "the specified FROM clause rowtype")));

!             /* fast track when no results */
!             if (funcctx->max_calls < 1)
              {
!                 if (res)
!                     PQclear(res);
!                 MemoryContextSwitchTo(oldcontext);
!                 SRF_RETURN_DONE(funcctx);
              }

!             /* store needed metadata for subsequent calls */
!             attinmeta = TupleDescGetAttInMetadata(tupdesc);
!             funcctx->attinmeta = attinmeta;
!
!             MemoryContextSwitchTo(oldcontext);
          }
!         else
          {
!             /* async query send */
              MemoryContextSwitchTo(oldcontext);
!             PG_RETURN_INT32(PQsendQuery(conn, sql));
          }
-     }

!     if (is_async && !do_get)
!     {
!         /* async query send -- should not happen */
!         elog(ERROR, "async query send called more than once");

      }

--- 833,942 ----
                  /* shouldn't happen */
                  elog(ERROR, "wrong number of arguments");
          }

          if (!conn)
              DBLINK_CONN_NOT_AVAIL;

!         /* synchronous query, or async result retrieval */
!         if (!is_async)
!             res = PQexec(conn, sql);
!         else
          {
!             res = PQgetResult(conn);
!             /* NULL means we're all done with the async results */
!             if (!res)
              {
                  MemoryContextSwitchTo(oldcontext);
                  SRF_RETURN_DONE(funcctx);
              }
+         }

!         if (!res ||
!             (PQresultStatus(res) != PGRES_COMMAND_OK &&
!              PQresultStatus(res) != PGRES_TUPLES_OK))
!         {
!             dblink_res_error(conname, res, "could not execute query", fail);
              if (freeconn)
                  PQfinish(conn);
+             MemoryContextSwitchTo(oldcontext);
+             SRF_RETURN_DONE(funcctx);
+         }

!         if (PQresultStatus(res) == PGRES_COMMAND_OK)
!         {
!             is_sql_cmd = true;

!             /* need a tuple descriptor representing one TEXT column */
!             tupdesc = CreateTemplateTupleDesc(1, false);
!             TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
!                                TEXTOID, -1, 0);

              /*
!              * and save a copy of the command status string to return as
!              * our result tuple
               */
!             sql_cmd_status = PQcmdStatus(res);
!             funcctx->max_calls = 1;
!         }
!         else
!             funcctx->max_calls = PQntuples(res);

!         /* got results, keep track of them */
!         funcctx->user_fctx = res;
!
!         /* if needed, close the connection to the database and cleanup */
!         if (freeconn)
!             PQfinish(conn);
!
!         if (!is_sql_cmd)
!         {
!             /* get a tuple descriptor for our result type */
!             switch (get_call_result_type(fcinfo, NULL, &tupdesc))
              {
!                 case TYPEFUNC_COMPOSITE:
!                     /* success */
!                     break;
!                 case TYPEFUNC_RECORD:
!                     /* failed to determine actual type of RECORD */
!                     ereport(ERROR,
!                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
!                     errmsg("function returning record called in context "
!                            "that cannot accept type record")));
!                     break;
!                 default:
!                     /* result type isn't composite */
!                     elog(ERROR, "return type must be a row type");
!                     break;
              }

!             /* make sure we have a persistent copy of the tupdesc */
!             tupdesc = CreateTupleDescCopy(tupdesc);
          }
!
!         /*
!          * check result and tuple descriptor have the same number of
!          * columns
!          */
!         if (PQnfields(res) != tupdesc->natts)
!             ereport(ERROR,
!                     (errcode(ERRCODE_DATATYPE_MISMATCH),
!                      errmsg("remote query result rowtype does not match "
!                             "the specified FROM clause rowtype")));
!
!         /* fast track when no results */
!         if (funcctx->max_calls < 1)
          {
!             if (res)
!                 PQclear(res);
              MemoryContextSwitchTo(oldcontext);
!             SRF_RETURN_DONE(funcctx);
          }

!         /* store needed metadata for subsequent calls */
!         attinmeta = TupleDescGetAttInMetadata(tupdesc);
!         funcctx->attinmeta = attinmeta;
!
!         MemoryContextSwitchTo(oldcontext);

      }


Re: dblink patches for comment

From
Joe Conway
Date:
Tom Lane wrote:
> It's hard to review it without any docs that say what it's supposed to do.
> (And you'd need to patch the docs anyway, eh?)

Here's a much simpler SQL/MED support patch for dblink.

This enforces security in the same manner for FOREIGN SERVER connections
as that worked out over time for other dblink connections. Essentially,
the FOREIGN SERVER and associated user MAPPING provides the needed info
for the libpq connection, but otherwise behavior is the same.

I've also attached a doc patch.

Comments?

Joe
Index: dblink.c
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.c,v
retrieving revision 1.77
diff -c -r1.77 dblink.c
*** dblink.c    1 Jan 2009 17:23:31 -0000    1.77
--- dblink.c    31 May 2009 21:21:16 -0000
***************
*** 46,51 ****
--- 46,52 ----
  #include "catalog/pg_type.h"
  #include "executor/executor.h"
  #include "executor/spi.h"
+ #include "foreign/foreign.h"
  #include "lib/stringinfo.h"
  #include "miscadmin.h"
  #include "nodes/execnodes.h"
***************
*** 96,101 ****
--- 97,103 ----
  static void dblink_connstr_check(const char *connstr);
  static void dblink_security_check(PGconn *conn, remoteConn *rconn);
  static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail);
+ static char *get_connect_string(const char *servername);

  /* Global */
  static remoteConn *pconn = NULL;
***************
*** 165,171 ****
              } \
              else \
              { \
!                 connstr = conname_or_str; \
                  dblink_connstr_check(connstr); \
                  conn = PQconnectdb(connstr); \
                  if (PQstatus(conn) == CONNECTION_BAD) \
--- 167,177 ----
              } \
              else \
              { \
!                 connstr = get_connect_string(conname_or_str); \
!                 if (connstr == NULL) \
!                 { \
!                     connstr = conname_or_str; \
!                 } \
                  dblink_connstr_check(connstr); \
                  conn = PQconnectdb(connstr); \
                  if (PQstatus(conn) == CONNECTION_BAD) \
***************
*** 210,215 ****
--- 216,222 ----
  Datum
  dblink_connect(PG_FUNCTION_ARGS)
  {
+     char       *conname_or_str = NULL;
      char       *connstr = NULL;
      char       *connname = NULL;
      char       *msg;
***************
*** 220,235 ****

      if (PG_NARGS() == 2)
      {
!         connstr = text_to_cstring(PG_GETARG_TEXT_PP(1));
          connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      }
      else if (PG_NARGS() == 1)
!         connstr = text_to_cstring(PG_GETARG_TEXT_PP(0));

      if (connname)
          rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
                                                    sizeof(remoteConn));

      /* check password in connection string if not superuser */
      dblink_connstr_check(connstr);
      conn = PQconnectdb(connstr);
--- 227,247 ----

      if (PG_NARGS() == 2)
      {
!         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
          connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      }
      else if (PG_NARGS() == 1)
!         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));

      if (connname)
          rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
                                                    sizeof(remoteConn));

+     /* first check for valid foreign data server */
+     connstr = get_connect_string(conname_or_str);
+     if (connstr == NULL)
+         connstr = conname_or_str;
+
      /* check password in connection string if not superuser */
      dblink_connstr_check(connstr);
      conn = PQconnectdb(connstr);
***************
*** 2358,2360 ****
--- 2370,2430 ----
           errcontext("Error occurred on dblink connection named \"%s\": %s.",
                      dblink_context_conname, dblink_context_msg)));
  }
+
+ /*
+  * Obtain connection string for a foreign server
+  */
+ static char *
+ get_connect_string(const char *servername)
+ {
+     ForeignServer       *foreign_server;
+     UserMapping           *user_mapping;
+     ListCell           *cell;
+     StringInfo            buf = makeStringInfo();
+     ForeignDataWrapper *fdw;
+     AclResult            aclresult;
+
+     /* first gather the server connstr options */
+     foreign_server = GetForeignServerByName(servername, true);
+
+     if (foreign_server)
+     {
+         Oid        serverid = foreign_server->serverid;
+         Oid        fdwid = foreign_server->fdwid;
+         Oid        userid = GetUserId();
+
+         user_mapping = GetUserMapping(userid, serverid);
+         fdw    = GetForeignDataWrapper(fdwid);
+
+         /* Check permissions, user must have usage on the server. */
+         aclresult = pg_foreign_server_aclcheck(serverid, userid, ACL_USAGE);
+         if (aclresult != ACLCHECK_OK)
+             aclcheck_error(aclresult, ACL_KIND_FOREIGN_SERVER, foreign_server->servername);
+
+         foreach (cell, fdw->options)
+         {
+             DefElem           *def = lfirst(cell);
+
+             appendStringInfo(buf, "%s='%s' ", def->defname, strVal(def->arg));
+         }
+
+         foreach (cell, foreign_server->options)
+         {
+             DefElem           *def = lfirst(cell);
+
+             appendStringInfo(buf, "%s='%s' ", def->defname, strVal(def->arg));
+         }
+
+         foreach (cell, user_mapping->options)
+         {
+
+             DefElem           *def = lfirst(cell);
+
+             appendStringInfo(buf, "%s='%s' ", def->defname, strVal(def->arg));
+         }
+
+         return pstrdup(buf->data);
+     }
+     else
+         return NULL;
+ }
Index: expected/dblink.out
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/expected/dblink.out,v
retrieving revision 1.24
diff -c -r1.24 dblink.out
*** expected/dblink.out    3 Jul 2008 03:56:57 -0000    1.24
--- expected/dblink.out    1 Jun 2009 00:06:08 -0000
***************
*** 784,786 ****
--- 784,829 ----
   OK
  (1 row)

+ -- test foreign data wrapper functionality
+ CREATE USER dblink_regression_test;
+ CREATE FOREIGN DATA WRAPPER postgresql;
+ CREATE SERVER fdtest FOREIGN DATA WRAPPER postgresql OPTIONS (dbname 'contrib_regression');
+ CREATE USER MAPPING FOR public SERVER fdtest;
+ GRANT USAGE ON FOREIGN SERVER fdtest TO dblink_regression_test;
+ GRANT EXECUTE ON FUNCTION dblink_connect_u(text, text) TO dblink_regression_test;
+ \set ORIGINAL_USER :USER
+ \c - dblink_regression_test
+ -- should fail
+ SELECT dblink_connect('myconn', 'fdtest');
+ ERROR:  password is required
+ DETAIL:  Non-superusers must provide a password in the connection string.
+ -- should succeed
+ SELECT dblink_connect_u('myconn', 'fdtest');
+  dblink_connect_u
+ ------------------
+  OK
+ (1 row)
+
+ SELECT * FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]);
+  a  | b |       c
+ ----+---+---------------
+   0 | a | {a0,b0,c0}
+   1 | b | {a1,b1,c1}
+   2 | c | {a2,b2,c2}
+   3 | d | {a3,b3,c3}
+   4 | e | {a4,b4,c4}
+   5 | f | {a5,b5,c5}
+   6 | g | {a6,b6,c6}
+   7 | h | {a7,b7,c7}
+   8 | i | {a8,b8,c8}
+   9 | j | {a9,b9,c9}
+  10 | k | {a10,b10,c10}
+ (11 rows)
+
+ \c - :ORIGINAL_USER
+ REVOKE USAGE ON FOREIGN SERVER fdtest FROM dblink_regression_test;
+ REVOKE EXECUTE ON FUNCTION dblink_connect_u(text, text) FROM dblink_regression_test;
+ DROP USER dblink_regression_test;
+ DROP USER MAPPING FOR public SERVER fdtest;
+ DROP SERVER fdtest;
+ DROP FOREIGN DATA WRAPPER postgresql;
Index: sql/dblink.sql
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/sql/dblink.sql,v
retrieving revision 1.20
diff -c -r1.20 dblink.sql
*** sql/dblink.sql    6 Apr 2008 16:54:48 -0000    1.20
--- sql/dblink.sql    1 Jun 2009 00:05:29 -0000
***************
*** 364,366 ****
--- 364,391 ----
  SELECT dblink_cancel_query('dtest1');
  SELECT dblink_error_message('dtest1');
  SELECT dblink_disconnect('dtest1');
+
+ -- test foreign data wrapper functionality
+ CREATE USER dblink_regression_test;
+
+ CREATE FOREIGN DATA WRAPPER postgresql;
+ CREATE SERVER fdtest FOREIGN DATA WRAPPER postgresql OPTIONS (dbname 'contrib_regression');
+ CREATE USER MAPPING FOR public SERVER fdtest;
+ GRANT USAGE ON FOREIGN SERVER fdtest TO dblink_regression_test;
+ GRANT EXECUTE ON FUNCTION dblink_connect_u(text, text) TO dblink_regression_test;
+
+ \set ORIGINAL_USER :USER
+ \c - dblink_regression_test
+ -- should fail
+ SELECT dblink_connect('myconn', 'fdtest');
+ -- should succeed
+ SELECT dblink_connect_u('myconn', 'fdtest');
+ SELECT * FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]);
+
+ \c - :ORIGINAL_USER
+ REVOKE USAGE ON FOREIGN SERVER fdtest FROM dblink_regression_test;
+ REVOKE EXECUTE ON FUNCTION dblink_connect_u(text, text) FROM dblink_regression_test;
+ DROP USER dblink_regression_test;
+ DROP USER MAPPING FOR public SERVER fdtest;
+ DROP SERVER fdtest;
+ DROP FOREIGN DATA WRAPPER postgresql;
Index: dblink.sgml
===================================================================
RCS file: /opt/src/cvs/pgsql/doc/src/sgml/dblink.sgml,v
retrieving revision 1.6
diff -c -r1.6 dblink.sgml
*** dblink.sgml    12 Nov 2008 15:52:44 -0000    1.6
--- dblink.sgml    1 Jun 2009 00:12:22 -0000
***************
*** 42,47 ****
--- 42,59 ----
      only one unnamed connection is permitted at a time.  The connection
      will persist until closed or until the database session is ended.
     </para>
+
+    <para>
+     The connection string may also be the name of an existing foreign
+     server that utilizes the postgresql_fdw foreign data wrapper library.
+     See the example below, as well as the following:
+     <simplelist type="inline">
+      <member><xref linkend="sql-createforeigndatawrapper" endterm="sql-createforeigndatawrapper-title"></member>
+      <member><xref linkend="sql-createserver" endterm="sql-createserver-title"></member>
+      <member><xref linkend="sql-createusermapping" endterm="sql-createusermapping-title"></member>
+     </simplelist>
+    </para>
+
    </refsect1>

    <refsect1>
***************
*** 113,118 ****
--- 125,172 ----
   ----------------
    OK
   (1 row)
+
+  -- FOREIGN DATA WRAPPER functionality
+  -- Note: local connection must require authentication for this to work properly
+  CREATE USER dblink_regression_test WITH PASSWORD 'secret';
+  CREATE FOREIGN DATA WRAPPER postgresql;
+  CREATE SERVER fdtest FOREIGN DATA WRAPPER postgresql OPTIONS (hostaddr '127.0.0.1', dbname 'contrib_regression');
+
+  CREATE USER MAPPING FOR dblink_regression_test SERVER fdtest OPTIONS (user 'dblink_regression_test', password 'secret');
+  GRANT USAGE ON FOREIGN SERVER fdtest TO dblink_regression_test;
+  GRANT SELECT ON TABLE foo TO dblink_regression_test;
+
+  \set ORIGINAL_USER :USER
+  \c - dblink_regression_test
+  SELECT dblink_connect('myconn', 'fdtest');
+   dblink_connect
+  ----------------
+   OK
+  (1 row)
+
+  SELECT * FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]);
+   a  | b |       c
+  ----+---+---------------
+    0 | a | {a0,b0,c0}
+    1 | b | {a1,b1,c1}
+    2 | c | {a2,b2,c2}
+    3 | d | {a3,b3,c3}
+    4 | e | {a4,b4,c4}
+    5 | f | {a5,b5,c5}
+    6 | g | {a6,b6,c6}
+    7 | h | {a7,b7,c7}
+    8 | i | {a8,b8,c8}
+    9 | j | {a9,b9,c9}
+   10 | k | {a10,b10,c10}
+  (11 rows)
+
+  \c - :ORIGINAL_USER
+  REVOKE USAGE ON FOREIGN SERVER fdtest FROM dblink_regression_test;
+  REVOKE SELECT ON TABLE foo FROM  dblink_regression_test;
+  DROP USER MAPPING FOR dblink_regression_test SERVER fdtest;
+  DROP USER dblink_regression_test;
+  DROP SERVER fdtest;
+  DROP FOREIGN DATA WRAPPER postgresql;
     </programlisting>
    </refsect1>
   </refentry>

Re: dblink patches for comment

From
Tom Lane
Date:
Joe Conway <mail@joeconway.com> writes:
> Probably better if I break this up in logical chunks too. This patch 
> only addresses the refactoring you requested here:
> http://archives.postgresql.org/message-id/28719.1230996378@sss.pgh.pa.us

This looks sane to me in a quick once-over, though I've not tested it.

A small suggestion for future patches: don't bother to reindent code
chunks that aren't changing --- it just complicates the diff with a
lot of uninteresting whitespace changes.  You can either do that after
review, or leave it to be done by pgindent.  (Speaking of which, we
need to schedule that soon...)
        regards, tom lane


Re: dblink patches for comment

From
Tom Lane
Date:
Joe Conway <mail@joeconway.com> writes:
> Here's a much simpler SQL/MED support patch for dblink.

> This enforces security in the same manner for FOREIGN SERVER connections 
> as that worked out over time for other dblink connections. Essentially, 
> the FOREIGN SERVER and associated user MAPPING provides the needed info 
> for the libpq connection, but otherwise behavior is the same.

> I've also attached a doc patch.

The docs patch looks okay, except this comment is a bit hazy:

> +  -- Note: local connection must require authentication for this to work properly

I think what it means is

> +  -- Note: local connection must require password authentication for this to work properly

If not, please clarify some other way.  It might also be good to be a
bit more clear about what "fail to work properly" might entail.

As far as the code goes, hopefully Peter will take a look since he's
spent more time on the SQL/MED code than I have.  The only thing I can
see that looks bogus is that get_connect_string() is failing to handle
any quoting/escaping that might be needed for the values to be inserted
into the connection string.  I don't recall offhand what rules libpq
has for that, but I hope it at least implements doubled single quotes...
        regards, tom lane


Re: dblink patches for comment

From
Joe Conway
Date:
Tom Lane wrote:
> Joe Conway <mail@joeconway.com> writes:
>> Probably better if I break this up in logical chunks too. This patch
>> only addresses the refactoring you requested here:
>> http://archives.postgresql.org/message-id/28719.1230996378@sss.pgh.pa.us
>
> This looks sane to me in a quick once-over, though I've not tested it.

Thanks -- committed.

> A small suggestion for future patches: don't bother to reindent code
> chunks that aren't changing --- it just complicates the diff with a
> lot of uninteresting whitespace changes.  You can either do that after
> review, or leave it to be done by pgindent.  (Speaking of which, we
> need to schedule that soon...)

Sorry. "cvs diff -cb" seems to help (see attached). It is half the size
and much more readable.

Joe
Index: dblink.c
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.c,v
retrieving revision 1.77
diff -c -b -r1.77 dblink.c
*** dblink.c    1 Jan 2009 17:23:31 -0000    1.77
--- dblink.c    2 Jun 2009 03:04:04 -0000
***************
*** 77,83 ****
  /*
   * Internal declarations
   */
! static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get);
  static remoteConn *getConnectionByName(const char *name);
  static HTAB *createConnHash(void);
  static void createNewConnection(const char *name, remoteConn * rconn);
--- 77,83 ----
  /*
   * Internal declarations
   */
! static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
  static remoteConn *getConnectionByName(const char *name);
  static HTAB *createConnHash(void);
  static void createNewConnection(const char *name, remoteConn * rconn);
***************
*** 689,713 ****
  Datum
  dblink_record(PG_FUNCTION_ARGS)
  {
!     return dblink_record_internal(fcinfo, false, false);
  }

  PG_FUNCTION_INFO_V1(dblink_send_query);
  Datum
  dblink_send_query(PG_FUNCTION_ARGS)
  {
!     return dblink_record_internal(fcinfo, true, false);
  }

  PG_FUNCTION_INFO_V1(dblink_get_result);
  Datum
  dblink_get_result(PG_FUNCTION_ARGS)
  {
!     return dblink_record_internal(fcinfo, true, true);
  }

  static Datum
! dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get)
  {
      FuncCallContext *funcctx;
      TupleDesc    tupdesc = NULL;
--- 689,735 ----
  Datum
  dblink_record(PG_FUNCTION_ARGS)
  {
!     return dblink_record_internal(fcinfo, false);
  }

  PG_FUNCTION_INFO_V1(dblink_send_query);
  Datum
  dblink_send_query(PG_FUNCTION_ARGS)
  {
!     PGconn       *conn = NULL;
!     char       *connstr = NULL;
!     char       *sql = NULL;
!     remoteConn *rconn = NULL;
!     char       *msg;
!     bool        freeconn = false;
!     int            retval;
!
!     if (PG_NARGS() == 2)
!     {
!         DBLINK_GET_CONN;
!         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
!     }
!     else
!         /* shouldn't happen */
!         elog(ERROR, "wrong number of arguments");
!
!     /* async query send */
!     retval = PQsendQuery(conn, sql);
!     if (retval != 1)
!         elog(NOTICE, "%s", PQerrorMessage(conn));
!
!     PG_RETURN_INT32(retval);
  }

  PG_FUNCTION_INFO_V1(dblink_get_result);
  Datum
  dblink_get_result(PG_FUNCTION_ARGS)
  {
!     return dblink_record_internal(fcinfo, true);
  }

  static Datum
! dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
  {
      FuncCallContext *funcctx;
      TupleDesc    tupdesc = NULL;
***************
*** 775,788 ****
                  /* shouldn't happen */
                  elog(ERROR, "wrong number of arguments");
          }
!         else if (is_async && do_get)
          {
              /* get async result */
              if (PG_NARGS() == 2)
              {
                  /* text,bool */
                  DBLINK_GET_CONN;
!                 fail = PG_GETARG_BOOL(2);
              }
              else if (PG_NARGS() == 1)
              {
--- 797,810 ----
                  /* shouldn't happen */
                  elog(ERROR, "wrong number of arguments");
          }
!         else /* is_async */
          {
              /* get async result */
              if (PG_NARGS() == 2)
              {
                  /* text,bool */
                  DBLINK_GET_CONN;
!                 fail = PG_GETARG_BOOL(1);
              }
              else if (PG_NARGS() == 1)
              {
***************
*** 793,816 ****
                  /* shouldn't happen */
                  elog(ERROR, "wrong number of arguments");
          }
-         else
-         {
-             /* send async query */
-             if (PG_NARGS() == 2)
-             {
-                 DBLINK_GET_CONN;
-                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
-             }
-             else
-                 /* shouldn't happen */
-                 elog(ERROR, "wrong number of arguments");
-         }

          if (!conn)
              DBLINK_CONN_NOT_AVAIL;

-         if (!is_async || (is_async && do_get))
-         {
              /* synchronous query, or async result retrieval */
              if (!is_async)
                  res = PQexec(conn, sql);
--- 815,824 ----
***************
*** 911,929 ****
              funcctx->attinmeta = attinmeta;

              MemoryContextSwitchTo(oldcontext);
-         }
-         else
-         {
-             /* async query send */
-             MemoryContextSwitchTo(oldcontext);
-             PG_RETURN_INT32(PQsendQuery(conn, sql));
-         }
-     }
-
-     if (is_async && !do_get)
-     {
-         /* async query send -- should not happen */
-         elog(ERROR, "async query send called more than once");

      }

--- 919,924 ----

Re: dblink patches for comment

From
Joe Conway
Date:
Tom Lane wrote:
> The docs patch looks okay, except this comment is a bit hazy:
>
>> +  -- Note: local connection must require authentication for this to work properly
>
> I think what it means is
>
>> +  -- Note: local connection must require password authentication for this to work properly
>
> If not, please clarify some other way.  It might also be good to be a
> bit more clear about what "fail to work properly" might entail.

OK, hopefully the attached is more clear.

> As far as the code goes, hopefully Peter will take a look since he's
> spent more time on the SQL/MED code than I have.  The only thing I can
> see that looks bogus is that get_connect_string() is failing to handle
> any quoting/escaping that might be needed for the values to be inserted
> into the connection string.  I don't recall offhand what rules libpq
> has for that, but I hope it at least implements doubled single quotes...

Added quote_literal_cstr() around the connection string params. Also
found I needed to restrict the servername string length to NAMEDATALEN
in order to avoid an assert if a full connection string is passed to
dblink_connect().

Other comments?

Thanks,

Joe

Index: dblink.c
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.c,v
retrieving revision 1.78
diff -c -r1.78 dblink.c
*** dblink.c    2 Jun 2009 03:21:56 -0000    1.78
--- dblink.c    2 Jun 2009 22:55:42 -0000
***************
*** 46,51 ****
--- 46,52 ----
  #include "catalog/pg_type.h"
  #include "executor/executor.h"
  #include "executor/spi.h"
+ #include "foreign/foreign.h"
  #include "lib/stringinfo.h"
  #include "miscadmin.h"
  #include "nodes/execnodes.h"
***************
*** 96,101 ****
--- 97,103 ----
  static void dblink_connstr_check(const char *connstr);
  static void dblink_security_check(PGconn *conn, remoteConn *rconn);
  static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail);
+ static char *get_connect_string(const char *servername);

  /* Global */
  static remoteConn *pconn = NULL;
***************
*** 165,171 ****
              } \
              else \
              { \
!                 connstr = conname_or_str; \
                  dblink_connstr_check(connstr); \
                  conn = PQconnectdb(connstr); \
                  if (PQstatus(conn) == CONNECTION_BAD) \
--- 167,177 ----
              } \
              else \
              { \
!                 connstr = get_connect_string(conname_or_str); \
!                 if (connstr == NULL) \
!                 { \
!                     connstr = conname_or_str; \
!                 } \
                  dblink_connstr_check(connstr); \
                  conn = PQconnectdb(connstr); \
                  if (PQstatus(conn) == CONNECTION_BAD) \
***************
*** 210,215 ****
--- 216,222 ----
  Datum
  dblink_connect(PG_FUNCTION_ARGS)
  {
+     char       *conname_or_str = NULL;
      char       *connstr = NULL;
      char       *connname = NULL;
      char       *msg;
***************
*** 220,235 ****

      if (PG_NARGS() == 2)
      {
!         connstr = text_to_cstring(PG_GETARG_TEXT_PP(1));
          connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      }
      else if (PG_NARGS() == 1)
!         connstr = text_to_cstring(PG_GETARG_TEXT_PP(0));

      if (connname)
          rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
                                                    sizeof(remoteConn));

      /* check password in connection string if not superuser */
      dblink_connstr_check(connstr);
      conn = PQconnectdb(connstr);
--- 227,247 ----

      if (PG_NARGS() == 2)
      {
!         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
          connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      }
      else if (PG_NARGS() == 1)
!         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));

      if (connname)
          rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
                                                    sizeof(remoteConn));

+     /* first check for valid foreign data server */
+     connstr = get_connect_string(conname_or_str);
+     if (connstr == NULL)
+         connstr = conname_or_str;
+
      /* check password in connection string if not superuser */
      dblink_connstr_check(connstr);
      conn = PQconnectdb(connstr);
***************
*** 2353,2355 ****
--- 2365,2426 ----
           errcontext("Error occurred on dblink connection named \"%s\": %s.",
                      dblink_context_conname, dblink_context_msg)));
  }
+
+ /*
+  * Obtain connection string for a foreign server
+  */
+ static char *
+ get_connect_string(const char *servername)
+ {
+     ForeignServer       *foreign_server = NULL;
+     UserMapping           *user_mapping;
+     ListCell           *cell;
+     StringInfo            buf = makeStringInfo();
+     ForeignDataWrapper *fdw;
+     AclResult            aclresult;
+
+     /* first gather the server connstr options */
+     if (strlen(servername) < NAMEDATALEN)
+         foreign_server = GetForeignServerByName(servername, true);
+
+     if (foreign_server)
+     {
+         Oid        serverid = foreign_server->serverid;
+         Oid        fdwid = foreign_server->fdwid;
+         Oid        userid = GetUserId();
+
+         user_mapping = GetUserMapping(userid, serverid);
+         fdw    = GetForeignDataWrapper(fdwid);
+
+         /* Check permissions, user must have usage on the server. */
+         aclresult = pg_foreign_server_aclcheck(serverid, userid, ACL_USAGE);
+         if (aclresult != ACLCHECK_OK)
+             aclcheck_error(aclresult, ACL_KIND_FOREIGN_SERVER, foreign_server->servername);
+
+         foreach (cell, fdw->options)
+         {
+             DefElem           *def = lfirst(cell);
+
+             appendStringInfo(buf, "%s=%s ", def->defname, quote_literal_cstr(strVal(def->arg)));
+         }
+
+         foreach (cell, foreign_server->options)
+         {
+             DefElem           *def = lfirst(cell);
+
+             appendStringInfo(buf, "%s=%s ", def->defname, quote_literal_cstr(strVal(def->arg)));
+         }
+
+         foreach (cell, user_mapping->options)
+         {
+
+             DefElem           *def = lfirst(cell);
+
+             appendStringInfo(buf, "%s=%s ", def->defname, quote_literal_cstr(strVal(def->arg)));
+         }
+
+         return pstrdup(buf->data);
+     }
+     else
+         return NULL;
+ }
Index: expected/dblink.out
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/expected/dblink.out,v
retrieving revision 1.24
diff -c -r1.24 dblink.out
*** expected/dblink.out    3 Jul 2008 03:56:57 -0000    1.24
--- expected/dblink.out    2 Jun 2009 16:54:37 -0000
***************
*** 784,786 ****
--- 784,829 ----
   OK
  (1 row)

+ -- test foreign data wrapper functionality
+ CREATE USER dblink_regression_test;
+ CREATE FOREIGN DATA WRAPPER postgresql;
+ CREATE SERVER fdtest FOREIGN DATA WRAPPER postgresql OPTIONS (dbname 'contrib_regression');
+ CREATE USER MAPPING FOR public SERVER fdtest;
+ GRANT USAGE ON FOREIGN SERVER fdtest TO dblink_regression_test;
+ GRANT EXECUTE ON FUNCTION dblink_connect_u(text, text) TO dblink_regression_test;
+ \set ORIGINAL_USER :USER
+ \c - dblink_regression_test
+ -- should fail
+ SELECT dblink_connect('myconn', 'fdtest');
+ ERROR:  password is required
+ DETAIL:  Non-superusers must provide a password in the connection string.
+ -- should succeed
+ SELECT dblink_connect_u('myconn', 'fdtest');
+  dblink_connect_u
+ ------------------
+  OK
+ (1 row)
+
+ SELECT * FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]);
+  a  | b |       c
+ ----+---+---------------
+   0 | a | {a0,b0,c0}
+   1 | b | {a1,b1,c1}
+   2 | c | {a2,b2,c2}
+   3 | d | {a3,b3,c3}
+   4 | e | {a4,b4,c4}
+   5 | f | {a5,b5,c5}
+   6 | g | {a6,b6,c6}
+   7 | h | {a7,b7,c7}
+   8 | i | {a8,b8,c8}
+   9 | j | {a9,b9,c9}
+  10 | k | {a10,b10,c10}
+ (11 rows)
+
+ \c - :ORIGINAL_USER
+ REVOKE USAGE ON FOREIGN SERVER fdtest FROM dblink_regression_test;
+ REVOKE EXECUTE ON FUNCTION dblink_connect_u(text, text) FROM dblink_regression_test;
+ DROP USER dblink_regression_test;
+ DROP USER MAPPING FOR public SERVER fdtest;
+ DROP SERVER fdtest;
+ DROP FOREIGN DATA WRAPPER postgresql;
Index: sql/dblink.sql
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/sql/dblink.sql,v
retrieving revision 1.20
diff -c -r1.20 dblink.sql
*** sql/dblink.sql    6 Apr 2008 16:54:48 -0000    1.20
--- sql/dblink.sql    2 Jun 2009 16:54:37 -0000
***************
*** 364,366 ****
--- 364,391 ----
  SELECT dblink_cancel_query('dtest1');
  SELECT dblink_error_message('dtest1');
  SELECT dblink_disconnect('dtest1');
+
+ -- test foreign data wrapper functionality
+ CREATE USER dblink_regression_test;
+
+ CREATE FOREIGN DATA WRAPPER postgresql;
+ CREATE SERVER fdtest FOREIGN DATA WRAPPER postgresql OPTIONS (dbname 'contrib_regression');
+ CREATE USER MAPPING FOR public SERVER fdtest;
+ GRANT USAGE ON FOREIGN SERVER fdtest TO dblink_regression_test;
+ GRANT EXECUTE ON FUNCTION dblink_connect_u(text, text) TO dblink_regression_test;
+
+ \set ORIGINAL_USER :USER
+ \c - dblink_regression_test
+ -- should fail
+ SELECT dblink_connect('myconn', 'fdtest');
+ -- should succeed
+ SELECT dblink_connect_u('myconn', 'fdtest');
+ SELECT * FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]);
+
+ \c - :ORIGINAL_USER
+ REVOKE USAGE ON FOREIGN SERVER fdtest FROM dblink_regression_test;
+ REVOKE EXECUTE ON FUNCTION dblink_connect_u(text, text) FROM dblink_regression_test;
+ DROP USER dblink_regression_test;
+ DROP USER MAPPING FOR public SERVER fdtest;
+ DROP SERVER fdtest;
+ DROP FOREIGN DATA WRAPPER postgresql;
Index: dblink.sgml
===================================================================
RCS file: /opt/src/cvs/pgsql/doc/src/sgml/dblink.sgml,v
retrieving revision 1.6
diff -c -r1.6 dblink.sgml
*** dblink.sgml    12 Nov 2008 15:52:44 -0000    1.6
--- dblink.sgml    2 Jun 2009 22:54:59 -0000
***************
*** 42,47 ****
--- 42,59 ----
      only one unnamed connection is permitted at a time.  The connection
      will persist until closed or until the database session is ended.
     </para>
+
+    <para>
+     The connection string may also be the name of an existing foreign
+     server that utilizes the postgresql_fdw foreign data wrapper library.
+     See the example below, as well as the following:
+     <simplelist type="inline">
+      <member><xref linkend="sql-createforeigndatawrapper" endterm="sql-createforeigndatawrapper-title"></member>
+      <member><xref linkend="sql-createserver" endterm="sql-createserver-title"></member>
+      <member><xref linkend="sql-createusermapping" endterm="sql-createusermapping-title"></member>
+     </simplelist>
+    </para>
+
    </refsect1>

    <refsect1>
***************
*** 113,118 ****
--- 125,177 ----
   ----------------
    OK
   (1 row)
+
+  -- FOREIGN DATA WRAPPER functionality
+  -- Note: local connection must require password authentication for this to work properly
+  --       Otherwise, you will receive the following error from dblink_connect():
+  --       ----------------------------------------------------------------------
+  --       ERROR:  password is required
+  --       DETAIL:  Non-superuser cannot connect if the server does not request a password.
+  --       HINT:  Target server's authentication method must be changed.
+  CREATE USER dblink_regression_test WITH PASSWORD 'secret';
+  CREATE FOREIGN DATA WRAPPER postgresql;
+  CREATE SERVER fdtest FOREIGN DATA WRAPPER postgresql OPTIONS (hostaddr '127.0.0.1', dbname 'contrib_regression');
+
+  CREATE USER MAPPING FOR dblink_regression_test SERVER fdtest OPTIONS (user 'dblink_regression_test', password 'secret');
+  GRANT USAGE ON FOREIGN SERVER fdtest TO dblink_regression_test;
+  GRANT SELECT ON TABLE foo TO dblink_regression_test;
+
+  \set ORIGINAL_USER :USER
+  \c - dblink_regression_test
+  SELECT dblink_connect('myconn', 'fdtest');
+   dblink_connect
+  ----------------
+   OK
+  (1 row)
+
+  SELECT * FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]);
+   a  | b |       c
+  ----+---+---------------
+    0 | a | {a0,b0,c0}
+    1 | b | {a1,b1,c1}
+    2 | c | {a2,b2,c2}
+    3 | d | {a3,b3,c3}
+    4 | e | {a4,b4,c4}
+    5 | f | {a5,b5,c5}
+    6 | g | {a6,b6,c6}
+    7 | h | {a7,b7,c7}
+    8 | i | {a8,b8,c8}
+    9 | j | {a9,b9,c9}
+   10 | k | {a10,b10,c10}
+  (11 rows)
+
+  \c - :ORIGINAL_USER
+  REVOKE USAGE ON FOREIGN SERVER fdtest FROM dblink_regression_test;
+  REVOKE SELECT ON TABLE foo FROM dblink_regression_test;
+  DROP USER MAPPING FOR dblink_regression_test SERVER fdtest;
+  DROP USER dblink_regression_test;
+  DROP SERVER fdtest;
+  DROP FOREIGN DATA WRAPPER postgresql;
     </programlisting>
    </refsect1>
   </refentry>