From bc9219e7dfb47e77c08ff6f8602320d8844dff69 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Mon, 4 Mar 2024 14:54:18 +0000 Subject: [PATCH v13 2/3] Make XLogCtl->LogwrtResult accessible with atomics Currently, access to LogwrtResult is protected by a spinlock. This becomes severely contended in some scenarios, such as with a largish replication flock: walsenders all calling GetFlushRecPtr repeatedly cause the processor heat up to the point where eggs can be fried on top. This can be reduced to a non-problem by replacing XLogCtl->LogwrtResult with a struct containing a pair of atomically accessed variables. Do so. In a few places, we can adjust the exact location where the locals are updated to account for the fact that we no longer need the spinlock. --- src/backend/access/transam/xlog.c | 106 ++++++++++++++---------------- src/tools/pgindent/typedefs.list | 1 + 2 files changed, 52 insertions(+), 55 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 20a5f86209..ffbf93690e 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -290,16 +290,13 @@ static bool doPageWrites; * * LogwrtRqst indicates a byte position that we need to write and/or fsync * the log up to (all records before that point must be written or fsynced). - * LogwrtResult indicates the byte positions we have already written/fsynced. - * These structs are identical but are declared separately to indicate their - * slightly different functions. + * LogWrtResult indicates the byte positions we have already written/fsynced. + * These structs are similar but are declared separately to indicate their + * slightly different functions; in addition, the latter is read and written + * using atomic operations. * - * To read XLogCtl->LogwrtResult, you must hold either info_lck or - * WALWriteLock. To update it, you need to hold both locks. The point of - * this arrangement is that the value can be examined by code that already - * holds WALWriteLock without needing to grab info_lck as well. In addition - * to the shared variable, each backend has a private copy of LogwrtResult, - * which is updated when convenient. + * In addition to the shared variable, each backend has a private copy of + * LogwrtResult, which is updated when convenient. * * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst * (protected by info_lck), but we don't need to cache any copies of it. @@ -322,6 +319,12 @@ static bool doPageWrites; *---------- */ +typedef struct XLogwrtAtomic +{ + pg_atomic_uint64 Write; /* last byte + 1 written out */ + pg_atomic_uint64 Flush; /* last byte + 1 flushed */ +} XLogwrtAtomic; + typedef struct XLogwrtRqst { XLogRecPtr Write; /* last byte + 1 to write out */ @@ -334,6 +337,13 @@ typedef struct XLogwrtResult XLogRecPtr Flush; /* last byte + 1 flushed */ } XLogwrtResult; +/* + * Update local copy of shared XLogCtl->LogwrtResult. + */ +#define XLogUpdateLocalLogWrtResult() \ + LogwrtResult.Write = pg_atomic_read_membarrier_u64(&XLogCtl->LogwrtResult.Write); \ + LogwrtResult.Flush = pg_atomic_read_membarrier_u64(&XLogCtl->LogwrtResult.Flush); + /* * Inserting to WAL is protected by a small fixed number of WAL insertion * locks. To insert to the WAL, you must hold one of the locks - it doesn't @@ -457,6 +467,8 @@ typedef struct XLogCtlData { XLogCtlInsert Insert; + XLogwrtAtomic LogwrtResult; + /* Protected by info_lck: */ XLogwrtRqst LogwrtRqst; XLogRecPtr RedoRecPtr; /* a recent copy of Insert->RedoRecPtr */ @@ -473,12 +485,6 @@ typedef struct XLogCtlData pg_time_t lastSegSwitchTime; XLogRecPtr lastSegSwitchLSN; - /* - * Protected by info_lck and WALWriteLock (you must hold either lock to - * read it, but both to update) - */ - XLogwrtResult LogwrtResult; - /* * Latest initialized page in the cache (last byte position + 1). * @@ -608,7 +614,7 @@ static ControlFileData *ControlFile = NULL; static int UsableBytesInSegment; /* - * Private, possibly out-of-date copy of shared LogwrtResult. + * Private, possibly out-of-date copy of shared XLogCtl->LogwrtResult. * See discussion above. */ static XLogwrtResult LogwrtResult = {0, 0}; @@ -958,9 +964,8 @@ XLogInsertRecord(XLogRecData *rdata, /* advance global request to include new block(s) */ if (XLogCtl->LogwrtRqst.Write < EndPos) XLogCtl->LogwrtRqst.Write = EndPos; - /* update local result copy while I have the chance */ - LogwrtResult = XLogCtl->LogwrtResult; SpinLockRelease(&XLogCtl->info_lck); + XLogUpdateLocalLogWrtResult(); } /* @@ -1977,12 +1982,12 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic) if (opportunistic) break; - /* Before waiting, get info_lck and update LogwrtResult */ + /* Advance shared memory write request position */ SpinLockAcquire(&XLogCtl->info_lck); if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr) XLogCtl->LogwrtRqst.Write = OldPageRqstPtr; - LogwrtResult = XLogCtl->LogwrtResult; SpinLockRelease(&XLogCtl->info_lck); + XLogUpdateLocalLogWrtResult(); /* * Now that we have an up-to-date LogwrtResult value, see if we @@ -2002,7 +2007,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic) LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); - LogwrtResult = XLogCtl->LogwrtResult; + XLogUpdateLocalLogWrtResult(); if (LogwrtResult.Write >= OldPageRqstPtr) { /* OK, someone wrote it already */ @@ -2286,7 +2291,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) /* * Update local LogwrtResult (caller probably did this already, but...) */ - LogwrtResult = XLogCtl->LogwrtResult; + XLogUpdateLocalLogWrtResult(); /* * Since successive pages in the xlog cache are consecutively allocated, @@ -2501,6 +2506,10 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) Assert(npages == 0); + /* Publish current write result position */ + pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Write, + LogwrtResult.Write); + /* * If asked to flush, do so */ @@ -2537,22 +2546,9 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) LogwrtResult.Flush = LogwrtResult.Write; } - /* - * Update shared-memory status - * - * We make sure that the shared 'request' values do not fall behind the - * 'result' values. This is not absolutely essential, but it saves some - * code in a couple of places. - */ - { - SpinLockAcquire(&XLogCtl->info_lck); - XLogCtl->LogwrtResult = LogwrtResult; - if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write) - XLogCtl->LogwrtRqst.Write = LogwrtResult.Write; - if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush) - XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush; - SpinLockRelease(&XLogCtl->info_lck); - } + /* Publish current flush result position */ + pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Flush, + LogwrtResult.Flush); } /* @@ -2569,12 +2565,12 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN) XLogRecPtr prevAsyncXactLSN; SpinLockAcquire(&XLogCtl->info_lck); - LogwrtResult = XLogCtl->LogwrtResult; sleeping = XLogCtl->WalWriterSleeping; prevAsyncXactLSN = XLogCtl->asyncXactLSN; if (XLogCtl->asyncXactLSN < asyncXactLSN) XLogCtl->asyncXactLSN = asyncXactLSN; SpinLockRelease(&XLogCtl->info_lck); + XLogUpdateLocalLogWrtResult(); /* * If somebody else already called this function with a more aggressive @@ -2781,8 +2777,8 @@ XLogFlush(XLogRecPtr record) SpinLockAcquire(&XLogCtl->info_lck); if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write) WriteRqstPtr = XLogCtl->LogwrtRqst.Write; - LogwrtResult = XLogCtl->LogwrtResult; SpinLockRelease(&XLogCtl->info_lck); + XLogUpdateLocalLogWrtResult(); /* done already? */ if (record <= LogwrtResult.Flush) @@ -2812,7 +2808,7 @@ XLogFlush(XLogRecPtr record) } /* Got the lock; recheck whether request is satisfied */ - LogwrtResult = XLogCtl->LogwrtResult; + XLogUpdateLocalLogWrtResult(); if (record <= LogwrtResult.Flush) { LWLockRelease(WALWriteLock); @@ -2936,9 +2932,9 @@ XLogBackgroundFlush(void) /* read LogwrtResult and update local state */ SpinLockAcquire(&XLogCtl->info_lck); - LogwrtResult = XLogCtl->LogwrtResult; WriteRqst = XLogCtl->LogwrtRqst; SpinLockRelease(&XLogCtl->info_lck); + XLogUpdateLocalLogWrtResult(); /* back off to last completed page boundary */ WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ; @@ -3024,7 +3020,7 @@ XLogBackgroundFlush(void) /* now wait for any in-progress insertions to finish and get write lock */ WaitXLogInsertionsToFinish(WriteRqst.Write); LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); - LogwrtResult = XLogCtl->LogwrtResult; + XLogUpdateLocalLogWrtResult(); if (WriteRqst.Write > LogwrtResult.Write || WriteRqst.Flush > LogwrtResult.Flush) { @@ -3112,9 +3108,7 @@ XLogNeedsFlush(XLogRecPtr record) return false; /* read LogwrtResult and update local state */ - SpinLockAcquire(&XLogCtl->info_lck); - LogwrtResult = XLogCtl->LogwrtResult; - SpinLockRelease(&XLogCtl->info_lck); + XLogUpdateLocalLogWrtResult(); /* check again */ if (record <= LogwrtResult.Flush) @@ -4907,6 +4901,9 @@ XLOGShmemInit(void) XLogCtl->InstallXLogFileSegmentActive = false; XLogCtl->WalWriterSleeping = false; + pg_atomic_init_u64(&XLogCtl->LogwrtResult.Write, InvalidXLogRecPtr); + pg_atomic_init_u64(&XLogCtl->LogwrtResult.Flush, InvalidXLogRecPtr); + SpinLockInit(&XLogCtl->Insert.insertpos_lck); SpinLockInit(&XLogCtl->info_lck); pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr); @@ -5925,10 +5922,13 @@ StartupXLOG(void) XLogCtl->InitializedUpTo = EndOfLog; } + /* + * Update local and shared status. This is OK to do without any locks + * because no other process can be reading or writing WAL yet. + */ LogwrtResult.Write = LogwrtResult.Flush = EndOfLog; - - XLogCtl->LogwrtResult = LogwrtResult; - + pg_atomic_write_u64(&XLogCtl->LogwrtResult.Write, EndOfLog); + pg_atomic_write_u64(&XLogCtl->LogwrtResult.Flush, EndOfLog); XLogCtl->LogwrtRqst.Write = EndOfLog; XLogCtl->LogwrtRqst.Flush = EndOfLog; @@ -6367,9 +6367,7 @@ GetFlushRecPtr(TimeLineID *insertTLI) { Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE); - SpinLockAcquire(&XLogCtl->info_lck); - LogwrtResult = XLogCtl->LogwrtResult; - SpinLockRelease(&XLogCtl->info_lck); + XLogUpdateLocalLogWrtResult(); /* * If we're writing and flushing WAL, the time line can't be changing, so @@ -9281,9 +9279,7 @@ GetXLogInsertRecPtr(void) XLogRecPtr GetXLogWriteRecPtr(void) { - SpinLockAcquire(&XLogCtl->info_lck); - LogwrtResult = XLogCtl->LogwrtResult; - SpinLockRelease(&XLogCtl->info_lck); + XLogUpdateLocalLogWrtResult(); return LogwrtResult.Write; } diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 95ae7845d8..63f60df7bf 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -3129,6 +3129,7 @@ XLogRedoAction XLogSegNo XLogSource XLogStats +XLogwrtAtomic XLogwrtResult XLogwrtRqst XPV -- 2.34.1