From 1b3c7d245ecc7e9f65e89a70638fa02399c0b353 Mon Sep 17 00:00:00 2001 From: Greg Nancarrow Date: Wed, 10 Feb 2021 14:50:40 +1100 Subject: [PATCH v18 3/4] Enable parallel INSERT and/or SELECT for "INSERT INTO ... SELECT ...", where it is safe to do so. Parallel INSERT can't be utilized in the following cases: - A parallel query plan can't be generated for the underlying SELECT, because, for example, the SELECT statement uses a parallel-unsafe function - INSERT statement uses the ON CONFLICT DO UPDATE clause - Target table is a foreign or temporary table - Target table has a foreign key, or a parallel-unsafe trigger, index expression, column default expression or check constraint - Target table is a partitioned table with a parallel-unsafe partition key expression or support function Where the above-mentioned target table features are found to be, at worst, parallel-restricted, rather than parallel-unsafe, at least parallel SELECT may be utilized for the INSERT's query plan. Some prior work (85f6b49 and 3ba59cc) has already been done to establish the necessary infrastructure to allow parallel INSERTs, in general, to be safe, except for cases where new commandIds would be generated in the parallel-worker code (such as inserts into a table having a foreign key) - these cases need to be avoided. The planner is updated with additional changes that build upon those made to support parallel SELECT for "INSERT INTO ... SELECT ...". Where Gather paths are normally generated for parallel SELECT, in the case of an "INSERT INTO ... SELECT ...", these Gather paths are now generated only if the parallel-safety level is found to be RESTRICTED (resulting in non-parallel INSERT with parallel SELECT), otherwise if the parallel-safety level is found to be parallel SAFE, then partial paths for parallel INSERT are generated before Gather paths are added (resulting in parallel INSERT+SELECT). The executor is updated for Gather and ModifyTable node processing, to handle parallel INSERT, by only starting tuple queue readers if there is a RETURNING clause, and by firing any before/after statement triggers in the leader (and preventing them from being fired in the workers). The handling of the currentCommandId is updated to set it as used in the leader prior to entering parallel-mode for parallel INSERT, and to record it as used at the start of the parallel INSERT operation in the worker. The parallel-worker framework is updated to support serialization of an INSERT planned statement, to be passed to the workers, and to support return of the number of tuples processed (INSERTed) by the workers, for the executor state. Note that this commit changes the RELATION_IS_LOCAL() macro (typically used to decide whether we can skip acquiring locks), as now a relation created in the current transaction can no longer be assumed to be accessible only to the current backend, as it may be accessible to parallel workers. Discussion: https://postgr.es/m/CAJcOf-cXnB5cnMKqWEp2E2z7Mvcd04iLVmV=qpFJrR3AcrTS3g@mail.gmail.com --- src/backend/access/heap/heapam.c | 30 ++- src/backend/access/transam/xact.c | 43 +++- src/backend/executor/execMain.c | 8 +- src/backend/executor/execParallel.c | 61 +++++- src/backend/executor/nodeGather.c | 69 ++++++- src/backend/executor/nodeModifyTable.c | 44 +++- src/backend/optimizer/path/costsize.c | 46 +++++ src/backend/optimizer/plan/createplan.c | 2 +- src/backend/optimizer/plan/planner.c | 264 ++++++++++++++++-------- src/backend/optimizer/plan/setrefs.c | 28 ++- src/backend/optimizer/util/pathnode.c | 56 ++--- src/include/access/xact.h | 3 +- src/include/executor/execParallel.h | 1 + src/include/executor/nodeModifyTable.h | 3 +- src/include/nodes/execnodes.h | 3 +- src/include/optimizer/cost.h | 1 + src/include/optimizer/pathnode.h | 3 +- src/include/utils/rel.h | 9 +- 18 files changed, 511 insertions(+), 163 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 9926e2bd54..1da8d8379c 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -52,6 +52,9 @@ #include "access/xloginsert.h" #include "access/xlogutils.h" #include "catalog/catalog.h" +#ifdef USE_ASSERT_CHECKING +#include "commands/trigger.h" +#endif #include "miscadmin.h" #include "pgstat.h" #include "port/atomics.h" @@ -2147,10 +2150,31 @@ heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, * inserts in general except for the cases where inserts generate a new * CommandId (eg. inserts into a table having a foreign key column). */ +#ifdef USE_ASSERT_CHECKING if (IsParallelWorker()) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TRANSACTION_STATE), - errmsg("cannot insert tuples in a parallel worker"))); + { + /* + * Assert that for this relation, no trigger of type RI_TRIGGER_FK + * exists, as it would indicate that the relation has a FK column, + * which would, on insert, result in creation of a new CommandId, + * and this isn't currently supported in a parallel worker. + */ + TriggerDesc *trigdesc = relation->trigdesc; + if (trigdesc != NULL) + { + int i; + + for (i = 0; i < trigdesc->numtriggers; i++) + { + int trigtype; + Trigger *trigger = &trigdesc->triggers[i]; + + trigtype = RI_FKey_trigger_type(trigger->tgfoid); + Assert(trigtype != RI_TRIGGER_FK); + } + } + } +#endif tup->t_data->t_infomask &= ~(HEAP_XACT_MASK); tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK); diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 2456a2bf10..b241ea5ea9 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -517,6 +517,20 @@ GetCurrentFullTransactionIdIfAny(void) return CurrentTransactionState->fullTransactionId; } +/* + * SetCurrentCommandIdUsedForWorker + * + * For a parallel worker, record that the currentCommandId has been used. + * This must only be called at the start of a parallel operation. + */ +void +SetCurrentCommandIdUsedForWorker(void) +{ + Assert(IsParallelWorker() && !currentCommandIdUsed && currentCommandId != InvalidCommandId); + + currentCommandIdUsed = true; +} + /* * MarkCurrentTransactionIdLoggedIfAny * @@ -765,12 +779,16 @@ GetCurrentCommandId(bool used) if (used) { /* - * Forbid setting currentCommandIdUsed in a parallel worker, because - * we have no provision for communicating this back to the leader. We - * could relax this restriction when currentCommandIdUsed was already - * true at the start of the parallel operation. + * If in a parallel worker, only allow setting currentCommandIdUsed if + * currentCommandIdUsed was already true at the start of the parallel + * operation (by way of SetCurrentCommandIdUsedForWorker()), otherwise + * forbid setting currentCommandIdUsed because we have no provision for + * communicating this back to the leader. Once currentCommandIdUsed is + * set, the commandId used by leader and workers can't be changed, + * because CommandCounterIncrement() then prevents any attempted + * increment of the current commandId. */ - Assert(!IsParallelWorker()); + Assert(!(IsParallelWorker() && !currentCommandIdUsed)); currentCommandIdUsed = true; } return currentCommandId; @@ -1021,12 +1039,25 @@ IsInParallelMode(void) * Prepare for entering parallel mode plan execution, based on command-type. */ void -PrepareParallelModePlanExec(CmdType commandType) +PrepareParallelModePlanExec(CmdType commandType, bool isParallelModifyLeader) { if (IsModifySupportedInParallelMode(commandType)) { Assert(!IsInParallelMode()); + if (isParallelModifyLeader) + { + /* + * Set currentCommandIdUsed to true, to ensure that the current + * CommandId (which will be used by the parallel workers) won't + * change during this parallel operation, as starting new + * commands in parallel-mode is not currently supported. + * See related comments in GetCurrentCommandId and + * CommandCounterIncrement. + */ + (void) GetCurrentCommandId(true); + } + /* * Prepare for entering parallel mode by assigning a * FullTransactionId, to be included in the transaction state that is diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 0648dd82ba..9510ebc83f 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -776,7 +776,8 @@ ExecCheckXactReadOnly(PlannedStmt *plannedstmt) PreventCommandIfReadOnly(CreateCommandName((Node *) plannedstmt)); } - if (plannedstmt->commandType != CMD_SELECT || plannedstmt->hasModifyingCTE) + if ((plannedstmt->commandType != CMD_SELECT && + !IsModifySupportedInParallelMode(plannedstmt->commandType)) || plannedstmt->hasModifyingCTE) PreventCommandIfParallelMode(CreateCommandName((Node *) plannedstmt)); } @@ -1513,7 +1514,10 @@ ExecutePlan(EState *estate, estate->es_use_parallel_mode = use_parallel_mode; if (use_parallel_mode) { - PrepareParallelModePlanExec(estate->es_plannedstmt->commandType); + bool isParallelModifyLeader = IsA(planstate, GatherState) && + IsA(outerPlanState(planstate), ModifyTableState); + + PrepareParallelModePlanExec(estate->es_plannedstmt->commandType, isParallelModifyLeader); EnterParallelMode(); } diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index c95d5170e4..4a667752a1 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -23,6 +23,7 @@ #include "postgres.h" +#include "access/xact.h" #include "executor/execParallel.h" #include "executor/executor.h" #include "executor/nodeAgg.h" @@ -65,6 +66,7 @@ #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008) #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009) #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A) +#define PARALLEL_KEY_PROCESSED_COUNT UINT64CONST(0xE00000000000000B) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 @@ -173,9 +175,11 @@ ExecSerializePlan(Plan *plan, EState *estate) * PlannedStmt to start the executor. */ pstmt = makeNode(PlannedStmt); - pstmt->commandType = CMD_SELECT; + Assert(estate->es_plannedstmt->commandType == CMD_SELECT || + IsModifySupportedInParallelMode(estate->es_plannedstmt->commandType)); + pstmt->commandType = IsA(plan, ModifyTable) ? castNode(ModifyTable, plan)->operation : CMD_SELECT; pstmt->queryId = UINT64CONST(0); - pstmt->hasReturning = false; + pstmt->hasReturning = estate->es_plannedstmt->hasReturning; pstmt->hasModifyingCTE = false; pstmt->canSetTag = true; pstmt->transientPlan = false; @@ -183,7 +187,7 @@ ExecSerializePlan(Plan *plan, EState *estate) pstmt->parallelModeNeeded = false; pstmt->planTree = plan; pstmt->rtable = estate->es_range_table; - pstmt->resultRelations = NIL; + pstmt->resultRelations = estate->es_plannedstmt->resultRelations; pstmt->appendRelations = NIL; /* @@ -675,6 +679,14 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); + if (IsA(planstate->plan, ModifyTable)) + { + /* Estimate space for returned "# of tuples processed" count. */ + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(uint64), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } + /* * Give parallel-aware nodes a chance to add to the estimates, and get a * count of how many PlanState nodes there are. @@ -764,6 +776,19 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, /* We don't need the TupleQueueReaders yet, though. */ pei->reader = NULL; + if (IsA(planstate->plan, ModifyTable)) + { + /* + * Allocate space for each worker's returned "# of tuples processed" + * count. + */ + pei->processed_count = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(uint64), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_PROCESSED_COUNT, pei->processed_count); + } + else + pei->processed_count = NULL; + /* * If instrumentation options were supplied, allocate space for the data. * It only gets partially initialized here; the rest happens during @@ -1152,6 +1177,15 @@ ExecParallelFinish(ParallelExecutorInfo *pei) for (i = 0; i < nworkers; i++) InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]); + /* + * Update total # of tuples processed, using counts from each worker. + */ + if (pei->processed_count != NULL) + { + for (i = 0; i < nworkers; i++) + pei->planstate->state->es_processed += pei->processed_count[i]; + } + pei->finished = true; } @@ -1379,6 +1413,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) FixedParallelExecutorState *fpes; BufferUsage *buffer_usage; WalUsage *wal_usage; + uint64 *processed_count; DestReceiver *receiver; QueryDesc *queryDesc; SharedExecutorInstrumentation *instrumentation; @@ -1400,6 +1435,16 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) true); queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options); + Assert(queryDesc->operation == CMD_SELECT || IsModifySupportedInParallelMode(queryDesc->operation)); + if (IsModifySupportedInParallelMode(queryDesc->operation)) + { + /* + * Record that the CurrentCommandId is used, at the start of the + * parallel operation. + */ + SetCurrentCommandIdUsedForWorker(); + } + /* Setting debug_query_string for individual workers */ debug_query_string = queryDesc->sourceText; @@ -1458,6 +1503,16 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber], &wal_usage[ParallelWorkerNumber]); + if (IsModifySupportedInParallelMode(queryDesc->operation)) + { + /* + * Report the # of tuples processed during execution of a parallel + * table-modification command. + */ + processed_count = shm_toc_lookup(toc, PARALLEL_KEY_PROCESSED_COUNT, false); + processed_count[ParallelWorkerNumber] = queryDesc->estate->es_processed; + } + /* Report instrumentation data if any instrumentation options are set. */ if (instrumentation != NULL) ExecParallelReportInstrumentation(queryDesc->planstate, diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index 9e1dc464cb..814c0e5c93 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -35,6 +35,7 @@ #include "executor/execdebug.h" #include "executor/execParallel.h" #include "executor/nodeGather.h" +#include "executor/nodeModifyTable.h" #include "executor/nodeSubplan.h" #include "executor/tqueue.h" #include "miscadmin.h" @@ -60,6 +61,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags) GatherState *gatherstate; Plan *outerNode; TupleDesc tupDesc; + Index varno; /* Gather node doesn't have innerPlan node. */ Assert(innerPlan(node) == NULL); @@ -104,7 +106,9 @@ ExecInitGather(Gather *node, EState *estate, int eflags) * Initialize result type and projection. */ ExecInitResultTypeTL(&gatherstate->ps); - ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR); + varno = (IsA(outerNode, ModifyTable) && castNode(ModifyTable, outerNode)->returningLists != NULL) ? + castNode(ModifyTableState, outerPlanState(gatherstate))->resultRelInfo->ri_RangeTableIndex : OUTER_VAR; + ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, varno); /* * Without projections result slot type is not trivially known, see @@ -144,9 +148,19 @@ ExecGather(PlanState *pstate) GatherState *node = castNode(GatherState, pstate); TupleTableSlot *slot; ExprContext *econtext; + ModifyTableState *nodeModifyTableState = NULL; + bool isModify = false; + bool isModifyWithReturning = false; CHECK_FOR_INTERRUPTS(); + if (IsA(outerPlanState(pstate), ModifyTableState)) + { + nodeModifyTableState = castNode(ModifyTableState, outerPlanState(pstate)); + isModify = IsModifySupportedInParallelMode(nodeModifyTableState->operation); + isModifyWithReturning = isModify && nodeModifyTableState->ps.plan->targetlist != NIL; + } + /* * Initialize the parallel context and workers on first execution. We do * this on first execution rather than during node initialization, as it @@ -178,6 +192,16 @@ ExecGather(PlanState *pstate) node->pei, gather->initParam); + if (isModify) + { + /* + * For a supported parallel table-modification command, if + * there are BEFORE STATEMENT triggers, these must be fired by + * the leader, not by the parallel workers. + */ + fireBSTriggersInLeader(nodeModifyTableState); + } + /* * Register backend workers. We might not get as many as we * requested, or indeed any at all. @@ -188,7 +212,7 @@ ExecGather(PlanState *pstate) node->nworkers_launched = pcxt->nworkers_launched; /* Set up tuple queue readers to read the results. */ - if (pcxt->nworkers_launched > 0) + if (pcxt->nworkers_launched > 0 && (!isModify || isModifyWithReturning)) { ExecParallelCreateReaders(node->pei); /* Make a working array showing the active readers */ @@ -200,7 +224,11 @@ ExecGather(PlanState *pstate) } else { - /* No workers? Then never mind. */ + /* + * No workers were launched, or this is a supported parallel + * table-modification command without a RETURNING clause - no + * readers are required. + */ node->nreaders = 0; node->reader = NULL; } @@ -208,7 +236,7 @@ ExecGather(PlanState *pstate) } /* Run plan locally if no workers or enabled and not single-copy. */ - node->need_to_scan_locally = (node->nreaders == 0) + node->need_to_scan_locally = (node->nworkers_launched <= 0) || (!gather->single_copy && parallel_leader_participation); node->initialized = true; } @@ -229,7 +257,7 @@ ExecGather(PlanState *pstate) return NULL; /* If no projection is required, we're done. */ - if (node->ps.ps_ProjInfo == NULL) + if (node->ps.ps_ProjInfo == NULL || isModifyWithReturning) return slot; /* @@ -418,14 +446,35 @@ ExecShutdownGatherWorkers(GatherState *node) void ExecShutdownGather(GatherState *node) { - ExecShutdownGatherWorkers(node); + bool isModify; - /* Now destroy the parallel context. */ - if (node->pei != NULL) + /* + * If the parallel context has already been destroyed, this function must + * have been previously called, so just return. + */ + if (node->pei == NULL) + return; + + isModify = IsA(outerPlanState(node), ModifyTableState) && + IsModifySupportedInParallelMode(castNode(ModifyTableState, outerPlanState(node))->operation); + + if (isModify) { - ExecParallelCleanup(node->pei); - node->pei = NULL; + /* + * For a supported parallel table-modification command, if there are + * AFTER STATEMENT triggers, these must be fired by the leader, not by + * the parallel workers. + */ + ModifyTableState *nodeModifyTableState = castNode(ModifyTableState, outerPlanState(node)); + + fireASTriggersInLeader(nodeModifyTableState); } + + ExecShutdownGatherWorkers(node); + + /* Now destroy the parallel context. */ + ExecParallelCleanup(node->pei); + node->pei = NULL; } /* ---------------------------------------------------------------- diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 2993ba43e3..3ff85599cb 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -39,6 +39,7 @@ #include "access/heapam.h" #include "access/htup_details.h" +#include "access/parallel.h" #include "access/tableam.h" #include "access/xact.h" #include "catalog/catalog.h" @@ -1950,6 +1951,39 @@ fireASTriggers(ModifyTableState *node) } } +/* + * Process BEFORE EACH STATEMENT triggers, in the leader + */ +void +fireBSTriggersInLeader(ModifyTableState *node) +{ + Assert(IsInParallelMode() && !IsParallelWorker()); + + if (node->fireBSTriggers) + { + fireBSTriggers(node); + node->fireBSTriggers = false; + + /* + * Disable firing of AFTER STATEMENT triggers by local plan execution + * (ModifyTable processing). These will be fired at the end of Gather + * processing. + */ + node->fireASTriggers = false; + } +} + +/* + * Process AFTER EACH STATEMENT triggers, in the leader + */ +void +fireASTriggersInLeader(ModifyTableState *node) +{ + Assert(IsInParallelMode() && !IsParallelWorker()); + + fireASTriggers(node); +} + /* * Set up the state needed for collecting transition tuples for AFTER * triggers. @@ -2298,7 +2332,11 @@ ExecModifyTable(PlanState *pstate) /* * We're done, but fire AFTER STATEMENT triggers before exiting. */ - fireASTriggers(node); + if (node->fireASTriggers) + { + fireASTriggers(node); + node->fireASTriggers = false; + } node->mt_done = true; @@ -2375,7 +2413,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) /* set up epqstate with dummy subplan data for the moment */ EvalPlanQualInit(&mtstate->mt_epqstate, estate, NULL, NIL, node->epqParam); - mtstate->fireBSTriggers = true; + /* Statement-level triggers must not be fired by parallel workers */ + mtstate->fireBSTriggers = !IsParallelWorker(); + mtstate->fireASTriggers = !IsParallelWorker(); /* * Build state for collecting transition tuples. This requires having a diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index aab06c7d21..f7c13bed7a 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -212,6 +212,52 @@ clamp_row_est(double nrows) } +/* + * cost_modifytable + * Determines and returns the cost of a ModifyTable node. + */ +void +cost_modifytable(ModifyTablePath *path) +{ + double total_size; + ListCell *lc; + + /* + * Compute cost & rowcount as sum of subpath costs & rowcounts. + * + * Currently, we don't charge anything extra for the actual table + * modification work, nor for the WITH CHECK OPTIONS or RETURNING + * expressions if any. + */ + path->path.startup_cost = 0; + path->path.total_cost = 0; + path->path.rows = 0; + total_size = 0; + foreach(lc, path->subpaths) + { + Path *subpath = (Path *) lfirst(lc); + + if (lc == list_head(path->subpaths)) /* first node? */ + path->path.startup_cost = subpath->startup_cost; + path->path.total_cost += subpath->total_cost; + if (path->returningLists != NIL) + { + path->path.rows += subpath->rows; + total_size += subpath->pathtarget->width * subpath->rows; + } + } + + /* + * Set width to the average width of the subpath outputs. XXX this is + * totally wrong: we should return an average of the RETURNING tlist + * widths. But it's what happened historically, and improving it is a + * task for another day. + */ + if (path->path.rows > 0) + total_size /= path->path.rows; + path->path.pathtarget->width = rint(total_size); +} + /* * cost_seqscan * Determines and returns the cost of scanning a relation sequentially. diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 6c8305c977..f04fa7e0b2 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -339,7 +339,7 @@ create_plan(PlannerInfo *root, Path *best_path) * top-level tlist seen at execution time. However, ModifyTable plan * nodes don't have a tlist matching the querytree targetlist. */ - if (!IsA(plan, ModifyTable)) + if (!IsA(plan, ModifyTable) && !(IsA(plan, Gather) && IsA(outerPlan(plan), ModifyTable))) apply_tlist_labeling(plan->targetlist, root->processed_tlist); /* diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index cf3604a7ea..b42e05c8b2 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -130,6 +130,9 @@ typedef struct static Node *preprocess_expression(PlannerInfo *root, Node *expr, int kind); static void preprocess_qual_conditions(PlannerInfo *root, Node *jtnode); static void inheritance_planner(PlannerInfo *root); +static Path *generate_final_rel_path(PlannerInfo *root, RelOptInfo *final_rel, + bool inheritance_update, Path *path, int64 offset_est, + int64 count_est, bool isParallelModify); static void grouping_planner(PlannerInfo *root, bool inheritance_update, double tuple_fraction); static grouping_sets_data *preprocess_grouping_sets(PlannerInfo *root); @@ -339,10 +342,11 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, * * (Note that we do allow CREATE TABLE AS, INSERT INTO...SELECT, SELECT * INTO, and CREATE MATERIALIZED VIEW to use parallel plans. However, as - * of now, only the leader backend writes into a completely new table. In - * the future, we can extend it to allow workers to write into the table. - * However, to allow parallel updates and deletes, we have to solve other - * problems, especially around combo CIDs.) + * of now, only INSERT INTO...SELECT employs workers to write into the + * table, while for the other cases only the leader backend writes into a + * completely new table. In the future, we can extend it to allow workers + * for more cases. However, to allow parallel updates and deletes, we have + * to solve other problems, especially around combo CIDs.) * * For now, we don't try to use parallel mode if we're running inside a * parallel worker. We might eventually be able to relax this @@ -1811,7 +1815,120 @@ inheritance_planner(PlannerInfo *root) returningLists, rowMarks, NULL, - assign_special_exec_param(root))); + assign_special_exec_param(root), + 0)); +} + +/* + * generate_final_rel_path + * Generate a path for the final_rel, with LockRows, Limit, and/or + * ModifyTable steps added if needed. + */ +static Path * +generate_final_rel_path(PlannerInfo *root, RelOptInfo *final_rel, + bool inheritance_update, Path *path, + int64 offset_est, int64 count_est, bool isParallelModify) +{ + Query *parse = root->parse; + + /* + * If there is a FOR [KEY] UPDATE/SHARE clause, add the LockRows node. + * (Note: we intentionally test parse->rowMarks not root->rowMarks + * here. If there are only non-locking rowmarks, they should be + * handled by the ModifyTable node instead. However, root->rowMarks + * is what goes into the LockRows node.) + */ + if (parse->rowMarks) + { + path = (Path *) create_lockrows_path(root, final_rel, path, + root->rowMarks, + assign_special_exec_param(root)); + } + + /* + * If there is a LIMIT/OFFSET clause, add the LIMIT node. + */ + if (limit_needed(parse)) + { + path = (Path *) create_limit_path(root, final_rel, path, + parse->limitOffset, + parse->limitCount, + parse->limitOption, + offset_est, count_est); + } + + /* + * If this is an INSERT/UPDATE/DELETE, and we're not being called from + * inheritance_planner, add the ModifyTable node. + */ + if (parse->commandType != CMD_SELECT && !inheritance_update) + { + Index rootRelation; + List *withCheckOptionLists; + List *returningLists; + List *rowMarks; + int parallelWorkers; + + /* + * If target is a partition root table, we need to mark the + * ModifyTable node appropriately for that. + */ + if (rt_fetch(parse->resultRelation, parse->rtable)->relkind == + RELKIND_PARTITIONED_TABLE) + rootRelation = parse->resultRelation; + else + rootRelation = 0; + + /* + * Set up the WITH CHECK OPTION and RETURNING lists-of-lists, if + * needed. + */ + if (parse->withCheckOptions) + withCheckOptionLists = list_make1(parse->withCheckOptions); + else + withCheckOptionLists = NIL; + + if (parse->returningList) + returningLists = list_make1(parse->returningList); + else + returningLists = NIL; + + /* + * If there was a FOR [KEY] UPDATE/SHARE clause, the LockRows node + * will have dealt with fetching non-locked marked rows, else we + * need to have ModifyTable do that. + */ + if (parse->rowMarks) + rowMarks = NIL; + else + rowMarks = root->rowMarks; + + /* + * For the number of workers to use for a parallel + * INSERT/UPDATE/DELETE, it seems reasonable to use the same number + * of workers as estimated for the underlying query. + */ + parallelWorkers = isParallelModify ? path->parallel_workers : 0; + + path = (Path *) + create_modifytable_path(root, final_rel, + parse->commandType, + parse->canSetTag, + parse->resultRelation, + rootRelation, + false, + list_make1_int(parse->resultRelation), + list_make1(path), + list_make1(root), + withCheckOptionLists, + returningLists, + rowMarks, + parse->onConflict, + assign_special_exec_param(root), + parallelWorkers); + } + + return path; } /*-------------------- @@ -1859,6 +1976,7 @@ grouping_planner(PlannerInfo *root, bool inheritance_update, RelOptInfo *final_rel; FinalPathExtraData extra; ListCell *lc; + bool parallel_modify_partial_path_added = false; /* Tweak caller-supplied tuple_fraction if have LIMIT/OFFSET */ if (parse->limitCount || parse->limitOffset) @@ -2299,96 +2417,33 @@ grouping_planner(PlannerInfo *root, bool inheritance_update, { Path *path = (Path *) lfirst(lc); - /* - * If there is a FOR [KEY] UPDATE/SHARE clause, add the LockRows node. - * (Note: we intentionally test parse->rowMarks not root->rowMarks - * here. If there are only non-locking rowmarks, they should be - * handled by the ModifyTable node instead. However, root->rowMarks - * is what goes into the LockRows node.) - */ - if (parse->rowMarks) - { - path = (Path *) create_lockrows_path(root, final_rel, path, - root->rowMarks, - assign_special_exec_param(root)); - } + path = generate_final_rel_path(root, final_rel, inheritance_update, path, + offset_est, count_est, false); - /* - * If there is a LIMIT/OFFSET clause, add the LIMIT node. - */ - if (limit_needed(parse)) - { - path = (Path *) create_limit_path(root, final_rel, path, - parse->limitOffset, - parse->limitCount, - parse->limitOption, - offset_est, count_est); - } + /* And shove it into final_rel */ + add_path(final_rel, path); + } + /* Consider a supported parallel table-modification command */ + if (IsModifySupportedInParallelMode(parse->commandType) && + !inheritance_update && + final_rel->consider_parallel && + parse->rowMarks == NIL) + { /* - * If this is an INSERT/UPDATE/DELETE, and we're not being called from - * inheritance_planner, add the ModifyTable node. + * Generate partial paths for the final_rel. Insert all surviving + * paths, with Limit, and/or ModifyTable steps added if needed. */ - if (parse->commandType != CMD_SELECT && !inheritance_update) + foreach(lc, current_rel->partial_pathlist) { - Index rootRelation; - List *withCheckOptionLists; - List *returningLists; - List *rowMarks; - - /* - * If target is a partition root table, we need to mark the - * ModifyTable node appropriately for that. - */ - if (rt_fetch(parse->resultRelation, parse->rtable)->relkind == - RELKIND_PARTITIONED_TABLE) - rootRelation = parse->resultRelation; - else - rootRelation = 0; - - /* - * Set up the WITH CHECK OPTION and RETURNING lists-of-lists, if - * needed. - */ - if (parse->withCheckOptions) - withCheckOptionLists = list_make1(parse->withCheckOptions); - else - withCheckOptionLists = NIL; - - if (parse->returningList) - returningLists = list_make1(parse->returningList); - else - returningLists = NIL; + Path *path = (Path *) lfirst(lc); - /* - * If there was a FOR [KEY] UPDATE/SHARE clause, the LockRows node - * will have dealt with fetching non-locked marked rows, else we - * need to have ModifyTable do that. - */ - if (parse->rowMarks) - rowMarks = NIL; - else - rowMarks = root->rowMarks; + path = generate_final_rel_path(root, final_rel, inheritance_update, path, + offset_est, count_est, true); - path = (Path *) - create_modifytable_path(root, final_rel, - parse->commandType, - parse->canSetTag, - parse->resultRelation, - rootRelation, - false, - list_make1_int(parse->resultRelation), - list_make1(path), - list_make1(root), - withCheckOptionLists, - returningLists, - rowMarks, - parse->onConflict, - assign_special_exec_param(root)); + add_partial_path(final_rel, path); + parallel_modify_partial_path_added = true; } - - /* And shove it into final_rel */ - add_path(final_rel, path); } /* @@ -2407,6 +2462,13 @@ grouping_planner(PlannerInfo *root, bool inheritance_update, } } + if (parallel_modify_partial_path_added) + { + final_rel->rows = current_rel->rows; /* ??? why hasn't this been + * set above somewhere ???? */ + generate_useful_gather_paths(root, final_rel, false); + } + extra.limit_needed = limit_needed(parse); extra.limit_tuples = limit_tuples; extra.count_est = count_est; @@ -7575,7 +7637,33 @@ apply_scanjoin_target_to_paths(PlannerInfo *root, * one of the generated paths may turn out to be the cheapest one. */ if (rel->consider_parallel && !IS_OTHER_REL(rel)) - generate_useful_gather_paths(root, rel, false); + { + if (IsModifySupportedInParallelMode(root->parse->commandType)) + { + Assert(root->glob->parallelModeOK); + if (root->glob->maxParallelHazard != PROPARALLEL_SAFE) + { + /* + * Don't allow a supported parallel table-modification + * command, because it's not safe. + */ + if (root->glob->maxParallelHazard == PROPARALLEL_RESTRICTED) + { + /* + * However, do allow any underlying query to be run by + * parallel workers. + */ + generate_useful_gather_paths(root, rel, false); + } + rel->partial_pathlist = NIL; + rel->consider_parallel = false; + } + } + else + { + generate_useful_gather_paths(root, rel, false); + } + } /* * Reassess which paths are the cheapest, now that we've potentially added diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index f7d91c67bb..48467d6297 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -257,6 +257,7 @@ set_plan_references(PlannerInfo *root, Plan *plan) PlannerGlobal *glob = root->glob; int rtoffset = list_length(glob->finalrtable); ListCell *lc; + Plan *finalPlan; /* * Add all the query's RTEs to the flattened rangetable. The live ones @@ -319,7 +320,9 @@ set_plan_references(PlannerInfo *root, Plan *plan) } /* Now fix the Plan tree */ - return set_plan_refs(root, plan, rtoffset); + finalPlan = set_plan_refs(root, plan, rtoffset); + + return finalPlan; } /* @@ -1067,6 +1070,29 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) plan->lefttree = set_plan_refs(root, plan->lefttree, rtoffset); plan->righttree = set_plan_refs(root, plan->righttree, rtoffset); + /* + * FIXME: The following code block is a bit of a hack to fix the missing + * targetlist on the Gather node, in the case of an underlying ModifyTable + * node for Parallel INSERT. The current design expects the ModifyTable + * targetlist to be set in set_plan_refs(), but the targetlist is needed + * by the parent Gather node, which is processed first. + * This issue is a consequence of the fact that, prior to Parallel INSERT + * support, ModifyTable node was always a top-level plan node. Now the + * ModifyTable node may be in the subplan of the Gather node, so the + * expected order of node processing and configuration has changed. + * Currently it is not known how to fix this issue in a more elegant way. + */ + if (nodeTag(plan) == T_Gather) + { + Plan *subplan = plan->lefttree; + + if (IsA(subplan, ModifyTable) && + castNode(ModifyTable, subplan)->returningLists != NIL) + { + plan->targetlist = subplan->targetlist; + } + } + return plan; } diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 9be0c4a6af..a47a88a249 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -3524,6 +3524,7 @@ create_lockrows_path(PlannerInfo *root, RelOptInfo *rel, * 'rowMarks' is a list of PlanRowMarks (non-locking only) * 'onconflict' is the ON CONFLICT clause, or NULL * 'epqParam' is the ID of Param for EvalPlanQual re-eval + * 'parallelWorkers' is the no. of parallel workers to use */ ModifyTablePath * create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, @@ -3534,10 +3535,10 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, List *subroots, List *withCheckOptionLists, List *returningLists, List *rowMarks, OnConflictExpr *onconflict, - int epqParam) + int epqParam, + int parallelWorkers) { ModifyTablePath *pathnode = makeNode(ModifyTablePath); - double total_size; ListCell *lc; Assert(list_length(resultRelations) == list_length(subpaths)); @@ -3554,47 +3555,22 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = false; - pathnode->path.parallel_workers = 0; - pathnode->path.pathkeys = NIL; - - /* - * Compute cost & rowcount as sum of subpath costs & rowcounts. - * - * Currently, we don't charge anything extra for the actual table - * modification work, nor for the WITH CHECK OPTIONS or RETURNING - * expressions if any. It would only be window dressing, since - * ModifyTable is always a top-level node and there is no way for the - * costs to change any higher-level planning choices. But we might want - * to make it look better sometime. - */ - pathnode->path.startup_cost = 0; - pathnode->path.total_cost = 0; - pathnode->path.rows = 0; - total_size = 0; - foreach(lc, subpaths) + pathnode->path.parallel_safe = rel->consider_parallel && parallelWorkers > 0; + if (pathnode->path.parallel_safe) { - Path *subpath = (Path *) lfirst(lc); - - if (lc == list_head(subpaths)) /* first node? */ - pathnode->path.startup_cost = subpath->startup_cost; - pathnode->path.total_cost += subpath->total_cost; - if (returningLists != NIL) + foreach(lc, subpaths) { - pathnode->path.rows += subpath->rows; - total_size += subpath->pathtarget->width * subpath->rows; + Path *sp = (Path *) lfirst(lc); + + if (!sp->parallel_safe) + { + pathnode->path.parallel_safe = false; + break; + } } } - - /* - * Set width to the average width of the subpath outputs. XXX this is - * totally wrong: we should return an average of the RETURNING tlist - * widths. But it's what happened historically, and improving it is a task - * for another day. - */ - if (pathnode->path.rows > 0) - total_size /= pathnode->path.rows; - pathnode->path.pathtarget->width = rint(total_size); + pathnode->path.parallel_workers = parallelWorkers; + pathnode->path.pathkeys = NIL; pathnode->operation = operation; pathnode->canSetTag = canSetTag; @@ -3610,6 +3586,8 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, pathnode->onconflict = onconflict; pathnode->epqParam = epqParam; + cost_modifytable(pathnode); + return pathnode; } diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 34cfaf542c..ea814e48fc 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -386,6 +386,7 @@ extern FullTransactionId GetTopFullTransactionId(void); extern FullTransactionId GetTopFullTransactionIdIfAny(void); extern FullTransactionId GetCurrentFullTransactionId(void); extern FullTransactionId GetCurrentFullTransactionIdIfAny(void); +extern void SetCurrentCommandIdUsedForWorker(void); extern void MarkCurrentTransactionIdLoggedIfAny(void); extern bool SubTransactionIsActive(SubTransactionId subxid); extern CommandId GetCurrentCommandId(bool used); @@ -466,7 +467,7 @@ extern void ParsePrepareRecord(uint8 info, xl_xact_prepare *xlrec, xl_xact_parse extern void EnterParallelMode(void); extern void ExitParallelMode(void); extern bool IsInParallelMode(void); -extern void PrepareParallelModePlanExec(CmdType commandType); +extern void PrepareParallelModePlanExec(CmdType commandType, bool isParallelModifyLeader); /* * IsModifySupportedInParallelMode diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h index 3888175a2f..072869fdda 100644 --- a/src/include/executor/execParallel.h +++ b/src/include/executor/execParallel.h @@ -27,6 +27,7 @@ typedef struct ParallelExecutorInfo ParallelContext *pcxt; /* parallel context we're using */ BufferUsage *buffer_usage; /* points to bufusage area in DSM */ WalUsage *wal_usage; /* walusage area in DSM */ + uint64 *processed_count; /* processed tuple count area in DSM */ SharedExecutorInstrumentation *instrumentation; /* optional */ struct SharedJitInstrumentation *jit_instrumentation; /* optional */ dsa_area *area; /* points to DSA area in DSM */ diff --git a/src/include/executor/nodeModifyTable.h b/src/include/executor/nodeModifyTable.h index 83e2965531..5206a4e6fd 100644 --- a/src/include/executor/nodeModifyTable.h +++ b/src/include/executor/nodeModifyTable.h @@ -22,5 +22,6 @@ extern void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo, extern ModifyTableState *ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags); extern void ExecEndModifyTable(ModifyTableState *node); extern void ExecReScanModifyTable(ModifyTableState *node); - +extern void fireBSTriggersInLeader(ModifyTableState *node); +extern void fireASTriggersInLeader(ModifyTableState *node); #endif /* NODEMODIFYTABLE_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 943931f65d..86e811f974 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1177,7 +1177,8 @@ typedef struct ModifyTableState List **mt_arowmarks; /* per-subplan ExecAuxRowMark lists */ EPQState mt_epqstate; /* for evaluating EvalPlanQual rechecks */ - bool fireBSTriggers; /* do we need to fire stmt triggers? */ + bool fireBSTriggers; /* do we need to fire before stmt triggers? */ + bool fireASTriggers; /* do we need to fire after stmt triggers? */ /* * Slot for storing tuples in the root partitioned table's rowtype during diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index ed2e4af4be..9f15fcb240 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -69,6 +69,7 @@ extern PGDLLIMPORT int constraint_exclusion; extern double index_pages_fetched(double tuples_fetched, BlockNumber pages, double index_pages, PlannerInfo *root); +extern void cost_modifytable(ModifyTablePath *path); extern void cost_seqscan(Path *path, PlannerInfo *root, RelOptInfo *baserel, ParamPathInfo *param_info); extern void cost_samplescan(Path *path, PlannerInfo *root, RelOptInfo *baserel, diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index 8dfc36a4e1..273d92f877 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -263,7 +263,8 @@ extern ModifyTablePath *create_modifytable_path(PlannerInfo *root, List *subroots, List *withCheckOptionLists, List *returningLists, List *rowMarks, OnConflictExpr *onconflict, - int epqParam); + int epqParam, + int parallel_workers); extern LimitPath *create_limit_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, Node *limitOffset, Node *limitCount, diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index 10b63982c0..2a41a00f29 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -15,6 +15,7 @@ #define REL_H #include "access/tupdesc.h" +#include "access/xact.h" #include "access/xlog.h" #include "catalog/pg_class.h" #include "catalog/pg_index.h" @@ -575,15 +576,15 @@ typedef struct ViewOptions /* * RELATION_IS_LOCAL - * If a rel is either temp or newly created in the current transaction, - * it can be assumed to be accessible only to the current backend. - * This is typically used to decide that we can skip acquiring locks. + * If a rel is temp, it can be assumed to be accessible only to the + * current backend. This is typically used to decide that we can + * skip acquiring locks. * * Beware of multiple eval of argument */ #define RELATION_IS_LOCAL(relation) \ ((relation)->rd_islocaltemp || \ - (relation)->rd_createSubid != InvalidSubTransactionId) + (!IsInParallelMode() && ((relation)->rd_createSubid != InvalidSubTransactionId))) /* * RELATION_IS_OTHER_TEMP -- 2.27.0