Thread: Fix spinlock usage in UnpinBuffer()

Fix spinlock usage in UnpinBuffer()

From: Qingqing Zhou
Date:
There is no LWLock protecting the spinlock in UnpinBuffer(), so we need to
hold off interrupts ourselves. I also checked the other NoHoldoff
spinlocks; they seem to be OK.

Regards,
Qingqing

Index: bufmgr.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v
retrieving revision 1.200
diff -c -r1.200 bufmgr.c
*** bufmgr.c    22 Nov 2005 18:17:19 -0000    1.200
--- bufmgr.c    28 Dec 2005 09:33:18 -0000
***************
*** 812,817 ****
--- 812,818 ----
          Assert(!LWLockHeldByMe(buf->io_in_progress_lock));

          /* NoHoldoff ensures we don't lose control before sending signal */
+         HOLD_INTERRUPTS();
          LockBufHdr_NoHoldoff(buf);

          /* Decrement the shared reference count */
***************
*** 847,852 ****
--- 848,856 ----
          else
              UnlockBufHdr_NoHoldoff(buf);

+         /* safe to accept interrupts now */
+         RESUME_INTERRUPTS();
+
          /*
           * If VACUUM is releasing an otherwise-unused buffer, send it to the
           * freelist for near-term reuse.  We put it at the tail so that it


Re: Fix spinlock usage in UnpinBuffer()

From: Tom Lane
Date:
Qingqing Zhou <zhouqq@cs.toronto.edu> writes:
> There is no LWLock protecting the spinlock in UnpinBuffer(),

I don't believe this is necessary, and if it is necessary the patch
is wrong anyway.  Because the code uses _NoHoldoff, there won't be
any check of InterruptPending in that segment of code.  Furthermore,
any callers who don't have their own interrupt holdoff in force are
probably broken --- it's unlikely that the state of bufmgr is globally
correct if the caller of UnpinBuffer loses control.

Exactly which path of control are you worried about?

            regards, tom lane

Re: Fix spinlock usage in UnpinBuffer()

From: Qingqing Zhou
Date:

On Wed, 28 Dec 2005, Tom Lane wrote:
>
> Because the code uses _NoHoldoff, there won't be any check of
> InterruptPending in that segment of code.

I guess the danger I claimed may not really happen, because of the
"ImmediateInterruptOK" variable. Since it is almost always false (except
while reading client input and in some very limited local usage), even if
we don't HOLD_INTERRUPTS() in UnpinBuffer(), we are still protected by
this variable in die() when a SIGTERM sneaks in.  -- But this is dangerous
AFAICS, since we must guarantee that nothing invoked while holding the
BufHdrLock calls CHECK_FOR_INTERRUPTS(), which is *not* protected by
"ImmediateInterruptOK".

In other words, if we agree that the above behavior is safe, then we can
use _NoHoldoff in almost the whole buffer manager code, on condition that
we are sure no CHECK_FOR_INTERRUPTS() is invoked while holding a spinlock.
So for example, in UnlockBuffers(), remove the HOLD_INTERRUPTS() pair; in
write_buffer(), change LockBufHdr() to LockBufHdr_NoHoldoff().

Regards,
Qingqing

Re: Fix spinlock usage in UnpinBuffer()

From: Tom Lane
Date:
> So can you drop several lines on my last post to the list?

[ looks at code some more... ]  Actually, I think this code thinks that
RESUME_INTERRUPTS() will do a CHECK_FOR_INTERRUPTS() if the count drops
to zero.  Which it does not.  (Perhaps it did at one time --- I don't
recall for sure --- but it definitely doesn't now.)

Realistically, given our coding rules for spinlocks --- which pretty
nearly forbid *any* out-of-line calls while holding a spinlock, and
certainly forbid any looping --- it's probably a waste of cycles to have
the HOLD/RESUME_INTERRUPTS operations associated with spinlock
grab/release at all.  There is simply no way that any legitimate path of
control will do a CHECK_FOR_INTERRUPTS() while holding or awaiting a
spinlock.

So I'm thinking the right answer is to make all the spinlock macros be
the equivalent of the NoHoldoff case.  It's reasonable for LWLockAcquire
to do a HOLD_INTERRUPTS, but I don't see the justification for doing it
at the spinlock level.

> Another thing is the qsort(). Shall we change to BSD version or just let
> it be?

I'm a bit worried about doing that across-the-board, since at least in
theory a vendor-supplied qsort ought to be tuned for the hardware et al.
I think it would be better to substitute our own qsort only on those
platforms where we have specifically proved it's a win.

            regards, tom lane

Re: Fix spinlock usage in UnpinBuffer()

From: "Qingqing Zhou"
Date:
"Tom Lane" <tgl@sss.pgh.pa.us> wrote
>
> So I'm thinking the right answer is to make all the spinlock macros be
> the equivalent of the NoHoldoff case.  It's reasonable for LWLockAcquire
> to do a HOLD_INTERRUPTS, but I don't see the justification for doing it
> at the spinlock level.
>
I agree on this. But before changing it, we need to inspect those spinlocks
one by one to make sure of two things: (1) if there is an out-of-line call,
make sure it cannot invoke CHECK_FOR_INTERRUPTS(); (2) ImmediateInterruptOK
is false (99% sure now).

>
> I'm a bit worried about doing that across-the-board, since at least in
> theory a vendor-supplied qsort ought to be tuned for the hardware et al.
> I think it would be better to substitute our own qsort only on those
> platforms where we have specifically proved it's a win.
>
Our tests indicate that the BSD version is better ... but it is just a
home-brew test.

Regards,
Qingqing



Re: Fix spinlock usage in UnpinBuffer()

From: Tom Lane
Date:
"Qingqing Zhou" <zhouqq@cs.toronto.edu> writes:
> I agree on this. But before changing it, we need to inspect those spinlocks
> one by one to make sure of two things: (1) if there is an out-of-line call,
> make sure it cannot invoke CHECK_FOR_INTERRUPTS(); (2) ImmediateInterruptOK
> is false (99% sure now).

I'm sure of those things already ... but feel free to look for yourself.

>> I think it would be better to substitute our own qsort only on those
>> platforms where we have specifically proved it's a win.
>>
> Our tests indicate that the BSD version is better ... but it is just a
> home-brew test.

Better than what is the operative question.

            regards, tom lane

Re: Fix spinlock usage in UnpinBuffer()

From: Qingqing Zhou
Date:

On Wed, 28 Dec 2005, Tom Lane wrote:
> "Qingqing Zhou" <zhouqq@cs.toronto.edu> writes:
> > I agree on this. But before changing it, we need to inspect those spinlocks
> > one by one to make sure of two things: (1) if there is an out-of-line call,
> > make sure it cannot invoke CHECK_FOR_INTERRUPTS(); (2) ImmediateInterruptOK
> > is false (99% sure now).
>
> I'm sure of those things already ... but feel free to look for yourself.
>

Patch is ready.

---
Remove SpinLockAcquire_NoHoldoff() and related macros. Now SpinLockAcquire()
will not hold off cancel/die interrupts. This gives cleaner and clearer
usage of spinlocks and also saves a few cycles.

The difference between the original SpinLockAcquire() and the original
SpinLockAcquire_NoHoldoff() is that the former did HOLD_INTERRUPTS()
before trying to acquire the lock. But in fact we can safely allow
cancel/die interrupts while holding a spinlock, provided we follow the
coding rule: don't use any out-of-line calls that might invoke
CHECK_FOR_INTERRUPTS() while holding the lock. This is reasonable since we
also require that spinlock hold times be short. Thus,
"ImmediateInterruptOK" will defer the actual interrupt processing to a
proper time.

Per discussion with Tom.
---

> > Our tests indicate that the BSD version is better ... but it is just a
> > home-brew test.
>
> Better than what is the operative question.
>

Better than the default libc qsort (except on BSD machines). That is, if
we agree, we would link against port/qsort.c just as Solaris currently
does. But if we leave it as it is, I won't argue hard for it.

Regards,
Qingqing

---

Index: src/backend/access/transam/xlog.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/access/transam/xlog.c,v
retrieving revision 1.224
diff -c -r1.224 xlog.c
*** src/backend/access/transam/xlog.c    28 Dec 2005 23:22:50 -0000    1.224
--- src/backend/access/transam/xlog.c    29 Dec 2005 06:41:24 -0000
***************
*** 695,704 ****
          /* use volatile pointer to prevent code rearrangement */
          volatile XLogCtlData *xlogctl = XLogCtl;

!         SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
          LogwrtRqst = xlogctl->LogwrtRqst;
          LogwrtResult = xlogctl->LogwrtResult;
!         SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
      }

      /*
--- 695,704 ----
          /* use volatile pointer to prevent code rearrangement */
          volatile XLogCtlData *xlogctl = XLogCtl;

!         SpinLockAcquire(&xlogctl->info_lck);
          LogwrtRqst = xlogctl->LogwrtRqst;
          LogwrtResult = xlogctl->LogwrtResult;
!         SpinLockRelease(&xlogctl->info_lck);
      }

      /*
***************
*** 940,952 ****
          /* use volatile pointer to prevent code rearrangement */
          volatile XLogCtlData *xlogctl = XLogCtl;

!         SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
          /* advance global request to include new block(s) */
          if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst))
              xlogctl->LogwrtRqst.Write = WriteRqst;
          /* update local result copy while I have the chance */
          LogwrtResult = xlogctl->LogwrtResult;
!         SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
      }

      ProcLastRecEnd = RecPtr;
--- 940,952 ----
          /* use volatile pointer to prevent code rearrangement */
          volatile XLogCtlData *xlogctl = XLogCtl;

!         SpinLockAcquire(&xlogctl->info_lck);
          /* advance global request to include new block(s) */
          if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst))
              xlogctl->LogwrtRqst.Write = WriteRqst;
          /* update local result copy while I have the chance */
          LogwrtResult = xlogctl->LogwrtResult;
!         SpinLockRelease(&xlogctl->info_lck);
      }

      ProcLastRecEnd = RecPtr;
***************
*** 1175,1185 ****
              /* use volatile pointer to prevent code rearrangement */
              volatile XLogCtlData *xlogctl = XLogCtl;

!             SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
              if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr))
                  xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
              LogwrtResult = xlogctl->LogwrtResult;
!             SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
          }

          update_needed = false;    /* Did the shared-request update */
--- 1175,1185 ----
              /* use volatile pointer to prevent code rearrangement */
              volatile XLogCtlData *xlogctl = XLogCtl;

!             SpinLockAcquire(&xlogctl->info_lck);
              if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr))
                  xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
              LogwrtResult = xlogctl->LogwrtResult;
!             SpinLockRelease(&xlogctl->info_lck);
          }

          update_needed = false;    /* Did the shared-request update */
***************
*** 1560,1572 ****
          /* use volatile pointer to prevent code rearrangement */
          volatile XLogCtlData *xlogctl = XLogCtl;

!         SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
          xlogctl->LogwrtResult = LogwrtResult;
          if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
              xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
          if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
              xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
!         SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
      }

      Write->LogwrtResult = LogwrtResult;
--- 1560,1572 ----
          /* use volatile pointer to prevent code rearrangement */
          volatile XLogCtlData *xlogctl = XLogCtl;

!         SpinLockAcquire(&xlogctl->info_lck);
          xlogctl->LogwrtResult = LogwrtResult;
          if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
              xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
          if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
              xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
!         SpinLockRelease(&xlogctl->info_lck);
      }

      Write->LogwrtResult = LogwrtResult;
***************
*** 1618,1628 ****
          /* use volatile pointer to prevent code rearrangement */
          volatile XLogCtlData *xlogctl = XLogCtl;

!         SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
          if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write))
              WriteRqstPtr = xlogctl->LogwrtRqst.Write;
          LogwrtResult = xlogctl->LogwrtResult;
!         SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
      }

      /* done already? */
--- 1618,1628 ----
          /* use volatile pointer to prevent code rearrangement */
          volatile XLogCtlData *xlogctl = XLogCtl;

!         SpinLockAcquire(&xlogctl->info_lck);
          if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write))
              WriteRqstPtr = xlogctl->LogwrtRqst.Write;
          LogwrtResult = xlogctl->LogwrtResult;
!         SpinLockRelease(&xlogctl->info_lck);
      }

      /* done already? */
***************
*** 4984,4993 ****
      /* use volatile pointer to prevent code rearrangement */
      volatile XLogCtlData *xlogctl = XLogCtl;

!     SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
      Assert(XLByteLE(RedoRecPtr, xlogctl->Insert.RedoRecPtr));
      RedoRecPtr = xlogctl->Insert.RedoRecPtr;
!     SpinLockRelease_NoHoldoff(&xlogctl->info_lck);

      return RedoRecPtr;
  }
--- 4984,4993 ----
      /* use volatile pointer to prevent code rearrangement */
      volatile XLogCtlData *xlogctl = XLogCtl;

!     SpinLockAcquire(&xlogctl->info_lck);
      Assert(XLByteLE(RedoRecPtr, xlogctl->Insert.RedoRecPtr));
      RedoRecPtr = xlogctl->Insert.RedoRecPtr;
!     SpinLockRelease(&xlogctl->info_lck);

      return RedoRecPtr;
  }
***************
*** 5165,5173 ****
          /* use volatile pointer to prevent code rearrangement */
          volatile XLogCtlData *xlogctl = XLogCtl;

!         SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
          RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
!         SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
      }

      /*
--- 5165,5173 ----
          /* use volatile pointer to prevent code rearrangement */
          volatile XLogCtlData *xlogctl = XLogCtl;

!         SpinLockAcquire(&xlogctl->info_lck);
          RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
!         SpinLockRelease(&xlogctl->info_lck);
      }

      /*
Index: src/backend/storage/buffer/bufmgr.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v
retrieving revision 1.200
diff -c -r1.200 bufmgr.c
*** src/backend/storage/buffer/bufmgr.c    22 Nov 2005 18:17:19 -0000    1.200
--- src/backend/storage/buffer/bufmgr.c    29 Dec 2005 06:41:25 -0000
***************
*** 442,448 ****
          /*
           * Need to lock the buffer header too in order to change its tag.
           */
!         LockBufHdr_NoHoldoff(buf);

          /*
           * Somebody could have pinned or re-dirtied the buffer while we were
--- 442,448 ----
          /*
           * Need to lock the buffer header too in order to change its tag.
           */
!         LockBufHdr(buf);

          /*
           * Somebody could have pinned or re-dirtied the buffer while we were
***************
*** 453,459 ****
          if (buf->refcount == 1 && !(buf->flags & BM_DIRTY))
              break;

!         UnlockBufHdr_NoHoldoff(buf);
          BufTableDelete(&newTag);
          LWLockRelease(BufMappingLock);
          UnpinBuffer(buf, true, false /* evidently recently used */ );
--- 453,459 ----
          if (buf->refcount == 1 && !(buf->flags & BM_DIRTY))
              break;

!         UnlockBufHdr(buf);
          BufTableDelete(&newTag);
          LWLockRelease(BufMappingLock);
          UnpinBuffer(buf, true, false /* evidently recently used */ );
***************
*** 473,479 ****
      buf->flags |= BM_TAG_VALID;
      buf->usage_count = 0;

!     UnlockBufHdr_NoHoldoff(buf);

      if (oldFlags & BM_TAG_VALID)
          BufTableDelete(&oldTag);
--- 473,479 ----
      buf->flags |= BM_TAG_VALID;
      buf->usage_count = 0;

!     UnlockBufHdr(buf);

      if (oldFlags & BM_TAG_VALID)
          BufTableDelete(&oldTag);
***************
*** 529,541 ****
       */
      LWLockAcquire(BufMappingLock, LW_EXCLUSIVE);

!     /* Re-lock the buffer header (NoHoldoff since we have an LWLock) */
!     LockBufHdr_NoHoldoff(buf);

      /* If it's changed while we were waiting for lock, do nothing */
      if (!BUFFERTAGS_EQUAL(buf->tag, oldTag))
      {
!         UnlockBufHdr_NoHoldoff(buf);
          LWLockRelease(BufMappingLock);
          return;
      }
--- 529,541 ----
       */
      LWLockAcquire(BufMappingLock, LW_EXCLUSIVE);

!     /* Re-lock the buffer header */
!     LockBufHdr(buf);

      /* If it's changed while we were waiting for lock, do nothing */
      if (!BUFFERTAGS_EQUAL(buf->tag, oldTag))
      {
!         UnlockBufHdr(buf);
          LWLockRelease(BufMappingLock);
          return;
      }
***************
*** 551,557 ****
       */
      if (buf->refcount != 0)
      {
!         UnlockBufHdr_NoHoldoff(buf);
          LWLockRelease(BufMappingLock);
          /* safety check: should definitely not be our *own* pin */
          if (PrivateRefCount[buf->buf_id] != 0)
--- 551,557 ----
       */
      if (buf->refcount != 0)
      {
!         UnlockBufHdr(buf);
          LWLockRelease(BufMappingLock);
          /* safety check: should definitely not be our *own* pin */
          if (PrivateRefCount[buf->buf_id] != 0)
***************
*** 569,575 ****
      buf->flags = 0;
      buf->usage_count = 0;

!     UnlockBufHdr_NoHoldoff(buf);

      /*
       * Remove the buffer from the lookup hashtable, if it was in there.
--- 569,575 ----
      buf->flags = 0;
      buf->usage_count = 0;

!     UnlockBufHdr(buf);

      /*
       * Remove the buffer from the lookup hashtable, if it was in there.
***************
*** 729,743 ****

      if (PrivateRefCount[b] == 0)
      {
!         /*
!          * Use NoHoldoff here because we don't want the unlock to be a
!          * potential place to honor a QueryCancel request. (The caller should
!          * be holding off interrupts anyway.)
!          */
!         LockBufHdr_NoHoldoff(buf);
          buf->refcount++;
          result = (buf->flags & BM_VALID) != 0;
!         UnlockBufHdr_NoHoldoff(buf);
      }
      else
      {
--- 729,738 ----

      if (PrivateRefCount[b] == 0)
      {
!         LockBufHdr(buf);
          buf->refcount++;
          result = (buf->flags & BM_VALID) != 0;
!         UnlockBufHdr(buf);
      }
      else
      {
***************
*** 766,779 ****

      if (PrivateRefCount[b] == 0)
          buf->refcount++;
!     /* NoHoldoff since we mustn't accept cancel interrupt here */
!     UnlockBufHdr_NoHoldoff(buf);
      PrivateRefCount[b]++;
      Assert(PrivateRefCount[b] > 0);
      ResourceOwnerRememberBuffer(CurrentResourceOwner,
                                  BufferDescriptorGetBuffer(buf));
-     /* Now we can accept cancel */
-     RESUME_INTERRUPTS();
  }

  /*
--- 761,771 ----

      if (PrivateRefCount[b] == 0)
          buf->refcount++;
!     UnlockBufHdr(buf);
      PrivateRefCount[b]++;
      Assert(PrivateRefCount[b] > 0);
      ResourceOwnerRememberBuffer(CurrentResourceOwner,
                                  BufferDescriptorGetBuffer(buf));
  }

  /*
***************
*** 811,818 ****
          Assert(!LWLockHeldByMe(buf->content_lock));
          Assert(!LWLockHeldByMe(buf->io_in_progress_lock));

!         /* NoHoldoff ensures we don't lose control before sending signal */
!         LockBufHdr_NoHoldoff(buf);

          /* Decrement the shared reference count */
          Assert(buf->refcount > 0);
--- 803,809 ----
          Assert(!LWLockHeldByMe(buf->content_lock));
          Assert(!LWLockHeldByMe(buf->io_in_progress_lock));

!         LockBufHdr(buf);

          /* Decrement the shared reference count */
          Assert(buf->refcount > 0);
***************
*** 841,851 ****
              int            wait_backend_pid = buf->wait_backend_pid;

              buf->flags &= ~BM_PIN_COUNT_WAITER;
!             UnlockBufHdr_NoHoldoff(buf);
              ProcSendSignal(wait_backend_pid);
          }
          else
!             UnlockBufHdr_NoHoldoff(buf);

          /*
           * If VACUUM is releasing an otherwise-unused buffer, send it to the
--- 832,842 ----
              int            wait_backend_pid = buf->wait_backend_pid;

              buf->flags &= ~BM_PIN_COUNT_WAITER;
!             UnlockBufHdr(buf);
              ProcSendSignal(wait_backend_pid);
          }
          else
!             UnlockBufHdr(buf);

          /*
           * If VACUUM is releasing an otherwise-unused buffer, send it to the
***************
*** 1300,1308 ****
       */

      /* To check if block content changes while flushing. - vadim 01/17/97 */
!     LockBufHdr_NoHoldoff(buf);
      buf->flags &= ~BM_JUST_DIRTIED;
!     UnlockBufHdr_NoHoldoff(buf);

      smgrwrite(reln,
                buf->tag.blockNum,
--- 1291,1299 ----
       */

      /* To check if block content changes while flushing. - vadim 01/17/97 */
!     LockBufHdr(buf);
      buf->flags &= ~BM_JUST_DIRTIED;
!     UnlockBufHdr(buf);

      smgrwrite(reln,
                buf->tag.blockNum,
***************
*** 1691,1699 ****

      if (buf)
      {
!         HOLD_INTERRUPTS();        /* don't want to die() partway through... */
!
!         LockBufHdr_NoHoldoff(buf);

          /*
           * Don't complain if flag bit not set; it could have been reset but we
--- 1682,1688 ----

      if (buf)
      {
!         LockBufHdr(buf);

          /*
           * Don't complain if flag bit not set; it could have been reset but we
***************
*** 1703,1715 ****
              buf->wait_backend_pid == MyProcPid)
              buf->flags &= ~BM_PIN_COUNT_WAITER;

!         UnlockBufHdr_NoHoldoff(buf);

          ProcCancelWaitForSignal();

          PinCountWaitBuf = NULL;
-
-         RESUME_INTERRUPTS();
      }
  }

--- 1692,1702 ----
              buf->wait_backend_pid == MyProcPid)
              buf->flags &= ~BM_PIN_COUNT_WAITER;

!         UnlockBufHdr(buf);

          ProcCancelWaitForSignal();

          PinCountWaitBuf = NULL;
      }
  }

***************
*** 1741,1749 ****
           * that it's critical to set dirty bit *before* logging changes with
           * XLogInsert() - see comments in SyncOneBuffer().
           */
!         LockBufHdr_NoHoldoff(buf);
          buf->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
!         UnlockBufHdr_NoHoldoff(buf);
      }
      else
          elog(ERROR, "unrecognized buffer lock mode: %d", mode);
--- 1728,1736 ----
           * that it's critical to set dirty bit *before* logging changes with
           * XLogInsert() - see comments in SyncOneBuffer().
           */
!         LockBufHdr(buf);
          buf->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
!         UnlockBufHdr(buf);
      }
      else
          elog(ERROR, "unrecognized buffer lock mode: %d", mode);
***************
*** 1773,1781 ****
           * that it's critical to set dirty bit *before* logging changes with
           * XLogInsert() - see comments in SyncOneBuffer().
           */
!         LockBufHdr_NoHoldoff(buf);
          buf->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
!         UnlockBufHdr_NoHoldoff(buf);

          return true;
      }
--- 1760,1768 ----
           * that it's critical to set dirty bit *before* logging changes with
           * XLogInsert() - see comments in SyncOneBuffer().
           */
!         LockBufHdr(buf);
          buf->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
!         UnlockBufHdr(buf);

          return true;
      }
***************
*** 1827,1851 ****
      {
          /* Try to acquire lock */
          LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
!         LockBufHdr_NoHoldoff(bufHdr);
          Assert(bufHdr->refcount > 0);
          if (bufHdr->refcount == 1)
          {
              /* Successfully acquired exclusive lock with pincount 1 */
!             UnlockBufHdr_NoHoldoff(bufHdr);
              return;
          }
          /* Failed, so mark myself as waiting for pincount 1 */
          if (bufHdr->flags & BM_PIN_COUNT_WAITER)
          {
!             UnlockBufHdr_NoHoldoff(bufHdr);
              LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
              elog(ERROR, "multiple backends attempting to wait for pincount 1");
          }
          bufHdr->wait_backend_pid = MyProcPid;
          bufHdr->flags |= BM_PIN_COUNT_WAITER;
          PinCountWaitBuf = bufHdr;
!         UnlockBufHdr_NoHoldoff(bufHdr);
          LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
          /* Wait to be signaled by UnpinBuffer() */
          ProcWaitForSignal();
--- 1814,1838 ----
      {
          /* Try to acquire lock */
          LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
!         LockBufHdr(bufHdr);
          Assert(bufHdr->refcount > 0);
          if (bufHdr->refcount == 1)
          {
              /* Successfully acquired exclusive lock with pincount 1 */
!             UnlockBufHdr(bufHdr);
              return;
          }
          /* Failed, so mark myself as waiting for pincount 1 */
          if (bufHdr->flags & BM_PIN_COUNT_WAITER)
          {
!             UnlockBufHdr(bufHdr);
              LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
              elog(ERROR, "multiple backends attempting to wait for pincount 1");
          }
          bufHdr->wait_backend_pid = MyProcPid;
          bufHdr->flags |= BM_PIN_COUNT_WAITER;
          PinCountWaitBuf = bufHdr;
!         UnlockBufHdr(bufHdr);
          LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
          /* Wait to be signaled by UnpinBuffer() */
          ProcWaitForSignal();
***************
*** 1926,1933 ****
           */
          LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);

!         /* NoHoldoff is OK since we now have an LWLock */
!         LockBufHdr_NoHoldoff(buf);

          if (!(buf->flags & BM_IO_IN_PROGRESS))
              break;
--- 1913,1919 ----
           */
          LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);

!         LockBufHdr(buf);

          if (!(buf->flags & BM_IO_IN_PROGRESS))
              break;
***************
*** 1938,1944 ****
           * an error (see AbortBufferIO).  If that's the case, we must wait for
           * him to get unwedged.
           */
!         UnlockBufHdr_NoHoldoff(buf);
          LWLockRelease(buf->io_in_progress_lock);
          WaitIO(buf);
      }
--- 1924,1930 ----
           * an error (see AbortBufferIO).  If that's the case, we must wait for
           * him to get unwedged.
           */
!         UnlockBufHdr(buf);
          LWLockRelease(buf->io_in_progress_lock);
          WaitIO(buf);
      }
***************
*** 1948,1961 ****
      if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY))
      {
          /* someone else already did the I/O */
!         UnlockBufHdr_NoHoldoff(buf);
          LWLockRelease(buf->io_in_progress_lock);
          return false;
      }

      buf->flags |= BM_IO_IN_PROGRESS;

!     UnlockBufHdr_NoHoldoff(buf);

      InProgressBuf = buf;
      IsForInput = forInput;
--- 1934,1947 ----
      if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY))
      {
          /* someone else already did the I/O */
!         UnlockBufHdr(buf);
          LWLockRelease(buf->io_in_progress_lock);
          return false;
      }

      buf->flags |= BM_IO_IN_PROGRESS;

!     UnlockBufHdr(buf);

      InProgressBuf = buf;
      IsForInput = forInput;
***************
*** 1986,1993 ****
  {
      Assert(buf == InProgressBuf);

!     /* NoHoldoff is OK since we must have an LWLock */
!     LockBufHdr_NoHoldoff(buf);

      Assert(buf->flags & BM_IO_IN_PROGRESS);
      buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
--- 1972,1978 ----
  {
      Assert(buf == InProgressBuf);

!     LockBufHdr(buf);

      Assert(buf->flags & BM_IO_IN_PROGRESS);
      buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
***************
*** 1995,2001 ****
          buf->flags &= ~BM_DIRTY;
      buf->flags |= set_flag_bits;

!     UnlockBufHdr_NoHoldoff(buf);

      InProgressBuf = NULL;

--- 1980,1986 ----
          buf->flags &= ~BM_DIRTY;
      buf->flags |= set_flag_bits;

!     UnlockBufHdr(buf);

      InProgressBuf = NULL;

***************
*** 2026,2040 ****
           */
          LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);

!         /* NoHoldoff is OK since we now have an LWLock */
!         LockBufHdr_NoHoldoff(buf);
          Assert(buf->flags & BM_IO_IN_PROGRESS);
          if (IsForInput)
          {
              Assert(!(buf->flags & BM_DIRTY));
              /* We'd better not think buffer is valid yet */
              Assert(!(buf->flags & BM_VALID));
!             UnlockBufHdr_NoHoldoff(buf);
          }
          else
          {
--- 2011,2024 ----
           */
          LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);

!         LockBufHdr(buf);
          Assert(buf->flags & BM_IO_IN_PROGRESS);
          if (IsForInput)
          {
              Assert(!(buf->flags & BM_DIRTY));
              /* We'd better not think buffer is valid yet */
              Assert(!(buf->flags & BM_VALID));
!             UnlockBufHdr(buf);
          }
          else
          {
***************
*** 2042,2048 ****

              sv_flags = buf->flags;
              Assert(sv_flags & BM_DIRTY);
!             UnlockBufHdr_NoHoldoff(buf);
              /* Issue notice if this is not the first failure... */
              if (sv_flags & BM_IO_ERROR)
              {
--- 2026,2032 ----

              sv_flags = buf->flags;
              Assert(sv_flags & BM_DIRTY);
!             UnlockBufHdr(buf);
              /* Issue notice if this is not the first failure... */
              if (sv_flags & BM_IO_ERROR)
              {
Index: src/backend/storage/lmgr/lwlock.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/storage/lmgr/lwlock.c,v
retrieving revision 1.36
diff -c -r1.36 lwlock.c
*** src/backend/storage/lmgr/lwlock.c    11 Dec 2005 21:02:18 -0000    1.36
--- src/backend/storage/lmgr/lwlock.c    29 Dec 2005 06:41:25 -0000
***************
*** 301,307 ****
          bool        mustwait;

          /* Acquire mutex.  Time spent holding mutex should be short! */
!         SpinLockAcquire_NoHoldoff(&lock->mutex);

          /* If retrying, allow LWLockRelease to release waiters again */
          if (retry)
--- 301,307 ----
          bool        mustwait;

          /* Acquire mutex.  Time spent holding mutex should be short! */
!         SpinLockAcquire(&lock->mutex);

          /* If retrying, allow LWLockRelease to release waiters again */
          if (retry)
***************
*** 352,358 ****
          lock->tail = proc;

          /* Can release the mutex now */
!         SpinLockRelease_NoHoldoff(&lock->mutex);

          /*
           * Wait until awakened.
--- 352,358 ----
          lock->tail = proc;

          /* Can release the mutex now */
!         SpinLockRelease(&lock->mutex);

          /*
           * Wait until awakened.
***************
*** 384,390 ****
      }

      /* We are done updating shared state of the lock itself. */
!     SpinLockRelease_NoHoldoff(&lock->mutex);

      /* Add lock to list of locks held by this backend */
      held_lwlocks[num_held_lwlocks++] = lockid;
--- 384,390 ----
      }

      /* We are done updating shared state of the lock itself. */
!     SpinLockRelease(&lock->mutex);

      /* Add lock to list of locks held by this backend */
      held_lwlocks[num_held_lwlocks++] = lockid;
***************
*** 423,429 ****
      HOLD_INTERRUPTS();

      /* Acquire mutex.  Time spent holding mutex should be short! */
!     SpinLockAcquire_NoHoldoff(&lock->mutex);

      /* If I can get the lock, do so quickly. */
      if (mode == LW_EXCLUSIVE)
--- 423,429 ----
      HOLD_INTERRUPTS();

      /* Acquire mutex.  Time spent holding mutex should be short! */
!     SpinLockAcquire(&lock->mutex);

      /* If I can get the lock, do so quickly. */
      if (mode == LW_EXCLUSIVE)
***************
*** 448,454 ****
      }

      /* We are done updating shared state of the lock itself. */
!     SpinLockRelease_NoHoldoff(&lock->mutex);

      if (mustwait)
      {
--- 448,454 ----
      }

      /* We are done updating shared state of the lock itself. */
!     SpinLockRelease(&lock->mutex);

      if (mustwait)
      {
***************
*** 494,500 ****
          held_lwlocks[i] = held_lwlocks[i + 1];

      /* Acquire mutex.  Time spent holding mutex should be short! */
!     SpinLockAcquire_NoHoldoff(&lock->mutex);

      /* Release my hold on lock */
      if (lock->exclusive > 0)
--- 494,500 ----
          held_lwlocks[i] = held_lwlocks[i + 1];

      /* Acquire mutex.  Time spent holding mutex should be short! */
!     SpinLockAcquire(&lock->mutex);

      /* Release my hold on lock */
      if (lock->exclusive > 0)
***************
*** 542,548 ****
      }

      /* We are done updating shared state of the lock itself. */
!     SpinLockRelease_NoHoldoff(&lock->mutex);

      /*
       * Awaken any waiters I removed from the queue.
--- 542,548 ----
      }

      /* We are done updating shared state of the lock itself. */
!     SpinLockRelease(&lock->mutex);

      /*
       * Awaken any waiters I removed from the queue.
Index: src/include/storage/buf_internals.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/storage/buf_internals.h,v
retrieving revision 1.83
diff -c -r1.83 buf_internals.h
*** src/include/storage/buf_internals.h    22 Nov 2005 18:17:31 -0000    1.83
--- src/include/storage/buf_internals.h    29 Dec 2005 06:41:26 -0000
***************
*** 138,147 ****
  #define FREENEXT_NOT_IN_LIST    (-2)

  /*
!  * Macros for acquiring/releasing a buffer header's spinlock.  The
!  * NoHoldoff cases may be used when we know that we hold some LWLock
!  * and therefore interrupts are already held off.  Do not apply these
!  * to local buffers!
   *
   * Note: as a general coding rule, if you are using these then you probably
   * want to be using a volatile-qualified pointer to the buffer header, to
--- 138,145 ----
  #define FREENEXT_NOT_IN_LIST    (-2)

  /*
!  * Macros for acquiring/releasing a buffer header's spinlock. Do not apply
!  * these to local buffers!
   *
   * Note: as a general coding rule, if you are using these then you probably
   * want to be using a volatile-qualified pointer to the buffer header, to
***************
*** 152,162 ****
      SpinLockAcquire(&(bufHdr)->buf_hdr_lock)
  #define UnlockBufHdr(bufHdr)  \
      SpinLockRelease(&(bufHdr)->buf_hdr_lock)
- #define LockBufHdr_NoHoldoff(bufHdr)  \
-     SpinLockAcquire_NoHoldoff(&(bufHdr)->buf_hdr_lock)
- #define UnlockBufHdr_NoHoldoff(bufHdr)    \
-     SpinLockRelease_NoHoldoff(&(bufHdr)->buf_hdr_lock)
-

  /* in buf_init.c */
  extern BufferDesc *BufferDescriptors;
--- 150,155 ----
Index: src/include/storage/spin.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/storage/spin.h,v
retrieving revision 1.26
diff -c -r1.26 spin.h
*** src/include/storage/spin.h    13 Oct 2005 06:17:34 -0000    1.26
--- src/include/storage/spin.h    29 Dec 2005 06:41:26 -0000
***************
*** 14,30 ****
   *        Acquire a spinlock, waiting if necessary.
   *        Time out and abort() if unable to acquire the lock in a
   *        "reasonable" amount of time --- typically ~ 1 minute.
-  *        Cancel/die interrupts are held off until the lock is released.
   *
   *    void SpinLockRelease(volatile slock_t *lock)
   *        Unlock a previously acquired lock.
-  *        Release the cancel/die interrupt holdoff.
-  *
-  *    void SpinLockAcquire_NoHoldoff(volatile slock_t *lock)
-  *    void SpinLockRelease_NoHoldoff(volatile slock_t *lock)
-  *        Same as above, except no interrupt holdoff processing is done.
-  *        This pair of macros may be used when there is a surrounding
-  *        interrupt holdoff.
   *
   *    bool SpinLockFree(slock_t *lock)
   *        Tests if the lock is free. Returns TRUE if free, FALSE if locked.
--- 14,22 ----
***************
*** 33,38 ****
--- 25,37 ----
   *    Callers must beware that the macro argument may be evaluated multiple
   *    times!
   *
+  *    We can safely allow cancel/die interrupts while holding a spinlock,
+  *    provided we follow the coding rule: don't make any out-of-line calls
+  *    that might invoke CHECK_FOR_INTERRUPTS() while the lock is held.  This
+  *    is reasonable since we also require that spinlock hold times be short.
+  *    Thus, the "ImmediateInterruptsOK" mechanism defers the interrupt
+  *    processing to a safe time.
+  *
   *    CAUTION: Care must be taken to ensure that loads and stores of
   *    shared memory values are not rearranged around spinlock acquire
   *    and release. This is done using the "volatile" qualifier: the C
***************
*** 63,83 ****

  #define SpinLockInit(lock)    S_INIT_LOCK(lock)

! #define SpinLockAcquire(lock) \
!     do { \
!         HOLD_INTERRUPTS(); \
!         S_LOCK(lock); \
!     } while (0)
!
! #define SpinLockAcquire_NoHoldoff(lock) S_LOCK(lock)
!
! #define SpinLockRelease(lock) \
!     do { \
!         S_UNLOCK(lock); \
!         RESUME_INTERRUPTS(); \
!     } while (0)

! #define SpinLockRelease_NoHoldoff(lock) S_UNLOCK(lock)

  #define SpinLockFree(lock)    S_LOCK_FREE(lock)

--- 62,70 ----

  #define SpinLockInit(lock)    S_INIT_LOCK(lock)

! #define SpinLockAcquire(lock) S_LOCK(lock)

! #define SpinLockRelease(lock) S_UNLOCK(lock)

  #define SpinLockFree(lock)    S_LOCK_FREE(lock)


Re: Fix spinlock usage in UnpinBuffer()

From
Tom Lane
Date:
Qingqing Zhou <zhouqq@cs.toronto.edu> writes:
> Remove SpinLockAcquire_NoHoldoff() and related. Now SpinLockAcquire() will
> not hold off cancel/die interrupts. This will give cleaner and clearer
> usage of spinlocks and also save a few cycles.

Applied with some minor comment updates.

I noticed that shmem.c holds ShmemIndexLock considerably longer than any
other spinlock is held, and across operations that could theoretically
fail (hashtable manipulations).  This doesn't matter a lot in the Unix
code because only the postmaster ever executes ShmemInitStruct, but
in the Windows port we run that code every time a backend is launched.
I think that we could convert that spinlock to an LWLock.  Will look into it.

            regards, tom lane

Re: Fix spinlock usage in UnpinBuffer()

From
"Qingqing Zhou"
Date:
"Tom Lane" <tgl@sss.pgh.pa.us> wrote
>
> I noticed that shmem.c holds ShmemIndexLock considerably longer than any
> other spinlock is held, and across operations that could theoretically
> fail (hashtable manipulations).  This doesn't matter a lot in the Unix
> code because only the postmaster ever executes ShmemInitStruct, but
> in the Windows port we run that code every time a backend is launched.
> I think that we could convert that spinlock to an LWLock.  Will look into
> it.
>

Yeah, using an LWLock is a safer way, since error recovery can return the lock to an unlocked state.

Regards,
Qingqing