Thread: Fix for non-blocking connections in libpq

Fix for non-blocking connections in libpq

From
Bernhard Herzog
Date:
Here's a patch against 7.1.3 that fixes a problem with sending larger
queries over non-blocking connections with libpq. "Larger" here
basically means that it doesn't fit into the output buffer.

The basic strategy is to fix pqFlush and pqPutBytes.

The problem with pqFlush as it stands now is that it returns EOF both
when an error occurs and when not all data could be sent. The latter
case is clearly not an error for a non-blocking connection, but the
caller can't easily distinguish it from a real error.

The first part of the fix is therefore to fix pqFlush. It is renamed to
pqSendSome, which differs from the old pqFlush only in its return
values, so that the caller can make the above distinction. A new pqFlush
is implemented in terms of pqSendSome and behaves exactly like the old
pqFlush.
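
As a rough illustration of that split, here is a minimal standalone
sketch. It does not use libpq; the names send_status, flush_compat and
describe are hypothetical and exist only for this example. It shows how
a three-way result can be folded back into the old 0/EOF convention.

```c
#include <stdio.h>   /* EOF */

/* Hypothetical names for the three outcomes a pqSendSome-style call reports. */
enum send_status {
    SEND_ERROR   = -1,  /* a real error occurred */
    SEND_DONE    = 0,   /* the output buffer was emptied */
    SEND_PENDING = 1    /* the socket would block; data is still queued */
};

/* Fold the three-way result into the old two-way convention (0 or EOF). */
static int
flush_compat(int send_result)
{
    return send_result != SEND_DONE ? EOF : 0;
}

/* What a non-blocking caller can decide once the cases are separate. */
static const char *
describe(int send_result)
{
    if (send_result < 0)
        return "error";
    if (send_result > 0)
        return "retry when writable";
    return "done";
}
```

With the old convention both SEND_ERROR and SEND_PENDING come back as
EOF, which is exactly the ambiguity the rename is meant to remove.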

The second part of the fix modifies pqPutBytes to use pqSendSome instead
of pqFlush. It either sends all the data or, if not all data can be sent
on a non-blocking connection, at least puts all of it into the output
buffer, enlarging the buffer if necessary. The callers of pqPutBytes
don't have to be changed because, from their point of view, pqPutBytes
behaves as before: it either succeeds in queueing all output data or
fails with an error.
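
The buffer-growing step can be sketched on its own, outside libpq. The
OutBuf struct and queue_all function below are hypothetical stand-ins
for the connection's output buffer fields, not the patch's actual code,
and like the patch the sketch grows the buffer to exactly the size
needed.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical miniature of a connection's output buffer. */
typedef struct {
    char   *buf;
    size_t  size;   /* allocated bytes */
    size_t  count;  /* bytes currently queued */
} OutBuf;

/*
 * Queue nbytes from s, growing the buffer when they do not fit.
 * Returns 0 on success, -1 if memory could not be allocated.
 */
static int
queue_all(OutBuf *ob, const char *s, size_t nbytes)
{
    size_t needed = ob->count + nbytes;

    if (nbytes == 0)
        return 0;

    if (needed > ob->size)
    {
        char *newbuf = realloc(ob->buf, needed);

        if (newbuf == NULL)
            return -1;          /* the old buffer is still valid */
        ob->buf = newbuf;
        ob->size = needed;
    }

    /* There is now room for everything; append it. */
    memcpy(ob->buf + ob->count, s, nbytes);
    ob->count += nbytes;
    return 0;
}
```

Assigning the result of realloc to a temporary first means the original
buffer is not leaked or lost when the allocation fails.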

I've also added a new API function PQsendSome which, analogously to
PQflush, just calls pqSendSome. Programs using non-blocking queries
should use this new function. The main difference is that this function
has to be called repeatedly (waiting with select() in between) until
all data has been written.

AFAICT, the code in CVS HEAD hasn't changed with respect to non-blocking
queries and this fix should work there, too, but I haven't tested that
yet.


   Bernhard

diff -rc -x *.o postgresql-7.1.3-orig/src/interfaces/libpq/fe-exec.c postgresql-7.1.3/src/interfaces/libpq/fe-exec.c
*** postgresql-7.1.3-orig/src/interfaces/libpq/fe-exec.c    Sat Feb 10 03:31:30 2001
--- postgresql-7.1.3/src/interfaces/libpq/fe-exec.c    Tue Jan 22 12:12:58 2002
***************
*** 2214,2216 ****
--- 2214,2224 ----

      return (pqFlush(conn));
  }
+
+ /* try to force data out, really only useful for non-blocking users.
+  * This implementation actually works for non-blocking connections */
+ int
+ PQsendSome(PGconn *conn)
+ {
+     return pqSendSome(conn);
+ }
diff -rc -x *.o postgresql-7.1.3-orig/src/interfaces/libpq/fe-misc.c postgresql-7.1.3/src/interfaces/libpq/fe-misc.c
*** postgresql-7.1.3-orig/src/interfaces/libpq/fe-misc.c    Sun Apr  1 01:13:30 2001
--- postgresql-7.1.3/src/interfaces/libpq/fe-misc.c    Tue Jan 22 12:07:25 2002
***************
*** 90,144 ****
  static int
  pqPutBytes(const char *s, size_t nbytes, PGconn *conn)
  {
!     size_t        avail = Max(conn->outBufSize - conn->outCount, 0);
!
!     /*
!      * if we are non-blocking and the send queue is too full to buffer
!      * this request then try to flush some and return an error
       */
-     if (pqIsnonblocking(conn) && nbytes > avail && pqFlush(conn))
-     {

!         /*
!          * even if the flush failed we may still have written some data,
!          * recalculate the size of the send-queue relative to the amount
!          * we have to send, we may be able to queue it afterall even
!          * though it's not sent to the database it's ok, any routines that
!          * check the data coming from the database better call pqFlush()
!          * anyway.
!          */
!         if (nbytes > Max(conn->outBufSize - conn->outCount, 0))
!         {
!             printfPQExpBuffer(&conn->errorMessage,
!                            "pqPutBytes --  pqFlush couldn't flush enough"
!                          " data: space available: %d, space needed %d\n",
!                       Max(conn->outBufSize - conn->outCount, 0), nbytes);
!             return EOF;
!         }
!         /* fixup avail for while loop */
          avail = Max(conn->outBufSize - conn->outCount, 0);
!     }

!     /*
!      * is the amount of data to be sent is larger than the size of the
!      * output buffer then we must flush it to make more room.
!      *
!      * the code above will make sure the loop conditional is never true for
!      * non-blocking connections
!      */
!     while (nbytes > avail)
!     {
!         memcpy(conn->outBuffer + conn->outCount, s, avail);
!         conn->outCount += avail;
!         s += avail;
!         nbytes -= avail;
!         if (pqFlush(conn))
!             return EOF;
!         avail = conn->outBufSize;
!     }

!     memcpy(conn->outBuffer + conn->outCount, s, nbytes);
!     conn->outCount += nbytes;

      return 0;
  }
--- 90,163 ----
  static int
  pqPutBytes(const char *s, size_t nbytes, PGconn *conn)
  {
!     /* Strategy to handle blocking and non-blocking connections: Fill
!      * the output buffer and flush it repeatedly until either all data
!      * has been sent or is at least queued in the buffer.
!      *
!      * For non-blocking connections, grow the buffer if not all data
!      * fits into it and the buffer can't be sent because the socket
!      * would block.
       */

!     while (nbytes)
!     {
!         size_t avail, remaining;
!
!         /* fill the output buffer */
          avail = Max(conn->outBufSize - conn->outCount, 0);
!         remaining = Min(avail, nbytes);
!         memcpy(conn->outBuffer + conn->outCount, s, remaining);
!         conn->outCount += remaining;
!         s += remaining;
!         nbytes -= remaining;
!
!         /* if the data didn't fit completely into the buffer, try to
!          * flush the buffer */
!         if (nbytes)
!         {
!             int send_result = pqSendSome(conn);

!             /* if there were errors, report them */
!             if (send_result < 0)
!                 return EOF;

!             /* if not all data could be sent, increase the output
!              * buffer, put the rest of s into it and return
!              * successfully. This case will only happen in a
!              * non-blocking connection
!              */
!             if (send_result > 0)
!             {
!                 /* try to grow the buffer.
!                  * FIXME: The new size could be chosen more
!                  * intelligently.
!                  */
!                 size_t buflen = conn->outCount + nbytes;
!                 if (buflen > conn->outBufSize)
!                 {
!                     char * newbuf = realloc(conn->outBuffer, buflen);
!                     if (!newbuf)
!                     {
!                         /* realloc failed. Probably out of memory */
!                         printfPQExpBuffer(&conn->errorMessage,
!                                 "cannot allocate memory for output buffer\n");
!                         return EOF;
!                     }
!                     conn->outBuffer = newbuf;
!                     conn->outBufSize = buflen;
!                 }
!                 /* put the data into it */
!                 memcpy(conn->outBuffer + conn->outCount, s, nbytes);
!                 conn->outCount += nbytes;
!
!                 /* report success. */
!                 return 0;
!             }
!         }
!
!         /* pqSendSome was able to send all data. Continue with the next
!          * chunk of s. */
!     } /* while */

      return 0;
  }
***************
*** 575,584 ****
  }

  /* --------------------------------------------------------------------- */
! /* pqFlush: send any data waiting in the output buffer
   */
  int
! pqFlush(PGconn *conn)
  {
      char       *ptr = conn->outBuffer;
      int            len = conn->outCount;
--- 594,615 ----
  }

  /* --------------------------------------------------------------------- */
! /* pqSendSome: send any data waiting in the output buffer and return 0
!  * if all data was sent, -1 if an error occurred or 1 if not all data
!  * could be written because the socket would have blocked.
!  *
!  * For a blocking connection all data will be sent unless an error
!  * occurs. 1 will only be returned if the connection is non-blocking.
!  *
!  * Internally, the case of data remaining in the buffer after pqSendSome
!  * could be determined by looking at outCount, but this function also
!  * serves as the implementation of the new API function PQsendSome.
!  *
!  * FIXME: perhaps it would be more useful to return the number of bytes
!  * remaining?
   */
  int
! pqSendSome(PGconn *conn)
  {
      char       *ptr = conn->outBuffer;
      int            len = conn->outCount;
***************
*** 586,593 ****
      if (conn->sock < 0)
      {
          printfPQExpBuffer(&conn->errorMessage,
!                           "pqFlush() -- connection not open\n");
!         return EOF;
      }

      /*
--- 617,624 ----
      if (conn->sock < 0)
      {
          printfPQExpBuffer(&conn->errorMessage,
!                           "pqSendSome() -- connection not open\n");
!         return -1;
      }

      /*
***************
*** 645,651 ****
                  case ECONNRESET:
  #endif
                      printfPQExpBuffer(&conn->errorMessage,
!                                       "pqFlush() -- backend closed the channel unexpectedly.\n"
                                        "\tThis probably means the backend terminated abnormally"
                             " before or while processing the request.\n");

--- 676,682 ----
                  case ECONNRESET:
  #endif
                      printfPQExpBuffer(&conn->errorMessage,
!                                       "pqSendSome() -- backend closed the channel unexpectedly.\n"
                                        "\tThis probably means the backend terminated abnormally"
                             " before or while processing the request.\n");

***************
*** 657,670 ****
                       * the socket open until pqReadData finds no more data
                       * can be read.
                       */
!                     return EOF;

                  default:
                      printfPQExpBuffer(&conn->errorMessage,
!                       "pqFlush() --  couldn't send data: errno=%d\n%s\n",
                                        errno, strerror(errno));
                      /* We don't assume it's a fatal error... */
!                     return EOF;
              }
          }
          else
--- 688,701 ----
                       * the socket open until pqReadData finds no more data
                       * can be read.
                       */
!                     return -1;

                  default:
                      printfPQExpBuffer(&conn->errorMessage,
!                       "pqSendSome() --  couldn't send data: errno=%d\n%s\n",
                                        errno, strerror(errno));
                      /* We don't assume it's a fatal error... */
!                     return -1;
              }
          }
          else
***************
*** 679,685 ****

              /*
               * if the socket is in non-blocking mode we may need to abort
!              * here
               */
  #ifdef USE_SSL
              /* can't do anything for our SSL users yet */
--- 710,716 ----

              /*
               * if the socket is in non-blocking mode we may need to abort
!              * here and return 1 to indicate that data is still pending.
               */
  #ifdef USE_SSL
              /* can't do anything for our SSL users yet */
***************
*** 691,704 ****
                      /* shift the contents of the buffer */
                      memmove(conn->outBuffer, ptr, len);
                      conn->outCount = len;
!                     return EOF;
                  }
  #ifdef USE_SSL
              }
  #endif

              if (pqWait(FALSE, TRUE, conn))
!                 return EOF;
          }
      }

--- 722,735 ----
                      /* shift the contents of the buffer */
                      memmove(conn->outBuffer, ptr, len);
                      conn->outCount = len;
!                     return 1;
                  }
  #ifdef USE_SSL
              }
  #endif

              if (pqWait(FALSE, TRUE, conn))
!                 return -1;
          }
      }

***************
*** 707,712 ****
--- 738,762 ----
      if (conn->Pfdebug)
          fflush(conn->Pfdebug);

+     return 0;
+ }
+
+
+ /* --------------------------------------------------------------------- */
+ /* pqFlush: send any data waiting in the output buffer
+  *
+  * Implemented in terms of pqSendSome to recreate the old behavior which
+  * returned 0 if all data was sent or EOF. EOF was returned regardless of
+  * whether an error occurred or not all data was sent on a non-blocking
+  * socket.
+  */
+ int
+ pqFlush(PGconn *conn)
+ {
+     if (pqSendSome(conn))
+     {
+         return EOF;
+     }
      return 0;
  }

diff -rc -x *.o postgresql-7.1.3-orig/src/interfaces/libpq/libpq-fe.h postgresql-7.1.3/src/interfaces/libpq/libpq-fe.h
*** postgresql-7.1.3-orig/src/interfaces/libpq/libpq-fe.h    Thu Mar 22 05:01:27 2001
--- postgresql-7.1.3/src/interfaces/libpq/libpq-fe.h    Tue Jan 22 12:13:17 2002
***************
*** 265,270 ****
--- 265,273 ----

      /* Force the write buffer to be written (or at least try) */
      extern int    PQflush(PGconn *conn);
+     /* Force the write buffer to be written (or at least try)
+            (better than PQflush) */
+     extern int    PQsendSome(PGconn *conn);

      /*
       * "Fast path" interface --- not really recommended for application
diff -rc -x *.o postgresql-7.1.3-orig/src/interfaces/libpq/libpq-int.h postgresql-7.1.3/src/interfaces/libpq/libpq-int.h
*** postgresql-7.1.3-orig/src/interfaces/libpq/libpq-int.h    Thu Mar 22 05:01:27 2001
--- postgresql-7.1.3/src/interfaces/libpq/libpq-int.h    Tue Jan 22 12:13:17 2002
***************
*** 321,326 ****
--- 321,327 ----
  extern int    pqPutInt(int value, size_t bytes, PGconn *conn);
  extern int    pqReadData(PGconn *conn);
  extern int    pqFlush(PGconn *conn);
+ extern int    pqSendSome(PGconn *conn);
  extern int    pqWait(int forRead, int forWrite, PGconn *conn);
  extern int    pqReadReady(PGconn *conn);
  extern int    pqWriteReady(PGconn *conn);



--
Intevation GmbH                                 http://intevation.de/
Sketch                                 http://sketch.sourceforge.net/
MapIt!                                               http://mapit.de/



Re: Fix for non-blocking connections in libpq

From
Bruce Momjian
Date:
This has been saved for the 7.3 release:

    http://candle.pha.pa.us/cgi-bin/pgpatches2

--
  Bruce Momjian                        |  http://candle.pha.pa.us
  pgman@candle.pha.pa.us               |  (610) 853-3000
  +  If your life is a hard drive,     |  830 Blythe Avenue
  +  Christ can be your backup.        |  Drexel Hill, Pennsylvania 19026

Re: Fix for non-blocking connections in libpq

From
Bruce Momjian
Date:
Your patch has been added to the PostgreSQL unapplied patches list at:

    http://candle.pha.pa.us/cgi-bin/pgpatches

I will try to apply it within the next 48 hours.


Re: Fix for non-blocking connections in libpq

From
Bruce Momjian
Date:
Bernhard, just checking.  Is this the most recent version of your patch?




Re: Fix for non-blocking connections in libpq

From
Bernhard Herzog
Date:
Bruce Momjian <pgman@candle.pha.pa.us> writes:

> Bernhard, just checking.  Is this the most recent version of your patch?

In principle, yes. However, I've ported it to the CVS version in the
meantime. Here's a patch against current CVS HEAD:


Index: src/interfaces/libpq/fe-exec.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/fe-exec.c,v
retrieving revision 1.113
diff -c -r1.113 fe-exec.c
*** src/interfaces/libpq/fe-exec.c    2001/10/25 05:50:13    1.113
--- src/interfaces/libpq/fe-exec.c    2002/02/25 10:21:06
***************
*** 2340,2342 ****
--- 2340,2350 ----

      return (pqFlush(conn));
  }
+
+ /* try to force data out, really only useful for non-blocking users.
+  * This implementation actually works for non-blocking connections */
+ int
+ PQsendSome(PGconn *conn)
+ {
+     return pqSendSome(conn);
+ }
Index: src/interfaces/libpq/fe-misc.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/fe-misc.c,v
retrieving revision 1.65
diff -c -r1.65 fe-misc.c
*** src/interfaces/libpq/fe-misc.c    2001/12/03 00:28:24    1.65
--- src/interfaces/libpq/fe-misc.c    2002/02/25 10:21:06
***************
*** 110,164 ****
  static int
  pqPutBytes(const char *s, size_t nbytes, PGconn *conn)
  {
!     size_t        avail = Max(conn->outBufSize - conn->outCount, 0);
!
!     /*
!      * if we are non-blocking and the send queue is too full to buffer
!      * this request then try to flush some and return an error
       */
!     if (pqIsnonblocking(conn) && nbytes > avail && pqFlush(conn))
      {
!         /*
!          * even if the flush failed we may still have written some data,
!          * recalculate the size of the send-queue relative to the amount
!          * we have to send, we may be able to queue it afterall even
!          * though it's not sent to the database it's ok, any routines that
!          * check the data coming from the database better call pqFlush()
!          * anyway.
!          */
!         if (nbytes > Max(conn->outBufSize - conn->outCount, 0))
!         {
!             printfPQExpBuffer(&conn->errorMessage,
!                               libpq_gettext("could not flush enough data (space available: %d, space needed %d)\n"),
!                          (int) Max(conn->outBufSize - conn->outCount, 0),
!                               (int) nbytes);
!             return EOF;
!         }
!         /* fixup avail for while loop */
          avail = Max(conn->outBufSize - conn->outCount, 0);
!     }

!     /*
!      * is the amount of data to be sent is larger than the size of the
!      * output buffer then we must flush it to make more room.
!      *
!      * the code above will make sure the loop conditional is never true for
!      * non-blocking connections
!      */
!     while (nbytes > avail)
!     {
!         memcpy(conn->outBuffer + conn->outCount, s, avail);
!         conn->outCount += avail;
!         s += avail;
!         nbytes -= avail;
!         if (pqFlush(conn))
!             return EOF;
!         avail = conn->outBufSize;
!     }

!     memcpy(conn->outBuffer + conn->outCount, s, nbytes);
!     conn->outCount += nbytes;

      return 0;
  }

--- 110,184 ----
  static int
  pqPutBytes(const char *s, size_t nbytes, PGconn *conn)
  {
!     /* Strategy to handle blocking and non-blocking connections: Fill
!      * the output buffer and flush it repeatedly until either all data
!      * has been sent or is at least queued in the buffer.
!      *
!      * For non-blocking connections, grow the buffer if not all data
!      * fits into it and the buffer can't be sent because the socket
!      * would block.
       */
!
!     while (nbytes)
      {
!         size_t avail, remaining;
!
!         /* fill the output buffer */
          avail = Max(conn->outBufSize - conn->outCount, 0);
!         remaining = Min(avail, nbytes);
!         memcpy(conn->outBuffer + conn->outCount, s, remaining);
!         conn->outCount += remaining;
!         s += remaining;
!         nbytes -= remaining;
!
!         /* if the data didn't fit completely into the buffer, try to
!          * flush the buffer */
!         if (nbytes)
!         {
!             int send_result = pqSendSome(conn);

!             /* if there were errors, report them */
!             if (send_result < 0)
!                 return EOF;

!             /* if not all data could be sent, increase the output
!              * buffer, put the rest of s into it and return
!              * successfully. This case will only happen in a
!              * non-blocking connection
!              */
!             if (send_result > 0)
!             {
!                 /* try to grow the buffer.
!                  * FIXME: The new size could be chosen more
!                  * intelligently.
!                  */
!                 size_t buflen = conn->outCount + nbytes;
!                 if (buflen > conn->outBufSize)
!                 {
!                     char * newbuf = realloc(conn->outBuffer, buflen);
!                     if (!newbuf)
!                     {
!                         /* realloc failed. Probably out of memory */
!                         printfPQExpBuffer(&conn->errorMessage,
!                                 "cannot allocate memory for output buffer\n");
!                         return EOF;
!                     }
!                     conn->outBuffer = newbuf;
!                     conn->outBufSize = buflen;
!                 }
!                 /* put the data into it */
!                 memcpy(conn->outBuffer + conn->outCount, s, nbytes);
!                 conn->outCount += nbytes;

+                 /* report success. */
+                 return 0;
+             }
+         }
+
+         /* pqSendSome was able to send all data. Continue with the next
+          * chunk of s. */
+     } /* while */
+
      return 0;
  }

***************
*** 604,613 ****
  }

  /*
!  * pqFlush: send any data waiting in the output buffer
   */
  int
! pqFlush(PGconn *conn)
  {
      char       *ptr = conn->outBuffer;
      int            len = conn->outCount;
--- 624,636 ----
  }

  /*
!  * pqSendSome: send any data waiting in the output buffer.
!  *
!  * Return 0 on success, -1 on failure and 1 when data remains because the
!  * socket would block and the connection is non-blocking.
   */
  int
! pqSendSome(PGconn *conn)
  {
      char       *ptr = conn->outBuffer;
      int            len = conn->outCount;
***************
*** 616,622 ****
      {
          printfPQExpBuffer(&conn->errorMessage,
                            libpq_gettext("connection not open\n"));
!         return EOF;
      }

      /*
--- 639,645 ----
      {
          printfPQExpBuffer(&conn->errorMessage,
                            libpq_gettext("connection not open\n"));
!         return -1;
      }

      /*
***************
*** 674,680 ****
                      printfPQExpBuffer(&conn->errorMessage,
                                        libpq_gettext(
                              "server closed the connection unexpectedly\n"
!                                                     "\tThis probably means the server terminated abnormally\n"
                           "\tbefore or while processing the request.\n"));

                      /*
--- 697,703 ----
                      printfPQExpBuffer(&conn->errorMessage,
                                        libpq_gettext(
                              "server closed the connection unexpectedly\n"
!                    "\tThis probably means the server terminated abnormally\n"
                           "\tbefore or while processing the request.\n"));

                      /*
***************
*** 685,698 ****
                       * the socket open until pqReadData finds no more data
                       * can be read.
                       */
!                     return EOF;

                  default:
                      printfPQExpBuffer(&conn->errorMessage,
                      libpq_gettext("could not send data to server: %s\n"),
                                        SOCK_STRERROR(SOCK_ERRNO));
                      /* We don't assume it's a fatal error... */
!                     return EOF;
              }
          }
          else
--- 708,721 ----
                       * the socket open until pqReadData finds no more data
                       * can be read.
                       */
!                     return -1;

                  default:
                      printfPQExpBuffer(&conn->errorMessage,
                      libpq_gettext("could not send data to server: %s\n"),
                                        SOCK_STRERROR(SOCK_ERRNO));
                      /* We don't assume it's a fatal error... */
!                     return -1;
              }
          }
          else
***************
*** 707,713 ****

              /*
               * if the socket is in non-blocking mode we may need to abort
!              * here
               */
  #ifdef USE_SSL
              /* can't do anything for our SSL users yet */
--- 730,736 ----

              /*
               * if the socket is in non-blocking mode we may need to abort
!              * here and return 1 to indicate that data is still pending.
               */
  #ifdef USE_SSL
              /* can't do anything for our SSL users yet */
***************
*** 719,732 ****
                      /* shift the contents of the buffer */
                      memmove(conn->outBuffer, ptr, len);
                      conn->outCount = len;
!                     return EOF;
                  }
  #ifdef USE_SSL
              }
  #endif

              if (pqWait(FALSE, TRUE, conn))
!                 return EOF;
          }
      }

--- 742,755 ----
                      /* shift the contents of the buffer */
                      memmove(conn->outBuffer, ptr, len);
                      conn->outCount = len;
!                     return 1;
                  }
  #ifdef USE_SSL
              }
  #endif

              if (pqWait(FALSE, TRUE, conn))
!                 return -1;
          }
      }

***************
*** 735,740 ****
--- 758,783 ----
      if (conn->Pfdebug)
          fflush(conn->Pfdebug);

+     return 0;
+ }
+
+
+
+ /*
+  * pqFlush: send any data waiting in the output buffer
+  *
+  * Implemented in terms of pqSendSome to recreate the old behavior which
+  * returned 0 if all data was sent or EOF. EOF was sent regardless of
+  * whether an error occurred or not all data was sent on a non-blocking
+  * socket.
+  */
+ int
+ pqFlush(PGconn *conn)
+ {
+     if (pqSendSome(conn))
+     {
+         return EOF;
+     }
      return 0;
  }

Index: src/interfaces/libpq/libpq-fe.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/libpq-fe.h,v
retrieving revision 1.80
diff -c -r1.80 libpq-fe.h
*** src/interfaces/libpq/libpq-fe.h    2001/11/08 20:37:52    1.80
--- src/interfaces/libpq/libpq-fe.h    2002/02/25 10:21:06
***************
*** 279,284 ****
--- 279,285 ----

  /* Force the write buffer to be written (or at least try) */
  extern int    PQflush(PGconn *conn);
+ extern int    PQsendSome(PGconn *conn);

  /*
   * "Fast path" interface --- not really recommended for application
Index: src/interfaces/libpq/libpq-int.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/libpq-int.h,v
retrieving revision 1.44
diff -c -r1.44 libpq-int.h
*** src/interfaces/libpq/libpq-int.h    2001/11/05 17:46:38    1.44
--- src/interfaces/libpq/libpq-int.h    2002/02/25 10:21:06
***************
*** 323,328 ****
--- 323,329 ----
  extern int    pqPutInt(int value, size_t bytes, PGconn *conn);
  extern int    pqReadData(PGconn *conn);
  extern int    pqFlush(PGconn *conn);
+ extern int    pqSendSome(PGconn *conn);
  extern int    pqWait(int forRead, int forWrite, PGconn *conn);
  extern int    pqReadReady(PGconn *conn);
  extern int    pqWriteReady(PGconn *conn);
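
For readers following the fe-misc.c hunk, the queue-or-grow strategy of the
new pqPutBytes can be modelled outside libpq. The sketch below is an
illustration under stated assumptions, not the patched code: toy_conn,
toy_send_some and toy_put_bytes are hypothetical names, the socket is
simulated by a byte budget (sock_room), and error returns from the send
step are not modelled.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the output side of PGconn. */
typedef struct
{
    char   *out_buf;    /* heap-allocated output buffer */
    size_t  out_size;   /* allocated size of out_buf */
    size_t  out_count;  /* bytes currently pending in out_buf */
    size_t  sock_room;  /* bytes the fake socket accepts before "blocking" */
} toy_conn;

/* Returns 0 if all pending data was sent, 1 if data remains because the
 * fake socket would block. Errors are not modelled in this sketch. */
static int
toy_send_some(toy_conn *conn)
{
    size_t sent = conn->out_count < conn->sock_room
                ? conn->out_count : conn->sock_room;

    conn->sock_room -= sent;
    /* shift the unsent tail to the front of the buffer */
    memmove(conn->out_buf, conn->out_buf + sent, conn->out_count - sent);
    conn->out_count -= sent;
    return conn->out_count > 0;
}

/* Queue nbytes from s: fill the buffer, try to send, and grow the buffer
 * when the send stalls so that the rest of s can still be queued. */
static int
toy_put_bytes(toy_conn *conn, const char *s, size_t nbytes)
{
    while (nbytes > 0)
    {
        size_t avail, chunk;

        assert(conn->out_count <= conn->out_size);
        avail = conn->out_size - conn->out_count;
        chunk = avail < nbytes ? avail : nbytes;
        memcpy(conn->out_buf + conn->out_count, s, chunk);
        conn->out_count += chunk;
        s += chunk;
        nbytes -= chunk;

        /* data left over: try to make room by sending */
        if (nbytes > 0 && toy_send_some(conn) > 0)
        {
            size_t need = conn->out_count + nbytes;

            if (need > conn->out_size)
            {
                char *newbuf = realloc(conn->out_buf, need);

                if (newbuf == NULL)
                    return -1;      /* out of memory */
                conn->out_buf = newbuf;
                conn->out_size = need;
            }
            memcpy(conn->out_buf + conn->out_count, s, nbytes);
            conn->out_count += nbytes;
            return 0;               /* everything is queued */
        }
    }
    return 0;
}
```

With an 8-byte buffer and a fake socket that accepts only 3 bytes, queueing
20 bytes leaves 17 bytes pending in a buffer grown to 17 bytes. With a
socket that never stalls, the buffer stays at 8 bytes.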



   Bernhard

--
Intevation GmbH                                 http://intevation.de/
Sketch                                 http://sketch.sourceforge.net/
MapIt!                                               http://mapit.de/

Re: Fix for non-blocking connections in libpq

From
Bruce Momjian
Date:
Patch applied.  Thanks.  I got it merged into the current CVS.

---------------------------------------------------------------------------


Bernhard Herzog wrote:
>
> Here's a patch against 7.1.3 that fixes a problem with sending larger
> queries over non-blocking connections with libpq. "Larger" here
> basically means that it doesn't fit into the output buffer.
>
> The basic strategy is to fix pqFlush and pqPutBytes.
>
> The problem with pqFlush as it stands now is that it returns EOF when an
> error occurs or when not all data could be sent. The latter case is
> clearly not an error for a non-blocking connection but the caller can't
> distinguish it from an error very well.
>
> The first part of the fix is therefore to fix pqFlush. This is done by
> renaming it to pqSendSome, which differs from pqFlush only in its
> return values so that the caller can make the above distinction, and by
> adding a new pqFlush which is implemented in terms of pqSendSome and
> behaves exactly like the old pqFlush.
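
The return-value split described in this paragraph can be sketched in
isolation. This is a minimal model, not libpq code: toy_conn,
toy_send_some and toy_flush are hypothetical names, and the "socket" is
only a counter of how many bytes it will accept before it would block.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for PGconn; not the libpq structure. */
typedef struct
{
    char buf[64];       /* pending output */
    int  count;         /* bytes pending in buf */
    int  sock_open;     /* 0 simulates a connection that is not open */
    int  sock_room;     /* bytes the fake socket accepts before blocking */
} toy_conn;

/* Three-way result: 0 if everything was sent, -1 on error, 1 if data
 * remains because the fake socket would block. */
static int
toy_send_some(toy_conn *conn)
{
    int sent;

    assert(conn->count >= 0 && conn->count <= (int) sizeof conn->buf);
    if (!conn->sock_open)
        return -1;

    sent = conn->count < conn->sock_room ? conn->count : conn->sock_room;
    conn->sock_room -= sent;
    /* shift the unsent tail to the front of the buffer */
    memmove(conn->buf, conn->buf + sent, (size_t) (conn->count - sent));
    conn->count -= sent;

    return conn->count > 0 ? 1 : 0;
}

/* Legacy-style wrapper: any non-zero result collapses to EOF, so old
 * callers see the same behaviour as before. */
static int
toy_flush(toy_conn *conn)
{
    return toy_send_some(conn) ? EOF : 0;
}
```

A caller that only checks toy_flush cannot tell "would block" from a real
error, while a caller of toy_send_some can.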
>
> The second part of the fix modifies pqPutBytes to use pqSendSome instead
> of pqFlush and to either send all the data or, if not all data can be
> sent on a non-blocking connection, to at least put all data into the
> output buffer, enlarging it if necessary. The callers of pqPutBytes
> don't have to be changed because from their point of view pqPutBytes
> behaves like before. It either succeeds in queueing all output data or
> fails with an error.
>
> I've also added a new API function PQsendSome which, analogously to
> PQflush, just calls pqSendSome. Programs using non-blocking queries
> should use this new function. The main difference is that this function
> will have to be called repeatedly (calling select() properly in between)
> until all data has been written.
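
The calling pattern described here (retry, with select() in between, until
nothing is left) can be sketched with plain POSIX sockets. This does not
call libpq: send_some, send_all and demo_transfer are hypothetical names,
and send_some only mimics the 0 / 1 / -1 convention of the proposed
function on a raw file descriptor.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Write as much as the socket takes. Returns 0 when nothing is left,
 * 1 if the socket would block, -1 on error. */
static int
send_some(int fd, const char **ptr, size_t *len)
{
    while (*len > 0)
    {
        ssize_t n = send(fd, *ptr, *len, 0);

        if (n < 0)
        {
            if (errno == EINTR)
                continue;
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                return 1;
            return -1;
        }
        *ptr += n;
        *len -= (size_t) n;
    }
    return 0;
}

/* Caller-side loop: retry, sleeping in select() until fd is writable. */
static int
send_all(int fd, const char *data, size_t len)
{
    int rc;

    assert(fd >= 0 && fd < FD_SETSIZE);
    while ((rc = send_some(fd, &data, &len)) == 1)
    {
        fd_set wfds;

        FD_ZERO(&wfds);
        FD_SET(fd, &wfds);
        if (select(fd + 1, NULL, &wfds, NULL, NULL) < 0 && errno != EINTR)
            return -1;
    }
    return rc;
}

/* Demo: a child process plays the server and drains the socket while the
 * parent pushes 1 MiB through a non-blocking socket. Returns 0 if the
 * child received every byte. */
static int
demo_transfer(void)
{
    static char data[1 << 20];  /* zero-filled payload */
    int     sv[2];
    int     status = -1;
    int     rc;
    pid_t   pid;

    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
        return -1;
    if ((pid = fork()) < 0)
        return -1;
    if (pid == 0)
    {
        char    buf[4096];
        size_t  total = 0;
        ssize_t n;

        close(sv[0]);
        while ((n = read(sv[1], buf, sizeof buf)) > 0)
            total += (size_t) n;
        _exit(total == sizeof data ? 0 : 1);
    }
    close(sv[1]);
    fcntl(sv[0], F_SETFL, fcntl(sv[0], F_GETFL, 0) | O_NONBLOCK);
    rc = send_all(sv[0], data, sizeof data);
    close(sv[0]);               /* peer sees EOF and exits */
    if (waitpid(pid, &status, 0) < 0 || rc != 0)
        return -1;
    return (WIFEXITED(status) && WEXITSTATUS(status) == 0) ? 0 : -1;
}
```

A real application would normally fold the select() call into its own
event loop instead of blocking in send_all.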
>
> AFAICT, the code in CVS HEAD hasn't changed with respect to non-blocking
> queries and this fix should work there, too, but I haven't tested that
> yet.
>
>
>    Bernhard
>
> diff -rc -x *.o postgresql-7.1.3-orig/src/interfaces/libpq/fe-exec.c postgresql-7.1.3/src/interfaces/libpq/fe-exec.c
> *** postgresql-7.1.3-orig/src/interfaces/libpq/fe-exec.c    Sat Feb 10 03:31:30 2001
> --- postgresql-7.1.3/src/interfaces/libpq/fe-exec.c    Tue Jan 22 12:12:58 2002
> ***************
> *** 2214,2216 ****
> --- 2214,2224 ----
>
>       return (pqFlush(conn));
>   }
> +
> + /* try to force data out, really only useful for non-blocking users.
> +  * This implementation actually works for non-blocking connections */
> + int
> + PQsendSome(PGconn *conn)
> + {
> +     return pqSendSome(conn);
> + }
> diff -rc -x *.o postgresql-7.1.3-orig/src/interfaces/libpq/fe-misc.c postgresql-7.1.3/src/interfaces/libpq/fe-misc.c
> *** postgresql-7.1.3-orig/src/interfaces/libpq/fe-misc.c    Sun Apr  1 01:13:30 2001
> --- postgresql-7.1.3/src/interfaces/libpq/fe-misc.c    Tue Jan 22 12:07:25 2002
> ***************
> *** 90,144 ****
>   static int
>   pqPutBytes(const char *s, size_t nbytes, PGconn *conn)
>   {
> !     size_t        avail = Max(conn->outBufSize - conn->outCount, 0);
> !
> !     /*
> !      * if we are non-blocking and the send queue is too full to buffer
> !      * this request then try to flush some and return an error
>        */
> -     if (pqIsnonblocking(conn) && nbytes > avail && pqFlush(conn))
> -     {
>
> !         /*
> !          * even if the flush failed we may still have written some data,
> !          * recalculate the size of the send-queue relative to the amount
> !          * we have to send, we may be able to queue it afterall even
> !          * though it's not sent to the database it's ok, any routines that
> !          * check the data coming from the database better call pqFlush()
> !          * anyway.
> !          */
> !         if (nbytes > Max(conn->outBufSize - conn->outCount, 0))
> !         {
> !             printfPQExpBuffer(&conn->errorMessage,
> !                            "pqPutBytes --  pqFlush couldn't flush enough"
> !                          " data: space available: %d, space needed %d\n",
> !                       Max(conn->outBufSize - conn->outCount, 0), nbytes);
> !             return EOF;
> !         }
> !         /* fixup avail for while loop */
>           avail = Max(conn->outBufSize - conn->outCount, 0);
> !     }
>
> !     /*
> !      * is the amount of data to be sent is larger than the size of the
> !      * output buffer then we must flush it to make more room.
> !      *
> !      * the code above will make sure the loop conditional is never true for
> !      * non-blocking connections
> !      */
> !     while (nbytes > avail)
> !     {
> !         memcpy(conn->outBuffer + conn->outCount, s, avail);
> !         conn->outCount += avail;
> !         s += avail;
> !         nbytes -= avail;
> !         if (pqFlush(conn))
> !             return EOF;
> !         avail = conn->outBufSize;
> !     }
>
> !     memcpy(conn->outBuffer + conn->outCount, s, nbytes);
> !     conn->outCount += nbytes;
>
>       return 0;
>   }
> --- 90,163 ----
>   static int
>   pqPutBytes(const char *s, size_t nbytes, PGconn *conn)
>   {
> !     /* Strategy to handle blocking and non-blocking connections: Fill
> !      * the output buffer and flush it repeatedly until either all data
> !      * has been sent or is at least queued in the buffer.
> !      *
> !      * For non-blocking connections, grow the buffer if not all data
> !      * fits into it and the buffer can't be sent because the socket
> !      * would block.
>        */
>
> !     while (nbytes)
> !     {
> !         size_t avail, remaining;
> !
> !         /* fill the output buffer */
>           avail = Max(conn->outBufSize - conn->outCount, 0);
> !         remaining = Min(avail, nbytes);
> !         memcpy(conn->outBuffer + conn->outCount, s, remaining);
> !         conn->outCount += remaining;
> !         s += remaining;
> !         nbytes -= remaining;
> !
> !         /* if the data didn't fit completely into the buffer, try to
> !          * flush the buffer */
> !         if (nbytes)
> !         {
> !             int send_result = pqSendSome(conn);
>
> !             /* if there were errors, report them */
> !             if (send_result < 0)
> !                 return EOF;
>
> !             /* if not all data could be sent, increase the output
> !              * buffer, put the rest of s into it and return
> !              * successfully. This case will only happen in a
> !              * non-blocking connection
> !              */
> !             if (send_result > 0)
> !             {
> !                 /* try to grow the buffer.
> !                  * FIXME: The new size could be chosen more
> !                  * intelligently.
> !                  */
> !                 size_t buflen = conn->outCount + nbytes;
> !                 if (buflen > conn->outBufSize)
> !                 {
> !                     char * newbuf = realloc(conn->outBuffer, buflen);
> !                     if (!newbuf)
> !                     {
> !                         /* realloc failed. Probably out of memory */
> !                         printfPQExpBuffer(&conn->errorMessage,
> !                                 "cannot allocate memory for output buffer\n");
> !                         return EOF;
> !                     }
> !                     conn->outBuffer = newbuf;
> !                     conn->outBufSize = buflen;
> !                 }
> !                 /* put the data into it */
> !                 memcpy(conn->outBuffer + conn->outCount, s, nbytes);
> !                 conn->outCount += nbytes;
> !
> !                 /* report success. */
> !                 return 0;
> !             }
> !         }
> !
> !         /* pqSendSome was able to send all data. Continue with the next
> !          * chunk of s. */
> !     } /* while */
>
>       return 0;
>   }
> ***************
> *** 575,584 ****
>   }
>
>   /* --------------------------------------------------------------------- */
> ! /* pqFlush: send any data waiting in the output buffer
>    */
>   int
> ! pqFlush(PGconn *conn)
>   {
>       char       *ptr = conn->outBuffer;
>       int            len = conn->outCount;
> --- 594,615 ----
>   }
>
>   /* --------------------------------------------------------------------- */
> ! /* pqSendSome: send any data waiting in the output buffer and return 0
> !  * if all data was sent, -1 if an error occurred or 1 if not all data
> !  * could be written because the socket would have blocked.
> !  *
> !  * For a blocking connection all data will be sent unless an error
> !  * occurs. 1 will only be returned if the connection is non-blocking.
> !  *
> !  * Internally, the case of data remaining in the buffer after pqSendSome
> !  * could be determined by looking at outCount, but this function also
> !  * serves as the implementation of the new API function PQsendSome.
> !  *
> !  * FIXME: perhaps it would be more useful to return the number of bytes
> !  * remaining?
>    */
>   int
> ! pqSendSome(PGconn *conn)
>   {
>       char       *ptr = conn->outBuffer;
>       int            len = conn->outCount;
> ***************
> *** 586,593 ****
>       if (conn->sock < 0)
>       {
>           printfPQExpBuffer(&conn->errorMessage,
> !                           "pqFlush() -- connection not open\n");
> !         return EOF;
>       }
>
>       /*
> --- 617,624 ----
>       if (conn->sock < 0)
>       {
>           printfPQExpBuffer(&conn->errorMessage,
> !                           "pqSendSome() -- connection not open\n");
> !         return -1;
>       }
>
>       /*
> ***************
> *** 645,651 ****
>                   case ECONNRESET:
>   #endif
>                       printfPQExpBuffer(&conn->errorMessage,
> !                                       "pqFlush() -- backend closed the channel unexpectedly.\n"
>                                         "\tThis probably means the backend terminated abnormally"
>                              " before or while processing the request.\n");
>
> --- 676,682 ----
>                   case ECONNRESET:
>   #endif
>                       printfPQExpBuffer(&conn->errorMessage,
> !                                       "pqSendSome() -- backend closed the channel unexpectedly.\n"
>                                         "\tThis probably means the backend terminated abnormally"
>                              " before or while processing the request.\n");
>
> ***************
> *** 657,670 ****
>                        * the socket open until pqReadData finds no more data
>                        * can be read.
>                        */
> !                     return EOF;
>
>                   default:
>                       printfPQExpBuffer(&conn->errorMessage,
> !                       "pqFlush() --  couldn't send data: errno=%d\n%s\n",
>                                         errno, strerror(errno));
>                       /* We don't assume it's a fatal error... */
> !                     return EOF;
>               }
>           }
>           else
> --- 688,701 ----
>                        * the socket open until pqReadData finds no more data
>                        * can be read.
>                        */
> !                     return -1;
>
>                   default:
>                       printfPQExpBuffer(&conn->errorMessage,
> !                       "pqSendSome() --  couldn't send data: errno=%d\n%s\n",
>                                         errno, strerror(errno));
>                       /* We don't assume it's a fatal error... */
> !                     return -1;
>               }
>           }
>           else
> ***************
> *** 679,685 ****
>
>               /*
>                * if the socket is in non-blocking mode we may need to abort
> !              * here
>                */
>   #ifdef USE_SSL
>               /* can't do anything for our SSL users yet */
> --- 710,716 ----
>
>               /*
>                * if the socket is in non-blocking mode we may need to abort
> !              * here and return 1 to indicate that data is still pending.
>                */
>   #ifdef USE_SSL
>               /* can't do anything for our SSL users yet */
> ***************
> *** 691,704 ****
>                       /* shift the contents of the buffer */
>                       memmove(conn->outBuffer, ptr, len);
>                       conn->outCount = len;
> !                     return EOF;
>                   }
>   #ifdef USE_SSL
>               }
>   #endif
>
>               if (pqWait(FALSE, TRUE, conn))
> !                 return EOF;
>           }
>       }
>
> --- 722,735 ----
>                       /* shift the contents of the buffer */
>                       memmove(conn->outBuffer, ptr, len);
>                       conn->outCount = len;
> !                     return 1;
>                   }
>   #ifdef USE_SSL
>               }
>   #endif
>
>               if (pqWait(FALSE, TRUE, conn))
> !                 return -1;
>           }
>       }
>
> ***************
> *** 707,712 ****
> --- 738,762 ----
>       if (conn->Pfdebug)
>           fflush(conn->Pfdebug);
>
> +     return 0;
> + }
> +
> +
> + /* --------------------------------------------------------------------- */
> + /* pqFlush: send any data waiting in the output buffer
> +  *
> +  * Implemented in terms of pqSendSome to recreate the old behavior, which
> +  * returned 0 if all data was sent and EOF otherwise. EOF was returned
> +  * regardless of whether an error occurred or not all data was sent on a
> +  * non-blocking socket.
> +  */
> + int
> + pqFlush(PGconn *conn)
> + {
> +     if (pqSendSome(conn))
> +     {
> +         return EOF;
> +     }
>       return 0;
>   }
>
> diff -rc -x *.o postgresql-7.1.3-orig/src/interfaces/libpq/libpq-fe.h postgresql-7.1.3/src/interfaces/libpq/libpq-fe.h
> *** postgresql-7.1.3-orig/src/interfaces/libpq/libpq-fe.h    Thu Mar 22 05:01:27 2001
> --- postgresql-7.1.3/src/interfaces/libpq/libpq-fe.h    Tue Jan 22 12:13:17 2002
> ***************
> *** 265,270 ****
> --- 265,273 ----
>
>       /* Force the write buffer to be written (or at least try) */
>       extern int    PQflush(PGconn *conn);
> +     /* Force the write buffer to be written (or at least try)
> +            (better than PQflush) */
> +     extern int    PQsendSome(PGconn *conn);
>
>       /*
>        * "Fast path" interface --- not really recommended for application
> diff -rc -x *.o postgresql-7.1.3-orig/src/interfaces/libpq/libpq-int.h postgresql-7.1.3/src/interfaces/libpq/libpq-int.h
> *** postgresql-7.1.3-orig/src/interfaces/libpq/libpq-int.h    Thu Mar 22 05:01:27 2001
> --- postgresql-7.1.3/src/interfaces/libpq/libpq-int.h    Tue Jan 22 12:13:17 2002
> ***************
> *** 321,326 ****
> --- 321,327 ----
>   extern int    pqPutInt(int value, size_t bytes, PGconn *conn);
>   extern int    pqReadData(PGconn *conn);
>   extern int    pqFlush(PGconn *conn);
> + extern int    pqSendSome(PGconn *conn);
>   extern int    pqWait(int forRead, int forWrite, PGconn *conn);
>   extern int    pqReadReady(PGconn *conn);
>   extern int    pqWriteReady(PGconn *conn);
>
>
>
> --
> Intevation GmbH                                 http://intevation.de/
> Sketch                                 http://sketch.sourceforge.net/
> MapIt!                                               http://mapit.de/
>
>
>
> ---------------------------(end of broadcast)---------------------------
> TIP 5: Have you checked our extensive FAQ?
>
> http://www.postgresql.org/users-lounge/docs/faq.html
>

--
  Bruce Momjian                        |  http://candle.pha.pa.us
  pgman@candle.pha.pa.us               |  (610) 853-3000
  +  If your life is a hard drive,     |  830 Blythe Avenue
  +  Christ can be your backup.        |  Drexel Hill, Pennsylvania 19026

Re: Fix for non-blocking connections in libpq

From
Bruce Momjian
Date:
Old version backed out.  Newer patch applied.  Thanks.

---------------------------------------------------------------------------


Bernhard Herzog wrote:
> Bruce Momjian <pgman@candle.pha.pa.us> writes:
>
> > Bernhard, just checking.  Is this the most recent version of your patch?
>
> In principle, yes. However, I've ported it to the CVS version in the
> meantime. Here's a patch against current CVS HEAD:
>
>
> Index: src/interfaces/libpq/fe-exec.c
> ===================================================================
> RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/fe-exec.c,v
> retrieving revision 1.113
> diff -c -r1.113 fe-exec.c
> *** src/interfaces/libpq/fe-exec.c    2001/10/25 05:50:13    1.113
> --- src/interfaces/libpq/fe-exec.c    2002/02/25 10:21:06
> ***************
> *** 2340,2342 ****
> --- 2340,2350 ----
>
>       return (pqFlush(conn));
>   }
> +
> + /* try to force data out, really only useful for non-blocking users.
> +  * This implementation actually works for non-blocking connections */
> + int
> + PQsendSome(PGconn *conn)
> + {
> +     return pqSendSome(conn);
> + }
> Index: src/interfaces/libpq/fe-misc.c
> ===================================================================
> RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/fe-misc.c,v
> retrieving revision 1.65
> diff -c -r1.65 fe-misc.c
> *** src/interfaces/libpq/fe-misc.c    2001/12/03 00:28:24    1.65
> --- src/interfaces/libpq/fe-misc.c    2002/02/25 10:21:06
> ***************
> *** 110,164 ****
>   static int
>   pqPutBytes(const char *s, size_t nbytes, PGconn *conn)
>   {
> !     size_t        avail = Max(conn->outBufSize - conn->outCount, 0);
> !
> !     /*
> !      * if we are non-blocking and the send queue is too full to buffer
> !      * this request then try to flush some and return an error
>        */
> !     if (pqIsnonblocking(conn) && nbytes > avail && pqFlush(conn))
>       {
> !         /*
> !          * even if the flush failed we may still have written some data,
> !          * recalculate the size of the send-queue relative to the amount
> !          * we have to send, we may be able to queue it afterall even
> !          * though it's not sent to the database it's ok, any routines that
> !          * check the data coming from the database better call pqFlush()
> !          * anyway.
> !          */
> !         if (nbytes > Max(conn->outBufSize - conn->outCount, 0))
> !         {
> !             printfPQExpBuffer(&conn->errorMessage,
> !                               libpq_gettext("could not flush enough data (space available: %d, space needed %d)\n"),
> !                          (int) Max(conn->outBufSize - conn->outCount, 0),
> !                               (int) nbytes);
> !             return EOF;
> !         }
> !         /* fixup avail for while loop */
>           avail = Max(conn->outBufSize - conn->outCount, 0);
> !     }
>
> !     /*
> !      * is the amount of data to be sent is larger than the size of the
> !      * output buffer then we must flush it to make more room.
> !      *
> !      * the code above will make sure the loop conditional is never true for
> !      * non-blocking connections
> !      */
> !     while (nbytes > avail)
> !     {
> !         memcpy(conn->outBuffer + conn->outCount, s, avail);
> !         conn->outCount += avail;
> !         s += avail;
> !         nbytes -= avail;
> !         if (pqFlush(conn))
> !             return EOF;
> !         avail = conn->outBufSize;
> !     }
>
> !     memcpy(conn->outBuffer + conn->outCount, s, nbytes);
> !     conn->outCount += nbytes;
>
>       return 0;
>   }
>
> --- 110,184 ----
>   static int
>   pqPutBytes(const char *s, size_t nbytes, PGconn *conn)
>   {
> !     /* Strategy to handle blocking and non-blocking connections: Fill
> !      * the output buffer and flush it repeatedly until either all data
> !      * has been sent or is at least queued in the buffer.
> !      *
> !      * For non-blocking connections, grow the buffer if not all data
> !      * fits into it and the buffer can't be sent because the socket
> !      * would block.
>        */
> !
> !     while (nbytes)
>       {
> !         size_t avail, remaining;
> !
> !         /* fill the output buffer */
>           avail = Max(conn->outBufSize - conn->outCount, 0);
> !         remaining = Min(avail, nbytes);
> !         memcpy(conn->outBuffer + conn->outCount, s, remaining);
> !         conn->outCount += remaining;
> !         s += remaining;
> !         nbytes -= remaining;
> !
> !         /* if the data didn't fit completely into the buffer, try to
> !          * flush the buffer */
> !         if (nbytes)
> !         {
> !             int send_result = pqSendSome(conn);
>
> !             /* if there were errors, report them */
> !             if (send_result < 0)
> !                 return EOF;
>
> !             /* if not all data could be sent, increase the output
> !              * buffer, put the rest of s into it and return
> !              * successfully. This case will only happen in a
> !              * non-blocking connection
> !              */
> !             if (send_result > 0)
> !             {
> !                 /* try to grow the buffer.
> !                  * FIXME: The new size could be chosen more
> !                  * intelligently.
> !                  */
> !                 size_t buflen = conn->outCount + nbytes;
> !                 if (buflen > conn->outBufSize)
> !                 {
> !                     char * newbuf = realloc(conn->outBuffer, buflen);
> !                     if (!newbuf)
> !                     {
> !                         /* realloc failed. Probably out of memory */
> !                         printfPQExpBuffer(&conn->errorMessage,
> !                                 "cannot allocate memory for output buffer\n");
> !                         return EOF;
> !                     }
> !                     conn->outBuffer = newbuf;
> !                     conn->outBufSize = buflen;
> !                 }
> !                 /* put the data into it */
> !                 memcpy(conn->outBuffer + conn->outCount, s, nbytes);
> !                 conn->outCount += nbytes;
>
> +                 /* report success. */
> +                 return 0;
> +             }
> +         }
> +
> +         /* pqSendSome was able to send all data. Continue with the next
> +          * chunk of s. */
> +     } /* while */
> +
>       return 0;
>   }
>
> ***************
> *** 604,613 ****
>   }
>
>   /*
> !  * pqFlush: send any data waiting in the output buffer
>    */
>   int
> ! pqFlush(PGconn *conn)
>   {
>       char       *ptr = conn->outBuffer;
>       int            len = conn->outCount;
> --- 624,636 ----
>   }
>
>   /*
> !  * pqSendSome: send any data waiting in the output buffer.
> !  *
> !  * Return 0 on success, -1 on failure and 1 when data remains because the
> !  * socket would block and the connection is non-blocking.
>    */
>   int
> ! pqSendSome(PGconn *conn)
>   {
>       char       *ptr = conn->outBuffer;
>       int            len = conn->outCount;
> ***************
> *** 616,622 ****
>       {
>           printfPQExpBuffer(&conn->errorMessage,
>                             libpq_gettext("connection not open\n"));
> !         return EOF;
>       }
>
>       /*
> --- 639,645 ----
>       {
>           printfPQExpBuffer(&conn->errorMessage,
>                             libpq_gettext("connection not open\n"));
> !         return -1;
>       }
>
>       /*
> ***************
> *** 674,680 ****
>                       printfPQExpBuffer(&conn->errorMessage,
>                                         libpq_gettext(
>                               "server closed the connection unexpectedly\n"
> !                                                     "\tThis probably means the server terminated abnormally\n"
>                            "\tbefore or while processing the request.\n"));
>
>                       /*
> --- 697,703 ----
>                       printfPQExpBuffer(&conn->errorMessage,
>                                         libpq_gettext(
>                               "server closed the connection unexpectedly\n"
> !                    "\tThis probably means the server terminated abnormally\n"
>                            "\tbefore or while processing the request.\n"));
>
>                       /*
> ***************
> *** 685,698 ****
>                        * the socket open until pqReadData finds no more data
>                        * can be read.
>                        */
> !                     return EOF;
>
>                   default:
>                       printfPQExpBuffer(&conn->errorMessage,
>                       libpq_gettext("could not send data to server: %s\n"),
>                                         SOCK_STRERROR(SOCK_ERRNO));
>                       /* We don't assume it's a fatal error... */
> !                     return EOF;
>               }
>           }
>           else
> --- 708,721 ----
>                        * the socket open until pqReadData finds no more data
>                        * can be read.
>                        */
> !                     return -1;
>
>                   default:
>                       printfPQExpBuffer(&conn->errorMessage,
>                       libpq_gettext("could not send data to server: %s\n"),
>                                         SOCK_STRERROR(SOCK_ERRNO));
>                       /* We don't assume it's a fatal error... */
> !                     return -1;
>               }
>           }
>           else
> ***************
> *** 707,713 ****
>
>               /*
>                * if the socket is in non-blocking mode we may need to abort
> !              * here
>                */
>   #ifdef USE_SSL
>               /* can't do anything for our SSL users yet */
> --- 730,736 ----
>
>               /*
>                * if the socket is in non-blocking mode we may need to abort
> !              * here and return 1 to indicate that data is still pending.
>                */
>   #ifdef USE_SSL
>               /* can't do anything for our SSL users yet */
> ***************
> *** 719,732 ****
>                       /* shift the contents of the buffer */
>                       memmove(conn->outBuffer, ptr, len);
>                       conn->outCount = len;
> !                     return EOF;
>                   }
>   #ifdef USE_SSL
>               }
>   #endif
>
>               if (pqWait(FALSE, TRUE, conn))
> !                 return EOF;
>           }
>       }
>
> --- 742,755 ----
>                       /* shift the contents of the buffer */
>                       memmove(conn->outBuffer, ptr, len);
>                       conn->outCount = len;
> !                     return 1;
>                   }
>   #ifdef USE_SSL
>               }
>   #endif
>
>               if (pqWait(FALSE, TRUE, conn))
> !                 return -1;
>           }
>       }
>
> ***************
> *** 735,740 ****
> --- 758,783 ----
>       if (conn->Pfdebug)
>           fflush(conn->Pfdebug);
>
> +     return 0;
> + }
> +
> +
> +
> + /*
> +  * pqFlush: send any data waiting in the output buffer
> +  *
> +  * Implemented in terms of pqSendSome to recreate the old behavior, which
> +  * returned 0 if all data was sent and EOF otherwise. EOF was returned
> +  * regardless of whether an error occurred or not all data was sent on a
> +  * non-blocking socket.
> +  */
> + int
> + pqFlush(PGconn *conn)
> + {
> +     if (pqSendSome(conn))
> +     {
> +         return EOF;
> +     }
>       return 0;
>   }
>
> Index: src/interfaces/libpq/libpq-fe.h
> ===================================================================
> RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/libpq-fe.h,v
> retrieving revision 1.80
> diff -c -r1.80 libpq-fe.h
> *** src/interfaces/libpq/libpq-fe.h    2001/11/08 20:37:52    1.80
> --- src/interfaces/libpq/libpq-fe.h    2002/02/25 10:21:06
> ***************
> *** 279,284 ****
> --- 279,285 ----
>
>   /* Force the write buffer to be written (or at least try) */
>   extern int    PQflush(PGconn *conn);
> + extern int    PQsendSome(PGconn *conn);
>
>   /*
>    * "Fast path" interface --- not really recommended for application
> Index: src/interfaces/libpq/libpq-int.h
> ===================================================================
> RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/libpq-int.h,v
> retrieving revision 1.44
> diff -c -r1.44 libpq-int.h
> *** src/interfaces/libpq/libpq-int.h    2001/11/05 17:46:38    1.44
> --- src/interfaces/libpq/libpq-int.h    2002/02/25 10:21:06
> ***************
> *** 323,328 ****
> --- 323,329 ----
>   extern int    pqPutInt(int value, size_t bytes, PGconn *conn);
>   extern int    pqReadData(PGconn *conn);
>   extern int    pqFlush(PGconn *conn);
> + extern int    pqSendSome(PGconn *conn);
>   extern int    pqWait(int forRead, int forWrite, PGconn *conn);
>   extern int    pqReadReady(PGconn *conn);
>   extern int    pqWriteReady(PGconn *conn);
>
>
>
>    Bernhard
>
> --
> Intevation GmbH                                 http://intevation.de/
> Sketch                                 http://sketch.sourceforge.net/
> MapIt!                                               http://mapit.de/
>
