Thread: Re: quote_ident and schemas (was Re: [HACKERS] connectby with schema)

Re: quote_ident and schemas (was Re: [HACKERS] connectby with schema)

From
Joe Conway
Date:
Tom Lane wrote:
> Since connectby takes a string parameter (correct?) for the table name,
> my advice would be to have it not do quote_ident, but instead expect the
> user to include double quotes in the string value if dealing with
> mixed-case names.  Compare the behavior of nextval() for example:
>
> regression=# select nextval('Foo.Bar');
> ERROR:  Namespace "foo" does not exist
> regression=# select nextval('"Foo"."Bar"');
> ERROR:  Namespace "Foo" does not exist
> regression=# select nextval('"Foo.Bar"');
> ERROR:  Relation "Foo.Bar" does not exist
>

OK. Attached patch removes calls within the function to quote_ident, requiring
the user to appropriately quote their own identifiers. I also tweaked the
regression test to deal with "value" becoming a reserved word.

If it's not too late, I'd like this to get into 7.3, but in any case, please
apply to HEAD.

Thanks,

Joe

p.s. There are similar issues in dblink, but they appear a bit more difficult
to address. I'll attempt to get them resolved this weekend, again in hopes to
get them applied before 7.3 is released.
Index: contrib/tablefunc/tablefunc.c
===================================================================
RCS file: /opt/src/cvs/pgsql-server/contrib/tablefunc/tablefunc.c,v
retrieving revision 1.10
diff -c -r1.10 tablefunc.c
*** contrib/tablefunc/tablefunc.c    3 Oct 2002 17:11:12 -0000    1.10
--- contrib/tablefunc/tablefunc.c    22 Nov 2002 22:04:59 -0000
***************
*** 66,72 ****
                               MemoryContext per_query_ctx,
                               AttInMetadata *attinmeta,
                               Tuplestorestate *tupstore);
- static char *quote_ident_cstr(char *rawstr);

  typedef struct
  {
--- 66,71 ----
***************
*** 776,787 ****

      /* Build initial sql statement */
      appendStringInfo(sql, "SELECT %s, %s FROM %s WHERE %s = '%s' AND %s IS NOT NULL",
!                      quote_ident_cstr(key_fld),
!                      quote_ident_cstr(parent_key_fld),
!                      quote_ident_cstr(relname),
!                      quote_ident_cstr(parent_key_fld),
                       start_with,
!                      quote_ident_cstr(key_fld));

      /* Retrieve the desired rows */
      ret = SPI_exec(sql->data, 0);
--- 775,786 ----

      /* Build initial sql statement */
      appendStringInfo(sql, "SELECT %s, %s FROM %s WHERE %s = '%s' AND %s IS NOT NULL",
!                      key_fld,
!                      parent_key_fld,
!                      relname,
!                      parent_key_fld,
                       start_with,
!                      key_fld);

      /* Retrieve the desired rows */
      ret = SPI_exec(sql->data, 0);
***************
*** 1082,1103 ****
      }

      return tupdesc;
- }
-
- /*
-  * Return a properly quoted identifier.
-  * Uses quote_ident in quote.c
-  */
- static char *
- quote_ident_cstr(char *rawstr)
- {
-     text       *rawstr_text;
-     text       *result_text;
-     char       *result;
-
-     rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr)));
-     result_text = DatumGetTextP(DirectFunctionCall1(quote_ident, PointerGetDatum(rawstr_text)));
-     result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text)));
-
-     return result;
  }
--- 1081,1084 ----
Index: contrib/tablefunc/expected/tablefunc.out
===================================================================
RCS file: /opt/src/cvs/pgsql-server/contrib/tablefunc/expected/tablefunc.out,v
retrieving revision 1.4
diff -c -r1.4 tablefunc.out
*** contrib/tablefunc/expected/tablefunc.out    18 Oct 2002 18:41:21 -0000    1.4
--- contrib/tablefunc/expected/tablefunc.out    22 Nov 2002 23:14:32 -0000
***************
*** 16,122 ****
  --
  -- crosstab()
  --
! CREATE TABLE ct(id int, rowclass text, rowid text, attribute text, value text);
  \copy ct from 'data/ct.data'
! SELECT * FROM crosstab2('SELECT rowid, attribute, value FROM ct where rowclass = ''group1'' and (attribute = ''att2'' or attribute = ''att3'') ORDER BY 1,2;');
   row_name | category_1 | category_2
  ----------+------------+------------
   test1    | val2       | val3
   test2    | val6       | val7
  (2 rows)

! SELECT * FROM crosstab3('SELECT rowid, attribute, value FROM ct where rowclass = ''group1'' and (attribute = ''att2'' or attribute = ''att3'') ORDER BY 1,2;');
   row_name | category_1 | category_2 | category_3
  ----------+------------+------------+------------
   test1    | val2       | val3       |
   test2    | val6       | val7       |
  (2 rows)

! SELECT * FROM crosstab4('SELECT rowid, attribute, value FROM ct where rowclass = ''group1'' and (attribute = ''att2'' or attribute = ''att3'') ORDER BY 1,2;');
   row_name | category_1 | category_2 | category_3 | category_4
  ----------+------------+------------+------------+------------
   test1    | val2       | val3       |            |
   test2    | val6       | val7       |            |
  (2 rows)

! SELECT * FROM crosstab2('SELECT rowid, attribute, value FROM ct where rowclass = ''group1'' ORDER BY 1,2;');
   row_name | category_1 | category_2
  ----------+------------+------------
   test1    | val1       | val2
   test2    | val5       | val6
  (2 rows)

! SELECT * FROM crosstab3('SELECT rowid, attribute, value FROM ct where rowclass = ''group1'' ORDER BY 1,2;');
   row_name | category_1 | category_2 | category_3
  ----------+------------+------------+------------
   test1    | val1       | val2       | val3
   test2    | val5       | val6       | val7
  (2 rows)

! SELECT * FROM crosstab4('SELECT rowid, attribute, value FROM ct where rowclass = ''group1'' ORDER BY 1,2;');
   row_name | category_1 | category_2 | category_3 | category_4
  ----------+------------+------------+------------+------------
   test1    | val1       | val2       | val3       | val4
   test2    | val5       | val6       | val7       | val8
  (2 rows)

! SELECT * FROM crosstab2('SELECT rowid, attribute, value FROM ct where rowclass = ''group2'' and (attribute = ''att1'' or attribute = ''att2'') ORDER BY 1,2;');
   row_name | category_1 | category_2
  ----------+------------+------------
   test3    | val1       | val2
   test4    | val4       | val5
  (2 rows)

! SELECT * FROM crosstab3('SELECT rowid, attribute, value FROM ct where rowclass = ''group2'' and (attribute = ''att1'' or attribute = ''att2'') ORDER BY 1,2;');
   row_name | category_1 | category_2 | category_3
  ----------+------------+------------+------------
   test3    | val1       | val2       |
   test4    | val4       | val5       |
  (2 rows)

! SELECT * FROM crosstab4('SELECT rowid, attribute, value FROM ct where rowclass = ''group2'' and (attribute = ''att1'' or attribute = ''att2'') ORDER BY 1,2;');
   row_name | category_1 | category_2 | category_3 | category_4
  ----------+------------+------------+------------+------------
   test3    | val1       | val2       |            |
   test4    | val4       | val5       |            |
  (2 rows)

! SELECT * FROM crosstab2('SELECT rowid, attribute, value FROM ct where rowclass = ''group2'' ORDER BY 1,2;');
   row_name | category_1 | category_2
  ----------+------------+------------
   test3    | val1       | val2
   test4    | val4       | val5
  (2 rows)

! SELECT * FROM crosstab3('SELECT rowid, attribute, value FROM ct where rowclass = ''group2'' ORDER BY 1,2;');
   row_name | category_1 | category_2 | category_3
  ----------+------------+------------+------------
   test3    | val1       | val2       | val3
   test4    | val4       | val5       | val6
  (2 rows)

! SELECT * FROM crosstab4('SELECT rowid, attribute, value FROM ct where rowclass = ''group2'' ORDER BY 1,2;');
   row_name | category_1 | category_2 | category_3 | category_4
  ----------+------------+------------+------------+------------
   test3    | val1       | val2       | val3       |
   test4    | val4       | val5       | val6       |
  (2 rows)

! SELECT * FROM crosstab('SELECT rowid, attribute, value FROM ct where rowclass = ''group1'' ORDER BY 1,2;', 2) AS c(rowid text, att1 text, att2 text);
   rowid | att1 | att2
  -------+------+------
   test1 | val1 | val2
   test2 | val5 | val6
  (2 rows)

! SELECT * FROM crosstab('SELECT rowid, attribute, value FROM ct where rowclass = ''group1'' ORDER BY 1,2;', 3) AS c(rowid text, att1 text, att2 text, att3 text);
   rowid | att1 | att2 | att3
  -------+------+------+------
   test1 | val1 | val2 | val3
   test2 | val5 | val6 | val7
  (2 rows)

! SELECT * FROM crosstab('SELECT rowid, attribute, value FROM ct where rowclass = ''group1'' ORDER BY 1,2;', 4) AS c(rowid text, att1 text, att2 text, att3 text, att4 text);
   rowid | att1 | att2 | att3 | att4
  -------+------+------+------+------
   test1 | val1 | val2 | val3 | val4
--- 16,122 ----
  --
  -- crosstab()
  --
! CREATE TABLE ct(id int, rowclass text, rowid text, attribute text, val text);
  \copy ct from 'data/ct.data'
! SELECT * FROM crosstab2('SELECT rowid, attribute, val FROM ct where rowclass = ''group1'' and (attribute = ''att2'' or attribute = ''att3'') ORDER BY 1,2;');
   row_name | category_1 | category_2
  ----------+------------+------------
   test1    | val2       | val3
   test2    | val6       | val7
  (2 rows)

! SELECT * FROM crosstab3('SELECT rowid, attribute, val FROM ct where rowclass = ''group1'' and (attribute = ''att2'' or attribute = ''att3'') ORDER BY 1,2;');
   row_name | category_1 | category_2 | category_3
  ----------+------------+------------+------------
   test1    | val2       | val3       |
   test2    | val6       | val7       |
  (2 rows)

! SELECT * FROM crosstab4('SELECT rowid, attribute, val FROM ct where rowclass = ''group1'' and (attribute = ''att2'' or attribute = ''att3'') ORDER BY 1,2;');
   row_name | category_1 | category_2 | category_3 | category_4
  ----------+------------+------------+------------+------------
   test1    | val2       | val3       |            |
   test2    | val6       | val7       |            |
  (2 rows)

! SELECT * FROM crosstab2('SELECT rowid, attribute, val FROM ct where rowclass = ''group1'' ORDER BY 1,2;');
   row_name | category_1 | category_2
  ----------+------------+------------
   test1    | val1       | val2
   test2    | val5       | val6
  (2 rows)

! SELECT * FROM crosstab3('SELECT rowid, attribute, val FROM ct where rowclass = ''group1'' ORDER BY 1,2;');
   row_name | category_1 | category_2 | category_3
  ----------+------------+------------+------------
   test1    | val1       | val2       | val3
   test2    | val5       | val6       | val7
  (2 rows)

! SELECT * FROM crosstab4('SELECT rowid, attribute, val FROM ct where rowclass = ''group1'' ORDER BY 1,2;');
   row_name | category_1 | category_2 | category_3 | category_4
  ----------+------------+------------+------------+------------
   test1    | val1       | val2       | val3       | val4
   test2    | val5       | val6       | val7       | val8
  (2 rows)

! SELECT * FROM crosstab2('SELECT rowid, attribute, val FROM ct where rowclass = ''group2'' and (attribute = ''att1'' or attribute = ''att2'') ORDER BY 1,2;');
   row_name | category_1 | category_2
  ----------+------------+------------
   test3    | val1       | val2
   test4    | val4       | val5
  (2 rows)

! SELECT * FROM crosstab3('SELECT rowid, attribute, val FROM ct where rowclass = ''group2'' and (attribute = ''att1'' or attribute = ''att2'') ORDER BY 1,2;');
   row_name | category_1 | category_2 | category_3
  ----------+------------+------------+------------
   test3    | val1       | val2       |
   test4    | val4       | val5       |
  (2 rows)

! SELECT * FROM crosstab4('SELECT rowid, attribute, val FROM ct where rowclass = ''group2'' and (attribute = ''att1'' or attribute = ''att2'') ORDER BY 1,2;');
   row_name | category_1 | category_2 | category_3 | category_4
  ----------+------------+------------+------------+------------
   test3    | val1       | val2       |            |
   test4    | val4       | val5       |            |
  (2 rows)

! SELECT * FROM crosstab2('SELECT rowid, attribute, val FROM ct where rowclass = ''group2'' ORDER BY 1,2;');
   row_name | category_1 | category_2
  ----------+------------+------------
   test3    | val1       | val2
   test4    | val4       | val5
  (2 rows)

! SELECT * FROM crosstab3('SELECT rowid, attribute, val FROM ct where rowclass = ''group2'' ORDER BY 1,2;');
   row_name | category_1 | category_2 | category_3
  ----------+------------+------------+------------
   test3    | val1       | val2       | val3
   test4    | val4       | val5       | val6
  (2 rows)

! SELECT * FROM crosstab4('SELECT rowid, attribute, val FROM ct where rowclass = ''group2'' ORDER BY 1,2;');
   row_name | category_1 | category_2 | category_3 | category_4
  ----------+------------+------------+------------+------------
   test3    | val1       | val2       | val3       |
   test4    | val4       | val5       | val6       |
  (2 rows)

! SELECT * FROM crosstab('SELECT rowid, attribute, val FROM ct where rowclass = ''group1'' ORDER BY 1,2;', 2) AS c(rowid text, att1 text, att2 text);
   rowid | att1 | att2
  -------+------+------
   test1 | val1 | val2
   test2 | val5 | val6
  (2 rows)

! SELECT * FROM crosstab('SELECT rowid, attribute, val FROM ct where rowclass = ''group1'' ORDER BY 1,2;', 3) AS c(rowid text, att1 text, att2 text, att3 text);
   rowid | att1 | att2 | att3
  -------+------+------+------
   test1 | val1 | val2 | val3
   test2 | val5 | val6 | val7
  (2 rows)

! SELECT * FROM crosstab('SELECT rowid, attribute, val FROM ct where rowclass = ''group1'' ORDER BY 1,2;', 4) AS c(rowid text, att1 text, att2 text, att3 text, att4 text);
   rowid | att1 | att2 | att3 | att4
  -------+------+------+------+------
   test1 | val1 | val2 | val3 | val4
Index: contrib/tablefunc/sql/tablefunc.sql
===================================================================
RCS file: /opt/src/cvs/pgsql-server/contrib/tablefunc/sql/tablefunc.sql,v
retrieving revision 1.5
diff -c -r1.5 tablefunc.sql
*** contrib/tablefunc/sql/tablefunc.sql    21 Oct 2002 01:42:14 -0000    1.5
--- contrib/tablefunc/sql/tablefunc.sql    22 Nov 2002 23:13:59 -0000
***************
*** 15,42 ****
  --
  -- crosstab()
  --
! CREATE TABLE ct(id int, rowclass text, rowid text, attribute text, value text);
  \copy ct from 'data/ct.data'

! SELECT * FROM crosstab2('SELECT rowid, attribute, value FROM ct where rowclass = ''group1'' and (attribute = ''att2'' or attribute = ''att3'') ORDER BY 1,2;');
! SELECT * FROM crosstab3('SELECT rowid, attribute, value FROM ct where rowclass = ''group1'' and (attribute = ''att2'' or attribute = ''att3'') ORDER BY 1,2;');
! SELECT * FROM crosstab4('SELECT rowid, attribute, value FROM ct where rowclass = ''group1'' and (attribute = ''att2'' or attribute = ''att3'') ORDER BY 1,2;');

! SELECT * FROM crosstab2('SELECT rowid, attribute, value FROM ct where rowclass = ''group1'' ORDER BY 1,2;');
! SELECT * FROM crosstab3('SELECT rowid, attribute, value FROM ct where rowclass = ''group1'' ORDER BY 1,2;');
! SELECT * FROM crosstab4('SELECT rowid, attribute, value FROM ct where rowclass = ''group1'' ORDER BY 1,2;');

! SELECT * FROM crosstab2('SELECT rowid, attribute, value FROM ct where rowclass = ''group2'' and (attribute = ''att1'' or attribute = ''att2'') ORDER BY 1,2;');
! SELECT * FROM crosstab3('SELECT rowid, attribute, value FROM ct where rowclass = ''group2'' and (attribute = ''att1'' or attribute = ''att2'') ORDER BY 1,2;');
! SELECT * FROM crosstab4('SELECT rowid, attribute, value FROM ct where rowclass = ''group2'' and (attribute = ''att1'' or attribute = ''att2'') ORDER BY 1,2;');

! SELECT * FROM crosstab2('SELECT rowid, attribute, value FROM ct where rowclass = ''group2'' ORDER BY 1,2;');
! SELECT * FROM crosstab3('SELECT rowid, attribute, value FROM ct where rowclass = ''group2'' ORDER BY 1,2;');
! SELECT * FROM crosstab4('SELECT rowid, attribute, value FROM ct where rowclass = ''group2'' ORDER BY 1,2;');

! SELECT * FROM crosstab('SELECT rowid, attribute, value FROM ct where rowclass = ''group1'' ORDER BY 1,2;', 2) AS c(rowid text, att1 text, att2 text);
! SELECT * FROM crosstab('SELECT rowid, attribute, value FROM ct where rowclass = ''group1'' ORDER BY 1,2;', 3) AS c(rowid text, att1 text, att2 text, att3 text);
! SELECT * FROM crosstab('SELECT rowid, attribute, value FROM ct where rowclass = ''group1'' ORDER BY 1,2;', 4) AS c(rowid text, att1 text, att2 text, att3 text, att4 text);

  -- test connectby with text based hierarchy
  CREATE TABLE connectby_text(keyid text, parent_keyid text);
--- 15,42 ----
  --
  -- crosstab()
  --
! CREATE TABLE ct(id int, rowclass text, rowid text, attribute text, val text);
  \copy ct from 'data/ct.data'

! SELECT * FROM crosstab2('SELECT rowid, attribute, val FROM ct where rowclass = ''group1'' and (attribute = ''att2'' or attribute = ''att3'') ORDER BY 1,2;');
! SELECT * FROM crosstab3('SELECT rowid, attribute, val FROM ct where rowclass = ''group1'' and (attribute = ''att2'' or attribute = ''att3'') ORDER BY 1,2;');
! SELECT * FROM crosstab4('SELECT rowid, attribute, val FROM ct where rowclass = ''group1'' and (attribute = ''att2'' or attribute = ''att3'') ORDER BY 1,2;');

! SELECT * FROM crosstab2('SELECT rowid, attribute, val FROM ct where rowclass = ''group1'' ORDER BY 1,2;');
! SELECT * FROM crosstab3('SELECT rowid, attribute, val FROM ct where rowclass = ''group1'' ORDER BY 1,2;');
! SELECT * FROM crosstab4('SELECT rowid, attribute, val FROM ct where rowclass = ''group1'' ORDER BY 1,2;');

! SELECT * FROM crosstab2('SELECT rowid, attribute, val FROM ct where rowclass = ''group2'' and (attribute = ''att1'' or attribute = ''att2'') ORDER BY 1,2;');
! SELECT * FROM crosstab3('SELECT rowid, attribute, val FROM ct where rowclass = ''group2'' and (attribute = ''att1'' or attribute = ''att2'') ORDER BY 1,2;');
! SELECT * FROM crosstab4('SELECT rowid, attribute, val FROM ct where rowclass = ''group2'' and (attribute = ''att1'' or attribute = ''att2'') ORDER BY 1,2;');

! SELECT * FROM crosstab2('SELECT rowid, attribute, val FROM ct where rowclass = ''group2'' ORDER BY 1,2;');
! SELECT * FROM crosstab3('SELECT rowid, attribute, val FROM ct where rowclass = ''group2'' ORDER BY 1,2;');
! SELECT * FROM crosstab4('SELECT rowid, attribute, val FROM ct where rowclass = ''group2'' ORDER BY 1,2;');

! SELECT * FROM crosstab('SELECT rowid, attribute, val FROM ct where rowclass = ''group1'' ORDER BY 1,2;', 2) AS c(rowid text, att1 text, att2 text);
! SELECT * FROM crosstab('SELECT rowid, attribute, val FROM ct where rowclass = ''group1'' ORDER BY 1,2;', 3) AS c(rowid text, att1 text, att2 text, att3 text);
! SELECT * FROM crosstab('SELECT rowid, attribute, val FROM ct where rowclass = ''group1'' ORDER BY 1,2;', 4) AS c(rowid text, att1 text, att2 text, att3 text, att4 text);

  -- test connectby with text based hierarchy
  CREATE TABLE connectby_text(keyid text, parent_keyid text);

Re: quote_ident and schemas (was Re: [HACKERS] connectby with schema)

From
Tom Lane
Date:
Joe Conway <mail@joeconway.com> writes:
> OK. Attached patch removes calls within the function to quote_ident, requiring
> the user to appropriately quote their own identifiers. I also tweaked the
> regression test to deal with "value" becoming a reserved word.

Applied.  I also threw in a quick note in the README to mention the need
for quoting.

            regards, tom lane

Re: quote_ident and schemas (was Re: [HACKERS] connectby with schema)

From
Joe Conway
Date:
Joe Conway wrote:
> p.s. There are similar issues in dblink, but they appear a bit more
> difficult to address. I'll attempt to get them resolved this weekend,
> again in hopes to get them applied before 7.3 is released.
>

Attached patch removes most (hopefully just the appropriate ones) calls in
dblink to quote_ident, requiring the user to quote their own identifiers. I
also added to the regression test a case for a quoted, schema qualified table
name.

If it's not too late, I'd like this to get into 7.3, but in any case,
please apply to HEAD.

Thanks,

Joe
Index: contrib/dblink/README.dblink
===================================================================
RCS file: /opt/src/cvs/pgsql-server/contrib/dblink/README.dblink,v
retrieving revision 1.6
diff -c -r1.6 README.dblink
*** contrib/dblink/README.dblink    2 Sep 2002 06:13:31 -0000    1.6
--- contrib/dblink/README.dblink    23 Nov 2002 17:21:40 -0000
***************
*** 151,156 ****
--- 151,160 ----

  Documentation:

+   Note: Parameters representing relation names must include double
+      quotes if the names are mixed-case or contain special characters. They
+      must also be appropriately qualified with schema name if applicable.
+
    See the following files:
       doc/connection
       doc/cursor
Index: contrib/dblink/dblink.c
===================================================================
RCS file: /opt/src/cvs/pgsql-server/contrib/dblink/dblink.c,v
retrieving revision 1.18
diff -c -r1.18 dblink.c
*** contrib/dblink/dblink.c    13 Nov 2002 00:39:46 -0000    1.18
--- contrib/dblink/dblink.c    23 Nov 2002 16:45:36 -0000
***************
*** 71,76 ****
--- 71,77 ----
  static void append_res_ptr(dblink_results * results);
  static void remove_res_ptr(dblink_results * results);
  static TupleDesc pgresultGetTupleDesc(PGresult *res);
+ static char *generate_relation_name(Oid relid);

  /* Global */
  List       *res_id = NIL;
***************
*** 171,177 ****
      }
      PQclear(res);

!     appendStringInfo(str, "DECLARE %s CURSOR FOR %s", quote_ident_cstr(curname), sql);
      res = PQexec(conn, str->data);
      if (!res ||
          (PQresultStatus(res) != PGRES_COMMAND_OK &&
--- 172,178 ----
      }
      PQclear(res);

!     appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
      res = PQexec(conn, str->data);
      if (!res ||
          (PQresultStatus(res) != PGRES_COMMAND_OK &&
***************
*** 210,216 ****
      else
          elog(ERROR, "dblink_close: no connection available");

!     appendStringInfo(str, "CLOSE %s", quote_ident_cstr(curname));

      /* close the cursor */
      res = PQexec(conn, str->data);
--- 211,217 ----
      else
          elog(ERROR, "dblink_close: no connection available");

!     appendStringInfo(str, "CLOSE %s", curname);

      /* close the cursor */
      res = PQexec(conn, str->data);
***************
*** 287,293 ****
          else
              elog(ERROR, "dblink_fetch: no connection available");

!         appendStringInfo(str, "FETCH %d FROM %s", howmany, quote_ident_cstr(curname));

          res = PQexec(conn, str->data);
          if (!res ||
--- 288,294 ----
          else
              elog(ERROR, "dblink_fetch: no connection available");

!         appendStringInfo(str, "FETCH %d FROM %s", howmany, curname);

          res = PQexec(conn, str->data);
          if (!res ||
***************
*** 306,312 ****
          {
              /* cursor does not exist - closed already or bad name */
              PQclear(res);
!             elog(ERROR, "dblink_fetch: cursor %s does not exist", quote_ident_cstr(curname));
          }

          funcctx->max_calls = PQntuples(res);
--- 307,313 ----
          {
              /* cursor does not exist - closed already or bad name */
              PQclear(res);
!             elog(ERROR, "dblink_fetch: cursor %s does not exist", curname);
          }

          funcctx->max_calls = PQntuples(res);
***************
*** 1527,1537 ****
      int            i;
      bool        needComma;

      /*
       * Open relation using relid
       */
      rel = relation_open(relid, AccessShareLock);
-     relname = RelationGetRelationName(rel);
      tupdesc = rel->rd_att;
      natts = tupdesc->natts;

--- 1528,1540 ----
      int            i;
      bool        needComma;

+     /* get relation name including any needed schema prefix and quoting */
+     relname = generate_relation_name(relid);
+
      /*
       * Open relation using relid
       */
      rel = relation_open(relid, AccessShareLock);
      tupdesc = rel->rd_att;
      natts = tupdesc->natts;

***************
*** 1539,1545 ****
      if (!tuple)
          elog(ERROR, "dblink_build_sql_insert: row not found");

!     appendStringInfo(str, "INSERT INTO %s(", quote_ident_cstr(relname));

      needComma = false;
      for (i = 0; i < natts; i++)
--- 1542,1548 ----
      if (!tuple)
          elog(ERROR, "dblink_build_sql_insert: row not found");

!     appendStringInfo(str, "INSERT INTO %s(", relname);

      needComma = false;
      for (i = 0; i < natts; i++)
***************
*** 1610,1624 ****
      char       *val;
      int            i;

      /*
       * Open relation using relid
       */
      rel = relation_open(relid, AccessShareLock);
-     relname = RelationGetRelationName(rel);
      tupdesc = rel->rd_att;
      natts = tupdesc->natts;

!     appendStringInfo(str, "DELETE FROM %s WHERE ", quote_ident_cstr(relname));
      for (i = 0; i < pknumatts; i++)
      {
          int16        pkattnum = pkattnums[i];
--- 1613,1629 ----
      char       *val;
      int            i;

+     /* get relation name including any needed schema prefix and quoting */
+     relname = generate_relation_name(relid);
+
      /*
       * Open relation using relid
       */
      rel = relation_open(relid, AccessShareLock);
      tupdesc = rel->rd_att;
      natts = tupdesc->natts;

!     appendStringInfo(str, "DELETE FROM %s WHERE ", relname);
      for (i = 0; i < pknumatts; i++)
      {
          int16        pkattnum = pkattnums[i];
***************
*** 1669,1679 ****
      int            i;
      bool        needComma;

      /*
       * Open relation using relid
       */
      rel = relation_open(relid, AccessShareLock);
-     relname = RelationGetRelationName(rel);
      tupdesc = rel->rd_att;
      natts = tupdesc->natts;

--- 1674,1686 ----
      int            i;
      bool        needComma;

+     /* get relation name including any needed schema prefix and quoting */
+     relname = generate_relation_name(relid);
+
      /*
       * Open relation using relid
       */
      rel = relation_open(relid, AccessShareLock);
      tupdesc = rel->rd_att;
      natts = tupdesc->natts;

***************
*** 1681,1687 ****
      if (!tuple)
          elog(ERROR, "dblink_build_sql_update: row not found");

!     appendStringInfo(str, "UPDATE %s SET ", quote_ident_cstr(relname));

      needComma = false;
      for (i = 0; i < natts; i++)
--- 1688,1694 ----
      if (!tuple)
          elog(ERROR, "dblink_build_sql_update: row not found");

!     appendStringInfo(str, "UPDATE %s SET ", relname);

      needComma = false;
      for (i = 0; i < natts; i++)
***************
*** 1813,1823 ****
      int            i;
      char       *val = NULL;

      /*
       * Open relation using relid
       */
      rel = relation_open(relid, AccessShareLock);
-     relname = RelationGetRelationName(rel);
      tupdesc = CreateTupleDescCopy(rel->rd_att);
      relation_close(rel, AccessShareLock);

--- 1820,1832 ----
      int            i;
      char       *val = NULL;

+     /* get relation name including any needed schema prefix and quoting */
+     relname = generate_relation_name(relid);
+
      /*
       * Open relation using relid
       */
      rel = relation_open(relid, AccessShareLock);
      tupdesc = CreateTupleDescCopy(rel->rd_att);
      relation_close(rel, AccessShareLock);

***************
*** 1831,1837 ****
       * Build sql statement to look up tuple of interest Use src_pkattvals
       * as the criteria.
       */
!     appendStringInfo(str, "SELECT * FROM %s WHERE ", quote_ident_cstr(relname));

      for (i = 0; i < pknumatts; i++)
      {
--- 1840,1846 ----
       * Build sql statement to look up tuple of interest Use src_pkattvals
       * as the criteria.
       */
!     appendStringInfo(str, "SELECT * FROM %s WHERE ", relname);

      for (i = 0; i < pknumatts; i++)
      {
***************
*** 2002,2005 ****
--- 2011,2048 ----
      }

      return desc;
+ }
+
+ /*
+  * generate_relation_name - copied from ruleutils.c
+  *        Compute the name to display for a relation specified by OID
+  *
+  * The result includes all necessary quoting and schema-prefixing.
+  */
+ static char *
+ generate_relation_name(Oid relid)
+ {
+     HeapTuple    tp;
+     Form_pg_class reltup;
+     char       *nspname;
+     char       *result;
+
+     tp = SearchSysCache(RELOID,
+                         ObjectIdGetDatum(relid),
+                         0, 0, 0);
+     if (!HeapTupleIsValid(tp))
+         elog(ERROR, "cache lookup of relation %u failed", relid);
+     reltup = (Form_pg_class) GETSTRUCT(tp);
+
+     /* Qualify the name if not visible in search path */
+     if (RelationIsVisible(relid))
+         nspname = NULL;
+     else
+         nspname = get_namespace_name(reltup->relnamespace);
+
+     result = quote_qualified_identifier(nspname, NameStr(reltup->relname));
+
+     ReleaseSysCache(tp);
+
+     return result;
  }
Index: contrib/dblink/expected/dblink.out
===================================================================
RCS file: /opt/src/cvs/pgsql-server/contrib/dblink/expected/dblink.out,v
retrieving revision 1.6
diff -c -r1.6 dblink.out
*** contrib/dblink/expected/dblink.out    3 Nov 2002 04:52:09 -0000    1.6
--- contrib/dblink/expected/dblink.out    23 Nov 2002 17:05:41 -0000
***************
*** 59,64 ****
--- 59,101 ----
   DELETE FROM foo WHERE f1 = '0' AND f2 = 'a'
  (1 row)

+ -- retest using a quoted and schema qualified table
+ CREATE SCHEMA "MySchema";
+ CREATE TABLE "MySchema"."Foo"(f1 int, f2 text, f3 text[], primary key (f1,f2));
+ NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index 'Foo_pkey' for table 'Foo'
+ INSERT INTO "MySchema"."Foo" VALUES (0,'a','{"a0","b0","c0"}');
+ -- list the primary key fields
+ SELECT *
+ FROM dblink_get_pkey('"MySchema"."Foo"');
+  position | colname
+ ----------+---------
+         1 | f1
+         2 | f2
+ (2 rows)
+
+ -- build an insert statement based on a local tuple,
+ -- replacing the primary key values with new ones
+ SELECT dblink_build_sql_insert('"MySchema"."Foo"','1 2',2,'{"0", "a"}','{"99", "xyz"}');
+                         dblink_build_sql_insert
+ ------------------------------------------------------------------------
+  INSERT INTO "MySchema"."Foo"(f1,f2,f3) VALUES('99','xyz','{a0,b0,c0}')
+ (1 row)
+
+ -- build an update statement based on a local tuple,
+ -- replacing the primary key values with new ones
+ SELECT dblink_build_sql_update('"MySchema"."Foo"','1 2',2,'{"0", "a"}','{"99", "xyz"}');
+                                        dblink_build_sql_update
+ -----------------------------------------------------------------------------------------------------
+  UPDATE "MySchema"."Foo" SET f1 = '99', f2 = 'xyz', f3 = '{a0,b0,c0}' WHERE f1 = '99' AND f2 = 'xyz'
+ (1 row)
+
+ -- build a delete statement based on a local tuple,
+ SELECT dblink_build_sql_delete('"MySchema"."Foo"','1 2',2,'{"0", "a"}');
+                  dblink_build_sql_delete
+ ----------------------------------------------------------
+  DELETE FROM "MySchema"."Foo" WHERE f1 = '0' AND f2 = 'a'
+ (1 row)
+
  -- regular old dblink
  SELECT *
  FROM dblink('dbname=regression','SELECT * FROM foo') AS t(a int, b text, c text[])
Index: contrib/dblink/sql/dblink.sql
===================================================================
RCS file: /opt/src/cvs/pgsql-server/contrib/dblink/sql/dblink.sql,v
retrieving revision 1.6
diff -c -r1.6 dblink.sql
*** contrib/dblink/sql/dblink.sql    3 Nov 2002 04:52:09 -0000    1.6
--- contrib/dblink/sql/dblink.sql    23 Nov 2002 17:05:37 -0000
***************
*** 44,49 ****
--- 44,69 ----
  -- build a delete statement based on a local tuple,
  SELECT dblink_build_sql_delete('foo','1 2',2,'{"0", "a"}');

+ -- retest using a quoted and schema qualified table
+ CREATE SCHEMA "MySchema";
+ CREATE TABLE "MySchema"."Foo"(f1 int, f2 text, f3 text[], primary key (f1,f2));
+ INSERT INTO "MySchema"."Foo" VALUES (0,'a','{"a0","b0","c0"}');
+
+ -- list the primary key fields
+ SELECT *
+ FROM dblink_get_pkey('"MySchema"."Foo"');
+
+ -- build an insert statement based on a local tuple,
+ -- replacing the primary key values with new ones
+ SELECT dblink_build_sql_insert('"MySchema"."Foo"','1 2',2,'{"0", "a"}','{"99", "xyz"}');
+
+ -- build an update statement based on a local tuple,
+ -- replacing the primary key values with new ones
+ SELECT dblink_build_sql_update('"MySchema"."Foo"','1 2',2,'{"0", "a"}','{"99", "xyz"}');
+
+ -- build a delete statement based on a local tuple,
+ SELECT dblink_build_sql_delete('"MySchema"."Foo"','1 2',2,'{"0", "a"}');
+
  -- regular old dblink
  SELECT *
  FROM dblink('dbname=regression','SELECT * FROM foo') AS t(a int, b text, c text[])

Re: quote_ident and schemas (was Re: [HACKERS] connectby with schema)

From
Tom Lane
Date:
Joe Conway <mail@joeconway.com> writes:
> Attached patch removes most (hopefully just the appropriate ones) calls in
> dblink to quote_ident, requiring the user to quote their own identifiers. I
> also added to the regression test a case for a quoted, schema qualified table
> name.

> If it's not too late, I'd like this to get into 7.3, but in any case,
> please apply to HEAD.

Applied --- in 7.3 also, since Marc hasn't rolled RC2 yet.

            regards, tom lane