Re: Fix for non-blocking connections in libpq - Mailing list pgsql-patches

From Bruce Momjian
Subject Re: Fix for non-blocking connections in libpq
Date
Msg-id 200203050607.g2567bn21015@candle.pha.pa.us
Whole thread Raw
In response to Re: Fix for non-blocking connections in libpq  (Bernhard Herzog <bh@intevation.de>)
List pgsql-patches
Old version backed out.  Newer patch applied.  Thanks.

---------------------------------------------------------------------------


Bernhard Herzog wrote:
> Bruce Momjian <pgman@candle.pha.pa.us> writes:
>
> > Bernard, just checking.  Is this the most recent version of your patch?
>
> In principle, yes. However, I've ported it to the CVS version in the
> meantime. Here's a patch against current CVS HEAD:
>
>
> Index: src/interfaces/libpq/fe-exec.c
> ===================================================================
> RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/fe-exec.c,v
> retrieving revision 1.113
> diff -c -r1.113 fe-exec.c
> *** src/interfaces/libpq/fe-exec.c    2001/10/25 05:50:13    1.113
> --- src/interfaces/libpq/fe-exec.c    2002/02/25 10:21:06
> ***************
> *** 2340,2342 ****
> --- 2340,2350 ----
>
>       return (pqFlush(conn));
>   }
> +
> + /* try to force data out, really only useful for non-blocking users.
> +  * This implementation actually works for non-blocking connections */
> + int
> + PQsendSome(PGconn *conn)
> + {
> +     return pqSendSome(conn);
> + }
> Index: src/interfaces/libpq/fe-misc.c
> ===================================================================
> RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/fe-misc.c,v
> retrieving revision 1.65
> diff -c -r1.65 fe-misc.c
> *** src/interfaces/libpq/fe-misc.c    2001/12/03 00:28:24    1.65
> --- src/interfaces/libpq/fe-misc.c    2002/02/25 10:21:06
> ***************
> *** 110,164 ****
>   static int
>   pqPutBytes(const char *s, size_t nbytes, PGconn *conn)
>   {
> !     size_t        avail = Max(conn->outBufSize - conn->outCount, 0);
> !
> !     /*
> !      * if we are non-blocking and the send queue is too full to buffer
> !      * this request then try to flush some and return an error
>        */
> !     if (pqIsnonblocking(conn) && nbytes > avail && pqFlush(conn))
>       {
> !         /*
> !          * even if the flush failed we may still have written some data,
> !          * recalculate the size of the send-queue relative to the amount
> !          * we have to send, we may be able to queue it afterall even
> !          * though it's not sent to the database it's ok, any routines that
> !          * check the data coming from the database better call pqFlush()
> !          * anyway.
> !          */
> !         if (nbytes > Max(conn->outBufSize - conn->outCount, 0))
> !         {
> !             printfPQExpBuffer(&conn->errorMessage,
> !                               libpq_gettext("could not flush enough data (space available: %d, space needed
%d)\n"),
> !                          (int) Max(conn->outBufSize - conn->outCount, 0),
> !                               (int) nbytes);
> !             return EOF;
> !         }
> !         /* fixup avail for while loop */
>           avail = Max(conn->outBufSize - conn->outCount, 0);
> !     }
>
> !     /*
> !      * is the amount of data to be sent is larger than the size of the
> !      * output buffer then we must flush it to make more room.
> !      *
> !      * the code above will make sure the loop conditional is never true for
> !      * non-blocking connections
> !      */
> !     while (nbytes > avail)
> !     {
> !         memcpy(conn->outBuffer + conn->outCount, s, avail);
> !         conn->outCount += avail;
> !         s += avail;
> !         nbytes -= avail;
> !         if (pqFlush(conn))
> !             return EOF;
> !         avail = conn->outBufSize;
> !     }
>
> !     memcpy(conn->outBuffer + conn->outCount, s, nbytes);
> !     conn->outCount += nbytes;
>
>       return 0;
>   }
>
> --- 110,184 ----
>   static int
>   pqPutBytes(const char *s, size_t nbytes, PGconn *conn)
>   {
> !     /* Strategy to handle blocking and non-blocking connections: Fill
> !      * the output buffer and flush it repeatedly until either all data
> !      * has been sent or is at least queued in the buffer.
> !      *
> !      * For non-blocking connections, grow the buffer if not all data
> !      * fits into it and the buffer can't be sent because the socket
> !      * would block.
>        */
> !
> !     while (nbytes)
>       {
> !         size_t avail, remaining;
> !
> !         /* fill the output buffer */
>           avail = Max(conn->outBufSize - conn->outCount, 0);
> !         remaining = Min(avail, nbytes);
> !         memcpy(conn->outBuffer + conn->outCount, s, remaining);
> !         conn->outCount += remaining;
> !         s += remaining;
> !         nbytes -= remaining;
> !
> !         /* if the data didn't fit completely into the buffer, try to
> !          * flush the buffer */
> !         if (nbytes)
> !         {
> !             int send_result = pqSendSome(conn);
>
> !             /* if there were errors, report them */
> !             if (send_result < 0)
> !                 return EOF;
>
> !             /* if not all data could be sent, increase the output
> !              * buffer, put the rest of s into it and return
> !              * successfully. This case will only happen in a
> !              * non-blocking connection
> !              */
> !             if (send_result > 0)
> !             {
> !                 /* try to grow the buffer.
> !                  * FIXME: The new size could be chosen more
> !                  * intelligently.
> !                  */
> !                 size_t buflen = conn->outCount + nbytes;
> !                 if (buflen > conn->outBufSize)
> !                 {
> !                     char * newbuf = realloc(conn->outBuffer, buflen);
> !                     if (!newbuf)
> !                     {
> !                         /* realloc failed. Probably out of memory */
> !                         printfPQExpBuffer(&conn->errorMessage,
> !                                 "cannot allocate memory for output buffer\n");
> !                         return EOF;
> !                     }
> !                     conn->outBuffer = newbuf;
> !                     conn->outBufSize = buflen;
> !                 }
> !                 /* put the data into it */
> !                 memcpy(conn->outBuffer + conn->outCount, s, nbytes);
> !                 conn->outCount += nbytes;
>
> +                 /* report success. */
> +                 return 0;
> +             }
> +         }
> +
> +         /* pqSendSome was able to send all data. Continue with the next
> +          * chunk of s. */
> +     } /* while */
> +
>       return 0;
>   }
>
> ***************
> *** 604,613 ****
>   }
>
>   /*
> !  * pqFlush: send any data waiting in the output buffer
>    */
>   int
> ! pqFlush(PGconn *conn)
>   {
>       char       *ptr = conn->outBuffer;
>       int            len = conn->outCount;
> --- 624,636 ----
>   }
>
>   /*
> !  * pqSendSome: send any data waiting in the output buffer.
> !  *
> !  * Return 0 on sucess, -1 on failure and 1 when data remains because the
> !  * socket would block and the connection is non-blocking.
>    */
>   int
> ! pqSendSome(PGconn *conn)
>   {
>       char       *ptr = conn->outBuffer;
>       int            len = conn->outCount;
> ***************
> *** 616,622 ****
>       {
>           printfPQExpBuffer(&conn->errorMessage,
>                             libpq_gettext("connection not open\n"));
> !         return EOF;
>       }
>
>       /*
> --- 639,645 ----
>       {
>           printfPQExpBuffer(&conn->errorMessage,
>                             libpq_gettext("connection not open\n"));
> !         return -1;
>       }
>
>       /*
> ***************
> *** 674,680 ****
>                       printfPQExpBuffer(&conn->errorMessage,
>                                         libpq_gettext(
>                               "server closed the connection unexpectedly\n"
> !                                                     "\tThis probably means the server terminated abnormally\n"
>                            "\tbefore or while processing the request.\n"));
>
>                       /*
> --- 697,703 ----
>                       printfPQExpBuffer(&conn->errorMessage,
>                                         libpq_gettext(
>                               "server closed the connection unexpectedly\n"
> !                    "\tThis probably means the server terminated abnormally\n"
>                            "\tbefore or while processing the request.\n"));
>
>                       /*
> ***************
> *** 685,698 ****
>                        * the socket open until pqReadData finds no more data
>                        * can be read.
>                        */
> !                     return EOF;
>
>                   default:
>                       printfPQExpBuffer(&conn->errorMessage,
>                       libpq_gettext("could not send data to server: %s\n"),
>                                         SOCK_STRERROR(SOCK_ERRNO));
>                       /* We don't assume it's a fatal error... */
> !                     return EOF;
>               }
>           }
>           else
> --- 708,721 ----
>                        * the socket open until pqReadData finds no more data
>                        * can be read.
>                        */
> !                     return -1;
>
>                   default:
>                       printfPQExpBuffer(&conn->errorMessage,
>                       libpq_gettext("could not send data to server: %s\n"),
>                                         SOCK_STRERROR(SOCK_ERRNO));
>                       /* We don't assume it's a fatal error... */
> !                     return -1;
>               }
>           }
>           else
> ***************
> *** 707,713 ****
>
>               /*
>                * if the socket is in non-blocking mode we may need to abort
> !              * here
>                */
>   #ifdef USE_SSL
>               /* can't do anything for our SSL users yet */
> --- 730,736 ----
>
>               /*
>                * if the socket is in non-blocking mode we may need to abort
> !              * here and return 1 to indicate that data is still pending.
>                */
>   #ifdef USE_SSL
>               /* can't do anything for our SSL users yet */
> ***************
> *** 719,732 ****
>                       /* shift the contents of the buffer */
>                       memmove(conn->outBuffer, ptr, len);
>                       conn->outCount = len;
> !                     return EOF;
>                   }
>   #ifdef USE_SSL
>               }
>   #endif
>
>               if (pqWait(FALSE, TRUE, conn))
> !                 return EOF;
>           }
>       }
>
> --- 742,755 ----
>                       /* shift the contents of the buffer */
>                       memmove(conn->outBuffer, ptr, len);
>                       conn->outCount = len;
> !                     return 1;
>                   }
>   #ifdef USE_SSL
>               }
>   #endif
>
>               if (pqWait(FALSE, TRUE, conn))
> !                 return -1;
>           }
>       }
>
> ***************
> *** 735,740 ****
> --- 758,783 ----
>       if (conn->Pfdebug)
>           fflush(conn->Pfdebug);
>
> +     return 0;
> + }
> +
> +
> +
> + /*
> +  * pqFlush: send any data waiting in the output buffer
> +  *
> +  * Implemented in terms of pqSendSome to recreate the old behavior which
> +  * returned 0 if all data was sent or EOF. EOF was sent regardless of
> +  * whether an error occurred or not all data was sent on a non-blocking
> +  * socket.
> +  */
> + int
> + pqFlush(PGconn *conn)
> + {
> +     if (pqSendSome(conn))
> +     {
> +         return EOF;
> +     }
>       return 0;
>   }
>
> Index: src/interfaces/libpq/libpq-fe.h
> ===================================================================
> RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/libpq-fe.h,v
> retrieving revision 1.80
> diff -c -r1.80 libpq-fe.h
> *** src/interfaces/libpq/libpq-fe.h    2001/11/08 20:37:52    1.80
> --- src/interfaces/libpq/libpq-fe.h    2002/02/25 10:21:06
> ***************
> *** 279,284 ****
> --- 279,285 ----
>
>   /* Force the write buffer to be written (or at least try) */
>   extern int    PQflush(PGconn *conn);
> + extern int    PQsendSome(PGconn *conn);
>
>   /*
>    * "Fast path" interface --- not really recommended for application
> Index: src/interfaces/libpq/libpq-int.h
> ===================================================================
> RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/libpq-int.h,v
> retrieving revision 1.44
> diff -c -r1.44 libpq-int.h
> *** src/interfaces/libpq/libpq-int.h    2001/11/05 17:46:38    1.44
> --- src/interfaces/libpq/libpq-int.h    2002/02/25 10:21:06
> ***************
> *** 323,328 ****
> --- 323,329 ----
>   extern int    pqPutInt(int value, size_t bytes, PGconn *conn);
>   extern int    pqReadData(PGconn *conn);
>   extern int    pqFlush(PGconn *conn);
> + extern int    pqSendSome(PGconn *conn);
>   extern int    pqWait(int forRead, int forWrite, PGconn *conn);
>   extern int    pqReadReady(PGconn *conn);
>   extern int    pqWriteReady(PGconn *conn);
>
>
>
>    Bernhard
>
> --
> Intevation GmbH                                 http://intevation.de/
> Sketch                                 http://sketch.sourceforge.net/
> MapIt!                                               http://mapit.de/
>
> ---------------------------(end of broadcast)---------------------------
> TIP 5: Have you checked our extensive FAQ?
>
> http://www.postgresql.org/users-lounge/docs/faq.html
>

--
  Bruce Momjian                        |  http://candle.pha.pa.us
  pgman@candle.pha.pa.us               |  (610) 853-3000
  +  If your life is a hard drive,     |  830 Blythe Avenue
  +  Christ can be your backup.        |  Drexel Hill, Pennsylvania 19026

pgsql-patches by date:

Previous
From: Bruce Momjian
Date:
Subject: Re: oid2name cleanup
Next
From: Bruce Momjian
Date:
Subject: Re: \du undocumented in psql help