From 1ae59996301aefd91efd10fcbee50b2c3d7c140e Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Mon, 9 Dec 2019 17:22:07 +1300 Subject: [PATCH v6 3/8] Add GetWalRcvWriteRecPtr() (new definition). A later patch will read received WAL to prefetch referenced blocks, without waiting for the data to be flushed to disk. To do that, it needs to be able to see the write pointer advancing in shared memory. The function formerly bearing name was recently renamed to GetWalRcvFlushRecPtr(), which better described what it does. Reviewed-by: Alvaro Herrera Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com --- src/backend/replication/walreceiver.c | 5 +++++ src/backend/replication/walreceiverfuncs.c | 12 ++++++++++++ src/include/replication/walreceiver.h | 10 ++++++++++ 3 files changed, 27 insertions(+) diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 1363c3facc..d69fb90132 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -261,6 +261,8 @@ WalReceiverMain(void) SpinLockRelease(&walrcv->mutex); + pg_atomic_init_u64(&WalRcv->writtenUpto, 0); + /* Arrange to clean up at walreceiver exit */ on_shmem_exit(WalRcvDie, 0); @@ -984,6 +986,9 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) LogstreamResult.Write = recptr; } + + /* Update shared-memory status */ + pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write); } /* diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 32260c2236..4afad83539 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -328,6 +328,18 @@ GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) return recptr; } +/* + * Returns the last+1 byte position that walreceiver has written. + * This returns a recently written value without taking a lock. + */ +XLogRecPtr +GetWalRcvWriteRecPtr(void) +{ + WalRcvData *walrcv = WalRcv; + + return pg_atomic_read_u64(&walrcv->writtenUpto); +} + /* * Returns the replication apply delay in ms or -1 * if the apply delay info is not available diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 6298ca07be..f1aa6e9977 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -16,6 +16,7 @@ #include "access/xlogdefs.h" #include "getaddrinfo.h" /* for NI_MAXHOST */ #include "pgtime.h" +#include "port/atomics.h" #include "replication/logicalproto.h" #include "replication/walsender.h" #include "storage/latch.h" @@ -141,6 +142,14 @@ typedef struct slock_t mutex; /* locks shared variables shown above */ + /* + * Like flushedUpto, but advanced after writing and before flushing, + * without the need to acquire the spin lock. Data can be read by another + * process up to this point, but shouldn't be used for data integrity + * purposes. + */ + pg_atomic_uint64 writtenUpto; + /* * force walreceiver reply? This doesn't need to be locked; memory * barriers for ordering are sufficient. But we do need atomic fetch and @@ -323,6 +332,7 @@ extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, const char *slotname, bool create_temp_slot); extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); +extern XLogRecPtr GetWalRcvWriteRecPtr(void); extern int GetReplicationApplyDelay(void); extern int GetReplicationTransferLatency(void); extern void WalRcvForceReply(void); -- 2.20.1