From b8990055f96a052bc0d6609df59d072e02ec0db0 Mon Sep 17 00:00:00 2001
From: Greg Nancarrow
Date: Thu, 15 Oct 2020 13:59:25 +1100
Subject: [PATCH v2] Enable parallel SELECT for "INSERT INTO ... SELECT ...".

Enable "INSERT INTO ... SELECT ..." to utilize a parallel SELECT, where it
is safe to do so.

Parallel SELECT can't be utilized in the following cases:
- INSERT statement uses ON CONFLICT ... DO UPDATE ...
- Target table is a foreign or temporary table
- Target table has a:
  - Row-level trigger or transition-table trigger
  - Parallel-unsafe trigger
  - Foreign key trigger (RI_TRIGGER_FK)
  - Parallel-unsafe index expression
  - Parallel-unsafe column default expression
  - Parallel-unsafe check constraint
- Partitioned table or partition with any of the above parallel-unsafe
  features.

Discussion: https://postgr.es/m/CAJcOf-cXnB5cnMKqWEp2E2z7Mvcd04iLVmV=qpFJrR3AcrTS3g@mail.gmail.com
---
 src/backend/access/transam/varsup.c  |   5 +-
 src/backend/access/transam/xact.c    |   4 +-
 src/backend/optimizer/plan/planner.c | 303 ++++++++++++++++++++++++++++++++++-
 3 files changed, 302 insertions(+), 10 deletions(-)

diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index a4944fa..925c875 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -15,6 +15,7 @@
 
 #include "access/clog.h"
 #include "access/commit_ts.h"
+#include "access/parallel.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/xact.h"
@@ -56,8 +57,8 @@ GetNewTransactionId(bool isSubXact)
 	 * Workers synchronize transaction state at the beginning of each parallel
 	 * operation, so we can't account for new XIDs after that point.
 	 */
-	if (IsInParallelMode())
-		elog(ERROR, "cannot assign TransactionIds during a parallel operation");
+	if (IsParallelWorker())
+		elog(ERROR, "cannot assign TransactionIds in a parallel worker");
 
 	/*
 	 * During bootstrap initialization, we return the special bootstrap
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index af6afce..ef423fb 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -580,8 +580,8 @@ AssignTransactionId(TransactionState s)
 	 * Workers synchronize transaction state at the beginning of each parallel
 	 * operation, so we can't account for new XIDs at this point.
 	 */
-	if (IsInParallelMode() || IsParallelWorker())
-		elog(ERROR, "cannot assign XIDs during a parallel operation");
+	if (IsParallelWorker())
+		elog(ERROR, "cannot assign XIDs in a parallel worker");
 
 	/*
 	 * Ensure parent(s) have XIDs, so that a child always has an XID later
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index f331f82..3bd2e22 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -24,10 +24,12 @@
 #include "access/sysattr.h"
 #include "access/table.h"
 #include "access/xact.h"
+#include "catalog/index.h"
 #include "catalog/pg_constraint.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_proc.h"
 #include "catalog/pg_type.h"
+#include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/nodeAgg.h"
 #include "foreign/fdwapi.h"
@@ -58,6 +60,7 @@
 #include "parser/parse_agg.h"
 #include "parser/parsetree.h"
 #include "partitioning/partdesc.h"
+#include "rewrite/rewriteHandler.h"
 #include "rewrite/rewriteManip.h"
 #include "storage/dsm_impl.h"
 #include "utils/lsyscache.h"
@@ -248,7 +251,11 @@ static bool group_by_has_partkey(RelOptInfo *input_rel,
 								 List *targetList,
 								 List *groupClause);
 static int	common_prefix_cmp(const void *a, const void *b);
-
+static bool IsTriggerDataParallelModeSafe(TriggerDesc *trigdesc);
+static bool IsRelParallelModeSafeForModify(Oid relid);
+static bool AreIndexExprsParallelModeSafe(Relation rel);
+static bool IsParallelModeSafeForModify(Query *parse);
+static bool IsModifySupportedInParallelMode(CmdType commandType);
 
 /*****************************************************************************
  *
@@ -319,11 +326,11 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
 	/*
 	 * Assess whether it's feasible to use parallel mode for this query.  We
 	 * can't do this in a standalone backend, or if the command will try to
-	 * modify any data, or if this is a cursor operation, or if GUCs are set
-	 * to values that don't permit parallelism, or if parallel-unsafe
-	 * functions are present in the query tree.
+	 * modify any data using a CTE, or if this is a cursor operation, or if
+	 * GUCs are set to values that don't permit parallelism, or if
+	 * parallel-unsafe functions are present in the query tree.
 	 *
-	 * (Note that we do allow CREATE TABLE AS, SELECT INTO, and CREATE
+	 * (Note that we do allow CREATE TABLE AS, INSERT, SELECT INTO, and CREATE
 	 * MATERIALIZED VIEW to use parallel plans, but as of now, only the leader
 	 * backend writes into a completely new table.  In the future, we can
 	 * extend it to allow workers to write into the table.  However, to allow
@@ -337,7 +344,8 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
 	 */
 	if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 &&
 		IsUnderPostmaster &&
-		parse->commandType == CMD_SELECT &&
+		(parse->commandType == CMD_SELECT ||
+		 IsModifySupportedInParallelMode(parse->commandType)) &&
 		!parse->hasModifyingCTE &&
 		max_parallel_workers_per_gather > 0 &&
 		!IsParallelWorker())
@@ -345,6 +353,14 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
 		/* all the cheap tests pass, so scan the query tree */
 		glob->maxParallelHazard = max_parallel_hazard(parse);
 		glob->parallelModeOK = (glob->maxParallelHazard != PROPARALLEL_UNSAFE);
+
+		if (glob->parallelModeOK &&
+			IsModifySupportedInParallelMode(parse->commandType))
+		{
+			glob->parallelModeOK = IsParallelModeSafeForModify(parse);
+			if (!glob->parallelModeOK)
+				glob->maxParallelHazard = PROPARALLEL_UNSAFE;
+		}
 	}
 	else
 	{
@@ -7355,6 +7371,281 @@ can_partial_agg(PlannerInfo *root, const AggClauseCosts *agg_costs)
 }
 
 /*
+ * IsModifySupportedInParallelMode
+ *
+ * Indicates whether execution of the specified table-modification command
+ * (INSERT/UPDATE/DELETE) in parallel-mode is supported, subject to certain
+ * conditions.
+ */
+static pg_attribute_always_inline bool
+IsModifySupportedInParallelMode(CmdType commandType)
+{
+	/* Currently only INSERT is supported */
+	return (commandType == CMD_INSERT);
+}
+
+/*
+ * IsTriggerDataParallelModeSafe
+ *
+ * Checks if the specified trigger data is parallel-mode safe.
+ * Returns false if any one of the triggers are not safe for parallel
+ * operation.  A NULL trigdesc (no triggers) is trivially safe.
+ */
+static bool
+IsTriggerDataParallelModeSafe(TriggerDesc *trigdesc)
+{
+	int			i;
+
+	/* No triggers at all, so nothing can be unsafe */
+	if (trigdesc == NULL)
+		return true;
+
+	/*
+	 * Can't support execution of row-level or transition-table triggers
+	 * during parallel-mode, since such triggers may query the table
+	 * into which the data is being inserted, and the content returned
+	 * would vary unpredictably according to the order of retrieval by
+	 * the workers and the rows already inserted.
+	 */
+	if (trigdesc->trig_insert_instead_row ||
+		trigdesc->trig_insert_before_row ||
+		trigdesc->trig_insert_after_row ||
+		trigdesc->trig_insert_new_table)
+	{
+		return false;
+	}
+
+	for (i = 0; i < trigdesc->numtriggers; i++)
+	{
+		Trigger    *trigger = &trigdesc->triggers[i];
+		int			trigtype;
+
+		if (func_parallel(trigger->tgfoid) != PROPARALLEL_SAFE)
+			return false;
+
+		/*
+		 * If the trigger type is RI_TRIGGER_FK, this indicates a FK exists in
+		 * the relation, and this would result in creation of new CommandIds
+		 * on insert/update/delete and this isn't supported during
+		 * parallel-mode.
+		 */
+		trigtype = RI_FKey_trigger_type(trigger->tgfoid);
+		if (trigtype == RI_TRIGGER_FK)
+			return false;
+	}
+
+	return true;
+}
+
+/*
+ * AreIndexExprsParallelModeSafe
+ *
+ * Checks if index expressions for a specified relation are parallel-mode safe.
+ * Returns false if any index expressions exist which are not safe for parallel
+ * operation.
+ */
+static bool
+AreIndexExprsParallelModeSafe(Relation rel)
+{
+	List	   *indexOidList;
+	ListCell   *lc;
+	LOCKMODE	lockmode = AccessShareLock;
+
+	indexOidList = RelationGetIndexList(rel);
+	foreach(lc, indexOidList)
+	{
+		Oid			indexOid = lfirst_oid(lc);
+		Relation	indexRel;
+		IndexInfo  *indexInfo;
+
+		indexRel = index_open(indexOid, lockmode);
+
+		indexInfo = BuildIndexInfo(indexRel);
+
+		if (indexInfo->ii_Expressions != NIL)
+		{
+			int			i;
+			ListCell   *indexExprItem = list_head(indexInfo->ii_Expressions);
+
+			for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
+			{
+				int			keycol = indexInfo->ii_IndexAttrNumbers[i];
+				if (keycol == 0)
+				{
+					/* Found an index expression */
+
+					Node	   *indexExpr;
+
+					if (indexExprItem == NULL)	/* shouldn't happen */
+						elog(ERROR, "too few entries in indexprs list");
+
+					indexExpr = (Node *)lfirst(indexExprItem);
+					indexExpr = (Node *)expression_planner((Expr *)indexExpr);
+
+					if (max_parallel_hazard((Query *)indexExpr) != PROPARALLEL_SAFE)
+					{
+						index_close(indexRel, lockmode);
+						list_free(indexOidList);
+						return false;
+					}
+
+					indexExprItem = lnext(indexInfo->ii_Expressions, indexExprItem);
+				}
+			}
+		}
+		index_close(indexRel, lockmode);
+	}
+
+	/* RelationGetIndexList() returned a copy that is ours to free */
+	list_free(indexOidList);
+	return true;
+}
+
+/*
+ * IsRelParallelModeSafeForModify
+ *
+ * Determines whether a specified relation is safe for modification in
+ * parallel-mode.
+ */
+static bool
+IsRelParallelModeSafeForModify(Oid relid)
+{
+	Relation	rel;
+	TupleDesc	tupdesc;
+	int			attnum;
+
+	LOCKMODE	lockmode = AccessShareLock;
+
+	rel = table_open(relid, lockmode);
+
+	/*
+	 * We can't support table modification in parallel-mode if it's a
+	 * foreign table/partition (no FDW API for supporting parallel access)
+	 * or a temporary table.
+	 */
+	if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE ||
+		RelationUsesLocalBuffers(rel))
+	{
+		table_close(rel, lockmode);
+		return false;
+	}
+
+	/*
+	 * If a partitioned table, check that each partition is safe for
+	 * modification in parallel-mode.
+	 */
+	if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+	{
+		int			i;
+		PartitionDesc pd = RelationGetPartitionDesc(rel);
+		for (i = 0; i < pd->nparts; i++)
+		{
+			if (!IsRelParallelModeSafeForModify(pd->oids[i]))
+			{
+				table_close(rel, lockmode);
+				return false;
+			}
+		}
+	}
+
+	/*
+	 * If there are any index expressions, check that they are parallel-mode
+	 * safe.
+	 */
+	if (!AreIndexExprsParallelModeSafe(rel))
+	{
+		table_close(rel, lockmode);
+		return false;
+	}
+
+	/*
+	 * If any triggers exist, check that they are parallel safe.
+	 */
+	if (rel->trigdesc != NULL &&
+		!IsTriggerDataParallelModeSafe(rel->trigdesc))
+	{
+		table_close(rel, lockmode);
+		return false;
+	}
+
+	/*
+	 * Check if there are any column default expressions which are not
+	 * parallel-mode safe.
+	 */
+	tupdesc = RelationGetDescr(rel);
+	for (attnum = 0; attnum < tupdesc->natts; attnum++)
+	{
+		Expr	   *defexpr;
+
+		Form_pg_attribute att = TupleDescAttr(tupdesc, attnum);
+
+		/* We don't need info for dropped or generated attributes */
+		if (att->attisdropped || att->attgenerated)
+			continue;
+
+		if (att->atthasdef)
+		{
+			defexpr = (Expr *)build_column_default(rel, attnum + 1);
+
+			/* Run the expression through planner */
+			defexpr = expression_planner(defexpr);
+
+			if (max_parallel_hazard((Query *)defexpr) != PROPARALLEL_SAFE)
+			{
+				table_close(rel, lockmode);
+				return false;
+			}
+		}
+	}
+
+	/*
+	 * Check if there are any CHECK constraints which are not parallel-safe.
+	 */
+	if (tupdesc->constr != NULL && tupdesc->constr->num_check > 0)
+	{
+		int			i;
+
+		ConstrCheck *check = tupdesc->constr->check;
+
+		for (i = 0; i < tupdesc->constr->num_check; i++)
+		{
+			/* Examine the i'th constraint, not just the first one */
+			Expr	   *checkExpr = stringToNode(check[i].ccbin);
+			if (max_parallel_hazard((Query *)checkExpr) != PROPARALLEL_SAFE)
+			{
+				table_close(rel, lockmode);
+				return false;
+			}
+		}
+	}
+
+	table_close(rel, lockmode);
+	return true;
+}
+
+/*
+ * IsParallelModeSafeForModify
+ *
+ * Determines whether the specified table-modification statement is
+ * parallel-mode safe, based on the statement attributes and target table.
+ */
+static bool
+IsParallelModeSafeForModify(Query *parse)
+{
+	RangeTblEntry *rte;
+
+	/*
+	 * UPDATE is not currently supported in parallel-mode, so prohibit
+	 * INSERT...ON CONFLICT...DO UPDATE...
+	 */
+	if (parse->onConflict != NULL && parse->onConflict->action == ONCONFLICT_UPDATE)
+		return false;
+
+	rte = rt_fetch(parse->resultRelation, parse->rtable);
+	return (IsRelParallelModeSafeForModify(rte->relid));
+}
+
+/*
  * apply_scanjoin_target_to_paths
  *
  * Adjust the final scan/join relation, and recursively all of its children,
-- 
1.8.3.1