Re: Streaming replication and a disk full in primary - Mailing list pgsql-hackers

From Heikki Linnakangas
Subject Re: Streaming replication and a disk full in primary
Msg-id 4BBC581C.5060204@enterprisedb.com
In response to Re: Streaming replication and a disk full in primary  (Fujii Masao <masao.fujii@gmail.com>)
Responses Re: Streaming replication and a disk full in primary  (Robert Haas <robertmhaas@gmail.com>)
Re: Streaming replication and a disk full in primary  (Fujii Masao <masao.fujii@gmail.com>)
List pgsql-hackers
This task has been languishing for a long time, so I took a shot at it.
I took the approach I suggested before, keeping a variable in shared
memory to track the latest removed WAL segment. After walsender has read
a bunch of WAL records from a WAL file, it checks that what it read is
after the latest removed WAL segment. If it is not, the data might have
come from a file that was already recycled and overwritten with new
data, so an error is thrown.
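
A minimal sketch of that check, for illustration only and not the patch
code itself. The names SegNo, last_removed, note_removed and
read_is_valid are hypothetical, and a plain static variable stands in
for the spinlock-protected fields in shared memory:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for a (log, seg) pair identifying a WAL segment. */
typedef struct SegNo
{
    uint32_t log;
    uint32_t seg;
} SegNo;

/*
 * Stand-in for the shared-memory variable. (0,0) means "nothing removed
 * yet"; the sketch assumes segment (0,0) itself is never read.
 * Real code must take a lock around every access.
 */
static SegNo last_removed = {0, 0};

/* True if a identifies a segment at or before b. */
static bool
seg_le(SegNo a, SegNo b)
{
    return a.log < b.log || (a.log == b.log && a.seg <= b.seg);
}

/* Called by the remover before recycling: the marker only ever advances. */
static void
note_removed(SegNo upto)
{
    if (!seg_le(upto, last_removed))
        last_removed = upto;
}

/* Called by the reader after read(): is the data from an intact segment? */
static bool
read_is_valid(SegNo just_read)
{
    return !seg_le(just_read, last_removed);
}
```

The order matters: the remover publishes the marker before it touches
any file, and the reader checks the marker after read() returns, so a
read that raced with recycling is always caught.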

This changes the behavior so that if a standby server doing streaming
replication falls too far behind, the primary may remove/recycle a WAL
segment still needed by the standby server. The previous behavior was
that WAL segments still needed by any connected standby server were
never removed, at the risk of filling the disk in the primary if a
standby server behaves badly.

In your version of this patch, the default was still the current
behavior where the primary retains WAL files that are still needed by
connected standby servers indefinitely. I think that's a dangerous
default, so I changed it so that if you don't set standby_keep_segments,
the primary doesn't retain any extra segments; the number of WAL
segments available for standby servers is determined only by the
location of the previous checkpoint, and the status of WAL archiving.
That makes the code a bit simpler too, as we never care how far along
the walsenders are. In fact, the GetOldestWALSendPointer() function is
now dead code.
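
To illustrate the retention rule, here is a rough sketch that flattens
(log, seg) pairs into a single segment number instead of doing the
carry arithmetic the patch does. The names flatten and oldest_to_keep
are hypothetical, and SEGS_PER_FILE = 255 is an assumption that holds
for 16 MB segments:

```c
#include <assert.h>
#include <stdint.h>

/* Assumed value: with 16 MB segments, each log id holds 255 segments. */
#define SEGS_PER_FILE 255

/* Flatten a (log, seg) pair into one monotonically increasing number. */
static uint64_t
flatten(uint32_t log, uint32_t seg)
{
    return (uint64_t) log * SEGS_PER_FILE + seg;
}

/*
 * Oldest segment that must be kept: the older of the checkpoint-based
 * limit and (current segment - keep_segments), never below segment 1.
 */
static uint64_t
oldest_to_keep(uint64_t ckpt_limit, uint64_t current, uint32_t keep_segments)
{
    uint64_t standby_limit;

    assert(ckpt_limit <= current);

    if (keep_segments == 0)
        return ckpt_limit;      /* default: no extra retention */

    /* avoid underflow when fewer than keep_segments segments exist */
    standby_limit = (current > keep_segments) ? current - keep_segments : 1;

    return (standby_limit < ckpt_limit) ? standby_limit : ckpt_limit;
}
```

For example, with standby_keep_segments = 32 and 16 MB segments, this
keeps roughly the last 512 MB of WAL, regardless of how far along any
connected standby is.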

Fujii Masao wrote:
> Thanks for the review! And, sorry for the delay.
>
> On Thu, Jan 21, 2010 at 11:10 PM, Heikki Linnakangas
> <heikki.linnakangas@enterprisedb.com> wrote:
>> I don't think we should do the check in XLogWrite(). There's really no
>> reason to kill the standby connections before the next checkpoint, when
>> the old WAL files are recycled. XLogWrite() is in the critical path of
>> normal operations, too.
>
> OK. I'll remove that check from XLogWrite().
>
>> There's another important reason for that: If archiving is not working
>> for some reason, the standby can't obtain the old segments from the
>> archive either. If we refuse to stream such old segments, and they're
>> not getting archived, the standby has no way to catch up until archiving
>> is fixed. Allowing streaming of such old segments is free wrt. disk
>> space, because we're keeping the files around anyway.
>
> OK. We should terminate the walsender whose currently-opened WAL file
> has already been archived, isn't required for crash recovery AND is
> 'max-lag' older than the currently-written one. I'll change it so.
>
>> Walreceiver will get an error if it tries to open a segment that's been
>> deleted or recycled already. The dangerous situation we need to avoid is
>> when walreceiver holds a file open while bgwriter recycles it.
>> Walreceiver will merrily continue streaming data from it, even though
>> it's been overwritten by new data already.
>
> s/walreceiver/walsender ?
>
> Yes, that's the problem that I'll have to fix.
>
>> A straightforward fix is to keep a "newest recycled XLogRecPtr" in
>> shared memory that RemoveOldXlogFiles() updates. Walreceiver checks it
>> right after read()ing from a file, before sending it to the client, and
>> throws an error if the data it read() was already recycled.
>
> I prefer this. But I don't think such an aggressive check of a "newest
> recycled XLogRecPtr" is required if the bgwriter never deletes a WAL
> file that is newer than or equal to the walsenders' oldest WAL file.
> In other words, the WAL files which the walsender is reading (or
> will read) are not removed at the moment.
>
> Regards,
>


--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 1823,1828 **** archive_command = 'copy "%p" "C:\\server\\archivedir\\%f"'  # Windows
--- 1823,1856 ----
         </para>
         </listitem>
        </varlistentry>
+
+       <varlistentry id="guc-replication-lag-segments" xreflabel="replication_lag_segments">
+        <term><varname>standby_keep_segments</varname> (<type>integer</type>)</term>
+        <indexterm>
+         <primary><varname>standby_keep_segments</> configuration parameter</primary>
+        </indexterm>
+        <listitem>
+        <para>
+         Specifies the number of log file segments kept in the
+         <filename>pg_xlog</> directory, in case a standby server needs to
+         fetch them via streaming replication. Each segment is normally 16
+         megabytes. If a standby server connected to the primary falls behind
+         more than <varname>standby_keep_segments</> segments, the primary
+         might remove a WAL segment still needed by the standby and the
+         replication connection will be terminated.
+
+         This sets only the minimum number of segments retained for standby
+         purposes; the system might need to retain more segments for WAL
+         archival or to recover from a checkpoint. If
+         <varname>standby_keep_segments</> is zero (the default), the system
+         doesn't keep any extra segments for standby purposes, and the number
+         of old WAL segments available for standbys is determined based only
+         on the location of the previous checkpoint and the status of WAL archival.
+         This parameter can only be set in the <filename>postgresql.conf</>
+         file or on the server command line.
+        </para>
+        </listitem>
+       </varlistentry>
       </variablelist>
      </sect2>
      <sect2 id="runtime-config-standby">
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 66,71 ****
--- 66,72 ----

  /* User-settable parameters */
  int            CheckPointSegments = 3;
+ int            StandbySegments = 0;
  int            XLOGbuffers = 8;
  int            XLogArchiveTimeout = 0;
  bool        XLogArchiveMode = false;
***************
*** 356,361 **** typedef struct XLogCtlData
--- 357,364 ----
      uint32        ckptXidEpoch;    /* nextXID & epoch of latest checkpoint */
      TransactionId ckptXid;
      XLogRecPtr    asyncCommitLSN; /* LSN of newest async commit */
+     uint32        lastRemovedLog;    /* latest removed/recycled XLOG segment */
+     uint32        lastRemovedSeg;

      /* Protected by WALWriteLock: */
      XLogCtlWrite Write;
***************
*** 3150,3155 **** PreallocXlogFiles(XLogRecPtr endptr)
--- 3153,3174 ----
  }

  /*
+  * Get the log/seg of the latest removed or recycled WAL segment.
+  * Returns 0 if no WAL segments have been removed since startup.
+  */
+ void
+ XLogGetLastRemoved(uint32 *log, uint32 *seg)
+ {
+     /* use volatile pointer to prevent code rearrangement */
+     volatile XLogCtlData *xlogctl = XLogCtl;
+
+     SpinLockAcquire(&xlogctl->info_lck);
+     *log = xlogctl->lastRemovedLog;
+     *seg = xlogctl->lastRemovedSeg;
+     SpinLockRelease(&xlogctl->info_lck);
+ }
+
+ /*
   * Recycle or remove all log files older or equal to passed log/seg#
   *
   * endptr is current (or recent) end of xlog; this is used to determine
***************
*** 3170,3175 **** RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr)
--- 3189,3208 ----
      char        newpath[MAXPGPATH];
  #endif
      struct stat statbuf;
+     /* use volatile pointer to prevent code rearrangement */
+     volatile XLogCtlData *xlogctl = XLogCtl;
+
+     /* Update the last removed location in shared memory first */
+     SpinLockAcquire(&xlogctl->info_lck);
+     if (log > xlogctl->lastRemovedLog ||
+         (log == xlogctl->lastRemovedLog && seg > xlogctl->lastRemovedSeg))
+     {
+         xlogctl->lastRemovedLog = log;
+         xlogctl->lastRemovedSeg = seg;
+     }
+     SpinLockRelease(&xlogctl->info_lck);
+
+     elog(DEBUG1, "removing WAL segments older than %X/%X", log, seg);

      /*
       * Initialize info about where to try to recycle to.  We allow recycling
***************
*** 7101,7136 **** CreateCheckPoint(int flags)
      smgrpostckpt();

      /*
!      * If there's connected standby servers doing XLOG streaming, don't delete
!      * XLOG files that have not been streamed to all of them yet. This does
!      * nothing to prevent them from being deleted when the standby is
!      * disconnected (e.g because of network problems), but at least it avoids
!      * an open replication connection from failing because of that.
       */
!     if ((_logId || _logSeg) && max_wal_senders > 0)
      {
!         XLogRecPtr    oldest;
!         uint32        log;
!         uint32        seg;
!
!         oldest = GetOldestWALSendPointer();
!         if (oldest.xlogid != 0 || oldest.xrecoff != 0)
          {
!             XLByteToSeg(oldest, log, seg);
              if (log < _logId || (log == _logId && seg < _logSeg))
              {
                  _logId = log;
                  _logSeg = seg;
              }
          }
-     }

-     /*
-      * Delete old log files (those no longer needed even for previous
-      * checkpoint or the standbys in XLOG streaming).
-      */
-     if (_logId || _logSeg)
-     {
          PrevLogSeg(_logId, _logSeg);
          RemoveOldXlogFiles(_logId, _logSeg, recptr);
      }
--- 7134,7184 ----
      smgrpostckpt();

      /*
!      * Delete old log files (those no longer needed even for previous
!      * checkpoint or the standbys in XLOG streaming).
       */
!     if (_logId || _logSeg)
      {
!         /*
!          * Calculate the last segment that we need to retain because of
!          * standby_keep_segments, by subtracting StandbySegments from the
!          * new checkpoint location.
!          */
!         if (StandbySegments > 0)
          {
!             uint32        log;
!             uint32        seg;
!             int            d_log;
!             int            d_seg;
!
!             XLByteToSeg(recptr, log, seg);
!
!             d_seg = StandbySegments % XLogSegsPerFile;
!             d_log = StandbySegments / XLogSegsPerFile;
!             if (seg < d_seg)
!             {
!                 d_log += 1;
!                 seg = seg - d_seg + XLogSegsPerFile;
!             }
!             else
!                 seg = seg - d_seg;
!             /* avoid underflow, don't go below (0,1) */
!             if (log < d_log || (log == d_log && seg == 0))
!             {
!                 log = 0;
!                 seg = 1;
!             }
!             else
!                 log = log - d_log;
!
!             /* don't delete WAL segments newer than the calculated segment */
              if (log < _logId || (log == _logId && seg < _logSeg))
              {
                  _logId = log;
                  _logSeg = seg;
              }
          }

          PrevLogSeg(_logId, _logSeg);
          RemoveOldXlogFiles(_logId, _logSeg, recptr);
      }
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 508,513 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
--- 508,517 ----
  {
      char        path[MAXPGPATH];
      uint32        startoff;
+     uint32        lastRemovedLog;
+     uint32        lastRemovedSeg;
+     uint32        log;
+     uint32        seg;

      while (nbytes > 0)
      {
***************
*** 527,536 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)

              sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
              if (sendFile < 0)
!                 ereport(FATAL,    /* XXX: Why FATAL? */
!                         (errcode_for_file_access(),
!                          errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
!                                 path, sendId, sendSeg)));
              sendOff = 0;
          }

--- 531,557 ----

              sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
              if (sendFile < 0)
!             {
!                 /*
!                  * If the file is not found, assume it's because the
!                  * standby asked for a too old WAL segment that has already
!                  * been removed or recycled.
!                  */
!                 if (errno == ENOENT)
!                 {
!                     char filename[MAXFNAMELEN];
!                     XLogFileName(filename, ThisTimeLineID, sendId, sendSeg);
!                     ereport(ERROR,
!                             (errcode_for_file_access(),
!                              errmsg("requested WAL segment %s has already been removed",
!                                     filename)));
!                 }
!                 else
!                     ereport(ERROR,
!                             (errcode_for_file_access(),
!                              errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
!                                     path, sendId, sendSeg)));
!             }
              sendOff = 0;
          }

***************
*** 538,544 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
          if (sendOff != startoff)
          {
              if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
!                 ereport(FATAL,
                          (errcode_for_file_access(),
                           errmsg("could not seek in log file %u, segment %u to offset %u: %m",
                                  sendId, sendSeg, startoff)));
--- 559,565 ----
          if (sendOff != startoff)
          {
              if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
!                 ereport(ERROR,
                          (errcode_for_file_access(),
                           errmsg("could not seek in log file %u, segment %u to offset %u: %m",
                                  sendId, sendSeg, startoff)));
***************
*** 553,559 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)

          readbytes = read(sendFile, buf, segbytes);
          if (readbytes <= 0)
!             ereport(FATAL,
                      (errcode_for_file_access(),
              errmsg("could not read from log file %u, segment %u, offset %u, "
                     "length %lu: %m",
--- 574,580 ----

          readbytes = read(sendFile, buf, segbytes);
          if (readbytes <= 0)
!             ereport(ERROR,
                      (errcode_for_file_access(),
              errmsg("could not read from log file %u, segment %u, offset %u, "
                     "length %lu: %m",
***************
*** 566,571 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
--- 587,612 ----
          nbytes -= readbytes;
          buf += readbytes;
      }
+
+     /*
+      * After reading into the buffer, check that what we read was valid.
+      * We do this after reading, because even though the segment was present
+      * when we opened it, it might get recycled or removed while we read it.
+      * The read() succeeds in that case, but the data we tried to read might
+      * already have been overwritten with new WAL records.
+      */
+     XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
+     XLByteToPrevSeg(recptr, log, seg);
+     if (log < lastRemovedLog ||
+         (log == lastRemovedLog && seg <= lastRemovedSeg))
+     {
+         char filename[MAXFNAMELEN];
+         XLogFileName(filename, ThisTimeLineID, log, seg);
+         ereport(ERROR,
+                 (errcode_for_file_access(),
+                  errmsg("requested WAL segment %s has already been removed",
+                         filename)));
+     }
  }

  /*
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 1648,1653 **** static struct config_int ConfigureNamesInt[] =
--- 1648,1662 ----
      },

      {
+         {"standby_keep_segments", PGC_SIGHUP, WAL_CHECKPOINTS,
+             gettext_noop("Sets the number of WAL files held for standby servers."),
+             NULL
+         },
+         &StandbySegments,
+         0, 0, INT_MAX, NULL, NULL
+     },
+
+     {
          {"checkpoint_segments", PGC_SIGHUP, WAL_CHECKPOINTS,
              gettext_noop("Sets the maximum distance in log segments between automatic WAL checkpoints."),
              NULL
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 193,198 ****
--- 193,199 ----

  #max_wal_senders = 0        # max number of walsender processes
  #wal_sender_delay = 200ms    # 1-10000 milliseconds
+ #standby_keep_segments = 0    # in logfile segments, 16MB each; 0 disables


  #------------------------------------------------------------------------------
*** a/src/include/access/xlog.h
--- b/src/include/access/xlog.h
***************
*** 187,192 **** extern XLogRecPtr XactLastRecEnd;
--- 187,193 ----

  /* these variables are GUC parameters related to XLOG */
  extern int    CheckPointSegments;
+ extern int    StandbySegments;
  extern int    XLOGbuffers;
  extern bool XLogArchiveMode;
  extern char *XLogArchiveCommand;
***************
*** 267,272 **** extern int XLogFileInit(uint32 log, uint32 seg,
--- 268,274 ----
  extern int    XLogFileOpen(uint32 log, uint32 seg);


+ extern void XLogGetLastRemoved(uint32 *log, uint32 *seg);
  extern void XLogSetAsyncCommitLSN(XLogRecPtr record);

  extern void RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup);
