Tom Lane wrote:
> While testing it I realized that there seems to be a nearby bug in
> _bt_findsplitloc: it fails to consider the possibility of moving all the
> extant items to the left side. It will always return a firstright <=
> maxoff. ISTM this would mean that it could choose a bad split if the
> incoming item goes at the end and both it and the last extant item are
> large: in this case they should be split apart, but they won't be.
>
> Heikki, do you feel like looking at that, or shall I?
I refactored findsplitloc and checksplitloc so that the division of
labor is clearer, IMO. I moved all the space calculations that used to
be inside the loop into checksplitloc.
I also fixed the off-by-4 error in the free space calculation caused by
PageGetFreeSpace subtracting sizeof(ItemIdData). It was harmless, but it
was distracting, and I felt it might come back to bite us in the future
if we change the page layout or alignments. There's now a new function,
PageGetExactFreeSpace, that doesn't do the subtraction.
findsplitloc now tries the "just the new item to the right page" split
as well. If people don't like the refactoring, I can write a smaller
patch that adds just that.
Some of the long variable names look ugly. camelCase or underscores
between words would be more readable, but I retained the old style for
the sake of consistency.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
Index: src/backend/access/nbtree/nbtinsert.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/access/nbtree/nbtinsert.c,v
retrieving revision 1.150
diff -c -r1.150 nbtinsert.c
*** src/backend/access/nbtree/nbtinsert.c 8 Feb 2007 05:05:53 -0000 1.150
--- src/backend/access/nbtree/nbtinsert.c 8 Feb 2007 13:24:50 -0000
***************
*** 29,34 ****
--- 29,38 ----
int fillfactor; /* needed when splitting rightmost page */
bool is_leaf; /* T if splitting a leaf page */
bool is_rightmost; /* T if splitting a rightmost page */
+ OffsetNumber newitemoff; /* where the new item is to be inserted */
+ int leftspace; /* space available for items on left page */
+ int rightspace; /* space available for items on right page */
+ int olddataitemstotal; /* space taken by old items */
bool have_split; /* found a valid split? */
***************
*** 57,65 ****
OffsetNumber newitemoff,
Size newitemsz,
bool *newitemonleft);
! static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
! int leftfree, int rightfree,
! bool newitemonleft, Size firstrightitemsz);
static void _bt_pgaddtup(Relation rel, Page page,
Size itemsize, IndexTuple itup,
OffsetNumber itup_off, const char *where);
--- 61,69 ----
OffsetNumber newitemoff,
Size newitemsz,
bool *newitemonleft);
! static void _bt_checksplitloc(FindSplitData *state,
! OffsetNumber firstoldonright, bool newitemonleft,
! int dataitemstoleft, Size firstoldonrightsz);
static void _bt_pgaddtup(Relation rel, Page page,
Size itemsize, IndexTuple itup,
OffsetNumber itup_off, const char *where);
***************
*** 1101,1113 ****
int leftspace,
rightspace,
goodenough,
! dataitemtotal,
! dataitemstoleft;
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
/* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
newitemsz += sizeof(ItemIdData);
state.newitemsz = newitemsz;
state.is_leaf = P_ISLEAF(opaque);
state.is_rightmost = P_RIGHTMOST(opaque);
--- 1105,1135 ----
int leftspace,
rightspace,
goodenough,
! olddataitemstotal,
! olddataitemstoleft;
! bool goodenoughfound;
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
/* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
newitemsz += sizeof(ItemIdData);
+
+ /* Total free space available on a btree page, after fixed overhead */
+ leftspace = rightspace =
+ PageGetPageSize(page) - SizeOfPageHeaderData -
+ MAXALIGN(sizeof(BTPageOpaqueData));
+
+ /* The right page will have the same high key as the old page */
+ if (!P_RIGHTMOST(opaque))
+ {
+ itemid = PageGetItemId(page, P_HIKEY);
+ rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
+ sizeof(ItemIdData));
+ }
+
+ /* Count up total space in data items without actually scanning 'em */
+ olddataitemstotal = rightspace - (int) PageGetExactFreeSpace(page);
+
state.newitemsz = newitemsz;
state.is_leaf = P_ISLEAF(opaque);
state.is_rightmost = P_RIGHTMOST(opaque);
***************
*** 1120,1130 ****
state.newitemonleft = false; /* these just to keep compiler quiet */
state.firstright = 0;
state.best_delta = 0;
!
! /* Total free space available on a btree page, after fixed overhead */
! leftspace = rightspace =
! PageGetPageSize(page) - SizeOfPageHeaderData -
! MAXALIGN(sizeof(BTPageOpaqueData));
/*
* Finding the best possible split would require checking all the possible
--- 1142,1151 ----
state.newitemonleft = false; /* these just to keep compiler quiet */
state.firstright = 0;
state.best_delta = 0;
! state.leftspace = leftspace;
! state.rightspace = rightspace;
! state.olddataitemstotal = olddataitemstotal;
! state.newitemoff = newitemoff;
/*
* Finding the best possible split would require checking all the possible
***************
*** 1137,1210 ****
*/
goodenough = leftspace / 16;
- /* The right page will have the same high key as the old page */
- if (!P_RIGHTMOST(opaque))
- {
- itemid = PageGetItemId(page, P_HIKEY);
- rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
- sizeof(ItemIdData));
- }
-
- /* Count up total space in data items without actually scanning 'em */
- dataitemtotal = rightspace - (int) PageGetFreeSpace(page);
-
/*
* Scan through the data items and calculate space usage for a split at
* each possible position.
*/
! dataitemstoleft = 0;
maxoff = PageGetMaxOffsetNumber(page);
!
for (offnum = P_FIRSTDATAKEY(opaque);
offnum <= maxoff;
offnum = OffsetNumberNext(offnum))
{
Size itemsz;
- int leftfree,
- rightfree;
itemid = PageGetItemId(page, offnum);
itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);
/*
- * We have to allow for the current item becoming the high key of the
- * left page; therefore it counts against left space as well as right
- * space.
- */
- leftfree = leftspace - dataitemstoleft - (int) itemsz;
- rightfree = rightspace - (dataitemtotal - dataitemstoleft);
-
- /*
* Will the new item go to left or right of split?
*/
if (offnum > newitemoff)
! _bt_checksplitloc(&state, offnum, leftfree, rightfree,
! true, itemsz);
else if (offnum < newitemoff)
! _bt_checksplitloc(&state, offnum, leftfree, rightfree,
! false, itemsz);
else
{
/* need to try it both ways! */
! _bt_checksplitloc(&state, offnum, leftfree, rightfree,
! true, itemsz);
! /*
! * Here we are contemplating newitem as first on right. In this
! * case it, not the current item, will become the high key of the
! * left page, and so we have to correct the allowance made above.
! */
! leftfree += (int) itemsz - (int) newitemsz;
! _bt_checksplitloc(&state, offnum, leftfree, rightfree,
! false, newitemsz);
}
/* Abort scan once we find a good-enough choice */
if (state.have_split && state.best_delta <= goodenough)
break;
! dataitemstoleft += itemsz;
}
/*
* I believe it is not possible to fail to find a feasible split, but just
* in case ...
--- 1158,1217 ----
*/
goodenough = leftspace / 16;
/*
* Scan through the data items and calculate space usage for a split at
* each possible position.
*/
! olddataitemstoleft = 0;
! goodenoughfound = false;
maxoff = PageGetMaxOffsetNumber(page);
!
for (offnum = P_FIRSTDATAKEY(opaque);
offnum <= maxoff;
offnum = OffsetNumberNext(offnum))
{
Size itemsz;
itemid = PageGetItemId(page, offnum);
itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);
/*
* Will the new item go to left or right of split?
*/
if (offnum > newitemoff)
! _bt_checksplitloc(&state, offnum, true,
! olddataitemstoleft, itemsz);
!
else if (offnum < newitemoff)
! _bt_checksplitloc(&state, offnum, false,
! olddataitemstoleft, itemsz);
else
{
/* need to try it both ways! */
! _bt_checksplitloc(&state, offnum, true,
! olddataitemstoleft, itemsz);
!
! _bt_checksplitloc(&state, offnum, false,
! olddataitemstoleft, itemsz);
}
/* Abort scan once we find a good-enough choice */
if (state.have_split && state.best_delta <= goodenough)
+ {
+ goodenoughfound = true;
break;
+ }
! olddataitemstoleft += itemsz;
}
+ /* If the new item goes as the last item, check for splitting so that
+ * all the old items go to the left page and the new item goes to the
+ * right page.
+ */
+ if (newitemoff > maxoff && !goodenoughfound)
+ _bt_checksplitloc(&state, newitemoff, false, olddataitemstotal, 0);
+
/*
* I believe it is not possible to fail to find a feasible split, but just
* in case ...
***************
*** 1220,1234 ****
/*
* Subroutine to analyze a particular possible split choice (ie, firstright
* and newitemonleft settings), and record the best split so far in *state.
*/
static void
! _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
! int leftfree, int rightfree,
! bool newitemonleft, Size firstrightitemsz)
{
/*
! * Account for the new item on whichever side it is to be put.
*/
if (newitemonleft)
leftfree -= (int) state->newitemsz;
else
--- 1227,1276 ----
/*
* Subroutine to analyze a particular possible split choice (ie, firstright
* and newitemonleft settings), and record the best split so far in *state.
+ *
+ * firstoldonright is the offset of the first item on the original page
+ * that goes to the right page, and firstoldonrightsz is the size of that
+ * tuple. firstoldonright can be > max offset, which means that all the old
+ * items go to the left page and only the new item goes to the right page.
+ * In that case, firstoldonrightsz is not used.
+ *
+ * olddataitemstoleft is the total size of all old items to the left of
+ * firstoldonright.
*/
static void
! _bt_checksplitloc(FindSplitData *state,
! OffsetNumber firstoldonright,
! bool newitemonleft,
! int olddataitemstoleft,
! Size firstoldonrightsz)
{
+ int leftfree,
+ rightfree;
+ Size firstrightitemsz;
+ bool newitemisfirstonright;
+
+ /* Is the new item going to be the first item on the right page? */
+ newitemisfirstonright = (firstoldonright == state->newitemoff
+ && !newitemonleft);
+
+ if(newitemisfirstonright)
+ firstrightitemsz = state->newitemsz;
+ else
+ firstrightitemsz = firstoldonrightsz;
+
+ /* Account for all the old tuples */
+ leftfree = state->leftspace - olddataitemstoleft;
+ rightfree = state->rightspace -
+ (state->olddataitemstotal - olddataitemstoleft);
+
/*
! * The first item on the right page becomes the high key of the
! * left page; therefore it counts against left space as well as right
! * space.
*/
+ leftfree -= firstrightitemsz;
+
+ /* account for the new item */
if (newitemonleft)
leftfree -= (int) state->newitemsz;
else
***************
*** 1270,1276 ****
{
state->have_split = true;
state->newitemonleft = newitemonleft;
! state->firstright = firstright;
state->best_delta = delta;
}
}
--- 1312,1318 ----
{
state->have_split = true;
state->newitemonleft = newitemonleft;
! state->firstright = firstoldonright;
state->best_delta = delta;
}
}
Index: src/backend/storage/page/bufpage.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/storage/page/bufpage.c,v
retrieving revision 1.70
diff -c -r1.70 bufpage.c
*** src/backend/storage/page/bufpage.c 5 Jan 2007 22:19:38 -0000 1.70
--- src/backend/storage/page/bufpage.c 8 Feb 2007 13:02:11 -0000
***************
*** 418,424 ****
/*
* PageGetFreeSpace
! * Returns the size of the free (allocatable) space on a page.
*/
Size
PageGetFreeSpace(Page page)
--- 418,425 ----
/*
* PageGetFreeSpace
! * Returns the size of the free (allocatable) space on a page,
! * deducted by the space needed for a new line pointer.
*/
Size
PageGetFreeSpace(Page page)
***************
*** 434,440 ****
if (space < (int) sizeof(ItemIdData))
return 0;
! space -= sizeof(ItemIdData); /* XXX not always appropriate */
return (Size) space;
}
--- 435,460 ----
if (space < (int) sizeof(ItemIdData))
return 0;
! space -= sizeof(ItemIdData);
!
! return (Size) space;
! }
!
! /*
! * PageGetExactFreeSpace
! * Returns the size of the free (allocatable) space on a page.
! */
! Size
! PageGetExactFreeSpace(Page page)
! {
! int space;
!
! /*
! * Use signed arithmetic here so that we behave sensibly if pd_lower >
! * pd_upper.
! */
! space = (int) ((PageHeader) page)->pd_upper -
! (int) ((PageHeader) page)->pd_lower;
return (Size) space;
}
Index: src/include/storage/bufpage.h
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/storage/bufpage.h,v
retrieving revision 1.69
diff -c -r1.69 bufpage.h
*** src/include/storage/bufpage.h 5 Jan 2007 22:19:57 -0000 1.69
--- src/include/storage/bufpage.h 7 Feb 2007 13:13:42 -0000
***************
*** 321,326 ****
--- 321,327 ----
extern void PageRestoreTempPage(Page tempPage, Page oldPage);
extern int PageRepairFragmentation(Page page, OffsetNumber *unused);
extern Size PageGetFreeSpace(Page page);
+ extern Size PageGetExactFreeSpace(Page page);
extern void PageIndexTupleDelete(Page page, OffsetNumber offset);
extern void PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems);