diff --git a/contrib/pg_buffercache/pg_buffercache_pages.c b/contrib/pg_buffercache/pg_buffercache_pages.c new file mode 100644 index 6622d22..35b5ee9 *** a/contrib/pg_buffercache/pg_buffercache_pages.c --- b/contrib/pg_buffercache/pg_buffercache_pages.c *************** pg_buffercache_pages(PG_FUNCTION_ARGS) *** 149,158 **** for (i = 0; i < NBuffers; i++) { volatile BufferDesc *bufHdr; bufHdr = GetBufferDescriptor(i); /* Lock each buffer header before inspecting. */ ! LockBufHdr(bufHdr); fctx->record[i].bufferid = BufferDescriptorGetBuffer(bufHdr); fctx->record[i].relfilenode = bufHdr->tag.rnode.relNode; --- 149,159 ---- for (i = 0; i < NBuffers; i++) { volatile BufferDesc *bufHdr; + uint32 state; bufHdr = GetBufferDescriptor(i); /* Lock each buffer header before inspecting. */ ! state = LockBufHdr(bufHdr); fctx->record[i].bufferid = BufferDescriptorGetBuffer(bufHdr); fctx->record[i].relfilenode = bufHdr->tag.rnode.relNode; *************** pg_buffercache_pages(PG_FUNCTION_ARGS) *** 160,175 **** fctx->record[i].reldatabase = bufHdr->tag.rnode.dbNode; fctx->record[i].forknum = bufHdr->tag.forkNum; fctx->record[i].blocknum = bufHdr->tag.blockNum; ! fctx->record[i].usagecount = bufHdr->usage_count; ! fctx->record[i].pinning_backends = bufHdr->refcount; ! if (bufHdr->flags & BM_DIRTY) fctx->record[i].isdirty = true; else fctx->record[i].isdirty = false; /* Note if the buffer is valid, and has storage created */ ! if ((bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_TAG_VALID)) fctx->record[i].isvalid = true; else fctx->record[i].isvalid = false; --- 161,176 ---- fctx->record[i].reldatabase = bufHdr->tag.rnode.dbNode; fctx->record[i].forknum = bufHdr->tag.forkNum; fctx->record[i].blocknum = bufHdr->tag.blockNum; ! fctx->record[i].usagecount = BUF_STATE_GET_USAGECOUNT(state); ! fctx->record[i].pinning_backends = BUF_STATE_GET_REFCOUNT(state); ! if (state & BM_DIRTY) fctx->record[i].isdirty = true; else fctx->record[i].isdirty = false; /* Note if the buffer is valid, and has storage created */ ! if ((state & BM_VALID) && (state & BM_TAG_VALID)) fctx->record[i].isvalid = true; else fctx->record[i].isvalid = false; diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c new file mode 100644 index bfa37f1..a5cffc7 *** a/src/backend/storage/buffer/buf_init.c --- b/src/backend/storage/buffer/buf_init.c *************** InitBufferPool(void) *** 135,146 **** BufferDesc *buf = GetBufferDescriptor(i); CLEAR_BUFFERTAG(buf->tag); - buf->flags = 0; - buf->usage_count = 0; - buf->refcount = 0; - buf->wait_backend_pid = 0; ! SpinLockInit(&buf->buf_hdr_lock); buf->buf_id = i; --- 135,143 ---- BufferDesc *buf = GetBufferDescriptor(i); CLEAR_BUFFERTAG(buf->tag); ! pg_atomic_init_u32(&buf->state, 0); ! buf->wait_backend_pid = 0; buf->buf_id = i; diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c new file mode 100644 index 6dd7c6e..fe6fb9c *** a/src/backend/storage/buffer/bufmgr.c --- b/src/backend/storage/buffer/bufmgr.c *************** *** 52,58 **** #include "utils/resowner_private.h" #include "utils/timestamp.h" - /* Note: these two macros only work on shared buffers, not local ones! 
*/ #define BufHdrGetBlock(bufHdr) ((Block) (BufferBlocks + ((Size) (bufHdr)->buf_id) * BLCKSZ)) #define BufferGetLSN(bufHdr) (PageGetLSN(BufHdrGetBlock(bufHdr))) --- 52,57 ---- *************** static BufferDesc *PinCountWaitBuf = NUL *** 163,169 **** * entry using ReservePrivateRefCountEntry() and then later, if necessary, * fill it with NewPrivateRefCountEntry(). That split lets us avoid doing * memory allocations in NewPrivateRefCountEntry() which can be important ! * because in some scenarios it's called with a spinlock held... */ static struct PrivateRefCountEntry PrivateRefCountArray[REFCOUNT_ARRAY_ENTRIES]; static HTAB *PrivateRefCountHash = NULL; --- 162,168 ---- * entry using ReservePrivateRefCountEntry() and then later, if necessary, * fill it with NewPrivateRefCountEntry(). That split lets us avoid doing * memory allocations in NewPrivateRefCountEntry() which can be important ! * because in some scenarios it's called with a header lock held... */ static struct PrivateRefCountEntry PrivateRefCountArray[REFCOUNT_ARRAY_ENTRIES]; static HTAB *PrivateRefCountHash = NULL; *************** static int SyncOneBuffer(int buf_id, boo *** 440,446 **** static void WaitIO(BufferDesc *buf); static bool StartBufferIO(BufferDesc *buf, bool forInput); static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, ! int set_flag_bits); static void shared_buffer_write_error_callback(void *arg); static void local_buffer_write_error_callback(void *arg); static BufferDesc *BufferAlloc(SMgrRelation smgr, --- 439,445 ---- static void WaitIO(BufferDesc *buf); static bool StartBufferIO(BufferDesc *buf, bool forInput); static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, ! uint32 set_flag_bits); static void shared_buffer_write_error_callback(void *arg); static void local_buffer_write_error_callback(void *arg); static BufferDesc *BufferAlloc(SMgrRelation smgr, *************** ReadBuffer_common(SMgrRelation smgr, cha *** 815,823 **** */ if (isLocalBuf) { ! /* Only need to adjust flags */ ! Assert(bufHdr->flags & BM_VALID); ! bufHdr->flags &= ~BM_VALID; } else { --- 814,827 ---- */ if (isLocalBuf) { ! /* ! * Since it's a local buffer, there is no concurrency. We assume a ! * read/write pair to be cheaper than an atomic AND. ! */ ! uint32 state = pg_atomic_read_u32(&bufHdr->state); ! Assert(state & BM_VALID); ! state &= ~BM_VALID; ! pg_atomic_write_u32(&bufHdr->state, state); } else { *************** ReadBuffer_common(SMgrRelation smgr, cha *** 828,837 **** */ do { ! LockBufHdr(bufHdr); ! Assert(bufHdr->flags & BM_VALID); ! bufHdr->flags &= ~BM_VALID; ! UnlockBufHdr(bufHdr); } while (!StartBufferIO(bufHdr, true)); } } --- 832,841 ---- */ do { ! uint32 state = LockBufHdr(bufHdr); ! Assert(state & BM_VALID); ! state &= ~(BM_VALID | BM_LOCKED); ! pg_atomic_write_u32(&bufHdr->state, state); } while (!StartBufferIO(bufHdr, true)); } } *************** ReadBuffer_common(SMgrRelation smgr, cha *** 848,854 **** * it's not been recycled) but come right back here to try smgrextend * again. */ ! Assert(!(bufHdr->flags & BM_VALID)); /* spinlock not needed */ bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr); --- 852,858 ---- * it's not been recycled) but come right back here to try smgrextend * again. */ ! Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* header lock not needed */ bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr); *************** ReadBuffer_common(SMgrRelation smgr, cha *** 932,939 **** if (isLocalBuf) { !
/* Only need to adjust flags */ ! bufHdr->flags |= BM_VALID; } else { --- 936,948 ---- if (isLocalBuf) { ! /* ! * Only need to adjust flags. Since it's a local buffer, there is no ! * concurrency. We assume a read/write pair to be cheaper than an atomic OR. ! */ ! uint32 state = pg_atomic_read_u32(&bufHdr->state); ! state |= BM_VALID; ! pg_atomic_write_u32(&bufHdr->state, state); } else { *************** BufferAlloc(SMgrRelation smgr, char relp *** 987,996 **** BufferTag oldTag; /* previous identity of selected buffer */ uint32 oldHash; /* hash value for oldTag */ LWLock *oldPartitionLock; /* buffer partition lock for it */ ! BufFlags oldFlags; int buf_id; BufferDesc *buf; bool valid; /* create a tag so we can lookup the buffer */ INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum); --- 996,1006 ---- BufferTag oldTag; /* previous identity of selected buffer */ uint32 oldHash; /* hash value for oldTag */ LWLock *oldPartitionLock; /* buffer partition lock for it */ ! uint32 oldFlags; int buf_id; BufferDesc *buf; bool valid; + uint32 state; /* create a tag so we can lookup the buffer */ INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum); *************** BufferAlloc(SMgrRelation smgr, char relp *** 1050,1072 **** for (;;) { /* ! * Ensure, while the spinlock's not yet held, that there's a free * refcount entry. */ ReservePrivateRefCountEntry(); /* * Select a victim buffer. The buffer is returned with its header ! * spinlock still held! */ ! buf = StrategyGetBuffer(strategy); ! Assert(buf->refcount == 0); ! /* Must copy buffer flags while we still hold the spinlock */ ! oldFlags = buf->flags; ! /* Pin the buffer and then release the buffer spinlock */ PinBuffer_Locked(buf); /* --- 1060,1082 ---- for (;;) { /* ! * Ensure, while the header lock isn't yet held, that there's a free * refcount entry. */ ReservePrivateRefCountEntry(); /* * Select a victim buffer. The buffer is returned with its header ! * lock still held! */ ! buf = StrategyGetBuffer(strategy, &state); ! Assert(BUF_STATE_GET_REFCOUNT(state) == 0); ! /* Must copy buffer flags while we still hold the header lock */ ! oldFlags = state & BUF_FLAG_MASK; ! /* Pin the buffer and then release the buffer header lock */ PinBuffer_Locked(buf); /* *************** BufferAlloc(SMgrRelation smgr, char relp *** 1254,1260 **** /* * Need to lock the buffer header too in order to change its tag. */ ! LockBufHdr(buf); /* * Somebody could have pinned or re-dirtied the buffer while we were --- 1264,1270 ---- /* * Need to lock the buffer header too in order to change its tag. */ ! state = LockBufHdr(buf); /* * Somebody could have pinned or re-dirtied the buffer while we were *************** BufferAlloc(SMgrRelation smgr, char relp *** 1262,1269 **** * recycle this buffer; we must undo everything we've done and start * over with a new victim buffer. */ ! oldFlags = buf->flags; ! if (buf->refcount == 1 && !(oldFlags & BM_DIRTY)) break; UnlockBufHdr(buf); --- 1272,1279 ---- * recycle this buffer; we must undo everything we've done and start * over with a new victim buffer. */ ! oldFlags = state & BUF_FLAG_MASK; ! if (BUF_STATE_GET_REFCOUNT(state) == 1 && !(oldFlags & BM_DIRTY)) break; UnlockBufHdr(buf); *************** BufferAlloc(SMgrRelation smgr, char relp *** 1284,1297 **** * 1 so that the buffer can survive one clock-sweep pass.) */ buf->tag = newTag; ! buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT); if (relpersistence == RELPERSISTENCE_PERMANENT) !
buf->flags |= BM_TAG_VALID | BM_PERMANENT; else ! buf->flags |= BM_TAG_VALID; ! buf->usage_count = 1; ! UnlockBufHdr(buf); if (oldFlags & BM_TAG_VALID) { --- 1294,1308 ---- * 1 so that the buffer can survive one clock-sweep pass.) */ buf->tag = newTag; ! state &= ~(BM_VALID | BM_LOCKED | BM_DIRTY | BM_JUST_DIRTIED | ! BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT | ! BUF_USAGECOUNT_MASK); if (relpersistence == RELPERSISTENCE_PERMANENT) ! state |= BM_TAG_VALID | BM_PERMANENT | BUF_USAGECOUNT_ONE; else ! state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE; ! pg_atomic_write_u32(&buf->state, state); if (oldFlags & BM_TAG_VALID) { *************** BufferAlloc(SMgrRelation smgr, char relp *** 1319,1325 **** * InvalidateBuffer -- mark a shared buffer invalid and return it to the * freelist. * ! * The buffer header spinlock must be held at entry. We drop it before * returning. (This is sane because the caller must have locked the * buffer in order to be sure it should be dropped.) * --- 1330,1336 ---- * InvalidateBuffer -- mark a shared buffer invalid and return it to the * freelist. * ! * The buffer header lock must be held at entry. We drop it before * returning. (This is sane because the caller must have locked the * buffer in order to be sure it should be dropped.) * *************** InvalidateBuffer(BufferDesc *buf) *** 1338,1346 **** BufferTag oldTag; uint32 oldHash; /* hash value for oldTag */ LWLock *oldPartitionLock; /* buffer partition lock for it */ ! BufFlags oldFlags; ! /* Save the original buffer tag before dropping the spinlock */ oldTag = buf->tag; UnlockBufHdr(buf); --- 1349,1358 ---- BufferTag oldTag; uint32 oldHash; /* hash value for oldTag */ LWLock *oldPartitionLock; /* buffer partition lock for it */ ! uint32 oldFlags; ! uint32 state; ! /* Save the original buffer tag before dropping the header lock */ oldTag = buf->tag; UnlockBufHdr(buf); *************** retry: *** 1362,1368 **** LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE); /* Re-lock the buffer header */ ! LockBufHdr(buf); /* If it's changed while we were waiting for lock, do nothing */ if (!BUFFERTAGS_EQUAL(buf->tag, oldTag)) --- 1374,1380 ---- LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE); /* Re-lock the buffer header */ ! state = LockBufHdr(buf); /* If it's changed while we were waiting for lock, do nothing */ if (!BUFFERTAGS_EQUAL(buf->tag, oldTag)) *************** retry: *** 1381,1387 **** * yet done StartBufferIO, WaitIO will fall through and we'll effectively * be busy-looping here.) */ ! if (buf->refcount != 0) { UnlockBufHdr(buf); LWLockRelease(oldPartitionLock); --- 1393,1399 ---- * yet done StartBufferIO, WaitIO will fall through and we'll effectively * be busy-looping here.) */ ! if (BUF_STATE_GET_REFCOUNT(state) != 0) { UnlockBufHdr(buf); LWLockRelease(oldPartitionLock); *************** retry: *** 1396,1407 **** * Clear out the buffer's tag and flags. We must do this to ensure that * linear scans of the buffer array don't think the buffer is valid. */ ! oldFlags = buf->flags; CLEAR_BUFFERTAG(buf->tag); ! buf->flags = 0; ! buf->usage_count = 0; ! ! UnlockBufHdr(buf); /* * Remove the buffer from the lookup hashtable, if it was in there. --- 1408,1417 ---- * Clear out the buffer's tag and flags. We must do this to ensure that * linear scans of the buffer array don't think the buffer is valid. */ ! oldFlags = state & BUF_FLAG_MASK; CLEAR_BUFFERTAG(buf->tag); ! state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK); ! pg_atomic_write_u32(&buf->state, state); /* * Remove the buffer from the lookup hashtable, if it was in there. 
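The buf_internals.h hunk that defines the new state-word layout is not included in this excerpt, so the accessors used above (BUF_FLAG_MASK, BUF_USAGECOUNT_MASK, BUF_USAGECOUNT_ONE, BUF_STATE_GET_REFCOUNT, BUF_STATE_GET_USAGECOUNT) appear without their definitions. For orientation, here is a minimal sketch of the kind of split they imply; the bit widths are illustrative assumptions, not the patch's authoritative values:

    /* Sketch only: one plausible split of the 32-bit state word. */
    #define BUF_REFCOUNT_BITS    18   /* shared pin count */
    #define BUF_USAGECOUNT_BITS  4    /* clock-sweep usage count */
    #define BUF_FLAG_BITS        10   /* BM_* flags, including BM_LOCKED */

    #define BUF_REFCOUNT_ONE     1
    #define BUF_REFCOUNT_MASK    ((1U << BUF_REFCOUNT_BITS) - 1)
    #define BUF_USAGECOUNT_SHIFT BUF_REFCOUNT_BITS
    #define BUF_USAGECOUNT_ONE   (1U << BUF_USAGECOUNT_SHIFT)
    #define BUF_USAGECOUNT_MASK \
        (((1U << BUF_USAGECOUNT_BITS) - 1) << BUF_USAGECOUNT_SHIFT)
    #define BUF_FLAG_MASK        (~(BUF_REFCOUNT_MASK | BUF_USAGECOUNT_MASK))

    #define BUF_STATE_GET_REFCOUNT(state)   ((state) & BUF_REFCOUNT_MASK)
    #define BUF_STATE_GET_USAGECOUNT(state) \
        (((state) & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT)

With a layout like this, a pin is a plain addition of BUF_REFCOUNT_ONE, a usage-count bump is an addition of BUF_USAGECOUNT_ONE, and the flag bits (including the new BM_LOCKED lock bit) are tested with a simple mask, which is what the BufferAlloc and InvalidateBuffer hunks above rely on.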
*************** void *** 1433,1438 **** --- 1443,1449 ---- MarkBufferDirty(Buffer buffer) { BufferDesc *bufHdr; + uint32 state; if (!BufferIsValid(buffer)) elog(ERROR, "bad buffer ID: %d", buffer); *************** MarkBufferDirty(Buffer buffer) *** 1449,1462 **** /* unfortunately we can't check if the lock is held exclusively */ Assert(LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr))); ! LockBufHdr(bufHdr); ! Assert(bufHdr->refcount > 0); /* * If the buffer was not dirty already, do vacuum accounting. */ ! if (!(bufHdr->flags & BM_DIRTY)) { VacuumPageDirty++; pgBufferUsage.shared_blks_dirtied++; --- 1460,1473 ---- /* unfortunately we can't check if the lock is held exclusively */ Assert(LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr))); ! state = LockBufHdr(bufHdr); ! Assert(BUF_STATE_GET_REFCOUNT(state) > 0); /* * If the buffer was not dirty already, do vacuum accounting. */ ! if (!(state & BM_DIRTY)) { VacuumPageDirty++; pgBufferUsage.shared_blks_dirtied++; *************** MarkBufferDirty(Buffer buffer) *** 1464,1472 **** VacuumCostBalance += VacuumCostPageDirty; } ! bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED); ! ! UnlockBufHdr(bufHdr); } /* --- 1475,1483 ---- VacuumCostBalance += VacuumCostPageDirty; } ! state |= BM_DIRTY | BM_JUST_DIRTIED; ! state &= ~BM_LOCKED; ! pg_atomic_write_u32(&bufHdr->state, state); } /* *************** ReleaseAndReadBuffer(Buffer buffer, *** 1506,1512 **** else { bufHdr = GetBufferDescriptor(buffer - 1); ! /* we have pin, so it's ok to examine tag without spinlock */ if (bufHdr->tag.blockNum == blockNum && RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) && bufHdr->tag.forkNum == forkNum) --- 1517,1523 ---- else { bufHdr = GetBufferDescriptor(buffer - 1); ! /* we have pin, so it's ok to examine tag without header lock */ if (bufHdr->tag.blockNum == blockNum && RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) && bufHdr->tag.forkNum == forkNum) *************** ReleaseAndReadBuffer(Buffer buffer, *** 1531,1540 **** * * This should be applied only to shared buffers, never local ones. * * Note that ResourceOwnerEnlargeBuffers must have been done already. * * Returns TRUE if buffer is BM_VALID, else FALSE. This provision allows ! * some callers to avoid an extra spinlock cycle. */ static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) --- 1542,1556 ---- * * This should be applied only to shared buffers, never local ones. * + * Since buffers are pinned/unpinned very frequently, this function tries + * to pin the buffer as cheaply as possible. This is why we don't take the + * buffer header lock here, but instead update the state variable in a loop + * of CAS operations. Hopefully, it will usually be just a single CAS. + * * Note that ResourceOwnerEnlargeBuffers must have been done already. * * Returns TRUE if buffer is BM_VALID, else FALSE. This provision allows ! * some callers to avoid an extra header lock cycle. */ static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) *************** PinBuffer(BufferDesc *buf, BufferAccessS *** 1547,1569 **** if (ref == NULL) { ReservePrivateRefCountEntry(); ref = NewPrivateRefCountEntry(b); ! LockBufHdr(buf); ! buf->refcount++; ! if (strategy == NULL) ! { ! if (buf->usage_count < BM_MAX_USAGE_COUNT) ! buf->usage_count++; ! } ! else { ! if (buf->usage_count == 0) ! buf->usage_count = 1; } ! result = (buf->flags & BM_VALID) != 0; !
UnlockBufHdr(buf); } else { --- 1563,1606 ---- if (ref == NULL) { + /* loop of CAS operations */ + uint32 state; + uint32 oldstate; + SpinDelayStatus delayStatus; + ReservePrivateRefCountEntry(); ref = NewPrivateRefCountEntry(b); ! state = pg_atomic_read_u32(&buf->state); ! oldstate = state; ! ! init_spin_delay(&delayStatus, (Pointer)buf, __FILE__, __LINE__); ! while (true) { ! /* spin-wait till lock is free */ ! while (state & BM_LOCKED) ! { ! make_spin_delay(&delayStatus); ! state = pg_atomic_read_u32(&buf->state); ! oldstate = state; ! } ! ! /* increase refcount */ ! state += BUF_REFCOUNT_ONE; ! ! /* increase usagecount unless already max */ ! if (BUF_STATE_GET_USAGECOUNT(state) != BM_MAX_USAGE_COUNT) ! state += BUF_USAGECOUNT_ONE; ! ! /* try to do CAS, exit on success */ ! if (pg_atomic_compare_exchange_u32(&buf->state, &oldstate, state)) ! break; ! ! /* get ready for next loop, oldstate has been updated by CAS */ ! state = oldstate; } ! finish_spin_delay(&delayStatus); ! result = (state & BM_VALID) != 0; } else { *************** PinBuffer(BufferDesc *buf, BufferAccessS *** 1579,1587 **** /* * PinBuffer_Locked -- as above, but caller already locked the buffer header. ! * The spinlock is released before return. * ! * As this function is called with the spinlock held, the caller has to * previously call ReservePrivateRefCountEntry(). * * Currently, no callers of this function want to modify the buffer's --- 1616,1624 ---- /* * PinBuffer_Locked -- as above, but caller already locked the buffer header. ! * The header lock is released before return. * ! * As this function is called with the header lock held, the caller has to * previously call ReservePrivateRefCountEntry(). * * Currently, no callers of this function want to modify the buffer's *************** PinBuffer(BufferDesc *buf, BufferAccessS *** 1592,1598 **** * Also all callers only ever use this function when it's known that the * buffer can't have a preexisting pin by this backend. That allows us to skip * searching the private refcount array & hash, which is a boon, because the ! * spinlock is still held. * * Note: use of this routine is frequently mandatory, not just an optimization * to save a spin lock/unlock cycle, because we need to pin a buffer before --- 1629,1635 ---- * Also all callers only ever use this function when it's known that the * buffer can't have a preexisting pin by this backend. That allows us to skip * searching the private refcount array & hash, which is a boon, because the ! * header lock is still held. * * Note: use of this routine is frequently mandatory, not just an optimization * to save a spin lock/unlock cycle, because we need to pin a buffer before *************** PinBuffer_Locked(BufferDesc *buf) *** 1603,1617 **** { Buffer b; PrivateRefCountEntry *ref; /* * As explained, We don't expect any preexisting pins. That allows us to ! * manipulate the PrivateRefCount after releasing the spinlock */ Assert(GetPrivateRefCountEntry(BufferDescriptorGetBuffer(buf), false) == NULL); ! buf->refcount++; ! UnlockBufHdr(buf); b = BufferDescriptorGetBuffer(buf); --- 1640,1661 ---- { Buffer b; PrivateRefCountEntry *ref; + uint32 state; /* * As explained, We don't expect any preexisting pins. That allows us to ! * manipulate the PrivateRefCount after releasing the header lock */ Assert(GetPrivateRefCountEntry(BufferDescriptorGetBuffer(buf), false) == NULL); ! /* ! * Since we assume the buffer header lock is held, we can update the ! * buffer state in a single write operation. ! */ !
state = pg_atomic_read_u32(&buf->state); ! state += BUF_REFCOUNT_ONE; ! state &= ~BM_LOCKED; ! pg_atomic_write_u32(&buf->state, state); b = BufferDescriptorGetBuffer(buf); *************** UnpinBuffer(BufferDesc *buf, bool fixOwn *** 1646,1675 **** ref->refcount--; if (ref->refcount == 0) { /* I'd better not still hold any locks on the buffer */ Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf))); Assert(!LWLockHeldByMe(BufferDescriptorGetIOLock(buf))); ! LockBufHdr(buf); ! /* Decrement the shared reference count */ ! Assert(buf->refcount > 0); ! buf->refcount--; ! /* Support LockBufferForCleanup() */ ! if ((buf->flags & BM_PIN_COUNT_WAITER) && ! buf->refcount == 1) { ! /* we just released the last pin other than the waiter's */ ! int wait_backend_pid = buf->wait_backend_pid; ! buf->flags &= ~BM_PIN_COUNT_WAITER; ! UnlockBufHdr(buf); ! ProcSendSignal(wait_backend_pid); } ! else ! UnlockBufHdr(buf); ForgetPrivateRefCountEntry(ref); } } --- 1690,1755 ---- ref->refcount--; if (ref->refcount == 0) { + uint32 state; + uint32 oldstate; + SpinDelayStatus delayStatus; + /* I'd better not still hold any locks on the buffer */ Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf))); Assert(!LWLockHeldByMe(BufferDescriptorGetIOLock(buf))); ! /* ! * Decrement the shared reference count. ! * ! * Since the buffer header lock holder can update the state with just a ! * plain write, it's not safe to use an atomic decrement here. Instead, ! * we do a loop of CAS operations, as PinBuffer does. ! */ ! state = pg_atomic_read_u32(&buf->state); ! oldstate = state; ! init_spin_delay(&delayStatus, (Pointer)buf, __FILE__, __LINE__); ! while (true) { ! /* spin-wait till lock is free */ ! while (state & BM_LOCKED) ! { ! make_spin_delay(&delayStatus); ! state = pg_atomic_read_u32(&buf->state); ! oldstate = state; ! } ! /* decrease refcount */ ! Assert(BUF_STATE_GET_REFCOUNT(state) > 0); ! state -= BUF_REFCOUNT_ONE; ! ! /* try to do CAS, exit on success */ ! if (pg_atomic_compare_exchange_u32(&buf->state, &oldstate, state)) ! break; ! ! /* get ready for next loop, oldstate has been updated by CAS */ ! state = oldstate; } ! finish_spin_delay(&delayStatus); ! ! /* Support LockBufferForCleanup() */ ! if (state & BM_PIN_COUNT_WAITER) ! { ! state = LockBufHdr(buf); ! ! if ((state & BM_PIN_COUNT_WAITER) && BUF_STATE_GET_REFCOUNT(state) == 1) ! { ! /* we just released the last pin other than the waiter's */ ! int wait_backend_pid = buf->wait_backend_pid; + state &= ~(BM_PIN_COUNT_WAITER | BM_LOCKED); + pg_atomic_write_u32(&buf->state, state); + ProcSendSignal(wait_backend_pid); + } + else + UnlockBufHdr(buf); + } ForgetPrivateRefCountEntry(ref); } } *************** UnpinBuffer(BufferDesc *buf, bool fixOwn *** 1687,1692 **** --- 1767,1773 ---- static void BufferSync(int flags) { + uint32 state; int buf_id; int num_to_scan; int num_spaces; *************** BufferSync(int flags) *** 1733,1748 **** BufferDesc *bufHdr = GetBufferDescriptor(buf_id); /* ! * Header spinlock is enough to examine BM_DIRTY, see comment in * SyncOneBuffer. */ ! LockBufHdr(bufHdr); ! if ((bufHdr->flags & mask) == mask) { CkptSortItem *item; ! bufHdr->flags |= BM_CHECKPOINT_NEEDED; item = &CkptBufferIds[num_to_scan++]; item->buf_id = buf_id; --- 1814,1829 ---- BufferDesc *bufHdr = GetBufferDescriptor(buf_id); /* ! * Header lock is enough to examine BM_DIRTY, see comment in * SyncOneBuffer. */ ! state = LockBufHdr(bufHdr); ! if ((state & mask) == mask) { CkptSortItem *item; !
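PinBuffer and UnpinBuffer above share one protocol: read the state word, spin (via make_spin_delay) while BM_LOCKED is set, apply the refcount/usage-count delta, and compare-and-swap; on CAS failure the freshly observed value is already in oldstate, so the retry is cheap. The CAS loop is needed because, as the UnpinBuffer comment says, a header-lock holder may publish changes with a plain pg_atomic_write_u32, so an unconditional fetch-add could clobber them. A condensed standalone sketch of the pin fast path, with a hypothetical helper name and without the SpinDelayStatus bookkeeping the patch uses:

    /* Sketch, not patch code: the lock-free pin fast path in isolation. */
    static bool
    pin_buffer_fastpath(BufferDesc *buf)
    {
        uint32      oldstate = pg_atomic_read_u32(&buf->state);

        for (;;)
        {
            uint32      newstate;

            if (oldstate & BM_LOCKED)
            {
                /* a header-lock holder may store the word; don't CAS against it */
                oldstate = pg_atomic_read_u32(&buf->state);
                continue;
            }

            newstate = oldstate + BUF_REFCOUNT_ONE;
            if (BUF_STATE_GET_USAGECOUNT(newstate) != BM_MAX_USAGE_COUNT)
                newstate += BUF_USAGECOUNT_ONE;

            /* on failure, CAS reloads oldstate with the current value */
            if (pg_atomic_compare_exchange_u32(&buf->state, &oldstate, newstate))
                return (newstate & BM_VALID) != 0;
        }
    }

Unpinning is symmetric, subtracting BUF_REFCOUNT_ONE under the same locked-word check.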
state |= BM_CHECKPOINT_NEEDED; item = &CkptBufferIds[num_to_scan++]; item->buf_id = buf_id; *************** BufferSync(int flags) *** 1752,1758 **** item->blockNum = bufHdr->tag.blockNum; } ! UnlockBufHdr(bufHdr); } if (num_to_scan == 0) --- 1833,1840 ---- item->blockNum = bufHdr->tag.blockNum; } ! state &= ~BM_LOCKED; ! pg_atomic_write_u32(&bufHdr->state, state); } if (num_to_scan == 0) *************** BufferSync(int flags) *** 1888,1894 **** * write the buffer though we didn't need to. It doesn't seem worth * guarding against this, though. */ ! if (bufHdr->flags & BM_CHECKPOINT_NEEDED) { if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN) { --- 1970,1976 ---- * write the buffer though we didn't need to. It doesn't seem worth * guarding against this, though. */ ! if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED) { if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN) { *************** SyncOneBuffer(int buf_id, bool skip_rece *** 2258,2263 **** --- 2340,2346 ---- { BufferDesc *bufHdr = GetBufferDescriptor(buf_id); int result = 0; + uint32 state; BufferTag tag; ReservePrivateRefCountEntry(); *************** SyncOneBuffer(int buf_id, bool skip_rece *** 2271,2280 **** * don't worry because our checkpoint.redo points before log record for * upcoming changes and so we are not required to write such dirty buffer. */ ! LockBufHdr(bufHdr); ! if (bufHdr->refcount == 0 && bufHdr->usage_count == 0) result |= BUF_REUSABLE; else if (skip_recently_used) { /* Caller told us not to write recently-used buffers */ --- 2354,2366 ---- * don't worry because our checkpoint.redo points before log record for * upcoming changes and so we are not required to write such dirty buffer. */ ! state = LockBufHdr(bufHdr); ! if (BUF_STATE_GET_REFCOUNT(state) == 0 && ! BUF_STATE_GET_USAGECOUNT(state) == 0) ! { result |= BUF_REUSABLE; + } else if (skip_recently_used) { /* Caller told us not to write recently-used buffers */ *************** SyncOneBuffer(int buf_id, bool skip_rece *** 2282,2288 **** return result; } ! if (!(bufHdr->flags & BM_VALID) || !(bufHdr->flags & BM_DIRTY)) { /* It's clean, so nothing to do */ UnlockBufHdr(bufHdr); --- 2368,2374 ---- return result; } ! if (!(state & BM_VALID) || !(state & BM_DIRTY)) { /* It's clean, so nothing to do */ UnlockBufHdr(bufHdr); *************** PrintBufferLeakWarning(Buffer buffer) *** 2439,2444 **** --- 2525,2531 ---- int32 loccount; char *path; BackendId backend; + uint32 state; Assert(BufferIsValid(buffer)); if (BufferIsLocal(buffer)) *************** PrintBufferLeakWarning(Buffer buffer) *** 2456,2467 **** /* theoretically we should lock the bufhdr here */ path = relpathbackend(buf->tag.rnode, backend, buf->tag.forkNum); elog(WARNING, "buffer refcount leak: [%03d] " "(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)", buffer, path, ! buf->tag.blockNum, buf->flags, ! buf->refcount, loccount); pfree(path); } --- 2543,2555 ---- /* theoretically we should lock the bufhdr here */ path = relpathbackend(buf->tag.rnode, backend, buf->tag.forkNum); + state = pg_atomic_read_u32(&buf->state); elog(WARNING, "buffer refcount leak: [%03d] " "(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)", buffer, path, ! buf->tag.blockNum, state & BUF_FLAG_MASK, ! BUF_STATE_GET_REFCOUNT(state), loccount); pfree(path); } *************** BufferGetBlockNumber(Buffer buffer) *** 2516,2522 **** else bufHdr = GetBufferDescriptor(buffer - 1); ! 
/* pinned, so OK to read tag without spinlock */ return bufHdr->tag.blockNum; } --- 2604,2610 ---- else bufHdr = GetBufferDescriptor(buffer - 1); ! /* pinned, so OK to read tag without lock */ return bufHdr->tag.blockNum; } *************** BufferGetTag(Buffer buffer, RelFileNode *** 2539,2545 **** else bufHdr = GetBufferDescriptor(buffer - 1); ! /* pinned, so OK to read tag without spinlock */ *rnode = bufHdr->tag.rnode; *forknum = bufHdr->tag.forkNum; *blknum = bufHdr->tag.blockNum; --- 2627,2633 ---- else bufHdr = GetBufferDescriptor(buffer - 1); ! /* pinned, so OK to read tag without lock */ *rnode = bufHdr->tag.rnode; *forknum = bufHdr->tag.forkNum; *blknum = bufHdr->tag.blockNum; *************** FlushBuffer(BufferDesc *buf, SMgrRelatio *** 2573,2578 **** --- 2661,2667 ---- io_time; Block bufBlock; char *bufToWrite; + uint32 state; /* * Acquire the buffer's io_in_progress lock. If StartBufferIO returns *************** FlushBuffer(BufferDesc *buf, SMgrRelatio *** 2598,2604 **** reln->smgr_rnode.node.dbNode, reln->smgr_rnode.node.relNode); ! LockBufHdr(buf); /* * Run PageGetLSN while holding header lock, since we don't have the --- 2687,2693 ---- reln->smgr_rnode.node.dbNode, reln->smgr_rnode.node.relNode); ! state = LockBufHdr(buf); /* * Run PageGetLSN while holding header lock, since we don't have the *************** FlushBuffer(BufferDesc *buf, SMgrRelatio *** 2607,2614 **** recptr = BufferGetLSN(buf); /* To check if block content changes while flushing. - vadim 01/17/97 */ ! buf->flags &= ~BM_JUST_DIRTIED; ! UnlockBufHdr(buf); /* * Force XLOG flush up to buffer's LSN. This implements the basic WAL --- 2696,2703 ---- recptr = BufferGetLSN(buf); /* To check if block content changes while flushing. - vadim 01/17/97 */ ! state &= ~(BM_JUST_DIRTIED | BM_LOCKED); ! pg_atomic_write_u32(&buf->state, state); /* * Force XLOG flush up to buffer's LSN. This implements the basic WAL --- *************** FlushBuffer(BufferDesc *buf, SMgrRelatio *** 2627,2633 **** * disastrous system-wide consequences. To make sure that can't happen, * skip the flush if the buffer isn't permanent. */ ! if (buf->flags & BM_PERMANENT) XLogFlush(recptr); /* --- 2716,2722 ---- * disastrous system-wide consequences. To make sure that can't happen, * skip the flush if the buffer isn't permanent. */ ! if (state & BM_PERMANENT) XLogFlush(recptr); /* *************** BufferIsPermanent(Buffer buffer) *** 2715,2727 **** /* * BM_PERMANENT can't be changed while we hold a pin on the buffer, so we ! * need not bother with the buffer header spinlock. Even if someone else ! * changes the buffer header flags while we're doing this, we assume that ! * changing an aligned 2-byte BufFlags value is atomic, so we'll read the * old value or the new value, but not random garbage. */ bufHdr = GetBufferDescriptor(buffer - 1); ! return (bufHdr->flags & BM_PERMANENT) != 0; } /* --- 2804,2816 ---- /* * BM_PERMANENT can't be changed while we hold a pin on the buffer, so we ! * need not bother with the buffer header lock. Even if someone else ! * changes the buffer header state while we're doing this, we assume that ! * reading an aligned 4-byte state value is atomic, so we'll read the * old value or the new value, but not random garbage. */ bufHdr = GetBufferDescriptor(buffer - 1); !
return (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT) != 0; } /* *************** DropRelFileNodeBuffers(RelFileNodeBacken *** 2821,2827 **** if (RelFileNodeEquals(bufHdr->tag.rnode, rnode.node) && bufHdr->tag.forkNum == forkNum && bufHdr->tag.blockNum >= firstDelBlock) ! InvalidateBuffer(bufHdr); /* releases spinlock */ else UnlockBufHdr(bufHdr); } --- 2910,2916 ---- if (RelFileNodeEquals(bufHdr->tag.rnode, rnode.node) && bufHdr->tag.forkNum == forkNum && bufHdr->tag.blockNum >= firstDelBlock) ! InvalidateBuffer(bufHdr); /* releases lock */ else UnlockBufHdr(bufHdr); } *************** DropRelFileNodesAllBuffers(RelFileNodeBa *** 2919,2925 **** LockBufHdr(bufHdr); if (RelFileNodeEquals(bufHdr->tag.rnode, (*rnode))) ! InvalidateBuffer(bufHdr); /* releases spinlock */ else UnlockBufHdr(bufHdr); } --- 3008,3014 ---- LockBufHdr(bufHdr); if (RelFileNodeEquals(bufHdr->tag.rnode, (*rnode))) ! InvalidateBuffer(bufHdr); /* releases lock */ else UnlockBufHdr(bufHdr); } *************** DropDatabaseBuffers(Oid dbid) *** 2961,2967 **** LockBufHdr(bufHdr); if (bufHdr->tag.rnode.dbNode == dbid) ! InvalidateBuffer(bufHdr); /* releases spinlock */ else UnlockBufHdr(bufHdr); } --- 3050,3056 ---- LockBufHdr(bufHdr); if (bufHdr->tag.rnode.dbNode == dbid) ! InvalidateBuffer(bufHdr); /* releases lock */ else UnlockBufHdr(bufHdr); } *************** FlushRelationBuffers(Relation rel) *** 3057,3063 **** { bufHdr = GetLocalBufferDescriptor(i); if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) && ! (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) { ErrorContextCallback errcallback; Page localpage; --- 3146,3153 ---- { bufHdr = GetLocalBufferDescriptor(i); if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) && ! (pg_atomic_read_u32(&bufHdr->state) & (BM_VALID | BM_DIRTY)) ! == (BM_VALID | BM_DIRTY)) { ErrorContextCallback errcallback; Page localpage; *************** FlushRelationBuffers(Relation rel) *** 3078,3084 **** localpage, false); ! bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED); /* Pop the error context stack */ error_context_stack = errcallback.previous; --- 3168,3174 ---- localpage, false); ! pg_atomic_fetch_and_u32(&bufHdr->state, ~(BM_DIRTY | BM_JUST_DIRTIED)); /* Pop the error context stack */ error_context_stack = errcallback.previous; *************** FlushRelationBuffers(Relation rel) *** 3093,3098 **** --- 3183,3190 ---- for (i = 0; i < NBuffers; i++) { + uint32 state; + bufHdr = GetBufferDescriptor(i); /* *************** FlushRelationBuffers(Relation rel) *** 3104,3112 **** ReservePrivateRefCountEntry(); ! LockBufHdr(bufHdr); if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) && ! (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) { PinBuffer_Locked(bufHdr); LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED); --- 3196,3204 ---- ReservePrivateRefCountEntry(); ! state = LockBufHdr(bufHdr); if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) && ! (state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY)) { PinBuffer_Locked(bufHdr); LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED); *************** FlushDatabaseBuffers(Oid dbid) *** 3145,3150 **** --- 3237,3243 ---- for (i = 0; i < NBuffers; i++) { + uint32 state; bufHdr = GetBufferDescriptor(i); /* *************** FlushDatabaseBuffers(Oid dbid) *** 3156,3164 **** ReservePrivateRefCountEntry(); ! LockBufHdr(bufHdr); if (bufHdr->tag.rnode.dbNode == dbid && ! 
(bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) { PinBuffer_Locked(bufHdr); LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED); --- 3249,3257 ---- ReservePrivateRefCountEntry(); ! state = LockBufHdr(bufHdr); if (bufHdr->tag.rnode.dbNode == dbid && ! (state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY)) { PinBuffer_Locked(bufHdr); LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED); *************** MarkBufferDirtyHint(Buffer buffer, bool *** 3290,3308 **** * This routine might get called many times on the same page, if we are * making the first scan after commit of an xact that added/deleted many * tuples. So, be as quick as we can if the buffer is already dirty. We ! * do this by not acquiring spinlock if it looks like the status bits are * already set. Since we make this test unlocked, there's a chance we * might fail to notice that the flags have just been cleared, and failed * to reset them, due to memory-ordering issues. But since this function * is only intended to be used in cases where failing to write out the * data would be harmless anyway, it doesn't really matter. */ ! if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) != (BM_DIRTY | BM_JUST_DIRTIED)) { XLogRecPtr lsn = InvalidXLogRecPtr; bool dirtied = false; bool delayChkpt = false; /* * If we need to protect hint bit updates from torn writes, WAL-log a --- 3383,3402 ---- * This routine might get called many times on the same page, if we are * making the first scan after commit of an xact that added/deleted many * tuples. So, be as quick as we can if the buffer is already dirty. We ! * do this by not acquiring header lock if it looks like the status bits are * already set. Since we make this test unlocked, there's a chance we * might fail to notice that the flags have just been cleared, and failed * to reset them, due to memory-ordering issues. But since this function * is only intended to be used in cases where failing to write out the * data would be harmless anyway, it doesn't really matter. */ ! if ((pg_atomic_read_u32(&bufHdr->state) & (BM_DIRTY | BM_JUST_DIRTIED)) != (BM_DIRTY | BM_JUST_DIRTIED)) { XLogRecPtr lsn = InvalidXLogRecPtr; bool dirtied = false; bool delayChkpt = false; + uint32 state; /* * If we need to protect hint bit updates from torn writes, WAL-log a *************** MarkBufferDirtyHint(Buffer buffer, bool *** 3313,3319 **** * We don't check full_page_writes here because that logic is included * when we call XLogInsert() since the value changes dynamically. */ ! if (XLogHintBitIsNeeded() && (bufHdr->flags & BM_PERMANENT)) { /* * If we're in recovery we cannot dirty a page because of a hint. --- 3407,3413 ---- * We don't check full_page_writes here because that logic is included * when we call XLogInsert() since the value changes dynamically. */ ! if (XLogHintBitIsNeeded() && (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT)) { /* * If we're in recovery we cannot dirty a page because of a hint. *************** MarkBufferDirtyHint(Buffer buffer, bool *** 3352,3360 **** lsn = XLogSaveBufferForHint(buffer, buffer_std); } ! LockBufHdr(bufHdr); ! Assert(bufHdr->refcount > 0); ! if (!(bufHdr->flags & BM_DIRTY)) { dirtied = true; /* Means "will be dirtied by this action" */ --- 3446,3456 ---- lsn = XLogSaveBufferForHint(buffer, buffer_std); } ! state = LockBufHdr(bufHdr); ! ! Assert(BUF_STATE_GET_REFCOUNT(state) > 0); ! ! 
if (!(state & BM_DIRTY)) { dirtied = true; /* Means "will be dirtied by this action" */ *************** MarkBufferDirtyHint(Buffer buffer, bool *** 3374,3381 **** if (!XLogRecPtrIsInvalid(lsn)) PageSetLSN(page, lsn); } ! bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED); ! UnlockBufHdr(bufHdr); if (delayChkpt) MyPgXact->delayChkpt = false; --- 3470,3479 ---- if (!XLogRecPtrIsInvalid(lsn)) PageSetLSN(page, lsn); } ! ! state |= BM_DIRTY | BM_JUST_DIRTIED; ! state &= ~BM_LOCKED; ! pg_atomic_write_u32(&bufHdr->state, state); if (delayChkpt) MyPgXact->delayChkpt = false; *************** UnlockBuffers(void) *** 3406,3422 **** if (buf) { ! LockBufHdr(buf); /* * Don't complain if flag bit not set; it could have been reset but we * got a cancel/die interrupt before getting the signal. */ ! if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 && buf->wait_backend_pid == MyProcPid) ! buf->flags &= ~BM_PIN_COUNT_WAITER; ! UnlockBufHdr(buf); PinCountWaitBuf = NULL; } --- 3504,3523 ---- if (buf) { ! uint32 state; ! ! state = LockBufHdr(buf); /* * Don't complain if flag bit not set; it could have been reset but we * got a cancel/die interrupt before getting the signal. */ ! if ((state & BM_PIN_COUNT_WAITER) != 0 && buf->wait_backend_pid == MyProcPid) ! state &= ~BM_PIN_COUNT_WAITER; ! state &= ~BM_LOCKED; ! pg_atomic_write_u32(&buf->state, state); PinCountWaitBuf = NULL; } *************** LockBufferForCleanup(Buffer buffer) *** 3509,3535 **** for (;;) { /* Try to acquire lock */ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); ! LockBufHdr(bufHdr); ! Assert(bufHdr->refcount > 0); ! if (bufHdr->refcount == 1) { /* Successfully acquired exclusive lock with pincount 1 */ UnlockBufHdr(bufHdr); return; } /* Failed, so mark myself as waiting for pincount 1 */ ! if (bufHdr->flags & BM_PIN_COUNT_WAITER) { UnlockBufHdr(bufHdr); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); elog(ERROR, "multiple backends attempting to wait for pincount 1"); } bufHdr->wait_backend_pid = MyProcPid; - bufHdr->flags |= BM_PIN_COUNT_WAITER; PinCountWaitBuf = bufHdr; ! UnlockBufHdr(bufHdr); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); /* Report the wait */ --- 3610,3640 ---- for (;;) { + uint32 state; + /* Try to acquire lock */ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); ! state = LockBufHdr(bufHdr); ! ! Assert(BUF_STATE_GET_REFCOUNT(state) > 0); ! if (BUF_STATE_GET_REFCOUNT(state) == 1) { /* Successfully acquired exclusive lock with pincount 1 */ UnlockBufHdr(bufHdr); return; } /* Failed, so mark myself as waiting for pincount 1 */ ! if (state & BM_PIN_COUNT_WAITER) { UnlockBufHdr(bufHdr); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); elog(ERROR, "multiple backends attempting to wait for pincount 1"); } bufHdr->wait_backend_pid = MyProcPid; PinCountWaitBuf = bufHdr; ! state |= BM_PIN_COUNT_WAITER; ! state &= ~BM_LOCKED; ! pg_atomic_write_u32(&bufHdr->state, state); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); /* Report the wait */ *************** LockBufferForCleanup(Buffer buffer) *** 3558,3568 **** * impossible with the current usages due to table level locking, but * better be safe. */ ! LockBufHdr(bufHdr); ! if ((bufHdr->flags & BM_PIN_COUNT_WAITER) != 0 && bufHdr->wait_backend_pid == MyProcPid) ! bufHdr->flags &= ~BM_PIN_COUNT_WAITER; ! UnlockBufHdr(bufHdr); PinCountWaitBuf = NULL; /* Loop back and try again */ --- 3663,3674 ---- * impossible with the current usages due to table level locking, but * better be safe. */ ! state = LockBufHdr(bufHdr); ! if ((state & BM_PIN_COUNT_WAITER) != 0 && bufHdr->wait_backend_pid == MyProcPid) ! state &= ~BM_PIN_COUNT_WAITER; ! 
state &= ~BM_LOCKED; ! pg_atomic_write_u32(&bufHdr->state, state); PinCountWaitBuf = NULL; /* Loop back and try again */ *************** bool *** 3603,3624 **** ConditionalLockBufferForCleanup(Buffer buffer) { BufferDesc *bufHdr; Assert(BufferIsValid(buffer)); if (BufferIsLocal(buffer)) { /* There should be exactly one pin */ ! Assert(LocalRefCount[-buffer - 1] > 0); ! if (LocalRefCount[-buffer - 1] != 1) return false; /* Nobody else to wait for */ return true; } /* There should be exactly one local pin */ ! Assert(GetPrivateRefCount(buffer) > 0); ! if (GetPrivateRefCount(buffer) != 1) return false; /* Try to acquire lock */ --- 3709,3734 ---- ConditionalLockBufferForCleanup(Buffer buffer) { BufferDesc *bufHdr; + uint32 state, + refcount; Assert(BufferIsValid(buffer)); if (BufferIsLocal(buffer)) { + refcount = LocalRefCount[-buffer - 1]; /* There should be exactly one pin */ ! Assert(refcount > 0); ! if (refcount != 1) return false; /* Nobody else to wait for */ return true; } /* There should be exactly one local pin */ ! refcount = GetPrivateRefCount(buffer); ! Assert(refcount); ! if (refcount != 1) return false; /* Try to acquire lock */ *************** ConditionalLockBufferForCleanup(Buffer b *** 3626,3634 **** return false; bufHdr = GetBufferDescriptor(buffer - 1); ! LockBufHdr(bufHdr); ! Assert(bufHdr->refcount > 0); ! if (bufHdr->refcount == 1) { /* Successfully acquired exclusive lock with pincount 1 */ UnlockBufHdr(bufHdr); --- 3736,3746 ---- return false; bufHdr = GetBufferDescriptor(buffer - 1); ! state = LockBufHdr(bufHdr); ! refcount = BUF_STATE_GET_REFCOUNT(state); ! ! Assert(refcount > 0); ! if (refcount == 1) { /* Successfully acquired exclusive lock with pincount 1 */ UnlockBufHdr(bufHdr); *************** WaitIO(BufferDesc *buf) *** 3666,3682 **** */ for (;;) { ! BufFlags sv_flags; /* ! * It may not be necessary to acquire the spinlock to check the flag * here, but since this test is essential for correctness, we'd better * play it safe. */ ! LockBufHdr(buf); ! sv_flags = buf->flags; UnlockBufHdr(buf); ! if (!(sv_flags & BM_IO_IN_PROGRESS)) break; LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_SHARED); LWLockRelease(BufferDescriptorGetIOLock(buf)); --- 3778,3794 ---- */ for (;;) { ! uint32 state; /* ! * It may not be necessary to acquire the header lock to check the flag * here, but since this test is essential for correctness, we'd better * play it safe. */ ! state = LockBufHdr(buf); UnlockBufHdr(buf); ! ! if (!(state & BM_IO_IN_PROGRESS)) break; LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_SHARED); LWLockRelease(BufferDescriptorGetIOLock(buf)); *************** WaitIO(BufferDesc *buf) *** 3704,3709 **** --- 3816,3823 ---- static bool StartBufferIO(BufferDesc *buf, bool forInput) { + uint32 state; + Assert(!InProgressBuf); for (;;) *************** StartBufferIO(BufferDesc *buf, bool forI *** 3714,3722 **** */ LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE); ! LockBufHdr(buf); ! if (!(buf->flags & BM_IO_IN_PROGRESS)) break; /* --- 3828,3836 ---- */ LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE); ! state = LockBufHdr(buf); ! if (!(state & BM_IO_IN_PROGRESS)) break; /* *************** StartBufferIO(BufferDesc *buf, bool forI *** 3732,3738 **** /* Once we get here, there is definitely no I/O active on this buffer */ ! if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY)) { /* someone else already did the I/O */ UnlockBufHdr(buf); --- 3846,3852 ---- /* Once we get here, there is definitely no I/O active on this buffer */ ! 
if (forInput ? (state & BM_VALID) : !(state & BM_DIRTY)) { /* someone else already did the I/O */ UnlockBufHdr(buf); *************** StartBufferIO(BufferDesc *buf, bool forI *** 3740,3748 **** return false; } ! buf->flags |= BM_IO_IN_PROGRESS; ! ! UnlockBufHdr(buf); InProgressBuf = buf; IsForInput = forInput; --- 3854,3862 ---- return false; } ! state |= BM_IO_IN_PROGRESS; ! state &= ~BM_LOCKED; ! pg_atomic_write_u32(&buf->state, state); InProgressBuf = buf; IsForInput = forInput; *************** StartBufferIO(BufferDesc *buf, bool forI *** 3768,3786 **** * be 0, or BM_VALID if we just finished reading in the page. */ static void ! TerminateBufferIO(BufferDesc *buf, bool clear_dirty, int set_flag_bits) { Assert(buf == InProgressBuf); ! LockBufHdr(buf); ! Assert(buf->flags & BM_IO_IN_PROGRESS); ! buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR); ! if (clear_dirty && !(buf->flags & BM_JUST_DIRTIED)) ! buf->flags &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED); ! buf->flags |= set_flag_bits; ! UnlockBufHdr(buf); InProgressBuf = NULL; --- 3882,3903 ---- * be 0, or BM_VALID if we just finished reading in the page. */ static void ! TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits) { + uint32 state; + Assert(buf == InProgressBuf); ! state = LockBufHdr(buf); ! Assert(state & BM_IO_IN_PROGRESS); ! state &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR | BM_LOCKED); ! if (clear_dirty && !(state & BM_JUST_DIRTIED)) ! state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED); ! ! state |= set_flag_bits; ! pg_atomic_write_u32(&buf->state, state); InProgressBuf = NULL; *************** AbortBufferIO(void) *** 3803,3808 **** --- 3920,3926 ---- if (buf) { + uint32 state; /* * Since LWLockReleaseAll has already been called, we're not holding * the buffer's io_in_progress_lock. We have to re-acquire it so that *************** AbortBufferIO(void) *** 3811,3836 **** */ LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE); ! LockBufHdr(buf); ! Assert(buf->flags & BM_IO_IN_PROGRESS); if (IsForInput) { ! Assert(!(buf->flags & BM_DIRTY)); /* We'd better not think buffer is valid yet */ ! Assert(!(buf->flags & BM_VALID)); UnlockBufHdr(buf); } else { ! BufFlags sv_flags; ! ! sv_flags = buf->flags; ! Assert(sv_flags & BM_DIRTY); UnlockBufHdr(buf); /* Issue notice if this is not the first failure... */ ! if (sv_flags & BM_IO_ERROR) { ! /* Buffer is pinned, so we can read tag without spinlock */ char *path; path = relpathperm(buf->tag.rnode, buf->tag.forkNum); --- 3929,3952 ---- */ LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE); ! state = LockBufHdr(buf); ! Assert(state & BM_IO_IN_PROGRESS); if (IsForInput) { ! Assert(!(state & BM_DIRTY)); ! /* We'd better not think buffer is valid yet */ ! Assert(!(state & BM_VALID)); UnlockBufHdr(buf); } else { ! Assert(state & BM_DIRTY); UnlockBufHdr(buf); /* Issue notice if this is not the first failure... */ ! if (state & BM_IO_ERROR) { ! /* Buffer is pinned, so we can read tag without header lock */ char *path; path = relpathperm(buf->tag.rnode, buf->tag.forkNum); *************** shared_buffer_write_error_callback(void *** 3854,3860 **** { BufferDesc *bufHdr = (BufferDesc *) arg; ! /* Buffer is pinned, so we can read the tag without locking the spinlock */ if (bufHdr != NULL) { char *path = relpathperm(bufHdr->tag.rnode, bufHdr->tag.forkNum); --- 3970,3976 ---- { BufferDesc *bufHdr = (BufferDesc *) arg; ! 
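TerminateBufferIO above shows the pattern that recurs throughout the patch: fold the flag updates and the lock release into one plain atomic write. That is safe because only the BM_LOCKED holder is allowed to store the state word directly; every other writer goes through the CAS loop and refuses to CAS while BM_LOCKED is set. A minimal sketch of the idiom, with a hypothetical helper name:

    /* Sketch: release the header lock while publishing updated bits. */
    static inline void
    unlock_buf_hdr_set_flags(BufferDesc *buf, uint32 set_bits, uint32 clear_bits)
    {
        uint32      state = pg_atomic_read_u32(&buf->state);

        Assert(state & BM_LOCKED);  /* we are the only possible writer */
        state |= set_bits;
        state &= ~(clear_bits | BM_LOCKED);
        pg_atomic_write_u32(&buf->state, state);
    }

UnlockBufHdr is the no-update case, implemented below with an atomic subtraction of BM_LOCKED.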
/* Buffer is pinned, so we can read the tag without locking the header */ if (bufHdr != NULL) { char *path = relpathperm(bufHdr->tag.rnode, bufHdr->tag.forkNum); *************** rnode_comparator(const void *p1, const v *** 3912,3917 **** --- 4028,4075 ---- } /* + * Lock buffer header - set BM_LOCKED in buffer state. + */ + uint32 + LockBufHdr(volatile BufferDesc *desc) + { + SpinDelayStatus delayStatus; + uint32 state; + + init_spin_delay(&delayStatus, (Pointer)desc, __FILE__, __LINE__); + + state = pg_atomic_read_u32(&desc->state); + + for (;;) + { + /* wait till lock is free */ + while (state & BM_LOCKED) + { + make_spin_delay(&delayStatus); + state = pg_atomic_read_u32(&desc->state); + /* Add exponential backoff? Should seldom be contended, though. */ + } + + /* and try to get lock */ + if (pg_atomic_compare_exchange_u32(&desc->state, &state, state | BM_LOCKED)) + break; + } + finish_spin_delay(&delayStatus); + return state | BM_LOCKED; + } + + /* + * Unlock buffer header - unset BM_LOCKED in buffer state. + */ + void + UnlockBufHdr(volatile BufferDesc *desc) + { + Assert(pg_atomic_read_u32(&desc->state) & BM_LOCKED); + + pg_atomic_sub_fetch_u32(&desc->state, BM_LOCKED); + } + + /* * BufferTag comparator. */ static int diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c new file mode 100644 index 551d152..8b99824 *** a/src/backend/storage/buffer/freelist.c --- b/src/backend/storage/buffer/freelist.c *************** typedef struct BufferAccessStrategyData *** 98,104 **** /* Prototypes for internal functions */ ! static BufferDesc *GetBufferFromRing(BufferAccessStrategy strategy); static void AddBufferToRing(BufferAccessStrategy strategy, BufferDesc *buf); --- 98,105 ---- /* Prototypes for internal functions */ ! static BufferDesc *GetBufferFromRing(BufferAccessStrategy strategy, ! uint32 *lockstate); static void AddBufferToRing(BufferAccessStrategy strategy, BufferDesc *buf); *************** ClockSweepTick(void) *** 180,186 **** * return the buffer with the buffer header spinlock still held. */ BufferDesc * ! StrategyGetBuffer(BufferAccessStrategy strategy) { BufferDesc *buf; int bgwprocno; --- 181,187 ---- * return the buffer with the buffer header spinlock still held. */ BufferDesc * ! StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *lockstate) { BufferDesc *buf; int bgwprocno; *************** StrategyGetBuffer(BufferAccessStrategy s *** 192,198 **** */ if (strategy != NULL) { ! buf = GetBufferFromRing(strategy); if (buf != NULL) return buf; } --- 193,199 ---- */ if (strategy != NULL) { ! buf = GetBufferFromRing(strategy, lockstate); if (buf != NULL) return buf; } *************** StrategyGetBuffer(BufferAccessStrategy s *** 250,255 **** --- 251,258 ---- { while (true) { + uint32 state; + /* Acquire the spinlock to remove element from the freelist */ SpinLockAcquire(&StrategyControl->buffer_strategy_lock); *************** StrategyGetBuffer(BufferAccessStrategy s *** 279,289 **** * it before we got to it. It's probably impossible altogether as * of 8.3, but we'd better check anyway.) */ ! LockBufHdr(buf); ! if (buf->refcount == 0 && buf->usage_count == 0) { if (strategy != NULL) AddBufferToRing(strategy, buf); return buf; } UnlockBufHdr(buf); --- 282,294 ---- * it before we got to it. It's probably impossible altogether as * of 8.3, but we'd better check anyway.) */ ! state = LockBufHdr(buf); ! if (BUF_STATE_GET_REFCOUNT(state) == 0 !
&& BUF_STATE_GET_USAGECOUNT(state) == 0) { if (strategy != NULL) AddBufferToRing(strategy, buf); + *lockstate = state; return buf; } UnlockBufHdr(buf); *************** StrategyGetBuffer(BufferAccessStrategy s *** 295,300 **** --- 300,306 ---- trycounter = NBuffers; for (;;) { + uint32 state; buf = GetBufferDescriptor(ClockSweepTick()); *************** StrategyGetBuffer(BufferAccessStrategy s *** 302,313 **** * If the buffer is pinned or has a nonzero usage_count, we cannot use * it; decrement the usage_count (unless pinned) and keep scanning. */ ! LockBufHdr(buf); ! if (buf->refcount == 0) { ! if (buf->usage_count > 0) { ! buf->usage_count--; trycounter = NBuffers; } else --- 308,321 ---- * If the buffer is pinned or has a nonzero usage_count, we cannot use * it; decrement the usage_count (unless pinned) and keep scanning. */ ! state = LockBufHdr(buf); ! ! if (BUF_STATE_GET_REFCOUNT(state) == 0) { ! if (BUF_STATE_GET_USAGECOUNT(state) != 0) { ! state -= BUF_USAGECOUNT_ONE; ! trycounter = NBuffers; } else *************** StrategyGetBuffer(BufferAccessStrategy s *** 315,320 **** --- 323,329 ---- /* Found a usable buffer */ if (strategy != NULL) AddBufferToRing(strategy, buf); + *lockstate = state; return buf; } } *************** StrategyGetBuffer(BufferAccessStrategy s *** 327,336 **** * probably better to fail than to risk getting stuck in an * infinite loop. */ ! UnlockBufHdr(buf); elog(ERROR, "no unpinned buffers available"); } ! UnlockBufHdr(buf); } } --- 336,347 ---- * probably better to fail than to risk getting stuck in an * infinite loop. */ ! state &= ~BM_LOCKED; ! pg_atomic_write_u32(&buf->state, state); elog(ERROR, "no unpinned buffers available"); } ! state &= ~BM_LOCKED; ! pg_atomic_write_u32(&buf->state, state); } } *************** FreeAccessStrategy(BufferAccessStrategy *** 585,594 **** * The bufhdr spin lock is held on the returned buffer. */ static BufferDesc * ! GetBufferFromRing(BufferAccessStrategy strategy) { BufferDesc *buf; Buffer bufnum; /* Advance to next ring slot */ if (++strategy->current >= strategy->ring_size) --- 596,606 ---- * The bufhdr spin lock is held on the returned buffer. */ static BufferDesc * ! GetBufferFromRing(BufferAccessStrategy strategy, uint32 *lockstate) { BufferDesc *buf; Buffer bufnum; + uint32 state; /* Advance to next ring slot */ if (++strategy->current >= strategy->ring_size) *************** GetBufferFromRing(BufferAccessStrategy s *** 616,625 **** * shouldn't re-use it. */ buf = GetBufferDescriptor(bufnum - 1); ! LockBufHdr(buf); ! if (buf->refcount == 0 && buf->usage_count <= 1) { strategy->current_was_in_ring = true; return buf; } UnlockBufHdr(buf); --- 628,639 ---- * shouldn't re-use it. */ buf = GetBufferDescriptor(bufnum - 1); ! state = LockBufHdr(buf); ! if (BUF_STATE_GET_REFCOUNT(state) == 0 ! 
&& BUF_STATE_GET_USAGECOUNT(state) <= 1) { strategy->current_was_in_ring = true; + *lockstate = state; return buf; } UnlockBufHdr(buf); diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c new file mode 100644 index 17640cf..edc0ada *** a/src/backend/storage/buffer/localbuf.c --- b/src/backend/storage/buffer/localbuf.c *************** LocalBufferAlloc(SMgrRelation smgr, Fork *** 108,113 **** --- 108,114 ---- int b; int trycounter; bool found; + uint32 state; INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum); *************** LocalBufferAlloc(SMgrRelation smgr, Fork *** 128,143 **** fprintf(stderr, "LB ALLOC (%u,%d,%d) %d\n", smgr->smgr_rnode.node.relNode, forkNum, blockNum, -b - 1); #endif /* this part is equivalent to PinBuffer for a shared buffer */ if (LocalRefCount[b] == 0) { ! if (bufHdr->usage_count < BM_MAX_USAGE_COUNT) ! bufHdr->usage_count++; } LocalRefCount[b]++; ResourceOwnerRememberBuffer(CurrentResourceOwner, BufferDescriptorGetBuffer(bufHdr)); ! if (bufHdr->flags & BM_VALID) *foundPtr = TRUE; else { --- 129,149 ---- fprintf(stderr, "LB ALLOC (%u,%d,%d) %d\n", smgr->smgr_rnode.node.relNode, forkNum, blockNum, -b - 1); #endif + state = pg_atomic_read_u32(&bufHdr->state); + /* this part is equivalent to PinBuffer for a shared buffer */ if (LocalRefCount[b] == 0) { ! if (BUF_STATE_GET_USAGECOUNT(state) < BM_MAX_USAGE_COUNT) ! { ! state += BUF_USAGECOUNT_ONE; ! pg_atomic_write_u32(&bufHdr->state, state); ! } } LocalRefCount[b]++; ResourceOwnerRememberBuffer(CurrentResourceOwner, BufferDescriptorGetBuffer(bufHdr)); ! if (state & BM_VALID) *foundPtr = TRUE; else { *************** LocalBufferAlloc(SMgrRelation smgr, Fork *** 169,177 **** if (LocalRefCount[b] == 0) { ! if (bufHdr->usage_count > 0) { ! bufHdr->usage_count--; trycounter = NLocBuffer; } else --- 175,186 ---- if (LocalRefCount[b] == 0) { ! state = pg_atomic_read_u32(&bufHdr->state); ! ! if (BUF_STATE_GET_USAGECOUNT(state) > 0) { ! state -= BUF_USAGECOUNT_ONE; ! pg_atomic_write_u32(&bufHdr->state, state); trycounter = NLocBuffer; } else *************** LocalBufferAlloc(SMgrRelation smgr, Fork *** 193,199 **** * this buffer is not referenced but it might still be dirty. if that's * the case, write it out before reusing it! */ ! if (bufHdr->flags & BM_DIRTY) { SMgrRelation oreln; Page localpage = (char *) LocalBufHdrGetBlock(bufHdr); --- 202,208 ---- * this buffer is not referenced but it might still be dirty. if that's * the case, write it out before reusing it! */ ! if (state & BM_DIRTY) { SMgrRelation oreln; Page localpage = (char *) LocalBufHdrGetBlock(bufHdr); *************** LocalBufferAlloc(SMgrRelation smgr, Fork *** 211,217 **** false); /* Mark not-dirty now in case we error out below */ ! bufHdr->flags &= ~BM_DIRTY; pgBufferUsage.local_blks_written++; } --- 220,227 ---- false); /* Mark not-dirty now in case we error out below */ ! state &= ~BM_DIRTY; ! pg_atomic_write_u32(&bufHdr->state, state); pgBufferUsage.local_blks_written++; } *************** LocalBufferAlloc(SMgrRelation smgr, Fork *** 228,234 **** /* * Update the hash table: remove old entry, if any, and make new one. */ ! if (bufHdr->flags & BM_TAG_VALID) { hresult = (LocalBufferLookupEnt *) hash_search(LocalBufHash, (void *) &bufHdr->tag, --- 238,244 ---- /* * Update the hash table: remove old entry, if any, and make new one. */ ! 
if (state & BM_TAG_VALID) { hresult = (LocalBufferLookupEnt *) hash_search(LocalBufHash, (void *) &bufHdr->tag, *************** LocalBufferAlloc(SMgrRelation smgr, Fork *** 237,243 **** elog(ERROR, "local buffer hash table corrupted"); /* mark buffer invalid just in case hash insert fails */ CLEAR_BUFFERTAG(bufHdr->tag); ! bufHdr->flags &= ~(BM_VALID | BM_TAG_VALID); } hresult = (LocalBufferLookupEnt *) --- 247,254 ---- elog(ERROR, "local buffer hash table corrupted"); /* mark buffer invalid just in case hash insert fails */ CLEAR_BUFFERTAG(bufHdr->tag); ! state &= ~(BM_VALID | BM_TAG_VALID); ! pg_atomic_write_u32(&bufHdr->state, state); } hresult = (LocalBufferLookupEnt *) *************** LocalBufferAlloc(SMgrRelation smgr, Fork *** 250,258 **** * it's all ours now. */ bufHdr->tag = newTag; ! bufHdr->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR); ! bufHdr->flags |= BM_TAG_VALID; ! bufHdr->usage_count = 1; *foundPtr = FALSE; return bufHdr; --- 261,271 ---- * it's all ours now. */ bufHdr->tag = newTag; ! state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR); ! state |= BM_TAG_VALID; ! state &= ~BUF_USAGECOUNT_MASK; ! state += BUF_USAGECOUNT_ONE; ! pg_atomic_write_u32(&bufHdr->state, state); *foundPtr = FALSE; return bufHdr; *************** MarkLocalBufferDirty(Buffer buffer) *** 267,272 **** --- 280,286 ---- { int bufid; BufferDesc *bufHdr; + uint32 state; Assert(BufferIsLocal(buffer)); *************** MarkLocalBufferDirty(Buffer buffer) *** 280,289 **** bufHdr = GetLocalBufferDescriptor(bufid); ! if (!(bufHdr->flags & BM_DIRTY)) ! pgBufferUsage.local_blks_dirtied++; ! bufHdr->flags |= BM_DIRTY; } /* --- 294,303 ---- bufHdr = GetLocalBufferDescriptor(bufid); ! state = pg_atomic_fetch_or_u32(&bufHdr->state, BM_DIRTY); ! if (!(state & BM_DIRTY)) ! pgBufferUsage.local_blks_dirtied++; } /* *************** DropRelFileNodeLocalBuffers(RelFileNode *** 307,314 **** { BufferDesc *bufHdr = GetLocalBufferDescriptor(i); LocalBufferLookupEnt *hresult; ! if ((bufHdr->flags & BM_TAG_VALID) && RelFileNodeEquals(bufHdr->tag.rnode, rnode) && bufHdr->tag.forkNum == forkNum && bufHdr->tag.blockNum >= firstDelBlock) --- 321,331 ---- { BufferDesc *bufHdr = GetLocalBufferDescriptor(i); LocalBufferLookupEnt *hresult; + uint32 state; ! state = pg_atomic_read_u32(&bufHdr->state); ! ! if ((state & BM_TAG_VALID) && RelFileNodeEquals(bufHdr->tag.rnode, rnode) && bufHdr->tag.forkNum == forkNum && bufHdr->tag.blockNum >= firstDelBlock) *************** DropRelFileNodeLocalBuffers(RelFileNode *** 327,334 **** elog(ERROR, "local buffer hash table corrupted"); /* Mark buffer invalid */ CLEAR_BUFFERTAG(bufHdr->tag); ! bufHdr->flags = 0; ! bufHdr->usage_count = 0; } } } --- 344,352 ---- elog(ERROR, "local buffer hash table corrupted"); /* Mark buffer invalid */ CLEAR_BUFFERTAG(bufHdr->tag); ! state &= ~BUF_FLAG_MASK; ! state &= ~BUF_USAGECOUNT_MASK; ! pg_atomic_write_u32(&bufHdr->state, state); } } } *************** DropRelFileNodeAllLocalBuffers(RelFileNo *** 349,356 **** { BufferDesc *bufHdr = GetLocalBufferDescriptor(i); LocalBufferLookupEnt *hresult; ! if ((bufHdr->flags & BM_TAG_VALID) && RelFileNodeEquals(bufHdr->tag.rnode, rnode)) { if (LocalRefCount[i] != 0) --- 367,377 ---- { BufferDesc *bufHdr = GetLocalBufferDescriptor(i); LocalBufferLookupEnt *hresult; + uint32 state; ! state = pg_atomic_read_u32(&bufHdr->state); ! ! 
if ((state & BM_TAG_VALID) && RelFileNodeEquals(bufHdr->tag.rnode, rnode)) { if (LocalRefCount[i] != 0) *************** DropRelFileNodeAllLocalBuffers(RelFileNo *** 367,374 **** elog(ERROR, "local buffer hash table corrupted"); /* Mark buffer invalid */ CLEAR_BUFFERTAG(bufHdr->tag); ! bufHdr->flags = 0; ! bufHdr->usage_count = 0; } } } --- 388,396 ---- elog(ERROR, "local buffer hash table corrupted"); /* Mark buffer invalid */ CLEAR_BUFFERTAG(bufHdr->tag); ! state &= ~BUF_FLAG_MASK; ! state &= ~BUF_USAGECOUNT_MASK; ! pg_atomic_write_u32(&bufHdr->state, state); } } } diff --git a/src/backend/storage/lmgr/s_lock.c b/src/backend/storage/lmgr/s_lock.c new file mode 100644 index cc0bf5e..a2edf00 *** a/src/backend/storage/lmgr/s_lock.c --- b/src/backend/storage/lmgr/s_lock.c *************** static int spins_per_delay = DEFAULT_SPI *** 30,146 **** * s_lock_stuck() - complain about a stuck spinlock */ static void ! s_lock_stuck(volatile slock_t *lock, const char *file, int line) { #if defined(S_LOCK_TEST) fprintf(stderr, "\nStuck spinlock (%p) detected at %s:%d.\n", ! lock, file, line); exit(1); #else elog(PANIC, "stuck spinlock (%p) detected at %s:%d", ! lock, file, line); #endif } - /* ! * s_lock(lock) - platform-independent portion of waiting for a spinlock. */ - int - s_lock(volatile slock_t *lock, const char *file, int line) - { - /* - * We loop tightly for awhile, then delay using pg_usleep() and try again. - * Preferably, "awhile" should be a small multiple of the maximum time we - * expect a spinlock to be held. 100 iterations seems about right as an - * initial guess. However, on a uniprocessor the loop is a waste of - * cycles, while in a multi-CPU scenario it's usually better to spin a bit - * longer than to call the kernel, so we try to adapt the spin loop count - * depending on whether we seem to be in a uniprocessor or multiprocessor. - * - * Note: you might think MIN_SPINS_PER_DELAY should be just 1, but you'd - * be wrong; there are platforms where that can result in a "stuck - * spinlock" failure. This has been seen particularly on Alphas; it seems - * that the first TAS after returning from kernel space will always fail - * on that hardware. - * - * Once we do decide to block, we use randomly increasing pg_usleep() - * delays. The first delay is 1 msec, then the delay randomly increases to - * about one second, after which we reset to 1 msec and start again. The - * idea here is that in the presence of heavy contention we need to - * increase the delay, else the spinlock holder may never get to run and - * release the lock. (Consider situation where spinlock holder has been - * nice'd down in priority by the scheduler --- it will not get scheduled - * until all would-be acquirers are sleeping, so if we always use a 1-msec - * sleep, there is a real possibility of starvation.) But we can't just - * clamp the delay to an upper bound, else it would take a long time to - * make a reasonable number of tries. - * - * We time out and declare error after NUM_DELAYS delays (thus, exactly - * that many tries). With the given settings, this will usually take 2 or - * so minutes. It seems better to fix the total number of tries (and thus - * the probability of unintended failure) than to fix the total time - * spent. - */ #define MIN_SPINS_PER_DELAY 10 #define MAX_SPINS_PER_DELAY 1000 #define NUM_DELAYS 1000 #define MIN_DELAY_USEC 1000L #define MAX_DELAY_USEC 1000000L ! int spins = 0; ! int delays = 0; ! int cur_delay = 0; ! while (TAS_SPIN(lock)) ! { ! 
/* CPU-specific delay each time through the loop */ ! SPIN_DELAY(); ! /* Block the process every spins_per_delay tries */ ! if (++spins >= spins_per_delay) ! { ! if (++delays > NUM_DELAYS) ! s_lock_stuck(lock, file, line); ! if (cur_delay == 0) /* first time to delay? */ ! cur_delay = MIN_DELAY_USEC; ! pg_usleep(cur_delay); #if defined(S_LOCK_TEST) ! fprintf(stdout, "*"); ! fflush(stdout); #endif ! /* increase delay by a random fraction between 1X and 2X */ ! cur_delay += (int) (cur_delay * ! ((double) random() / (double) MAX_RANDOM_VALUE) + 0.5); ! /* wrap back to minimum delay when max is exceeded */ ! if (cur_delay > MAX_DELAY_USEC) ! cur_delay = MIN_DELAY_USEC; ! spins = 0; ! } } ! /* ! * If we were able to acquire the lock without delaying, it's a good ! * indication we are in a multiprocessor. If we had to delay, it's a sign ! * (but not a sure thing) that we are in a uniprocessor. Hence, we ! * decrement spins_per_delay slowly when we had to delay, and increase it ! * rapidly when we didn't. It's expected that spins_per_delay will ! * converge to the minimum value on a uniprocessor and to the maximum ! * value on a multiprocessor. ! * ! * Note: spins_per_delay is local within our current process. We want to ! * average these observations across multiple backends, since it's ! * relatively rare for this function to even get entered, and so a single ! * backend might not live long enough to converge on a good value. That ! * is handled by the two routines below. ! */ ! if (cur_delay == 0) { /* we never had to delay */ if (spins_per_delay < MAX_SPINS_PER_DELAY) --- 30,150 ---- * s_lock_stuck() - complain about a stuck spinlock */ static void ! s_lock_stuck(Pointer p, const char *file, int line) { #if defined(S_LOCK_TEST) fprintf(stderr, "\nStuck spinlock (%p) detected at %s:%d.\n", ! p, file, line); exit(1); #else elog(PANIC, "stuck spinlock (%p) detected at %s:%d", ! p, file, line); #endif } /* ! * We loop tightly for awhile, then delay using pg_usleep() and try again. ! * Preferably, "awhile" should be a small multiple of the maximum time we ! * expect a spinlock to be held. 100 iterations seems about right as an ! * initial guess. However, on a uniprocessor the loop is a waste of ! * cycles, while in a multi-CPU scenario it's usually better to spin a bit ! * longer than to call the kernel, so we try to adapt the spin loop count ! * depending on whether we seem to be in a uniprocessor or multiprocessor. ! * ! * Note: you might think MIN_SPINS_PER_DELAY should be just 1, but you'd ! * be wrong; there are platforms where that can result in a "stuck ! * spinlock" failure. This has been seen particularly on Alphas; it seems ! * that the first TAS after returning from kernel space will always fail ! * on that hardware. ! * ! * Once we do decide to block, we use randomly increasing pg_usleep() ! * delays. The first delay is 1 msec, then the delay randomly increases to ! * about one second, after which we reset to 1 msec and start again. The ! * idea here is that in the presence of heavy contention we need to ! * increase the delay, else the spinlock holder may never get to run and ! * release the lock. (Consider situation where spinlock holder has been ! * nice'd down in priority by the scheduler --- it will not get scheduled ! * until all would-be acquirers are sleeping, so if we always use a 1-msec ! * sleep, there is a real possibility of starvation.) But we can't just ! * clamp the delay to an upper bound, else it would take a long time to ! * make a reasonable number of tries. ! * ! 
* We time out and declare error after NUM_DELAYS delays (thus, exactly ! * that many tries). With the given settings, this will usually take 2 or ! * so minutes. It seems better to fix the total number of tries (and thus ! * the probability of unintended failure) than to fix the total time ! * spent. */ #define MIN_SPINS_PER_DELAY 10 #define MAX_SPINS_PER_DELAY 1000 #define NUM_DELAYS 1000 #define MIN_DELAY_USEC 1000L #define MAX_DELAY_USEC 1000000L ! void ! init_spin_delay(SpinDelayStatus *status, Pointer ptr, const char *file, int line) ! { ! status->spins = 0; ! status->delays = 0; ! status->cur_delay = 0; ! status->ptr = ptr; ! status->file = file; ! status->line = line; ! } ! void ! make_spin_delay(SpinDelayStatus *status) ! { ! /* CPU-specific delay each time through the loop */ ! SPIN_DELAY(); ! /* Block the process every spins_per_delay tries */ ! if (++(status->spins) >= spins_per_delay) ! { ! if (++(status->delays) > NUM_DELAYS) ! s_lock_stuck(status->ptr, status->file, status->line); ! if (status->cur_delay == 0) /* first time to delay? */ ! status->cur_delay = MIN_DELAY_USEC; ! pg_usleep(status->cur_delay); #if defined(S_LOCK_TEST) ! fprintf(stdout, "*"); ! fflush(stdout); #endif ! /* increase delay by a random fraction between 1X and 2X */ ! status->cur_delay += (int) (status->cur_delay * ! ((double) random() / (double) MAX_RANDOM_VALUE) + 0.5); ! /* wrap back to minimum delay when max is exceeded */ ! if (status->cur_delay > MAX_DELAY_USEC) ! status->cur_delay = MIN_DELAY_USEC; ! status->spins = 0; } + } ! /* ! * If we were able to acquire the lock without delaying, it's a good ! * indication we are in a multiprocessor. If we had to delay, it's a sign ! * (but not a sure thing) that we are in a uniprocessor. Hence, we ! * decrement spins_per_delay slowly when we had to delay, and increase it ! * rapidly when we didn't. It's expected that spins_per_delay will ! * converge to the minimum value on a uniprocessor and to the maximum ! * value on a multiprocessor. ! * ! * Note: spins_per_delay is local within our current process. We want to ! * average these observations across multiple backends, since it's ! * relatively rare for this function to even get entered, and so a single ! * backend might not live long enough to converge on a good value. That ! * is handled by the two routines below. ! */ ! void ! finish_spin_delay(SpinDelayStatus *status) ! { ! if (status->cur_delay == 0) { /* we never had to delay */ if (spins_per_delay < MAX_SPINS_PER_DELAY) *************** s_lock(volatile slock_t *lock, const cha *** 151,157 **** if (spins_per_delay > MIN_SPINS_PER_DELAY) spins_per_delay = Max(spins_per_delay - 1, MIN_SPINS_PER_DELAY); } ! return delays; } #ifdef USE_DEFAULT_S_UNLOCK --- 155,180 ---- if (spins_per_delay > MIN_SPINS_PER_DELAY) spins_per_delay = Max(spins_per_delay - 1, MIN_SPINS_PER_DELAY); } ! } ! ! /* ! * s_lock(lock) - platform-independent portion of waiting for a spinlock. ! */ ! int ! s_lock(volatile slock_t *lock, const char *file, int line) ! { ! SpinDelayStatus delayStatus; ! ! init_spin_delay(&delayStatus, (Pointer)lock, file, line); ! ! while (TAS_SPIN(lock)) ! { ! make_spin_delay(&delayStatus); ! } ! ! finish_spin_delay(&delayStatus); ! ! 
return delayStatus.delays; } #ifdef USE_DEFAULT_S_UNLOCK diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h new file mode 100644 index d04363b..2cfacd8 *** a/src/include/storage/buf_internals.h --- b/src/include/storage/buf_internals.h *************** *** 21,49 **** #include "storage/lwlock.h" #include "storage/shmem.h" #include "storage/smgr.h" #include "storage/spin.h" #include "utils/relcache.h" /* * Flags for buffer descriptors * * Note: TAG_VALID essentially means that there is a buffer hashtable * entry associated with the buffer's tag. */ ! #define BM_DIRTY (1 << 0) /* data needs writing */ ! #define BM_VALID (1 << 1) /* data is valid */ ! #define BM_TAG_VALID (1 << 2) /* tag is assigned */ ! #define BM_IO_IN_PROGRESS (1 << 3) /* read or write in progress */ ! #define BM_IO_ERROR (1 << 4) /* previous I/O failed */ ! #define BM_JUST_DIRTIED (1 << 5) /* dirtied since write started */ ! #define BM_PIN_COUNT_WAITER (1 << 6) /* have waiter for sole pin */ ! #define BM_CHECKPOINT_NEEDED (1 << 7) /* must write for checkpoint */ ! #define BM_PERMANENT (1 << 8) /* permanent relation (not * unlogged) */ - - typedef bits16 BufFlags; - /* * The maximum allowed value of usage_count represents a tradeoff between * accuracy and speed of the clock-sweep buffer management algorithm. A --- 21,69 ---- #include "storage/lwlock.h" #include "storage/shmem.h" #include "storage/smgr.h" + #include "port/atomics.h" #include "storage/spin.h" #include "utils/relcache.h" /* + * Buffer state is a single 32-bit variable where the following data is combined. + * + * - 18 bits refcount + * - 4 bits usage count + * - 10 bits of flags + * + * Such a layout allows us to perform some operations more efficiently. + * The definitions of the buffer state parts are below. + */ + #define BUF_REFCOUNT_ONE 1 + #define BUF_REFCOUNT_MASK ((1U << 18) - 1) + #define BUF_USAGECOUNT_MASK 0x003C0000U + #define BUF_USAGECOUNT_ONE (1U << 18) + #define BUF_USAGECOUNT_SHIFT 18 + #define BUF_FLAG_MASK 0xFFC00000U + + /* Get refcount and usagecount from buffer state */ + #define BUF_STATE_GET_REFCOUNT(state) ((state) & BUF_REFCOUNT_MASK) + #define BUF_STATE_GET_USAGECOUNT(state) (((state) & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT) + + /* * Flags for buffer descriptors * * Note: TAG_VALID essentially means that there is a buffer hashtable * entry associated with the buffer's tag. */ ! #define BM_LOCKED (1U << 22) /* buffer header is locked */ ! #define BM_DIRTY (1U << 23) /* data needs writing */ ! #define BM_VALID (1U << 24) /* data is valid */ ! #define BM_TAG_VALID (1U << 25) /* tag is assigned */ ! #define BM_IO_IN_PROGRESS (1U << 26) /* read or write in progress */ ! #define BM_IO_ERROR (1U << 27) /* previous I/O failed */ ! #define BM_JUST_DIRTIED (1U << 28) /* dirtied since write started */ ! #define BM_PIN_COUNT_WAITER (1U << 29) /* have waiter for sole pin */ ! #define BM_CHECKPOINT_NEEDED (1U << 30) /* must write for checkpoint */ ! #define BM_PERMANENT (1U << 31) /* permanent relation (not * unlogged) */ /* * The maximum allowed value of usage_count represents a tradeoff between * accuracy and speed of the clock-sweep buffer management algorithm. A *************** typedef struct buftag *** 113,130 **** /* * BufferDesc -- shared descriptor/state data for a single shared buffer. * ! * Note: buf_hdr_lock must be held to examine or change the tag, flags, ! * usage_count, refcount, or wait_backend_pid fields. buf_id field never ! * changes after initialization, so does not need locking. freeNext is !
* protected by the buffer_strategy_lock not buf_hdr_lock. The LWLock can ! * take care of itself. The buf_hdr_lock is *not* used to control access to ! * the data in the buffer! * * An exception is that if we have the buffer pinned, its tag can't change ! * underneath us, so we can examine the tag without locking the spinlock. * Also, in places we do one-time reads of the flags without bothering to ! * lock the spinlock; this is generally for situations where we don't expect ! * the flag bit being tested to be changing. * * We can't physically remove items from a disk page if another backend has * the buffer pinned. Hence, a backend may need to wait for all other pins --- 133,161 ---- /* * BufferDesc -- shared descriptor/state data for a single shared buffer. * ! * Note: The buffer header lock (BM_LOCKED flag) must be held to examine or ! * change the tag, state or wait_backend_pid fields. In general, the buffer ! * header lock is a spinlock which is combined with the flags, refcount and ! * usagecount into a single atomic variable. This layout allows us to perform ! * some operations with a single CAS, without actually acquiring and releasing ! * the spinlock; for instance, increasing or decreasing the refcount. The ! * buf_id field never changes after initialization, so does not need locking. ! * freeNext is protected by the buffer_strategy_lock, not the buffer header ! * lock. The LWLock can take care of itself. The buffer header lock is *not* ! * used to control access to the data in the buffer! ! * ! * It's assumed that nobody changes the state field while the buffer header ! * lock is held. Thanks to this, the holder of the buffer header lock can do ! * complex updates of the state variable in a single write, simultaneously ! * releasing the lock (clearing the BM_LOCKED flag). On the other hand, ! * updating the state without holding the buffer header lock is restricted to ! * CAS operations which ensure that the BM_LOCKED flag is not set; atomic ! * increment/decrement, OR/AND etc. are not allowed. * * An exception is that if we have the buffer pinned, its tag can't change ! * underneath us, so we can examine the tag without locking the buffer header. * Also, in places we do one-time reads of the flags without bothering to ! * lock the buffer header; this is generally for situations where we don't ! * expect the flag bit being tested to be changing. * * We can't physically remove items from a disk page if another backend has * the buffer pinned. Hence, a backend may need to wait for all other pins *************** typedef struct buftag *** 142,153 **** typedef struct BufferDesc { BufferTag tag; /* ID of page contained in buffer */ - BufFlags flags; /* see bit definitions above */ - uint8 usage_count; /* usage counter for clock sweep code */ - slock_t buf_hdr_lock; /* protects a subset of fields, see above */ - unsigned refcount; /* # of backends holding pins on buffer */ - int wait_backend_pid; /* backend PID of pin-count waiter */ int buf_id; /* buffer's index number (from 0) */ int freeNext; /* link in freelist chain */ --- 173,183 ---- typedef struct BufferDesc { BufferTag tag; /* ID of page contained in buffer */ + /* state of the tag, containing flags, refcount and usagecount */ + pg_atomic_uint32 state; + + int wait_backend_pid; /* backend PID of pin-count waiter */ int buf_id; /* buffer's index number (from 0) */ int freeNext; /* link in freelist chain */ *************** extern PGDLLIMPORT LWLockMinimallyPadded *** 202,212 **** #define FREENEXT_NOT_IN_LIST (-2) /* ! * Macros for acquiring/releasing a shared buffer header's spinlock. ! * Do not apply these to local buffers! 
*/ ! #define LockBufHdr(bufHdr) SpinLockAcquire(&(bufHdr)->buf_hdr_lock) ! #define UnlockBufHdr(bufHdr) SpinLockRelease(&(bufHdr)->buf_hdr_lock) /* --- 232,242 ---- #define FREENEXT_NOT_IN_LIST (-2) /* ! * Functions for acquiring/releasing a shared buffer header's spinlock. Do ! * not apply these to local buffers! */ ! extern uint32 LockBufHdr(volatile BufferDesc *desc); ! extern void UnlockBufHdr(volatile BufferDesc *desc); /* *************** extern void IssuePendingWritebacks(Write *** 267,273 **** extern void ScheduleBufferTagForWriteback(WritebackContext *context, BufferTag *tag); /* freelist.c */ ! extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy); extern void StrategyFreeBuffer(BufferDesc *buf); extern bool StrategyRejectBuffer(BufferAccessStrategy strategy, BufferDesc *buf); --- 297,304 ---- extern void ScheduleBufferTagForWriteback(WritebackContext *context, BufferTag *tag); /* freelist.c */ ! extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy, ! uint32 *state); extern void StrategyFreeBuffer(BufferDesc *buf); extern bool StrategyRejectBuffer(BufferAccessStrategy strategy, BufferDesc *buf); diff --git a/src/include/storage/s_lock.h b/src/include/storage/s_lock.h new file mode 100644 index 8b240cd..cc6c195 *** a/src/include/storage/s_lock.h --- b/src/include/storage/s_lock.h *************** extern int s_lock(volatile slock_t *lock *** 991,994 **** --- 991,1012 ---- extern void set_spins_per_delay(int shared_spins_per_delay); extern int update_spins_per_delay(int shared_spins_per_delay); + /* + * Support for spin delay, which can be useful in other places where + * spinlock-like procedures take place. + */ + typedef struct + { + int spins; + int delays; + int cur_delay; + Pointer ptr; + const char *file; + int line; + } SpinDelayStatus; + + extern void init_spin_delay(SpinDelayStatus *status, Pointer ptr, const char *file, int line); + extern void make_spin_delay(SpinDelayStatus *status); + extern void finish_spin_delay(SpinDelayStatus *status); + #endif /* S_LOCK_H */
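
To make the packed state layout concrete: the 32-bit word holds the refcount in bits 0-17, the usage count in bits 18-21, and the flags in bits 22-31. Below is a minimal standalone sketch (not part of the patch) that packs and unpacks a state word; it copies the relevant mask definitions locally so it compiles outside the tree, and the main() driver is purely illustrative.

    #include <stdint.h>
    #include <stdio.h>

    /* Mask definitions copied from the patch's buf_internals.h. */
    #define BUF_REFCOUNT_ONE        1
    #define BUF_REFCOUNT_MASK       ((1U << 18) - 1)
    #define BUF_USAGECOUNT_MASK     0x003C0000U
    #define BUF_USAGECOUNT_ONE      (1U << 18)
    #define BUF_USAGECOUNT_SHIFT    18
    #define BM_DIRTY                (1U << 23)

    #define BUF_STATE_GET_REFCOUNT(state)   ((state) & BUF_REFCOUNT_MASK)
    #define BUF_STATE_GET_USAGECOUNT(state) \
        (((state) & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT)

    int
    main(void)
    {
        /* Hypothetical state: refcount 2, usage count 3, BM_DIRTY set. */
        uint32_t    state = 2 * BUF_REFCOUNT_ONE
                          + 3 * BUF_USAGECOUNT_ONE
                          + BM_DIRTY;

        printf("refcount   = %u\n", (unsigned) BUF_STATE_GET_REFCOUNT(state));
        printf("usagecount = %u\n", (unsigned) BUF_STATE_GET_USAGECOUNT(state));
        printf("dirty      = %d\n", (state & BM_DIRTY) != 0);
        return 0;
    }

Because BUF_REFCOUNT_ONE and BUF_USAGECOUNT_ONE occupy disjoint bit ranges, a pin that bumps both counters can be expressed as a single addition to the state word, which is what enables the CAS-based fast paths described above.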
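The BufferDesc comment restricts unlocked updates to CAS operations that verify BM_LOCKED is clear. A minimal sketch of that protocol follows, assuming the patch's pg_atomic_read_u32/pg_atomic_compare_exchange_u32 primitives and the state macros; PinBufferLockfree is a hypothetical name used for illustration, not the patch's actual PinBuffer.

    /*
     * Sketch: increment the refcount (and the usage count, up to the limit)
     * without taking the buffer header lock.  Returns false if the header
     * is currently locked, in which case the caller would fall back to the
     * LockBufHdr() slow path.
     */
    static bool
    PinBufferLockfree(BufferDesc *buf)
    {
        uint32      old_state = pg_atomic_read_u32(&buf->state);

        for (;;)
        {
            uint32      new_state;

            /* The CAS must not succeed while the header lock is held. */
            if (old_state & BM_LOCKED)
                return false;

            new_state = old_state + BUF_REFCOUNT_ONE;
            if (BUF_STATE_GET_USAGECOUNT(old_state) < BM_MAX_USAGE_COUNT)
                new_state += BUF_USAGECOUNT_ONE;

            /* On failure, old_state is refreshed to the current value. */
            if (pg_atomic_compare_exchange_u32(&buf->state, &old_state,
                                               new_state))
                return true;
        }
    }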
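The header declares LockBufHdr() and UnlockBufHdr() as functions, but their bodies lie outside this excerpt. Here is a plausible implementation, sketched under the assumption that acquisition uses an atomic fetch-or of BM_LOCKED (setting the lock bit being the one state change permitted without already holding the lock) together with the SpinDelayStatus API, and that release uses the plain-write idiom visible in StrategyGetBuffer; it is not necessarily the patch's exact code.

    uint32
    LockBufHdr(volatile BufferDesc *desc)
    {
        SpinDelayStatus delayStatus;
        uint32      old_state;

        init_spin_delay(&delayStatus, (Pointer) desc, __FILE__, __LINE__);

        for (;;)
        {
            /* Try to set BM_LOCKED; old_state is the value before the or. */
            old_state = pg_atomic_fetch_or_u32(&desc->state, BM_LOCKED);
            if (!(old_state & BM_LOCKED))
                break;          /* the flag was clear: we own the lock */
            make_spin_delay(&delayStatus);
        }
        finish_spin_delay(&delayStatus);

        /* Return the state as it is now, i.e. with BM_LOCKED set. */
        return old_state | BM_LOCKED;
    }

    void
    UnlockBufHdr(volatile BufferDesc *desc)
    {
        /* Only the lock holder may store the state with a plain write. */
        uint32      state = pg_atomic_read_u32(&desc->state);

        pg_atomic_write_u32(&desc->state, state & ~BM_LOCKED);
    }

Returning the observed state from LockBufHdr is what lets callers such as StrategyGetBuffer inspect the refcount and usage count without re-reading the atomic variable.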