Re: BUG #16040: PL/PGSQL RETURN QUERY statement never uses a parallel plan - Mailing list pgsql-bugs

PG Bug reporting form <noreply@postgresql.org> writes:
> [ $SUBJECT ]

I got around to looking at this today.  The problem is that
exec_stmt_return_query() uses a portal (i.e. a cursor)
to read the results of the query.  That seemed like a good idea, back
in the late bronze age, because it allowed plpgsql to fetch the query
results a few rows at a time and not risk blowing out memory with a huge
SPI result.  However, the parallel-query infrastructure refuses to
parallelize when the query is being read via a cursor.

I think that restriction is probably sane, because we don't
want to suspend execution of a parallel query while we've got worker
processes waiting.  And there might be some implementation restrictions
lurking under it too --- that's not a part of the code I know in any
detail.

However, there's no fundamental reason why exec_stmt_return_query has
to use a cursor.  It's going to run the query to completion immediately
anyway, and shove all the result rows into a tuplestore.  What we lack
is a way to get the SPI query to pass its results directly to a
tuplestore, without the SPITupleTable intermediary.  (Note that the
tuplestore can spill a large result to disk, whereas SPITupleTable
can't do that.)
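
To make the shape of that idea concrete, here is a rough standalone sketch.
It is a toy model only, not the PostgreSQL API: ToyReceiver, ToyStore and
toy_run_query are hypothetical names standing in for a DestReceiver, a
tuplestore and the executor.  The producer hands each row straight to a
caller-supplied callback, so no intermediate result table is ever built,
and the caller counts rows by looking at its own store.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a DestReceiver: a per-row callback. */
typedef struct ToyReceiver
{
    /* Called once per row; returns 0 to ask the producer to stop. */
    int         (*receive_row) (struct ToyReceiver *self, int row);
} ToyReceiver;

/* Hypothetical stand-in for a tuplestore: a bounded array plus a count. */
typedef struct ToyStore
{
    ToyReceiver pub;            /* must be first, so casts work both ways */
    int         rows[16];
    size_t      nrows;
} ToyStore;

/* Receiver callback: append the row to the store, or refuse if it is full. */
static int
toy_store_receive(ToyReceiver *self, int row)
{
    ToyStore   *store = (ToyStore *) self;

    if (store->nrows >= sizeof(store->rows) / sizeof(store->rows[0]))
        return 0;
    store->rows[store->nrows++] = row;
    return 1;
}

static void
toy_store_init(ToyStore *store)
{
    store->pub.receive_row = toy_store_receive;
    store->nrows = 0;
}

/*
 * Toy "executor": produces the values lo .. hi-1 and passes each one
 * directly to the receiver.  Returns how many rows the receiver accepted.
 */
static size_t
toy_run_query(int lo, int hi, ToyReceiver *dest)
{
    size_t      sent = 0;

    assert(dest != NULL && dest->receive_row != NULL);
    for (int v = lo; v < hi; v++)
    {
        if (!dest->receive_row(dest, v))
            break;
        sent++;
    }
    return sent;
}
```

A caller would initialize a ToyStore, note store.nrows, call
toy_run_query(lo, hi, &store.pub), and take the difference in store.nrows
as the number of rows added by that call.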

So, attached is a draft patch to enable that.  By getting rid of the
intermediate SPITupleTable, this should improve the performance of
RETURN QUERY somewhat even without considering the possibility of
parallelizing the source query.  I've not tried to measure that though.
I've also not looked for other places that could use this new
infrastructure, but there may well be some.

One thing I'm not totally pleased about is that this adds another
SPI interface routine using the old parameter-values API (that is,
null flags as char ' '/'n').  That was the path of least resistance
given the other moving parts in pl_exec.c and spi.c, but maybe we
should try to modernize that before we set it in stone.
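
For anyone not familiar with that convention: the older SPI entry points
take a "const char *Nulls" array parallel to the Values array, in which
'n' marks a null parameter and ' ' a non-null one, and a NULL array
pointer means that no parameter is null.  The sketch below is a
standalone toy, not SPI code (toy_convert_null_flags is a hypothetical
name), showing how such flags map onto plain per-parameter booleans.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Translate old-style null flags ('n' = null, ' ' = not null) into a
 * bool array.  A NULL "nulls" pointer means no parameter is null.
 * The caller must supply an "isnull" array with room for nargs entries.
 */
static void
toy_convert_null_flags(int nargs, const char *nulls, bool *isnull)
{
    assert(nargs >= 0);
    assert(nargs == 0 || isnull != NULL);

    for (int i = 0; i < nargs; i++)
        isnull[i] = (nulls != NULL && nulls[i] == 'n');
}
```

With three parameters and the flag string " n ", only the second entry
comes out as null.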

Another thing standing between this patch and committability is suitable
additions to the SPI documentation.  But I saw no value in writing that
before the previous point is settled.

I will go add this to the next commitfest (for v14), but I wonder
if we should try to squeeze it into v13?  This isn't the only
complaint we've gotten about non-parallelizability of RETURN QUERY.

            regards, tom lane

diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c
index 40be506..a23add6 100644
--- a/src/backend/commands/portalcmds.c
+++ b/src/backend/commands/portalcmds.c
@@ -383,7 +383,9 @@ PersistHoldablePortal(Portal portal)
         SetTuplestoreDestReceiverParams(queryDesc->dest,
                                         portal->holdStore,
                                         portal->holdContext,
-                                        true);
+                                        true,
+                                        NULL,
+                                        NULL);

         /* Fetch the result set into the tuplestore */
         ExecutorRun(queryDesc, ForwardScanDirection, 0L, false);
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index b108168..f2b698f 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -60,7 +60,8 @@ static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan);

 static int    _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                               Snapshot snapshot, Snapshot crosscheck_snapshot,
-                              bool read_only, bool fire_triggers, uint64 tcount);
+                              bool read_only, bool fire_triggers, uint64 tcount,
+                              DestReceiver *caller_dest);

 static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
                                          Datum *Values, const char *Nulls);
@@ -513,7 +514,7 @@ SPI_execute(const char *src, bool read_only, long tcount)

     res = _SPI_execute_plan(&plan, NULL,
                             InvalidSnapshot, InvalidSnapshot,
-                            read_only, true, tcount);
+                            read_only, true, tcount, NULL);

     _SPI_end_call(true);
     return res;
@@ -547,7 +548,7 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
                             _SPI_convert_params(plan->nargs, plan->argtypes,
                                                 Values, Nulls),
                             InvalidSnapshot, InvalidSnapshot,
-                            read_only, true, tcount);
+                            read_only, true, tcount, NULL);

     _SPI_end_call(true);
     return res;
@@ -576,7 +577,34 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,

     res = _SPI_execute_plan(plan, params,
                             InvalidSnapshot, InvalidSnapshot,
-                            read_only, true, tcount);
+                            read_only, true, tcount, NULL);
+
+    _SPI_end_call(true);
+    return res;
+}
+
+/*
+ * Execute a previously prepared plan, sending result tuples to the
+ * caller-supplied DestReceiver rather than the usual SPI output arrangements.
+ */
+int
+SPI_execute_plan_with_receiver(SPIPlanPtr plan,
+                               ParamListInfo params,
+                               bool read_only, long tcount,
+                               DestReceiver *dest)
+{
+    int            res;
+
+    if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
+        return SPI_ERROR_ARGUMENT;
+
+    res = _SPI_begin_call(true);
+    if (res < 0)
+        return res;
+
+    res = _SPI_execute_plan(plan, params,
+                            InvalidSnapshot, InvalidSnapshot,
+                            read_only, true, tcount, dest);

     _SPI_end_call(true);
     return res;
@@ -617,7 +645,7 @@ SPI_execute_snapshot(SPIPlanPtr plan,
                             _SPI_convert_params(plan->nargs, plan->argtypes,
                                                 Values, Nulls),
                             snapshot, crosscheck_snapshot,
-                            read_only, fire_triggers, tcount);
+                            read_only, fire_triggers, tcount, NULL);

     _SPI_end_call(true);
     return res;
@@ -664,7 +692,56 @@ SPI_execute_with_args(const char *src,

     res = _SPI_execute_plan(&plan, paramLI,
                             InvalidSnapshot, InvalidSnapshot,
-                            read_only, true, tcount);
+                            read_only, true, tcount, NULL);
+
+    _SPI_end_call(true);
+    return res;
+}
+
+/*
+ * SPI_execute_with_receiver -- plan and execute a query with arguments
+ *
+ * This is the same as SPI_execute_with_args except that we send result tuples
+ * to the caller-supplied DestReceiver rather than the usual SPI output
+ * arrangements.
+ */
+int
+SPI_execute_with_receiver(const char *src,
+                          int nargs, Oid *argtypes,
+                          Datum *Values, const char *Nulls,
+                          bool read_only, long tcount,
+                          DestReceiver *dest)
+{
+    int            res;
+    _SPI_plan    plan;
+    ParamListInfo paramLI;
+
+    if (src == NULL || nargs < 0 || tcount < 0)
+        return SPI_ERROR_ARGUMENT;
+
+    if (nargs > 0 && (argtypes == NULL || Values == NULL))
+        return SPI_ERROR_PARAM;
+
+    res = _SPI_begin_call(true);
+    if (res < 0)
+        return res;
+
+    memset(&plan, 0, sizeof(_SPI_plan));
+    plan.magic = _SPI_PLAN_MAGIC;
+    plan.cursor_options = CURSOR_OPT_PARALLEL_OK;
+    plan.nargs = nargs;
+    plan.argtypes = argtypes;
+    plan.parserSetup = NULL;
+    plan.parserSetupArg = NULL;
+
+    paramLI = _SPI_convert_params(nargs, argtypes,
+                                  Values, Nulls);
+
+    _SPI_prepare_oneshot_plan(src, &plan);
+
+    res = _SPI_execute_plan(&plan, paramLI,
+                            InvalidSnapshot, InvalidSnapshot,
+                            read_only, true, tcount, dest);

     _SPI_end_call(true);
     return res;
@@ -2090,11 +2167,13 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan)
  * fire_triggers: true to fire AFTER triggers at end of query (normal case);
  *        false means any AFTER triggers are postponed to end of outer query
  * tcount: execution tuple-count limit, or 0 for none
+ * caller_dest: DestReceiver to receive output, or NULL for normal SPI output
  */
 static int
 _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                   Snapshot snapshot, Snapshot crosscheck_snapshot,
-                  bool read_only, bool fire_triggers, uint64 tcount)
+                  bool read_only, bool fire_triggers, uint64 tcount,
+                  DestReceiver *caller_dest)
 {
     int            my_res = 0;
     uint64        my_processed = 0;
@@ -2228,6 +2307,12 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
             bool        canSetTag = stmt->canSetTag;
             DestReceiver *dest;

+            /*
+             * Reset output state.  (Note that if a non-SPI receiver is used,
+             * _SPI_current->processed will stay zero, and that's what we'll
+             * report to the caller.  It's the receiver's job to count tuples
+             * in that case.)
+             */
             _SPI_current->processed = 0;
             _SPI_current->tuptable = NULL;

@@ -2267,7 +2352,16 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                 UpdateActiveSnapshotCommandId();
             }

-            dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
+            /*
+             * Select appropriate tuple receiver.  Output from non-canSetTag
+             * subqueries always goes to the bit bucket.
+             */
+            if (!canSetTag)
+                dest = CreateDestReceiver(DestNone);
+            else if (caller_dest)
+                dest = caller_dest;
+            else
+                dest = CreateDestReceiver(DestSPI);

             if (stmt->utilityStmt == NULL)
             {
@@ -2373,7 +2467,13 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                 SPI_freetuptable(_SPI_current->tuptable);
                 _SPI_current->tuptable = NULL;
             }
-            /* we know that the receiver doesn't need a destroy call */
+
+            /*
+             * We don't issue a destroy call to the receiver.  The SPI and
+             * None receivers would ignore it anyway, while if the caller
+             * supplied a receiver, it's not our job to destroy it.
+             */
+
             if (res < 0)
             {
                 my_res = res;
@@ -2465,7 +2565,7 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
     switch (operation)
     {
         case CMD_SELECT:
-            if (queryDesc->dest->mydest != DestSPI)
+            if (queryDesc->dest->mydest == DestNone)
             {
                 /* Don't return SPI_OK_SELECT if we're discarding result */
                 res = SPI_OK_UTILITY;
diff --git a/src/backend/executor/tstoreReceiver.c b/src/backend/executor/tstoreReceiver.c
index 6c2dfbc..e8172be 100644
--- a/src/backend/executor/tstoreReceiver.c
+++ b/src/backend/executor/tstoreReceiver.c
@@ -8,6 +8,8 @@
  * toasted values.  This is to support cursors WITH HOLD, which must retain
  * data even if the underlying table is dropped.
  *
+ * Also optionally, we can apply a tuple conversion map before storing.
+ *
  *
  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -21,6 +23,7 @@
 #include "postgres.h"

 #include "access/detoast.h"
+#include "access/tupconvert.h"
 #include "executor/tstoreReceiver.h"


@@ -31,14 +34,19 @@ typedef struct
     Tuplestorestate *tstore;    /* where to put the data */
     MemoryContext cxt;            /* context containing tstore */
     bool        detoast;        /* were we told to detoast? */
+    TupleDesc    target_tupdesc; /* target tupdesc, or NULL if none */
+    const char *map_failure_msg;    /* tupdesc mapping failure message */
     /* workspace: */
     Datum       *outvalues;        /* values array for result tuple */
     Datum       *tofree;            /* temp values to be pfree'd */
+    TupleConversionMap *tupmap; /* conversion map, if needed */
+    TupleTableSlot *mapslot;    /* slot for mapped tuples */
 } TStoreState;


 static bool tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self);
 static bool tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self);
+static bool tstoreReceiveSlot_tupmap(TupleTableSlot *slot, DestReceiver *self);


 /*
@@ -69,27 +77,46 @@ tstoreStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
         }
     }

+    /* Check if tuple conversion is needed */
+    if (myState->target_tupdesc)
+        myState->tupmap = convert_tuples_by_position(typeinfo,
+                                                     myState->target_tupdesc,
+                                                     myState->map_failure_msg);
+    else
+        myState->tupmap = NULL;
+
     /* Set up appropriate callback */
     if (needtoast)
     {
+        Assert(!myState->tupmap);
         myState->pub.receiveSlot = tstoreReceiveSlot_detoast;
         /* Create workspace */
         myState->outvalues = (Datum *)
             MemoryContextAlloc(myState->cxt, natts * sizeof(Datum));
         myState->tofree = (Datum *)
             MemoryContextAlloc(myState->cxt, natts * sizeof(Datum));
+        myState->mapslot = NULL;
+    }
+    else if (myState->tupmap)
+    {
+        myState->pub.receiveSlot = tstoreReceiveSlot_tupmap;
+        myState->outvalues = NULL;
+        myState->tofree = NULL;
+        myState->mapslot = MakeSingleTupleTableSlot(myState->target_tupdesc,
+                                                    &TTSOpsVirtual);
     }
     else
     {
         myState->pub.receiveSlot = tstoreReceiveSlot_notoast;
         myState->outvalues = NULL;
         myState->tofree = NULL;
+        myState->mapslot = NULL;
     }
 }

 /*
  * Receive a tuple from the executor and store it in the tuplestore.
- * This is for the easy case where we don't have to detoast.
+ * This is for the easy case where we don't have to detoast nor map anything.
  */
 static bool
 tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self)
@@ -158,6 +185,21 @@ tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self)
 }

 /*
+ * Receive a tuple from the executor and store it in the tuplestore.
+ * This is for the case where we must apply a tuple conversion map.
+ */
+static bool
+tstoreReceiveSlot_tupmap(TupleTableSlot *slot, DestReceiver *self)
+{
+    TStoreState *myState = (TStoreState *) self;
+
+    execute_attr_map_slot(myState->tupmap->attrMap, slot, myState->mapslot);
+    tuplestore_puttupleslot(myState->tstore, myState->mapslot);
+
+    return true;
+}
+
+/*
  * Clean up at end of an executor run
  */
 static void
@@ -172,6 +214,12 @@ tstoreShutdownReceiver(DestReceiver *self)
     if (myState->tofree)
         pfree(myState->tofree);
     myState->tofree = NULL;
+    if (myState->tupmap)
+        free_conversion_map(myState->tupmap);
+    myState->tupmap = NULL;
+    if (myState->mapslot)
+        ExecDropSingleTupleTableSlot(myState->mapslot);
+    myState->mapslot = NULL;
 }

 /*
@@ -204,17 +252,32 @@ CreateTuplestoreDestReceiver(void)

 /*
  * Set parameters for a TuplestoreDestReceiver
+ *
+ * tStore: where to store the tuples
+ * tContext: memory context containing tStore
+ * detoast: forcibly detoast contained data?
+ * target_tupdesc: if not NULL, forcibly convert tuples to this rowtype
+ * map_failure_msg: error message to use if mapping to target_tupdesc fails
+ *
+ * We don't currently support both detoast and target_tupdesc at the same
+ * time, just because no existing caller needs that combination.
  */
 void
 SetTuplestoreDestReceiverParams(DestReceiver *self,
                                 Tuplestorestate *tStore,
                                 MemoryContext tContext,
-                                bool detoast)
+                                bool detoast,
+                                TupleDesc target_tupdesc,
+                                const char *map_failure_msg)
 {
     TStoreState *myState = (TStoreState *) self;

+    Assert(!(detoast && target_tupdesc));
+
     Assert(myState->pub.mydest == DestTuplestore);
     myState->tstore = tStore;
     myState->cxt = tContext;
     myState->detoast = detoast;
+    myState->target_tupdesc = target_tupdesc;
+    myState->map_failure_msg = map_failure_msg;
 }
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 5781fb2..96ea74f 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -996,7 +996,9 @@ FillPortalStore(Portal portal, bool isTopLevel)
     SetTuplestoreDestReceiverParams(treceiver,
                                     portal->holdStore,
                                     portal->holdContext,
-                                    false);
+                                    false,
+                                    NULL,
+                                    NULL);

     switch (portal->strategy)
     {
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index 06de20a..9be0c68 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -90,6 +90,10 @@ extern int    SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
 extern int    SPI_execute_plan_with_paramlist(SPIPlanPtr plan,
                                             ParamListInfo params,
                                             bool read_only, long tcount);
+extern int    SPI_execute_plan_with_receiver(SPIPlanPtr plan,
+                                           ParamListInfo params,
+                                           bool read_only, long tcount,
+                                           DestReceiver *dest);
 extern int    SPI_exec(const char *src, long tcount);
 extern int    SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
                       long tcount);
@@ -102,6 +106,11 @@ extern int    SPI_execute_with_args(const char *src,
                                   int nargs, Oid *argtypes,
                                   Datum *Values, const char *Nulls,
                                   bool read_only, long tcount);
+extern int    SPI_execute_with_receiver(const char *src,
+                                      int nargs, Oid *argtypes,
+                                      Datum *Values, const char *Nulls,
+                                      bool read_only, long tcount,
+                                      DestReceiver *dest);
 extern SPIPlanPtr SPI_prepare(const char *src, int nargs, Oid *argtypes);
 extern SPIPlanPtr SPI_prepare_cursor(const char *src, int nargs, Oid *argtypes,
                                      int cursorOptions);
diff --git a/src/include/executor/tstoreReceiver.h b/src/include/executor/tstoreReceiver.h
index b2390c4..e9461cf 100644
--- a/src/include/executor/tstoreReceiver.h
+++ b/src/include/executor/tstoreReceiver.h
@@ -24,6 +24,8 @@ extern DestReceiver *CreateTuplestoreDestReceiver(void);
 extern void SetTuplestoreDestReceiverParams(DestReceiver *self,
                                             Tuplestorestate *tStore,
                                             MemoryContext tContext,
-                                            bool detoast);
+                                            bool detoast,
+                                            TupleDesc target_tupdesc,
+                                            const char *map_failure_msg);

 #endif                            /* TSTORE_RECEIVER_H */
diff --git a/src/pl/plpgsql/src/pl_exec.c b/src/pl/plpgsql/src/pl_exec.c
index d3ad4fa..b5563ee 100644
--- a/src/pl/plpgsql/src/pl_exec.c
+++ b/src/pl/plpgsql/src/pl_exec.c
@@ -27,6 +27,7 @@
 #include "executor/execExpr.h"
 #include "executor/spi.h"
 #include "executor/spi_priv.h"
+#include "executor/tstoreReceiver.h"
 #include "funcapi.h"
 #include "mb/stringinfo_mb.h"
 #include "miscadmin.h"
@@ -3491,9 +3492,11 @@ static int
 exec_stmt_return_query(PLpgSQL_execstate *estate,
                        PLpgSQL_stmt_return_query *stmt)
 {
-    Portal        portal;
-    uint64        processed = 0;
-    TupleConversionMap *tupmap;
+    int64        tcount;
+    DestReceiver *treceiver;
+    int            rc;
+    uint64        processed;
+    MemoryContext stmt_mcontext = get_stmt_mcontext(estate);
     MemoryContext oldcontext;

     if (!estate->retisset)
@@ -3503,60 +3506,115 @@ exec_stmt_return_query(PLpgSQL_execstate *estate,

     if (estate->tuple_store == NULL)
         exec_init_tuple_store(estate);
+    /* There might be some tuples in the tuplestore already */
+    tcount = tuplestore_tuple_count(estate->tuple_store);
+
+    /*
+     * Set up DestReceiver to transfer results directly to tuplestore,
+     * converting rowtype if necessary.  DestReceiver lives in mcontext.
+     */
+    oldcontext = MemoryContextSwitchTo(stmt_mcontext);
+    treceiver = CreateDestReceiver(DestTuplestore);
+    SetTuplestoreDestReceiverParams(treceiver,
+                                    estate->tuple_store,
+                                    estate->tuple_store_cxt,
+                                    false,
+                                    estate->tuple_store_desc,
+                                    gettext_noop("structure of query does not match function result type"));
+    MemoryContextSwitchTo(oldcontext);

     if (stmt->query != NULL)
     {
         /* static query */
-        exec_run_select(estate, stmt->query, 0, &portal);
+        PLpgSQL_expr *expr = stmt->query;
+        ParamListInfo paramLI;
+
+        /*
+         * On the first call for this expression generate the plan.
+         */
+        if (expr->plan == NULL)
+            exec_prepare_plan(estate, expr, CURSOR_OPT_PARALLEL_OK, true);
+
+        /*
+         * Set up ParamListInfo to pass to executor
+         */
+        paramLI = setup_param_list(estate, expr);
+
+        /*
+         * Execute the query
+         */
+        rc = SPI_execute_plan_with_receiver(expr->plan, paramLI,
+                                            estate->readonly_func, 0,
+                                            treceiver);
+        if (rc != SPI_OK_SELECT)
+            ereport(ERROR,
+                    (errcode(ERRCODE_SYNTAX_ERROR),
+                     errmsg("query \"%s\" is not a SELECT", expr->query)));
     }
     else
     {
         /* RETURN QUERY EXECUTE */
-        Assert(stmt->dynquery != NULL);
-        portal = exec_dynquery_with_params(estate, stmt->dynquery,
-                                           stmt->params, NULL,
-                                           0);
-    }
+        Datum        query;
+        bool        isnull;
+        Oid            restype;
+        int32        restypmod;
+        char       *querystr;

-    /* Use eval_mcontext for tuple conversion work */
-    oldcontext = MemoryContextSwitchTo(get_eval_mcontext(estate));
+        /*
+         * Evaluate the string expression after the EXECUTE keyword. Its
+         * result is the querystring we have to execute.
+         */
+        Assert(stmt->dynquery != NULL);
+        query = exec_eval_expr(estate, stmt->dynquery,
+                               &isnull, &restype, &restypmod);
+        if (isnull)
+            ereport(ERROR,
+                    (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+                     errmsg("query string argument of EXECUTE is null")));

-    tupmap = convert_tuples_by_position(portal->tupDesc,
-                                        estate->tuple_store_desc,
-                                        gettext_noop("structure of query does not match function result type"));
+        /* Get the C-String representation */
+        querystr = convert_value_to_string(estate, query, restype);

-    while (true)
-    {
-        uint64        i;
+        /* copy it into the stmt_mcontext before we clean up */
+        querystr = MemoryContextStrdup(stmt_mcontext, querystr);

-        SPI_cursor_fetch(portal, true, 50);
+        exec_eval_cleanup(estate);

-        /* SPI will have changed CurrentMemoryContext */
-        MemoryContextSwitchTo(get_eval_mcontext(estate));
+        /* Execute query, passing params if necessary */
+        if (stmt->params)
+        {
+            PreparedParamsData *ppd;

-        if (SPI_processed == 0)
-            break;
+            ppd = exec_eval_using_params(estate, stmt->params);

-        for (i = 0; i < SPI_processed; i++)
+            rc = SPI_execute_with_receiver(querystr,
+                                           ppd->nargs, ppd->types,
+                                           ppd->values, ppd->nulls,
+                                           estate->readonly_func,
+                                           0,
+                                           treceiver);
+        }
+        else
         {
-            HeapTuple    tuple = SPI_tuptable->vals[i];
-
-            if (tupmap)
-                tuple = execute_attr_map_tuple(tuple, tupmap);
-            tuplestore_puttuple(estate->tuple_store, tuple);
-            if (tupmap)
-                heap_freetuple(tuple);
-            processed++;
+            rc = SPI_execute_with_receiver(querystr,
+                                           0, NULL, NULL, NULL,
+                                           estate->readonly_func,
+                                           0,
+                                           treceiver);
         }

-        SPI_freetuptable(SPI_tuptable);
+        if (rc < 0)
+            elog(ERROR, "SPI_execute_with_receiver failed executing query \"%s\": %s",
+                 querystr, SPI_result_code_string(rc));
     }

-    SPI_freetuptable(SPI_tuptable);
-    SPI_cursor_close(portal);
-
-    MemoryContextSwitchTo(oldcontext);
+    /* Clean up */
+    treceiver->rDestroy(treceiver);
     exec_eval_cleanup(estate);
+    MemoryContextReset(stmt_mcontext);
+
+    /* Count how many tuples we got */
+    processed = tuplestore_tuple_count(estate->tuple_store) - tcount;

     estate->eval_processed = processed;
     exec_set_found(estate, processed != 0);
