Thread: Stats collector performance improvement

Stats collector performance improvement

From
Bruce Momjian
Date:
Tom Lane wrote:
> Michael Fuhr <mike@fuhr.org> writes:
> > Further tests show that for this application
> > the killer is stats_command_string, not stats_block_level or
> > stats_row_level.
>
> I tried it with pgbench -c 10, and got these results:
>     41% reduction in TPS rate for stats_command_string
>     9% reduction in TPS rate for stats_block/row_level (any combination)
>
> strace'ing a backend confirms my belief that stats_block/row_level send
> just one stats message per transaction (at least for the relatively
> small number of tables touched per transaction by pgbench).  However
> stats_command_string sends 14(!) --- there are seven commands per
> pgbench transaction and each results in sending a <command> message and
> later an <IDLE> message.
>
> Given the rather lackadaisical way in which the stats collector makes
> the data available, it seems like the backends are being much too
> enthusiastic about posting their stats_command_string status
> immediately.  Might be worth thinking about how to cut back the
> overhead by suppressing some of these messages.

I did some research on this because the numbers Tom quotes indicate there
is something wrong in the way we process stats_command_string
statistics.

I made a small test script:

    if [ ! -f /tmp/pgstat.sql ]
    then    i=0
        while [ $i -lt 10000 ]
        do
            i=`expr $i + 1`
            echo "SELECT 1;"
        done > /tmp/pgstat.sql
    fi

    time psql test </tmp/pgstat.sql >/dev/null

This sends 10,000 "SELECT 1" queries to the backend, and reports the
execution time.  I found that without stats_command_string defined, it
ran in 3.5 seconds.  With stats_command_string defined, it took 5.5
seconds, meaning the command string is causing a 57% slowdown.  That is
way too much considering that the SELECT 1 has to be sent from psql to
the backend, parsed, optimized, and executed, and the result returned to
psql, while stats_command_string only has to send a string to the
statistics collector.  There is _no_ way that collector should take 57% of
the time it takes to run the actual query.

With the test program, I tried various options.  The basic code we have
sends a UDP packet to a statistics buffer process, which recv()'s the
packet, puts it into a memory queue buffer, and writes it to a pipe()
that is read by the statistics collector process which processes the
packet.

I tried various ways of speeding up the buffer and collector processes.
I found if I put a pg_usleep(100) in the buffer process the backend
speed was good, but packets were lost.  What I found worked well was to
do multiple recv() calls in a loop.  The previous code did a select(),
then perhaps a recv() and pipe write() based on the results of the
select().  This caused many small packets to be written to the pipe and
the pipe write overhead seems fairly large.  The best fix I found was to
loop over the recv() call at most 25 times, collecting a group of
packets that can then be sent to the collector in one pipe write.  The
recv() socket is non-blocking, so a zero return indicates there are no
more packets available.  Patch attached.
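A minimal sketch of the batching idea follows. It assumes a non-blocking datagram socket and an ordinary pipe. The names drain_and_forward(), set_nonblocking() and MAX_MSG, and the 1024-byte message bound, are hypothetical. Only the 25-message cap comes from the patch. One detail differs from the description above. On a non-blocking socket, recv() reports an empty queue by returning -1 with errno set to EAGAIN or EWOULDBLOCK; a 0 return from a datagram socket means a zero-length datagram. The sketch therefore ends the batch on that error instead of retrying. Sending the concatenated messages in one write should be workable, because each stats message carries its own length in its header (the m_size field the patch checks).

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stddef.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#define MAX_BATCH 25    /* per-wakeup recv() cap, as in the patch */
#define MAX_MSG   1024  /* hypothetical upper bound on one message */

/* Put 'fd' into non-blocking mode; returns 0 on success. */
static int
set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);

    return (flags < 0) ? -1 : fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/*
 * Drain up to MAX_BATCH datagrams from the non-blocking socket 'sock'
 * into 'batch' (capacity 'cap' bytes), then forward them all with one
 * write() to 'pipe_fd'.  Returns the number of datagrams forwarded
 * (0 if none were queued) or -1 on error.  A short write is treated
 * as an error to keep the sketch simple.
 */
static int
drain_and_forward(int sock, int pipe_fd, char *batch, size_t cap)
{
    size_t used = 0;
    int    count = 0;

    while (count < MAX_BATCH && cap - used >= MAX_MSG)
    {
        ssize_t len = recv(sock, batch + used, MAX_MSG, 0);

        if (len < 0)
        {
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                break;          /* queue is empty: stop, don't retry */
            if (errno == EINTR)
                continue;       /* interrupted by a signal: try again */
            return -1;
        }
        used += (size_t) len;
        count++;
    }

    if (used > 0 && write(pipe_fd, batch, used) != (ssize_t) used)
        return -1;
    return count;
}
```

The buffer process would call this each time select() reports the socket readable. Under that arrangement a lone message is forwarded at once rather than held until the batch fills.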

This change reduced the stats_command_string time from 5.5 to 3.9, which
is closer to the 3.5 seconds with stats_command_string off.

A second improvement I discovered is that the statistics collector is
calling gettimeofday() for every packet received, so it can determine
the timeout for the select() call to write the flat file.  I removed
that behavior and instead used setitimer() to issue a SIGALRM every
500ms, which was the original behavior.  This eliminates the
gettimeofday() call and makes the code cleaner.  Second patch attached.
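A minimal sketch of the timer-driven approach follows. The names need_write, on_alarm(), ms_to_itimerval() and arm_stat_timer() are hypothetical. ITIMER_REAL delivers SIGALRM, which is the signal the patch hooks with force_statwrite(). Like the patch, the sketch builds a one-shot timer (it_interval left at zero) that must be re-armed after each write.

One detail needs care. tv_usec counts microseconds, so the millisecond remainder is multiplied by 1000 here. The patch as posted assigns PGSTAT_STAT_INTERVAL % 1000 directly. The code it removes treats PGSTAT_STAT_INTERVAL as milliseconds, since it multiplies the value by 1000 before adding it to tv_usec. Under the patch, a 500 ms interval would therefore become 500 microseconds.

The flag is declared volatile sig_atomic_t, the type C guarantees can be written safely from a signal handler.

```c
#define _XOPEN_SOURCE 700   /* sigaction() and setitimer() under strict C modes */

#include <assert.h>
#include <signal.h>
#include <string.h>
#include <sys/time.h>

/* Set by the SIGALRM handler; sig_atomic_t is safe to write from it. */
static volatile sig_atomic_t need_write = 0;

static void
on_alarm(int signo)
{
    (void) signo;
    need_write = 1;             /* main loop writes the file and clears it */
}

/* Build a one-shot timer (it_interval stays zero) from milliseconds. */
static struct itimerval
ms_to_itimerval(long interval_ms)
{
    struct itimerval tv;

    memset(&tv, 0, sizeof(tv));
    tv.it_value.tv_sec = interval_ms / 1000;
    tv.it_value.tv_usec = (interval_ms % 1000) * 1000;  /* ms -> us */
    return tv;
}

/* Install the SIGALRM handler and arm the timer; returns 0 on success. */
static int
arm_stat_timer(long interval_ms)
{
    struct sigaction sa;
    struct itimerval tv = ms_to_itimerval(interval_ms);

    memset(&sa, 0, sizeof(sa));
    sa.sa_handler = on_alarm;
    sigemptyset(&sa.sa_mask);
    if (sigaction(SIGALRM, &sa, NULL) != 0)
        return -1;
    return setitimer(ITIMER_REAL, &tv, NULL);
}
```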

--
  Bruce Momjian                        |  http://candle.pha.pa.us
  pgman@candle.pha.pa.us               |  (610) 359-1001
  +  If your life is a hard drive,     |  13 Roberts Road
  +  Christ can be your backup.        |  Newtown Square, Pennsylvania 19073
Index: src/backend/postmaster/pgstat.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/postmaster/pgstat.c,v
retrieving revision 1.116
diff -c -c -r1.116 pgstat.c
*** src/backend/postmaster/pgstat.c    2 Jan 2006 00:58:00 -0000    1.116
--- src/backend/postmaster/pgstat.c    2 Jan 2006 18:36:43 -0000
***************
*** 1911,1916 ****
--- 1911,1918 ----
       */
      for (;;)
      {
+ loop_again:
+
          FD_ZERO(&rfds);
          FD_ZERO(&wfds);
          maxfd = -1;
***************
*** 1970,2014 ****
           */
          if (FD_ISSET(pgStatSock, &rfds))
          {
!             len = recv(pgStatSock, (char *) &input_buffer,
!                        sizeof(PgStat_Msg), 0);
!             if (len < 0)
!                 ereport(ERROR,
!                         (errcode_for_socket_access(),
!                          errmsg("could not read statistics message: %m")));
!
!             /*
!              * We ignore messages that are smaller than our common header
!              */
!             if (len < sizeof(PgStat_MsgHdr))
!                 continue;
!
!             /*
!              * The received length must match the length in the header
!              */
!             if (input_buffer.msg_hdr.m_size != len)
!                 continue;
!
              /*
!              * O.K. - we accept this message.  Copy it to the circular
!              * msgbuffer.
               */
!             frm = 0;
!             while (len > 0)
              {
!                 xfr = PGSTAT_RECVBUFFERSZ - msg_recv;
!                 if (xfr > len)
!                     xfr = len;
!                 Assert(xfr > 0);
!                 memcpy(msgbuffer + msg_recv,
!                        ((char *) &input_buffer) + frm,
!                        xfr);
!                 msg_recv += xfr;
!                 if (msg_recv == PGSTAT_RECVBUFFERSZ)
!                     msg_recv = 0;
!                 msg_have += xfr;
!                 frm += xfr;
!                 len -= xfr;
              }
          }

--- 1972,2033 ----
           */
          if (FD_ISSET(pgStatSock, &rfds))
          {
!             int loops = 0;
!
              /*
!              *    While pipewrite() can send multiple data packets, recv() pulls
!              *    only a single packet per call.  For busy systems, doing
!              *    multiple recv() calls and then one pipewrite() can improve
!              *    query speed by 40%.  25 was chosen because 25 packets should
!              *    easily fit in a single pipewrite() call.  recv()'s socket is
!              *    non-blocking.
               */
!             while (++loops < 25 &&
!                    (len = recv(pgStatSock, (char *) &input_buffer,
!                                sizeof(PgStat_Msg), 0)) != 0)
              {
!                 if (len < 0)
!                 {
!                     if (errno == EAGAIN)
!                         continue;
!                     ereport(ERROR,
!                             (errcode_for_socket_access(),
!                              errmsg("could not read statistics message: %m")));
!                 }
!
!                 /*
!                  * We ignore messages that are smaller than our common header
!                  */
!                 if (len < sizeof(PgStat_MsgHdr))
!                     goto loop_again;
!
!                 /*
!                  * The received length must match the length in the header
!                  */
!                 if (input_buffer.msg_hdr.m_size != len)
!                     goto loop_again;
!
!                 /*
!                  * O.K. - we accept this message.  Copy it to the circular
!                  * msgbuffer.
!                  */
!                 frm = 0;
!                 while (len > 0)
!                 {
!                     xfr = PGSTAT_RECVBUFFERSZ - msg_recv;
!                     if (xfr > len)
!                         xfr = len;
!                     Assert(xfr > 0);
!                     memcpy(msgbuffer + msg_recv,
!                            ((char *) &input_buffer) + frm,
!                            xfr);
!                     msg_recv += xfr;
!                     if (msg_recv == PGSTAT_RECVBUFFERSZ)
!                         msg_recv = 0;
!                     msg_have += xfr;
!                     frm += xfr;
!                     len -= xfr;
!                 }
              }
          }

***************
*** 2023,2029 ****
           * caught up, or because more data arrives so that we have more than
           * PIPE_BUF bytes buffered).  This is not good, but is there any way
           * around it?  We have no way to tell when the collector has caught
!          * up...
           */
          if (FD_ISSET(writePipe, &wfds))
          {
--- 2042,2048 ----
           * caught up, or because more data arrives so that we have more than
           * PIPE_BUF bytes buffered).  This is not good, but is there any way
           * around it?  We have no way to tell when the collector has caught
!          * up.  Followup, the pipe rarely fills up.
           */
          if (FD_ISSET(writePipe, &wfds))
          {
Index: src/backend/postmaster/pgstat.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/postmaster/pgstat.c,v
retrieving revision 1.116
diff -c -c -r1.116 pgstat.c
*** src/backend/postmaster/pgstat.c    2 Jan 2006 00:58:00 -0000    1.116
--- src/backend/postmaster/pgstat.c    2 Jan 2006 18:21:28 -0000
***************
*** 145,150 ****
--- 145,151 ----
  static PgStat_StatBeEntry *pgStatBeTable = NULL;
  static int    pgStatNumBackends = 0;

+ static volatile bool    need_statwrite;

  /* ----------
   * Local function forward declarations
***************
*** 164,169 ****
--- 165,171 ----

  NON_EXEC_STATIC void PgstatBufferMain(int argc, char *argv[]);
  NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]);
+ static void force_statwrite(SIGNAL_ARGS);
  static void pgstat_recvbuffer(void);
  static void pgstat_exit(SIGNAL_ARGS);
  static void pgstat_die(SIGNAL_ARGS);
***************
*** 1548,1560 ****
      PgStat_Msg    msg;
      fd_set        rfds;
      int            readPipe;
-     int            nready;
      int            len = 0;
!     struct timeval timeout;
!     struct timeval next_statwrite;
!     bool        need_statwrite;
      HASHCTL        hash_ctl;
!
      MyProcPid = getpid();        /* reset MyProcPid */

      /*
--- 1550,1560 ----
      PgStat_Msg    msg;
      fd_set        rfds;
      int            readPipe;
      int            len = 0;
!     struct itimerval timeval;
      HASHCTL        hash_ctl;
!     bool        need_timer = false;
!
      MyProcPid = getpid();        /* reset MyProcPid */

      /*
***************
*** 1572,1578 ****
      /* kluge to allow buffer process to kill collector; FIXME */
      pqsignal(SIGQUIT, pgstat_exit);
  #endif
!     pqsignal(SIGALRM, SIG_IGN);
      pqsignal(SIGPIPE, SIG_IGN);
      pqsignal(SIGUSR1, SIG_IGN);
      pqsignal(SIGUSR2, SIG_IGN);
--- 1572,1578 ----
      /* kluge to allow buffer process to kill collector; FIXME */
      pqsignal(SIGQUIT, pgstat_exit);
  #endif
!     pqsignal(SIGALRM, force_statwrite);
      pqsignal(SIGPIPE, SIG_IGN);
      pqsignal(SIGUSR1, SIG_IGN);
      pqsignal(SIGUSR2, SIG_IGN);
***************
*** 1597,1608 ****
      init_ps_display("stats collector process", "", "");
      set_ps_display("");

-     /*
-      * Arrange to write the initial status file right away
-      */
-     gettimeofday(&next_statwrite, NULL);
      need_statwrite = TRUE;

      /*
       * Read in an existing statistics stats file or initialize the stats to
       * zero.
--- 1597,1608 ----
      init_ps_display("stats collector process", "", "");
      set_ps_display("");

      need_statwrite = TRUE;

+     MemSet(&timeval, 0, sizeof(struct itimerval));
+     timeval.it_value.tv_sec = PGSTAT_STAT_INTERVAL / 1000;
+     timeval.it_value.tv_usec = PGSTAT_STAT_INTERVAL % 1000;
+
      /*
       * Read in an existing statistics stats file or initialize the stats to
       * zero.
***************
*** 1634,1667 ****
       */
      for (;;)
      {
-         /*
-          * If we need to write the status file again (there have been changes
-          * in the statistics since we wrote it last) calculate the timeout
-          * until we have to do so.
-          */
          if (need_statwrite)
          {
!             struct timeval now;
!
!             gettimeofday(&now, NULL);
!             /* avoid assuming that tv_sec is signed */
!             if (now.tv_sec > next_statwrite.tv_sec ||
!                 (now.tv_sec == next_statwrite.tv_sec &&
!                  now.tv_usec >= next_statwrite.tv_usec))
!             {
!                 timeout.tv_sec = 0;
!                 timeout.tv_usec = 0;
!             }
!             else
!             {
!                 timeout.tv_sec = next_statwrite.tv_sec - now.tv_sec;
!                 timeout.tv_usec = next_statwrite.tv_usec - now.tv_usec;
!                 if (timeout.tv_usec < 0)
!                 {
!                     timeout.tv_sec--;
!                     timeout.tv_usec += 1000000;
!                 }
!             }
          }

          /*
--- 1634,1644 ----
       */
      for (;;)
      {
          if (need_statwrite)
          {
!             pgstat_write_statsfile();
!             need_statwrite = false;
!             need_timer = true;
          }

          /*
***************
*** 1673,1681 ****
          /*
           * Now wait for something to do.
           */
!         nready = select(readPipe + 1, &rfds, NULL, NULL,
!                         (need_statwrite) ? &timeout : NULL);
!         if (nready < 0)
          {
              if (errno == EINTR)
                  continue;
--- 1650,1656 ----
          /*
           * Now wait for something to do.
           */
!         if (select(readPipe + 1, &rfds, NULL, NULL, NULL) < 0)
          {
              if (errno == EINTR)
                  continue;
***************
*** 1685,1702 ****
          }

          /*
-          * If there are no descriptors ready, our timeout for writing the
-          * stats file happened.
-          */
-         if (nready == 0)
-         {
-             pgstat_write_statsfile();
-             need_statwrite = FALSE;
-
-             continue;
-         }
-
-         /*
           * Check if there is a new statistics message to collect.
           */
          if (FD_ISSET(readPipe, &rfds))
--- 1660,1665 ----
***************
*** 1813,1829 ****
               */
              pgStatNumMessages++;

!             /*
!              * If this is the first message after we wrote the stats file the
!              * last time, setup the timeout that it'd be written.
!              */
!             if (!need_statwrite)
              {
!                 gettimeofday(&next_statwrite, NULL);
!                 next_statwrite.tv_usec += ((PGSTAT_STAT_INTERVAL) * 1000);
!                 next_statwrite.tv_sec += (next_statwrite.tv_usec / 1000000);
!                 next_statwrite.tv_usec %= 1000000;
!                 need_statwrite = TRUE;
              }
          }

--- 1776,1787 ----
               */
              pgStatNumMessages++;

!             if (need_timer)
              {
!                 if (setitimer(ITIMER_REAL, &timeval, NULL))
!                     ereport(ERROR,
!                           (errmsg("unable to set statistics collector timer: %m")));
!                 need_timer = false;
              }
          }

***************
*** 1848,1853 ****
--- 1806,1818 ----
  }


+ static void
+ force_statwrite(SIGNAL_ARGS)
+ {
+     need_statwrite = true;
+ }
+
+
  /* ----------
   * pgstat_recvbuffer() -
   *

Re: Stats collector performance improvement

From
Tom Lane
Date:
Bruce Momjian <pgman@candle.pha.pa.us> writes:
> I found if I put a pg_usleep(100) in the buffer process the backend
> speed was good, but packets were lost.  What I found worked well was to
> do multiple recv() calls in a loop.  The previous code did a select(),
> then perhaps a recv() and pipe write() based on the results of the
> select().  This caused many small packets to be written to the pipe and
> the pipe write overhead seems fairly large.  The best fix I found was to
> loop over the recv() call at most 25 times, collecting a group of
> packets that can then be sent to the collector in one pipe write.  The
> recv() socket is non-blocking, so a zero return indicates there are no
> more packets available.  Patch attached.

This seems incredibly OS-specific.  How many platforms did you test it
on?

A more serious objection is that it will cause the stats machinery to
work very poorly if there isn't a steady stream of incoming messages.
You can't just sit on 24 messages until the 25th one arrives next week.

            regards, tom lane

Re: Stats collector performance improvement

From
Bruce Momjian
Date:
Tom Lane wrote:
> Bruce Momjian <pgman@candle.pha.pa.us> writes:
> > I found if I put a pg_usleep(100) in the buffer process the backend
> > speed was good, but packets were lost.  What I found worked well was to
> > do multiple recv() calls in a loop.  The previous code did a select(),
> > then perhaps a recv() and pipe write() based on the results of the
> > select().  This caused many small packets to be written to the pipe and
> > the pipe write overhead seems fairly large.  The best fix I found was to
> > loop over the recv() call at most 25 times, collecting a group of
> > packets that can then be sent to the collector in one pipe write.  The
> > recv() socket is non-blocking, so a zero return indicates there are no
> > more packets available.  Patch attached.
>
> This seems incredibly OS-specific.  How many platforms did you test it
> on?

Only mine.  I am posting the patch so others can test it, of course.

> A more serious objection is that it will cause the stats machinery to
> work very poorly if there isn't a steady stream of incoming messages.
> You can't just sit on 24 messages until the 25th one arrives next week.

You wouldn't.  It exits the loop when recv() finds no packet, checks the pipe
write descriptor, and writes to it.


Re: Stats collector performance improvement

From
Simon Riggs
Date:
On Mon, 2006-01-02 at 13:40 -0500, Bruce Momjian wrote:

> This change reduced the stats_command_string time from 5.5 to 3.9, which
> is closer to the 3.5 seconds with stats_command_string off.

Excellent work, port specific or not.

Best Regards, Simon Riggs


Re: Stats collector performance improvement

From
Bruce Momjian
Date:
Bruce Momjian wrote:
> Tom Lane wrote:
> A second improvement I discovered is that the statistics collector is
> calling gettimeofday() for every packet received, so it can determine
> the timeout for the select() call to write the flat file.  I removed
> that behavior and instead used setitimer() to issue a SIGALRM every
> 500ms, which was the original behavior.  This eliminates the
> gettimeofday() call and makes the code cleaner.  Second patch attached.

I have applied this second patch, with a few small stylistic
improvements.

Index: src/backend/postmaster/pgstat.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/postmaster/pgstat.c,v
retrieving revision 1.116
diff -c -c -r1.116 pgstat.c
*** src/backend/postmaster/pgstat.c    2 Jan 2006 00:58:00 -0000    1.116
--- src/backend/postmaster/pgstat.c    3 Jan 2006 16:26:04 -0000
***************
*** 117,123 ****

  static long pgStatNumMessages = 0;

! static bool pgStatRunningInCollector = FALSE;

  /*
   * Place where backends store per-table info to be sent to the collector.
--- 117,123 ----

  static long pgStatNumMessages = 0;

! static bool pgStatRunningInCollector = false;

  /*
   * Place where backends store per-table info to be sent to the collector.
***************
*** 145,150 ****
--- 145,151 ----
  static PgStat_StatBeEntry *pgStatBeTable = NULL;
  static int    pgStatNumBackends = 0;

+ static volatile bool    need_statwrite;

  /* ----------
   * Local function forward declarations
***************
*** 164,169 ****
--- 165,171 ----

  NON_EXEC_STATIC void PgstatBufferMain(int argc, char *argv[]);
  NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]);
+ static void force_statwrite(SIGNAL_ARGS);
  static void pgstat_recvbuffer(void);
  static void pgstat_exit(SIGNAL_ARGS);
  static void pgstat_die(SIGNAL_ARGS);
***************
*** 1548,1560 ****
      PgStat_Msg    msg;
      fd_set        rfds;
      int            readPipe;
-     int            nready;
      int            len = 0;
!     struct timeval timeout;
!     struct timeval next_statwrite;
!     bool        need_statwrite;
      HASHCTL        hash_ctl;
!
      MyProcPid = getpid();        /* reset MyProcPid */

      /*
--- 1550,1560 ----
      PgStat_Msg    msg;
      fd_set        rfds;
      int            readPipe;
      int            len = 0;
!     struct itimerval timeval;
      HASHCTL        hash_ctl;
!     bool        need_timer = false;
!
      MyProcPid = getpid();        /* reset MyProcPid */

      /*
***************
*** 1572,1578 ****
      /* kluge to allow buffer process to kill collector; FIXME */
      pqsignal(SIGQUIT, pgstat_exit);
  #endif
!     pqsignal(SIGALRM, SIG_IGN);
      pqsignal(SIGPIPE, SIG_IGN);
      pqsignal(SIGUSR1, SIG_IGN);
      pqsignal(SIGUSR2, SIG_IGN);
--- 1572,1578 ----
      /* kluge to allow buffer process to kill collector; FIXME */
      pqsignal(SIGQUIT, pgstat_exit);
  #endif
!     pqsignal(SIGALRM, force_statwrite);
      pqsignal(SIGPIPE, SIG_IGN);
      pqsignal(SIGUSR1, SIG_IGN);
      pqsignal(SIGUSR2, SIG_IGN);
***************
*** 1597,1613 ****
      init_ps_display("stats collector process", "", "");
      set_ps_display("");

!     /*
!      * Arrange to write the initial status file right away
!      */
!     gettimeofday(&next_statwrite, NULL);
!     need_statwrite = TRUE;

      /*
       * Read in an existing statistics stats file or initialize the stats to
       * zero.
       */
!     pgStatRunningInCollector = TRUE;
      pgstat_read_statsfile(&pgStatDBHash, InvalidOid, NULL, NULL);

      /*
--- 1597,1613 ----
      init_ps_display("stats collector process", "", "");
      set_ps_display("");

!     need_statwrite = true;
!
!     MemSet(&timeval, 0, sizeof(struct itimerval));
!     timeval.it_value.tv_sec = PGSTAT_STAT_INTERVAL / 1000;
!     timeval.it_value.tv_usec = PGSTAT_STAT_INTERVAL % 1000;

      /*
       * Read in an existing statistics stats file or initialize the stats to
       * zero.
       */
!     pgStatRunningInCollector = true;
      pgstat_read_statsfile(&pgStatDBHash, InvalidOid, NULL, NULL);

      /*
***************
*** 1634,1667 ****
       */
      for (;;)
      {
-         /*
-          * If we need to write the status file again (there have been changes
-          * in the statistics since we wrote it last) calculate the timeout
-          * until we have to do so.
-          */
          if (need_statwrite)
          {
!             struct timeval now;
!
!             gettimeofday(&now, NULL);
!             /* avoid assuming that tv_sec is signed */
!             if (now.tv_sec > next_statwrite.tv_sec ||
!                 (now.tv_sec == next_statwrite.tv_sec &&
!                  now.tv_usec >= next_statwrite.tv_usec))
!             {
!                 timeout.tv_sec = 0;
!                 timeout.tv_usec = 0;
!             }
!             else
!             {
!                 timeout.tv_sec = next_statwrite.tv_sec - now.tv_sec;
!                 timeout.tv_usec = next_statwrite.tv_usec - now.tv_usec;
!                 if (timeout.tv_usec < 0)
!                 {
!                     timeout.tv_sec--;
!                     timeout.tv_usec += 1000000;
!                 }
!             }
          }

          /*
--- 1634,1644 ----
       */
      for (;;)
      {
          if (need_statwrite)
          {
!             pgstat_write_statsfile();
!             need_statwrite = false;
!             need_timer = true;
          }

          /*
***************
*** 1673,1681 ****
          /*
           * Now wait for something to do.
           */
!         nready = select(readPipe + 1, &rfds, NULL, NULL,
!                         (need_statwrite) ? &timeout : NULL);
!         if (nready < 0)
          {
              if (errno == EINTR)
                  continue;
--- 1650,1656 ----
          /*
           * Now wait for something to do.
           */
!         if (select(readPipe + 1, &rfds, NULL, NULL, NULL) < 0)
          {
              if (errno == EINTR)
                  continue;
***************
*** 1685,1702 ****
          }

          /*
-          * If there are no descriptors ready, our timeout for writing the
-          * stats file happened.
-          */
-         if (nready == 0)
-         {
-             pgstat_write_statsfile();
-             need_statwrite = FALSE;
-
-             continue;
-         }
-
-         /*
           * Check if there is a new statistics message to collect.
           */
          if (FD_ISSET(readPipe, &rfds))
--- 1660,1665 ----
***************
*** 1813,1829 ****
               */
              pgStatNumMessages++;

!             /*
!              * If this is the first message after we wrote the stats file the
!              * last time, setup the timeout that it'd be written.
!              */
!             if (!need_statwrite)
              {
!                 gettimeofday(&next_statwrite, NULL);
!                 next_statwrite.tv_usec += ((PGSTAT_STAT_INTERVAL) * 1000);
!                 next_statwrite.tv_sec += (next_statwrite.tv_usec / 1000000);
!                 next_statwrite.tv_usec %= 1000000;
!                 need_statwrite = TRUE;
              }
          }

--- 1776,1787 ----
               */
              pgStatNumMessages++;

!             if (need_timer)
              {
!                 if (setitimer(ITIMER_REAL, &timeval, NULL))
!                     ereport(ERROR,
!                           (errmsg("unable to set statistics collector timer: %m")));
!                 need_timer = false;
              }
          }

***************
*** 1848,1853 ****
--- 1806,1818 ----
  }


+ static void
+ force_statwrite(SIGNAL_ARGS)
+ {
+     need_statwrite = true;
+ }
+
+
  /* ----------
   * pgstat_recvbuffer() -
   *
***************
*** 1865,1871 ****
      struct timeval timeout;
      int            writePipe = pgStatPipe[1];
      int            maxfd;
-     int            nready;
      int            len;
      int            xfr;
      int            frm;
--- 1830,1835 ----
***************
*** 1907,1912 ****
--- 1871,1884 ----
      msgbuffer = (char *) palloc(PGSTAT_RECVBUFFERSZ);

      /*
+      * Wait for some work to do; but not for more than 10 seconds. (This
+      * determines how quickly we will shut down after an ungraceful
+      * postmaster termination; so it needn't be very fast.)
+      */
+     timeout.tv_sec = 10;
+     timeout.tv_usec = 0;
+
+     /*
       * Loop forever
       */
      for (;;)
***************
*** 1946,1961 ****
                  maxfd = writePipe;
          }

!         /*
!          * Wait for some work to do; but not for more than 10 seconds. (This
!          * determines how quickly we will shut down after an ungraceful
!          * postmaster termination; so it needn't be very fast.)
!          */
!         timeout.tv_sec = 10;
!         timeout.tv_usec = 0;
!
!         nready = select(maxfd + 1, &rfds, &wfds, NULL, &timeout);
!         if (nready < 0)
          {
              if (errno == EINTR)
                  continue;
--- 1918,1924 ----
                  maxfd = writePipe;
          }

!         if (select(maxfd + 1, &rfds, &wfds, NULL, &timeout) < 0)
          {
              if (errno == EINTR)
                  continue;
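In the pgstat_recvbuffer() hunk above, the 10-second timeout is now set once before the loop rather than on every iteration. POSIX permits select() to modify its timeout argument, and Linux does so by storing the time not slept. On such platforms, later iterations could wait with a shrinking or zero timeout, and the loop could spin.

A minimal sketch of the conventional pattern follows. It uses a hypothetical helper, wait_readable(), which rebuilds both the descriptor set and the timeout before every call.

```c
#include <assert.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

/*
 * Wait up to 'secs' seconds for 'fd' to become readable.  Returns
 * select()'s result: 1 if readable, 0 on timeout, -1 on error.
 */
static int
wait_readable(int fd, long secs)
{
    fd_set         rfds;
    struct timeval timeout;

    /* select() may overwrite both arguments, so rebuild them each call. */
    FD_ZERO(&rfds);
    FD_SET(fd, &rfds);
    timeout.tv_sec = secs;
    timeout.tv_usec = 0;

    return select(fd + 1, &rfds, NULL, NULL, &timeout);
}
```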

Re: Stats collector performance improvement

From
Bruce Momjian
Date:
Bruce Momjian wrote:
> I did some research on this because the numbers Tom quotes indicate there
> is something wrong in the way we process stats_command_string
> statistics.
>
...
> This sends 10,000 "SELECT 1" queries to the backend, and reports the
> execution time.  I found that without stats_command_string defined, it
> ran in 3.5 seconds.  With stats_command_string defined, it took 5.5
> seconds, meaning the command string is causing a 57% slowdown.  That is
> way too much considering that the SELECT 1 has to be sent from psql to
> the backend, parsed, optimized, and executed, and the result returned to
> psql, while stats_command_string only has to send a string to the
> statistics collector.  There is _no_ way that collector should take 57% of
> the time it takes to run the actual query.

I have updated information on this performance issue.  It seems it is
the blocking activity of recv() that is slowing down the buffer process
and hence the backends.  Basically, I found if I use select() or recv()
to block until data arrives, I see the huge performance loss reported
above.  If I loop over the recv() call in non-blocking mode, I see
almost no performance hit from stats_command_string (no backend
slowdown), but of course that consumes all the CPU (bad).  What I found
worked perfectly was to do a non-blocking recv(), and if no data was
returned, change the socket to blocking mode and loop back over the
recv().  This allowed for no performance loss, and prevented infinite
looping over the recv() call.
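A minimal sketch of the on/off blocking scheme described above follows. It assumes a datagram socket that starts in non-blocking mode. The posted patch toggles the mode with PostgreSQL's pg_set_block() and pg_set_noblock(); this sketch uses fcntl() and O_NONBLOCK directly. The names set_blocking(), adaptive_recv() and SWITCHED_TO_BLOCKING are hypothetical.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdbool.h>
#include <stddef.h>
#include <sys/socket.h>
#include <sys/types.h>

#define SWITCHED_TO_BLOCKING (-2)

/* Turn O_NONBLOCK off (blocking) or on for 'fd'; returns 0 on success. */
static int
set_blocking(int fd, bool blocking)
{
    int flags = fcntl(fd, F_GETFL, 0);

    if (flags < 0)
        return -1;
    flags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);
    return fcntl(fd, F_SETFL, flags);
}

/*
 * One receive step of the on/off scheme.  *blocking tracks the socket's
 * current mode.  A datagram received in blocking mode switches the socket
 * back to non-blocking, so the rest of a burst is read without sleeping;
 * an empty queue in non-blocking mode switches it to blocking, so the
 * next call sleeps in recv() instead of spinning.  Returns the datagram
 * length, SWITCHED_TO_BLOCKING, or -1 on error (errno from recv/fcntl).
 */
static ssize_t
adaptive_recv(int sock, void *buf, size_t len, bool *blocking)
{
    ssize_t n = recv(sock, buf, len, 0);

    if (n >= 0)
    {
        if (*blocking)
        {
            if (set_blocking(sock, false) != 0)
                return -1;
            *blocking = false;
        }
        return n;
    }
    if ((errno == EAGAIN || errno == EWOULDBLOCK) && !*blocking)
    {
        if (set_blocking(sock, true) != 0)
            return -1;
        *blocking = true;
        return SWITCHED_TO_BLOCKING;
    }
    return -1;                  /* real error or EINTR; errno from recv() */
}
```

A caller would loop on adaptive_recv() and treat SWITCHED_TO_BLOCKING as "try again". The next call then sleeps in recv() until a packet arrives, and that packet flips the socket back to non-blocking for the rest of the burst.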

My theory is that the kernel blocking logic of select() or recv() is
somehow locking up the socket for a small amount of time, thereby
slowing down the backend.  With on/off blocking, the packets arrive in
groups: we read a few packets, then block when nothing is available.

The test program:

    TMPFILE=/tmp/pgstat.sql
    export TMPFILE

    if [ ! -f $TMPFILE ]
    then    i=0
        while [ $i -lt 10000 ]
        do
            i=`expr $i + 1`
            echo "SELECT 1;"
        done > $TMPFILE
    fi

    time psql test < $TMPFILE >/dev/null

is basically sending 30k packets of roughly 26 bytes each, or roughly
800k in 3.5 seconds, meaning there is a packet every 0.0001 seconds.  I
wouldn't have thought that was too much volume for a dual Xeon BSD
machine, but it seems it might be.  Tom's 41% TPS reduction with pgbench
suggests Linux might have an issue too.
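Taking the packet count and size above at face value, the estimate works out as:

```latex
\begin{aligned}
30\,000 \times 26\ \text{bytes} &= 780\,000\ \text{bytes} \approx 800\text{k} \\
\frac{3.5\ \text{s}}{30\,000\ \text{packets}} &\approx 1.17 \times 10^{-4}\ \text{s} \approx 117\ \mu\text{s per packet}
\end{aligned}
```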

Two patches are attached.  The first patch shows the use of the on/off
blocking method to have almost zero overhead for reading from the
socket.  (The packets are discarded.)  The second patch removes the
buffer process entirely and uses on/off blocking to process the
incoming packets.  I tried running two test scripts simultaneously and
saw almost no packet loss.  Also keep in mind we are writing the stat
file twice a second, which might need to be pushed into a separate
process.

Index: src/backend/postmaster/pgstat.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/postmaster/pgstat.c,v
retrieving revision 1.118
diff -c -c -r1.118 pgstat.c
*** src/backend/postmaster/pgstat.c    3 Jan 2006 19:54:08 -0000    1.118
--- src/backend/postmaster/pgstat.c    4 Jan 2006 23:22:44 -0000
***************
*** 1839,1845 ****
      int            msg_recv = 0;    /* next receive index */
      int            msg_have = 0;    /* number of bytes stored */
      bool        overflow = false;
!
      /*
       * Identify myself via ps
       */
--- 1839,1847 ----
      int            msg_recv = 0;    /* next receive index */
      int            msg_have = 0;    /* number of bytes stored */
      bool        overflow = false;
!     bool        is_block_mode = false;
!     int            cnt = 0, bloops = 0, nbloops = 0;
!
      /*
       * Identify myself via ps
       */
***************
*** 1870,1875 ****
--- 1872,1921 ----
       */
      msgbuffer = (char *) palloc(PGSTAT_RECVBUFFERSZ);

+
+     while (1)
+     {
+ #if 0
+           FD_ZERO(&rfds);
+           FD_ZERO(&wfds);
+           maxfd = -1;
+               FD_SET(pgStatSock, &rfds);
+               maxfd = pgStatSock;
+
+           timeout.tv_sec = 0;
+           timeout.tv_usec = 0;
+
+           select(maxfd + 1, &rfds, &wfds, NULL, &timeout);
+ #endif
+
+               if (is_block_mode)
+                   bloops++;
+               else
+                   nbloops++;
+
+               len = recv(pgStatSock, (char *) &input_buffer,
+                          sizeof(PgStat_Msg), 0);
+               if (len > 0)
+                   cnt += len;
+
+ //fprintf(stderr, "len = %d, errno = %d\n", len, errno);
+
+               if (len > 0 && is_block_mode)
+               {
+                   pg_set_noblock(pgStatSock);
+                   is_block_mode = false;
+               }
+               else if (len < 0 && errno == EAGAIN && !is_block_mode)
+               {
+                   pg_set_block(pgStatSock);
+                   is_block_mode = true;
+               }
+ //              if ((bloops + nbloops)  % 1000 == 0)
+ //                  fprintf(stderr, "cnt = %d, len = %d, bloops = %d, nbloops = %d\n", cnt, len, bloops, nbloops);
+     }
+
+     exit(1);
+
      /*
       * Loop forever
       */
Index: src/backend/postmaster/pgstat.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/postmaster/pgstat.c,v
retrieving revision 1.118
diff -c -c -r1.118 pgstat.c
*** src/backend/postmaster/pgstat.c    3 Jan 2006 19:54:08 -0000    1.118
--- src/backend/postmaster/pgstat.c    4 Jan 2006 23:06:26 -0000
***************
*** 109,117 ****
   * ----------
   */
  NON_EXEC_STATIC int pgStatSock = -1;
- NON_EXEC_STATIC int pgStatPipe[2] = {-1, -1};
  static struct sockaddr_storage pgStatAddr;
- static pid_t pgStatCollectorPid = 0;

  static time_t last_pgstat_start_time;

--- 109,115 ----
***************
*** 166,172 ****
  NON_EXEC_STATIC void PgstatBufferMain(int argc, char *argv[]);
  NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]);
  static void force_statwrite(SIGNAL_ARGS);
- static void pgstat_recvbuffer(void);
  static void pgstat_exit(SIGNAL_ARGS);
  static void pgstat_die(SIGNAL_ARGS);
  static void pgstat_beshutdown_hook(int code, Datum arg);
--- 164,169 ----
***************
*** 1491,1536 ****
      pgstat_parseArgs(argc, argv);
  #endif

-     /*
-      * Start a buffering process to read from the socket, so we have a little
-      * more time to process incoming messages.
-      *
-      * NOTE: the process structure is: postmaster is parent of buffer process
-      * is parent of collector process.    This way, the buffer can detect
-      * collector failure via SIGCHLD, whereas otherwise it wouldn't notice
-      * collector failure until it tried to write on the pipe.  That would mean
-      * that after the postmaster started a new collector, we'd have two buffer
-      * processes competing to read from the UDP socket --- not good.
-      */
-     if (pgpipe(pgStatPipe) < 0)
-         ereport(ERROR,
-                 (errcode_for_socket_access(),
-                  errmsg("could not create pipe for statistics buffer: %m")));
-
      /* child becomes collector process */
! #ifdef EXEC_BACKEND
!     pgStatCollectorPid = pgstat_forkexec(STAT_PROC_COLLECTOR);
! #else
!     pgStatCollectorPid = fork();
! #endif
!     switch (pgStatCollectorPid)
!     {
!         case -1:
!             ereport(ERROR,
!                     (errmsg("could not fork statistics collector: %m")));
!
! #ifndef EXEC_BACKEND
!         case 0:
!             /* child becomes collector process */
!             PgstatCollectorMain(0, NULL);
!             break;
! #endif
!
!         default:
!             /* parent becomes buffer process */
!             closesocket(pgStatPipe[0]);
!             pgstat_recvbuffer();
!     }
      exit(0);
  }

--- 1488,1495 ----
      pgstat_parseArgs(argc, argv);
  #endif

      /* child becomes collector process */
!     PgstatCollectorMain(0, NULL);
      exit(0);
  }

***************
*** 1548,1559 ****
  PgstatCollectorMain(int argc, char *argv[])
  {
      PgStat_Msg    msg;
-     fd_set        rfds;
-     int            readPipe;
-     int            len = 0;
-     struct itimerval timeval;
      HASHCTL        hash_ctl;
      bool        need_timer = false;

      MyProcPid = getpid();        /* reset MyProcPid */

--- 1507,1517 ----
  PgstatCollectorMain(int argc, char *argv[])
  {
      PgStat_Msg    msg;
      HASHCTL        hash_ctl;
      bool        need_timer = false;
+     struct itimerval timeval;
+     bool        is_block_mode = false;
+     int            loops = 0;

      MyProcPid = getpid();        /* reset MyProcPid */

***************
*** 1587,1596 ****
      pgstat_parseArgs(argc, argv);
  #endif

-     /* Close unwanted files */
-     closesocket(pgStatPipe[1]);
-     closesocket(pgStatSock);
-
      /*
       * Identify myself via ps
       */
--- 1545,1550 ----
***************
*** 1626,1791 ****
      pgStatBeTable = (PgStat_StatBeEntry *)
          palloc0(sizeof(PgStat_StatBeEntry) * MaxBackends);

-     readPipe = pgStatPipe[0];
-
      /*
       * Process incoming messages and handle all the reporting stuff until
       * there are no more messages.
       */
      for (;;)
      {
          if (need_statwrite)
          {
!             pgstat_write_statsfile();
              need_statwrite = false;
              need_timer = true;
          }

!         /*
!          * Setup the descriptor set for select(2)
!          */
!         FD_ZERO(&rfds);
!         FD_SET(readPipe, &rfds);
!
!         /*
!          * Now wait for something to do.
!          */
!         if (select(readPipe + 1, &rfds, NULL, NULL, NULL) < 0)
          {
!             if (errno == EINTR)
!                 continue;
!             ereport(ERROR,
!                     (errcode_for_socket_access(),
!                      errmsg("select() failed in statistics collector: %m")));
          }

!         /*
!          * Check if there is a new statistics message to collect.
!          */
!         if (FD_ISSET(readPipe, &rfds))
!         {
!             /*
!              * We may need to issue multiple read calls in case the buffer
!              * process didn't write the message in a single write, which is
!              * possible since it dumps its buffer bytewise. In any case, we'd
!              * need two reads since we don't know the message length
!              * initially.
!              */
!             int            nread = 0;
!             int            targetlen = sizeof(PgStat_MsgHdr);        /* initial */
!             bool        pipeEOF = false;

!             while (nread < targetlen)
              {
!                 len = piperead(readPipe, ((char *) &msg) + nread,
!                                targetlen - nread);
!                 if (len < 0)
!                 {
!                     if (errno == EINTR)
!                         continue;
!                     ereport(ERROR,
!                             (errcode_for_socket_access(),
!                              errmsg("could not read from statistics collector pipe: %m")));
!                 }
!                 if (len == 0)    /* EOF on the pipe! */
                  {
!                     pipeEOF = true;
!                     break;
!                 }
!                 nread += len;
!                 if (nread == sizeof(PgStat_MsgHdr))
!                 {
!                     /* we have the header, compute actual msg length */
!                     targetlen = msg.msg_hdr.m_size;
!                     if (targetlen < (int) sizeof(PgStat_MsgHdr) ||
!                         targetlen > (int) sizeof(msg))
!                     {
!                         /*
!                          * Bogus message length implies that we got out of
!                          * sync with the buffer process somehow. Abort so that
!                          * we can restart both processes.
!                          */
!                         ereport(ERROR,
!                               (errmsg("invalid statistics message length")));
!                     }
                  }
              }
!
!             /*
!              * EOF on the pipe implies that the buffer process exited. Fall
!              * out of outer loop.
!              */
!             if (pipeEOF)
!                 break;
!
!             /*
!              * Distribute the message to the specific function handling it.
!              */
!             switch (msg.msg_hdr.m_type)
              {
!                 case PGSTAT_MTYPE_DUMMY:
!                     break;

!                 case PGSTAT_MTYPE_BESTART:
!                     pgstat_recv_bestart((PgStat_MsgBestart *) &msg, nread);
!                     break;

!                 case PGSTAT_MTYPE_BETERM:
!                     pgstat_recv_beterm((PgStat_MsgBeterm *) &msg, nread);
!                     break;

!                 case PGSTAT_MTYPE_TABSTAT:
!                     pgstat_recv_tabstat((PgStat_MsgTabstat *) &msg, nread);
!                     break;

!                 case PGSTAT_MTYPE_TABPURGE:
!                     pgstat_recv_tabpurge((PgStat_MsgTabpurge *) &msg, nread);
!                     break;

!                 case PGSTAT_MTYPE_ACTIVITY:
!                     pgstat_recv_activity((PgStat_MsgActivity *) &msg, nread);
!                     break;

!                 case PGSTAT_MTYPE_DROPDB:
!                     pgstat_recv_dropdb((PgStat_MsgDropdb *) &msg, nread);
!                     break;

!                 case PGSTAT_MTYPE_RESETCOUNTER:
!                     pgstat_recv_resetcounter((PgStat_MsgResetcounter *) &msg,
!                                              nread);
!                     break;

!                 case PGSTAT_MTYPE_AUTOVAC_START:
!                     pgstat_recv_autovac((PgStat_MsgAutovacStart *) &msg, nread);
!                     break;

!                 case PGSTAT_MTYPE_VACUUM:
!                     pgstat_recv_vacuum((PgStat_MsgVacuum *) &msg, nread);
!                     break;

!                 case PGSTAT_MTYPE_ANALYZE:
!                     pgstat_recv_analyze((PgStat_MsgAnalyze *) &msg, nread);
!                     break;

!                 default:
!                     break;
!             }

!             /*
!              * Globally count messages.
!              */
!             pgStatNumMessages++;

!             if (need_timer)
!             {
!                 if (setitimer(ITIMER_REAL, &timeval, NULL))
!                     ereport(ERROR,
!                           (errmsg("unable to set statistics collector timer: %m")));
!                 need_timer = false;
!             }
          }

          /*
           * Note that we do NOT check for postmaster exit inside the loop; only
           * EOF on the buffer pipe causes us to fall out.  This ensures we
           * don't exit prematurely if there are still a few messages in the
--- 1580,1704 ----
      pgStatBeTable = (PgStat_StatBeEntry *)
          palloc0(sizeof(PgStat_StatBeEntry) * MaxBackends);

      /*
       * Process incoming messages and handle all the reporting stuff until
       * there are no more messages.
       */
      for (;;)
      {
+         int nread;
+
          if (need_statwrite)
          {
!             pgstat_write_statsfile();
              need_statwrite = false;
              need_timer = true;
          }

!         if (need_timer)
          {
!             if (setitimer(ITIMER_REAL, &timeval, NULL))
!                 ereport(ERROR,
!                       (errmsg("unable to set statistics collector timer: %m")));
!             need_timer = false;
          }

!         nread = recv(pgStatSock, (char *) &msg,
!                    sizeof(PgStat_Msg), 0);

!         if (nread > 0 && is_block_mode)    /* got data */
!         {
!             pg_set_noblock(pgStatSock);
!             is_block_mode = false;
!         }
!         else if (nread < 0)
!         {
!             if (errno == EAGAIN)
              {
!                 if (!is_block_mode)
                  {
!                     /* no data, block mode */
!                     pg_set_block(pgStatSock);
!                     is_block_mode = true;
                  }
+                 continue;
              }
!             else if (errno == EINTR)
              {
!                 if (!PostmasterIsAlive(true))
!                     ereport(ERROR,
!                             (errmsg("stats collector exited: %m")));
!                 continue;
!             }
!             else
!                 ereport(ERROR, (errcode_for_socket_access(),
!                         errmsg("could not read statistics message: %m")));
!         }

!         /* Ignore runt messages and ones whose length disagrees with the header */
!         if (nread < (int) sizeof(PgStat_MsgHdr) || msg.msg_hdr.m_size != nread)
!             continue;

!         /*
!          * Distribute the message to the specific function handling it.
!          */
!         switch (msg.msg_hdr.m_type)
!         {
!             case PGSTAT_MTYPE_DUMMY:
!                 break;

!             case PGSTAT_MTYPE_BESTART:
!                 pgstat_recv_bestart((PgStat_MsgBestart *) &msg, nread);
!                 break;

!             case PGSTAT_MTYPE_BETERM:
!                 pgstat_recv_beterm((PgStat_MsgBeterm *) &msg, nread);
!                 break;

!             case PGSTAT_MTYPE_TABSTAT:
!                 pgstat_recv_tabstat((PgStat_MsgTabstat *) &msg, nread);
!                 break;

!             case PGSTAT_MTYPE_TABPURGE:
!                 pgstat_recv_tabpurge((PgStat_MsgTabpurge *) &msg, nread);
!                 break;

!             case PGSTAT_MTYPE_ACTIVITY:
!                 pgstat_recv_activity((PgStat_MsgActivity *) &msg, nread);
!                 break;

!             case PGSTAT_MTYPE_DROPDB:
!                 pgstat_recv_dropdb((PgStat_MsgDropdb *) &msg, nread);
!                 break;

!             case PGSTAT_MTYPE_RESETCOUNTER:
!                 pgstat_recv_resetcounter((PgStat_MsgResetcounter *) &msg,
!                                          nread);
!                 break;

!             case PGSTAT_MTYPE_AUTOVAC_START:
!                 pgstat_recv_autovac((PgStat_MsgAutovacStart *) &msg, nread);
!                 break;

!             case PGSTAT_MTYPE_VACUUM:
!                 pgstat_recv_vacuum((PgStat_MsgVacuum *) &msg, nread);
!                 break;

!             case PGSTAT_MTYPE_ANALYZE:
!                 pgstat_recv_analyze((PgStat_MsgAnalyze *) &msg, nread);
!                 break;

!             default:
!                 break;
          }

          /*
+          * Globally count messages.
+          */
+         pgStatNumMessages++;
+
+
+         /*
           * Note that we do NOT check for postmaster exit inside the loop; only
           * EOF on the buffer pipe causes us to fall out.  This ensures we
           * don't exit prematurely if there are still a few messages in the
***************
*** 1813,2032 ****
  }


- /* ----------
-  * pgstat_recvbuffer() -
-  *
-  *    This is the body of the separate buffering process. Its only
-  *    purpose is to receive messages from the UDP socket as fast as
-  *    possible and forward them over a pipe into the collector itself.
-  *    If the collector is slow to absorb messages, they are buffered here.
-  * ----------
-  */
- static void
- pgstat_recvbuffer(void)
- {
-     fd_set        rfds;
-     fd_set        wfds;
-     struct timeval timeout;
-     int            writePipe = pgStatPipe[1];
-     int            maxfd;
-     int            len;
-     int            xfr;
-     int            frm;
-     PgStat_Msg    input_buffer;
-     char       *msgbuffer;
-     int            msg_send = 0;    /* next send index in buffer */
-     int            msg_recv = 0;    /* next receive index */
-     int            msg_have = 0;    /* number of bytes stored */
-     bool        overflow = false;
-
-     /*
-      * Identify myself via ps
-      */
-     init_ps_display("stats buffer process", "", "");
-     set_ps_display("");
-
-     /*
-      * We want to die if our child collector process does.    There are two ways
-      * we might notice that it has died: receive SIGCHLD, or get a write
-      * failure on the pipe leading to the child.  We can set SIGPIPE to kill
-      * us here.  Our SIGCHLD handler was already set up before we forked (must
-      * do it that way, else it's a race condition).
-      */
-     pqsignal(SIGPIPE, SIG_DFL);
-     PG_SETMASK(&UnBlockSig);
-
-     /*
-      * Set the write pipe to nonblock mode, so that we cannot block when the
-      * collector falls behind.
-      */
-     if (!pg_set_noblock(writePipe))
-         ereport(ERROR,
-                 (errcode_for_socket_access(),
-                  errmsg("could not set statistics collector pipe to nonblocking mode: %m")));
-
-     /*
-      * Allocate the message buffer
-      */
-     msgbuffer = (char *) palloc(PGSTAT_RECVBUFFERSZ);
-
-     /*
-      * Loop forever
-      */
-     for (;;)
-     {
-         FD_ZERO(&rfds);
-         FD_ZERO(&wfds);
-         maxfd = -1;
-
-         /*
-          * As long as we have buffer space we add the socket to the read
-          * descriptor set.
-          */
-         if (msg_have <= (int) (PGSTAT_RECVBUFFERSZ - sizeof(PgStat_Msg)))
-         {
-             FD_SET(pgStatSock, &rfds);
-             maxfd = pgStatSock;
-             overflow = false;
-         }
-         else
-         {
-             if (!overflow)
-             {
-                 ereport(LOG,
-                         (errmsg("statistics buffer is full")));
-                 overflow = true;
-             }
-         }
-
-         /*
-          * If we have messages to write out, we add the pipe to the write
-          * descriptor set.
-          */
-         if (msg_have > 0)
-         {
-             FD_SET(writePipe, &wfds);
-             if (writePipe > maxfd)
-                 maxfd = writePipe;
-         }
-
-         /*
-          * Wait for some work to do; but not for more than 10 seconds. (This
-          * determines how quickly we will shut down after an ungraceful
-          * postmaster termination; so it needn't be very fast.)  struct timeout
-          * is modified by some operating systems.
-          */
-         timeout.tv_sec = 10;
-         timeout.tv_usec = 0;
-
-         if (select(maxfd + 1, &rfds, &wfds, NULL, &timeout) < 0)
-         {
-             if (errno == EINTR)
-                 continue;
-             ereport(ERROR,
-                     (errcode_for_socket_access(),
-                      errmsg("select() failed in statistics buffer: %m")));
-         }
-
-         /*
-          * If there is a message on the socket, read it and check for
-          * validity.
-          */
-         if (FD_ISSET(pgStatSock, &rfds))
-         {
-             len = recv(pgStatSock, (char *) &input_buffer,
-                        sizeof(PgStat_Msg), 0);
-             if (len < 0)
-                 ereport(ERROR,
-                         (errcode_for_socket_access(),
-                          errmsg("could not read statistics message: %m")));
-
-             /*
-              * We ignore messages that are smaller than our common header
-              */
-             if (len < sizeof(PgStat_MsgHdr))
-                 continue;
-
-             /*
-              * The received length must match the length in the header
-              */
-             if (input_buffer.msg_hdr.m_size != len)
-                 continue;
-
-             /*
-              * O.K. - we accept this message.  Copy it to the circular
-              * msgbuffer.
-              */
-             frm = 0;
-             while (len > 0)
-             {
-                 xfr = PGSTAT_RECVBUFFERSZ - msg_recv;
-                 if (xfr > len)
-                     xfr = len;
-                 Assert(xfr > 0);
-                 memcpy(msgbuffer + msg_recv,
-                        ((char *) &input_buffer) + frm,
-                        xfr);
-                 msg_recv += xfr;
-                 if (msg_recv == PGSTAT_RECVBUFFERSZ)
-                     msg_recv = 0;
-                 msg_have += xfr;
-                 frm += xfr;
-                 len -= xfr;
-             }
-         }
-
-         /*
-          * If the collector is ready to receive, write some data into his
-          * pipe.  We may or may not be able to write all that we have.
-          *
-          * NOTE: if what we have is less than PIPE_BUF bytes but more than the
-          * space available in the pipe buffer, most kernels will refuse to
-          * write any of it, and will return EAGAIN.  This means we will
-          * busy-loop until the situation changes (either because the collector
-          * caught up, or because more data arrives so that we have more than
-          * PIPE_BUF bytes buffered).  This is not good, but is there any way
-          * around it?  We have no way to tell when the collector has caught
-          * up...
-          */
-         if (FD_ISSET(writePipe, &wfds))
-         {
-             xfr = PGSTAT_RECVBUFFERSZ - msg_send;
-             if (xfr > msg_have)
-                 xfr = msg_have;
-             Assert(xfr > 0);
-             len = pipewrite(writePipe, msgbuffer + msg_send, xfr);
-             if (len < 0)
-             {
-                 if (errno == EINTR || errno == EAGAIN)
-                     continue;    /* not enough space in pipe */
-                 ereport(ERROR,
-                         (errcode_for_socket_access(),
-                 errmsg("could not write to statistics collector pipe: %m")));
-             }
-             /* NB: len < xfr is okay */
-             msg_send += len;
-             if (msg_send == PGSTAT_RECVBUFFERSZ)
-                 msg_send = 0;
-             msg_have -= len;
-         }
-
-         /*
-          * Make sure we forwarded all messages before we check for postmaster
-          * termination.
-          */
-         if (msg_have != 0 || FD_ISSET(pgStatSock, &rfds))
-             continue;
-
-         /*
-          * If the postmaster has terminated, we die too.  (This is no longer
-          * the normal exit path, however.)
-          */
-         if (!PostmasterIsAlive(true))
-             exit(0);
-     }
- }
-
  /* SIGQUIT signal handler for buffer process */
  static void
  pgstat_exit(SIGNAL_ARGS)
--- 1726,1731 ----
***************
*** 2049,2054 ****
--- 1748,1754 ----
      exit(0);
  }

+
  /* SIGCHLD signal handler for buffer process */
  static void
  pgstat_die(SIGNAL_ARGS)