From 7d32dfb865a419b9fee548826aff0d7e9de303b2 Mon Sep 17 00:00:00 2001 From: David Rowley Date: Wed, 16 Mar 2016 23:02:32 +1300 Subject: [PATCH 1/5] Allow aggregation to happen in parallel This modifies the grouping planner to allow it to generate Paths for parallel aggregation, when possible. --- src/backend/executor/execQual.c | 19 ++- src/backend/nodes/copyfuncs.c | 2 + src/backend/nodes/equalfuncs.c | 2 + src/backend/nodes/nodeFuncs.c | 8 +- src/backend/nodes/outfuncs.c | 2 + src/backend/nodes/readfuncs.c | 2 + src/backend/optimizer/path/costsize.c | 10 +- src/backend/optimizer/plan/createplan.c | 4 +- src/backend/optimizer/plan/planner.c | 275 +++++++++++++++++++++++++++++++- src/backend/optimizer/plan/setrefs.c | 245 +++++++++++++++++++++++++++- src/backend/optimizer/prep/prepunion.c | 4 +- src/backend/optimizer/util/clauses.c | 81 ++++++++++ src/backend/optimizer/util/pathnode.c | 125 ++++++++++++++- src/backend/optimizer/util/tlist.c | 46 ++++++ src/include/nodes/primnodes.h | 19 +++ src/include/nodes/relation.h | 2 + src/include/optimizer/clauses.h | 20 +++ src/include/optimizer/cost.h | 2 +- src/include/optimizer/pathnode.h | 15 +- src/include/optimizer/tlist.h | 1 + 20 files changed, 859 insertions(+), 25 deletions(-) diff --git a/src/backend/executor/execQual.c b/src/backend/executor/execQual.c index 778b6c1..4029721 100644 --- a/src/backend/executor/execQual.c +++ b/src/backend/executor/execQual.c @@ -4510,20 +4510,25 @@ ExecInitExpr(Expr *node, PlanState *parent) case T_Aggref: { AggrefExprState *astate = makeNode(AggrefExprState); + AggState *aggstate = (AggState *) parent; + Aggref *aggref = (Aggref *) node; astate->xprstate.evalfunc = (ExprStateEvalFunc) ExecEvalAggref; - if (parent && IsA(parent, AggState)) + if (!aggstate || !IsA(aggstate, AggState)) { - AggState *aggstate = (AggState *) parent; - - aggstate->aggs = lcons(astate, aggstate->aggs); - aggstate->numaggs++; + /* planner messed up */ + elog(ERROR, "Aggref found in non-Agg plan node"); } - else + if (aggref->aggpartial == aggstate->finalizeAggs) { /* planner messed up */ - elog(ERROR, "Aggref found in non-Agg plan node"); + if (aggref->aggpartial) + elog(ERROR, "Partial type Aggref found in FinalizeAgg plan node"); + else + elog(ERROR, "Non-Partial type Aggref found in Non-FinalizeAgg plan node"); } + aggstate->aggs = lcons(astate, aggstate->aggs); + aggstate->numaggs++; state = (ExprState *) astate; } break; diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index df7c2fa..d502aef 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -1231,6 +1231,7 @@ _copyAggref(const Aggref *from) COPY_SCALAR_FIELD(aggfnoid); COPY_SCALAR_FIELD(aggtype); + COPY_SCALAR_FIELD(aggpartialtype); COPY_SCALAR_FIELD(aggcollid); COPY_SCALAR_FIELD(inputcollid); COPY_NODE_FIELD(aggdirectargs); @@ -1240,6 +1241,7 @@ _copyAggref(const Aggref *from) COPY_NODE_FIELD(aggfilter); COPY_SCALAR_FIELD(aggstar); COPY_SCALAR_FIELD(aggvariadic); + COPY_SCALAR_FIELD(aggpartial); COPY_SCALAR_FIELD(aggkind); COPY_SCALAR_FIELD(agglevelsup); COPY_LOCATION_FIELD(location); diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index b9c3959..bf29227 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -192,6 +192,7 @@ _equalAggref(const Aggref *a, const Aggref *b) { COMPARE_SCALAR_FIELD(aggfnoid); COMPARE_SCALAR_FIELD(aggtype); + COMPARE_SCALAR_FIELD(aggpartialtype); COMPARE_SCALAR_FIELD(aggcollid); COMPARE_SCALAR_FIELD(inputcollid); COMPARE_NODE_FIELD(aggdirectargs); @@ -201,6 +202,7 @@ _equalAggref(const Aggref *a, const Aggref *b) COMPARE_NODE_FIELD(aggfilter); COMPARE_SCALAR_FIELD(aggstar); COMPARE_SCALAR_FIELD(aggvariadic); + COMPARE_SCALAR_FIELD(aggpartial); COMPARE_SCALAR_FIELD(aggkind); COMPARE_SCALAR_FIELD(agglevelsup); COMPARE_LOCATION_FIELD(location); diff --git a/src/backend/nodes/nodeFuncs.c b/src/backend/nodes/nodeFuncs.c index b4ea440..23a8ec8 100644 --- a/src/backend/nodes/nodeFuncs.c +++ b/src/backend/nodes/nodeFuncs.c @@ -57,7 +57,13 @@ exprType(const Node *expr) type = ((const Param *) expr)->paramtype; break; case T_Aggref: - type = ((const Aggref *) expr)->aggtype; + { + const Aggref *aggref = (const Aggref *) expr; + if (aggref->aggpartial) + type = aggref->aggpartialtype; + else + type = aggref->aggtype; + } break; case T_GroupingFunc: type = INT4OID; diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 548a3b9..6e2a6e4 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -1031,6 +1031,7 @@ _outAggref(StringInfo str, const Aggref *node) WRITE_OID_FIELD(aggfnoid); WRITE_OID_FIELD(aggtype); + WRITE_OID_FIELD(aggpartialtype); WRITE_OID_FIELD(aggcollid); WRITE_OID_FIELD(inputcollid); WRITE_NODE_FIELD(aggdirectargs); @@ -1040,6 +1041,7 @@ _outAggref(StringInfo str, const Aggref *node) WRITE_NODE_FIELD(aggfilter); WRITE_BOOL_FIELD(aggstar); WRITE_BOOL_FIELD(aggvariadic); + WRITE_BOOL_FIELD(aggpartial); WRITE_CHAR_FIELD(aggkind); WRITE_UINT_FIELD(agglevelsup); WRITE_LOCATION_FIELD(location); diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index a2c2243..61be6c5 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -552,6 +552,7 @@ _readAggref(void) READ_OID_FIELD(aggfnoid); READ_OID_FIELD(aggtype); + READ_OID_FIELD(aggpartialtype); READ_OID_FIELD(aggcollid); READ_OID_FIELD(inputcollid); READ_NODE_FIELD(aggdirectargs); @@ -561,6 +562,7 @@ _readAggref(void) READ_NODE_FIELD(aggfilter); READ_BOOL_FIELD(aggstar); READ_BOOL_FIELD(aggvariadic); + READ_BOOL_FIELD(aggpartial); READ_CHAR_FIELD(aggkind); READ_UINT_FIELD(agglevelsup); READ_LOCATION_FIELD(location); diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 943fcde..58bfad8 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -350,16 +350,22 @@ cost_samplescan(Path *path, PlannerInfo *root, * * 'rel' is the relation to be operated upon * 'param_info' is the ParamPathInfo if this is a parameterized path, else NULL + * 'rows' may be used to point to a row estimate, this may be used when a rel + * is unavailable to retrieve row estimates from. This setting, if non-NULL + * overrides both 'rel' and 'param_info'. */ void cost_gather(GatherPath *path, PlannerInfo *root, - RelOptInfo *rel, ParamPathInfo *param_info) + RelOptInfo *rel, ParamPathInfo *param_info, + double *rows) { Cost startup_cost = 0; Cost run_cost = 0; /* Mark the path with the correct row estimate */ - if (param_info) + if (rows) + path->path.rows = *rows; + else if (param_info) path->path.rows = param_info->ppi_rows; else path->path.rows = rel->rows; diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index e37bdfd..6953a60 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -1572,8 +1572,8 @@ create_agg_plan(PlannerInfo *root, AggPath *best_path) plan = make_agg(tlist, quals, best_path->aggstrategy, - false, - true, + best_path->combineStates, + best_path->finalizeAggs, list_length(best_path->groupClause), extract_grouping_cols(best_path->groupClause, subplan->targetlist), diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index fc0a2d8..3a80a76 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -134,6 +134,8 @@ static RelOptInfo *create_ordered_paths(PlannerInfo *root, double limit_tuples); static PathTarget *make_group_input_target(PlannerInfo *root, PathTarget *final_target); +static PathTarget *make_partialgroup_input_target(PlannerInfo *root, + PathTarget *final_target); static List *postprocess_setop_tlist(List *new_tlist, List *orig_tlist); static List *select_active_windows(PlannerInfo *root, WindowFuncLists *wflists); static PathTarget *make_window_input_target(PlannerInfo *root, @@ -1767,6 +1769,19 @@ grouping_planner(PlannerInfo *root, bool inheritance_update, (*create_upper_paths_hook) (root, current_rel); /* + * Likewise for any partial paths, although this case is more simple as + * we don't track the cheapest path. + */ + foreach(lc, current_rel->partial_pathlist) + { + Path *subpath = (Path *) lfirst(lc); + + Assert(subpath->param_info == NULL); + lfirst(lc) = apply_projection_to_path(root, current_rel, + subpath, scanjoin_target); + } + + /* * If we have grouping and/or aggregation, consider ways to implement * that. We build a new upperrel representing the output of this * phase. @@ -3162,10 +3177,15 @@ create_grouping_paths(PlannerInfo *root, { Query *parse = root->parse; Path *cheapest_path = input_rel->cheapest_total_path; + PathTarget *partial_group_target = NULL; /* for parallel aggregate */ RelOptInfo *grouped_rel; AggClauseCosts agg_costs; double dNumGroups; bool allow_hash; + bool can_hash; + bool can_sort; + bool can_parallel; + ListCell *lc; /* For now, do all work in the (GROUP_AGG, NULL) upperrel */ @@ -3259,12 +3279,44 @@ create_grouping_paths(PlannerInfo *root, rollup_groupclauses); /* + * Determine if it's possible to perform aggregation in parallel using + * multiple worker processes. We can permit this when there's at least one + * partial_path in input_rel, but not if the query has grouping sets, + * (although this likely just requires a bit more thought). We must also + * ensure that any aggregate functions which are present in either the + * target list, or in the HAVING clause all support parallel mode. + */ + can_parallel = false; + + if ((parse->hasAggs || parse->groupClause != NIL) && + input_rel->partial_pathlist != NIL && + parse->groupingSets == NIL && + root->glob->parallelModeOK) + { + /* + * Check that all aggregate functions support partial mode, + * however if there are no aggregate functions then we can skip + * this check. + */ + if (!parse->hasAggs || + (aggregates_allow_partial((Node *) target->exprs) == PAT_ANY && + aggregates_allow_partial(root->parse->havingQual) == PAT_ANY)) + { + can_parallel = true; + partial_group_target = make_partialgroup_input_target(root, + target); + } + } + + /* * Consider sort-based implementations of grouping, if possible. (Note * that if groupClause is empty, grouping_is_sortable() is trivially true, * and all the pathkeys_contained_in() tests will succeed too, so that * we'll consider every surviving input path.) */ - if (grouping_is_sortable(parse->groupClause)) + can_sort = grouping_is_sortable(parse->groupClause); + + if (can_sort) { /* * Use any available suitably-sorted path as input, and also consider @@ -3320,7 +3372,9 @@ create_grouping_paths(PlannerInfo *root, parse->groupClause, (List *) parse->havingQual, &agg_costs, - dNumGroups)); + dNumGroups, + false, + true)); } else if (parse->groupClause) { @@ -3344,6 +3398,42 @@ create_grouping_paths(PlannerInfo *root, } } } + + if (can_parallel) + { + AggStrategy aggstrategy; + + if (parse->groupClause != NIL) + aggstrategy = AGG_SORTED; + else + aggstrategy = AGG_PLAIN; + + foreach(lc, input_rel->partial_pathlist) + { + Path *path = (Path *) lfirst(lc); + bool is_sorted; + + is_sorted = pathkeys_contained_in(root->group_pathkeys, + path->pathkeys); + if (!is_sorted) + path = (Path *) create_sort_path(root, + grouped_rel, + path, + root->group_pathkeys, + -1.0); + add_path(grouped_rel, (Path *) + create_parallelagg_path(root, grouped_rel, + path, + partial_group_target, + target, + aggstrategy, + aggstrategy, + parse->groupClause, + (List *) parse->havingQual, + &agg_costs, + dNumGroups)); + } + } } /* @@ -3392,7 +3482,9 @@ create_grouping_paths(PlannerInfo *root, } } - if (allow_hash && grouping_is_hashable(parse->groupClause)) + can_hash = allow_hash && grouping_is_hashable(parse->groupClause); + + if (can_hash) { /* * We just need an Agg over the cheapest-total input path, since input @@ -3406,7 +3498,90 @@ create_grouping_paths(PlannerInfo *root, parse->groupClause, (List *) parse->havingQual, &agg_costs, - dNumGroups)); + dNumGroups, + false, + true)); + + if (can_parallel) + { + Path *cheapest_partial_path; + + cheapest_partial_path = (Path *) linitial(input_rel->partial_pathlist); + + add_path(grouped_rel, (Path *) + create_parallelagg_path(root, grouped_rel, + cheapest_partial_path, + partial_group_target, + target, + AGG_HASHED, + AGG_HASHED, + parse->groupClause, + (List *) parse->havingQual, + &agg_costs, + dNumGroups)); + } + } + + /* + * For parallel aggregation, since this happens in 2 phases, we'll also try + * a mixing the aggregate strategies to see if that'll bring the cost down + * any. + */ + if (can_parallel && can_hash && can_sort) + { + Path *cheapest_partial_path; + + cheapest_partial_path = (Path *) linitial(input_rel->partial_pathlist); + + Assert(parse->groupClause != NIL); + + /* + * Try hashing in the partial phase, and sorting in the final. We need + * only bother trying this on the cheapest partial path since hashing + * does not care about the order of the input path. + */ + add_path(grouped_rel, (Path *) + create_parallelagg_path(root, grouped_rel, + cheapest_partial_path, + partial_group_target, + target, + AGG_HASHED, + AGG_SORTED, + parse->groupClause, + (List *) parse->havingQual, + &agg_costs, + dNumGroups)); + + /* + * Try sorting in the partial phase, and hashing in the final. We do + * this for all partial paths as some may have useful ordering + */ + foreach(lc, input_rel->partial_pathlist) + { + Path *path = (Path *) lfirst(lc); + bool is_sorted; + + is_sorted = pathkeys_contained_in(root->group_pathkeys, + path->pathkeys); + if (!is_sorted) + path = (Path *) create_sort_path(root, + grouped_rel, + path, + root->group_pathkeys, + -1.0); + + add_path(grouped_rel, (Path *) + create_parallelagg_path(root, grouped_rel, + path, + partial_group_target, + target, + AGG_SORTED, + AGG_HASHED, + parse->groupClause, + (List *) parse->havingQual, + &agg_costs, + dNumGroups)); + } } /* Give a helpful error if we failed to find any implementation */ @@ -3735,7 +3910,9 @@ create_distinct_paths(PlannerInfo *root, parse->distinctClause, NIL, NULL, - numDistinctRows)); + numDistinctRows, + false, + true)); } /* Give a helpful error if we failed to find any implementation */ @@ -3915,6 +4092,94 @@ make_group_input_target(PlannerInfo *root, PathTarget *final_target) } /* + * make_partialgroup_input_target + * Generate appropriate PathTarget for input to partial grouping nodes. + * + * This is very similar to make_group_input_target(), only we do not recurse + * into Aggrefs. Aggrefs are left intact and added to the target list. Here we + * also add any Aggrefs which are located in the HAVING clause into the + * PathTarget. + * + * Aggrefs are also setup into partial mode and the partial return types are + * set to become the type of the aggregate transition state rather than the + * aggregate function's return type. + */ +static PathTarget * +make_partialgroup_input_target(PlannerInfo *root, PathTarget *final_target) +{ + Query *parse = root->parse; + PathTarget *input_target; + List *non_group_cols; + List *non_group_exprs; + int i; + ListCell *lc; + + input_target = create_empty_pathtarget(); + non_group_cols = NIL; + + i = -1; + foreach(lc, final_target->exprs) + { + Expr *expr = (Expr *) lfirst(lc); + + i++; + + if (parse->groupClause) + { + Index sgref = final_target->sortgrouprefs[i]; + + if (sgref && get_sortgroupref_clause_noerr(sgref, parse->groupClause) + != NULL) + { + /* + * It's a grouping column, so add it to the input target as-is. + */ + add_column_to_pathtarget(input_target, expr, sgref); + continue; + } + } + + /* + * Non-grouping column, so just remember the expression for later + * call to pull_var_clause. + */ + non_group_cols = lappend(non_group_cols, expr); + } + + /* + * If there's a HAVING clause, we'll need the Aggrefs it uses, too. + */ + if (parse->havingQual) + non_group_cols = lappend(non_group_cols, parse->havingQual); + + /* + * Pull out all the Vars mentioned in non-group cols (plus HAVING), and + * add them to the input target if not already present. (A Var used + * directly as a GROUP BY item will be present already.) Note this + * includes Vars used in resjunk items, so we are covering the needs of + * ORDER BY and window specifications. Vars used within Aggrefs will be + * ignored and the Aggrefs themselves will be added to the PathTarget. + */ + non_group_exprs = pull_var_clause((Node *) non_group_cols, + PVC_INCLUDE_AGGREGATES | + PVC_RECURSE_WINDOWFUNCS | + PVC_INCLUDE_PLACEHOLDERS); + + add_new_columns_to_pathtarget(input_target, non_group_exprs); + + /* clean up cruft */ + list_free(non_group_exprs); + list_free(non_group_cols); + + /* Adjust Aggrefs to put them in partial mode. */ + apply_partialaggref_adjustment(input_target); + + /* XXX this causes some redundant cost calculation ... */ + input_target = set_pathtarget_cost_width(root, input_target); + return input_target; +} + +/* * postprocess_setop_tlist * Fix up targetlist returned by plan_set_operations(). * diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index aa2c308..7a5cb91 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -104,6 +104,8 @@ static Node *fix_scan_expr_mutator(Node *node, fix_scan_expr_context *context); static bool fix_scan_expr_walker(Node *node, fix_scan_expr_context *context); static void set_join_references(PlannerInfo *root, Join *join, int rtoffset); static void set_upper_references(PlannerInfo *root, Plan *plan, int rtoffset); +static void set_combineagg_references(PlannerInfo *root, Plan *plan, + int rtoffset); static void set_dummy_tlist_references(Plan *plan, int rtoffset); static indexed_tlist *build_tlist_index(List *tlist); static Var *search_indexed_tlist_for_var(Var *var, @@ -117,6 +119,8 @@ static Var *search_indexed_tlist_for_sortgroupref(Node *node, Index sortgroupref, indexed_tlist *itlist, Index newvarno); +static Var *search_indexed_tlist_for_partial_aggref(Aggref *aggref, + indexed_tlist *itlist, Index newvarno); static List *fix_join_expr(PlannerInfo *root, List *clauses, indexed_tlist *outer_itlist, @@ -131,6 +135,13 @@ static Node *fix_upper_expr(PlannerInfo *root, int rtoffset); static Node *fix_upper_expr_mutator(Node *node, fix_upper_expr_context *context); +static Node *fix_combine_agg_expr(PlannerInfo *root, + Node *node, + indexed_tlist *subplan_itlist, + Index newvarno, + int rtoffset); +static Node *fix_combine_agg_expr_mutator(Node *node, + fix_upper_expr_context *context); static List *set_returning_clause_references(PlannerInfo *root, List *rlist, Plan *topplan, @@ -667,8 +678,16 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) } break; case T_Agg: - set_upper_references(root, plan, rtoffset); - break; + { + Agg *aggplan = (Agg *) plan; + + if (aggplan->combineStates) + set_combineagg_references(root, plan, rtoffset); + else + set_upper_references(root, plan, rtoffset); + + break; + } case T_Group: set_upper_references(root, plan, rtoffset); break; @@ -1702,6 +1721,72 @@ set_upper_references(PlannerInfo *root, Plan *plan, int rtoffset) } /* + * set_combineagg_references + * This does a similar job as set_upper_references(), but additionally it + * transforms Aggref nodes args to suit the combine aggregate phase, this + * means that the Aggref->args are converted to reference the corresponding + * aggregate function in the subplan rather than simple Var(s), as would be + * the case for a non-combine aggregate node. + */ +static void +set_combineagg_references(PlannerInfo *root, Plan *plan, int rtoffset) +{ + Plan *subplan = plan->lefttree; + indexed_tlist *subplan_itlist; + List *output_targetlist; + ListCell *l; + + Assert(IsA(plan, Agg)); + Assert(((Agg *) plan)->combineStates); + + subplan_itlist = build_tlist_index(subplan->targetlist); + + output_targetlist = NIL; + + foreach(l, plan->targetlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(l); + Node *newexpr; + + /* If it's a non-Var sort/group item, first try to match by sortref */ + if (tle->ressortgroupref != 0 && !IsA(tle->expr, Var)) + { + newexpr = (Node *) + search_indexed_tlist_for_sortgroupref((Node *) tle->expr, + tle->ressortgroupref, + subplan_itlist, + OUTER_VAR); + if (!newexpr) + newexpr = fix_combine_agg_expr(root, + (Node *) tle->expr, + subplan_itlist, + OUTER_VAR, + rtoffset); + } + else + newexpr = fix_combine_agg_expr(root, + (Node *) tle->expr, + subplan_itlist, + OUTER_VAR, + rtoffset); + tle = flatCopyTargetEntry(tle); + tle->expr = (Expr *) newexpr; + output_targetlist = lappend(output_targetlist, tle); + } + + plan->targetlist = output_targetlist; + + plan->qual = (List *) + fix_combine_agg_expr(root, + (Node *) plan->qual, + subplan_itlist, + OUTER_VAR, + rtoffset); + + pfree(subplan_itlist); +} + +/* * set_dummy_tlist_references * Replace the targetlist of an upper-level plan node with a simple * list of OUTER_VAR references to its child. @@ -1968,6 +2053,71 @@ search_indexed_tlist_for_sortgroupref(Node *node, } /* + * Find the Var for the matching 'aggref' in 'itlist' + * + * Aggrefs for partial aggregates have their aggpartial setting adjusted to put + * them in partial mode. This means that a standard equal() comparison won't + * match when comparing an Aggref which is in partial mode with an Aggref which + * is not. Here we manually compare all of the fields apart from + * aggpartialtype, which is set only when putting the Aggref into partial mode, + * and aggpartial, which is the flag which determines if the Aggref is in + * partial mode or not. + */ +static Var * +search_indexed_tlist_for_partial_aggref(Aggref *aggref, indexed_tlist *itlist, + Index newvarno) +{ + ListCell *lc; + + foreach(lc, itlist->tlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + + if (IsA(tle->expr, Aggref)) + { + Aggref *tlistaggref = (Aggref *) tle->expr; + Var *newvar; + + if (aggref->aggfnoid != tlistaggref->aggfnoid) + continue; + if (aggref->aggtype != tlistaggref->aggtype) + continue; + /* ignore aggpartialtype */ + if (aggref->aggcollid != tlistaggref->aggcollid) + continue; + if (aggref->inputcollid != tlistaggref->inputcollid) + continue; + if (!equal(aggref->aggdirectargs, tlistaggref->aggdirectargs)) + continue; + if (!equal(aggref->args, tlistaggref->args)) + continue; + if (!equal(aggref->aggorder, tlistaggref->aggorder)) + continue; + if (!equal(aggref->aggdistinct, tlistaggref->aggdistinct)) + continue; + if (!equal(aggref->aggfilter, tlistaggref->aggfilter)) + continue; + if (aggref->aggstar != tlistaggref->aggstar) + continue; + if (aggref->aggvariadic != tlistaggref->aggvariadic) + continue; + /* ignore aggpartial */ + if (aggref->aggkind != tlistaggref->aggkind) + continue; + if (aggref->agglevelsup != tlistaggref->agglevelsup) + continue; + + newvar = makeVarFromTargetEntry(newvarno, tle); + newvar->varnoold = 0; /* wasn't ever a plain Var */ + newvar->varoattno = 0; + + return newvar; + } + } + return NULL; +} + +/* * fix_join_expr * Create a new set of targetlist entries or join qual clauses by * changing the varno/varattno values of variables in the clauses @@ -2238,6 +2388,97 @@ fix_upper_expr_mutator(Node *node, fix_upper_expr_context *context) } /* + * fix_combine_agg_expr + * Like fix_upper_expr() but additionally adjusts the Aggref->args of + * Aggrefs so that they references the corresponding Aggref in the subplan. + */ +static Node * +fix_combine_agg_expr(PlannerInfo *root, + Node *node, + indexed_tlist *subplan_itlist, + Index newvarno, + int rtoffset) +{ + fix_upper_expr_context context; + + context.root = root; + context.subplan_itlist = subplan_itlist; + context.newvarno = newvarno; + context.rtoffset = rtoffset; + return fix_combine_agg_expr_mutator(node, &context); +} + +static Node * +fix_combine_agg_expr_mutator(Node *node, fix_upper_expr_context *context) +{ + Var *newvar; + + if (node == NULL) + return NULL; + if (IsA(node, Var)) + { + Var *var = (Var *) node; + + newvar = search_indexed_tlist_for_var(var, + context->subplan_itlist, + context->newvarno, + context->rtoffset); + if (!newvar) + elog(ERROR, "variable not found in subplan target list"); + return (Node *) newvar; + } + if (IsA(node, Aggref)) + { + Aggref *aggref = (Aggref *) node; + + newvar = search_indexed_tlist_for_partial_aggref(aggref, + context->subplan_itlist, + context->newvarno); + if (newvar) + { + Aggref *newaggref; + TargetEntry *newtle; + + /* + * Now build a new TargetEntry for the Aggref's arguments which is + * a single Var which references the corresponding AggRef in the + * node below. + */ + newtle = makeTargetEntry((Expr *) newvar, 1, NULL, false); + newaggref = (Aggref *) copyObject(aggref); + newaggref->args = list_make1(newtle); + + return (Node *) newaggref; + } + else + elog(ERROR, "Aggref not found in subplan target list"); + } + if (IsA(node, PlaceHolderVar)) + { + PlaceHolderVar *phv = (PlaceHolderVar *) node; + + /* See if the PlaceHolderVar has bubbled up from a lower plan node */ + if (context->subplan_itlist->has_ph_vars) + { + newvar = search_indexed_tlist_for_non_var((Node *) phv, + context->subplan_itlist, + context->newvarno); + if (newvar) + return (Node *) newvar; + } + /* If not supplied by input plan, evaluate the contained expr */ + return fix_upper_expr_mutator((Node *) phv->phexpr, context); + } + if (IsA(node, Param)) + return fix_param_node(context->root, (Param *) node); + + fix_expr_common(context->root, node); + return expression_tree_mutator(node, + fix_combine_agg_expr_mutator, + (void *) context); +} + +/* * set_returning_clause_references * Perform setrefs.c's work on a RETURNING targetlist * diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c index 6ea3319..fb139af 100644 --- a/src/backend/optimizer/prep/prepunion.c +++ b/src/backend/optimizer/prep/prepunion.c @@ -859,7 +859,9 @@ make_union_unique(SetOperationStmt *op, Path *path, List *tlist, groupList, NIL, NULL, - dNumGroups); + dNumGroups, + false, + true); } else { diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index b692e18..f315961 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -52,6 +52,10 @@ #include "utils/syscache.h" #include "utils/typcache.h" +typedef struct +{ + PartialAggType allowedtype; +} partial_agg_context; typedef struct { @@ -93,6 +97,8 @@ typedef struct bool allow_restricted; } has_parallel_hazard_arg; +static bool aggregates_allow_partial_walker(Node *node, + partial_agg_context *context); static bool contain_agg_clause_walker(Node *node, void *context); static bool count_agg_clauses_walker(Node *node, count_agg_clauses_context *context); @@ -400,6 +406,81 @@ make_ands_implicit(Expr *clause) *****************************************************************************/ /* + * aggregates_allow_partial + * Recursively search for Aggref clauses and determine the maximum + * 'degree' of partial aggregation which can be supported. Partial + * aggregation requires that each aggregate does not have a DISTINCT or + * ORDER BY clause, and that it also has a combine function set. + */ +PartialAggType +aggregates_allow_partial(Node *clause) +{ + partial_agg_context context; + + /* initially any type is okay, until we find Aggrefs which say otherwise */ + context.allowedtype = PAT_ANY; + + if (!aggregates_allow_partial_walker(clause, &context)) + return context.allowedtype; + return context.allowedtype; +} + +static bool +aggregates_allow_partial_walker(Node *node, partial_agg_context *context) +{ + if (node == NULL) + return false; + if (IsA(node, Aggref)) + { + Aggref *aggref = (Aggref *) node; + HeapTuple aggTuple; + Form_pg_aggregate aggform; + + Assert(aggref->agglevelsup == 0); + + /* + * We can't perform partial aggregation with Aggrefs containing a + * DISTINCT or ORDER BY clause. + */ + if (aggref->aggdistinct || aggref->aggorder) + { + context->allowedtype = PAT_DISABLED; + return true; /* abort search */ + } + aggTuple = SearchSysCache1(AGGFNOID, + ObjectIdGetDatum(aggref->aggfnoid)); + if (!HeapTupleIsValid(aggTuple)) + elog(ERROR, "cache lookup failed for aggregate %u", + aggref->aggfnoid); + aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); + + /* + * If there is no combine function, then partial aggregation is not + * possible. + */ + if (!OidIsValid(aggform->aggcombinefn)) + { + ReleaseSysCache(aggTuple); + context->allowedtype = PAT_DISABLED; + return true; /* abort search */ + } + + /* + * If we find any aggs with an internal transtype then we must ensure + * that pointers to aggregate states are not passed to other processes, + * therefore we set the maximum degree to PAT_INTERNAL_ONLY. + */ + if (aggform->aggtranstype == INTERNALOID) + context->allowedtype = PAT_INTERNAL_ONLY; + + ReleaseSysCache(aggTuple); + return false; /* continue searching */ + } + return expression_tree_walker(node, aggregates_allow_partial_walker, + (void *) context); +} + +/* * contain_agg_clause * Recursively search for Aggref/GroupingFunc nodes within a clause. * diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index b8ea316..bc86c04 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -1674,7 +1674,7 @@ create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, pathnode->single_copy = true; } - cost_gather(pathnode, root, rel, pathnode->path.param_info); + cost_gather(pathnode, root, rel, pathnode->path.param_info, NULL); return pathnode; } @@ -2387,6 +2387,8 @@ create_upper_unique_path(PlannerInfo *root, * 'qual' is the HAVING quals if any * 'aggcosts' contains cost info about the aggregate functions to be computed * 'numGroups' is the estimated number of groups (1 if not grouping) + * 'combineStates' is set to true if the Agg node should combine agg states + * 'finalizeAggs' is set to false if the Agg node should not call the finalfn */ AggPath * create_agg_path(PlannerInfo *root, @@ -2397,9 +2399,11 @@ create_agg_path(PlannerInfo *root, List *groupClause, List *qual, const AggClauseCosts *aggcosts, - double numGroups) + double numGroups, + bool combineStates, + bool finalizeAggs) { - AggPath *pathnode = makeNode(AggPath); + AggPath *pathnode = makeNode(AggPath); pathnode->path.pathtype = T_Agg; pathnode->path.parent = rel; @@ -2420,6 +2424,8 @@ create_agg_path(PlannerInfo *root, pathnode->numGroups = numGroups; pathnode->groupClause = groupClause; pathnode->qual = qual; + pathnode->finalizeAggs = finalizeAggs; + pathnode->combineStates = combineStates; cost_agg(&pathnode->path, root, aggstrategy, aggcosts, @@ -2431,6 +2437,119 @@ create_agg_path(PlannerInfo *root, pathnode->path.startup_cost += target->cost.startup; pathnode->path.total_cost += target->cost.startup + target->cost.per_tuple * pathnode->path.rows; + return pathnode; +} + +/* + * create_parallelagg_path + * Creates a chain of path nodes which represents the required executor + * nodes to perform aggregation in parallel. This series of paths consists + * of a partial aggregation phase which is intended to be executed on + * multiple worker processes. This aggregation phase does not execute the + * aggregate's final function, it instead returns the aggregate state. A + * Gather path is then added to bring these aggregated states back into the + * master process, where the final aggregate node combines these + * intermediate states with other states which belong to the same group, + * it's in this phase that the aggregate's final function is called, if + * present, and also where any HAVING clause is applied. + * + * 'rel' is the parent relation associated with the result + * 'subpath' is the path representing the source of data + * 'partialtarget' is the PathTarget for the partial agg phase + * 'finaltarget' is the final PathTarget to be computed + * 'partialstrategy' is the Agg node's implementation strategy for 1st stage + * 'finalstrategy' is the Agg node's implementation strategy for 2nd stage + * 'groupClause' is a list of SortGroupClause's representing the grouping + * 'qual' is the HAVING quals if any + * 'aggcosts' contains cost info about the aggregate functions to be computed + * 'numGroups' is the estimated number of groups (1 if not grouping) + */ +AggPath * +create_parallelagg_path(PlannerInfo *root, + RelOptInfo *rel, + Path *subpath, + PathTarget *partialtarget, + PathTarget *finaltarget, + AggStrategy partialstrategy, + AggStrategy finalstrategy, + List *groupClause, + List *qual, + const AggClauseCosts *aggcosts, + double numGroups) +{ + GatherPath *gatherpath = makeNode(GatherPath); + AggPath *pathnode; + Path *currentpath; + double numPartialGroups; + + /* Add the partial aggregate node */ + pathnode = create_agg_path(root, + rel, + subpath, + partialtarget, + partialstrategy, + groupClause, + NIL, /* don't apply qual until final phase */ + aggcosts, + numGroups, + false, + false); + + gatherpath->path.pathtype = T_Gather; + gatherpath->path.parent = rel; + gatherpath->path.pathtarget = partialtarget; + gatherpath->path.param_info = NULL; + gatherpath->path.parallel_aware = false; + gatherpath->path.parallel_safe = false; + gatherpath->path.parallel_degree = subpath->parallel_degree; + gatherpath->path.pathkeys = NIL; /* output is unordered */ + gatherpath->subpath = (Path *) pathnode; + gatherpath->single_copy = false; + + /* + * Estimate the total number of groups which the Gather node will receive + * from the aggregate worker processes. We'll assume that each worker will + * produce every possible group, this might be an overestimate, although it + * seems safer to over estimate here rather than underestimate. To keep + * this number sane we cap the number of groups so it's never larger than + * the number of rows in the input path. This prevents the number of groups + * being estimated to be higher than the actual number of input rows. + */ + numPartialGroups = Min(numGroups, subpath->rows) * + subpath->parallel_degree; + + cost_gather(gatherpath, root, NULL, NULL, &numPartialGroups); + + currentpath = &gatherpath->path; + + /* + * Gather is always unsorted, so we need to sort again if we're using + * the AGG_SORTED strategy + */ + if (finalstrategy == AGG_SORTED) + { + SortPath *sortpath; + + sortpath = create_sort_path(root, + rel, + &gatherpath->path, + root->query_pathkeys, + -1.0); + currentpath = &sortpath->path; + } + + /* create the finalize aggregate node */ + pathnode = create_agg_path(root, + rel, + currentpath, + finaltarget, + finalstrategy, + groupClause, + qual, + aggcosts, + numGroups, + true, + true); return pathnode; } diff --git a/src/backend/optimizer/util/tlist.c b/src/backend/optimizer/util/tlist.c index b297d87..7509747 100644 --- a/src/backend/optimizer/util/tlist.c +++ b/src/backend/optimizer/util/tlist.c @@ -14,9 +14,12 @@ */ #include "postgres.h" +#include "access/htup_details.h" +#include "catalog/pg_aggregate.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" #include "optimizer/tlist.h" +#include "utils/syscache.h" /***************************************************************************** @@ -748,3 +751,46 @@ apply_pathtarget_labeling_to_tlist(List *tlist, PathTarget *target) i++; } } + +/* + * apply_partialaggref_adjustment + * Convert PathTarget to be suitable for a partial aggregate node. We simply + * adjust any Aggref nodes found in the target and set the aggpartial to + * TRUE. Here we also apply the aggpartialtype to the Aggref. This allows + * exprType() to return the partial type rather than the agg type. + * + * Note: We expect 'target' to be a flat target list and not have Aggrefs burried + * within other expressions. + */ +void +apply_partialaggref_adjustment(PathTarget *target) +{ + ListCell *lc; + + foreach(lc, target->exprs) + { + Aggref *aggref = (Aggref *) lfirst(lc); + + if (IsA(aggref, Aggref)) + { + HeapTuple aggTuple; + Form_pg_aggregate aggform; + Aggref *newaggref; + + aggTuple = SearchSysCache1(AGGFNOID, + ObjectIdGetDatum(aggref->aggfnoid)); + if (!HeapTupleIsValid(aggTuple)) + elog(ERROR, "cache lookup failed for aggregate %u", + aggref->aggfnoid); + aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); + + newaggref = (Aggref *) copyObject(aggref); + newaggref->aggpartialtype = aggform->aggtranstype; + newaggref->aggpartial = true; + + lfirst(lc) = newaggref; + + ReleaseSysCache(aggTuple); + } + } +} \ No newline at end of file diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h index f942378..947fca6 100644 --- a/src/include/nodes/primnodes.h +++ b/src/include/nodes/primnodes.h @@ -255,12 +255,30 @@ typedef struct Param * DISTINCT is not supported in this case, so aggdistinct will be NIL. * The direct arguments appear in aggdirectargs (as a list of plain * expressions, not TargetEntry nodes). + * + * An Aggref can operate in one of two modes. Normally an aggregate function's + * value is calculated with a single executor Agg node, however there are + * times, such as parallel aggregation when we want to calculate the aggregate + * value in multiple phases. This requires at least a Partial Aggregate phase, + * where normal aggregation takes place, but the aggregate's final function is + * not called, then later a Finalize Aggregate phase, where previously + * aggregated states are combined and the final function is called. No settings + * in Aggref determine this behaviour, the only thing that is required in + * Aggref to allow this behaviour is having the ability to determine the data + * type which this Aggref will produce. The 'aggpartial' field is used to + * determine to which of the two data types the Aggref will produce, either + * 'aggtype' or 'aggpartialtype', the latter of which is only set upon changing + * the Aggref into partial mode. + * + * Note: If you are adding fields here you may also need to add a comparison + * in search_indexed_tlist_for_partial_aggref() */ typedef struct Aggref { Expr xpr; Oid aggfnoid; /* pg_proc Oid of the aggregate */ Oid aggtype; /* type Oid of result of the aggregate */ + Oid aggpartialtype; /* return type if aggpartial is true */ Oid aggcollid; /* OID of collation of result */ Oid inputcollid; /* OID of collation that function should use */ List *aggdirectargs; /* direct arguments, if an ordered-set agg */ @@ -271,6 +289,7 @@ typedef struct Aggref bool aggstar; /* TRUE if argument list was really '*' */ bool aggvariadic; /* true if variadic arguments have been * combined into an array last argument */ + bool aggpartial; /* TRUE if Agg value should not be finalized */ char aggkind; /* aggregate kind (see pg_aggregate.h) */ Index agglevelsup; /* > 0 if agg belongs to outer query */ int location; /* token location, or -1 if unknown */ diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index 5032696..ee7007a 100644 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -1309,6 +1309,8 @@ typedef struct AggPath double numGroups; /* estimated number of groups in input */ List *groupClause; /* a list of SortGroupClause's */ List *qual; /* quals (HAVING quals), if any */ + bool combineStates; /* input is partially aggregated agg states */ + bool finalizeAggs; /* should the executor call the finalfn? */ } AggPath; /* diff --git a/src/include/optimizer/clauses.h b/src/include/optimizer/clauses.h index 3b3fd0f..c467f84 100644 --- a/src/include/optimizer/clauses.h +++ b/src/include/optimizer/clauses.h @@ -27,6 +27,25 @@ typedef struct List **windowFuncs; /* lists of WindowFuncs for each winref */ } WindowFuncLists; +/* + * PartialAggType + * PartialAggType stores whether partial aggregation is allowed and + * which context it is allowed in. We require three states here as there are + * two different contexts in which partial aggregation is safe. For aggregates + * which have an 'stype' of INTERNAL, within a single backend process it is + * okay to pass a pointer to the aggregate state, as the memory to which the + * pointer points to will belong to the same process. In cases where the + * aggregate state must be passed between different processes, for example + * during parallel aggregation, passing the pointer is not okay due to the + * fact that the memory being referenced won't be accessible from another + * process. + */ +typedef enum +{ + PAT_ANY = 0, /* Any type of partial aggregation is okay. */ + PAT_INTERNAL_ONLY, /* Some aggregates support only internal mode. */ + PAT_DISABLED /* Some aggregates don't support partial mode at all */ +} PartialAggType; extern Expr *make_opclause(Oid opno, Oid opresulttype, bool opretset, Expr *leftop, Expr *rightop, @@ -47,6 +66,7 @@ extern Node *make_and_qual(Node *qual1, Node *qual2); extern Expr *make_ands_explicit(List *andclauses); extern List *make_ands_implicit(Expr *clause); +extern PartialAggType aggregates_allow_partial(Node *clause); extern bool contain_agg_clause(Node *clause); extern void count_agg_clauses(PlannerInfo *root, Node *clause, AggClauseCosts *costs); diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index fea2bb7..d4adca6 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -150,7 +150,7 @@ extern void final_cost_hashjoin(PlannerInfo *root, HashPath *path, SpecialJoinInfo *sjinfo, SemiAntiJoinFactors *semifactors); extern void cost_gather(GatherPath *path, PlannerInfo *root, - RelOptInfo *baserel, ParamPathInfo *param_info); + RelOptInfo *baserel, ParamPathInfo *param_info, double *rows); extern void cost_subplan(PlannerInfo *root, SubPlan *subplan, Plan *plan); extern void cost_qual_eval(QualCost *cost, List *quals, PlannerInfo *root); extern void cost_qual_eval_node(QualCost *cost, Node *qual, PlannerInfo *root); diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index d1eb22f..7c21bff 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -168,7 +168,20 @@ extern AggPath *create_agg_path(PlannerInfo *root, List *groupClause, List *qual, const AggClauseCosts *aggcosts, - double numGroups); + double numGroups, + bool combineStates, + bool finalizeAggs); +extern AggPath *create_parallelagg_path(PlannerInfo *root, + RelOptInfo *rel, + Path *subpath, + PathTarget *partialtarget, + PathTarget *finaltarget, + AggStrategy partialstrategy, + AggStrategy finalstrategy, + List *groupClause, + List *qual, + const AggClauseCosts *aggcosts, + double numGroups); extern GroupingSetsPath *create_groupingsets_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, diff --git a/src/include/optimizer/tlist.h b/src/include/optimizer/tlist.h index 0d745a0..de58db1 100644 --- a/src/include/optimizer/tlist.h +++ b/src/include/optimizer/tlist.h @@ -61,6 +61,7 @@ extern void add_column_to_pathtarget(PathTarget *target, extern void add_new_column_to_pathtarget(PathTarget *target, Expr *expr); extern void add_new_columns_to_pathtarget(PathTarget *target, List *exprs); extern void apply_pathtarget_labeling_to_tlist(List *tlist, PathTarget *target); +extern void apply_partialaggref_adjustment(PathTarget *target); /* Convenience macro to get a PathTarget with valid cost/width fields */ #define create_pathtarget(root, tlist) \ -- 1.9.5.msysgit.1