Re: Reducing WaitEventSet syscall churn - Mailing list pgsql-hackers

From Kyotaro Horiguchi
Subject Re: Reducing WaitEventSet syscall churn
Date
Msg-id 20200313.162113.33507975146072569.horikyota.ntt@gmail.com
In response to Re: Reducing WaitEventSet syscall churn  (Thomas Munro <thomas.munro@gmail.com>)
Responses Re: Reducing WaitEventSet syscall churn  (Kyotaro Horiguchi <horikyota.ntt@gmail.com>)
Re: Reducing WaitEventSet syscall churn  (Daniel Gustafsson <daniel@yesql.se>)
List pgsql-hackers
Hello.

At Tue, 10 Mar 2020 08:19:24 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in 
me> I'll continue reviewing in a later mail.
me> 
me> > 0005: "libpq: Add PQsocketChangeCount to advertise socket changes."
me> .... 

At Thu, 27 Feb 2020 12:17:45 +1300, Thomas Munro <thomas.munro@gmail.com> wrote in 
> On Sat, Feb 8, 2020 at 10:15 AM Thomas Munro <thomas.munro@gmail.com> wrote:
> 0005: "libpq: Add PQsocketChangeCount to advertise socket changes."
> 
> To support a long lived WES, libpq needs a way to tell us when the socket
> changes underneath our feet.  This is the simplest thing I could think
> of; better ideas welcome.

I think Windows, at least, is the reason we cannot simply detect a
change in the value of the fd.  Instead of counting disconnections, we
could use a libpq event.

PQregisterEventProc returns false for all of bad parameters, already
registered, out of memory and proc rejection, so the caller cannot
tell what went wrong.  I don't think that is a usable interface, so
the attached 0005 patch fixes it.  (After making 0007 I found it is
not strictly needed, but I include it as a proposal separate from this
patch set.  It does not include the corresponding doc fix.)
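
For illustration, here is a minimal sketch of a client of the 0005 and
0006 interfaces (not part of the attached patches; the callback name
and registration string are made up):

    static int
    my_event_proc(PGEventId evtId, void *evtInfo, void *passThrough)
    {
        if (evtId != PGEVT_CONNDISCONNECTION)
            return 1;            /* not interested; report success */

        /* ((PGEventConnDisconnection *) evtInfo)->conn has just lost its
         * socket, so a long-lived WaitEventSet watching it should drop
         * the corresponding event here. */
        return 1;
    }

    ...
    PGRegEventResult rc;

    rc = PQregisterEventProc(conn, my_event_proc, "my_module", NULL);
    if (rc != PGEVTREG_SUCCESS)
    {
        /* rc now distinguishes bad argument, already registered,
         * out of memory and proc rejection */
    }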

> 0006: "Reuse a WaitEventSet in libpqwalreceiver.c."
> 
> Rather than having all users of libpqwalreceiver.c deal with the
> complicated details of wait set management, have libpqwalreceiver
> expose a waiting interface that understands socket changes.

Looks reasonable.  The attached 0006 and 0007 are a possible
replacement based on libpq events.

> Unfortunately, I couldn't figure out how to use CommonWaitSet for this
> (ie adding and removing sockets to that as required), due to
> complications with the bookkeeping required to provide the fd_closed
> flag to RemoveWaitEvent().  So it creates its own internal long lived
> WaitEventSet.

Agreed, since they are used in different ways.  With the attached
patches, a closed connection is marked by setting wes_socket_position
to -1.
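
Concretely, the wait routine in the attached 0007 re-adds the socket
event on demand after the disconnection callback has removed it
(simplified excerpt from libpqrcv_wait_for_socket):

    if (conn->wes_socket_position < 0)
        conn->wes_socket_position =
            AddWaitEventToSet(conn->wes, io_flags,
                              PQsocket(conn->streamConn), NULL, NULL);
    else
        ModifyWaitEvent(conn->wes, conn->wes_socket_position,
                        io_flags, NULL);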

> 0007: "Use a WaitEventSet for postgres_fdw."

Continues...

The attached are:
0001-0004: Not changed.
0005: Fix the interface of PQregisterEventProc.
0006: Add a new libpq event for this use.
0007: Another version of "0006 Reuse a WaitEventSet in
      libpqwalreceiver.c", based on the libpq event.
0008-0011: Not changed (old 0007-0010, blindly appended).

The patches pass the regression tests (including the TAP recovery
tests) up to here.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From 1a80abd292ff0d6a47274eb188a5576b2ef3cf6e Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Tue, 25 Feb 2020 02:53:35 +1300
Subject: [PATCH 01/11] Don't use EV_CLEAR for kqueue events.

For the semantics to match the epoll implementation, we need a
socket to continue to appear readable/writable if you wait
multiple times without doing I/O in between (in Linux
terminology, level-triggered rather than edge-triggered).
Similar to commit 3b790d256f8 for Windows.

Author: Thomas Munro
Discussion: https://postgr.es/m/CA%2BhUKGJAC4Oqao%3DqforhNey20J8CiG2R%3DoBPqvfR0vOJrFysGw%40mail.gmail.com
---
 src/backend/storage/ipc/latch.c | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 046ca5c6c7..3b6acfb251 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -991,7 +991,7 @@ WaitEventAdjustKqueueAdd(struct kevent *k_ev, int filter, int action,
 {
     k_ev->ident = event->fd;
     k_ev->filter = filter;
-    k_ev->flags = action | EV_CLEAR;
+    k_ev->flags = action;
     k_ev->fflags = 0;
     k_ev->data = 0;
     AccessWaitEvent(k_ev) = event;
@@ -1003,7 +1003,7 @@ WaitEventAdjustKqueueAddPostmaster(struct kevent *k_ev, WaitEvent *event)
     /* For now postmaster death can only be added, not removed. */
     k_ev->ident = PostmasterPid;
     k_ev->filter = EVFILT_PROC;
-    k_ev->flags = EV_ADD | EV_CLEAR;
+    k_ev->flags = EV_ADD;
     k_ev->fflags = NOTE_EXIT;
     k_ev->data = 0;
     AccessWaitEvent(k_ev) = event;
-- 
2.18.2

From f93551a42b9e9ad0da05197d48fac5790474249d Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Mon, 24 Feb 2020 15:39:24 +1300
Subject: [PATCH 02/11] Use a long lived WaitEventSet for WaitLatch().

Create CommonWaitSet at backend startup time, and use it to
implement WaitLatch().  This avoids a bunch of epoll/kqueue
system calls, and makes sure we don't run into EMFILE later
due to lack of file descriptors.

Reorder SubPostmasterMain() slightly so that we restore the
postmaster pipe and Windows signal emulation before we reach
InitPostmasterChild(), to make this work in EXEC_BACKEND
builds.

Author: Thomas Munro
Discussion: https://postgr.es/m/CA%2BhUKGJAC4Oqao%3DqforhNey20J8CiG2R%3DoBPqvfR0vOJrFysGw%40mail.gmail.com
---
 src/backend/postmaster/postmaster.c | 24 +++++++-------
 src/backend/storage/ipc/latch.c     | 50 +++++++++++++++++++++++++++---
 src/backend/utils/init/miscinit.c   |  2 ++
 src/include/storage/latch.h         |  1 +
 4 files changed, 61 insertions(+), 16 deletions(-)

diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 46be78aadb..c472971ce0 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -4878,9 +4878,6 @@ SubPostmasterMain(int argc, char *argv[])
     IsPostmasterEnvironment = true;
     whereToSendOutput = DestNone;
 
-    /* Setup as postmaster child */
-    InitPostmasterChild();
-
     /* Setup essential subsystems (to ensure elog() behaves sanely) */
     InitializeGUCOptions();
 
@@ -4895,6 +4892,18 @@ SubPostmasterMain(int argc, char *argv[])
     /* Close the postmaster's sockets (as soon as we know them) */
     ClosePostmasterPorts(strcmp(argv[1], "--forklog") == 0);
 
+    /*
+     * Start our win32 signal implementation. This has to be done after we
+     * read the backend variables, because we need to pick up the signal pipe
+     * from the parent process.
+     */
+#ifdef WIN32
+    pgwin32_signal_initialize();
+#endif
+
+    /* Setup as postmaster child */
+    InitPostmasterChild();
+
     /*
      * Set reference point for stack-depth checking
      */
@@ -4943,15 +4952,6 @@ SubPostmasterMain(int argc, char *argv[])
     if (strcmp(argv[1], "--forkavworker") == 0)
         AutovacuumWorkerIAm();
 
-    /*
-     * Start our win32 signal implementation. This has to be done after we
-     * read the backend variables, because we need to pick up the signal pipe
-     * from the parent process.
-     */
-#ifdef WIN32
-    pgwin32_signal_initialize();
-#endif
-
     /* In EXEC_BACKEND case we will not have inherited these settings */
     pqinitmask();
     PG_SETMASK(&BlockSig);
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 3b6acfb251..30e461e965 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -56,6 +56,7 @@
 #include "storage/latch.h"
 #include "storage/pmsignal.h"
 #include "storage/shmem.h"
+#include "utils/memutils.h"
 
 /*
  * Select the fd readiness primitive to use. Normally the "most modern"
@@ -129,6 +130,9 @@ struct WaitEventSet
 #endif
 };
 
+/* A common WaitEventSet used to implement WaitLatch() */
+static WaitEventSet *CommonWaitSet;
+
 #ifndef WIN32
 /* Are we currently in WaitLatch? The signal handler would like to know. */
 static volatile sig_atomic_t waiting = false;
@@ -242,6 +246,20 @@ InitializeLatchSupport(void)
 #endif
 }
 
+void
+InitializeCommonWaitSet(void)
+{
+    Assert(CommonWaitSet == NULL);
+
+    /* Set up the WaitEventSet used by WaitLatch(). */
+    CommonWaitSet = CreateWaitEventSet(TopMemoryContext, 2);
+    AddWaitEventToSet(CommonWaitSet, WL_LATCH_SET, PGINVALID_SOCKET,
+                      MyLatch, NULL);
+    if (IsUnderPostmaster)
+        AddWaitEventToSet(CommonWaitSet, WL_EXIT_ON_PM_DEATH,
+                          PGINVALID_SOCKET, NULL, NULL);
+}
+
 /*
  * Initialize a process-local latch.
  */
@@ -365,8 +383,29 @@ int
 WaitLatch(Latch *latch, int wakeEvents, long timeout,
           uint32 wait_event_info)
 {
-    return WaitLatchOrSocket(latch, wakeEvents, PGINVALID_SOCKET, timeout,
-                             wait_event_info);
+    WaitEvent    event;
+
+    /* Postmaster-managed callers must handle postmaster death somehow. */
+    Assert(!IsUnderPostmaster ||
+           (wakeEvents & WL_EXIT_ON_PM_DEATH) ||
+           (wakeEvents & WL_POSTMASTER_DEATH));
+
+    /*
+     * Some callers may have a latch other than MyLatch, or want to handle
+     * postmaster death differently.  It's cheap to assign those, so just do it
+     * every time.
+     */
+    ModifyWaitEvent(CommonWaitSet, 0, WL_LATCH_SET, latch);
+    CommonWaitSet->exit_on_postmaster_death =
+        ((wakeEvents & WL_EXIT_ON_PM_DEATH) != 0);
+
+    if (WaitEventSetWait(CommonWaitSet,
+                         (wakeEvents & WL_TIMEOUT) ? timeout : -1,
+                         &event, 1,
+                         wait_event_info) == 0)
+        return WL_TIMEOUT;
+    else
+        return event.events;
 }
 
 /*
@@ -700,7 +739,10 @@ FreeWaitEventSet(WaitEventSet *set)
     ReleaseExternalFD();
 #elif defined(WAIT_USE_KQUEUE)
-    close(set->kqueue_fd);
-    ReleaseExternalFD();
+    if (set->kqueue_fd >= 0)
+    {
+        close(set->kqueue_fd);
+        ReleaseExternalFD();
+    }
 #elif defined(WAIT_USE_WIN32)
     WaitEvent  *cur_event;
 
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index cd099b0c70..1cace9735d 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -109,6 +109,7 @@ InitPostmasterChild(void)
     InitializeLatchSupport();
     MyLatch = &LocalLatchData;
     InitLatch(MyLatch);
+    InitializeCommonWaitSet();
 
     /*
      * If possible, make this process a group leader, so that the postmaster
@@ -141,6 +142,7 @@ InitStandaloneProcess(const char *argv0)
     InitializeLatchSupport();
     MyLatch = &LocalLatchData;
     InitLatch(MyLatch);
+    InitializeCommonWaitSet();
 
     /* Compute paths, no postmaster to inherit from */
     if (my_exec_path[0] == '\0')
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index 46ae56cae3..ec1865a8fd 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -176,6 +176,7 @@ extern int    WaitLatch(Latch *latch, int wakeEvents, long timeout,
                       uint32 wait_event_info);
 extern int    WaitLatchOrSocket(Latch *latch, int wakeEvents,
                               pgsocket sock, long timeout, uint32 wait_event_info);
+extern void InitializeCommonWaitSet(void);
 
 /*
  * Unix implementation uses SIGUSR1 for inter-process signaling.
-- 
2.18.2

From 297764ce9da84bdb7c66faac61ec51ed49fbab0b Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Mon, 24 Feb 2020 15:49:00 +1300
Subject: [PATCH 03/11] Use WaitLatch() for condition variables.

Previously, condition_variable.c created its own long lived
WaitEventSet to avoid extra system calls.  WaitLatch() now
does that, so there is no point in wasting an extra kernel
descriptor.

Author: Thomas Munro
Discussion: https://postgr.es/m/CA%2BhUKGJAC4Oqao%3DqforhNey20J8CiG2R%3DoBPqvfR0vOJrFysGw%40mail.gmail.com
---
 src/backend/storage/lmgr/condition_variable.c | 28 ++++---------------
 1 file changed, 5 insertions(+), 23 deletions(-)

diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index 37b6a4eecd..2ec00397b4 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -30,9 +30,6 @@
 /* Initially, we are not prepared to sleep on any condition variable. */
 static ConditionVariable *cv_sleep_target = NULL;
 
-/* Reusable WaitEventSet. */
-static WaitEventSet *cv_wait_event_set = NULL;
-
 /*
  * Initialize a condition variable.
  */
@@ -62,23 +59,6 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
 {
     int            pgprocno = MyProc->pgprocno;
 
-    /*
-     * If first time through in this process, create a WaitEventSet, which
-     * we'll reuse for all condition variable sleeps.
-     */
-    if (cv_wait_event_set == NULL)
-    {
-        WaitEventSet *new_event_set;
-
-        new_event_set = CreateWaitEventSet(TopMemoryContext, 2);
-        AddWaitEventToSet(new_event_set, WL_LATCH_SET, PGINVALID_SOCKET,
-                          MyLatch, NULL);
-        AddWaitEventToSet(new_event_set, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
-                          NULL, NULL);
-        /* Don't set cv_wait_event_set until we have a correct WES. */
-        cv_wait_event_set = new_event_set;
-    }
-
     /*
      * If some other sleep is already prepared, cancel it; this is necessary
      * because we have just one static variable tracking the prepared sleep,
@@ -135,6 +115,7 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
     long        cur_timeout = -1;
     instr_time    start_time;
     instr_time    cur_time;
+    int            wait_events;
 
     /*
      * If the caller didn't prepare to sleep explicitly, then do so now and
@@ -166,19 +147,20 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
         INSTR_TIME_SET_CURRENT(start_time);
         Assert(timeout >= 0 && timeout <= INT_MAX);
         cur_timeout = timeout;
+        wait_events = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
     }
+    else
+        wait_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
 
     while (true)
     {
-        WaitEvent    event;
         bool        done = false;
 
         /*
          * Wait for latch to be set.  (If we're awakened for some other
          * reason, the code below will cope anyway.)
          */
-        (void) WaitEventSetWait(cv_wait_event_set, cur_timeout, &event, 1,
-                                wait_event_info);
+        (void) WaitLatch(MyLatch, wait_events, cur_timeout, wait_event_info);
 
         /* Reset latch before examining the state of the wait list. */
         ResetLatch(MyLatch);
-- 
2.18.2

From d5ba7d38d7618c173eb10f3369e21937fb23aa74 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Mon, 24 Feb 2020 19:05:00 +1300
Subject: [PATCH 04/11] Introduce RemoveWaitEvent().

This will allow WaitEventSet objects to be used in more
long lived scenarios, where sockets are added and removed.

Author: Thomas Munro
Discussion: https://postgr.es/m/CA%2BhUKGJAC4Oqao%3DqforhNey20J8CiG2R%3DoBPqvfR0vOJrFysGw%40mail.gmail.com
---
 src/backend/storage/ipc/latch.c | 131 ++++++++++++++++++++++++++++----
 src/include/storage/latch.h     |   3 +
 2 files changed, 120 insertions(+), 14 deletions(-)

diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 30e461e965..025545fc89 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -84,6 +84,7 @@ struct WaitEventSet
 {
     int            nevents;        /* number of registered events */
     int            nevents_space;    /* maximum number of events in this set */
+    int            free_list;        /* position of first free event */
 
     /*
      * Array, of nevents_space length, storing the definition of events this
@@ -119,6 +120,8 @@ struct WaitEventSet
 #elif defined(WAIT_USE_POLL)
     /* poll expects events to be waited on every poll() call, prepare once */
     struct pollfd *pollfds;
+    /* track the populated range of pollfds */
+    int            npollfds;
 #elif defined(WAIT_USE_WIN32)
 
     /*
@@ -127,6 +130,8 @@ struct WaitEventSet
      * event->pos + 1).
      */
     HANDLE       *handles;
+    /* track the populated range of handles */
+    int            nhandles;
 #endif
 };
 
@@ -642,13 +647,16 @@ CreateWaitEventSet(MemoryContext context, int nevents)
 #elif defined(WAIT_USE_POLL)
     set->pollfds = (struct pollfd *) data;
     data += MAXALIGN(sizeof(struct pollfd) * nevents);
+    set->npollfds = 0;
 #elif defined(WAIT_USE_WIN32)
     set->handles = (HANDLE) data;
     data += MAXALIGN(sizeof(HANDLE) * nevents);
+    set->nhandles = 0;
 #endif
 
     set->latch = NULL;
     set->nevents_space = nevents;
+    set->nevents = 0;
     set->exit_on_postmaster_death = false;
 
 #if defined(WAIT_USE_EPOLL)
@@ -714,11 +722,25 @@ CreateWaitEventSet(MemoryContext context, int nevents)
      * Note: pgwin32_signal_event should be first to ensure that it will be
      * reported when multiple events are set.  We want to guarantee that
      * pending signals are serviced.
+     *
+     * We set unused handles to INVALID_HANDLE_VALUE, because
+     * WaitForMultipleObjects() considers that to mean "this process", which
+     * is not signaled until the process exits, so it's a way of leaving a
+     * hole in the middle of the wait set if you remove something (just like
+     * -1 in the poll implementation).  An alternative would be to fill in
+     * holes and create a non 1-to-1 mapping between 'events' and 'handles'.
      */
     set->handles[0] = pgwin32_signal_event;
-    StaticAssertStmt(WSA_INVALID_EVENT == NULL, "");
+    for (int i = 0; i < nevents; ++i)
+        set->handles[i + 1] = INVALID_HANDLE_VALUE;
 #endif
 
+    /* Set up the free list. */
+    for (int i = 0; i < nevents; ++i)
+        set->events[i].next_free = i + 1;
+    set->events[nevents - 1].next_free = -1;
+    set->free_list = 0;
+
     return set;
 }
 
@@ -727,7 +749,6 @@ CreateWaitEventSet(MemoryContext context, int nevents)
  *
  * Note: preferably, this shouldn't have to free any resources that could be
  * inherited across an exec().  If it did, we'd likely leak those resources in
- * many scenarios.  For the epoll case, we ensure that by setting FD_CLOEXEC
  * when the FD is created.  For the Windows case, we assume that the handles
  * involved are non-inheritable.
  */
@@ -748,9 +769,12 @@ FreeWaitEventSet(WaitEventSet *set)
     WaitEvent  *cur_event;
 
     for (cur_event = set->events;
-         cur_event < (set->events + set->nevents);
+         cur_event < (set->events + set->nhandles);
          cur_event++)
     {
+        if (set->handles[cur_event->pos + 1] == INVALID_HANDLE_VALUE)
+            continue;
+
         if (cur_event->events & WL_LATCH_SET)
         {
             /* uses the latch's HANDLE */
@@ -805,9 +829,6 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
 {
     WaitEvent  *event;
 
-    /* not enough space */
-    Assert(set->nevents < set->nevents_space);
-
     if (events == WL_EXIT_ON_PM_DEATH)
     {
         events = WL_POSTMASTER_DEATH;
@@ -833,8 +854,12 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
     if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK))
         elog(ERROR, "cannot wait on socket event without a socket");
 
-    event = &set->events[set->nevents];
-    event->pos = set->nevents++;
+    /* Do we have any free slots? */
+    if (set->free_list == -1)
+        elog(ERROR, "WaitEventSet is full");
+
+    event = &set->events[set->free_list];
+    event->pos = set->free_list;
     event->fd = fd;
     event->events = events;
     event->user_data = user_data;
@@ -868,6 +893,11 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
     WaitEventAdjustWin32(set, event);
 #endif
 
+    /* Remove it from the free list. */
+    set->free_list = event->next_free;
+    event->next_free = -1;
+    set->nevents++;
+
     return event->pos;
 }
 
@@ -885,7 +915,7 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
     int            old_events;
 #endif
 
-    Assert(pos < set->nevents);
+    Assert(pos < set->nevents_space);
 
     event = &set->events[pos];
 #if defined(WAIT_USE_KQUEUE)
@@ -933,6 +963,63 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
 #endif
 }
 
+/*
+ * If the descriptor has already been closed, the kernel should already have
+ * removed it from the wait set (except in WAIT_USE_POLL).  Pass in true for
+ * fd_closed in that case, so we don't try to remove it ourselves.
+ */
+void
+RemoveWaitEvent(WaitEventSet *set, int pos, bool fd_closed)
+{
+    WaitEvent  *event;
+
+    Assert(pos >= 0);
+    Assert(pos < set->nevents_space);
+    event = &set->events[pos];
+
+    /* For now only sockets can be removed */
+    if ((event->events & WL_SOCKET_MASK) == 0)
+        elog(ERROR, "event type cannot be removed");
+
+#if defined(WAIT_USE_EPOLL)
+    if (!fd_closed)
+        WaitEventAdjustEpoll(set, event, EPOLL_CTL_DEL);
+#elif defined(WAIT_USE_KQUEUE)
+    if (!fd_closed)
+    {
+        int old_events = event->events;
+
+        event->events = 0;
+        WaitEventAdjustKqueue(set, event, old_events);
+    }
+#elif defined(WAIT_USE_POLL)
+    /* no kernel state to remove, just blank out the fd */
+    set->pollfds[event->pos].fd = -1;
+    /* see if we can shrink the range of active fds */
+    while (set->npollfds > 0 &&
+           set->pollfds[set->npollfds - 1].fd == -1)
+        set->npollfds -= 1;
+#elif defined(WAIT_USE_WIN32)
+    if (!fd_closed)
+        WSAEventSelect(event->fd, NULL, 0);
+    if (set->handles[event->pos + 1] != INVALID_HANDLE_VALUE)
+    {
+        WSACloseEvent(set->handles[event->pos + 1]);
+        set->handles[event->pos + 1] = INVALID_HANDLE_VALUE;
+    }
+    /* see if we can shrink the range of active handles */
+    while (set->nhandles > 0 &&
+           set->handles[set->nhandles] == INVALID_HANDLE_VALUE)
+        set->nhandles -= 1;
+#endif
+
+    /* This position is now free. */
+    memset(event, 0, sizeof(*event));
+    event->next_free = set->free_list;
+    set->free_list = pos;
+    set->nevents--;
+}
+
 #if defined(WAIT_USE_EPOLL)
 /*
  * action can be one of EPOLL_CTL_ADD | EPOLL_CTL_MOD | EPOLL_CTL_DEL
@@ -994,6 +1081,9 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
     pollfd->revents = 0;
     pollfd->fd = event->fd;
 
+    /* track the known range of populated slots */
+    set->npollfds = Max(event->pos + 1, set->npollfds);
+
     /* prepare pollfd entry once */
     if (event->events == WL_LATCH_SET)
     {
@@ -1072,7 +1162,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
     Assert(event->events != WL_LATCH_SET || set->latch != NULL);
     Assert(event->events == WL_LATCH_SET ||
            event->events == WL_POSTMASTER_DEATH ||
-           (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)));
+           (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) ||
+           (event->events == 0 &&
+            (old_events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))));
 
     if (event->events == WL_POSTMASTER_DEATH)
     {
@@ -1149,6 +1241,9 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
 {
     HANDLE       *handle = &set->handles[event->pos + 1];
 
+    /* track the known range of populated slots */
+    set->nhandles = Max(event->pos + 1, set->nhandles);
+
     if (event->events == WL_LATCH_SET)
     {
         Assert(set->latch != NULL);
@@ -1169,12 +1264,15 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
         if (event->events & WL_SOCKET_CONNECTED)
             flags |= FD_CONNECT;
 
-        if (*handle == WSA_INVALID_EVENT)
+        if (*handle == INVALID_HANDLE_VALUE)
         {
             *handle = WSACreateEvent();
             if (*handle == WSA_INVALID_EVENT)
+            {
+                *handle = INVALID_HANDLE_VALUE;
                 elog(ERROR, "failed to create event for socket: error code %u",
                      WSAGetLastError());
+            }
         }
         if (WSAEventSelect(event->fd, *handle, flags) != 0)
             elog(ERROR, "failed to set up event for socket: error code %u",
@@ -1304,6 +1402,11 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
     return returned_events;
 }
 
+int
+WaitEventSetSize(WaitEventSet *set)
+{
+    return set->nevents;
+}
 
 #if defined(WAIT_USE_EPOLL)
 
@@ -1589,7 +1692,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
     struct pollfd *cur_pollfd;
 
     /* Sleep */
-    rc = poll(set->pollfds, set->nevents, (int) cur_timeout);
+    rc = poll(set->pollfds, set->npollfds, (int) cur_timeout);
 
     /* Check return code */
     if (rc < 0)
@@ -1761,9 +1864,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
     /*
      * Sleep.
      *
-     * Need to wait for ->nevents + 1, because signal handle is in [0].
+     * Need to wait for ->nhandles + 1, because signal handle is in [0].
      */
-    rc = WaitForMultipleObjects(set->nevents + 1, set->handles, FALSE,
+    rc = WaitForMultipleObjects(set->nhandles + 1, set->handles, FALSE,
                                 cur_timeout);
 
     /* Check return code */
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index ec1865a8fd..210f37659e 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -144,6 +144,7 @@ typedef struct WaitEvent
     uint32        events;            /* triggered events */
     pgsocket    fd;                /* socket fd associated with event */
     void       *user_data;        /* pointer provided in AddWaitEventToSet */
+    int            next_free;        /* free list for internal use */
 #ifdef WIN32
     bool        reset;            /* Is reset of the event required? */
 #endif
@@ -168,6 +169,8 @@ extern void FreeWaitEventSet(WaitEventSet *set);
 extern int    AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd,
                               Latch *latch, void *user_data);
 extern void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch);
+extern void RemoveWaitEvent(WaitEventSet *set, int pos, bool fd_closed);
+extern int WaitEventSetSize(WaitEventSet *set);
 
 extern int    WaitEventSetWait(WaitEventSet *set, long timeout,
                              WaitEvent *occurred_events, int nevents,
-- 
2.18.2

From 3fdb3f11d99f4efb8e88b3c8888b4092694f45c7 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Fri, 13 Mar 2020 12:00:23 +0900
Subject: [PATCH 05/11] Fix interface of PQregisterEventProc

The function returns false for all kinds of failures, so the caller
cannot identify the cause of a failure.  Change the return value from
int (effectively a bool) to an enum.
---
 src/interfaces/libpq/libpq-events.c | 34 ++++++++++++++++++++++-------
 src/interfaces/libpq/libpq-events.h | 15 +++++++++++--
 2 files changed, 39 insertions(+), 10 deletions(-)

diff --git a/src/interfaces/libpq/libpq-events.c b/src/interfaces/libpq/libpq-events.c
index d050d7f3f2..6b594d910a 100644
--- a/src/interfaces/libpq/libpq-events.c
+++ b/src/interfaces/libpq/libpq-events.c
@@ -36,20 +36,31 @@
  * The function returns a non-zero if successful.  If the function fails,
  * zero is returned.
  */
-int
+PGRegEventResult
 PQregisterEventProc(PGconn *conn, PGEventProc proc,
                     const char *name, void *passThrough)
 {
     int            i;
     PGEventRegister regevt;
 
-    if (!proc || !conn || !name || !*name)
-        return false;            /* bad arguments */
+    if (!conn)
+        return PGEVTREG_BADARG;
+
+    if (!proc || !name || !*name)
+    {
+        printfPQExpBuffer(&conn->errorMessage,
+                          "bad argument in PQregisterEventProc");
+        return PGEVTREG_BADARG;
+    }
 
     for (i = 0; i < conn->nEvents; i++)
     {
         if (conn->events[i].proc == proc)
-            return false;        /* already registered */
+        {
+            printfPQExpBuffer(&conn->errorMessage,
+                              "proc is already registered in PQregisterEventProc");
+            return PGEVTREG_ALREADY_REGISTERED;
+        }
     }
 
     if (conn->nEvents >= conn->eventArraySize)
@@ -64,7 +75,10 @@ PQregisterEventProc(PGconn *conn, PGEventProc proc,
             e = (PGEvent *) malloc(newSize * sizeof(PGEvent));
 
         if (!e)
-            return false;
+        {
+            printfPQExpBuffer(&conn->errorMessage, "out of memory");
+            return PGEVTREG_OUT_OF_MEMORY;
+        }
 
         conn->eventArraySize = newSize;
         conn->events = e;
@@ -73,7 +87,10 @@ PQregisterEventProc(PGconn *conn, PGEventProc proc,
     conn->events[conn->nEvents].proc = proc;
     conn->events[conn->nEvents].name = strdup(name);
     if (!conn->events[conn->nEvents].name)
-        return false;
+    {
+        printfPQExpBuffer(&conn->errorMessage, "out of memory");
+        return PGEVTREG_OUT_OF_MEMORY;
+    }
     conn->events[conn->nEvents].passThrough = passThrough;
     conn->events[conn->nEvents].data = NULL;
     conn->events[conn->nEvents].resultInitialized = false;
@@ -84,10 +101,11 @@ PQregisterEventProc(PGconn *conn, PGEventProc proc,
     {
         conn->nEvents--;
         free(conn->events[conn->nEvents].name);
-        return false;
+        printfPQExpBuffer(&conn->errorMessage, "proc rejected");
+        return PGEVTREG_PROC_REJECTED;
     }
 
-    return true;
+    return PGEVTREG_SUCCESS;
 }
 
 /*
diff --git a/src/interfaces/libpq/libpq-events.h b/src/interfaces/libpq/libpq-events.h
index 5108a55cf6..7b07537c11 100644
--- a/src/interfaces/libpq/libpq-events.h
+++ b/src/interfaces/libpq/libpq-events.h
@@ -23,6 +23,16 @@ extern "C"
 {
 #endif
 
+/* PQregisterEventProc return value */
+typedef enum
+{
+    PGEVTREG_SUCCESS,
+    PGEVTREG_BADARG,
+    PGEVTREG_ALREADY_REGISTERED,
+    PGEVTREG_OUT_OF_MEMORY,
+    PGEVTREG_PROC_REJECTED
+} PGRegEventResult;
+    
 /* Callback Event Ids */
 typedef enum
 {
@@ -69,8 +79,9 @@ typedef struct
 typedef int (*PGEventProc) (PGEventId evtId, void *evtInfo, void *passThrough);
 
 /* Registers an event proc with the given PGconn. */
-extern int    PQregisterEventProc(PGconn *conn, PGEventProc proc,
-                                const char *name, void *passThrough);
+extern PGRegEventResult    PQregisterEventProc(PGconn *conn, PGEventProc proc,
+                                            const char *name,
+                                            void *passThrough);
 
 /* Sets the PGconn instance data for the provided proc to data. */
 extern int    PQsetInstanceData(PGconn *conn, PGEventProc proc, void *data);
-- 
2.18.2

From 15013dfca167ec336ba05cdf4550204ee69cc8f5 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Fri, 13 Mar 2020 12:01:56 +0900
Subject: [PATCH 06/11] Add new libpq-event PGEVT_CONNDISCONNECTION

Add an event to detect closure of the underlying socket.
---
 src/interfaces/libpq/fe-connect.c   | 14 ++++++++++++++
 src/interfaces/libpq/libpq-events.h |  6 ++++++
 2 files changed, 20 insertions(+)

diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 0157c619aa..8de5304600 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -457,6 +457,8 @@ pgthreadlock_t pg_g_threadlock = default_threadlock;
 void
 pqDropConnection(PGconn *conn, bool flushInput)
 {
+    int i;
+
     /* Drop any SSL state */
     pqsecure_close(conn);
 
@@ -465,6 +467,18 @@ pqDropConnection(PGconn *conn, bool flushInput)
         closesocket(conn->sock);
     conn->sock = PGINVALID_SOCKET;
 
+    /* let any event procs take notice of the disconnection */
+    for (i = 0; i < conn->nEvents; i++)
+    {
+        PGEventConnDisconnection evt;
+
+        evt.conn = conn;
+
+        /* ignoring failure */
+        (void) conn->events[i].proc(PGEVT_CONNDISCONNECTION, &evt,
+                                    conn->events[i].passThrough);
+    }
+    
     /* Optionally discard any unread data */
     if (flushInput)
         conn->inStart = conn->inCursor = conn->inEnd = 0;
diff --git a/src/interfaces/libpq/libpq-events.h b/src/interfaces/libpq/libpq-events.h
index 7b07537c11..ffcd604b82 100644
--- a/src/interfaces/libpq/libpq-events.h
+++ b/src/interfaces/libpq/libpq-events.h
@@ -38,6 +38,7 @@ typedef enum
 {
     PGEVT_REGISTER,
     PGEVT_CONNRESET,
+    PGEVT_CONNDISCONNECTION,
     PGEVT_CONNDESTROY,
     PGEVT_RESULTCREATE,
     PGEVT_RESULTCOPY,
@@ -59,6 +60,11 @@ typedef struct
     PGconn       *conn;
 } PGEventConnDestroy;
 
+typedef struct
+{
+    PGconn       *conn;
+} PGEventConnDisconnection;
+
 typedef struct
 {
     PGconn       *conn;
-- 
2.18.2

From 00eb469bb537dfb134d36d79b93c337e92308557 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Fri, 13 Mar 2020 14:18:05 +0900
Subject: [PATCH 07/11] Reuse a WaitEventSet in libpqwalreceiver.c.

To avoid repeatedly setting up and tearing down WaitEventSet objects
and associated kernel objects, reuse a WaitEventSet.  Export a wait
function that is smart enough to handle socket changes under the
covers, and then use that for physical and logical replication.
---
 .../libpqwalreceiver/libpqwalreceiver.c       | 157 ++++++++++++++----
 src/include/replication/walreceiver.h         |   5 +
 2 files changed, 127 insertions(+), 35 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index e4fd1f9bb6..319d5b970d 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -23,6 +23,7 @@
 #include "catalog/pg_type.h"
 #include "funcapi.h"
 #include "libpq-fe.h"
+#include "libpq-events.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -41,6 +42,10 @@ struct WalReceiverConn
 {
     /* Current connection to the primary, if any */
     PGconn       *streamConn;
+    /* Wait event set used to wait for I/O */
+    WaitEventSet *wes;
+    /* Used to handle changes in the underlying socket */
+    int            wes_socket_position;
     /* Used to remember if the connection is logical or physical */
     bool        logical;
     /* Buffer for currently read records */
@@ -80,6 +85,7 @@ static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
                                        const int nRetTypes,
                                        const Oid *retTypes);
 static void libpqrcv_disconnect(WalReceiverConn *conn);
+static int libpqrcv_wait(WalReceiverConn *conn, long timeout, int wait_event);
 
 static WalReceiverFunctionsType PQWalReceiverFunctions = {
     libpqrcv_connect,
@@ -96,13 +102,20 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
     libpqrcv_create_slot,
     libpqrcv_get_backend_pid,
     libpqrcv_exec,
-    libpqrcv_disconnect
+    libpqrcv_disconnect,
+    libpqrcv_wait
 };
 
 /* Prototypes for private functions */
-static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
-static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
+static int walrcv_libpqcallback(PGEventId evtId, void *evtInfo,
+                                void *passThrough);
+static PGresult *libpqrcv_PQexec(WalReceiverConn *conn, const char *query);
+static PGresult *libpqrcv_PQgetResult(WalReceiverConn *conn);
 static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
+static int libpqrcv_wait_for_socket(WalReceiverConn *conn, int io_flags,
+                                    long timeout,
+                                    WaitEvent *occurred_events,
+                                    WaitEventClient event_type);
 
 /*
  * Module initialization function
@@ -115,6 +128,27 @@ _PG_init(void)
     WalReceiverFunctions = &PQWalReceiverFunctions;
 }
 
+static int
+walrcv_libpqcallback(PGEventId evtId, void *evtInfo, void *passThrough)
+{
+    PGconn                       *conn PG_USED_FOR_ASSERTS_ONLY;
+    WalReceiverConn               *walrcvconn = (WalReceiverConn *) passThrough;
+
+    /* return if not interested in the event or nothing to do */
+    if (evtId != PGEVT_CONNDISCONNECTION ||
+        walrcvconn->wes == NULL || walrcvconn->wes_socket_position < 0)
+        return true;
+
+    conn = ((PGEventConnDisconnection *)evtInfo)->conn;
+    Assert(walrcvconn->streamConn == conn);
+
+    /* The socket is already closed. */
+    RemoveWaitEvent(walrcvconn->wes, walrcvconn->wes_socket_position, true);
+    walrcvconn->wes_socket_position = -1;
+
+    return true;
+}
+
 /*
  * Establish the connection to the primary server for XLOG streaming
  *
@@ -129,6 +163,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
     const char *keys[5];
     const char *vals[5];
     int            i = 0;
+    PGRegEventResult regresult;
 
     /*
      * We use the expand_dbname parameter to process the connection string (or
@@ -168,6 +203,23 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
         return NULL;
     }
 
+    /* Create a WaitEventSet that will last as long as the connection. */
+    Assert(conn->streamConn);
+    regresult = PQregisterEventProc(conn->streamConn, walrcv_libpqcallback,
+                                    "libpqwalrcv disconnect callback", conn);
+    if (regresult != PGEVTREG_SUCCESS)
+    {
+        *err = pchomp(PQerrorMessage(conn->streamConn));
+        return NULL;
+    }
+        
+    conn->wes = CreateWaitEventSet(TopMemoryContext, 3);
+    conn->wes_socket_position =
+        AddWaitEventToSet(conn->wes, WL_SOCKET_READABLE,
+                          PQsocket(conn->streamConn), NULL, NULL);
+    AddWaitEventToSet(conn->wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
+    AddWaitEventToSet(conn->wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, NULL, NULL);
+
     /*
      * Poll connection until we have OK or FAILED status.
      *
@@ -177,7 +229,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
     do
     {
         int            io_flag;
-        int            rc;
+        WaitEvent    event;
 
         if (status == PGRES_POLLING_READING)
             io_flag = WL_SOCKET_READABLE;
@@ -189,21 +241,18 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
         else
             io_flag = WL_SOCKET_WRITEABLE;
 
-        rc = WaitLatchOrSocket(MyLatch,
-                               WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
-                               PQsocket(conn->streamConn),
-                               0,
-                               WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
+        (void) libpqrcv_wait_for_socket(conn, io_flag, -1, &event,
+                                        WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
 
         /* Interrupted? */
-        if (rc & WL_LATCH_SET)
+        if (event.events == WL_LATCH_SET)
         {
             ResetLatch(MyLatch);
             ProcessWalRcvInterrupts();
         }
 
         /* If socket is ready, advance the libpq state machine */
-        if (rc & io_flag)
+        if (event.events == io_flag)
             status = PQconnectPoll(conn->streamConn);
     } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
 
@@ -322,7 +371,7 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
      * Get the system identifier and timeline ID as a DataRow message from the
      * primary server.
      */
-    res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
+    res = libpqrcv_PQexec(conn, "IDENTIFY_SYSTEM");
     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     {
         PQclear(res);
@@ -431,7 +480,7 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
                          options->proto.physical.startpointTLI);
 
     /* Start streaming. */
-    res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+    res = libpqrcv_PQexec(conn, cmd.data);
     pfree(cmd.data);
 
     if (PQresultStatus(res) == PGRES_COMMAND_OK)
@@ -479,7 +528,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
      * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
      * also possible in case we aborted the copy in mid-stream.
      */
-    res = libpqrcv_PQgetResult(conn->streamConn);
+    res = libpqrcv_PQgetResult(conn);
     if (PQresultStatus(res) == PGRES_TUPLES_OK)
     {
         /*
@@ -493,7 +542,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
         PQclear(res);
 
         /* the result set should be followed by CommandComplete */
-        res = libpqrcv_PQgetResult(conn->streamConn);
+        res = libpqrcv_PQgetResult(conn);
     }
     else if (PQresultStatus(res) == PGRES_COPY_OUT)
     {
@@ -506,7 +555,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
                             pchomp(PQerrorMessage(conn->streamConn)))));
 
         /* CommandComplete should follow */
-        res = libpqrcv_PQgetResult(conn->streamConn);
+        res = libpqrcv_PQgetResult(conn);
     }
 
     if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -516,7 +565,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
     PQclear(res);
 
     /* Verify that there are no more results */
-    res = libpqrcv_PQgetResult(conn->streamConn);
+    res = libpqrcv_PQgetResult(conn);
     if (res != NULL)
         ereport(ERROR,
                 (errmsg("unexpected result after CommandComplete: %s",
@@ -540,7 +589,7 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
      * Request the primary to send over the history file for given timeline.
      */
     snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
-    res = libpqrcv_PQexec(conn->streamConn, cmd);
+    res = libpqrcv_PQexec(conn, cmd);
     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     {
         PQclear(res);
@@ -582,8 +631,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
  * May return NULL, rather than an error result, on failure.
  */
 static PGresult *
-libpqrcv_PQexec(PGconn *streamConn, const char *query)
+libpqrcv_PQexec(WalReceiverConn *conn, const char *query)
 {
+    PGconn       *streamConn = conn->streamConn;
     PGresult   *lastResult = NULL;
 
     /*
@@ -606,7 +656,7 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
         /* Wait for, and collect, the next PGresult. */
         PGresult   *result;
 
-        result = libpqrcv_PQgetResult(streamConn);
+        result = libpqrcv_PQgetResult(conn);
         if (result == NULL)
             break;                /* query is complete, or failure */
 
@@ -631,30 +681,28 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
  * Perform the equivalent of PQgetResult(), but watch for interrupts.
  */
 static PGresult *
-libpqrcv_PQgetResult(PGconn *streamConn)
+libpqrcv_PQgetResult(WalReceiverConn *conn)
 {
+    PGconn       *streamConn = conn->streamConn;
+
     /*
      * Collect data until PQgetResult is ready to get the result without
      * blocking.
      */
     while (PQisBusy(streamConn))
     {
-        int            rc;
+        WaitEvent    event;
 
         /*
          * We don't need to break down the sleep into smaller increments,
          * since we'll get interrupted by signals and can handle any
          * interrupts here.
          */
-        rc = WaitLatchOrSocket(MyLatch,
-                               WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
-                               WL_LATCH_SET,
-                               PQsocket(streamConn),
-                               0,
-                               WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
+        (void) libpqrcv_wait_for_socket(conn, WL_SOCKET_READABLE, -1, &event,
+                                        WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 
         /* Interrupted? */
-        if (rc & WL_LATCH_SET)
+        if (event.events == WL_LATCH_SET)
         {
             ResetLatch(MyLatch);
             ProcessWalRcvInterrupts();
@@ -681,9 +729,27 @@ libpqrcv_disconnect(WalReceiverConn *conn)
     PQfinish(conn->streamConn);
     if (conn->recvBuf != NULL)
         PQfreemem(conn->recvBuf);
+    FreeWaitEventSet(conn->wes);
     pfree(conn);
 }
 
+/*
+ * Wait for new data to arrive, or a timeout.
+ */
+static int
+libpqrcv_wait(WalReceiverConn *conn, long timeout, int wait_event)
+{
+    WaitEvent    event;
+    int            rc;
+
+    rc = libpqrcv_wait_for_socket(conn, WL_SOCKET_READABLE,
+                                  timeout, &event, wait_event);
+    if (rc == 0)
+        return WL_TIMEOUT;
+
+    return event.events;
+}
+
 /*
  * Receive a message available from XLOG stream.
  *
@@ -701,8 +767,7 @@ libpqrcv_disconnect(WalReceiverConn *conn)
  * ereports on error.
  */
 static int
-libpqrcv_receive(WalReceiverConn *conn, char **buffer,
-                 pgsocket *wait_fd)
+libpqrcv_receive(WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)
 {
     int            rawlen;
 
@@ -733,13 +798,13 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
     {
         PGresult   *res;
 
-        res = libpqrcv_PQgetResult(conn->streamConn);
+        res = libpqrcv_PQgetResult(conn);
         if (PQresultStatus(res) == PGRES_COMMAND_OK)
         {
             PQclear(res);
 
             /* Verify that there are no more results. */
-            res = libpqrcv_PQgetResult(conn->streamConn);
+            res = libpqrcv_PQgetResult(conn);
             if (res != NULL)
             {
                 PQclear(res);
@@ -839,7 +904,7 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
         appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
     }
 
-    res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+    res = libpqrcv_PQexec(conn, cmd.data);
     pfree(cmd.data);
 
     if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -963,7 +1028,7 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                  errmsg("the query interface requires a database connection")));
 
-    pgres = libpqrcv_PQexec(conn->streamConn, query);
+    pgres = libpqrcv_PQexec(conn, query);
 
     switch (PQresultStatus(pgres))
     {
@@ -1047,3 +1112,25 @@ stringlist_to_identifierstr(PGconn *conn, List *strings)
 
     return res.data;
 }
+
+/*
+ * Update our WaitEventSet so that we can wait for 'io_flags' on our socket,
+ * considering that the socket might have changed.
+ */
+static int
+libpqrcv_wait_for_socket(WalReceiverConn *conn, int io_flags, long timeout,
+                         WaitEvent *occurred_events,
+                         WaitEventClient event_type)
+{
+    /* the socket event was removed on disconnection; add a new one if needed */
+    if (conn->wes_socket_position < 0)
+        conn->wes_socket_position =
+            AddWaitEventToSet(conn->wes, io_flags,
+                              PQsocket(conn->streamConn), NULL, NULL);
+    else
+        /* just set to wait for the right event */
+        ModifyWaitEvent(conn->wes, conn->wes_socket_position,
+                        io_flags, NULL);
+
+    return WaitEventSetWait(conn->wes, timeout, occurred_events, 1, event_type);
+}
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index e08afc6548..115d6acf18 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -239,6 +239,8 @@ typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn,
                                              const int nRetTypes,
                                              const Oid *retTypes);
 typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
+typedef int (*walrcv_wait_fn) (WalReceiverConn *conn, long timeout,
+                               int wait_event);
 
 typedef struct WalReceiverFunctionsType
 {
@@ -257,6 +259,7 @@ typedef struct WalReceiverFunctionsType
     walrcv_get_backend_pid_fn walrcv_get_backend_pid;
     walrcv_exec_fn walrcv_exec;
     walrcv_disconnect_fn walrcv_disconnect;
+    walrcv_wait_fn walrcv_wait;
 } WalReceiverFunctionsType;
 
 extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
@@ -291,6 +294,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
     WalReceiverFunctions->walrcv_exec(conn, exec, nRetTypes, retTypes)
 #define walrcv_disconnect(conn) \
     WalReceiverFunctions->walrcv_disconnect(conn)
+#define walrcv_wait(conn, timeout, wait_event) \
+    WalReceiverFunctions->walrcv_wait(conn, timeout, wait_event)
 
 static inline void
 walrcv_clear_result(WalRcvExecResult *walres)
-- 
2.18.2

From df330045ef8b7a303f01b38c90e5d1805a71acac Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Tue, 25 Feb 2020 15:34:12 +1300
Subject: [PATCH 08/11] Use a WaitEventSet for postgres_fdw.

The same WaitEventSet object will be reused for the life of the
backend.

Author: Thomas Munro
Discussion: https://postgr.es/m/CA%2BhUKGJAC4Oqao%3DqforhNey20J8CiG2R%3DoBPqvfR0vOJrFysGw%40mail.gmail.com
---
 contrib/postgres_fdw/connection.c | 98 +++++++++++++++++++++++++++----
 1 file changed, 88 insertions(+), 10 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index e45647f3ea..0397a16702 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -65,6 +65,12 @@ typedef struct ConnCacheEntry
  */
 static HTAB *ConnectionHash = NULL;
 
+/* Reusable WaitEventSet. */
+static WaitEventSet *ConnectionWaitSet = NULL;
+static int64 ConnectionWaitSetSocketChangeCount = -1;
+static PGconn *ConnectionWaitSetConn = NULL;
+static int ConnectionWaitSetPosition = -1;
+
 /* for assigning cursor numbers and prepared statement numbers */
 static unsigned int cursor_number = 0;
 static unsigned int prep_stmt_number = 0;
@@ -92,6 +98,7 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
 static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
                                      PGresult **result);
 static bool UserMappingPasswordRequired(UserMapping *user);
+static int pgfdw_wait_for_socket(PGconn *conn, long timeout);
 
 /*
  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
@@ -115,6 +122,17 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
     {
         HASHCTL        ctl;
 
+        /*
+         * We'll use a single WaitEventSet for the lifetime of this backend,
+         * and add and remove sockets as appropriate.  Only one socket will
+         * be in it at a time.
+         */
+        Assert(ConnectionWaitSet == NULL);
+        ConnectionWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
+        AddWaitEventToSet(ConnectionWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
+        AddWaitEventToSet(ConnectionWaitSet, WL_EXIT_ON_PM_DEATH, -1, NULL,
+                          NULL);
+
         MemSet(&ctl, 0, sizeof(ctl));
         ctl.keysize = sizeof(ConnCacheKey);
         ctl.entrysize = sizeof(ConnCacheEntry);
@@ -344,6 +362,14 @@ disconnect_pg_server(ConnCacheEntry *entry)
     if (entry->conn != NULL)
     {
         PQfinish(entry->conn);
+        if (ConnectionWaitSetConn == entry->conn)
+        {
+            /* We do this after PQfinish, so we know the socket is closed. */
+            RemoveWaitEvent(ConnectionWaitSet,
+                            ConnectionWaitSetPosition,
+                            true);
+            ConnectionWaitSetConn = NULL;
+        }
         entry->conn = NULL;
         ReleaseExternalFD();
     }
@@ -603,11 +629,7 @@ pgfdw_get_result(PGconn *conn, const char *query)
                 int            wc;
 
                 /* Sleep until there's something to do */
-                wc = WaitLatchOrSocket(MyLatch,
-                                       WL_LATCH_SET | WL_SOCKET_READABLE |
-                                       WL_EXIT_ON_PM_DEATH,
-                                       PQsocket(conn),
-                                       -1L, PG_WAIT_EXTENSION);
+                wc = pgfdw_wait_for_socket(conn, -1);
                 ResetLatch(MyLatch);
 
                 CHECK_FOR_INTERRUPTS();
@@ -1207,11 +1229,7 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
                 cur_timeout = Min(60000, secs * USECS_PER_SEC + microsecs);
 
                 /* Sleep until there's something to do */
-                wc = WaitLatchOrSocket(MyLatch,
-                                       WL_LATCH_SET | WL_SOCKET_READABLE |
-                                       WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
-                                       PQsocket(conn),
-                                       cur_timeout, PG_WAIT_EXTENSION);
+                wc = pgfdw_wait_for_socket(conn, cur_timeout);
                 ResetLatch(MyLatch);
 
                 CHECK_FOR_INTERRUPTS();
@@ -1250,3 +1268,63 @@ exit:    ;
         *result = last_res;
     return timed_out;
 }
+
+static int
+pgfdw_wait_for_socket(PGconn *conn, long timeout)
+{
+    WaitEvent    event;
+    int            rc;
+
+    /* If a different conn is in the set, or the socket changed, remove. */
+    if (ConnectionWaitSetConn)
+    {
+        bool socket_changed =
+            (ConnectionWaitSetSocketChangeCount !=
+            PQsocketChangeCount(ConnectionWaitSetConn));
+
+        if (ConnectionWaitSetConn == conn)
+        {
+            /*
+             * This connection is already in there, but the socket might have
+             * changed.  If so, remove it.
+             */
+            if (socket_changed)
+            {
+                RemoveWaitEvent(ConnectionWaitSet,
+                                ConnectionWaitSetPosition,
+                                true);
+                ConnectionWaitSetConn = NULL;
+            }
+        }
+        else
+        {
+            /*
+             * A different connection is in there.  Remove it, being careful
+             * to report whether the socket was already closed (this affects
+             * whether we unregister the fd with the kernel).
+             */
+            RemoveWaitEvent(ConnectionWaitSet,
+                            ConnectionWaitSetPosition,
+                            socket_changed);
+            ConnectionWaitSetConn = NULL;
+        }
+    }
+
+    /* Do we need to add our connection? */
+    if (ConnectionWaitSetConn == NULL)
+    {
+        ConnectionWaitSetPosition =
+            AddWaitEventToSet(ConnectionWaitSet, WL_SOCKET_READABLE,
+                              PQsocket(conn), NULL, NULL);
+        ConnectionWaitSetConn = conn;
+        ConnectionWaitSetSocketChangeCount = PQsocketChangeCount(conn);
+    }
+
+    /* Finally, we can wait. */
+    rc = WaitEventSetWait(ConnectionWaitSet, timeout, &event, 1,
+                          PG_WAIT_EXTENSION);
+    if (rc == 0)
+        return WL_TIMEOUT;
+
+    return event.events;
+}
-- 
2.18.2

From 79d9ba58f686462bebe035e27c5439a96b04a662 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Mon, 24 Feb 2020 23:51:09 +1300
Subject: [PATCH 09/11] Use WL_EXIT_ON_PM_DEATH in FeBeWaitSet.

Previously, we'd give either a FATAL message or a silent exit() when
we detected postmaster death, depending on which wait point we were at.
Make the exit more uniform, by using the standard exit facility.

Author: Thomas Munro
Discussion: https://postgr.es/m/CA%2BhUKGJAC4Oqao%3DqforhNey20J8CiG2R%3DoBPqvfR0vOJrFysGw%40mail.gmail.com
---
 src/backend/libpq/be-secure.c | 28 ----------------------------
 src/backend/libpq/pqcomm.c    |  2 +-
 2 files changed, 1 insertion(+), 29 deletions(-)

diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c
index 2ae507a902..aec0926d93 100644
--- a/src/backend/libpq/be-secure.c
+++ b/src/backend/libpq/be-secure.c
@@ -184,28 +184,6 @@ retry:
         WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1,
                          WAIT_EVENT_CLIENT_READ);
 
-        /*
-         * If the postmaster has died, it's not safe to continue running,
-         * because it is the postmaster's job to kill us if some other backend
-         * exits uncleanly.  Moreover, we won't run very well in this state;
-         * helper processes like walwriter and the bgwriter will exit, so
-         * performance may be poor.  Finally, if we don't exit, pg_ctl will be
-         * unable to restart the postmaster without manual intervention, so no
-         * new connections can be accepted.  Exiting clears the deck for a
-         * postmaster restart.
-         *
-         * (Note that we only make this check when we would otherwise sleep on
-         * our latch.  We might still continue running for a while if the
-         * postmaster is killed in mid-query, or even through multiple queries
-         * if we never have to wait for read.  We don't want to burn too many
-         * cycles checking for this very rare condition, and this should cause
-         * us to exit quickly in most cases.)
-         */
-        if (event.events & WL_POSTMASTER_DEATH)
-            ereport(FATAL,
-                    (errcode(ERRCODE_ADMIN_SHUTDOWN),
-                     errmsg("terminating connection due to unexpected postmaster exit")));
-
         /* Handle interrupt. */
         if (event.events & WL_LATCH_SET)
         {
@@ -296,12 +274,6 @@ retry:
         WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1,
                          WAIT_EVENT_CLIENT_WRITE);
 
-        /* See comments in secure_read. */
-        if (event.events & WL_POSTMASTER_DEATH)
-            ereport(FATAL,
-                    (errcode(ERRCODE_ADMIN_SHUTDOWN),
-                     errmsg("terminating connection due to unexpected postmaster exit")));
-
         /* Handle interrupt. */
         if (event.events & WL_LATCH_SET)
         {
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 7717bb2719..5422175185 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -222,7 +222,7 @@ pq_init(void)
     AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
                       NULL, NULL);
     AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
-    AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL);
+    AddWaitEventToSet(FeBeWaitSet, WL_EXIT_ON_PM_DEATH, -1, NULL, NULL);
 }
 
 /* --------------------------------
-- 
2.18.2
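
The behavioural change is easiest to see as a before/after fragment
(a sketch, not code from the patch; "set" and "wait_event_info" are
stand-ins for whatever the wait point actually uses):

    /* Before: every wait point had to react to postmaster death itself. */
    AddWaitEventToSet(set, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
    WaitEventSetWait(set, -1, &event, 1, wait_event_info);
    if (event.events & WL_POSTMASTER_DEATH)
        ereport(FATAL,
                (errcode(ERRCODE_ADMIN_SHUTDOWN),
                 errmsg("terminating connection due to unexpected postmaster exit")));

    /*
     * After: latch.c itself performs a uniform proc_exit(1) when the
     * postmaster goes away, so the wait point needs no check at all.
     */
    AddWaitEventToSet(set, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, NULL, NULL);
    WaitEventSetWait(set, -1, &event, 1, wait_event_info);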

From 779a255cd97ceb52f138d7a00034dc6d774881de Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Mon, 24 Feb 2020 23:48:29 +1300
Subject: [PATCH 10/11] Use FeBeWaitSet for walsender.c.

Author: Thomas Munro
Discussion: https://postgr.es/m/CA%2BhUKGJAC4Oqao%3DqforhNey20J8CiG2R%3DoBPqvfR0vOJrFysGw%40mail.gmail.com
---
 src/backend/replication/walsender.c | 32 ++++++++++++++---------------
 1 file changed, 15 insertions(+), 17 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3f74bc8493..2beedc0f1a 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1241,7 +1241,7 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
     /* If we have pending write here, go to slow path */
     for (;;)
     {
-        int            wakeEvents;
+        WaitEvent    event;
         long        sleeptime;
 
         /* Check for input from the client */
@@ -1258,13 +1258,11 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 
         sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
 
-        wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
-            WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
-
         /* Sleep until something happens or we time out */
-        (void) WaitLatchOrSocket(MyLatch, wakeEvents,
-                                 MyProcPort->sock, sleeptime,
-                                 WAIT_EVENT_WAL_SENDER_WRITE_DATA);
+        ModifyWaitEvent(FeBeWaitSet, 0,
+                        WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE, NULL);
+        (void) WaitEventSetWait(FeBeWaitSet, sleeptime, &event, 1,
+                                WAIT_EVENT_WAL_SENDER_WRITE_DATA);
 
         /* Clear any already-pending wakeups */
         ResetLatch(MyLatch);
@@ -1348,6 +1346,7 @@ WalSndWaitForWal(XLogRecPtr loc)
 
     for (;;)
     {
+        WaitEvent    event;
         long        sleeptime;
 
         /* Clear any already-pending wakeups */
@@ -1441,15 +1440,14 @@ WalSndWaitForWal(XLogRecPtr loc)
          */
         sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
 
-        wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
-            WL_SOCKET_READABLE | WL_TIMEOUT;
+        wakeEvents = WL_SOCKET_READABLE;
 
         if (pq_is_send_pending())
             wakeEvents |= WL_SOCKET_WRITEABLE;
 
-        (void) WaitLatchOrSocket(MyLatch, wakeEvents,
-                                 MyProcPort->sock, sleeptime,
-                                 WAIT_EVENT_WAL_SENDER_WAIT_WAL);
+        ModifyWaitEvent(FeBeWaitSet, 0, wakeEvents, NULL);
+        (void) WaitEventSetWait(FeBeWaitSet, sleeptime, &event, 1,
+                                WAIT_EVENT_WAL_SENDER_WAIT_WAL);
     }
 
     /* reactivate latch so WalSndLoop knows to continue */
@@ -2290,9 +2288,9 @@ WalSndLoop(WalSndSendDataCallback send_data)
         {
             long        sleeptime;
             int            wakeEvents;
+            WaitEvent    event;
 
-            wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT |
-                WL_SOCKET_READABLE;
+            wakeEvents = WL_SOCKET_READABLE;
 
             /*
              * Use fresh timestamp, not last_processing, to reduce the chance
@@ -2304,9 +2302,9 @@ WalSndLoop(WalSndSendDataCallback send_data)
                 wakeEvents |= WL_SOCKET_WRITEABLE;
 
             /* Sleep until something happens or we time out */
-            (void) WaitLatchOrSocket(MyLatch, wakeEvents,
-                                     MyProcPort->sock, sleeptime,
-                                     WAIT_EVENT_WAL_SENDER_MAIN);
+            ModifyWaitEvent(FeBeWaitSet, 0, wakeEvents, NULL);
+            (void) WaitEventSetWait(FeBeWaitSet, sleeptime, &event, 1,
+                                    WAIT_EVENT_WAL_SENDER_MAIN);
         }
     }
 }
-- 
2.18.2
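
One coupling worth calling out in this patch: the
ModifyWaitEvent(FeBeWaitSet, 0, ...) calls hard-code event position 0
for the client socket, which is only correct because pq_init()
registers the socket event before the latch and postmaster-death
events.  A named constant would make that dependency explicit; a
sketch (the constant is hypothetical, not something the patch adds):

    /* Hypothetical name for the socket's position within FeBeWaitSet. */
    #define FeBeWaitSetSocketPos    0

    ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, wakeEvents, NULL);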

From 0fd5c662953035a69bc90de796e82dae284284d0 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Tue, 21 Jan 2020 12:54:11 +1300
Subject: [PATCH 11/11] Introduce a WaitEventSet for the stats collector.

Author: Thomas Munro
Discussion: https://postgr.es/m/CA%2BhUKGJAC4Oqao%3DqforhNey20J8CiG2R%3DoBPqvfR0vOJrFysGw%40mail.gmail.com
---
 src/backend/postmaster/pgstat.c | 24 ++++++++++++++----------
 1 file changed, 14 insertions(+), 10 deletions(-)

diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 107c965336..248ddc0bdd 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -4429,6 +4429,8 @@ PgstatCollectorMain(int argc, char *argv[])
     int            len;
     PgStat_Msg    msg;
     int            wr;
+    WaitEvent    event;
+    WaitEventSet *wes;
 
     /*
      * Ignore all signals usually bound to some action in the postmaster,
@@ -4458,6 +4460,12 @@ PgstatCollectorMain(int argc, char *argv[])
     pgStatRunningInCollector = true;
     pgStatDBHash = pgstat_read_statsfiles(InvalidOid, true, true);
 
+    /* Prepare to wait for our latch or data in our socket. */
+    wes = CreateWaitEventSet(CurrentMemoryContext, 3);
+    AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
+    AddWaitEventToSet(wes, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
+    AddWaitEventToSet(wes, WL_SOCKET_READABLE, pgStatSock, NULL, NULL);
+
     /*
      * Loop to process messages until we get SIGQUIT or detect ungraceful
      * death of our parent postmaster.
@@ -4636,10 +4644,7 @@ PgstatCollectorMain(int argc, char *argv[])
 
         /* Sleep until there's something to do */
 #ifndef WIN32
-        wr = WaitLatchOrSocket(MyLatch,
-                               WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_SOCKET_READABLE,
-                               pgStatSock, -1L,
-                               WAIT_EVENT_PGSTAT_MAIN);
+        wr = WaitEventSetWait(wes, -1L, &event, 1, WAIT_EVENT_PGSTAT_MAIN);
 #else
 
         /*
@@ -4652,18 +4657,15 @@ PgstatCollectorMain(int argc, char *argv[])
          * to not provoke "using stale statistics" complaints from
          * backend_read_statsfile.
          */
-        wr = WaitLatchOrSocket(MyLatch,
-                               WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_SOCKET_READABLE | WL_TIMEOUT,
-                               pgStatSock,
-                               2 * 1000L /* msec */ ,
-                               WAIT_EVENT_PGSTAT_MAIN);
+        wr = WaitEventSetWait(wes, 2 * 1000L /* msec */, &event, 1,
+                              WAIT_EVENT_PGSTAT_MAIN);
 #endif
 
         /*
          * Emergency bailout if postmaster has died.  This is to avoid the
          * necessity for manual cleanup of all postmaster children.
          */
-        if (wr & WL_POSTMASTER_DEATH)
+        if (wr == 1 && event.events == WL_POSTMASTER_DEATH)
             break;
     }                            /* end of outer loop */
 
@@ -4672,6 +4674,8 @@ PgstatCollectorMain(int argc, char *argv[])
      */
     pgstat_write_statsfiles(true, true);
 
+    FreeWaitEventSet(wes);
+
     exit(0);
 }
 
-- 
2.18.2
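
A note on the changed postmaster-death test in this patch:
WaitLatchOrSocket() returns an OR of WL_* flags, whereas
WaitEventSetWait() returns the number of events it stored in the
caller's array (zero on timeout) and reports the flags in the
WaitEvent itself, hence the new form of the check.  Side by side:

    /* Old interface: the return value is a bitmask of WL_* flags. */
    if (wr & WL_POSTMASTER_DEATH)
        break;

    /*
     * New interface: the return value counts returned events; with
     * nevents = 1, inspect the single WaitEvent that was filled in.
     */
    if (wr == 1 && event.events == WL_POSTMASTER_DEATH)
        break;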

