Re: Speed of user-defined aggregates using array_append as transfn - Mailing list pgsql-hackers

From Tom Lane
Subject Re: Speed of user-defined aggregates using array_append as transfn
Date
Msg-id 18705.1477686897@sss.pgh.pa.us
Whole thread Raw
In response to Speed of user-defined aggregates using array_append as transfn  (Tom Lane <tgl@sss.pgh.pa.us>)
Responses Re: Speed of user-defined aggregates using array_append as transfn  (Tom Lane <tgl@sss.pgh.pa.us>)
List pgsql-hackers
I wrote:
> 3. We could try to fix it mostly from array_append's side, by teaching it
> to return the expanded array in the aggregation context when it's being
> called as an aggregate function, and making some
> hopefully-not-performance-killing tweaks to nodeAgg to play along with
> that.  This would amount to additional complication in the API contract
> for C-coded aggregate functions, but I think it wouldn't affect functions
> that weren't attempting to invoke the optimization, so it should be OK.
> A larger objection is that it still doesn't do anything for the memory
> consumption angle.

Hm, that actually seems to work, at least from a speed standpoint; see
the attached not-terribly-well-documented patch.  The additions to nodeAgg
are only in places where we're going to be doing datum copying or freeing
anyway.

I'm still worried about chewing up a kilobyte-at-least for each transition
value, but maybe that's something we could leave to fix later.  Another
idea is that we could teach the planner to know about that in its hash
table size estimates.

            regards, tom lane

diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index b06e1c1..e0288ba 100644
*** a/src/backend/executor/nodeAgg.c
--- b/src/backend/executor/nodeAgg.c
*************** advance_transition_function(AggState *ag
*** 791,798 ****

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * pfree the prior transValue.  But if transfn returned a pointer to its
!      * first input, we don't need to do anything.
       */
      if (!pertrans->transtypeByVal &&
          DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
--- 791,800 ----

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * free the prior transValue.  But if transfn returned a pointer to its
!      * first input, we don't need to do anything.  Also, if transfn returned a
!      * pointer to a R/W expanded object that is already a child of the
!      * aggcontext, assume we can adopt that value without copying it.
       */
      if (!pertrans->transtypeByVal &&
          DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
*************** advance_transition_function(AggState *ag
*** 800,811 ****
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!             newVal = datumCopy(newVal,
!                                pertrans->transtypeByVal,
!                                pertrans->transtypeLen);
          }
          if (!pergroupstate->transValueIsNull)
!             pfree(DatumGetPointer(pergroupstate->transValue));
      }

      pergroupstate->transValue = newVal;
--- 802,826 ----
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!             if (DatumIsReadWriteExpandedObject(newVal,
!                                                false,
!                                                pertrans->transtypeLen) &&
!                 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
!                  /* do nothing */ ;
!             else
!                 newVal = datumCopy(newVal,
!                                    pertrans->transtypeByVal,
!                                    pertrans->transtypeLen);
          }
          if (!pergroupstate->transValueIsNull)
!         {
!             if (DatumIsReadWriteExpandedObject(pergroupstate->transValue,
!                                                false,
!                                                pertrans->transtypeLen))
!                 DeleteExpandedObject(pergroupstate->transValue);
!             else
!                 pfree(DatumGetPointer(pergroupstate->transValue));
!         }
      }

      pergroupstate->transValue = newVal;
*************** advance_combine_function(AggState *aggst
*** 1053,1060 ****

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * pfree the prior transValue.  But if the combine function returned a
!      * pointer to its first input, we don't need to do anything.
       */
      if (!pertrans->transtypeByVal &&
          DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
--- 1068,1078 ----

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * free the prior transValue.  But if the combine function returned a
!      * pointer to its first input, we don't need to do anything.  Also, if the
!      * combine function returned a pointer to a R/W expanded object that is
!      * already a child of the aggcontext, assume we can adopt that value
!      * without copying it.
       */
      if (!pertrans->transtypeByVal &&
          DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
*************** advance_combine_function(AggState *aggst
*** 1062,1073 ****
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!             newVal = datumCopy(newVal,
!                                pertrans->transtypeByVal,
!                                pertrans->transtypeLen);
          }
          if (!pergroupstate->transValueIsNull)
!             pfree(DatumGetPointer(pergroupstate->transValue));
      }

      pergroupstate->transValue = newVal;
--- 1080,1104 ----
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!             if (DatumIsReadWriteExpandedObject(newVal,
!                                                false,
!                                                pertrans->transtypeLen) &&
!                 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
!                  /* do nothing */ ;
!             else
!                 newVal = datumCopy(newVal,
!                                    pertrans->transtypeByVal,
!                                    pertrans->transtypeLen);
          }
          if (!pergroupstate->transValueIsNull)
!         {
!             if (DatumIsReadWriteExpandedObject(pergroupstate->transValue,
!                                                false,
!                                                pertrans->transtypeLen))
!                 DeleteExpandedObject(pergroupstate->transValue);
!             else
!                 pfree(DatumGetPointer(pergroupstate->transValue));
!         }
      }

      pergroupstate->transValue = newVal;
diff --git a/src/backend/utils/adt/array_userfuncs.c b/src/backend/utils/adt/array_userfuncs.c
index 1ef1500..8d6fa41 100644
*** a/src/backend/utils/adt/array_userfuncs.c
--- b/src/backend/utils/adt/array_userfuncs.c
*************** static Datum array_position_common(Funct
*** 32,37 ****
--- 32,41 ----
   * Caution: if the input is a read/write pointer, this returns the input
   * argument; so callers must be sure that their changes are "safe", that is
   * they cannot leave the array in a corrupt state.
+  *
+  * If we're being called as an aggregate function, make sure any newly-made
+  * expanded array is allocated in the aggregate state context, so as to save
+  * copying operations.
   */
  static ExpandedArrayHeader *
  fetch_array_arg_replace_nulls(FunctionCallInfo fcinfo, int argno)
*************** fetch_array_arg_replace_nulls(FunctionCa
*** 39,44 ****
--- 43,49 ----
      ExpandedArrayHeader *eah;
      Oid            element_type;
      ArrayMetaState *my_extra;
+     MemoryContext resultcxt;

      /* If first time through, create datatype cache struct */
      my_extra = (ArrayMetaState *) fcinfo->flinfo->fn_extra;
*************** fetch_array_arg_replace_nulls(FunctionCa
*** 51,60 ****
--- 56,72 ----
          fcinfo->flinfo->fn_extra = my_extra;
      }

+     /* Figure out which context we want the result in */
+     if (!AggCheckCallContext(fcinfo, &resultcxt))
+         resultcxt = CurrentMemoryContext;
+
      /* Now collect the array value */
      if (!PG_ARGISNULL(argno))
      {
+         MemoryContext oldcxt = MemoryContextSwitchTo(resultcxt);
+
          eah = PG_GETARG_EXPANDED_ARRAYX(argno, my_extra);
+         MemoryContextSwitchTo(oldcxt);
      }
      else
      {
*************** fetch_array_arg_replace_nulls(FunctionCa
*** 72,78 ****
                       errmsg("input data type is not an array")));

          eah = construct_empty_expanded_array(element_type,
!                                              CurrentMemoryContext,
                                               my_extra);
      }

--- 84,90 ----
                       errmsg("input data type is not an array")));

          eah = construct_empty_expanded_array(element_type,
!                                              resultcxt,
                                               my_extra);
      }


pgsql-hackers by date:

Previous
From: Jim Nasby
Date:
Subject: Re: emergency outage requiring database restart
Next
From: Merlin Moncure
Date:
Subject: Re: emergency outage requiring database restart