From 889de831a66c8f4a97d7822e0aac34e470dc7b60 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Mon, 4 Jan 2021 12:13:23 +0530 Subject: [PATCH v19 2/4] Tuple Cost Adjustment for Parallel Inserts in CTAS Let the planner know that the SELECT is from CTAS in createas.c so that it can set the number of tuples transferred from the workers to Gather node to 0. With this change, there are chances that the planner may choose the parallel plan. --- src/backend/commands/createas.c | 13 ++++- src/backend/commands/explain.c | 22 +++++++-- src/backend/commands/prepare.c | 3 +- src/backend/executor/execParallel.c | 70 +++++++++++++++++++++------ src/backend/optimizer/path/costsize.c | 20 +++++++- src/backend/optimizer/plan/planner.c | 40 +++++++++++++++ src/include/commands/explain.h | 3 +- src/include/executor/execParallel.h | 22 ++++++++- src/include/nodes/parsenodes.h | 2 + src/include/optimizer/planner.h | 10 ++++ 10 files changed, 182 insertions(+), 23 deletions(-) diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index a8050a2767..53ca3010c6 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -310,6 +310,16 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt, query = linitial_node(Query, rewritten); Assert(query->commandType == CMD_SELECT); + /* + * Turn on a flag to indicate planner so that it can ignore parallel + * tuple cost while generating Gather path. + */ + if (IsParallelInsertionAllowed(PARALLEL_INSERT_CMD_CREATE_TABLE_AS, + &parallel_ins_info)) + query->parallelInsCmdTupleCostOpt |= PARALLEL_INSERT_SELECT_QUERY; + else + query->parallelInsCmdTupleCostOpt = 0; + /* plan the query */ plan = pg_plan_query(query, pstate->p_sourcetext, CURSOR_OPT_PARALLEL_OK, params); @@ -342,7 +352,8 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt, * the executor to decide whether to allow parallel inserts or not. 
*/ SetParallelInsertState(PARALLEL_INSERT_CMD_CREATE_TABLE_AS, - queryDesc); + queryDesc, + &query->parallelInsCmdTupleCostOpt); } /* run the plan to completion */ diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 0ae5d8c65f..8e01faba7e 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -383,11 +383,25 @@ ExplainOneQuery(Query *query, int cursorOptions, planduration; BufferUsage bufusage_start, bufusage; + ParallelInsertCTASInfo parallel_ins_info; + + parallel_ins_info.intoclause = into; + parallel_ins_info.objectid = InvalidOid; if (es->buffers) bufusage_start = pgBufferUsage; INSTR_TIME_SET_CURRENT(planstart); + /* + * Turn on a flag to indicate planner so that it can ignore parallel + * tuple cost while generating Gather path. + */ + if (IsParallelInsertionAllowed(PARALLEL_INSERT_CMD_CREATE_TABLE_AS, + &parallel_ins_info)) + query->parallelInsCmdTupleCostOpt |= PARALLEL_INSERT_SELECT_QUERY; + else + query->parallelInsCmdTupleCostOpt = 0; + /* plan the query */ plan = pg_plan_query(query, queryString, cursorOptions, params); @@ -403,7 +417,8 @@ ExplainOneQuery(Query *query, int cursorOptions, /* run it (if needed) and produce output */ ExplainOnePlan(plan, into, es, queryString, params, queryEnv, - &planduration, (es->buffers ? &bufusage : NULL)); + &planduration, (es->buffers ? &bufusage : NULL), + &query->parallelInsCmdTupleCostOpt); } } @@ -513,7 +528,8 @@ void ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es, const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv, const instr_time *planduration, - const BufferUsage *bufusage) + const BufferUsage *bufusage, + uint8 *parallel_ins_tuple_cost_opts) { DestReceiver *dest; QueryDesc *queryDesc; @@ -590,7 +606,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es, * the executor to decide whether to allow parallel inserts or not. 
*/ SetParallelInsertState(PARALLEL_INSERT_CMD_CREATE_TABLE_AS, - queryDesc); + queryDesc, parallel_ins_tuple_cost_opts); } } diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c index 653ef8e41a..696d3343d4 100644 --- a/src/backend/commands/prepare.c +++ b/src/backend/commands/prepare.c @@ -672,7 +672,8 @@ ExplainExecuteQuery(ExecuteStmt *execstmt, IntoClause *into, ExplainState *es, if (pstmt->commandType != CMD_UTILITY) ExplainOnePlan(pstmt, into, es, query_string, paramLI, queryEnv, - &planduration, (es->buffers ? &bufusage : NULL)); + &planduration, (es->buffers ? &bufusage : NULL), + NULL); else ExplainOneUtility(pstmt->utilityStmt, into, es, query_string, paramLI, queryEnv); diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 7ed3e9e3b6..2846df66e6 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -1755,7 +1755,8 @@ IsParallelInsertionAllowed(ParallelInsertCmdKind ins_cmd, void *ins_info) * node so that the required information will be picked and sent to workers. */ void -SetParallelInsertState(ParallelInsertCmdKind ins_cmd, QueryDesc *queryDesc) +SetParallelInsertState(ParallelInsertCmdKind ins_cmd, QueryDesc *queryDesc, + uint8 *tuple_cost_opts) { GatherState *gstate; DestReceiver *dest; @@ -1766,24 +1767,63 @@ SetParallelInsertState(ParallelInsertCmdKind ins_cmd, QueryDesc *queryDesc) dest = queryDesc->dest; /* - * Parallel insertions are not possible either if the upper node is not - * Gather or it's a Gather but it have some projections to perform. + * Parallel insertions are possible only if the upper node is Gather. */ - if (!IsA(gstate, GatherState) || gstate->ps.ps_ProjInfo) + if (!IsA(gstate, GatherState)) return; - /* Okay to parallelize inserts, so mark it. 
*/ - if (ins_cmd == PARALLEL_INSERT_CMD_CREATE_TABLE_AS) - ((DR_intorel *) dest)->is_parallel = true; + if (tuple_cost_opts && gstate->ps.ps_ProjInfo) + Assert(!(*tuple_cost_opts & PARALLEL_INSERT_TUP_COST_IGNORED)); /* - * For parallelizing inserts, we must send some information so that the - * workers can build their own dest receivers. For CTAS, this info is into - * clause, object id (to open the created table). - * - * Since the required information is available in the dest receiver, store - * a reference to it in the Gather state so that it will be used in - * ExecInitParallelPlan to pick the information. + * Parallelize inserts only when the upper Gather node has no projections. */ - gstate->dest = dest; + if (!gstate->ps.ps_ProjInfo) + { + /* Okay to parallelize inserts, so mark it. */ + if (ins_cmd == PARALLEL_INSERT_CMD_CREATE_TABLE_AS) + ((DR_intorel *) dest)->is_parallel = true; + + /* + * For parallelizing inserts, we must send some information so that the + * workers can build their own dest receivers. For CTAS, this info is + * into clause, object id (to open the created table). + * + * Since the required information is available in the dest receiver, + * store a reference to it in the Gather state so that it will be used + * in ExecInitParallelPlan to pick the information. + */ + gstate->dest = dest; + } + else + { + /* + * Upper Gather node has projections, so parallel insertions are not + * allowed. + */ + if (ins_cmd == PARALLEL_INSERT_CMD_CREATE_TABLE_AS) + ((DR_intorel *) dest)->is_parallel = false; + + gstate->dest = NULL; + + /* + * Before returning, ensure that we have not done wrong parallel tuple + * cost enforcement in the planner. Main reason for this assertion is + * to check if we enforced the planner to ignore the parallel tuple + * cost (with the intention of choosing parallel inserts) due to which + * the parallel plan may have been chosen, but we do not allow the + * parallel inserts now. 
+ * + * If we have correctly ignored parallel tuple cost in the planner + * while creating Gather path, then this assertion failure should not + * occur. In case it occurs, that means the planner may have chosen + * this parallel plan because of our wrong enforcement. So let's try to + * catch that here. + */ + Assert(tuple_cost_opts && !(*tuple_cost_opts & + PARALLEL_INSERT_TUP_COST_IGNORED)); + } + + if (tuple_cost_opts) + *tuple_cost_opts = 0; } diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 380336518f..d79842dbf3 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -76,6 +76,7 @@ #include "access/amapi.h" #include "access/htup_details.h" #include "access/tsmapi.h" +#include "executor/execParallel.h" #include "executor/executor.h" #include "executor/nodeAgg.h" #include "executor/nodeHash.h" @@ -393,7 +394,24 @@ cost_gather(GatherPath *path, PlannerInfo *root, /* Parallel setup and communication cost. */ startup_cost += parallel_setup_cost; - run_cost += parallel_tuple_cost * path->path.rows; + + /* + * Do not consider tuple cost in case we intend to perform parallel + * inserts by workers. We would have turned on the ignore flag in + * apply_scanjoin_target_to_paths before generating Gather path for the + * upper level SELECT part of the query. + */ + if ((root->parse->parallelInsCmdTupleCostOpt & + PARALLEL_INSERT_SELECT_QUERY) && + (root->parse->parallelInsCmdTupleCostOpt & + PARALLEL_INSERT_CAN_IGN_TUP_COST)) + { + /* We are ignoring the parallel tuple cost, so mark it. 
*/ + root->parse->parallelInsCmdTupleCostOpt |= + PARALLEL_INSERT_TUP_COST_IGNORED; + } + else + run_cost += parallel_tuple_cost * path->path.rows; path->path.startup_cost = startup_cost; path->path.total_cost = (startup_cost + run_cost); diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 4e6497ff32..d1b7347de2 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -28,6 +28,7 @@ #include "catalog/pg_inherits.h" #include "catalog/pg_proc.h" #include "catalog/pg_type.h" +#include "executor/execParallel.h" #include "executor/executor.h" #include "executor/nodeAgg.h" #include "foreign/fdwapi.h" @@ -7338,6 +7339,36 @@ can_partial_agg(PlannerInfo *root) return true; } +/* + * ignore_parallel_tuple_cost + * + * Gather node will not receive any tuples from the workers in case each worker + * inserts them in parallel. So, we turn on a flag to ignore parallel tuple + * cost by the Gather path in cost_gather if the SELECT is for commands in + * which parallel insertion is possible and we are generating an upper level + * Gather path. + */ +static void +ignore_parallel_tuple_cost(PlannerInfo *root) +{ + if (root->query_level == 1 && + (root->parse->parallelInsCmdTupleCostOpt & + PARALLEL_INSERT_SELECT_QUERY)) + { + /* + * In each of the HAS_PARENT_PATH_GENERATING_CLAUSE cases, a parent + * path will be generated for the upper Gather path(in + * grouping_planner), in which case we can not let parallel inserts + * happen. So we do not turn on ignore tuple cost flag. + */ + if (HAS_PARENT_PATH_GENERATING_CLAUSE(root)) + return; + + root->parse->parallelInsCmdTupleCostOpt |= + PARALLEL_INSERT_CAN_IGN_TUP_COST; + } +} + /* * apply_scanjoin_target_to_paths * @@ -7557,7 +7588,16 @@ apply_scanjoin_target_to_paths(PlannerInfo *root, * one of the generated paths may turn out to be the cheapest one. 
*/ if (rel->consider_parallel && !IS_OTHER_REL(rel)) + { + /* + * Turn on a flag to ignore parallel tuple cost by the Gather path in + * cost_gather if the SELECT is for commands in which parallel + * insertion is possible and we are generating an upper level Gather + * path. + */ + ignore_parallel_tuple_cost(root); generate_useful_gather_paths(root, rel, false); + } /* * Reassess which paths are the cheapest, now that we've potentially added diff --git a/src/include/commands/explain.h b/src/include/commands/explain.h index e94d9e49cf..1a75c3ced3 100644 --- a/src/include/commands/explain.h +++ b/src/include/commands/explain.h @@ -91,7 +91,8 @@ extern void ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es, const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv, const instr_time *planduration, - const BufferUsage *bufusage); + const BufferUsage *bufusage, + uint8 *parallel_ins_tuple_cost_opts); extern void ExplainPrintPlan(ExplainState *es, QueryDesc *queryDesc); extern void ExplainPrintTriggers(ExplainState *es, QueryDesc *queryDesc); diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h index 689f577c08..f76b5c2ffd 100644 --- a/src/include/executor/execParallel.h +++ b/src/include/executor/execParallel.h @@ -49,6 +49,25 @@ typedef enum ParallelInsertCmdKind PARALLEL_INSERT_CMD_CREATE_TABLE_AS } ParallelInsertCmdKind; +/* + * Information sent to planner to account for tuple cost calculations in + * cost_gather for parallel insertions in commands such as CTAS. + * + * We need to let the planner know that there will be no tuples received by + * Gather node if workers insert the tuples in parallel. + */ +typedef enum ParallelInsertCmdTupleCostOpt +{ + PARALLEL_INSERT_SELECT_QUERY = 1 << 0, /* turn on this before planning */ + /* + * Turn on this while planning for upper Gather path to ignore parallel + * tuple cost in cost_gather. 
+ */ + PARALLEL_INSERT_CAN_IGN_TUP_COST = 1 << 1, + /* Turn on this after the cost is ignored. */ + PARALLEL_INSERT_TUP_COST_IGNORED = 1 << 2 +} ParallelInsertCmdTupleCostOpt; + /* * For each of the command added to ParallelInsertCmdKind, add a corresponding * structure encompassing the information that's required to be shared across @@ -85,5 +104,6 @@ extern void *GetParallelInsertCmdInfo(DestReceiver *dest, extern bool IsParallelInsertionAllowed(ParallelInsertCmdKind ins_cmd, void *ins_info); extern void SetParallelInsertState(ParallelInsertCmdKind ins_cmd, - QueryDesc *queryDesc); + QueryDesc *queryDesc, + uint8 *tuple_cost_opts); #endif /* EXECPARALLEL_H */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index a0f37e5268..a1d2cb9d4f 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -180,6 +180,8 @@ typedef struct Query */ int stmt_location; /* start location, or -1 if unknown */ int stmt_len; /* length in bytes; 0 means "rest of string" */ + /* Parallel insertion tuple cost options. */ + uint8 parallelInsCmdTupleCostOpt; } Query; diff --git a/src/include/optimizer/planner.h b/src/include/optimizer/planner.h index 9a15de5025..b71d21d334 100644 --- a/src/include/optimizer/planner.h +++ b/src/include/optimizer/planner.h @@ -21,6 +21,16 @@ #include "nodes/pathnodes.h" #include "nodes/plannodes.h" +#define HAS_PARENT_PATH_GENERATING_CLAUSE(root) \ + (root->parse->rowMarks || \ + limit_needed(root->parse) || \ + root->parse->sortClause || \ + root->parse->distinctClause || \ + root->parse->hasWindowFuncs || \ + root->parse->groupClause || \ + root->parse->groupingSets || \ + root->parse->hasAggs || \ + root->hasHavingQual) /* Hook for plugins to get control in planner() */ typedef PlannedStmt *(*planner_hook_type) (Query *parse, -- 2.25.1