From 8fa43c9c577b19a2d4b7bc0efbe180912bac37b1 Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Mon, 10 Aug 2020 17:06:21 +1200 Subject: [PATCH v13 2/6] Improve information about received WAL. In commit d140f2f3, we cleaned up the distinction between flushed and written LSN positions. Go further, and expose the written location in a way that allows for the associated timeline ID to be read consistently. Without that, it might be difficult to know the path of the file that has been written, without data races. Discussion: https://postgr.es/m/CA+hUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq=AovOddfHpA@mail.gmail.com --- src/backend/replication/walreceiver.c | 10 ++++-- src/backend/replication/walreceiverfuncs.c | 41 +++++++++++++++++----- src/include/replication/walreceiver.h | 30 +++++++++------- 3 files changed, 56 insertions(+), 25 deletions(-) diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index babee386c4..ba42f59d6c 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -870,6 +870,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) { int startoff; int byteswritten; + WalRcvData *walrcv = WalRcv; while (nbytes > 0) { @@ -961,7 +962,10 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) } /* Update shared-memory status */ - pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write); + SpinLockAcquire(&walrcv->mutex); + pg_atomic_write_u64(&walrcv->writtenUpto, LogstreamResult.Write); + walrcv->writtenTLI = ThisTimeLineID; + SpinLockRelease(&walrcv->mutex); } /* @@ -987,7 +991,7 @@ XLogWalRcvFlush(bool dying) { walrcv->latestChunkStart = walrcv->flushedUpto; walrcv->flushedUpto = LogstreamResult.Flush; - walrcv->receivedTLI = ThisTimeLineID; + walrcv->flushedTLI = ThisTimeLineID; } SpinLockRelease(&walrcv->mutex); @@ -1327,7 +1331,7 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS) receive_start_tli = WalRcv->receiveStartTLI; written_lsn = 
pg_atomic_read_u64(&WalRcv->writtenUpto); flushed_lsn = WalRcv->flushedUpto; - received_tli = WalRcv->receivedTLI; + received_tli = WalRcv->flushedTLI; last_send_time = WalRcv->lastMsgSendTime; last_receipt_time = WalRcv->lastMsgReceiptTime; latest_end_lsn = WalRcv->latestWalEnd; diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index c3e317df9f..3bd1fadbd3 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -284,10 +284,12 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, * If this is the first startup of walreceiver (on this timeline), * initialize flushedUpto and latestChunkStart to the starting point. */ - if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli) + if (walrcv->receiveStart == 0 || walrcv->flushedTLI != tli) { + pg_atomic_write_u64(&walrcv->writtenUpto, recptr); + walrcv->writtenTLI = tli; walrcv->flushedUpto = recptr; - walrcv->receivedTLI = tli; + walrcv->flushedTLI = tli; walrcv->latestChunkStart = recptr; } walrcv->receiveStart = recptr; @@ -309,10 +311,10 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, * Optionally, returns the previous chunk start, that is the first byte * written in the most recent walreceiver flush cycle. Callers not * interested in that value may pass NULL for latestChunkStart. Same for - * receiveTLI. + * flushedTLI. 
*/ XLogRecPtr -GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) +GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *flushedTLI) { WalRcvData *walrcv = WalRcv; XLogRecPtr recptr; @@ -321,8 +323,8 @@ GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) recptr = walrcv->flushedUpto; if (latestChunkStart) *latestChunkStart = walrcv->latestChunkStart; - if (receiveTLI) - *receiveTLI = walrcv->receivedTLI; + if (flushedTLI) + *flushedTLI = walrcv->flushedTLI; SpinLockRelease(&walrcv->mutex); return recptr; @@ -330,14 +332,35 @@ GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) /* * Returns the last+1 byte position that walreceiver has written. - * This returns a recently written value without taking a lock. + * + * The other arguments are similar to GetWalRcvFlushRecPtr()'s. */ XLogRecPtr -GetWalRcvWriteRecPtr(void) +GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *writtenTLI) { WalRcvData *walrcv = WalRcv; + XLogRecPtr recptr; + + SpinLockAcquire(&walrcv->mutex); + recptr = pg_atomic_read_u64(&walrcv->writtenUpto); + if (latestChunkStart) + *latestChunkStart = walrcv->latestChunkStart; + if (writtenTLI) + *writtenTLI = walrcv->writtenTLI; + SpinLockRelease(&walrcv->mutex); - return pg_atomic_read_u64(&walrcv->writtenUpto); + return recptr; +} + +/* + * For callers that don't need a consistent LSN, TLI pair, and that don't mind + * a potentially slightly out of date value in exchange for speed, this + * version provides an unlocked view of the latest written location. 
+ */ +XLogRecPtr +GetWalRcvWriteRecPtrUnlocked(void) +{ + return pg_atomic_read_u64(&WalRcv->writtenUpto); } /* diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 1b05b39df4..84f84567cd 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -74,14 +74,25 @@ typedef struct TimeLineID receiveStartTLI; /* - * flushedUpto-1 is the last byte position that has already been received, - * and receivedTLI is the timeline it came from. At the first startup of + * flushedUpto-1 is the last byte position that has already been flushed, + * and flushedTLI is the timeline it came from. At the first startup of * walreceiver, these are set to receiveStart and receiveStartTLI. After * that, walreceiver updates these whenever it flushes the received WAL to * disk. */ XLogRecPtr flushedUpto; - TimeLineID receivedTLI; + TimeLineID flushedTLI; + + /* + * writtenUpto-1 is like flushedUpto-1, except that it's updated + * without waiting for the flush, after the data has been written to disk + * and available for reading. It is an atomic type so that we can read it + * without locks. We still acquire the spinlock in cases where it is + * written or read along with the TLI, so that they can be accessed + * together consistently. + */ + pg_atomic_uint64 writtenUpto; + TimeLineID writtenTLI; /* * latestChunkStart is the starting byte position of the current "batch" @@ -142,14 +153,6 @@ typedef struct slock_t mutex; /* locks shared variables shown above */ - /* - * Like flushedUpto, but advanced after writing and before flushing, - * without the need to acquire the spin lock. Data can be read by another - * process up to this point, but shouldn't be used for data integrity - * purposes. - */ - pg_atomic_uint64 writtenUpto; - /* * force walreceiver reply? This doesn't need to be locked; memory * barriers for ordering are sufficient. 
But we do need atomic fetch and @@ -457,8 +460,9 @@ extern bool WalRcvRunning(void); extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, const char *slotname, bool create_temp_slot); -extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); -extern XLogRecPtr GetWalRcvWriteRecPtr(void); +extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *flushedTLI); +extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *writtenTLI); +extern XLogRecPtr GetWalRcvWriteRecPtrUnlocked(void); extern int GetReplicationApplyDelay(void); extern int GetReplicationTransferLatency(void); extern void WalRcvForceReply(void); -- 2.20.1