From 4823d95c86ee696b2df57526ffba93aea83054bf Mon Sep 17 00:00:00 2001 From: Dilip kumar Date: Sat, 9 Sep 2023 12:56:10 +0530 Subject: [PATCH v1 2/3] bank wise slru locks The previous patch has divided SLRU buffer pool into associative banks. And this patch is further optimizing it by introducing bank wise slru locks instead of a common centralized lock this will reduce the contention on the slru control lock. Dilip Kumar based on some design suggestions from Robert Haas --- src/backend/access/transam/clog.c | 108 +++++++++----- src/backend/access/transam/commit_ts.c | 43 +++--- src/backend/access/transam/multixact.c | 179 ++++++++++++++++------- src/backend/access/transam/slru.c | 134 +++++++++++++---- src/backend/access/transam/subtrans.c | 27 ++-- src/backend/commands/async.c | 30 ++-- src/backend/storage/lmgr/lwlock.c | 14 ++ src/backend/storage/lmgr/lwlocknames.txt | 14 +- src/backend/storage/lmgr/predicate.c | 32 ++-- src/include/access/slru.h | 33 ++++- src/include/storage/lwlock.h | 8 + src/test/modules/test_slru/test_slru.c | 28 ++-- 12 files changed, 452 insertions(+), 198 deletions(-) diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c index 29d58f1eb3..938806532d 100644 --- a/src/backend/access/transam/clog.c +++ b/src/backend/access/transam/clog.c @@ -276,14 +276,19 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids, XLogRecPtr lsn, int pageno, bool all_xact_same_page) { + LWLock *lock; + /* Can't use group update when PGPROC overflows. */ StaticAssertDecl(THRESHOLD_SUBTRANS_CLOG_OPT <= PGPROC_MAX_CACHED_SUBXIDS, "group clog threshold less than PGPROC cached subxids"); + /* Get SLRU lock w.r.t. the SLRU page we are going to access. */ + lock = SimpleLruPageGetSLRULock(XactCtl, pageno); + /* - * When there is contention on XactSLRULock, we try to group multiple + * When there is contention on SLRU lock, we try to group multiple * updates; a single leader process will perform transaction status - * updates for multiple backends so that the number of times XactSLRULock + * updates for multiple backends so that the number of times the SLRU lock * needs to be acquired is reduced. * * For this optimization to be safe, the XID and subxids in MyProc must be @@ -302,17 +307,17 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids, nsubxids * sizeof(TransactionId)) == 0)) { /* - * If we can immediately acquire XactSLRULock, we update the status of + * If we can immediately acquire SLRULock, we update the status of * our own XID and release the lock. If not, try use group XID * update. If that doesn't work out, fall back to waiting for the * lock to perform an update for this transaction only. */ - if (LWLockConditionalAcquire(XactSLRULock, LW_EXCLUSIVE)) + if (LWLockConditionalAcquire(lock, LW_EXCLUSIVE)) { /* Got the lock without waiting! Do the update. */ TransactionIdSetPageStatusInternal(xid, nsubxids, subxids, status, lsn, pageno); - LWLockRelease(XactSLRULock); + LWLockRelease(lock); return; } else if (TransactionGroupUpdateXidStatus(xid, status, lsn, pageno)) @@ -325,10 +330,10 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids, } /* Group update not applicable, or couldn't accept this page number. */ - LWLockAcquire(XactSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); TransactionIdSetPageStatusInternal(xid, nsubxids, subxids, status, lsn, pageno); - LWLockRelease(XactSLRULock); + LWLockRelease(lock); } /* @@ -347,7 +352,8 @@ TransactionIdSetPageStatusInternal(TransactionId xid, int nsubxids, Assert(status == TRANSACTION_STATUS_COMMITTED || status == TRANSACTION_STATUS_ABORTED || (status == TRANSACTION_STATUS_SUB_COMMITTED && !TransactionIdIsValid(xid))); - Assert(LWLockHeldByMeInMode(XactSLRULock, LW_EXCLUSIVE)); + Assert(LWLockHeldByMeInMode(SimpleLruPageGetSLRULock(XactCtl, pageno), + LW_EXCLUSIVE)); /* * If we're doing an async commit (ie, lsn is valid), then we must wait @@ -398,14 +404,13 @@ TransactionIdSetPageStatusInternal(TransactionId xid, int nsubxids, } /* - * When we cannot immediately acquire XactSLRULock in exclusive mode at + * When we cannot immediately acquire SLRU bank lock in exclusive mode at * commit time, add ourselves to a list of processes that need their XIDs * status update. The first process to add itself to the list will acquire - * XactSLRULock in exclusive mode and set transaction status as required - * on behalf of all group members. This avoids a great deal of contention - * around XactSLRULock when many processes are trying to commit at once, - * since the lock need not be repeatedly handed off from one committing - * process to the next. + * SLRU lock in exclusive mode and set transaction status as required on behalf + * of all group members. This avoids a great deal of contention around + * SLRULock when many processes are trying to commit at once, since the lock + * need not be repeatedly handed off from one committing process to the next. * * Returns true when transaction status has been updated in clog; returns * false if we decided against applying the optimization because the page @@ -419,6 +424,8 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status, PGPROC *proc = MyProc; uint32 nextidx; uint32 wakeidx; + int prevpageno; + LWLock *prevlock = NULL; /* We should definitely have an XID whose status needs to be updated. */ Assert(TransactionIdIsValid(xid)); @@ -499,11 +506,8 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status, return true; } - /* We are the leader. Acquire the lock on behalf of everyone. */ - LWLockAcquire(XactSLRULock, LW_EXCLUSIVE); - /* - * Now that we've got the lock, clear the list of processes waiting for + * We are leader so clear the list of processes waiting for * group XID status update, saving a pointer to the head of the list. * Trying to pop elements one at a time could lead to an ABA problem. */ @@ -513,10 +517,38 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status, /* Remember head of list so we can perform wakeups after dropping lock. */ wakeidx = nextidx; + /* Acquire the SLRU bank lock w.r.t. the first page in the group. */ + prevpageno = ProcGlobal->allProcs[nextidx].clogGroupMemberPage; + prevlock = SimpleLruPageGetSLRULock(XactCtl, prevpageno); + LWLockAcquire(prevlock, LW_EXCLUSIVE); + /* Walk the list and update the status of all XIDs. */ while (nextidx != INVALID_PGPROCNO) { PGPROC *nextproc = &ProcGlobal->allProcs[nextidx]; + int thispageno = nextproc->clogGroupMemberPage; + + /* + * Although we are trying our best to keep same page in a group, there + * are cases where we might get different pages as well for detail + * refer comment in above while loop where we are adding this process + * for group update. So if the current page we are going to access is + * not in the same slru bank in which we updated the last page then we + * need to release the lock on the previous bank and acquire lock on + * the bank w.r.t. the page we are going to update now. + */ + if (thispageno != prevpageno) + { + LWLock *lock = SimpleLruPageGetSLRULock(XactCtl, thispageno); + + if (prevlock != lock) + { + LWLockRelease(prevlock); + LWLockAcquire(lock, LW_EXCLUSIVE); + } + prevlock = lock; + prevpageno = thispageno; + } /* * Transactions with more than THRESHOLD_SUBTRANS_CLOG_OPT sub-XIDs @@ -536,7 +568,8 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status, } /* We're done with the lock now. */ - LWLockRelease(XactSLRULock); + if (prevlock != NULL) + LWLockRelease(prevlock); /* * Now that we've released the lock, go back and wake everybody up. We @@ -565,10 +598,11 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status, /* * Sets the commit status of a single transaction. * - * Must be called with XactSLRULock held + * Must be called with slot specific SLRU bank's lock held */ static void -TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, int slotno) +TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, + int slotno) { int byteno = TransactionIdToByte(xid); int bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT; @@ -657,7 +691,7 @@ TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn) lsnindex = GetLSNIndex(slotno, xid); *lsn = XactCtl->shared->group_lsn[lsnindex]; - LWLockRelease(XactSLRULock); + LWLockRelease(SimpleLruPageGetSLRULock(XactCtl, pageno)); return status; } @@ -676,7 +710,7 @@ CLOGShmemInit(void) { XactCtl->PagePrecedes = CLOGPagePrecedes; SimpleLruInit(XactCtl, "Xact", NUM_CLOG_BUFFERS, CLOG_LSNS_PER_PAGE, - XactSLRULock, "pg_xact", LWTRANCHE_XACT_BUFFER, + "pg_xact", LWTRANCHE_XACT_BUFFER, LWTRANCHE_XACT_SLRU, SYNC_HANDLER_CLOG); SlruPagePrecedesUnitTests(XactCtl, CLOG_XACTS_PER_PAGE); } @@ -691,8 +725,9 @@ void BootStrapCLOG(void) { int slotno; + LWLock *lock = SimpleLruPageGetSLRULock(XactCtl, 0); - LWLockAcquire(XactSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Create and zero the first page of the commit log */ slotno = ZeroCLOGPage(0, false); @@ -701,7 +736,7 @@ BootStrapCLOG(void) SimpleLruWritePage(XactCtl, slotno); Assert(!XactCtl->shared->page_dirty[slotno]); - LWLockRelease(XactSLRULock); + LWLockRelease(lock); } /* @@ -736,14 +771,10 @@ StartupCLOG(void) TransactionId xid = XidFromFullTransactionId(ShmemVariableCache->nextXid); int pageno = TransactionIdToPage(xid); - LWLockAcquire(XactSLRULock, LW_EXCLUSIVE); - /* * Initialize our idea of the latest page number. */ - XactCtl->shared->latest_page_number = pageno; - - LWLockRelease(XactSLRULock); + pg_atomic_init_u32(&XactCtl->shared->latest_page_number, pageno); } /* @@ -754,8 +785,9 @@ TrimCLOG(void) { TransactionId xid = XidFromFullTransactionId(ShmemVariableCache->nextXid); int pageno = TransactionIdToPage(xid); + LWLock *lock = SimpleLruPageGetSLRULock(XactCtl, pageno); - LWLockAcquire(XactSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); /* * Zero out the remainder of the current clog page. Under normal @@ -787,7 +819,7 @@ TrimCLOG(void) XactCtl->shared->page_dirty[slotno] = true; } - LWLockRelease(XactSLRULock); + LWLockRelease(lock); } /* @@ -819,6 +851,7 @@ void ExtendCLOG(TransactionId newestXact) { int pageno; + LWLock *lock; /* * No work except at first XID of a page. But beware: just after @@ -829,13 +862,14 @@ ExtendCLOG(TransactionId newestXact) return; pageno = TransactionIdToPage(newestXact); + lock = SimpleLruPageGetSLRULock(XactCtl, pageno); - LWLockAcquire(XactSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Zero the page and make an XLOG entry about it */ ZeroCLOGPage(pageno, true); - LWLockRelease(XactSLRULock); + LWLockRelease(lock); } @@ -973,16 +1007,18 @@ clog_redo(XLogReaderState *record) { int pageno; int slotno; + LWLock *lock; memcpy(&pageno, XLogRecGetData(record), sizeof(int)); - LWLockAcquire(XactSLRULock, LW_EXCLUSIVE); + lock = SimpleLruPageGetSLRULock(XactCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = ZeroCLOGPage(pageno, false); SimpleLruWritePage(XactCtl, slotno); Assert(!XactCtl->shared->page_dirty[slotno]); - LWLockRelease(XactSLRULock); + LWLockRelease(lock); } else if (info == CLOG_TRUNCATE) { diff --git a/src/backend/access/transam/commit_ts.c b/src/backend/access/transam/commit_ts.c index 54422f2780..0c7f5bae86 100644 --- a/src/backend/access/transam/commit_ts.c +++ b/src/backend/access/transam/commit_ts.c @@ -220,8 +220,9 @@ SetXidCommitTsInPage(TransactionId xid, int nsubxids, { int slotno; int i; + LWLock *lock = SimpleLruPageGetSLRULock(CommitTsCtl, pageno); - LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid); @@ -231,13 +232,13 @@ SetXidCommitTsInPage(TransactionId xid, int nsubxids, CommitTsCtl->shared->page_dirty[slotno] = true; - LWLockRelease(CommitTsSLRULock); + LWLockRelease(lock); } /* * Sets the commit timestamp of a single transaction. * - * Must be called with CommitTsSLRULock held + * Must be called with slot specific SLRU bank's Lock held */ static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, @@ -338,7 +339,7 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, if (nodeid) *nodeid = entry.nodeid; - LWLockRelease(CommitTsSLRULock); + LWLockRelease(SimpleLruPageGetSLRULock(CommitTsCtl, pageno)); return *ts != 0; } @@ -510,9 +511,8 @@ CommitTsShmemInit(void) CommitTsCtl->PagePrecedes = CommitTsPagePrecedes; SimpleLruInit(CommitTsCtl, "CommitTs", NUM_COMMIT_TS_BUFFERS, 0, - CommitTsSLRULock, "pg_commit_ts", - LWTRANCHE_COMMITTS_BUFFER, - SYNC_HANDLER_COMMIT_TS); + "pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER, + LWTRANCHE_COMMITTS_SLRU, SYNC_HANDLER_COMMIT_TS); SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE); commitTsShared = ShmemInitStruct("CommitTs shared", @@ -668,9 +668,7 @@ ActivateCommitTs(void) /* * Re-Initialize our idea of the latest page number. */ - LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE); - CommitTsCtl->shared->latest_page_number = pageno; - LWLockRelease(CommitTsSLRULock); + pg_atomic_write_u32(&CommitTsCtl->shared->latest_page_number, pageno); /* * If CommitTs is enabled, but it wasn't in the previous server run, we @@ -697,12 +695,13 @@ ActivateCommitTs(void) if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno)) { int slotno; + LWLock *lock = SimpleLruPageGetSLRULock(CommitTsCtl, pageno); - LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = ZeroCommitTsPage(pageno, false); SimpleLruWritePage(CommitTsCtl, slotno); Assert(!CommitTsCtl->shared->page_dirty[slotno]); - LWLockRelease(CommitTsSLRULock); + LWLockRelease(lock); } /* Change the activation status in shared memory. */ @@ -751,9 +750,9 @@ DeactivateCommitTs(void) * be overwritten anyway when we wrap around, but it seems better to be * tidy.) */ - LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE); + SimpleLruAcquireAllBankLock(CommitTsCtl, LW_EXCLUSIVE); (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL); - LWLockRelease(CommitTsSLRULock); + SimpleLruReleaseAllBankLock(CommitTsCtl); } /* @@ -785,6 +784,7 @@ void ExtendCommitTs(TransactionId newestXact) { int pageno; + LWLock *lock; /* * Nothing to do if module not enabled. Note we do an unlocked read of @@ -805,12 +805,14 @@ ExtendCommitTs(TransactionId newestXact) pageno = TransactionIdToCTsPage(newestXact); - LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE); + lock = SimpleLruPageGetSLRULock(CommitTsCtl, pageno); + + LWLockAcquire(lock, LW_EXCLUSIVE); /* Zero the page and make an XLOG entry about it */ ZeroCommitTsPage(pageno, !InRecovery); - LWLockRelease(CommitTsSLRULock); + LWLockRelease(lock); } /* @@ -964,16 +966,18 @@ commit_ts_redo(XLogReaderState *record) { int pageno; int slotno; + LWLock *lock; memcpy(&pageno, XLogRecGetData(record), sizeof(int)); + lock = SimpleLruPageGetSLRULock(CommitTsCtl, pageno); - LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = ZeroCommitTsPage(pageno, false); SimpleLruWritePage(CommitTsCtl, slotno); Assert(!CommitTsCtl->shared->page_dirty[slotno]); - LWLockRelease(CommitTsSLRULock); + LWLockRelease(lock); } else if (info == COMMIT_TS_TRUNCATE) { @@ -985,7 +989,8 @@ commit_ts_redo(XLogReaderState *record) * During XLOG replay, latest_page_number isn't set up yet; insert a * suitable value to bypass the sanity test in SimpleLruTruncate. */ - CommitTsCtl->shared->latest_page_number = trunc->pageno; + pg_atomic_write_u32(&CommitTsCtl->shared->latest_page_number, + trunc->pageno); SimpleLruTruncate(CommitTsCtl, trunc->pageno); } diff --git a/src/backend/access/transam/multixact.c b/src/backend/access/transam/multixact.c index abb022e067..e63bd4cf71 100644 --- a/src/backend/access/transam/multixact.c +++ b/src/backend/access/transam/multixact.c @@ -192,10 +192,10 @@ static SlruCtlData MultiXactMemberCtlData; /* * MultiXact state shared across all backends. All this state is protected - * by MultiXactGenLock. (We also use MultiXactOffsetSLRULock and - * MultiXactMemberSLRULock to guard accesses to the two sets of SLRU - * buffers. For concurrency's sake, we avoid holding more than one of these - * locks at a time.) + * by MultiXactGenLock. (We also use SLRU bank's lock of MultiXactOffset and + * MultiXactMember to guard accesses to the two sets of SLRU buffers. For + * concurrency's sake, we avoid holding more than one of these locks at a + * time.) */ typedef struct MultiXactStateData { @@ -870,12 +870,15 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset, int slotno; MultiXactOffset *offptr; int i; - - LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE); + LWLock *lock; + LWLock *prevlock = NULL; pageno = MultiXactIdToOffsetPage(multi); entryno = MultiXactIdToOffsetEntry(multi); + lock = SimpleLruPageGetSLRULock(MultiXactOffsetCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); + /* * Note: we pass the MultiXactId to SimpleLruReadPage as the "transaction" * to complain about if there's any I/O error. This is kinda bogus, but @@ -891,10 +894,8 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset, MultiXactOffsetCtl->shared->page_dirty[slotno] = true; - /* Exchange our lock */ - LWLockRelease(MultiXactOffsetSLRULock); - - LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE); + /* Release MultiXactOffset SLRU lock. */ + LWLockRelease(lock); prev_pageno = -1; @@ -916,6 +917,20 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset, if (pageno != prev_pageno) { + /* + * MultiXactMember SLRU page is changed so check if this new page + * fall into the different SLRU bank then release the old bank's + * lock and acquire lock on the new bank. + */ + lock = SimpleLruPageGetSLRULock(MultiXactMemberCtl, pageno); + if (lock != prevlock) + { + if (prevlock != NULL) + LWLockRelease(prevlock); + + LWLockAcquire(lock, LW_EXCLUSIVE); + prevlock = lock; + } slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, multi); prev_pageno = pageno; } @@ -936,7 +951,8 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset, MultiXactMemberCtl->shared->page_dirty[slotno] = true; } - LWLockRelease(MultiXactMemberSLRULock); + if (prevlock != NULL) + LWLockRelease(prevlock); } /* @@ -1239,6 +1255,8 @@ GetMultiXactIdMembers(MultiXactId multi, MultiXactMember **members, MultiXactId tmpMXact; MultiXactOffset nextOffset; MultiXactMember *ptr; + LWLock *lock; + LWLock *prevlock = NULL; debug_elog3(DEBUG2, "GetMembers: asked for %u", multi); @@ -1342,11 +1360,23 @@ GetMultiXactIdMembers(MultiXactId multi, MultiXactMember **members, * time on every multixact creation. */ retry: - LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE); - pageno = MultiXactIdToOffsetPage(multi); entryno = MultiXactIdToOffsetEntry(multi); + /* + * If the page is on the different SLRU bank then release the lock on the + * previous bank if we are already holding one and acquire the lock on the + * new bank. + */ + lock = SimpleLruPageGetSLRULock(MultiXactOffsetCtl, pageno); + if (lock != prevlock) + { + if (prevlock != NULL) + LWLockRelease(prevlock); + LWLockAcquire(lock, LW_EXCLUSIVE); + prevlock = lock; + } + slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, multi); offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno]; offptr += entryno; @@ -1379,7 +1409,22 @@ retry: entryno = MultiXactIdToOffsetEntry(tmpMXact); if (pageno != prev_pageno) + { + /* + * SLRU pageno is changed so check whether this page is falling in + * the different slru bank than on which we are already holding the + * lock and if so release the lock on the old bank and acquire that + * on the new bank. + */ + lock = SimpleLruPageGetSLRULock(MultiXactOffsetCtl, pageno); + if (prevlock != lock) + { + LWLockRelease(prevlock); + LWLockAcquire(lock, LW_EXCLUSIVE); + prevlock = lock; + } slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, tmpMXact); + } offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno]; offptr += entryno; @@ -1388,7 +1433,8 @@ retry: if (nextMXOffset == 0) { /* Corner case 2: next multixact is still being filled in */ - LWLockRelease(MultiXactOffsetSLRULock); + LWLockRelease(prevlock); + prevlock = NULL; CHECK_FOR_INTERRUPTS(); pg_usleep(1000L); goto retry; @@ -1397,13 +1443,11 @@ retry: length = nextMXOffset - offset; } - LWLockRelease(MultiXactOffsetSLRULock); + LWLockRelease(prevlock); + prevlock = NULL; ptr = (MultiXactMember *) palloc(length * sizeof(MultiXactMember)); - /* Now get the members themselves. */ - LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE); - truelength = 0; prev_pageno = -1; for (i = 0; i < length; i++, offset++) @@ -1419,6 +1463,20 @@ retry: if (pageno != prev_pageno) { + /* + * MultiXactMember SLRU page is changed so check if this new page + * fall into the different SLRU bank then release the old bank's + * lock and acquire lock on the new bank. + */ + lock = SimpleLruPageGetSLRULock(MultiXactMemberCtl, pageno); + if (lock != prevlock) + { + if (prevlock) + LWLockRelease(prevlock); + LWLockAcquire(lock, LW_EXCLUSIVE); + prevlock = lock; + } + slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, multi); prev_pageno = pageno; } @@ -1442,7 +1500,8 @@ retry: truelength++; } - LWLockRelease(MultiXactMemberSLRULock); + if (prevlock) + LWLockRelease(prevlock); /* A multixid with zero members should not happen */ Assert(truelength > 0); @@ -1852,15 +1911,14 @@ MultiXactShmemInit(void) SimpleLruInit(MultiXactOffsetCtl, "MultiXactOffset", NUM_MULTIXACTOFFSET_BUFFERS, 0, - MultiXactOffsetSLRULock, "pg_multixact/offsets", - LWTRANCHE_MULTIXACTOFFSET_BUFFER, + "pg_multixact/offsets", LWTRANCHE_MULTIXACTOFFSET_BUFFER, + LWTRANCHE_MULTIXACTOFFSET_SLRU, SYNC_HANDLER_MULTIXACT_OFFSET); SlruPagePrecedesUnitTests(MultiXactOffsetCtl, MULTIXACT_OFFSETS_PER_PAGE); SimpleLruInit(MultiXactMemberCtl, "MultiXactMember", NUM_MULTIXACTMEMBER_BUFFERS, 0, - MultiXactMemberSLRULock, "pg_multixact/members", - LWTRANCHE_MULTIXACTMEMBER_BUFFER, - SYNC_HANDLER_MULTIXACT_MEMBER); + "pg_multixact/members", LWTRANCHE_MULTIXACTMEMBER_BUFFER, + LWTRANCHE_MULTIXACTMEMBER_SLRU, SYNC_HANDLER_MULTIXACT_MEMBER); /* doesn't call SimpleLruTruncate() or meet criteria for unit tests */ /* Initialize our shared state struct */ @@ -1894,8 +1952,10 @@ void BootStrapMultiXact(void) { int slotno; + LWLock *lock; - LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE); + lock = SimpleLruPageGetSLRULock(MultiXactOffsetCtl , 0); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Create and zero the first page of the offsets log */ slotno = ZeroMultiXactOffsetPage(0, false); @@ -1904,9 +1964,10 @@ BootStrapMultiXact(void) SimpleLruWritePage(MultiXactOffsetCtl, slotno); Assert(!MultiXactOffsetCtl->shared->page_dirty[slotno]); - LWLockRelease(MultiXactOffsetSLRULock); + LWLockRelease(lock); - LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE); + lock = SimpleLruPageGetSLRULock(MultiXactMemberCtl , 0); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Create and zero the first page of the members log */ slotno = ZeroMultiXactMemberPage(0, false); @@ -1915,7 +1976,7 @@ BootStrapMultiXact(void) SimpleLruWritePage(MultiXactMemberCtl, slotno); Assert(!MultiXactMemberCtl->shared->page_dirty[slotno]); - LWLockRelease(MultiXactMemberSLRULock); + LWLockRelease(lock); } /* @@ -1975,10 +2036,12 @@ static void MaybeExtendOffsetSlru(void) { int pageno; + LWLock *lock; pageno = MultiXactIdToOffsetPage(MultiXactState->nextMXact); + lock = SimpleLruPageGetSLRULock(MultiXactOffsetCtl ,pageno); - LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); if (!SimpleLruDoesPhysicalPageExist(MultiXactOffsetCtl, pageno)) { @@ -1993,7 +2056,7 @@ MaybeExtendOffsetSlru(void) SimpleLruWritePage(MultiXactOffsetCtl, slotno); } - LWLockRelease(MultiXactOffsetSLRULock); + LWLockRelease(lock); } /* @@ -2015,13 +2078,15 @@ StartupMultiXact(void) * Initialize offset's idea of the latest page number. */ pageno = MultiXactIdToOffsetPage(multi); - MultiXactOffsetCtl->shared->latest_page_number = pageno; + pg_atomic_init_u32(&MultiXactOffsetCtl->shared->latest_page_number, + pageno); /* * Initialize member's idea of the latest page number. */ pageno = MXOffsetToMemberPage(offset); - MultiXactMemberCtl->shared->latest_page_number = pageno; + pg_atomic_init_u32(&MultiXactMemberCtl->shared->latest_page_number, + pageno); } /* @@ -2037,7 +2102,6 @@ TrimMultiXact(void) int pageno; int entryno; int flagsoff; - LWLockAcquire(MultiXactGenLock, LW_SHARED); nextMXact = MultiXactState->nextMXact; offset = MultiXactState->nextOffset; @@ -2046,13 +2110,13 @@ TrimMultiXact(void) LWLockRelease(MultiXactGenLock); /* Clean up offsets state */ - LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE); /* * (Re-)Initialize our idea of the latest page number for offsets. */ pageno = MultiXactIdToOffsetPage(nextMXact); - MultiXactOffsetCtl->shared->latest_page_number = pageno; + pg_atomic_write_u32(&MultiXactOffsetCtl->shared->latest_page_number, + pageno); /* * Zero out the remainder of the current offsets page. See notes in @@ -2067,7 +2131,9 @@ TrimMultiXact(void) { int slotno; MultiXactOffset *offptr; + LWLock *lock = SimpleLruPageGetSLRULock(MultiXactOffsetCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, nextMXact); offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno]; offptr += entryno; @@ -2075,18 +2141,17 @@ TrimMultiXact(void) MemSet(offptr, 0, BLCKSZ - (entryno * sizeof(MultiXactOffset))); MultiXactOffsetCtl->shared->page_dirty[slotno] = true; + LWLockRelease(lock); } - LWLockRelease(MultiXactOffsetSLRULock); - /* And the same for members */ - LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE); /* * (Re-)Initialize our idea of the latest page number for members. */ pageno = MXOffsetToMemberPage(offset); - MultiXactMemberCtl->shared->latest_page_number = pageno; + pg_atomic_write_u32(&MultiXactMemberCtl->shared->latest_page_number, + pageno); /* * Zero out the remainder of the current members page. See notes in @@ -2098,7 +2163,9 @@ TrimMultiXact(void) int slotno; TransactionId *xidptr; int memberoff; + LWLock *lock = SimpleLruPageGetSLRULock(MultiXactMemberCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); memberoff = MXOffsetToMemberOffset(offset); slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, offset); xidptr = (TransactionId *) @@ -2113,10 +2180,9 @@ TrimMultiXact(void) */ MultiXactMemberCtl->shared->page_dirty[slotno] = true; + LWLockRelease(lock); } - LWLockRelease(MultiXactMemberSLRULock); - /* signal that we're officially up */ LWLockAcquire(MultiXactGenLock, LW_EXCLUSIVE); MultiXactState->finishedStartup = true; @@ -2404,6 +2470,7 @@ static void ExtendMultiXactOffset(MultiXactId multi) { int pageno; + LWLock *lock; /* * No work except at first MultiXactId of a page. But beware: just after @@ -2414,13 +2481,14 @@ ExtendMultiXactOffset(MultiXactId multi) return; pageno = MultiXactIdToOffsetPage(multi); + lock = SimpleLruPageGetSLRULock(MultiXactOffsetCtl, pageno); - LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Zero the page and make an XLOG entry about it */ ZeroMultiXactOffsetPage(pageno, true); - LWLockRelease(MultiXactOffsetSLRULock); + LWLockRelease(lock); } /* @@ -2453,15 +2521,17 @@ ExtendMultiXactMember(MultiXactOffset offset, int nmembers) if (flagsoff == 0 && flagsbit == 0) { int pageno; + LWLock *lock; pageno = MXOffsetToMemberPage(offset); + lock = SimpleLruPageGetSLRULock(MultiXactMemberCtl, pageno); - LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Zero the page and make an XLOG entry about it */ ZeroMultiXactMemberPage(pageno, true); - LWLockRelease(MultiXactMemberSLRULock); + LWLockRelease(lock); } /* @@ -2759,7 +2829,7 @@ find_multixact_start(MultiXactId multi, MultiXactOffset *result) offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno]; offptr += entryno; offset = *offptr; - LWLockRelease(MultiXactOffsetSLRULock); + LWLockRelease(SimpleLruPageGetSLRULock(MultiXactOffsetCtl, pageno)); *result = offset; return true; @@ -3241,31 +3311,33 @@ multixact_redo(XLogReaderState *record) { int pageno; int slotno; + LWLock *lock; memcpy(&pageno, XLogRecGetData(record), sizeof(int)); - - LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE); + lock = SimpleLruPageGetSLRULock(MultiXactOffsetCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = ZeroMultiXactOffsetPage(pageno, false); SimpleLruWritePage(MultiXactOffsetCtl, slotno); Assert(!MultiXactOffsetCtl->shared->page_dirty[slotno]); - LWLockRelease(MultiXactOffsetSLRULock); + LWLockRelease(lock); } else if (info == XLOG_MULTIXACT_ZERO_MEM_PAGE) { int pageno; int slotno; + LWLock *lock; memcpy(&pageno, XLogRecGetData(record), sizeof(int)); - - LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE); + lock = SimpleLruPageGetSLRULock(MultiXactMemberCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = ZeroMultiXactMemberPage(pageno, false); SimpleLruWritePage(MultiXactMemberCtl, slotno); Assert(!MultiXactMemberCtl->shared->page_dirty[slotno]); - LWLockRelease(MultiXactMemberSLRULock); + LWLockRelease(lock); } else if (info == XLOG_MULTIXACT_CREATE_ID) { @@ -3331,7 +3403,8 @@ multixact_redo(XLogReaderState *record) * SimpleLruTruncate. */ pageno = MultiXactIdToOffsetPage(xlrec.endTruncOff); - MultiXactOffsetCtl->shared->latest_page_number = pageno; + pg_atomic_write_u32(&MultiXactOffsetCtl->shared->latest_page_number, + pageno); PerformOffsetsTruncation(xlrec.startTruncOff, xlrec.endTruncOff); LWLockRelease(MultiXactTruncationLock); diff --git a/src/backend/access/transam/slru.c b/src/backend/access/transam/slru.c index 57889b72bd..c06e4eddd1 100644 --- a/src/backend/access/transam/slru.c +++ b/src/backend/access/transam/slru.c @@ -187,9 +187,9 @@ Size SimpleLruShmemSize(int nslots, int nlsns) { Size sz; - int bankmask_ignore; + int bankmask; - SlruAdjustNSlots(&nslots, &bankmask_ignore); + SlruAdjustNSlots(&nslots, &bankmask); /* we assume nslots isn't so large as to risk overflow */ sz = MAXALIGN(sizeof(SlruSharedData)); @@ -199,6 +199,7 @@ SimpleLruShmemSize(int nslots, int nlsns) sz += MAXALIGN(nslots * sizeof(int)); /* page_number[] */ sz += MAXALIGN(nslots * sizeof(int)); /* page_lru_count[] */ sz += MAXALIGN(nslots * sizeof(LWLockPadded)); /* buffer_locks[] */ + sz += MAXALIGN((bankmask + 1) * sizeof(LWLockPadded)); /* bank_locks[] */ if (nlsns > 0) sz += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr)); /* group_lsn[] */ @@ -206,6 +207,32 @@ SimpleLruShmemSize(int nslots, int nlsns) return BUFFERALIGN(sz) + BLCKSZ * nslots; } +/* + * Function to acquire all bank's lock of the given SlruCtl + */ +void +SimpleLruAcquireAllBankLock(SlruCtl ctl, LWLockMode mode) +{ + SlruShared shared = ctl->shared; + int bankno; + + for (bankno = 0; bankno <= ctl->bank_mask; bankno++) + LWLockAcquire(&shared->bank_locks[bankno].lock, mode); +} + +/* + * Function to release all bank's lock of the given SlruCtl + */ +void +SimpleLruReleaseAllBankLock(SlruCtl ctl) +{ + SlruShared shared = ctl->shared; + int bankno; + + for (bankno = 0; bankno <= ctl->bank_mask; bankno++) + LWLockRelease(&shared->bank_locks[bankno].lock); +} + /* * Initialize, or attach to, a simple LRU cache in shared memory. * @@ -220,7 +247,7 @@ SimpleLruShmemSize(int nslots, int nlsns) */ void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, - LWLock *ctllock, const char *subdir, int tranche_id, + const char *subdir, int tranche_id, int slru_tranche_id, SyncRequestHandler sync_handler) { SlruShared shared; @@ -239,13 +266,13 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, char *ptr; Size offset; int slotno; + int nbanks = bankmask + 1; + int bankno; Assert(!found); memset(shared, 0, sizeof(SlruSharedData)); - shared->ControlLock = ctllock; - shared->num_slots = nslots; shared->lsn_groups_per_page = nlsns; @@ -271,6 +298,8 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, /* Initialize LWLocks */ shared->buffer_locks = (LWLockPadded *) (ptr + offset); offset += MAXALIGN(nslots * sizeof(LWLockPadded)); + shared->bank_locks = (LWLockPadded *) (ptr + offset); + offset += MAXALIGN(nbanks * sizeof(LWLockPadded)); if (nlsns > 0) { @@ -290,6 +319,10 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, shared->page_lru_count[slotno] = 0; ptr += BLCKSZ; } + /* initialize bank locks for each buffer bank */ + for (bankno = 0; bankno < nbanks; bankno++) + LWLockInitialize(&shared->bank_locks[bankno].lock, + slru_tranche_id); /* Should fit to estimated shmem size */ Assert(ptr - (char *) shared <= SimpleLruShmemSize(nslots, nlsns)); @@ -344,7 +377,7 @@ SimpleLruZeroPage(SlruCtl ctl, int pageno) SimpleLruZeroLSNs(ctl, slotno); /* Assume this page is now the latest active page */ - shared->latest_page_number = pageno; + pg_atomic_write_u32(&shared->latest_page_number, pageno); /* update the stats counter of zeroed pages */ pgstat_count_slru_page_zeroed(shared->slru_stats_idx); @@ -383,12 +416,13 @@ static void SimpleLruWaitIO(SlruCtl ctl, int slotno) { SlruShared shared = ctl->shared; + int bankno = slotno / SLRU_BANK_SIZE; /* See notes at top of file */ - LWLockRelease(shared->ControlLock); + LWLockRelease(&shared->bank_locks[bankno].lock); LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_SHARED); LWLockRelease(&shared->buffer_locks[slotno].lock); - LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); + LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE); /* * If the slot is still in an io-in-progress state, then either someone @@ -443,6 +477,7 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, for (;;) { int slotno; + int bankno; bool ok; /* See if page already is in memory; if not, pick victim slot */ @@ -485,9 +520,10 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, /* Acquire per-buffer lock (cannot deadlock, see notes at top) */ LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE); + bankno = slotno / SLRU_BANK_SIZE; /* Release control lock while doing I/O */ - LWLockRelease(shared->ControlLock); + LWLockRelease(&shared->bank_locks[bankno].lock); /* Do the read */ ok = SlruPhysicalReadPage(ctl, pageno, slotno); @@ -496,7 +532,7 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, SimpleLruZeroLSNs(ctl, slotno); /* Re-acquire control lock and update page state */ - LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); + LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE); Assert(shared->page_number[slotno] == pageno && shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS && @@ -538,11 +574,12 @@ SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid) { SlruShared shared = ctl->shared; int slotno; - int bankstart = (pageno & ctl->bank_mask) * SLRU_BANK_SIZE; + int bankno = pageno & ctl->bank_mask; + int bankstart = bankno * SLRU_BANK_SIZE; int bankend = bankstart + SLRU_BANK_SIZE; /* Try to find the page while holding only shared lock */ - LWLockAcquire(shared->ControlLock, LW_SHARED); + LWLockAcquire(&shared->bank_locks[bankno].lock, LW_SHARED); /* See if page is already in a buffer */ for (slotno = bankstart; slotno < bankend; slotno++) @@ -562,8 +599,8 @@ SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid) } /* No luck, so switch to normal exclusive lock and do regular read */ - LWLockRelease(shared->ControlLock); - LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); + LWLockRelease(&shared->bank_locks[bankno].lock); + LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE); return SimpleLruReadPage(ctl, pageno, true, xid); } @@ -585,6 +622,7 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata) SlruShared shared = ctl->shared; int pageno = shared->page_number[slotno]; bool ok; + int bankno = slotno / SLRU_BANK_SIZE; /* If a write is in progress, wait for it to finish */ while (shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS && @@ -613,7 +651,7 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata) LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE); /* Release control lock while doing I/O */ - LWLockRelease(shared->ControlLock); + LWLockRelease(&shared->bank_locks[bankno].lock); /* Do the write */ ok = SlruPhysicalWritePage(ctl, pageno, slotno, fdata); @@ -628,7 +666,7 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata) } /* Re-acquire control lock and update page state */ - LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); + LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE); Assert(shared->page_number[slotno] == pageno && shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS); @@ -1133,7 +1171,7 @@ SlruSelectLRUPage(SlruCtl ctl, int pageno) this_delta = 0; } this_page_number = shared->page_number[slotno]; - if (this_page_number == shared->latest_page_number) + if (this_page_number == pg_atomic_read_u32(&shared->latest_page_number)) continue; if (shared->page_status[slotno] == SLRU_PAGE_VALID) { @@ -1207,6 +1245,7 @@ SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied) int slotno; int pageno = 0; int i; + int lastbankno = 0; bool ok; /* update the stats counter of flushes */ @@ -1217,10 +1256,19 @@ SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied) */ fdata.num_files = 0; - LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); + LWLockAcquire(&shared->bank_locks[0].lock, LW_EXCLUSIVE); for (slotno = 0; slotno < shared->num_slots; slotno++) { + int curbankno = slotno / SLRU_BANK_SIZE; + + if (curbankno != lastbankno) + { + LWLockRelease(&shared->bank_locks[lastbankno].lock); + LWLockAcquire(&shared->bank_locks[curbankno].lock, LW_EXCLUSIVE); + lastbankno = curbankno; + } + SlruInternalWritePage(ctl, slotno, &fdata); /* @@ -1234,7 +1282,7 @@ SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied) !shared->page_dirty[slotno])); } - LWLockRelease(shared->ControlLock); + LWLockRelease(&shared->bank_locks[lastbankno].lock); /* * Now close any files that were open @@ -1274,6 +1322,7 @@ SimpleLruTruncate(SlruCtl ctl, int cutoffPage) { SlruShared shared = ctl->shared; int slotno; + int prevbankno; /* update the stats counter of truncates */ pgstat_count_slru_truncate(shared->slru_stats_idx); @@ -1284,25 +1333,38 @@ SimpleLruTruncate(SlruCtl ctl, int cutoffPage) * or just after a checkpoint, any dirty pages should have been flushed * already ... we're just being extra careful here.) */ - LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); - restart: /* * While we are holding the lock, make an important safety check: the * current endpoint page must not be eligible for removal. */ - if (ctl->PagePrecedes(shared->latest_page_number, cutoffPage)) + if (ctl->PagePrecedes(pg_atomic_read_u32(&shared->latest_page_number), + cutoffPage)) { - LWLockRelease(shared->ControlLock); ereport(LOG, (errmsg("could not truncate directory \"%s\": apparent wraparound", ctl->Dir))); return; } + prevbankno = 0; + LWLockAcquire(&shared->bank_locks[prevbankno].lock, LW_EXCLUSIVE); for (slotno = 0; slotno < shared->num_slots; slotno++) { + int curbankno = slotno / SLRU_BANK_SIZE; + + /* + * If the curbankno is not same as prevbankno then release the lock on + * the prevbankno and acquire the lock on the curbankno. + */ + if (curbankno != prevbankno) + { + LWLockRelease(&shared->bank_locks[prevbankno].lock); + LWLockAcquire(&shared->bank_locks[curbankno].lock, LW_EXCLUSIVE); + prevbankno = curbankno; + } + if (shared->page_status[slotno] == SLRU_PAGE_EMPTY) continue; if (!ctl->PagePrecedes(shared->page_number[slotno], cutoffPage)) @@ -1332,10 +1394,12 @@ restart: SlruInternalWritePage(ctl, slotno, NULL); else SimpleLruWaitIO(ctl, slotno); + + LWLockRelease(&shared->bank_locks[prevbankno].lock); goto restart; } - LWLockRelease(shared->ControlLock); + LWLockRelease(&shared->bank_locks[prevbankno].lock); /* Now we can remove the old segment(s) */ (void) SlruScanDirectory(ctl, SlruScanDirCbDeleteCutoff, &cutoffPage); @@ -1376,15 +1440,31 @@ SlruDeleteSegment(SlruCtl ctl, int segno) SlruShared shared = ctl->shared; int slotno; bool did_write; + int prevbankno = 0; /* Clean out any possibly existing references to the segment. */ - LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); + LWLockAcquire(&shared->bank_locks[prevbankno].lock, LW_EXCLUSIVE); restart: did_write = false; for (slotno = 0; slotno < shared->num_slots; slotno++) { - int pagesegno = shared->page_number[slotno] / SLRU_PAGES_PER_SEGMENT; + int pagesegno; + int curbankno; + + curbankno = slotno / SLRU_BANK_SIZE; + + /* + * If the curbankno is not same as prevbankno then release the lock on + * the prevbankno and acquire the lock on the curbankno. + */ + if (curbankno != prevbankno) + { + LWLockRelease(&shared->bank_locks[prevbankno].lock); + LWLockAcquire(&shared->bank_locks[curbankno].lock, LW_EXCLUSIVE); + prevbankno = curbankno; + } + pagesegno = shared->page_number[slotno] / SLRU_PAGES_PER_SEGMENT; if (shared->page_status[slotno] == SLRU_PAGE_EMPTY) continue; @@ -1418,7 +1498,7 @@ restart: SlruInternalDeleteSegment(ctl, segno); - LWLockRelease(shared->ControlLock); + LWLockRelease(&shared->bank_locks[prevbankno].lock); } /* diff --git a/src/backend/access/transam/subtrans.c b/src/backend/access/transam/subtrans.c index 125273e235..2b0afa8a15 100644 --- a/src/backend/access/transam/subtrans.c +++ b/src/backend/access/transam/subtrans.c @@ -77,12 +77,14 @@ SubTransSetParent(TransactionId xid, TransactionId parent) int pageno = TransactionIdToPage(xid); int entryno = TransactionIdToEntry(xid); int slotno; + LWLock *lock; TransactionId *ptr; Assert(TransactionIdIsValid(parent)); Assert(TransactionIdFollows(xid, parent)); - LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE); + lock = SimpleLruPageGetSLRULock(SubTransCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = SimpleLruReadPage(SubTransCtl, pageno, true, xid); ptr = (TransactionId *) SubTransCtl->shared->page_buffer[slotno]; @@ -100,7 +102,7 @@ SubTransSetParent(TransactionId xid, TransactionId parent) SubTransCtl->shared->page_dirty[slotno] = true; } - LWLockRelease(SubtransSLRULock); + LWLockRelease(lock); } /* @@ -130,7 +132,7 @@ SubTransGetParent(TransactionId xid) parent = *ptr; - LWLockRelease(SubtransSLRULock); + LWLockRelease(SimpleLruPageGetSLRULock(SubTransCtl, pageno)); return parent; } @@ -193,8 +195,8 @@ SUBTRANSShmemInit(void) { SubTransCtl->PagePrecedes = SubTransPagePrecedes; SimpleLruInit(SubTransCtl, "Subtrans", NUM_SUBTRANS_BUFFERS, 0, - SubtransSLRULock, "pg_subtrans", - LWTRANCHE_SUBTRANS_BUFFER, SYNC_HANDLER_NONE); + "pg_subtrans", LWTRANCHE_SUBTRANS_BUFFER, + LWTRANCHE_SUBTRANS_SLRU, SYNC_HANDLER_NONE); SlruPagePrecedesUnitTests(SubTransCtl, SUBTRANS_XACTS_PER_PAGE); } @@ -212,8 +214,9 @@ void BootStrapSUBTRANS(void) { int slotno; + LWLock *lock = SimpleLruPageGetSLRULock(SubTransCtl, 0); - LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Create and zero the first page of the subtrans log */ slotno = ZeroSUBTRANSPage(0); @@ -222,7 +225,7 @@ BootStrapSUBTRANS(void) SimpleLruWritePage(SubTransCtl, slotno); Assert(!SubTransCtl->shared->page_dirty[slotno]); - LWLockRelease(SubtransSLRULock); + LWLockRelease(lock); } /* @@ -259,7 +262,7 @@ StartupSUBTRANS(TransactionId oldestActiveXID) * Whenever we advance into a new page, ExtendSUBTRANS will likewise zero * the new page without regard to whatever was previously on disk. */ - LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE); + SimpleLruAcquireAllBankLock(SubTransCtl, LW_EXCLUSIVE); startPage = TransactionIdToPage(oldestActiveXID); nextXid = ShmemVariableCache->nextXid; @@ -275,7 +278,7 @@ StartupSUBTRANS(TransactionId oldestActiveXID) } (void) ZeroSUBTRANSPage(startPage); - LWLockRelease(SubtransSLRULock); + SimpleLruReleaseAllBankLock(SubTransCtl); } /* @@ -309,6 +312,7 @@ void ExtendSUBTRANS(TransactionId newestXact) { int pageno; + LWLock *lock; /* * No work except at first XID of a page. But beware: just after @@ -320,12 +324,13 @@ ExtendSUBTRANS(TransactionId newestXact) pageno = TransactionIdToPage(newestXact); - LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE); + lock = SimpleLruPageGetSLRULock(SubTransCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Zero the page */ ZeroSUBTRANSPage(pageno); - LWLockRelease(SubtransSLRULock); + LWLockRelease(lock); } diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index d148d10850..7088fe15ea 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -267,9 +267,10 @@ typedef struct QueueBackendStatus * both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE mode, backends * can change the tail pointers. * - * NotifySLRULock is used as the control lock for the pg_notify SLRU buffers. + * SLRU buffer pool is divided in banks and bank wise SLRU lock is used as + * the control lock for the pg_notify SLRU buffers. * In order to avoid deadlocks, whenever we need multiple locks, we first get - * NotifyQueueTailLock, then NotifyQueueLock, and lastly NotifySLRULock. + * NotifyQueueTailLock, then NotifyQueueLock, and lastly SLRU bank lock. * * Each backend uses the backend[] array entry with index equal to its * BackendId (which can range from 1 to MaxBackends). We rely on this to make @@ -570,8 +571,8 @@ AsyncShmemInit(void) */ NotifyCtl->PagePrecedes = asyncQueuePagePrecedes; SimpleLruInit(NotifyCtl, "Notify", NUM_NOTIFY_BUFFERS, 0, - NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER, - SYNC_HANDLER_NONE); + "pg_notify", LWTRANCHE_NOTIFY_BUFFER, + LWTRANCHE_NOTIFY_SLRU, SYNC_HANDLER_NONE); if (!found) { @@ -1402,7 +1403,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe) * Eventually we will return NULL indicating all is done. * * We are holding NotifyQueueLock already from the caller and grab - * NotifySLRULock locally in this function. + * page specific SLRULock locally in this function. */ static ListCell * asyncQueueAddEntries(ListCell *nextNotify) @@ -1412,9 +1413,7 @@ asyncQueueAddEntries(ListCell *nextNotify) int pageno; int offset; int slotno; - - /* We hold both NotifyQueueLock and NotifySLRULock during this operation */ - LWLockAcquire(NotifySLRULock, LW_EXCLUSIVE); + LWLock *lock; /* * We work with a local copy of QUEUE_HEAD, which we write back to shared @@ -1438,6 +1437,11 @@ asyncQueueAddEntries(ListCell *nextNotify) * wrapped around, but re-zeroing the page is harmless in that case.) */ pageno = QUEUE_POS_PAGE(queue_head); + lock = SimpleLruPageGetSLRULock(NotifyCtl, pageno); + + /* We hold both NotifyQueueLock and SLRU bank lock during this operation */ + LWLockAcquire(lock, LW_EXCLUSIVE); + if (QUEUE_POS_IS_ZERO(queue_head)) slotno = SimpleLruZeroPage(NotifyCtl, pageno); else @@ -1509,7 +1513,7 @@ asyncQueueAddEntries(ListCell *nextNotify) /* Success, so update the global QUEUE_HEAD */ QUEUE_HEAD = queue_head; - LWLockRelease(NotifySLRULock); + LWLockRelease(lock); return nextNotify; } @@ -1988,7 +1992,7 @@ asyncQueueReadAllNotifications(void) /* * We copy the data from SLRU into a local buffer, so as to avoid - * holding the NotifySLRULock while we are examining the entries + * holding the SLRULock while we are examining the entries * and possibly transmitting them to our frontend. Copy only the * part of the page we will actually inspect. */ @@ -2010,7 +2014,7 @@ asyncQueueReadAllNotifications(void) NotifyCtl->shared->page_buffer[slotno] + curoffset, copysize); /* Release lock that we got from SimpleLruReadPage_ReadOnly() */ - LWLockRelease(NotifySLRULock); + LWLockRelease(SimpleLruPageGetSLRULock(NotifyCtl, curpage)); /* * Process messages up to the stop position, end of page, or an @@ -2051,7 +2055,7 @@ asyncQueueReadAllNotifications(void) * * The current page must have been fetched into page_buffer from shared * memory. (We could access the page right in shared memory, but that - * would imply holding the NotifySLRULock throughout this routine.) + * would imply holding the SLRU bank lock throughout this routine.) * * We stop if we reach the "stop" position, or reach a notification from an * uncommitted transaction, or reach the end of the page. @@ -2204,7 +2208,7 @@ asyncQueueAdvanceTail(void) if (asyncQueuePagePrecedes(oldtailpage, boundary)) { /* - * SimpleLruTruncate() will ask for NotifySLRULock but will also + * SimpleLruTruncate() will ask for SLRU bank locks but will also * release the lock again. */ SimpleLruTruncate(NotifyCtl, newtailpage); diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 315a78cda9..1261af0548 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -190,6 +190,20 @@ static const char *const BuiltinTrancheNames[] = { "LogicalRepLauncherDSA", /* LWTRANCHE_LAUNCHER_HASH: */ "LogicalRepLauncherHash", + /* LWTRANCHE_XACT_SLRU: */ + "XactSLRU", + /* LWTRANCHE_COMMITTS_SLRU: */ + "CommitTSSLRU", + /* LWTRANCHE_SUBTRANS_SLRU: */ + "SubtransSLRU", + /* LWTRANCHE_MULTIXACTOFFSET_SLRU: */ + "MultixactOffsetSLRU", + /* LWTRANCHE_MULTIXACTMEMBER_SLRU: */ + "MultixactMemberSLRU", + /* LWTRANCHE_NOTIFY_SLRU: */ + "NotifySLRU", + /* LWTRANCHE_SERIAL_SLRU: */ + "SerialSLRU" }; StaticAssertDecl(lengthof(BuiltinTrancheNames) == diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index f72f2906ce..9e66ecd1ed 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -16,11 +16,11 @@ WALBufMappingLock 7 WALWriteLock 8 ControlFileLock 9 # 10 was CheckpointLock -XactSLRULock 11 -SubtransSLRULock 12 +# 11 was XactSLRULock +# 12 was SubtransSLRULock MultiXactGenLock 13 -MultiXactOffsetSLRULock 14 -MultiXactMemberSLRULock 15 +# 14 was MultiXactOffsetSLRULock +# 15 was MultiXactMemberSLRULock RelCacheInitLock 16 CheckpointerCommLock 17 TwoPhaseStateLock 18 @@ -31,19 +31,19 @@ AutovacuumLock 22 AutovacuumScheduleLock 23 SyncScanLock 24 RelationMappingLock 25 -NotifySLRULock 26 +#26 was NotifySLRULock NotifyQueueLock 27 SerializableXactHashLock 28 SerializableFinishedListLock 29 SerializablePredicateListLock 30 -SerialSLRULock 31 +SerialControlLock 31 SyncRepLock 32 BackgroundWorkerLock 33 DynamicSharedMemoryControlLock 34 AutoFileLock 35 ReplicationSlotAllocationLock 36 ReplicationSlotControlLock 37 -CommitTsSLRULock 38 +#38 was CommitTsSLRULock CommitTsLock 39 ReplicationOriginLock 40 MultiXactTruncationLock 41 diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c index 1af41213b4..fe00148956 100644 --- a/src/backend/storage/lmgr/predicate.c +++ b/src/backend/storage/lmgr/predicate.c @@ -808,8 +808,8 @@ SerialInit(void) */ SerialSlruCtl->PagePrecedes = SerialPagePrecedesLogically; SimpleLruInit(SerialSlruCtl, "Serial", - NUM_SERIAL_BUFFERS, 0, SerialSLRULock, "pg_serial", - LWTRANCHE_SERIAL_BUFFER, SYNC_HANDLER_NONE); + NUM_SERIAL_BUFFERS, 0, "pg_serial", LWTRANCHE_SERIAL_BUFFER, + LWTRANCHE_SERIAL_SLRU, SYNC_HANDLER_NONE); #ifdef USE_ASSERT_CHECKING SerialPagePrecedesLogicallyUnitTests(); #endif @@ -846,12 +846,14 @@ SerialAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo) int slotno; int firstZeroPage; bool isNewPage; + LWLock *lock; Assert(TransactionIdIsValid(xid)); targetPage = SerialPage(xid); + lock = SimpleLruPageGetSLRULock(SerialSlruCtl, targetPage); - LWLockAcquire(SerialSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); /* * If no serializable transactions are active, there shouldn't be anything @@ -901,7 +903,7 @@ SerialAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo) SerialValue(slotno, xid) = minConflictCommitSeqNo; SerialSlruCtl->shared->page_dirty[slotno] = true; - LWLockRelease(SerialSLRULock); + LWLockRelease(lock); } /* @@ -919,10 +921,10 @@ SerialGetMinConflictCommitSeqNo(TransactionId xid) Assert(TransactionIdIsValid(xid)); - LWLockAcquire(SerialSLRULock, LW_SHARED); + LWLockAcquire(SerialControlLock, LW_SHARED); headXid = serialControl->headXid; tailXid = serialControl->tailXid; - LWLockRelease(SerialSLRULock); + LWLockRelease(SerialControlLock); if (!TransactionIdIsValid(headXid)) return 0; @@ -934,13 +936,13 @@ SerialGetMinConflictCommitSeqNo(TransactionId xid) return 0; /* - * The following function must be called without holding SerialSLRULock, + * The following function must be called without holding SLRU bank lock, * but will return with that lock held, which must then be released. */ slotno = SimpleLruReadPage_ReadOnly(SerialSlruCtl, SerialPage(xid), xid); val = SerialValue(slotno, xid); - LWLockRelease(SerialSLRULock); + LWLockRelease(SimpleLruPageGetSLRULock(SerialSlruCtl, SerialPage(xid))); return val; } @@ -953,7 +955,7 @@ SerialGetMinConflictCommitSeqNo(TransactionId xid) static void SerialSetActiveSerXmin(TransactionId xid) { - LWLockAcquire(SerialSLRULock, LW_EXCLUSIVE); + LWLockAcquire(SerialControlLock, LW_EXCLUSIVE); /* * When no sxacts are active, nothing overlaps, set the xid values to @@ -965,7 +967,7 @@ SerialSetActiveSerXmin(TransactionId xid) { serialControl->tailXid = InvalidTransactionId; serialControl->headXid = InvalidTransactionId; - LWLockRelease(SerialSLRULock); + LWLockRelease(SerialControlLock); return; } @@ -983,7 +985,7 @@ SerialSetActiveSerXmin(TransactionId xid) { serialControl->tailXid = xid; } - LWLockRelease(SerialSLRULock); + LWLockRelease(SerialControlLock); return; } @@ -992,7 +994,7 @@ SerialSetActiveSerXmin(TransactionId xid) serialControl->tailXid = xid; - LWLockRelease(SerialSLRULock); + LWLockRelease(SerialControlLock); } /* @@ -1006,12 +1008,12 @@ CheckPointPredicate(void) { int tailPage; - LWLockAcquire(SerialSLRULock, LW_EXCLUSIVE); + LWLockAcquire(SerialControlLock, LW_EXCLUSIVE); /* Exit quickly if the SLRU is currently not in use. */ if (serialControl->headPage < 0) { - LWLockRelease(SerialSLRULock); + LWLockRelease(SerialControlLock); return; } @@ -1055,7 +1057,7 @@ CheckPointPredicate(void) serialControl->headPage = -1; } - LWLockRelease(SerialSLRULock); + LWLockRelease(SerialControlLock); /* Truncate away pages that are no longer required */ SimpleLruTruncate(SerialSlruCtl, tailPage); diff --git a/src/include/access/slru.h b/src/include/access/slru.h index f5f2b5b8b5..eec7a568dc 100644 --- a/src/include/access/slru.h +++ b/src/include/access/slru.h @@ -52,8 +52,6 @@ typedef enum */ typedef struct SlruSharedData { - LWLock *ControlLock; - /* Number of buffers managed by this SLRU structure */ int num_slots; @@ -68,6 +66,13 @@ typedef struct SlruSharedData int *page_lru_count; LWLockPadded *buffer_locks; + /* + * Lock to protect the buffer slot access in per SLRU bank. The + * buffer_locks protects the I/O on each buffer slots whereas this lock + * protect the in memory operation on the buffer within one SLRU bank. + */ + LWLockPadded *bank_locks; + /* * Optional array of WAL flush LSNs associated with entries in the SLRU * pages. If not zero/NULL, we must flush WAL before writing pages (true @@ -95,7 +100,7 @@ typedef struct SlruSharedData * this is not critical data, since we use it only to avoid swapping out * the latest page. */ - int latest_page_number; + pg_atomic_uint32 latest_page_number; /* SLRU's index for statistics purposes (might not be unique) */ int slru_stats_idx; @@ -143,11 +148,25 @@ typedef struct SlruCtlData typedef SlruCtlData *SlruCtl; +/* + * Get the SLRU bank lock for given SlruCtl and the pageno. + * + * This lock needs to be acquire in order to access the slru buffer slots in + * the respective bank. For more details refer comments in SlruSharedData. + */ +static inline LWLock * +SimpleLruPageGetSLRULock(SlruCtl ctl, int pageno) +{ + int bankno = (pageno & ctl->bank_mask); + + /* Try to find the page while holding only shared lock */ + return &(ctl->shared->bank_locks[bankno].lock); +} extern Size SimpleLruShmemSize(int nslots, int nlsns); extern void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, - LWLock *ctllock, const char *subdir, int tranche_id, - SyncRequestHandler sync_handler); + const char *subdir, int tranche_id, + int slru_tranche_id, SyncRequestHandler sync_handler); extern int SimpleLruZeroPage(SlruCtl ctl, int pageno); extern int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, TransactionId xid); @@ -175,5 +194,7 @@ extern bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data); extern bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data); - +extern LWLock *SimpleLruPageGetSLRULock(SlruCtl ctl, int pageno); +extern void SimpleLruAcquireAllBankLock(SlruCtl ctl, LWLockMode mode); +extern void SimpleLruReleaseAllBankLock(SlruCtl ctl); #endif /* SLRU_H */ diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index d77410bdea..09d2efe8ca 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -207,6 +207,14 @@ typedef enum BuiltinTrancheIds LWTRANCHE_PGSTATS_DATA, LWTRANCHE_LAUNCHER_DSA, LWTRANCHE_LAUNCHER_HASH, + LWTRANCHE_XACT_SLRU, + LWTRANCHE_COMMITTS_SLRU, + LWTRANCHE_SUBTRANS_SLRU, + LWTRANCHE_MULTIXACTOFFSET_SLRU, + LWTRANCHE_MULTIXACTMEMBER_SLRU, + LWTRANCHE_NOTIFY_SLRU, + LWTRANCHE_SERIAL_SLRU, + LWTRANCHE_FIRST_USER_DEFINED } BuiltinTrancheIds; diff --git a/src/test/modules/test_slru/test_slru.c b/src/test/modules/test_slru/test_slru.c index ae21444c47..7b2eb4ae50 100644 --- a/src/test/modules/test_slru/test_slru.c +++ b/src/test/modules/test_slru/test_slru.c @@ -63,9 +63,9 @@ test_slru_page_write(PG_FUNCTION_ARGS) int pageno = PG_GETARG_INT32(0); char *data = text_to_cstring(PG_GETARG_TEXT_PP(1)); int slotno; + LWLock *lock = SimpleLruPageGetSLRULock(TestSlruCtl, pageno); - LWLockAcquire(TestSLRULock, LW_EXCLUSIVE); - + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = SimpleLruZeroPage(TestSlruCtl, pageno); /* these should match */ @@ -80,7 +80,7 @@ test_slru_page_write(PG_FUNCTION_ARGS) BLCKSZ - 1); SimpleLruWritePage(TestSlruCtl, slotno); - LWLockRelease(TestSLRULock); + LWLockRelease(lock); PG_RETURN_VOID(); } @@ -99,13 +99,14 @@ test_slru_page_read(PG_FUNCTION_ARGS) bool write_ok = PG_GETARG_BOOL(1); char *data = NULL; int slotno; + LWLock *lock = SimpleLruPageGetSLRULock(TestSlruCtl, pageno); /* find page in buffers, reading it if necessary */ - LWLockAcquire(TestSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = SimpleLruReadPage(TestSlruCtl, pageno, write_ok, InvalidTransactionId); data = (char *) TestSlruCtl->shared->page_buffer[slotno]; - LWLockRelease(TestSLRULock); + LWLockRelease(lock); PG_RETURN_TEXT_P(cstring_to_text(data)); } @@ -116,14 +117,15 @@ test_slru_page_readonly(PG_FUNCTION_ARGS) int pageno = PG_GETARG_INT32(0); char *data = NULL; int slotno; + LWLock *lock = SimpleLruPageGetSLRULock(TestSlruCtl, pageno); /* find page in buffers, reading it if necessary */ slotno = SimpleLruReadPage_ReadOnly(TestSlruCtl, pageno, InvalidTransactionId); - Assert(LWLockHeldByMe(TestSLRULock)); + Assert(LWLockHeldByMe(lock)); data = (char *) TestSlruCtl->shared->page_buffer[slotno]; - LWLockRelease(TestSLRULock); + LWLockRelease(lock); PG_RETURN_TEXT_P(cstring_to_text(data)); } @@ -133,10 +135,11 @@ test_slru_page_exists(PG_FUNCTION_ARGS) { int pageno = PG_GETARG_INT32(0); bool found; + LWLock *lock = SimpleLruPageGetSLRULock(TestSlruCtl, pageno); - LWLockAcquire(TestSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); found = SimpleLruDoesPhysicalPageExist(TestSlruCtl, pageno); - LWLockRelease(TestSLRULock); + LWLockRelease(lock); PG_RETURN_BOOL(found); } @@ -215,6 +218,7 @@ test_slru_shmem_startup(void) { const char slru_dir_name[] = "pg_test_slru"; int test_tranche_id; + int test_buffer_tranche_id; if (prev_shmem_startup_hook) prev_shmem_startup_hook(); @@ -228,11 +232,13 @@ test_slru_shmem_startup(void) /* initialize the SLRU facility */ test_tranche_id = LWLockNewTrancheId(); LWLockRegisterTranche(test_tranche_id, "test_slru_tranche"); - LWLockInitialize(TestSLRULock, test_tranche_id); + + test_buffer_tranche_id = LWLockNewTrancheId(); + LWLockRegisterTranche(test_tranche_id, "test_buffer_tranche"); TestSlruCtl->PagePrecedes = test_slru_page_precedes_logically; SimpleLruInit(TestSlruCtl, "TestSLRU", - NUM_TEST_BUFFERS, 0, TestSLRULock, slru_dir_name, + NUM_TEST_BUFFERS, 0, slru_dir_name, test_buffer_tranche_id, test_tranche_id, SYNC_HANDLER_NONE); } -- 2.39.2 (Apple Git-143)