From 03a5bd46e5cdf1e551a64b16bb8915aead67ad85 Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Sun, 19 Mar 2023 16:07:20 +1300 Subject: [PATCH v2 3/6] Redesign Windows socket event management. Previously, we created a Winsock event handle for each socket in each WaitEventSet, and then we translated an FD_CLOSE event directly to WL_SOCKET_READABLE. Since FD_CLOSE is reported only once when the remote end shuts down gracefully, we could hang in rare scenarios where backend code relies on WL_SOCKET_READABLE being level-triggered. We got away with this in the past when the thing on the other end of the socket was another PostgreSQL server (ie via postgres_fdw, replication etc), because the remote server would exit without shutting down or closing its socket, and that produces a repeating 'abortive' FD_CLOSE. We'd like to change that as it also eats error messages, producing user complaints and random CI failures, but that's a sepaarate issue and we'll need to fix this first. New design: * for each socket, we now create just one event handle to be used by all WaitEventSet objects that are interested in the socket * for each socket, we now track a set of sticky events that are reported as poll() would until they are cleared by either the send()/recv() wrappers, or failing that by an explicit re-check The lifetime management of event handles and associated state is done by reference counting. --- src/backend/port/win32/socket.c | 364 +++++++++++++++++++++++++++++++ src/backend/storage/ipc/latch.c | 212 ++++++------------ src/include/port/win32_port.h | 6 + src/include/storage/latch.h | 3 - src/tools/pgindent/typedefs.list | 1 + 5 files changed, 441 insertions(+), 145 deletions(-) diff --git a/src/backend/port/win32/socket.c b/src/backend/port/win32/socket.c index 9c339397d1..a7fa98cb1d 100644 --- a/src/backend/port/win32/socket.c +++ b/src/backend/port/win32/socket.c @@ -13,6 +13,8 @@ #include "postgres.h" +#include "common/hashfn.h" + /* * Indicate if pgwin32_recv() and pgwin32_send() should operate * in non-blocking mode. @@ -37,6 +39,77 @@ int pgwin32_noblock = 0; #undef recv #undef send +/* + * An entry in our socket table. + */ +typedef struct SocketTableEntry +{ + SOCKET sock; + char status; + + /* + * The reference count for the event handle. Client code that wants to + * use the event functions must acquire a reference and release it when + * finished. + */ + int reference_count; + + /* + * The FD_XXX events that were most recently selected for this socket + * number with WSAEventSelect(). + */ + int selected_events; + + /* + * The FD_XXX events already reported by Winsock, that we'll continue to + * report as long as they are true. They are cleared by our send/recv + * wrappers, because those are 're-enabling' functions that will cause + * Winsock to report them again. The are also cleared by an explicit + * check we perform for the benefit of hypothetical code that might be + * reach Winsock send/recv wrappers without going via our wrappers. + */ + int level_triggered_events; + + /* + * Windows kernel event most recently associated with the socket number. + */ + HANDLE event_handle; +} SocketTableEntry; + +static inline void * +malloc0(size_t size) +{ + void *result; + + result = malloc(size); + if (result) + memset(result, 0, size); + + return result; +} + +/* + * It almost seems feasible to use an array to store our per-socket state, + * based on the observation that Windows socket descriptors seem to be small + * integers as on Unix, but the manual warns against making that assumption. + * So we use a hash table. + */ + +#define SH_PREFIX socket_table +#define SH_ELEMENT_TYPE SocketTableEntry +#define SH_RAW_ALLOCATOR malloc0 +#define SH_RAW_FREE free +#define SH_SCOPE static inline +#define SH_KEY_TYPE SOCKET +#define SH_KEY sock +#define SH_HASH_KEY(tb, key) murmurhash32(key) +#define SH_EQUAL(tb, a, b) (a) == (b) +#define SH_DECLARE +#define SH_DEFINE +#include "lib/simplehash.h" + +static socket_table_hash * socket_table; + /* * Blocking socket functions implemented so they listen on both * the socket and the signal event, required for signal handling. @@ -310,6 +383,265 @@ pgwin32_socket(int af, int type, int protocol) return s; } +/* + * Check if any of FD_READ, FD_WRITE or FD_CLOSE is still true. Used to + * re-check level-triggered events. + */ +static int +pgwin32_socket_poll(SOCKET s, int events) +{ + int revents = 0; + + if (events & (FD_READ | FD_CLOSE)) + { + ssize_t rc; + char c; + + rc = recv(s, &c, 1, MSG_PEEK); + if (rc == 1) + { + /* At least one byte to read. */ + if (events & FD_READ) + revents |= FD_READ; + } + else if (rc == 0 || WSAGetLastError() != WSAEWOULDBLOCK) + { + /* EOF due to graceful shutdown, or error. */ + if (events & FD_CLOSE) + revents |= FD_CLOSE; + } + } + + if (events & FD_WRITE) + { + char c; + + /* If it looks like we could write or get an error, report that. */ + if (send(s, &c, 0, 0) == 0 || WSAGetLastError() != WSAEWOULDBLOCK) + revents |= FD_WRITE; + } + + return revents; +} + +/* + * Adjust the set of FD_XXX events this socket's event handle should wake up + * for. Returns 0 on success, otherwise -1 and sets errno. + */ +int +pgwin32_socket_select_events(SOCKET s, int selected_events) +{ + SocketTableEntry *entry; + + Assert(socket_table); + entry = socket_table_lookup(socket_table, s); + + Assert(entry); + Assert(entry->reference_count > 0); + Assert(entry->event_handle != WSA_INVALID_EVENT); + + /* Do nothing if no change. */ + if (selected_events == entry->selected_events) + return 0; + + /* + * Tell Winsock to link the socket to the event handle, and which events + * we're interested in. + */ + if (WSAEventSelect(s, entry->event_handle, selected_events) == SOCKET_ERROR) + { + TranslateSocketError(); + return -1; + } + + entry->selected_events = selected_events; + + /* + * The manual tells us: "Issuing a WSAEventSelect for a socket cancels any + * previous WSAAsyncSelect or WSAEventSelect for the same socket and + * clears the internal network event record." If that is true, we might + * have wiped an internal flag we're interested in. Close that race by + * triggering an explicit poll before we sleep, by pretending we have seen + * all of these events. + */ + if (selected_events & (FD_READ | FD_WRITE)) + entry->level_triggered_events = selected_events & (FD_READ | FD_WRITE | FD_CLOSE); + else + entry->level_triggered_events = 0; + + return 0; +} + +/* + * Before waiting on the event handle, check if we have pending + * level-triggered events that are still true, and if so take measures to + * prevent the sleep. + */ +void +pgwin32_socket_prepare_to_wait(SOCKET s) +{ + SocketTableEntry *entry; + + Assert(socket_table); + entry = socket_table_lookup(socket_table, s); + + Assert(entry); + Assert(entry->reference_count > 0); + Assert(entry->event_handle != WSA_INVALID_EVENT); + + /* + * If we're not waiting for FD_READ or FD_WRITE, don't try to poll the + * socket. Server sockets and client sockets that haven't connected yet + * can't be polled by that technique. + */ + if ((entry->selected_events & (FD_READ | FD_WRITE)) && + entry->level_triggered_events != 0) + { + /* + * Re-check the level-triggered events we have recorded. This is + * necessary because someone might access WSASend()/WSARecv() directly + * without going via our wrapper functions, so they might never be + * cleared otherwise. + */ + entry->level_triggered_events = + pgwin32_socket_poll(s, + entry->level_triggered_events & entry->selected_events); + if (entry->level_triggered_events) + { + /* + * At least one readiness condition is still true. Prevent + * sleeping, and let pgwin32_socket_enumerate_events() report + * these level-triggered events. + */ + WSASetEvent(entry->event_handle); + } + } +} + +/* + * After the Windows event handle has been signaled, this function can be + * called to find out which socket events occurred, and atomically reset the + * event handle for the next sleep. + * + * The events returned are also remembered in our level-triggered event mask, + * so they'll prevent sleeping and be reported again as long as they remain + * true. + */ +int +pgwin32_socket_enumerate_events(SOCKET s) +{ + WSANETWORKEVENTS new_events = {0}; + SocketTableEntry *entry; + int result; + + Assert(socket_table); + entry = socket_table_lookup(socket_table, s); + + Assert(entry); + Assert(entry->reference_count > 0); + Assert(entry->event_handle != WSA_INVALID_EVENT); + + /* + * Atomically consume the internal network event record and reset the + * associated event handle. This guarantees that we can't miss future + * wakeups. + */ + if (WSAEnumNetworkEvents(s, entry->event_handle, &new_events) != 0) + { + TranslateSocketError(); + return -1; + } + + /* Add any events pgwin32_socket_prepare_to_wait() decided to feed us. */ + result = entry->level_triggered_events | new_events.lNetworkEvents; + + /* Remember certain events for next time around. */ + if (entry->selected_events & (FD_READ | FD_WRITE)) + entry->level_triggered_events = result & (FD_READ | FD_WRITE | FD_CLOSE); + else + entry->level_triggered_events = 0; + + return result; +} + +/* + * Acquire a reference-counted Windows event handle for this socket. This can + * be used for waiting for socket events. Returns NULL and sets errno on + * failure. + */ +HANDLE +pgwin32_socket_acquire_event_handle(SOCKET s) +{ + SocketTableEntry *entry; + bool found; + + /* First-time initialization. */ + if (unlikely(socket_table == NULL)) + { + socket_table = socket_table_create(16, NULL); + if (socket_table == NULL) + { + errno = ENOMEM; + return NULL; + } + } + + /* If we already have it, just bump the count. */ + entry = socket_table_insert(socket_table, s, &found); + if (likely(found)) + { + Assert(entry->event_handle != WSA_INVALID_EVENT); + entry->reference_count++; + return entry->event_handle; + } + + /* Did we run out of memory? */ + if (entry == NULL) + { + errno = ENOMEM; + return NULL; + } + + /* Allocate a new event handle. */ + entry->event_handle = WSACreateEvent(); + if (entry->event_handle == WSA_INVALID_EVENT) + { + socket_table_delete_item(socket_table, entry); + errno = ENOMEM; + return NULL; + } + + entry->selected_events = 0; + entry->level_triggered_events = 0; + entry->reference_count = 1; + + return entry->event_handle; +} + +/* + * Release a reference-counted event handle. + */ +void +pgwin32_socket_release_event_handle(SOCKET s) +{ + SocketTableEntry *entry; + + Assert(socket_table); + entry = socket_table_lookup(socket_table, s); + + Assert(entry); + Assert(entry->reference_count > 0); + Assert(entry->event_handle != WSA_INVALID_EVENT); + + if (--entry->reference_count == 0) + { + WSACloseEvent(entry->event_handle); + socket_table_delete_item(socket_table, entry); + + /* XXX Free socket_table if it is empty? */ + } +} + int pgwin32_bind(SOCKET s, struct sockaddr *addr, int addrlen) { @@ -402,6 +734,22 @@ pgwin32_recv(SOCKET s, char *buf, int len, int f) return -1; } + /* + * WSARecv() is a re-enabling function for Winsock's FD_READ event, so it + * is now safe to clear our level-triggered flag. This is only an + * optimization for a common case, and not required for correctness. If + * someone calls WSARecv() directly instead of going through this wrapper, + * pgwin32_socket_prepare_to_wait() will figure that out and clear it + * anyway. + */ + if (socket_table) + { + SocketTableEntry *entry = socket_table_lookup(socket_table, s); + + if (entry) + entry->level_triggered_events &= ~FD_READ; + } + if (pgwin32_noblock) { /* @@ -485,6 +833,22 @@ pgwin32_send(SOCKET s, const void *buf, int len, int flags) return -1; } + /* + * WSASend() is a re-enabling function for Winsock's FD_WRITE event, + * so it is now safe to clear our level-triggered flag. This is only + * an optimization for a common case, and not required for + * correctness. If someone calls WSASend() directly instead of going + * through this wrapper, pgwin32_socket_prepare_to_wait() will figure + * that out and clear it anyway. + */ + if (socket_table) + { + SocketTableEntry *entry = socket_table_lookup(socket_table, s); + + if (entry) + entry->level_triggered_events &= ~FD_WRITE; + } + if (pgwin32_noblock) { /* diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c index 2fd386a4ed..5bf03a3cd9 100644 --- a/src/backend/storage/ipc/latch.c +++ b/src/backend/storage/ipc/latch.c @@ -847,20 +847,9 @@ FreeWaitEventSet(WaitEventSet *set) cur_event < (set->events + set->nevents); cur_event++) { - if (cur_event->events & WL_LATCH_SET) - { - /* uses the latch's HANDLE */ - } - else if (cur_event->events & WL_POSTMASTER_DEATH) - { - /* uses PostmasterHandle */ - } - else - { - /* Clean up the event object we created for the socket */ - WSAEventSelect(cur_event->fd, NULL, 0); - WSACloseEvent(set->handles[cur_event->pos + 1]); - } + /* Release reference to socket's event handle. */ + if (cur_event->events & WL_SOCKET_MASK) + pgwin32_socket_release_event_handle(cur_event->fd); } #endif @@ -955,9 +944,6 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, event->fd = fd; event->events = events; event->user_data = user_data; -#ifdef WIN32 - event->reset = false; -#endif if (events == WL_LATCH_SET) { @@ -976,10 +962,21 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, } else if (events == WL_POSTMASTER_DEATH) { -#ifndef WIN32 +#if defined(WAIT_USE_WIN32) + set->handles[event->pos + 1] = PostmasterHandle; + event->fd = PGINVALID_SOCKET; +#else event->fd = postmaster_alive_fds[POSTMASTER_FD_WATCH]; #endif } + else if (events & WL_SOCKET_MASK) + { +#if defined(WAIT_USE_WIN32) + set->handles[event->pos + 1] = pgwin32_socket_acquire_event_handle(fd); + if (!set->handles[event->pos + 1]) + elog(ERROR, "could not acquire socket event handle: %m"); +#endif + } /* perform wait primitive specific initialization, if needed */ #if defined(WAIT_USE_EPOLL) @@ -1322,45 +1319,52 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events) #endif #if defined(WAIT_USE_WIN32) +static int +ToWinsockEvents(int pg_events) +{ + int winsock_events = 0; + + if (pg_events & WL_SOCKET_READABLE) + winsock_events |= FD_CLOSE | FD_READ; + if (pg_events & WL_SOCKET_WRITEABLE) + winsock_events |= FD_CLOSE | FD_WRITE; + if (pg_events & WL_SOCKET_CONNECTED) + winsock_events |= FD_CLOSE | FD_CONNECT; + if (pg_events & WL_SOCKET_ACCEPT) + winsock_events |= FD_CLOSE | FD_ACCEPT; + + return winsock_events; +} + +static int +FromWinsockEvents(int winsock_events) +{ + int pg_events = 0; + + if (winsock_events & (FD_CLOSE | FD_READ)) + pg_events |= WL_SOCKET_READABLE; + if (winsock_events & (FD_CLOSE | FD_WRITE)) + pg_events |= WL_SOCKET_WRITEABLE; + if (winsock_events & (FD_CLOSE | FD_CONNECT)) + pg_events |= WL_SOCKET_CONNECTED; + if (winsock_events & (FD_CLOSE | FD_ACCEPT)) + pg_events |= WL_SOCKET_ACCEPT; + + return pg_events; +} + static void WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event) { - HANDLE *handle = &set->handles[event->pos + 1]; - - if (event->events == WL_LATCH_SET) + if (event->events & WL_LATCH_SET) { - Assert(set->latch != NULL); - *handle = set->latch->event; + set->handles[event->pos + 1] = set->latch->event; } - else if (event->events == WL_POSTMASTER_DEATH) - { - *handle = PostmasterHandle; - } - else + else if (event->events & WL_SOCKET_MASK) { - int flags = FD_CLOSE; /* always check for errors/EOF */ - - if (event->events & WL_SOCKET_READABLE) - flags |= FD_READ; - if (event->events & WL_SOCKET_WRITEABLE) - flags |= FD_WRITE; - if (event->events & WL_SOCKET_CONNECTED) - flags |= FD_CONNECT; - if (event->events & WL_SOCKET_ACCEPT) - flags |= FD_ACCEPT; - - if (*handle == WSA_INVALID_EVENT) - { - *handle = WSACreateEvent(); - if (*handle == WSA_INVALID_EVENT) - elog(ERROR, "failed to create event for socket: error code %d", - WSAGetLastError()); - } - if (WSAEventSelect(event->fd, *handle, flags) != 0) - elog(ERROR, "failed to set up event for socket: error code %d", - WSAGetLastError()); - - Assert(event->fd != PGINVALID_SOCKET); + if (pgwin32_socket_select_events(event->fd, + ToWinsockEvents(event->events)) < 0) + elog(ERROR, "failed to set up event for socket: %m"); } } #endif @@ -1945,48 +1949,16 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, DWORD rc; WaitEvent *cur_event; - /* Reset any wait events that need it */ + /* + * Allow level-triggered events to be signaled, causing + * WaitForMultipleObjects() to return immediately. + */ for (cur_event = set->events; cur_event < (set->events + set->nevents); cur_event++) { - if (cur_event->reset) - { - WaitEventAdjustWin32(set, cur_event); - cur_event->reset = false; - } - - /* - * Windows does not guarantee to log an FD_WRITE network event - * indicating that more data can be sent unless the previous send() - * failed with WSAEWOULDBLOCK. While our caller might well have made - * such a call, we cannot assume that here. Therefore, if waiting for - * write-ready, force the issue by doing a dummy send(). If the dummy - * send() succeeds, assume that the socket is in fact write-ready, and - * return immediately. Also, if it fails with something other than - * WSAEWOULDBLOCK, return a write-ready indication to let our caller - * deal with the error condition. - */ - if (cur_event->events & WL_SOCKET_WRITEABLE) - { - char c; - WSABUF buf; - DWORD sent; - int r; - - buf.buf = &c; - buf.len = 0; - - r = WSASend(cur_event->fd, &buf, 1, &sent, 0, NULL, NULL); - if (r == 0 || WSAGetLastError() != WSAEWOULDBLOCK) - { - occurred_events->pos = cur_event->pos; - occurred_events->user_data = cur_event->user_data; - occurred_events->events = WL_SOCKET_WRITEABLE; - occurred_events->fd = cur_event->fd; - return 1; - } - } + if (cur_event->events & WL_SOCKET_MASK) + pgwin32_socket_prepare_to_wait(cur_event->fd); } /* @@ -2067,64 +2039,20 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, } else if (cur_event->events & WL_SOCKET_MASK) { - WSANETWORKEVENTS resEvents; - HANDLE handle = set->handles[cur_event->pos + 1]; + int winsock_events; + int pg_events; Assert(cur_event->fd); - occurred_events->fd = cur_event->fd; + winsock_events = pgwin32_socket_enumerate_events(cur_event->fd); + if (winsock_events < 0) + elog(ERROR, "could not enumerate socket events: %m"); - ZeroMemory(&resEvents, sizeof(resEvents)); - if (WSAEnumNetworkEvents(cur_event->fd, handle, &resEvents) != 0) - elog(ERROR, "failed to enumerate network events: error code %d", - WSAGetLastError()); - if ((cur_event->events & WL_SOCKET_READABLE) && - (resEvents.lNetworkEvents & FD_READ)) - { - /* data available in socket */ - occurred_events->events |= WL_SOCKET_READABLE; - - /*------ - * WaitForMultipleObjects doesn't guarantee that a read event - * will be returned if the latch is set at the same time. Even - * if it did, the caller might drop that event expecting it to - * reoccur on next call. So, we must force the event to be - * reset if this WaitEventSet is used again in order to avoid - * an indefinite hang. - * - * Refer - * https://msdn.microsoft.com/en-us/library/windows/desktop/ms741576(v=vs.85).aspx - * for the behavior of socket events. - *------ - */ - cur_event->reset = true; - } - if ((cur_event->events & WL_SOCKET_WRITEABLE) && - (resEvents.lNetworkEvents & FD_WRITE)) - { - /* writeable */ - occurred_events->events |= WL_SOCKET_WRITEABLE; - } - if ((cur_event->events & WL_SOCKET_CONNECTED) && - (resEvents.lNetworkEvents & FD_CONNECT)) - { - /* connected */ - occurred_events->events |= WL_SOCKET_CONNECTED; - } - if ((cur_event->events & WL_SOCKET_ACCEPT) && - (resEvents.lNetworkEvents & FD_ACCEPT)) - { - /* incoming connection could be accepted */ - occurred_events->events |= WL_SOCKET_ACCEPT; - } - if (resEvents.lNetworkEvents & FD_CLOSE) - { - /* EOF/error, so signal all caller-requested socket flags */ - occurred_events->events |= (cur_event->events & WL_SOCKET_MASK); - } - - if (occurred_events->events != 0) + pg_events = FromWinsockEvents(winsock_events) & cur_event->events; + if (pg_events) { + occurred_events->fd = cur_event->fd; + occurred_events->events = pg_events; occurred_events++; returned_events++; } diff --git a/src/include/port/win32_port.h b/src/include/port/win32_port.h index 27a11c7868..a0ed6aaeaa 100644 --- a/src/include/port/win32_port.h +++ b/src/include/port/win32_port.h @@ -506,6 +506,12 @@ extern int pgwin32_recv(SOCKET s, char *buf, int len, int flags); extern int pgwin32_send(SOCKET s, const void *buf, int len, int flags); extern int pgwin32_waitforsinglesocket(SOCKET s, int what, int timeout); +extern HANDLE pgwin32_socket_acquire_event_handle(SOCKET s); +extern void pgwin32_socket_release_event_handle(SOCKET s); +extern int pgwin32_socket_select_events(SOCKET s, int events); +extern void pgwin32_socket_prepare_to_wait(SOCKET s); +extern int pgwin32_socket_enumerate_events(SOCKET s); + extern PGDLLIMPORT int pgwin32_noblock; #endif /* FRONTEND */ diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h index 99cc47874a..cbcc5ef23f 100644 --- a/src/include/storage/latch.h +++ b/src/include/storage/latch.h @@ -153,9 +153,6 @@ typedef struct WaitEvent uint32 events; /* triggered events */ pgsocket fd; /* socket fd associated with event */ void *user_data; /* pointer provided in AddWaitEventToSet */ -#ifdef WIN32 - bool reset; /* Is reset of the event required? */ -#endif } WaitEvent; /* forward declaration to avoid exposing latch.c implementation details */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index bf50a32119..15dd7fa2b8 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2585,6 +2585,7 @@ Snapshot SnapshotData SnapshotType SockAddr +SocketTableEntry Sort SortBy SortByDir -- 2.42.0