Re: Speed dblink using alternate libpq tuple storage - Mailing list pgsql-hackers

From Kyotaro HORIGUCHI
Subject Re: Speed dblink using alternate libpq tuple storage
Date
Msg-id 20120130.180657.220412574.horiguchi.kyotaro@oss.ntt.co.jp
Whole thread Raw
In response to Re: Speed dblink using alternate libpq tuple storage  (Marko Kreen <markokr@gmail.com>)
Responses Re: Speed dblink using alternate libpq tuple storage  (Marko Kreen <markokr@gmail.com>)
List pgsql-hackers
Thank you for the comments; this is the revised version of the patch.

The performance gain is larger than expected. The measurement script now
runs the query via dblink ten times to stabilize the measurement, so
the figures are about ten times larger than the previous ones.
                      sec    % to Original
Original             : 31.5     100.0%
RowProcessor patch   : 31.3      99.4%
dblink patch         : 24.6      78.1%

The RowProcessor patch alone causes no loss (or a very small gain), and
the full patch gives a 22% gain on the benchmark (*1).


The modifications are listed below.


- PGresAttValue is no longer used for this mechanism; PGrowValue has been added instead. PGresAttValue has been moved back to
libpq-int.h.

- pqAddTuple() is restored to its original form, and a new function pqAddRow() is added to serve as the default RowProcessor. (The previous pqAddTuple
implementation had buggily mixed the two usages of PGresAttValue.)
 

- PQgetRowProcessorParam has been dropped. The contextual parameter is now passed as one of the arguments of RowProcessor().

- RowProcessor() returns int (used as a bool; is that the libpq convention?) instead of void *. (Actually, void * had already
become useless as of the previous patch.)
 

- PQsetRowProcessorErrMes() is changed to do strdup internally.

- The callers of RowProcessor() no longer set null_field in PGrowValue.value. In addition, the PGrowValue[] array that RowProcessor()
receives has nfields + 1 elements, so that the size of the row data can be roughly estimated as
columns[nfields].value - columns[0].value - something. The "something" here is 4 * nfields for protocol 3 and
4 * (number of non-null fields) for protocol 2. I fear that this applies only to textual transfer...
 

- PQregisterRowProcessor() sets the default handler when given NULL, so (pg_conn|pg_result).rowProcessor can never be NULL
during its lifetime.
 

- initStoreInfo() and storeHandler() now handle malloc errors.


And more..

- getAnotherTuple() in fe-protocol2.c has not been tested at all.

- Because every column in the test data has the same size, the realloc path in dblink is never exercised... More testing should
be done.
 

regards,

=====
(*1) The benchmark is done as follows,

==test.sql
select dblink_connect('c', 'host=localhost dbname=test');
select * from dblink('c', 'select a,c from foo limit 2000000') as (a text, b bytea) limit 1;
...(repeat 9 times more)
select dblink_disconnect('c');
==

$ for i in $(seq 1 10); do time psql test -f test.sql; done

The environment is CentOS 6.2 on VirtualBox on a Core i7 965 3.2GHz
  # of processors:  1
  Allocated mem:    2GB

The test DB schema is
 Column | Type  | Modifiers
--------+-------+-----------
 a      | text  |
 b      | text  |
 c      | bytea |
Indexes:
    "foo_a_bt" btree (a)
    "foo_c_bt" btree (c)
 

test=# select count(*),
              min(length(a)) as a_min, max(length(a)) as a_max,
              min(length(c)) as c_min, max(length(c)) as c_max from foo;

  count  | a_min | a_max | c_min | c_max
---------+-------+-------+-------+-------
 2000000 |    29 |    29 |    29 |    29
(1 row)

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 1af8df6..5ed083c 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -160,3 +160,5 @@ PQconnectStartParams      157PQping                    158PQpingParams              159PQlibVersion
            160
 
+PQregisterRowProcessor      161
+PQsetRowProcessorErrMes      162
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index d454538..4fe2f41 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2692,6 +2692,8 @@ makeEmptyPGconn(void)    conn->allow_ssl_try = true;    conn->wait_ssl_try = false;#endif
+    conn->rowProcessor = pqAddRow;
+    conn->rowProcessorParam = NULL;    /*     * We try to send at least 8K at a time, which is the usual size of pipe
@@ -5076,3 +5078,21 @@ PQregisterThreadLock(pgthreadlock_t newhandler)    return prev;}
+
+/*
+ * PQregisterRowProcessor
+ *    Install func as the row processor that PGresults created on conn
+ *    will use, with param passed to it as its contextual parameter.
+ *
+ * A NULL func restores the default processor (pqAddRow), so
+ * conn->rowProcessor is never NULL.  A NULL conn is ignored.
+ */
+void
+PQregisterRowProcessor(PGconn *conn, RowProcessor func, void *param)
+{
+    if (conn == NULL)
+        return;
+
+    conn->rowProcessor = (func ? func : pqAddRow);
+    conn->rowProcessorParam = param;
+}
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index b743566..82914fd 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -66,6 +66,7 @@ static PGresult *PQexecFinish(PGconn *conn);static int PQsendDescribe(PGconn *conn, char desc_type,
           const char *desc_target);static int    check_field_number(const PGresult *res, int field_num);
 
+static int    pqAddTuple(PGresult *res, PGresAttValue *tup);/* ----------------
@@ -160,6 +161,9 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)    result->curBlock = NULL;
result->curOffset= 0;    result->spaceLeft = 0;
 
+    result->rowProcessor = pqAddRow;
+    result->rowProcessorParam = NULL;
+    result->rowProcessorErrMes = NULL;    if (conn)    {
@@ -194,6 +198,10 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)            }            result->nEvents =
conn->nEvents;       }
 
+
+        /* copy row processor settings */
+        result->rowProcessor = conn->rowProcessor;
+        result->rowProcessorParam = conn->rowProcessorParam;    }    else    {
@@ -701,7 +709,6 @@ pqClearAsyncResult(PGconn *conn)    if (conn->result)        PQclear(conn->result);    conn->result
=NULL;
 
-    conn->curTuple = NULL;}/*
@@ -756,7 +763,6 @@ pqPrepareAsyncResult(PGconn *conn)     */    res = conn->result;    conn->result = NULL;        /*
handingover ownership to caller */
 
-    conn->curTuple = NULL;        /* just in case */    if (!res)        res = PQmakeEmptyPGresult(conn,
PGRES_FATAL_ERROR);   else
 
@@ -828,9 +834,54 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)}/*
+ * pqAddRow
+ *      Default RowProcessor: add a row to the PGresult structure,
+ *      growing it if necessary.  Each non-null value in columns[] is
+ *      copied into space obtained from pqResultAlloc and null-terminated
+ *      there; param is unused.
+ *      Returns TRUE if OK, FALSE if not enough memory to add the row.
+ */
+int
+pqAddRow(PGresult *res, void *param, PGrowValue *columns)
+{
+    PGresAttValue *tup;
+    int nfields = res->numAttributes;
+    int i;
+
+    tup = (PGresAttValue *)
+        pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE);
+    if (tup == NULL)
+        return FALSE;
+
+    for (i = 0; i < nfields; i++)
+    {
+        tup[i].len = columns[i].len;
+        if (tup[i].len == NULL_LEN)
+        {
+            tup[i].value = res->null_field;
+        }
+        else
+        {
+            bool isbinary = (res->attDescs[i].format != 0);
+            tup[i].value =
+                (char *)pqResultAlloc(res, tup[i].len + 1, isbinary);
+            if (tup[i].value == NULL)
+                return FALSE;
+
+            memcpy(tup[i].value, columns[i].value, tup[i].len);
+            /* We have to terminate this ourselves */
+            tup[i].value[tup[i].len] = '\0';
+        }
+    }
+
+    return pqAddTuple(res, tup);
+}
+
+/* * pqAddTuple
- *      add a row pointer to the PGresult structure, growing it if necessary
- *      Returns TRUE if OK, FALSE if not enough memory to add the row
+ *      add a row POINTER to the PGresult structure, growing it if
+ *      necessary.  Returns TRUE if OK, FALSE if not enough memory to add
+ *      the row.
  */
 int
 pqAddTuple(PGresult *res, PGresAttValue *tup)
@@ -1223,7 +1274,6 @@ PQsendQueryStart(PGconn *conn)    /* initialize async result-accumulation state */
conn->result= NULL;
 
-    conn->curTuple = NULL;    /* ready to send command message */    return true;
@@ -2822,6 +2872,34 @@ PQgetisnull(const PGresult *res, int tup_num, int field_num)        return 0;}
+/* PQsetRowProcessorErrMes
+ *    Set the error message passed back to the caller of RowProcessor.
+ *
+ *  You can replace the previous message by alternative mes, or clear
+ *  it with NULL.  mes is copied, so the caller keeps ownership of it.
+ *  A NULL res is ignored.
+ */
+void
+PQsetRowProcessorErrMes(PGresult *res, char *mes)
+{
+    if (res == NULL)
+        return;
+
+    /* Free existing message */
+    if (res->rowProcessorErrMes)
+        free(res->rowProcessorErrMes);
+
+    /*
+     * Set the duped message if mes is not NULL. Failure of strdup
+     * will be handled as 'Out of memory' by the caller of the
+     * RowProcessor.
+     */
+    if (mes)
+        res->rowProcessorErrMes = strdup(mes);
+    else
+        res->rowProcessorErrMes = NULL;
+}
+/* PQnparams: *    returns the number of input parameters of a prepared statement. */
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index ce0eac3..d11cb3c 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -219,6 +219,28 @@ pqGetnchar(char *s, size_t len, PGconn *conn)}/*
+ * pqSkipnchar:
+ *    skip len bytes in input buffer.
+ *
+ *  Only inCursor is advanced; nothing is copied.  Returns EOF, leaving
+ *  the cursor unchanged, if fewer than len bytes remain; 0 otherwise.
+ */
+int
+pqSkipnchar(size_t len, PGconn *conn)
+{
+    if (len > (size_t) (conn->inEnd - conn->inCursor))
+        return EOF;
+
+    conn->inCursor += len;
+
+    if (conn->Pfdebug)
+        fprintf(conn->Pfdebug, "From backend (%lu skipped)\n",
+                (unsigned long) len);
+
+    return 0;
+}
+
+/* * pqPutnchar: *    write exactly len bytes to the current message */
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index a7c3899..496c42e 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -715,7 +715,9 @@ getAnotherTuple(PGconn *conn, bool binary){    PGresult   *result = conn->result;    int
nfields= result->numAttributes;
 
-    PGresAttValue *tup;
+    /* XXX: variable-length array; not C89, and MSVC does not support it */
+    PGrowValue  rowval[result->numAttributes + 1];
+    char       *errmes;         /* RowProcessor message, detached on error */
     /* the backend sends us a bitmap of which attributes are null */
  char        std_bitmap[64]; /* used unless it doesn't fit */
 
@@ -729,26 +731,15 @@ getAnotherTuple(PGconn *conn, bool binary)    result->binary = binary;
-    /* Allocate tuple space if first time for this data message */
-    if (conn->curTuple == NULL)
+    /*
+     * If it's binary, fix the column format indicators.  We assume the
+     * backend will consistently send either B or D, not a mix.
+     */
+    if (binary)    {
-        conn->curTuple = (PGresAttValue *)
-            pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
-        if (conn->curTuple == NULL)
-            goto outOfMemory;
-        MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
-
-        /*
-         * If it's binary, fix the column format indicators.  We assume the
-         * backend will consistently send either B or D, not a mix.
-         */
-        if (binary)
-        {
-            for (i = 0; i < nfields; i++)
-                result->attDescs[i].format = 1;
-        }
+        for (i = 0; i < nfields; i++)
+            result->attDescs[i].format = 1;    }
-    tup = conn->curTuple;    /* Get the null-value bitmap */    nbytes = (nfields + BITS_PER_BYTE - 1) /
BITS_PER_BYTE;
@@ -757,7 +748,7 @@ getAnotherTuple(PGconn *conn, bool binary)    {        bitmap = (char *) malloc(nbytes);        if
(!bitmap)
-            goto outOfMemory;
+            goto rowProcessError;    }    if (pqGetnchar(bitmap, nbytes, conn))
@@ -771,34 +762,31 @@ getAnotherTuple(PGconn *conn, bool binary)    for (i = 0; i < nfields; i++)    {        if
(!(bmap& 0200))
 
-        {
-            /* if the field value is absent, make it a null string */
-            tup[i].value = result->null_field;
-            tup[i].len = NULL_LEN;
-        }
+            vlen = NULL_LEN;
+        else if (pqGetInt(&vlen, 4, conn))
+            goto EOFexit;
         else
         {
-            /* get the value length (the first four bytes are for length) */
-            if (pqGetInt(&vlen, 4, conn))
-                goto EOFexit;            if (!binary)                vlen = vlen - 4;            if (vlen < 0)
      vlen = 0;
 
-            if (tup[i].value == NULL)
-            {
-                tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
-                if (tup[i].value == NULL)
-                    goto outOfMemory;
-            }
-            tup[i].len = vlen;
-            /* read in the value */
-            if (vlen > 0)
-                if (pqGetnchar((char *) (tup[i].value), vlen, conn))
-                    goto EOFexit;
-            /* we have to terminate this ourselves */
-            tup[i].value[vlen] = '\0';        }
+
+        /*
+         * Buffer content may be shifted on reloading additional
+         * data. So we must set all pointers on every scan.
+         *
+         * rowval[i].value always points to the next address of the
+         * length field even if the value length is zero or the value
+         * is NULL for the access safety.
+         */
+        rowval[i].value = conn->inBuffer + conn->inCursor;
+        rowval[i].len = vlen;
+        /* Skip the value */
+        if (vlen > 0 && pqSkipnchar(vlen, conn))
+            goto EOFexit;
+        /* advance the bitmap stuff */        bitcnt++;        if (bitcnt == BITS_PER_BYTE)
@@ -811,17 +799,40 @@ getAnotherTuple(PGconn *conn, bool binary)            bmap <<= 1;    }
-    /* Success!  Store the completed tuple in the result */
-    if (!pqAddTuple(result, tup))
-        goto outOfMemory;
-    /* and reset for a new message */
-    conn->curTuple = NULL;
+    /*
+     * Set rowval[nfields] for the access safety. We can estimate the
+     * length of the buffer to store by
+     *
+     *    rowval[nfields].value - rowval[0].value - 4 * (# of non-nulls).
+     */
+    rowval[nfields].value = conn->inBuffer + conn->inCursor;
+    rowval[nfields].len = NULL_LEN;
+
+    /* Success!  Pass the completed row values to rowProcessor */
+    if (!result->rowProcessor(result, result->rowProcessorParam, rowval))
+        goto rowProcessError;
+
+    /* Free garbage message. */
+    if (result->rowProcessorErrMes)
+    {
+        free(result->rowProcessorErrMes);
+        result->rowProcessorErrMes = NULL;
+    }    if (bitmap != std_bitmap)        free(bitmap);
+    return 0;
-outOfMemory:
+rowProcessError:
+    /*
+     * Detach the RowProcessor's message (if any) now: pqClearAsyncResult
+     * below frees result, so result must not be touched after that.
+     */
+    errmes = result->rowProcessorErrMes;
+    result->rowProcessorErrMes = NULL;
+
+    /* Replace partially constructed result with an error result */
     /*
@@ -829,8 +840,19 @@ outOfMemory:     * there's not enough memory to concatenate messages...     */
pqClearAsyncResult(conn);
-    printfPQExpBuffer(&conn->errorMessage,
-                      libpq_gettext("out of memory for query result\n"));
+
+    /*
+     * Report the RowProcessor's message if it set one; otherwise assume
+     * out of memory.  Only libpq's own fixed text goes through gettext.
+     */
+    if (errmes)
+    {
+        printfPQExpBuffer(&conn->errorMessage, "%s", errmes);
+        free(errmes);
+    }
+    else
+        printfPQExpBuffer(&conn->errorMessage,
+                          libpq_gettext("out of memory for query result\n"));
     /*
      * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 892dcbc..b7c6118 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -625,22 +625,13 @@ getAnotherTuple(PGconn *conn, int msgLength){    PGresult   *result = conn->result;    int
   nfields = result->numAttributes;
 
-    PGresAttValue *tup;
+    /* XXX: variable-length array; not C89, and MSVC does not support it */
+    PGrowValue  rowval[result->numAttributes + 1];
+    char       *errmes;         /* RowProcessor message, detached on error */
     int            tupnfields;        /* # fields from tuple */
     int            vlen;            /* length of the current field value */
     int            i;
-    /* Allocate tuple space if first time for this data message */
 
-    if (conn->curTuple == NULL)
-    {
-        conn->curTuple = (PGresAttValue *)
-            pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
-        if (conn->curTuple == NULL)
-            goto outOfMemory;
-        MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
-    }
-    tup = conn->curTuple;
-    /* Get the field count and make sure it's what we expect */    if (pqGetInt(&tupnfields, 2, conn))        return
EOF;
@@ -663,48 +654,74 @@ getAnotherTuple(PGconn *conn, int msgLength)        if (pqGetInt(&vlen, 4, conn))
returnEOF;        if (vlen == -1)
 
-        {
-            /* null field */
-            tup[i].value = result->null_field;
-            tup[i].len = NULL_LEN;
-            continue;
-        }
-        if (vlen < 0)
+            vlen = NULL_LEN;
+        else if (vlen < 0)            vlen = 0;
-        if (tup[i].value == NULL)
-        {
-            bool        isbinary = (result->attDescs[i].format != 0);
+
+        /*
+         * Buffer content may be shifted on reloading additional
+         * data. So we must set all pointers on every scan.
+         *
+         * rowval[i].value always points to the next address of the
+         * length field even if the value length is zero or the value
+         * is NULL for the access safety.
+         */
+        rowval[i].value = conn->inBuffer + conn->inCursor;
+        rowval[i].len = vlen;
-            tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
-            if (tup[i].value == NULL)
-                goto outOfMemory;
-        }
-        tup[i].len = vlen;
-        /* read in the value */
-        if (vlen > 0)
-            if (pqGetnchar((char *) (tup[i].value), vlen, conn))
-                return EOF;
-        /* we have to terminate this ourselves */
-        tup[i].value[vlen] = '\0';
+        /* Skip to the next length field */
+        if (vlen > 0 && pqSkipnchar(vlen, conn))
+            return EOF;    }
-    /* Success!  Store the completed tuple in the result */
-    if (!pqAddTuple(result, tup))
-        goto outOfMemory;
-    /* and reset for a new message */
-    conn->curTuple = NULL;
+    /*
+     * Set rowval[nfields] for the access safety. We can estimate the
+     * length of the buffer to store by
+     *
+     *    rowval[nfields].value - rowval[0].value - 4 * nfields.
+     */
+    rowval[nfields].value = conn->inBuffer + conn->inCursor;
+    rowval[nfields].len = NULL_LEN;
+
+    /* Success!  Pass the completed row values to rowProcessor */
+    if (!result->rowProcessor(result, result->rowProcessorParam, rowval))
+        goto rowProcessError;
+
+    /* Free garbage error message. */
+    if (result->rowProcessorErrMes)
+    {
+        free(result->rowProcessorErrMes);
+        result->rowProcessorErrMes = NULL;
+    }    return 0;
-outOfMemory:
+rowProcessError:
+    /*
+     * Detach the RowProcessor's message (if any) now: pqClearAsyncResult
+     * below frees result, so result must not be touched after that.
+     */
+    errmes = result->rowProcessorErrMes;
+    result->rowProcessorErrMes = NULL;
+
     /*
      * Replace partially constructed result with an error result. First
      * discard the old result to try to win back some memory.
      */
     pqClearAsyncResult(conn);
 
-    printfPQExpBuffer(&conn->errorMessage,
-                      libpq_gettext("out of memory for query result\n"));
+    /*
+     * Report the RowProcessor's message if it set one; otherwise assume
+     * out of memory.  Only libpq's own fixed text goes through gettext.
+     */
+    if (errmes)
+    {
+        printfPQExpBuffer(&conn->errorMessage, "%s", errmes);
+        free(errmes);
+    }
+    else
+        printfPQExpBuffer(&conn->errorMessage,
+                          libpq_gettext("out of memory for query result\n"));
     pqSaveErrorResult(conn);
     /* Discard the failed message by pretending we read it */
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index ef26ab9..27ef007 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -149,6 +149,17 @@ typedef struct pgNotify    struct pgNotify *next;        /* list link */} PGnotify;
+/* PGrowValue represents a value of one tuple field in string form,
+   used by RowProcessor. NULL is represented as len < 0. Otherwise
+   value points to a string without null termination of the length of
+   len.  value points into libpq's input buffer, so it is valid only
+   until the RowProcessor returns; copy the data to keep it. */
+typedef struct pgRowValue
+{
+    int            len;            /* length in bytes of the value */
+    char       *value;            /* actual value, without null termination */
+} PGrowValue;
+/* Function types for notice-handling callbacks */typedef void (*PQnoticeReceiver) (void *arg, const PGresult
*res);typedefvoid (*PQnoticeProcessor) (void *arg, const char *message);
 
@@ -416,6 +427,33 @@ extern PGPing PQping(const char *conninfo);extern PGPing PQpingParams(const char *const *
keywords,            const char *const * values, int expand_dbname);
 
+/*
+ * Typedef for alternative row processor.
+ *
+ * Called once per data row.  columns has nfields + 1 entries; the extra
+ * last entry (len < 0) only marks where the row data ends in the input
+ * buffer.  The function must return 1 for success and 0 for failure; on
+ * failure it may set an error message with PQsetRowProcessorErrMes,
+ * otherwise the failure is reported as out of memory.  It must not throw
+ * any exception (e.g. longjmp out of libpq).
+ */
+typedef int (*RowProcessor)(PGresult *res, void *param,
+                            PGrowValue *columns);
+
+/*
+ * Register an alternative row processor on a PGconn.
+ *
+ * PGresults created on conn afterwards pass each row to func instead
+ * of using the default processor.
+ *
+ * func is the row processor (see RowProcessor); NULL restores the default.
+ *
+ * rowProcessorParam is the contextual pointer passed to func as its
+ * param argument.
+ */
+extern void PQregisterRowProcessor(PGconn *conn, RowProcessor func,
+                                   void *rowProcessorParam);
+/* Force the write buffer to be written (or at least try) */extern int    PQflush(PGconn *conn);
@@ -454,6 +492,7 @@ extern char *PQcmdTuples(PGresult *res);extern char *PQgetvalue(const PGresult *res, int tup_num,
intfield_num);extern int    PQgetlength(const PGresult *res, int tup_num, int field_num);extern int
PQgetisnull(constPGresult *res, int tup_num, int field_num);
 
+extern void    PQsetRowProcessorErrMes(PGresult *res, char *mes);extern int    PQnparams(const PGresult *res);extern
Oid   PQparamtype(const PGresult *res, int param_num);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index d967d60..06d8b26 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -209,6 +209,11 @@ struct pg_result    PGresult_data *curBlock;    /* most recently allocated block */    int
  curOffset;        /* start offset of free space in block */    int            spaceLeft;        /* number of free
bytesremaining in block */
 
+
+    RowProcessor rowProcessor;  /* Result row processor handler. See
+                                 * RowProcessor for details. */
+    void *rowProcessorParam;    /* Contextual parameter for rowProcessor */
+    char *rowProcessorErrMes;   /* strdup'd error message from rowProcessor, or NULL */};/* PGAsyncStatusType defines the state of the
query-executionstate machine */
 
@@ -398,7 +403,6 @@ struct pg_conn    /* Status for asynchronous result construction */    PGresult   *result;
 /* result being constructed */
 
-    PGresAttValue *curTuple;    /* tuple currently being read */#ifdef USE_SSL    bool        allow_ssl_try;    /*
Allowedto try SSL negotiation */
 
@@ -443,6 +447,13 @@ struct pg_conn    /* Buffer for receiving various parts of messages */    PQExpBufferData
workBuffer;/* expansible string */
 
+
+    /* Row processor settings. PQmakeEmptyPGresult copies the two fields
+     * below into each PGresult created on this connection. rowProcessor
+     * is never NULL: PQregisterRowProcessor substitutes pqAddRow. */
+    RowProcessor rowProcessor;   /* Result row processor. See
+                                  * RowProcessor for details. */
+    void *rowProcessorParam;     /* Contextual parameter for rowProcessor */};/* PGcancel stores all data necessary to
cancela connection. A copy of this
 
@@ -507,7 +518,7 @@ extern voidpqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)/* This lets gcc check
theformat string for consistency. */__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
 
-extern int    pqAddTuple(PGresult *res, PGresAttValue *tup);
+extern int    pqAddRow(PGresult *res, void *param, PGrowValue *columns);extern void pqSaveMessageField(PGresult *res,
charcode,                   const char *value);extern void pqSaveParameterStatus(PGconn *conn, const char *name,
 
@@ -560,6 +571,7 @@ extern int    pqGets(PQExpBuffer buf, PGconn *conn);extern int    pqGets_append(PQExpBuffer buf,
PGconn*conn);extern int    pqPuts(const char *s, PGconn *conn);extern int    pqGetnchar(char *s, size_t len, PGconn
*conn);
+extern int    pqSkipnchar(size_t len, PGconn *conn);extern int    pqPutnchar(const char *s, size_t len, PGconn
*conn);externint    pqGetInt(int *result, size_t bytes, PGconn *conn);extern int    pqPutInt(int value, size_t bytes,
PGconn*conn); 
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 72c9384..5417df1 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -7233,6 +7233,199 @@ int PQisthreadsafe(); </sect1>
+ <sect1 id="libpq-alterrowprocessor">
+  <title>Alternative row processor</title>
+
+  <indexterm zone="libpq-alterrowprocessor">
+   <primary>PGresult</primary>
+   <secondary>PGconn</secondary>
+  </indexterm>
+
+  <para>
+   Normally, the result of command execution is obtained as a
+   <structname>PGresult</structname> returned by
+   <function>PQgetResult</function> on a
+   <structname>PGconn</structname>.  The memory for the PGresult is
+   allocated with malloc() internally by command execution functions
+   such as <function>PQexec</function>
+   and <function>PQgetResult</function>.  If it is inconvenient to
+   handle the result rows in the form of a PGresult, you can instruct
+   the PGconn to pass every row to your own row processor instead of
+   storing it in the PGresult.
+  </para>
+
+  <variablelist>
+   <varlistentry id="libpq-registerrowprocessor">
+    <term>
+     <function>PQregisterRowProcessor</function>
+     <indexterm>
+      <primary>PQregisterRowProcessor</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+       Sets a callback function to process each row.
+<synopsis>
+void PQregisterRowProcessor(PGconn *conn,
+                            RowProcessor func,
+                            void *param);
+</synopsis>
+     </para>
+     
+     <para>
+       <variablelist>
+     <varlistentry>
+       <term><parameter>conn</parameter></term>
+       <listitem>
+         <para>
+           The connection object on which to set the row processor
+           function. Each PGresult created from this connection calls
+           this function to process each row.
+         </para>
+       </listitem>
+     </varlistentry>
+     <varlistentry>
+       <term><parameter>func</parameter></term>
+       <listitem>
+         <para>
+           Row processor function to set. NULL means to use the
+           default processor.
+         </para>
+       </listitem>
+     </varlistentry>
+     <varlistentry>
+       <term><parameter>param</parameter></term>
+       <listitem>
+         <para>
+           A pointer to contextual parameter passed
+           to <parameter>func</parameter>.
+         </para>
+       </listitem>
+     </varlistentry>
+       </variablelist>
+     </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-rowprocessor">
+    <term>
+     <type>RowProcessor</type>
+     <indexterm>
+      <primary>RowProcessor</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+       The type for the row processor callback function.
+<synopsis>
+int (*RowProcessor)(PGresult   *res,
+                    void       *param,
+                    PGrowValue *columns);
+
+typedef struct
+{
+    int         len;            /* length in bytes of the value */
+    char       *value;          /* actual value, without null termination */
+} PGrowValue;
+
+</synopsis>
+     </para>
+
+     <para>
+       This function must return 1 for success and 0 for failure. On
+       failure it should set the error message with
+       <function>PQsetRowProcessorErrMes</function> if the cause is
+       something other than running out of memory. This function must
+       not throw any exception.
+     </para>
+     <variablelist>
+       <varlistentry>
+
+     <term><parameter>res</parameter></term>
+     <listitem>
+       <para>
+         A pointer to the <type>PGresult</type> object.
+       </para>
+     </listitem>
+       </varlistentry>
+       <varlistentry>
+
+     <term><parameter>param</parameter></term>
+     <listitem>
+       <para>
+         A pointer to contextual parameter which is registered
+         by <function>PQregisterRowProcessor</function>.
+       </para>
+     </listitem>
+       </varlistentry>
+       <varlistentry>
+
+     <term><parameter>columns</parameter></term>
+     <listitem>
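+       <para>Column values of the row to process: an array of nfields + 1
+         entries, the last of which only marks the end of the row data.  The
+         values are not null-terminated and are valid only until the function returns.</para>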
+     </listitem>
+       </varlistentry>
+     </variablelist>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqsetrowprocessorerrmes">
+    <term>
+     <function>PQsetRowProcessorErrMes</function>
+     <indexterm>
+      <primary>PQsetRowProcessorErrMes</primary>
+     </indexterm>
+    </term>
+    <listitem>
+      <para>
+    Sets the message for an error that occurred
+    in <type>RowProcessor</type>.  If no message is set, the
+    caller assumes the error to be out of memory.
+<synopsis>
+void PQsetRowProcessorErrMes(PGresult *res, char *mes)
+</synopsis>
+      </para>
+      <para>
+    <variablelist>
+      <varlistentry>
+        <term><parameter>res</parameter></term>
+        <listitem>
+          <para>
+        A pointer to the <type>PGresult</type> object
+        passed to <type>RowProcessor</type>.
+          </para>
+        </listitem>
+      </varlistentry>
+      <varlistentry>
+        <term><parameter>mes</parameter></term>
+        <listitem>
+          <para>
+        Error message. It is copied internally, so the caller does
+        not need to keep it valid after the call.
+          </para>
+          <para>
+        If <parameter>res</parameter> already has a previously set
+        message, it is overwritten. Pass NULL to clear the custom
+        message.
+          </para>
+        </listitem>
+      </varlistentry>
+    </variablelist>
+      </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+ </sect1>
+
+ <sect1 id="libpq-build">  <title>Building <application>libpq</application> Programs</title>
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 36a8e3e..e6edcd5 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -63,11 +63,23 @@ typedef struct remoteConn    bool        newXactForCursor;        /* Opened a transaction for a
cursor*/} remoteConn;
 
+typedef struct storeInfo
+{
+    Tuplestorestate *tuplestore;
+    int nattrs;
+    MemoryContext oldcontext;
+    AttInMetadata *attinmeta;
+    char** valbuf;
+    int *valbuflen;
+    bool error_occurred;
+    bool nummismatch;
+    ErrorData *edata;
+} storeInfo;
+/* * Internal declarations */static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
-static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);static remoteConn *getConnectionByName(const
char*name);static HTAB *createConnHash(void);static void createNewConnection(const char *name, remoteConn *rconn);
 
@@ -90,6 +102,10 @@ static char *escape_param_str(const char *from);static void validate_pkattnums(Relation rel,
          int2vector *pkattnums_arg, int32 pknumatts_arg,                   int **pkattnums, int *pknumatts);
 
+static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo);
+static void finishStoreInfo(storeInfo *sinfo);
+static int storeHandler(PGresult *res, void *param, PGrowValue *columns);
+/* Global */static remoteConn *pconn = NULL;
@@ -503,6 +519,7 @@ dblink_fetch(PG_FUNCTION_ARGS)    char       *curname = NULL;    int            howmany = 0;
bool       fail = true;    /* default to backward compatible */
 
+    storeInfo   storeinfo;    DBLINK_INIT;
@@ -559,15 +576,36 @@ dblink_fetch(PG_FUNCTION_ARGS)    appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
/*
 
+     * Result is stored into storeinfo.tuplestore instead of
+     * res->result returned by PQexec below
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQregisterRowProcessor(conn, storeHandler, &storeinfo);
+
+    /*     * Try to execute the query.  Note that since libpq uses malloc, the     * PGresult will be long-lived even
thoughwe are still in a short-lived     * memory context.     */    res = PQexec(conn, buf.data);
+
+    /*
+     * Detach storeHandler at once: storeinfo lives in this stack frame,
+     * but the connection outlives this call and may run further queries.
+     * NOTE(review): assumes a NULL func makes libpq fall back to its
+     * default row processor -- confirm against the PQregisterRowProcessor
+     * documentation.
+     */
+    PQregisterRowProcessor(conn, NULL, NULL);
 
+    finishStoreInfo(&storeinfo);
+    if (!res ||        (PQresultStatus(res) != PGRES_COMMAND_OK &&         PQresultStatus(res) != PGRES_TUPLES_OK))
{
+        /*
+         * storeHandler() records why it rejected a row; finishStoreInfo()
+         * leaves these fields intact.  res is malloc'd by libpq, so it must
+         * be freed before ereport/ReThrowError or it would leak.
+         */
+        if (storeinfo.nummismatch)
+        {
+            /* This is only for backward compatibility */
+            PQclear(res);
+            ereport(ERROR,
+                    (errcode(ERRCODE_DATATYPE_MISMATCH),
+                     errmsg("remote query result rowtype does not match "
+                            "the specified FROM clause rowtype")));
+        }
+        else if (storeinfo.edata)
+        {
+            PQclear(res);
+            ReThrowError(storeinfo.edata);
+        }
+        dblink_res_error(conname, res, "could not fetch from cursor", fail);        return (Datum) 0;    }
@@ -579,8 +617,8 @@ dblink_fetch(PG_FUNCTION_ARGS)                (errcode(ERRCODE_INVALID_CURSOR_NAME),
errmsg("cursor \"%s\" does not exist", curname)));    }
 
+    PQclear(res);
-    materializeResult(fcinfo, res);    return (Datum) 0;}
@@ -640,6 +678,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)    remoteConn *rconn = NULL;    bool
      fail = true;    /* default to backward compatible */    bool        freeconn = false;
 
+    storeInfo   storeinfo;    /* check to see if caller supports us returning a tuplestore */    if (rsinfo == NULL ||
!IsA(rsinfo,ReturnSetInfo))
 
@@ -715,164 +754,214 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)    rsinfo->setResult = NULL;
rsinfo->setDesc= NULL;
 
+
+    /*
+     * Result is stored into storeinfo.tuplestore instead of
+     * res->result returned by PQexec/PQgetResult below.
+     *
+     * NOTE(review): in the async case, any rows libpq already parsed
+     * before this registration (e.g. during an earlier PQisBusy() call)
+     * would have gone through the default row processor into the
+     * PGresult, and nothing below copies those into the tuplestore --
+     * confirm whether that can happen.
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQregisterRowProcessor(conn, storeHandler, &storeinfo);
+    /* synchronous query, or async result retrieval */    if (!is_async)        res = PQexec(conn, sql);    else
-    {        res = PQgetResult(conn);
-        /* NULL means we're all done with the async results */
-        if (!res)
-            return (Datum) 0;
-    }
-    /* if needed, close the connection to the database and cleanup */
-    if (freeconn)
-        PQfinish(conn);
+
+    /*
+     * Detach storeHandler at once: storeinfo lives in this stack frame,
+     * but a persistent connection outlives this call.
+     * NOTE(review): assumes a NULL func makes libpq fall back to its
+     * default row processor -- confirm against the PQregisterRowProcessor
+     * documentation.
+     */
+    PQregisterRowProcessor(conn, NULL, NULL);
+    finishStoreInfo(&storeinfo);
-    if (!res ||
-        (PQresultStatus(res) != PGRES_COMMAND_OK &&
-         PQresultStatus(res) != PGRES_TUPLES_OK))
+    /* NULL res from async get means we're all done with the results */
+    if (res || !is_async)    {
-        dblink_res_error(conname, res, "could not execute query", fail);
-        return (Datum) 0;
+        if (freeconn)
+            PQfinish(conn);
+
+        if (!res ||
+            (PQresultStatus(res) != PGRES_COMMAND_OK &&
+             PQresultStatus(res) != PGRES_TUPLES_OK))
+        {
+            /*
+             * storeHandler() records why it rejected a row; finishStoreInfo()
+             * leaves these fields intact.  res is malloc'd by libpq, so it
+             * must be freed before ereport/ReThrowError or it would leak.
+             */
+            if (storeinfo.nummismatch)
+            {
+                /* This is only for backward compatibility */
+                PQclear(res);
+                ereport(ERROR,
+                        (errcode(ERRCODE_DATATYPE_MISMATCH),
+                         errmsg("remote query result rowtype does not match "
+                                "the specified FROM clause rowtype")));
+            }
+            else if (storeinfo.edata)
+            {
+                PQclear(res);
+                ReThrowError(storeinfo.edata);
+            }
+
+            dblink_res_error(conname, res, "could not execute query", fail);
+            return (Datum) 0;
+        }
+
+        /*
+         * A command (as opposed to a query) delivers no rows through the
+         * row processor.  Return its status string as the single result
+         * row, as the removed materializeResult() did.
+         */
+        if (PQresultStatus(res) == PGRES_COMMAND_OK)
+        {
+            char       *values[1];
+            HeapTuple   tuple;
+
+            if (storeinfo.nattrs != 1)
+            {
+                PQclear(res);
+                ereport(ERROR,
+                        (errcode(ERRCODE_DATATYPE_MISMATCH),
+                         errmsg("remote query result rowtype does not match "
+                                "the specified FROM clause rowtype")));
+            }
+
+            /* copy the status out so res is freed before the input function runs */
+            values[0] = pstrdup(PQcmdStatus(res));
+            PQclear(res);
+            res = NULL;
+
+            tuple = BuildTupleFromCStrings(storeinfo.attinmeta, values);
+            tuplestore_puttuple(storeinfo.tuplestore, tuple);
+        }    }
+    PQclear(res);
-    materializeResult(fcinfo, res);    return (Datum) 0;}
-/*
- * Materialize the PGresult to return them as the function result.
- * The res will be released in this function.
- */
+/*
+ * initStoreInfo
+ *    Prepare sinfo for collecting rows through storeHandler(): resolve the
+ *    expected result tupdesc, create the result tuplestore in the
+ *    per-query memory context, and allocate the per-column value buffers.
+ *
+ * Leaves CurrentMemoryContext switched to the per-query context;
+ * finishStoreInfo() switches back to sinfo->oldcontext.
+ */
 static void
-materializeResult(FunctionCallInfo fcinfo, PGresult *res)
+initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo){    ReturnSetInfo *rsinfo = (ReturnSetInfo *)
fcinfo->resultinfo;
+    TupleDesc    tupdesc;
+    int i;
+    int nslots;
+    
+    switch (get_call_result_type(fcinfo, NULL, &tupdesc))
+    {
+        case TYPEFUNC_COMPOSITE:
+            /* success */
+            break;
+        case TYPEFUNC_RECORD:
+            /* failed to determine actual type of RECORD */
+            ereport(ERROR,
+                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                     errmsg("function returning record called in context "
+                            "that cannot accept type record")));
+            break;
+        default:
+            /* result type isn't composite */
+            elog(ERROR, "return type must be a row type");
+            break;
+    }
+    
+    sinfo->oldcontext = MemoryContextSwitchTo(
+        rsinfo->econtext->ecxt_per_query_memory);
+
+    /* make sure we have a persistent copy of the tupdesc */
+    tupdesc = CreateTupleDescCopy(tupdesc);
+
+    sinfo->error_occurred = FALSE;
+    sinfo->nummismatch = FALSE;
+    sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+    sinfo->edata = NULL;
+    sinfo->nattrs = tupdesc->natts;
+    sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+
+    /*
+     * Preallocate one reusable value-buffer slot per column; storeHandler()
+     * fills them lazily.  Ask for at least one element so that a
+     * zero-column result is not mistaken for an out-of-memory failure
+     * (malloc(0) may legitimately return NULL).
+     */
+    nslots = (sinfo->nattrs > 0) ? sinfo->nattrs : 1;
+    sinfo->valbuf = (char **) malloc(nslots * sizeof(char *));
+    sinfo->valbuflen = (int *) malloc(nslots * sizeof(int));
+    if (sinfo->valbuf == NULL || sinfo->valbuflen == NULL)
+    {
+        /* free whichever allocation did succeed before bailing out */
+        free(sinfo->valbuf);
+        free(sinfo->valbuflen);
+        sinfo->valbuf = NULL;
+        sinfo->valbuflen = NULL;
+        ereport(ERROR,
+                (errcode(ERRCODE_OUT_OF_MEMORY),
+                 errmsg("out of memory")));
+    }
-    Assert(rsinfo->returnMode == SFRM_Materialize);
-
-    PG_TRY();
+    for (i = 0 ; i < sinfo->nattrs ; i++)    {
-        TupleDesc    tupdesc;
-        bool        is_sql_cmd = false;
-        int            ntuples;
-        int            nfields;
+        sinfo->valbuf[i] = NULL;
+        sinfo->valbuflen[i] = -1;
+    }
-        if (PQresultStatus(res) == PGRES_COMMAND_OK)
-        {
-            is_sql_cmd = true;
-
-            /*
-             * need a tuple descriptor representing one TEXT column to return
-             * the command status string as our result tuple
-             */
-            tupdesc = CreateTemplateTupleDesc(1, false);
-            TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
-                               TEXTOID, -1, 0);
-            ntuples = 1;
-            nfields = 1;
-        }
-        else
-        {
-            Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+    rsinfo->setResult = sinfo->tuplestore;
+    rsinfo->setDesc = tupdesc;
+}
-            is_sql_cmd = false;
+static void
+finishStoreInfo(storeInfo *sinfo)
+{    /* Free the value buffers allocated for storeHandler() and switch back
+     * to the memory context saved by initStoreInfo().  Freed pointers are
+     * reset to NULL, so a second call (storeHandler() calls this on a
+     * column-count mismatch, and the caller calls it again afterwards)
+     * only repeats the context switch. */
+    int i;
-            /* get a tuple descriptor for our result type */
-            switch (get_call_result_type(fcinfo, NULL, &tupdesc))
+    if (sinfo->valbuf)
+    {
+        for (i = 0 ; i < sinfo->nattrs ; i++)
+        {
+            if (sinfo->valbuf[i])            {
-                case TYPEFUNC_COMPOSITE:
-                    /* success */
-                    break;
-                case TYPEFUNC_RECORD:
-                    /* failed to determine actual type of RECORD */
-                    ereport(ERROR,
-                            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                        errmsg("function returning record called in context "
-                               "that cannot accept type record")));
-                    break;
-                default:
-                    /* result type isn't composite */
-                    elog(ERROR, "return type must be a row type");
-                    break;
+                free(sinfo->valbuf[i]);    /* allocated in storeHandler() */
+                sinfo->valbuf[i] = NULL;            }
-
-            /* make sure we have a persistent copy of the tupdesc */
-            tupdesc = CreateTupleDescCopy(tupdesc);
-            ntuples = PQntuples(res);
-            nfields = PQnfields(res);        }
+        free(sinfo->valbuf);
+        sinfo->valbuf = NULL;
+    }
-        /*
-         * check result and tuple descriptor have the same number of columns
-         */
-        if (nfields != tupdesc->natts)
-            ereport(ERROR,
-                    (errcode(ERRCODE_DATATYPE_MISMATCH),
-                     errmsg("remote query result rowtype does not match "
-                            "the specified FROM clause rowtype")));
-
-        if (ntuples > 0)
-        {
-            AttInMetadata *attinmeta;
-            Tuplestorestate *tupstore;
-            MemoryContext oldcontext;
-            int            row;
-            char      **values;
-
-            attinmeta = TupleDescGetAttInMetadata(tupdesc);
-
-            oldcontext = MemoryContextSwitchTo(
-                                    rsinfo->econtext->ecxt_per_query_memory);
-            tupstore = tuplestore_begin_heap(true, false, work_mem);
-            rsinfo->setResult = tupstore;
-            rsinfo->setDesc = tupdesc;
-            MemoryContextSwitchTo(oldcontext);
-
-            values = (char **) palloc(nfields * sizeof(char *));
+    if (sinfo->valbuflen)
+    {
+        free(sinfo->valbuflen);
+        sinfo->valbuflen = NULL;
+    }
+    MemoryContextSwitchTo(sinfo->oldcontext);    /* undo the switch made by initStoreInfo() */
+}
-            /* put all tuples into the tuplestore */
-            for (row = 0; row < ntuples; row++)
-            {
-                HeapTuple    tuple;
+/*
+ * storeHandler
+ *    Row processor registered with PQregisterRowProcessor(): converts one
+ *    row delivered by libpq and appends it to sinfo->tuplestore.
+ *
+ * res:     result being built by libpq; used here for its column count
+ * param:   the storeInfo passed at registration
+ * columns: one PGrowValue per column; a negative len marks a NULL
+ *
+ * Returns TRUE to continue, FALSE to reject the row.  On failure the
+ * reason is left in sinfo (nummismatch or edata) for the caller to report
+ * once PQexec()/PQgetResult() has returned.
+ */
+static int
+storeHandler(PGresult *res, void *param, PGrowValue *columns)
+{
+    storeInfo  *sinfo = (storeInfo *) param;
+    int         fields = PQnfields(res);
+    MemoryContext handlercontext = CurrentMemoryContext;
-                if (!is_sql_cmd)
-                {
-                    int            i;
+
+    /* a previous row already failed; reject the rest without more work */
+    if (sinfo->error_occurred)
+        return FALSE;
-                    for (i = 0; i < nfields; i++)
-                    {
-                        if (PQgetisnull(res, row, i))
-                            values[i] = NULL;
-                        else
-                            values[i] = PQgetvalue(res, row, i);
-                    }
-                }
-                else
-                {
-                    values[0] = PQcmdStatus(res);
-                }
+
+    if (sinfo->nattrs != fields)
+    {
+        sinfo->error_occurred = TRUE;
+        sinfo->nummismatch = TRUE;
+        finishStoreInfo(sinfo);
+
+        /*
+         * The caller reports this after libpq returns, so no error message
+         * is set here.
+         */
+        return FALSE;
+    }
-                /* build the tuple and put it into the tuplestore. */
-                tuple = BuildTupleFromCStrings(attinmeta, values);
-                tuplestore_puttuple(tupstore, tuple);
-            }
-            /* clean up and return the tuplestore */
-            tuplestore_donestoring(tupstore);
-        }
-        PQclear(res);
+
+    /*
+     * This function is called from inside libpq, so an ereport(ERROR) must
+     * not longjmp out of it through libpq's frames.  Everything that can
+     * fail, including the out-of-memory checks, therefore runs inside
+     * PG_TRY, and a caught error is saved for the caller to rethrow.
+     */
+    PG_TRY();
+    {
+        char      **cstrs;
+        HeapTuple   tuple;
+        int         i;
+
+        /* palloc rather than a VLA: C89-compatible and valid for 0 columns */
+        cstrs = (char **) palloc(fields * sizeof(char *));
+
+        /*
+         * Value input functions expect NUL-terminated strings, so copy each
+         * non-null value into a per-column buffer that is reused across rows.
+         */
+        for (i = 0; i < fields; i++)
+        {
+            int         len = columns[i].len;
+
+            if (len < 0)
+            {
+                cstrs[i] = NULL;
+                continue;
+            }
+
+            if (sinfo->valbuflen[i] < len + 1)
+            {
+                /*
+                 * Grow the buffer (realloc of NULL allocates it the first
+                 * time).  Go through a temporary so that on failure the old
+                 * buffer is still owned by sinfo and freed by finishStoreInfo().
+                 */
+                char       *newbuf = (char *) realloc(sinfo->valbuf[i], len + 1);
+
+                if (newbuf == NULL)
+                    ereport(ERROR,
+                            (errcode(ERRCODE_OUT_OF_MEMORY),
+                             errmsg("out of memory")));
+                sinfo->valbuf[i] = newbuf;
+                sinfo->valbuflen[i] = len + 1;
+            }
+
+            cstrs[i] = sinfo->valbuf[i];
+            if (len > 0)
+                memcpy(cstrs[i], columns[i].value, len);
+            cstrs[i][len] = '\0';
+        }
+
+        tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+        tuplestore_puttuple(sinfo->tuplestore, tuple);
+
+        /*
+         * tuplestore_puttuple() stores its own copy, so release ours;
+         * otherwise every row would pile up in the current (per-query)
+         * context.  NOTE(review): datums built by the column input
+         * functions are still left there for each row; resetting a per-row
+         * memory context after each tuple would bound that -- TODO.
+         */
+        heap_freetuple(tuple);
+        pfree(cstrs);
+    }
     }
     PG_CATCH();
     {
-        /* be sure to release the libpq result */
-        PQclear(res);
-        PG_RE_THROW();
+        /*
+         * Save the error for the caller to rethrow and cancel it here.  The
+         * copy goes into sinfo->oldcontext (the context that was current
+         * before initStoreInfo()).  Afterwards return to the context this
+         * handler was entered in, rather than whatever context the error
+         * left current.
+         */
+        sinfo->error_occurred = TRUE;
+        MemoryContextSwitchTo(sinfo->oldcontext);
+        sinfo->edata = CopyErrorData();
+        MemoryContextSwitchTo(handlercontext);
+        FlushErrorState();
+        return FALSE;
     }
     PG_END_TRY();
+
+    return TRUE;
 }
 /*

pgsql-hackers by date:

Previous
From: Hitoshi Harada
Date:
Subject: Re: Patch: Allow SQL-language functions to reference parameters by parameter name
Next
From: Simon Riggs
Date:
Subject: Re: Hot standby off of hot standby?