Re: Speed dblink using alternate libpq tuple storage - Mailing list pgsql-hackers

From Kyotaro HORIGUCHI
Subject Re: Speed dblink using alternate libpq tuple storage
Date
Msg-id 20120307.151457.91509043.horiguchi.kyotaro@oss.ntt.co.jp
Whole thread Raw
In response to Re: Speed dblink using alternate libpq tuple storage  (Kyotaro HORIGUCHI <horiguchi.kyotaro@oss.ntt.co.jp>)
Responses Re: Speed dblink using alternate libpq tuple storage  (Marko Kreen <markokr@gmail.com>)
List pgsql-hackers
Hello, this is the new version of the patch.

# early_exit.diff is not included this time, and maybe not
# later either.  The set of return values of PQrowProcessor looks
# unnatural if the 0 is removed.

> * Convert old EOFs to protocol errors in V3 getAnotherTuple()

done.

> * V2 getAnotherTuple() can leak PGresult when handling custom
>   error from row processor.

Custom error message is removed from both V2 and V3.

> * remove pqIsnonblocking(conn) check when row processor returned 2.
>   I missed that it's valid to call PQisBusy/PQconsumeInput/PQgetResult
>   on sync connection.

done. This affects PQisBusy, but PQgetResult won't be affected as
far as I can see. I am not sure about PQconsumeInput().

> * It seems the return codes from callback should be remapped,
>   (0, 1, 2) is unusual pattern.  Better would be:
> 
>    -1 - error
>     0 - stop parsing / early exit ("I'm not done yet")
>     1 - OK ("I'm done with the row")

done.

This might be described more precisely as follows,

|    -1 - error - erase result and change result status to
|               - FATAL_ERROR.  All the remaining rows in the current
|               - result will be skipped (consumed).
|     0 - stop parsing / early exit ("I'm not done yet")
|                - getAnotherTuple returns EOF without dropping PGresult.
#                - We expect PQisBusy(), PQconsumeInput()(?) and 
#                - PQgetResult() to exit immediately and we can
#                - call PQgetResult(), PQskipResult() or
#                - PQisBusy() after.
|     1 - OK ("I'm done with the row")
|                - save result and getAnotherTuple returns 0.

The lines prefixed with '#' are the desired behavior as I have
understood it from the discussion so far, and I doubt that it works
as we expect for PQgetResult().

> * Please drop PQsetRowProcessorErrMsg() / PQresultSetErrMsg().

done.

> My suggestion - check in getAnotherTuple whether resultStatus is
> already error and do nothing then.  This allows internal pqAddRow
> to set regular "out of memory" error.  Otherwise give generic
> "row processor error".

The current implementation already seems to do this in
parseInput3(). Could you explain further?

regards, 

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 1af8df6..a6418ec 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -160,3 +160,6 @@ PQconnectStartParams      157PQping                    158PQpingParams              159PQlibVersion
            160
 
+PQsetRowProcessor            161
+PQgetRowProcessor            162
+PQskipResult                163
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 27a9805..4605e49 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2693,6 +2693,9 @@ makeEmptyPGconn(void)    conn->wait_ssl_try = false;#endif
+    /* set default row processor */
+    PQsetRowProcessor(conn, NULL, NULL);
+    /*     * We try to send at least 8K at a time, which is the usual size of pipe     * buffers on Unix systems.
Thatway, when we are sending a large amount
 
@@ -2711,8 +2714,13 @@ makeEmptyPGconn(void)    initPQExpBuffer(&conn->errorMessage);
initPQExpBuffer(&conn->workBuffer);
+    /* set up initial row buffer */
+    conn->rowBufLen = 32;
+    conn->rowBuf = (PGrowValue *)malloc(conn->rowBufLen * sizeof(PGrowValue));
+    if (conn->inBuffer == NULL ||        conn->outBuffer == NULL ||
+        conn->rowBuf == NULL ||        PQExpBufferBroken(&conn->errorMessage) ||
PQExpBufferBroken(&conn->workBuffer))   {
 
@@ -2814,6 +2822,8 @@ freePGconn(PGconn *conn)        free(conn->inBuffer);    if (conn->outBuffer)
free(conn->outBuffer);
+    if (conn->rowBuf)
+        free(conn->rowBuf);    termPQExpBuffer(&conn->errorMessage);    termPQExpBuffer(&conn->workBuffer);
@@ -5078,3 +5088,4 @@ PQregisterThreadLock(pgthreadlock_t newhandler)    return prev;}
+
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index b743566..161d210 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -66,6 +66,7 @@ static PGresult *PQexecFinish(PGconn *conn);static int PQsendDescribe(PGconn *conn, char desc_type,
           const char *desc_target);static int    check_field_number(const PGresult *res, int field_num);
 
+static int    pqAddRow(PGresult *res, PGrowValue *columns, void *param);/* ----------------
@@ -701,7 +702,6 @@ pqClearAsyncResult(PGconn *conn)    if (conn->result)        PQclear(conn->result);    conn->result
=NULL;
 
-    conn->curTuple = NULL;}/*
@@ -756,7 +756,6 @@ pqPrepareAsyncResult(PGconn *conn)     */    res = conn->result;    conn->result = NULL;        /*
handingover ownership to caller */
 
-    conn->curTuple = NULL;        /* just in case */    if (!res)        res = PQmakeEmptyPGresult(conn,
PGRES_FATAL_ERROR);   else
 
@@ -828,6 +827,71 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)}/*
+ * PQsetRowProcessor
+ *   Set function that copies column data out from network buffer.
+ */
+void
+PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param)
+{
+    conn->rowProcessor = (func ? func : pqAddRow);
+    conn->rowProcessorParam = param;
+}
+
+/*
+ * PQgetRowProcessor
+ *   Get the current row processor of conn.  If param is not NULL, also
+ *   store the row processor's current parameter pointer into *param.
+ */
+PQrowProcessor
+PQgetRowProcessor(PGconn *conn, void **param)
+{
+    if (param)
+        *param = conn->rowProcessorParam;
+
+    return conn->rowProcessor;
+}
+
+/*
+ * pqAddRow
+ *      add a row to the PGresult structure, growing it if necessary
+ *      Returns 1 if OK, -1 if error occurred.
+ */
+static int
+pqAddRow(PGresult *res, PGrowValue *columns, void *param)
+{
+    PGresAttValue *tup;
+    int            nfields = res->numAttributes;
+    int            i;
+
+    tup = (PGresAttValue *)
+        pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE);
+    if (tup == NULL)
+        return -1;
+
+    for (i = 0 ; i < nfields ; i++)
+    {
+        tup[i].len = columns[i].len;
+        if (tup[i].len == NULL_LEN)
+        {
+            tup[i].value = res->null_field;
+        }
+        else
+        {
+            bool isbinary = (res->attDescs[i].format != 0);
+            tup[i].value = (char *)pqResultAlloc(res, tup[i].len + 1, isbinary);
+            if (tup[i].value == NULL)
+                return -1;
+
+            memcpy(tup[i].value, columns[i].value, tup[i].len);
+            /* We have to terminate this ourselves */
+            tup[i].value[tup[i].len] = '\0';
+        }
+    }
+
+    return (pqAddTuple(res, tup) ? 1 : -1);
+}
+
+/* * pqAddTuple *      add a row pointer to the PGresult structure, growing it if necessary *      Returns TRUE if OK,
FALSEif not enough memory to add the row
 
@@ -1223,7 +1287,6 @@ PQsendQueryStart(PGconn *conn)    /* initialize async result-accumulation state */
conn->result= NULL;
 
-    conn->curTuple = NULL;    /* ready to send command message */    return true;
@@ -1831,6 +1894,55 @@ PQexecFinish(PGconn *conn)    return lastResult;}
+
+/*
+ * Do-nothing row processor for PQskipResult
+ */
+static int
+dummyRowProcessor(PGresult *res, PGrowValue *columns, void *param)
+{
+    return 1;
+}
+
+/*
+ * Exhaust remaining Data Rows in current conn.
+ * 
+ * Exhaust the current result if skipAll is false, or all succeeding
+ * results if it is true.
+ */
+int
+PQskipResult(PGconn *conn, int skipAll)
+{
+    PQrowProcessor savedRowProcessor;
+    void * savedRowProcParam;
+    PGresult *res;
+    int ret = 0;
+
+    /* save the current row processor settings and set dummy processor */
+    savedRowProcessor = PQgetRowProcessor(conn, &savedRowProcParam);
+    PQsetRowProcessor(conn, dummyRowProcessor, NULL);
+    
+    /*
+     * Throw away the remaining rows in current result, or all succeeding
+     * results if skipAll is not FALSE.
+     */
+    if (skipAll)
+    {
+        while ((res = PQgetResult(conn)) != NULL)
+            PQclear(res);
+    }
+    else if ((res = PQgetResult(conn)) != NULL)
+    {
+        PQclear(res);
+        ret = 1;
+    }
+    
+    PQsetRowProcessor(conn, savedRowProcessor, savedRowProcParam);
+
+    return ret;
+}
+
+/* * PQdescribePrepared *      Obtain information about a previously prepared statement
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index ce0eac3..d11cb3c 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -219,6 +219,25 @@ pqGetnchar(char *s, size_t len, PGconn *conn)}/*
+ * pqSkipnchar:
+ *    skip len bytes in input buffer.
+ */
+int
+pqSkipnchar(size_t len, PGconn *conn)
+{
+    if (len > (size_t) (conn->inEnd - conn->inCursor))
+        return EOF;
+
+    conn->inCursor += len;
+
+    if (conn->Pfdebug)
+        fprintf(conn->Pfdebug, "From backend (%lu skipped)\n",
+                (unsigned long) len);
+
+    return 0;
+}
+
+/* * pqPutnchar: *    write exactly len bytes to the current message */
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index a7c3899..7fcb10f 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -569,6 +569,8 @@ pqParseInput2(PGconn *conn)                        /* Read another tuple of a normal query response
*/                       if (getAnotherTuple(conn, FALSE))                            return;
 
+                        /* getAnotherTuple moves inStart itself */
+                        continue;                    }                    else                    {
@@ -585,6 +587,8 @@ pqParseInput2(PGconn *conn)                        /* Read another tuple of a normal query response
*/                       if (getAnotherTuple(conn, TRUE))                            return;
 
+                        /* getAnotherTuple moves inStart itself */
+                        continue;                    }                    else                    {
@@ -703,19 +707,18 @@ failure:/* * parseInput subroutine to read a 'B' or 'D' (row data) message.
- * We add another tuple to the existing PGresult structure.
+ * It fills rowbuf with column pointers and then calls row processor. * Returns: 0 if completed message, EOF if error
ornot enough data yet. * * Note that if we run out of data, we have to suspend and reprocess
 
- * the message after more data is received.  We keep a partially constructed
- * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
+ * the message after more data is received. */static intgetAnotherTuple(PGconn *conn, bool binary){    PGresult
*result= conn->result;    int            nfields = result->numAttributes;
 
-    PGresAttValue *tup;
+    PGrowValue  *rowbuf;    /* the backend sends us a bitmap of which attributes are null */    char
std_bitmap[64];/* used unless it doesn't fit */
 
@@ -726,29 +729,32 @@ getAnotherTuple(PGconn *conn, bool binary)    int            bitmap_index;    /* Its index */
int           bitcnt;            /* number of bits examined in current byte */    int            vlen;            /*
lengthof the current field value */
 
+    char        *errmsg = "unknown error\n";
-    result->binary = binary;
-
-    /* Allocate tuple space if first time for this data message */
-    if (conn->curTuple == NULL)
+    /* resize row buffer if needed */
+    if (nfields > conn->rowBufLen)    {
-        conn->curTuple = (PGresAttValue *)
-            pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
-        if (conn->curTuple == NULL)
-            goto outOfMemory;
-        MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
-
-        /*
-         * If it's binary, fix the column format indicators.  We assume the
-         * backend will consistently send either B or D, not a mix.
-         */
-        if (binary)
+        rowbuf = realloc(conn->rowBuf, nfields * sizeof(PGrowValue));
+        if (!rowbuf)        {
-            for (i = 0; i < nfields; i++)
-                result->attDescs[i].format = 1;
+            errmsg = "out of memory for query result\n";
+            goto error_clearresult;        }
+        conn->rowBuf = rowbuf;
+        conn->rowBufLen = nfields;
+    }
+    else
+    {
+        rowbuf = conn->rowBuf;
+    }
+
+    result->binary = binary;
+
+    if (binary)
+    {
+        for (i = 0; i < nfields; i++)
+            result->attDescs[i].format = 1;    }
-    tup = conn->curTuple;    /* Get the null-value bitmap */    nbytes = (nfields + BITS_PER_BYTE - 1) /
BITS_PER_BYTE;
@@ -757,11 +763,15 @@ getAnotherTuple(PGconn *conn, bool binary)    {        bitmap = (char *) malloc(nbytes);
if(!bitmap)
 
-            goto outOfMemory;
+        {
+            errmsg = "out of memory for query result\n";
+            goto error_clearresult;
+        }    }    if (pqGetnchar(bitmap, nbytes, conn))
-        goto EOFexit;
+        goto error_clearresult;
+    /* Scan the fields */    bitmap_index = 0;
@@ -771,34 +781,29 @@ getAnotherTuple(PGconn *conn, bool binary)    for (i = 0; i < nfields; i++)    {        if
(!(bmap& 0200))
 
-        {
-            /* if the field value is absent, make it a null string */
-            tup[i].value = result->null_field;
-            tup[i].len = NULL_LEN;
-        }
+            vlen = NULL_LEN;
+        else if (pqGetInt(&vlen, 4, conn))
+                goto EOFexit;        else        {
-            /* get the value length (the first four bytes are for length) */
-            if (pqGetInt(&vlen, 4, conn))
-                goto EOFexit;            if (!binary)                vlen = vlen - 4;            if (vlen < 0)
      vlen = 0;
 
-            if (tup[i].value == NULL)
-            {
-                tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
-                if (tup[i].value == NULL)
-                    goto outOfMemory;
-            }
-            tup[i].len = vlen;
-            /* read in the value */
-            if (vlen > 0)
-                if (pqGetnchar((char *) (tup[i].value), vlen, conn))
-                    goto EOFexit;
-            /* we have to terminate this ourselves */
-            tup[i].value[vlen] = '\0';        }
+
+        /*
+         * rowbuf[i].value always points to the next address of the
+         * length field even if the value is NULL, to allow safe
+         * size estimates and data copy.
+         */
+        rowbuf[i].value = conn->inBuffer + conn->inCursor;
+        rowbuf[i].len = vlen;
+
+        /* Skip the value */
+        if (vlen > 0 && pqSkipnchar(vlen, conn))
+            goto EOFexit;
+        /* advance the bitmap stuff */        bitcnt++;        if (bitcnt == BITS_PER_BYTE)
@@ -811,33 +816,51 @@ getAnotherTuple(PGconn *conn, bool binary)            bmap <<= 1;    }
-    /* Success!  Store the completed tuple in the result */
-    if (!pqAddTuple(result, tup))
-        goto outOfMemory;
-    /* and reset for a new message */
-    conn->curTuple = NULL;
-    if (bitmap != std_bitmap)        free(bitmap);
-    return 0;
+    bitmap = NULL;
+
+    /* tag the row as parsed */
+    conn->inStart = conn->inCursor;
+
+    /* Pass the completed row values to rowProcessor */
+    switch (conn->rowProcessor(result, rowbuf, conn->rowProcessorParam))
+    {
+        case 1:
+            /* everything is good */
+            return 0;
-outOfMemory:
-    /* Replace partially constructed result with an error result */
+        case 0:
+            /* processor requested early exit */
+            return EOF;
+            
+        case -1:
+            errmsg = "error in row processor";
+            goto error_clearresult;
+        default:
+            /* Illega reurn code */
+            errmsg = "invalid return value from row processor\n";
+            goto error_clearresult;
+    }
+
+error_clearresult:    /*     * we do NOT use pqSaveErrorResult() here, because of the likelihood that     * there's
notenough memory to concatenate messages...     */    pqClearAsyncResult(conn);
 
-    printfPQExpBuffer(&conn->errorMessage,
-                      libpq_gettext("out of memory for query result\n"));
+    printfPQExpBuffer(&conn->errorMessage, "%s", libpq_gettext(errmsg));
+        /*     * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can     * do to recover...     */
conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
 
+    conn->asyncStatus = PGASYNC_READY;
+    /* Discard the failed message --- good idea? */    conn->inStart = conn->inEnd;
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 892dcbc..b51b04c 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -327,6 +327,9 @@ pqParseInput3(PGconn *conn)                        /* Read another tuple of a normal query response
*/                       if (getAnotherTuple(conn, msgLength))                            return;
 
+
+                        /* getAnotherTuple() moves inStart itself */
+                        continue;                    }                    else if (conn->result != NULL &&
               conn->result->resultStatus == PGRES_FATAL_ERROR)
 
@@ -613,47 +616,49 @@ failure:/* * parseInput subroutine to read a 'D' (row data) message.
- * We add another tuple to the existing PGresult structure.
- * Returns: 0 if completed message, EOF if error or not enough data yet.
+ * It fills rowbuf with column pointers and then calls row processor.
+ * Returns: 0 if completed message, 1 if error. * * Note that if we run out of data, we have to suspend and reprocess
- * the message after more data is received.  We keep a partially constructed
- * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
+ * the message after more data is received. */static intgetAnotherTuple(PGconn *conn, int msgLength){    PGresult
*result= conn->result;    int            nfields = result->numAttributes;
 
-    PGresAttValue *tup;
+    PGrowValue  *rowbuf;    int            tupnfields;        /* # fields from tuple */    int            vlen;
   /* length of the current field value */    int            i;
 
-
-    /* Allocate tuple space if first time for this data message */
-    if (conn->curTuple == NULL)
-    {
-        conn->curTuple = (PGresAttValue *)
-            pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
-        if (conn->curTuple == NULL)
-            goto outOfMemory;
-        MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
-    }
-    tup = conn->curTuple;
+    char        *errmsg = "unknown error\n";    /* Get the field count and make sure it's what we expect */    if
(pqGetInt(&tupnfields,2, conn))
 
-        return EOF;
+    {
+        /* Whole the message must be loaded on the buffer here */
+        errmsg = "protocol error\n";
+        goto error_saveresult;
+    }    if (tupnfields != nfields)    {
-        /* Replace partially constructed result with an error result */
-        printfPQExpBuffer(&conn->errorMessage,
-                 libpq_gettext("unexpected field count in \"D\" message\n"));
-        pqSaveErrorResult(conn);
-        /* Discard the failed message by pretending we read it */
-        conn->inCursor = conn->inStart + 5 + msgLength;
-        return 0;
+        errmsg = "unexpected field count in \"D\" message\n";
+        goto error_and_forward;
+    }
+
+    /* resize row buffer if needed */
+    rowbuf = conn->rowBuf;
+    if (nfields > conn->rowBufLen)
+    {
+        rowbuf = realloc(conn->rowBuf, nfields * sizeof(PGrowValue));
+        if (!rowbuf)
+        {
+            errmsg = "out of memory for query result\n";
+            goto error_and_forward;
+        }
+        conn->rowBuf = rowbuf;
+        conn->rowBufLen = nfields;    }    /* Scan the fields */
@@ -661,54 +666,78 @@ getAnotherTuple(PGconn *conn, int msgLength)    {        /* get the value length */        if
(pqGetInt(&vlen,4, conn))
 
-            return EOF;
-        if (vlen == -1)        {
-            /* null field */
-            tup[i].value = result->null_field;
-            tup[i].len = NULL_LEN;
-            continue;
+            /* Whole the message must be loaded on the buffer here */
+            errmsg = "protocol error\n";
+            goto error_saveresult;        }
-        if (vlen < 0)
+
+        if (vlen == -1)
+            vlen = NULL_LEN;
+        else if (vlen < 0)            vlen = 0;
-        if (tup[i].value == NULL)
-        {
-            bool        isbinary = (result->attDescs[i].format != 0);
-            tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
-            if (tup[i].value == NULL)
-                goto outOfMemory;
+        /*
+         * rowbuf[i].value always points to the next address of the
+         * length field even if the value is NULL, to allow safe
+         * size estimates and data copy.
+         */
+        rowbuf[i].value = conn->inBuffer + conn->inCursor;
+        rowbuf[i].len = vlen;
+
+        /* Skip to the next length field */
+        if (vlen > 0 && pqSkipnchar(vlen, conn))
+        {
+            /* Whole the message must be loaded on the buffer here */
+            errmsg = "protocol error\n";
+            goto error_saveresult;        }
-        tup[i].len = vlen;
-        /* read in the value */
-        if (vlen > 0)
-            if (pqGetnchar((char *) (tup[i].value), vlen, conn))
-                return EOF;
-        /* we have to terminate this ourselves */
-        tup[i].value[vlen] = '\0';    }
-    /* Success!  Store the completed tuple in the result */
-    if (!pqAddTuple(result, tup))
-        goto outOfMemory;
-    /* and reset for a new message */
-    conn->curTuple = NULL;
+    /* tag the row as parsed, check if correctly */
+    conn->inStart += 5 + msgLength;
+    if (conn->inCursor != conn->inStart)
+    {
+        errmsg = "invalid row contents\n";
+        goto error_clearresult;
+    }
+
+    /* Pass the completed row values to rowProcessor */
+    switch (conn->rowProcessor(result, rowbuf, conn->rowProcessorParam))
+    {
+        case 1:
+            /* everything is good */
+            return 0;
+
+        case 0:
+            /* processor requested early exit - stop parsing without error*/
+            return EOF;
+
+        case -1:
+            errmsg = "error in row processor";
+            goto error_clearresult;
+
+        default:
+            /* Illega reurn code */
+            errmsg = "invalid return value from row processor\n";
+            goto error_clearresult;
+    }
-    return 0;
-outOfMemory:
+error_and_forward:
+    /* Discard the failed message by pretending we read it */
+    conn->inCursor = conn->inStart + 5 + msgLength;
+error_clearresult:
+    pqClearAsyncResult(conn);
+    
+error_saveresult:    /*     * Replace partially constructed result with an error result. First     * discard the old
resultto try to win back some memory.     */
 
-    pqClearAsyncResult(conn);
-    printfPQExpBuffer(&conn->errorMessage,
-                      libpq_gettext("out of memory for query result\n"));
+    printfPQExpBuffer(&conn->errorMessage, "%s", libpq_gettext(errmsg));    pqSaveErrorResult(conn);
-
-    /* Discard the failed message by pretending we read it */
-    conn->inCursor = conn->inStart + 5 + msgLength;    return 0;}
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index ef26ab9..e1d3339 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -149,6 +149,17 @@ typedef struct pgNotify    struct pgNotify *next;        /* list link */} PGnotify;
+/* PGrowValue points to a column value in the network buffer.
+ * The value is a string of length len without null termination.
+ * NULL is represented as len < 0; value then points to the place
+ * where the value would have been.
+ */
+typedef struct pgRowValue
+{
+    int            len;            /* length in bytes of the value */
+    char       *value;            /* actual value, without null termination */
+} PGrowValue;
+/* Function types for notice-handling callbacks */typedef void (*PQnoticeReceiver) (void *arg, const PGresult
*res);typedefvoid (*PQnoticeProcessor) (void *arg, const char *message);
 
@@ -416,6 +427,38 @@ extern PGPing PQping(const char *conninfo);extern PGPing PQpingParams(const char *const *
keywords,            const char *const * values, int expand_dbname);
 
+/*
+ * Typedef for alternative row processor.
+ *
+ * Columns array will contain PQnfields() entries, each one pointing
+ * to particular column data in network buffer.  This function is
+ * supposed to copy data out from there and store somewhere.  NULL is
+ * signified with len<0.
+ *
+ * This function must return 1 for success or -1 for failure; on
+ * failure the caller releases the current PGresult.  Returning 0
+ * instructs libpq to exit immediately from the topmost libpq function
+ * without releasing the PGresult being built.
+ */
+typedef int (*PQrowProcessor)(PGresult *res, PGrowValue *columns,
+                              void *param);
+
+/*
+ * Set alternative row data processor for PGconn.
+ *
+ * Once a row processor is registered, libpq no longer stores rows in
+ * the PGresult itself but calls the function for each row in turn.
+ *
+ * func is the row processor function.  See the typedef PQrowProcessor.
+ *
+ * rowProcessorParam is the contextual variable that is passed to the
+ * row processor.
+ */
+extern void PQsetRowProcessor(PGconn *conn, PQrowProcessor func,
+                                   void *rowProcessorParam);
+extern PQrowProcessor PQgetRowProcessor(PGconn *conn, void **param);
+extern int  PQskipResult(PGconn *conn, int skipAll);
+/* Force the write buffer to be written (or at least try) */extern int    PQflush(PGconn *conn);
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 987311e..9cabd20 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -398,7 +398,6 @@ struct pg_conn    /* Status for asynchronous result construction */    PGresult   *result;
 /* result being constructed */
 
-    PGresAttValue *curTuple;    /* tuple currently being read */#ifdef USE_SSL    bool        allow_ssl_try;    /*
Allowedto try SSL negotiation */
 
@@ -443,6 +442,14 @@ struct pg_conn    /* Buffer for receiving various parts of messages */    PQExpBufferData
workBuffer;/* expansible string */
 
+
+    /*
+     * Read column data from network buffer.
+     */
+    PQrowProcessor rowProcessor;/* Function pointer */
+    void *rowProcessorParam;    /* Contextual parameter for rowProcessor */
+    PGrowValue *rowBuf;            /* Buffer for passing values to rowProcessor */
+    int rowBufLen;                /* Number of columns allocated in rowBuf */};/* PGcancel stores all data necessary
tocancel a connection. A copy of this
 
@@ -560,6 +567,7 @@ extern int    pqGets(PQExpBuffer buf, PGconn *conn);extern int    pqGets_append(PQExpBuffer buf,
PGconn*conn);extern int    pqPuts(const char *s, PGconn *conn);extern int    pqGetnchar(char *s, size_t len, PGconn
*conn);
+extern int    pqSkipnchar(size_t len, PGconn *conn);extern int    pqPutnchar(const char *s, size_t len, PGconn
*conn);externint    pqGetInt(int *result, size_t bytes, PGconn *conn);extern int    pqPutInt(int value, size_t bytes,
PGconn*conn); 
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 72c9384..e9233bd 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -7233,6 +7233,281 @@ int PQisthreadsafe(); </sect1>
+ <sect1 id="libpq-altrowprocessor">
+  <title>Alternative row processor</title>
+
+  <indexterm zone="libpq-altrowprocessor">
+   <primary>PGresult</primary>
+   <secondary>PGconn</secondary>
+  </indexterm>
+
+  <para>
+   In standard usage, rows are stored into a <type>PGresult</type>
+   until the full result set is received.  The completely filled
+   <type>PGresult</type> is then passed to the user.  This behavior can
+   be changed by registering an alternative row processor function,
+   which will see each row's data as soon as it is received
+   from the network.  It has the option of processing the data
+   immediately, or storing it into a custom container.
+  </para>
+
+  <para>
+   Note that because the row processor sees rows as they arrive, it
+   cannot know whether the SQL statement will actually finish
+   successfully on the server.  So some care must be taken to get
+   proper transactional behavior.
+  </para>
+
+  <variablelist>
+   <varlistentry id="libpq-pqsetrowprocessor">
+    <term>
+     <function>PQsetRowProcessor</function>
+     <indexterm>
+      <primary>PQsetRowProcessor</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+       Sets a callback function to process each row.
+<synopsis>
+void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param);
+</synopsis>
+     </para>
+     
+     <para>
+       <variablelist>
+     <varlistentry>
+       <term><parameter>conn</parameter></term>
+       <listitem>
+         <para>
+           The connection object to set the row processor function.
+         </para>
+       </listitem>
+     </varlistentry>
+     <varlistentry>
+       <term><parameter>func</parameter></term>
+       <listitem>
+         <para>
+           Row processor function to set. NULL means to use the
+           default processor.
+         </para>
+       </listitem>
+     </varlistentry>
+     <varlistentry>
+       <term><parameter>param</parameter></term>
+       <listitem>
+         <para>
+           A pointer to contextual parameter passed
+           to <parameter>func</parameter>.
+         </para>
+       </listitem>
+     </varlistentry>
+       </variablelist>
+     </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqrowprocessor">
+    <term>
+     <type>PQrowProcessor</type>
+     <indexterm>
+      <primary>PQrowProcessor</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+       The type for the row processor callback function.
+<synopsis>
+int (*PQrowProcessor)(PGresult *res, PGrowValue *columns, void *param);
+
+typedef struct
+{
+    int         len;            /* length in bytes of the value, -1 if NULL */
+    char       *value;          /* actual value, without null termination */
+} PGrowValue;
+</synopsis>
+     </para>
+
+     <para>
+      The <parameter>columns</parameter> array will have PQnfields()
+      elements, each one pointing to column value in network buffer.
+      The <parameter>len</parameter> field will contain number of
+      bytes in value.  If the field value is NULL then
+      <parameter>len</parameter> will be -1 and value will point
+      to position where the value would have been in buffer.
+      This allows estimating row size by pointer arithmetic.
+     </para>
+
+     <para>
+       This function must process or copy row values away from network
+       buffer before it returns, as next row might overwrite them.
+     </para>
+
+     <para>
+       This function must return 1 for success and -1 for failure; on
+       failure the caller releases the <type>PGresult</type> that is
+       under construction.  It can also return 0 to request an early
+       exit from the <function>PQisBusy</function> function.  The
+       supplied <parameter>res</parameter>
+       and <parameter>columns</parameter> values will stay valid so
+       the row can be processed outside of the callback.  The caller is
+       responsible for tracking whether
+       <function>PQisBusy</function> returned early because of the
+       callback or for other reasons.  Usually this should be done by
+       setting cached values to NULL before
+       calling <function>PQisBusy</function>.
+     </para>
+
+     <para>
+       The function is allowed to exit via exception (setjmp/longjmp).
+       The connection and row are guaranteed to be in a valid state.
+       The connection can later be closed
+       via <function>PQfinish</function>.  Processing can also be
+       continued without closing the connection: call
+       <function>PQgetResult</function> on a synchronous connection, or
+       <function>PQisBusy</function> on an asynchronous connection.  Then
+       processing will continue with the next row; the row that raised
+       the exception will be skipped.  Alternatively, all remaining
+       rows can be discarded by calling <function>PQskipResult</function>
+       without closing the connection.
+     </para>
+
+     <variablelist>
+       <varlistentry>
+
+     <term><parameter>res</parameter></term>
+     <listitem>
+       <para>
+         A pointer to the <type>PGresult</type> object.
+       </para>
+     </listitem>
+       </varlistentry>
+       <varlistentry>
+
+     <term><parameter>columns</parameter></term>
+     <listitem>
+       <para>
+         Column values of the row to process.  Column values
+         are located in network buffer, the processor must
+         copy them out from there.
+       </para>
+       <para>
+         Column values are not null-terminated, so processor cannot
+         use C string functions on them directly.
+       </para>
+     </listitem>
+       </varlistentry>
+       <varlistentry>
+
+     <term><parameter>param</parameter></term>
+     <listitem>
+       <para>
+         Extra parameter that was given to <function>PQsetRowProcessor</function>.
+       </para>
+     </listitem>
+       </varlistentry>
+     </variablelist>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqskipresult">
+    <term>
+     <function>PQskipResult</function>
+     <indexterm>
+      <primary>PQskipResult</primary>
+     </indexterm>
+    </term>
+    <listitem>
+      <para>
+        Discards all the remaining row data, without closing the
+        connection, after <function>PQexec</function>
+        or <function>PQgetResult</function> has exited via an exception
+        raised in the row processor.
+<synopsis>
+int PQskipResult(PGconn *conn, int skipAll);
+</synopsis>
+      </para>
+      <para>
+    <variablelist>
+     <varlistentry>
+       <term><parameter>conn</parameter></term>
+       <listitem>
+         <para>
+           The connection object.
+         </para>
+       </listitem>
+     </varlistentry>
+
+     <varlistentry>
+       <term><parameter>skipAll</parameter></term>
+       <listitem>
+         <para>
+           If <parameter>skipAll</parameter> is false (0), only the
+           remaining rows in the current result are skipped.  If it
+           is true (non-zero), the remaining rows in the current result
+           and all rows in succeeding results are skipped.
+         </para>
+       </listitem>
+     </varlistentry>
+
+    </variablelist>
+      </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqgetrowprocessor">
+    <term>
+     <function>PQgetRowProcessor</function>
+     <indexterm>
+      <primary>PQgetRowProcessor</primary>
+     </indexterm>
+    </term>
+    <listitem>
+      <para>
+       Gets the row processor and its context parameter currently set
+       on the connection.
+<synopsis>
+PQrowProcessor PQgetRowProcessor(PGconn *conn, void **param);
+</synopsis>
+      </para>
+      <para>
+    <variablelist>
+     <varlistentry>
+       <term><parameter>conn</parameter></term>
+       <listitem>
+         <para>
+           The connection object.
+         </para>
+       </listitem>
+     </varlistentry>
+
+     <varlistentry>
+       <term><parameter>param</parameter></term>
+       <listitem>
+         <para>
+              If not NULL, the current row processor parameter of the
+              connection is stored here.
+         </para>
+       </listitem>
+     </varlistentry>
+
+    </variablelist>
+      </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+ </sect1>
+
+ <sect1 id="libpq-build">  <title>Building <application>libpq</application> Programs</title>
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 36a8e3e..09d6de8 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -63,11 +63,23 @@ typedef struct remoteConn    bool        newXactForCursor;        /* Opened a transaction for a
cursor*/} remoteConn;
 
+typedef struct storeInfo    /* state handed to storeHandler through PQsetRowProcessor */
+{
+    Tuplestorestate *tuplestore;    /* tuplestore that receives the built tuples */
+    int nattrs;    /* number of attributes in the expected result tupdesc */
+    MemoryContext oldcontext;    /* context restored by finishStoreInfo() */
+    AttInMetadata *attinmeta;    /* metadata passed to BuildTupleFromCStrings() */
+    char** valbuf;    /* per-column malloc'ed buffers holding terminated value copies */
+    int *valbuflen;    /* allocated size of each valbuf entry, -1 if none yet */
+    char **cstrs;    /* per-row array of value pointers, NULL for SQL NULL */
+    bool error_occurred;    /* once set, storeHandler returns -1 immediately */
+    bool nummismatch;    /* result column count differs from nattrs */
+} storeInfo;
+/* * Internal declarations */static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
-static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);static remoteConn *getConnectionByName(const
char*name);static HTAB *createConnHash(void);static void createNewConnection(const char *name, remoteConn *rconn);
 
@@ -90,6 +102,10 @@ static char *escape_param_str(const char *from);static void validate_pkattnums(Relation rel,
          int2vector *pkattnums_arg, int32 pknumatts_arg,                   int **pkattnums, int *pknumatts);
 
+static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo);
+static void finishStoreInfo(storeInfo *sinfo);
+static int storeHandler(PGresult *res, PGrowValue *columns, void *param);
+/* Global */static remoteConn *pconn = NULL;
@@ -503,6 +519,7 @@ dblink_fetch(PG_FUNCTION_ARGS)    char       *curname = NULL;    int            howmany = 0;
bool       fail = true;    /* default to backward compatible */
 
+    storeInfo   storeinfo;    DBLINK_INIT;
@@ -559,15 +576,51 @@ dblink_fetch(PG_FUNCTION_ARGS)    appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
/*
 
+     * Result is stored into storeinfo.tuplestore instead of
+     * res->result returned by PQexec below
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQsetRowProcessor(conn, storeHandler, &storeinfo);
+
+    /*     * Try to execute the query.  Note that since libpq uses malloc, the     * PGresult will be long-lived even
thoughwe are still in a short-lived     * memory context.     */
 
-    res = PQexec(conn, buf.data);
+    PG_TRY();
+    {
+        res = PQexec(conn, buf.data);
+    }
+    PG_CATCH();
+    {
+        ErrorData *edata;
+
+        finishStoreInfo(&storeinfo);
+        edata = CopyErrorData();
+        FlushErrorState();
+
+        /* Skip remaining results when storeHandler raises exception. */
+        PQskipResult(conn, FALSE);
+        ReThrowError(edata);
+    }
+    PG_END_TRY();
+
+    finishStoreInfo(&storeinfo);
+    if (!res ||        (PQresultStatus(res) != PGRES_COMMAND_OK &&         PQresultStatus(res) != PGRES_TUPLES_OK))
{
+        /* finishStoreInfo saves the fields referred to below. */
+        if (storeinfo.nummismatch)
+        {
+            /* This is only for backward compatibility */
+            ereport(ERROR,
+                    (errcode(ERRCODE_DATATYPE_MISMATCH),
+                     errmsg("remote query result rowtype does not match "
+                            "the specified FROM clause rowtype")));
+        }
+        dblink_res_error(conname, res, "could not fetch from cursor", fail);        return (Datum) 0;    }
@@ -579,8 +632,8 @@ dblink_fetch(PG_FUNCTION_ARGS)                (errcode(ERRCODE_INVALID_CURSOR_NAME),
errmsg("cursor \"%s\" does not exist", curname)));    }
 
+    PQclear(res);
-    materializeResult(fcinfo, res);    return (Datum) 0;}
@@ -640,6 +693,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)    remoteConn *rconn = NULL;    bool
      fail = true;    /* default to backward compatible */    bool        freeconn = false;
 
+    storeInfo   storeinfo;    /* check to see if caller supports us returning a tuplestore */    if (rsinfo == NULL ||
!IsA(rsinfo,ReturnSetInfo))
 
@@ -660,6 +714,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)        {            /*
text,text,bool*/            DBLINK_GET_CONN;
 
+            conname = text_to_cstring(PG_GETARG_TEXT_PP(0));            sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
        fail = PG_GETARG_BOOL(2);        }
 
@@ -675,6 +730,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)            else            {
      DBLINK_GET_CONN;
 
+                conname = text_to_cstring(PG_GETARG_TEXT_PP(0));                sql =
text_to_cstring(PG_GETARG_TEXT_PP(1));           }        }
 
@@ -705,6 +761,8 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)        else            /* shouldn't
happen*/            elog(ERROR, "wrong number of arguments");
 
+
+        conname = text_to_cstring(PG_GETARG_TEXT_PP(0));    }    if (!conn)
@@ -715,164 +773,251 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)    rsinfo->setResult = NULL;
rsinfo->setDesc= NULL;
 
+
+    /*
+     * Result is stored into storeinfo.tuplestore instead of
+     * res->result returned by PQexec/PQgetResult below
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQsetRowProcessor(conn, storeHandler, &storeinfo);
+    /* synchronous query, or async result retrieval */
-    if (!is_async)
-        res = PQexec(conn, sql);
-    else
+    PG_TRY();    {
-        res = PQgetResult(conn);
-        /* NULL means we're all done with the async results */
-        if (!res)
-            return (Datum) 0;
+        if (!is_async)
+            res = PQexec(conn, sql);
+        else
+            res = PQgetResult(conn);    }
+    PG_CATCH();
+    {
+        ErrorData *edata;
-    /* if needed, close the connection to the database and cleanup */
-    if (freeconn)
-        PQfinish(conn);
+        finishStoreInfo(&storeinfo);
+        edata = CopyErrorData();
+        FlushErrorState();
-    if (!res ||
-        (PQresultStatus(res) != PGRES_COMMAND_OK &&
-         PQresultStatus(res) != PGRES_TUPLES_OK))
+        /* Skip remaining results when storeHandler raises exception. */
+        PQskipResult(conn, FALSE);
+        ReThrowError(edata);
+    }
+    PG_END_TRY();
+
+    finishStoreInfo(&storeinfo);
+
+    /* NULL res from async get means we're all done with the results */
+    if (res || !is_async)    {
-        dblink_res_error(conname, res, "could not execute query", fail);
-        return (Datum) 0;
+        if (freeconn)
+            PQfinish(conn);
+
+        if (!res ||
+            (PQresultStatus(res) != PGRES_COMMAND_OK &&
+             PQresultStatus(res) != PGRES_TUPLES_OK))
+        {
+            /* finishStoreInfo saves the fields referred to below. */
+            if (storeinfo.nummismatch)
+            {
+                /* This is only for backward compatibility */
+                ereport(ERROR,
+                        (errcode(ERRCODE_DATATYPE_MISMATCH),
+                         errmsg("remote query result rowtype does not match "
+                                "the specified FROM clause rowtype")));
+            }
+
+            dblink_res_error(conname, res, "could not execute query", fail);
+            return (Datum) 0;
+        }    }
+    PQclear(res);
-    materializeResult(fcinfo, res);    return (Datum) 0;}
-/*
- * Materialize the PGresult to return them as the function result.
- * The res will be released in this function.
- */static void
-materializeResult(FunctionCallInfo fcinfo, PGresult *res)
+initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo){    ReturnSetInfo *rsinfo = (ReturnSetInfo *)
fcinfo->resultinfo;
+    TupleDesc    tupdesc;
+    int i;
-    Assert(rsinfo->returnMode == SFRM_Materialize);
-
-    PG_TRY();
+    switch (get_call_result_type(fcinfo, NULL, &tupdesc))    {
-        TupleDesc    tupdesc;
-        bool        is_sql_cmd = false;
-        int            ntuples;
-        int            nfields;
+        case TYPEFUNC_COMPOSITE:
+            /* success */
+            break;
+        case TYPEFUNC_RECORD:
+            /* failed to determine actual type of RECORD */
+            ereport(ERROR,
+                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                     errmsg("function returning record called in context "
+                            "that cannot accept type record")));
+            break;
+        default:
+            /* result type isn't composite */
+            elog(ERROR, "return type must be a row type");
+            break;
+    }
-        if (PQresultStatus(res) == PGRES_COMMAND_OK)
-        {
-            is_sql_cmd = true;
+    sinfo->oldcontext = MemoryContextSwitchTo(
+        rsinfo->econtext->ecxt_per_query_memory);
-            /*
-             * need a tuple descriptor representing one TEXT column to return
-             * the command status string as our result tuple
-             */
-            tupdesc = CreateTemplateTupleDesc(1, false);
-            TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
-                               TEXTOID, -1, 0);
-            ntuples = 1;
-            nfields = 1;
-        }
-        else
-        {
-            Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+    /* make sure we have a persistent copy of the tupdesc */
+    tupdesc = CreateTupleDescCopy(tupdesc);
-            is_sql_cmd = false;
+    sinfo->error_occurred = FALSE;
+    sinfo->nummismatch = FALSE;
+    sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+    sinfo->nattrs = tupdesc->natts;
+    sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+    sinfo->valbuf = NULL;
+    sinfo->valbuflen = NULL;
-            /* get a tuple descriptor for our result type */
-            switch (get_call_result_type(fcinfo, NULL, &tupdesc))
-            {
-                case TYPEFUNC_COMPOSITE:
-                    /* success */
-                    break;
-                case TYPEFUNC_RECORD:
-                    /* failed to determine actual type of RECORD */
-                    ereport(ERROR,
-                            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                        errmsg("function returning record called in context "
-                               "that cannot accept type record")));
-                    break;
-                default:
-                    /* result type isn't composite */
-                    elog(ERROR, "return type must be a row type");
-                    break;
-            }
+    /* Preallocate the per-column buffer, length and C-string arrays. */
+    sinfo->valbuf = (char **)malloc(sinfo->nattrs * sizeof(char*));
+    if (sinfo->valbuf)
+        sinfo->valbuflen = (int *)malloc(sinfo->nattrs * sizeof(int));
+    if (sinfo->valbuflen)
+        sinfo->cstrs = (char **)malloc(sinfo->nattrs * sizeof(char*));
-            /* make sure we have a persistent copy of the tupdesc */
-            tupdesc = CreateTupleDescCopy(tupdesc);
-            ntuples = PQntuples(res);
-            nfields = PQnfields(res);
-        }
+    if (sinfo->cstrs == NULL)
+    {
+        if (sinfo->valbuf)
+            free(sinfo->valbuf);
+        if (sinfo->valbuflen)
+            free(sinfo->valbuflen);
-        /*
-         * check result and tuple descriptor have the same number of columns
-         */
-        if (nfields != tupdesc->natts)
-            ereport(ERROR,
-                    (errcode(ERRCODE_DATATYPE_MISMATCH),
-                     errmsg("remote query result rowtype does not match "
-                            "the specified FROM clause rowtype")));
+        ereport(ERROR,
+                (errcode(ERRCODE_OUT_OF_MEMORY),
+                 errmsg("out of memory")));
+    }
+
+    for (i = 0 ; i < sinfo->nattrs ; i++)
+    {
+        sinfo->valbuf[i] = NULL;
+        sinfo->valbuflen[i] = -1;
+    }
-        if (ntuples > 0)
+    rsinfo->setResult = sinfo->tuplestore;
+    rsinfo->setDesc = tupdesc;
+}
+
+static void
+finishStoreInfo(storeInfo *sinfo)
+{
+    int i;
+
+    if (sinfo->valbuf)
+    {
+        for (i = 0 ; i < sinfo->nattrs ; i++)        {
-            AttInMetadata *attinmeta;
-            Tuplestorestate *tupstore;
-            MemoryContext oldcontext;
-            int            row;
-            char      **values;
-
-            attinmeta = TupleDescGetAttInMetadata(tupdesc);
-
-            oldcontext = MemoryContextSwitchTo(
-                                    rsinfo->econtext->ecxt_per_query_memory);
-            tupstore = tuplestore_begin_heap(true, false, work_mem);
-            rsinfo->setResult = tupstore;
-            rsinfo->setDesc = tupdesc;
-            MemoryContextSwitchTo(oldcontext);
+            if (sinfo->valbuf[i])
+                free(sinfo->valbuf[i]);
+        }
+        free(sinfo->valbuf);
+        sinfo->valbuf = NULL;
+    }
-            values = (char **) palloc(nfields * sizeof(char *));
+    if (sinfo->valbuflen)
+    {
+        free(sinfo->valbuflen);
+        sinfo->valbuflen = NULL;
+    }
-            /* put all tuples into the tuplestore */
-            for (row = 0; row < ntuples; row++)
-            {
-                HeapTuple    tuple;
+    if (sinfo->cstrs)
+    {
+        free(sinfo->cstrs);
+        sinfo->cstrs = NULL;
+    }
-                if (!is_sql_cmd)
-                {
-                    int            i;
+    MemoryContextSwitchTo(sinfo->oldcontext);
+}
-                    for (i = 0; i < nfields; i++)
-                    {
-                        if (PQgetisnull(res, row, i))
-                            values[i] = NULL;
-                        else
-                            values[i] = PQgetvalue(res, row, i);
-                    }
-                }
-                else
-                {
-                    values[0] = PQcmdStatus(res);
-                }
+/* Prototype of this function is PQrowProcessor */
+static int
+storeHandler(PGresult *res, PGrowValue *columns, void *param)
+{
+    storeInfo *sinfo = (storeInfo *)param;
+    HeapTuple  tuple;
+    int        fields = PQnfields(res);
+    int        i;
+    char      **cstrs = sinfo->cstrs;
-                /* build the tuple and put it into the tuplestore. */
-                tuple = BuildTupleFromCStrings(attinmeta, values);
-                tuplestore_puttuple(tupstore, tuple);
-            }
+    if (sinfo->error_occurred)
+        return -1;
-            /* clean up and return the tuplestore */
-            tuplestore_donestoring(tupstore);
-        }
+    if (sinfo->nattrs != fields)
+    {
+        sinfo->error_occurred = TRUE;
+        sinfo->nummismatch = TRUE;
+        finishStoreInfo(sinfo);
-        PQclear(res);
+        /* This error will be processed in dblink_record_internal() */
+        return -1;    }
-    PG_CATCH();
+
+    /*
+     * Value input functions assume that the input string is
+     * zero-terminated, so copy each value into a terminated buffer.
+     */
+    for(i = 0 ; i < fields ; i++)    {
-        /* be sure to release the libpq result */
-        PQclear(res);
-        PG_RE_THROW();
+        int len = columns[i].len;
+        if (len < 0)
+            cstrs[i] = NULL;
+        else
+        {
+            char *tmp = sinfo->valbuf[i];
+            int tmplen = sinfo->valbuflen[i];
+
+            /*
+             * Call malloc and realloc separately so that this works
+             * even on systems where realloc() does not accept NULL
+             * as the old memory block.
+             *
+             * Also try to (re)allocate in bigger steps to
+             * avoid flood of allocations on weird data.
+             */
+            if (tmp == NULL)
+            {
+                tmplen = len + 1;
+                if (tmplen < 64)
+                    tmplen = 64;
+                tmp = (char *)malloc(tmplen);
+            }
+            else if (tmplen < len + 1)
+            {
+                if (len + 1 > tmplen * 2)
+                    tmplen = len + 1;
+                else
+                    tmplen = tmplen * 2;
+                tmp = (char *)realloc(tmp, tmplen);
+            }
+
+            /*
+             * If realloc returns NULL, the old sinfo->valbuf[i] is still
+             * valid and will be freed in finishStoreInfo().
+             */
+            if (tmp == NULL)
+                return -1;  /* Inform out of memory to the caller */
+
+            sinfo->valbuf[i] = tmp;
+            sinfo->valbuflen[i] = tmplen;
+
+            cstrs[i] = sinfo->valbuf[i];
+            memcpy(cstrs[i], columns[i].value, len);
+            cstrs[i][len] = '\0';
+        }    }
-    PG_END_TRY();
+
+    /*
+     * These functions may throw an exception.  It will be caught by the
+     * PG_TRY block in dblink_fetch() or dblink_record_internal().
+     */
+    tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+    tuplestore_puttuple(sinfo->tuplestore, tuple);
+
+    return 1;}/*

pgsql-hackers by date:

Previous
From: Pavel Stehule
Date:
Subject: Re: review: CHECK FUNCTION statement
Next
From: Fujii Masao
Date:
Subject: Re: Scaling XLog insertion (was Re: Moving more work outside WALInsertLock)