*** a/doc/src/sgml/gist.sgml --- b/doc/src/sgml/gist.sgml *************** *** 709,741 **** my_distance(PG_FUNCTION_ARGS) - - Crash Recovery - - - Usually, replay of the WAL log is sufficient to restore the integrity - of a GiST index following a database crash. However, there are some - corner cases in which the index state is not fully rebuilt. The index - will still be functionally correct, but there might be some performance - degradation. When this occurs, the index can be repaired by - VACUUMing its table, or by rebuilding the index using - REINDEX. In some cases a plain VACUUM is - not sufficient, and either VACUUM FULL or REINDEX - is needed. The need for one of these procedures is indicated by occurrence - of this log message during crash recovery: - - LOG: index NNN/NNN/NNN needs VACUUM or REINDEX to finish crash recovery - - or this log message during routine index insertions: - - LOG: index "FOO" needs VACUUM or REINDEX to finish crash recovery - - If a plain VACUUM finds itself unable to complete recovery - fully, it will return a notice: - - NOTICE: index "FOO" needs VACUUM FULL or REINDEX to finish crash recovery - - - - --- 709,712 ---- *** a/src/backend/access/gist/README --- b/src/backend/access/gist/README *************** *** 108,150 **** Penalty is used for choosing a subtree to insert; method PickSplit is used for the node splitting algorithm; method Union is used for propagating changes upward to maintain the tree properties. ! NOTICE: We modified original INSERT algorithm for performance reason. In ! particularly, it is now a single-pass algorithm. ! ! Function findLeaf is used to identify subtree for insertion. Page, in which ! insertion is proceeded, is locked as well as its parent page. Functions ! findParent and findPath are used to find parent pages, which could be changed ! because of concurrent access. Function pageSplit is recurrent and could split ! page by more than 2 pages, which could be necessary if keys have different ! lengths or more than one key are inserted (in such situation, user defined ! function pickSplit cannot guarantee free space on page). ! ! findLeaf(new-key) ! push(stack, [root, 0]) //page, LSN ! while(true) ! ptr = top of stack ! latch( ptr->page, S-mode ) ! ptr->lsn = ptr->page->lsn ! if ( exists ptr->parent AND ptr->parent->lsn < ptr->page->nsn ) ! unlatch( ptr->page ) ! pop stack ! else if ( ptr->page is not leaf ) ! push( stack, [get_best_child(ptr->page, new-key), 0] ) ! unlatch( ptr->page ) ! else ! unlatch( ptr->page ) ! latch( ptr->page, X-mode ) ! if ( ptr->page is not leaf ) ! //the only root page can become a non-leaf ! unlatch( ptr->page ) ! else if ( ptr->parent->lsn < ptr->page->nsn ) ! unlatch( ptr->page ) ! pop stack ! else ! return stack ! end ! end ! end findPath( stack item ) push stack, [root, 0, 0] // page, LSN, parent --- 108,178 ---- the node splitting algorithm; method Union is used for propagating changes upward to maintain the tree properties. ! To insert a tuple, we first have to find a suitable leaf page to insert to. ! The algorithm walks down the tree, starting from the root, along the path ! of smallest Penalty. At each step: ! ! 1. Has this page been split since we looked at the parent? If so, it's ! possible that we should be inserting to the other half instead, so retreat ! back to the parent. ! 2. If this is a leaf node, we've found our target node. ! 3. Otherwise use Penalty to pick a new target subtree. ! 4. Check the key representing the target subtree. If it doesn't already cover ! the key we're inserting, replace it with the Union of the old downlink key ! and the key being inserted. (Actually, we always call Union, and just skip ! the replacement if the Unioned key is the same as the existing key) ! 5. Replacing the key in step 4 might cause the page to be split. In that case, ! propagate the change upwards and restart the algorithm from the first parent ! that didn't need to be split. ! 6. Walk down to the target subtree, and goto 1. ! ! This differs from the insertion algorithm in the original paper. In the ! original paper, you first walk down the tree until you reach a leaf page, and ! then you adjust the downlink in the parent, and propagating the adjustment up, ! all the way up to the root in the worst case. But we adjust the downlinks to ! cover the new key already when we walk down, so that when we reach the leaf ! page, we don't need to update the parents anymore, except to insert the ! downlinks if we have to split the page. This makes crash recovery simpler: ! after inserting a key to the page, the tree is immediately self-consistent ! without having to update the parents. Even if we split a page and crash before ! inserting the downlink to the parent, the tree is self-consistent because the ! right half of the split is accessible via the rightlink of the left page ! (which replaced the original page). ! ! Note that the algorithm can walk up and down the tree before reaching a leaf ! page, if internal pages need to split while adjusting the downlinks for the ! new key. Eventually, you should reach the bottom, and proceed with the ! insertion of the new tuple. ! ! Once we've found the target page to insert to, we check if there's room ! for the new tuple. If there is, the tuple is inserted, and we're done. ! If it doesn't fit, however, the page needs to be split. Note that it is ! possible that a page needs to be split into more than two pages, if keys have ! different lengths or more than one key is being inserted at a time (which can ! happen when inserting downlinks for a page split that resulted in more than ! two pages at the lower level). After splitting a page, the parent page needs ! to be updated. The downlink for the new page needs to be inserted, and the ! downlink for the old page, which became the left half of the split, needs to ! be updated to only cover those tuples that stayed on the left page. Inserting ! the downlink in the parent can again lead to a page split, recursing up to the ! root page in the worst case. ! ! gistplacetopage is the workhorse function that performs one step of the ! insertion. If the tuple fits, it inserts it to the given page, otherwise ! it splits the page, and constructs the new downlink tuples for the split ! pages. The caller must then call gistplacetopage() on the parent page to ! insert the downlink tuples. The parent page that holds the downlink to ! the child might have migrated as a result of concurrent splits of the ! parent, gistfindCorrectParent() is used to find the parent page. ! ! Splitting the root page works slightly differently. At root split, ! gistplacetopage() allocates the new child pages and replaces the old root ! page with the new root containing downlinks to the new children, all in one ! operation. ! ! ! findPath is a subroutine of findParent, used when the correct parent page ! can't be found by following the rightlinks at the parent level: findPath( stack item ) push stack, [root, 0, 0] // page, LSN, parent *************** *** 165,170 **** findPath( stack item ) --- 193,203 ---- pop stack end + + gistFindCorrectParent is used to re-find the parent of a page during + insertion. It might have migrated to the right since we traversed down the + tree because of page splits. + findParent( stack item ) parent = item->parent latch( parent->page, X-mode ) *************** *** 184,189 **** findParent( stack item ) --- 217,225 ---- return findParent( item ) end + pageSplit function decides how to distribute keys to the new pages after + page split: + pageSplit(page, allkeys) (lkeys, rkeys) = pickSplit( allkeys ) if ( page is root ) *************** *** 204,242 **** pageSplit(page, allkeys) return newkeys - placetopage(page, keysarray) - if ( no space left on page ) - keysarray = pageSplit(page, [ extract_keys(page), keysarray]) - last page in chain gets old NSN, - original and others - new NSN equals to LSN - if ( page is root ) - make new root with keysarray - end - else - put keysarray on page - if ( length of keysarray > 1 ) - keysarray = [ union(keysarray) ] - end - end ! insert(new-key) ! stack = findLeaf(new-key) ! keysarray = [new-key] ! ptr = top of stack ! while(true) ! findParent( ptr ) //findParent latches parent page ! keysarray = placetopage(ptr->page, keysarray) ! unlatch( ptr->page ) ! pop stack; ! ptr = top of stack ! if (length of keysarray == 1) ! newboundingkey = union(oldboundingkey, keysarray) ! if (newboundingkey == oldboundingkey) ! unlatch ptr->page ! break loop ! end ! end ! end Authors: Teodor Sigaev --- 240,283 ---- return newkeys ! Concurrency control ! ------------------- ! As a rule of thumb, if you need to hold a lock on multiple pages at the ! same time, the locks should be acquired in the following order: child page ! before parent, and left-to-right at the same level. Always acquiring the ! locks in the same order avoids deadlocks. ! ! The search algorithm only looks at and locks one page at a time. Consequently ! there's a race condition between a search and a page split. A page split ! happens in two phases: 1. The page is split 2. The downlink is inserted to the ! parent. If a search looks at the parent page between those steps, before the ! downlink is inserted, it will still find the new right half by following the ! rightlink on the left half. But it must not follow the rightlink if it saw the ! downlink in the parent, or the page will be visited twice! ! ! A split initially marks the left page with the F_FOLLOW_RIGHT flag. If a scan ! sees that flag set, it knows that the right page is missing the downlink, and ! should be visited too. When split inserts the downlink to the parent, it ! clears the F_FOLLOW_RIGHT flag in the child, and sets the NSN field in the ! child page header to match the LSN of the insertion on the parent. If the ! F_FOLLOW_RIGHT flag is not set, a scan compares the NSN on the child and the ! LSN it saw in the parent. If NSN < LSN, the scan looked at the parent page ! before the downlink was inserted, so it should follow the rightlink. Otherwise ! the scan saw the downlink in the parent page, and will/did follow that as ! usual. ! ! A scan can't normally see a page with the F_FOLLOW_RIGHT flag set, because ! a page split keeps the child pages locked until the downlink has been inserted ! to the parent and the flag cleared again. But if a crash happens in the middle ! of a page split, before the downlinks are inserted into the parent, that will ! leave a page with F_FOLLOW_RIGHT in the tree. Scans handle that just fine, ! but we'll eventually want to fix that for performance reasons. And more ! importantly, dealing with pages with missing downlink pointers in the parent ! would complicate the insertion algorithm. So when an insertion sees a page ! with F_FOLLOW_RIGHT set, it immediately tries to bring the split that ! crashed in the middle to completion by adding the downlink in the parent. ! Authors: Teodor Sigaev *** a/src/backend/access/gist/gist.c --- b/src/backend/access/gist/gist.c *************** *** 43,50 **** static void gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *GISTstate); ! static void gistfindleaf(GISTInsertState *state, ! GISTSTATE *giststate); #define ROTATEDIST(d) do { \ --- 43,54 ---- IndexTuple itup, Size freespace, GISTSTATE *GISTstate); ! static bool gistinserthere(GISTInsertState *state, ! GISTSTATE *giststate, IndexTuple itup, bool isupdate); ! static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate); ! static bool gistinsert_common(GISTInsertState *state, GISTSTATE *giststate, ! IndexTuple *tuples, int ntuples, bool isupdate, ! Buffer *oldchildren, int noldchildren); #define ROTATEDIST(d) do { \ *************** *** 251,291 **** gistinsert(PG_FUNCTION_ARGS) /* ! * Workhouse routine for doing insertion into a GiST index. Note that ! * this routine assumes it is invoked in a short-lived memory context, ! * so it does not bother releasing palloc'd allocations. */ - static void - gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate) - { - GISTInsertState state; - - memset(&state, 0, sizeof(GISTInsertState)); - - state.itup = (IndexTuple *) palloc(sizeof(IndexTuple)); - state.itup[0] = (IndexTuple) palloc(IndexTupleSize(itup)); - memcpy(state.itup[0], itup, IndexTupleSize(itup)); - state.ituplen = 1; - state.freespace = freespace; - state.r = r; - state.key = itup->t_tid; - state.needInsertComplete = true; - - state.stack = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack)); - state.stack->blkno = GIST_ROOT_BLKNO; - - gistfindleaf(&state, giststate); - gistmakedeal(&state, giststate); - } - static bool ! gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) { ! bool is_splitted = false; ! bool is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false; /* ! * if (!is_leaf) remove old key: This node's key has been modified, either * because a child split occurred or because we needed to adjust our key * for an insert in a child node. Therefore, remove the old version of * this node's key. --- 255,287 ---- /* ! * Place all tuples in 'itup' to stack->page. If 'isupdate' is true, ! * the first tuple in the array replaces the tuple at stack->childoffnum, ! * instead of being inserted as new. ! * ! * Sets NSN and clears F_FOLLOW_RIGHT flags on 'childbufs', and unlocks ! * and releases them. ! * ! * If this page was split, all the split pages are kept pinned and locked, ! * and returned in *splitbufs. New downlink tuples that need to be inserted ! * to the parent are returned in *parenttuples. */ static bool ! gistplacetopage(GISTInsertState *state, GISTSTATE *giststate, ! IndexTuple *itup, int ntuples, bool isupdate, ! Buffer *childbufs, int nchildbufs, ! Buffer **splitbufs, int *nsplitbufs, ! IndexTuple **parenttuples, int *nparenttuples) { ! GISTInsertStack *stack = state->stack; ! bool is_splitted; ! bool is_leaf = (GistPageIsLeaf(stack->page)) ? true : false; ! XLogRecPtr recptr; ! XLogRecPtr oldnsn; ! int i; /* ! * if isupdate, remove old key: This node's key has been modified, either * because a child split occurred or because we needed to adjust our key * for an insert in a child node. Therefore, remove the old version of * this node's key. *************** *** 293,415 **** gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) * for WAL replay, in the non-split case we handle this by setting up a * one-element todelete array; in the split case, it's handled implicitly * because the tuple vector passed to gistSplit won't include this tuple. - * - * XXX: If we want to change fillfactors between node and leaf, fillfactor - * = (is_leaf ? state->leaf_fillfactor : state->node_fillfactor) */ ! if (gistnospace(state->stack->page, state->itup, state->ituplen, ! is_leaf ? InvalidOffsetNumber : state->stack->childoffnum, ! state->freespace)) { /* no space for insertion */ IndexTuple *itvec; int tlen; SplitedPageLayout *dist = NULL, *ptr; ! BlockNumber rrlink = InvalidBlockNumber; ! GistNSN oldnsn; ! ! is_splitted = true; /* ! * Form index tuples vector to split: remove old tuple if t's needed ! * and add new tuples to vector */ ! itvec = gistextractpage(state->stack->page, &tlen); ! if (!is_leaf) { /* on inner page we should remove old tuple */ ! int pos = state->stack->childoffnum - FirstOffsetNumber; tlen--; if (pos != tlen) memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos)); } ! itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen); ! dist = gistSplit(state->r, state->stack->page, itvec, tlen, giststate); ! state->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * tlen); ! state->ituplen = 0; ! ! if (state->stack->blkno != GIST_ROOT_BLKNO) { ! /* ! * if non-root split then we should not allocate new buffer, but ! * we must create temporary page to operate ! */ ! dist->buffer = state->stack->buffer; ! dist->page = PageGetTempPageCopySpecial(BufferGetPage(dist->buffer)); /* clean all flags except F_LEAF */ GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0; - } ! /* make new pages and fills them */ ! for (ptr = dist; ptr; ptr = ptr->next) ! { ! int i; ! char *data; /* get new page */ ! if (ptr->buffer == InvalidBuffer) ! { ! ptr->buffer = gistNewBuffer(state->r); ! GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0); ! ptr->page = BufferGetPage(ptr->buffer); ! } ptr->block.blkno = BufferGetBlockNumber(ptr->buffer); ! /* ! * fill page, we can do it because all these pages are new (ie not ! * linked in tree or masked by temp page ! */ ! data = (char *) (ptr->list); for (i = 0; i < ptr->block.num; i++) { if (PageAddItem(ptr->page, (Item) data, IndexTupleSize((IndexTuple) data), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber) elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(state->r)); data += IndexTupleSize((IndexTuple) data); } - - /* set up ItemPointer and remember it for parent */ - ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno); - state->itup[state->ituplen] = ptr->itup; - state->ituplen++; } - /* saves old rightlink */ - if (state->stack->blkno != GIST_ROOT_BLKNO) - rrlink = GistPageGetOpaque(dist->page)->rightlink; - START_CRIT_SECTION(); /* ! * must mark buffers dirty before XLogInsert, even though we'll still ! * be changing their opaque fields below. set up right links. */ for (ptr = dist; ptr; ptr = ptr->next) { MarkBufferDirty(ptr->buffer); ! GistPageGetOpaque(ptr->page)->rightlink = (ptr->next) ? ! ptr->next->block.blkno : rrlink; } ! /* restore splitted non-root page */ ! if (state->stack->blkno != GIST_ROOT_BLKNO) ! { ! PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer)); ! dist->page = BufferGetPage(dist->buffer); ! } if (!state->r->rd_istemp) { ! XLogRecPtr recptr; ! XLogRecData *rdata; ! ! rdata = formSplitRdata(state->r->rd_node, state->stack->blkno, ! is_leaf, &(state->key), dist); ! ! recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata); for (ptr = dist; ptr; ptr = ptr->next) { --- 289,445 ---- * for WAL replay, in the non-split case we handle this by setting up a * one-element todelete array; in the split case, it's handled implicitly * because the tuple vector passed to gistSplit won't include this tuple. */ ! is_splitted = gistnospace(stack->page, itup, ntuples, ! isupdate ? stack->childoffnum : InvalidOffsetNumber, ! state->freespace); ! if (is_splitted) { /* no space for insertion */ IndexTuple *itvec; int tlen; SplitedPageLayout *dist = NULL, *ptr; ! BlockNumber oldrlink = InvalidBlockNumber; ! SplitedPageLayout rootpg; ! int nsplitpages; ! int n; /* ! * Form index tuples vector to split. If we're replacing an old tuple, ! * remove the old version from the vector. */ ! itvec = gistextractpage(stack->page, &tlen); ! if (isupdate) { /* on inner page we should remove old tuple */ ! int pos = stack->childoffnum - FirstOffsetNumber; tlen--; if (pos != tlen) memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos)); } ! itvec = gistjoinvector(itvec, &tlen, itup, ntuples); ! dist = gistSplit(state->r, stack->page, itvec, tlen, giststate); ! /* ! * Set up pages to work with. Allocate new buffers for all but the ! * leftmost page. The original page becomes the new leftmost page, ! * and is just replaced with new contents ! */ ! ptr = dist; ! if (stack->blkno != GIST_ROOT_BLKNO) { ! /* saves old rightlink */ ! oldrlink = GistPageGetOpaque(stack->page)->rightlink; + dist->buffer = stack->buffer; + dist->page = PageGetTempPageCopySpecial(BufferGetPage(dist->buffer)); /* clean all flags except F_LEAF */ GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0; ! dist->block.blkno = BufferGetBlockNumber(ptr->buffer); + ptr = ptr->next; + } + for (; ptr; ptr = ptr->next) + { /* get new page */ ! ptr->buffer = gistNewBuffer(state->r); ! GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0); ! ptr->page = BufferGetPage(ptr->buffer); ptr->block.blkno = BufferGetBlockNumber(ptr->buffer); + } + + /* Set up parent tuples. */ + nsplitpages = 0; + for (ptr = dist; ptr; ptr = ptr->next) + nsplitpages++; + *parenttuples = (IndexTuple *) palloc(sizeof(IndexTuple) * nsplitpages); + *nparenttuples = nsplitpages; + n = 0; + for (ptr = dist; ptr; ptr = ptr->next) + { + ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno); + GistTupleSetValid(ptr->itup); + (*parenttuples)[n++] = ptr->itup; + } ! /* ! * If this is a root split, we construct the new root page with the ! * downlinks here directly, instead of requiring the caller to ! * insert them. Add the new root page to the list along with the ! * split child pages. ! */ ! if (stack->blkno == GIST_ROOT_BLKNO) ! { ! rootpg.buffer = stack->buffer; ! rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer)); ! GistPageGetOpaque(rootpg.page)->flags = 0; ! ! rootpg.block.blkno = GIST_ROOT_BLKNO; ! rootpg.block.num = *nparenttuples; ! rootpg.list = gistfillitupvec(*parenttuples, *nparenttuples, &(rootpg.lenlist)); ! rootpg.itup = NULL; ! ! rootpg.next = dist; ! dist = &rootpg; ! ! *parenttuples = NULL; ! *nparenttuples = 0; ! } ! ! /* ! * Fill all pages. All the pages are new, ie. freshly allocated empty ! * pages, or a temporary copy of the old page. ! */ ! for (ptr = dist; ptr; ptr = ptr->next) ! { ! char *data = (char *) (ptr->list); for (i = 0; i < ptr->block.num; i++) { if (PageAddItem(ptr->page, (Item) data, IndexTupleSize((IndexTuple) data), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber) elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(state->r)); data += IndexTupleSize((IndexTuple) data); } } START_CRIT_SECTION(); /* ! * Must mark buffers dirty before XLogInsert, even though we'll still ! * be changing their opaque fields below. Also set up right links. */ for (ptr = dist; ptr; ptr = ptr->next) { MarkBufferDirty(ptr->buffer); ! if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO) ! { ! GistPageGetOpaque(ptr->page)->rightlink = ! ptr->next->block.blkno; ! GistMarkFollowRight(ptr->page); ! } ! else ! { ! GistPageGetOpaque(ptr->page)->rightlink = oldrlink; ! GistClearFollowRight(ptr->page); ! } } + for (i = 0; i < nchildbufs; i++) + MarkBufferDirty(childbufs[i]); ! /* ! * The first page in the chain was a temporary working copy meant ! * to replace the old page. Copy it over the old page. ! */ ! PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer)); ! dist->page = BufferGetPage(dist->buffer); + /* Write the WAL record */ if (!state->r->rd_istemp) { ! recptr = gistXLogSplit(state->r->rd_node, stack->blkno, is_leaf, ! dist, childbufs, nchildbufs); for (ptr = dist; ptr; ptr = ptr->next) { *************** *** 419,646 **** gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) } else { for (ptr = dist; ptr; ptr = ptr->next) ! { ! PageSetLSN(ptr->page, GetXLogRecPtrForTemp()); ! } ! } ! ! /* set up NSN */ ! oldnsn = GistPageGetOpaque(dist->page)->nsn; ! if (state->stack->blkno == GIST_ROOT_BLKNO) ! /* if root split we should put initial value */ ! oldnsn = PageGetLSN(dist->page); ! ! for (ptr = dist; ptr; ptr = ptr->next) ! { ! /* only for last set oldnsn */ ! GistPageGetOpaque(ptr->page)->nsn = (ptr->next) ? ! PageGetLSN(ptr->page) : oldnsn; } /* ! * release buffers, if it was a root split then release all buffers ! * because we create all buffers */ ! ptr = (state->stack->blkno == GIST_ROOT_BLKNO) ? dist : dist->next; ! for (; ptr; ptr = ptr->next) ! UnlockReleaseBuffer(ptr->buffer); ! ! if (state->stack->blkno == GIST_ROOT_BLKNO) { ! gistnewroot(state->r, state->stack->buffer, state->itup, state->ituplen, &(state->key)); ! state->needInsertComplete = false; } - - END_CRIT_SECTION(); } else { ! /* enough space */ START_CRIT_SECTION(); ! if (!is_leaf) ! PageIndexTupleDelete(state->stack->page, state->stack->childoffnum); ! gistfillbuffer(state->stack->page, state->itup, state->ituplen, InvalidOffsetNumber); ! MarkBufferDirty(state->stack->buffer); if (!state->r->rd_istemp) { OffsetNumber noffs = 0, offs[1]; - XLogRecPtr recptr; - XLogRecData *rdata; ! if (!is_leaf) { ! /* only on inner page we should delete previous version */ ! offs[0] = state->stack->childoffnum; noffs = 1; } ! rdata = formUpdateRdata(state->r->rd_node, state->stack->buffer, ! offs, noffs, ! state->itup, state->ituplen, ! &(state->key)); ! recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); ! PageSetLSN(state->stack->page, recptr); ! PageSetTLI(state->stack->page, ThisTimeLineID); } else ! PageSetLSN(state->stack->page, GetXLogRecPtrForTemp()); ! ! if (state->stack->blkno == GIST_ROOT_BLKNO) ! state->needInsertComplete = false; ! END_CRIT_SECTION(); ! if (state->ituplen > 1) ! { /* previous is_splitted==true */ ! /* ! * child was splited, so we must form union for insertion in ! * parent ! */ ! IndexTuple newtup = gistunion(state->r, state->itup, state->ituplen, giststate); ! ItemPointerSetBlockNumber(&(newtup->t_tid), state->stack->blkno); ! state->itup[0] = newtup; ! state->ituplen = 1; ! } ! else if (is_leaf) ! { ! /* ! * itup[0] store key to adjust parent, we set it to valid to ! * correct check by GistTupleIsInvalid macro in gistgetadjusted() ! */ ! ItemPointerSetBlockNumber(&(state->itup[0]->t_tid), state->stack->blkno); ! GistTupleSetValid(state->itup[0]); } } return is_splitted; } /* ! * returns stack of pages, all pages in stack are pinned, and ! * leaf is X-locked */ - static void ! gistfindleaf(GISTInsertState *state, GISTSTATE *giststate) { ItemId iid; IndexTuple idxtuple; ! GISTPageOpaque opaque; /* ! * walk down, We don't lock page for a long time, but so we should be ! * ready to recheck path in a bad case... We remember, that page->lsn ! * should never be invalid. */ for (;;) { ! if (XLogRecPtrIsInvalid(state->stack->lsn)) ! state->stack->buffer = ReadBuffer(state->r, state->stack->blkno); ! LockBuffer(state->stack->buffer, GIST_SHARE); ! gistcheckpage(state->r, state->stack->buffer); ! state->stack->page = (Page) BufferGetPage(state->stack->buffer); ! opaque = GistPageGetOpaque(state->stack->page); ! state->stack->lsn = PageGetLSN(state->stack->page); ! Assert(state->r->rd_istemp || !XLogRecPtrIsInvalid(state->stack->lsn)); ! if (state->stack->blkno != GIST_ROOT_BLKNO && ! XLByteLT(state->stack->parent->lsn, opaque->nsn)) { /* ! * caused split non-root page is detected, go up to parent to ! * choose best child */ ! UnlockReleaseBuffer(state->stack->buffer); ! state->stack = state->stack->parent; continue; } ! if (!GistPageIsLeaf(state->stack->page)) { /* ! * This is an internal page, so continue to walk down the tree. We ! * find the child node that has the minimum insertion penalty and ! * recursively invoke ourselves to modify that node. Once the ! * recursive call returns, we may need to adjust the parent node ! * for two reasons: the child node split, or the key in this node ! * needs to be adjusted for the newly inserted key below us. */ ! GISTInsertStack *item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack)); ! state->stack->childoffnum = gistchoose(state->r, state->stack->page, state->itup[0], giststate); ! iid = PageGetItemId(state->stack->page, state->stack->childoffnum); ! idxtuple = (IndexTuple) PageGetItem(state->stack->page, iid); ! item->blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid)); ! LockBuffer(state->stack->buffer, GIST_UNLOCK); ! ! item->parent = state->stack; ! item->child = NULL; ! if (state->stack) ! state->stack->child = item; ! state->stack = item; ! } ! else ! { ! /* be carefull, during unlock/lock page may be changed... */ ! LockBuffer(state->stack->buffer, GIST_UNLOCK); ! LockBuffer(state->stack->buffer, GIST_EXCLUSIVE); ! state->stack->page = (Page) BufferGetPage(state->stack->buffer); ! opaque = GistPageGetOpaque(state->stack->page); ! if (state->stack->blkno == GIST_ROOT_BLKNO) { /* ! * the only page can become inner instead of leaf is a root ! * page, so for root we should recheck it */ ! if (!GistPageIsLeaf(state->stack->page)) { ! /* ! * very rarely situation: during unlock/lock index with ! * number of pages = 1 was increased ! */ ! LockBuffer(state->stack->buffer, GIST_UNLOCK); ! continue; ! } /* ! * we don't need to check root split, because checking ! * leaf/inner is enough to recognize split for root */ ! } ! else if (XLByteLT(state->stack->parent->lsn, opaque->nsn)) { ! /* ! * detecting split during unlock/lock, so we should find ! * better child on parent ! */ ! /* forget buffer */ ! UnlockReleaseBuffer(state->stack->buffer); ! state->stack = state->stack->parent; ! continue; } ! state->stack->lsn = PageGetLSN(state->stack->page); ! /* ok we found a leaf page and it X-locked */ break; } } - - /* now state->stack->(page, buffer and blkno) points to leaf page */ } /* --- 449,800 ---- } else { + recptr = GetXLogRecPtrForTemp(); for (ptr = dist; ptr; ptr = ptr->next) ! PageSetLSN(ptr->page, recptr); } /* ! * Return the new child buffers to the caller. ! * ! * If this was a root split, we've already inserted the downlink ! * pointers, in the form of a new root page. Therefore we can ! * release all the new buffers, and keep just the root page locked. */ ! if (stack->blkno == GIST_ROOT_BLKNO) { ! Assert(stack->parent == NULL); ! for (ptr = dist->next; ptr; ptr = ptr->next) ! { ! GistPageGetOpaque(ptr->page)->nsn = recptr; ! GistClearFollowRight(ptr->page); ! UnlockReleaseBuffer(ptr->buffer); ! } ! *splitbufs = NULL; ! *nsplitbufs = 0; ! } ! else ! { ! *splitbufs = (Buffer *) palloc(sizeof(Buffer) * nsplitpages); ! (*nsplitbufs) = nsplitpages; ! i = 0; ! for (ptr = dist; ptr; ptr = ptr->next) ! (*splitbufs)[i++] = ptr->buffer; ! Assert(i == *nsplitbufs); } } else { ! /* ! * Enough space. We also get here if ntuples==0. ! */ START_CRIT_SECTION(); ! if (isupdate) ! PageIndexTupleDelete(stack->page, stack->childoffnum); ! gistfillbuffer(stack->page, itup, ntuples, InvalidOffsetNumber); ! MarkBufferDirty(stack->buffer); ! for (i = 0; i < nchildbufs; i++) ! MarkBufferDirty(childbufs[i]); if (!state->r->rd_istemp) { OffsetNumber noffs = 0, offs[1]; ! if (isupdate) { ! offs[0] = stack->childoffnum; noffs = 1; } ! recptr = gistXLogUpdate(state->r->rd_node, stack->buffer, ! offs, noffs, ! itup, ntuples, ! childbufs, nchildbufs); ! PageSetLSN(stack->page, recptr); ! PageSetTLI(stack->page, ThisTimeLineID); } else ! { ! recptr = GetXLogRecPtrForTemp(); ! PageSetLSN(stack->page, recptr); ! } ! *parenttuples = NULL; ! *nparenttuples = 0; ! *splitbufs = NULL; ! *nsplitbufs = 0; ! } ! /* ! * If what we inserted were downlinks for some children, set NSN and ! * clear the F_FOLLOW_RIGHT flags in them. The rightmost page inherits ! * the NSN of the leftmost page, on others the NSN is set to the LSN ! * of the record that inserted the downlinks in the parent. ! * ! * Note that we do this *after* writing the WAL record. That means that ! * the possible full page image in the WAL record does not include ! * these changes, and they must be replayed even if the page is restored ! * from the full page image. ! */ ! if (nchildbufs > 0) ! { ! Assert(nchildbufs > 1); /* there should always be at least two halves */ ! oldnsn = GistPageGetOpaque(BufferGetPage(childbufs[0]))->nsn; ! for (i = 0; i < nchildbufs; i++) ! { ! Page p = BufferGetPage(childbufs[i]); ! if (i == nchildbufs - 1) ! GistPageGetOpaque(p)->nsn = oldnsn; ! else ! GistPageGetOpaque(p)->nsn = recptr; ! GistClearFollowRight(p); ! PageSetLSN(p, recptr); ! PageSetTLI(p, ThisTimeLineID); ! UnlockReleaseBuffer(childbufs[i]); } } + + END_CRIT_SECTION(); + return is_splitted; } /* ! * Workhouse routine for doing insertion into a GiST index. Note that ! * this routine assumes it is invoked in a short-lived memory context, ! * so it does not bother releasing palloc'd allocations. */ static void ! gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate) { ItemId iid; IndexTuple idxtuple; ! GISTInsertStack firststack; ! GISTInsertStack *stack; ! GISTInsertState state; ! bool xlocked = false; ! ! memset(&state, 0, sizeof(GISTInsertState)); ! state.freespace = freespace; ! state.r = r; ! ! /* Start from the root */ ! firststack.blkno = GIST_ROOT_BLKNO; ! firststack.lsn.xrecoff = 0; ! firststack.parent = NULL; ! stack = &firststack; /* ! * Walk down along the path of smallest penalty, updating the parent ! * pointers with the key we're inserting as we go. If we crash in the ! * middle, the tree is consistent, although the possible parent updates ! * were a waste. */ for (;;) { ! if (XLogRecPtrIsInvalid(stack->lsn)) ! stack->buffer = ReadBuffer(state.r, stack->blkno); ! ! /* ! * Be optimistic and grab shared lock first. Swap it for an ! * exclusive lock later if we need to update the page. ! */ ! if (!xlocked) ! { ! LockBuffer(stack->buffer, GIST_SHARE); ! gistcheckpage(state.r, stack->buffer); ! } ! stack->page = (Page) BufferGetPage(stack->buffer); ! stack->lsn = PageGetLSN(stack->page); ! Assert(state.r->rd_istemp || !XLogRecPtrIsInvalid(stack->lsn)); ! /* ! * If this page was split but the downlink was never inserted to ! * the parent because the inserting backend crashed before doing ! * that, fix that now. ! */ ! if (GistFollowRight(stack->page)) ! { ! if (!xlocked) ! { ! LockBuffer(stack->buffer, GIST_UNLOCK); ! LockBuffer(stack->buffer, GIST_EXCLUSIVE); ! xlocked = true; ! /* someone might've completed the split when we unlocked */ ! if (!GistFollowRight(stack->page)) ! continue; ! } ! state.stack = stack; ! gistfixsplit(&state, giststate); ! stack = state.stack; ! continue; ! } ! if (stack->blkno != GIST_ROOT_BLKNO && ! XLByteLT(stack->parent->lsn, ! GistPageGetOpaque(stack->page)->nsn)) { /* ! * Concurrent split detected. Go back to parent and rechoose the ! * best child. */ ! UnlockReleaseBuffer(stack->buffer); ! stack = stack->parent; continue; } ! if (!GistPageIsLeaf(stack->page)) { /* ! * This is an internal page so continue to walk down the tree. ! * Find the child node that has the minimum insertion penalty. */ ! BlockNumber childblkno; ! IndexTuple newtup; ! GISTInsertStack *item; ! stack->childoffnum = gistchoose(state.r, stack->page, itup, giststate); ! iid = PageGetItemId(stack->page, stack->childoffnum); ! idxtuple = (IndexTuple) PageGetItem(stack->page, iid); ! childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid)); ! /* ! * Check that it's not a leftover invalid tuple from pre-9.1 ! */ ! if (GistTupleIsInvalid(idxtuple)) ! ereport(ERROR, ! (errmsg("index \"%s\" contains an inner tuple marked as invalid", ! RelationGetRelationName(r)), ! errdetail("This is caused by an incomplete page split at crash recovery before upgrading to 9.1."), ! errhint("Please REINDEX it."))); ! /* ! * Check that the key representing the target child node is ! * consistent with the key we're inserting. Update it if it's not. ! */ ! newtup = gistgetadjusted(state.r, idxtuple, itup, giststate); ! if (newtup) { + bool popped; + /* ! * Swap shared lock for an exclusive one. Beware, the page ! * may change while we unlock/lock the page... */ ! if (!xlocked) { ! LockBuffer(stack->buffer, GIST_UNLOCK); ! LockBuffer(stack->buffer, GIST_EXCLUSIVE); ! xlocked = true; ! stack->page = (Page) BufferGetPage(stack->buffer); + if (!XLByteEQ(PageGetLSN(stack->page), stack->lsn)) + { + /* the page was changed while we unlocked it, retry */ + continue; + } + } /* ! * Update the tuple. gistinserthere() might have to split the ! * page to make the updated tuple fit. In that case we ! * continue the algorithm from the parent node. Note that ! * we hold an exclusive lock on stack->buffer in any case, ! * so we keep xlocked as 'true'. If the page was split, ! * stack->buffer just isn't the same page as it was before ! * the call. */ ! state.stack = stack; ! popped = gistinserthere(&state, giststate, newtup, true); ! stack = state.stack; ! if (popped) ! continue; } ! LockBuffer(stack->buffer, GIST_UNLOCK); ! xlocked = false; ! ! /* descend to the chosen child */ ! item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack)); ! item->blkno = childblkno; ! item->parent = stack; ! stack = item; ! } ! else ! { ! /* ! * Leaf page. Insert the new key. We've already updated all the ! * parents on the way down, but we might have to split the page ! * if it doesn't fit. gistinserthere() will take care of that. ! */ ! ! /* ! * Swap shared lock for an exclusive one. Be careful, the page ! * may change while we unlock/lock the page... ! */ ! if (!xlocked) { ! LockBuffer(stack->buffer, GIST_UNLOCK); ! LockBuffer(stack->buffer, GIST_EXCLUSIVE); ! xlocked = true; ! stack->page = (Page) BufferGetPage(stack->buffer); ! stack->lsn = PageGetLSN(stack->page); ! if (stack->blkno == GIST_ROOT_BLKNO) ! { ! /* ! * the only page that can become inner instead of leaf ! * is the root page, so for root we should recheck it ! */ ! if (!GistPageIsLeaf(stack->page)) ! { ! /* ! * very rare situation: during unlock/lock index with ! * number of pages = 1 was increased ! */ ! LockBuffer(stack->buffer, GIST_UNLOCK); ! xlocked = false; ! continue; ! } ! /* ! * we don't need to check root split, because checking ! * leaf/inner is enough to recognize split for root ! */ ! } ! else if (GistFollowRight(stack->page) || ! XLByteLT(stack->parent->lsn, ! GistPageGetOpaque(stack->page)->nsn)) ! { ! /* ! * The page was split while we momentarily unlocked the ! * page. Go back to parent. ! */ ! UnlockReleaseBuffer(stack->buffer); ! xlocked = false; ! stack = stack->parent; ! continue; ! } } ! /* now state->stack->(page, buffer and blkno) points to leaf page */ ! state.stack = stack; ! gistinserthere(&state, giststate, itup, false); ! stack = state.stack; ! ! /* Release any locks and pins we might still hold before exiting */ ! LockBuffer(stack->buffer, GIST_UNLOCK); ! for (; stack; stack = stack->parent) ! ReleaseBuffer(stack->buffer); break; } } } /* *************** *** 648,654 **** gistfindleaf(GISTInsertState *state, GISTSTATE *giststate) * * returns from the beginning of closest parent; * ! * To prevent deadlocks, this should lock only one page simultaneously. */ GISTInsertStack * gistFindPath(Relation r, BlockNumber child) --- 802,808 ---- * * returns from the beginning of closest parent; * ! * To prevent deadlocks, this should lock only one page at a time. */ GISTInsertStack * gistFindPath(Relation r, BlockNumber child) *************** *** 711,718 **** gistFindPath(Relation r, BlockNumber child) ptr = top; while (ptr->parent) { - /* set child link */ - ptr->parent->child = ptr; /* move childoffnum.. */ if (ptr == top) { --- 865,870 ---- *************** *** 767,773 **** gistFindCorrectParent(Relation r, GISTInsertStack *child) LockBuffer(parent->buffer, GIST_EXCLUSIVE); gistcheckpage(r, parent->buffer); parent->page = (Page) BufferGetPage(parent->buffer); ! /* here we don't need to distinguish between split and page update */ if (parent->childoffnum == InvalidOffsetNumber || !XLByteEQ(parent->lsn, PageGetLSN(parent->page))) { --- 919,925 ---- LockBuffer(parent->buffer, GIST_EXCLUSIVE); gistcheckpage(r, parent->buffer); parent->page = (Page) BufferGetPage(parent->buffer); ! /* here we don't need to distinguish between split and page update */ if (parent->childoffnum == InvalidOffsetNumber || !XLByteEQ(parent->lsn, PageGetLSN(parent->page))) { *************** *** 836,842 **** gistFindCorrectParent(Relation r, GISTInsertStack *child) /* install new chain of parents to stack */ child->parent = parent; - parent->child = child; /* make recursive call to normal processing */ gistFindCorrectParent(r, child); --- 988,993 ---- *************** *** 845,918 **** gistFindCorrectParent(Relation r, GISTInsertStack *child) return; } ! void ! gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) { ! int is_splitted; ! ItemId iid; ! IndexTuple oldtup, ! newtup; ! /* walk up */ ! while (true) { ! /* ! * After this call: 1. if child page was splited, then itup contains ! * keys for each page 2. if child page wasn't splited, then itup ! * contains additional for adjustment of current key ! */ ! if (state->stack->parent) { /* ! * X-lock parent page before proceed child, gistFindCorrectParent ! * should find and lock it */ - gistFindCorrectParent(state->r, state->stack); } - is_splitted = gistplacetopage(state, giststate); ! /* parent locked above, so release child buffer */ ! UnlockReleaseBuffer(state->stack->buffer); ! /* pop parent page from stack */ ! state->stack = state->stack->parent; ! ! /* stack is void */ ! if (!state->stack) break; /* ! * child did not split, so we can check is it needed to update parent ! * tuple */ ! if (!is_splitted) { ! /* parent's tuple */ ! iid = PageGetItemId(state->stack->page, state->stack->childoffnum); ! oldtup = (IndexTuple) PageGetItem(state->stack->page, iid); ! newtup = gistgetadjusted(state->r, oldtup, state->itup[0], giststate); ! ! if (!newtup) ! { /* not need to update key */ ! LockBuffer(state->stack->buffer, GIST_UNLOCK); ! break; ! } ! state->itup[0] = newtup; } } /* while */ ! ! /* release all parent buffers */ ! while (state->stack) ! { ! ReleaseBuffer(state->stack->buffer); ! state->stack = state->stack->parent; ! } ! ! /* say to xlog that insert is completed */ ! if (state->needInsertComplete && !state->r->rd_istemp) ! gistxlogInsertCompletion(state->r->rd_node, &(state->key), 1); } /* --- 996,1193 ---- return; } ! ! /* ! * Complete the incomplete split of state->stack->page. ! */ ! static void ! gistfixsplit(GISTInsertState *state, GISTSTATE *giststate) { ! GISTInsertStack *stack = state->stack; ! Buffer *childbufs; ! int nchildbufs; ! IndexTuple *downlinks; ! int ndownlinks; ! Buffer buf; ! Page page; ! elog(LOG, "fixing incomplete split in index \"%s\", block %u", ! RelationGetRelationName(state->r), stack->blkno); ! ! Assert(OffsetNumberIsValid(stack->parent->childoffnum)); ! Assert(GistFollowRight(stack->page)); ! ! childbufs = (Buffer *) palloc(sizeof(Buffer) * 2); ! nchildbufs = 0; ! downlinks = (IndexTuple *) palloc(sizeof(IndexTuple) * 2); ! ndownlinks = 0; ! ! buf = stack->buffer; ! ! /* ! * Read the chain of split pages, following the rightlinks. Construct ! * a downlink tuple for each page. ! */ ! for (;;) { ! OffsetNumber maxoff; ! OffsetNumber offset; ! IndexTuple downlink = NULL; ! page = BufferGetPage(buf); ! ! /* Form the new downlink tuples to insert to parent */ ! maxoff = PageGetMaxOffsetNumber(page); ! ! for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset)) ! { ! IndexTuple ituple = (IndexTuple) ! PageGetItem(stack->page, PageGetItemId(stack->page, offset)); ! if (downlink == NULL) ! downlink = CopyIndexTuple(ituple); ! else ! { ! IndexTuple newdownlink; ! newdownlink = gistgetadjusted(state->r, downlink, ituple, ! giststate); ! if (newdownlink) ! downlink = newdownlink; ! } ! } ! if (downlink) ! { ! ItemPointerSetBlockNumber(&(downlink->t_tid), ! BufferGetBlockNumber(buf)); ! GistTupleSetValid(downlink); ! ! downlinks = repalloc(downlinks, sizeof(IndexTuple) * (ndownlinks + 1)); ! downlinks[ndownlinks++] = downlink; ! } ! else { /* ! * If the page is completely empty, we can't form a downlink to ! * insert to the parent. And we don't really want to add empty ! * pages to the tree anyway. So we simply leave it orphaned. */ } ! childbufs = repalloc(childbufs, sizeof(Buffer) * (nchildbufs + 1)); ! childbufs[nchildbufs++] = buf; ! if (GistFollowRight(page)) ! { ! /* lock next page */ ! buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink); ! LockBuffer(buf, GIST_EXCLUSIVE); ! } ! else break; + } + + /* lock parent */ + gistFindCorrectParent(state->r, stack); + stack = stack->parent; + + /* Insert the downlinks */ + state->stack = stack; + gistinsert_common(state, giststate, downlinks, ndownlinks, true, + childbufs, nchildbufs); + } + + /* + * Insert the given tuple to the current page (state->stack->page). + * If the page has to be split, state->stack is popped so that + * state->stack->page points to the first non-split parent, and returns true. + * If the stack is not modified, returns false. + * + * If 'isupdate' is true, the tuple replaces the tuple at + * state->stack->childoffnum, otherwise it is inserted as new. + * + * The caller must hold a pin and exclusive lock on state->stack->buffer. + * On return, we will (continue to) hold a pin and exclusive lock on + * state->stack->buffer, but it might not be the same buffer any more. + */ + static bool + gistinserthere(GISTInsertState *state, + GISTSTATE *giststate, IndexTuple itup, bool isupdate) + { + return gistinsert_common(state, giststate, &itup, 1, isupdate, NULL, 0); + } + + /* + * Common code for gistinserthere and gistfixsplit. Inserts 'tuples' on + * to current page (stack->page). The NSNs are set and F_FOLLOW_RIGHT + * flags cleared in 'oldchildren'. 'oldchildren' are released, and if + * the current page is split, the stack is modified. + * + * ntuples==0 is also accepted. If isupdate is also true, that amounts to + * deleting a tuple. That special case can happen if we're fixing an + * incomplete split, and vacuum has already removed all tuples from the + * incompletely split pages. In any case, the children buffers are updated + * and a WAL record is written. + */ + static bool + gistinsert_common(GISTInsertState *state, GISTSTATE *giststate, + IndexTuple *tuples, int ntuples, bool isupdate, + Buffer *oldchildren, int noldchildren) + { + GISTInsertStack *stack = state->stack; + bool walked_up = false; + /* + * Insert the new entry to the chosen target page. If it doesn't + * fit, split the page, and recursively split parent pages as high + * as necessary. + */ + while (true) + { /* ! * Refuse to insert to a page that's incompletely split. This should ! * not happen in practice because we finish any incomplete splits ! * while we walk down the tree. However, it's remotely possible that ! * another concurrent inserter splits a parent page, and errors out ! * before completing the split. We will just throw an error in that ! * case, and leave any split we had in progress unfinished too. The ! * next insert that comes along will clean up the mess. */ ! if (GistFollowRight(stack->page)) ! elog(ERROR, "concurrent GiST page split was incomplete"); ! ! state->stack = stack; ! if (gistplacetopage(state, giststate, ! tuples, ntuples, isupdate, ! oldchildren, noldchildren, ! &oldchildren, &noldchildren, ! &tuples, &ntuples)) ! walked_up = true; ! stack = state->stack; ! ! /* ! * If the page was split, we have to recurse to the parent to insert ! * new entries for the new child pages, and update the entry for ! * the original child page that was split. ! */ ! if (ntuples != 0) { ! /* ! * The downlink to this page might have migrated to a ! * different page if the parent page was split. Find and lock ! * the page currently holding the downlink. ! */ ! gistFindCorrectParent(state->r, stack); ! /* pop parent page from stack */ ! stack = stack->parent; ! isupdate = true; ! } ! else ! { ! break; } } /* while */ ! state->stack = stack; ! return walked_up; } /* *************** *** 963,970 **** gistSplit(Relation r, ROTATEDIST(res); res->block.num = v.splitVector.spl_nright; res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist)); ! res->itup = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false) ! : gist_form_invalid_tuple(GIST_ROOT_BLKNO); } if (!gistfitpage(lvectup, v.splitVector.spl_nleft)) --- 1238,1244 ---- ROTATEDIST(res); res->block.num = v.splitVector.spl_nright; res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist)); ! res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false); } if (!gistfitpage(lvectup, v.splitVector.spl_nleft)) *************** *** 986,1038 **** gistSplit(Relation r, ROTATEDIST(res); res->block.num = v.splitVector.spl_nleft; res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist)); ! res->itup = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false) ! : gist_form_invalid_tuple(GIST_ROOT_BLKNO); } return res; } - /* - * buffer must be pinned and locked by caller - */ - void - gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key) - { - Page page; - - Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO); - page = BufferGetPage(buffer); - - START_CRIT_SECTION(); - - GISTInitBuffer(buffer, 0); - gistfillbuffer(page, itup, len, FirstOffsetNumber); - - MarkBufferDirty(buffer); - - if (!r->rd_istemp) - { - XLogRecPtr recptr; - XLogRecData *rdata; - - rdata = formUpdateRdata(r->rd_node, buffer, - NULL, 0, - itup, len, key); - - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata); - PageSetLSN(page, recptr); - PageSetTLI(page, ThisTimeLineID); - } - else - PageSetLSN(page, GetXLogRecPtrForTemp()); - - END_CRIT_SECTION(); - } - - /* - * Fill a GISTSTATE with information about the index - */ void initGISTstate(GISTSTATE *giststate, Relation index) { --- 1260,1271 ---- ROTATEDIST(res); res->block.num = v.splitVector.spl_nleft; res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist)); ! res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false); } return res; } void initGISTstate(GISTSTATE *giststate, Relation index) { *** a/src/backend/access/gist/gistget.c --- b/src/backend/access/gist/gistget.c *************** *** 254,262 **** gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, double *myDistances, page = BufferGetPage(buffer); opaque = GistPageGetOpaque(page); ! /* check if page split occurred since visit to parent */ if (!XLogRecPtrIsInvalid(pageItem->data.parentlsn) && ! XLByteLT(pageItem->data.parentlsn, opaque->nsn) && opaque->rightlink != InvalidBlockNumber /* sanity check */ ) { /* There was a page split, follow right link to add pages */ --- 254,268 ---- page = BufferGetPage(buffer); opaque = GistPageGetOpaque(page); ! /* ! * Check if we need to follow the rightlink. We need to follow it if the ! * page was concurrently split since we visited the parent (in which case ! * parentlsn < nsn), or if the the system crashed after a page split but ! * before the downlink was inserted into the parent. ! */ if (!XLogRecPtrIsInvalid(pageItem->data.parentlsn) && ! (GistFollowRight(page) || ! XLByteLT(pageItem->data.parentlsn, opaque->nsn)) && opaque->rightlink != InvalidBlockNumber /* sanity check */ ) { /* There was a page split, follow right link to add pages */ *** a/src/backend/access/gist/gistsplit.c --- b/src/backend/access/gist/gistsplit.c *************** *** 500,557 **** gistSplitHalf(GIST_SPLITVEC *v, int len) } /* - * if it was invalid tuple then we need special processing. - * We move all invalid tuples on right page. - * - * if there is no place on left page, gistSplit will be called one more - * time for left page. - * - * Normally, we never exec this code, but after crash replay it's possible - * to get 'invalid' tuples (probability is low enough) - */ - static void - gistSplitByInvalid(GISTSTATE *giststate, GistSplitVector *v, IndexTuple *itup, int len) - { - int i; - static OffsetNumber offInvTuples[MaxOffsetNumber]; - int nOffInvTuples = 0; - - for (i = 1; i <= len; i++) - if (GistTupleIsInvalid(itup[i - 1])) - offInvTuples[nOffInvTuples++] = i; - - if (nOffInvTuples == len) - { - /* corner case, all tuples are invalid */ - v->spl_rightvalid = v->spl_leftvalid = false; - gistSplitHalf(&v->splitVector, len); - } - else - { - GistSplitUnion gsvp; - - v->splitVector.spl_right = offInvTuples; - v->splitVector.spl_nright = nOffInvTuples; - v->spl_rightvalid = false; - - v->splitVector.spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber)); - v->splitVector.spl_nleft = 0; - for (i = 1; i <= len; i++) - if (!GistTupleIsInvalid(itup[i - 1])) - v->splitVector.spl_left[v->splitVector.spl_nleft++] = i; - v->spl_leftvalid = true; - - gsvp.equiv = NULL; - gsvp.attr = v->spl_lattr; - gsvp.len = v->splitVector.spl_nleft; - gsvp.entries = v->splitVector.spl_left; - gsvp.isnull = v->spl_lisnull; - - gistunionsubkeyvec(giststate, itup, &gsvp, 0); - } - } - - /* * trys to split page by attno key, in a case of null * values move its to separate page. */ --- 500,505 ---- *************** *** 568,579 **** gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *gist Datum datum; bool IsNull; - if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1])) - { - gistSplitByInvalid(giststate, v, itup, len); - return; - } - datum = index_getattr(itup[i - 1], attno + 1, giststate->tupdesc, &IsNull); gistdentryinit(giststate, attno, &(entryvec->vector[i]), datum, r, page, i, --- 516,521 ---- *************** *** 582,589 **** gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *gist offNullTuples[nOffNullTuples++] = i; } - v->spl_leftvalid = v->spl_rightvalid = true; - if (nOffNullTuples == len) { /* --- 524,529 ---- *** a/src/backend/access/gist/gistutil.c --- b/src/backend/access/gist/gistutil.c *************** *** 152,158 **** gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) * invalid tuple. Resulting Datums aren't compressed. */ ! bool gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey, Datum *attr, bool *isnull) { --- 152,158 ---- * invalid tuple. Resulting Datums aren't compressed. */ ! void gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey, Datum *attr, bool *isnull) { *************** *** 180,189 **** gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke Datum datum; bool IsNull; - if (GistTupleIsInvalid(itvec[j])) - return FALSE; /* signals that union with invalid tuple => - * result is invalid */ - datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull); if (IsNull) continue; --- 180,185 ---- *************** *** 218,225 **** gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke isnull[i] = FALSE; } } - - return TRUE; } /* --- 214,219 ---- *************** *** 231,238 **** gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate) { memset(isnullS, TRUE, sizeof(bool) * giststate->tupdesc->natts); ! if (!gistMakeUnionItVec(giststate, itvec, len, 0, attrS, isnullS)) ! return gist_form_invalid_tuple(InvalidBlockNumber); return gistFormTuple(giststate, r, attrS, isnullS, false); } --- 225,231 ---- { memset(isnullS, TRUE, sizeof(bool) * giststate->tupdesc->natts); ! gistMakeUnionItVec(giststate, itvec, len, 0, attrS, isnullS); return gistFormTuple(giststate, r, attrS, isnullS, false); } *************** *** 328,336 **** gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis IndexTuple newtup = NULL; int i; - if (GistTupleIsInvalid(oldtup) || GistTupleIsInvalid(addtup)) - return gist_form_invalid_tuple(ItemPointerGetBlockNumber(&(oldtup->t_tid))); - gistDeCompressAtt(giststate, r, oldtup, NULL, (OffsetNumber) 0, oldentries, oldisnull); --- 321,326 ---- *************** *** 401,414 **** gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ int j; IndexTuple itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); - if (!GistPageIsLeaf(p) && GistTupleIsInvalid(itup)) - { - ereport(LOG, - (errmsg("index \"%s\" needs VACUUM or REINDEX to finish crash recovery", - RelationGetRelationName(r)))); - continue; - } - sum_grow = 0; for (j = 0; j < r->rd_att->natts; j++) { --- 391,396 ---- *************** *** 521,527 **** gistFormTuple(GISTSTATE *giststate, Relation r, } res = index_form_tuple(giststate->tupdesc, compatt, isnull); ! GistTupleSetValid(res); return res; } --- 503,513 ---- } res = index_form_tuple(giststate->tupdesc, compatt, isnull); ! /* ! * The offset number on tuples on internal pages is unused. For historical ! * reasons, it is set 0xffff. ! */ ! ItemPointerSetOffsetNumber( &(res->t_tid), 0xffff); return res; } *** a/src/backend/access/gist/gistvacuum.c --- b/src/backend/access/gist/gistvacuum.c *************** *** 26,38 **** #include "utils/memutils.h" - typedef struct GistBulkDeleteResult - { - IndexBulkDeleteResult std; /* common state */ - bool needReindex; - } GistBulkDeleteResult; - - /* * VACUUM cleanup: update FSM */ --- 26,31 ---- *************** *** 40,46 **** Datum gistvacuumcleanup(PG_FUNCTION_ARGS) { IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0); ! GistBulkDeleteResult *stats = (GistBulkDeleteResult *) PG_GETARG_POINTER(1); Relation rel = info->index; BlockNumber npages, blkno; --- 33,39 ---- gistvacuumcleanup(PG_FUNCTION_ARGS) { IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0); ! IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1); Relation rel = info->index; BlockNumber npages, blkno; *************** *** 56,65 **** gistvacuumcleanup(PG_FUNCTION_ARGS) /* Set up all-zero stats if gistbulkdelete wasn't called */ if (stats == NULL) { ! stats = (GistBulkDeleteResult *) palloc0(sizeof(GistBulkDeleteResult)); /* use heap's tuple count */ ! stats->std.num_index_tuples = info->num_heap_tuples; ! stats->std.estimated_count = info->estimated_count; /* * XXX the above is wrong if index is partial. Would it be OK to just --- 49,58 ---- /* Set up all-zero stats if gistbulkdelete wasn't called */ if (stats == NULL) { ! stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult)); /* use heap's tuple count */ ! stats->num_index_tuples = info->num_heap_tuples; ! stats->estimated_count = info->estimated_count; /* * XXX the above is wrong if index is partial. Would it be OK to just *************** *** 67,77 **** gistvacuumcleanup(PG_FUNCTION_ARGS) */ } - if (stats->needReindex) - ereport(NOTICE, - (errmsg("index \"%s\" needs VACUUM FULL or REINDEX to finish crash recovery", - RelationGetRelationName(rel)))); - /* * Need lock unless it's local to this backend. */ --- 60,65 ---- *************** *** 112,121 **** gistvacuumcleanup(PG_FUNCTION_ARGS) IndexFreeSpaceMapVacuum(info->index); /* return statistics */ ! stats->std.pages_free = totFreePages; if (needLock) LockRelationForExtension(rel, ExclusiveLock); ! stats->std.num_pages = RelationGetNumberOfBlocks(rel); if (needLock) UnlockRelationForExtension(rel, ExclusiveLock); --- 100,109 ---- IndexFreeSpaceMapVacuum(info->index); /* return statistics */ ! stats->pages_free = totFreePages; if (needLock) LockRelationForExtension(rel, ExclusiveLock); ! stats->num_pages = RelationGetNumberOfBlocks(rel); if (needLock) UnlockRelationForExtension(rel, ExclusiveLock); *************** *** 135,141 **** pushStackIfSplited(Page page, GistBDItem *stack) GISTPageOpaque opaque = GistPageGetOpaque(page); if (stack->blkno != GIST_ROOT_BLKNO && !XLogRecPtrIsInvalid(stack->parentlsn) && ! XLByteLT(stack->parentlsn, opaque->nsn) && opaque->rightlink != InvalidBlockNumber /* sanity check */ ) { /* split page detected, install right link to the stack */ --- 123,129 ---- GISTPageOpaque opaque = GistPageGetOpaque(page); if (stack->blkno != GIST_ROOT_BLKNO && !XLogRecPtrIsInvalid(stack->parentlsn) && ! (GistFollowRight(page) || XLByteLT(stack->parentlsn, opaque->nsn)) && opaque->rightlink != InvalidBlockNumber /* sanity check */ ) { /* split page detected, install right link to the stack */ *************** *** 162,168 **** Datum gistbulkdelete(PG_FUNCTION_ARGS) { IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0); ! GistBulkDeleteResult *stats = (GistBulkDeleteResult *) PG_GETARG_POINTER(1); IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2); void *callback_state = (void *) PG_GETARG_POINTER(3); Relation rel = info->index; --- 150,156 ---- gistbulkdelete(PG_FUNCTION_ARGS) { IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0); ! IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1); IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2); void *callback_state = (void *) PG_GETARG_POINTER(3); Relation rel = info->index; *************** *** 171,180 **** gistbulkdelete(PG_FUNCTION_ARGS) /* first time through? */ if (stats == NULL) ! stats = (GistBulkDeleteResult *) palloc0(sizeof(GistBulkDeleteResult)); /* we'll re-count the tuples each time */ ! stats->std.estimated_count = false; ! stats->std.num_index_tuples = 0; stack = (GistBDItem *) palloc0(sizeof(GistBDItem)); stack->blkno = GIST_ROOT_BLKNO; --- 159,168 ---- /* first time through? */ if (stats == NULL) ! stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult)); /* we'll re-count the tuples each time */ ! stats->estimated_count = false; ! stats->num_index_tuples = 0; stack = (GistBDItem *) palloc0(sizeof(GistBDItem)); stack->blkno = GIST_ROOT_BLKNO; *************** *** 232,241 **** gistbulkdelete(PG_FUNCTION_ARGS) { todelete[ntodelete] = i - ntodelete; ntodelete++; ! stats->std.tuples_removed += 1; } else ! stats->std.num_index_tuples += 1; } if (ntodelete) --- 220,229 ---- { todelete[ntodelete] = i - ntodelete; ntodelete++; ! stats->tuples_removed += 1; } else ! stats->num_index_tuples += 1; } if (ntodelete) *************** *** 250,271 **** gistbulkdelete(PG_FUNCTION_ARGS) if (!rel->rd_istemp) { - XLogRecData *rdata; XLogRecPtr recptr; - gistxlogPageUpdate *xlinfo; ! rdata = formUpdateRdata(rel->rd_node, buffer, todelete, ntodelete, NULL, 0, ! NULL); ! xlinfo = (gistxlogPageUpdate *) rdata->next->data; ! ! recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); PageSetLSN(page, recptr); PageSetTLI(page, ThisTimeLineID); - - pfree(xlinfo); - pfree(rdata); } else PageSetLSN(page, GetXLogRecPtrForTemp()); --- 238,251 ---- if (!rel->rd_istemp) { XLogRecPtr recptr; ! recptr = gistXLogUpdate(rel->rd_node, buffer, todelete, ntodelete, NULL, 0, ! NULL, 0); PageSetLSN(page, recptr); PageSetTLI(page, ThisTimeLineID); } else PageSetLSN(page, GetXLogRecPtrForTemp()); *************** *** 293,299 **** gistbulkdelete(PG_FUNCTION_ARGS) stack->next = ptr; if (GistTupleIsInvalid(idxtuple)) ! stats->needReindex = true; } } --- 273,283 ---- stack->next = ptr; if (GistTupleIsInvalid(idxtuple)) ! ereport(LOG, ! (errmsg("index \"%s\" contains an inner tuple marked as invalid", ! RelationGetRelationName(rel)), ! errdetail("This is caused by an incomplete page split at crash recovery before upgrading to 9.1."), ! errhint("Please REINDEX it."))); } } *** a/src/backend/access/gist/gistxlog.c --- b/src/backend/access/gist/gistxlog.c *************** *** 20,34 **** #include "utils/memutils.h" #include "utils/rel.h" - - typedef struct - { - gistxlogPageUpdate *data; - int len; - IndexTuple *itup; - OffsetNumber *todelete; - } PageUpdateRecord; - typedef struct { gistxlogPage *header; --- 20,25 ---- *************** *** 41,214 **** typedef struct NewPage *page; } PageSplitRecord; - /* track for incomplete inserts, idea was taken from nbtxlog.c */ - - typedef struct gistIncompleteInsert - { - RelFileNode node; - BlockNumber origblkno; /* for splits */ - ItemPointerData key; - int lenblk; - BlockNumber *blkno; - XLogRecPtr lsn; - BlockNumber *path; - int pathlen; - } gistIncompleteInsert; - - static MemoryContext opCtx; /* working memory for operations */ - static MemoryContext insertCtx; /* holds incomplete_inserts list */ - static List *incomplete_inserts; - - - #define ItemPointerEQ(a, b) \ - ( ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \ - ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) ) - ! static void ! pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key, ! BlockNumber *blkno, int lenblk, ! PageSplitRecord *xlinfo /* to extract blkno info */ ) { ! MemoryContext oldCxt; ! gistIncompleteInsert *ninsert; ! ! if (!ItemPointerIsValid(&key)) ! ! /* ! * if key is null then we should not store insertion as incomplete, ! * because it's a vacuum operation.. ! */ ! return; ! ! oldCxt = MemoryContextSwitchTo(insertCtx); ! ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert)); ! ninsert->node = node; ! ninsert->key = key; ! ninsert->lsn = lsn; ! ! if (lenblk && blkno) ! { ! ninsert->lenblk = lenblk; ! ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk); ! memcpy(ninsert->blkno, blkno, sizeof(BlockNumber) * ninsert->lenblk); ! ninsert->origblkno = *blkno; ! } ! else { ! int i; ! ! Assert(xlinfo); ! ninsert->lenblk = xlinfo->data->npage; ! ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk); ! for (i = 0; i < ninsert->lenblk; i++) ! ninsert->blkno[i] = xlinfo->page[i].header->blkno; ! ninsert->origblkno = xlinfo->data->origblkno; ! } ! Assert(ninsert->lenblk > 0); ! /* ! * Stick the new incomplete insert onto the front of the list, not the ! * back. This is so that gist_xlog_cleanup will process incompletions in ! * last-in-first-out order. ! */ ! incomplete_inserts = lcons(ninsert, incomplete_inserts); ! ! MemoryContextSwitchTo(oldCxt); ! } ! ! static void ! forgetIncompleteInsert(RelFileNode node, ItemPointerData key) ! { ! ListCell *l; ! if (!ItemPointerIsValid(&key)) ! return; ! if (incomplete_inserts == NIL) ! return; ! foreach(l, incomplete_inserts) ! { ! gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l); ! ! if (RelFileNodeEquals(node, insert->node) && ItemPointerEQ(&(insert->key), &(key))) { ! /* found */ ! incomplete_inserts = list_delete_ptr(incomplete_inserts, insert); ! pfree(insert->blkno); ! pfree(insert); ! break; } - } - } - - static void - decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record) - { - char *begin = XLogRecGetData(record), - *ptr; - int i = 0, - addpath = 0; - - decoded->data = (gistxlogPageUpdate *) begin; - - if (decoded->data->ntodelete) - { - decoded->todelete = (OffsetNumber *) (begin + sizeof(gistxlogPageUpdate) + addpath); - addpath = MAXALIGN(sizeof(OffsetNumber) * decoded->data->ntodelete); - } - else - decoded->todelete = NULL; - - decoded->len = 0; - ptr = begin + sizeof(gistxlogPageUpdate) + addpath; - while (ptr - begin < record->xl_len) - { - decoded->len++; - ptr += IndexTupleSize((IndexTuple) ptr); - } ! decoded->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * decoded->len); ! ptr = begin + sizeof(gistxlogPageUpdate) + addpath; ! while (ptr - begin < record->xl_len) ! { ! decoded->itup[i] = (IndexTuple) ptr; ! ptr += IndexTupleSize(decoded->itup[i]); ! i++; } } /* * redo any page update (except page split) */ static void ! gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) { ! gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) XLogRecGetData(record); ! PageUpdateRecord xlrec; Buffer buffer; Page page; ! /* we must fix incomplete_inserts list even if XLR_BKP_BLOCK_1 is set */ ! forgetIncompleteInsert(xldata->node, xldata->key); ! ! if (!isnewroot && xldata->blkno != GIST_ROOT_BLKNO) ! /* operation with root always finalizes insertion */ ! pushIncompleteInsert(xldata->node, lsn, xldata->key, ! &(xldata->blkno), 1, ! NULL); ! /* nothing else to do if page was backed up (and no info to do it with) */ if (record->xl_info & XLR_BKP_BLOCK_1) return; ! decodePageUpdateRecord(&xlrec, record); ! ! buffer = XLogReadBuffer(xlrec.data->node, xlrec.data->blkno, false); if (!BufferIsValid(buffer)) return; page = (Page) BufferGetPage(buffer); --- 32,116 ---- NewPage *page; } PageSplitRecord; static MemoryContext opCtx; /* working memory for operations */ ! /* ! * Replay the clearing of F_FOLLOW_RIGHT flags. 'data' points to an array ! * of n BlockNumbers. Returns a pointer to just past the array. ! */ ! static char * ! gistRedoClearFollowRight(RelFileNode node, XLogRecPtr lsn, ! char *data, int n) { ! int i; ! XLogRecPtr oldnsn = { 0, 0 }; ! for (i = 0; i < n; i++) { ! BlockNumber blkno; ! Buffer buffer; ! Page page; ! /* use memcpy in case the input isn't aligned */ ! memcpy(&blkno, data, sizeof(BlockNumber)); ! data += sizeof(BlockNumber); ! buffer = XLogReadBuffer(node, blkno, false); ! if (!BufferIsValid(buffer)) ! continue; ! page = (Page) BufferGetPage(buffer); ! if (i == 0) ! oldnsn = GistPageGetOpaque(page)->nsn; ! /* ! * Note that we still update the NSN if page LSN is equal to the LSN ! * of this record, because the updated NSN is not included in the full ! * page image. ! */ ! if (XLByteLT(lsn, PageGetLSN(page))) { ! UnlockReleaseBuffer(buffer); ! continue; } ! /* ! * Like in the non-recovery codepath, on the rightmost page set NSN ! * to the old NSN of the first page, and NSN of all the other pages ! * to the LSN of the parent. Clear F_FOLLOW_RIGHT flag on all pages. ! */ ! GistPageGetOpaque(page)->nsn = (i == n - 1) ? oldnsn : lsn; ! GistClearFollowRight(page); ! PageSetLSN(page, lsn); ! PageSetTLI(page, ThisTimeLineID); ! MarkBufferDirty(buffer); ! UnlockReleaseBuffer(buffer); } + return data; } /* * redo any page update (except page split) */ static void ! gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record) { ! char *begin = XLogRecGetData(record); ! gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) begin; Buffer buffer; Page page; + char *data; ! data = gistRedoClearFollowRight(xldata->node, lsn, ! (char *) (&xldata->nclearFollowRight) + sizeof(uint16), ! xldata->nclearFollowRight); ! /* nothing more to do if page was backed up (and no info to do it with) */ if (record->xl_info & XLR_BKP_BLOCK_1) return; ! buffer = XLogReadBuffer(xldata->node, xldata->blkno, false); if (!BufferIsValid(buffer)) return; page = (Page) BufferGetPage(buffer); *************** *** 219,246 **** gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) return; } ! if (isnewroot) ! GISTInitBuffer(buffer, 0); ! else if (xlrec.data->ntodelete) { int i; ! for (i = 0; i < xlrec.data->ntodelete; i++) ! PageIndexTupleDelete(page, xlrec.todelete[i]); if (GistPageIsLeaf(page)) GistMarkTuplesDeleted(page); } /* add tuples */ ! if (xlrec.len > 0) ! gistfillbuffer(page, xlrec.itup, xlrec.len, InvalidOffsetNumber); ! ! /* ! * special case: leafpage, nothing to insert, nothing to delete, then ! * vacuum marks page ! */ ! if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0) ! GistClearTuplesDeleted(page); if (!GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO) --- 121,167 ---- return; } ! /* Delete old tuples */ ! if (xldata->ntodelete > 0) { int i; + OffsetNumber *todelete = (OffsetNumber *) data; + data += sizeof(OffsetNumber) * xldata->ntodelete; ! for (i = 0; i < xldata->ntodelete; i++) ! PageIndexTupleDelete(page, todelete[i]); if (GistPageIsLeaf(page)) GistMarkTuplesDeleted(page); } /* add tuples */ ! if (data - begin < record->xl_len) ! { ! OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber : ! OffsetNumberNext(PageGetMaxOffsetNumber(page)); ! while (data - begin < record->xl_len) ! { ! IndexTuple itup = (IndexTuple) data; ! Size sz = IndexTupleSize(itup); ! OffsetNumber l; ! data += sz; ! ! l = PageAddItem(page, (Item) itup, sz, off, false, false); ! if (l == InvalidOffsetNumber) ! elog(ERROR, "failed to add item to GiST index page, size %d bytes", ! (int) sz); ! off++; ! } ! } ! else ! { ! /* ! * special case: leafpage, nothing to insert, nothing to delete, then ! * vacuum marks page ! */ ! if (GistPageIsLeaf(page) && xldata->ntodelete == 0) ! GistClearTuplesDeleted(page); ! } if (!GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO) *************** *** 282,298 **** gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record) } static void ! decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) { ! char *begin = XLogRecGetData(record), ! *ptr; int j, i = 0; decoded->data = (gistxlogPageSplit *) begin; decoded->page = (NewPage *) palloc(sizeof(NewPage) * decoded->data->npage); - ptr = begin + sizeof(gistxlogPageSplit); for (i = 0; i < decoded->data->npage; i++) { Assert(ptr - begin < record->xl_len); --- 203,217 ---- } static void ! decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record, char *ptr) { ! char *begin = XLogRecGetData(record); int j, i = 0; decoded->data = (gistxlogPageSplit *) begin; decoded->page = (NewPage *) palloc(sizeof(NewPage) * decoded->data->npage); for (i = 0; i < decoded->data->npage; i++) { Assert(ptr - begin < record->xl_len); *************** *** 315,327 **** decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) static void gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) { PageSplitRecord xlrec; Buffer buffer; Page page; int i; int flags; ! decodePageSplitRecord(&xlrec, record); flags = xlrec.data->origleaf ? F_LEAF : 0; /* loop around all pages */ --- 234,252 ---- static void gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) { + gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record); PageSplitRecord xlrec; Buffer buffer; Page page; int i; int flags; + bool isrootsplit = false; + char *data; ! data = gistRedoClearFollowRight(xldata->node, lsn, ! ((char *) xldata) + sizeof(gistxlogPageSplit), ! xldata->nclearFollowRight); ! decodePageSplitRecord(&xlrec, record, data); flags = xlrec.data->origleaf ? F_LEAF : 0; /* loop around all pages */ *************** *** 329,334 **** gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) --- 254,265 ---- { NewPage *newpage = xlrec.page + i; + if (newpage->header->blkno == GIST_ROOT_BLKNO) + { + Assert(i == 0); + isrootsplit = true; + } + buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); *************** *** 339,355 **** gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) /* and fill it */ gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber); PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); } - - forgetIncompleteInsert(xlrec.data->node, xlrec.data->key); - - pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key, - NULL, 0, - &xlrec); } static void --- 270,285 ---- /* and fill it */ gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber); + if (i < xlrec.data->npage - 1 && !isrootsplit) + GistMarkFollowRight(page); + else + GistClearFollowRight(page); + PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); } } static void *************** *** 372,395 **** gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) UnlockReleaseBuffer(buffer); } - static void - gistRedoCompleteInsert(XLogRecPtr lsn, XLogRecord *record) - { - char *begin = XLogRecGetData(record), - *ptr; - gistxlogInsertComplete *xlrec; - - xlrec = (gistxlogInsertComplete *) begin; - - ptr = begin + sizeof(gistxlogInsertComplete); - while (ptr - begin < record->xl_len) - { - Assert(record->xl_len - (ptr - begin) >= sizeof(ItemPointerData)); - forgetIncompleteInsert(xlrec->node, *((ItemPointerData *) ptr)); - ptr += sizeof(ItemPointerData); - } - } - void gist_redo(XLogRecPtr lsn, XLogRecord *record) { --- 302,307 ---- *************** *** 401,430 **** gist_redo(XLogRecPtr lsn, XLogRecord *record) * implement a similar optimization we have in b-tree, and remove killed * tuples outside VACUUM, we'll need to handle that here. */ - RestoreBkpBlocks(lsn, record, false); oldCxt = MemoryContextSwitchTo(opCtx); switch (info) { case XLOG_GIST_PAGE_UPDATE: ! gistRedoPageUpdateRecord(lsn, record, false); break; case XLOG_GIST_PAGE_DELETE: gistRedoPageDeleteRecord(lsn, record); break; - case XLOG_GIST_NEW_ROOT: - gistRedoPageUpdateRecord(lsn, record, true); - break; case XLOG_GIST_PAGE_SPLIT: gistRedoPageSplitRecord(lsn, record); break; case XLOG_GIST_CREATE_INDEX: gistRedoCreateIndex(lsn, record); break; - case XLOG_GIST_INSERT_COMPLETE: - gistRedoCompleteInsert(lsn, record); - break; default: elog(PANIC, "gist_redo: unknown op code %u", info); } --- 313,335 ---- * implement a similar optimization we have in b-tree, and remove killed * tuples outside VACUUM, we'll need to handle that here. */ RestoreBkpBlocks(lsn, record, false); oldCxt = MemoryContextSwitchTo(opCtx); switch (info) { case XLOG_GIST_PAGE_UPDATE: ! gistRedoPageUpdateRecord(lsn, record); break; case XLOG_GIST_PAGE_DELETE: gistRedoPageDeleteRecord(lsn, record); break; case XLOG_GIST_PAGE_SPLIT: gistRedoPageSplitRecord(lsn, record); break; case XLOG_GIST_CREATE_INDEX: gistRedoCreateIndex(lsn, record); break; default: elog(PANIC, "gist_redo: unknown op code %u", info); } *************** *** 434,453 **** gist_redo(XLogRecPtr lsn, XLogRecord *record) } static void ! out_target(StringInfo buf, RelFileNode node, ItemPointerData key) { appendStringInfo(buf, "rel %u/%u/%u", node.spcNode, node.dbNode, node.relNode); - if (ItemPointerIsValid(&key)) - appendStringInfo(buf, "; tid %u/%u", - ItemPointerGetBlockNumber(&key), - ItemPointerGetOffsetNumber(&key)); } static void out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec) { ! out_target(buf, xlrec->node, xlrec->key); appendStringInfo(buf, "; block number %u", xlrec->blkno); } --- 339,354 ---- } static void ! out_target(StringInfo buf, RelFileNode node) { appendStringInfo(buf, "rel %u/%u/%u", node.spcNode, node.dbNode, node.relNode); } static void out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec) { ! out_target(buf, xlrec->node); appendStringInfo(buf, "; block number %u", xlrec->blkno); } *************** *** 463,469 **** static void out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec) { appendStringInfo(buf, "page_split: "); ! out_target(buf, xlrec->node, xlrec->key); appendStringInfo(buf, "; block number %u splits to %d pages", xlrec->origblkno, xlrec->npage); } --- 364,370 ---- out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec) { appendStringInfo(buf, "page_split: "); ! out_target(buf, xlrec->node); appendStringInfo(buf, "; block number %u splits to %d pages", xlrec->origblkno, xlrec->npage); } *************** *** 482,491 **** gist_desc(StringInfo buf, uint8 xl_info, char *rec) case XLOG_GIST_PAGE_DELETE: out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec); break; - case XLOG_GIST_NEW_ROOT: - appendStringInfo(buf, "new_root: "); - out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key); - break; case XLOG_GIST_PAGE_SPLIT: out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec); break; --- 383,388 ---- *************** *** 495,861 **** gist_desc(StringInfo buf, uint8 xl_info, char *rec) ((RelFileNode *) rec)->dbNode, ((RelFileNode *) rec)->relNode); break; - case XLOG_GIST_INSERT_COMPLETE: - appendStringInfo(buf, "complete_insert: rel %u/%u/%u", - ((gistxlogInsertComplete *) rec)->node.spcNode, - ((gistxlogInsertComplete *) rec)->node.dbNode, - ((gistxlogInsertComplete *) rec)->node.relNode); - break; default: appendStringInfo(buf, "unknown gist op code %u", info); break; } } - IndexTuple - gist_form_invalid_tuple(BlockNumber blkno) - { - /* - * we don't alloc space for null's bitmap, this is invalid tuple, be - * carefull in read and write code - */ - Size size = IndexInfoFindDataOffset(0); - IndexTuple tuple = (IndexTuple) palloc0(size); - - tuple->t_info |= size; - - ItemPointerSetBlockNumber(&(tuple->t_tid), blkno); - GistTupleSetInvalid(tuple); - - return tuple; - } - - - static void - gistxlogFindPath(Relation index, gistIncompleteInsert *insert) - { - GISTInsertStack *top; - - insert->pathlen = 0; - insert->path = NULL; - - if ((top = gistFindPath(index, insert->origblkno)) != NULL) - { - int i; - GISTInsertStack *ptr; - - for (ptr = top; ptr; ptr = ptr->parent) - insert->pathlen++; - - insert->path = (BlockNumber *) palloc(sizeof(BlockNumber) * insert->pathlen); - - i = 0; - for (ptr = top; ptr; ptr = ptr->parent) - insert->path[i++] = ptr->blkno; - } - else - elog(ERROR, "lost parent for block %u", insert->origblkno); - } - - static SplitedPageLayout * - gistMakePageLayout(Buffer *buffers, int nbuffers) - { - SplitedPageLayout *res = NULL, - *resptr; - - while (nbuffers-- > 0) - { - Page page = BufferGetPage(buffers[nbuffers]); - IndexTuple *vec; - int veclen; - - resptr = (SplitedPageLayout *) palloc0(sizeof(SplitedPageLayout)); - - resptr->block.blkno = BufferGetBlockNumber(buffers[nbuffers]); - resptr->block.num = PageGetMaxOffsetNumber(page); - - vec = gistextractpage(page, &veclen); - resptr->list = gistfillitupvec(vec, veclen, &(resptr->lenlist)); - - resptr->next = res; - res = resptr; - } - - return res; - } - - /* - * Continue insert after crash. In normal situations, there aren't any - * incomplete inserts, but if a crash occurs partway through an insertion - * sequence, we'll need to finish making the index valid at the end of WAL - * replay. - * - * Note that we assume the index is now in a valid state, except for the - * unfinished insertion. In particular it's safe to invoke gistFindPath(); - * there shouldn't be any garbage pages for it to run into. - * - * To complete insert we can't use basic insertion algorithm because - * during insertion we can't call user-defined support functions of opclass. - * So, we insert 'invalid' tuples without real key and do it by separate algorithm. - * 'invalid' tuple should be updated by vacuum full. - */ - static void - gistContinueInsert(gistIncompleteInsert *insert) - { - IndexTuple *itup; - int i, - lenitup; - Relation index; - - index = CreateFakeRelcacheEntry(insert->node); - - /* - * needed vector itup never will be more than initial lenblkno+2, because - * during this processing Indextuple can be only smaller - */ - lenitup = insert->lenblk; - itup = (IndexTuple *) palloc(sizeof(IndexTuple) * (lenitup + 2 /* guarantee root split */ )); - - for (i = 0; i < insert->lenblk; i++) - itup[i] = gist_form_invalid_tuple(insert->blkno[i]); - - /* - * any insertion of itup[] should make LOG message about - */ - - if (insert->origblkno == GIST_ROOT_BLKNO) - { - /* - * it was split root, so we should only make new root. it can't be - * simple insert into root, we should replace all content of root. - */ - Buffer buffer = XLogReadBuffer(insert->node, GIST_ROOT_BLKNO, true); - - gistnewroot(index, buffer, itup, lenitup, NULL); - UnlockReleaseBuffer(buffer); - } - else - { - Buffer *buffers; - Page *pages; - int numbuffer; - OffsetNumber *todelete; - - /* construct path */ - gistxlogFindPath(index, insert); - - Assert(insert->pathlen > 0); - - buffers = (Buffer *) palloc(sizeof(Buffer) * (insert->lenblk + 2 /* guarantee root split */ )); - pages = (Page *) palloc(sizeof(Page) * (insert->lenblk + 2 /* guarantee root split */ )); - todelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (insert->lenblk + 2 /* guarantee root split */ )); - - for (i = 0; i < insert->pathlen; i++) - { - int j, - k, - pituplen = 0; - uint8 xlinfo; - XLogRecData *rdata; - XLogRecPtr recptr; - Buffer tempbuffer = InvalidBuffer; - int ntodelete = 0; - - numbuffer = 1; - buffers[0] = ReadBuffer(index, insert->path[i]); - LockBuffer(buffers[0], GIST_EXCLUSIVE); - - /* - * we check buffer, because we restored page earlier - */ - gistcheckpage(index, buffers[0]); - - pages[0] = BufferGetPage(buffers[0]); - Assert(!GistPageIsLeaf(pages[0])); - - pituplen = PageGetMaxOffsetNumber(pages[0]); - - /* find remove old IndexTuples to remove */ - for (j = 0; j < pituplen && ntodelete < lenitup; j++) - { - BlockNumber blkno; - ItemId iid = PageGetItemId(pages[0], j + FirstOffsetNumber); - IndexTuple idxtup = (IndexTuple) PageGetItem(pages[0], iid); - - blkno = ItemPointerGetBlockNumber(&(idxtup->t_tid)); - - for (k = 0; k < lenitup; k++) - if (ItemPointerGetBlockNumber(&(itup[k]->t_tid)) == blkno) - { - todelete[ntodelete] = j + FirstOffsetNumber - ntodelete; - ntodelete++; - break; - } - } - - if (ntodelete == 0) - elog(PANIC, "gistContinueInsert: cannot find pointer to page(s)"); - - /* - * we check space with subtraction only first tuple to delete, - * hope, that wiil be enough space.... - */ - - if (gistnospace(pages[0], itup, lenitup, *todelete, 0)) - { - - /* no space left on page, so we must split */ - buffers[numbuffer] = ReadBuffer(index, P_NEW); - LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE); - GISTInitBuffer(buffers[numbuffer], 0); - pages[numbuffer] = BufferGetPage(buffers[numbuffer]); - gistfillbuffer(pages[numbuffer], itup, lenitup, FirstOffsetNumber); - numbuffer++; - - if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO) - { - Buffer tmp; - - /* - * we split root, just copy content from root to new page - */ - - /* sanity check */ - if (i + 1 != insert->pathlen) - elog(PANIC, "unexpected pathlen in index \"%s\"", - RelationGetRelationName(index)); - - /* fill new page, root will be changed later */ - tempbuffer = ReadBuffer(index, P_NEW); - LockBuffer(tempbuffer, GIST_EXCLUSIVE); - memcpy(BufferGetPage(tempbuffer), pages[0], BufferGetPageSize(tempbuffer)); - - /* swap buffers[0] (was root) and temp buffer */ - tmp = buffers[0]; - buffers[0] = tempbuffer; - tempbuffer = tmp; /* now in tempbuffer GIST_ROOT_BLKNO, - * it is still unchanged */ - - pages[0] = BufferGetPage(buffers[0]); - } - - START_CRIT_SECTION(); - - for (j = 0; j < ntodelete; j++) - PageIndexTupleDelete(pages[0], todelete[j]); - - xlinfo = XLOG_GIST_PAGE_SPLIT; - rdata = formSplitRdata(index->rd_node, insert->path[i], - false, &(insert->key), - gistMakePageLayout(buffers, numbuffer)); - - } - else - { - START_CRIT_SECTION(); - - for (j = 0; j < ntodelete; j++) - PageIndexTupleDelete(pages[0], todelete[j]); - gistfillbuffer(pages[0], itup, lenitup, InvalidOffsetNumber); - - xlinfo = XLOG_GIST_PAGE_UPDATE; - rdata = formUpdateRdata(index->rd_node, buffers[0], - todelete, ntodelete, - itup, lenitup, &(insert->key)); - } - - /* - * use insert->key as mark for completion of insert (form*Rdata() - * above) for following possible replays - */ - - /* write pages, we should mark it dirty befor XLogInsert() */ - for (j = 0; j < numbuffer; j++) - { - GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber; - MarkBufferDirty(buffers[j]); - } - recptr = XLogInsert(RM_GIST_ID, xlinfo, rdata); - for (j = 0; j < numbuffer; j++) - { - PageSetLSN(pages[j], recptr); - PageSetTLI(pages[j], ThisTimeLineID); - } - - END_CRIT_SECTION(); - - lenitup = numbuffer; - for (j = 0; j < numbuffer; j++) - { - itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j])); - UnlockReleaseBuffer(buffers[j]); - } - - if (tempbuffer != InvalidBuffer) - { - /* - * it was a root split, so fill it by new values - */ - gistnewroot(index, tempbuffer, itup, lenitup, &(insert->key)); - UnlockReleaseBuffer(tempbuffer); - } - } - } - - FreeFakeRelcacheEntry(index); - - ereport(LOG, - (errmsg("index %u/%u/%u needs VACUUM FULL or REINDEX to finish crash recovery", - insert->node.spcNode, insert->node.dbNode, insert->node.relNode), - errdetail("Incomplete insertion detected during crash replay."))); - } - void gist_xlog_startup(void) { - incomplete_inserts = NIL; - insertCtx = AllocSetContextCreate(CurrentMemoryContext, - "GiST recovery temporary context", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); opCtx = createTempGistContext(); } void gist_xlog_cleanup(void) { - ListCell *l; - MemoryContext oldCxt; - - oldCxt = MemoryContextSwitchTo(opCtx); - - foreach(l, incomplete_inserts) - { - gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l); - - gistContinueInsert(insert); - MemoryContextReset(opCtx); - } - MemoryContextSwitchTo(oldCxt); - MemoryContextDelete(opCtx); - MemoryContextDelete(insertCtx); - } - - bool - gist_safe_restartpoint(void) - { - if (incomplete_inserts) - return false; - return true; } ! ! XLogRecData * ! formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf, ! ItemPointer key, SplitedPageLayout *dist) { XLogRecData *rdata; gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit)); SplitedPageLayout *ptr; int npage = 0, ! cur = 1; ptr = dist; while (ptr) --- 392,432 ---- ((RelFileNode *) rec)->dbNode, ((RelFileNode *) rec)->relNode); break; default: appendStringInfo(buf, "unknown gist op code %u", info); break; } } void gist_xlog_startup(void) { opCtx = createTempGistContext(); } void gist_xlog_cleanup(void) { MemoryContextDelete(opCtx); } ! XLogRecPtr ! gistXLogSplit(RelFileNode node, BlockNumber blkno, bool page_is_leaf, ! SplitedPageLayout *dist, ! Buffer *clearFollowRight, int nclearFollowRight) { XLogRecData *rdata; gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit)); SplitedPageLayout *ptr; int npage = 0, ! cur; ! int i; ! BlockNumber *clearFollowRightBlknos; ! XLogRecPtr recptr; ! ! clearFollowRightBlknos = palloc(sizeof(BlockNumber) * nclearFollowRight); ! for (i = 0; i < nclearFollowRight; i++) ! clearFollowRightBlknos[i] = BufferGetBlockNumber(clearFollowRight[i]); ptr = dist; while (ptr) *************** *** 864,884 **** formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf, ptr = ptr->next; } ! rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2)); xlrec->node = node; xlrec->origblkno = blkno; xlrec->origleaf = page_is_leaf; xlrec->npage = (uint16) npage; ! if (key) ! xlrec->key = *key; ! else ! ItemPointerSetInvalid(&(xlrec->key)); rdata[0].buffer = InvalidBuffer; rdata[0].data = (char *) xlrec; rdata[0].len = sizeof(gistxlogPageSplit); ! rdata[0].next = NULL; ptr = dist; while (ptr) --- 435,462 ---- ptr = ptr->next; } ! rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 3 + nclearFollowRight)); xlrec->node = node; xlrec->origblkno = blkno; xlrec->origleaf = page_is_leaf; xlrec->npage = (uint16) npage; ! xlrec->nclearFollowRight = nclearFollowRight; rdata[0].buffer = InvalidBuffer; rdata[0].data = (char *) xlrec; rdata[0].len = sizeof(gistxlogPageSplit); ! cur = 1; ! ! if (nclearFollowRight > 0) ! { ! rdata[cur - 1].next = &rdata[cur]; ! rdata[cur].buffer = InvalidBuffer; ! rdata[cur].data = (char *) clearFollowRightBlknos; ! rdata[cur].len = sizeof(BlockNumber) * nclearFollowRight; ! rdata[cur].next = NULL; ! cur++; ! } ptr = dist; while (ptr) *************** *** 898,904 **** formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf, ptr = ptr->next; } ! return rdata; } /* --- 476,500 ---- ptr = ptr->next; } ! /* ! * full page images for the child bufs (necessary if a checkpoint happened ! * since splitting the child page) ! */ ! for (i = 0; i < nclearFollowRight; i++) ! { ! rdata[cur - 1].next = &(rdata[cur]); ! rdata[cur].data = NULL; ! rdata[cur].len = 0; ! rdata[cur].buffer = clearFollowRight[i]; ! rdata[cur].buffer_std = true; ! cur++; ! } ! rdata[cur - 1].next = NULL; ! ! recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata); ! ! pfree(rdata); ! return recptr; } /* *************** *** 911,937 **** formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf, * at least one rdata item referencing the buffer, even when ntodelete and * ituplen are both zero; this ensures that XLogInsert knows about the buffer. */ ! XLogRecData * ! formUpdateRdata(RelFileNode node, Buffer buffer, ! OffsetNumber *todelete, int ntodelete, ! IndexTuple *itup, int ituplen, ItemPointer key) { XLogRecData *rdata; gistxlogPageUpdate *xlrec; int cur, i; ! rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen)); xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate)); xlrec->node = node; xlrec->blkno = BufferGetBlockNumber(buffer); xlrec->ntodelete = ntodelete; ! ! if (key) ! xlrec->key = *key; ! else ! ItemPointerSetInvalid(&(xlrec->key)); rdata[0].buffer = buffer; rdata[0].buffer_std = true; --- 507,536 ---- * at least one rdata item referencing the buffer, even when ntodelete and * ituplen are both zero; this ensures that XLogInsert knows about the buffer. */ ! XLogRecPtr ! gistXLogUpdate(RelFileNode node, Buffer buffer, ! OffsetNumber *todelete, int ntodelete, ! IndexTuple *itup, int ituplen, ! Buffer *clearFollowRight, int nclearFollowRight) { XLogRecData *rdata; gistxlogPageUpdate *xlrec; int cur, i; + BlockNumber *clearFollowRightBlknos; + XLogRecPtr recptr; ! clearFollowRightBlknos = palloc(sizeof(BlockNumber) * nclearFollowRight); ! for (i = 0; i < nclearFollowRight; i++) ! clearFollowRightBlknos[i] = BufferGetBlockNumber(clearFollowRight[i]); ! ! rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (4 + ituplen + nclearFollowRight)); xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate)); xlrec->node = node; xlrec->blkno = BufferGetBlockNumber(buffer); xlrec->ntodelete = ntodelete; ! xlrec->nclearFollowRight = nclearFollowRight; rdata[0].buffer = buffer; rdata[0].buffer_std = true; *************** *** 944,957 **** formUpdateRdata(RelFileNode node, Buffer buffer, rdata[1].buffer = InvalidBuffer; rdata[1].next = &(rdata[2]); ! rdata[2].data = (char *) todelete; ! rdata[2].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete); ! rdata[2].buffer = buffer; ! rdata[2].buffer_std = true; ! rdata[2].next = NULL; /* new tuples */ ! cur = 3; for (i = 0; i < ituplen; i++) { rdata[cur - 1].next = &(rdata[cur]); --- 543,561 ---- rdata[1].buffer = InvalidBuffer; rdata[1].next = &(rdata[2]); ! rdata[2].data = (char *) clearFollowRightBlknos; ! rdata[2].len = nclearFollowRight * sizeof(BlockNumber); ! rdata[2].buffer = InvalidBuffer; ! rdata[2].next = &(rdata[3]); ! ! rdata[3].data = (char *) todelete; ! rdata[3].len = sizeof(OffsetNumber) * ntodelete; ! rdata[3].buffer = buffer; ! rdata[3].buffer_std = true; ! rdata[3].next = NULL; /* new tuples */ ! cur = 4; for (i = 0; i < ituplen; i++) { rdata[cur - 1].next = &(rdata[cur]); *************** *** 959,996 **** formUpdateRdata(RelFileNode node, Buffer buffer, rdata[cur].len = IndexTupleSize(itup[i]); rdata[cur].buffer = buffer; rdata[cur].buffer_std = true; - rdata[cur].next = NULL; cur++; } ! return rdata; ! } ! ! XLogRecPtr ! gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len) ! { ! gistxlogInsertComplete xlrec; ! XLogRecData rdata[2]; ! XLogRecPtr recptr; ! ! Assert(len > 0); ! xlrec.node = node; ! ! rdata[0].buffer = InvalidBuffer; ! rdata[0].data = (char *) &xlrec; ! rdata[0].len = sizeof(gistxlogInsertComplete); ! rdata[0].next = &(rdata[1]); ! ! rdata[1].buffer = InvalidBuffer; ! rdata[1].data = (char *) keys; ! rdata[1].len = sizeof(ItemPointerData) * len; ! rdata[1].next = NULL; ! ! START_CRIT_SECTION(); ! ! recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, rdata); ! ! END_CRIT_SECTION(); return recptr; } --- 563,587 ---- rdata[cur].len = IndexTupleSize(itup[i]); rdata[cur].buffer = buffer; rdata[cur].buffer_std = true; cur++; } + /* + * full page images for the child bufs (necessary if a checkpoint happened + * since splitting the child page) + */ + for (i = 0; i < nclearFollowRight; i++) + { + rdata[cur - 1].next = &(rdata[cur]); + rdata[cur].data = NULL; + rdata[cur].len = 0; + rdata[cur].buffer = clearFollowRight[i]; + rdata[cur].buffer_std = true; + cur++; + } + rdata[cur - 1].next = NULL; ! recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); + pfree(rdata); return recptr; } *** a/src/backend/access/transam/rmgr.c --- b/src/backend/access/transam/rmgr.c *************** *** 40,45 **** const RmgrData RmgrTable[RM_MAX_ID + 1] = { {"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint}, {"Hash", hash_redo, hash_desc, NULL, NULL, NULL}, {"Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, gin_safe_restartpoint}, ! {"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, gist_safe_restartpoint}, {"Sequence", seq_redo, seq_desc, NULL, NULL, NULL} }; --- 40,45 ---- {"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint}, {"Hash", hash_redo, hash_desc, NULL, NULL, NULL}, {"Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, gin_safe_restartpoint}, ! {"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, NULL}, {"Sequence", seq_redo, seq_desc, NULL, NULL, NULL} }; *** a/src/include/access/gist.h --- b/src/include/access/gist.h *************** *** 58,66 **** /* * Page opaque data in a GiST index page. */ ! #define F_LEAF (1 << 0) ! #define F_DELETED (1 << 1) ! #define F_TUPLES_DELETED (1 << 2) typedef XLogRecPtr GistNSN; --- 58,67 ---- /* * Page opaque data in a GiST index page. */ ! #define F_LEAF (1 << 0) /* leaf page */ ! #define F_DELETED (1 << 1) /* the page has been deleted */ ! #define F_TUPLES_DELETED (1 << 2) /* some tuples on the page are dead */ ! #define F_FOLLOW_RIGHT (1 << 3) /* page to the right has no downlink */ typedef XLogRecPtr GistNSN; *************** *** 132,137 **** typedef struct GISTENTRY --- 133,142 ---- #define GistMarkTuplesDeleted(page) ( GistPageGetOpaque(page)->flags |= F_TUPLES_DELETED) #define GistClearTuplesDeleted(page) ( GistPageGetOpaque(page)->flags &= ~F_TUPLES_DELETED) + #define GistFollowRight(page) ( GistPageGetOpaque(page)->flags & F_FOLLOW_RIGHT) + #define GistMarkFollowRight(page) ( GistPageGetOpaque(page)->flags |= F_FOLLOW_RIGHT) + #define GistClearFollowRight(page) ( GistPageGetOpaque(page)->flags &= ~F_FOLLOW_RIGHT) + /* * Vector of GISTENTRY structs; user-defined methods union and picksplit * take it as one of their arguments *** a/src/include/access/gist_private.h --- b/src/include/access/gist_private.h *************** *** 132,140 **** typedef GISTScanOpaqueData *GISTScanOpaque; /* XLog stuff */ #define XLOG_GIST_PAGE_UPDATE 0x00 ! #define XLOG_GIST_NEW_ROOT 0x20 #define XLOG_GIST_PAGE_SPLIT 0x30 ! #define XLOG_GIST_INSERT_COMPLETE 0x40 #define XLOG_GIST_CREATE_INDEX 0x50 #define XLOG_GIST_PAGE_DELETE 0x60 --- 132,140 ---- /* XLog stuff */ #define XLOG_GIST_PAGE_UPDATE 0x00 ! /* #define XLOG_GIST_NEW_ROOT 0x20 */ /* not used anymore */ #define XLOG_GIST_PAGE_SPLIT 0x30 ! /* #define XLOG_GIST_INSERT_COMPLETE 0x40 */ /* not used anymore */ #define XLOG_GIST_CREATE_INDEX 0x50 #define XLOG_GIST_PAGE_DELETE 0x60 *************** *** 143,173 **** typedef struct gistxlogPageUpdate RelFileNode node; BlockNumber blkno; - /* - * It used to identify completeness of insert. Sets to leaf itup - */ - ItemPointerData key; - /* number of deleted offsets */ uint16 ntodelete; /* ! * follow: 1. todelete OffsetNumbers 2. tuples to insert */ } gistxlogPageUpdate; typedef struct gistxlogPageSplit { RelFileNode node; ! BlockNumber origblkno; /* splitted page */ ! bool origleaf; /* was splitted page a leaf page? */ uint16 npage; - /* see comments on gistxlogPageUpdate */ - ItemPointerData key; - /* ! * follow: 1. gistxlogPage and array of IndexTupleData per page */ } gistxlogPageSplit; --- 143,173 ---- RelFileNode node; BlockNumber blkno; /* number of deleted offsets */ uint16 ntodelete; + uint16 nclearFollowRight; + /* ! * follow: ! * 1. nclearFollowRight BlockNumbers ! * 2. todelete OffsetNumbers ! * 3. tuples to insert */ } gistxlogPageUpdate; typedef struct gistxlogPageSplit { RelFileNode node; ! BlockNumber origblkno; ! bool origleaf; ! uint16 nclearFollowRight; uint16 npage; /* ! * follow: ! * 1. nclearFollowRight BlockNumbers ! * 2. gistxlogPage and array of IndexTupleData per page */ } gistxlogPageSplit; *************** *** 177,188 **** typedef struct gistxlogPage int num; /* number of index tuples following */ } gistxlogPage; - typedef struct gistxlogInsertComplete - { - RelFileNode node; - /* follows ItemPointerData key to clean */ - } gistxlogInsertComplete; - typedef struct gistxlogPageDelete { RelFileNode node; --- 177,182 ---- *************** *** 206,212 **** typedef struct SplitedPageLayout * GISTInsertStack used for locking buffers and transfer arguments during * insertion */ - typedef struct GISTInsertStack { /* current page */ --- 200,205 ---- *************** *** 215,221 **** typedef struct GISTInsertStack Page page; /* ! * log sequence number from page->lsn to recognize page update and * compare it with page's nsn to recognize page split */ GistNSN lsn; --- 208,214 ---- Page page; /* ! * log sequence number from page->lsn to recognize page update and * compare it with page's nsn to recognize page split */ GistNSN lsn; *************** *** 223,231 **** typedef struct GISTInsertStack /* child's offset */ OffsetNumber childoffnum; ! /* pointer to parent and child */ struct GISTInsertStack *parent; - struct GISTInsertStack *child; /* for gistFindPath */ struct GISTInsertStack *next; --- 216,223 ---- /* child's offset */ OffsetNumber childoffnum; ! /* pointer to parent */ struct GISTInsertStack *parent; /* for gistFindPath */ struct GISTInsertStack *next; *************** *** 238,249 **** typedef struct GistSplitVector Datum spl_lattr[INDEX_MAX_KEYS]; /* Union of subkeys in * spl_left */ bool spl_lisnull[INDEX_MAX_KEYS]; - bool spl_leftvalid; Datum spl_rattr[INDEX_MAX_KEYS]; /* Union of subkeys in * spl_right */ bool spl_risnull[INDEX_MAX_KEYS]; - bool spl_rightvalid; bool *spl_equiv; /* equivalent tuples which can be freely * distributed between left and right pages */ --- 230,239 ---- *************** *** 252,279 **** typedef struct GistSplitVector typedef struct { Relation r; - IndexTuple *itup; /* in/out, points to compressed entry */ - int ituplen; /* length of itup */ Size freespace; /* free space to be left */ - GISTInsertStack *stack; - bool needInsertComplete; ! /* pointer to heap tuple */ ! ItemPointerData key; } GISTInsertState; /* root page of a gist index */ #define GIST_ROOT_BLKNO 0 /* ! * mark tuples on inner pages during recovery */ #define TUPLE_IS_VALID 0xffff #define TUPLE_IS_INVALID 0xfffe #define GistTupleIsInvalid(itup) ( ItemPointerGetOffsetNumber( &((itup)->t_tid) ) == TUPLE_IS_INVALID ) #define GistTupleSetValid(itup) ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_VALID ) - #define GistTupleSetInvalid(itup) ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_INVALID ) /* gist.c */ extern Datum gistbuild(PG_FUNCTION_ARGS); --- 242,281 ---- typedef struct { Relation r; Size freespace; /* free space to be left */ ! GISTInsertStack *stack; } GISTInsertState; /* root page of a gist index */ #define GIST_ROOT_BLKNO 0 /* ! * Before PostgreSQL 9.1, we used rely on so-called "invalid tuples" on inner ! * pages to finish crash recovery of incomplete page splits. If a crash ! * happened in the middle of a page split, so that the downlink pointers were ! * not yet inserted, crash recovery inserted a special downlink pointer. The ! * semantics of an invalid tuple was that it if you encounter one in a scan, ! * it must always be followed, because we don't know if the tuples on the ! * child page match or not. ! * ! * We no longer create such invalid tuples, we now mark the left-half of such ! * an incomplete split with the F_FOLLOW_RIGHT flag instead, and finish the ! * split properly the next time we need to insert on that page. To retain ! * on-disk compatibility for the sake of pg_upgrade, we still store 0xffff as ! * the offset number of all inner tuples. If we encounter any invalid tuples ! * with 0xfffe during insertion, we throw an error, though scans still handle ! * them. You should only encounter invalid tuples if you pg_upgrade a pre-9.1 ! * gist index which already has invalid tuples in it because of a crash. That ! * should be rare, and you are recommended to REINDEX anyway if you have any ! * invalid tuples in an index, so throwing an error is as far as we go with ! * supporting that. */ #define TUPLE_IS_VALID 0xffff #define TUPLE_IS_INVALID 0xfffe #define GistTupleIsInvalid(itup) ( ItemPointerGetOffsetNumber( &((itup)->t_tid) ) == TUPLE_IS_INVALID ) #define GistTupleSetValid(itup) ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_VALID ) /* gist.c */ extern Datum gistbuild(PG_FUNCTION_ARGS); *************** *** 281,288 **** extern Datum gistinsert(PG_FUNCTION_ARGS); extern MemoryContext createTempGistContext(void); extern void initGISTstate(GISTSTATE *giststate, Relation index); extern void freeGISTstate(GISTSTATE *giststate); - extern void gistmakedeal(GISTInsertState *state, GISTSTATE *giststate); - extern void gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key); extern SplitedPageLayout *gistSplit(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *giststate); --- 283,288 ---- *************** *** 294,311 **** extern void gist_redo(XLogRecPtr lsn, XLogRecord *record); extern void gist_desc(StringInfo buf, uint8 xl_info, char *rec); extern void gist_xlog_startup(void); extern void gist_xlog_cleanup(void); - extern bool gist_safe_restartpoint(void); - extern IndexTuple gist_form_invalid_tuple(BlockNumber blkno); - - extern XLogRecData *formUpdateRdata(RelFileNode node, Buffer buffer, - OffsetNumber *todelete, int ntodelete, - IndexTuple *itup, int ituplen, ItemPointer key); ! extern XLogRecData *formSplitRdata(RelFileNode node, ! BlockNumber blkno, bool page_is_leaf, ! ItemPointer key, SplitedPageLayout *dist); ! extern XLogRecPtr gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len); /* gistget.c */ extern Datum gistgettuple(PG_FUNCTION_ARGS); --- 294,309 ---- extern void gist_desc(StringInfo buf, uint8 xl_info, char *rec); extern void gist_xlog_startup(void); extern void gist_xlog_cleanup(void); ! extern XLogRecPtr gistXLogUpdate(RelFileNode node, Buffer buffer, ! OffsetNumber *todelete, int ntodelete, ! IndexTuple *itup, int ituplen, ! Buffer *clearFollowRight, int nclearFollowRight); ! extern XLogRecPtr gistXLogSplit(RelFileNode node, ! BlockNumber blkno, bool page_is_leaf, ! SplitedPageLayout *dist, ! Buffer *clearFollowRight, int nclearFollowRight); /* gistget.c */ extern Datum gistgettuple(PG_FUNCTION_ARGS); *************** *** 357,363 **** extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, extern float gistpenalty(GISTSTATE *giststate, int attno, GISTENTRY *key1, bool isNull1, GISTENTRY *key2, bool isNull2); ! extern bool gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey, Datum *attr, bool *isnull); extern bool gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b); extern void gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p, --- 355,361 ---- extern float gistpenalty(GISTSTATE *giststate, int attno, GISTENTRY *key1, bool isNull1, GISTENTRY *key2, bool isNull2); ! extern void gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey, Datum *attr, bool *isnull); extern bool gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b); extern void gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,