Re: Speed dblink using alternate libpq tuple storage - Mailing list pgsql-hackers

From Kyotaro HORIGUCHI
Subject Re: Speed dblink using alternate libpq tuple storage
Msg-id 20120117.175333.162644966.horiguchi.kyotaro@oss.ntt.co.jp
In response to Speed dblink using alternate libpq tuple storage  (Greg Smith <greg@2ndQuadrant.com>)
Responses Re: Speed dblink using alternate libpq tuple storage  (Marko Kreen <markokr@gmail.com>)
List pgsql-hackers
Hello,

This is a revised and rebased version of the patch.

a. The old term `Add Tuple Function' has been changed to `Store Handler'. I did not use `Storage' simply because of the
length of the resulting symbol names.

b. I couldn't find a suitable place to put PGgetAsCString(), so it has been removed; in this version, storeHandler() in
dblink.c accesses PGresAttValue directly. The definition of PGresAttValue stays in libpq-fe.h, now with a comment.
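
In the patch, storeHandler() in dblink.c reuses the PGresAttValue array that libpq fills in as the char * array that BuildTupleFromCStrings() expects, rewriting it in place. The following is a minimal standalone sketch of that rewrite, not code from the patch: the function name attvals_to_cstrings() is hypothetical, and the struct is a local copy of the definition the patch moves into libpq-fe.h so the example compiles without the patched headers. Each element is copied out before the corresponding output slot is written, and memcpy() is used for the element accesses to stay clear of strict-aliasing problems.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Local copy of the PGresAttValue layout the patch adds to libpq-fe.h. */
typedef struct pgresAttValue
{
    int         len;            /* length in bytes; < 0 means SQL NULL */
    char       *value;          /* NUL-terminated value */
} PGresAttValue;

/*
 * Hypothetical helper: turn nfields PGresAttValue entries stored in buf
 * into nfields C-string pointers (NULL for SQL NULL), reusing buf.
 *
 * Output slot i ends at (i + 1) * sizeof(char *), which is no further than
 * the start of input element i + 1, so writing slot i can only overwrite
 * elements 0..i, and element i has already been copied out by then.
 */
static char **
attvals_to_cstrings(void *buf, int nfields)
{
    unsigned char *bytes = buf;
    int         i;

    assert(sizeof(char *) <= sizeof(PGresAttValue));

    for (i = 0; i < nfields; i++)
    {
        PGresAttValue av;
        char       *cstr;

        /* copy element i out before slot i is written */
        memcpy(&av, bytes + i * sizeof(PGresAttValue), sizeof(av));
        cstr = (av.len < 0) ? NULL : av.value;
        memcpy(bytes + i * sizeof(char *), &cstr, sizeof(cstr));
    }
    return (char **) buf;
}
```

The buffer must be suitably aligned for char *, which holds for memory obtained from malloc(), such as the attrvalbuf the dblink patch preallocates.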

c. Refined the error handling in dblink.c. I believe it preserves the previous behavior for column-count mismatches and
type conversion exceptions.
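
Item c relies on the rule, stated in the StoreHandler comment, that the handler must not throw: dblink's storeHandler() catches any error inside PG_TRY/PG_CATCH, keeps a copy with CopyErrorData(), and returns NULL, and the error is rethrown with ReThrowError() only after PQexec() has returned. Below is a plain-C model of that flow, not backend code: setjmp()/longjmp() stand in for PG_TRY (which is built on sigsetjmp()), and every name in it (throw_error, convert_int, StoreCtx, store_row, fetch_all) is hypothetical.

```c
#include <assert.h>
#include <setjmp.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-ins for the backend's error machinery (ereport, PG_TRY). */
static jmp_buf *error_target = NULL;
static char error_text[128];

static void
throw_error(const char *msg)
{
    assert(error_target != NULL);   /* throwing with no handler is a bug */
    snprintf(error_text, sizeof(error_text), "%s", msg);
    longjmp(*error_target, 1);
}

/* Hypothetical conversion that can "throw", like BuildTupleFromCStrings(). */
static long
convert_int(const char *s)
{
    char       *end;
    long        v = strtol(s, &end, 10);

    if (*s == '\0' || *end != '\0')
        throw_error("invalid input syntax for integer");
    return v;
}

/* Handler context, loosely modelled on storeInfo in the dblink patch. */
typedef struct
{
    long        sum;                /* stands in for the tuplestore */
    bool        error_occurred;     /* once set, the handler refuses work */
    char        saved_error[128];   /* stands in for the copied ErrorData */
} StoreCtx;

/*
 * Per-row callback.  It must not let an error escape to its caller (libpq,
 * in the patch), so it catches the error, saves a copy and returns NULL.
 */
static void *
store_row(StoreCtx *ctx, const char *value)
{
    jmp_buf     local;
    jmp_buf    *save = error_target;

    if (ctx->error_occurred)
        return NULL;

    error_target = &local;
    if (setjmp(local) == 0)
    {
        ctx->sum += convert_int(value);
        error_target = save;
        return ctx;                 /* any non-NULL pointer means success */
    }

    /* like PG_CATCH: restore the outer handler, save the error, fail */
    error_target = save;
    ctx->error_occurred = true;
    snprintf(ctx->saved_error, sizeof(ctx->saved_error), "%s", error_text);
    return NULL;
}

/* Hypothetical driver in libpq's role: it stops at the first NULL. */
static bool
fetch_all(StoreCtx *ctx, const char *const *rows, int nrows)
{
    int         i;

    for (i = 0; i < nrows; i++)
        if (store_row(ctx, rows[i]) == NULL)
            return false;
    return true;
}
```

After a failed fetch_all(), the caller would report ctx.saved_error, which corresponds to dblink calling ReThrowError(storeinfo.edata) once PQexec() returns.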

d. The documentation has been revised.

> It jumped from 332K tuples/sec to 450K, a 35% gain, and had a
> lower memory footprint too.  Test methodology and those results
> are at
> http://archives.postgresql.org/pgsql-hackers/2011-12/msg00008.php

Disappointingly, re-measuring showed that the gain has become
smaller than that.

On CentOS 6.2, with all other conditions the same as in the
previous test, overall performance was higher; the libpq patch
alone cost 1.8%, and the gain from the full patch fell to 5.6%.
The reduction in memory usage, however, was unchanged.

Configuration          Time   Relative
Original             : 3.96s  100.0%
w/libpq patch        : 4.03s  101.8%
w/libpq+dblink patch : 3.74s   94.4%


The attachments are listed below.

libpq_altstore_20120117.patch - Allow alternative storage for libpq.

dblink_perf_20120117.patch - Modify dblink to use the alternative storage mechanism.

libpq_altstore_doc_20120117.patch - Documentation for libpq_altstore, shown as section "31.19. Alternative result storage".
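
For readers who want to see the callback protocol without building the patched libpq, here is a self-contained sketch that mimics the order in which the patched getAnotherTuple() calls a store handler for one row: one PQSF_ALLOC_BINARY request with id -1 for the PGresAttValue array, one PQSF_ALLOC_TEXT request per non-NULL column (id is the column number, size includes the terminating zero byte), and finally PQSF_ADD_TUPLE. It is an illustration under assumptions, not code from the patches: deliver_row(), counting_handler(), CountingStore, SketchHandler and MAX_COLS are hypothetical, and the handler receives its context pointer directly instead of a PGresult * plus PQgetStoreHandlerParam(), so nothing from libpq is needed.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Copied from the enum the patch adds to libpq-fe.h. */
typedef enum
{
    PQSF_ALLOC_TEXT,            /* unaligned memory for a text value */
    PQSF_ALLOC_BINARY,          /* aligned memory for a binary value */
    PQSF_ADD_TUPLE              /* tuple complete: store it */
} PQStoreFunc;

/* Local copy of the patch's PGresAttValue; len < 0 means SQL NULL. */
typedef struct pgresAttValue
{
    int         len;
    char       *value;
} PGresAttValue;

/* Simplified handler type: the patch's StoreHandler gets a PGresult *. */
typedef void *(*SketchHandler) (void *param, PQStoreFunc func,
                                int id, size_t size);

#define MAX_COLS 8              /* hypothetical limit for this sketch */

typedef struct
{
    PGresAttValue attvals[MAX_COLS];    /* handed out for id == -1 */
    char       *valbuf[MAX_COLS];       /* one reusable buffer per column */
    size_t      valbufsize[MAX_COLS];
    int         nfields;
    int         nrows;                  /* tuples stored so far */
    size_t      total_len;              /* bytes of non-NULL values seen */
} CountingStore;

static void *
counting_handler(void *param, PQStoreFunc func, int id, size_t size)
{
    CountingStore *st = param;

    switch (func)
    {
        case PQSF_ALLOC_TEXT:
        case PQSF_ALLOC_BINARY:
            if (id == -1)
            {
                if (size > sizeof(st->attvals))
                    return NULL;
                st->nfields = (int) (size / sizeof(PGresAttValue));
                return st->attvals;
            }
            if (id < 0 || id >= MAX_COLS)
                return NULL;
            if (st->valbufsize[id] < size)
            {
                char       *p = realloc(st->valbuf[id], size);
                if (p == NULL)
                    return NULL;        /* caller reports out of memory */
                st->valbuf[id] = p;
                st->valbufsize[id] = size;
            }
            return st->valbuf[id];
        case PQSF_ADD_TUPLE:
            for (int i = 0; i < st->nfields; i++)
                if (st->attvals[i].len >= 0)
                    st->total_len += (size_t) st->attvals[i].len;
            st->nrows++;
            return st;                  /* any non-NULL pointer is success */
    }
    return NULL;
}

/* Hypothetical driver: getAnotherTuple()'s call order for one text row. */
static int
deliver_row(SketchHandler h, void *param, const char *const *values, int n)
{
    PGresAttValue *tup;

    assert(n > 0);
    tup = h(param, PQSF_ALLOC_BINARY, -1, n * sizeof(PGresAttValue));
    if (tup == NULL)
        return -1;
    memset(tup, 0, n * sizeof(PGresAttValue));
    for (int i = 0; i < n; i++)
    {
        size_t      vlen;

        tup[i].len = -1;                /* SQL NULL unless a value follows */
        if (values[i] == NULL)
            continue;                   /* no allocation for NULL columns */
        vlen = strlen(values[i]);
        tup[i].value = h(param, PQSF_ALLOC_TEXT, i, vlen + 1);
        if (tup[i].value == NULL)
            return -1;
        memcpy(tup[i].value, values[i], vlen + 1);
        tup[i].len = (int) vlen;
    }
    return h(param, PQSF_ADD_TUPLE, 0, 0) != NULL ? 0 : -1;
}
```

Like storeHandler() in dblink.c, the sketch keeps one buffer per column and grows it with realloc() only when a longer value arrives, so each row is consumed as it arrives instead of accumulating in a PGresult. A real handler must also free those buffers afterwards, as finishStoreInfo() does in the patch.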


regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 1af8df6..83525e1 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -160,3 +160,6 @@ PQconnectStartParams      157
 PQping                    158
 PQpingParams              159
 PQlibVersion              160
+PQregisterStoreHandler      161
+PQgetStoreHandlerParam      163
+PQsetStoreHandlerErrMes      164
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index d454538..5559f0b 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2692,6 +2692,7 @@ makeEmptyPGconn(void)
     conn->allow_ssl_try = true;
     conn->wait_ssl_try = false;
 #endif
+    conn->storeHandler = NULL;
 
     /*
      * We try to send at least 8K at a time, which is the usual size of pipe
@@ -5076,3 +5077,10 @@ PQregisterThreadLock(pgthreadlock_t newhandler)
 
     return prev;
 }
+
+void
+PQregisterStoreHandler(PGconn *conn, StoreHandler func, void *param)
+{
+    conn->storeHandler = func;
+    conn->storeHandlerParam = param;
+}
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index b743566..96e5974 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -67,6 +67,10 @@ static int PQsendDescribe(PGconn *conn, char desc_type,
                const char *desc_target);
 static int    check_field_number(const PGresult *res, int field_num);
 
+static void *pqDefaultStoreHandler(PGresult *res, PQStoreFunc func,
+                                   int id, size_t len);
+static void *pqAddTuple(PGresult *res, PGresAttValue *tup);
+
 /* ----------------
  * Space management for PGresult.
@@ -160,6 +164,9 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
     result->curBlock = NULL;
     result->curOffset = 0;
     result->spaceLeft = 0;
+    result->storeHandler = pqDefaultStoreHandler;
+    result->storeHandlerParam = NULL;
+    result->storeHandlerErrMes = NULL;
 
     if (conn)
     {
@@ -194,6 +201,12 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
             }
             result->nEvents = conn->nEvents;
         }
+
+        if (conn->storeHandler)
+        {
+            result->storeHandler = conn->storeHandler;
+            result->storeHandlerParam = conn->storeHandlerParam;
+        }
     }
     else
     {
@@ -487,6 +500,33 @@ PQresultAlloc(PGresult *res, size_t nBytes)
     return pqResultAlloc(res, nBytes, TRUE);
 }
 
+void *
+pqDefaultStoreHandler(PGresult *res, PQStoreFunc func, int id, size_t len)
+{
+    void *p;
+
+    switch (func)
+    {
+        case PQSF_ALLOC_TEXT:
+            return pqResultAlloc(res, len, FALSE);
+
+        case PQSF_ALLOC_BINARY:
+            p = pqResultAlloc(res, len, TRUE);
+
+            if (id == -1)
+                res->storeHandlerParam = p;
+
+            return p;
+
+        case PQSF_ADD_TUPLE:
+            return pqAddTuple(res, res->storeHandlerParam);
+
+        default:
+            /* Ignore */
+            break;
+    }
+    return NULL;
+}
 /*
  * pqResultAlloc -
  *        Allocate subsidiary storage for a PGresult.
@@ -830,9 +870,9 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
 /*
  * pqAddTuple
  *      add a row pointer to the PGresult structure, growing it if necessary
- *      Returns TRUE if OK, FALSE if not enough memory to add the row
+ *      Returns tup if OK, NULL if not enough memory to add the row.
  */
-int
+static void *
 pqAddTuple(PGresult *res, PGresAttValue *tup)
 {
     if (res->ntups >= res->tupArrSize)
@@ -858,13 +898,13 @@ pqAddTuple(PGresult *res, PGresAttValue *tup)
             newTuples = (PGresAttValue **)
                 realloc(res->tuples, newSize * sizeof(PGresAttValue *));
         if (!newTuples)
-            return FALSE;        /* malloc or realloc failed */
+            return NULL;        /* malloc or realloc failed */
         res->tupArrSize = newSize;
         res->tuples = newTuples;
     }
     res->tuples[res->ntups] = tup;
     res->ntups++;
-    return TRUE;
+    return tup;
 }
 
 /*
@@ -2822,6 +2862,35 @@ PQgetisnull(const PGresult *res, int tup_num, int field_num)
         return 0;
 }
 
+/* PQgetStoreHandlerParam
+ *    Get the pointer to the contextual parameter from PGresult which is
+ *    registered to PGconn by PQregisterStoreHandler
+ */
+void *
+PQgetStoreHandlerParam(const PGresult *res)
+{
+    if (!res)
+        return NULL;
+    return res->storeHandlerParam;
+}
+
+/* PQsetStoreHandlerErrMes
+ *    Set the error message passed back to the caller of StoreHandler.
+ *
+ *  mes must be a malloc'ed memory block; it will be freed by the
+ *  caller of StoreHandler.  You can replace a previously set message
+ *  with a new mes, or clear it with NULL; the previous one is freed
+ *  internally.
+ */
+void
+PQsetStoreHandlerErrMes(PGresult *res, char *mes)
+{
+    /* Free existing message */
+    if (res->storeHandlerErrMes)
+        free(res->storeHandlerErrMes);
+    res->storeHandlerErrMes = mes;
+}
+
 /* PQnparams:
  *    returns the number of input parameters of a prepared statement.
  */
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index a7c3899..205502b 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -733,9 +733,10 @@ getAnotherTuple(PGconn *conn, bool binary)
     if (conn->curTuple == NULL)
     {
         conn->curTuple = (PGresAttValue *)
-            pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
+            result->storeHandler(result, PQSF_ALLOC_BINARY, -1,
+                                 nfields * sizeof(PGresAttValue));
         if (conn->curTuple == NULL)
-            goto outOfMemory;
+            goto addTupleError;
         MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
 
         /*
@@ -757,7 +758,7 @@ getAnotherTuple(PGconn *conn, bool binary)
     {
         bitmap = (char *) malloc(nbytes);
         if (!bitmap)
-            goto outOfMemory;
+            goto addTupleError;
     }
 
     if (pqGetnchar(bitmap, nbytes, conn))
@@ -787,9 +788,12 @@ getAnotherTuple(PGconn *conn, bool binary)
                 vlen = 0;
             if (tup[i].value == NULL)
             {
-                tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
+                PQStoreFunc func = 
+                    (binary ? PQSF_ALLOC_BINARY : PQSF_ALLOC_TEXT);
+                tup[i].value =
+                    (char *) result->storeHandler(result, func, i, vlen + 1);
                 if (tup[i].value == NULL)
-                    goto outOfMemory;
+                    goto addTupleError;
             }
             tup[i].len = vlen;
             /* read in the value */
@@ -812,8 +816,9 @@ getAnotherTuple(PGconn *conn, bool binary)
     }
 
     /* Success!  Store the completed tuple in the result */
-    if (!pqAddTuple(result, tup))
-        goto outOfMemory;
+    if (!result->storeHandler(result, PQSF_ADD_TUPLE, 0, 0))
+        goto addTupleError;
+
     /* and reset for a new message */
     conn->curTuple = NULL;
@@ -821,7 +826,7 @@ getAnotherTuple(PGconn *conn, bool binary)
         free(bitmap);
     return 0;
 
-outOfMemory:
+addTupleError:
     /* Replace partially constructed result with an error result */
 
     /*
@@ -829,8 +834,21 @@ outOfMemory:
      * there's not enough memory to concatenate messages...
      */
     pqClearAsyncResult(conn);
-    printfPQExpBuffer(&conn->errorMessage,
-                      libpq_gettext("out of memory for query result\n"));
+    resetPQExpBuffer(&conn->errorMessage);
+
+    /*
+     * If the store handler passed an error message, put it into
+     * PGconn; otherwise assume out of memory.
+     */
+    appendPQExpBufferStr(&conn->errorMessage,
+                         libpq_gettext(result->storeHandlerErrMes ?
+                                       result->storeHandlerErrMes :
+                                       "out of memory for query result\n"));
+    if (result->storeHandlerErrMes)
+    {
+        free(result->storeHandlerErrMes);
+        result->storeHandlerErrMes = NULL;
+    }
 
     /*
      * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 892dcbc..117c38a 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -634,9 +634,10 @@ getAnotherTuple(PGconn *conn, int msgLength)
     if (conn->curTuple == NULL)
     {
         conn->curTuple = (PGresAttValue *)
-            pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
+            result->storeHandler(result, PQSF_ALLOC_BINARY, -1,
+                                 nfields * sizeof(PGresAttValue));
         if (conn->curTuple == NULL)
-            goto outOfMemory;
+            goto addTupleError;
         MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
     }
     tup = conn->curTuple;
@@ -673,11 +674,12 @@ getAnotherTuple(PGconn *conn, int msgLength)
             vlen = 0;
         if (tup[i].value == NULL)
         {
-            bool        isbinary = (result->attDescs[i].format != 0);
-
-            tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
+            PQStoreFunc func = (result->attDescs[i].format != 0 ?
+                                PQSF_ALLOC_BINARY : PQSF_ALLOC_TEXT);
+            tup[i].value =
+                (char *) result->storeHandler(result, func, i, vlen + 1);
             if (tup[i].value == NULL)
-                goto outOfMemory;
+                goto addTupleError;
         }
         tup[i].len = vlen;
         /* read in the value */
@@ -689,22 +691,36 @@ getAnotherTuple(PGconn *conn, int msgLength)
     }
 
     /* Success!  Store the completed tuple in the result */
-    if (!pqAddTuple(result, tup))
-        goto outOfMemory;
+    if (!result->storeHandler(result, PQSF_ADD_TUPLE, 0, 0))
+        goto addTupleError;
+
     /* and reset for a new message */
     conn->curTuple = NULL;
     return 0;
 
-outOfMemory:
+addTupleError:
     /*
      * Replace partially constructed result with an error result. First
      * discard the old result to try to win back some memory.
      */
     pqClearAsyncResult(conn);
 
-    printfPQExpBuffer(&conn->errorMessage,
-                      libpq_gettext("out of memory for query result\n"));
+    resetPQExpBuffer(&conn->errorMessage);
+
+    /*
+     * If the store handler passed an error message, put it into
+     * PGconn; otherwise assume out of memory.
+     */
+    appendPQExpBufferStr(&conn->errorMessage,
+                         libpq_gettext(result->storeHandlerErrMes ?
+                                       result->storeHandlerErrMes : 
+                                       "out of memory for query result\n"));
+    if (result->storeHandlerErrMes)
+    {
+        free(result->storeHandlerErrMes);
+        result->storeHandlerErrMes = NULL;
+    }
     pqSaveErrorResult(conn);
 
     /* Discard the failed message by pretending we read it */
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index ef26ab9..6d86fa0 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -116,6 +116,16 @@ typedef enum
     PQPING_NO_ATTEMPT           /* connection not attempted (bad params) */
 } PGPing;
 
+/* PQStoreFunc is the enum for one of the parameters of storeHandler
+ * that decides what to do. See the typedef StoreHandler for
+ * details */
+typedef enum 
+{
+    PQSF_ALLOC_TEXT,          /* Requested non-aligned memory for text value */
+    PQSF_ALLOC_BINARY,        /* Requested aligned memory for binary value */
+    PQSF_ADD_TUPLE            /* Requested to add tuple data into store */
+} PQStoreFunc;
+
 /* PGconn encapsulates a connection to the backend.
  * The contents of this struct are not supposed to be known to applications.
  */
@@ -149,6 +159,15 @@ typedef struct pgNotify
     struct pgNotify *next;      /* list link */
 } PGnotify;
 
+/* PGresAttValue represents a value of one tuple field in string form.
+   NULL is represented as len < 0. Otherwise value points to a null
+   terminated C string with the length of len. */
+typedef struct pgresAttValue
+{
+    int            len;            /* length in bytes of the value */
+    char       *value;            /* actual value, plus terminating zero byte */
+} PGresAttValue;
+
 /* Function types for notice-handling callbacks */
 typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);
 typedef void (*PQnoticeProcessor) (void *arg, const char *message);
@@ -416,6 +435,52 @@ extern PGPing PQping(const char *conninfo);
 extern PGPing PQpingParams(const char *const * keywords,
              const char *const * values, int expand_dbname);
 
+/*
+ * Typedef for alternative result store handler.
+ *
+ * This function pointer is used for alternative result store handler
+ * callback in PGresult and PGconn.
+ *
+ * StoreHandler is called for three functions designated by the enum
+ * PQStoreFunc.
+ *
+ * id is the identifier for allocated memory block. The caller sets -1
+ * for PGresAttValue array, and 0 to number of cols - 1 for each
+ * column.
+ *
+ * PQSF_ALLOC_TEXT requests a memory block of size bytes for a text
+ * value, which need not be aligned to a word boundary.
+ *
+ * PQSF_ALLOC_BINARY requests a memory block of size bytes for a binary
+ * value, which must be aligned to a word boundary.
+ *
+ * PQSF_ADD_TUPLE requests to add tuple data into the result store,
+ * and free the memory blocks allocated by this function if necessary.
+ * id and size are to be ignored for this function.
+ *
+ * This function must return a non-NULL value on success. On failure it
+ * must return NULL, and it may set an error message with
+ * PQsetStoreHandlerErrMes; if no message is set, the caller assumes
+ * out of memory. This function must not throw any exception.
+ */
+typedef void *(*StoreHandler)(PGresult *res, PQStoreFunc func,
+                              int id, size_t size);
+
+/*
+ * Register alternative result store function to PGconn.
+ * 
+ * Once a handler is registered, PGresult no longer uses its own result
+ * store; instead it calls the handler to add rows one by one.
+ *
+ * func is the store handler function. See the typedef StoreHandler.
+ *
+ * storeHandlerParam is a context pointer that StoreHandler can
+ * retrieve with PQgetStoreHandlerParam.
+ */
+extern void PQregisterStoreHandler(PGconn *conn, StoreHandler func,
+                                   void *storeHandlerParam);
+
 /* Force the write buffer to be written (or at least try) */
 extern int    PQflush(PGconn *conn);
@@ -454,6 +519,8 @@ extern char *PQcmdTuples(PGresult *res);
 extern char *PQgetvalue(const PGresult *res, int tup_num, int field_num);
 extern int    PQgetlength(const PGresult *res, int tup_num, int field_num);
 extern int    PQgetisnull(const PGresult *res, int tup_num, int field_num);
+extern void *PQgetStoreHandlerParam(const PGresult *res);
+extern void    PQsetStoreHandlerErrMes(PGresult *res, char *mes);
 extern int    PQnparams(const PGresult *res);
 extern Oid    PQparamtype(const PGresult *res, int param_num);
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index d967d60..e28e712 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -134,12 +134,6 @@ typedef struct pgresParamDesc
 
 #define NULL_LEN        (-1)    /* pg_result len for NULL value */
 
-typedef struct pgresAttValue
-{
-    int            len;            /* length in bytes of the value */
-    char       *value;            /* actual value, plus terminating zero byte */
-} PGresAttValue;
-
 /* Typedef for message-field list entries */
 typedef struct pgMessageField
 {
@@ -209,6 +203,11 @@ struct pg_result
     PGresult_data *curBlock;    /* most recently allocated block */
     int            curOffset;        /* start offset of free space in block */
     int            spaceLeft;        /* number of free bytes remaining in block */
+
+    StoreHandler storeHandler;  /* Result store handler. See
+                                 * StoreHandler for details. */
+    void *storeHandlerParam;    /* Contextual parameter for storeHandler */
+    char *storeHandlerErrMes;   /* Error message from storeHandler */
 };
 
 /* PGAsyncStatusType defines the state of the query-execution state machine */
@@ -443,6 +442,13 @@ struct pg_conn
     /* Buffer for receiving various parts of messages */
     PQExpBufferData workBuffer; /* expansible string */
+
+    /* Tuple store handler. The two fields below are copied to each newly
+     * created PGresult if storeHandler is not NULL; the default handler
+     * is used if it is NULL. */
+    StoreHandler storeHandler;   /* Result store handler. See
+                                  * StoreHandler for details. */
+    void *storeHandlerParam;  /* Contextual parameter for storeHandler */
 };
 
 /* PGcancel stores all data necessary to cancel a connection. A copy of this
@@ -507,7 +513,6 @@ extern void
 pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
 /* This lets gcc check the format string for consistency. */
 __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
-extern int    pqAddTuple(PGresult *res, PGresAttValue *tup);
 extern void pqSaveMessageField(PGresult *res, char code,
                    const char *value);
 extern void pqSaveParameterStatus(PGconn *conn, const char *name,
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 36a8e3e..a8685a9 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -63,11 +63,24 @@ typedef struct remoteConn
     bool        newXactForCursor;        /* Opened a transaction for a cursor */
 } remoteConn;
 
+typedef struct storeInfo
+{
+    Tuplestorestate *tuplestore;
+    int nattrs;
+    AttInMetadata *attinmeta;
+    MemoryContext oldcontext;
+    char *attrvalbuf;
+    void **valbuf;
+    size_t *valbufsize;
+    bool error_occurred;
+    bool nummismatch;
+    ErrorData *edata;
+} storeInfo;
+
 /*
  * Internal declarations
  */
 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
-static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
 static remoteConn *getConnectionByName(const char *name);
 static HTAB *createConnHash(void);
 static void createNewConnection(const char *name, remoteConn *rconn);
@@ -90,6 +103,10 @@ static char *escape_param_str(const char *from);
 static void validate_pkattnums(Relation rel,
                    int2vector *pkattnums_arg, int32 pknumatts_arg,
                    int **pkattnums, int *pknumatts);
+static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo);
+static void finishStoreInfo(storeInfo *sinfo);
+static void *storeHandler(PGresult *res, PQStoreFunc func, int id, size_t size);
+
 /* Global */
 static remoteConn *pconn = NULL;
@@ -503,6 +520,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
     char       *curname = NULL;
     int            howmany = 0;
     bool        fail = true;    /* default to backward compatible */
+    storeInfo   storeinfo;
 
     DBLINK_INIT;
@@ -559,15 +577,36 @@ dblink_fetch(PG_FUNCTION_ARGS)
     appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
 
     /*
+     * Result is stored into storeinfo.tuplestore instead of
+     * res->result returned by PQexec below
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQregisterStoreHandler(conn, storeHandler, &storeinfo);
+
+    /*
      * Try to execute the query.  Note that since libpq uses malloc, the
      * PGresult will be long-lived even though we are still in a short-lived
      * memory context.
      */
     res = PQexec(conn, buf.data);
 
+    finishStoreInfo(&storeinfo);
+
     if (!res ||
         (PQresultStatus(res) != PGRES_COMMAND_OK &&
          PQresultStatus(res) != PGRES_TUPLES_OK))
     {
+        /* finishStoreInfo saves the fields referred to below. */
+        if (storeinfo.nummismatch)
+        {
+            /* This is only for backward compatibility */
+            ereport(ERROR,
+                    (errcode(ERRCODE_DATATYPE_MISMATCH),
+                     errmsg("remote query result rowtype does not match "
+                            "the specified FROM clause rowtype")));
+        }
+        else if (storeinfo.edata)
+            ReThrowError(storeinfo.edata);
+
         dblink_res_error(conname, res, "could not fetch from cursor", fail);
         return (Datum) 0;
     }
@@ -579,8 +618,8 @@ dblink_fetch(PG_FUNCTION_ARGS)
                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
                  errmsg("cursor \"%s\" does not exist", curname)));
     }
 
+    PQclear(res);
-    materializeResult(fcinfo, res);
     return (Datum) 0;
 }
@@ -640,6 +679,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
     remoteConn *rconn = NULL;
     bool        fail = true;    /* default to backward compatible */
     bool        freeconn = false;
+    storeInfo   storeinfo;
 
     /* check to see if caller supports us returning a tuplestore */
     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
@@ -715,164 +755,213 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
     rsinfo->setResult = NULL;
     rsinfo->setDesc = NULL;
+
+    /*
+     * Result is stored into storeinfo.tuplestore instead of
+     * res->result returned by PQexec/PQgetResult below
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQregisterStoreHandler(conn, storeHandler, &storeinfo);
+
     /* synchronous query, or async result retrieval */
     if (!is_async)
         res = PQexec(conn, sql);
     else
-    {
         res = PQgetResult(conn);
-        /* NULL means we're all done with the async results */
-        if (!res)
-            return (Datum) 0;
-    }
-    /* if needed, close the connection to the database and cleanup */
-    if (freeconn)
-        PQfinish(conn);
+    finishStoreInfo(&storeinfo);
-    if (!res ||
-        (PQresultStatus(res) != PGRES_COMMAND_OK &&
-         PQresultStatus(res) != PGRES_TUPLES_OK))
+    /* NULL res from async get means we're all done with the results */
+    if (res || !is_async)
     {
-        dblink_res_error(conname, res, "could not execute query", fail);
-        return (Datum) 0;
+        if (freeconn)
+            PQfinish(conn);
+
+        if (!res ||
+            (PQresultStatus(res) != PGRES_COMMAND_OK &&
+             PQresultStatus(res) != PGRES_TUPLES_OK))
+        {
+            /* finishStoreInfo saves the fields referred to below. */
+            if (storeinfo.nummismatch)
+            {
+                /* This is only for backward compatibility */
+                ereport(ERROR,
+                        (errcode(ERRCODE_DATATYPE_MISMATCH),
+                         errmsg("remote query result rowtype does not match "
+                                "the specified FROM clause rowtype")));
+            }
+            else if (storeinfo.edata)
+                ReThrowError(storeinfo.edata);
+
+            dblink_res_error(conname, res, "could not execute query", fail);
+            return (Datum) 0;
+        }
     }
+    PQclear(res);
-    materializeResult(fcinfo, res);
     return (Datum) 0;
 }
-/*
- * Materialize the PGresult to return them as the function result.
- * The res will be released in this function.
- */
 static void
-materializeResult(FunctionCallInfo fcinfo, PGresult *res)
+initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo)
 {
     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
-
-    Assert(rsinfo->returnMode == SFRM_Materialize);
-
-    PG_TRY();
+    TupleDesc    tupdesc;
+    int i;
+    
+    switch (get_call_result_type(fcinfo, NULL, &tupdesc))
     {
-        TupleDesc    tupdesc;
-        bool        is_sql_cmd = false;
-        int            ntuples;
-        int            nfields;
+        case TYPEFUNC_COMPOSITE:
+            /* success */
+            break;
+        case TYPEFUNC_RECORD:
+            /* failed to determine actual type of RECORD */
+            ereport(ERROR,
+                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                     errmsg("function returning record called in context "
+                            "that cannot accept type record")));
+            break;
+        default:
+            /* result type isn't composite */
+            elog(ERROR, "return type must be a row type");
+            break;
+    }
+    
+    sinfo->oldcontext = MemoryContextSwitchTo(
+        rsinfo->econtext->ecxt_per_query_memory);
+
+    /* make sure we have a persistent copy of the tupdesc */
+    tupdesc = CreateTupleDescCopy(tupdesc);
+
+    sinfo->error_occurred = FALSE;
+    sinfo->nummismatch = FALSE;
+    sinfo->edata = NULL;
+    sinfo->nattrs = tupdesc->natts;
+    sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+    sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+    sinfo->valbuf = (void **)malloc(sinfo->nattrs * sizeof(void *));
+    sinfo->valbufsize = (size_t *)malloc(sinfo->nattrs * sizeof(size_t));
+    for (i = 0 ; i < sinfo->nattrs ; i++)
+    {
+        sinfo->valbuf[i] = NULL;
+        sinfo->valbufsize[i] = 0;
+    }
-        if (PQresultStatus(res) == PGRES_COMMAND_OK)
-        {
-            is_sql_cmd = true;
-
-            /*
-             * need a tuple descriptor representing one TEXT column to return
-             * the command status string as our result tuple
-             */
-            tupdesc = CreateTemplateTupleDesc(1, false);
-            TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
-                               TEXTOID, -1, 0);
-            ntuples = 1;
-            nfields = 1;
-        }
-        else
-        {
-            Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+    /* Preallocate a buffer the size of the PGresAttValue array for values. */
+    sinfo->attrvalbuf = (char *) malloc(sinfo->nattrs * sizeof(PGresAttValue));
-            is_sql_cmd = false;
+    rsinfo->setResult = sinfo->tuplestore;
+    rsinfo->setDesc = tupdesc;
+}
-            /* get a tuple descriptor for our result type */
-            switch (get_call_result_type(fcinfo, NULL, &tupdesc))
-            {
-                case TYPEFUNC_COMPOSITE:
-                    /* success */
-                    break;
-                case TYPEFUNC_RECORD:
-                    /* failed to determine actual type of RECORD */
-                    ereport(ERROR,
-                            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                        errmsg("function returning record called in context "
-                               "that cannot accept type record")));
-                    break;
-                default:
-                    /* result type isn't composite */
-                    elog(ERROR, "return type must be a row type");
-                    break;
-            }
+static void
+finishStoreInfo(storeInfo *sinfo)
+{
+    int i;
-            /* make sure we have a persistent copy of the tupdesc */
-            tupdesc = CreateTupleDescCopy(tupdesc);
-            ntuples = PQntuples(res);
-            nfields = PQnfields(res);
+    for (i = 0 ; i < sinfo->nattrs ; i++)
+    {
+        if (sinfo->valbuf[i])
+        {
+            free(sinfo->valbuf[i]);
+            sinfo->valbuf[i] = NULL;
         }
+    }
+    if (sinfo->attrvalbuf)
+        free(sinfo->attrvalbuf);
+    sinfo->attrvalbuf = NULL;
+    MemoryContextSwitchTo(sinfo->oldcontext);
+}
-        /*
-         * check result and tuple descriptor have the same number of columns
-         */
-        if (nfields != tupdesc->natts)
-            ereport(ERROR,
-                    (errcode(ERRCODE_DATATYPE_MISMATCH),
-                     errmsg("remote query result rowtype does not match "
-                            "the specified FROM clause rowtype")));
+static void *
+storeHandler(PGresult *res, PQStoreFunc  func, int id, size_t size)
+{
+    storeInfo *sinfo = (storeInfo *)PQgetStoreHandlerParam(res);
+    HeapTuple    tuple;
+    int fields = PQnfields(res);
+    int i;
+    PGresAttValue *attval;
+    char        **cstrs;
-        if (ntuples > 0)
-        {
-            AttInMetadata *attinmeta;
-            Tuplestorestate *tupstore;
-            MemoryContext oldcontext;
-            int            row;
-            char      **values;
-
-            attinmeta = TupleDescGetAttInMetadata(tupdesc);
-
-            oldcontext = MemoryContextSwitchTo(
-                                    rsinfo->econtext->ecxt_per_query_memory);
-            tupstore = tuplestore_begin_heap(true, false, work_mem);
-            rsinfo->setResult = tupstore;
-            rsinfo->setDesc = tupdesc;
-            MemoryContextSwitchTo(oldcontext);
+    if (sinfo->error_occurred)
+        return NULL;
+
+    switch (func)
+    {
+        case PQSF_ALLOC_TEXT:
+        case PQSF_ALLOC_BINARY:
+            if (id == -1)
+                return sinfo->attrvalbuf;
-            values = (char **) palloc(nfields * sizeof(char *));
+            if (id < 0 || id >= sinfo->nattrs)
+                return NULL;
-            /* put all tuples into the tuplestore */
-            for (row = 0; row < ntuples; row++)
+            if (sinfo->valbufsize[id] < size)
             {
-                HeapTuple    tuple;
+                if (sinfo->valbuf[id] == NULL)
+                    sinfo->valbuf[id] = malloc(size);
+                else
+                    sinfo->valbuf[id] = realloc(sinfo->valbuf[id], size);
+                sinfo->valbufsize[id] = size;
+            }
+            return sinfo->valbuf[id];
-                if (!is_sql_cmd)
-                {
-                    int            i;
+        case PQSF_ADD_TUPLE:
+            break;   /* Go through */
+        default:
+            /* Ignore */
+            break;
+    }
-                    for (i = 0; i < nfields; i++)
-                    {
-                        if (PQgetisnull(res, row, i))
-                            values[i] = NULL;
-                        else
-                            values[i] = PQgetvalue(res, row, i);
-                    }
-                }
-                else
-                {
-                    values[0] = PQcmdStatus(res);
-                }
+    if (sinfo->nattrs != fields)
+    {
+        sinfo->error_occurred = TRUE;
+        sinfo->nummismatch = TRUE;
+        finishStoreInfo(sinfo);
-                /* build the tuple and put it into the tuplestore. */
-                tuple = BuildTupleFromCStrings(attinmeta, values);
-                tuplestore_puttuple(tupstore, tuple);
-            }
+        /* This error will be processed in
+         * dblink_record_internal(). So do not set error message
+         * here. */
+        return NULL;
+    }
-            /* clean up and return the tuplestore */
-            tuplestore_donestoring(tupstore);
-        }
+    /*
+     * Rewrite PGresAttValue[] to char(*)[] in-place.
+     */
+    Assert(sizeof(char*) <= sizeof(PGresAttValue));
-        PQclear(res);
+    attval = (PGresAttValue *)sinfo->attrvalbuf;
+    cstrs   = (char **)sinfo->attrvalbuf;
+    for (i = 0 ; i < fields ; i++)
+    {
+        /* earlier writes to cstrs[] end before attval[i], so it is still intact */
+        if (attval[i].len < 0)
+            cstrs[i] = NULL;
+        else
+            cstrs[i] = attval[i].value;
+    }
+
+    PG_TRY();
+    {
+        tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+        tuplestore_puttuple(sinfo->tuplestore, tuple);
+    }
+    PG_CATCH();
+    {
-        /* be sure to release the libpq result */
-        PQclear(res);
-        PG_RE_THROW();
+        MemoryContext context;
+        /*
+         * Store exception for later ReThrow and cancel the exception.
+         */
+        sinfo->error_occurred = TRUE;
+        context = MemoryContextSwitchTo(sinfo->oldcontext);
+        sinfo->edata = CopyErrorData();
+        MemoryContextSwitchTo(context);
+        FlushErrorState();
+
+        return NULL;
+    }
+    PG_END_TRY();
+
+    return sinfo->attrvalbuf;
+}
+/*
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 72c9384..8803999 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -7233,6 +7233,293 @@ int PQisthreadsafe();
 </sect1>
+ <sect1 id="libpq-alterstorage">
+  <title>Alternative result storage</title>
+
+  <indexterm zone="libpq-alterstorage">
+   <primary>PGresult</primary>
+   <secondary>PGconn</secondary>
+  </indexterm>
+
+  <para>
+   In the standard usage, the result of command execution is obtained
+   as a <structname>PGresult</structname> acquired
+   with <function>PQgetResult</function>
+   from a <structname>PGconn</structname>. The memory for
+   the <structname>PGresult</structname> is allocated internally with malloc() during calls of
+   command execution functions such as <function>PQexec</function>
+   and <function>PQgetResult</function>. If it is inconvenient to
+   handle the result records in the form of a PGresult, you can instruct
+   the PGconn to store them into your own storage instead.
+  </para>
+
+  <variablelist>
+   <varlistentry id="libpq-registerstorehandler">
+    <term>
+     <function>PQregisterStoreHandler</function>
+     <indexterm>
+      <primary>PQregisterStoreHandler</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+       Sets a callback function that allocates memory for each tuple and
+       its column values, and adds each completed tuple to the alternative
+       result storage.
+<synopsis>
+void PQregisterStoreHandler(PGconn *conn,
+                            StoreHandler func,
+                            void *param);
+</synopsis>
+     </para>
+     
+     <para>
+       <variablelist>
+     <varlistentry>
+       <term><parameter>conn</parameter></term>
+       <listitem>
+         <para>
+           The connection object on which to set the storage handler
+           function. A <structname>PGresult</structname> created from this connection calls this
+           function to store the result instead of storing it into its
+           internal storage.
+         </para>
+       </listitem>
+     </varlistentry>
+     <varlistentry>
+       <term><parameter>func</parameter></term>
+       <listitem>
+         <para>
+           Storage handler function to set. NULL means to use the
+           default storage.
+         </para>
+       </listitem>
+     </varlistentry>
+     <varlistentry>
+       <term><parameter>param</parameter></term>
+       <listitem>
+         <para>
+           A pointer to a contextual parameter passed
+           to <parameter>func</parameter>. Inside a <type>StoreHandler</type>
+           this pointer can be retrieved
+           with <function>PQgetStoreHandlerParam</function>.
+         </para>
+       </listitem>
+     </varlistentry>
+       </variablelist>
+     </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-storehandler">
+    <term>
+     <type>StoreHandler</type>
+     <indexterm>
+      <primary>StoreHandler</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+       The type for the storage handler callback function.
+<synopsis>
+typedef enum 
+{
+  PQSF_ALLOC_TEXT,
+  PQSF_ALLOC_BINARY,
+  PQSF_ADD_TUPLE
+} PQStoreFunc;
+
+typedef void *(*StoreHandler)(PGresult *res,
+                              PQStoreFunc func,
+                              int id,
+                              size_t size);
+</synopsis>
+     </para>
+
+     <para>
+       Generally this function must return NULL on failure and should
+       set the error message
+       with <function>PQsetStoreHandlerErrMes</function> if the cause
+       is something other than out of memory. This function must not throw any
+       exception. It is called in the following sequence:
+
+       <itemizedlist spacing="compact">
+     <listitem>
+       <simpara>Call with <parameter>func</parameter>
+       = <firstterm>PQSF_ALLOC_BINARY</firstterm>
+       and <parameter>id</parameter> = -1 to request the memory
+       for a tuple, to be used as an array
+       of <type>PGresAttValue</type>.</simpara>
+     </listitem>
+     <listitem>
+       <simpara>Calls with <parameter>func</parameter>
+       = <firstterm>PQSF_ALLOC_TEXT</firstterm>
+       or <firstterm>PQSF_ALLOC_BINARY</firstterm>
+       and <parameter>id</parameter> ranging from zero to the number of
+       columns minus one, to request the memory for each column value in
+       the current tuple.</simpara>
+     </listitem>
+     <listitem>
+       <simpara>Call with <parameter>func</parameter>
+       = <firstterm>PQSF_ADD_TUPLE</firstterm> to request that the
+       constructed tuple be stored.</simpara>
+     </listitem>
+       </itemizedlist>
+     </para>
+     <para>
+       Calling <type>StoreHandler</type>
+       with <parameter>func</parameter> =
+       <firstterm>PQSF_ALLOC_TEXT</firstterm> requests a
+       memory block of at least <parameter>size</parameter> bytes,
+       which need not be aligned to a word boundary.
+       <parameter>id</parameter> is a zero or positive number that
+       identifies the use of the requested memory block, namely the
+       position of the column for which the memory block is used.
+     </para>
+     <para>
+       When <parameter>func</parameter>
+       = <firstterm>PQSF_ALLOC_BINARY</firstterm>, this function is
+       asked to return a memory block of at
+       least <parameter>size</parameter> bytes that is aligned to a
+       word boundary.
+       <parameter>id</parameter> identifies the
+       use of the requested memory block: -1 means that it is used as an
+       array of <type>PGresAttValue</type> to store the tuple. Zero or
+       positive numbers have the same meanings as for
+       <firstterm>PQSF_ALLOC_TEXT</firstterm>.
+     </para>
+     <para>When <parameter>func</parameter>
+       = <firstterm>PQSF_ADD_TUPLE</firstterm>, this function is
+       asked to store the tuple, an array of <type>PGresAttValue</type>
+       constructed by the caller, into your storage. The pointer to the
+       tuple is not passed, so you should remember the
+       pointer to the memory block returned to the caller for
+       <parameter>func</parameter>
+       = <firstterm>PQSF_ALLOC_BINARY</firstterm>
+       with <parameter>id</parameter> = -1. This function must return
+       a non-NULL value on success. If needed, it must also properly
+       reclaim the memory blocks previously handed to the caller.
+     </para>
+     <variablelist>
+       <varlistentry>
+     <term><parameter>res</parameter></term>
+     <listitem>
+       <para>
+         A pointer to the <type>PGresult</type> object.
+       </para>
+     </listitem>
+       </varlistentry>
+       <varlistentry>
+     <term><parameter>func</parameter></term>
+     <listitem>
+       <para>
+         An <type>enum</type> value telling which operation to perform.
+       </para>
+     </listitem>
+       </varlistentry>
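+       <varlistentry>
+     <term><parameter>id</parameter></term>
+     <listitem>
+       <para>
+         For <firstterm>PQSF_ALLOC_TEXT</firstterm>
+         and <firstterm>PQSF_ALLOC_BINARY</firstterm>, identifies the use
+         of the requested memory block: -1 for the array
+         of <type>PGresAttValue</type> holding a tuple, or the zero-based
+         position of a column.
+       </para>
+     </listitem>
+       </varlistentry>
+       <varlistentry>
+     <term><parameter>size</parameter></term>
+     <listitem>
+       <para>
+         For <firstterm>PQSF_ALLOC_TEXT</firstterm>
+         and <firstterm>PQSF_ALLOC_BINARY</firstterm>, the minimum size in
+         bytes of the requested memory block.
+       </para>
+     </listitem>
+       </varlistentry>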
+     </variablelist>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqgetstorehandlerparam">
+    <term>
+     <function>PQgetStoreHandlerParam</function>
+     <indexterm>
+      <primary>PQgetStoreHandlerParam</primary>
+     </indexterm>
+    </term>
+    <listitem>
+      <para>
+    Get the pointer passed to <function>PQregisterStoreHandler</function>
+    as <parameter>param</parameter>.
+<synopsis>
+void *PQgetStoreHandlerParam(PGresult *res);
+</synopsis>
+      </para>
+      <para>
+    <variablelist>
+      <varlistentry>
+        <term><parameter>res</parameter></term>
+        <listitem>
+          <para>
+        A pointer to the <type>PGresult</type> object.
+          </para>
+        </listitem>
+      </varlistentry>
+    </variablelist>
+      </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqsetstorehandlererrmes">
+    <term>
+     <function>PQsetStoreHandlerErrMes</function>
+     <indexterm>
+      <primary>PQsetStoreHandlerErrMes</primary>
+     </indexterm>
+    </term>
+    <listitem>
+      <para>
+    Sets the message for an error that occurred
+    in <type>StoreHandler</type>.  If no message is set, the
+    caller assumes that the error was out of memory.
+<synopsis>
+void PQsetStoreHandlerErrMes(PGresult *res, char *mes);
+</synopsis>
+      </para>
+      <para>
+    <variablelist>
+      <varlistentry>
+        <term><parameter>res</parameter></term>
+        <listitem>
+          <para>
+        A pointer to the <type>PGresult</type> object
+        passed to <type>StoreHandler</type>.
+          </para>
+        </listitem>
+      </varlistentry>
+      <varlistentry>
+        <term><parameter>mes</parameter></term>
+        <listitem>
+          <para>
+        A pointer to the memory block containing the error
+        message, which must be allocated
+        by <function>malloc()</function>. The memory block
+        will be freed with <function>free()</function> by the
+        caller of
+        <type>StoreHandler</type>, but only if the handler returns NULL.
+          </para>
+          <para>
+        If <parameter>res</parameter> already has a previously set
+        message, that message is freed and replaced by the given one.
+        Pass NULL to cancel the custom message.
+          </para>
+        </listitem>
+      </varlistentry>
+    </variablelist>
+      </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+ </sect1>
+
+ <sect1 id="libpq-build">
  <title>Building <application>libpq</application> Programs</title>
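The dblink `storeHandler()` in the patch above reuses the `attrvalbuf` block in place, turning the `PGresAttValue` array into the `char *` array that `BuildTupleFromCStrings()` expects. The standalone sketch below isolates that one step so it can be compiled and checked outside PostgreSQL. It assumes `PGresAttValue` has the `{ int len; char *value; }` layout of libpq-int.h (re-declared locally here), and `attvals_to_cstrings` is a hypothetical helper name, not part of the patch.

```c
#include <assert.h>
#include <stdlib.h>

/* Local stand-in mirroring PGresAttValue in libpq-int.h. */
typedef struct
{
    int   len;      /* value length in bytes; negative for SQL NULL */
    char *value;    /* NUL-terminated value */
} PGresAttValue;

/*
 * Hypothetical helper: rewrite attval[0..nfields-1] in place into an array
 * of C-string pointers (NULL for SQL NULL) and return the same buffer.
 *
 * Safe because sizeof(char *) <= sizeof(PGresAttValue): cstrs[i] ends at or
 * before the start of attval[i + 1], so no write clobbers an entry that has
 * not been read yet.  The buffer should be heap memory rather than a
 * declared PGresAttValue array, so that storing pointers into it does not
 * conflict with a declared type.
 */
static char **
attvals_to_cstrings(PGresAttValue *attval, int nfields)
{
    char **cstrs = (char **) attval;
    int    i;

    assert(sizeof(char *) <= sizeof(PGresAttValue));   /* as in the patch */
    for (i = 0; i < nfields; i++)
    {
        int   len = attval[i].len;      /* read attval[i] first: cstrs[i] */
        char *value = attval[i].value;  /* may overlap part of it */

        cstrs[i] = (len < 0) ? NULL : value;
    }
    return cstrs;
}
```

Reusing the buffer avoids a second allocation per tuple, at the cost of relying on the size relationship that the `Assert` checks.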

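To make the documented `StoreHandler` calling sequence concrete, here is a minimal, self-contained sketch. Because `PQregisterStoreHandler` and its companions exist only in this patch, the sketch re-declares local stand-ins for `PGresAttValue`, `PQStoreFunc`, `StoreHandler`, `PGresult` and `PQgetStoreHandlerParam`. `MyStore`, `grow`, `my_store_handler` and `simulate_rows` are hypothetical names. `simulate_rows` only imitates the order in which the documentation says libpq calls the handler. It passes 0 for `id` and `size` on `PQSF_ADD_TUPLE`, and marks SQL NULL with `len = -1` (the convention the dblink handler tests with `len < 0`). Instead of copying rows, the handler counts tuples and sums value lengths, reusing one buffer per column.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Local stand-ins for declarations from this patch (not in released libpq). */
typedef struct { int len; char *value; } PGresAttValue;    /* mirrors libpq-int.h */
typedef enum { PQSF_ALLOC_TEXT, PQSF_ALLOC_BINARY, PQSF_ADD_TUPLE } PQStoreFunc;
typedef struct { void *param; } PGresult;                   /* real PGresult is opaque */
typedef void *(*StoreHandler)(PGresult *res, PQStoreFunc func, int id, size_t size);
static void *PQgetStoreHandlerParam(PGresult *res) { return res->param; }

/* Hypothetical application storage: counts rows instead of copying them. */
typedef struct
{
    int     nattrs;
    void   *tuple;       /* block handed out for id == -1 */
    size_t  tuplesize;
    void  **colbuf;      /* one reusable buffer per column */
    size_t *colsize;
    long    ntuples;     /* tuples accepted so far */
    long    totalbytes;  /* total length of non-NULL values */
} MyStore;

/* Grow *buf to at least size bytes; malloc'd memory suits any alignment. */
static void *
grow(void **buf, size_t *cap, size_t size)
{
    if (*cap < size)
    {
        void *p = realloc(*buf, size);
        if (p == NULL)
            return NULL;    /* no message set: caller assumes out of memory */
        *buf = p;
        *cap = size;
    }
    return *buf;
}

static void *
my_store_handler(PGresult *res, PQStoreFunc func, int id, size_t size)
{
    MyStore       *st = PQgetStoreHandlerParam(res);
    PGresAttValue *tup;
    int            i;

    switch (func)
    {
        case PQSF_ALLOC_TEXT:
        case PQSF_ALLOC_BINARY:
            if (id == -1)       /* the PGresAttValue array for one tuple */
                return grow(&st->tuple, &st->tuplesize, size);
            if (id < 0 || id >= st->nattrs)
                return NULL;
            return grow(&st->colbuf[id], &st->colsize[id], size);
        case PQSF_ADD_TUPLE:
            /* The tuple is not passed in: use the id == -1 block handed out. */
            tup = st->tuple;
            if (tup == NULL)
                return NULL;
            for (i = 0; i < st->nattrs; i++)
                if (tup[i].len >= 0)
                    st->totalbytes += tup[i].len;
            st->ntuples++;
            return st;          /* any non-NULL value means success */
    }
    return NULL;
}

/* Hypothetical stand-in for libpq: feeds rows of text values (NULL = SQL NULL)
 * through the handler in the documented order; returns 0, or -1 on failure. */
static int
simulate_rows(PGresult *res, StoreHandler h, int nfields,
              const char *const *vals, int nrows)
{
    assert(h != NULL && nfields > 0);
    for (int r = 0; r < nrows; r++)
    {
        PGresAttValue *tup = h(res, PQSF_ALLOC_BINARY, -1,
                               (size_t) nfields * sizeof(PGresAttValue));
        if (tup == NULL)
            return -1;
        for (int i = 0; i < nfields; i++)
        {
            const char *v = vals[r * nfields + i];
            size_t      len = v ? strlen(v) : 0;
            char       *buf = v ? h(res, PQSF_ALLOC_TEXT, i, len + 1) : NULL;

            if (v && buf == NULL)
                return -1;
            if (v)
                memcpy(buf, v, len + 1);
            tup[i].len = v ? (int) len : -1;   /* -1: this sketch's NULL marker */
            tup[i].value = buf;
        }
        if (h(res, PQSF_ADD_TUPLE, 0, 0) == NULL)  /* id/size unused here */
            return -1;
    }
    return 0;
}
```

With the actual patch, the handler would instead be installed with `PQregisterStoreHandler(conn, my_store_handler, &store)` before running the query, and the application would free `tuple`, `colbuf[]` and `colsize` itself afterwards. On allocation failure the handler returns NULL without calling `PQsetStoreHandlerErrMes`, which the documentation says the caller treats as out of memory.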