9.5: Memory-bounded HashAgg

From
Jeff Davis
Date:
This patch requires the Memory Accounting patch, or something similar
to track memory usage.

The attached patch enables hashagg to spill to disk, which means that
hashagg will contain itself to work_mem even if the planner badly
misestimates the cardinality.

This is a well-known concept; there's even a Berkeley homework
assignment floating around to implement it -- in postgres 7.2, no
less. I didn't take the exact same approach as the homework assignment
suggests, but it's not much different, either. My apologies if some
classes are still using this as a homework assignment, but postgres
needs to eventually have an answer to this problem.

Included is a GUC, "enable_hashagg_disk" (default on), which allows
the planner to choose hashagg even if it doesn't expect the hashtable
to fit in memory. If it's off, and the planner misestimates the
cardinality, hashagg will still use the disk to contain itself to
work_mem.

One situation that might surprise the user is if work_mem is set too
low, and the user is *relying* on a misestimate to pick hashagg. With
this patch, it would end up going to disk, which might be
significantly slower. The solution for the user is to increase
work_mem.

Rough Design:

Change the hash aggregate algorithm to accept a generic "work item",
which consists of an input file as well as some other bookkeeping
information.

Initially prime the algorithm by adding a single work item where the
file is NULL, indicating that it should read from the outer plan.

If the memory is exhausted during execution of a work item, then
continue to allow existing groups to be aggregated, but do not allow new
groups to be created in the hash table. Tuples representing new groups
are saved in an output partition file referenced in the work item that
is currently being executed.

When the work item is done, emit any groups in the hash table, clear the
hash table, and turn each output partition file into a new work item.

Each time through at least some groups are able to stay in the hash
table, so eventually none will need to be saved in output partitions, no
new work items will be created, and the algorithm will terminate. This
is true even if the number of output partitions is always one.
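
In miniature, the control flow looks roughly like the toy C program
below (purely illustrative, not the patch code: integer keys stand in
for grouping keys and transition states, a fixed-capacity array stands
in for the work_mem-bounded hash table, in-memory arrays stand in for
the partition files, and the recursive call plays the role of queueing
new work items):

#include <stdio.h>
#include <stdlib.h>

#define TABLE_CAP    4        /* stand-in for the work_mem limit          */
#define NPARTITIONS  2        /* stand-in for the chosen partition count  */

typedef struct WorkItem
{
    unsigned   *keys;         /* NULL means "read from the outer plan"    */
    int         nkeys;
} WorkItem;

static int
table_find(unsigned *table, int ntable, unsigned key)
{
    for (int i = 0; i < ntable; i++)
        if (table[i] == key)
            return i;
    return -1;
}

static void
process(WorkItem item, unsigned *outer, int nouter)
{
    unsigned    table[TABLE_CAP];
    int         ntable = 0;
    unsigned   *parts[NPARTITIONS];
    int         nparts[NPARTITIONS] = {0};
    unsigned   *input = item.keys ? item.keys : outer;
    int         ninput = item.keys ? item.nkeys : nouter;

    for (int p = 0; p < NPARTITIONS; p++)
        parts[p] = malloc(ninput * sizeof(unsigned));

    for (int i = 0; i < ninput; i++)
    {
        unsigned    key = input[i];

        if (table_find(table, ntable, key) >= 0)
            continue;                         /* existing group: advance it */
        else if (ntable < TABLE_CAP)
            table[ntable++] = key;            /* room left: create new group */
        else
        {
            int         p = key % NPARTITIONS;

            parts[p][nparts[p]++] = key;      /* no room: spill the tuple */
        }
    }

    for (int i = 0; i < ntable; i++)          /* emit the finished groups */
        printf("group %u finalized\n", table[i]);

    for (int p = 0; p < NPARTITIONS; p++)     /* spilled partitions become new work items */
    {
        if (nparts[p] > 0)
        {
            WorkItem    next = {parts[p], nparts[p]};

            process(next, outer, nouter);
        }
        free(parts[p]);
    }
}

int
main(void)
{
    unsigned    outer[] = {1, 2, 3, 1, 4, 5, 6, 2, 7, 8, 9, 3};
    WorkItem    initial = {NULL, 0};          /* prime with a NULL-file item */

    process(initial, outer, sizeof(outer) / sizeof(outer[0]));
    return 0;
}

Because each pass retains up to TABLE_CAP new groups, every spilled
partition shrinks on each pass and the recursion terminates - the same
termination argument as above.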

Open items:
   * costing
   * EXPLAIN details for disk usage
   * choose number of partitions intelligently
   * performance testing

Initial tests indicate that it can be competitive with sort+groupagg
when the disk is involved, but more testing is required.

Feedback welcome.

Regards,
    Jeff Davis


Re: 9.5: Memory-bounded HashAgg

From
Tomas Vondra
Date:
Hi,

it's 1AM here, so only a few comments after quickly reading the patch.

On 10.8.2014 23:26, Jeff Davis wrote:
> This patch is requires the Memory Accounting patch, or something
> similar to track memory usage.

I think the patch you sent actually includes the accounting patch. Is
that on purpose, or by accident?

I'd suggest keeping these two patches separate.


> Rough Design:
> 
> Change the hash aggregate algorithm to accept a generic "work item", 
> which consists of an input file as well as some other bookkeeping 
> information.
> 
> Initially prime the algorithm by adding a single work item where the 
> file is NULL, indicating that it should read from the outer plan.
> 
> If the memory is exhausted during execution of a work item, then 
> continue to allow existing groups to be aggregated, but do not allow
> new groups to be created in the hash table. Tuples representing new
> groups are saved in an output partition file referenced in the work
> item that is currently being executed.
> 
> When the work item is done, emit any groups in the hash table, clear
> the hash table, and turn each output partition file into a new work
> item.
> 
> Each time through at least some groups are able to stay in the hash 
> table, so eventually none will need to be saved in output
> partitions, no new work items will be created, and the algorithm will
> terminate. This is true even if the number of output partitions is
> always one.

So once a group gets into memory, it stays there? That's going to work
fine for aggregates with fixed-size state (int4, or generally state that
gets allocated and does not grow), but I'm afraid for aggregates with
growing state (as for example array_agg and similar) that's not really a
solution.

How difficult would it be to dump the current states into a file (and
remove them from the hash table)?

While hacking on the hash join, I envisioned the hash aggregate might
work in a very similar manner, i.e. something like this:
  * nbatches=1, nbits=0
  * when work_mem gets full => nbatches *= 2, nbits += 1
  * get rid of half the groups, using nbits from the hash
     => dump the current states into 'states.batchno' file
     => dump further tuples to 'tuples.batchno' file
  * continue until the end, or until work_mem gets full again

This is pretty much what the hashjoin does, except that the join needs
to batch the outer relation too (which hashagg does not need to do).
Otherwise most of the batching logic can be copied.

It also seems to me that the logic of the patch is about this:
  * try to lookup the group in the hash table
    * found => call the transition function
    * not found
        * enough space => call transition function
        * not enough space => tuple/group goes to a batch

Which pretty much means all tuples need to do the lookup first. The nice
thing on the hash-join approach is that you don't really need to do the
lookup - you just need to peek at the hash whether the group belongs to
the current batch (and if not, to which batch it should go).

Of course, that would require the ability to dump the current state of
the group, but for the aggregates using basic types as a state
(int4/int8, ...) with fixed-length state that's trivial.

For aggregates using 'internal' to pass pointers, that requires some
help from the author - serialization/deserialization functions.
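
To make "trivial" concrete, here is a minimal sketch of dumping one
fixed-length state to a batch file (plain stdio stands in for the
backend's BufFile machinery, and the record layout is purely
illustrative):

#include <stdio.h>
#include <stdint.h>

/* Illustrative on-disk record for one group with a fixed-length state. */
typedef struct SpilledState
{
    uint32_t    hash;       /* hash of the grouping key             */
    int64_t     key;        /* grouping key (assume a single int8)  */
    int64_t     state;      /* transition state, e.g. a running sum */
} SpilledState;

/* Append one group's state to the batch file; returns 0 on success. */
static int
dump_state(FILE *batch_file, uint32_t hash, int64_t key, int64_t state)
{
    SpilledState rec = {hash, key, state};

    return fwrite(&rec, sizeof(rec), 1, batch_file) == 1 ? 0 : -1;
}

int
main(void)
{
    FILE       *batch = tmpfile();

    if (batch == NULL)
        return 1;
    dump_state(batch, 0xdeadbeefU, 42, 1000);
    fclose(batch);
    return 0;
}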

> Open items:
>    * costing

Not sure how this is done for the hash-join, but I guess that might be a
good place for inspiration.

>    * EXPLAIN details for disk usage
>    * choose number of partitions intelligently

What is the purpose of HASH_DISK_MAX_PARTITIONS? I mean, when we decide
we need 2048 partitions, why should we use less if we believe it will
get us over work_mem?

>    * performance testing
> 
> Initial tests indicate that it can be competitive with sort+groupagg
> when the disk is involved, but more testing is required.

For us, removing the sort is a big deal, because we're working with
>100M rows regularly. It's more complicated though, because the sort is
usually enforced by COUNT(DISTINCT) and that's not going to disappear
because of this patch. But that's solvable with a custom aggregate.

Tomas



Re: 9.5: Memory-bounded HashAgg

From
Jeff Davis
Date:
On Mon, 2014-08-11 at 01:29 +0200, Tomas Vondra wrote:
> On 10.8.2014 23:26, Jeff Davis wrote:
> > This patch is requires the Memory Accounting patch, or something
> > similar to track memory usage.
> 
> I think the patch you sent actually includes the accounting patch. Is
> that on purpose, or by accident?

Accident, thank you.

> So once a group gets into memory, it stays there? That's going to work
> fine for aggregates with fixed-size state (int4, or generally state that
> gets allocated and does not grow), but I'm afraid for aggregates with
> growing state (as for example array_agg and similar) that's not really a
> solution.

I agree in theory, but for now I'm just not handling that case at all
because there is other work that needs to be done first. For one thing,
we would need a way to save the transition state, and we don't really
have that. In the case of array_agg, the state is not serialized and
there's no generic way to ask it to serialize itself without finalizing.

I'm open to ideas. Do you think my patch is going generally in the right
direction, and we can address this problem later; or do you think we
need a different approach entirely?

> While hacking on the hash join, I envisioned the hash aggregate might
> work in a very similar manner, i.e. something like this:
> 
>   * nbatches=1, nbits=0
>   * when work_mem gets full => nbatches *= 2, nbits += 1
>   * get rid of half the groups, using nbits from the hash
>      => dump the current states into 'states.batchno' file
>      => dump further tuples to 'tuples.batchno' file
>   * continue until the end, or until work_mem gets full again

It would get a little messy with HashAgg. Hashjoin is dealing entirely
with tuples; HashAgg deals with tuples and groups.

Also, if the transition state is fixed-size (or even nearly so), it
makes no sense to remove groups from the hash table before they are
finished. We'd need to detect that somehow, and it seems almost like two
different algorithms (though maybe not a bad idea to use a different
algorithm for things like array_agg).

Not saying that it can't be done, but (unless you have an idea) it
requires quite a bit more work than what I did here.

> It also seems to me that the logic of the patch is about this:
> 
>   * try to lookup the group in the hash table
>     * found => call the transition function
>     * not found
>         * enough space => call transition function
>         * not enough space => tuple/group goes to a batch
> 
> Which pretty much means all tuples need to do the lookup first. The nice
> thing on the hash-join approach is that you don't really need to do the
> lookup - you just need to peek at the hash whether the group belongs to
> the current batch (and if not, to which batch it should go).

That's an interesting point. I suspect that, in practice, hashing the
tuple is more expensive than (or at least not much cheaper than) doing
a failed lookup.

> For aggregates using 'internal' to pass pointers that requires some help
> from the author - serialization/deserialization functions.

Ah, yes, this is what I was referring to earlier.

> >    * EXPLAIN details for disk usage
> >    * choose number of partitions intelligently
> 
> What is the purpose of HASH_DISK_MAX_PARTITIONS? I mean, when we decide
> we need 2048 partitions, why should we use less if we believe it will
> get us over work_mem?

Because I suspect there are costs in having an extra file around that
I'm not accounting for directly. We are implicitly assuming that the OS
will keep around enough buffers for each BufFile to do sequential writes
when needed. If we create a zillion partitions, then either we end up
with random I/O or we push the memory burden into the OS buffer cache.

We could try to model those costs explicitly to put some downward
pressure on the number of partitions we select, but I just chose to cap
it at a fixed number for now.
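
In rough terms, the cap amounts to something like the C sketch below
(the sizing formula and the constant's value are illustrative
assumptions, not taken from the patch):

#include <math.h>
#include <stdio.h>

#define HASH_DISK_MAX_PARTITIONS 256   /* illustrative cap; the actual constant may differ */

/* Pick enough partitions that each is expected to fit in work_mem, but cap
 * the count so the number of concurrently open batch files stays bounded. */
static int
choose_num_partitions(double estimated_bytes, double work_mem_bytes)
{
    int         npartitions = (int) ceil(estimated_bytes / work_mem_bytes);

    if (npartitions < 1)
        npartitions = 1;
    if (npartitions > HASH_DISK_MAX_PARTITIONS)
        npartitions = HASH_DISK_MAX_PARTITIONS;
    return npartitions;
}

int
main(void)
{
    /* e.g. 10 GB of estimated group data with work_mem = 4 MB */
    printf("%d partitions\n",
           choose_num_partitions(10.0 * 1024 * 1024 * 1024, 4.0 * 1024 * 1024));
    return 0;
}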

> For us, removing the sort is a big deal, because we're working with
> >100M rows regularly. It's more complicated though, because the sort is
> usually enforced by COUNT(DISTINCT) and that's not going to disappear
> because of this patch. But that's solvable with a custom aggregate.

I hope this offers you a good alternative.

I'm not sure it will ever beat sort for very high cardinality cases, but
I hope it can beat sort when the group size averages something higher
than one. It will also be safer, so the optimizer can be more aggressive
about choosing HashAgg.

Thank you for taking a look so quickly!

Regards,
    Jeff Davis







Re: 9.5: Memory-bounded HashAgg

From
"Tomas Vondra"
Date:
On 12 Srpen 2014, 7:06, Jeff Davis wrote:
> On Mon, 2014-08-11 at 01:29 +0200, Tomas Vondra wrote:
>> On 10.8.2014 23:26, Jeff Davis wrote:
>> > This patch is requires the Memory Accounting patch, or something
>> > similar to track memory usage.
>>
>> I think the patch you sent actually includes the accounting patch. Is
>> that on purpose, or by accident?
>
> Accident, thank you.
>
>> So once a group gets into memory, it stays there? That's going to work
>> fine for aggregates with fixed-size state (int4, or generally state that
>> gets allocated and does not grow), but I'm afraid for aggregates with
>> growing state (as for example array_agg and similar) that's not really a
>> solution.
>
> I agree in theory, but for now I'm just not handling that case at all
> because there is other work that needs to be done first. For one thing,
> we would need a way to save the transition state, and we don't really
> have that. In the case of array_agg, the state is not serialized and
> there's no generic way to ask it to serialize itself without finalizing.

Yes and no.

It's true we don't have this ability for aggregates passing state using
'internal', and arguably these are the cases that matter (because those
are the states that tend to "bloat" as more values are passed to the
aggregate).

We can do that for states with a known type (because we have
serialize/deserialize methods for them), but we can't really require all
aggregates to use only known types. The 'internal' is there for a reason.

So I think eventually we should support something like this:

CREATE AGGREGATE myaggregate (
    ...
    SERIALIZE_FUNC = 'dump_data',
    DESERIALIZE_FUNC = 'read_data',
    ...
);

That being said, we can't require this from all existing aggregates.
There'll always be aggregates not providing this (for example some old
ones).

So even if we have this, we'll have to support the case when it's not
provided - possibly by using the batching algorithm you provided. What
I imagine is this:
  hitting work_mem limit -> do we know how to dump the aggregate state?
    yes (known type or serialize/deserialize)
        => use the batching algorithm from hash join
    no (unknown type, ...)
        => use the batching algorithm described in the original message
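
In code form, the decision might look something like this sketch (the
names are made up for illustration; nothing here comes from an actual
patch):

#include <stdio.h>
#include <stdbool.h>

/* Sketch of the decision above. */
typedef enum
{
    BATCH_STATES_AND_TUPLES,    /* hashjoin-like: dump both states and tuples   */
    BATCH_TUPLES_ONLY           /* original proposal: keep states, spill tuples */
} BatchStrategy;

static BatchStrategy
choose_batch_strategy(bool state_is_known_type, bool has_serialize_funcs)
{
    if (state_is_known_type || has_serialize_funcs)
        return BATCH_STATES_AND_TUPLES;
    return BATCH_TUPLES_ONLY;
}

int
main(void)
{
    /* array_agg-style aggregate: 'internal' state, no (de)serialize funcs */
    printf("%s\n",
           choose_batch_strategy(false, false) == BATCH_TUPLES_ONLY
           ? "spill tuples only" : "spill states and tuples");
    return 0;
}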

Now, I'm not trying to make you implement all this - I'm willing to work
on that. Implementing this CREATE AGGREGATE extension is however tightly
coupled with your patch, because that's the only place where it might be
used (that I'm aware of).


> I'm open to ideas. Do you think my patch is going generally in the right
> direction, and we can address this problem later; or do you think we
> need a different approach entirely?

I certainly think having memory-bounded hashagg is a great improvement,
and yes - this patch can get us there. Maybe it won't get us all the way
to the "perfect solution" but so what? We can improve that by further
patches (and I'm certainly willing to spend some time on that).

So thanks a lot for working on this!

>
>> While hacking on the hash join, I envisioned the hash aggregate might
>> work in a very similar manner, i.e. something like this:
>>
>>   * nbatches=1, nbits=0
>>   * when work_mem gets full => nbatches *= 2, nbits += 1
>>   * get rid of half the groups, using nbits from the hash
>>      => dump the current states into 'states.batchno' file
>>      => dump further tuples to 'tuples.batchno' file
>>   * continue until the end, or until work_mem gets full again
>
> It would get a little messy with HashAgg. Hashjoin is dealing entirely
> with tuples; HashAgg deals with tuples and groups.

I don't see why it should get messy? In the end, you have a chunk of
data and a hash for it.


> Also, if the transition state is fixed-size (or even nearly so), it
> makes no sense to remove groups from the hash table before they are
> finished. We'd need to detect that somehow, and it seems almost like two
> different algorithms (though maybe not a bad idea to use a different
> algorithm for things like array_agg).

It just means you need to walk through the hash table, look at the
hashes and dump ~50% of the groups to a file. I'm not sure how difficult
that is with dynahash, though (hashjoin uses a custom hashtable, that
makes this very simple).


> Not saying that it can't be done, but (unless you have an idea) requires
> quite a bit more work than what I did here.
>
>> It also seems to me that the logic of the patch is about this:
>>
>>   * try to lookup the group in the hash table
>>     * found => call the transition function
>>     * not found
>>         * enough space => call transition function
>>         * not enough space => tuple/group goes to a batch
>>
>> Which pretty much means all tuples need to do the lookup first. The nice
>> thing on the hash-join approach is that you don't really need to do the
>> lookup - you just need to peek at the hash whether the group belongs to
>> the current batch (and if not, to which batch it should go).
>
> That's an interesting point. I suspect that, in practice, the cost of
> hashing the tuple is more expensive (or at least not much cheaper than)
> doing a failed lookup.

I think you're missing the point, here. You need to compute the hash in
both cases. And then you either can do a lookup or just peek at the first
few bits of the hash to see whether it's in the current batch or not.

Certainly, doing this:
   batchno = hash & (nbatches - 1);
   if (batchno > curbatch) {
       ... not current batch, dump to file ...
   }

is much faster than a lookup. Also, as the hash table grows (beyond L3
cache size, which is a few MBs today), it becomes much slower in my
experience - that's one of the lessons I learnt while hacking on the
hashjoin. And we're dealing with hashagg not fitting into work_mem, so
this seems to be relevant.

>> For aggregates using 'internal' to pass pointers that requires some help
>> from the author - serialization/deserialization functions.
>
> Ah, yes, this is what I was referring to earlier.
>
>> >    * EXPLAIN details for disk usage
>> >    * choose number of partitions intelligently
>>
>> What is the purpose of HASH_DISK_MAX_PARTITIONS? I mean, when we decide
>> we need 2048 partitions, why should we use less if we believe it will
>> get us over work_mem?
>
> Because I suspect there are costs in having an extra file around that
> I'm not accounting for directly. We are implicitly assuming that the OS
> will keep around enough buffers for each BufFile to do sequential writes
> when needed. If we create a zillion partitions, then either we end up
> with random I/O or we push the memory burden into the OS buffer cache.

Assuming I understand it correctly, I think this logic is broken. Are you
saying "We'll try to do memory-bounded hashagg, but not for the really
large datasets because of fear we might cause random I/O"?

While I certainly understand your concerns about generating excessive
amounts of random I/O, I think modern filesystems are handling that just
fine (coalescing the writes into mostly sequential writes, etc.). Also,
current hardware is really good at handling this (controllers with write
cache, SSDs etc.).

Also, if hash-join does not worry about number of batches, why should
hashagg worry about that? I expect the I/O patterns to be very similar.

And if you have many batches, it means you have tiny work_mem, compared
to the amount of data. Either you have unreasonably small work_mem
(better fix that) or a lot of data (better have a lot of RAM and good
storage, or you'll suffer anyway).

In any case, trying to fix this by limiting number of partitions seems
like a bad approach. I think factoring those concerns into a costing
model is more appropriate.


> We could try to model those costs explicitly to put some downward
> pressure on the number of partitions we select, but I just chose to cap
> it for now.

OK, understood. We can't get all the goodies in the first version.

>
>> For us, removing the sort is a big deal, because we're working with
>> >100M rows regularly. It's more complicated though, because the sort is
>> usually enforced by COUNT(DISTINCT) and that's not going to disappear
>> because of this patch. But that's solvable with a custom aggregate.
>
> I hope this offers you a good alternative.
>
> I'm not sure it will ever beat sort for very high cardinality cases, but
> I hope it can beat sort when the group size averages something higher
> than one. It will also be safer, so the optimizer can be more aggressive
> about choosing HashAgg.

It's certainly an improvement, although the sort may get there for one
of two reasons:
  (a) COUNT(DISTINCT) -> this is solved by a custom aggregate
  (b) bad estimate of required memory -> this is common for aggregates
      passing 'internal' state (planner uses some quite high defaults)

Tomas




Re: 9.5: Memory-bounded HashAgg

From
Jeff Davis
Date:
On Tue, 2014-08-12 at 14:58 +0200, Tomas Vondra wrote:
> CREATE AGGREGATE myaggregate (
>     ...
>     SERIALIZE_FUNC = 'dump_data',
>     DESERIALIZE_FUNC = 'read_data',
>     ...
> );

Seems reasonable.

> I don't see why it should get messy? In the end, you have a chunk of
> data and a hash for it.

Perhaps it's fine; I'd have to see the approach.

> It just means you need to walk through the hash table, look at the
> hashes and dump ~50% of the groups to a file. 

If you have fixed-size states, why would you *want* to remove the group?
What is gained?

One thing I like about my simple approach is that it returns a good
number of groups after each pass, and then those are completely finished
(returned to the operator above, even). That's impossible with HashJoin
because the hashing all needs to be done before the probe phase begins.

The weakness of my approach is the array_agg case that you mention,
because this approach doesn't offer a way to dump out transition states.
It seems like that could be added later, but let me know if you see a
problem there.

> I think you're missing the point, here. You need to compute the hash in
> both cases. And then you either can do a lookup or just peek at the first
> few bits of the hash to see whether it's in the current batch or not.

I understood that. The point I was trying to make (which might or might
not be true) was that: (a) this only matters for a failed lookup,
because a successful lookup would just go in the hash table anyway; and
(b) a failed lookup probably doesn't cost much compared to all of the
other things that need to happen along that path.

I should have chosen a better example though. For instance: if the
lookup fails, we need to write the tuple, and writing the tuple is sure
to swamp the cost of a failed hash lookup.

> is much faster than a lookup. Also, as the hash table grows (beyond L3
> cache size, which is a few MBs today), it becomes much slower in my
> experience - that's one of the lessons I learnt while hacking on the
> hashjoin. And we're dealing with hashagg not fitting into work_mem, so
> this seems to be relevant.

Could be, but this is also the path that goes to disk, so I'm not sure
how significant it is.

> > Because I suspect there are costs in having an extra file around that
> > I'm not accounting for directly. We are implicitly assuming that the OS
> > will keep around enough buffers for each BufFile to do sequential writes
> > when needed. If we create a zillion partitions, then either we end up
> > with random I/O or we push the memory burden into the OS buffer cache.
> 
> Assuming I understand it correctly, I think this logic is broken. Are you
> saying "We'll try to do memory-bounded hashagg, but not for the really
> large datasets because of fear we might cause random I/O"?

No, the memory is still bounded even for very high cardinality inputs
(ignoring array_agg case for now). When a partition is processed later,
it also might exhaust work_mem, and need to write out tuples to its own
set of partitions. This allows memory-bounded execution to succeed even
if the number of partitions each iteration is one, though it will result
in repeated I/O for the same tuple.

> While I certainly understand your concerns about generating excessive
> amount of random I/O, I think the modern filesystem are handling that just
> fine (coalescing the writes into mostly sequential writes, etc.). Also,
> current hardware is really good at handling this (controllers with write
> cache, SSDs etc.).

All of that requires memory. We shouldn't dodge a work_mem limit by
using the kernel's memory, instead.

> Also, if hash-join does not worry about number of batches, why should
> hashagg worry about that? I expect the I/O patterns to be very similar.

One difference with HashJoin is that, to create a large number of
batches, the inner side must be huge, which is not the expected
operating mode for HashJoin[1]. Regardless, every partition that is
active *does* have a memory cost. HashJoin might ignore that cost, but
that doesn't make it right.

I think the right analogy here is to Sort's poly-phase merge -- it
doesn't merge all of the runs at once; see the comments at the top of
tuplesort.c.

In other words, sometimes it's better to have fewer partitions (for
hashing) or merge fewer runs at once (for sorting). It does more
repeated I/O, but the I/O is more sequential.

> In any case, trying to fix this by limiting number of partitions seems
> like a bad approach. I think factoring those concerns into a costing
> model is more appropriate.

Fair enough. I haven't modeled the cost yet; and I agree that an upper
limit is quite crude.

>    (a) COUNT(DISTINCT) -> this is solved by a custom aggregate

Is there a reason we can't offer a hash-based strategy for this one? It
would have to be separate hash tables for different aggregates, but it
seems like it could work.

>    (b) bad estimate of required memory -> this is common for aggregates
>        passing 'internal' state (planner uses some quite high defaults)

Maybe some planner hooks? Ideas?

Regards,
    Jeff Davis





Re: 9.5: Memory-bounded HashAgg

From
"Tomas Vondra"
Date:
On 13 Srpen 2014, 7:02, Jeff Davis wrote:
> On Tue, 2014-08-12 at 14:58 +0200, Tomas Vondra wrote:
>> CREATE AGGREGATE myaggregate (
>>     ...
>>     SERIALIZE_FUNC = 'dump_data',
>>     DESERIALIZE_FUNC = 'read_data',
>>     ...
>> );
>
> Seems reasonable.
>
>> I don't see why it should get messy? In the end, you have a chunk of
>> data and a hash for it.
>
> Perhaps it's fine; I'd have to see the approach.
>
>> It just means you need to walk through the hash table, look at the
>> hashes and dump ~50% of the groups to a file.
>
> If you have fixed-size states, why would you *want* to remove the group?
> What is gained?

You're right that for your batching algorithm (based on lookups), that's
not really needed, and keeping everything in memory is a good initial
approach.

My understanding of the batching algorithm (and I may be wrong on this
one) is that once you choose the number of batches, it's pretty much
fixed. Is that the case?

But what will happen in case of significant cardinality underestimate?
I.e. what will happen if you decide to use 16 batches, and then find
out 256 would be more appropriate? I believe you'll end up with batches
16x the size you'd want, most likely exceeding work_mem.

Do I understand that correctly?

But back to the removal of aggregate states from memory (irrespective
of the size) - this is what makes the hashjoin-style batching possible,
because it:
  (a) makes the batching decision simple (peeking at hash)
  (b) makes it possible to repeatedly increase the number of batches
  (c) provides a simple trigger for the increase of batch count

Some of this might be achievable even with keeping the states in memory.
I mean, you can add more batches on the fly, and handle this similarly
to hash join, while reading tuples from the batch (moving the tuples to
the proper batch, if needed).

The problem is that once you have the memory full, there's no trigger
to alert you that you should increase the number of batches again.

> One thing I like about my simple approach is that it returns a good
> number of groups after each pass, and then those are completely finished
> (returned to the operator above, even). That's impossible with HashJoin
> because the hashing all needs to be done before the probe phase begins.

The hash-join approach returns ~1/N groups after each pass, so I fail to
see how this is better?

> The weakness of my approach is the array_agg case that you mention,
> because this approach doesn't offer a way to dump out transition states.
> It seems like that could be added later, but let me know if you see a
> problem there.

Right. Let's not solve this in the first version of the patch.

>> I think you're missing the point, here. You need to compute the hash in
>> both cases. And then you either can do a lookup or just peek at the
>> first
>> few bits of the hash to see whether it's in the current batch or not.
>
> I understood that. The point I was trying to make (which might or might
> not be true) was that: (a) this only matters for a failed lookup,
> because a successful lookup would just go in the hash table anyway; and
> (b) a failed lookup probably doesn't cost much compared to all of the
> other things that need to happen along that path.

OK. I don't have numbers proving otherwise at hand, and you're probably
right that once the batching kicks in, the other parts are likely more
expensive than this.

> I should have chosen a better example though. For instance: if the
> lookup fails, we need to write the tuple, and writing the tuple is sure
> to swamp the cost of a failed hash lookup.
>
>> is much faster than a lookup. Also, as the hash table grows (beyond L3
>> cache size, which is a few MBs today), it becomes much slower in my
>> experience - that's one of the lessons I learnt while hacking on the
>> hashjoin. And we're dealing with hashagg not fitting into work_mem, so
>> this seems to be relevant.
>
> Could be, but this is also the path that goes to disk, so I'm not sure
> how significant it is.

It may or may not go to the disk, actually. The fact that you're doing
batching means it's written to a temporary file, but with large amounts
of RAM it may not get written to disk.

That's because the work_mem is only a very soft guarantee - a query may
use multiple work_mem buffers in a perfectly legal way. So users tend
to set this rather conservatively. For example we have >256GB of RAM in
each machine, usually <24 queries running at the same time and yet we
have only work_mem=800MB. On the few occasions when a hash join is
batched, it usually remains in page cache and never actually gets written
to disk. Or maybe it gets written, but it's still in the page cache so
the backend never notices that.

It's true there are other costs though - I/O calls, etc. So it's not free.

>
>> > Because I suspect there are costs in having an extra file around that
>> > I'm not accounting for directly. We are implicitly assuming that the
>> OS
>> > will keep around enough buffers for each BufFile to do sequential
>> writes
>> > when needed. If we create a zillion partitions, then either we end up
>> > with random I/O or we push the memory burden into the OS buffer cache.
>>
>> Assuming I understand it correctly, I think this logic is broken. Are
>> you
>> saying "We'll try to do memory-bounded hashagg, but not for the really
>> large datasets because of fear we might cause random I/O"?
>
> No, the memory is still bounded even for very high cardinality inputs
> (ignoring array_agg case for now). When a partition is processed later,
> it also might exhaust work_mem, and need to write out tuples to its own
> set of partitions. This allows memory-bounded execution to succeed even
> if the number of partitions each iteration is one, though it will result
> in repeated I/O for the same tuple.

Aha! And the new batches are 'private' to the work item, making it a bit
recursive, right? Is there any reason not to just double the number of
batches globally? I mean, why not just say
  nbatches *= 2

which effectively splits each batch into two? Half the groups stay
in the current one, half is moved to a new one.

It makes it almost perfectly sequential, because you're reading
a single batch, keeping half the tuples and writing the other half to
a new batch. If you increase the number of batches a bit more, e.g.
  nbatches *= 4

then you're keeping 1/4 and writing into 3 new batches.

That seems like a better solution to me.

>
>> While I certainly understand your concerns about generating excessive
>> amount of random I/O, I think the modern filesystem are handling that
>> just
>> fine (coalescing the writes into mostly sequential writes, etc.). Also,
>> current hardware is really good at handling this (controllers with write
>> cache, SSDs etc.).
>
> All of that requires memory. We shouldn't dodge a work_mem limit by
> using the kernel's memory, instead.

Sure, saving memory at one place just to waste it somewhere else is
a poor solution. But I don't think work_mem is a memory-saving tool.
I see it as a memory-limiting protection.

>> Also, if hash-join does not worry about number of batches, why should
>> hashagg worry about that? I expect the I/O patterns to be very similar.
>
> One difference with HashJoin is that, to create a large number of
> batches, the inner side must be huge, which is not the expected
> operating mode for HashJoin[1]. Regardless, every partition that is
> active *does* have a memory cost. HashJoin might ignore that cost, but
> that doesn't make it right.
>
> I think the right analogy here is to Sort's poly-phase merge -- it
> doesn't merge all of the runs at once; see the comments at the top of
> tuplesort.c.
>
> In other words, sometimes it's better to have fewer partitions (for
> hashing) or merge fewer runs at once (for sorting). It does more
> repeated I/O, but the I/O is more sequential.

OK. I don't have a clear opinion on this yet. I don't think the costs
are that high, but maybe I'm wrong in this.

It also seems to me that using HASH_DISK_MAX_PARTITIONS, and then allowing
each work item to create its own set of additional partitions effectively
renders the HASH_DISK_MAX_PARTITIONS futile.

>
>> In any case, trying to fix this by limiting number of partitions seems
>> like a bad approach. I think factoring those concerns into a costing
>> model is more appropriate.
>
> Fair enough. I haven't modeled the cost yet; and I agree that an upper
> limit is quite crude.

OK, let's keep the HASH_DISK_MAX_PARTITIONS for now and improve this later.

>
>>    (a) COUNT(DISTINCT) -> this is solved by a custom aggregate
>
> Is there a reason we can't offer a hash-based strategy for this one? It
> would have to be separate hash tables for different aggregates, but it
> seems like it could work.

I don't know the exact reasoning, but apparently that's how the
current planner works. Whenever it sees COUNT(DISTINCT) it enforces a
sort. I suspect this is because of fear of memory requirements (because
a distinct requires keeping all the items), which might have been
perfectly valid when this was designed.

>
>>    (b) bad estimate of required memory -> this is common for aggregates
>>        passing 'internal' state (planner uses some quite high defaults)
>
> Maybe some planner hooks? Ideas?

My plan is to add this to the CREATE AGGREGATE somehow - either as a
constant parameter (allowing to set a custom constant size) or a callback
to a 'sizing' function (estimating the size based on number of items,
average width and ndistinct in the group). In any case, this is
independent of this patch.

I think that for this patch we may either keep the current batching
strategy (and proceed with the TODO items you listed in your first patch).

Or we may investigate the alternative (hash-join-like) batching strategy.
I suppose this may be done after the TODO items, but I'm afraid it may
impact some of them (e.g. the costing). This can be done with the
simple aggregates (using fixed-size types for states), but eventually
it will require adding the serialize/deserialize to CREATE AGGREGATE.

Now, I'm very much in favor of the #2 choice (because that's what works
best with the aggregates I need to use), but I'm also a big fan of the
'availability beats unavailable features 100% of the time' principle.

So if you decide to go for #1 now, I'm fine with that. I'm open to do
the next step - either as a follow-up patch, or maybe as an alternative
spin-off of your patch.

regards
Tomas





Re: 9.5: Memory-bounded HashAgg

From
Tomas Vondra
Date:
On 13.8.2014 12:31, Tomas Vondra wrote:
> On 13 Srpen 2014, 7:02, Jeff Davis wrote:
>> On Tue, 2014-08-12 at 14:58 +0200, Tomas Vondra wrote:
>>>
>>>    (b) bad estimate of required memory -> this is common for aggregates
>>>        passing 'internal' state (planner uses some quite high defaults)
>>
>> Maybe some planner hooks? Ideas?
> 
> My plan is to add this to the CREATE AGGREGATE somehow - either as a
> constant parameter (allowing to set a custom constant size) or a callback
> to a 'sizing' function (estimating the size based on number of items,
> average width and ndistinct in the group). In any case, this is
> independent of this patch.

FWIW, the constant parameter is already implemented for 9.4. Adding the
function seems possible - the most difficult part seems to be getting
all the necessary info before count_agg_clauses() is called. For example
now dNumGroups is evaluated after the call (and tuples/group seems like
useful info for sizing).

While this seems unrelated to the patch discussed here, it's true that:
 (a) good estimate of the memory is important for initial estimate of
     batch count
 (b) dynamic increase of batch count alleviates issues from
     underestimating the amount of memory necessary for states


But let's leave this out of scope for the current patch.


regards
Tomas



Re: 9.5: Memory-bounded HashAgg

From
Jeff Davis
Date:
I think the hash-join like approach is reasonable, but I also think
you're going to run into a lot of challenges that make it more complex
for HashAgg. For instance, let's say you have the query:
 SELECT x, array_agg(y) FROM foo GROUP BY x;

Say the transition state is an array (for the sake of simplicity), so
the hash table has something like:
  1000 => {7,   8,  9}
  1001 => {12, 13, 14}

You run out of memory and need to split the hash table, so you scan the
hash table and find that group 1001 needs to be written to disk. So you
serialize the key and array and write them out.

Then the next tuple you get is (1001, 19). What do you do? Create a new
group 1001 => {19} (how do you combine it later with the first one)? Or
try to fetch the existing group 1001 from disk and advance it (horrible
random I/O)?



On Wed, 2014-08-13 at 12:31 +0200, Tomas Vondra wrote:
> My understanding of the batching algorithm (and I may be wrong on this
> one) is that once you choose the number of batches, it's pretty much
> fixed. Is that the case?

It's only fixed for that one "work item" (iteration). A different K can
be selected if memory is exhausted again. But you're right: this is a
little less flexible than HashJoin.

> But what will happen in case of significant cardinality underestimate?
> I.e. what will happen if you decide to use 16 batches, and then find
> out 256 would be more appropriate? I believe you'll end up with batches
> 16x the size you'd want, most likely exceeding work_mem.

Yes, except that work_mem would never be exceeded. If the partitions are
16X work_mem, then each would be added as another work_item, and
hopefully it would choose better the next time.

> > One thing I like about my simple approach is that it returns a good
> > number of groups after each pass, and then those are completely finished
> > (returned to the operator above, even). That's impossible with HashJoin
> > because the hashing all needs to be done before the probe phase begins.
> 
> The hash-join approach returns ~1/N groups after each pass, so I fail to
> see how this is better?

You can't return any tuples until you begin the probe phase, and that
doesn't happen until you've hashed the entire inner side (which involves
splitting and other work). With my patch, it will return some tuples
after the first scan. Perhaps I'm splitting hairs here, but the idea of
finalizing some groups as early as possible seems appealing.

> Aha! And the new batches are 'private' to the work item, making it a bit
> recursive, right? Is there any reason not to just double the number of
> batches globally?

I didn't quite follow this proposal.

> It also seems to me that using HASH_DISK_MAX_PARTITIONS, and then allowing
> each work item to create it's own set of additional partitions effectively
> renders the HASH_DISK_MAX_PARTITIONS futile.

It's the number of active partitions that matter, because that's what
causes the random I/O.

Regards,
    Jeff Davis






Re: 9.5: Memory-bounded HashAgg

From
Tom Lane
Date:
Jeff Davis <pgsql@j-davis.com> writes:
> I think the hash-join like approach is reasonable, but I also think
> you're going to run into a lot of challenges that make it more complex
> for HashAgg. For instance, let's say you have the query:

>   SELECT x, array_agg(y) FROM foo GROUP BY x;

> Say the transition state is an array (for the sake of simplicity), so
> the hash table has something like:

>   1000 => {7,   8,  9}
>   1001 => {12, 13, 14}

> You run out of memory and need to split the hash table, so you scan the
> hash table and find that group 1001 needs to be written to disk. So you
> serialize the key and array and write them out.

> Then the next tuple you get is (1001, 19). What do you do? Create a new
> group 1001 => {19} (how do you combine it later with the first one)? Or
> try to fetch the existing group 1001 from disk and advance it (horrible
> random I/O)?

If you're following the HashJoin model, then what you do is the same thing
it does: you write the input tuple back out to the pending batch file for
the hash partition that now contains key 1001, whence it will be processed
when you get to that partition.  I don't see that there's any special case
here.

The fly in the ointment is how to serialize a partially-computed aggregate
state value to disk, if it's not of a defined SQL type.
        regards, tom lane



Re: 9.5: Memory-bounded HashAgg

From
"Tomas Vondra"
Date:
On 14 Srpen 2014, 9:22, Jeff Davis wrote:
> I think the hash-join like approach is reasonable, but I also think
> you're going to run into a lot of challenges that make it more complex
> for HashAgg. For instance, let's say you have the query:
>
>   SELECT x, array_agg(y) FROM foo GROUP BY x;
>
> Say the transition state is an array (for the sake of simplicity), so
> the hash table has something like:
>
>   1000 => {7,   8,  9}
>   1001 => {12, 13, 14}
>
> You run out of memory and need to split the hash table, so you scan the
> hash table and find that group 1001 needs to be written to disk. So you
> serialize the key and array and write them out.
>
> Then the next tuple you get is (1001, 19). What do you do? Create a new
> group 1001 => {19} (how do you combine it later with the first one)? Or
> try to fetch the existing group 1001 from disk and advance it (horrible
> random I/O)?

No, that's not how it works. The batching algorithm works with a hash of
the group. For example let's suppose you do this:
  batchno = hash % nbatches;

which essentially keeps the last few bits of the hash. 0 bits for
nbatches=1, 1 bit for nbatches=2, 2 bits for nbatches=4 etc.

So let's say we have 2 batches, and we're working on the first batch.
That means we're using 1 bit:
   batchno = hash % 2;

and for the first batch we're keeping only groups with batchno=0. So
only groups with 0 as the last bit are in batchno==0.

When running out of memory, you simply do
   nbatches *= 2

and start considering one more bit from the hash. So if you had this
before:
   group_a => batchno=0 => {7,   8,  9}
   group_b => batchno=0 => {12, 13, 14}
   group_c => batchno=0 => {23,  1, 45}
   group_d => batchno=0 => {77, 37, 54}

(where batchno is a bit string), after doubling the number of batches
you get something like this:
   group_a => batchno=10 => {7,   8,  9}
   group_b => batchno=00 => {12, 13, 14}
   group_c => batchno=00 => {23,  1, 45}
   group_d => batchno=10 => {77, 37, 54}

So you have only two possible batchno values here, depending on the new
most-significant bit - either you got 0 (which means it's still in the
current batch) or 1 (and you need to move it to the temp file of the
new batch).

Then, when you get a new tuple, you get its hash and do a simple check
of the last few bits - effectively computing batchno just like before
  batchno = hash % nbatches;

Either it belongs to the current batch (and either it's in the hash
table, or you add it there), or it's not - in that case write it to a
temp file.

It gets a bit more complex when you increase the number of batches
repeatedly (effectively you need to do the check/move when reading the
batches).
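
A minimal sketch of that check (illustrative only; it assumes nbatches
is a power of two, as in hashjoin, so the modulo just keeps the low
bits of the hash):

#include <stdint.h>
#include <stdio.h>

/* hash % nbatches, i.e. the low bits of the hash for power-of-two nbatches */
static int
target_batch(uint32_t hash, int nbatches)
{
    return (int) (hash % (uint32_t) nbatches);
}

int
main(void)
{
    uint32_t    hash = 0x26;    /* binary ...100110 */
    int         curbatch = 0;

    /* with 2 batches the group stays in batch 0; after doubling it must move */
    for (int nbatches = 2; nbatches <= 8; nbatches *= 2)
    {
        int         batchno = target_batch(hash, nbatches);

        if (batchno == curbatch)
            printf("nbatches=%d: keep in the current batch\n", nbatches);
        else
            printf("nbatches=%d: move to batch %d\n", nbatches, batchno);
    }
    return 0;
}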

For sure, it's not for free - it may write to quite a few files. Is it
more expensive than what you propose? I'm not sure about that. With
your batching scheme, you'll end up with lower number of large batches,
and you'll need to read and split them, possibly repeatedly. The
batching scheme from hashjoin minimizes this.

IMHO the only way to find out is to do some actual tests.

> On Wed, 2014-08-13 at 12:31 +0200, Tomas Vondra wrote:
>> My understanding of the batching algorithm (and I may be wrong on this
>> one) is that once you choose the number of batches, it's pretty much
>> fixed. Is that the case?
>
> It's only fixed for that one "work item" (iteration). A different K can
> be selected if memory is exhausted again. But you're right: this is a
> little less flexible than HashJoin.
>
>> But what will happen in case of significant cardinality underestimate?
>> I.e. what will happen if you decide to use 16 batches, and then find
>> out 256 would be more appropriate? I believe you'll end up with batches
>> 16x the size you'd want, most likely exceeding work_mem.
>
> Yes, except that work_mem would never be exceeded. If the partitions are
> 16X work_mem, then each would be added as another work_item, and
> hopefully it would choose better the next time.

Only for aggregates with fixed-length state. For aggregates with growing
state (and no serialize/deserialize), the states may eventually exceed
work_mem.

>> > One thing I like about my simple approach is that it returns a good
>> > number of groups after each pass, and then those are completely
>> finished
>> > (returned to the operator above, even). That's impossible with
>> HashJoin
>> > because the hashing all needs to be done before the probe phase
>> begins.
>>
>> The hash-join approach returns ~1/N groups after each pass, so I fail to
>> see how this is better?
>
> You can't return any tuples until you begin the probe phase, and that
> doesn't happen until you've hashed the entire inner side (which involves
> splitting and other work). With my patch, it will return some tuples
> after the first scan. Perhaps I'm splitting hairs here, but the idea of
> finalizing some groups as early as possible seems appealing.

I fail to see how this is different from your approach? How can you
output any tuples before processing the whole inner relation?

After the first scan, the hash-join approach is pretty much guaranteed
to output ~1/N tuples.

>> Aha! And the new batches are 'private' to the work item, making it a bit
>> recursive, right? Is there any reason not to just double the number of
>> batches globally?
>
> I didn't quite follow this proposal.

Again, it's about a difference between your batching approach and the
hashjoin-style batching. The hashjoin batching keeps a single level of
batches, and when hitting work_mem just doubles the number of batches.

Your approach is to do multi-level batching, and I was thinking whether
it'd be possible to use the same approach (single level). But in
retrospect it probably does not make much sense, because the multi-level
batching is one of the points of the proposed approach.

>> It also seems to me that using HASH_DISK_MAX_PARTITIONS, and then
>> allowing
>> each work item to create it's own set of additional partitions
>> effectively
>> renders the HASH_DISK_MAX_PARTITIONS futile.
>
> It's the number of active partitions that matter, because that's what
> causes the random I/O.

OK, point taken. While I share the general concern about random I/O,
I'm not sure this case is particularly problematic.

regards
Tomas




Re: 9.5: Memory-bounded HashAgg

From
Jeff Davis
Date:
On Thu, 2014-08-14 at 10:06 -0400, Tom Lane wrote:
> If you're following the HashJoin model, then what you do is the same thing
> it does: you write the input tuple back out to the pending batch file for
> the hash partition that now contains key 1001, whence it will be processed
> when you get to that partition.  I don't see that there's any special case
> here.

HashJoin only deals with tuples. With HashAgg, you have to deal with a
mix of tuples and partially-computed aggregate state values. Not
impossible, but it is a little more awkward than HashJoin.

Regards,
    Jeff Davis





Re: 9.5: Memory-bounded HashAgg

From
Atri Sharma
Date:


On Thursday, August 14, 2014, Jeff Davis <pgsql@j-davis.com> wrote:
On Thu, 2014-08-14 at 10:06 -0400, Tom Lane wrote:
> If you're following the HashJoin model, then what you do is the same thing
> it does: you write the input tuple back out to the pending batch file for
> the hash partition that now contains key 1001, whence it will be processed
> when you get to that partition.  I don't see that there's any special case
> here.

HashJoin only deals with tuples. With HashAgg, you have to deal with a
mix of tuples and partially-computed aggregate state values. Not
impossible, but it is a little more awkward than HashJoin.


+1

Not to mention future cases if we start maintaining multiple state values, in regard to grouping sets.

Regards,

Atri


--
Regards,
 
Atri
l'apprenant

Re: 9.5: Memory-bounded HashAgg

From
Tom Lane
Date:
Jeff Davis <pgsql@j-davis.com> writes:
> HashJoin only deals with tuples. With HashAgg, you have to deal with a
> mix of tuples and partially-computed aggregate state values. Not
> impossible, but it is a little more awkward than HashJoin.

Not sure that I follow your point.  You're going to have to deal with that
no matter what, no?

I guess in principle you could avoid the need to dump agg state to disk.
What you'd have to do is write out tuples to temp files even when you
think you've processed them entirely, so that if you later realize you
need to split the current batch, you can recompute the states of the
postponed aggregates from scratch (ie from the input tuples) when you get
around to processing the batch they got moved to.  This would avoid
confronting the how-to-dump-agg-state problem, but it seems to have little
else to recommend it.  Even if splitting a batch is a rare occurrence,
the killer objection here is that even a totally in-memory HashAgg would
have to write all its input to a temp file, on the small chance that it
would exceed work_mem and need to switch to batching.
        regards, tom lane



Re: 9.5: Memory-bounded HashAgg

From
"Tomas Vondra"
Date:
On 14 Srpen 2014, 18:12, Tom Lane wrote:
> Jeff Davis <pgsql@j-davis.com> writes:
>> HashJoin only deals with tuples. With HashAgg, you have to deal with a
>> mix of tuples and partially-computed aggregate state values. Not
>> impossible, but it is a little more awkward than HashJoin.
>
> Not sure that I follow your point.  You're going to have to deal with that
> no matter what, no?

That is not how the patch works. Once the memory consumption hits work_mem,
it keeps the already existing groups in memory, and only stops creating
new groups. For each tuple, hashagg does a lookup - if the group is
already in memory, it performs the transition, otherwise it writes the
tuple to disk (and does some batching, but that's mostly irrelevant here).

This way it's not necessary to dump the partially-computed states, and for
fixed-size states it actually limits the amount of consumed memory. For
variable-length aggregates (array_agg et al.) not so much.

> I guess in principle you could avoid the need to dump agg state to disk.
> What you'd have to do is write out tuples to temp files even when you
> think you've processed them entirely, so that if you later realize you
> need to split the current batch, you can recompute the states of the
> postponed aggregates from scratch (ie from the input tuples) when you get
> around to processing the batch they got moved to.  This would avoid
> confronting the how-to-dump-agg-state problem, but it seems to have little
> else to recommend it.  Even if splitting a batch is a rare occurrence,
> the killer objection here is that even a totally in-memory HashAgg would
> have to write all its input to a temp file, on the small chance that it
> would exceed work_mem and need to switch to batching.

Yeah, I think putting this burden on each hashagg is not a good thing.

What I was thinking about is an automatic fall-back - try to do an in-memory
hash-agg. When you hit the work_mem limit, see how far we are (have we scanned
10% or 90% of tuples?), and decide whether to restart with batching.

But I think there's no single solution, fixing all the possible cases. I
think the patch proposed here is a solid starting point, that may be
improved and extended by further patches. Eventually, what I think might
work is this combination of approaches:

1) fixed-size states and states with serialize/deserialize methods
  => hashjoin-like batching (i.e. dumping both tuples and states)

2) variable-size states without serialize/deserialize
  => Jeff's approach (keep states in memory, dump tuples)
  => possibly with the rescan fall-back, for quickly growing states


Tomas




Re: 9.5: Memory-bounded HashAgg

From
"Tomas Vondra"
Date:
On 14 Srpen 2014, 18:02, Atri Sharma wrote:
> On Thursday, August 14, 2014, Jeff Davis <pgsql@j-davis.com> wrote:
>
>> On Thu, 2014-08-14 at 10:06 -0400, Tom Lane wrote:
>> > If you're following the HashJoin model, then what you do is the same
>> thing
>> > it does: you write the input tuple back out to the pending batch file
>> for
>> > the hash partition that now contains key 1001, whence it will be
>> processed
>> > when you get to that partition.  I don't see that there's any special
>> case
>> > here.
>>
>> HashJoin only deals with tuples. With HashAgg, you have to deal with a
>> mix of tuples and partially-computed aggregate state values. Not
>> impossible, but it is a little more awkward than HashJoin.
>>
>>
> +1
>
> Not to mention future cases if we start maintaining multiple state
> values,in regarded to grouping sets.

So what would you do for aggregates where the state is growing quickly?
Say, things like median() or array_agg()?

I think that "we can't do that for all aggregates" does not imply "we must
not do that at all."

There will always be aggregates not implementing dumping state for various
reasons, and in those cases the proposed approach is certainly a great
improvement. I like it, and I hope it will get committed.

But maybe for aggregates supporting serialize/deserialize of the state
(including all aggregates using known types, not just fixed-size types) a
hashjoin-like batching would be better? I can name a few custom aggregates
that'd benefit tremendously from this.

Just to be clear - this is certainly non-trivial to implement, and I'm not
trying to force anyone (e.g. Jeff) to implement the ideas I proposed. I'm
ready to spend time on reviewing the current patch, implement the approach
I proposed and compare the behaviour.

Kudos to Jeff for working on this.

Tomas




Re: 9.5: Memory-bounded HashAgg

From
Tom Lane
Date:
"Tomas Vondra" <tv@fuzzy.cz> writes:
> On 14 Srpen 2014, 18:12, Tom Lane wrote:
>> Not sure that I follow your point.  You're going to have to deal with that
>> no matter what, no?

> That is not how the patch work. Once the memory consumption hits work_mem,
> it keeps the already existing groups in memory, and only stops creating
> new groups.

Oh?  So if we have aggregates like array_agg whose memory footprint
increases over time, the patch completely fails to avoid bloat?

I might think a patch with such a limitation was useful, if it weren't
for the fact that aggregates of that nature are a large part of the
cases where the planner misestimates the table size in the first place.
Any complication that we add to nodeAgg should be directed towards
dealing with cases that the planner is likely to get wrong.
        regards, tom lane



Re: 9.5: Memory-bounded HashAgg

From
Jeff Davis
Date:
On Thu, 2014-08-14 at 16:17 +0200, Tomas Vondra wrote:
> Either it belongs to the current batch (and either it's in the hash
> table, or you add it there), or it's not - in that case write it to a
> temp file.

I think the part you left out is that you need two files per batch: one
for the dumped-out partially-computed state values, and one for the
tuples.

In other words, you haven't really discussed the step where you reunite
the tuples with that partially-computed state.
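
To make that concrete, a hypothetical per-batch layout for the
hashjoin-like scheme might look something like this (struct and field
names are mine, not from any posted patch):

    /* Hypothetical per-batch bookkeeping; names are illustrative only. */
    #include "postgres.h"
    #include "storage/buffile.h"

    typedef struct AggBatch
    {
        BufFile    *state_file;  /* dumped partially-computed states */
        BufFile    *tuple_file;  /* input tuples belonging to this batch */
    } AggBatch;

    /*
     * Reuniting tuples with their states would then roughly mean:
     *   1. deserialize the states from state_file into the hash table,
     *   2. read tuple_file and advance the matching transition states,
     *   3. emit the finished groups.
     */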

> For sure, it's not for free - it may write to quite a few files. Is it
> more expensive than what you propose? I'm not sure about that. With
> your batching scheme, you'll end up with lower number of large batches,
> and you'll need to read and split them, possibly repeatedly. The
> batching scheme from hashjoin minimizes this.

My approach only has fewer batches if it elects to have fewer batches,
which might happen for two reasons:
 1. A cardinality misestimate. This certainly could happen, but we do
have useful numbers to work from (we know the number of tuples and
distincts that we've read so far), so it's far from a blind guess.
 2. We're concerned about the random I/O from way too many partitions.

> I fail to see how this is different from your approach? How can you
> output any tuples before processing the whole inner relation?

Right, the only thing I avoid is scanning the hash table and dumping out
the groups.

This isn't a major distinction, more like "my approach does a little
less work before returning tuples", and I'm not even sure I can defend
that, so I'll retract this point.

> Your approach is to do multi-level batching, and I was thinking whether
> it'd be possible to use the same approach (single level). But in
> retrospect it probably does not make much sense, because the multi-level
> batching is one of the points of the proposed approach.

Now that I think about it, many of the points we discussed could
actually work with either approach:
  * In my approach, if I need more partitions, I could create more in
much the same way as HashJoin to keep it single-level (as you suggest
above).
  * In your approach, if there are too many partitions, you could avoid
random I/O by intentionally putting tuples from multiple partitions in a
single file and moving them while reading.
  * If given a way to write out the partially-computed states, I could
evict some groups from the hash table to keep an array_agg() bounded.

Our approaches only differ on one fundamental trade-off that I see:
  (A) My approach requires a hash lookup of an already-computed hash for
every incoming tuple, not only the ones going into the hash table.
  (B) Your approach requires scanning the hash table and dumping out the
states every time the hash table fills up, which therefore requires a
way to dump out the partial states.

You could probably win the argument by pointing out that (A) is O(N) and
(B) is O(log2(N)). But I suspect that cost (A) is very low.

Unfortunately, it would take some effort to test your approach because
we'd actually need a way to write out the partially-computed state, and
the algorithm itself seems a little more complex. So I'm not really sure
how to proceed.

Regards,
    Jeff Davis





Re: 9.5: Memory-bounded HashAgg

From
Atri Sharma
Date:



On Thu, Aug 14, 2014 at 10:21 PM, Tomas Vondra <tv@fuzzy.cz> wrote:
On 14 August 2014, 18:02, Atri Sharma wrote:
> On Thursday, August 14, 2014, Jeff Davis <pgsql@j-davis.com> wrote:
>
>> On Thu, 2014-08-14 at 10:06 -0400, Tom Lane wrote:
>> > If you're following the HashJoin model, then what you do is the same
>> thing
>> > it does: you write the input tuple back out to the pending batch file
>> for
>> > the hash partition that now contains key 1001, whence it will be
>> processed
>> > when you get to that partition.  I don't see that there's any special
>> case
>> > here.
>>
>> HashJoin only deals with tuples. With HashAgg, you have to deal with a
>> mix of tuples and partially-computed aggregate state values. Not
>> impossible, but it is a little more awkward than HashJoin.
>>
>>
> +1
>
> Not to mention future cases if we start maintaining multiple state
> values, in regard to grouping sets.

So what would you do for aggregates where the state is growing quickly?
Say, things like median() or array_agg()?

I think that "we can't do that for all aggregates" does not imply "we must
not do that at all."

There will always be aggregates not implementing dumping state for various
reasons, and in those cases the proposed approach is certainly a great
improvement. I like it, and I hope it will get committed.

But maybe for aggregates supporting serialize/deserialize of the state
(including all aggregates using known types, not just fixed-size types) a
hashjoin-like batching would be better? I can name a few custom aggregates
that'd benefit tremendously from this.

Yeah, that could work, but is it worth adding additional paths (assuming this patch gets committed) for only some aggregates? I think we should analyze the use cases further.

Just to be clear - this is certainly non-trivial to implement, and I'm not
trying to force anyone (e.g. Jeff) to implement the ideas I proposed. I'm
ready to spend time on reviewing the current patch, implement the approach
I proposed and compare the behaviour.

Totally agreed. It would be a different approach, although, as you said, it could be built on top of the current patch.

Kudos to Jeff for working on this.

Agreed :)
 




--
Regards,
 
Atri
l'apprenant

Re: 9.5: Memory-bounded HashAgg

From
Jeff Davis
Date:
On Thu, 2014-08-14 at 12:53 -0400, Tom Lane wrote:
> Oh?  So if we have aggregates like array_agg whose memory footprint
> increases over time, the patch completely fails to avoid bloat?

Yes, in its current form.

> I might think a patch with such a limitation was useful, if it weren't
> for the fact that aggregates of that nature are a large part of the
> cases where the planner misestimates the table size in the first place.
> Any complication that we add to nodeAgg should be directed towards
> dealing with cases that the planner is likely to get wrong.

In my experience, the planner has a lot of difficulty estimating the
cardinality unless it's coming from a base table without any operators
above it (other than maybe a simple predicate). This is probably a lot
more common than array_agg problems, simply because array_agg is
relatively rare compared with GROUP BY in general.

Also, there are also cases where my patch should win against Sort even
when it does go to disk. For instance, high enough cardinality to exceed
work_mem, but also a large enough group size. Sort will have to deal
with all of the tuples before it can group any of them, whereas HashAgg
can group at least some of them along the way.

Consider the skew case where the cardinality is 2M, work_mem fits 1M
groups, and the input consists of the keys 1..1999999 mixed randomly
inside one billion zeros. (Aside: if the input is non-random, you may
not get the skew value before the hash table fills up, in which case
HashAgg is just as bad as Sort.)
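
For what it's worth, a scaled-down version of that kind of input is easy
to generate for testing (table name and sizes here are illustrative only):

    -- one dominant key (0) plus many rare keys, mixed together
    CREATE TABLE skew_input AS
        SELECT CASE WHEN random() < 0.01
                    THEN 1 + (random() * 1999999)::int
                    ELSE 0 END AS key
        FROM generate_series(1, 10000000) s(i);
    ANALYZE skew_input;

    SET work_mem = '4MB';
    EXPLAIN ANALYZE SELECT key, count(*) FROM skew_input GROUP BY key;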

That being said, we can hold out for an array_agg fix if desired. As I
pointed out in another email, my proposal is compatible with the idea of
dumping groups out of the hash table, and does take some steps in that
direction.

Regards,
    Jeff Davis





Re: 9.5: Memory-bounded HashAgg

From
Tomas Vondra
Date:
On 14.8.2014 18:54, Jeff Davis wrote:
> On Thu, 2014-08-14 at 16:17 +0200, Tomas Vondra wrote:
>> Either it belongs to the current batch (and either it's in the hash
>> table, or you add it there), or it's not - in that case write it to a
>> temp file.
>
> I think the part you left out is that you need two files per batch: one
> for the dumped-out partially-computed state values, and one for the
> tuples.
>
> In other words, you haven't really discussed the step where you reunite
> the tuples with that partially-computed state.

No, that's not how the serialize/deserialize should work. The aggregate
needs to store the state as-is, so that after deserializing it gets
pretty much the same thing.

For example, for 'median' the state is the list of all the values
received so far, and when serializing it you have to write all the
values out. After deserializing it, you will get the same list of values.

Some aggregates may use complex data structures that may need more
elaborate serialize.
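
Just to illustrate, a simplified sketch of serialize/deserialize for such
a list-of-values state could look like this (plain C with malloc instead
of palloc; not taken from any actual patch):

    #include <stdio.h>
    #include <stdlib.h>

    /* the state is simply "all values seen so far" */
    typedef struct MedianState
    {
        int     nvalues;
        double *values;
    } MedianState;

    static void
    median_serialize(const MedianState *state, FILE *out)
    {
        /* write the count, then the raw values */
        fwrite(&state->nvalues, sizeof(int), 1, out);
        fwrite(state->values, sizeof(double), state->nvalues, out);
    }

    static MedianState *
    median_deserialize(FILE *in)
    {
        MedianState *state = malloc(sizeof(MedianState));

        fread(&state->nvalues, sizeof(int), 1, in);
        state->values = malloc(state->nvalues * sizeof(double));
        fread(state->values, sizeof(double), state->nvalues, in);

        /* we get back pretty much the same list of values */
        return state;
    }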

>> For sure, it's not for free - it may write to quite a few files. Is it
>> more expensive than what you propose? I'm not sure about that. With
>> your batching scheme, you'll end up with lower number of large batches,
>> and you'll need to read and split them, possibly repeatedly. The
>> batching scheme from hashjoin minimizes this.
>
> My approach only has fewer batches if it elects to have fewer batches,
> which might happen for two reasons:
>  1. A cardinality misestimate. This certainly could happen, but we do
> have useful numbers to work from (we know the number of tuples and
> distincts that we've read so far), so it's far from a blind guess.
>  2. We're concerned about the random I/O from way too many partitions.

OK. We can't really do much with the cardinality estimate.

As for the random IO concerns, I did a quick test to see how this
behaves. I used a HP ProLiant DL380 G5 (i.e. a quite old machine, from
2006-09 if I'm not mistaken). 16GB RAM, RAID10 on 6 x 10k SAS drives,
512MB write cache. So a quite lousy machine, considering today's standards.

I used a simple C program (attached) that creates N files, and writes
into them in a round-robin fashion until a particular file size is
reached. I opted for 64GB total size, 1kB writes.

    ./iotest filecount filesize writesize

File size is in MB, writesize is in bytes. So for example this writes 64
files, each 1GB, using 512B writes.

    ./iotest 64 1024 512
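
In case anyone wants to reproduce this without the attachment, the core
of the program is roughly the following (a simplified sketch; the
attached iotest.c may differ in details):

    #include <stdio.h>
    #include <stdlib.h>

    /* usage: ./iotest filecount filesize_mb writesize_bytes */
    int main(int argc, char **argv)
    {
        int     nfiles = atoi(argv[1]);
        long    filesize = atol(argv[2]) * 1024L * 1024L;
        int     writesize = atoi(argv[3]);
        char   *buf = calloc(writesize, 1);
        FILE  **files = malloc(nfiles * sizeof(FILE *));
        long    written;
        int     i;
        char    name[64];

        for (i = 0; i < nfiles; i++)
        {
            snprintf(name, sizeof(name), "iotest.%d", i);
            files[i] = fopen(name, "wb");
        }

        /* write to the files in a round-robin fashion until each one
         * reaches the requested size */
        for (written = 0; written < filesize; written += writesize)
            for (i = 0; i < nfiles; i++)
                fwrite(buf, 1, writesize, files[i]);

        /* the real test also times an fsync() of each file at the end */
        for (i = 0; i < nfiles; i++)
            fclose(files[i]);

        return 0;
    }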

Measured is duration before/after fsync (in seconds):

    files   |    file size  |  before  fsync |  after fsync
   ---------------------------------------------------------
    32      |      2048     |        290.16  |      294.33
    64      |      1024     |        264.68  |      267.60
    128     |       512     |        278.68  |      283.44
    256     |       256     |        332.11  |      338.45
    1024    |        64     |        419.91  |      425.48
    2048    |        32     |        450.37  |      455.20

So while there is a difference, I don't think it's the 'random I/O wall'
as usually observed on rotational drives. Also, this is 2.6.32 kernel,
and my suspicion is that with a newer one the behaviour would be better.

I also have an SSD in that machine (Intel S3700), so I did the same test
with these results:

    files   |    file size  |  before  fsync |  after fsync
   ---------------------------------------------------------
    32      |      2048     |        445.05  |      464.73
    64      |      1024     |        447.32  |      466.56
    128     |       512     |        446.63  |      465.90
    256     |       256     |        446.64  |      466.19
    1024    |        64     |        511.85  |      523.24
    2048    |        32     |        579.92  |      590.76

So yes, the number of files matter, but I don't think it's strong enough
to draw a clear line on how many batches we allow. Especially
considering how old this machine is (on 3.x kernels, we usually see much
better performance in I/O intensive conditions).


>> I fail to see how this is different from your approach? How can you
>> output any tuples before processing the whole inner relation?
>
> Right, the only thing I avoid is scanning the hash table and dumping out
> the groups.
>
> This isn't a major distinction, more like "my approach does a little
> less work before returning tuples", and I'm not even sure I can defend
> that, so I'll retract this point.
>
>> Your approach is to do multi-level batching, and I was thinking whether
>> it'd be possible to use the same approach (single level). But in
>> retrospect it probably does not make much sense, because the multi-level
>> batching is one of the points of the proposed approach.
>
> Now that I think about it, many of the points we discussed could
> actually work with either approach:
>   * In my approach, if I need more partitions, I could create more in
> much the same way as HashJoin to keep it single-level (as you suggest
> above).
>   * In your approach, if there are too many partitions, you could avoid
> random I/O by intentionally putting tuples from multiple partitions in a
> single file and moving them while reading.
>   * If given a way to write out the partially-computed states, I could
> evict some groups from the hash table to keep an array_agg() bounded.
>
> Our approaches only differ on one fundamental trade-off that I see:
>   (A) My approach requires a hash lookup of an already-computed hash for
> every incoming tuple, not only the ones going into the hash table.
>   (B) Your approach requires scanning the hash table and dumping out the
> states every time the hash table fills up, which therefore requires a
> way to dump out the partial states.
>
> You could probably win the argument by pointing out that (A) is O(N) and
> (B) is O(log2(N)). But I suspect that cost (A) is very low.
>
> Unfortunately, it would take some effort to test your approach because
> we'd actually need a way to write out the partially-computed state, and
> the algorithm itself seems a little more complex. So I'm not really sure
> how to proceed.

I plan to work on this a bit over the next week or two. In any case,
it'll be a limited implementation, but hopefully it will be usable for
some initial testing.

regards
Tomas


Attachment

Re: 9.5: Memory-bounded HashAgg

From
Tomas Vondra
Date:
On 14.8.2014 21:47, Tomas Vondra wrote:
> On 14.8.2014 18:54, Jeff Davis wrote:
>> On Thu, 2014-08-14 at 16:17 +0200, Tomas Vondra wrote:
>>> Either it belongs to the current batch (and either it's in the hash
>>> table, or you add it there), or it's not - in that case write it to a
>>> temp file.
>>
>> I think the part you left out is that you need two files per batch: one
>> for the dumped-out partially-computed state values, and one for the
>> tuples.
>>
>> In other words, you haven't really discussed the step where you reunite
>> the tuples with that partially-computed state.
> 
> No, that's not how the serialize/deserialize should work. The aggregate
> needs to store the state as-is, so that after deserializing it gets
> pretty much the same thing.
> 
> For example, for 'median' the state is the list of all the values
> received so far, and when serializing it you have to write all the
> values out. After deserializing it, you will get the same list of values.
> 
> Some aggregates may use complex data structures that may need more
> elaborate serialize.
> 
>>> For sure, it's not for free - it may write to quite a few files. Is it
>>> more expensive than what you propose? I'm not sure about that. With
>>> your batching scheme, you'll end up with lower number of large batches,
>>> and you'll need to read and split them, possibly repeatedly. The
>>> batching scheme from hashjoin minimizes this.
>>
>> My approach only has fewer batches if it elects to have fewer batches,
>> which might happen for two reasons:
>>  1. A cardinality misestimate. This certainly could happen, but we do
>> have useful numbers to work from (we know the number of tuples and
>> distincts that we've read so far), so it's far from a blind guess. 
>>  2. We're concerned about the random I/O from way too many partitions.
> 
> OK. We can't really do much with the cardinality estimate.
> 
> As for the random IO concerns, I did a quick test to see how this
> behaves. I used a HP ProLiant DL380 G5 (i.e. a quite old machine, from
> 2006-09 if I'm not mistaken). 16GB RAM, RAID10 on 6 x 10k SAS drives,
> 512MB write cache. So a quite lousy machine, considering today's standards.
> 
> I used a simple C program (attached) that creates N files, and writes
> into them in a round-robin fashion until a particular file size is
> reached. I opted for 64GB total size, 1kB writes.
> 
>     ./iotest filecount filesize writesize
> 
> File size is in MB, writesize is in bytes. So for example this writes 64
> files, each 1GB, using 512B writes.
> 
>     ./iotest 64 1024 512
> 
> Measured is duration before/after fsync (in seconds):
> 
>     files   |    file size  |  before  fsync |  after fsync
>    ---------------------------------------------------------
>     32      |      2048     |        290.16  |      294.33
>     64      |      1024     |        264.68  |      267.60
>     128     |       512     |        278.68  |      283.44
>     256     |       256     |        332.11  |      338.45
>     1024    |        64     |        419.91  |      425.48
>     2048    |        32     |        450.37  |      455.20
> 
> So while there is a difference, I don't think it's the 'random I/O wall'
> as usually observed on rotational drives. Also, this is 2.6.32 kernel,
> and my suspicion is that with a newer one the behaviour would be better.
> 
> I also have an SSD in that machine (Intel S3700), so I did the same test
> with these results:
> 
>     files   |    file size  |  before  fsync |  after fsync
>    ---------------------------------------------------------
>     32      |      2048     |        445.05  |      464.73
>     64      |      1024     |        447.32  |      466.56
>     128     |       512     |        446.63  |      465.90
>     256     |       256     |        446.64  |      466.19
>     1024    |        64     |        511.85  |      523.24
>     2048    |        32     |        579.92  |      590.76
> 
> So yes, the number of files matter, but I don't think it's strong enough
> to draw a clear line on how many batches we allow. Especially
> considering how old this machine is (on 3.x kernels, we usually see much
> better performance in I/O intensive conditions).

And just for fun, I did the same test on a workstation with 8GB of RAM,
S3700 SSD, i5-2500 CPU and kernel 3.12. That is, a more modern
hardware / kernel / ... compared to the machine above.

For a test writing 32GB of data (4x the RAM), I got these results:
    files   |    file size  |  before  fsync |  after fsync
   ---------------------------------------------------------
    32      |      1024     |        171.27  |      175.96
    64      |       512     |        165.57  |      170.12
    128     |       256     |        165.29  |      169.95
    256     |       128     |        164.69  |      169.62
    512     |        64     |        163.98  |      168.90
    1024    |        32     |        165.44  |      170.50
    2048    |        16     |        165.97  |      171.35
    4096    |         8     |        166.55  |      173.26

So, no sign of slowdown at all, in this case. I don't have a rotational
disk in the machine at this moment, so I can't repeat the test. But I
don't expect the impact to be much worse than for the old machine.

I'm not sure whether this proves we should not worry about the number of
batches at all - the old kernel / machines will be with us for some
time. However, I'm not a fan of artificially limiting the implementation
because of decade-old machines either.

Tomas





Re: 9.5: Memory-bounded HashAgg

From
Robert Haas
Date:
On Thu, Aug 14, 2014 at 2:21 PM, Jeff Davis <pgsql@j-davis.com> wrote:
> On Thu, 2014-08-14 at 12:53 -0400, Tom Lane wrote:
>> Oh?  So if we have aggregates like array_agg whose memory footprint
>> increases over time, the patch completely fails to avoid bloat?
>
> Yes, in its current form.
>
>> I might think a patch with such a limitation was useful, if it weren't
>> for the fact that aggregates of that nature are a large part of the
>> cases where the planner misestimates the table size in the first place.
>> Any complication that we add to nodeAgg should be directed towards
>> dealing with cases that the planner is likely to get wrong.
>
> In my experience, the planner has a lot of difficulty estimating the
> cardinality unless it's coming from a base table without any operators
> above it (other than maybe a simple predicate). This is probably a lot
> more common than array_agg problems, simply because array_agg is
> relatively rare compared with GROUP BY in general.

I think that's right, and I rather like your (Jeff's) approach.  It's
definitely true that we could do better if we have a mechanism for
serializing and deserializing group states, but (1) I think an awful
lot of cases would get an awful lot better even just with the approach
proposed here and (2) I doubt we would make the
serialization/deserialization interfaces mandatory, so even if we had
that we'd probably want a fallback strategy anyway.

Furthermore, I don't really see that we're backing ourselves into a
corner here.  If prohibiting creation of additional groups isn't
sufficient to control memory usage, but we have
serialization/deserialization functions, we can just pick an arbitrary
subset of the groups that we're storing in memory and spool their
transition states off to disk, thus reducing memory even further.  I
understand Tomas' point to be that this is quite different from what
we do for hash joins, but I think it's a different problem.  In the
case of a hash join, there are two streams of input tuples, and we've
got to batch them in compatible ways.  If we were to, say, exclude an
arbitrary subset of tuples from the hash table instead of basing it on
the hash code, we'd have to test *every* outer tuple against the hash
table for *every* batch.  That would incur a huge amount of additional
cost vs. being able to discard outer tuples once the batch to which
they pertain has been processed.

But the situation here isn't comparable, because there's only one
input stream.  I'm pretty sure we'll want to keep track of which
transition states we've spilled due to lack of memory as opposed to
those which were never present in the table at all, so that we can
segregate the unprocessed tuples that pertain to spilled transition
states from the ones that pertain to a group we haven't begun yet.
And it might be that if we know (or learn as we go along) that we're
going to vastly blow out work_mem it makes sense to use batching to
segregate the tuples that we decide not to process onto N tapes binned
by hash code, so that we have a better chance that future batches will
be the right size to fit in memory.  But I'm not convinced that
there's a compelling reason why the *first* batch has to be chosen by
hash code; we're actually best off picking any arbitrary set of groups
that does the best job reducing the amount of data remaining to be
processed, at least if the transition states are fixed size and maybe
even if they aren't.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: 9.5: Memory-bounded HashAgg

From
Tomas Vondra
Date:
On 10.8.2014 23:26, Jeff Davis wrote:
> This patch is requires the Memory Accounting patch, or something similar
> to track memory usage.
>
> The attached patch enables hashagg to spill to disk, which means that
> hashagg will contain itself to work_mem even if the planner makes a
> bad misestimate of the cardinality.
>
> This is a well-known concept; there's even a Berkeley homework
> assignment floating around to implement it -- in postgres 7.2, no
> less. I didn't take the exact same approach as the homework assignment
> suggests, but it's not much different, either. My apologies if some
> classes are still using this as a homework assignment, but postgres
> needs to eventually have an answer to this problem.
>
> Included is a GUC, "enable_hashagg_disk" (default on), which allows
> the planner to choose hashagg even if it doesn't expect the hashtable
> to fit in memory. If it's off, and the planner misestimates the
> cardinality, hashagg will still use the disk to contain itself to
> work_mem.
>
> One situation that might surprise the user is if work_mem is set too
> low, and the user is *relying* on a misestimate to pick hashagg. With
> this patch, it would end up going to disk, which might be
> significantly slower. The solution for the user is to increase
> work_mem.
>
> Rough Design:
>
> Change the hash aggregate algorithm to accept a generic "work item",
> which consists of an input file as well as some other bookkeeping
> information.
>
> Initially prime the algorithm by adding a single work item where the
> file is NULL, indicating that it should read from the outer plan.
>
> If the memory is exhausted during execution of a work item, then
> continue to allow existing groups to be aggregated, but do not allow new
> groups to be created in the hash table. Tuples representing new groups
> are saved in an output partition file referenced in the work item that
> is currently being executed.
>
> When the work item is done, emit any groups in the hash table, clear the
> hash table, and turn each output partition file into a new work item.
>
> Each time through at least some groups are able to stay in the hash
> table, so eventually none will need to be saved in output partitions, no
> new work items will be created, and the algorithm will terminate. This
> is true even if the number of output partitions is always one.
>
> Open items:
>    * costing
>    * EXPLAIN details for disk usage
>    * choose number of partitions intelligently
>    * performance testing
>
> Initial tests indicate that it can be competitive with sort+groupagg
> when the disk is involved, but more testing is required.
>
> Feedback welcome.

I've been working on this for a few hours - getting familiar with the
code, testing queries etc. Two comments.

1) Apparently there's something broken, because with this:

   create table table_b (fk_id int, val_a int, val_b int);
   insert into table_b
      select i, mod(i,1000), mod(i,1000)
        from generate_series(1,10000000) s(i);
   analyze table_b;

   I get this:

   set work_mem = '8MB';
   explain analyze select fk_id, count(*)
           from table_b where val_a < 50 and val_b < 50 group by 1;
   > The connection to the server was lost. Attempting reset: Failed.

   Stacktrace attached, but apparently there's a segfault in
   advance_transition_function when accessing pergroupstate.

   This happened for all queries that I tried, once they needed to do
   the batching.

2) Using the same hash value both for dynahash and batching seems
   really fishy to me. I'm not familiar with dynahash, but I'd bet
   the way it's done now will lead to bad distribution in the hash
   table (some buckets will always be empty in some batches, etc.).

   This is why hashjoin tries so hard to use non-overlapping parts
   of the hash for batchno/bucketno (a simplified sketch of that split
   is below, after this list).

   The hashjoin implements its own hash table, which makes it clear
   how the bucket is derived from the hash value. I'm not sure how
   dynahash does that, but I'm pretty sure we can't just reuse the hash
   value like this.

   I see two options - compute our own hash value, or somehow derive
   a new one (e.g. by doing "hashvalue XOR random_seed"). I'm not sure
   the latter would work, though.
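
For reference, the hashjoin split I'm referring to works roughly like
this (simplified from ExecHashGetBucketAndBatch in nodeHash.c; the
function name and arguments below are mine):

    /* derive bucket and batch from non-overlapping parts of the hash */
    static void
    get_bucket_and_batch(uint32 hashvalue, int log2_nbuckets,
                         int nbuckets, int nbatch,
                         int *bucketno, int *batchno)
    {
        /* low bits pick the bucket */
        *bucketno = hashvalue & (nbuckets - 1);

        /* the next bits pick the batch, so the two never overlap */
        if (nbatch > 1)
            *batchno = (hashvalue >> log2_nbuckets) & (nbatch - 1);
        else
            *batchno = 0;
    }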

regards
Tomas

Attachment

Re: 9.5: Memory-bounded HashAgg

From
Tomas Vondra
Date:
On 15.8.2014 19:53, Robert Haas wrote:
> On Thu, Aug 14, 2014 at 2:21 PM, Jeff Davis <pgsql@j-davis.com> wrote:
>> On Thu, 2014-08-14 at 12:53 -0400, Tom Lane wrote:
>>> Oh?  So if we have aggregates like array_agg whose memory footprint
>>> increases over time, the patch completely fails to avoid bloat?
>>
>> Yes, in its current form.
>>
>>> I might think a patch with such a limitation was useful, if it weren't
>>> for the fact that aggregates of that nature are a large part of the
>>> cases where the planner misestimates the table size in the first place.
>>> Any complication that we add to nodeAgg should be directed towards
>>> dealing with cases that the planner is likely to get wrong.
>>
>> In my experience, the planner has a lot of difficulty estimating the
>> cardinality unless it's coming from a base table without any operators
>> above it (other than maybe a simple predicate). This is probably a lot
>> more common than array_agg problems, simply because array_agg is
>> relatively rare compared with GROUP BY in general.
> 
> I think that's right, and I rather like your (Jeff's) approach.  It's
> definitely true that we could do better if we have a mechanism for
> serializing and deserializing group states, but (1) I think an awful
> lot of cases would get an awful lot better even just with the approach
> proposed here and (2) I doubt we would make the
> serialization/deserialization interfaces mandatory, so even if we had
> that we'd probably want a fallback strategy anyway.

I certainly agree that we need Jeff's approach even if we can do better
in some cases (when we are able to serialize/deserialize the states).

And yes, (mis)estimating the cardinalities is a big issue, and certainly
a source of many problems.


> Furthermore, I don't really see that we're backing ourselves into a
> corner here.  If prohibiting creation of additional groups isn't
> sufficient to control memory usage, but we have
> serialization/deserialization functions, we can just pick an arbitrary
> subset of the groups that we're storing in memory and spool their
> transition states off to disk, thus reducing memory even further.  I
> understand Tomas' point to be that this is quite different from what
> we do for hash joins, but I think it's a different problem.  In the
> case of a hash join, there are two streams of input tuples, and we've
> got to batch them in compatible ways.  If we were to, say, exclude an
> arbitrary subset of tuples from the hash table instead of basing it on
> the hash code, we'd have to test *every* outer tuple against the hash
> table for *every* batch.  That would incur a huge amount of additional
> cost vs. being able to discard outer tuples once the batch to which
> they pertain has been processed.

Being able to batch inner and outer relations in a matching way is
certainly one of the reasons why hashjoin uses that particular scheme.
There are other reasons, though - for example being able to answer 'Does
this group belong to this batch?' quickly, and automatically update
number of batches.

I'm not saying the lookup is extremely costly, but I'd be very surprised
if it was as cheap as modulo on a 32-bit integer. Not saying it's the
dominant cost here, but memory bandwidth is quickly becoming one of the
main bottlenecks.


> But the situation here isn't comparable, because there's only one
> input stream.  I'm pretty sure we'll want to keep track of which
> transition states we've spilled due to lack of memory as opposed to
> those which were never present in the table at all, so that we can
> segregate the unprocessed tuples that pertain to spilled transition
> states from the ones that pertain to a group we haven't begun yet.

Why would that be necessary or useful? I don't see a reason for tracking
that / segregating the tuples.

> And it might be that if we know (or learn as we go along) that we're
> going to vastly blow out work_mem it makes sense to use batching to
> segregate the tuples that we decide not to process onto N tapes binned
> by hash code, so that we have a better chance that future batches will
> be the right size to fit in memory.  But I'm not convinced that
> there's a compelling reason why the *first* batch has to be chosen by
> hash code; we're actually best off picking any arbitrary set of groups
> that does the best job reducing the amount of data remaining to be
> processed, at least if the transition states are fixed size and maybe
> even if they aren't.

If you don't choose the first batch by hash code, it's over, IMHO. You
can't just redo that later easily, because the HashWork items are
already treated separately.

regards
Tomas



Re: 9.5: Memory-bounded HashAgg

From
Jeff Davis
Date:
On Fri, 2014-08-15 at 13:53 -0400, Robert Haas wrote:
> I think that's right, and I rather like your (Jeff's) approach.  It's
> definitely true that we could do better if we have a mechanism for
> serializing and deserializing group states, but (1) I think an awful
> lot of cases would get an awful lot better even just with the approach
> proposed here and (2) I doubt we would make the
> serialization/deserialization interfaces mandatory, so even if we had
> that we'd probably want a fallback strategy anyway.

Thank you for taking a look.

To solve the problem for array_agg, that would open up two potentially
lengthy discussions:

1. Trying to support non-serialized representations (like
ArrayBuildState for array_agg) as a real type rather than using
"internal".

2. What changes should we make to the aggregate API? As long as we're
changing/extending it, should we go the whole way and support partial
aggregation[1] (particularly useful for parallelism)?

Both of those discussions are worth having, and perhaps they can happen
in parallel as I wrap up this patch.

I'll see whether I can get consensus that my approach is (potentially)
commit-worthy, and your statement that it (potentially) solves a real
problem is a big help.

Regards,
    Jeff Davis

[1]
http://blogs.msdn.com/b/craigfr/archive/2008/01/18/partial-aggregation.aspx





Re: 9.5: Memory-bounded HashAgg

From
"Tomas Vondra"
Date:
On 19 August 2014, 9:52, Jeff Davis wrote:
> On Fri, 2014-08-15 at 13:53 -0400, Robert Haas wrote:
>> I think that's right, and I rather like your (Jeff's) approach.  It's
>> definitely true that we could do better if we have a mechanism for
>> serializing and deserializing group states, but (1) I think an awful
>> lot of cases would get an awful lot better even just with the approach
>> proposed here and (2) I doubt we would make the
>> serialization/deserialization interfaces mandatory, so even if we had
>> that we'd probably want a fallback strategy anyway.
>
> Thank you for taking a look.
>
> To solve the problem for array_agg, that would open up two potentially
> lengthy discussions:
>
> 1. Trying to support non-serialized representations (like
> ArrayBuildState for array_agg) as a real type rather than using
> "internal".

That's certainly an option, and it's quite straightforward. The downside
of it is that you either prevent the aggregates from using the most
efficient state form (e.g. the array_agg might use a simple array as a
state) or you cause a proliferation of types with no other purpose.


> 2. What changes should we make to the aggregate API? As long as we're
> changing/extending it, should we go the whole way and support partial
> aggregation[1] (particularly useful for parallelism)?

Maybe, but not in this patch please. That's far wider scope, and while
considering it when designing API changes is probably a good idea, we
should resist the attempt to do those two things in the same patch.

> Both of those discussions are worth having, and perhaps they can happen
> in parallel as I wrap up this patch.

Exactly.

> I'll see whether I can get consensus that my approach is (potentially)
> commit-worthy, and your statement that it (potentially) solves a real
> problem is a big help.

IMHO it's a step in the right direction. It may not go as far as I'd like,
but that's OK.

regards
Tomas




Re: 9.5: Memory-bounded HashAgg

From
Robert Haas
Date:
On Sun, Aug 17, 2014 at 1:17 PM, Tomas Vondra <tv@fuzzy.cz> wrote:
> Being able to batch inner and outer relations in a matching way is
> certainly one of the reasons why hashjoin uses that particular scheme.
> There are other reasons, though - for example being able to answer 'Does
> this group belong to this batch?' quickly, and automatically update
> number of batches.
>
> I'm not saying the lookup is extremely costly, but I'd be very surprised
> if it was as cheap as modulo on a 32-bit integer. Not saying it's the
> dominant cost here, but memory bandwidth is quickly becoming one of the
> main bottlenecks.

Well, I think you're certainly right that a hash table lookup is more
expensive than modulo on a 32-bit integer; so much is obvious.  But if
the load factor is not too large, I think that it's not a *lot* more
expensive, so it could be worth it if it gives us other advantages.
As I see it, the advantage of Jeff's approach is that it doesn't
really matter whether our estimates are accurate or not.  We don't
have to decide at the beginning how many batches to do, and then
possibly end up using too much or too little memory per batch if we're
wrong; we can let the amount of memory actually used during execution
determine the number of batches.  That seems good.  Of course, a hash
join can increase the number of batches on the fly, but only by
doubling it, so you might go from 4 batches to 8 when 5 would really
have been enough.  And a hash join also can't *reduce* the number of
batches on the fly, which might matter a lot.  Getting the number of
batches right avoids I/O, which is a lot more expensive than CPU.

>> But the situation here isn't comparable, because there's only one
>> input stream.  I'm pretty sure we'll want to keep track of which
>> transition states we've spilled due to lack of memory as opposed to
>> those which were never present in the table at all, so that we can
>> segregate the unprocessed tuples that pertain to spilled transition
>> states from the ones that pertain to a group we haven't begun yet.
>
> Why would that be necessary or useful? I don't see a reason for tracking
> that / segregating the tuples.

Suppose there are going to be three groups: A, B, C.  Each is an
array_agg(), and they're big, so only one of them will fit in work_mem at
a time.  However, we don't know that at the beginning, either because
we don't write the code to try or because we do write that code but
our cardinality estimates are way off; instead, we're under the
impression that all three will fit in work_mem.  So we start reading
tuples.  We see values for A and B, but we don't see any values for C
because those all occur later in the input.  Eventually, we run short
of memory and cut off creation of new groups.  Any tuples for C are
now going to get written to a tape from which we'll later reread them.
After a while, even that proves insufficient and we spill the
transition state for B to disk.  Any further tuples that show up for C
will need to be written to tape as well.  We continue processing and
finish group A.

Now it's time to do batch #2.  Presumably, we begin by reloading the
serialized transition state for group B.  To finish group B, we must
look at all the tuples that might possibly fall in that group.  If all
of the remaining tuples are on a single tape, we'll have to read all
the tuples in group B *and* all the tuples in group C; we'll
presumably rewrite the tuples that are not part of this batch onto a
new tape, which we'll then process in batch #3.  But if we took
advantage of the first pass through the input to put the tuples for
group B on one tape and the tuples for group C on another tape, we can
be much more efficient - just read the remaining tuples for group B,
not mixed with anything else, and then read a separate tape for group
C.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: 9.5: Memory-bounded HashAgg

From
Jeff Davis
Date:
On Wed, 2014-08-20 at 14:32 -0400, Robert Haas wrote:
> Well, I think you're certainly right that a hash table lookup is more
> expensive than modulo on a 32-bit integer; so much is obvious.  But if
> the load factor is not too large, I think that it's not a *lot* more
> expensive, so it could be worth it if it gives us other advantages.
> As I see it, the advantage of Jeff's approach is that it doesn't
> really matter whether our estimates are accurate or not.  We don't
> have to decide at the beginning how many batches to do, and then
> possibly end up using too much or too little memory per batch if we're
> wrong; we can let the amount of memory actually used during execution
> determine the number of batches.  That seems good.  Of course, a hash
> join can increase the number of batches on the fly, but only by
> doubling it, so you might go from 4 batches to 8 when 5 would really
> have been enough.  And a hash join also can't *reduce* the number of
> batches on the fly, which might matter a lot.  Getting the number of
> batches right avoids I/O, which is a lot more expensive than CPU.

My approach uses partition counts that are powers-of-two also, so I
don't think that's a big differentiator. In principle my algorithm could
adapt to other partition counts, but I'm not sure how big of an
advantage there is.

I think the big benefit of my approach is that it doesn't needlessly
evict groups from the hashtable. Consider input like 0, 1, 0, 2, ..., 0,
N. For large N, if you evict group 0, you have to write out about N
tuples; but if you leave it in, you only have to write out about N/2
tuples. The hashjoin approach doesn't give you any control over
eviction, so you only have about 1/P chance of saving the skew group
(where P is the ultimate number of partitions). With my approach, we'd
always keep the skew group in memory (unless we're very unlucky, and the
hash table fills up before we even see the skew value).
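
(For the record, that kind of input is trivial to generate for testing;
something like the following, with the sizes being illustrative only:)

    -- every other tuple belongs to the skew group 0, so evicting
    -- group 0 roughly doubles the number of tuples spilled to disk
    SELECT CASE WHEN i % 2 = 1 THEN 0 ELSE i / 2 END AS key
    FROM generate_series(1, 2000000) s(i);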

Regards,
    Jeff Davis





Re: 9.5: Memory-bounded HashAgg

From
Heikki Linnakangas
Date:
Summary of this thread so far:

There was a lot of discussion comparing this with Tomas Vondra's Hash 
Join patch. The conclusion was that while it would be nice to be able to 
dump transition state to disk, for aggregates like array_agg, the patch 
is fine as it is. Dumping transition states would require much more 
work, and this is already useful without it. Moreover, solving the 
array_agg problem later won't require a rewrite; rather, it'll build on 
top of this.

You listed a number of open items in the original post, and these are 
still outstanding:

> * costing
> * EXPLAIN details for disk usage
> * choose number of partitions intelligently
> * performance testing

I think this is enough for this commitfest - we have consensus on the 
design. For the next one, please address those open items, and resubmit.

- Heikki




Re: 9.5: Memory-bounded HashAgg

From
Jeff Davis
Date:
On Tue, 2014-08-26 at 12:39 +0300, Heikki Linnakangas wrote:
> I think this is enough for this commitfest - we have consensus on the 
> design. For the next one, please address those open items, and resubmit.

Agreed, return with feedback.

I need to get the accounting patch in first, which needs to address some
performance issues, but there's a chance of wrapping those up quickly.

Regards,
    Jeff Davis





Re: 9.5: Memory-bounded HashAgg

From
Tomas Vondra
Date:
On 26.8.2014 21:38, Jeff Davis wrote:
> On Tue, 2014-08-26 at 12:39 +0300, Heikki Linnakangas wrote:
>> I think this is enough for this commitfest - we have consensus on
>> the design. For the next one, please address those open items, and
>> resubmit.
> 
> Agreed, return with feedback.
> 
> I need to get the accounting patch in first, which needs to address 
> some performance issues, but there's a chance of wrapping those up 
> quickly.

Sounds good to me.

I'd like to coordinate our efforts on this a bit, if you're interested.

I've been working on the hashjoin-like batching approach PoC (because I
proposed it, so it's fair I do the work), and I came to the conclusion
that it's pretty much impossible to implement on top of dynahash. I
ended up replacing it with a hashtable (similar to the one in the
hashjoin patch, unsurprisingly), which supports the batching approach
well, and is more memory efficient and actually faster (I see ~25%
speedup in most cases, although YMMV).

I plan to address this in 4 patches:

(1) replacement of dynahash by the custom hash table (done)

(2) memory accounting (not sure what's your plan, I've used the
    approach I proposed on 23/8 for now, with a few bugfixes/cleanups)

(3) applying your HashWork patch on top of this (I have this mostly
    completed, but need to do more testing over the weekend)

(4) extending this with the batching I proposed, initially only for
    aggregates with states that we can serialize/deserialize easily
    (e.g. types passed by value) - I'd like to hack on this next week

So at this point I have (1) and (2) pretty much ready, (3) is almost
complete and I plan to start hacking on (4). Also, this does not address
the open items listed in your message.


But I agree this is more complex than the patch you proposed. So if you
choose to pursue your patch, I have no problem with that - I'll then
rebase my changes on top of your patch and submit them separately.


regards
Tomas



Re: 9.5: Memory-bounded HashAgg

From
Tomas Vondra
Date:
On 29.8.2014 00:02, Tomas Vondra wrote:
> On 26.8.2014 21:38, Jeff Davis wrote:
>> On Tue, 2014-08-26 at 12:39 +0300, Heikki Linnakangas wrote:
>>> I think this is enough for this commitfest - we have consensus on
>>> the design. For the next one, please address those open items, and
>>> resubmit.
>>
>> Agreed, return with feedback.
>>
>> I need to get the accounting patch in first, which needs to address
>> some performance issues, but there's a chance of wrapping those up
>> quickly.
>
> Sounds good to me.
>
> I'd like to coordinate our efforts on this a bit, if you're interested.
>
> I've been working on the hashjoin-like batching approach PoC (because I
> proposed it, so it's fair I do the work), and I came to the conclusion
> that it's pretty much impossible to implement on top of dynahash. I
> ended up replacing it with a hashtable (similar to the one in the
> hashjoin patch, unsurprisingly), which supports the batching approach
> well, and is more memory efficient and actually faster (I see ~25%
> speedup in most cases, although YMMV).
>
> I plan to address this in 4 patches:
>
> (1) replacement of dynahash by the custom hash table (done)
>
> (2) memory accounting (not sure what's your plan, I've used the
>     approach I proposed on 23/8 for now, with a few bugfixes/cleanups)
>
> (3) applying your HashWork patch on top of this (I have this mostly
>     completed, but need to do more testing over the weekend)
>
> (4) extending this with the batching I proposed, initially only for
>     aggregates with states that we can serialize/deserialize easily
>     (e.g. types passed by value) - I'd like to hack on this next week
>
> So at this point I have (1) and (2) pretty much ready, (3) is almost
> complete and I plan to start hacking on (4). Also, this does not address
> the open items listed in your message.

Hi,

Attached are patches implementing this. In the end, I decided to keep
the two approaches separate for now, i.e. either the HashWork-based
batching, or hashjoin-like batching. It's easier to play with when it's
separate, and I think we need to figure out how the two approaches fit
together first (if they fit at all).

Shared patches:

(1) hashagg-dense-allocation-v1.patch

    - replacement for dynahash, with dense allocation (essentially the
      same idea as in the hashjoin patch)

    - this is necessary for the hashjoin-like batching, because dynahash
      does not free memory

    - it also makes the hashagg less memory expensive and faster (see
      the test results further down)

    - IMHO this part is in pretty good shape, i.e. I don't expect bugs
      or issues in this (although I do expect pushback against replacing
      dynahash, which is code widely used throughout the codebase).

(2) memory-accounting-v1.patch

    - based on the ideas discussed in the 'memory accounting thread',
      with some improvements

    - this really needs a lot of work, the current code works but there
      are various subtle issues - essentially this should be replaced
      with whatever comes from the memory accounting thread


These two patches need to be applied first, before using either (3a-b)
or (4), implementing the two batching approaches:

(3a) hashagg-batching-jeff-v1.patch

     - essentially a 1:1 of Jeff's patch, applied on top of the dense-
       allocated hash table, mentioned in (1)

     - I also ran into a few bugs causing segfaults IIRC (I'll report
       them in a separate message, if I remember them)

(3b) hashagg-batching-jeff-pt2-v1.patch

     - this adds two things - basic estimation of how many partitions
       to use, and basic info to explain

     - the idea behind estimating number of partitions is quite simple:

       We don't really need to decide until the first tuple needs to be
       stored - when that happens, see how many more tuples we expect,
       and use this ratio as the number of partitions (or rather the
       nearest power of 2). In most cases this overestimates the number
       of partitions, because it assumes that for the same number of
       tuples we'll see the same number of new groups. That's most
       likely untrue, as some of the groups are already present in the
       hash table.

       This may be further improved - first, at this stage we only know
       the expected number of input tuples. Second, with various
       aggregates the existing states may grow as more tuples are added
       to the state.

       So at the end we can look at how many tuples we actually got,
       and how much memory we actually consumed, and use that to decide
       on the size for the second-level HashWork items. For example, if
       we expected N tuples, but actually got 2*N, and at the end of
       the initial batch we ended up with 2*work_mem, we may choose
       to do 4 partitions in the second step - that way we're more
       likely not to exceed work_mem, and we can do that right away.

       I believe this might effectively limit the necessary HashWork
       levels to 2:

           * initial scan
           * 1st level : # of partitions determined on the first tuple
           * 2nd level : # of partitions determined at the end of the
                         initial scan

       Does that make sense? (A rough sketch of this estimate is below,
       after this list.)

     - regarding the info added to EXPLAIN, I came to the conclusion that
       these values are interesting:

       * number of batches - how many HashWork items were created

       * number of rebatches - number of times a HashWork is split into
                               partitions

       * rescan ratio - number of tuples that had to be stored into a
                        batch, and then read again

                      - this may be higher than 100% if there are multiple
                        levels of HashWork items, so a single tuple may
                        be read/stored repeatedly because of using too
                        low a number of partitions

       * min/max partitions size (probably not as useful as I thought)
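
As promised above, a rough sketch of the partition-count estimate used
in (3b); the function and variable names below are mine, not the ones in
the actual patch:

    /* how many times more input do we still expect to see at the
     * moment the first tuple has to be spilled? */
    static int
    choose_num_partitions(double tuples_seen, double tuples_expected)
    {
        double  seen = (tuples_seen > 1.0) ? tuples_seen : 1.0;
        double  remaining_ratio = (tuples_expected - tuples_seen) / seen;
        int     npartitions = 2;

        /* round up to the nearest power of 2, with an arbitrary cap */
        while (npartitions < remaining_ratio && npartitions < 256)
            npartitions *= 2;

        return npartitions;
    }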

And the hashjoin-like batching (which is in considerably less mature
state compared to the previous patch):

(4) hashagg-batching-hashjoin-v1.patch

    - there's not much to say about the principle, it's pretty much the
      same as in hashjoin, and uses a single level of batches (as
      opposed to the tree-ish structure of HashWork items)

    - I added similar info to explain (especially the rescan ratio)

    - currently this only supports aggregates with states passed by
      value (e.g. COUNT(*))

    - extension to known types seems straightforward, supporting
      'internal' will require more work


So either you apply (1), (2), (3a) and (3b), or (1), (2) and (4).

All the patches currently pass 'make installcheck', except for a few
failures that are caused by different order of rows in the result (which
is really an issue in the test itself, not using an ORDER BY clause and
expecting sorted output).


Regarding memory contexts
-------------------------

Both patches measure only the memory used for the hash table, not the
whole aggcontext, even though the aggcontext is really what should be
measured. For aggregates using passed-by-value states this does not make
any difference, but passed-by-ref states are allocated in aggcontext.

For example array_agg creates sub-contexts of aggcontext for each group.

So I think the hierarchy of memory contexts will require some rethinking,
because we want/need to throw away the states between partitions. As the
states currently live in aggcontext, that's difficult (we'd have to redo
the whole initialization).


Work_mem sizes
--------------

Another problem with the current memory accounting is that it tracks
blocks, not individual palloc/pfree calls. However, AllocSet keeps some
of the blocks allocated for future use, which confuses the accounting.
This only happens with small work_mem values; values like 8MB or more
seem to work fine. I'm not sure what the final accounting patch will look
like, but I expect it to solve this issue.


Testing and benchmarking
------------------------

I also did some basic testing, with three datasets - the testing scripts
and results are attached in the hashagg-testing.tgz. See the
hashagg-bench.sql for details - it creates three tables: small (1M),
medium (10M) and large (50M) with columns with different cardinalities.

Then a series of GROUP BY queries is executed - query "a" has 1:1 groups
(i.e. 1 group per table row), "b" 1:10 (10 rows per group), "c" 1:100
and "d" only 100 groups in total. These queries are executed with
different work_mem values (64MB to 1GB), and the durations are measured.
See the hashagg-bench.sql script (in the .tgz) for details.
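
To give an idea of the shape of those queries (the actual script is in
the attached tarball and may differ; the column names here are made up):

    SET work_mem = '128MB';
    SELECT val_1,   count(*) FROM large GROUP BY 1;  -- "a": ~1 row per group
    SELECT val_10,  count(*) FROM large GROUP BY 1;  -- "b": 10 rows per group
    SELECT val_100, count(*) FROM large GROUP BY 1;  -- "c": 100 rows per group
    SELECT val_d,   count(*) FROM large GROUP BY 1;  -- "d": 100 groups total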

Attached are two CSV files containing both the raw results (4 runs per
query) and aggregated results (average of the runs), plus logs with the
complete output and EXPLAIN (ANALYZE) plans of the queries for inspection.

Attached are two charts for the large dataset (50M), because it nicely
illustrates the differences - for work_mem=1024MB and work_mem=128MB.

In general, it shows that for this set of queries:

   * Dense allocation gives ~20% speedup (and this is true for the
     other datasets). The only case when this is not true is query "a"
     but that's the query not using HashAggregate (so the dense
     allocation has nothing to do with this, AFAIK).

   * The difference between the two approaches is rather small.
     Sometimes Jeff's approach is faster, sometimes the hashjoin-like
     batching is faster.

   * There may be cases where we actually slow down queries, because we
     trigger batching (regardless of the approach). This is a
     feature, not a bug. Either we want to respect work_mem or not.

It's important to say, however, that this test is extremely simplistic
and makes it very easy for the planner to get the number of groups
reasonably right, as the queries group by a single column with a quite
well-known cardinality. In practice, that's hardly the case, and incorrect
estimates are probably the place where the differences between the
approaches will be most significant.

Also, the 'large' dataset is not really as large as it should be. 50M
rows is not that much I guess.

I think we should create a wider set of tests, which should give us some
insight into proper costing etc.


Tomas

Attachment

Re: 9.5: Memory-bounded HashAgg

From
Tomas Vondra
Date:
On 4.9.2014 00:42, Tomas Vondra wrote:
>
> Attached are two CSV files contain both raw results (4 runs per query),
> and aggregated results (average of the runs), logs with complete logs
> and explain (analyze) plans of the queries for inspection.

Of course, I forgot to attach the CSV files ... here they are.

Tomas

Attachment

Re: 9.5: Memory-bounded HashAgg

From
Tomas Vondra
Date:
On 20.8.2014 20:32, Robert Haas wrote:
> On Sun, Aug 17, 2014 at 1:17 PM, Tomas Vondra <tv@fuzzy.cz> wrote:
>> Being able to batch inner and outer relations in a matching way is
>> certainly one of the reasons why hashjoin uses that particular scheme.
>> There are other reasons, though - for example being able to answer 'Does
>> this group belong to this batch?' quickly, and automatically update
>> number of batches.
>>
>> I'm not saying the lookup is extremely costly, but I'd be very surprised
>> if it was as cheap as modulo on a 32-bit integer. Not saying it's the
>> dominant cost here, but memory bandwidth is quickly becoming one of the
>> main bottlenecks.
> 
> Well, I think you're certainly right that a hash table lookup is more
> expensive than modulo on a 32-bit integer; so much is obvious.  But if
> the load factor is not too large, I think that it's not a *lot* more
> expensive, so it could be worth it if it gives us other advantages.

Yes, that may be true. I'm not opposed to Jeff's approach in general -
it's certainly a nice solution for cases with fixed-size aggregate
states.

But I still don't see how it could handle the aggregates with growing
aggregate state (which is the case that troubles me, because that's what
we see in our workloads).

> As I see it, the advantage of Jeff's approach is that it doesn't
> really matter whether our estimates are accurate or not.  We don't
> have to decide at the beginning how many batches to do, and then
> possibly end up using too much or too little memory per batch if we're
> wrong; we can let the amount of memory actually used during execution
> determine the number of batches.  That seems good.  Of course, a hash

Yes. I think that maybe we could use Jeff's approach even for 'growing
aggregate state' case, assuming we can serialize the aggregate states
and release the memory properly.

First, the problem with the current hash table used in HashAggregate
(i.e. dynahash) is that it never actually frees memory - when you do
HASH_REMOVE, it only moves the entry to a freelist for future reuse.
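
For illustration only - a minimal sketch of that point, assuming an
already-created dynahash table keyed by a uint32 (the helper name is
made up, it's not from any patch):

#include "postgres.h"
#include "utils/hsearch.h"

/*
 * drop_group - hypothetical helper, for illustration only.
 *
 * HASH_REMOVE unlinks the entry and parks it on the table's internal
 * freelist for reuse; it does not pfree anything, so the memory stays
 * charged to the hash table's memory context even after the "removal".
 */
static void
drop_group(HTAB *htab, uint32 key)
{
	bool		found;

	(void) hash_search(htab, &key, HASH_REMOVE, &found);

	/*
	 * Whether or not 'found' is true, no memory has been returned to
	 * the containing MemoryContext; the entry is merely recyclable.
	 */
}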

Imagine a workload where you initially see only 1 tuple for each group
before work_mem gets full. At that point you stop adding new groups, but
the current ones will grow. Even if you know how to serialize the
aggregate states (which we don't), you're in trouble because the initial
state is small (only 1 tuple was passed to the group) and most of the
memory is stuck in dynahash.

> join can increase the number of batches on the fly, but only by
> doubling it, so you might go from 4 batches to 8 when 5 would really
> have been enough.  And a hash join also can't *reduce* the number of
> batches on the fly, which might matter a lot.  Getting the number of
> batches right avoids I/O, which is a lot more expensive than CPU.

Regarding the estimates, I don't see much difference between the two
approaches when handling this issue.

It's true you can wait with deciding how many partitions (aka batches)
to create until work_mem is full, at which point you have more
information than at the very beginning. You know how many tuples you've
already seen, how many tuples you expect (which is however only an
estimate etc.). And you may use that to estimate the number of
partitions to create.

That however comes at a cost - it's not really a memory-bounded hash
aggregate, because you explicitly allow exceeding work_mem as more
tuples for existing groups arrive.

Also, no one really says the initial estimate of how many tuples will be
aggregated is correct. It's about as unreliable as the group count
estimate. So how exactly are you going to estimate the partitions?

Considering this, I doubt being able to choose an arbitrary number of
partitions (instead of only powers of 2) is really an advantage.

Reducing the number of partitions might matter, but in my experience
most estimation errors are underestimations, because we assume
independence where in practice the columns are dependent, etc.

I agree that getting the batch count right is important, but OTOH with
hash join, using more, smaller batches is often significantly faster than
using one large one. So it depends.

What I think we should prevent is under-estimating the number of batches,
because in that case you have to read the whole batch, write part of it
out again, and then read it again, instead of just writing it once (into
two files). Reading a tuple from a batch only to write it to another
batch is not really efficient.


>>> But the situation here isn't comparable, because there's only one
>>> input stream.  I'm pretty sure we'll want to keep track of which
>>> transition states we've spilled due to lack of memory as opposed to
>>> those which were never present in the table at all, so that we can
>>> segregate the unprocessed tuples that pertain to spilled transition
>>> states from the ones that pertain to a group we haven't begun yet.
>>
>> Why would that be necessary or useful? I don't see a reason for tracking
>> that / segregating the tuples.
> 
> Suppose there are going to be three groups: A, B, C.  Each is an
> array_agg(), and they're big, so only one of them will fit in work_mem at
> a time.  However, we don't know that at the beginning, either because
> we don't write the code to try or because we do write that code but
> our cardinality estimates are way off; instead, we're under the
> impression that all three will fit in work_mem.  So we start reading
> tuples.  We see values for A and B, but we don't see any values for C
> because those all occur later in the input.  Eventually, we run short
> of memory and cut off creation of new groups.  Any tuples for C are
> now going to get written to a tape from which we'll later reread them.
> After a while, even that proves insufficient and we spill the
> transition state for B to disk.  Any further tuples that show up for C
> will need to be written to tape as well.  We continue processing and
> finish group A.
> 
> Now it's time to do batch #2.  Presumably, we begin by reloading the
> serialized transition state for group B.  To finish group B, we must
> look at all the tuples that might possibly fall in that group.  If all
> of the remaining tuples are on a single tape, we'll have to read all
> the tuples in group B *and* all the tuples in group C; we'll
> presumably rewrite the tuples that are not part of this batch onto a
> new tape, which we'll then process in batch #3.  But if we took
> advantage of the first pass through the input to put the tuples for
> group B on one tape and the tuples for group C on another tape, we can
> be much more efficient - just read the remaining tuples for group B,
> not mixed with anything else, and then read a separate tape for group
> C.

OK, I understand the idea. However I don't think it makes much sense to
segregate every little group - that's a perfect fit for batching.

What might be worth segregating are exceptionally large groups, because
that's what may make batching inefficient - for example, when a group is
larger than work_mem, it will result in a batch per group (even if the
remaining groups are tiny). But we have no way to identify such a group,
because we have no way to determine the size of its state.

What we might do is assume that the size is proportional to the number of
tuples, and segregate only the largest groups. This could easily be done
with hashjoin-like batching - adding ntuples, isSegregated and
skewBatchId to the AggHashEntry, and using the entry as a placeholder
(only the hash entry will be stored in the batch, while the actual state
etc. will be stored separately). This is a bit similar to how hashjoin
handles skew buckets.
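
To make that a bit more concrete, something like the sketch below. The
three new fields are purely hypothetical names; the rest roughly mirrors
the existing per-group entry in nodeAgg.c (TupleHashEntryData and
AggStatePerGroupData are the executor's existing types):

/*
 * Hypothetical extension of the per-group hash entry - a sketch only,
 * not code from either patch.
 */
typedef struct AggHashEntryData
{
	TupleHashEntryData shared;	/* common header for hash table entries */

	int64		ntuples;		/* tuples seen for this group so far */
	bool		isSegregated;	/* state kept out of the regular batches? */
	int			skewBatchId;	/* which "skew" slot holds the real state */

	/* per-aggregate transition values, indexed by aggno */
	AggStatePerGroupData pergroup[1];	/* VARIABLE LENGTH ARRAY */
} AggHashEntryData;

typedef struct AggHashEntryData *AggHashEntry;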

It's true that Jeff's approach handles this somewhat better, but at the
cost of not really bounding the memory consumed by HashAggregate.

Tomas



Re: 9.5: Memory-bounded HashAgg

From
Tomas Vondra
Date:
On 4.9.2014 01:34, Tomas Vondra wrote:
> On 20.8.2014 20:32, Robert Haas wrote:
>>
>> As I see it, the advantage of Jeff's approach is that it doesn't
>> really matter whether our estimates are accurate or not.  We don't
>> have to decide at the beginning how many batches to do, and then
>> possibly end up using too much or too little memory per batch if we're
>> wrong; we can let the amount of memory actually used during execution
>> determine the number of batches.  That seems good.  Of course, a hash

Also, you don't actually have to decide the number of batches at the
very beginning. You can start with nbatch=1 and decide how many batches
to use when work_mem is reached, i.e. at exactly the same moment, using
the same amount of information, as with Jeff's approach. No?
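
For what it's worth, a rough sketch of what that decision could look
like once work_mem fills up (the function and formula are made up, not
from either patch):

/*
 * choose_nbatch_at_spill - hypothetical helper, illustration only.
 *
 * Called the first time work_mem fills up.  At that point we know how
 * many groups actually fit in memory, and we have a (possibly bad)
 * estimate of the total number of groups, so the batch count can be
 * sized from observed data rather than guessed up front.
 */
static int
choose_nbatch_at_spill(double groups_in_memory, double groups_estimated_total)
{
	double		groups_remaining;
	int			nbatch = 1;

	groups_remaining = groups_estimated_total - groups_in_memory;
	if (groups_remaining < 1.0)
		groups_remaining = 1.0;

	/* enough power-of-two batches that each batch should fit in work_mem */
	while (nbatch * groups_in_memory < groups_remaining)
		nbatch *= 2;

	return nbatch;
}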

Tomas



Re: 9.5: Memory-bounded HashAgg

From
Robert Haas
Date:
On Wed, Sep 3, 2014 at 7:34 PM, Tomas Vondra <tv@fuzzy.cz> wrote:
>> Well, I think you're certainly right that a hash table lookup is more
>> expensive than modulo on a 32-bit integer; so much is obvious.  But if
>> join can increase the number of batches on the fly, but only by
>> doubling it, so you might go from 4 batches to 8 when 5 would really
>> have been enough.  And a hash join also can't *reduce* the number of
>> batches on the fly, which might matter a lot.  Getting the number of
>> batches right avoids I/O, which is a lot more expensive than CPU.
>
> Regarding the estimates, I don't see much difference between the two
> approaches when handling this issue.
>
> It's true you can wait with deciding how many partitions (aka batches)
> to create until work_mem is full, at which point you have more
> information than at the very beginning. You know how many tuples you've
> already seen, how many tuples you expect (which is however only an
> estimate etc.). And you may use that to estimate the number of
> partitions to create.

I think it's significantly better than that.  The first point I'd make
is that if work_mem never fills up, you don't need to batch anything
at all.  That's a potentially huge win over batching a join we thought
was going to overrun work_mem, but didn't.

But even if work_mem does fill up, I think we still come out ahead,
because we don't necessarily need to dump the *entirety* of each batch
to disk.  For example, suppose there are 900 distinct values and only
300 of them can fit in memory at a time.  We read the input until
work_mem is full and we see a previously-unseen value, so we decide to
split the input up into 4 batches.  We now finish reading the input.
Each previously-seen value gets added to an existing in-memory group,
and each new value gets written into one of four disk files.  At
the end of the input, 300 groups are complete, and we have four files
on disk each of which contains the data for 150 of the remaining 600
groups.

Now, the alternative strategy is to batch from the beginning.  Here,
we decide right from the get-go that we're using 4 batches, so batch
#1 goes into memory and the remaining 3 batches get written to three
different disk files.  At the end of the input, 225 groups are
complete, and we have three files on disk each of which contains the
data for 225 of the remaining 675 groups.  This seems clearly
inferior, because we have written 675 groups to disk when it would
have been possible to write only 600.

The gains can be even more significant when the input data is skewed.
For example, suppose things are as above, but ten values account for
90% of all the inputs, and the remaining 890 values account for the
other 10% of the inputs.  Furthermore, let's suppose we have no table
statistics or they are totally wrong.  In Jeff's approach, as long as
each of those values occurs at least once before work_mem fills up,
they'll all be processed in the initial pass through the data, which
means we will write at most 10% of the data to disk.  In fact it will
be a little bit less, because batch 1 will have not only the 10
frequently-occurring values but also 290 others, so our initial pass
through the data will complete 300 groups covering (if the
less-frequent values occur with uniform frequency) 93.258% of the
data.  The remaining ~6.7% will be split up into 4 files which we can
then reread and process.  But if we use the other approach, we'll only
get 2 or 3 of the 10 commonly-occurring values in the first batch, so
we expect to write about 75% of the data out to one of our three batch
files.  That's a BIG difference - more than 10x the I/O load that
Jeff's approach would have incurred.  Now, admittedly, we could use a
skew optimization similar to the one we use for hash joins to try to
get the MCVs into the first batch, and that would help a lot when the
statistics are right - but sometimes the statistics are wrong, and
Jeff's approach doesn't care.  It just keeps on working.
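
(For the record, the 93.258% figure checks out; a quick
back-of-the-envelope check, with all the numbers taken from the example
above, nothing else assumed:)

#include <stdio.h>

/*
 * Sanity check of the skew example: 10 hot values carry 90% of the
 * input, 890 cold values share the remaining 10% uniformly, and 300
 * groups fit in memory on the first pass.
 */
int
main(void)
{
	double		hot_fraction = 0.90;
	double		cold_fraction = 0.10;
	int			cold_groups = 890;
	int			cold_groups_first_pass = 300 - 10;	/* 290 cold groups fit */

	double		covered = hot_fraction +
		cold_fraction * cold_groups_first_pass / cold_groups;

	printf("first pass covers %.3f%% of the data\n", covered * 100.0);
	printf("spilled to disk:  %.3f%%\n", (1.0 - covered) * 100.0);
	return 0;
}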

> That however comes at a cost - it's not really a memory-bounded hash
> aggregate, because you explicitly allow exceeding work_mem as more
> tuples for existing groups arrive.

Well, that would be true for now, but as has been mentioned, we can
add new methods to the aggregate infrastructure to serialize and
de-serialize transition states.  I guess I agree that, in the absence
of such infrastructure, your patch might be a better way to handle
cases like array_agg, but I'm pretty happy to see that infrastructure
get added.

Hmm.  It occurs to me that it could also be really good to add a
"merge transition states" operator to the aggregate infrastructure.
That would allow further improvements to Jeff's approach for cases
like array_agg.  If we serialize a transition state to disk because
it's not fitting in memory, we don't need to reload it before
continuing to process the group, or at least not right away.  We can
instead just start a new transition state and then merge all of the
accumulated states at the end of the hash aggregation.  That's good, because
it means we're not using up precious work_mem for transition state
data that really isn't needed until it's time to start finalizing
groups.  And it would be useful for parallelism eventually, too.  :-)
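
Just to sketch what such infrastructure might look like at the C level -
these signatures are purely hypothetical, nothing like them exists in the
patch under discussion:

#include "postgres.h"

/*
 * Hypothetical per-aggregate support callbacks for spilling transition
 * states; illustration of the shape of the infrastructure only.
 */

/* flatten a transition state into a varlena so it can be written to disk */
typedef bytea *(*AggSerializeState) (Datum transvalue, bool transnull);

/* rebuild an in-memory transition state from its flattened form */
typedef Datum (*AggDeserializeState) (bytea *buf, bool *isnull);

/* the "merge" operator: fold one partial state into another */
typedef Datum (*AggCombineStates) (Datum state_a, Datum state_b);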

> Also, no one really says the initial estimate of how many tuples will be
> aggregated is correct. It's about as unreliable as the group count
> estimate. So how exactly are you going to estimate the partitions?
>
> Considering this, I doubt being able to choose an arbitrary number of
> partitions (instead of only powers of 2) is really an advantage.

You're right.  I was using the terminology in an imprecise and
misleading way.  What I meant was more along the lines of what's in
the first four paragraphs of this email - namely, that with Jeff's
approach, it seems that you can be certain of using all the memory you
have available on the first pass through, whereas with your approach
there seems to be a risk of dumping data to disk that could have been
kept in memory and processed.  Also, it's very likely that all of the
frequently-occurring values will get handled in the initial pass.

To put this another way, and I think we all agree on this, I think we
should be very concerned with minimizing the number of times the data
gets rewritten.  If the data doesn't fit in memory, we're going to
have to rewrite at least some of it.  But the algorithm we choose
could cause us to rewrite more of it than necessary, and that's bad.

> What I think we should prevent is under-estimating the number of batches,
> because in that case you have to read the whole batch, write part of it
> out again, and then read it again, instead of just writing it once (into
> two files). Reading a tuple from a batch only to write it to another
> batch is not really efficient.

Completely agreed.  Choosing a partition count that is higher than
necessary doesn't hurt much.  The expensive part is spilling the
tuples to disk for processing in a future batch rather than processing
them immediately.  Once we've decided we're going to do that one way
or the other, the cost of distributing the tuples we decide to write
among (say) 16 tapes vs. 4 tapes is probably relatively small.  (At
some point this breaks down; 1024 tapes will overflow the FD table.)
But picking a partition count that is too low could be extremely
expensive, in that, as you say, we'd need to rewrite the data a second
time.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: 9.5: Memory-bounded HashAgg

From
Jeff Davis
Date:
On Sun, 2014-08-10 at 14:26 -0700, Jeff Davis wrote:
> This patch is requires the Memory Accounting patch, or something similar
> to track memory usage.
>
> The attached patch enables hashagg to spill to disk, which means that
> hashagg will contain itself to work_mem even if the planner makes a
> bad misestimate of the cardinality.

New patch attached. All open items are complete, though the patch may
have a few rough edges.

Summary of changes:

 * rebased on top of latest memory accounting patch
http://www.postgresql.org/message-id/1417497257.5584.5.camel@jeff-desktop
 * added a flag to hash_create to prevent it from creating an extra
level of memory context
   - without this, the memory accounting would have a measurable impact
on performance
 * cost model for the disk usage
 * intelligently choose the number of partitions for each pass of the
data
 * explain support
 * in build_hash_table(), be more intelligent about the value of
nbuckets to pass to BuildTupleHashTable()
   - BuildTupleHashTable tries to choose a value to keep the table in
work_mem, but it isn't very accurate.
 * some very rudimentary testing (sanity checks, really) shows good
results

Summary of previous discussion (my summary; I may have missed some
points):

Tom Lane requested that the patch also handle the case where transition
values grow (e.g. array_agg) beyond work_mem. I feel this patch provides
a lot of benefit as it is, and trying to handle that case would be a lot
more work (we need a way to write the transition values out to disk at a
minimum, and perhaps combine them with other transition values). I also
don't think my patch would interfere with a fix there in the future.

Tomas Vondra suggested an alternative design that more closely resembles
HashJoin: instead of filling up the hash table and then spilling any new
groups, the idea would be to split the current data into two partitions,
keep one in the hash table, and spill the other (see
ExecHashIncreaseNumBatches()). This has the advantage that it's very
fast to identify whether a tuple is part of the in-memory batch or not,
and we can avoid even looking in the in-memory hash table if it isn't.

The batch-splitting approach has a major downside, however: you are
likely to evict a skew value from the in-memory batch, which will result
in all subsequent tuples with that skew value going to disk. My approach
never evicts from the in-memory table until we actually finalize the
groups, so the skew values are likely to be completely processed in the
first pass.
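
In pseudocode-ish C, the per-tuple decision is roughly as follows. All
of the helper functions here are made-up placeholders, not the actual
symbols in the patch:

/*
 * Rough sketch of the per-tuple path with spilling enabled.
 */
static void
hashagg_consume_tuple(AggState *aggstate, TupleTableSlot *slot)
{
	AggHashEntry entry;

	if (!hashagg_work_mem_exceeded(aggstate))
	{
		/* still under work_mem: look up the group, creating it if new */
		entry = hashagg_find_or_create_group(aggstate, slot);
		hashagg_advance_group(aggstate, entry, slot);
	}
	else
	{
		/* over work_mem: existing groups keep aggregating ... */
		entry = hashagg_find_group(aggstate, slot);
		if (entry != NULL)
			hashagg_advance_group(aggstate, entry, slot);
		else
		{
			/* ... but tuples for new groups go to an output partition */
			hashagg_spill_tuple(aggstate, slot);
		}
	}
}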

So, the attached patch implements my original approach, which I still
feel is the best solution.

Regards,
    Jeff Davis


Attachment

Re: 9.5: Memory-bounded HashAgg

From
Tomas Vondra
Date:
On 11.12.2014 11:46, Jeff Davis wrote:
>
> New patch attached. All open items are complete, though the patch may
> have a few rough edges.
> 
> Summary of changes:
> 
>  * rebased on top of latest memory accounting patch
> http://www.postgresql.org/message-id/1417497257.5584.5.camel@jeff-desktop
>  * added a flag to hash_create to prevent it from creating an extra
> level of memory context
>    - without this, the memory accounting would have a measurable impact
> on performance
>  * cost model for the disk usage
>  * intelligently choose the number of partitions for each pass of the
> data
>  * explain support
>  * in build_hash_table(), be more intelligent about the value of
> nbuckets to pass to BuildTupleHashTable()
>    - BuildTupleHashTable tries to choose a value to keep the table in
> work_mem, but it isn't very accurate.
>  * some very rudimentary testing (sanity checks, really) shows good
> results

I plan to look into this over the holidays, hopefully.

> Summary of previous discussion (my summary; I may have missed some
> points):
> 
> Tom Lane requested that the patch also handle the case where transition
> values grow (e.g. array_agg) beyond work_mem. I feel this patch provides
> a lot of benefit as it is, and trying to handle that case would be a lot
> more work (we need a way to write the transition values out to disk at a
> minimum, and perhaps combine them with other transition values). I also
> don't think my patch would interfere with a fix there in the future.
> 
> Tomas Vondra suggested an alternative design that more closely resembles
> HashJoin: instead of filling up the hash table and then spilling any new
> groups, the idea would be to split the current data into two partitions,
> keep one in the hash table, and spill the other (see
> ExecHashIncreaseNumBatches()). This has the advantage that it's very
> fast to identify whether a tuple is part of the in-memory batch or not,
> and we can avoid even looking in the in-memory hash table if it isn't.
> 
> The batch-splitting approach has a major downside, however: you are
> likely to evict a skew value from the in-memory batch, which will result
> in all subsequent tuples with that skew value going to disk. My approach
> never evicts from the in-memory table until we actually finalize the
> groups, so the skew values are likely to be completely processed in the
> first pass.

I don't think that's the main issue - there are probably ways to work
around that (e.g. by keeping a "skew hash table" for those frequent
values, similarly to what hash join does).

The main problem IMHO is that it requires writing the transition values
to disk, which we don't know how to do in many cases (esp. in the
interesting ones, where the transition values grow).

> So, the attached patch implements my original approach, which I still
> feel is the best solution.

I think this is a reasonable approach - it's true it does not handle the
case with growing aggregate state (e.g. array_agg), so it really fixes
"just" the case where we underestimate the number of groups.

But I believe we need this approach anyway, because we'll never know how
to write out all the various transition values (e.g. think of custom
aggregates), and this is an improvement.

We can build on this and add the more elaborate hashjoin-like approach
in the future.

regards
Tomas




Re: 9.5: Memory-bounded HashAgg

From
Jeff Davis
Date:
On Thu, 2014-12-11 at 02:46 -0800, Jeff Davis wrote:
> On Sun, 2014-08-10 at 14:26 -0700, Jeff Davis wrote:
> > This patch is requires the Memory Accounting patch, or something similar
> > to track memory usage.
> > 
> > The attached patch enables hashagg to spill to disk, which means that
> > hashagg will contain itself to work_mem even if the planner makes a
> > bad misestimate of the cardinality.
> 
> New patch attached. All open items are complete, though the patch may
> have a few rough edges.
> 

This thread got moved over here:

http://www.postgresql.org/message-id/1419326161.24895.13.camel@jeff-desktop

Regards,Jeff Davis