Re: Allow substitute allocators for PGresult. - Mailing list pgsql-hackers

From Kyotaro HORIGUCHI
Subject Re: Allow substitute allocators for PGresult.
Date
Msg-id 20111201.194803.07040624.horiguchi.kyotaro@oss.ntt.co.jp
Whole thread Raw
In response to Re: Allow substitute allocators for PGresult.  (Kyotaro HORIGUCHI <horiguchi.kyotaro@oss.ntt.co.jp>)
Responses Re: Allow substitute allocators for PGresult.  (Kyotaro HORIGUCHI <horiguchi.kyotaro@oss.ntt.co.jp>)
Re: Allow substitute allocators for PGresult.  (Greg Smith <greg@2ndQuadrant.com>)
List pgsql-hackers
Ouch! I'm sorry for making a reverse patch for the first modification.

This is an amendment of the message below. The body text is
copied into this message.

http://archives.postgresql.org/message-id/20111201.192419.103527179.horiguchi.kyotaro@oss.ntt.co.jp

=======
Hello, This is the next version of Allow substitute allocators
for PGresult.

Totally chaning the concept from the previous one, this patch
allows libpq to handle alternative tuple store for received
tuples.

Design guidelines are shown below.
- No need to modify existing client code of libpq.
- Existing libpq client runs with roughly same performance, and  dblink with modification runs faster to some extent
and requires less memory.
 

I have measured roughly of run time and memory requirement for
three configurations on CentOS6 on Vbox with 2GB mem 4 cores
running on Win7-Corei7, transferring (30 bytes * 2 cols) *
2000000 tuples (120MB net) within this virutal machine. The
results are below.
                      xfer time    Peak RSS
Original                    : 6.02s        850MB
libpq patch + Original dblink    : 6.11s        850MB
full patch                       : 4.44s        643MB

xfer time here is the mean of five 'real time's measured by
running sql script like this after warmup run.

=== test.sql
select dblink_connect('c', 'host=localhost port=5432 dbname=test');
select * from dblink('c', 'select a,c from foo limit 2000000') as (a text, b bytea) limit 1;

select dblink_disconnect('c');
===
$  for i in $(seq 1 10); do time psql test -f t.sql; done 
===

Peak RSS is measured by picking up heap Rss in /proc/[pid]/smaps.


It seems somewhat slow using patched libpq and original dblink,
but it seems within error range too. If this amount of slowdown
is not permissible, it might be improved by restoring the static
call route before for extra redundancy of the code.

On the other hand, full patch version seems obviously fast and
requires less memory. Isn't it nice?

This patch consists of two sub patches.

The first is a patch for libpq to allow rewiring tuple storage
mechanism. But default behavior is not changed. Existing libpq
client should run with it.

The second is modify dblink to storing received tuples into
tuplestore directly using the mechanism above.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center


diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 1af8df6..a360d78 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -160,3 +160,7 @@ PQconnectStartParams      157PQping                    158PQpingParams              159PQlibVersion
            160
 
+PQregisterTupleAdder      161
+PQgetAsCstring          162
+PQgetAddTupleParam      163
+PQsetAddTupleErrMes      164
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 50f3f83..437be26 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2692,6 +2692,7 @@ makeEmptyPGconn(void)    conn->allow_ssl_try = true;    conn->wait_ssl_try = false;#endif
+    conn->addTupleFunc = NULL;    /*     * We try to send at least 8K at a time, which is the usual size of pipe
@@ -5064,3 +5065,10 @@ PQregisterThreadLock(pgthreadlock_t newhandler)    return prev;}
+
+void
+PQregisterTupleAdder(PGconn *conn, addTupleFunction func, void *param)
+{
+    conn->addTupleFunc = func;
+    conn->addTupleFuncParam = param;
+}
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 113aab0..c8ec9bd 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -48,7 +48,6 @@ char       *const pgresStatus[] = {static int    static_client_encoding = PG_SQL_ASCII;static bool
static_std_strings= false;
 
-static PGEvent *dupEvents(PGEvent *events, int count);static bool PQsendQueryStart(PGconn *conn);static int
PQsendQueryGuts(PGconn*conn,
 
@@ -66,7 +65,9 @@ static PGresult *PQexecFinish(PGconn *conn);static int PQsendDescribe(PGconn *conn, char desc_type,
           const char *desc_target);static int    check_field_number(const PGresult *res, int field_num);
 
-
+static void *pqDefaultAddTupleFunc(PGresult *res, AddTupFunc func,
+                                   int id, size_t len);
+static void *pqAddTuple(PGresult *res, PGresAttValue *tup);/* ---------------- * Space management for PGresult.
@@ -160,6 +161,9 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)    result->curBlock = NULL;
result->curOffset= 0;    result->spaceLeft = 0;
 
+    result->addTupleFunc = pqDefaultAddTupleFunc;
+    result->addTupleFuncParam = NULL;
+    result->addTupleFuncErrMes = NULL;    if (conn)    {
@@ -194,6 +198,12 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)            }            result->nEvents =
conn->nEvents;       }
 
+
+        if (conn->addTupleFunc)
+        {
+            result->addTupleFunc = conn->addTupleFunc;
+            result->addTupleFuncParam = conn->addTupleFuncParam;
+        }    }    else    {
@@ -487,6 +497,33 @@ PQresultAlloc(PGresult *res, size_t nBytes)    return pqResultAlloc(res, nBytes, TRUE);}
+void *
+pqDefaultAddTupleFunc(PGresult *res, AddTupFunc func, int id, size_t len)
+{
+    void *p;
+
+    switch (func)
+    {
+        case ADDTUP_ALLOC_TEXT:
+            return pqResultAlloc(res, len, TRUE);
+
+        case ADDTUP_ALLOC_BINARY:
+            p = pqResultAlloc(res, len, FALSE);
+
+            if (id == -1)
+                res->addTupleFuncParam = p;
+
+            return p;
+
+        case ADDTUP_ADD_TUPLE:
+            return pqAddTuple(res, res->addTupleFuncParam);
+
+        default:
+            /* Ignore */
+            break;
+    }
+    return NULL;
+}/* * pqResultAlloc - *        Allocate subsidiary storage for a PGresult.
@@ -830,9 +867,9 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)/* * pqAddTuple *      add a row
pointerto the PGresult structure, growing it if necessary
 
- *      Returns TRUE if OK, FALSE if not enough memory to add the row
+ *      Returns tup if OK, NULL if not enough memory to add the row. */
-int
+static void *pqAddTuple(PGresult *res, PGresAttValue *tup){    if (res->ntups >= res->tupArrSize)
@@ -858,13 +895,13 @@ pqAddTuple(PGresult *res, PGresAttValue *tup)            newTuples = (PGresAttValue **)
    realloc(res->tuples, newSize * sizeof(PGresAttValue *));        if (!newTuples)
 
-            return FALSE;        /* malloc or realloc failed */
+            return NULL;        /* malloc or realloc failed */        res->tupArrSize = newSize;        res->tuples =
newTuples;   }    res->tuples[res->ntups] = tup;    res->ntups++;
 
-    return TRUE;
+    return tup;}/*
@@ -2822,6 +2859,43 @@ PQgetisnull(const PGresult *res, int tup_num, int field_num)        return 0;}
+/* PQgetAsCString
+ *    returns the field as C string.
+ */
+char *
+PQgetAsCstring(PGresAttValue *attval)
+{
+    return attval->len == NULL_LEN ? NULL : attval->value;
+}
+
+/* PQgetAddTupleParam
+ *    Get the pointer to the contextual parameter from PGresult which is
+ *    registered to PGconn by PQregisterTupleAdder
+ */
+void *
+PQgetAddTupleParam(const PGresult *res)
+{
+    if (!res)
+        return NULL;
+    return res->addTupleFuncParam;
+}
+
+/* PQsetAddTupleErrMes
+ *    Set the error message pass back to the caller of addTupleFunc
+ *  mes must be a malloc'ed memory block and it is released by the
+ *  caller of addTupleFunc if set.
+ *  You can replace the previous message by alternative mes, or clear
+ *  it with NULL.
+ */
+void
+PQsetAddTupleErrMes(PGresult *res, char *mes)
+{
+    /* Free existing message */
+    if (res->addTupleFuncErrMes)
+        free(res->addTupleFuncErrMes);
+    res->addTupleFuncErrMes = mes;
+}
+/* PQnparams: *    returns the number of input parameters of a prepared statement. */
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index 77c4d5a..c7f74ae 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -733,9 +733,10 @@ getAnotherTuple(PGconn *conn, bool binary)    if (conn->curTuple == NULL)    {
conn->curTuple= (PGresAttValue *)
 
-            pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
+            result->addTupleFunc(result, ADDTUP_ALLOC_BINARY, -1,
+                                 nfields * sizeof(PGresAttValue));        if (conn->curTuple == NULL)
-            goto outOfMemory;
+            goto addTupleError;        MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));        /*
@@ -757,7 +758,7 @@ getAnotherTuple(PGconn *conn, bool binary)    {        bitmap = (char *) malloc(nbytes);        if
(!bitmap)
-            goto outOfMemory;
+            goto addTupleError;    }    if (pqGetnchar(bitmap, nbytes, conn))
@@ -787,9 +788,12 @@ getAnotherTuple(PGconn *conn, bool binary)                vlen = 0;            if (tup[i].value ==
NULL)           {
 
-                tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
+                AddTupFunc func =
+                    (binary ? ADDTUP_ALLOC_BINARY : ADDTUP_ALLOC_TEXT);
+                tup[i].value =
+                    (char *) result->addTupleFunc(result, func, i, vlen + 1);                if (tup[i].value ==
NULL)
-                    goto outOfMemory;
+                    goto addTupleError;            }            tup[i].len = vlen;            /* read in the value */
@@ -812,8 +816,9 @@ getAnotherTuple(PGconn *conn, bool binary)    }    /* Success!  Store the completed tuple in the
result*/
 
-    if (!pqAddTuple(result, tup))
-        goto outOfMemory;
+    if (!result->addTupleFunc(result, ADDTUP_ADD_TUPLE, 0, 0))
+        goto addTupleError;
+    /* and reset for a new message */    conn->curTuple = NULL;
@@ -821,7 +826,7 @@ getAnotherTuple(PGconn *conn, bool binary)        free(bitmap);    return 0;
-outOfMemory:
+addTupleError:    /* Replace partially constructed result with an error result */    /*
@@ -829,8 +834,21 @@ outOfMemory:     * there's not enough memory to concatenate messages...     */
pqClearAsyncResult(conn);
-    printfPQExpBuffer(&conn->errorMessage,
-                      libpq_gettext("out of memory for query result\n"));
+    resetPQExpBuffer(&conn->errorMessage);
+
+    /*
+     * If error message is passed from addTupleFunc, set it into
+     * PGconn, assume out of memory if not.
+     */
+    appendPQExpBufferStr(&conn->errorMessage,
+                         libpq_gettext(result->addTupleFuncErrMes ?
+                                       result->addTupleFuncErrMes :
+                                       "out of memory for query result\n"));
+    if (result->addTupleFuncErrMes)
+    {
+        free(result->addTupleFuncErrMes);
+        result->addTupleFuncErrMes = NULL;
+    }    /*     * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 45a84d8..d14b57a 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -634,9 +634,10 @@ getAnotherTuple(PGconn *conn, int msgLength)    if (conn->curTuple == NULL)    {
conn->curTuple= (PGresAttValue *)
 
-            pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
+            result->addTupleFunc(result, ADDTUP_ALLOC_BINARY, -1,
+                                 nfields * sizeof(PGresAttValue));        if (conn->curTuple == NULL)
-            goto outOfMemory;
+            goto addTupleError;        MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));    }    tup =
conn->curTuple;
@@ -673,11 +674,12 @@ getAnotherTuple(PGconn *conn, int msgLength)            vlen = 0;        if (tup[i].value ==
NULL)       {
 
-            bool        isbinary = (result->attDescs[i].format != 0);
-
-            tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
+            AddTupFunc func = (result->attDescs[i].format != 0 ?
+                               ADDTUP_ALLOC_BINARY : ADDTUP_ALLOC_TEXT);
+            tup[i].value =
+                (char *) result->addTupleFunc(result, func, i, vlen + 1);            if (tup[i].value == NULL)
-                goto outOfMemory;
+                goto addTupleError;        }        tup[i].len = vlen;        /* read in the value */
@@ -689,22 +691,36 @@ getAnotherTuple(PGconn *conn, int msgLength)    }    /* Success!  Store the completed tuple in
theresult */
 
-    if (!pqAddTuple(result, tup))
-        goto outOfMemory;
+    if (!result->addTupleFunc(result, ADDTUP_ADD_TUPLE, 0, 0))
+        goto addTupleError;
+    /* and reset for a new message */    conn->curTuple = NULL;    return 0;
-outOfMemory:
+addTupleError:    /*     * Replace partially constructed result with an error result. First     * discard the old
resultto try to win back some memory.     */    pqClearAsyncResult(conn);
 
-    printfPQExpBuffer(&conn->errorMessage,
-                      libpq_gettext("out of memory for query result\n"));
+    resetPQExpBuffer(&conn->errorMessage);
+
+    /*
+     * If error message is passed from addTupleFunc, set it into
+     * PGconn, assume out of memory if not.
+     */
+    appendPQExpBufferStr(&conn->errorMessage,
+                         libpq_gettext(result->addTupleFuncErrMes ?
+                                       result->addTupleFuncErrMes : 
+                                       "out of memory for query result\n"));
+    if (result->addTupleFuncErrMes)
+    {
+        free(result->addTupleFuncErrMes);
+        result->addTupleFuncErrMes = NULL;
+    }    pqSaveErrorResult(conn);    /* Discard the failed message by pretending we read it */
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index d13a5b9..bdce294 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -116,6 +116,16 @@ typedef enum    PQPING_NO_ATTEMPT            /* connection not attempted (bad params) */} PGPing;
+/* AddTupFunc is one of the parameters of addTupleFunc that decides
+ * the function of the addTupleFunction. See addTupleFunction for
+ * details */
+typedef enum 
+{
+    ADDTUP_ALLOC_TEXT,          /* Returns non-aligned memory for text value */
+    ADDTUP_ALLOC_BINARY,        /* Returns aligned memory for binary value */
+    ADDTUP_ADD_TUPLE            /* Adds tuple data into tuple storage */
+} AddTupFunc;
+/* PGconn encapsulates a connection to the backend. * The contents of this struct are not supposed to be known to
applications.*/
 
@@ -225,6 +235,12 @@ typedef struct pgresAttDesc    int            atttypmod;        /* type-specific modifier info */}
PGresAttDesc;
+typedef struct pgresAttValue
+{
+    int            len;            /* length in bytes of the value */
+    char       *value;            /* actual value, plus terminating zero byte */
+} PGresAttValue;
+/* ---------------- * Exported functions of libpq * ----------------
@@ -416,6 +432,52 @@ extern PGPing PQping(const char *conninfo);extern PGPing PQpingParams(const char *const *
keywords,            const char *const * values, int expand_dbname);
 
+/*
+ * Typedef for tuple storage function.
+ *
+ * This function pointer is used for tuple storage function in
+ * PGresult and PGconn.
+ *
+ * addTupleFunction is called for four types of function designated by
+ * the enum AddTupFunc.
+ *
+ * id is the identifier for allocated memory block. The caller sets -1
+ * for PGresAttValue array, and 0 to number of cols - 1 for each
+ * column.
+ *
+ * ADDTUP_ALLOC_TEXT requests the size bytes memory block for a text
+ * value which may not be alingned to the word boundary.
+ *
+ * ADDTUP_ALLOC_BINARY requests the size bytes memory block for a
+ * binary value which is aligned to the word boundary.
+ *
+ * ADDTUP_ADD_TUPLE requests to add tuple data into storage, and
+ * free the memory blocks allocated by this function if necessary.
+ * id and size are ignored.
+ *
+ * This function must return non-NULL value for success and must
+ * return NULL for failure and may set error message by
+ * PQsetAddTupleErrMes in malloc'ed memory. Assumed by caller as out
+ * of memory if the error message is NULL on failure. This function is
+ * assumed not to throw any exception.
+ */
+    typedef void *(*addTupleFunction)(PGresult *res, AddTupFunc func,
+                                      int id, size_t size);
+
+/*
+ * Register alternative tuple storage function to PGconn.
+ * 
+ * By registering this function, pg_result disables its own tuple
+ * storage and calls it to append rows one by one.
+ *
+ * func is tuple store function. See addTupleFunction.
+ * 
+ * addTupFuncParam is contextual storage that can be get with
+ * PQgetAddTupleParam in func.
+ */
+extern void PQregisterTupleAdder(PGconn *conn, addTupleFunction func,
+                                 void *addTupFuncParam);
+/* Force the write buffer to be written (or at least try) */extern int    PQflush(PGconn *conn);
@@ -454,6 +516,9 @@ extern char *PQcmdTuples(PGresult *res);extern char *PQgetvalue(const PGresult *res, int tup_num,
intfield_num);extern int    PQgetlength(const PGresult *res, int tup_num, int field_num);extern int
PQgetisnull(constPGresult *res, int tup_num, int field_num);
 
+extern char *PQgetAsCstring(PGresAttValue *attdesc);
+extern void *PQgetAddTupleParam(const PGresult *res);
+extern void    PQsetAddTupleErrMes(PGresult *res, char *mes);extern int    PQnparams(const PGresult *res);extern Oid
PQparamtype(const PGresult *res, int param_num);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 64dfcb2..45e4c93 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -134,12 +134,6 @@ typedef struct pgresParamDesc#define NULL_LEN        (-1)    /* pg_result len for NULL value */
-typedef struct pgresAttValue
-{
-    int            len;            /* length in bytes of the value */
-    char       *value;            /* actual value, plus terminating zero byte */
-} PGresAttValue;
-/* Typedef for message-field list entries */typedef struct pgMessageField{
@@ -209,6 +203,11 @@ struct pg_result    PGresult_data *curBlock;    /* most recently allocated block */    int
  curOffset;        /* start offset of free space in block */    int            spaceLeft;        /* number of free
bytesremaining in block */
 
+
+    addTupleFunction addTupleFunc; /* Tuple storage function. See
+                                    * addTupleFunction for details. */
+    void *addTupleFuncParam;       /* Contextual parameter for addTupleFunc */
+    char *addTupleFuncErrMes;      /* Error message returned from addTupFunc */};/* PGAsyncStatusType defines the
stateof the query-execution state machine */
 
@@ -443,6 +442,13 @@ struct pg_conn    /* Buffer for receiving various parts of messages */    PQExpBufferData
workBuffer;/* expansible string */
 
+
+    /* Tuple store function. The two fields below is copied to newly
+     * created PGresult if addTupleFunc is not NULL. Use default
+     * function if addTupleFunc is NULL. */
+    addTupleFunction addTupleFunc; /* Tuple storage function. See
+                                    * addTupleFunction for details. */
+    void *addTupleFuncParam;       /* Contextual parameter for addTupFunc */};/* PGcancel stores all data necessary to
cancela connection. A copy of this
 
@@ -507,7 +513,6 @@ extern voidpqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)/* This lets gcc check
theformat string for consistency. */__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
 
-extern int    pqAddTuple(PGresult *res, PGresAttValue *tup);extern void pqSaveMessageField(PGresult *res, char code,
               const char *value);extern void pqSaveParameterStatus(PGconn *conn, const char *name, 
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 62c810a..fb2e10e 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -63,11 +63,23 @@ typedef struct remoteConn    bool        newXactForCursor;        /* Opened a transaction for a
cursor*/} remoteConn;
 
+typedef struct storeInfo
+{
+    Tuplestorestate *tuplestore;
+    int nattrs;
+    AttInMetadata *attinmeta;
+    MemoryContext oldcontext;
+    char *attrvalbuf;
+    void **valbuf;
+    size_t *valbufsize;
+    bool error_occurred;
+    bool nummismatch;
+} storeInfo;
+/* * Internal declarations */static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
-static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);static remoteConn *getConnectionByName(const
char*name);static HTAB *createConnHash(void);static void createNewConnection(const char *name, remoteConn *rconn);
 
@@ -90,6 +102,10 @@ static char *escape_param_str(const char *from);static void validate_pkattnums(Relation rel,
          int2vector *pkattnums_arg, int32 pknumatts_arg,                   int **pkattnums, int *pknumatts);
 
+static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo);
+static void finishStoreInfo(storeInfo *sinfo);
+static void *addTuple(PGresult *res, AddTupFunc func, int id, size_t size);
+/* Global */static remoteConn *pconn = NULL;
@@ -503,6 +519,7 @@ dblink_fetch(PG_FUNCTION_ARGS)    char       *curname = NULL;    int            howmany = 0;
bool       fail = true;    /* default to backward compatible */
 
+    storeInfo   storeinfo;    DBLINK_INIT;
@@ -559,15 +576,30 @@ dblink_fetch(PG_FUNCTION_ARGS)    appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
/*
 
+     * Result is stored into storeinfo.tuplestore instead of
+     * res->result retuned by PQexec below
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQregisterTupleAdder(conn, addTuple, &storeinfo);
+
+    /*     * Try to execute the query.  Note that since libpq uses malloc, the     * PGresult will be long-lived even
thoughwe are still in a short-lived     * memory context.     */    res = PQexec(conn, buf.data);
 
+    finishStoreInfo(&storeinfo);
+    if (!res ||        (PQresultStatus(res) != PGRES_COMMAND_OK &&         PQresultStatus(res) != PGRES_TUPLES_OK))
{
+        /* This is only for backward compatibility */
+        if (storeinfo.nummismatch)
+            ereport(ERROR,
+                    (errcode(ERRCODE_DATATYPE_MISMATCH),
+                     errmsg("remote query result rowtype does not match "
+                            "the specified FROM clause rowtype")));        dblink_res_error(conname, res, "could not
fetchfrom cursor", fail);        return (Datum) 0;    }
 
@@ -580,7 +612,6 @@ dblink_fetch(PG_FUNCTION_ARGS)                 errmsg("cursor \"%s\" does not exist", curname)));
}
 
-    materializeResult(fcinfo, res);    return (Datum) 0;}
@@ -640,6 +671,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)    remoteConn *rconn = NULL;    bool
      fail = true;    /* default to backward compatible */    bool        freeconn = false;
 
+    storeInfo   storeinfo;    /* check to see if caller supports us returning a tuplestore */    if (rsinfo == NULL ||
!IsA(rsinfo,ReturnSetInfo))
 
@@ -715,164 +747,206 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)    rsinfo->setResult = NULL;
rsinfo->setDesc= NULL;
 
+
+    /*
+     * Result is stored into storeinfo.tuplestore instead of
+     * res->result retuned by PQexec/PQgetResult below
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQregisterTupleAdder(conn, addTuple, &storeinfo);
+    /* synchronous query, or async result retrieval */    if (!is_async)        res = PQexec(conn, sql);    else
-    {        res = PQgetResult(conn);
-        /* NULL means we're all done with the async results */
-        if (!res)
-            return (Datum) 0;
-    }
-    /* if needed, close the connection to the database and cleanup */
-    if (freeconn)
-        PQfinish(conn);
+    finishStoreInfo(&storeinfo);
-    if (!res ||
-        (PQresultStatus(res) != PGRES_COMMAND_OK &&
-         PQresultStatus(res) != PGRES_TUPLES_OK))
+    /* NULL res from async get means we're all done with the results */
+    if (res || !is_async)    {
-        dblink_res_error(conname, res, "could not execute query", fail);
-        return (Datum) 0;
+        if (freeconn)
+            PQfinish(conn);
+
+        if (!res ||
+            (PQresultStatus(res) != PGRES_COMMAND_OK &&
+             PQresultStatus(res) != PGRES_TUPLES_OK))
+        {
+            /* This is only for backward compatibility */
+            if (storeinfo.nummismatch)
+            {
+                ereport(ERROR,
+                        (errcode(ERRCODE_DATATYPE_MISMATCH),
+                         errmsg("remote query result rowtype does not match "
+                                "the specified FROM clause rowtype")));
+            }
+            dblink_res_error(conname, res, "could not execute query", fail);
+            return (Datum) 0;
+        }    }
-    materializeResult(fcinfo, res);    return (Datum) 0;}
-/*
- * Materialize the PGresult to return them as the function result.
- * The res will be released in this function.
- */static void
-materializeResult(FunctionCallInfo fcinfo, PGresult *res)
+initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo){    ReturnSetInfo *rsinfo = (ReturnSetInfo *)
fcinfo->resultinfo;
-
-    Assert(rsinfo->returnMode == SFRM_Materialize);
-
-    PG_TRY();
+    TupleDesc    tupdesc;
+    int i;
+    
+    switch (get_call_result_type(fcinfo, NULL, &tupdesc))    {
-        TupleDesc    tupdesc;
-        bool        is_sql_cmd = false;
-        int            ntuples;
-        int            nfields;
+        case TYPEFUNC_COMPOSITE:
+            /* success */
+            break;
+        case TYPEFUNC_RECORD:
+            /* failed to determine actual type of RECORD */
+            ereport(ERROR,
+                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                     errmsg("function returning record called in context "
+                            "that cannot accept type record")));
+            break;
+        default:
+            /* result type isn't composite */
+            elog(ERROR, "return type must be a row type");
+            break;
+    }
+    
+    sinfo->oldcontext = MemoryContextSwitchTo(
+        rsinfo->econtext->ecxt_per_query_memory);
+
+    /* make sure we have a persistent copy of the tupdesc */
+    tupdesc = CreateTupleDescCopy(tupdesc);
+
+    sinfo->error_occurred = FALSE;
+    sinfo->nummismatch = FALSE;
+    sinfo->nattrs = tupdesc->natts;
+    sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+    sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+    sinfo->valbuf = (void **)malloc(sinfo->nattrs * sizeof(void *));
+    sinfo->valbufsize = (size_t *)malloc(sinfo->nattrs * sizeof(size_t));
+    for (i = 0 ; i < sinfo->nattrs ; i++)
+    {
+        sinfo->valbuf[i] = NULL;
+        sinfo->valbufsize[i] = 0;
+    }
-        if (PQresultStatus(res) == PGRES_COMMAND_OK)
-        {
-            is_sql_cmd = true;
-
-            /*
-             * need a tuple descriptor representing one TEXT column to return
-             * the command status string as our result tuple
-             */
-            tupdesc = CreateTemplateTupleDesc(1, false);
-            TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
-                               TEXTOID, -1, 0);
-            ntuples = 1;
-            nfields = 1;
-        }
-        else
-        {
-            Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+    /* Preallocate memory of same size with PGresAttDesc array for values. */
+    sinfo->attrvalbuf = (char *) malloc(sinfo->nattrs * sizeof(PGresAttValue));
-            is_sql_cmd = false;
+    rsinfo->setResult = sinfo->tuplestore;
+    rsinfo->setDesc = tupdesc;
+}
-            /* get a tuple descriptor for our result type */
-            switch (get_call_result_type(fcinfo, NULL, &tupdesc))
-            {
-                case TYPEFUNC_COMPOSITE:
-                    /* success */
-                    break;
-                case TYPEFUNC_RECORD:
-                    /* failed to determine actual type of RECORD */
-                    ereport(ERROR,
-                            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                        errmsg("function returning record called in context "
-                               "that cannot accept type record")));
-                    break;
-                default:
-                    /* result type isn't composite */
-                    elog(ERROR, "return type must be a row type");
-                    break;
-            }
+static void
+finishStoreInfo(storeInfo *sinfo)
+{
+    int i;
-            /* make sure we have a persistent copy of the tupdesc */
-            tupdesc = CreateTupleDescCopy(tupdesc);
-            ntuples = PQntuples(res);
-            nfields = PQnfields(res);
+    for (i = 0 ; i < sinfo->nattrs ; i++)
+    {
+        if (sinfo->valbuf[i])
+        {
+            free(sinfo->valbuf[i]);
+            sinfo->valbuf[i] = NULL;        }
+    }
+    if (sinfo->attrvalbuf)
+        free(sinfo->attrvalbuf);
+    sinfo->attrvalbuf = NULL;
+    MemoryContextSwitchTo(sinfo->oldcontext);
+}
-        /*
-         * check result and tuple descriptor have the same number of columns
-         */
-        if (nfields != tupdesc->natts)
-            ereport(ERROR,
-                    (errcode(ERRCODE_DATATYPE_MISMATCH),
-                     errmsg("remote query result rowtype does not match "
-                            "the specified FROM clause rowtype")));
+static void *
+addTuple(PGresult *res, AddTupFunc  func, int id, size_t size)
+{
+    storeInfo *sinfo = (storeInfo *)PQgetAddTupleParam(res);
+    HeapTuple    tuple;
+    int fields = PQnfields(res);
+    int i;
+    PGresAttValue *attval;
+    char        **cstrs;
-        if (ntuples > 0)
-        {
-            AttInMetadata *attinmeta;
-            Tuplestorestate *tupstore;
-            MemoryContext oldcontext;
-            int            row;
-            char      **values;
-
-            attinmeta = TupleDescGetAttInMetadata(tupdesc);
-
-            oldcontext = MemoryContextSwitchTo(
-                                    rsinfo->econtext->ecxt_per_query_memory);
-            tupstore = tuplestore_begin_heap(true, false, work_mem);
-            rsinfo->setResult = tupstore;
-            rsinfo->setDesc = tupdesc;
-            MemoryContextSwitchTo(oldcontext);
+    if (sinfo->error_occurred)
+        return NULL;
-            values = (char **) palloc(nfields * sizeof(char *));
+    switch (func)
+    {
+        case ADDTUP_ALLOC_TEXT:
+        case ADDTUP_ALLOC_BINARY:
+            if (id == -1)
+                return sinfo->attrvalbuf;
+
+            if (id < 0 || id >= sinfo->nattrs)
+                return NULL;
-            /* put all tuples into the tuplestore */
-            for (row = 0; row < ntuples; row++)
+            if (sinfo->valbufsize[id] < size)            {
-                HeapTuple    tuple;
+                if (sinfo->valbuf[id] == NULL)
+                    sinfo->valbuf[id] = malloc(size);
+                else
+                    sinfo->valbuf[id] = realloc(sinfo->valbuf[id], size);
+                sinfo->valbufsize[id] = size;
+            }
+            return sinfo->valbuf[id];
-                if (!is_sql_cmd)
-                {
-                    int            i;
+        case ADDTUP_ADD_TUPLE:
+            break;   /* Go through */
+        default:
+            /* Ignore */
+            break;
+    }
-                    for (i = 0; i < nfields; i++)
-                    {
-                        if (PQgetisnull(res, row, i))
-                            values[i] = NULL;
-                        else
-                            values[i] = PQgetvalue(res, row, i);
-                    }
-                }
-                else
-                {
-                    values[0] = PQcmdStatus(res);
-                }
+    if (sinfo->nattrs != fields)
+    {
+        sinfo->error_occurred = TRUE;
+        sinfo->nummismatch = TRUE;
+        finishStoreInfo(sinfo);
-                /* build the tuple and put it into the tuplestore. */
-                tuple = BuildTupleFromCStrings(attinmeta, values);
-                tuplestore_puttuple(tupstore, tuple);
-            }
+        PQsetAddTupleErrMes(res,
+                            strdup("function returning record called in "
+                                   "context that cannot accept type record"));
+        return NULL;
+    }
-            /* clean up and return the tuplestore */
-            tuplestore_donestoring(tupstore);
-        }
+    /*
+     * Rewrite PGresAttDesc[] to char(*)[] in-place.
+     */
+    Assert(sizeof(char*) <= sizeof(PGresAttValue));
+    attval = (PGresAttValue *)sinfo->attrvalbuf;
+    cstrs   = (char **)sinfo->attrvalbuf;
+    for(i = 0 ; i < fields ; i++)
+        cstrs[i] = PQgetAsCstring(attval++);
-        PQclear(res);
+    PG_TRY();
+    {
+        tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+        tuplestore_puttuple(sinfo->tuplestore, tuple);    }    PG_CATCH();    {
-        /* be sure to release the libpq result */
-        PQclear(res);
-        PG_RE_THROW();
+        /*
+         * Return the error message in the exception to the caller and
+         * cancel the exception.
+         */
+        ErrorData *edata;
+
+        sinfo->error_occurred = TRUE;
+        sinfo->nummismatch = TRUE;
+
+        finishStoreInfo(sinfo);
+
+        edata = CopyErrorData();
+        FlushErrorState();
+
+        PQsetAddTupleErrMes(res, strdup(edata->message));
+        return NULL;    }    PG_END_TRY();
+
+    return sinfo->attrvalbuf;}/*

pgsql-hackers by date:

Previous
From: Kyotaro HORIGUCHI
Date:
Subject: Re: Allow substitute allocators for PGresult.
Next
From: Greg Smith
Date:
Subject: Re: [PATCH] optional cleaning queries stored in pg_stat_statements