Thread: Solving hash table overrun problems
We saw a case recently where a hash join was using much more memory than it was supposed to, causing failure when the server ran out of memory. The hash join code is supposed to spill tuples to disk when the hashtable exceeds work_mem, but this failed to save us because the algorithm is not adaptive. What it really does is to divide the hash key space into N batches, where N is chosen at query startup based on the planner's estimate of the number of rows to be processed. If that estimate is way too small then an individual batch can be way too large, but the code can't recover by adjusting N after the fact.

A somewhat related problem is that the HashAgg code doesn't have a way to spill hashtable entries to disk at all, so it too can blow out memory if the planner's estimate of the number of entries is way off. We've seen reports of that happening, too.

Here's what I'm thinking of doing to fix it:

* Determine the number of in-memory hash buckets, K, at plan startup. This is dependent on work_mem more than it is on the planner's estimates, so there's no need for it to be adaptive. A tuple with hash value H will go into bucket (H mod K) when it is processed.

* Estimate the number of batches N using the planner's estimate. We will always choose N a power of 2. A tuple's batch number is ((H div K) mod N).

* Begin loading the hash table from the inner input. Tuples of batch zero go into the hash table; tuples of higher batch numbers go into holding files, one per batch.

* If the hash table size exceeds work_mem, double N (creating a bunch of new empty holding files). Scan through the hash table for tuples whose batch number is no longer zero according to the new calculation, and dump them out to the appropriate one of the new holding files. This should get rid of about half of the hash table entries if the hash values are well dispersed. Essentially, we are looking at one more bit of the hash value than we were using before.
* Lather, rinse, repeat until the inner join input is completely read.

* Now begin scanning the outer join input. Tuples of batch number zero (according to the current calculation) can be matched to the current hashtable contents. Tuples of higher batch numbers are dropped into holding files for the outer input, one per batch.

* After exhausting the outer input, we still have to match up tuples of corresponding batches. To do this, we clear the in-memory hash table and load it from tuples in the first unprocessed inner batch file. If we had to increase N on-the-fly then it is possible that some of these tuples no longer belong to batch 1, but to some higher batch number --- write such tuples to the proper batch file instead of putting them into the hash table.

* If some batches are more heavily populated than others, it is possible that we exceed work_mem here. No problem: we can play the same game of increasing N even at this stage. This works because increasing N can only cause tuples to be reassigned to later batches, never to earlier ones. (Of course, each on-the-fly increase in N means extra writes and reads of tuples that were initially put in the wrong batch, so it's still best to get as good an estimate as we can to start with.)

* While reading from an outer batch file, we have to check whether each tuple is still considered to belong to the current batch, and dump it out to the proper later batch file if not.

We can use basically the same ideas to fix HashAgg. Here, the aggregate state values play the part of the "inner join tuples", and the incoming data to be aggregated plays the part of the "outer join tuples". When the hash table gets too big, we double the number of batches and dump currently accumulated aggregate states into a per-batch holding file. Incoming data that hashes into the current batch can be accumulated into its aggregate value and discarded. Incoming data that hashes into a later batch is put into a "to-do" file.
After we scan all the input, we emit the current aggregate states, load up the hash table from the next batch holding file, and then scan the current to-do file for inputs of the new batch. Note that we'll use only one "to-do" file in each pass, not one per batch. This implies more I/O, but it is the only way to preserve the guarantee that incoming values are accumulated into their aggregate in order of arrival. The standard aggregates don't care about that, but user-defined aggregate functions often do.

Comments?

			regards, tom lane
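For concreteness, the bucket/batch arithmetic above, and the claim that increasing N can only reassign tuples to later batches, can be sketched as a toy model (this is illustrative Python, not PostgreSQL source; the values of K, N, and the hash range are made up):

```python
K = 8          # number of in-memory hash buckets, fixed at plan startup
N = 4          # number of batches; always chosen as a power of 2

def bucket(h):
    """In-memory bucket for a tuple with hash value h."""
    return h % K

def batch(h, n):
    """Batch number for hash value h when there are n batches."""
    return (h // K) % n

# Doubling n looks at one more bit of the hash value: each tuple either
# keeps its old batch number or moves to old_batch + n_old -- never to
# an earlier batch, which is what makes on-the-fly splitting safe.
for h in range(10000):
    old, new = batch(h, N), batch(h, 2 * N)
    assert new in (old, old + N)
```

The assertion exercises exactly the invariant the last bullet relies on: tuples already written to earlier batch files never need to move backward.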
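The HashAgg variant with a single "to-do" file can be modeled the same way. Here is a hypothetical sketch (again not PostgreSQL source) of a spilling SUM aggregate: rows for the current batch are accumulated immediately, rows for later batches are deferred to one to-do list per pass, and arrival order is preserved within each key:

```python
from collections import defaultdict

K, N = 4, 2    # made-up bucket and batch counts for illustration

def batch_of(h, n):
    return (h // K) % n

def hashagg_sum(rows):
    """Toy spilling hash aggregation: rows are (key, value) pairs,
    using the key itself as its hash value."""
    todo, result, cur = list(rows), {}, 0
    while todo:
        states = defaultdict(int)   # in-memory aggregate states
        next_todo = []              # the single "to-do" file for this pass
        for key, val in todo:
            if batch_of(key, N) == cur:
                states[key] += val          # accumulate in arrival order
            else:
                next_todo.append((key, val))  # defer to a later pass
        result.update(states)       # "emit the current aggregate states"
        todo, cur = next_todo, cur + 1
    return result
```

Because each pass replays the deferred rows in their original order, a user-defined aggregate that is sensitive to input order sees the same sequence it would have without spilling, which is the point of using one to-do file rather than one per batch.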
> We saw a case recently where a hash join was using much more memory than
> it was supposed to, causing failure when the server ran out of memory.

Yes. I had the same problem a few months ago:
http://archives.postgresql.org/pgsql-general/2004-09/msg00410.php

It turned out that the cost estimates were so far off, no matter what tunables were modified, that I was never able to execute the query fully. I analyzed the code and devised a solution similar to what you proposed, though I didn't consider HashAggregates at the time. Unfortunately, I lost all that work in a hard drive failure and was never able to get back to it, so I can't really refer to my old notes. For what it's worth, your solution looks very reasonable to me.

This also brings up a line of thought I had a while ago on a related topic. Something like a "HashDistinct" might be useful, if it had no startup cost. It would basically be a plan node in the executor that would dynamically build a hashtable so that it can pull rows from its child node (discarding them if they appear in the hashtable) until it can pass on a novel row. I have some reservations about it, though. At best, in queries with minimal startup cost from the get-go, it would seem to be a tradeoff favoring latency over throughput (assuming the HashDistinct would be a slower operation overall than separate aggregation and distinct operations). Then we have the issue of really big hash tables... I was hoping to get some time in the upcoming months to hash out these issues and see if it's worth it, and if it would be generally useful at all.

   -Aaron
Aaron Birkland <birkie@gmail.com> writes:
> This also brings up a line of thought I had a while ago on a related
> topic. Something like a "HashDistinct" might be useful, if it had no
> startup cost. It would basically be a plan node in the executor that
> would dynamically build a hashtable so that it can pull rows from its
> child node (discarding if they appear in the hashtable) until it can
> pass on a novel row. I have some reservations about it, though.

We already have that: the planner will use a HashAgg node in this fashion in some contexts (I think just as one of the ways to do IN, at the moment). It's not yet bright enough to consider doing it for SELECT DISTINCT. The DISTINCT planning code is old and crufty and pretty tightly intertwined with ORDER BY ... it needs work.

			regards, tom lane
On Thu, Mar 03, 2005 at 17:05:40 -0500, Tom Lane <tgl@sss.pgh.pa.us> wrote:
>
> * Estimate the number of batches N using the planner's estimate.
> We will always choose N a power of 2. A tuple's batch number is
> ((H div K) mod N).

If K is way low this could be very slow. Is there a way to do something similar by changing the hash function to H div KN? If you went down this road you would probably want to use distinct primes for each new N.

> * Now begin scanning the outer join input. Tuples of batch number
> zero (according to the current calculation) can be matched to the
> current hashtable contents. Tuples of higher batch numbers are dropped
> into holding files for the outer input, one per batch.

For new keys at this step, do you know their final disposition, so that making new hash entries won't be necessary?
Bruno Wolff III <bruno@wolff.to> writes:
> Tom Lane <tgl@sss.pgh.pa.us> wrote:
>> * Estimate the number of batches N using the planner's estimate.
>> We will always choose N a power of 2. A tuple's batch number is
>> ((H div K) mod N).

> If K is way low this could be very slow.

How so? You're not concerned about the time to do the division itself, are you?

>> * Now begin scanning the outer join input. Tuples of batch number
>> zero (according to the current calculation) can be matched to the
>> current hashtable contents. Tuples of higher batch numbers are dropped
>> into holding files for the outer input, one per batch.

> For new keys at this step do you know their final disposition so that making
> new hash entries won't be necessary?

Well, we probably have a pretty fair idea of N at this point, so it'll usually be right --- but we reserve the right to increase N again later, in case we have to do so because one of the later inner batches is much bigger than the zero batch we are currently processing.

			regards, tom lane
On Fri, Mar 04, 2005 at 10:42:08 -0500, Tom Lane <tgl@sss.pgh.pa.us> wrote:
> Bruno Wolff III <bruno@wolff.to> writes:
> > Tom Lane <tgl@sss.pgh.pa.us> wrote:
> >> * Estimate the number of batches N using the planner's estimate.
> >> We will always choose N a power of 2. A tuple's batch number is
> >> ((H div K) mod N).
>
> > If K is way low this could be very slow.
>
> How so? You're not concerned about the time to do the division itself
> are you?

No, rather having lots of entries in the same hash buckets. I was thinking of recent discussions where there was a large number of rows, with almost all of the keys taking just a few values but with a lot of unique keys as well, so that analyze doesn't see enough of the unique ones to make a good estimate for K.

> >> * Now begin scanning the outer join input. Tuples of batch number
> >> zero (according to the current calculation) can be matched to the
> >> current hashtable contents. Tuples of higher batch numbers are dropped
> >> into holding files for the outer input, one per batch.
>
> > For new keys at this step do you know their final disposition so that making
> > new hash entries won't be necessary?
>
> Well, we probably have a pretty fair idea of N at this point, so it'll
> usually be right --- but we reserve the right to increase N again later
> in case we have to do so because one of the later inner batches is much
> bigger than the zero batch we are currently processing.

I just noticed that it wasn't mentioned that an overflow could occur at this step. I didn't think it would be hard to handle if needed, but I was wondering whether knowing a key couldn't match (because it was in the current batch 0 and didn't match an existing key in that batch) was enough to emit or discard the row.
Bruno Wolff III <bruno@wolff.to> writes:
> Tom Lane <tgl@sss.pgh.pa.us> wrote:
>> Bruno Wolff III <bruno@wolff.to> writes:
>>> If K is way low this could be very slow.
>>
>> How so? You're not concerned about the time to do the division itself
>> are you?

> No, rather having lots of entries in the same hash buckets.

That won't happen, because we are going to set K with an eye to the maximum number of rows we intend to hold in memory (given work_mem). With the addition of the dynamic batch splitting logic, that number of rows is actually reasonably accurate.

The only way this scheme can really lose badly is if there are large numbers of tuples with exactly the same hash code, so that no matter how much we increase N we can't split up the bucketload. This is a risk for *any* hashing scheme, however. In practice we have to rely on the planner to not choose hashing when there are only a few distinct values for the key.

> I just noticed that it wasn't mentioned that an overflow could occur at this
> step.

It can't, because we aren't loading the outer tuples into the hash table. We are just considering them one at a time and probing for matches.

			regards, tom lane
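That failure mode is easy to see with the same toy batch arithmetic used above: tuples sharing a single hash code get the same batch number for every N, so no amount of doubling thins the bucketload (the constants here are made-up illustrative values):

```python
K = 8          # illustrative bucket count

def batch(h, n):
    return (h // K) % n

# Many tuples, one shared hash code: the whole bucketload moves together,
# so increasing N can never split it across batches.
dup_hashes = [123457] * 1000
for n in (2, 4, 8, 1024):
    assert len({batch(h, n) for h in dup_hashes}) == 1
```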
> > This also brings up a line of thought I had a while ago on a related
> > topic. Something like a "HashDistinct" might be useful, if it had no
> > startup cost.

> We already have that: the planner will use a HashAgg node in this
> fashion in some contexts (I think just as one of the ways to do IN,
> at the moment).

Hmm.. I see HashAggregate being used that way in the IN queries, but I have not observed it used in a way that incurs no startup cost. It looked to me that when doing hash aggregation in ExecAgg (nodeAgg.c), agg_fill_hash_table() would have to be called, which iterates through every output of the child plan building the hash table before it returns, thus incurring at least the startup cost of executing the entire subplan of the child node at the aggregation stage. I'm not too familiar with the code, so there is probably something I'm missing somewhere :(

> It's not yet bright enough to consider doing it for
> SELECT DISTINCT. The DISTINCT planning code is old and crufty and
> pretty tightly intertwined with ORDER BY ... it needs work.

Yes, SELECT DISTINCT was my motivating example, though in my specific application latency (i.e. not having to wait for the entire query below the DISTINCT operation to finish) was also an important factor; hence my thoughts on a zero-startup-cost hash aggregation, and wondering if it would really be any kind of win in the end.

   -Aaron