diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c new file mode 100644 index 3ae2848..3e70792 *** a/src/backend/storage/buffer/buf_init.c --- b/src/backend/storage/buffer/buf_init.c *************** InitBufferPool(void) *** 95,106 **** BufferDesc *buf = GetBufferDescriptor(i); CLEAR_BUFFERTAG(buf->tag); - buf->flags = 0; - buf->usage_count = 0; - buf->refcount = 0; - buf->wait_backend_pid = 0; ! SpinLockInit(&buf->buf_hdr_lock); buf->buf_id = i; --- 95,103 ---- BufferDesc *buf = GetBufferDescriptor(i); CLEAR_BUFFERTAG(buf->tag); ! pg_atomic_init_u32(&buf->state, 0); ! buf->wait_backend_pid = 0; buf->buf_id = i; diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c new file mode 100644 index 63188a3..2ebd4a9 *** a/src/backend/storage/buffer/bufmgr.c --- b/src/backend/storage/buffer/bufmgr.c *************** *** 51,57 **** #include "utils/resowner_private.h" #include "utils/timestamp.h" - /* Note: these two macros only work on shared buffers, not local ones! */ #define BufHdrGetBlock(bufHdr) ((Block) (BufferBlocks + ((Size) (bufHdr)->buf_id) * BLCKSZ)) #define BufferGetLSN(bufHdr) (PageGetLSN(BufHdrGetBlock(bufHdr))) --- 51,56 ---- *************** static BufferDesc *PinCountWaitBuf = NUL *** 126,132 **** * entry using ReservePrivateRefCountEntry() and then later, if necessary, * fill it with NewPrivateRefCountEntry(). That split lets us avoid doing * memory allocations in NewPrivateRefCountEntry() which can be important ! * because in some scenarios it's called with a spinlock held... */ static struct PrivateRefCountEntry PrivateRefCountArray[REFCOUNT_ARRAY_ENTRIES]; static HTAB *PrivateRefCountHash = NULL; --- 125,131 ---- * entry using ReservePrivateRefCountEntry() and then later, if necessary, * fill it with NewPrivateRefCountEntry(). That split lets us avoid doing * memory allocations in NewPrivateRefCountEntry() which can be important ! * because in some scenarios it's called with a header lock held... */ static struct PrivateRefCountEntry PrivateRefCountArray[REFCOUNT_ARRAY_ENTRIES]; static HTAB *PrivateRefCountHash = NULL; *************** ReadBuffer_common(SMgrRelation smgr, cha *** 774,782 **** */ if (isLocalBuf) { ! /* Only need to adjust flags */ ! Assert(bufHdr->flags & BM_VALID); ! bufHdr->flags &= ~BM_VALID; } else { --- 773,780 ---- */ if (isLocalBuf) { ! Assert(pg_atomic_read_u32(&bufHdr->state) & BM_VALID); ! pg_atomic_fetch_and_u32(&bufHdr->state, ~BM_VALID); } else { *************** ReadBuffer_common(SMgrRelation smgr, cha *** 788,795 **** do { LockBufHdr(bufHdr); ! Assert(bufHdr->flags & BM_VALID); ! bufHdr->flags &= ~BM_VALID; UnlockBufHdr(bufHdr); } while (!StartBufferIO(bufHdr, true)); } --- 786,793 ---- do { LockBufHdr(bufHdr); ! Assert(pg_atomic_read_u32(&bufHdr->state) & BM_VALID); ! pg_atomic_fetch_and_u32(&bufHdr->state, ~BM_VALID); UnlockBufHdr(bufHdr); } while (!StartBufferIO(bufHdr, true)); } *************** ReadBuffer_common(SMgrRelation smgr, cha *** 807,813 **** * it's not been recycled) but come right back here to try smgrextend * again. */ ! Assert(!(bufHdr->flags & BM_VALID)); /* spinlock not needed */ bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr); --- 805,811 ---- * it's not been recycled) but come right back here to try smgrextend * again. */ ! Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* header lock not needed */ bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr); *************** ReadBuffer_common(SMgrRelation smgr, cha *** 885,891 **** if (isLocalBuf) { /* Only need to adjust flags */ ! bufHdr->flags |= BM_VALID; } else { --- 883,889 ---- if (isLocalBuf) { /* Only need to adjust flags */ ! pg_atomic_fetch_or_u32(&bufHdr->state, BM_VALID); } else { *************** BufferAlloc(SMgrRelation smgr, char relp *** 939,945 **** BufferTag oldTag; /* previous identity of selected buffer */ uint32 oldHash; /* hash value for oldTag */ LWLock *oldPartitionLock; /* buffer partition lock for it */ ! BufFlags oldFlags; int buf_id; BufferDesc *buf; bool valid; --- 937,943 ---- BufferTag oldTag; /* previous identity of selected buffer */ uint32 oldHash; /* hash value for oldTag */ LWLock *oldPartitionLock; /* buffer partition lock for it */ ! uint32 oldFlags; int buf_id; BufferDesc *buf; bool valid; *************** BufferAlloc(SMgrRelation smgr, char relp *** 1001,1024 **** /* Loop here in case we have to try another victim buffer */ for (;;) { /* ! * Ensure, while the spinlock's not yet held, that there's a free * refcount entry. */ ReservePrivateRefCountEntry(); /* * Select a victim buffer. The buffer is returned with its header ! * spinlock still held! */ ! buf = StrategyGetBuffer(strategy); ! Assert(buf->refcount == 0); ! /* Must copy buffer flags while we still hold the spinlock */ ! oldFlags = buf->flags; ! /* Pin the buffer and then release the buffer spinlock */ PinBuffer_Locked(buf); /* --- 999,1024 ---- /* Loop here in case we have to try another victim buffer */ for (;;) { + uint32 state; + /* ! * Ensure, while the header lock isn't yet held, that there's a free * refcount entry. */ ReservePrivateRefCountEntry(); /* * Select a victim buffer. The buffer is returned with its header ! * lock still held! */ ! buf = StrategyGetBuffer(strategy, &state); ! Assert(BUF_STATE_GET_REFCOUNT(state) == 0); ! /* Must copy buffer flags while we still hold the header lock */ ! oldFlags = state & BUF_FLAG_MASK; ! /* Pin the buffer and then release the buffer header lock */ PinBuffer_Locked(buf); /* *************** BufferAlloc(SMgrRelation smgr, char relp *** 1202,1208 **** /* * Need to lock the buffer header too in order to change its tag. */ ! LockBufHdr(buf); /* * Somebody could have pinned or re-dirtied the buffer while we were --- 1202,1208 ---- /* * Need to lock the buffer header too in order to change its tag. */ ! state = LockBufHdr(buf); /* * Somebody could have pinned or re-dirtied the buffer while we were *************** BufferAlloc(SMgrRelation smgr, char relp *** 1210,1217 **** * recycle this buffer; we must undo everything we've done and start * over with a new victim buffer. */ ! oldFlags = buf->flags; ! if (buf->refcount == 1 && !(oldFlags & BM_DIRTY)) break; UnlockBufHdr(buf); --- 1210,1217 ---- * recycle this buffer; we must undo everything we've done and start * over with a new victim buffer. */ ! oldFlags = state & BUF_FLAG_MASK; ! if (BUF_STATE_GET_REFCOUNT(state) == 1 && !(oldFlags & BM_DIRTY)) break; UnlockBufHdr(buf); *************** BufferAlloc(SMgrRelation smgr, char relp *** 1232,1243 **** * 1 so that the buffer can survive one clock-sweep pass.) */ buf->tag = newTag; ! buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT); if (relpersistence == RELPERSISTENCE_PERMANENT) ! buf->flags |= BM_TAG_VALID | BM_PERMANENT; else ! buf->flags |= BM_TAG_VALID; ! buf->usage_count = 1; UnlockBufHdr(buf); --- 1232,1250 ---- * 1 so that the buffer can survive one clock-sweep pass.) */ buf->tag = newTag; ! pg_atomic_fetch_and_u32(&buf->state, ! ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | ! BM_CHECKPOINT_NEEDED | BM_IO_ERROR | ! BM_PERMANENT | ! BUF_USAGECOUNT_MASK)); if (relpersistence == RELPERSISTENCE_PERMANENT) ! pg_atomic_fetch_or_u32(&buf->state, ! BM_TAG_VALID | BM_PERMANENT | ! BUF_USAGECOUNT_ONE); else ! pg_atomic_fetch_or_u32(&buf->state, ! BM_TAG_VALID | ! BUF_USAGECOUNT_ONE); UnlockBufHdr(buf); *************** BufferAlloc(SMgrRelation smgr, char relp *** 1267,1273 **** * InvalidateBuffer -- mark a shared buffer invalid and return it to the * freelist. * ! * The buffer header spinlock must be held at entry. We drop it before * returning. (This is sane because the caller must have locked the * buffer in order to be sure it should be dropped.) * --- 1274,1280 ---- * InvalidateBuffer -- mark a shared buffer invalid and return it to the * freelist. * ! * The buffer header lock must be held at entry. We drop it before * returning. (This is sane because the caller must have locked the * buffer in order to be sure it should be dropped.) * *************** InvalidateBuffer(BufferDesc *buf) *** 1286,1294 **** BufferTag oldTag; uint32 oldHash; /* hash value for oldTag */ LWLock *oldPartitionLock; /* buffer partition lock for it */ ! BufFlags oldFlags; ! /* Save the original buffer tag before dropping the spinlock */ oldTag = buf->tag; UnlockBufHdr(buf); --- 1293,1302 ---- BufferTag oldTag; uint32 oldHash; /* hash value for oldTag */ LWLock *oldPartitionLock; /* buffer partition lock for it */ ! uint32 oldFlags; ! uint32 state; ! /* Save the original buffer tag before dropping the header lock */ oldTag = buf->tag; UnlockBufHdr(buf); *************** retry: *** 1310,1316 **** LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE); /* Re-lock the buffer header */ ! LockBufHdr(buf); /* If it's changed while we were waiting for lock, do nothing */ if (!BUFFERTAGS_EQUAL(buf->tag, oldTag)) --- 1318,1324 ---- LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE); /* Re-lock the buffer header */ ! state = LockBufHdr(buf); /* If it's changed while we were waiting for lock, do nothing */ if (!BUFFERTAGS_EQUAL(buf->tag, oldTag)) *************** retry: *** 1329,1335 **** * yet done StartBufferIO, WaitIO will fall through and we'll effectively * be busy-looping here.) */ ! if (buf->refcount != 0) { UnlockBufHdr(buf); LWLockRelease(oldPartitionLock); --- 1337,1343 ---- * yet done StartBufferIO, WaitIO will fall through and we'll effectively * be busy-looping here.) */ ! if (BUF_STATE_GET_REFCOUNT(state) != 0) { UnlockBufHdr(buf); LWLockRelease(oldPartitionLock); *************** retry: *** 1344,1353 **** * Clear out the buffer's tag and flags. We must do this to ensure that * linear scans of the buffer array don't think the buffer is valid. */ ! oldFlags = buf->flags; CLEAR_BUFFERTAG(buf->tag); ! buf->flags = 0; ! buf->usage_count = 0; UnlockBufHdr(buf); --- 1352,1360 ---- * Clear out the buffer's tag and flags. We must do this to ensure that * linear scans of the buffer array don't think the buffer is valid. */ ! oldFlags = state & BUF_FLAG_MASK; CLEAR_BUFFERTAG(buf->tag); ! pg_atomic_fetch_and_u32(&buf->state, BM_LOCKED | ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK)); UnlockBufHdr(buf); *************** void *** 1381,1386 **** --- 1388,1394 ---- MarkBufferDirty(Buffer buffer) { BufferDesc *bufHdr; + uint32 state; if (!BufferIsValid(buffer)) elog(ERROR, "bad buffer ID: %d", buffer); *************** MarkBufferDirty(Buffer buffer) *** 1397,1410 **** /* unfortunately we can't check if the lock is held exclusively */ Assert(LWLockHeldByMe(bufHdr->content_lock)); ! LockBufHdr(bufHdr); ! Assert(bufHdr->refcount > 0); /* * If the buffer was not dirty already, do vacuum accounting. */ ! if (!(bufHdr->flags & BM_DIRTY)) { VacuumPageDirty++; pgBufferUsage.shared_blks_dirtied++; --- 1405,1418 ---- /* unfortunately we can't check if the lock is held exclusively */ Assert(LWLockHeldByMe(bufHdr->content_lock)); ! state = LockBufHdr(bufHdr); ! Assert(BUF_STATE_GET_REFCOUNT(state) > 0); /* * If the buffer was not dirty already, do vacuum accounting. */ ! if (!(state & BM_DIRTY)) { VacuumPageDirty++; pgBufferUsage.shared_blks_dirtied++; *************** MarkBufferDirty(Buffer buffer) *** 1412,1418 **** VacuumCostBalance += VacuumCostPageDirty; } ! bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED); UnlockBufHdr(bufHdr); } --- 1420,1426 ---- VacuumCostBalance += VacuumCostPageDirty; } ! pg_atomic_fetch_or_u32(&bufHdr->state, BM_DIRTY | BM_JUST_DIRTIED); UnlockBufHdr(bufHdr); } *************** ReleaseAndReadBuffer(Buffer buffer, *** 1454,1460 **** else { bufHdr = GetBufferDescriptor(buffer - 1); ! /* we have pin, so it's ok to examine tag without spinlock */ if (bufHdr->tag.blockNum == blockNum && RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) && bufHdr->tag.forkNum == forkNum) --- 1462,1468 ---- else { bufHdr = GetBufferDescriptor(buffer - 1); ! /* we have pin, so it's ok to examine tag without header lock */ if (bufHdr->tag.blockNum == blockNum && RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) && bufHdr->tag.forkNum == forkNum) *************** ReleaseAndReadBuffer(Buffer buffer, *** 1482,1488 **** * Note that ResourceOwnerEnlargeBuffers must have been done already. * * Returns TRUE if buffer is BM_VALID, else FALSE. This provision allows ! * some callers to avoid an extra spinlock cycle. */ static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) --- 1490,1496 ---- * Note that ResourceOwnerEnlargeBuffers must have been done already. * * Returns TRUE if buffer is BM_VALID, else FALSE. This provision allows ! * some callers to avoid an extra header lock cycle. */ static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) *************** PinBuffer(BufferDesc *buf, BufferAccessS *** 1495,1517 **** if (ref == NULL) { ReservePrivateRefCountEntry(); ref = NewPrivateRefCountEntry(b); ! LockBufHdr(buf); ! buf->refcount++; ! if (strategy == NULL) ! { ! if (buf->usage_count < BM_MAX_USAGE_COUNT) ! buf->usage_count++; ! } ! else { ! if (buf->usage_count == 0) ! buf->usage_count = 1; } ! result = (buf->flags & BM_VALID) != 0; ! UnlockBufHdr(buf); } else { --- 1503,1542 ---- if (ref == NULL) { + uint32 state; + uint32 oldstate; + ReservePrivateRefCountEntry(); ref = NewPrivateRefCountEntry(b); ! ! state = pg_atomic_read_u32(&buf->state); ! oldstate = state; ! ! while (true) { ! /* spin-wait till lock is free */ ! while (state & BM_LOCKED) ! { ! pg_spin_delay(); ! state = pg_atomic_read_u32(&buf->state); ! oldstate = state; ! } ! ! /* increase refcount */ ! state += BUF_REFCOUNT_ONE; ! ! /* increase usagecount unless already max */ ! if (BUF_STATE_GET_USAGECOUNT(state) != BM_MAX_USAGE_COUNT) ! state += BUF_USAGECOUNT_ONE; ! ! if (pg_atomic_compare_exchange_u32(&buf->state, &oldstate, state)) ! break; ! ! /* get ready for next loop, oldstate has been updated by cas */ ! state = oldstate; } ! result = (state & BM_VALID) != 0; } else { *************** PinBuffer(BufferDesc *buf, BufferAccessS *** 1527,1535 **** /* * PinBuffer_Locked -- as above, but caller already locked the buffer header. ! * The spinlock is released before return. * ! * As this function is called with the spinlock held, the caller has to * previously call ReservePrivateRefCountEntry(). * * Currently, no callers of this function want to modify the buffer's --- 1552,1560 ---- /* * PinBuffer_Locked -- as above, but caller already locked the buffer header. ! * The header lock is released before return. * ! * As this function is called with the header lock held, the caller has to * previously call ReservePrivateRefCountEntry(). * * Currently, no callers of this function want to modify the buffer's *************** PinBuffer(BufferDesc *buf, BufferAccessS *** 1540,1546 **** * Also all callers only ever use this function when it's known that the * buffer can't have a preexisting pin by this backend. That allows us to skip * searching the private refcount array & hash, which is a boon, because the ! * spinlock is still held. * * Note: use of this routine is frequently mandatory, not just an optimization * to save a spin lock/unlock cycle, because we need to pin a buffer before --- 1565,1571 ---- * Also all callers only ever use this function when it's known that the * buffer can't have a preexisting pin by this backend. That allows us to skip * searching the private refcount array & hash, which is a boon, because the ! * header lock is still held. * * Note: use of this routine is frequently mandatory, not just an optimization * to save a spin lock/unlock cycle, because we need to pin a buffer before *************** PinBuffer_Locked(BufferDesc *buf) *** 1554,1564 **** /* * As explained, We don't expect any preexisting pins. That allows us to ! * manipulate the PrivateRefCount after releasing the spinlock */ Assert(GetPrivateRefCountEntry(BufferDescriptorGetBuffer(buf), false) == NULL); ! buf->refcount++; UnlockBufHdr(buf); b = BufferDescriptorGetBuffer(buf); --- 1579,1589 ---- /* * As explained, We don't expect any preexisting pins. That allows us to ! * manipulate the PrivateRefCount after releasing the header lock */ Assert(GetPrivateRefCountEntry(BufferDescriptorGetBuffer(buf), false) == NULL); ! pg_atomic_fetch_add_u32(&buf->state, 1); UnlockBufHdr(buf); b = BufferDescriptorGetBuffer(buf); *************** UnpinBuffer(BufferDesc *buf, bool fixOwn *** 1594,1623 **** ref->refcount--; if (ref->refcount == 0) { /* I'd better not still hold any locks on the buffer */ Assert(!LWLockHeldByMe(buf->content_lock)); Assert(!LWLockHeldByMe(buf->io_in_progress_lock)); ! LockBufHdr(buf); ! ! /* Decrement the shared reference count */ ! Assert(buf->refcount > 0); ! buf->refcount--; /* Support LockBufferForCleanup() */ ! if ((buf->flags & BM_PIN_COUNT_WAITER) && ! buf->refcount == 1) { ! /* we just released the last pin other than the waiter's */ ! int wait_backend_pid = buf->wait_backend_pid; ! buf->flags &= ~BM_PIN_COUNT_WAITER; ! UnlockBufHdr(buf); ! ProcSendSignal(wait_backend_pid); ! } ! else ! UnlockBufHdr(buf); ForgetPrivateRefCountEntry(ref); } } --- 1619,1658 ---- ref->refcount--; if (ref->refcount == 0) { + uint32 state; + /* I'd better not still hold any locks on the buffer */ Assert(!LWLockHeldByMe(buf->content_lock)); Assert(!LWLockHeldByMe(buf->io_in_progress_lock)); ! /* ! * Decrement the shared reference count. ! * ! * Arguably it'd be more robust if we checked for BM_LOCKED here, but ! * currently all manipulation of ->state for shared buffers is through ! * atomics. ! */ ! state = pg_atomic_fetch_sub_u32(&buf->state, BUF_REFCOUNT_ONE); ! Assert(BUF_STATE_GET_REFCOUNT(state) > 0); /* Support LockBufferForCleanup() */ ! if (state & BM_PIN_COUNT_WAITER) { ! state = LockBufHdr(buf); ! if (state & BM_PIN_COUNT_WAITER && BUF_STATE_GET_REFCOUNT(state) == 1) ! { ! /* we just released the last pin other than the waiter's */ ! int wait_backend_pid = buf->wait_backend_pid; + pg_atomic_fetch_and_u32(&buf->state, + ~BM_PIN_COUNT_WAITER); + UnlockBufHdr(buf); + ProcSendSignal(wait_backend_pid); + } + else + UnlockBufHdr(buf); + } ForgetPrivateRefCountEntry(ref); } } *************** UnpinBuffer(BufferDesc *buf, bool fixOwn *** 1635,1640 **** --- 1670,1676 ---- static void BufferSync(int flags) { + uint32 state; int buf_id; int num_to_scan; int num_to_write; *************** BufferSync(int flags) *** 1675,1688 **** BufferDesc *bufHdr = GetBufferDescriptor(buf_id); /* ! * Header spinlock is enough to examine BM_DIRTY, see comment in * SyncOneBuffer. */ ! LockBufHdr(bufHdr); ! if ((bufHdr->flags & mask) == mask) { ! bufHdr->flags |= BM_CHECKPOINT_NEEDED; num_to_write++; } --- 1711,1725 ---- BufferDesc *bufHdr = GetBufferDescriptor(buf_id); /* ! * Header lock is enough to examine BM_DIRTY, see comment in * SyncOneBuffer. */ ! state = LockBufHdr(bufHdr); ! if ((state & mask) == mask) { ! pg_atomic_fetch_or_u32(&bufHdr->state, ! BM_CHECKPOINT_NEEDED); num_to_write++; } *************** BufferSync(int flags) *** 1721,1727 **** * write the buffer though we didn't need to. It doesn't seem worth * guarding against this, though. */ ! if (bufHdr->flags & BM_CHECKPOINT_NEEDED) { if (SyncOneBuffer(buf_id, false) & BUF_WRITTEN) { --- 1758,1764 ---- * write the buffer though we didn't need to. It doesn't seem worth * guarding against this, though. */ ! if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED) { if (SyncOneBuffer(buf_id, false) & BUF_WRITTEN) { *************** SyncOneBuffer(int buf_id, bool skip_rece *** 2081,2086 **** --- 2118,2124 ---- { BufferDesc *bufHdr = GetBufferDescriptor(buf_id); int result = 0; + uint32 state; ReservePrivateRefCountEntry(); *************** SyncOneBuffer(int buf_id, bool skip_rece *** 2093,2102 **** * don't worry because our checkpoint.redo points before log record for * upcoming changes and so we are not required to write such dirty buffer. */ ! LockBufHdr(bufHdr); ! if (bufHdr->refcount == 0 && bufHdr->usage_count == 0) result |= BUF_REUSABLE; else if (skip_recently_used) { /* Caller told us not to write recently-used buffers */ --- 2131,2143 ---- * don't worry because our checkpoint.redo points before log record for * upcoming changes and so we are not required to write such dirty buffer. */ ! state = LockBufHdr(bufHdr); ! if (BUF_STATE_GET_REFCOUNT(state) == 0 && ! BUF_STATE_GET_USAGECOUNT(state) == 0) ! { result |= BUF_REUSABLE; + } else if (skip_recently_used) { /* Caller told us not to write recently-used buffers */ *************** SyncOneBuffer(int buf_id, bool skip_rece *** 2104,2110 **** return result; } ! if (!(bufHdr->flags & BM_VALID) || !(bufHdr->flags & BM_DIRTY)) { /* It's clean, so nothing to do */ UnlockBufHdr(bufHdr); --- 2145,2151 ---- return result; } ! if (!(state & BM_VALID) || !(state & BM_DIRTY)) { /* It's clean, so nothing to do */ UnlockBufHdr(bufHdr); *************** PrintBufferLeakWarning(Buffer buffer) *** 2256,2261 **** --- 2297,2303 ---- int32 loccount; char *path; BackendId backend; + uint32 state; Assert(BufferIsValid(buffer)); if (BufferIsLocal(buffer)) *************** PrintBufferLeakWarning(Buffer buffer) *** 2273,2284 **** /* theoretically we should lock the bufhdr here */ path = relpathbackend(buf->tag.rnode, backend, buf->tag.forkNum); elog(WARNING, "buffer refcount leak: [%03d] " "(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)", buffer, path, ! buf->tag.blockNum, buf->flags, ! buf->refcount, loccount); pfree(path); } --- 2315,2327 ---- /* theoretically we should lock the bufhdr here */ path = relpathbackend(buf->tag.rnode, backend, buf->tag.forkNum); + state = pg_atomic_read_u32(&buf->state); elog(WARNING, "buffer refcount leak: [%03d] " "(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)", buffer, path, ! buf->tag.blockNum, state & BUF_FLAG_MASK, ! BUF_STATE_GET_REFCOUNT(state), loccount); pfree(path); } *************** BufferGetBlockNumber(Buffer buffer) *** 2333,2339 **** else bufHdr = GetBufferDescriptor(buffer - 1); ! /* pinned, so OK to read tag without spinlock */ return bufHdr->tag.blockNum; } --- 2376,2382 ---- else bufHdr = GetBufferDescriptor(buffer - 1); ! /* pinned, so OK to read tag without lock */ return bufHdr->tag.blockNum; } *************** BufferGetTag(Buffer buffer, RelFileNode *** 2356,2362 **** else bufHdr = GetBufferDescriptor(buffer - 1); ! /* pinned, so OK to read tag without spinlock */ *rnode = bufHdr->tag.rnode; *forknum = bufHdr->tag.forkNum; *blknum = bufHdr->tag.blockNum; --- 2399,2405 ---- else bufHdr = GetBufferDescriptor(buffer - 1); ! /* pinned, so OK to read tag without lock */ *rnode = bufHdr->tag.rnode; *forknum = bufHdr->tag.forkNum; *blknum = bufHdr->tag.blockNum; *************** FlushBuffer(BufferDesc *buf, SMgrRelatio *** 2424,2430 **** recptr = BufferGetLSN(buf); /* To check if block content changes while flushing. - vadim 01/17/97 */ ! buf->flags &= ~BM_JUST_DIRTIED; UnlockBufHdr(buf); /* --- 2467,2473 ---- recptr = BufferGetLSN(buf); /* To check if block content changes while flushing. - vadim 01/17/97 */ ! pg_atomic_fetch_and_u32(&buf->state, ~BM_JUST_DIRTIED); UnlockBufHdr(buf); /* *************** FlushBuffer(BufferDesc *buf, SMgrRelatio *** 2444,2450 **** * disastrous system-wide consequences. To make sure that can't happen, * skip the flush if the buffer isn't permanent. */ ! if (buf->flags & BM_PERMANENT) XLogFlush(recptr); /* --- 2487,2493 ---- * disastrous system-wide consequences. To make sure that can't happen, * skip the flush if the buffer isn't permanent. */ ! if (pg_atomic_read_u32(&buf->state) & BM_PERMANENT) XLogFlush(recptr); /* *************** BufferIsPermanent(Buffer buffer) *** 2532,2544 **** /* * BM_PERMANENT can't be changed while we hold a pin on the buffer, so we ! * need not bother with the buffer header spinlock. Even if someone else * changes the buffer header flags while we're doing this, we assume that * changing an aligned 2-byte BufFlags value is atomic, so we'll read the * old value or the new value, but not random garbage. */ bufHdr = GetBufferDescriptor(buffer - 1); ! return (bufHdr->flags & BM_PERMANENT) != 0; } /* --- 2575,2587 ---- /* * BM_PERMANENT can't be changed while we hold a pin on the buffer, so we ! * need not bother with the buffer header lock. Even if someone else * changes the buffer header flags while we're doing this, we assume that * changing an aligned 2-byte BufFlags value is atomic, so we'll read the * old value or the new value, but not random garbage. */ bufHdr = GetBufferDescriptor(buffer - 1); ! return (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT) != 0; } /* *************** DropRelFileNodeBuffers(RelFileNodeBacken *** 2638,2644 **** if (RelFileNodeEquals(bufHdr->tag.rnode, rnode.node) && bufHdr->tag.forkNum == forkNum && bufHdr->tag.blockNum >= firstDelBlock) ! InvalidateBuffer(bufHdr); /* releases spinlock */ else UnlockBufHdr(bufHdr); } --- 2681,2687 ---- if (RelFileNodeEquals(bufHdr->tag.rnode, rnode.node) && bufHdr->tag.forkNum == forkNum && bufHdr->tag.blockNum >= firstDelBlock) ! InvalidateBuffer(bufHdr); /* releases lock */ else UnlockBufHdr(bufHdr); } *************** DropRelFileNodesAllBuffers(RelFileNodeBa *** 2736,2742 **** LockBufHdr(bufHdr); if (RelFileNodeEquals(bufHdr->tag.rnode, (*rnode))) ! InvalidateBuffer(bufHdr); /* releases spinlock */ else UnlockBufHdr(bufHdr); } --- 2779,2785 ---- LockBufHdr(bufHdr); if (RelFileNodeEquals(bufHdr->tag.rnode, (*rnode))) ! InvalidateBuffer(bufHdr); /* releases lock */ else UnlockBufHdr(bufHdr); } *************** DropDatabaseBuffers(Oid dbid) *** 2778,2784 **** LockBufHdr(bufHdr); if (bufHdr->tag.rnode.dbNode == dbid) ! InvalidateBuffer(bufHdr); /* releases spinlock */ else UnlockBufHdr(bufHdr); } --- 2821,2827 ---- LockBufHdr(bufHdr); if (bufHdr->tag.rnode.dbNode == dbid) ! InvalidateBuffer(bufHdr); /* releases lock */ else UnlockBufHdr(bufHdr); } *************** FlushRelationBuffers(Relation rel) *** 2874,2880 **** { bufHdr = GetLocalBufferDescriptor(i); if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) && ! (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) { ErrorContextCallback errcallback; Page localpage; --- 2917,2924 ---- { bufHdr = GetLocalBufferDescriptor(i); if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) && ! (pg_atomic_read_u32(&bufHdr->state) & (BM_VALID | BM_DIRTY)) ! == (BM_VALID | BM_DIRTY)) { ErrorContextCallback errcallback; Page localpage; *************** FlushRelationBuffers(Relation rel) *** 2895,2901 **** localpage, false); ! bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED); /* Pop the error context stack */ error_context_stack = errcallback.previous; --- 2939,2945 ---- localpage, false); ! pg_atomic_fetch_and_u32(&bufHdr->state, ~(BM_DIRTY | BM_JUST_DIRTIED)); /* Pop the error context stack */ error_context_stack = errcallback.previous; *************** FlushRelationBuffers(Relation rel) *** 2923,2929 **** LockBufHdr(bufHdr); if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) && ! (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) { PinBuffer_Locked(bufHdr); LWLockAcquire(bufHdr->content_lock, LW_SHARED); --- 2967,2974 ---- LockBufHdr(bufHdr); if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) && ! (pg_atomic_read_u32(&bufHdr->state) & (BM_VALID | BM_DIRTY)) ! == (BM_VALID | BM_DIRTY)) { PinBuffer_Locked(bufHdr); LWLockAcquire(bufHdr->content_lock, LW_SHARED); *************** FlushDatabaseBuffers(Oid dbid) *** 2975,2981 **** LockBufHdr(bufHdr); if (bufHdr->tag.rnode.dbNode == dbid && ! (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) { PinBuffer_Locked(bufHdr); LWLockAcquire(bufHdr->content_lock, LW_SHARED); --- 3020,3027 ---- LockBufHdr(bufHdr); if (bufHdr->tag.rnode.dbNode == dbid && ! (pg_atomic_read_u32(&bufHdr->state) & (BM_VALID | BM_DIRTY)) ! == (BM_VALID | BM_DIRTY)) { PinBuffer_Locked(bufHdr); LWLockAcquire(bufHdr->content_lock, LW_SHARED); *************** MarkBufferDirtyHint(Buffer buffer, bool *** 3086,3104 **** * This routine might get called many times on the same page, if we are * making the first scan after commit of an xact that added/deleted many * tuples. So, be as quick as we can if the buffer is already dirty. We ! * do this by not acquiring spinlock if it looks like the status bits are * already set. Since we make this test unlocked, there's a chance we * might fail to notice that the flags have just been cleared, and failed * to reset them, due to memory-ordering issues. But since this function * is only intended to be used in cases where failing to write out the * data would be harmless anyway, it doesn't really matter. */ ! if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) != (BM_DIRTY | BM_JUST_DIRTIED)) { XLogRecPtr lsn = InvalidXLogRecPtr; bool dirtied = false; bool delayChkpt = false; /* * If we need to protect hint bit updates from torn writes, WAL-log a --- 3132,3151 ---- * This routine might get called many times on the same page, if we are * making the first scan after commit of an xact that added/deleted many * tuples. So, be as quick as we can if the buffer is already dirty. We ! * do this by not acquiring header lock if it looks like the status bits are * already set. Since we make this test unlocked, there's a chance we * might fail to notice that the flags have just been cleared, and failed * to reset them, due to memory-ordering issues. But since this function * is only intended to be used in cases where failing to write out the * data would be harmless anyway, it doesn't really matter. */ ! if ((pg_atomic_read_u32(&bufHdr->state) & (BM_DIRTY | BM_JUST_DIRTIED)) != (BM_DIRTY | BM_JUST_DIRTIED)) { XLogRecPtr lsn = InvalidXLogRecPtr; bool dirtied = false; bool delayChkpt = false; + uint32 state; /* * If we need to protect hint bit updates from torn writes, WAL-log a *************** MarkBufferDirtyHint(Buffer buffer, bool *** 3109,3115 **** * We don't check full_page_writes here because that logic is included * when we call XLogInsert() since the value changes dynamically. */ ! if (XLogHintBitIsNeeded() && (bufHdr->flags & BM_PERMANENT)) { /* * If we're in recovery we cannot dirty a page because of a hint. --- 3156,3162 ---- * We don't check full_page_writes here because that logic is included * when we call XLogInsert() since the value changes dynamically. */ ! if (XLogHintBitIsNeeded() && (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT)) { /* * If we're in recovery we cannot dirty a page because of a hint. *************** MarkBufferDirtyHint(Buffer buffer, bool *** 3149,3156 **** } LockBufHdr(bufHdr); ! Assert(bufHdr->refcount > 0); ! if (!(bufHdr->flags & BM_DIRTY)) { dirtied = true; /* Means "will be dirtied by this action" */ --- 3196,3207 ---- } LockBufHdr(bufHdr); ! ! state = pg_atomic_read_u32(&bufHdr->state); ! ! Assert(BUF_STATE_GET_REFCOUNT(state) > 0); ! ! if (!(state & BM_DIRTY)) { dirtied = true; /* Means "will be dirtied by this action" */ *************** MarkBufferDirtyHint(Buffer buffer, bool *** 3170,3176 **** if (!XLogRecPtrIsInvalid(lsn)) PageSetLSN(page, lsn); } ! bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED); UnlockBufHdr(bufHdr); if (delayChkpt) --- 3221,3229 ---- if (!XLogRecPtrIsInvalid(lsn)) PageSetLSN(page, lsn); } ! ! pg_atomic_fetch_or_u32(&bufHdr->state, BM_DIRTY | BM_JUST_DIRTIED); ! UnlockBufHdr(bufHdr); if (delayChkpt) *************** UnlockBuffers(void) *** 3208,3216 **** * Don't complain if flag bit not set; it could have been reset but we * got a cancel/die interrupt before getting the signal. */ ! if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 && buf->wait_backend_pid == MyProcPid) ! buf->flags &= ~BM_PIN_COUNT_WAITER; UnlockBufHdr(buf); --- 3261,3269 ---- * Don't complain if flag bit not set; it could have been reset but we * got a cancel/die interrupt before getting the signal. */ ! if ((pg_atomic_read_u32(&buf->state) & BM_PIN_COUNT_WAITER) != 0 && buf->wait_backend_pid == MyProcPid) ! pg_atomic_fetch_and_u32(&buf->state, ~BM_PIN_COUNT_WAITER); UnlockBufHdr(buf); *************** LockBufferForCleanup(Buffer buffer) *** 3304,3328 **** for (;;) { /* Try to acquire lock */ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); ! LockBufHdr(bufHdr); ! Assert(bufHdr->refcount > 0); ! if (bufHdr->refcount == 1) { /* Successfully acquired exclusive lock with pincount 1 */ UnlockBufHdr(bufHdr); return; } /* Failed, so mark myself as waiting for pincount 1 */ ! if (bufHdr->flags & BM_PIN_COUNT_WAITER) { UnlockBufHdr(bufHdr); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); elog(ERROR, "multiple backends attempting to wait for pincount 1"); } bufHdr->wait_backend_pid = MyProcPid; ! bufHdr->flags |= BM_PIN_COUNT_WAITER; PinCountWaitBuf = bufHdr; UnlockBufHdr(bufHdr); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); --- 3357,3384 ---- for (;;) { + int state; + /* Try to acquire lock */ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); ! state = LockBufHdr(bufHdr); ! ! Assert(BUF_STATE_GET_REFCOUNT(state) > 0); ! if (BUF_STATE_GET_REFCOUNT(state) == 1) { /* Successfully acquired exclusive lock with pincount 1 */ UnlockBufHdr(bufHdr); return; } /* Failed, so mark myself as waiting for pincount 1 */ ! if (state & BM_PIN_COUNT_WAITER) { UnlockBufHdr(bufHdr); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); elog(ERROR, "multiple backends attempting to wait for pincount 1"); } bufHdr->wait_backend_pid = MyProcPid; ! pg_atomic_fetch_or_u32(&bufHdr->state, BM_PIN_COUNT_WAITER); PinCountWaitBuf = bufHdr; UnlockBufHdr(bufHdr); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); *************** LockBufferForCleanup(Buffer buffer) *** 3349,3357 **** * better be safe. */ LockBufHdr(bufHdr); ! if ((bufHdr->flags & BM_PIN_COUNT_WAITER) != 0 && bufHdr->wait_backend_pid == MyProcPid) ! bufHdr->flags &= ~BM_PIN_COUNT_WAITER; UnlockBufHdr(bufHdr); PinCountWaitBuf = NULL; --- 3405,3413 ---- * better be safe. */ LockBufHdr(bufHdr); ! if ((pg_atomic_read_u32(&bufHdr->state) & BM_PIN_COUNT_WAITER) != 0 && bufHdr->wait_backend_pid == MyProcPid) ! pg_atomic_fetch_and_u32(&bufHdr->state, ~BM_PIN_COUNT_WAITER); UnlockBufHdr(bufHdr); PinCountWaitBuf = NULL; *************** bool *** 3393,3414 **** ConditionalLockBufferForCleanup(Buffer buffer) { BufferDesc *bufHdr; Assert(BufferIsValid(buffer)); if (BufferIsLocal(buffer)) { /* There should be exactly one pin */ ! Assert(LocalRefCount[-buffer - 1] > 0); ! if (LocalRefCount[-buffer - 1] != 1) return false; /* Nobody else to wait for */ return true; } /* There should be exactly one local pin */ ! Assert(GetPrivateRefCount(buffer) > 0); ! if (GetPrivateRefCount(buffer) != 1) return false; /* Try to acquire lock */ --- 3449,3474 ---- ConditionalLockBufferForCleanup(Buffer buffer) { BufferDesc *bufHdr; + uint32 state, + refcount; Assert(BufferIsValid(buffer)); if (BufferIsLocal(buffer)) { + refcount = LocalRefCount[-buffer - 1]; /* There should be exactly one pin */ ! Assert(refcount > 0); ! if (refcount != 1) return false; /* Nobody else to wait for */ return true; } /* There should be exactly one local pin */ ! refcount = GetPrivateRefCount(buffer); ! Assert(refcount); ! if (refcount != 1) return false; /* Try to acquire lock */ *************** ConditionalLockBufferForCleanup(Buffer b *** 3416,3424 **** return false; bufHdr = GetBufferDescriptor(buffer - 1); ! LockBufHdr(bufHdr); ! Assert(bufHdr->refcount > 0); ! if (bufHdr->refcount == 1) { /* Successfully acquired exclusive lock with pincount 1 */ UnlockBufHdr(bufHdr); --- 3476,3486 ---- return false; bufHdr = GetBufferDescriptor(buffer - 1); ! state = LockBufHdr(bufHdr); ! refcount = BUF_STATE_GET_REFCOUNT(state); ! ! Assert(refcount > 0); ! if (refcount == 1) { /* Successfully acquired exclusive lock with pincount 1 */ UnlockBufHdr(bufHdr); *************** WaitIO(BufferDesc *buf) *** 3456,3472 **** */ for (;;) { ! BufFlags sv_flags; /* ! * It may not be necessary to acquire the spinlock to check the flag * here, but since this test is essential for correctness, we'd better * play it safe. */ ! LockBufHdr(buf); ! sv_flags = buf->flags; UnlockBufHdr(buf); ! if (!(sv_flags & BM_IO_IN_PROGRESS)) break; LWLockAcquire(buf->io_in_progress_lock, LW_SHARED); LWLockRelease(buf->io_in_progress_lock); --- 3518,3534 ---- */ for (;;) { ! uint32 state; /* ! * It may not be necessary to acquire the header lock to check the flag * here, but since this test is essential for correctness, we'd better * play it safe. */ ! state = LockBufHdr(buf); UnlockBufHdr(buf); ! ! if (!(state & BM_IO_IN_PROGRESS)) break; LWLockAcquire(buf->io_in_progress_lock, LW_SHARED); LWLockRelease(buf->io_in_progress_lock); *************** WaitIO(BufferDesc *buf) *** 3494,3499 **** --- 3556,3563 ---- static bool StartBufferIO(BufferDesc *buf, bool forInput) { + uint32 state; + Assert(!InProgressBuf); for (;;) *************** StartBufferIO(BufferDesc *buf, bool forI *** 3504,3512 **** */ LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE); ! LockBufHdr(buf); ! if (!(buf->flags & BM_IO_IN_PROGRESS)) break; /* --- 3568,3576 ---- */ LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE); ! state = LockBufHdr(buf); ! if (!(state & BM_IO_IN_PROGRESS)) break; /* *************** StartBufferIO(BufferDesc *buf, bool forI *** 3522,3528 **** /* Once we get here, there is definitely no I/O active on this buffer */ ! if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY)) { /* someone else already did the I/O */ UnlockBufHdr(buf); --- 3586,3592 ---- /* Once we get here, there is definitely no I/O active on this buffer */ ! if (forInput ? (state & BM_VALID) : !(state & BM_DIRTY)) { /* someone else already did the I/O */ UnlockBufHdr(buf); *************** StartBufferIO(BufferDesc *buf, bool forI *** 3530,3536 **** return false; } ! buf->flags |= BM_IO_IN_PROGRESS; UnlockBufHdr(buf); --- 3594,3600 ---- return false; } ! pg_atomic_fetch_or_u32(&buf->state, BM_IO_IN_PROGRESS); UnlockBufHdr(buf); *************** StartBufferIO(BufferDesc *buf, bool forI *** 3560,3574 **** static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, int set_flag_bits) { Assert(buf == InProgressBuf); ! LockBufHdr(buf); ! Assert(buf->flags & BM_IO_IN_PROGRESS); ! buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR); ! if (clear_dirty && !(buf->flags & BM_JUST_DIRTIED)) ! buf->flags &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED); ! buf->flags |= set_flag_bits; UnlockBufHdr(buf); --- 3624,3642 ---- static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, int set_flag_bits) { + uint32 state; + Assert(buf == InProgressBuf); ! state = LockBufHdr(buf); ! Assert(state & BM_IO_IN_PROGRESS); ! ! pg_atomic_fetch_and_u32(&buf->state, ~(BM_IO_IN_PROGRESS | BM_IO_ERROR)); ! if (clear_dirty && !(pg_atomic_read_u32(&buf->state) & BM_JUST_DIRTIED)) ! pg_atomic_fetch_and_u32(&buf->state, ~(BM_DIRTY | BM_CHECKPOINT_NEEDED)); ! ! pg_atomic_fetch_or_u32(&buf->state, set_flag_bits); UnlockBufHdr(buf); *************** AbortBufferIO(void) *** 3593,3598 **** --- 3661,3667 ---- if (buf) { + uint32 state; /* * Since LWLockReleaseAll has already been called, we're not holding * the buffer's io_in_progress_lock. We have to re-acquire it so that *************** AbortBufferIO(void) *** 3601,3626 **** */ LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE); ! LockBufHdr(buf); ! Assert(buf->flags & BM_IO_IN_PROGRESS); if (IsForInput) { ! Assert(!(buf->flags & BM_DIRTY)); /* We'd better not think buffer is valid yet */ ! Assert(!(buf->flags & BM_VALID)); UnlockBufHdr(buf); } else { ! BufFlags sv_flags; ! ! sv_flags = buf->flags; ! Assert(sv_flags & BM_DIRTY); UnlockBufHdr(buf); /* Issue notice if this is not the first failure... */ ! if (sv_flags & BM_IO_ERROR) { ! /* Buffer is pinned, so we can read tag without spinlock */ char *path; path = relpathperm(buf->tag.rnode, buf->tag.forkNum); --- 3670,3693 ---- */ LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE); ! state = LockBufHdr(buf); ! Assert(state & BM_IO_IN_PROGRESS); if (IsForInput) { ! Assert(!(state & BM_DIRTY)); ! /* We'd better not think buffer is valid yet */ ! Assert(!(state & BM_VALID)); UnlockBufHdr(buf); } else { ! Assert(state & BM_DIRTY); UnlockBufHdr(buf); /* Issue notice if this is not the first failure... */ ! if (state & BM_IO_ERROR) { ! /* Buffer is pinned, so we can read tag without header lock */ char *path; path = relpathperm(buf->tag.rnode, buf->tag.forkNum); *************** shared_buffer_write_error_callback(void *** 3644,3650 **** { BufferDesc *bufHdr = (BufferDesc *) arg; ! /* Buffer is pinned, so we can read the tag without locking the spinlock */ if (bufHdr != NULL) { char *path = relpathperm(bufHdr->tag.rnode, bufHdr->tag.forkNum); --- 3711,3717 ---- { BufferDesc *bufHdr = (BufferDesc *) arg; ! /* Buffer is pinned, so we can read the tag without locking the header */ if (bufHdr != NULL) { char *path = relpathperm(bufHdr->tag.rnode, bufHdr->tag.forkNum); *************** rnode_comparator(const void *p1, const v *** 3700,3702 **** --- 3767,3800 ---- else return 0; } + + uint32 + LockBufHdr(volatile BufferDesc *desc) + { + uint32 state = pg_atomic_read_u32(&desc->state); + + for (;;) + { + /* wait till lock is free */ + while (state & BM_LOCKED) + { + pg_spin_delay(); + state = pg_atomic_read_u32(&desc->state); + + /* Add exponential backoff? Should seldomly be contended tho. */ + } + + /* and try to get lock */ + if (pg_atomic_compare_exchange_u32(&desc->state, &state, state | BM_LOCKED)) + break; + } + return state | BM_LOCKED; + } + + void + UnlockBufHdr(volatile BufferDesc *desc) + { + Assert(pg_atomic_read_u32(&desc->state) & BM_LOCKED); + + pg_atomic_sub_fetch_u32(&desc->state, BM_LOCKED); + } diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c new file mode 100644 index 0234572..8b64855 *** a/src/backend/storage/buffer/freelist.c --- b/src/backend/storage/buffer/freelist.c *************** typedef struct BufferAccessStrategyData *** 98,104 **** /* Prototypes for internal functions */ ! static BufferDesc *GetBufferFromRing(BufferAccessStrategy strategy); static void AddBufferToRing(BufferAccessStrategy strategy, BufferDesc *buf); --- 98,105 ---- /* Prototypes for internal functions */ ! static BufferDesc *GetBufferFromRing(BufferAccessStrategy strategy, ! uint32 *lockstate); static void AddBufferToRing(BufferAccessStrategy strategy, BufferDesc *buf); *************** ClockSweepTick(void) *** 180,186 **** * return the buffer with the buffer header spinlock still held. */ BufferDesc * ! StrategyGetBuffer(BufferAccessStrategy strategy) { BufferDesc *buf; int bgwprocno; --- 181,187 ---- * return the buffer with the buffer header spinlock still held. */ BufferDesc * ! StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *lockstate) { BufferDesc *buf; int bgwprocno; *************** StrategyGetBuffer(BufferAccessStrategy s *** 192,198 **** */ if (strategy != NULL) { ! buf = GetBufferFromRing(strategy); if (buf != NULL) return buf; } --- 193,199 ---- */ if (strategy != NULL) { ! buf = GetBufferFromRing(strategy, lockstate); if (buf != NULL) return buf; } *************** StrategyGetBuffer(BufferAccessStrategy s *** 250,255 **** --- 251,258 ---- { while (true) { + uint32 state; + /* Acquire the spinlock to remove element from the freelist */ SpinLockAcquire(&StrategyControl->buffer_strategy_lock); *************** StrategyGetBuffer(BufferAccessStrategy s *** 279,289 **** * it before we got to it. It's probably impossible altogether as * of 8.3, but we'd better check anyway.) */ ! LockBufHdr(buf); ! if (buf->refcount == 0 && buf->usage_count == 0) { if (strategy != NULL) AddBufferToRing(strategy, buf); return buf; } UnlockBufHdr(buf); --- 282,294 ---- * it before we got to it. It's probably impossible altogether as * of 8.3, but we'd better check anyway.) */ ! state = LockBufHdr(buf); ! if (BUF_STATE_GET_REFCOUNT(state) == 0 ! && BUF_STATE_GET_USAGECOUNT(state) == 0) { if (strategy != NULL) AddBufferToRing(strategy, buf); + *lockstate = state; return buf; } UnlockBufHdr(buf); *************** StrategyGetBuffer(BufferAccessStrategy s *** 295,300 **** --- 300,306 ---- trycounter = NBuffers; for (;;) { + uint32 state; buf = GetBufferDescriptor(ClockSweepTick()); *************** StrategyGetBuffer(BufferAccessStrategy s *** 302,313 **** * If the buffer is pinned or has a nonzero usage_count, we cannot use * it; decrement the usage_count (unless pinned) and keep scanning. */ ! LockBufHdr(buf); ! if (buf->refcount == 0) { ! if (buf->usage_count > 0) { ! buf->usage_count--; trycounter = NBuffers; } else --- 308,321 ---- * If the buffer is pinned or has a nonzero usage_count, we cannot use * it; decrement the usage_count (unless pinned) and keep scanning. */ ! state = LockBufHdr(buf); ! ! if (BUF_STATE_GET_REFCOUNT(state) == 0) { ! if (BUF_STATE_GET_USAGECOUNT(state) != 0) { ! pg_atomic_fetch_sub_u32(&buf->state, BUF_USAGECOUNT_ONE); ! trycounter = NBuffers; } else *************** StrategyGetBuffer(BufferAccessStrategy s *** 315,320 **** --- 323,329 ---- /* Found a usable buffer */ if (strategy != NULL) AddBufferToRing(strategy, buf); + *lockstate = state; return buf; } } *************** FreeAccessStrategy(BufferAccessStrategy *** 585,594 **** * The bufhdr spin lock is held on the returned buffer. */ static BufferDesc * ! GetBufferFromRing(BufferAccessStrategy strategy) { BufferDesc *buf; Buffer bufnum; /* Advance to next ring slot */ if (++strategy->current >= strategy->ring_size) --- 594,604 ---- * The bufhdr spin lock is held on the returned buffer. */ static BufferDesc * ! GetBufferFromRing(BufferAccessStrategy strategy, uint32 *lockstate) { BufferDesc *buf; Buffer bufnum; + uint32 state; /* Advance to next ring slot */ if (++strategy->current >= strategy->ring_size) *************** GetBufferFromRing(BufferAccessStrategy s *** 616,625 **** * shouldn't re-use it. */ buf = GetBufferDescriptor(bufnum - 1); ! LockBufHdr(buf); ! if (buf->refcount == 0 && buf->usage_count <= 1) { strategy->current_was_in_ring = true; return buf; } UnlockBufHdr(buf); --- 626,637 ---- * shouldn't re-use it. */ buf = GetBufferDescriptor(bufnum - 1); ! state = LockBufHdr(buf); ! if (BUF_STATE_GET_REFCOUNT(state) == 0 ! && BUF_STATE_GET_USAGECOUNT(state) <= 1) { strategy->current_was_in_ring = true; + *lockstate = state; return buf; } UnlockBufHdr(buf); diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c new file mode 100644 index 3144afe..c62a6f2 *** a/src/backend/storage/buffer/localbuf.c --- b/src/backend/storage/buffer/localbuf.c *************** LocalBufferAlloc(SMgrRelation smgr, Fork *** 108,113 **** --- 108,114 ---- int b; int trycounter; bool found; + uint32 state; INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum); *************** LocalBufferAlloc(SMgrRelation smgr, Fork *** 128,143 **** fprintf(stderr, "LB ALLOC (%u,%d,%d) %d\n", smgr->smgr_rnode.node.relNode, forkNum, blockNum, -b - 1); #endif /* this part is equivalent to PinBuffer for a shared buffer */ if (LocalRefCount[b] == 0) { ! if (bufHdr->usage_count < BM_MAX_USAGE_COUNT) ! bufHdr->usage_count++; } LocalRefCount[b]++; ResourceOwnerRememberBuffer(CurrentResourceOwner, BufferDescriptorGetBuffer(bufHdr)); ! if (bufHdr->flags & BM_VALID) *foundPtr = TRUE; else { --- 129,149 ---- fprintf(stderr, "LB ALLOC (%u,%d,%d) %d\n", smgr->smgr_rnode.node.relNode, forkNum, blockNum, -b - 1); #endif + state = pg_atomic_read_u32(&bufHdr->state); + /* this part is equivalent to PinBuffer for a shared buffer */ if (LocalRefCount[b] == 0) { ! if (BUF_STATE_GET_USAGECOUNT(state) < BM_MAX_USAGE_COUNT) ! { ! state += BUF_USAGECOUNT_ONE; ! pg_atomic_write_u32(&bufHdr->state, state); ! } } LocalRefCount[b]++; ResourceOwnerRememberBuffer(CurrentResourceOwner, BufferDescriptorGetBuffer(bufHdr)); ! if (state & BM_VALID) *foundPtr = TRUE; else { *************** LocalBufferAlloc(SMgrRelation smgr, Fork *** 169,177 **** if (LocalRefCount[b] == 0) { ! if (bufHdr->usage_count > 0) { ! bufHdr->usage_count--; trycounter = NLocBuffer; } else --- 175,186 ---- if (LocalRefCount[b] == 0) { ! state = pg_atomic_read_u32(&bufHdr->state); ! ! if (BUF_STATE_GET_USAGECOUNT(state) > 0) { ! state -= BUF_USAGECOUNT_ONE; ! pg_atomic_write_u32(&bufHdr->state, state); trycounter = NLocBuffer; } else *************** LocalBufferAlloc(SMgrRelation smgr, Fork *** 193,199 **** * this buffer is not referenced but it might still be dirty. if that's * the case, write it out before reusing it! */ ! if (bufHdr->flags & BM_DIRTY) { SMgrRelation oreln; Page localpage = (char *) LocalBufHdrGetBlock(bufHdr); --- 202,208 ---- * this buffer is not referenced but it might still be dirty. if that's * the case, write it out before reusing it! */ ! if (state & BM_DIRTY) { SMgrRelation oreln; Page localpage = (char *) LocalBufHdrGetBlock(bufHdr); *************** LocalBufferAlloc(SMgrRelation smgr, Fork *** 211,217 **** false); /* Mark not-dirty now in case we error out below */ ! bufHdr->flags &= ~BM_DIRTY; pgBufferUsage.local_blks_written++; } --- 220,227 ---- false); /* Mark not-dirty now in case we error out below */ ! state &= ~BM_DIRTY; ! pg_atomic_write_u32(&bufHdr->state, state); pgBufferUsage.local_blks_written++; } *************** LocalBufferAlloc(SMgrRelation smgr, Fork *** 228,234 **** /* * Update the hash table: remove old entry, if any, and make new one. */ ! if (bufHdr->flags & BM_TAG_VALID) { hresult = (LocalBufferLookupEnt *) hash_search(LocalBufHash, (void *) &bufHdr->tag, --- 238,244 ---- /* * Update the hash table: remove old entry, if any, and make new one. */ ! if (state & BM_TAG_VALID) { hresult = (LocalBufferLookupEnt *) hash_search(LocalBufHash, (void *) &bufHdr->tag, *************** LocalBufferAlloc(SMgrRelation smgr, Fork *** 237,243 **** elog(ERROR, "local buffer hash table corrupted"); /* mark buffer invalid just in case hash insert fails */ CLEAR_BUFFERTAG(bufHdr->tag); ! bufHdr->flags &= ~(BM_VALID | BM_TAG_VALID); } hresult = (LocalBufferLookupEnt *) --- 247,254 ---- elog(ERROR, "local buffer hash table corrupted"); /* mark buffer invalid just in case hash insert fails */ CLEAR_BUFFERTAG(bufHdr->tag); ! state &= ~(BM_VALID | BM_TAG_VALID); ! pg_atomic_write_u32(&bufHdr->state, state); } hresult = (LocalBufferLookupEnt *) *************** LocalBufferAlloc(SMgrRelation smgr, Fork *** 250,258 **** * it's all ours now. */ bufHdr->tag = newTag; ! bufHdr->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR); ! bufHdr->flags |= BM_TAG_VALID; ! bufHdr->usage_count = 1; *foundPtr = FALSE; return bufHdr; --- 261,271 ---- * it's all ours now. */ bufHdr->tag = newTag; ! state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR); ! state |= BM_TAG_VALID; ! state &= ~BUF_USAGECOUNT_MASK; ! state += BUF_USAGECOUNT_ONE; ! pg_atomic_write_u32(&bufHdr->state, state); *foundPtr = FALSE; return bufHdr; *************** MarkLocalBufferDirty(Buffer buffer) *** 267,272 **** --- 280,286 ---- { int bufid; BufferDesc *bufHdr; + uint32 state; Assert(BufferIsLocal(buffer)); *************** MarkLocalBufferDirty(Buffer buffer) *** 280,289 **** bufHdr = GetLocalBufferDescriptor(bufid); ! if (!(bufHdr->flags & BM_DIRTY)) ! pgBufferUsage.local_blks_dirtied++; ! bufHdr->flags |= BM_DIRTY; } /* --- 294,303 ---- bufHdr = GetLocalBufferDescriptor(bufid); ! state = pg_atomic_fetch_or_u32(&bufHdr->state, BM_DIRTY); ! if (!(state & BM_DIRTY)) ! pgBufferUsage.local_blks_dirtied++; } /* *************** DropRelFileNodeLocalBuffers(RelFileNode *** 307,314 **** { BufferDesc *bufHdr = GetLocalBufferDescriptor(i); LocalBufferLookupEnt *hresult; ! if ((bufHdr->flags & BM_TAG_VALID) && RelFileNodeEquals(bufHdr->tag.rnode, rnode) && bufHdr->tag.forkNum == forkNum && bufHdr->tag.blockNum >= firstDelBlock) --- 321,331 ---- { BufferDesc *bufHdr = GetLocalBufferDescriptor(i); LocalBufferLookupEnt *hresult; + uint32 state; ! state = pg_atomic_read_u32(&bufHdr->state); ! ! if ((state & BM_TAG_VALID) && RelFileNodeEquals(bufHdr->tag.rnode, rnode) && bufHdr->tag.forkNum == forkNum && bufHdr->tag.blockNum >= firstDelBlock) *************** DropRelFileNodeLocalBuffers(RelFileNode *** 327,334 **** elog(ERROR, "local buffer hash table corrupted"); /* Mark buffer invalid */ CLEAR_BUFFERTAG(bufHdr->tag); ! bufHdr->flags = 0; ! bufHdr->usage_count = 0; } } } --- 344,352 ---- elog(ERROR, "local buffer hash table corrupted"); /* Mark buffer invalid */ CLEAR_BUFFERTAG(bufHdr->tag); ! state &= ~BUF_FLAG_MASK; ! state &= ~BUF_USAGECOUNT_MASK; ! pg_atomic_write_u32(&bufHdr->state, state); } } } *************** DropRelFileNodeAllLocalBuffers(RelFileNo *** 349,356 **** { BufferDesc *bufHdr = GetLocalBufferDescriptor(i); LocalBufferLookupEnt *hresult; ! if ((bufHdr->flags & BM_TAG_VALID) && RelFileNodeEquals(bufHdr->tag.rnode, rnode)) { if (LocalRefCount[i] != 0) --- 367,377 ---- { BufferDesc *bufHdr = GetLocalBufferDescriptor(i); LocalBufferLookupEnt *hresult; + uint32 state; ! state = pg_atomic_read_u32(&bufHdr->state); ! ! if ((state & BM_TAG_VALID) && RelFileNodeEquals(bufHdr->tag.rnode, rnode)) { if (LocalRefCount[i] != 0) *************** DropRelFileNodeAllLocalBuffers(RelFileNo *** 367,374 **** elog(ERROR, "local buffer hash table corrupted"); /* Mark buffer invalid */ CLEAR_BUFFERTAG(bufHdr->tag); ! bufHdr->flags = 0; ! bufHdr->usage_count = 0; } } } --- 388,396 ---- elog(ERROR, "local buffer hash table corrupted"); /* Mark buffer invalid */ CLEAR_BUFFERTAG(bufHdr->tag); ! state &= ~BUF_FLAG_MASK; ! state &= ~BUF_USAGECOUNT_MASK; ! pg_atomic_write_u32(&bufHdr->state, state); } } } diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h new file mode 100644 index 19247c4..3d3e0d5 *** a/src/include/storage/buf_internals.h --- b/src/include/storage/buf_internals.h *************** *** 20,48 **** #include "storage/lwlock.h" #include "storage/shmem.h" #include "storage/smgr.h" #include "storage/spin.h" #include "utils/relcache.h" /* * Flags for buffer descriptors * * Note: TAG_VALID essentially means that there is a buffer hashtable * entry associated with the buffer's tag. */ ! #define BM_DIRTY (1 << 0) /* data needs writing */ ! #define BM_VALID (1 << 1) /* data is valid */ ! #define BM_TAG_VALID (1 << 2) /* tag is assigned */ ! #define BM_IO_IN_PROGRESS (1 << 3) /* read or write in progress */ ! #define BM_IO_ERROR (1 << 4) /* previous I/O failed */ ! #define BM_JUST_DIRTIED (1 << 5) /* dirtied since write started */ ! #define BM_PIN_COUNT_WAITER (1 << 6) /* have waiter for sole pin */ ! #define BM_CHECKPOINT_NEEDED (1 << 7) /* must write for checkpoint */ ! #define BM_PERMANENT (1 << 8) /* permanent relation (not * unlogged) */ - - typedef bits16 BufFlags; - /* * The maximum allowed value of usage_count represents a tradeoff between * accuracy and speed of the clock-sweep buffer management algorithm. A --- 20,62 ---- #include "storage/lwlock.h" #include "storage/shmem.h" #include "storage/smgr.h" + #include "port/atomics.h" #include "storage/spin.h" #include "utils/relcache.h" /* + * State is: + * 10 bit flags + * 4 bit usage count + * 18 bit refcount + */ + #define BUF_REFCOUNT_ONE 1 + #define BUF_REFCOUNT_MASK ((1U << 18) - 1) + #define BUF_STATE_GET_REFCOUNT(state) ((state) & BUF_REFCOUNT_MASK) + #define BUF_USAGECOUNT_MASK 0x003C0000U + #define BUF_USAGECOUNT_ONE (1U << 18) + #define BUF_USAGECOUNT_SHIFT 18 + #define BUF_STATE_GET_USAGECOUNT(state) (((state) & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT) + #define BUF_FLAG_MASK 0xFFC00000U + + /* * Flags for buffer descriptors * * Note: TAG_VALID essentially means that there is a buffer hashtable * entry associated with the buffer's tag. */ ! #define BM_LOCKED (1U << 22) /* buffer header is locked */ ! #define BM_DIRTY (1U << 23) /* data needs writing */ ! #define BM_VALID (1U << 24) /* data is valid */ ! #define BM_TAG_VALID (1U << 25) /* tag is assigned */ ! #define BM_IO_IN_PROGRESS (1U << 26) /* read or write in progress */ ! #define BM_IO_ERROR (1U << 27) /* previous I/O failed */ ! #define BM_JUST_DIRTIED (1U << 28) /* dirtied since write started */ ! #define BM_PIN_COUNT_WAITER (1U << 29) /* have waiter for sole pin */ ! #define BM_CHECKPOINT_NEEDED (1U << 30) /* must write for checkpoint */ ! #define BM_PERMANENT (1U << 31) /* permanent relation (not * unlogged) */ /* * The maximum allowed value of usage_count represents a tradeoff between * accuracy and speed of the clock-sweep buffer management algorithm. A *************** typedef struct buftag *** 137,148 **** typedef struct BufferDesc { BufferTag tag; /* ID of page contained in buffer */ - BufFlags flags; /* see bit definitions above */ - uint16 usage_count; /* usage counter for clock sweep code */ - unsigned refcount; /* # of backends holding pins on buffer */ - int wait_backend_pid; /* backend PID of pin-count waiter */ ! slock_t buf_hdr_lock; /* protects the above fields */ int buf_id; /* buffer's index number (from 0) */ int freeNext; /* link in freelist chain */ --- 151,161 ---- typedef struct BufferDesc { BufferTag tag; /* ID of page contained in buffer */ ! /* state of the tag, containing flags, refcount and usagecount */ ! pg_atomic_uint32 state; ! ! int wait_backend_pid; /* backend PID of pin-count waiter */ int buf_id; /* buffer's index number (from 0) */ int freeNext; /* link in freelist chain */ *************** typedef union BufferDescPadded *** 192,202 **** #define FREENEXT_NOT_IN_LIST (-2) /* ! * Macros for acquiring/releasing a shared buffer header's spinlock. ! * Do not apply these to local buffers! */ ! #define LockBufHdr(bufHdr) SpinLockAcquire(&(bufHdr)->buf_hdr_lock) ! #define UnlockBufHdr(bufHdr) SpinLockRelease(&(bufHdr)->buf_hdr_lock) /* in buf_init.c */ --- 205,215 ---- #define FREENEXT_NOT_IN_LIST (-2) /* ! * Functions for acquiring/releasing a shared buffer header's spinlock. Do ! * not apply these to local buffers! FIXUP! */ ! extern uint32 LockBufHdr(volatile BufferDesc *desc); ! extern void UnlockBufHdr(volatile BufferDesc *desc); /* in buf_init.c */ *************** extern BufferDesc *LocalBufferDescriptor *** 211,217 **** */ /* freelist.c */ ! extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy); extern void StrategyFreeBuffer(BufferDesc *buf); extern bool StrategyRejectBuffer(BufferAccessStrategy strategy, BufferDesc *buf); --- 224,231 ---- */ /* freelist.c */ ! extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy, ! uint32 *state); extern void StrategyFreeBuffer(BufferDesc *buf); extern bool StrategyRejectBuffer(BufferAccessStrategy strategy, BufferDesc *buf);