Re: [HACKERS] Domains and arrays and composites, oh my - Mailing list pgsql-hackers

From Tom Lane
Subject Re: [HACKERS] Domains and arrays and composites, oh my
Date
Msg-id 12364.1499973554@sss.pgh.pa.us
Whole thread Raw
In response to [HACKERS] Domains and arrays and composites, oh my  (Tom Lane <tgl@sss.pgh.pa.us>)
Responses Re: [HACKERS] Domains and arrays and composites, oh my
Re: [HACKERS] Domains and arrays and composites, oh my
List pgsql-hackers
I wrote:
> I started to look into allowing domains over composite types, which is
> another never-implemented case that there's no very good reason not to
> allow.  Well, other than the argument that the SQL standard only allows
> domains over "predefined" (built-in) types ... but we blew past that
> restriction ages ago.

Attached is a draft patch that allows domains over composite types.
I think it's probably complete on its own terms, but there are some
questions around behavior of functions returning domain-over-composite
that could use discussion, and some of the PLs need some follow-on work.

The core principle here was to not modify execution-time behavior by
adding domain checks to existing code paths.  Rather, I wanted the
parser to insert CoerceToDomain nodes wherever checks would be needed.
Row-returning node types such as RowExpr and FieldStore only return
regular composite types, as before; to generate a value of a composite
domain, we construct a value of the base type and then CoerceToDomain.
(This is pretty analogous to what happens for domains over arrays.)
Whole-row Vars can only have regular composite types as vartype,
TupleDescs can only have regular composite types as tdtypeid, composite
Datums only have regular composite type OIDs in datum_typeid.  (The last
would be expected anyway, since the physical representation of a domain
value is never different from that of its base type.)

Doing it that way led to a nicely small patch, only about 160 net new
lines of code.  However, not touching the executor meant not touching
the behavior of functions that return domains, even if the domain is
domain-over-composite.  In code terms this means that 
get_call_result_type() and sibling functions will return TYPEFUNC_SCALAR
not TYPEFUNC_COMPOSITE for such a result type.  The difficulty with
changing that is that if these functions look through the domain, then
the calling code (in, usually, a PL) will simply build and return a result
of the underlying composite type, failing to apply any domain constraints.
Trying to get out-of-core PLs on board with a change in those requirements
seems like a risky proposition.

Concretely, consider

create type complex as (r float8, i float8);
create domain dcomplex as complex;

You can make a SQL-language function to return complex in either of two
ways:

create function fc() returns complex language sql
as 'select 1.0::float8, 2.0::float8';

create function fc() returns complex language sql
as 'select row(1.0::float8, 2.0::float8)::complex';

As the patch stands, though, only the second way works for domains over
composite:

regression=# create function fdc() returns dcomplex language sql
as 'select 1.0::float8, 2.0::float8';
ERROR:  return type mismatch in function declared to return dcomplex
DETAIL:  Final statement must return exactly one column.
CONTEXT:  SQL function "fdc"
regression=# create function fdc() returns dcomplex language sql
as 'select row(1.0::float8, 2.0)::dcomplex';
CREATE FUNCTION

Now, maybe that's fine.  SQL-language functions have never been very
willing to insert implicit casts to get to the function result type,
and certainly the only way that the first definition could be legal
is if there were an implicit up-cast to the domain type.  It might be
OK to just leave it like this, though some documentation about it
would be a good idea.

plpgsql functions seem generally okay as far as composite domain return
types go, because they don't have anything corresponding to the row
return convention of SQL functions.  And plpgsql's greater willingness
to do implicit coercions reduces the notational burden, too.  But
there's some work yet to be done to get plpgsql to realize that
composite domain local variables have substructure.  For example,
this works:

    declare x complex;
    ...
    x.r := 1;

but it fails if x is dcomplex.  But ISTM that that would be better
handled as a followon feature patch.  I suspect that the other PLs may
have similar issues where it'd be nice to allow domain-over-composite
to act like a plain composite for specific purposes; but I've not looked.

Another issue related to function result types is that the parser
considers a function-in-FROM returning a composite domain to be
producing a scalar result not a rowtype.  Thus you get this for a
function returning complex:

regression=# select * from fc();
 r | i 
---+---
 1 | 2
(1 row)

but this for a function returning dcomplex:

regression=# select * from fdc();
  fdc  
-------
 (1,2)
(1 row)

I think that that could be changed with only local changes in parse
analysis, but do we want to change it?  Arguably, making fdc() act the
same as fc() here would amount to implicitly downcasting the domain to
its base type.  But doing so here is optional, not necessary in order to
make the statement sane at all, and it's arguable that we shouldn't do
that if the user didn't tell us to.  A user who does want that to happen
can downcast explicitly:

regression=# select * from cast(fdc() as complex);
 r | i 
---+---
 1 | 2
(1 row)

(For arcane syntactic reasons you can't abbreviate CAST with :: here.)
Another point is that if you do want the domain value as a domain
value, and not smashed to its base type, it would be hard to get at
if the parser acts this way --- "foo.*" would end up producing the base
rowtype, or if it didn't, we'd have some issues with the previously
noted rule about whole-row Vars never having domain types.

So there's a case to be made that this behavior is fine as-is, but
certainly you could also argue that it's a POLA violation.

Digression: one reason I'm hesitant to introduce inessential reductions
of domains to base types is that I'm looking ahead to arrays over
domains, which will provide a workaround for the people who complain
that they wish 2-D arrays would work type-wise like arrays of 1-D array
objects.  If you "create domain inta as int[]" then inta[] would act
like an array of array objects, mostly solving the problem I think.
But it solves the problem only because we don't consider that a domain
is indistinguishable from its base type.  It's hard to be sure without
having done the work yet, but I think there will be cases where being
over-eager to treat a domain as its base type might break the behavior
we want for that case.  So I don't want to create a precedent for that
here.

Thoughts?

Obviously this is work for v11; I'll go stick it in the next commitfest.

            regards, tom lane

diff --git a/src/backend/catalog/pg_inherits.c b/src/backend/catalog/pg_inherits.c
index 245a374..1bd8a58 100644
*** a/src/backend/catalog/pg_inherits.c
--- b/src/backend/catalog/pg_inherits.c
*************** has_superclass(Oid relationId)
*** 301,306 ****
--- 301,311 ----
  /*
   * Given two type OIDs, determine whether the first is a complex type
   * (class type) that inherits from the second.
+  *
+  * This essentially asks whether the first type is guaranteed to be coercible
+  * to the second.  Therefore, we allow the first type to be a domain over a
+  * complex type that inherits from the second; that creates no difficulties.
+  * But the second type cannot be a domain.
   */
  bool
  typeInheritsFrom(Oid subclassTypeId, Oid superclassTypeId)
*************** typeInheritsFrom(Oid subclassTypeId, Oid
*** 314,322 ****
      ListCell   *queue_item;

      /* We need to work with the associated relation OIDs */
!     subclassRelid = typeidTypeRelid(subclassTypeId);
      if (subclassRelid == InvalidOid)
!         return false;            /* not a complex type */
      superclassRelid = typeidTypeRelid(superclassTypeId);
      if (superclassRelid == InvalidOid)
          return false;            /* not a complex type */
--- 319,327 ----
      ListCell   *queue_item;

      /* We need to work with the associated relation OIDs */
!     subclassRelid = typeOrDomainTypeRelid(subclassTypeId);
      if (subclassRelid == InvalidOid)
!         return false;            /* not a complex type or domain over one */
      superclassRelid = typeidTypeRelid(superclassTypeId);
      if (superclassRelid == InvalidOid)
          return false;            /* not a complex type */
diff --git a/src/backend/catalog/pg_proc.c b/src/backend/catalog/pg_proc.c
index 571856e..47916cf 100644
*** a/src/backend/catalog/pg_proc.c
--- b/src/backend/catalog/pg_proc.c
*************** ProcedureCreate(const char *procedureNam
*** 262,268 ****
       */
      if (parameterCount == 1 &&
          OidIsValid(parameterTypes->values[0]) &&
!         (relid = typeidTypeRelid(parameterTypes->values[0])) != InvalidOid &&
          get_attnum(relid, procedureName) != InvalidAttrNumber)
          ereport(ERROR,
                  (errcode(ERRCODE_DUPLICATE_COLUMN),
--- 262,268 ----
       */
      if (parameterCount == 1 &&
          OidIsValid(parameterTypes->values[0]) &&
!         (relid = typeOrDomainTypeRelid(parameterTypes->values[0])) != InvalidOid &&
          get_attnum(relid, procedureName) != InvalidAttrNumber)
          ereport(ERROR,
                  (errcode(ERRCODE_DUPLICATE_COLUMN),
diff --git a/src/backend/commands/typecmds.c b/src/backend/commands/typecmds.c
index c2fc59d..0a08ef5 100644
*** a/src/backend/commands/typecmds.c
--- b/src/backend/commands/typecmds.c
*************** DefineDomain(CreateDomainStmt *stmt)
*** 796,808 ****
      basetypeoid = HeapTupleGetOid(typeTup);

      /*
!      * Base type must be a plain base type, another domain, an enum or a range
!      * type. Domains over pseudotypes would create a security hole.  Domains
!      * over composite types might be made to work in the future, but not
!      * today.
       */
      typtype = baseType->typtype;
      if (typtype != TYPTYPE_BASE &&
          typtype != TYPTYPE_DOMAIN &&
          typtype != TYPTYPE_ENUM &&
          typtype != TYPTYPE_RANGE)
--- 796,811 ----
      basetypeoid = HeapTupleGetOid(typeTup);

      /*
!      * Base type must be a plain base type, a composite type, another domain,
!      * an enum or a range type.  Domains over pseudotypes would create a
!      * security hole.  (It would be shorter to code this to just check for
!      * pseudotypes; but it seems safer to call out the specific typtypes that
!      * are supported, rather than assume that all future typtypes would be
!      * automatically supported.)
       */
      typtype = baseType->typtype;
      if (typtype != TYPTYPE_BASE &&
+         typtype != TYPTYPE_COMPOSITE &&
          typtype != TYPTYPE_DOMAIN &&
          typtype != TYPTYPE_ENUM &&
          typtype != TYPTYPE_RANGE)
diff --git a/src/backend/parser/parse_coerce.c b/src/backend/parser/parse_coerce.c
index 0bc7dba..7b16d6c 100644
*** a/src/backend/parser/parse_coerce.c
--- b/src/backend/parser/parse_coerce.c
*************** coerce_type(ParseState *pstate, Node *no
*** 500,508 ****
--- 500,525 ----
           * Input class type is a subclass of target, so generate an
           * appropriate runtime conversion (removing unneeded columns and
           * possibly rearranging the ones that are wanted).
+          *
+          * We will also get here when the input is a domain over a subclass of
+          * the target type.  To keep life simple for the executor, we define
+          * ConvertRowtypeExpr as only working between regular composite types;
+          * therefore, in such cases insert a RelabelType to smash the input
+          * expression down to its base type.
           */
+         Oid            baseTypeId = getBaseType(inputTypeId);
          ConvertRowtypeExpr *r = makeNode(ConvertRowtypeExpr);

+         if (baseTypeId != inputTypeId)
+         {
+             RelabelType *rt = makeRelabelType((Expr *) node,
+                                               baseTypeId, -1,
+                                               InvalidOid,
+                                               COERCE_IMPLICIT_CAST);
+
+             rt->location = location;
+             node = (Node *) rt;
+         }
          r->arg = (Expr *) node;
          r->resulttype = targetTypeId;
          r->convertformat = cformat;
*************** coerce_record_to_complex(ParseState *pst
*** 938,943 ****
--- 955,962 ----
                           int location)
  {
      RowExpr    *rowexpr;
+     Oid            baseTypeId;
+     int32        baseTypeMod = -1;
      TupleDesc    tupdesc;
      List       *args = NIL;
      List       *newargs;
*************** coerce_record_to_complex(ParseState *pst
*** 973,979 ****
                          format_type_be(targetTypeId)),
                   parser_coercion_errposition(pstate, location, node)));

!     tupdesc = lookup_rowtype_tupdesc(targetTypeId, -1);
      newargs = NIL;
      ucolno = 1;
      arg = list_head(args);
--- 992,1005 ----
                          format_type_be(targetTypeId)),
                   parser_coercion_errposition(pstate, location, node)));

!     /*
!      * Look up the composite type, accounting for possibility that what we are
!      * given is a domain over composite.
!      */
!     baseTypeId = getBaseTypeAndTypmod(targetTypeId, &baseTypeMod);
!     tupdesc = lookup_rowtype_tupdesc(baseTypeId, baseTypeMod);
!
!     /* Process the fields */
      newargs = NIL;
      ucolno = 1;
      arg = list_head(args);
*************** coerce_record_to_complex(ParseState *pst
*** 1041,1050 ****

      rowexpr = makeNode(RowExpr);
      rowexpr->args = newargs;
!     rowexpr->row_typeid = targetTypeId;
      rowexpr->row_format = cformat;
      rowexpr->colnames = NIL;    /* not needed for named target type */
      rowexpr->location = location;
      return (Node *) rowexpr;
  }

--- 1067,1087 ----

      rowexpr = makeNode(RowExpr);
      rowexpr->args = newargs;
!     rowexpr->row_typeid = baseTypeId;
      rowexpr->row_format = cformat;
      rowexpr->colnames = NIL;    /* not needed for named target type */
      rowexpr->location = location;
+
+     /* If target is a domain, apply constraints */
+     if (baseTypeId != targetTypeId)
+     {
+         rowexpr->row_format = COERCE_IMPLICIT_CAST;
+         return coerce_to_domain((Node *) rowexpr,
+                                 baseTypeId, baseTypeMod,
+                                 targetTypeId,
+                                 cformat, location, true, false);
+     }
+
      return (Node *) rowexpr;
  }

*************** is_complex_array(Oid typid)
*** 2379,2391 ****

  /*
   * Check whether reltypeId is the row type of a typed table of type
!  * reloftypeId.  (This is conceptually similar to the subtype
!  * relationship checked by typeInheritsFrom().)
   */
  static bool
  typeIsOfTypedTable(Oid reltypeId, Oid reloftypeId)
  {
!     Oid            relid = typeidTypeRelid(reltypeId);
      bool        result = false;

      if (relid)
--- 2416,2428 ----

  /*
   * Check whether reltypeId is the row type of a typed table of type
!  * reloftypeId, or is a domain over such a row type.  (This is conceptually
!  * similar to the subtype relationship checked by typeInheritsFrom().)
   */
  static bool
  typeIsOfTypedTable(Oid reltypeId, Oid reloftypeId)
  {
!     Oid            relid = typeOrDomainTypeRelid(reltypeId);
      bool        result = false;

      if (relid)
diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c
index 8487eda..7570108 100644
*** a/src/backend/parser/parse_func.c
--- b/src/backend/parser/parse_func.c
*************** ParseComplexProjection(ParseState *pstat
*** 1819,1836 ****
      }

      /*
!      * Else do it the hard way with get_expr_result_type().
       *
       * If it's a Var of type RECORD, we have to work even harder: we have to
!      * find what the Var refers to, and pass that to get_expr_result_type.
       * That task is handled by expandRecordVariable().
       */
      if (IsA(first_arg, Var) &&
          ((Var *) first_arg)->vartype == RECORDOID)
          tupdesc = expandRecordVariable(pstate, (Var *) first_arg, 0);
!     else if (get_expr_result_type(first_arg, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
          return NULL;            /* unresolvable RECORD type */
-     Assert(tupdesc);

      for (i = 0; i < tupdesc->natts; i++)
      {
--- 1819,1837 ----
      }

      /*
!      * Else do it the hard way with get_expr_result_tupdesc().
       *
       * If it's a Var of type RECORD, we have to work even harder: we have to
!      * find what the Var refers to, and pass that to get_expr_result_tupdesc.
       * That task is handled by expandRecordVariable().
       */
      if (IsA(first_arg, Var) &&
          ((Var *) first_arg)->vartype == RECORDOID)
          tupdesc = expandRecordVariable(pstate, (Var *) first_arg, 0);
!     else
!         tupdesc = get_expr_result_tupdesc(first_arg, true);
!     if (!tupdesc)
          return NULL;            /* unresolvable RECORD type */

      for (i = 0; i < tupdesc->natts; i++)
      {
diff --git a/src/backend/parser/parse_target.c b/src/backend/parser/parse_target.c
index 0a70539..d689e2a 100644
*** a/src/backend/parser/parse_target.c
--- b/src/backend/parser/parse_target.c
*************** transformAssignmentIndirection(ParseStat
*** 725,730 ****
--- 725,732 ----
          else
          {
              FieldStore *fstore;
+             Oid            baseTypeId;
+             int32        baseTypeMod;
              Oid            typrelid;
              AttrNumber    attnum;
              Oid            fieldTypeId;
*************** transformAssignmentIndirection(ParseStat
*** 752,758 ****

              /* No subscripts, so can process field selection here */

!             typrelid = typeidTypeRelid(targetTypeId);
              if (!typrelid)
                  ereport(ERROR,
                          (errcode(ERRCODE_DATATYPE_MISMATCH),
--- 754,767 ----

              /* No subscripts, so can process field selection here */

!             /*
!              * Look up the composite type, accounting for possibility that
!              * what we are given is a domain over composite.
!              */
!             baseTypeMod = targetTypMod;
!             baseTypeId = getBaseTypeAndTypmod(targetTypeId, &baseTypeMod);
!
!             typrelid = typeidTypeRelid(baseTypeId);
              if (!typrelid)
                  ereport(ERROR,
                          (errcode(ERRCODE_DATATYPE_MISMATCH),
*************** transformAssignmentIndirection(ParseStat
*** 796,802 ****
              fstore->arg = (Expr *) basenode;
              fstore->newvals = list_make1(rhs);
              fstore->fieldnums = list_make1_int(attnum);
!             fstore->resulttype = targetTypeId;

              return (Node *) fstore;
          }
--- 805,819 ----
              fstore->arg = (Expr *) basenode;
              fstore->newvals = list_make1(rhs);
              fstore->fieldnums = list_make1_int(attnum);
!             fstore->resulttype = baseTypeId;
!
!             /* If target is a domain, apply constraints */
!             if (baseTypeId != targetTypeId)
!                 return coerce_to_domain((Node *) fstore,
!                                         baseTypeId, baseTypeMod,
!                                         targetTypeId,
!                                         COERCE_IMPLICIT_CAST, location,
!                                         false, false);

              return (Node *) fstore;
          }
*************** ExpandRowReference(ParseState *pstate, N
*** 1385,1406 ****
       * (This can be pretty inefficient if the expression involves nontrivial
       * computation :-(.)
       *
!      * Verify it's a composite type, and get the tupdesc.  We use
!      * get_expr_result_type() because that can handle references to functions
!      * returning anonymous record types.  If that fails, use
!      * lookup_rowtype_tupdesc(), which will almost certainly fail as well, but
!      * it will give an appropriate error message.
       *
       * If it's a Var of type RECORD, we have to work even harder: we have to
!      * find what the Var refers to, and pass that to get_expr_result_type.
       * That task is handled by expandRecordVariable().
       */
      if (IsA(expr, Var) &&
          ((Var *) expr)->vartype == RECORDOID)
          tupleDesc = expandRecordVariable(pstate, (Var *) expr, 0);
!     else if (get_expr_result_type(expr, NULL, &tupleDesc) != TYPEFUNC_COMPOSITE)
!         tupleDesc = lookup_rowtype_tupdesc_copy(exprType(expr),
!                                                 exprTypmod(expr));
      Assert(tupleDesc);

      /* Generate a list of references to the individual fields */
--- 1402,1419 ----
       * (This can be pretty inefficient if the expression involves nontrivial
       * computation :-(.)
       *
!      * Verify it's a composite type, and get the tupdesc.
!      * get_expr_result_tupdesc() handles this conveniently.
       *
       * If it's a Var of type RECORD, we have to work even harder: we have to
!      * find what the Var refers to, and pass that to get_expr_result_tupdesc.
       * That task is handled by expandRecordVariable().
       */
      if (IsA(expr, Var) &&
          ((Var *) expr)->vartype == RECORDOID)
          tupleDesc = expandRecordVariable(pstate, (Var *) expr, 0);
!     else
!         tupleDesc = get_expr_result_tupdesc(expr, false);
      Assert(tupleDesc);

      /* Generate a list of references to the individual fields */
*************** expandRecordVariable(ParseState *pstate,
*** 1608,1622 ****

      /*
       * We now have an expression we can't expand any more, so see if
!      * get_expr_result_type() can do anything with it.  If not, pass to
!      * lookup_rowtype_tupdesc() which will probably fail, but will give an
!      * appropriate error message while failing.
       */
!     if (get_expr_result_type(expr, NULL, &tupleDesc) != TYPEFUNC_COMPOSITE)
!         tupleDesc = lookup_rowtype_tupdesc_copy(exprType(expr),
!                                                 exprTypmod(expr));
!
!     return tupleDesc;
  }


--- 1621,1629 ----

      /*
       * We now have an expression we can't expand any more, so see if
!      * get_expr_result_tupdesc() can do anything with it.
       */
!     return get_expr_result_tupdesc(expr, false);
  }


diff --git a/src/backend/parser/parse_type.c b/src/backend/parser/parse_type.c
index d0b3fbe..b032651 100644
*** a/src/backend/parser/parse_type.c
--- b/src/backend/parser/parse_type.c
*************** stringTypeDatum(Type tp, char *string, i
*** 641,647 ****
      return OidInputFunctionCall(typinput, string, typioparam, atttypmod);
  }

! /* given a typeid, return the type's typrelid (associated relation, if any) */
  Oid
  typeidTypeRelid(Oid type_id)
  {
--- 641,650 ----
      return OidInputFunctionCall(typinput, string, typioparam, atttypmod);
  }

! /*
!  * Given a typeid, return the type's typrelid (associated relation), if any.
!  * Returns InvalidOid if type is not a composite type.
!  */
  Oid
  typeidTypeRelid(Oid type_id)
  {
*************** typeidTypeRelid(Oid type_id)
*** 652,658 ****
      typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type_id));
      if (!HeapTupleIsValid(typeTuple))
          elog(ERROR, "cache lookup failed for type %u", type_id);
-
      type = (Form_pg_type) GETSTRUCT(typeTuple);
      result = type->typrelid;
      ReleaseSysCache(typeTuple);
--- 655,660 ----
*************** typeidTypeRelid(Oid type_id)
*** 660,665 ****
--- 662,699 ----
  }

  /*
+  * Given a typeid, return the type's typrelid (associated relation), if any.
+  * Returns InvalidOid if type is not a composite type or a domain over one.
+  * This is the same as typeidTypeRelid(getBaseType(type_id)), but faster.
+  */
+ Oid
+ typeOrDomainTypeRelid(Oid type_id)
+ {
+     HeapTuple    typeTuple;
+     Form_pg_type type;
+     Oid            result;
+
+     for (;;)
+     {
+         typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type_id));
+         if (!HeapTupleIsValid(typeTuple))
+             elog(ERROR, "cache lookup failed for type %u", type_id);
+         type = (Form_pg_type) GETSTRUCT(typeTuple);
+         if (type->typtype != TYPTYPE_DOMAIN)
+         {
+             /* Not a domain, so done looking through domains */
+             break;
+         }
+         /* It is a domain, so examine the base type instead */
+         type_id = type->typbasetype;
+         ReleaseSysCache(typeTuple);
+     }
+     result = type->typrelid;
+     ReleaseSysCache(typeTuple);
+     return result;
+ }
+
+ /*
   * error context callback for parse failure during parseTypeString()
   */
  static void
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 5a4adbd..9d069b5 100644
*** a/src/backend/utils/adt/ruleutils.c
--- b/src/backend/utils/adt/ruleutils.c
*************** get_name_for_var_field(Var *var, int fie
*** 6706,6722 ****

      /*
       * If it's a Var of type RECORD, we have to find what the Var refers to;
!      * if not, we can use get_expr_result_type. If that fails, we try
!      * lookup_rowtype_tupdesc, which will probably fail too, but will ereport
!      * an acceptable message.
       */
      if (!IsA(var, Var) ||
          var->vartype != RECORDOID)
      {
!         if (get_expr_result_type((Node *) var, NULL, &tupleDesc) != TYPEFUNC_COMPOSITE)
!             tupleDesc = lookup_rowtype_tupdesc_copy(exprType((Node *) var),
!                                                     exprTypmod((Node *) var));
!         Assert(tupleDesc);
          /* Got the tupdesc, so we can extract the field name */
          Assert(fieldno >= 1 && fieldno <= tupleDesc->natts);
          return NameStr(tupleDesc->attrs[fieldno - 1]->attname);
--- 6706,6717 ----

      /*
       * If it's a Var of type RECORD, we have to find what the Var refers to;
!      * if not, we can use get_expr_result_tupdesc().
       */
      if (!IsA(var, Var) ||
          var->vartype != RECORDOID)
      {
!         tupleDesc = get_expr_result_tupdesc((Node *) var, false);
          /* Got the tupdesc, so we can extract the field name */
          Assert(fieldno >= 1 && fieldno <= tupleDesc->natts);
          return NameStr(tupleDesc->attrs[fieldno - 1]->attname);
*************** get_name_for_var_field(Var *var, int fie
*** 7019,7032 ****

      /*
       * We now have an expression we can't expand any more, so see if
!      * get_expr_result_type() can do anything with it.  If not, pass to
!      * lookup_rowtype_tupdesc() which will probably fail, but will give an
!      * appropriate error message while failing.
       */
!     if (get_expr_result_type(expr, NULL, &tupleDesc) != TYPEFUNC_COMPOSITE)
!         tupleDesc = lookup_rowtype_tupdesc_copy(exprType(expr),
!                                                 exprTypmod(expr));
!     Assert(tupleDesc);
      /* Got the tupdesc, so we can extract the field name */
      Assert(fieldno >= 1 && fieldno <= tupleDesc->natts);
      return NameStr(tupleDesc->attrs[fieldno - 1]->attname);
--- 7014,7022 ----

      /*
       * We now have an expression we can't expand any more, so see if
!      * get_expr_result_tupdesc() can do anything with it.
       */
!     tupleDesc = get_expr_result_tupdesc(expr, false);
      /* Got the tupdesc, so we can extract the field name */
      Assert(fieldno >= 1 && fieldno <= tupleDesc->natts);
      return NameStr(tupleDesc->attrs[fieldno - 1]->attname);
diff --git a/src/backend/utils/fmgr/funcapi.c b/src/backend/utils/fmgr/funcapi.c
index be47d41..f36be55 100644
*** a/src/backend/utils/fmgr/funcapi.c
--- b/src/backend/utils/fmgr/funcapi.c
*************** internal_get_result_type(Oid funcid,
*** 394,399 ****
--- 394,474 ----
  }

  /*
+  * get_expr_result_tupdesc
+  *        Get a tupdesc describing the result of a composite-valued expression
+  *
+  * If expression is not composite or rowtype can't be determined, returns NULL
+  * if noError is true, else throws error.
+  *
+  * This can resolve the same cases that get_expr_result_type() can, plus one
+  * more: if the expression yields domain-over-composite, it will look through
+  * the domain and return the base type's tupdesc.  This is generally *not*
+  * appropriate to use when the caller is responsible for constructing a value
+  * of the expression's type, since one would end up constructing a value of
+  * the base type and failing to apply domain constraints if any.  However,
+  * when the goal is just to find out what fields can be extracted from the
+  * expression's value, this is a convenient helper function.
+  */
+ TupleDesc
+ get_expr_result_tupdesc(Node *expr, bool noError)
+ {
+     TupleDesc    tupleDesc;
+     TupleDesc    cache_tupdesc;
+     Oid            exprTypeId;
+     Oid            baseTypeId;
+     int32        baseTypeMod;
+
+     /*
+      * First, try get_expr_result_type(), because that can handle references
+      * to functions returning anonymous record types.
+      */
+     if (get_expr_result_type(expr, NULL, &tupleDesc) == TYPEFUNC_COMPOSITE)
+         return tupleDesc;
+
+     /*
+      * There is one case get_expr_result_type() doesn't handle that we should,
+      * which is a domain over composite.  So drill through any domain type
+      * before asking lookup_rowtype_tupdesc_noerror().
+      */
+     exprTypeId = exprType(expr);
+     baseTypeMod = exprTypmod(expr);
+     baseTypeId = getBaseTypeAndTypmod(exprTypeId, &baseTypeMod);
+
+     cache_tupdesc = lookup_rowtype_tupdesc_noerror(baseTypeId, baseTypeMod,
+                                                    true);
+     if (cache_tupdesc)
+     {
+         /*
+          * Success!  But caller isn't expecting to have to manage a tupdesc
+          * refcount, so copy the cached tupdesc.
+          */
+         tupleDesc = CreateTupleDescCopyConstr(cache_tupdesc);
+         DecrTupleDescRefCount(cache_tupdesc);
+         return tupleDesc;
+     }
+
+     /*
+      * Throw error if requested.  We could have left this to
+      * lookup_rowtype_tupdesc_noerror, but if we're dealing with a domain
+      * type, we prefer to finger the domain not its base type.
+      */
+     if (!noError)
+     {
+         if (exprTypeId != RECORDOID)
+             ereport(ERROR,
+                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                      errmsg("type %s is not composite",
+                             format_type_be(exprTypeId))));
+         else
+             ereport(ERROR,
+                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                      errmsg("record type has not been registered")));
+     }
+
+     return NULL;
+ }
+
+ /*
   * Given the result tuple descriptor for a function with OUT parameters,
   * replace any polymorphic columns (ANYELEMENT etc) with correct data types
   * deduced from the input arguments. Returns TRUE if able to deduce all types,
diff --git a/src/include/funcapi.h b/src/include/funcapi.h
index 951af2a..0909961 100644
*** a/src/include/funcapi.h
--- b/src/include/funcapi.h
*************** typedef struct FuncCallContext
*** 143,148 ****
--- 143,151 ----
   *        get_call_result_type.  Note: the cases in which rowtypes cannot be
   *        determined are different from the cases for get_call_result_type.
   *        Do *not* use this if you can use one of the others.
+  *
+  * A related function, which should generally *not* be used for identifying
+  * a function's result type, is get_expr_result_tupdesc().
   *----------
   */

*************** extern TypeFuncClass get_func_result_typ
*** 165,170 ****
--- 168,175 ----
                       Oid *resultTypeId,
                       TupleDesc *resultTupleDesc);

+ extern TupleDesc get_expr_result_tupdesc(Node *expr, bool noError);
+
  extern bool resolve_polymorphic_argtypes(int numargs, Oid *argtypes,
                               char *argmodes,
                               Node *call_expr);
diff --git a/src/include/parser/parse_type.h b/src/include/parser/parse_type.h
index 7b843d0..9deec0d 100644
*** a/src/include/parser/parse_type.h
--- b/src/include/parser/parse_type.h
*************** extern Oid    typeTypeCollation(Type typ);
*** 46,55 ****
  extern Datum stringTypeDatum(Type tp, char *string, int32 atttypmod);

  extern Oid    typeidTypeRelid(Oid type_id);

  extern TypeName *typeStringToTypeName(const char *str);
  extern void parseTypeString(const char *str, Oid *typeid_p, int32 *typmod_p, bool missing_ok);

! #define ISCOMPLEX(typeid) (typeidTypeRelid(typeid) != InvalidOid)

  #endif                            /* PARSE_TYPE_H */
--- 46,56 ----
  extern Datum stringTypeDatum(Type tp, char *string, int32 atttypmod);

  extern Oid    typeidTypeRelid(Oid type_id);
+ extern Oid    typeOrDomainTypeRelid(Oid type_id);

  extern TypeName *typeStringToTypeName(const char *str);
  extern void parseTypeString(const char *str, Oid *typeid_p, int32 *typmod_p, bool missing_ok);

! #define ISCOMPLEX(typeid) (typeOrDomainTypeRelid(typeid) != InvalidOid)

  #endif                            /* PARSE_TYPE_H */
diff --git a/src/test/regress/expected/domain.out b/src/test/regress/expected/domain.out
index 5fe999e..21075f5 100644
*** a/src/test/regress/expected/domain.out
--- b/src/test/regress/expected/domain.out
*************** select pg_typeof('{1,2,3}'::dia || 42);
*** 198,203 ****
--- 198,291 ----
  (1 row)

  drop domain dia;
+ -- Test domains over composites
+ create type comptype as (r float8, i float8);
+ create domain dcomptype as comptype;
+ create table dcomptable (d1 dcomptype unique);
+ insert into dcomptable values (row(1,2)::dcomptype);
+ insert into dcomptable values (row(3,4)::comptype);
+ insert into dcomptable values (row(1,2)::dcomptype);  -- fail on uniqueness
+ ERROR:  duplicate key value violates unique constraint "dcomptable_d1_key"
+ DETAIL:  Key (d1)=((1,2)) already exists.
+ insert into dcomptable (d1.r) values(11);
+ select * from dcomptable;
+   d1
+ -------
+  (1,2)
+  (3,4)
+  (11,)
+ (3 rows)
+
+ select (d1).r, (d1).i, (d1).* from dcomptable;
+  r  | i | r  | i
+ ----+---+----+---
+   1 | 2 |  1 | 2
+   3 | 4 |  3 | 4
+  11 |   | 11 |
+ (3 rows)
+
+ update dcomptable set d1.r = (d1).r + 1 where (d1).i > 0;
+ select * from dcomptable;
+   d1
+ -------
+  (11,)
+  (2,2)
+  (4,4)
+ (3 rows)
+
+ alter domain dcomptype add constraint c1 check ((value).r <= (value).i);
+ alter domain dcomptype add constraint c2 check ((value).r > (value).i);  -- fail
+ ERROR:  column "d1" of table "dcomptable" contains values that violate the new constraint
+ select row(2,1)::dcomptype;  -- fail
+ ERROR:  value for domain dcomptype violates check constraint "c1"
+ insert into dcomptable values (row(1,2)::comptype);
+ insert into dcomptable values (row(2,1)::comptype);  -- fail
+ ERROR:  value for domain dcomptype violates check constraint "c1"
+ insert into dcomptable (d1.r) values(99);
+ insert into dcomptable (d1.r, d1.i) values(99, 100);
+ insert into dcomptable (d1.r, d1.i) values(100, 99);  -- fail
+ ERROR:  value for domain dcomptype violates check constraint "c1"
+ update dcomptable set d1.r = (d1).r + 1 where (d1).i > 0;  -- fail
+ ERROR:  value for domain dcomptype violates check constraint "c1"
+ update dcomptable set d1.r = (d1).r - 1, d1.i = (d1).i + 1 where (d1).i > 0;
+ select * from dcomptable;
+     d1
+ ----------
+  (11,)
+  (99,)
+  (1,3)
+  (3,5)
+  (0,3)
+  (98,101)
+ (6 rows)
+
+ explain (verbose, costs off)
+   update dcomptable set d1.r = (d1).r - 1, d1.i = (d1).i + 1 where (d1).i > 0;
+                                           QUERY PLAN
+ -----------------------------------------------------------------------------------------------
+  Update on public.dcomptable
+    ->  Seq Scan on public.dcomptable
+          Output: ROW(((d1).r - '1'::double precision), ((d1).i + '1'::double precision)), ctid
+          Filter: ((dcomptable.d1).i > '0'::double precision)
+ (4 rows)
+
+ create rule silly as on delete to dcomptable do instead
+   update dcomptable set d1.r = (d1).r - 1, d1.i = (d1).i + 1 where (d1).i > 0;
+ \d+ dcomptable
+                                   Table "public.dcomptable"
+  Column |   Type    | Collation | Nullable | Default | Storage  | Stats target | Description
+ --------+-----------+-----------+----------+---------+----------+--------------+-------------
+  d1     | dcomptype |           |          |         | extended |              |
+ Indexes:
+     "dcomptable_d1_key" UNIQUE CONSTRAINT, btree (d1)
+ Rules:
+     silly AS
+     ON DELETE TO dcomptable DO INSTEAD  UPDATE dcomptable SET d1.r = (dcomptable.d1).r - 1::double precision, d1.i =
(dcomptable.d1).i+ 1::double precision 
+   WHERE (dcomptable.d1).i > 0::double precision
+
+ drop table dcomptable;
+ drop type comptype cascade;
+ NOTICE:  drop cascades to type dcomptype
  -- Test domains over arrays of composite
  create type comptype as (r float8, i float8);
  create domain dcomptypea as comptype[];
diff --git a/src/test/regress/sql/domain.sql b/src/test/regress/sql/domain.sql
index 5ec128d..ba45165 100644
*** a/src/test/regress/sql/domain.sql
--- b/src/test/regress/sql/domain.sql
*************** select pg_typeof('{1,2,3}'::dia || 42);
*** 120,125 ****
--- 120,164 ----
  drop domain dia;


+ -- Test domains over composites
+
+ create type comptype as (r float8, i float8);
+ create domain dcomptype as comptype;
+ create table dcomptable (d1 dcomptype unique);
+
+ insert into dcomptable values (row(1,2)::dcomptype);
+ insert into dcomptable values (row(3,4)::comptype);
+ insert into dcomptable values (row(1,2)::dcomptype);  -- fail on uniqueness
+ insert into dcomptable (d1.r) values(11);
+
+ select * from dcomptable;
+ select (d1).r, (d1).i, (d1).* from dcomptable;
+ update dcomptable set d1.r = (d1).r + 1 where (d1).i > 0;
+ select * from dcomptable;
+
+ alter domain dcomptype add constraint c1 check ((value).r <= (value).i);
+ alter domain dcomptype add constraint c2 check ((value).r > (value).i);  -- fail
+
+ select row(2,1)::dcomptype;  -- fail
+ insert into dcomptable values (row(1,2)::comptype);
+ insert into dcomptable values (row(2,1)::comptype);  -- fail
+ insert into dcomptable (d1.r) values(99);
+ insert into dcomptable (d1.r, d1.i) values(99, 100);
+ insert into dcomptable (d1.r, d1.i) values(100, 99);  -- fail
+ update dcomptable set d1.r = (d1).r + 1 where (d1).i > 0;  -- fail
+ update dcomptable set d1.r = (d1).r - 1, d1.i = (d1).i + 1 where (d1).i > 0;
+ select * from dcomptable;
+
+ explain (verbose, costs off)
+   update dcomptable set d1.r = (d1).r - 1, d1.i = (d1).i + 1 where (d1).i > 0;
+ create rule silly as on delete to dcomptable do instead
+   update dcomptable set d1.r = (d1).r - 1, d1.i = (d1).i + 1 where (d1).i > 0;
+ \d+ dcomptable
+
+ drop table dcomptable;
+ drop type comptype cascade;
+
+
  -- Test domains over arrays of composite

  create type comptype as (r float8, i float8);

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

pgsql-hackers by date:

Previous
From: Robert Haas
Date:
Subject: Re: [HACKERS] PostgreSQL - Weak DH group
Next
From: "David G. Johnston"
Date:
Subject: Re: [HACKERS] CAST vs ::