Tom Lane wrote:
> Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> writes:
>> The ReadBuffer() interface is already pretty complex, with all the
>> different variants. We should probably keep the good old ReadBuffer()
>> the same, for the sake of simplicity in the callers, but try to reduce
>> the number of other variants.
>
> Indeed. Did you see the discussion about the similarly-too-complex
> heap_insert API a couple days ago in connection with bulk-write
> scenarios? The conclusion there was to try to shift stuff into a
> bitmask options argument, in hopes that future additions might not
> require touching every caller. Can we do it similarly here?
Hmm. I think an enum is better than a bitmask here. At the moment, we
need three different modes of operation:
1. Read the page as usual, throw an error on a corrupted page (ReadBuffer())
2. Read the page, zero the page on corruption (this is new)
3. Don't read the page from disk, just allocate a buffer (ReadOrZeroBuffer())
If we turned this into a bitmask, what would the bits be? Perhaps:
DONT_READ /* don't read the page from disk, just allocate buffer */
NO_ERROR_ON_CORRUPTION /* don't throw an error if page is corrupt */
With two bits, there are four different combinations. I don't think the
DONT_READ | NO_ERROR_ON_CORRUPTION combination makes much sense. Also,
negative flags like these can be confusing, but if we inverted their
meanings, most callers would have to pass both flags to get the normal
behavior.
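
To make that concrete: suppose we added a flags argument to a ReadBuffer
variant (this is purely illustrative, the flag values are made up):

    #define DONT_READ               0x01
    #define NO_ERROR_ON_CORRUPTION  0x02

    buf = ReadBuffer(rel, blkno, 0);                       /* mode 1 */
    buf = ReadBuffer(rel, blkno, NO_ERROR_ON_CORRUPTION);  /* mode 2 */
    buf = ReadBuffer(rel, blkno, DONT_READ);               /* mode 3 */
    /* DONT_READ | NO_ERROR_ON_CORRUPTION: meaningless */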
Looking into the crystal ball, there are two forthcoming features for this
interface that I can see:
1. Pin the buffer if the page is in buffer cache. If it's not, do
nothing. This is what Simon proposed for the B-tree vacuum interlocking,
and I can see that it might be useful elsewhere as well.
2. The posix_fadvise() thing. Or async I/O. It looks like it's going to
be a separate function you call before ReadBuffer(), but it could also
be implemented as a new mode to ReadBuffer() that just allocates a
buffer, issues a posix_fadvise(), and returns. You would then pass the
Buffer to another function to finish the read and make the contents of
the buffer valid.
Neither of these fits well into a bitmask, and neither would make sense
combined with DONT_READ or NO_ERROR_ON_CORRUPTION.
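
For illustration only, (1) could become a new mode and (2) a two-step call,
along these lines (all of these names are invented):

    /* (1): pin the page only if it's already in the buffer cache */
    buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_IF_IN_CACHE, NULL);
    if (BufferIsValid(buf))
        ...

    /* (2): start the read asynchronously, finish it later */
    buf = StartReadBuffer(rel, blkno);  /* allocate buffer, posix_fadvise() */
    /* ... do other work ... */
    WaitReadBuffer(buf);                /* finish the read; contents now valid */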
So, attached is a patch using an enum. Barring objections, I'll commit this.
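
For reference, the enum as it appears in the patch:

    typedef enum
    {
        RBM_NORMAL,        /* Normal read */
        RBM_ZERO,          /* Don't read from disk, caller will initialize */
        RBM_ZERO_ON_ERROR  /* Read, but return an all-zeros page on error */
    } ReadBufferMode;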
There is a conflict with Simon's hot standby patch, though. Simon's
patch adds yet another argument to XLogReadBufferWithFork(), to indicate
whether a normal exclusive lock or a cleanup lock is taken on the
buffer. I'm inclined to change the interface of XLogReadBufferExtended
(as it's now called, after this patch) so that it only pins the page,
and leave the locking to the caller.
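
With that change, a redo function would do roughly this (sketch only):

    buf = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno, RBM_NORMAL);
    if (BufferIsValid(buf))
    {
        /* the caller chooses the lock type */
        LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); /* or LockBufferForCleanup() */
        ...
    }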
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
*** src/backend/access/gin/ginvacuum.c
--- src/backend/access/gin/ginvacuum.c
***************
*** 155,164 **** xlogVacuumPage(Relation index, Buffer buffer)
static bool
ginVacuumPostingTreeLeaves(GinVacuumState *gvs, BlockNumber blkno, bool isRoot, Buffer *rootBuffer)
{
! Buffer buffer = ReadBufferWithStrategy(gvs->index, blkno, gvs->strategy);
! Page page = BufferGetPage(buffer);
bool hasVoidPage = FALSE;
/*
* We should be sure that we don't concurrent with inserts, insert process
* never release root page until end (but it can unlock it and lock
--- 155,168 ----
static bool
ginVacuumPostingTreeLeaves(GinVacuumState *gvs, BlockNumber blkno, bool isRoot, Buffer *rootBuffer)
{
! Buffer buffer;
! Page page;
bool hasVoidPage = FALSE;
+ buffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, blkno,
+ RBM_NORMAL, gvs->strategy);
+ page = BufferGetPage(buffer);
+
/*
* We should be sure that we don't concurrent with inserts, insert process
* never release root page until end (but it can unlock it and lock
***************
*** 241,253 **** static void
ginDeletePage(GinVacuumState *gvs, BlockNumber deleteBlkno, BlockNumber leftBlkno,
BlockNumber parentBlkno, OffsetNumber myoff, bool isParentRoot)
{
! Buffer dBuffer = ReadBufferWithStrategy(gvs->index, deleteBlkno, gvs->strategy);
! Buffer lBuffer = (leftBlkno == InvalidBlockNumber) ?
! InvalidBuffer : ReadBufferWithStrategy(gvs->index, leftBlkno, gvs->strategy);
! Buffer pBuffer = ReadBufferWithStrategy(gvs->index, parentBlkno, gvs->strategy);
Page page,
parentPage;
LockBuffer(dBuffer, GIN_EXCLUSIVE);
if (!isParentRoot) /* parent is already locked by
* LockBufferForCleanup() */
--- 245,268 ----
ginDeletePage(GinVacuumState *gvs, BlockNumber deleteBlkno, BlockNumber leftBlkno,
BlockNumber parentBlkno, OffsetNumber myoff, bool isParentRoot)
{
! Buffer dBuffer;
! Buffer lBuffer;
! Buffer pBuffer;
Page page,
parentPage;
+ dBuffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, deleteBlkno,
+ RBM_NORMAL, gvs->strategy);
+
+ if (leftBlkno != InvalidBlockNumber)
+ lBuffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, leftBlkno,
+ RBM_NORMAL, gvs->strategy);
+ else
+ lBuffer = InvalidBuffer;
+
+ pBuffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, parentBlkno,
+ RBM_NORMAL, gvs->strategy);
+
LockBuffer(dBuffer, GIN_EXCLUSIVE);
if (!isParentRoot) /* parent is already locked by
* LockBufferForCleanup() */
***************
*** 401,407 **** ginScanToDelete(GinVacuumState *gvs, BlockNumber blkno, bool isRoot, DataPageDel
me = parent->child;
}
! buffer = ReadBufferWithStrategy(gvs->index, blkno, gvs->strategy);
page = BufferGetPage(buffer);
Assert(GinPageIsData(page));
--- 416,423 ----
me = parent->child;
}
! buffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, blkno,
! RBM_NORMAL, gvs->strategy);
page = BufferGetPage(buffer);
Assert(GinPageIsData(page));
***************
*** 589,595 **** ginbulkdelete(PG_FUNCTION_ARGS)
gvs.strategy = info->strategy;
initGinState(&gvs.ginstate, index);
! buffer = ReadBufferWithStrategy(index, blkno, info->strategy);
/* find leaf page */
for (;;)
--- 605,612 ----
gvs.strategy = info->strategy;
initGinState(&gvs.ginstate, index);
! buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
! RBM_NORMAL, info->strategy);
/* find leaf page */
for (;;)
***************
*** 621,627 **** ginbulkdelete(PG_FUNCTION_ARGS)
Assert(blkno != InvalidBlockNumber);
UnlockReleaseBuffer(buffer);
! buffer = ReadBufferWithStrategy(index, blkno, info->strategy);
}
/* right now we found leftmost page in entry's BTree */
--- 638,645 ----
Assert(blkno != InvalidBlockNumber);
UnlockReleaseBuffer(buffer);
! buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
! RBM_NORMAL, info->strategy);
}
/* right now we found leftmost page in entry's BTree */
***************
*** 663,669 **** ginbulkdelete(PG_FUNCTION_ARGS)
if (blkno == InvalidBlockNumber) /* rightmost page */
break;
! buffer = ReadBufferWithStrategy(index, blkno, info->strategy);
LockBuffer(buffer, GIN_EXCLUSIVE);
}
--- 681,688 ----
if (blkno == InvalidBlockNumber) /* rightmost page */
break;
! buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
! RBM_NORMAL, info->strategy);
LockBuffer(buffer, GIN_EXCLUSIVE);
}
***************
*** 718,724 **** ginvacuumcleanup(PG_FUNCTION_ARGS)
vacuum_delay_point();
! buffer = ReadBufferWithStrategy(index, blkno, info->strategy);
LockBuffer(buffer, GIN_SHARE);
page = (Page) BufferGetPage(buffer);
--- 737,744 ----
vacuum_delay_point();
! buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
! RBM_NORMAL, info->strategy);
LockBuffer(buffer, GIN_SHARE);
page = (Page) BufferGetPage(buffer);
*** src/backend/access/gist/gistvacuum.c
--- src/backend/access/gist/gistvacuum.c
***************
*** 86,92 **** gistDeleteSubtree(GistVacuum *gv, BlockNumber blkno)
Buffer buffer;
Page page;
! buffer = ReadBufferWithStrategy(gv->index, blkno, gv->strategy);
LockBuffer(buffer, GIST_EXCLUSIVE);
page = (Page) BufferGetPage(buffer);
--- 86,93 ----
Buffer buffer;
Page page;
! buffer = ReadBufferExtended(gv->index, MAIN_FORKNUM, blkno, RBM_NORMAL,
! gv->strategy);
LockBuffer(buffer, GIST_EXCLUSIVE);
page = (Page) BufferGetPage(buffer);
***************
*** 306,312 **** gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
vacuum_delay_point();
! buffer = ReadBufferWithStrategy(gv->index, blkno, gv->strategy);
LockBuffer(buffer, GIST_EXCLUSIVE);
gistcheckpage(gv->index, buffer);
page = (Page) BufferGetPage(buffer);
--- 307,314 ----
vacuum_delay_point();
! buffer = ReadBufferExtended(gv->index, MAIN_FORKNUM, blkno, RBM_NORMAL,
! gv->strategy);
LockBuffer(buffer, GIST_EXCLUSIVE);
gistcheckpage(gv->index, buffer);
page = (Page) BufferGetPage(buffer);
***************
*** 595,601 **** gistvacuumcleanup(PG_FUNCTION_ARGS)
vacuum_delay_point();
! buffer = ReadBufferWithStrategy(rel, blkno, info->strategy);
LockBuffer(buffer, GIST_SHARE);
page = (Page) BufferGetPage(buffer);
--- 597,604 ----
vacuum_delay_point();
! buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
! info->strategy);
LockBuffer(buffer, GIST_SHARE);
page = (Page) BufferGetPage(buffer);
***************
*** 691,703 **** gistbulkdelete(PG_FUNCTION_ARGS)
while (stack)
{
! Buffer buffer = ReadBufferWithStrategy(rel, stack->blkno, info->strategy);
Page page;
OffsetNumber i,
maxoff;
IndexTuple idxtuple;
ItemId iid;
LockBuffer(buffer, GIST_SHARE);
gistcheckpage(rel, buffer);
page = (Page) BufferGetPage(buffer);
--- 694,708 ----
while (stack)
{
! Buffer buffer;
Page page;
OffsetNumber i,
maxoff;
IndexTuple idxtuple;
ItemId iid;
+ buffer = ReadBufferExtended(rel, MAIN_FORKNUM, stack->blkno,
+ RBM_NORMAL, info->strategy);
LockBuffer(buffer, GIST_SHARE);
gistcheckpage(rel, buffer);
page = (Page) BufferGetPage(buffer);
*** src/backend/access/hash/hashpage.c
--- src/backend/access/hash/hashpage.c
***************
*** 158,164 **** _hash_getinitbuf(Relation rel, BlockNumber blkno)
if (blkno == P_NEW)
elog(ERROR, "hash AM does not use P_NEW");
! buf = ReadOrZeroBuffer(rel, MAIN_FORKNUM, blkno);
LockBuffer(buf, HASH_WRITE);
--- 158,164 ----
if (blkno == P_NEW)
elog(ERROR, "hash AM does not use P_NEW");
! buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_ZERO, NULL);
LockBuffer(buf, HASH_WRITE);
***************
*** 203,209 **** _hash_getnewbuf(Relation rel, BlockNumber blkno)
BufferGetBlockNumber(buf), blkno);
}
else
! buf = ReadOrZeroBuffer(rel, MAIN_FORKNUM, blkno);
LockBuffer(buf, HASH_WRITE);
--- 203,209 ----
BufferGetBlockNumber(buf), blkno);
}
else
! buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_ZERO, NULL);
LockBuffer(buf, HASH_WRITE);
***************
*** 231,237 **** _hash_getbuf_with_strategy(Relation rel, BlockNumber blkno,
if (blkno == P_NEW)
elog(ERROR, "hash AM does not use P_NEW");
! buf = ReadBufferWithStrategy(rel, blkno, bstrategy);
if (access != HASH_NOLOCK)
LockBuffer(buf, access);
--- 231,237 ----
if (blkno == P_NEW)
elog(ERROR, "hash AM does not use P_NEW");
! buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, bstrategy);
if (access != HASH_NOLOCK)
LockBuffer(buf, access);
*** src/backend/access/heap/heapam.c
--- src/backend/access/heap/heapam.c
***************
*** 205,213 **** heapgetpage(HeapScanDesc scan, BlockNumber page)
}
/* read page using selected strategy */
! scan->rs_cbuf = ReadBufferWithStrategy(scan->rs_rd,
! page,
! scan->rs_strategy);
scan->rs_cblock = page;
if (!scan->rs_pageatatime)
--- 205,212 ----
}
/* read page using selected strategy */
! scan->rs_cbuf = ReadBufferExtended(scan->rs_rd, MAIN_FORKNUM, page,
! RBM_NORMAL, scan->rs_strategy);
scan->rs_cblock = page;
if (!scan->rs_pageatatime)
*** src/backend/access/nbtree/nbtree.c
--- src/backend/access/nbtree/nbtree.c
***************
*** 750,756 **** restart:
* recycle all-zero pages, not fail. Also, we want to use a nondefault
* buffer access strategy.
*/
! buf = ReadBufferWithStrategy(rel, blkno, info->strategy);
LockBuffer(buf, BT_READ);
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
--- 750,757 ----
* recycle all-zero pages, not fail. Also, we want to use a nondefault
* buffer access strategy.
*/
! buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
! info->strategy);
LockBuffer(buf, BT_READ);
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
*** src/backend/access/transam/xlog.c
--- src/backend/access/transam/xlog.c
***************
*** 2897,2904 **** RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
memcpy(&bkpb, blk, sizeof(BkpBlock));
blk += sizeof(BkpBlock);
! buffer = XLogReadBufferWithFork(bkpb.node, bkpb.fork, bkpb.block,
! true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
--- 2897,2904 ----
memcpy(&bkpb, blk, sizeof(BkpBlock));
blk += sizeof(BkpBlock);
! buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block,
! RBM_ZERO);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
*** src/backend/access/transam/xlogutils.c
--- src/backend/access/transam/xlogutils.c
***************
*** 200,205 **** XLogCheckInvalidPages(void)
--- 200,219 ----
invalid_page_tab = NULL;
}
+ /*
+ * XLogReadBuffer
+ * A shorthand for XLogReadBufferExtended(), for reading from the main
+ * fork.
+ *
+ * For legacy reasons, instead of a ReadBufferMode argument, this only
+ * supports RBM_ZERO (init == true) and RBM_NORMAL (init == false) modes.
+ */
+ Buffer
+ XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init)
+ {
+ return XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno,
+ init ? RBM_ZERO : RBM_NORMAL);
+ }
/*
* XLogReadBuffer
***************
*** 211,244 **** XLogCheckInvalidPages(void)
* expect that this is only used during single-process XLOG replay, but
* some subroutines such as MarkBufferDirty will complain if we don't.)
*
! * If "init" is true then the caller intends to rewrite the page fully
! * using the info in the XLOG record. In this case we will extend the
! * relation if needed to make the page exist, and we will not complain about
! * the page being "new" (all zeroes); in fact, we usually will supply a
! * zeroed buffer without reading the page at all, so as to avoid unnecessary
! * failure if the page is present on disk but has corrupt headers.
*
! * If "init" is false then the caller needs the page to be valid already.
! * If the page doesn't exist or contains zeroes, we return InvalidBuffer.
* In this case the caller should silently skip the update on this page.
* (In this situation, we expect that the page was later dropped or truncated.
* If we don't see evidence of that later in the WAL sequence, we'll complain
* at the end of WAL replay.)
*/
Buffer
! XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init)
! {
! return XLogReadBufferWithFork(rnode, MAIN_FORKNUM, blkno, init);
! }
!
! /*
! * XLogReadBufferWithFork
! * Like XLogReadBuffer, but for reading other relation forks than
! * the main one.
! */
! Buffer
! XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
! BlockNumber blkno, bool init)
{
BlockNumber lastblock;
Buffer buffer;
--- 225,246 ----
* expect that this is only used during single-process XLOG replay, but
* some subroutines such as MarkBufferDirty will complain if we don't.)
*
! * There are a couple of differences in the behavior wrt. the "mode" argument,
! * compared to ReadBufferExtended:
*
! * In RBM_NORMAL mode, if the page doesn't exist, or contains all-zeroes, we
! * return InvalidBuffer.
* In this case the caller should silently skip the update on this page.
* (In this situation, we expect that the page was later dropped or truncated.
* If we don't see evidence of that later in the WAL sequence, we'll complain
* at the end of WAL replay.)
+ *
+ * In RBM_ZERO and RBM_ZERO_ON_ERROR modes, if the page doesn't exist, the
+ * relation is extended with all-zeroes pages up to the given block number.
*/
Buffer
! XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
! BlockNumber blkno, ReadBufferMode mode)
{
BlockNumber lastblock;
Buffer buffer;
***************
*** 264,275 **** XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
if (blkno < lastblock)
{
/* page exists in file */
! buffer = ReadBufferWithoutRelcache(rnode, false, forknum, blkno, init);
}
else
{
/* hm, page doesn't exist in file */
! if (!init)
{
log_invalid_page(rnode, forknum, blkno, false);
return InvalidBuffer;
--- 266,278 ----
if (blkno < lastblock)
{
/* page exists in file */
! buffer = ReadBufferWithoutRelcache(rnode, false, forknum, blkno,
! mode, NULL);
}
else
{
/* hm, page doesn't exist in file */
! if (mode == RBM_NORMAL)
{
log_invalid_page(rnode, forknum, blkno, false);
return InvalidBuffer;
***************
*** 283,289 **** XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
if (buffer != InvalidBuffer)
ReleaseBuffer(buffer);
buffer = ReadBufferWithoutRelcache(rnode, false, forknum,
! P_NEW, false);
lastblock++;
}
Assert(BufferGetBlockNumber(buffer) == blkno);
--- 286,292 ----
if (buffer != InvalidBuffer)
ReleaseBuffer(buffer);
buffer = ReadBufferWithoutRelcache(rnode, false, forknum,
! P_NEW, mode, NULL);
lastblock++;
}
Assert(BufferGetBlockNumber(buffer) == blkno);
***************
*** 291,297 **** XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
! if (!init)
{
/* check that page has been initialized */
Page page = (Page) BufferGetPage(buffer);
--- 294,300 ----
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
! if (mode == RBM_NORMAL)
{
/* check that page has been initialized */
Page page = (Page) BufferGetPage(buffer);
*** src/backend/commands/analyze.c
--- src/backend/commands/analyze.c
***************
*** 911,917 **** acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
* each tuple, but since we aren't doing much work per tuple, the
* extra lock traffic is probably better avoided.
*/
! targbuffer = ReadBufferWithStrategy(onerel, targblock, vac_strategy);
LockBuffer(targbuffer, BUFFER_LOCK_SHARE);
targpage = BufferGetPage(targbuffer);
maxoffset = PageGetMaxOffsetNumber(targpage);
--- 911,918 ----
* each tuple, but since we aren't doing much work per tuple, the
* extra lock traffic is probably better avoided.
*/
! targbuffer = ReadBufferExtended(onerel, MAIN_FORKNUM, targblock,
! RBM_NORMAL, vac_strategy);
LockBuffer(targbuffer, BUFFER_LOCK_SHARE);
targpage = BufferGetPage(targbuffer);
maxoffset = PageGetMaxOffsetNumber(targpage);
*** src/backend/commands/vacuum.c
--- src/backend/commands/vacuum.c
***************
*** 1348,1354 **** scan_heap(VRelStats *vacrelstats, Relation onerel,
vacuum_delay_point();
! buf = ReadBufferWithStrategy(onerel, blkno, vac_strategy);
page = BufferGetPage(buf);
/*
--- 1348,1355 ----
vacuum_delay_point();
! buf = ReadBufferExtended(onerel, MAIN_FORKNUM, blkno, RBM_NORMAL,
! vac_strategy);
page = BufferGetPage(buf);
/*
***************
*** 1919,1925 **** repair_frag(VRelStats *vacrelstats, Relation onerel,
/*
* Process this page of relation.
*/
! buf = ReadBufferWithStrategy(onerel, blkno, vac_strategy);
page = BufferGetPage(buf);
vacpage->offsets_free = 0;
--- 1920,1927 ----
/*
* Process this page of relation.
*/
! buf = ReadBufferExtended(onerel, MAIN_FORKNUM, blkno, RBM_NORMAL,
! vac_strategy);
page = BufferGetPage(buf);
vacpage->offsets_free = 0;
***************
*** 2173,2181 **** repair_frag(VRelStats *vacrelstats, Relation onerel,
nextTid = tp.t_data->t_ctid;
priorXmax = HeapTupleHeaderGetXmax(tp.t_data);
/* assume block# is OK (see heap_fetch comments) */
! nextBuf = ReadBufferWithStrategy(onerel,
ItemPointerGetBlockNumber(&nextTid),
! vac_strategy);
nextPage = BufferGetPage(nextBuf);
/* If bogus or unused slot, assume tp is end of chain */
nextOffnum = ItemPointerGetOffsetNumber(&nextTid);
--- 2175,2183 ----
nextTid = tp.t_data->t_ctid;
priorXmax = HeapTupleHeaderGetXmax(tp.t_data);
/* assume block# is OK (see heap_fetch comments) */
! nextBuf = ReadBufferExtended(onerel, MAIN_FORKNUM,
ItemPointerGetBlockNumber(&nextTid),
! RBM_NORMAL, vac_strategy);
nextPage = BufferGetPage(nextBuf);
/* If bogus or unused slot, assume tp is end of chain */
nextOffnum = ItemPointerGetOffsetNumber(&nextTid);
***************
*** 2318,2326 **** repair_frag(VRelStats *vacrelstats, Relation onerel,
break; /* out of check-all-items loop */
}
tp.t_self = vtlp->this_tid;
! Pbuf = ReadBufferWithStrategy(onerel,
ItemPointerGetBlockNumber(&(tp.t_self)),
! vac_strategy);
Ppage = BufferGetPage(Pbuf);
Pitemid = PageGetItemId(Ppage,
ItemPointerGetOffsetNumber(&(tp.t_self)));
--- 2320,2328 ----
break; /* out of check-all-items loop */
}
tp.t_self = vtlp->this_tid;
! Pbuf = ReadBufferExtended(onerel, MAIN_FORKNUM,
ItemPointerGetBlockNumber(&(tp.t_self)),
! RBM_NORMAL, vac_strategy);
Ppage = BufferGetPage(Pbuf);
Pitemid = PageGetItemId(Ppage,
ItemPointerGetOffsetNumber(&(tp.t_self)));
***************
*** 2402,2415 **** repair_frag(VRelStats *vacrelstats, Relation onerel,
/* Get page to move from */
tuple.t_self = vtmove[ti].tid;
! Cbuf = ReadBufferWithStrategy(onerel,
ItemPointerGetBlockNumber(&(tuple.t_self)),
! vac_strategy);
/* Get page to move to */
! dst_buffer = ReadBufferWithStrategy(onerel,
! destvacpage->blkno,
! vac_strategy);
LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
if (dst_buffer != Cbuf)
--- 2404,2417 ----
/* Get page to move from */
tuple.t_self = vtmove[ti].tid;
! Cbuf = ReadBufferExtended(onerel, MAIN_FORKNUM,
ItemPointerGetBlockNumber(&(tuple.t_self)),
! RBM_NORMAL, vac_strategy);
/* Get page to move to */
! dst_buffer = ReadBufferExtended(onerel, MAIN_FORKNUM,
! destvacpage->blkno,
! RBM_NORMAL, vac_strategy);
LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
if (dst_buffer != Cbuf)
***************
*** 2502,2510 **** repair_frag(VRelStats *vacrelstats, Relation onerel,
if (i == num_fraged_pages)
break; /* can't move item anywhere */
dst_vacpage = fraged_pages->pagedesc[i];
! dst_buffer = ReadBufferWithStrategy(onerel,
! dst_vacpage->blkno,
! vac_strategy);
LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
dst_page = BufferGetPage(dst_buffer);
/* if this page was not used before - clean it */
--- 2504,2512 ----
if (i == num_fraged_pages)
break; /* can't move item anywhere */
dst_vacpage = fraged_pages->pagedesc[i];
! dst_buffer = ReadBufferExtended(onerel, MAIN_FORKNUM,
! dst_vacpage->blkno,
! RBM_NORMAL, vac_strategy);
LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
dst_page = BufferGetPage(dst_buffer);
/* if this page was not used before - clean it */
***************
*** 2681,2689 **** repair_frag(VRelStats *vacrelstats, Relation onerel,
Page page;
/* this page was not used as a move target, so must clean it */
! buf = ReadBufferWithStrategy(onerel,
! (*curpage)->blkno,
! vac_strategy);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
page = BufferGetPage(buf);
if (!PageIsEmpty(page))
--- 2683,2690 ----
Page page;
/* this page was not used as a move target, so must clean it */
! buf = ReadBufferExtended(onerel, MAIN_FORKNUM, (*curpage)->blkno,
! RBM_NORMAL, vac_strategy);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
page = BufferGetPage(buf);
if (!PageIsEmpty(page))
***************
*** 2770,2776 **** repair_frag(VRelStats *vacrelstats, Relation onerel,
int uncnt = 0;
int num_tuples = 0;
! buf = ReadBufferWithStrategy(onerel, vacpage->blkno, vac_strategy);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
page = BufferGetPage(buf);
maxoff = PageGetMaxOffsetNumber(page);
--- 2771,2778 ----
int uncnt = 0;
int num_tuples = 0;
! buf = ReadBufferExtended(onerel, MAIN_FORKNUM, vacpage->blkno,
! RBM_NORMAL, vac_strategy);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
page = BufferGetPage(buf);
maxoff = PageGetMaxOffsetNumber(page);
***************
*** 3150,3156 **** update_hint_bits(Relation rel, VacPageList fraged_pages, int num_fraged_pages,
break; /* no need to scan any further */
if ((*curpage)->offsets_used == 0)
continue; /* this page was never used as a move dest */
! buf = ReadBufferWithStrategy(rel, (*curpage)->blkno, vac_strategy);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
page = BufferGetPage(buf);
max_offset = PageGetMaxOffsetNumber(page);
--- 3152,3159 ----
break; /* no need to scan any further */
if ((*curpage)->offsets_used == 0)
continue; /* this page was never used as a move dest */
! buf = ReadBufferExtended(rel, MAIN_FORKNUM, (*curpage)->blkno,
! RBM_NORMAL, vac_strategy);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
page = BufferGetPage(buf);
max_offset = PageGetMaxOffsetNumber(page);
***************
*** 3219,3227 **** vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
if ((*vacpage)->offsets_free > 0)
{
! buf = ReadBufferWithStrategy(onerel,
! (*vacpage)->blkno,
! vac_strategy);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
vacuum_page(onerel, buf, *vacpage);
UnlockReleaseBuffer(buf);
--- 3222,3229 ----
if ((*vacpage)->offsets_free > 0)
{
! buf = ReadBufferExtended(onerel, MAIN_FORKNUM, (*vacpage)->blkno,
! RBM_NORMAL, vac_strategy);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
vacuum_page(onerel, buf, *vacpage);
UnlockReleaseBuffer(buf);
*** src/backend/commands/vacuumlazy.c
--- src/backend/commands/vacuumlazy.c
***************
*** 301,307 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
vacrelstats->num_index_scans++;
}
! buf = ReadBufferWithStrategy(onerel, blkno, vac_strategy);
/* We need buffer cleanup lock so that we can prune HOT chains. */
LockBufferForCleanup(buf);
--- 301,308 ----
vacrelstats->num_index_scans++;
}
! buf = ReadBufferExtended(onerel, MAIN_FORKNUM, blkno,
! RBM_NORMAL, vac_strategy);
/* We need buffer cleanup lock so that we can prune HOT chains. */
LockBufferForCleanup(buf);
***************
*** 618,624 **** lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
vacuum_delay_point();
tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
! buf = ReadBufferWithStrategy(onerel, tblk, vac_strategy);
LockBufferForCleanup(buf);
tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats);
--- 619,626 ----
vacuum_delay_point();
tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
! buf = ReadBufferExtended(onerel, MAIN_FORKNUM, tblk, RBM_NORMAL,
! vac_strategy);
LockBufferForCleanup(buf);
tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats);
***************
*** 880,886 **** count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
blkno--;
! buf = ReadBufferWithStrategy(onerel, blkno, vac_strategy);
/* In this phase we only need shared access to the buffer */
LockBuffer(buf, BUFFER_LOCK_SHARE);
--- 882,889 ----
blkno--;
! buf = ReadBufferExtended(onerel, MAIN_FORKNUM, blkno,
! RBM_NORMAL, vac_strategy);
/* In this phase we only need shared access to the buffer */
LockBuffer(buf, BUFFER_LOCK_SHARE);
*** src/backend/storage/buffer/bufmgr.c
--- src/backend/storage/buffer/bufmgr.c
***************
*** 72,82 **** static bool IsForInput;
static volatile BufferDesc *PinCountWaitBuf = NULL;
- static Buffer ReadBuffer_relcache(Relation reln, ForkNumber forkNum,
- BlockNumber blockNum, bool zeroPage, BufferAccessStrategy strategy);
static Buffer ReadBuffer_common(SMgrRelation reln, bool isLocalBuf,
! ForkNumber forkNum, BlockNumber blockNum,
! bool zeroPage, BufferAccessStrategy strategy, bool *hit);
static bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy);
static void PinBuffer_Locked(volatile BufferDesc *buf);
static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner);
--- 72,81 ----
static volatile BufferDesc *PinCountWaitBuf = NULL;
static Buffer ReadBuffer_common(SMgrRelation reln, bool isLocalBuf,
! ForkNumber forkNum, BlockNumber blockNum,
! ReadBufferMode mode, BufferAccessStrategy strategy,
! bool *hit);
static bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy);
static void PinBuffer_Locked(volatile BufferDesc *buf);
static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner);
***************
*** 96,101 **** static void AtProcExit_Buffers(int code, Datum arg);
--- 95,111 ----
/*
+ * ReadBuffer -- a shorthand for ReadBufferExtended, for reading from the
+ * main fork with RBM_NORMAL mode and the default strategy.
+ */
+ Buffer
+ ReadBuffer(Relation reln, BlockNumber blockNum)
+ {
+ return ReadBufferExtended(reln, MAIN_FORKNUM, blockNum, RBM_NORMAL, NULL);
+ }
+
+
+ /*
* ReadBuffer -- returns a buffer containing the requested
* block of the requested relation. If the blknum
* requested is P_NEW, extend the relation file and
***************
*** 107,181 **** static void AtProcExit_Buffers(int code, Datum arg);
* the block read. The returned buffer has been pinned.
* Does not return on error --- elog's instead.
*
! * Assume when this function is called, that reln has been
! * opened already.
! */
! Buffer
! ReadBuffer(Relation reln, BlockNumber blockNum)
! {
! return ReadBuffer_relcache(reln, MAIN_FORKNUM, blockNum, false, NULL);
! }
!
! /*
! * ReadBufferWithFork -- same as ReadBuffer, but for accessing relation
! * forks other than MAIN_FORKNUM.
! */
! Buffer
! ReadBufferWithFork(Relation reln, ForkNumber forkNum, BlockNumber blockNum)
! {
! return ReadBuffer_relcache(reln, forkNum, blockNum, false, NULL);
! }
!
! /*
! * ReadBufferWithStrategy -- same as ReadBuffer, except caller can specify
! * a nondefault buffer access strategy. See buffer/README for details.
! */
! Buffer
! ReadBufferWithStrategy(Relation reln, BlockNumber blockNum,
! BufferAccessStrategy strategy)
! {
! return ReadBuffer_relcache(reln, MAIN_FORKNUM, blockNum, false, strategy);
! }
!
! /*
! * ReadOrZeroBuffer -- like ReadBuffer, but if the page isn't in buffer
! * cache already, it's filled with zeros instead of reading it from
! * disk. Useful when the caller intends to fill the page from scratch,
! * since this saves I/O and avoids unnecessary failure if the
! * page-on-disk has corrupt page headers.
! *
! * Caution: do not use this to read a page that is beyond the relation's
! * current physical EOF; that is likely to cause problems in md.c when
! * the page is modified and written out. P_NEW is OK, though.
! */
! Buffer
! ReadOrZeroBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum)
! {
! return ReadBuffer_relcache(reln, forkNum, blockNum, true, NULL);
! }
!
! /*
! * ReadBufferWithoutRelcache -- like ReadBuffer, but doesn't require a
! * relcache entry for the relation. If zeroPage is true, this behaves
! * like ReadOrZeroBuffer rather than ReadBuffer.
*/
Buffer
! ReadBufferWithoutRelcache(RelFileNode rnode, bool isTemp,
! ForkNumber forkNum, BlockNumber blockNum, bool zeroPage)
! {
! bool hit;
!
! SMgrRelation smgr = smgropen(rnode);
! return ReadBuffer_common(smgr, isTemp, forkNum, blockNum, zeroPage, NULL, &hit);
! }
!
! /*
! * ReadBuffer_relcache -- common logic for ReadBuffer-variants that
! * operate on a Relation.
! */
! static Buffer
! ReadBuffer_relcache(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
! bool zeroPage, BufferAccessStrategy strategy)
{
bool hit;
Buffer buf;
--- 117,146 ----
* the block read. The returned buffer has been pinned.
* Does not return on error --- elog's instead.
*
! * Assume that reln has been opened already when this function is called.
! *
! * In RBM_NORMAL mode, the page is read from disk, and the page header is
! * validated. An error is thrown if the page header is not valid.
! *
! * RBM_ZERO_ON_ERROR is like the normal mode, but if the page header is not
! * valid, the page is zeroed instead of throwing an error. This is intended
! * for non-critical data, where the caller is prepared to deal with
! * errors.
! *
! * In RBM_ZERO mode, if the page isn't in buffer cache already, it's filled
! * with zeros instead of reading it from disk. Useful when the caller is
! * going to fill the page from scratch, since this saves I/O and avoids
! * unnecessary failure if the page-on-disk has corrupt page headers.
! * Caution: do not use this mode to read a page that is beyond the relation's
! * current physical EOF; that is likely to cause problems in md.c when
! * the page is modified and written out. P_NEW is OK, though.
! *
! * If strategy is not NULL, a nondefault buffer access strategy is used.
! * See buffer/README for details.
*/
Buffer
! ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
! ReadBufferMode mode, BufferAccessStrategy strategy)
{
bool hit;
Buffer buf;
***************
*** 189,200 **** ReadBuffer_relcache(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
*/
pgstat_count_buffer_read(reln);
buf = ReadBuffer_common(reln->rd_smgr, reln->rd_istemp, forkNum, blockNum,
! zeroPage, strategy, &hit);
if (hit)
pgstat_count_buffer_hit(reln);
return buf;
}
/*
* ReadBuffer_common -- common logic for all ReadBuffer variants
*
--- 154,183 ----
*/
pgstat_count_buffer_read(reln);
buf = ReadBuffer_common(reln->rd_smgr, reln->rd_istemp, forkNum, blockNum,
! mode, strategy, &hit);
if (hit)
pgstat_count_buffer_hit(reln);
return buf;
}
+
+ /*
+ * ReadBufferWithoutRelcache -- like ReadBufferExtended, but doesn't require
+ * a relcache entry for the relation.
+ */
+ Buffer
+ ReadBufferWithoutRelcache(RelFileNode rnode, bool isTemp,
+ ForkNumber forkNum, BlockNumber blockNum,
+ ReadBufferMode mode, BufferAccessStrategy strategy)
+ {
+ bool hit;
+
+ SMgrRelation smgr = smgropen(rnode);
+ return ReadBuffer_common(smgr, isTemp, forkNum, blockNum, mode, strategy,
+ &hit);
+ }
+
+
/*
* ReadBuffer_common -- common logic for all ReadBuffer variants
*
***************
*** 202,208 **** ReadBuffer_relcache(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
*/
static Buffer
ReadBuffer_common(SMgrRelation smgr, bool isLocalBuf, ForkNumber forkNum,
! BlockNumber blockNum, bool zeroPage,
BufferAccessStrategy strategy, bool *hit)
{
volatile BufferDesc *bufHdr;
--- 185,191 ----
*/
static Buffer
ReadBuffer_common(SMgrRelation smgr, bool isLocalBuf, ForkNumber forkNum,
! BlockNumber blockNum, ReadBufferMode mode,
BufferAccessStrategy strategy, bool *hit)
{
volatile BufferDesc *bufHdr;
***************
*** 295,302 **** ReadBuffer_common(SMgrRelation smgr, bool isLocalBuf, ForkNumber forkNum,
bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
if (!PageIsNew((Page) bufBlock))
ereport(ERROR,
! (errmsg("unexpected data beyond EOF in block %u of relation %u/%u/%u",
! blockNum, smgr->smgr_rnode.spcNode, smgr->smgr_rnode.dbNode, smgr->smgr_rnode.relNode),
errhint("This has been seen to occur with buggy kernels; consider updating your system.")));
/*
--- 278,285 ----
bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
if (!PageIsNew((Page) bufBlock))
ereport(ERROR,
! (errmsg("unexpected data beyond EOF in block %u of relation %u/%u/%u/%u",
! blockNum, smgr->smgr_rnode.spcNode, smgr->smgr_rnode.dbNode, smgr->smgr_rnode.relNode,
! forkNum),
errhint("This has been seen to occur with buggy kernels; consider updating your system.")));
/*
***************
*** 356,362 **** ReadBuffer_common(SMgrRelation smgr, bool isLocalBuf, ForkNumber forkNum,
* Read in the page, unless the caller intends to overwrite it and
* just wants us to allocate a buffer.
*/
! if (zeroPage)
MemSet((char *) bufBlock, 0, BLCKSZ);
else
{
--- 339,345 ----
* Read in the page, unless the caller intends to overwrite it and
* just wants us to allocate a buffer.
*/
! if (mode == RBM_ZERO)
MemSet((char *) bufBlock, 0, BLCKSZ);
else
{
***************
*** 365,388 **** ReadBuffer_common(SMgrRelation smgr, bool isLocalBuf, ForkNumber forkNum,
/* check for garbage data */
if (!PageHeaderIsValid((PageHeader) bufBlock))
{
! if (zero_damaged_pages)
{
ereport(WARNING,
(errcode(ERRCODE_DATA_CORRUPTED),
! errmsg("invalid page header in block %u of relation %u/%u/%u; zeroing out page",
blockNum,
smgr->smgr_rnode.spcNode,
smgr->smgr_rnode.dbNode,
! smgr->smgr_rnode.relNode)));
MemSet((char *) bufBlock, 0, BLCKSZ);
}
else
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
! errmsg("invalid page header in block %u of relation %u/%u/%u",
blockNum, smgr->smgr_rnode.spcNode,
smgr->smgr_rnode.dbNode,
! smgr->smgr_rnode.relNode)));
}
}
}
--- 348,372 ----
/* check for garbage data */
if (!PageHeaderIsValid((PageHeader) bufBlock))
{
! if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
{
ereport(WARNING,
(errcode(ERRCODE_DATA_CORRUPTED),
! errmsg("invalid page header in block %u of relation %u/%u/%u/%u; zeroing out page",
blockNum,
smgr->smgr_rnode.spcNode,
smgr->smgr_rnode.dbNode,
! smgr->smgr_rnode.relNode,
! forkNum)));
MemSet((char *) bufBlock, 0, BLCKSZ);
}
else
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
! errmsg("invalid page header in block %u of relation %u/%u/%u/%u",
blockNum, smgr->smgr_rnode.spcNode,
smgr->smgr_rnode.dbNode,
! smgr->smgr_rnode.relNode, forkNum)));
}
}
}
***************
*** 1679,1688 **** PrintBufferLeakWarning(Buffer buffer)
/* theoretically we should lock the bufhdr here */
elog(WARNING,
"buffer refcount leak: [%03d] "
! "(rel=%u/%u/%u, blockNum=%u, flags=0x%x, refcount=%u %d)",
buffer,
buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
! buf->tag.rnode.relNode,
buf->tag.blockNum, buf->flags,
buf->refcount, loccount);
}
--- 1663,1672 ----
/* theoretically we should lock the bufhdr here */
elog(WARNING,
"buffer refcount leak: [%03d] "
! "(rel=%u/%u/%u, forkNum=%u, blockNum=%u, flags=0x%x, refcount=%u %d)",
buffer,
buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
! buf->tag.rnode.relNode, buf->tag.forkNum,
buf->tag.blockNum, buf->flags,
buf->refcount, loccount);
}
***************
*** 1991,2001 **** PrintBufferDescs(void)
{
/* theoretically we should lock the bufhdr here */
elog(LOG,
! "[%02d] (freeNext=%d, rel=%u/%u/%u, "
"blockNum=%u, flags=0x%x, refcount=%u %d)",
i, buf->freeNext,
buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
! buf->tag.rnode.relNode,
buf->tag.blockNum, buf->flags,
buf->refcount, PrivateRefCount[i]);
}
--- 1975,1985 ----
{
/* theoretically we should lock the bufhdr here */
elog(LOG,
! "[%02d] (freeNext=%d, rel=%u/%u/%u, forkNum=%u, "
"blockNum=%u, flags=0x%x, refcount=%u %d)",
i, buf->freeNext,
buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
! buf->tag.rnode.relNode, buf->tag.forkNum,
buf->tag.blockNum, buf->flags,
buf->refcount, PrivateRefCount[i]);
}
***************
*** 2015,2025 **** PrintPinnedBufs(void)
{
/* theoretically we should lock the bufhdr here */
elog(LOG,
! "[%02d] (freeNext=%d, rel=%u/%u/%u, "
"blockNum=%u, flags=0x%x, refcount=%u %d)",
i, buf->freeNext,
buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
! buf->tag.rnode.relNode,
buf->tag.blockNum, buf->flags,
buf->refcount, PrivateRefCount[i]);
}
--- 1999,2009 ----
{
/* theoretically we should lock the bufhdr here */
elog(LOG,
! "[%02d] (freeNext=%d, rel=%u/%u/%u, forkNum=%u, "
"blockNum=%u, flags=0x%x, refcount=%u %d)",
i, buf->freeNext,
buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
! buf->tag.rnode.relNode, buf->tag.forkNum,
buf->tag.blockNum, buf->flags,
buf->refcount, PrivateRefCount[i]);
}
***************
*** 2654,2664 **** AbortBufferIO(void)
/* Buffer is pinned, so we can read tag without spinlock */
ereport(WARNING,
(errcode(ERRCODE_IO_ERROR),
! errmsg("could not write block %u of %u/%u/%u",
buf->tag.blockNum,
buf->tag.rnode.spcNode,
buf->tag.rnode.dbNode,
! buf->tag.rnode.relNode),
errdetail("Multiple failures --- write error might be permanent.")));
}
}
--- 2638,2648 ----
/* Buffer is pinned, so we can read tag without spinlock */
ereport(WARNING,
(errcode(ERRCODE_IO_ERROR),
! errmsg("could not write block %u of %u/%u/%u/%u",
buf->tag.blockNum,
buf->tag.rnode.spcNode,
buf->tag.rnode.dbNode,
! buf->tag.rnode.relNode, buf->tag.forkNum),
errdetail("Multiple failures --- write error might be permanent.")));
}
}
***************
*** 2676,2684 **** buffer_write_error_callback(void *arg)
/* Buffer is pinned, so we can read the tag without locking the spinlock */
if (bufHdr != NULL)
! errcontext("writing block %u of relation %u/%u/%u",
bufHdr->tag.blockNum,
bufHdr->tag.rnode.spcNode,
bufHdr->tag.rnode.dbNode,
! bufHdr->tag.rnode.relNode);
}
--- 2660,2669 ----
/* Buffer is pinned, so we can read the tag without locking the spinlock */
if (bufHdr != NULL)
! errcontext("writing block %u of relation %u/%u/%u/%u",
bufHdr->tag.blockNum,
bufHdr->tag.rnode.spcNode,
bufHdr->tag.rnode.dbNode,
! bufHdr->tag.rnode.relNode,
! bufHdr->tag.forkNum);
}
*** src/backend/storage/freespace/freespace.c
--- src/backend/storage/freespace/freespace.c
***************
*** 504,509 **** static Buffer
--- 504,510 ----
fsm_readbuf(Relation rel, FSMAddress addr, bool extend)
{
BlockNumber blkno = fsm_logical_to_physical(addr);
+ Buffer buf;
RelationOpenSmgr(rel);
***************
*** 518,524 **** fsm_readbuf(Relation rel, FSMAddress addr, bool extend)
else
return InvalidBuffer;
}
! return ReadBufferWithFork(rel, FSM_FORKNUM, blkno);
}
/*
--- 519,536 ----
else
return InvalidBuffer;
}
!
! /*
! * Use RBM_ZERO_ON_ERROR mode, and initialize the page if necessary. The FSM
! * information is not accurate anyway, so it's better to clear corrupt
! * pages than error out. Since the FSM changes are not WAL-logged, the
! * so-called torn page problem on crash can lead to pages with corrupt
! * headers, for example.
! */
! buf = ReadBufferExtended(rel, FSM_FORKNUM, blkno, RBM_ZERO_ON_ERROR, NULL);
! if (PageIsNew(BufferGetPage(buf)))
! PageInit(BufferGetPage(buf), BLCKSZ, 0);
! return buf;
}
/*
***************
*** 779,801 **** fsm_redo_truncate(xl_fsm_truncate *xlrec)
* replay of the smgr truncation record to remove completely unused
* pages.
*/
! buf = XLogReadBufferWithFork(xlrec->node, FSM_FORKNUM, fsmblk, false);
if (BufferIsValid(buf))
{
! fsm_truncate_avail(BufferGetPage(buf), first_removed_slot);
MarkBufferDirty(buf);
UnlockReleaseBuffer(buf);
}
- else
- {
- /*
- * The page doesn't exist. Because FSM extensions are not WAL-logged,
- * it's normal to have a truncation record for a page that doesn't
- * exist. Tell xlogutils.c not to PANIC at the end of recovery
- * because of the missing page
- */
- XLogTruncateRelation(xlrec->node, FSM_FORKNUM, fsmblk);
- }
}
void
--- 791,808 ----
* replay of the smgr truncation record to remove completely unused
* pages.
*/
! buf = XLogReadBufferExtended(xlrec->node, FSM_FORKNUM, fsmblk,
! RBM_ZERO_ON_ERROR);
if (BufferIsValid(buf))
{
! Page page = BufferGetPage(buf);
!
! if (PageIsNew(page))
! PageInit(page, BLCKSZ, 0);
! fsm_truncate_avail(page, first_removed_slot);
MarkBufferDirty(buf);
UnlockReleaseBuffer(buf);
}
}
void
*** src/include/access/xlogutils.h
--- src/include/access/xlogutils.h
***************
*** 12,17 ****
--- 12,18 ----
#define XLOG_UTILS_H
#include "storage/buf.h"
+ #include "storage/bufmgr.h"
#include "storage/relfilenode.h"
#include "storage/block.h"
#include "utils/relcache.h"
***************
*** 25,32 **** extern void XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
BlockNumber nblocks);
extern Buffer XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init);
! extern Buffer XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
! BlockNumber blkno, bool init);
extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
extern void FreeFakeRelcacheEntry(Relation fakerel);
--- 26,33 ----
BlockNumber nblocks);
extern Buffer XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init);
! extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
! BlockNumber blkno, ReadBufferMode mode);
extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
extern void FreeFakeRelcacheEntry(Relation fakerel);
*** src/include/storage/bufmgr.h
--- src/include/storage/bufmgr.h
***************
*** 31,36 **** typedef enum BufferAccessStrategyType
--- 31,46 ----
BAS_VACUUM /* VACUUM */
} BufferAccessStrategyType;
+ /*
+ * Possible modes for ReadBufferExtended()
+ */
+ typedef enum
+ {
+ RBM_NORMAL, /* Normal read */
+ RBM_ZERO, /* Don't read from disk, caller will initialize */
+ RBM_ZERO_ON_ERROR /* Read, but return an all-zeros page on error */
+ } ReadBufferMode;
+
/* in globals.c ... this duplicates miscadmin.h */
extern PGDLLIMPORT int NBuffers;
***************
*** 144,156 **** extern PGDLLIMPORT int32 *LocalRefCount;
* prototypes for functions in bufmgr.c
*/
extern Buffer ReadBuffer(Relation reln, BlockNumber blockNum);
! extern Buffer ReadBufferWithFork(Relation reln, ForkNumber forkNum, BlockNumber blockNum);
! extern Buffer ReadBufferWithStrategy(Relation reln, BlockNumber blockNum,
! BufferAccessStrategy strategy);
! extern Buffer ReadOrZeroBuffer(Relation reln, ForkNumber forkNum,
! BlockNumber blockNum);
extern Buffer ReadBufferWithoutRelcache(RelFileNode rnode, bool isTemp,
! ForkNumber forkNum, BlockNumber blockNum, bool zeroPage);
extern void ReleaseBuffer(Buffer buffer);
extern void UnlockReleaseBuffer(Buffer buffer);
extern void MarkBufferDirty(Buffer buffer);
--- 154,165 ----
* prototypes for functions in bufmgr.c
*/
extern Buffer ReadBuffer(Relation reln, BlockNumber blockNum);
! extern Buffer ReadBufferExtended(Relation reln, ForkNumber forkNum,
! BlockNumber blockNum, ReadBufferMode mode,
! BufferAccessStrategy strategy);
extern Buffer ReadBufferWithoutRelcache(RelFileNode rnode, bool isTemp,
! ForkNumber forkNum, BlockNumber blockNum,
! ReadBufferMode mode, BufferAccessStrategy strategy);
extern void ReleaseBuffer(Buffer buffer);
extern void UnlockReleaseBuffer(Buffer buffer);
extern void MarkBufferDirty(Buffer buffer);