Re: BUG #16040: PL/PGSQL RETURN QUERY statement never uses a parallel plan - Mailing list pgsql-bugs
From | Tom Lane |
---|---|
Subject | Re: BUG #16040: PL/PGSQL RETURN QUERY statement never uses a parallel plan |
Date | |
Msg-id | 1741.1584847383@sss.pgh.pa.us |
In response to | BUG #16040: PL/PGSQL RETURN QUERY statement never uses a parallel plan (PG Bug reporting form <noreply@postgresql.org>) |
Responses | Re: BUG #16040: PL/PGSQL RETURN QUERY statement never uses a parallel plan |
List | pgsql-bugs |
PG Bug reporting form <noreply@postgresql.org> writes:
> [ $SUBJECT ]

I got around to looking at this today, and what I find is that the problem is that exec_stmt_return_query() uses a portal (i.e. a cursor) to read the results of the query. That seemed like a good idea, back in the late bronze age, because it allowed plpgsql to fetch the query results a few rows at a time and not risk blowing out memory with a huge SPI result. However, the parallel-query infrastructure refuses to parallelize when the query is being read via a cursor.

I think that the latter restriction is probably sane, because we don't want to suspend execution of a parallel query while we've got worker processes waiting. And there might be some implementation restrictions lurking under it too --- that's not a part of the code I know in any detail.

However, there's no fundamental reason why exec_stmt_return_query has to use a cursor. It's going to run the query to completion immediately anyway, and shove all the result rows into a tuplestore. What we lack is a way to get the SPI query to pass its results directly to a tuplestore, without the SPITupleTable intermediary. (Note that the tuplestore can spill a large result to disk, whereas SPITupleTable can't do that.)

So, attached is a draft patch to enable that. By getting rid of the intermediate SPITupleTable, this should improve the performance of RETURN QUERY somewhat even without considering the possibility of parallelizing the source query. I've not tried to measure that though. I've also not looked for other places that could use this new infrastructure, but there may well be some.

One thing I'm not totally pleased about with this is adding another SPI interface routine using the old parameter-values API (that is, null flags as char ' '/'n'). That was the path of least resistance given the other moving parts in pl_exec.c and spi.c, but maybe we should try to modernize that before we set it in stone.
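As an aside on the old parameter-values API mentioned above: SPI routines such as SPI_execute_plan() accept a Nulls array in which ' ' marks a non-null parameter and 'n' marks a null one. The standalone C sketch below is not part of the patch and does not call PostgreSQL; make_null_flags is a hypothetical helper invented here only to illustrate that convention.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * make_null_flags is a hypothetical helper (not a PostgreSQL function).
 * It fills flags_out with one character per parameter, following the
 * convention of the old SPI parameter API: 'n' for a null value and
 * ' ' for a non-null one.  flags_out must have room for nargs + 1 chars.
 */
static void
make_null_flags(const bool *isnull, size_t nargs, char *flags_out)
{
    assert(flags_out != NULL);
    assert(nargs == 0 || isnull != NULL);

    for (size_t i = 0; i < nargs; i++)
        flags_out[i] = isnull[i] ? 'n' : ' ';

    /* Terminator added for convenience only; it is an assumption of this sketch. */
    flags_out[nargs] = '\0';
}
```

Given three parameters of which only the second is null, the helper yields the flags ' ', 'n', ' ', which is the shape the Nulls argument of the new SPI_execute_with_receiver() would also take in this draft.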
Another thing standing between this patch and committability is suitable additions to the SPI documentation. But I saw no value in writing that before the previous point is settled.

I will go add this to the next commitfest (for v14), but I wonder if we should try to squeeze it into v13? This isn't the only complaint we've gotten about non-parallelizability of RETURN QUERY.

regards, tom lane

diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c
index 40be506..a23add6 100644
--- a/src/backend/commands/portalcmds.c
+++ b/src/backend/commands/portalcmds.c
@@ -383,7 +383,9 @@ PersistHoldablePortal(Portal portal)
 		SetTuplestoreDestReceiverParams(queryDesc->dest,
 										portal->holdStore,
 										portal->holdContext,
-										true);
+										true,
+										NULL,
+										NULL);
 
 		/* Fetch the result set into the tuplestore */
 		ExecutorRun(queryDesc, ForwardScanDirection, 0L, false);
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index b108168..f2b698f 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -60,7 +60,8 @@ static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan);
 
 static int	_SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 							  Snapshot snapshot, Snapshot crosscheck_snapshot,
-							  bool read_only, bool fire_triggers, uint64 tcount);
+							  bool read_only, bool fire_triggers, uint64 tcount,
+							  DestReceiver *caller_dest);
 
 static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
 										 Datum *Values, const char *Nulls);
@@ -513,7 +514,7 @@ SPI_execute(const char *src, bool read_only, long tcount)
 
 	res = _SPI_execute_plan(&plan, NULL,
 							InvalidSnapshot, InvalidSnapshot,
-							read_only, true, tcount);
+							read_only, true, tcount, NULL);
 
 	_SPI_end_call(true);
 	return res;
@@ -547,7 +548,7 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
 							_SPI_convert_params(plan->nargs, plan->argtypes,
 												Values, Nulls),
 							InvalidSnapshot, InvalidSnapshot,
-							read_only, true, tcount);
+							read_only, true, tcount, NULL);
 
 	_SPI_end_call(true);
 	return res;
@@ -576,7 +577,34 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,
 
 	res = _SPI_execute_plan(plan, params,
 							InvalidSnapshot, InvalidSnapshot,
-							read_only, true, tcount);
+							read_only, true, tcount, NULL);
+
+	_SPI_end_call(true);
+	return res;
+}
+
+/*
+ * Execute a previously prepared plan, sending result tuples to the
+ * caller-supplied DestReceiver rather than the usual SPI output arrangements.
+ */
+int
+SPI_execute_plan_with_receiver(SPIPlanPtr plan,
+							   ParamListInfo params,
+							   bool read_only, long tcount,
+							   DestReceiver *dest)
+{
+	int			res;
+
+	if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
+		return SPI_ERROR_ARGUMENT;
+
+	res = _SPI_begin_call(true);
+	if (res < 0)
+		return res;
+
+	res = _SPI_execute_plan(plan, params,
+							InvalidSnapshot, InvalidSnapshot,
+							read_only, true, tcount, dest);
 
 	_SPI_end_call(true);
 	return res;
@@ -617,7 +645,7 @@ SPI_execute_snapshot(SPIPlanPtr plan,
 							_SPI_convert_params(plan->nargs, plan->argtypes,
 												Values, Nulls),
 							snapshot, crosscheck_snapshot,
-							read_only, fire_triggers, tcount);
+							read_only, fire_triggers, tcount, NULL);
 
 	_SPI_end_call(true);
 	return res;
@@ -664,7 +692,56 @@ SPI_execute_with_args(const char *src,
 
 	res = _SPI_execute_plan(&plan, paramLI,
 							InvalidSnapshot, InvalidSnapshot,
-							read_only, true, tcount);
+							read_only, true, tcount, NULL);
+
+	_SPI_end_call(true);
+	return res;
+}
+
+/*
+ * SPI_execute_with_receiver -- plan and execute a query with arguments
+ *
+ * This is the same as SPI_execute_with_args except that we send result tuples
+ * to the caller-supplied DestReceiver rather than the usual SPI output
+ * arrangements.
+ */
+int
+SPI_execute_with_receiver(const char *src,
+						  int nargs, Oid *argtypes,
+						  Datum *Values, const char *Nulls,
+						  bool read_only, long tcount,
+						  DestReceiver *dest)
+{
+	int			res;
+	_SPI_plan	plan;
+	ParamListInfo paramLI;
+
+	if (src == NULL || nargs < 0 || tcount < 0)
+		return SPI_ERROR_ARGUMENT;
+
+	if (nargs > 0 && (argtypes == NULL || Values == NULL))
+		return SPI_ERROR_PARAM;
+
+	res = _SPI_begin_call(true);
+	if (res < 0)
+		return res;
+
+	memset(&plan, 0, sizeof(_SPI_plan));
+	plan.magic = _SPI_PLAN_MAGIC;
+	plan.cursor_options = CURSOR_OPT_PARALLEL_OK;
+	plan.nargs = nargs;
+	plan.argtypes = argtypes;
+	plan.parserSetup = NULL;
+	plan.parserSetupArg = NULL;
+
+	paramLI = _SPI_convert_params(nargs, argtypes,
+								  Values, Nulls);
+
+	_SPI_prepare_oneshot_plan(src, &plan);
+
+	res = _SPI_execute_plan(&plan, paramLI,
+							InvalidSnapshot, InvalidSnapshot,
+							read_only, true, tcount, dest);
 
 	_SPI_end_call(true);
 	return res;
@@ -2090,11 +2167,13 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan)
  * fire_triggers: true to fire AFTER triggers at end of query (normal case);
  *		false means any AFTER triggers are postponed to end of outer query
  * tcount: execution tuple-count limit, or 0 for none
+ * caller_dest: DestReceiver to receive output, or NULL for normal SPI output
  */
 static int
 _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 				  Snapshot snapshot, Snapshot crosscheck_snapshot,
-				  bool read_only, bool fire_triggers, uint64 tcount)
+				  bool read_only, bool fire_triggers, uint64 tcount,
+				  DestReceiver *caller_dest)
 {
 	int			my_res = 0;
 	uint64		my_processed = 0;
@@ -2228,6 +2307,12 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 			bool		canSetTag = stmt->canSetTag;
 			DestReceiver *dest;
 
+			/*
+			 * Reset output state.  (Note that if a non-SPI receiver is used,
+			 * _SPI_current->processed will stay zero, and that's what we'll
+			 * report to the caller.  It's the receiver's job to count tuples
+			 * in that case.)
+			 */
 			_SPI_current->processed = 0;
 			_SPI_current->tuptable = NULL;
 
@@ -2267,7 +2352,16 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 				UpdateActiveSnapshotCommandId();
 			}
 
-			dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
+			/*
+			 * Select appropriate tuple receiver.  Output from non-canSetTag
+			 * subqueries always goes to the bit bucket.
+			 */
+			if (!canSetTag)
+				dest = CreateDestReceiver(DestNone);
+			else if (caller_dest)
+				dest = caller_dest;
+			else
+				dest = CreateDestReceiver(DestSPI);
 
 			if (stmt->utilityStmt == NULL)
 			{
@@ -2373,7 +2467,13 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 				SPI_freetuptable(_SPI_current->tuptable);
 				_SPI_current->tuptable = NULL;
 			}
-			/* we know that the receiver doesn't need a destroy call */
+
+			/*
+			 * We don't issue a destroy call to the receiver.  The SPI and
+			 * None receivers would ignore it anyway, while if the caller
+			 * supplied a receiver, it's not our job to destroy it.
+			 */
+
 			if (res < 0)
 			{
 				my_res = res;
@@ -2465,7 +2565,7 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
 	switch (operation)
 	{
 		case CMD_SELECT:
-			if (queryDesc->dest->mydest != DestSPI)
+			if (queryDesc->dest->mydest == DestNone)
 			{
 				/* Don't return SPI_OK_SELECT if we're discarding result */
 				res = SPI_OK_UTILITY;
diff --git a/src/backend/executor/tstoreReceiver.c b/src/backend/executor/tstoreReceiver.c
index 6c2dfbc..e8172be 100644
--- a/src/backend/executor/tstoreReceiver.c
+++ b/src/backend/executor/tstoreReceiver.c
@@ -8,6 +8,8 @@
  *	  toasted values.  This is to support cursors WITH HOLD, which must retain
  *	  data even if the underlying table is dropped.
  *
+ *	  Also optionally, we can apply a tuple conversion map before storing.
+ *
  *
  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -21,6 +23,7 @@
 #include "postgres.h"
 
 #include "access/detoast.h"
+#include "access/tupconvert.h"
 #include "executor/tstoreReceiver.h"
 
 
@@ -31,14 +34,19 @@ typedef struct
 	Tuplestorestate *tstore;	/* where to put the data */
 	MemoryContext cxt;			/* context containing tstore */
 	bool		detoast;		/* were we told to detoast? */
+	TupleDesc	target_tupdesc; /* target tupdesc, or NULL if none */
+	const char *map_failure_msg;	/* tupdesc mapping failure message */
 	/* workspace: */
 	Datum	   *outvalues;		/* values array for result tuple */
 	Datum	   *tofree;			/* temp values to be pfree'd */
+	TupleConversionMap *tupmap; /* conversion map, if needed */
+	TupleTableSlot *mapslot;	/* slot for mapped tuples */
 } TStoreState;
 
 
 static bool tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self);
 static bool tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self);
+static bool tstoreReceiveSlot_tupmap(TupleTableSlot *slot, DestReceiver *self);
 
 
 /*
@@ -69,27 +77,46 @@ tstoreStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
 		}
 	}
 
+	/* Check if tuple conversion is needed */
+	if (myState->target_tupdesc)
+		myState->tupmap = convert_tuples_by_position(typeinfo,
+													 myState->target_tupdesc,
+													 myState->map_failure_msg);
+	else
+		myState->tupmap = NULL;
+
 	/* Set up appropriate callback */
 	if (needtoast)
 	{
+		Assert(!myState->tupmap);
 		myState->pub.receiveSlot = tstoreReceiveSlot_detoast;
 		/* Create workspace */
 		myState->outvalues = (Datum *)
 			MemoryContextAlloc(myState->cxt, natts * sizeof(Datum));
 		myState->tofree = (Datum *)
 			MemoryContextAlloc(myState->cxt, natts * sizeof(Datum));
+		myState->mapslot = NULL;
+	}
+	else if (myState->tupmap)
+	{
+		myState->pub.receiveSlot = tstoreReceiveSlot_tupmap;
+		myState->outvalues = NULL;
+		myState->tofree = NULL;
+		myState->mapslot = MakeSingleTupleTableSlot(myState->target_tupdesc,
+													&TTSOpsVirtual);
 	}
 	else
 	{
 		myState->pub.receiveSlot = tstoreReceiveSlot_notoast;
 		myState->outvalues = NULL;
 		myState->tofree = NULL;
+		myState->mapslot = NULL;
 	}
 }
 
 /*
  * Receive a tuple from the executor and store it in the tuplestore.
- * This is for the easy case where we don't have to detoast.
+ * This is for the easy case where we don't have to detoast nor map anything.
  */
 static bool
 tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self)
@@ -158,6 +185,21 @@ tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self)
 }
 
 /*
+ * Receive a tuple from the executor and store it in the tuplestore.
+ * This is for the case where we must apply a tuple conversion map.
+ */
+static bool
+tstoreReceiveSlot_tupmap(TupleTableSlot *slot, DestReceiver *self)
+{
+	TStoreState *myState = (TStoreState *) self;
+
+	execute_attr_map_slot(myState->tupmap->attrMap, slot, myState->mapslot);
+	tuplestore_puttupleslot(myState->tstore, myState->mapslot);
+
+	return true;
+}
+
+/*
  * Clean up at end of an executor run
  */
 static void
@@ -172,6 +214,12 @@ tstoreShutdownReceiver(DestReceiver *self)
 	if (myState->tofree)
 		pfree(myState->tofree);
 	myState->tofree = NULL;
+	if (myState->tupmap)
+		free_conversion_map(myState->tupmap);
+	myState->tupmap = NULL;
+	if (myState->mapslot)
+		ExecDropSingleTupleTableSlot(myState->mapslot);
+	myState->mapslot = NULL;
 }
 
 /*
@@ -204,17 +252,32 @@ CreateTuplestoreDestReceiver(void)
 
 /*
  * Set parameters for a TuplestoreDestReceiver
+ *
+ * tStore: where to store the tuples
+ * tContext: memory context containing tStore
+ * detoast: forcibly detoast contained data?
+ * target_tupdesc: if not NULL, forcibly convert tuples to this rowtype
+ * map_failure_msg: error message to use if mapping to target_tupdesc fails
+ *
+ * We don't currently support both detoast and target_tupdesc at the same
+ * time, just because no existing caller needs that combination.
  */
 void
 SetTuplestoreDestReceiverParams(DestReceiver *self,
 								Tuplestorestate *tStore,
 								MemoryContext tContext,
-								bool detoast)
+								bool detoast,
+								TupleDesc target_tupdesc,
+								const char *map_failure_msg)
 {
 	TStoreState *myState = (TStoreState *) self;
 
+	Assert(!(detoast && target_tupdesc));
+
 	Assert(myState->pub.mydest == DestTuplestore);
 	myState->tstore = tStore;
 	myState->cxt = tContext;
 	myState->detoast = detoast;
+	myState->target_tupdesc = target_tupdesc;
+	myState->map_failure_msg = map_failure_msg;
 }
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 5781fb2..96ea74f 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -996,7 +996,9 @@ FillPortalStore(Portal portal, bool isTopLevel)
 	SetTuplestoreDestReceiverParams(treceiver,
 									portal->holdStore,
 									portal->holdContext,
-									false);
+									false,
+									NULL,
+									NULL);
 
 	switch (portal->strategy)
 	{
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index 06de20a..9be0c68 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -90,6 +90,10 @@ extern int	SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
 extern int	SPI_execute_plan_with_paramlist(SPIPlanPtr plan,
 											ParamListInfo params,
 											bool read_only, long tcount);
+extern int	SPI_execute_plan_with_receiver(SPIPlanPtr plan,
+										   ParamListInfo params,
+										   bool read_only, long tcount,
+										   DestReceiver *dest);
 extern int	SPI_exec(const char *src, long tcount);
 extern int	SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
 					  long tcount);
@@ -102,6 +106,11 @@ extern int	SPI_execute_with_args(const char *src,
 								  int nargs, Oid *argtypes,
 								  Datum *Values, const char *Nulls,
 								  bool read_only, long tcount);
+extern int	SPI_execute_with_receiver(const char *src,
+									  int nargs, Oid *argtypes,
+									  Datum *Values, const char *Nulls,
+									  bool read_only, long tcount,
+									  DestReceiver *dest);
 extern SPIPlanPtr SPI_prepare(const char *src, int nargs, Oid *argtypes);
 extern SPIPlanPtr SPI_prepare_cursor(const char *src, int nargs, Oid *argtypes,
 									 int cursorOptions);
diff --git a/src/include/executor/tstoreReceiver.h b/src/include/executor/tstoreReceiver.h
index b2390c4..e9461cf 100644
--- a/src/include/executor/tstoreReceiver.h
+++ b/src/include/executor/tstoreReceiver.h
@@ -24,6 +24,8 @@ extern DestReceiver *CreateTuplestoreDestReceiver(void);
 extern void SetTuplestoreDestReceiverParams(DestReceiver *self,
 											Tuplestorestate *tStore,
 											MemoryContext tContext,
-											bool detoast);
+											bool detoast,
+											TupleDesc target_tupdesc,
+											const char *map_failure_msg);
 
 #endif							/* TSTORE_RECEIVER_H */
diff --git a/src/pl/plpgsql/src/pl_exec.c b/src/pl/plpgsql/src/pl_exec.c
index d3ad4fa..b5563ee 100644
--- a/src/pl/plpgsql/src/pl_exec.c
+++ b/src/pl/plpgsql/src/pl_exec.c
@@ -27,6 +27,7 @@
 #include "executor/execExpr.h"
 #include "executor/spi.h"
 #include "executor/spi_priv.h"
+#include "executor/tstoreReceiver.h"
 #include "funcapi.h"
 #include "mb/stringinfo_mb.h"
 #include "miscadmin.h"
@@ -3491,9 +3492,11 @@ static int
 exec_stmt_return_query(PLpgSQL_execstate *estate,
 					   PLpgSQL_stmt_return_query *stmt)
 {
-	Portal		portal;
-	uint64		processed = 0;
-	TupleConversionMap *tupmap;
+	int64		tcount;
+	DestReceiver *treceiver;
+	int			rc;
+	uint64		processed;
+	MemoryContext stmt_mcontext = get_stmt_mcontext(estate);
 	MemoryContext oldcontext;
 
 	if (!estate->retisset)
@@ -3503,60 +3506,115 @@ exec_stmt_return_query(PLpgSQL_execstate *estate,
 
 	if (estate->tuple_store == NULL)
 		exec_init_tuple_store(estate);
+	/* There might be some tuples in the tuplestore already */
+	tcount = tuplestore_tuple_count(estate->tuple_store);
+
+	/*
+	 * Set up DestReceiver to transfer results directly to tuplestore,
+	 * converting rowtype if necessary.  DestReceiver lives in mcontext.
+	 */
+	oldcontext = MemoryContextSwitchTo(stmt_mcontext);
+	treceiver = CreateDestReceiver(DestTuplestore);
+	SetTuplestoreDestReceiverParams(treceiver,
+									estate->tuple_store,
+									estate->tuple_store_cxt,
+									false,
+									estate->tuple_store_desc,
+									gettext_noop("structure of query does not match function result type"));
+	MemoryContextSwitchTo(oldcontext);
 
 	if (stmt->query != NULL)
 	{
 		/* static query */
-		exec_run_select(estate, stmt->query, 0, &portal);
+		PLpgSQL_expr *expr = stmt->query;
+		ParamListInfo paramLI;
+
+		/*
+		 * On the first call for this expression generate the plan.
+		 */
+		if (expr->plan == NULL)
+			exec_prepare_plan(estate, expr, CURSOR_OPT_PARALLEL_OK, true);
+
+		/*
+		 * Set up ParamListInfo to pass to executor
+		 */
+		paramLI = setup_param_list(estate, expr);
+
+		/*
+		 * Execute the query
+		 */
+		rc = SPI_execute_plan_with_receiver(expr->plan, paramLI,
+											estate->readonly_func, 0,
+											treceiver);
+		if (rc != SPI_OK_SELECT)
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("query \"%s\" is not a SELECT", expr->query)));
 	}
 	else
 	{
 		/* RETURN QUERY EXECUTE */
-		Assert(stmt->dynquery != NULL);
-		portal = exec_dynquery_with_params(estate, stmt->dynquery,
-										   stmt->params, NULL,
-										   0);
-	}
+		Datum		query;
+		bool		isnull;
+		Oid			restype;
+		int32		restypmod;
+		char	   *querystr;
 
-	/* Use eval_mcontext for tuple conversion work */
-	oldcontext = MemoryContextSwitchTo(get_eval_mcontext(estate));
+		/*
+		 * Evaluate the string expression after the EXECUTE keyword. Its
+		 * result is the querystring we have to execute.
+		 */
+		Assert(stmt->dynquery != NULL);
+		query = exec_eval_expr(estate, stmt->dynquery,
+							   &isnull, &restype, &restypmod);
+		if (isnull)
+			ereport(ERROR,
+					(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+					 errmsg("query string argument of EXECUTE is null")));
 
-	tupmap = convert_tuples_by_position(portal->tupDesc,
-										estate->tuple_store_desc,
-										gettext_noop("structure of query does not match function result type"));
+		/* Get the C-String representation */
+		querystr = convert_value_to_string(estate, query, restype);
 
-	while (true)
-	{
-		uint64		i;
+		/* copy it into the stmt_mcontext before we clean up */
+		querystr = MemoryContextStrdup(stmt_mcontext, querystr);
 
-		SPI_cursor_fetch(portal, true, 50);
+		exec_eval_cleanup(estate);
 
-		/* SPI will have changed CurrentMemoryContext */
-		MemoryContextSwitchTo(get_eval_mcontext(estate));
+		/* Execute query, passing params if necessary */
+		if (stmt->params)
+		{
+			PreparedParamsData *ppd;
 
-		if (SPI_processed == 0)
-			break;
+			ppd = exec_eval_using_params(estate, stmt->params);
 
-		for (i = 0; i < SPI_processed; i++)
+			rc = SPI_execute_with_receiver(querystr,
+										   ppd->nargs, ppd->types,
+										   ppd->values, ppd->nulls,
+										   estate->readonly_func,
+										   0,
+										   treceiver);
+		}
+		else
 		{
-			HeapTuple	tuple = SPI_tuptable->vals[i];
-
-			if (tupmap)
-				tuple = execute_attr_map_tuple(tuple, tupmap);
-			tuplestore_puttuple(estate->tuple_store, tuple);
-			if (tupmap)
-				heap_freetuple(tuple);
-			processed++;
+			rc = SPI_execute_with_receiver(querystr,
+										   0, NULL, NULL, NULL,
+										   estate->readonly_func,
+										   0,
+										   treceiver);
 		}
 
-		SPI_freetuptable(SPI_tuptable);
+		if (rc < 0)
+			elog(ERROR, "SPI_execute_with_receiver failed executing query \"%s\": %s",
+				 querystr, SPI_result_code_string(rc));
 	}
 
-	SPI_freetuptable(SPI_tuptable);
-	SPI_cursor_close(portal);
-
-	MemoryContextSwitchTo(oldcontext);
+	/* Clean up */
+	treceiver->rDestroy(treceiver);
 	exec_eval_cleanup(estate);
+	MemoryContextReset(stmt_mcontext);
+
+	/* Count how many tuples we got */
+	processed = tuplestore_tuple_count(estate->tuple_store) - tcount;
 
 	estate->eval_processed = processed;
 	exec_set_found(estate, processed != 0);
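The receiver selection that the patch adds to _SPI_execute_plan() can be modelled outside PostgreSQL. The sketch below is a toy under stated assumptions: ToyReceiver, toy_select_receiver and toy_run are invented names, plain integers stand in for tuples, and none of it is PostgreSQL API. It only mirrors the three-way choice in the diff (discard when canSetTag is false, otherwise prefer a caller-supplied receiver, otherwise fall back to the SPI one) and the rule that a caller-supplied receiver does its own counting.

```c
#include <assert.h>
#include <stddef.h>

/* Toy stand-ins for DestNone, DestSPI and a caller-supplied receiver. */
typedef enum { TOY_DEST_NONE, TOY_DEST_SPI, TOY_DEST_CALLER } ToyDestKind;

typedef struct ToyReceiver
{
    ToyDestKind kind;
    void      (*receive)(struct ToyReceiver *self, int row);
    long        count;      /* rows seen; each receiver counts for itself */
} ToyReceiver;

/* Discards every row, like output sent to the bit bucket. */
static void
toy_discard(ToyReceiver *self, int row)
{
    (void) self;
    (void) row;
}

/* Counts every row it is handed. */
static void
toy_count(ToyReceiver *self, int row)
{
    (void) row;
    self->count++;
}

static ToyReceiver toy_none = {TOY_DEST_NONE, toy_discard, 0};
static ToyReceiver toy_spi = {TOY_DEST_SPI, toy_count, 0};

/* Same three-way choice as the patched code, on toy types. */
static ToyReceiver *
toy_select_receiver(int can_set_tag, ToyReceiver *caller_dest)
{
    if (!can_set_tag)
        return &toy_none;
    else if (caller_dest)
        return caller_dest;
    else
        return &toy_spi;
}

/* Push each row straight into the chosen receiver, with no intermediate table. */
static void
toy_run(ToyReceiver *dest, const int *rows, size_t nrows)
{
    assert(dest != NULL);
    for (size_t i = 0; i < nrows; i++)
        dest->receive(dest, rows[i]);
}
```

In this model, passing a caller-owned receiver with can_set_tag set routes all rows to it and leaves the toy SPI receiver's count untouched, which is the analogue of _SPI_current->processed staying zero while the tuplestore receiver holds the rows.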