From 955f1bb259b7a78c36398364f5035c2fc3ce79d6 Mon Sep 17 00:00:00 2001 From: David Rowley Date: Wed, 16 Mar 2016 01:40:53 +1300 Subject: [PATCH 1/4] Allow aggregation to happen in parallel This modifies the grouping planner to allow it to generate Paths for parallel aggregation, when possible. --- src/backend/executor/execQual.c | 34 +++- src/backend/nodes/copyfuncs.c | 17 ++ src/backend/nodes/equalfuncs.c | 12 ++ src/backend/nodes/nodeFuncs.c | 24 +++ src/backend/nodes/outfuncs.c | 12 ++ src/backend/nodes/readfuncs.c | 16 ++ src/backend/optimizer/path/costsize.c | 10 +- src/backend/optimizer/plan/createplan.c | 4 +- src/backend/optimizer/plan/planner.c | 278 +++++++++++++++++++++++++++++++- src/backend/optimizer/plan/setrefs.c | 197 +++++++++++++++++++++- src/backend/optimizer/prep/prepunion.c | 4 +- src/backend/optimizer/util/clauses.c | 81 ++++++++++ src/backend/optimizer/util/pathnode.c | 125 +++++++++++++- src/backend/optimizer/util/tlist.c | 41 +++++ src/backend/utils/adt/ruleutils.c | 6 + src/include/nodes/nodes.h | 1 + src/include/nodes/primnodes.h | 25 +++ src/include/nodes/relation.h | 2 + src/include/optimizer/clauses.h | 20 +++ src/include/optimizer/cost.h | 2 +- src/include/optimizer/pathnode.h | 15 +- src/include/optimizer/tlist.h | 1 + 22 files changed, 907 insertions(+), 20 deletions(-) diff --git a/src/backend/executor/execQual.c b/src/backend/executor/execQual.c index 778b6c1..3260f80 100644 --- a/src/backend/executor/execQual.c +++ b/src/backend/executor/execQual.c @@ -4510,11 +4510,12 @@ ExecInitExpr(Expr *node, PlanState *parent) case T_Aggref: { AggrefExprState *astate = makeNode(AggrefExprState); + AggState *aggstate = (AggState *) parent; astate->xprstate.evalfunc = (ExprStateEvalFunc) ExecEvalAggref; - if (parent && IsA(parent, AggState)) + if (aggstate && IsA(aggstate, AggState) && + aggstate->finalizeAggs) { - AggState *aggstate = (AggState *) parent; aggstate->aggs = lcons(astate, aggstate->aggs); aggstate->numaggs++; @@ -4522,11 
+4523,38 @@ ExecInitExpr(Expr *node, PlanState *parent) else { /* planner messed up */ - elog(ERROR, "Aggref found in non-Agg plan node"); + elog(ERROR, "Aggref found in non-FinalizeAgg plan node"); } state = (ExprState *) astate; } break; + case T_PartialAggref: + { + AggrefExprState *astate = makeNode(AggrefExprState); + AggState *aggstate = (AggState *) parent; + + astate->xprstate.evalfunc = (ExprStateEvalFunc) ExecEvalAggref; + if (aggstate && IsA(aggstate, AggState) && + !aggstate->finalizeAggs) + { + + aggstate->aggs = lcons(astate, aggstate->aggs); + aggstate->numaggs++; + } + else + { + /* planner messed up */ + elog(ERROR, "PartialAggref found in non-PartialAgg plan node"); + } + state = (ExprState *) astate; + + /* + * Obliterate the PartialAggref and return the underlying + * Aggref node + */ + state->expr = (Expr *) ((PartialAggref *) node)->aggref; + } + return state; /* Don't fall through to the "common" code below */ case T_GroupingFunc: { GroupingFunc *grp_node = (GroupingFunc *) node; diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index df7c2fa..42781c1 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -1248,6 +1248,20 @@ _copyAggref(const Aggref *from) } /* + * _copyPartialAggref + */ +static PartialAggref * +_copyPartialAggref(const PartialAggref *from) +{ + PartialAggref *newnode = makeNode(PartialAggref); + + COPY_SCALAR_FIELD(aggtranstype); + COPY_NODE_FIELD(aggref); + + return newnode; +} + +/* * _copyGroupingFunc */ static GroupingFunc * @@ -4393,6 +4407,9 @@ copyObject(const void *from) case T_Aggref: retval = _copyAggref(from); break; + case T_PartialAggref: + retval = _copyPartialAggref(from); + break; case T_GroupingFunc: retval = _copyGroupingFunc(from); break; diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index b9c3959..de445f1 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -209,6 +209,15 @@ 
_equalAggref(const Aggref *a, const Aggref *b) } static bool +_equalPartialAggref(const PartialAggref *a, const PartialAggref *b) +{ + COMPARE_SCALAR_FIELD(aggtranstype); + COMPARE_NODE_FIELD(aggref); + + return true; +} + +static bool _equalGroupingFunc(const GroupingFunc *a, const GroupingFunc *b) { COMPARE_NODE_FIELD(args); @@ -2733,6 +2742,9 @@ equal(const void *a, const void *b) case T_Aggref: retval = _equalAggref(a, b); break; + case T_PartialAggref: + retval = _equalPartialAggref(a, b); + break; case T_GroupingFunc: retval = _equalGroupingFunc(a, b); break; diff --git a/src/backend/nodes/nodeFuncs.c b/src/backend/nodes/nodeFuncs.c index b4ea440..6440a7e 100644 --- a/src/backend/nodes/nodeFuncs.c +++ b/src/backend/nodes/nodeFuncs.c @@ -59,6 +59,9 @@ exprType(const Node *expr) case T_Aggref: type = ((const Aggref *) expr)->aggtype; break; + case T_PartialAggref: + type = ((const PartialAggref *) expr)->aggtranstype; + break; case T_GroupingFunc: type = INT4OID; break; @@ -758,6 +761,9 @@ exprCollation(const Node *expr) case T_Aggref: coll = ((const Aggref *) expr)->aggcollid; break; + case T_PartialAggref: + coll = InvalidOid; /* XXX is this correct? 
*/ + break; case T_GroupingFunc: coll = InvalidOid; break; @@ -1708,6 +1714,15 @@ expression_tree_walker(Node *node, return true; } break; + case T_PartialAggref: + { + PartialAggref *expr = (PartialAggref *) node; + + if (expression_tree_walker((Node *) expr->aggref, walker, + context)) + return true; + } + break; case T_GroupingFunc: { GroupingFunc *grouping = (GroupingFunc *) node; @@ -2281,6 +2296,15 @@ expression_tree_mutator(Node *node, return (Node *) newnode; } break; + case T_PartialAggref: + { + PartialAggref *paggref = (PartialAggref *) node; + PartialAggref *newnode; + + FLATCOPY(newnode, paggref, PartialAggref); + MUTATE(newnode->aggref, paggref->aggref, Aggref *); + return (Node *) newnode; + } case T_GroupingFunc: { GroupingFunc *grouping = (GroupingFunc *) node; diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 548a3b9..3773f12 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -1046,6 +1046,15 @@ _outAggref(StringInfo str, const Aggref *node) } static void +_outPartialAggref(StringInfo str, const PartialAggref *node) +{ + WRITE_NODE_TYPE("PARTIALAGGREF"); + + WRITE_OID_FIELD(aggtranstype); + WRITE_NODE_FIELD(aggref); +} + +static void _outGroupingFunc(StringInfo str, const GroupingFunc *node) { WRITE_NODE_TYPE("GROUPINGFUNC"); @@ -3376,6 +3385,9 @@ _outNode(StringInfo str, const void *obj) case T_Aggref: _outAggref(str, obj); break; + case T_PartialAggref: + _outPartialAggref(str, obj); + break; case T_GroupingFunc: _outGroupingFunc(str, obj); break; diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index a2c2243..647d3a8 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -569,6 +569,20 @@ _readAggref(void) } /* + * _readPartialAggref + */ +static PartialAggref * +_readPartialAggref(void) +{ + READ_LOCALS(PartialAggref); + + READ_OID_FIELD(aggtranstype); + READ_NODE_FIELD(aggref); + + READ_DONE(); +} + +/* * _readGroupingFunc */ static 
GroupingFunc * @@ -2307,6 +2321,8 @@ parseNodeString(void) return_value = _readParam(); else if (MATCH("AGGREF", 6)) return_value = _readAggref(); + else if (MATCH("PARTIALAGGREF", 13)) + return_value = _readPartialAggref(); else if (MATCH("GROUPINGFUNC", 12)) return_value = _readGroupingFunc(); else if (MATCH("WINDOWFUNC", 10)) diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 943fcde..58bfad8 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -350,16 +350,22 @@ cost_samplescan(Path *path, PlannerInfo *root, * * 'rel' is the relation to be operated upon * 'param_info' is the ParamPathInfo if this is a parameterized path, else NULL + * 'rows' may be used to point to a row estimate, this may be used when a rel + * is unavailable to retrieve row estimates from. This setting, if non-NULL + * overrides both 'rel' and 'param_info'. */ void cost_gather(GatherPath *path, PlannerInfo *root, - RelOptInfo *rel, ParamPathInfo *param_info) + RelOptInfo *rel, ParamPathInfo *param_info, + double *rows) { Cost startup_cost = 0; Cost run_cost = 0; /* Mark the path with the correct row estimate */ - if (param_info) + if (rows) + path->path.rows = *rows; + else if (param_info) path->path.rows = param_info->ppi_rows; else path->path.rows = rel->rows; diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index e37bdfd..6953a60 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -1572,8 +1572,8 @@ create_agg_plan(PlannerInfo *root, AggPath *best_path) plan = make_agg(tlist, quals, best_path->aggstrategy, - false, - true, + best_path->combineStates, + best_path->finalizeAggs, list_length(best_path->groupClause), extract_grouping_cols(best_path->groupClause, subplan->targetlist), diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index fc0a2d8..bc1c954 100644 --- 
a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -134,6 +134,8 @@ static RelOptInfo *create_ordered_paths(PlannerInfo *root, double limit_tuples); static PathTarget *make_group_input_target(PlannerInfo *root, PathTarget *final_target); +static PathTarget *make_partialgroup_input_target(PlannerInfo *root, + PathTarget *final_target); static List *postprocess_setop_tlist(List *new_tlist, List *orig_tlist); static List *select_active_windows(PlannerInfo *root, WindowFuncLists *wflists); static PathTarget *make_window_input_target(PlannerInfo *root, @@ -1767,6 +1769,19 @@ grouping_planner(PlannerInfo *root, bool inheritance_update, (*create_upper_paths_hook) (root, current_rel); /* + * Likewise for any partial paths, although this case is more simple as + * we don't track the cheapest path. + */ + foreach(lc, current_rel->partial_pathlist) + { + Path *subpath = (Path *) lfirst(lc); + + Assert(subpath->param_info == NULL); + lfirst(lc) = apply_projection_to_path(root, current_rel, + subpath, scanjoin_target); + } + + /* * If we have grouping and/or aggregation, consider ways to implement * that. We build a new upperrel representing the output of this * phase. @@ -3162,10 +3177,15 @@ create_grouping_paths(PlannerInfo *root, { Query *parse = root->parse; Path *cheapest_path = input_rel->cheapest_total_path; + PathTarget *partial_group_target = NULL; /* for parallel aggregate */ RelOptInfo *grouped_rel; AggClauseCosts agg_costs; double dNumGroups; bool allow_hash; + bool can_hash; + bool can_sort; + bool can_parallel; + ListCell *lc; /* For now, do all work in the (GROUP_AGG, NULL) upperrel */ @@ -3259,12 +3279,44 @@ create_grouping_paths(PlannerInfo *root, rollup_groupclauses); /* + * Determine if it's possible to perform aggregation in parallel using + * multiple worker processes. 
We can permit this when there's at least one + * partial_path in input_rel, but not if the query has grouping sets, + * (although this likely just requires a bit more thought). We must also + * ensure that any aggregate functions which are present in either the + * target list, or in the HAVING clause all support parallel mode. + */ + can_parallel = false; + + if ((parse->hasAggs || parse->groupClause != NIL) && + input_rel->partial_pathlist != NIL && + parse->groupingSets == NIL && + root->glob->parallelModeOK) + { + /* + * Check that all aggregate functions support partial mode, + * however if there are no aggregate functions then we can skip + * this check. + */ + if (!parse->hasAggs || + (aggregates_allow_partial((Node *) target->exprs) == PAT_ANY && + aggregates_allow_partial(root->parse->havingQual) == PAT_ANY)) + { + can_parallel = true; + partial_group_target = make_partialgroup_input_target(root, + target); + } + } + + /* * Consider sort-based implementations of grouping, if possible. (Note * that if groupClause is empty, grouping_is_sortable() is trivially true, * and all the pathkeys_contained_in() tests will succeed too, so that * we'll consider every surviving input path.) 
*/ - if (grouping_is_sortable(parse->groupClause)) + can_sort = grouping_is_sortable(parse->groupClause); + + if (can_sort) { /* * Use any available suitably-sorted path as input, and also consider @@ -3320,7 +3372,9 @@ create_grouping_paths(PlannerInfo *root, parse->groupClause, (List *) parse->havingQual, &agg_costs, - dNumGroups)); + dNumGroups, + false, + true)); } else if (parse->groupClause) { @@ -3344,6 +3398,42 @@ create_grouping_paths(PlannerInfo *root, } } } + + if (can_parallel) + { + AggStrategy aggstrategy; + + if (parse->groupClause != NIL) + aggstrategy = AGG_SORTED; + else + aggstrategy = AGG_PLAIN; + + foreach(lc, input_rel->partial_pathlist) + { + Path *path = (Path *) lfirst(lc); + bool is_sorted; + + is_sorted = pathkeys_contained_in(root->group_pathkeys, + path->pathkeys); + if (!is_sorted) + path = (Path *) create_sort_path(root, + grouped_rel, + path, + root->group_pathkeys, + -1.0); + add_path(grouped_rel, (Path *) + create_parallelagg_path(root, grouped_rel, + path, + partial_group_target, + target, + aggstrategy, + aggstrategy, + parse->groupClause, + (List *) parse->havingQual, + &agg_costs, + dNumGroups)); + } + } } /* @@ -3392,7 +3482,9 @@ create_grouping_paths(PlannerInfo *root, } } - if (allow_hash && grouping_is_hashable(parse->groupClause)) + can_hash = allow_hash && grouping_is_hashable(parse->groupClause); + + if (can_hash) { /* * We just need an Agg over the cheapest-total input path, since input @@ -3406,7 +3498,90 @@ create_grouping_paths(PlannerInfo *root, parse->groupClause, (List *) parse->havingQual, &agg_costs, - dNumGroups)); + dNumGroups, + false, + true)); + + if (can_parallel) + { + Path *cheapest_partial_path; + + cheapest_partial_path = (Path *) linitial(input_rel->partial_pathlist); + + add_path(grouped_rel, (Path *) + create_parallelagg_path(root, grouped_rel, + cheapest_partial_path, + partial_group_target, + target, + AGG_HASHED, + AGG_HASHED, + parse->groupClause, + (List *) parse->havingQual, + &agg_costs, + 
dNumGroups)); + } + } + + /* + * For parallel aggregation, since this happens in 2 phases, we'll also try + * mixing the aggregate strategies to see if that'll bring the cost down + * any. + */ + if (can_parallel && can_hash && can_sort) + { + Path *cheapest_partial_path; + + cheapest_partial_path = (Path *) linitial(input_rel->partial_pathlist); + + Assert(parse->groupClause != NIL); + + /* + * Try hashing in the partial phase, and sorting in the final. We need + * only bother trying this on the cheapest partial path since hashing + * does not care about the order of the input path. + */ + add_path(grouped_rel, (Path *) + create_parallelagg_path(root, grouped_rel, + cheapest_partial_path, + partial_group_target, + target, + AGG_HASHED, + AGG_SORTED, + parse->groupClause, + (List *) parse->havingQual, + &agg_costs, + dNumGroups)); + + /* + * Try sorting in the partial phase, and hashing in the final. We do + * this for all partial paths as some may have useful ordering + */ + foreach(lc, input_rel->partial_pathlist) + { + Path *path = (Path *) lfirst(lc); + bool is_sorted; + + is_sorted = pathkeys_contained_in(root->group_pathkeys, + path->pathkeys); + if (!is_sorted) + path = (Path *) create_sort_path(root, + grouped_rel, + path, + root->group_pathkeys, + -1.0); + + add_path(grouped_rel, (Path *) + create_parallelagg_path(root, grouped_rel, + path, + partial_group_target, + target, + AGG_SORTED, + AGG_HASHED, + parse->groupClause, + (List *) parse->havingQual, + &agg_costs, + dNumGroups)); + } } /* Give a helpful error if we failed to find any implementation */ @@ -3735,7 +3910,9 @@ create_distinct_paths(PlannerInfo *root, parse->distinctClause, NIL, NULL, - numDistinctRows)); + numDistinctRows, + false, + true)); } /* Give a helpful error if we failed to find any implementation */ @@ -3915,6 +4092,97 @@ make_group_input_target(PlannerInfo *root, PathTarget *final_target) } /* + * make_partialgroup_input_target + * Generate appropriate PathTarget for input to
partial grouping nodes. + * + * This is very similar to make_group_input_target(), only we do not recurse + * into Aggrefs. Aggrefs are left intact and added to the target list. Here we + * also add any Aggrefs which are located in the HAVING clause into the + * PathTarget. + * + * Aggrefs are also wrapped in a PartialAggref node in order to allow the + * correct return type to be the aggregate state type rather than the aggregate + * function's return type. + */ +static PathTarget * +make_partialgroup_input_target(PlannerInfo *root, PathTarget *final_target) +{ + Query *parse = root->parse; + PathTarget *input_target; + List *non_group_cols; + List *non_group_exprs; + int i; + ListCell *lc; + + input_target = create_empty_pathtarget(); + non_group_cols = NIL; + + i = -1; + foreach(lc, final_target->exprs) + { + Expr *expr = (Expr *) lfirst(lc); + + i++; + + if (parse->groupClause) + { + Index sgref = final_target->sortgrouprefs[i]; + + if (sgref && get_sortgroupref_clause_noerr(sgref, parse->groupClause) + != NULL) + { + /* + * It's a grouping column, so add it to the input target as-is. + */ + add_column_to_pathtarget(input_target, expr, sgref); + continue; + } + } + + /* + * Non-grouping column, so just remember the expression for later + * call to pull_var_clause. + */ + non_group_cols = lappend(non_group_cols, expr); + } + + /* + * If there's a HAVING clause, we'll need the Aggrefs it uses, too. + */ + if (parse->havingQual) + non_group_cols = lappend(non_group_cols, parse->havingQual); + + /* + * Pull out all the Vars mentioned in non-group cols (plus HAVING), and + * add them to the input target if not already present. (A Var used + * directly as a GROUP BY item will be present already.) Note this + * includes Vars used in resjunk items, so we are covering the needs of + * ORDER BY and window specifications. Vars used within Aggrefs will be + * ignored and the Aggrefs themselves will be added to the PathTarget. 
+ */ + non_group_exprs = pull_var_clause((Node *) non_group_cols, + PVC_INCLUDE_AGGREGATES | + PVC_RECURSE_WINDOWFUNCS | + PVC_INCLUDE_PLACEHOLDERS); + + add_new_columns_to_pathtarget(input_target, non_group_exprs); + + /* clean up cruft */ + list_free(non_group_exprs); + list_free(non_group_cols); + + /* + * Wrap up the Aggrefs in PartialAggref nodes so that we can return the + * correct type in exprType() + */ + apply_partialaggref_nodes(input_target); + + /* XXX this causes some redundant cost calculation ... */ + input_target = set_pathtarget_cost_width(root, input_target); + return input_target; +} + +/* * postprocess_setop_tlist * Fix up targetlist returned by plan_set_operations(). * diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index aa2c308..2db1753 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -104,6 +104,8 @@ static Node *fix_scan_expr_mutator(Node *node, fix_scan_expr_context *context); static bool fix_scan_expr_walker(Node *node, fix_scan_expr_context *context); static void set_join_references(PlannerInfo *root, Join *join, int rtoffset); static void set_upper_references(PlannerInfo *root, Plan *plan, int rtoffset); +static void set_combineagg_references(PlannerInfo *root, Plan *plan, + int rtoffset); static void set_dummy_tlist_references(Plan *plan, int rtoffset); static indexed_tlist *build_tlist_index(List *tlist); static Var *search_indexed_tlist_for_var(Var *var, @@ -131,6 +133,13 @@ static Node *fix_upper_expr(PlannerInfo *root, int rtoffset); static Node *fix_upper_expr_mutator(Node *node, fix_upper_expr_context *context); +static Node *fix_combine_agg_expr(PlannerInfo *root, + Node *node, + indexed_tlist *subplan_itlist, + Index newvarno, + int rtoffset); +static Node *fix_combine_agg_expr_mutator(Node *node, + fix_upper_expr_context *context); static List *set_returning_clause_references(PlannerInfo *root, List *rlist, Plan *topplan, @@ -667,8 +676,16 
@@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) } break; case T_Agg: - set_upper_references(root, plan, rtoffset); - break; + { + Agg *aggplan = (Agg *) plan; + + if (aggplan->combineStates) + set_combineagg_references(root, plan, rtoffset); + else + set_upper_references(root, plan, rtoffset); + + break; + } case T_Group: set_upper_references(root, plan, rtoffset); break; @@ -1702,6 +1719,72 @@ set_upper_references(PlannerInfo *root, Plan *plan, int rtoffset) } /* + * set_combineagg_references + * This does a similar job as set_upper_references(), but additionally it + * transforms Aggref nodes args to suit the combine aggregate phase, this + * means that the Aggref->args are converted to reference the corresponding + * aggregate function in the subplan rather than simple Var(s), as would be + * the case for a non-combine aggregate node. + */ +static void +set_combineagg_references(PlannerInfo *root, Plan *plan, int rtoffset) +{ + Plan *subplan = plan->lefttree; + indexed_tlist *subplan_itlist; + List *output_targetlist; + ListCell *l; + + Assert(IsA(plan, Agg)); + Assert(((Agg *) plan)->combineStates); + + subplan_itlist = build_tlist_index(subplan->targetlist); + + output_targetlist = NIL; + + foreach(l, plan->targetlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(l); + Node *newexpr; + + /* If it's a non-Var sort/group item, first try to match by sortref */ + if (tle->ressortgroupref != 0 && !IsA(tle->expr, Var)) + { + newexpr = (Node *) + search_indexed_tlist_for_sortgroupref((Node *) tle->expr, + tle->ressortgroupref, + subplan_itlist, + OUTER_VAR); + if (!newexpr) + newexpr = fix_combine_agg_expr(root, + (Node *) tle->expr, + subplan_itlist, + OUTER_VAR, + rtoffset); + } + else + newexpr = fix_combine_agg_expr(root, + (Node *) tle->expr, + subplan_itlist, + OUTER_VAR, + rtoffset); + tle = flatCopyTargetEntry(tle); + tle->expr = (Expr *) newexpr; + output_targetlist = lappend(output_targetlist, tle); + } + + plan->targetlist = 
output_targetlist; + + plan->qual = (List *) + fix_combine_agg_expr(root, + (Node *) plan->qual, + subplan_itlist, + OUTER_VAR, + rtoffset); + + pfree(subplan_itlist); +} + +/* * set_dummy_tlist_references * Replace the targetlist of an upper-level plan node with a simple * list of OUTER_VAR references to its child. @@ -2238,6 +2321,116 @@ fix_upper_expr_mutator(Node *node, fix_upper_expr_context *context) } /* + * fix_combine_agg_expr + * Like fix_upper_expr() but additionally adjusts the Aggref->args of + * Aggrefs so that they reference the corresponding Aggref in the subplan. + */ +static Node * +fix_combine_agg_expr(PlannerInfo *root, + Node *node, + indexed_tlist *subplan_itlist, + Index newvarno, + int rtoffset) +{ + fix_upper_expr_context context; + + context.root = root; + context.subplan_itlist = subplan_itlist; + context.newvarno = newvarno; + context.rtoffset = rtoffset; + return fix_combine_agg_expr_mutator(node, &context); +} + +static Node * +fix_combine_agg_expr_mutator(Node *node, fix_upper_expr_context *context) +{ + Var *newvar; + + if (node == NULL) + return NULL; + if (IsA(node, Var)) + { + Var *var = (Var *) node; + + newvar = search_indexed_tlist_for_var(var, + context->subplan_itlist, + context->newvarno, + context->rtoffset); + if (!newvar) + elog(ERROR, "variable not found in subplan target list"); + return (Node *) newvar; + } + if (IsA(node, Aggref)) + { + Aggref *aggref = (Aggref *) node; + TargetEntry *tle; + ListCell *lc; + + /* + * Aggrefs for partial aggregates are wrapped up in a PartialAggref, + * we need to look into the PartialAggref to find the Aggref within.
+ */ + foreach(lc, context->subplan_itlist->tlist) + { + PartialAggref *paggref; + tle = (TargetEntry *) lfirst(lc); + paggref = (PartialAggref *) tle->expr; + + if (IsA(paggref, PartialAggref) && + equal(paggref->aggref, aggref)) + break; + } + + if (lc != NULL) + { + Var *newvar; + Aggref *newaggref; + TargetEntry *newtle; + + newvar = makeVarFromTargetEntry(context->newvarno, tle); + newvar->varnoold = 0; /* wasn't ever a plain Var */ + newvar->varoattno = 0; + + /* + * Now build a new TargetEntry for the Aggref's arguments which is + * a single Var which references the corresponding PartialAggRef + * in the node below. + */ + newtle = makeTargetEntry((Expr *) newvar, 1, NULL, false); + newaggref = (Aggref *) copyObject(aggref); + newaggref->args = list_make1(newtle); + + return (Node *) newaggref; + } + else + elog(ERROR, "Aggref not found in subplan target list"); + } + if (IsA(node, PlaceHolderVar)) + { + PlaceHolderVar *phv = (PlaceHolderVar *) node; + + /* See if the PlaceHolderVar has bubbled up from a lower plan node */ + if (context->subplan_itlist->has_ph_vars) + { + newvar = search_indexed_tlist_for_non_var((Node *) phv, + context->subplan_itlist, + context->newvarno); + if (newvar) + return (Node *) newvar; + } + /* If not supplied by input plan, evaluate the contained expr */ + return fix_upper_expr_mutator((Node *) phv->phexpr, context); + } + if (IsA(node, Param)) + return fix_param_node(context->root, (Param *) node); + + fix_expr_common(context->root, node); + return expression_tree_mutator(node, + fix_combine_agg_expr_mutator, + (void *) context); +} + +/* * set_returning_clause_references * Perform setrefs.c's work on a RETURNING targetlist * diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c index 6ea3319..fb139af 100644 --- a/src/backend/optimizer/prep/prepunion.c +++ b/src/backend/optimizer/prep/prepunion.c @@ -859,7 +859,9 @@ make_union_unique(SetOperationStmt *op, Path *path, List *tlist, 
groupList, NIL, NULL, - dNumGroups); + dNumGroups, + false, + true); } else { diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index b692e18..f315961 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -52,6 +52,10 @@ #include "utils/syscache.h" #include "utils/typcache.h" +typedef struct +{ + PartialAggType allowedtype; +} partial_agg_context; typedef struct { @@ -93,6 +97,8 @@ typedef struct bool allow_restricted; } has_parallel_hazard_arg; +static bool aggregates_allow_partial_walker(Node *node, + partial_agg_context *context); static bool contain_agg_clause_walker(Node *node, void *context); static bool count_agg_clauses_walker(Node *node, count_agg_clauses_context *context); @@ -400,6 +406,81 @@ make_ands_implicit(Expr *clause) *****************************************************************************/ /* + * aggregates_allow_partial + * Recursively search for Aggref clauses and determine the maximum + * 'degree' of partial aggregation which can be supported. Partial + * aggregation requires that each aggregate does not have a DISTINCT or + * ORDER BY clause, and that it also has a combine function set. + */ +PartialAggType +aggregates_allow_partial(Node *clause) +{ + partial_agg_context context; + + /* initially any type is okay, until we find Aggrefs which say otherwise */ + context.allowedtype = PAT_ANY; + + if (!aggregates_allow_partial_walker(clause, &context)) + return context.allowedtype; + return context.allowedtype; +} + +static bool +aggregates_allow_partial_walker(Node *node, partial_agg_context *context) +{ + if (node == NULL) + return false; + if (IsA(node, Aggref)) + { + Aggref *aggref = (Aggref *) node; + HeapTuple aggTuple; + Form_pg_aggregate aggform; + + Assert(aggref->agglevelsup == 0); + + /* + * We can't perform partial aggregation with Aggrefs containing a + * DISTINCT or ORDER BY clause. 
+ */ + if (aggref->aggdistinct || aggref->aggorder) + { + context->allowedtype = PAT_DISABLED; + return true; /* abort search */ + } + aggTuple = SearchSysCache1(AGGFNOID, + ObjectIdGetDatum(aggref->aggfnoid)); + if (!HeapTupleIsValid(aggTuple)) + elog(ERROR, "cache lookup failed for aggregate %u", + aggref->aggfnoid); + aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); + + /* + * If there is no combine function, then partial aggregation is not + * possible. + */ + if (!OidIsValid(aggform->aggcombinefn)) + { + ReleaseSysCache(aggTuple); + context->allowedtype = PAT_DISABLED; + return true; /* abort search */ + } + + /* + * If we find any aggs with an internal transtype then we must ensure + * that pointers to aggregate states are not passed to other processes, + * therefore we set the maximum degree to PAT_INTERNAL_ONLY. + */ + if (aggform->aggtranstype == INTERNALOID) + context->allowedtype = PAT_INTERNAL_ONLY; + + ReleaseSysCache(aggTuple); + return false; /* continue searching */ + } + return expression_tree_walker(node, aggregates_allow_partial_walker, + (void *) context); +} + +/* * contain_agg_clause * Recursively search for Aggref/GroupingFunc nodes within a clause. 
* diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index b8ea316..bc86c04 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -1674,7 +1674,7 @@ create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, pathnode->single_copy = true; } - cost_gather(pathnode, root, rel, pathnode->path.param_info); + cost_gather(pathnode, root, rel, pathnode->path.param_info, NULL); return pathnode; } @@ -2387,6 +2387,8 @@ create_upper_unique_path(PlannerInfo *root, * 'qual' is the HAVING quals if any * 'aggcosts' contains cost info about the aggregate functions to be computed * 'numGroups' is the estimated number of groups (1 if not grouping) + * 'combineStates' is set to true if the Agg node should combine agg states + * 'finalizeAggs' is set to false if the Agg node should not call the finalfn */ AggPath * create_agg_path(PlannerInfo *root, @@ -2397,9 +2399,11 @@ create_agg_path(PlannerInfo *root, List *groupClause, List *qual, const AggClauseCosts *aggcosts, - double numGroups) + double numGroups, + bool combineStates, + bool finalizeAggs) { - AggPath *pathnode = makeNode(AggPath); + AggPath *pathnode = makeNode(AggPath); pathnode->path.pathtype = T_Agg; pathnode->path.parent = rel; @@ -2420,6 +2424,8 @@ create_agg_path(PlannerInfo *root, pathnode->numGroups = numGroups; pathnode->groupClause = groupClause; pathnode->qual = qual; + pathnode->finalizeAggs = finalizeAggs; + pathnode->combineStates = combineStates; cost_agg(&pathnode->path, root, aggstrategy, aggcosts, @@ -2431,6 +2437,119 @@ create_agg_path(PlannerInfo *root, pathnode->path.startup_cost += target->cost.startup; pathnode->path.total_cost += target->cost.startup + target->cost.per_tuple * pathnode->path.rows; + return pathnode; +} + +/* + * create_parallelagg_path + * Creates a chain of path nodes which represents the required executor + * nodes to perform aggregation in parallel.
This series of paths consists + * of a partial aggregation phase which is intended to be executed on + * multiple worker processes. This aggregation phase does not execute the + * aggregate's final function, it instead returns the aggregate state. A + * Gather path is then added to bring these aggregated states back into the + * master process, where the final aggregate node combines these + * intermediate states with other states which belong to the same group, + * it's in this phase that the aggregate's final function is called, if + * present, and also where any HAVING clause is applied. + * + * 'rel' is the parent relation associated with the result + * 'subpath' is the path representing the source of data + * 'partialtarget' is the PathTarget for the partial agg phase + * 'finaltarget' is the final PathTarget to be computed + * 'partialstrategy' is the Agg node's implementation strategy for 1st stage + * 'finalstrategy' is the Agg node's implementation strategy for 2nd stage + * 'groupClause' is a list of SortGroupClause's representing the grouping + * 'qual' is the HAVING quals if any + * 'aggcosts' contains cost info about the aggregate functions to be computed + * 'numGroups' is the estimated number of groups (1 if not grouping) + */ +AggPath * +create_parallelagg_path(PlannerInfo *root, + RelOptInfo *rel, + Path *subpath, + PathTarget *partialtarget, + PathTarget *finaltarget, + AggStrategy partialstrategy, + AggStrategy finalstrategy, + List *groupClause, + List *qual, + const AggClauseCosts *aggcosts, + double numGroups) +{ + GatherPath *gatherpath = makeNode(GatherPath); + AggPath *pathnode; + Path *currentpath; + double numPartialGroups; + + /* Add the partial aggregate node */ + pathnode = create_agg_path(root, + rel, + subpath, + partialtarget, + partialstrategy, + groupClause, + NIL, /* don't apply qual until final phase */ + aggcosts, + numGroups, + false, + false); + + gatherpath->path.pathtype = T_Gather; + gatherpath->path.parent = rel; +
gatherpath->path.pathtarget = partialtarget; + gatherpath->path.param_info = NULL; + gatherpath->path.parallel_aware = false; + gatherpath->path.parallel_safe = false; + gatherpath->path.parallel_degree = subpath->parallel_degree; + gatherpath->path.pathkeys = NIL; /* output is unordered */ + gatherpath->subpath = (Path *) pathnode; + gatherpath->single_copy = false; + + /* + * Estimate the total number of groups which the Gather node will receive + * from the aggregate worker processes. We'll assume that each worker will + * produce every possible group, this might be an overestimate, although it + * seems safer to overestimate here rather than underestimate. To keep + * this number sane we cap the number of groups so it's never larger than + * the number of rows in the input path. This prevents the number of groups + * being estimated to be higher than the actual number of input rows. + */ + numPartialGroups = Min(numGroups, subpath->rows) * + subpath->parallel_degree; + + cost_gather(gatherpath, root, NULL, NULL, &numPartialGroups); + + currentpath = &gatherpath->path; + + /* + * Gather is always unsorted, so if we're using the AGG_SORTED strategy + * we must re-sort on the grouping keys before the final aggregate + */ + if (finalstrategy == AGG_SORTED) + { + SortPath *sortpath; + + sortpath = create_sort_path(root, + rel, + &gatherpath->path, + root->group_pathkeys, + -1.0); + currentpath = &sortpath->path; + } + + /* create the finalize aggregate node */ + pathnode = create_agg_path(root, + rel, + currentpath, + finaltarget, + finalstrategy, + groupClause, + qual, + aggcosts, + numGroups, + true, + true); return pathnode; } diff --git a/src/backend/optimizer/util/tlist.c b/src/backend/optimizer/util/tlist.c index b297d87..19f7c3d 100644 --- a/src/backend/optimizer/util/tlist.c +++ b/src/backend/optimizer/util/tlist.c @@ -14,9 +14,12 @@ */ #include "postgres.h" +#include "access/htup_details.h" +#include "catalog/pg_aggregate.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" #include
"optimizer/tlist.h" +#include "utils/syscache.h" /***************************************************************************** @@ -748,3 +751,41 @@ apply_pathtarget_labeling_to_tlist(List *tlist, PathTarget *target) i++; } } + +/* + * apply_partialaggref_nodes + * Convert PathTarget to be suitable for a partial aggregate node. We simply + * wrap any Aggref nodes found in the target in PartialAggref and lookup the + * transition state type of the aggregate. This allows exprType() to return + * the transition type rather than the agg type. + */ +void +apply_partialaggref_nodes(PathTarget *target) +{ + ListCell *lc; + + foreach(lc, target->exprs) + { + Aggref *aggref = (Aggref *) lfirst(lc); + + if (IsA(aggref, Aggref)) + { + PartialAggref *partialaggref = makeNode(PartialAggref); + HeapTuple aggTuple; + Form_pg_aggregate aggform; + + aggTuple = SearchSysCache1(AGGFNOID, + ObjectIdGetDatum(aggref->aggfnoid)); + if (!HeapTupleIsValid(aggTuple)) + elog(ERROR, "cache lookup failed for aggregate %u", + aggref->aggfnoid); + aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); + + partialaggref->aggtranstype = aggform->aggtranstype; + ReleaseSysCache(aggTuple); + + partialaggref->aggref = aggref; + lfirst(lc) = partialaggref; + } + } +} \ No newline at end of file diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index 490a090..c87448b 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -6740,6 +6740,7 @@ isSimpleNode(Node *node, Node *parentNode, int prettyFlags) case T_XmlExpr: case T_NullIfExpr: case T_Aggref: + case T_PartialAggref: case T_WindowFunc: case T_FuncExpr: /* function-like: name(..) or name[..] 
/* ---- src/include/nodes/primnodes.h ---- */

/*
 * PartialAggref
 *
 * When partial aggregation is required in a plan, the nodes from the partial
 * aggregate node up to the finalize aggregate node must pass the partially
 * aggregated states up the plan tree.  For target list construction in
 * setrefs.c, this requires that exprType() return the state's type rather
 * than the final aggregate value's type.  Since exprType() for Aggref is
 * coded to return the aggtype, a bare Aggref is not correct for us.  We can't
 * fix this by modifying the Aggref to change its return type, because
 * setrefs.c searches for that Aggref using equal(), which compares all fields
 * in Aggref, and changing the aggtype would cause such a comparison to fail.
 * To get around this problem we wrap the Aggref in a PartialAggref.  This
 * allows exprType() to return the correct type, and setrefs.c can handle a
 * PartialAggref by just peeking inside it to check the underlying Aggref.
 * The PartialAggref lives until executor start-up, where it is removed and
 * replaced with its underlying Aggref.
 */
typedef struct PartialAggref
{
	Expr		xpr;
	Oid			aggtranstype;	/* transition state type for aggregate */
	Aggref	   *aggref;			/* the Aggref which belongs to this PartialAggref */
} PartialAggref;

/* ---- src/include/optimizer/clauses.h ---- */

/*
 * PartialAggType
 *	PartialAggType stores whether partial aggregation is allowed and in
 * which context it is allowed.  We require three states here as there are
 * two different contexts in which partial aggregation is safe.  For
 * aggregates which have an 'stype' of INTERNAL, it is okay to pass a pointer
 * to the aggregate state within a single backend process, as the memory the
 * pointer refers to belongs to that same process.  In cases where the
 * aggregate state must be passed between different processes, for example
 * during parallel aggregation, passing the pointer is not okay because the
 * memory being referenced won't be accessible from another process.
 */
typedef enum
{
	PAT_ANY = 0,		/* Any type of partial aggregation is okay. */
	PAT_INTERNAL_ONLY,	/* Some aggregates support only internal mode. */
	PAT_DISABLED		/* Some aggregates don't support partial mode at all */
} PartialAggType;
extern GroupingSetsPath *create_groupingsets_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, diff --git a/src/include/optimizer/tlist.h b/src/include/optimizer/tlist.h index 0d745a0..ef8cb30 100644 --- a/src/include/optimizer/tlist.h +++ b/src/include/optimizer/tlist.h @@ -61,6 +61,7 @@ extern void add_column_to_pathtarget(PathTarget *target, extern void add_new_column_to_pathtarget(PathTarget *target, Expr *expr); extern void add_new_columns_to_pathtarget(PathTarget *target, List *exprs); extern void apply_pathtarget_labeling_to_tlist(List *tlist, PathTarget *target); +extern void apply_partialaggref_nodes(PathTarget *target); /* Convenience macro to get a PathTarget with valid cost/width fields */ #define create_pathtarget(root, tlist) \ -- 1.9.5.msysgit.1