Re: Fix for non-blocking connections in libpq - Mailing list pgsql-patches

From Bernhard Herzog
Subject Re: Fix for non-blocking connections in libpq
Date
Msg-id 6qelj9bynl.fsf@abnoba.intevation.de
Whole thread Raw
In response to Re: Fix for non-blocking connections in libpq  (Bruce Momjian <pgman@candle.pha.pa.us>)
Responses Re: Fix for non-blocking connections in libpq  (Bruce Momjian <pgman@candle.pha.pa.us>)
List pgsql-patches
Bruce Momjian <pgman@candle.pha.pa.us> writes:

> Bernard, just checking.  Is this the most recent version of your patch?

In principle, yes. However, I've ported it to the CVS version in the
meantime. Here's a patch against current CVS HEAD:


Index: src/interfaces/libpq/fe-exec.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/fe-exec.c,v
retrieving revision 1.113
diff -c -r1.113 fe-exec.c
*** src/interfaces/libpq/fe-exec.c    2001/10/25 05:50:13    1.113
--- src/interfaces/libpq/fe-exec.c    2002/02/25 10:21:06
***************
*** 2340,2342 ****
--- 2340,2350 ----

      return (pqFlush(conn));
  }
+
+ /* try to force data out, really only useful for non-blocking users.
+  * This implementation actually works for non-blocking connections */
+ int
+ PQsendSome(PGconn *conn)
+ {
+     return pqSendSome(conn);
+ }
Index: src/interfaces/libpq/fe-misc.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/fe-misc.c,v
retrieving revision 1.65
diff -c -r1.65 fe-misc.c
*** src/interfaces/libpq/fe-misc.c    2001/12/03 00:28:24    1.65
--- src/interfaces/libpq/fe-misc.c    2002/02/25 10:21:06
***************
*** 110,164 ****
  static int
  pqPutBytes(const char *s, size_t nbytes, PGconn *conn)
  {
!     size_t        avail = Max(conn->outBufSize - conn->outCount, 0);
!
!     /*
!      * if we are non-blocking and the send queue is too full to buffer
!      * this request then try to flush some and return an error
       */
!     if (pqIsnonblocking(conn) && nbytes > avail && pqFlush(conn))
      {
!         /*
!          * even if the flush failed we may still have written some data,
!          * recalculate the size of the send-queue relative to the amount
!          * we have to send, we may be able to queue it afterall even
!          * though it's not sent to the database it's ok, any routines that
!          * check the data coming from the database better call pqFlush()
!          * anyway.
!          */
!         if (nbytes > Max(conn->outBufSize - conn->outCount, 0))
!         {
!             printfPQExpBuffer(&conn->errorMessage,
!                               libpq_gettext("could not flush enough data (space available: %d, space needed %d)\n"),
!                          (int) Max(conn->outBufSize - conn->outCount, 0),
!                               (int) nbytes);
!             return EOF;
!         }
!         /* fixup avail for while loop */
          avail = Max(conn->outBufSize - conn->outCount, 0);
!     }

!     /*
!      * is the amount of data to be sent is larger than the size of the
!      * output buffer then we must flush it to make more room.
!      *
!      * the code above will make sure the loop conditional is never true for
!      * non-blocking connections
!      */
!     while (nbytes > avail)
!     {
!         memcpy(conn->outBuffer + conn->outCount, s, avail);
!         conn->outCount += avail;
!         s += avail;
!         nbytes -= avail;
!         if (pqFlush(conn))
!             return EOF;
!         avail = conn->outBufSize;
!     }

!     memcpy(conn->outBuffer + conn->outCount, s, nbytes);
!     conn->outCount += nbytes;

      return 0;
  }

--- 110,184 ----
  static int
  pqPutBytes(const char *s, size_t nbytes, PGconn *conn)
  {
!     /* Strategy to handle blocking and non-blocking connections: Fill
!      * the output buffer and flush it repeatedly until either all data
!      * has been sent or is at least queued in the buffer.
!      *
!      * For non-blocking connections, grow the buffer if not all data
!      * fits into it and the buffer can't be sent because the socket
!      * would block.
       */
!
!     while (nbytes)
      {
!         size_t avail, remaining;
!
!         /* fill the output buffer */
          avail = Max(conn->outBufSize - conn->outCount, 0);
!         remaining = Min(avail, nbytes);
!         memcpy(conn->outBuffer + conn->outCount, s, remaining);
!         conn->outCount += remaining;
!         s += remaining;
!         nbytes -= remaining;
!
!         /* if the data didn't fit completely into the buffer, try to
!          * flush the buffer */
!         if (nbytes)
!         {
!             int send_result = pqSendSome(conn);

!             /* if there were errors, report them */
!             if (send_result < 0)
!                 return EOF;

!             /* if not all data could be sent, increase the output
!              * buffer, put the rest of s into it and return
!              * successfully. This case will only happen in a
!              * non-blocking connection
!              */
!             if (send_result > 0)
!             {
!                 /* try to grow the buffer.
!                  * FIXME: The new size could be chosen more
!                  * intelligently.
!                  */
!                 size_t buflen = conn->outCount + nbytes;
!                 if (buflen > conn->outBufSize)
!                 {
!                     char * newbuf = realloc(conn->outBuffer, buflen);
!                     if (!newbuf)
!                     {
!                         /* realloc failed. Probably out of memory */
!                         printfPQExpBuffer(&conn->errorMessage,
!                                 "cannot allocate memory for output buffer\n");
!                         return EOF;
!                     }
!                     conn->outBuffer = newbuf;
!                     conn->outBufSize = buflen;
!                 }
!                 /* put the data into it */
!                 memcpy(conn->outBuffer + conn->outCount, s, nbytes);
!                 conn->outCount += nbytes;

+                 /* report success. */
+                 return 0;
+             }
+         }
+
+         /* pqSendSome was able to send all data. Continue with the next
+          * chunk of s. */
+     } /* while */
+
      return 0;
  }

***************
*** 604,613 ****
  }

  /*
!  * pqFlush: send any data waiting in the output buffer
   */
  int
! pqFlush(PGconn *conn)
  {
      char       *ptr = conn->outBuffer;
      int            len = conn->outCount;
--- 624,636 ----
  }

  /*
!  * pqSendSome: send any data waiting in the output buffer.
!  *
!  * Return 0 on sucess, -1 on failure and 1 when data remains because the
!  * socket would block and the connection is non-blocking.
   */
  int
! pqSendSome(PGconn *conn)
  {
      char       *ptr = conn->outBuffer;
      int            len = conn->outCount;
***************
*** 616,622 ****
      {
          printfPQExpBuffer(&conn->errorMessage,
                            libpq_gettext("connection not open\n"));
!         return EOF;
      }

      /*
--- 639,645 ----
      {
          printfPQExpBuffer(&conn->errorMessage,
                            libpq_gettext("connection not open\n"));
!         return -1;
      }

      /*
***************
*** 674,680 ****
                      printfPQExpBuffer(&conn->errorMessage,
                                        libpq_gettext(
                              "server closed the connection unexpectedly\n"
!                                                     "\tThis probably means the server terminated abnormally\n"
                           "\tbefore or while processing the request.\n"));

                      /*
--- 697,703 ----
                      printfPQExpBuffer(&conn->errorMessage,
                                        libpq_gettext(
                              "server closed the connection unexpectedly\n"
!                    "\tThis probably means the server terminated abnormally\n"
                           "\tbefore or while processing the request.\n"));

                      /*
***************
*** 685,698 ****
                       * the socket open until pqReadData finds no more data
                       * can be read.
                       */
!                     return EOF;

                  default:
                      printfPQExpBuffer(&conn->errorMessage,
                      libpq_gettext("could not send data to server: %s\n"),
                                        SOCK_STRERROR(SOCK_ERRNO));
                      /* We don't assume it's a fatal error... */
!                     return EOF;
              }
          }
          else
--- 708,721 ----
                       * the socket open until pqReadData finds no more data
                       * can be read.
                       */
!                     return -1;

                  default:
                      printfPQExpBuffer(&conn->errorMessage,
                      libpq_gettext("could not send data to server: %s\n"),
                                        SOCK_STRERROR(SOCK_ERRNO));
                      /* We don't assume it's a fatal error... */
!                     return -1;
              }
          }
          else
***************
*** 707,713 ****

              /*
               * if the socket is in non-blocking mode we may need to abort
!              * here
               */
  #ifdef USE_SSL
              /* can't do anything for our SSL users yet */
--- 730,736 ----

              /*
               * if the socket is in non-blocking mode we may need to abort
!              * here and return 1 to indicate that data is still pending.
               */
  #ifdef USE_SSL
              /* can't do anything for our SSL users yet */
***************
*** 719,732 ****
                      /* shift the contents of the buffer */
                      memmove(conn->outBuffer, ptr, len);
                      conn->outCount = len;
!                     return EOF;
                  }
  #ifdef USE_SSL
              }
  #endif

              if (pqWait(FALSE, TRUE, conn))
!                 return EOF;
          }
      }

--- 742,755 ----
                      /* shift the contents of the buffer */
                      memmove(conn->outBuffer, ptr, len);
                      conn->outCount = len;
!                     return 1;
                  }
  #ifdef USE_SSL
              }
  #endif

              if (pqWait(FALSE, TRUE, conn))
!                 return -1;
          }
      }

***************
*** 735,740 ****
--- 758,783 ----
      if (conn->Pfdebug)
          fflush(conn->Pfdebug);

+     return 0;
+ }
+
+
+
+ /*
+  * pqFlush: send any data waiting in the output buffer
+  *
+  * Implemented in terms of pqSendSome to recreate the old behavior which
+  * returned 0 if all data was sent or EOF. EOF was sent regardless of
+  * whether an error occurred or not all data was sent on a non-blocking
+  * socket.
+  */
+ int
+ pqFlush(PGconn *conn)
+ {
+     if (pqSendSome(conn))
+     {
+         return EOF;
+     }
      return 0;
  }

Index: src/interfaces/libpq/libpq-fe.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/libpq-fe.h,v
retrieving revision 1.80
diff -c -r1.80 libpq-fe.h
*** src/interfaces/libpq/libpq-fe.h    2001/11/08 20:37:52    1.80
--- src/interfaces/libpq/libpq-fe.h    2002/02/25 10:21:06
***************
*** 279,284 ****
--- 279,285 ----

  /* Force the write buffer to be written (or at least try) */
  extern int    PQflush(PGconn *conn);
+ extern int    PQsendSome(PGconn *conn);

  /*
   * "Fast path" interface --- not really recommended for application
Index: src/interfaces/libpq/libpq-int.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/libpq-int.h,v
retrieving revision 1.44
diff -c -r1.44 libpq-int.h
*** src/interfaces/libpq/libpq-int.h    2001/11/05 17:46:38    1.44
--- src/interfaces/libpq/libpq-int.h    2002/02/25 10:21:06
***************
*** 323,328 ****
--- 323,329 ----
  extern int    pqPutInt(int value, size_t bytes, PGconn *conn);
  extern int    pqReadData(PGconn *conn);
  extern int    pqFlush(PGconn *conn);
+ extern int    pqSendSome(PGconn *conn);
  extern int    pqWait(int forRead, int forWrite, PGconn *conn);
  extern int    pqReadReady(PGconn *conn);
  extern int    pqWriteReady(PGconn *conn);



   Bernhard

--
Intevation GmbH                                 http://intevation.de/
Sketch                                 http://sketch.sourceforge.net/
MapIt!                                               http://mapit.de/

pgsql-patches by date:

Previous
From: Paul Eggert
Date:
Subject: support for POSIX 1003.1-2001 hosts
Next
From: Paul Vixie
Date:
Subject: Re: IPv6 Support for INET/CIDR types.