diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c index 10b19c0..287be40 100644 --- a/src/backend/utils/sort/tuplesort.c +++ b/src/backend/utils/sort/tuplesort.c @@ -442,6 +442,7 @@ struct Tuplesortstate elog(ERROR, "unexpected end of data"); \ } while(0) +#define HEAP_ANCESTOR(slot, levels) ((((slot)+1)>>(levels))-1) static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess); static void puttuple_common(Tuplesortstate *state, SortTuple *tuple); @@ -2545,28 +2546,75 @@ tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex) SortTuple *memtuples = state->memtuples; SortTuple *tuple; int i, - n; - + n, + levels = 0, + ll, + hl; + if (--state->memtupcount <= 0) return; - n = state->memtupcount; - tuple = &memtuples[n]; /* tuple that must be reinserted */ - i = 0; /* i is where the "hole" is */ - for (;;) + tuple = &memtuples[state->memtupcount]; /* tuple that must be reinserted */ + n = (state->memtupcount - 1) >> 1; /* first tuple without two children */ + + /* Descend heap, following lesser child at each level. */ + i = 0; + while (i < n) { int j = 2 * i + 1; - if (j >= n) - break; - if (j + 1 < n && - HEAPCOMPARE(&memtuples[j], &memtuples[j + 1]) > 0) + if (HEAPCOMPARE(&memtuples[j], &memtuples[j + 1]) > 0) j++; - if (HEAPCOMPARE(tuple, &memtuples[j]) <= 0) - break; - memtuples[i] = memtuples[j]; i = j; + ++levels; + } + + /* + * If the heap is of even size, we might have landed on the tuple that has + * just a single child. In that case, we don't need to compare the children + * to determine which is smaller, but we do need to descend to the child. + */ + if (i == n && (state->memtupcount & 1) == 0) + { + i = 2 * i + 1; + ++levels; + } + Assert(i < state->memtupcount); + Assert(HEAP_ANCESTOR(i, levels) == 0); + + /* + * Binary search path of least children to find the insert position. + * + * From the heap property, we know that the path of least children is + * sorted; that is, memtuples[i] is greater than its parent, which in + * turn is greater than its own parent, and so on. We'd like to find the + * smallest number of levels we must travel up the heap to find a tuple + * less than or equal to the one we're reinserting; if there is no + * such shift, then we reinsert at the top of the heap. + */ + ll = 0; + hl = levels; + while (ll < hl) + { + int ml = (ll + hl) / 2; + + if (HEAPCOMPARE(tuple, &memtuples[HEAP_ANCESTOR(i, ml)]) > 0) + hl = ml; + else + ll = ml + 1; + } + Assert(ll == hl && ll <= levels); + + /* + * Move the hole down the tree until it reaches the correct position, and + * then stick the re-inserted tuple into it. + */ + while (levels > ll) + { + memtuples[HEAP_ANCESTOR(i, levels)] = + memtuples[HEAP_ANCESTOR(i, levels - 1)]; + --levels; } - memtuples[i] = *tuple; + memtuples[HEAP_ANCESTOR(i, ll)] = *tuple; }