diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 105d541..7d63782 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1208,6 +1208,12 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i standby server + replay_lag + interval + Estimated time taken for recent WAL records to be replayed on this + standby server + + sync_priority integer Priority of this standby server for being chosen as the diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 130b56b..48a5950 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -5473,6 +5473,12 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, XLogFlush(lsn); /* + * Record the primary's timestamp for the commit record, so it can be used + * for tracking replay lag. + */ + SetXLogReplayTimestamp(parsed->xact_time); + + /* * If asked by the primary (because someone is waiting for a synchronous * commit = remote_apply), we will need to ask walreceiver to send a * reply immediately. diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 3e454f5..504b4df 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -81,6 +81,8 @@ extern uint32 bootstrap_data_checksum_version; #define PROMOTE_SIGNAL_FILE "promote" #define FALLBACK_PROMOTE_SIGNAL_FILE "fallback_promote" +/* Size of the circular buffer of timestamped LSNs. */ +#define MAX_TIMESTAMPED_LSNS 8192 /* User-settable parameters */ int max_wal_size = 64; /* 1 GB */ @@ -360,6 +362,13 @@ static bool doRequestWalReceiverReply; */ static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr; +/* + * LastReplayedTimestamp can be set by redo handlers when they apply a record + * that carries a timestamp, by calling SetXLogReplayedTimestamp. The xlog + * apply loop can then update the value in shared memory. + */ +static TimestampTz LastReplayedTimestamp = 0; + /*---------- * Shared-memory data structures for XLOG control * @@ -634,6 +643,21 @@ typedef struct XLogCtlData /* current effective recovery target timeline */ TimeLineID RecoveryTargetTLI; + /* timestamp from the most recently applied record associated with a timestamp. */ + TimestampTz lastReplayedTimestamp; + + /* + * We maintain a circular buffer of LSNs and associated timestamps. + * Walreceiver writes into it using information from timestamps, and the + * startup recovery process reads from it and notifies walreceiver when + * LSNs are replayed so that the timestamps can eventually be fed back to + * the upstream server, to track lag. + */ + Index timestampedLsnRead; + Index timestampedLsnWrite; + XLogRecPtr timestampedLsn[MAX_TIMESTAMPED_LSNS]; + TimestampTz timestampedLsnTime[MAX_TIMESTAMPED_LSNS]; + /* * timestamp of when we started replaying the current chunk of WAL data, * only relevant for replication or archive recovery @@ -6874,20 +6898,51 @@ StartupXLOG(void) error_context_stack = errcallback.previous; /* - * Update lastReplayedEndRecPtr after this record has been - * successfully replayed. + * Update lastReplayedEndRecPtr and lastReplayedTimestamp + * after this record has been successfully replayed. */ SpinLockAcquire(&XLogCtl->info_lck); XLogCtl->lastReplayedEndRecPtr = EndRecPtr; XLogCtl->lastReplayedTLI = ThisTimeLineID; + if (LastReplayedTimestamp != 0) + { + /* If replaying a record produced a timestamp, use that. */ + XLogCtl->lastReplayedTimestamp = LastReplayedTimestamp; + LastReplayedTimestamp = 0; + } + else + { + /* + * If we have applied LSNs associated with timestamps + * received by walreceiver, then use the recorded + * timestamp. We consume from the read end of the + * circular buffer. + */ + while (XLogCtl->timestampedLsnRead != + XLogCtl->timestampedLsnWrite && + XLogCtl->timestampedLsn[XLogCtl->timestampedLsnRead] + <= EndRecPtr) + { + if (XLogCtl->timestampedLsnTime[XLogCtl->timestampedLsnRead] > + XLogCtl->lastReplayedTimestamp) + { + XLogCtl->lastReplayedTimestamp = + XLogCtl->timestampedLsnTime[XLogCtl->timestampedLsnRead]; + doRequestWalReceiverReply = true; + } + XLogCtl->timestampedLsnRead = + (XLogCtl->timestampedLsnRead + 1) % MAX_TIMESTAMPED_LSNS; + } + } SpinLockRelease(&XLogCtl->info_lck); /* * If rm_redo reported that it applied a commit record that * the master is waiting for by calling - * XLogRequestWalReceiverReply, then we wake up the receiver - * so that it notices the updated lastReplayedEndRecPtr and - * sends a reply to the master. + * XLogRequestWalReceiverReply, or we encountered a WAL + * location that was associated with a timestamp above, then + * we wake up the receiver so that it notices the updated + * lastReplayedEndRecPtr and sends a reply to the master. */ if (doRequestWalReceiverReply) { @@ -11619,3 +11674,91 @@ XLogRequestWalReceiverReply(void) { doRequestWalReceiverReply = true; } + +/* + * Record the timestamp that is associated with a WAL position. + * + * This is called by walreceiver on standby servers when keepalive messages + * arrive, using timestamps generated on the primary server. The timestamp + * will be sent back to the primary server when the standby had applied this + * WAL position. The primary can use the elapsed time to estimate the replay + * lag. + */ +void +SetXLogReplayTimestampAtLsn(TimestampTz timestamp, XLogRecPtr lsn) +{ + SpinLockAcquire(&XLogCtl->info_lck); + if (lsn == XLogCtl->lastReplayedEndRecPtr) + { + /* + * That is the last replayed LSN: we are fully replayed, so we can + * update the replay timestamp immediately. + */ + XLogCtl->lastReplayedTimestamp = timestamp; + } + else + { + /* + * There is WAL still to be applied. We will associate the timestamp + * with this WAL position and wait for it to be replayed. We add it + * at the 'write' end of the circular buffer of LSN/timestamp + * mappings, which the replay loop will eventually read. + */ + Index w = XLogCtl->timestampedLsnWrite; + Index r = XLogCtl->timestampedLsnRead; + + XLogCtl->timestampedLsn[w] = lsn; + XLogCtl->timestampedLsnTime[w] = timestamp; + + /* Advance the write point. */ + w = (w + 1) % MAX_TIMESTAMPED_LSNS; + XLogCtl->timestampedLsnWrite = w; + if (w == r) + { + /* + * The buffer is full. Advance the read point (throwing away + * oldest values; we will begin to overestimate replay lag, until + * lag decreases to a size our buffer can manage, or the next + * commit record is replayed). + */ + r = (r + 1) % MAX_TIMESTAMPED_LSNS; + XLogCtl->timestampedLsnRead = r; + } + } + SpinLockRelease(&XLogCtl->info_lck); +} + +/* + * Set the timestamp for the most recently applied WAL record that carried a + * timestamp from the primary. This can be called by redo handlers that have + * an appropriate timestamp (currently only commit records). Updating the + * shared memory value is deferred until after the redo handler returns. + */ +void +SetXLogReplayTimestamp(TimestampTz timestamp) +{ + LastReplayedTimestamp = timestamp; +} + +/* + * Get the timestamp for the most recently applied WAL record that carried a + * timestamp from the primary, and also the most recently applied LSN. (Note + * that the timestamp and the LSN don't necessarily relate to the same + * record.) + * + * This is similar to GetLatestXTime, except that it is not only advanced by + * commit records (see SetXLogReplayTimestampAtLsn). + */ +TimestampTz +GetXLogReplayTimestamp(XLogRecPtr *lsn) +{ + TimestampTz result; + + SpinLockAcquire(&XLogCtl->info_lck); + if (lsn) + *lsn = XLogCtl->lastReplayedEndRecPtr; + result = XLogCtl->lastReplayedTimestamp; + SpinLockRelease(&XLogCtl->info_lck); + + return result; +} diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 9ae1ef4..a53f07b 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -662,6 +662,7 @@ CREATE VIEW pg_stat_replication AS W.write_location, W.flush_location, W.replay_location, + W.replay_lag, W.sync_priority, W.sync_state FROM pg_stat_get_activity(NULL) AS S, pg_authid U, diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 2fa996d..faea9ff 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -85,6 +85,8 @@ walrcv_disconnect_type walrcv_disconnect = NULL; #define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ +#define MIN_TIME_BETWEEN_TIMESTAMPED_LSNS 1000 /* 1s */ + /* * These variables are used similarly to openLogFile/SegNo/Off, * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID @@ -102,6 +104,8 @@ static uint32 recvOff = 0; static volatile sig_atomic_t got_SIGHUP = false; static volatile sig_atomic_t got_SIGTERM = false; +static bool recovery_active = false; + /* * LogstreamResult indicates the byte positions that we have already * written/fsynced. @@ -143,7 +147,7 @@ static void WalRcvDie(int code, Datum arg); static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(bool dying); -static void XLogWalRcvSendReply(bool force, bool requestReply); +static void XLogWalRcvSendReply(bool force, bool requestReply, bool includeApplyTimestamp); static void XLogWalRcvSendHSFeedback(bool immed); static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); @@ -443,13 +447,15 @@ WalReceiverMain(void) { /* The recovery process has new apply feedback to report. */ apply_feedback_requested = true; + recovery_active = true; ResetLatch(&walrcv->latch); } len = walrcv_receive(0, &buf, &walrcv->latch); } /* Let the master know that we received some data. */ - XLogWalRcvSendReply(apply_feedback_requested, false); + XLogWalRcvSendReply(apply_feedback_requested, false, + apply_feedback_requested); apply_feedback_requested = false; /* @@ -505,7 +511,7 @@ WalReceiverMain(void) } } - XLogWalRcvSendReply(requestReply, requestReply); + XLogWalRcvSendReply(requestReply, requestReply, false); XLogWalRcvSendHSFeedback(false); } } @@ -836,6 +842,8 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) } case 'k': /* Keepalive */ { + bool reportApplyTimestamp = false; + /* copy message to StringInfo */ hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char); if (len != hdrlen) @@ -852,9 +860,22 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) ProcessWalSndrMessage(walEnd, sendTime); + /* + * If no apply timestamps have been sent at the request of the + * recovery process since we last received a keepalive, then + * we will send one now. This allows us to feed back + * timestamps in response to pings if we are idle or if the + * recovery process is somehow blocked, but we don't want to + * do that if it's actively applying and periodically waking + * us up with accurate apply timestamps. + */ + if (!recovery_active) + reportApplyTimestamp = true; + recovery_active = false; + /* If the primary requested a reply, send one immediately */ - if (replyRequested) - XLogWalRcvSendReply(true, false); + if (replyRequested || reportApplyTimestamp) + XLogWalRcvSendReply(true, false, reportApplyTimestamp); break; } default: @@ -1017,7 +1038,7 @@ XLogWalRcvFlush(bool dying) /* Also let the master know that we made some progress */ if (!dying) { - XLogWalRcvSendReply(false, false); + XLogWalRcvSendReply(false, false, false); XLogWalRcvSendHSFeedback(false); } } @@ -1035,15 +1056,18 @@ XLogWalRcvFlush(bool dying) * If 'requestReply' is true, requests the server to reply immediately upon * receiving this message. This is used for heartbearts, when approaching * wal_receiver_timeout. + * + * If 'reportApplyTimestamp' is true, the latest apply timestamp is included. */ static void -XLogWalRcvSendReply(bool force, bool requestReply) +XLogWalRcvSendReply(bool force, bool requestReply, bool reportApplyTimestamp) { static XLogRecPtr writePtr = 0; static XLogRecPtr flushPtr = 0; XLogRecPtr applyPtr; static TimestampTz sendTime = 0; TimestampTz now; + TimestampTz applyTimestamp = 0; /* * If the user doesn't want status to be reported to the master, be sure @@ -1059,10 +1083,8 @@ XLogWalRcvSendReply(bool force, bool requestReply) * We can compare the write and flush positions to the last message we * sent without taking any lock, but the apply position requires a spin * lock, so we don't check that unless something else has changed or 10 - * seconds have passed. This means that the apply log position will - * appear, from the master's point of view, to lag slightly, but since - * this is only for reporting purposes and only on idle systems, that's - * probably OK. + * seconds have passed, or the force flag has been set (which happens when + * apply feedback has been requested by the primary). */ if (!force && writePtr == LogstreamResult.Write @@ -1075,7 +1097,10 @@ XLogWalRcvSendReply(bool force, bool requestReply) /* Construct a new message */ writePtr = LogstreamResult.Write; flushPtr = LogstreamResult.Flush; - applyPtr = GetXLogReplayRecPtr(NULL); + if (reportApplyTimestamp) + applyTimestamp = GetXLogReplayTimestamp(&applyPtr); + else + applyPtr = GetXLogReplayRecPtr(NULL); resetStringInfo(&reply_message); pq_sendbyte(&reply_message, 'r'); @@ -1083,6 +1108,7 @@ XLogWalRcvSendReply(bool force, bool requestReply) pq_sendint64(&reply_message, flushPtr); pq_sendint64(&reply_message, applyPtr); pq_sendint64(&reply_message, GetCurrentIntegerTimestamp()); + pq_sendint64(&reply_message, TimestampTzToIntegerTimestamp(applyTimestamp)); pq_sendbyte(&reply_message, requestReply ? 1 : 0); /* Send it */ @@ -1187,8 +1213,8 @@ static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) { WalRcvData *walrcv = WalRcv; - TimestampTz lastMsgReceiptTime = GetCurrentTimestamp(); + static TimestampTz lastRecordedTimestamp = 0; /* Update shared-memory status */ SpinLockAcquire(&walrcv->mutex); @@ -1199,6 +1225,18 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) walrcv->lastMsgReceiptTime = lastMsgReceiptTime; SpinLockRelease(&walrcv->mutex); + /* + * Remember primary's timestamp at this WAL location. We throw away + * samples if they are coming too fast because we don't want to fill up + * the finite circular buffer and have to throw away older samples. + */ + if (lastRecordedTimestamp < TimestampTzPlusMilliseconds(sendTime, + -MIN_TIME_BETWEEN_TIMESTAMPED_LSNS)) + { + SetXLogReplayTimestampAtLsn(sendTime, walEnd); + lastRecordedTimestamp = sendTime; + } + if (log_min_messages <= DEBUG2) { char *sendtime; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index f98475c..16d7abc 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1545,15 +1545,29 @@ ProcessStandbyReplyMessage(void) XLogRecPtr writePtr, flushPtr, applyPtr; + int64 applyLagUs; bool replyRequested; + TimestampTz now = GetCurrentTimestamp(); + TimestampTz applyTimestamp; /* the caller already consumed the msgtype byte */ writePtr = pq_getmsgint64(&reply_message); flushPtr = pq_getmsgint64(&reply_message); applyPtr = pq_getmsgint64(&reply_message); (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ + applyTimestamp = IntegerTimestampToTimestampTz(pq_getmsgint64(&reply_message)); replyRequested = pq_getmsgbyte(&reply_message); + /* Compute the apply lag in milliseconds. */ + if (applyTimestamp == 0) + applyLagUs = -1; + else +#ifdef HAVE_INT64_TIMESTAMP + applyLagUs = now - applyTimestamp; +#else + applyLagUs = (now - applyTimestamp) * 1000000; +#endif + elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s", (uint32) (writePtr >> 32), (uint32) writePtr, (uint32) (flushPtr >> 32), (uint32) flushPtr, @@ -1575,6 +1589,8 @@ ProcessStandbyReplyMessage(void) walsnd->write = writePtr; walsnd->flush = flushPtr; walsnd->apply = applyPtr; + if (applyLagUs >= 0) + walsnd->applyLagUs = applyLagUs; SpinLockRelease(&walsnd->mutex); } @@ -2745,7 +2761,7 @@ WalSndGetStateString(WalSndState state) Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_WAL_SENDERS_COLS 8 +#define PG_STAT_GET_WAL_SENDERS_COLS 9 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -2793,6 +2809,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) XLogRecPtr write; XLogRecPtr flush; XLogRecPtr apply; + int64 applyLagUs; int priority; WalSndState state; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; @@ -2807,6 +2824,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) write = walsnd->write; flush = walsnd->flush; apply = walsnd->apply; + applyLagUs = walsnd->applyLagUs; priority = walsnd->sync_standby_priority; SpinLockRelease(&walsnd->mutex); @@ -2841,6 +2859,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) nulls[5] = true; values[5] = LSNGetDatum(apply); + if (applyLagUs < 0) + nulls[6] = true; + else + { + Interval *applyLagInterval = palloc(sizeof(Interval)); + + applyLagInterval->month = 0; + applyLagInterval->day = 0; +#ifdef HAVE_INT64_TIMESTAMP + applyLagInterval->time = applyLagUs; +#else + applyLagInterval->time = applyLagUs / 1000000.0; +#endif + nulls[6] = false; + values[6] = IntervalPGetDatum(applyLagInterval); + } + /* * Treat a standby such as a pg_basebackup background process * which always returns an invalid flush location, as an @@ -2848,18 +2883,18 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) */ priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority; - values[6] = Int32GetDatum(priority); + values[7] = Int32GetDatum(priority); /* * More easily understood version of standby state. This is purely * informational, not different from priority. */ if (priority == 0) - values[7] = CStringGetTextDatum("async"); + values[8] = CStringGetTextDatum("async"); else if (walsnd == sync_standby) - values[7] = CStringGetTextDatum("sync"); + values[8] = CStringGetTextDatum("sync"); else - values[7] = CStringGetTextDatum("potential"); + values[8] = CStringGetTextDatum("potential"); } tuplestore_putvalues(tupstore, tupdesc, values, nulls); diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c index c9e5270..f382b20 100644 --- a/src/backend/utils/adt/timestamp.c +++ b/src/backend/utils/adt/timestamp.c @@ -1629,6 +1629,20 @@ IntegerTimestampToTimestampTz(int64 timestamp) #endif /* + * TimestampTzToIntegerTimestamp -- convert a native timestamp to int64 format + * + * When compiled with --enable-integer-datetimes, this is implemented as a + * no-op macro. + */ +#ifndef HAVE_INT64_TIMESTAMP +int64 +TimestampTzToIntegerTimestamp(TimestampTz timestamp) +{ + return timestamp * 1000000; +} +#endif + +/* * TimestampDifference -- convert the difference between two timestamps * into integer seconds and microseconds * diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index a7dcdae..c8be3ce 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -235,6 +235,9 @@ extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI); extern XLogRecPtr GetXLogInsertRecPtr(void); extern XLogRecPtr GetXLogWriteRecPtr(void); +extern void SetXLogReplayTimestamp(TimestampTz timestamp); +extern void SetXLogReplayTimestampAtLsn(TimestampTz timestamp, XLogRecPtr lsn); +extern TimestampTz GetXLogReplayTimestamp(XLogRecPtr *lsn); extern bool RecoveryIsPaused(void); extern void SetRecoveryPause(bool recoveryPause); extern TimestampTz GetLatestXTime(void); diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index a595327..4054726 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -2712,7 +2712,7 @@ DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 0 f f f DESCR("statistics: information about currently active backends"); DATA(insert OID = 3318 ( pg_stat_get_progress_info PGNSP PGUID 12 1 100 0 0 f f f f t t s r 1 0 2249 "25" "{25,23,26,26,20,20,20,20,20,20,20,20,20,20}" "{i,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{cmdtype,pid,datid,relid,param1,param2,param3,param4,param5,param6,param7,param8,param9,param10}" _null_ _null_ pg_stat_get_progress_info _null_ _null_ _null_ )); DESCR("statistics: information about progress of backends running maintenance command"); -DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,23,25}" "{o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ )); +DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,23,25}" "{o,o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,replay_lag,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ )); DESCR("statistics: information about currently active replication"); DATA(insert OID = 3317 ( pg_stat_get_wal_receiver PGNSP PGUID 12 1 0 0 0 f f f f f f s r 0 0 2249 "" "{23,25,3220,23,3220,23,1184,1184,3220,1184,25}" "{o,o,o,o,o,o,o,o,o,o,o}" "{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name}" _null_ _null_ pg_stat_get_wal_receiver _null_ _null_ _null_ )); DESCR("statistics: information about WAL receiver"); diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 7794aa5..4de43e8 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -46,6 +46,7 @@ typedef struct WalSnd XLogRecPtr write; XLogRecPtr flush; XLogRecPtr apply; + int64 applyLagUs; /* Protects shared variables shown above. */ slock_t mutex; diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h index fbead3a..297e151 100644 --- a/src/include/utils/timestamp.h +++ b/src/include/utils/timestamp.h @@ -227,9 +227,11 @@ extern bool TimestampDifferenceExceeds(TimestampTz start_time, #ifndef HAVE_INT64_TIMESTAMP extern int64 GetCurrentIntegerTimestamp(void); extern TimestampTz IntegerTimestampToTimestampTz(int64 timestamp); +extern int64 TimestampTzToIntegerTimestamp(TimestampTz timestamp); #else #define GetCurrentIntegerTimestamp() GetCurrentTimestamp() #define IntegerTimestampToTimestampTz(timestamp) (timestamp) +#define TimestampTzToIntegerTimestamp(timestamp) (timestamp) #endif extern TimestampTz time_t_to_timestamptz(pg_time_t tm); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 79f9b23..fc4b765 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1783,11 +1783,12 @@ pg_stat_replication| SELECT s.pid, w.write_location, w.flush_location, w.replay_location, + w.replay_lag, w.sync_priority, w.sync_state FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn), pg_authid u, - pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, sync_priority, sync_state) + pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, replay_lag, sync_priority, sync_state) WHERE ((s.usesysid = u.oid) AND (s.pid = w.pid)); pg_stat_ssl| SELECT s.pid, s.ssl,