Thread: Speed of user-defined aggregates using array_append as transfn

Speed of user-defined aggregates using array_append as transfn

From: Tom Lane
Date:
I looked into this complaint:
https://www.postgresql.org/message-id/1477487162344-5927751.post@n3.nabble.com
which boils down to array_append being a lot slower in 9.5 & up than it
was before.  Now, using array_append as an aggregate transfn has never
been a very good idea, because it's inherently O(N^2) in the number of
input rows.  But the constant factor is something like 15x worse as of
9.5.  For example you can try this simple case:

create aggregate adims(anyelement)
( sfunc = array_append,
  stype = anyarray,
  finalfunc = array_dims
);

In 9.4 I get times like this:

regression=# select adims(x) from generate_series(1,1000) x;
Time: 3.176 ms
regression=# select adims(x) from generate_series(1,10000) x;
Time: 62.792 ms
regression=# select adims(x) from generate_series(1,100000) x;
Time: 7318.999 ms

but 9.5 does this:

regression=# select adims(x) from generate_series(1,1000) x;
Time: 17.532 ms
regression=# select adims(x) from generate_series(1,10000) x;
Time: 1146.686 ms
regression=# select adims(x) from generate_series(1,100000) x;
Time: 113892.993 ms

For comparison, the nominally equivalent array_agg() is much faster,
giving roughly linear performance in either 9.4 or 9.5:

regression=# select array_dims(array_agg(x)) from generate_series(1,1000) x;
Time: 2.197 ms
regression=# select array_dims(array_agg(x)) from generate_series(1,10000) x;
Time: 6.353 ms
regression=# select array_dims(array_agg(x)) from generate_series(1,100000) x;
Time: 53.198 ms
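Why the per-row array_append approach is quadratic while array_agg() stays roughly linear can be sketched with a toy copy-counting model in C. This is not PostgreSQL code. `flat_append_copies` assumes each row builds a brand-new flat array and copies every existing element, roughly what a transfn returning a freshly built flat array has to do. `growable_append_copies` assumes an accumulator that appends in place and only copies when it grows. The doubling policy and the initial capacity of 8 are assumptions of the toy.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Toy model (not PostgreSQL code): count element copies needed to
 * accumulate nrows values into an int array. */

/* Each row allocates a fresh array of n+1 slots and copies the n old
 * elements, like a transfn that returns a newly built flat array. */
size_t flat_append_copies(size_t nrows)
{
    size_t copies = 0;
    int *arr = NULL;

    for (size_t n = 0; n < nrows; n++)
    {
        int *next = malloc((n + 1) * sizeof(int));

        if (next == NULL)
            abort();
        if (n > 0)
            memcpy(next, arr, n * sizeof(int));
        copies += n;
        next[n] = (int) n;
        free(arr);
        arr = next;
    }
    free(arr);
    return copies;
}

/* In-place accumulator with capacity doubling (assumed policy): elements
 * are copied only when the buffer has to grow. */
size_t growable_append_copies(size_t nrows)
{
    size_t copies = 0;
    size_t cap = 0;
    int *arr = NULL;

    for (size_t n = 0; n < nrows; n++)
    {
        if (n == cap)
        {
            size_t newcap = (cap == 0) ? 8 : cap * 2;
            int *next = malloc(newcap * sizeof(int));

            if (next == NULL)
                abort();
            if (n > 0)
                memcpy(next, arr, n * sizeof(int));
            copies += n;
            free(arr);
            arr = next;
            cap = newcap;
        }
        arr[n] = (int) n;
    }
    free(arr);
    return copies;
}
```

With these definitions, 1000 rows cost 499500 element copies in the flat model (1000 * 999 / 2) but only 1016 in the doubling model (8 + 16 + ... + 512). The doubling model always stays below twice the row count. That contrast is consistent with the quadratic growth of the adims() timings and the near-linear growth of the array_agg() timings above.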

The reason for the slowdown is that in 9.5, array_append prefers to work
on "expanded arrays", and we're converting back and forth between expanded
and flat array datums at each row of the aggregation.  Ick.

We knew that the expanded-array patch would be giving up performance in
some cases to win it in others, but it'd be nice if it weren't giving up
this much in a case that users actually care about.

If we could tell people not to use array_append as a transition function,
then maybe we would not have to solve this, but array_agg_transfn isn't
a very plausible solution for user-defined aggregates because of its
non-type-safe use of an "internal"-type transvalue.  That means you need
superuser privileges to create the aggregate, and you can't code your
final-function in a PL language.

I thought about various ways that we might fix this, but they all have
disadvantages of one sort or another:

1. We could add a code path in array_append that does it the old way
if it's being called as an aggregate function.  This would get us back
to 9.4-ish performance for this case ... not that that's very good.

2. We could teach nodeAgg.c to deal with expanded-object datums
explicitly, in more or less the same way that plpgsql deals with expanded
values in plpgsql variables.  I made a draft patch for this (attached),
and find that it puts this user-defined aggregate on par with array_agg
speed-wise.  But I'm not too happy with it because it adds some cycles to
advance_transition_function() whether or not we're actually dealing with
an expanded object.  We already know that adding even one instruction
there can make for measurable slowdown.  (The draft patch leaves some
things on the table there; in particular, we could arrange not to call
MakeExpandedObjectReadOnly if the transvalue isn't of varlena type.
But there's still going to be some added overhead.)  Another problem
with this approach is that in hashed aggregation, an expanded object
per group might eat more storage than you'd like.  We had to teach
array_agg_transfn not to use a separate memory context per aggregation
value, and that issue would come up here too.  Lastly, the draft patch
hard-wires the optimization to be available only to array_append and
array_prepend.  That's the same as what we did in plpgsql, but it wasn't
very nice there and it's not nice here either.

3. We could try to fix it mostly from array_append's side, by teaching it
to return the expanded array in the aggregation context when it's being
called as an aggregate function, and making some
hopefully-not-performance-killing tweaks to nodeAgg to play along with
that.  This would amount to additional complication in the API contract
for C-coded aggregate functions, but I think it wouldn't affect functions
that weren't attempting to invoke the optimization, so it should be OK.
A larger objection is that it still doesn't do anything for the memory
consumption angle.
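Options 2 and 3 both hinge on letting the transition function modify its state in place instead of copying it on every row. A toy model in C makes the cost difference concrete. The types and functions are hypothetical stand-ins, not PostgreSQL's expanded-object API. `ToyDatum` carries a `read_write` bit, loosely like a R/W versus R/O expanded pointer. Clearing that bit plays the role of MakeExpandedObjectReadOnly, and `allow_rw` plays the role of the draft patch's transfnAllowRW.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical toy "expanded array": a growable int buffer. */
typedef struct ToyArray
{
    int    *vals;
    size_t  n;
    size_t  cap;
} ToyArray;

/* Hypothetical toy datum: a pointer plus a read/write permission bit. */
typedef struct ToyDatum
{
    ToyArray *arr;
    bool      read_write;
} ToyDatum;

/* Elements copied because the transfn was not allowed to modify its
 * input (growth inside toy_push is deliberately not counted). */
static size_t toy_elements_copied = 0;

static void toy_push(ToyArray *a, int v)
{
    if (a->n == a->cap)
    {
        size_t newcap = (a->cap == 0) ? 8 : a->cap * 2;
        int *vals = realloc(a->vals, newcap * sizeof(int));

        if (vals == NULL)
            abort();
        a->vals = vals;
        a->cap = newcap;
    }
    a->vals[a->n++] = v;
}

/* Toy transfn: append in place if handed a R/W pointer; otherwise build
 * a private copy first, since a read-only input must not be modified. */
static ToyDatum toy_append(ToyDatum state, int v)
{
    if (!state.read_write)
    {
        ToyArray *copy = calloc(1, sizeof(ToyArray));

        if (copy == NULL)
            abort();
        for (size_t i = 0; i < state.arr->n; i++)
            toy_push(copy, state.arr->vals[i]);
        toy_elements_copied += state.arr->n;
        state.arr = copy;
        state.read_write = true;
    }
    toy_push(state.arr, v);
    return state;
}

static void toy_free(ToyArray *a)
{
    free(a->vals);
    free(a);
}

/* Toy executor loop: pass the state R/W only when allow_rw is set, and
 * free the old state whenever the transfn returned a different one. */
size_t toy_aggregate(size_t nrows, bool allow_rw)
{
    ToyDatum trans = { calloc(1, sizeof(ToyArray)), true };

    if (trans.arr == NULL)
        abort();
    toy_elements_copied = 0;
    for (size_t i = 0; i < nrows; i++)
    {
        ToyDatum arg = trans;
        ToyDatum result;

        arg.read_write = allow_rw;  /* "make read-only" unless allowed */
        result = toy_append(arg, (int) i);
        if (result.arr != trans.arr)
            toy_free(trans.arr);
        trans = result;
    }
    toy_free(trans.arr);
    return toy_elements_copied;
}
```

For 1000 rows, `toy_aggregate(1000, false)` reports 499500 copied elements while `toy_aggregate(1000, true)` reports 0, which is the quadratic-versus-linear gap the options above try to close for array_append.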

I have some other ideas about things we could do going forward, but they
don't seem likely to lead to back-patchable fixes.  The memory consumption
problem could be dealt with if we allowed expanded objects to sometimes
not have their own context, but that would involve API changes for
expanded objects.  And I'm wondering whether, if we did fix all this,
we could get rid of ArrayBuildState entirely in favor of making the
functions that use that build an expanded array directly.  And maybe
we could get rid of array_agg_transfn in favor of using array_append,
eliminating the need for a non-type-safe solution there.

Comments, better ideas?

            regards, tom lane

PS: note this draft patch doesn't touch nodeWindowAgg.c, but that would
presumably need changes parallel to whatever we do in nodeAgg.c.

diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index b06e1c1..8c74781 100644
*** a/src/backend/executor/nodeAgg.c
--- b/src/backend/executor/nodeAgg.c
***************
*** 164,169 ****
--- 164,170 ----
  #include "parser/parse_coerce.h"
  #include "utils/acl.h"
  #include "utils/builtins.h"
+ #include "utils/fmgroids.h"
  #include "utils/lsyscache.h"
  #include "utils/memutils.h"
  #include "utils/syscache.h"
*************** typedef struct AggStatePerTransData
*** 288,293 ****
--- 289,300 ----
                  transtypeByVal;

      /*
+      * This flag says whether we trust the transfn with a read-write pointer
+      * to a transition value that is an expanded object.
+      */
+     bool        transfnAllowRW;
+
+     /*
       * Stuff for evaluation of inputs.  We used to just use ExecEvalExpr, but
       * with the addition of ORDER BY we now need at least a slot for passing
       * data to the sort object, which requires a tupledesc, so we might as
*************** advance_transition_function(AggState *ag
*** 752,760 ****
               */
              oldContext = MemoryContextSwitchTo(
                                                 aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!             pergroupstate->transValue = datumCopy(fcinfo->arg[1],
!                                                   pertrans->transtypeByVal,
!                                                   pertrans->transtypeLen);
              pergroupstate->transValueIsNull = false;
              pergroupstate->noTransValue = false;
              MemoryContextSwitchTo(oldContext);
--- 759,767 ----
               */
              oldContext = MemoryContextSwitchTo(
                                                 aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!             pergroupstate->transValue = datumTransfer(fcinfo->arg[1],
!                                                     pertrans->transtypeByVal,
!                                                       pertrans->transtypeLen);
              pergroupstate->transValueIsNull = false;
              pergroupstate->noTransValue = false;
              MemoryContextSwitchTo(oldContext);
*************** advance_transition_function(AggState *ag
*** 781,787 ****
      /*
       * OK to call the transition function
       */
!     fcinfo->arg[0] = pergroupstate->transValue;
      fcinfo->argnull[0] = pergroupstate->transValueIsNull;
      fcinfo->isnull = false;        /* just in case transfn doesn't set it */

--- 788,799 ----
      /*
       * OK to call the transition function
       */
!     if (pertrans->transfnAllowRW)
!         fcinfo->arg[0] = pergroupstate->transValue;
!     else
!         fcinfo->arg[0] = MakeExpandedObjectReadOnly(pergroupstate->transValue,
!                                              pergroupstate->transValueIsNull,
!                                                     pertrans->transtypeLen);
      fcinfo->argnull[0] = pergroupstate->transValueIsNull;
      fcinfo->isnull = false;        /* just in case transfn doesn't set it */

*************** advance_transition_function(AggState *ag
*** 800,811 ****
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!             newVal = datumCopy(newVal,
!                                pertrans->transtypeByVal,
!                                pertrans->transtypeLen);
          }
          if (!pergroupstate->transValueIsNull)
!             pfree(DatumGetPointer(pergroupstate->transValue));
      }

      pergroupstate->transValue = newVal;
--- 812,830 ----
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!             newVal = datumTransfer(newVal,
!                                    pertrans->transtypeByVal,
!                                    pertrans->transtypeLen);
          }
          if (!pergroupstate->transValueIsNull)
!         {
!             if (DatumIsReadWriteExpandedObject(pergroupstate->transValue,
!                                                false,
!                                                pertrans->transtypeLen))
!                 DeleteExpandedObject(pergroupstate->transValue);
!             else
!                 pfree(DatumGetPointer(pergroupstate->transValue));
!         }
      }

      pergroupstate->transValue = newVal;
*************** advance_combine_function(AggState *aggst
*** 1020,1028 ****
              {
                  oldContext = MemoryContextSwitchTo(

aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!                 pergroupstate->transValue = datumCopy(fcinfo->arg[1],
                                                      pertrans->transtypeByVal,
!                                                       pertrans->transtypeLen);
                  MemoryContextSwitchTo(oldContext);
              }
              else
--- 1039,1047 ----
              {
                  oldContext = MemoryContextSwitchTo(

aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!                 pergroupstate->transValue = datumTransfer(fcinfo->arg[1],
                                                      pertrans->transtypeByVal,
!                                                      pertrans->transtypeLen);
                  MemoryContextSwitchTo(oldContext);
              }
              else
*************** advance_combine_function(AggState *aggst
*** 1043,1049 ****
      /*
       * OK to call the combine function
       */
!     fcinfo->arg[0] = pergroupstate->transValue;
      fcinfo->argnull[0] = pergroupstate->transValueIsNull;
      fcinfo->isnull = false;        /* just in case combine func doesn't set it */

--- 1062,1073 ----
      /*
       * OK to call the combine function
       */
!     if (pertrans->transfnAllowRW)
!         fcinfo->arg[0] = pergroupstate->transValue;
!     else
!         fcinfo->arg[0] = MakeExpandedObjectReadOnly(pergroupstate->transValue,
!                                              pergroupstate->transValueIsNull,
!                                                     pertrans->transtypeLen);
      fcinfo->argnull[0] = pergroupstate->transValueIsNull;
      fcinfo->isnull = false;        /* just in case combine func doesn't set it */

*************** advance_combine_function(AggState *aggst
*** 1062,1073 ****
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!             newVal = datumCopy(newVal,
!                                pertrans->transtypeByVal,
!                                pertrans->transtypeLen);
          }
          if (!pergroupstate->transValueIsNull)
!             pfree(DatumGetPointer(pergroupstate->transValue));
      }

      pergroupstate->transValue = newVal;
--- 1086,1104 ----
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!             newVal = datumTransfer(newVal,
!                                    pertrans->transtypeByVal,
!                                    pertrans->transtypeLen);
          }
          if (!pergroupstate->transValueIsNull)
!         {
!             if (DatumIsReadWriteExpandedObject(pergroupstate->transValue,
!                                                false,
!                                                pertrans->transtypeLen))
!                 DeleteExpandedObject(pergroupstate->transValue);
!             else
!                 pfree(DatumGetPointer(pergroupstate->transValue));
!         }
      }

      pergroupstate->transValue = newVal;
*************** finalize_aggregate(AggState *aggstate,
*** 1333,1339 ****
                                   (void *) aggstate, NULL);

          /* Fill in the transition state value */
!         fcinfo.arg[0] = pergroupstate->transValue;
          fcinfo.argnull[0] = pergroupstate->transValueIsNull;
          anynull |= pergroupstate->transValueIsNull;

--- 1364,1372 ----
                                   (void *) aggstate, NULL);

          /* Fill in the transition state value */
!         fcinfo.arg[0] = MakeExpandedObjectReadOnly(pergroupstate->transValue,
!                                              pergroupstate->transValueIsNull,
!                                                    pertrans->transtypeLen);
          fcinfo.argnull[0] = pergroupstate->transValueIsNull;
          anynull |= pergroupstate->transValueIsNull;

*************** finalize_aggregate(AggState *aggstate,
*** 1360,1366 ****
      }
      else
      {
!         *resultVal = pergroupstate->transValue;
          *resultIsNull = pergroupstate->transValueIsNull;
      }

--- 1393,1401 ----
      }
      else
      {
!         *resultVal = MakeExpandedObjectReadOnly(pergroupstate->transValue,
!                                              pergroupstate->transValueIsNull,
!                                                 pertrans->transtypeLen);
          *resultIsNull = pergroupstate->transValueIsNull;
      }

*************** finalize_partialaggregate(AggState *aggs
*** 1410,1416 ****
          {
              FunctionCallInfo fcinfo = &pertrans->serialfn_fcinfo;

!             fcinfo->arg[0] = pergroupstate->transValue;
              fcinfo->argnull[0] = pergroupstate->transValueIsNull;

              *resultVal = FunctionCallInvoke(fcinfo);
--- 1445,1453 ----
          {
              FunctionCallInfo fcinfo = &pertrans->serialfn_fcinfo;

!             fcinfo->arg[0] = MakeExpandedObjectReadOnly(pergroupstate->transValue,
!                                              pergroupstate->transValueIsNull,
!                                                      pertrans->transtypeLen);
              fcinfo->argnull[0] = pergroupstate->transValueIsNull;

              *resultVal = FunctionCallInvoke(fcinfo);
*************** finalize_partialaggregate(AggState *aggs
*** 1419,1425 ****
      }
      else
      {
!         *resultVal = pergroupstate->transValue;
          *resultIsNull = pergroupstate->transValueIsNull;
      }

--- 1456,1464 ----
      }
      else
      {
!         *resultVal = MakeExpandedObjectReadOnly(pergroupstate->transValue,
!                                              pergroupstate->transValueIsNull,
!                                                 pertrans->transtypeLen);
          *resultIsNull = pergroupstate->transValueIsNull;
      }

*************** build_pertrans_for_aggref(AggStatePerTra
*** 3029,3034 ****
--- 3068,3077 ----
                      &pertrans->transtypeLen,
                      &pertrans->transtypeByVal);

+     /* detect whether we'd like to pass read/write expanded pointers */
+     pertrans->transfnAllowRW = (aggtransfn == F_ARRAY_APPEND ||
+                                 aggtransfn == F_ARRAY_PREPEND);
+
      if (OidIsValid(aggserialfn))
      {
          build_aggregate_serialfn_expr(aggserialfn,

Re: Speed of user-defined aggregates using array_append as transfn

From: Andrew Dunstan
Date:

On 10/28/2016 02:04 PM, Tom Lane wrote:
> Comments, better ideas?

My initial admittedly ugly thought was why not have a second append 
function that doesn't use expanded arrays?

cheers

andrew




Re: Speed of user-defined aggregates using array_append as transfn

From: Tom Lane
Date:
Andrew Dunstan <andrew@dunslane.net> writes:
> My initial admittedly ugly thought was why not have a second append 
> function that doesn't use expanded arrays?

That won't get us out of the O(N^2) behavior.  Also I don't see what's
better about it than my suggestion of making array_append itself do
that when called as an aggregate function.
        regards, tom lane



Re: Speed of user-defined aggregates using array_append as transfn

From: Andrew Dunstan
Date:

On 10/28/2016 03:14 PM, Tom Lane wrote:
> Andrew Dunstan <andrew@dunslane.net> writes:
>> My initial admittedly ugly thought was why not have a second append
>> function that doesn't use expanded arrays?
> That won't get us out of the O(N^2) behavior.  Also I don't see what's
> better about it than my suggestion of making array_append itself do
> that when called as an aggregate function.

Probably nothing; I was just thinking out loud. That suggestion seems
like the most obviously back-patchable solution.

cheers

andrew



Re: Speed of user-defined aggregates using array_append as transfn

From: Tom Lane
Date:
I wrote:
> 3. We could try to fix it mostly from array_append's side, by teaching it
> to return the expanded array in the aggregation context when it's being
> called as an aggregate function, and making some
> hopefully-not-performance-killing tweaks to nodeAgg to play along with
> that.  This would amount to additional complication in the API contract
> for C-coded aggregate functions, but I think it wouldn't affect functions
> that weren't attempting to invoke the optimization, so it should be OK.
> A larger objection is that it still doesn't do anything for the memory
> consumption angle.

Hm, that actually seems to work, at least from a speed standpoint; see
the attached not-terribly-well-documented patch.  The additions to nodeAgg
are only in places where we're going to be doing datum copying or freeing
anyway.
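The adopt-or-copy check that this patch adds to advance_transition_function() can be restated as a pure decision function. This is a toy model with hypothetical types, not the executor's code. `ToyContext` stands in for a MemoryContext with only a parent link, and `ToyExpanded` stands in for an expanded-object header whose `own_context` plays the role of eoh_context. Null handling and pass-by-value types, which the real code checks first, are ignored.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a MemoryContext: only the parent link matters. */
typedef struct ToyContext
{
    struct ToyContext *parent;
} ToyContext;

/* Hypothetical stand-in for an expanded object reached via some pointer. */
typedef struct ToyExpanded
{
    bool        read_write;   /* true if we hold a R/W pointer */
    ToyContext *own_context;  /* the object's private context */
} ToyExpanded;

typedef enum
{
    TOY_KEEP,   /* transfn returned its first input: nothing to do */
    TOY_ADOPT,  /* R/W object already lives under aggcontext: take it as is */
    TOY_COPY    /* anything else: copy into aggcontext, free the old value */
} ToyAction;

ToyAction toy_store_decision(const ToyExpanded *old_val,
                             const ToyExpanded *new_val,
                             const ToyContext *aggcontext)
{
    if (new_val == old_val)
        return TOY_KEEP;
    if (new_val->read_write &&
        new_val->own_context != NULL &&
        new_val->own_context->parent == aggcontext)
        return TOY_ADOPT;
    return TOY_COPY;
}
```

In the intended steady state, the first row produces a new array in a child of aggcontext (ADOPT), and every later row gets the same R/W pointer back (KEEP), so no per-row copy remains. A value built anywhere else, or handed back read-only, still takes the COPY path.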

I'm still worried about chewing up a kilobyte-at-least for each transition
value, but maybe that's something we could leave to fix later.  Another
idea is that we could teach the planner to know about that in its hash
table size estimates.

            regards, tom lane

diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index b06e1c1..e0288ba 100644
*** a/src/backend/executor/nodeAgg.c
--- b/src/backend/executor/nodeAgg.c
*************** advance_transition_function(AggState *ag
*** 791,798 ****

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * pfree the prior transValue.  But if transfn returned a pointer to its
!      * first input, we don't need to do anything.
       */
      if (!pertrans->transtypeByVal &&
          DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
--- 791,800 ----

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * free the prior transValue.  But if transfn returned a pointer to its
!      * first input, we don't need to do anything.  Also, if transfn returned a
!      * pointer to a R/W expanded object that is already a child of the
!      * aggcontext, assume we can adopt that value without copying it.
       */
      if (!pertrans->transtypeByVal &&
          DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
*************** advance_transition_function(AggState *ag
*** 800,811 ****
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!             newVal = datumCopy(newVal,
!                                pertrans->transtypeByVal,
!                                pertrans->transtypeLen);
          }
          if (!pergroupstate->transValueIsNull)
!             pfree(DatumGetPointer(pergroupstate->transValue));
      }

      pergroupstate->transValue = newVal;
--- 802,826 ----
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!             if (DatumIsReadWriteExpandedObject(newVal,
!                                                false,
!                                                pertrans->transtypeLen) &&
!                 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
!                  /* do nothing */ ;
!             else
!                 newVal = datumCopy(newVal,
!                                    pertrans->transtypeByVal,
!                                    pertrans->transtypeLen);
          }
          if (!pergroupstate->transValueIsNull)
!         {
!             if (DatumIsReadWriteExpandedObject(pergroupstate->transValue,
!                                                false,
!                                                pertrans->transtypeLen))
!                 DeleteExpandedObject(pergroupstate->transValue);
!             else
!                 pfree(DatumGetPointer(pergroupstate->transValue));
!         }
      }

      pergroupstate->transValue = newVal;
*************** advance_combine_function(AggState *aggst
*** 1053,1060 ****

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * pfree the prior transValue.  But if the combine function returned a
!      * pointer to its first input, we don't need to do anything.
       */
      if (!pertrans->transtypeByVal &&
          DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
--- 1068,1078 ----

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * free the prior transValue.  But if the combine function returned a
!      * pointer to its first input, we don't need to do anything.  Also, if the
!      * combine function returned a pointer to a R/W expanded object that is
!      * already a child of the aggcontext, assume we can adopt that value
!      * without copying it.
       */
      if (!pertrans->transtypeByVal &&
          DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
*************** advance_combine_function(AggState *aggst
*** 1062,1073 ****
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!             newVal = datumCopy(newVal,
!                                pertrans->transtypeByVal,
!                                pertrans->transtypeLen);
          }
          if (!pergroupstate->transValueIsNull)
!             pfree(DatumGetPointer(pergroupstate->transValue));
      }

      pergroupstate->transValue = newVal;
--- 1080,1104 ----
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!             if (DatumIsReadWriteExpandedObject(newVal,
!                                                false,
!                                                pertrans->transtypeLen) &&
!                 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
!                  /* do nothing */ ;
!             else
!                 newVal = datumCopy(newVal,
!                                    pertrans->transtypeByVal,
!                                    pertrans->transtypeLen);
          }
          if (!pergroupstate->transValueIsNull)
!         {
!             if (DatumIsReadWriteExpandedObject(pergroupstate->transValue,
!                                                false,
!                                                pertrans->transtypeLen))
!                 DeleteExpandedObject(pergroupstate->transValue);
!             else
!                 pfree(DatumGetPointer(pergroupstate->transValue));
!         }
      }

      pergroupstate->transValue = newVal;
diff --git a/src/backend/utils/adt/array_userfuncs.c b/src/backend/utils/adt/array_userfuncs.c
index 1ef1500..8d6fa41 100644
*** a/src/backend/utils/adt/array_userfuncs.c
--- b/src/backend/utils/adt/array_userfuncs.c
*************** static Datum array_position_common(Funct
*** 32,37 ****
--- 32,41 ----
   * Caution: if the input is a read/write pointer, this returns the input
   * argument; so callers must be sure that their changes are "safe", that is
   * they cannot leave the array in a corrupt state.
+  *
+  * If we're being called as an aggregate function, make sure any newly-made
+  * expanded array is allocated in the aggregate state context, so as to save
+  * copying operations.
   */
  static ExpandedArrayHeader *
  fetch_array_arg_replace_nulls(FunctionCallInfo fcinfo, int argno)
*************** fetch_array_arg_replace_nulls(FunctionCa
*** 39,44 ****
--- 43,49 ----
      ExpandedArrayHeader *eah;
      Oid            element_type;
      ArrayMetaState *my_extra;
+     MemoryContext resultcxt;

      /* If first time through, create datatype cache struct */
      my_extra = (ArrayMetaState *) fcinfo->flinfo->fn_extra;
*************** fetch_array_arg_replace_nulls(FunctionCa
*** 51,60 ****
--- 56,72 ----
          fcinfo->flinfo->fn_extra = my_extra;
      }

+     /* Figure out which context we want the result in */
+     if (!AggCheckCallContext(fcinfo, &resultcxt))
+         resultcxt = CurrentMemoryContext;
+
      /* Now collect the array value */
      if (!PG_ARGISNULL(argno))
      {
+         MemoryContext oldcxt = MemoryContextSwitchTo(resultcxt);
+
          eah = PG_GETARG_EXPANDED_ARRAYX(argno, my_extra);
+         MemoryContextSwitchTo(oldcxt);
      }
      else
      {
*************** fetch_array_arg_replace_nulls(FunctionCa
*** 72,78 ****
                       errmsg("input data type is not an array")));

          eah = construct_empty_expanded_array(element_type,
!                                              CurrentMemoryContext,
                                               my_extra);
      }

--- 84,90 ----
                       errmsg("input data type is not an array")));

          eah = construct_empty_expanded_array(element_type,
!                                              resultcxt,
                                               my_extra);
      }


Re: Speed of user-defined aggregates using array_append as transfn

From: Tom Lane
Date:
I wrote:
> I'm still worried about chewing up a kilobyte-at-least for each transition
> value, but maybe that's something we could leave to fix later.  Another
> idea is that we could teach the planner to know about that in its hash
> table size estimates.

Here's a complete proposed patch for this.  I decided to hard-wire the
planner adjustment to apply to array_append specifically.  One could
consider doing it whenever the aggregate transtype is an array type,
but that seems likely to be casting too wide a net for now.  We can
revisit it in the future if necessary.  In any case, the estimate
can be overridden per-aggregate using the aggtransspace parameter.
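The estimate described above can be modeled as a small priority rule. This is a hedged toy, not the planner's code. `toy_trans_space` and `toy_hash_trans_bytes` are hypothetical names, the 1024-byte floor is assumed from the "kilobyte-at-least" remark earlier in the thread, and per-entry hash-table overhead and alignment are ignored.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Assumed minimum footprint of an expanded-array transition value. */
#define TOY_MIN_EXPANDED_SPACE 1024

/* Per-group transition-space estimate: an explicit per-aggregate value
 * wins, then the array_append special case, else the type's usual width. */
size_t toy_trans_space(size_t aggtransspace, bool transfn_is_array_append,
                       size_t type_avg_width)
{
    if (aggtransspace > 0)
        return aggtransspace;
    if (transfn_is_array_append)
        return TOY_MIN_EXPANDED_SPACE;
    return type_avg_width;
}

/* Total transition-state bytes a hashed aggregation would need. */
size_t toy_hash_trans_bytes(size_t ngroups, size_t per_group)
{
    return ngroups * per_group;
}
```

With a 1024-byte floor, one million groups already account for about 1 GB (1,024,000,000 bytes) of transition state. At the SQL level, the per-aggregate override corresponds to the SSPACE option of CREATE AGGREGATE, which is stored in pg_aggregate.aggtransspace.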

Barring objections, I intend to back-patch this as far as 9.5.

            regards, tom lane

diff --git a/doc/src/sgml/xaggr.sgml b/doc/src/sgml/xaggr.sgml
index fa98572..d432c9a 100644
*** a/doc/src/sgml/xaggr.sgml
--- b/doc/src/sgml/xaggr.sgml
*************** if (AggCheckCallContext(fcinfo, NULL))
*** 626,632 ****
     function, the first input
     must be a temporary state value and can therefore safely be modified
     in-place rather than allocating a new copy.
!    See <literal>int8inc()</> for an example.
     (This is the <emphasis>only</>
     case where it is safe for a function to modify a pass-by-reference input.
     In particular, final functions for normal aggregates must not
--- 626,632 ----
     function, the first input
     must be a temporary state value and can therefore safely be modified
     in-place rather than allocating a new copy.
!    See <function>int8inc()</> for an example.
     (This is the <emphasis>only</>
     case where it is safe for a function to modify a pass-by-reference input.
     In particular, final functions for normal aggregates must not
*************** if (AggCheckCallContext(fcinfo, NULL))
*** 635,640 ****
--- 635,654 ----
    </para>

    <para>
+    The second argument of <function>AggCheckCallContext</> can be used to
+    retrieve the memory context in which aggregate state values are being kept.
+    This is useful for transition functions that wish to use <quote>expanded</>
+    objects (see <xref linkend="xtypes-toast">) as their transition values.
+    On first call, the transition function should return an expanded object
+    whose memory context is a child of the aggregate state context, and then
+    keep returning the same expanded object on subsequent calls.  See
+    <function>array_append()</> for an example.  (<function>array_append()</>
+    is not the transition function of any built-in aggregate, but it is written
+    to behave efficiently when used as transition function of a custom
+    aggregate.)
+   </para>
+
+   <para>
     Another support routine available to aggregate functions written in C
     is <function>AggGetAggref</>, which returns the <literal>Aggref</>
     parse node that defines the aggregate call.  This is mainly useful
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index b06e1c1..28c15ba 100644
*** a/src/backend/executor/nodeAgg.c
--- b/src/backend/executor/nodeAgg.c
***************
*** 91,100 ****
   *      transition value or a previous function result, and in either case its
   *      value need not be preserved.  See int8inc() for an example.  Notice that
   *      advance_transition_function() is coded to avoid a data copy step when
!  *      the previous transition value pointer is returned.  Also, some
!  *      transition functions want to store working state in addition to the
!  *      nominal transition value; they can use the memory context returned by
!  *      AggCheckCallContext() to do that.
   *
   *      Note: AggCheckCallContext() is available as of PostgreSQL 9.0.  The
   *      AggState is available as context in earlier releases (back to 8.1),
--- 91,103 ----
   *      transition value or a previous function result, and in either case its
   *      value need not be preserved.  See int8inc() for an example.  Notice that
   *      advance_transition_function() is coded to avoid a data copy step when
!  *      the previous transition value pointer is returned.  It is also possible
!  *      to avoid repeated data copying when the transition value is an expanded
!  *      object: to do that, the transition function must take care to return
!  *      an expanded object that is in a child context of the memory context
!  *      returned by AggCheckCallContext().  Also, some transition functions want
!  *      to store working state in addition to the nominal transition value; they
!  *      can use the memory context returned by AggCheckCallContext() to do that.
   *
   *      Note: AggCheckCallContext() is available as of PostgreSQL 9.0.  The
   *      AggState is available as context in earlier releases (back to 8.1),
*************** advance_transition_function(AggState *ag
*** 791,798 ****

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * pfree the prior transValue.  But if transfn returned a pointer to its
!      * first input, we don't need to do anything.
       */
      if (!pertrans->transtypeByVal &&
          DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
--- 794,803 ----

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * free the prior transValue.  But if transfn returned a pointer to its
!      * first input, we don't need to do anything.  Also, if transfn returned a
!      * pointer to a R/W expanded object that is already a child of the
!      * aggcontext, assume we can adopt that value without copying it.
       */
      if (!pertrans->transtypeByVal &&
          DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
*************** advance_transition_function(AggState *ag
*** 800,811 ****
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!             newVal = datumCopy(newVal,
!                                pertrans->transtypeByVal,
!                                pertrans->transtypeLen);
          }
          if (!pergroupstate->transValueIsNull)
!             pfree(DatumGetPointer(pergroupstate->transValue));
      }

      pergroupstate->transValue = newVal;
--- 805,829 ----
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!             if (DatumIsReadWriteExpandedObject(newVal,
!                                                false,
!                                                pertrans->transtypeLen) &&
!                 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
!                  /* do nothing */ ;
!             else
!                 newVal = datumCopy(newVal,
!                                    pertrans->transtypeByVal,
!                                    pertrans->transtypeLen);
          }
          if (!pergroupstate->transValueIsNull)
!         {
!             if (DatumIsReadWriteExpandedObject(pergroupstate->transValue,
!                                                false,
!                                                pertrans->transtypeLen))
!                 DeleteExpandedObject(pergroupstate->transValue);
!             else
!                 pfree(DatumGetPointer(pergroupstate->transValue));
!         }
      }

      pergroupstate->transValue = newVal;
*************** advance_combine_function(AggState *aggst
*** 1053,1060 ****

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * pfree the prior transValue.  But if the combine function returned a
!      * pointer to its first input, we don't need to do anything.
       */
      if (!pertrans->transtypeByVal &&
          DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
--- 1071,1081 ----

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * free the prior transValue.  But if the combine function returned a
!      * pointer to its first input, we don't need to do anything.  Also, if the
!      * combine function returned a pointer to a R/W expanded object that is
!      * already a child of the aggcontext, assume we can adopt that value
!      * without copying it.
       */
      if (!pertrans->transtypeByVal &&
          DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
*************** advance_combine_function(AggState *aggst
*** 1062,1073 ****
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!             newVal = datumCopy(newVal,
!                                pertrans->transtypeByVal,
!                                pertrans->transtypeLen);
          }
          if (!pergroupstate->transValueIsNull)
!             pfree(DatumGetPointer(pergroupstate->transValue));
      }

      pergroupstate->transValue = newVal;
--- 1083,1107 ----
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!             if (DatumIsReadWriteExpandedObject(newVal,
!                                                false,
!                                                pertrans->transtypeLen) &&
!                 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
!                  /* do nothing */ ;
!             else
!                 newVal = datumCopy(newVal,
!                                    pertrans->transtypeByVal,
!                                    pertrans->transtypeLen);
          }
          if (!pergroupstate->transValueIsNull)
!         {
!             if (DatumIsReadWriteExpandedObject(pergroupstate->transValue,
!                                                false,
!                                                pertrans->transtypeLen))
!                 DeleteExpandedObject(pergroupstate->transValue);
!             else
!                 pfree(DatumGetPointer(pergroupstate->transValue));
!         }
      }

      pergroupstate->transValue = newVal;
*************** finalize_aggregate(AggState *aggstate,
*** 1333,1339 ****
                                   (void *) aggstate, NULL);

          /* Fill in the transition state value */
!         fcinfo.arg[0] = pergroupstate->transValue;
          fcinfo.argnull[0] = pergroupstate->transValueIsNull;
          anynull |= pergroupstate->transValueIsNull;

--- 1367,1375 ----
                                   (void *) aggstate, NULL);

          /* Fill in the transition state value */
!         fcinfo.arg[0] = MakeExpandedObjectReadOnly(pergroupstate->transValue,
!                                              pergroupstate->transValueIsNull,
!                                                    pertrans->transtypeLen);
          fcinfo.argnull[0] = pergroupstate->transValueIsNull;
          anynull |= pergroupstate->transValueIsNull;

*************** finalize_aggregate(AggState *aggstate,
*** 1360,1365 ****
--- 1396,1402 ----
      }
      else
      {
+         /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
          *resultVal = pergroupstate->transValue;
          *resultIsNull = pergroupstate->transValueIsNull;
      }
*************** finalize_partialaggregate(AggState *aggs
*** 1410,1416 ****
          {
              FunctionCallInfo fcinfo = &pertrans->serialfn_fcinfo;

!             fcinfo->arg[0] = pergroupstate->transValue;
              fcinfo->argnull[0] = pergroupstate->transValueIsNull;

              *resultVal = FunctionCallInvoke(fcinfo);
--- 1447,1455 ----
          {
              FunctionCallInfo fcinfo = &pertrans->serialfn_fcinfo;

!             fcinfo->arg[0] = MakeExpandedObjectReadOnly(pergroupstate->transValue,
!                                              pergroupstate->transValueIsNull,
!                                                      pertrans->transtypeLen);
              fcinfo->argnull[0] = pergroupstate->transValueIsNull;

              *resultVal = FunctionCallInvoke(fcinfo);
*************** finalize_partialaggregate(AggState *aggs
*** 1419,1424 ****
--- 1458,1464 ----
      }
      else
      {
+         /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
          *resultVal = pergroupstate->transValue;
          *resultIsNull = pergroupstate->transValueIsNull;
      }
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index 96c8527..06f8aa0 100644
*** a/src/backend/executor/nodeWindowAgg.c
--- b/src/backend/executor/nodeWindowAgg.c
*************** advance_windowaggregate(WindowAggState *
*** 362,369 ****

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * pfree the prior transValue.  But if transfn returned a pointer to its
!      * first input, we don't need to do anything.
       */
      if (!peraggstate->transtypeByVal &&
          DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
--- 362,371 ----

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * free the prior transValue.  But if transfn returned a pointer to its
!      * first input, we don't need to do anything.  Also, if transfn returned a
!      * pointer to a R/W expanded object that is already a child of the
!      * aggcontext, assume we can adopt that value without copying it.
       */
      if (!peraggstate->transtypeByVal &&
          DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
*************** advance_windowaggregate(WindowAggState *
*** 371,382 ****
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(peraggstate->aggcontext);
!             newVal = datumCopy(newVal,
!                                peraggstate->transtypeByVal,
!                                peraggstate->transtypeLen);
          }
          if (!peraggstate->transValueIsNull)
!             pfree(DatumGetPointer(peraggstate->transValue));
      }

      MemoryContextSwitchTo(oldContext);
--- 373,397 ----
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(peraggstate->aggcontext);
!             if (DatumIsReadWriteExpandedObject(newVal,
!                                                false,
!                                                peraggstate->transtypeLen) &&
!                 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
!                  /* do nothing */ ;
!             else
!                 newVal = datumCopy(newVal,
!                                    peraggstate->transtypeByVal,
!                                    peraggstate->transtypeLen);
          }
          if (!peraggstate->transValueIsNull)
!         {
!             if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
!                                                false,
!                                                peraggstate->transtypeLen))
!                 DeleteExpandedObject(peraggstate->transValue);
!             else
!                 pfree(DatumGetPointer(peraggstate->transValue));
!         }
      }

      MemoryContextSwitchTo(oldContext);
*************** advance_windowaggregate_base(WindowAggSt
*** 513,520 ****

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * pfree the prior transValue.  But if invtransfn returned a pointer to
!      * its first input, we don't need to do anything.
       *
       * Note: the checks for null values here will never fire, but it seems
       * best to have this stanza look just like advance_windowaggregate.
--- 528,537 ----

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * free the prior transValue.  But if invtransfn returned a pointer to its
!      * first input, we don't need to do anything.  Also, if invtransfn
!      * returned a pointer to a R/W expanded object that is already a child of
!      * the aggcontext, assume we can adopt that value without copying it.
       *
       * Note: the checks for null values here will never fire, but it seems
       * best to have this stanza look just like advance_windowaggregate.
*************** advance_windowaggregate_base(WindowAggSt
*** 525,536 ****
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(peraggstate->aggcontext);
!             newVal = datumCopy(newVal,
!                                peraggstate->transtypeByVal,
!                                peraggstate->transtypeLen);
          }
          if (!peraggstate->transValueIsNull)
!             pfree(DatumGetPointer(peraggstate->transValue));
      }

      MemoryContextSwitchTo(oldContext);
--- 542,566 ----
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(peraggstate->aggcontext);
!             if (DatumIsReadWriteExpandedObject(newVal,
!                                                false,
!                                                peraggstate->transtypeLen) &&
!                 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
!                  /* do nothing */ ;
!             else
!                 newVal = datumCopy(newVal,
!                                    peraggstate->transtypeByVal,
!                                    peraggstate->transtypeLen);
          }
          if (!peraggstate->transValueIsNull)
!         {
!             if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
!                                                false,
!                                                peraggstate->transtypeLen))
!                 DeleteExpandedObject(peraggstate->transValue);
!             else
!                 pfree(DatumGetPointer(peraggstate->transValue));
!         }
      }

      MemoryContextSwitchTo(oldContext);
*************** finalize_windowaggregate(WindowAggState
*** 568,574 ****
                                   numFinalArgs,
                                   perfuncstate->winCollation,
                                   (void *) winstate, NULL);
!         fcinfo.arg[0] = peraggstate->transValue;
          fcinfo.argnull[0] = peraggstate->transValueIsNull;
          anynull = peraggstate->transValueIsNull;

--- 598,606 ----
                                   numFinalArgs,
                                   perfuncstate->winCollation,
                                   (void *) winstate, NULL);
!         fcinfo.arg[0] = MakeExpandedObjectReadOnly(peraggstate->transValue,
!                                                peraggstate->transValueIsNull,
!                                                    peraggstate->transtypeLen);
          fcinfo.argnull[0] = peraggstate->transValueIsNull;
          anynull = peraggstate->transValueIsNull;

*************** finalize_windowaggregate(WindowAggState
*** 596,601 ****
--- 628,634 ----
      }
      else
      {
+         /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
          *result = peraggstate->transValue;
          *isnull = peraggstate->transValueIsNull;
      }
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index 663ffe0..1688310 100644
*** a/src/backend/optimizer/util/clauses.c
--- b/src/backend/optimizer/util/clauses.c
*************** get_agg_clause_costs_walker(Node *node,
*** 647,652 ****
--- 647,662 ----
              /* Use average width if aggregate definition gave one */
              if (aggtransspace > 0)
                  avgwidth = aggtransspace;
+             else if (aggtransfn == F_ARRAY_APPEND)
+             {
+                 /*
+                  * If the transition function is array_append(), it'll use an
+                  * expanded array as transvalue, which will occupy at least
+                  * ALLOCSET_SMALL_INITSIZE and possibly more.  Use that as the
+                  * estimate for lack of a better idea.
+                  */
+                 avgwidth = ALLOCSET_SMALL_INITSIZE;
+             }
              else
              {
                  /*
diff --git a/src/backend/utils/adt/array_userfuncs.c b/src/backend/utils/adt/array_userfuncs.c
index 1ef1500..8d6fa41 100644
*** a/src/backend/utils/adt/array_userfuncs.c
--- b/src/backend/utils/adt/array_userfuncs.c
*************** static Datum array_position_common(Funct
*** 32,37 ****
--- 32,41 ----
   * Caution: if the input is a read/write pointer, this returns the input
   * argument; so callers must be sure that their changes are "safe", that is
   * they cannot leave the array in a corrupt state.
+  *
+  * If we're being called as an aggregate function, make sure any newly-made
+  * expanded array is allocated in the aggregate state context, so as to save
+  * copying operations.
   */
  static ExpandedArrayHeader *
  fetch_array_arg_replace_nulls(FunctionCallInfo fcinfo, int argno)
*************** fetch_array_arg_replace_nulls(FunctionCa
*** 39,44 ****
--- 43,49 ----
      ExpandedArrayHeader *eah;
      Oid            element_type;
      ArrayMetaState *my_extra;
+     MemoryContext resultcxt;

      /* If first time through, create datatype cache struct */
      my_extra = (ArrayMetaState *) fcinfo->flinfo->fn_extra;
*************** fetch_array_arg_replace_nulls(FunctionCa
*** 51,60 ****
--- 56,72 ----
          fcinfo->flinfo->fn_extra = my_extra;
      }

+     /* Figure out which context we want the result in */
+     if (!AggCheckCallContext(fcinfo, &resultcxt))
+         resultcxt = CurrentMemoryContext;
+
      /* Now collect the array value */
      if (!PG_ARGISNULL(argno))
      {
+         MemoryContext oldcxt = MemoryContextSwitchTo(resultcxt);
+
          eah = PG_GETARG_EXPANDED_ARRAYX(argno, my_extra);
+         MemoryContextSwitchTo(oldcxt);
      }
      else
      {
*************** fetch_array_arg_replace_nulls(FunctionCa
*** 72,78 ****
                       errmsg("input data type is not an array")));

          eah = construct_empty_expanded_array(element_type,
!                                              CurrentMemoryContext,
                                               my_extra);
      }

--- 84,90 ----
                       errmsg("input data type is not an array")));

          eah = construct_empty_expanded_array(element_type,
!                                              resultcxt,
                                               my_extra);
      }