Re: Speed of user-defined aggregates using array_append as transfn - Mailing list pgsql-hackers

From Tom Lane
Subject Re: Speed of user-defined aggregates using array_append as transfn
Msg-id 365.1477769025@sss.pgh.pa.us
In response to Re: Speed of user-defined aggregates using array_append as transfn  (Tom Lane <tgl@sss.pgh.pa.us>)
List pgsql-hackers
I wrote:
> I'm still worried about chewing up a kilobyte-at-least for each transition
> value, but maybe that's something we could leave to fix later.  Another
> idea is that we could teach the planner to know about that in its hash
> table size estimates.

Here's a complete proposed patch for this.  I decided to hard-wire the
planner adjustment to apply to array_append specifically.  One could
consider doing it whenever the aggregate transtype is an array type,
but that seems likely to be casting too wide a net for now.  We can
revisit it in the future if necessary.  In any case, the estimate
can be overridden per-aggregate using the aggtransspace parameter.
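
To make the precedence concrete, here is a minimal standalone sketch of how the
width estimate gets chosen.  It is only an illustration, not the planner code:
sketch_trans_width(), SketchTransFn and SKETCH_SMALL_INITSIZE are hypothetical
names, and the 1024 assumes ALLOCSET_SMALL_INITSIZE is one kilobyte, in line
with the kilobyte-at-least figure mentioned above.

```c
#include <assert.h>

/* Assumption: stands in for ALLOCSET_SMALL_INITSIZE, taken to be 1 kB. */
#define SKETCH_SMALL_INITSIZE 1024

/* Hypothetical stand-in for "which function is the aggregate's transfn". */
typedef enum
{
    TRANSFN_ARRAY_APPEND,
    TRANSFN_OTHER
} SketchTransFn;

/*
 * Choose the per-group transition-state width, in bytes, that would feed
 * the hash table size estimate.
 *
 * declared_space: the aggregate's aggtransspace (0 means "not specified").
 * fallback_width: whatever would otherwise be estimated from the transtype.
 */
static int
sketch_trans_width(int declared_space, SketchTransFn fn, int fallback_width)
{
    assert(fallback_width >= 0);

    /* An explicit per-aggregate setting always wins. */
    if (declared_space > 0)
        return declared_space;

    /* array_append keeps an expanded array, so assume at least 1 kB. */
    if (fn == TRANSFN_ARRAY_APPEND)
        return SKETCH_SMALL_INITSIZE;

    /* Otherwise fall back to the generic type-based estimate. */
    return fallback_width;
}
```

With no declared space, an aggregate using array_append is estimated at 1024
bytes per group; declaring 4096 overrides that; any other transition function
keeps its type-based estimate.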

Barring objections, I intend to back-patch this as far as 9.5.

            regards, tom lane

diff --git a/doc/src/sgml/xaggr.sgml b/doc/src/sgml/xaggr.sgml
index fa98572..d432c9a 100644
*** a/doc/src/sgml/xaggr.sgml
--- b/doc/src/sgml/xaggr.sgml
*************** if (AggCheckCallContext(fcinfo, NULL))
*** 626,632 ****
     function, the first input
     must be a temporary state value and can therefore safely be modified
     in-place rather than allocating a new copy.
!    See <literal>int8inc()</> for an example.
     (This is the <emphasis>only</>
     case where it is safe for a function to modify a pass-by-reference input.
     In particular, final functions for normal aggregates must not
--- 626,632 ----
     function, the first input
     must be a temporary state value and can therefore safely be modified
     in-place rather than allocating a new copy.
!    See <function>int8inc()</> for an example.
     (This is the <emphasis>only</>
     case where it is safe for a function to modify a pass-by-reference input.
     In particular, final functions for normal aggregates must not
*************** if (AggCheckCallContext(fcinfo, NULL))
*** 635,640 ****
--- 635,654 ----
    </para>

    <para>
+    The second argument of <function>AggCheckCallContext</> can be used to
+    retrieve the memory context in which aggregate state values are being kept.
+    This is useful for transition functions that wish to use <quote>expanded</>
+    objects (see <xref linkend="xtypes-toast">) as their transition values.
+    On first call, the transition function should return an expanded object
+    whose memory context is a child of the aggregate state context, and then
+    keep returning the same expanded object on subsequent calls.  See
+    <function>array_append()</> for an example.  (<function>array_append()</>
+    is not the transition function of any built-in aggregate, but it is written
+    to behave efficiently when used as transition function of a custom
+    aggregate.)
+   </para>
+
+   <para>
     Another support routine available to aggregate functions written in C
     is <function>AggGetAggref</>, which returns the <literal>Aggref</>
     parse node that defines the aggregate call.  This is mainly useful
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index b06e1c1..28c15ba 100644
*** a/src/backend/executor/nodeAgg.c
--- b/src/backend/executor/nodeAgg.c
***************
*** 91,100 ****
   *      transition value or a previous function result, and in either case its
   *      value need not be preserved.  See int8inc() for an example.  Notice that
   *      advance_transition_function() is coded to avoid a data copy step when
!  *      the previous transition value pointer is returned.  Also, some
!  *      transition functions want to store working state in addition to the
!  *      nominal transition value; they can use the memory context returned by
!  *      AggCheckCallContext() to do that.
   *
   *      Note: AggCheckCallContext() is available as of PostgreSQL 9.0.  The
   *      AggState is available as context in earlier releases (back to 8.1),
--- 91,103 ----
   *      transition value or a previous function result, and in either case its
   *      value need not be preserved.  See int8inc() for an example.  Notice that
   *      advance_transition_function() is coded to avoid a data copy step when
!  *      the previous transition value pointer is returned.  It is also possible
!  *      to avoid repeated data copying when the transition value is an expanded
!  *      object: to do that, the transition function must take care to return
!  *      an expanded object that is in a child context of the memory context
!  *      returned by AggCheckCallContext().  Also, some transition functions want
!  *      to store working state in addition to the nominal transition value; they
!  *      can use the memory context returned by AggCheckCallContext() to do that.
   *
   *      Note: AggCheckCallContext() is available as of PostgreSQL 9.0.  The
   *      AggState is available as context in earlier releases (back to 8.1),
*************** advance_transition_function(AggState *ag
*** 791,798 ****

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * pfree the prior transValue.  But if transfn returned a pointer to its
!      * first input, we don't need to do anything.
       */
      if (!pertrans->transtypeByVal &&
          DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
--- 794,803 ----

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * free the prior transValue.  But if transfn returned a pointer to its
!      * first input, we don't need to do anything.  Also, if transfn returned a
!      * pointer to a R/W expanded object that is already a child of the
!      * aggcontext, assume we can adopt that value without copying it.
       */
      if (!pertrans->transtypeByVal &&
          DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
*************** advance_transition_function(AggState *ag
*** 800,811 ****
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!             newVal = datumCopy(newVal,
!                                pertrans->transtypeByVal,
!                                pertrans->transtypeLen);
          }
          if (!pergroupstate->transValueIsNull)
!             pfree(DatumGetPointer(pergroupstate->transValue));
      }

      pergroupstate->transValue = newVal;
--- 805,829 ----
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!             if (DatumIsReadWriteExpandedObject(newVal,
!                                                false,
!                                                pertrans->transtypeLen) &&
!                 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
!                  /* do nothing */ ;
!             else
!                 newVal = datumCopy(newVal,
!                                    pertrans->transtypeByVal,
!                                    pertrans->transtypeLen);
          }
          if (!pergroupstate->transValueIsNull)
!         {
!             if (DatumIsReadWriteExpandedObject(pergroupstate->transValue,
!                                                false,
!                                                pertrans->transtypeLen))
!                 DeleteExpandedObject(pergroupstate->transValue);
!             else
!                 pfree(DatumGetPointer(pergroupstate->transValue));
!         }
      }

      pergroupstate->transValue = newVal;
*************** advance_combine_function(AggState *aggst
*** 1053,1060 ****

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * pfree the prior transValue.  But if the combine function returned a
!      * pointer to its first input, we don't need to do anything.
       */
      if (!pertrans->transtypeByVal &&
          DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
--- 1071,1081 ----

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * free the prior transValue.  But if the combine function returned a
!      * pointer to its first input, we don't need to do anything.  Also, if the
!      * combine function returned a pointer to a R/W expanded object that is
!      * already a child of the aggcontext, assume we can adopt that value
!      * without copying it.
       */
      if (!pertrans->transtypeByVal &&
          DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
*************** advance_combine_function(AggState *aggst
*** 1062,1073 ****
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!             newVal = datumCopy(newVal,
!                                pertrans->transtypeByVal,
!                                pertrans->transtypeLen);
          }
          if (!pergroupstate->transValueIsNull)
!             pfree(DatumGetPointer(pergroupstate->transValue));
      }

      pergroupstate->transValue = newVal;
--- 1083,1107 ----
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!             if (DatumIsReadWriteExpandedObject(newVal,
!                                                false,
!                                                pertrans->transtypeLen) &&
!                 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
!                  /* do nothing */ ;
!             else
!                 newVal = datumCopy(newVal,
!                                    pertrans->transtypeByVal,
!                                    pertrans->transtypeLen);
          }
          if (!pergroupstate->transValueIsNull)
!         {
!             if (DatumIsReadWriteExpandedObject(pergroupstate->transValue,
!                                                false,
!                                                pertrans->transtypeLen))
!                 DeleteExpandedObject(pergroupstate->transValue);
!             else
!                 pfree(DatumGetPointer(pergroupstate->transValue));
!         }
      }

      pergroupstate->transValue = newVal;
*************** finalize_aggregate(AggState *aggstate,
*** 1333,1339 ****
                                   (void *) aggstate, NULL);

          /* Fill in the transition state value */
!         fcinfo.arg[0] = pergroupstate->transValue;
          fcinfo.argnull[0] = pergroupstate->transValueIsNull;
          anynull |= pergroupstate->transValueIsNull;

--- 1367,1375 ----
                                   (void *) aggstate, NULL);

          /* Fill in the transition state value */
!         fcinfo.arg[0] = MakeExpandedObjectReadOnly(pergroupstate->transValue,
!                                              pergroupstate->transValueIsNull,
!                                                    pertrans->transtypeLen);
          fcinfo.argnull[0] = pergroupstate->transValueIsNull;
          anynull |= pergroupstate->transValueIsNull;

*************** finalize_aggregate(AggState *aggstate,
*** 1360,1365 ****
--- 1396,1402 ----
      }
      else
      {
+         /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
          *resultVal = pergroupstate->transValue;
          *resultIsNull = pergroupstate->transValueIsNull;
      }
*************** finalize_partialaggregate(AggState *aggs
*** 1410,1416 ****
          {
              FunctionCallInfo fcinfo = &pertrans->serialfn_fcinfo;

!             fcinfo->arg[0] = pergroupstate->transValue;
              fcinfo->argnull[0] = pergroupstate->transValueIsNull;

              *resultVal = FunctionCallInvoke(fcinfo);
--- 1447,1455 ----
          {
              FunctionCallInfo fcinfo = &pertrans->serialfn_fcinfo;

!             fcinfo->arg[0] = MakeExpandedObjectReadOnly(pergroupstate->transValue,
!                                              pergroupstate->transValueIsNull,
!                                                      pertrans->transtypeLen);
              fcinfo->argnull[0] = pergroupstate->transValueIsNull;

              *resultVal = FunctionCallInvoke(fcinfo);
*************** finalize_partialaggregate(AggState *aggs
*** 1419,1424 ****
--- 1458,1464 ----
      }
      else
      {
+         /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
          *resultVal = pergroupstate->transValue;
          *resultIsNull = pergroupstate->transValueIsNull;
      }
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index 96c8527..06f8aa0 100644
*** a/src/backend/executor/nodeWindowAgg.c
--- b/src/backend/executor/nodeWindowAgg.c
*************** advance_windowaggregate(WindowAggState *
*** 362,369 ****

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * pfree the prior transValue.  But if transfn returned a pointer to its
!      * first input, we don't need to do anything.
       */
      if (!peraggstate->transtypeByVal &&
          DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
--- 362,371 ----

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * free the prior transValue.  But if transfn returned a pointer to its
!      * first input, we don't need to do anything.  Also, if transfn returned a
!      * pointer to a R/W expanded object that is already a child of the
!      * aggcontext, assume we can adopt that value without copying it.
       */
      if (!peraggstate->transtypeByVal &&
          DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
*************** advance_windowaggregate(WindowAggState *
*** 371,382 ****
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(peraggstate->aggcontext);
!             newVal = datumCopy(newVal,
!                                peraggstate->transtypeByVal,
!                                peraggstate->transtypeLen);
          }
          if (!peraggstate->transValueIsNull)
!             pfree(DatumGetPointer(peraggstate->transValue));
      }

      MemoryContextSwitchTo(oldContext);
--- 373,397 ----
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(peraggstate->aggcontext);
!             if (DatumIsReadWriteExpandedObject(newVal,
!                                                false,
!                                                peraggstate->transtypeLen) &&
!                 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
!                  /* do nothing */ ;
!             else
!                 newVal = datumCopy(newVal,
!                                    peraggstate->transtypeByVal,
!                                    peraggstate->transtypeLen);
          }
          if (!peraggstate->transValueIsNull)
!         {
!             if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
!                                                false,
!                                                peraggstate->transtypeLen))
!                 DeleteExpandedObject(peraggstate->transValue);
!             else
!                 pfree(DatumGetPointer(peraggstate->transValue));
!         }
      }

      MemoryContextSwitchTo(oldContext);
*************** advance_windowaggregate_base(WindowAggSt
*** 513,520 ****

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * pfree the prior transValue.  But if invtransfn returned a pointer to
!      * its first input, we don't need to do anything.
       *
       * Note: the checks for null values here will never fire, but it seems
       * best to have this stanza look just like advance_windowaggregate.
--- 528,537 ----

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * free the prior transValue.  But if invtransfn returned a pointer to its
!      * first input, we don't need to do anything.  Also, if invtransfn
!      * returned a pointer to a R/W expanded object that is already a child of
!      * the aggcontext, assume we can adopt that value without copying it.
       *
       * Note: the checks for null values here will never fire, but it seems
       * best to have this stanza look just like advance_windowaggregate.
*************** advance_windowaggregate_base(WindowAggSt
*** 525,536 ****
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(peraggstate->aggcontext);
!             newVal = datumCopy(newVal,
!                                peraggstate->transtypeByVal,
!                                peraggstate->transtypeLen);
          }
          if (!peraggstate->transValueIsNull)
!             pfree(DatumGetPointer(peraggstate->transValue));
      }

      MemoryContextSwitchTo(oldContext);
--- 542,566 ----
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(peraggstate->aggcontext);
!             if (DatumIsReadWriteExpandedObject(newVal,
!                                                false,
!                                                peraggstate->transtypeLen) &&
!                 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
!                  /* do nothing */ ;
!             else
!                 newVal = datumCopy(newVal,
!                                    peraggstate->transtypeByVal,
!                                    peraggstate->transtypeLen);
          }
          if (!peraggstate->transValueIsNull)
!         {
!             if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
!                                                false,
!                                                peraggstate->transtypeLen))
!                 DeleteExpandedObject(peraggstate->transValue);
!             else
!                 pfree(DatumGetPointer(peraggstate->transValue));
!         }
      }

      MemoryContextSwitchTo(oldContext);
*************** finalize_windowaggregate(WindowAggState
*** 568,574 ****
                                   numFinalArgs,
                                   perfuncstate->winCollation,
                                   (void *) winstate, NULL);
!         fcinfo.arg[0] = peraggstate->transValue;
          fcinfo.argnull[0] = peraggstate->transValueIsNull;
          anynull = peraggstate->transValueIsNull;

--- 598,606 ----
                                   numFinalArgs,
                                   perfuncstate->winCollation,
                                   (void *) winstate, NULL);
!         fcinfo.arg[0] = MakeExpandedObjectReadOnly(peraggstate->transValue,
!                                                peraggstate->transValueIsNull,
!                                                    peraggstate->transtypeLen);
          fcinfo.argnull[0] = peraggstate->transValueIsNull;
          anynull = peraggstate->transValueIsNull;

*************** finalize_windowaggregate(WindowAggState
*** 596,601 ****
--- 628,634 ----
      }
      else
      {
+         /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
          *result = peraggstate->transValue;
          *isnull = peraggstate->transValueIsNull;
      }
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index 663ffe0..1688310 100644
*** a/src/backend/optimizer/util/clauses.c
--- b/src/backend/optimizer/util/clauses.c
*************** get_agg_clause_costs_walker(Node *node,
*** 647,652 ****
--- 647,662 ----
              /* Use average width if aggregate definition gave one */
              if (aggtransspace > 0)
                  avgwidth = aggtransspace;
+             else if (aggtransfn == F_ARRAY_APPEND)
+             {
+                 /*
+                  * If the transition function is array_append(), it'll use an
+                  * expanded array as transvalue, which will occupy at least
+                  * ALLOCSET_SMALL_INITSIZE and possibly more.  Use that as the
+                  * estimate for lack of a better idea.
+                  */
+                 avgwidth = ALLOCSET_SMALL_INITSIZE;
+             }
              else
              {
                  /*
diff --git a/src/backend/utils/adt/array_userfuncs.c b/src/backend/utils/adt/array_userfuncs.c
index 1ef1500..8d6fa41 100644
*** a/src/backend/utils/adt/array_userfuncs.c
--- b/src/backend/utils/adt/array_userfuncs.c
*************** static Datum array_position_common(Funct
*** 32,37 ****
--- 32,41 ----
   * Caution: if the input is a read/write pointer, this returns the input
   * argument; so callers must be sure that their changes are "safe", that is
   * they cannot leave the array in a corrupt state.
+  *
+  * If we're being called as an aggregate function, make sure any newly-made
+  * expanded array is allocated in the aggregate state context, so as to save
+  * copying operations.
   */
  static ExpandedArrayHeader *
  fetch_array_arg_replace_nulls(FunctionCallInfo fcinfo, int argno)
*************** fetch_array_arg_replace_nulls(FunctionCa
*** 39,44 ****
--- 43,49 ----
      ExpandedArrayHeader *eah;
      Oid            element_type;
      ArrayMetaState *my_extra;
+     MemoryContext resultcxt;

      /* If first time through, create datatype cache struct */
      my_extra = (ArrayMetaState *) fcinfo->flinfo->fn_extra;
*************** fetch_array_arg_replace_nulls(FunctionCa
*** 51,60 ****
--- 56,72 ----
          fcinfo->flinfo->fn_extra = my_extra;
      }

+     /* Figure out which context we want the result in */
+     if (!AggCheckCallContext(fcinfo, &resultcxt))
+         resultcxt = CurrentMemoryContext;
+
      /* Now collect the array value */
      if (!PG_ARGISNULL(argno))
      {
+         MemoryContext oldcxt = MemoryContextSwitchTo(resultcxt);
+
          eah = PG_GETARG_EXPANDED_ARRAYX(argno, my_extra);
+         MemoryContextSwitchTo(oldcxt);
      }
      else
      {
*************** fetch_array_arg_replace_nulls(FunctionCa
*** 72,78 ****
                       errmsg("input data type is not an array")));

          eah = construct_empty_expanded_array(element_type,
!                                              CurrentMemoryContext,
                                               my_extra);
      }

--- 84,90 ----
                       errmsg("input data type is not an array")));

          eah = construct_empty_expanded_array(element_type,
!                                              resultcxt,
                                               my_extra);
      }

