Re: Fix for non-blocking connections in libpq - Mailing list pgsql-patches
From | Bruce Momjian |
---|---|
Subject | Re: Fix for non-blocking connections in libpq |
Date | |
Msg-id | 200201250227.g0P2Rfx11500@candle.pha.pa.us |
In response to | Fix for non-blocking connections in libpq (Bernhard Herzog <bh@intevation.de>) |
List | pgsql-patches |
This has been saved for the 7.3 release:

    http://candle.pha.pa.us/cgi-bin/pgpatches2

---------------------------------------------------------------------------

Bernhard Herzog wrote:
>
> Here's a patch against 7.1.3 that fixes a problem with sending larger
> queries over non-blocking connections with libpq. "Larger" here
> basically means that the query doesn't fit into the output buffer.
>
> The basic strategy is to fix pqFlush and pqPutBytes.
>
> The problem with pqFlush as it stands now is that it returns EOF when an
> error occurs or when not all data could be sent. The latter case is
> clearly not an error for a non-blocking connection, but the caller can't
> distinguish it from an error very well.
>
> The first part of the fix is therefore to fix pqFlush. This is done by
> renaming it to pqSendSome, which differs from pqFlush only in its
> return values so that the caller can make the above distinction, and by
> adding a new pqFlush, which is implemented in terms of pqSendSome and
> behaves exactly like the old pqFlush.
>
> The second part of the fix modifies pqPutBytes to use pqSendSome instead
> of pqFlush and to either send all the data or, if not all data can be
> sent on a non-blocking connection, to at least put all data into the
> output buffer, enlarging it if necessary. The callers of pqPutBytes
> don't have to be changed because, from their point of view, pqPutBytes
> behaves as before: it either succeeds in queueing all output data or
> fails with an error.
>
> I've also added a new API function PQsendSome which, analogously to
> PQflush, just calls pqSendSome. Programs using non-blocking queries
> should use this new function. The main difference is that this function
> will have to be called repeatedly (calling select() properly in between)
> until all data has been written.
>
> AFAICT, the code in CVS HEAD hasn't changed with respect to non-blocking
> queries and this fix should work there, too, but I haven't tested that
> yet.
>
>
>    Bernhard
>
> diff -rc -x *.o postgresql-7.1.3-orig/src/interfaces/libpq/fe-exec.c postgresql-7.1.3/src/interfaces/libpq/fe-exec.c
> *** postgresql-7.1.3-orig/src/interfaces/libpq/fe-exec.c	Sat Feb 10 03:31:30 2001
> --- postgresql-7.1.3/src/interfaces/libpq/fe-exec.c	Tue Jan 22 12:12:58 2002
> ***************
> *** 2214,2216 ****
> --- 2214,2224 ----
>
>       return (pqFlush(conn));
>   }
> +
> + /* try to force data out, really only useful for non-blocking users.
> +  * This implementation actually works for non-blocking connections */
> + int
> + PQsendSome(PGconn *conn)
> + {
> +     return pqSendSome(conn);
> + }
> diff -rc -x *.o postgresql-7.1.3-orig/src/interfaces/libpq/fe-misc.c postgresql-7.1.3/src/interfaces/libpq/fe-misc.c
> *** postgresql-7.1.3-orig/src/interfaces/libpq/fe-misc.c	Sun Apr  1 01:13:30 2001
> --- postgresql-7.1.3/src/interfaces/libpq/fe-misc.c	Tue Jan 22 12:07:25 2002
> ***************
> *** 90,144 ****
>   static int
>   pqPutBytes(const char *s, size_t nbytes, PGconn *conn)
>   {
> !     size_t avail = Max(conn->outBufSize - conn->outCount, 0);
> !
> !     /*
> !      * if we are non-blocking and the send queue is too full to buffer
> !      * this request then try to flush some and return an error
>        */
> -     if (pqIsnonblocking(conn) && nbytes > avail && pqFlush(conn))
> -     {
>
> !         /*
> !          * even if the flush failed we may still have written some data,
> !          * recalculate the size of the send-queue relative to the amount
> !          * we have to send, we may be able to queue it afterall even
> !          * though it's not sent to the database it's ok, any routines that
> !          * check the data coming from the database better call pqFlush()
> !          * anyway.
> !          */
> !         if (nbytes > Max(conn->outBufSize - conn->outCount, 0))
> !         {
> !             printfPQExpBuffer(&conn->errorMessage,
> !                 "pqPutBytes -- pqFlush couldn't flush enough"
> !                 " data: space available: %d, space needed %d\n",
> !                 Max(conn->outBufSize - conn->outCount, 0), nbytes);
> !             return EOF;
> !         }
> !         /* fixup avail for while loop */
>           avail = Max(conn->outBufSize - conn->outCount, 0);
> !     }
>
> !     /*
> !      * is the amount of data to be sent is larger than the size of the
> !      * output buffer then we must flush it to make more room.
> !      *
> !      * the code above will make sure the loop conditional is never true for
> !      * non-blocking connections
> !      */
> !     while (nbytes > avail)
> !     {
> !         memcpy(conn->outBuffer + conn->outCount, s, avail);
> !         conn->outCount += avail;
> !         s += avail;
> !         nbytes -= avail;
> !         if (pqFlush(conn))
> !             return EOF;
> !         avail = conn->outBufSize;
> !     }
>
> !     memcpy(conn->outBuffer + conn->outCount, s, nbytes);
> !     conn->outCount += nbytes;
>
>       return 0;
>   }
> --- 90,163 ----
>   static int
>   pqPutBytes(const char *s, size_t nbytes, PGconn *conn)
>   {
> !     /* Strategy to handle blocking and non-blocking connections: Fill
> !      * the output buffer and flush it repeatedly until either all data
> !      * has been sent or is at least queued in the buffer.
> !      *
> !      * For non-blocking connections, grow the buffer if not all data
> !      * fits into it and the buffer can't be sent because the socket
> !      * would block.
>        */
>
> !     while (nbytes)
> !     {
> !         size_t avail, remaining;
> !
> !         /* fill the output buffer */
>           avail = Max(conn->outBufSize - conn->outCount, 0);
> !         remaining = Min(avail, nbytes);
> !         memcpy(conn->outBuffer + conn->outCount, s, remaining);
> !         conn->outCount += remaining;
> !         s += remaining;
> !         nbytes -= remaining;
> !
> !         /* if the data didn't fit completely into the buffer, try to
> !          * flush the buffer */
> !         if (nbytes)
> !         {
> !             int send_result = pqSendSome(conn);
>
> !             /* if there were errors, report them */
> !             if (send_result < 0)
> !                 return EOF;
>
> !             /* if not all data could be sent, increase the output
> !              * buffer, put the rest of s into it and return
> !              * successfully. This case will only happen in a
> !              * non-blocking connection
> !              */
> !             if (send_result > 0)
> !             {
> !                 /* try to grow the buffer.
> !                  * FIXME: The new size could be chosen more
> !                  * intelligently.
> !                  */
> !                 size_t buflen = conn->outCount + nbytes;
> !                 if (buflen > conn->outBufSize)
> !                 {
> !                     char * newbuf = realloc(conn->outBuffer, buflen);
> !                     if (!newbuf)
> !                     {
> !                         /* realloc failed. Probably out of memory */
> !                         printfPQExpBuffer(&conn->errorMessage,
> !                             "cannot allocate memory for output buffer\n");
> !                         return EOF;
> !                     }
> !                     conn->outBuffer = newbuf;
> !                     conn->outBufSize = buflen;
> !                 }
> !                 /* put the data into it */
> !                 memcpy(conn->outBuffer + conn->outCount, s, nbytes);
> !                 conn->outCount += nbytes;
> !
> !                 /* report success. */
> !                 return 0;
> !             }
> !         }
> !
> !         /* pqSendSome was able to send all data. Continue with the next
> !          * chunk of s. */
> !     } /* while */
>
>       return 0;
>   }
> ***************
> *** 575,584 ****
>   }
>
>   /* --------------------------------------------------------------------- */
> ! /* pqFlush: send any data waiting in the output buffer
>    */
>   int
> ! pqFlush(PGconn *conn)
>   {
>       char *ptr = conn->outBuffer;
>       int len = conn->outCount;
> --- 594,615 ----
>   }
>
>   /* --------------------------------------------------------------------- */
> ! /* pqSendSome: send any data waiting in the output buffer and return 0
> !  * if all data was sent, -1 if an error occurred or 1 if not all data
> !  * could be written because the socket would have blocked.
> !  *
> !  * For a blocking connection all data will be sent unless an error
> !  * occurs. 1 will only be returned if the connection is non-blocking.
> !  *
> !  * Internally, the case of data remaining in the buffer after pqSendSome
> !  * could be determined by looking at outCount, but this function also
> !  * serves as the implementation of the new API function PQsendSome.
> !  *
> !  * FIXME: perhaps it would be more useful to return the number of bytes
> !  * remaining?
>    */
>   int
> ! pqSendSome(PGconn *conn)
>   {
>       char *ptr = conn->outBuffer;
>       int len = conn->outCount;
> ***************
> *** 586,593 ****
>       if (conn->sock < 0)
>       {
>           printfPQExpBuffer(&conn->errorMessage,
> !                           "pqFlush() -- connection not open\n");
> !         return EOF;
>       }
>
>       /*
> --- 617,624 ----
>       if (conn->sock < 0)
>       {
>           printfPQExpBuffer(&conn->errorMessage,
> !                           "pqSendSome() -- connection not open\n");
> !         return -1;
>       }
>
>       /*
> ***************
> *** 645,651 ****
>                       case ECONNRESET:
>   #endif
>                           printfPQExpBuffer(&conn->errorMessage,
> !                             "pqFlush() -- backend closed the channel unexpectedly.\n"
>                               "\tThis probably means the backend terminated abnormally"
>                               " before or while processing the request.\n");
>
> --- 676,682 ----
>                       case ECONNRESET:
>   #endif
>                           printfPQExpBuffer(&conn->errorMessage,
> !                             "pqSendSome() -- backend closed the channel unexpectedly.\n"
>                               "\tThis probably means the backend terminated abnormally"
>                               " before or while processing the request.\n");
>
> ***************
> *** 657,670 ****
>                            * the socket open until pqReadData finds no more data
>                            * can be read.
>                            */
> !                         return EOF;
>
>                       default:
>                           printfPQExpBuffer(&conn->errorMessage,
> !                             "pqFlush() -- couldn't send data: errno=%d\n%s\n",
>                               errno, strerror(errno));
>                           /* We don't assume it's a fatal error... */
> !                         return EOF;
>                   }
>               }
>               else
> --- 688,701 ----
>                            * the socket open until pqReadData finds no more data
>                            * can be read.
>                            */
> !                         return -1;
>
>                       default:
>                           printfPQExpBuffer(&conn->errorMessage,
> !                             "pqSendSome() -- couldn't send data: errno=%d\n%s\n",
>                               errno, strerror(errno));
>                           /* We don't assume it's a fatal error... */
> !                         return -1;
>                   }
>               }
>               else
> ***************
> *** 679,685 ****
>
>                   /*
>                    * if the socket is in non-blocking mode we may need to abort
> !                  * here
>                    */
>   #ifdef USE_SSL
>                   /* can't do anything for our SSL users yet */
> --- 710,716 ----
>
>                   /*
>                    * if the socket is in non-blocking mode we may need to abort
> !                  * here and return 1 to indicate that data is still pending.
>                    */
>   #ifdef USE_SSL
>                   /* can't do anything for our SSL users yet */
> ***************
> *** 691,704 ****
>                       /* shift the contents of the buffer */
>                       memmove(conn->outBuffer, ptr, len);
>                       conn->outCount = len;
> !                     return EOF;
>                   }
>   #ifdef USE_SSL
>                   }
>   #endif
>
>                   if (pqWait(FALSE, TRUE, conn))
> !                     return EOF;
>               }
>           }
>
> --- 722,735 ----
>                       /* shift the contents of the buffer */
>                       memmove(conn->outBuffer, ptr, len);
>                       conn->outCount = len;
> !                     return 1;
>                   }
>   #ifdef USE_SSL
>                   }
>   #endif
>
>                   if (pqWait(FALSE, TRUE, conn))
> !                     return -1;
>               }
>           }
>
> ***************
> *** 707,712 ****
> --- 738,762 ----
>       if (conn->Pfdebug)
>           fflush(conn->Pfdebug);
>
> +     return 0;
> + }
> +
> +
> + /* --------------------------------------------------------------------- */
> + /* pqFlush: send any data waiting in the output buffer
> +  *
> +  * Implemented in terms of pqSendSome to recreate the old behavior which
> +  * returned 0 if all data was sent or EOF. EOF was returned regardless of
> +  * whether an error occurred or not all data was sent on a non-blocking
> +  * socket.
> +  */
> + int
> + pqFlush(PGconn *conn)
> + {
> +     if (pqSendSome(conn))
> +     {
> +         return EOF;
> +     }
>       return 0;
>   }
>
> diff -rc -x *.o postgresql-7.1.3-orig/src/interfaces/libpq/libpq-fe.h postgresql-7.1.3/src/interfaces/libpq/libpq-fe.h
> *** postgresql-7.1.3-orig/src/interfaces/libpq/libpq-fe.h	Thu Mar 22 05:01:27 2001
> --- postgresql-7.1.3/src/interfaces/libpq/libpq-fe.h	Tue Jan 22 12:13:17 2002
> ***************
> *** 265,270 ****
> --- 265,273 ----
>
>   /* Force the write buffer to be written (or at least try) */
>   extern int PQflush(PGconn *conn);
> + /* Force the write buffer to be written (or at least try)
> +    (better than PQflush) */
> + extern int PQsendSome(PGconn *conn);
>
>   /*
>    * "Fast path" interface --- not really recommended for application
> diff -rc -x *.o postgresql-7.1.3-orig/src/interfaces/libpq/libpq-int.h postgresql-7.1.3/src/interfaces/libpq/libpq-int.h
> *** postgresql-7.1.3-orig/src/interfaces/libpq/libpq-int.h	Thu Mar 22 05:01:27 2001
> --- postgresql-7.1.3/src/interfaces/libpq/libpq-int.h	Tue Jan 22 12:13:17 2002
> ***************
> *** 321,326 ****
> --- 321,327 ----
>   extern int pqPutInt(int value, size_t bytes, PGconn *conn);
>   extern int pqReadData(PGconn *conn);
>   extern int pqFlush(PGconn *conn);
> + extern int pqSendSome(PGconn *conn);
>   extern int pqWait(int forRead, int forWrite, PGconn *conn);
>   extern int pqReadReady(PGconn *conn);
>   extern int pqWriteReady(PGconn *conn);
>
>
>
> --
> Intevation GmbH                      http://intevation.de/
> Sketch                               http://sketch.sourceforge.net/
> MapIt!                               http://mapit.de/
>
>
>
> ---------------------------(end of broadcast)---------------------------
> TIP 5: Have you checked our extensive FAQ?
>
> http://www.postgresql.org/users-lounge/docs/faq.html
>

--
  Bruce Momjian                        |  http://candle.pha.pa.us
  pgman@candle.pha.pa.us               |  (610) 853-3000
  +  If your life is a hard drive,     |  830 Blythe Avenue
  +  Christ can be your backup.        |  Drexel Hill, Pennsylvania 19026
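
For readers who want to see the pattern outside libpq, here is a minimal standalone sketch of the strategy the quoted message describes: a three-valued "send some" routine, a "put bytes" routine that grows the buffer when the descriptor would block, and a caller loop that waits with select() between attempts. It is not the patch's code. OutBuf, make_nonblocking, send_some, put_bytes and flush_all are hypothetical names invented for this illustration, and it works on a plain file descriptor rather than a PGconn.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
#include <unistd.h>

/* Hypothetical stand-in for the output-buffer fields of PGconn. */
typedef struct {
    int fd;        /* descriptor, expected to be non-blocking */
    char *buf;     /* malloc'ed buffer holding unsent data */
    size_t count;  /* bytes currently queued in buf */
    size_t size;   /* allocated size of buf */
} OutBuf;

/* Put fd into non-blocking mode; returns 0 on success, -1 on error. */
static int make_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0)
        return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0 ? -1 : 0;
}

/* 0: everything sent; 1: data left because write would block; -1: error. */
static int send_some(OutBuf *o)
{
    size_t off = 0;
    assert(o->count <= o->size);
    while (off < o->count) {
        ssize_t n = write(o->fd, o->buf + off, o->count - off);
        if (n > 0) {
            off += (size_t) n;
        } else if (n < 0 && errno == EINTR) {
            continue;
        } else if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
            /* keep the unsent tail at the start of the buffer */
            memmove(o->buf, o->buf + off, o->count - off);
            o->count -= off;
            return 1;
        } else {
            return -1;
        }
    }
    o->count = 0;
    return 0;
}

/* Queue n bytes: send what can be sent, grow the buffer for the rest. */
static int put_bytes(OutBuf *o, const char *s, size_t n)
{
    while (n > 0) {
        size_t avail = o->size - o->count;
        size_t chunk = n < avail ? n : avail;
        memcpy(o->buf + o->count, s, chunk);
        o->count += chunk; s += chunk; n -= chunk;
        if (n == 0)
            break;
        int r = send_some(o);
        if (r < 0)
            return -1;
        if (r > 0) {                      /* would block: enlarge and queue */
            size_t need = o->count + n;
            if (need > o->size) {
                char *nb = realloc(o->buf, need);
                if (!nb)
                    return -1;
                o->buf = nb; o->size = need;
            }
            memcpy(o->buf + o->count, s, n);
            o->count += n;
            return 0;
        }
    }
    return 0;
}

/* Caller loop: retry send_some, waiting for writability in between. */
static int flush_all(OutBuf *o)
{
    int r;
    while ((r = send_some(o)) == 1) {
        fd_set wfds;
        FD_ZERO(&wfds);
        FD_SET(o->fd, &wfds);
        if (select(o->fd + 1, NULL, &wfds, NULL, NULL) < 0 && errno != EINTR)
            return -1;
    }
    return r;
}
```

Under these assumptions, put_bytes never fails merely because the descriptor is busy, which mirrors the claim that callers of pqPutBytes need no changes. A real event-driven program would more likely fold the select() call into its own main loop than block inside something like flush_all.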