Re: Streaming Replication patch for CommitFest 2009-09 - Mailing list pgsql-hackers

From Heikki Linnakangas
Subject Re: Streaming Replication patch for CommitFest 2009-09
Msg-id 4AB73099.1060202@enterprisedb.com
In response to Re: Streaming Replication patch for CommitFest 2009-09  (Heikki Linnakangas <heikki.linnakangas@enterprisedb.com>)
List pgsql-hackers
Having gone through the patch now in more detail, I think it's in pretty
good shape. I'm happy with the overall design, except that I haven't
been able to make up my mind if walreceiver should indeed be a
stand-alone program as discussed, or a postmaster child process as in
the patch you submitted. Putting that question aside for a moment,
here are some minor things, in no particular order:

- The async API in PQgetXLogData is quite different from the other
commands. It's close to the API from PQgetCopyData(), but doesn't return
a malloc'd buffer like PQgetCopyData does. I presume that's to optimize
away the extra memcpy step? I don't think that's really necessary; I
don't recall any complaints about that in PQgetCopyData(), and if it
does become an issue, it could be optimized away by mallocing the buffer
first and reading directly into it.

- Can we avoid sprinkling XLogStreamingAllowed() calls in the places
where we check whether WAL-logging is required (nbtsort.c, copy.c etc.)?
I think we need a new macro to encapsulate (XLogArchivingActive() ||
XLogStreamingAllowed()).

- Is O_DIRECT ever a good idea in walreceiver? If it's really direct and
doesn't get cached, the startup process will need to read from disk.

- Can we replace read/write_conninfo with just a long-enough field in
shared memory? That would be simpler. (This is moot if we go with the
stand-alone walreceiver program and pass it as a command-line argument.)

- walreceiver shouldn't die on connection error, just to be restarted by
the startup process. Can we add error handling a la bgwriter and have a
retry loop within walreceiver? (Again, if we go with a stand-alone
walreceiver program, it's probably better to keep the startup process
responsible for restarting walreceiver, as it is now.)

- pq_wait in the backend waits until you can read or write at least 1 byte.
There is no guarantee that you can send or read the whole message
without blocking. We'd have to put the socket in non-blocking mode for
that. I'm not sure what the implications of this are.

- We should include system_identifier somewhere in the replication
startup handshake. Otherwise you can connect to a server from a
different system and have logs shipped, if the two happen to be roughly
at the same point in WAL. Replay will almost certainly fail, but we
should error out earlier.

- I know I said we should have just asynchronous replication at first,
but looking ahead, how would you do synchronous? What kind of signaling
is needed between walreceiver and startup process for that?

- 'replication' shouldn't be a real database.
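
As a rough illustration of the macro suggested in the
XLogStreamingAllowed() item above, here is a minimal sketch. The name
XLogIsNeeded(), the can_skip_wal() call site, and the two flag variables
are assumptions made up for this example; the real
XLogArchivingActive() and XLogStreamingAllowed() checks live in the
backend and in the patch, and are stubbed here only so the sketch
compiles on its own.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Stand-ins for the real checks (assumption: plain flags instead of the
 * backend's configuration state), so the sketch is self-contained.
 */
static bool archiving_active = false;
static bool streaming_allowed = false;

static bool XLogArchivingActive(void)  { return archiving_active; }
static bool XLogStreamingAllowed(void) { return streaming_allowed; }

/* Hypothetical name: one place that answers "does anything consume WAL?" */
#define XLogIsNeeded() (XLogArchivingActive() || XLogStreamingAllowed())

/*
 * Hypothetical call site: an operation on a newly created relation file
 * may skip WAL-logging only if nothing consumes the WAL.
 */
static bool
can_skip_wal(bool rel_is_new)
{
    return rel_is_new && !XLogIsNeeded();
}

/* Usage example: enabling streaming alone must disable the shortcut. */
static void
example_usage(void)
{
    archiving_active = false;
    streaming_allowed = false;
    assert(can_skip_wal(true));

    streaming_allowed = true;
    assert(!can_skip_wal(true));
}
```

With a wrapper like this, call sites such as those in nbtsort.c and
copy.c would test a single condition, and a later change to what counts
as "WAL is needed" would touch only the macro.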


I found the paging logic in walsender confusing, and didn't like the
idea that walsender needs to set the XLOGSTREAM_END_SEG flag. Surely
walreceiver knows how to split the WAL into files without such a flag. I
reworked that logic; I think it's easier to understand now. I kept the
support for the flag in libpq and the protocol for now, but it should be
removed too, or repurposed to indicate that pg_switch_xlog() was done on
the master. I've pushed that to the 'replication-orig' branch in my git
repository; attached is the same as a diff against your SR_0914.patch.
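
To illustrate why the receiver needs no end-of-segment flag, here is a
minimal stand-alone sketch of the splitting arithmetic: given the start
offset of a chunk and its length, the number of bytes that belong to the
current segment follows from the segment size alone. This is a
simplified model, not the code in the attached diff: SEG_SIZE is assumed
to be 16 MB, the helper names are invented for the example, and the
xlogid half of the WAL position is ignored.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumption for this sketch: 16 MB WAL segments. */
#define SEG_SIZE (16u * 1024u * 1024u)

/*
 * How many of the next 'nbytes' bytes, starting at WAL offset 'xrecoff',
 * fall into the segment that contains 'xrecoff'?
 */
static size_t
bytes_in_segment(uint32_t xrecoff, size_t nbytes)
{
    uint32_t startoff = xrecoff % SEG_SIZE;  /* offset within the segment */
    size_t   room = SEG_SIZE - startoff;     /* bytes left in the segment */

    return nbytes < room ? nbytes : room;
}

/*
 * Walk over a received chunk segment by segment, the way a write loop
 * would, and count how many segment files it touches.
 */
static int
count_segments_touched(uint32_t xrecoff, size_t nbytes)
{
    int nsegs = 0;

    while (nbytes > 0)
    {
        size_t segbytes = bytes_in_segment(xrecoff, nbytes);

        /* a real receiver would open the segment and write here */
        xrecoff += (uint32_t) segbytes;
        nbytes -= segbytes;
        nsegs++;
    }
    return nsegs;
}

/* Usage example: a 100-byte chunk starting 10 bytes before a boundary. */
static void
segment_split_example(void)
{
    assert(bytes_in_segment(SEG_SIZE - 10, 100) == 10);
    assert(count_segments_touched(SEG_SIZE - 10, 100) == 2);
}
```

Because the boundary is computed from the position itself, the sender
can ship chunks of any size and the receiver still ends up with
correctly sized segment files.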

I need a break from this patch, so I'll take a closer look at Simon's
hot standby now. Meanwhile, can you work on the above items and submit a
new version, please?

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
*** a/src/backend/access/transam/recovery.conf.sample
--- b/src/backend/access/transam/recovery.conf.sample
***************
*** 2,10 ****
  # PostgreSQL recovery config file
  # -------------------------------
  #
! # Edit this file to provide the parameters that PostgreSQL
! # needs to perform an archive recovery of a database, or
! # a log-streaming replication.
  #
  # If "recovery.conf" is present in the PostgreSQL data directory, it is
  # read on postmaster startup.  After successful recovery, it is renamed
--- 2,10 ----
  # PostgreSQL recovery config file
  # -------------------------------
  #
! # Edit this file to provide the parameters that PostgreSQL needs to
! # perform an archive recovery of a database, or to act as a log-streaming
! # replication standby.
  #
  # If "recovery.conf" is present in the PostgreSQL data directory, it is
  # read on postmaster startup.  After successful recovery, it is renamed
***************
*** 83,89 ****
  #---------------------------------------------------------------------------
  #
  # When standby_mode is enabled, the PostgreSQL server will work as
! # the standby. It tries to connect to the primary according to the
  # connection settings primary_conninfo, and receives XLOG records
  # continuously.
  #
--- 83,89 ----
  #---------------------------------------------------------------------------
  #
  # When standby_mode is enabled, the PostgreSQL server will work as
! # a standby. It tries to connect to the primary according to the
  # connection settings primary_conninfo, and receives XLOG records
  # continuously.
  #
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 2645,2653 **** XLogFileClose(void)
       * WAL segment files will not be re-read in normal operation, so we advise
       * the OS to release any cached pages.    But do not do so if WAL archiving
       * or streaming is active, because archiver and walsender process could use
!      * the cache to read the WAL segment, respectively.  Also, don't bother
!      * with it if we are using O_DIRECT, since the kernel is presumably not
!      * caching in that case.
       */
  #if defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
      if (!XLogArchivingActive() && !WalSndInProgress() &&
--- 2645,2653 ----
       * WAL segment files will not be re-read in normal operation, so we advise
       * the OS to release any cached pages.    But do not do so if WAL archiving
       * or streaming is active, because archiver and walsender process could use
!      * the cache to read the WAL segment.  Also, don't bother with it if we
!      * are using O_DIRECT, since the kernel is presumably not caching in that
!      * case.
       */
  #if defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
      if (!XLogArchivingActive() && !WalSndInProgress() &&
***************
*** 3481,3487 **** FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
                          startlsn.xlogid, startlsn.xrecoff)));
      }

!     return ReadRecord(RecPtr, emode);
  }

  /*
--- 3481,3487 ----
                          startlsn.xlogid, startlsn.xrecoff)));
      }

!     return ReadRecord(RecPtr, emode);
  }

  /*
***************
*** 5284,5290 **** exitStreamingRecovery(void)
       */
      ShutdownWalRcv();

!     /* We are no longer in streaming recovery state */
      InStreamingRecovery = false;

      ereport(LOG,
--- 5284,5290 ----
       */
      ShutdownWalRcv();

!     /* We are no longer in streaming recovery state */
      InStreamingRecovery = false;

      ereport(LOG,
*** a/src/backend/postmaster/postmaster.c
--- b/src/backend/postmaster/postmaster.c
***************
*** 289,295 **** typedef enum
      PM_WAIT_BACKENDS,            /* waiting for live backends to exit */
      PM_SHUTDOWN,                /* waiting for bgwriter to do shutdown ckpt */
      PM_SHUTDOWN_2,                /* waiting for archiver to finish */
-     PM_SHUTDOWN_3,                /* waiting for walsenders to finish */
      PM_WAIT_DEAD_END,            /* waiting for dead_end children to exit */
      PM_NO_CHILDREN                /* all important children have exited */
  } PMState;
--- 289,294 ----
***************
*** 1640,1646 **** retry1:
      if (proto == XLOG_STREAMING_CODE && !am_walsender)
      {
          am_walsender = true;
!         /* No packets other than regular one should not follow */
          return ProcessStartupPacket(port, SSLdone);
      }

--- 1639,1645 ----
      if (proto == XLOG_STREAMING_CODE && !am_walsender)
      {
          am_walsender = true;
!         /* No packets other than regular one should follow */
          return ProcessStartupPacket(port, SSLdone);
      }

***************
*** 2404,2420 **** reaper(SIGNAL_ARGS)
                   */
                  Assert(Shutdown > NoShutdown);

!                 if (PgArchPID != 0)
                  {
                      /* Waken archiver for the last time */
!                     signal_child(PgArchPID, SIGUSR2);
!                     pmState = PM_SHUTDOWN_2;
!                 }
!                 else if (WalSndInProgress())
!                 {
                      /* Waken walsenders for the last time */
                      SignalWalSenders(SIGUSR2);
!                     pmState = PM_SHUTDOWN_3;
                  }
                  else
                      pmState = PM_WAIT_DEAD_END;
--- 2403,2418 ----
                   */
                  Assert(Shutdown > NoShutdown);

!                 if (PgArchPID != 0 || WalSndInProgress())
                  {
                      /* Waken archiver for the last time */
!                     if (PgArchPID != 0)
!                         signal_child(PgArchPID, SIGUSR2);
!
                      /* Waken walsenders for the last time */
                      SignalWalSenders(SIGUSR2);
!
!                     pmState = PM_SHUTDOWN_2;
                  }
                  else
                      pmState = PM_WAIT_DEAD_END;
***************
*** 2499,2510 **** reaper(SIGNAL_ARGS)
                   ((pmState == PM_RECOVERY || pmState == PM_RECOVERY_CONSISTENT) &&
                    WalRcvInProgress())))
                  PgArchPID = pgarch_start();
!             else if (pmState == PM_SHUTDOWN_2 && WalSndInProgress())
!             {
!                 SignalWalSenders(SIGUSR2);
!                 pmState = PM_SHUTDOWN_3;
!             }
!             else
                  pmState = PM_WAIT_DEAD_END;
              continue;
          }
--- 2497,2503 ----
                   ((pmState == PM_RECOVERY || pmState == PM_RECOVERY_CONSISTENT) &&
                    WalRcvInProgress())))
                  PgArchPID = pgarch_start();
!             else if (pmState == PM_SHUTDOWN_2 && !WalSndInProgress())
                  pmState = PM_WAIT_DEAD_END;
              continue;
          }
***************
*** 2611,2618 **** CleanupBackend(int pid,
                   * advance to the next shutdown step.
                   */
                  if (bp->child_type == BACKEND_TYPE_WALSND &&
!                     pmState == PM_SHUTDOWN_3 &&
!                     !WalSndInProgress())
                      pmState = PM_WAIT_DEAD_END;
              }
              DLRemove(curr);
--- 2604,2611 ----
                   * advance to the next shutdown step.
                   */
                  if (bp->child_type == BACKEND_TYPE_WALSND &&
!                     pmState == PM_SHUTDOWN_2 &&
!                     !WalSndInProgress() && PgArchPID == 0)
                      pmState = PM_WAIT_DEAD_END;
              }
              DLRemove(curr);
*** a/src/backend/postmaster/walreceiver.c
--- b/src/backend/postmaster/walreceiver.c
***************
*** 100,108 **** static void WalRcvQuickDieHandler(SIGNAL_ARGS);
  static void WalRcvLoop(void);
  static void    InitWalRcv(void);
  static void    WalRcvKill(int code, Datum arg);
! static void XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced);
! static void XLogWalRcvFlush(XLogRecPtr recptr);
! static void WritePhysicalXLog(char *from, Size nbytes, int startoff);
  static char *read_conninfo_file(void);

  /* Main entry point for walreceiver process */
--- 100,107 ----
  static void WalRcvLoop(void);
  static void    InitWalRcv(void);
  static void    WalRcvKill(int code, Datum arg);
! static void XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr);
! static void XLogWalRcvFlush(void);
  static char *read_conninfo_file(void);

  /* Main entry point for walreceiver process */
***************
*** 228,235 **** WalRcvLoop(void)
      /* Loop until end-of-streaming or error */
      for (;;)
      {
-         bool    fsynced = false;
-
          /*
           * Emergency bailout if postmaster has died.  This is to avoid the
           * necessity for manual cleanup of all postmaster children.
--- 227,232 ----
***************
*** 298,304 **** WalRcvLoop(void)
           * can recover all transactions from the primary).
           */

!         XLogWalRcvWrite(buf, len, recptr, &fsynced);

          /*
           * The logs in the XLogData message were written successfully,
--- 295,301 ----
           * can recover all transactions from the primary).
           */

!         XLogWalRcvWrite(buf, len, recptr);

          /*
           * The logs in the XLogData message were written successfully,
***************
*** 307,357 **** WalRcvLoop(void)
          PQmarkConsumed(streamConn);

          /*
!          * If fsync is not requested or was already done, we send a "success"
!          * to the primary before issuing fsync for end-of-segment.
           */
!         if (fsynced || !fsync_requested)
!         {
!             if (PQputXLogRecPtr(streamConn, recptr.xlogid, recptr.xrecoff,
!                                 (int) fsynced) == -1)
!                 ereport(FATAL,
!                         (errmsg("could not send a message to the primary: %s",
!                                 PQerrorMessage(streamConn))));
!         }
!
!         /*
!          * If we just wrote the whole last page of a logfile segment but
!          * had not fsynced it yet, fsync the segment immediately.  This
!          * avoids having to go back and re-open prior segments when an
!          * fsync request comes along later.
!          *
!          * Of course, if asked to fsync but not, do so.
!          */
!         if (!fsynced && (fsync_requested || finishing_seg))
!         {
!             XLogWalRcvFlush(recptr);
!
!             if (PQputXLogRecPtr(streamConn, recptr.xlogid, recptr.xrecoff,
!                                 1) == -1)
!                 ereport(FATAL,
!                         (errmsg("could not send a message to the primary: %s",
!                                 PQerrorMessage(streamConn))));
!
!             /*
!              * If the segment is ready to copy to archival storage,
!              * notify the archiver so.
!              */
!             if (finishing_seg && XLogArchivingActive())
!                 XLogArchiveNotifySeg(recvId, recvSeg);
!
!             /*
!              * XXX: Should we signal bgwriter to start a restartpoint
!              * if we've consumed too much xlog since the last one, like
!              * in normal processing? But this is not worth doing unless
!              * a restartpoint can be created independently from a
!              * checkpoint record.
!              */
!         }
      }

      if (len == -1)    /* end-of-streaming */
--- 304,314 ----
          PQmarkConsumed(streamConn);

          /*
!          * If the primary requested us to fsync, do so now and send
!          * an acknowledgement.
           */
!         if (fsync_requested)
!             XLogWalRcvFlush();
      }

      if (len == -1)    /* end-of-streaming */
***************
*** 511,589 **** WalRcvInProgress(void)
   * fsynced is set to true if the log was fsyned by O_DIRECT.
   */
  static void
! XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced)
  {
      int        startoff;
!     int        endoff;

!     START_CRIT_SECTION();

!     if (!XLByteInPrevSeg(recptr, recvId, recvSeg))
      {
!         bool    use_existent;

!         /*
!          * XLOG segment files will be re-read in recovery operation soon,
!          * so we don't need to advise the OS to release any cache page.
!          */
!         if (recvFile >= 0 && close(recvFile))
              ereport(PANIC,
                      (errcode_for_file_access(),
!                      errmsg("could not close log file %u, segment %u: %m",
!                             recvId, recvSeg)));
!         recvFile = -1;
!
!         /* Create/use new log file */
!         XLByteToPrevSeg(recptr, recvId, recvSeg);
!         use_existent = true;
!         recvFile = XLogFileInit(recvId, recvSeg,
!                                   &use_existent, true);
!         recvOff = 0;
!     }

!     /* Make sure we have the current logfile open */
!     if (recvFile < 0)
!     {
!         XLByteToPrevSeg(recptr, recvId, recvSeg);
!         recvFile = XLogFileOpen(recvId, recvSeg);
!         recvOff = 0;
!     }

!     /* Calculate the start/end file offset of the received logs */
!     endoff = recptr.xrecoff % XLogSegSize;
!     startoff = ((endoff == 0) ? XLogSegSize : endoff) - len;

      /*
!      * Re-zero the page so that bytes beyond what we've written will look
!      * like zeroes and not valid XLOG records. Only end page which we are
!      * writing need to be zeroed. Of course, we can skip zeroing the pages
!      * full of the XLOG records. Save the end position of the already zeroed
!      * area at the variable ZeroedRecPtr, and avoid zeroing the same page
!      * two or more times.
       *
       * This must precede the writing of the actual logs. Otherwise, a crash
!      * before re-zeroing would cause a corrupted page.
       */
!     if (XLByteLT(ZeroedRecPtr, recptr) && endoff % XLOG_BLCKSZ != 0)
      {
          int        zlen;

!         zlen = XLOG_BLCKSZ - endoff % XLOG_BLCKSZ;
!         WritePhysicalXLog(ZeroedBuffer, zlen, endoff);
          ZeroedRecPtr = recptr;
          ZeroedRecPtr.xrecoff += zlen;
-     }

!     /* Write out the logs */
!     WritePhysicalXLog(buf, len, startoff);
!     LogstreamResult.Send    = recptr;
!     LogstreamResult.Write    = recptr;
!
!     if (sync_method == SYNC_METHOD_OPEN ||
!         sync_method == SYNC_METHOD_OPEN_DSYNC)
!     {
!         LogstreamResult.Flush = recptr;
!         *fsynced = true;        /* logs were already fsynced */
      }

      /* Update shared-memory status */
--- 468,623 ----
   * fsynced is set to true if the log was fsyned by O_DIRECT.
   */
  static void
! XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
  {
      int        startoff;
!     int        byteswritten;

!     START_CRIT_SECTION(); /* XXX: Why? */

!     while (nbytes > 0)
      {
!         int        segbytes;
!         uint32    tmp;

!         if (recvFile < 0 || !XLByteInSeg(recptr, recvId, recvSeg))
!         {
!             bool    use_existent;
!
!             /*
!              * XLOG segment files will be re-read in recovery operation soon,
!              * so we don't need to advise the OS to release any cache page.
!              */
!             if (recvFile >= 0)
!             {
!                 /*
!                  * fsync() before we switch to next file. We would otherwise
!                  * have to reopen this file to fsync it later
!                  */
!                 XLogWalRcvFlush();
!                 if (close(recvFile) != 0)
!                     ereport(PANIC,
!                             (errcode_for_file_access(),
!                              errmsg("could not close log file %u, segment %u: %m",
!                                     recvId, recvSeg)));
!             }
!             recvFile = -1;
!
!             /* Create/use new log file */
!             XLByteToSeg(recptr, recvId, recvSeg);
!             use_existent = true;
!             recvFile = XLogFileInit(recvId, recvSeg,
!                                     &use_existent, true);
!             recvOff = 0;
!         }
!
!         /* Calculate the start offset of the received logs */
!         startoff = recptr.xrecoff % XLogSegSize;
!
!         if (startoff + nbytes > XLOG_SEG_SIZE)
!             segbytes = XLOG_SEG_SIZE - startoff;
!         else
!             segbytes = nbytes;
!
!         /* Need to seek in the file? */
!         if (recvOff != startoff)
!         {
!             if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
!                 ereport(PANIC,
!                         (errcode_for_file_access(),
!                          errmsg("could not seek in log file %u, "
!                                 "segment %u to offset %u: %m",
!                                 recvId, recvSeg, startoff)));
!             recvOff = startoff;
!         }
!
!         /* OK to write the logs */
!         errno = 0;
!
!         byteswritten = write(recvFile, buf, segbytes);
!         if (byteswritten <= 0)
!         {
!             /* if write didn't set errno, assume no disk space */
!             if (errno == 0)
!                 errno = ENOSPC;
              ereport(PANIC,
                      (errcode_for_file_access(),
!                      errmsg("could not write to log file %u, segment %u "
!                             "at offset %u, length %lu: %m",
!                             recvId, recvSeg,
!                             recvOff, (unsigned long) segbytes)));
!         }

!         /* Update state for read */
!         tmp = recptr.xrecoff + byteswritten;
!         if (tmp < recptr.xrecoff)
!             recptr.xlogid++; /* overflow */
!         recptr.xrecoff = tmp;

!         recvOff += byteswritten;
!         nbytes -= byteswritten;
!         buf += byteswritten;
!
!         LogstreamResult.Send    = recptr;
!         LogstreamResult.Write    = recptr;
!
!         if (sync_method == SYNC_METHOD_OPEN ||
!             sync_method == SYNC_METHOD_OPEN_DSYNC)
!         {
!             LogstreamResult.Flush = recptr;
!         }
!
!         /*
!          * If the segment is ready to copy to archival storage,
!          * notify the archiver so.
!          */
!         if ((recptr.xrecoff % XLOG_SEG_SIZE == 0) && XLogArchivingActive())
!             XLogArchiveNotifySeg(recvId, recvSeg);
!
!         /*
!          * XXX: Should we signal bgwriter to start a restartpoint
!          * if we've consumed too much xlog since the last one, like
!          * in normal processing? But this is not worth doing unless
!          * a restartpoint can be created independently from a
!          * checkpoint record.
!          */
!     }

      /*
!      * Zero the rest of the last page we wrote to, so that bytes beyond what
!      * we've written will look like zeroes and not valid XLOG records. Save
!      * the end position of the already zeroed area at the variable
!      * ZeroedRecPtr, and avoid zeroing the same page two or more times.
       *
       * This must precede the writing of the actual logs. Otherwise, a crash
!      * before re-zeroing would cause a corrupted page. XXX: that's not really
!      * an issue, a hard crash could leave the page half-flushed anyway. And we
!      * have CRC to protect from that anyway, this zeroing business isn't
!      * absolutely necessary anyway.
       */
!     if (XLByteLT(ZeroedRecPtr, recptr) && recptr.xrecoff % XLOG_BLCKSZ != 0)
      {
          int        zlen;

!         zlen = XLOG_BLCKSZ - recptr.xrecoff % XLOG_BLCKSZ;
!
!         byteswritten = write(recvFile, ZeroedBuffer, zlen);
!         if (byteswritten != zlen)
!         {
!             /* if write didn't set errno, assume no disk space */
!             if (errno == 0)
!                 errno = ENOSPC;
!             ereport(PANIC,
!                     (errcode_for_file_access(),
!                      errmsg("could not write to log file %u, segment %u "
!                             "at offset %u, length %lu: %m",
!                             recvId, recvSeg,
!                             recvOff, (unsigned long) zlen)));
!         }
          ZeroedRecPtr = recptr;
          ZeroedRecPtr.xrecoff += zlen;

!         recvOff += byteswritten;
      }

      /* Update shared-memory status */
***************
*** 594,600 **** XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced)
          SpinLockAcquire(&walrcv->mutex);
          XLByteUpdate(LogstreamResult.Send, walrcv->LogstreamResult.Send);
          XLByteUpdate(LogstreamResult.Write, walrcv->LogstreamResult.Write);
!         if (*fsynced)
              XLByteUpdate(LogstreamResult.Flush, walrcv->LogstreamResult.Flush);
          SpinLockRelease(&walrcv->mutex);
      }
--- 628,635 ----
          SpinLockAcquire(&walrcv->mutex);
          XLByteUpdate(LogstreamResult.Send, walrcv->LogstreamResult.Send);
          XLByteUpdate(LogstreamResult.Write, walrcv->LogstreamResult.Write);
!         if (sync_method == SYNC_METHOD_OPEN ||
!             sync_method == SYNC_METHOD_OPEN_DSYNC)
              XLByteUpdate(LogstreamResult.Flush, walrcv->LogstreamResult.Flush);
          SpinLockRelease(&walrcv->mutex);
      }
***************
*** 607,666 **** XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced)

  /* Flush the log to disk */
  static void
! XLogWalRcvFlush(XLogRecPtr recptr)
  {
!     START_CRIT_SECTION();
!
!     issue_xlog_fsync(recvFile, recvId, recvSeg);
!
!     LogstreamResult.Flush = recptr;
!
!     /* Update shared-memory status */
      {
          /* use volatile pointer to prevent code rearrangement */
          volatile WalRcvData *walrcv = WalRcv;

          SpinLockAcquire(&walrcv->mutex);
          XLByteUpdate(LogstreamResult.Flush, walrcv->LogstreamResult.Flush);
          SpinLockRelease(&walrcv->mutex);
-     }
-
-     END_CRIT_SECTION();
- }

! /* Physical write to the given logs */
! static void
! WritePhysicalXLog(char *from, Size nbytes, int startoff)
! {
!     /* Need to seek in the file? */
!     if (recvOff != startoff)
!     {
!         if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
!             ereport(PANIC,
!                     (errcode_for_file_access(),
!                      errmsg("could not seek in log file %u, "
!                             "segment %u to offset %u: %m",
!                             recvId, recvSeg, startoff)));
!         recvOff = startoff;
!     }

!     /* OK to write the logs */
!     errno = 0;
!     if (write(recvFile, from, nbytes) != nbytes)
!     {
!         /* if write didn't set errno, assume no disk space */
!         if (errno == 0)
!             errno = ENOSPC;
!         ereport(PANIC,
!                 (errcode_for_file_access(),
!                  errmsg("could not write to log file %u, segment %u "
!                         "at offset %u, length %lu: %m",
!                         recvId, recvSeg,
!                         recvOff, (unsigned long) nbytes)));
      }
-
-     /* Update state for write */
-     recvOff += nbytes;
  }

  /*
--- 642,674 ----

  /* Flush the log to disk */
  static void
! XLogWalRcvFlush(void)
  {
!     if (XLByteLT(LogstreamResult.Flush, LogstreamResult.Write))
      {
          /* use volatile pointer to prevent code rearrangement */
          volatile WalRcvData *walrcv = WalRcv;

+         START_CRIT_SECTION();
+
+         issue_xlog_fsync(recvFile, recvId, recvSeg);
+
+         LogstreamResult.Flush = LogstreamResult.Write;
+
+         /* Update shared-memory status */
          SpinLockAcquire(&walrcv->mutex);
          XLByteUpdate(LogstreamResult.Flush, walrcv->LogstreamResult.Flush);
          SpinLockRelease(&walrcv->mutex);

!         END_CRIT_SECTION();

!         /* Let the primary know */
!         if (PQputXLogRecPtr(streamConn, LogstreamResult.Flush.xlogid,
!                             LogstreamResult.Flush.xrecoff, 1) == -1)
!             ereport(FATAL,
!                     (errmsg("could not send a message to the primary: %s",
!                             PQerrorMessage(streamConn))));
      }
  }

  /*
*** a/src/backend/postmaster/walsender.c
--- b/src/backend/postmaster/walsender.c
***************
*** 113,122 **** static void WalSndQuickDieHandler(SIGNAL_ARGS);
  static int    WalSndLoop(void);
  static void    InitWalSnd(void);
  static void    WalSndKill(int code, Datum arg);
! static void XLogRead(char *buf, uint32 startoff, Size nbytes, XLogRecPtr recptr);
  static bool XLogSend(PendingMessage inMsg, PendingMessage outMsg);
  static bool ProcessStreamMsgs(PendingMessage inMsg);

  /* Main entry point for walsender process */
  int
  WalSenderMain(void)
--- 113,127 ----
  static int    WalSndLoop(void);
  static void    InitWalSnd(void);
  static void    WalSndKill(int code, Datum arg);
! static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
  static bool XLogSend(PendingMessage inMsg, PendingMessage outMsg);
  static bool ProcessStreamMsgs(PendingMessage inMsg);

+ /*
+  * How much WAL to send in one message? Must be >= XLOG_BLCKSZ.
+  */
+ #define MAX_SEND_SIZE (XLOG_SEG_SIZE / 2)
+
  /* Main entry point for walsender process */
  int
  WalSenderMain(void)
***************
*** 382,400 **** WalSndKill(int code, Datum arg)
  }

  /*
!  * Read the log into buffer.
!  *
!  * startoff is the file offset where we start reading the log from; nbytes is
!  * the number of bytes which needs to be read; recptr is the last byte + 1 to
!  * read.
   */
  void
! XLogRead(char *buf, uint32 startoff, Size nbytes, XLogRecPtr recptr)
  {
      char path[MAXPGPATH];
!
!     /* Don't cross a segment boundary */
!     Assert(startoff + nbytes <= XLogSegSize);

  #ifdef REPLICATION_DEBUG
      if (REPLICATION_DEBUG_ENABLED)
--- 387,399 ----
  }

  /*
!  * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr'
   */
  void
! XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
  {
      char path[MAXPGPATH];
!     uint32 startoff;

  #ifdef REPLICATION_DEBUG
      if (REPLICATION_DEBUG_ENABLED)
***************
*** 404,464 **** XLogRead(char *buf, uint32 startoff, Size nbytes, XLogRecPtr recptr)
               LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff);
  #endif

!     if (!XLByteInPrevSeg(recptr, sendId, sendSeg))
      {
!         /* Switch to another logfile segment */
!         if (sendFile >= 0)
!             close(sendFile);

!         XLByteToPrevSeg(recptr, sendId, sendSeg);
!         XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);

!         sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
!         if (sendFile < 0)
!             ereport(FATAL,
!                     (errcode_for_file_access(),
!                      errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
!                             path, sendId, sendSeg)));
!         sendOff = 0;
!     }

!     /* Make sure we have the current logfile open */
!     if (sendFile < 0)
!     {
!         XLByteToPrevSeg(recptr, sendId, sendSeg);
!         XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);

!         sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
!         if (sendFile < 0)
!             ereport(FATAL,
!                     (errcode_for_file_access(),
!                      errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
!                             path, sendId, sendSeg)));
!         sendOff = 0;
!     }

!     /* Need to seek in the file? */
!     if (sendOff != startoff)
!     {
!         if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
              ereport(FATAL,
                      (errcode_for_file_access(),
!                      errmsg("could not seek in log file %u, segment %u to offset %u: %m",
!                             sendId, sendSeg, startoff)));
!         sendOff = startoff;
!     }
!
!     if (read(sendFile, buf, nbytes) != nbytes)
!     {
!         ereport(FATAL,
!                 (errcode_for_file_access(),
!                  errmsg("could not read from log file %u, segment %u, offset %u, "
!                         "length %lu: %m",
!                         sendId, sendSeg, sendOff, (unsigned long) nbytes)));
      }
-
-     /* Update state for read */
-     sendOff += nbytes;
  }

  /*
--- 403,469 ----
               LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff);
  #endif

!     while (nbytes > 0)
      {
!         int segbytes;
!         int readbytes;
!         uint32 tmp;

!         startoff = recptr.xrecoff % XLOG_SEG_SIZE;

!         if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg))
!         {
!             /* Switch to another logfile segment */
!             if (sendFile >= 0)
!                 close(sendFile);
!
!             XLByteToSeg(recptr, sendId, sendSeg);
!             XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
!
!             sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
!             if (sendFile < 0)
!                 ereport(FATAL, /* XXX: Why FATAL? */
!                         (errcode_for_file_access(),
!                          errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
!                                 path, sendId, sendSeg)));
!             sendOff = 0;
!         }

!         /* Need to seek in the file? */
!         if (sendOff != startoff)
!         {
!             if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
!                 ereport(FATAL,
!                         (errcode_for_file_access(),
!                          errmsg("could not seek in log file %u, segment %u to offset %u: %m",
!                                 sendId, sendSeg, startoff)));
!             sendOff = startoff;
!         }

!         /* How many bytes are within this segment? */
!         if (nbytes > (XLOG_SEG_SIZE - startoff))
!             segbytes = XLOG_SEG_SIZE - startoff;
!         else
!             segbytes = nbytes;

!         readbytes = read(sendFile, buf, segbytes);
!         if (readbytes <= 0)
              ereport(FATAL,
                      (errcode_for_file_access(),
!                      errmsg("could not read from log file %u, segment %u, offset %u, "
!                             "length %lu: %m",
!                             sendId, sendSeg, sendOff, (unsigned long) segbytes)));
!
!         /* Update state for read */
!         tmp = recptr.xrecoff + readbytes;
!         if (tmp < recptr.xrecoff)
!             recptr.xlogid++; /* overflow */
!         recptr.xrecoff = tmp;
!
!         sendOff += readbytes;
!         nbytes -= readbytes;
!         buf += readbytes;
      }
  }

  /*
***************
*** 469,488 **** XLogRead(char *buf, uint32 startoff, Size nbytes, XLogRecPtr recptr)
  static bool
  XLogSend(PendingMessage inMsg, PendingMessage outMsg)
  {
-     bool        ispartialpage;
-     bool        last_iteration;
-     bool        finishing_seg;
-     int            nmsgs;
-     int            npages;
      int            res;
-     uint32        startpos;
-     uint32        startoff;
-     uint32        endpos;
      XLogRecPtr    SendRqstPtr;

      /*
!      * Invalid position means that XLOG streaming is not started yet,
!      * so we do nothing here.
       */
      if (XLogRecPtrIsInvalid(LogstreamResult.Send))
          return true;
--- 474,486 ----
  static bool
  XLogSend(PendingMessage inMsg, PendingMessage outMsg)
  {
      int            res;
      XLogRecPtr    SendRqstPtr;

      /*
!      * Invalid position means that we have not yet received the initial
!      * XLogRecPtr message from the slave that indicates where to start the
!      * streaming.
       */
      if (XLogRecPtrIsInvalid(LogstreamResult.Send))
          return true;
***************
*** 490,495 **** XLogSend(PendingMessage inMsg, PendingMessage outMsg)
--- 488,497 ----
      /* Attempt to send all the records which were written to the disk */
      SendRqstPtr = GetWriteRecPtr();

+     /* Quick exit if nothing to do */
+     if (!XLByteLT(LogstreamResult.Send, SendRqstPtr))
+         return true;
+
  #ifdef REPLICATION_DEBUG
      if (REPLICATION_DEBUG_ENABLED)
          elog(LOG, "xlog send request %X/%X; send %X/%X; write %X/%X",
***************
*** 520,631 **** XLogSend(PendingMessage inMsg, PendingMessage outMsg)
       * sending in the last page. We must initialize all of them to
       * keep the compiler quiet.
       */
-     nmsgs = 0;
-     npages = 0;
-     startpos = 0;
-     startoff = 0;
-     endpos = XLOG_BLCKSZ;

      while (XLByteLT(LogstreamResult.Send, SendRqstPtr))
      {
          /*
!          * Advance LogstreamResult.Send to end of current page. If this
!          * is a first loop iteration (i.e., in the case where npages is 0),
!          * it might indicate a halfway position or cross a logid boundary,
!          * so alignment is needed. Otherwise, since it's guaranteed that
!          * LogstreamResult.Send indicates end of previous page and we have
!          * not crossed a logid boundary yet in this loop iteration,
!          * we have only to increment it by XLOG_BLCKSZ bytes.
           */
!         if (npages == 0)
!         {
!             startpos = LogstreamResult.Send.xrecoff % XLOG_BLCKSZ;
!             startoff = LogstreamResult.Send.xrecoff % XLogSegSize - startpos;

!             LogstreamResult.Send.xrecoff += XLOG_BLCKSZ - startpos;
!             if (LogstreamResult.Send.xrecoff > XLogFileSize)
!             {
!                 LogstreamResult.Send.xlogid++;
!                 LogstreamResult.Send.xrecoff %= XLogFileSize;
!             }
!         }
!         else
!             LogstreamResult.Send.xrecoff += XLOG_BLCKSZ;
!         ispartialpage = XLByteLT(SendRqstPtr, LogstreamResult.Send);

!         npages++;

          /*
!          * Read and send the set if this will be the last loop iteration,
!          * or if the number of pages in the set is larger than
!          * MaxPagesPerXLogData, or if we are at the end of the logfile
!          * segment.
           */
-         last_iteration = !XLByteLT(LogstreamResult.Send, SendRqstPtr);
-         if (last_iteration)
-         {
-             endpos = SendRqstPtr.xrecoff % XLOG_BLCKSZ;
-             if (endpos == 0)
-                 endpos = XLOG_BLCKSZ;
-         }
-
-         finishing_seg = !ispartialpage &&
-             (startoff + npages * XLOG_BLCKSZ) >= XLogSegSize;

!         /* Only asked to send a partial page */
!         if (ispartialpage)
!             LogstreamResult.Send = SendRqstPtr;

!         if (last_iteration ||
!             npages >= MaxPagesPerXLogData ||
!             finishing_seg)
          {
!             Size    nbytes;
!             uint8    flags = 0;
!
!             if (finishing_seg)
!                 flags |= XLOGSTREAM_END_SEG;
!
!             /*
!              * XXX: Should we request the standby to fsync the log if the
!              * current set might include a shutdown checkpoint record?
!              */
!
!             /* OK to read and send the log */
!             pq_beginasyncmsg(outMsg, 'w');
!             pq_sendint(outMsg->buf, flags, 1);
!             pq_sendint(outMsg->buf, LogstreamResult.Send.xlogid, 4);
!             pq_sendint(outMsg->buf, LogstreamResult.Send.xrecoff, 4);
!
!             nbytes = (npages - 1) * (Size) XLOG_BLCKSZ - startpos + endpos;
!
!             /*
!              * Read the log into the output buffer directly to prevent
!              * extra memcpy calls.
!              */
!             XLogRead(BufferGetStringInfo(outMsg->buf, nbytes),
!                      startoff + startpos, nbytes, LogstreamResult.Send);

!             res = pq_endasyncmsg(outMsg);
!             if (res < 0)
!                 return false;
!             if (res == 0)
!                 break;

!             /*
!              * Stop sending the log for another job (e.g., checking for
!              * interrupts) periodically.
!              */
!             if (++nmsgs > MaxMsgsPerXLogSend)
!             {
!                 pending_xlog_send = true;
!                 break;
!             }
!
!             npages = 0;
!         }

!         if (ispartialpage)
              break;
      }

--- 522,588 ----
       * sending in the last page. We must initialize all of them to
       * keep the compiler quiet.
       */

      while (XLByteLT(LogstreamResult.Send, SendRqstPtr))
      {
+         XLogRecPtr startptr;
+         XLogRecPtr endptr;
+         Size    nbytes;
+         uint8    flags = 0;
+
          /*
!          * Figure out how much to send in one message. If there are fewer
!          * than MAX_SEND_SIZE bytes to send, send everything. Otherwise send
!          * MAX_SEND_SIZE bytes, rounded down to a page boundary for efficiency.
           */
!         startptr = LogstreamResult.Send;
!         endptr = startptr;
!         endptr.xrecoff += MAX_SEND_SIZE;
!         if (endptr.xrecoff < startptr.xrecoff)
!             endptr.xlogid++; /* xrecoff overflowed */

!         /* round down to page boundary */
!         endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);

!         if (XLByteLT(SendRqstPtr, endptr))
!             endptr = SendRqstPtr;

          /*
!          * XXX: Should we request the standby to fsync the log if the
!          * current set might include a shutdown checkpoint record?
!          *
!          * Heikki: Well, we don't do that with other checkpoints, so I don't
!          * see why we should at a shutdown checkpoint. However, perhaps
!          * walreceiver should do an fsync whenever the connection is lost,
!          * whatever the reason (e.g. the master has been shut down)?
           */

!         /* OK to read and send the log */
!         pq_beginasyncmsg(outMsg, 'w');
!         pq_sendint(outMsg->buf, flags, 1);
!         pq_sendint(outMsg->buf, startptr.xlogid, 4);
!         pq_sendint(outMsg->buf, startptr.xrecoff, 4);

!         if (endptr.xlogid != startptr.xlogid)
          {
!             Assert(endptr.xlogid == startptr.xlogid + 1);
!             nbytes = (0xffffffff - startptr.xrecoff) + endptr.xrecoff;
!         }
!         else
!             nbytes = endptr.xrecoff - startptr.xrecoff;

!         LogstreamResult.Send = endptr;

!         /*
!          * Read the log into the output buffer directly to prevent
!          * extra memcpy calls.
!          */
!         XLogRead(BufferGetStringInfo(outMsg->buf, nbytes), startptr, nbytes);

!         res = pq_endasyncmsg(outMsg);
!         if (res < 0)
!             return false;
!         if (res == 0)
              break;
      }
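
The two pieces of arithmetic in the patch above (choosing where a message ends, and splitting a read at segment boundaries) can be tried out in isolation. What follows is only a rough sketch, not code from the patch: it assumes a flat 64-bit byte position instead of the xlogid/xrecoff pair, so the wraparound handling disappears, and the constants WAL_SEG_SIZE, WAL_PAGE_SIZE and WAL_MAX_SEND as well as all three function names are made up for illustration.

```c
#include <assert.h>
#include <stdint.h>

/* Illustrative values only; a real server takes these from its build. */
#define WAL_SEG_SIZE   (16u * 1024 * 1024)   /* assumed segment size */
#define WAL_PAGE_SIZE  8192u                 /* assumed page size */
#define WAL_MAX_SEND   (16u * WAL_PAGE_SIZE) /* assumed per-message cap */

/*
 * Pick the end of the next message: at most WAL_MAX_SEND bytes past
 * start, rounded down to a page boundary, and never past request.
 * Positions are flat 64-bit byte offsets (a simplification).
 */
static uint64_t
next_send_end(uint64_t start, uint64_t request)
{
    uint64_t end;

    assert(start < request);        /* caller checked there is work to do */

    end = start + WAL_MAX_SEND;
    end -= end % WAL_PAGE_SIZE;     /* round down to page boundary */
    if (request < end)
        end = request;              /* less than a full message remains */
    return end;
}

/* Bytes of a read at pos that lie within the segment containing pos. */
static uint32_t
bytes_in_segment(uint64_t pos, uint32_t nbytes)
{
    uint32_t off = (uint32_t) (pos % WAL_SEG_SIZE);
    uint32_t room = WAL_SEG_SIZE - off;

    return nbytes > room ? room : nbytes;
}

/* Count the per-segment reads needed, looping the way XLogRead does. */
static int
count_segment_reads(uint64_t pos, uint32_t nbytes)
{
    int reads = 0;

    while (nbytes > 0)
    {
        uint32_t chunk = bytes_in_segment(pos, nbytes);

        pos += chunk;               /* advance the read position */
        nbytes -= chunk;            /* and shrink what is left */
        reads++;
    }
    return reads;
}
```

With a flat position the message length is simply the end minus the start, which is the part the two-field representation makes awkward. A read that starts 10 bytes before a segment boundary and asks for 100 bytes is split into a 10-byte read and a 90-byte read from the next segment.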

