Re: non-blocking patches. - Mailing list pgsql-patches

From Bruce Momjian
Subject Re: non-blocking patches.
Date
Msg-id 200006011802.OAA19327@candle.pha.pa.us
Whole thread Raw
List pgsql-patches
Just to confirm that these patches appear to have been applied.


>
> These patches to libpq allow a user to toggle the blocking nature of
> the database connection.
>
> They also fix a problem where if the database's pipe was full it could
> busy-loop attempting to flush the send buffer.
>
> It's assumed that callers to PQexec want blocking behavior and the
> mode of the connection will be toggled while the query is being
> executed via PQexec.
>
> A new field has been added to the PGconn structure to allow the database
> to track the non-blocking nature of the connection without polling the
> status of the socket via syscalls.
>
> When in non-blocking mode the library is careful to make sure that
> it will send a complete command/line down the wire before allowing it.
>
> The case of EINTR in pqFlush() is caught and averted from making a
> useless select() call.
>
> There is a problem though, some of the code (particularly "fe-exec.c" line 518)
> may now get out of sync because:
>
>     if (pqPutnchar("Q", 1, conn) ||
>         pqPuts(query, conn) ||
>         pqFlush(conn))
>     {
>         handleSendFailure(conn);
>         return 0;
>     }
>
> may send a 'Q' but be unable to send the query, I'm unsure if
> handleSendFailure() is able to reliably deal with this.  I may need
> to work on reservations for the send buffer if not.
>
> Does anyone know?  I'll be investigating meanwhile but I wanted
> people to get a snapshot of what I was working on so I could get
> some feedback if I'm going in the right direction.
>
> These patches need review.  My apologies for not running it through
> pgindent, but the patches supplied with postgresql don't seem to apply
> cleanly to FreeBSD's indent any longer and my indent was segfaulting.
>
> Hopefully I kept within the guidelines for acceptable changes.
>
> thanks,
> -Alfred Perlstein - [bright@rush.net|alfred@freebsd.org]
> Wintelcom systems administrator and programmer
>    - http://www.wintelcom.net/ [bright@wintelcom.net]
>
>
> Index: fe-connect.c
> ===================================================================
> RCS file: /home/pgcvs/pgsql/src/interfaces/libpq/fe-connect.c,v
> retrieving revision 1.108
> diff -u -u -r1.108 fe-connect.c
> --- fe-connect.c    1999/12/02 00:26:15    1.108
> +++ fe-connect.c    1999/12/14 09:42:24
> @@ -595,31 +595,6 @@
>      return 0;
>  }
>
> -
> -/* ----------
> - * connectMakeNonblocking -
> - * Make a connection non-blocking.
> - * Returns 1 if successful, 0 if not.
> - * ----------
> - */
> -static int
> -connectMakeNonblocking(PGconn *conn)
> -{
> -#ifndef WIN32
> -    if (fcntl(conn->sock, F_SETFL, O_NONBLOCK) < 0)
> -#else
> -    if (ioctlsocket(conn->sock, FIONBIO, &on) != 0)
> -#endif
> -    {
> -        printfPQExpBuffer(&conn->errorMessage,
> -                          "connectMakeNonblocking -- fcntl() failed: errno=%d\n%s\n",
> -                          errno, strerror(errno));
> -        return 0;
> -    }
> -
> -    return 1;
> -}
> -
>  /* ----------
>   * connectNoDelay -
>   * Sets the TCP_NODELAY socket option.
> @@ -792,7 +767,7 @@
>       *   Ewan Mellor <eem21@cam.ac.uk>.
>       * ---------- */
>  #if (!defined(WIN32) || defined(WIN32_NON_BLOCKING_CONNECTIONS)) && !defined(USE_SSL)
> -    if (!connectMakeNonblocking(conn))
> +    if (PQsetnonblocking(conn, TRUE) != 0)
>          goto connect_errReturn;
>  #endif
>
> @@ -904,7 +879,7 @@
>      /* This makes the connection non-blocking, for all those cases which forced us
>         not to do it above. */
>  #if (defined(WIN32) && !defined(WIN32_NON_BLOCKING_CONNECTIONS)) || defined(USE_SSL)
> -    if (!connectMakeNonblocking(conn))
> +    if (PQsetnonblocking(conn, TRUE) != 0)
>          goto connect_errReturn;
>  #endif
>
> @@ -1702,6 +1677,7 @@
>      conn->inBuffer = (char *) malloc(conn->inBufSize);
>      conn->outBufSize = 8 * 1024;
>      conn->outBuffer = (char *) malloc(conn->outBufSize);
> +    conn->nonblocking = FALSE;
>      initPQExpBuffer(&conn->errorMessage);
>      initPQExpBuffer(&conn->workBuffer);
>      if (conn->inBuffer == NULL ||
> @@ -1811,6 +1787,7 @@
>      conn->lobjfuncs = NULL;
>      conn->inStart = conn->inCursor = conn->inEnd = 0;
>      conn->outCount = 0;
> +    conn->nonblocking = FALSE;
>
>  }
>
> Index: fe-exec.c
> ===================================================================
> RCS file: /home/pgcvs/pgsql/src/interfaces/libpq/fe-exec.c,v
> retrieving revision 1.86
> diff -u -u -r1.86 fe-exec.c
> --- fe-exec.c    1999/11/11 00:10:14    1.86
> +++ fe-exec.c    1999/12/14 05:55:11
> @@ -13,6 +13,7 @@
>   */
>  #include <errno.h>
>  #include <ctype.h>
> +#include <fcntl.h>
>
>  #include "postgres.h"
>  #include "libpq-fe.h"
> @@ -24,7 +25,6 @@
>  #include <unistd.h>
>  #endif
>
> -
>  /* keep this in same order as ExecStatusType in libpq-fe.h */
>  const char *const pgresStatus[] = {
>      "PGRES_EMPTY_QUERY",
> @@ -574,7 +574,15 @@
>       * we will NOT block waiting for more input.
>       */
>      if (pqReadData(conn) < 0)
> +    {
> +        /*
> +         * try to flush the send-queue otherwise we may never get a
> +         * response for something that may not have already been sent
> +         * because it's in our write buffer!
> +         */
> +        pqFlush(conn);
>          return 0;
> +    }
>      /* Parsing of the data waits till later. */
>      return 1;
>  }
> @@ -1088,8 +1096,17 @@
>  {
>      PGresult   *result;
>      PGresult   *lastResult;
> +    bool    savedblocking;
>
>      /*
> +     * we assume anyone calling PQexec wants blocking behaviour,
> +     * we force the blocking status of the connection to blocking
> +     * for the duration of this function and restore it on return
> +     */
> +    savedblocking = PQisnonblocking(conn);
> +    PQsetnonblocking(conn, FALSE);
> +
> +    /*
>       * Silently discard any prior query result that application didn't
>       * eat. This is probably poor design, but it's here for backward
>       * compatibility.
> @@ -1102,14 +1119,15 @@
>              PQclear(result);
>              printfPQExpBuffer(&conn->errorMessage,
>                  "PQexec: you gotta get out of a COPY state yourself.\n");
> -            return NULL;
> +            /* restore blocking status */
> +            goto errout;
>          }
>          PQclear(result);
>      }
>
>      /* OK to send the message */
>      if (!PQsendQuery(conn, query))
> -        return NULL;
> +        goto errout;
>
>      /*
>       * For backwards compatibility, return the last result if there are
> @@ -1142,7 +1160,13 @@
>              result->resultStatus == PGRES_COPY_OUT)
>              break;
>      }
> +
> +    PQsetnonblocking(conn, savedblocking);
>      return lastResult;
> +
> +errout:
> +    PQsetnonblocking(conn, savedblocking);
> +    return NULL;
>  }
>
>
> @@ -1431,8 +1455,14 @@
>               "PQendcopy() -- I don't think there's a copy in progress.\n");
>          return 1;
>      }
> +
> +    /* make sure no data is waiting to be sent */
> +    if (pqFlush(conn))
> +        return (1);
>
> -    (void) pqFlush(conn);        /* make sure no data is waiting to be sent */
> +    /* non blocking connections may have to abort at this point. */
> +    if (PQisnonblocking(conn) && PQisBusy(conn))
> +        return (1);
>
>      /* Return to active duty */
>      conn->asyncStatus = PGASYNC_BUSY;
> @@ -2025,4 +2055,72 @@
>          return 1;
>      else
>          return 0;
> +}
> +
> +/* PQsetnonblocking:
> +     sets the PGconn's database connection non-blocking if the arg is TRUE
> +     or makes it blocking if the arg is FALSE, this will not protect
> +     you from PQexec(), you'll only be safe when using the non-blocking
> +     API
> +     Needs to be called only on a connected database connection.
> +*/
> +
> +int
> +PQsetnonblocking(PGconn *conn, int arg)
> +{
> +    int    fcntlarg;
> +
> +    arg = (arg == TRUE) ? 1 : 0;
> +    if (arg == conn->nonblocking)
> +        return (0);
> +
> +#ifdef USE_SSL
> +    if (conn->ssl)
> +    {
> +        printfPQExpBuffer(&conn->errorMessage,
> +            "PQsetnonblocking() -- not supported when using SSL\n");
> +        return (-1);
> +    }
> +#endif /* USE_SSL */
> +
> +#ifndef WIN32
> +    fcntlarg = fcntl(conn->sock, F_GETFL, 0);
> +    if (fcntlarg == -1)
> +        return (-1);
> +
> +    if ((arg == TRUE &&
> +        fcntl(conn->sock, F_SETFL, fcntlarg | O_NONBLOCK) == -1) ||
> +        (arg == FALSE &&
> +        fcntl(conn->sock, F_SETFL, fcntlarg & ~O_NONBLOCK) == -1))
> +#else
> +    fcntlarg = arg;
> +    if (ioctlsocket(conn->sock, FIONBIO, &fcntlarg) != 0)
> +#endif
> +    {
> +        printfPQExpBuffer(&conn->errorMessage,
> +            "PQsetblocking() -- unable to set nonblocking status to %s\n",
> +            arg == TRUE ? "TRUE" : "FALSE");
> +        return (-1);
> +    }
> +
> +    conn->nonblocking = arg;
> +    return (0);
> +}
> +
> +/* return the blocking status of the database connection, TRUE == nonblocking,
> +     FALSE == blocking
> +*/
> +int
> +PQisnonblocking(PGconn *conn)
> +{
> +
> +    return (conn->nonblocking);
> +}
> +
> +/* try to force data out, really only useful for non-blocking users */
> +int
> +PQflush(PGconn *conn)
> +{
> +
> +    return (pqFlush(conn));
>  }
> Index: fe-misc.c
> ===================================================================
> RCS file: /home/pgcvs/pgsql/src/interfaces/libpq/fe-misc.c,v
> retrieving revision 1.33
> diff -u -u -r1.33 fe-misc.c
> --- fe-misc.c    1999/11/30 03:08:19    1.33
> +++ fe-misc.c    1999/12/14 08:21:09
> @@ -86,6 +86,34 @@
>  {
>      size_t avail = Max(conn->outBufSize - conn->outCount, 0);
>
> +    /*
> +     * if we are non-blocking and the send queue is too full to buffer this
> +     * request then try to flush some and return an error
> +     */
> +    if (PQisnonblocking(conn) && nbytes > avail && pqFlush(conn))
> +    {
> +        /*
> +         * even if the flush failed we may still have written some
> +         * data, recalculate the size of the send-queue relative
> +         * to the amount we have to send, we may be able to queue it
> +         * after all even though it's not sent to the database it's
> +         * ok, any routines that check the data coming from the
> +         * database better call pqFlush() anyway.
> +         */
> +        if (nbytes > Max(conn->outBufSize - conn->outCount, 0))
> +        {
> +            printfPQExpBuffer(&conn->errorMessage,
> +                "pqPutBytes --  pqFlush couldn't flush enough"
> +                " data: space available: %d, space needed %d\n",
> +                Max(conn->outBufSize - conn->outCount, 0), nbytes);
> +            return EOF;
> +        }
> +    }
> +
> +    /*
> +     * the non-blocking code above makes sure that this isn't true,
> +     * essentially this is no-op
> +     */
>      while (nbytes > avail)
>      {
>          memcpy(conn->outBuffer + conn->outCount, s, avail);
> @@ -548,6 +576,14 @@
>          return EOF;
>      }
>
> +    /*
> +     * don't try to send zero data, allows us to use this function
> +     * without too much worry about overhead
> +     */
> +    if (len == 0)
> +        return (0);
> +
> +    /* while there's still data to send */
>      while (len > 0)
>      {
>          /* Prevent being SIGPIPEd if backend has closed the connection. */
> @@ -556,6 +592,7 @@
>  #endif
>
>          int sent;
> +
>  #ifdef USE_SSL
>          if (conn->ssl)
>            sent = SSL_write(conn->ssl, ptr, len);
> @@ -585,6 +622,8 @@
>                  case EWOULDBLOCK:
>                      break;
>  #endif
> +                case EINTR:
> +                    continue;
>
>                  case EPIPE:
>  #ifdef ECONNRESET
> @@ -616,13 +655,31 @@
>              ptr += sent;
>              len -= sent;
>          }
> +
>          if (len > 0)
>          {
>              /* We didn't send it all, wait till we can send more */
> +
> +            /*
> +             * if the socket is in non-blocking mode we may need
> +             * to abort here
> +             */
> +#ifdef USE_SSL
> +            /* can't do anything for our SSL users yet */
> +            if (conn->ssl == NULL)
> +            {
> +#endif
> +                if (PQisnonblocking(conn))
> +                {
> +                    /* shift the contents of the buffer */
> +                    memmove(conn->outBuffer, ptr, len);
> +                    conn->outCount = len;
> +                    return EOF;
> +                }
> +#ifdef USE_SSL
> +            }
> +#endif
>
> -            /* At first glance this looks as though it should block.  I think
> -             * that it will be OK though, as long as the socket is
> -             * non-blocking. */
>              if (pqWait(FALSE, TRUE, conn))
>                  return EOF;
>          }
> Index: libpq-fe.h
> ===================================================================
> RCS file: /home/pgcvs/pgsql/src/interfaces/libpq/libpq-fe.h,v
> retrieving revision 1.53
> diff -u -u -r1.53 libpq-fe.h
> --- libpq-fe.h    1999/11/30 03:08:19    1.53
> +++ libpq-fe.h    1999/12/14 01:30:01
> @@ -269,6 +269,13 @@
>      extern int    PQputnbytes(PGconn *conn, const char *buffer, int nbytes);
>      extern int    PQendcopy(PGconn *conn);
>
> +    /* Set blocking/nonblocking connection to the backend */
> +    extern int    PQsetnonblocking(PGconn *conn, int arg);
> +    extern int    PQisnonblocking(PGconn *conn);
> +
> +    /* Force the write buffer to be written (or at least try) */
> +    extern int    PQflush(PGconn *conn);
> +
>      /*
>       * "Fast path" interface --- not really recommended for application
>       * use
> Index: libpq-int.h
> ===================================================================
> RCS file: /home/pgcvs/pgsql/src/interfaces/libpq/libpq-int.h,v
> retrieving revision 1.14
> diff -u -u -r1.14 libpq-int.h
> --- libpq-int.h    1999/11/30 03:08:19    1.14
> +++ libpq-int.h    1999/12/14 01:30:01
> @@ -215,6 +215,9 @@
>      int            inEnd;            /* offset to first position after avail
>                                   * data */
>
> +    int            nonblocking;    /* whether this connection is using a non-blocking
> +                                 * socket to the backend or not */
> +
>      /* Buffer for data not yet sent to backend */
>      char       *outBuffer;        /* currently allocated buffer */
>      int            outBufSize;        /* allocated size of buffer */
>
>
> ************
>
>


--
  Bruce Momjian                        |  http://www.op.net/~candle
  pgman@candle.pha.pa.us               |  (610) 853-3000
  +  If your life is a hard drive,     |  830 Blythe Avenue
  +  Christ can be your backup.        |  Drexel Hill, Pennsylvania 19026

pgsql-patches by date:

Previous
From: Bruce Momjian
Date:
Subject: Re: Industrial-Strength Logging
Next
From: Bruce Momjian
Date:
Subject: Re: Lock