[HACKERS] Inefficient shutdown of pg_basebackup - Mailing list pgsql-hackers

From Tom Lane
Subject [HACKERS] Inefficient shutdown of pg_basebackup
Date
Msg-id 6456.1493263884@sss.pgh.pa.us
Responses Re: [HACKERS] Inefficient shutdown of pg_basebackup  (Simon Riggs <simon@2ndquadrant.com>)
List pgsql-hackers
I griped before that the src/test/recovery/ tests take an unreasonably
long time.  My interest in that was piqued further when I noticed that
the tests consume little CPU time and aren't exactly saturating
my disks either.  That suggests that the problem isn't so much that the
tests do too much work, as that we've got dire performance problems in
either the test harness or the code under test.

While I'm continuing to poke at it, I've identified one such problem:
the system basically stops dead for about ten seconds at the end of
the pg_basebackup run invoked by t/001_stream_rep.pl.  The length of
the idle time corresponds to pg_basebackup's -s (standby_message_timeout)
parameter; you can make it even worse by increasing that parameter or
setting it to zero.  (In principle, setting it to zero ought to cause
pg_basebackup to never terminate at all :-( ... but apparently there is
some other effect that will wake it up after 30 seconds or so.  I've not
found out what yet.)
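Why a zero -s could in principle block forever: the patch's own comment on CopyStreamPoll() says a negative timeout_ms means "wait indefinitely", and select() blocks with no time limit when it is handed a NULL timeout pointer. Below is a minimal sketch of that conversion, modeled on the timeout handling visible in CopyStreamPoll(). It assumes, without this patch showing it, that a zero standby_message_timeout is turned into such a negative sleep time. The helper name timeout_to_timeval is hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/time.h>

/*
 * Hypothetical helper modeled on CopyStreamPoll()'s timeout handling.
 * A negative timeout_ms yields NULL, which makes select() block until a
 * watched descriptor becomes readable or a signal arrives.  Otherwise
 * *tv is filled in and returned.
 */
static struct timeval *
timeout_to_timeval(long timeout_ms, struct timeval *tv)
{
    if (timeout_ms < 0)
        return NULL;                            /* wait indefinitely */
    tv->tv_sec = timeout_ms / 1000L;
    tv->tv_usec = (timeout_ms % 1000L) * 1000L;
    return tv;                                  /* 0 ms means "don't wait" */
}
```

With no timeval to expire, the only things that can end such a wait are input on a watched descriptor or a signal.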

The reason for this appears to be that by the time the pg_basebackup
parent process has determined the xlogend position and sent it down
the bgpipe to the child process, the child process has already read
all the WAL that the source server is going to send, and is waiting
for more such input with a timeout corresponding to
standby_message_timeout.  Only after that timeout elapses does it
get around to noticing that some input is available from the bgpipe
and then realizing that it's time to stop streaming.
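The race can be reproduced in miniature with plain POSIX pipes. This is a sketch only, and its function names are hypothetical. A pipe "conn" stands in for the replication connection's socket and a pipe "bg" stands in for bgpipe. The stop message is already queued on bg, yet a wait that watches only conn still sleeps until its timeout expires. The pre-patch CopyStreamPoll() behaved the same way, passing only PQsocket(conn) to select().

```c
#define _POSIX_C_SOURCE 200112L
#include <assert.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

/*
 * Hypothetical stand-in for the pre-patch wait: select() watches only
 * conn_fd, so input on any other descriptor goes unnoticed until the
 * timeout expires.  Returns select()'s result (0 on timeout).
 */
static int
wait_on_conn_only(int conn_fd, long timeout_ms)
{
    fd_set      input_mask;
    struct timeval timeout;

    FD_ZERO(&input_mask);
    FD_SET(conn_fd, &input_mask);
    timeout.tv_sec = timeout_ms / 1000L;
    timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
    return select(conn_fd + 1, &input_mask, NULL, NULL, &timeout);
}

/* Returns nonzero if fd has input ready right now (zero-timeout poll). */
static int
fd_readable_now(int fd)
{
    fd_set      input_mask;
    struct timeval zero = {0, 0};

    FD_ZERO(&input_mask);
    FD_SET(fd, &input_mask);
    return select(fd + 1, &input_mask, NULL, NULL, &zero) > 0;
}
```

For example, after `write(bg[1], "x", 1)`, `fd_readable_now(bg[0])` is true, but `wait_on_conn_only(conn[0], 200)` still returns 0 only after the full 200 ms.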

The attached draft patch fixes this by adding a stop_socket field to
the StreamCtl API: a socket that the low-level wait routine,
CopyStreamPoll(), also checks for input.
For me, this consistently knocks about 10 seconds off the runtime of
001_stream_rep.pl.
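The two-descriptor wait can be sketched outside PostgreSQL as follows. This is a standalone approximation modeled on the patched CopyStreamPoll() shown below, not the patch itself. NO_STOP_FD is a hypothetical stand-in for PGINVALID_SOCKET, and the function name is likewise made up. The key design point is the return contract. Only readability of the connection counts as "data available" (1). Input on the stop descriptor is reported like a timeout (0), which sends the caller back to its stream_stop() check.

```c
#define _POSIX_C_SOURCE 200112L
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

#define NO_STOP_FD (-1)     /* hypothetical stand-in for PGINVALID_SOCKET */

/*
 * Returns 1 if conn_fd is readable; 0 on timeout, signal (EINTR), or input
 * on stop_fd; -1 if select() fails.  timeout_ms < 0 waits indefinitely,
 * 0 does not wait at all.
 */
static int
poll_conn_or_stop(int conn_fd, int stop_fd, long timeout_ms)
{
    fd_set      input_mask;
    struct timeval timeout;
    struct timeval *timeoutptr = NULL;
    int         maxfd = conn_fd;
    int         ret;

    FD_ZERO(&input_mask);
    FD_SET(conn_fd, &input_mask);
    if (stop_fd != NO_STOP_FD)
    {
        FD_SET(stop_fd, &input_mask);
        if (stop_fd > maxfd)
            maxfd = stop_fd;        /* select() needs the highest fd + 1 */
    }

    if (timeout_ms >= 0)
    {
        timeout.tv_sec = timeout_ms / 1000L;
        timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
        timeoutptr = &timeout;
    }

    ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
    if (ret < 0)
    {
        if (errno == EINTR)
            return 0;               /* interrupted by a signal: not an error */
        fprintf(stderr, "select() failed: %s\n", strerror(errno));
        return -1;
    }
    if (ret > 0 && FD_ISSET(conn_fd, &input_mask))
        return 1;                   /* data waiting on the connection */
    return 0;                       /* timeout or stop request */
}
```

Even with an indefinite wait (timeout_ms = -1), a byte written to the stop pipe makes this return 0 immediately. The caller is still responsible for calling its stop check afterwards, as the patch's comment on the new StreamCtl field describes.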

It could be argued that this isn't too significant in the real world
because pg_basebackup would always run far longer than 10 seconds
anyway for non-toy data.  But it still looks like a bug to me.

            regards, tom lane

diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 40ec0e1..e2a2ebb 100644
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
*************** LogStreamerMain(logstreamer_param *param
*** 480,485 ****
--- 480,490 ----
      stream.timeline = param->timeline;
      stream.sysidentifier = param->sysidentifier;
      stream.stream_stop = reached_end_position;
+ #ifndef WIN32
+     stream.stop_socket = bgpipe[0];
+ #else
+     stream.stop_socket = PGINVALID_SOCKET;
+ #endif
      stream.standby_message_timeout = standby_message_timeout;
      stream.synchronous = false;
      stream.do_sync = do_sync;
diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index 1a9fe81..09385c5 100644
*** a/src/bin/pg_basebackup/pg_receivewal.c
--- b/src/bin/pg_basebackup/pg_receivewal.c
*************** StreamLog(void)
*** 409,414 ****
--- 409,415 ----
                  stream.timeline);

      stream.stream_stop = stop_streaming;
+     stream.stop_socket = PGINVALID_SOCKET;
      stream.standby_message_timeout = standby_message_timeout;
      stream.synchronous = synchronous;
      stream.do_sync = true;
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 8511e57..c41bba2 100644
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
*************** static bool still_sending = true;        /* fe
*** 39,46 ****

  static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
                   XLogRecPtr *stoppos);
! static int    CopyStreamPoll(PGconn *conn, long timeout_ms);
! static int    CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
  static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
                      int len, XLogRecPtr blockpos, TimestampTz *last_status);
  static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
--- 39,47 ----

  static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
                   XLogRecPtr *stoppos);
! static int    CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
! static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
!                   char **buffer);
  static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
                      int len, XLogRecPtr blockpos, TimestampTz *last_status);
  static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
*************** CheckServerVersionForStreaming(PGconn *c
*** 417,424 ****
--- 418,432 ----
   * return. As long as it returns false, streaming will continue
   * indefinitely.
   *
+  * If stream_stop() checks for external input, stop_socket should be set to
+  * the FD it checks.  This will allow such input to be detected promptly
+  * rather than after standby_message_timeout (which might be indefinite).
+  * Note that signals will interrupt waits for input as well, but that is
+  * race-y since a signal received while busy won't interrupt the wait.
+  *
   * standby_message_timeout controls how often we send a message
   * back to the master letting it know our progress, in milliseconds.
+  * Zero means no messages are sent.
   * This message will only contain the write location, and never
   * flush or replay.
   *
*************** HandleCopyStream(PGconn *conn, StreamCtl
*** 825,831 ****
          sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
                                                   last_status);

!         r = CopyStreamReceive(conn, sleeptime, &copybuf);
          while (r != 0)
          {
              if (r == -1)
--- 833,839 ----
          sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
                                                   last_status);

!         r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
          while (r != 0)
          {
              if (r == -1)
*************** HandleCopyStream(PGconn *conn, StreamCtl
*** 870,876 ****
               * Process the received data, and any subsequent data we can read
               * without blocking.
               */
!             r = CopyStreamReceive(conn, 0, &copybuf);
          }
      }

--- 878,884 ----
               * Process the received data, and any subsequent data we can read
               * without blocking.
               */
!             r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
          }
      }

*************** error:
*** 881,900 ****
  }

  /*
!  * Wait until we can read CopyData message, or timeout.
   *
   * Returns 1 if data has become available for reading, 0 if timed out
!  * or interrupted by signal, and -1 on an error.
   */
  static int
! CopyStreamPoll(PGconn *conn, long timeout_ms)
  {
      int            ret;
      fd_set        input_mask;
      struct timeval timeout;
      struct timeval *timeoutptr;

!     if (PQsocket(conn) < 0)
      {
          fprintf(stderr, _("%s: invalid socket: %s"), progname,
                  PQerrorMessage(conn));
--- 889,913 ----
  }

  /*
!  * Wait until we can read a CopyData message,
!  * or timeout, or occurrence of a signal or input on the stop_socket.
!  * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
   *
   * Returns 1 if data has become available for reading, 0 if timed out
!  * or interrupted by signal or stop_socket input, and -1 on an error.
   */
  static int
! CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
  {
      int            ret;
      fd_set        input_mask;
+     int            connsocket;
+     int            maxfd;
      struct timeval timeout;
      struct timeval *timeoutptr;

!     connsocket = PQsocket(conn);
!     if (connsocket < 0)
      {
          fprintf(stderr, _("%s: invalid socket: %s"), progname,
                  PQerrorMessage(conn));
*************** CopyStreamPoll(PGconn *conn, long timeou
*** 902,908 ****
      }

      FD_ZERO(&input_mask);
!     FD_SET(PQsocket(conn), &input_mask);

      if (timeout_ms < 0)
          timeoutptr = NULL;
--- 915,927 ----
      }

      FD_ZERO(&input_mask);
!     FD_SET(connsocket, &input_mask);
!     maxfd = connsocket;
!     if (stop_socket != PGINVALID_SOCKET)
!     {
!         FD_SET(stop_socket, &input_mask);
!         maxfd = Max(maxfd, stop_socket);
!     }

      if (timeout_ms < 0)
          timeoutptr = NULL;
*************** CopyStreamPoll(PGconn *conn, long timeou
*** 913,929 ****
          timeoutptr = &timeout;
      }

!     ret = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
!     if (ret == 0 || (ret < 0 && errno == EINTR))
!         return 0;                /* Got a timeout or signal */
!     else if (ret < 0)
      {
          fprintf(stderr, _("%s: select() failed: %s\n"),
                  progname, strerror(errno));
          return -1;
      }

!     return 1;
  }

  /*
--- 932,951 ----
          timeoutptr = &timeout;
      }

!     ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
!
!     if (ret < 0)
      {
+         if (errno == EINTR)
+             return 0;            /* Got a signal, so not an error */
          fprintf(stderr, _("%s: select() failed: %s\n"),
                  progname, strerror(errno));
          return -1;
      }
+     if (ret > 0 && FD_ISSET(connsocket, &input_mask))
+         return 1;                /* Got input on connection socket */

!     return 0;                    /* Got timeout or input on stop_socket */
  }

  /*
*************** CopyStreamPoll(PGconn *conn, long timeou
*** 934,944 ****
   * point to a buffer holding the received message. The buffer is only valid
   * until the next CopyStreamReceive call.
   *
!  * 0 if no data was available within timeout, or wait was interrupted
!  * by signal. -1 on error. -2 if the server ended the COPY.
   */
  static int
! CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
  {
      char       *copybuf = NULL;
      int            rawlen;
--- 956,968 ----
   * point to a buffer holding the received message. The buffer is only valid
   * until the next CopyStreamReceive call.
   *
!  * Returns 0 if no data was available within timeout, or if wait was
!  * interrupted by signal or stop_socket input.
!  * -1 on error. -2 if the server ended the COPY.
   */
  static int
! CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
!                   char **buffer)
  {
      char       *copybuf = NULL;
      int            rawlen;
*************** CopyStreamReceive(PGconn *conn, long tim
*** 951,970 ****
      rawlen = PQgetCopyData(conn, &copybuf, 1);
      if (rawlen == 0)
      {
          /*
!          * No data available. Wait for some to appear, but not longer than the
!          * specified timeout, so that we can ping the server.
           */
!         if (timeout != 0)
!         {
!             int            ret;
!
!             ret = CopyStreamPoll(conn, timeout);
!             if (ret <= 0)
!                 return ret;
!         }

!         /* Else there is actually data on the socket */
          if (PQconsumeInput(conn) == 0)
          {
              fprintf(stderr,
--- 975,992 ----
      rawlen = PQgetCopyData(conn, &copybuf, 1);
      if (rawlen == 0)
      {
+         int            ret;
+
          /*
!          * No data available.  Wait for some to appear, but not longer than
!          * the specified timeout, so that we can ping the server.  Also stop
!          * waiting if input appears on stop_socket.
           */
!         ret = CopyStreamPoll(conn, timeout, stop_socket);
!         if (ret <= 0)
!             return ret;

!         /* Now there is actually data on the socket */
          if (PQconsumeInput(conn) == 0)
          {
              fprintf(stderr,
diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h
index 42e93ac..9a51d9a 100644
*** a/src/bin/pg_basebackup/receivelog.h
--- b/src/bin/pg_basebackup/receivelog.h
*************** typedef struct StreamCtl
*** 42,47 ****
--- 42,50 ----

      stream_stop_callback stream_stop;    /* Stop streaming when returns true */

+     pgsocket    stop_socket;    /* if valid, watch for input on this socket
+                                  * and check stream_stop() when there is any */
+
      WalWriteMethod *walmethod;    /* How to write the WAL */
      char       *partial_suffix; /* Suffix appended to partially received files */
      char       *replication_slot;        /* Replication slot to use, or NULL */

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
