diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index b9c3959..02b6484 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -191,7 +191,37 @@ static bool _equalAggref(const Aggref *a, const Aggref *b) { COMPARE_SCALAR_FIELD(aggfnoid); - COMPARE_SCALAR_FIELD(aggtype); + + /* + * XXX Temporary fix, until we find a better one. + * To avoid the failure in setting the upper references in upper plans of + * partial aggregate, with its modified targetlist aggregate references, + * As the aggtype of aggref is changed while forming the targetlist + * of partial aggregate for worker process. + */ + if (a->aggtype != b->aggtype) + { + /* + HeapTuple aggTuple; + Form_pg_aggregate aggform; + + aggTuple = SearchSysCache1(AGGFNOID, + ObjectIdGetDatum(a->aggfnoid)); + if (!HeapTupleIsValid(aggTuple)) + elog(ERROR, "cache lookup failed for aggregate %u", + a->aggfnoid); + aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); + + if (a->aggtype != aggform->aggtranstype) + { + ReleaseSysCache(aggTuple); + return false; + } + + ReleaseSysCache(aggTuple); + */ + } + COMPARE_SCALAR_FIELD(aggcollid); COMPARE_SCALAR_FIELD(inputcollid); COMPARE_NODE_FIELD(aggdirectargs); diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 5fc80e7..184e1e0 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -126,6 +126,7 @@ bool enable_material = true; bool enable_mergejoin = true; bool enable_hashjoin = true; +bool enable_parallelagg = false; typedef struct { PlannerInfo *root; diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 6e0db08..6d486b7 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -5221,3 +5221,37 @@ is_projection_capable_plan(Plan *plan) } return true; } + +/* + * create_gather_plan_from_subplan + * + * Create a Gather plan from subplan + */ 
+Gather * +create_gather_plan_from_subplan(PlannerInfo *root, Plan *subplan, + double path_rows, int parallel_degree) +{ + Gather *gather_plan; + Cost run_cost = 0; + + gather_plan = make_gather(subplan->targetlist, + NIL, + parallel_degree, + false, + subplan); + + /* gather path cost calculation */ + run_cost = subplan->total_cost - subplan->startup_cost; + + /* Parallel setup and communication cost. */ + gather_plan->plan.startup_cost = subplan->startup_cost + parallel_setup_cost; + run_cost += parallel_tuple_cost * path_rows; + + gather_plan->plan.total_cost = (gather_plan->plan.startup_cost + run_cost); + + /* use parallel mode for parallel plans. */ + root->glob->parallelModeNeeded = true; + + return gather_plan; +} + diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index f77c804..f75d12c 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -52,6 +52,8 @@ #include "utils/selfuncs.h" #include "utils/syscache.h" +#include "utils/syscache.h" +#include "catalog/pg_aggregate.h" /* GUC parameters */ double cursor_tuple_fraction = DEFAULT_CURSOR_TUPLE_FRACTION; @@ -81,6 +83,12 @@ typedef struct List *groupClause; /* overrides parse->groupClause */ } standard_qp_extra; +typedef struct +{ + AttrNumber resno; + List *targetlist; +} AddQualInTListExprContext; + /* Local functions */ static Node *preprocess_expression(PlannerInfo *root, Node *expr, int kind); static void preprocess_qual_conditions(PlannerInfo *root, Node *jtnode); @@ -101,6 +109,20 @@ static bool choose_hashed_grouping(PlannerInfo *root, double path_rows, int path_width, Path *cheapest_path, Path *sorted_path, double dNumGroups, AggClauseCosts *agg_costs); +static bool choose_parallel_hashed_grouping(PlannerInfo *root, + double tuple_fraction, double limit_tuples, + double path_rows, int path_width, + Path *cheapest_path, Path *sorted_path, + Path *cheapest_partial_path, + Path *sorted_partial_path, + double dNumGroups, 
AggClauseCosts *agg_costs); +static bool choose_parallel_grouping(PlannerInfo *root, + double tuple_fraction, double limit_tuples, + double path_rows, int path_width, + Path *cheapest_path, Path *sorted_path, + Path *cheapest_partial_path, + Path *sorted_partial_path, + double dNumGroups, AggClauseCosts *agg_costs); static bool choose_hashed_distinct(PlannerInfo *root, double tuple_fraction, double limit_tuples, double path_rows, int path_width, @@ -139,8 +161,36 @@ static Plan *build_grouping_chain(PlannerInfo *root, AttrNumber *groupColIdx, AggClauseCosts *agg_costs, long numGroups, + bool combineStates, + bool finalizeAggs, + Plan *result_plan); +static Plan *make_group_agg(PlannerInfo *root, + Query *parse, + List *tlist, + bool need_sort_for_grouping, + List *rollup_groupclauses, + List *rollup_lists, + AttrNumber *groupColIdx, + AggClauseCosts *agg_costs, + long numGroups, + int parallel_degree, Plan *result_plan); +static AttrNumber*get_grpColIdx_from_subPlan(PlannerInfo *root, List *tlist); +static List *make_partial_agg_tlist(List *tlist,List *groupClause); +static List* add_qual_in_tlist(List *targetlist, List *qual); +static bool add_qual_in_tlist_walker (Node *node, + AddQualInTListExprContext *context); +static Plan *make_hash_agg(PlannerInfo *root, + Query *parse, + List *tlist, + AggClauseCosts *aggcosts, + int numGroupCols, + AttrNumber *grpColIdx, + long numGroups, + int parallel_degree, + Plan *lefttree); + /***************************************************************************** * * Query optimizer entry point @@ -1948,6 +1998,64 @@ grouping_planner(PlannerInfo *root, double tuple_fraction) AttrNumber *groupColIdx = NULL; bool need_tlist_eval = true; bool need_sort_for_grouping = false; + int parallel_degree = 0; + + /* + * Prepare a gather path on the partial path, in case if it satisfies + * parallel aggregate plan. 
+ */ + if (enable_parallelagg + && !tested_hashed_distinct + && final_rel->partial_pathlist + && (dNumGroups < (path_rows / 4))) + { + /* + * check for parallel aggregate eligibility by referring all aggregate + * functions in both qualification and targetlist. + */ + if ((PAT_ANY == aggregates_allow_partial((Node *)tlist)) + && (PAT_ANY == aggregates_allow_partial(parse->havingQual))) + { + bool is_parallel_plan_cheap = false; + Path *cheapest_partial_path = NULL; + Path *sorted_partial_path = NULL; + + cheapest_partial_path = linitial(final_rel->partial_pathlist); + + /* + * XXX Currently set the sorted partial path as NULL + * Currently, there is no sorted partial path is generated. + */ + sorted_partial_path = NULL; + + if (use_hashed_grouping) + { + is_parallel_plan_cheap = choose_parallel_hashed_grouping(root, + tuple_fraction, limit_tuples, + path_rows, path_width, + cheapest_path, sorted_path, + cheapest_partial_path, + sorted_partial_path, + dNumGroups, &agg_costs); + } + else + { + is_parallel_plan_cheap = choose_parallel_grouping(root, + tuple_fraction, limit_tuples, + path_rows, path_width, + cheapest_path, sorted_path, + cheapest_partial_path, + sorted_partial_path, + dNumGroups, &agg_costs); + } + + if (is_parallel_plan_cheap) + { + parallel_degree = cheapest_partial_path->parallel_degree; + best_path = cheapest_partial_path; + } + } + } result_plan = create_plan(root, best_path); current_pathkeys = best_path->pathkeys; @@ -2046,20 +2154,16 @@ grouping_planner(PlannerInfo *root, double tuple_fraction) */ if (use_hashed_grouping) { - /* Hashed aggregate plan --- no sort needed */ - result_plan = (Plan *) make_agg(root, - tlist, - (List *) parse->havingQual, - AGG_HASHED, - &agg_costs, - numGroupCols, - groupColIdx, - extract_grouping_ops(parse->groupClause), - NIL, - numGroups, - false, - true, - result_plan); + result_plan = make_hash_agg(root, + parse, + tlist, + &agg_costs, + numGroupCols, + groupColIdx, + numGroups, + parallel_degree, + 
result_plan); + /* Hashed aggregation produces randomly-ordered results */ current_pathkeys = NIL; } @@ -2079,16 +2183,17 @@ grouping_planner(PlannerInfo *root, double tuple_fraction) else current_pathkeys = NIL; - result_plan = build_grouping_chain(root, - parse, - tlist, - need_sort_for_grouping, - rollup_groupclauses, - rollup_lists, - groupColIdx, - &agg_costs, - numGroups, - result_plan); + result_plan = make_group_agg(root, + parse, + tlist, + need_sort_for_grouping, + rollup_groupclauses, + rollup_lists, + groupColIdx, + &agg_costs, + numGroups, + parallel_degree, + result_plan); } else if (parse->groupClause) { @@ -2533,6 +2638,8 @@ build_grouping_chain(PlannerInfo *root, AttrNumber *groupColIdx, AggClauseCosts *agg_costs, long numGroups, + bool combineStates, + bool finalizeAggs, Plan *result_plan) { AttrNumber *top_grpColIdx = groupColIdx; @@ -2605,8 +2712,8 @@ build_grouping_chain(PlannerInfo *root, extract_grouping_ops(groupClause), gsets, numGroups, - false, - true, + combineStates, + finalizeAggs, sort_plan); /* @@ -2646,8 +2753,8 @@ build_grouping_chain(PlannerInfo *root, extract_grouping_ops(groupClause), gsets, numGroups, - false, - true, + combineStates, + finalizeAggs, result_plan); ((Agg *) result_plan)->chain = chain; @@ -3987,6 +4094,252 @@ choose_hashed_grouping(PlannerInfo *root, } /* + * choose_parallel_hashed_grouping - should we use parallel hashed grouping? + * + * Returns TRUE to select parallel hashing, FALSE to select hashing. 
+ */ +static bool +choose_parallel_hashed_grouping(PlannerInfo *root, + double tuple_fraction, double limit_tuples, + double path_rows, int path_width, + Path *cheapest_path, Path *sorted_path, + Path *cheapest_partial_path, + Path *sorted_partial_path, + double dNumGroups, AggClauseCosts *agg_costs) +{ + Query *parse = root->parse; + int numGroupCols = list_length(parse->groupClause); + List *target_pathkeys; + Path hashed_p; + Path parallel_hashed_p; + double worker_path_rows; + Cost run_cost = 0; + int parallel_degree; + + target_pathkeys = root->sort_pathkeys; + parallel_degree = cheapest_partial_path->parallel_degree; + + /* + * See if the estimated cost is no more than doing it the other way. While + * avoiding the need for sorted input is usually a win, the fact that the + * output won't be sorted may be a loss; so we need to do an actual cost + * comparison. + * + * We need to consider cheapest_path + hashagg [+ final sort] versus + * cheapest_partial_path + Partial hashagg + Gather + Finalize hasagg + * [+ final sort]. where brackets indicate a step that may not be needed. + * + * These path variables are dummies that just hold cost fields; we don't + * make actual Paths for these steps. 
+ */ + cost_agg(&hashed_p, root, AGG_HASHED, agg_costs, + numGroupCols, dNumGroups, + cheapest_path->startup_cost, cheapest_path->total_cost, + path_rows); + /* Result of hashed agg is always unsorted */ + if (target_pathkeys) + cost_sort(&hashed_p, root, target_pathkeys, hashed_p.total_cost, + dNumGroups, path_width, + 0.0, work_mem, limit_tuples); + + /* Parallel aggregate cost calculation */ + worker_path_rows = path_rows / parallel_degree; + + /* Partial aggregate cost calculation */ + cost_agg(&parallel_hashed_p, root, AGG_HASHED, agg_costs, + numGroupCols, dNumGroups, + cheapest_partial_path->startup_cost, cheapest_partial_path->total_cost, + worker_path_rows); + + /* gather path cost calculation */ + run_cost = parallel_hashed_p.total_cost - parallel_hashed_p.startup_cost; + + /* Parallel setup and communication cost. */ + parallel_hashed_p.startup_cost += parallel_setup_cost; + run_cost += parallel_tuple_cost * dNumGroups * parallel_degree; + + parallel_hashed_p.total_cost = (parallel_hashed_p.startup_cost + run_cost); + + + /* Final aggregate cost calculation */ + cost_agg(&parallel_hashed_p, root, AGG_HASHED, agg_costs, + numGroupCols, dNumGroups, + parallel_hashed_p.startup_cost, parallel_hashed_p.total_cost, + (dNumGroups * parallel_degree)); + + /* Result of hashed agg is always unsorted */ + if (target_pathkeys) + cost_sort(&parallel_hashed_p, root, target_pathkeys, parallel_hashed_p.total_cost, + dNumGroups, path_width, + 0.0, work_mem, limit_tuples); + + /* + * Now make the decision using the top-level tuple fraction. + */ + if (compare_fractional_path_costs(&parallel_hashed_p, &hashed_p, + tuple_fraction) < 0) + { + /* parallel Hashing is cheaper, so use it */ + return true; + } + return false; +} + +/* + * choose_parallel_grouping - should we use parallel grouping? + * + * Returns TRUE to select parallel grouping, FALSE to select normal grouping. 
+ */ +static bool +choose_parallel_grouping(PlannerInfo *root, + double tuple_fraction, double limit_tuples, + double path_rows, int path_width, + Path *cheapest_path, Path *sorted_path, + Path *cheapest_partial_path, + Path *sorted_partial_path, + double dNumGroups, AggClauseCosts *agg_costs) +{ + Query *parse = root->parse; + int numGroupCols = list_length(parse->groupClause); + List *target_pathkeys; + List *current_pathkeys; + Path sorted_p; + Path parallel_sorted_p; + double worker_path_rows; + Cost run_cost = 0; + int parallel_degree; + + target_pathkeys = root->sort_pathkeys; + parallel_degree = cheapest_partial_path->parallel_degree; + + /* + * See if the estimated cost is no more than doing it the other way. While + * avoiding the need for sorted input is usually a win, the fact that the + * output won't be sorted may be a loss; so we need to do an actual cost + * comparison. + * + * We need to consider cheapest_path [+ sort] + group or agg [+ final sort] or + * presorted_path + group or agg [+ final sort] versus + * cheapest_partial_path [+ sort] + partial group or agg + Gather + * + finalize group or agg [+ final sort] where brackets indicate a + * step that may not be needed. We assume grouping_planner() will have + * passed us a presorted path only if it's a winner compared to + * cheapest_path for this purpose. + * + * These path variables are dummies that just hold cost fields; we don't + * make actual Paths for these steps. 
+ */ + if (sorted_path) + { + sorted_p.startup_cost = sorted_path->startup_cost; + sorted_p.total_cost = sorted_path->total_cost; + current_pathkeys = sorted_path->pathkeys; + } + else + { + sorted_p.startup_cost = cheapest_path->startup_cost; + sorted_p.total_cost = cheapest_path->total_cost; + current_pathkeys = cheapest_path->pathkeys; + } + if (!pathkeys_contained_in(root->group_pathkeys, current_pathkeys)) + { + cost_sort(&sorted_p, root, root->group_pathkeys, sorted_p.total_cost, + path_rows, path_width, + 0.0, work_mem, -1.0); + current_pathkeys = root->group_pathkeys; + } + + if (parse->hasAggs) + cost_agg(&sorted_p, root, AGG_SORTED, agg_costs, + numGroupCols, dNumGroups, + sorted_p.startup_cost, sorted_p.total_cost, + path_rows); + else + cost_group(&sorted_p, root, numGroupCols, dNumGroups, + sorted_p.startup_cost, sorted_p.total_cost, + path_rows); + + /* The Agg or Group node will preserve ordering */ + if (target_pathkeys && + !pathkeys_contained_in(target_pathkeys, current_pathkeys)) + cost_sort(&sorted_p, root, target_pathkeys, sorted_p.total_cost, + dNumGroups, path_width, + 0.0, work_mem, limit_tuples); + + /* Parallel aggregate cost calculation */ + parallel_sorted_p.startup_cost = cheapest_partial_path->startup_cost; + parallel_sorted_p.total_cost = cheapest_partial_path->total_cost; + current_pathkeys = cheapest_partial_path->pathkeys; + + worker_path_rows = path_rows / parallel_degree; + + if (!pathkeys_contained_in(root->group_pathkeys, current_pathkeys)) + { + cost_sort(&parallel_sorted_p, root, root->group_pathkeys, parallel_sorted_p.total_cost, + worker_path_rows, path_width, + 0.0, work_mem, -1.0); + current_pathkeys = root->group_pathkeys; + } + + if (parse->hasAggs) + cost_agg(&parallel_sorted_p, root, AGG_SORTED, agg_costs, + numGroupCols, dNumGroups, + parallel_sorted_p.startup_cost, parallel_sorted_p.total_cost, + worker_path_rows); + else + cost_group(&parallel_sorted_p, root, numGroupCols, dNumGroups, + parallel_sorted_p.startup_cost, 
parallel_sorted_p.total_cost, + worker_path_rows); + + /* gather path cost calculation */ + run_cost = parallel_sorted_p.total_cost - parallel_sorted_p.startup_cost; + + /* Parallel setup and communication cost. */ + parallel_sorted_p.startup_cost += parallel_setup_cost; + run_cost += parallel_tuple_cost * dNumGroups * parallel_degree; + + parallel_sorted_p.total_cost = (parallel_sorted_p.startup_cost + run_cost); + + /* Final aggregate cost calculation */ + if (!pathkeys_contained_in(root->group_pathkeys, current_pathkeys)) + { + cost_sort(&parallel_sorted_p, root, root->group_pathkeys, parallel_sorted_p.total_cost, + (dNumGroups * parallel_degree), path_width, + 0.0, work_mem, -1.0); + current_pathkeys = root->group_pathkeys; + } + + if (parse->hasAggs) + cost_agg(&parallel_sorted_p, root, AGG_SORTED, agg_costs, + numGroupCols, dNumGroups, + parallel_sorted_p.startup_cost, parallel_sorted_p.total_cost, + (dNumGroups * parallel_degree)); + else + cost_group(&parallel_sorted_p, root, numGroupCols, dNumGroups, + parallel_sorted_p.startup_cost, parallel_sorted_p.total_cost, + (dNumGroups * parallel_degree)); + + /* The Agg or Group node will preserve ordering */ + if (target_pathkeys && + !pathkeys_contained_in(target_pathkeys, current_pathkeys)) + cost_sort(&parallel_sorted_p, root, target_pathkeys, parallel_sorted_p.total_cost, + dNumGroups, path_width, + 0.0, work_mem, limit_tuples); + + /* + * Now make the decision using the top-level tuple fraction. + */ + if (compare_fractional_path_costs(&parallel_sorted_p, &sorted_p, + tuple_fraction) < 0) + { + /* parallel grouping is cheaper, so use it */ + return true; + } + return false; +} + + +/* + * choose_hashed_distinct - should we use hashing for DISTINCT? 
* * This is fairly similar to choose_hashed_grouping, but there are enough @@ -4923,3 +5276,437 @@ plan_cluster_use_sort(Oid tableOid, Oid indexOid) return (seqScanAndSortPath.total_cost < indexScanPath->path.total_cost); } + +/* + * This function build a hash parallelagg plan as result_plan as following : + * Finalize Hash Aggregate + * -> Gather + * -> Partial Hash Aggregate + * -> Any partial plan + * The input result_plan will be + * -> Any partial plan + * + * So this function will do the following steps: + * If not parallel + * 1. Add the hash aggregate node + * 2. Return the result plan + * + * In case of parallel + * 1. Add the partial hash aggregate node + * 2. Add Gather node on top of partial hash aggregate node + * 3. Add Finalize hash Aggregate on top of Gather node + * 4. Return the result plan + */ + +static Plan * +make_hash_agg(PlannerInfo *root, + Query *parse, + List *tlist, + AggClauseCosts *agg_costs, + int numGroupCols, + AttrNumber *groupColIdx, + long numGroups, + int parallel_degree, + Plan *lefttree) +{ + Plan *result_plan = NULL; + Plan *partial_agg = NULL; + Plan *gather_plan = NULL; + List *partial_agg_tlist = NIL; + List *qual = (List*)parse->havingQual; + AttrNumber *topgroupColIdx = NULL; + + if (!parallel_degree) + { + result_plan = (Plan *) make_agg(root, + tlist, + (List *) parse->havingQual, + AGG_HASHED, + agg_costs, + numGroupCols, + groupColIdx, + extract_grouping_ops(parse->groupClause), + NIL, + numGroups, + false, + true, + lefttree); + return result_plan; + } + + /* + * The underlying Agg targetlist should be a flat tlist of all Vars and Aggs + * needed to evaluate the expressions and final values of aggregates present + * in the main target list. The quals also should be included. 
+ */ + partial_agg_tlist = make_partial_agg_tlist(add_qual_in_tlist(tlist, qual), + parse->groupClause); + + /* Make PartialHashAgg plan node */ + partial_agg = (Plan *) make_agg(root, + partial_agg_tlist, + NULL, + AGG_HASHED, + agg_costs, + numGroupCols, + groupColIdx, + extract_grouping_ops(parse->groupClause), + NIL, + numGroups, + false, + false, + lefttree); + + gather_plan = (Plan *)create_gather_plan_from_subplan(root, + partial_agg, + (numGroups * parallel_degree), + parallel_degree); + + /* + * Get the sortIndex according the subplan + */ + topgroupColIdx = get_grpColIdx_from_subPlan(root, partial_agg_tlist); + + /* Make FinalizeHashAgg plan node */ + result_plan = (Plan *) make_agg(root, + tlist, + (List *) parse->havingQual, + AGG_HASHED, + agg_costs, + numGroupCols, + topgroupColIdx, + extract_grouping_ops(parse->groupClause), + NIL, + numGroups, + true, + true, + gather_plan); + + return result_plan; +} + +/* + * This function build a [group] parallelagg plan as result_plan as following : + * Finalize [Group] Aggregate + * -> [Sort] + * -> Gather + * -> Partial [Group] Aggregate + * -> [Sort] + * -> Any partial plan + * The input result_plan will be + * -> Any partial plan + * + * So this function will do the following steps: + * If not parallel + * 1. Add the [sort] and [group] aggregate node + * 2. Return the result plan + * + * In case of parallel + * 1. Add the [sort] and partial [group] aggregate node + * 2. Add Gather node on top of partial [group] aggregate node + * 3. Add [sort] and Finalize [Group] Aggregate on top of Gather node + * 4. 
Return the result plan + */ +static Plan * +make_group_agg(PlannerInfo *root, + Query *parse, + List *tlist, + bool need_sort_for_grouping, + List *rollup_groupclauses, + List *rollup_lists, + AttrNumber *groupColIdx, + AggClauseCosts *agg_costs, + long numGroups, + int parallel_degree, + Plan *lefttree) +{ + Plan *result_plan = NULL; + Plan *partial_agg = NULL; + Plan *gather_plan = NULL; + List *qual = (List*)parse->havingQual; + List *partial_agg_tlist = NULL; + AttrNumber *topgroupColIdx = NULL; + + if (!parallel_degree) + { + result_plan = build_grouping_chain(root, + parse, + tlist, + need_sort_for_grouping, + rollup_groupclauses, + rollup_lists, + groupColIdx, + agg_costs, + numGroups, + false, + true, + lefttree); + return result_plan; + } + + /* + * The underlying Agg targetlist should be a flat tlist of all Vars and Aggs + * needed to evaluate the expressions and final values of aggregates present + * in the main target list. The quals also should be included. + */ + partial_agg_tlist = make_partial_agg_tlist(add_qual_in_tlist(tlist, qual), + llast(rollup_groupclauses)); + + /* Add PartialAgg and Sort node */ + partial_agg = build_grouping_chain(root, + parse, + partial_agg_tlist, + need_sort_for_grouping, + rollup_groupclauses, + rollup_lists, + groupColIdx, + agg_costs, + numGroups, + false, + false, + lefttree); + + + + /* Let the Gather node as upper node of partial_agg node */ + gather_plan = (Plan *)create_gather_plan_from_subplan(root, + partial_agg, + (numGroups * parallel_degree), + parallel_degree); + + /* + * Get the sortIndex according the subplan + */ + topgroupColIdx = get_grpColIdx_from_subPlan(root, partial_agg_tlist); + + /* Make the Finalize Group Aggregate node */ + result_plan = build_grouping_chain(root, + parse, + tlist, + need_sort_for_grouping, + rollup_groupclauses, + rollup_lists, + topgroupColIdx, + agg_costs, + numGroups, + true, + true, + gather_plan); + + return result_plan; +} + +/* Function to get the grouping column index 
from the provided plan */ +static AttrNumber* +get_grpColIdx_from_subPlan(PlannerInfo *root, List *tlist) +{ + Query *parse = root->parse; + int numCols; + + AttrNumber *grpColIdx = NULL; + + numCols = list_length(parse->groupClause); + if (numCols > 0) + { + ListCell *tl; + + grpColIdx = (AttrNumber *) palloc0(sizeof(AttrNumber) * numCols); + + foreach(tl, tlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(tl); + int colno; + + colno = get_grouping_column_index(parse, tle); + if (colno >= 0) + { + Assert(grpColIdx[colno] == 0); /* no dups expected */ + grpColIdx[colno] = tle->resno; + } + } + } + + return grpColIdx; +} + +/* + * make_partial_agg_tlist + * Generate appropriate Agg node target list for input to ParallelAgg nodes. + * + * The initial target list passed to ParallelAgg node from the parser contains + * aggregates and GROUP BY columns. For the underlying agg node, we want to + * generate a tlist containing bare aggregate references (Aggref) and GROUP BY + * expressions. So we flatten all expressions except GROUP BY items into their + * component variables. + * For example, given a query like + * SELECT a+b, 2 * SUM(c+d) , AVG(d)+SUM(c+d) FROM table GROUP BY a+b; + * we want to pass this targetlist to the Agg plan: + * a+b, SUM(c+d), AVG(d) + * where the a+b target will be used by the Sort/Group steps, and the + * other targets will be used for computing the final results. + * Note that we don't flatten Aggref's , since those are to be computed + * by the underlying Agg node, and they will be referenced like Vars above it. + * + * 'tlist' is the ParallelAgg's final target list. + * + * The result is the targetlist to be computed by the Agg node below the + * ParallelAgg node. 
+ */ +static List * +make_partial_agg_tlist(List *tlist,List *groupClause) +{ + Bitmapset *sgrefs; + List *new_tlist; + List *flattenable_cols; + List *flattenable_vars; + ListCell *lc; + + /* + * Collect the sortgroupref numbers of GROUP BY clauses + * into a bitmapset for convenient reference below. + */ + sgrefs = NULL; + + /* Add in sortgroupref numbers of GROUP BY clauses */ + foreach(lc, groupClause) + { + SortGroupClause *grpcl = (SortGroupClause *) lfirst(lc); + + sgrefs = bms_add_member(sgrefs, grpcl->tleSortGroupRef); + } + + /* + * Construct a tlist containing all the non-flattenable tlist items, and + * save aside the others for a moment. + */ + new_tlist = NIL; + flattenable_cols = NIL; + + foreach(lc, tlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + + /* Don't want to deconstruct GROUP BY items. */ + if (tle->ressortgroupref != 0 && + bms_is_member(tle->ressortgroupref, sgrefs)) + { + /* Don't want to deconstruct this value, so add to new_tlist */ + TargetEntry *newtle; + + newtle = makeTargetEntry(tle->expr, + list_length(new_tlist) + 1, + NULL, + false); + /* Preserve its sortgroupref marking, in case it's volatile */ + newtle->ressortgroupref = tle->ressortgroupref; + new_tlist = lappend(new_tlist, newtle); + } + else + { + /* + * Column is to be flattened, so just remember the expression for + * later call to pull_var_clause. There's no need for + * pull_var_clause to examine the TargetEntry node itself. + */ + flattenable_cols = lappend(flattenable_cols, tle->expr); + } + } + + /* + * Pull out all the Vars and Aggrefs mentioned in flattenable columns, and + * add them to the result tlist if not already present. (Some might be + * there already because they're used directly as group clauses.) + * + * Note: it's essential to use PVC_INCLUDE_AGGREGATES here, so that the + * Aggrefs are placed in the Agg node's tlist and not left to be computed + * at higher levels. 
+ */ + flattenable_vars = pull_var_clause((Node *) flattenable_cols, + PVC_INCLUDE_AGGREGATES, + PVC_INCLUDE_PLACEHOLDERS); + new_tlist = add_to_flat_tlist(new_tlist, flattenable_vars); + + /* clean up cruft */ + list_free(flattenable_vars); + list_free(flattenable_cols); + + /* + * Update the targetlist aggref->aggtype with the transtype. This is required to + * send the aggregate transition data from workers to the backend for combining + * and returning the final result. + */ + foreach(lc, new_tlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + + if (IsA(tle->expr, Aggref)) + { + Aggref *aggref = (Aggref *) tle->expr; + HeapTuple aggTuple; + Form_pg_aggregate aggform; + + aggTuple = SearchSysCache1(AGGFNOID, + ObjectIdGetDatum(aggref->aggfnoid)); + if (!HeapTupleIsValid(aggTuple)) + elog(ERROR, "cache lookup failed for aggregate %u", + aggref->aggfnoid); + aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); + + aggref->aggtype = aggform->aggtranstype; + + ReleaseSysCache(aggTuple); + } + } + + return new_tlist; +} + +/* + * add_qual_in_tlist + * Add the agg functions in qual into the target list used in agg plan + */ +static List* +add_qual_in_tlist(List *targetlist, List *qual) +{ + AddQualInTListExprContext context; + + if(qual == NULL) + return targetlist; + + context.targetlist = copyObject(targetlist); + context.resno = list_length(context.targetlist) + 1;; + + add_qual_in_tlist_walker((Node*)qual, &context); + + return context.targetlist; +} + +/* + * add_qual_in_tlist_walker + * Go through the qual list to get the aggref and add it in targetlist + */ +static bool +add_qual_in_tlist_walker (Node *node, AddQualInTListExprContext *context) +{ + if (node == NULL) + return false; + + if (IsA(node, Aggref)) + { + List *tlist = context->targetlist; + TargetEntry *te = makeNode(TargetEntry); + + te = makeTargetEntry((Expr *) node, + context->resno++, + NULL, + false); + + tlist = lappend(tlist,te); + } + else + return expression_tree_walker(node, 
add_qual_in_tlist_walker, context); + + return false; +} + + diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index 615f3a2..0cefd03 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -15,7 +15,9 @@ */ #include "postgres.h" +#include "access/htup_details.h" #include "access/transam.h" +#include "catalog/pg_aggregate.h" #include "catalog/pg_type.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" @@ -65,6 +67,7 @@ typedef struct indexed_tlist *subplan_itlist; Index newvarno; int rtoffset; + bool partial_agg; } fix_upper_expr_context; /* @@ -104,6 +107,7 @@ static Node *fix_scan_expr_mutator(Node *node, fix_scan_expr_context *context); static bool fix_scan_expr_walker(Node *node, fix_scan_expr_context *context); static void set_join_references(PlannerInfo *root, Join *join, int rtoffset); static void set_upper_references(PlannerInfo *root, Plan *plan, int rtoffset); +static void set_agg_references(PlannerInfo *root, Plan *plan, int rtoffset); static void set_dummy_tlist_references(Plan *plan, int rtoffset); static indexed_tlist *build_tlist_index(List *tlist); static Var *search_indexed_tlist_for_var(Var *var, @@ -128,7 +132,8 @@ static Node *fix_upper_expr(PlannerInfo *root, Node *node, indexed_tlist *subplan_itlist, Index newvarno, - int rtoffset); + int rtoffset, + bool partial_agg); static Node *fix_upper_expr_mutator(Node *node, fix_upper_expr_context *context); static List *set_returning_clause_references(PlannerInfo *root, @@ -140,6 +145,7 @@ static bool fix_opfuncids_walker(Node *node, void *context); static bool extract_query_dependencies_walker(Node *node, PlannerInfo *context); + /***************************************************************************** * * SUBPLAN REFERENCES @@ -668,7 +674,7 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) } break; case T_Agg: - set_upper_references(root, plan, rtoffset); + set_agg_references(root, plan, 
rtoffset); break; case T_Group: set_upper_references(root, plan, rtoffset); @@ -943,13 +949,15 @@ set_indexonlyscan_references(PlannerInfo *root, (Node *) plan->scan.plan.targetlist, index_itlist, INDEX_VAR, - rtoffset); + rtoffset, + false); plan->scan.plan.qual = (List *) fix_upper_expr(root, (Node *) plan->scan.plan.qual, index_itlist, INDEX_VAR, - rtoffset); + rtoffset, + false); /* indexqual is already transformed to reference index columns */ plan->indexqual = fix_scan_list(root, plan->indexqual, rtoffset); /* indexorderby is already transformed to reference index columns */ @@ -1116,25 +1124,29 @@ set_foreignscan_references(PlannerInfo *root, (Node *) fscan->scan.plan.targetlist, itlist, INDEX_VAR, - rtoffset); + rtoffset, + false); fscan->scan.plan.qual = (List *) fix_upper_expr(root, (Node *) fscan->scan.plan.qual, itlist, INDEX_VAR, - rtoffset); + rtoffset, + false); fscan->fdw_exprs = (List *) fix_upper_expr(root, (Node *) fscan->fdw_exprs, itlist, INDEX_VAR, - rtoffset); + rtoffset, + false); fscan->fdw_recheck_quals = (List *) fix_upper_expr(root, (Node *) fscan->fdw_recheck_quals, itlist, INDEX_VAR, - rtoffset); + rtoffset, + false); pfree(itlist); /* fdw_scan_tlist itself just needs fix_scan_list() adjustments */ fscan->fdw_scan_tlist = @@ -1190,19 +1202,22 @@ set_customscan_references(PlannerInfo *root, (Node *) cscan->scan.plan.targetlist, itlist, INDEX_VAR, - rtoffset); + rtoffset, + false); cscan->scan.plan.qual = (List *) fix_upper_expr(root, (Node *) cscan->scan.plan.qual, itlist, INDEX_VAR, - rtoffset); + rtoffset, + false); cscan->custom_exprs = (List *) fix_upper_expr(root, (Node *) cscan->custom_exprs, itlist, INDEX_VAR, - rtoffset); + rtoffset, + false); pfree(itlist); /* custom_scan_tlist itself just needs fix_scan_list() adjustments */ cscan->custom_scan_tlist = @@ -1524,7 +1539,8 @@ set_join_references(PlannerInfo *root, Join *join, int rtoffset) (Node *) nlp->paramval, outer_itlist, OUTER_VAR, - rtoffset); + rtoffset, + false); /* 
Check we replaced any PlaceHolderVar with simple Var */ if (!(IsA(nlp->paramval, Var) && nlp->paramval->varno == OUTER_VAR)) @@ -1648,14 +1664,16 @@ set_upper_references(PlannerInfo *root, Plan *plan, int rtoffset) (Node *) tle->expr, subplan_itlist, OUTER_VAR, - rtoffset); + rtoffset, + false); } else newexpr = fix_upper_expr(root, (Node *) tle->expr, subplan_itlist, OUTER_VAR, - rtoffset); + rtoffset, + false); tle = flatCopyTargetEntry(tle); tle->expr = (Expr *) newexpr; output_targetlist = lappend(output_targetlist, tle); @@ -1667,7 +1685,8 @@ set_upper_references(PlannerInfo *root, Plan *plan, int rtoffset) (Node *) plan->qual, subplan_itlist, OUTER_VAR, - rtoffset); + rtoffset, + false); pfree(subplan_itlist); } @@ -2121,7 +2140,8 @@ fix_upper_expr(PlannerInfo *root, Node *node, indexed_tlist *subplan_itlist, Index newvarno, - int rtoffset) + int rtoffset, + bool partial_agg) { fix_upper_expr_context context; @@ -2129,6 +2149,7 @@ fix_upper_expr(PlannerInfo *root, context.subplan_itlist = subplan_itlist; context.newvarno = newvarno; context.rtoffset = rtoffset; + context.partial_agg = partial_agg; return fix_upper_expr_mutator(node, &context); } @@ -2151,6 +2172,36 @@ fix_upper_expr_mutator(Node *node, fix_upper_expr_context *context) elog(ERROR, "variable not found in subplan target list"); return (Node *) newvar; } + if (IsA(node, Aggref) && context->partial_agg) + { + TargetEntry *tle; + Aggref *aggref = (Aggref*)node; + List *args = NIL; + + tle = tlist_member(node, context->subplan_itlist->tlist); + if (tle) + { + /* Found a matching subplan output expression */ + Var *newvar; + TargetEntry *newtle; + + newvar = makeVarFromTargetEntry(context->newvarno, tle); + newvar->varnoold = 0; /* wasn't ever a plain Var */ + newvar->varoattno = 0; + + /* makeTargetEntry: always set resno to one for finalize agg */ + newtle = makeTargetEntry((Expr*)newvar,1,NULL,false); + args = lappend(args,newtle); + + /* + * Updated the args, let the newvar refer to the right
position of + * the agg function in the subplan + */ + aggref->args = args; + + return (Node *) aggref; + } + } if (IsA(node, PlaceHolderVar)) { PlaceHolderVar *phv = (PlaceHolderVar *) node; @@ -2432,3 +2483,87 @@ extract_query_dependencies_walker(Node *node, PlannerInfo *context) return expression_tree_walker(node, extract_query_dependencies_walker, (void *) context); } + +/* + * set_agg_references + * Update the targetlist and quals of an upper-level plan node + * to refer to the tuples returned by its lefttree subplan. + * Also perform opcode lookup for these expressions, and + * add regclass OIDs to root->glob->relationOids. + * + * This is used for single-input plan types like Agg, Group, Result. + * + * In most cases, we have to match up individual Vars in the tlist and + * qual expressions with elements of the subplan's tlist (which was + * generated by flatten_tlist() from these selfsame expressions, so it + * should have all the required variables). There is an important exception, + * however: GROUP BY and ORDER BY expressions will have been pushed into the + * subplan tlist unflattened. If these values are also needed in the output + * then we want to reference the subplan tlist element rather than recomputing + * the expression. 
+ */ +static void +set_agg_references(PlannerInfo *root, Plan *plan, int rtoffset) +{ + Agg *agg = (Agg*)plan; + Plan *subplan = plan->lefttree; + indexed_tlist *subplan_itlist; + List *output_targetlist; + ListCell *l; + + if (!agg->combineStates) + return set_upper_references(root, plan, rtoffset); + + subplan_itlist = build_tlist_index(subplan->targetlist); + + output_targetlist = NIL; + + if(agg->combineStates) + { + foreach(l, plan->targetlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(l); + Node *newexpr; + + /* If it's a non-Var sort/group item, first try to match by sortref */ + if (tle->ressortgroupref != 0 && !IsA(tle->expr, Var)) + { + newexpr = (Node *) + search_indexed_tlist_for_sortgroupref((Node *) tle->expr, + tle->ressortgroupref, + subplan_itlist, + OUTER_VAR); + if (!newexpr) + newexpr = fix_upper_expr(root, + (Node *) tle->expr, + subplan_itlist, + OUTER_VAR, + rtoffset, + true); + } + else + newexpr = fix_upper_expr(root, + (Node *) tle->expr, + subplan_itlist, + OUTER_VAR, + rtoffset, + true); + tle = flatCopyTargetEntry(tle); + tle->expr = (Expr *) newexpr; + output_targetlist = lappend(output_targetlist, tle); + } + } + + plan->targetlist = output_targetlist; + + plan->qual = (List *) + fix_upper_expr(root, + (Node *) plan->qual, + subplan_itlist, + OUTER_VAR, + rtoffset, + false); + + pfree(subplan_itlist); +} + diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index dff115e..f853d5e 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -90,9 +90,15 @@ typedef struct typedef struct { + PartialAggType allowedtype; +} partial_agg_context; + +typedef struct +{ bool allow_restricted; } has_parallel_hazard_arg; +static bool partial_aggregate_walker(Node *node, partial_agg_context *context); static bool contain_agg_clause_walker(Node *node, void *context); static bool count_agg_clauses_walker(Node *node, count_agg_clauses_context *context); @@ -398,6 +404,86 @@ 
make_ands_implicit(Expr *clause) /***************************************************************************** * Aggregate-function clause manipulation *****************************************************************************/ +/* + * aggregates_allow_partial + * Recursively search for Aggref clauses and determine the maximum + * 'degree' of partial aggregation which can be supported. Partial + * aggregation requires that each aggregate does not have a DISTINCT or + * ORDER BY clause, and that it also has a combine function set. For + * aggregates with an INTERNAL trans type we only can support all types of + * partial aggregation when the aggregate has a serial and deserial + * function set. If this is not present then we can only support, at most + * partial aggregation in the context of a single backend process, as + * internal state pointers cannot be dereferenced from another backend + * process. + */ +PartialAggType +aggregates_allow_partial(Node *clause) +{ + partial_agg_context context; + + /* initially any type is ok, until we find Aggrefs which say otherwise */ + context.allowedtype = PAT_ANY; + + if (!partial_aggregate_walker(clause, &context)) + return context.allowedtype; + return context.allowedtype; +} + +static bool +partial_aggregate_walker(Node *node, partial_agg_context *context) +{ + if (node == NULL) + return false; + if (IsA(node, Aggref)) + { + Aggref *aggref = (Aggref *) node; + HeapTuple aggTuple; + Form_pg_aggregate aggform; + + Assert(aggref->agglevelsup == 0); + + /* + * We can't perform partial aggregation with Aggrefs containing a + * DISTINCT or ORDER BY clause. 
+ */ + if (aggref->aggdistinct || aggref->aggorder) + { + context->allowedtype = PAT_DISABLED; + return true; /* abort search */ + } + aggTuple = SearchSysCache1(AGGFNOID, + ObjectIdGetDatum(aggref->aggfnoid)); + if (!HeapTupleIsValid(aggTuple)) + elog(ERROR, "cache lookup failed for aggregate %u", + aggref->aggfnoid); + aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); + + /* + * If there is no combine func, then partial aggregation is not + * possible. + */ + if (!OidIsValid(aggform->aggcombinefn)) + { + ReleaseSysCache(aggTuple); + context->allowedtype = PAT_DISABLED; + return true; /* abort search */ + } + + /* + * Any aggs with an internal transtype are not allowed in parallel + * aggregate currently, until they have a framework to transfer + * between worker and main backend. + */ + if (aggform->aggtranstype == INTERNALOID) + context->allowedtype = PAT_INTERNAL_ONLY; + + ReleaseSysCache(aggTuple); + return false; /* continue searching */ + } + return expression_tree_walker(node, partial_aggregate_walker, + (void *) context); +} /* * contain_agg_clause diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index ea5a09a..1550658 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -841,6 +841,15 @@ static struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, { + {"enable_parallelagg", PGC_USERSET, QUERY_TUNING_METHOD, + gettext_noop("Enables the planner's use of parallel agg plans."), + NULL + }, + &enable_parallelagg, + true, + NULL, NULL, NULL + }, + { {"enable_material", PGC_USERSET, QUERY_TUNING_METHOD, gettext_noop("Enables the planner's use of materialization."), NULL diff --git a/src/include/optimizer/clauses.h b/src/include/optimizer/clauses.h index 3b3fd0f..d03ccc9 100644 --- a/src/include/optimizer/clauses.h +++ b/src/include/optimizer/clauses.h @@ -27,6 +27,26 @@ typedef struct List **windowFuncs; /* lists of WindowFuncs for each winref */ } WindowFuncLists; +/* + * PartialAggType + *
PartialAggType stores whether partial aggregation is allowed and + * which context it is allowed in. We require three states here as there are + * two different contexts in which partial aggregation is safe. For aggregates + * which have an 'stype' of INTERNAL, within a single backend process it is + * okay to pass a pointer to the aggregate state, as the memory to which the + * pointer points will belong to the same process. In cases where the + * aggregate state must be passed between different processes, for example + * during parallel aggregation, passing the pointer is not okay due to the + * fact that the memory being referenced won't be accessible from another + * process. + */ +typedef enum +{ + PAT_ANY = 0, /* Any type of partial aggregation is ok. */ + PAT_INTERNAL_ONLY, /* Some aggregates support only internal mode. */ + PAT_DISABLED /* Some aggregates don't support partial mode at all */ +} PartialAggType; + extern Expr *make_opclause(Oid opno, Oid opresulttype, bool opretset, Expr *leftop, Expr *rightop, @@ -47,6 +67,7 @@ extern Node *make_and_qual(Node *qual1, Node *qual2); extern Expr *make_ands_explicit(List *andclauses); extern List *make_ands_implicit(Expr *clause); +extern PartialAggType aggregates_allow_partial(Node *clause); extern bool contain_agg_clause(Node *clause); extern void count_agg_clauses(PlannerInfo *root, Node *clause, AggClauseCosts *costs); diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 78c7cae..0ab043a 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -62,6 +62,7 @@ extern bool enable_bitmapscan; extern bool enable_tidscan; extern bool enable_sort; extern bool enable_hashagg; +extern bool enable_parallelagg; extern bool enable_nestloop; extern bool enable_material; extern bool enable_mergejoin; diff --git a/src/include/optimizer/planmain.h b/src/include/optimizer/planmain.h index eaa642b..e284310 100644 --- a/src/include/optimizer/planmain.h +++
b/src/include/optimizer/planmain.h @@ -98,6 +98,8 @@ extern ModifyTable *make_modifytable(PlannerInfo *root, List *withCheckOptionLists, List *returningLists, List *rowMarks, OnConflictExpr *onconflict, int epqParam); extern bool is_projection_capable_plan(Plan *plan); +extern Gather *create_gather_plan_from_subplan(PlannerInfo *root, Plan *subplan, + double path_rows, int parallel_degree); /* * prototypes for plan/initsplan.c diff --git a/src/test/regress/expected/rangefuncs.out b/src/test/regress/expected/rangefuncs.out index 00ef421..fe186c5 100644 --- a/src/test/regress/expected/rangefuncs.out +++ b/src/test/regress/expected/rangefuncs.out @@ -9,10 +9,11 @@ SELECT name, setting FROM pg_settings WHERE name LIKE 'enable%'; enable_material | on enable_mergejoin | on enable_nestloop | on + enable_parallelagg | on enable_seqscan | on enable_sort | on enable_tidscan | on -(11 rows) +(12 rows) CREATE TABLE foo2(fooid int, f2 int); INSERT INTO foo2 VALUES(1, 11);