Re: Inlining comparators as a performance optimisation - Mailing list pgsql-hackers

From Tom Lane
Subject Re: Inlining comparators as a performance optimisation
Date
Msg-id 1698.1323222387@sss.pgh.pa.us
Whole thread Raw
In response to Re: Inlining comparators as a performance optimisation  (Robert Haas <robertmhaas@gmail.com>)
Responses Re: Inlining comparators as a performance optimisation
List pgsql-hackers
Robert Haas <robertmhaas@gmail.com> writes:
> Works for me.  I think we should go ahead and get this part committed
> first, and then we can look at the inlining stuff as a further
> optimization for certain cases...

Yeah.  Attached is a second-draft patch, which I'm fairly happy with
except that the documentation hasn't been updated.  It extends the
changes into the executor as well as analyze.c, with the result that
we can get rid of some of the old infrastructure altogether.
(inlineApplySortFunction is still there for the moment, though.)
Also, I adopted a style similar to what we've done for inlined list
functions to make ApplySortComparator inline-able by all callers.

There are three somewhat-independent lines of work to pursue from here:

1. Adding sortsupport infrastructure for more datatypes.
2. Revising nbtree and related code to use this infrastructure.
3. Integrating Peter's work into this framework.

I'll try to take care of #1 for at least a few key datatypes before
I commit, but I think #2 is best done as a separate patch, so I'll
postpone that till later.

            regards, tom lane

diff --git a/doc/src/sgml/xindex.sgml b/doc/src/sgml/xindex.sgml
index 28324361a950f31737c0cbb6909726d02ce1af5f..51f847c9f9777707756a823e1f76ef0275bceed7 100644
*** a/doc/src/sgml/xindex.sgml
--- b/doc/src/sgml/xindex.sgml
*************** DEFAULT FOR TYPE int4 USING btree FAMILY
*** 758,764 ****
    OPERATOR 3 = ,
    OPERATOR 4 >= ,
    OPERATOR 5 > ,
!   FUNCTION 1 btint4cmp(int4, int4) ;

  CREATE OPERATOR CLASS int2_ops
  DEFAULT FOR TYPE int2 USING btree FAMILY integer_ops AS
--- 758,765 ----
    OPERATOR 3 = ,
    OPERATOR 4 >= ,
    OPERATOR 5 > ,
!   FUNCTION 1 btint4cmp(int4, int4) ,
!   FUNCTION 2 btint4sortsupport(internal) ;

  CREATE OPERATOR CLASS int2_ops
  DEFAULT FOR TYPE int2 USING btree FAMILY integer_ops AS
diff --git a/src/backend/access/nbtree/nbtcompare.c b/src/backend/access/nbtree/nbtcompare.c
index 23f2b61fe902cff7eebcf8526cf8116d90be75e8..9e065f71e3a6a21349bc6e43c0bb922ae641a98e 100644
*** a/src/backend/access/nbtree/nbtcompare.c
--- b/src/backend/access/nbtree/nbtcompare.c
***************
*** 49,54 ****
--- 49,55 ----
  #include "postgres.h"

  #include "utils/builtins.h"
+ #include "utils/sortsupport.h"


  Datum
*************** btint4cmp(PG_FUNCTION_ARGS)
*** 83,88 ****
--- 84,112 ----
          PG_RETURN_INT32(-1);
  }

+ static int
+ btint4fastcmp(Datum x, Datum y, SortSupport ssup)
+ {
+     int32        lhs = DatumGetInt32(x);
+     int32        rhs = DatumGetInt32(y);
+
+     /* Explicit three-way test on the int32 values; ssup is not consulted */
+     if (lhs < rhs)
+         return -1;
+     if (lhs > rhs)
+         return 1;
+     return 0;
+ }
+
+ Datum
+ btint4sortsupport(PG_FUNCTION_ARGS)
+ {
+     /* Arg 0 is the caller's SortSupport struct; plug in our comparator */
+     ((SortSupport) PG_GETARG_POINTER(0))->comparator = btint4fastcmp;
+
+     PG_RETURN_VOID();
+ }
+
  Datum
  btint8cmp(PG_FUNCTION_ARGS)
  {
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index 314324618a8216e7adc1eb2c18175f9a6b2ae5dd..c3d3958da7628ca47493e53e3e18e087a93cbd70 100644
*** a/src/backend/commands/analyze.c
--- b/src/backend/commands/analyze.c
***************
*** 47,54 ****
  #include "utils/lsyscache.h"
  #include "utils/memutils.h"
  #include "utils/pg_rusage.h"
  #include "utils/syscache.h"
- #include "utils/tuplesort.h"
  #include "utils/timestamp.h"
  #include "utils/tqual.h"

--- 47,54 ----
  #include "utils/lsyscache.h"
  #include "utils/memutils.h"
  #include "utils/pg_rusage.h"
+ #include "utils/sortsupport.h"
  #include "utils/syscache.h"
  #include "utils/timestamp.h"
  #include "utils/tqual.h"

*************** typedef struct
*** 1774,1781 ****

  typedef struct
  {
!     FmgrInfo   *cmpFn;
!     int            cmpFlags;
      int           *tupnoLink;
  } CompareScalarsContext;

--- 1774,1780 ----

  typedef struct
  {
!     SortSupport ssup;
      int           *tupnoLink;
  } CompareScalarsContext;

*************** compute_scalar_stats(VacAttrStatsP stats
*** 2222,2230 ****
      bool        is_varwidth = (!stats->attrtype->typbyval &&
                                 stats->attrtype->typlen < 0);
      double        corr_xysum;
!     Oid            cmpFn;
!     int            cmpFlags;
!     FmgrInfo    f_cmpfn;
      ScalarItem *values;
      int            values_cnt = 0;
      int           *tupnoLink;
--- 2221,2227 ----
      bool        is_varwidth = (!stats->attrtype->typbyval &&
                                 stats->attrtype->typlen < 0);
      double        corr_xysum;
!     SortSupportData ssup;
      ScalarItem *values;
      int            values_cnt = 0;
      int           *tupnoLink;
*************** compute_scalar_stats(VacAttrStatsP stats
*** 2238,2245 ****
      tupnoLink = (int *) palloc(samplerows * sizeof(int));
      track = (ScalarMCVItem *) palloc(num_mcv * sizeof(ScalarMCVItem));

!     SelectSortFunction(mystats->ltopr, false, &cmpFn, &cmpFlags);
!     fmgr_info(cmpFn, &f_cmpfn);

      /* Initial scan to find sortable values */
      for (i = 0; i < samplerows; i++)
--- 2235,2247 ----
      tupnoLink = (int *) palloc(samplerows * sizeof(int));
      track = (ScalarMCVItem *) palloc(num_mcv * sizeof(ScalarMCVItem));

!     memset(&ssup, 0, sizeof(ssup));
!     ssup.ssup_cxt = CurrentMemoryContext;
!     /* We always use the default collation for statistics */
!     ssup.ssup_collation = DEFAULT_COLLATION_OID;
!     ssup.ssup_nulls_first = false;
!
!     PrepareSortSupportFromOrderingOp(mystats->ltopr, &ssup);

      /* Initial scan to find sortable values */
      for (i = 0; i < samplerows; i++)
*************** compute_scalar_stats(VacAttrStatsP stats
*** 2307,2314 ****
          CompareScalarsContext cxt;

          /* Sort the collected values */
!         cxt.cmpFn = &f_cmpfn;
!         cxt.cmpFlags = cmpFlags;
          cxt.tupnoLink = tupnoLink;
          qsort_arg((void *) values, values_cnt, sizeof(ScalarItem),
                    compare_scalars, (void *) &cxt);
--- 2309,2315 ----
          CompareScalarsContext cxt;

          /* Sort the collected values */
!         cxt.ssup = &ssup;
          cxt.tupnoLink = tupnoLink;
          qsort_arg((void *) values, values_cnt, sizeof(ScalarItem),
                    compare_scalars, (void *) &cxt);
*************** compare_scalars(const void *a, const voi
*** 2712,2723 ****
      Datum        db = ((const ScalarItem *) b)->value;
      int            tb = ((const ScalarItem *) b)->tupno;
      CompareScalarsContext *cxt = (CompareScalarsContext *) arg;
!     int32        compare;

!     /* We always use the default collation for statistics */
!     compare = ApplySortFunction(cxt->cmpFn, cxt->cmpFlags,
!                                 DEFAULT_COLLATION_OID,
!                                 da, false, db, false);
      if (compare != 0)
          return compare;

--- 2713,2721 ----
      Datum        db = ((const ScalarItem *) b)->value;
      int            tb = ((const ScalarItem *) b)->tupno;
      CompareScalarsContext *cxt = (CompareScalarsContext *) arg;
!     int            compare;

!     compare = ApplySortComparator(da, false, db, false, cxt->ssup);
      if (compare != 0)
          return compare;

diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index 43059664b9348ae462dcb6b1166124f4f0fde2ee..6065e2176ac118e135583fb1f0cdf72bb5f22252 100644
*** a/src/backend/executor/nodeMergeAppend.c
--- b/src/backend/executor/nodeMergeAppend.c
***************
*** 38,47 ****

  #include "postgres.h"

- #include "access/nbtree.h"
  #include "executor/execdebug.h"
  #include "executor/nodeMergeAppend.h"
- #include "utils/lsyscache.h"

  /*
   * It gets quite confusing having a heap array (indexed by integers) which
--- 38,45 ----
*************** ExecInitMergeAppend(MergeAppend *node, E
*** 128,165 ****
       * initialize sort-key information
       */
      mergestate->ms_nkeys = node->numCols;
!     mergestate->ms_scankeys = palloc0(sizeof(ScanKeyData) * node->numCols);

      for (i = 0; i < node->numCols; i++)
      {
!         Oid            sortFunction;
!         bool        reverse;
!         int            flags;
!
!         if (!get_compare_function_for_ordering_op(node->sortOperators[i],
!                                                   &sortFunction, &reverse))
!             elog(ERROR, "operator %u is not a valid ordering operator",
!                  node->sortOperators[i]);

!         /* We use btree's conventions for encoding directionality */
!         flags = 0;
!         if (reverse)
!             flags |= SK_BT_DESC;
!         if (node->nullsFirst[i])
!             flags |= SK_BT_NULLS_FIRST;

!         /*
!          * We needn't fill in sk_strategy or sk_subtype since these scankeys
!          * will never be passed to an index.
!          */
!         ScanKeyEntryInitialize(&mergestate->ms_scankeys[i],
!                                flags,
!                                node->sortColIdx[i],
!                                InvalidStrategy,
!                                InvalidOid,
!                                node->collations[i],
!                                sortFunction,
!                                (Datum) 0);
      }

      /*
--- 126,143 ----
       * initialize sort-key information
       */
      mergestate->ms_nkeys = node->numCols;
!     mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);

      for (i = 0; i < node->numCols; i++)
      {
!         SortSupport    sortKey = mergestate->ms_sortkeys + i;

!         sortKey->ssup_cxt = CurrentMemoryContext;
!         sortKey->ssup_collation = node->collations[i];
!         sortKey->ssup_nulls_first = node->nullsFirst[i];
!         sortKey->ssup_attno = node->sortColIdx[i];

!         PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
      }

      /*
*************** heap_compare_slots(MergeAppendState *nod
*** 298,342 ****

      for (nkey = 0; nkey < node->ms_nkeys; nkey++)
      {
!         ScanKey        scankey = node->ms_scankeys + nkey;
!         AttrNumber    attno = scankey->sk_attno;
          Datum        datum1,
                      datum2;
          bool        isNull1,
                      isNull2;
!         int32        compare;

          datum1 = slot_getattr(s1, attno, &isNull1);
          datum2 = slot_getattr(s2, attno, &isNull2);

!         if (isNull1)
!         {
!             if (isNull2)
!                 continue;        /* NULL "=" NULL */
!             else if (scankey->sk_flags & SK_BT_NULLS_FIRST)
!                 return -1;        /* NULL "<" NOT_NULL */
!             else
!                 return 1;        /* NULL ">" NOT_NULL */
!         }
!         else if (isNull2)
!         {
!             if (scankey->sk_flags & SK_BT_NULLS_FIRST)
!                 return 1;        /* NOT_NULL ">" NULL */
!             else
!                 return -1;        /* NOT_NULL "<" NULL */
!         }
!         else
!         {
!             compare = DatumGetInt32(FunctionCall2Coll(&scankey->sk_func,
!                                                       scankey->sk_collation,
!                                                       datum1, datum2));
!             if (compare != 0)
!             {
!                 if (scankey->sk_flags & SK_BT_DESC)
!                     compare = -compare;
!                 return compare;
!             }
!         }
      }
      return 0;
  }
--- 276,297 ----

      for (nkey = 0; nkey < node->ms_nkeys; nkey++)
      {
!         SortSupport    sortKey = node->ms_sortkeys + nkey;
!         AttrNumber    attno = sortKey->ssup_attno;
          Datum        datum1,
                      datum2;
          bool        isNull1,
                      isNull2;
!         int            compare;

          datum1 = slot_getattr(s1, attno, &isNull1);
          datum2 = slot_getattr(s2, attno, &isNull2);

!         compare = ApplySortComparator(datum1, isNull1,
!                                       datum2, isNull2,
!                                       sortKey);
!         if (compare != 0)
!             return compare;
      }
      return 0;
  }
diff --git a/src/backend/executor/nodeMergejoin.c b/src/backend/executor/nodeMergejoin.c
index deaa79ed9fbddfa5397887439085f499a5981259..634931da4251056d1b6762144b03b42d4a47573d 100644
*** a/src/backend/executor/nodeMergejoin.c
--- b/src/backend/executor/nodeMergejoin.c
***************
*** 95,102 ****
  #include "access/nbtree.h"
  #include "executor/execdebug.h"
  #include "executor/nodeMergejoin.h"
- #include "miscadmin.h"
- #include "utils/acl.h"
  #include "utils/lsyscache.h"
  #include "utils/memutils.h"

--- 95,100 ----
*************** typedef struct MergeJoinClauseData
*** 135,147 ****
      bool        risnull;

      /*
!      * The comparison strategy in use, and the lookup info to let us call the
!      * btree comparison support function, and the collation to use.
       */
!     bool        reverse;        /* if true, negate the cmpfn's output */
!     bool        nulls_first;    /* if true, nulls sort low */
!     FmgrInfo    cmpfinfo;
!     Oid            collation;
  }    MergeJoinClauseData;

  /* Result type for MJEvalOuterValues and MJEvalInnerValues */
--- 133,142 ----
      bool        risnull;

      /*
!      * Everything we need to know to compare the left and right values is
!      * stored here.
       */
!     SortSupportData ssup;
  }    MergeJoinClauseData;

  /* Result type for MJEvalOuterValues and MJEvalInnerValues */
*************** MJExamineQuals(List *mergeclauses,
*** 203,210 ****
          int            op_strategy;
          Oid            op_lefttype;
          Oid            op_righttype;
!         RegProcedure cmpproc;
!         AclResult    aclresult;

          if (!IsA(qual, OpExpr))
              elog(ERROR, "mergejoin clause is not an OpExpr");
--- 198,204 ----
          int            op_strategy;
          Oid            op_lefttype;
          Oid            op_righttype;
!         Oid            sortfunc;

          if (!IsA(qual, OpExpr))
              elog(ERROR, "mergejoin clause is not an OpExpr");
*************** MJExamineQuals(List *mergeclauses,
*** 215,220 ****
--- 209,225 ----
          clause->lexpr = ExecInitExpr((Expr *) linitial(qual->args), parent);
          clause->rexpr = ExecInitExpr((Expr *) lsecond(qual->args), parent);

+         /* Set up sort support data */
+         clause->ssup.ssup_cxt = CurrentMemoryContext;
+         clause->ssup.ssup_collation = collation;
+         if (opstrategy == BTLessStrategyNumber)
+             clause->ssup.ssup_reverse = false;
+         else if (opstrategy == BTGreaterStrategyNumber)
+             clause->ssup.ssup_reverse = true;
+         else    /* planner screwed up */
+             elog(ERROR, "unsupported mergejoin strategy %d", opstrategy);
+         clause->ssup.ssup_nulls_first = nulls_first;
+
          /* Extract the operator's declared left/right datatypes */
          get_op_opfamily_properties(qual->opno, opfamily, false,
                                     &op_strategy,
*************** MJExamineQuals(List *mergeclauses,
*** 224,259 ****
              elog(ERROR, "cannot merge using non-equality operator %u",
                   qual->opno);

!         /* And get the matching support procedure (comparison function) */
!         cmpproc = get_opfamily_proc(opfamily,
!                                     op_lefttype,
!                                     op_righttype,
!                                     BTORDER_PROC);
!         if (!RegProcedureIsValid(cmpproc))        /* should not happen */
!             elog(ERROR, "missing support function %d(%u,%u) in opfamily %u",
!                  BTORDER_PROC, op_lefttype, op_righttype, opfamily);
!
!         /* Check permission to call cmp function */
!         aclresult = pg_proc_aclcheck(cmpproc, GetUserId(), ACL_EXECUTE);
!         if (aclresult != ACLCHECK_OK)
!             aclcheck_error(aclresult, ACL_KIND_PROC,
!                            get_func_name(cmpproc));
!
!         /* Set up the fmgr lookup information */
!         fmgr_info(cmpproc, &(clause->cmpfinfo));
!
!         /* Fill the additional comparison-strategy flags */
!         if (opstrategy == BTLessStrategyNumber)
!             clause->reverse = false;
!         else if (opstrategy == BTGreaterStrategyNumber)
!             clause->reverse = true;
!         else    /* planner screwed up */
!             elog(ERROR, "unsupported mergejoin strategy %d", opstrategy);
!
!         clause->nulls_first = nulls_first;
!
!         /* ... and the collation too */
!         clause->collation = collation;

          iClause++;
      }
--- 229,258 ----
              elog(ERROR, "cannot merge using non-equality operator %u",
                   qual->opno);

!         /* And get the matching support or comparison function */
!         sortfunc = get_opfamily_proc(opfamily,
!                                      op_lefttype,
!                                      op_righttype,
!                                      BTSORTSUPPORT_PROC);
!         if (OidIsValid(sortfunc))
!         {
!             /* The sort support function should provide a comparator */
!             OidFunctionCall1(sortfunc, PointerGetDatum(&clause->ssup));
!             Assert(clause->ssup.comparator != NULL);
!         }
!         else
!         {
!             /* opfamily doesn't provide sort support, get comparison func */
!             sortfunc = get_opfamily_proc(opfamily,
!                                          op_lefttype,
!                                          op_righttype,
!                                          BTORDER_PROC);
!             if (!OidIsValid(sortfunc))        /* should not happen */
!                 elog(ERROR, "missing support function %d(%u,%u) in opfamily %u",
!                      BTORDER_PROC, op_lefttype, op_righttype, opfamily);
!             /* We'll use a shim to call the old-style btree comparator */
!             PrepareSortSupportComparisonShim(sortfunc, &clause->ssup);
!         }

          iClause++;
      }
*************** MJEvalOuterValues(MergeJoinState *merges
*** 310,316 ****
          if (clause->lisnull)
          {
              /* match is impossible; can we end the join early? */
!             if (i == 0 && !clause->nulls_first && !mergestate->mj_FillOuter)
                  result = MJEVAL_ENDOFJOIN;
              else if (result == MJEVAL_MATCHABLE)
                  result = MJEVAL_NONMATCHABLE;
--- 309,316 ----
          if (clause->lisnull)
          {
              /* match is impossible; can we end the join early? */
!             if (i == 0 && !clause->ssup.ssup_nulls_first &&
!                 !mergestate->mj_FillOuter)
                  result = MJEVAL_ENDOFJOIN;
              else if (result == MJEVAL_MATCHABLE)
                  result = MJEVAL_NONMATCHABLE;
*************** MJEvalInnerValues(MergeJoinState *merges
*** 356,362 ****
          if (clause->risnull)
          {
              /* match is impossible; can we end the join early? */
!             if (i == 0 && !clause->nulls_first && !mergestate->mj_FillInner)
                  result = MJEVAL_ENDOFJOIN;
              else if (result == MJEVAL_MATCHABLE)
                  result = MJEVAL_NONMATCHABLE;
--- 356,363 ----
          if (clause->risnull)
          {
              /* match is impossible; can we end the join early? */
!             if (i == 0 && !clause->ssup.ssup_nulls_first &&
!                 !mergestate->mj_FillInner)
                  result = MJEVAL_ENDOFJOIN;
              else if (result == MJEVAL_MATCHABLE)
                  result = MJEVAL_NONMATCHABLE;
*************** MJEvalInnerValues(MergeJoinState *merges
*** 373,392 ****
   *
   * Compare the mergejoinable values of the current two input tuples
   * and return 0 if they are equal (ie, the mergejoin equalities all
!  * succeed), +1 if outer > inner, -1 if outer < inner.
   *
   * MJEvalOuterValues and MJEvalInnerValues must already have been called
   * for the current outer and inner tuples, respectively.
   */
! static int32
  MJCompare(MergeJoinState *mergestate)
  {
!     int32        result = 0;
      bool        nulleqnull = false;
      ExprContext *econtext = mergestate->js.ps.ps_ExprContext;
      int            i;
      MemoryContext oldContext;
-     FunctionCallInfoData fcinfo;

      /*
       * Call the comparison functions in short-lived context, in case they leak
--- 374,392 ----
   *
   * Compare the mergejoinable values of the current two input tuples
   * and return 0 if they are equal (ie, the mergejoin equalities all
!  * succeed), >0 if outer > inner, <0 if outer < inner.
   *
   * MJEvalOuterValues and MJEvalInnerValues must already have been called
   * for the current outer and inner tuples, respectively.
   */
! static int
  MJCompare(MergeJoinState *mergestate)
  {
!     int            result = 0;
      bool        nulleqnull = false;
      ExprContext *econtext = mergestate->js.ps.ps_ExprContext;
      int            i;
      MemoryContext oldContext;

      /*
       * Call the comparison functions in short-lived context, in case they leak
*************** MJCompare(MergeJoinState *mergestate)
*** 399,460 ****
      for (i = 0; i < mergestate->mj_NumClauses; i++)
      {
          MergeJoinClause clause = &mergestate->mj_Clauses[i];
-         Datum        fresult;

          /*
!          * Deal with null inputs.
!          */
!         if (clause->lisnull)
!         {
!             if (clause->risnull)
!             {
!                 nulleqnull = true;        /* NULL "=" NULL */
!                 continue;
!             }
!             if (clause->nulls_first)
!                 result = -1;    /* NULL "<" NOT_NULL */
!             else
!                 result = 1;        /* NULL ">" NOT_NULL */
!             break;
!         }
!         if (clause->risnull)
!         {
!             if (clause->nulls_first)
!                 result = 1;        /* NOT_NULL ">" NULL */
!             else
!                 result = -1;    /* NOT_NULL "<" NULL */
!             break;
!         }
!
!         /*
!          * OK to call the comparison function.
           */
!         InitFunctionCallInfoData(fcinfo, &(clause->cmpfinfo), 2,
!                                  clause->collation, NULL, NULL);
!         fcinfo.arg[0] = clause->ldatum;
!         fcinfo.arg[1] = clause->rdatum;
!         fcinfo.argnull[0] = false;
!         fcinfo.argnull[1] = false;
!         fresult = FunctionCallInvoke(&fcinfo);
!         if (fcinfo.isnull)
          {
!             nulleqnull = true;    /* treat like NULL = NULL */
              continue;
          }
-         result = DatumGetInt32(fresult);

!         if (clause->reverse)
!             result = -result;

          if (result != 0)
              break;
      }

      /*
!      * If we had any null comparison results or NULL-vs-NULL inputs, we do not
!      * want to report that the tuples are equal.  Instead, if result is still
!      * 0, change it to +1.    This will result in advancing the inner side of
!      * the join.
       *
       * Likewise, if there was a constant-false joinqual, do not report
       * equality.  We have to check this as part of the mergequals, else the
--- 399,426 ----
      for (i = 0; i < mergestate->mj_NumClauses; i++)
      {
          MergeJoinClause clause = &mergestate->mj_Clauses[i];

          /*
!          * Special case for NULL-vs-NULL, else use standard comparison.
           */
!         if (clause->lisnull && clause->risnull)
          {
!             nulleqnull = true;        /* NULL "=" NULL */
              continue;
          }

!         result = ApplySortComparator(clause->ldatum, clause->lisnull,
!                                      clause->rdatum, clause->risnull,
!                                      &clause->ssup);

          if (result != 0)
              break;
      }

      /*
!      * If we had any NULL-vs-NULL inputs, we do not want to report that the
!      * tuples are equal.  Instead, if result is still 0, change it to +1.
!      * This will result in advancing the inner side of the join.
       *
       * Likewise, if there was a constant-false joinqual, do not report
       * equality.  We have to check this as part of the mergequals, else the
*************** ExecMergeJoin(MergeJoinState *node)
*** 647,653 ****
      List       *joinqual;
      List       *otherqual;
      bool        qualResult;
!     int32        compareResult;
      PlanState  *innerPlan;
      TupleTableSlot *innerTupleSlot;
      PlanState  *outerPlan;
--- 613,619 ----
      List       *joinqual;
      List       *otherqual;
      bool        qualResult;
!     int            compareResult;
      PlanState  *innerPlan;
      TupleTableSlot *innerTupleSlot;
      PlanState  *outerPlan;
diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c
index cb341b8db679382c3ab8de31ff7a0bb57bc8dc50..7a4306e93f5545764cbd02e6aa042120b385790c 100644
*** a/src/backend/utils/cache/lsyscache.c
--- b/src/backend/utils/cache/lsyscache.c
*************** get_ordering_op_properties(Oid opno,
*** 244,262 ****
  }

  /*
!  * get_compare_function_for_ordering_op
!  *        Get the OID of the datatype-specific btree comparison function
   *        associated with an ordering operator (a "<" or ">" operator).
   *
!  * *cmpfunc receives the comparison function OID.
   * *reverse is set FALSE if the operator is "<", TRUE if it's ">"
!  * (indicating the comparison result must be negated before use).
   *
   * Returns TRUE if successful, FALSE if no btree function can be found.
   * (This indicates that the operator is not a valid ordering operator.)
   */
  bool
! get_compare_function_for_ordering_op(Oid opno, Oid *cmpfunc, bool *reverse)
  {
      Oid            opfamily;
      Oid            opcintype;
--- 244,265 ----
  }

  /*
!  * get_sort_function_for_ordering_op
!  *        Get the OID of the datatype-specific btree sort support function,
!  *        or if there is none, the btree comparison function,
   *        associated with an ordering operator (a "<" or ">" operator).
   *
!  * *sortfunc receives the support or comparison function OID.
!  * *issupport is set TRUE if it's a support func, FALSE if a comparison func.
   * *reverse is set FALSE if the operator is "<", TRUE if it's ">"
!  * (indicating that comparison results must be negated before use).
   *
   * Returns TRUE if successful, FALSE if no btree function can be found.
   * (This indicates that the operator is not a valid ordering operator.)
   */
  bool
! get_sort_function_for_ordering_op(Oid opno, Oid *sortfunc,
!                                   bool *issupport, bool *reverse)
  {
      Oid            opfamily;
      Oid            opcintype;
*************** get_compare_function_for_ordering_op(Oid
*** 267,287 ****
                                     &opfamily, &opcintype, &strategy))
      {
          /* Found a suitable opfamily, get matching support function */
!         *cmpfunc = get_opfamily_proc(opfamily,
!                                      opcintype,
!                                      opcintype,
!                                      BTORDER_PROC);
!
!         if (!OidIsValid(*cmpfunc))        /* should not happen */
!             elog(ERROR, "missing support function %d(%u,%u) in opfamily %u",
!                  BTORDER_PROC, opcintype, opcintype, opfamily);
          *reverse = (strategy == BTGreaterStrategyNumber);
          return true;
      }

      /* ensure outputs are set on failure */
!     *cmpfunc = InvalidOid;
!
      *reverse = false;
      return false;
  }
--- 270,300 ----
                                     &opfamily, &opcintype, &strategy))
      {
          /* Found a suitable opfamily, get matching support function */
!         *sortfunc = get_opfamily_proc(opfamily,
!                                       opcintype,
!                                       opcintype,
!                                       BTSORTSUPPORT_PROC);
!         if (OidIsValid(*sortfunc))
!             *issupport = true;
!         else
!         {
!             /* opfamily doesn't provide sort support, get comparison func */
!             *sortfunc = get_opfamily_proc(opfamily,
!                                           opcintype,
!                                           opcintype,
!                                           BTORDER_PROC);
!             if (!OidIsValid(*sortfunc))        /* should not happen */
!                 elog(ERROR, "missing support function %d(%u,%u) in opfamily %u",
!                      BTORDER_PROC, opcintype, opcintype, opfamily);
!             *issupport = false;
!         }
          *reverse = (strategy == BTGreaterStrategyNumber);
          return true;
      }

      /* ensure outputs are set on failure */
!     *sortfunc = InvalidOid;
!     *issupport = false;
      *reverse = false;
      return false;
  }
diff --git a/src/backend/utils/sort/Makefile b/src/backend/utils/sort/Makefile
index 9e20ecb3267baed7c95b607d8f711a2ed9a3a81b..2ef4965ee6db442f3d2e94903d76a4cdb69db521 100644
*** a/src/backend/utils/sort/Makefile
--- b/src/backend/utils/sort/Makefile
*************** subdir = src/backend/utils/sort
*** 12,17 ****
  top_builddir = ../../../..
  include $(top_builddir)/src/Makefile.global

! OBJS = logtape.o tuplesort.o tuplestore.o

  include $(top_srcdir)/src/backend/common.mk
--- 12,17 ----
  top_builddir = ../../../..
  include $(top_builddir)/src/Makefile.global

! OBJS = logtape.o sortsupport.o tuplesort.o tuplestore.o

  include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/utils/sort/sortsupport.c b/src/backend/utils/sort/sortsupport.c
index ...4a87f11827f36e81708a3ee0a9031e04832b24e0 .
*** a/src/backend/utils/sort/sortsupport.c
--- b/src/backend/utils/sort/sortsupport.c
***************
*** 0 ****
--- 1,160 ----
+ /*-------------------------------------------------------------------------
+  *
+  * sortsupport.c
+  *      Support routines for accelerated sorting.
+  *
+  *
+  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  * IDENTIFICATION
+  *      src/backend/utils/sort/sortsupport.c
+  *
+  *-------------------------------------------------------------------------
+  */
+
+ #include "postgres.h"
+
+ #include "fmgr.h"
+ #include "utils/lsyscache.h"
+ #include "utils/sortsupport.h"
+
+
+ /* Info needed to use an old-style comparison function as a sort comparator */
+ typedef struct
+ {
+     FunctionCallInfoData fcinfo;    /* reusable callinfo structure */
+     FmgrInfo    flinfo;                /* lookup data for comparison function */
+ } SortShimExtra;
+
+
+ /*
+  * sortsupport.h defines inline versions of these functions if allowed by the
+  * compiler; in which case the definitions below are skipped.
+  */
+ #ifndef USE_INLINE
+
+ /*
+  * Apply a sort comparator function and return a 3-way comparison result.
+  * This takes care of handling reverse-sort and NULLs-ordering properly.
+  */
+ int
+ ApplySortComparator(Datum datum1, bool isNull1,
+                     Datum datum2, bool isNull2,
+                     SortSupport ssup)
+ {
+     int            result;
+
+     /* Two NULLs always compare as equal, whatever the sort direction */
+     if (isNull1 && isNull2)
+         return 0;
+
+     /*
+      * Exactly one NULL: its placement is governed by ssup_nulls_first
+      * alone; ssup_reverse is not applied to this outcome.
+      */
+     if (isNull1)
+         return ssup->ssup_nulls_first ? -1 : 1;
+     if (isNull2)
+         return ssup->ssup_nulls_first ? 1 : -1;
+
+     /*
+      * Both values are non-NULL, so defer to the comparator installed
+      * in ssup.
+      */
+     result = (*ssup->comparator) (datum1, datum2, ssup);
+
+     /* A reverse sort negates the comparator's verdict */
+     if (ssup->ssup_reverse)
+         result = -result;
+
+     return result;
+ }
+
+ #endif   /* ! USE_INLINE */
+
+ /*
+  * Shim comparator: invoke an old-style btree comparison function on (x, y).
+  *
+  * This is essentially an inlined FunctionCall2Coll(), relying on call info
+  * that PrepareSortSupportComparisonShim stored in ssup->ssup_extra.  Throws
+  * an error if the comparison function returns NULL.
+  */
+ static int
+ comparison_shim(Datum x, Datum y, SortSupport ssup)
+ {
+     SortShimExtra *extra = (SortShimExtra *) ssup->ssup_extra;
+     Datum        result;
+
+     extra->fcinfo.arg[0] = x;
+     extra->fcinfo.arg[1] = y;
+
+     /* just for paranoia's sake, we reset isnull each time */
+     extra->fcinfo.isnull = false;
+
+     result = FunctionCallInvoke(&extra->fcinfo);
+
+     /* Check for null result, since caller is clearly not expecting one */
+     if (extra->fcinfo.isnull)
+         elog(ERROR, "function %u returned NULL", extra->flinfo.fn_oid);
+     /* The result is an int4 packed in a Datum; unpack it explicitly */
+     return DatumGetInt32(result);
+ }
+
+ /*
+  * Make an old-style btree comparison function (cmpFunc) usable through the
+  * SortSupport comparator interface, by routing calls via comparison_shim.
+  */
+ void
+ PrepareSortSupportComparisonShim(Oid cmpFunc, SortSupport ssup)
+ {
+     SortShimExtra   *shim;
+
+     shim = (SortShimExtra *) MemoryContextAlloc(ssup->ssup_cxt,
+                                                 sizeof(*shim));
+
+     /* Resolve the comparison function's fmgr lookup info */
+     fmgr_info_cxt(cmpFunc, &shim->flinfo, ssup->ssup_cxt);
+
+     /* Build the call info once; comparison_shim refills the args per call */
+     InitFunctionCallInfoData(shim->fcinfo, &shim->flinfo, 2,
+                              ssup->ssup_collation, NULL, NULL);
+     shim->fcinfo.argnull[1] = false;
+     shim->fcinfo.argnull[0] = false;
+
+     ssup->comparator = comparison_shim;
+     ssup->ssup_extra = shim;
+ }
+
+ /*
+  * Initialize a SortSupport for an ordering operator (btree "<" or ">").
+  *
+  * The SortSupportData must have been zeroed by the caller, with ssup_cxt,
+  * ssup_collation, and ssup_nulls_first then filled in.  We set ssup_reverse
+  * and the comparator function pointer.
+  */
+ void
+ PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup)
+ {
+     Oid            procOid;
+     bool        isSortSupport;
+
+     if (!get_sort_function_for_ordering_op(orderingOp,
+                                            &procOid,
+                                            &isSortSupport,
+                                            &ssup->ssup_reverse))
+         elog(ERROR, "operator %u is not a valid ordering operator",
+              orderingOp);
+
+     if (!isSortSupport)
+     {
+         /* Got an old-style btree comparator; call it through the shim */
+         PrepareSortSupportComparisonShim(procOid, ssup);
+     }
+     else
+     {
+         /* Let the sort support function install its comparator */
+         OidFunctionCall1(procOid, PointerGetDatum(ssup));
+         Assert(ssup->comparator != NULL);
+     }
+ }
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 3505236e5d3b9c753fed11822e7d4b147b0db0eb..28d585fc3b5f9a501253dd4b9d9042e0e33d1d0e 100644
*** a/src/backend/utils/sort/tuplesort.c
--- b/src/backend/utils/sort/tuplesort.c
***************
*** 112,117 ****
--- 112,118 ----
  #include "utils/memutils.h"
  #include "utils/pg_rusage.h"
  #include "utils/rel.h"
+ #include "utils/sortsupport.h"
  #include "utils/tuplesort.h"


*************** struct Tuplesortstate
*** 339,345 ****
       * tuplesort_begin_heap and used only by the MinimalTuple routines.
       */
      TupleDesc    tupDesc;
!     ScanKey        scanKeys;        /* array of length nKeys */

      /*
       * These variables are specific to the CLUSTER case; they are set by
--- 340,346 ----
       * tuplesort_begin_heap and used only by the MinimalTuple routines.
       */
      TupleDesc    tupDesc;
!     SortSupport    sortKeys;        /* array of length nKeys */

      /*
       * These variables are specific to the CLUSTER case; they are set by
*************** struct Tuplesortstate
*** 367,375 ****
       * tuplesort_begin_datum and used only by the DatumTuple routines.
       */
      Oid            datumType;
!     FmgrInfo    sortOpFn;        /* cached lookup data for sortOperator */
!     int            sortFnFlags;    /* equivalent to sk_flags */
!     Oid            sortCollation;    /* equivalent to sk_collation */
      /* we need typelen and byval in order to know how to copy the Datums. */
      int            datumTypeLen;
      bool        datumTypeByVal;
--- 368,374 ----
       * tuplesort_begin_datum and used only by the DatumTuple routines.
       */
      Oid            datumType;
!     SortSupport    datumKey;
      /* we need typelen and byval in order to know how to copy the Datums. */
      int            datumTypeLen;
      bool        datumTypeByVal;
*************** tuplesort_begin_heap(TupleDesc tupDesc,
*** 613,653 ****
      state->reversedirection = reversedirection_heap;

      state->tupDesc = tupDesc;    /* assume we need not copy tupDesc */
!     state->scanKeys = (ScanKey) palloc0(nkeys * sizeof(ScanKeyData));

      for (i = 0; i < nkeys; i++)
      {
!         Oid            sortFunction;
!         bool        reverse;
!         int            flags;

          AssertArg(attNums[i] != 0);
          AssertArg(sortOperators[i] != 0);

!         if (!get_compare_function_for_ordering_op(sortOperators[i],
!                                                   &sortFunction, &reverse))
!             elog(ERROR, "operator %u is not a valid ordering operator",
!                  sortOperators[i]);
!
!         /* We use btree's conventions for encoding directionality */
!         flags = 0;
!         if (reverse)
!             flags |= SK_BT_DESC;
!         if (nullsFirstFlags[i])
!             flags |= SK_BT_NULLS_FIRST;

!         /*
!          * We needn't fill in sk_strategy or sk_subtype since these scankeys
!          * will never be passed to an index.
!          */
!         ScanKeyEntryInitialize(&state->scanKeys[i],
!                                flags,
!                                attNums[i],
!                                InvalidStrategy,
!                                InvalidOid,
!                                sortCollations[i],
!                                sortFunction,
!                                (Datum) 0);
      }

      MemoryContextSwitchTo(oldcontext);
--- 612,634 ----
      state->reversedirection = reversedirection_heap;

      state->tupDesc = tupDesc;    /* assume we need not copy tupDesc */
!
!     /* Prepare SortSupport data for each column */
!     state->sortKeys = (SortSupport) palloc0(nkeys * sizeof(SortSupportData));

      for (i = 0; i < nkeys; i++)
      {
!         SortSupport    sortKey = state->sortKeys + i;

          AssertArg(attNums[i] != 0);
          AssertArg(sortOperators[i] != 0);

!         sortKey->ssup_cxt = CurrentMemoryContext;
!         sortKey->ssup_collation = sortCollations[i];
!         sortKey->ssup_nulls_first = nullsFirstFlags[i];
!         sortKey->ssup_attno = attNums[i];

!         PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
      }

      MemoryContextSwitchTo(oldcontext);
*************** tuplesort_begin_datum(Oid datumType, Oid
*** 799,806 ****
  {
      Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
      MemoryContext oldcontext;
-     Oid            sortFunction;
-     bool        reverse;
      int16        typlen;
      bool        typbyval;

--- 780,785 ----
*************** tuplesort_begin_datum(Oid datumType, Oid
*** 829,846 ****

      state->datumType = datumType;

!     /* lookup the ordering function */
!     if (!get_compare_function_for_ordering_op(sortOperator,
!                                               &sortFunction, &reverse))
!         elog(ERROR, "operator %u is not a valid ordering operator",
!              sortOperator);
!     fmgr_info(sortFunction, &state->sortOpFn);

!     /* set ordering flags and collation */
!     state->sortFnFlags = reverse ? SK_BT_DESC : 0;
!     if (nullsFirstFlag)
!         state->sortFnFlags |= SK_BT_NULLS_FIRST;
!     state->sortCollation = sortCollation;

      /* lookup necessary attributes of the datum type */
      get_typlenbyval(datumType, &typlen, &typbyval);
--- 808,821 ----

      state->datumType = datumType;

!     /* Prepare SortSupport data */
!     state->datumKey = (SortSupport) palloc0(sizeof(SortSupportData));

!     state->datumKey->ssup_cxt = CurrentMemoryContext;
!     state->datumKey->ssup_collation = sortCollation;
!     state->datumKey->ssup_nulls_first = nullsFirstFlag;
!
!     PrepareSortSupportFromOrderingOp(sortOperator, state->datumKey);

      /* lookup necessary attributes of the datum type */
      get_typlenbyval(datumType, &typlen, &typbyval);
*************** markrunend(Tuplesortstate *state, int ta
*** 2605,2633 ****


  /*
-  * Set up for an external caller of ApplySortFunction.    This function
-  * basically just exists to localize knowledge of the encoding of sk_flags
-  * used in this module.
-  */
- void
- SelectSortFunction(Oid sortOperator,
-                    bool nulls_first,
-                    Oid *sortFunction,
-                    int *sortFlags)
- {
-     bool        reverse;
-
-     if (!get_compare_function_for_ordering_op(sortOperator,
-                                               sortFunction, &reverse))
-         elog(ERROR, "operator %u is not a valid ordering operator",
-              sortOperator);
-
-     *sortFlags = reverse ? SK_BT_DESC : 0;
-     if (nulls_first)
-         *sortFlags |= SK_BT_NULLS_FIRST;
- }
-
- /*
   * Inline-able copy of FunctionCall2Coll() to save some cycles in sorting.
   */
  static inline Datum
--- 2580,2585 ----
*************** inlineApplySortFunction(FmgrInfo *sortFu
*** 2693,2712 ****
      return compare;
  }

- /*
-  * Non-inline ApplySortFunction() --- this is needed only to conform to
-  * C99's brain-dead notions about how to implement inline functions...
-  */
- int32
- ApplySortFunction(FmgrInfo *sortFunction, int sortFlags, Oid collation,
-                   Datum datum1, bool isNull1,
-                   Datum datum2, bool isNull2)
- {
-     return inlineApplySortFunction(sortFunction, sortFlags, collation,
-                                    datum1, isNull1,
-                                    datum2, isNull2);
- }
-

  /*
   * Routines specialized for HeapTuple (actually MinimalTuple) case
--- 2645,2650 ----
*************** ApplySortFunction(FmgrInfo *sortFunction
*** 2715,2721 ****
  static int
  comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
  {
!     ScanKey        scanKey = state->scanKeys;
      HeapTupleData ltup;
      HeapTupleData rtup;
      TupleDesc    tupDesc;
--- 2653,2659 ----
  static int
  comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
  {
!     SortSupport    sortKey = state->sortKeys;
      HeapTupleData ltup;
      HeapTupleData rtup;
      TupleDesc    tupDesc;
*************** comparetup_heap(const SortTuple *a, cons
*** 2726,2735 ****
      CHECK_FOR_INTERRUPTS();

      /* Compare the leading sort key */
!     compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
!                                       scanKey->sk_collation,
!                                       a->datum1, a->isnull1,
!                                       b->datum1, b->isnull1);
      if (compare != 0)
          return compare;

--- 2664,2672 ----
      CHECK_FOR_INTERRUPTS();

      /* Compare the leading sort key */
!     compare = ApplySortComparator(a->datum1, a->isnull1,
!                                   b->datum1, b->isnull1,
!                                   sortKey);
      if (compare != 0)
          return compare;

*************** comparetup_heap(const SortTuple *a, cons
*** 2739,2748 ****
      rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
      rtup.t_data = (HeapTupleHeader) ((char *) b->tuple - MINIMAL_TUPLE_OFFSET);
      tupDesc = state->tupDesc;
!     scanKey++;
!     for (nkey = 1; nkey < state->nKeys; nkey++, scanKey++)
      {
!         AttrNumber    attno = scanKey->sk_attno;
          Datum        datum1,
                      datum2;
          bool        isnull1,
--- 2676,2685 ----
      rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
      rtup.t_data = (HeapTupleHeader) ((char *) b->tuple - MINIMAL_TUPLE_OFFSET);
      tupDesc = state->tupDesc;
!     sortKey++;
!     for (nkey = 1; nkey < state->nKeys; nkey++, sortKey++)
      {
!         AttrNumber    attno = sortKey->ssup_attno;
          Datum        datum1,
                      datum2;
          bool        isnull1,
*************** comparetup_heap(const SortTuple *a, cons
*** 2751,2760 ****
          datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
          datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);

!         compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
!                                           scanKey->sk_collation,
!                                           datum1, isnull1,
!                                           datum2, isnull2);
          if (compare != 0)
              return compare;
      }
--- 2688,2696 ----
          datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
          datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);

!         compare = ApplySortComparator(datum1, isnull1,
!                                       datum2, isnull2,
!                                       sortKey);
          if (compare != 0)
              return compare;
      }
*************** copytup_heap(Tuplesortstate *state, Sort
*** 2781,2787 ****
      htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
      htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
      stup->datum1 = heap_getattr(&htup,
!                                 state->scanKeys[0].sk_attno,
                                  state->tupDesc,
                                  &stup->isnull1);
  }
--- 2717,2723 ----
      htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
      htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
      stup->datum1 = heap_getattr(&htup,
!                                 state->sortKeys[0].ssup_attno,
                                  state->tupDesc,
                                  &stup->isnull1);
  }
*************** readtup_heap(Tuplesortstate *state, Sort
*** 2833,2839 ****
      htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
      htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
      stup->datum1 = heap_getattr(&htup,
!                                 state->scanKeys[0].sk_attno,
                                  state->tupDesc,
                                  &stup->isnull1);
  }
--- 2769,2775 ----
      htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
      htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
      stup->datum1 = heap_getattr(&htup,
!                                 state->sortKeys[0].ssup_attno,
                                  state->tupDesc,
                                  &stup->isnull1);
  }
*************** readtup_heap(Tuplesortstate *state, Sort
*** 2841,2852 ****
  static void
  reversedirection_heap(Tuplesortstate *state)
  {
!     ScanKey        scanKey = state->scanKeys;
      int            nkey;

!     for (nkey = 0; nkey < state->nKeys; nkey++, scanKey++)
      {
!         scanKey->sk_flags ^= (SK_BT_DESC | SK_BT_NULLS_FIRST);
      }
  }

--- 2777,2789 ----
  static void
  reversedirection_heap(Tuplesortstate *state)
  {
!     SortSupport    sortKey = state->sortKeys;
      int            nkey;

!     for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
      {
!         sortKey->ssup_reverse = !sortKey->ssup_reverse;
!         sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
      }
  }

*************** comparetup_datum(const SortTuple *a, con
*** 3297,3306 ****
      /* Allow interrupting long sorts */
      CHECK_FOR_INTERRUPTS();

!     return inlineApplySortFunction(&state->sortOpFn, state->sortFnFlags,
!                                    state->sortCollation,
!                                    a->datum1, a->isnull1,
!                                    b->datum1, b->isnull1);
  }

  static void
--- 3234,3242 ----
      /* Allow interrupting long sorts */
      CHECK_FOR_INTERRUPTS();

!     return ApplySortComparator(a->datum1, a->isnull1,
!                                b->datum1, b->isnull1,
!                                state->datumKey);
  }

  static void
*************** readtup_datum(Tuplesortstate *state, Sor
*** 3392,3398 ****
  static void
  reversedirection_datum(Tuplesortstate *state)
  {
!     state->sortFnFlags ^= (SK_BT_DESC | SK_BT_NULLS_FIRST);
  }

  /*
--- 3328,3335 ----
  static void
  reversedirection_datum(Tuplesortstate *state)
  {
!     state->datumKey->ssup_reverse = !state->datumKey->ssup_reverse;
!     state->datumKey->ssup_nulls_first = !state->datumKey->ssup_nulls_first;
  }

  /*
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index 347d9423ba3d092235033d2447fe12f21f43011d..9a751ce100455f81740fa29491d03b050572bee9 100644
*** a/src/include/access/nbtree.h
--- b/src/include/access/nbtree.h
*************** typedef struct xl_btree_newroot
*** 417,429 ****

  /*
   *    When a new operator class is declared, we require that the user
!  *    supply us with an amproc procedure for determining whether, for
!  *    two keys a and b, a < b, a = b, or a > b.  This routine must
!  *    return < 0, 0, > 0, respectively, in these three cases.  Since we
!  *    only have one such proc in amproc, it's number 1.
   */

! #define BTORDER_PROC    1

  /*
   *    We need to be able to tell the difference between read and write
--- 417,434 ----

  /*
   *    When a new operator class is declared, we require that the user
!  *    supply us with an amproc procedure (BTORDER_PROC) for determining
!  *    whether, for two keys a and b, a < b, a = b, or a > b.  This routine
!  *    must return < 0, 0, > 0, respectively, in these three cases.  (It must
!  *    not return INT_MIN, since we may negate the result before using it.)
!  *
!  *    To facilitate accelerated sorting, an operator class may choose to
!  *    offer a second procedure (BTSORTSUPPORT_PROC).  For full details, see
!  *    src/include/utils/sortsupport.h.
   */

! #define BTORDER_PROC        1
! #define BTSORTSUPPORT_PROC    2

  /*
   *    We need to be able to tell the difference between read and write
diff --git a/src/include/catalog/pg_am.h b/src/include/catalog/pg_am.h
index 8b075d30d689a9f7fa022d3f554626b6f5086883..ddacdf274c49ac7e83b941aa983dbf96214f5812 100644
*** a/src/include/catalog/pg_am.h
--- b/src/include/catalog/pg_am.h
*************** typedef FormData_pg_am *Form_pg_am;
*** 117,123 ****
   * ----------------
   */

! DATA(insert OID = 403 (  btree    5 1 t f t t t t t t t f t t 0 btinsert btbeginscan btgettuple btgetbitmap btrescan btendscan btmarkpos btrestrpos btbuild btbuildempty btbulkdelete btvacuumcleanup btcostestimate btoptions ));
  DESCR("b-tree index access method");
  #define BTREE_AM_OID 403
  DATA(insert OID = 405 (  hash    1 1 f f t f f f f f f f f f 23 hashinsert hashbeginscan hashgettuple hashgetbitmap hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbuildempty hashbulkdelete hashvacuumcleanup hashcostestimate hashoptions ));
--- 117,123 ----
   * ----------------
   */

! DATA(insert OID = 403 (  btree    5 2 t f t t t t t t t f t t 0 btinsert btbeginscan btgettuple btgetbitmap btrescan btendscan btmarkpos btrestrpos btbuild btbuildempty btbulkdelete btvacuumcleanup btcostestimate btoptions ));
  DESCR("b-tree index access method");
  #define BTREE_AM_OID 403
  DATA(insert OID = 405 (  hash    1 1 f f t f f f f f f f f f 23 hashinsert hashbeginscan hashgettuple hashgetbitmap hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbuildempty hashbulkdelete hashvacuumcleanup hashcostestimate hashoptions ));
diff --git a/src/include/catalog/pg_amproc.h b/src/include/catalog/pg_amproc.h
index e5d43f7c1d3ef02322ad2c5b376885a17e5f7430..9d0cb28fac8b772ea384ba72aa6ac70fa4174644 100644
*** a/src/include/catalog/pg_amproc.h
--- b/src/include/catalog/pg_amproc.h
*************** DATA(insert (    1976   21 21 1 350 ));
*** 100,105 ****
--- 100,106 ----
  DATA(insert (    1976   21 23 1 2190 ));
  DATA(insert (    1976   21 20 1 2192 ));
  DATA(insert (    1976   23 23 1 351 ));
+ DATA(insert (    1976   23 23 2 321 ));
  DATA(insert (    1976   23 20 1 2188 ));
  DATA(insert (    1976   23 21 1 2191 ));
  DATA(insert (    1976   20 20 1 842 ));
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 28e53b7b7c367731b5b0102f5afe032b9bbf09a0..3bd52e7add9645875c31f518baaff28824bf9f85 100644
*** a/src/include/catalog/pg_proc.h
--- b/src/include/catalog/pg_proc.h
*************** DATA(insert OID = 350 (  btint2cmp           P
*** 568,573 ****
--- 568,575 ----
  DESCR("less-equal-greater");
  DATA(insert OID = 351 (  btint4cmp           PGNSP PGUID 12 1 0 0 0 f f f t f i 2 0 23 "23 23" _null_ _null_ _null_ _null_ btint4cmp _null_ _null_ _null_ ));
  DESCR("less-equal-greater");
+ DATA(insert OID = 321 (  btint4sortsupport PGNSP PGUID 12 1 0 0 0 f f f t f i 1 0 2278 "2281" _null_ _null_ _null_ _null_ btint4sortsupport _null_ _null_ _null_ ));
+ DESCR("sort support");
  DATA(insert OID = 842 (  btint8cmp           PGNSP PGUID 12 1 0 0 0 f f f t f i 2 0 23 "20 20" _null_ _null_ _null_ _null_ btint8cmp _null_ _null_ _null_ ));
  DESCR("less-equal-greater");
  DATA(insert OID = 354 (  btfloat4cmp       PGNSP PGUID 12 1 0 0 0 f f f t f i 2 0 23 "700 700" _null_ _null_ _null_ _null_ btfloat4cmp _null_ _null_ _null_ ));
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 0a89f189d7c6d30ff7f3065cdd8ea360f402264d..f5935e2106f67562caa8f589f77c7cfc2a1d14fe 100644
*** a/src/include/nodes/execnodes.h
--- b/src/include/nodes/execnodes.h
***************
*** 20,25 ****
--- 20,26 ----
  #include "nodes/params.h"
  #include "nodes/plannodes.h"
  #include "utils/reltrigger.h"
+ #include "utils/sortsupport.h"
  #include "utils/tuplestore.h"


*************** typedef struct AppendState
*** 1087,1093 ****
   *
   *        nplans            how many plans are in the array
   *        nkeys            number of sort key columns
!  *        scankeys        sort keys in ScanKey representation
   *        slots            current output tuple of each subplan
   *        heap            heap of active tuples (represented as array indexes)
   *        heap_size        number of active heap entries
--- 1088,1094 ----
   *
   *        nplans            how many plans are in the array
   *        nkeys            number of sort key columns
!  *        sortkeys        sort keys in SortSupport representation
   *        slots            current output tuple of each subplan
   *        heap            heap of active tuples (represented as array indexes)
   *        heap_size        number of active heap entries
*************** typedef struct MergeAppendState
*** 1101,1107 ****
      PlanState **mergeplans;        /* array of PlanStates for my inputs */
      int            ms_nplans;
      int            ms_nkeys;
!     ScanKey        ms_scankeys;    /* array of length ms_nkeys */
      TupleTableSlot **ms_slots;    /* array of length ms_nplans */
      int           *ms_heap;        /* array of length ms_nplans */
      int            ms_heap_size;    /* current active length of ms_heap[] */
--- 1102,1108 ----
      PlanState **mergeplans;        /* array of PlanStates for my inputs */
      int            ms_nplans;
      int            ms_nkeys;
!     SortSupport    ms_sortkeys;    /* array of length ms_nkeys */
      TupleTableSlot **ms_slots;    /* array of length ms_nplans */
      int           *ms_heap;        /* array of length ms_nplans */
      int            ms_heap_size;    /* current active length of ms_heap[] */
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index 47a14125c48fc384b5d5c8d77abad99cec114e22..9a43e490b1d2d91c8d19a9c68524b54a3831a727 100644
*** a/src/include/utils/builtins.h
--- b/src/include/utils/builtins.h
*************** extern void pg_lltoa(int64 ll, char *a);
*** 279,285 ****

  /*
   *        Per-opclass comparison functions for new btrees.  These are
!  *        stored in pg_amproc and defined in access/nbtree/nbtcompare.c
   */
  extern Datum btboolcmp(PG_FUNCTION_ARGS);
  extern Datum btint2cmp(PG_FUNCTION_ARGS);
--- 279,285 ----

  /*
   *        Per-opclass comparison functions for new btrees.  These are
!  *        stored in pg_amproc; most are defined in access/nbtree/nbtcompare.c
   */
  extern Datum btboolcmp(PG_FUNCTION_ARGS);
  extern Datum btint2cmp(PG_FUNCTION_ARGS);
*************** extern Datum btcharcmp(PG_FUNCTION_ARGS)
*** 304,309 ****
--- 304,316 ----
  extern Datum btnamecmp(PG_FUNCTION_ARGS);
  extern Datum bttextcmp(PG_FUNCTION_ARGS);

+ /*
+  *        Per-opclass sort support functions for new btrees.  Like the
+  *        functions above, these are stored in pg_amproc; most are defined in
+  *        access/nbtree/nbtcompare.c
+  */
+ extern Datum btint4sortsupport(PG_FUNCTION_ARGS);
+
  /* float.c */
  extern PGDLLIMPORT int extra_float_digits;

diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h
index d3ad4f14282641fc7ba35f3188d76a8b097dd22e..311aa25862eba1e83af367dd7f4b81860538feb7 100644
*** a/src/include/utils/lsyscache.h
--- b/src/include/utils/lsyscache.h
*************** extern Oid get_opfamily_member(Oid opfam
*** 48,55 ****
                      int16 strategy);
  extern bool get_ordering_op_properties(Oid opno,
                             Oid *opfamily, Oid *opcintype, int16 *strategy);
! extern bool get_compare_function_for_ordering_op(Oid opno,
!                                      Oid *cmpfunc, bool *reverse);
  extern Oid    get_equality_op_for_ordering_op(Oid opno, bool *reverse);
  extern Oid    get_ordering_op_for_equality_op(Oid opno, bool use_lhs_type);
  extern List *get_mergejoin_opfamilies(Oid opno);
--- 48,55 ----
                      int16 strategy);
  extern bool get_ordering_op_properties(Oid opno,
                             Oid *opfamily, Oid *opcintype, int16 *strategy);
! extern bool get_sort_function_for_ordering_op(Oid opno, Oid *sortfunc,
!                                   bool *issupport, bool *reverse);
  extern Oid    get_equality_op_for_ordering_op(Oid opno, bool *reverse);
  extern Oid    get_ordering_op_for_equality_op(Oid opno, bool use_lhs_type);
  extern List *get_mergejoin_opfamilies(Oid opno);
diff --git a/src/include/utils/sortsupport.h b/src/include/utils/sortsupport.h
index ...bd6dde34729aa03ca7b80f66de24744136732a86 .
*** a/src/include/utils/sortsupport.h
--- b/src/include/utils/sortsupport.h
***************
*** 0 ****
--- 1,156 ----
+ /*-------------------------------------------------------------------------
+  *
+  * sortsupport.h
+  *      Framework for accelerated sorting.
+  *
+  * Traditionally, PostgreSQL has implemented sorting by repeatedly invoking
+  * an SQL-callable comparison function "cmp(x, y) returns int" on pairs of
+  * values to be compared, where the comparison function is the BTORDER_PROC
+  * pg_amproc support function of the appropriate btree index opclass.
+  *
+  * This file defines alternative APIs that allow sorting to be performed with
+  * reduced overhead.  To support lower-overhead sorting, a btree opclass may
+  * provide a BTSORTSUPPORT_PROC pg_amproc entry, which must take a single
+  * argument of type internal and return void.  The argument is actually a
+  * pointer to a SortSupportData struct, which is defined below.
+  *
+  * If provided, the BTSORTSUPPORT function will be called during sort setup,
+  * and it must initialize the provided struct with pointers to function(s)
+  * that can be called to perform sorting.  This API is defined to allow
+  * multiple acceleration mechanisms to be supported, but no opclass is
+  * required to provide all of them.  The BTSORTSUPPORT function should
+  * simply not set any function pointers for mechanisms it doesn't support.
+  * (However, all opclasses that provide BTSORTSUPPORT are required to provide
+  * the comparator function.)
+  *
+  * All sort support functions will be passed the address of the
+  * SortSupportData struct when called, so they can use it to store
+  * additional private data as needed.  In particular, for collation-aware
+  * datatypes, the ssup_collation field is set before calling BTSORTSUPPORT
+  * and is available to all support functions.  Additional opclass-dependent
+  * data can be stored using the ssup_extra field.  Any such data
+  * should be allocated in the ssup_cxt memory context.
+  *
+  * Note: since pg_amproc functions are indexed by (lefttype, righttype)
+  * it is possible to associate a BTSORTSUPPORT function with a cross-type
+  * comparison.  This could sensibly be used to provide a fast comparator
+  * function for such cases, but probably not any other acceleration method.
+  *
+  *
+  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  * src/include/utils/sortsupport.h
+  *
+  *-------------------------------------------------------------------------
+  */
+ #ifndef SORTSUPPORT_H
+ #define SORTSUPPORT_H
+
+ #include "access/attnum.h"
+
+ typedef struct SortSupportData *SortSupport;
+
+ typedef struct SortSupportData
+ {
+     /*
+      * These fields are initialized before calling the BTSORTSUPPORT function
+      * and should not be changed later.
+      */
+     MemoryContext ssup_cxt;                /* Context for sort info and ssup_extra */
+     Oid            ssup_collation;            /* Collation to use, or InvalidOid */
+
+     /*
+      * Additional sorting parameters; but unlike ssup_collation, these can
+      * be changed after BTSORTSUPPORT is called, so don't use them in
+      * selecting sort support functions.
+      */
+     bool        ssup_reverse;            /* negate comparator result (descending)? */
+     bool        ssup_nulls_first;        /* sort nulls first? (ignores reverse) */
+
+     /*
+      * This field is workspace for callers (tuplesort keeps the sort column
+      * number here), and should not be touched by opclass-specific functions.
+      */
+     AttrNumber    ssup_attno;                /* column number to sort */
+
+     /*
+      * ssup_extra is zeroed before calling the BTSORTSUPPORT function, and
+      * is not touched subsequently by callers.
+      */
+     void       *ssup_extra;                /* Workspace for opclass functions */
+
+     /*
+      * Function pointers are zeroed before calling the BTSORTSUPPORT function,
+      * and must be set by it for any acceleration methods it wants to supply.
+      * The comparator pointer must be set, others are optional.
+      */
+
+     /*
+      * Comparator function has the same API as the traditional btree
+      * comparison function, ie, return <0, 0, or >0 according as x is less
+      * than, equal to, or greater than y.  Note that x and y are guaranteed
+      * not null, and there is no way to return null either.  Do not return
+      * INT_MIN, as callers are allowed to negate the result before using it.
+      */
+     int            (*comparator) (Datum x, Datum y, SortSupport ssup);
+
+     /*
+      * Additional sort-acceleration functions might be added here later.
+      */
+ } SortSupportData;
+
+
+ /* ApplySortComparator should be inlined if possible */
+ #ifdef USE_INLINE
+
+ /*
+  * Three-way comparison of two possibly-NULL datums for one sort key.
+  * NULL placement follows ssup_nulls_first; for two non-null inputs the
+  * opclass comparator's result is used, negated when ssup_reverse is set.
+  */
+ static inline int
+ ApplySortComparator(Datum datum1, bool isNull1,
+                     Datum datum2, bool isNull2,
+                     SortSupport ssup)
+ {
+     int            result;
+
+     /* Neither input is NULL: defer to the comparator. */
+     if (!isNull1 && !isNull2)
+     {
+         result = (*ssup->comparator) (datum1, datum2, ssup);
+         return ssup->ssup_reverse ? -result : result;
+     }
+
+     /* NULL "=" NULL */
+     if (isNull1 && isNull2)
+         return 0;
+
+     /*
+      * Exactly one input is NULL.  Start from the nulls-first answer for
+      * "datum1 is the NULL one", then mirror it for the other two cases.
+      * Note that ssup_reverse is not consulted on this path.
+      */
+     result = -1;                /* NULL "<" NOT_NULL under nulls-first */
+     if (!isNull1)
+         result = -result;        /* NOT_NULL ">" NULL under nulls-first */
+     if (!ssup->ssup_nulls_first)
+         result = -result;        /* nulls-last flips either outcome */
+
+     return result;
+ }
+
+ #else
+
+ extern int    ApplySortComparator(Datum datum1, bool isNull1,
+                     Datum datum2, bool isNull2,
+                     SortSupport ssup);
+
+ #endif   /* USE_INLINE */
+
+ /* Other functions in utils/sort/sortsupport.c */
+ extern void PrepareSortSupportComparisonShim(Oid cmpFunc, SortSupport ssup);
+ extern void PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup);
+
+ #endif   /* SORTSUPPORT_H */
diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h
index 1ebcbfe17242a8e5262fe5d942210d0d77b1dd73..5c48e0e10f593de6ef51b42c75f150eae2be9cbc 100644
*** a/src/include/utils/tuplesort.h
--- b/src/include/utils/tuplesort.h
*************** extern void tuplesort_rescan(Tuplesortst
*** 116,134 ****
  extern void tuplesort_markpos(Tuplesortstate *state);
  extern void tuplesort_restorepos(Tuplesortstate *state);

- /* Setup for ApplySortFunction */
- extern void SelectSortFunction(Oid sortOperator, bool nulls_first,
-                    Oid *sortFunction,
-                    int *sortFlags);
-
- /*
-  * Apply a sort function (by now converted to fmgr lookup form)
-  * and return a 3-way comparison result.  This takes care of handling
-  * reverse-sort and NULLs-ordering properly.
-  */
- extern int32 ApplySortFunction(FmgrInfo *sortFunction, int sortFlags,
-                   Oid collation,
-                   Datum datum1, bool isNull1,
-                   Datum datum2, bool isNull2);
-
  #endif   /* TUPLESORT_H */
--- 116,119 ----
diff --git a/src/test/regress/expected/opr_sanity.out b/src/test/regress/expected/opr_sanity.out
index b915134fff33d3154a24d56361d0427810b90af1..a0ffd77e0ed889001b11fe174d62737e0d281d5e 100644
*** a/src/test/regress/expected/opr_sanity.out
--- b/src/test/regress/expected/opr_sanity.out
*************** WHERE p1.amprocfamily = p3.oid AND p3.op
*** 1186,1225 ****

  -- Detect missing pg_amproc entries: should have as many support functions
  -- as AM expects for each datatype combination supported by the opfamily.
! -- GiST/GIN are special cases because each has an optional support function.
! SELECT p1.amname, p2.opfname, p3.amproclefttype, p3.amprocrighttype
! FROM pg_am AS p1, pg_opfamily AS p2, pg_amproc AS p3
! WHERE p2.opfmethod = p1.oid AND p3.amprocfamily = p2.oid AND
!     p1.amname <> 'gist' AND p1.amname <> 'gin' AND
!     p1.amsupport != (SELECT count(*) FROM pg_amproc AS p4
!                      WHERE p4.amprocfamily = p2.oid AND
!                            p4.amproclefttype = p3.amproclefttype AND
!                            p4.amprocrighttype = p3.amprocrighttype);
!  amname | opfname | amproclefttype | amprocrighttype
! --------+---------+----------------+-----------------
! (0 rows)
!
! -- Similar check for GiST/GIN, allowing one optional proc
  SELECT p1.amname, p2.opfname, p3.amproclefttype, p3.amprocrighttype
  FROM pg_am AS p1, pg_opfamily AS p2, pg_amproc AS p3
  WHERE p2.opfmethod = p1.oid AND p3.amprocfamily = p2.oid AND
-     (p1.amname = 'gist' OR p1.amname = 'gin') AND
      (SELECT count(*) FROM pg_amproc AS p4
       WHERE p4.amprocfamily = p2.oid AND
             p4.amproclefttype = p3.amproclefttype AND
             p4.amprocrighttype = p3.amprocrighttype)
!       NOT IN (p1.amsupport, p1.amsupport - 1);
   amname | opfname | amproclefttype | amprocrighttype
  --------+---------+----------------+-----------------
  (0 rows)

  -- Also, check if there are any pg_opclass entries that don't seem to have
! -- pg_amproc support.  Again, GiST/GIN have to be checked specially.
  SELECT amname, opcname, count(*)
  FROM pg_am am JOIN pg_opclass op ON opcmethod = am.oid
       LEFT JOIN pg_amproc p ON amprocfamily = opcfamily AND
           amproclefttype = amprocrighttype AND amproclefttype = opcintype
! WHERE am.amname <> 'gist' AND am.amname <> 'gin'
  GROUP BY amname, amsupport, opcname, amprocfamily
  HAVING count(*) != amsupport OR amprocfamily IS NULL;
   amname | opcname | count
--- 1186,1215 ----

  -- Detect missing pg_amproc entries: should have as many support functions
  -- as AM expects for each datatype combination supported by the opfamily.
! -- btree/GiST/GIN each allow one optional support function, though.
  SELECT p1.amname, p2.opfname, p3.amproclefttype, p3.amprocrighttype
  FROM pg_am AS p1, pg_opfamily AS p2, pg_amproc AS p3
  WHERE p2.opfmethod = p1.oid AND p3.amprocfamily = p2.oid AND
      (SELECT count(*) FROM pg_amproc AS p4
       WHERE p4.amprocfamily = p2.oid AND
             p4.amproclefttype = p3.amproclefttype AND
             p4.amprocrighttype = p3.amprocrighttype)
!     NOT BETWEEN
!       (CASE WHEN p1.amname IN ('btree', 'gist', 'gin') THEN p1.amsupport - 1
!             ELSE p1.amsupport END)
!       AND p1.amsupport;
   amname | opfname | amproclefttype | amprocrighttype
  --------+---------+----------------+-----------------
  (0 rows)

  -- Also, check if there are any pg_opclass entries that don't seem to have
! -- pg_amproc support.  Again, opclasses with an optional support proc have
! -- to be checked specially.
  SELECT amname, opcname, count(*)
  FROM pg_am am JOIN pg_opclass op ON opcmethod = am.oid
       LEFT JOIN pg_amproc p ON amprocfamily = opcfamily AND
           amproclefttype = amprocrighttype AND amproclefttype = opcintype
! WHERE am.amname <> 'btree' AND am.amname <> 'gist' AND am.amname <> 'gin'
  GROUP BY amname, amsupport, opcname, amprocfamily
  HAVING count(*) != amsupport OR amprocfamily IS NULL;
   amname | opcname | count
*************** SELECT amname, opcname, count(*)
*** 1230,1236 ****
  FROM pg_am am JOIN pg_opclass op ON opcmethod = am.oid
       LEFT JOIN pg_amproc p ON amprocfamily = opcfamily AND
           amproclefttype = amprocrighttype AND amproclefttype = opcintype
! WHERE am.amname = 'gist' OR am.amname = 'gin'
  GROUP BY amname, amsupport, opcname, amprocfamily
  HAVING (count(*) != amsupport AND count(*) != amsupport - 1)
      OR amprocfamily IS NULL;
--- 1220,1226 ----
  FROM pg_am am JOIN pg_opclass op ON opcmethod = am.oid
       LEFT JOIN pg_amproc p ON amprocfamily = opcfamily AND
           amproclefttype = amprocrighttype AND amproclefttype = opcintype
! WHERE am.amname = 'btree' OR am.amname = 'gist' OR am.amname = 'gin'
  GROUP BY amname, amsupport, opcname, amprocfamily
  HAVING (count(*) != amsupport AND count(*) != amsupport - 1)
      OR amprocfamily IS NULL;
*************** WHERE p1.amprocfamily = p3.oid AND p4.am
*** 1261,1279 ****
  (0 rows)

  -- For btree, though, we can do better since we know the support routines
! -- must be of the form cmp(lefttype, righttype) returns int4.
  SELECT p1.amprocfamily, p1.amprocnum,
      p2.oid, p2.proname,
      p3.opfname
  FROM pg_amproc AS p1, pg_proc AS p2, pg_opfamily AS p3
  WHERE p3.opfmethod = (SELECT oid FROM pg_am WHERE amname = 'btree')
      AND p1.amprocfamily = p3.oid AND p1.amproc = p2.oid AND
!     (amprocnum != 1
!      OR proretset
!      OR prorettype != 'int4'::regtype
!      OR pronargs != 2
!      OR proargtypes[0] != amproclefttype
!      OR proargtypes[1] != amprocrighttype);
   amprocfamily | amprocnum | oid | proname | opfname
  --------------+-----------+-----+---------+---------
  (0 rows)
--- 1251,1272 ----
  (0 rows)

  -- For btree, though, we can do better since we know the support routines
! -- must be of the form cmp(lefttype, righttype) returns int4
! -- or sortsupport(internal) returns void.
  SELECT p1.amprocfamily, p1.amprocnum,
      p2.oid, p2.proname,
      p3.opfname
  FROM pg_amproc AS p1, pg_proc AS p2, pg_opfamily AS p3
  WHERE p3.opfmethod = (SELECT oid FROM pg_am WHERE amname = 'btree')
      AND p1.amprocfamily = p3.oid AND p1.amproc = p2.oid AND
!     (CASE WHEN amprocnum = 1
!           THEN prorettype != 'int4'::regtype OR proretset OR pronargs != 2
!                OR proargtypes[0] != amproclefttype
!                OR proargtypes[1] != amprocrighttype
!           WHEN amprocnum = 2
!           THEN prorettype != 'void'::regtype OR proretset OR pronargs != 1
!                OR proargtypes[0] != 'internal'::regtype
!           ELSE true END);
   amprocfamily | amprocnum | oid | proname | opfname
  --------------+-----------+-----+---------+---------
  (0 rows)
diff --git a/src/test/regress/sql/opr_sanity.sql b/src/test/regress/sql/opr_sanity.sql
index b0d143087e8fd68b30461c5d59b5bc5fdd3da086..6a79ea180c1c6c2d83891ca32a05e2249202ea5c 100644
*** a/src/test/regress/sql/opr_sanity.sql
--- b/src/test/regress/sql/opr_sanity.sql
*************** WHERE p1.amprocfamily = p3.oid AND p3.op
*** 926,962 ****

  -- Detect missing pg_amproc entries: should have as many support functions
  -- as AM expects for each datatype combination supported by the opfamily.
! -- GiST/GIN are special cases because each has an optional support function.
!
! SELECT p1.amname, p2.opfname, p3.amproclefttype, p3.amprocrighttype
! FROM pg_am AS p1, pg_opfamily AS p2, pg_amproc AS p3
! WHERE p2.opfmethod = p1.oid AND p3.amprocfamily = p2.oid AND
!     p1.amname <> 'gist' AND p1.amname <> 'gin' AND
!     p1.amsupport != (SELECT count(*) FROM pg_amproc AS p4
!                      WHERE p4.amprocfamily = p2.oid AND
!                            p4.amproclefttype = p3.amproclefttype AND
!                            p4.amprocrighttype = p3.amprocrighttype);
!
! -- Similar check for GiST/GIN, allowing one optional proc

  SELECT p1.amname, p2.opfname, p3.amproclefttype, p3.amprocrighttype
  FROM pg_am AS p1, pg_opfamily AS p2, pg_amproc AS p3
  WHERE p2.opfmethod = p1.oid AND p3.amprocfamily = p2.oid AND
-     (p1.amname = 'gist' OR p1.amname = 'gin') AND
      (SELECT count(*) FROM pg_amproc AS p4
       WHERE p4.amprocfamily = p2.oid AND
             p4.amproclefttype = p3.amproclefttype AND
             p4.amprocrighttype = p3.amprocrighttype)
!       NOT IN (p1.amsupport, p1.amsupport - 1);

  -- Also, check if there are any pg_opclass entries that don't seem to have
! -- pg_amproc support.  Again, GiST/GIN have to be checked specially.

  SELECT amname, opcname, count(*)
  FROM pg_am am JOIN pg_opclass op ON opcmethod = am.oid
       LEFT JOIN pg_amproc p ON amprocfamily = opcfamily AND
           amproclefttype = amprocrighttype AND amproclefttype = opcintype
! WHERE am.amname <> 'gist' AND am.amname <> 'gin'
  GROUP BY amname, amsupport, opcname, amprocfamily
  HAVING count(*) != amsupport OR amprocfamily IS NULL;

--- 926,954 ----

  -- Detect missing pg_amproc entries: should have as many support functions
  -- as AM expects for each datatype combination supported by the opfamily.
! -- btree/GiST/GIN each allow one optional support function, though.

  SELECT p1.amname, p2.opfname, p3.amproclefttype, p3.amprocrighttype
  FROM pg_am AS p1, pg_opfamily AS p2, pg_amproc AS p3
  WHERE p2.opfmethod = p1.oid AND p3.amprocfamily = p2.oid AND
      (SELECT count(*) FROM pg_amproc AS p4
       WHERE p4.amprocfamily = p2.oid AND
             p4.amproclefttype = p3.amproclefttype AND
             p4.amprocrighttype = p3.amprocrighttype)
!     NOT BETWEEN
!       (CASE WHEN p1.amname IN ('btree', 'gist', 'gin') THEN p1.amsupport - 1
!             ELSE p1.amsupport END)
!       AND p1.amsupport;

  -- Also, check if there are any pg_opclass entries that don't seem to have
! -- pg_amproc support.  Again, opclasses with an optional support proc have
! -- to be checked specially.

  SELECT amname, opcname, count(*)
  FROM pg_am am JOIN pg_opclass op ON opcmethod = am.oid
       LEFT JOIN pg_amproc p ON amprocfamily = opcfamily AND
           amproclefttype = amprocrighttype AND amproclefttype = opcintype
! WHERE am.amname <> 'btree' AND am.amname <> 'gist' AND am.amname <> 'gin'
  GROUP BY amname, amsupport, opcname, amprocfamily
  HAVING count(*) != amsupport OR amprocfamily IS NULL;

*************** SELECT amname, opcname, count(*)
*** 964,970 ****
  FROM pg_am am JOIN pg_opclass op ON opcmethod = am.oid
       LEFT JOIN pg_amproc p ON amprocfamily = opcfamily AND
           amproclefttype = amprocrighttype AND amproclefttype = opcintype
! WHERE am.amname = 'gist' OR am.amname = 'gin'
  GROUP BY amname, amsupport, opcname, amprocfamily
  HAVING (count(*) != amsupport AND count(*) != amsupport - 1)
      OR amprocfamily IS NULL;
--- 956,962 ----
  FROM pg_am am JOIN pg_opclass op ON opcmethod = am.oid
       LEFT JOIN pg_amproc p ON amprocfamily = opcfamily AND
           amproclefttype = amprocrighttype AND amproclefttype = opcintype
! WHERE am.amname = 'btree' OR am.amname = 'gist' OR am.amname = 'gin'
  GROUP BY amname, amsupport, opcname, amprocfamily
  HAVING (count(*) != amsupport AND count(*) != amsupport - 1)
      OR amprocfamily IS NULL;
*************** WHERE p1.amprocfamily = p3.oid AND p4.am
*** 990,996 ****
      (p2.proretset OR p5.proretset OR p2.pronargs != p5.pronargs);

  -- For btree, though, we can do better since we know the support routines
! -- must be of the form cmp(lefttype, righttype) returns int4.

  SELECT p1.amprocfamily, p1.amprocnum,
      p2.oid, p2.proname,
--- 982,989 ----
      (p2.proretset OR p5.proretset OR p2.pronargs != p5.pronargs);

  -- For btree, though, we can do better since we know the support routines
! -- must be of the form cmp(lefttype, righttype) returns int4
! -- or sortsupport(internal) returns void.

  SELECT p1.amprocfamily, p1.amprocnum,
      p2.oid, p2.proname,
*************** SELECT p1.amprocfamily, p1.amprocnum,
*** 998,1009 ****
  FROM pg_amproc AS p1, pg_proc AS p2, pg_opfamily AS p3
  WHERE p3.opfmethod = (SELECT oid FROM pg_am WHERE amname = 'btree')
      AND p1.amprocfamily = p3.oid AND p1.amproc = p2.oid AND
!     (amprocnum != 1
!      OR proretset
!      OR prorettype != 'int4'::regtype
!      OR pronargs != 2
!      OR proargtypes[0] != amproclefttype
!      OR proargtypes[1] != amprocrighttype);

  -- For hash we can also do a little better: the support routines must be
  -- of the form hash(lefttype) returns int4.  There are several cases where
--- 991,1004 ----
  FROM pg_amproc AS p1, pg_proc AS p2, pg_opfamily AS p3
  WHERE p3.opfmethod = (SELECT oid FROM pg_am WHERE amname = 'btree')
      AND p1.amprocfamily = p3.oid AND p1.amproc = p2.oid AND
!     (CASE WHEN amprocnum = 1
!           THEN prorettype != 'int4'::regtype OR proretset OR pronargs != 2
!                OR proargtypes[0] != amproclefttype
!                OR proargtypes[1] != amprocrighttype
!           WHEN amprocnum = 2
!           THEN prorettype != 'void'::regtype OR proretset OR pronargs != 1
!                OR proargtypes[0] != 'internal'::regtype
!           ELSE true END);

  -- For hash we can also do a little better: the support routines must be
  -- of the form hash(lefttype) returns int4.  There are several cases where

pgsql-hackers by date:

Previous
From: Peter Geoghegan
Date:
Subject: Re: pg_stat_statements with query tree based normalization
Next
From: Greg Smith
Date:
Subject: Timing overhead and Linux clock sources