Re: Speed dblink using alternate libpq tuple storage - Mailing list pgsql-hackers

From Tom Lane
Subject Re: Speed dblink using alternate libpq tuple storage
Date
Msg-id 7720.1333467218@sss.pgh.pa.us
Whole thread Raw
In response to Re: Speed dblink using alternate libpq tuple storage  (Tom Lane <tgl@sss.pgh.pa.us>)
Responses Re: Speed dblink using alternate libpq tuple storage  (Kyotaro HORIGUCHI <horiguchi.kyotaro@oss.ntt.co.jp>)
List pgsql-hackers
I've whacked the libpq part of this patch around to the point where I'm
reasonably satisfied with it (attached), and am now looking at the
dblink part.  I soon realized that there's a rather nasty issue with the
dblink patch, which is that it fundamentally doesn't work for async
operations.  In an async setup what you would do is dblink_send_query(),
then periodically poll with dblink_is_busy(), then when it says the
query is done, collect the results with dblink_get_result().  The
trouble with this is that PQisBusy will invoke the standard row
processor, so by the time dblink_get_result runs it's way too late to
switch row processors.

I thought about fixing that by installing dblink's custom row processor
permanently, but that doesn't really work because we don't know the
expected tuple column datatypes until we see the call environment for
dblink_get_result().

A hack on top of that hack would be to collect the data into a
tuplestore that contains all text columns, and then convert to the
correct rowtype during dblink_get_result, but that seems rather ugly
and not terribly high-performance.

What I'm currently thinking we should do is just use the old method
for async queries, and only optimize the synchronous case.

I thought for awhile that this might represent a generic deficiency
in the whole concept of a row processor, but probably it's mostly
down to dblink's rather bizarre API.  It would be unusual I think for
people to want a row processor that couldn't know what to do until
after the entire query result is received.

            regards, tom lane


diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 96064bbb0de8cdb0ad3bd52f35c7d845038acf1f..af4db64a06da0a6fc53b77d5e2d7cb366b384e8b 100644
*** a/doc/src/sgml/libpq.sgml
--- b/doc/src/sgml/libpq.sgml
*************** ExecStatusType PQresultStatus(const PGre
*** 2304,2309 ****
--- 2304,2319 ----
             </para>
            </listitem>
           </varlistentry>
+
+          <varlistentry id="libpq-pgres-suspended">
+           <term><literal>PGRES_SUSPENDED</literal></term>
+           <listitem>
+            <para>
+             A custom row processor requested suspension of processing of
+             a query result (see <xref linkend="libpq-row-processor">).
+            </para>
+           </listitem>
+          </varlistentry>
          </variablelist>

          If the result status is <literal>PGRES_TUPLES_OK</literal>, then
*************** defaultNoticeProcessor(void *arg, const
*** 5581,5586 ****
--- 5591,5879 ----

   </sect1>

+  <sect1 id="libpq-row-processor">
+   <title>Custom Row Processing</title>
+
+   <indexterm zone="libpq-row-processor">
+    <primary>PQrowProcessor</primary>
+   </indexterm>
+
+   <indexterm zone="libpq-row-processor">
+    <primary>row processor</primary>
+    <secondary>in libpq</secondary>
+   </indexterm>
+
+   <para>
+    Ordinarily, when receiving a query result from the server,
+    <application>libpq</> adds each row value to the current
+    <type>PGresult</type> until the entire result set is received; then
+    the <type>PGresult</type> is returned to the application as a unit.
+    This approach is simple to work with, but becomes inefficient for large
+    result sets.  To improve performance, an application can register a
+    custom <firstterm>row processor</> function that processes each row
+    as the data is received from the network.  The custom row processor could
+    process the data fully, or store it into some application-specific data
+    structure for later processing.
+   </para>
+
+   <caution>
+    <para>
+     The row processor function sees the rows before it is known whether the
+     query will succeed overall, since the server might return some rows before
+     encountering an error.  For proper transactional behavior, it must be
+     possible to discard or undo whatever the row processor has done, if the
+     query ultimately fails.
+    </para>
+   </caution>
+
+   <para>
+    When using a custom row processor, row data is not accumulated into the
+    <type>PGresult</type>, so the <type>PGresult</type> ultimately delivered to
+    the application will contain no rows (<function>PQntuples</> =
+    <literal>0</>).  However, it still has <function>PQresultStatus</> =
+    <literal>PGRES_TUPLES_OK</>, and it contains correct information about the
+    set of columns in the query result.  On the other hand, if the query fails
+    partway through, the returned <type>PGresult</type> has
+    <function>PQresultStatus</> = <literal>PGRES_FATAL_ERROR</>.  The
+    application must be prepared to undo any actions of the row processor
+    whenever it gets a <literal>PGRES_FATAL_ERROR</> result.
+   </para>
+
+   <para>
+    A custom row processor is registered for a particular connection by
+    calling <function>PQsetRowProcessor</function>, described below.
+    This row processor will be used for all subsequent query results on that
+    connection until changed again.  A row processor function must have a
+    signature matching
+
+ <synopsis>
+ typedef int (*PQrowProcessor) (PGresult *res, const PGdataValue *columns,
+                                const char **errmsg, void *param);
+ </synopsis>
+    where <type>PGdataValue</> is described by
+ <synopsis>
+ typedef struct pgDataValue
+ {
+     int         len;            /* data length in bytes, or <0 if NULL */
+     const char *value;          /* data value, without zero-termination */
+ } PGdataValue;
+ </synopsis>
+   </para>
+
+   <para>
+    The <parameter>res</> parameter is the <literal>PGRES_TUPLES_OK</>
+    <type>PGresult</type> that will eventually be delivered to the calling
+    application (if no error intervenes).  It contains information about
+    the set of columns in the query result, but no row data.  In particular the
+    row processor must fetch <literal>PQnfields(res)</> to know the number of
+    data columns.
+   </para>
+
+   <para>
+    Immediately after <application>libpq</> has determined the result set's
+    column information, it will make a call to the row processor with
+    <parameter>columns</parameter> set to NULL, but the other parameters as
+    usual.  The row processor can use this call to initialize for a new result
+    set; if it has nothing to do, it can just return <literal>1</>.  In
+    subsequent calls, one per received row, <parameter>columns</parameter>
+    is non-NULL and points to an array of length/pointer structs, one per
+    data column.
+   </para>
+
+   <para>
+    <parameter>errmsg</parameter> is an output parameter used only for error
+    reporting.  If the row processor needs to report an error, it can set
+    <literal>*</><parameter>errmsg</parameter> to point to a suitable message
+    string (and then return <literal>-1</>).  As a special case, returning
+    <literal>-1</> without changing <literal>*</><parameter>errmsg</parameter>
+    from its initial value of NULL is taken to mean <quote>out of memory</>.
+   </para>
+
+   <para>
+    The last parameter, <parameter>param</parameter>, is just a void pointer
+    passed through from <function>PQsetRowProcessor</function>.  This can be
+    used for communication between the row processor function and the
+    surrounding application.
+   </para>
+
+   <para>
+    The row processor function must return one of three values.
+    <literal>1</> is the normal, successful result value; <application>libpq</>
+    will continue with receiving row values from the server and passing them to
+    the row processor.
+    <literal>-1</> indicates that the row processor has encountered an error.
+    <application>libpq</> will discard all remaining rows in the result set
+    and then return a <literal>PGRES_FATAL_ERROR</> <type>PGresult</type> to
+    the application (containing the specified error message, or <quote>out of
+    memory for query result</> if <literal>*</><parameter>errmsg</parameter>
+    was left as NULL).
+    <literal>0</> means that <application>libpq</> should suspend receiving the
+    query result and return a <literal>PGRES_SUSPENDED</> <type>PGresult</type>
+    to the application.  This option allows the surrounding application to deal
+    with the row value and then resume reading the query result.
+   </para>
+
+   <para>
+    In the <type>PGdataValue</> array passed to a row processor, data values
+    cannot be assumed to be zero-terminated, whether the data format is text
+    or binary.  A SQL NULL value is indicated by a negative length field.
+   </para>
+
+   <para>
+    The row processor <emphasis>must</> process the row data values
+    immediately, or else copy them into application-controlled storage.
+    The value pointers passed to the row processor point into
+    <application>libpq</>'s internal data input buffer, which will be
+    overwritten by the next packet fetch.  However, if the row processor
+    returns <literal>0</> to request suspension, the passed values will
+    still be valid when control returns to the calling application.
+   </para>
+
+   <para>
+    After a row processor requests suspension, <application>libpq</> returns a
+    <literal>PGRES_SUSPENDED</> <type>PGresult</type> to the application.
+    This <type>PGresult</type> contains no other information and can be dropped
+    immediately with <function>PQclear</>.  When the application is ready to
+    resume processing of the query result, it should call
+    <function>PQgetResult</>; this may result in another
+    <literal>PGRES_SUSPENDED</> result, etc, until the result set is completely
+    processed.  As with any usage of <function>PQgetResult</>, the application
+    should continue calling <function>PQgetResult</> until it gets a NULL
+    result before issuing any new query.
+   </para>
+
+   <para>
+    Another option for exiting a row processor is to throw an exception using
+    C's <function>longjmp()</> or C++'s <literal>throw</>.  If this is done,
+    the state of the <application>libpq</> connection is the same as if the row
+    processor had requested suspension, and the application can resume
+    processing with <function>PQgetResult</>.
+   </para>
+
+   <para>
+    In some cases, a suspension or exception may mean that the remainder of the
+    query result is not interesting.  In such cases the application can discard
+    the remaining rows with <function>PQskipResult</>, described below.
+    Another possible recovery option is to close the connection altogether with
+    <function>PQfinish</>.
+   </para>
+
+   <para>
+    <variablelist>
+     <varlistentry id="libpq-pqsetrowprocessor">
+      <term>
+       <function>PQsetRowProcessor</function>
+       <indexterm>
+        <primary>PQsetRowProcessor</primary>
+       </indexterm>
+      </term>
+
+      <listitem>
+       <para>
+        Sets a callback function to process each row.
+
+ <synopsis>
+ void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param);
+ </synopsis>
+       </para>
+
+       <para>
+        The specified row processor function <parameter>func</> is installed as
+        the active row processor for the given connection <parameter>conn</>.
+        Also, <parameter>param</> is installed as the passthrough pointer to
+        pass to it.  Alternatively, if <parameter>func</> is NULL, the standard
+        row processor is reinstalled on the given connection (and
+        <parameter>param</> is ignored).
+       </para>
+
+       <para>
+        Although the row processor can be changed at any time in the life of a
+        connection, it's generally unwise to do so while a query is active.
+        In particular, when using asynchronous mode, be aware that both
+        <function>PQisBusy</> and <function>PQgetResult</> can call the current
+        row processor.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry id="libpq-pqgetrowprocessor">
+      <term>
+       <function>PQgetRowProcessor</function>
+       <indexterm>
+        <primary>PQgetRowProcessor</primary>
+       </indexterm>
+      </term>
+
+      <listitem>
+       <para>
+        Fetches the current row processor for the specified connection.
+
+ <synopsis>
+ PQrowProcessor PQgetRowProcessor(const PGconn *conn, void **param);
+ </synopsis>
+       </para>
+
+       <para>
+        In addition to returning the row processor function pointer, the
+        current passthrough pointer will be returned at
+        <literal>*</><parameter>param</>, if <parameter>param</> is not NULL.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry id="libpq-pqskipresult">
+      <term>
+       <function>PQskipResult</function>
+       <indexterm>
+        <primary>PQskipResult</primary>
+       </indexterm>
+      </term>
+
+      <listitem>
+       <para>
+        Discard all the remaining rows in the incoming result set.
+
+ <synopsis>
+ PGresult *PQskipResult(PGconn *conn);
+ </synopsis>
+       </para>
+
+       <para>
+        This is a simple convenience function to discard incoming data after a
+        row processor has failed or it's determined that the rest of the result
+        set is not interesting.  <function>PQskipResult</> is exactly
+        equivalent to <function>PQgetResult</> except that it transiently
+        installs a dummy row processor function that just discards data.
+        The returned <type>PGresult</> can be discarded without further ado
+        if it has status <literal>PGRES_TUPLES_OK</>; but other status values
+        should be handled normally.  (In particular,
+        <literal>PGRES_FATAL_ERROR</> indicates a server-reported error that
+        will still need to be dealt with.)
+        As when using <function>PQgetResult</>, one should usually repeat the
+        call until NULL is returned to ensure the connection has reached an
+        idle state.  Another possible usage is to call
+        <function>PQskipResult</> just once, and then resume using
+        <function>PQgetResult</> to process subsequent result sets normally.
+       </para>
+
+       <para>
+        Because <function>PQskipResult</> will wait for server input, it is not
+        very useful in asynchronous applications.  In particular you should not
+        code a loop of <function>PQisBusy</> and <function>PQskipResult</>,
+        because that will result in the normal row processor being called
+        within <function>PQisBusy</>.  To get the proper behavior in an
+        asynchronous application, you'll need to install a dummy row processor
+        (or set a flag to make your normal row processor do nothing) and leave
+        it there until you have discarded all incoming data via your normal
+        <function>PQisBusy</> and <function>PQgetResult</> loop.
+       </para>
+      </listitem>
+     </varlistentry>
+    </variablelist>
+   </para>
+
+  </sect1>
+
   <sect1 id="libpq-events">
    <title>Event System</title>

diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c
index 715e23167de82709da99b8c8afa58d81935eb25c..33dc97e95f249576165adb28d7c59b556a2ea08b 100644
*** a/src/bin/psql/common.c
--- b/src/bin/psql/common.c
*************** AcceptResult(const PGresult *result)
*** 450,456 ****
              case PGRES_EMPTY_QUERY:
              case PGRES_COPY_IN:
              case PGRES_COPY_OUT:
-             case PGRES_COPY_BOTH:
                  /* Fine, do nothing */
                  OK = true;
                  break;
--- 450,455 ----
*************** ProcessResult(PGresult **results)
*** 673,701 ****
          result_status = PQresultStatus(*results);
          switch (result_status)
          {
-             case PGRES_COPY_BOTH:
-                 /*
-                  * No now-existing SQL command can yield PGRES_COPY_BOTH, but
-                  * defend against the future.  PQexec() can't short-circuit
-                  * it's way out of a PGRES_COPY_BOTH, so the connection will
-                  * be useless at this point.  XXX is there a method for
-                  * clearing this status that's likely to work with every
-                  * future command that can initiate it?
-                  */
-                 psql_error("unexpected PQresultStatus (%d)", result_status);
-                 return false;
-
-             case PGRES_COPY_OUT:
-             case PGRES_COPY_IN:
-                 is_copy = true;
-                 break;
-
              case PGRES_EMPTY_QUERY:
              case PGRES_COMMAND_OK:
              case PGRES_TUPLES_OK:
                  is_copy = false;
                  break;

              default:
                  /* AcceptResult() should have caught anything else. */
                  is_copy = false;
--- 672,688 ----
          result_status = PQresultStatus(*results);
          switch (result_status)
          {
              case PGRES_EMPTY_QUERY:
              case PGRES_COMMAND_OK:
              case PGRES_TUPLES_OK:
                  is_copy = false;
                  break;

+             case PGRES_COPY_OUT:
+             case PGRES_COPY_IN:
+                 is_copy = true;
+                 break;
+
              default:
                  /* AcceptResult() should have caught anything else. */
                  is_copy = false;
*************** PrintQueryResults(PGresult *results)
*** 817,823 ****

          case PGRES_COPY_OUT:
          case PGRES_COPY_IN:
-         case PGRES_COPY_BOTH:
              /* nothing to do here */
              success = true;
              break;
--- 804,809 ----
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 1af8df699edf641ef4defecbdd375d7ff4b3a515..1251455f1f6d92c74e206b5a9b8dcdeb36ac9b98 100644
*** a/src/interfaces/libpq/exports.txt
--- b/src/interfaces/libpq/exports.txt
*************** PQconnectStartParams      157
*** 160,162 ****
--- 160,165 ----
  PQping                    158
  PQpingParams              159
  PQlibVersion              160
+ PQsetRowProcessor         161
+ PQgetRowProcessor         162
+ PQskipResult              163
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 6a20a1485d17b6fdea79cb922ff26c78335388a9..03fd6e45bb9a9b996f39b6c2625643ac8a288ebe 100644
*** a/src/interfaces/libpq/fe-connect.c
--- b/src/interfaces/libpq/fe-connect.c
*************** keep_going:                        /* We will come back to
*** 2425,2431 ****
                      conn->status = CONNECTION_AUTH_OK;

                      /*
!                      * Set asyncStatus so that PQsetResult will think that
                       * what comes back next is the result of a query.  See
                       * below.
                       */
--- 2425,2431 ----
                      conn->status = CONNECTION_AUTH_OK;

                      /*
!                      * Set asyncStatus so that PQgetResult will think that
                       * what comes back next is the result of a query.  See
                       * below.
                       */
*************** makeEmptyPGconn(void)
*** 2686,2693 ****
--- 2686,2696 ----
      /* Zero all pointers and booleans */
      MemSet(conn, 0, sizeof(PGconn));

+     /* install default row processor and notice hooks */
+     PQsetRowProcessor(conn, NULL, NULL);
      conn->noticeHooks.noticeRec = defaultNoticeReceiver;
      conn->noticeHooks.noticeProc = defaultNoticeProcessor;
+
      conn->status = CONNECTION_BAD;
      conn->asyncStatus = PGASYNC_IDLE;
      conn->xactStatus = PQTRANS_IDLE;
*************** makeEmptyPGconn(void)
*** 2721,2731 ****
--- 2724,2737 ----
      conn->inBuffer = (char *) malloc(conn->inBufSize);
      conn->outBufSize = 16 * 1024;
      conn->outBuffer = (char *) malloc(conn->outBufSize);
+     conn->rowBufLen = 32;
+     conn->rowBuf = (PGdataValue *) malloc(conn->rowBufLen * sizeof(PGdataValue));
      initPQExpBuffer(&conn->errorMessage);
      initPQExpBuffer(&conn->workBuffer);

      if (conn->inBuffer == NULL ||
          conn->outBuffer == NULL ||
+         conn->rowBuf == NULL ||
          PQExpBufferBroken(&conn->errorMessage) ||
          PQExpBufferBroken(&conn->workBuffer))
      {
*************** freePGconn(PGconn *conn)
*** 2829,2834 ****
--- 2835,2842 ----
          free(conn->inBuffer);
      if (conn->outBuffer)
          free(conn->outBuffer);
+     if (conn->rowBuf)
+         free(conn->rowBuf);
      termPQExpBuffer(&conn->errorMessage);
      termPQExpBuffer(&conn->workBuffer);

*************** closePGconn(PGconn *conn)
*** 2888,2894 ****
      conn->status = CONNECTION_BAD;        /* Well, not really _bad_ - just
                                           * absent */
      conn->asyncStatus = PGASYNC_IDLE;
!     pqClearAsyncResult(conn);    /* deallocate result and curTuple */
      pg_freeaddrinfo_all(conn->addrlist_family, conn->addrlist);
      conn->addrlist = NULL;
      conn->addr_cur = NULL;
--- 2896,2902 ----
      conn->status = CONNECTION_BAD;        /* Well, not really _bad_ - just
                                           * absent */
      conn->asyncStatus = PGASYNC_IDLE;
!     pqClearAsyncResult(conn);    /* deallocate result */
      pg_freeaddrinfo_all(conn->addrlist_family, conn->addrlist);
      conn->addrlist = NULL;
      conn->addr_cur = NULL;
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index b743566a5ddda2e9f30a4148bfcac78ed898fd84..b79cd7e45d6ff63c6beb85232ffbdcc32e7397df 100644
*** a/src/interfaces/libpq/fe-exec.c
--- b/src/interfaces/libpq/fe-exec.c
*************** char       *const pgresStatus[] = {
*** 38,44 ****
      "PGRES_BAD_RESPONSE",
      "PGRES_NONFATAL_ERROR",
      "PGRES_FATAL_ERROR",
!     "PGRES_COPY_BOTH"
  };

  /*
--- 38,45 ----
      "PGRES_BAD_RESPONSE",
      "PGRES_NONFATAL_ERROR",
      "PGRES_FATAL_ERROR",
!     "PGRES_COPY_BOTH",
!     "PGRES_SUSPENDED"
  };

  /*
*************** static bool static_std_strings = false;
*** 50,55 ****
--- 51,59 ----


  static PGEvent *dupEvents(PGEvent *events, int count);
+ static bool pqAddTuple(PGresult *res, PGresAttValue *tup);
+ static int pqStdRowProcessor(PGresult *res, const PGdataValue *columns,
+                   const char **errmsg, void *param);
  static bool PQsendQueryStart(PGconn *conn);
  static int PQsendQueryGuts(PGconn *conn,
                  const char *command,
*************** static int PQsendQueryGuts(PGconn *conn,
*** 61,66 ****
--- 65,72 ----
                  const int *paramFormats,
                  int resultFormat);
  static void parseInput(PGconn *conn);
+ static int dummyRowProcessor(PGresult *res, const PGdataValue *columns,
+                   const char **errmsg, void *param);
  static bool PQexecStart(PGconn *conn);
  static PGresult *PQexecFinish(PGconn *conn);
  static int PQsendDescribe(PGconn *conn, char desc_type,
*************** PQmakeEmptyPGresult(PGconn *conn, ExecSt
*** 176,181 ****
--- 182,188 ----
              case PGRES_COPY_OUT:
              case PGRES_COPY_IN:
              case PGRES_COPY_BOTH:
+             case PGRES_SUSPENDED:
                  /* non-error cases */
                  break;
              default:
*************** PQclear(PGresult *res)
*** 694,707 ****
  /*
   * Handy subroutine to deallocate any partially constructed async result.
   */
-
  void
  pqClearAsyncResult(PGconn *conn)
  {
      if (conn->result)
          PQclear(conn->result);
      conn->result = NULL;
-     conn->curTuple = NULL;
  }

  /*
--- 701,712 ----
*************** pqPrepareAsyncResult(PGconn *conn)
*** 756,762 ****
       */
      res = conn->result;
      conn->result = NULL;        /* handing over ownership to caller */
-     conn->curTuple = NULL;        /* just in case */
      if (!res)
          res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
      else
--- 761,766 ----
*************** pqInternalNotice(const PGNoticeHooks *ho
*** 832,838 ****
   *      add a row pointer to the PGresult structure, growing it if necessary
   *      Returns TRUE if OK, FALSE if not enough memory to add the row
   */
! int
  pqAddTuple(PGresult *res, PGresAttValue *tup)
  {
      if (res->ntups >= res->tupArrSize)
--- 836,842 ----
   *      add a row pointer to the PGresult structure, growing it if necessary
   *      Returns TRUE if OK, FALSE if not enough memory to add the row
   */
! static bool
  pqAddTuple(PGresult *res, PGresAttValue *tup)
  {
      if (res->ntups >= res->tupArrSize)
*************** pqSaveParameterStatus(PGconn *conn, cons
*** 979,984 ****
--- 983,1106 ----


  /*
+  * PQsetRowProcessor
+  *      Set function that copies row data out from the network buffer,
+  *      along with a passthrough parameter for it.
+  */
+ void
+ PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param)
+ {
+     if (!conn)
+         return;
+
+     if (func)
+     {
+         /* set custom row processor */
+         conn->rowProcessor = func;
+         conn->rowProcessorParam = param;
+     }
+     else
+     {
+         /* set default row processor */
+         conn->rowProcessor = pqStdRowProcessor;
+         conn->rowProcessorParam = conn;
+     }
+ }
+
+ /*
+  * PQgetRowProcessor
+  *      Get current row processor of PGconn.
+  *      If param is not NULL, also store the passthrough parameter at *param.
+  */
+ PQrowProcessor
+ PQgetRowProcessor(const PGconn *conn, void **param)
+ {
+     if (!conn)
+     {
+         if (param)
+             *param = NULL;
+         return NULL;
+     }
+
+     if (param)
+         *param = conn->rowProcessorParam;
+     return conn->rowProcessor;
+ }
+
+ /*
+  * pqStdRowProcessor
+  *      Add the received row to the PGresult structure
+  *      Returns 1 if OK, -1 if error occurred.
+  *
+  * Note: "param" should point to the PGconn, but we don't actually need that
+  * as of the current coding.
+  */
+ static int
+ pqStdRowProcessor(PGresult *res, const PGdataValue *columns,
+                   const char **errmsg, void *param)
+ {
+     int            nfields = res->numAttributes;
+     PGresAttValue *tup;
+     int            i;
+
+     if (columns == NULL)
+     {
+         /* New result set ... we have nothing to do in this function. */
+         return 1;
+     }
+
+     /*
+      * Basically we just allocate space in the PGresult for each field and
+      * copy the data over.
+      *
+      * Note: on malloc failure, we return -1 leaving *errmsg still NULL, which
+      * caller will take to mean "out of memory".  This is preferable to trying
+      * to set up such a message here, because evidently there's not enough
+      * memory for gettext() to do anything.
+      */
+     tup = (PGresAttValue *)
+         pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE);
+     if (tup == NULL)
+         return -1;
+
+     for (i = 0; i < nfields; i++)
+     {
+         int        clen = columns[i].len;
+
+         if (clen < 0)
+         {
+             /* null field */
+             tup[i].len = NULL_LEN;
+             tup[i].value = res->null_field;
+         }
+         else
+         {
+             bool        isbinary = (res->attDescs[i].format != 0);
+             char       *val;
+
+             val = (char *) pqResultAlloc(res, clen + 1, isbinary);
+             if (val == NULL)
+                 return -1;
+
+             /* copy and zero-terminate the data (even if it's binary) */
+             memcpy(val, columns[i].value, clen);
+             val[clen] = '\0';
+
+             tup[i].len = clen;
+             tup[i].value = val;
+         }
+     }
+
+     /* And add the tuple to the PGresult's tuple array */
+     if (!pqAddTuple(res, tup))
+         return -1;
+
+     /* Success */
+     return 1;
+ }
+
+
+ /*
   * PQsendQuery
   *     Submit a query, but don't wait for it to finish
   *
*************** PQsendQueryStart(PGconn *conn)
*** 1223,1229 ****

      /* initialize async result-accumulation state */
      conn->result = NULL;
-     conn->curTuple = NULL;

      /* ready to send command message */
      return true;
--- 1345,1350 ----
*************** PQconsumeInput(PGconn *conn)
*** 1468,1473 ****
--- 1589,1597 ----
   * parseInput: if appropriate, parse input data from backend
   * until input is exhausted or a stopping state is reached.
   * Note that this function will NOT attempt to read more data from the backend.
+  *
+  * Note: callers of parseInput must be prepared for a longjmp exit when we are
+  * in PGASYNC_BUSY state, since an external row processor might do that.
   */
  static void
  parseInput(PGconn *conn)
*************** PQgetResult(PGconn *conn)
*** 1562,1567 ****
--- 1686,1697 ----
              /* Set the state back to BUSY, allowing parsing to proceed. */
              conn->asyncStatus = PGASYNC_BUSY;
              break;
+         case PGASYNC_SUSPENDED:
+             /* Note we do not touch the async result here */
+             res = PQmakeEmptyPGresult(conn, PGRES_SUSPENDED);
+             /* Set the state back to BUSY, allowing parsing to proceed. */
+             conn->asyncStatus = PGASYNC_BUSY;
+             break;
          case PGASYNC_COPY_IN:
              if (conn->result && conn->result->resultStatus == PGRES_COPY_IN)
                  res = pqPrepareAsyncResult(conn);
*************** PQgetResult(PGconn *conn)
*** 1615,1620 ****
--- 1745,1792 ----
      return res;
  }

+ /*
+  * PQskipResult
+  *      Get the next PGresult produced by a query, but discard any data rows.
+  *
+  * This is mainly useful for cleaning up after deciding to abandon a suspended
+  * query.  Note that it's of little value in an async-style application, since
+  * any preceding calls to PQisBusy would have already called the regular row
+  * processor.
+  */
+ PGresult *
+ PQskipResult(PGconn *conn)
+ {
+     PGresult   *res;
+     PQrowProcessor savedRowProcessor;
+
+     if (!conn)
+         return NULL;
+
+     /* temporarily install dummy row processor */
+     savedRowProcessor = conn->rowProcessor;
+     conn->rowProcessor = dummyRowProcessor;
+     /* no need to save/change rowProcessorParam */
+
+     /* fetch the next result */
+     res = PQgetResult(conn);
+
+     /* restore previous row processor */
+     conn->rowProcessor = savedRowProcessor;
+
+     return res;
+ }
+
+ /*
+  * Do-nothing row processor for PQskipResult
+  */
+ static int
+ dummyRowProcessor(PGresult *res, const PGdataValue *columns,
+                   const char **errmsg, void *param)
+ {
+     return 1;
+ }
+

  /*
   * PQexec
*************** PQexecStart(PGconn *conn)
*** 1721,1727 ****
       * Silently discard any prior query result that application didn't eat.
       * This is probably poor design, but it's here for backward compatibility.
       */
!     while ((result = PQgetResult(conn)) != NULL)
      {
          ExecStatusType resultStatus = result->resultStatus;

--- 1893,1899 ----
       * Silently discard any prior query result that application didn't eat.
       * This is probably poor design, but it's here for backward compatibility.
       */
!     while ((result = PQskipResult(conn)) != NULL)
      {
          ExecStatusType resultStatus = result->resultStatus;

diff --git a/src/interfaces/libpq/fe-lobj.c b/src/interfaces/libpq/fe-lobj.c
index 29752270a1d8072fabc73eea0e64383d32b12402..13fd98c2f913d3818758e75bac96822306981b52 100644
*** a/src/interfaces/libpq/fe-lobj.c
--- b/src/interfaces/libpq/fe-lobj.c
***************
*** 40,48 ****
  #define LO_BUFSIZE          8192

  static int    lo_initialize(PGconn *conn);
!
! static Oid
!             lo_import_internal(PGconn *conn, const char *filename, const Oid oid);

  /*
   * lo_open
--- 40,46 ----
  #define LO_BUFSIZE          8192

  static int    lo_initialize(PGconn *conn);
! static Oid    lo_import_internal(PGconn *conn, const char *filename, Oid oid);

  /*
   * lo_open
*************** lo_open(PGconn *conn, Oid lobjId, int mo
*** 59,65 ****
      PQArgBlock    argv[2];
      PGresult   *res;

!     if (conn->lobjfuncs == NULL)
      {
          if (lo_initialize(conn) < 0)
              return -1;
--- 57,63 ----
      PQArgBlock    argv[2];
      PGresult   *res;

!     if (conn == NULL || conn->lobjfuncs == NULL)
      {
          if (lo_initialize(conn) < 0)
              return -1;
*************** lo_close(PGconn *conn, int fd)
*** 101,107 ****
      int            retval;
      int            result_len;

!     if (conn->lobjfuncs == NULL)
      {
          if (lo_initialize(conn) < 0)
              return -1;
--- 99,105 ----
      int            retval;
      int            result_len;

!     if (conn == NULL || conn->lobjfuncs == NULL)
      {
          if (lo_initialize(conn) < 0)
              return -1;
*************** lo_truncate(PGconn *conn, int fd, size_t
*** 139,145 ****
      int            retval;
      int            result_len;

!     if (conn->lobjfuncs == NULL)
      {
          if (lo_initialize(conn) < 0)
              return -1;
--- 137,143 ----
      int            retval;
      int            result_len;

!     if (conn == NULL || conn->lobjfuncs == NULL)
      {
          if (lo_initialize(conn) < 0)
              return -1;
*************** lo_read(PGconn *conn, int fd, char *buf,
*** 192,198 ****
      PGresult   *res;
      int            result_len;

!     if (conn->lobjfuncs == NULL)
      {
          if (lo_initialize(conn) < 0)
              return -1;
--- 190,196 ----
      PGresult   *res;
      int            result_len;

!     if (conn == NULL || conn->lobjfuncs == NULL)
      {
          if (lo_initialize(conn) < 0)
              return -1;
*************** lo_write(PGconn *conn, int fd, const cha
*** 234,240 ****
      int            result_len;
      int            retval;

!     if (conn->lobjfuncs == NULL)
      {
          if (lo_initialize(conn) < 0)
              return -1;
--- 232,238 ----
      int            result_len;
      int            retval;

!     if (conn == NULL || conn->lobjfuncs == NULL)
      {
          if (lo_initialize(conn) < 0)
              return -1;
*************** lo_lseek(PGconn *conn, int fd, int offse
*** 280,286 ****
      int            retval;
      int            result_len;

!     if (conn->lobjfuncs == NULL)
      {
          if (lo_initialize(conn) < 0)
              return -1;
--- 278,284 ----
      int            retval;
      int            result_len;

!     if (conn == NULL || conn->lobjfuncs == NULL)
      {
          if (lo_initialize(conn) < 0)
              return -1;
*************** lo_creat(PGconn *conn, int mode)
*** 328,334 ****
      int            retval;
      int            result_len;

!     if (conn->lobjfuncs == NULL)
      {
          if (lo_initialize(conn) < 0)
              return InvalidOid;
--- 326,332 ----
      int            retval;
      int            result_len;

!     if (conn == NULL || conn->lobjfuncs == NULL)
      {
          if (lo_initialize(conn) < 0)
              return InvalidOid;
*************** lo_create(PGconn *conn, Oid lobjId)
*** 367,373 ****
      int            retval;
      int            result_len;

!     if (conn->lobjfuncs == NULL)
      {
          if (lo_initialize(conn) < 0)
              return InvalidOid;
--- 365,371 ----
      int            retval;
      int            result_len;

!     if (conn == NULL || conn->lobjfuncs == NULL)
      {
          if (lo_initialize(conn) < 0)
              return InvalidOid;
*************** lo_tell(PGconn *conn, int fd)
*** 413,419 ****
      PGresult   *res;
      int            result_len;

!     if (conn->lobjfuncs == NULL)
      {
          if (lo_initialize(conn) < 0)
              return -1;
--- 411,417 ----
      PGresult   *res;
      int            result_len;

!     if (conn == NULL || conn->lobjfuncs == NULL)
      {
          if (lo_initialize(conn) < 0)
              return -1;
*************** lo_unlink(PGconn *conn, Oid lobjId)
*** 451,457 ****
      int            result_len;
      int            retval;

!     if (conn->lobjfuncs == NULL)
      {
          if (lo_initialize(conn) < 0)
              return -1;
--- 449,455 ----
      int            result_len;
      int            retval;

!     if (conn == NULL || conn->lobjfuncs == NULL)
      {
          if (lo_initialize(conn) < 0)
              return -1;
*************** lo_import_with_oid(PGconn *conn, const c
*** 505,511 ****
  }

  static Oid
! lo_import_internal(PGconn *conn, const char *filename, const Oid oid)
  {
      int            fd;
      int            nbytes,
--- 503,509 ----
  }

  static Oid
! lo_import_internal(PGconn *conn, const char *filename, Oid oid)
  {
      int            fd;
      int            nbytes,
*************** lo_initialize(PGconn *conn)
*** 684,691 ****
--- 682,694 ----
      int            n;
      const char *query;
      const char *fname;
+     PQrowProcessor savedRowProcessor;
+     void       *savedRowProcessorParam;
      Oid            foid;

+     if (!conn)
+         return -1;
+
      /*
       * Allocate the structure to hold the functions OID's
       */
*************** lo_initialize(PGconn *conn)
*** 729,735 ****
--- 732,747 ----
              "or proname = 'loread' "
              "or proname = 'lowrite'";

+     /* Ensure the standard row processor is used to collect the result */
+     savedRowProcessor = conn->rowProcessor;
+     savedRowProcessorParam = conn->rowProcessorParam;
+     PQsetRowProcessor(conn, NULL, NULL);
+
      res = PQexec(conn, query);
+
+     conn->rowProcessor = savedRowProcessor;
+     conn->rowProcessorParam = savedRowProcessorParam;
+
      if (res == NULL)
      {
          free(lobjfuncs);
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index ce0eac3f712eab7f2974619f930494c941783874..b5e5519c416aa767921a5549f7947f80e4cf8250 100644
*** a/src/interfaces/libpq/fe-misc.c
--- b/src/interfaces/libpq/fe-misc.c
*************** pqGetnchar(char *s, size_t len, PGconn *
*** 219,224 ****
--- 219,250 ----
  }

  /*
+  * pqSkipnchar:
+  *    skip over len bytes in input buffer.
+  *
+  * Note: this is primarily useful for its debug output, which should
+  * be exactly the same as for pqGetnchar.  We assume the data in question
+  * will actually be used, but just isn't getting copied anywhere as yet.
+  */
+ int
+ pqSkipnchar(size_t len, PGconn *conn)
+ {
+     if (len > (size_t) (conn->inEnd - conn->inCursor))
+         return EOF;
+
+     if (conn->Pfdebug)
+     {
+         fprintf(conn->Pfdebug, "From backend (%lu)> ", (unsigned long) len);
+         fputnbytes(conn->Pfdebug, conn->inBuffer + conn->inCursor, len);
+         fprintf(conn->Pfdebug, "\n");
+     }
+
+     conn->inCursor += len;
+
+     return 0;
+ }
+
+ /*
   * pqPutnchar:
   *    write exactly len bytes to the current message
   */
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index a7c38993b8b69f23e7c33f1c41158b1ff9a7d0d8..ec1b2d71bd92bbc1314a410062ec6470d07460cf 100644
*** a/src/interfaces/libpq/fe-protocol2.c
--- b/src/interfaces/libpq/fe-protocol2.c
*************** static int    getNotify(PGconn *conn);
*** 49,59 ****
--- 49,67 ----
  PostgresPollingStatusType
  pqSetenvPoll(PGconn *conn)
  {
+     PostgresPollingStatusType result;
      PGresult   *res;
+     PQrowProcessor savedRowProcessor;
+     void       *savedRowProcessorParam;

      if (conn == NULL || conn->status == CONNECTION_BAD)
          return PGRES_POLLING_FAILED;

+     /* Ensure the standard row processor is used to collect any results */
+     savedRowProcessor = conn->rowProcessor;
+     savedRowProcessorParam = conn->rowProcessorParam;
+     PQsetRowProcessor(conn, NULL, NULL);
+
      /* Check whether there are any data for us */
      switch (conn->setenv_state)
      {
*************** pqSetenvPoll(PGconn *conn)
*** 69,75 ****
                  if (n < 0)
                      goto error_return;
                  if (n == 0)
!                     return PGRES_POLLING_READING;

                  break;
              }
--- 77,86 ----
                  if (n < 0)
                      goto error_return;
                  if (n == 0)
!                 {
!                     result = PGRES_POLLING_READING;
!                     goto normal_return;
!                 }

                  break;
              }
*************** pqSetenvPoll(PGconn *conn)
*** 83,89 ****

              /* Should we raise an error if called when not active? */
          case SETENV_STATE_IDLE:
!             return PGRES_POLLING_OK;

          default:
              printfPQExpBuffer(&conn->errorMessage,
--- 94,101 ----

              /* Should we raise an error if called when not active? */
          case SETENV_STATE_IDLE:
!             result = PGRES_POLLING_OK;
!             goto normal_return;

          default:
              printfPQExpBuffer(&conn->errorMessage,
*************** pqSetenvPoll(PGconn *conn)
*** 180,186 ****
              case SETENV_STATE_CLIENT_ENCODING_WAIT:
                  {
                      if (PQisBusy(conn))
!                         return PGRES_POLLING_READING;

                      res = PQgetResult(conn);

--- 192,201 ----
              case SETENV_STATE_CLIENT_ENCODING_WAIT:
                  {
                      if (PQisBusy(conn))
!                     {
!                         result = PGRES_POLLING_READING;
!                         goto normal_return;
!                     }

                      res = PQgetResult(conn);

*************** pqSetenvPoll(PGconn *conn)
*** 205,211 ****
              case SETENV_STATE_OPTION_WAIT:
                  {
                      if (PQisBusy(conn))
!                         return PGRES_POLLING_READING;

                      res = PQgetResult(conn);

--- 220,229 ----
              case SETENV_STATE_OPTION_WAIT:
                  {
                      if (PQisBusy(conn))
!                     {
!                         result = PGRES_POLLING_READING;
!                         goto normal_return;
!                     }

                      res = PQgetResult(conn);

*************** pqSetenvPoll(PGconn *conn)
*** 244,256 ****
                          goto error_return;

                      conn->setenv_state = SETENV_STATE_QUERY1_WAIT;
!                     return PGRES_POLLING_READING;
                  }

              case SETENV_STATE_QUERY1_WAIT:
                  {
                      if (PQisBusy(conn))
!                         return PGRES_POLLING_READING;

                      res = PQgetResult(conn);

--- 262,278 ----
                          goto error_return;

                      conn->setenv_state = SETENV_STATE_QUERY1_WAIT;
!                     result = PGRES_POLLING_READING;
!                     goto normal_return;
                  }

              case SETENV_STATE_QUERY1_WAIT:
                  {
                      if (PQisBusy(conn))
!                     {
!                         result = PGRES_POLLING_READING;
!                         goto normal_return;
!                     }

                      res = PQgetResult(conn);

*************** pqSetenvPoll(PGconn *conn)
*** 327,339 ****
                          goto error_return;

                      conn->setenv_state = SETENV_STATE_QUERY2_WAIT;
!                     return PGRES_POLLING_READING;
                  }

              case SETENV_STATE_QUERY2_WAIT:
                  {
                      if (PQisBusy(conn))
!                         return PGRES_POLLING_READING;

                      res = PQgetResult(conn);

--- 349,365 ----
                          goto error_return;

                      conn->setenv_state = SETENV_STATE_QUERY2_WAIT;
!                     result = PGRES_POLLING_READING;
!                     goto normal_return;
                  }

              case SETENV_STATE_QUERY2_WAIT:
                  {
                      if (PQisBusy(conn))
!                     {
!                         result = PGRES_POLLING_READING;
!                         goto normal_return;
!                     }

                      res = PQgetResult(conn);

*************** pqSetenvPoll(PGconn *conn)
*** 380,386 ****
                      {
                          /* Query finished, so we're done */
                          conn->setenv_state = SETENV_STATE_IDLE;
!                         return PGRES_POLLING_OK;
                      }
                      break;
                  }
--- 406,413 ----
                      {
                          /* Query finished, so we're done */
                          conn->setenv_state = SETENV_STATE_IDLE;
!                         result = PGRES_POLLING_OK;
!                         goto normal_return;
                      }
                      break;
                  }
*************** pqSetenvPoll(PGconn *conn)
*** 398,404 ****

  error_return:
      conn->setenv_state = SETENV_STATE_IDLE;
!     return PGRES_POLLING_FAILED;
  }


--- 425,436 ----

  error_return:
      conn->setenv_state = SETENV_STATE_IDLE;
!     result = PGRES_POLLING_FAILED;
!
! normal_return:
!     conn->rowProcessor = savedRowProcessor;
!     conn->rowProcessorParam = savedRowProcessorParam;
!     return result;
  }


*************** error_return:
*** 406,411 ****
--- 438,446 ----
   * parseInput: if appropriate, parse input data from backend
   * until input is exhausted or a stopping state is reached.
   * Note that this function will NOT attempt to read more data from the backend.
+  *
+  * Note: callers of parseInput must be prepared for a longjmp exit when we are
+  * in PGASYNC_BUSY state, since an external row processor might do that.
   */
  void
  pqParseInput2(PGconn *conn)
*************** pqParseInput2(PGconn *conn)
*** 549,554 ****
--- 584,591 ----
                          /* First 'T' in a query sequence */
                          if (getRowDescriptions(conn))
                              return;
+                         /* getRowDescriptions() moves inStart itself */
+                         continue;
                      }
                      else
                      {
*************** pqParseInput2(PGconn *conn)
*** 569,574 ****
--- 606,613 ----
                          /* Read another tuple of a normal query response */
                          if (getAnotherTuple(conn, FALSE))
                              return;
+                         /* getAnotherTuple() moves inStart itself */
+                         continue;
                      }
                      else
                      {
*************** pqParseInput2(PGconn *conn)
*** 585,590 ****
--- 624,631 ----
                          /* Read another tuple of a normal query response */
                          if (getAnotherTuple(conn, TRUE))
                              return;
+                         /* getAnotherTuple() moves inStart itself */
+                         continue;
                      }
                      else
                      {
*************** pqParseInput2(PGconn *conn)
*** 627,653 ****
  /*
   * parseInput subroutine to read a 'T' (row descriptions) message.
   * We build a PGresult structure containing the attribute data.
!  * Returns: 0 if completed message, EOF if not enough data yet.
   *
!  * Note that if we run out of data, we have to release the partially
!  * constructed PGresult, and rebuild it again next time.  Fortunately,
!  * that shouldn't happen often, since 'T' messages usually fit in a packet.
   */
  static int
  getRowDescriptions(PGconn *conn)
  {
!     PGresult   *result = NULL;
      int            nfields;
      int            i;

      result = PQmakeEmptyPGresult(conn, PGRES_TUPLES_OK);
      if (!result)
!         goto failure;

      /* parseInput already read the 'T' label. */
      /* the next two bytes are the number of fields    */
      if (pqGetInt(&(result->numAttributes), 2, conn))
!         goto failure;
      nfields = result->numAttributes;

      /* allocate space for the attribute descriptors */
--- 668,699 ----
  /*
   * parseInput subroutine to read a 'T' (row descriptions) message.
   * We build a PGresult structure containing the attribute data.
!  * Returns: 0 if completed message, EOF if error, suspend, or not enough
!  * data received yet.
   *
!  * Note that if we run out of data, we have to suspend and reprocess
!  * the message after more data is received.  Otherwise, conn->inStart
!  * must get advanced past the processed data.
   */
  static int
  getRowDescriptions(PGconn *conn)
  {
!     PGresult   *result;
      int            nfields;
+     const char *errmsg;
      int            i;

      result = PQmakeEmptyPGresult(conn, PGRES_TUPLES_OK);
      if (!result)
!     {
!         errmsg = NULL;            /* means "out of memory", see below */
!         goto advance_and_error;
!     }

      /* parseInput already read the 'T' label. */
      /* the next two bytes are the number of fields    */
      if (pqGetInt(&(result->numAttributes), 2, conn))
!         goto EOFexit;
      nfields = result->numAttributes;

      /* allocate space for the attribute descriptors */
*************** getRowDescriptions(PGconn *conn)
*** 656,662 ****
          result->attDescs = (PGresAttDesc *)
              pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE);
          if (!result->attDescs)
!             goto failure;
          MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc));
      }

--- 702,711 ----
          result->attDescs = (PGresAttDesc *)
              pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE);
          if (!result->attDescs)
!         {
!             errmsg = NULL;        /* means "out of memory", see below */
!             goto advance_and_error;
!         }
          MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc));
      }

*************** getRowDescriptions(PGconn *conn)
*** 671,677 ****
              pqGetInt(&typid, 4, conn) ||
              pqGetInt(&typlen, 2, conn) ||
              pqGetInt(&atttypmod, 4, conn))
!             goto failure;

          /*
           * Since pqGetInt treats 2-byte integers as unsigned, we need to
--- 720,726 ----
              pqGetInt(&typid, 4, conn) ||
              pqGetInt(&typlen, 2, conn) ||
              pqGetInt(&atttypmod, 4, conn))
!             goto EOFexit;

          /*
           * Since pqGetInt treats 2-byte integers as unsigned, we need to
*************** getRowDescriptions(PGconn *conn)
*** 682,688 ****
          result->attDescs[i].name = pqResultStrdup(result,
                                                    conn->workBuffer.data);
          if (!result->attDescs[i].name)
!             goto failure;
          result->attDescs[i].tableid = 0;
          result->attDescs[i].columnid = 0;
          result->attDescs[i].format = 0;
--- 731,740 ----
          result->attDescs[i].name = pqResultStrdup(result,
                                                    conn->workBuffer.data);
          if (!result->attDescs[i].name)
!         {
!             errmsg = NULL;        /* means "out of memory", see below */
!             goto advance_and_error;
!         }
          result->attDescs[i].tableid = 0;
          result->attDescs[i].columnid = 0;
          result->attDescs[i].format = 0;
*************** getRowDescriptions(PGconn *conn)
*** 693,722 ****

      /* Success! */
      conn->result = result;
-     return 0;

! failure:
!     if (result)
          PQclear(result);
      return EOF;
  }

  /*
   * parseInput subroutine to read a 'B' or 'D' (row data) message.
!  * We add another tuple to the existing PGresult structure.
!  * Returns: 0 if completed message, EOF if error or not enough data yet.
   *
   * Note that if we run out of data, we have to suspend and reprocess
!  * the message after more data is received.  We keep a partially constructed
!  * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
   */
  static int
  getAnotherTuple(PGconn *conn, bool binary)
  {
      PGresult   *result = conn->result;
      int            nfields = result->numAttributes;
!     PGresAttValue *tup;
!
      /* the backend sends us a bitmap of which attributes are null */
      char        std_bitmap[64]; /* used unless it doesn't fit */
      char       *bitmap = std_bitmap;
--- 745,839 ----

      /* Success! */
      conn->result = result;

!     /*
!      * Advance inStart to show that the "T" message has been processed.  We
!      * must do this before calling the row processor, in case it longjmps.
!      */
!     conn->inStart = conn->inCursor;
!
!     /* Give the row processor a chance to initialize for new result set */
!     errmsg = NULL;
!     switch ((*conn->rowProcessor) (result, NULL, &errmsg,
!                                    conn->rowProcessorParam))
!     {
!         case 1:
!             /* everything is good */
!             return 0;
!
!         case 0:
!             /* row processor requested suspension */
!             conn->asyncStatus = PGASYNC_SUSPENDED;
!             return EOF;
!
!         case -1:
!             /* error, report the errmsg below */
!             break;
!
!         default:
!             /* unrecognized return code */
!             errmsg = libpq_gettext("unrecognized return value from row processor");
!             break;
!     }
!     goto set_error_result;
!
! advance_and_error:
!     /*
!      * Discard the failed message.  Unfortunately we don't know for sure
!      * where the end is, so just throw away everything in the input buffer.
!      * This is not very desirable but it's the best we can do in protocol v2.
!      */
!     conn->inStart = conn->inEnd;
!
! set_error_result:
!
!     /*
!      * Replace partially constructed result with an error result. First
!      * discard the old result to try to win back some memory.
!      */
!     pqClearAsyncResult(conn);
!
!     /*
!      * If row processor didn't provide an error message, assume "out of
!      * memory" was meant.  The advantage of having this special case is that
!      * freeing the old result first greatly improves the odds that gettext()
!      * will succeed in providing a translation.
!      */
!     if (!errmsg)
!         errmsg = libpq_gettext("out of memory for query result");
!
!     printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
!
!     /*
!      * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
!      * do to recover...
!      */
!     conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
!     conn->asyncStatus = PGASYNC_READY;
!
! EOFexit:
!     if (result && result != conn->result)
          PQclear(result);
      return EOF;
  }

  /*
   * parseInput subroutine to read a 'B' or 'D' (row data) message.
!  * We fill rowbuf with column pointers and then call the row processor.
!  * Returns: 0 if completed message, EOF if error, suspend, or not enough
!  * data received yet.
   *
   * Note that if we run out of data, we have to suspend and reprocess
!  * the message after more data is received.  Otherwise, conn->inStart
!  * must get advanced past the processed data.
   */
  static int
  getAnotherTuple(PGconn *conn, bool binary)
  {
      PGresult   *result = conn->result;
      int            nfields = result->numAttributes;
!     const char *errmsg;
!     PGdataValue *rowbuf;
      /* the backend sends us a bitmap of which attributes are null */
      char        std_bitmap[64]; /* used unless it doesn't fit */
      char       *bitmap = std_bitmap;
*************** getAnotherTuple(PGconn *conn, bool binar
*** 727,754 ****
      int            bitcnt;            /* number of bits examined in current byte */
      int            vlen;            /* length of the current field value */

!     result->binary = binary;
!
!     /* Allocate tuple space if first time for this data message */
!     if (conn->curTuple == NULL)
      {
!         conn->curTuple = (PGresAttValue *)
!             pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
!         if (conn->curTuple == NULL)
!             goto outOfMemory;
!         MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
!
!         /*
!          * If it's binary, fix the column format indicators.  We assume the
!          * backend will consistently send either B or D, not a mix.
!          */
!         if (binary)
          {
!             for (i = 0; i < nfields; i++)
!                 result->attDescs[i].format = 1;
          }
      }
-     tup = conn->curTuple;

      /* Get the null-value bitmap */
      nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE;
--- 844,876 ----
      int            bitcnt;            /* number of bits examined in current byte */
      int            vlen;            /* length of the current field value */

!     /* Resize row buffer if needed */
!     rowbuf = conn->rowBuf;
!     if (nfields > conn->rowBufLen)
      {
!         rowbuf = (PGdataValue *) realloc(rowbuf,
!                                          nfields * sizeof(PGdataValue));
!         if (!rowbuf)
          {
!             errmsg = NULL;        /* means "out of memory", see below */
!             goto advance_and_error;
          }
+         conn->rowBuf = rowbuf;
+         conn->rowBufLen = nfields;
+     }
+
+     /* Save format specifier */
+     result->binary = binary;
+
+     /*
+      * If it's binary, fix the column format indicators.  We assume the
+      * backend will consistently send either B or D, not a mix.
+      */
+     if (binary)
+     {
+         for (i = 0; i < nfields; i++)
+             result->attDescs[i].format = 1;
      }

      /* Get the null-value bitmap */
      nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE;
*************** getAnotherTuple(PGconn *conn, bool binar
*** 757,763 ****
      {
          bitmap = (char *) malloc(nbytes);
          if (!bitmap)
!             goto outOfMemory;
      }

      if (pqGetnchar(bitmap, nbytes, conn))
--- 879,888 ----
      {
          bitmap = (char *) malloc(nbytes);
          if (!bitmap)
!         {
!             errmsg = NULL;        /* means "out of memory", see below */
!             goto advance_and_error;
!         }
      }

      if (pqGetnchar(bitmap, nbytes, conn))
*************** getAnotherTuple(PGconn *conn, bool binar
*** 770,804 ****

      for (i = 0; i < nfields; i++)
      {
          if (!(bmap & 0200))
!         {
!             /* if the field value is absent, make it a null string */
!             tup[i].value = result->null_field;
!             tup[i].len = NULL_LEN;
!         }
          else
          {
-             /* get the value length (the first four bytes are for length) */
-             if (pqGetInt(&vlen, 4, conn))
-                 goto EOFexit;
              if (!binary)
                  vlen = vlen - 4;
              if (vlen < 0)
                  vlen = 0;
-             if (tup[i].value == NULL)
-             {
-                 tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
-                 if (tup[i].value == NULL)
-                     goto outOfMemory;
-             }
-             tup[i].len = vlen;
-             /* read in the value */
-             if (vlen > 0)
-                 if (pqGetnchar((char *) (tup[i].value), vlen, conn))
-                     goto EOFexit;
-             /* we have to terminate this ourselves */
-             tup[i].value[vlen] = '\0';
          }
          /* advance the bitmap stuff */
          bitcnt++;
          if (bitcnt == BITS_PER_BYTE)
--- 895,928 ----

      for (i = 0; i < nfields; i++)
      {
+         /* get the value length */
          if (!(bmap & 0200))
!             vlen = NULL_LEN;
!         else if (pqGetInt(&vlen, 4, conn))
!             goto EOFexit;
          else
          {
              if (!binary)
                  vlen = vlen - 4;
              if (vlen < 0)
                  vlen = 0;
          }
+         rowbuf[i].len = vlen;
+
+         /*
+          * rowbuf[i].value always points to the next address in the data
+          * buffer even if the value is NULL.  This allows row processors to
+          * estimate data sizes more easily.
+          */
+         rowbuf[i].value = conn->inBuffer + conn->inCursor;
+
+         /* Skip over the data value */
+         if (vlen > 0)
+         {
+             if (pqSkipnchar(vlen, conn))
+                 goto EOFexit;
+         }
+
          /* advance the bitmap stuff */
          bitcnt++;
          if (bitcnt == BITS_PER_BYTE)
*************** getAnotherTuple(PGconn *conn, bool binar
*** 811,836 ****
              bmap <<= 1;
      }

!     /* Success!  Store the completed tuple in the result */
!     if (!pqAddTuple(result, tup))
!         goto outOfMemory;
!     /* and reset for a new message */
!     conn->curTuple = NULL;
!
      if (bitmap != std_bitmap)
          free(bitmap);
!     return 0;

! outOfMemory:
!     /* Replace partially constructed result with an error result */

      /*
!      * we do NOT use pqSaveErrorResult() here, because of the likelihood that
!      * there's not enough memory to concatenate messages...
       */
      pqClearAsyncResult(conn);
!     printfPQExpBuffer(&conn->errorMessage,
!                       libpq_gettext("out of memory for query result\n"));

      /*
       * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
--- 935,1002 ----
              bmap <<= 1;
      }

!     /* Release bitmap now if we allocated it */
      if (bitmap != std_bitmap)
          free(bitmap);
!     bitmap = NULL;

!     /*
!      * Advance inStart to show that the "D" message has been processed.  We
!      * must do this before calling the row processor, in case it longjmps.
!      */
!     conn->inStart = conn->inCursor;

+     /* Pass the completed row values to rowProcessor */
+     errmsg = NULL;
+     switch ((*conn->rowProcessor) (result, rowbuf, &errmsg,
+                                    conn->rowProcessorParam))
+     {
+         case 1:
+             /* everything is good */
+             return 0;
+
+         case 0:
+             /* row processor requested suspension */
+             conn->asyncStatus = PGASYNC_SUSPENDED;
+             return EOF;
+
+         case -1:
+             /* error, report the errmsg below */
+             break;
+
+         default:
+             /* unrecognized return code */
+             errmsg = libpq_gettext("unrecognized return value from row processor");
+             break;
+     }
+     goto set_error_result;
+
+ advance_and_error:
      /*
!      * Discard the failed message.  Unfortunately we don't know for sure
!      * where the end is, so just throw away everything in the input buffer.
!      * This is not very desirable but it's the best we can do in protocol v2.
!      */
!     conn->inStart = conn->inEnd;
!
! set_error_result:
!
!     /*
!      * Replace partially constructed result with an error result. First
!      * discard the old result to try to win back some memory.
       */
      pqClearAsyncResult(conn);
!
!     /*
!      * If row processor didn't provide an error message, assume "out of
!      * memory" was meant.  The advantage of having this special case is that
!      * freeing the old result first greatly improves the odds that gettext()
!      * will succeed in providing a translation.
!      */
!     if (!errmsg)
!         errmsg = libpq_gettext("out of memory for query result");
!
!     printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);

      /*
       * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
*************** outOfMemory:
*** 838,845 ****
       */
      conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
      conn->asyncStatus = PGASYNC_READY;
-     /* Discard the failed message --- good idea? */
-     conn->inStart = conn->inEnd;

  EOFexit:
      if (bitmap != NULL && bitmap != std_bitmap)
--- 1004,1009 ----
*************** pqGetline2(PGconn *conn, char *s, int ma
*** 1122,1128 ****
  {
      int            result = 1;        /* return value if buffer overflows */

!     if (conn->sock < 0)
      {
          *s = '\0';
          return EOF;
--- 1286,1293 ----
  {
      int            result = 1;        /* return value if buffer overflows */

!     if (conn->sock < 0 ||
!         conn->asyncStatus != PGASYNC_COPY_OUT)
      {
          *s = '\0';
          return EOF;
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 892dcbcb00f15e9a675543b874603058ebf012f1..ccf8a814280653e310d533b4cf3a929d17686da2 100644
*** a/src/interfaces/libpq/fe-protocol3.c
--- b/src/interfaces/libpq/fe-protocol3.c
***************
*** 44,50 ****


  static void handleSyncLoss(PGconn *conn, char id, int msgLength);
! static int    getRowDescriptions(PGconn *conn);
  static int    getParamDescriptions(PGconn *conn);
  static int    getAnotherTuple(PGconn *conn, int msgLength);
  static int    getParameterStatus(PGconn *conn);
--- 44,50 ----


  static void handleSyncLoss(PGconn *conn, char id, int msgLength);
! static int    getRowDescriptions(PGconn *conn, int msgLength);
  static int    getParamDescriptions(PGconn *conn);
  static int    getAnotherTuple(PGconn *conn, int msgLength);
  static int    getParameterStatus(PGconn *conn);
*************** static int build_startup_packet(const PG
*** 61,66 ****
--- 61,69 ----
   * parseInput: if appropriate, parse input data from backend
   * until input is exhausted or a stopping state is reached.
   * Note that this function will NOT attempt to read more data from the backend.
+  *
+  * Note: callers of parseInput must be prepared for a longjmp exit when we are
+  * in PGASYNC_BUSY state, since an external row processor might do that.
   */
  void
  pqParseInput3(PGconn *conn)
*************** pqParseInput3(PGconn *conn)
*** 269,283 ****
                          conn->queryclass == PGQUERY_DESCRIBE)
                      {
                          /* First 'T' in a query sequence */
!                         if (getRowDescriptions(conn))
                              return;
!
!                         /*
!                          * If we're doing a Describe, we're ready to pass the
!                          * result back to the client.
!                          */
!                         if (conn->queryclass == PGQUERY_DESCRIBE)
!                             conn->asyncStatus = PGASYNC_READY;
                      }
                      else
                      {
--- 272,281 ----
                          conn->queryclass == PGQUERY_DESCRIBE)
                      {
                          /* First 'T' in a query sequence */
!                         if (getRowDescriptions(conn, msgLength))
                              return;
!                         /* getRowDescriptions() moves inStart itself */
!                         continue;
                      }
                      else
                      {
*************** pqParseInput3(PGconn *conn)
*** 327,332 ****
--- 325,332 ----
                          /* Read another tuple of a normal query response */
                          if (getAnotherTuple(conn, msgLength))
                              return;
+                         /* getAnotherTuple() moves inStart itself */
+                         continue;
                      }
                      else if (conn->result != NULL &&
                               conn->result->resultStatus == PGRES_FATAL_ERROR)
*************** handleSyncLoss(PGconn *conn, char id, in
*** 443,459 ****
   * parseInput subroutine to read a 'T' (row descriptions) message.
   * We'll build a new PGresult structure (unless called for a Describe
   * command for a prepared statement) containing the attribute data.
!  * Returns: 0 if completed message, EOF if not enough data yet.
   *
!  * Note that if we run out of data, we have to release the partially
!  * constructed PGresult, and rebuild it again next time.  Fortunately,
!  * that shouldn't happen often, since 'T' messages usually fit in a packet.
   */
  static int
! getRowDescriptions(PGconn *conn)
  {
      PGresult   *result;
      int            nfields;
      int            i;

      /*
--- 443,461 ----
   * parseInput subroutine to read a 'T' (row descriptions) message.
   * We'll build a new PGresult structure (unless called for a Describe
   * command for a prepared statement) containing the attribute data.
!  * Returns: 0 if processed message successfully, EOF if suspended parsing.
!  * In either case, conn->inStart has been advanced past the message.
   *
!  * Note: the row processor could also choose to longjmp out of libpq,
!  * in which case the library's state must be the same as if we'd gotten
!  * a "suspend parsing" return and exited normally.
   */
  static int
! getRowDescriptions(PGconn *conn, int msgLength)
  {
      PGresult   *result;
      int            nfields;
+     const char *errmsg;
      int            i;

      /*
*************** getRowDescriptions(PGconn *conn)
*** 471,482 ****
      else
          result = PQmakeEmptyPGresult(conn, PGRES_TUPLES_OK);
      if (!result)
!         goto failure;

      /* parseInput already read the 'T' label and message length. */
      /* the next two bytes are the number of fields */
      if (pqGetInt(&(result->numAttributes), 2, conn))
!         goto failure;
      nfields = result->numAttributes;

      /* allocate space for the attribute descriptors */
--- 473,491 ----
      else
          result = PQmakeEmptyPGresult(conn, PGRES_TUPLES_OK);
      if (!result)
!     {
!         errmsg = NULL;            /* means "out of memory", see below */
!         goto advance_and_error;
!     }

      /* parseInput already read the 'T' label and message length. */
      /* the next two bytes are the number of fields */
      if (pqGetInt(&(result->numAttributes), 2, conn))
!     {
!         /* We should not run out of data here, so complain */
!         errmsg = libpq_gettext("insufficient data in \"T\" message");
!         goto advance_and_error;
!     }
      nfields = result->numAttributes;

      /* allocate space for the attribute descriptors */
*************** getRowDescriptions(PGconn *conn)
*** 485,491 ****
          result->attDescs = (PGresAttDesc *)
              pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE);
          if (!result->attDescs)
!             goto failure;
          MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc));
      }

--- 494,503 ----
          result->attDescs = (PGresAttDesc *)
              pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE);
          if (!result->attDescs)
!         {
!             errmsg = NULL;        /* means "out of memory", see below */
!             goto advance_and_error;
!         }
          MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc));
      }

*************** getRowDescriptions(PGconn *conn)
*** 510,516 ****
              pqGetInt(&atttypmod, 4, conn) ||
              pqGetInt(&format, 2, conn))
          {
!             goto failure;
          }

          /*
--- 522,530 ----
              pqGetInt(&atttypmod, 4, conn) ||
              pqGetInt(&format, 2, conn))
          {
!             /* We should not run out of data here, so complain */
!             errmsg = libpq_gettext("insufficient data in \"T\" message");
!             goto advance_and_error;
          }

          /*
*************** getRowDescriptions(PGconn *conn)
*** 524,530 ****
          result->attDescs[i].name = pqResultStrdup(result,
                                                    conn->workBuffer.data);
          if (!result->attDescs[i].name)
!             goto failure;
          result->attDescs[i].tableid = tableid;
          result->attDescs[i].columnid = columnid;
          result->attDescs[i].format = format;
--- 538,547 ----
          result->attDescs[i].name = pqResultStrdup(result,
                                                    conn->workBuffer.data);
          if (!result->attDescs[i].name)
!         {
!             errmsg = NULL;        /* means "out of memory", see below */
!             goto advance_and_error;
!         }
          result->attDescs[i].tableid = tableid;
          result->attDescs[i].columnid = columnid;
          result->attDescs[i].format = format;
*************** getRowDescriptions(PGconn *conn)
*** 536,559 ****
              result->binary = 0;
      }

      /* Success! */
      conn->result = result;
-     return 0;

! failure:

      /*
!      * Discard incomplete result, unless it's from getParamDescriptions.
!      *
!      * Note that if we hit a bufferload boundary while handling the
!      * describe-statement case, we'll forget any PGresult space we just
!      * allocated, and then reallocate it on next try.  This will bloat the
!      * PGresult a little bit but the space will be freed at PQclear, so it
!      * doesn't seem worth trying to be smarter.
       */
!     if (result != conn->result)
          PQclear(result);
!     return EOF;
  }

  /*
--- 553,641 ----
              result->binary = 0;
      }

+     /* Sanity check that we absorbed all the data */
+     if (conn->inCursor != conn->inStart + 5 + msgLength)
+     {
+         errmsg = libpq_gettext("extraneous data in \"T\" message");
+         goto advance_and_error;
+     }
+
      /* Success! */
      conn->result = result;

!     /*
!      * Advance inStart to show that the "T" message has been processed.  We
!      * must do this before calling the row processor, in case it longjmps.
!      */
!     conn->inStart = conn->inCursor;

      /*
!      * If we're doing a Describe, we're done, and ready to pass the result
!      * back to the client.
       */
!     if (conn->queryclass == PGQUERY_DESCRIBE)
!     {
!         conn->asyncStatus = PGASYNC_READY;
!         return 0;
!     }
!
!     /* Give the row processor a chance to initialize for new result set */
!     errmsg = NULL;
!     switch ((*conn->rowProcessor) (result, NULL, &errmsg,
!                                    conn->rowProcessorParam))
!     {
!         case 1:
!             /* everything is good */
!             return 0;
!
!         case 0:
!             /* row processor requested suspension */
!             conn->asyncStatus = PGASYNC_SUSPENDED;
!             return EOF;
!
!         case -1:
!             /* error, report the errmsg below */
!             break;
!
!         default:
!             /* unrecognized return code */
!             errmsg = libpq_gettext("unrecognized return value from row processor");
!             break;
!     }
!     goto set_error_result;
!
! advance_and_error:
!     /* Discard unsaved result, if any */
!     if (result && result != conn->result)
          PQclear(result);
!
!     /* Discard the failed message by pretending we read it */
!     conn->inStart += 5 + msgLength;
!
! set_error_result:
!
!     /*
!      * Replace partially constructed result with an error result. First
!      * discard the old result to try to win back some memory.
!      */
!     pqClearAsyncResult(conn);
!
!     /*
!      * If row processor didn't provide an error message, assume "out of
!      * memory" was meant.
!      */
!     if (!errmsg)
!         errmsg = libpq_gettext("out of memory for query result");
!
!     printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
!     pqSaveErrorResult(conn);
!
!     /*
!      * Return zero to allow input parsing to continue.  Subsequent "D"
!      * messages will be ignored until we get to end of data, since an error
!      * result is already set up.
!      */
!     return 0;
  }

  /*
*************** failure:
*** 613,659 ****

  /*
   * parseInput subroutine to read a 'D' (row data) message.
!  * We add another tuple to the existing PGresult structure.
!  * Returns: 0 if completed message, EOF if error or not enough data yet.
   *
!  * Note that if we run out of data, we have to suspend and reprocess
!  * the message after more data is received.  We keep a partially constructed
!  * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
   */
  static int
  getAnotherTuple(PGconn *conn, int msgLength)
  {
      PGresult   *result = conn->result;
      int            nfields = result->numAttributes;
!     PGresAttValue *tup;
      int            tupnfields;        /* # fields from tuple */
      int            vlen;            /* length of the current field value */
      int            i;

-     /* Allocate tuple space if first time for this data message */
-     if (conn->curTuple == NULL)
-     {
-         conn->curTuple = (PGresAttValue *)
-             pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
-         if (conn->curTuple == NULL)
-             goto outOfMemory;
-         MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
-     }
-     tup = conn->curTuple;
-
      /* Get the field count and make sure it's what we expect */
      if (pqGetInt(&tupnfields, 2, conn))
!         return EOF;

      if (tupnfields != nfields)
      {
!         /* Replace partially constructed result with an error result */
!         printfPQExpBuffer(&conn->errorMessage,
!                  libpq_gettext("unexpected field count in \"D\" message\n"));
!         pqSaveErrorResult(conn);
!         /* Discard the failed message by pretending we read it */
!         conn->inCursor = conn->inStart + 5 + msgLength;
!         return 0;
      }

      /* Scan the fields */
--- 695,746 ----

  /*
   * parseInput subroutine to read a 'D' (row data) message.
!  * We fill rowbuf with column pointers and then call the row processor.
!  * Returns: 0 if processed message successfully, EOF if suspended parsing.
!  * In either case, conn->inStart has been advanced past the message.
   *
!  * Note: the row processor could also choose to longjmp out of libpq,
!  * in which case the library's state must be the same as if we'd gotten
!  * a "suspend parsing" return and exited normally.
   */
  static int
  getAnotherTuple(PGconn *conn, int msgLength)
  {
      PGresult   *result = conn->result;
      int            nfields = result->numAttributes;
!     const char *errmsg;
!     PGdataValue *rowbuf;
      int            tupnfields;        /* # fields from tuple */
      int            vlen;            /* length of the current field value */
      int            i;

      /* Get the field count and make sure it's what we expect */
      if (pqGetInt(&tupnfields, 2, conn))
!     {
!         /* We should not run out of data here, so complain */
!         errmsg = libpq_gettext("insufficient data in \"D\" message");
!         goto advance_and_error;
!     }

      if (tupnfields != nfields)
      {
!         errmsg = libpq_gettext("unexpected field count in \"D\" message");
!         goto advance_and_error;
!     }
!
!     /* Resize row buffer if needed */
!     rowbuf = conn->rowBuf;
!     if (nfields > conn->rowBufLen)
!     {
!         rowbuf = (PGdataValue *) realloc(rowbuf,
!                                          nfields * sizeof(PGdataValue));
!         if (!rowbuf)
!         {
!             errmsg = NULL;        /* means "out of memory", see below */
!             goto advance_and_error;
!         }
!         conn->rowBuf = rowbuf;
!         conn->rowBufLen = nfields;
      }

      /* Scan the fields */
*************** getAnotherTuple(PGconn *conn, int msgLen
*** 661,714 ****
      {
          /* get the value length */
          if (pqGetInt(&vlen, 4, conn))
-             return EOF;
-         if (vlen == -1)
          {
!             /* null field */
!             tup[i].value = result->null_field;
!             tup[i].len = NULL_LEN;
!             continue;
          }
!         if (vlen < 0)
!             vlen = 0;
!         if (tup[i].value == NULL)
!         {
!             bool        isbinary = (result->attDescs[i].format != 0);

!             tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
!             if (tup[i].value == NULL)
!                 goto outOfMemory;
!         }
!         tup[i].len = vlen;
!         /* read in the value */
          if (vlen > 0)
!             if (pqGetnchar((char *) (tup[i].value), vlen, conn))
!                 return EOF;
!         /* we have to terminate this ourselves */
!         tup[i].value[vlen] = '\0';
      }

!     /* Success!  Store the completed tuple in the result */
!     if (!pqAddTuple(result, tup))
!         goto outOfMemory;
!     /* and reset for a new message */
!     conn->curTuple = NULL;

!     return 0;

! outOfMemory:

      /*
       * Replace partially constructed result with an error result. First
       * discard the old result to try to win back some memory.
       */
      pqClearAsyncResult(conn);
!     printfPQExpBuffer(&conn->errorMessage,
!                       libpq_gettext("out of memory for query result\n"));
      pqSaveErrorResult(conn);

!     /* Discard the failed message by pretending we read it */
!     conn->inCursor = conn->inStart + 5 + msgLength;
      return 0;
  }

--- 748,846 ----
      {
          /* get the value length */
          if (pqGetInt(&vlen, 4, conn))
          {
!             /* We should not run out of data here, so complain */
!             errmsg = libpq_gettext("insufficient data in \"D\" message");
!             goto advance_and_error;
          }
!         rowbuf[i].len = vlen;

!         /*
!          * rowbuf[i].value always points to the next address in the data
!          * buffer even if the value is NULL.  This allows row processors to
!          * estimate data sizes more easily.
!          */
!         rowbuf[i].value = conn->inBuffer + conn->inCursor;
!
!         /* Skip over the data value */
          if (vlen > 0)
!         {
!             if (pqSkipnchar(vlen, conn))
!             {
!                 /* We should not run out of data here, so complain */
!                 errmsg = libpq_gettext("insufficient data in \"D\" message");
!                 goto advance_and_error;
!             }
!         }
      }

!     /* Sanity check that we absorbed all the data */
!     if (conn->inCursor != conn->inStart + 5 + msgLength)
!     {
!         errmsg = libpq_gettext("extraneous data in \"D\" message");
!         goto advance_and_error;
!     }

!     /*
!      * Advance inStart to show that the "D" message has been processed.  We
!      * must do this before calling the row processor, in case it longjmps.
!      */
!     conn->inStart = conn->inCursor;

!     /* Pass the completed row values to rowProcessor */
!     errmsg = NULL;
!     switch ((*conn->rowProcessor) (result, rowbuf, &errmsg,
!                                    conn->rowProcessorParam))
!     {
!         case 1:
!             /* everything is good */
!             return 0;
!
!         case 0:
!             /* row processor requested suspension */
!             conn->asyncStatus = PGASYNC_SUSPENDED;
!             return EOF;
!
!         case -1:
!             /* error, report the errmsg below */
!             break;
!
!         default:
!             /* unrecognized return code */
!             errmsg = libpq_gettext("unrecognized return value from row processor");
!             break;
!     }
!     goto set_error_result;
!
! advance_and_error:
!     /* Discard the failed message by pretending we read it */
!     conn->inStart += 5 + msgLength;
!
! set_error_result:

      /*
       * Replace partially constructed result with an error result. First
       * discard the old result to try to win back some memory.
       */
      pqClearAsyncResult(conn);
!
!     /*
!      * If row processor didn't provide an error message, assume "out of
!      * memory" was meant.  The advantage of having this special case is that
!      * freeing the old result first greatly improves the odds that gettext()
!      * will succeed in providing a translation.
!      */
!     if (!errmsg)
!         errmsg = libpq_gettext("out of memory for query result");
!
!     printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
      pqSaveErrorResult(conn);

!     /*
!      * Return zero to allow input parsing to continue.  Subsequent "D"
!      * messages will be ignored until we get to end of data, since an error
!      * result is already set up.
!      */
      return 0;
  }

diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index ef26ab9e0d8b3c510c75f16ec79ace35ad7f3cd0..2800be6e97e9383176ba1f837abd32e04ca5a95f 100644
*** a/src/interfaces/libpq/libpq-fe.h
--- b/src/interfaces/libpq/libpq-fe.h
*************** extern        "C"
*** 38,50 ****

  /* Application-visible enum types */

  typedef enum
  {
-     /*
-      * Although it is okay to add to this list, values which become unused
-      * should never be removed, nor should constants be redefined - that would
-      * break compatibility with existing code.
-      */
      CONNECTION_OK,
      CONNECTION_BAD,
      /* Non-blocking mode only below here */
--- 38,51 ----

  /* Application-visible enum types */

+ /*
+  * Although it is okay to add to these lists, values which become unused
+  * should never be removed, nor should constants be redefined - that would
+  * break compatibility with existing code.
+  */
+
  typedef enum
  {
      CONNECTION_OK,
      CONNECTION_BAD,
      /* Non-blocking mode only below here */
*************** typedef enum
*** 89,95 ****
                                   * backend */
      PGRES_NONFATAL_ERROR,        /* notice or warning message */
      PGRES_FATAL_ERROR,            /* query failed */
!     PGRES_COPY_BOTH                /* Copy In/Out data transfer in progress */
  } ExecStatusType;

  typedef enum
--- 90,97 ----
                                   * backend */
      PGRES_NONFATAL_ERROR,        /* notice or warning message */
      PGRES_FATAL_ERROR,            /* query failed */
!     PGRES_COPY_BOTH,            /* Copy In/Out data transfer in progress */
!     PGRES_SUSPENDED                /* row processor requested suspension */
  } ExecStatusType;

  typedef enum
*************** typedef struct pg_conn PGconn;
*** 128,133 ****
--- 130,146 ----
   */
  typedef struct pg_result PGresult;

+ /* PGdataValue represents a data field value being passed to a row processor.
+  * It could be either text or binary data; text data is not zero-terminated.
+  * A SQL NULL is represented by len < 0; then value is still valid but there
+  * are no data bytes there.
+  */
+ typedef struct pgDataValue
+ {
+     int            len;            /* data length in bytes, or <0 if NULL */
+     const char *value;            /* data value, without zero-termination */
+ } PGdataValue;
+
  /* PGcancel encapsulates the information needed to cancel a running
   * query on an existing connection.
   * The contents of this struct are not supposed to be known to applications.
*************** typedef struct pgNotify
*** 149,154 ****
--- 162,171 ----
      struct pgNotify *next;        /* list link */
  } PGnotify;

+ /* Function type for row-processor callback */
+ typedef int (*PQrowProcessor) (PGresult *res, const PGdataValue *columns,
+                                const char **errmsg, void *param);
+
  /* Function types for notice-handling callbacks */
  typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);
  typedef void (*PQnoticeProcessor) (void *arg, const char *message);
*************** extern int PQsendQueryPrepared(PGconn *c
*** 388,398 ****
--- 405,420 ----
                      const int *paramFormats,
                      int resultFormat);
  extern PGresult *PQgetResult(PGconn *conn);
+ extern PGresult *PQskipResult(PGconn *conn);

  /* Routines for managing an asynchronous query */
  extern int    PQisBusy(PGconn *conn);
  extern int    PQconsumeInput(PGconn *conn);

+ /* Override default per-row processing */
+ extern void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param);
+ extern PQrowProcessor PQgetRowProcessor(const PGconn *conn, void **param);
+
  /* LISTEN/NOTIFY support */
  extern PGnotify *PQnotifies(PGconn *conn);

diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 2103af88329894d5cecee7f4de0214ab8613bbe4..b44ca29f65e2eedbe3e994608250a1489ca77ddf 100644
*** a/src/interfaces/libpq/libpq-int.h
--- b/src/interfaces/libpq/libpq-int.h
*************** typedef enum
*** 217,222 ****
--- 217,223 ----
      PGASYNC_IDLE,                /* nothing's happening, dude */
      PGASYNC_BUSY,                /* query in progress */
      PGASYNC_READY,                /* result ready for PQgetResult */
+     PGASYNC_SUSPENDED,            /* row processor requested suspension */
      PGASYNC_COPY_IN,            /* Copy In data transfer in progress */
      PGASYNC_COPY_OUT,            /* Copy Out data transfer in progress */
      PGASYNC_COPY_BOTH            /* Copy In/Out data transfer in progress */
*************** struct pg_conn
*** 324,329 ****
--- 325,334 ----
      /* Optional file to write trace info to */
      FILE       *Pfdebug;

+     /* Callback procedure for per-row processing */
+     PQrowProcessor rowProcessor;    /* function pointer */
+     void       *rowProcessorParam;    /* passthrough argument */
+
      /* Callback procedures for notice message processing */
      PGNoticeHooks noticeHooks;

*************** struct pg_conn
*** 396,404 ****
                                   * msg has no length word */
      int            outMsgEnd;        /* offset to msg end (so far) */

      /* Status for asynchronous result construction */
      PGresult   *result;            /* result being constructed */
!     PGresAttValue *curTuple;    /* tuple currently being read */

  #ifdef USE_SSL
      bool        allow_ssl_try;    /* Allowed to try SSL negotiation */
--- 401,414 ----
                                   * msg has no length word */
      int            outMsgEnd;        /* offset to msg end (so far) */

+     /* Row processor interface workspace */
+     PGdataValue *rowBuf;        /* array for passing values to rowProcessor */
+     int            rowBufLen;        /* number of entries allocated in rowBuf */
+
      /* Status for asynchronous result construction */
      PGresult   *result;            /* result being constructed */
!
!     /* Assorted state for SSL, GSS, etc */

  #ifdef USE_SSL
      bool        allow_ssl_try;    /* Allowed to try SSL negotiation */
*************** struct pg_conn
*** 435,441 ****
                                   * connection */
  #endif

-
      /* Buffer for current error message */
      PQExpBufferData errorMessage;        /* expansible string */

--- 445,450 ----
*************** extern void
*** 505,511 ****
  pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
  /* This lets gcc check the format string for consistency. */
  __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
- extern int    pqAddTuple(PGresult *res, PGresAttValue *tup);
  extern void pqSaveMessageField(PGresult *res, char code,
                     const char *value);
  extern void pqSaveParameterStatus(PGconn *conn, const char *name,
--- 514,519 ----
*************** extern int    pqGets(PQExpBuffer buf, PGcon
*** 558,563 ****
--- 566,572 ----
  extern int    pqGets_append(PQExpBuffer buf, PGconn *conn);
  extern int    pqPuts(const char *s, PGconn *conn);
  extern int    pqGetnchar(char *s, size_t len, PGconn *conn);
+ extern int    pqSkipnchar(size_t len, PGconn *conn);
  extern int    pqPutnchar(const char *s, size_t len, PGconn *conn);
  extern int    pqGetInt(int *result, size_t bytes, PGconn *conn);
  extern int    pqPutInt(int value, size_t bytes, PGconn *conn);

pgsql-hackers by date:

Previous
From: Robert Haas
Date:
Subject: invalid search_path complaints
Next
From: Tom Lane
Date:
Subject: Re: patch for parallel pg_dump