WIP patch to improve amvalidate functions - Mailing list pgsql-hackers

From Tom Lane
Subject WIP patch to improve amvalidate functions
Date
Msg-id 6681.1453329415@sss.pgh.pa.us
Whole thread Raw
Responses Re: WIP patch to improve amvalidate functions  (Alvaro Herrera <alvherre@2ndquadrant.com>)
List pgsql-hackers
I spent some time trying to improve the amvalidate functions that were
committed in a rather crude state in 65c5fcd353a859da.  Attached is a
very-much-WIP patch showing where I'm headed:

* Check operator and function signatures (argument and result types)
not only their strategy numbers.

* Apply more thorough checks for missing operators and support functions.

* Don't throw ERROR if avoidable, because doing it like that means you can
only identify one problem per run.  Instead, print messages at INFO level
and make the amvalidate function return false if there are any complaints.
(A credible alternative would be to print the messages as WARNINGs, but
I think INFO is actually the right choice here, since the point of calling
amvalidate is to see those messages.)

* Make the messages more user-friendly by printing object names not just
OIDs.

I went through the amvalidate modules in the order they're shown in the
attached patch, and only the last (brinvalidate) is really fully
developed.  I need to go back and bring the others up to snuff, including
getting down to just one copy of the support routines that I wrote.
(I'm thinking of dumping those into a new file access/index/amvalidate.c.)
Also, I think the identify_opfamily_groups() function I wrote for BRIN
will allow improvement of the other modules' cross-checks for missing
operators/functions, but I've not tried yet.

Also, after this is done some of the queries in opr_sanity.sql will be
redundant and removable, but I didn't do that here.

I'm posting this now just in case anyone has some comments, or quibbles
about the overall intent.  In particular, if anyone has an idea for a more
thorough missing-objects check on BRIN opfamilies, I would like to hear
it.  The fact that there are two kinds of opfamilies with rather different
internal consistency requirements is a real pain there, and the check
rules I have here are definitely the result of some trial and error.

Comments?

            regards, tom lane

diff --git a/src/backend/access/nbtree/nbtvalidate.c b/src/backend/access/nbtree/nbtvalidate.c
index b814b54..2c03688 100644
*** a/src/backend/access/nbtree/nbtvalidate.c
--- b/src/backend/access/nbtree/nbtvalidate.c
***************
*** 18,28 ****
--- 18,37 ----
  #include "catalog/pg_amop.h"
  #include "catalog/pg_amproc.h"
  #include "catalog/pg_opclass.h"
+ #include "catalog/pg_operator.h"
+ #include "catalog/pg_proc.h"
+ #include "catalog/pg_type.h"
  #include "utils/builtins.h"
  #include "utils/catcache.h"
  #include "utils/syscache.h"


+ static bool check_func_signature(Oid funcid, Oid restype,
+                      int nargs, Oid arg1type, Oid arg2type);
+ static bool check_opr_signature(Oid opno, Oid restype,
+                     Oid lefttype, Oid righttype);
+
+
  /*
   * Validator for a btree opclass.
   *
*************** btvalidate(Oid opclassoid)
*** 77,90 ****
          HeapTuple    proctup = &proclist->members[i]->tuple;
          Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);

!         /* Check that only allowed procedure numbers exist */
!         if (procform->amprocnum != BTORDER_PROC &&
!             procform->amprocnum != BTSORTSUPPORT_PROC)
!             ereport(ERROR,
!                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                      errmsg("btree opfamily %u contains invalid support number %d for procedure %u",
!                             opfamilyoid,
!                             procform->amprocnum, procform->amproc)));

          /* Remember functions that are specifically for the named opclass */
          if (procform->amproclefttype == opcintype &&
--- 86,121 ----
          HeapTuple    proctup = &proclist->members[i]->tuple;
          Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);

!         /* Check procedure numbers and function signatures */
!         switch (procform->amprocnum)
!         {
!             case BTORDER_PROC:
!                 /* comparison func must match the opclass entry */
!                 if (!check_func_signature(procform->amproc, INT4OID, 2,
!                                           procform->amproclefttype,
!                                           procform->amprocrighttype))
!                     ereport(ERROR,
!                             (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                              errmsg("btree opfamily %u contains support procedure %u with wrong signature",
!                                     opfamilyoid, procform->amproc)));
!                 break;
!             case BTSORTSUPPORT_PROC:
!                 /* sortsupport always takes internal and returns void */
!                 if (!check_func_signature(procform->amproc, VOIDOID, 1,
!                                           INTERNALOID, InvalidOid))
!                     ereport(ERROR,
!                             (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                              errmsg("btree opfamily %u contains support procedure %u with wrong signature",
!                                     opfamilyoid, procform->amproc)));
!                 break;
!             default:
!                 ereport(ERROR,
!                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                          errmsg("btree opfamily %u contains invalid support number %d for procedure %u",
!                                 opfamilyoid,
!                                 procform->amprocnum, procform->amproc)));
!                 break;
!         }

          /* Remember functions that are specifically for the named opclass */
          if (procform->amproclefttype == opcintype &&
*************** btvalidate(Oid opclassoid)
*** 109,114 ****
--- 140,154 ----
                              opfamilyoid,
                              oprform->amopstrategy, oprform->amopopr)));

+         /* Check operator signature --- same for all btree strategies */
+         if (!check_opr_signature(oprform->amopopr, BOOLOID,
+                                  oprform->amoplefttype,
+                                  oprform->amoprighttype))
+             ereport(ERROR,
+                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+                      errmsg("btree opfamily %u contains operator %u with wrong signature",
+                             opfamilyoid, oprform->amopopr)));
+
          /*
           * Check that we have all strategies for each supported datatype
           * combination.  This is easy since the list will be sorted in
*************** btvalidate(Oid opclassoid)
*** 202,204 ****
--- 242,291 ----

      return true;
  }
+
+ static bool
+ check_func_signature(Oid funcid, Oid restype,
+                      int nargs, Oid arg1type, Oid arg2type)
+ {
+     bool        result = true;
+     HeapTuple    tp;
+     Form_pg_proc procform;
+
+     tp = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcid));
+     if (!HeapTupleIsValid(tp))
+         elog(ERROR, "cache lookup failed for function %u", funcid);
+     procform = (Form_pg_proc) GETSTRUCT(tp);
+
+     if (procform->prorettype != restype || procform->proretset ||
+         procform->pronargs != nargs)
+         result = false;
+     if (procform->pronargs >= 1 &&
+         procform->proargtypes.values[0] != arg1type)
+         result = false;
+     if (procform->pronargs >= 2 &&
+         procform->proargtypes.values[1] != arg2type)
+         result = false;
+
+     ReleaseSysCache(tp);
+     return result;
+ }
+
+ static bool
+ check_opr_signature(Oid opno, Oid restype, Oid lefttype, Oid righttype)
+ {
+     bool        result = true;
+     HeapTuple    tp;
+     Form_pg_operator opform;
+
+     tp = SearchSysCache1(OPEROID, ObjectIdGetDatum(opno));
+     if (!HeapTupleIsValid(tp))    /* shouldn't happen */
+         elog(ERROR, "cache lookup failed for operator %u", opno);
+     opform = (Form_pg_operator) GETSTRUCT(tp);
+
+     if (opform->oprresult != restype || opform->oprkind != 'b' ||
+         opform->oprleft != lefttype || opform->oprright != righttype)
+         result = false;
+
+     ReleaseSysCache(tp);
+     return result;
+ }
diff --git a/src/backend/access/hash/hashvalidate.c b/src/backend/access/hash/hashvalidate.c
index abd6784..9a861b2 100644
*** a/src/backend/access/hash/hashvalidate.c
--- b/src/backend/access/hash/hashvalidate.c
***************
*** 18,28 ****
--- 18,38 ----
  #include "catalog/pg_amop.h"
  #include "catalog/pg_amproc.h"
  #include "catalog/pg_opclass.h"
+ #include "catalog/pg_operator.h"
+ #include "catalog/pg_proc.h"
+ #include "catalog/pg_type.h"
+ #include "parser/parse_coerce.h"
  #include "utils/builtins.h"
+ #include "utils/fmgroids.h"
  #include "utils/catcache.h"
  #include "utils/syscache.h"


+ static bool check_hash_func_signature(Oid funcid, Oid restype, Oid argtype);
+ static bool check_opr_signature(Oid opno, Oid restype,
+                     Oid lefttype, Oid righttype);
+
+
  /*
   * Validator for a hash opclass.
   *
*************** hashvalidate(Oid opclassoid)
*** 73,85 ****
          HeapTuple    proctup = &proclist->members[i]->tuple;
          Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);

!         /* Check that only allowed procedure numbers exist */
!         if (procform->amprocnum != HASHPROC)
!             ereport(ERROR,
!                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                      errmsg("hash opfamily %u contains invalid support number %d for procedure %u",
!                             opfamilyoid,
!                             procform->amprocnum, procform->amproc)));

          /* Remember functions that are specifically for the named opclass */
          if (procform->amproclefttype == opcintype &&
--- 83,108 ----
          HeapTuple    proctup = &proclist->members[i]->tuple;
          Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);

!         /* Check procedure numbers and function signatures */
!         switch (procform->amprocnum)
!         {
!             case HASHPROC:
!                 /* hashing func must (usually) match the opclass entry */
!                 if (!check_hash_func_signature(procform->amproc, INT4OID,
!                                                procform->amproclefttype))
!                     ereport(ERROR,
!                             (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                              errmsg("hash opfamily %u contains support procedure %u with wrong signature",
!                                     opfamilyoid, procform->amproc)));
!                 break;
!             default:
!                 ereport(ERROR,
!                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                          errmsg("hash opfamily %u contains invalid support number %d for procedure %u",
!                                 opfamilyoid,
!                                 procform->amprocnum, procform->amproc)));
!                 break;
!         }

          /* Remember functions that are specifically for the named opclass */
          if (procform->amproclefttype == opcintype &&
*************** hashvalidate(Oid opclassoid)
*** 104,109 ****
--- 127,141 ----
                              opfamilyoid,
                              oprform->amopstrategy, oprform->amopopr)));

+         /* Check operator signature --- same for all hash strategies */
+         if (!check_opr_signature(oprform->amopopr, BOOLOID,
+                                  oprform->amoplefttype,
+                                  oprform->amoprighttype))
+             ereport(ERROR,
+                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+                      errmsg("hash opfamily %u contains operator %u with wrong signature",
+                             opfamilyoid, oprform->amopopr)));
+
          /*
           * There should be relevant hash procedures for each operator
           */
*************** hashvalidate(Oid opclassoid)
*** 155,157 ****
--- 187,257 ----

      return true;
  }
+
+ static bool
+ check_hash_func_signature(Oid funcid, Oid restype, Oid argtype)
+ {
+     bool        result = true;
+     HeapTuple    tp;
+     Form_pg_proc procform;
+
+     tp = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcid));
+     if (!HeapTupleIsValid(tp))
+         elog(ERROR, "cache lookup failed for function %u", funcid);
+     procform = (Form_pg_proc) GETSTRUCT(tp);
+
+     if (procform->prorettype != restype || procform->proretset ||
+         procform->pronargs != 1)
+         result = false;
+     if (!IsBinaryCoercible(argtype, procform->proargtypes.values[0]))
+     {
+         /*
+          * Some of the built-in hash opclasses cheat by using hash functions
+          * that are different from but physically compatible with the opclass
+          * datatype.  In some of these cases, even the "binary coercible"
+          * check fails because there's no relevant cast.  For the moment, fix
+          * it by having a whitelist of allowed cases.  Test the specific
+          * function identity, not just its input type, because hashvarlena()
+          * takes INTERNAL and allowing any such function seems too scary.
+          */
+         if (funcid == F_HASHINT4 &&
+             (argtype == DATEOID ||
+              argtype == ABSTIMEOID || argtype == RELTIMEOID ||
+              argtype == XIDOID || argtype == CIDOID))
+              /* okay, allowed use of hashint4() */ ;
+         else if (funcid == F_TIMESTAMP_HASH &&
+                  argtype == TIMESTAMPTZOID)
+              /* okay, allowed use of timestamp_hash() */ ;
+         else if (funcid == F_HASHCHAR &&
+                  argtype == BOOLOID)
+              /* okay, allowed use of hashchar() */ ;
+         else if (funcid == F_HASHVARLENA &&
+                  argtype == BYTEAOID)
+              /* okay, allowed use of hashvarlena() */ ;
+         else
+             result = false;
+     }
+
+     ReleaseSysCache(tp);
+     return result;
+ }
+
+ static bool
+ check_opr_signature(Oid opno, Oid restype, Oid lefttype, Oid righttype)
+ {
+     bool        result = true;
+     HeapTuple    tp;
+     Form_pg_operator opform;
+
+     tp = SearchSysCache1(OPEROID, ObjectIdGetDatum(opno));
+     if (!HeapTupleIsValid(tp))    /* shouldn't happen */
+         elog(ERROR, "cache lookup failed for operator %u", opno);
+     opform = (Form_pg_operator) GETSTRUCT(tp);
+
+     if (opform->oprresult != restype || opform->oprkind != 'b' ||
+         opform->oprleft != lefttype || opform->oprright != righttype)
+         result = false;
+
+     ReleaseSysCache(tp);
+     return result;
+ }
diff --git a/src/backend/access/spgist/spgvalidate.c b/src/backend/access/spgist/spgvalidate.c
index c2d2d46..f7a6eb2 100644
*** a/src/backend/access/spgist/spgvalidate.c
--- b/src/backend/access/spgist/spgvalidate.c
***************
*** 18,27 ****
--- 18,36 ----
  #include "catalog/pg_amop.h"
  #include "catalog/pg_amproc.h"
  #include "catalog/pg_opclass.h"
+ #include "catalog/pg_operator.h"
+ #include "catalog/pg_proc.h"
+ #include "catalog/pg_type.h"
  #include "utils/catcache.h"
  #include "utils/syscache.h"


+ static bool check_func_signature(Oid funcid, Oid restype,
+                      int nargs, Oid arg1type, Oid arg2type);
+ static bool check_opr_signature(Oid opno, Oid restype,
+                     Oid lefttype, Oid righttype);
+
+
  /*
   * Validator for an SP-GiST opclass.
   */
*************** spgvalidate(Oid opclassoid)
*** 63,76 ****
          HeapTuple    proctup = &proclist->members[i]->tuple;
          Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);

!         /* Check that only allowed procedure numbers exist */
!         if (procform->amprocnum < 1 ||
!             procform->amprocnum > SPGISTNProc)
!             ereport(ERROR,
!                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                      errmsg("spgist opfamily %u contains invalid support number %d for procedure %u",
!                             opfamilyoid,
!                             procform->amprocnum, procform->amproc)));

          /* Remember functions that are specifically for the named opclass */
          if (procform->amproclefttype == opcintype &&
--- 72,107 ----
          HeapTuple    proctup = &proclist->members[i]->tuple;
          Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);

!         /* Check procedure numbers and function signatures */
!         switch (procform->amprocnum)
!         {
!             case SPGIST_CONFIG_PROC:
!             case SPGIST_CHOOSE_PROC:
!             case SPGIST_PICKSPLIT_PROC:
!             case SPGIST_INNER_CONSISTENT_PROC:
!                 if (!check_func_signature(procform->amproc, VOIDOID, 2,
!                                           INTERNALOID, INTERNALOID))
!                     ereport(ERROR,
!                             (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                              errmsg("spgist opfamily %u contains support procedure %u with wrong signature",
!                                     opfamilyoid, procform->amproc)));
!                 break;
!             case SPGIST_LEAF_CONSISTENT_PROC:
!                 if (!check_func_signature(procform->amproc, BOOLOID, 2,
!                                           INTERNALOID, INTERNALOID))
!                     ereport(ERROR,
!                             (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                              errmsg("spgist opfamily %u contains support procedure %u with wrong signature",
!                                     opfamilyoid, procform->amproc)));
!                 break;
!             default:
!                 ereport(ERROR,
!                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                          errmsg("spgist opfamily %u contains invalid support number %d for procedure %u",
!                                 opfamilyoid,
!                                 procform->amprocnum, procform->amproc)));
!                 break;
!         }

          /* Remember functions that are specifically for the named opclass */
          if (procform->amproclefttype == opcintype &&
*************** spgvalidate(Oid opclassoid)
*** 92,97 ****
--- 123,137 ----
                              opfamilyoid,
                              oprform->amopstrategy, oprform->amopopr)));

+         /* Check operator signature --- same for all spgist strategies */
+         if (!check_opr_signature(oprform->amopopr, BOOLOID,
+                                  oprform->amoplefttype,
+                                  oprform->amoprighttype))
+             ereport(ERROR,
+                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+                      errmsg("spgist opfamily %u contains operator %u with wrong signature",
+                             opfamilyoid, oprform->amopopr)));
+
          /* spgist doesn't support ORDER BY operators */
          if (oprform->amoppurpose != AMOP_SEARCH ||
              OidIsValid(oprform->amopsortfamily))
*************** spgvalidate(Oid opclassoid)
*** 127,129 ****
--- 167,216 ----

      return true;
  }
+
+ static bool
+ check_func_signature(Oid funcid, Oid restype,
+                      int nargs, Oid arg1type, Oid arg2type)
+ {
+     bool        result = true;
+     HeapTuple    tp;
+     Form_pg_proc procform;
+
+     tp = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcid));
+     if (!HeapTupleIsValid(tp))
+         elog(ERROR, "cache lookup failed for function %u", funcid);
+     procform = (Form_pg_proc) GETSTRUCT(tp);
+
+     if (procform->prorettype != restype || procform->proretset ||
+         procform->pronargs != nargs)
+         result = false;
+     if (procform->pronargs >= 1 &&
+         procform->proargtypes.values[0] != arg1type)
+         result = false;
+     if (procform->pronargs >= 2 &&
+         procform->proargtypes.values[1] != arg2type)
+         result = false;
+
+     ReleaseSysCache(tp);
+     return result;
+ }
+
+ static bool
+ check_opr_signature(Oid opno, Oid restype, Oid lefttype, Oid righttype)
+ {
+     bool        result = true;
+     HeapTuple    tp;
+     Form_pg_operator opform;
+
+     tp = SearchSysCache1(OPEROID, ObjectIdGetDatum(opno));
+     if (!HeapTupleIsValid(tp))    /* shouldn't happen */
+         elog(ERROR, "cache lookup failed for operator %u", opno);
+     opform = (Form_pg_operator) GETSTRUCT(tp);
+
+     if (opform->oprresult != restype || opform->oprkind != 'b' ||
+         opform->oprleft != lefttype || opform->oprright != righttype)
+         result = false;
+
+     ReleaseSysCache(tp);
+     return result;
+ }
diff --git a/src/backend/access/gist/gistvalidate.c b/src/backend/access/gist/gistvalidate.c
index 86b5aea..d0a5851 100644
*** a/src/backend/access/gist/gistvalidate.c
--- b/src/backend/access/gist/gistvalidate.c
***************
*** 15,37 ****
--- 15,54 ----

  #include "access/gist_private.h"
  #include "access/htup_details.h"
+ #include "catalog/pg_am.h"
  #include "catalog/pg_amop.h"
  #include "catalog/pg_amproc.h"
  #include "catalog/pg_opclass.h"
+ #include "catalog/pg_opclass.h"
+ #include "catalog/pg_operator.h"
+ #include "catalog/pg_proc.h"
+ #include "catalog/pg_type.h"
+ #include "parser/parse_coerce.h"
  #include "utils/catcache.h"
+ #include "utils/lsyscache.h"
  #include "utils/syscache.h"


+ static bool check_func_signature(Oid funcid, Oid restype,
+                      int nargs, Oid arg1type, Oid arg2type, Oid arg3type,
+                      Oid arg4type, Oid arg5type);
+ static bool check_opr_signature(Oid opno, Oid restype,
+                     Oid lefttype, Oid righttype);
+ static bool opfamily_can_sort_type(Oid opfamilyoid, Oid datatypeoid);
+
+
  /*
   * Validator for a GiST opclass.
   */
  bool
  gistvalidate(Oid opclassoid)
  {
+     bool        result = true;
      HeapTuple    classtup;
      Form_pg_opclass classform;
      Oid            opfamilyoid;
      Oid            opcintype;
+     Oid            opckeytype;
      int            numclassops;
      int32        classfuncbits;
      CatCList   *proclist,
*************** gistvalidate(Oid opclassoid)
*** 46,53 ****

      opfamilyoid = classform->opcfamily;
      opcintype = classform->opcintype;
!
!     ReleaseSysCache(classtup);

      /* Fetch all operators and support functions of the opfamily */
      oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid));
--- 63,71 ----

      opfamilyoid = classform->opcfamily;
      opcintype = classform->opcintype;
!     opckeytype = classform->opckeytype;
!     if (!OidIsValid(opckeytype))
!         opckeytype = opcintype;

      /* Fetch all operators and support functions of the opfamily */
      oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid));
*************** gistvalidate(Oid opclassoid)
*** 63,81 ****
          HeapTuple    proctup = &proclist->members[i]->tuple;
          Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);

!         /* Check that only allowed procedure numbers exist */
!         if (procform->amprocnum < 1 ||
!             procform->amprocnum > GISTNProcs)
!             ereport(ERROR,
                      (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                      errmsg("gist opfamily %u contains invalid support number %d for procedure %u",
!                             opfamilyoid,
!                             procform->amprocnum, procform->amproc)));

          /* Remember functions that are specifically for the named opclass */
!         if (procform->amproclefttype == opcintype &&
!             procform->amprocrighttype == opcintype)
!             classfuncbits |= (1 << procform->amprocnum);
      }

      /* Check operators */
--- 81,236 ----
          HeapTuple    proctup = &proclist->members[i]->tuple;
          Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);

!         /*
!          * All GiST support functions should be registered with matching
!          * left/right types
!          */
!         if (procform->amproclefttype != procform->amprocrighttype)
!         {
!             ereport(INFO,
                      (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                      errmsg("gist opclass %s contains support procedure %u with incorrect registration",
!                             NameStr(classform->opcname),
!                             procform->amproc)));
!             result = false;
!         }
!
!         /*
!          * We can't check signatures except within the specific opclass, since
!          * we need to know opcintype and/or opckeytype for most cases.
!          */
!         if (procform->amproclefttype != opcintype)
!             continue;
!
!         /* Check procedure numbers and function signatures */
!         switch (procform->amprocnum)
!         {
!             case GIST_CONSISTENT_PROC:
!                 if (!check_func_signature(procform->amproc, BOOLOID, 5,
!                                           INTERNALOID,
!                                           opcintype,
!                                           INT2OID,
!                                           OIDOID,
!                                           INTERNALOID))
!                 {
!                     ereport(INFO,
!                             (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                              errmsg("gist opclass %s contains support procedure %u with wrong signature",
!                                     NameStr(classform->opcname),
!                                     procform->amproc)));
!                     result = false;
!                 }
!                 break;
!             case GIST_UNION_PROC:
!                 if (!check_func_signature(procform->amproc, opckeytype, 2,
!                                           INTERNALOID,
!                                           INTERNALOID,
!                                           InvalidOid,
!                                           InvalidOid,
!                                           InvalidOid))
!                 {
!                     ereport(INFO,
!                             (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                              errmsg("gist opclass %s contains support procedure %u with wrong signature",
!                                     NameStr(classform->opcname),
!                                     procform->amproc)));
!                     result = false;
!                 }
!                 break;
!             case GIST_COMPRESS_PROC:
!             case GIST_DECOMPRESS_PROC:
!             case GIST_FETCH_PROC:
!                 if (!check_func_signature(procform->amproc, INTERNALOID, 1,
!                                           INTERNALOID,
!                                           InvalidOid,
!                                           InvalidOid,
!                                           InvalidOid,
!                                           InvalidOid))
!                 {
!                     ereport(INFO,
!                             (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                              errmsg("gist opclass %s contains support procedure %u with wrong signature",
!                                     NameStr(classform->opcname),
!                                     procform->amproc)));
!                     result = false;
!                 }
!                 break;
!             case GIST_PENALTY_PROC:
!                 if (!check_func_signature(procform->amproc, INTERNALOID, 3,
!                                           INTERNALOID,
!                                           INTERNALOID,
!                                           INTERNALOID,
!                                           InvalidOid,
!                                           InvalidOid))
!                 {
!                     ereport(INFO,
!                             (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                              errmsg("gist opclass %s contains support procedure %u with wrong signature",
!                                     NameStr(classform->opcname),
!                                     procform->amproc)));
!                     result = false;
!                 }
!                 break;
!             case GIST_PICKSPLIT_PROC:
!                 if (!check_func_signature(procform->amproc, INTERNALOID, 2,
!                                           INTERNALOID,
!                                           INTERNALOID,
!                                           InvalidOid,
!                                           InvalidOid,
!                                           InvalidOid))
!                 {
!                     ereport(INFO,
!                             (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                              errmsg("gist opclass %s contains support procedure %u with wrong signature",
!                                     NameStr(classform->opcname),
!                                     procform->amproc)));
!                     result = false;
!                 }
!                 break;
!             case GIST_EQUAL_PROC:
!                 if (!check_func_signature(procform->amproc, INTERNALOID, 3,
!                                           opckeytype,
!                                           opckeytype,
!                                           INTERNALOID,
!                                           InvalidOid,
!                                           InvalidOid))
!                 {
!                     ereport(INFO,
!                             (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                              errmsg("gist opclass %s contains support procedure %u with wrong signature",
!                                     NameStr(classform->opcname),
!                                     procform->amproc)));
!                     result = false;
!                 }
!                 break;
!             case GIST_DISTANCE_PROC:
!                 if (!check_func_signature(procform->amproc, FLOAT8OID, 5,
!                                           INTERNALOID,
!                                           opcintype,
!                                           INT2OID,
!                                           OIDOID,
!                                           INTERNALOID))
!                 {
!                     ereport(INFO,
!                             (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                              errmsg("gist opclass %s contains support procedure %u with wrong signature",
!                                     NameStr(classform->opcname),
!                                     procform->amproc)));
!                     result = false;
!                 }
!                 break;
!             default:
!                 ereport(INFO,
!                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                          errmsg("gist opclass %s contains invalid support number %d for procedure %u",
!                                 NameStr(classform->opcname),
!                                 procform->amprocnum, procform->amproc)));
!                 result = false;
!                 continue;        /* omit bad proc numbers from classfuncbits */
!         }

          /* Remember functions that are specifically for the named opclass */
!         classfuncbits |= (1 << procform->amprocnum);
      }

      /* Check operators */
*************** gistvalidate(Oid opclassoid)
*** 86,133 ****

          /* TODO: Check that only allowed strategy numbers exist */
          if (oprform->amopstrategy < 1)
!             ereport(ERROR,
                      (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                      errmsg("gist opfamily %u contains invalid strategy number %d for operator %u",
                              opfamilyoid,
!                             oprform->amopstrategy, oprform->amopopr)));

!         /* GiST supports ORDER BY operators, but must have distance proc */
!         if (oprform->amoppurpose != AMOP_SEARCH &&
!             oprform->amoplefttype == opcintype &&
!             oprform->amoprighttype == opcintype &&
!             (classfuncbits & (1 << GIST_DISTANCE_PROC)) == 0)
!             ereport(ERROR,
                      (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                      errmsg("gist opfamily %u contains unsupported ORDER BY specification for operator %u",
!                             opfamilyoid, oprform->amopopr)));

!         /* Count operators that are specifically for the named opclass */
!         /* XXX we consider only lefttype here */
!         if (oprform->amoplefttype == opcintype)
              numclassops++;
      }

!     /* Check that the named opclass is complete */
      if (numclassops == 0)
!         ereport(ERROR,
                  (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                  errmsg("gist opclass %u is missing operator(s)",
!                         opclassoid)));
      for (i = 1; i <= GISTNProcs; i++)
      {
          if ((classfuncbits & (1 << i)) != 0)
              continue;            /* got it */
          if (i == GIST_DISTANCE_PROC || i == GIST_FETCH_PROC)
              continue;            /* optional methods */
!         ereport(ERROR,
                  (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!             errmsg("gist opclass %u is missing required support function %d",
!                    opclassoid, i)));
      }

      ReleaseCatCacheList(proclist);
      ReleaseCatCacheList(oprlist);

!     return true;
  }
--- 241,426 ----

          /* TODO: Check that only allowed strategy numbers exist */
          if (oprform->amopstrategy < 1)
!         {
!             ereport(INFO,
                      (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                      errmsg("gist opfamily %u contains invalid strategy number %d for operator %s",
                              opfamilyoid,
!                             oprform->amopstrategy,
!                             get_opname(oprform->amopopr))));
!             result = false;
!         }

!         /* Check operator signature */
!         if (!check_opr_signature(oprform->amopopr,
!                 (oprform->amoppurpose == AMOP_SEARCH) ? BOOLOID : InvalidOid,
!                                  oprform->amoplefttype,
!                                  oprform->amoprighttype))
!         {
!             ereport(INFO,
                      (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                      errmsg("gist opfamily %u contains operator %s with wrong signature",
!                             opfamilyoid,
!                             get_opname(oprform->amopopr))));
!             result = false;
!         }

!         /* GiST supports ORDER BY operators */
!         if (oprform->amoppurpose != AMOP_SEARCH)
!         {
!             /* ... but must have matching distance proc */
!             if (!OidIsValid(get_opfamily_proc(opfamilyoid,
!                                               oprform->amoplefttype,
!                                               oprform->amoplefttype,
!                                               GIST_DISTANCE_PROC)))
!             {
!                 ereport(INFO,
!                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                          errmsg("gist opfamily %u contains unsupported ORDER BY specification for operator %s",
!                                 opfamilyoid,
!                                 get_opname(oprform->amopopr))));
!                 result = false;
!             }
!             /* ... and operator result must match the claimed opfamily */
!             if (!opfamily_can_sort_type(oprform->amopsortfamily,
!                                         get_op_rettype(oprform->amopopr)))
!             {
!                 ereport(INFO,
!                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                          errmsg("gist opfamily %u contains incorrect ORDER BY opfamily specification for operator
%s",
!                                 opfamilyoid,
!                                 get_opname(oprform->amopopr))));
!                 result = false;
!             }
!         }
!
!         /*
!          * Count operators that match the named opclass.  Some GiST opclasses
!          * such as gist_cidr_ops have only binary-compatible operators.
!          */
!         if (IsBinaryCoercible(opcintype, oprform->amoplefttype))
              numclassops++;
      }

!     /* TODO: check that the named opclass is complete */
      if (numclassops == 0)
!     {
!         ereport(INFO,
                  (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                  errmsg("gist opclass %s is missing operator(s)",
!                         NameStr(classform->opcname))));
!         result = false;
!     }
      for (i = 1; i <= GISTNProcs; i++)
      {
          if ((classfuncbits & (1 << i)) != 0)
              continue;            /* got it */
          if (i == GIST_DISTANCE_PROC || i == GIST_FETCH_PROC)
              continue;            /* optional methods */
!         ereport(INFO,
                  (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!             errmsg("gist opclass %s is missing required support function %d",
!                    NameStr(classform->opcname), i)));
!         result = false;
      }

      ReleaseCatCacheList(proclist);
      ReleaseCatCacheList(oprlist);
+     ReleaseSysCache(classtup);

!     return result;
! }
!
! static bool
! check_func_signature(Oid funcid, Oid restype,
!                      int nargs, Oid arg1type, Oid arg2type, Oid arg3type,
!                      Oid arg4type, Oid arg5type)
! {
!     bool        result = true;
!     HeapTuple    tp;
!     Form_pg_proc procform;
!
!     tp = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcid));
!     if (!HeapTupleIsValid(tp))
!         elog(ERROR, "cache lookup failed for function %u", funcid);
!     procform = (Form_pg_proc) GETSTRUCT(tp);
!
!     if (procform->prorettype != restype || procform->proretset ||
!         procform->pronargs != nargs)
!         result = false;
!     if (procform->pronargs >= 1 &&
!         !IsBinaryCoercible(arg1type, procform->proargtypes.values[0]))
!         result = false;
!     if (procform->pronargs >= 2 &&
!         !IsBinaryCoercible(arg2type, procform->proargtypes.values[1]))
!         result = false;
!     if (procform->pronargs >= 3 &&
!         !IsBinaryCoercible(arg3type, procform->proargtypes.values[2]))
!         result = false;
!     if (procform->pronargs >= 4 &&
!         !IsBinaryCoercible(arg4type, procform->proargtypes.values[3]))
!         result = false;
!     if (procform->pronargs >= 5 &&
!         !IsBinaryCoercible(arg5type, procform->proargtypes.values[4]))
!         result = false;
!
!     ReleaseSysCache(tp);
!     return result;
! }
!
! /* restype can be InvalidOid to omit check on result type */
! static bool
! check_opr_signature(Oid opno, Oid restype, Oid lefttype, Oid righttype)
! {
!     bool        result = true;
!     HeapTuple    tp;
!     Form_pg_operator opform;
!
!     tp = SearchSysCache1(OPEROID, ObjectIdGetDatum(opno));
!     if (!HeapTupleIsValid(tp))    /* shouldn't happen */
!         elog(ERROR, "cache lookup failed for operator %u", opno);
!     opform = (Form_pg_operator) GETSTRUCT(tp);
!
!     if (OidIsValid(restype) && opform->oprresult != restype)
!         result = false;
!
!     if (opform->oprkind != 'b' ||
!         opform->oprleft != lefttype || opform->oprright != righttype)
!         result = false;
!
!     ReleaseSysCache(tp);
!     return result;
! }
!
! /* Is the datatype a legitimate input type for the btree opfamily? */
! static bool
! opfamily_can_sort_type(Oid opfamilyoid, Oid datatypeoid)
! {
!     bool        result = false;
!     CatCList   *opclist;
!     int            i;
!
!     /*
!      * We search through all btree opclasses to see if one matches.  This is a
!      * bit inefficient but there is no better index available.  It also saves
!      * making an explicit check that the opfamily belongs to btree.
!      */
!     opclist = SearchSysCacheList1(CLAAMNAMENSP, ObjectIdGetDatum(BTREE_AM_OID));
!
!     for (i = 0; i < opclist->n_members; i++)
!     {
!         HeapTuple    classtup = &opclist->members[i]->tuple;
!         Form_pg_opclass classform = (Form_pg_opclass) GETSTRUCT(classtup);
!
!         if (classform->opcfamily == opfamilyoid &&
!             classform->opcintype == datatypeoid)
!         {
!             result = true;
!             break;
!         }
!     }
!
!     ReleaseCatCacheList(opclist);
!
!     return result;
  }
diff --git a/src/backend/access/gin/ginvalidate.c b/src/backend/access/gin/ginvalidate.c
index e934fe8..1f4ce80 100644
*** a/src/backend/access/gin/ginvalidate.c
--- b/src/backend/access/gin/ginvalidate.c
***************
*** 18,37 ****
--- 18,52 ----
  #include "catalog/pg_amop.h"
  #include "catalog/pg_amproc.h"
  #include "catalog/pg_opclass.h"
+ #include "catalog/pg_operator.h"
+ #include "catalog/pg_proc.h"
+ #include "catalog/pg_type.h"
+ #include "parser/parse_coerce.h"
  #include "utils/catcache.h"
+ #include "utils/lsyscache.h"
  #include "utils/syscache.h"


+ static bool check_func_signature(Oid funcid, Oid restype,
+                      int nargs, Oid arg1type, Oid arg2type, Oid arg3type,
+                      Oid arg4type, Oid arg5type, Oid arg6type,
+                      Oid arg7type, Oid arg8type);
+ static bool check_opr_signature(Oid opno, Oid restype,
+                     Oid lefttype, Oid righttype);
+
+
  /*
   * Validator for a GIN opclass.
   */
  bool
  ginvalidate(Oid opclassoid)
  {
+     bool        result = true;
      HeapTuple    classtup;
      Form_pg_opclass classform;
      Oid            opfamilyoid;
      Oid            opcintype;
+     Oid            opckeytype;
      int            numclassops;
      int32        classfuncbits;
      CatCList   *proclist,
*************** ginvalidate(Oid opclassoid)
*** 46,53 ****

      opfamilyoid = classform->opcfamily;
      opcintype = classform->opcintype;
!
!     ReleaseSysCache(classtup);

      /* Fetch all operators and support functions of the opfamily */
      oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid));
--- 61,69 ----

      opfamilyoid = classform->opcfamily;
      opcintype = classform->opcintype;
!     opckeytype = classform->opckeytype;
!     if (!OidIsValid(opckeytype))
!         opckeytype = opcintype;

      /* Fetch all operators and support functions of the opfamily */
      oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid));
*************** ginvalidate(Oid opclassoid)
*** 62,81 ****
      {
          HeapTuple    proctup = &proclist->members[i]->tuple;
          Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);

!         /* Check that only allowed procedure numbers exist */
!         if (procform->amprocnum < 1 ||
!             procform->amprocnum > GINNProcs)
!             ereport(ERROR,
                      (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                      errmsg("gin opfamily %u contains invalid support number %d for procedure %u",
!                             opfamilyoid,
!                             procform->amprocnum, procform->amproc)));

          /* Remember functions that are specifically for the named opclass */
!         if (procform->amproclefttype == opcintype &&
!             procform->amprocrighttype == opcintype)
!             classfuncbits |= (1 << procform->amprocnum);
      }

      /* Check operators */
--- 78,227 ----
      {
          HeapTuple    proctup = &proclist->members[i]->tuple;
          Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);
+         bool        ok;

!         /*
!          * All GIN support functions should be registered with matching
!          * left/right types
!          */
!         if (procform->amproclefttype != procform->amprocrighttype)
!         {
!             ereport(INFO,
                      (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                      errmsg("gin opclass %s contains support procedure %u with incorrect registration",
!                             NameStr(classform->opcname),
!                             procform->amproc)));
!             result = false;
!         }
!
!         /*
!          * We can't check signatures except within the specific opclass, since
!          * we need to know opcintype and/or opckeytype for most cases.
!          */
!         if (procform->amproclefttype != opcintype)
!             continue;
!
!         /* Check procedure numbers and function signatures */
!         switch (procform->amprocnum)
!         {
!             case GIN_COMPARE_PROC:
!                 ok = check_func_signature(procform->amproc, INT4OID, 2,
!                                           opckeytype,
!                                           opckeytype,
!                                           InvalidOid,
!                                           InvalidOid,
!                                           InvalidOid,
!                                           InvalidOid,
!                                           InvalidOid,
!                                           InvalidOid);
!                 break;
!             case GIN_EXTRACTVALUE_PROC:
!                 /* Some opclasses omit nullFlags */
!                 ok = check_func_signature(procform->amproc, INTERNALOID, 3,
!                                           opcintype,
!                                           INTERNALOID,
!                                           INTERNALOID,
!                                           InvalidOid,
!                                           InvalidOid,
!                                           InvalidOid,
!                                           InvalidOid,
!                                           InvalidOid) ||
!                     check_func_signature(procform->amproc, INTERNALOID, 2,
!                                          opcintype,
!                                          INTERNALOID,
!                                          InvalidOid,
!                                          InvalidOid,
!                                          InvalidOid,
!                                          InvalidOid,
!                                          InvalidOid,
!                                          InvalidOid);
!                 break;
!             case GIN_EXTRACTQUERY_PROC:
!                 /* Some opclasses omit nullFlags and searchMode */
!                 ok = check_func_signature(procform->amproc, INTERNALOID, 7,
!                                           opcintype,
!                                           INTERNALOID,
!                                           INT2OID,
!                                           INTERNALOID,
!                                           INTERNALOID,
!                                           INTERNALOID,
!                                           INTERNALOID,
!                                           InvalidOid) ||
!                     check_func_signature(procform->amproc, INTERNALOID, 5,
!                                          opcintype,
!                                          INTERNALOID,
!                                          INT2OID,
!                                          INTERNALOID,
!                                          INTERNALOID,
!                                          InvalidOid,
!                                          InvalidOid,
!                                          InvalidOid);
!                 break;
!             case GIN_CONSISTENT_PROC:
!                 /* Some opclasses omit queryKeys and nullFlags */
!                 ok = check_func_signature(procform->amproc, BOOLOID, 8,
!                                           INTERNALOID,
!                                           INT2OID,
!                                           opcintype,
!                                           INT4OID,
!                                           INTERNALOID,
!                                           INTERNALOID,
!                                           INTERNALOID,
!                                           INTERNALOID) ||
!                     check_func_signature(procform->amproc, BOOLOID, 6,
!                                          INTERNALOID,
!                                          INT2OID,
!                                          opcintype,
!                                          INT4OID,
!                                          INTERNALOID,
!                                          INTERNALOID,
!                                          InvalidOid,
!                                          InvalidOid);
!                 break;
!             case GIN_COMPARE_PARTIAL_PROC:
!                 ok = check_func_signature(procform->amproc, INT4OID, 4,
!                                           opckeytype,
!                                           opckeytype,
!                                           INT2OID,
!                                           INTERNALOID,
!                                           InvalidOid,
!                                           InvalidOid,
!                                           InvalidOid,
!                                           InvalidOid);
!                 break;
!             case GIN_TRICONSISTENT_PROC:
!                 ok = check_func_signature(procform->amproc, CHAROID, 7,
!                                           INTERNALOID,
!                                           INT2OID,
!                                           opcintype,
!                                           INT4OID,
!                                           INTERNALOID,
!                                           INTERNALOID,
!                                           INTERNALOID,
!                                           InvalidOid);
!                 break;
!             default:
!                 ereport(INFO,
!                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                          errmsg("gin opclass %s contains procedure %u with invalid support number %d",
!                                 NameStr(classform->opcname),
!                                 procform->amproc, procform->amprocnum)));
!                 result = false;
!                 continue;        /* omit bad proc numbers from classfuncbits */
!         }
!
!         if (!ok)
!         {
!             ereport(INFO,
!                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                      errmsg("gin opclass %s contains procedure %u with wrong signature for support number %d",
!                             NameStr(classform->opcname),
!                             procform->amproc, procform->amprocnum)));
!             result = false;
!         }

          /* Remember functions that are specifically for the named opclass */
!         classfuncbits |= (1 << procform->amprocnum);
      }

      /* Check operators */
*************** ginvalidate(Oid opclassoid)
*** 86,122 ****

          /* TODO: Check that only allowed strategy numbers exist */
          if (oprform->amopstrategy < 1)
!             ereport(ERROR,
                      (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
                       errmsg("gin opfamily %u contains invalid strategy number %d for operator %u",
                              opfamilyoid,
                              oprform->amopstrategy, oprform->amopopr)));

          /* gin doesn't support ORDER BY operators */
          if (oprform->amoppurpose != AMOP_SEARCH ||
              OidIsValid(oprform->amopsortfamily))
!             ereport(ERROR,
                      (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
                       errmsg("gin opfamily %u contains invalid ORDER BY specification for operator %u",
                              opfamilyoid, oprform->amopopr)));

!         /* Count operators that are specifically for the named opclass */
!         if (oprform->amoplefttype == opcintype &&
!             oprform->amoprighttype == opcintype)
              numclassops++;
      }

!     /* Check that the named opclass is complete */
!
!     /* XXX needs work: we need to detect applicability of ANYARRAY operators */
! #ifdef NOT_USED
      if (numclassops == 0)
!         ereport(ERROR,
                  (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                  errmsg("gin opclass %u is missing operator(s)",
!                         opclassoid)));
! #endif
!
      for (i = 1; i <= GINNProcs; i++)
      {
          if ((classfuncbits & (1 << i)) != 0)
--- 232,287 ----

          /* TODO: Check that only allowed strategy numbers exist */
          if (oprform->amopstrategy < 1)
!         {
!             ereport(INFO,
                      (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
                       errmsg("gin opfamily %u contains invalid strategy number %d for operator %u",
                              opfamilyoid,
                              oprform->amopstrategy, oprform->amopopr)));
+             result = false;
+         }
+
+         /* Check operator signature */
+         if (!check_opr_signature(oprform->amopopr, BOOLOID,
+                                  oprform->amoplefttype,
+                                  oprform->amoprighttype))
+         {
+             ereport(INFO,
+                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+                      errmsg("gin opfamily %u contains operator %s with wrong signature",
+                             opfamilyoid,
+                             get_opname(oprform->amopopr))));
+             result = false;
+         }

          /* gin doesn't support ORDER BY operators */
          if (oprform->amoppurpose != AMOP_SEARCH ||
              OidIsValid(oprform->amopsortfamily))
!         {
!             ereport(INFO,
                      (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
                       errmsg("gin opfamily %u contains invalid ORDER BY specification for operator %u",
                              opfamilyoid, oprform->amopopr)));
+             result = false;
+         }

!         /*
!          * Count operators that match the named opclass.  Some GIN opclasses
!          * have only binary-compatible operators.
!          */
!         if (IsBinaryCoercible(opcintype, oprform->amoplefttype))
              numclassops++;
      }

!     /* TODO: check that the named opclass is complete */
      if (numclassops == 0)
!     {
!         ereport(INFO,
                  (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                  errmsg("gin opclass %s is missing operator(s)",
!                         NameStr(classform->opcname))));
!         result = false;
!     }
      for (i = 1; i <= GINNProcs; i++)
      {
          if ((classfuncbits & (1 << i)) != 0)
*************** ginvalidate(Oid opclassoid)
*** 124,145 ****
          if (i == GIN_COMPARE_PARTIAL_PROC)
              continue;            /* optional method */
          if (i == GIN_CONSISTENT_PROC || i == GIN_TRICONSISTENT_PROC)
!             continue;            /* don't need to have both, see check below
!                                  * loop */
!         ereport(ERROR,
                  (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!              errmsg("gin opclass %u is missing required support function %d",
!                     opclassoid, i)));
      }
      if ((classfuncbits & (1 << GIN_CONSISTENT_PROC)) == 0 &&
          (classfuncbits & (1 << GIN_TRICONSISTENT_PROC)) == 0)
!         ereport(ERROR,
                  (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                  errmsg("gin opclass %u is missing required support function",
!                         opclassoid)));

      ReleaseCatCacheList(proclist);
      ReleaseCatCacheList(oprlist);

!     return true;
  }
--- 289,381 ----
          if (i == GIN_COMPARE_PARTIAL_PROC)
              continue;            /* optional method */
          if (i == GIN_CONSISTENT_PROC || i == GIN_TRICONSISTENT_PROC)
!             continue;            /* don't need both, see check below loop */
!         ereport(INFO,
                  (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!              errmsg("gin opclass %s is missing required support function %d",
!                     NameStr(classform->opcname), i)));
!         result = false;
      }
      if ((classfuncbits & (1 << GIN_CONSISTENT_PROC)) == 0 &&
          (classfuncbits & (1 << GIN_TRICONSISTENT_PROC)) == 0)
!     {
!         ereport(INFO,
                  (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                  errmsg("gin opclass %s is missing required support function",
!                         NameStr(classform->opcname))));
!         result = false;
!     }

      ReleaseCatCacheList(proclist);
      ReleaseCatCacheList(oprlist);
+     ReleaseSysCache(classtup);

!     return result;
! }
!
! static bool
! check_func_signature(Oid funcid, Oid restype,
!                      int nargs, Oid arg1type, Oid arg2type, Oid arg3type,
!                      Oid arg4type, Oid arg5type, Oid arg6type,
!                      Oid arg7type, Oid arg8type)
! {
!     bool        result = true;
!     HeapTuple    tp;
!     Form_pg_proc procform;
!
!     tp = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcid));
!     if (!HeapTupleIsValid(tp))
!         elog(ERROR, "cache lookup failed for function %u", funcid);
!     procform = (Form_pg_proc) GETSTRUCT(tp);
!
!     if (procform->prorettype != restype || procform->proretset ||
!         procform->pronargs != nargs)
!         result = false;
!     if (procform->pronargs >= 1 &&
!         !IsBinaryCoercible(arg1type, procform->proargtypes.values[0]))
!         result = false;
!     if (procform->pronargs >= 2 &&
!         !IsBinaryCoercible(arg2type, procform->proargtypes.values[1]))
!         result = false;
!     if (procform->pronargs >= 3 &&
!         !IsBinaryCoercible(arg3type, procform->proargtypes.values[2]))
!         result = false;
!     if (procform->pronargs >= 4 &&
!         !IsBinaryCoercible(arg4type, procform->proargtypes.values[3]))
!         result = false;
!     if (procform->pronargs >= 5 &&
!         !IsBinaryCoercible(arg5type, procform->proargtypes.values[4]))
!         result = false;
!     if (procform->pronargs >= 6 &&
!         !IsBinaryCoercible(arg6type, procform->proargtypes.values[5]))
!         result = false;
!     if (procform->pronargs >= 7 &&
!         !IsBinaryCoercible(arg7type, procform->proargtypes.values[6]))
!         result = false;
!     if (procform->pronargs >= 8 &&
!         !IsBinaryCoercible(arg8type, procform->proargtypes.values[7]))
!         result = false;
!
!     ReleaseSysCache(tp);
!     return result;
! }
!
! static bool
! check_opr_signature(Oid opno, Oid restype, Oid lefttype, Oid righttype)
! {
!     bool        result = true;
!     HeapTuple    tp;
!     Form_pg_operator opform;
!
!     tp = SearchSysCache1(OPEROID, ObjectIdGetDatum(opno));
!     if (!HeapTupleIsValid(tp))    /* shouldn't happen */
!         elog(ERROR, "cache lookup failed for operator %u", opno);
!     opform = (Form_pg_operator) GETSTRUCT(tp);
!
!     if (opform->oprresult != restype || opform->oprkind != 'b' ||
!         opform->oprleft != lefttype || opform->oprright != righttype)
!         result = false;
!
!     ReleaseSysCache(tp);
!     return result;
  }
diff --git a/src/backend/access/brin/brin_validate.c b/src/backend/access/brin/brin_validate.c
index 956ecb9..29e9705 100644
*** a/src/backend/access/brin/brin_validate.c
--- b/src/backend/access/brin/brin_validate.c
***************
*** 18,43 ****
  #include "catalog/pg_amop.h"
  #include "catalog/pg_amproc.h"
  #include "catalog/pg_opclass.h"
  #include "utils/catcache.h"
  #include "utils/syscache.h"


  /*
   * Validator for a BRIN opclass.
   */
  bool
  brinvalidate(Oid opclassoid)
  {
      HeapTuple    classtup;
      Form_pg_opclass classform;
      Oid            opfamilyoid;
      Oid            opcintype;
!     int            numclassops;
!     int32        classfuncbits;
      CatCList   *proclist,
                 *oprlist;
!     int            i,
!                 j;

      /* Fetch opclass information */
      classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid));
--- 18,69 ----
  #include "catalog/pg_amop.h"
  #include "catalog/pg_amproc.h"
  #include "catalog/pg_opclass.h"
+ #include "catalog/pg_operator.h"
+ #include "catalog/pg_opfamily.h"
+ #include "catalog/pg_proc.h"
+ #include "catalog/pg_type.h"
+ #include "utils/builtins.h"
  #include "utils/catcache.h"
  #include "utils/syscache.h"


+ typedef struct OpFamilyOpFuncGroup
+ {
+     Oid            lefttype;        /* amoplefttype/amproclefttype */
+     Oid            righttype;        /* amoprighttype/amprocrighttype */
+     uint64        operatorset;    /* bitmask of operators with these types */
+     uint64        functionset;    /* bitmask of support funcs with these types */
+ } OpFamilyOpFuncGroup;
+
+ static List *identify_opfamily_groups(CatCList *oprlist, CatCList *proclist);
+ static bool check_func_signature(Oid funcid, Oid restype, int nargs,...);
+ static bool check_opr_signature(Oid opno, Oid restype,
+                     Oid lefttype, Oid righttype);
+
+
  /*
   * Validator for a BRIN opclass.
   */
  bool
  brinvalidate(Oid opclassoid)
  {
+     bool        result = true;
      HeapTuple    classtup;
      Form_pg_opclass classform;
      Oid            opfamilyoid;
      Oid            opcintype;
!     char       *opclassname;
!     HeapTuple    familytup;
!     Form_pg_opfamily familyform;
!     char       *opfamilyname;
      CatCList   *proclist,
                 *oprlist;
!     uint64        allfuncs = 0;
!     uint64        allops = 0;
!     List       *grouplist;
!     OpFamilyOpFuncGroup *opclassgroup;
!     int            i;
!     ListCell   *lc;

      /* Fetch opclass information */
      classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid));
*************** brinvalidate(Oid opclassoid)
*** 47,152 ****

      opfamilyoid = classform->opcfamily;
      opcintype = classform->opcintype;

!     ReleaseSysCache(classtup);

      /* Fetch all operators and support functions of the opfamily */
      oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid));
      proclist = SearchSysCacheList1(AMPROCNUM, ObjectIdGetDatum(opfamilyoid));

!     /* We'll track the ops and functions belonging to the named opclass */
!     numclassops = 0;
!     classfuncbits = 0;
!
!     /* Check support functions */
      for (i = 0; i < proclist->n_members; i++)
      {
          HeapTuple    proctup = &proclist->members[i]->tuple;
          Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);

!         /* Check that only allowed procedure numbers exist */
!         if (procform->amprocnum < 1 ||
!             procform->amprocnum > BRIN_LAST_OPTIONAL_PROCNUM)
!             ereport(ERROR,
                      (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                      errmsg("brin opfamily %u contains invalid support number %d for procedure %u",
!                             opfamilyoid,
!                             procform->amprocnum, procform->amproc)));

!         /* Remember functions that are specifically for the named opclass */
!         if (procform->amproclefttype == opcintype &&
!             procform->amprocrighttype == opcintype)
!             classfuncbits |= (1 << procform->amprocnum);
      }

!     /* Check operators */
      for (i = 0; i < oprlist->n_members; i++)
      {
          HeapTuple    oprtup = &oprlist->members[i]->tuple;
          Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup);
-         bool        found = false;

!         /* TODO: Check that only allowed strategy numbers exist */
!         if (oprform->amopstrategy < 1)
!             ereport(ERROR,
                      (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                      errmsg("brin opfamily %u contains invalid strategy number %d for operator %u",
!                             opfamilyoid,
!                             oprform->amopstrategy, oprform->amopopr)));
!
!         /* TODO: check more thoroughly for missing support functions */
!         for (j = 0; j < proclist->n_members; j++)
          {
!             HeapTuple    proctup = &proclist->members[j]->tuple;
!             Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);
!
!             /* note only the operator's lefttype matters */
!             if (procform->amproclefttype == oprform->amoplefttype &&
!                 procform->amprocrighttype == oprform->amoplefttype)
!             {
!                 found = true;
!                 break;
!             }
          }

!         if (!found)
!             ereport(ERROR,
                      (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!             errmsg("brin opfamily %u lacks support function for operator %u",
!                    opfamilyoid, oprform->amopopr)));

          /* brin doesn't support ORDER BY operators */
          if (oprform->amoppurpose != AMOP_SEARCH ||
              OidIsValid(oprform->amopsortfamily))
!             ereport(ERROR,
                      (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                      errmsg("brin opfamily %u contains invalid ORDER BY specification for operator %u",
!                             opfamilyoid, oprform->amopopr)));

!         /* Count operators that are specifically for the named opclass */
!         if (oprform->amoplefttype == opcintype &&
!             oprform->amoprighttype == opcintype)
!             numclassops++;
      }

!     /* Check that the named opclass is complete */
!     if (numclassops == 0)
!         ereport(ERROR,
                  (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                  errmsg("brin opclass %u is missing operator(s)",
!                         opclassoid)));
      for (i = 1; i <= BRIN_MANDATORY_NPROCS; i++)
      {
!         if ((classfuncbits & (1 << i)) != 0)
              continue;            /* got it */
!         ereport(ERROR,
                  (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!             errmsg("brin opclass %u is missing required support function %d",
!                    opclassoid, i)));
      }

      ReleaseCatCacheList(proclist);
      ReleaseCatCacheList(oprlist);

!     return true;
  }
--- 73,455 ----

      opfamilyoid = classform->opcfamily;
      opcintype = classform->opcintype;
+     opclassname = NameStr(classform->opcname);

!     /* Fetch opfamily information */
!     familytup = SearchSysCache1(OPFAMILYOID, ObjectIdGetDatum(opfamilyoid));
!     if (!HeapTupleIsValid(familytup))
!         elog(ERROR, "cache lookup failed for operator family %u", opfamilyoid);
!     familyform = (Form_pg_opfamily) GETSTRUCT(familytup);
!
!     opfamilyname = NameStr(familyform->opfname);

      /* Fetch all operators and support functions of the opfamily */
      oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid));
      proclist = SearchSysCacheList1(AMPROCNUM, ObjectIdGetDatum(opfamilyoid));

!     /* Check individual support functions */
      for (i = 0; i < proclist->n_members; i++)
      {
          HeapTuple    proctup = &proclist->members[i]->tuple;
          Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);
+         bool        ok;

!         /* Check procedure numbers and function signatures */
!         switch (procform->amprocnum)
!         {
!             case BRIN_PROCNUM_OPCINFO:
!                 ok = check_func_signature(procform->amproc, INTERNALOID, 1,
!                                           INTERNALOID);
!                 break;
!             case BRIN_PROCNUM_ADDVALUE:
!                 ok = check_func_signature(procform->amproc, BOOLOID, 4,
!                                           INTERNALOID, INTERNALOID,
!                                           INTERNALOID, INTERNALOID);
!                 break;
!             case BRIN_PROCNUM_CONSISTENT:
!                 ok = check_func_signature(procform->amproc, BOOLOID, 3,
!                                           INTERNALOID, INTERNALOID,
!                                           INTERNALOID);
!                 break;
!             case BRIN_PROCNUM_UNION:
!                 ok = check_func_signature(procform->amproc, BOOLOID, 3,
!                                           INTERNALOID, INTERNALOID,
!                                           INTERNALOID);
!                 break;
!             default:
!                 /* Complain if it's not a valid optional proc number */
!                 if (procform->amprocnum < BRIN_FIRST_OPTIONAL_PROCNUM ||
!                     procform->amprocnum > BRIN_LAST_OPTIONAL_PROCNUM)
!                 {
!                     ereport(INFO,
!                             (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                              errmsg("brin opfamily %s contains function %s with invalid support number %d",
!                                     opfamilyname,
!                                     format_procedure(procform->amproc),
!                                     procform->amprocnum)));
!                     result = false;
!                     continue;    /* omit bad proc numbers from allfuncs */
!                 }
!                 /* Can't check signatures of optional procs, so assume OK */
!                 ok = true;
!                 break;
!         }
!
!         if (!ok)
!         {
!             ereport(INFO,
                      (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                      errmsg("brin opfamily %s contains function %s with wrong signature for support number %d",
!                             opfamilyname,
!                             format_procedure(procform->amproc),
!                             procform->amprocnum)));
!             result = false;
!         }

!         /* Track all valid procedure numbers seen in opfamily */
!         allfuncs |= ((uint64) 1) << procform->amprocnum;
      }

!     /* Check individual operators */
      for (i = 0; i < oprlist->n_members; i++)
      {
          HeapTuple    oprtup = &oprlist->members[i]->tuple;
          Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup);

!         /* Check that only allowed strategy numbers exist */
!         if (oprform->amopstrategy < 1 || oprform->amopstrategy > 63)
!         {
!             ereport(INFO,
                      (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                      errmsg("brin opfamily %s contains operator %s with invalid strategy number %d",
!                             opfamilyname,
!                             format_operator(oprform->amopopr),
!                             oprform->amopstrategy)));
!             result = false;
!         }
!         else
          {
!             /*
!              * The set of operators supplied varies across BRIN opfamilies.
!              * Our plan is to identify all operator strategy numbers used in
!              * the opfamily and then complain about datatype combinations that
!              * are missing any operator(s).  However, consider only numbers
!              * that appear in some non-cross-type case, since cross-type
!              * operators may have unique strategies.  (This is not a great
!              * heuristic, in particular an erroneous number used in a
!              * cross-type operator will not get noticed; but the core BRIN
!              * opfamilies are messy enough to make it necessary.)
!              */
!             if (oprform->amoplefttype == oprform->amoprighttype)
!                 allops |= ((uint64) 1) << oprform->amopstrategy;
          }

!         /* Check operator signature --- same for all brin strategies */
!         if (!check_opr_signature(oprform->amopopr, BOOLOID,
!                                  oprform->amoplefttype,
!                                  oprform->amoprighttype))
!         {
!             ereport(INFO,
                      (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                      errmsg("brin opfamily %s contains operator %s with wrong signature",
!                             opfamilyname,
!                             format_operator(oprform->amopopr))));
!             result = false;
!         }

          /* brin doesn't support ORDER BY operators */
          if (oprform->amoppurpose != AMOP_SEARCH ||
              OidIsValid(oprform->amopsortfamily))
!         {
!             ereport(INFO,
                      (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                      errmsg("brin opfamily %s contains invalid ORDER BY specification for operator %s",
!                             opfamilyname,
!                             format_operator(oprform->amopopr))));
!             result = false;
!         }
!     }

!     /* Now check for inconsistent groups of operators/functions */
!     grouplist = identify_opfamily_groups(oprlist, proclist);
!     opclassgroup = NULL;
!     foreach(lc, grouplist)
!     {
!         OpFamilyOpFuncGroup *thisgroup = (OpFamilyOpFuncGroup *) lfirst(lc);
!
!         /* Remember the group exactly matching the test opclass */
!         if (thisgroup->lefttype == opcintype &&
!             thisgroup->righttype == opcintype)
!             opclassgroup = thisgroup;
!
!         /*
!          * Some BRIN opfamilies expect cross-type support functions to exist,
!          * and some don't.  We don't know exactly which are which, so if we
!          * find a cross-type operator for which there are no support functions
!          * at all, let it pass.  (Don't expect that all operators exist for
!          * such cross-type cases, either.)
!          */
!         if (thisgroup->functionset == 0 &&
!             thisgroup->lefttype != thisgroup->righttype)
!             continue;
!
!         /*
!          * Else complain if there seems to be an incomplete set of either
!          * operators or support functions for this datatype pair.
!          */
!         if (thisgroup->operatorset != allops)
!         {
!             ereport(INFO,
!                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                      errmsg("brin opfamily %s is missing operator(s) for types %s and %s",
!                             opfamilyname,
!                             format_type_be(thisgroup->lefttype),
!                             format_type_be(thisgroup->righttype))));
!             result = false;
!         }
!         if (thisgroup->functionset != allfuncs)
!         {
!             ereport(INFO,
!                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                      errmsg("brin opfamily %s is missing support function(s) for types %s and %s",
!                             opfamilyname,
!                             format_type_be(thisgroup->lefttype),
!                             format_type_be(thisgroup->righttype))));
!             result = false;
!         }
      }

!     /* Check that the originally-named opclass is complete */
!     if (!opclassgroup || opclassgroup->operatorset != allops)
!     {
!         ereport(INFO,
                  (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!                  errmsg("brin opclass %s is missing operator(s)",
!                         opclassname)));
!         result = false;
!     }
      for (i = 1; i <= BRIN_MANDATORY_NPROCS; i++)
      {
!         if (opclassgroup &&
!             (opclassgroup->functionset & (((int64) 1) << i)) != 0)
              continue;            /* got it */
!         ereport(INFO,
                  (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
!             errmsg("brin opclass %s is missing required support function %d",
!                    opclassname, i)));
!         result = false;
      }

      ReleaseCatCacheList(proclist);
      ReleaseCatCacheList(oprlist);
+     ReleaseSysCache(familytup);
+     ReleaseSysCache(classtup);

!     return result;
! }
!
! static bool
! check_func_signature(Oid funcid, Oid restype, int nargs,...)
! {
!     bool        result = true;
!     HeapTuple    tp;
!     Form_pg_proc procform;
!     va_list        ap;
!     int            i;
!
!     tp = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcid));
!     if (!HeapTupleIsValid(tp))
!         elog(ERROR, "cache lookup failed for function %u", funcid);
!     procform = (Form_pg_proc) GETSTRUCT(tp);
!
!     if (procform->prorettype != restype || procform->proretset ||
!         procform->pronargs != nargs)
!         result = false;
!
!     va_start(ap, nargs);
!     for (i = 0; i < nargs; i++)
!     {
!         Oid            argtype = va_arg(ap, Oid);
!
!         if (i < procform->pronargs &&
!             procform->proargtypes.values[i] != argtype)
!             result = false;
!     }
!     va_end(ap);
!
!     ReleaseSysCache(tp);
!     return result;
! }
!
! static bool
! check_opr_signature(Oid opno, Oid restype, Oid lefttype, Oid righttype)
! {
!     bool        result = true;
!     HeapTuple    tp;
!     Form_pg_operator opform;
!
!     tp = SearchSysCache1(OPEROID, ObjectIdGetDatum(opno));
!     if (!HeapTupleIsValid(tp))    /* shouldn't happen */
!         elog(ERROR, "cache lookup failed for operator %u", opno);
!     opform = (Form_pg_operator) GETSTRUCT(tp);
!
!     if (opform->oprresult != restype || opform->oprkind != 'b' ||
!         opform->oprleft != lefttype || opform->oprright != righttype)
!         result = false;
!
!     ReleaseSysCache(tp);
!     return result;
! }
!
! /*
!  * identify_opfamily_groups returns a List of OpFamilyOpFuncGroup structs,
!  * one for each combination of lefttype/righttype present in the family's
!  * operator and support function lists.  If amopstrategy K is present for
!  * this datatype combination, we set bit 1 << K in operatorset, and similarly
!  * for the support functions.  With uint64 fields we can handle operator and
!  * function numbers up to 63, which is plenty for the foreseeable future.
!  *
!  * The given CatCLists are expected to represent a single opfamily fetched
!  * from the AMOPSTRATEGY and AMPROCNUM caches, so that they will be in
!  * order by those caches' second and third cache keys, namely the datatypes.
!  */
! static List *
! identify_opfamily_groups(CatCList *oprlist, CatCList *proclist)
! {
!     List       *result = NIL;
!     OpFamilyOpFuncGroup *thisgroup;
!     Form_pg_amop oprform;
!     Form_pg_amproc procform;
!     int            io,
!                 ip;
!
!     /* We need the lists to be ordered; should be true in normal operation */
!     if (!oprlist->ordered || !proclist->ordered)
!         elog(ERROR, "cannot validate operator family without ordered data");
!
!     /*
!      * Advance through the lists concurrently.  Thanks to the ordering, we
!      * should see all operators and functions of a given datatype pair
!      * consecutively.
!      */
!     thisgroup = NULL;
!     io = ip = 0;
!     if (io < oprlist->n_members)
!     {
!         oprform = (Form_pg_amop) GETSTRUCT(&oprlist->members[io]->tuple);
!         io++;
!     }
!     else
!         oprform = NULL;
!     if (ip < proclist->n_members)
!     {
!         procform = (Form_pg_amproc) GETSTRUCT(&proclist->members[ip]->tuple);
!         ip++;
!     }
!     else
!         procform = NULL;
!
!     while (oprform || procform)
!     {
!         if (oprform && thisgroup &&
!             oprform->amoplefttype == thisgroup->lefttype &&
!             oprform->amoprighttype == thisgroup->righttype)
!         {
!             /* Operator belongs to current group; include it and advance */
!
!             /* Ignore strategy numbers outside supported range */
!             if (oprform->amopstrategy > 0 && oprform->amopstrategy < 64)
!                 thisgroup->operatorset |= ((uint64) 1) << oprform->amopstrategy;
!
!             if (io < oprlist->n_members)
!             {
!                 oprform = (Form_pg_amop) GETSTRUCT(&oprlist->members[io]->tuple);
!                 io++;
!             }
!             else
!                 oprform = NULL;
!             continue;
!         }
!
!         if (procform && thisgroup &&
!             procform->amproclefttype == thisgroup->lefttype &&
!             procform->amprocrighttype == thisgroup->righttype)
!         {
!             /* Procedure belongs to current group; include it and advance */
!
!             /* Ignore function numbers outside supported range */
!             if (procform->amprocnum > 0 && procform->amprocnum < 64)
!                 thisgroup->functionset |= ((uint64) 1) << procform->amprocnum;
!
!             if (ip < proclist->n_members)
!             {
!                 procform = (Form_pg_amproc) GETSTRUCT(&proclist->members[ip]->tuple);
!                 ip++;
!             }
!             else
!                 procform = NULL;
!             continue;
!         }
!
!         /* Time for a new group */
!         thisgroup = (OpFamilyOpFuncGroup *) palloc(sizeof(OpFamilyOpFuncGroup));
!         if (oprform &&
!             (!procform ||
!              (oprform->amoplefttype < procform->amproclefttype ||
!               (oprform->amoplefttype == procform->amproclefttype &&
!                oprform->amoprighttype < procform->amprocrighttype))))
!         {
!             thisgroup->lefttype = oprform->amoplefttype;
!             thisgroup->righttype = oprform->amoprighttype;
!         }
!         else
!         {
!             thisgroup->lefttype = procform->amproclefttype;
!             thisgroup->righttype = procform->amprocrighttype;
!         }
!         thisgroup->operatorset = thisgroup->functionset = 0;
!         result = lappend(result, thisgroup);
!     }
!
!     return result;
  }
diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c
index a180d2b..cb26d79 100644
*** a/src/backend/utils/cache/lsyscache.c
--- b/src/backend/utils/cache/lsyscache.c
*************** get_opname(Oid opno)
*** 1103,1108 ****
--- 1103,1131 ----
  }

  /*
+  * get_op_rettype
+  *        Given operator oid, return the operator's result type.
+  */
+ Oid
+ get_op_rettype(Oid opno)
+ {
+     HeapTuple    tp;
+
+     tp = SearchSysCache1(OPEROID, ObjectIdGetDatum(opno));
+     if (HeapTupleIsValid(tp))
+     {
+         Form_pg_operator optup = (Form_pg_operator) GETSTRUCT(tp);
+         Oid            result;
+
+         result = optup->oprresult;
+         ReleaseSysCache(tp);
+         return result;
+     }
+     else
+         return InvalidOid;
+ }
+
+ /*
   * op_input_types
   *
   *        Returns the left and right input datatypes for an operator
diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h
index 9efdea2..dcb8980 100644
*** a/src/include/utils/lsyscache.h
--- b/src/include/utils/lsyscache.h
*************** extern Oid    get_opclass_family(Oid opclas
*** 75,80 ****
--- 75,81 ----
  extern Oid    get_opclass_input_type(Oid opclass);
  extern RegProcedure get_opcode(Oid opno);
  extern char *get_opname(Oid opno);
+ extern Oid    get_op_rettype(Oid opno);
  extern void op_input_types(Oid opno, Oid *lefttype, Oid *righttype);
  extern bool op_mergejoinable(Oid opno, Oid inputtype);
  extern bool op_hashjoinable(Oid opno, Oid inputtype);

pgsql-hackers by date:

Previous
From: Bruce Momjian
Date:
Subject: Re: No Issue Tracker - Say it Ain't So!
Next
From: Bruce Momjian
Date:
Subject: Re: Releasing in September