Re: quote_ident and schemas (was Re: [HACKERS] connectby - Mailing list pgsql-patches

From Joe Conway
Subject Re: quote_ident and schemas (was Re: [HACKERS] connectby
Date
Msg-id 3DDFBF4D.6090708@joeconway.com
Whole thread Raw
In response to Re: quote_ident and schemas (was Re: [HACKERS] connectby with schema)  (Joe Conway <mail@joeconway.com>)
Responses Re: quote_ident and schemas (was Re: [HACKERS] connectby with schema)  (Tom Lane <tgl@sss.pgh.pa.us>)
List pgsql-patches
Joe Conway wrote:
> p.s. There are similar issues in dblink, but they appear a bit more
> difficult to address. I'll attempt to get them resloved this weekend,
> again in hopes to get them applied before 7.3 is released.
>

Attached patch removes most (hopefully just the appropriate ones) calls in
dblink to quote_ident, requiring the user to quote their own identifiers. I
also added to the regression test a case for a quoted, schema qualified table
name.

If it's not too late, I'd like this to get into 7.3, but in any case,
please apply to HEAD.

Thanks,

Joe
Index: contrib/dblink/README.dblink
===================================================================
RCS file: /opt/src/cvs/pgsql-server/contrib/dblink/README.dblink,v
retrieving revision 1.6
diff -c -r1.6 README.dblink
*** contrib/dblink/README.dblink    2 Sep 2002 06:13:31 -0000    1.6
--- contrib/dblink/README.dblink    23 Nov 2002 17:21:40 -0000
***************
*** 151,156 ****
--- 151,160 ----

  Documentation:

+   Note: Parameters representing relation names must include double
+      quotes if the names are mixed-case or contain special characters. They
+      must also be appropriately qualified with schema name if applicable.
+
    See the following files:
       doc/connection
       doc/cursor
Index: contrib/dblink/dblink.c
===================================================================
RCS file: /opt/src/cvs/pgsql-server/contrib/dblink/dblink.c,v
retrieving revision 1.18
diff -c -r1.18 dblink.c
*** contrib/dblink/dblink.c    13 Nov 2002 00:39:46 -0000    1.18
--- contrib/dblink/dblink.c    23 Nov 2002 16:45:36 -0000
***************
*** 71,76 ****
--- 71,77 ----
  static void append_res_ptr(dblink_results * results);
  static void remove_res_ptr(dblink_results * results);
  static TupleDesc pgresultGetTupleDesc(PGresult *res);
+ static char *generate_relation_name(Oid relid);

  /* Global */
  List       *res_id = NIL;
***************
*** 171,177 ****
      }
      PQclear(res);

!     appendStringInfo(str, "DECLARE %s CURSOR FOR %s", quote_ident_cstr(curname), sql);
      res = PQexec(conn, str->data);
      if (!res ||
          (PQresultStatus(res) != PGRES_COMMAND_OK &&
--- 172,178 ----
      }
      PQclear(res);

!     appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
      res = PQexec(conn, str->data);
      if (!res ||
          (PQresultStatus(res) != PGRES_COMMAND_OK &&
***************
*** 210,216 ****
      else
          elog(ERROR, "dblink_close: no connection available");

!     appendStringInfo(str, "CLOSE %s", quote_ident_cstr(curname));

      /* close the cursor */
      res = PQexec(conn, str->data);
--- 211,217 ----
      else
          elog(ERROR, "dblink_close: no connection available");

!     appendStringInfo(str, "CLOSE %s", curname);

      /* close the cursor */
      res = PQexec(conn, str->data);
***************
*** 287,293 ****
          else
              elog(ERROR, "dblink_fetch: no connection available");

!         appendStringInfo(str, "FETCH %d FROM %s", howmany, quote_ident_cstr(curname));

          res = PQexec(conn, str->data);
          if (!res ||
--- 288,294 ----
          else
              elog(ERROR, "dblink_fetch: no connection available");

!         appendStringInfo(str, "FETCH %d FROM %s", howmany, curname);

          res = PQexec(conn, str->data);
          if (!res ||
***************
*** 306,312 ****
          {
              /* cursor does not exist - closed already or bad name */
              PQclear(res);
!             elog(ERROR, "dblink_fetch: cursor %s does not exist", quote_ident_cstr(curname));
          }

          funcctx->max_calls = PQntuples(res);
--- 307,313 ----
          {
              /* cursor does not exist - closed already or bad name */
              PQclear(res);
!             elog(ERROR, "dblink_fetch: cursor %s does not exist", curname);
          }

          funcctx->max_calls = PQntuples(res);
***************
*** 1527,1537 ****
      int            i;
      bool        needComma;

      /*
       * Open relation using relid
       */
      rel = relation_open(relid, AccessShareLock);
-     relname = RelationGetRelationName(rel);
      tupdesc = rel->rd_att;
      natts = tupdesc->natts;

--- 1528,1540 ----
      int            i;
      bool        needComma;

+     /* get relation name including any needed schema prefix and quoting */
+     relname = generate_relation_name(relid);
+
      /*
       * Open relation using relid
       */
      rel = relation_open(relid, AccessShareLock);
      tupdesc = rel->rd_att;
      natts = tupdesc->natts;

***************
*** 1539,1545 ****
      if (!tuple)
          elog(ERROR, "dblink_build_sql_insert: row not found");

!     appendStringInfo(str, "INSERT INTO %s(", quote_ident_cstr(relname));

      needComma = false;
      for (i = 0; i < natts; i++)
--- 1542,1548 ----
      if (!tuple)
          elog(ERROR, "dblink_build_sql_insert: row not found");

!     appendStringInfo(str, "INSERT INTO %s(", relname);

      needComma = false;
      for (i = 0; i < natts; i++)
***************
*** 1610,1624 ****
      char       *val;
      int            i;

      /*
       * Open relation using relid
       */
      rel = relation_open(relid, AccessShareLock);
-     relname = RelationGetRelationName(rel);
      tupdesc = rel->rd_att;
      natts = tupdesc->natts;

!     appendStringInfo(str, "DELETE FROM %s WHERE ", quote_ident_cstr(relname));
      for (i = 0; i < pknumatts; i++)
      {
          int16        pkattnum = pkattnums[i];
--- 1613,1629 ----
      char       *val;
      int            i;

+     /* get relation name including any needed schema prefix and quoting */
+     relname = generate_relation_name(relid);
+
      /*
       * Open relation using relid
       */
      rel = relation_open(relid, AccessShareLock);
      tupdesc = rel->rd_att;
      natts = tupdesc->natts;

!     appendStringInfo(str, "DELETE FROM %s WHERE ", relname);
      for (i = 0; i < pknumatts; i++)
      {
          int16        pkattnum = pkattnums[i];
***************
*** 1669,1679 ****
      int            i;
      bool        needComma;

      /*
       * Open relation using relid
       */
      rel = relation_open(relid, AccessShareLock);
-     relname = RelationGetRelationName(rel);
      tupdesc = rel->rd_att;
      natts = tupdesc->natts;

--- 1674,1686 ----
      int            i;
      bool        needComma;

+     /* get relation name including any needed schema prefix and quoting */
+     relname = generate_relation_name(relid);
+
      /*
       * Open relation using relid
       */
      rel = relation_open(relid, AccessShareLock);
      tupdesc = rel->rd_att;
      natts = tupdesc->natts;

***************
*** 1681,1687 ****
      if (!tuple)
          elog(ERROR, "dblink_build_sql_update: row not found");

!     appendStringInfo(str, "UPDATE %s SET ", quote_ident_cstr(relname));

      needComma = false;
      for (i = 0; i < natts; i++)
--- 1688,1694 ----
      if (!tuple)
          elog(ERROR, "dblink_build_sql_update: row not found");

!     appendStringInfo(str, "UPDATE %s SET ", relname);

      needComma = false;
      for (i = 0; i < natts; i++)
***************
*** 1813,1823 ****
      int            i;
      char       *val = NULL;

      /*
       * Open relation using relid
       */
      rel = relation_open(relid, AccessShareLock);
-     relname = RelationGetRelationName(rel);
      tupdesc = CreateTupleDescCopy(rel->rd_att);
      relation_close(rel, AccessShareLock);

--- 1820,1832 ----
      int            i;
      char       *val = NULL;

+     /* get relation name including any needed schema prefix and quoting */
+     relname = generate_relation_name(relid);
+
      /*
       * Open relation using relid
       */
      rel = relation_open(relid, AccessShareLock);
      tupdesc = CreateTupleDescCopy(rel->rd_att);
      relation_close(rel, AccessShareLock);

***************
*** 1831,1837 ****
       * Build sql statement to look up tuple of interest Use src_pkattvals
       * as the criteria.
       */
!     appendStringInfo(str, "SELECT * FROM %s WHERE ", quote_ident_cstr(relname));

      for (i = 0; i < pknumatts; i++)
      {
--- 1840,1846 ----
       * Build sql statement to look up tuple of interest Use src_pkattvals
       * as the criteria.
       */
!     appendStringInfo(str, "SELECT * FROM %s WHERE ", relname);

      for (i = 0; i < pknumatts; i++)
      {
***************
*** 2002,2005 ****
--- 2011,2048 ----
      }

      return desc;
+ }
+
+ /*
+  * generate_relation_name - copied from ruleutils.c
+  *        Compute the name to display for a relation specified by OID
+  *
+  * The result includes all necessary quoting and schema-prefixing.
+  */
+ static char *
+ generate_relation_name(Oid relid)
+ {
+     HeapTuple    tp;
+     Form_pg_class reltup;
+     char       *nspname;
+     char       *result;
+
+     tp = SearchSysCache(RELOID,
+                         ObjectIdGetDatum(relid),
+                         0, 0, 0);
+     if (!HeapTupleIsValid(tp))
+         elog(ERROR, "cache lookup of relation %u failed", relid);
+     reltup = (Form_pg_class) GETSTRUCT(tp);
+
+     /* Qualify the name if not visible in search path */
+     if (RelationIsVisible(relid))
+         nspname = NULL;
+     else
+         nspname = get_namespace_name(reltup->relnamespace);
+
+     result = quote_qualified_identifier(nspname, NameStr(reltup->relname));
+
+     ReleaseSysCache(tp);
+
+     return result;
  }
Index: contrib/dblink/expected/dblink.out
===================================================================
RCS file: /opt/src/cvs/pgsql-server/contrib/dblink/expected/dblink.out,v
retrieving revision 1.6
diff -c -r1.6 dblink.out
*** contrib/dblink/expected/dblink.out    3 Nov 2002 04:52:09 -0000    1.6
--- contrib/dblink/expected/dblink.out    23 Nov 2002 17:05:41 -0000
***************
*** 59,64 ****
--- 59,101 ----
   DELETE FROM foo WHERE f1 = '0' AND f2 = 'a'
  (1 row)

+ -- retest using a quoted and schema qualified table
+ CREATE SCHEMA "MySchema";
+ CREATE TABLE "MySchema"."Foo"(f1 int, f2 text, f3 text[], primary key (f1,f2));
+ NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index 'Foo_pkey' for table 'Foo'
+ INSERT INTO "MySchema"."Foo" VALUES (0,'a','{"a0","b0","c0"}');
+ -- list the primary key fields
+ SELECT *
+ FROM dblink_get_pkey('"MySchema"."Foo"');
+  position | colname
+ ----------+---------
+         1 | f1
+         2 | f2
+ (2 rows)
+
+ -- build an insert statement based on a local tuple,
+ -- replacing the primary key values with new ones
+ SELECT dblink_build_sql_insert('"MySchema"."Foo"','1 2',2,'{"0", "a"}','{"99", "xyz"}');
+                         dblink_build_sql_insert
+ ------------------------------------------------------------------------
+  INSERT INTO "MySchema"."Foo"(f1,f2,f3) VALUES('99','xyz','{a0,b0,c0}')
+ (1 row)
+
+ -- build an update statement based on a local tuple,
+ -- replacing the primary key values with new ones
+ SELECT dblink_build_sql_update('"MySchema"."Foo"','1 2',2,'{"0", "a"}','{"99", "xyz"}');
+                                        dblink_build_sql_update
+ -----------------------------------------------------------------------------------------------------
+  UPDATE "MySchema"."Foo" SET f1 = '99', f2 = 'xyz', f3 = '{a0,b0,c0}' WHERE f1 = '99' AND f2 = 'xyz'
+ (1 row)
+
+ -- build a delete statement based on a local tuple,
+ SELECT dblink_build_sql_delete('"MySchema"."Foo"','1 2',2,'{"0", "a"}');
+                  dblink_build_sql_delete
+ ----------------------------------------------------------
+  DELETE FROM "MySchema"."Foo" WHERE f1 = '0' AND f2 = 'a'
+ (1 row)
+
  -- regular old dblink
  SELECT *
  FROM dblink('dbname=regression','SELECT * FROM foo') AS t(a int, b text, c text[])
Index: contrib/dblink/sql/dblink.sql
===================================================================
RCS file: /opt/src/cvs/pgsql-server/contrib/dblink/sql/dblink.sql,v
retrieving revision 1.6
diff -c -r1.6 dblink.sql
*** contrib/dblink/sql/dblink.sql    3 Nov 2002 04:52:09 -0000    1.6
--- contrib/dblink/sql/dblink.sql    23 Nov 2002 17:05:37 -0000
***************
*** 44,49 ****
--- 44,69 ----
  -- build a delete statement based on a local tuple,
  SELECT dblink_build_sql_delete('foo','1 2',2,'{"0", "a"}');

+ -- retest using a quoted and schema qualified table
+ CREATE SCHEMA "MySchema";
+ CREATE TABLE "MySchema"."Foo"(f1 int, f2 text, f3 text[], primary key (f1,f2));
+ INSERT INTO "MySchema"."Foo" VALUES (0,'a','{"a0","b0","c0"}');
+
+ -- list the primary key fields
+ SELECT *
+ FROM dblink_get_pkey('"MySchema"."Foo"');
+
+ -- build an insert statement based on a local tuple,
+ -- replacing the primary key values with new ones
+ SELECT dblink_build_sql_insert('"MySchema"."Foo"','1 2',2,'{"0", "a"}','{"99", "xyz"}');
+
+ -- build an update statement based on a local tuple,
+ -- replacing the primary key values with new ones
+ SELECT dblink_build_sql_update('"MySchema"."Foo"','1 2',2,'{"0", "a"}','{"99", "xyz"}');
+
+ -- build a delete statement based on a local tuple,
+ SELECT dblink_build_sql_delete('"MySchema"."Foo"','1 2',2,'{"0", "a"}');
+
  -- regular old dblink
  SELECT *
  FROM dblink('dbname=regression','SELECT * FROM foo') AS t(a int, b text, c text[])

pgsql-patches by date:

Previous
From: Tom Lane
Date:
Subject: Re: Hierarchical queries a la Oracle. Patch.
Next
From: Tom Lane
Date:
Subject: Re: quote_ident and schemas (was Re: [HACKERS] connectby with schema)