Re: Escaping from blocked send() reprised. - Mailing list pgsql-hackers

From Kyotaro HORIGUCHI
Subject Re: Escaping from blocked send() reprised.
Date
Msg-id 20140828.214704.93968088.horiguchi.kyotaro@lab.ntt.co.jp
In response to Re: Escaping from blocked send() reprised.  (Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>)
Responses Re: Escaping from blocked send() reprised.
Re: Escaping from blocked send() reprised.
List pgsql-hackers
Hello, sorry for the dazed reply in the previous mail.

I made a revised patch for this issue.

The attached patches are the following:

- 0001_Revise_socket_emulation_for_win32_backend.patch
 Revises socket emulation on win32 backend so that each socket can have its own blocking mode state.

- 0002_Allow_backend_termination_during_write_blocking.patch
 The patch that solves the issue. It depends on the 0001_ patch.

==========

> > I'm marking this as Waiting on Author in the commitfest app, because:
> > 1. the protocol violation needs to be avoided one way or another, and
> > 2. the behavior needs to be consistent so that a single
> > pg_terminate_backend() is enough to always kill the connection.

- Preventing protocol violation.
 To prevent a protocol violation, secure_write sets ClientConnectionLost when SIGTERM is detected, and internal_flush() and
 ProcessInterrupts() then honor that flag.
 

- A single pg_terminate_backend() reliably kills the backend.
 secure_raw_write() uses a non-blocking socket and a loop of select() with a timeout to reliably detect a received signal
 (SIGTERM).
 To avoid frequently switching the blocking mode, the bare socket for the Port is put into non-blocking mode from the start
 in StreamConnection(), and the blocking mode is controlled only by Port->noblock in secure_raw_read/write().

 To keep the code mentioned above (patch 0002) tidy, the socket emulation code for win32 backends is rewritten so that each
 socket can have its own non-blocking state (patch 0001).
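The select()-with-timeout approach above can be sketched in plain POSIX C. This is a minimal illustration under stated assumptions, not the patch's code. `die_pending` and `connection_lost` are hypothetical stand-ins for ProcDiePending and ClientConnectionLost. `set_nonblocking()` and `send_blocking_emulated()` are hypothetical names playing the roles of pg_set_noblock() and secure_raw_write(). Unlike the patch, the sketch re-checks the flag before every send().

```c
#define _POSIX_C_SOURCE 200809L

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdbool.h>
#include <stddef.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>

/* Hypothetical stand-ins for ProcDiePending and ClientConnectionLost. */
static volatile sig_atomic_t die_pending = 0;
static volatile sig_atomic_t connection_lost = 0;

/* SIGTERM handler: only record the request. */
static void
handle_sigterm(int signo)
{
    (void) signo;
    die_pending = 1;
}

/* Put the socket into non-blocking mode once, up front. */
static bool
set_nonblocking(int sock)
{
    int flags = fcntl(sock, F_GETFL);

    return flags >= 0 && fcntl(sock, F_SETFL, flags | O_NONBLOCK) != -1;
}

/*
 * Emulate a blocking send() on a non-blocking socket.  select() waits with a
 * 1-second timeout, so a SIGTERM that arrives just after die_pending was
 * checked is still noticed within about one second, instead of only when a
 * second SIGTERM arrives.
 */
static ssize_t
send_blocking_emulated(int sock, const void *buf, size_t len)
{
    assert(sock >= 0 && sock < FD_SETSIZE);    /* select() limit on POSIX */

    for (;;)
    {
        fd_set wfds;
        struct timeval tv;
        ssize_t ret;

        if (die_pending)
        {
            /* Suppress any further output and tell callers not to retry. */
            connection_lost = 1;
            errno = ECONNRESET;
            return -1;
        }

        ret = send(sock, buf, len, 0);
        if (ret >= 0)
            return ret;
        if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR)
            return -1;          /* hard error: let the caller report it */

        /* No room in the send buffer yet: wait, but wake up every second. */
        FD_ZERO(&wfds);
        FD_SET(sock, &wfds);
        tv.tv_sec = 1;
        tv.tv_usec = 0;
        if (select(sock + 1, NULL, &wfds, NULL, &tv) < 0 && errno != EINTR)
            return -1;
        /* Timeout, EINTR, or writable: loop and re-check die_pending. */
    }
}
```

Setting errno to ECONNRESET mirrors how the patch tells its caller not to retry. The one-second timeout is the upper bound on how late a pending SIGTERM is noticed.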
 

Some concern about this patch,

- This patch limits the number of non-blocking sockets on a win32 backend to fewer than 64 (FD_SETSIZE), which seems to be
 sufficient.

- This patch introduces redundant socket emulation for win32 backends, but the win32 bare socket for the Port is already
 non-blocking as described above, so it doesn't seem to be a serious performance problem. In addition, since I don't know
 why win32/socket.c provides blocking-mode socket emulation, I decided to keep the blocking socket emulation in
 win32/socket.c. Possibly it can be removed.
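The per-socket state from patch 0001 and the FD_SETSIZE concern can be illustrated with a small portable sketch. The sketch assumes that a Winsock fd_set behaves like an array of at most FD_SETSIZE handles (64 by default), and it models that array directly. `socket_t`, `MAX_NONBLOCK_SOCKETS`, `set_nonblock()`, `is_nonblocking()` and `forget_socket()` are hypothetical names, not the patch's functions. Where the patch raises FATAL once the set fills up, this sketch only reports failure.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Winsock's default FD_SETSIZE; a hypothetical name for this sketch. */
#define MAX_NONBLOCK_SOCKETS 64

typedef int socket_t;           /* hypothetical stand-in for SOCKET */

/* Sockets currently in "emulated non-blocking" mode, like a Winsock fd_set. */
static socket_t nonblock_socks[MAX_NONBLOCK_SOCKETS];
static size_t nonblock_count = 0;

static bool
is_nonblocking(socket_t s)
{
    for (size_t i = 0; i < nonblock_count; i++)
        if (nonblock_socks[i] == s)
            return true;
    return false;
}

/*
 * Record (nonblock = true) or clear the per-socket flag.  Returns false only
 * when a new entry does not fit; the patch raises FATAL in that situation.
 */
static bool
set_nonblock(socket_t s, bool nonblock)
{
    assert(nonblock_count <= MAX_NONBLOCK_SOCKETS);    /* table invariant */

    for (size_t i = 0; i < nonblock_count; i++)
    {
        if (nonblock_socks[i] == s)
        {
            if (!nonblock)      /* remove by moving the last entry here */
                nonblock_socks[i] = nonblock_socks[--nonblock_count];
            return true;
        }
    }
    if (!nonblock)
        return true;            /* not registered: already blocking */
    if (nonblock_count == MAX_NONBLOCK_SOCKETS)
        return false;           /* table full */
    nonblock_socks[nonblock_count++] = s;
    return true;
}

/* On close, drop the entry so a reused handle does not inherit it. */
static void
forget_socket(socket_t s)
{
    (void) set_nonblock(s, false);
}
```

With at most 64 entries a linear scan is cheap. The close hook plays the role of pgwin32_closesocket() in the patch.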
 

Any suggestions?

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 605d891..c92851e 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -795,10 +795,6 @@ pq_set_nonblocking(bool nonblocking)
     if (MyProcPort->noblock == nonblocking)
         return;
 
-#ifdef WIN32
-    pgwin32_noblock = nonblocking ? 1 : 0;
-#else
-
     /*
      * Use COMMERROR on failure, because ERROR would try to send the error to
      * the client, which might require changing the mode again, leading to
@@ -816,7 +812,7 @@ pq_set_nonblocking(bool nonblocking)
             ereport(COMMERROR,
                     (errmsg("could not set socket to blocking mode: %m")));
     }
-#endif
+
     MyProcPort->noblock = nonblocking;
 }
 
diff --git a/src/backend/port/win32/socket.c b/src/backend/port/win32/socket.c
index c981169..f0ff3e7 100644
--- a/src/backend/port/win32/socket.c
+++ b/src/backend/port/win32/socket.c
@@ -21,11 +21,8 @@
  * non-blocking mode in order to be able to deliver signals, we must
  * specify this in a separate flag if we actually need non-blocking
  * operation.
- *
- * This flag changes the behaviour *globally* for all socket operations,
- * so it should only be set for very short periods of time.
  */
-int            pgwin32_noblock = 0;
+static fd_set        nonblockset;
 
 #undef socket
 #undef accept
@@ -33,6 +30,7 @@ int            pgwin32_noblock = 0;
 #undef select
 #undef recv
 #undef send
+#undef closesocket
 
 /*
  * Blocking socket functions implemented so they listen on both
@@ -40,6 +38,34 @@ int            pgwin32_noblock = 0;
  */
 
 /*
+ * Set blocking mode for each socket
+ */
+void
+pgwin32_set_socket_nonblock(SOCKET s, int nonblock)
+{
+    if (nonblock)
+        FD_SET(s, &nonblockset);
+    else
+        FD_CLR(s, &nonblockset);
+
+    /*
+     * fd_set cannot have more than FD_SETSIZE entries. It's not likely to come
+     * close to this limit but if it goes above the limit, non blocking state
+     * of some existing sockets will be discarded.
+     */
+    if (nonblockset.fd_count >= FD_SETSIZE)
+        elog(FATAL, "too many sockets requested to be in nonblocking mode");
+}
+
+void
+pgwin32_nonblockset_init()
+{
+    FD_ZERO(&nonblockset);
+}
+
+#define socket_is_nonblocking(s) FD_ISSET((s), &nonblockset)
+
+/*
  * Convert the last socket error code into errno
  */
 static void
@@ -256,6 +282,10 @@ pgwin32_socket(int af, int type, int protocol)
         TranslateSocketError();
         return INVALID_SOCKET;
     }
+
+    /* newly created socket should be in blocking mode  */
+    pgwin32_set_socket_nonblock(s, false);
+
     errno = 0;
 
     return s;
@@ -334,7 +364,7 @@ pgwin32_recv(SOCKET s, char *buf, int len, int f)
         return -1;
     }
 
-    if (pgwin32_noblock)
+    if (socket_is_nonblocking(s))
     {
         /*
          * No data received, and we are in "emulated non-blocking mode", so
@@ -420,7 +450,7 @@ pgwin32_send(SOCKET s, const void *buf, int len, int flags)
             return -1;
         }
 
-        if (pgwin32_noblock)
+        if (socket_is_nonblocking(s))
         {
             /*
              * No data sent, and we are in "emulated non-blocking mode", so
@@ -645,6 +675,15 @@ pgwin32_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, c
 
 
 /*
+ * The socket's entry in nonblockset must be removed when the socket is closed.
+ */
+int pgwin32_closesocket(SOCKET s)
+{
+    pgwin32_set_socket_nonblock(s, false);
+    return closesocket(s);
+}
+
+/*
  * Return win32 error string, since strerror can't
  * handle winsock codes
  */
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index c7f41a5..72e0576 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3255,22 +3255,10 @@ PgstatCollectorMain(int argc, char *argv[])
             /*
              * Try to receive and process a message.  This will not block,
              * since the socket is set to non-blocking mode.
-             *
-             * XXX On Windows, we have to force pgwin32_recv to cooperate,
-             * despite the previous use of pg_set_noblock() on the socket.
-             * This is extremely broken and should be fixed someday.
              */
-#ifdef WIN32
-            pgwin32_noblock = 1;
-#endif
-
             len = recv(pgStatSock, (char *) &msg,
                        sizeof(PgStat_Msg), 0);
 
-#ifdef WIN32
-            pgwin32_noblock = 0;
-#endif
-
             if (len < 0)
             {
                 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index b190cf5..5d32de6 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -896,6 +896,10 @@ PostmasterMain(int argc, char *argv[])
      */
     InitializeMaxBackends();
 
+#ifdef WIN32
+    pgwin32_nonblockset_init();
+#endif
+
     /*
      * Establish input sockets.
      */
diff --git a/src/include/port/win32.h b/src/include/port/win32.h
index 550c3ec..b0df45e 100644
--- a/src/include/port/win32.h
+++ b/src/include/port/win32.h
@@ -368,6 +368,7 @@ void        pg_queue_signal(int signum);
 #define select(n, r, w, e, timeout) pgwin32_select(n, r, w, e, timeout)
 #define recv(s, buf, len, flags) pgwin32_recv(s, buf, len, flags)
 #define send(s, buf, len, flags) pgwin32_send(s, buf, len, flags)
+#define closesocket(s) pgwin32_closesocket(s)
 
 SOCKET        pgwin32_socket(int af, int type, int protocol);
 SOCKET        pgwin32_accept(SOCKET s, struct sockaddr * addr, int *addrlen);
 
@@ -375,11 +376,12 @@ int            pgwin32_connect(SOCKET s, const struct sockaddr * name, int namelen);
 int            pgwin32_select(int nfds, fd_set *readfs, fd_set *writefds, fd_set *exceptfds, const struct timeval * timeout);
 int            pgwin32_recv(SOCKET s, char *buf, int len, int flags);
 int            pgwin32_send(SOCKET s, const void *buf, int len, int flags);
+int            pgwin32_closesocket(SOCKET s);
 
 const char *pgwin32_socket_strerror(int err);
 int            pgwin32_waitforsinglesocket(SOCKET s, int what, int timeout);
-
-extern int    pgwin32_noblock;
+void        pgwin32_set_socket_nonblock(SOCKET s, int nonblock);
+void        pgwin32_nonblockset_init();
 
 /* in backend/port/win32/security.c */
 extern int    pgwin32_is_admin(void);
diff --git a/src/port/noblock.c b/src/port/noblock.c
index 1da0339..d6cc6a2 100644
--- a/src/port/noblock.c
+++ b/src/port/noblock.c
@@ -25,9 +25,18 @@ pg_set_noblock(pgsocket sock)
 #else
     unsigned long ioctlsocket_ret = 1;
 
+#ifndef FRONTEND
+    /*
+     * Sockets in non-frontend processes on win32 are wrapped, and their
+     * blocking mode is controlled there. See socket.c for the details.
+     */
+    pgwin32_set_socket_nonblock(sock, true);
+    return 1;
+#else
     /* Returns non-0 on failure, while fcntl() returns -1 on failure */
     return (ioctlsocket(sock, FIONBIO, &ioctlsocket_ret) == 0);
-#endif
+#endif /* FRONTEND */
+#endif /* !WIN32   */
 }
 
 
@@ -41,10 +50,16 @@ pg_set_block(pgsocket sock)
     if (flags < 0 || fcntl(sock, F_SETFL, (long) (flags & ~O_NONBLOCK)))
         return false;
     return true;
-#else
+#else /* !WIN32   */
     unsigned long ioctlsocket_ret = 0;
 
+#ifndef FRONTEND
+    /*  See pg_set_noblock */
+    pgwin32_set_socket_nonblock(sock, false);
+    return 1;
+#else
     /* Returns non-0 on failure, while fcntl() returns -1 on failure */
     return (ioctlsocket(sock, FIONBIO, &ioctlsocket_ret) == 0);
-#endif
+#endif /* FRONTEND */
+#endif /* !WIN32   */
 }
diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c
index 41ec1ad..fbb4c47 100644
--- a/src/backend/libpq/be-secure.c
+++ b/src/backend/libpq/be-secure.c
@@ -34,7 +34,7 @@
 #include "libpq/libpq.h"
 #include "tcop/tcopprot.h"
 #include "utils/memutils.h"
-
+#include "miscadmin.h"
 
 char       *ssl_cert_file;
 char       *ssl_key_file;
@@ -140,6 +140,10 @@ secure_read(Port *port, void *ptr, size_t len)
     return n;
 }
 
+/*
+ *  Read data from socket.
+ *  This emulates blocking behavior using non-blocking sockets.
+ */
 ssize_t
 secure_raw_read(Port *port, void *ptr, size_t len)
 {
@@ -147,8 +151,34 @@ secure_raw_read(Port *port, void *ptr, size_t len)
 
     prepare_for_client_read();
 
-    n = recv(port->sock, ptr, len, 0);
+    if (port->noblock)
+        n = recv(port->sock, ptr, len, 0);
+    else
+    {
+        do
+        {
+            fd_set rfds;
+
+            FD_ZERO(&rfds);
+            FD_SET(port->sock, &rfds);
+            /*
+             * In contrast to secure_raw_write, this section runs with
+             * ImmediateInterruptOK = true so we can wait forever in
+             * select.
+             */
+            n = select(port->sock + 1, &rfds, NULL, NULL, NULL);
+            if (n < 0) break;
+
+            n = recv(port->sock, ptr, len, 0);
+
+            /*
+             * We should have something to read here, so EAGAIN/EWOULDBLOCK
+             * is unlikely to be seen. But we check for them anyway so as not
+             * to return these error numbers to the caller of a blocking read.
+             */
+        } while (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
+    }
 
     client_read_ended();
 
     return n;
@@ -178,5 +208,77 @@ secure_write(Port *port, void *ptr, size_t len)
 ssize_t
 secure_raw_write(Port *port, const void *ptr, size_t len)
 {
-    return send(port->sock, ptr, len, 0);
+    int ret = 0;
+
+    /*
+     * Port socket is always in non-blocking mode. See StreamConnection for
+     * the details.
+     */
+    ret = send(port->sock, ptr, len, 0);
+
+    /* We can return here regardless of blocking mode in most cases */
+    if (port->noblock || ret > 0 || len == 0)
+        return ret;
+
+    /* Here, we should block waiting for room in the send buffer. */
+    while(ret < 1 && !ProcDiePending)
+    {
+        fd_set wfds;
+        struct timeval tv;
+        int i = 0;
+
+        FD_ZERO(&wfds);
+        tv.tv_usec = 0;
+
+        /*
+         * We may receive a terminate signal (SIGTERM) while blocked on write.
+         * If we checked ProcDiePending and then waited in select()
+         * indefinitely, a SIGTERM arriving between the check and the select
+         * would stay pending, and we would have to wait for a second SIGTERM.
+         * So we wake up periodically to check ProcDiePending and reliably
+         * catch the signal. The select timeout is the maximum delay in
+         * handling the signal; 1 second is an arbitrary but reasonable value.
+         */
+        do
+        {
+            FD_SET(port->sock, &wfds);
+            tv.tv_sec = 1;
+            tv.tv_usec = 0;
+
+            ret = select(port->sock + 1, NULL, &wfds, NULL, &tv);
+        } while (!ProcDiePending && ret == 0);
+
+        if (ProcDiePending || ret < 0)
+            break;
+
+        ret = send(port->sock, ptr, len, 0);
+        if (ProcDiePending)
+            break;
+        if (ret < 0)
+        {
+            if (errno != EAGAIN && errno != EWOULDBLOCK)
+                break;
+
+            /*
+             * This could become a busy loop if send(2) returns EAGAIN or
+             * EWOULDBLOCK even though select(2) returned normally. Sleep
+             * explicitly to avoid the busy loop.
+             */
+            pg_usleep(200000L); /* 200 ms */
+            ret = 0;
+        }
+    }
+
+    if (ProcDiePending)
+    {
+        /*
+         * Allow this backend to terminate. ClientConnectionLost prevents any
+         * more bytes, including error messages, from being sent to the
+         * client. errno is set to tell the SSL layer not to retry.
+         */
+        ClientConnectionLost = 1;
+        errno = ECONNRESET;
+    }
+
+    return ret;
 }
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index c92851e..8387d6a 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -718,6 +718,17 @@ StreamConnection(pgsocket server_fd, Port *port)
         (void) pq_setkeepalivescount(tcp_keepalives_count, port);
     }
 
+    /*
+     * Put this socket into non-blocking mode. Blocking behavior is emulated in
+     * secure_write() and secure_read().
+     * Use COMMERROR on failure, because ERROR would try to send the error to
+     * the client, which might require changing the mode again, leading to
+     * infinite recursion.
+     */
+    if (!pg_set_noblock(port->sock))
+        ereport(COMMERROR,
+                (errmsg("could not set socket to nonblocking mode: %m")));
+
     return STATUS_OK;
 }
 
@@ -792,27 +803,6 @@ TouchSocketFiles(void)
 static void
 pq_set_nonblocking(bool nonblocking)
 {
-    if (MyProcPort->noblock == nonblocking)
-        return;
-
-    /*
-     * Use COMMERROR on failure, because ERROR would try to send the error to
-     * the client, which might require changing the mode again, leading to
-     * infinite recursion.
-     */
-    if (nonblocking)
-    {
-        if (!pg_set_noblock(MyProcPort->sock))
-            ereport(COMMERROR,
-                    (errmsg("could not set socket to nonblocking mode: %m")));
-    }
-    else
-    {
-        if (!pg_set_block(MyProcPort->sock))
-            ereport(COMMERROR,
-                    (errmsg("could not set socket to blocking mode: %m")));
-    }
-
     MyProcPort->noblock = nonblocking;
 }
 
@@ -1249,34 +1239,38 @@ internal_flush(void)
 
         if (r <= 0)
         {
-            if (errno == EINTR)
-                continue;        /* Ok if we were interrupted */
-
-            /*
-             * Ok if no data writable without blocking, and the socket is in
-             * non-blocking mode.
-             */
-            if (errno == EAGAIN ||
-                errno == EWOULDBLOCK)
+            if (!ClientConnectionLost)
             {
+                if (errno == EINTR)
+                    continue;        /* Ok if we were interrupted */
+
+                /*
+                 * Ok if no data writable without blocking, and the socket is in
+                 * non-blocking mode.
+                 */
+                if (errno == EAGAIN ||
+                    errno == EWOULDBLOCK)
+                {
                 return 0;
-            }
-
-            /*
-             * Careful: an ereport() that tries to write to the client would
-             * cause recursion to here, leading to stack overflow and core
-             * dump!  This message must go *only* to the postmaster log.
-             *
-             * If a client disconnects while we're in the midst of output, we
-             * might write quite a bit of data before we get to a safe query
-             * abort point.  So, suppress duplicate log messages.
-             */
-            if (errno != last_reported_send_errno)
-            {
-                last_reported_send_errno = errno;
-                ereport(COMMERROR,
-                        (errcode_for_socket_access(),
-                         errmsg("could not send data to client: %m")));
+                }
+
+                /*
+                 * Careful: an ereport() that tries to write to the client
+                 * would cause recursion to here, leading to stack overflow
+                 * and core dump!  This message must go *only* to the
+                 * postmaster log.
+                 *
+                 * If a client disconnects while we're in the midst of output,
+                 * we might write quite a bit of data before we get to a safe
+                 * query abort point.  So, suppress duplicate log messages.
+                 */
+                if (errno != last_reported_send_errno)
+                {
+                    last_reported_send_errno = errno;
+                    ereport(COMMERROR,
+                            (errcode_for_socket_access(),
+                             errmsg("could not send data to client: %m")));
+                }
             }
 
             /*
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 5d32de6..d979191 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -5789,6 +5789,13 @@ read_inheritable_socket(SOCKET *dest, InheritableSocket *src)
         *dest = s;
 
         /*
+         * The emulated blocking mode is not inherited, but the port socket
+         * should always be in nonblocking mode. pg_set_noblock() on a win32
+         * backend won't return an error.
+         */
+        pg_set_noblock(s);
+
+        /*
          * To make sure we don't get two references to the same socket, close
          * the original one. (This would happen when inheritance actually
          * works..
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 7b5480f..1d252e7 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -2840,8 +2840,16 @@ ProcessInterrupts(void)
         ImmediateInterruptOK = false;    /* not idle anymore */
         DisableNotifyInterrupt();
         DisableCatchupInterrupt();
-        /* As in quickdie, don't risk sending to client during auth */
-        if (ClientAuthInProgress && whereToSendOutput == DestRemote)
+        /*
+         *  As in quickdie, don't risk sending to client during auth. In
+         *  addition, don't send anything more to the client if the current
+         *  connection is marked as ClientConnectionLost. Doing so would cause
+         *  a protocol violation if the connection is in fact still alive and
+         *  in the middle of sending data. That happens when this backend is
+         *  terminated while waiting for a query result to be sent.
+         */
+        if ((ClientAuthInProgress && whereToSendOutput == DestRemote) ||
+            ClientConnectionLost)
             whereToSendOutput = DestNone;
         if (IsAutoVacuumWorkerProcess())
             ereport(FATAL,
