*** a/doc/src/sgml/gist.sgml
--- b/doc/src/sgml/gist.sgml
***************
*** 709,741 **** my_distance(PG_FUNCTION_ARGS)
-
- Crash Recovery
-
-
- Usually, replay of the WAL log is sufficient to restore the integrity
- of a GiST index following a database crash. However, there are some
- corner cases in which the index state is not fully rebuilt. The index
- will still be functionally correct, but there might be some performance
- degradation. When this occurs, the index can be repaired by
- VACUUM>ing its table, or by rebuilding the index using
- REINDEX>. In some cases a plain VACUUM> is
- not sufficient, and either VACUUM FULL> or REINDEX>
- is needed. The need for one of these procedures is indicated by occurrence
- of this log message during crash recovery:
-
- LOG: index NNN/NNN/NNN needs VACUUM or REINDEX to finish crash recovery
-
- or this log message during routine index insertions:
-
- LOG: index "FOO" needs VACUUM or REINDEX to finish crash recovery
-
- If a plain VACUUM> finds itself unable to complete recovery
- fully, it will return a notice:
-
- NOTICE: index "FOO" needs VACUUM FULL or REINDEX to finish crash recovery
-
-
-
-
--- 709,712 ----
*** a/src/backend/access/gist/README
--- b/src/backend/access/gist/README
***************
*** 108,150 **** Penalty is used for choosing a subtree to insert; method PickSplit is used for
the node splitting algorithm; method Union is used for propagating changes
upward to maintain the tree properties.
! NOTICE: We modified original INSERT algorithm for performance reason. In
! particularly, it is now a single-pass algorithm.
!
! Function findLeaf is used to identify subtree for insertion. Page, in which
! insertion is proceeded, is locked as well as its parent page. Functions
! findParent and findPath are used to find parent pages, which could be changed
! because of concurrent access. Function pageSplit is recurrent and could split
! page by more than 2 pages, which could be necessary if keys have different
! lengths or more than one key are inserted (in such situation, user defined
! function pickSplit cannot guarantee free space on page).
!
! findLeaf(new-key)
! push(stack, [root, 0]) //page, LSN
! while(true)
! ptr = top of stack
! latch( ptr->page, S-mode )
! ptr->lsn = ptr->page->lsn
! if ( exists ptr->parent AND ptr->parent->lsn < ptr->page->nsn )
! unlatch( ptr->page )
! pop stack
! else if ( ptr->page is not leaf )
! push( stack, [get_best_child(ptr->page, new-key), 0] )
! unlatch( ptr->page )
! else
! unlatch( ptr->page )
! latch( ptr->page, X-mode )
! if ( ptr->page is not leaf )
! //the only root page can become a non-leaf
! unlatch( ptr->page )
! else if ( ptr->parent->lsn < ptr->page->nsn )
! unlatch( ptr->page )
! pop stack
! else
! return stack
! end
! end
! end
findPath( stack item )
push stack, [root, 0, 0] // page, LSN, parent
--- 108,178 ----
the node splitting algorithm; method Union is used for propagating changes
upward to maintain the tree properties.
! To insert a tuple, we first have to find a suitable leaf page to insert to.
! The algorithm walks down the tree, starting from the root, along the path
! of smallest Penalty. At each step:
!
! 1. Has this page been split since we looked at the parent? If so, it's
! possible that we should be inserting to the other half instead, so retreat
! back to the parent.
! 2. If this is a leaf node, we've found our target node.
! 3. Otherwise use Penalty to pick a new target subtree.
! 4. Check the key representing the target subtree. If it doesn't already cover
! the key we're inserting, replace it with the Union of the old downlink key
! and the key being inserted. (Actually, we always call Union, and just skip
! the replacement if the Unioned key is the same as the existing key)
! 5. Replacing the key in step 4 might cause the page to be split. In that case,
! propagate the change upwards and restart the algorithm from the first parent
! that didn't need to be split.
! 6. Walk down to the target subtree, and goto 1.
!
! This differs from the insertion algorithm in the original paper. In the
! original paper, you first walk down the tree until you reach a leaf page, and
! then you adjust the downlink in the parent, and propagating the adjustment up,
! all the way up to the root in the worst case. But we adjust the downlinks to
! cover the new key already when we walk down, so that when we reach the leaf
! page, we don't need to update the parents anymore, except to insert the
! downlinks if we have to split the page. This makes crash recovery simpler:
! after inserting a key to the page, the tree is immediately self-consistent
! without having to update the parents. Even if we split a page and crash before
! inserting the downlink to the parent, the tree is self-consistent because the
! right half of the split is accessible via the rightlink of the left page
! (which replaced the original page).
!
! Note that the algorithm can walk up and down the tree before reaching a leaf
! page, if internal pages need to split while adjusting the downlinks for the
! new key. Eventually, you should reach the bottom, and proceed with the
! insertion of the new tuple.
!
! Once we've found the target page to insert to, we check if there's room
! for the new tuple. If there is, the tuple is inserted, and we're done.
! If it doesn't fit, however, the page needs to be split. Note that it is
! possible that a page needs to be split into more than two pages, if keys have
! different lengths or more than one key is being inserted at a time (which can
! happen when inserting downlinks for a page split that resulted in more than
! two pages at the lower level). After splitting a page, the parent page needs
! to be updated. The downlink for the new page needs to be inserted, and the
! downlink for the old page, which became the left half of the split, needs to
! be updated to only cover those tuples that stayed on the left page. Inserting
! the downlink in the parent can again lead to a page split, recursing up to the
! root page in the worst case.
!
! gistplacetopage is the workhorse function that performs one step of the
! insertion. If the tuple fits, it inserts it to the given page, otherwise
! it splits the page, and constructs the new downlink tuples for the split
! pages. The caller must then call gistplacetopage() on the parent page to
! insert the downlink tuples. The parent page that holds the downlink to
! the child might have migrated as a result of concurrent splits of the
! parent, gistfindCorrectParent() is used to find the parent page.
!
! Splitting the root page works slightly differently. At root split,
! gistplacetopage() allocates the new child pages and replaces the old root
! page with the new root containing downlinks to the new children, all in one
! operation.
!
!
! findPath is a subroutine of findParent, used when the correct parent page
! can't be found by following the rightlinks at the parent level:
findPath( stack item )
push stack, [root, 0, 0] // page, LSN, parent
***************
*** 165,170 **** findPath( stack item )
--- 193,203 ----
pop stack
end
+
+ gistFindCorrectParent is used to re-find the parent of a page during
+ insertion. It might have migrated to the right since we traversed down the
+ tree because of page splits.
+
findParent( stack item )
parent = item->parent
latch( parent->page, X-mode )
***************
*** 184,189 **** findParent( stack item )
--- 217,225 ----
return findParent( item )
end
+ pageSplit function decides how to distribute keys to the new pages after
+ page split:
+
pageSplit(page, allkeys)
(lkeys, rkeys) = pickSplit( allkeys )
if ( page is root )
***************
*** 204,242 **** pageSplit(page, allkeys)
return newkeys
- placetopage(page, keysarray)
- if ( no space left on page )
- keysarray = pageSplit(page, [ extract_keys(page), keysarray])
- last page in chain gets old NSN,
- original and others - new NSN equals to LSN
- if ( page is root )
- make new root with keysarray
- end
- else
- put keysarray on page
- if ( length of keysarray > 1 )
- keysarray = [ union(keysarray) ]
- end
- end
! insert(new-key)
! stack = findLeaf(new-key)
! keysarray = [new-key]
! ptr = top of stack
! while(true)
! findParent( ptr ) //findParent latches parent page
! keysarray = placetopage(ptr->page, keysarray)
! unlatch( ptr->page )
! pop stack;
! ptr = top of stack
! if (length of keysarray == 1)
! newboundingkey = union(oldboundingkey, keysarray)
! if (newboundingkey == oldboundingkey)
! unlatch ptr->page
! break loop
! end
! end
! end
Authors:
Teodor Sigaev
--- 240,283 ----
return newkeys
! Concurrency control
! -------------------
! As a rule of thumb, if you need to hold a lock on multiple pages at the
! same time, the locks should be acquired in the following order: child page
! before parent, and left-to-right at the same level. Always acquiring the
! locks in the same order avoids deadlocks.
!
! The search algorithm only looks at and locks one page at a time. Consequently
! there's a race condition between a search and a page split. A page split
! happens in two phases: 1. The page is split 2. The downlink is inserted to the
! parent. If a search looks at the parent page between those steps, before the
! downlink is inserted, it will still find the new right half by following the
! rightlink on the left half. But it must not follow the rightlink if it saw the
! downlink in the parent, or the page will be visited twice!
!
! A split initially marks the left page with the F_FOLLOW_RIGHT flag. If a scan
! sees that flag set, it knows that the right page is missing the downlink, and
! should be visited too. When split inserts the downlink to the parent, it
! clears the F_FOLLOW_RIGHT flag in the child, and sets the NSN field in the
! child page header to match the LSN of the insertion on the parent. If the
! F_FOLLOW_RIGHT flag is not set, a scan compares the NSN on the child and the
! LSN it saw in the parent. If NSN < LSN, the scan looked at the parent page
! before the downlink was inserted, so it should follow the rightlink. Otherwise
! the scan saw the downlink in the parent page, and will/did follow that as
! usual.
!
! A scan can't normally see a page with the F_FOLLOW_RIGHT flag set, because
! a page split keeps the child pages locked until the downlink has been inserted
! to the parent and the flag cleared again. But if a crash happens in the middle
! of a page split, before the downlinks are inserted into the parent, that will
! leave a page with F_FOLLOW_RIGHT in the tree. Scans handle that just fine,
! but we'll eventually want to fix that for performance reasons. And more
! importantly, dealing with pages with missing downlink pointers in the parent
! would complicate the insertion algorithm. So when an insertion sees a page
! with F_FOLLOW_RIGHT set, it immediately tries to bring the split that
! crashed in the middle to completion by adding the downlink in the parent.
!
Authors:
Teodor Sigaev
*** a/src/backend/access/gist/gist.c
--- b/src/backend/access/gist/gist.c
***************
*** 43,50 **** static void gistdoinsert(Relation r,
IndexTuple itup,
Size freespace,
GISTSTATE *GISTstate);
! static void gistfindleaf(GISTInsertState *state,
! GISTSTATE *giststate);
#define ROTATEDIST(d) do { \
--- 43,54 ----
IndexTuple itup,
Size freespace,
GISTSTATE *GISTstate);
! static bool gistinserthere(GISTInsertState *state,
! GISTSTATE *giststate, IndexTuple itup, bool isupdate);
! static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate);
! static bool gistinsert_common(GISTInsertState *state, GISTSTATE *giststate,
! IndexTuple *tuples, int ntuples, bool isupdate,
! Buffer *oldchildren, int noldchildren);
#define ROTATEDIST(d) do { \
***************
*** 251,291 **** gistinsert(PG_FUNCTION_ARGS)
/*
! * Workhouse routine for doing insertion into a GiST index. Note that
! * this routine assumes it is invoked in a short-lived memory context,
! * so it does not bother releasing palloc'd allocations.
*/
- static void
- gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
- {
- GISTInsertState state;
-
- memset(&state, 0, sizeof(GISTInsertState));
-
- state.itup = (IndexTuple *) palloc(sizeof(IndexTuple));
- state.itup[0] = (IndexTuple) palloc(IndexTupleSize(itup));
- memcpy(state.itup[0], itup, IndexTupleSize(itup));
- state.ituplen = 1;
- state.freespace = freespace;
- state.r = r;
- state.key = itup->t_tid;
- state.needInsertComplete = true;
-
- state.stack = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
- state.stack->blkno = GIST_ROOT_BLKNO;
-
- gistfindleaf(&state, giststate);
- gistmakedeal(&state, giststate);
- }
-
static bool
! gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
{
! bool is_splitted = false;
! bool is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false;
/*
! * if (!is_leaf) remove old key: This node's key has been modified, either
* because a child split occurred or because we needed to adjust our key
* for an insert in a child node. Therefore, remove the old version of
* this node's key.
--- 255,287 ----
/*
! * Place all tuples in 'itup' to stack->page. If 'isupdate' is true,
! * the first tuple in the array replaces the tuple at stack->childoffnum,
! * instead of being inserted as new.
! *
! * Sets NSN and clears F_FOLLOW_RIGHT flags on 'childbufs', and unlocks
! * and releases them.
! *
! * If this page was split, all the split pages are kept pinned and locked,
! * and returned in *splitbufs. New downlink tuples that need to be inserted
! * to the parent are returned in *parenttuples.
*/
static bool
! gistplacetopage(GISTInsertState *state, GISTSTATE *giststate,
! IndexTuple *itup, int ntuples, bool isupdate,
! Buffer *childbufs, int nchildbufs,
! Buffer **splitbufs, int *nsplitbufs,
! IndexTuple **parenttuples, int *nparenttuples)
{
! GISTInsertStack *stack = state->stack;
! bool is_splitted;
! bool is_leaf = (GistPageIsLeaf(stack->page)) ? true : false;
! XLogRecPtr recptr;
! XLogRecPtr oldnsn;
! int i;
/*
! * if isupdate, remove old key: This node's key has been modified, either
* because a child split occurred or because we needed to adjust our key
* for an insert in a child node. Therefore, remove the old version of
* this node's key.
***************
*** 293,415 **** gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
* for WAL replay, in the non-split case we handle this by setting up a
* one-element todelete array; in the split case, it's handled implicitly
* because the tuple vector passed to gistSplit won't include this tuple.
- *
- * XXX: If we want to change fillfactors between node and leaf, fillfactor
- * = (is_leaf ? state->leaf_fillfactor : state->node_fillfactor)
*/
! if (gistnospace(state->stack->page, state->itup, state->ituplen,
! is_leaf ? InvalidOffsetNumber : state->stack->childoffnum,
! state->freespace))
{
/* no space for insertion */
IndexTuple *itvec;
int tlen;
SplitedPageLayout *dist = NULL,
*ptr;
! BlockNumber rrlink = InvalidBlockNumber;
! GistNSN oldnsn;
!
! is_splitted = true;
/*
! * Form index tuples vector to split: remove old tuple if t's needed
! * and add new tuples to vector
*/
! itvec = gistextractpage(state->stack->page, &tlen);
! if (!is_leaf)
{
/* on inner page we should remove old tuple */
! int pos = state->stack->childoffnum - FirstOffsetNumber;
tlen--;
if (pos != tlen)
memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
}
! itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen);
! dist = gistSplit(state->r, state->stack->page, itvec, tlen, giststate);
! state->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * tlen);
! state->ituplen = 0;
!
! if (state->stack->blkno != GIST_ROOT_BLKNO)
{
! /*
! * if non-root split then we should not allocate new buffer, but
! * we must create temporary page to operate
! */
! dist->buffer = state->stack->buffer;
! dist->page = PageGetTempPageCopySpecial(BufferGetPage(dist->buffer));
/* clean all flags except F_LEAF */
GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;
- }
! /* make new pages and fills them */
! for (ptr = dist; ptr; ptr = ptr->next)
! {
! int i;
! char *data;
/* get new page */
! if (ptr->buffer == InvalidBuffer)
! {
! ptr->buffer = gistNewBuffer(state->r);
! GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
! ptr->page = BufferGetPage(ptr->buffer);
! }
ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
! /*
! * fill page, we can do it because all these pages are new (ie not
! * linked in tree or masked by temp page
! */
! data = (char *) (ptr->list);
for (i = 0; i < ptr->block.num; i++)
{
if (PageAddItem(ptr->page, (Item) data, IndexTupleSize((IndexTuple) data), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(state->r));
data += IndexTupleSize((IndexTuple) data);
}
-
- /* set up ItemPointer and remember it for parent */
- ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
- state->itup[state->ituplen] = ptr->itup;
- state->ituplen++;
}
- /* saves old rightlink */
- if (state->stack->blkno != GIST_ROOT_BLKNO)
- rrlink = GistPageGetOpaque(dist->page)->rightlink;
-
START_CRIT_SECTION();
/*
! * must mark buffers dirty before XLogInsert, even though we'll still
! * be changing their opaque fields below. set up right links.
*/
for (ptr = dist; ptr; ptr = ptr->next)
{
MarkBufferDirty(ptr->buffer);
! GistPageGetOpaque(ptr->page)->rightlink = (ptr->next) ?
! ptr->next->block.blkno : rrlink;
}
! /* restore splitted non-root page */
! if (state->stack->blkno != GIST_ROOT_BLKNO)
! {
! PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
! dist->page = BufferGetPage(dist->buffer);
! }
if (!state->r->rd_istemp)
{
! XLogRecPtr recptr;
! XLogRecData *rdata;
!
! rdata = formSplitRdata(state->r->rd_node, state->stack->blkno,
! is_leaf, &(state->key), dist);
!
! recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
for (ptr = dist; ptr; ptr = ptr->next)
{
--- 289,445 ----
* for WAL replay, in the non-split case we handle this by setting up a
* one-element todelete array; in the split case, it's handled implicitly
* because the tuple vector passed to gistSplit won't include this tuple.
*/
! is_splitted = gistnospace(stack->page, itup, ntuples,
! isupdate ? stack->childoffnum : InvalidOffsetNumber,
! state->freespace);
! if (is_splitted)
{
/* no space for insertion */
IndexTuple *itvec;
int tlen;
SplitedPageLayout *dist = NULL,
*ptr;
! BlockNumber oldrlink = InvalidBlockNumber;
! SplitedPageLayout rootpg;
! int nsplitpages;
! int n;
/*
! * Form index tuples vector to split. If we're replacing an old tuple,
! * remove the old version from the vector.
*/
! itvec = gistextractpage(stack->page, &tlen);
! if (isupdate)
{
/* on inner page we should remove old tuple */
! int pos = stack->childoffnum - FirstOffsetNumber;
tlen--;
if (pos != tlen)
memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
}
! itvec = gistjoinvector(itvec, &tlen, itup, ntuples);
! dist = gistSplit(state->r, stack->page, itvec, tlen, giststate);
! /*
! * Set up pages to work with. Allocate new buffers for all but the
! * leftmost page. The original page becomes the new leftmost page,
! * and is just replaced with new contents
! */
! ptr = dist;
! if (stack->blkno != GIST_ROOT_BLKNO)
{
! /* saves old rightlink */
! oldrlink = GistPageGetOpaque(stack->page)->rightlink;
+ dist->buffer = stack->buffer;
+ dist->page = PageGetTempPageCopySpecial(BufferGetPage(dist->buffer));
/* clean all flags except F_LEAF */
GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;
! dist->block.blkno = BufferGetBlockNumber(ptr->buffer);
+ ptr = ptr->next;
+ }
+ for (; ptr; ptr = ptr->next)
+ {
/* get new page */
! ptr->buffer = gistNewBuffer(state->r);
! GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
! ptr->page = BufferGetPage(ptr->buffer);
ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
+ }
+
+ /* Set up parent tuples. */
+ nsplitpages = 0;
+ for (ptr = dist; ptr; ptr = ptr->next)
+ nsplitpages++;
+ *parenttuples = (IndexTuple *) palloc(sizeof(IndexTuple) * nsplitpages);
+ *nparenttuples = nsplitpages;
+ n = 0;
+ for (ptr = dist; ptr; ptr = ptr->next)
+ {
+ ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
+ GistTupleSetValid(ptr->itup);
+ (*parenttuples)[n++] = ptr->itup;
+ }
! /*
! * If this is a root split, we construct the new root page with the
! * downlinks here directly, instead of requiring the caller to
! * insert them. Add the new root page to the list along with the
! * split child pages.
! */
! if (stack->blkno == GIST_ROOT_BLKNO)
! {
! rootpg.buffer = stack->buffer;
! rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer));
! GistPageGetOpaque(rootpg.page)->flags = 0;
!
! rootpg.block.blkno = GIST_ROOT_BLKNO;
! rootpg.block.num = *nparenttuples;
! rootpg.list = gistfillitupvec(*parenttuples, *nparenttuples, &(rootpg.lenlist));
! rootpg.itup = NULL;
!
! rootpg.next = dist;
! dist = &rootpg;
!
! *parenttuples = NULL;
! *nparenttuples = 0;
! }
!
! /*
! * Fill all pages. All the pages are new, ie. freshly allocated empty
! * pages, or a temporary copy of the old page.
! */
! for (ptr = dist; ptr; ptr = ptr->next)
! {
! char *data = (char *) (ptr->list);
for (i = 0; i < ptr->block.num; i++)
{
if (PageAddItem(ptr->page, (Item) data, IndexTupleSize((IndexTuple) data), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(state->r));
data += IndexTupleSize((IndexTuple) data);
}
}
START_CRIT_SECTION();
/*
! * Must mark buffers dirty before XLogInsert, even though we'll still
! * be changing their opaque fields below. Also set up right links.
*/
for (ptr = dist; ptr; ptr = ptr->next)
{
MarkBufferDirty(ptr->buffer);
! if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO)
! {
! GistPageGetOpaque(ptr->page)->rightlink =
! ptr->next->block.blkno;
! GistMarkFollowRight(ptr->page);
! }
! else
! {
! GistPageGetOpaque(ptr->page)->rightlink = oldrlink;
! GistClearFollowRight(ptr->page);
! }
}
+ for (i = 0; i < nchildbufs; i++)
+ MarkBufferDirty(childbufs[i]);
! /*
! * The first page in the chain was a temporary working copy meant
! * to replace the old page. Copy it over the old page.
! */
! PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
! dist->page = BufferGetPage(dist->buffer);
+ /* Write the WAL record */
if (!state->r->rd_istemp)
{
! recptr = gistXLogSplit(state->r->rd_node, stack->blkno, is_leaf,
! dist, childbufs, nchildbufs);
for (ptr = dist; ptr; ptr = ptr->next)
{
***************
*** 419,646 **** gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
}
else
{
for (ptr = dist; ptr; ptr = ptr->next)
! {
! PageSetLSN(ptr->page, GetXLogRecPtrForTemp());
! }
! }
!
! /* set up NSN */
! oldnsn = GistPageGetOpaque(dist->page)->nsn;
! if (state->stack->blkno == GIST_ROOT_BLKNO)
! /* if root split we should put initial value */
! oldnsn = PageGetLSN(dist->page);
!
! for (ptr = dist; ptr; ptr = ptr->next)
! {
! /* only for last set oldnsn */
! GistPageGetOpaque(ptr->page)->nsn = (ptr->next) ?
! PageGetLSN(ptr->page) : oldnsn;
}
/*
! * release buffers, if it was a root split then release all buffers
! * because we create all buffers
*/
! ptr = (state->stack->blkno == GIST_ROOT_BLKNO) ? dist : dist->next;
! for (; ptr; ptr = ptr->next)
! UnlockReleaseBuffer(ptr->buffer);
!
! if (state->stack->blkno == GIST_ROOT_BLKNO)
{
! gistnewroot(state->r, state->stack->buffer, state->itup, state->ituplen, &(state->key));
! state->needInsertComplete = false;
}
-
- END_CRIT_SECTION();
}
else
{
! /* enough space */
START_CRIT_SECTION();
! if (!is_leaf)
! PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
! gistfillbuffer(state->stack->page, state->itup, state->ituplen, InvalidOffsetNumber);
! MarkBufferDirty(state->stack->buffer);
if (!state->r->rd_istemp)
{
OffsetNumber noffs = 0,
offs[1];
- XLogRecPtr recptr;
- XLogRecData *rdata;
! if (!is_leaf)
{
! /* only on inner page we should delete previous version */
! offs[0] = state->stack->childoffnum;
noffs = 1;
}
! rdata = formUpdateRdata(state->r->rd_node, state->stack->buffer,
! offs, noffs,
! state->itup, state->ituplen,
! &(state->key));
! recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
! PageSetLSN(state->stack->page, recptr);
! PageSetTLI(state->stack->page, ThisTimeLineID);
}
else
! PageSetLSN(state->stack->page, GetXLogRecPtrForTemp());
!
! if (state->stack->blkno == GIST_ROOT_BLKNO)
! state->needInsertComplete = false;
! END_CRIT_SECTION();
! if (state->ituplen > 1)
! { /* previous is_splitted==true */
! /*
! * child was splited, so we must form union for insertion in
! * parent
! */
! IndexTuple newtup = gistunion(state->r, state->itup, state->ituplen, giststate);
! ItemPointerSetBlockNumber(&(newtup->t_tid), state->stack->blkno);
! state->itup[0] = newtup;
! state->ituplen = 1;
! }
! else if (is_leaf)
! {
! /*
! * itup[0] store key to adjust parent, we set it to valid to
! * correct check by GistTupleIsInvalid macro in gistgetadjusted()
! */
! ItemPointerSetBlockNumber(&(state->itup[0]->t_tid), state->stack->blkno);
! GistTupleSetValid(state->itup[0]);
}
}
return is_splitted;
}
/*
! * returns stack of pages, all pages in stack are pinned, and
! * leaf is X-locked
*/
-
static void
! gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
{
ItemId iid;
IndexTuple idxtuple;
! GISTPageOpaque opaque;
/*
! * walk down, We don't lock page for a long time, but so we should be
! * ready to recheck path in a bad case... We remember, that page->lsn
! * should never be invalid.
*/
for (;;)
{
! if (XLogRecPtrIsInvalid(state->stack->lsn))
! state->stack->buffer = ReadBuffer(state->r, state->stack->blkno);
! LockBuffer(state->stack->buffer, GIST_SHARE);
! gistcheckpage(state->r, state->stack->buffer);
! state->stack->page = (Page) BufferGetPage(state->stack->buffer);
! opaque = GistPageGetOpaque(state->stack->page);
! state->stack->lsn = PageGetLSN(state->stack->page);
! Assert(state->r->rd_istemp || !XLogRecPtrIsInvalid(state->stack->lsn));
! if (state->stack->blkno != GIST_ROOT_BLKNO &&
! XLByteLT(state->stack->parent->lsn, opaque->nsn))
{
/*
! * caused split non-root page is detected, go up to parent to
! * choose best child
*/
! UnlockReleaseBuffer(state->stack->buffer);
! state->stack = state->stack->parent;
continue;
}
! if (!GistPageIsLeaf(state->stack->page))
{
/*
! * This is an internal page, so continue to walk down the tree. We
! * find the child node that has the minimum insertion penalty and
! * recursively invoke ourselves to modify that node. Once the
! * recursive call returns, we may need to adjust the parent node
! * for two reasons: the child node split, or the key in this node
! * needs to be adjusted for the newly inserted key below us.
*/
! GISTInsertStack *item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
! state->stack->childoffnum = gistchoose(state->r, state->stack->page, state->itup[0], giststate);
! iid = PageGetItemId(state->stack->page, state->stack->childoffnum);
! idxtuple = (IndexTuple) PageGetItem(state->stack->page, iid);
! item->blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
! LockBuffer(state->stack->buffer, GIST_UNLOCK);
!
! item->parent = state->stack;
! item->child = NULL;
! if (state->stack)
! state->stack->child = item;
! state->stack = item;
! }
! else
! {
! /* be carefull, during unlock/lock page may be changed... */
! LockBuffer(state->stack->buffer, GIST_UNLOCK);
! LockBuffer(state->stack->buffer, GIST_EXCLUSIVE);
! state->stack->page = (Page) BufferGetPage(state->stack->buffer);
! opaque = GistPageGetOpaque(state->stack->page);
! if (state->stack->blkno == GIST_ROOT_BLKNO)
{
/*
! * the only page can become inner instead of leaf is a root
! * page, so for root we should recheck it
*/
! if (!GistPageIsLeaf(state->stack->page))
{
! /*
! * very rarely situation: during unlock/lock index with
! * number of pages = 1 was increased
! */
! LockBuffer(state->stack->buffer, GIST_UNLOCK);
! continue;
! }
/*
! * we don't need to check root split, because checking
! * leaf/inner is enough to recognize split for root
*/
!
}
! else if (XLByteLT(state->stack->parent->lsn, opaque->nsn))
{
! /*
! * detecting split during unlock/lock, so we should find
! * better child on parent
! */
! /* forget buffer */
! UnlockReleaseBuffer(state->stack->buffer);
! state->stack = state->stack->parent;
! continue;
}
! state->stack->lsn = PageGetLSN(state->stack->page);
! /* ok we found a leaf page and it X-locked */
break;
}
}
-
- /* now state->stack->(page, buffer and blkno) points to leaf page */
}
/*
--- 449,800 ----
}
else
{
+ recptr = GetXLogRecPtrForTemp();
for (ptr = dist; ptr; ptr = ptr->next)
! PageSetLSN(ptr->page, recptr);
}
/*
! * Return the new child buffers to the caller.
! *
! * If this was a root split, we've already inserted the downlink
! * pointers, in the form of a new root page. Therefore we can
! * release all the new buffers, and keep just the root page locked.
*/
! if (stack->blkno == GIST_ROOT_BLKNO)
{
! Assert(stack->parent == NULL);
! for (ptr = dist->next; ptr; ptr = ptr->next)
! {
! GistPageGetOpaque(ptr->page)->nsn = recptr;
! GistClearFollowRight(ptr->page);
! UnlockReleaseBuffer(ptr->buffer);
! }
! *splitbufs = NULL;
! *nsplitbufs = 0;
! }
! else
! {
! *splitbufs = (Buffer *) palloc(sizeof(Buffer) * nsplitpages);
! (*nsplitbufs) = nsplitpages;
! i = 0;
! for (ptr = dist; ptr; ptr = ptr->next)
! (*splitbufs)[i++] = ptr->buffer;
! Assert(i == *nsplitbufs);
}
}
else
{
! /*
! * Enough space. We also get here if ntuples==0.
! */
START_CRIT_SECTION();
! if (isupdate)
! PageIndexTupleDelete(stack->page, stack->childoffnum);
! gistfillbuffer(stack->page, itup, ntuples, InvalidOffsetNumber);
! MarkBufferDirty(stack->buffer);
! for (i = 0; i < nchildbufs; i++)
! MarkBufferDirty(childbufs[i]);
if (!state->r->rd_istemp)
{
OffsetNumber noffs = 0,
offs[1];
! if (isupdate)
{
! offs[0] = stack->childoffnum;
noffs = 1;
}
! recptr = gistXLogUpdate(state->r->rd_node, stack->buffer,
! offs, noffs,
! itup, ntuples,
! childbufs, nchildbufs);
! PageSetLSN(stack->page, recptr);
! PageSetTLI(stack->page, ThisTimeLineID);
}
else
! {
! recptr = GetXLogRecPtrForTemp();
! PageSetLSN(stack->page, recptr);
! }
! *parenttuples = NULL;
! *nparenttuples = 0;
! *splitbufs = NULL;
! *nsplitbufs = 0;
! }
! /*
! * If what we inserted were downlinks for some children, set NSN and
! * clear the F_FOLLOW_RIGHT flags in them. The rightmost page inherits
! * the NSN of the leftmost page, on others the NSN is set to the LSN
! * of the record that inserted the downlinks in the parent.
! *
! * Note that we do this *after* writing the WAL record. That means that
! * the possible full page image in the WAL record does not include
! * these changes, and they must be replayed even if the page is restored
! * from the full page image.
! */
! if (nchildbufs > 0)
! {
! Assert(nchildbufs > 1); /* there should always be at least two halves */
! oldnsn = GistPageGetOpaque(BufferGetPage(childbufs[0]))->nsn;
! for (i = 0; i < nchildbufs; i++)
! {
! Page p = BufferGetPage(childbufs[i]);
! if (i == nchildbufs - 1)
! GistPageGetOpaque(p)->nsn = oldnsn;
! else
! GistPageGetOpaque(p)->nsn = recptr;
! GistClearFollowRight(p);
! PageSetLSN(p, recptr);
! PageSetTLI(p, ThisTimeLineID);
! UnlockReleaseBuffer(childbufs[i]);
}
}
+
+ END_CRIT_SECTION();
+
return is_splitted;
}
/*
! * Workhouse routine for doing insertion into a GiST index. Note that
! * this routine assumes it is invoked in a short-lived memory context,
! * so it does not bother releasing palloc'd allocations.
*/
static void
! gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
{
ItemId iid;
IndexTuple idxtuple;
! GISTInsertStack firststack;
! GISTInsertStack *stack;
! GISTInsertState state;
! bool xlocked = false;
!
! memset(&state, 0, sizeof(GISTInsertState));
! state.freespace = freespace;
! state.r = r;
!
! /* Start from the root */
! firststack.blkno = GIST_ROOT_BLKNO;
! firststack.lsn.xrecoff = 0;
! firststack.parent = NULL;
! stack = &firststack;
/*
! * Walk down along the path of smallest penalty, updating the parent
! * pointers with the key we're inserting as we go. If we crash in the
! * middle, the tree is consistent, although the possible parent updates
! * were a waste.
*/
for (;;)
{
! if (XLogRecPtrIsInvalid(stack->lsn))
! stack->buffer = ReadBuffer(state.r, stack->blkno);
!
! /*
! * Be optimistic and grab shared lock first. Swap it for an
! * exclusive lock later if we need to update the page.
! */
! if (!xlocked)
! {
! LockBuffer(stack->buffer, GIST_SHARE);
! gistcheckpage(state.r, stack->buffer);
! }
! stack->page = (Page) BufferGetPage(stack->buffer);
! stack->lsn = PageGetLSN(stack->page);
! Assert(state.r->rd_istemp || !XLogRecPtrIsInvalid(stack->lsn));
! /*
! * If this page was split but the downlink was never inserted to
! * the parent because the inserting backend crashed before doing
! * that, fix that now.
! */
! if (GistFollowRight(stack->page))
! {
! if (!xlocked)
! {
! LockBuffer(stack->buffer, GIST_UNLOCK);
! LockBuffer(stack->buffer, GIST_EXCLUSIVE);
! xlocked = true;
! /* someone might've completed the split when we unlocked */
! if (!GistFollowRight(stack->page))
! continue;
! }
! state.stack = stack;
! gistfixsplit(&state, giststate);
! stack = state.stack;
! continue;
! }
! if (stack->blkno != GIST_ROOT_BLKNO &&
! XLByteLT(stack->parent->lsn,
! GistPageGetOpaque(stack->page)->nsn))
{
/*
! * Concurrent split detected. Go back to parent and rechoose the
! * best child.
*/
! UnlockReleaseBuffer(stack->buffer);
! stack = stack->parent;
continue;
}
! if (!GistPageIsLeaf(stack->page))
{
/*
! * This is an internal page so continue to walk down the tree.
! * Find the child node that has the minimum insertion penalty.
*/
! BlockNumber childblkno;
! IndexTuple newtup;
! GISTInsertStack *item;
! stack->childoffnum = gistchoose(state.r, stack->page, itup, giststate);
! iid = PageGetItemId(stack->page, stack->childoffnum);
! idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
! childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
! /*
! * Check that it's not a leftover invalid tuple from pre-9.1
! */
! if (GistTupleIsInvalid(idxtuple))
! ereport(ERROR,
! (errmsg("index \"%s\" contains an inner tuple marked as invalid",
! RelationGetRelationName(r)),
! errdetail("This is caused by an incomplete page split at crash recovery before upgrading to 9.1."),
! errhint("Please REINDEX it.")));
! /*
! * Check that the key representing the target child node is
! * consistent with the key we're inserting. Update it if it's not.
! */
! newtup = gistgetadjusted(state.r, idxtuple, itup, giststate);
! if (newtup)
{
+ bool popped;
+
/*
! * Swap shared lock for an exclusive one. Beware, the page
! * may change while we unlock/lock the page...
*/
! if (!xlocked)
{
! LockBuffer(stack->buffer, GIST_UNLOCK);
! LockBuffer(stack->buffer, GIST_EXCLUSIVE);
! xlocked = true;
! stack->page = (Page) BufferGetPage(stack->buffer);
+ if (!XLByteEQ(PageGetLSN(stack->page), stack->lsn))
+ {
+ /* the page was changed while we unlocked it, retry */
+ continue;
+ }
+ }
/*
! * Update the tuple. gistinserthere() might have to split the
! * page to make the updated tuple fit. In that case we
! * continue the algorithm from the parent node. Note that
! * we hold an exclusive lock on stack->buffer in any case,
! * so we keep xlocked as 'true'. If the page was split,
! * stack->buffer just isn't the same page as it was before
! * the call.
*/
! state.stack = stack;
! popped = gistinserthere(&state, giststate, newtup, true);
! stack = state.stack;
! if (popped)
! continue;
}
! LockBuffer(stack->buffer, GIST_UNLOCK);
! xlocked = false;
!
! /* descend to the chosen child */
! item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
! item->blkno = childblkno;
! item->parent = stack;
! stack = item;
! }
! else
! {
! /*
! * Leaf page. Insert the new key. We've already updated all the
! * parents on the way down, but we might have to split the page
! * if it doesn't fit. gistinserthere() will take care of that.
! */
!
! /*
! * Swap shared lock for an exclusive one. Be careful, the page
! * may change while we unlock/lock the page...
! */
! if (!xlocked)
{
! LockBuffer(stack->buffer, GIST_UNLOCK);
! LockBuffer(stack->buffer, GIST_EXCLUSIVE);
! xlocked = true;
! stack->page = (Page) BufferGetPage(stack->buffer);
! stack->lsn = PageGetLSN(stack->page);
! if (stack->blkno == GIST_ROOT_BLKNO)
! {
! /*
! * the only page that can become inner instead of leaf
! * is the root page, so for root we should recheck it
! */
! if (!GistPageIsLeaf(stack->page))
! {
! /*
! * very rare situation: during unlock/lock index with
! * number of pages = 1 was increased
! */
! LockBuffer(stack->buffer, GIST_UNLOCK);
! xlocked = false;
! continue;
! }
! /*
! * we don't need to check root split, because checking
! * leaf/inner is enough to recognize split for root
! */
! }
! else if (GistFollowRight(stack->page) ||
! XLByteLT(stack->parent->lsn,
! GistPageGetOpaque(stack->page)->nsn))
! {
! /*
! * The page was split while we momentarily unlocked the
! * page. Go back to parent.
! */
! UnlockReleaseBuffer(stack->buffer);
! xlocked = false;
! stack = stack->parent;
! continue;
! }
}
! /* now state->stack->(page, buffer and blkno) points to leaf page */
! state.stack = stack;
! gistinserthere(&state, giststate, itup, false);
! stack = state.stack;
!
! /* Release any locks and pins we might still hold before exiting */
! LockBuffer(stack->buffer, GIST_UNLOCK);
! for (; stack; stack = stack->parent)
! ReleaseBuffer(stack->buffer);
break;
}
}
}
/*
***************
*** 648,654 **** gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
*
* returns from the beginning of closest parent;
*
! * To prevent deadlocks, this should lock only one page simultaneously.
*/
GISTInsertStack *
gistFindPath(Relation r, BlockNumber child)
--- 802,808 ----
*
* returns from the beginning of closest parent;
*
! * To prevent deadlocks, this should lock only one page at a time.
*/
GISTInsertStack *
gistFindPath(Relation r, BlockNumber child)
***************
*** 711,718 **** gistFindPath(Relation r, BlockNumber child)
ptr = top;
while (ptr->parent)
{
- /* set child link */
- ptr->parent->child = ptr;
/* move childoffnum.. */
if (ptr == top)
{
--- 865,870 ----
***************
*** 767,773 **** gistFindCorrectParent(Relation r, GISTInsertStack *child)
LockBuffer(parent->buffer, GIST_EXCLUSIVE);
gistcheckpage(r, parent->buffer);
parent->page = (Page) BufferGetPage(parent->buffer);
!
/* here we don't need to distinguish between split and page update */
if (parent->childoffnum == InvalidOffsetNumber || !XLByteEQ(parent->lsn, PageGetLSN(parent->page)))
{
--- 919,925 ----
LockBuffer(parent->buffer, GIST_EXCLUSIVE);
gistcheckpage(r, parent->buffer);
parent->page = (Page) BufferGetPage(parent->buffer);
!
/* here we don't need to distinguish between split and page update */
if (parent->childoffnum == InvalidOffsetNumber || !XLByteEQ(parent->lsn, PageGetLSN(parent->page)))
{
***************
*** 836,842 **** gistFindCorrectParent(Relation r, GISTInsertStack *child)
/* install new chain of parents to stack */
child->parent = parent;
- parent->child = child;
/* make recursive call to normal processing */
gistFindCorrectParent(r, child);
--- 988,993 ----
***************
*** 845,918 **** gistFindCorrectParent(Relation r, GISTInsertStack *child)
return;
}
! void
! gistmakedeal(GISTInsertState *state, GISTSTATE *giststate)
{
! int is_splitted;
! ItemId iid;
! IndexTuple oldtup,
! newtup;
! /* walk up */
! while (true)
{
! /*
! * After this call: 1. if child page was splited, then itup contains
! * keys for each page 2. if child page wasn't splited, then itup
! * contains additional for adjustment of current key
! */
! if (state->stack->parent)
{
/*
! * X-lock parent page before proceed child, gistFindCorrectParent
! * should find and lock it
*/
- gistFindCorrectParent(state->r, state->stack);
}
- is_splitted = gistplacetopage(state, giststate);
! /* parent locked above, so release child buffer */
! UnlockReleaseBuffer(state->stack->buffer);
! /* pop parent page from stack */
! state->stack = state->stack->parent;
!
! /* stack is void */
! if (!state->stack)
break;
/*
! * child did not split, so we can check is it needed to update parent
! * tuple
*/
! if (!is_splitted)
{
! /* parent's tuple */
! iid = PageGetItemId(state->stack->page, state->stack->childoffnum);
! oldtup = (IndexTuple) PageGetItem(state->stack->page, iid);
! newtup = gistgetadjusted(state->r, oldtup, state->itup[0], giststate);
!
! if (!newtup)
! { /* not need to update key */
! LockBuffer(state->stack->buffer, GIST_UNLOCK);
! break;
! }
! state->itup[0] = newtup;
}
} /* while */
!
! /* release all parent buffers */
! while (state->stack)
! {
! ReleaseBuffer(state->stack->buffer);
! state->stack = state->stack->parent;
! }
!
! /* say to xlog that insert is completed */
! if (state->needInsertComplete && !state->r->rd_istemp)
! gistxlogInsertCompletion(state->r->rd_node, &(state->key), 1);
}
/*
--- 996,1193 ----
return;
}
!
! /*
! * Complete the incomplete split of state->stack->page.
! */
! static void
! gistfixsplit(GISTInsertState *state, GISTSTATE *giststate)
{
! GISTInsertStack *stack = state->stack;
! Buffer *childbufs;
! int nchildbufs;
! IndexTuple *downlinks;
! int ndownlinks;
! Buffer buf;
! Page page;
! elog(LOG, "fixing incomplete split in index \"%s\", block %u",
! RelationGetRelationName(state->r), stack->blkno);
!
! Assert(OffsetNumberIsValid(stack->parent->childoffnum));
! Assert(GistFollowRight(stack->page));
!
! childbufs = (Buffer *) palloc(sizeof(Buffer) * 2);
! nchildbufs = 0;
! downlinks = (IndexTuple *) palloc(sizeof(IndexTuple) * 2);
! ndownlinks = 0;
!
! buf = stack->buffer;
!
! /*
! * Read the chain of split pages, following the rightlinks. Construct
! * a downlink tuple for each page.
! */
! for (;;)
{
! OffsetNumber maxoff;
! OffsetNumber offset;
! IndexTuple downlink = NULL;
! page = BufferGetPage(buf);
!
! /* Form the new downlink tuples to insert to parent */
! maxoff = PageGetMaxOffsetNumber(page);
!
! for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
! {
! IndexTuple ituple = (IndexTuple)
! PageGetItem(stack->page, PageGetItemId(stack->page, offset));
! if (downlink == NULL)
! downlink = CopyIndexTuple(ituple);
! else
! {
! IndexTuple newdownlink;
! newdownlink = gistgetadjusted(state->r, downlink, ituple,
! giststate);
! if (newdownlink)
! downlink = newdownlink;
! }
! }
! if (downlink)
! {
! ItemPointerSetBlockNumber(&(downlink->t_tid),
! BufferGetBlockNumber(buf));
! GistTupleSetValid(downlink);
!
! downlinks = repalloc(downlinks, sizeof(IndexTuple) * (ndownlinks + 1));
! downlinks[ndownlinks++] = downlink;
! }
! else
{
/*
! * If the page is completely empty, we can't form a downlink to
! * insert to the parent. And we don't really want to add empty
! * pages to the tree anyway. So we simply leave it orphaned.
*/
}
! childbufs = repalloc(childbufs, sizeof(Buffer) * (nchildbufs + 1));
! childbufs[nchildbufs++] = buf;
! if (GistFollowRight(page))
! {
! /* lock next page */
! buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink);
! LockBuffer(buf, GIST_EXCLUSIVE);
! }
! else
break;
+ }
+
+ /* lock parent */
+ gistFindCorrectParent(state->r, stack);
+ stack = stack->parent;
+
+ /* Insert the downlinks */
+ state->stack = stack;
+ gistinsert_common(state, giststate, downlinks, ndownlinks, true,
+ childbufs, nchildbufs);
+ }
+
+ /*
+ * Insert the given tuple to the current page (state->stack->page).
+ * If the page has to be split, state->stack is popped so that
+ * state->stack->page points to the first non-split parent, and returns true.
+ * If the stack is not modified, returns false.
+ *
+ * If 'isupdate' is true, the tuple replaces the tuple at
+ * state->stack->childoffnum, otherwise it is inserted as new.
+ *
+ * The caller must hold a pin and exclusive lock on state->stack->buffer.
+ * On return, we will (continue to) hold a pin and exclusive lock on
+ * state->stack->buffer, but it might not be the same buffer any more.
+ */
+ static bool
+ gistinserthere(GISTInsertState *state,
+ GISTSTATE *giststate, IndexTuple itup, bool isupdate)
+ {
+ return gistinsert_common(state, giststate, &itup, 1, isupdate, NULL, 0);
+ }
+
+ /*
+ * Common code for gistinserthere and gistfixsplit. Inserts 'tuples' on
+ * to current page (stack->page). The NSNs are set and F_FOLLOW_RIGHT
+ * flags cleared in 'oldchildren'. 'oldchildren' are released, and if
+ * the current page is split, the stack is modified.
+ *
+ * ntuples==0 is also accepted. If isupdate is also true, that amounts to
+ * deleting a tuple. That special case can happen if we're fixing an
+ * incomplete split, and vacuum has already removed all tuples from the
+ * incompletely split pages. In any case, the children buffers are updated
+ * and a WAL record is written.
+ */
+ static bool
+ gistinsert_common(GISTInsertState *state, GISTSTATE *giststate,
+ IndexTuple *tuples, int ntuples, bool isupdate,
+ Buffer *oldchildren, int noldchildren)
+ {
+ GISTInsertStack *stack = state->stack;
+ bool walked_up = false;
+ /*
+ * Insert the new entry to the chosen target page. If it doesn't
+ * fit, split the page, and recursively split parent pages as high
+ * as necessary.
+ */
+ while (true)
+ {
/*
! * Refuse to insert to a page that's incompletely split. This should
! * not happen in practice because we finish any incomplete splits
! * while we walk down the tree. However, it's remotely possible that
! * another concurrent inserter splits a parent page, and errors out
! * before completing the split. We will just throw an error in that
! * case, and leave any split we had in progress unfinished too. The
! * next insert that comes along will clean up the mess.
*/
! if (GistFollowRight(stack->page))
! elog(ERROR, "concurrent GiST page split was incomplete");
!
! state->stack = stack;
! if (gistplacetopage(state, giststate,
! tuples, ntuples, isupdate,
! oldchildren, noldchildren,
! &oldchildren, &noldchildren,
! &tuples, &ntuples))
! walked_up = true;
! stack = state->stack;
!
! /*
! * If the page was split, we have to recurse to the parent to insert
! * new entries for the new child pages, and update the entry for
! * the original child page that was split.
! */
! if (ntuples != 0)
{
! /*
! * The downlink to this page might have migrated to a
! * different page if the parent page was split. Find and lock
! * the page currently holding the downlink.
! */
! gistFindCorrectParent(state->r, stack);
! /* pop parent page from stack */
! stack = stack->parent;
! isupdate = true;
! }
! else
! {
! break;
}
} /* while */
! state->stack = stack;
! return walked_up;
}
/*
***************
*** 963,970 **** gistSplit(Relation r,
ROTATEDIST(res);
res->block.num = v.splitVector.spl_nright;
res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
! res->itup = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false)
! : gist_form_invalid_tuple(GIST_ROOT_BLKNO);
}
if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
--- 1238,1244 ----
ROTATEDIST(res);
res->block.num = v.splitVector.spl_nright;
res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
! res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false);
}
if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
***************
*** 986,1038 **** gistSplit(Relation r,
ROTATEDIST(res);
res->block.num = v.splitVector.spl_nleft;
res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
! res->itup = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false)
! : gist_form_invalid_tuple(GIST_ROOT_BLKNO);
}
return res;
}
- /*
- * buffer must be pinned and locked by caller
- */
- void
- gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key)
- {
- Page page;
-
- Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
- page = BufferGetPage(buffer);
-
- START_CRIT_SECTION();
-
- GISTInitBuffer(buffer, 0);
- gistfillbuffer(page, itup, len, FirstOffsetNumber);
-
- MarkBufferDirty(buffer);
-
- if (!r->rd_istemp)
- {
- XLogRecPtr recptr;
- XLogRecData *rdata;
-
- rdata = formUpdateRdata(r->rd_node, buffer,
- NULL, 0,
- itup, len, key);
-
- recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata);
- PageSetLSN(page, recptr);
- PageSetTLI(page, ThisTimeLineID);
- }
- else
- PageSetLSN(page, GetXLogRecPtrForTemp());
-
- END_CRIT_SECTION();
- }
-
- /*
- * Fill a GISTSTATE with information about the index
- */
void
initGISTstate(GISTSTATE *giststate, Relation index)
{
--- 1260,1271 ----
ROTATEDIST(res);
res->block.num = v.splitVector.spl_nleft;
res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
! res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false);
}
return res;
}
void
initGISTstate(GISTSTATE *giststate, Relation index)
{
*** a/src/backend/access/gist/gistget.c
--- b/src/backend/access/gist/gistget.c
***************
*** 254,262 **** gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, double *myDistances,
page = BufferGetPage(buffer);
opaque = GistPageGetOpaque(page);
! /* check if page split occurred since visit to parent */
if (!XLogRecPtrIsInvalid(pageItem->data.parentlsn) &&
! XLByteLT(pageItem->data.parentlsn, opaque->nsn) &&
opaque->rightlink != InvalidBlockNumber /* sanity check */ )
{
/* There was a page split, follow right link to add pages */
--- 254,268 ----
page = BufferGetPage(buffer);
opaque = GistPageGetOpaque(page);
! /*
! * Check if we need to follow the rightlink. We need to follow it if the
! * page was concurrently split since we visited the parent (in which case
! * parentlsn < nsn), or if the the system crashed after a page split but
! * before the downlink was inserted into the parent.
! */
if (!XLogRecPtrIsInvalid(pageItem->data.parentlsn) &&
! (GistFollowRight(page) ||
! XLByteLT(pageItem->data.parentlsn, opaque->nsn)) &&
opaque->rightlink != InvalidBlockNumber /* sanity check */ )
{
/* There was a page split, follow right link to add pages */
*** a/src/backend/access/gist/gistsplit.c
--- b/src/backend/access/gist/gistsplit.c
***************
*** 500,557 **** gistSplitHalf(GIST_SPLITVEC *v, int len)
}
/*
- * if it was invalid tuple then we need special processing.
- * We move all invalid tuples on right page.
- *
- * if there is no place on left page, gistSplit will be called one more
- * time for left page.
- *
- * Normally, we never exec this code, but after crash replay it's possible
- * to get 'invalid' tuples (probability is low enough)
- */
- static void
- gistSplitByInvalid(GISTSTATE *giststate, GistSplitVector *v, IndexTuple *itup, int len)
- {
- int i;
- static OffsetNumber offInvTuples[MaxOffsetNumber];
- int nOffInvTuples = 0;
-
- for (i = 1; i <= len; i++)
- if (GistTupleIsInvalid(itup[i - 1]))
- offInvTuples[nOffInvTuples++] = i;
-
- if (nOffInvTuples == len)
- {
- /* corner case, all tuples are invalid */
- v->spl_rightvalid = v->spl_leftvalid = false;
- gistSplitHalf(&v->splitVector, len);
- }
- else
- {
- GistSplitUnion gsvp;
-
- v->splitVector.spl_right = offInvTuples;
- v->splitVector.spl_nright = nOffInvTuples;
- v->spl_rightvalid = false;
-
- v->splitVector.spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
- v->splitVector.spl_nleft = 0;
- for (i = 1; i <= len; i++)
- if (!GistTupleIsInvalid(itup[i - 1]))
- v->splitVector.spl_left[v->splitVector.spl_nleft++] = i;
- v->spl_leftvalid = true;
-
- gsvp.equiv = NULL;
- gsvp.attr = v->spl_lattr;
- gsvp.len = v->splitVector.spl_nleft;
- gsvp.entries = v->splitVector.spl_left;
- gsvp.isnull = v->spl_lisnull;
-
- gistunionsubkeyvec(giststate, itup, &gsvp, 0);
- }
- }
-
- /*
* trys to split page by attno key, in a case of null
* values move its to separate page.
*/
--- 500,505 ----
***************
*** 568,579 **** gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *gist
Datum datum;
bool IsNull;
- if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1]))
- {
- gistSplitByInvalid(giststate, v, itup, len);
- return;
- }
-
datum = index_getattr(itup[i - 1], attno + 1, giststate->tupdesc, &IsNull);
gistdentryinit(giststate, attno, &(entryvec->vector[i]),
datum, r, page, i,
--- 516,521 ----
***************
*** 582,589 **** gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *gist
offNullTuples[nOffNullTuples++] = i;
}
- v->spl_leftvalid = v->spl_rightvalid = true;
-
if (nOffNullTuples == len)
{
/*
--- 524,529 ----
*** a/src/backend/access/gist/gistutil.c
--- b/src/backend/access/gist/gistutil.c
***************
*** 152,158 **** gistfillitupvec(IndexTuple *vec, int veclen, int *memlen)
* invalid tuple. Resulting Datums aren't compressed.
*/
! bool
gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey,
Datum *attr, bool *isnull)
{
--- 152,158 ----
* invalid tuple. Resulting Datums aren't compressed.
*/
! void
gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey,
Datum *attr, bool *isnull)
{
***************
*** 180,189 **** gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke
Datum datum;
bool IsNull;
- if (GistTupleIsInvalid(itvec[j]))
- return FALSE; /* signals that union with invalid tuple =>
- * result is invalid */
-
datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull);
if (IsNull)
continue;
--- 180,185 ----
***************
*** 218,225 **** gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke
isnull[i] = FALSE;
}
}
-
- return TRUE;
}
/*
--- 214,219 ----
***************
*** 231,238 **** gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
{
memset(isnullS, TRUE, sizeof(bool) * giststate->tupdesc->natts);
! if (!gistMakeUnionItVec(giststate, itvec, len, 0, attrS, isnullS))
! return gist_form_invalid_tuple(InvalidBlockNumber);
return gistFormTuple(giststate, r, attrS, isnullS, false);
}
--- 225,231 ----
{
memset(isnullS, TRUE, sizeof(bool) * giststate->tupdesc->natts);
! gistMakeUnionItVec(giststate, itvec, len, 0, attrS, isnullS);
return gistFormTuple(giststate, r, attrS, isnullS, false);
}
***************
*** 328,336 **** gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis
IndexTuple newtup = NULL;
int i;
- if (GistTupleIsInvalid(oldtup) || GistTupleIsInvalid(addtup))
- return gist_form_invalid_tuple(ItemPointerGetBlockNumber(&(oldtup->t_tid)));
-
gistDeCompressAtt(giststate, r, oldtup, NULL,
(OffsetNumber) 0, oldentries, oldisnull);
--- 321,326 ----
***************
*** 401,414 **** gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */
int j;
IndexTuple itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
- if (!GistPageIsLeaf(p) && GistTupleIsInvalid(itup))
- {
- ereport(LOG,
- (errmsg("index \"%s\" needs VACUUM or REINDEX to finish crash recovery",
- RelationGetRelationName(r))));
- continue;
- }
-
sum_grow = 0;
for (j = 0; j < r->rd_att->natts; j++)
{
--- 391,396 ----
***************
*** 521,527 **** gistFormTuple(GISTSTATE *giststate, Relation r,
}
res = index_form_tuple(giststate->tupdesc, compatt, isnull);
! GistTupleSetValid(res);
return res;
}
--- 503,513 ----
}
res = index_form_tuple(giststate->tupdesc, compatt, isnull);
! /*
! * The offset number on tuples on internal pages is unused. For historical
! * reasons, it is set 0xffff.
! */
! ItemPointerSetOffsetNumber( &(res->t_tid), 0xffff);
return res;
}
*** a/src/backend/access/gist/gistvacuum.c
--- b/src/backend/access/gist/gistvacuum.c
***************
*** 26,38 ****
#include "utils/memutils.h"
- typedef struct GistBulkDeleteResult
- {
- IndexBulkDeleteResult std; /* common state */
- bool needReindex;
- } GistBulkDeleteResult;
-
-
/*
* VACUUM cleanup: update FSM
*/
--- 26,31 ----
***************
*** 40,46 **** Datum
gistvacuumcleanup(PG_FUNCTION_ARGS)
{
IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
! GistBulkDeleteResult *stats = (GistBulkDeleteResult *) PG_GETARG_POINTER(1);
Relation rel = info->index;
BlockNumber npages,
blkno;
--- 33,39 ----
gistvacuumcleanup(PG_FUNCTION_ARGS)
{
IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
! IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
Relation rel = info->index;
BlockNumber npages,
blkno;
***************
*** 56,65 **** gistvacuumcleanup(PG_FUNCTION_ARGS)
/* Set up all-zero stats if gistbulkdelete wasn't called */
if (stats == NULL)
{
! stats = (GistBulkDeleteResult *) palloc0(sizeof(GistBulkDeleteResult));
/* use heap's tuple count */
! stats->std.num_index_tuples = info->num_heap_tuples;
! stats->std.estimated_count = info->estimated_count;
/*
* XXX the above is wrong if index is partial. Would it be OK to just
--- 49,58 ----
/* Set up all-zero stats if gistbulkdelete wasn't called */
if (stats == NULL)
{
! stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
/* use heap's tuple count */
! stats->num_index_tuples = info->num_heap_tuples;
! stats->estimated_count = info->estimated_count;
/*
* XXX the above is wrong if index is partial. Would it be OK to just
***************
*** 67,77 **** gistvacuumcleanup(PG_FUNCTION_ARGS)
*/
}
- if (stats->needReindex)
- ereport(NOTICE,
- (errmsg("index \"%s\" needs VACUUM FULL or REINDEX to finish crash recovery",
- RelationGetRelationName(rel))));
-
/*
* Need lock unless it's local to this backend.
*/
--- 60,65 ----
***************
*** 112,121 **** gistvacuumcleanup(PG_FUNCTION_ARGS)
IndexFreeSpaceMapVacuum(info->index);
/* return statistics */
! stats->std.pages_free = totFreePages;
if (needLock)
LockRelationForExtension(rel, ExclusiveLock);
! stats->std.num_pages = RelationGetNumberOfBlocks(rel);
if (needLock)
UnlockRelationForExtension(rel, ExclusiveLock);
--- 100,109 ----
IndexFreeSpaceMapVacuum(info->index);
/* return statistics */
! stats->pages_free = totFreePages;
if (needLock)
LockRelationForExtension(rel, ExclusiveLock);
! stats->num_pages = RelationGetNumberOfBlocks(rel);
if (needLock)
UnlockRelationForExtension(rel, ExclusiveLock);
***************
*** 135,141 **** pushStackIfSplited(Page page, GistBDItem *stack)
GISTPageOpaque opaque = GistPageGetOpaque(page);
if (stack->blkno != GIST_ROOT_BLKNO && !XLogRecPtrIsInvalid(stack->parentlsn) &&
! XLByteLT(stack->parentlsn, opaque->nsn) &&
opaque->rightlink != InvalidBlockNumber /* sanity check */ )
{
/* split page detected, install right link to the stack */
--- 123,129 ----
GISTPageOpaque opaque = GistPageGetOpaque(page);
if (stack->blkno != GIST_ROOT_BLKNO && !XLogRecPtrIsInvalid(stack->parentlsn) &&
! (GistFollowRight(page) || XLByteLT(stack->parentlsn, opaque->nsn)) &&
opaque->rightlink != InvalidBlockNumber /* sanity check */ )
{
/* split page detected, install right link to the stack */
***************
*** 162,168 **** Datum
gistbulkdelete(PG_FUNCTION_ARGS)
{
IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
! GistBulkDeleteResult *stats = (GistBulkDeleteResult *) PG_GETARG_POINTER(1);
IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
void *callback_state = (void *) PG_GETARG_POINTER(3);
Relation rel = info->index;
--- 150,156 ----
gistbulkdelete(PG_FUNCTION_ARGS)
{
IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
! IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
void *callback_state = (void *) PG_GETARG_POINTER(3);
Relation rel = info->index;
***************
*** 171,180 **** gistbulkdelete(PG_FUNCTION_ARGS)
/* first time through? */
if (stats == NULL)
! stats = (GistBulkDeleteResult *) palloc0(sizeof(GistBulkDeleteResult));
/* we'll re-count the tuples each time */
! stats->std.estimated_count = false;
! stats->std.num_index_tuples = 0;
stack = (GistBDItem *) palloc0(sizeof(GistBDItem));
stack->blkno = GIST_ROOT_BLKNO;
--- 159,168 ----
/* first time through? */
if (stats == NULL)
! stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
/* we'll re-count the tuples each time */
! stats->estimated_count = false;
! stats->num_index_tuples = 0;
stack = (GistBDItem *) palloc0(sizeof(GistBDItem));
stack->blkno = GIST_ROOT_BLKNO;
***************
*** 232,241 **** gistbulkdelete(PG_FUNCTION_ARGS)
{
todelete[ntodelete] = i - ntodelete;
ntodelete++;
! stats->std.tuples_removed += 1;
}
else
! stats->std.num_index_tuples += 1;
}
if (ntodelete)
--- 220,229 ----
{
todelete[ntodelete] = i - ntodelete;
ntodelete++;
! stats->tuples_removed += 1;
}
else
! stats->num_index_tuples += 1;
}
if (ntodelete)
***************
*** 250,271 **** gistbulkdelete(PG_FUNCTION_ARGS)
if (!rel->rd_istemp)
{
- XLogRecData *rdata;
XLogRecPtr recptr;
- gistxlogPageUpdate *xlinfo;
! rdata = formUpdateRdata(rel->rd_node, buffer,
todelete, ntodelete,
NULL, 0,
! NULL);
! xlinfo = (gistxlogPageUpdate *) rdata->next->data;
!
! recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
-
- pfree(xlinfo);
- pfree(rdata);
}
else
PageSetLSN(page, GetXLogRecPtrForTemp());
--- 238,251 ----
if (!rel->rd_istemp)
{
XLogRecPtr recptr;
! recptr = gistXLogUpdate(rel->rd_node, buffer,
todelete, ntodelete,
NULL, 0,
! NULL, 0);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
}
else
PageSetLSN(page, GetXLogRecPtrForTemp());
***************
*** 293,299 **** gistbulkdelete(PG_FUNCTION_ARGS)
stack->next = ptr;
if (GistTupleIsInvalid(idxtuple))
! stats->needReindex = true;
}
}
--- 273,283 ----
stack->next = ptr;
if (GistTupleIsInvalid(idxtuple))
! ereport(LOG,
! (errmsg("index \"%s\" contains an inner tuple marked as invalid",
! RelationGetRelationName(rel)),
! errdetail("This is caused by an incomplete page split at crash recovery before upgrading to 9.1."),
! errhint("Please REINDEX it.")));
}
}
*** a/src/backend/access/gist/gistxlog.c
--- b/src/backend/access/gist/gistxlog.c
***************
*** 20,34 ****
#include "utils/memutils.h"
#include "utils/rel.h"
-
- typedef struct
- {
- gistxlogPageUpdate *data;
- int len;
- IndexTuple *itup;
- OffsetNumber *todelete;
- } PageUpdateRecord;
-
typedef struct
{
gistxlogPage *header;
--- 20,25 ----
***************
*** 41,214 **** typedef struct
NewPage *page;
} PageSplitRecord;
- /* track for incomplete inserts, idea was taken from nbtxlog.c */
-
- typedef struct gistIncompleteInsert
- {
- RelFileNode node;
- BlockNumber origblkno; /* for splits */
- ItemPointerData key;
- int lenblk;
- BlockNumber *blkno;
- XLogRecPtr lsn;
- BlockNumber *path;
- int pathlen;
- } gistIncompleteInsert;
-
-
static MemoryContext opCtx; /* working memory for operations */
- static MemoryContext insertCtx; /* holds incomplete_inserts list */
- static List *incomplete_inserts;
-
-
- #define ItemPointerEQ(a, b) \
- ( ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \
- ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) )
-
! static void
! pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
! BlockNumber *blkno, int lenblk,
! PageSplitRecord *xlinfo /* to extract blkno info */ )
{
! MemoryContext oldCxt;
! gistIncompleteInsert *ninsert;
!
! if (!ItemPointerIsValid(&key))
!
! /*
! * if key is null then we should not store insertion as incomplete,
! * because it's a vacuum operation..
! */
! return;
!
! oldCxt = MemoryContextSwitchTo(insertCtx);
! ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert));
! ninsert->node = node;
! ninsert->key = key;
! ninsert->lsn = lsn;
!
! if (lenblk && blkno)
! {
! ninsert->lenblk = lenblk;
! ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk);
! memcpy(ninsert->blkno, blkno, sizeof(BlockNumber) * ninsert->lenblk);
! ninsert->origblkno = *blkno;
! }
! else
{
! int i;
!
! Assert(xlinfo);
! ninsert->lenblk = xlinfo->data->npage;
! ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk);
! for (i = 0; i < ninsert->lenblk; i++)
! ninsert->blkno[i] = xlinfo->page[i].header->blkno;
! ninsert->origblkno = xlinfo->data->origblkno;
! }
! Assert(ninsert->lenblk > 0);
! /*
! * Stick the new incomplete insert onto the front of the list, not the
! * back. This is so that gist_xlog_cleanup will process incompletions in
! * last-in-first-out order.
! */
! incomplete_inserts = lcons(ninsert, incomplete_inserts);
!
! MemoryContextSwitchTo(oldCxt);
! }
!
! static void
! forgetIncompleteInsert(RelFileNode node, ItemPointerData key)
! {
! ListCell *l;
! if (!ItemPointerIsValid(&key))
! return;
! if (incomplete_inserts == NIL)
! return;
! foreach(l, incomplete_inserts)
! {
! gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);
!
! if (RelFileNodeEquals(node, insert->node) && ItemPointerEQ(&(insert->key), &(key)))
{
! /* found */
! incomplete_inserts = list_delete_ptr(incomplete_inserts, insert);
! pfree(insert->blkno);
! pfree(insert);
! break;
}
- }
- }
-
- static void
- decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record)
- {
- char *begin = XLogRecGetData(record),
- *ptr;
- int i = 0,
- addpath = 0;
-
- decoded->data = (gistxlogPageUpdate *) begin;
-
- if (decoded->data->ntodelete)
- {
- decoded->todelete = (OffsetNumber *) (begin + sizeof(gistxlogPageUpdate) + addpath);
- addpath = MAXALIGN(sizeof(OffsetNumber) * decoded->data->ntodelete);
- }
- else
- decoded->todelete = NULL;
-
- decoded->len = 0;
- ptr = begin + sizeof(gistxlogPageUpdate) + addpath;
- while (ptr - begin < record->xl_len)
- {
- decoded->len++;
- ptr += IndexTupleSize((IndexTuple) ptr);
- }
! decoded->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * decoded->len);
! ptr = begin + sizeof(gistxlogPageUpdate) + addpath;
! while (ptr - begin < record->xl_len)
! {
! decoded->itup[i] = (IndexTuple) ptr;
! ptr += IndexTupleSize(decoded->itup[i]);
! i++;
}
}
/*
* redo any page update (except page split)
*/
static void
! gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
{
! gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) XLogRecGetData(record);
! PageUpdateRecord xlrec;
Buffer buffer;
Page page;
! /* we must fix incomplete_inserts list even if XLR_BKP_BLOCK_1 is set */
! forgetIncompleteInsert(xldata->node, xldata->key);
!
! if (!isnewroot && xldata->blkno != GIST_ROOT_BLKNO)
! /* operation with root always finalizes insertion */
! pushIncompleteInsert(xldata->node, lsn, xldata->key,
! &(xldata->blkno), 1,
! NULL);
! /* nothing else to do if page was backed up (and no info to do it with) */
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
! decodePageUpdateRecord(&xlrec, record);
!
! buffer = XLogReadBuffer(xlrec.data->node, xlrec.data->blkno, false);
if (!BufferIsValid(buffer))
return;
page = (Page) BufferGetPage(buffer);
--- 32,116 ----
NewPage *page;
} PageSplitRecord;
static MemoryContext opCtx; /* working memory for operations */
! /*
! * Replay the clearing of F_FOLLOW_RIGHT flags. 'data' points to an array
! * of n BlockNumbers. Returns a pointer to just past the array.
! */
! static char *
! gistRedoClearFollowRight(RelFileNode node, XLogRecPtr lsn,
! char *data, int n)
{
! int i;
! XLogRecPtr oldnsn = { 0, 0 };
! for (i = 0; i < n; i++)
{
! BlockNumber blkno;
! Buffer buffer;
! Page page;
! /* use memcpy in case the input isn't aligned */
! memcpy(&blkno, data, sizeof(BlockNumber));
! data += sizeof(BlockNumber);
! buffer = XLogReadBuffer(node, blkno, false);
! if (!BufferIsValid(buffer))
! continue;
! page = (Page) BufferGetPage(buffer);
! if (i == 0)
! oldnsn = GistPageGetOpaque(page)->nsn;
! /*
! * Note that we still update the NSN if page LSN is equal to the LSN
! * of this record, because the updated NSN is not included in the full
! * page image.
! */
! if (XLByteLT(lsn, PageGetLSN(page)))
{
! UnlockReleaseBuffer(buffer);
! continue;
}
! /*
! * Like in the non-recovery codepath, on the rightmost page set NSN
! * to the old NSN of the first page, and NSN of all the other pages
! * to the LSN of the parent. Clear F_FOLLOW_RIGHT flag on all pages.
! */
! GistPageGetOpaque(page)->nsn = (i == n - 1) ? oldnsn : lsn;
! GistClearFollowRight(page);
! PageSetLSN(page, lsn);
! PageSetTLI(page, ThisTimeLineID);
! MarkBufferDirty(buffer);
! UnlockReleaseBuffer(buffer);
}
+ return data;
}
/*
* redo any page update (except page split)
*/
static void
! gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record)
{
! char *begin = XLogRecGetData(record);
! gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) begin;
Buffer buffer;
Page page;
+ char *data;
! data = gistRedoClearFollowRight(xldata->node, lsn,
! (char *) (&xldata->nclearFollowRight) + sizeof(uint16),
! xldata->nclearFollowRight);
! /* nothing more to do if page was backed up (and no info to do it with) */
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
! buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
if (!BufferIsValid(buffer))
return;
page = (Page) BufferGetPage(buffer);
***************
*** 219,246 **** gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
return;
}
! if (isnewroot)
! GISTInitBuffer(buffer, 0);
! else if (xlrec.data->ntodelete)
{
int i;
! for (i = 0; i < xlrec.data->ntodelete; i++)
! PageIndexTupleDelete(page, xlrec.todelete[i]);
if (GistPageIsLeaf(page))
GistMarkTuplesDeleted(page);
}
/* add tuples */
! if (xlrec.len > 0)
! gistfillbuffer(page, xlrec.itup, xlrec.len, InvalidOffsetNumber);
!
! /*
! * special case: leafpage, nothing to insert, nothing to delete, then
! * vacuum marks page
! */
! if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0)
! GistClearTuplesDeleted(page);
if (!GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO)
--- 121,167 ----
return;
}
! /* Delete old tuples */
! if (xldata->ntodelete > 0)
{
int i;
+ OffsetNumber *todelete = (OffsetNumber *) data;
+ data += sizeof(OffsetNumber) * xldata->ntodelete;
! for (i = 0; i < xldata->ntodelete; i++)
! PageIndexTupleDelete(page, todelete[i]);
if (GistPageIsLeaf(page))
GistMarkTuplesDeleted(page);
}
/* add tuples */
! if (data - begin < record->xl_len)
! {
! OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber :
! OffsetNumberNext(PageGetMaxOffsetNumber(page));
! while (data - begin < record->xl_len)
! {
! IndexTuple itup = (IndexTuple) data;
! Size sz = IndexTupleSize(itup);
! OffsetNumber l;
! data += sz;
!
! l = PageAddItem(page, (Item) itup, sz, off, false, false);
! if (l == InvalidOffsetNumber)
! elog(ERROR, "failed to add item to GiST index page, size %d bytes",
! (int) sz);
! off++;
! }
! }
! else
! {
! /*
! * special case: leafpage, nothing to insert, nothing to delete, then
! * vacuum marks page
! */
! if (GistPageIsLeaf(page) && xldata->ntodelete == 0)
! GistClearTuplesDeleted(page);
! }
if (!GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO)
***************
*** 282,298 **** gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
}
static void
! decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
{
! char *begin = XLogRecGetData(record),
! *ptr;
int j,
i = 0;
decoded->data = (gistxlogPageSplit *) begin;
decoded->page = (NewPage *) palloc(sizeof(NewPage) * decoded->data->npage);
- ptr = begin + sizeof(gistxlogPageSplit);
for (i = 0; i < decoded->data->npage; i++)
{
Assert(ptr - begin < record->xl_len);
--- 203,217 ----
}
static void
! decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record, char *ptr)
{
! char *begin = XLogRecGetData(record);
int j,
i = 0;
decoded->data = (gistxlogPageSplit *) begin;
decoded->page = (NewPage *) palloc(sizeof(NewPage) * decoded->data->npage);
for (i = 0; i < decoded->data->npage; i++)
{
Assert(ptr - begin < record->xl_len);
***************
*** 315,327 **** decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
static void
gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
{
PageSplitRecord xlrec;
Buffer buffer;
Page page;
int i;
int flags;
! decodePageSplitRecord(&xlrec, record);
flags = xlrec.data->origleaf ? F_LEAF : 0;
/* loop around all pages */
--- 234,252 ----
static void
gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
{
+ gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record);
PageSplitRecord xlrec;
Buffer buffer;
Page page;
int i;
int flags;
+ bool isrootsplit = false;
+ char *data;
! data = gistRedoClearFollowRight(xldata->node, lsn,
! ((char *) xldata) + sizeof(gistxlogPageSplit),
! xldata->nclearFollowRight);
! decodePageSplitRecord(&xlrec, record, data);
flags = xlrec.data->origleaf ? F_LEAF : 0;
/* loop around all pages */
***************
*** 329,334 **** gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
--- 254,265 ----
{
NewPage *newpage = xlrec.page + i;
+ if (newpage->header->blkno == GIST_ROOT_BLKNO)
+ {
+ Assert(i == 0);
+ isrootsplit = true;
+ }
+
buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
***************
*** 339,355 **** gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
/* and fill it */
gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber);
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
}
-
- forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
-
- pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
- NULL, 0,
- &xlrec);
}
static void
--- 270,285 ----
/* and fill it */
gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber);
+ if (i < xlrec.data->npage - 1 && !isrootsplit)
+ GistMarkFollowRight(page);
+ else
+ GistClearFollowRight(page);
+
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
}
}
static void
***************
*** 372,395 **** gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
UnlockReleaseBuffer(buffer);
}
- static void
- gistRedoCompleteInsert(XLogRecPtr lsn, XLogRecord *record)
- {
- char *begin = XLogRecGetData(record),
- *ptr;
- gistxlogInsertComplete *xlrec;
-
- xlrec = (gistxlogInsertComplete *) begin;
-
- ptr = begin + sizeof(gistxlogInsertComplete);
- while (ptr - begin < record->xl_len)
- {
- Assert(record->xl_len - (ptr - begin) >= sizeof(ItemPointerData));
- forgetIncompleteInsert(xlrec->node, *((ItemPointerData *) ptr));
- ptr += sizeof(ItemPointerData);
- }
- }
-
void
gist_redo(XLogRecPtr lsn, XLogRecord *record)
{
--- 302,307 ----
***************
*** 401,430 **** gist_redo(XLogRecPtr lsn, XLogRecord *record)
* implement a similar optimization we have in b-tree, and remove killed
* tuples outside VACUUM, we'll need to handle that here.
*/
-
RestoreBkpBlocks(lsn, record, false);
oldCxt = MemoryContextSwitchTo(opCtx);
switch (info)
{
case XLOG_GIST_PAGE_UPDATE:
! gistRedoPageUpdateRecord(lsn, record, false);
break;
case XLOG_GIST_PAGE_DELETE:
gistRedoPageDeleteRecord(lsn, record);
break;
- case XLOG_GIST_NEW_ROOT:
- gistRedoPageUpdateRecord(lsn, record, true);
- break;
case XLOG_GIST_PAGE_SPLIT:
gistRedoPageSplitRecord(lsn, record);
break;
case XLOG_GIST_CREATE_INDEX:
gistRedoCreateIndex(lsn, record);
break;
- case XLOG_GIST_INSERT_COMPLETE:
- gistRedoCompleteInsert(lsn, record);
- break;
default:
elog(PANIC, "gist_redo: unknown op code %u", info);
}
--- 313,335 ----
* implement a similar optimization we have in b-tree, and remove killed
* tuples outside VACUUM, we'll need to handle that here.
*/
RestoreBkpBlocks(lsn, record, false);
oldCxt = MemoryContextSwitchTo(opCtx);
switch (info)
{
case XLOG_GIST_PAGE_UPDATE:
! gistRedoPageUpdateRecord(lsn, record);
break;
case XLOG_GIST_PAGE_DELETE:
gistRedoPageDeleteRecord(lsn, record);
break;
case XLOG_GIST_PAGE_SPLIT:
gistRedoPageSplitRecord(lsn, record);
break;
case XLOG_GIST_CREATE_INDEX:
gistRedoCreateIndex(lsn, record);
break;
default:
elog(PANIC, "gist_redo: unknown op code %u", info);
}
***************
*** 434,453 **** gist_redo(XLogRecPtr lsn, XLogRecord *record)
}
static void
! out_target(StringInfo buf, RelFileNode node, ItemPointerData key)
{
appendStringInfo(buf, "rel %u/%u/%u",
node.spcNode, node.dbNode, node.relNode);
- if (ItemPointerIsValid(&key))
- appendStringInfo(buf, "; tid %u/%u",
- ItemPointerGetBlockNumber(&key),
- ItemPointerGetOffsetNumber(&key));
}
static void
out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec)
{
! out_target(buf, xlrec->node, xlrec->key);
appendStringInfo(buf, "; block number %u", xlrec->blkno);
}
--- 339,354 ----
}
static void
! out_target(StringInfo buf, RelFileNode node)
{
appendStringInfo(buf, "rel %u/%u/%u",
node.spcNode, node.dbNode, node.relNode);
}
static void
out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec)
{
! out_target(buf, xlrec->node);
appendStringInfo(buf, "; block number %u", xlrec->blkno);
}
***************
*** 463,469 **** static void
out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec)
{
appendStringInfo(buf, "page_split: ");
! out_target(buf, xlrec->node, xlrec->key);
appendStringInfo(buf, "; block number %u splits to %d pages",
xlrec->origblkno, xlrec->npage);
}
--- 364,370 ----
out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec)
{
appendStringInfo(buf, "page_split: ");
! out_target(buf, xlrec->node);
appendStringInfo(buf, "; block number %u splits to %d pages",
xlrec->origblkno, xlrec->npage);
}
***************
*** 482,491 **** gist_desc(StringInfo buf, uint8 xl_info, char *rec)
case XLOG_GIST_PAGE_DELETE:
out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec);
break;
- case XLOG_GIST_NEW_ROOT:
- appendStringInfo(buf, "new_root: ");
- out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key);
- break;
case XLOG_GIST_PAGE_SPLIT:
out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec);
break;
--- 383,388 ----
***************
*** 495,861 **** gist_desc(StringInfo buf, uint8 xl_info, char *rec)
((RelFileNode *) rec)->dbNode,
((RelFileNode *) rec)->relNode);
break;
- case XLOG_GIST_INSERT_COMPLETE:
- appendStringInfo(buf, "complete_insert: rel %u/%u/%u",
- ((gistxlogInsertComplete *) rec)->node.spcNode,
- ((gistxlogInsertComplete *) rec)->node.dbNode,
- ((gistxlogInsertComplete *) rec)->node.relNode);
- break;
default:
appendStringInfo(buf, "unknown gist op code %u", info);
break;
}
}
- IndexTuple
- gist_form_invalid_tuple(BlockNumber blkno)
- {
- /*
- * we don't alloc space for null's bitmap, this is invalid tuple, be
- * carefull in read and write code
- */
- Size size = IndexInfoFindDataOffset(0);
- IndexTuple tuple = (IndexTuple) palloc0(size);
-
- tuple->t_info |= size;
-
- ItemPointerSetBlockNumber(&(tuple->t_tid), blkno);
- GistTupleSetInvalid(tuple);
-
- return tuple;
- }
-
-
- static void
- gistxlogFindPath(Relation index, gistIncompleteInsert *insert)
- {
- GISTInsertStack *top;
-
- insert->pathlen = 0;
- insert->path = NULL;
-
- if ((top = gistFindPath(index, insert->origblkno)) != NULL)
- {
- int i;
- GISTInsertStack *ptr;
-
- for (ptr = top; ptr; ptr = ptr->parent)
- insert->pathlen++;
-
- insert->path = (BlockNumber *) palloc(sizeof(BlockNumber) * insert->pathlen);
-
- i = 0;
- for (ptr = top; ptr; ptr = ptr->parent)
- insert->path[i++] = ptr->blkno;
- }
- else
- elog(ERROR, "lost parent for block %u", insert->origblkno);
- }
-
- static SplitedPageLayout *
- gistMakePageLayout(Buffer *buffers, int nbuffers)
- {
- SplitedPageLayout *res = NULL,
- *resptr;
-
- while (nbuffers-- > 0)
- {
- Page page = BufferGetPage(buffers[nbuffers]);
- IndexTuple *vec;
- int veclen;
-
- resptr = (SplitedPageLayout *) palloc0(sizeof(SplitedPageLayout));
-
- resptr->block.blkno = BufferGetBlockNumber(buffers[nbuffers]);
- resptr->block.num = PageGetMaxOffsetNumber(page);
-
- vec = gistextractpage(page, &veclen);
- resptr->list = gistfillitupvec(vec, veclen, &(resptr->lenlist));
-
- resptr->next = res;
- res = resptr;
- }
-
- return res;
- }
-
- /*
- * Continue insert after crash. In normal situations, there aren't any
- * incomplete inserts, but if a crash occurs partway through an insertion
- * sequence, we'll need to finish making the index valid at the end of WAL
- * replay.
- *
- * Note that we assume the index is now in a valid state, except for the
- * unfinished insertion. In particular it's safe to invoke gistFindPath();
- * there shouldn't be any garbage pages for it to run into.
- *
- * To complete insert we can't use basic insertion algorithm because
- * during insertion we can't call user-defined support functions of opclass.
- * So, we insert 'invalid' tuples without real key and do it by separate algorithm.
- * 'invalid' tuple should be updated by vacuum full.
- */
- static void
- gistContinueInsert(gistIncompleteInsert *insert)
- {
- IndexTuple *itup;
- int i,
- lenitup;
- Relation index;
-
- index = CreateFakeRelcacheEntry(insert->node);
-
- /*
- * needed vector itup never will be more than initial lenblkno+2, because
- * during this processing Indextuple can be only smaller
- */
- lenitup = insert->lenblk;
- itup = (IndexTuple *) palloc(sizeof(IndexTuple) * (lenitup + 2 /* guarantee root split */ ));
-
- for (i = 0; i < insert->lenblk; i++)
- itup[i] = gist_form_invalid_tuple(insert->blkno[i]);
-
- /*
- * any insertion of itup[] should make LOG message about
- */
-
- if (insert->origblkno == GIST_ROOT_BLKNO)
- {
- /*
- * it was split root, so we should only make new root. it can't be
- * simple insert into root, we should replace all content of root.
- */
- Buffer buffer = XLogReadBuffer(insert->node, GIST_ROOT_BLKNO, true);
-
- gistnewroot(index, buffer, itup, lenitup, NULL);
- UnlockReleaseBuffer(buffer);
- }
- else
- {
- Buffer *buffers;
- Page *pages;
- int numbuffer;
- OffsetNumber *todelete;
-
- /* construct path */
- gistxlogFindPath(index, insert);
-
- Assert(insert->pathlen > 0);
-
- buffers = (Buffer *) palloc(sizeof(Buffer) * (insert->lenblk + 2 /* guarantee root split */ ));
- pages = (Page *) palloc(sizeof(Page) * (insert->lenblk + 2 /* guarantee root split */ ));
- todelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (insert->lenblk + 2 /* guarantee root split */ ));
-
- for (i = 0; i < insert->pathlen; i++)
- {
- int j,
- k,
- pituplen = 0;
- uint8 xlinfo;
- XLogRecData *rdata;
- XLogRecPtr recptr;
- Buffer tempbuffer = InvalidBuffer;
- int ntodelete = 0;
-
- numbuffer = 1;
- buffers[0] = ReadBuffer(index, insert->path[i]);
- LockBuffer(buffers[0], GIST_EXCLUSIVE);
-
- /*
- * we check buffer, because we restored page earlier
- */
- gistcheckpage(index, buffers[0]);
-
- pages[0] = BufferGetPage(buffers[0]);
- Assert(!GistPageIsLeaf(pages[0]));
-
- pituplen = PageGetMaxOffsetNumber(pages[0]);
-
- /* find remove old IndexTuples to remove */
- for (j = 0; j < pituplen && ntodelete < lenitup; j++)
- {
- BlockNumber blkno;
- ItemId iid = PageGetItemId(pages[0], j + FirstOffsetNumber);
- IndexTuple idxtup = (IndexTuple) PageGetItem(pages[0], iid);
-
- blkno = ItemPointerGetBlockNumber(&(idxtup->t_tid));
-
- for (k = 0; k < lenitup; k++)
- if (ItemPointerGetBlockNumber(&(itup[k]->t_tid)) == blkno)
- {
- todelete[ntodelete] = j + FirstOffsetNumber - ntodelete;
- ntodelete++;
- break;
- }
- }
-
- if (ntodelete == 0)
- elog(PANIC, "gistContinueInsert: cannot find pointer to page(s)");
-
- /*
- * we check space with subtraction only first tuple to delete,
- * hope, that wiil be enough space....
- */
-
- if (gistnospace(pages[0], itup, lenitup, *todelete, 0))
- {
-
- /* no space left on page, so we must split */
- buffers[numbuffer] = ReadBuffer(index, P_NEW);
- LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
- GISTInitBuffer(buffers[numbuffer], 0);
- pages[numbuffer] = BufferGetPage(buffers[numbuffer]);
- gistfillbuffer(pages[numbuffer], itup, lenitup, FirstOffsetNumber);
- numbuffer++;
-
- if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO)
- {
- Buffer tmp;
-
- /*
- * we split root, just copy content from root to new page
- */
-
- /* sanity check */
- if (i + 1 != insert->pathlen)
- elog(PANIC, "unexpected pathlen in index \"%s\"",
- RelationGetRelationName(index));
-
- /* fill new page, root will be changed later */
- tempbuffer = ReadBuffer(index, P_NEW);
- LockBuffer(tempbuffer, GIST_EXCLUSIVE);
- memcpy(BufferGetPage(tempbuffer), pages[0], BufferGetPageSize(tempbuffer));
-
- /* swap buffers[0] (was root) and temp buffer */
- tmp = buffers[0];
- buffers[0] = tempbuffer;
- tempbuffer = tmp; /* now in tempbuffer GIST_ROOT_BLKNO,
- * it is still unchanged */
-
- pages[0] = BufferGetPage(buffers[0]);
- }
-
- START_CRIT_SECTION();
-
- for (j = 0; j < ntodelete; j++)
- PageIndexTupleDelete(pages[0], todelete[j]);
-
- xlinfo = XLOG_GIST_PAGE_SPLIT;
- rdata = formSplitRdata(index->rd_node, insert->path[i],
- false, &(insert->key),
- gistMakePageLayout(buffers, numbuffer));
-
- }
- else
- {
- START_CRIT_SECTION();
-
- for (j = 0; j < ntodelete; j++)
- PageIndexTupleDelete(pages[0], todelete[j]);
- gistfillbuffer(pages[0], itup, lenitup, InvalidOffsetNumber);
-
- xlinfo = XLOG_GIST_PAGE_UPDATE;
- rdata = formUpdateRdata(index->rd_node, buffers[0],
- todelete, ntodelete,
- itup, lenitup, &(insert->key));
- }
-
- /*
- * use insert->key as mark for completion of insert (form*Rdata()
- * above) for following possible replays
- */
-
- /* write pages, we should mark it dirty befor XLogInsert() */
- for (j = 0; j < numbuffer; j++)
- {
- GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber;
- MarkBufferDirty(buffers[j]);
- }
- recptr = XLogInsert(RM_GIST_ID, xlinfo, rdata);
- for (j = 0; j < numbuffer; j++)
- {
- PageSetLSN(pages[j], recptr);
- PageSetTLI(pages[j], ThisTimeLineID);
- }
-
- END_CRIT_SECTION();
-
- lenitup = numbuffer;
- for (j = 0; j < numbuffer; j++)
- {
- itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j]));
- UnlockReleaseBuffer(buffers[j]);
- }
-
- if (tempbuffer != InvalidBuffer)
- {
- /*
- * it was a root split, so fill it by new values
- */
- gistnewroot(index, tempbuffer, itup, lenitup, &(insert->key));
- UnlockReleaseBuffer(tempbuffer);
- }
- }
- }
-
- FreeFakeRelcacheEntry(index);
-
- ereport(LOG,
- (errmsg("index %u/%u/%u needs VACUUM FULL or REINDEX to finish crash recovery",
- insert->node.spcNode, insert->node.dbNode, insert->node.relNode),
- errdetail("Incomplete insertion detected during crash replay.")));
- }
-
void
gist_xlog_startup(void)
{
- incomplete_inserts = NIL;
- insertCtx = AllocSetContextCreate(CurrentMemoryContext,
- "GiST recovery temporary context",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
opCtx = createTempGistContext();
}
void
gist_xlog_cleanup(void)
{
- ListCell *l;
- MemoryContext oldCxt;
-
- oldCxt = MemoryContextSwitchTo(opCtx);
-
- foreach(l, incomplete_inserts)
- {
- gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);
-
- gistContinueInsert(insert);
- MemoryContextReset(opCtx);
- }
- MemoryContextSwitchTo(oldCxt);
-
MemoryContextDelete(opCtx);
- MemoryContextDelete(insertCtx);
- }
-
- bool
- gist_safe_restartpoint(void)
- {
- if (incomplete_inserts)
- return false;
- return true;
}
!
! XLogRecData *
! formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
! ItemPointer key, SplitedPageLayout *dist)
{
XLogRecData *rdata;
gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit));
SplitedPageLayout *ptr;
int npage = 0,
! cur = 1;
ptr = dist;
while (ptr)
--- 392,432 ----
((RelFileNode *) rec)->dbNode,
((RelFileNode *) rec)->relNode);
break;
default:
appendStringInfo(buf, "unknown gist op code %u", info);
break;
}
}
void
gist_xlog_startup(void)
{
opCtx = createTempGistContext();
}
void
gist_xlog_cleanup(void)
{
MemoryContextDelete(opCtx);
}
! XLogRecPtr
! gistXLogSplit(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
! SplitedPageLayout *dist,
! Buffer *clearFollowRight, int nclearFollowRight)
{
XLogRecData *rdata;
gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit));
SplitedPageLayout *ptr;
int npage = 0,
! cur;
! int i;
! BlockNumber *clearFollowRightBlknos;
! XLogRecPtr recptr;
!
! clearFollowRightBlknos = palloc(sizeof(BlockNumber) * nclearFollowRight);
! for (i = 0; i < nclearFollowRight; i++)
! clearFollowRightBlknos[i] = BufferGetBlockNumber(clearFollowRight[i]);
ptr = dist;
while (ptr)
***************
*** 864,884 **** formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
ptr = ptr->next;
}
! rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2));
xlrec->node = node;
xlrec->origblkno = blkno;
xlrec->origleaf = page_is_leaf;
xlrec->npage = (uint16) npage;
! if (key)
! xlrec->key = *key;
! else
! ItemPointerSetInvalid(&(xlrec->key));
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) xlrec;
rdata[0].len = sizeof(gistxlogPageSplit);
! rdata[0].next = NULL;
ptr = dist;
while (ptr)
--- 435,462 ----
ptr = ptr->next;
}
! rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 3 + nclearFollowRight));
xlrec->node = node;
xlrec->origblkno = blkno;
xlrec->origleaf = page_is_leaf;
xlrec->npage = (uint16) npage;
! xlrec->nclearFollowRight = nclearFollowRight;
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) xlrec;
rdata[0].len = sizeof(gistxlogPageSplit);
! cur = 1;
!
! if (nclearFollowRight > 0)
! {
! rdata[cur - 1].next = &rdata[cur];
! rdata[cur].buffer = InvalidBuffer;
! rdata[cur].data = (char *) clearFollowRightBlknos;
! rdata[cur].len = sizeof(BlockNumber) * nclearFollowRight;
! rdata[cur].next = NULL;
! cur++;
! }
ptr = dist;
while (ptr)
***************
*** 898,904 **** formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
ptr = ptr->next;
}
! return rdata;
}
/*
--- 476,500 ----
ptr = ptr->next;
}
! /*
! * full page images for the child bufs (necessary if a checkpoint happened
! * since splitting the child page)
! */
! for (i = 0; i < nclearFollowRight; i++)
! {
! rdata[cur - 1].next = &(rdata[cur]);
! rdata[cur].data = NULL;
! rdata[cur].len = 0;
! rdata[cur].buffer = clearFollowRight[i];
! rdata[cur].buffer_std = true;
! cur++;
! }
! rdata[cur - 1].next = NULL;
!
! recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
!
! pfree(rdata);
! return recptr;
}
/*
***************
*** 911,937 **** formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
* at least one rdata item referencing the buffer, even when ntodelete and
* ituplen are both zero; this ensures that XLogInsert knows about the buffer.
*/
! XLogRecData *
! formUpdateRdata(RelFileNode node, Buffer buffer,
! OffsetNumber *todelete, int ntodelete,
! IndexTuple *itup, int ituplen, ItemPointer key)
{
XLogRecData *rdata;
gistxlogPageUpdate *xlrec;
int cur,
i;
! rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen));
xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate));
xlrec->node = node;
xlrec->blkno = BufferGetBlockNumber(buffer);
xlrec->ntodelete = ntodelete;
!
! if (key)
! xlrec->key = *key;
! else
! ItemPointerSetInvalid(&(xlrec->key));
rdata[0].buffer = buffer;
rdata[0].buffer_std = true;
--- 507,536 ----
* at least one rdata item referencing the buffer, even when ntodelete and
* ituplen are both zero; this ensures that XLogInsert knows about the buffer.
*/
! XLogRecPtr
! gistXLogUpdate(RelFileNode node, Buffer buffer,
! OffsetNumber *todelete, int ntodelete,
! IndexTuple *itup, int ituplen,
! Buffer *clearFollowRight, int nclearFollowRight)
{
XLogRecData *rdata;
gistxlogPageUpdate *xlrec;
int cur,
i;
+ BlockNumber *clearFollowRightBlknos;
+ XLogRecPtr recptr;
! clearFollowRightBlknos = palloc(sizeof(BlockNumber) * nclearFollowRight);
! for (i = 0; i < nclearFollowRight; i++)
! clearFollowRightBlknos[i] = BufferGetBlockNumber(clearFollowRight[i]);
!
! rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (4 + ituplen + nclearFollowRight));
xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate));
xlrec->node = node;
xlrec->blkno = BufferGetBlockNumber(buffer);
xlrec->ntodelete = ntodelete;
! xlrec->nclearFollowRight = nclearFollowRight;
rdata[0].buffer = buffer;
rdata[0].buffer_std = true;
***************
*** 944,957 **** formUpdateRdata(RelFileNode node, Buffer buffer,
rdata[1].buffer = InvalidBuffer;
rdata[1].next = &(rdata[2]);
! rdata[2].data = (char *) todelete;
! rdata[2].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete);
! rdata[2].buffer = buffer;
! rdata[2].buffer_std = true;
! rdata[2].next = NULL;
/* new tuples */
! cur = 3;
for (i = 0; i < ituplen; i++)
{
rdata[cur - 1].next = &(rdata[cur]);
--- 543,561 ----
rdata[1].buffer = InvalidBuffer;
rdata[1].next = &(rdata[2]);
! rdata[2].data = (char *) clearFollowRightBlknos;
! rdata[2].len = nclearFollowRight * sizeof(BlockNumber);
! rdata[2].buffer = InvalidBuffer;
! rdata[2].next = &(rdata[3]);
!
! rdata[3].data = (char *) todelete;
! rdata[3].len = sizeof(OffsetNumber) * ntodelete;
! rdata[3].buffer = buffer;
! rdata[3].buffer_std = true;
! rdata[3].next = NULL;
/* new tuples */
! cur = 4;
for (i = 0; i < ituplen; i++)
{
rdata[cur - 1].next = &(rdata[cur]);
***************
*** 959,996 **** formUpdateRdata(RelFileNode node, Buffer buffer,
rdata[cur].len = IndexTupleSize(itup[i]);
rdata[cur].buffer = buffer;
rdata[cur].buffer_std = true;
- rdata[cur].next = NULL;
cur++;
}
! return rdata;
! }
!
! XLogRecPtr
! gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len)
! {
! gistxlogInsertComplete xlrec;
! XLogRecData rdata[2];
! XLogRecPtr recptr;
!
! Assert(len > 0);
! xlrec.node = node;
!
! rdata[0].buffer = InvalidBuffer;
! rdata[0].data = (char *) &xlrec;
! rdata[0].len = sizeof(gistxlogInsertComplete);
! rdata[0].next = &(rdata[1]);
!
! rdata[1].buffer = InvalidBuffer;
! rdata[1].data = (char *) keys;
! rdata[1].len = sizeof(ItemPointerData) * len;
! rdata[1].next = NULL;
!
! START_CRIT_SECTION();
!
! recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, rdata);
!
! END_CRIT_SECTION();
return recptr;
}
--- 563,587 ----
rdata[cur].len = IndexTupleSize(itup[i]);
rdata[cur].buffer = buffer;
rdata[cur].buffer_std = true;
cur++;
}
+ /*
+ * full page images for the child bufs (necessary if a checkpoint happened
+ * since splitting the child page)
+ */
+ for (i = 0; i < nclearFollowRight; i++)
+ {
+ rdata[cur - 1].next = &(rdata[cur]);
+ rdata[cur].data = NULL;
+ rdata[cur].len = 0;
+ rdata[cur].buffer = clearFollowRight[i];
+ rdata[cur].buffer_std = true;
+ cur++;
+ }
+ rdata[cur - 1].next = NULL;
! recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
+ pfree(rdata);
return recptr;
}
*** a/src/backend/access/transam/rmgr.c
--- b/src/backend/access/transam/rmgr.c
***************
*** 40,45 **** const RmgrData RmgrTable[RM_MAX_ID + 1] = {
{"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint},
{"Hash", hash_redo, hash_desc, NULL, NULL, NULL},
{"Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, gin_safe_restartpoint},
! {"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, gist_safe_restartpoint},
{"Sequence", seq_redo, seq_desc, NULL, NULL, NULL}
};
--- 40,45 ----
{"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint},
{"Hash", hash_redo, hash_desc, NULL, NULL, NULL},
{"Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, gin_safe_restartpoint},
! {"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, NULL},
{"Sequence", seq_redo, seq_desc, NULL, NULL, NULL}
};
*** a/src/include/access/gist.h
--- b/src/include/access/gist.h
***************
*** 58,66 ****
/*
* Page opaque data in a GiST index page.
*/
! #define F_LEAF (1 << 0)
! #define F_DELETED (1 << 1)
! #define F_TUPLES_DELETED (1 << 2)
typedef XLogRecPtr GistNSN;
--- 58,67 ----
/*
* Page opaque data in a GiST index page.
*/
! #define F_LEAF (1 << 0) /* leaf page */
! #define F_DELETED (1 << 1) /* the page has been deleted */
! #define F_TUPLES_DELETED (1 << 2) /* some tuples on the page are dead */
! #define F_FOLLOW_RIGHT (1 << 3) /* page to the right has no downlink */
typedef XLogRecPtr GistNSN;
***************
*** 132,137 **** typedef struct GISTENTRY
--- 133,142 ----
#define GistMarkTuplesDeleted(page) ( GistPageGetOpaque(page)->flags |= F_TUPLES_DELETED)
#define GistClearTuplesDeleted(page) ( GistPageGetOpaque(page)->flags &= ~F_TUPLES_DELETED)
+ #define GistFollowRight(page) ( GistPageGetOpaque(page)->flags & F_FOLLOW_RIGHT)
+ #define GistMarkFollowRight(page) ( GistPageGetOpaque(page)->flags |= F_FOLLOW_RIGHT)
+ #define GistClearFollowRight(page) ( GistPageGetOpaque(page)->flags &= ~F_FOLLOW_RIGHT)
+
/*
* Vector of GISTENTRY structs; user-defined methods union and picksplit
* take it as one of their arguments
*** a/src/include/access/gist_private.h
--- b/src/include/access/gist_private.h
***************
*** 132,140 **** typedef GISTScanOpaqueData *GISTScanOpaque;
/* XLog stuff */
#define XLOG_GIST_PAGE_UPDATE 0x00
! #define XLOG_GIST_NEW_ROOT 0x20
#define XLOG_GIST_PAGE_SPLIT 0x30
! #define XLOG_GIST_INSERT_COMPLETE 0x40
#define XLOG_GIST_CREATE_INDEX 0x50
#define XLOG_GIST_PAGE_DELETE 0x60
--- 132,140 ----
/* XLog stuff */
#define XLOG_GIST_PAGE_UPDATE 0x00
! /* #define XLOG_GIST_NEW_ROOT 0x20 */ /* not used anymore */
#define XLOG_GIST_PAGE_SPLIT 0x30
! /* #define XLOG_GIST_INSERT_COMPLETE 0x40 */ /* not used anymore */
#define XLOG_GIST_CREATE_INDEX 0x50
#define XLOG_GIST_PAGE_DELETE 0x60
***************
*** 143,173 **** typedef struct gistxlogPageUpdate
RelFileNode node;
BlockNumber blkno;
- /*
- * It used to identify completeness of insert. Sets to leaf itup
- */
- ItemPointerData key;
-
/* number of deleted offsets */
uint16 ntodelete;
/*
! * follow: 1. todelete OffsetNumbers 2. tuples to insert
*/
} gistxlogPageUpdate;
typedef struct gistxlogPageSplit
{
RelFileNode node;
! BlockNumber origblkno; /* splitted page */
! bool origleaf; /* was splitted page a leaf page? */
uint16 npage;
- /* see comments on gistxlogPageUpdate */
- ItemPointerData key;
-
/*
! * follow: 1. gistxlogPage and array of IndexTupleData per page
*/
} gistxlogPageSplit;
--- 143,173 ----
RelFileNode node;
BlockNumber blkno;
/* number of deleted offsets */
uint16 ntodelete;
+ uint16 nclearFollowRight;
+
/*
! * follow:
! * 1. nclearFollowRight BlockNumbers
! * 2. todelete OffsetNumbers
! * 3. tuples to insert
*/
} gistxlogPageUpdate;
typedef struct gistxlogPageSplit
{
RelFileNode node;
! BlockNumber origblkno;
! bool origleaf;
! uint16 nclearFollowRight;
uint16 npage;
/*
! * follow:
! * 1. nclearFollowRight BlockNumbers
! * 2. gistxlogPage and array of IndexTupleData per page
*/
} gistxlogPageSplit;
***************
*** 177,188 **** typedef struct gistxlogPage
int num; /* number of index tuples following */
} gistxlogPage;
- typedef struct gistxlogInsertComplete
- {
- RelFileNode node;
- /* follows ItemPointerData key to clean */
- } gistxlogInsertComplete;
-
typedef struct gistxlogPageDelete
{
RelFileNode node;
--- 177,182 ----
***************
*** 206,212 **** typedef struct SplitedPageLayout
* GISTInsertStack used for locking buffers and transfer arguments during
* insertion
*/
-
typedef struct GISTInsertStack
{
/* current page */
--- 200,205 ----
***************
*** 215,221 **** typedef struct GISTInsertStack
Page page;
/*
! * log sequence number from page->lsn to recognize page update and
* compare it with page's nsn to recognize page split
*/
GistNSN lsn;
--- 208,214 ----
Page page;
/*
! * log sequence number from page->lsn to recognize page update and
* compare it with page's nsn to recognize page split
*/
GistNSN lsn;
***************
*** 223,231 **** typedef struct GISTInsertStack
/* child's offset */
OffsetNumber childoffnum;
! /* pointer to parent and child */
struct GISTInsertStack *parent;
- struct GISTInsertStack *child;
/* for gistFindPath */
struct GISTInsertStack *next;
--- 216,223 ----
/* child's offset */
OffsetNumber childoffnum;
! /* pointer to parent */
struct GISTInsertStack *parent;
/* for gistFindPath */
struct GISTInsertStack *next;
***************
*** 238,249 **** typedef struct GistSplitVector
Datum spl_lattr[INDEX_MAX_KEYS]; /* Union of subkeys in
* spl_left */
bool spl_lisnull[INDEX_MAX_KEYS];
- bool spl_leftvalid;
Datum spl_rattr[INDEX_MAX_KEYS]; /* Union of subkeys in
* spl_right */
bool spl_risnull[INDEX_MAX_KEYS];
- bool spl_rightvalid;
bool *spl_equiv; /* equivalent tuples which can be freely
* distributed between left and right pages */
--- 230,239 ----
***************
*** 252,279 **** typedef struct GistSplitVector
typedef struct
{
Relation r;
- IndexTuple *itup; /* in/out, points to compressed entry */
- int ituplen; /* length of itup */
Size freespace; /* free space to be left */
- GISTInsertStack *stack;
- bool needInsertComplete;
! /* pointer to heap tuple */
! ItemPointerData key;
} GISTInsertState;
/* root page of a gist index */
#define GIST_ROOT_BLKNO 0
/*
! * mark tuples on inner pages during recovery
*/
#define TUPLE_IS_VALID 0xffff
#define TUPLE_IS_INVALID 0xfffe
#define GistTupleIsInvalid(itup) ( ItemPointerGetOffsetNumber( &((itup)->t_tid) ) == TUPLE_IS_INVALID )
#define GistTupleSetValid(itup) ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_VALID )
- #define GistTupleSetInvalid(itup) ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_INVALID )
/* gist.c */
extern Datum gistbuild(PG_FUNCTION_ARGS);
--- 242,281 ----
typedef struct
{
Relation r;
Size freespace; /* free space to be left */
! GISTInsertStack *stack;
} GISTInsertState;
/* root page of a gist index */
#define GIST_ROOT_BLKNO 0
/*
! * Before PostgreSQL 9.1, we used rely on so-called "invalid tuples" on inner
! * pages to finish crash recovery of incomplete page splits. If a crash
! * happened in the middle of a page split, so that the downlink pointers were
! * not yet inserted, crash recovery inserted a special downlink pointer. The
! * semantics of an invalid tuple was that it if you encounter one in a scan,
! * it must always be followed, because we don't know if the tuples on the
! * child page match or not.
! *
! * We no longer create such invalid tuples, we now mark the left-half of such
! * an incomplete split with the F_FOLLOW_RIGHT flag instead, and finish the
! * split properly the next time we need to insert on that page. To retain
! * on-disk compatibility for the sake of pg_upgrade, we still store 0xffff as
! * the offset number of all inner tuples. If we encounter any invalid tuples
! * with 0xfffe during insertion, we throw an error, though scans still handle
! * them. You should only encounter invalid tuples if you pg_upgrade a pre-9.1
! * gist index which already has invalid tuples in it because of a crash. That
! * should be rare, and you are recommended to REINDEX anyway if you have any
! * invalid tuples in an index, so throwing an error is as far as we go with
! * supporting that.
*/
#define TUPLE_IS_VALID 0xffff
#define TUPLE_IS_INVALID 0xfffe
#define GistTupleIsInvalid(itup) ( ItemPointerGetOffsetNumber( &((itup)->t_tid) ) == TUPLE_IS_INVALID )
#define GistTupleSetValid(itup) ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_VALID )
/* gist.c */
extern Datum gistbuild(PG_FUNCTION_ARGS);
***************
*** 281,288 **** extern Datum gistinsert(PG_FUNCTION_ARGS);
extern MemoryContext createTempGistContext(void);
extern void initGISTstate(GISTSTATE *giststate, Relation index);
extern void freeGISTstate(GISTSTATE *giststate);
- extern void gistmakedeal(GISTInsertState *state, GISTSTATE *giststate);
- extern void gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key);
extern SplitedPageLayout *gistSplit(Relation r, Page page, IndexTuple *itup,
int len, GISTSTATE *giststate);
--- 283,288 ----
***************
*** 294,311 **** extern void gist_redo(XLogRecPtr lsn, XLogRecord *record);
extern void gist_desc(StringInfo buf, uint8 xl_info, char *rec);
extern void gist_xlog_startup(void);
extern void gist_xlog_cleanup(void);
- extern bool gist_safe_restartpoint(void);
- extern IndexTuple gist_form_invalid_tuple(BlockNumber blkno);
-
- extern XLogRecData *formUpdateRdata(RelFileNode node, Buffer buffer,
- OffsetNumber *todelete, int ntodelete,
- IndexTuple *itup, int ituplen, ItemPointer key);
! extern XLogRecData *formSplitRdata(RelFileNode node,
! BlockNumber blkno, bool page_is_leaf,
! ItemPointer key, SplitedPageLayout *dist);
! extern XLogRecPtr gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len);
/* gistget.c */
extern Datum gistgettuple(PG_FUNCTION_ARGS);
--- 294,309 ----
extern void gist_desc(StringInfo buf, uint8 xl_info, char *rec);
extern void gist_xlog_startup(void);
extern void gist_xlog_cleanup(void);
! extern XLogRecPtr gistXLogUpdate(RelFileNode node, Buffer buffer,
! OffsetNumber *todelete, int ntodelete,
! IndexTuple *itup, int ituplen,
! Buffer *clearFollowRight, int nclearFollowRight);
! extern XLogRecPtr gistXLogSplit(RelFileNode node,
! BlockNumber blkno, bool page_is_leaf,
! SplitedPageLayout *dist,
! Buffer *clearFollowRight, int nclearFollowRight);
/* gistget.c */
extern Datum gistgettuple(PG_FUNCTION_ARGS);
***************
*** 357,363 **** extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
extern float gistpenalty(GISTSTATE *giststate, int attno,
GISTENTRY *key1, bool isNull1,
GISTENTRY *key2, bool isNull2);
! extern bool gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey,
Datum *attr, bool *isnull);
extern bool gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b);
extern void gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
--- 355,361 ----
extern float gistpenalty(GISTSTATE *giststate, int attno,
GISTENTRY *key1, bool isNull1,
GISTENTRY *key2, bool isNull2);
! extern void gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey,
Datum *attr, bool *isnull);
extern bool gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b);
extern void gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,