diff --git a/doc/src/sgml/ref/create_aggregate.sgml b/doc/src/sgml/ref/create_aggregate.sgml
index eaa410b..f0e4407 100644
--- a/doc/src/sgml/ref/create_aggregate.sgml
+++ b/doc/src/sgml/ref/create_aggregate.sgml
@@ -27,6 +27,7 @@ CREATE AGGREGATE name ( [ state_data_size ]
[ , FINALFUNC = ffunc ]
[ , FINALFUNC_EXTRA ]
+ [ , CFUNC = cfunc ]
[ , INITCOND = initial_condition ]
[ , MSFUNC = msfunc ]
[ , MINVFUNC = minvfunc ]
@@ -45,6 +46,7 @@ CREATE AGGREGATE name ( [ [ state_data_size ]
[ , FINALFUNC = ffunc ]
[ , FINALFUNC_EXTRA ]
+ [ , CFUNC = cfunc ]
[ , INITCOND = initial_condition ]
[ , HYPOTHETICAL ]
)
@@ -58,6 +60,7 @@ CREATE AGGREGATE name (
[ , SSPACE = state_data_size ]
[ , FINALFUNC = ffunc ]
[ , FINALFUNC_EXTRA ]
+ [ , CFUNC = cfunc ]
[ , INITCOND = initial_condition ]
[ , MSFUNC = msfunc ]
[ , MINVFUNC = minvfunc ]
@@ -105,12 +108,15 @@ CREATE AGGREGATE name (
functions:
a state transition function
sfunc,
- and an optional final calculation function
- ffunc.
+ an optional final calculation function
+ ffunc,
+ and an optional combine function
+ cfunc.
These are used as follows:
sfunc( internal-state, next-data-values ) ---> next-internal-state
ffunc( internal-state ) ---> aggregate-value
+cfunc( internal-state, internal-state ) ---> next-internal-state
@@ -128,6 +134,13 @@ CREATE AGGREGATE name (
+ An aggregate function may also supply a combining function, which allows
+ the aggregation process to be broken down into multiple steps. This
+ facilitates query optimization techniques such as parallel query,
+ pre-join aggregation and aggregation while sorting.
+
+
+
An aggregate function can provide an initial condition,
that is, an initial value for the internal state value.
This is specified and stored in the database as a value of type
diff --git a/src/backend/catalog/pg_aggregate.c b/src/backend/catalog/pg_aggregate.c
index 121c27f..848a868 100644
--- a/src/backend/catalog/pg_aggregate.c
+++ b/src/backend/catalog/pg_aggregate.c
@@ -57,6 +57,7 @@ AggregateCreate(const char *aggName,
Oid variadicArgType,
List *aggtransfnName,
List *aggfinalfnName,
+ List *aggcombinefnName,
List *aggmtransfnName,
List *aggminvtransfnName,
List *aggmfinalfnName,
@@ -77,6 +78,7 @@ AggregateCreate(const char *aggName,
Form_pg_proc proc;
Oid transfn;
Oid finalfn = InvalidOid; /* can be omitted */
+ Oid combinefn = InvalidOid; /* can be omitted */
Oid mtransfn = InvalidOid; /* can be omitted */
Oid minvtransfn = InvalidOid; /* can be omitted */
Oid mfinalfn = InvalidOid; /* can be omitted */
@@ -396,6 +398,20 @@ AggregateCreate(const char *aggName,
}
Assert(OidIsValid(finaltype));
+ /* handle the combinefn, if supplied */
+ if (aggcombinefnName)
+ {
+ /*
+ * Combine function must have 2 argument, each of which is the
+ * trans type
+ */
+ fnArgs[0] = aggTransType;
+ fnArgs[1] = aggTransType;
+
+ combinefn = lookup_agg_function(aggcombinefnName, 2, fnArgs,
+ variadicArgType, &finaltype);
+ }
+
/*
* If finaltype (i.e. aggregate return type) is polymorphic, inputs must
* be polymorphic also, else parser will fail to deduce result type.
@@ -567,6 +583,7 @@ AggregateCreate(const char *aggName,
values[Anum_pg_aggregate_aggnumdirectargs - 1] = Int16GetDatum(numDirectArgs);
values[Anum_pg_aggregate_aggtransfn - 1] = ObjectIdGetDatum(transfn);
values[Anum_pg_aggregate_aggfinalfn - 1] = ObjectIdGetDatum(finalfn);
+ values[Anum_pg_aggregate_aggcombinefn - 1] = ObjectIdGetDatum(combinefn);
values[Anum_pg_aggregate_aggmtransfn - 1] = ObjectIdGetDatum(mtransfn);
values[Anum_pg_aggregate_aggminvtransfn - 1] = ObjectIdGetDatum(minvtransfn);
values[Anum_pg_aggregate_aggmfinalfn - 1] = ObjectIdGetDatum(mfinalfn);
diff --git a/src/backend/commands/aggregatecmds.c b/src/backend/commands/aggregatecmds.c
index 894c89d..035882e 100644
--- a/src/backend/commands/aggregatecmds.c
+++ b/src/backend/commands/aggregatecmds.c
@@ -61,6 +61,7 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
char aggKind = AGGKIND_NORMAL;
List *transfuncName = NIL;
List *finalfuncName = NIL;
+ List *combinefuncName = NIL;
List *mtransfuncName = NIL;
List *minvtransfuncName = NIL;
List *mfinalfuncName = NIL;
@@ -124,6 +125,8 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
transfuncName = defGetQualifiedName(defel);
else if (pg_strcasecmp(defel->defname, "finalfunc") == 0)
finalfuncName = defGetQualifiedName(defel);
+ else if (pg_strcasecmp(defel->defname, "cfunc") == 0)
+ combinefuncName = defGetQualifiedName(defel);
else if (pg_strcasecmp(defel->defname, "msfunc") == 0)
mtransfuncName = defGetQualifiedName(defel);
else if (pg_strcasecmp(defel->defname, "minvfunc") == 0)
@@ -383,6 +386,7 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
variadicArgType,
transfuncName, /* step function name */
finalfuncName, /* final function name */
+ combinefuncName, /* combine function name */
mtransfuncName, /* fwd trans function name */
minvtransfuncName, /* inv trans function name */
mfinalfuncName, /* final function name */
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 12dae77..4a92bfc 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -908,25 +908,38 @@ ExplainNode(PlanState *planstate, List *ancestors,
pname = sname = "Group";
break;
case T_Agg:
- sname = "Aggregate";
- switch (((Agg *) plan)->aggstrategy)
{
- case AGG_PLAIN:
- pname = "Aggregate";
- strategy = "Plain";
- break;
- case AGG_SORTED:
- pname = "GroupAggregate";
- strategy = "Sorted";
- break;
- case AGG_HASHED:
- pname = "HashAggregate";
- strategy = "Hashed";
- break;
- default:
- pname = "Aggregate ???";
- strategy = "???";
- break;
+ char *modifier;
+ Agg *agg = (Agg *) plan;
+
+ sname = "Aggregate";
+
+ if (agg->finalizeAggs == false)
+ modifier = "Partial ";
+ else if (agg->combineStates == true)
+ modifier = "Finalize ";
+ else
+ modifier = "";
+
+ switch (agg->aggstrategy)
+ {
+ case AGG_PLAIN:
+ pname = psprintf("%sAggregate", modifier);
+ strategy = "Plain";
+ break;
+ case AGG_SORTED:
+ pname = psprintf("%sGroupAggregate", modifier);
+ strategy = "Sorted";
+ break;
+ case AGG_HASHED:
+ pname = psprintf("%sHashAggregate", modifier);
+ strategy = "Hashed";
+ break;
+ default:
+ pname = "Aggregate ???";
+ strategy = "???";
+ break;
+ }
}
break;
case T_WindowAgg:
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 2e36855..a74a0fc 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -3,15 +3,24 @@
* nodeAgg.c
* Routines to handle aggregate nodes.
*
- * ExecAgg evaluates each aggregate in the following steps:
+ * ExecAgg normally evaluates each aggregate in the following steps:
*
* transvalue = initcond
* foreach input_tuple do
* transvalue = transfunc(transvalue, input_value(s))
* result = finalfunc(transvalue, direct_argument(s))
*
- * If a finalfunc is not supplied then the result is just the ending
- * value of transvalue.
+ * If a finalfunc is not supplied or finalizeAggs is false, then the result
+ * is just the ending value of transvalue.
+ *
+ * If combineStates is true then we assume that input values are other
+ * transition states. In this case we use the aggregate's combinefunc to
+ * 'add' the passed in trans state to the trans state being operated on.
+ * This allows aggregation to happen in multiple stages. 'combineStates'
+ * will only be true if another nodeAgg is below this one in the plan tree.
+ *
+ * 'finalizeAggs' should be false for all nodeAggs apart from the upper most
+ * one in the plan tree.
*
* If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
* input tuples and eliminate duplicates (if required) before performing
@@ -197,7 +206,7 @@ typedef struct AggStatePerTransData
*/
int numTransInputs;
- /* Oid of the state transition function */
+ /* Oid of the state transition or combine function */
Oid transfn_oid;
/* Oid of state value's datatype */
@@ -209,8 +218,8 @@ typedef struct AggStatePerTransData
List *aggdirectargs; /* states of direct-argument expressions */
/*
- * fmgr lookup data for transition function. Note in particular that the
- * fn_strict flag is kept here.
+ * fmgr lookup data for transition function or combination function. Note
+ * in particular that the fn_strict flag is kept here.
*/
FmgrInfo transfn;
@@ -421,6 +430,10 @@ static void advance_transition_function(AggState *aggstate,
AggStatePerTrans pertrans,
AggStatePerGroup pergroupstate);
static void advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup);
+static void advance_combination_function(AggState *aggstate,
+ AggStatePerTrans pertrans,
+ AggStatePerGroup pergroupstate);
+static void combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup);
static void process_ordered_aggregate_single(AggState *aggstate,
AggStatePerTrans pertrans,
AggStatePerGroup pergroupstate);
@@ -796,6 +809,8 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
int numGroupingSets = Max(aggstate->phase->numsets, 1);
int numTrans = aggstate->numtrans;
+ Assert(!aggstate->combineStates);
+
for (transno = 0; transno < numTrans; transno++)
{
AggStatePerTrans pertrans = &aggstate->pertrans[transno];
@@ -879,6 +894,125 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
}
}
+static void
+combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
+{
+ int transno;
+ int numTrans = aggstate->numtrans;
+
+ /* combine not supported with grouping sets */
+ Assert(aggstate->phase->numsets == 0);
+ Assert(aggstate->combineStates);
+
+ for (transno = 0; transno < numTrans; transno++)
+ {
+ AggStatePerTrans pertrans = &aggstate->pertrans[transno];
+ TupleTableSlot *slot;
+ FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
+ AggStatePerGroup pergroupstate = &pergroup[transno];
+
+ /* Evaluate the current input expressions for this aggregate */
+ slot = ExecProject(pertrans->evalproj, NULL);
+ Assert(slot->tts_nvalid >= 1);
+
+ fcinfo->arg[1] = slot->tts_values[0];
+ fcinfo->argnull[1] = slot->tts_isnull[0];
+
+ advance_combination_function(aggstate, pertrans, pergroupstate);
+ }
+}
+
+/*
+ * Perform combination of states between 2 aggregate states. Effectively this
+ * 'adds' two states together by whichever logic is defined in the aggregate
+ * function's combine function.
+ *
+ * Note that in this case transfn is set to the combination function. This
+ * perhaps should be changed to avoid confusion, but one field is ok for now
+ * as they'll never be needed at the same time.
+ */
+static void
+advance_combination_function(AggState *aggstate,
+ AggStatePerTrans pertrans,
+ AggStatePerGroup pergroupstate)
+{
+ FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
+ MemoryContext oldContext;
+ Datum newVal;
+
+ if (pertrans->transfn.fn_strict)
+ {
+ /* if we're asked to merge to a NULL state, then do nothing */
+ if (fcinfo->argnull[1])
+ return;
+
+ if (pergroupstate->noTransValue)
+ {
+ /*
+ * transValue has not been initialized. This is the first non-NULL
+ * input value. We use it as the initial value for transValue. (We
+ * already checked that the agg's input type is binary-compatible
+ * with its transtype, so straight copy here is OK.)
+ *
+ * We must copy the datum into aggcontext if it is pass-by-ref. We
+ * do not need to pfree the old transValue, since it's NULL.
+ */
+ oldContext = MemoryContextSwitchTo(
+ aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
+ pergroupstate->transValue = datumCopy(fcinfo->arg[1],
+ pertrans->transtypeByVal,
+ pertrans->transtypeLen);
+ pergroupstate->transValueIsNull = false;
+ pergroupstate->noTransValue = false;
+ MemoryContextSwitchTo(oldContext);
+
+ return;
+ }
+ }
+
+ /* We run the combine functions in per-input-tuple memory context */
+ oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
+
+ /* set up aggstate->curpertrans for AggGetAggref() */
+ aggstate->curpertrans = pertrans;
+
+ /*
+ * OK to call the combine function
+ */
+ fcinfo->arg[0] = pergroupstate->transValue;
+ fcinfo->argnull[0] = pergroupstate->transValueIsNull;
+ fcinfo->isnull = false; /* just in case combine func doesn't set it */
+
+ newVal = FunctionCallInvoke(fcinfo);
+
+ aggstate->curpertrans = NULL;
+
+ /*
+ * If pass-by-ref datatype, must copy the new value into aggcontext and
+ * pfree the prior transValue. But if the combine function returned a
+ * pointer to its first input, we don't need to do anything.
+ */
+ if (!pertrans->transtypeByVal &&
+ DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
+ {
+ if (!fcinfo->isnull)
+ {
+ MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
+ newVal = datumCopy(newVal,
+ pertrans->transtypeByVal,
+ pertrans->transtypeLen);
+ }
+ if (!pergroupstate->transValueIsNull)
+ pfree(DatumGetPointer(pergroupstate->transValue));
+ }
+
+ pergroupstate->transValue = newVal;
+ pergroupstate->transValueIsNull = fcinfo->isnull;
+
+ MemoryContextSwitchTo(oldContext);
+
+}
+
/*
* Run the transition function for a DISTINCT or ORDER BY aggregate
@@ -1278,8 +1412,14 @@ finalize_aggregates(AggState *aggstate,
pergroupstate);
}
- finalize_aggregate(aggstate, peragg, pergroupstate,
- &aggvalues[aggno], &aggnulls[aggno]);
+ if (aggstate->finalizeAggs)
+ finalize_aggregate(aggstate, peragg, pergroupstate,
+ &aggvalues[aggno], &aggnulls[aggno]);
+ else
+ {
+ aggvalues[aggno] = pergroupstate->transValue;
+ aggnulls[aggno] = pergroupstate->transValueIsNull;
+ }
}
}
@@ -1294,9 +1434,11 @@ project_aggregates(AggState *aggstate)
ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
/*
- * Check the qual (HAVING clause); if the group does not match, ignore it.
+ * iif performing the final aggregate stage we'll check the qual (HAVING
+ * clause); if the group does not match, ignore it.
*/
- if (ExecQual(aggstate->ss.ps.qual, econtext, false))
+ if (aggstate->finalizeAggs == false ||
+ ExecQual(aggstate->ss.ps.qual, econtext, false))
{
/*
* Form and return or store a projection tuple using the aggregate
@@ -1811,7 +1953,10 @@ agg_retrieve_direct(AggState *aggstate)
*/
for (;;)
{
- advance_aggregates(aggstate, pergroup);
+ if (!aggstate->combineStates)
+ advance_aggregates(aggstate, pergroup);
+ else
+ combine_aggregates(aggstate, pergroup);
/* Reset per-input-tuple context after each tuple */
ResetExprContext(tmpcontext);
@@ -1919,7 +2064,10 @@ agg_fill_hash_table(AggState *aggstate)
entry = lookup_hash_entry(aggstate, outerslot);
/* Advance the aggregates */
- advance_aggregates(aggstate, entry->pergroup);
+ if (!aggstate->combineStates)
+ advance_aggregates(aggstate, entry->pergroup);
+ else
+ combine_aggregates(aggstate, entry->pergroup);
/* Reset per-input-tuple context after each tuple */
ResetExprContext(tmpcontext);
@@ -2051,6 +2199,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
aggstate->pertrans = NULL;
aggstate->curpertrans = NULL;
aggstate->agg_done = false;
+ aggstate->combineStates = node->combineStates;
+ aggstate->finalizeAggs = node->finalizeAggs;
aggstate->input_done = false;
aggstate->pergroup = NULL;
aggstate->grp_firstTuple = NULL;
@@ -2402,7 +2552,21 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
get_func_name(aggref->aggfnoid));
InvokeFunctionExecuteHook(aggref->aggfnoid);
- transfn_oid = aggform->aggtransfn;
+ /*
+ * if this aggregation is performing state combines, then instead of
+ * using the transition function, we'll use the combine function
+ */
+ if (aggstate->combineStates)
+ {
+ transfn_oid = aggform->aggcombinefn;
+
+ /* If not set then the planner messed up */
+ if (!OidIsValid(transfn_oid))
+ elog(ERROR, "combinefn not set during aggregate state combine phase");
+ }
+ else
+ transfn_oid = aggform->aggtransfn;
+
peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
/* Check that aggregate owner has permission to call component fns */
@@ -2583,44 +2747,69 @@ build_pertrans_for_aggref(AggStatePerTrans pertrans,
pertrans->numTransInputs = numArguments;
/*
- * Set up infrastructure for calling the transfn
+ * When combining states, we have no use at all for the aggregate
+ * function's transfn. Instead we use the combinefn. However we do
+ * reuse the transfnexpr for the combinefn, perhaps this should change
*/
- build_aggregate_transfn_expr(inputTypes,
- numArguments,
- numDirectArgs,
- aggref->aggvariadic,
- aggtranstype,
- aggref->inputcollid,
- aggtransfn,
- InvalidOid, /* invtrans is not needed here */
- &transfnexpr,
- NULL);
- fmgr_info(aggtransfn, &pertrans->transfn);
- fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn);
-
- InitFunctionCallInfoData(pertrans->transfn_fcinfo,
- &pertrans->transfn,
- pertrans->numTransInputs + 1,
- pertrans->aggCollation,
- (void *) aggstate, NULL);
+ if (aggstate->combineStates)
+ {
+ build_aggregate_combinefn_expr(aggref->aggvariadic,
+ aggtranstype,
+ aggref->inputcollid,
+ aggtransfn,
+ &transfnexpr);
+ fmgr_info(aggtransfn, &pertrans->transfn);
+ fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn);
+
+ InitFunctionCallInfoData(pertrans->transfn_fcinfo,
+ &pertrans->transfn,
+ 2,
+ pertrans->aggCollation,
+ (void *) aggstate, NULL);
- /*
- * If the transfn is strict and the initval is NULL, make sure input type
- * and transtype are the same (or at least binary-compatible), so that
- * it's OK to use the first aggregated input value as the initial
- * transValue. This should have been checked at agg definition time, but
- * we must check again in case the transfn's strictness property has been
- * changed.
- */
- if (pertrans->transfn.fn_strict && pertrans->initValueIsNull)
+ }
+ else
{
- if (numArguments <= numDirectArgs ||
- !IsBinaryCoercible(inputTypes[numDirectArgs],
- aggtranstype))
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
- errmsg("aggregate %u needs to have compatible input type and transition type",
- aggref->aggfnoid)));
+ /*
+ * Set up infrastructure for calling the transfn
+ */
+ build_aggregate_transfn_expr(inputTypes,
+ numArguments,
+ numDirectArgs,
+ aggref->aggvariadic,
+ aggtranstype,
+ aggref->inputcollid,
+ aggtransfn,
+ InvalidOid, /* invtrans is not needed here */
+ &transfnexpr,
+ NULL);
+ fmgr_info(aggtransfn, &pertrans->transfn);
+ fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn);
+
+ InitFunctionCallInfoData(pertrans->transfn_fcinfo,
+ &pertrans->transfn,
+ pertrans->numTransInputs + 1,
+ pertrans->aggCollation,
+ (void *) aggstate, NULL);
+
+ /*
+ * If the transfn is strict and the initval is NULL, make sure input type
+ * and transtype are the same (or at least binary-compatible), so that
+ * it's OK to use the first aggregated input value as the initial
+ * transValue. This should have been checked at agg definition time, but
+ * we must check again in case the transfn's strictness property has been
+ * changed.
+ */
+ if (pertrans->transfn.fn_strict && pertrans->initValueIsNull)
+ {
+ if (numArguments <= numDirectArgs ||
+ !IsBinaryCoercible(inputTypes[numDirectArgs],
+ aggtranstype))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("aggregate %u needs to have compatible input type and transition type",
+ aggref->aggfnoid)));
+ }
}
/* get info about the state value's datatype */
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index ba04b72..b2dc451 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -865,6 +865,8 @@ _copyAgg(const Agg *from)
COPY_SCALAR_FIELD(aggstrategy);
COPY_SCALAR_FIELD(numCols);
+ COPY_SCALAR_FIELD(combineStates);
+ COPY_SCALAR_FIELD(finalizeAggs);
if (from->numCols > 0)
{
COPY_POINTER_FIELD(grpColIdx, from->numCols * sizeof(AttrNumber));
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 63fae82..6de4c88 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -690,11 +690,13 @@ _outAgg(StringInfo str, const Agg *node)
WRITE_ENUM_FIELD(aggstrategy, AggStrategy);
WRITE_INT_FIELD(numCols);
-
appendStringInfoString(str, " :grpColIdx");
for (i = 0; i < node->numCols; i++)
appendStringInfo(str, " %d", node->grpColIdx[i]);
+ WRITE_BOOL_FIELD(combineStates);
+ WRITE_BOOL_FIELD(finalizeAggs);
+
appendStringInfoString(str, " :grpOperators");
for (i = 0; i < node->numCols; i++)
appendStringInfo(str, " %u", node->grpOperators[i]);
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 222e2ed..ec6790a 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -1989,6 +1989,8 @@ _readAgg(void)
READ_ENUM_FIELD(aggstrategy, AggStrategy);
READ_INT_FIELD(numCols);
READ_ATTRNUMBER_ARRAY(grpColIdx, local_node->numCols);
+ READ_BOOL_FIELD(combineStates);
+ READ_BOOL_FIELD(finalizeAggs);
READ_OID_ARRAY(grpOperators, local_node->numCols);
READ_LONG_FIELD(numGroups);
READ_NODE_FIELD(groupingSets);
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 990486c..b6f37a4 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -125,6 +125,7 @@ bool enable_material = true;
bool enable_mergejoin = true;
bool enable_hashjoin = true;
+bool enable_parallelagg = false;
typedef struct
{
PlannerInfo *root;
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 32f903d..b34d635 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -1053,6 +1053,8 @@ create_unique_plan(PlannerInfo *root, UniquePath *best_path)
groupOperators,
NIL,
numGroups,
+ false,
+ true,
subplan);
}
else
@@ -4554,9 +4556,8 @@ Agg *
make_agg(PlannerInfo *root, List *tlist, List *qual,
AggStrategy aggstrategy, const AggClauseCosts *aggcosts,
int numGroupCols, AttrNumber *grpColIdx, Oid *grpOperators,
- List *groupingSets,
- long numGroups,
- Plan *lefttree)
+ List *groupingSets, long numGroups, bool combineStates,
+ bool finalizeAggs, Plan *lefttree)
{
Agg *node = makeNode(Agg);
Plan *plan = &node->plan;
@@ -4565,6 +4566,8 @@ make_agg(PlannerInfo *root, List *tlist, List *qual,
node->aggstrategy = aggstrategy;
node->numCols = numGroupCols;
+ node->combineStates = combineStates;
+ node->finalizeAggs = finalizeAggs;
node->grpColIdx = grpColIdx;
node->grpOperators = grpOperators;
node->numGroups = numGroups;
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 2c04f5c..1427847 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -49,6 +49,8 @@
#include "utils/rel.h"
#include "utils/selfuncs.h"
+#include "utils/syscache.h"
+#include "catalog/pg_aggregate.h"
/* GUC parameter */
double cursor_tuple_fraction = DEFAULT_CURSOR_TUPLE_FRACTION;
@@ -77,6 +79,17 @@ typedef struct
List *groupClause; /* overrides parse->groupClause */
} standard_qp_extra;
+typedef struct
+{
+ bool agguseparallel;
+} CheckParallelAggAvaiContext;
+
+typedef struct
+{
+ AttrNumber resno;
+ List *targetlist;
+} AddQualInTListExprContext;
+
/* Local functions */
static Node *preprocess_expression(PlannerInfo *root, Node *expr, int kind);
static void preprocess_qual_conditions(PlannerInfo *root, Node *jtnode);
@@ -134,8 +147,39 @@ static Plan *build_grouping_chain(PlannerInfo *root,
AttrNumber *groupColIdx,
AggClauseCosts *agg_costs,
long numGroups,
+ bool combineStates,
+ bool finalizeAggs,
+ Plan *result_plan);
+static bool check_parallel_agg_available(Plan *plan,
+ List *targetlist,
+ List *qual);
+static bool check_parallel_agg_available_walker (Node *node,
+ CheckParallelAggAvaiContext *context);
+static Plan *build_group_parallelagg(PlannerInfo *root,
+ Query *parse,
+ List *tlist,
+ bool need_sort_for_grouping,
+ List *rollup_groupclauses,
+ List *rollup_lists,
+ AttrNumber *groupColIdx,
+ AggClauseCosts *agg_costs,
+ long numGroups,
Plan *result_plan);
+static Plan *get_plan(Plan *plan, NodeTag type);
+static AttrNumber*get_sortIdx_from_subPlan(PlannerInfo *root, List *tlist);
+static List *make_partial_agg_tlist(List *tlist,List *groupClause);
+static List* add_qual_in_tlist(List *targetlist, List *qual);
+static bool add_qual_in_tlist_walker (Node *node,
+ AddQualInTListExprContext *context);
+static Plan *build_hash_parallelagg(PlannerInfo *root,
+ Query *parse,
+ List *tlist,
+ AggClauseCosts *aggcosts,
+ int numGroupCols,
+ AttrNumber *grpColIdx,
+ long numGroups,
+ Plan *lefttree);
/*****************************************************************************
*
* Query optimizer entry point
@@ -1333,6 +1377,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction)
double dNumGroups = 0;
bool use_hashed_distinct = false;
bool tested_hashed_distinct = false;
+ bool parallelagg_available = false;
/* Tweak caller-supplied tuple_fraction if have LIMIT/OFFSET */
if (parse->limitCount || parse->limitOffset)
@@ -1893,6 +1938,14 @@ grouping_planner(PlannerInfo *root, double tuple_fraction)
result_plan = create_plan(root, best_path);
current_pathkeys = best_path->pathkeys;
+ if(enable_parallelagg
+ && check_parallel_agg_available(result_plan,
+ tlist,
+ (List *)parse->havingQual))
+ {
+ parallelagg_available = true;
+ }
+
/* Detect if we'll need an explicit sort for grouping */
if (parse->groupClause && !use_hashed_grouping &&
!pathkeys_contained_in(root->group_pathkeys, current_pathkeys))
@@ -1912,7 +1965,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction)
* the top plan node. However, we can skip that if we determined
* that whatever create_plan chose to return will be good enough.
*/
- if (need_tlist_eval)
+ if (need_tlist_eval & !parallelagg_available)
{
/*
* If the top-level plan node is one that cannot do expression
@@ -1984,18 +2037,56 @@ grouping_planner(PlannerInfo *root, double tuple_fraction)
*/
if (use_hashed_grouping)
{
- /* Hashed aggregate plan --- no sort needed */
- result_plan = (Plan *) make_agg(root,
- tlist,
- (List *) parse->havingQual,
- AGG_HASHED,
- &agg_costs,
- numGroupCols,
- groupColIdx,
- extract_grouping_ops(parse->groupClause),
- NIL,
- numGroups,
- result_plan);
+ if(parallelagg_available)
+ {
+ Plan *parallelagg_plan;
+
+ parallelagg_plan = build_hash_parallelagg(root,
+ parse,
+ tlist,
+ &agg_costs,
+ numGroupCols,
+ groupColIdx,
+ numGroups,
+ result_plan);
+
+ if(!parallelagg_plan)
+ {
+ /* Hashed aggregate plan --- no sort needed */
+ result_plan = (Plan *) make_agg(root,
+ tlist,
+ (List *) parse->havingQual,
+ AGG_HASHED,
+ &agg_costs,
+ numGroupCols,
+ groupColIdx,
+ extract_grouping_ops(parse->groupClause),
+ NIL,
+ numGroups,
+ false,
+ true,
+ result_plan);
+ }
+ else
+ result_plan = parallelagg_plan;
+ }
+ else
+ {
+ /* Hashed aggregate plan --- no sort needed */
+ result_plan = (Plan *) make_agg(root,
+ tlist,
+ (List *) parse->havingQual,
+ AGG_HASHED,
+ &agg_costs,
+ numGroupCols,
+ groupColIdx,
+ extract_grouping_ops(parse->groupClause),
+ NIL,
+ numGroups,
+ false,
+ true,
+ result_plan);
+ }
/* Hashed aggregation produces randomly-ordered results */
current_pathkeys = NIL;
}
@@ -2012,7 +2103,25 @@ grouping_planner(PlannerInfo *root, double tuple_fraction)
else
current_pathkeys = NIL;
- result_plan = build_grouping_chain(root,
+
+ if(parallelagg_available)
+ {
+ Plan *parallelagg_plan;
+
+ parallelagg_plan = build_group_parallelagg(root,
+ parse,
+ tlist,
+ need_sort_for_grouping,
+ rollup_groupclauses,
+ rollup_lists,
+ groupColIdx,
+ &agg_costs,
+ numGroups,
+ result_plan);
+
+ if(parallelagg_plan == NULL)
+ {
+ result_plan = build_grouping_chain(root,
parse,
tlist,
need_sort_for_grouping,
@@ -2021,7 +2130,29 @@ grouping_planner(PlannerInfo *root, double tuple_fraction)
groupColIdx,
&agg_costs,
numGroups,
+ false,
+ true,
result_plan);
+ }
+ else
+ result_plan = parallelagg_plan;
+
+ }
+ else
+ {
+ result_plan = build_grouping_chain(root,
+ parse,
+ tlist,
+ need_sort_for_grouping,
+ rollup_groupclauses,
+ rollup_lists,
+ groupColIdx,
+ &agg_costs,
+ numGroups,
+ false,
+ true,
+ result_plan);
+ }
/*
* these are destroyed by build_grouping_chain, so make sure
@@ -2306,6 +2437,8 @@ grouping_planner(PlannerInfo *root, double tuple_fraction)
extract_grouping_ops(parse->distinctClause),
NIL,
numDistinctRows,
+ false,
+ true,
result_plan);
/* Hashed aggregation produces randomly-ordered results */
current_pathkeys = NIL;
@@ -2473,10 +2606,16 @@ build_grouping_chain(PlannerInfo *root,
AttrNumber *groupColIdx,
AggClauseCosts *agg_costs,
long numGroups,
+ bool combineStates,
+ bool finalizeAggs,
Plan *result_plan)
{
- AttrNumber *top_grpColIdx = groupColIdx;
- List *chain = NIL;
+ AttrNumber *top_grpColIdx = groupColIdx;
+ List *chain = NIL;
+ List *qual = NIL;
+
+ if(finalizeAggs)
+ qual = (List *) parse->havingQual;
/*
* Prepare the grpColIdx for the real Agg node first, because we may need
@@ -2531,7 +2670,7 @@ build_grouping_chain(PlannerInfo *root,
agg_plan = (Plan *) make_agg(root,
tlist,
- (List *) parse->havingQual,
+ qual,
AGG_SORTED,
agg_costs,
list_length(linitial(gsets)),
@@ -2539,6 +2678,8 @@ build_grouping_chain(PlannerInfo *root,
extract_grouping_ops(groupClause),
gsets,
numGroups,
+ combineStates,
+ finalizeAggs,
sort_plan);
sort_plan->lefttree = NULL;
@@ -2567,7 +2708,7 @@ build_grouping_chain(PlannerInfo *root,
result_plan = (Plan *) make_agg(root,
tlist,
- (List *) parse->havingQual,
+ qual,
(numGroupCols > 0) ? AGG_SORTED : AGG_PLAIN,
agg_costs,
numGroupCols,
@@ -2575,6 +2716,8 @@ build_grouping_chain(PlannerInfo *root,
extract_grouping_ops(groupClause),
gsets,
numGroups,
+ combineStates,
+ finalizeAggs,
result_plan);
((Agg *) result_plan)->chain = chain;
@@ -4704,3 +4847,476 @@ plan_cluster_use_sort(Oid tableOid, Oid indexOid)
return (seqScanAndSortPath.total_cost < indexScanPath->path.total_cost);
}
+
+/*
+ * check_parallel_agg_available
+ * The function is used to check whether parallel_agg can be used by checking
+ * the agg functions and formulas.
+ *
+ * If there is any agg function indicating no-using parallel_agg or the agg function
+ * is used in a formula as a entity of targetlist then the parallel_agg can't be used
+ * and the function return false;
+ */
+static bool
+check_parallel_agg_available(Plan *plan, List *targetlist, List *qual)
+{
+ CheckParallelAggAvaiContext context;
+
+#ifndef PAGG_TEST
+ if(IsA(plan, Gather) == false)
+ return false;
+#endif
+
+ context.agguseparallel = true;
+
+ check_parallel_agg_available_walker((Node*)targetlist, &context);
+ if(context.agguseparallel == false)
+ return false;
+
+ check_parallel_agg_available_walker((Node*)qual, &context);
+ if(context.agguseparallel == false)
+ return false;
+
+ return true;
+}
+
+/*
+ * check_parallel_agg_available_walker
+ * Go through the list in the node to check every agg function or formulas.
+ *
+ */
+static bool
+check_parallel_agg_available_walker (Node *node, CheckParallelAggAvaiContext *context)
+{
+ if (node == NULL)
+ return false;
+
+ if (IsA(node, Aggref))
+ {
+ HeapTuple aggTuple;
+ Aggref *aggref = (Aggref *) node;
+ Form_pg_aggregate aggform;
+
+
+ aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(aggref->aggfnoid));
+
+ if (!HeapTupleIsValid(aggTuple))
+ elog(ERROR, "cache lookup failed for parallel aggregate %u",
+ aggref->aggfnoid);
+
+ aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
+
+ ReleaseSysCache(aggTuple);
+
+ if(!OidIsValid(aggform->aggcombinefn))
+ {
+ context->agguseparallel = false;
+ return true;
+ }
+
+ /*
+ * If Distinct word in parameters in aggfunction, then the parallel-agg
+ * policy will not be used.
+ * */
+ if(aggref->aggdistinct != NULL)
+ {
+ context->agguseparallel = false;
+ return true;
+ }
+ }
+ else
+ return expression_tree_walker(node, check_parallel_agg_available_walker, context);
+
+ return false;
+}
+
+/*
+ * This function build a group parallelagg plan as result_plan as following :
+ * Finalize Group Aggregate
+ * -> Sort
+ * -> Gather
+ * -> Partial Group Aggregate
+ * -> Sort
+ * -> Partial Seq Scan
+ * The input result_plan will be
+ * Gather
+ * -> Partial Seq Scan
+ * So this function will do the following steps:
+ * 1. Move up the Gather node and change its targetlist
+ * 2. Change the Group Aggregate to be Partial Group Aggregate
+ * 3. Add Finalize Group Aggregate and Sort node
+ */
+static Plan *
+build_group_parallelagg(PlannerInfo *root,
+ Query *parse,
+ List *tlist,
+ bool need_sort_for_grouping,
+ List *rollup_groupclauses,
+ List *rollup_lists,
+ AttrNumber *groupColIdx,
+ AggClauseCosts *agg_costs,
+ long numGroups,
+ Plan *result_plan)
+{
+ Plan *parallel_seqscan = NULL;
+ Plan *partial_agg = NULL;
+ Gather *gather_plan = NULL;
+ List *qual = (List*)parse->havingQual;
+ List *partial_agg_tlist = NULL;
+
+ AttrNumber *topsortIdx = NULL;
+
+ gather_plan = (Gather*)get_plan(result_plan, T_Gather);
+ if(gather_plan == NULL)
+ return NULL;
+
+ /* Get the partial seqscan */
+ parallel_seqscan =gather_plan->plan.lefttree;
+// get_plan((Plan*)gather_plan, T_PartialSeqScan);
+
+ /*
+ * The underlying Agg targetlist should be a flat tlist of all Vars and Aggs
+ * needed to evaluate the expressions and final values of aggregates present
+ * in the main target list. The quals also should be included.
+ */
+ partial_agg_tlist = make_partial_agg_tlist(add_qual_in_tlist(tlist, qual),
+ llast(rollup_groupclauses));
+
+ /* Add PartialAgg and Sort node above Partialseqscan*/
+ partial_agg = build_grouping_chain(root,
+ parse,
+ partial_agg_tlist,
+ need_sort_for_grouping,
+ rollup_groupclauses,
+ rollup_lists,
+ groupColIdx,
+ agg_costs,
+ numGroups,
+ false,
+ false,
+ parallel_seqscan);
+
+
+
+ /* Let the Gather node as upper node of partial_agg node */
+ gather_plan->plan.targetlist = partial_agg->targetlist;
+ gather_plan->plan.lefttree = partial_agg;
+
+ /*
+ * Get the sortIndex according the subplan
+ */
+ topsortIdx = get_sortIdx_from_subPlan(root,partial_agg_tlist);
+
+ /* Make the Finalize Group Aggregate node */
+ result_plan = build_grouping_chain(root,
+ parse,
+ tlist,
+ need_sort_for_grouping,
+ rollup_groupclauses,
+ rollup_lists,
+ topsortIdx,
+ agg_costs,
+ numGroups,
+ true,
+ true,
+ (Plan*)gather_plan);
+
+ return result_plan;
+}
+
+/*
+ * This function try to find the type of sub plan in the plan and return it.
+ *
+ * If not found, return NULL. Otherwise return the subplan.
+ */
+static Plan *
+get_plan(Plan *plan, NodeTag type)
+{
+ if(plan == NULL)
+ return NULL;
+ else if(nodeTag(plan) == type)
+ return plan;
+ else
+ return get_plan(plan->lefttree, type);
+}
+
+
+static AttrNumber*
+get_sortIdx_from_subPlan(PlannerInfo *root, List *tlist)
+{
+ Query *parse = root->parse;
+ int numCols;
+
+ AttrNumber *grpColIdx = NULL;
+
+ numCols = list_length(parse->groupClause);
+ if (numCols > 0)
+ {
+ ListCell *tl;
+
+ grpColIdx = (AttrNumber *) palloc0(sizeof(AttrNumber) * numCols);
+
+ foreach(tl, tlist)
+ {
+ TargetEntry *tle = (TargetEntry *) lfirst(tl);
+ int colno;
+
+ colno = get_grouping_column_index(parse, tle);
+ if (colno >= 0)
+ {
+ Assert(grpColIdx[colno] == 0); /* no dups expected */
+ grpColIdx[colno] = tle->resno;
+ }
+ }
+ }
+
+ return grpColIdx;
+}
+
+/*
+ * make_partial_agg_tlist
+ * Generate appropriate Agg node target list for input to ParallelAgg nodes.
+ *
+ * The initial target list passed to ParallelAgg node from the parser contains
+ * aggregates and GROUP BY columns. For the underlying agg node, we want to
+ * generate a tlist containing bare aggregate references (Aggref) and GROUP BY
+ * expressions. So we flatten all expressions except GROUP BY items into their
+ * component variables.
+ * For example, given a query like
+ * SELECT a+b, 2 * SUM(c+d) , AVG(d)+SUM(c+d) FROM table GROUP BY a+b;
+ * we want to pass this targetlist to the Agg plan:
+ * a+b, SUM(c+d), AVG(d)
+ * where the a+b target will be used by the Sort/Group steps, and the
+ * other targets will be used for computing the final results.
+ * Note that we don't flatten Aggref's , since those are to be computed
+ * by the underlying Agg node, and they will be referenced like Vars above it.
+ *
+ * 'tlist' is the ParallelAgg's final target list.
+ *
+ * The result is the targetlist to be computed by the Agg node below the
+ * ParallelAgg node.
+ */
+static List *
+make_partial_agg_tlist(List *tlist,List *groupClause)
+{
+ Bitmapset *sgrefs;
+ List *new_tlist;
+ List *flattenable_cols;
+ List *flattenable_vars;
+ ListCell *lc;
+
+ /*
+ * Collect the sortgroupref numbers of GROUP BY clauses
+ * into a bitmapset for convenient reference below.
+ */
+ sgrefs = NULL;
+
+ /* Add in sortgroupref numbers of GROUP BY clauses */
+ foreach(lc, groupClause)
+ {
+ SortGroupClause *grpcl = (SortGroupClause *) lfirst(lc);
+
+ sgrefs = bms_add_member(sgrefs, grpcl->tleSortGroupRef);
+ }
+
+ /*
+ * Construct a tlist containing all the non-flattenable tlist items, and
+ * save aside the others for a moment.
+ */
+ new_tlist = NIL;
+ flattenable_cols = NIL;
+
+ foreach(lc, tlist)
+ {
+ TargetEntry *tle = (TargetEntry *) lfirst(lc);
+
+ /* Don't want to deconstruct GROUP BY items. */
+ if (tle->ressortgroupref != 0 &&
+ bms_is_member(tle->ressortgroupref, sgrefs))
+ {
+ /* Don't want to deconstruct this value, so add to new_tlist */
+ TargetEntry *newtle;
+
+ newtle = makeTargetEntry(tle->expr,
+ list_length(new_tlist) + 1,
+ NULL,
+ false);
+ /* Preserve its sortgroupref marking, in case it's volatile */
+ newtle->ressortgroupref = tle->ressortgroupref;
+ new_tlist = lappend(new_tlist, newtle);
+ }
+ else
+ {
+ /*
+ * Column is to be flattened, so just remember the expression for
+ * later call to pull_var_clause. There's no need for
+ * pull_var_clause to examine the TargetEntry node itself.
+ */
+ flattenable_cols = lappend(flattenable_cols, tle->expr);
+ }
+ }
+
+ /*
+ * Pull out all the Vars and Aggrefs mentioned in flattenable columns, and
+ * add them to the result tlist if not already present. (Some might be
+ * there already because they're used directly as group clauses.)
+ *
+ * Note: it's essential to use PVC_INCLUDE_AGGREGATES here, so that the
+ * Aggrefs are placed in the Agg node's tlist and not left to be computed
+ * at higher levels.
+ */
+ flattenable_vars = pull_var_clause((Node *) flattenable_cols,
+ PVC_INCLUDE_AGGREGATES,
+ PVC_INCLUDE_PLACEHOLDERS);
+ new_tlist = add_to_flat_tlist(new_tlist, flattenable_vars);
+
+ /* clean up cruft */
+ list_free(flattenable_vars);
+ list_free(flattenable_cols);
+
+ return new_tlist;
+}
+
+/*
+ * add_qual_in_tlist
+ * Add the agg functions in qual into the target list used in agg plan
+ */
+static List*
+add_qual_in_tlist(List *targetlist, List *qual)
+{
+ AddQualInTListExprContext context;
+
+ if(qual == NULL)
+ return targetlist;
+
+ context.targetlist = copyObject(targetlist);
+ context.resno = list_length(context.targetlist) + 1;;
+
+ add_qual_in_tlist_walker((Node*)qual, &context);
+
+ return context.targetlist;
+}
+
+/*
+ * add_qual_in_tlist_walker
+ * Go through the qual list to get the aggref and add it in targetlist
+ */
+static bool
+add_qual_in_tlist_walker (Node *node, AddQualInTListExprContext *context)
+{
+ if (node == NULL)
+ return false;
+
+ if (IsA(node, Aggref))
+ {
+ List *tlist = context->targetlist;
+// Aggref *aggref = (Aggref *)node;
+
+ TargetEntry *te = makeNode(TargetEntry);
+
+// aggref->resno = context->resno;
+
+ te = makeTargetEntry((Expr *) node,
+ context->resno++,
+ NULL,
+ false);
+
+ tlist = lappend(tlist,te);
+ }
+ else
+ return expression_tree_walker(node, add_qual_in_tlist_walker, context);
+
+ return false;
+}
+
+/*
+ * This function build a hasg parallelagg plan as result_plan as following :
+ * Finalize Hash Aggregate
+ * -> Gather
+ * -> Partial Hash Aggregate
+ * -> Partial Seq Scan
+ * The input result_plan will be
+ * Gather
+ * -> Partial Seq Scan
+ * So this function will do the following steps:
+ * 1. Make a PartialHashAgg and set Gather node as above node
+ * 2. Change the targetlist of Gather node
+ * 3. Make a FinalizeHashAgg as top node above the Gather node
+ */
+
+static Plan *
+build_hash_parallelagg(PlannerInfo *root,
+ Query *parse,
+ List *tlist,
+ AggClauseCosts *aggcosts,
+ int numGroupCols,
+ AttrNumber *grpColIdx,
+ long numGroups,
+ Plan *lefttree)
+{
+ Plan *result_plan = NULL;
+ Plan *parallel_seqscan = NULL;
+ Plan *partial_agg_plan = NULL;
+ Plan *gather_plan = NULL;
+ List *partial_agg_tlist = NIL;
+ List *qual = (List*)parse->havingQual;
+
+ AttrNumber *topsortIdx = NULL;
+
+ gather_plan = get_plan(lefttree, T_Gather);
+ if(gather_plan == NULL)
+ return NULL;
+
+ /* Get the partial seqscan */
+ parallel_seqscan = gather_plan->lefttree;
+ if(parallel_seqscan == NULL)
+ return NULL;
+
+ /*
+ * The underlying Agg targetlist should be a flat tlist of all Vars and Aggs
+ * needed to evaluate the expressions and final values of aggregates present
+ * in the main target list. The quals also should be included.
+ */
+ partial_agg_tlist = make_partial_agg_tlist(add_qual_in_tlist(tlist, qual),
+ parse->groupClause);
+
+ /* Make PartialHashAgg plan node */
+ partial_agg_plan = (Plan *) make_agg(root,
+ partial_agg_tlist,
+ NULL,
+ AGG_HASHED,
+ aggcosts,
+ numGroupCols,
+ grpColIdx,
+ extract_grouping_ops(parse->groupClause),
+ NIL,
+ numGroups,
+ false,
+ false,
+ parallel_seqscan);
+
+ gather_plan->lefttree = partial_agg_plan;
+ gather_plan->targetlist = partial_agg_plan->targetlist;
+
+ /*
+ * Get the sortIndex according the subplan
+ */
+ topsortIdx = get_sortIdx_from_subPlan(root,partial_agg_tlist);
+
+ /* Make FinalizeHashAgg plan node */
+ result_plan = (Plan *) make_agg(root,
+ tlist,
+ (List *) parse->havingQual,
+ AGG_HASHED,
+ aggcosts,
+ numGroupCols,
+ topsortIdx,
+ extract_grouping_ops(parse->groupClause),
+ NIL,
+ numGroups,
+ true,
+ true,
+ gather_plan);
+
+ return result_plan;
+}
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index 12e9290..78cfae9 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -140,6 +140,14 @@ static bool fix_opfuncids_walker(Node *node, void *context);
static bool extract_query_dependencies_walker(Node *node,
PlannerInfo *context);
+static void set_agg_references(PlannerInfo *root, Plan *plan, int rtoffset);
+static Node *fix_combine_agg_expr(PlannerInfo *root,
+ Node *node,
+ indexed_tlist *subplan_itlist,
+ Index newvarno,
+ int rtoffset);
+static Node * fix_combine_agg_expr_mutator(Node *node, fix_upper_expr_context *context);
+
/*****************************************************************************
*
* SUBPLAN REFERENCES
@@ -668,7 +676,8 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset)
}
break;
case T_Agg:
- set_upper_references(root, plan, rtoffset);
+// set_upper_references(root, plan, rtoffset);
+ set_agg_references(root, plan, rtoffset);
break;
case T_Group:
set_upper_references(root, plan, rtoffset);
@@ -2432,3 +2441,212 @@ extract_query_dependencies_walker(Node *node, PlannerInfo *context)
return expression_tree_walker(node, extract_query_dependencies_walker,
(void *) context);
}
+
+
+/*
+ * set_upper_references
+ * Update the targetlist and quals of an upper-level plan node
+ * to refer to the tuples returned by its lefttree subplan.
+ * Also perform opcode lookup for these expressions, and
+ * add regclass OIDs to root->glob->relationOids.
+ *
+ * This is used for single-input plan types like Agg, Group, Result.
+ *
+ * In most cases, we have to match up individual Vars in the tlist and
+ * qual expressions with elements of the subplan's tlist (which was
+ * generated by flatten_tlist() from these selfsame expressions, so it
+ * should have all the required variables). There is an important exception,
+ * however: GROUP BY and ORDER BY expressions will have been pushed into the
+ * subplan tlist unflattened. If these values are also needed in the output
+ * then we want to reference the subplan tlist element rather than recomputing
+ * the expression.
+ */
+static void
+set_agg_references(PlannerInfo *root, Plan *plan, int rtoffset)
+{
+ Agg *agg = (Agg*)plan;
+ Plan *subplan = plan->lefttree;
+ indexed_tlist *subplan_itlist;
+ List *output_targetlist;
+ ListCell *l;
+
+ subplan_itlist = build_tlist_index(subplan->targetlist);
+
+ output_targetlist = NIL;
+
+ if(agg->combineStates)
+ {
+ foreach(l, plan->targetlist)
+ {
+ TargetEntry *tle = (TargetEntry *) lfirst(l);
+ Node *newexpr;
+
+ /* If it's a non-Var sort/group item, first try to match by sortref */
+ if (tle->ressortgroupref != 0 && !IsA(tle->expr, Var))
+ {
+ newexpr = (Node *)
+ search_indexed_tlist_for_sortgroupref((Node *) tle->expr,
+ tle->ressortgroupref,
+ subplan_itlist,
+ OUTER_VAR);
+ if (!newexpr)
+ newexpr = fix_combine_agg_expr(root,
+ (Node *) tle->expr,
+ subplan_itlist,
+ OUTER_VAR,
+ rtoffset);
+ }
+ else
+ newexpr = fix_combine_agg_expr(root,
+ (Node *) tle->expr,
+ subplan_itlist,
+ OUTER_VAR,
+ rtoffset);
+ tle = flatCopyTargetEntry(tle);
+ tle->expr = (Expr *) newexpr;
+ output_targetlist = lappend(output_targetlist, tle);
+ }
+ }
+ else
+ {
+ foreach(l, plan->targetlist)
+ {
+ TargetEntry *tle = (TargetEntry *) lfirst(l);
+ Node *newexpr;
+
+ /* If it's a non-Var sort/group item, first try to match by sortref */
+ if (tle->ressortgroupref != 0 && !IsA(tle->expr, Var))
+ {
+ newexpr = (Node *)
+ search_indexed_tlist_for_sortgroupref((Node *) tle->expr,
+ tle->ressortgroupref,
+ subplan_itlist,
+ OUTER_VAR);
+ if (!newexpr)
+ newexpr = fix_upper_expr(root,
+ (Node *) tle->expr,
+ subplan_itlist,
+ OUTER_VAR,
+ rtoffset);
+ }
+ else
+ newexpr = fix_upper_expr(root,
+ (Node *) tle->expr,
+ subplan_itlist,
+ OUTER_VAR,
+ rtoffset);
+ tle = flatCopyTargetEntry(tle);
+ tle->expr = (Expr *) newexpr;
+ output_targetlist = lappend(output_targetlist, tle);
+ }
+ }
+
+ plan->targetlist = output_targetlist;
+
+ plan->qual = (List *)
+ fix_upper_expr(root,
+ (Node *) plan->qual,
+ subplan_itlist,
+ OUTER_VAR,
+ rtoffset);
+
+ pfree(subplan_itlist);
+}
+
+
+/*
+ * This function is only used by combineAgg to set the Var nodes as args of
+ * Aggref reference output of a Gather plan.
+ */
+static Node *
+fix_combine_agg_expr(PlannerInfo *root,
+ Node *node,
+ indexed_tlist *subplan_itlist,
+ Index newvarno,
+ int rtoffset)
+{
+ fix_upper_expr_context context;
+
+ context.root = root;
+ context.subplan_itlist = subplan_itlist;
+ context.newvarno = newvarno;
+ context.rtoffset = rtoffset;
+ return fix_combine_agg_expr_mutator(node, &context);
+}
+
+static Node *
+fix_combine_agg_expr_mutator(Node *node, fix_upper_expr_context *context)
+{
+ Var *newvar;
+
+ if (node == NULL)
+ return NULL;
+ if (IsA(node, Var))
+ {
+ Var *var = (Var *) node;
+
+ newvar = search_indexed_tlist_for_var(var,
+ context->subplan_itlist,
+ context->newvarno,
+ context->rtoffset);
+ if (!newvar)
+ elog(ERROR, "variable not found in subplan target list");
+ return (Node *) newvar;
+ }
+ if (IsA(node, Aggref))
+ {
+ TargetEntry *tle;
+ Aggref *aggref = (Aggref*)node;
+ List *args = NIL;
+
+ tle = tlist_member(node, context->subplan_itlist->tlist);
+ if (tle)
+ {
+ /* Found a matching subplan output expression */
+ Var *newvar;
+ TargetEntry *newtle;
+
+ newvar = makeVarFromTargetEntry(context->newvarno, tle);
+ newvar->varnoold = 0; /* wasn't ever a plain Var */
+ newvar->varoattno = 0;
+
+ /* update the args in the aggref */
+
+ /* makeTargetEntry ,always set resno to one for finialize agg */
+ newtle = makeTargetEntry((Expr*)newvar,1,NULL,false);
+ args = lappend(args,newtle);
+
+ /*
+ * Updated the args, let the newvar refer to the right position of
+ * the agg function in the subplan
+ */
+ aggref->args = args;
+
+ return (Node *) aggref;
+ }
+ }
+ if (IsA(node, PlaceHolderVar))
+ {
+ PlaceHolderVar *phv = (PlaceHolderVar *) node;
+
+ /* See if the PlaceHolderVar has bubbled up from a lower plan node */
+ if (context->subplan_itlist->has_ph_vars)
+ {
+ newvar = search_indexed_tlist_for_non_var((Node *) phv,
+ context->subplan_itlist,
+ context->newvarno);
+ if (newvar)
+ return (Node *) newvar;
+ }
+ /* If not supplied by input plan, evaluate the contained expr */
+ return fix_upper_expr_mutator((Node *) phv->phexpr, context);
+ }
+ if (IsA(node, Param))
+ return fix_param_node(context->root, (Param *) node);
+
+ fix_expr_common(context->root, node);
+ return expression_tree_mutator(node,
+ fix_combine_agg_expr_mutator,
+ (void *) context);
+}
+
diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c
index 2e55131..45de122 100644
--- a/src/backend/optimizer/prep/prepunion.c
+++ b/src/backend/optimizer/prep/prepunion.c
@@ -775,6 +775,8 @@ make_union_unique(SetOperationStmt *op, Plan *plan,
extract_grouping_ops(groupList),
NIL,
numGroups,
+ false,
+ true,
plan);
/* Hashed aggregation produces randomly-ordered results */
*sortClauses = NIL;
diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c
index 2c45bd6..96a7386 100644
--- a/src/backend/parser/parse_agg.c
+++ b/src/backend/parser/parse_agg.c
@@ -1929,6 +1929,43 @@ build_aggregate_transfn_expr(Oid *agg_input_types,
/*
* Like build_aggregate_transfn_expr, but creates an expression tree for the
+ * combine function of an aggregate, rather than the transition function.
+ */
+void
+build_aggregate_combinefn_expr(bool agg_variadic,
+ Oid agg_state_type,
+ Oid agg_input_collation,
+ Oid combinefn_oid,
+ Expr **combinefnexpr)
+{
+ Param *argp;
+ List *args;
+ FuncExpr *fexpr;
+
+ /* Build arg list to use in the combinefn FuncExpr node. */
+ argp = makeNode(Param);
+ argp->paramkind = PARAM_EXEC;
+ argp->paramid = -1;
+ argp->paramtype = agg_state_type;
+ argp->paramtypmod = -1;
+ argp->paramcollid = agg_input_collation;
+ argp->location = -1;
+
+ /* trans state type is arg 1 and 2 */
+ args = list_make2(argp, argp);
+
+ fexpr = makeFuncExpr(combinefn_oid,
+ agg_state_type,
+ args,
+ InvalidOid,
+ agg_input_collation,
+ COERCE_EXPLICIT_CALL);
+ fexpr->funcvariadic = agg_variadic;
+ *combinefnexpr = (Expr *) fexpr;
+}
+
+/*
+ * Like build_aggregate_transfn_expr, but creates an expression tree for the
* final function of an aggregate, rather than the transition function.
*/
void
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index a185749..63cde6b 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -828,6 +828,15 @@ static struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL
},
{
+ {"enable_parallelagg", PGC_USERSET, QUERY_TUNING_METHOD,
+ gettext_noop("Enables the planner's use of parallel agg plans."),
+ NULL
+ },
+ &enable_parallelagg,
+ true,
+ NULL, NULL, NULL
+ },
+ {
{"enable_material", PGC_USERSET, QUERY_TUNING_METHOD,
gettext_noop("Enables the planner's use of materialization."),
NULL
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 36863df..cb39107 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -12279,6 +12279,7 @@ dumpAgg(Archive *fout, DumpOptions *dopt, AggInfo *agginfo)
PGresult *res;
int i_aggtransfn;
int i_aggfinalfn;
+ int i_aggcombinefn;
int i_aggmtransfn;
int i_aggminvtransfn;
int i_aggmfinalfn;
@@ -12295,6 +12296,7 @@ dumpAgg(Archive *fout, DumpOptions *dopt, AggInfo *agginfo)
int i_convertok;
const char *aggtransfn;
const char *aggfinalfn;
+ const char *aggcombinefn;
const char *aggmtransfn;
const char *aggminvtransfn;
const char *aggmfinalfn;
@@ -12325,7 +12327,26 @@ dumpAgg(Archive *fout, DumpOptions *dopt, AggInfo *agginfo)
selectSourceSchema(fout, agginfo->aggfn.dobj.namespace->dobj.name);
/* Get aggregate-specific details */
- if (fout->remoteVersion >= 90400)
+ if (fout->remoteVersion >= 90600)
+ {
+ appendPQExpBuffer(query, "SELECT aggtransfn, "
+ "aggfinalfn, aggtranstype::pg_catalog.regtype, "
+ "aggcombinefn, aggmtransfn, aggminvtransfn, "
+ "aggmfinalfn, aggmtranstype::pg_catalog.regtype, "
+ "aggfinalextra, aggmfinalextra, "
+ "aggsortop::pg_catalog.regoperator, "
+ "(aggkind = 'h') AS hypothetical, "
+ "aggtransspace, agginitval, "
+ "aggmtransspace, aggminitval, "
+ "true AS convertok, "
+ "pg_catalog.pg_get_function_arguments(p.oid) AS funcargs, "
+ "pg_catalog.pg_get_function_identity_arguments(p.oid) AS funciargs "
+ "FROM pg_catalog.pg_aggregate a, pg_catalog.pg_proc p "
+ "WHERE a.aggfnoid = p.oid "
+ "AND p.oid = '%u'::pg_catalog.oid",
+ agginfo->aggfn.dobj.catId.oid);
+ }
+ else if (fout->remoteVersion >= 90400)
{
appendPQExpBuffer(query, "SELECT aggtransfn, "
"aggfinalfn, aggtranstype::pg_catalog.regtype, "
@@ -12435,6 +12456,7 @@ dumpAgg(Archive *fout, DumpOptions *dopt, AggInfo *agginfo)
i_aggtransfn = PQfnumber(res, "aggtransfn");
i_aggfinalfn = PQfnumber(res, "aggfinalfn");
+ i_aggcombinefn = PQfnumber(res, "aggcombinefn");
i_aggmtransfn = PQfnumber(res, "aggmtransfn");
i_aggminvtransfn = PQfnumber(res, "aggminvtransfn");
i_aggmfinalfn = PQfnumber(res, "aggmfinalfn");
@@ -12452,6 +12474,7 @@ dumpAgg(Archive *fout, DumpOptions *dopt, AggInfo *agginfo)
aggtransfn = PQgetvalue(res, 0, i_aggtransfn);
aggfinalfn = PQgetvalue(res, 0, i_aggfinalfn);
+ aggcombinefn = PQgetvalue(res, 0, i_aggcombinefn);
aggmtransfn = PQgetvalue(res, 0, i_aggmtransfn);
aggminvtransfn = PQgetvalue(res, 0, i_aggminvtransfn);
aggmfinalfn = PQgetvalue(res, 0, i_aggmfinalfn);
@@ -12540,6 +12563,11 @@ dumpAgg(Archive *fout, DumpOptions *dopt, AggInfo *agginfo)
appendPQExpBufferStr(details, ",\n FINALFUNC_EXTRA");
}
+ if (strcmp(aggcombinefn, "-") != 0)
+ {
+ appendPQExpBuffer(details, ",\n CFUNC = %s", aggcombinefn);
+ }
+
if (strcmp(aggmtransfn, "-") != 0)
{
appendPQExpBuffer(details, ",\n MSFUNC = %s,\n MINVFUNC = %s,\n MSTYPE = %s",
diff --git a/src/include/catalog/pg_aggregate.h b/src/include/catalog/pg_aggregate.h
index dd6079f..b306f9b 100644
--- a/src/include/catalog/pg_aggregate.h
+++ b/src/include/catalog/pg_aggregate.h
@@ -33,6 +33,7 @@
* aggnumdirectargs number of arguments that are "direct" arguments
* aggtransfn transition function
* aggfinalfn final function (0 if none)
+ * aggcombinefn combine function (0 if none)
* aggmtransfn forward function for moving-aggregate mode (0 if none)
* aggminvtransfn inverse function for moving-aggregate mode (0 if none)
* aggmfinalfn final function for moving-aggregate mode (0 if none)
@@ -56,6 +57,7 @@ CATALOG(pg_aggregate,2600) BKI_WITHOUT_OIDS
int16 aggnumdirectargs;
regproc aggtransfn;
regproc aggfinalfn;
+ regproc aggcombinefn;
regproc aggmtransfn;
regproc aggminvtransfn;
regproc aggmfinalfn;
@@ -85,24 +87,25 @@ typedef FormData_pg_aggregate *Form_pg_aggregate;
* ----------------
*/
-#define Natts_pg_aggregate 17
+#define Natts_pg_aggregate 18
#define Anum_pg_aggregate_aggfnoid 1
#define Anum_pg_aggregate_aggkind 2
#define Anum_pg_aggregate_aggnumdirectargs 3
#define Anum_pg_aggregate_aggtransfn 4
#define Anum_pg_aggregate_aggfinalfn 5
-#define Anum_pg_aggregate_aggmtransfn 6
-#define Anum_pg_aggregate_aggminvtransfn 7
-#define Anum_pg_aggregate_aggmfinalfn 8
-#define Anum_pg_aggregate_aggfinalextra 9
-#define Anum_pg_aggregate_aggmfinalextra 10
-#define Anum_pg_aggregate_aggsortop 11
-#define Anum_pg_aggregate_aggtranstype 12
-#define Anum_pg_aggregate_aggtransspace 13
-#define Anum_pg_aggregate_aggmtranstype 14
-#define Anum_pg_aggregate_aggmtransspace 15
-#define Anum_pg_aggregate_agginitval 16
-#define Anum_pg_aggregate_aggminitval 17
+#define Anum_pg_aggregate_aggcombinefn 6
+#define Anum_pg_aggregate_aggmtransfn 7
+#define Anum_pg_aggregate_aggminvtransfn 8
+#define Anum_pg_aggregate_aggmfinalfn 9
+#define Anum_pg_aggregate_aggfinalextra 10
+#define Anum_pg_aggregate_aggmfinalextra 11
+#define Anum_pg_aggregate_aggsortop 12
+#define Anum_pg_aggregate_aggtranstype 13
+#define Anum_pg_aggregate_aggtransspace 14
+#define Anum_pg_aggregate_aggmtranstype 15
+#define Anum_pg_aggregate_aggmtransspace 16
+#define Anum_pg_aggregate_agginitval 17
+#define Anum_pg_aggregate_aggminitval 18
/*
* Symbolic values for aggkind column. We distinguish normal aggregates
@@ -126,184 +129,184 @@ typedef FormData_pg_aggregate *Form_pg_aggregate;
*/
/* avg */
-DATA(insert ( 2100 n 0 int8_avg_accum numeric_poly_avg int8_avg_accum int8_avg_accum_inv numeric_poly_avg f f 0 2281 48 2281 48 _null_ _null_ ));
-DATA(insert ( 2101 n 0 int4_avg_accum int8_avg int4_avg_accum int4_avg_accum_inv int8_avg f f 0 1016 0 1016 0 "{0,0}" "{0,0}" ));
-DATA(insert ( 2102 n 0 int2_avg_accum int8_avg int2_avg_accum int2_avg_accum_inv int8_avg f f 0 1016 0 1016 0 "{0,0}" "{0,0}" ));
-DATA(insert ( 2103 n 0 numeric_avg_accum numeric_avg numeric_avg_accum numeric_accum_inv numeric_avg f f 0 2281 128 2281 128 _null_ _null_ ));
-DATA(insert ( 2104 n 0 float4_accum float8_avg - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
-DATA(insert ( 2105 n 0 float8_accum float8_avg - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
-DATA(insert ( 2106 n 0 interval_accum interval_avg interval_accum interval_accum_inv interval_avg f f 0 1187 0 1187 0 "{0 second,0 second}" "{0 second,0 second}" ));
+DATA(insert ( 2100 n 0 int8_avg_accum numeric_poly_avg - int8_avg_accum int8_avg_accum_inv numeric_poly_avg f f 0 2281 48 2281 48 _null_ _null_ ));
+DATA(insert ( 2101 n 0 int4_avg_accum int8_avg - int4_avg_accum int4_avg_accum_inv int8_avg f f 0 1016 0 1016 0 "{0,0}" "{0,0}" ));
+DATA(insert ( 2102 n 0 int2_avg_accum int8_avg - int2_avg_accum int2_avg_accum_inv int8_avg f f 0 1016 0 1016 0 "{0,0}" "{0,0}" ));
+DATA(insert ( 2103 n 0 numeric_avg_accum numeric_avg - numeric_avg_accum numeric_accum_inv numeric_avg f f 0 2281 128 2281 128 _null_ _null_ ));
+DATA(insert ( 2104 n 0 float4_accum float8_avg - - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
+DATA(insert ( 2105 n 0 float8_accum float8_avg - - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
+DATA(insert ( 2106 n 0 interval_accum interval_avg - interval_accum interval_accum_inv interval_avg f f 0 1187 0 1187 0 "{0 second,0 second}" "{0 second,0 second}" ));
/* sum */
-DATA(insert ( 2107 n 0 int8_avg_accum numeric_poly_sum int8_avg_accum int8_avg_accum_inv numeric_poly_sum f f 0 2281 48 2281 48 _null_ _null_ ));
-DATA(insert ( 2108 n 0 int4_sum - int4_avg_accum int4_avg_accum_inv int2int4_sum f f 0 20 0 1016 0 _null_ "{0,0}" ));
-DATA(insert ( 2109 n 0 int2_sum - int2_avg_accum int2_avg_accum_inv int2int4_sum f f 0 20 0 1016 0 _null_ "{0,0}" ));
-DATA(insert ( 2110 n 0 float4pl - - - - f f 0 700 0 0 0 _null_ _null_ ));
-DATA(insert ( 2111 n 0 float8pl - - - - f f 0 701 0 0 0 _null_ _null_ ));
-DATA(insert ( 2112 n 0 cash_pl - cash_pl cash_mi - f f 0 790 0 790 0 _null_ _null_ ));
-DATA(insert ( 2113 n 0 interval_pl - interval_pl interval_mi - f f 0 1186 0 1186 0 _null_ _null_ ));
-DATA(insert ( 2114 n 0 numeric_avg_accum numeric_sum numeric_avg_accum numeric_accum_inv numeric_sum f f 0 2281 128 2281 128 _null_ _null_ ));
+DATA(insert ( 2107 n 0 int8_avg_accum numeric_poly_sum - int8_avg_accum int8_avg_accum_inv numeric_poly_sum f f 0 2281 48 2281 48 _null_ _null_ ));
+DATA(insert ( 2108 n 0 int4_sum - int8pl int4_avg_accum int4_avg_accum_inv int2int4_sum f f 0 20 0 1016 0 _null_ "{0,0}" ));
+DATA(insert ( 2109 n 0 int2_sum - int8pl int2_avg_accum int2_avg_accum_inv int2int4_sum f f 0 20 0 1016 0 _null_ "{0,0}" ));
+DATA(insert ( 2110 n 0 float4pl - float4pl - - - f f 0 700 0 0 0 _null_ _null_ ));
+DATA(insert ( 2111 n 0 float8pl - float8pl - - - f f 0 701 0 0 0 _null_ _null_ ));
+DATA(insert ( 2112 n 0 cash_pl - cash_pl cash_pl cash_mi - f f 0 790 0 790 0 _null_ _null_ ));
+DATA(insert ( 2113 n 0 interval_pl - interval_pl interval_pl interval_mi - f f 0 1186 0 1186 0 _null_ _null_ ));
+DATA(insert ( 2114 n 0 numeric_avg_accum numeric_sum - numeric_avg_accum numeric_accum_inv numeric_sum f f 0 2281 128 2281 128 _null_ _null_ ));
/* max */
-DATA(insert ( 2115 n 0 int8larger - - - - f f 413 20 0 0 0 _null_ _null_ ));
-DATA(insert ( 2116 n 0 int4larger - - - - f f 521 23 0 0 0 _null_ _null_ ));
-DATA(insert ( 2117 n 0 int2larger - - - - f f 520 21 0 0 0 _null_ _null_ ));
-DATA(insert ( 2118 n 0 oidlarger - - - - f f 610 26 0 0 0 _null_ _null_ ));
-DATA(insert ( 2119 n 0 float4larger - - - - f f 623 700 0 0 0 _null_ _null_ ));
-DATA(insert ( 2120 n 0 float8larger - - - - f f 674 701 0 0 0 _null_ _null_ ));
-DATA(insert ( 2121 n 0 int4larger - - - - f f 563 702 0 0 0 _null_ _null_ ));
-DATA(insert ( 2122 n 0 date_larger - - - - f f 1097 1082 0 0 0 _null_ _null_ ));
-DATA(insert ( 2123 n 0 time_larger - - - - f f 1112 1083 0 0 0 _null_ _null_ ));
-DATA(insert ( 2124 n 0 timetz_larger - - - - f f 1554 1266 0 0 0 _null_ _null_ ));
-DATA(insert ( 2125 n 0 cashlarger - - - - f f 903 790 0 0 0 _null_ _null_ ));
-DATA(insert ( 2126 n 0 timestamp_larger - - - - f f 2064 1114 0 0 0 _null_ _null_ ));
-DATA(insert ( 2127 n 0 timestamptz_larger - - - - f f 1324 1184 0 0 0 _null_ _null_ ));
-DATA(insert ( 2128 n 0 interval_larger - - - - f f 1334 1186 0 0 0 _null_ _null_ ));
-DATA(insert ( 2129 n 0 text_larger - - - - f f 666 25 0 0 0 _null_ _null_ ));
-DATA(insert ( 2130 n 0 numeric_larger - - - - f f 1756 1700 0 0 0 _null_ _null_ ));
-DATA(insert ( 2050 n 0 array_larger - - - - f f 1073 2277 0 0 0 _null_ _null_ ));
-DATA(insert ( 2244 n 0 bpchar_larger - - - - f f 1060 1042 0 0 0 _null_ _null_ ));
-DATA(insert ( 2797 n 0 tidlarger - - - - f f 2800 27 0 0 0 _null_ _null_ ));
-DATA(insert ( 3526 n 0 enum_larger - - - - f f 3519 3500 0 0 0 _null_ _null_ ));
-DATA(insert ( 3564 n 0 network_larger - - - - f f 1205 869 0 0 0 _null_ _null_ ));
+DATA(insert ( 2115 n 0 int8larger - int8larger - - - f f 413 20 0 0 0 _null_ _null_ ));
+DATA(insert ( 2116 n 0 int4larger - int4larger - - - f f 521 23 0 0 0 _null_ _null_ ));
+DATA(insert ( 2117 n 0 int2larger - int2larger - - - f f 520 21 0 0 0 _null_ _null_ ));
+DATA(insert ( 2118 n 0 oidlarger - oidlarger - - - f f 610 26 0 0 0 _null_ _null_ ));
+DATA(insert ( 2119 n 0 float4larger - float4larger - - - f f 623 700 0 0 0 _null_ _null_ ));
+DATA(insert ( 2120 n 0 float8larger - float8larger - - - f f 674 701 0 0 0 _null_ _null_ ));
+DATA(insert ( 2121 n 0 int4larger - int4larger - - - f f 563 702 0 0 0 _null_ _null_ ));
+DATA(insert ( 2122 n 0 date_larger - date_larger - - - f f 1097 1082 0 0 0 _null_ _null_ ));
+DATA(insert ( 2123 n 0 time_larger - time_larger - - - f f 1112 1083 0 0 0 _null_ _null_ ));
+DATA(insert ( 2124 n 0 timetz_larger - timetz_larger - - - f f 1554 1266 0 0 0 _null_ _null_ ));
+DATA(insert ( 2125 n 0 cashlarger - cashlarger - - - f f 903 790 0 0 0 _null_ _null_ ));
+DATA(insert ( 2126 n 0 timestamp_larger - timestamp_larger - - - f f 2064 1114 0 0 0 _null_ _null_ ));
+DATA(insert ( 2127 n 0 timestamptz_larger - timestamptz_larger - - - f f 1324 1184 0 0 0 _null_ _null_ ));
+DATA(insert ( 2128 n 0 interval_larger - interval_larger - - - f f 1334 1186 0 0 0 _null_ _null_ ));
+DATA(insert ( 2129 n 0 text_larger - text_larger - - - f f 666 25 0 0 0 _null_ _null_ ));
+DATA(insert ( 2130 n 0 numeric_larger - numeric_larger - - - f f 1756 1700 0 0 0 _null_ _null_ ));
+DATA(insert ( 2050 n 0 array_larger - array_larger - - - f f 1073 2277 0 0 0 _null_ _null_ ));
+DATA(insert ( 2244 n 0 bpchar_larger - bpchar_larger - - - f f 1060 1042 0 0 0 _null_ _null_ ));
+DATA(insert ( 2797 n 0 tidlarger - tidlarger - - - f f 2800 27 0 0 0 _null_ _null_ ));
+DATA(insert ( 3526 n 0 enum_larger - enum_larger - - - f f 3519 3500 0 0 0 _null_ _null_ ));
+DATA(insert ( 3564 n 0 network_larger - network_larger - - - f f 1205 869 0 0 0 _null_ _null_ ));
/* min */
-DATA(insert ( 2131 n 0 int8smaller - - - - f f 412 20 0 0 0 _null_ _null_ ));
-DATA(insert ( 2132 n 0 int4smaller - - - - f f 97 23 0 0 0 _null_ _null_ ));
-DATA(insert ( 2133 n 0 int2smaller - - - - f f 95 21 0 0 0 _null_ _null_ ));
-DATA(insert ( 2134 n 0 oidsmaller - - - - f f 609 26 0 0 0 _null_ _null_ ));
-DATA(insert ( 2135 n 0 float4smaller - - - - f f 622 700 0 0 0 _null_ _null_ ));
-DATA(insert ( 2136 n 0 float8smaller - - - - f f 672 701 0 0 0 _null_ _null_ ));
-DATA(insert ( 2137 n 0 int4smaller - - - - f f 562 702 0 0 0 _null_ _null_ ));
-DATA(insert ( 2138 n 0 date_smaller - - - - f f 1095 1082 0 0 0 _null_ _null_ ));
-DATA(insert ( 2139 n 0 time_smaller - - - - f f 1110 1083 0 0 0 _null_ _null_ ));
-DATA(insert ( 2140 n 0 timetz_smaller - - - - f f 1552 1266 0 0 0 _null_ _null_ ));
-DATA(insert ( 2141 n 0 cashsmaller - - - - f f 902 790 0 0 0 _null_ _null_ ));
-DATA(insert ( 2142 n 0 timestamp_smaller - - - - f f 2062 1114 0 0 0 _null_ _null_ ));
-DATA(insert ( 2143 n 0 timestamptz_smaller - - - - f f 1322 1184 0 0 0 _null_ _null_ ));
-DATA(insert ( 2144 n 0 interval_smaller - - - - f f 1332 1186 0 0 0 _null_ _null_ ));
-DATA(insert ( 2145 n 0 text_smaller - - - - f f 664 25 0 0 0 _null_ _null_ ));
-DATA(insert ( 2146 n 0 numeric_smaller - - - - f f 1754 1700 0 0 0 _null_ _null_ ));
-DATA(insert ( 2051 n 0 array_smaller - - - - f f 1072 2277 0 0 0 _null_ _null_ ));
-DATA(insert ( 2245 n 0 bpchar_smaller - - - - f f 1058 1042 0 0 0 _null_ _null_ ));
-DATA(insert ( 2798 n 0 tidsmaller - - - - f f 2799 27 0 0 0 _null_ _null_ ));
-DATA(insert ( 3527 n 0 enum_smaller - - - - f f 3518 3500 0 0 0 _null_ _null_ ));
-DATA(insert ( 3565 n 0 network_smaller - - - - f f 1203 869 0 0 0 _null_ _null_ ));
+DATA(insert ( 2131 n 0 int8smaller - int8smaller - - - f f 412 20 0 0 0 _null_ _null_ ));
+DATA(insert ( 2132 n 0 int4smaller - int4smaller - - - f f 97 23 0 0 0 _null_ _null_ ));
+DATA(insert ( 2133 n 0 int2smaller - int2smaller - - - f f 95 21 0 0 0 _null_ _null_ ));
+DATA(insert ( 2134 n 0 oidsmaller - oidsmaller - - - f f 609 26 0 0 0 _null_ _null_ ));
+DATA(insert ( 2135 n 0 float4smaller - float4smaller - - - f f 622 700 0 0 0 _null_ _null_ ));
+DATA(insert ( 2136 n 0 float8smaller - float8smaller - - - f f 672 701 0 0 0 _null_ _null_ ));
+DATA(insert ( 2137 n 0 int4smaller - int4smaller - - - f f 562 702 0 0 0 _null_ _null_ ));
+DATA(insert ( 2138 n 0 date_smaller - date_smaller - - - f f 1095 1082 0 0 0 _null_ _null_ ));
+DATA(insert ( 2139 n 0 time_smaller - time_smaller - - - f f 1110 1083 0 0 0 _null_ _null_ ));
+DATA(insert ( 2140 n 0 timetz_smaller - timetz_smaller - - - f f 1552 1266 0 0 0 _null_ _null_ ));
+DATA(insert ( 2141 n 0 cashsmaller - cashsmaller - - - f f 902 790 0 0 0 _null_ _null_ ));
+DATA(insert ( 2142 n 0 timestamp_smaller - timestamp_smaller - - - f f 2062 1114 0 0 0 _null_ _null_ ));
+DATA(insert ( 2143 n 0 timestamptz_smaller - timestamptz_smaller - - - f f 1322 1184 0 0 0 _null_ _null_ ));
+DATA(insert ( 2144 n 0 interval_smaller - interval_smaller - - - f f 1332 1186 0 0 0 _null_ _null_ ));
+DATA(insert ( 2145 n 0 text_smaller - text_smaller - - - f f 664 25 0 0 0 _null_ _null_ ));
+DATA(insert ( 2146 n 0 numeric_smaller - numeric_smaller - - - f f 1754 1700 0 0 0 _null_ _null_ ));
+DATA(insert ( 2051 n 0 array_smaller - array_smaller - - - f f 1072 2277 0 0 0 _null_ _null_ ));
+DATA(insert ( 2245 n 0 bpchar_smaller - bpchar_smaller - - - f f 1058 1042 0 0 0 _null_ _null_ ));
+DATA(insert ( 2798 n 0 tidsmaller - tidsmaller - - - f f 2799 27 0 0 0 _null_ _null_ ));
+DATA(insert ( 3527 n 0 enum_smaller - enum_smaller - - - f f 3518 3500 0 0 0 _null_ _null_ ));
+DATA(insert ( 3565 n 0 network_smaller - network_smaller - - - f f 1203 869 0 0 0 _null_ _null_ ));
/* count */
-DATA(insert ( 2147 n 0 int8inc_any - int8inc_any int8dec_any - f f 0 20 0 20 0 "0" "0" ));
-DATA(insert ( 2803 n 0 int8inc - int8inc int8dec - f f 0 20 0 20 0 "0" "0" ));
+DATA(insert ( 2147 n 0 int8inc_any - int8pl int8inc_any int8dec_any - f f 0 20 0 20 0 "0" "0" ));
+DATA(insert ( 2803 n 0 int8inc - int8pl int8inc int8dec - f f 0 20 0 20 0 "0" "0" ));
/* var_pop */
-DATA(insert ( 2718 n 0 int8_accum numeric_var_pop int8_accum int8_accum_inv numeric_var_pop f f 0 2281 128 2281 128 _null_ _null_ ));
-DATA(insert ( 2719 n 0 int4_accum numeric_poly_var_pop int4_accum int4_accum_inv numeric_poly_var_pop f f 0 2281 48 2281 48 _null_ _null_ ));
-DATA(insert ( 2720 n 0 int2_accum numeric_poly_var_pop int2_accum int2_accum_inv numeric_poly_var_pop f f 0 2281 48 2281 48 _null_ _null_ ));
-DATA(insert ( 2721 n 0 float4_accum float8_var_pop - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
-DATA(insert ( 2722 n 0 float8_accum float8_var_pop - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
-DATA(insert ( 2723 n 0 numeric_accum numeric_var_pop numeric_accum numeric_accum_inv numeric_var_pop f f 0 2281 128 2281 128 _null_ _null_ ));
+DATA(insert ( 2718 n 0 int8_accum numeric_var_pop - int8_accum int8_accum_inv numeric_var_pop f f 0 2281 128 2281 128 _null_ _null_ ));
+DATA(insert ( 2719 n 0 int4_accum numeric_poly_var_pop - int4_accum int4_accum_inv numeric_poly_var_pop f f 0 2281 48 2281 48 _null_ _null_ ));
+DATA(insert ( 2720 n 0 int2_accum numeric_poly_var_pop - int2_accum int2_accum_inv numeric_poly_var_pop f f 0 2281 48 2281 48 _null_ _null_ ));
+DATA(insert ( 2721 n 0 float4_accum float8_var_pop - - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
+DATA(insert ( 2722 n 0 float8_accum float8_var_pop - - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
+DATA(insert ( 2723 n 0 numeric_accum numeric_var_pop - numeric_accum numeric_accum_inv numeric_var_pop f f 0 2281 128 2281 128 _null_ _null_ ));
/* var_samp */
-DATA(insert ( 2641 n 0 int8_accum numeric_var_samp int8_accum int8_accum_inv numeric_var_samp f f 0 2281 128 2281 128 _null_ _null_ ));
-DATA(insert ( 2642 n 0 int4_accum numeric_poly_var_samp int4_accum int4_accum_inv numeric_poly_var_samp f f 0 2281 48 2281 48 _null_ _null_ ));
-DATA(insert ( 2643 n 0 int2_accum numeric_poly_var_samp int2_accum int2_accum_inv numeric_poly_var_samp f f 0 2281 48 2281 48 _null_ _null_ ));
-DATA(insert ( 2644 n 0 float4_accum float8_var_samp - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
-DATA(insert ( 2645 n 0 float8_accum float8_var_samp - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
-DATA(insert ( 2646 n 0 numeric_accum numeric_var_samp numeric_accum numeric_accum_inv numeric_var_samp f f 0 2281 128 2281 128 _null_ _null_ ));
+DATA(insert ( 2641 n 0 int8_accum numeric_var_samp - int8_accum int8_accum_inv numeric_var_samp f f 0 2281 128 2281 128 _null_ _null_ ));
+DATA(insert ( 2642 n 0 int4_accum numeric_poly_var_samp - int4_accum int4_accum_inv numeric_poly_var_samp f f 0 2281 48 2281 48 _null_ _null_ ));
+DATA(insert ( 2643 n 0 int2_accum numeric_poly_var_samp - int2_accum int2_accum_inv numeric_poly_var_samp f f 0 2281 48 2281 48 _null_ _null_ ));
+DATA(insert ( 2644 n 0 float4_accum float8_var_samp - - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
+DATA(insert ( 2645 n 0 float8_accum float8_var_samp - - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
+DATA(insert ( 2646 n 0 numeric_accum numeric_var_samp - numeric_accum numeric_accum_inv numeric_var_samp f f 0 2281 128 2281 128 _null_ _null_ ));
/* variance: historical Postgres syntax for var_samp */
-DATA(insert ( 2148 n 0 int8_accum numeric_var_samp int8_accum int8_accum_inv numeric_var_samp f f 0 2281 128 2281 128 _null_ _null_ ));
-DATA(insert ( 2149 n 0 int4_accum numeric_poly_var_samp int4_accum int4_accum_inv numeric_poly_var_samp f f 0 2281 48 2281 48 _null_ _null_ ));
-DATA(insert ( 2150 n 0 int2_accum numeric_poly_var_samp int2_accum int2_accum_inv numeric_poly_var_samp f f 0 2281 48 2281 48 _null_ _null_ ));
-DATA(insert ( 2151 n 0 float4_accum float8_var_samp - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
-DATA(insert ( 2152 n 0 float8_accum float8_var_samp - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
-DATA(insert ( 2153 n 0 numeric_accum numeric_var_samp numeric_accum numeric_accum_inv numeric_var_samp f f 0 2281 128 2281 128 _null_ _null_ ));
+DATA(insert ( 2148 n 0 int8_accum numeric_var_samp - int8_accum int8_accum_inv numeric_var_samp f f 0 2281 128 2281 128 _null_ _null_ ));
+DATA(insert ( 2149 n 0 int4_accum numeric_poly_var_samp - int4_accum int4_accum_inv numeric_poly_var_samp f f 0 2281 48 2281 48 _null_ _null_ ));
+DATA(insert ( 2150 n 0 int2_accum numeric_poly_var_samp - int2_accum int2_accum_inv numeric_poly_var_samp f f 0 2281 48 2281 48 _null_ _null_ ));
+DATA(insert ( 2151 n 0 float4_accum float8_var_samp - - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
+DATA(insert ( 2152 n 0 float8_accum float8_var_samp - - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
+DATA(insert ( 2153 n 0 numeric_accum numeric_var_samp - numeric_accum numeric_accum_inv numeric_var_samp f f 0 2281 128 2281 128 _null_ _null_ ));
/* stddev_pop */
-DATA(insert ( 2724 n 0 int8_accum numeric_stddev_pop int8_accum int8_accum_inv numeric_stddev_pop f f 0 2281 128 2281 128 _null_ _null_ ));
-DATA(insert ( 2725 n 0 int4_accum numeric_poly_stddev_pop int4_accum int4_accum_inv numeric_poly_stddev_pop f f 0 2281 48 2281 48 _null_ _null_ ));
-DATA(insert ( 2726 n 0 int2_accum numeric_poly_stddev_pop int2_accum int2_accum_inv numeric_poly_stddev_pop f f 0 2281 48 2281 48 _null_ _null_ ));
-DATA(insert ( 2727 n 0 float4_accum float8_stddev_pop - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
-DATA(insert ( 2728 n 0 float8_accum float8_stddev_pop - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
-DATA(insert ( 2729 n 0 numeric_accum numeric_stddev_pop numeric_accum numeric_accum_inv numeric_stddev_pop f f 0 2281 128 2281 128 _null_ _null_ ));
+DATA(insert ( 2724 n 0 int8_accum numeric_stddev_pop - int8_accum int8_accum_inv numeric_stddev_pop f f 0 2281 128 2281 128 _null_ _null_ ));
+DATA(insert ( 2725 n 0 int4_accum numeric_poly_stddev_pop - int4_accum int4_accum_inv numeric_poly_stddev_pop f f 0 2281 48 2281 48 _null_ _null_ ));
+DATA(insert ( 2726 n 0 int2_accum numeric_poly_stddev_pop - int2_accum int2_accum_inv numeric_poly_stddev_pop f f 0 2281 48 2281 48 _null_ _null_ ));
+DATA(insert ( 2727 n 0 float4_accum float8_stddev_pop - - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
+DATA(insert ( 2728 n 0 float8_accum float8_stddev_pop - - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
+DATA(insert ( 2729 n 0 numeric_accum numeric_stddev_pop - numeric_accum numeric_accum_inv numeric_stddev_pop f f 0 2281 128 2281 128 _null_ _null_ ));
/* stddev_samp */
-DATA(insert ( 2712 n 0 int8_accum numeric_stddev_samp int8_accum int8_accum_inv numeric_stddev_samp f f 0 2281 128 2281 128 _null_ _null_ ));
-DATA(insert ( 2713 n 0 int4_accum numeric_poly_stddev_samp int4_accum int4_accum_inv numeric_poly_stddev_samp f f 0 2281 48 2281 48 _null_ _null_ ));
-DATA(insert ( 2714 n 0 int2_accum numeric_poly_stddev_samp int2_accum int2_accum_inv numeric_poly_stddev_samp f f 0 2281 48 2281 48 _null_ _null_ ));
-DATA(insert ( 2715 n 0 float4_accum float8_stddev_samp - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
-DATA(insert ( 2716 n 0 float8_accum float8_stddev_samp - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
-DATA(insert ( 2717 n 0 numeric_accum numeric_stddev_samp numeric_accum numeric_accum_inv numeric_stddev_samp f f 0 2281 128 2281 128 _null_ _null_ ));
+DATA(insert ( 2712 n 0 int8_accum numeric_stddev_samp - int8_accum int8_accum_inv numeric_stddev_samp f f 0 2281 128 2281 128 _null_ _null_ ));
+DATA(insert ( 2713 n 0 int4_accum numeric_poly_stddev_samp - int4_accum int4_accum_inv numeric_poly_stddev_samp f f 0 2281 48 2281 48 _null_ _null_ ));
+DATA(insert ( 2714 n 0 int2_accum numeric_poly_stddev_samp - int2_accum int2_accum_inv numeric_poly_stddev_samp f f 0 2281 48 2281 48 _null_ _null_ ));
+DATA(insert ( 2715 n 0 float4_accum float8_stddev_samp - - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
+DATA(insert ( 2716 n 0 float8_accum float8_stddev_samp - - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
+DATA(insert ( 2717 n 0 numeric_accum numeric_stddev_samp - numeric_accum numeric_accum_inv numeric_stddev_samp f f 0 2281 128 2281 128 _null_ _null_ ));
/* stddev: historical Postgres syntax for stddev_samp */
-DATA(insert ( 2154 n 0 int8_accum numeric_stddev_samp int8_accum int8_accum_inv numeric_stddev_samp f f 0 2281 128 2281 128 _null_ _null_ ));
-DATA(insert ( 2155 n 0 int4_accum numeric_poly_stddev_samp int4_accum int4_accum_inv numeric_poly_stddev_samp f f 0 2281 48 2281 48 _null_ _null_ ));
-DATA(insert ( 2156 n 0 int2_accum numeric_poly_stddev_samp int2_accum int2_accum_inv numeric_poly_stddev_samp f f 0 2281 48 2281 48 _null_ _null_ ));
-DATA(insert ( 2157 n 0 float4_accum float8_stddev_samp - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
-DATA(insert ( 2158 n 0 float8_accum float8_stddev_samp - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
-DATA(insert ( 2159 n 0 numeric_accum numeric_stddev_samp numeric_accum numeric_accum_inv numeric_stddev_samp f f 0 2281 128 2281 128 _null_ _null_ ));
+DATA(insert ( 2154 n 0 int8_accum numeric_stddev_samp - int8_accum int8_accum_inv numeric_stddev_samp f f 0 2281 128 2281 128 _null_ _null_ ));
+DATA(insert ( 2155 n 0 int4_accum numeric_poly_stddev_samp - int4_accum int4_accum_inv numeric_poly_stddev_samp f f 0 2281 48 2281 48 _null_ _null_ ));
+DATA(insert ( 2156 n 0 int2_accum numeric_poly_stddev_samp - int2_accum int2_accum_inv numeric_poly_stddev_samp f f 0 2281 48 2281 48 _null_ _null_ ));
+DATA(insert ( 2157 n 0 float4_accum float8_stddev_samp - - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
+DATA(insert ( 2158 n 0 float8_accum float8_stddev_samp - - - - f f 0 1022 0 0 0 "{0,0,0}" _null_ ));
+DATA(insert ( 2159 n 0 numeric_accum numeric_stddev_samp - numeric_accum numeric_accum_inv numeric_stddev_samp f f 0 2281 128 2281 128 _null_ _null_ ));
/* SQL2003 binary regression aggregates */
-DATA(insert ( 2818 n 0 int8inc_float8_float8 - - - - f f 0 20 0 0 0 "0" _null_ ));
-DATA(insert ( 2819 n 0 float8_regr_accum float8_regr_sxx - - - f f 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
-DATA(insert ( 2820 n 0 float8_regr_accum float8_regr_syy - - - f f 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
-DATA(insert ( 2821 n 0 float8_regr_accum float8_regr_sxy - - - f f 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
-DATA(insert ( 2822 n 0 float8_regr_accum float8_regr_avgx - - - f f 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
-DATA(insert ( 2823 n 0 float8_regr_accum float8_regr_avgy - - - f f 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
-DATA(insert ( 2824 n 0 float8_regr_accum float8_regr_r2 - - - f f 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
-DATA(insert ( 2825 n 0 float8_regr_accum float8_regr_slope - - - f f 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
-DATA(insert ( 2826 n 0 float8_regr_accum float8_regr_intercept - - - f f 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
-DATA(insert ( 2827 n 0 float8_regr_accum float8_covar_pop - - - f f 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
-DATA(insert ( 2828 n 0 float8_regr_accum float8_covar_samp - - - f f 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
-DATA(insert ( 2829 n 0 float8_regr_accum float8_corr - - - f f 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
+DATA(insert ( 2818 n 0 int8inc_float8_float8 - - - - - f f 0 20 0 0 0 "0" _null_ ));
+DATA(insert ( 2819 n 0 float8_regr_accum float8_regr_sxx - - - - f f 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
+DATA(insert ( 2820 n 0 float8_regr_accum float8_regr_syy - - - - f f 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
+DATA(insert ( 2821 n 0 float8_regr_accum float8_regr_sxy - - - - f f 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
+DATA(insert ( 2822 n 0 float8_regr_accum float8_regr_avgx - - - - f f 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
+DATA(insert ( 2823 n 0 float8_regr_accum float8_regr_avgy - - - - f f 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
+DATA(insert ( 2824 n 0 float8_regr_accum float8_regr_r2 - - - - f f 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
+DATA(insert ( 2825 n 0 float8_regr_accum float8_regr_slope - - - - f f 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
+DATA(insert ( 2826 n 0 float8_regr_accum float8_regr_intercept - - - - f f 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
+DATA(insert ( 2827 n 0 float8_regr_accum float8_covar_pop - - - - f f 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
+DATA(insert ( 2828 n 0 float8_regr_accum float8_covar_samp - - - - f f 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
+DATA(insert ( 2829 n 0 float8_regr_accum float8_corr - - - - f f 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
/* boolean-and and boolean-or */
-DATA(insert ( 2517 n 0 booland_statefunc - bool_accum bool_accum_inv bool_alltrue f f 58 16 0 2281 16 _null_ _null_ ));
-DATA(insert ( 2518 n 0 boolor_statefunc - bool_accum bool_accum_inv bool_anytrue f f 59 16 0 2281 16 _null_ _null_ ));
-DATA(insert ( 2519 n 0 booland_statefunc - bool_accum bool_accum_inv bool_alltrue f f 58 16 0 2281 16 _null_ _null_ ));
+DATA(insert ( 2517 n 0 booland_statefunc - - bool_accum bool_accum_inv bool_alltrue f f 58 16 0 2281 16 _null_ _null_ ));
+DATA(insert ( 2518 n 0 boolor_statefunc - - bool_accum bool_accum_inv bool_anytrue f f 59 16 0 2281 16 _null_ _null_ ));
+DATA(insert ( 2519 n 0 booland_statefunc - - bool_accum bool_accum_inv bool_alltrue f f 58 16 0 2281 16 _null_ _null_ ));
/* bitwise integer */
-DATA(insert ( 2236 n 0 int2and - - - - f f 0 21 0 0 0 _null_ _null_ ));
-DATA(insert ( 2237 n 0 int2or - - - - f f 0 21 0 0 0 _null_ _null_ ));
-DATA(insert ( 2238 n 0 int4and - - - - f f 0 23 0 0 0 _null_ _null_ ));
-DATA(insert ( 2239 n 0 int4or - - - - f f 0 23 0 0 0 _null_ _null_ ));
-DATA(insert ( 2240 n 0 int8and - - - - f f 0 20 0 0 0 _null_ _null_ ));
-DATA(insert ( 2241 n 0 int8or - - - - f f 0 20 0 0 0 _null_ _null_ ));
-DATA(insert ( 2242 n 0 bitand - - - - f f 0 1560 0 0 0 _null_ _null_ ));
-DATA(insert ( 2243 n 0 bitor - - - - f f 0 1560 0 0 0 _null_ _null_ ));
+DATA(insert ( 2236 n 0 int2and - int2and - - - f f 0 21 0 0 0 _null_ _null_ ));
+DATA(insert ( 2237 n 0 int2or - int2or - - - f f 0 21 0 0 0 _null_ _null_ ));
+DATA(insert ( 2238 n 0 int4and - int4and - - - f f 0 23 0 0 0 _null_ _null_ ));
+DATA(insert ( 2239 n 0 int4or - int4or - - - f f 0 23 0 0 0 _null_ _null_ ));
+DATA(insert ( 2240 n 0 int8and - int8and - - - f f 0 20 0 0 0 _null_ _null_ ));
+DATA(insert ( 2241 n 0 int8or - int8or - - - f f 0 20 0 0 0 _null_ _null_ ));
+DATA(insert ( 2242 n 0 bitand - bitand - - - f f 0 1560 0 0 0 _null_ _null_ ));
+DATA(insert ( 2243 n 0 bitor - bitor - - - f f 0 1560 0 0 0 _null_ _null_ ));
/* xml */
-DATA(insert ( 2901 n 0 xmlconcat2 - - - - f f 0 142 0 0 0 _null_ _null_ ));
+DATA(insert ( 2901 n 0 xmlconcat2 - - - - - f f 0 142 0 0 0 _null_ _null_ ));
/* array */
-DATA(insert ( 2335 n 0 array_agg_transfn array_agg_finalfn - - - t f 0 2281 0 0 0 _null_ _null_ ));
-DATA(insert ( 4053 n 0 array_agg_array_transfn array_agg_array_finalfn - - - t f 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 2335 n 0 array_agg_transfn array_agg_finalfn - - - - t f 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 4053 n 0 array_agg_array_transfn array_agg_array_finalfn - - - - t f 0 2281 0 0 0 _null_ _null_ ));
/* text */
-DATA(insert ( 3538 n 0 string_agg_transfn string_agg_finalfn - - - f f 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 3538 n 0 string_agg_transfn string_agg_finalfn - - - - f f 0 2281 0 0 0 _null_ _null_ ));
/* bytea */
-DATA(insert ( 3545 n 0 bytea_string_agg_transfn bytea_string_agg_finalfn - - - f f 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 3545 n 0 bytea_string_agg_transfn bytea_string_agg_finalfn - - - - f f 0 2281 0 0 0 _null_ _null_ ));
/* json */
-DATA(insert ( 3175 n 0 json_agg_transfn json_agg_finalfn - - - f f 0 2281 0 0 0 _null_ _null_ ));
-DATA(insert ( 3197 n 0 json_object_agg_transfn json_object_agg_finalfn - - - f f 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 3175 n 0 json_agg_transfn json_agg_finalfn - - - - f f 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 3197 n 0 json_object_agg_transfn json_object_agg_finalfn - - - - f f 0 2281 0 0 0 _null_ _null_ ));
/* jsonb */
-DATA(insert ( 3267 n 0 jsonb_agg_transfn jsonb_agg_finalfn - - - f f 0 2281 0 0 0 _null_ _null_ ));
-DATA(insert ( 3270 n 0 jsonb_object_agg_transfn jsonb_object_agg_finalfn - - - f f 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 3267 n 0 jsonb_agg_transfn jsonb_agg_finalfn - - - - f f 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 3270 n 0 jsonb_object_agg_transfn jsonb_object_agg_finalfn - - - - f f 0 2281 0 0 0 _null_ _null_ ));
/* ordered-set and hypothetical-set aggregates */
-DATA(insert ( 3972 o 1 ordered_set_transition percentile_disc_final - - - t f 0 2281 0 0 0 _null_ _null_ ));
-DATA(insert ( 3974 o 1 ordered_set_transition percentile_cont_float8_final - - - f f 0 2281 0 0 0 _null_ _null_ ));
-DATA(insert ( 3976 o 1 ordered_set_transition percentile_cont_interval_final - - - f f 0 2281 0 0 0 _null_ _null_ ));
-DATA(insert ( 3978 o 1 ordered_set_transition percentile_disc_multi_final - - - t f 0 2281 0 0 0 _null_ _null_ ));
-DATA(insert ( 3980 o 1 ordered_set_transition percentile_cont_float8_multi_final - - - f f 0 2281 0 0 0 _null_ _null_ ));
-DATA(insert ( 3982 o 1 ordered_set_transition percentile_cont_interval_multi_final - - - f f 0 2281 0 0 0 _null_ _null_ ));
-DATA(insert ( 3984 o 0 ordered_set_transition mode_final - - - t f 0 2281 0 0 0 _null_ _null_ ));
-DATA(insert ( 3986 h 1 ordered_set_transition_multi rank_final - - - t f 0 2281 0 0 0 _null_ _null_ ));
-DATA(insert ( 3988 h 1 ordered_set_transition_multi percent_rank_final - - - t f 0 2281 0 0 0 _null_ _null_ ));
-DATA(insert ( 3990 h 1 ordered_set_transition_multi cume_dist_final - - - t f 0 2281 0 0 0 _null_ _null_ ));
-DATA(insert ( 3992 h 1 ordered_set_transition_multi dense_rank_final - - - t f 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 3972 o 1 ordered_set_transition percentile_disc_final - - - - t f 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 3974 o 1 ordered_set_transition percentile_cont_float8_final - - - - f f 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 3976 o 1 ordered_set_transition percentile_cont_interval_final - - - - f f 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 3978 o 1 ordered_set_transition percentile_disc_multi_final - - - - t f 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 3980 o 1 ordered_set_transition percentile_cont_float8_multi_final - - - - f f 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 3982 o 1 ordered_set_transition percentile_cont_interval_multi_final - - - - f f 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 3984 o 0 ordered_set_transition mode_final - - - - t f 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 3986 h 1 ordered_set_transition_multi rank_final - - - - t f 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 3988 h 1 ordered_set_transition_multi percent_rank_final - - - - t f 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 3990 h 1 ordered_set_transition_multi cume_dist_final - - - - t f 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 3992 h 1 ordered_set_transition_multi dense_rank_final - - - - t f 0 2281 0 0 0 _null_ _null_ ));
/*
@@ -322,6 +325,7 @@ extern ObjectAddress AggregateCreate(const char *aggName,
Oid variadicArgType,
List *aggtransfnName,
List *aggfinalfnName,
+ List *aggcombinefnName,
List *aggmtransfnName,
List *aggminvtransfnName,
List *aggmfinalfnName,
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 5ccf470..4243c0b 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1851,6 +1851,8 @@ typedef struct AggState
AggStatePerTrans curpertrans; /* currently active trans state */
bool input_done; /* indicates end of input */
bool agg_done; /* indicates completion of Agg scan */
+ bool combineStates; /* input tuples contain transition states */
+ bool finalizeAggs; /* should we call the finalfn on agg states? */
int projected_set; /* The last projected grouping set */
int current_set; /* The current grouping set being evaluated */
Bitmapset *grouped_cols; /* grouped cols in current projection */
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 37086c6..9ae2a1b 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -726,6 +726,8 @@ typedef struct Agg
AggStrategy aggstrategy;
int numCols; /* number of grouping columns */
AttrNumber *grpColIdx; /* their indexes in the target list */
+ bool combineStates; /* input tuples contain transition states */
+ bool finalizeAggs; /* should we call the finalfn on agg states? */
Oid *grpOperators; /* equality operators to compare with */
long numGroups; /* estimated number of groups in input */
List *groupingSets; /* grouping sets to use */
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index ac21a3a..9bd6b07 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -62,6 +62,7 @@ extern bool enable_bitmapscan;
extern bool enable_tidscan;
extern bool enable_sort;
extern bool enable_hashagg;
+extern bool enable_parallelagg;
extern bool enable_nestloop;
extern bool enable_material;
extern bool enable_mergejoin;
diff --git a/src/include/optimizer/planmain.h b/src/include/optimizer/planmain.h
index f96e9ee..2989eac 100644
--- a/src/include/optimizer/planmain.h
+++ b/src/include/optimizer/planmain.h
@@ -60,9 +60,8 @@ extern Sort *make_sort_from_groupcols(PlannerInfo *root, List *groupcls,
extern Agg *make_agg(PlannerInfo *root, List *tlist, List *qual,
AggStrategy aggstrategy, const AggClauseCosts *aggcosts,
int numGroupCols, AttrNumber *grpColIdx, Oid *grpOperators,
- List *groupingSets,
- long numGroups,
- Plan *lefttree);
+ List *groupingSets, long numGroups, bool combineStates,
+ bool finalizeAggs, Plan *lefttree);
extern WindowAgg *make_windowagg(PlannerInfo *root, List *tlist,
List *windowFuncs, Index winref,
int partNumCols, AttrNumber *partColIdx, Oid *partOperators,
diff --git a/src/include/parser/parse_agg.h b/src/include/parser/parse_agg.h
index e2b3894..621b6b9 100644
--- a/src/include/parser/parse_agg.h
+++ b/src/include/parser/parse_agg.h
@@ -46,6 +46,12 @@ extern void build_aggregate_transfn_expr(Oid *agg_input_types,
Expr **transfnexpr,
Expr **invtransfnexpr);
+extern void build_aggregate_combinefn_expr(bool agg_variadic,
+ Oid agg_state_type,
+ Oid agg_input_collation,
+ Oid combinefn_oid,
+ Expr **combinefnexpr);
+
extern void build_aggregate_finalfn_expr(Oid *agg_input_types,
int num_finalfn_inputs,
Oid agg_state_type,
diff --git a/src/test/regress/expected/create_aggregate.out b/src/test/regress/expected/create_aggregate.out
index 82a34fb..56643f2 100644
--- a/src/test/regress/expected/create_aggregate.out
+++ b/src/test/regress/expected/create_aggregate.out
@@ -101,6 +101,23 @@ CREATE AGGREGATE sumdouble (float8)
msfunc = float8pl,
minvfunc = float8mi
);
+-- aggregate combine functions
+CREATE AGGREGATE mymax (int)
+(
+ stype = int4,
+ sfunc = int4larger,
+ cfunc = int4larger
+);
+-- Ensure all these functions made it into the catalog
+SELECT aggfnoid,aggtransfn,aggcombinefn,aggtranstype
+FROM pg_aggregate
+WHERE aggfnoid = 'mymax'::REGPROC;
+ aggfnoid | aggtransfn | aggcombinefn | aggtranstype
+----------+------------+--------------+--------------
+ mymax | int4larger | int4larger | 23
+(1 row)
+
+DROP AGGREGATE mymax (int);
-- invalid: nonstrict inverse with strict forward function
CREATE FUNCTION float8mi_n(float8, float8) RETURNS float8 AS
$$ SELECT $1 - $2; $$
diff --git a/src/test/regress/sql/create_aggregate.sql b/src/test/regress/sql/create_aggregate.sql
index 0ec1572..0070382 100644
--- a/src/test/regress/sql/create_aggregate.sql
+++ b/src/test/regress/sql/create_aggregate.sql
@@ -115,6 +115,21 @@ CREATE AGGREGATE sumdouble (float8)
minvfunc = float8mi
);
+-- aggregate combine functions
+CREATE AGGREGATE mymax (int)
+(
+ stype = int4,
+ sfunc = int4larger,
+ cfunc = int4larger
+);
+
+-- Ensure all these functions made it into the catalog
+SELECT aggfnoid,aggtransfn,aggcombinefn,aggtranstype
+FROM pg_aggregate
+WHERE aggfnoid = 'mymax'::REGPROC;
+
+DROP AGGREGATE mymax (int);
+
-- invalid: nonstrict inverse with strict forward function
CREATE FUNCTION float8mi_n(float8, float8) RETURNS float8 AS