Re: New patch for Column-level privileges - Mailing list pgsql-hackers

From KaiGai Kohei
Subject Re: New patch for Column-level privileges
Msg-id 496C56B1.8020708@ak.jp.nec.com
In response to Re: New patch for Column-level privileges  (KaiGai Kohei <kaigai@ak.jp.nec.com>)
Responses Re: New patch for Column-level privileges  (Stephen Frost <sfrost@snowman.net>)
List pgsql-hackers
The attached patch is a proof of concept.

It can be applied to the latest CVS HEAD together with colprivs_2009011001.diff.gz.
- It reverts the unnecessary changes to parser/parse_expr.c, parser/parse_node.c
  and parser/parse_relation.c.
- It adds markColumnForSelectPriv(), which marks the proper rte->cols_sel for
  a given Var node. It is invoked from scanRTEForColumn(), expandRelAttrs()
  and transformWholeRowRef().
- markColumnForSelectPriv() uses a walker function internally, because
  there is no guarantee that every entry in RangeTblEntry->joinaliasvars
  is a Var node. However, it only walks the expression behind a single Var
  node, not the whole Query tree, so I think its cost is small enough.
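
As an illustration of the marking step described above, here is a minimal,
self-contained sketch. It is not the patch itself: all Toy* names are
hypothetical stand-ins, the offset value is a toy constant chosen for the
example, every join alias entry is assumed to be a plain Var (the real
function uses a walker precisely because that is not guaranteed), and
varlevelsup is ignored.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Toy stand-in for FirstLowInvalidHeapAttributeNumber (assumed value). */
#define TOY_FIRST_LOW_INVALID_ATTNO (-8)

typedef enum { TOY_RTE_RELATION, TOY_RTE_JOIN } ToyRteKind;

/* Toy Var: 1-based range table index plus attribute number. */
typedef struct ToyVar
{
    int varno;
    int varattno;
} ToyVar;

/* Toy range table entry; cols_sel is a 64-bit stand-in for a bitmapset. */
typedef struct ToyRte
{
    ToyRteKind    kind;
    uint64_t      cols_sel;      /* bit (attno - offset) set => column read */
    const ToyVar *joinaliasvars; /* joins only: output column -> source Var */
    int           njoinvars;
} ToyRte;

/*
 * Mark the column referenced by v. For a plain relation the attribute
 * number is shifted so that system columns (negative numbers) and the
 * whole-row reference (0) map to non-negative bit positions. For a join
 * the alias is resolved and the underlying relation is marked instead.
 */
static void
toy_mark_column(ToyRte *rtable, int nrte, const ToyVar *v)
{
    ToyRte *rte;

    assert(v->varno >= 1 && v->varno <= nrte);
    rte = &rtable[v->varno - 1];

    if (rte->kind == TOY_RTE_RELATION)
    {
        int bit = v->varattno - TOY_FIRST_LOW_INVALID_ATTNO;

        assert(bit >= 0 && bit < 64);
        rte->cols_sel |= (uint64_t) 1 << bit;
    }
    else
    {
        assert(v->varattno >= 1 && v->varattno <= rte->njoinvars);
        toy_mark_column(rtable, nrte, &rte->joinaliasvars[v->varattno - 1]);
    }
}

/* Report whether attribute attno has been marked on a relation entry. */
static int
toy_is_marked(const ToyRte *rte, int attno)
{
    int bit = attno - TOY_FIRST_LOW_INVALID_ATTNO;

    return bit >= 0 && bit < 64 && ((rte->cols_sel >> bit) & 1) != 0;
}
```

In this toy model a reference to a join's output column ends up as a bit in
the cols_sel of the underlying relation, and the join entry's own cols_sel
stays empty.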

==== Results:
postgres=# CREATE TABLE t1 (a int, b text, c bool);
CREATE TABLE
postgres=# CREATE TABLE t2 (x int, y text, z bool);
CREATE TABLE
postgres=# GRANT SELECT (a,b) ON t1 TO ymj;
GRANT
postgres=# GRANT SELECT (x,y) ON t2 TO ymj;
GRANT
postgres=# \c - ymj
psql (8.4devel)
You are now connected to database "postgres" as user "ymj".
postgres=> SELECT a,b FROM t1;
DEBUG:  pg_attribute_aclmask: t1.a required: 0002 allowed: 0002
DEBUG:  pg_attribute_aclmask: t1.b required: 0002 allowed: 0002
 a | b
---+---
(0 rows)

postgres=> SELECT * FROM t1;
DEBUG:  pg_attribute_aclmask: t1.a required: 0002 allowed: 0002
DEBUG:  pg_attribute_aclmask: t1.b required: 0002 allowed: 0002
DEBUG:  pg_attribute_aclmask: t1.c required: 0002 allowed: 0000
ERROR:  permission denied for relation t1

postgres=> SELECT a FROM t1 order by c;
DEBUG:  pg_attribute_aclmask: t1.a required: 0002 allowed: 0002
DEBUG:  pg_attribute_aclmask: t1.c required: 0002 allowed: 0000
ERROR:  permission denied for relation t1

postgres=> SELECT t1 FROM t1;
DEBUG:  pg_attribute_aclmask: t1.a required: 0002 allowed: 0002
DEBUG:  pg_attribute_aclmask: t1.b required: 0002 allowed: 0002
DEBUG:  pg_attribute_aclmask: t1.c required: 0002 allowed: 0000
ERROR:  permission denied for relation t1

postgres=> SELECT 'abcd' FROM t1;
DEBUG:  pg_attribute_aclmask: t1.tableoid required: 0002 allowed: 0000
DEBUG:  pg_attribute_aclmask: t1.cmax required: 0002 allowed: 0000
DEBUG:  pg_attribute_aclmask: t1.xmax required: 0002 allowed: 0000
DEBUG:  pg_attribute_aclmask: t1.cmin required: 0002 allowed: 0000
DEBUG:  pg_attribute_aclmask: t1.xmin required: 0002 allowed: 0000
DEBUG:  pg_attribute_aclmask: t1.ctid required: 0002 allowed: 0000
DEBUG:  pg_attribute_aclmask: t1.a required: 0002 allowed: 0002
 ?column?
----------
(0 rows)

postgres=> SELECT b,y FROM t1 JOIN t2 ON a=x;
DEBUG:  pg_attribute_aclmask: t1.a required: 0002 allowed: 0002
DEBUG:  pg_attribute_aclmask: t1.b required: 0002 allowed: 0002
DEBUG:  pg_attribute_aclmask: t2.x required: 0002 allowed: 0002
DEBUG:  pg_attribute_aclmask: t2.y required: 0002 allowed: 0002
 b | y
---+---
(0 rows)

postgres=> SELECT b, (SELECT y FROM t2 WHERE x=a) FROM t1;
DEBUG:  pg_attribute_aclmask: t1.a required: 0002 allowed: 0002
DEBUG:  pg_attribute_aclmask: t1.b required: 0002 allowed: 0002
DEBUG:  pg_attribute_aclmask: t2.x required: 0002 allowed: 0002
DEBUG:  pg_attribute_aclmask: t2.y required: 0002 allowed: 0002
 b | ?column?
---+----------
(0 rows)
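
To make the DEBUG lines above easier to read, here is a small sketch of the
check they appear to reflect. It is only a model inferred from the log, not
the executor code: the toy_* names and the referenced[] flag array are
hypothetical, and the "no column referenced" rule is a guess based on the
SELECT 'abcd' case, where the scan seems to stop at the first column that
grants SELECT.

```c
#include <stddef.h>
#include <stdint.h>

/* SELECT privilege bit, printed as 0002 in the log above. */
#define TOY_ACL_SELECT 0x0002u

/*
 * Toy column-privilege check.
 *
 * allowed[i]    : privilege mask granted on column i
 * referenced[i] : non-zero if the query reads column i
 *
 * Every referenced column must grant all required bits. If the query
 * references no column at all, it is enough (in this model) that at
 * least one column grants them.
 */
static int
toy_columns_allowed(uint32_t required, const uint32_t *allowed,
                    const int *referenced, size_t ncols)
{
    size_t i;
    int    any_referenced = 0;
    int    any_granted = 0;

    for (i = 0; i < ncols; i++)
    {
        int granted = (allowed[i] & required) == required;

        if (referenced[i])
        {
            any_referenced = 1;
            if (!granted)
                return 0;       /* "permission denied for relation" */
        }
        if (granted)
            any_granted = 1;
    }
    return any_referenced ? 1 : any_granted;
}
```

With the masks shown for t1 (a and b allow 0002, c allows 0000), this model
accepts a query that reads only a and b, rejects one that also reads c, and
accepts a query that reads no column.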

Thanks,

KaiGai Kohei wrote:
> Tom Lane wrote:
>> I'm thinking make_var is not the place to do this.  The places that are
>> supposed to be taking care of permissions are the ones that do this:
>>
>>         /* Require read access --- see comments in setTargetTable() */
>>         rte->requiredPerms |= ACL_SELECT;
>
> Indeed, that seems to me the appropriate policy, because the CLP feature
> is part of the SQL standard access control model.
>
>
> The rte->requiredPerms is set on the following places:
>
> (1) transformLockingClause() and markQueryForLocking()
> It adds ACL_SELECT_FOR_UPDATE for the listed relations.
> In this case, the column-level privilege feature checks ACL_UPDATE for
> each column in rte->cols_sel, doesn't it?
> So I think nothing needs to be done here.
>
> (2) setTargetTable()
> It is invoked from transformInsertStmt(), transformUpdateStmt() and
> transformDeleteStmt(). I think it is the proper place to set rte->cols_mod,
> but the caller has not yet initialized Query->targetList when it is
> invoked.
> So I think we need to put a function call that sets cols_mod on the
> caller side, based on Query->resultRelation and Query->targetList.
>
> (3) scanRTEForColumn()
> Stephen's original patch modified this function, but it did not handle
> RTE_JOIN well, which caused problems with table joins.
> Please revert to marking rte->cols_sel here, with proper handling of
> the RTE_JOIN case.
>
> (4) addRangeTableEntry()
> The purpose of this function is to translate a RangeVar node into a
> RangeTblEntry node, as listed in the FROM clause and so on.
> I think we don't need to add any column references here.
> When the translated relation used for column-references, it is a case
> that rte->cols_sel is empty.
>
> (5) addRangeTableEntryForRelation()
> It is similar to addRangeTableEntry().
> I think we don't need to add something here.
>
> (6) ExpandColumnRefStar() and ExpandAllTables()
> They invoke expandRelAttrs() to handle the "SELECT * FROM t;" case.
> I think this is the proper place to mark the referenced columns.
> Please note that there is no guarantee that the given RangeTblEntry is
> an RTE_RELATION.
>
> Thus, in my opinion, we need the following rework.
> (a) Implement a function that sets rte->cols_sel and rte->cols_mod correctly,
>     even if the given rte is an RTE_JOIN.
> (b) Call a function that sets rte->cols_mod from the callers of setTargetTable().
> (c) Call a function that sets rte->cols_sel from scanRTEForColumn(),
>     expandRelAttrs() and transformWholeRowRef().
>
>> It's possible that we've missed some --- in particular, right at the
>> moment I am not sure that whole-row Vars are handled properly.
>
> Is transformWholeRowRef() the proper place to handle it?
> If the given RangeTblEntry is an RTE_JOIN, it has to be resolved so that
> cols_sel is set on its source relations.
>
>> And maybe we could refactor a little bit to save some code.
>> But those are basically the same places that ought to be adding
>> bits to the column bitmaps.
>


--
OSS Platform Development Division, NEC
KaiGai Kohei <kaigai@ak.jp.nec.com>
diff -cr a/src/backend/parser/parse_expr.c b/src/backend/parser/parse_expr.c
*** a/src/backend/parser/parse_expr.c    2009-01-13 15:54:53.000000000 +0900
--- b/src/backend/parser/parse_expr.c    2009-01-13 16:31:39.000000000 +0900
***************
*** 15,21 ****

  #include "postgres.h"

- #include "access/sysattr.h"
  #include "catalog/pg_type.h"
  #include "commands/dbcommands.h"
  #include "miscadmin.h"
--- 15,20 ----
***************
*** 1923,1937 ****
                               toid,
                               -1,
                               sublevels_up);
-
-             /* Add this full-row reference to the RTEs cols_sel set to handle
-              * column-level permissions.  Note that we use a bitmapset here
-              * and it can only handle non-negative values, so we need to
-              * offset by FirstLowInvalidHeapAttributeNumber.  This will be
-              * undone in execMain. */
-             rte->cols_sel = bms_add_member(rte->cols_sel,
-                                         InvalidAttrNumber -
-                                         FirstLowInvalidHeapAttributeNumber);
              break;
          case RTE_FUNCTION:
              toid = exprType(rte->funcexpr);
--- 1922,1927 ----
***************
*** 1987,1992 ****
--- 1977,1984 ----
      /* location is not filled in by makeVar */
      result->location = location;

+     markColumnForSelectPriv((Node *) result, pstate);
+
      return (Node *) result;
  }

diff -cr a/src/backend/parser/parse_node.c b/src/backend/parser/parse_node.c
*** a/src/backend/parser/parse_node.c    2009-01-13 15:54:53.000000000 +0900
--- b/src/backend/parser/parse_node.c    2009-01-13 16:31:39.000000000 +0900
***************
*** 15,21 ****
  #include "postgres.h"

  #include "access/heapam.h"
- #include "access/sysattr.h"
  #include "catalog/pg_type.h"
  #include "mb/pg_wchar.h"
  #include "nodes/makefuncs.h"
--- 15,20 ----
***************
*** 189,203 ****
      get_rte_attribute_type(rte, attrno, &vartypeid, &type_mod);
      result = makeVar(vnum, attrno, vartypeid, type_mod, sublevels_up);
      result->location = location;
-
-     /* This is also a convenient place to add this column to the cols_sel
-      * of the RTE.  This is needed to handle column-level permissions.
-      * Note that we use a bitmapset here which can only handle non-negative
-      * values, so we have to offset it by FirstLowInvalidHeapAttributeNumber.
-      */
-     rte->cols_sel = bms_add_member(rte->cols_sel, attrno -
-                                     FirstLowInvalidHeapAttributeNumber);
-
      return result;
  }

--- 188,193 ----
diff -cr a/src/backend/parser/parse_relation.c b/src/backend/parser/parse_relation.c
*** a/src/backend/parser/parse_relation.c    2009-01-13 15:54:53.000000000 +0900
--- b/src/backend/parser/parse_relation.c    2009-01-13 16:31:39.000000000 +0900
***************
*** 40,50 ****
  static RangeTblEntry *scanNameSpaceForRelid(ParseState *pstate, Oid relid,
                                              int location);
  static bool isLockedRel(ParseState *pstate, char *refname);
! static void expandRelation(RangeTblEntry *rte,
                 int rtindex, int sublevels_up,
                 int location, bool include_dropped,
                 List **colnames, List **colvars);
! static void expandTupleDesc(TupleDesc tupdesc, RangeTblEntry *rte,
                  int rtindex, int sublevels_up,
                  int location, bool include_dropped,
                  List **colnames, List **colvars);
--- 40,50 ----
  static RangeTblEntry *scanNameSpaceForRelid(ParseState *pstate, Oid relid,
                                              int location);
  static bool isLockedRel(ParseState *pstate, char *refname);
! static void expandRelation(Oid relid, Alias *eref,
                 int rtindex, int sublevels_up,
                 int location, bool include_dropped,
                 List **colnames, List **colvars);
! static void expandTupleDesc(TupleDesc tupdesc, Alias *eref,
                  int rtindex, int sublevels_up,
                  int location, bool include_dropped,
                  List **colnames, List **colvars);
***************
*** 430,435 ****
--- 430,470 ----
  }

  /*
+  * markColumnForSelectPriv
+  *     It marks rte->cols_sel for given Var node
+  */
+ bool
+ markColumnForSelectPriv(Node *node, ParseState *pstate)
+ {
+     if (IsA(node, Var))
+     {
+         RangeTblEntry *rte;
+         ParseState *cur = pstate;
+         Var *v = (Var *) node;
+         int lv;
+
+         for (lv = v->varlevelsup; lv > 0; lv--)
+             cur = cur->parentParseState;
+
+         rte = rt_fetch(v->varno, cur->p_rtable);
+         Assert(IsA(rte, RangeTblEntry));
+
+         if (rte->rtekind == RTE_RELATION)
+         {
+             rte->cols_sel = bms_add_member(rte->cols_sel, v->varattno
+                                     - FirstLowInvalidHeapAttributeNumber);
+         }
+         else if (rte->rtekind == RTE_JOIN)
+         {
+             Node *subnode = list_nth(rte->joinaliasvars, v->varattno - 1);
+
+             markColumnForSelectPriv(subnode, cur);
+         }
+     }
+     return expression_tree_walker(node, markColumnForSelectPriv, (void *) pstate);
+ }
+
+ /*
   * scanRTEForColumn
   *      Search the column names of a single RTE for the given name.
   *      If found, return an appropriate Var node, else return NULL.
***************
*** 479,484 ****
--- 514,520 ----
              result = (Node *) make_var(pstate, rte, attnum, location);
              /* Require read access */
              rte->requiredPerms |= ACL_SELECT;
+             markColumnForSelectPriv(result, pstate);
          }
      }

***************
*** 1480,1486 ****
      {
          case RTE_RELATION:
              /* Ordinary relation RTE */
!             expandRelation(rte,
                             rtindex, sublevels_up, location,
                             include_dropped, colnames, colvars);
              break;
--- 1516,1522 ----
      {
          case RTE_RELATION:
              /* Ordinary relation RTE */
!             expandRelation(rte->relid, rte->eref,
                             rtindex, sublevels_up, location,
                             include_dropped, colnames, colvars);
              break;
***************
*** 1538,1544 ****
                  {
                      /* Composite data type, e.g. a table's row type */
                      Assert(tupdesc);
!                     expandTupleDesc(tupdesc, rte,
                                      rtindex, sublevels_up, location,
                                      include_dropped, colnames, colvars);
                  }
--- 1574,1580 ----
                  {
                      /* Composite data type, e.g. a table's row type */
                      Assert(tupdesc);
!                     expandTupleDesc(tupdesc, rte->eref,
                                      rtindex, sublevels_up, location,
                                      include_dropped, colnames, colvars);
                  }
***************
*** 1735,1749 ****
   * expandRelation -- expandRTE subroutine
   */
  static void
! expandRelation(RangeTblEntry *rte, int rtindex, int sublevels_up,
                 int location, bool include_dropped,
                 List **colnames, List **colvars)
  {
      Relation    rel;

      /* Get the tupledesc and turn it over to expandTupleDesc */
!     rel = relation_open(rte->relid, AccessShareLock);
!     expandTupleDesc(rel->rd_att, rte, rtindex, sublevels_up,
                      location, include_dropped,
                      colnames, colvars);
      relation_close(rel, AccessShareLock);
--- 1771,1785 ----
   * expandRelation -- expandRTE subroutine
   */
  static void
! expandRelation(Oid relid, Alias *eref, int rtindex, int sublevels_up,
                 int location, bool include_dropped,
                 List **colnames, List **colvars)
  {
      Relation    rel;

      /* Get the tupledesc and turn it over to expandTupleDesc */
!     rel = relation_open(relid, AccessShareLock);
!     expandTupleDesc(rel->rd_att, eref, rtindex, sublevels_up,
                      location, include_dropped,
                      colnames, colvars);
      relation_close(rel, AccessShareLock);
***************
*** 1753,1764 ****
   * expandTupleDesc -- expandRTE subroutine
   */
  static void
! expandTupleDesc(TupleDesc tupdesc, RangeTblEntry *rte,
                  int rtindex, int sublevels_up,
                  int location, bool include_dropped,
                  List **colnames, List **colvars)
  {
-     Alias        *eref = rte->eref;
      int            maxattrs = tupdesc->natts;
      int            numaliases = list_length(eref->colnames);
      int            varattno;
--- 1789,1799 ----
   * expandTupleDesc -- expandRTE subroutine
   */
  static void
! expandTupleDesc(TupleDesc tupdesc, Alias *eref,
                  int rtindex, int sublevels_up,
                  int location, bool include_dropped,
                  List **colnames, List **colvars)
  {
      int            maxattrs = tupdesc->natts;
      int            numaliases = list_length(eref->colnames);
      int            varattno;
***************
*** 1784,1802 ****
              }
              continue;
          }
-         else
-         {
-             if (rte->rtekind == RTE_RELATION)
-             {
-                 /* Add these columns to the RTEs cols_sel bitmapset.  As a
-                  * bitmapset can only handle non-negative values, we have to
-                  * offset it by FirstLowInvalidHeapAttributeNumber.
-                  * This will be undone in execMain. */
-                 rte->cols_sel = bms_add_member(rte->cols_sel,
-                         varattno+1 -
-                         FirstLowInvalidHeapAttributeNumber);
-             }
-         }

          if (colnames)
          {
--- 1819,1824 ----
***************
*** 1856,1861 ****
--- 1878,1885 ----
                               label,
                               false);
          te_list = lappend(te_list, te);
+
+         markColumnForSelectPriv(varnode, pstate);
      }

      Assert(name == NULL && var == NULL);    /* lists not the same length? */
diff -cr a/src/include/parser/parse_relation.h b/src/include/parser/parse_relation.h
*** a/src/include/parser/parse_relation.h    2009-01-13 15:54:53.000000000 +0900
--- b/src/include/parser/parse_relation.h    2009-01-13 16:31:39.000000000 +0900
***************
*** 36,41 ****
--- 36,43 ----
                         int sublevels_up);
  extern CommonTableExpr *GetCTEForRTE(ParseState *pstate, RangeTblEntry *rte,
                                       int rtelevelsup);
+ extern bool markColumnForSelectPriv(Node *node, ParseState *pstate);
+
  extern Node *scanRTEForColumn(ParseState *pstate, RangeTblEntry *rte,
                   char *colname, int location);
  extern Node *colNameToVar(ParseState *pstate, char *colname, bool localonly,
