Tom Lane wrote:
> The XLogOpenRelationWithFork stuff needs to be re-thought also,
> as again this is blurring the question of what's a logical and
> what's a physical relation --- and if forknum isn't part of the
> relation ID, that API is wrong either way. I'm not sure about
> a good solution in this area, but I wonder if the right answer
> might be to make the XLOG replay stuff use SMgrRelations instead
> of bogus Relations. IIRC the replay code design predates the
> existence of SMgrRelation, so maybe we need a fresh look there.
I joggled this around for a while, and settled on removing
XLogOpenRelation altogether, and modifying XLogReadBuffer so that it
takes RelFileNode directly as argument. XLogReadBuffer takes care of
calling smgropen and creating missing files, like XLogOpenRelation used
to. This fits nicely with the changes required for relation forks: the
blurring of logical and physical goes away with XLogOpenRelation, as
XLogReadBuffer clearly works at the physical level.
To support functions that really do need a Relation object, like b-tree
cleanup routine which needs to insert a new index pointer to parent
page, which uses the same code that's used during normal operation,
there's a function XLogFakeRelcacheEntry that let's you create a fake
relcache entry just like XLogOpenRelation used to.
There's one ugly modularity violation in the patch: in XLogReadBuffer,
I'm checking "if(smgr->md_fd == NULL)", to avoid calling smgrcreate if a
relation has already been accessed. I'm seeing two possible solutions to
it: push the functionality to smgr.c by adding a new function
smgropenandcreate that opens a relation and creates it if it doesn't
exist, or just document the wart in comments. I'm leaning towards just
documenting it.
Attached is a patch with just these refactorings, no relation fork or
FSM stuff included. I'm planning to commit this first, followed by the
patch to introduce relation forks, followed by FSM rewrite patch. I
think we're almost there with this refactoring patch and relation forks.
FSM still needs a fair amount of closing up loose ends, clean up, and
testing.
Ps. The ReadBuffer interface is getting out of hand. This patch
introduces one more ReadBuffer variant, ReadBufferWithoutRelcache, and
the upcoming relation forks patch will again introduce at least one new
variant.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
*** a/src/backend/access/gin/ginxlog.c
--- b/src/backend/access/gin/ginxlog.c
***************
*** 70,81 **** static void
ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
{
RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
- Relation reln;
Buffer buffer;
Page page;
! reln = XLogOpenRelation(*node);
! buffer = XLogReadBuffer(reln, GIN_ROOT_BLKNO, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
--- 70,79 ----
ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
{
RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
Buffer buffer;
Page page;
! buffer = XLogReadBuffer(*node, GIN_ROOT_BLKNO, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
***************
*** 93,104 **** ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
{
ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree *) XLogRecGetData(record);
ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree));
- Relation reln;
Buffer buffer;
Page page;
! reln = XLogOpenRelation(data->node);
! buffer = XLogReadBuffer(reln, data->blkno, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
--- 91,100 ----
{
ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree *) XLogRecGetData(record);
ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree));
Buffer buffer;
Page page;
! buffer = XLogReadBuffer(data->node, data->blkno, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
***************
*** 117,123 **** static void
ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
{
ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record);
- Relation reln;
Buffer buffer;
Page page;
--- 113,118 ----
***************
*** 125,132 **** ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
! reln = XLogOpenRelation(data->node);
! buffer = XLogReadBuffer(reln, data->blkno, false);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
--- 120,126 ----
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
! buffer = XLogReadBuffer(data->node, data->blkno, false);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
***************
*** 227,252 **** static void
ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
{
ginxlogSplit *data = (ginxlogSplit *) XLogRecGetData(record);
- Relation reln;
Buffer lbuffer,
rbuffer;
Page lpage,
rpage;
uint32 flags = 0;
- reln = XLogOpenRelation(data->node);
-
if (data->isLeaf)
flags |= GIN_LEAF;
if (data->isData)
flags |= GIN_DATA;
! lbuffer = XLogReadBuffer(reln, data->lblkno, data->isRootSplit);
Assert(BufferIsValid(lbuffer));
lpage = (Page) BufferGetPage(lbuffer);
GinInitBuffer(lbuffer, flags);
! rbuffer = XLogReadBuffer(reln, data->rblkno, true);
Assert(BufferIsValid(rbuffer));
rpage = (Page) BufferGetPage(rbuffer);
GinInitBuffer(rbuffer, flags);
--- 221,243 ----
ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
{
ginxlogSplit *data = (ginxlogSplit *) XLogRecGetData(record);
Buffer lbuffer,
rbuffer;
Page lpage,
rpage;
uint32 flags = 0;
if (data->isLeaf)
flags |= GIN_LEAF;
if (data->isData)
flags |= GIN_DATA;
! lbuffer = XLogReadBuffer(data->node, data->lblkno, data->isRootSplit);
Assert(BufferIsValid(lbuffer));
lpage = (Page) BufferGetPage(lbuffer);
GinInitBuffer(lbuffer, flags);
! rbuffer = XLogReadBuffer(data->node, data->rblkno, true);
Assert(BufferIsValid(rbuffer));
rpage = (Page) BufferGetPage(rbuffer);
GinInitBuffer(rbuffer, flags);
***************
*** 318,324 **** ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
if (data->isRootSplit)
{
! Buffer rootBuf = XLogReadBuffer(reln, data->rootBlkno, false);
Page rootPage = BufferGetPage(rootBuf);
GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
--- 309,315 ----
if (data->isRootSplit)
{
! Buffer rootBuf = XLogReadBuffer(data->node, data->rootBlkno, false);
Page rootPage = BufferGetPage(rootBuf);
GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
***************
*** 351,357 **** static void
ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
{
ginxlogVacuumPage *data = (ginxlogVacuumPage *) XLogRecGetData(record);
- Relation reln;
Buffer buffer;
Page page;
--- 342,347 ----
***************
*** 359,366 **** ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
! reln = XLogOpenRelation(data->node);
! buffer = XLogReadBuffer(reln, data->blkno, false);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
--- 349,355 ----
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
! buffer = XLogReadBuffer(data->node, data->blkno, false);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
***************
*** 402,416 **** static void
ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
{
ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record);
- Relation reln;
Buffer buffer;
Page page;
- reln = XLogOpenRelation(data->node);
-
if (!(record->xl_info & XLR_BKP_BLOCK_1))
{
! buffer = XLogReadBuffer(reln, data->blkno, false);
page = BufferGetPage(buffer);
Assert(GinPageIsData(page));
GinPageGetOpaque(page)->flags = GIN_DELETED;
--- 391,402 ----
ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
{
ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record);
Buffer buffer;
Page page;
if (!(record->xl_info & XLR_BKP_BLOCK_1))
{
! buffer = XLogReadBuffer(data->node, data->blkno, false);
page = BufferGetPage(buffer);
Assert(GinPageIsData(page));
GinPageGetOpaque(page)->flags = GIN_DELETED;
***************
*** 422,428 **** ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
if (!(record->xl_info & XLR_BKP_BLOCK_2))
{
! buffer = XLogReadBuffer(reln, data->parentBlkno, false);
page = BufferGetPage(buffer);
Assert(GinPageIsData(page));
Assert(!GinPageIsLeaf(page));
--- 408,414 ----
if (!(record->xl_info & XLR_BKP_BLOCK_2))
{
! buffer = XLogReadBuffer(data->node, data->parentBlkno, false);
page = BufferGetPage(buffer);
Assert(GinPageIsData(page));
Assert(!GinPageIsLeaf(page));
***************
*** 435,441 **** ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber)
{
! buffer = XLogReadBuffer(reln, data->leftBlkno, false);
page = BufferGetPage(buffer);
Assert(GinPageIsData(page));
GinPageGetOpaque(page)->rightlink = data->rightLink;
--- 421,427 ----
if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber)
{
! buffer = XLogReadBuffer(data->node, data->leftBlkno, false);
page = BufferGetPage(buffer);
Assert(GinPageIsData(page));
GinPageGetOpaque(page)->rightlink = data->rightLink;
***************
*** 556,564 **** ginContinueSplit(ginIncompleteSplit *split)
* elog(NOTICE,"ginContinueSplit root:%u l:%u r:%u", split->rootBlkno,
* split->leftBlkno, split->rightBlkno);
*/
! reln = XLogOpenRelation(split->node);
! buffer = XLogReadBuffer(reln, split->leftBlkno, false);
if (split->rootBlkno == GIN_ROOT_BLKNO)
{
--- 542,550 ----
* elog(NOTICE,"ginContinueSplit root:%u l:%u r:%u", split->rootBlkno,
* split->leftBlkno, split->rightBlkno);
*/
! buffer = XLogReadBuffer(split->node, split->leftBlkno, false);
! reln = XLogFakeRelcacheEntry(split->node);
if (split->rootBlkno == GIN_ROOT_BLKNO)
{
***************
*** 580,585 **** ginContinueSplit(ginIncompleteSplit *split)
--- 566,573 ----
GinPageGetOpaque(page)->maxoff))->key;
}
+ pfree(reln);
+
btree.rightblkno = split->rightBlkno;
stack.blkno = split->leftBlkno;
*** a/src/backend/access/gist/gistxlog.c
--- b/src/backend/access/gist/gistxlog.c
***************
*** 188,194 **** gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
{
gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) XLogRecGetData(record);
PageUpdateRecord xlrec;
- Relation reln;
Buffer buffer;
Page page;
--- 188,193 ----
***************
*** 207,214 **** gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
decodePageUpdateRecord(&xlrec, record);
! reln = XLogOpenRelation(xlrec.data->node);
! buffer = XLogReadBuffer(reln, xlrec.data->blkno, false);
if (!BufferIsValid(buffer))
return;
page = (Page) BufferGetPage(buffer);
--- 206,212 ----
decodePageUpdateRecord(&xlrec, record);
! buffer = XLogReadBuffer(xlrec.data->node, xlrec.data->blkno, false);
if (!BufferIsValid(buffer))
return;
page = (Page) BufferGetPage(buffer);
***************
*** 233,239 **** gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
--- 231,241 ----
/* add tuples */
if (xlrec.len > 0)
+ {
+ Relation reln = XLogFakeRelcacheEntry(xlrec.data->node);
gistfillbuffer(reln, page, xlrec.itup, xlrec.len, InvalidOffsetNumber);
+ pfree(reln);
+ }
/*
* special case: leafpage, nothing to insert, nothing to delete, then
***************
*** 261,267 **** static void
gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
{
gistxlogPageDelete *xldata = (gistxlogPageDelete *) XLogRecGetData(record);
- Relation reln;
Buffer buffer;
Page page;
--- 263,268 ----
***************
*** 269,276 **** gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
! reln = XLogOpenRelation(xldata->node);
! buffer = XLogReadBuffer(reln, xldata->blkno, false);
if (!BufferIsValid(buffer))
return;
--- 270,276 ----
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
! buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
if (!BufferIsValid(buffer))
return;
***************
*** 325,339 **** gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
int flags;
decodePageSplitRecord(&xlrec, record);
- reln = XLogOpenRelation(xlrec.data->node);
flags = xlrec.data->origleaf ? F_LEAF : 0;
/* loop around all pages */
for (i = 0; i < xlrec.data->npage; i++)
{
NewPage *newpage = xlrec.page + i;
! buffer = XLogReadBuffer(reln, newpage->header->blkno, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
--- 325,340 ----
int flags;
decodePageSplitRecord(&xlrec, record);
flags = xlrec.data->origleaf ? F_LEAF : 0;
+ reln = XLogFakeRelcacheEntry(xlrec.data->node);
+
/* loop around all pages */
for (i = 0; i < xlrec.data->npage; i++)
{
NewPage *newpage = xlrec.page + i;
! buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
***************
*** 348,353 **** gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
--- 349,355 ----
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
}
+ pfree(reln);
forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
***************
*** 360,371 **** static void
gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
{
RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
- Relation reln;
Buffer buffer;
Page page;
! reln = XLogOpenRelation(*node);
! buffer = XLogReadBuffer(reln, GIST_ROOT_BLKNO, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
--- 362,371 ----
gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
{
RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
Buffer buffer;
Page page;
! buffer = XLogReadBuffer(*node, GIST_ROOT_BLKNO, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
***************
*** 601,607 **** gistContinueInsert(gistIncompleteInsert *insert)
lenitup;
Relation index;
! index = XLogOpenRelation(insert->node);
/*
* needed vector itup never will be more than initial lenblkno+2, because
--- 601,607 ----
lenitup;
Relation index;
! index = XLogFakeRelcacheEntry(insert->node);
/*
* needed vector itup never will be more than initial lenblkno+2, because
***************
*** 623,629 **** gistContinueInsert(gistIncompleteInsert *insert)
* it was split root, so we should only make new root. it can't be
* simple insert into root, we should replace all content of root.
*/
! Buffer buffer = XLogReadBuffer(index, GIST_ROOT_BLKNO, true);
gistnewroot(index, buffer, itup, lenitup, NULL);
UnlockReleaseBuffer(buffer);
--- 623,629 ----
* it was split root, so we should only make new root. it can't be
* simple insert into root, we should replace all content of root.
*/
! Buffer buffer = XLogReadBuffer(insert->node, GIST_ROOT_BLKNO, true);
gistnewroot(index, buffer, itup, lenitup, NULL);
UnlockReleaseBuffer(buffer);
***************
*** 793,798 **** gistContinueInsert(gistIncompleteInsert *insert)
--- 793,800 ----
}
}
+ pfree(index);
+
ereport(LOG,
(errmsg("index %u/%u/%u needs VACUUM FULL or REINDEX to finish crash recovery",
insert->node.spcNode, insert->node.dbNode, insert->node.relNode),
*** a/src/backend/access/heap/heapam.c
--- b/src/backend/access/heap/heapam.c
***************
*** 3954,3961 **** heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
! reln = XLogOpenRelation(xlrec->node);
! buffer = XLogReadBuffer(reln, xlrec->block, false);
if (!BufferIsValid(buffer))
return;
page = (Page) BufferGetPage(buffer);
--- 3954,3960 ----
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
! buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
if (!BufferIsValid(buffer))
return;
page = (Page) BufferGetPage(buffer);
***************
*** 3976,3986 **** heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
--- 3975,3987 ----
Assert(nunused >= 0);
/* Update all item pointers per the record, and repair fragmentation */
+ reln = XLogFakeRelcacheEntry(xlrec->node);
heap_page_prune_execute(reln, buffer,
redirected, nredirected,
nowdead, ndead,
nowunused, nunused,
clean_move);
+ pfree(reln);
/*
* Note: we don't worry about updating the page's prunability hints.
***************
*** 3998,4012 **** heap_xlog_freeze(XLogRecPtr lsn, XLogRecord *record)
{
xl_heap_freeze *xlrec = (xl_heap_freeze *) XLogRecGetData(record);
TransactionId cutoff_xid = xlrec->cutoff_xid;
- Relation reln;
Buffer buffer;
Page page;
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
! reln = XLogOpenRelation(xlrec->node);
! buffer = XLogReadBuffer(reln, xlrec->block, false);
if (!BufferIsValid(buffer))
return;
page = (Page) BufferGetPage(buffer);
--- 3999,4011 ----
{
xl_heap_freeze *xlrec = (xl_heap_freeze *) XLogRecGetData(record);
TransactionId cutoff_xid = xlrec->cutoff_xid;
Buffer buffer;
Page page;
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
! buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
if (!BufferIsValid(buffer))
return;
page = (Page) BufferGetPage(buffer);
***************
*** 4046,4052 **** static void
heap_xlog_newpage(XLogRecPtr lsn, XLogRecord *record)
{
xl_heap_newpage *xlrec = (xl_heap_newpage *) XLogRecGetData(record);
- Relation reln;
Buffer buffer;
Page page;
--- 4045,4050 ----
***************
*** 4054,4061 **** heap_xlog_newpage(XLogRecPtr lsn, XLogRecord *record)
* Note: the NEWPAGE log record is used for both heaps and indexes, so do
* not do anything that assumes we are touching a heap.
*/
! reln = XLogOpenRelation(xlrec->node);
! buffer = XLogReadBuffer(reln, xlrec->blkno, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
--- 4052,4058 ----
* Note: the NEWPAGE log record is used for both heaps and indexes, so do
* not do anything that assumes we are touching a heap.
*/
! buffer = XLogReadBuffer(xlrec->node, xlrec->blkno, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
***************
*** 4072,4078 **** static void
heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
{
xl_heap_delete *xlrec = (xl_heap_delete *) XLogRecGetData(record);
- Relation reln;
Buffer buffer;
Page page;
OffsetNumber offnum;
--- 4069,4074 ----
***************
*** 4082,4089 **** heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
! reln = XLogOpenRelation(xlrec->target.node);
! buffer = XLogReadBuffer(reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
false);
if (!BufferIsValid(buffer))
--- 4078,4084 ----
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
! buffer = XLogReadBuffer(xlrec->target.node,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
false);
if (!BufferIsValid(buffer))
***************
*** 4129,4135 **** static void
heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
{
xl_heap_insert *xlrec = (xl_heap_insert *) XLogRecGetData(record);
- Relation reln;
Buffer buffer;
Page page;
OffsetNumber offnum;
--- 4124,4129 ----
***************
*** 4145,4155 **** heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
- reln = XLogOpenRelation(xlrec->target.node);
-
if (record->xl_info & XLOG_HEAP_INIT_PAGE)
{
! buffer = XLogReadBuffer(reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
true);
Assert(BufferIsValid(buffer));
--- 4139,4147 ----
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
if (record->xl_info & XLOG_HEAP_INIT_PAGE)
{
! buffer = XLogReadBuffer(xlrec->target.node,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
true);
Assert(BufferIsValid(buffer));
***************
*** 4159,4165 **** heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
}
else
{
! buffer = XLogReadBuffer(reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
false);
if (!BufferIsValid(buffer))
--- 4151,4157 ----
}
else
{
! buffer = XLogReadBuffer(xlrec->target.node,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
false);
if (!BufferIsValid(buffer))
***************
*** 4212,4218 **** static void
heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
{
xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
- Relation reln = XLogOpenRelation(xlrec->target.node);
Buffer buffer;
bool samepage = (ItemPointerGetBlockNumber(&(xlrec->newtid)) ==
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
--- 4204,4209 ----
***************
*** 4238,4244 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
/* Deal with old tuple version */
! buffer = XLogReadBuffer(reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
false);
if (!BufferIsValid(buffer))
--- 4229,4235 ----
/* Deal with old tuple version */
! buffer = XLogReadBuffer(xlrec->target.node,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
false);
if (!BufferIsValid(buffer))
***************
*** 4313,4319 **** newt:;
if (record->xl_info & XLOG_HEAP_INIT_PAGE)
{
! buffer = XLogReadBuffer(reln,
ItemPointerGetBlockNumber(&(xlrec->newtid)),
true);
Assert(BufferIsValid(buffer));
--- 4304,4310 ----
if (record->xl_info & XLOG_HEAP_INIT_PAGE)
{
! buffer = XLogReadBuffer(xlrec->target.node,
ItemPointerGetBlockNumber(&(xlrec->newtid)),
true);
Assert(BufferIsValid(buffer));
***************
*** 4323,4329 **** newt:;
}
else
{
! buffer = XLogReadBuffer(reln,
ItemPointerGetBlockNumber(&(xlrec->newtid)),
false);
if (!BufferIsValid(buffer))
--- 4314,4320 ----
}
else
{
! buffer = XLogReadBuffer(xlrec->target.node,
ItemPointerGetBlockNumber(&(xlrec->newtid)),
false);
if (!BufferIsValid(buffer))
***************
*** 4395,4401 **** static void
heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record)
{
xl_heap_lock *xlrec = (xl_heap_lock *) XLogRecGetData(record);
- Relation reln;
Buffer buffer;
Page page;
OffsetNumber offnum;
--- 4386,4391 ----
***************
*** 4405,4412 **** heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record)
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
! reln = XLogOpenRelation(xlrec->target.node);
! buffer = XLogReadBuffer(reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
false);
if (!BufferIsValid(buffer))
--- 4395,4401 ----
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
! buffer = XLogReadBuffer(xlrec->target.node,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
false);
if (!BufferIsValid(buffer))
***************
*** 4454,4460 **** static void
heap_xlog_inplace(XLogRecPtr lsn, XLogRecord *record)
{
xl_heap_inplace *xlrec = (xl_heap_inplace *) XLogRecGetData(record);
- Relation reln = XLogOpenRelation(xlrec->target.node);
Buffer buffer;
Page page;
OffsetNumber offnum;
--- 4443,4448 ----
***************
*** 4466,4472 **** heap_xlog_inplace(XLogRecPtr lsn, XLogRecord *record)
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
! buffer = XLogReadBuffer(reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
false);
if (!BufferIsValid(buffer))
--- 4454,4460 ----
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
! buffer = XLogReadBuffer(xlrec->target.node,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
false);
if (!BufferIsValid(buffer))
*** a/src/backend/access/nbtree/nbtxlog.c
--- b/src/backend/access/nbtree/nbtxlog.c
***************
*** 149,155 **** _bt_restore_page(Page page, char *from, int len)
}
static void
! _bt_restore_meta(Relation reln, XLogRecPtr lsn,
BlockNumber root, uint32 level,
BlockNumber fastroot, uint32 fastlevel)
{
--- 149,155 ----
}
static void
! _bt_restore_meta(RelFileNode rnode, XLogRecPtr lsn,
BlockNumber root, uint32 level,
BlockNumber fastroot, uint32 fastlevel)
{
***************
*** 158,164 **** _bt_restore_meta(Relation reln, XLogRecPtr lsn,
BTMetaPageData *md;
BTPageOpaque pageop;
! metabuf = XLogReadBuffer(reln, BTREE_METAPAGE, true);
Assert(BufferIsValid(metabuf));
metapg = BufferGetPage(metabuf);
--- 158,164 ----
BTMetaPageData *md;
BTPageOpaque pageop;
! metabuf = XLogReadBuffer(rnode, BTREE_METAPAGE, true);
Assert(BufferIsValid(metabuf));
metapg = BufferGetPage(metabuf);
***************
*** 193,199 **** btree_xlog_insert(bool isleaf, bool ismeta,
XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
- Relation reln;
Buffer buffer;
Page page;
char *datapos;
--- 193,198 ----
***************
*** 219,229 **** btree_xlog_insert(bool isleaf, bool ismeta,
if ((record->xl_info & XLR_BKP_BLOCK_1) && !ismeta && isleaf)
return; /* nothing to do */
- reln = XLogOpenRelation(xlrec->target.node);
-
if (!(record->xl_info & XLR_BKP_BLOCK_1))
{
! buffer = XLogReadBuffer(reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
false);
if (BufferIsValid(buffer))
--- 218,226 ----
if ((record->xl_info & XLR_BKP_BLOCK_1) && !ismeta && isleaf)
return; /* nothing to do */
if (!(record->xl_info & XLR_BKP_BLOCK_1))
{
! buffer = XLogReadBuffer(xlrec->target.node,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
false);
if (BufferIsValid(buffer))
***************
*** 250,256 **** btree_xlog_insert(bool isleaf, bool ismeta,
}
if (ismeta)
! _bt_restore_meta(reln, lsn,
md.root, md.level,
md.fastroot, md.fastlevel);
--- 247,253 ----
}
if (ismeta)
! _bt_restore_meta(xlrec->target.node, lsn,
md.root, md.level,
md.fastroot, md.fastlevel);
***************
*** 264,270 **** btree_xlog_split(bool onleft, bool isroot,
XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
- Relation reln;
Buffer rbuf;
Page rpage;
BTPageOpaque ropaque;
--- 261,266 ----
***************
*** 276,283 **** btree_xlog_split(bool onleft, bool isroot,
Item left_hikey = NULL;
Size left_hikeysz = 0;
- reln = XLogOpenRelation(xlrec->node);
-
datapos = (char *) xlrec + SizeOfBtreeSplit;
datalen = record->xl_len - SizeOfBtreeSplit;
--- 272,277 ----
***************
*** 327,333 **** btree_xlog_split(bool onleft, bool isroot,
}
/* Reconstruct right (new) sibling from scratch */
! rbuf = XLogReadBuffer(reln, xlrec->rightsib, true);
Assert(BufferIsValid(rbuf));
rpage = (Page) BufferGetPage(rbuf);
--- 321,327 ----
}
/* Reconstruct right (new) sibling from scratch */
! rbuf = XLogReadBuffer(xlrec->node, xlrec->rightsib, true);
Assert(BufferIsValid(rbuf));
rpage = (Page) BufferGetPage(rbuf);
***************
*** 368,374 **** btree_xlog_split(bool onleft, bool isroot,
*/
if (!(record->xl_info & XLR_BKP_BLOCK_1))
{
! Buffer lbuf = XLogReadBuffer(reln, xlrec->leftsib, false);
if (BufferIsValid(lbuf))
{
--- 362,368 ----
*/
if (!(record->xl_info & XLR_BKP_BLOCK_1))
{
! Buffer lbuf = XLogReadBuffer(xlrec->node, xlrec->leftsib, false);
if (BufferIsValid(lbuf))
{
***************
*** 438,444 **** btree_xlog_split(bool onleft, bool isroot,
/* Fix left-link of the page to the right of the new right sibling */
if (xlrec->rnext != P_NONE && !(record->xl_info & XLR_BKP_BLOCK_2))
{
! Buffer buffer = XLogReadBuffer(reln, xlrec->rnext, false);
if (BufferIsValid(buffer))
{
--- 432,438 ----
/* Fix left-link of the page to the right of the new right sibling */
if (xlrec->rnext != P_NONE && !(record->xl_info & XLR_BKP_BLOCK_2))
{
! Buffer buffer = XLogReadBuffer(xlrec->node, xlrec->rnext, false);
if (BufferIsValid(buffer))
{
***************
*** 467,473 **** static void
btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_delete *xlrec;
- Relation reln;
Buffer buffer;
Page page;
BTPageOpaque opaque;
--- 461,466 ----
***************
*** 476,483 **** btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
return;
xlrec = (xl_btree_delete *) XLogRecGetData(record);
! reln = XLogOpenRelation(xlrec->node);
! buffer = XLogReadBuffer(reln, xlrec->block, false);
if (!BufferIsValid(buffer))
return;
page = (Page) BufferGetPage(buffer);
--- 469,475 ----
return;
xlrec = (xl_btree_delete *) XLogRecGetData(record);
! buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
if (!BufferIsValid(buffer))
return;
page = (Page) BufferGetPage(buffer);
***************
*** 516,522 **** static void
btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_delete_page *xlrec = (xl_btree_delete_page *) XLogRecGetData(record);
- Relation reln;
BlockNumber parent;
BlockNumber target;
BlockNumber leftsib;
--- 508,513 ----
***************
*** 525,531 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
Page page;
BTPageOpaque pageop;
- reln = XLogOpenRelation(xlrec->target.node);
parent = ItemPointerGetBlockNumber(&(xlrec->target.tid));
target = xlrec->deadblk;
leftsib = xlrec->leftblk;
--- 516,521 ----
***************
*** 534,540 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
/* parent page */
if (!(record->xl_info & XLR_BKP_BLOCK_1))
{
! buffer = XLogReadBuffer(reln, parent, false);
if (BufferIsValid(buffer))
{
page = (Page) BufferGetPage(buffer);
--- 524,530 ----
/* parent page */
if (!(record->xl_info & XLR_BKP_BLOCK_1))
{
! buffer = XLogReadBuffer(xlrec->target.node, parent, false);
if (BufferIsValid(buffer))
{
page = (Page) BufferGetPage(buffer);
***************
*** 580,586 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
/* Fix left-link of right sibling */
if (!(record->xl_info & XLR_BKP_BLOCK_2))
{
! buffer = XLogReadBuffer(reln, rightsib, false);
if (BufferIsValid(buffer))
{
page = (Page) BufferGetPage(buffer);
--- 570,576 ----
/* Fix left-link of right sibling */
if (!(record->xl_info & XLR_BKP_BLOCK_2))
{
! buffer = XLogReadBuffer(xlrec->target.node, rightsib, false);
if (BufferIsValid(buffer))
{
page = (Page) BufferGetPage(buffer);
***************
*** 606,612 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
{
if (leftsib != P_NONE)
{
! buffer = XLogReadBuffer(reln, leftsib, false);
if (BufferIsValid(buffer))
{
page = (Page) BufferGetPage(buffer);
--- 596,602 ----
{
if (leftsib != P_NONE)
{
! buffer = XLogReadBuffer(xlrec->target.node, leftsib, false);
if (BufferIsValid(buffer))
{
page = (Page) BufferGetPage(buffer);
***************
*** 629,635 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
}
/* Rewrite target page as empty deleted page */
! buffer = XLogReadBuffer(reln, target, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
--- 619,625 ----
}
/* Rewrite target page as empty deleted page */
! buffer = XLogReadBuffer(xlrec->target.node, target, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
***************
*** 654,660 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
memcpy(&md, (char *) xlrec + SizeOfBtreeDeletePage,
sizeof(xl_btree_metadata));
! _bt_restore_meta(reln, lsn,
md.root, md.level,
md.fastroot, md.fastlevel);
}
--- 644,650 ----
memcpy(&md, (char *) xlrec + SizeOfBtreeDeletePage,
sizeof(xl_btree_metadata));
! _bt_restore_meta(xlrec->target.node, lsn,
md.root, md.level,
md.fastroot, md.fastlevel);
}
***************
*** 671,684 **** static void
btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
- Relation reln;
Buffer buffer;
Page page;
BTPageOpaque pageop;
BlockNumber downlink = 0;
! reln = XLogOpenRelation(xlrec->node);
! buffer = XLogReadBuffer(reln, xlrec->rootblk, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
--- 661,672 ----
btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
Buffer buffer;
Page page;
BTPageOpaque pageop;
BlockNumber downlink = 0;
! buffer = XLogReadBuffer(xlrec->node, xlrec->rootblk, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
***************
*** 710,716 **** btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
! _bt_restore_meta(reln, lsn,
xlrec->rootblk, xlrec->level,
xlrec->rootblk, xlrec->level);
--- 698,704 ----
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
! _bt_restore_meta(xlrec->node, lsn,
xlrec->rootblk, xlrec->level,
xlrec->rootblk, xlrec->level);
***************
*** 903,911 **** btree_xlog_cleanup(void)
foreach(l, incomplete_actions)
{
bt_incomplete_action *action = (bt_incomplete_action *) lfirst(l);
- Relation reln;
- reln = XLogOpenRelation(action->node);
if (action->is_split)
{
/* finish an incomplete split */
--- 891,897 ----
***************
*** 916,929 **** btree_xlog_cleanup(void)
BTPageOpaque lpageop,
rpageop;
bool is_only;
! lbuf = XLogReadBuffer(reln, action->leftblk, false);
/* failure is impossible because we wrote this page earlier */
if (!BufferIsValid(lbuf))
elog(PANIC, "btree_xlog_cleanup: left block unfound");
lpage = (Page) BufferGetPage(lbuf);
lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
! rbuf = XLogReadBuffer(reln, action->rightblk, false);
/* failure is impossible because we wrote this page earlier */
if (!BufferIsValid(rbuf))
elog(PANIC, "btree_xlog_cleanup: right block unfound");
--- 902,916 ----
BTPageOpaque lpageop,
rpageop;
bool is_only;
+ Relation reln;
! lbuf = XLogReadBuffer(action->node, action->leftblk, false);
/* failure is impossible because we wrote this page earlier */
if (!BufferIsValid(lbuf))
elog(PANIC, "btree_xlog_cleanup: left block unfound");
lpage = (Page) BufferGetPage(lbuf);
lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
! rbuf = XLogReadBuffer(action->node, action->rightblk, false);
/* failure is impossible because we wrote this page earlier */
if (!BufferIsValid(rbuf))
elog(PANIC, "btree_xlog_cleanup: right block unfound");
***************
*** 933,950 **** btree_xlog_cleanup(void)
/* if the pages are all of their level, it's a only-page split */
is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop);
_bt_insert_parent(reln, lbuf, rbuf, NULL,
action->is_root, is_only);
}
else
{
/* finish an incomplete deletion (of a half-dead page) */
Buffer buf;
! buf = XLogReadBuffer(reln, action->delblk, false);
if (BufferIsValid(buf))
if (_bt_pagedel(reln, buf, NULL, true) == 0)
elog(PANIC, "btree_xlog_cleanup: _bt_pagdel failed");
}
}
incomplete_actions = NIL;
--- 920,945 ----
/* if the pages are all of their level, it's a only-page split */
is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop);
+ reln = XLogFakeRelcacheEntry(action->node);
_bt_insert_parent(reln, lbuf, rbuf, NULL,
action->is_root, is_only);
+ pfree(reln);
}
else
{
/* finish an incomplete deletion (of a half-dead page) */
Buffer buf;
! buf = XLogReadBuffer(action->node, action->delblk, false);
if (BufferIsValid(buf))
+ {
+ Relation reln;
+
+ reln = XLogFakeRelcacheEntry(action->node);
if (_bt_pagedel(reln, buf, NULL, true) == 0)
elog(PANIC, "btree_xlog_cleanup: _bt_pagdel failed");
+ pfree(reln);
+ }
}
}
incomplete_actions = NIL;
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 2806,2812 **** CleanupBackupHistory(void)
static void
RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
{
- Relation reln;
Buffer buffer;
Page page;
BkpBlock bkpb;
--- 2806,2811 ----
***************
*** 2822,2829 **** RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
memcpy(&bkpb, blk, sizeof(BkpBlock));
blk += sizeof(BkpBlock);
! reln = XLogOpenRelation(bkpb.node);
! buffer = XLogReadBuffer(reln, bkpb.block, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
--- 2821,2827 ----
memcpy(&bkpb, blk, sizeof(BkpBlock));
blk += sizeof(BkpBlock);
! buffer = XLogReadBuffer(bkpb.node, bkpb.block, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
***************
*** 5027,5035 **** StartupXLOG(void)
BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
}
! /* Start up the recovery environment */
! XLogInitRelationCache();
!
for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
{
if (RmgrTable[rmid].rm_startup != NULL)
--- 5025,5031 ----
BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
}
! /* Initialize resource managers */
for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
{
if (RmgrTable[rmid].rm_startup != NULL)
***************
*** 5293,5303 **** StartupXLOG(void)
* allows some extra error checking in xlog_redo.
*/
CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
-
- /*
- * Close down recovery environment
- */
- XLogCloseRelationCache();
}
/*
--- 5289,5294 ----
*** a/src/backend/access/transam/xlogutils.c
--- b/src/backend/access/transam/xlogutils.c
***************
*** 190,195 **** XLogCheckInvalidPages(void)
--- 190,198 ----
if (foundone)
elog(PANIC, "WAL contains references to invalid pages");
+
+ hash_destroy(invalid_page_tab);
+ invalid_page_tab = NULL;
}
***************
*** 218,244 **** XLogCheckInvalidPages(void)
* at the end of WAL replay.)
*/
Buffer
! XLogReadBuffer(Relation reln, BlockNumber blkno, bool init)
{
! BlockNumber lastblock = RelationGetNumberOfBlocks(reln);
Buffer buffer;
Assert(blkno != P_NEW);
if (blkno < lastblock)
{
/* page exists in file */
! if (init)
! buffer = ReadOrZeroBuffer(reln, blkno);
! else
! buffer = ReadBuffer(reln, blkno);
}
else
{
/* hm, page doesn't exist in file */
if (!init)
{
! log_invalid_page(reln->rd_node, blkno, false);
return InvalidBuffer;
}
/* OK to extend the file */
--- 221,261 ----
* at the end of WAL replay.)
*/
Buffer
! XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init)
{
! BlockNumber lastblock;
Buffer buffer;
+ SMgrRelation smgr;
Assert(blkno != P_NEW);
+ /* Open the relation at smgr level */
+ smgr = smgropen(rnode);
+
+ /*
+ * Create the target file if it doesn't already exist. This lets us cope
+ * if the replay sequence contains writes to a relation that is later
+ * deleted. (The original coding of this routine would instead suppress
+ * the writes, but that seems like it risks losing valuable data if the
+ * filesystem loses an inode during a crash. Better to write the data
+ * until we are actually told to delete the file.)
+ */
+ if (smgr->md_fd == NULL) /* XXX: Modularity violation! */
+ smgrcreate(smgr, false, true);
+
+ lastblock = smgrnblocks(smgr);
+
if (blkno < lastblock)
{
/* page exists in file */
! buffer = ReadBufferWithoutRelcache(rnode, false, blkno, init);
}
else
{
/* hm, page doesn't exist in file */
if (!init)
{
! log_invalid_page(rnode, blkno, false);
return InvalidBuffer;
}
/* OK to extend the file */
***************
*** 249,255 **** XLogReadBuffer(Relation reln, BlockNumber blkno, bool init)
{
if (buffer != InvalidBuffer)
ReleaseBuffer(buffer);
! buffer = ReadBuffer(reln, P_NEW);
lastblock++;
}
Assert(BufferGetBlockNumber(buffer) == blkno);
--- 266,272 ----
{
if (buffer != InvalidBuffer)
ReleaseBuffer(buffer);
! buffer = ReadBufferWithoutRelcache(rnode, false, P_NEW, false);
lastblock++;
}
Assert(BufferGetBlockNumber(buffer) == blkno);
***************
*** 265,271 **** XLogReadBuffer(Relation reln, BlockNumber blkno, bool init)
if (PageIsNew((PageHeader) page))
{
UnlockReleaseBuffer(buffer);
! log_invalid_page(reln->rd_node, blkno, true);
return InvalidBuffer;
}
}
--- 282,288 ----
if (PageIsNew((PageHeader) page))
{
UnlockReleaseBuffer(buffer);
! log_invalid_page(rnode, blkno, true);
return InvalidBuffer;
}
}
***************
*** 273,472 **** XLogReadBuffer(Relation reln, BlockNumber blkno, bool init)
return buffer;
}
!
! /*
! * Lightweight "Relation" cache --- this substitutes for the normal relcache
! * during XLOG replay.
! */
!
! typedef struct XLogRelDesc
! {
! RelationData reldata;
! struct XLogRelDesc *lessRecently;
! struct XLogRelDesc *moreRecently;
! } XLogRelDesc;
!
! typedef struct XLogRelCacheEntry
! {
! RelFileNode rnode;
! XLogRelDesc *rdesc;
! } XLogRelCacheEntry;
!
! static HTAB *_xlrelcache;
! static XLogRelDesc *_xlrelarr = NULL;
! static Form_pg_class _xlpgcarr = NULL;
! static int _xlast = 0;
! static int _xlcnt = 0;
!
! #define _XLOG_RELCACHESIZE 512
!
! static void
! _xl_init_rel_cache(void)
! {
! HASHCTL ctl;
!
! _xlcnt = _XLOG_RELCACHESIZE;
! _xlast = 0;
! _xlrelarr = (XLogRelDesc *) malloc(sizeof(XLogRelDesc) * _xlcnt);
! memset(_xlrelarr, 0, sizeof(XLogRelDesc) * _xlcnt);
! _xlpgcarr = (Form_pg_class) malloc(sizeof(FormData_pg_class) * _xlcnt);
! memset(_xlpgcarr, 0, sizeof(FormData_pg_class) * _xlcnt);
!
! _xlrelarr[0].moreRecently = &(_xlrelarr[0]);
! _xlrelarr[0].lessRecently = &(_xlrelarr[0]);
!
! memset(&ctl, 0, sizeof(ctl));
! ctl.keysize = sizeof(RelFileNode);
! ctl.entrysize = sizeof(XLogRelCacheEntry);
! ctl.hash = tag_hash;
!
! _xlrelcache = hash_create("XLOG relcache", _XLOG_RELCACHESIZE,
! &ctl, HASH_ELEM | HASH_FUNCTION);
! }
!
! static void
! _xl_remove_hash_entry(XLogRelDesc *rdesc)
! {
! Form_pg_class tpgc = rdesc->reldata.rd_rel;
! XLogRelCacheEntry *hentry;
!
! rdesc->lessRecently->moreRecently = rdesc->moreRecently;
! rdesc->moreRecently->lessRecently = rdesc->lessRecently;
!
! hentry = (XLogRelCacheEntry *) hash_search(_xlrelcache,
! (void *) &(rdesc->reldata.rd_node), HASH_REMOVE, NULL);
! if (hentry == NULL)
! elog(PANIC, "_xl_remove_hash_entry: file was not found in cache");
!
! RelationCloseSmgr(&(rdesc->reldata));
!
! memset(rdesc, 0, sizeof(XLogRelDesc));
! memset(tpgc, 0, sizeof(FormData_pg_class));
! rdesc->reldata.rd_rel = tpgc;
! }
!
! static XLogRelDesc *
! _xl_new_reldesc(void)
! {
! XLogRelDesc *res;
!
! _xlast++;
! if (_xlast < _xlcnt)
! {
! _xlrelarr[_xlast].reldata.rd_rel = &(_xlpgcarr[_xlast]);
! return &(_xlrelarr[_xlast]);
! }
!
! /* reuse */
! res = _xlrelarr[0].moreRecently;
!
! _xl_remove_hash_entry(res);
!
! _xlast--;
! return res;
! }
!
!
! void
! XLogInitRelationCache(void)
{
! _xl_init_rel_cache();
! invalid_page_tab = NULL;
! }
!
! void
! XLogCloseRelationCache(void)
! {
! HASH_SEQ_STATUS status;
! XLogRelCacheEntry *hentry;
!
! if (!_xlrelarr)
! return;
! hash_seq_init(&status, _xlrelcache);
!
! while ((hentry = (XLogRelCacheEntry *) hash_seq_search(&status)) != NULL)
! _xl_remove_hash_entry(hentry->rdesc);
!
! hash_destroy(_xlrelcache);
!
! free(_xlrelarr);
! free(_xlpgcarr);
!
! _xlrelarr = NULL;
! }
/*
! * Open a relation during XLOG replay
*
! * Note: this once had an API that allowed NULL return on failure, but it
! * no longer does; any failure results in elog().
*/
Relation
! XLogOpenRelation(RelFileNode rnode)
{
! XLogRelDesc *res;
! XLogRelCacheEntry *hentry;
! bool found;
! hentry = (XLogRelCacheEntry *)
! hash_search(_xlrelcache, (void *) &rnode, HASH_FIND, NULL);
! if (hentry)
! {
! res = hentry->rdesc;
! res->lessRecently->moreRecently = res->moreRecently;
! res->moreRecently->lessRecently = res->lessRecently;
! }
! else
! {
! res = _xl_new_reldesc();
!
! sprintf(RelationGetRelationName(&(res->reldata)), "%u", rnode.relNode);
!
! res->reldata.rd_node = rnode;
!
! /*
! * We set up the lockRelId in case anything tries to lock the dummy
! * relation. Note that this is fairly bogus since relNode may be
! * different from the relation's OID. It shouldn't really matter
! * though, since we are presumably running by ourselves and can't have
! * any lock conflicts ...
! */
! res->reldata.rd_lockInfo.lockRelId.dbId = rnode.dbNode;
! res->reldata.rd_lockInfo.lockRelId.relId = rnode.relNode;
!
! hentry = (XLogRelCacheEntry *)
! hash_search(_xlrelcache, (void *) &rnode, HASH_ENTER, &found);
!
! if (found)
! elog(PANIC, "xlog relation already present on insert into cache");
!
! hentry->rdesc = res;
!
! res->reldata.rd_targblock = InvalidBlockNumber;
! res->reldata.rd_smgr = NULL;
! RelationOpenSmgr(&(res->reldata));
!
! /*
! * Create the target file if it doesn't already exist. This lets us
! * cope if the replay sequence contains writes to a relation that is
! * later deleted. (The original coding of this routine would instead
! * return NULL, causing the writes to be suppressed. But that seems
! * like it risks losing valuable data if the filesystem loses an inode
! * during a crash. Better to write the data until we are actually
! * told to delete the file.)
! */
! smgrcreate(res->reldata.rd_smgr, res->reldata.rd_istemp, true);
! }
! res->moreRecently = &(_xlrelarr[0]);
! res->lessRecently = _xlrelarr[0].lessRecently;
! _xlrelarr[0].lessRecently = res;
! res->lessRecently->moreRecently = res;
! return &(res->reldata);
}
/*
--- 290,349 ----
return buffer;
}
! /* Struct returned by XLogFakeRelcacheEntry */
! typedef struct
{
! RelationData reldata; /* Note: this must be first */
! FormData_pg_class pgc;
! } FakeRelCacheEntryData;
! typedef FakeRelCacheEntryData *FakeRelCacheEntry;
/*
! * Create a fake relation cache entry for a physical relation
*
! * It's often convenient to use the same functions in XLOG replay as in the
! * main codepath, but those functions often work with a relcache entry. We
! * don't have a working relation cache during XLOG replay, but this function
! * can be used to create a fake relcache entry instead. Only the fields
! * related to physical storage, like rd_rel, are initialized, so the fake
! * entry is only usable in low-level operations.
! *
! * Caller must pfree() the returned entry.
*/
Relation
! XLogFakeRelcacheEntry(RelFileNode rnode)
{
! FakeRelCacheEntry fakeentry;
! Relation rel;
! /*
! * We allocate the RelationData struct and all related space in one
! * block, so that the caller can free it with one simple pfree call.
! */
! fakeentry = palloc0(sizeof(FakeRelCacheEntryData));
! rel = (Relation) fakeentry;
! rel->rd_rel = &fakeentry->pgc;
! rel->rd_node = rnode;
! /* We don't know the name of the relation; use relfilenode instead */
! sprintf(RelationGetRelationName(rel), "%u", rnode.relNode);
!
! /*
! * We set up the lockRelId in case anything tries to lock the dummy
! * relation. Note that this is fairly bogus since relNode may be
! * different from the relation's OID. It shouldn't really matter
! * though, since we are presumably running by ourselves and can't have
! * any lock conflicts ...
! */
! rel->rd_lockInfo.lockRelId.dbId = rnode.dbNode;
! rel->rd_lockInfo.lockRelId.relId = rnode.relNode;
! rel->rd_targblock = InvalidBlockNumber;
! rel->rd_smgr = NULL;
! return rel;
}
/*
***************
*** 484,500 **** XLogOpenRelation(RelFileNode rnode)
void
XLogDropRelation(RelFileNode rnode)
{
! XLogRelCacheEntry *hentry;
!
! hentry = (XLogRelCacheEntry *)
! hash_search(_xlrelcache, (void *) &rnode, HASH_FIND, NULL);
!
! if (hentry)
! {
! XLogRelDesc *rdesc = hentry->rdesc;
!
! RelationCloseSmgr(&(rdesc->reldata));
! }
forget_invalid_pages(rnode, 0);
}
--- 361,367 ----
void
XLogDropRelation(RelFileNode rnode)
{
! smgrclosenode(rnode);
forget_invalid_pages(rnode, 0);
}
***************
*** 507,524 **** XLogDropRelation(RelFileNode rnode)
void
XLogDropDatabase(Oid dbid)
{
! HASH_SEQ_STATUS status;
! XLogRelCacheEntry *hentry;
!
! hash_seq_init(&status, _xlrelcache);
!
! while ((hentry = (XLogRelCacheEntry *) hash_seq_search(&status)) != NULL)
! {
! XLogRelDesc *rdesc = hentry->rdesc;
!
! if (hentry->rnode.dbNode == dbid)
! RelationCloseSmgr(&(rdesc->reldata));
! }
forget_invalid_pages_db(dbid);
}
--- 374,386 ----
void
XLogDropDatabase(Oid dbid)
{
! /*
! * This is unnecessarily heavy-handed, as it will close SMgrRelation
! * objects for other databases as well. DROP DATABASE occurs seldom
! * enough that it's not worth introducing a variant of smgrclose for
! * just this purpose.
! */
! smgrcloseall();
forget_invalid_pages_db(dbid);
}
*** a/src/backend/commands/sequence.c
--- b/src/backend/commands/sequence.c
***************
*** 1266,1272 **** void
seq_redo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
- Relation reln;
Buffer buffer;
Page page;
char *item;
--- 1266,1271 ----
***************
*** 1277,1284 **** seq_redo(XLogRecPtr lsn, XLogRecord *record)
if (info != XLOG_SEQ_LOG)
elog(PANIC, "seq_redo: unknown op code %u", info);
! reln = XLogOpenRelation(xlrec->node);
! buffer = XLogReadBuffer(reln, 0, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
--- 1276,1282 ----
if (info != XLOG_SEQ_LOG)
elog(PANIC, "seq_redo: unknown op code %u", info);
! buffer = XLogReadBuffer(xlrec->node, 0, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
*** a/src/backend/storage/buffer/bufmgr.c
--- b/src/backend/storage/buffer/bufmgr.c
***************
*** 76,84 **** static bool IsForInput;
static volatile BufferDesc *PinCountWaitBuf = NULL;
! static Buffer ReadBuffer_common(Relation reln, BlockNumber blockNum,
! bool zeroPage,
! BufferAccessStrategy strategy);
static bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy);
static void PinBuffer_Locked(volatile BufferDesc *buf);
static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner);
--- 76,83 ----
static volatile BufferDesc *PinCountWaitBuf = NULL;
! static Buffer ReadBuffer_common(SMgrRelation reln, bool isLocalBuf, BlockNumber blockNum,
! bool zeroPage, BufferAccessStrategy strategy, bool *hit);
static bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy);
static void PinBuffer_Locked(volatile BufferDesc *buf);
static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner);
***************
*** 89,95 **** static bool StartBufferIO(volatile BufferDesc *buf, bool forInput);
static void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
int set_flag_bits);
static void buffer_write_error_callback(void *arg);
! static volatile BufferDesc *BufferAlloc(Relation reln, BlockNumber blockNum,
BufferAccessStrategy strategy,
bool *foundPtr);
static void FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln);
--- 88,94 ----
static void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
int set_flag_bits);
static void buffer_write_error_callback(void *arg);
! static volatile BufferDesc *BufferAlloc(SMgrRelation smgr, BlockNumber blockNum,
BufferAccessStrategy strategy,
bool *foundPtr);
static void FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln);
***************
*** 114,120 **** static void AtProcExit_Buffers(int code, Datum arg);
Buffer
ReadBuffer(Relation reln, BlockNumber blockNum)
{
! return ReadBuffer_common(reln, blockNum, false, NULL);
}
/*
--- 113,130 ----
Buffer
ReadBuffer(Relation reln, BlockNumber blockNum)
{
! bool hit;
! Buffer buf;
!
! /* Open it at the smgr level if not already done */
! RelationOpenSmgr(reln);
!
! pgstat_count_buffer_read(reln);
! buf = ReadBuffer_common(reln->rd_smgr, reln->rd_istemp, blockNum, false, NULL, &hit);
! if (hit)
! pgstat_count_buffer_hit(reln);
!
! return buf;
}
/*
***************
*** 125,131 **** Buffer
ReadBufferWithStrategy(Relation reln, BlockNumber blockNum,
BufferAccessStrategy strategy)
{
! return ReadBuffer_common(reln, blockNum, false, strategy);
}
/*
--- 135,151 ----
ReadBufferWithStrategy(Relation reln, BlockNumber blockNum,
BufferAccessStrategy strategy)
{
! bool hit;
! Buffer buf;
!
! /* Open it at the smgr level if not already done */
! RelationOpenSmgr(reln);
!
! pgstat_count_buffer_read(reln);
! buf = ReadBuffer_common(reln->rd_smgr, reln->rd_istemp, blockNum, false, strategy, &hit);
! if (hit)
! pgstat_count_buffer_hit(reln);
! return buf;
}
/*
***************
*** 142,182 **** ReadBufferWithStrategy(Relation reln, BlockNumber blockNum,
Buffer
ReadOrZeroBuffer(Relation reln, BlockNumber blockNum)
{
! return ReadBuffer_common(reln, blockNum, true, NULL);
}
/*
* ReadBuffer_common -- common logic for ReadBuffer variants
*/
static Buffer
! ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
! BufferAccessStrategy strategy)
{
volatile BufferDesc *bufHdr;
Block bufBlock;
bool found;
bool isExtend;
! bool isLocalBuf;
/* Make sure we will have room to remember the buffer pin */
ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
isExtend = (blockNum == P_NEW);
- isLocalBuf = reln->rd_istemp;
-
- /* Open it at the smgr level if not already done */
- RelationOpenSmgr(reln);
/* Substitute proper block number if caller asked for P_NEW */
if (isExtend)
! blockNum = smgrnblocks(reln->rd_smgr);
!
! pgstat_count_buffer_read(reln);
if (isLocalBuf)
{
ReadLocalBufferCount++;
! bufHdr = LocalBufferAlloc(reln, blockNum, &found);
if (found)
LocalBufferHitCount++;
}
--- 162,222 ----
Buffer
ReadOrZeroBuffer(Relation reln, BlockNumber blockNum)
{
! bool hit;
! Buffer buf;
!
! /* Open it at the smgr level if not already done */
! RelationOpenSmgr(reln);
!
! pgstat_count_buffer_read(reln);
! buf = ReadBuffer_common(reln->rd_smgr, reln->rd_istemp, blockNum, true, NULL, &hit);
! if (hit)
! pgstat_count_buffer_hit(reln);
! return buf;
! }
!
! /*
! * ReadBufferWithoutRelcache -- like ReadBuffer, but can be used to read
! * read a page without having a relcache entry. If zeroPage is true,
! * this behaves like ReadOrZeroBuffer rather than ReadBuffer.
! */
! Buffer
! ReadBufferWithoutRelcache(RelFileNode rnode, bool isTemp,
! BlockNumber blockNum, bool zeroPage)
! {
! bool hit;
!
! SMgrRelation smgr = smgropen(rnode);
! return ReadBuffer_common(smgr, isTemp, blockNum, zeroPage, NULL, &hit);
}
/*
* ReadBuffer_common -- common logic for ReadBuffer variants
*/
static Buffer
! ReadBuffer_common(SMgrRelation smgr, bool isLocalBuf, BlockNumber blockNum,
! bool zeroPage, BufferAccessStrategy strategy, bool *hit)
{
volatile BufferDesc *bufHdr;
Block bufBlock;
bool found;
bool isExtend;
!
! *hit = false;
/* Make sure we will have room to remember the buffer pin */
ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
isExtend = (blockNum == P_NEW);
/* Substitute proper block number if caller asked for P_NEW */
if (isExtend)
! blockNum = smgrnblocks(smgr);
if (isLocalBuf)
{
ReadLocalBufferCount++;
! bufHdr = LocalBufferAlloc(smgr, blockNum, &found);
if (found)
LocalBufferHitCount++;
}
***************
*** 188,194 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
* lookup the buffer. IO_IN_PROGRESS is set if the requested block is
* not currently in memory.
*/
! bufHdr = BufferAlloc(reln, blockNum, strategy, &found);
if (found)
BufferHitCount++;
}
--- 228,234 ----
* lookup the buffer. IO_IN_PROGRESS is set if the requested block is
* not currently in memory.
*/
! bufHdr = BufferAlloc(smgr, blockNum, strategy, &found);
if (found)
BufferHitCount++;
}
***************
*** 201,207 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
if (!isExtend)
{
/* Just need to update stats before we exit */
! pgstat_count_buffer_hit(reln);
if (VacuumCostActive)
VacuumCostBalance += VacuumCostPageHit;
--- 241,247 ----
if (!isExtend)
{
/* Just need to update stats before we exit */
! *hit = true;
if (VacuumCostActive)
VacuumCostBalance += VacuumCostPageHit;
***************
*** 225,232 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
if (!PageIsNew((PageHeader) bufBlock))
ereport(ERROR,
! (errmsg("unexpected data beyond EOF in block %u of relation \"%s\"",
! blockNum, RelationGetRelationName(reln)),
errhint("This has been seen to occur with buggy kernels; consider updating your system.")));
/*
--- 265,272 ----
bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
if (!PageIsNew((PageHeader) bufBlock))
ereport(ERROR,
! (errmsg("unexpected data beyond EOF in block %u of relation %u/%u/%u",
! blockNum, smgr->smgr_rnode.spcNode, smgr->smgr_rnode.dbNode, smgr->smgr_rnode.relNode),
errhint("This has been seen to occur with buggy kernels; consider updating your system.")));
/*
***************
*** 278,285 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
{
/* new buffers are zero-filled */
MemSet((char *) bufBlock, 0, BLCKSZ);
! smgrextend(reln->rd_smgr, blockNum, (char *) bufBlock,
! reln->rd_istemp);
}
else
{
--- 318,325 ----
{
/* new buffers are zero-filled */
MemSet((char *) bufBlock, 0, BLCKSZ);
! smgrextend(smgr, blockNum, (char *) bufBlock,
! isLocalBuf);
}
else
{
***************
*** 290,296 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
if (zeroPage)
MemSet((char *) bufBlock, 0, BLCKSZ);
else
! smgrread(reln->rd_smgr, blockNum, (char *) bufBlock);
/* check for garbage data */
if (!PageHeaderIsValid((PageHeader) bufBlock))
{
--- 330,336 ----
if (zeroPage)
MemSet((char *) bufBlock, 0, BLCKSZ);
else
! smgrread(smgr, blockNum, (char *) bufBlock);
/* check for garbage data */
if (!PageHeaderIsValid((PageHeader) bufBlock))
{
***************
*** 298,312 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
{
ereport(WARNING,
(errcode(ERRCODE_DATA_CORRUPTED),
! errmsg("invalid page header in block %u of relation \"%s\"; zeroing out page",
! blockNum, RelationGetRelationName(reln))));
MemSet((char *) bufBlock, 0, BLCKSZ);
}
else
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
! errmsg("invalid page header in block %u of relation \"%s\"",
! blockNum, RelationGetRelationName(reln))));
}
}
--- 338,357 ----
{
ereport(WARNING,
(errcode(ERRCODE_DATA_CORRUPTED),
! errmsg("invalid page header in block %u of relation %u/%u/%u; zeroing out page",
! blockNum,
! smgr->smgr_rnode.spcNode,
! smgr->smgr_rnode.dbNode,
! smgr->smgr_rnode.relNode)));
MemSet((char *) bufBlock, 0, BLCKSZ);
}
else
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
! errmsg("invalid page header in block %u of relation %u/%u/%u",
! blockNum, smgr->smgr_rnode.spcNode,
! smgr->smgr_rnode.dbNode,
! smgr->smgr_rnode.relNode)));
}
}
***************
*** 347,353 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
* No locks are held either at entry or exit.
*/
static volatile BufferDesc *
! BufferAlloc(Relation reln,
BlockNumber blockNum,
BufferAccessStrategy strategy,
bool *foundPtr)
--- 392,398 ----
* No locks are held either at entry or exit.
*/
static volatile BufferDesc *
! BufferAlloc(SMgrRelation smgr,
BlockNumber blockNum,
BufferAccessStrategy strategy,
bool *foundPtr)
***************
*** 364,370 **** BufferAlloc(Relation reln,
bool valid;
/* create a tag so we can lookup the buffer */
! INIT_BUFFERTAG(newTag, reln, blockNum);
/* determine its hash code and partition lock ID */
newHash = BufTableHashCode(&newTag);
--- 409,415 ----
bool valid;
/* create a tag so we can lookup the buffer */
! INIT_BUFFERTAG(newTag, smgr->smgr_rnode, blockNum);
/* determine its hash code and partition lock ID */
newHash = BufTableHashCode(&newTag);
*** a/src/backend/storage/buffer/localbuf.c
--- b/src/backend/storage/buffer/localbuf.c
***************
*** 61,67 **** static Block GetLocalBufferStorage(void);
* (hence, usage_count is always advanced).
*/
BufferDesc *
! LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr)
{
BufferTag newTag; /* identity of requested block */
LocalBufferLookupEnt *hresult;
--- 61,67 ----
* (hence, usage_count is always advanced).
*/
BufferDesc *
! LocalBufferAlloc(SMgrRelation smgr, BlockNumber blockNum, bool *foundPtr)
{
BufferTag newTag; /* identity of requested block */
LocalBufferLookupEnt *hresult;
***************
*** 70,76 **** LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr)
int trycounter;
bool found;
! INIT_BUFFERTAG(newTag, reln, blockNum);
/* Initialize local buffers if first request in this session */
if (LocalBufHash == NULL)
--- 70,76 ----
int trycounter;
bool found;
! INIT_BUFFERTAG(newTag, smgr->smgr_rnode, blockNum);
/* Initialize local buffers if first request in this session */
if (LocalBufHash == NULL)
***************
*** 87,93 **** LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr)
Assert(BUFFERTAGS_EQUAL(bufHdr->tag, newTag));
#ifdef LBDEBUG
fprintf(stderr, "LB ALLOC (%u,%d) %d\n",
! RelationGetRelid(reln), blockNum, -b - 1);
#endif
/* this part is equivalent to PinBuffer for a shared buffer */
if (LocalRefCount[b] == 0)
--- 87,93 ----
Assert(BUFFERTAGS_EQUAL(bufHdr->tag, newTag));
#ifdef LBDEBUG
fprintf(stderr, "LB ALLOC (%u,%d) %d\n",
! smgr->smgr_rnode.relNode, blockNum, -b - 1);
#endif
/* this part is equivalent to PinBuffer for a shared buffer */
if (LocalRefCount[b] == 0)
*** a/src/backend/utils/init/flatfiles.c
--- b/src/backend/utils/init/flatfiles.c
***************
*** 701,712 **** BuildFlatFiles(bool database_only)
rel_authid,
rel_authmem;
- /*
- * We don't have any hope of running a real relcache, but we can use the
- * same fake-relcache facility that WAL replay uses.
- */
- XLogInitRelationCache();
-
/* Need a resowner to keep the heapam and buffer code happy */
owner = ResourceOwnerCreate(NULL, "BuildFlatFiles");
CurrentResourceOwner = owner;
--- 701,706 ----
***************
*** 716,724 **** BuildFlatFiles(bool database_only)
rnode.dbNode = 0;
rnode.relNode = DatabaseRelationId;
! /* No locking is needed because no one else is alive yet */
! rel_db = XLogOpenRelation(rnode);
write_database_file(rel_db, true);
if (!database_only)
{
--- 710,724 ----
rnode.dbNode = 0;
rnode.relNode = DatabaseRelationId;
! /*
! * We don't have any hope of running a real relcache, but we can use the
! * same fake-relcache facility that WAL replay uses.
! *
! * No locking is needed because no one else is alive yet.
! */
! rel_db = XLogFakeRelcacheEntry(rnode);
write_database_file(rel_db, true);
+ pfree(rel_db);
if (!database_only)
{
***************
*** 726,746 **** BuildFlatFiles(bool database_only)
rnode.spcNode = GLOBALTABLESPACE_OID;
rnode.dbNode = 0;
rnode.relNode = AuthIdRelationId;
! rel_authid = XLogOpenRelation(rnode);
/* hard-wired path to pg_auth_members */
rnode.spcNode = GLOBALTABLESPACE_OID;
rnode.dbNode = 0;
rnode.relNode = AuthMemRelationId;
! rel_authmem = XLogOpenRelation(rnode);
write_auth_file(rel_authid, rel_authmem);
}
CurrentResourceOwner = NULL;
ResourceOwnerDelete(owner);
-
- XLogCloseRelationCache();
}
--- 726,746 ----
rnode.spcNode = GLOBALTABLESPACE_OID;
rnode.dbNode = 0;
rnode.relNode = AuthIdRelationId;
! rel_authid = XLogFakeRelcacheEntry(rnode);
/* hard-wired path to pg_auth_members */
rnode.spcNode = GLOBALTABLESPACE_OID;
rnode.dbNode = 0;
rnode.relNode = AuthMemRelationId;
! rel_authmem = XLogFakeRelcacheEntry(rnode);
write_auth_file(rel_authid, rel_authmem);
+ pfree(rel_authid);
+ pfree(rel_authmem);
}
CurrentResourceOwner = NULL;
ResourceOwnerDelete(owner);
}
*** a/src/include/access/xlogutils.h
--- b/src/include/access/xlogutils.h
***************
*** 12,29 ****
#define XLOG_UTILS_H
#include "storage/buf.h"
#include "utils/rel.h"
- extern void XLogInitRelationCache(void);
extern void XLogCheckInvalidPages(void);
- extern void XLogCloseRelationCache(void);
! extern Relation XLogOpenRelation(RelFileNode rnode);
extern void XLogDropRelation(RelFileNode rnode);
extern void XLogDropDatabase(Oid dbid);
extern void XLogTruncateRelation(RelFileNode rnode, BlockNumber nblocks);
! extern Buffer XLogReadBuffer(Relation reln, BlockNumber blkno, bool init);
#endif
--- 12,29 ----
#define XLOG_UTILS_H
#include "storage/buf.h"
+ #include "storage/smgr.h"
#include "utils/rel.h"
extern void XLogCheckInvalidPages(void);
! extern Relation XLogFakeRelcacheEntry(RelFileNode rnode);
!
extern void XLogDropRelation(RelFileNode rnode);
extern void XLogDropDatabase(Oid dbid);
extern void XLogTruncateRelation(RelFileNode rnode, BlockNumber nblocks);
! extern Buffer XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init);
#endif
*** a/src/include/storage/buf_internals.h
--- b/src/include/storage/buf_internals.h
***************
*** 18,23 ****
--- 18,24 ----
#include "storage/buf.h"
#include "storage/lwlock.h"
#include "storage/shmem.h"
+ #include "storage/smgr.h"
#include "storage/spin.h"
#include "utils/rel.h"
***************
*** 75,83 **** typedef struct buftag
(a).blockNum = InvalidBlockNumber \
)
! #define INIT_BUFFERTAG(a,xx_reln,xx_blockNum) \
( \
! (a).rnode = (xx_reln)->rd_node, \
(a).blockNum = (xx_blockNum) \
)
--- 76,84 ----
(a).blockNum = InvalidBlockNumber \
)
! #define INIT_BUFFERTAG(a,xx_rnode,xx_blockNum) \
( \
! (a).rnode = (xx_rnode), \
(a).blockNum = (xx_blockNum) \
)
***************
*** 201,207 **** extern int BufTableInsert(BufferTag *tagPtr, uint32 hashcode, int buf_id);
extern void BufTableDelete(BufferTag *tagPtr, uint32 hashcode);
/* localbuf.c */
! extern BufferDesc *LocalBufferAlloc(Relation reln, BlockNumber blockNum,
bool *foundPtr);
extern void MarkLocalBufferDirty(Buffer buffer);
extern void DropRelFileNodeLocalBuffers(RelFileNode rnode,
--- 202,208 ----
extern void BufTableDelete(BufferTag *tagPtr, uint32 hashcode);
/* localbuf.c */
! extern BufferDesc *LocalBufferAlloc(SMgrRelation reln, BlockNumber blockNum,
bool *foundPtr);
extern void MarkLocalBufferDirty(Buffer buffer);
extern void DropRelFileNodeLocalBuffers(RelFileNode rnode,
*** a/src/include/storage/bufmgr.h
--- b/src/include/storage/bufmgr.h
***************
*** 121,126 **** extern Buffer ReadBuffer(Relation reln, BlockNumber blockNum);
--- 121,128 ----
extern Buffer ReadBufferWithStrategy(Relation reln, BlockNumber blockNum,
BufferAccessStrategy strategy);
extern Buffer ReadOrZeroBuffer(Relation reln, BlockNumber blockNum);
+ extern Buffer ReadBufferWithoutRelcache(RelFileNode rnode, bool isTemp,
+ BlockNumber blockNum, bool zeroPage);
extern void ReleaseBuffer(Buffer buffer);
extern void UnlockReleaseBuffer(Buffer buffer);
extern void MarkBufferDirty(Buffer buffer);