Re: Keepalive for max_standby_delay - Mailing list pgsql-hackers

From Tom Lane
Subject Re: Keepalive for max_standby_delay
Msg-id 26561.1278101495@sss.pgh.pa.us
In response to Re: Keepalive for max_standby_delay  (Robert Haas <robertmhaas@gmail.com>)
Responses Re: Keepalive for max_standby_delay
List pgsql-hackers
[ Apologies for the very slow turnaround on this --- I got hit with
another batch of non-postgres security issues this week. ]

Attached is a draft patch for revising the max_standby_delay behavior into
something I think is a bit saner.  There is some unfinished business:

* I haven't touched the documentation yet

* The code in xlog.c needs to be reverted to its former behavior so that
recoveryLastXTime means what it's supposed to mean, i.e. just the last
commit/abort timestamp.

However, neither of these affects the testability of the patch.

Basically the way that it works is that the standby delay is computed with
reference to XLogReceiptTime rather than any timestamp obtained from WAL.
XLogReceiptTime is set to current time whenever we obtain a WAL segment
from the archives or when we begin processing a fresh batch of WAL from
walreceiver.  There's a subtlety in the streaming case: we don't advance
XLogReceiptTime if we are not caught up, that is if the startup process
is more than one flush cycle behind walreceiver.  In the normal case
we'll advance XLogReceiptTime on each flush cycle, but once we start
falling behind, it doesn't move, so the grace time allotted to conflicting
queries begins to decrease.

I split max_standby_delay into two GUC variables, as previously
threatened: max_standby_archive_delay and max_standby_streaming_delay.
The former applies when processing data read from archive, the latter
when processing data received from walreceiver.  I think this is really
quite important given the new behavior, because max_standby_archive_delay
ought to be set with reference to the expected time to process one WAL
segment, whereas max_standby_streaming_delay doesn't depend on that value
at all.  I'm not sure what good defaults are for these values, so I left
them at 30 seconds for the moment.  I'm inclined to think
max_standby_archive_delay ought to be quite a bit less though.

It might be worth adding a minimum-grace-time limit as we previously
discussed, but this patch doesn't attempt to do that.

Comments?

            regards, tom lane

Index: src/backend/access/transam/xlog.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v
retrieving revision 1.427
diff -c -r1.427 xlog.c
*** src/backend/access/transam/xlog.c    28 Jun 2010 19:46:19 -0000    1.427
--- src/backend/access/transam/xlog.c    2 Jul 2010 20:01:24 -0000
***************
*** 72,78 ****
  bool        XLogArchiveMode = false;
  char       *XLogArchiveCommand = NULL;
  bool        EnableHotStandby = false;
- int            MaxStandbyDelay = 30 * 1000;
  bool        fullPageWrites = true;
  bool        log_checkpoints = false;
  int            sync_method = DEFAULT_SYNC_METHOD;
--- 72,77 ----
***************
*** 487,493 ****
   * Keeps track of which sources we've tried to read the current WAL
   * record from and failed.
   */
! static int failedSources = 0;

  /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
  static char *readBuf = NULL;
--- 487,502 ----
   * Keeps track of which sources we've tried to read the current WAL
   * record from and failed.
   */
! static int failedSources = 0;    /* OR of XLOG_FROM_* codes */
!
! /*
!  * These variables track when we last obtained some WAL data to process,
!  * and where we got it from.  (XLogReceiptSource is initially the same as
!  * readSource, but readSource gets reset to zero when we don't have data
!  * to process right now.)
!  */
! static TimestampTz XLogReceiptTime = 0;
! static int XLogReceiptSource = 0;    /* XLOG_FROM_* code */

  /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
  static char *readBuf = NULL;
***************
*** 2626,2632 ****
   * Open a logfile segment for reading (during recovery).
   *
   * If source = XLOG_FROM_ARCHIVE, the segment is retrieved from archive.
!  * If source = XLOG_FROM_PG_XLOG, it's read from pg_xlog.
   */
  static int
  XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
--- 2635,2641 ----
   * Open a logfile segment for reading (during recovery).
   *
   * If source = XLOG_FROM_ARCHIVE, the segment is retrieved from archive.
!  * Otherwise, it's assumed to be already available in pg_xlog.
   */
  static int
  XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
***************
*** 2655,2660 ****
--- 2664,2670 ----
              break;

          case XLOG_FROM_PG_XLOG:
+         case XLOG_FROM_STREAM:
              XLogFilePath(path, tli, log, seg);
              restoredFromArchive = false;
              break;
***************
*** 2674,2680 ****
--- 2684,2696 ----
                   xlogfname);
          set_ps_display(activitymsg, false);

+         /* Track source of data in assorted state variables */
          readSource = source;
+         XLogReceiptSource = source;
+         /* In FROM_STREAM case, caller tracks receipt time, not me */
+         if (source != XLOG_FROM_STREAM)
+             XLogReceiptTime = GetCurrentTimestamp();
+
          return fd;
      }
      if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
***************
*** 5568,5574 ****
  /*
   * Returns timestamp of last recovered commit/abort record.
   */
! TimestampTz
  GetLatestXLogTime(void)
  {
      /* use volatile pointer to prevent code rearrangement */
--- 5584,5590 ----
  /*
   * Returns timestamp of last recovered commit/abort record.
   */
! static TimestampTz
  GetLatestXLogTime(void)
  {
      /* use volatile pointer to prevent code rearrangement */
***************
*** 5582,5587 ****
--- 5598,5620 ----
  }

  /*
+  * Returns time of receipt of current chunk of XLOG data, as well as
+  * whether it was received from streaming replication or from archives.
+  */
+ void
+ GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream)
+ {
+     /*
+      * This must be executed in the startup process, since we don't export
+      * the relevant state to shared memory.
+      */
+     Assert(InRecovery);
+
+     *rtime = XLogReceiptTime;
+     *fromStream = (XLogReceiptSource == XLOG_FROM_STREAM);
+ }
+
+ /*
   * Note that text field supplied is a parameter name and does not require
   * translation
   */
***************
*** 6060,6065 ****
--- 6093,6101 ----
          xlogctl->recoveryLastRecPtr = ReadRecPtr;
          SpinLockRelease(&xlogctl->info_lck);

+         /* Also ensure XLogReceiptTime has a sane value */
+         XLogReceiptTime = GetCurrentTimestamp();
+
          /*
           * Let postmaster know we've started redo now, so that it can
           * launch bgwriter to perform restartpoints.  We don't bother
***************
*** 7647,7653 ****
          XLogRecPtr    endptr;

          /* Get the current (or recent) end of xlog */
!         endptr = GetWalRcvWriteRecPtr();

          PrevLogSeg(_logId, _logSeg);
          RemoveOldXlogFiles(_logId, _logSeg, endptr);
--- 7683,7689 ----
          XLogRecPtr    endptr;

          /* Get the current (or recent) end of xlog */
!         endptr = GetWalRcvWriteRecPtr(NULL);

          PrevLogSeg(_logId, _logSeg);
          RemoveOldXlogFiles(_logId, _logSeg, endptr);
***************
*** 8757,8763 ****
      XLogRecPtr    recptr;
      char        location[MAXFNAMELEN];

!     recptr = GetWalRcvWriteRecPtr();

      if (recptr.xlogid == 0 && recptr.xrecoff == 0)
          PG_RETURN_NULL();
--- 8793,8799 ----
      XLogRecPtr    recptr;
      char        location[MAXFNAMELEN];

!     recptr = GetWalRcvWriteRecPtr(NULL);

      if (recptr.xlogid == 0 && recptr.xrecoff == 0)
          PG_RETURN_NULL();
***************
*** 9272,9277 ****
--- 9308,9315 ----
              {
                  if (WalRcvInProgress())
                  {
+                     bool    havedata;
+
                      /*
                       * If we find an invalid record in the WAL streamed from
                       * master, something is seriously wrong. There's little
***************
*** 9289,9316 ****
                      }

                      /*
!                      * While walreceiver is active, wait for new WAL to arrive
!                      * from primary.
                       */
-                     receivedUpto = GetWalRcvWriteRecPtr();
                      if (XLByteLT(*RecPtr, receivedUpto))
                      {
                          /*
                           * Great, streamed far enough. Open the file if it's
!                          * not open already.
                           */
                          if (readFile < 0)
                          {
                              readFile =
                                  XLogFileRead(readId, readSeg, PANIC,
                                               recoveryTargetTLI,
!                                              XLOG_FROM_PG_XLOG, false);
                              switched_segment = true;
                              readSource = XLOG_FROM_STREAM;
                          }
                          break;
                      }

                      if (CheckForStandbyTrigger())
                          goto triggered;

--- 9327,9388 ----
                      }

                      /*
!                      * Walreceiver is active, so see if new data has arrived.
!                      *
!                      * We only advance XLogReceiptTime when we obtain fresh
!                      * WAL from walreceiver and observe that we had already
!                      * processed everything before the most recent "chunk"
!                      * that it flushed to disk.  In steady state where we are
!                      * keeping up with the incoming data, XLogReceiptTime
!                      * will be updated on each cycle.  When we are behind,
!                      * XLogReceiptTime will not advance, so the grace time
!                      * allotted to conflicting queries will decrease.
                       */
                      if (XLByteLT(*RecPtr, receivedUpto))
+                         havedata = true;
+                     else
+                     {
+                         XLogRecPtr    latestChunkStart;
+
+                         receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart);
+                         if (XLByteLT(*RecPtr, receivedUpto))
+                         {
+                             havedata = true;
+                             if (!XLByteLT(*RecPtr, latestChunkStart))
+                                 XLogReceiptTime = GetCurrentTimestamp();
+                         }
+                         else
+                             havedata = false;
+                     }
+                     if (havedata)
                      {
                          /*
                           * Great, streamed far enough. Open the file if it's
!                          * not open already.  Use XLOG_FROM_STREAM so that
!                          * source info is set correctly and XLogReceiptTime
!                          * isn't changed.
                           */
                          if (readFile < 0)
                          {
                              readFile =
                                  XLogFileRead(readId, readSeg, PANIC,
                                               recoveryTargetTLI,
!                                              XLOG_FROM_STREAM, false);
!                             Assert(readFile >= 0);
                              switched_segment = true;
+                         }
+                         else
+                         {
+                             /* just make sure source info is correct... */
                              readSource = XLOG_FROM_STREAM;
+                             XLogReceiptSource = XLOG_FROM_STREAM;
                          }
                          break;
                      }

+                     /*
+                      * Data not here yet, so check for trigger then sleep.
+                      */
                      if (CheckForStandbyTrigger())
                          goto triggered;

Index: src/backend/replication/walreceiver.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/replication/walreceiver.c,v
retrieving revision 1.14
diff -c -r1.14 walreceiver.c
*** src/backend/replication/walreceiver.c    9 Jun 2010 15:04:07 -0000    1.14
--- src/backend/replication/walreceiver.c    2 Jul 2010 20:01:24 -0000
***************
*** 524,529 ****
--- 524,530 ----

          /* Update shared-memory status */
          SpinLockAcquire(&walrcv->mutex);
+         walrcv->latestChunkStart = walrcv->receivedUpto;
          walrcv->receivedUpto = LogstreamResult.Flush;
          SpinLockRelease(&walrcv->mutex);

Index: src/backend/replication/walreceiverfuncs.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/replication/walreceiverfuncs.c,v
retrieving revision 1.5
diff -c -r1.5 walreceiverfuncs.c
*** src/backend/replication/walreceiverfuncs.c    28 Apr 2010 16:54:15 -0000    1.5
--- src/backend/replication/walreceiverfuncs.c    2 Jul 2010 20:01:24 -0000
***************
*** 199,214 ****
      walrcv->startTime = now;

      walrcv->receivedUpto = recptr;
      SpinLockRelease(&walrcv->mutex);

      SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
  }

  /*
!  * Returns the byte position that walreceiver has written
   */
  XLogRecPtr
! GetWalRcvWriteRecPtr(void)
  {
      /* use volatile pointer to prevent code rearrangement */
      volatile WalRcvData *walrcv = WalRcv;
--- 200,221 ----
      walrcv->startTime = now;

      walrcv->receivedUpto = recptr;
+     walrcv->latestChunkStart = recptr;
+
      SpinLockRelease(&walrcv->mutex);

      SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
  }

  /*
!  * Returns the last+1 byte position that walreceiver has written.
!  *
!  * Optionally, returns the previous chunk start, that is the first byte
!  * written in the most recent walreceiver flush cycle.  Callers not
!  * interested in that value may pass NULL for latestChunkStart.
   */
  XLogRecPtr
! GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart)
  {
      /* use volatile pointer to prevent code rearrangement */
      volatile WalRcvData *walrcv = WalRcv;
***************
*** 216,221 ****
--- 223,230 ----

      SpinLockAcquire(&walrcv->mutex);
      recptr = walrcv->receivedUpto;
+     if (latestChunkStart)
+         *latestChunkStart = walrcv->latestChunkStart;
      SpinLockRelease(&walrcv->mutex);

      return recptr;
Index: src/backend/storage/ipc/standby.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/storage/ipc/standby.c,v
retrieving revision 1.25
diff -c -r1.25 standby.c
*** src/backend/storage/ipc/standby.c    14 Jun 2010 00:49:24 -0000    1.25
--- src/backend/storage/ipc/standby.c    2 Jul 2010 20:01:24 -0000
***************
*** 30,36 ****
--- 30,39 ----
  #include "storage/standby.h"
  #include "utils/ps_status.h"

+ /* User-settable GUC parameters */
  int            vacuum_defer_cleanup_age;
+ int            max_standby_archive_delay = 30 * 1000;
+ int            max_standby_streaming_delay = 30 * 1000;

  static List *RecoveryLockList;

***************
*** 113,118 ****
--- 117,152 ----
   * -----------------------------------------------------
   */

+ /*
+  * Determine the cutoff time at which we want to start canceling conflicting
+  * transactions.  Returns zero (a time safely in the past) if we are willing
+  * to wait forever.
+  */
+ static TimestampTz
+ GetStandbyLimitTime(void)
+ {
+     TimestampTz    rtime;
+     bool        fromStream;
+
+     /*
+      * The cutoff time is the last WAL data receipt time plus the appropriate
+      * delay variable.  Delay of -1 means wait forever.
+      */
+     GetXLogReceiptTime(&rtime, &fromStream);
+     if (fromStream)
+     {
+         if (max_standby_streaming_delay < 0)
+             return 0;            /* wait forever */
+         return TimestampTzPlusMilliseconds(rtime, max_standby_streaming_delay);
+     }
+     else
+     {
+         if (max_standby_archive_delay < 0)
+             return 0;            /* wait forever */
+         return TimestampTzPlusMilliseconds(rtime, max_standby_archive_delay);
+     }
+ }
+
  #define STANDBY_INITIAL_WAIT_US  1000
  static int    standbyWait_us = STANDBY_INITIAL_WAIT_US;

***************
*** 124,133 ****
  static bool
  WaitExceedsMaxStandbyDelay(void)
  {
!     /* Are we past max_standby_delay? */
!     if (MaxStandbyDelay >= 0 &&
!         TimestampDifferenceExceeds(GetLatestXLogTime(), GetCurrentTimestamp(),
!                                    MaxStandbyDelay))
          return true;

      /*
--- 158,168 ----
  static bool
  WaitExceedsMaxStandbyDelay(void)
  {
!     TimestampTz    ltime;
!
!     /* Are we past the limit time? */
!     ltime = GetStandbyLimitTime();
!     if (ltime && GetCurrentTimestamp() >= ltime)
          return true;

      /*
***************
*** 368,433 ****
   * Startup is sleeping and the query waits on a lock. We protect against
   * only the former sequence here, the latter sequence is checked prior to
   * the query sleeping, in CheckRecoveryConflictDeadlock().
   */
  void
  ResolveRecoveryConflictWithBufferPin(void)
  {
      bool        sig_alarm_enabled = false;

      Assert(InHotStandby);

!     if (MaxStandbyDelay == 0)
!     {
!         /*
!          * We don't want to wait, so just tell everybody holding the pin to
!          * get out of town.
!          */
!         SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
!     }
!     else if (MaxStandbyDelay < 0)
!     {
!         TimestampTz now = GetCurrentTimestamp();

          /*
!          * Set timeout for deadlock check (only)
           */
          if (enable_standby_sig_alarm(now, now, true))
              sig_alarm_enabled = true;
          else
              elog(FATAL, "could not set timer for process wakeup");
      }
      else
      {
!         TimestampTz then = GetLatestXLogTime();
!         TimestampTz now = GetCurrentTimestamp();
!
!         /* Are we past max_standby_delay? */
!         if (TimestampDifferenceExceeds(then, now, MaxStandbyDelay))
!         {
!             /*
!              * We're already behind, so clear a path as quickly as possible.
!              */
!             SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
!         }
          else
!         {
!             TimestampTz max_standby_time;
!
!             /*
!              * At what point in the future do we hit MaxStandbyDelay?
!              */
!             max_standby_time = TimestampTzPlusMilliseconds(then, MaxStandbyDelay);
!             Assert(max_standby_time > now);
!
!             /*
!              * Wake up at MaxStandby delay, and check for deadlocks as well
!              * if we will be waiting longer than deadlock_timeout
!              */
!             if (enable_standby_sig_alarm(now, max_standby_time, false))
!                 sig_alarm_enabled = true;
!             else
!                 elog(FATAL, "could not set timer for process wakeup");
!         }
      }

      /* Wait to be signaled by UnpinBuffer() */
--- 402,452 ----
   * Startup is sleeping and the query waits on a lock. We protect against
   * only the former sequence here, the latter sequence is checked prior to
   * the query sleeping, in CheckRecoveryConflictDeadlock().
+  *
+  * Deadlocks are extremely rare, and relatively expensive to check for,
+  * so we don't do a deadlock check right away ... only if we have had to wait
+  * at least deadlock_timeout.  Most of the logic about that is in proc.c.
   */
  void
  ResolveRecoveryConflictWithBufferPin(void)
  {
      bool        sig_alarm_enabled = false;
+     TimestampTz    ltime;
+     TimestampTz    now;

      Assert(InHotStandby);

!     ltime = GetStandbyLimitTime();
!     now = GetCurrentTimestamp();

+     if (!ltime)
+     {
          /*
!          * We're willing to wait forever for conflicts, so set timeout for
!          * deadlock check (only)
           */
          if (enable_standby_sig_alarm(now, now, true))
              sig_alarm_enabled = true;
          else
              elog(FATAL, "could not set timer for process wakeup");
      }
+     else if (now >= ltime)
+     {
+         /*
+          * We're already behind, so clear a path as quickly as possible.
+          */
+         SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
+     }
      else
      {
!         /*
!          * Wake up at ltime, and check for deadlocks as well if we will be
!          * waiting longer than deadlock_timeout
!          */
!         if (enable_standby_sig_alarm(now, ltime, false))
!             sig_alarm_enabled = true;
          else
!             elog(FATAL, "could not set timer for process wakeup");
      }

      /* Wait to be signaled by UnpinBuffer() */
Index: src/backend/utils/misc/guc.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/utils/misc/guc.c,v
retrieving revision 1.557
diff -c -r1.557 guc.c
*** src/backend/utils/misc/guc.c    25 Jun 2010 13:11:25 -0000    1.557
--- src/backend/utils/misc/guc.c    2 Jul 2010 20:01:25 -0000
***************
*** 57,62 ****
--- 57,63 ----
  #include "postmaster/walwriter.h"
  #include "replication/walsender.h"
  #include "storage/bufmgr.h"
+ #include "storage/standby.h"
  #include "storage/fd.h"
  #include "tcop/tcopprot.h"
  #include "tsearch/ts_cache.h"
***************
*** 116,122 ****
  extern char *temp_tablespaces;
  extern bool synchronize_seqscans;
  extern bool fullPageWrites;
- extern int    vacuum_defer_cleanup_age;
  extern int    ssl_renegotiation_limit;

  #ifdef TRACE_SORT
--- 117,122 ----
***************
*** 1373,1378 ****
--- 1373,1398 ----
          1000, 1, INT_MAX / 1000, NULL, NULL
      },

+     {
+         {"max_standby_archive_delay", PGC_SIGHUP, WAL_STANDBY_SERVERS,
+             gettext_noop("Sets the maximum delay before canceling queries when a hot standby server is processing archived WAL data."),
+             NULL,
+             GUC_UNIT_MS
+         },
+         &max_standby_archive_delay,
+         30 * 1000, -1, INT_MAX / 1000, NULL, NULL
+     },
+
+     {
+         {"max_standby_streaming_delay", PGC_SIGHUP, WAL_STANDBY_SERVERS,
+             gettext_noop("Sets the maximum delay before canceling queries when a hot standby server is processing streamed WAL data."),
+             NULL,
+             GUC_UNIT_MS
+         },
+         &max_standby_streaming_delay,
+         30 * 1000, -1, INT_MAX / 1000, NULL, NULL
+     },
+
      /*
       * Note: MaxBackends is limited to INT_MAX/4 because some places compute
       * 4*MaxBackends without any overflow check.  This check is made in
***************
*** 1393,1408 ****
      },

      {
-         {"max_standby_delay", PGC_SIGHUP, WAL_STANDBY_SERVERS,
-             gettext_noop("Sets the maximum delay to avoid conflict processing on hot standby servers."),
-             NULL,
-             GUC_UNIT_MS
-         },
-         &MaxStandbyDelay,
-         30 * 1000, -1, INT_MAX / 1000, NULL, NULL
-     },
-
-     {
          {"superuser_reserved_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
              gettext_noop("Sets the number of connection slots reserved for superusers."),
              NULL
--- 1413,1418 ----
Index: src/include/access/xlog.h
===================================================================
RCS file: /cvsroot/pgsql/src/include/access/xlog.h,v
retrieving revision 1.113
diff -c -r1.113 xlog.h
*** src/include/access/xlog.h    17 Jun 2010 16:41:25 -0000    1.113
--- src/include/access/xlog.h    2 Jul 2010 20:01:25 -0000
***************
*** 193,199 ****
  extern bool XLogArchiveMode;
  extern char *XLogArchiveCommand;
  extern bool EnableHotStandby;
- extern int    MaxStandbyDelay;
  extern bool log_checkpoints;

  /* WAL levels */
--- 197,202 ----
***************
*** 279,285 ****

  extern bool RecoveryInProgress(void);
  extern bool XLogInsertAllowed(void);
! extern TimestampTz GetLatestXLogTime(void);

  extern void UpdateControlFile(void);
  extern uint64 GetSystemIdentifier(void);
--- 282,288 ----

  extern bool RecoveryInProgress(void);
  extern bool XLogInsertAllowed(void);
! extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);

  extern void UpdateControlFile(void);
  extern uint64 GetSystemIdentifier(void);
Index: src/include/replication/walreceiver.h
===================================================================
RCS file: /cvsroot/pgsql/src/include/replication/walreceiver.h,v
retrieving revision 1.9
diff -c -r1.9 walreceiver.h
*** src/include/replication/walreceiver.h    3 Jun 2010 22:17:32 -0000    1.9
--- src/include/replication/walreceiver.h    2 Jul 2010 20:01:25 -0000
***************
*** 41,65 ****
  typedef struct
  {
      /*
!      * connection string; is used for walreceiver to connect with the primary.
!      */
!     char        conninfo[MAXCONNINFO];
!
!     /*
!      * PID of currently active walreceiver process, and the current state.
       */
      pid_t        pid;
      WalRcvState walRcvState;
      pg_time_t    startTime;

      /*
!      * receivedUpto-1 is the last byte position that has been already
!      * received. When startup process starts the walreceiver, it sets this to
!      * the point where it wants the streaming to begin. After that,
!      * walreceiver updates this whenever it flushes the received WAL.
       */
      XLogRecPtr    receivedUpto;

      slock_t        mutex;            /* locks shared variables shown above */
  } WalRcvData;

--- 41,75 ----
  typedef struct
  {
      /*
!      * PID of currently active walreceiver process, its current state and
!      * start time (actually, the time at which it was requested to be started).
       */
      pid_t        pid;
      WalRcvState walRcvState;
      pg_time_t    startTime;

      /*
!      * receivedUpto-1 is the last byte position that has already been
!      * received.  When startup process starts the walreceiver, it sets
!      * receivedUpto to the point where it wants the streaming to begin.
!      * After that, walreceiver updates this whenever it flushes the received
!      * WAL to disk.
       */
      XLogRecPtr    receivedUpto;

+     /*
+      * latestChunkStart is the starting byte position of the current "batch"
+      * of received WAL.  It's actually the same as the previous value of
+      * receivedUpto before the last flush to disk.  Startup process can use
+      * this to detect whether it's keeping up or not.
+      */
+     XLogRecPtr    latestChunkStart;
+
+     /*
+      * connection string; is used for walreceiver to connect with the primary.
+      */
+     char        conninfo[MAXCONNINFO];
+
      slock_t        mutex;            /* locks shared variables shown above */
  } WalRcvData;

***************
*** 83,88 ****
  extern bool WalRcvInProgress(void);
  extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished);
  extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
! extern XLogRecPtr GetWalRcvWriteRecPtr(void);

  #endif   /* _WALRECEIVER_H */
--- 93,98 ----
  extern bool WalRcvInProgress(void);
  extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished);
  extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
! extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart);

  #endif   /* _WALRECEIVER_H */
Index: src/include/storage/standby.h
===================================================================
RCS file: /cvsroot/pgsql/src/include/storage/standby.h,v
retrieving revision 1.10
diff -c -r1.10 standby.h
*** src/include/storage/standby.h    13 May 2010 11:15:38 -0000    1.10
--- src/include/storage/standby.h    2 Jul 2010 20:01:25 -0000
***************
*** 19,25 ****
--- 19,28 ----
  #include "storage/procsignal.h"
  #include "storage/relfilenode.h"

+ /* User-settable GUC parameters */
  extern int    vacuum_defer_cleanup_age;
+ extern int    max_standby_archive_delay;
+ extern int    max_standby_streaming_delay;

  extern void InitRecoveryTransactionEnvironment(void);
  extern void ShutdownRecoveryTransactionEnvironment(void);
