From e3c4bb6245d0329f53d5545f487c996da1948ade Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Thu, 27 Oct 2022 16:37:28 +1300 Subject: [PATCH] Merge FeBeWaitSet and LatchWaitSet. Instead of using two epoll/kqueue kernel objects and descriptors in every backend process, create a new common one called BackendWaitSet. In backends that are connected to a FeBe socket, WaitLatch() might occasionally report a socket readiness event if data arrives, but that's not expected to happen much and can be lazily suppressed, and reenabled at the next FeBe wait. --- src/backend/libpq/be-secure.c | 10 ++-- src/backend/libpq/pqcomm.c | 21 +++----- src/backend/replication/walsender.c | 4 +- src/backend/storage/ipc/latch.c | 74 +++++++++++++++++++---------- src/backend/utils/init/miscinit.c | 12 ++--- src/include/libpq/libpq.h | 6 --- src/include/storage/latch.h | 9 +++- 7 files changed, 77 insertions(+), 59 deletions(-) diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c index e3e54713e8..431abd58ce 100644 --- a/src/backend/libpq/be-secure.c +++ b/src/backend/libpq/be-secure.c @@ -179,9 +179,10 @@ retry: Assert(waitfor); - ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL); + ModifyWaitEvent(BackendWaitSet, BackendWaitSetLatchPos, WL_LATCH_SET, MyLatch); + ModifyWaitEvent(BackendWaitSet, BackendWaitSetSocketPos, waitfor, NULL); - WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1, + WaitEventSetWait(BackendWaitSet, -1 /* no timeout */ , &event, 1, WAIT_EVENT_CLIENT_READ); /* @@ -291,9 +292,10 @@ retry: Assert(waitfor); - ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL); + ModifyWaitEvent(BackendWaitSet, BackendWaitSetLatchPos, WL_LATCH_SET, MyLatch); + ModifyWaitEvent(BackendWaitSet, BackendWaitSetSocketPos, waitfor, NULL); - WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1, + WaitEventSetWait(BackendWaitSet, -1 /* no timeout */ , &event, 1, WAIT_EVENT_CLIENT_WRITE); /* See comments in secure_read. */ diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index ce56ab1d41..1762e3a6ba 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -161,8 +161,6 @@ static const PQcommMethods PqCommSocketMethods = { const PQcommMethods *PqCommMethods = &PqCommSocketMethods; -WaitEventSet *FeBeWaitSet; - /* -------------------------------- * pq_init - initialize libpq at backend startup @@ -172,7 +170,6 @@ void pq_init(void) { int socket_pos PG_USED_FOR_ASSERTS_ONLY; - int latch_pos PG_USED_FOR_ASSERTS_ONLY; /* initialize state variables */ PqSendBufferSize = PQ_SEND_BUFFER_SIZE; @@ -200,20 +197,14 @@ pq_init(void) (errmsg("could not set socket to nonblocking mode: %m"))); #endif - FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, FeBeWaitSetNEvents); - socket_pos = AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, + socket_pos = AddWaitEventToSet(BackendWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock, NULL, NULL); - latch_pos = AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, PGINVALID_SOCKET, - MyLatch, NULL); - AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, - NULL, NULL); /* * The event positions match the order we added them, but let's sanity * check them to be sure. */ - Assert(socket_pos == FeBeWaitSetSocketPos); - Assert(latch_pos == FeBeWaitSetLatchPos); + Assert(socket_pos == BackendWaitSetSocketPos); } /* -------------------------------- @@ -2022,17 +2013,17 @@ show_tcp_user_timeout(void) bool pq_check_connection(void) { - WaitEvent events[FeBeWaitSetNEvents]; + WaitEvent events[BackendWaitSetSocketPos + 1]; int rc; /* * It's OK to modify the socket event filter without restoring, because - * all FeBeWaitSet socket wait sites do the same. + * all BackendWaitSet socket wait sites do the same. */ - ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, WL_SOCKET_CLOSED, NULL); + ModifyWaitEvent(BackendWaitSet, BackendWaitSetSocketPos, WL_SOCKET_CLOSED, NULL); retry: - rc = WaitEventSetWait(FeBeWaitSet, 0, events, lengthof(events), 0); + rc = WaitEventSetWait(BackendWaitSet, 0, events, lengthof(events), 0); for (int i = 0; i < rc; ++i) { if (events[i].events & WL_SOCKET_CLOSED) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index a81ef6a201..d539dabb37 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -3320,8 +3320,8 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event) { WaitEvent event; - ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL); - if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 && + ModifyWaitEvent(BackendWaitSet, BackendWaitSetSocketPos, socket_events, NULL); + if (WaitEventSetWait(BackendWaitSet, timeout, &event, 1, wait_event) == 1 && (event.events & WL_POSTMASTER_DEATH)) proc_exit(1); } diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c index eb3a569aae..89574b544f 100644 --- a/src/backend/storage/ipc/latch.c +++ b/src/backend/storage/ipc/latch.c @@ -151,11 +151,14 @@ struct WaitEventSet #endif }; -/* A common WaitEventSet used to implement WatchLatch() */ -static WaitEventSet *LatchWaitSet; +/* A common WaitEventSet used to implement WatchLatch() and Fe/Be comms */ +WaitEventSet *BackendWaitSet; -/* The position of the latch in LatchWaitSet. */ -#define LatchWaitSetLatchPos 0 +/* The position of the latch in BackendWaitSet. */ +#define BackendWaitSetLatchPos 0 + +/* In backends that have a FeBe socket, it is here (see pqcomm.c). */ +#define BackendWaitSetSocketPos 2 #ifndef WIN32 /* Are we currently in WaitLatch? The signal handler would like to know. */ @@ -302,21 +305,23 @@ InitializeLatchSupport(void) } void -InitializeLatchWaitSet(void) +InitializeBackendWaitSet(void) { int latch_pos PG_USED_FOR_ASSERTS_ONLY; - Assert(LatchWaitSet == NULL); + Assert(BackendWaitSet == NULL); - /* Set up the WaitEventSet used by WaitLatch(). */ - LatchWaitSet = CreateWaitEventSet(TopMemoryContext, 2); - latch_pos = AddWaitEventToSet(LatchWaitSet, WL_LATCH_SET, PGINVALID_SOCKET, + /* Set up the WaitEventSet used by WaitLatch() and libpq.c. */ + BackendWaitSet = CreateWaitEventSet(TopMemoryContext, 3); + latch_pos = AddWaitEventToSet(BackendWaitSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); + Assert(latch_pos == BackendWaitSetLatchPos); + if (IsUnderPostmaster) - AddWaitEventToSet(LatchWaitSet, WL_EXIT_ON_PM_DEATH, + AddWaitEventToSet(BackendWaitSet, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, NULL, NULL); - Assert(latch_pos == LatchWaitSetLatchPos); + /* libpq.c may also add a socket if connected to the frontend */ } void @@ -326,10 +331,10 @@ ShutdownLatchSupport(void) pqsignal(SIGURG, SIG_IGN); #endif - if (LatchWaitSet) + if (BackendWaitSet) { - FreeWaitEventSet(LatchWaitSet); - LatchWaitSet = NULL; + FreeWaitEventSet(BackendWaitSet); + BackendWaitSet = NULL; } #if defined(WAIT_USE_SELF_PIPE) @@ -477,6 +482,7 @@ WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info) { WaitEvent event; + int result = 0; /* Postmaster-managed callers must handle postmaster death somehow. */ Assert(!IsUnderPostmaster || @@ -490,17 +496,32 @@ WaitLatch(Latch *latch, int wakeEvents, long timeout, */ if (!(wakeEvents & WL_LATCH_SET)) latch = NULL; - ModifyWaitEvent(LatchWaitSet, LatchWaitSetLatchPos, WL_LATCH_SET, latch); - LatchWaitSet->exit_on_postmaster_death = - ((wakeEvents & WL_EXIT_ON_PM_DEATH) != 0); - - if (WaitEventSetWait(LatchWaitSet, - (wakeEvents & WL_TIMEOUT) ? timeout : -1, - &event, 1, - wait_event_info) == 0) - return WL_TIMEOUT; - else - return event.events; + ModifyWaitEvent(BackendWaitSet, BackendWaitSetLatchPos, WL_LATCH_SET, latch); + + do + { + if (WaitEventSetWait(BackendWaitSet, + (wakeEvents & WL_TIMEOUT) ? timeout : -1, + &event, 1, + wait_event_info) == 0) + result = WL_TIMEOUT; + else if (event.events & WL_SOCKET_MASK) + { + /* + * Lazily shut off socket readiness events if they occur while + * we're just waiting for a latch or timeout. + */ + ModifyWaitEvent(BackendWaitSet, event.pos, WL_SOCKET_IGNORE, NULL); + } + else + result = event.events; + } while (result == 0); + + if (result == WL_POSTMASTER_DEATH && + (wakeEvents & WL_EXIT_ON_PM_DEATH)) + proc_exit(1); + + return result; } /* @@ -1069,6 +1090,7 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action) Assert(event->fd != PGINVALID_SOCKET); Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE | + WL_SOCKET_IGNORE | WL_SOCKET_CLOSED)); if (event->events & WL_SOCKET_READABLE) @@ -1117,6 +1139,7 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event) { Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE | + WL_SOCKET_IGNORE | WL_SOCKET_CLOSED)); pollfd->events = 0; if (event->events & WL_SOCKET_READABLE) @@ -1201,6 +1224,7 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events) event->events == WL_POSTMASTER_DEATH || (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE | + WL_SOCKET_IGNORE | WL_SOCKET_CLOSED))); if (event->events == WL_POSTMASTER_DEATH) diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c index dfdcd3d78e..76f99f1700 100644 --- a/src/backend/utils/init/miscinit.c +++ b/src/backend/utils/init/miscinit.c @@ -137,7 +137,7 @@ InitPostmasterChild(void) InitializeLatchSupport(); MyLatch = &LocalLatchData; InitLatch(MyLatch); - InitializeLatchWaitSet(); + InitializeBackendWaitSet(); /* * If possible, make this process a group leader, so that the postmaster @@ -191,7 +191,7 @@ InitStandaloneProcess(const char *argv0) InitializeLatchSupport(); MyLatch = &LocalLatchData; InitLatch(MyLatch); - InitializeLatchWaitSet(); + InitializeBackendWaitSet(); /* * For consistency with InitPostmasterChild, initialize signal mask here. @@ -220,8 +220,8 @@ SwitchToSharedLatch(void) MyLatch = &MyProc->procLatch; - if (FeBeWaitSet) - ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET, + if (BackendWaitSet) + ModifyWaitEvent(BackendWaitSet, BackendWaitSetLatchPos, WL_LATCH_SET, MyLatch); /* @@ -240,8 +240,8 @@ SwitchBackToLocalLatch(void) MyLatch = &LocalLatchData; - if (FeBeWaitSet) - ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET, + if (BackendWaitSet) + ModifyWaitEvent(BackendWaitSet, BackendWaitSetLatchPos, WL_LATCH_SET, MyLatch); SetLatch(MyLatch); diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h index 2de7d9bad2..111f2e3297 100644 --- a/src/include/libpq/libpq.h +++ b/src/include/libpq/libpq.h @@ -58,12 +58,6 @@ extern const PGDLLIMPORT PQcommMethods *PqCommMethods; /* * prototypes for functions in pqcomm.c */ -extern PGDLLIMPORT WaitEventSet *FeBeWaitSet; - -#define FeBeWaitSetSocketPos 0 -#define FeBeWaitSetLatchPos 1 -#define FeBeWaitSetNEvents 3 - extern int StreamServerPort(int family, const char *hostName, unsigned short portNumber, const char *unixSocketDir, pgsocket ListenSocket[], int MaxListen); diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h index 68ab740f16..b2e2d8c3f9 100644 --- a/src/include/storage/latch.h +++ b/src/include/storage/latch.h @@ -135,6 +135,7 @@ typedef struct Latch #define WL_SOCKET_CONNECTED WL_SOCKET_WRITEABLE #endif #define WL_SOCKET_CLOSED (1 << 7) +#define WL_SOCKET_IGNORE (1 << 8) /* temporarily ignore a socket */ #define WL_SOCKET_MASK (WL_SOCKET_READABLE | \ WL_SOCKET_WRITEABLE | \ WL_SOCKET_CONNECTED | \ @@ -154,6 +155,12 @@ typedef struct WaitEvent /* forward declaration to avoid exposing latch.c implementation details */ typedef struct WaitEventSet WaitEventSet; +/* extern for pgcomm.c to access */ +extern PGDLLIMPORT WaitEventSet *BackendWaitSet; +#define BackendWaitSetLatchPos 0 +#define BackendWaitSetSocketPos 2 + + /* * prototypes for functions in latch.c */ @@ -179,7 +186,7 @@ extern int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info); extern int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info); -extern void InitializeLatchWaitSet(void); +extern void InitializeBackendWaitSet(void); extern int GetNumRegisteredWaitEvents(WaitEventSet *set); extern bool WaitEventSetCanReportClosed(void); -- 2.35.1