From 9a6720c2ed0906f3f2f12dd30390945933afa972 Mon Sep 17 00:00:00 2001 From: Greg Nancarrow Date: Fri, 19 Feb 2021 11:20:56 +1100 Subject: [PATCH v18 1/4] Enable parallel SELECT for "INSERT INTO ... SELECT ...", where it is safe to do so. Parallel SELECT can't be utilized for INSERT in the following cases: - INSERT statement uses the ON CONFLICT DO UPDATE clause - Target table has a parallel-unsafe trigger, index expression, column default expression or check constraint - Target table is a partitioned table with a parallel-unsafe partition key expression or support function The planner is updated to perform additional parallel-safety checks for the cases listed above, for determining whether it is safe to run INSERT in parallel-mode with an underlying parallel SELECT. The planner is further updated to consider using parallel SELECT for "INSERT INTO ... SELECT ...", provided nothing unsafe is found from the additional parallel-safety checks, or from the existing parallel-safety checks for SELECT. Prior to entering parallel-mode for execution of INSERT with parallel SELECT, a TransactionId is acquired and assigned to the current transaction state which is then serialized in the parallel DSM for the parallel workers to use. This patch includes a TEMPORARY fix for a bug in the query rewriter. Discussion: https://www.postgresql-archive.org/Bug-in-query-rewriter-hasModifyingCTE-not-getting-set-td6176917.html It was found that for re-written queries with a modifying CTE, the hasModifyingCTE flag was not getting set to true - resulting in a test failure in the "with" tests when force_parallel_mode=regress was in effect, as it allowed a CTE with a DELETE statement to be executed in parallel-mode. This TEMPORARY fix (made in the planner) will need to be removed once the bug in query rewriter is fixed. Note that this fix is currently just done for INSERT, but the problem actually exists for SELECT too. Discussion: https://postgr.es/m/CAJcOf-cXnB5cnMKqWEp2E2z7Mvcd04iLVmV=qpFJrR3AcrTS3g@mail.gmail.com --- src/backend/access/transam/xact.c | 22 ++ src/backend/executor/execMain.c | 3 + src/backend/optimizer/plan/planner.c | 93 ++++- src/backend/optimizer/plan/setrefs.c | 17 + src/backend/optimizer/util/clauses.c | 490 ++++++++++++++++++++++++++- src/include/access/xact.h | 15 + src/include/nodes/pathnodes.h | 2 + src/include/optimizer/clauses.h | 4 +- 8 files changed, 630 insertions(+), 16 deletions(-) diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 17fbc41bbb..2456a2bf10 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -41,6 +41,7 @@ #include "libpq/be-fsstubs.h" #include "libpq/pqsignal.h" #include "miscadmin.h" +#include "optimizer/optimizer.h" #include "pg_trace.h" #include "pgstat.h" #include "replication/logical.h" @@ -1014,6 +1015,27 @@ IsInParallelMode(void) return CurrentTransactionState->parallelModeLevel != 0; } +/* + * PrepareParallelModePlanExec + * + * Prepare for entering parallel mode plan execution, based on command-type. + */ +void +PrepareParallelModePlanExec(CmdType commandType) +{ + if (IsModifySupportedInParallelMode(commandType)) + { + Assert(!IsInParallelMode()); + + /* + * Prepare for entering parallel mode by assigning a + * FullTransactionId, to be included in the transaction state that is + * serialized in the parallel DSM. + */ + (void) GetCurrentTransactionId(); + } +} + /* * CommandCounterIncrement */ diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index c74ce36ffb..0648dd82ba 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -1512,7 +1512,10 @@ ExecutePlan(EState *estate, estate->es_use_parallel_mode = use_parallel_mode; if (use_parallel_mode) + { + PrepareParallelModePlanExec(estate->es_plannedstmt->commandType); EnterParallelMode(); + } /* * Loop until we've processed the proper number of tuples from the plan. diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index adf68d8790..cf3604a7ea 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -246,7 +246,7 @@ static bool group_by_has_partkey(RelOptInfo *input_rel, List *targetList, List *groupClause); static int common_prefix_cmp(const void *a, const void *b); - +static bool query_has_modifying_cte(Query *parse); /***************************************************************************** * @@ -305,6 +305,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, glob->resultRelations = NIL; glob->appendRelations = NIL; glob->relationOids = NIL; + glob->partitionOids = NIL; glob->invalItems = NIL; glob->paramExecTypes = NIL; glob->lastPHId = 0; @@ -313,19 +314,35 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, glob->transientPlan = false; glob->dependsOnRole = false; + if (IsModifySupportedInParallelMode(parse->commandType) && + !parse->hasModifyingCTE) + { + /* + * FIXME + * There is a known bug in the query rewriter: re-written queries with + * a modifying CTE may not have the "hasModifyingCTE" flag set. When + * that bug is fixed, this temporary fix must be removed. + * + * Note that here we've made a fix for this problem only for a + * supported-in-parallel-mode table-modification statement (i.e. + * INSERT), but this bug exists for SELECT too. + */ + parse->hasModifyingCTE = query_has_modifying_cte(parse); + } + /* * Assess whether it's feasible to use parallel mode for this query. We * can't do this in a standalone backend, or if the command will try to - * modify any data, or if this is a cursor operation, or if GUCs are set - * to values that don't permit parallelism, or if parallel-unsafe - * functions are present in the query tree. + * modify any data using a CTE, or if this is a cursor operation, or if + * GUCs are set to values that don't permit parallelism, or if + * parallel-unsafe functions are present in the query tree. * - * (Note that we do allow CREATE TABLE AS, SELECT INTO, and CREATE - * MATERIALIZED VIEW to use parallel plans, but as of now, only the leader - * backend writes into a completely new table. In the future, we can - * extend it to allow workers to write into the table. However, to allow - * parallel updates and deletes, we have to solve other problems, - * especially around combo CIDs.) + * (Note that we do allow CREATE TABLE AS, INSERT INTO...SELECT, SELECT + * INTO, and CREATE MATERIALIZED VIEW to use parallel plans. However, as + * of now, only the leader backend writes into a completely new table. In + * the future, we can extend it to allow workers to write into the table. + * However, to allow parallel updates and deletes, we have to solve other + * problems, especially around combo CIDs.) * * For now, we don't try to use parallel mode if we're running inside a * parallel worker. We might eventually be able to relax this @@ -334,13 +351,15 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, */ if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 && IsUnderPostmaster && - parse->commandType == CMD_SELECT && + (parse->commandType == CMD_SELECT || + (IsModifySupportedInParallelMode(parse->commandType) && + is_parallel_possible_for_modify(parse))) && !parse->hasModifyingCTE && max_parallel_workers_per_gather > 0 && !IsParallelWorker()) { /* all the cheap tests pass, so scan the query tree */ - glob->maxParallelHazard = max_parallel_hazard(parse); + glob->maxParallelHazard = max_parallel_hazard(parse, glob); glob->parallelModeOK = (glob->maxParallelHazard != PROPARALLEL_UNSAFE); } else @@ -7768,3 +7787,53 @@ group_by_has_partkey(RelOptInfo *input_rel, return true; } + +/* + * Determine if the specified query has a modifying-CTE. + * + * There is a known bug in the query rewriter: re-written queries with + * a modifying CTE may not have the "hasModifyingCTE" flag set. When + * that bug is fixed, this function should be removed. + * + */ +static bool +query_has_modifying_cte(Query *parsetree) +{ + int rt_index; + bool hasModifyingCTE = false; + + /* Recursively check subqueries */ + rt_index = 0; + while (rt_index < list_length(parsetree->rtable)) + { + RangeTblEntry *rte; + ++rt_index; + rte = rt_fetch(rt_index, parsetree->rtable); + if (rte->rtekind == RTE_SUBQUERY) + { + hasModifyingCTE = query_has_modifying_cte(rte->subquery); + if (hasModifyingCTE) + break; + } + } + + if (!hasModifyingCTE) + { + ListCell *lc; + + /* Check for INSERT/UPDATE/DELETE CTEs */ + foreach(lc, parsetree->cteList) + { + CommonTableExpr *cte = (CommonTableExpr *) lfirst(lc); + Query *ctequery = castNode(Query, cte->ctequery); + + if (ctequery->commandType != CMD_SELECT) + { + hasModifyingCTE = true; + break; + } + } + } + + return hasModifyingCTE; +} diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index c3c36be13e..f7d91c67bb 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -15,7 +15,10 @@ */ #include "postgres.h" +#include "access/table.h" +#include "access/xact.h" #include "access/transam.h" +#include "catalog/pg_class.h" #include "catalog/pg_type.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" @@ -24,6 +27,8 @@ #include "optimizer/planmain.h" #include "optimizer/planner.h" #include "optimizer/tlist.h" +#include "parser/parsetree.h" +#include "partitioning/partdesc.h" #include "tcop/utility.h" #include "utils/lsyscache.h" #include "utils/syscache.h" @@ -260,6 +265,18 @@ set_plan_references(PlannerInfo *root, Plan *plan) */ add_rtes_to_flat_rtable(root, false); + /* + * If modifying a partitioned table, add its parallel-safety-checked + * partitions too to glob->relationOids, to register them as plan + * dependencies. + */ + if (IsModifySupportedInParallelMode(root->parse->commandType)) + { + if (glob->partitionOids != NIL) + glob->relationOids = + list_concat(glob->relationOids, glob->partitionOids); + } + /* * Adjust RT indexes of PlanRowMarks and add to final rowmarks list */ diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index f3786dd2b6..af8b6d4437 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -19,13 +19,19 @@ #include "postgres.h" +#include "access/genam.h" #include "access/htup_details.h" +#include "access/table.h" +#include "access/xact.h" +#include "catalog/index.h" #include "catalog/pg_aggregate.h" #include "catalog/pg_class.h" +#include "catalog/pg_constraint.h" #include "catalog/pg_language.h" #include "catalog/pg_operator.h" #include "catalog/pg_proc.h" #include "catalog/pg_type.h" +#include "commands/trigger.h" #include "executor/executor.h" #include "executor/functions.h" #include "funcapi.h" @@ -43,6 +49,8 @@ #include "parser/parse_agg.h" #include "parser/parse_coerce.h" #include "parser/parse_func.h" +#include "parser/parsetree.h" +#include "partitioning/partdesc.h" #include "rewrite/rewriteManip.h" #include "tcop/tcopprot.h" #include "utils/acl.h" @@ -51,6 +59,8 @@ #include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/partcache.h" +#include "utils/rel.h" #include "utils/syscache.h" #include "utils/typcache.h" @@ -88,6 +98,9 @@ typedef struct char max_hazard; /* worst proparallel hazard found so far */ char max_interesting; /* worst proparallel hazard of interest */ List *safe_param_ids; /* PARAM_EXEC Param IDs to treat as safe */ + RangeTblEntry *target_rte; /* query's target relation if any */ + CmdType command_type; /* query's command type */ + PlannerGlobal *planner_global; /* global info for planner invocation */ } max_parallel_hazard_context; static bool contain_agg_clause_walker(Node *node, void *context); @@ -98,6 +111,15 @@ static bool contain_volatile_functions_walker(Node *node, void *context); static bool contain_volatile_functions_not_nextval_walker(Node *node, void *context); static bool max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context); +static bool target_rel_max_parallel_hazard(max_parallel_hazard_context *context); +static bool target_rel_max_parallel_hazard_recurse(Relation relation, + CmdType command_type, + max_parallel_hazard_context *context); +static bool target_rel_trigger_max_parallel_hazard(TriggerDesc *trigdesc, + max_parallel_hazard_context *context); +static bool target_rel_index_expr_max_parallel_hazard(Relation rel, + max_parallel_hazard_context *context); +static bool target_rel_domain_max_parallel_hazard(Oid typid, max_parallel_hazard_context *context); static bool contain_nonstrict_functions_walker(Node *node, void *context); static bool contain_exec_param_walker(Node *node, List *param_ids); static bool contain_context_dependent_node(Node *clause); @@ -148,7 +170,6 @@ static Query *substitute_actual_srf_parameters(Query *expr, static Node *substitute_actual_srf_parameters_mutator(Node *node, substitute_actual_srf_parameters_context *context); - /***************************************************************************** * Aggregate-function clause manipulation *****************************************************************************/ @@ -545,14 +566,19 @@ contain_volatile_functions_not_nextval_walker(Node *node, void *context) * later, in the common case where everything is SAFE. */ char -max_parallel_hazard(Query *parse) +max_parallel_hazard(Query *parse, PlannerGlobal *glob) { max_parallel_hazard_context context; context.max_hazard = PROPARALLEL_SAFE; context.max_interesting = PROPARALLEL_UNSAFE; context.safe_param_ids = NIL; + context.target_rte = parse->resultRelation > 0 ? + rt_fetch(parse->resultRelation, parse->rtable) : NULL; + context.command_type = parse->commandType; + context.planner_global = glob; (void) max_parallel_hazard_walker((Node *) parse, &context); + return context.max_hazard; } @@ -583,6 +609,9 @@ is_parallel_safe(PlannerInfo *root, Node *node) context.max_hazard = PROPARALLEL_SAFE; context.max_interesting = PROPARALLEL_RESTRICTED; context.safe_param_ids = NIL; + context.command_type = node != NULL && IsA(node, Query) ? + castNode(Query, node)->commandType : CMD_UNKNOWN; + context.planner_global = root->glob; /* * The params that refer to the same or parent query level are considered @@ -757,6 +786,19 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) } return false; /* nothing to recurse to */ } + else if (IsA(node, RangeTblEntry)) + { + RangeTblEntry *rte = (RangeTblEntry *) node; + + /* Nothing interesting to check for SELECTs */ + if (context->target_rte == NULL) + return false; + + if (rte == context->target_rte) + return target_rel_max_parallel_hazard(context); + + return false; + } /* * When we're first invoked on a completely unplanned tree, we must @@ -777,7 +819,9 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) /* Recurse into subselects */ return query_tree_walker(query, max_parallel_hazard_walker, - context, 0); + context, + context->target_rte != NULL ? + QTW_EXAMINE_RTES_BEFORE: 0); } /* Recurse to check arguments */ @@ -786,6 +830,446 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) context); } +/* + * target_rel_trigger_max_parallel_hazard + * + * Finds the maximum parallel-mode hazard level for the specified trigger data. + */ +static bool +target_rel_trigger_max_parallel_hazard(TriggerDesc *trigdesc, + max_parallel_hazard_context *context) +{ + int i; + + for (i = 0; i < trigdesc->numtriggers; i++) + { + int trigtype; + Trigger *trigger = &trigdesc->triggers[i]; + + if (max_parallel_hazard_test(func_parallel(trigger->tgfoid), context)) + return true; + + /* + * If the trigger type is RI_TRIGGER_FK, this indicates a FK exists in + * the relation, and this would result in creation of new CommandIds + * on insert/update/delete and this isn't supported in a parallel + * worker (but is safe in the parallel leader). + */ + trigtype = RI_FKey_trigger_type(trigger->tgfoid); + if (trigtype == RI_TRIGGER_FK) + { + if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context)) + return true; + } + } + + return false; +} + +/* + * target_rel_index_expr_max_parallel_hazard + * + * Finds the maximum parallel-mode hazard level for any existing index + * expressions of a specified relation. + */ +static bool +target_rel_index_expr_max_parallel_hazard(Relation rel, + max_parallel_hazard_context *context) +{ + List *index_oid_list; + ListCell *lc; + bool found_max_hazard = false; + LOCKMODE lockmode = AccessShareLock; + + index_oid_list = RelationGetIndexList(rel); + foreach(lc, index_oid_list) + { + Relation index_rel; + Form_pg_index indexStruct; + List *ii_Expressions; + Oid index_oid = lfirst_oid(lc); + + index_rel = index_open(index_oid, lockmode); + + indexStruct = index_rel->rd_index; + ii_Expressions = RelationGetIndexExpressions(index_rel); + + if (ii_Expressions != NIL) + { + int i; + ListCell *index_expr_item = list_head(ii_Expressions); + + for (i = 0; i < indexStruct->indnatts; i++) + { + int keycol = indexStruct->indkey.values[i]; + + if (keycol == 0) + { + /* Found an index expression */ + + Node *index_expr; + + Assert(index_expr_item != NULL); + if (index_expr_item == NULL) /* shouldn't happen */ + { + elog(WARNING, "too few entries in indexprs list"); + context->max_hazard = PROPARALLEL_UNSAFE; + found_max_hazard = true; + break; + } + + index_expr = (Node *) lfirst(index_expr_item); + + if (max_parallel_hazard_walker(index_expr, context)) + { + found_max_hazard = true; + break; + } + + index_expr_item = lnext(ii_Expressions, index_expr_item); + } + } + } + index_close(index_rel, lockmode); + } + list_free(index_oid_list); + + return found_max_hazard; +} + +/* + * target_rel_domain_max_parallel_hazard + * + * Finds the maximum parallel-mode hazard level for the specified DOMAIN type. + * Only any CHECK expressions are examined for parallel safety. + * DEFAULT values of DOMAIN-type columns in the target-list are already + * being checked for parallel-safety in the max_parallel_hazard() scan of the + * query tree in standard_planner(). + * + */ +static bool +target_rel_domain_max_parallel_hazard(Oid typid, max_parallel_hazard_context *context) +{ + Relation con_rel; + ScanKeyData key[1]; + SysScanDesc scan; + HeapTuple tup; + bool found_max_hazard = false; + + LOCKMODE lockmode = AccessShareLock; + + con_rel = table_open(ConstraintRelationId, lockmode); + + ScanKeyInit(&key[0], + Anum_pg_constraint_contypid, BTEqualStrategyNumber, + F_OIDEQ, ObjectIdGetDatum(typid)); + scan = systable_beginscan(con_rel, ConstraintTypidIndexId, true, + NULL, 1, key); + + while (HeapTupleIsValid((tup = systable_getnext(scan)))) + { + Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tup); + + if (con->contype == CONSTRAINT_CHECK) + { + char *conbin; + Datum val; + bool isnull; + Expr *check_expr; + + val = SysCacheGetAttr(CONSTROID, tup, + Anum_pg_constraint_conbin, &isnull); + Assert(!isnull); + if (isnull) + { + /* + * This shouldn't ever happen, but if it does, log a WARNING + * and return UNSAFE, rather than erroring out. + */ + elog(WARNING, "null conbin for constraint %u", con->oid); + context->max_hazard = PROPARALLEL_UNSAFE; + found_max_hazard = true; + break; + } + conbin = TextDatumGetCString(val); + check_expr = stringToNode(conbin); + pfree(conbin); + if (max_parallel_hazard_walker((Node *) check_expr, context)) + { + found_max_hazard = true; + break; + } + } + } + + systable_endscan(scan); + table_close(con_rel, lockmode); + return found_max_hazard; +} + +/* + * target_rel_max_parallel_hazard + * + * Determines the maximum parallel-mode hazard level for modification + * of a specified relation. + */ +static bool +target_rel_max_parallel_hazard(max_parallel_hazard_context *context) +{ + bool max_hazard_found; + + Relation targetRel = table_open(context->target_rte->relid, + context->target_rte->rellockmode); + max_hazard_found = target_rel_max_parallel_hazard_recurse(targetRel, + context->command_type, + context); + + table_close(targetRel, NoLock); + + return max_hazard_found; +} + +static bool +target_rel_max_parallel_hazard_recurse(Relation rel, + CmdType command_type, + max_parallel_hazard_context *context) +{ + TupleDesc tupdesc; + int attnum; + + /* Currently only CMD_INSERT is supported */ + Assert(command_type == CMD_INSERT); + + /* + * We can't support table modification in a parallel worker if it's a + * foreign table/partition (no FDW API for supporting parallel access) or a + * temporary table. + */ + if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE || + RelationUsesLocalBuffers(rel)) + { + if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context)) + { + return true; + } + } + + /* + * If a partitioned table, check that each partition is safe for + * modification in parallel-mode. + */ + if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + int i; + PartitionDesc pdesc; + PartitionKey pkey; + ListCell *partexprs_item; + int partnatts; + List *partexprs; + PlannerGlobal *glob; + + pkey = RelationGetPartitionKey(rel); + + partnatts = get_partition_natts(pkey); + partexprs = get_partition_exprs(pkey); + + partexprs_item = list_head(partexprs); + for (i = 0; i < partnatts; i++) + { + /* Check parallel-safety of partition key support functions */ + if (OidIsValid(pkey->partsupfunc[i].fn_oid)) + { + if (max_parallel_hazard_test(func_parallel(pkey->partsupfunc[i].fn_oid), context)) + { + return true; + } + } + + /* Check parallel-safety of any expressions in the partition key */ + if (get_partition_col_attnum(pkey, i) == 0) + { + Node *check_expr = (Node *) lfirst(partexprs_item); + + if (max_parallel_hazard_walker(check_expr, context)) + { + return true; + } + + partexprs_item = lnext(partexprs, partexprs_item); + } + } + + /* Recursively check each partition ... */ + + /* Create the PartitionDirectory infrastructure if we didn't already */ + glob = context->planner_global; + if (glob->partition_directory == NULL) + glob->partition_directory = + CreatePartitionDirectory(CurrentMemoryContext); + + pdesc = PartitionDirectoryLookup(glob->partition_directory, rel); + + for (i = 0; i < pdesc->nparts; i++) + { + bool max_hazard_found; + Relation part_rel; + + part_rel = table_open(pdesc->oids[i], AccessShareLock); + max_hazard_found = target_rel_max_parallel_hazard_recurse(part_rel, + command_type, + context); + table_close(part_rel, AccessShareLock); + + /* Record the partition as a plan dependency. */ + glob->partitionOids = + lappend_oid(glob->partitionOids, pdesc->oids[i]); + + if (max_hazard_found) + { + return true; + } + } + } + + /* + * If there are any index expressions, check that they are parallel-mode + * safe. + */ + if (target_rel_index_expr_max_parallel_hazard(rel, context)) + { + return true; + } + + /* + * If any triggers exist, check that they are parallel safe. + */ + if (rel->trigdesc != NULL) + { + if (target_rel_trigger_max_parallel_hazard(rel->trigdesc, context)) + { + return true; + } + } + + /* + * Column default expressions and check constraints are only applicable to + * INSERT and UPDATE, but since only parallel INSERT is currently supported, + * only command_type==CMD_INSERT is checked here. + */ + if (command_type == CMD_INSERT) + { + /* + * Column default expressions for columns in the target-list are already + * being checked for parallel-safety in the max_parallel_hazard() scan + * of the query tree in standard_planner(). + * Note that even though column defaults may be specified separately for + * each partition in a partitioned table, a partition's default value is + * not applied when inserting a tuple through a partitioned table. + */ + + tupdesc = RelationGetDescr(rel); + for (attnum = 0; attnum < tupdesc->natts; attnum++) + { + Form_pg_attribute att = TupleDescAttr(tupdesc, attnum); + + /* We don't need info for dropped or generated attributes */ + if (att->attisdropped || att->attgenerated) + continue; + + /* + * If the column is of a DOMAIN type, determine whether that + * domain has any CHECK expressions that are not parallel-mode + * safe. + */ + if (get_typtype(att->atttypid) == TYPTYPE_DOMAIN) + { + if (target_rel_domain_max_parallel_hazard(att->atttypid, context)) + { + return true; + } + } + } + + /* + * Determine if there are any CHECK constraints which are not + * parallel-safe. + */ + if (tupdesc->constr != NULL && tupdesc->constr->num_check > 0) + { + int i; + + ConstrCheck *check = tupdesc->constr->check; + + for (i = 0; i < tupdesc->constr->num_check; i++) + { + Expr *check_expr = stringToNode(check->ccbin); + + if (max_parallel_hazard_walker((Node *) check_expr, context)) + { + return true; + } + } + } + } + + return false; +} + +/* + * is_parallel_possible_for_modify + * + * Check at a high-level if parallel mode is able to be used for the specified + * table-modification statement. + * It's not possible in the following cases: + * + * 1) INSERT...ON CONFLICT...DO UPDATE + * 2) INSERT without SELECT + * + * (Note: we don't do in-depth parallel-safety checks here, we do only the + * cheaper tests that can quickly exclude obvious cases for which + * parallelism isn't supported, to avoid having to do further parallel-safety + * checks for these) + */ +bool +is_parallel_possible_for_modify(Query *parse) +{ + bool hasSubQuery; + RangeTblEntry *rte; + ListCell *lc; + + Assert(IsModifySupportedInParallelMode(parse->commandType)); + + /* + * UPDATE is not currently supported in parallel-mode, so prohibit + * INSERT...ON CONFLICT...DO UPDATE... + * In order to support update, even if only in the leader, some + * further work would need to be done. A mechanism would be needed + * for sharing combo-cids between leader and workers during + * parallel-mode, since for example, the leader might generate a + * combo-cid and it needs to be propagated to the workers. + */ + if (parse->commandType == CMD_INSERT && + parse->onConflict != NULL && + parse->onConflict->action == ONCONFLICT_UPDATE) + return false; + + /* + * If there is no underlying SELECT, a parallel table-modification + * operation is not possible (nor desirable). + */ + hasSubQuery = false; + foreach(lc, parse->rtable) + { + rte = lfirst_node(RangeTblEntry, lc); + if (rte->rtekind == RTE_SUBQUERY) + { + hasSubQuery = true; + break; + } + } + + return hasSubQuery; +} /***************************************************************************** * Check clauses for nonstrict functions diff --git a/src/include/access/xact.h b/src/include/access/xact.h index f49a57b35e..34cfaf542c 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -466,5 +466,20 @@ extern void ParsePrepareRecord(uint8 info, xl_xact_prepare *xlrec, xl_xact_parse extern void EnterParallelMode(void); extern void ExitParallelMode(void); extern bool IsInParallelMode(void); +extern void PrepareParallelModePlanExec(CmdType commandType); + +/* + * IsModifySupportedInParallelMode + * + * Indicates whether execution of the specified table-modification command + * (INSERT/UPDATE/DELETE) in parallel-mode is supported, subject to certain + * parallel-safety conditions. + */ +static inline bool +IsModifySupportedInParallelMode(CmdType commandType) +{ + /* Currently only INSERT is supported */ + return (commandType == CMD_INSERT); +} #endif /* XACT_H */ diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h index 0ec93e648c..6eabab4224 100644 --- a/src/include/nodes/pathnodes.h +++ b/src/include/nodes/pathnodes.h @@ -120,6 +120,8 @@ typedef struct PlannerGlobal List *relationOids; /* OIDs of relations the plan depends on */ + List *partitionOids; /* OIDs of partitions the plan depends on */ + List *invalItems; /* other dependencies, as PlanInvalItems */ List *paramExecTypes; /* type OIDs for PARAM_EXEC Params */ diff --git a/src/include/optimizer/clauses.h b/src/include/optimizer/clauses.h index 0673887a85..e2f0fe3aa6 100644 --- a/src/include/optimizer/clauses.h +++ b/src/include/optimizer/clauses.h @@ -32,7 +32,7 @@ extern double expression_returns_set_rows(PlannerInfo *root, Node *clause); extern bool contain_subplans(Node *clause); -extern char max_parallel_hazard(Query *parse); +extern char max_parallel_hazard(Query *parse, PlannerGlobal *glob); extern bool is_parallel_safe(PlannerInfo *root, Node *node); extern bool contain_nonstrict_functions(Node *clause); extern bool contain_exec_param(Node *clause, List *param_ids); @@ -52,5 +52,7 @@ extern void CommuteOpExpr(OpExpr *clause); extern Query *inline_set_returning_function(PlannerInfo *root, RangeTblEntry *rte); +extern char max_parallel_hazard_for_modify(Query *parse, char initial_max_parallel_hazard); +extern bool is_parallel_possible_for_modify(Query *parse); #endif /* CLAUSES_H */ -- 2.27.0