Re: [PATCHES] non-blocking patches. - Mailing list pgsql-hackers
From | Bruce Momjian |
---|---|
Subject | Re: [PATCHES] non-blocking patches. |
Date | |
Msg-id | 199912151351.IAA02889@candle.pha.pa.us Whole thread Raw |
List | pgsql-hackers |
Can anyone comment on this? I don't know the answer, and I know he is waiting for help. This originally appeared on the patches list. > > These patches to libpq allow a user to toggle the blocking nature of > the database connection. > > They also fix a problem where if the database's pipe was full it could > busy-loop attempting to flush the send buffer. > > It's assumed that callers to PQexec want blocking behavior and the > mode of the connection will be toggled while the query is being > executed via PQexec. > > A new field has been added to the PGconn structure to allow the database > to track the non-blocking nature of the connection without polling the > status of the socket via syscalls. > > When in non-blocking mode the library is careful to make sure that > it will send a complete command/line down the wire before allowing it. > > The case of EINTR in pqFlush() is caught and averted from making a > useless select() call. > > There is a problem though, some of the code (particularly "fe-exec.c" line 518) > may now get out of sync because: > > if (pqPutnchar("Q", 1, conn) || > pqPuts(query, conn) || > pqFlush(conn)) > { > handleSendFailure(conn); > return 0; > } > > may send a 'Q' but be unable to send the query, I'm unsure if > handleSendFailure() is able to reliably deal with this. I may need > to work on reservations for the send buffer if not. > > Does anyone know? I'll be investigating meanwhile but I wanted > people to get a snapshot of what I was working on so I could get > some feedback if i'm going in the right direction. > > These patches need review. My apologies for not running it through > pgindent, but the patches supplied postgresql don't seem to apply > cleanly to FreeBSD's indent any longer and my indent was segfaulting. > > Hopefully I kept within the guidelines for acceptable changes. > > thanks, > -Alfred Perlstein - [bright@rush.net|alfred@freebsd.org] > Wintelcom systems administrator and programmer > - http://www.wintelcom.net/ [bright@wintelcom.net] > > > Index: fe-connect.c > =================================================================== > RCS file: /home/pgcvs/pgsql/src/interfaces/libpq/fe-connect.c,v > retrieving revision 1.108 > diff -u -u -r1.108 fe-connect.c > --- fe-connect.c 1999/12/02 00:26:15 1.108 > +++ fe-connect.c 1999/12/14 09:42:24 > @@ -595,31 +595,6 @@ > return 0; > } > > - > -/* ---------- > - * connectMakeNonblocking - > - * Make a connection non-blocking. > - * Returns 1 if successful, 0 if not. > - * ---------- > - */ > -static int > -connectMakeNonblocking(PGconn *conn) > -{ > -#ifndef WIN32 > - if (fcntl(conn->sock, F_SETFL, O_NONBLOCK) < 0) > -#else > - if (ioctlsocket(conn->sock, FIONBIO, &on) != 0) > -#endif > - { > - printfPQExpBuffer(&conn->errorMessage, > - "connectMakeNonblocking -- fcntl() failed: errno=%d\n%s\n", > - errno, strerror(errno)); > - return 0; > - } > - > - return 1; > -} > - > /* ---------- > * connectNoDelay - > * Sets the TCP_NODELAY socket option. > @@ -792,7 +767,7 @@ > * Ewan Mellor <eem21@cam.ac.uk>. > * ---------- */ > #if (!defined(WIN32) || defined(WIN32_NON_BLOCKING_CONNECTIONS)) && !defined(USE_SSL) > - if (!connectMakeNonblocking(conn)) > + if (PQsetnonblocking(conn, TRUE) != 0) > goto connect_errReturn; > #endif > > @@ -904,7 +879,7 @@ > /* This makes the connection non-blocking, for all those cases which forced us > not to do it above. */ > #if (defined(WIN32) && !defined(WIN32_NON_BLOCKING_CONNECTIONS)) || defined(USE_SSL) > - if (!connectMakeNonblocking(conn)) > + if (PQsetnonblocking(conn, TRUE) != 0) > goto connect_errReturn; > #endif > > @@ -1702,6 +1677,7 @@ > conn->inBuffer = (char *) malloc(conn->inBufSize); > conn->outBufSize = 8 * 1024; > conn->outBuffer = (char *) malloc(conn->outBufSize); > + conn->nonblocking = FALSE; > initPQExpBuffer(&conn->errorMessage); > initPQExpBuffer(&conn->workBuffer); > if (conn->inBuffer == NULL || > @@ -1811,6 +1787,7 @@ > conn->lobjfuncs = NULL; > conn->inStart = conn->inCursor = conn->inEnd = 0; > conn->outCount = 0; > + conn->nonblocking = FALSE; > > } > > Index: fe-exec.c > =================================================================== > RCS file: /home/pgcvs/pgsql/src/interfaces/libpq/fe-exec.c,v > retrieving revision 1.86 > diff -u -u -r1.86 fe-exec.c > --- fe-exec.c 1999/11/11 00:10:14 1.86 > +++ fe-exec.c 1999/12/14 05:55:11 > @@ -13,6 +13,7 @@ > */ > #include <errno.h> > #include <ctype.h> > +#include <fcntl.h> > > #include "postgres.h" > #include "libpq-fe.h" > @@ -24,7 +25,6 @@ > #include <unistd.h> > #endif > > - > /* keep this in same order as ExecStatusType in libpq-fe.h */ > const char *const pgresStatus[] = { > "PGRES_EMPTY_QUERY", > @@ -574,7 +574,15 @@ > * we will NOT block waiting for more input. > */ > if (pqReadData(conn) < 0) > + { > + /* > + * try to flush the send-queue otherwise we may never get a > + * resonce for something that may not have already been sent > + * because it's in our write buffer! > + */ > + pqFlush(conn); > return 0; > + } > /* Parsing of the data waits till later. */ > return 1; > } > @@ -1088,8 +1096,17 @@ > { > PGresult *result; > PGresult *lastResult; > + bool savedblocking; > > /* > + * we assume anyone calling PQexec wants blocking behaviour, > + * we force the blocking status of the connection to blocking > + * for the duration of this function and restore it on return > + */ > + savedblocking = PQisnonblocking(conn); > + PQsetnonblocking(conn, FALSE); > + > + /* > * Silently discard any prior query result that application didn't > * eat. This is probably poor design, but it's here for backward > * compatibility. > @@ -1102,14 +1119,15 @@ > PQclear(result); > printfPQExpBuffer(&conn->errorMessage, > "PQexec: you gotta get out of a COPY state yourself.\n"); > - return NULL; > + /* restore blocking status */ > + goto errout; > } > PQclear(result); > } > > /* OK to send the message */ > if (!PQsendQuery(conn, query)) > - return NULL; > + goto errout; > > /* > * For backwards compatibility, return the last result if there are > @@ -1142,7 +1160,13 @@ > result->resultStatus == PGRES_COPY_OUT) > break; > } > + > + PQsetnonblocking(conn, savedblocking); > return lastResult; > + > +errout: > + PQsetnonblocking(conn, savedblocking); > + return NULL; > } > > > @@ -1431,8 +1455,14 @@ > "PQendcopy() -- I don't think there's a copy in progress.\n"); > return 1; > } > + > + /* make sure no data is waiting to be sent */ > + if (pqFlush(conn)) > + return (1); > > - (void) pqFlush(conn); /* make sure no data is waiting to be sent */ > + /* non blocking connections may have to abort at this point. */ > + if (PQisnonblocking(conn) && PQisBusy(conn)) > + return (1); > > /* Return to active duty */ > conn->asyncStatus = PGASYNC_BUSY; > @@ -2025,4 +2055,72 @@ > return 1; > else > return 0; > +} > + > +/* PQsetnonblocking: > + sets the PGconn's database connection non-blocking if the arg is TRUE > + or makes it non-blocking if the arg is FALSE, this will not protect > + you from PQexec(), you'll only be safe when using the non-blocking > + API > + Needs to be called only on a connected database connection. > +*/ > + > +int > +PQsetnonblocking(PGconn *conn, int arg) > +{ > + int fcntlarg; > + > + arg = (arg == TRUE) ? 1 : 0; > + if (arg == conn->nonblocking) > + return (0); > + > +#ifdef USE_SSL > + if (conn->ssl) > + { > + printfPQExpBuffer(&conn->errorMessage, > + "PQsetnonblocking() -- not supported when using SSL\n"); > + return (-1); > + } > +#endif /* USE_SSL */ > + > +#ifndef WIN32 > + fcntlarg = fcntl(conn->sock, F_GETFL, 0); > + if (fcntlarg == -1) > + return (-1); > + > + if ((arg == TRUE && > + fcntl(conn->sock, F_SETFL, fcntlarg | O_NONBLOCK) == -1) || > + (arg == FALSE && > + fcntl(conn->sock, F_SETFL, fcntlarg & ~O_NONBLOCK) == -1)) > +#else > + fcntlarg = arg; > + if (ioctlsocket(conn->sock, FIONBIO, &fcntlarg) != 0) > +#endif > + { > + printfPQExpBuffer(&conn->errorMessage, > + "PQsetblocking() -- unable to set nonblocking status to %s\n", > + arg == TRUE ? "TRUE" : "FALSE"); > + return (-1); > + } > + > + conn->nonblocking = arg; > + return (0); > +} > + > +/* return the blocking status of the database connection, TRUE == nonblocking, > + FALSE == blocking > +*/ > +int > +PQisnonblocking(PGconn *conn) > +{ > + > + return (conn->nonblocking); > +} > + > +/* try to force data out, really only useful for non-blocking users */ > +int > +PQflush(PGconn *conn) > +{ > + > + return (pqFlush(conn)); > } > Index: fe-misc.c > =================================================================== > RCS file: /home/pgcvs/pgsql/src/interfaces/libpq/fe-misc.c,v > retrieving revision 1.33 > diff -u -u -r1.33 fe-misc.c > --- fe-misc.c 1999/11/30 03:08:19 1.33 > +++ fe-misc.c 1999/12/14 08:21:09 > @@ -86,6 +86,34 @@ > { > size_t avail = Max(conn->outBufSize - conn->outCount, 0); > > + /* > + * if we are non-blocking and the send queue is too full to buffer this > + * request then try to flush some and return an error > + */ > + if (PQisnonblocking(conn) && nbytes > avail && pqFlush(conn)) > + { > + /* > + * even if the flush failed we may still have written some > + * data, recalculate the size of the send-queue relative > + * to the amount we have to send, we may be able to queue it > + * afterall even though it's not sent to the database it's > + * ok, any routines that check the data coming from the > + * database better call pqFlush() anyway. > + */ > + if (nbytes > Max(conn->outBufSize - conn->outCount, 0)) > + { > + printfPQExpBuffer(&conn->errorMessage, > + "pqPutBytes -- pqFlush couldn't flush enough" > + " data: space available: %d, space needed %d\n", > + Max(conn->outBufSize - conn->outCount, 0), nbytes); > + return EOF; > + } > + } > + > + /* > + * the non-blocking code above makes sure that this isn't true, > + * essentially this is no-op > + */ > while (nbytes > avail) > { > memcpy(conn->outBuffer + conn->outCount, s, avail); > @@ -548,6 +576,14 @@ > return EOF; > } > > + /* > + * don't try to send zero data, allows us to use this function > + * without too much worry about overhead > + */ > + if (len == 0) > + return (0); > + > + /* while there's still data to send */ > while (len > 0) > { > /* Prevent being SIGPIPEd if backend has closed the connection. */ > @@ -556,6 +592,7 @@ > #endif > > int sent; > + > #ifdef USE_SSL > if (conn->ssl) > sent = SSL_write(conn->ssl, ptr, len); > @@ -585,6 +622,8 @@ > case EWOULDBLOCK: > break; > #endif > + case EINTR: > + continue; > > case EPIPE: > #ifdef ECONNRESET > @@ -616,13 +655,31 @@ > ptr += sent; > len -= sent; > } > + > if (len > 0) > { > /* We didn't send it all, wait till we can send more */ > + > + /* > + * if the socket is in non-blocking mode we may need > + * to abort here > + */ > +#ifdef USE_SSL > + /* can't do anything for our SSL users yet */ > + if (conn->ssl == NULL) > + { > +#endif > + if (PQisnonblocking(conn)) > + { > + /* shift the contents of the buffer */ > + memmove(conn->outBuffer, ptr, len); > + conn->outCount = len; > + return EOF; > + } > +#ifdef USE_SSL > + } > +#endif > > - /* At first glance this looks as though it should block. I think > - * that it will be OK though, as long as the socket is > - * non-blocking. */ > if (pqWait(FALSE, TRUE, conn)) > return EOF; > } > Index: libpq-fe.h > =================================================================== > RCS file: /home/pgcvs/pgsql/src/interfaces/libpq/libpq-fe.h,v > retrieving revision 1.53 > diff -u -u -r1.53 libpq-fe.h > --- libpq-fe.h 1999/11/30 03:08:19 1.53 > +++ libpq-fe.h 1999/12/14 01:30:01 > @@ -269,6 +269,13 @@ > extern int PQputnbytes(PGconn *conn, const char *buffer, int nbytes); > extern int PQendcopy(PGconn *conn); > > + /* Set blocking/nonblocking connection to the backend */ > + extern int PQsetnonblocking(PGconn *conn, int arg); > + extern int PQisnonblocking(PGconn *conn); > + > + /* Force the write buffer to be written (or at least try) */ > + extern int PQflush(PGconn *conn); > + > /* > * "Fast path" interface --- not really recommended for application > * use > Index: libpq-int.h > =================================================================== > RCS file: /home/pgcvs/pgsql/src/interfaces/libpq/libpq-int.h,v > retrieving revision 1.14 > diff -u -u -r1.14 libpq-int.h > --- libpq-int.h 1999/11/30 03:08:19 1.14 > +++ libpq-int.h 1999/12/14 01:30:01 > @@ -215,6 +215,9 @@ > int inEnd; /* offset to first position after avail > * data */ > > + int nonblocking; /* whether this connection is using a blocking > + * socket to the backend or not */ > + > /* Buffer for data not yet sent to backend */ > char *outBuffer; /* currently allocated buffer */ > int outBufSize; /* allocated size of buffer */ > > > ************ > > -- Bruce Momjian | http://www.op.net/~candle maillist@candle.pha.pa.us | (610) 853-3000+ If your life is a hard drive, | 830 Blythe Avenue + Christ can be your backup. | Drexel Hill, Pennsylvania19026
pgsql-hackers by date: