From db57d18f21296feba3f284773bd6b4d0de62c0eb Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Mon, 4 Jan 2021 12:09:35 +0530 Subject: [PATCH v19 1/4] Parallel Inserts in CREATE TABLE AS Allow the leader and each worker to insert the tuples in parallel if the SELECT part of the CTAS is parallelizable. The design: The main idea is to push the CTAS dest receiver down to Gather node and from there the required information will be shared to workers so that they can perform parallel insertions. Leader will also participate in insertions. After the planning, check if the upper plan node is Gather in createas.c and mark a parallelism flag in the CTAS dest receiver and push it down to Gather node. Each worker can create its own CTAS dest receiver with the information passed from the leader. Leader inserts its share of tuples if instructed to do so, and so do the workers. Each worker writes atomically its number of inserted tuples into a shared memory variable, the leader combines this with its own number of inserted tuples and shares the total with the client. 
--- src/backend/access/heap/heapam.c | 11 - src/backend/access/transam/xact.c | 28 ++- src/backend/commands/createas.c | 84 ++++++- src/backend/commands/explain.c | 47 ++++ src/backend/executor/execParallel.c | 322 ++++++++++++++++++++++++- src/backend/executor/nodeGather.c | 130 +++++++++- src/backend/executor/nodeGatherMerge.c | 4 +- src/include/access/xact.h | 1 + src/include/commands/createas.h | 16 ++ src/include/executor/execParallel.h | 42 +++- src/include/nodes/execnodes.h | 3 + 11 files changed, 640 insertions(+), 48 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 53e997cd55..3741d824bd 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -2043,17 +2043,6 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, CommandId cid, int options) { - /* - * To allow parallel inserts, we need to ensure that they are safe to be - * performed in workers. We have the infrastructure to allow parallel - * inserts in general except for the cases where inserts generate a new - * CommandId (eg. inserts into a table having a foreign key column). - */ - if (IsParallelWorker()) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TRANSACTION_STATE), - errmsg("cannot insert tuples in a parallel worker"))); - tup->t_data->t_infomask &= ~(HEAP_XACT_MASK); tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK); tup->t_data->t_infomask |= HEAP_XMAX_INVALID; diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index a2068e3fd4..750d15a572 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -764,17 +764,35 @@ GetCurrentCommandId(bool used) if (used) { /* - * Forbid setting currentCommandIdUsed in a parallel worker, because - * we have no provision for communicating this back to the leader. We - * could relax this restriction when currentCommandIdUsed was already - * true at the start of the parallel operation. 
+ * If in a parallel worker, only allow setting currentCommandIdUsed if + * currentCommandIdUsed was already true at the start of the parallel + * operation (by way of SetCurrentCommandIdUsed()), otherwise forbid + * setting currentCommandIdUsed because we have no provision for + * communicating this back to the leader. Once currentCommandIdUsed is + * set, the commandId used by leader and workers can't be changed, + * because CommandCounterIncrement() then prevents any attempted + * increment of the current commandId. */ - Assert(!IsParallelWorker()); + Assert(!(IsParallelWorker() && !currentCommandIdUsed)); currentCommandIdUsed = true; } return currentCommandId; } +/* + * SetCurrentCommandIdUsedForWorker + * + * For a parallel worker, record that the currentCommandId has been used. This + * must only be called at the start of a parallel operation. + */ +void +SetCurrentCommandIdUsedForWorker(void) +{ + Assert(IsParallelWorker() && !currentCommandIdUsed && currentCommandId != InvalidCommandId); + + currentCommandIdUsed = true; +} + /* * SetParallelStartTimestamps * diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index dce882012e..a8050a2767 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -38,6 +38,7 @@ #include "commands/prepare.h" #include "commands/tablecmds.h" #include "commands/view.h" +#include "executor/execParallel.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" @@ -51,18 +52,6 @@ #include "utils/rls.h" #include "utils/snapmgr.h" -typedef struct -{ - DestReceiver pub; /* publicly-known function pointers */ - IntoClause *into; /* target relation specification */ - /* These fields are filled by intorel_startup: */ - Relation rel; /* relation to write to */ - ObjectAddress reladdr; /* address of rel, for ExecCreateTableAs */ - CommandId output_cid; /* cmin to insert in output tuples */ - int ti_options; /* table_tuple_insert performance options */ - 
BulkInsertState bistate; /* bulk insert state */ -} DR_intorel; - /* utility functions for CTAS definition creation */ static ObjectAddress create_ctas_internal(List *attrList, IntoClause *into); static ObjectAddress create_ctas_nodata(List *tlist, IntoClause *into); @@ -294,6 +283,11 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt, } else { + ParallelInsertCTASInfo parallel_ins_info; + + parallel_ins_info.intoclause = into; + parallel_ins_info.objectid = InvalidOid; + /* * Parse analysis was done already, but we still have to run the rule * rewriter. We do not do AcquireRewriteLocks: we assume the query @@ -338,6 +332,19 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt, /* call ExecutorStart to prepare the plan for execution */ ExecutorStart(queryDesc, GetIntoRelEFlags(into)); + /* See if we can perform parallel insertions. */ + if (IsParallelInsertionAllowed(PARALLEL_INSERT_CMD_CREATE_TABLE_AS, + ¶llel_ins_info)) + { + /* + * If the SELECT part of the CTAS is parallelizable, then set the + * parallel insert state. We need plan state to be initialized by + * the executor to decide whether to allow parallel inserts or not. + */ + SetParallelInsertState(PARALLEL_INSERT_CMD_CREATE_TABLE_AS, + queryDesc); + } + /* run the plan to completion */ ExecutorRun(queryDesc, ForwardScanDirection, 0L, true); @@ -441,6 +448,9 @@ CreateIntoRelDestReceiver(IntoClause *intoClause) self->pub.rDestroy = intorel_destroy; self->pub.mydest = DestIntoRel; self->into = intoClause; + self->is_parallel = false; + self->is_parallel_worker = false; + self->object_id = InvalidOid; /* other private fields will be set during intorel_startup */ return (DestReceiver *) self; @@ -461,6 +471,35 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) ListCell *lc; int attnum; + /* + * All the necessary work such as table creation, sanity checks etc. would + * have been done by the leader. 
So, parallel workers just need to open the + * table, allocate bulk insert state, mark the command id as used, store it + * in the dest receiver and return. + */ + if (myState->is_parallel_worker) + { + /* In the worker */ + intoRelationDesc = table_open(myState->object_id, AccessExclusiveLock); + myState->rel = intoRelationDesc; + myState->reladdr = InvalidObjectAddress; + myState->ti_options = 0; + myState->bistate = GetBulkInsertState(); + + /* + * Right after the table is created in the leader, the command id is + * incremented (in create_ctas_internal()). The new command id is + * marked as used in intorel_startup(), then the parallel mode is + * entered. The command id and transaction id are serialized into + * parallel DSM, they are then available to all parallel workers. All + * the workers need to mark the command id as used before insertion. + */ + SetCurrentCommandIdUsedForWorker(); + myState->output_cid = GetCurrentCommandId(false); + + return; + } + Assert(into != NULL); /* else somebody forgot to set it */ /* This code supports both CREATE TABLE AS and CREATE MATERIALIZED VIEW */ @@ -563,6 +602,27 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) else myState->bistate = NULL; + /* If parallel inserts are to be allowed, set few extra information. */ + if (myState->is_parallel) + { + myState->object_id = intoRelationAddr.objectId; + + /* + * We don't need to skip contacting FSM while inserting tuples for + * parallel mode, while extending the relations, workers instead of + * blocking on a page while another worker is inserting, can check the + * FSM for another page that can accommodate the tuples. This results + * in major benefit for parallel inserts. + */ + myState->ti_options = 0; + + /* + * rd_createSubid is marked invalid, otherwise, the table is not + * allowed to be extended by the workers. 
+ */ + myState->rel->rd_createSubid = InvalidSubTransactionId; + } + /* * Valid smgr_targblock implies something already wrote to the relation. * This may be harmless, but this function hasn't planned for it. diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 5d7eb3574c..0ae5d8c65f 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -18,6 +18,7 @@ #include "commands/createas.h" #include "commands/defrem.h" #include "commands/prepare.h" +#include "executor/execParallel.h" #include "executor/nodeHash.h" #include "foreign/fdwapi.h" #include "jit/jit.h" @@ -572,6 +573,27 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es, /* call ExecutorStart to prepare the plan for execution */ ExecutorStart(queryDesc, eflags); + if (into) + { + ParallelInsertCTASInfo parallel_ins_info; + + parallel_ins_info.intoclause = into; + parallel_ins_info.objectid = InvalidOid; + + /* See if we can perform parallel insertions. */ + if (IsParallelInsertionAllowed(PARALLEL_INSERT_CMD_CREATE_TABLE_AS, + ¶llel_ins_info)) + { + /* + * If the SELECT part of the CTAS is parallelizable, then set the + * parallel insert state. We need plan state to be initialized by + * the executor to decide whether to allow parallel inserts or not. + */ + SetParallelInsertState(PARALLEL_INSERT_CMD_CREATE_TABLE_AS, + queryDesc); + } + } + /* Execute the plan for statistics if asked for */ if (es->analyze) { @@ -1791,6 +1813,31 @@ ExplainNode(PlanState *planstate, List *ancestors, if (gather->single_copy || es->format != EXPLAIN_FORMAT_TEXT) ExplainPropertyBool("Single Copy", gather->single_copy, es); + + /* + * Show the create table information under Gather node in case + * parallel workers have inserted the rows. 
+ */ + if (IsA(planstate, GatherState)) + { + GatherState *gstate = (GatherState *) planstate; + + if (GetParallelInsertCmdType(gstate->dest) == + PARALLEL_INSERT_CMD_CREATE_TABLE_AS && + ((DR_intorel *) gstate->dest)->into && + ((DR_intorel *) gstate->dest)->into->rel && + ((DR_intorel *) gstate->dest)->into->rel->relname) + { + es->indent--; + ExplainIndentText(es); + appendStringInfoString(es->str, "-> "); + appendStringInfoString(es->str, "Create "); + appendStringInfo(es->str, "%s\n", + ((DR_intorel *) gstate->dest)->into->rel->relname); + ExplainIndentText(es); + es->indent++; + } + } } break; case T_GatherMerge: diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index c95d5170e4..7ed3e9e3b6 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -23,6 +23,7 @@ #include "postgres.h" +#include "commands/createas.h" #include "executor/execParallel.h" #include "executor/executor.h" #include "executor/nodeAgg.h" @@ -65,6 +66,7 @@ #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008) #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009) #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A) +#define PARALLEL_KEY_INTO_CLAUSE UINT64CONST(0xE00000000000000B) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 @@ -77,6 +79,10 @@ typedef struct FixedParallelExecutorState dsa_pointer param_exec; int eflags; int jit_flags; + ParallelInsertCmdKind ins_cmd_type; /* parallel insertion command type */ + Oid objectid; /* used by workers to open relation */ + /* Number of tuples inserted by all the workers. */ + pg_atomic_uint64 processed; } FixedParallelExecutorState; /* @@ -135,10 +141,23 @@ static bool ExecParallelReInitializeDSM(PlanState *planstate, ParallelContext *pcxt); static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, SharedExecutorInstrumentation *instrumentation); - -/* Helper function that runs in the parallel worker. 
*/ +static void ParallelInsCmdEstimate(ParallelContext *pcxt, + ParallelInsertCmdKind ins_cmd, + void *ins_info); +static void SaveParallelInsCmdFixedInfo(ParallelExecutorInfo *pei, + FixedParallelExecutorState *fpes, + ParallelInsertCmdKind ins_cmd, + void *ins_info); +static void SaveParallelInsCmdInfo(ParallelContext *pcxt, + ParallelInsertCmdKind ins_cmd, + void *ins_info); + +/* Helper functions that run in the parallel worker. */ static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc); +static DestReceiver *ExecParallelGetInsReceiver(shm_toc *toc, + FixedParallelExecutorState *fpes); + /* * Create a serialized representation of the plan to be sent to each worker. */ @@ -578,7 +597,9 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize) ParallelExecutorInfo * ExecInitParallelPlan(PlanState *planstate, EState *estate, Bitmapset *sendParams, int nworkers, - int64 tuples_needed) + int64 tuples_needed, + ParallelInsertCmdKind parallel_ins_cmd, + void *parallel_ins_info) { ParallelExecutorInfo *pei; ParallelContext *pcxt; @@ -712,6 +733,10 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize); shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for parallel insertions. */ + if (parallel_ins_info) + ParallelInsCmdEstimate(pcxt, parallel_ins_cmd, parallel_ins_info); + /* Everyone's had a chance to ask for space, so now create the DSM. */ InitializeParallelDSM(pcxt); @@ -729,6 +754,20 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, fpes->param_exec = InvalidDsaPointer; fpes->eflags = estate->es_top_eflags; fpes->jit_flags = estate->es_jit_flags; + + if (parallel_ins_info) + { + /* Save parallel insertion fixed info into DSA. 
*/ + SaveParallelInsCmdFixedInfo(pei, fpes, parallel_ins_cmd, + parallel_ins_info); + } + else + { + pei->processed = NULL; + fpes->ins_cmd_type = PARALLEL_INSERT_CMD_UNDEF; + fpes->objectid = InvalidOid; + } + shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes); /* Store query string */ @@ -758,8 +797,22 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space); pei->wal_usage = walusage_space; - /* Set up the tuple queues that the workers will write into. */ - pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false); + if (parallel_ins_info) + { + /* Save parallel insertion info into DSA. */ + SaveParallelInsCmdInfo(pcxt, parallel_ins_cmd, parallel_ins_info); + + /* + * Tuple queues are not required in case of parallel insertions by the + * workers, because Gather node will not receive any tuples. + */ + pei->tqueue = NULL; + } + else + { + /* Set up the tuple queues that the workers will write into. */ + pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false); + } /* We don't need the TupleQueueReaders yet, though. */ pei->reader = NULL; @@ -1391,8 +1444,13 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) /* Get fixed-size state. */ fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false); - /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */ - receiver = ExecParallelGetReceiver(seg, toc); + /* Set up DestReceiver. */ + if (fpes->ins_cmd_type == PARALLEL_INSERT_CMD_CREATE_TABLE_AS) + receiver = ExecParallelGetInsReceiver(toc, fpes); + else + receiver = ExecParallelGetReceiver(seg, toc); + + /* Set up SharedExecutorInstrumentation, and QueryDesc. 
*/ instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true); if (instrumentation != NULL) instrument_options = instrumentation->instrument_options; @@ -1471,6 +1529,13 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) queryDesc->estate->es_jit->instr; } + /* + * Write out the number of tuples this worker has inserted. Leader will use + * it to inform the end client. + */ + if (fpes->ins_cmd_type == PARALLEL_INSERT_CMD_CREATE_TABLE_AS) + pg_atomic_add_fetch_u64(&fpes->processed, queryDesc->estate->es_processed); + /* Must do this after capturing instrumentation. */ ExecutorEnd(queryDesc); @@ -1479,3 +1544,246 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) FreeQueryDesc(queryDesc); receiver->rDestroy(receiver); } + +/* + * Estimate space required for sending parallel insert information to workers + * in commands such as CTAS. + */ +static void +ParallelInsCmdEstimate(ParallelContext *pcxt, ParallelInsertCmdKind ins_cmd, + void *ins_info) +{ + Assert(pcxt && ins_info && + (ins_cmd == PARALLEL_INSERT_CMD_CREATE_TABLE_AS)); + + if (ins_cmd == PARALLEL_INSERT_CMD_CREATE_TABLE_AS) + { + ParallelInsertCTASInfo *info = NULL; + char *intoclause_str = NULL; + int intoclause_len = 0; + + info = (ParallelInsertCTASInfo *) ins_info; + intoclause_str = nodeToString(info->intoclause); + intoclause_len = strlen(intoclause_str) + 1; + + shm_toc_estimate_chunk(&pcxt->estimator, intoclause_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } +} + +/* + * Save fixed state information required by workers for parallel inserts in + * commands such as CTAS. 
+ */ +static void +SaveParallelInsCmdFixedInfo(ParallelExecutorInfo *pei, + FixedParallelExecutorState *fpes, + ParallelInsertCmdKind ins_cmd, + void *ins_info) +{ + Assert(pei && fpes && ins_info && + (ins_cmd == PARALLEL_INSERT_CMD_CREATE_TABLE_AS)); + + pg_atomic_init_u64(&fpes->processed, 0); + fpes->ins_cmd_type = ins_cmd; + pei->processed = &fpes->processed; + + if (ins_cmd == PARALLEL_INSERT_CMD_CREATE_TABLE_AS) + { + ParallelInsertCTASInfo *info = NULL; + + info = (ParallelInsertCTASInfo *) ins_info; + fpes->objectid = info->objectid; + } +} + +/* + * Save variable state information required by workers for parallel inserts in + * commands such as CTAS. + */ +static void +SaveParallelInsCmdInfo(ParallelContext *pcxt, ParallelInsertCmdKind ins_cmd, + void *ins_info) +{ + Assert(pcxt && ins_info && + (ins_cmd == PARALLEL_INSERT_CMD_CREATE_TABLE_AS)); + + if (ins_cmd == PARALLEL_INSERT_CMD_CREATE_TABLE_AS) + { + ParallelInsertCTASInfo *info = NULL; + char *intoclause_str = NULL; + int intoclause_len; + char *intoclause_space = NULL; + + info = (ParallelInsertCTASInfo *)ins_info; + intoclause_str = nodeToString(info->intoclause); + intoclause_len = strlen(intoclause_str) + 1; + intoclause_space = shm_toc_allocate(pcxt->toc, intoclause_len); + + memcpy(intoclause_space, intoclause_str, intoclause_len); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_INTO_CLAUSE, intoclause_space); + } +} + +/* + * Create a DestReceiver to write produced tuples to target relation in case of + * parallel insertions. 
+ */ +static DestReceiver * +ExecParallelGetInsReceiver(shm_toc *toc, FixedParallelExecutorState *fpes) +{ + ParallelInsertCmdKind ins_cmd; + DestReceiver *receiver; + + Assert(fpes && toc && + (fpes->ins_cmd_type == PARALLEL_INSERT_CMD_CREATE_TABLE_AS)); + + ins_cmd = fpes->ins_cmd_type; + + if (ins_cmd == PARALLEL_INSERT_CMD_CREATE_TABLE_AS) + { + char *intoclause_str = NULL; + IntoClause *intoclause = NULL; + + intoclause_str = shm_toc_lookup(toc, PARALLEL_KEY_INTO_CLAUSE, true); + + /* + * If the worker is for parallel insert in CTAS, then use the proper + * dest receiver. + */ + intoclause = (IntoClause *) stringToNode(intoclause_str); + receiver = CreateIntoRelDestReceiver(intoclause); + + ((DR_intorel *)receiver)->is_parallel_worker = true; + ((DR_intorel *)receiver)->object_id = fpes->objectid; + } + + return receiver; +} + +/* + * Given a DestReceiver, return the command type if parallelism is allowed. + */ +ParallelInsertCmdKind +GetParallelInsertCmdType(DestReceiver *dest) +{ + if (!dest) + return PARALLEL_INSERT_CMD_UNDEF; + + if (dest->mydest == DestIntoRel && + ((DR_intorel *) dest)->is_parallel) + return PARALLEL_INSERT_CMD_CREATE_TABLE_AS; + + return PARALLEL_INSERT_CMD_UNDEF; +} + +/* + * Given a DestReceiver, allocate and fill parallel insert info structure + * corresponding to command type. + * + * Note that the memory allocated here for the info structure has to be freed + * up in caller. 
+ */ +void * +GetParallelInsertCmdInfo(DestReceiver *dest, ParallelInsertCmdKind ins_cmd) +{ + void *parallel_ins_info = NULL; + + Assert(dest && (ins_cmd == PARALLEL_INSERT_CMD_CREATE_TABLE_AS)); + + if (ins_cmd == PARALLEL_INSERT_CMD_CREATE_TABLE_AS) + { + ParallelInsertCTASInfo *ctas_info = NULL; + + ctas_info = (ParallelInsertCTASInfo *) + palloc0(sizeof(ParallelInsertCTASInfo)); + ctas_info->intoclause = ((DR_intorel *) dest)->into; + ctas_info->objectid = ((DR_intorel *) dest)->object_id; + parallel_ins_info = ctas_info; + } + + return parallel_ins_info; +} + +/* + * Check if parallel insertion is allowed in commands such as CTAS. + * + * Return true if allowed, otherwise false. + */ +bool +IsParallelInsertionAllowed(ParallelInsertCmdKind ins_cmd, void *ins_info) +{ + Assert(ins_info && (ins_cmd == PARALLEL_INSERT_CMD_CREATE_TABLE_AS)); + + /* + * For CTAS, do not allow parallel inserts if target table is temporary. As + * the temporary tables are backend local, workers can not know about them. + * + * Return false either if the into clause is NULL or if the table is + * temporary, otherwise true. + */ + if (ins_cmd == PARALLEL_INSERT_CMD_CREATE_TABLE_AS) + { + ParallelInsertCTASInfo *ctas_info = NULL; + IntoClause *into = NULL; + + ctas_info = (ParallelInsertCTASInfo *) ins_info; + into = ctas_info->intoclause; + + /* Below check may hit in case this function is called from explain.c. */ + if (!(into && IsA(into, IntoClause))) + return false; + + /* + * Currently, CTAS supports creation of normal(logged), temporary and + * unlogged tables. It does not support foreign or partition table + * creation. Hence the check for temporary table is enough here. + */ + if (into->rel && into->rel->relpersistence == RELPERSISTENCE_TEMP) + return false; + + return true; + } + + return false; +} + +/* + * Set the parallel insert state, if the upper node is Gather and it doesn't + * have any projections. 
The parallel insert state includes information such as + * a flag in the dest receiver and also a dest receiver reference in the Gather + * node so that the required information will be picked and sent to workers. + */ +void +SetParallelInsertState(ParallelInsertCmdKind ins_cmd, QueryDesc *queryDesc) +{ + GatherState *gstate; + DestReceiver *dest; + + Assert(queryDesc && (ins_cmd == PARALLEL_INSERT_CMD_CREATE_TABLE_AS)); + + gstate = (GatherState *) queryDesc->planstate; + dest = queryDesc->dest; + + /* + * Parallel insertions are not possible either if the upper node is not + * Gather or it's a Gather but it have some projections to perform. + */ + if (!IsA(gstate, GatherState) || gstate->ps.ps_ProjInfo) + return; + + /* Okay to parallelize inserts, so mark it. */ + if (ins_cmd == PARALLEL_INSERT_CMD_CREATE_TABLE_AS) + ((DR_intorel *) dest)->is_parallel = true; + + /* + * For parallelizing inserts, we must send some information so that the + * workers can build their own dest receivers. For CTAS, this info is into + * clause, object id (to open the created table). + * + * Since the required information is available in the dest receiver, store + * a reference to it in the Gather state so that it will be used in + * ExecInitParallelPlan to pick the information. 
+ */ + gstate->dest = dest; +} diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index 9e1dc464cb..1ab3e0f600 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -48,6 +48,7 @@ static TupleTableSlot *ExecGather(PlanState *pstate); static TupleTableSlot *gather_getnext(GatherState *gatherstate); static MinimalTuple gather_readnext(GatherState *gatherstate); static void ExecShutdownGatherWorkers(GatherState *node); +static void ExecParallelInsert(GatherState *node); /* ---------------------------------------------------------------- @@ -131,6 +132,72 @@ ExecInitGather(Gather *node, EState *estate, int eflags) return gatherstate; } +/* ---------------------------------------------------------------- + * ExecParallelInsert(node) + * + * Facilitates parallel inserts by parallel workers and/or + * leader for commands such as CREATE TABLE AS. + * ---------------------------------------------------------------- + */ +static void +ExecParallelInsert(GatherState *node) +{ + /* + * By now, parallel workers if launched any, would have started their work + * i.e. insertion to target relation. In case the leader is also chosen to + * participate, then finish its share before going to wait for the parallel + * workers to finish. + * + * In case if no workers were launched, allow the leader to insert all + * tuples. + */ + if (node->need_to_scan_locally || node->nworkers_launched == 0) + { + EState *estate = node->ps.state; + TupleTableSlot *outerTupleSlot; + + for(;;) + { + /* Install our DSA area while executing the plan. */ + estate->es_query_dsa = + node->pei ? 
node->pei->area : NULL; + + outerTupleSlot = ExecProcNode(node->ps.lefttree); + + estate->es_query_dsa = NULL; + + if(TupIsNull(outerTupleSlot)) + break; + + (void) node->dest->receiveSlot(outerTupleSlot, node->dest); + + node->ps.state->es_processed++; + } + + node->need_to_scan_locally = false; + } + + if (node->nworkers_launched > 0) + { + /* + * We wait here for the parallel workers to finish their work and + * accumulate the tuples they inserted and also their buffer/WAL usage. + * We do not destroy the parallel context here, it will be done in + * ExecShutdownGather at the end of the plan. Note that the + * ExecShutdownGatherWorkers call from ExecShutdownGather will be a + * no-op. + */ + ExecShutdownGatherWorkers(node); + + /* + * Add up the total tuples inserted by all workers, to the tuples + * inserted by the leader(if any). This will be shared to client. + */ + node->ps.state->es_processed += + pg_atomic_read_u64(node->pei->processed); + } +} + /* ---------------------------------------------------------------- * ExecGather(node) * @@ -157,6 +224,17 @@ ExecGather(PlanState *pstate) { EState *estate = node->ps.state; Gather *gather = (Gather *) node->ps.plan; + ParallelInsertCmdKind parallel_ins_cmd; + bool perform_parallel_ins = false; + + /* + * Get the parallel insert command type from the dest receiver which + * would have been set in SetParallelInsertState(). + */ + parallel_ins_cmd = GetParallelInsertCmdType(node->dest); + + if (parallel_ins_cmd == PARALLEL_INSERT_CMD_CREATE_TABLE_AS) + perform_parallel_ins = true; /* * Sometimes we might have to run without parallelism; but if parallel @@ -165,6 +243,15 @@ ExecGather(PlanState *pstate) if (gather->num_workers > 0 && estate->es_use_parallel_mode) { ParallelContext *pcxt; + void *parallel_ins_info = NULL; + + /* + * Take the necessary information to be passed to workers for + * parallel inserts in commands such as CTAS. 
+ */ + if (perform_parallel_ins) + parallel_ins_info = GetParallelInsertCmdInfo(node->dest, + parallel_ins_cmd); /* Initialize, or re-initialize, shared state needed by workers. */ if (!node->pei) @@ -172,7 +259,9 @@ ExecGather(PlanState *pstate) estate, gather->initParam, gather->num_workers, - node->tuples_needed); + node->tuples_needed, + parallel_ins_cmd, + parallel_ins_info); else ExecParallelReinitialize(node->ps.lefttree, node->pei, @@ -190,13 +279,22 @@ ExecGather(PlanState *pstate) /* Set up tuple queue readers to read the results. */ if (pcxt->nworkers_launched > 0) { - ExecParallelCreateReaders(node->pei); - /* Make a working array showing the active readers */ - node->nreaders = pcxt->nworkers_launched; - node->reader = (TupleQueueReader **) - palloc(node->nreaders * sizeof(TupleQueueReader *)); - memcpy(node->reader, node->pei->reader, - node->nreaders * sizeof(TupleQueueReader *)); + /* + * Do not create tuple queue readers for commands with parallel + * insertion. Because the gather node will not receive any + * tuples, the workers will insert the tuples into the target + * relation. + */ + if (!perform_parallel_ins) + { + ExecParallelCreateReaders(node->pei); + /* Make a working array showing the active readers */ + node->nreaders = pcxt->nworkers_launched; + node->reader = (TupleQueueReader **) + palloc(node->nreaders * sizeof(TupleQueueReader *)); + memcpy(node->reader, node->pei->reader, + node->nreaders * sizeof(TupleQueueReader *)); + } } else { @@ -205,12 +303,24 @@ ExecGather(PlanState *pstate) node->reader = NULL; } node->nextreader = 0; + + /* Free up the parallel insert info, if allocated. */ + if (parallel_ins_info) + pfree(parallel_ins_info); } /* Run plan locally if no workers or enabled and not single-copy. 
*/ - node->need_to_scan_locally = (node->nreaders == 0) - || (!gather->single_copy && parallel_leader_participation); + node->need_to_scan_locally = (node->nreaders == 0 && + !perform_parallel_ins) || (!gather->single_copy && + parallel_leader_participation); node->initialized = true; + + /* Perform parallel inserts for commands such as CTAS. */ + if (perform_parallel_ins) + { + ExecParallelInsert(node); + return NULL; + } } /* diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c index aa5743cebf..ea72473c8e 100644 --- a/src/backend/executor/nodeGatherMerge.c +++ b/src/backend/executor/nodeGatherMerge.c @@ -216,7 +216,9 @@ ExecGatherMerge(PlanState *pstate) estate, gm->initParam, gm->num_workers, - node->tuples_needed); + node->tuples_needed, + 0, + NULL); else ExecParallelReinitialize(node->ps.lefttree, node->pei, diff --git a/src/include/access/xact.h b/src/include/access/xact.h index f49a57b35e..4cd6f972ed 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -389,6 +389,7 @@ extern FullTransactionId GetCurrentFullTransactionIdIfAny(void); extern void MarkCurrentTransactionIdLoggedIfAny(void); extern bool SubTransactionIsActive(SubTransactionId subxid); extern CommandId GetCurrentCommandId(bool used); +extern void SetCurrentCommandIdUsedForWorker(void); extern void SetParallelStartTimestamps(TimestampTz xact_ts, TimestampTz stmt_ts); extern TimestampTz GetCurrentTransactionStartTimestamp(void); extern TimestampTz GetCurrentStatementStartTimestamp(void); diff --git a/src/include/commands/createas.h b/src/include/commands/createas.h index ad5054d116..74022aab41 100644 --- a/src/include/commands/createas.h +++ b/src/include/commands/createas.h @@ -14,12 +14,28 @@ #ifndef CREATEAS_H #define CREATEAS_H +#include "access/heapam.h" #include "catalog/objectaddress.h" #include "nodes/params.h" #include "parser/parse_node.h" #include "tcop/dest.h" #include "utils/queryenvironment.h" +typedef struct +{ + 
DestReceiver pub; /* publicly-known function pointers */ + IntoClause *into; /* target relation specification */ + /* These fields are filled by intorel_startup: */ + Relation rel; /* relation to write to */ + ObjectAddress reladdr; /* address of rel, for ExecCreateTableAs */ + CommandId output_cid; /* cmin to insert in output tuples */ + int ti_options; /* table_tuple_insert performance options */ + BulkInsertState bistate; /* bulk insert state */ + bool is_parallel; /* is parallelism to be considered? */ + bool is_parallel_worker; /* true for parallel worker */ + /* Used by parallel workers for opening the table created in the leader. */ + Oid object_id; +} DR_intorel; extern ObjectAddress ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt, ParamListInfo params, QueryEnvironment *queryEnv, diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h index 3888175a2f..689f577c08 100644 --- a/src/include/executor/execParallel.h +++ b/src/include/executor/execParallel.h @@ -14,6 +14,7 @@ #define EXECPARALLEL_H #include "access/parallel.h" +#include "executor/execdesc.h" #include "nodes/execnodes.h" #include "nodes/parsenodes.h" #include "nodes/plannodes.h" @@ -35,11 +36,42 @@ typedef struct ParallelExecutorInfo /* These two arrays have pcxt->nworkers_launched entries: */ shm_mq_handle **tqueue; /* tuple queues for worker output */ struct TupleQueueReader **reader; /* tuple reader/writer support */ + /* Number of tuples inserted by all workers. */ + volatile pg_atomic_uint64 *processed; } ParallelExecutorInfo; +/* + * List the commands here for which parallel insertions are possible. + */ +typedef enum ParallelInsertCmdKind +{ + PARALLEL_INSERT_CMD_UNDEF = 0, + PARALLEL_INSERT_CMD_CREATE_TABLE_AS +} ParallelInsertCmdKind; + +/* + * For each of the command added to ParallelInsertCmdKind, add a corresponding + * structure encompassing the information that's required to be shared across + * different functions. 
The way it works is as follows: in the caller, fill in + * the information into one of below structures based on the command kind, pass + * the command kind and a pointer to the filled in structure as a void pointer + * to required functions, say ExecInitParallelPlan. The called functions will + * use command kind to dereference the void pointer to corresponding structure. + * + * This way, the functions that are needed for parallel insertions can be + * generic, clean and extensible. + */ +typedef struct ParallelInsertCTASInfo +{ + IntoClause *intoclause; + Oid objectid; +} ParallelInsertCTASInfo; + extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate, EState *estate, Bitmapset *sendParam, int nworkers, - int64 tuples_needed); + int64 tuples_needed, + ParallelInsertCmdKind parallel_ins_cmd, + void *parallel_ins_info); extern void ExecParallelCreateReaders(ParallelExecutorInfo *pei); extern void ExecParallelFinish(ParallelExecutorInfo *pei); extern void ExecParallelCleanup(ParallelExecutorInfo *pei); @@ -47,5 +79,11 @@ extern void ExecParallelReinitialize(PlanState *planstate, ParallelExecutorInfo *pei, Bitmapset *sendParam); extern void ParallelQueryMain(dsm_segment *seg, shm_toc *toc); - +extern ParallelInsertCmdKind GetParallelInsertCmdType(DestReceiver *dest); +extern void *GetParallelInsertCmdInfo(DestReceiver *dest, + ParallelInsertCmdKind ins_cmd); +extern bool IsParallelInsertionAllowed(ParallelInsertCmdKind ins_cmd, + void *ins_info); +extern void SetParallelInsertState(ParallelInsertCmdKind ins_cmd, + QueryDesc *queryDesc); #endif /* EXECPARALLEL_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 48c3f570fa..297b3ff728 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -23,6 +23,7 @@ #include "nodes/tidbitmap.h" #include "partitioning/partdefs.h" #include "storage/condition_variable.h" +#include "tcop/dest.h" #include "utils/hsearch.h" #include "utils/queryenvironment.h" 
#include "utils/reltrigger.h" @@ -2326,6 +2327,8 @@ typedef struct GatherState int nreaders; /* number of still-active workers */ int nextreader; /* next one to try to read from */ struct TupleQueueReader **reader; /* array with nreaders active entries */ + /* Dest receiver is stored when parallel inserts is allowed in CTAS. */ + DestReceiver *dest; } GatherState; /* ---------------- -- 2.25.1