Thread: Solving hash table overrun problems

Solving hash table overrun problems

From: Tom Lane
Date: Thu, Mar 03, 2005 17:05:40 -0500
We saw a case recently where a hash join was using much more memory than
it was supposed to, causing failure when the server ran out of memory.
The hash join code is supposed to spill tuples to disk when the
hashtable exceeds work_mem, but this failed to save us because the
algorithm is not adaptive.  What it really does is to divide the hash
key space into N batches where N is chosen at query startup based on the
planner's estimate of the number of rows to be processed.  If that
estimate is way too small then an individual batch can be way too large,
but the code can't recover by adjusting N after the fact.

A somewhat related problem is that the HashAgg code doesn't have a way
to spill hashtable entries to disk at all, so it too can blow out memory
if the planner's estimate of the number of entries is way off.  We've
seen reports of that happening, too.

Here's what I'm thinking of doing to fix it:

* Determine the number of in-memory hash buckets, K, at plan startup.
This is dependent on work_mem more than it is on the planner's
estimates, so there's no need for it to be adaptive.  A tuple with
hash value H will go into bucket (H mod K) when it is processed.

* Estimate the number of batches N using the planner's estimate.
We will always choose N a power of 2.  A tuple's batch number is
((H div K) mod N).

* Begin loading the hash table from the inner input.  Tuples of batch
zero go into the hash table, tuples of higher batch numbers go into
holding files, one per batch.

* If the hash table size exceeds work_mem, double N (creating a bunch
of new empty holding files).  Scan through the hash table for tuples
whose batch number is no longer zero according to the new calculation,
and dump them out to the appropriate one of the new holding files.
This should get rid of about half of the hash table entries if the
hash values are well dispersed.  Essentially, we are looking at one
more bit of the hash value than we were using before (see the sketch
following this list).

* Lather, rinse, repeat until the inner join input is completely read.

* Now begin scanning the outer join input.  Tuples of batch number
zero (according to the current calculation) can be matched to the
current hashtable contents.  Tuples of higher batch numbers are dropped
into holding files for the outer input, one per batch.

* After exhausting the outer input, we still have to match up tuples
of corresponding batches.  To do this, we clear the in-memory hash table
and load it from tuples in the first unprocessed inner batch file.
If we had to increase N on-the-fly then it is possible that some of
these tuples no longer belong to batch 1, but to some higher batch
number --- write such tuples to the proper batch file instead of putting
them into the hash table.

* If some batches are more heavily populated than others, it is possible
that we exceed work_mem here.  No problem: we can play the same game of
increasing N even at this stage.  This works because increasing N can
only cause tuples to be reassigned to later batches, never to earlier
ones.  (Of course, each on-the-fly increase in N means extra writes and
reads of tuples that were initially put in the wrong batch, so it's
still best to get as good an estimate as we can to start with.)

* While reading from an outer batch file, we have to check whether each
tuple is still considered to belong to the current batch, and dump it
out to the proper later batch file if not.
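
For concreteness, here is a rough standalone sketch of the bucket and
batch arithmetic (the constants, names, and toy hash function are all
invented for illustration; this isn't meant as the real implementation).
It also checks the property the doubling step relies on: when N doubles,
a tuple's batch number either stays the same or moves to a later batch,
never an earlier one.

#include <assert.h>
#include <stdio.h>

#define K 1024   /* in-memory hash buckets; for real, sized from work_mem */

/* toy hash, standing in for the real per-datatype hash function */
static unsigned hash_tuple(unsigned key)
{
    return key * 2654435761u;
}

static unsigned bucket_no(unsigned h)            { return h % K; }
static unsigned batch_no(unsigned h, unsigned n) { return (h / K) % n; }

int main(void)
{
    unsigned nbatch = 4;   /* initial N, from the planner's row estimate */
    unsigned key;

    for (key = 0; key < 100000; key++)
    {
        unsigned h        = hash_tuple(key);
        unsigned oldbatch = batch_no(h, nbatch);
        unsigned newbatch = batch_no(h, nbatch * 2);  /* after doubling N */

        /* doubling N sends a tuple to the same batch or a later one */
        assert(newbatch == oldbatch || newbatch == oldbatch + nbatch);

        if (key < 4)
            printf("key %u: bucket %u, batch %u -> %u\n",
                   key, bucket_no(h), oldbatch, newbatch);
    }
    return 0;
}

The division by K is what makes the batch number depend on higher-order
hash bits than the bucket number does, which is why doubling N amounts
to looking at one more bit of the hash value.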

We can use basically the same ideas to fix HashAgg.  Here, the aggregate
state values play the part of the "inner join tuples", and the incoming
data to be aggregated plays the part of the "outer join tuples".  When
the hash table gets too big, we double the number of batches and dump
currently accumulated aggregate states into a per-batch holding file.
Incoming data that hashes into the current batch can be accumulated into
its aggregate value and discarded.  Incoming data that hashes into a
later batch is put into a "to-do" file.  After we scan all the input, we
emit the current aggregate states, load up the hash table from
the next batch holding file, and then scan the current to-do file for
inputs of the new batch.  Note that we'll use only one "to-do" file in
each pass, not one per batch.  This implies more I/O but it is the only
way to preserve the guarantee that incoming values are accumulated into
their aggregate in order of arrival.  The standard aggregates don't care
about that, but user-defined aggregate functions often do.
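
To make the control flow concrete, here's a toy, self-contained
simulation of that for a single COUNT-per-key aggregate.  The hash table
and the to-do "file" are just little in-memory arrays, the number of
batches is fixed rather than grown on the fly, and every name and
constant is invented for illustration -- it's a sketch of the idea, not
executor code.

#include <stdio.h>

#define K      8     /* in-memory hash buckets (stand-in) */
#define NBATCH 4     /* number of batches; fixed here, not grown on the fly */
#define NKEYS  16
#define NINPUT 64

static unsigned hash_key(unsigned key) { return key * 2654435761u; }
static unsigned batch_no(unsigned h)   { return (h / K) % NBATCH; }

int main(void)
{
    unsigned todo[NINPUT], next[NINPUT];
    long     count[NKEYS] = {0};   /* stands in for the per-group agg states */
    int      ntodo = 0, nnext, i, batch;

    /* pass 0 scans the original input; later passes scan the to-do "file" */
    for (i = 0; i < NINPUT; i++)
        todo[ntodo++] = (unsigned) ((i * 7) % NKEYS);

    for (batch = 0; batch < NBATCH; batch++)
    {
        nnext = 0;
        for (i = 0; i < ntodo; i++)
        {
            unsigned key = todo[i];

            if (batch_no(hash_key(key)) == (unsigned) batch)
                count[key]++;          /* accumulate into this batch's state */
            else
                next[nnext++] = key;   /* defer, preserving arrival order */
        }

        /* emit the finished aggregate states for this batch; the real code
           would also release them here to make room for the next batch */
        for (i = 0; i < NKEYS; i++)
            if (count[i] > 0 && batch_no(hash_key((unsigned) i)) == (unsigned) batch)
                printf("batch %d: key %d  count %ld\n", batch, i, count[i]);

        /* the single to-do file written in this pass feeds the next pass */
        for (i = 0; i < nnext; i++)
            todo[i] = next[i];
        ntodo = nnext;
    }
    return 0;
}

The one point this is meant to show is the single to-do file: rows
deferred in one pass are replayed in their original order in the next
pass, so each group still sees its inputs in arrival order.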

Comments?
        regards, tom lane


Re: Solving hash table overrun problems

From: Aaron Birkland
Date:
> We saw a case recently where a hash join was using much more memory than
> it was supposed to, causing failure when the server ran out of memory.

Yes.  I had the same problem a few months ago:
http://archives.postgresql.org/pgsql-general/2004-09/msg00410.php

It turned out that the cost estimates were so far off, no matter what
tunables I modified, that I was never able to execute the query
fully.  I analyzed the code and devised a solution similar to
what you proposed, though I didn't consider HashAggregates
at the time.  Unfortunately, I lost all work in a hard drive failure
and was never able to get back to working on it, so I can't really
refer to my old notes.  For what it's worth, your solution looks very
reasonable to me.

This also brings up a line of thought I had a while ago on a related
topic.  Something like a "HashDistinct" might be useful, if it had no
startup cost.  It would basically be a plan node in the executor that
would dynamically build a hashtable so that it can pull rows from its
child node (discarding any that already appear in the hashtable) until
it can pass on a novel row.  I have some reservations about it, though.
At best, in queries with minimal startup cost from the get-go, it would
seem to
be a tradeoff favoring latency over throughput (assuming the
HashDistinct would be a slower operation overall than separate
aggregation and distinct operations).   Then we have the issue of
really big hash tables...  I was hoping to get some time in the
upcoming months to hash out these issues to see if it's worth it, and
if it would be generally useful at all.
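
For what it's worth, here's a minimal standalone sketch of the idea (the
names and sizes are invented for illustration, and it punts entirely on
the hard part, namely what to do when the table outgrows work_mem):

#include <stdio.h>

#define TABLESIZE 1024   /* toy in-memory table; no spill-to-disk handling */

static unsigned seen[TABLESIZE];
static char     used[TABLESIZE];

/* returns 1 the first time a key is offered, 0 for later duplicates */
static int remember(unsigned key)
{
    unsigned i = (key * 2654435761u) % TABLESIZE;

    while (used[i])
    {
        if (seen[i] == key)
            return 0;                /* duplicate: discard */
        i = (i + 1) % TABLESIZE;     /* linear probing */
    }
    used[i] = 1;
    seen[i] = key;
    return 1;
}

int main(void)
{
    /* stand-in for pulling rows from the child plan node */
    unsigned input[] = {5, 3, 5, 9, 3, 3, 7, 9, 5};
    size_t   i;

    for (i = 0; i < sizeof(input) / sizeof(input[0]); i++)
        if (remember(input[i]))
            printf("emit %u\n", input[i]);  /* novel row passed on at once */
    return 0;
}

The first distinct value is emitted as soon as it is pulled from the
child, rather than after the whole input has been hashed -- that's the
latency win I have in mind.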
  -Aaron


Re: Solving hash table overrun problems

From: Tom Lane
Date:
Aaron Birkland <birkie@gmail.com> writes:
> This also brings up a line of thought I had a while ago on a related
> topic.  Something like a "HashDistinct" might be useful, if it had no
> startup cost.  It would basically be a plan node in the executor that
> would dynamically build a hashtable so that it can pull rows from its
> child node (discarding if they appear in the hashtable) until it can
> pass on a novel row.   I have some reservations about it, though.

We already have that: the planner will use a HashAgg node in this
fashion in some contexts (I think just as one of the ways to do IN,
at the moment).  It's not yet bright enough to consider doing it for
SELECT DISTINCT.  The DISTINCT planning code is old and crufty and
pretty tightly intertwined with ORDER BY ... it needs work.
        regards, tom lane


Re: Solving hash table overrun problems

From: Bruno Wolff III
Date:
On Thu, Mar 03, 2005 at 17:05:40 -0500, Tom Lane <tgl@sss.pgh.pa.us> wrote:
> 
> * Estimate the number of batches N using the planner's estimate.
> We will always choose N a power of 2.  A tuple's batch number is
> ((H div K) mod N).

If K is way low this could be very slow.  Is there a way to do something
similar by changing the hash function to H div KN?  If you went down this
road you would probably want to use distinct primes for each new N.

> * Now begin scanning the outer join input.  Tuples of batch number
> zero (according to the current calculation) can be matched to the
> current hashtable contents.  Tuples of higher batch numbers are dropped
> into holding files for the outer input, one per batch.

For new keys at this step do you know their final disposition so that making
new hash entries won't be necessary?


Re: Solving hash table overrun problems

From: Tom Lane
Date: Fri, Mar 04, 2005 10:42:08 -0500
Bruno Wolff III <bruno@wolff.to> writes:
>   Tom Lane <tgl@sss.pgh.pa.us> wrote:
>> * Estimate the number of batches N using the planner's estimate.
>> We will always choose N a power of 2.  A tuple's batch number is
>> ((H div K) mod N).

> If K is way low this could be very slow.

How so?  You're not concerned about the time to do the division itself
are you?

>> * Now begin scanning the outer join input.  Tuples of batch number
>> zero (according to the current calculation) can be matched to the
>> current hashtable contents.  Tuples of higher batch numbers are dropped
>> into holding files for the outer input, one per batch.

> For new keys at this step do you know their final disposition so that making
> new hash entries won't be necessary?

Well, we probably have a pretty fair idea of N at this point, so it'll
usually be right --- but we reserve the right to increase N again later
in case we have to do so because one of the later inner batches is much
bigger than the zero batch we are currently processing.
        regards, tom lane


Re: Solving hash table overrun problems

From: Bruno Wolff III
Date:
On Fri, Mar 04, 2005 at 10:42:08 -0500, Tom Lane <tgl@sss.pgh.pa.us> wrote:
> Bruno Wolff III <bruno@wolff.to> writes:
> >   Tom Lane <tgl@sss.pgh.pa.us> wrote:
> >> * Estimate the number of batches N using the planner's estimate.
> >> We will always choose N a power of 2.  A tuple's batch number is
> >> ((H div K) mod N).
> 
> > If K is way low this could be very slow.
> 
> How so?  You're not concerned about the time to do the division itself
> are you?

No, rather having lots of entries in the same hash buckets.  I was thinking
about recent discussions where there was a large number of rows, with almost
all of the rows having one of just a few key values; there are also a lot of
unique keys, but analyze doesn't see enough of the unique ones to make a good
estimate for K.

> >> * Now begin scanning the outer join input.  Tuples of batch number
> >> zero (according to the current calculation) can be matched to the
> >> current hashtable contents.  Tuples of higher batch numbers are dropped
> >> into holding files for the outer input, one per batch.
> 
> > For new keys at this step do you know their final disposition so that making
> > new hash entries won't be necessary?
> 
> Well, we probably have a pretty fair idea of N at this point, so it'll
> usually be right --- but we reserve the right to increase N again later
> in case we have to do so because one of the later inner batches is much
> bigger than the zero batch we are currently processing.

I just noticed that it wasn't mentioned that an overflow could occur at this
step.  I didn't think it would be hard to handle one if needed, but was
wondering whether knowing a key couldn't match (because it was in the current
batch 0 and didn't match an existing key in that batch) was enough to emit or
discard the row.


Re: Solving hash table overrun problems

From: Tom Lane
Date:
Bruno Wolff III <bruno@wolff.to> writes:
> Tom Lane <tgl@sss.pgh.pa.us> wrote:
>> Bruno Wolff III <bruno@wolff.to> writes:
>>> If K is way low this could be very slow.
>> 
>> How so?  You're not concerned about the time to do the division itself
>> are you?

> No, rather having lots of entries in the same hash buckets.

That won't happen because we are going to set K with an eye to the
maximum number of rows we intend to hold in memory (given work_mem).
With the addition of the dynamic batch splitting logic, that number
of rows is actually reasonably accurate.
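
For instance (numbers purely illustrative): with work_mem of 1MB and
hashtable entries averaging around 100 bytes, we'd hold at most on the
order of 10,000 rows in memory at any time, so we'd pick K on that
order -- and then the expected bucket length stays small no matter how
far off the planner's total row estimate was.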

The only way this scheme can really lose badly is if there are large
numbers of tuples with exactly the same hash code, so that no matter how
much we increase N we can't split up the bucketload.  This is a risk for
*any* hashing scheme, however.  In practice we have to rely on the
planner to not choose hashing when there are only a few distinct values
for the key.

> I just noticed that it wasn't mentioned that an overflow could occur at this
> step.

It can't, because we aren't loading the outer tuples into the hash
table.  We are just considering them one at a time and probing for
matches.
        regards, tom lane


Re: Solving hash table overrun problems

From: Aaron Birkland
Date:
> > This also brings up a line of thought I had a while ago on a related
> > topic.  Something like a "HashDistinct" might be useful, if it had no
> > startup cost.  

> We already have that: the planner will use a HashAgg node in this
> fashion in some contexts (I think just as one of the ways to do IN,
> at the moment).  

Hmm... I see HashAggregate being used that way in the IN queries, but I
have not observed it used in a way that incurs no startup cost.  It
looked to me like, in doing hash aggregation in ExecAgg (nodeAgg.c),
agg_fill_hash_table() would have to be called, which iterates through
every output of the child plan building the hash table before it
returns, thus incurring at least the startup cost of executing the
entire subplan of the child node at the aggregation stage.  I'm not
too familiar with the code, so there is probably something I'm missing
somewhere :(

> It's not yet bright enough to consider doing it for
> SELECT DISTINCT.  The DISTINCT planning code is old and crufty and
> pretty tightly intertwined with ORDER BY ... it needs work.

Yes, SELECT DISTINCT was my motivating example, though in my specific
application latency (i.e. not having to wait for the entire query
below the DISTINCT operation to finish) was also an important factor,
hence my thoughts on a zero startup cost hash aggregation and
wondering if it would really be any kind of win in the end.
 -Aaron