From fd9660170073fd86da033608f52c604b9664ae91 Mon Sep 17 00:00:00 2001 From: Jacob Champion Date: Tue, 10 Jun 2025 16:38:59 -0700 Subject: [PATCH 1/4] oauth: Remove stale events from the kqueue multiplexer If a socket is added to the kqueue, becomes readable/writable, and subsequently becomes non-readable/writable again, the kqueue itself will remain readable until either the socket registration is removed, or the stale event is cleared via a call to kevent(). In many simple cases, Curl itself will remove the socket registration quickly, but in real-world usage, this is not guaranteed to happen. The kqueue can then remain stuck in a permanently readable state until the request ends, which results in pointless wakeups for the client and wasted CPU time. Implement drain_socket_events() to call kevent() and unstick any stale events. This is called right after drive_request(), before we return control to the client to wait. To make sure we've taken a look at the entire queue, register_socket() now tracks the number of outstanding registrations. Suggested-by: Thomas Munro --- src/interfaces/libpq-oauth/oauth-curl.c | 218 ++++++++++++++++++------ 1 file changed, 166 insertions(+), 52 deletions(-) diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c index dba9a684fa8..8430356cfb5 100644 --- a/src/interfaces/libpq-oauth/oauth-curl.c +++ b/src/interfaces/libpq-oauth/oauth-curl.c @@ -278,6 +278,10 @@ struct async_ctx bool user_prompted; /* have we already sent the authz prompt? */ bool used_basic_auth; /* did we send a client secret? */ bool debugging; /* can we give unsafe developer assistance? */ + +#if defined(HAVE_SYS_EVENT_H) + int nevents; /* how many events are we waiting on? 
*/ +#endif }; /* @@ -1291,41 +1295,95 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx, return 0; #elif defined(HAVE_SYS_EVENT_H) - struct kevent ev[2] = {0}; + struct kevent ev[2]; struct kevent ev_out[2]; struct timespec timeout = {0}; - int nev = 0; + int nev; int res; + /* + * First, any existing registrations for this socket need to be removed, + * both to track the outstanding number of events, and to ensure that + * we're not woken up for things that Curl no longer cares about. + * + * ENOENT is okay, but we have to track how many we get, so use + * EV_RECEIPT. + */ + nev = 0; + EV_SET(&ev[nev], socket, EVFILT_READ, EV_DELETE | EV_RECEIPT, 0, 0, 0); + nev++; + EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_DELETE | EV_RECEIPT, 0, 0, 0); + nev++; + + Assert(nev <= lengthof(ev)); + Assert(nev <= lengthof(ev_out)); + + res = kevent(actx->mux, ev, nev, ev_out, nev, &timeout); + if (res < 0) + { + actx_error(actx, "could not delete from kqueue: %m"); + return -1; + } + + /* + * We can't use the simple errno version of kevent, because we need to + * skip over ENOENT while still allowing a second change to be processed. + * So we need a longer-form error checking loop. + */ + for (int i = 0; i < res; ++i) + { + /* + * EV_RECEIPT should guarantee one EV_ERROR result for every change, + * whether successful or not. Failed entries contain a non-zero errno + * in the data field. + */ + Assert(ev_out[i].flags & EV_ERROR); + + errno = ev_out[i].data; + if (!errno) + { + /* Successfully removed; update the event count. */ + Assert(actx->nevents > 0); + actx->nevents--; + } + else if (errno != ENOENT) + { + actx_error(actx, "could not delete from kqueue: %m"); + return -1; + } + } + + /* If we're only removing registrations, we're done. */ + if (what == CURL_POLL_REMOVE) + return 0; + + /* + * Now add the new filters. This is more straightforward than deletion. 
+ * + * Combining this kevent() call with the one above seems like it should be + * theoretically possible, but beware that not all BSDs keep the original + * event flags when using EV_RECEIPT, so it's tricky to figure out which + * operations succeeded. For now we keep the deletions and the additions + * separate. + */ + nev = 0; + switch (what) { case CURL_POLL_IN: - EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD | EV_RECEIPT, 0, 0, 0); + EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD, 0, 0, 0); nev++; break; case CURL_POLL_OUT: - EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD | EV_RECEIPT, 0, 0, 0); + EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD, 0, 0, 0); nev++; break; case CURL_POLL_INOUT: - EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD | EV_RECEIPT, 0, 0, 0); - nev++; - EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD | EV_RECEIPT, 0, 0, 0); - nev++; - break; - - case CURL_POLL_REMOVE: - - /* - * We don't know which of these is currently registered, perhaps - * both, so we try to remove both. This means we need to tolerate - * ENOENT below. - */ - EV_SET(&ev[nev], socket, EVFILT_READ, EV_DELETE | EV_RECEIPT, 0, 0, 0); + EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD, 0, 0, 0); nev++; - EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_DELETE | EV_RECEIPT, 0, 0, 0); + EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD, 0, 0, 0); nev++; break; @@ -1334,45 +1392,91 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx, return -1; } - res = kevent(actx->mux, ev, nev, ev_out, lengthof(ev_out), &timeout); + Assert(nev <= lengthof(ev)); + + res = kevent(actx->mux, ev, nev, NULL, 0, NULL); if (res < 0) { actx_error(actx, "could not modify kqueue: %m"); return -1; } + /* Update the event count, and we're done. */ + actx->nevents += nev; + + return 0; +#else +#error register_socket is not implemented on this platform +#endif +} + +/*------- + * Drains any stale level-triggered events out of the multiplexer. This is + * necessary only if the mux implementation requires it. 
+ * + * As an example, consider the following sequence of events: + * 1. libcurl tries to write data to the send buffer, but it fills up. + * 2. libcurl registers CURL_POLL_OUT on the socket and returns control to the + * client to wait. + * 3. The kernel partially drains the send buffer. The socket becomes writable, + * and the client wakes up and calls back into the flow. + * 4. libcurl continues writing data to the send buffer, but it fills up again. + * The socket is no longer writable. + * + * At this point, an epoll-based mux no longer signals readiness, so nothing + * further needs to be done. But a kqueue-based mux will continue to signal + * "ready" until either the EVFILT_WRITE registration is dropped for the socket, + * or the old socket-writable event is read from the queue. Since Curl isn't + * guaranteed to do the former, we must do the latter here. + */ +static bool +drain_socket_events(struct async_ctx *actx) +{ +#if defined(HAVE_SYS_EPOLL_H) + /* The epoll implementation doesn't need to drain pending events. */ + return true; +#elif defined(HAVE_SYS_EVENT_H) + struct timespec timeout = {0}; + struct kevent *drain; + int drain_len; + /* - * We can't use the simple errno version of kevent, because we need to - * skip over ENOENT while still allowing a second change to be processed. - * So we need a longer-form error checking loop. + * Drain the events in one call, rather than looping. (We could maybe call + * kevent() drain_len times, instead of allocating space for the maximum + * number of events, but that relies on the events being in FIFO order to + * avoid starvation. The kqueue man pages don't seem to make any + * guarantees about that.) + * + * register_socket() keeps actx->nevents updated with the number of + * outstanding event filters. We don't track the registration of the + * timer; we just assume one could be registered here. 
*/ - for (int i = 0; i < res; ++i) + drain_len = actx->nevents + 1; + + drain = malloc(sizeof(*drain) * drain_len); + if (!drain) { - /* - * EV_RECEIPT should guarantee one EV_ERROR result for every change, - * whether successful or not. Failed entries contain a non-zero errno - * in the data field. - */ - Assert(ev_out[i].flags & EV_ERROR); + actx_error(actx, "out of memory"); + return false; + } - errno = ev_out[i].data; - if (errno && errno != ENOENT) - { - switch (what) - { - case CURL_POLL_REMOVE: - actx_error(actx, "could not delete from kqueue: %m"); - break; - default: - actx_error(actx, "could not add to kqueue: %m"); - } - return -1; - } + /* + * Discard all pending events. Since our registrations are level-triggered + * (even the timer, since we use a chained kqueue for that instead of an + * EVFILT_TIMER on the top-level mux!), any events that we still need will + * remain signalled, and the stale ones will be swept away. + */ + if (kevent(actx->mux, NULL, 0, drain, drain_len, &timeout) < 0) + { + actx_error(actx, "could not drain kqueue: %m"); + free(drain); + return false; } - return 0; + free(drain); + return true; #else -#error register_socket is not implemented on this platform +#error drain_socket_events is not implemented on this platform #endif } @@ -1441,7 +1545,8 @@ set_timer(struct async_ctx *actx, long timeout) * macOS.) * * If there was no previous timer set, the kevent calls will result in - * ENOENT, which is fine. + * ENOENT, which is fine. (We don't track actx->nevents for this case; + * instead, drain_socket_events() just assumes a timer could be set.) */ EV_SET(&ev, 1, EVFILT_TIMER, EV_DELETE, 0, 0, 0); if (kevent(actx->timerfd, &ev, 1, NULL, 0, NULL) < 0 && errno != ENOENT) @@ -2755,13 +2860,22 @@ pg_fe_run_oauth_flow_impl(PGconn *conn) if (status == PGRES_POLLING_FAILED) goto error_return; - else if (status != PGRES_POLLING_OK) - { - /* not done yet */ - return status; - } + else if (status == PGRES_POLLING_OK) + break; /* done! 
*/ + + /* + * This request is still running. + * + * Drain any stale socket events from the mux before we + * ask the client to poll. (Currently, this can occur only + * with kqueue.) If this is forgotten, the multiplexer can + * get stuck in a signalled state and we'll burn CPU + * cycles pointlessly. + */ + if (!drain_socket_events(actx)) + goto error_return; - break; + return status; } case OAUTH_STEP_WAIT_INTERVAL: -- 2.34.1