From 20aa2cede64793a9f9b2b7f9b6c7574279df9c73 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Wed, 16 Dec 2020 11:45:58 +0530 Subject: [PATCH v12 2/3] Tuple Cost Adjustment for Parallel Inserts in CTAS Let the planner know that the SELECT is from CTAS in createas.c so that it can set the number of tuples transferred from the workers to Gather node to 0. With this change, there are chances that the planner may choose the parallel plan. Authors: Bharath Rupireddy, Hou, Zhijie --- src/backend/commands/createas.c | 88 ++++++++++++++++++++------- src/backend/commands/explain.c | 7 ++- src/backend/commands/prepare.c | 3 +- src/backend/optimizer/path/allpaths.c | 39 ++++++++++++ src/backend/optimizer/path/costsize.c | 22 ++++++- src/backend/optimizer/plan/planner.c | 59 ++++++++++++++++++ src/include/commands/createas.h | 23 ++++++- src/include/commands/explain.h | 3 +- src/include/nodes/parsenodes.h | 1 + src/include/optimizer/planner.h | 10 +++ 10 files changed, 225 insertions(+), 30 deletions(-) diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index be381f9748..8401e80185 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -344,7 +344,8 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt, * into the target table. We need plan state to be initialized by the * executor to decide whether to allow parallel inserts or not. */ - ChooseParallelInsertsInCTAS(into, queryDesc); + ChooseParallelInsertsInCTAS(into, queryDesc, + &query->CTASParallelInsInfo); /* run the plan to completion */ ExecutorRun(queryDesc, ForwardScanDirection, 0L, true); @@ -673,7 +674,8 @@ intorel_destroy(DestReceiver *self) * parallel insertions by the workers. Otherwise returns false. */ static bool -PushDownCTASParallelInsertState(DestReceiver *dest, PlanState *ps) +PushDownCTASParallelInsertState(DestReceiver *dest, PlanState *ps, + bool *gather_exists) { bool parallel = false; @@ -687,29 +689,39 @@ PushDownCTASParallelInsertState(DestReceiver *dest, PlanState *ps) for (int i = 0; i < aps->as_nplans; i++) { parallel |= PushDownCTASParallelInsertState(dest, - aps->appendplans[i]); + aps->appendplans[i], + gather_exists); } } - else if (IsA(ps, GatherState) && !ps->ps_ProjInfo) + else if (IsA(ps, GatherState)) { - GatherState *gstate = (GatherState *) ps; - parallel = true; - /* - * For parallelizing inserts in CTAS i.e. making each parallel worker - * insert the tuples, we must send information such as into clause (for - * each worker to build separate dest receiver), object id (for each - * worker to open the created table). + * Set to true if there exists at least one Gather node either at the + * top of the plan or as a direct sub node under Append node. */ - ((DR_intorel *) dest)->is_parallel = true; - gstate->dest = dest; + *gather_exists |= true; - /* - * Since there are no rows that are transferred from workers to Gather - * node, so we set it to 0 to be visible in estimated row count of - * explain plans. - */ - ps->plan->plan_rows = 0; + if (!ps->ps_ProjInfo) + { + GatherState *gstate = (GatherState *) ps; + parallel = true; + + /* + * For parallelizing inserts in CTAS i.e. making each parallel + * worker insert the tuples, we must send information such as into + * clause (for each worker to build separate dest receiver), object + * id (for each worker to open the created table). + */ + ((DR_intorel *) dest)->is_parallel = true; + gstate->dest = dest; + + /* + * Since there are no rows that are transferred from workers to + * Gather node, so we set it to 0 to be visible in estimated row + * count of explain plans. + */ + ps->plan->plan_rows = 0; + } } return parallel; @@ -720,8 +732,12 @@ PushDownCTASParallelInsertState(DestReceiver *dest, PlanState *ps) * insertion is possible, if yes set the parallel insert state i.e. push down * the dest receiver to the Gather nodes. */ -void ChooseParallelInsertsInCTAS(IntoClause *into, QueryDesc *queryDesc) +void ChooseParallelInsertsInCTAS(IntoClause *into, QueryDesc *queryDesc, + uint8 *tuple_cost_flags) { + bool gather_exists = false; + bool allow = false; + if (!IS_CTAS(into)) return; @@ -735,7 +751,33 @@ void ChooseParallelInsertsInCTAS(IntoClause *into, QueryDesc *queryDesc) if (!into->rel || into->rel->relpersistence == RELPERSISTENCE_TEMP) return; - if (queryDesc) - (void) PushDownCTASParallelInsertState(queryDesc->dest, - queryDesc->planstate); + if (!queryDesc) + return; + + allow = PushDownCTASParallelInsertState(queryDesc->dest, + queryDesc->planstate, + &gather_exists); + + /* + * It should not happen that in cost_gather we have ignored the parallel + * tuple cost and now we are not allowing the parallel inserts. And also we + * might need assertion only if the top node is Gather or Append under it + * we have Gather. The main intention of assertion is to check if we + * enforced planner to ignore the parallel tuple cost (with the intention + * of choosing parallel inserts) due to which the parallel plan may have + * been chosen, but we do not allow the parallel inserts now. + */ + if (!allow && tuple_cost_flags && gather_exists) + { + /* + * If we have correctly ignored parallel tuple cost in planner while + * creating Gather path, then this assertion failure should not occur. + * If it occurs, that means the planner may have chosen this parallel + * plan because of our enforcement to ignore the parallel tuple cost. + */ + Assert(!(*tuple_cost_flags & CTAS_PARALLEL_INS_TUP_COST_IGNORED)); + } + + if (tuple_cost_flags) + *tuple_cost_flags = CTAS_PARALLEL_INS_UNDEF; } diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index fbd0bc5a81..efdb34d1f0 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -402,7 +402,8 @@ ExplainOneQuery(Query *query, int cursorOptions, /* run it (if needed) and produce output */ ExplainOnePlan(plan, into, es, queryString, params, queryEnv, - &planduration, (es->buffers ? &bufusage : NULL)); + &planduration, (es->buffers ? &bufusage : NULL), + &query->CTASParallelInsInfo); } } @@ -496,7 +497,7 @@ void ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es, const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv, const instr_time *planduration, - const BufferUsage *bufusage) + const BufferUsage *bufusage, uint8 *ctas_tuple_cost_flags) { DestReceiver *dest; QueryDesc *queryDesc; @@ -562,7 +563,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es, * target table. We need plan state to be initialized by the executor to * decide whether to allow parallel inserts or not. */ - ChooseParallelInsertsInCTAS(into, queryDesc); + ChooseParallelInsertsInCTAS(into, queryDesc, ctas_tuple_cost_flags); /* Execute the plan for statistics if asked for */ if (es->analyze) diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c index 89087a7be3..07166479e7 100644 --- a/src/backend/commands/prepare.c +++ b/src/backend/commands/prepare.c @@ -672,7 +672,8 @@ ExplainExecuteQuery(ExecuteStmt *execstmt, IntoClause *into, ExplainState *es, if (pstmt->commandType != CMD_UTILITY) ExplainOnePlan(pstmt, into, es, query_string, paramLI, queryEnv, - &planduration, (es->buffers ? &bufusage : NULL)); + &planduration, (es->buffers ? &bufusage : NULL), + NULL); else ExplainOneUtility(pstmt->utilityStmt, into, es, query_string, paramLI, queryEnv); diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 84a69b064a..07c9d0f3d7 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -23,6 +23,7 @@ #include "catalog/pg_class.h" #include "catalog/pg_operator.h" #include "catalog/pg_proc.h" +#include "commands/createas.h" #include "foreign/fdwapi.h" #include "miscadmin.h" #include "nodes/makefuncs.h" @@ -1103,11 +1104,49 @@ set_append_rel_size(PlannerInfo *root, RelOptInfo *rel, if (root->glob->parallelModeOK && rel->consider_parallel) set_rel_consider_parallel(root, childrel, childRTE); + /* + * When subplan is subquery, It's possible to do parallel insert if top + * node of subquery is Gather, so we set the flag to ignore parallel + * tuple cost by the Gather path in cost_gather if the SELECT is for + * CTAS. + */ + if (childrel->rtekind == RTE_SUBQUERY) + { + /* + * We set the flag for two cases when there is no parent path will + * be created(such as : limit,sort,distinct...): + * i) query_level is 1 + * ii) query_level > 1 then set the flag in the parent_root. + * The case ii) is to check append under append: + * Append + * ->Append + * ->Gather + * ->Other plan + */ + if (root->parse->CTASParallelInsInfo & + CTAS_PARALLEL_INS_SELECT && + (root->query_level == 1 || + root->parent_root->parse->CTASParallelInsInfo & + CTAS_PARALLEL_INS_IGN_TUP_COST_APPEND) && + !(HAS_PARENT_PATH_GENERATING_CLAUSE(root))) + { + root->parse->CTASParallelInsInfo |= + CTAS_PARALLEL_INS_IGN_TUP_COST_APPEND; + } + } + /* * Compute the child's size. */ set_rel_size(root, childrel, childRTindex, childRTE); + if (root->parse->CTASParallelInsInfo & + CTAS_PARALLEL_INS_IGN_TUP_COST_APPEND) + { + root->parse->CTASParallelInsInfo &= + ~CTAS_PARALLEL_INS_IGN_TUP_COST_APPEND; + } + /* * It is possible that constraint exclusion detected a contradiction * within a child subquery, even though we didn't prove one above. If diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 22d6935824..800f25903d 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -76,6 +76,7 @@ #include "access/amapi.h" #include "access/htup_details.h" #include "access/tsmapi.h" +#include "commands/createas.h" #include "executor/executor.h" #include "executor/nodeAgg.h" #include "executor/nodeHash.h" @@ -378,6 +379,7 @@ cost_gather(GatherPath *path, PlannerInfo *root, { Cost startup_cost = 0; Cost run_cost = 0; + bool ignore_tuple_cost = false; /* Mark the path with the correct row estimate */ if (rows) @@ -393,7 +395,25 @@ cost_gather(GatherPath *path, PlannerInfo *root, /* Parallel setup and communication cost. */ startup_cost += parallel_setup_cost; - run_cost += parallel_tuple_cost * path->path.rows; + + /* + * Do not consider tuple cost in case of we intend to perform parallel + * inserts by workers. We would have set ignore flag in + * apply_scanjoin_target_to_paths before generating Gather path for the + * upper level SELECT part of the CTAS. + */ + if ((root->parse->CTASParallelInsInfo & CTAS_PARALLEL_INS_SELECT) && + (root->parse->CTASParallelInsInfo & + CTAS_PARALLEL_INS_TUP_COST_CAN_IGN)) + { + ignore_tuple_cost = true; + root->parse->CTASParallelInsInfo &= + ~CTAS_PARALLEL_INS_TUP_COST_CAN_IGN; + root->parse->CTASParallelInsInfo |= CTAS_PARALLEL_INS_TUP_COST_IGNORED; + } + + if (!ignore_tuple_cost) + run_cost += parallel_tuple_cost * path->path.rows; path->path.startup_cost = startup_cost; path->path.total_cost = (startup_cost + run_cost); diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 1a94b58f8b..7555cde61a 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -28,6 +28,7 @@ #include "catalog/pg_inherits.h" #include "catalog/pg_proc.h" #include "catalog/pg_type.h" +#include "commands/createas.h" #include "executor/executor.h" #include "executor/nodeAgg.h" #include "foreign/fdwapi.h" @@ -7338,6 +7339,43 @@ can_partial_agg(PlannerInfo *root) return true; } +/* + * ignore_parallel_tuple_cost + * + * Gather node will not receive any tuples from the workers in case each worker + * inserts them in parallel. So, we set a flag to ignore parallel tuple cost by + * the Gather path in cost_gather if the SELECT is for CTAS and we are + * generating an upper level Gather path. +*/ +static bool +ignore_parallel_tuple_cost(PlannerInfo *root) +{ + if (root->query_level != 1 && + (root->parent_root->parse->CTASParallelInsInfo & + CTAS_PARALLEL_INS_IGN_TUP_COST_APPEND)) + { + root->parse->CTASParallelInsInfo |= CTAS_PARALLEL_INS_SELECT; + } + + if (root->parse->CTASParallelInsInfo & CTAS_PARALLEL_INS_SELECT) + { + /* + * In each of the HAS_PARENT_PATH_GENERATING_CLAUSE cases, a parent + * path will be generated for the upper Gather path(in + * grouping_planner), in which case we can not let parallel inserts + * happen. So we do not set ignore tuple cost flag. + */ + if (HAS_PARENT_PATH_GENERATING_CLAUSE(root)) + return false; + + root->parse->CTASParallelInsInfo |= CTAS_PARALLEL_INS_TUP_COST_CAN_IGN; + + return true; + } + + return false; +} + /* * apply_scanjoin_target_to_paths * @@ -7557,8 +7595,29 @@ apply_scanjoin_target_to_paths(PlannerInfo *root, * one of the generated paths may turn out to be the cheapest one. */ if (rel->consider_parallel && !IS_OTHER_REL(rel)) + { + /* + * Set a flag to ignore parallel tuple cost by the Gather path in + * cost_gather if the SELECT is for CTAS and we are generating an upper + * level Gather path. + */ + bool ignore = ignore_parallel_tuple_cost(root); + generate_useful_gather_paths(root, rel, false); + /* + * Reset the ignore flag, in case we set it but + * generate_useful_gather_paths returned without reaching cost_gather. + */ + if (ignore && + (root->parse->CTASParallelInsInfo & + CTAS_PARALLEL_INS_TUP_COST_CAN_IGN)) + { + root->parse->CTASParallelInsInfo &= + ~CTAS_PARALLEL_INS_TUP_COST_CAN_IGN; + } + } + /* * Reassess which paths are the cheapest, now that we've potentially added * new Gather (or Gather Merge) and/or Append (or MergeAppend) paths to diff --git a/src/include/commands/createas.h b/src/include/commands/createas.h index ed4690305b..4103ac65f0 100644 --- a/src/include/commands/createas.h +++ b/src/include/commands/createas.h @@ -39,6 +39,26 @@ typedef struct Oid object_id; } DR_intorel; +/* + * Information sent to the planner from CTAS to account for the cost + * calculations in cost_gather. We need to do this because, no tuples will be + * received by the Gather node if the workers insert the tuples in parallel. + */ +typedef enum CTASParallelInsertOpt +{ + CTAS_PARALLEL_INS_UNDEF = 0, /* undefined */ + CTAS_PARALLEL_INS_SELECT = 1 << 0, /* set to this before planning */ + /* + * Set to this while planning for upper Gather path to ignore parallel + * tuple cost in cost_gather. + */ + CTAS_PARALLEL_INS_TUP_COST_CAN_IGN = 1 << 1, + /* Set to this after the cost is ignored. */ + CTAS_PARALLEL_INS_TUP_COST_IGNORED = 1 << 2, + /* Set to this in case tuple cost needs to be ignored for Append cases. */ + CTAS_PARALLEL_INS_IGN_TUP_COST_APPEND = 1 << 3 +} CTASParallelInsertOpt; + #define IS_CTAS(intoclause) (intoclause && IsA(intoclause, IntoClause)) #define IS_PARALLEL_CTAS_DEST(dest) (dest && dest->mydest == DestIntoRel && \ IS_CTAS(((DR_intorel *) dest)->into) && \ @@ -53,5 +73,6 @@ extern int GetIntoRelEFlags(IntoClause *intoClause); extern DestReceiver *CreateIntoRelDestReceiver(IntoClause *intoClause); extern void ChooseParallelInsertsInCTAS(IntoClause *into, - QueryDesc *queryDesc); + QueryDesc *queryDesc, + uint8 *tuple_cost_flags); #endif /* CREATEAS_H */ diff --git a/src/include/commands/explain.h b/src/include/commands/explain.h index ba661d32a6..1a1806dbf1 100644 --- a/src/include/commands/explain.h +++ b/src/include/commands/explain.h @@ -91,7 +91,8 @@ extern void ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es, const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv, const instr_time *planduration, - const BufferUsage *bufusage); + const BufferUsage *bufusage, + uint8 *ctas_tuple_cost_flags); extern void ExplainPrintPlan(ExplainState *es, QueryDesc *queryDesc); extern void ExplainPrintTriggers(ExplainState *es, QueryDesc *queryDesc); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 48a79a7657..81b148c383 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -180,6 +180,7 @@ typedef struct Query */ int stmt_location; /* start location, or -1 if unknown */ int stmt_len; /* length in bytes; 0 means "rest of string" */ + uint8 CTASParallelInsInfo; /* parallel insert in CTAS info */ } Query; diff --git a/src/include/optimizer/planner.h b/src/include/optimizer/planner.h index beb7dbbcbe..74b2563828 100644 --- a/src/include/optimizer/planner.h +++ b/src/include/optimizer/planner.h @@ -21,6 +21,16 @@ #include "nodes/pathnodes.h" #include "nodes/plannodes.h" +#define HAS_PARENT_PATH_GENERATING_CLAUSE(root) \ + (root->parse->rowMarks || \ + limit_needed(root->parse) || \ + root->parse->sortClause || \ + root->parse->distinctClause || \ + root->parse->hasWindowFuncs || \ + root->parse->groupClause || \ + root->parse->groupingSets || \ + root->parse->hasAggs || \ + root->hasHavingQual) /* Hook for plugins to get control in planner() */ typedef PlannedStmt *(*planner_hook_type) (Query *parse, -- 2.25.1