diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 8d7b3bf..b894e31 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -3310,6 +3310,26 @@ ANY num_sync ( + replication_lag_sample_interval (integer) + + replication_lag_sample_interval configuration parameter + + + + + Controls how often a standby should sample timestamps from upstream to + send back to the primary or upstream standby after writing, flushing + and replaying WAL. The default is 1 second. Units are milliseconds if + not specified. A value of -1 disables the reporting of replication + lag. Estimated lag can be seen in the + pg_stat_replication view of the upstream server. + This parameter can only be set + in the postgresql.conf file or on the server command line. + + + + hot_standby_feedback (boolean) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 1545f03..a422ac0 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1405,6 +1405,24 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i standby server + write_lag + interval + Estimated time taken for recent WAL records to be written on this + standby server + + + flush_lag + interval + Estimated time taken for recent WAL records to be flushed on this + standby server + + + replay_lag + interval + Estimated time taken for recent WAL records to be replayed on this + standby server + + sync_priority integer Priority of this standby server for being chosen as the diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index f8ffa5c..7e7312f 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -82,6 +82,8 @@ extern uint32 bootstrap_data_checksum_version; #define PROMOTE_SIGNAL_FILE "promote" #define FALLBACK_PROMOTE_SIGNAL_FILE "fallback_promote" +/* Size of the circular buffer of timestamped LSNs. */ +#define XLOG_TIMESTAMP_BUFFER_SIZE 8192 /* User-settable parameters */ int max_wal_size = 64; /* 1 GB */ @@ -530,6 +532,26 @@ typedef struct XLogCtlInsert } XLogCtlInsert; /* + * A sample associating a timestamp with a given xlog position. + */ +typedef struct XLogTimestamp +{ + TimestampTz timestamp; + XLogRecPtr lsn; +} XLogTimestamp; + +/* + * A circular buffer of LSNs and associated timestamps. The buffer is empty + * when read_head == write_head. + */ +typedef struct XLogTimestampBuffer +{ + uint32 read_head; + uint32 write_head; + XLogTimestamp buffer[XLOG_TIMESTAMP_BUFFER_SIZE]; +} XLogTimestampBuffer; + +/* * Total shared-memory state for XLOG. */ typedef struct XLogCtlData @@ -648,6 +670,14 @@ typedef struct XLogCtlData /* timestamp of last COMMIT/ABORT record replayed (or being replayed) */ TimestampTz recoveryLastXTime; + /* timestamp from the most recently applied record associated with a timestamp. */ + TimestampTz lastReplayedTimestamp; + + /* buffers of timestamps for WAL that is not yet written/flushed/applied. */ + XLogTimestampBuffer writeTimestamps; + XLogTimestampBuffer flushTimestamps; + XLogTimestampBuffer applyTimestamps; + /* * timestamp of when we started replaying the current chunk of WAL data, * only relevant for replication or archive recovery @@ -6006,6 +6036,96 @@ CheckRequiredParameterValues(void) } /* + * Read and consume all records from 'buffer' whose position is <= 'lsn'. + * Return true if any such records are found, and write the latest timestamp + * found into *timestamp. Write the new read head position into *read_head, + * so that the caller can store it with appropriate locking. + */ +static bool +ReadXLogTimestampForLsn(XLogTimestampBuffer *buffer, + XLogRecPtr lsn, + uint32 *read_head, + TimestampTz *timestamp) +{ + bool found = false; + + /* + * It's OK to access buffer->read_head without any kind synchronization + * because in all cases the caller is the only process reading from the + * buffer (ie writing to *buffer->read_head). + */ + *read_head = buffer->read_head; + + /* + * It's OK to access write_head without interlocking because it's an + * aligned 32 bit value which we can read atomically on all supported + * platforms to get some recent value, not a torn/garbage value. + * Furthermore we must see a value that is at least as recent as any WAL + * that we have written/flushed/replayed, because walreceiver calls + * SetXLogTimestampAtLsn before writing. + */ + while (*read_head != buffer->write_head && + buffer->buffer[*read_head].lsn <= lsn) + { + found = true; + *timestamp = buffer->buffer[*read_head].timestamp; + *read_head = (*read_head + 1) % XLOG_TIMESTAMP_BUFFER_SIZE; + } + + return found; +} + +/* + * Called by the WAL receiver process after it has written up to 'lsn'. + * Return true if it has written any LSN location that had an associated + * timestamp, and write the timestamp to '*timestamp'. + */ +bool +CheckForWrittenTimestampedLsn(XLogRecPtr lsn, TimestampTz *timestamp) +{ + Assert(AmWalReceiverProcess()); + + return ReadXLogTimestampForLsn(&XLogCtl->writeTimestamps, lsn, + &XLogCtl->writeTimestamps.read_head, + timestamp); +} + +/* + * Called by the WAL receiver process after it has flushed up to 'lsn'. + * Return true if it has flushed any LSN location that had an associated + * timestamp, and write the timestamp to '*timestamp'. + */ +bool +CheckForFlushedTimestampedLsn(XLogRecPtr lsn, TimestampTz *timestamp) +{ + Assert(AmWalReceiverProcess()); + + return ReadXLogTimestampForLsn(&XLogCtl->flushTimestamps, lsn, + &XLogCtl->flushTimestamps.read_head, + timestamp); +} + +/* + * Called by the startup process after it has replayed up to 'lsn'. Checks + * for timestamps associated with WAL positions that have now been replayed. + * If any are found, the latest such timestamp found is written to + * '*timestamp'. Returns the new buffer read head position, which the caller + * should write into XLogCtl->timestamps.read_head while holding info_lck. + */ +static uint32 +CheckForAppliedTimestampedLsn(XLogRecPtr lsn, TimestampTz *timestamp) +{ + uint32 read_head; + + Assert(AmStartupProcess()); + + ReadXLogTimestampForLsn(&XLogCtl->applyTimestamps, lsn, &read_head, + timestamp); + + return read_head; +} + +/* * This must be called ONCE during postmaster or standalone-backend startup */ void @@ -6824,6 +6944,8 @@ StartupXLOG(void) do { bool switchedTLI = false; + TimestampTz replayed_timestamp = 0; + uint32 timestamp_read_head; #ifdef WAL_DEBUG if (XLOG_DEBUG || @@ -6977,24 +7099,35 @@ StartupXLOG(void) /* Pop the error context stack */ error_context_stack = errcallback.previous; + /* Check if we have replayed a timestamped WAL position */ + timestamp_read_head = + CheckForAppliedTimestampedLsn(EndRecPtr, + &replayed_timestamp); + /* - * Update lastReplayedEndRecPtr after this record has been - * successfully replayed. + * Update lastReplayedEndRecPtr and lastReplayedTimestamp + * after this record has been successfully replayed. */ SpinLockAcquire(&XLogCtl->info_lck); XLogCtl->lastReplayedEndRecPtr = EndRecPtr; XLogCtl->lastReplayedTLI = ThisTimeLineID; + XLogCtl->applyTimestamps.read_head = timestamp_read_head; + if (replayed_timestamp != 0) + XLogCtl->lastReplayedTimestamp = replayed_timestamp; SpinLockRelease(&XLogCtl->info_lck); /* * If rm_redo called XLogRequestWalReceiverReply, then we wake * up the receiver so that it notices the updated - * lastReplayedEndRecPtr and sends a reply to the master. + * lastReplayedEndRecPtr and sends a reply to the master. We + * also wake it if we have replayed a WAL position that has + * an associated timestamp so that the upstream server can + * measure our replay lag. */ - if (doRequestWalReceiverReply) + if (doRequestWalReceiverReply || replayed_timestamp != 0) { doRequestWalReceiverReply = false; - WalRcvForceReply(); + WalRcvForceReply(replayed_timestamp != 0); } /* Remember this record as the last-applied one */ @@ -11809,3 +11942,106 @@ XLogRequestWalReceiverReply(void) { doRequestWalReceiverReply = true; } + +/* + * Store an (lsn, timestamp) sample in a timestamp buffer. + */ +static void +StoreXLogTimestampAtLsn(XLogTimestampBuffer *buffer, + TimestampTz timestamp, XLogRecPtr lsn) +{ + + uint32 write_head = buffer->write_head; + uint32 new_write_head = (write_head + 1) % XLOG_TIMESTAMP_BUFFER_SIZE; + + Assert(AmWalReceiverProcess()); + + if (new_write_head == buffer->read_head) + { + /* + * The buffer is full, so we'll rewind and overwrite the most + * recent sample. Overwriting the most recent sample means that + * if we're not writing/flushing/replaying fast enough and the buffer + * fills up, we'll effectively lower the sampling rate. + */ + new_write_head = write_head; + write_head = (write_head - 1) % XLOG_TIMESTAMP_BUFFER_SIZE; + } + + buffer->buffer[write_head].lsn = lsn; + buffer->buffer[write_head].timestamp = timestamp; + buffer->write_head = new_write_head; +} + +/* + * Record the timestamp that is associated with a WAL position. + * + * This is called by walreceiver on standby servers when new messages arrive, + * using a timestamp and the latest known WAL position from the upstream + * server. The timestamp will be sent back to the upstream server via + * walreceiver when the WAL position is eventually written, flushed and + * applied. + */ +void +SetXLogTimestampAtLsn(TimestampTz timestamp, XLogRecPtr lsn) +{ + bool applied_end = false; + static TimestampTz last_timestamp; + static XLogRecPtr last_lsn; + + Assert(AmWalReceiverProcess()); + Assert(replication_lag_sample_interval >= 0); + + SpinLockAcquire(&XLogCtl->info_lck); + + /* + * Check if we're fully applied, so we can avoid recording samples in that + * case. There is effectively no replay lag, and we don't want to report + * bogus lag after a period of idleness. + */ + if (lsn == XLogCtl->lastReplayedEndRecPtr) + applied_end = true; + + /* + * Record this timestamp/LSN pair, if the LSN has moved since last time + * and we haven't recorded a sample too recently. + */ + if (!applied_end && + lsn > last_lsn && + timestamp > TimestampTzPlusMilliseconds(last_timestamp, + replication_lag_sample_interval)) + { + StoreXLogTimestampAtLsn(&XLogCtl->applyTimestamps, timestamp, lsn); + StoreXLogTimestampAtLsn(&XLogCtl->writeTimestamps, timestamp, lsn); + StoreXLogTimestampAtLsn(&XLogCtl->flushTimestamps, timestamp, lsn); + + last_timestamp = timestamp; + last_lsn = lsn; + } + + SpinLockRelease(&XLogCtl->info_lck); +} + +/* + * Get the timestamp for the most recently applied WAL record that carried a + * timestamp from the upstream server, and also the most recently applied LSN. + * (Note that the timestamp and the LSN don't necessarily relate to the same + * record.) + * + * This is similar to GetLatestXTime, except that it is advanced when WAL + * positions recorded with SetXLogReplayTimestampAtLsn have been applied, + * rather than commit records. + */ +TimestampTz +GetXLogReplayTimestamp(XLogRecPtr *lsn) +{ + TimestampTz result; + + SpinLockAcquire(&XLogCtl->info_lck); + if (lsn) + *lsn = XLogCtl->lastReplayedEndRecPtr; + result = XLogCtl->lastReplayedTimestamp; + SpinLockRelease(&XLogCtl->info_lck); + + return result; +} diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 649cef8..2fd63e3 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -685,6 +685,9 @@ CREATE VIEW pg_stat_replication AS W.write_location, W.flush_location, W.replay_location, + W.write_lag, + W.flush_lag, + W.replay_lag, W.sync_priority, W.sync_state FROM pg_stat_get_activity(NULL) AS S diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index cc3cf7d..621aa24 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -73,6 +73,7 @@ int wal_receiver_status_interval; int wal_receiver_timeout; bool hot_standby_feedback; +int replication_lag_sample_interval; /* libpqwalreceiver connection */ static WalReceiverConn *wrconn = NULL; @@ -107,6 +108,10 @@ static struct XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ } LogstreamResult; +/* Latest timestamps for replication lag tracking. */ +static TimestampTz last_write_timestamp; +static TimestampTz last_flush_timestamp; + static StringInfoData reply_message; static StringInfoData incoming_message; @@ -138,7 +143,7 @@ static void WalRcvDie(int code, Datum arg); static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(bool dying); -static void XLogWalRcvSendReply(bool force, bool requestReply); +static void XLogWalRcvSendReply(bool force, bool requestReply, int timestamps); static void XLogWalRcvSendHSFeedback(bool immed); static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); @@ -148,6 +153,16 @@ static void WalRcvSigUsr1Handler(SIGNAL_ARGS); static void WalRcvShutdownHandler(SIGNAL_ARGS); static void WalRcvQuickDieHandler(SIGNAL_ARGS); +/* + * Which timestamps to include in a reply message. + */ +typedef enum XLogReplyTimestamp +{ + REPLY_WRITE_TIMESTAMP = 1, + REPLY_FLUSH_TIMESTAMP = 2, + REPLY_APPLY_TIMESTAMP = 4 +} XLogReplyTimestamp; + static void ProcessWalRcvInterrupts(void) @@ -424,6 +439,8 @@ WalReceiverMain(void) len = walrcv_receive(wrconn, &buf, &wait_fd); if (len != 0) { + int timestamp = 0; + /* * Process the received data, and any subsequent data we * can read without blocking. @@ -455,8 +472,17 @@ WalReceiverMain(void) len = walrcv_receive(wrconn, &buf, &wait_fd); } + /* + * Check if we have written an LSN location for which we + * have a timestamp from the upstream server, for + * replication lag tracking. + */ + if (CheckForWrittenTimestampedLsn(LogstreamResult.Write, + &last_write_timestamp)) + timestamp = REPLY_WRITE_TIMESTAMP; + /* Let the master know that we received some data. */ - XLogWalRcvSendReply(false, false); + XLogWalRcvSendReply(false, false, timestamp); /* * If we've written some records, flush them to disk and @@ -493,15 +519,20 @@ WalReceiverMain(void) ResetLatch(walrcv->latch); if (walrcv->force_reply) { + int timestamps = 0; + /* * The recovery process has asked us to send apply * feedback now. Make sure the flag is really set to * false in shared memory before sending the reply, so * we don't miss a new request for a reply. */ + if (walrcv->force_reply_apply_timestamp) + timestamps = REPLY_APPLY_TIMESTAMP; walrcv->force_reply = false; + walrcv->force_reply_apply_timestamp = false; pg_memory_barrier(); - XLogWalRcvSendReply(true, false); + XLogWalRcvSendReply(true, false, timestamps); } } if (rc & WL_POSTMASTER_DEATH) @@ -559,7 +590,7 @@ WalReceiverMain(void) } } - XLogWalRcvSendReply(requestReply, requestReply); + XLogWalRcvSendReply(requestReply, requestReply, 0); XLogWalRcvSendHSFeedback(false); } } @@ -911,7 +942,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) /* If the primary requested a reply, send one immediately */ if (replyRequested) - XLogWalRcvSendReply(true, false); + XLogWalRcvSendReply(true, false, 0); break; } default: @@ -1074,7 +1105,18 @@ XLogWalRcvFlush(bool dying) /* Also let the master know that we made some progress */ if (!dying) { - XLogWalRcvSendReply(false, false); + /* + * Check if we have just flushed a position for which we have a + * timestamp from the upstream server, for replication lag + * tracking. + */ + int timestamp = 0; + + if (CheckForFlushedTimestampedLsn(LogstreamResult.Flush, + &last_flush_timestamp)) + timestamp = REPLY_FLUSH_TIMESTAMP; + + XLogWalRcvSendReply(false, false, timestamp); XLogWalRcvSendHSFeedback(false); } } @@ -1092,21 +1134,27 @@ XLogWalRcvFlush(bool dying) * If 'requestReply' is true, requests the server to reply immediately upon * receiving this message. This is used for heartbearts, when approaching * wal_receiver_timeout. + * + * The bitmap 'timestamps' specifies which timestamps should be included, for + * replication lag tracking purposes. */ static void -XLogWalRcvSendReply(bool force, bool requestReply) +XLogWalRcvSendReply(bool force, bool requestReply, int timestamps) { static XLogRecPtr writePtr = 0; static XLogRecPtr flushPtr = 0; XLogRecPtr applyPtr; static TimestampTz sendTime = 0; TimestampTz now; + TimestampTz writeTimestamp = 0; + TimestampTz flushTimestamp = 0; + TimestampTz applyTimestamp = 0; /* * If the user doesn't want status to be reported to the master, be sure * to exit before doing anything at all. */ - if (!force && wal_receiver_status_interval <= 0) + if (!force && timestamps == 0 && wal_receiver_status_interval <= 0) return; /* Get current timestamp. */ @@ -1132,7 +1180,41 @@ XLogWalRcvSendReply(bool force, bool requestReply) /* Construct a new message */ writePtr = LogstreamResult.Write; flushPtr = LogstreamResult.Flush; - applyPtr = GetXLogReplayRecPtr(NULL); + applyTimestamp = GetXLogReplayTimestamp(&applyPtr); + flushTimestamp = last_flush_timestamp; + writeTimestamp = last_write_timestamp; + + /* Decide whether to send timestamps for replay lag estimation. */ + if (replication_lag_sample_interval != -1) + { + static TimestampTz lastApplyTimestampSendTime = 0; + + /* + * Only send an apply timestamp if we were explicitly asked to by the + * recovery process or if replay lag sampling is active but the + * recovery process seems to be stuck. + * + * If we haven't heard from the recovery process in a time exceeding + * wal_receiver_status_interval and yet it has not applied the highest + * LSN we've heard about, then we want to resend the last replayed + * timestamp we have; otherwise we zero it out and wait for the + * recovery process to wake us when it has set a new accurate replay + * timestamp. Note that we can read latestWalEnd without acquiring the + * mutex that protects it because it is only written to by this + * process (walreceiver). + */ + if (((timestamps & REPLY_APPLY_TIMESTAMP) != 0) || + (WalRcv->latestWalEnd > applyPtr && + TimestampDifferenceExceeds(lastApplyTimestampSendTime, now, + wal_receiver_status_interval * 1000))) + lastApplyTimestampSendTime = now; + else + applyTimestamp = 0; + if ((timestamps & REPLY_FLUSH_TIMESTAMP) == 0) + flushTimestamp = 0; + if ((timestamps & REPLY_WRITE_TIMESTAMP) == 0) + writeTimestamp = 0; + } resetStringInfo(&reply_message); pq_sendbyte(&reply_message, 'r'); @@ -1140,6 +1222,9 @@ XLogWalRcvSendReply(bool force, bool requestReply) pq_sendint64(&reply_message, flushPtr); pq_sendint64(&reply_message, applyPtr); pq_sendint64(&reply_message, GetCurrentIntegerTimestamp()); + pq_sendint64(&reply_message, TimestampTzToIntegerTimestamp(writeTimestamp)); + pq_sendint64(&reply_message, TimestampTzToIntegerTimestamp(flushTimestamp)); + pq_sendint64(&reply_message, TimestampTzToIntegerTimestamp(applyTimestamp)); pq_sendbyte(&reply_message, requestReply ? 1 : 0); /* Send it */ @@ -1244,7 +1329,6 @@ static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) { WalRcvData *walrcv = WalRcv; - TimestampTz lastMsgReceiptTime = GetCurrentTimestamp(); /* Update shared-memory status */ @@ -1256,6 +1340,16 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) walrcv->lastMsgReceiptTime = lastMsgReceiptTime; SpinLockRelease(&walrcv->mutex); + /* + * If replication lag sampling is active, remember the upstream server's + * timestamp at the latest WAL end that it has. We'll be able to retrieve + * this timestamp once we have written, flushed and finally applied this + * LSN, so that we can report it to the upstream server for lag tracking + * purposes. + */ + if (replication_lag_sample_interval != -1) + SetXLogTimestampAtLsn(sendTime, walEnd); + if (log_min_messages <= DEBUG2) { char *sendtime; @@ -1291,12 +1385,14 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) * This is called by the startup process whenever interesting xlog records * are applied, so that walreceiver can check if it needs to send an apply * notification back to the master which may be waiting in a COMMIT with - * synchronous_commit = remote_apply. + * synchronous_commit = remote_apply. Also used to send periodic messages + * which are used to compute pg_stat_replication.replay_lag. */ void -WalRcvForceReply(void) +WalRcvForceReply(bool apply_timestamp) { WalRcv->force_reply = true; + WalRcv->force_reply_apply_timestamp = apply_timestamp; if (WalRcv->latch) SetLatch(WalRcv->latch); } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 5cdb8a0..3fbca0c 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1545,6 +1545,25 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn) } /* + * Compute the difference between 'timestamp' and 'now' in microseconds. + * Return -1 if timestamp is zero. + */ +static uint64 +compute_lag(TimestampTz now, TimestampTz timestamp) +{ + if (timestamp == 0) + return -1; + else + { +#ifdef HAVE_INT64_TIMESTAMP + return now - timestamp; +#else + return (now - timestamp) * 1000000; +#endif + } +} + +/* * Regular reply from standby advising of WAL positions on standby server. */ static void @@ -1553,15 +1572,30 @@ ProcessStandbyReplyMessage(void) XLogRecPtr writePtr, flushPtr, applyPtr; + int64 writeLagUs, + flushLagUs, + applyLagUs; + TimestampTz writeTimestamp, + flushTimestamp, + applyTimestamp; bool replyRequested; + TimestampTz now = GetCurrentTimestamp(); /* the caller already consumed the msgtype byte */ writePtr = pq_getmsgint64(&reply_message); flushPtr = pq_getmsgint64(&reply_message); applyPtr = pq_getmsgint64(&reply_message); (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ + writeTimestamp = IntegerTimestampToTimestampTz(pq_getmsgint64(&reply_message)); + flushTimestamp = IntegerTimestampToTimestampTz(pq_getmsgint64(&reply_message)); + applyTimestamp = IntegerTimestampToTimestampTz(pq_getmsgint64(&reply_message)); replyRequested = pq_getmsgbyte(&reply_message); + /* Compute the replication lag. */ + writeLagUs = compute_lag(now, writeTimestamp); + flushLagUs = compute_lag(now, flushTimestamp); + applyLagUs = compute_lag(now, applyTimestamp); + elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s", (uint32) (writePtr >> 32), (uint32) writePtr, (uint32) (flushPtr >> 32), (uint32) flushPtr, @@ -1583,6 +1617,12 @@ ProcessStandbyReplyMessage(void) walsnd->write = writePtr; walsnd->flush = flushPtr; walsnd->apply = applyPtr; + if (writeLagUs >= 0) + walsnd->writeLagUs = writeLagUs; + if (flushLagUs >= 0) + walsnd->flushLagUs = flushLagUs; + if (applyLagUs >= 0) + walsnd->applyLagUs = applyLagUs; SpinLockRelease(&walsnd->mutex); } @@ -1979,6 +2019,9 @@ InitWalSenderSlot(void) walsnd->write = InvalidXLogRecPtr; walsnd->flush = InvalidXLogRecPtr; walsnd->apply = InvalidXLogRecPtr; + walsnd->writeLagUs = -1; + walsnd->flushLagUs = -1; + walsnd->applyLagUs = -1; walsnd->state = WALSNDSTATE_STARTUP; walsnd->latch = &MyProc->procLatch; SpinLockRelease(&walsnd->mutex); @@ -2753,6 +2796,21 @@ WalSndGetStateString(WalSndState state) return "UNKNOWN"; } +static Interval * +lag_as_interval(uint64 lag_us) +{ + Interval *result = palloc(sizeof(Interval)); + + result->month = 0; + result->day = 0; +#ifdef HAVE_INT64_TIMESTAMP + result->time = lag_us; +#else + result->time = lag_us / 1000000.0; +#endif + + return result; +} /* * Returns activity of walsenders, including pids and xlog locations sent to @@ -2761,7 +2819,7 @@ WalSndGetStateString(WalSndState state) Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_WAL_SENDERS_COLS 8 +#define PG_STAT_GET_WAL_SENDERS_COLS 11 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -2809,6 +2867,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) XLogRecPtr write; XLogRecPtr flush; XLogRecPtr apply; + int64 writeLagUs; + int64 flushLagUs; + int64 applyLagUs; int priority; WalSndState state; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; @@ -2823,6 +2884,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) write = walsnd->write; flush = walsnd->flush; apply = walsnd->apply; + writeLagUs = walsnd->writeLagUs; + flushLagUs = walsnd->flushLagUs; + applyLagUs = walsnd->applyLagUs; priority = walsnd->sync_standby_priority; SpinLockRelease(&walsnd->mutex); @@ -2857,6 +2921,21 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) nulls[5] = true; values[5] = LSNGetDatum(apply); + if (writeLagUs < 0) + nulls[6] = true; + else + values[6] = IntervalPGetDatum(lag_as_interval(writeLagUs)); + + if (flushLagUs < 0) + nulls[7] = true; + else + values[7] = IntervalPGetDatum(lag_as_interval(flushLagUs)); + + if (applyLagUs < 0) + nulls[8] = true; + else + values[8] = IntervalPGetDatum(lag_as_interval(applyLagUs)); + /* * Treat a standby such as a pg_basebackup background process * which always returns an invalid flush location, as an @@ -2864,7 +2943,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) */ priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority; - values[6] = Int32GetDatum(priority); + values[9] = Int32GetDatum(priority); /* * More easily understood version of standby state. This is purely @@ -2878,12 +2957,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) * states. We report just "quorum" for them. */ if (priority == 0) - values[7] = CStringGetTextDatum("async"); + values[10] = CStringGetTextDatum("async"); else if (list_member_int(sync_standbys, i)) - values[7] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ? + values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ? CStringGetTextDatum("sync") : CStringGetTextDatum("quorum"); else - values[7] = CStringGetTextDatum("potential"); + values[10] = CStringGetTextDatum("potential"); } tuplestore_putvalues(tupstore, tupdesc, values, nulls); diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c index 545e9e0..90c608d 100644 --- a/src/backend/utils/adt/timestamp.c +++ b/src/backend/utils/adt/timestamp.c @@ -1777,6 +1777,20 @@ GetSQLLocalTimestamp(int32 typmod) } /* + * TimestampTzToIntegerTimestamp -- convert a native timestamp to int64 format + * + * When compiled with --enable-integer-datetimes, this is implemented as a + * no-op macro. + */ +#ifndef HAVE_INT64_TIMESTAMP +int64 +TimestampTzToIntegerTimestamp(TimestampTz timestamp) +{ + return timestamp * 1000000; +} +#endif + +/* * TimestampDifference -- convert the difference between two timestamps * into integer seconds and microseconds * diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 946ba9e..1adb598 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -1800,6 +1800,17 @@ static struct config_int ConfigureNamesInt[] = }, { + {"replication_lag_sample_interval", PGC_SIGHUP, REPLICATION_STANDBY, + gettext_noop("Sets the minimum time between WAL timestamp samples used to estimate replication lag."), + NULL, + GUC_UNIT_MS + }, + &replication_lag_sample_interval, + 1 * 1000, -1, INT_MAX / 1000, + NULL, NULL, NULL + }, + + { {"wal_receiver_timeout", PGC_SIGHUP, REPLICATION_STANDBY, gettext_noop("Sets the maximum wait time to receive data from the primary."), NULL, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index ee8232f..f703e25 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -271,6 +271,8 @@ # in milliseconds; 0 disables #wal_retrieve_retry_interval = 5s # time to wait before retrying to # retrieve WAL after a failed attempt +#replication_lag_sample_interval = 1s # min time between timestamps recorded + # to estimate lag; -1 disables lag sampling #------------------------------------------------------------------------------ diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c index cb5f989..6feb95d 100644 --- a/src/bin/pg_basebackup/pg_recvlogical.c +++ b/src/bin/pg_basebackup/pg_recvlogical.c @@ -111,7 +111,7 @@ sendFeedback(PGconn *conn, int64 now, bool force, bool replyRequested) static XLogRecPtr last_written_lsn = InvalidXLogRecPtr; static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr; - char replybuf[1 + 8 + 8 + 8 + 8 + 1]; + char replybuf[1 + 8 + 8 + 8 + 8 + 8 + 8 + 8 + 1]; int len = 0; /* @@ -142,6 +142,12 @@ sendFeedback(PGconn *conn, int64 now, bool force, bool replyRequested) len += 8; fe_sendint64(now, &replybuf[len]); /* sendTime */ len += 8; + fe_sendint64(0, &replybuf[len]); /* writeTimestamp */ + len += 8; + fe_sendint64(0, &replybuf[len]); /* flushTimestamp */ + len += 8; + fe_sendint64(0, &replybuf[len]); /* applyTimestamp */ + len += 8; replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */ len += 1; diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index 568ff17..960e02f 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -321,7 +321,7 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content) static bool sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested) { - char replybuf[1 + 8 + 8 + 8 + 8 + 1]; + char replybuf[1 + 8 + 8 + 8 + 8 + 8 + 8 + 8 + 1]; int len = 0; replybuf[len] = 'r'; @@ -337,6 +337,12 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested) len += 8; fe_sendint64(now, &replybuf[len]); /* sendTime */ len += 8; + fe_sendint64(0, &replybuf[len]); /* writeTimestamp */ + len += 8; + fe_sendint64(0, &replybuf[len]); /* flushTimestamp */ + len += 8; + fe_sendint64(0, &replybuf[len]); /* applyTimestamp */ + len += 8; replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */ len += 1; diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 7d21408..ee11cf5 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -246,6 +246,12 @@ extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI); extern XLogRecPtr GetXLogInsertRecPtr(void); extern XLogRecPtr GetXLogWriteRecPtr(void); +extern void SetXLogTimestampAtLsn(TimestampTz timestamp, XLogRecPtr lsn); +extern bool CheckForWrittenTimestampedLsn(XLogRecPtr lsn, + TimestampTz *timestamp); +extern bool CheckForFlushedTimestampedLsn(XLogRecPtr lsn, + TimestampTz *timestamp); +extern TimestampTz GetXLogReplayTimestamp(XLogRecPtr *lsn); extern bool RecoveryIsPaused(void); extern void SetRecoveryPause(bool recoveryPause); extern TimestampTz GetLatestXTime(void); diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index a6cc2eb..80267b4 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -2768,7 +2768,7 @@ DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 0 f f f DESCR("statistics: information about currently active backends"); DATA(insert OID = 3318 ( pg_stat_get_progress_info PGNSP PGUID 12 1 100 0 0 f f f f t t s r 1 0 2249 "25" "{25,23,26,26,20,20,20,20,20,20,20,20,20,20}" "{i,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{cmdtype,pid,datid,relid,param1,param2,param3,param4,param5,param6,param7,param8,param9,param10}" _null_ _null_ pg_stat_get_progress_info _null_ _null_ _null_ )); DESCR("statistics: information about progress of backends running maintenance command"); -DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,23,25}" "{o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ )); +DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,1186,1186,23,25}" "{o,o,o,o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,write_lag,flush_lag,replay_lag,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ )); DESCR("statistics: information about currently active replication"); DATA(insert OID = 3317 ( pg_stat_get_wal_receiver PGNSP PGUID 12 1 0 0 0 f f f f f f s r 0 0 2249 "" "{23,25,3220,23,3220,23,1184,1184,3220,1184,25,25}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,conninfo}" _null_ _null_ pg_stat_get_wal_receiver _null_ _null_ _null_ )); DESCR("statistics: information about WAL receiver"); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 28dc1fc..41b248f 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -23,6 +23,7 @@ extern int wal_receiver_status_interval; extern int wal_receiver_timeout; extern bool hot_standby_feedback; +extern int replication_lag_sample_interval; /* * MAXCONNINFO: maximum size of a connection string. @@ -119,6 +120,9 @@ typedef struct */ bool force_reply; + /* include the latest replayed timestamp when replying? */ + bool force_reply_apply_timestamp; + /* set true once conninfo is ready to display (obfuscated pwds etc) */ bool ready_to_display; @@ -208,6 +212,6 @@ extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); extern int GetReplicationApplyDelay(void); extern int GetReplicationTransferLatency(void); -extern void WalRcvForceReply(void); +extern void WalRcvForceReply(bool sendApplyTimestamp); #endif /* _WALRECEIVER_H */ diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 7794aa5..fb3a03f 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -46,6 +46,9 @@ typedef struct WalSnd XLogRecPtr write; XLogRecPtr flush; XLogRecPtr apply; + int64 writeLagUs; + int64 flushLagUs; + int64 applyLagUs; /* Protects shared variables shown above. */ slock_t mutex; diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h index 93b90fe..20517c9 100644 --- a/src/include/utils/timestamp.h +++ b/src/include/utils/timestamp.h @@ -233,9 +233,11 @@ extern bool TimestampDifferenceExceeds(TimestampTz start_time, #ifndef HAVE_INT64_TIMESTAMP extern int64 GetCurrentIntegerTimestamp(void); extern TimestampTz IntegerTimestampToTimestampTz(int64 timestamp); +extern int64 TimestampTzToIntegerTimestamp(TimestampTz timestamp); #else #define GetCurrentIntegerTimestamp() GetCurrentTimestamp() #define IntegerTimestampToTimestampTz(timestamp) (timestamp) +#define TimestampTzToIntegerTimestamp(timestamp) (timestamp) #endif extern TimestampTz time_t_to_timestamptz(pg_time_t tm); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index e9cfadb..14147c5 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1809,10 +1809,13 @@ pg_stat_replication| SELECT s.pid, w.write_location, w.flush_location, w.replay_location, + w.write_lag, + w.flush_lag, + w.replay_lag, w.sync_priority, w.sync_state FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn) - JOIN pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, sync_priority, sync_state) ON ((s.pid = w.pid))) + JOIN pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, write_lag, flush_lag, replay_lag, sync_priority, sync_state) ON ((s.pid = w.pid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); pg_stat_ssl| SELECT s.pid, s.ssl,