
From Tom Lane
Subject WIP: bufmgr rewrite per recent discussions
Date
Msg-id 17622.1108503550@sss.pgh.pa.us
List pgsql-patches
I don't feel this is quite ready to commit, but here it is if anyone
would like to try some performance testing.  Using "pgbench -s 10"
on a single-CPU machine, I find this code a little slower than CVS tip
at shared_buffers = 1000, but noticeably faster (~10% speedup) at
10000 buffers.  So it's not a dead loss for single-CPU anyway.  What
we need now is some performance measurements on multi-CPU boxes.

The bgwriter algorithm probably needs more work, maybe some more GUC
parameters.

            regards, tom lane

*** src/backend/catalog/index.c.orig    Mon Jan 10 15:02:19 2005
--- src/backend/catalog/index.c    Tue Feb 15 12:05:15 2005
***************
*** 1060,1066 ****
          /* Send out shared cache inval if necessary */
          if (!IsBootstrapProcessingMode())
              CacheInvalidateHeapTuple(pg_class, tuple);
-         BufferSync(-1, -1);
      }
      else if (dirty)
      {
--- 1060,1065 ----
*** src/backend/commands/dbcommands.c.orig    Fri Jan 28 10:09:11 2005
--- src/backend/commands/dbcommands.c    Tue Feb 15 15:49:12 2005
***************
*** 332,338 ****
       * up-to-date for the copy.  (We really only need to flush buffers for
       * the source database, but bufmgr.c provides no API for that.)
       */
!     BufferSync(-1, -1);

      /*
       * Close virtual file descriptors so the kernel has more available for
--- 332,338 ----
       * up-to-date for the copy.  (We really only need to flush buffers for
       * the source database, but bufmgr.c provides no API for that.)
       */
!     BufferSync();

      /*
       * Close virtual file descriptors so the kernel has more available for
***************
*** 1206,1212 ****
           * up-to-date for the copy.  (We really only need to flush buffers for
           * the source database, but bufmgr.c provides no API for that.)
           */
!         BufferSync(-1, -1);

  #ifndef WIN32

--- 1206,1212 ----
           * up-to-date for the copy.  (We really only need to flush buffers for
           * the source database, but bufmgr.c provides no API for that.)
           */
!         BufferSync();

  #ifndef WIN32

*** src/backend/commands/vacuum.c.orig    Fri Dec 31 17:45:38 2004
--- src/backend/commands/vacuum.c    Sun Feb 13 18:35:03 2005
***************
*** 35,41 ****
  #include "commands/vacuum.h"
  #include "executor/executor.h"
  #include "miscadmin.h"
- #include "storage/buf_internals.h"
  #include "storage/freespace.h"
  #include "storage/sinval.h"
  #include "storage/smgr.h"
--- 35,40 ----
*** src/backend/postmaster/bgwriter.c.orig    Mon Jan 10 15:02:20 2005
--- src/backend/postmaster/bgwriter.c    Tue Feb 15 15:49:23 2005
***************
*** 116,124 ****
   * GUC parameters
   */
  int            BgWriterDelay = 200;
- int            BgWriterPercent = 1;
- int            BgWriterMaxPages = 100;
-
  int            CheckPointTimeout = 300;
  int            CheckPointWarning = 30;

--- 116,121 ----
***************
*** 370,376 ****
              n = 1;
          }
          else
!             n = BufferSync(BgWriterPercent, BgWriterMaxPages);

          /*
           * Nap for the configured time or sleep for 10 seconds if there
--- 367,373 ----
              n = 1;
          }
          else
!             n = BgBufferSync();

          /*
           * Nap for the configured time or sleep for 10 seconds if there
*** src/backend/storage/buffer/README.orig    Mon Apr 19 19:27:17 2004
--- src/backend/storage/buffer/README    Tue Feb 15 15:54:52 2005
***************
*** 4,12 ****
  --------------------------------------

  There are two separate access control mechanisms for shared disk buffers:
! reference counts (a/k/a pin counts) and buffer locks.  (Actually, there's
! a third level of access control: one must hold the appropriate kind of
! lock on a relation before one can legally access any page belonging to
  the relation.  Relation-level locks are not discussed here.)

  Pins: one must "hold a pin on" a buffer (increment its reference count)
--- 4,12 ----
  --------------------------------------

  There are two separate access control mechanisms for shared disk buffers:
! reference counts (a/k/a pin counts) and buffer content locks.  (Actually,
! there's a third level of access control: one must hold the appropriate kind
! of lock on a relation before one can legally access any page belonging to
  the relation.  Relation-level locks are not discussed here.)

  Pins: one must "hold a pin on" a buffer (increment its reference count)
***************
*** 26,32 ****
  better hold one first.)  Pins may not be held across transaction
  boundaries, however.

! Buffer locks: there are two kinds of buffer locks, shared and exclusive,
  which act just as you'd expect: multiple backends can hold shared locks on
  the same buffer, but an exclusive lock prevents anyone else from holding
  either shared or exclusive lock.  (These can alternatively be called READ
--- 26,32 ----
  better hold one first.)  Pins may not be held across transaction
  boundaries, however.

! Buffer content locks: there are two kinds of buffer lock, shared and exclusive,
  which act just as you'd expect: multiple backends can hold shared locks on
  the same buffer, but an exclusive lock prevents anyone else from holding
  either shared or exclusive lock.  (These can alternatively be called READ
***************
*** 38,49 ****
  Buffer access rules:

  1. To scan a page for tuples, one must hold a pin and either shared or
! exclusive lock.  To examine the commit status (XIDs and status bits) of
! a tuple in a shared buffer, one must likewise hold a pin and either shared
  or exclusive lock.

  2. Once one has determined that a tuple is interesting (visible to the
! current transaction) one may drop the buffer lock, yet continue to access
  the tuple's data for as long as one holds the buffer pin.  This is what is
  typically done by heap scans, since the tuple returned by heap_fetch
  contains a pointer to tuple data in the shared buffer.  Therefore the
--- 38,49 ----
  Buffer access rules:

  1. To scan a page for tuples, one must hold a pin and either shared or
! exclusive content lock.  To examine the commit status (XIDs and status bits)
! of a tuple in a shared buffer, one must likewise hold a pin and either shared
  or exclusive lock.

  2. Once one has determined that a tuple is interesting (visible to the
! current transaction) one may drop the content lock, yet continue to access
  the tuple's data for as long as one holds the buffer pin.  This is what is
  typically done by heap scans, since the tuple returned by heap_fetch
  contains a pointer to tuple data in the shared buffer.  Therefore the
***************
*** 52,60 ****
  of visibility is made.

  3. To add a tuple or change the xmin/xmax fields of an existing tuple,
! one must hold a pin and an exclusive lock on the containing buffer.
  This ensures that no one else might see a partially-updated state of the
! tuple.

  4. It is considered OK to update tuple commit status bits (ie, OR the
  values HEAP_XMIN_COMMITTED, HEAP_XMIN_INVALID, HEAP_XMAX_COMMITTED, or
--- 52,60 ----
  of visibility is made.

  3. To add a tuple or change the xmin/xmax fields of an existing tuple,
! one must hold a pin and an exclusive content lock on the containing buffer.
  This ensures that no one else might see a partially-updated state of the
! tuple while they are doing visibility checks.

  4. It is considered OK to update tuple commit status bits (ie, OR the
  values HEAP_XMIN_COMMITTED, HEAP_XMIN_INVALID, HEAP_XMAX_COMMITTED, or
***************
*** 76,82 ****
  might expect to examine again.  Note that another backend might pin the
  buffer (increment the refcount) while one is performing the cleanup, but
  it won't be able to actually examine the page until it acquires shared
! or exclusive lock.


  VACUUM FULL ignores rule #5, because it instead acquires exclusive lock at
--- 76,82 ----
  might expect to examine again.  Note that another backend might pin the
  buffer (increment the refcount) while one is performing the cleanup, but
  it won't be able to actually examine the page until it acquires shared
! or exclusive content lock.


  VACUUM FULL ignores rule #5, because it instead acquires exclusive lock at
***************
*** 97,245 ****
  single relation anyway.


! Buffer replacement strategy interface
! -------------------------------------
!
! The file freelist.c contains the buffer cache replacement strategy.
! The interface to the strategy is:
!
!     BufferDesc *StrategyBufferLookup(BufferTag *tagPtr, bool recheck,
!                                      int *cdb_found_index)
!
! This is always the first call made by the buffer manager to check if a disk
! page is in memory. If so, the function returns the buffer descriptor and no
! further action is required. If the page is not in memory,
! StrategyBufferLookup() returns NULL.
!
! The flag recheck tells the strategy that this is a second lookup after
! flushing a dirty block. If the buffer manager has to evict another buffer,
! it will release the bufmgr lock while doing the write IO. During this time,
! another backend could possibly fault in the same page this backend is after,
! so we have to check again after the IO is done if the page is in memory now.
!
! *cdb_found_index is set to the index of the found CDB, or -1 if none.
! This is not intended to be used by the caller, except to pass to
! StrategyReplaceBuffer().
!
!     BufferDesc *StrategyGetBuffer(int *cdb_replace_index)
!
! The buffer manager calls this function to get an unpinned cache buffer whose
! content can be evicted. The returned buffer might be empty, clean or dirty.
!
! The returned buffer is only a candidate for replacement.  It is possible that
! while the buffer is being written, another backend finds and modifies it, so
! that it is dirty again.  The buffer manager will then have to call
! StrategyGetBuffer() again to ask for another candidate.
!
! *cdb_replace_index is set to the index of the candidate CDB, or -1 if none
! (meaning we are using a previously free buffer).  This is not intended to be
! used by the caller, except to pass to StrategyReplaceBuffer().
!
!     void StrategyReplaceBuffer(BufferDesc *buf, BufferTag *newTag,
!                                int cdb_found_index, int cdb_replace_index)
!
! Called by the buffer manager at the time it is about to change the association
! of a buffer with a disk page.

! Before this call, StrategyBufferLookup() still has to find the buffer under
! its old tag, even if it was returned by StrategyGetBuffer() as a candidate
! for replacement.
!
! After this call, this buffer must be returned for a lookup of the new page
! identified by *newTag.
!
! cdb_found_index and cdb_replace_index must be the auxiliary values
! returned by previous calls to StrategyBufferLookup and StrategyGetBuffer.
!
!     void StrategyInvalidateBuffer(BufferDesc *buf)
!
! Called by the buffer manager to inform the strategy that the content of this
! buffer is being thrown away. This happens for example in the case of dropping
! a relation.  The buffer must be clean and unpinned on call.
!
! If the buffer was associated with a disk page, StrategyBufferLookup()
! must not return it for this page after the call.
!
!     void StrategyHintVacuum(bool vacuum_active)
!
! Because VACUUM reads all relations of the entire database through the buffer
! manager, it can greatly disturb the buffer replacement strategy. This function
! is used by VACUUM to inform the strategy that subsequent buffer lookups are
! (or are not) caused by VACUUM scanning relations.


  Buffer replacement strategy
  ---------------------------

! The buffer replacement strategy actually used in freelist.c is a version of
! the Adaptive Replacement Cache (ARC) specially tailored for PostgreSQL.
!
! The algorithm works as follows:

- C is the size of the cache in number of pages (a/k/a shared_buffers or
- NBuffers).  ARC uses 2*C Cache Directory Blocks (CDB). A cache directory block
- is always associated with one unique file page.  It may point to one shared
- buffer, or may indicate that the file page is not in a buffer but has been
- accessed recently.
-
- All CDB entries are managed in 4 LRU lists named T1, T2, B1 and B2. The T1 and
- T2 lists are the "real" cache entries, linking a file page to a memory buffer
- where the page is currently cached. Consequently T1len+T2len <= C. B1 and B2
- are ghost cache directories that extend T1 and T2 so that the strategy
- remembers pages longer. The strategy tries to keep B1len+T1len and B2len+T2len
- both at C. T1len and T2len vary over the runtime depending on the lookup
- pattern and its resulting cache hits. The desired size of T1len is called
- T1target.
-
- Assuming we have a full cache, one of 5 cases happens on a lookup:
-
- MISS    On a cache miss, depending on T1target and the actual T1len
-     the LRU buffer of either T1 or T2 is evicted. Its CDB is removed
-     from the T list and added as MRU of the corresponding B list.
-     The now free buffer is replaced with the requested page
-     and added as MRU of T1.
-
- T1 hit    The T1 CDB is moved to the MRU position of the T2 list.
-
- T2 hit    The T2 CDB is moved to the MRU position of the T2 list.
-
- B1 hit    This means that a buffer that was evicted from the T1
-     list is now requested again, indicating that T1target is
-     too small (otherwise it would still be in T1 and thus in
-     memory). The strategy raises T1target, evicts a buffer
-     depending on T1target and T1len and places the CDB at
-     MRU of T2.
-
- B2 hit    This means the opposite of B1, the T2 list is probably too
-     small. So the strategy lowers T1target, evicts a buffer
-     and places the CDB at MRU of T2.
-
- Thus, every page that is found on lookup in any of the four lists
- ends up as the MRU of the T2 list. The T2 list therefore is the
- "frequency" cache, holding frequently requested pages.
-
- Every page that is seen for the first time ends up as the MRU of the T1
- list. The T1 list is the "recency" cache, holding recent newcomers.
-
- The tailoring done for PostgreSQL has to do with the way the query executor
- works. A typical UPDATE or DELETE first scans the relation, searching for the
- tuples and then calls heap_update() or heap_delete(). This causes at least 2
- lookups for the block in the same statement. In the case of multiple matches
- in one block even more often. As a result, every block touched in an UPDATE or
- DELETE would directly jump into the T2 cache, which is wrong. To prevent this
- the strategy remembers which transaction added a buffer to the T1 list and
- will not promote it from there into the T2 cache during the same transaction.
-
- Another specialty is the change of the strategy during VACUUM.  Lookups during
- VACUUM do not represent application needs, and do not suggest that the page
- will be hit again soon, so it would be wrong to change the cache balance
- T1target due to that or to cause massive cache evictions. Therefore, a page
- read in to satisfy vacuum is placed at the LRU position of the T1 list, for
- immediate reuse.  Also, if we happen to get a hit on a CDB entry during
- VACUUM, we do not promote the page above its current position in the list.
  Since VACUUM usually requests many pages very fast, the effect of this is that
  it will get back the very buffers it filled and possibly modified on the next
  call and will therefore do its work in a few shared memory buffers, while
  being able to use whatever it finds in the cache already.  This also implies
  that most of the write traffic caused by a VACUUM will be done by the VACUUM
  itself and not pushed off onto other processes.
--- 97,242 ----
  single relation anyway.


! Buffer manager's internal locking
! ---------------------------------

! Before PostgreSQL 8.1, all operations of the shared buffer manager itself
! were protected by a single system-wide lock, the BufMgrLock, which
! unsurprisingly proved to be a source of contention.  The new locking scheme
! avoids grabbing system-wide exclusive locks in common code paths.  It works
! like this:
!
! * There is a system-wide LWLock, the BufMappingLock, that notionally
! protects the mapping from buffer tags (page identifiers) to buffers.
! (Physically, it can be thought of as protecting the hash table maintained
! by buf_table.c.)  To look up whether a buffer exists for a tag, it is
! sufficient to obtain share lock on the BufMappingLock.  Note that one
! must pin the found buffer, if any, before releasing the BufMappingLock.
! To alter the page assignment of any buffer, one must hold exclusive lock
! on the BufMappingLock.  This lock must be held across adjusting the buffer's
! header fields and changing the buf_table hash table.  The only common
! operation that needs exclusive lock is reading in a page that was not
! in shared buffers already, which will require at least a kernel call
! and usually a wait for I/O, so it will be slow anyway.
!
! * A separate system-wide LWLock, the BufFreelistLock, provides mutual
! exclusion for operations that access the buffer free list or select
! buffers for replacement.  This is always taken in exclusive mode since
! there are no read-only operations on those data structures.  The buffer
! management policy is designed so that BufFreelistLock need not be taken
! except in paths that will require I/O, and thus will be slow anyway.
! (Details appear below.)  It is never necessary to hold the BufMappingLock
! and the BufFreelistLock at the same time.
!
! * Each buffer header contains a spinlock that must be taken when examining
! or changing fields of that buffer header.  This allows operations such as
! ReleaseBuffer to make local state changes without taking any system-wide
! lock.  We use a spinlock, not an LWLock, since there are no cases where
! the lock needs to be held for more than a few instructions.
!
! Note that a buffer header's spinlock does not control access to the data
! held within the buffer.  Each buffer header also contains an LWLock, the
! "buffer content lock", that *does* represent the right to access the data
! in the buffer.  It is used per the rules above.
!
! There is yet another set of per-buffer LWLocks, the io_in_progress locks,
! that are used to wait for I/O on a buffer to complete.  The process doing
! a read or write takes exclusive lock for the duration, and processes that
! need to wait for completion try to take shared locks (which they release
! immediately upon obtaining).  XXX on systems where an LWLock represents
! nontrivial resources, it's fairly annoying to need so many locks.  Possibly
! we could use per-backend LWLocks instead (a buffer header would then contain
! a field to show which backend is doing its I/O).


  Buffer replacement strategy
  ---------------------------

! There is a "free list" of buffers that are prime candidates for replacement.
! In particular, buffers that are completely free (contain no valid page) are
! always in this list.  We may also throw buffers into this list if we
! consider their pages unlikely to be needed soon.  The list is singly-linked
! using fields in the buffer headers; we maintain head and tail pointers in
! global variables.  (Note: although the list links are in the buffer headers,
! they are considered to be protected by the BufFreelistLock, not the
! buffer-header spinlocks.)  To choose a victim buffer to recycle when there
! are no free buffers available, we use a simple clock-sweep algorithm, which
! avoids the need to take system-wide locks during common operations.  It
! works like this:
!
! Each buffer header contains a "recently used" flag bit, which is set true
! whenever the buffer is unpinned.  (Setting this bit requires only the
! buffer header spinlock, which would have to be taken anyway to decrement
! the buffer reference count, so it's nearly free.)
!
! The "clock hand" is a buffer index, NextVictimBuffer, that moves circularly
! through all the available buffers.  NextVictimBuffer is protected by the
! BufFreelistLock.
!
! The algorithm for a process that needs to obtain a victim buffer is:
!
! 1. Obtain BufFreelistLock.
!
! 2. If buffer free list is nonempty, remove its head buffer.  If the buffer
! is pinned or has its "recently used" bit set, it cannot be used; ignore
! it and return to the start of step 2.  Otherwise, pin the buffer,
! release BufFreelistLock, and return the buffer.
!
! 3. Otherwise, select the buffer pointed to by NextVictimBuffer, and
! circularly advance NextVictimBuffer for next time.
!
! 4. If the selected buffer is pinned or has its "recently used" bit set,
! it cannot be used.  Clear its "recently used" bit and return to step 3
! to examine the next buffer.
!
! 5. Pin the selected buffer, release BufFreelistLock, and return the buffer.
!
! (Note that if the selected buffer is dirty, we will have to write it out
! before we can recycle it; if someone else pins the buffer meanwhile we will
! have to give up and try another buffer.  This however is not a concern
! of the basic select-a-victim-buffer algorithm.)
!
! This scheme selects only victim buffers that have gone unused since they
! were last passed over by the "clock hand".
!
! A special provision is that while running VACUUM, a backend does not set the
! "recently used" bit on buffers it accesses.  In fact, if ReleaseBuffer sees
! that it is dropping the pin count to zero and the "recently used" bit is not
! set, then it appends the buffer to the tail of the free list.  (This implies
! that VACUUM, but only VACUUM, must take the BufFreelistLock during
! ReleaseBuffer; this shouldn't create much of a contention problem.)  This
! provision encourages VACUUM to work in a relatively small number of buffers
! rather than blowing out the entire buffer cache.  It is reasonable since a
! page that has been touched only by VACUUM is unlikely to be needed again
! soon.

  Since VACUUM usually requests many pages very fast, the effect of this is that
  it will get back the very buffers it filled and possibly modified on the next
  call and will therefore do its work in a few shared memory buffers, while
  being able to use whatever it finds in the cache already.  This also implies
  that most of the write traffic caused by a VACUUM will be done by the VACUUM
  itself and not pushed off onto other processes.
+
+
+ Background writer's processing
+ ------------------------------
+
+ The background writer is designed to write out pages that are likely to be
+ recycled soon, thereby offloading the writing work from active backends.
+ To do this, it scans forward circularly from the current position of
+ NextVictimBuffer (which it does not change!), looking for buffers that are
+ dirty and not pinned nor marked "recently used".  It pins, writes, and
+ releases any such buffer.
+
+ If we can assume that reading NextVictimBuffer is an atomic action, then
+ the writer doesn't even need to take the BufFreelistLock in order to look
+ for buffers to write; it needs only to spinlock each buffer header for long
+ enough to check the dirtybit.  Even without that assumption, the writer
+ only needs to take the lock long enough to read the variable value, not
+ while scanning the buffers.  (This is a very substantial improvement in
+ the contention cost of the writer compared to PG 8.0.)
+
+ During a checkpoint, the writer's strategy must be to write every dirty
+ buffer (pinned or not!).  We may as well make it start this scan from
+ NextVictimBuffer, however, so that the first-to-be-written pages are the
+ ones that backends might otherwise have to write for themselves soon.
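
[To make the numbered clock-sweep description in the README text above concrete, here is a minimal single-threaded sketch of the victim-selection loop in C.  SketchBufDesc, sketch_get_victim, and NBUFFERS are illustrative stand-ins rather than the patch's actual declarations, and all locking except the assumed-held BufFreelistLock is elided -- the real StrategyGetBuffer also takes each candidate's buffer-header spinlock.]

#include <stdbool.h>

#define NBUFFERS 1000           /* stand-in for the NBuffers setting */

typedef struct
{
    int     refcount;           /* pin count; 0 = unpinned */
    bool    recently_used;      /* set whenever the buffer is unpinned */
    int     free_next;          /* free-list link; -1 = end of list */
} SketchBufDesc;

static SketchBufDesc buffers[NBUFFERS];
static int  freelist_head = -1; /* head of free list; -1 = list empty */
static int  next_victim = 0;    /* the "clock hand" (NextVictimBuffer) */

/*
 * Select and pin a victim buffer, per steps 2-5 above.  Assumes the
 * caller has already obtained BufFreelistLock (step 1) and will
 * release it after we return.
 */
static SketchBufDesc *
sketch_get_victim(void)
{
    /* Step 2: prefer buffers on the free list */
    while (freelist_head >= 0)
    {
        SketchBufDesc *buf = &buffers[freelist_head];

        freelist_head = buf->free_next;
        if (buf->refcount == 0 && !buf->recently_used)
        {
            buf->refcount = 1;  /* pin before caller drops the lock */
            return buf;
        }
        /* pinned or recently used: ignore it, examine the next entry */
    }

    /* Steps 3-5: advance the clock hand until a victim turns up */
    for (;;)
    {
        SketchBufDesc *buf = &buffers[next_victim];

        next_victim = (next_victim + 1) % NBUFFERS;
        if (buf->refcount == 0 && !buf->recently_used)
        {
            buf->refcount = 1;  /* pin before caller drops the lock */
            return buf;
        }
        /* Step 4: clear the bit so the buffer is usable next pass */
        buf->recently_used = false;
    }
}

[The point of the second-chance bit is that a buffer must go unused for a full revolution of the hand before it can be evicted, approximating LRU without touching any shared list on a buffer hit.]
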
*** src/backend/storage/buffer/buf_init.c.orig    Thu Feb  3 18:29:11 2005
--- src/backend/storage/buffer/buf_init.c    Tue Feb 15 12:52:31 2005
***************
*** 22,27 ****
--- 22,29 ----
  Block       *BufferBlockPointers;
  int32       *PrivateRefCount;

+ static char *BufferBlocks;
+
  /* statistics counters */
  long int    ReadBufferCount;
  long int    ReadLocalBufferCount;
***************
*** 50,65 ****
   *
   * Synchronization/Locking:
   *
-  * BufMgrLock lock -- must be acquired before manipulating the
-  *        buffer search datastructures (lookup/freelist, as well as the
-  *        flag bits of any buffer).  Must be released
-  *        before exit and before doing any IO.
-  *
   * IO_IN_PROGRESS -- this is a flag in the buffer descriptor.
   *        It must be set when an IO is initiated and cleared at
   *        the end of the IO.    It is there to make sure that one
   *        process doesn't start to use a buffer while another is
!  *        faulting it in.  see IOWait/IOSignal.
   *
   * refcount --    Counts the number of processes holding pins on a buffer.
   *        A buffer is pinned during IO and immediately after a BufferAlloc().
--- 52,62 ----
   *
   * Synchronization/Locking:
   *
   * IO_IN_PROGRESS -- this is a flag in the buffer descriptor.
   *        It must be set when an IO is initiated and cleared at
   *        the end of the IO.    It is there to make sure that one
   *        process doesn't start to use a buffer while another is
!  *        faulting it in.  see WaitIO and related routines.
   *
   * refcount --    Counts the number of processes holding pins on a buffer.
   *        A buffer is pinned during IO and immediately after a BufferAlloc().
***************
*** 85,94 ****
  void
  InitBufferPool(void)
  {
-     char       *BufferBlocks;
      bool        foundBufs,
                  foundDescs;
-     int            i;

      BufferDescriptors = (BufferDesc *)
          ShmemInitStruct("Buffer Descriptors",
--- 82,89 ----
***************
*** 102,153 ****
      {
          /* both should be present or neither */
          Assert(foundDescs && foundBufs);
      }
      else
      {
          BufferDesc *buf;
!         char       *block;
!
!         /*
!          * It's probably not really necessary to grab the lock --- if
!          * there's anyone else attached to the shmem at this point, we've
!          * got problems.
!          */
!         LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);

          buf = BufferDescriptors;
-         block = BufferBlocks;

          /*
           * Initialize all the buffer headers.
           */
!         for (i = 0; i < NBuffers; block += BLCKSZ, buf++, i++)
          {
!             Assert(ShmemIsValid((unsigned long) block));

!             /*
!              * The bufNext fields link together all totally-unused buffers.
!              * Subsequent management of this list is done by
!              * StrategyGetBuffer().
!              */
!             buf->bufNext = i + 1;

-             CLEAR_BUFFERTAG(buf->tag);
              buf->buf_id = i;

!             buf->data = MAKE_OFFSET(block);
!             buf->flags = 0;
!             buf->refcount = 0;
              buf->io_in_progress_lock = LWLockAssign();
!             buf->cntx_lock = LWLockAssign();
!             buf->cntxDirty = false;
!             buf->wait_backend_id = 0;
          }

          /* Correct last entry of linked list */
!         BufferDescriptors[NBuffers - 1].bufNext = -1;
!
!         LWLockRelease(BufMgrLock);
      }

      /* Init other shared buffer-management stuff */
--- 97,137 ----
      {
          /* both should be present or neither */
          Assert(foundDescs && foundBufs);
+         /* note: this path is only taken in EXEC_BACKEND case */
      }
      else
      {
          BufferDesc *buf;
!         int            i;

          buf = BufferDescriptors;

          /*
           * Initialize all the buffer headers.
           */
!         for (i = 0; i < NBuffers; buf++, i++)
          {
!             CLEAR_BUFFERTAG(buf->tag);
!             buf->flags = 0;
!             buf->refcount = 0;
!             buf->wait_backend_id = 0;

!             SpinLockInit(&buf->buf_hdr_lock);

              buf->buf_id = i;

!             /*
!              * Initially link all the buffers together as unused.
!              * Subsequent management of this list is done by freelist.c.
!              */
!             buf->freeNext = i + 1;
!
              buf->io_in_progress_lock = LWLockAssign();
!             buf->content_lock = LWLockAssign();
          }

          /* Correct last entry of linked list */
!         BufferDescriptors[NBuffers - 1].freeNext = FREENEXT_END_OF_LIST;
      }

      /* Init other shared buffer-management stuff */
***************
*** 162,173 ****
   * buffer pool.
   *
   * NB: this is called before InitProcess(), so we do not have a PGPROC and
!  * cannot do LWLockAcquire; hence we can't actually access the bufmgr's
   * shared memory yet.  We are only initializing local data here.
   */
  void
  InitBufferPoolAccess(void)
  {
      int            i;

      /*
--- 146,158 ----
   * buffer pool.
   *
   * NB: this is called before InitProcess(), so we do not have a PGPROC and
!  * cannot do LWLockAcquire; hence we can't actually access stuff in
   * shared memory yet.  We are only initializing local data here.
   */
  void
  InitBufferPoolAccess(void)
  {
+     char       *block;
      int            i;

      /*
***************
*** 179,190 ****
                                         sizeof(*PrivateRefCount));

      /*
!      * Convert shmem offsets into addresses as seen by this process. This
!      * is just to speed up the BufferGetBlock() macro.  It is OK to do this
!      * without any lock since the data pointers never change.
       */
      for (i = 0; i < NBuffers; i++)
!         BufferBlockPointers[i] = (Block) MAKE_PTR(BufferDescriptors[i].data);
  }

  /*
--- 164,181 ----
                                         sizeof(*PrivateRefCount));

      /*
!      * Construct addresses for the individual buffer data blocks.  We do
!      * this just to speed up the BufferGetBlock() macro.  (Since the
!      * addresses should be the same in every backend, we could inherit
!      * this data from the postmaster --- but in the EXEC_BACKEND case
!      * that doesn't work.)
       */
+     block = BufferBlocks;
      for (i = 0; i < NBuffers; i++)
!     {
!         BufferBlockPointers[i] = (Block) block;
!         block += BLCKSZ;
!     }
  }

  /*
*** src/backend/storage/buffer/buf_table.c.orig    Thu Feb  3 18:29:11 2005
--- src/backend/storage/buffer/buf_table.c    Tue Feb 15 12:52:31 2005
***************
*** 3,14 ****
   * buf_table.c
   *      routines for mapping BufferTags to buffer indexes.
   *
!  * NOTE: this module is called only by freelist.c, and the "buffer IDs"
!  * it deals with are whatever freelist.c needs them to be; they may not be
!  * directly equivalent to Buffer numbers.
!  *
!  * Note: all routines in this file assume that the BufMgrLock is held
!  * by the caller, so no synchronization is needed.
   *
   *
   * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
--- 3,11 ----
   * buf_table.c
   *      routines for mapping BufferTags to buffer indexes.
   *
!  * Note: the routines in this file do no locking of their own.  The caller
!  * must hold a suitable lock on the BufMappingLock, as specified in the
!  * comments.
   *
   *
   * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
***************
*** 74,90 ****
  /*
   * BufTableLookup
   *        Lookup the given BufferTag; return buffer ID, or -1 if not found
   */
  int
  BufTableLookup(BufferTag *tagPtr)
  {
      BufferLookupEnt *result;

-     if (tagPtr->blockNum == P_NEW)
-         return -1;
-
      result = (BufferLookupEnt *)
          hash_search(SharedBufHash, (void *) tagPtr, HASH_FIND, NULL);
      if (!result)
          return -1;

--- 71,87 ----
  /*
   * BufTableLookup
   *        Lookup the given BufferTag; return buffer ID, or -1 if not found
+  *
+  * Caller must hold at least share lock on BufMappingLock
   */
  int
  BufTableLookup(BufferTag *tagPtr)
  {
      BufferLookupEnt *result;

      result = (BufferLookupEnt *)
          hash_search(SharedBufHash, (void *) tagPtr, HASH_FIND, NULL);
+
      if (!result)
          return -1;

***************
*** 93,106 ****

  /*
   * BufTableInsert
!  *        Insert a hashtable entry for given tag and buffer ID
   */
! void
  BufTableInsert(BufferTag *tagPtr, int buf_id)
  {
      BufferLookupEnt *result;
      bool        found;

      result = (BufferLookupEnt *)
          hash_search(SharedBufHash, (void *) tagPtr, HASH_ENTER, &found);

--- 90,112 ----

  /*
   * BufTableInsert
!  *        Insert a hashtable entry for given tag and buffer ID,
!  *        unless an entry already exists for that tag
!  *
!  * Returns -1 on successful insertion.  If a conflicting entry exists
!  * already, returns the buffer ID in that entry.
!  *
!  * Caller must hold write lock on BufMappingLock
   */
! int
  BufTableInsert(BufferTag *tagPtr, int buf_id)
  {
      BufferLookupEnt *result;
      bool        found;

+     Assert(buf_id >= 0);        /* -1 is reserved for not-in-table */
+     Assert(tagPtr->blockNum != P_NEW); /* invalid tag */
+
      result = (BufferLookupEnt *)
          hash_search(SharedBufHash, (void *) tagPtr, HASH_ENTER, &found);

***************
*** 109,123 ****
                  (errcode(ERRCODE_OUT_OF_MEMORY),
                   errmsg("out of shared memory")));

!     if (found)                    /* found something already in the table? */
!         elog(ERROR, "shared buffer hash table corrupted");

      result->id = buf_id;
  }

  /*
   * BufTableDelete
   *        Delete the hashtable entry for given tag (which must exist)
   */
  void
  BufTableDelete(BufferTag *tagPtr)
--- 115,133 ----
                  (errcode(ERRCODE_OUT_OF_MEMORY),
                   errmsg("out of shared memory")));

!     if (found)                    /* found something already in the table */
!         return result->id;

      result->id = buf_id;
+
+     return -1;
  }

  /*
   * BufTableDelete
   *        Delete the hashtable entry for given tag (which must exist)
+  *
+  * Caller must hold write lock on BufMappingLock
   */
  void
  BufTableDelete(BufferTag *tagPtr)
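
[The revised BufTableInsert above turns the mapping hash table into an insert-or-find primitive, which is what lets BufferAlloc (next hunk) detect that it lost a race to map the same page.  A sketch of the calling pattern, with a hypothetical caller name; the caller must hold exclusive lock on BufMappingLock throughout.]

/* Returns the buffer ID that ends up owning the mapping for "tag". */
static int
sketch_map_page(BufferTag *tag, int my_buf_id)
{
    int     existing_id = BufTableInsert(tag, my_buf_id);

    if (existing_id >= 0)
        return existing_id;     /* lost the race; use the other buffer */

    return my_buf_id;           /* our entry went in; we own the mapping */
}
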
*** src/backend/storage/buffer/bufmgr.c.orig    Mon Jan 10 15:02:21 2005
--- src/backend/storage/buffer/bufmgr.c    Tue Feb 15 15:49:32 2005
***************
*** 25,31 ****
   *
   * WriteBuffer() -- WriteNoReleaseBuffer() + ReleaseBuffer()
   *
!  * BufferSync() -- flush all (or some) dirty buffers in the buffer pool.
   *
   * InitBufferPool() -- Init the buffer module.
   *
--- 25,33 ----
   *
   * WriteBuffer() -- WriteNoReleaseBuffer() + ReleaseBuffer()
   *
!  * BufferSync() -- flush all dirty buffers in the buffer pool.
!  *
!  * BgBufferSync() -- flush some dirty buffers in the buffer pool.
   *
   * InitBufferPool() -- Init the buffer module.
   *
***************
*** 50,65 ****
  #include "pgstat.h"


! #define BufferGetLSN(bufHdr)    \
!     (*((XLogRecPtr*) MAKE_PTR((bufHdr)->data)))


! /* GUC variable */
  bool        zero_damaged_pages = false;

- #ifdef NOT_USED
- bool        ShowPinTrace = false;
- #endif

  long        NDirectFileRead;    /* some I/O's are direct file access.
                                   * bypass bufmgr */
--- 52,71 ----
  #include "pgstat.h"


! /* Note: these two macros only work on shared buffers, not local ones! */
! #define BufHdrGetBlock(bufHdr)    BufferBlockPointers[(bufHdr)->buf_id]
! #define BufferGetLSN(bufHdr)    (*((XLogRecPtr*) BufHdrGetBlock(bufHdr)))
!
! /* Note: this macro only works on local buffers, not shared ones! */
! #define LocalBufHdrGetBlock(bufHdr)    \
!     LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]


! /* GUC variables */
  bool        zero_damaged_pages = false;
+ int            BgWriterPercent = 1;
+ int            BgWriterMaxPages = 100;


  long        NDirectFileRead;    /* some I/O's are direct file access.
                                   * bypass bufmgr */
***************
*** 73,90 ****
  static BufferDesc *PinCountWaitBuf = NULL;


! static void PinBuffer(BufferDesc *buf, bool fixOwner);
! static void UnpinBuffer(BufferDesc *buf, bool fixOwner);
  static void WaitIO(BufferDesc *buf);
! static void StartBufferIO(BufferDesc *buf, bool forInput);
! static void TerminateBufferIO(BufferDesc *buf, int err_flag);
! static void ContinueBufferIO(BufferDesc *buf, bool forInput);
  static void buffer_write_error_callback(void *arg);
- static Buffer ReadBufferInternal(Relation reln, BlockNumber blockNum,
-                    bool bufferLockHeld);
  static BufferDesc *BufferAlloc(Relation reln, BlockNumber blockNum,
              bool *foundPtr);
! static void FlushBuffer(BufferDesc *buf, SMgrRelation reln, bool earlylock);
  static void write_buffer(Buffer buffer, bool unpin);


--- 79,96 ----
  static BufferDesc *PinCountWaitBuf = NULL;


! static bool PinBuffer(BufferDesc *buf);
! static void PinBuffer_Locked(BufferDesc *buf);
! static void UnpinBuffer(BufferDesc *buf, bool fixOwner, bool trashOK);
! static bool SyncOneBuffer(int buf_id, bool skip_pinned);
  static void WaitIO(BufferDesc *buf);
! static bool StartBufferIO(BufferDesc *buf, bool forInput);
! static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
!                               int set_flag_bits);
  static void buffer_write_error_callback(void *arg);
  static BufferDesc *BufferAlloc(Relation reln, BlockNumber blockNum,
              bool *foundPtr);
! static void FlushBuffer(BufferDesc *buf, SMgrRelation reln);
  static void write_buffer(Buffer buffer, bool unpin);


***************
*** 106,132 ****
  Buffer
  ReadBuffer(Relation reln, BlockNumber blockNum)
  {
-     ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
-     return ReadBufferInternal(reln, blockNum, false);
- }
-
- /*
-  * ReadBufferInternal -- internal version of ReadBuffer with more options
-  *
-  * bufferLockHeld: if true, caller already acquired the bufmgr lock.
-  * (This is assumed never to be true if dealing with a local buffer!)
-  *
-  * The caller must have done ResourceOwnerEnlargeBuffers(CurrentResourceOwner)
-  */
- static Buffer
- ReadBufferInternal(Relation reln, BlockNumber blockNum,
-                    bool bufferLockHeld)
- {
      BufferDesc *bufHdr;
      bool        found;
      bool        isExtend;
      bool        isLocalBuf;

      isExtend = (blockNum == P_NEW);
      isLocalBuf = reln->rd_istemp;

--- 112,126 ----
  Buffer
  ReadBuffer(Relation reln, BlockNumber blockNum)
  {
      BufferDesc *bufHdr;
+     Block        bufBlock;
      bool        found;
      bool        isExtend;
      bool        isLocalBuf;

+     /* Make sure we will have room to remember the buffer pin */
+     ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
+
      isExtend = (blockNum == P_NEW);
      isLocalBuf = reln->rd_istemp;

***************
*** 137,146 ****
      if (isExtend)
          blockNum = smgrnblocks(reln->rd_smgr);

      if (isLocalBuf)
      {
          ReadLocalBufferCount++;
-         pgstat_count_buffer_read(&reln->pgstat_info, reln);
          bufHdr = LocalBufferAlloc(reln, blockNum, &found);
          if (found)
              LocalBufferHitCount++;
--- 131,141 ----
      if (isExtend)
          blockNum = smgrnblocks(reln->rd_smgr);

+     pgstat_count_buffer_read(&reln->pgstat_info, reln);
+
      if (isLocalBuf)
      {
          ReadLocalBufferCount++;
          bufHdr = LocalBufferAlloc(reln, blockNum, &found);
          if (found)
              LocalBufferHitCount++;
***************
*** 148,167 ****
      else
      {
          ReadBufferCount++;
-         pgstat_count_buffer_read(&reln->pgstat_info, reln);

          /*
           * lookup the buffer.  IO_IN_PROGRESS is set if the requested
           * block is not currently in memory.
           */
-         if (!bufferLockHeld)
-             LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
          bufHdr = BufferAlloc(reln, blockNum, &found);
          if (found)
              BufferHitCount++;
      }

!     /* At this point we do NOT hold the bufmgr lock. */

      /* if it was already in the buffer pool, we're done */
      if (found)
--- 143,159 ----
      else
      {
          ReadBufferCount++;

          /*
           * lookup the buffer.  IO_IN_PROGRESS is set if the requested
           * block is not currently in memory.
           */
          bufHdr = BufferAlloc(reln, blockNum, &found);
          if (found)
              BufferHitCount++;
      }

!     /* At this point we do NOT hold any locks. */

      /* if it was already in the buffer pool, we're done */
      if (found)
***************
*** 187,206 ****
       * same buffer (if it's not been recycled) but come right back here to
       * try smgrextend again.
       */
!     Assert(!(bufHdr->flags & BM_VALID));

      if (isExtend)
      {
          /* new buffers are zero-filled */
!         MemSet((char *) MAKE_PTR(bufHdr->data), 0, BLCKSZ);
!         smgrextend(reln->rd_smgr, blockNum, (char *) MAKE_PTR(bufHdr->data),
                     reln->rd_istemp);
      }
      else
      {
!         smgrread(reln->rd_smgr, blockNum, (char *) MAKE_PTR(bufHdr->data));
          /* check for garbage data */
!         if (!PageHeaderIsValid((PageHeader) MAKE_PTR(bufHdr->data)))
          {
              /*
               * During WAL recovery, the first access to any data page
--- 179,200 ----
       * same buffer (if it's not been recycled) but come right back here to
       * try smgrextend again.
       */
!     Assert(!(bufHdr->flags & BM_VALID)); /* spinlock not needed */
!
!     bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);

      if (isExtend)
      {
          /* new buffers are zero-filled */
!         MemSet((char *) bufBlock, 0, BLCKSZ);
!         smgrextend(reln->rd_smgr, blockNum, (char *) bufBlock,
                     reln->rd_istemp);
      }
      else
      {
!         smgrread(reln->rd_smgr, blockNum, (char *) bufBlock);
          /* check for garbage data */
!         if (!PageHeaderIsValid((PageHeader) bufBlock))
          {
              /*
               * During WAL recovery, the first access to any data page
***************
*** 215,221 ****
                          (errcode(ERRCODE_DATA_CORRUPTED),
                           errmsg("invalid page header in block %u of relation \"%s\"; zeroing out page",
                                blockNum, RelationGetRelationName(reln))));
!                 MemSet((char *) MAKE_PTR(bufHdr->data), 0, BLCKSZ);
              }
              else
                  ereport(ERROR,
--- 209,215 ----
                          (errcode(ERRCODE_DATA_CORRUPTED),
                           errmsg("invalid page header in block %u of relation \"%s\"; zeroing out page",
                                blockNum, RelationGetRelationName(reln))));
!                 MemSet((char *) bufBlock, 0, BLCKSZ);
              }
              else
                  ereport(ERROR,
***************
*** 232,247 ****
      }
      else
      {
!         /* lock buffer manager again to update IO IN PROGRESS */
!         LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
!
!         /* IO Succeeded, so mark data valid */
!         bufHdr->flags |= BM_VALID;
!
!         /* If anyone was waiting for IO to complete, wake them up now */
!         TerminateBufferIO(bufHdr, 0);
!
!         LWLockRelease(BufMgrLock);
      }

      if (VacuumCostActive)
--- 226,233 ----
      }
      else
      {
!         /* Set BM_VALID, terminate IO, and wake up any waiters */
!         TerminateBufferIO(bufHdr, false, BM_VALID);
      }

      if (VacuumCostActive)
***************
*** 263,270 ****
   * *foundPtr is actually redundant with the buffer's BM_VALID flag, but
   * we keep it for simplicity in ReadBuffer.
   *
!  * BufMgrLock must be held at entry.  When this routine returns,
!  * the BufMgrLock is guaranteed NOT to be held.
   */
  static BufferDesc *
  BufferAlloc(Relation reln,
--- 249,255 ----
   * *foundPtr is actually redundant with the buffer's BM_VALID flag, but
   * we keep it for simplicity in ReadBuffer.
   *
!  * No locks are held either at entry or exit.
   */
  static BufferDesc *
  BufferAlloc(Relation reln,
***************
*** 272,492 ****
              bool *foundPtr)
  {
      BufferTag    newTag;            /* identity of requested block */
!     BufferDesc *buf,
!                *buf2;
!     int            cdb_found_index,
!                 cdb_replace_index;
!     bool        inProgress;        /* did we already do StartBufferIO? */

      /* create a tag so we can lookup the buffer */
      INIT_BUFFERTAG(newTag, reln, blockNum);

      /* see if the block is in the buffer pool already */
!     buf = StrategyBufferLookup(&newTag, false, &cdb_found_index);
!     if (buf != NULL)
      {
          /*
           * Found it.  Now, pin the buffer so no one can steal it from the
!          * buffer pool, and check to see if someone else is still reading
!          * data into the buffer.  (Formerly, we'd always block here if
!          * IO_IN_PROGRESS is set, but there's no need to wait when someone
!          * is writing rather than reading.)
           */
!         *foundPtr = TRUE;

!         PinBuffer(buf, true);

!         if (!(buf->flags & BM_VALID))
          {
!             if (buf->flags & BM_IO_IN_PROGRESS)
!             {
!                 /* someone else is reading it, wait for them */
!                 WaitIO(buf);
!             }
!             if (!(buf->flags & BM_VALID))
              {
                  /*
                   * If we get here, previous attempts to read the buffer
                   * must have failed ... but we shall bravely try again.
                   */
                  *foundPtr = FALSE;
-                 StartBufferIO(buf, true);
              }
          }

-         LWLockRelease(BufMgrLock);
-
          return buf;
      }

-     *foundPtr = FALSE;
-
      /*
       * Didn't find it in the buffer pool.  We'll have to initialize a new
!      * buffer.    First, grab one from the free list.  If it's dirty, flush
!      * it to disk. Remember to unlock BufMgrLock while doing the IO.
       */
!     inProgress = FALSE;
!     do
      {
!         buf = StrategyGetBuffer(&cdb_replace_index);

!         /* StrategyGetBuffer will elog if it can't find a free buffer */
!         Assert(buf);

          /*
!          * There should be exactly one pin on the buffer after it is
!          * allocated -- ours.  If it had a pin it wouldn't have been on
!          * the free list.  No one else could have pinned it between
!          * StrategyGetBuffer and here because we have the BufMgrLock.
           *
!          * (We must pin the buffer before releasing BufMgrLock ourselves,
!          * to ensure StrategyGetBuffer won't give the same buffer to someone
!          * else.)
           */
!         Assert(buf->refcount == 0);
!         buf->refcount = 1;
!         PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 1;

!         ResourceOwnerRememberBuffer(CurrentResourceOwner,
!                                     BufferDescriptorGetBuffer(buf));

!         if ((buf->flags & BM_VALID) &&
!             (buf->flags & BM_DIRTY || buf->cntxDirty))
          {
              /*
!              * Set BM_IO_IN_PROGRESS to show the buffer is being written.
!              * It cannot already be set because the buffer would be pinned
!              * if someone were writing it.
!              *
!              * Note: it's okay to grab the io_in_progress lock while holding
!              * BufMgrLock.    All code paths that acquire this lock pin the
!              * buffer first; since no one had it pinned (it just came off
!              * the free list), no one else can have the lock.
               */
!             StartBufferIO(buf, false);

!             inProgress = TRUE;

!             /*
!              * Write the buffer out, being careful to release BufMgrLock
!              * while doing the I/O.  We also tell FlushBuffer to share-lock
!              * the buffer before releasing BufMgrLock.  This is safe because
!              * we know no other backend currently has the buffer pinned,
!              * therefore no one can have it locked either, so we can always
!              * get the lock without blocking.  It is necessary because if
!              * we release BufMgrLock first, it's possible for someone else
!              * to pin and exclusive-lock the buffer before we get to the
!              * share-lock, causing us to block.  If the someone else then
!              * blocks on a lock we hold, deadlock ensues.  This has been
!              * observed to happen when two backends are both trying to split
!              * btree index pages, and the second one just happens to be
!              * trying to split the page the first one got from the freelist.
!              */
!             FlushBuffer(buf, NULL, true);

              /*
!              * Somebody could have allocated another buffer for the same
!              * block we are about to read in. While we flush out the dirty
!              * buffer, we don't hold the lock and someone could have
!              * allocated another buffer for the same block. The problem is
!              * we haven't yet inserted the new tag into the buffer table.
!              * So we need to check here.        -ay 3/95
!              *
!              * Another reason we have to do this is to update
!              * cdb_found_index, since the CDB could have disappeared from
!              * B1/B2 list while we were writing.
               */
!             buf2 = StrategyBufferLookup(&newTag, true, &cdb_found_index);
!             if (buf2 != NULL)
              {
                  /*
!                  * Found it. Someone has already done what we were about
!                  * to do. We'll just handle this as if it were found in
!                  * the buffer pool in the first place.    First, give up the
!                  * buffer we were planning to use.
                   */
!                 TerminateBufferIO(buf, 0);
!                 UnpinBuffer(buf, true);

!                 buf = buf2;

!                 /* remaining code should match code at top of routine */

!                 *foundPtr = TRUE;

!                 PinBuffer(buf, true);

!                 if (!(buf->flags & BM_VALID))
!                 {
!                     if (buf->flags & BM_IO_IN_PROGRESS)
!                     {
!                         /* someone else is reading it, wait for them */
!                         WaitIO(buf);
!                     }
!                     if (!(buf->flags & BM_VALID))
!                     {
!                         /*
!                          * If we get here, previous attempts to read the
!                          * buffer must have failed ... but we shall
!                          * bravely try again.
!                          */
!                         *foundPtr = FALSE;
!                         StartBufferIO(buf, true);
!                     }
!                 }

!                 LWLockRelease(BufMgrLock);

!                 return buf;
!             }

!             /*
!              * Somebody could have pinned the buffer while we were doing
!              * the I/O and had given up the BufMgrLock.  If so, we can't
!              * recycle this buffer --- we need to clear the I/O flags,
!              * remove our pin and choose a new victim buffer.  Similarly,
!              * we have to start over if somebody re-dirtied the buffer.
!              */
!             if (buf->refcount > 1 || buf->flags & BM_DIRTY || buf->cntxDirty)
!             {
!                 TerminateBufferIO(buf, 0);
!                 UnpinBuffer(buf, true);
!                 inProgress = FALSE;
!                 buf = NULL;
!             }
!         }
!     } while (buf == NULL);

      /*
!      * At this point we should have the sole pin on a non-dirty buffer and
!      * we may or may not already have the BM_IO_IN_PROGRESS flag set.
       */

      /*
!      * Tell the buffer replacement strategy that we are replacing the
!      * buffer content. Then rename the buffer.    Clearing BM_VALID here is
!      * necessary, clearing the dirtybits is just paranoia.
       */
!     StrategyReplaceBuffer(buf, &newTag, cdb_found_index, cdb_replace_index);
!     buf->tag = newTag;
!     buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR);
!     buf->cntxDirty = false;

      /*
!      * Buffer contents are currently invalid.  Have to mark IO IN PROGRESS
!      * so no one fiddles with them until the read completes.  We may have
!      * already marked it, in which case we just flip from write to read
!      * status.
       */
!     if (!inProgress)
!         StartBufferIO(buf, true);
!     else
!         ContinueBufferIO(buf, true);

!     LWLockRelease(BufMgrLock);

!     return buf;
  }

  /*
--- 257,587 ----
              bool *foundPtr)
  {
      BufferTag    newTag;            /* identity of requested block */
!     BufferTag    oldTag;
!     BufFlags    oldFlags;
!     int            buf_id;
!     BufferDesc *buf;
!     bool        valid;

      /* create a tag so we can lookup the buffer */
      INIT_BUFFERTAG(newTag, reln, blockNum);

      /* see if the block is in the buffer pool already */
!     LWLockAcquire(BufMappingLock, LW_SHARED);
!     buf_id = BufTableLookup(&newTag);
!     if (buf_id >= 0)
      {
          /*
           * Found it.  Now, pin the buffer so no one can steal it from the
!          * buffer pool, and check to see if the correct data has been
!          * loaded into the buffer.
           */
!         buf = &BufferDescriptors[buf_id];
!
!         valid = PinBuffer(buf);
!
!         /* Can release the mapping lock as soon as we've pinned it */
!         LWLockRelease(BufMappingLock);

!         *foundPtr = TRUE;

!         if (!valid)
          {
!             /*
!              * We can only get here if (a) someone else is still reading
!              * in the page, or (b) a previous read attempt failed.  We
!              * have to wait for any active read attempt to finish, and
!              * then set up our own read attempt if the page is still not
!              * BM_VALID.  StartBufferIO does it all.
!              */
!             if (StartBufferIO(buf, true))
              {
                  /*
                   * If we get here, previous attempts to read the buffer
                   * must have failed ... but we shall bravely try again.
                   */
                  *foundPtr = FALSE;
              }
          }

          return buf;
      }

      /*
       * Didn't find it in the buffer pool.  We'll have to initialize a new
!      * buffer.  Remember to unlock BufMappingLock while doing the work.
       */
!     LWLockRelease(BufMappingLock);
!
!     for (;;)
      {
!         bool        isdirty;
!
!         /*
!          * Select a victim buffer.  The buffer is returned with its
!          * header spinlock still held!  Also the BufFreelistLock is
!          * still held, since it would be bad to hold the spinlock
!          * while possibly waking up other processes.
!          */
!         buf = StrategyGetBuffer();

!         Assert(buf->refcount == 0);

          /*
!          * If the buffer is dirty, we must share-lock the contents before
!          * releasing the spinlock.  Otherwise it is possible for someone else
!          * to pin and exclusive-lock the buffer before we get the content
!          * lock, forcing us to wait for them.  If the someone else then blocks
!          * on a lock we hold, deadlock ensues.
!          * (This has been observed to happen when two backends are both trying
!          * to split btree index pages, and the second one just happens to be
!          * trying to split the page the first one got from the freelist.)
           *
!          * It's ugly to do this while we hold the buffer header spinlock, but
!          * it's safe because we know no other backend currently has the buffer
!          * pinned, therefore no one can have it locked either, so we can
!          * always get the lock without blocking.
           */
!         if (buf->flags & BM_DIRTY)
!         {
!             LWLockAcquire(buf->content_lock, LW_SHARED);
!             isdirty = true;
!         }
!         else
!             isdirty = false;

!         /* Pin the buffer and then release the buffer spinlock */
!         PinBuffer_Locked(buf);

!         /* Now it's safe to release the freelist lock */
!         LWLockRelease(BufFreelistLock);
!
!         /*
!          * If it's dirty, try to write it out.  Note that someone may dirty
!          * it again while we're writing (since our share-lock doesn't block
!          * hint-bit updates), so this is not certain to succeed.  We will
!          * check again after re-locking the buffer header.
!          */
!         if (isdirty)
          {
+             FlushBuffer(buf, NULL);
              /*
!              * Whether FlushBuffer succeeded or not, we need to drop the
!              * buffer content share-lock now.
               */
!             LWLockRelease(buf->content_lock);
!         }

!         /*
!          * Acquire exclusive mapping lock in preparation for changing
!          * the buffer's association.
!          */
!         LWLockAcquire(BufMappingLock, LW_EXCLUSIVE);

!         /*
!          * Try to make a hashtable entry for the buffer under its new tag.
!          * This could fail because while we were writing someone else
!          * allocated another buffer for the same block we want to read in.
!          * Note that we have not yet removed the hashtable entry for the
!          * old tag.
!          */
!         buf_id = BufTableInsert(&newTag, buf->buf_id);

+         if (buf_id >= 0)
+         {
              /*
!              * Got a collision. Someone has already done what we were about
!              * to do. We'll just handle this as if it were found in
!              * the buffer pool in the first place.  First, give up the
!              * buffer we were planning to use.  Don't allow it to be
!              * thrown in the free list (we don't want to hold both
!              * global locks at once).
               */
!             UnpinBuffer(buf, true, false);
!
!             /* remaining code should match code at top of routine */
!
!             buf = &BufferDescriptors[buf_id];
!
!             valid = PinBuffer(buf);
!
!             /* Can release the mapping lock as soon as we've pinned it */
!             LWLockRelease(BufMappingLock);
!
!             *foundPtr = TRUE;
!
!             if (!valid)
              {
                  /*
!                  * We can only get here if (a) someone else is still reading
!                  * in the page, or (b) a previous read attempt failed.  We
!                  * have to wait for any active read attempt to finish, and
!                  * then set up our own read attempt if the page is still not
!                  * BM_VALID.  StartBufferIO does it all.
                   */
!                 if (StartBufferIO(buf, true))
!                 {
!                     /*
!                      * If we get here, previous attempts to read the buffer
!                      * must have failed ... but we shall bravely try again.
!                      */
!                     *foundPtr = FALSE;
!                 }
!             }

!             return buf;
!         }

!         /*
!          * Need to lock the buffer header too in order to change its tag.
!          */
!         LockBufHdr_NoHoldoff(buf);

!         /*
!          * Somebody could have pinned or re-dirtied the buffer while we were
!          * doing the I/O and making the new hashtable entry.  If so, we
!          * can't recycle this buffer; we must undo everything we've done and
!          * start over with a new victim buffer.
!          */
!         if (buf->refcount == 1 && !(buf->flags & BM_DIRTY))
!             break;

!         UnlockBufHdr_NoHoldoff(buf);
!         BufTableDelete(&newTag);
!         LWLockRelease(BufMappingLock);
!         UnpinBuffer(buf, true, false /* evidently recently used */ );
!     }

!     /*
!      * Okay, it's finally safe to rename the buffer.
!      *
!      * Clearing BM_VALID here is necessary; clearing the dirty bits
!      * is just paranoia.  We also clear BM_RECENTLY_USED since any
!      * recency of use of the old content is no longer relevant.
!      */
!     oldTag = buf->tag;
!     oldFlags = buf->flags;
!     buf->tag = newTag;
!     buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED |
!                     BM_IO_ERROR | BM_RECENTLY_USED);
!     buf->flags |= BM_TAG_VALID;

!     UnlockBufHdr_NoHoldoff(buf);

!     if (oldFlags & BM_TAG_VALID)
!         BufTableDelete(&oldTag);

!     LWLockRelease(BufMappingLock);
!
!     /*
!      * Buffer contents are currently invalid.  Try to get the io_in_progress
!      * lock.  If StartBufferIO returns false, then someone else managed
!      * to read it before we did, so there's nothing left for BufferAlloc()
!      * to do.
!      */
!     if (StartBufferIO(buf, true))
!         *foundPtr = FALSE;
!     else
!         *foundPtr = TRUE;
!
!     return buf;
! }
!
! /*
!  * InvalidateBuffer -- mark a shared buffer invalid and return it to the
!  * freelist.
!  *
!  * The buffer header spinlock must be held at entry.  We drop it before
!  * returning.  (This is sane because the caller must have locked the
!  * buffer in order to be sure it should be dropped.)
!  *
!  * This is used only in contexts such as dropping a relation.  We assume
!  * that no other backend could possibly be interested in using the page,
!  * so the only reason the buffer might be pinned is if someone else is
!  * trying to write it out.  We have to let them finish before we can
!  * reclaim the buffer.
!  *
!  * The buffer could get reclaimed by someone else while we are waiting
!  * to acquire the necessary locks; if so, don't mess it up.
!  */
! static void
! InvalidateBuffer(BufferDesc *buf)
! {
!     BufferTag    oldTag;
!     BufFlags    oldFlags;
!
!     /* Save the original buffer tag before dropping the spinlock */
!     oldTag = buf->tag;

+     UnlockBufHdr(buf);
+
+ retry:
      /*
!      * Acquire exclusive mapping lock in preparation for changing
!      * the buffer's association.
       */
+     LWLockAcquire(BufMappingLock, LW_EXCLUSIVE);
+
+     /* Re-lock the buffer header (NoHoldoff since we have an LWLock) */
+     LockBufHdr_NoHoldoff(buf);
+
+     /* If it's changed while we were waiting for lock, do nothing */
+     if (!BUFFERTAGS_EQUAL(buf->tag, oldTag))
+     {
+         UnlockBufHdr_NoHoldoff(buf);
+         LWLockRelease(BufMappingLock);
+         return;
+     }

      /*
!      * We assume the only reason for it to be pinned is that someone else
!      * is flushing the page out.  Wait for them to finish.  (This could be
!      * an infinite loop if the refcount is messed up... it would be nice
!      * to time out after a while, but there seems no way to be sure how
!      * many loops may be needed.  Note that if the other guy has pinned
!      * the buffer but not yet done StartBufferIO, WaitIO will fall through
!      * and we'll effectively be busy-looping here.)
       */
!     if (buf->refcount != 0)
!     {
!         UnlockBufHdr_NoHoldoff(buf);
!         LWLockRelease(BufMappingLock);
!         WaitIO(buf);
!         goto retry;
!     }

      /*
!      * Clear out the buffer's tag and flags.  We must do this to ensure
!      * that linear scans of the buffer array don't think the buffer is valid.
       */
!     oldFlags = buf->flags;
!     CLEAR_BUFFERTAG(buf->tag);
!     buf->flags = 0;

!     UnlockBufHdr_NoHoldoff(buf);

!     /*
!      * Remove the buffer from the lookup hashtable, if it was in there.
!      */
!     if (oldFlags & BM_TAG_VALID)
!         BufTableDelete(&oldTag);
!
!     /*
!      * Avoid accepting a cancel interrupt when we release the mapping lock;
!      * that would leave the buffer free but not on the freelist.  (Which would
!      * not be fatal, since it'd get picked up again by the clock scanning
!      * code, but we'd rather be sure it gets to the freelist.)
!      */
!     HOLD_INTERRUPTS();
!
!     LWLockRelease(BufMappingLock);
!
!     /*
!      * Insert the buffer at the head of the list of free buffers.
!      */
!     StrategyFreeBuffer(buf, true);
!
!     RESUME_INTERRUPTS();
  }

  /*
***************
*** 494,500 ****
   *                   WriteBuffer and WriteNoReleaseBuffer
   */
  static void
! write_buffer(Buffer buffer, bool release)
  {
      BufferDesc *bufHdr;

--- 589,595 ----
   *                   WriteBuffer and WriteNoReleaseBuffer
   */
  static void
! write_buffer(Buffer buffer, bool unpin)
  {
      BufferDesc *bufHdr;

***************
*** 503,509 ****

      if (BufferIsLocal(buffer))
      {
!         WriteLocalBuffer(buffer, release);
          return;
      }

--- 598,604 ----

      if (BufferIsLocal(buffer))
      {
!         WriteLocalBuffer(buffer, unpin);
          return;
      }

***************
*** 511,517 ****

      Assert(PrivateRefCount[buffer - 1] > 0);

!     LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
      Assert(bufHdr->refcount > 0);

      /*
--- 606,613 ----

      Assert(PrivateRefCount[buffer - 1] > 0);

!     LockBufHdr(bufHdr);
!
      Assert(bufHdr->refcount > 0);

      /*
***************
*** 522,530 ****

      bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);

!     if (release)
!         UnpinBuffer(bufHdr, true);
!     LWLockRelease(BufMgrLock);
  }

  /*
--- 618,627 ----

      bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);

!     UnlockBufHdr(bufHdr);
!
!     if (unpin)
!         UnpinBuffer(bufHdr, true, true);
  }

  /*
***************
*** 555,575 ****

  /*
   * ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer()
-  *        to save a lock release/acquire.
   *
!  * Also, if the passed buffer is valid and already contains the desired block
!  * number, we simply return it without ever acquiring the lock at all.
!  * Since the passed buffer must be pinned, it's OK to examine its block
!  * number without getting the lock first.
   *
   * Note: it is OK to pass buffer == InvalidBuffer, indicating that no old
   * buffer actually needs to be released.  This case is the same as ReadBuffer,
   * but can save some tests in the caller.
-  *
-  * Also note: while it will work to call this routine with blockNum == P_NEW,
-  * it's best to avoid doing so, since that would result in calling
-  * smgrnblocks() while holding the bufmgr lock, hence some loss of
-  * concurrency.
   */
  Buffer
  ReleaseAndReadBuffer(Buffer buffer,
--- 652,667 ----

  /*
   * ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer()
   *
!  * Formerly, this saved one cycle of acquiring/releasing the BufMgrLock
!  * compared to calling the two routines separately.  Now it's mainly just
!  * a convenience function.  However, if the passed buffer is valid and
!  * already contains the desired block, we just return it as-is; and that
!  * does save considerable work compared to a full release and reacquire.
   *
   * Note: it is OK to pass buffer == InvalidBuffer, indicating that no old
   * buffer actually needs to be released.  This case is the same as ReadBuffer,
   * but can save some tests in the caller.
   */
  Buffer
  ReleaseAndReadBuffer(Buffer buffer,
***************
*** 588,822 ****
                  RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node))
                  return buffer;
              ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
-             /* owner now has a free slot, so no need for Enlarge() */
              LocalRefCount[-buffer - 1]--;
          }
          else
          {
              Assert(PrivateRefCount[buffer - 1] > 0);
              bufHdr = &BufferDescriptors[buffer - 1];
              if (bufHdr->tag.blockNum == blockNum &&
                  RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node))
                  return buffer;
!             ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
!             /* owner now has a free slot, so no need for Enlarge() */
!             if (PrivateRefCount[buffer - 1] > 1)
!                 PrivateRefCount[buffer - 1]--;
!             else
!             {
!                 LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
!                 UnpinBuffer(bufHdr, false);
!                 return ReadBufferInternal(relation, blockNum, true);
!             }
          }
      }
-     else
-         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);

!     return ReadBufferInternal(relation, blockNum, false);
  }

  /*
   * PinBuffer -- make buffer unavailable for replacement.
   *
   * This should be applied only to shared buffers, never local ones.
-  * Bufmgr lock must be held by caller.
   *
-  * Most but not all callers want CurrentResourceOwner to be adjusted.
   * Note that ResourceOwnerEnlargeBuffers must have been done already.
   */
  static void
! PinBuffer(BufferDesc *buf, bool fixOwner)
  {
!     int            b = BufferDescriptorGetBuffer(buf) - 1;

      if (PrivateRefCount[b] == 0)
          buf->refcount++;
      PrivateRefCount[b]++;
      Assert(PrivateRefCount[b] > 0);
!     if (fixOwner)
!         ResourceOwnerRememberBuffer(CurrentResourceOwner,
!                                     BufferDescriptorGetBuffer(buf));
  }

  /*
   * UnpinBuffer -- make buffer available for replacement.
   *
   * This should be applied only to shared buffers, never local ones.
-  * Bufmgr lock must be held by caller.
   *
   * Most but not all callers want CurrentResourceOwner to be adjusted.
   */
  static void
! UnpinBuffer(BufferDesc *buf, bool fixOwner)
  {
!     int            b = BufferDescriptorGetBuffer(buf) - 1;

      if (fixOwner)
          ResourceOwnerForgetBuffer(CurrentResourceOwner,
                                    BufferDescriptorGetBuffer(buf));

-     Assert(buf->refcount > 0);
      Assert(PrivateRefCount[b] > 0);
      PrivateRefCount[b]--;
      if (PrivateRefCount[b] == 0)
      {
!         buf->refcount--;
          /* I'd better not still hold any locks on the buffer */
!         Assert(!LWLockHeldByMe(buf->cntx_lock));
          Assert(!LWLockHeldByMe(buf->io_in_progress_lock));
-     }

!     if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 &&
!         buf->refcount == 1)
!     {
!         /* we just released the last pin other than the waiter's */
!         buf->flags &= ~BM_PIN_COUNT_WAITER;
!         ProcSendSignal(buf->wait_backend_id);
      }
!     else
      {
!         /* do nothing */
      }
  }

  /*
!  * BufferSync -- Write out dirty buffers in the pool.
   *
!  * This is called at checkpoint time to write out all dirty shared buffers,
!  * and by the background writer process to write out some of the dirty blocks.
!  * percent/maxpages should be -1 in the former case, and limit values (>= 0)
!  * in the latter.
   *
   * Returns the number of buffers written.
   */
  int
! BufferSync(int percent, int maxpages)
  {
!     BufferDesc **dirty_buffers;
!     BufferTag  *buftags;
!     int            num_buffer_dirty;
!     int            i;

      /* If either limit is zero then we are disabled from doing anything... */
!     if (percent == 0 || maxpages == 0)
          return 0;

!     /*
!      * Get a list of all currently dirty buffers and how many there are.
!      * We do not flush buffers that get dirtied after we started. They
!      * have to wait until the next checkpoint.
!      */
!     dirty_buffers = (BufferDesc **) palloc(NBuffers * sizeof(BufferDesc *));
!     buftags = (BufferTag *) palloc(NBuffers * sizeof(BufferTag));
!
!     LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
!     num_buffer_dirty = StrategyDirtyBufferList(dirty_buffers, buftags,
!                                                NBuffers);

      /*
!      * If called by the background writer, we are usually asked to only
!      * write out some portion of dirty buffers now, to prevent the IO
!      * storm at checkpoint time.
       */
!     if (percent > 0)
!     {
!         Assert(percent <= 100);
!         num_buffer_dirty = (num_buffer_dirty * percent + 99) / 100;
!     }
!     if (maxpages > 0 && num_buffer_dirty > maxpages)
!         num_buffer_dirty = maxpages;

!     /* Make sure we can handle the pin inside the loop */
      ResourceOwnerEnlargeBuffers(CurrentResourceOwner);

      /*
!      * Loop over buffers to be written.  Note the BufMgrLock is held at
!      * loop top, but is released and reacquired within FlushBuffer, so we
!      * aren't holding it long.
       */
!     for (i = 0; i < num_buffer_dirty; i++)
      {
!         BufferDesc *bufHdr = dirty_buffers[i];
!
!         /*
!          * Check it is still the same page and still needs writing.
!          *
!          * We can check bufHdr->cntxDirty here *without* holding any lock on
!          * buffer context as long as we set this flag in access methods
!          * *before* logging changes with XLogInsert(): if someone will set
!          * cntxDirty just after our check we don't worry because of our
!          * checkpoint.redo points before log record for upcoming changes
!          * and so we are not required to write such dirty buffer.
!          */
!         if (!(bufHdr->flags & BM_VALID))
!             continue;
!         if (!BUFFERTAGS_EQUAL(bufHdr->tag, buftags[i]))
!             continue;
!         if (!(bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty))
!             continue;
!
!         /*
!          * IO synchronization. Note that we do it with unpinned buffer to
!          * avoid conflicts with FlushRelationBuffers.
!          */
!         if (bufHdr->flags & BM_IO_IN_PROGRESS)
          {
!             WaitIO(bufHdr);
!             /* Still need writing? */
!             if (!(bufHdr->flags & BM_VALID))
!                 continue;
!             if (!BUFFERTAGS_EQUAL(bufHdr->tag, buftags[i]))
!                 continue;
!             if (!(bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty))
!                 continue;
          }

!         /*
!          * Here: no one doing IO for this buffer and it's dirty. Pin
!          * buffer now and set IO state for it *before* acquiring shlock to
!          * avoid conflicts with FlushRelationBuffers.
!          */
!         PinBuffer(bufHdr, true);
!         StartBufferIO(bufHdr, false);
!
!         FlushBuffer(bufHdr, NULL, false);

!         TerminateBufferIO(bufHdr, 0);
!         UnpinBuffer(bufHdr, true);
!     }

!     LWLockRelease(BufMgrLock);

!     pfree(dirty_buffers);
!     pfree(buftags);

!     return num_buffer_dirty;
  }

  /*
!  * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf' is cleared.
   *
!  * Should be entered with buffer manager lock held; releases it before
!  * waiting and re-acquires it afterwards.
   */
! static void
! WaitIO(BufferDesc *buf)
  {
      /*
!      * Changed to wait until there's no IO - Inoue 01/13/2000
       *
!      * Note this is *necessary* because an error abort in the process doing
!      * I/O could release the io_in_progress_lock prematurely. See
!      * AbortBufferIO.
       */
!     while ((buf->flags & BM_IO_IN_PROGRESS) != 0)
      {
!         LWLockRelease(BufMgrLock);
!         LWLockAcquire(buf->io_in_progress_lock, LW_SHARED);
!         LWLockRelease(buf->io_in_progress_lock);
!         LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
      }
  }


--- 680,994 ----
                  RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node))
                  return buffer;
              ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
              LocalRefCount[-buffer - 1]--;
+             bufHdr->flags |= BM_RECENTLY_USED;
          }
          else
          {
              Assert(PrivateRefCount[buffer - 1] > 0);
              bufHdr = &BufferDescriptors[buffer - 1];
+             /* we have pin, so it's ok to examine tag without spinlock */
              if (bufHdr->tag.blockNum == blockNum &&
                  RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node))
                  return buffer;
!             UnpinBuffer(bufHdr, true, true);
          }
      }

!     return ReadBuffer(relation, blockNum);
  }

  /*
   * PinBuffer -- make buffer unavailable for replacement.
   *
   * This should be applied only to shared buffers, never local ones.
   *
   * Note that ResourceOwnerEnlargeBuffers must have been done already.
+  *
+  * Returns TRUE if buffer is BM_VALID, else FALSE.  This provision allows
+  * some callers to avoid an extra spinlock cycle.
+  */
+ static bool
+ PinBuffer(BufferDesc *buf)
+ {
+     int            b = buf->buf_id;
+     bool        result;
+
+     if (PrivateRefCount[b] == 0)
+     {
+         /*
+          * Use NoHoldoff here because we don't want the unlock to be a
+          * potential place to honor a QueryCancel request.
+          * (The caller should be holding off interrupts anyway.)
+          */
+         LockBufHdr_NoHoldoff(buf);
+         buf->refcount++;
+         result = (buf->flags & BM_VALID) != 0;
+         UnlockBufHdr_NoHoldoff(buf);
+     }
+     else
+     {
+         /* If we previously pinned the buffer, it must surely be valid */
+         result = true;
+     }
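+     /* The private pin count is backend-local, so no lock is needed here */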
+     PrivateRefCount[b]++;
+     Assert(PrivateRefCount[b] > 0);
+     ResourceOwnerRememberBuffer(CurrentResourceOwner,
+                                 BufferDescriptorGetBuffer(buf));
+     return result;
+ }
+
+ /*
+  * PinBuffer_Locked -- as above, but caller already locked the buffer header.
+  * The spinlock is released before return.
+  *
+  * Note: use of this routine is frequently mandatory, not just an optimization
+  * to save a spin lock/unlock cycle, because we need to pin a buffer before
+  * its state can change under us.
   */
  static void
! PinBuffer_Locked(BufferDesc *buf)
  {
!     int            b = buf->buf_id;

      if (PrivateRefCount[b] == 0)
          buf->refcount++;
+     /* NoHoldoff since we mustn't accept cancel interrupt here */
+     UnlockBufHdr_NoHoldoff(buf);
      PrivateRefCount[b]++;
      Assert(PrivateRefCount[b] > 0);
!     ResourceOwnerRememberBuffer(CurrentResourceOwner,
!                                 BufferDescriptorGetBuffer(buf));
!     /* Now we can accept cancel */
!     RESUME_INTERRUPTS();
  }

  /*
   * UnpinBuffer -- make buffer available for replacement.
   *
   * This should be applied only to shared buffers, never local ones.
   *
   * Most but not all callers want CurrentResourceOwner to be adjusted.
+  *
+  * If we are releasing a buffer during VACUUM, and it's not been otherwise
+  * used recently, and trashOK is true, send the buffer to the freelist.
   */
  static void
! UnpinBuffer(BufferDesc *buf, bool fixOwner, bool trashOK)
  {
!     int            b = buf->buf_id;

      if (fixOwner)
          ResourceOwnerForgetBuffer(CurrentResourceOwner,
                                    BufferDescriptorGetBuffer(buf));

      Assert(PrivateRefCount[b] > 0);
      PrivateRefCount[b]--;
      if (PrivateRefCount[b] == 0)
      {
!         bool    trash_buffer = false;
!
          /* I'd better not still hold any locks on the buffer */
!         Assert(!LWLockHeldByMe(buf->content_lock));
          Assert(!LWLockHeldByMe(buf->io_in_progress_lock));

!         /* NoHoldoff ensures we don't lose control before sending signal */
!         LockBufHdr_NoHoldoff(buf);
!
!         /* Decrement the shared reference count */
!         Assert(buf->refcount > 0);
!         buf->refcount--;
!
!         /* Mark the buffer recently used, unless we are in VACUUM */
!         if (!strategy_hint_vacuum)
!             buf->flags |= BM_RECENTLY_USED;
!         else if (trashOK &&
!                  buf->refcount == 0 &&
!                  !(buf->flags & BM_RECENTLY_USED))
!             trash_buffer = true;
!
!         if ((buf->flags & BM_PIN_COUNT_WAITER) &&
!             buf->refcount == 1)
!         {
!             /* we just released the last pin other than the waiter's */
!             BackendId    wait_backend_id = buf->wait_backend_id;
!
!             buf->flags &= ~BM_PIN_COUNT_WAITER;
!             UnlockBufHdr_NoHoldoff(buf);
!             ProcSendSignal(wait_backend_id);
!         }
!         else
!             UnlockBufHdr_NoHoldoff(buf);
!
!         /*
!          * If VACUUM is releasing an otherwise-unused buffer, send it to
!          * the freelist for near-term reuse.  We put it at the tail so that
!          * it won't be used before any invalid buffers that may exist.
!          */
!         if (trash_buffer)
!             StrategyFreeBuffer(buf, false);
      }
! }
!
! /*
!  * BufferSync -- Write out all dirty buffers in the pool.
!  *
!  * This is called at checkpoint time to write out all dirty shared buffers.
!  */
! void
! BufferSync(void)
! {
!     int            buf_id;
!     int            num_to_scan;
!
!     /*
!      * Find out where to start the circular scan.
!      */
!     buf_id = StrategySyncStart();
!
!     /* Make sure we can handle the pin inside SyncOneBuffer */
!     ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
!
!     /*
!      * Loop over all buffers.
!      */
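!     /* Pass skip_pinned = false: all dirty buffers must be written here */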
!     num_to_scan = NBuffers;
!     while (num_to_scan-- > 0)
      {
!         (void) SyncOneBuffer(buf_id, false);
!         if (++buf_id >= NBuffers)
!             buf_id = 0;
      }
  }

  /*
!  * BgBufferSync -- Write out some dirty buffers in the pool.
   *
!  * This is called periodically by the background writer process.
   *
   * Returns the number of buffers written.
   */
  int
! BgBufferSync(void)
  {
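!     /* Static, so the all-buffers scan resumes where the last call left off */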
!     static int    buf_id1 = 0;
!     int            percent = BgWriterPercent;
!     int            maxpages = BgWriterMaxPages;
!     int            buf_id2;
!     int            num_to_scan;
!     int            num_written;

      /* If either limit is zero then we are disabled from doing anything... */
!     if (percent <= 0 || maxpages <= 0)
          return 0;

!     Assert(percent <= 100);
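!     /*
!      * The division rounds up, so any nonzero percent scans at least one
!      * buffer: e.g. NBuffers = 10000 with BgWriterPercent = 1 gives
!      * num_to_scan = 100.
!      */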
!     num_to_scan = (NBuffers * percent + 99) / 100;

      /*
!      * To minimize work at checkpoint time, we want to try to keep all the
!      * buffers clean; this motivates a scan that proceeds sequentially through
!      * all buffers.  But we are also charged with ensuring that buffers that
!      * will be recycled soon are clean when needed; these buffers are the
!      * ones just ahead of the StrategySyncStart point.  We make a separate
!      * scan through those.
!      *
!      * Currently we divide our attention equally between these tasks, but
!      * probably it would be better to have two sets of control parameters.
       */
!     buf_id2 = StrategySyncStart();

!     /* Make sure we can handle the pin inside SyncOneBuffer */
      ResourceOwnerEnlargeBuffers(CurrentResourceOwner);

      /*
!      * Loop over buffers to be written.
       */
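!     /*
!      * Note that num_to_scan is decremented twice per iteration (once in
!      * the while test, once mid-loop), so the scan budget is shared
!      * between the two scan positions.
!      */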
!     num_written = 0;
!     while (num_to_scan-- > 0)
      {
!         /* Loop over all buffers, including pinned ones */
!         if (SyncOneBuffer(buf_id1, false))
          {
!             num_written++;
!             if (num_written >= maxpages)
!                 break;
          }

!         if (++buf_id1 >= NBuffers)
!             buf_id1 = 0;

!         if (num_to_scan-- == 0)
!             break;

!         /* Loop over soon-to-be-reused buffers, skipping pinned ones */
!         if (SyncOneBuffer(buf_id2, true))
!         {
!             num_written++;
!             if (num_written >= maxpages)
!                 break;
!         }

!         if (++buf_id2 >= NBuffers)
!             buf_id2 = 0;
!     }

!     return num_written;
  }

  /*
!  * SyncOneBuffer -- process a single buffer during syncing.
!  *
!  * If skip_pinned is true, we don't write currently-pinned buffers, nor
!  * buffers marked RECENTLY_USED, as these are not replacement candidates.
!  *
!  * Returns true if buffer was written, else false.  (This could be in error
!  * if FlushBuffer finds the buffer clean after locking it, but we don't
!  * care all that much.)
   *
!  * Note: caller must have done ResourceOwnerEnlargeBuffers.
   */
! static bool
! SyncOneBuffer(int buf_id, bool skip_pinned)
  {
+     BufferDesc *bufHdr = &BufferDescriptors[buf_id];
+
      /*
!      * Check whether buffer needs writing.
       *
!      * We can make this check without taking the buffer content lock
!      * so long as we mark pages dirty in access methods *before* logging
!      * changes with XLogInsert(): if someone marks the buffer dirty
!      * just after our check, we need not worry, because our checkpoint.redo
!      * point precedes the log record for the upcoming changes, so we are
!      * not required to write such a dirty buffer.
       */
!     LockBufHdr(bufHdr);
!     if (!(bufHdr->flags & BM_VALID) || !(bufHdr->flags & BM_DIRTY))
      {
!         UnlockBufHdr(bufHdr);
!         return false;
!     }
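!     /*
!      * Pinned or recently-used buffers are not replacement candidates,
!      * so the bgwriter's recycling scan need not write them now.
!      */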
!     if (skip_pinned &&
!         (bufHdr->refcount != 0 || (bufHdr->flags & BM_RECENTLY_USED)))
!     {
!         UnlockBufHdr(bufHdr);
!         return false;
      }
+
+     /*
+      * Pin it, share-lock it, write it.  (FlushBuffer will do nothing
+      * if the buffer is clean by the time we've locked it.)
+      */
+     PinBuffer_Locked(bufHdr);
+     LWLockAcquire(bufHdr->content_lock, LW_SHARED);
+
+     FlushBuffer(bufHdr, NULL);
+
+     LWLockRelease(bufHdr->content_lock);
+     UnpinBuffer(bufHdr, true, false /* don't change freelist */ );
+
+     return true;
  }


***************
*** 888,893 ****
--- 1060,1068 ----

      AtEOXact_LocalBuffers(isCommit);
  #endif
+
+     /* Make sure we reset the strategy hint in case VACUUM errored out */
+     StrategyHintVacuum(false);
  }

  /*
***************
*** 912,920 ****
               * here, it suggests that ResourceOwners are messed up.
               */
              PrivateRefCount[i] = 1;        /* make sure we release shared pin */
!             LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
!             UnpinBuffer(buf, false);
!             LWLockRelease(BufMgrLock);
              Assert(PrivateRefCount[i] == 0);
          }
      }
--- 1087,1093 ----
               * here, it suggests that ResourceOwners are messed up.
               */
              PrivateRefCount[i] = 1;        /* make sure we release shared pin */
!             UnpinBuffer(buf, false, false /* don't change freelist */ );
              Assert(PrivateRefCount[i] == 0);
          }
      }
***************
*** 941,946 ****
--- 1114,1120 ----
          loccount = PrivateRefCount[buffer - 1];
      }

+     /* theoretically we should lock the bufhdr here */
      elog(WARNING,
           "buffer refcount leak: [%03d] "
           "(rel=%u/%u/%u, blockNum=%u, flags=0x%x, refcount=%u %d)",
***************
*** 961,967 ****
  void
  FlushBufferPool(void)
  {
!     BufferSync(-1, -1);
      smgrsync();
  }

--- 1135,1141 ----
  void
  FlushBufferPool(void)
  {
!     BufferSync();
      smgrsync();
  }

***************
*** 988,999 ****
  BlockNumber
  BufferGetBlockNumber(Buffer buffer)
  {
      Assert(BufferIsPinned(buffer));

      if (BufferIsLocal(buffer))
!         return LocalBufferDescriptors[-buffer - 1].tag.blockNum;
      else
!         return BufferDescriptors[buffer - 1].tag.blockNum;
  }

  /*
--- 1162,1178 ----
  BlockNumber
  BufferGetBlockNumber(Buffer buffer)
  {
+     BufferDesc *bufHdr;
+
      Assert(BufferIsPinned(buffer));

      if (BufferIsLocal(buffer))
!         bufHdr = &(LocalBufferDescriptors[-buffer - 1]);
      else
!         bufHdr = &BufferDescriptors[buffer - 1];
!
!     /* pinned, so OK to read tag without spinlock */
!     return bufHdr->tag.blockNum;
  }

  /*
***************
*** 1013,1019 ****
      else
          bufHdr = &BufferDescriptors[buffer - 1];

!     return (bufHdr->tag.rnode);
  }

  /*
--- 1192,1198 ----
      else
          bufHdr = &BufferDescriptors[buffer - 1];

!     return bufHdr->tag.rnode;
  }

  /*
***************
*** 1026,1066 ****
   * However, we will need to force the changes to disk via fsync before
   * we can checkpoint WAL.
   *
!  * BufMgrLock must be held at entry, and the buffer must be pinned.  The
!  * caller is also responsible for doing StartBufferIO/TerminateBufferIO.
   *
   * If the caller has an smgr reference for the buffer's relation, pass it
!  * as the second parameter.  If not, pass NULL.  (Do not open relation
!  * while holding BufMgrLock!)
!  *
!  * When earlylock is TRUE, we grab the per-buffer sharelock before releasing
!  * BufMgrLock, rather than after.  Normally this would be a bad idea since
!  * we might deadlock, but it is safe and necessary when called from
!  * BufferAlloc() --- see comments therein.
   */
  static void
! FlushBuffer(BufferDesc *buf, SMgrRelation reln, bool earlylock)
  {
-     Buffer        buffer = BufferDescriptorGetBuffer(buf);
      XLogRecPtr    recptr;
      ErrorContextCallback errcontext;

-     /* Transpose cntxDirty into flags while holding BufMgrLock */
-     buf->cntxDirty = false;
-     buf->flags |= BM_DIRTY;
-
-     /* To check if block content changed while flushing. - vadim 01/17/97 */
-     buf->flags &= ~BM_JUST_DIRTIED;
-
      /*
!      * If earlylock, grab buffer sharelock before anyone else could re-lock
!      * the buffer.
       */
!     if (earlylock)
!         LockBuffer(buffer, BUFFER_LOCK_SHARE);
!
!     /* Release BufMgrLock while doing xlog work */
!     LWLockRelease(BufMgrLock);

      /* Setup error traceback support for ereport() */
      errcontext.callback = buffer_write_error_callback;
--- 1205,1232 ----
   * However, we will need to force the changes to disk via fsync before
   * we can checkpoint WAL.
   *
!  * The caller must hold a pin on the buffer and have share-locked the
!  * buffer contents.  (Note: a share-lock does not prevent updates of
!  * hint bits in the buffer, so the page could change while the write
!  * is in progress, but we assume that that will not invalidate the data
!  * written.)
   *
   * If the caller has an smgr reference for the buffer's relation, pass it
!  * as the second parameter.  If not, pass NULL.
   */
  static void
! FlushBuffer(BufferDesc *buf, SMgrRelation reln)
  {
      XLogRecPtr    recptr;
      ErrorContextCallback errcontext;

      /*
!      * Acquire the buffer's io_in_progress lock.  If StartBufferIO returns
!      * false, then someone else flushed the buffer before we could, so
!      * we need not do anything.
       */
!     if (!StartBufferIO(buf, false))
!         return;

      /* Setup error traceback support for ereport() */
      errcontext.callback = buffer_write_error_callback;
***************
*** 1068,1087 ****
      errcontext.previous = error_context_stack;
      error_context_stack = &errcontext;

!     /* Find smgr relation for buffer while holding minimal locks */
      if (reln == NULL)
          reln = smgropen(buf->tag.rnode);

      /*
!      * Protect buffer content against concurrent update.  (Note that
!      * hint-bit updates can still occur while the write is in progress,
!      * but we assume that that will not invalidate the data written.)
!      */
!     if (!earlylock)
!         LockBuffer(buffer, BUFFER_LOCK_SHARE);
!
!     /*
!      * Force XLOG flush for buffer' LSN.  This implements the basic WAL
       * rule that log updates must hit disk before any of the data-file
       * changes they describe do.
       */
--- 1234,1245 ----
      errcontext.previous = error_context_stack;
      error_context_stack = &errcontext;

!     /* Find smgr relation for buffer */
      if (reln == NULL)
          reln = smgropen(buf->tag.rnode);

      /*
!      * Force XLOG flush up to buffer's LSN.  This implements the basic WAL
       * rule that log updates must hit disk before any of the data-file
       * changes they describe do.
       */
***************
*** 1090,1124 ****

      /*
       * Now it's safe to write buffer to disk. Note that no one else should
!      * have been able to write it while we were busy with locking and log
!      * flushing because caller has set the IO flag.
!      *
!      * It would be better to clear BM_JUST_DIRTIED right here, but we'd have
!      * to reacquire the BufMgrLock and it doesn't seem worth it.
       */
      smgrwrite(reln,
                buf->tag.blockNum,
!               (char *) MAKE_PTR(buf->data),
                false);

-     /* Pop the error context stack */
-     error_context_stack = errcontext.previous;
-
-     /*
-      * Release the per-buffer readlock, reacquire BufMgrLock.
-      */
-     LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-
-     LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
-
      BufferFlushCount++;

      /*
!      * If this buffer was marked by someone as DIRTY while we were
!      * flushing it out we must not clear DIRTY flag - vadim 01/17/97
       */
!     if (!(buf->flags & BM_JUST_DIRTIED))
!         buf->flags &= ~BM_DIRTY;
  }

  /*
--- 1248,1277 ----

      /*
       * Now it's safe to write buffer to disk. Note that no one else should
!      * have been able to write it while we were busy with log flushing
!      * because we have the io_in_progress lock.
       */
+
+     /* To check if block content changes while flushing. - vadim 01/17/97 */
+     LockBufHdr_NoHoldoff(buf);
+     buf->flags &= ~BM_JUST_DIRTIED;
+     UnlockBufHdr_NoHoldoff(buf);
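+
+     /*
+      * If the page is re-dirtied while the write is in progress,
+      * BM_JUST_DIRTIED will be set again, and TerminateBufferIO will
+      * then leave BM_DIRTY set rather than marking the buffer clean.
+      */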
+
      smgrwrite(reln,
                buf->tag.blockNum,
!               (char *) BufHdrGetBlock(buf),
                false);

      BufferFlushCount++;

      /*
!      * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set)
!      * and end the io_in_progress state.
       */
!     TerminateBufferIO(buf, true, 0);
!
!     /* Pop the error context stack */
!     error_context_stack = errcontext.previous;
  }

  /*
***************
*** 1210,1271 ****
                           bufHdr->tag.rnode.dbNode,
                           bufHdr->tag.rnode.relNode,
                           LocalRefCount[i]);
!                 bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
!                 bufHdr->cntxDirty = false;
!                 bufHdr->tag.rnode.relNode = InvalidOid;
              }
          }
          return;
      }

!     LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
!
!     for (i = 1; i <= NBuffers; i++)
      {
!         bufHdr = &BufferDescriptors[i - 1];
! recheck:
          if (RelFileNodeEquals(bufHdr->tag.rnode, rnode) &&
              bufHdr->tag.blockNum >= firstDelBlock)
!         {
!             /*
!              * If there is I/O in progress, better wait till it's done;
!              * don't want to delete the relation out from under someone
!              * who's just trying to flush the buffer!
!              */
!             if (bufHdr->flags & BM_IO_IN_PROGRESS)
!             {
!                 WaitIO(bufHdr);
!
!                 /*
!                  * By now, the buffer very possibly belongs to some other
!                  * rel, so check again before proceeding.
!                  */
!                 goto recheck;
!             }
!
!             /*
!              * There should be no pin on the buffer.
!              */
!             if (bufHdr->refcount != 0)
!                 elog(ERROR, "block %u of %u/%u/%u is still referenced (private %d, global %u)",
!                      bufHdr->tag.blockNum,
!                      bufHdr->tag.rnode.spcNode,
!                      bufHdr->tag.rnode.dbNode,
!                      bufHdr->tag.rnode.relNode,
!                      PrivateRefCount[i - 1], bufHdr->refcount);
!
!             /* Now we can do what we came for */
!             bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
!             bufHdr->cntxDirty = false;
!
!             /*
!              * And mark the buffer as no longer occupied by this rel.
!              */
!             StrategyInvalidateBuffer(bufHdr);
!         }
      }
-
-     LWLockRelease(BufMgrLock);
  }

  /* ---------------------------------------------------------------------
--- 1363,1385 ----
                           bufHdr->tag.rnode.dbNode,
                           bufHdr->tag.rnode.relNode,
                           LocalRefCount[i]);
!                 CLEAR_BUFFERTAG(bufHdr->tag);
!                 bufHdr->flags = 0;
              }
          }
          return;
      }

!     for (i = 0; i < NBuffers; i++)
      {
!         bufHdr = &BufferDescriptors[i];
!         LockBufHdr(bufHdr);
          if (RelFileNodeEquals(bufHdr->tag.rnode, rnode) &&
              bufHdr->tag.blockNum >= firstDelBlock)
!             InvalidateBuffer(bufHdr);        /* releases spinlock */
!         else
!             UnlockBufHdr(bufHdr);
      }
  }

  /* ---------------------------------------------------------------------
***************
*** 1285,1331 ****
      int            i;
      BufferDesc *bufHdr;

!     LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);

!     for (i = 1; i <= NBuffers; i++)
      {
!         bufHdr = &BufferDescriptors[i - 1];
! recheck:
          if (bufHdr->tag.rnode.dbNode == dbid)
!         {
!             /*
!              * If there is I/O in progress, better wait till it's done;
!              * don't want to delete the database out from under someone
!              * who's just trying to flush the buffer!
!              */
!             if (bufHdr->flags & BM_IO_IN_PROGRESS)
!             {
!                 WaitIO(bufHdr);
!
!                 /*
!                  * By now, the buffer very possibly belongs to some other
!                  * DB, so check again before proceeding.
!                  */
!                 goto recheck;
!             }
!             /* Now we can do what we came for */
!             bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
!             bufHdr->cntxDirty = false;
!
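!     /* trashOK is true: VACUUM may send the unpinned buffer to the freelist */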
!             /*
!              * The thing should be free, if caller has checked that no
!              * backends are running in that database.
!              */
!             Assert(bufHdr->refcount == 0);
!
!             /*
!              * And mark the buffer as no longer occupied by this page.
!              */
!             StrategyInvalidateBuffer(bufHdr);
!         }
      }
-
-     LWLockRelease(BufMgrLock);
  }

  /* -----------------------------------------------------------------
--- 1399,1418 ----
      int            i;
      BufferDesc *bufHdr;

!     /*
!      * We needn't consider local buffers, since by assumption the target
!      * database isn't our own.
!      */

!     for (i = 0; i < NBuffers; i++)
      {
!         bufHdr = &BufferDescriptors[i];
!         LockBufHdr(bufHdr);
          if (bufHdr->tag.rnode.dbNode == dbid)
!             InvalidateBuffer(bufHdr);        /* releases spinlock */
!         else
!             UnlockBufHdr(bufHdr);
      }
  }

  /* -----------------------------------------------------------------
***************
*** 1342,1373 ****
      int            i;
      BufferDesc *buf = BufferDescriptors;

!     if (IsUnderPostmaster)
!     {
!         LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
!         for (i = 0; i < NBuffers; ++i, ++buf)
!         {
!             elog(LOG,
!                  "[%02d] (freeNext=%d, freePrev=%d, rel=%u/%u/%u, "
!                  "blockNum=%u, flags=0x%x, refcount=%u %d)",
!                  i, buf->freeNext, buf->freePrev,
!                  buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
!                  buf->tag.rnode.relNode,
!                  buf->tag.blockNum, buf->flags,
!                  buf->refcount, PrivateRefCount[i]);
!         }
!         LWLockRelease(BufMgrLock);
!     }
!     else
      {
!         /* interactive backend */
!         for (i = 0; i < NBuffers; ++i, ++buf)
!         {
!             printf("[%-2d] (%u/%u/%u, %u) flags=0x%x, refcount=%u %d)\n",
!                    i, buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
!                    buf->tag.rnode.relNode, buf->tag.blockNum,
!                    buf->flags, buf->refcount, PrivateRefCount[i]);
!         }
      }
  }
  #endif
--- 1429,1445 ----
      int            i;
      BufferDesc *buf = BufferDescriptors;

!     for (i = 0; i < NBuffers; ++i, ++buf)
      {
!         /* theoretically we should lock the bufhdr here */
!         elog(LOG,
!              "[%02d] (freeNext=%d, rel=%u/%u/%u, "
!              "blockNum=%u, flags=0x%x, refcount=%u %d)",
!              i, buf->freeNext,
!              buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
!              buf->tag.rnode.relNode,
!              buf->tag.blockNum, buf->flags,
!              buf->refcount, PrivateRefCount[i]);
      }
  }
  #endif
***************
*** 1379,1398 ****
      int            i;
      BufferDesc *buf = BufferDescriptors;

-     LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
      for (i = 0; i < NBuffers; ++i, ++buf)
      {
          if (PrivateRefCount[i] > 0)
!             elog(NOTICE,
!                  "[%02d] (freeNext=%d, freePrev=%d, rel=%u/%u/%u, "
                   "blockNum=%u, flags=0x%x, refcount=%u %d)",
!                  i, buf->freeNext, buf->freePrev,
                   buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
                   buf->tag.rnode.relNode,
                   buf->tag.blockNum, buf->flags,
                   buf->refcount, PrivateRefCount[i]);
      }
-     LWLockRelease(BufMgrLock);
  }
  #endif

--- 1451,1471 ----
      int            i;
      BufferDesc *buf = BufferDescriptors;

      for (i = 0; i < NBuffers; ++i, ++buf)
      {
          if (PrivateRefCount[i] > 0)
!         {
!             /* theoretically we should lock the bufhdr here */
!             elog(LOG,
!                  "[%02d] (freeNext=%d, rel=%u/%u/%u, "
                   "blockNum=%u, flags=0x%x, refcount=%u %d)",
!                  i, buf->freeNext,
                   buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
                   buf->tag.rnode.relNode,
                   buf->tag.blockNum, buf->flags,
                   buf->refcount, PrivateRefCount[i]);
+         }
      }
  }
  #endif

***************
*** 1451,1458 ****
              bufHdr = &LocalBufferDescriptors[i];
              if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
              {
!                 if ((bufHdr->flags & BM_VALID) &&
!                     (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty))
                  {
                      ErrorContextCallback errcontext;

--- 1524,1530 ----
              bufHdr = &LocalBufferDescriptors[i];
              if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
              {
!                 if ((bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
                  {
                      ErrorContextCallback errcontext;

***************
*** 1464,1474 ****

                      smgrwrite(rel->rd_smgr,
                                bufHdr->tag.blockNum,
!                               (char *) MAKE_PTR(bufHdr->data),
                                true);

                      bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
-                     bufHdr->cntxDirty = false;

                      /* Pop the error context stack */
                      error_context_stack = errcontext.previous;
--- 1536,1545 ----

                      smgrwrite(rel->rd_smgr,
                                bufHdr->tag.blockNum,
!                               (char *) LocalBufHdrGetBlock(bufHdr),
                                true);

                      bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);

                      /* Pop the error context stack */
                      error_context_stack = errcontext.previous;
***************
*** 1478,1484 ****
                           RelationGetRelationName(rel), firstDelBlock,
                           bufHdr->tag.blockNum, LocalRefCount[i]);
                  if (bufHdr->tag.blockNum >= firstDelBlock)
!                     bufHdr->tag.rnode.relNode = InvalidOid;
              }
          }

--- 1549,1558 ----
                           RelationGetRelationName(rel), firstDelBlock,
                           bufHdr->tag.blockNum, LocalRefCount[i]);
                  if (bufHdr->tag.blockNum >= firstDelBlock)
!                 {
!                     CLEAR_BUFFERTAG(bufHdr->tag);
!                     bufHdr->flags = 0;
!                 }
              }
          }

***************
*** 1488,1533 ****
      /* Make sure we can handle the pin inside the loop */
      ResourceOwnerEnlargeBuffers(CurrentResourceOwner);

-     LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
-
      for (i = 0; i < NBuffers; i++)
      {
          bufHdr = &BufferDescriptors[i];
          if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
          {
!             if ((bufHdr->flags & BM_VALID) &&
!                 (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty))
              {
!                 PinBuffer(bufHdr, true);
!                 /* Someone else might be flushing buffer */
!                 if (bufHdr->flags & BM_IO_IN_PROGRESS)
!                     WaitIO(bufHdr);
!                 /* Still dirty? */
!                 if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)
!                 {
!                     StartBufferIO(bufHdr, false);
!
!                     FlushBuffer(bufHdr, rel->rd_smgr, false);
!
!                     TerminateBufferIO(bufHdr, 0);
!                 }
!                 UnpinBuffer(bufHdr, true);
!                 if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)
!                     elog(ERROR, "FlushRelationBuffers(\"%s\", %u): block %u was re-dirtied",
!                          RelationGetRelationName(rel), firstDelBlock,
!                          bufHdr->tag.blockNum);
              }
!             if (bufHdr->refcount != 0)
!                 elog(ERROR, "FlushRelationBuffers(\"%s\", %u): block %u is referenced (private %d, global %u)",
!                      RelationGetRelationName(rel), firstDelBlock,
!                      bufHdr->tag.blockNum,
!                      PrivateRefCount[i], bufHdr->refcount);
              if (bufHdr->tag.blockNum >= firstDelBlock)
!                 StrategyInvalidateBuffer(bufHdr);
          }
      }
-
-     LWLockRelease(BufMgrLock);
  }

  /*
--- 1562,1601 ----
      /* Make sure we can handle the pin inside the loop */
      ResourceOwnerEnlargeBuffers(CurrentResourceOwner);

      for (i = 0; i < NBuffers; i++)
      {
          bufHdr = &BufferDescriptors[i];
+     recheck:
+         LockBufHdr(bufHdr);
          if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
          {
!             if ((bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
              {
!                 PinBuffer_Locked(bufHdr);
!                 LWLockAcquire(bufHdr->content_lock, LW_SHARED);
!                 FlushBuffer(bufHdr, rel->rd_smgr);
!                 LWLockRelease(bufHdr->content_lock);
!                 UnpinBuffer(bufHdr, true, false /* no freelist change */ );
!                 /*
!                  * As soon as we unpin, it's possible for someone to take
!                  * the buffer away from us; so loop back to re-lock and
!                  * re-check if it still belongs to the target relation.
!                  */
!                 goto recheck;
              }
!             /*
!              * Even though it's not dirty, it could still be pinned because
!              * TerminateBufferIO and UnpinBuffer are separate actions.  Hence,
!              * we can't error out on nonzero reference count here.
!              */
              if (bufHdr->tag.blockNum >= firstDelBlock)
!                 InvalidateBuffer(bufHdr);        /* releases spinlock */
!             else
!                 UnlockBufHdr(bufHdr);
          }
+         else
+             UnlockBufHdr(bufHdr);
      }
  }

  /*
***************
*** 1547,1553 ****
--- 1615,1623 ----
      if (BufferIsLocal(buffer))
      {
          Assert(LocalRefCount[-buffer - 1] > 0);
+         bufHdr = &LocalBufferDescriptors[-buffer - 1];
          LocalRefCount[-buffer - 1]--;
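+         /* no spinlock needed: local buffers are private to this backend */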
+         bufHdr->flags |= BM_RECENTLY_USED;
          return;
      }

***************
*** 1558,1568 ****
      if (PrivateRefCount[buffer - 1] > 1)
          PrivateRefCount[buffer - 1]--;
      else
!     {
!         LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
!         UnpinBuffer(bufHdr, false);
!         LWLockRelease(BufMgrLock);
!     }
  }

  /*
--- 1628,1634 ----
      if (PrivateRefCount[buffer - 1] > 1)
          PrivateRefCount[buffer - 1]--;
      else
!         UnpinBuffer(bufHdr, false, true);
  }

  /*
***************
*** 1585,1672 ****
          PrivateRefCount[buffer - 1]++;
  }

- #ifdef NOT_USED
- void
- IncrBufferRefCount_Debug(char *file, int line, Buffer buffer)
- {
-     IncrBufferRefCount(buffer);
-     if (ShowPinTrace && !BufferIsLocal(buffer) && is_userbuffer(buffer))
-     {
-         BufferDesc *buf = &BufferDescriptors[buffer - 1];
-
-         fprintf(stderr,
-                 "PIN(Incr) %d rel = %u/%u/%u, blockNum = %u, "
-                 "refcount = %d, file: %s, line: %d\n",
-                 buffer,
-                 buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
-                 buf->tag.rnode.relNode, buf->tag.blockNum,
-                 PrivateRefCount[buffer - 1], file, line);
-     }
- }
- #endif
-
- #ifdef NOT_USED
- void
- ReleaseBuffer_Debug(char *file, int line, Buffer buffer)
- {
-     ReleaseBuffer(buffer);
-     if (ShowPinTrace && !BufferIsLocal(buffer) && is_userbuffer(buffer))
-     {
-         BufferDesc *buf = &BufferDescriptors[buffer - 1];
-
-         fprintf(stderr,
-                 "UNPIN(Rel) %d rel = %u/%u/%u, blockNum = %u, "
-                 "refcount = %d, file: %s, line: %d\n",
-                 buffer,
-                 buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
-                 buf->tag.rnode.relNode, buf->tag.blockNum,
-                 PrivateRefCount[buffer - 1], file, line);
-     }
- }
- #endif
-
- #ifdef NOT_USED
- Buffer
- ReleaseAndReadBuffer_Debug(char *file,
-                            int line,
-                            Buffer buffer,
-                            Relation relation,
-                            BlockNumber blockNum)
- {
-     bool        bufferValid;
-     Buffer        b;
-
-     bufferValid = BufferIsValid(buffer);
-     b = ReleaseAndReadBuffer(buffer, relation, blockNum);
-     if (ShowPinTrace && bufferValid && BufferIsLocal(buffer)
-         && is_userbuffer(buffer))
-     {
-         BufferDesc *buf = &BufferDescriptors[buffer - 1];
-
-         fprintf(stderr,
-                 "UNPIN(Rel&Rd) %d rel = %u/%u/%u, blockNum = %u, "
-                 "refcount = %d, file: %s, line: %d\n",
-                 buffer,
-                 buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
-                 buf->tag.rnode.relNode, buf->tag.blockNum,
-                 PrivateRefCount[buffer - 1], file, line);
-     }
-     if (ShowPinTrace && BufferIsLocal(buffer) && is_userbuffer(buffer))
-     {
-         BufferDesc *buf = &BufferDescriptors[b - 1];
-
-         fprintf(stderr,
-                 "PIN(Rel&Rd) %d rel = %u/%u/%u, blockNum = %u, "
-                 "refcount = %d, file: %s, line: %d\n",
-                 b,
-                 buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
-                 buf->tag.rnode.relNode, buf->tag.blockNum,
-                 PrivateRefCount[b - 1], file, line);
-     }
-     return b;
- }
- #endif
-
  /*
   * SetBufferCommitInfoNeedsSave
   *
--- 1651,1656 ----
***************
*** 1682,1688 ****
   * This routine might get called many times on the same page, if we are making
   * the first scan after commit of an xact that added/deleted many tuples.
   * So, be as quick as we can if the buffer is already dirty.  We do this by
!  * not acquiring BufMgrLock if it looks like the status bits are already OK.
   * (Note it is okay if someone else clears BM_JUST_DIRTIED immediately after
   * we look, because the buffer content update is already done and will be
   * reflected in the I/O.)
--- 1666,1672 ----
   * This routine might get called many times on the same page, if we are making
   * the first scan after commit of an xact that added/deleted many tuples.
   * So, be as quick as we can if the buffer is already dirty.  We do this by
!  * not acquiring spinlock if it looks like the status bits are already OK.
   * (Note it is okay if someone else clears BM_JUST_DIRTIED immediately after
   * we look, because the buffer content update is already done and will be
   * reflected in the I/O.)
***************
*** 1703,1725 ****

      bufHdr = &BufferDescriptors[buffer - 1];

      if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
          (BM_DIRTY | BM_JUST_DIRTIED))
      {
!         LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
          Assert(bufHdr->refcount > 0);
          bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
!         LWLockRelease(BufMgrLock);
      }
  }

  /*
!  * Release buffer context locks for shared buffers.
   *
   * Used to clean up after errors.
   *
   * Currently, we can expect that lwlock.c's LWLockReleaseAll() took care
!  * of releasing buffer context locks per se; the only thing we need to deal
   * with here is clearing any PIN_COUNT request that was in progress.
   */
  void
--- 1687,1711 ----

      bufHdr = &BufferDescriptors[buffer - 1];

+     Assert(PrivateRefCount[buffer - 1] > 0);
+
      if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
          (BM_DIRTY | BM_JUST_DIRTIED))
      {
!         LockBufHdr(bufHdr);
          Assert(bufHdr->refcount > 0);
          bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
!         UnlockBufHdr(bufHdr);
      }
  }

  /*
!  * Release buffer content locks for shared buffers.
   *
   * Used to clean up after errors.
   *
   * Currently, we can expect that lwlock.c's LWLockReleaseAll() took care
!  * of releasing buffer content locks per se; the only thing we need to deal
   * with here is clearing any PIN_COUNT request that was in progress.
   */
  void
***************
*** 1731,1737 ****
      {
          HOLD_INTERRUPTS();        /* don't want to die() partway through... */

!         LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);

          /*
           * Don't complain if flag bit not set; it could have been
--- 1717,1723 ----
      {
          HOLD_INTERRUPTS();        /* don't want to die() partway through... */

!         LockBufHdr_NoHoldoff(buf);

          /*
           * Don't complain if flag bit not set; it could have been
***************
*** 1741,1758 ****
          if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 &&
              buf->wait_backend_id == MyBackendId)
              buf->flags &= ~BM_PIN_COUNT_WAITER;
!         LWLockRelease(BufMgrLock);

          ProcCancelWaitForSignal();

          RESUME_INTERRUPTS();
      }
-
-     PinCountWaitBuf = NULL;
  }

  /*
!  * Acquire or release the cntx_lock for the buffer.
   */
  void
  LockBuffer(Buffer buffer, int mode)
--- 1727,1745 ----
          if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 &&
              buf->wait_backend_id == MyBackendId)
              buf->flags &= ~BM_PIN_COUNT_WAITER;
!
!         UnlockBufHdr_NoHoldoff(buf);

          ProcCancelWaitForSignal();

+         PinCountWaitBuf = NULL;
+
          RESUME_INTERRUPTS();
      }
  }

  /*
!  * Acquire or release the content_lock for the buffer.
   */
  void
  LockBuffer(Buffer buffer, int mode)
***************
*** 1766,1792 ****
      buf = &(BufferDescriptors[buffer - 1]);

      if (mode == BUFFER_LOCK_UNLOCK)
!         LWLockRelease(buf->cntx_lock);
      else if (mode == BUFFER_LOCK_SHARE)
!         LWLockAcquire(buf->cntx_lock, LW_SHARED);
      else if (mode == BUFFER_LOCK_EXCLUSIVE)
      {
!         LWLockAcquire(buf->cntx_lock, LW_EXCLUSIVE);

          /*
!          * This is not the best place to set the cntxDirty flag (e.g., indices
           * do not always change the buffer they lock in excl mode). But please
!          * remember that it's critical to set cntxDirty *before* logging
!          * changes with XLogInsert() - see comments in BufferSync().
           */
!         buf->cntxDirty = true;
      }
      else
          elog(ERROR, "unrecognized buffer lock mode: %d", mode);
  }

  /*
!  * Acquire the cntx_lock for the buffer, but only if we don't have to wait.
   *
   * This assumes the caller wants BUFFER_LOCK_EXCLUSIVE mode.
   */
--- 1753,1781 ----
      buf = &(BufferDescriptors[buffer - 1]);

      if (mode == BUFFER_LOCK_UNLOCK)
!         LWLockRelease(buf->content_lock);
      else if (mode == BUFFER_LOCK_SHARE)
!         LWLockAcquire(buf->content_lock, LW_SHARED);
      else if (mode == BUFFER_LOCK_EXCLUSIVE)
      {
!         LWLockAcquire(buf->content_lock, LW_EXCLUSIVE);

          /*
!          * This is not the best place to mark the buffer dirty (e.g., indices
           * do not always change the buffer they lock in excl mode). But please
!          * remember that it's critical to set the dirty bit *before* logging
!          * changes with XLogInsert() - see comments in SyncOneBuffer().
           */
!         LockBufHdr_NoHoldoff(buf);
!         buf->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
!         UnlockBufHdr_NoHoldoff(buf);
      }
      else
          elog(ERROR, "unrecognized buffer lock mode: %d", mode);
  }
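
To illustrate how callers drive this API -- a minimal sketch with a
hypothetical rel/blkno, not code from the patch itself:

    Buffer      buffer = ReadBuffer(rel, blkno);    /* pins the page */

    LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);      /* also sets BM_DIRTY and
                                                     * BM_JUST_DIRTIED, per the
                                                     * comment above */
    /* ... modify the page, then XLogInsert() the change ... */
    LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
    ReleaseBuffer(buffer);                          /* drops the pin */

The required ordering comes for free here: the dirty bits are set at lock
acquisition, hence before XLogInsert().
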

  /*
!  * Acquire the content_lock for the buffer, but only if we don't have to wait.
   *
   * This assumes the caller wants BUFFER_LOCK_EXCLUSIVE mode.
   */
***************
*** 1801,1815 ****

      buf = &(BufferDescriptors[buffer - 1]);

!     if (LWLockConditionalAcquire(buf->cntx_lock, LW_EXCLUSIVE))
      {
          /*
!          * This is not the best place to set the cntxDirty flag (e.g., indices
           * do not always change the buffer they lock in excl mode). But please
!          * remember that it's critical to set cntxDirty *before* logging
!          * changes with XLogInsert() - see comments in BufferSync().
           */
!         buf->cntxDirty = true;

          return true;
      }
--- 1790,1806 ----

      buf = &(BufferDescriptors[buffer - 1]);

!     if (LWLockConditionalAcquire(buf->content_lock, LW_EXCLUSIVE))
      {
          /*
!          * This is not the best place to mark the buffer dirty (e.g., indices
           * do not always change the buffer they lock in excl mode). But please
!          * remember that it's critical to set the dirty bit *before* logging
!          * changes with XLogInsert() - see comments in SyncOneBuffer().
           */
!         LockBufHdr_NoHoldoff(buf);
!         buf->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
!         UnlockBufHdr_NoHoldoff(buf);

          return true;
      }
***************
*** 1861,1885 ****
      {
          /* Try to acquire lock */
          LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
!         LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
          Assert(bufHdr->refcount > 0);
          if (bufHdr->refcount == 1)
          {
              /* Successfully acquired exclusive lock with pincount 1 */
!             LWLockRelease(BufMgrLock);
              return;
          }
          /* Failed, so mark myself as waiting for pincount 1 */
          if (bufHdr->flags & BM_PIN_COUNT_WAITER)
          {
!             LWLockRelease(BufMgrLock);
              LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
              elog(ERROR, "multiple backends attempting to wait for pincount 1");
          }
          bufHdr->wait_backend_id = MyBackendId;
          bufHdr->flags |= BM_PIN_COUNT_WAITER;
          PinCountWaitBuf = bufHdr;
!         LWLockRelease(BufMgrLock);
          LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
          /* Wait to be signaled by UnpinBuffer() */
          ProcWaitForSignal();
--- 1852,1876 ----
      {
          /* Try to acquire lock */
          LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
!         LockBufHdr_NoHoldoff(bufHdr);
          Assert(bufHdr->refcount > 0);
          if (bufHdr->refcount == 1)
          {
              /* Successfully acquired exclusive lock with pincount 1 */
!             UnlockBufHdr_NoHoldoff(bufHdr);
              return;
          }
          /* Failed, so mark myself as waiting for pincount 1 */
          if (bufHdr->flags & BM_PIN_COUNT_WAITER)
          {
!             UnlockBufHdr_NoHoldoff(bufHdr);
              LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
              elog(ERROR, "multiple backends attempting to wait for pincount 1");
          }
          bufHdr->wait_backend_id = MyBackendId;
          bufHdr->flags |= BM_PIN_COUNT_WAITER;
          PinCountWaitBuf = bufHdr;
!         UnlockBufHdr_NoHoldoff(bufHdr);
          LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
          /* Wait to be signaled by UnpinBuffer() */
          ProcWaitForSignal();
***************
*** 1889,1982 ****
  }

  /*
!  *    Functions for IO error handling
   *
!  *    Note: We assume that nested buffer IO never occurs.
   *    i.e., at most one io_in_progress lock is held per proc.
   */

  /*
!  * Function:StartBufferIO
   *    (Assumptions)
   *    My process is executing no IO
-  *    BufMgrLock is held
-  *    BM_IO_IN_PROGRESS mask is not set for the buffer
   *    The buffer is Pinned
   *
!  * Because BufMgrLock is held, we are already in an interrupt holdoff here,
!  * and do not need another.
   */
! static void
  StartBufferIO(BufferDesc *buf, bool forInput)
  {
      Assert(!InProgressBuf);
!     Assert(!(buf->flags & BM_IO_IN_PROGRESS));
      buf->flags |= BM_IO_IN_PROGRESS;

!     LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);

      InProgressBuf = buf;
      IsForInput = forInput;
  }

  /*
!  * Function:TerminateBufferIO
   *    (Assumptions)
   *    My process is executing IO for the buffer
!  *    BufMgrLock is held
!  *    BM_IO_IN_PROGRESS mask is set for the buffer
   *    The buffer is Pinned
   *
!  * err_flag must be 0 for successful completion and BM_IO_ERROR for failure.
!  *
!  * Because BufMgrLock is held, we are already in an interrupt holdoff here,
!  * and do not need another.
   */
  static void
! TerminateBufferIO(BufferDesc *buf, int err_flag)
  {
      Assert(buf == InProgressBuf);
      Assert(buf->flags & BM_IO_IN_PROGRESS);
      buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
!     buf->flags |= err_flag;

!     LWLockRelease(buf->io_in_progress_lock);

      InProgressBuf = NULL;
- }
-
- /*
-  * Function:ContinueBufferIO
-  *    (Assumptions)
-  *    My process is executing IO for the buffer
-  *    BufMgrLock is held
-  *    The buffer is Pinned
-  *
-  * Because BufMgrLock is held, we are already in an interrupt holdoff here,
-  * and do not need another.
-  */
- static void
- ContinueBufferIO(BufferDesc *buf, bool forInput)
- {
-     Assert(buf == InProgressBuf);
-     Assert(buf->flags & BM_IO_IN_PROGRESS);
-     IsForInput = forInput;
- }

! #ifdef NOT_USED
! void
! InitBufferIO(void)
! {
!     InProgressBuf = NULL;
  }
- #endif

  /*
!  *    Clean up any active buffer I/O after an error.
!  *    BufMgrLock isn't held when this function is called,
   *    but we haven't yet released buffer pins, so the buffer is still pinned.
   *
!  *    If I/O was in progress, we always set BM_IO_ERROR.
   */
  void
  AbortBufferIO(void)
--- 1880,2039 ----
  }

  /*
!  *    Functions for buffer I/O handling
   *
!  *    Note: We assume that nested buffer I/O never occurs.
   *    i.e., at most one io_in_progress lock is held per proc.
+  *
+  *    Also note that these are used only for shared buffers, not local ones.
   */

  /*
!  * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf' is cleared.
!  */
! static void
! WaitIO(BufferDesc *buf)
! {
!     /*
!      * Changed to wait until there's no IO - Inoue 01/13/2000
!      *
!      * Note this is *necessary* because an error abort in the process doing
!      * I/O could release the io_in_progress_lock prematurely. See
!      * AbortBufferIO.
!      */
!     for (;;)
!     {
!         BufFlags    sv_flags;
!
!         /*
!          * It may not be necessary to acquire the spinlock to check the
!          * flag here, but since this test is essential for correctness,
!          * we'd better play it safe.
!          */
!         LockBufHdr(buf);
!         sv_flags = buf->flags;
!         UnlockBufHdr(buf);
!         if (!(sv_flags & BM_IO_IN_PROGRESS))
!             break;
!         LWLockAcquire(buf->io_in_progress_lock, LW_SHARED);
!         LWLockRelease(buf->io_in_progress_lock);
!     }
! }
!
! /*
!  * StartBufferIO: begin I/O on this buffer
   *    (Assumptions)
   *    My process is executing no IO
   *    The buffer is Pinned
   *
!  * In some scenarios there are race conditions in which multiple backends
!  * could attempt the same I/O operation concurrently.  If someone else
!  * has already started I/O on this buffer then we will block on the
!  * io_in_progress lock until he's done.
!  *
!  * Input operations are only attempted on buffers that are not BM_VALID,
!  * and output operations only on buffers that are BM_VALID and BM_DIRTY,
!  * so we can always tell if the work is already done.
!  *
!  * Returns TRUE if we successfully marked the buffer as I/O busy,
!  * FALSE if someone else already did the work.
   */
! static bool
  StartBufferIO(BufferDesc *buf, bool forInput)
  {
      Assert(!InProgressBuf);
!
!     for (;;)
!     {
!         /*
!          * Grab the io_in_progress lock so that other processes can wait for
!          * me to finish the I/O.
!          */
!         LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);
!
!         /* NoHoldoff is OK since we now have an LWLock */
!         LockBufHdr_NoHoldoff(buf);
!
!         if (!(buf->flags & BM_IO_IN_PROGRESS))
!             break;
!
!         /*
!          * The only way BM_IO_IN_PROGRESS could be set when the io_in_progress
!          * lock isn't held is if the process doing the I/O is recovering from
!          * an error (see AbortBufferIO).  If that's the case, we must wait for
!          * him to get unwedged.
!          */
!         UnlockBufHdr_NoHoldoff(buf);
!         LWLockRelease(buf->io_in_progress_lock);
!         WaitIO(buf);
!     }
!
!     /* Once we get here, there is definitely no I/O active on this buffer */
!
!     if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY))
!     {
!         /* someone else already did the I/O */
!         UnlockBufHdr_NoHoldoff(buf);
!         LWLockRelease(buf->io_in_progress_lock);
!         return false;
!     }
!
      buf->flags |= BM_IO_IN_PROGRESS;

!     UnlockBufHdr_NoHoldoff(buf);

      InProgressBuf = buf;
      IsForInput = forInput;
+
+     return true;
  }

  /*
!  * TerminateBufferIO: release a buffer we were doing I/O on
   *    (Assumptions)
   *    My process is executing IO for the buffer
!  *    BM_IO_IN_PROGRESS bit is set for the buffer
!  *    We hold the buffer's io_in_progress lock
   *    The buffer is Pinned
   *
!  * If clear_dirty is TRUE and BM_JUST_DIRTIED is not set, we clear the
!  * buffer's BM_DIRTY flag.  This is appropriate when terminating a
!  * successful write.  The check on BM_JUST_DIRTIED is necessary to avoid
!  * marking the buffer clean if it was re-dirtied while we were writing.
!  *
!  * set_flag_bits gets ORed into the buffer's flags.  It must include
!  * BM_IO_ERROR in a failure case.  For successful completion it could
!  * be 0, or BM_VALID if we just finished reading in the page.
   */
  static void
! TerminateBufferIO(BufferDesc *buf, bool clear_dirty, int set_flag_bits)
  {
      Assert(buf == InProgressBuf);
+
+     /* NoHoldoff is OK since we must have an LWLock */
+     LockBufHdr_NoHoldoff(buf);
+
      Assert(buf->flags & BM_IO_IN_PROGRESS);
      buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
!     if (clear_dirty && !(buf->flags & BM_JUST_DIRTIED))
!         buf->flags &= ~BM_DIRTY;
!     buf->flags |= set_flag_bits;

!     UnlockBufHdr_NoHoldoff(buf);

      InProgressBuf = NULL;

!     LWLockRelease(buf->io_in_progress_lock);
  }
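
For reference, the calling pattern these routines are designed for -- a
sketch only, with the physical I/O steps elided to comments standing in
for the actual smgr calls in bufmgr.c:

    /* input side: attempted only on buffers that are not yet BM_VALID */
    if (StartBufferIO(buf, true))
    {
        /* we won the right to do the read */
        /* ... physically read the page into the buffer ... */
        TerminateBufferIO(buf, false, BM_VALID);
    }
    /* else some other backend already read it in; it is BM_VALID now */

    /* output side: attempted only on buffers that are BM_VALID and BM_DIRTY */
    if (StartBufferIO(buf, false))
    {
        /* ... physically write the page out ... */
        /* clear_dirty = true: BM_DIRTY is cleared unless BM_JUST_DIRTIED
         * shows the page was re-dirtied while we were writing it */
        TerminateBufferIO(buf, true, 0);
    }
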

  /*
!  * AbortBufferIO: Clean up any active buffer I/O after an error.
!  *
!  *    All LWLocks we might have held have been released,
   *    but we haven't yet released buffer pins, so the buffer is still pinned.
   *
!  *    If I/O was in progress, we always set BM_IO_ERROR, even though it's
!  *    possible the error condition wasn't related to the I/O.
   */
  void
  AbortBufferIO(void)
***************
*** 1994,2013 ****
           */
          LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);

!         LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
          Assert(buf->flags & BM_IO_IN_PROGRESS);
          if (IsForInput)
          {
!             Assert(!(buf->flags & BM_DIRTY || buf->cntxDirty));
              /* We'd better not think buffer is valid yet */
              Assert(!(buf->flags & BM_VALID));
          }
          else
          {
!             Assert(buf->flags & BM_DIRTY || buf->cntxDirty);
              /* Issue notice if this is not the first failure... */
!             if (buf->flags & BM_IO_ERROR)
              {
                  ereport(WARNING,
                          (errcode(ERRCODE_IO_ERROR),
                           errmsg("could not write block %u of %u/%u/%u",
--- 2051,2077 ----
           */
          LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);

!         /* NoHoldoff is OK since we now have an LWLock */
!         LockBufHdr_NoHoldoff(buf);
          Assert(buf->flags & BM_IO_IN_PROGRESS);
          if (IsForInput)
          {
!             Assert(!(buf->flags & BM_DIRTY));
              /* We'd better not think buffer is valid yet */
              Assert(!(buf->flags & BM_VALID));
+             UnlockBufHdr_NoHoldoff(buf);
          }
          else
          {
!             BufFlags    sv_flags;
!
!             sv_flags = buf->flags;
!             Assert(sv_flags & BM_DIRTY);
!             UnlockBufHdr_NoHoldoff(buf);
              /* Issue notice if this is not the first failure... */
!             if (sv_flags & BM_IO_ERROR)
              {
+                 /* Buffer is pinned, so we can read tag without spinlock */
                  ereport(WARNING,
                          (errcode(ERRCODE_IO_ERROR),
                           errmsg("could not write block %u of %u/%u/%u",
***************
*** 2017,2026 ****
                                  buf->tag.rnode.relNode),
                           errdetail("Multiple failures --- write error may be permanent.")));
              }
-             buf->flags |= BM_DIRTY;
          }
!         TerminateBufferIO(buf, BM_IO_ERROR);
!         LWLockRelease(BufMgrLock);
      }
  }

--- 2081,2088 ----
                                  buf->tag.rnode.relNode),
                           errdetail("Multiple failures --- write error may be permanent.")));
              }
          }
!         TerminateBufferIO(buf, false, BM_IO_ERROR);
      }
  }

***************
*** 2032,2037 ****
--- 2094,2100 ----
  {
      BufferDesc *bufHdr = (BufferDesc *) arg;

+     /* Buffer is pinned, so we can read the tag without locking the spinlock */
      if (bufHdr != NULL)
          errcontext("writing block %u of relation %u/%u/%u",
                     bufHdr->tag.blockNum,
*** src/backend/storage/buffer/freelist.c.orig    Sun Feb 13 13:08:21 2005
--- src/backend/storage/buffer/freelist.c    Tue Feb 15 13:37:59 2005
***************
*** 1,15 ****
  /*-------------------------------------------------------------------------
   *
   * freelist.c
!  *      routines for manipulating the buffer pool's replacement strategy.
!  *
!  * The name "freelist.c" is now a bit of a misnomer, since this module
!  * controls not only the list of free buffers per se, but the entire
!  * mechanism for looking up existing shared buffers and the strategy
!  * for choosing replacement victims when needed.
!  *
!  * Note: all routines in this file assume that the BufMgrLock is held
!  * by the caller, so no synchronization is needed.
   *
   *
   * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
--- 1,7 ----
  /*-------------------------------------------------------------------------
   *
   * freelist.c
!  *      routines for managing the buffer pool's replacement strategy.
   *
   *
   * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
***************
*** 23,402 ****
   */
  #include "postgres.h"

- #include <time.h>
-
- #include "access/xact.h"
  #include "storage/buf_internals.h"
  #include "storage/bufmgr.h"


  /*
!  * Definitions for the buffer replacement strategy
!  */
! #define STRAT_LIST_UNUSED    (-1)
! #define STRAT_LIST_B1        0
! #define STRAT_LIST_T1        1
! #define STRAT_LIST_T2        2
! #define STRAT_LIST_B2        3
! #define STRAT_NUM_LISTS        4
!
! /*
!  * The Cache Directory Block (CDB) of the Adaptive Replacement Cache (ARC)
   */
  typedef struct
  {
!     int            prev;            /* list links */
!     int            next;
!     short        list;            /* ID of list it is currently in */
!     bool        t1_vacuum;        /* t => present only because of VACUUM */
!     TransactionId t1_xid;        /* the xid this entry went onto T1 */
!     BufferTag    buf_tag;        /* page identifier */
!     int            buf_id;            /* currently assigned data buffer, or -1 */
! } BufferStrategyCDB;

! /*
!  * The shared ARC control information.
!  */
! typedef struct
! {
!     int            target_T1_size; /* What T1 size are we aiming for */
!     int            listUnusedCDB;    /* All unused StrategyCDB */
!     int            listHead[STRAT_NUM_LISTS];        /* ARC lists B1, T1, T2
!                                                  * and B2 */
!     int            listTail[STRAT_NUM_LISTS];
!     int            listSize[STRAT_NUM_LISTS];
!     Buffer        listFreeBuffers;    /* List of unused buffers */
!
!     long        num_lookup;        /* Some hit statistics */
!     long        num_hit[STRAT_NUM_LISTS];
!     time_t        stat_report;

!     /* Array of CDB's starts here */
!     BufferStrategyCDB cdb[1];    /* VARIABLE SIZE ARRAY */
  } BufferStrategyControl;

  /* GUC variable: time in seconds between statistics reports */
  int            DebugSharedBuffers = 0;

  /* Pointers to shared state */
  static BufferStrategyControl *StrategyControl = NULL;
- static BufferStrategyCDB *StrategyCDB = NULL;

  /* Backend-local state about whether currently vacuuming */
! static bool strategy_hint_vacuum = false;
! static TransactionId strategy_vacuum_xid;
!
!
! #define T1_TARGET    (StrategyControl->target_T1_size)
! #define B1_LENGTH    (StrategyControl->listSize[STRAT_LIST_B1])
! #define T1_LENGTH    (StrategyControl->listSize[STRAT_LIST_T1])
! #define T2_LENGTH    (StrategyControl->listSize[STRAT_LIST_T2])
! #define B2_LENGTH    (StrategyControl->listSize[STRAT_LIST_B2])
!
!
! /*
!  * Macro to remove a CDB from whichever list it currently is on
!  */
! #define STRAT_LIST_REMOVE(cdb) \
! do { \
!     Assert((cdb)->list >= 0 && (cdb)->list < STRAT_NUM_LISTS);    \
!     if ((cdb)->prev < 0)                                        \
!         StrategyControl->listHead[(cdb)->list] = (cdb)->next;    \
!     else                                                        \
!         StrategyCDB[(cdb)->prev].next = (cdb)->next;            \
!     if ((cdb)->next < 0)                                        \
!         StrategyControl->listTail[(cdb)->list] = (cdb)->prev;    \
!     else                                                        \
!         StrategyCDB[(cdb)->next].prev = (cdb)->prev;            \
!     StrategyControl->listSize[(cdb)->list]--;                    \
!     (cdb)->list = STRAT_LIST_UNUSED;                            \
! } while(0)
!
! /*
!  * Macro to add a CDB to the tail of a list (MRU position)
!  */
! #define STRAT_MRU_INSERT(cdb,l) \
! do { \
!     Assert((cdb)->list == STRAT_LIST_UNUSED);                    \
!     if (StrategyControl->listTail[(l)] < 0)                        \
!     {                                                            \
!         (cdb)->prev = (cdb)->next = -1;                            \
!         StrategyControl->listHead[(l)] =                        \
!             StrategyControl->listTail[(l)] =                    \
!             ((cdb) - StrategyCDB);                                \
!     }                                                            \
!     else                                                        \
!     {                                                            \
!         (cdb)->next = -1;                                        \
!         (cdb)->prev = StrategyControl->listTail[(l)];            \
!         StrategyCDB[StrategyControl->listTail[(l)]].next =        \
!             ((cdb) - StrategyCDB);                                \
!         StrategyControl->listTail[(l)] =                        \
!             ((cdb) - StrategyCDB);                                \
!     }                                                            \
!     StrategyControl->listSize[(l)]++;                            \
!     (cdb)->list = (l);                                            \
! } while(0)
!
! /*
!  * Macro to add a CDB to the head of a list (LRU position)
!  */
! #define STRAT_LRU_INSERT(cdb,l) \
! do { \
!     Assert((cdb)->list == STRAT_LIST_UNUSED);                    \
!     if (StrategyControl->listHead[(l)] < 0)                        \
!     {                                                            \
!         (cdb)->prev = (cdb)->next = -1;                            \
!         StrategyControl->listHead[(l)] =                        \
!             StrategyControl->listTail[(l)] =                    \
!             ((cdb) - StrategyCDB);                                \
!     }                                                            \
!     else                                                        \
!     {                                                            \
!         (cdb)->prev = -1;                                        \
!         (cdb)->next = StrategyControl->listHead[(l)];            \
!         StrategyCDB[StrategyControl->listHead[(l)]].prev =        \
!             ((cdb) - StrategyCDB);                                \
!         StrategyControl->listHead[(l)] =                        \
!             ((cdb) - StrategyCDB);                                \
!     }                                                            \
!     StrategyControl->listSize[(l)]++;                            \
!     (cdb)->list = (l);                                            \
! } while(0)
!
!
! /*
!  * Printout for use when DebugSharedBuffers is enabled
!  */
! static void
! StrategyStatsDump(void)
! {
!     time_t        now = time(NULL);
!
!     if (StrategyControl->stat_report + DebugSharedBuffers < now)
!     {
!         long        all_hit,
!                     b1_hit,
!                     t1_hit,
!                     t2_hit,
!                     b2_hit;
!         int            id,
!                     t1_clean,
!                     t2_clean;
!         ErrorContextCallback *errcxtold;
!
!         id = StrategyControl->listHead[STRAT_LIST_T1];
!         t1_clean = 0;
!         while (id >= 0)
!         {
!             if (BufferDescriptors[StrategyCDB[id].buf_id].flags & BM_DIRTY)
!                 break;
!             t1_clean++;
!             id = StrategyCDB[id].next;
!         }
!         id = StrategyControl->listHead[STRAT_LIST_T2];
!         t2_clean = 0;
!         while (id >= 0)
!         {
!             if (BufferDescriptors[StrategyCDB[id].buf_id].flags & BM_DIRTY)
!                 break;
!             t2_clean++;
!             id = StrategyCDB[id].next;
!         }
!
!         if (StrategyControl->num_lookup == 0)
!             all_hit = b1_hit = t1_hit = t2_hit = b2_hit = 0;
!         else
!         {
!             b1_hit = (StrategyControl->num_hit[STRAT_LIST_B1] * 100 /
!                       StrategyControl->num_lookup);
!             t1_hit = (StrategyControl->num_hit[STRAT_LIST_T1] * 100 /
!                       StrategyControl->num_lookup);
!             t2_hit = (StrategyControl->num_hit[STRAT_LIST_T2] * 100 /
!                       StrategyControl->num_lookup);
!             b2_hit = (StrategyControl->num_hit[STRAT_LIST_B2] * 100 /
!                       StrategyControl->num_lookup);
!             all_hit = b1_hit + t1_hit + t2_hit + b2_hit;
!         }
!
!         errcxtold = error_context_stack;
!         error_context_stack = NULL;
!         elog(DEBUG1, "ARC T1target=%5d B1len=%5d T1len=%5d T2len=%5d B2len=%5d",
!              T1_TARGET, B1_LENGTH, T1_LENGTH, T2_LENGTH, B2_LENGTH);
!         elog(DEBUG1, "ARC total   =%4ld%% B1hit=%4ld%% T1hit=%4ld%% T2hit=%4ld%% B2hit=%4ld%%",
!              all_hit, b1_hit, t1_hit, t2_hit, b2_hit);
!         elog(DEBUG1, "ARC clean buffers at LRU       T1=   %5d T2=   %5d",
!              t1_clean, t2_clean);
!         error_context_stack = errcxtold;
!
!         StrategyControl->num_lookup = 0;
!         StrategyControl->num_hit[STRAT_LIST_B1] = 0;
!         StrategyControl->num_hit[STRAT_LIST_T1] = 0;
!         StrategyControl->num_hit[STRAT_LIST_T2] = 0;
!         StrategyControl->num_hit[STRAT_LIST_B2] = 0;
!         StrategyControl->stat_report = now;
!     }
! }
!
! /*
!  * StrategyBufferLookup
!  *
!  *    Lookup a page request in the cache directory. A buffer is only
!  *    returned for a T1 or T2 cache hit. B1 and B2 hits are just
!  *    remembered here, to possibly affect the behaviour later.
!  *
!  *    recheck indicates we are rechecking after I/O wait; do not change
!  *    internal status in this case.
!  *
!  *    *cdb_found_index is set to the index of the found CDB, or -1 if none.
!  *    This is not intended to be used by the caller, except to pass to
!  *    StrategyReplaceBuffer().
!  */
! BufferDesc *
! StrategyBufferLookup(BufferTag *tagPtr, bool recheck,
!                      int *cdb_found_index)
! {
!     BufferStrategyCDB *cdb;
!
!     /* Optional stats printout */
!     if (DebugSharedBuffers > 0)
!         StrategyStatsDump();
!
!     /*
!      * Count lookups
!      */
!     StrategyControl->num_lookup++;
!
!     /*
!      * Lookup the block in the shared hash table
!      */
!     *cdb_found_index = BufTableLookup(tagPtr);
!
!     /*
!      * Done if complete CDB lookup miss
!      */
!     if (*cdb_found_index < 0)
!         return NULL;
!
!     /*
!      * We found a CDB
!      */
!     cdb = &StrategyCDB[*cdb_found_index];
!
!     /*
!      * Count hits
!      */
!     StrategyControl->num_hit[cdb->list]++;
!
!     /*
!      * If this is a T2 hit, we simply move the CDB to the T2 MRU position
!      * and return the found buffer.
!      *
!      * A CDB in T2 cannot have t1_vacuum set, so we needn't check.  However,
!      * if the current process is VACUUM then it doesn't promote to MRU.
!      */
!     if (cdb->list == STRAT_LIST_T2)
!     {
!         if (!strategy_hint_vacuum)
!         {
!             STRAT_LIST_REMOVE(cdb);
!             STRAT_MRU_INSERT(cdb, STRAT_LIST_T2);
!         }
!
!         return &BufferDescriptors[cdb->buf_id];
!     }
!
!     /*
!      * If this is a T1 hit, we move the buffer to the T2 MRU only if
!      * another transaction had read it into T1, *and* neither transaction
!      * is a VACUUM. This is required because any UPDATE or DELETE in
!      * PostgreSQL does multiple ReadBuffer(), first during the scan, later
!      * during the heap_update() or heap_delete().  Otherwise move to T1
!      * MRU.  VACUUM doesn't even get to make that happen.
!      */
!     if (cdb->list == STRAT_LIST_T1)
!     {
!         if (!strategy_hint_vacuum)
!         {
!             if (!cdb->t1_vacuum &&
!                 !TransactionIdEquals(cdb->t1_xid, GetTopTransactionId()))
!             {
!                 STRAT_LIST_REMOVE(cdb);
!                 STRAT_MRU_INSERT(cdb, STRAT_LIST_T2);
!             }
!             else
!             {
!                 STRAT_LIST_REMOVE(cdb);
!                 STRAT_MRU_INSERT(cdb, STRAT_LIST_T1);
!
!                 /*
!                  * If a non-VACUUM process references a page recently
!                  * loaded by VACUUM, clear the stigma; the state will now
!                  * be the same as if this process loaded it originally.
!                  */
!                 if (cdb->t1_vacuum)
!                 {
!                     cdb->t1_xid = GetTopTransactionId();
!                     cdb->t1_vacuum = false;
!                 }
!             }
!         }
!
!         return &BufferDescriptors[cdb->buf_id];
!     }
!
!     /*
!      * In the case of a recheck we don't care about B1 or B2 hits here.
!      * The bufmgr does this call only to make sure no-one faulted in the
!      * block while we were busy flushing another; we don't want to doubly
!      * adjust the T1target.
!      *
!      * Now for this really to end up as a B1 or B2 cache hit, we must have
!      * been flushing for quite some time as the block not only must have
!      * been read, but also traveled through the queue and evicted from the
!      * T cache again already.
!      *
!      * VACUUM re-reads shouldn't adjust the target either.
!      */
!     if (recheck || strategy_hint_vacuum)
!         return NULL;
!
!     /*
!      * Adjust the target size of the T1 cache depending on if this is a B1
!      * or B2 hit.
!      */
!     switch (cdb->list)
!     {
!         case STRAT_LIST_B1:
!
!             /*
!              * B1 hit means that the T1 cache is probably too small.
!              * Adjust the T1 target size and continue below.
!              */
!             T1_TARGET = Min(T1_TARGET + Max(B2_LENGTH / B1_LENGTH, 1),
!                             NBuffers);
!             break;
!
!         case STRAT_LIST_B2:
!
!             /*
!              * B2 hit means that the T2 cache is probably too small.
!              * Adjust the T1 target size and continue below.
!              */
!             T1_TARGET = Max(T1_TARGET - Max(B1_LENGTH / B2_LENGTH, 1), 0);
!             break;
!
!         default:
!             elog(ERROR, "buffer hash table corrupted: CDB->list = %d",
!                  cdb->list);
!     }
!
!     /*
!      * Even though we had seen the block in the past, its data is not
!      * currently in memory ... cache miss to the bufmgr.
!      */
!     return NULL;
! }


  /*
--- 15,50 ----
   */
  #include "postgres.h"

  #include "storage/buf_internals.h"
  #include "storage/bufmgr.h"


  /*
!  * The shared freelist control information.
   */
  typedef struct
  {
!     /* Clock sweep hand: index of next buffer to consider grabbing */
!     int            nextVictimBuffer;

!     int            firstFreeBuffer;    /* Head of list of unused buffers */
!     int            lastFreeBuffer;        /* Tail of list of unused buffers */

!     /*
!      * NOTE: lastFreeBuffer is undefined when firstFreeBuffer is -1
!      * (that is, when the list is empty)
!      */
  } BufferStrategyControl;

  /* GUC variable: time in seconds between statistics reports */
+ /* XXX not used at present */
  int            DebugSharedBuffers = 0;

  /* Pointers to shared state */
  static BufferStrategyControl *StrategyControl = NULL;

  /* Backend-local state about whether currently vacuuming */
! bool        strategy_hint_vacuum = false;


  /*
***************
*** 404,774 ****
   *
   *    Called by the bufmgr to get the next candidate buffer to use in
   *    BufferAlloc(). The only hard requirement BufferAlloc() has is that
!  *    this buffer must not currently be pinned.
   *
!  *    *cdb_replace_index is set to the index of the candidate CDB, or -1 if
!  *    none (meaning we are using a previously free buffer).  This is not
!  *    intended to be used by the caller, except to pass to
!  *    StrategyReplaceBuffer().
   */
  BufferDesc *
! StrategyGetBuffer(int *cdb_replace_index)
  {
-     int            cdb_id;
      BufferDesc *buf;

!     if (StrategyControl->listFreeBuffers < 0)
      {
          /*
!          * We don't have a free buffer, must take one from T1 or T2.
!          * Choose based on trying to converge T1len to T1target.
           */
!         if (T1_LENGTH >= Max(1, T1_TARGET))
!         {
!             /*
!              * We should take the first unpinned buffer from T1.
!              */
!             cdb_id = StrategyControl->listHead[STRAT_LIST_T1];
!             while (cdb_id >= 0)
!             {
!                 buf = &BufferDescriptors[StrategyCDB[cdb_id].buf_id];
!                 if (buf->refcount == 0)
!                 {
!                     *cdb_replace_index = cdb_id;
!                     Assert(StrategyCDB[cdb_id].list == STRAT_LIST_T1);
!                     return buf;
!                 }
!                 cdb_id = StrategyCDB[cdb_id].next;
!             }
!
!             /*
!              * No unpinned T1 buffer found - try T2 cache.
!              */
!             cdb_id = StrategyControl->listHead[STRAT_LIST_T2];
!             while (cdb_id >= 0)
!             {
!                 buf = &BufferDescriptors[StrategyCDB[cdb_id].buf_id];
!                 if (buf->refcount == 0)
!                 {
!                     *cdb_replace_index = cdb_id;
!                     Assert(StrategyCDB[cdb_id].list == STRAT_LIST_T2);
!                     return buf;
!                 }
!                 cdb_id = StrategyCDB[cdb_id].next;
!             }
!
!             /*
!              * No unpinned buffers at all!!!
!              */
!             elog(ERROR, "no unpinned buffers available");
!         }
!         else
!         {
!             /*
!              * We should take the first unpinned buffer from T2.
!              */
!             cdb_id = StrategyControl->listHead[STRAT_LIST_T2];
!             while (cdb_id >= 0)
!             {
!                 buf = &BufferDescriptors[StrategyCDB[cdb_id].buf_id];
!                 if (buf->refcount == 0)
!                 {
!                     *cdb_replace_index = cdb_id;
!                     Assert(StrategyCDB[cdb_id].list == STRAT_LIST_T2);
!                     return buf;
!                 }
!                 cdb_id = StrategyCDB[cdb_id].next;
!             }
!
!             /*
!              * No unpinned T2 buffer found - try T1 cache.
!              */
!             cdb_id = StrategyControl->listHead[STRAT_LIST_T1];
!             while (cdb_id >= 0)
!             {
!                 buf = &BufferDescriptors[StrategyCDB[cdb_id].buf_id];
!                 if (buf->refcount == 0)
!                 {
!                     *cdb_replace_index = cdb_id;
!                     Assert(StrategyCDB[cdb_id].list == STRAT_LIST_T1);
!                     return buf;
!                 }
!                 cdb_id = StrategyCDB[cdb_id].next;
!             }
!
!             /*
!              * No unpinned buffers at all!!!
!              */
!             elog(ERROR, "no unpinned buffers available");
!         }
      }
!     else
      {
!         /* There is a completely free buffer available - take it */

          /*
!          * Note: This code uses the side effect that a free buffer can
!          * never be pinned or dirty and therefore the call to
!          * StrategyReplaceBuffer() will happen without the bufmgr
!          * releasing the bufmgr-lock in the meantime. That means, that
!          * there will never be any reason to recheck. Otherwise we would
!          * leak shared buffers here!
           */
!         *cdb_replace_index = -1;
!         buf = &BufferDescriptors[StrategyControl->listFreeBuffers];
!
!         StrategyControl->listFreeBuffers = buf->bufNext;
!         buf->bufNext = -1;
!
!         /* Buffer in freelist cannot be pinned */
!         Assert(buf->refcount == 0);
!         Assert(!(buf->flags & BM_DIRTY));
!
!         return buf;
      }

      /* not reached */
      return NULL;
  }

-
  /*
!  * StrategyReplaceBuffer
   *
!  *    Called by the buffer manager to inform us that he flushed a buffer
!  *    and is now about to replace the content. Prior to this call,
!  *    the cache algorithm still reports the buffer as in the cache. After
!  *    this call we report the new block, even if IO might still need to
!  *    be done to bring in the new content.
!  *
!  *    cdb_found_index and cdb_replace_index must be the auxiliary values
!  *    returned by previous calls to StrategyBufferLookup and StrategyGetBuffer.
   */
  void
! StrategyReplaceBuffer(BufferDesc *buf, BufferTag *newTag,
!                       int cdb_found_index, int cdb_replace_index)
  {
!     BufferStrategyCDB *cdb_found;
!     BufferStrategyCDB *cdb_replace;
!
!     if (cdb_found_index >= 0)
!     {
!         /* This must have been a ghost buffer cache hit (B1 or B2) */
!         cdb_found = &StrategyCDB[cdb_found_index];

!         /* Assert that the buffer remembered in cdb_found is the one */
!         /* the buffer manager is currently faulting in */
!         Assert(BUFFERTAGS_EQUAL(cdb_found->buf_tag, *newTag));
!
!         if (cdb_replace_index >= 0)
!         {
!             /* We are satisfying it with an evicted T buffer */
!             cdb_replace = &StrategyCDB[cdb_replace_index];
!
!             /* Assert that the buffer remembered in cdb_replace is */
!             /* the one the buffer manager has just evicted */
!             Assert(cdb_replace->list == STRAT_LIST_T1 ||
!                    cdb_replace->list == STRAT_LIST_T2);
!             Assert(cdb_replace->buf_id == buf->buf_id);
!             Assert(BUFFERTAGS_EQUAL(cdb_replace->buf_tag, buf->tag));
!
!             /*
!              * Under normal circumstances we move the evicted T list entry
!              * to the corresponding B list.  However, T1 entries that
!              * exist only because of VACUUM are just thrown into the
!              * unused list instead. We don't expect them to be touched
!              * again by the VACUUM, and if we put them into B1 then VACUUM
!              * would skew T1_target adjusting.
!              */
!             if (cdb_replace->t1_vacuum)
!             {
!                 BufTableDelete(&(cdb_replace->buf_tag));
!                 STRAT_LIST_REMOVE(cdb_replace);
!                 cdb_replace->next = StrategyControl->listUnusedCDB;
!                 StrategyControl->listUnusedCDB = cdb_replace_index;
!             }
!             else
!             {
!                 if (cdb_replace->list == STRAT_LIST_T1)
!                 {
!                     STRAT_LIST_REMOVE(cdb_replace);
!                     STRAT_MRU_INSERT(cdb_replace, STRAT_LIST_B1);
!                 }
!                 else
!                 {
!                     STRAT_LIST_REMOVE(cdb_replace);
!                     STRAT_MRU_INSERT(cdb_replace, STRAT_LIST_B2);
!                 }
!             }
!             /* And clear its block reference */
!             cdb_replace->buf_id = -1;
!         }
!         else
!         {
!             /* We are satisfying it with an unused buffer */
!         }
!
!         /* Now the found B CDB gets the buffer and is moved to T2 */
!         cdb_found->buf_id = buf->buf_id;
!         STRAT_LIST_REMOVE(cdb_found);
!         STRAT_MRU_INSERT(cdb_found, STRAT_LIST_T2);
!     }
!     else
      {
!         /*
!          * This was a complete cache miss, so we need to create a new CDB.
!          * The goal is to keep T1len+B1len <= c.
!          */
!         if (B1_LENGTH > 0 && (T1_LENGTH + B1_LENGTH) >= NBuffers)
          {
!             /* So if B1 isn't empty and T1len+B1len >= c we take B1-LRU */
!             cdb_found = &StrategyCDB[StrategyControl->listHead[STRAT_LIST_B1]];
!
!             BufTableDelete(&(cdb_found->buf_tag));
!             STRAT_LIST_REMOVE(cdb_found);
          }
          else
          {
!             /* Otherwise, we try to use a free one */
!             if (StrategyControl->listUnusedCDB >= 0)
!             {
!                 cdb_found = &StrategyCDB[StrategyControl->listUnusedCDB];
!                 StrategyControl->listUnusedCDB = cdb_found->next;
!             }
              else
!             {
!                 /* If there isn't, we take B2-LRU ... except if */
!                 /* T1len+B1len+T2len = c ... oh my */
!                 if (B2_LENGTH > 0)
!                     cdb_found = &StrategyCDB[StrategyControl->listHead[STRAT_LIST_B2]];
!                 else
!                     cdb_found = &StrategyCDB[StrategyControl->listHead[STRAT_LIST_B1]];
!
!                 BufTableDelete(&(cdb_found->buf_tag));
!                 STRAT_LIST_REMOVE(cdb_found);
!             }
          }
-
-         /* Set the CDB's buf_tag and insert it into the hash table */
-         cdb_found->buf_tag = *newTag;
-         BufTableInsert(&(cdb_found->buf_tag), (cdb_found - StrategyCDB));
-
-         if (cdb_replace_index >= 0)
-         {
-             /*
-              * The buffer was formerly in a T list, move its CDB to the
-              * corresponding B list
-              */
-             cdb_replace = &StrategyCDB[cdb_replace_index];
-
-             Assert(cdb_replace->list == STRAT_LIST_T1 ||
-                    cdb_replace->list == STRAT_LIST_T2);
-             Assert(cdb_replace->buf_id == buf->buf_id);
-             Assert(BUFFERTAGS_EQUAL(cdb_replace->buf_tag, buf->tag));
-
-             if (cdb_replace->list == STRAT_LIST_T1)
-             {
-                 STRAT_LIST_REMOVE(cdb_replace);
-                 STRAT_MRU_INSERT(cdb_replace, STRAT_LIST_B1);
-             }
-             else
-             {
-                 STRAT_LIST_REMOVE(cdb_replace);
-                 STRAT_MRU_INSERT(cdb_replace, STRAT_LIST_B2);
-             }
-             /* And clear its block reference */
-             cdb_replace->buf_id = -1;
-         }
-         else
-         {
-             /* We are satisfying it with an unused buffer */
-         }
-
-         /* Assign the buffer id to the new CDB */
-         cdb_found->buf_id = buf->buf_id;
-
-         /*
-          * Specialized VACUUM optimization. If this complete cache miss
-          * happened because vacuum needed the page, we place it at the LRU
-          * position of T1; normally it goes at the MRU position.
-          */
-         if (strategy_hint_vacuum)
-         {
-             if (TransactionIdEquals(strategy_vacuum_xid,
-                                     GetTopTransactionId()))
-                 STRAT_LRU_INSERT(cdb_found, STRAT_LIST_T1);
-             else
-             {
-                 /* VACUUM must have been aborted by error, reset flag */
-                 strategy_hint_vacuum = false;
-                 STRAT_MRU_INSERT(cdb_found, STRAT_LIST_T1);
-             }
-         }
-         else
-             STRAT_MRU_INSERT(cdb_found, STRAT_LIST_T1);
-
-         /*
-          * Remember the Xid when this buffer went onto T1 to avoid a
-          * single UPDATE promoting a newcomer straight into T2. Also
-          * remember if it was loaded for VACUUM.
-          */
-         cdb_found->t1_xid = GetTopTransactionId();
-         cdb_found->t1_vacuum = strategy_hint_vacuum;
      }
- }


  /*
!  * StrategyInvalidateBuffer
   *
!  *    Called by the buffer manager to inform us that a buffer content
!  *    is no longer valid. We simply throw away any eventual existing
!  *    buffer hash entry and move the CDB and buffer to the free lists.
   */
! void
! StrategyInvalidateBuffer(BufferDesc *buf)
  {
!     int            cdb_id;
!     BufferStrategyCDB *cdb;
!
!     /* The buffer cannot be dirty or pinned */
!     Assert(!(buf->flags & BM_DIRTY) || !(buf->flags & BM_VALID));
!     Assert(buf->refcount == 0);

      /*
!      * Lookup the cache directory block for this buffer
       */
!     cdb_id = BufTableLookup(&(buf->tag));
!     if (cdb_id < 0)
!         elog(ERROR, "buffer %d not in buffer hash table", buf->buf_id);
!     cdb = &StrategyCDB[cdb_id];
!
!     /*
!      * Remove the CDB from the hashtable and the ARC queue it is currently
!      * on.
!      */
!     BufTableDelete(&(cdb->buf_tag));
!     STRAT_LIST_REMOVE(cdb);
!
!     /*
!      * Clear out the CDB's buffer tag and association with the buffer and
!      * add it to the list of unused CDB's
!      */
!     CLEAR_BUFFERTAG(cdb->buf_tag);
!     cdb->buf_id = -1;
!     cdb->next = StrategyControl->listUnusedCDB;
!     StrategyControl->listUnusedCDB = cdb_id;
!
!     /*
!      * Clear out the buffer's tag and add it to the list of currently
!      * unused buffers.    We must do this to ensure that linear scans of the
!      * buffer array don't think the buffer is valid.
!      */
!     CLEAR_BUFFERTAG(buf->tag);
!     buf->flags &= ~(BM_VALID | BM_DIRTY);
!     buf->cntxDirty = false;
!     buf->bufNext = StrategyControl->listFreeBuffers;
!     StrategyControl->listFreeBuffers = buf->buf_id;
  }

  /*
--- 52,178 ----
   *
   *    Called by the bufmgr to get the next candidate buffer to use in
   *    BufferAlloc(). The only hard requirement BufferAlloc() has is that
!  *    the selected buffer must not currently be pinned by anyone.
   *
!  *    To ensure that no one else can pin the buffer before we do, we must
!  *    return the buffer with the buffer header spinlock still held.  That
!  *    means that we return with the BufFreelistLock still held, as well;
!  *    the caller must release that lock once the spinlock is dropped.
   */
  BufferDesc *
! StrategyGetBuffer(void)
  {
      BufferDesc *buf;
+     int            maxtocheck;
+
+     LWLockAcquire(BufFreelistLock, LW_EXCLUSIVE);

!     /* Try to get a buffer from the freelist */
!     while (StrategyControl->firstFreeBuffer >= 0)
      {
+         buf = &BufferDescriptors[StrategyControl->firstFreeBuffer];
+         Assert(buf->freeNext != FREENEXT_NOT_IN_LIST);
+
+         /* Unconditionally remove buffer from freelist */
+         StrategyControl->firstFreeBuffer = buf->freeNext;
+         buf->freeNext = FREENEXT_NOT_IN_LIST;
+
          /*
!          * If the buffer is pinned or has its "recently used" bit set,
!          * we cannot use it; discard it and retry.  (This can only happen
!          * if VACUUM put a valid buffer in the freelist and then someone
!          * else used it before we got to it.)
           */
!         LockBufHdr(buf);
!         if (buf->refcount == 0 && !(buf->flags & BM_RECENTLY_USED))
!             return buf;
!         UnlockBufHdr(buf);
      }
!
!     /* Nothing on the freelist, so run the "clock sweep" algorithm */
!     for (maxtocheck = NBuffers * 2; maxtocheck > 0; maxtocheck--)
      {
!         buf = &BufferDescriptors[StrategyControl->nextVictimBuffer];
!
!         if (++StrategyControl->nextVictimBuffer >= NBuffers)
!             StrategyControl->nextVictimBuffer = 0;

          /*
!          * If the buffer is pinned or has its "recently used" bit set,
!          * we cannot use it; clear its "recently used" bit and retry.
           */
!         LockBufHdr(buf);
!         if (buf->refcount == 0 && !(buf->flags & BM_RECENTLY_USED))
!             return buf;
!         buf->flags &= ~BM_RECENTLY_USED;
!         UnlockBufHdr(buf);
      }

+     /* If we went twice around without result, all the buffers are pinned */
+     elog(ERROR, "no unpinned buffers available");
+
      /* not reached */
      return NULL;
  }
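
The caller-side contract is worth spelling out; roughly what BufferAlloc()
must now do with the result (sketch only, eliding the hashtable dance):

    buf = StrategyGetBuffer();      /* returns with buf's header spinlock
                                     * AND the BufFreelistLock both held */
    buf->refcount++;                /* claim the buffer before anyone else
                                     * can pin it */
    UnlockBufHdr(buf);
    LWLockRelease(BufFreelistLock); /* now others may run the sweep */

Note the sweep itself is the classic CLOCK (second-chance) approximation
of LRU: each pass clears BM_RECENTLY_USED, so every buffer gets one free
pass before it can be taken, and two full circuits without a candidate is
taken to mean that everything is pinned.
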

  /*
!  * StrategyFreeBuffer: put a buffer on the freelist
   *
!  * The buffer is added either at the head or the tail, according to the
!  * at_head parameter.  This allows a small amount of control over how
!  * quickly the buffer is reused.
   */
  void
! StrategyFreeBuffer(BufferDesc *buf, bool at_head)
  {
!     LWLockAcquire(BufFreelistLock, LW_EXCLUSIVE);

!     /*
!      * It is possible that we are told to put something in the freelist
!      * that is already in it; don't screw up the list if so.
!      */
!     if (buf->freeNext == FREENEXT_NOT_IN_LIST)
      {
!         if (at_head)
          {
!             buf->freeNext = StrategyControl->firstFreeBuffer;
!             if (buf->freeNext < 0)
!                 StrategyControl->lastFreeBuffer = buf->buf_id;
!             StrategyControl->firstFreeBuffer = buf->buf_id;
          }
          else
          {
!             buf->freeNext = FREENEXT_END_OF_LIST;
!             if (StrategyControl->firstFreeBuffer < 0)
!                 StrategyControl->firstFreeBuffer = buf->buf_id;
              else
!                 BufferDescriptors[StrategyControl->lastFreeBuffer].freeNext = buf->buf_id;
!             StrategyControl->lastFreeBuffer = buf->buf_id;
          }
      }

+     LWLockRelease(BufFreelistLock);
+ }
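
A hypothetical call site, to show what at_head buys: a buffer whose
contents have just been invalidated should be recycled as soon as
possible, so it goes in at the head --

    StrategyFreeBuffer(bufHdr, true);   /* hand this slot out again ASAP */

-- whereas at_head = false queues the buffer at the tail, so it is handed
out only after everything already in the list.
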

  /*
!  * StrategySyncStart -- tell BufferSync where to start syncing
   *
!  * The result is the buffer index of the best buffer to sync first.
!  * BufferSync() will proceed circularly around the buffer array from there.
!  * (Starting at the clock sweep hand is best because the buffers just in
!  * front of the hand are the next replacement victims, so writing them out
!  * first maximizes the chance that a victim is already clean when claimed.)
   */
! int
! StrategySyncStart(void)
  {
!     int            result;

      /*
!      * We could probably dispense with the locking here, but just to be
!      * safe ...
       */
!     LWLockAcquire(BufFreelistLock, LW_EXCLUSIVE);
!     result = StrategyControl->nextVictimBuffer;
!     LWLockRelease(BufFreelistLock);
!     return result;
  }

  /*
***************
*** 778,864 ****
  StrategyHintVacuum(bool vacuum_active)
  {
      strategy_hint_vacuum = vacuum_active;
-     strategy_vacuum_xid = GetTopTransactionId();
- }
-
- /*
-  * StrategyDirtyBufferList
-  *
-  * Returns a list of dirty buffers, in priority order for writing.
-  * Note that the caller may choose not to write them all.
-  *
-  * The caller must beware of the possibility that a buffer is no longer dirty,
-  * or even contains a different page, by the time he reaches it.  If it no
-  * longer contains the same page it need not be written, even if it is (again)
-  * dirty.
-  *
-  * Buffer pointers are stored into buffers[], and corresponding tags into
-  * buftags[], both of size max_buffers.  The function returns the number of
-  * buffer IDs stored.
-  */
- int
- StrategyDirtyBufferList(BufferDesc **buffers, BufferTag *buftags,
-                         int max_buffers)
- {
-     int            num_buffer_dirty = 0;
-     int            cdb_id_t1;
-     int            cdb_id_t2;
-     int            buf_id;
-     BufferDesc *buf;
-
-     /*
-      * Traverse the T1 and T2 list LRU to MRU in "parallel" and add all
-      * dirty buffers found in that order to the list. The ARC strategy
-      * keeps all used buffers including pinned ones in the T1 or T2 list.
-      * So we cannot miss any dirty buffers.
-      */
-     cdb_id_t1 = StrategyControl->listHead[STRAT_LIST_T1];
-     cdb_id_t2 = StrategyControl->listHead[STRAT_LIST_T2];
-
-     while (cdb_id_t1 >= 0 || cdb_id_t2 >= 0)
-     {
-         if (cdb_id_t1 >= 0)
-         {
-             buf_id = StrategyCDB[cdb_id_t1].buf_id;
-             buf = &BufferDescriptors[buf_id];
-
-             if (buf->flags & BM_VALID)
-             {
-                 if ((buf->flags & BM_DIRTY) || (buf->cntxDirty))
-                 {
-                     buffers[num_buffer_dirty] = buf;
-                     buftags[num_buffer_dirty] = buf->tag;
-                     num_buffer_dirty++;
-                     if (num_buffer_dirty >= max_buffers)
-                         break;
-                 }
-             }
-
-             cdb_id_t1 = StrategyCDB[cdb_id_t1].next;
-         }
-
-         if (cdb_id_t2 >= 0)
-         {
-             buf_id = StrategyCDB[cdb_id_t2].buf_id;
-             buf = &BufferDescriptors[buf_id];
-
-             if (buf->flags & BM_VALID)
-             {
-                 if ((buf->flags & BM_DIRTY) || (buf->cntxDirty))
-                 {
-                     buffers[num_buffer_dirty] = buf;
-                     buftags[num_buffer_dirty] = buf->tag;
-                     num_buffer_dirty++;
-                     if (num_buffer_dirty >= max_buffers)
-                         break;
-                 }
-             }
-
-             cdb_id_t2 = StrategyCDB[cdb_id_t2].next;
-         }
-     }
-
-     return num_buffer_dirty;
  }


--- 182,187 ----
***************
*** 866,886 ****
   * StrategyShmemSize
   *
   * estimate the size of shared memory used by the freelist-related structures.
   */
  int
  StrategyShmemSize(void)
  {
      int            size = 0;

!     /* size of CDB lookup hash table */
!     size += BufTableShmemSize(NBuffers * 2);

      /* size of the shared replacement strategy control block */
      size += MAXALIGN(sizeof(BufferStrategyControl));

-     /* size of the ARC directory blocks */
-     size += MAXALIGN(NBuffers * 2 * sizeof(BufferStrategyCDB));
-
      return size;
  }

--- 189,209 ----
   * StrategyShmemSize
   *
   * estimate the size of shared memory used by the freelist-related structures.
+  *
+  * Note: for somewhat historical reasons, the buffer lookup hashtable size
+  * is also determined here.
   */
  int
  StrategyShmemSize(void)
  {
      int            size = 0;

!     /* size of lookup hash table --- see note in StrategyInitialize */
!     size += BufTableShmemSize(NBuffers + MaxBackends);

      /* size of the shared replacement strategy control block */
      size += MAXALIGN(sizeof(BufferStrategyControl));

      return size;
  }

***************
*** 888,916 ****
   * StrategyInitialize -- initialize the buffer cache replacement
   *        strategy.
   *
!  * Assume: All of the buffers are already building a linked list.
   *        Only called by postmaster and only during initialization.
   */
  void
  StrategyInitialize(bool init)
  {
      bool        found;
-     int            i;

      /*
!      * Initialize the shared CDB lookup hashtable
       */
!     InitBufTable(NBuffers * 2);

      /*
!      * Get or create the shared strategy control block and the CDB's
       */
      StrategyControl = (BufferStrategyControl *)
          ShmemInitStruct("Buffer Strategy Status",
!                         sizeof(BufferStrategyControl) +
!                         sizeof(BufferStrategyCDB) * (NBuffers * 2 - 1),
                          &found);
-     StrategyCDB = &(StrategyControl->cdb[0]);

      if (!found)
      {
--- 211,239 ----
   * StrategyInitialize -- initialize the buffer cache replacement
   *        strategy.
   *
!  * Assumes: All of the buffers are already built into a linked list.
   *        Only called by postmaster and only during initialization.
   */
  void
  StrategyInitialize(bool init)
  {
      bool        found;

      /*
!      * Initialize the shared buffer lookup hashtable.  The normal steady-
!      * state load is NBuffers entries, but since BufferAlloc() inserts a
!      * new entry before deleting the old one in a replacement operation,
!      * there could be up to MaxBackends extra transient entries.
       */
!     InitBufTable(NBuffers + MaxBackends);

      /*
!      * Get or create the shared strategy control block
       */
      StrategyControl = (BufferStrategyControl *)
          ShmemInitStruct("Buffer Strategy Status",
!                         sizeof(BufferStrategyControl),
                          &found);

      if (!found)
      {
***************
*** 923,961 ****
           * Grab the whole linked list of free buffers for our strategy. We
           * assume it was previously set up by InitBufferPool().
           */
!         StrategyControl->listFreeBuffers = 0;

!         /*
!          * We start off with a target T1 list size of half the available
!          * cache blocks.
!          */
!         StrategyControl->target_T1_size = NBuffers / 2;
!
!         /*
!          * Initialize B1, T1, T2 and B2 lists to be empty
!          */
!         for (i = 0; i < STRAT_NUM_LISTS; i++)
!         {
!             StrategyControl->listHead[i] = -1;
!             StrategyControl->listTail[i] = -1;
!             StrategyControl->listSize[i] = 0;
!             StrategyControl->num_hit[i] = 0;
!         }
!         StrategyControl->num_lookup = 0;
!         StrategyControl->stat_report = 0;
!
!         /*
!          * All CDB's are linked as the listUnusedCDB
!          */
!         for (i = 0; i < NBuffers * 2; i++)
!         {
!             StrategyCDB[i].next = i + 1;
!             StrategyCDB[i].list = STRAT_LIST_UNUSED;
!             CLEAR_BUFFERTAG(StrategyCDB[i].buf_tag);
!             StrategyCDB[i].buf_id = -1;
!         }
!         StrategyCDB[NBuffers * 2 - 1].next = -1;
!         StrategyControl->listUnusedCDB = 0;
      }
      else
          Assert(!init);
--- 246,256 ----
           * Grab the whole linked list of free buffers for our strategy. We
           * assume it was previously set up by InitBufferPool().
           */
!         StrategyControl->firstFreeBuffer = 0;
!         StrategyControl->lastFreeBuffer = NBuffers - 1;

!         /* Initialize the clock sweep pointer */
!         StrategyControl->nextVictimBuffer = 0;
      }
      else
          Assert(!init);
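[The MaxBackends slack in the hash table exists because of the ordering described in the comment above: during a replacement, a backend briefly owns two entries at once.  A condensed sketch of that ordering (the real logic is in BufferAlloc() in bufmgr.c, not shown in this excerpt):

    /* Sketch: why each backend can hold one extra transient entry */
    BufTableInsert(&newTag, buf->buf_id);   /* new mapping installed first */
    /* ... write the old page out if it is dirty ... */
    BufTableDelete(&oldTag);                /* old mapping removed last */

Since at most one replacement is in flight per backend, NBuffers + MaxBackends bounds the table's population.]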
*** src/backend/storage/buffer/localbuf.c.orig    Mon Jan 10 15:02:21 2005
--- src/backend/storage/buffer/localbuf.c    Tue Feb 15 13:27:53 2005
***************
*** 24,29 ****
--- 24,33 ----

  /*#define LBDEBUG*/

+ /* Note: this macro only works on local buffers, not shared ones! */
+ #define LocalBufHdrGetBlock(bufHdr)    \
+     LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
+
  /* should be a GUC parameter some day */
  int            NLocBuffer = 64;
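[The macro is just the inverse of the local-buffer numbering.  Local slot i is exposed as Buffer number -(i + 1), and since BufferDescriptorGetBuffer() computes buf_id + 1, local descriptors carry buf_id = -(i + 2).  Working the arithmetic:

    /* slot i = 0:  buf_id = -2,  Buffer = buf_id + 1 = -1
     * slot i = 5:  buf_id = -7,  Buffer = -6
     * so -((bufHdr)->buf_id + 2) recovers the array index i: */
    int         i;

    for (i = 0; i < NLocBuffer; i++)
        Assert(-(LocalBufferDescriptors[i].buf_id + 2) == i);
]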

***************
*** 39,45 ****
   *      allocate a local buffer. We do round robin allocation for now.
   *
   * API is similar to bufmgr.c's BufferAlloc, except that we do not need
!  * to have the BufMgrLock since this is all local.    Also, IO_IN_PROGRESS
   * does not get set.
   */
  BufferDesc *
--- 43,49 ----
   *      allocate a local buffer. We do round robin allocation for now.
   *
   * API is similar to bufmgr.c's BufferAlloc, except that we do not need
!  * to do any locking since this is all local.    Also, IO_IN_PROGRESS
   * does not get set.
   */
  BufferDesc *
***************
*** 51,57 ****

      INIT_BUFFERTAG(newTag, reln, blockNum);

!     /* a low tech search for now -- not optimized for scans */
      for (i = 0; i < NLocBuffer; i++)
      {
          bufHdr = &LocalBufferDescriptors[i];
--- 55,61 ----

      INIT_BUFFERTAG(newTag, reln, blockNum);

!     /* a low tech search for now -- should use a hashtable */
      for (i = 0; i < NLocBuffer; i++)
      {
          bufHdr = &LocalBufferDescriptors[i];
***************
*** 81,101 ****
              RelationGetRelid(reln), blockNum, -nextFreeLocalBuf - 1);
  #endif

!     /* need to get a new buffer (round robin for now) */
      bufHdr = NULL;
!     for (i = 0; i < NLocBuffer; i++)
      {
!         int            b = (nextFreeLocalBuf + i) % NLocBuffer;

!         if (LocalRefCount[b] == 0)
          {
-             bufHdr = &LocalBufferDescriptors[b];
              LocalRefCount[b]++;
              ResourceOwnerRememberBuffer(CurrentResourceOwner,
                                        BufferDescriptorGetBuffer(bufHdr));
-             nextFreeLocalBuf = (b + 1) % NLocBuffer;
              break;
          }
      }
      if (bufHdr == NULL)
          ereport(ERROR,
--- 85,112 ----
              RelationGetRelid(reln), blockNum, -nextFreeLocalBuf - 1);
  #endif

!     /*
!      * Need to get a new buffer.  We use a clock sweep algorithm
!      * (essentially the same as what freelist.c does now...)
!      */
      bufHdr = NULL;
!     for (i = NLocBuffer * 2; i > 0; i--)
      {
!         int            b = nextFreeLocalBuf;
!         BufferDesc *bHdr = &LocalBufferDescriptors[b];
!
!         if (++nextFreeLocalBuf >= NLocBuffer)
!             nextFreeLocalBuf = 0;

!         if (LocalRefCount[b] == 0 && !(bHdr->flags & BM_RECENTLY_USED))
          {
              LocalRefCount[b]++;
+             bufHdr = bHdr;
              ResourceOwnerRememberBuffer(CurrentResourceOwner,
                                        BufferDescriptorGetBuffer(bufHdr));
              break;
          }
+         bHdr->flags &= ~BM_RECENTLY_USED;
      }
      if (bufHdr == NULL)
          ereport(ERROR,
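[The 2 * NLocBuffer iteration bound is what makes this a terminating second-chance sweep: the first visit to a busy-looking buffer clears BM_RECENTLY_USED, and a second visit can then claim it, so two full laps suffice whenever any buffer is unpinned.  A small trace, assuming NLocBuffer = 3 with every buffer recently used and none pinned:

    /* visit 0: slot 0 recently used -> clear flag, advance
     * visit 1: slot 1 recently used -> clear flag, advance
     * visit 2: slot 2 recently used -> clear flag, advance
     * visit 3: slot 0 now cold      -> claim it (4 <= 2 * NLocBuffer visits)
     */
]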
***************
*** 106,112 ****
       * this buffer is not referenced but it might still be dirty. if
       * that's the case, write it out before reusing it!
       */
!     if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)
      {
          SMgrRelation oreln;

--- 117,123 ----
       * this buffer is not referenced but it might still be dirty. if
       * that's the case, write it out before reusing it!
       */
!     if (bufHdr->flags & BM_DIRTY)
      {
          SMgrRelation oreln;

***************
*** 116,122 ****
          /* And write... */
          smgrwrite(oreln,
                    bufHdr->tag.blockNum,
!                   (char *) MAKE_PTR(bufHdr->data),
                    true);

          LocalBufferFlushCount++;
--- 127,133 ----
          /* And write... */
          smgrwrite(oreln,
                    bufHdr->tag.blockNum,
!                   (char *) LocalBufHdrGetBlock(bufHdr),
                    true);

          LocalBufferFlushCount++;
***************
*** 129,135 ****
       * use, so it's okay to do it (and possibly error out) before marking
       * the buffer as not dirty.
       */
!     if (bufHdr->data == (SHMEM_OFFSET) 0)
      {
          char       *data = (char *) malloc(BLCKSZ);

--- 140,146 ----
       * use, so it's okay to do it (and possibly error out) before marking
       * the buffer as not dirty.
       */
!     if (LocalBufHdrGetBlock(bufHdr) == NULL)
      {
          char       *data = (char *) malloc(BLCKSZ);

***************
*** 139,162 ****
                       errmsg("out of memory")));

          /*
-          * This is a bit of a hack: bufHdr->data needs to be a shmem
-          * offset for consistency with the shared-buffer case, so make it
-          * one even though it's not really a valid shmem offset.
-          */
-         bufHdr->data = MAKE_OFFSET(data);
-
-         /*
           * Set pointer for use by BufferGetBlock() macro.
           */
!         LocalBufferBlockPointers[-(bufHdr->buf_id + 2)] = (Block) data;
      }

      /*
       * it's all ours now.
       */
      bufHdr->tag = newTag;
!     bufHdr->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR);
!     bufHdr->cntxDirty = false;

      *foundPtr = FALSE;
      return bufHdr;
--- 150,167 ----
                       errmsg("out of memory")));

          /*
           * Set pointer for use by BufferGetBlock() macro.
           */
!         LocalBufHdrGetBlock(bufHdr) = (Block) data;
      }

      /*
       * it's all ours now.
       */
      bufHdr->tag = newTag;
!     bufHdr->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED |
!                        BM_IO_ERROR | BM_RECENTLY_USED);
!     bufHdr->flags |= BM_TAG_VALID;

      *foundPtr = FALSE;
      return bufHdr;
***************
*** 170,175 ****
--- 175,181 ----
  WriteLocalBuffer(Buffer buffer, bool release)
  {
      int            bufid;
+     BufferDesc *bufHdr;

      Assert(BufferIsLocal(buffer));

***************
*** 178,189 ****
  #endif

      bufid = -(buffer + 1);
!     LocalBufferDescriptors[bufid].flags |= BM_DIRTY;

      if (release)
      {
-         Assert(LocalRefCount[bufid] > 0);
          LocalRefCount[bufid]--;
          ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
      }
  }
--- 184,199 ----
  #endif

      bufid = -(buffer + 1);
!
!     Assert(LocalRefCount[bufid] > 0);
!
!     bufHdr = &LocalBufferDescriptors[bufid];
!     bufHdr->flags |= BM_DIRTY;

      if (release)
      {
          LocalRefCount[bufid]--;
+         bufHdr->flags |= BM_RECENTLY_USED;
          ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
      }
  }
*** src/backend/utils/resowner/resowner.c.orig    Fri Dec 31 17:46:24 2004
--- src/backend/utils/resowner/resowner.c    Sun Feb 13 18:39:59 2005
***************
*** 200,211 ****
           * that would indicate failure to clean up the executor correctly ---
           * so issue warnings.  In the abort case, just clean up quietly.
           *
!          * XXX this is fairly inefficient due to multiple BufMgrLock
!          * grabs if there are lots of buffers to be released, but we
!          * don't expect many (indeed none in the success case) so it's
!          * probably not worth optimizing.
!          *
!          * We are however careful to release back-to-front, so as to
           * avoid O(N^2) behavior in ResourceOwnerForgetBuffer().
           */
          while (owner->nbuffers > 0)
--- 200,206 ----
           * that would indicate failure to clean up the executor correctly ---
           * so issue warnings.  In the abort case, just clean up quietly.
           *
!          * We are careful to do the releasing back-to-front, so as to
           * avoid O(N^2) behavior in ResourceOwnerForgetBuffer().
           */
          while (owner->nbuffers > 0)
*** src/include/postmaster/bgwriter.h.orig    Fri Dec 31 17:47:01 2004
--- src/include/postmaster/bgwriter.h    Tue Feb 15 15:48:40 2005
***************
*** 18,25 ****

  /* GUC options */
  extern int    BgWriterDelay;
- extern int    BgWriterPercent;
- extern int    BgWriterMaxPages;
  extern int    CheckPointTimeout;
  extern int    CheckPointWarning;

--- 18,23 ----
*** src/include/storage/buf_internals.h.orig    Thu Feb  3 18:29:19 2005
--- src/include/storage/buf_internals.h    Tue Feb 15 12:51:56 2005
***************
*** 19,39 ****
  #include "storage/buf.h"
  #include "storage/lwlock.h"
  #include "storage/shmem.h"
  #include "utils/rel.h"


  /*
   * Flags for buffer descriptors
   */
  #define BM_DIRTY                (1 << 0)        /* data needs writing */
  #define BM_VALID                (1 << 1)        /* data is valid */
! #define BM_IO_IN_PROGRESS        (1 << 2)        /* read or write in
                                                   * progress */
! #define BM_IO_ERROR                (1 << 3)        /* previous I/O failed */
! #define BM_JUST_DIRTIED            (1 << 4)        /* dirtied since write
                                                   * started */
! #define BM_PIN_COUNT_WAITER        (1 << 5)        /* have waiter for sole
                                                   * pin */

  typedef bits16 BufFlags;

--- 19,45 ----
  #include "storage/buf.h"
  #include "storage/lwlock.h"
  #include "storage/shmem.h"
+ #include "storage/spin.h"
  #include "utils/rel.h"


  /*
   * Flags for buffer descriptors
+  *
+  * Note: TAG_VALID essentially means that there is a buffer hashtable
+  * entry associated with the buffer's tag.
   */
  #define BM_DIRTY                (1 << 0)        /* data needs writing */
  #define BM_VALID                (1 << 1)        /* data is valid */
! #define BM_TAG_VALID            (1 << 2)        /* tag is assigned */
! #define BM_IO_IN_PROGRESS        (1 << 3)        /* read or write in
                                                   * progress */
! #define BM_IO_ERROR                (1 << 4)        /* previous I/O failed */
! #define BM_JUST_DIRTIED            (1 << 5)        /* dirtied since write
                                                   * started */
! #define BM_PIN_COUNT_WAITER        (1 << 6)        /* have waiter for sole
                                                   * pin */
+ #define BM_RECENTLY_USED        (1 << 7)        /* buffer is recently used */

  typedef bits16 BufFlags;

***************
*** 77,121 ****

  /*
   *    BufferDesc -- shared descriptor/state data for a single shared buffer.
   */
  typedef struct sbufdesc
  {
!     Buffer        bufNext;        /* link in freelist chain */
!     SHMEM_OFFSET data;            /* pointer to data in buf pool */
!
!     /* tag and id must be together for table lookup (still true?) */
!     BufferTag    tag;            /* file/block identifier */
!     int            buf_id;            /* buffer's index number (from 0) */
!
      BufFlags    flags;            /* see bit definitions above */
      unsigned    refcount;        /* # of backends holding pins on buffer */

!     LWLockId    io_in_progress_lock;    /* to wait for I/O to complete */
!     LWLockId    cntx_lock;        /* to lock access to page context */

!     bool        cntxDirty;        /* new way to mark block as dirty */

!     /*
!      * We can't physically remove items from a disk page if another
!      * backend has the buffer pinned.  Hence, a backend may need to wait
!      * for all other pins to go away.  This is signaled by storing its own
!      * backend ID into wait_backend_id and setting flag bit
!      * BM_PIN_COUNT_WAITER. At present, there can be only one such waiter
!      * per buffer.
!      */
!     BackendId    wait_backend_id;    /* backend ID of pin-count waiter */
  } BufferDesc;

  #define BufferDescriptorGetBuffer(bdesc) ((bdesc)->buf_id + 1)


! /* in bufmgr.c */
  extern BufferDesc *BufferDescriptors;

  /* in localbuf.c */
  extern BufferDesc *LocalBufferDescriptors;

! /* counters in buf_init.c */
  extern long int ReadBufferCount;
  extern long int ReadLocalBufferCount;
  extern long int BufferHitCount;
--- 83,161 ----

  /*
   *    BufferDesc -- shared descriptor/state data for a single shared buffer.
+  *
+  * Note: buf_hdr_lock must be held to examine or change the tag, flags,
+  * refcount, or wait_backend_id fields.  The buf_id field never changes after
+  * initialization, so it does not need locking.  freeNext is protected by
+  * BufFreelistLock, not buf_hdr_lock.  The LWLocks can take care of themselves.
+  * The buf_hdr_lock is *not* used to control access to the data in the buffer!
+  *
+  * An exception is that if we have the buffer pinned, its tag can't change
+  * underneath us, so we can examine the tag without locking the spinlock.
+  * Also, in places we do one-time reads of the flags without bothering to
+  * lock the spinlock; this is generally for situations where we don't expect
+  * the flag bit under test to be changing.
+  *
+  * We can't physically remove items from a disk page if another backend has
+  * the buffer pinned.  Hence, a backend may need to wait for all other pins
+  * to go away.  This is signaled by storing its own backend ID into
+  * wait_backend_id and setting flag bit BM_PIN_COUNT_WAITER.  At present,
+  * there can be only one such waiter per buffer.
+  *
+  * We use this same struct for local buffer headers, but the lock fields
+  * are not used and not all of the flag bits are useful either.
   */
  typedef struct sbufdesc
  {
!     BufferTag    tag;            /* ID of page contained in buffer */
      BufFlags    flags;            /* see bit definitions above */
      unsigned    refcount;        /* # of backends holding pins on buffer */
+     BackendId    wait_backend_id;    /* backend ID of pin-count waiter */

!     slock_t        buf_hdr_lock;    /* protects the above fields */

!     int            buf_id;            /* buffer's index number (from 0) */
!     int            freeNext;        /* link in freelist chain */

!     LWLockId    io_in_progress_lock;    /* to wait for I/O to complete */
!     LWLockId    content_lock;    /* to lock access to buffer contents */
  } BufferDesc;

  #define BufferDescriptorGetBuffer(bdesc) ((bdesc)->buf_id + 1)

+ /*
+  * The freeNext field is either the index of the next freelist entry,
+  * or one of these special values:
+  */
+ #define FREENEXT_END_OF_LIST    (-1)
+ #define FREENEXT_NOT_IN_LIST    (-2)
+
+ /*
+  * Macros for acquiring/releasing a buffer header's spinlock.  The
+  * NoHoldoff cases may be used when we know that we hold some LWLock
+  * and therefore interrupts are already held off.  Do not apply these
+  * to local buffers!
+  */
+ #define LockBufHdr(bufHdr)  \
+     SpinLockAcquire(&(bufHdr)->buf_hdr_lock)
+ #define UnlockBufHdr(bufHdr)  \
+     SpinLockRelease(&(bufHdr)->buf_hdr_lock)
+ #define LockBufHdr_NoHoldoff(bufHdr)  \
+     SpinLockAcquire_NoHoldoff(&(bufHdr)->buf_hdr_lock)
+ #define UnlockBufHdr_NoHoldoff(bufHdr)  \
+     SpinLockRelease_NoHoldoff(&(bufHdr)->buf_hdr_lock)

!
! /* in buf_init.c */
  extern BufferDesc *BufferDescriptors;

  /* in localbuf.c */
  extern BufferDesc *LocalBufferDescriptors;

! /* in freelist.c */
! extern bool strategy_hint_vacuum;
!
! /* event counters in buf_init.c */
  extern long int ReadBufferCount;
  extern long int ReadLocalBufferCount;
  extern long int BufferHitCount;
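[Under these rules the old BufMgrLock critical sections shrink to a few instructions under a per-buffer spinlock.  A minimal sketch of a pin operation, assuming the usual PrivateRefCount bookkeeping (the real PinBuffer() is in the bufmgr.c part of the patch, not shown here):

    static void
    pin_buffer_sketch(BufferDesc *buf)
    {
        LockBufHdr(buf);                    /* guards flags/refcount/tag */
        buf->refcount++;
        UnlockBufHdr(buf);

        PrivateRefCount[buf->buf_id]++;     /* backend-local, needs no lock */
    }
]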
***************
*** 129,143 ****
   */

  /* freelist.c */
! extern BufferDesc *StrategyBufferLookup(BufferTag *tagPtr, bool recheck,
!                      int *cdb_found_index);
! extern BufferDesc *StrategyGetBuffer(int *cdb_replace_index);
! extern void StrategyReplaceBuffer(BufferDesc *buf, BufferTag *newTag,
!                       int cdb_found_index, int cdb_replace_index);
! extern void StrategyInvalidateBuffer(BufferDesc *buf);
! extern void StrategyHintVacuum(bool vacuum_active);
! extern int StrategyDirtyBufferList(BufferDesc **buffers, BufferTag *buftags,
!                         int max_buffers);
  extern int    StrategyShmemSize(void);
  extern void StrategyInitialize(bool init);

--- 169,177 ----
   */

  /* freelist.c */
! extern BufferDesc *StrategyGetBuffer(void);
! extern void StrategyFreeBuffer(BufferDesc *buf, bool at_head);
! extern int    StrategySyncStart(void);
  extern int    StrategyShmemSize(void);
  extern void StrategyInitialize(bool init);
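[The freelist API shrinks from six ARC entry points to three.  A sketch of how a caller might use the pair during victim selection (an assumption about the caller; BufferAlloc() is not shown in this excerpt):

    BufferDesc *buf = StrategyGetBuffer();  /* freelist head, else clock sweep */

    /* caller pins it, flushes it if dirty, installs the new tag ...
     * and if the buffer turns out to be unneeded it can be handed back: */
    StrategyFreeBuffer(buf, true);          /* true = push on freelist head */
]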

***************
*** 145,151 ****
  extern int    BufTableShmemSize(int size);
  extern void InitBufTable(int size);
  extern int    BufTableLookup(BufferTag *tagPtr);
! extern void BufTableInsert(BufferTag *tagPtr, int buf_id);
  extern void BufTableDelete(BufferTag *tagPtr);

  /* localbuf.c */
--- 179,185 ----
  extern int    BufTableShmemSize(int size);
  extern void InitBufTable(int size);
  extern int    BufTableLookup(BufferTag *tagPtr);
! extern int    BufTableInsert(BufferTag *tagPtr, int buf_id);
  extern void BufTableDelete(BufferTag *tagPtr);

  /* localbuf.c */
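[BufTableInsert() now returns int instead of void.  Reading the signature change (an inference; the new buf_table.c is not in this excerpt), the obvious use is collision detection: the return value can report an existing buffer already mapped to the same tag, which matters now that two backends can race to install a mapping without BufMgrLock serializing them:

    int         existing_id = BufTableInsert(&newTag, buf->buf_id);

    if (existing_id >= 0)
    {
        /* somebody else mapped this page first; give up our candidate
         * buffer and use theirs instead */
    }
]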
*** src/include/storage/bufmgr.h.orig    Fri Dec 31 17:47:02 2004
--- src/include/storage/bufmgr.h    Tue Feb 15 15:48:33 2005
***************
*** 27,47 ****

  /* in bufmgr.c */
  extern bool zero_damaged_pages;

  /* in buf_init.c */
  extern DLLIMPORT Block *BufferBlockPointers;
! extern int32 *PrivateRefCount;

  /* in localbuf.c */
  extern DLLIMPORT int NLocBuffer;
  extern DLLIMPORT Block *LocalBufferBlockPointers;
! extern int32 *LocalRefCount;

  /* special block number for ReadBuffer() */
  #define P_NEW    InvalidBlockNumber        /* grow the file to get a new page */

  /*
!  * Buffer context lock modes
   */
  #define BUFFER_LOCK_UNLOCK        0
  #define BUFFER_LOCK_SHARE        1
--- 27,49 ----

  /* in bufmgr.c */
  extern bool zero_damaged_pages;
+ extern int    BgWriterPercent;
+ extern int    BgWriterMaxPages;

  /* in buf_init.c */
  extern DLLIMPORT Block *BufferBlockPointers;
! extern DLLIMPORT int32 *PrivateRefCount;

  /* in localbuf.c */
  extern DLLIMPORT int NLocBuffer;
  extern DLLIMPORT Block *LocalBufferBlockPointers;
! extern DLLIMPORT int32 *LocalRefCount;

  /* special block number for ReadBuffer() */
  #define P_NEW    InvalidBlockNumber        /* grow the file to get a new page */

  /*
!  * Buffer content lock modes (mode argument for LockBuffer())
   */
  #define BUFFER_LOCK_UNLOCK        0
  #define BUFFER_LOCK_SHARE        1
***************
*** 150,157 ****
  extern void AbortBufferIO(void);

  extern void BufmgrCommit(void);
! extern int    BufferSync(int percent, int maxpages);

  extern void InitLocalBuffer(void);

  #endif
--- 152,163 ----
  extern void AbortBufferIO(void);

  extern void BufmgrCommit(void);
! extern void    BufferSync(void);
! extern int    BgBufferSync(void);

  extern void InitLocalBuffer(void);
+
+ /* in freelist.c */
+ extern void StrategyHintVacuum(bool vacuum_active);

  #endif
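[BufferSync() loses its arguments and reverts to "write everything dirty" for checkpoints, while the bgwriter's incremental writing moves into the new BgBufferSync(), throttled by BgWriterPercent and BgWriterMaxPages (which migrate here from bgwriter.h).  The resulting shape of the bgwriter's main loop, sketched with hypothetical names:

    for (;;)
    {
        if (checkpoint_requested)
        {
            /* checkpoint path: CreateCheckPoint() flushes via BufferSync() */
        }
        else
            BgBufferSync();         /* incremental pass, bounded by
                                     * BgWriterPercent / BgWriterMaxPages */

        /* nap for BgWriterDelay milliseconds, then loop */
    }
]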
*** src/include/storage/lwlock.h.orig    Fri Dec 31 17:47:04 2004
--- src/include/storage/lwlock.h    Sun Feb 13 18:34:50 2005
***************
*** 25,31 ****
   */
  typedef enum LWLockId
  {
!     BufMgrLock,
      LockMgrLock,
      OidGenLock,
      XidGenLock,
--- 25,32 ----
   */
  typedef enum LWLockId
  {
!     BufMappingLock,
!     BufFreelistLock,
      LockMgrLock,
      OidGenLock,
      XidGenLock,
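[This is the heart of the scalability change: the single BufMgrLock splits into two LWLocks with disjoint jobs, with the per-buffer spinlocks and content locks covering the rest.  The division of labor, as it falls out of the pieces above:

    /* BufMappingLock   -- the tag -> buffer hashtable (BufTable* calls)
     * BufFreelistLock  -- freeNext links and the nextVictimBuffer clock hand
     * buf_hdr_lock     -- per-buffer tag/flags/refcount (spinlock)
     * content_lock     -- the page contents themselves (per-buffer LWLock)
     */
]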
