diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 105d541..7d63782 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1208,6 +1208,12 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
standby server
+ replay_lag>
+ interval>
+ Estimated time taken for recent WAL records to be replayed on this
+ standby server
+
+
sync_priority>
integer>
Priority of this standby server for being chosen as the
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 130b56b..48a5950 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -5473,6 +5473,12 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
XLogFlush(lsn);
/*
+ * Record the primary's timestamp for the commit record, so it can be used
+ * for tracking replay lag.
+ */
+ SetXLogReplayTimestamp(parsed->xact_time);
+
+ /*
* If asked by the primary (because someone is waiting for a synchronous
* commit = remote_apply), we will need to ask walreceiver to send a
* reply immediately.
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 3e454f5..504b4df 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -81,6 +81,8 @@ extern uint32 bootstrap_data_checksum_version;
#define PROMOTE_SIGNAL_FILE "promote"
#define FALLBACK_PROMOTE_SIGNAL_FILE "fallback_promote"
+/* Size of the circular buffer of timestamped LSNs. */
+#define MAX_TIMESTAMPED_LSNS 8192
/* User-settable parameters */
int max_wal_size = 64; /* 1 GB */
@@ -360,6 +362,13 @@ static bool doRequestWalReceiverReply;
*/
static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr;
+/*
+ * LastReplayedTimestamp can be set by redo handlers when they apply a record
+ * that carries a timestamp, by calling SetXLogReplayTimestamp. The xlog
+ * apply loop can then update the value in shared memory.
+ */
+static TimestampTz LastReplayedTimestamp = 0;
+
/*----------
* Shared-memory data structures for XLOG control
*
@@ -634,6 +643,21 @@ typedef struct XLogCtlData
/* current effective recovery target timeline */
TimeLineID RecoveryTargetTLI;
+ /* timestamp from the most recently applied record associated with a timestamp. */
+ TimestampTz lastReplayedTimestamp;
+
+ /*
+ * We maintain a circular buffer of LSNs and associated timestamps.
+ * Walreceiver writes into it using timestamps sent by the primary, and the
+ * startup recovery process reads from it and notifies walreceiver when
+ * LSNs are replayed so that the timestamps can eventually be fed back to
+ * the upstream server, to track lag.
+ */
+ Index timestampedLsnRead;
+ Index timestampedLsnWrite;
+ XLogRecPtr timestampedLsn[MAX_TIMESTAMPED_LSNS];
+ TimestampTz timestampedLsnTime[MAX_TIMESTAMPED_LSNS];
+
/*
* timestamp of when we started replaying the current chunk of WAL data,
* only relevant for replication or archive recovery
@@ -6874,20 +6898,51 @@ StartupXLOG(void)
error_context_stack = errcallback.previous;
/*
- * Update lastReplayedEndRecPtr after this record has been
- * successfully replayed.
+ * Update lastReplayedEndRecPtr and lastReplayedTimestamp
+ * after this record has been successfully replayed.
*/
SpinLockAcquire(&XLogCtl->info_lck);
XLogCtl->lastReplayedEndRecPtr = EndRecPtr;
XLogCtl->lastReplayedTLI = ThisTimeLineID;
+ if (LastReplayedTimestamp != 0)
+ {
+ /* If replaying a record produced a timestamp, use that. */
+ XLogCtl->lastReplayedTimestamp = LastReplayedTimestamp;
+ LastReplayedTimestamp = 0;
+ }
+ else
+ {
+ /*
+ * If we have applied LSNs associated with timestamps
+ * received by walreceiver, then use the recorded
+ * timestamp. We consume from the read end of the
+ * circular buffer.
+ */
+ while (XLogCtl->timestampedLsnRead !=
+ XLogCtl->timestampedLsnWrite &&
+ XLogCtl->timestampedLsn[XLogCtl->timestampedLsnRead]
+ <= EndRecPtr)
+ {
+ if (XLogCtl->timestampedLsnTime[XLogCtl->timestampedLsnRead] >
+ XLogCtl->lastReplayedTimestamp)
+ {
+ XLogCtl->lastReplayedTimestamp =
+ XLogCtl->timestampedLsnTime[XLogCtl->timestampedLsnRead];
+ doRequestWalReceiverReply = true;
+ }
+ XLogCtl->timestampedLsnRead =
+ (XLogCtl->timestampedLsnRead + 1) % MAX_TIMESTAMPED_LSNS;
+ }
+ }
SpinLockRelease(&XLogCtl->info_lck);
/*
* If rm_redo reported that it applied a commit record that
* the master is waiting for by calling
- * XLogRequestWalReceiverReply, then we wake up the receiver
- * so that it notices the updated lastReplayedEndRecPtr and
- * sends a reply to the master.
+ * XLogRequestWalReceiverReply, or we encountered a WAL
+ * location that was associated with a timestamp above, then
+ * we wake up the receiver so that it notices the updated
+ * lastReplayedEndRecPtr and sends a reply to the master.
*/
if (doRequestWalReceiverReply)
{
@@ -11619,3 +11674,91 @@ XLogRequestWalReceiverReply(void)
{
doRequestWalReceiverReply = true;
}
+
+/*
+ * Record the timestamp that is associated with a WAL position.
+ *
+ * This is called by walreceiver on standby servers when keepalive messages
+ * arrive, using timestamps generated on the primary server. The timestamp
+ * will be sent back to the primary server once the standby has applied this
+ * WAL position. The primary can use the elapsed time to estimate the replay
+ * lag.
+ */
+void
+SetXLogReplayTimestampAtLsn(TimestampTz timestamp, XLogRecPtr lsn)
+{
+ SpinLockAcquire(&XLogCtl->info_lck);
+ if (lsn == XLogCtl->lastReplayedEndRecPtr)
+ {
+ /*
+ * That is the last replayed LSN: we are fully replayed, so we can
+ * update the replay timestamp immediately.
+ */
+ XLogCtl->lastReplayedTimestamp = timestamp;
+ }
+ else
+ {
+ /*
+ * There is WAL still to be applied. We will associate the timestamp
+ * with this WAL position and wait for it to be replayed. We add it
+ * at the 'write' end of the circular buffer of LSN/timestamp
+ * mappings, which the replay loop will eventually read.
+ */
+ Index w = XLogCtl->timestampedLsnWrite;
+ Index r = XLogCtl->timestampedLsnRead;
+
+ XLogCtl->timestampedLsn[w] = lsn;
+ XLogCtl->timestampedLsnTime[w] = timestamp;
+
+ /* Advance the write point, wrapping at the end of the buffer. */
+ w = (w + 1) % MAX_TIMESTAMPED_LSNS;
+ XLogCtl->timestampedLsnWrite = w;
+ if (w == r)
+ {
+ /*
+ * The buffer is full. Advance the read point (throwing away the
+ * oldest value; we will begin to overestimate replay lag, until
+ * lag decreases to a size our buffer can manage, or the next
+ * commit record is replayed).
+ */
+ r = (r + 1) % MAX_TIMESTAMPED_LSNS;
+ XLogCtl->timestampedLsnRead = r;
+ }
+ }
+ SpinLockRelease(&XLogCtl->info_lck);
+}
+
+/*
+ * Set the timestamp for the most recently applied WAL record that carried a
+ * timestamp from the primary. This can be called by redo handlers that have
+ * an appropriate timestamp (currently only commit records). The value is only
+ * saved locally here; the replay loop copies it to shared memory afterwards.
+ */
+void
+SetXLogReplayTimestamp(TimestampTz timestamp)
+{
+ LastReplayedTimestamp = timestamp;
+}
+
+/*
+ * Get the timestamp for the most recently applied WAL record that carried a
+ * timestamp from the primary. If 'lsn' is not NULL, the most recently applied
+ * LSN is also stored there. (Note that the timestamp and the LSN don't
+ * necessarily relate to the same record.)
+ *
+ * This is similar to GetLatestXTime, except that it is not only advanced by
+ * commit records (see SetXLogReplayTimestampAtLsn).
+ */
+TimestampTz
+GetXLogReplayTimestamp(XLogRecPtr *lsn)
+{
+ TimestampTz result;
+
+ SpinLockAcquire(&XLogCtl->info_lck);
+ if (lsn)
+ *lsn = XLogCtl->lastReplayedEndRecPtr;
+ result = XLogCtl->lastReplayedTimestamp;
+ SpinLockRelease(&XLogCtl->info_lck);
+
+ return result;
+}
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 9ae1ef4..a53f07b 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -662,6 +662,7 @@ CREATE VIEW pg_stat_replication AS
W.write_location,
W.flush_location,
W.replay_location,
+ W.replay_lag,
W.sync_priority,
W.sync_state
FROM pg_stat_get_activity(NULL) AS S, pg_authid U,
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 2fa996d..faea9ff 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -85,6 +85,8 @@ walrcv_disconnect_type walrcv_disconnect = NULL;
#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
+#define MIN_TIME_BETWEEN_TIMESTAMPED_LSNS 1000 /* 1s */
+
/*
* These variables are used similarly to openLogFile/SegNo/Off,
* but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
@@ -102,6 +104,8 @@ static uint32 recvOff = 0;
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t got_SIGTERM = false;
+static bool recovery_active = false;
+
/*
* LogstreamResult indicates the byte positions that we have already
* written/fsynced.
@@ -143,7 +147,7 @@ static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(bool dying);
-static void XLogWalRcvSendReply(bool force, bool requestReply);
+static void XLogWalRcvSendReply(bool force, bool requestReply, bool reportApplyTimestamp);
static void XLogWalRcvSendHSFeedback(bool immed);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
@@ -443,13 +447,15 @@ WalReceiverMain(void)
{
/* The recovery process has new apply feedback to report. */
apply_feedback_requested = true;
+ recovery_active = true;
ResetLatch(&walrcv->latch);
}
len = walrcv_receive(0, &buf, &walrcv->latch);
}
/* Let the master know that we received some data. */
- XLogWalRcvSendReply(apply_feedback_requested, false);
+ XLogWalRcvSendReply(apply_feedback_requested, false,
+ apply_feedback_requested);
apply_feedback_requested = false;
/*
@@ -505,7 +511,7 @@ WalReceiverMain(void)
}
}
- XLogWalRcvSendReply(requestReply, requestReply);
+ XLogWalRcvSendReply(requestReply, requestReply, false);
XLogWalRcvSendHSFeedback(false);
}
}
@@ -836,6 +842,8 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
}
case 'k': /* Keepalive */
{
+ bool reportApplyTimestamp = false;
+
/* copy message to StringInfo */
hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
if (len != hdrlen)
@@ -852,9 +860,22 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
ProcessWalSndrMessage(walEnd, sendTime);
+ /*
+ * If no apply timestamps have been sent at the request of the
+ * recovery process since we last received a keepalive, then
+ * we will send one now. This allows us to feed back
+ * timestamps in response to pings if we are idle or if the
+ * recovery process is somehow blocked, but we don't want to
+ * do that if it's actively applying and periodically waking
+ * us up with accurate apply timestamps.
+ */
+ if (!recovery_active)
+ reportApplyTimestamp = true;
+ recovery_active = false;
+
/* If the primary requested a reply, send one immediately */
- if (replyRequested)
- XLogWalRcvSendReply(true, false);
+ if (replyRequested || reportApplyTimestamp)
+ XLogWalRcvSendReply(true, false, reportApplyTimestamp);
break;
}
default:
@@ -1017,7 +1038,7 @@ XLogWalRcvFlush(bool dying)
/* Also let the master know that we made some progress */
if (!dying)
{
- XLogWalRcvSendReply(false, false);
+ XLogWalRcvSendReply(false, false, false);
XLogWalRcvSendHSFeedback(false);
}
}
@@ -1035,15 +1056,18 @@ XLogWalRcvFlush(bool dying)
* If 'requestReply' is true, requests the server to reply immediately upon
* receiving this message. This is used for heartbearts, when approaching
* wal_receiver_timeout.
+ *
+ * If 'reportApplyTimestamp' is true, the latest apply timestamp is included.
*/
static void
-XLogWalRcvSendReply(bool force, bool requestReply)
+XLogWalRcvSendReply(bool force, bool requestReply, bool reportApplyTimestamp)
{
static XLogRecPtr writePtr = 0;
static XLogRecPtr flushPtr = 0;
XLogRecPtr applyPtr;
static TimestampTz sendTime = 0;
TimestampTz now;
+ TimestampTz applyTimestamp = 0;
/*
* If the user doesn't want status to be reported to the master, be sure
@@ -1059,10 +1083,8 @@ XLogWalRcvSendReply(bool force, bool requestReply)
* We can compare the write and flush positions to the last message we
* sent without taking any lock, but the apply position requires a spin
* lock, so we don't check that unless something else has changed or 10
- * seconds have passed. This means that the apply log position will
- * appear, from the master's point of view, to lag slightly, but since
- * this is only for reporting purposes and only on idle systems, that's
- * probably OK.
+ * seconds have passed, or the force flag has been set (which happens when
+ * apply feedback has been requested by the primary).
*/
if (!force
&& writePtr == LogstreamResult.Write
@@ -1075,7 +1097,10 @@ XLogWalRcvSendReply(bool force, bool requestReply)
/* Construct a new message */
writePtr = LogstreamResult.Write;
flushPtr = LogstreamResult.Flush;
- applyPtr = GetXLogReplayRecPtr(NULL);
+ if (reportApplyTimestamp)
+ applyTimestamp = GetXLogReplayTimestamp(&applyPtr);
+ else
+ applyPtr = GetXLogReplayRecPtr(NULL);
resetStringInfo(&reply_message);
pq_sendbyte(&reply_message, 'r');
@@ -1083,6 +1108,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
pq_sendint64(&reply_message, flushPtr);
pq_sendint64(&reply_message, applyPtr);
pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
+ pq_sendint64(&reply_message, TimestampTzToIntegerTimestamp(applyTimestamp));
pq_sendbyte(&reply_message, requestReply ? 1 : 0);
/* Send it */
@@ -1187,8 +1213,8 @@ static void
ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
{
WalRcvData *walrcv = WalRcv;
-
TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
+ static TimestampTz lastRecordedTimestamp = 0;
/* Update shared-memory status */
SpinLockAcquire(&walrcv->mutex);
@@ -1199,6 +1225,18 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
SpinLockRelease(&walrcv->mutex);
+ /*
+ * Remember primary's timestamp at this WAL location. We throw away
+ * samples if they are coming too fast because we don't want to fill up
+ * the finite circular buffer and have to throw away older samples.
+ */
+ if (lastRecordedTimestamp < TimestampTzPlusMilliseconds(sendTime,
+ -MIN_TIME_BETWEEN_TIMESTAMPED_LSNS))
+ {
+ SetXLogReplayTimestampAtLsn(sendTime, walEnd);
+ lastRecordedTimestamp = sendTime;
+ }
+
if (log_min_messages <= DEBUG2)
{
char *sendtime;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index f98475c..16d7abc 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1545,15 +1545,29 @@ ProcessStandbyReplyMessage(void)
XLogRecPtr writePtr,
flushPtr,
applyPtr;
+ int64 applyLagUs;
bool replyRequested;
+ TimestampTz now = GetCurrentTimestamp();
+ TimestampTz applyTimestamp;
/* the caller already consumed the msgtype byte */
writePtr = pq_getmsgint64(&reply_message);
flushPtr = pq_getmsgint64(&reply_message);
applyPtr = pq_getmsgint64(&reply_message);
(void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
+ applyTimestamp = IntegerTimestampToTimestampTz(pq_getmsgint64(&reply_message));
replyRequested = pq_getmsgbyte(&reply_message);
+ /* Compute the apply lag in microseconds. */
+ if (applyTimestamp == 0)
+ applyLagUs = -1;
+ else
+#ifdef HAVE_INT64_TIMESTAMP
+ applyLagUs = now - applyTimestamp;
+#else
+ applyLagUs = (now - applyTimestamp) * 1000000;
+#endif
+
elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
(uint32) (writePtr >> 32), (uint32) writePtr,
(uint32) (flushPtr >> 32), (uint32) flushPtr,
@@ -1575,6 +1589,8 @@ ProcessStandbyReplyMessage(void)
walsnd->write = writePtr;
walsnd->flush = flushPtr;
walsnd->apply = applyPtr;
+ if (applyLagUs >= 0)
+ walsnd->applyLagUs = applyLagUs;
SpinLockRelease(&walsnd->mutex);
}
@@ -2745,7 +2761,7 @@ WalSndGetStateString(WalSndState state)
Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_WAL_SENDERS_COLS 8
+#define PG_STAT_GET_WAL_SENDERS_COLS 9
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
@@ -2793,6 +2809,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
XLogRecPtr write;
XLogRecPtr flush;
XLogRecPtr apply;
+ int64 applyLagUs;
int priority;
WalSndState state;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
@@ -2807,6 +2824,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
write = walsnd->write;
flush = walsnd->flush;
apply = walsnd->apply;
+ applyLagUs = walsnd->applyLagUs;
priority = walsnd->sync_standby_priority;
SpinLockRelease(&walsnd->mutex);
@@ -2841,6 +2859,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
nulls[5] = true;
values[5] = LSNGetDatum(apply);
+ if (applyLagUs < 0)
+ nulls[6] = true;
+ else
+ {
+ Interval *applyLagInterval = palloc(sizeof(Interval));
+
+ applyLagInterval->month = 0;
+ applyLagInterval->day = 0;
+#ifdef HAVE_INT64_TIMESTAMP
+ applyLagInterval->time = applyLagUs;
+#else
+ applyLagInterval->time = applyLagUs / 1000000.0;
+#endif
+ nulls[6] = false;
+ values[6] = IntervalPGetDatum(applyLagInterval);
+ }
+
/*
* Treat a standby such as a pg_basebackup background process
* which always returns an invalid flush location, as an
@@ -2848,18 +2883,18 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
*/
priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
- values[6] = Int32GetDatum(priority);
+ values[7] = Int32GetDatum(priority);
/*
* More easily understood version of standby state. This is purely
* informational, not different from priority.
*/
if (priority == 0)
- values[7] = CStringGetTextDatum("async");
+ values[8] = CStringGetTextDatum("async");
else if (walsnd == sync_standby)
- values[7] = CStringGetTextDatum("sync");
+ values[8] = CStringGetTextDatum("sync");
else
- values[7] = CStringGetTextDatum("potential");
+ values[8] = CStringGetTextDatum("potential");
}
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c
index c9e5270..f382b20 100644
--- a/src/backend/utils/adt/timestamp.c
+++ b/src/backend/utils/adt/timestamp.c
@@ -1629,6 +1629,20 @@ IntegerTimestampToTimestampTz(int64 timestamp)
#endif
/*
+ * TimestampTzToIntegerTimestamp -- convert a native timestamp to int64 format
+ *
+ * When compiled with --enable-integer-datetimes, this is implemented as a
+ * no-op macro.
+ */
+#ifndef HAVE_INT64_TIMESTAMP
+int64
+TimestampTzToIntegerTimestamp(TimestampTz timestamp)
+{
+ return timestamp * 1000000;
+}
+#endif
+
+/*
* TimestampDifference -- convert the difference between two timestamps
* into integer seconds and microseconds
*
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index a7dcdae..c8be3ce 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -235,6 +235,9 @@ extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI);
extern XLogRecPtr GetXLogInsertRecPtr(void);
extern XLogRecPtr GetXLogWriteRecPtr(void);
+extern void SetXLogReplayTimestamp(TimestampTz timestamp);
+extern void SetXLogReplayTimestampAtLsn(TimestampTz timestamp, XLogRecPtr lsn);
+extern TimestampTz GetXLogReplayTimestamp(XLogRecPtr *lsn);
extern bool RecoveryIsPaused(void);
extern void SetRecoveryPause(bool recoveryPause);
extern TimestampTz GetLatestXTime(void);
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index a595327..4054726 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -2712,7 +2712,7 @@ DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 0 f f f
DESCR("statistics: information about currently active backends");
DATA(insert OID = 3318 ( pg_stat_get_progress_info PGNSP PGUID 12 1 100 0 0 f f f f t t s r 1 0 2249 "25" "{25,23,26,26,20,20,20,20,20,20,20,20,20,20}" "{i,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{cmdtype,pid,datid,relid,param1,param2,param3,param4,param5,param6,param7,param8,param9,param10}" _null_ _null_ pg_stat_get_progress_info _null_ _null_ _null_ ));
DESCR("statistics: information about progress of backends running maintenance command");
-DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,23,25}" "{o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
+DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,23,25}" "{o,o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,replay_lag,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
DESCR("statistics: information about currently active replication");
DATA(insert OID = 3317 ( pg_stat_get_wal_receiver PGNSP PGUID 12 1 0 0 0 f f f f f f s r 0 0 2249 "" "{23,25,3220,23,3220,23,1184,1184,3220,1184,25}" "{o,o,o,o,o,o,o,o,o,o,o}" "{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name}" _null_ _null_ pg_stat_get_wal_receiver _null_ _null_ _null_ ));
DESCR("statistics: information about WAL receiver");
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 7794aa5..4de43e8 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -46,6 +46,7 @@ typedef struct WalSnd
XLogRecPtr write;
XLogRecPtr flush;
XLogRecPtr apply;
+ int64 applyLagUs;
/* Protects shared variables shown above. */
slock_t mutex;
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index fbead3a..297e151 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -227,9 +227,11 @@ extern bool TimestampDifferenceExceeds(TimestampTz start_time,
#ifndef HAVE_INT64_TIMESTAMP
extern int64 GetCurrentIntegerTimestamp(void);
extern TimestampTz IntegerTimestampToTimestampTz(int64 timestamp);
+extern int64 TimestampTzToIntegerTimestamp(TimestampTz timestamp);
#else
#define GetCurrentIntegerTimestamp() GetCurrentTimestamp()
#define IntegerTimestampToTimestampTz(timestamp) (timestamp)
+#define TimestampTzToIntegerTimestamp(timestamp) (timestamp)
#endif
extern TimestampTz time_t_to_timestamptz(pg_time_t tm);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 79f9b23..fc4b765 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1783,11 +1783,12 @@ pg_stat_replication| SELECT s.pid,
w.write_location,
w.flush_location,
w.replay_location,
+ w.replay_lag,
w.sync_priority,
w.sync_state
FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn),
pg_authid u,
- pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, sync_priority, sync_state)
+ pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, replay_lag, sync_priority, sync_state)
WHERE ((s.usesysid = u.oid) AND (s.pid = w.pid));
pg_stat_ssl| SELECT s.pid,
s.ssl,