Thread: Avoiding hash join batch explosions with extreme skew and weird stats

Avoiding hash join batch explosions with extreme skew and weird stats

From
Thomas Munro
Date:
Hello,

As discussed elsewhere[1][2], our algorithm for deciding when to give
up on repartitioning (AKA increasing the number of batches) tends to
keep going until it has a number of batches that is a function of the
number of distinct well distributed keys.  I wanted to move this minor
issue away from Tomas Vondra's thread[2] since it's a mostly
independent problem.

SET max_parallel_workers_per_gather = 0;
SET synchronize_seqscans = off;
SET work_mem = '4MB';

CREATE TABLE r AS SELECT generate_series(1, 10000000)::int i;
ANALYZE r;

-- 1k uniform keys + 1m duplicates
CREATE TABLE s1k (i int);
INSERT INTO s1k SELECT generate_series(1, 1000)::int i;
ALTER TABLE s1k SET (autovacuum_enabled = off);
ANALYZE s1k;
INSERT INTO s1k SELECT 42 FROM generate_series(1, 1000000);

EXPLAIN ANALYZE SELECT COUNT(*) FROM r JOIN s1k USING (i);

  Buckets: 1048576 (originally 1048576)
  Batches: 4096 (originally 16)
  Memory Usage: 35157kB

-- 10k uniform keys + 1m duplicates
CREATE TABLE s10k (i int);
INSERT INTO s10k SELECT generate_series(1, 10000)::int i;
ALTER TABLE s10k SET (autovacuum_enabled = off);
ANALYZE s10k;
INSERT INTO s10k SELECT 42 FROM generate_series(1, 1000000);

EXPLAIN ANALYZE SELECT COUNT(*) FROM r JOIN s10k USING (i);

  Buckets: 131072 (originally 131072)
  Batches: 32768 (originally 16)
  Memory Usage: 35157kB

See how the number of batches is determined by the number of uniform
keys in s1k and s10k?  That's because the explosion unfolds until there is
*nothing left* but keys that hash to the same value in the problem
batch, which means those uniform keys have to keep spreading out until
there is something on the order of two batches per key.  The point is
that it's bounded only by input data (or eventually INT_MAX / 2 and
MaxAllocSize), and as Tomas has illuminated, batches eat unmetered
memory.  Ouch.

Here's a quick hack to show that a 95% cut-off fixes those examples.
I don't really know how to choose the number, but I suspect it should
be much closer to 100 than 50.  I think this is the easiest of three
fundamental problems that need to be solved in this area.  The others
are: accounting for per-partition overheads as Tomas pointed out, and
providing an actual fallback strategy that respects work_mem when
extreme skew is detected OR per-partition overheads dominate.  I plan
to experiment with nested loop hash join (or whatever you want to call
it: the thing where you join every arbitrary fragment of the hash
table against the outer batch, and somehow deal with outer match
flags) when time permits.
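
For anyone reading along without opening the attachment, the shape of the
hack is roughly this (a sketch only, not the attached patch; nfreed,
ninmemory and growEnabled are the existing names in
ExecHashIncreaseNumBatches() in nodeHash.c, and 0.95 is the arbitrary
cut-off discussed above):

    /*
     * Today we disable further growth only if *all* tuples stayed put
     * (nfreed == 0) or *all* of them moved (nfreed == ninmemory).  The
     * idea is to also give up when repartitioning is merely *mostly*
     * futile.
     */
    if (nfreed <= 0.05 * ninmemory || nfreed >= 0.95 * ninmemory)
        hashtable->growEnabled = false;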

[1]
https://www.postgresql.org/message-id/flat/CAG_%3D8kBoWY4AXwW%3DCj44xe13VZnYohV9Yr-_hvZdx2xpiipr9w%40mail.gmail.com
[2] https://www.postgresql.org/message-id/flat/20190504003414.bulcbnge3rhwhcsh%40development

-- 
Thomas Munro
https://enterprisedb.com

Attachment

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Tomas Vondra
Date:
On Thu, May 16, 2019 at 01:22:31PM +1200, Thomas Munro wrote:
> ...
>
>Here's a quick hack to show that a 95% cut-off fixes those examples.
>I don't really know how to choose the number, but I suspect it should
>be much closer to 100 than 50.  I think this is the easiest of three
>fundamental problems that need to be solved in this area.  The others
>are: accounting for per-partition overheads as Tomas pointed out, and
>providing an actual fallback strategy that respects work_mem when
>extreme skew is detected OR per-partition overheads dominate.  I plan
>to experiment with nested loop hash join (or whatever you want to call
>it: the thing where you join every arbitrary fragment of the hash
>table against the outer batch, and somehow deal with outer match
>flags) when time permits.
>

I think this is a step in the right direction, but as I said on the other
thread(s), I think we should not disable growth forever and recheck once
in a while. Otherwise we'll end up in a sad situation with non-uniform data
sets, as pointed out by Hubert Zhang in [1]. It's probably even truer with
this less strict logic, using 95% as a threshold (instead of 100%).

I kinda like the idea with increasing the spaceAllowed value. Essentially,
if we decide adding batches would be pointless, increasing the memory
budget is the only thing we can do anyway.

The problem however is that we only really look at a single bit - it may
be that doubling the batches would not help, but doing it twice would
actually reduce the memory usage. For example, assume there are 2 distinct
values in the batch, with hash values (in binary)

  101010000
  101010111

and assume the current number of batches is small enough that the batch is
determined by the shared prefix. Clearly, splitting batches is going to do
nothing until we get to the 000 vs. 111 parts.

At first I thought this is rather unlikely and we can ignore it, but I'm
not really sure about that - it may actually be pretty likely. We may get
to a 101010 bucket with a sufficiently large data set, and then there's ~50%
probability the next bit is the same (assuming two distinct values). So
this may be quite an issue, I think.
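
To make that a bit more concrete, here is a tiny standalone illustration
(made-up hash values, and the batchno rule from ExecHashGetBucketAndBatch(),
i.e. batchno = (hashvalue >> log2_nbuckets) & (nbatch - 1)). The two values
agree on the first eight bits that batch selection consumes, so doubling the
number of batches does nothing until nbatch reaches 512:

    #include <stdint.h>
    #include <stdio.h>

    int
    main(void)
    {
        int         log2_nbuckets = 10;     /* say, 1024 buckets */
        uint32_t    h1 = 0x000AA800;        /* agree on batch bits 10..17 ... */
        uint32_t    h2 = 0x000EA800;        /* ... but differ at bit 18 */

        for (int nbatch = 2; nbatch <= 2048; nbatch *= 2)
        {
            uint32_t    b1 = (h1 >> log2_nbuckets) & (nbatch - 1);
            uint32_t    b2 = (h2 >> log2_nbuckets) & (nbatch - 1);

            printf("nbatch = %4d: batchno %3u vs %3u%s\n",
                   nbatch, b1, b2,
                   b1 == b2 ? "  (doubling did not help)" : "  (finally split)");
        }
        return 0;
    }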

regards


[1] https://www.postgresql.org/message-id/CAB0yrekv%3D6_T_eUe2kOEvWUMwufcvfd15SFmCABtYFOkxCFdfA%40mail.gmail.com

-- 
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services




Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Thomas Munro
Date:
On Fri, May 17, 2019 at 4:39 AM Tomas Vondra
<tomas.vondra@2ndquadrant.com> wrote:
> I think this is a step in the right direction, but as I said on the other
> thread(s), I think we should not disable growth forever and recheck once
> in a while. Otherwise we'll end up in a sad situation with non-uniform data
> sets, as pointed out by Hubert Zhang in [1]. It's probably even truer with
> this less strict logic, using 95% as a threshold (instead of 100%).
>
> I kinda like the idea with increasing the spaceAllowed value. Essentially,
> if we decide adding batches would be pointless, increasing the memory
> budget is the only thing we can do anyway.

But that's not OK, we need to fix THAT.

> The problem however is that we only really look at a single bit - it may
> be that doubling the batches would not help, but doing it twice would
> actually reduce the memory usage. For example, assume there are 2 distinct
> values in the batch, with hash values (in binary)

Yes, that's a good point, and not a case that we should ignore.  But
if we had a decent fall-back strategy that respected work_mem, we
wouldn't care so much if we get it wrong in a corner case.  I'm
arguing that we should use Grace partitioning as our primary
partitioning strategy, but fall back to looping (or possibly
sort-merging) for the current batch if Grace doesn't seem to be
working.  You'll always be able to find cases where if you'd just
tried one more round, Grace would work, but that seems acceptable to
me, because getting it wrong doesn't melt your computer, it just
probably takes longer.  Or maybe it doesn't.  How much longer would it
take to loop twice?  Erm, twice as long, and each loop makes actual
progress, unlike extra speculative Grace partition expansions which
apply not just to the current batch but all batches, might not
actually work, and you *have* to abandon at some point.  The more I
think about it, the more I think that a loop-based escape valve, though
unpalatably quadratic, is probably OK because we're in a sink-or-swim
situation at this point, and our budget is work_mem, not work_time.

I'm concerned that we're trying to find ways to treat the symptoms,
allowing us to exceed work_mem but maybe not so much, instead of
focusing on the fundamental problem, which is that we don't yet have
an algorithm that is guaranteed to respect work_mem.

Admittedly I don't have a patch, just a bunch of handwaving.  One
reason I haven't attempted to write it is because although I know how
to do the non-parallel version using a BufFile full of match bits in
sync with the tuples for outer joins, I haven't figured out how to do
it for parallel-aware hash join, because then each loop over the outer
batch could see different tuples in each participant.  You could use
the match bit in HashJoinTuple header, but then you'd have to write
all the tuples out again, which is more IO than I want to do.  I'll
probably start another thread about that.

-- 
Thomas Munro
https://enterprisedb.com



Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Tomas Vondra
Date:
On Fri, May 17, 2019 at 10:21:56AM +1200, Thomas Munro wrote:
>On Fri, May 17, 2019 at 4:39 AM Tomas Vondra
><tomas.vondra@2ndquadrant.com> wrote:
>> I think this is a step in the right direction, but as I said on the other
>> thread(s), I think we should not disable growth forever and recheck once
>> in a while. Otherwise we'll end up in a sad situation with non-uniform data
>> sets, as pointed out by Hubert Zhang in [1]. It's probably even truer with
>> this less strict logic, using 95% as a threshold (instead of 100%).
>>
>> I kinda like the idea with increasing the spaceAllowed value. Essentially,
>> if we decide adding batches would be pointless, increasing the memory
>> budget is the only thing we can do anyway.
>
>But that's not OK, we need to fix THAT.
>

I agree increasing the budget is not ideal, although at the moment it's
the only thing we can do. If we can improve that, great.

>> The problem however is that we only really look at a single bit - it may
>> be that doubling the batches would not help, but doing it twice would
>> actually reduce the memory usage. For example, assume there are 2 distinct
>> values in the batch, with hash values (in binary)
>
>Yes, that's a good point, and not a case that we should ignore.  But
>if we had a decent fall-back strategy that respected work_mem, we
>wouldn't care so much if we get it wrong in a corner case.  I'm
>arguing that we should use Grace partitioning as our primary
>partitioning strategy, but fall back to looping (or possibly
>sort-merging) for the current batch if Grace doesn't seem to be
>working.  You'll always be able to find cases where if you'd just
>tried one more round, Grace would work, but that seems acceptable to
>me, because getting it wrong doesn't melt your computer, it just
>probably takes longer.  Or maybe it doesn't.  How much longer would it
>take to loop twice?  Erm, twice as long, and each loop makes actual
>progress, unlike extra speculative Grace partition expansions which
>apply not just to the current batch but all batches, might not
>actually work, and you *have* to abandon at some point.  The more I
>think about it, the more I think that a loop-based escape valve, though
>unpalatably quadratic, is probably OK because we're in a sink-or-swim
>situation at this point, and our budget is work_mem, not work_time.
>

True.

>I'm concerned that we're trying to find ways to treat the symptoms,
>allowing us to exceed work_mem but maybe not so much, instead of
>focusing on the fundamental problem, which is that we don't yet have
>an algorithm that is guaranteed to respect work_mem.
>

Yes, that's a good point.

>Admittedly I don't have a patch, just a bunch of handwaving.  One
>reason I haven't attempted to write it is because although I know how
>to do the non-parallel version using a BufFile full of match bits in
>sync with the tuples for outer joins, I haven't figured out how to do
>it for parallel-aware hash join, because then each loop over the outer
>batch could see different tuples in each participant.  You could use
>the match bit in HashJoinTuple header, but then you'd have to write
>all the tuples out again, which is more IO than I want to do.  I'll
>probably start another thread about that.
>

That pesky parallelism ;-)


regards

-- 
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services




Thomas Munro <thomas.munro@gmail.com> writes:
> On Fri, May 17, 2019 at 4:39 AM Tomas Vondra
> <tomas.vondra@2ndquadrant.com> wrote:
>> I kinda like the idea with increasing the spaceAllowed value. Essentially,
>> if we decide adding batches would be pointless, increasing the memory
>> budget is the only thing we can do anyway.

> But that's not OK, we need to fix THAT.

I don't think it's necessarily a good idea to suppose that we MUST
fit in work_mem come what may.  It's likely impossible to guarantee
that in all cases.  Even if we can, a query that runs for eons will
help nobody.

            regards, tom lane



Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Tomas Vondra
Date:
On Thu, May 16, 2019 at 06:58:43PM -0400, Tom Lane wrote:
>Thomas Munro <thomas.munro@gmail.com> writes:
>> On Fri, May 17, 2019 at 4:39 AM Tomas Vondra
>> <tomas.vondra@2ndquadrant.com> wrote:
>>> I kinda like the idea with increasing the spaceAllowed value. Essentially,
>>> if we decide adding batches would be pointless, increasing the memory
>>> budget is the only thing we can do anyway.
>
>> But that's not OK, we need to fix THAT.
>
>I don't think it's necessarily a good idea to suppose that we MUST
>fit in work_mem come what may.  It's likely impossible to guarantee
>that in all cases.  Even if we can, a query that runs for eons will
>help nobody.
>

I kinda agree with Thomas - arbitrarily increasing work_mem is something
we should not do unless absolutely necessary. If the query is slow, it's
up to the user to bump the value up, if deemed appropriate.


regards

-- 
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services




Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Thomas Munro
Date:
On Fri, May 17, 2019 at 11:46 AM Tomas Vondra
<tomas.vondra@2ndquadrant.com> wrote:
> On Thu, May 16, 2019 at 06:58:43PM -0400, Tom Lane wrote:
> >Thomas Munro <thomas.munro@gmail.com> writes:
> >> On Fri, May 17, 2019 at 4:39 AM Tomas Vondra
> >> <tomas.vondra@2ndquadrant.com> wrote:
> >>> I kinda like the idea with increasing the spaceAllowed value. Essentially,
> >>> if we decide adding batches would be pointless, increasing the memory
> >>> budget is the only thing we can do anyway.
> >
> >> But that's not OK, we need to fix THAT.
> >
> >I don't think it's necessarily a good idea to suppose that we MUST
> >fit in work_mem come what may.  It's likely impossible to guarantee
> >that in all cases.  Even if we can, a query that runs for eons will
> >help nobody.
>
> I kinda agree with Thomas - arbitrarily increasing work_mem is something
> we should not do unless absolutely necessary. If the query is slow, it's
> up to the user to bump the value up, if deemed appropriate.

+1

I think we can guarantee that we can fit in work_mem with only one
exception: we have to allow work_mem to be exceeded when we otherwise
couldn't fit a single tuple.

Then the worst possible case with the looping algorithm is that we
degrade to loading just one inner tuple at a time into the hash table,
at which point we effectively have a nested loop join (except (1) it's
flipped around: for each tuple on the inner side, we scan the outer
side; and (2) we can handle full outer joins).  In any reasonable case
you'll have a decent amount of tuples at a time, so you won't have to
loop too many times so it's not really quadratic in the number of
tuples.  The realisation that it's a nested loop join in the extreme
case is probably why the MySQL people called it 'block nested loop
join' (and as far as I can tell from quick googling, it might be their
*primary* strategy for hash joins that don't fit in memory, not just a
secondary strategy after Grace fails, but I might be wrong about
that).  Unlike plain old single-tuple nested loop join, it works in
arbitrary sized blocks (the hash table).  What we would call a regular
hash join, they call a BNL that just happens to have only one loop.  I
think Grace is probably a better primary strategy, but loops are a
good fallback.

The reason I kept mentioning sort-merge in earlier threads is because
it'd be better in the worst cases.  Unfortunately it would be worse in
the best case (smallish numbers of loops) and I suspect many real
world cases.  It's hard to decide, so perhaps we should be happy that
sort-merge can't be considered currently because the join conditions
may not be merge-joinable.


--
Thomas Munro
https://enterprisedb.com



Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Melanie Plageman
Date:


On Thu, May 16, 2019 at 3:22 PM Thomas Munro <thomas.munro@gmail.com> wrote:
Admittedly I don't have a patch, just a bunch of handwaving.  One
reason I haven't attempted to write it is because although I know how
to do the non-parallel version using a BufFile full of match bits in
sync with the tuples for outer joins, I haven't figured out how to do
it for parallel-aware hash join, because then each loop over the outer
batch could see different tuples in each participant.  You could use
the match bit in HashJoinTuple header, but then you'd have to write
all the tuples out again, which is more IO than I want to do.  I'll
probably start another thread about that.


Could you explain more about the implementation you are suggesting?

Specifically, what do you mean "BufFile full of match bits in sync with the
tuples for outer joins?"

Is the implementation you are thinking of one which falls back to NLJ on a
batch-by-batch basis decided during the build phase?
If so, why do you need to keep track of the outer tuples seen?
If you are going to loop through the whole outer side for each tuple on the
inner side, it seems like you wouldn't need to.

Could you make an outer "batch" which is the whole of the outer relation? That
is, could you do something like: when hashing the inner side, if re-partitioning
is resulting in batches that will overflow spaceAllowed, could you set a flag on
that batch use_NLJ and when making batches for the outer side, make one "batch"
that has all the tuples from the outer side which the inner side batch which was
flagged will do NLJ with.

--
Melanie Plageman

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Thomas Munro
Date:
On Sat, May 18, 2019 at 12:15 PM Melanie Plageman
<melanieplageman@gmail.com> wrote:
> On Thu, May 16, 2019 at 3:22 PM Thomas Munro <thomas.munro@gmail.com> wrote:
>> Admittedly I don't have a patch, just a bunch of handwaving.  One
>> reason I haven't attempted to write it is because although I know how
>> to do the non-parallel version using a BufFile full of match bits in
>> sync with the tuples for outer joins, I haven't figured out how to do
>> it for parallel-aware hash join, because then each loop over the outer
>> batch could see different tuples in each participant.  You could use
>> the match bit in HashJoinTuple header, but then you'd have to write
>> all the tuples out again, which is more IO than I want to do.  I'll
>> probably start another thread about that.
>
> Could you explain more about the implementation you are suggesting?
>
> Specifically, what do you mean "BufFile full of match bits in sync with the
> tuples for outer joins?"

First let me restate the PostgreSQL terminology for this stuff so I
don't get confused while talking about it:

* The inner side of the join = the right side = the side we use to
build a hash table.  Right and full joins emit inner tuples when there
is no matching tuple on the outer side.

* The outer side of the join = the left side = the side we use to
probe the hash table.  Left and full joins emit outer tuples when
there is no matching tuple on the inner side.

* Semi and anti joins emit exactly one instance of each outer tuple if
there is/isn't at least one match on the inner side.

We have a couple of relatively easy cases:

* Inner joins: for every outer tuple, we try to find a match in the
hash table, and if we find one we emit a tuple.  To add looping
support, if we run out of memory when loading the hash table we can
just proceed to probe the fragment we've managed to load so far, and
then rewind the outer batch, clear the hash table and load in the next
work_mem-sized fragment and do it again... rinse and repeat until
we've eventually processed the whole inner batch.  After we've
finished looping, we move on to the next batch.

* For right and full joins ("HJ_FILL_INNER"), we also need to emit an
inner tuple for every tuple that was loaded into the hash table but
never matched.  That's done using a flag HEAP_TUPLE_HAS_MATCH in the
header of the tuples of the hash table, and a scan through the whole
hash table at the end of each batch to look for unmatched tuples
(ExecScanHashTableForUnmatched()).  To add looping support, that just
has to be done at the end of every inner batch fragment, that is,
after every loop.

And now for the cases that need a new kind of match bit, as far as I can see:

* For left and full joins ("HJ_FILL_OUTER"), we also need to emit an
outer tuple for every tuple that didn't find a match in the hash
table.  Normally that is done while probing, without any need for
memory or match flags: if we don't find a match, we just spit out an
outer tuple immediately.  But that simple strategy won't work if the
hash table holds only part of the inner batch.  Since we'll be
rewinding and looping over the outer batch again for the next inner
batch fragment, we can't yet say if there will be a match in a later
loop.  But the later loops don't know on their own either.  So we need
some kind of cumulative memory between loops, and we only know which
outer tuples have a match after we've finished all loops.  So there
would need to be a new function ExecScanOuterBatchForUnmatched().

* For semi joins, we need to emit exactly one outer tuple whenever
there is one or more match on the inner side.  To add looping support,
we need to make sure that we don't emit an extra copy of the outer
tuple if there is a second match in another inner batch fragment.
Again, this implies some kind of memory between loops, so we can
suppress later matches.

* For anti joins, we need to emit an outer tuple whenever there is no
match.  To add looping support, we need to wait until we've seen all
the inner batch fragments before we know that a given outer tuple has
no match, perhaps with the same new function
ExecScanOuterBatchForUnmatched().

So, we need some kind of inter-loop memory, but we obviously don't
want to create another source of unmetered RAM gobbling.  So one idea
is a BufFile that has one bit per outer tuple in the batch.  In the
first loop, we just stream out the match results as we go, and then
somehow we OR the bitmap with the match results in subsequent loops.
After the last loop, we have a list of unmatched tuples -- just scan
it in lock-step with the outer batch and look for 0 bits.
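
To pull those cases together, here is roughly the per-batch control flow I
have in mind (pseudocode only; ExecScanHashTableForUnmatched() exists today,
ExecScanOuterBatchForUnmatched() and everything else here is hypothetical):

    while (the current inner batch is not exhausted)
    {
        reset the hash table;
        load inner tuples until work_mem fills up or the batch ends;  /* one "fragment" */

        rewind the outer batch;
        for (each outer tuple, at position i in the outer batch)
        {
            if (probing finds a match in the loaded fragment)
            {
                /* the probe also sets HEAP_TUPLE_HAS_MATCH on the inner tuple */
                emit joined tuple(s);      /* for semi joins: only if bit i is
                                            * not already set */
                set bit i in the outer-match BufFile;  /* OR with earlier loops */
            }
        }

        if (HJ_FILL_INNER)                 /* right and full joins */
            ExecScanHashTableForUnmatched();   /* emit this fragment's unmatched
                                                * inner tuples, as today */
    }

    if (HJ_FILL_OUTER or this is an anti join)
        ExecScanOuterBatchForUnmatched();      /* rescan the outer batch in
                                                * lock-step with the bitmap and
                                                * emit/NULL-extend tuples whose
                                                * bit is still 0 */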

Unfortunately that bits-in-order scheme doesn't work for parallel
hash, where the SharedTuplestore tuples seen by each worker are
non-deterministic.  So perhaps in that case we could use the
HEAP_TUPLE_HAS_MATCH bit in the outer tuple header itself, and write
the whole outer batch back out each time through the loop.  That'd
keep the tuples and match bits together, but it seems like a lot of
IO...  Note that parallel hash doesn't support right/full joins today,
because of some complications about waiting and deadlocks that might
turn out to be relevant here too, and might be solvable (I should
probably write about that in another email), but left joins *are*
supported today so would need to be desupported if we wanted to add
a loop-based escape valve but not deal with these problems.  That
doesn't seem acceptable, which is why I'm a bit stuck on this point,
and unfortunately it may be a while before I have time to tackle any
of that personally.

> Is the implementation you are thinking of one which falls back to NLJ on a
> batch-by-batch basis decided during the build phase?

Yeah.

> If so, why do you need to keep track of the outer tuples seen?
> If you are going to loop through the whole outer side for each tuple on the
> inner side, it seems like you wouldn't need to.

The idea is to loop through the whole outer batch for every
work_mem-sized inner batch fragment, not every tuple.  Though in
theory it could be as small as a single tuple.

> Could you make an outer "batch" which is the whole of the outer relation? That
> is, could you do something like: when hashing the inner side, if re-partitioning
> is resulting in batches that will overflow spaceAllowed, could you set a flag on
> that batch use_NLJ and when making batches for the outer side, make one "batch"
> that has all the tuples from the outer side which the inner side batch which was
> flagged will do NLJ with.

I didn't understand this... you always need to make one outer batch
corresponding to every inner batch.  The problem is the tricky
left/full/anti/semi join cases when joining against fragments holding
less than the full inner batch: we still need some way to implement
join logic that depends on knowing whether there is a match in *any*
of the inner fragments/loops.

About the question of when exactly to set the "use_NLJ" flag:  I had
originally been thinking of this only as a way to deal with the
extreme skew problem.  But in light of Tomas's complaints about
unmetered per-batch memory overheads, I had a new thought: it should
also be triggered whenever doubling the number of batches would halve
the amount of memory left for the hash table (after including the size
of all those BufFile objects in the computation as Tomas proposes).  I
think that might be exactly the right cut-off if you want to do
as much Grace partitioning as your work_mem can afford, and therefore
as little looping as possible to complete the join while respecting
work_mem.

-- 
Thomas Munro
https://enterprisedb.com



Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Tomas Vondra
Date:
On Mon, May 20, 2019 at 11:07:03AM +1200, Thomas Munro wrote:
>On Sat, May 18, 2019 at 12:15 PM Melanie Plageman
><melanieplageman@gmail.com> wrote:
>> On Thu, May 16, 2019 at 3:22 PM Thomas Munro <thomas.munro@gmail.com> wrote:
>>> Admittedly I don't have a patch, just a bunch of handwaving.  One
>>> reason I haven't attempted to write it is because although I know how
>>> to do the non-parallel version using a BufFile full of match bits in
>>> sync with the tuples for outer joins, I haven't figured out how to do
>>> it for parallel-aware hash join, because then each loop over the outer
>>> batch could see different tuples in each participant.  You could use
>>> the match bit in HashJoinTuple header, but then you'd have to write
>>> all the tuples out again, which is more IO than I want to do.  I'll
>>> probably start another thread about that.
>>
>> Could you explain more about the implementation you are suggesting?
>>
>> Specifically, what do you mean "BufFile full of match bits in sync with the
>> tuples for outer joins?"
>
>First let me restate the PostgreSQL terminology for this stuff so I
>don't get confused while talking about it:
>
>* The inner side of the join = the right side = the side we use to
>build a hash table.  Right and full joins emit inner tuples when there
>is no matching tuple on the outer side.
>
>* The outer side of the join = the left side = the side we use to
>probe the hash table.  Left and full joins emit outer tuples when
>there is no matching tuple on the inner side.
>
>* Semi and anti joins emit exactly one instance of each outer tuple if
>there is/isn't at least one match on the inner side.
>

I think you're conflating inner/outer side and left/right, or rather
assuming it's always left=inner and right=outer.

> ... snip ...
>
>> Could you make an outer "batch" which is the whole of the outer relation? That
>> is, could you do something like: when hashing the inner side, if re-partitioning
>> is resulting in batches that will overflow spaceAllowed, could you set a flag on
>> that batch use_NLJ and when making batches for the outer side, make one "batch"
>> that has all the tuples from the outer side which the inner side batch which was
>> flagged will do NLJ with.
>
>I didn't understand this... you always need to make one outer batch
>corresponding to every inner batch.  The problem is the tricky
>left/full/anti/semi join cases when joining against fragments holding
>less than the full inner batch: we still need some way to implement
>join logic that depends on knowing whether there is a match in *any*
>of the inner fragments/loops.
>
>About the question of when exactly to set the "use_NLJ" flag:  I had
>originally been thinking of this only as a way to deal with the
>extreme skew problem.  But in light of Tomas's complaints about
>unmetered per-batch memory overheads, I had a new thought: it should
>also be triggered whenever doubling the number of batches would halve
>the amount of memory left for the hash table (after including the size
>of all those BufFile objects in the computation as Tomas proposes).  I
>think that might be exactly the right cut-off if you want to do
>as much Grace partitioning as your work_mem can afford, and therefore
>as little looping as possible to complete the join while respecting
>work_mem.
>

Not sure what NLJ flag rule you propose, exactly.

Regarding the threshold value - once the space for BufFiles (and other
overhead) gets over work_mem/2, it does not make any sense to increase
the number of batches because then the work_mem would be entirely
occupied by BufFiles.

The WIP patches don't actually do exactly that though - they just check
if the incremented size would be over work_mem/2. I think we should
instead allow up to work_mem*2/3, i.e. stop adding batches after the
BufFiles start consuming more than work_mem/3 memory.

I think that's actually what you mean by "halving the amount of memory
left for the hash table" because that's what happens after reaching the
work_mem/3.

But I think that rule is irrelevant here, really, because this thread
was discussing cases where adding batches is futile due to skew, no? In
which case we should stop adding batches after reaching some % of tuples
not moving from the batch.

Or are you suggesting we should remove that rule, and instead rely on
this rule about halving the hash table space? That might work too, I
guess.

OTOH I'm not sure it's a good idea to handle both those cases the same
way - "overflow file" idea works pretty well for cases where the hash
table actually can be split into batches, and I'm afraid NLJ will be
much less efficient for those cases.

regards

-- 
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services 



Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Thomas Munro
Date:
On Mon, May 20, 2019 at 12:22 PM Tomas Vondra
<tomas.vondra@2ndquadrant.com> wrote:
> On Mon, May 20, 2019 at 11:07:03AM +1200, Thomas Munro wrote:
> >First let me restate the PostgreSQL terminology for this stuff so I
> >don't get confused while talking about it:
> >
> >* The inner side of the join = the right side = the side we use to
> >build a hash table.  Right and full joins emit inner tuples when there
> >is no matching tuple on the outer side.
> >
> >* The outer side of the join = the left side = the side we use to
> >probe the hash table.  Left and full joins emit outer tuples when
> >there is no matching tuple on the inner side.
> >
> >* Semi and anti joins emit exactly one instance of each outer tuple if
> >there is/isn't at least one match on the inner side.
> >
>
> I think you're conflating inner/outer side and left/right, or rather
> assuming it's always left=inner and right=outer.

In PostgreSQL, it's always inner = right, outer = left.  You can see
that reflected in plannodes.h and elsewhere:

/* ----------------
 *      these are defined to avoid confusion problems with "left"
 *      and "right" and "inner" and "outer".  The convention is that
 *      the "left" plan is the "outer" plan and the "right" plan is
 *      the inner plan, but these make the code more readable.
 * ----------------
 */
#define innerPlan(node)                 (((Plan *)(node))->righttree)
#define outerPlan(node)                 (((Plan *)(node))->lefttree)

I'm not sure why you think it's not always like that: are you referring to
the fact that the planner can choose to reverse the join (compared to
the SQL LEFT|RIGHT JOIN that appeared in the query), creating an extra
layer of confusion?  In my email I was talking only about left and
right as seen by the executor.

> >About the question of when exactly to set the "use_NLJ" flag:  I had
> >originally been thinking of this only as a way to deal with the
> >extreme skew problem.  But in light of Tomas's complaints about
> >unmetered per-batch memory overheads, I had a new thought: it should
> >also be triggered whenever doubling the number of batches would halve
> >the amount of memory left for the hash table (after including the size
> >of all those BufFile objects in the computation as Tomas proposes).  I
> >think that might be exactly the right cut-off if you want to do
> >as much Grace partitioning as your work_mem can afford, and therefore
> >as little looping as possible to complete the join while respecting
> >work_mem.
> >
>
> Not sure what NLJ flag rule you propose, exactly.
>
> Regarding the threshold value - once the space for BufFiles (and other
> overhead) gets over work_mem/2, it does not make any sense to increase
> the number of batches because then the work_mem would be entirely
> occupied by BufFiles.
>
> The WIP patches don't actually do exactly that though - they just check
> if the incremented size would be over work_mem/2. I think we should
> instead allow up to work_mem*2/3, i.e. stop adding batches after the
> BufFiles start consuming more than work_mem/3 memory.
>
> I think that's actually what you mean by "halving the amount of memory
> left for the hash table" because that's what happens after reaching the
> work_mem/3.

Well, instead of an arbitrary number like work_mem/2 or work_mem *
2/3, I was trying to figure out the precise threshold beyond which it
doesn't make sense to expend more memory on BufFile objects, even if
the keys are uniformly distributed so that splitting batches halves
the expected tuple count per batch.  Let work_mem_for_hash_table =
work_mem - nbatch * sizeof(BufFile).  Whenever you increase nbatch,
work_mem_for_hash_table goes down, but it had better be more than half
what it was before, or we expect to run out of memory again (if the
batch didn't fit before, and we're now splitting it so that we'll try
to load only half of it, we'd better have more than half the budget
for the hash table that we had before).  Otherwise you'd be making
matters worse, and this process probably won't terminate.

> But I think that rule is irrelevant here, really, because this thread
> was discussing cases where adding batches is futile due to skew, no? In
> which case we should stop adding batches after reaching some % of tuples
> not moving from the batch.

Yeah, this thread started off just about the 95% thing, but veered off
course since these topics are tangled up.  Sorry.

> Or are you suggesting we should remove that rule, and instead rely on
> this rule about halving the hash table space? That might work too, I
> guess.

No, I suspect you need both rules.  We still want to detect extreme
skew soon as possible, even though the other rule will eventually
fire; might as well do it sooner in clear-cut cases.

> OTOH I'm not sure it's a good idea to handle both those cases the same
> way - "overflow file" idea works pretty well for cases where the hash
> table actually can be split into batches, and I'm afraid NLJ will be
> much less efficient for those cases.

Yeah, you might be right about that, and everything I'm describing is
pure vapourware anyway.  But your overflow file scheme isn't exactly
free of IO-amplification and multiple-processing of input data
either... and I haven't yet grokked how it would work for parallel
hash.  Parallel hash generally doesn't have the
'throw-the-tuples-forward' concept. which is inherently based on
sequential in-order processing of batches.

-- 
Thomas Munro
https://enterprisedb.com



Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Andres Freund
Date:
Hi,

On 2019-05-20 13:25:52 +1200, Thomas Munro wrote:
> In PostgreSQL, it's always inner = right, outer = left.  You can see
> that reflected in plannodes.h and elsewhere:
> 
> /* ----------------
>  *      these are defined to avoid confusion problems with "left"
>  *      and "right" and "inner" and "outer".  The convention is that
>  *      the "left" plan is the "outer" plan and the "right" plan is
>  *      the inner plan, but these make the code more readable.
>  * ----------------
>  */
> #define innerPlan(node)                 (((Plan *)(node))->righttree)
> #define outerPlan(node)                 (((Plan *)(node))->lefttree)

I really don't understand why we don't just rename those fields.

Greetings,

Andres Freund



Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Tomas Vondra
Date:
On Mon, May 20, 2019 at 01:25:52PM +1200, Thomas Munro wrote:
>On Mon, May 20, 2019 at 12:22 PM Tomas Vondra
><tomas.vondra@2ndquadrant.com> wrote:
>> On Mon, May 20, 2019 at 11:07:03AM +1200, Thomas Munro wrote:
>> >First let me restate the PostgreSQL terminology for this stuff so I
>> >don't get confused while talking about it:
>> >
>> >* The inner side of the join = the right side = the side we use to
>> >build a hash table.  Right and full joins emit inner tuples when there
>> >is no matching tuple on the outer side.
>> >
>> >* The outer side of the join = the left side = the side we use to
>> >probe the hash table.  Left and full joins emit outer tuples when
>> >there is no matching tuple on the inner side.
>> >
>> >* Semi and anti joins emit exactly one instance of each outer tuple if
>> >there is/isn't at least one match on the inner side.
>> >
>>
>> I think you're conflating inner/outer side and left/right, or rather
>> assuming it's always left=inner and right=outer.
>
>In PostgreSQL, it's always inner = right, outer = left.  You can see
>that reflected in plannodes.h and elsewhere:
>
>/* ----------------
> *      these are defined to avoid confusion problems with "left"
> *      and "right" and "inner" and "outer".  The convention is that
> *      the "left" plan is the "outer" plan and the "right" plan is
> *      the inner plan, but these make the code more readable.
> * ----------------
> */
>#define innerPlan(node)                 (((Plan *)(node))->righttree)
>#define outerPlan(node)                 (((Plan *)(node))->lefttree)
>
>I'm not sure why you think it's not always like that: are you referring to
>the fact that the planner can choose to reverse the join (compared to
>the SQL LEFT|RIGHT JOIN that appeared in the query), creating an extra
>layer of confusion?  In my email I was talking only about left and
>right as seen by the executor.
>

It might be my lack of understanding, but I'm not sure how we map
LEFT/RIGHT JOIN to left/righttree and inner/outer at plan level. My
assumption was that for "a LEFT JOIN b", both "a" and "b" can end up
as either the inner or the outer (sub)tree.

But I haven't checked so I may easily be wrong. Maybe the comment you
quoted clarifies that, not sure.

>> >About the question of when exactly to set the "use_NLJ" flag:  I had
>> >originally been thinking of this only as a way to deal with the
>> >extreme skew problem.  But in light of Tomas's complaints about
>> >unmetered per-batch memory overheads, I had a new thought: it should
>> >also be triggered whenever doubling the number of batches would halve
>> >the amount of memory left for the hash table (after including the size
>> >of all those BufFile objects in the computation as Tomas proposes).  I
>> >think that might be exactly the right cut-off if you want to do
>> >as much Grace partitioning as your work_mem can afford, and therefore
>> >as little looping as possible to complete the join while respecting
>> >work_mem.
>> >
>>
>> Not sure what NLJ flag rule you propose, exactly.
>>
>> Regarding the threshold value - once the space for BufFiles (and other
>> overhead) gets over work_mem/2, it does not make any sense to increase
>> the number of batches because then the work_mem would be entirely
>> occupied by BufFiles.
>>
>> The WIP patches don't actually do exactly that though - they just check
>> if the incremented size would be over work_mem/2. I think we should
>> instead allow up to work_mem*2/3, i.e. stop adding batches after the
>> BufFiles start consuming more than work_mem/3 memory.
>>
>> I think that's actually what you mean by "halving the amount of memory
>> left for the hash table" because that's what happens after reaching the
>> work_mem/3.
>
>Well, instead of an arbitrary number like work_mem/2 or work_mem *
>2/3, I was trying to figure out the precise threshold beyond which it
>doesn't make sense to expend more memory on BufFile objects, even if
>the keys are uniformly distributed so that splitting batches halves
>the expected tuple count per batch.  Let work_mem_for_hash_table =
>work_mem - nbatch * sizeof(BufFile).  Whenever you increase nbatch,
>work_mem_for_hash_table goes down, but it had better be more than half
>what it was before, or we expect to run out of memory again (if the
>batch didn't fit before, and we're now splitting it so that we'll try
>to load only half of it, we'd better have more than half the budget
>for the hash table that we had before).  Otherwise you'd be making
>matters worse, and this process probably won't terminate.
>

But the work_mem/3 does exactly that.

Let's say BufFiles need a bit less than work_mem/3. That means we have
a bit more than 2*work_mem/3 for the hash table. If you double the number
of batches, then you'll end up with a bit more than work_mem/3. That is,
we've not halved the hash table size.

If BufFiles need a bit more memory than work_mem/3, then after doubling
the number of batches we'll end up with less than half the initial hash
table space.

So I think work_mem/3 is the threshold we're looking for.
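
Spelling out your inequality, writing F = nbatch * sizeof(BufFile) for the
total BufFile overhead:

    hash table space before doubling:  work_mem - F
    hash table space after doubling:   work_mem - 2*F

    (work_mem - 2*F)  >  (work_mem - F) / 2
    2*work_mem - 4*F  >  work_mem - F
    work_mem          >  3*F
    F                 <  work_mem / 3

i.e. doubling the number of batches keeps more than half of the hash table
space exactly while the BufFiles stay under work_mem/3.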

>> But I think that rule is irrelevant here, really, because this thread
>> was discussing cases where adding batches is futile due to skew, no? In
>> which case we should stop adding batches after reaching some % of tuples
>> not moving from the batch.
>
>Yeah, this thread started off just about the 95% thing, but veered off
>course since these topics are tangled up.  Sorry.
>
>> Or are you suggesting we should remove that rule, and instead rely on
>> this rule about halving the hash table space? That might work too, I
>> guess.
>
>No, I suspect you need both rules.  We still want to detect extreme
>skew soon as possible, even though the other rule will eventually
>fire; might as well do it sooner in clear-cut cases.
>

Right, I agree. I think we need the 95% rule (or whatever) to handle the
cases with skew / many duplicates, and then the overflow files to handle
underestimates with uniform distribution (or some other solution).

>> OTOH I'm not sure it's a good idea to handle both those cases the same
>> way - "overflow file" idea works pretty well for cases where the hash
>> table actually can be split into batches, and I'm afraid NLJ will be
>> much less efficient for those cases.
>
>Yeah, you might be right about that, and everything I'm describing is
>pure vapourware anyway.  But your overflow file scheme isn't exactly
>free of IO-amplification and multiple-processing of input data
>either... and I haven't yet grokked how it would work for parallel
>hash.  Parallel hash generally doesn't have the
>'throw-the-tuples-forward' concept, which is inherently based on
>sequential in-order processing of batches.
>

Sure, let's do some math.

With the overflow scheme, the amplification is roughly ~2x (relative to
master), because we need to write data for most batches first into the
overflow file and then to the correct one. Master has write amplification
about ~1.25x (due to the gradual increase of batches), so the "total"
amplification is ~2.5x.

For the NLJ, the amplification fully depends on what fraction of the hash
table fits into work_mem. For example when it needs to be split into 32
fragments, we have ~32x amplification. It might affect just some batches,
of course.

So I still think those approaches are complementary and we need both.

regards

-- 
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services




Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Melanie Plageman
Date:

On Sun, May 19, 2019 at 4:07 PM Thomas Munro <thomas.munro@gmail.com> wrote:
On Sat, May 18, 2019 at 12:15 PM Melanie Plageman
<melanieplageman@gmail.com> wrote:
> On Thu, May 16, 2019 at 3:22 PM Thomas Munro <thomas.munro@gmail.com> wrote:
>> Admittedly I don't have a patch, just a bunch of handwaving.  One
>> reason I haven't attempted to write it is because although I know how
>> to do the non-parallel version using a BufFile full of match bits in
>> sync with the tuples for outer joins, I haven't figured out how to do
>> it for parallel-aware hash join, because then each loop over the outer
>> batch could see different tuples in each participant.  You could use
>> the match bit in HashJoinTuple header, but then you'd have to write
>> all the tuples out again, which is more IO than I want to do.  I'll
>> probably start another thread about that.
>
> Could you explain more about the implementation you are suggesting?
>
> Specifically, what do you mean "BufFile full of match bits in sync with the
> tuples for outer joins?"

First let me restate the PostgreSQL terminology for this stuff so I
don't get confused while talking about it:

* The inner side of the join = the right side = the side we use to
build a hash table.  Right and full joins emit inner tuples when there
is no matching tuple on the outer side.

* The outer side of the join = the left side = the side we use to
probe the hash table.  Left and full joins emit outer tuples when
there is no matching tuple on the inner side.

* Semi and anti joins emit exactly one instance of each outer tuple if
there is/isn't at least one match on the inner side.

We have a couple of relatively easy cases:

* Inner joins: for every outer tuple, we try to find a match in the
hash table, and if we find one we emit a tuple.  To add looping
support, if we run out of memory when loading the hash table we can
just proceed to probe the fragment we've managed to load so far, and
then rewind the outer batch, clear the hash table and load in the next
work_mem-sized fragment and do it again... rinse and repeat until
we've eventually processed the whole inner batch.  After we've
finished looping, we move on to the next batch.

* For right and full joins ("HJ_FILL_INNER"), we also need to emit an
inner tuple for every tuple that was loaded into the hash table but
never matched.  That's done using a flag HEAP_TUPLE_HAS_MATCH in the
header of the tuples of the hash table, and a scan through the whole
hash table at the end of each batch to look for unmatched tuples
(ExecScanHashTableForUnmatched()).  To add looping support, that just
has to be done at the end of every inner batch fragment, that is,
after every loop.

And now for the cases that need a new kind of match bit, as far as I can see:

* For left and full joins ("HJ_FILL_OUTER"), we also need to emit an
outer tuple for every tuple that didn't find a match in the hash
table.  Normally that is done while probing, without any need for
memory or match flags: if we don't find a match, we just spit out an
outer tuple immediately.  But that simple strategy won't work if the
hash table holds only part of the inner batch.  Since we'll be
rewinding and looping over the outer batch again for the next inner
batch fragment, we can't yet say if there will be a match in a later
loop.  But the later loops don't know on their own either.  So we need
some kind of cumulative memory between loops, and we only know which
outer tuples have a match after we've finished all loops.  So there
would need to be a new function ExecScanOuterBatchForUnmatched().

* For semi joins, we need to emit exactly one outer tuple whenever
there is one or more match on the inner side.  To add looping support,
we need to make sure that we don't emit an extra copy of the outer
tuple if there is a second match in another inner batch fragment.
Again, this implies some kind of memory between loops, so we can
suppress later matches.

* For anti joins, we need to emit an outer tuple whenever there is no
match.  To add looping support, we need to wait until we've seen all
the inner batch fragments before we know that a given outer tuple has
no match, perhaps with the same new function
ExecScanOuterBatchForUnmatched().

So, we need some kind of inter-loop memory, but we obviously don't
want to create another source of unmetered RAM gobbling.  So one idea
is a BufFile that has one bit per outer tuple in the batch.  In the
first loop, we just stream out the match results as we go, and then
somehow we OR the bitmap with the match results in subsequent loops.
After the last loop, we have a list of unmatched tuples -- just scan
it in lock-step with the outer batch and look for 0 bits.

That makes sense. Thanks for the detailed explanation.
 

Unfortunately that bits-in-order scheme doesn't work for parallel
hash, where the SharedTuplestore tuples seen by each worker are
non-deterministic.  So perhaps in that case we could use the
HEAP_TUPLE_HAS_MATCH bit in the outer tuple header itself, and write
the whole outer batch back out each time through the loop.  That'd
keep the tuples and match bits together, but it seems like a lot of
IO... 

If you set the has_match flag in the tuple header itself, wouldn't you only
need to write the tuples from the outer batch back out that don't have
matches?
 
> If so, why do you need to keep track of the outer tuples seen?
> If you are going to loop through the whole outer side for each tuple on the
> inner side, it seems like you wouldn't need to.

The idea is to loop through the whole outer batch for every
work_mem-sized inner batch fragment, not every tuple.  Though in
theory it could be as small as a single tuple.

> Could you make an outer "batch" which is the whole of the outer relation? That
> is, could you do something like: when hashing the inner side, if re-partitioning
> is resulting in batches that will overflow spaceAllowed, could you set a flag on
> that batch use_NLJ and when making batches for the outer side, make one "batch"
> that has all the tuples from the outer side which the inner side batch which was
> flagged will do NLJ with.

I didn't understand this... you always need to make one outer batch
corresponding to every inner batch.  The problem is the tricky
left/full/anti/semi join cases when joining against fragments holding
less than the full inner batch: we still need some way to implement
join logic that depends on knowing whether there is a match in *any*
of the inner fragments/loops.

Sorry, my suggestion was inaccurate and unclear: I was basically suggesting
that once you have all batches created for outer and inner sides, for a
given inner side batch that does not fit in memory, for each outer tuple in
the corresponding outer batch file, load and join all of the chunks of the
inner batch file. That way, before you emit that tuple, you have checked
all of the corresponding inner batch.

Thinking about it now, I realize that that would be worse in all cases than
what you are thinking of -- joining the outer side batch with the inner
side batch chunk that fits in memory and marking the BufFile bit
representing that outer side tuple as "matched" and only emitting it with a
NULL from the inner side after all chunks have been processed.

--
Melanie Plageman

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Melanie Plageman
Date:


On Sun, May 19, 2019 at 4:07 PM Thomas Munro <thomas.munro@gmail.com> wrote:
On Sat, May 18, 2019 at 12:15 PM Melanie Plageman
<melanieplageman@gmail.com> wrote:
> On Thu, May 16, 2019 at 3:22 PM Thomas Munro <thomas.munro@gmail.com> wrote:
>> Admittedly I don't have a patch, just a bunch of handwaving.  One
>> reason I haven't attempted to write it is because although I know how
>> to do the non-parallel version using a BufFile full of match bits in
>> sync with the tuples for outer joins, I haven't figured out how to do
>> it for parallel-aware hash join, because then each loop over the outer
>> batch could see different tuples in each participant.  You could use
>> the match bit in HashJoinTuple header, but then you'd have to write
>> all the tuples out again, which is more IO than I want to do.  I'll
>> probably start another thread about that.
>
> Could you explain more about the implementation you are suggesting?
>
> Specifically, what do you mean "BufFile full of match bits in sync with the
> tuples for outer joins?"

First let me restate the PostgreSQL terminology for this stuff so I
don't get confused while talking about it:

* The inner side of the join = the right side = the side we use to
build a hash table.  Right and full joins emit inner tuples when there
is no matching tuple on the outer side.

* The outer side of the join = the left side = the side we use to
probe the hash table.  Left and full joins emit outer tuples when
there is no matching tuple on the inner side.

* Semi and anti joins emit exactly one instance of each outer tuple if
there is/isn't at least one match on the inner side.

We have a couple of relatively easy cases:

* Inner joins: for every outer tuple, we try to find a match in the
hash table, and if we find one we emit a tuple.  To add looping
support, if we run out of memory when loading the hash table we can
just proceed to probe the fragment we've managed to load so far, and
then rewind the outer batch, clear the hash table and load in the next
work_mem-sized fragment and do it again... rinse and repeat until
we've eventually processed the whole inner batch.  After we've
finished looping, we move on to the next batch.

* For right and full joins ("HJ_FILL_INNER"), we also need to emit an
inner tuple for every tuple that was loaded into the hash table but
never matched.  That's done using a flag HEAP_TUPLE_HAS_MATCH in the
header of the tuples of the hash table, and a scan through the whole
hash table at the end of each batch to look for unmatched tuples
(ExecScanHashTableForUnmatched()).  To add looping support, that just
has to be done at the end of every inner batch fragment, that is,
after every loop.

And now for the cases that need a new kind of match bit, as far as I can see:

* For left and full joins ("HJ_FILL_OUTER"), we also need to emit an
outer tuple for every tuple that didn't find a match in the hash
table.  Normally that is done while probing, without any need for
memory or match flags: if we don't find a match, we just spit out an
outer tuple immediately.  But that simple strategy won't work if the
hash table holds only part of the inner batch.  Since we'll be
rewinding and looping over the outer batch again for the next inner
batch fragment, we can't yet say if there will be a match in a later
loop.  But the later loops don't know on their own either.  So we need
some kind of cumulative memory between loops, and we only know which
outer tuples have a match after we've finished all loops.  So there
would need to be a new function ExecScanOuterBatchForUnmatched().

* For semi joins, we need to emit exactly one outer tuple whenever
there is one or more match on the inner side.  To add looping support,
we need to make sure that we don't emit an extra copy of the outer
tuple if there is a second match in another inner batch fragment.
Again, this implies some kind of memory between loops, so we can
suppress later matches.

* For anti joins, we need to emit an outer tuple whenever there is no
match.  To add looping support, we need to wait until we've seen all
the inner batch fragments before we know that a given outer tuple has
no match, perhaps with the same new function
ExecScanOuterBatchForUnmatched().

So, we need some kind of inter-loop memory, but we obviously don't
want to create another source of unmetered RAM gobbling.  So one idea
is a BufFile that has one bit per outer tuple in the batch.  In the
first loop, we just stream out the match results as we go, and then
somehow we OR the bitmap with the match results in subsequent loops.
After the last loop, we have a list of unmatched tuples -- just scan
it in lock-step with the outer batch and look for 0 bits.
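
To make that concrete, here's a rough standalone sketch of the bookkeeping I
have in mind (not PostgreSQL code: the MatchBits names are made up, and an
in-memory byte array stands in for the BufFile):

#include <stdio.h>
#include <stdlib.h>

typedef struct MatchBits
{
    unsigned char *bytes;       /* one bit per outer tuple in the batch */
    size_t         ntuples;
} MatchBits;

static void
matchbits_init(MatchBits *m, size_t ntuples)
{
    m->ntuples = ntuples;
    m->bytes = calloc((ntuples + 7) / 8, 1);
}

/* Record that outer tuple number i found a match in the current hash
 * table fragment; repeated calls from later loops just OR more bits in. */
static void
matchbits_set(MatchBits *m, size_t i)
{
    m->bytes[i / 8] |= (unsigned char) (1 << (i % 8));
}

/* After the last loop over the inner batch, scan for zero bits and
 * null-extend those outer tuples (here we only print their numbers). */
static void
matchbits_emit_unmatched(const MatchBits *m)
{
    for (size_t i = 0; i < m->ntuples; i++)
        if ((m->bytes[i / 8] & (1 << (i % 8))) == 0)
            printf("outer tuple %zu unmatched: null-extend and emit\n", i);
}

int
main(void)
{
    MatchBits m;

    matchbits_init(&m, 6);      /* six outer tuples in this batch */
    matchbits_set(&m, 0);       /* matches found while probing loop 1 */
    matchbits_set(&m, 3);
    matchbits_set(&m, 0);       /* a second match in loop 2 is harmless */
    matchbits_emit_unmatched(&m);
    free(m.bytes);
    return 0;
}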

Unfortunately that bits-in-order scheme doesn't work for parallel
hash, where the SharedTuplestore tuples seen by each worker are
non-deterministic.  So perhaps in that case we could use the
HEAP_TUPLE_HAS_MATCH bit in the outer tuple header itself, and write
the whole outer batch back out each time through the loop.  That'd
keep the tuples and match bits together, but it seems like a lot of
IO...  Note that parallel hash doesn't support right/full joins today,
because of some complications about waiting and deadlocks that might
turn out to be relevant here too, and might be solvable (I should
probably write about that in another email), but left joins *are*
supported today, so they would need to be desupported if we wanted to add a
loop-based escape valve but not deal with these problems.  That
doesn't seem acceptable, which is why I'm a bit stuck on this point,
and unfortunately it may be a while before I have time to tackle any
of that personally.


There was an off-list discussion at PGCon last week about doing this
hash looping strategy using the bitmap of match bits and solving the
parallel hashjoin problem by encoding tuple-identifying information in
the bitmap. That would allow each worker to indicate that an outer
tuple had a match while processing an inner side chunk, and then, at
the end of the scan of the outer side, the bitmaps would be OR'd
together to represent a single view of the unmatched tuples from
that iteration.

I was talking to Jeff Davis about this on Saturday, and, he felt that
there might be a way to solve the problem differently if we thought of
the left join case as performing an inner join and an antijoin
instead.

Riffing on this idea a bit, I started trying to write a patch that
would basically emit a tuple if it matches and write the tuple out to
a file if it does not match. Then, after iterating through the outer
batch the first time for the first inner chunk, any tuples which do
not yet have a match are the only ones which need to be joined against
the other inner chunks. Instead of iterating through the outer side
original batch file, use the unmatched outer tuples file to do the
join against the next chunk. Repeat this for all chunks.

Could we not do this and avoid using the match bit? In the worst case,
you would have to write out all the tuples on the outer side (if none
match) nchunks times (a chunk being the work_mem-sized chunk of inner
loaded into the hashtable).

--
Melanie Plageman

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Robert Haas
Date:
On Mon, Jun 3, 2019 at 5:10 PM Melanie Plageman
<melanieplageman@gmail.com> wrote:
> I was talking to Jeff Davis about this on Saturday, and, he felt that
> there might be a way to solve the problem differently if we thought of
> the left join case as performing an inner join and an antijoin
> instead.
>
> Riffing on this idea a bit, I started trying to write a patch that
> would basically emit a tuple if it matches and write the tuple out to
> a file if it does not match. Then, after iterating through the outer
> batch the first time for the first inner chunk, any tuples which do
> not yet have a match are the only ones which need to be joined against
> the other inner chunks. Instead of iterating through the outer side
> original batch file, use the unmatched outer tuples file to do the
> join against the next chunk. Repeat this for all chunks.

I'm not sure that I'm understanding this proposal correctly, but if I am
then I think it doesn't work in the case where a single outer row
matches rows in many different inner chunks.  When you "use the
unmatched outer tuples file to do the join against the next chunk,"
you deny any rows that have already matched the chance to produce
additional matches.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Robert Haas
Date:
On Sun, May 19, 2019 at 7:07 PM Thomas Munro <thomas.munro@gmail.com> wrote:
> Unfortunately that bits-in-order scheme doesn't work for parallel
> hash, where the SharedTuplestore tuples seen by each worker are
> non-deterministic.  So perhaps in that case we could use the
> HEAP_TUPLE_HAS_MATCH bit in the outer tuple header itself, and write
> the whole outer batch back out each time through the loop.  That'd
> keep the tuples and match bits together, but it seems like a lot of
> IO...

So, I think the case you're worried about here is something like:

Gather
-> Parallel Hash Left Join
  -> Parallel Seq Scan on a
  -> Parallel Hash
    -> Parallel Seq Scan on b

If I understand ExecParallelHashJoinPartitionOuter correctly, we're
going to hash all of a and put it into a set of batch files before we
even get started, so it's possible to identify precisely which tuple
we're talking about by just giving the batch number and the position
of the tuple within that batch.  So while it's true that the
individual workers can't use the number of tuples they've read to know
where they are in the SharedTuplestore, maybe the SharedTuplestore
could just tell them.  Then they could maintain a paged bitmap of the
tuples that they've matched to something, indexed by
position-within-the-tuplestore, and those bitmaps could be OR'd
together at the end.

Crazy idea, or...?

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Melanie Plageman
Date:


On Tue, Jun 4, 2019 at 5:43 AM Robert Haas <robertmhaas@gmail.com> wrote:
On Mon, Jun 3, 2019 at 5:10 PM Melanie Plageman
<melanieplageman@gmail.com> wrote:
> I was talking to Jeff Davis about this on Saturday, and, he felt that
> there might be a way to solve the problem differently if we thought of
> the left join case as performing an inner join and an antijoin
> instead.
>
> Riffing on this idea a bit, I started trying to write a patch that
> would basically emit a tuple if it matches and write the tuple out to
> a file if it does not match. Then, after iterating through the outer
> batch the first time for the first inner chunk, any tuples which do
> not yet have a match are the only ones which need to be joined against
> the other inner chunks. Instead of iterating through the outer side
> original batch file, use the unmatched outer tuples file to do the
> join against the next chunk. Repeat this for all chunks.

I'm not sure that I'm understanding this proposal correctly, but if I am
then I think it doesn't work in the case where a single outer row
matches rows in many different inner chunks.  When you "use the
unmatched outer tuples file to do the join against the next chunk,"
you deny any rows that have already matched the chance to produce
additional matches.


Oops! You are totally right.
I will amend the idea:
For each chunk on the inner side, loop through both the original batch
file and the unmatched outer tuples file created for the last chunk.
Emit any matches and write out any unmatched tuples to a new unmatched
outer tuples file.

I think, in the worst case, if no tuples from the outer have a match,
you end up writing out all of the outer tuples for each chunk on the
inner side. However, using the match bit in the tuple header solution
would require this much writing.
Probably the bigger problem is that in this worst case you would also
need to read double the number of outer tuples for each inner chunk.

However, in the best case it seems like it would be better than the
match bit/write everything from the outer side out solution.

--
Melanie Plageman

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Robert Haas
Date:
On Tue, Jun 4, 2019 at 2:47 PM Melanie Plageman
<melanieplageman@gmail.com> wrote:
> Oops! You are totally right.
> I will amend the idea:
> For each chunk on the inner side, loop through both the original batch
> file and the unmatched outer tuples file created for the last chunk.
> Emit any matches and write out any unmatched tuples to a new unmatched
> outer tuples file.
>
> I think, in the worst case, if no tuples from the outer have a match,
> you end up writing out all of the outer tuples for each chunk on the
> inner side. However, using the match bit in the tuple header solution
> would require this much writing.
> Probably the bigger problem is that in this worst case you would also
> need to read double the number of outer tuples for each inner chunk.
>
> However, in the best case it seems like it would be better than the
> match bit/write everything from the outer side out solution.

I guess so, but the downside of needing to read twice as many outer
tuples for each inner chunk seems pretty large.  It would be a lot
nicer if we could find a way to store the matched-bits someplace other
than where we are storing the tuples, what Thomas called a
bits-in-order scheme, because then the amount of additional read and
write I/O would be tiny -- one bit per tuple doesn't add up very fast.

In the scheme you propose here, I think that after you read the
original outer tuples for each chunk and the unmatched outer tuples
for each chunk, you'll have to match up the unmatched tuples to the
original tuples, probably by using memcmp() or something.  Otherwise,
when a new match occurs, you won't know which tuple should now not be
emitted into the new unmatched outer tuples file that you're going to
produce.  So I think what's going to happen is that you'll read the
original batch file, then read the unmatched tuples file and use that
to set or not set a bit on each tuple in memory, then do the real work
setting more bits, then write out a new unmatched-tuples file with the
tuples that still don't have the bit set.  So your unmatched tuple
file is basically a list of tuple identifiers in the least compact
form imaginable: the tuple is identified by the entire tuple contents.
That doesn't seem very appealing, although I expect that it would
still win for some queries.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Melanie Plageman
Date:


On Tue, Jun 4, 2019 at 6:05 AM Robert Haas <robertmhaas@gmail.com> wrote:
On Sun, May 19, 2019 at 7:07 PM Thomas Munro <thomas.munro@gmail.com> wrote:
> Unfortunately that bits-in-order scheme doesn't work for parallel
> hash, where the SharedTuplestore tuples seen by each worker are
> non-deterministic.  So perhaps in that case we could use the
> HEAP_TUPLE_HAS_MATCH bit in the outer tuple header itself, and write
> the whole outer batch back out each time through the loop.  That'd
> keep the tuples and match bits together, but it seems like a lot of
> IO...

So, I think the case you're worried about here is something like:

Gather
-> Parallel Hash Left Join
  -> Parallel Seq Scan on a
  -> Parallel Hash
    -> Parallel Seq Scan on b

If I understand ExecParallelHashJoinPartitionOuter correctly, we're
going to hash all of a and put it into a set of batch files before we
even get started, so it's possible to identify precisely which tuple
we're talking about by just giving the batch number and the position
of the tuple within that batch.  So while it's true that the
individual workers can't use the number of tuples they've read to know
where they are in the SharedTuplestore, maybe the SharedTuplestore
could just tell them.  Then they could maintain a paged bitmap of the
tuples that they've matched to something, indexed by
position-within-the-tuplestore, and those bitmaps could be OR'd
together at the end.

Crazy idea, or...?


That idea does sound like it could work. Basically a worker is given a
tuple and a bit index (process this tuple and if it matches go flip
the bit at position 30) in its own bitmap, right?

I need to spend some time understanding how SharedTupleStore works and
how workers get tuples, so what I'm saying might not make sense.

One question I have is, how would the OR'd together bitmap be
propagated to workers after the first chunk? That is, when there are
no tuples left in the outer batch, for a given inner chunk, would you
load the bitmaps from each worker into memory, OR them together, and
then write the updated bitmap back out so that each worker starts with
the updated bitmap?

--
Melanie Plageman

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Robert Haas
Date:
On Tue, Jun 4, 2019 at 3:09 PM Melanie Plageman
<melanieplageman@gmail.com> wrote:
> One question I have is, how would the OR'd together bitmap be
> propagated to workers after the first chunk? That is, when there are
> no tuples left in the outer bunch, for a given inner chunk, would you
> load the bitmaps from each worker into memory, OR them together, and
> then write the updated bitmap back out so that each worker starts with
> the updated bitmap?

I was assuming we'd elect one participant to go read all the bitmaps,
OR them together, and generate all the required null-extended tuples,
sort of like the PHJ_BUILD_ALLOCATING, PHJ_GROW_BATCHES_ALLOCATING,
PHJ_GROW_BUCKETS_ALLOCATING, and/or PHJ_BATCH_ALLOCATING states only
involve one participant being active at a time. Now you could hope for
something better -- why not parallelize that work?  But on the other
hand, why not start simple and worry about that in some future patch
instead of right away? A committed patch that does something good is
better than an uncommitted patch that does something AWESOME.
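
Roughly, the merge step the elected participant would perform might look
like this throwaway sketch (made-up names; in-memory arrays stand in for
the per-worker bitmap files):

#include <stdio.h>
#include <stddef.h>

/* OR every worker's match bits into worker 0's array, then emit a
 * null-extended tuple for every bit that is still zero. */
static void
merge_and_emit(unsigned char *worker_bits[], int nworkers,
               size_t nbytes, size_t ntuples)
{
    for (size_t i = 0; i < nbytes; i++)
        for (int w = 1; w < nworkers; w++)
            worker_bits[0][i] |= worker_bits[w][i];

    for (size_t t = 0; t < ntuples; t++)
        if ((worker_bits[0][t / 8] & (1 << (t % 8))) == 0)
            printf("tuple %zu unmatched: emit null-extended\n", t);
}

int
main(void)
{
    unsigned char w0[1] = {0x05};           /* worker 0 matched tuples 0, 2 */
    unsigned char w1[1] = {0x02};           /* worker 1 matched tuple 1 */
    unsigned char *workers[] = {w0, w1};

    merge_and_emit(workers, 2, 1, 4);       /* 4 outer tuples in the batch */
    return 0;
}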

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Tomas Vondra
Date:
On Tue, Jun 04, 2019 at 03:08:24PM -0400, Robert Haas wrote:
>On Tue, Jun 4, 2019 at 2:47 PM Melanie Plageman
><melanieplageman@gmail.com> wrote:
>> Oops! You are totally right.
>> I will amend the idea:
>> For each chunk on the inner side, loop through both the original batch
>> file and the unmatched outer tuples file created for the last chunk.
>> Emit any matches and write out any unmatched tuples to a new unmatched
>> outer tuples file.
>>
>> I think, in the worst case, if no tuples from the outer have a match,
>> you end up writing out all of the outer tuples for each chunk on the
>> inner side. However, using the match bit in the tuple header solution
>> would require this much writing.
>> Probably the bigger problem is that in this worst case you would also
>> need to read double the number of outer tuples for each inner chunk.
>>
>> However, in the best case it seems like it would be better than the
>> match bit/write everything from the outer side out solution.
>
>I guess so, but the downside of needing to read twice as many outer
>tuples for each inner chunk seems pretty large.  It would be a lot
>nicer if we could find a way to store the matched-bits someplace other
>than where we are storing the tuples, what Thomas called a
>bits-in-order scheme, because then the amount of additional read and
>write I/O would be tiny -- one bit per tuple doesn't add up very fast.
>
>In the scheme you propose here, I think that after you read the
>original outer tuples for each chunk and the unmatched outer tuples
>for each chunk, you'll have to match up the unmatched tuples to the
>original tuples, probably by using memcmp() or something.  Otherwise,
>when a new match occurs, you won't know which tuple should now not be
>emitted into the new unmatched outer tuples file that you're going to
>produce.  So I think what's going to happen is that you'll read the
>original batch file, then read the unmatched tuples file and use that
>to set or not set a bit on each tuple in memory, then do the real work
>setting more bits, then write out a new unmatched-tuples file with the
>tuples that still don't have the bit set.  So your unmatched tuple
>file is basically a list of tuple identifiers in the least compact
>form imaginable: the tuple is identified by the entire tuple contents.
>That doesn't seem very appealing, although I expect that it would
>still win for some queries.
>

I wonder how big of an issue that actually is in practice. This is
meant for significantly skewed data sets, which may easily cause OOM
(e.g. per the recent report which restarted this discussion). So if we
still only expect to use this for rare cases, which may easily end up
with an OOM at the moment, the extra cost might be acceptable.

But if we plan to use this more widely (say, allow hashjoins even for
cases that we know won't fit into work_mem), then the extra cost would
be an issue. But even then the extra cost should be included in the cost
estimate, which could switch the plan to a merge join when appropriate.

Of course, maybe there are many data sets with enough skew to cause
explosive batch growth and consume a lot of memory, but not enough to trigger
OOM. Those cases may get slower, but I think that's OK. If appropriate,
the user can increase work_mem and get the "good" plan.

FWIW this is a challenge for all approaches discussed in this thread,
not just this particular one. We're restricting the resources available
to the query, switching to something (likely) slower.


regards

-- 
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services 



Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Melanie Plageman
Date:


On Tue, Jun 4, 2019 at 12:08 PM Robert Haas <robertmhaas@gmail.com> wrote:
On Tue, Jun 4, 2019 at 2:47 PM Melanie Plageman
<melanieplageman@gmail.com> wrote:
> Oops! You are totally right.
> I will amend the idea:
> For each chunk on the inner side, loop through both the original batch
> file and the unmatched outer tuples file created for the last chunk.
> Emit any matches and write out any unmatched tuples to a new unmatched
> outer tuples file.
>
> I think, in the worst case, if no tuples from the outer have a match,
> you end up writing out all of the outer tuples for each chunk on the
> inner side. However, using the match bit in the tuple header solution
> would require this much writing.
> Probably the bigger problem is that in this worst case you would also
> need to read double the number of outer tuples for each inner chunk.
>
> However, in the best case it seems like it would be better than the
> match bit/write everything from the outer side out solution.

I guess so, but the downside of needing to read twice as many outer
tuples for each inner chunk seems pretty large.  It would be a lot
nicer if we could find a way to store the matched-bits someplace other
than where we are storing the tuples, what Thomas called a
bits-in-order scheme, because then the amount of additional read and
write I/O would be tiny -- one bit per tuple doesn't add up very fast.

In the scheme you propose here, I think that after you read the
original outer tuples for each chunk and the unmatched outer tuples
for each chunk, you'll have to match up the unmatched tuples to the
original tuples, probably by using memcmp() or something.  Otherwise,
when a new match occurs, you won't know which tuple should now not be
emitted into the new unmatched outer tuples file that you're going to
produce.  So I think what's going to happen is that you'll read the
original batch file, then read the unmatched tuples file and use that
to set or not set a bit on each tuple in memory, then do the real work
setting more bits, then write out a new unmatched-tuples file with the
tuples that still don't have the bit set.  So your unmatched tuple
file is basically a list of tuple identifiers in the least compact
form imaginable: the tuple is identified by the entire tuple contents.
That doesn't seem very appealing, although I expect that it would
still win for some queries.


I'm not sure I understand why you would need to compare the original
tuples to the unmatched tuples file.

This is the example I used to try and reason through it.

let's say you have a batch (you are joining two single column tables)
and your outer side is:
5,7,9,11,10,11
and your inner is:
7,10,7,12,5,9
and for the inner, let's say that only two values can fit in memory,
so it is split into 3 chunks:
7,10 | 7,12 | 5,9
The first time you iterate through the outer side (joining it to the
first chunk), you emit as matched
7,7
10,10
and write to unmatched tuples file
5
9
11
11
The second time you iterate through the outer side (joining it to the
second chunk) you emit as matched
7,7
Then, you iterate again through the outer side a third time to join it
to the unmatched tuples in the unmatched tuples file (from the first
chunk) and write the following to a new unmatched tuples file:
5
9
11
11
The fourth time you iterate through the outer side (joining it to the
third chunk), you emit as matched
5,5
9,9
Then you iterate a fifth time through the outer side to join it to the
unmatched tuples in the unmatched tuples file (from the second chunk)
and write the following to a new unmatched tuples file:
11
11
Now that all chunks from the inner side have been processed, you can
loop through the final unmatched tuples file, NULL-extend, and emit
them

Wouldn't that work?

--
Melanie Plageman

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Melanie Plageman
Date:


On Tue, Jun 4, 2019 at 12:15 PM Robert Haas <robertmhaas@gmail.com> wrote:
On Tue, Jun 4, 2019 at 3:09 PM Melanie Plageman
<melanieplageman@gmail.com> wrote:
> One question I have is, how would the OR'd together bitmap be
> propagated to workers after the first chunk? That is, when there are
> no tuples left in the outer bunch, for a given inner chunk, would you
> load the bitmaps from each worker into memory, OR them together, and
> then write the updated bitmap back out so that each worker starts with
> the updated bitmap?

I was assuming we'd elect one participant to go read all the bitmaps,
OR them together, and generate all the required null-extended tuples,
sort of like the PHJ_BUILD_ALLOCATING, PHJ_GROW_BATCHES_ALLOCATING,
PHJ_GROW_BUCKETS_ALLOCATING, and/or PHJ_BATCH_ALLOCATING states only
involve one participant being active at a time. Now you could hope for
something better -- why not parallelize that work?  But on the other
hand, why not start simple and worry about that in some future patch
instead of right away? A committed patch that does something good is
better than an uncommitted patch that does something AWESOME.


What if you have a lot of tuples -- couldn't the bitmaps get pretty
big? And then you have to OR them all together and if you can't put
the whole bitmap from each worker into memory at once to do it, it
seems like it would be pretty slow. (I mean maybe not as slow as
reading the outer side 5 times when you only have 3 chunks on the
inner + all the extra writes from my unmatched tuple file idea, but still...)

--
Melanie Plageman

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Melanie Plageman
Date:


On Thu, May 16, 2019 at 3:22 PM Thomas Munro <thomas.munro@gmail.com> wrote:
Admittedly I don't have a patch, just a bunch of handwaving.  One
reason I haven't attempted to write it is because although I know how
to do the non-parallel version using a BufFile full of match bits in
sync with the tuples for outer joins, I haven't figured out how to do
it for parallel-aware hash join, because then each loop over the outer
batch could see different tuples in each participant.  You could use
the match bit in HashJoinTuple header, but then you'd have to write
all the tuples out again, which is more IO than I want to do.  I'll
probably start another thread about that.


Going back to the idea of using the match bit in the HashJoinTuple header
and writing out all of the outer side for every chunk of the inner
side, I was wondering if there was something we could do that was kind
of like mmap'ing the outer side file to give the workers in parallel
hashjoin the ability to update a match bit in the tuple in place and
avoid writing the whole outer side out each time.

--
Melanie Plageman

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Tomas Vondra
Date:
On Thu, Jun 06, 2019 at 04:37:19PM -0700, Melanie Plageman wrote:
>On Thu, May 16, 2019 at 3:22 PM Thomas Munro <thomas.munro@gmail.com> wrote:
>
>> Admittedly I don't have a patch, just a bunch of handwaving.  One
>> reason I haven't attempted to write it is because although I know how
>> to do the non-parallel version using a BufFile full of match bits in
>> sync with the tuples for outer joins, I haven't figured out how to do
>> it for parallel-aware hash join, because then each loop over the outer
>> batch could see different tuples in each participant.  You could use
>> the match bit in HashJoinTuple header, but then you'd have to write
>> all the tuples out again, which is more IO than I want to do.  I'll
>> probably start another thread about that.
>>
>>
>Going back to the idea of using the match bit in the HashJoinTuple header
>and writing out all of the outer side for every chunk of the inner
>side, I was wondering if there was something we could do that was kind
>of like mmap'ing the outer side file to give the workers in parallel
>hashjoin the ability to update a match bit in the tuple in place and
>avoid writing the whole outer side out each time.
>

I think this was one of the things we discussed in Ottawa - we could pass
the index of the tuple (in the batch) along with the tuple, so that each
worker knows which bit to set.

regards

-- 
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services




Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Tomas Vondra
Date:
On Thu, Jun 06, 2019 at 04:33:31PM -0700, Melanie Plageman wrote:
>On Tue, Jun 4, 2019 at 12:15 PM Robert Haas <robertmhaas@gmail.com> wrote:
>
>> On Tue, Jun 4, 2019 at 3:09 PM Melanie Plageman
>> <melanieplageman@gmail.com> wrote:
>> > One question I have is, how would the OR'd together bitmap be
>> > propagated to workers after the first chunk? That is, when there are
>> > no tuples left in the outer bunch, for a given inner chunk, would you
>> > load the bitmaps from each worker into memory, OR them together, and
>> > then write the updated bitmap back out so that each worker starts with
>> > the updated bitmap?
>>
>> I was assuming we'd elect one participant to go read all the bitmaps,
>> OR them together, and generate all the required null-extended tuples,
>> sort of like the PHJ_BUILD_ALLOCATING, PHJ_GROW_BATCHES_ALLOCATING,
>> PHJ_GROW_BUCKETS_ALLOCATING, and/or PHJ_BATCH_ALLOCATING states only
>> involve one participant being active at a time. Now you could hope for
>> something better -- why not parallelize that work?  But on the other
>> hand, why not start simple and worry about that in some future patch
>> instead of right away? A committed patch that does something good is
>> better than an uncommitted patch that does something AWESOME.
>>
>>
>What if you have a lot of tuples -- couldn't the bitmaps get pretty
>big? And then you have to OR them all together and if you can't put
>the whole bitmap from each worker into memory at once to do it, it
>seems like it would be pretty slow. (I mean maybe not as slow as
>reading the outer side 5 times when you only have 3 chunks on the
>inner + all the extra writes from my unmatched tuple file idea, but
>still...)
>

Yes, they could get quite big, and I think you're right we need to
keep that in mind, because it's on the outer (often quite large) side of
the join. And if we're aiming to restrict memory usage, it'd be weird to
just ignore this.

But I think Thomas Munro originally proposed to treat this as a separate
BufFile, so my assumption was each worker would simply rewrite the bitmap
repeatedly for each hash table fragment. That means a bit more I/O, but
those files are buffered and written in 8kB pages, with just 1 bit per
tuple. I think that's pretty OK and way cheaper than rewriting the whole
batch, where each tuple can be hundreds of bytes.

Also, it does not require any concurrency control, which rewriting the
batches themselves probably does (because we'd be feeding the tuples into
some shared file, I suppose). Except for the final step when we need to
merge the bitmaps, of course.

So I think this would work, it does not have the issue with using too much
memory, and I don't think the overhead is too bad.


regards

-- 
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services




Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Robert Haas
Date:
On Thu, Jun 6, 2019 at 7:31 PM Melanie Plageman
<melanieplageman@gmail.com> wrote:
> I'm not sure I understand why you would need to compare the original
> tuples to the unmatched tuples file.

I think I was confused.  Actually, I'm still not sure I understand this part:

> Then, you iterate again through the outer side a third time to join it
> to the unmatched tuples in the unmatched tuples file (from the first
> chunk) and write the following to a new unmatched tuples file:
> 5
> 9
> 11
> 11

and likewise here

> Then you iterate a fifth time through the outer side to join it to the
> unmatched tuples in the unmatched tuples file (from the second chunk)
> and write the following to a new unmatched tuples file:
> 11
> 11

So you refer to joining the outer side to the unmatched tuples file,
but how would that tell you which outer tuples had no matches on the
inner side?  I think what you'd need to do is anti-join the unmatched
tuples file to the current inner batch.  So the algorithm would be
something like:

for each inner batch:
  for each outer tuple:
    if tuple matches inner batch then emit match
    if tuple does not match inner batch and this is the first inner batch:
      write tuple to unmatched tuples file
  if this is not the first inner batch:
    for each tuple from the unmatched tuples file:
      if tuple does not match inner batch:
        write to new unmatched tuples file
    discard previous unmatched tuples file and use the new one for the
next iteration

for each tuple in the final unmatched tuples file:
  null-extend and emit

If that's not what you have in mind, maybe you could provide some
similar pseudocode?  Or you can just ignore me. I'm not trying to
interfere with an otherwise-fruitful discussion by being the only one
in the room who is confused...

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Robert Haas
Date:
On Fri, Jun 7, 2019 at 10:17 AM Tomas Vondra
<tomas.vondra@2ndquadrant.com> wrote:
> Yes, they could get quite big, and I think you're right we need to
> keep that in mind, because it's on the outer (often quite large) side of
> the join. And if we're aiming to restrict memory usage, it'd be weird to
> just ignore this.
>
> But I think Thomas Munro originally proposed to treat this as a separate
> BufFile, so my assumption was each worker would simply rewrite the bitmap
> repeatedly for each hash table fragment. That means a bit more I/O, but as
> those files are buffered and written in 8kB pages, with just 1 bit per
> tuple. I think that's pretty OK and way cheaper that rewriting the whole
> batch, where each tuple can be hundreds of bytes.

Yes, this is also my thought.  I'm not 100% sure I understand
Melanie's proposal, but I think that it involves writing every
still-unmatched outer tuple for every inner batch.  This proposal --
assuming we can get the tuple numbering worked out -- involves writing
a bit for every outer tuple for every inner batch.  So each time you
do an inner batch, you write either (a) one bit for EVERY outer tuple
or (b) the entirety of each unmatched tuple.  It's possible for the
latter to be cheaper if the number of unmatched tuples is really,
really tiny, but it's not very likely.

For example, suppose that you've got 4 batches and each batch matches
99% of the tuples, which are each 50 bytes wide.  After each batch,
approach A writes 1 bit per tuple, so a total of 4 bits per tuple
after 4 batches.  Approach B writes a different amount of data after
each batch.  After the first batch, it writes 1% of the tuples, and
for each one written it writes 50 bytes, so it writes 50 bytes * 0.01
= ~4 bits/tuple.  That's already equal to what approach A wrote after
all 4 batches, and it's going to do a little more I/O over the course
of the remaining matches - although not much, because the unmatched
tuples file will be very very tiny after we eliminate 99% of the 1%
that survived the first batch.  However, these are extremely favorable
assumptions for approach B.  If the tuples are wider or the batches
match only say 20% of the tuples, approach B is going to be waaaay
more I/O.

Assuming I understand correctly, which I may not.
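
Just to put that arithmetic in one place, here's a throwaway model of bytes
written per original outer tuple (very rough: buffering and per-page
overheads ignored, and it assumes each pass matches 99% of the tuples that
are still unmatched):

#include <stdio.h>

int
main(void)
{
    int     npasses = 4;            /* the 4 "batches" in the example above */
    double  tuple_width = 50.0;     /* bytes per outer tuple */
    double  match_rate = 0.99;      /* fraction matched per pass */

    /* Approach A: one bit per outer tuple for every pass. */
    double  bytes_a = npasses / 8.0;

    /* Approach B: rewrite whatever is still unmatched after each pass. */
    double  bytes_b = 0.0;
    double  surviving = 1.0;

    for (int b = 0; b < npasses; b++)
    {
        surviving *= (1.0 - match_rate);
        bytes_b += surviving * tuple_width;
    }

    printf("A: %.3f bytes/tuple, B: %.3f bytes/tuple\n", bytes_a, bytes_b);
    return 0;
}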

> Also, it does not require any concurrency control, which rewriting the
> batches themselves probably does (because we'd be feeding the tuples into
> some shared file, I suppose). Except for the final step when we need to
> merge the bitmaps, of course.

I suppose that rewriting the batches -- or really the unmatched tuples
file -- could just use a SharedTuplestore, so we probably wouldn't
need a lot of new code for this.  I don't know whether contention
would be a problem or not.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Melanie Plageman
Date:


On Fri, Jun 7, 2019 at 7:30 AM Robert Haas <robertmhaas@gmail.com> wrote:
On Thu, Jun 6, 2019 at 7:31 PM Melanie Plageman
<melanieplageman@gmail.com> wrote:
> I'm not sure I understand why you would need to compare the original
> tuples to the unmatched tuples file.

I think I was confused.  Actually, I'm still not sure I understand this part:

> Then, you iterate again through the outer side a third time to join it
> to the unmatched tuples in the unmatched tuples file (from the first
> chunk) and write the following to a new unmatched tuples file:
> 5
> 9
> 11
> 11

and likewise here

> Then you iterate a fifth time through the outer side to join it to the
> unmatched tuples in the unmatched tuples file (from the second chunk)
> and write the following to a new unmatched tuples file:
> 11
> 11

So you refer to joining the outer side to the unmatched tuples file,
but how would that tell you which outer tuples had no matches on the
inner side?  I think what you'd need to do is anti-join the unmatched
tuples file to the current inner batch.  So the algorithm would be
something like:

for each inner batch:
  for each outer tuple:
    if tuple matches inner batch then emit match
    if tuple does not match inner batch and this is the first inner batch:
      write tuple to unmatched tuples file
  if this is not the first inner batch:
    for each tuple from the unmatched tuples file:
      if tuple does not match inner batch:
        write to new unmatched tuples file
    discard previous unmatched tuples file and use the new one for the
next iteration

for each tuple in the final unmatched tuples file:
  null-extend and emit

If that's not what you have in mind, maybe you could provide some
similar pseudocode?  Or you can just ignore me. I'm not trying to
interfere with an otherwise-fruitful discussion by being the only one
in the room who is confused...


Yep, the pseudo-code you have above is exactly what I was thinking. I
have been hacking around on my fork implementing this for the
non-parallel hashjoin (my idea was to implement a parallel-friendly
design but for the non-parallel-aware case and then go back and
implement it for the parallel-aware hashjoin later) and have some
thoughts.

I'll call the whole adaptive hashjoin fallback strategy "chunked
hashloop join" for the purposes of this description.
I'll abbreviate the three approaches we've discussed like this:

Approach A is using a separate data structure (a bitmap was the
suggested pick) to track the match status of each outer tuple

Approach B is the inner-join + anti-join writing out unmatched tuples
to a new file for every iteration through the outer side batch (for
each chunk of inner)

Approach C is setting a match bit in the tuple and then writing all
outer side tuples out for every iteration through the outer side (for
each chunk of inner)

To get started, I implemented the inner side chunking logic, which is
required for all of the approaches. I did a super basic version which
only allows nbatches to be increased during the initial hashtable
build--not during loading of subsequent batches. If a batch after
batch 0 runs out of work_mem, it just loads what will fit and saves
the inner page offset in the hashjoin state.

Part of the allure of approaches B and C for me was that they seemed
like they would require less code complexity and concurrency control
because you could just write out the unmatched tuples (probably to a
SharedTuplestore) without having to care about their original order or
page offset. It seemed like it didn't require treating a spill file as
if it permits random access, nor treating the tuples as ordered in a
SharedTuplestore.

The benefit I saw of approach B over approach C was that, in the case
where more tuples are matched, it requires fewer writes than approach
C--at the cost of additional reads. It would require at most the same
number of writes as approach C.

Approach B turned out to be problematic for many reasons. First of
all, with approach B, you end up having to keep track of an additional
new spill file for unmatched outer tuples for every chunk of the inner
side. Each spill file could have a different number of tuples, so, any
reuse of the file seems difficult to get right. For approach C (which
I did not try to implement), it seems like you could get away with
only maintaining two spill files for the outer side--one to be read
from and one to write to. I'm sure it is more complicated than this.
However, it seemed like, for approach B you would need to create and
destroy entirely new unmatched tuple spill files for every chunk.

Approach B was not simpler when it came to the code complexity of the
state machine either -- you have to do something different for the
first chunk than the other chunks (write to the unmatched tups file
but read from the original spill file, whereas other chunks require
writing to the unmatched tups file and reading from the unmatched tups
file), which requires complexity in the state machine (and, I imagine,
worker orchestration in the parallel implementation). And, you still
have to process all of the unmatched tups, null-extend them, and emit
them before advancing the batch.

So, I decided to try out approach A. The crux of the idea (my
understanding of it, at least) is to keep a separate data structure
which has the match status of each outer tuple in the batch. The
discussion was to do this with a bitmap in a file, but, I started with
doing it with a list in memory.

What I have so far is a list of structs--one for each outer
tuple--where each struct has a match flag and the page offset of that
tuple in the outer spill file. I add each struct to the list when I am
getting each tuple from a spill file in HJ_NEED_NEW_OUTER state to
join to the first chunk of the inner, and, since I only do this when I
am getting an outer tuple from the spill file, I also grab the page
offset and set it in the struct in the list.

As I am creating the list, and, while processing each subsequent chunk
of the inner, if the tuple is a match, I set the match flag to true in
that outer tuple's member of the list.

Then, after finishing the whole inner batch, I loop through the list,
and, for each unmatched tuple, I go to that offset in the spill file
and get that tuple and NULL-extend and emit it.
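
Roughly, the bookkeeping looks like this standalone sketch (made-up names;
a plain array stands in for my list, and a printf stands in for seeking back
into the outer spill file):

#include <stdio.h>
#include <stdbool.h>

typedef struct OuterTupleStatus
{
    bool    match;          /* set when any inner chunk matches this tuple */
    long    file_offset;    /* where the tuple lives in the outer spill file */
} OuterTupleStatus;

int
main(void)
{
    OuterTupleStatus statuses[] = {
        {false, 0}, {false, 64}, {false, 128}, {false, 192}
    };
    int     ntuples = 4;

    /* While probing each inner chunk, matches flip the flag. */
    statuses[1].match = true;
    statuses[3].match = true;

    /* After the whole inner batch: fetch, null-extend, and emit the rest. */
    for (int i = 0; i < ntuples; i++)
        if (!statuses[i].match)
            printf("seek to offset %ld, null-extend and emit\n",
                   statuses[i].file_offset);
    return 0;
}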

(Currently, I have a problem with the list and it doesn't produce
correct results yet.)

Thinking about how to move from my list of offsets to using a bitmap,
I got confused.

Let me try to articulate what I think the bitmap implementation would look
like:

Before doing chunked hashloop join for any batch, we would need to
know how many tuples are in the outer batch to make the bitmap the
correct size.

We could do this either with one loop through the whole outer batch
file right before joining it to the inner batch (an extra loop).

Or we could try and do it during the first read of the outer relation
when processing batch 0 and keep a data structure with each batch
number mapped to the number of outer tuples spilled to that batch.

Then, once we have this number, before joining the outer to the first
chunk of the inner, we would generate a bitmap with ntuples in outer
batch number of bits and save it somewhere (eventually in a file,
initially in the hjstate).

Now, I am back to the original problem--how do you know which bit to
set without somehow numbering the tuples with a unique identifier? Is
there anything that uniquely identifies a spill file tuple except its
offset?

--
Melanie Plageman

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Robert Haas
Date:
On Tue, Jun 11, 2019 at 2:35 PM Melanie Plageman
<melanieplageman@gmail.com> wrote:
> Let me try to articulate what I think the bitmap implementation would look
> like:
>
> Before doing chunked hashloop join for any batch, we would need to
> know how many tuples are in the outer batch to make the bitmap the
> correct size.

I was thinking that we wouldn't need to know this, because if the
bitmap is in a file, we can always extend it.  To imagine a needlessly
dumb implementation, consider:

set-bit(i):
   let b = i / 8
   while (b >= length of file in bytes)
      append '\0' to file
   read byte b from the file
   modify the byte you read by setting bit i % 8
   write the modified byte back to the file

In reality, we'd have some kind of buffer.  I imagine locality of
reference would be pretty good, because the outer tuples are coming to
us in increasing-tuple-number order.

If you want to prototype with an in-memory implementation, I'd suggest
just pallocing 8kB initially and repallocing when the tuple number
gets too big.  It'll be sorta inefficient, but who cares?  It's
certainly way cheaper than an extra pass over the data, and for a POC
it should be fine.
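
Something like this throwaway sketch, with malloc/realloc standing in for
palloc/repalloc (made-up names, no error handling):

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct GrowableBitmap
{
    unsigned char *bytes;
    size_t         nbytes;
} GrowableBitmap;

static void
set_bit(GrowableBitmap *bm, size_t i)
{
    size_t  byteno = i / 8;

    if (byteno >= bm->nbytes)
    {
        size_t  newsize = bm->nbytes > 0 ? bm->nbytes : 8192;

        while (byteno >= newsize)
            newsize *= 2;
        bm->bytes = realloc(bm->bytes, newsize);
        memset(bm->bytes + bm->nbytes, 0, newsize - bm->nbytes);
        bm->nbytes = newsize;
    }
    bm->bytes[byteno] |= (unsigned char) (1 << (i % 8));
}

int
main(void)
{
    GrowableBitmap bm = {NULL, 0};

    set_bit(&bm, 3);
    set_bit(&bm, 100000);       /* forces a grow past the initial 8kB */
    printf("bitmap is now %zu bytes\n", bm.nbytes);
    free(bm.bytes);
    return 0;
}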

> Now, I am back to the original problem--how do you know which bit to
> set without somehow numbering the tuples with a unique identifier? Is
> there anything that uniquely identifies a spill file tuple except its
> offset?

I don't think so.  Approach A hinges on being able to get the tuple
number reliably and without contortions, and I have not tried to make
that work.  So maybe it's really hard or not possible or something.
My intuition is that it ought to work, but that and a dollar will get
you a cup of coffee, so...

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Melanie Plageman
Date:


On Thu, Jun 13, 2019 at 7:10 AM Robert Haas <robertmhaas@gmail.com> wrote:
On Tue, Jun 11, 2019 at 2:35 PM Melanie Plageman
<melanieplageman@gmail.com> wrote:
> Let me try to articulate what I think the bitmap implementation would look
> like:
>
> Before doing chunked hashloop join for any batch, we would need to
> know how many tuples are in the outer batch to make the bitmap the
> correct size.

I was thinking that we wouldn't need to know this, because if the
bitmap is in a file, we can always extend it.  To imagine a needlessly
dumb implementation, consider:

set-bit(i):
   let b = i / 8
   while (b >= length of file in bytes)
      append '\0' to file
   read byte b from the file
   modify the byte you read by setting bit i % 8
   write the modified byte back to the file

In reality, we'd have some kind of buffer.  I imagine locality of
reference would be pretty good, because the outer tuples are coming to
us in increasing-tuple-number order.

If you want to prototype with an in-memory implementation, I'd suggest
just pallocing 8kB initially and repallocing when the tuple number
gets too big.  It'll be sorta inefficient, but who cares?  It's
certainly way cheaper than an extra pass over the data, and for a POC
it should be fine.


That approach makes sense. I have attached the first draft of a patch
I wrote to do parallel-oblivious hashjoin fallback. I haven't switched
to using the approach with a bitmap (or bytemap :) yet because I found
that using a linked list was easier to debug for now.

(Also, I did things like include the value of the outer tuple
attribute in the linked list nodes and assumed it was an int because
that is what I have been testing with--this would definitely be blown
away with everything else that is just there to help me with debugging
right now).

I am refactoring it now to change the state machine to make more sense
before changing the representation of the match statuses.

So, specifically, I am interested in high-level gut checks on the
state machine I am currently implementing (not reflected in this
patch).

This patch adds only one state -- HJ_ADAPTIVE_EMIT_UNMATCHED-- which
duplicates the logic of HJ_FILL_OUTER_TUPLE. Also, in this patch, the
existing HJ_NEED_NEW_BATCH state is used for new chunks. After
separating the logic that advanced the batches from that which loaded
a batch, it felt like NEED_NEW_CHUNK did not need to be its own state.
When a new chunk is required, if more exist, then the next one should
be loaded and outer should be rewound. Rewinding of outer was already
being done (seek to the beginning of the outer spill file is the
equivalent of "loading" it).

Currently, I am tracking a lot of state in the HashJoinState, which is
fiddly and error-prone.

New state machine (questions posed below):
To refactor the state machine, I am thinking of adding a new state
HJ_NEED_NEW_INNER_CHUNK which we would transition to when outer batch
is over. We would load the new chunk, rewind the outer, and transition
to HJ_NEED_NEW_OUTER. However, we would have to emit unmatched inner
tuples for that chunk (in case of ROJ) before that transition to
HJ_NEED_NEW_OUTER. This feels a little less clean because the
HJ_FILL_INNER_TUPLES state is transitioned into when the inner batch
is over as well. And, in the current flow I am sketching out, if the
inner batch is exhausted, we check if we should emit NULL-extended
inner tuples and then check if we should emit NULL-extended outer
tuples (since both batches are exhausted), whereas when a single inner
chunk is done being processed, we only want to emit NULL-extended
tuples for the inner side. Not to mention HJ_NEED_NEW_INNER_CHUNK
would transition to HJ_NEED_NEW_OUTER directly instead of first
advancing the batches. This can all be hacked around with if
statements, but, my point here is that if I am refactoring the state
machine to be more clear, ideally, it would be more clear.

A similar problem happens with HJ_FILL_OUTER_TUPLE and the
non-fallback case. For the fallback case, with this implementation,
you must wait until after exhausting the inner side to emit
NULL-extended outer tuples. In the non-fallback case -- a batch which
can fit in memory or, always, for batch 0 -- the unmatched outer
tuples are emitted as they are encountered.

It makes most sense in the context of the state machine, as far as I
can tell, after exhausting both outer and inner batch, to emit
NULL-extended inner tuples for that chunk and then emit NULL-extended
outer tuples for that batch.

So, requiring an additional read of the outer side to emit
NULL-extended tuples at the end of the inner batch would slow things
down for the non-fallback case; however, it seems like special casing
the fallback case would make the state machine much more confusing --
basically like mashing two totally different state machines together.

These questions will probably make a lot more sense with corresponding
code, so I will follow up with the second version of the state machine
patch once I finish it.

--
Melanie Plageman
Attachment

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Melanie Plageman
Date:

On Tue, Jun 18, 2019 at 3:24 PM Melanie Plageman <melanieplageman@gmail.com> wrote:

These questions will probably make a lot more sense with corresponding
code, so I will follow up with the second version of the state machine
patch once I finish it.


I have changed the state machine and resolved the questions I had
raised in the previous email. This seems to work for the parallel and
non-parallel cases. I have not yet rewritten the unmatched outer tuple
status as a bitmap in a spill file (for ease of debugging).

Before doing that, I wanted to ask what a desirable fallback condition
would be. In this patch, fallback to hashloop join happens only when
inserting tuples into the hashtable after batch 0 when inserting
another tuple from the batch file would exceed work_mem. This means
you can't increase nbatches, which, I would think is undesirable.

I thought a bit about when fallback should happen. So, let's say that
we would like to fall back to hashloop join when we have increased
nbatches X times. At that point, since we do not want to fall back to
hashloop join for all batches, we have to make a decision. After
increasing nbatches the Xth time, do we then fall back for all batches
for which inserting inner tuples exceeds work_mem? Do we use this
strategy but with work_mem + some fudge factor?

Or, do we instead try to determine if data skew led us to increase
nbatches both times and then determine which batch, given new
nbatches, contains that data, set fallback to true only for that
batch, and let all other batches use the existing logic (with no
fallback option) unless they contain a value which leads to increasing
nbatches X number of times?

--
Melanie Plageman
Attachment

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Tomas Vondra
Date:
On Wed, Jul 03, 2019 at 02:22:09PM -0700, Melanie Plageman wrote:
>On Tue, Jun 18, 2019 at 3:24 PM Melanie Plageman <melanieplageman@gmail.com>
>wrote:
>
>>
>> These questions will probably make a lot more sense with corresponding
>> code, so I will follow up with the second version of the state machine
>> patch once I finish it.
>>
>>
>I have changed the state machine and resolved the questions I had
>raised in the previous email. This seems to work for the parallel and
>non-parallel cases. I have not yet rewritten the unmatched outer tuple
>status as a bitmap in a spill file (for ease of debugging).
>
>Before doing that, I wanted to ask what a desirable fallback condition
>would be. In this patch, fallback to hashloop join happens only when
>inserting tuples into the hashtable after batch 0 when inserting
>another tuple from the batch file would exceed work_mem. This means
>you can't increase nbatches, which, I would think is undesirable.
>

Yes, I think that's undesirable.

>I thought a bit about when fallback should happen. So, let's say that
>we would like to fallback to hashloop join when we have increased
>nbatches X times. At that point, since we do not want to fall back to
>hashloop join for all batches, we have to make a decision. After
>increasing nbatches the Xth time, do we then fall back for all batches
>for which inserting inner tuples exceeds work_mem? Do we use this
>strategy but work_mem + some fudge factor?
>
>Or, do we instead try to determine if data skew led us to increase
>nbatches both times and then determine which batch, given new
>nbatches, contains that data, set fallback to true only for that
>batch, and let all other batches use the existing logic (with no
>fallback option) unless they contain a value which leads to increasing
>nbatches X number of times?
>

I think we should try to detect the skew and use this hashloop logic
only for the one batch. That's based on the assumption that the hashloop
is less efficient than the regular hashjoin.

We may need to apply it even for some non-skewed (but misestimated)
cases, though. At some point we'd need more than work_mem for BufFiles,
at which point we ought to use this hashloop.


regards

-- 
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services 



Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Melanie Plageman
Date:
So, I've rewritten the patch to use a BufFile for the outer table
batch file tuples' match statuses. I write bytes to and from the file,
which start as 0, and, upon encountering a match for a tuple, I set its
bit in the file to 1 (also rebased with current master).

It, of course, only works for parallel-oblivious hashjoin -- it relies
on deterministic order of tuples encountered in the outer side batch
file to set the right match bit and uses a counter to decide which bit
to set.

I did the "needlessly dumb implementation" Robert mentioned, though I
thought about it and couldn't come up with a much smarter way to
write match bits to a file. I think there might be an optimization
opportunity in not writing the current_byte to the file each time that
the outer tuple matches and only doing this once we have advanced to a
tuple number that wouldn't have its match bit in the current_byte. I
didn't do that to keep it simple, and, I suspect there might be a bit
of gymnastics needed to make sure that that byte is actually written
to the file in case we exit from some other state before we encounter
the tuple represented in the last bit in that byte.
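
For what it's worth, the optimization I'm describing would look roughly like
this standalone sketch (made-up names; a printf stands in for the BufFile
write, and a real version would also need the end-of-scan gymnastics
mentioned above to flush the final partial byte):

#include <stdio.h>

static unsigned char current_byte = 0;
static long          current_byte_no = -1;

/* Write out the byte we have been accumulating match bits into. */
static void
flush_byte(void)
{
    if (current_byte_no >= 0)
        printf("write byte %ld = 0x%02x\n", current_byte_no, current_byte);
}

/* Called when outer tuple number tupleno matches; only flush once we
 * advance to a tuple whose match bit lives in a different byte. */
static void
note_match(long tupleno)
{
    long    byteno = tupleno / 8;

    if (byteno != current_byte_no)
    {
        flush_byte();
        current_byte_no = byteno;
        current_byte = 0;
    }
    current_byte |= (unsigned char) (1 << (tupleno % 8));
}

int
main(void)
{
    long    matches[] = {0, 2, 3, 9, 11};   /* tuple numbers that matched */

    for (int i = 0; i < 5; i++)
        note_match(matches[i]);
    flush_byte();               /* don't lose the last partial byte */
    return 0;
}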

I plan to work on a separate implementation for parallel hashjoin
next--to understand what is required. I believe the logic to decide
when to fall back should be fairly easy to slot in at the end once
we've decided what that logic is.

On Sat, Jul 13, 2019 at 4:44 PM Tomas Vondra <tomas.vondra@2ndquadrant.com> wrote:
On Wed, Jul 03, 2019 at 02:22:09PM -0700, Melanie Plageman wrote:
>On Tue, Jun 18, 2019 at 3:24 PM Melanie Plageman <melanieplageman@gmail.com>
>
>Before doing that, I wanted to ask what a desirable fallback condition
>would be. In this patch, fallback to hashloop join happens only when
>inserting tuples into the hashtable after batch 0 when inserting
>another tuple from the batch file would exceed work_mem. This means
>you can't increase nbatches, which, I would think is undesirable.
>

Yes, I think that's undesirable.

>I thought a bit about when fallback should happen. So, let's say that
>we would like to fallback to hashloop join when we have increased
>nbatches X times. At that point, since we do not want to fall back to
>hashloop join for all batches, we have to make a decision. After
>increasing nbatches the Xth time, do we then fall back for all batches
>for which inserting inner tuples exceeds work_mem? Do we use this
>strategy but work_mem + some fudge factor?
>
>Or, do we instead try to determine if data skew led us to increase
>nbatches both times and then determine which batch, given new
>nbatches, contains that data, set fallback to true only for that
>batch, and let all other batches use the existing logic (with no
>fallback option) unless they contain a value which leads to increasing
>nbatches X number of times?
>

I think we should try to detect the skew and use this hashloop logic
only for the one batch. That's based on the assumption that the hashloop
is less efficient than the regular hashjoin.

We may need to apply it even for some non-skewed (but misestimated)
cases, though. At some point we'd need more than work_mem for BufFiles,
at which point we ought to use this hashloop.


I have not yet changed the logic for deciding to fall back from
my original design. It will still only fall back for a given batch if
that batch's inner batch file doesn't fit in memory. I haven't,
however, changed the logic to allow it to increase the number of
batches some number of times or according to some criteria before
falling back for that batch.

--
Melanie Plageman
Attachment

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Robert Haas
Date:
On Tue, Jul 30, 2019 at 2:47 PM Melanie Plageman
<melanieplageman@gmail.com> wrote:
> I did the "needlessly dumb implementation" Robert mentioned, though,
> I thought about it and couldn't come up with a much smarter way to
> write match bits to a file. I think there might be an optimization
> opportunity in not writing the current_byte to the file each time that
> the outer tuple matches and only doing this once we have advanced to a
> tuple number that wouldn't have its match bit in the current_byte. I
> didn't do that to keep it simple, and, I suspect there might be a bit
> of gymnastics needed to make sure that that byte is actually written
> to the file in case we exit from some other state before we encounter
> the tuple represented in the last bit in that byte.

I mean, I was assuming we'd write in like 8kB blocks or something.
Doing it a byte at a time seems like it'd produce way too many
syscalls.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Melanie Plageman
Date:

On Tue, Jul 30, 2019 at 4:36 PM Robert Haas <robertmhaas@gmail.com> wrote:
On Tue, Jul 30, 2019 at 2:47 PM Melanie Plageman
<melanieplageman@gmail.com> wrote:
> I did the "needlessly dumb implementation" Robert mentioned, though,
> I thought about it and couldn't come up with a much smarter way to
> write match bits to a file. I think there might be an optimization
> opportunity in not writing the current_byte to the file each time that
> the outer tuple matches and only doing this once we have advanced to a
> tuple number that wouldn't have its match bit in the current_byte. I
> didn't do that to keep it simple, and, I suspect there might be a bit
> of gymnastics needed to make sure that that byte is actually written
> to the file in case we exit from some other state before we encounter
> the tuple represented in the last bit in that byte.

I mean, I was assuming we'd write in like 8kB blocks or something.
Doing it a byte at a time seems like it'd produce way too many
syscalls.


For the actual write to disk, I'm pretty sure I get that for free from
the BufFile API, no?
I was more thinking about optimizing when I call BufFileWrite at all.

--
Melanie Plageman

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Peter Geoghegan
Date:
On Tue, Jul 30, 2019 at 8:07 PM Melanie Plageman
<melanieplageman@gmail.com> wrote:
> For the actual write to disk, I'm pretty sure I get that for free from
> the BufFile API, no?
> I was more thinking about optimizing when I call BufFileWrite at all.

Right. Clearly several existing buffile.c users regularly have very
small BufFileWrite() size arguments. tuplestore.c, for one.

-- 
Peter Geoghegan



Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Thomas Munro
Date:
On Wed, Jul 31, 2019 at 6:47 AM Melanie Plageman
<melanieplageman@gmail.com> wrote:
> So, I've rewritten the patch to use a BufFile for the outer table
> batch file tuples' match statuses and write bytes to and from the file
> which start as 0 and, upon encountering a match for a tuple, I set its
> bit in the file to 1 (also rebased with current master).
>
> It, of course, only works for parallel-oblivious hashjoin -- it relies
> on deterministic order of tuples encountered in the outer side batch
> file to set the right match bit and uses a counter to decide which bit
> to set.
>
> I did the "needlessly dumb implementation" Robert mentioned, though,
> I thought about it and couldn't come up with a much smarter way to
> write match bits to a file. I think there might be an optimization
> opportunity in not writing the current_byte to the file each time that
> the outer tuple matches and only doing this once we have advanced to a
> tuple number that wouldn't have its match bit in the current_byte. I
> didn't do that to keep it simple, and, I suspect there might be a bit
> of gymnastics needed to make sure that that byte is actually written
> to the file in case we exit from some other state before we encounter
> the tuple represented in the last bit in that byte.

Thanks for working on this! I plan to poke at it a bit in the next few weeks.

> I plan to work on a separate implementation for parallel hashjoin
> next--to understand what is required. I believe the logic to decide
> when to fall back should be fairly easy to slot in at the end once
> we've decided what that logic is.

Seems like a good time for me to try to summarise what I think the
main problems are here:

1.  The match-bit storage problem already discussed.  The tuples that
each process receives while reading from SharedTupleStore are
non-deterministic (like other parallel scans).  To use a bitmap-based
approach, I guess we'd need to invent some way to give the tuples a
stable identifier within some kind of densely packed number space that
we could use to address the bitmap, or take the IO hit and write all
the tuples back.  That might involve changing the way SharedTupleStore
holds data.

2.  Tricky problems relating to barriers and flow control.  First, let
me explain why PHJ doesn't support full/right outer joins yet.  At
first I thought it was going to be easy, because, although the shared
memory hash table is read-only after it has been built, it seems safe
to weaken that only slightly and let the match flag be set by any
process during probing: it's OK if two processes clobber each other's
writes, as the only transition is a single bit going strictly from 0
to 1, and there will certainly be a full memory barrier before anyone
tries to read those match bits.  Then during the scan for unmatched,
you just have to somehow dole out hash table buckets or ranges of
buckets to processes on a first-come-first-served basis.  But.... then
I crashed into the following problem:

* You can't begin the scan for unmatched tuples until every process
has finished probing (ie until you have the final set of match bits).
* You can't wait for every process to finish probing, because any
process that has emitted a tuple might never come back if there is
another node that is also waiting for all processes (ie deadlock
against another PHJ doing the same thing), and probing is a phase that
emits tuples.

Generally, it's not safe to emit tuples while you are attached to a
Barrier, unless you're only going to detach from it, not wait at it,
because emitting tuples lets the program counter escape your control.
Generally, it's not safe to detach from a Barrier while accessing
resources whose lifetime it controls, such as a hash table, because
then it might go away underneath you.

The PHJ plans that are supported currently adhere to that programming
rule and so don't have a problem: after the Barrier reaches the
probing phase, processes never wait for each other again so they're
free to begin emitting tuples.  They just detach when they're done
probing, and the last to detach cleans up (frees the hash table etc).
If there is more than one batch, they detach from one batch and attach
to another when they're ready (each batch has its own Barrier), so we
can consider the batches to be entirely independent.

There is probably a way to make a scan-for-unmatched-inner phase work,
possibly involving another Barrier or something like that, but I ran
out of time trying to figure it out and wanted to ship a working PHJ
for the more common plan types.  I suppose PHLJ will face two variants
of this problem: (1) you need to synchronise the loops (you can't dump
the hash table in preparation for the next loop until all have
finished probing for the current loop), and yet you've already emitted
tuples, so you're not allowed to wait for other processes and they're
not allowed to wait for you, and (2) you can't start the
scan-for-unmatched-outer until all the probe loops belonging to one
batch are done.  The first problem is sort of analogous to a problem I
faced with batches in the first place, which Robert and I found a
solution to by processing the batches in parallel, and could perhaps
be solved in the same way: run the loops in parallel (if that sounds
crazy, recall that every worker has its own quota of work_mem and the
data is entirely prepartitioned up front, which is why we are able to
run the batches in parallel; in contrast, single-batch mode makes a
hash table with a quota of nparticipants * work_mem).  The second
problem is sort of analogous to the existing scan-for-unmatched-inner
problem that I haven't solved.

I think there may be ways to make that general class of deadlock
problem go away in a future asynchronous executor model where N
streams conceptually run concurrently in event-driven nodes so that
control never gets stuck in a node, but that seems quite far off and I
haven't worked out the details.  The same problem comes up in a
hypothetical Parallel Repartition node: you're not done with your
partition until all processes have run out of input tuples, so you
have to wait for all of them to send an EOF, so you risk deadlock if
they are waiting for you elsewhere in the tree.  A stupid version of
the idea is to break the node up into a consumer part and a producer
part, and put the producer into a subprocess so that its program
counter can never escape and deadlock somewhere in the consumer part
of the plan.  Obviously we don't want to have loads of extra OS
processes all over the place, but I think you can get the same effect
using a form of asynchronous execution where the program counter jumps
between nodes and streams based on readiness, and yields control
instead of blocking.  Similar ideas have been proposed to deal with
asynchronous IO.

-- 
Thomas Munro
https://enterprisedb.com



Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Melanie Plageman
Date:

On Thu, Sep 5, 2019 at 10:35 PM Thomas Munro <thomas.munro@gmail.com> wrote:
Seems like a good time for me to try to summarise what I think the
main problems are here:

1.  The match-bit storage problem already discussed.  The tuples that
each process receives while reading from SharedTupleStore are
non-deterministic (like other parallel scans).  To use a bitmap-based
approach, I guess we'd need to invent some way to give the tuples a
stable identifier within some kind of densely packed number space that
we could use to address the bitmap, or take the IO hit and write all
the tuples back.  That might involve changing the way SharedTupleStore
holds data.

This I've dealt with by adding a tuplenum to the SharedTupleStore
itself which I atomically increment in sts_puttuple().
In ExecParallelHashJoinPartitionOuter(), as each worker writes tuples
to the batch files, they call sts_puttuple() and this increments the
number so each tuple has a unique number.
For persisting this number, I added the tuplenum to the meta data
section of the MinimalTuple (along with the hashvalue -- there was a
comment about this meta data that said it could be used for other
things in the future, so this seemed like a good place to put it) and
write that out to the batch file.

At the end of ExecParallelHashJoinPartitionOuter(), I make the outer
match status bitmap file. I use the final tuplenum count to determine
the number of bytes to write to it. Each worker has a file with a
bitmap which has the number of bytes required to represent the number
of tuples in that batch.

Because one worker may beat the other(s) and build the whole batch
file for a batch before the others have a chance, I also make the
outer match status bitmap file for workers who missed out in
ExecParallelHashJoinOuterGetTuple() using the final tuplenum as well.
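
Concretely, the bookkeeping amounts to roughly this (a standalone
sketch: the names are invented, and C11 stdatomic stands in here for
however the counter is actually incremented in the SharedTuplestore):

#include <stdatomic.h>
#include <stdint.h>
#include <stddef.h>

typedef struct TupleNumCounter
{
    _Atomic uint64_t next_tuplenum;    /* shared; next number to hand out */
} TupleNumCounter;

/* Called once per outer tuple as it is written to the batch file. */
static uint64_t
assign_tuplenum(TupleNumCounter *c)
{
    return atomic_fetch_add(&c->next_tuplenum, 1);
}

/* Bytes needed for a match status bitmap covering ntuples tuples. */
static size_t
match_bitmap_nbytes(uint64_t ntuples)
{
    return (size_t) ((ntuples + 7) / 8);
}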
 

2.  Tricky problems relating to barriers and flow control.  First, let
me explain why PHJ doesn't support full/right outer joins yet.  At
first I thought it was going to be easy, because, although the shared
memory hash table is read-only after it has been built, it seems safe
to weaken that only slightly and let the match flag be set by any
process during probing: it's OK if two processes clobber each other's
writes, as the only transition is a single bit going strictly from 0
to 1, and there will certainly be a full memory barrier before anyone
tries to read those match bits.  Then during the scan for unmatched,
you just have to somehow dole out hash table buckets or ranges of
buckets to processes on a first-come-first-served basis.  But.... then
I crashed into the following problem:

* You can't begin the scan for unmatched tuples until every process
has finished probing (ie until you have the final set of match bits).
* You can't wait for every process to finish probing, because any
process that has emitted a tuple might never come back if there is
another node that is also waiting for all processes (ie deadlock
against another PHJ doing the same thing), and probing is a phase that
emits tuples.

Generally, it's not safe to emit tuples while you are attached to a
Barrier, unless you're only going to detach from it, not wait at it,
because emitting tuples lets the program counter escape your control.
Generally, it's not safe to detach from a Barrier while accessing
resources whose lifetime it controls, such as a hash table, because
then it might go away underneath you.

The PHJ plans that are supported currently adhere to that programming
rule and so don't have a problem: after the Barrier reaches the
probing phase, processes never wait for each other again so they're
free to begin emitting tuples.  They just detach when they're done
probing, and the last to detach cleans up (frees the hash table etc).
If there is more than one batch, they detach from one batch and attach
to another when they're ready (each batch has its own Barrier), so we
can consider the batches to be entirely independent.

There is probably a way to make a scan-for-unmatched-inner phase work,
possibly involving another Barrier or something like that, but I ran
out of time trying to figure it out and wanted to ship a working PHJ
for the more common plan types.  I suppose PHLJ will face two variants
of this problem: (1) you need to synchronise the loops (you can't dump
the hash table in preparation for the next loop until all have
finished probing for the current loop), and yet you've already emitted
tuples, so you're not allowed to wait for other processes and they're
not allowed to wait for you, and (2) you can't start the
scan-for-unmatched-outer until all the probe loops belonging to one
batch are done.  The first problem is sort of analogous to a problem I
faced with batches in the first place, which Robert and I found a
solution to by processing the batches in parallel, and could perhaps
be solved in the same way: run the loops in parallel (if that sounds
crazy, recall that every worker has its own quota of work_mem and the
data is entirely prepartitioned up front, which is why we are able to
run the batches in parallel; in contrast, single-batch mode makes a
hash table with a quota of nparticipants * work_mem).  The second
problem is sort of analogous to the existing scan-for-unmatched-inner
problem that I haven't solved.


I "solved" these problem for now by having all workers except for one
detach from the outer batch file after finishing probing. The last
worker to arrive does not detach from the batch and instead iterates
through all of the workers' outer match status files per participant
shared mem SharedTuplestoreParticipant) and create a single unified
bitmap. All the other workers continue to wait at the barrier until
the sole remaining worker has finished with iterating through the
outer match status bitmap files.

Admittedly, I'm still fighting with this step a bit, but my intent is
to have all the backends wait until the lone remaining worker has
created the unified bitmap; then that worker, which is still attached
to the outer batch, will scan the outer batch file and the unified
outer match status bitmap and emit unmatched tuples.

I thought that the other workers could move on and stop waiting at the
barrier once the lone remaining worker has scanned their outer match
status files. All the probe loops would be done, and the worker that
is emitting tuples is not referencing the inner side hashtable at all,
only the outer batch file and the combined bitmap.
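
As a standalone illustration of that final step (in-memory byte arrays
stand in here for the per-worker bitmap files, and printing stands in
for emitting the null-extended tuple):

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/*
 * OR all per-worker outer match status bitmaps into one, then walk the
 * outer batch in tuplenum order and handle every tuple whose bit is
 * still 0 (those are the unmatched outer tuples to emit).
 */
static void
combine_and_emit_unmatched(uint8_t **worker_bitmaps, int nworkers,
                           uint64_t ntuples)
{
    size_t      nbytes = (size_t) ((ntuples + 7) / 8);
    uint8_t    *combined = calloc(nbytes, 1);

    for (int w = 0; w < nworkers; w++)
        for (size_t i = 0; i < nbytes; i++)
            combined[i] |= worker_bitmaps[w][i];

    for (uint64_t tupnum = 0; tupnum < ntuples; tupnum++)
    {
        if ((combined[tupnum / 8] & (1 << (tupnum % 8))) == 0)
            printf("tuple %llu is unmatched\n",
                   (unsigned long long) tupnum);
    }

    free(combined);
}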

--
Melanie Plageman

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Tomas Vondra
Date:
On Fri, Sep 06, 2019 at 10:54:13AM -0700, Melanie Plageman wrote:
>On Thu, Sep 5, 2019 at 10:35 PM Thomas Munro <thomas.munro@gmail.com> wrote:
>
>> Seems like a good time for me to try to summarise what I think the
>> main problems are here:
>>
>> 1.  The match-bit storage problem already discussed.  The tuples that
>> each process receives while reading from SharedTupleStore are
>> non-deterministic (like other parallel scans).  To use a bitmap-based
>> approach, I guess we'd need to invent some way to give the tuples a
>> stable identifier within some kind of densely packed number space that
>> we could use to address the bitmap, or take the IO hit and write all
>> the tuples back.  That might involve changing the way SharedTupleStore
>> holds data.
>>
>
>This I've dealt with by adding a tuplenum to the SharedTupleStore
>itself which I atomically increment in sts_puttuple().
>In ExecParallelHashJoinPartitionOuter(), as each worker writes tuples
>to the batch files, they call sts_puttuple() and this increments the
>number so each tuple has a unique number.
>For persisting this number, I added the tuplenum to the meta data
>section of the MinimalTuple (along with the hashvalue -- there was a
>comment about this meta data that said it could be used for other
>things in the future, so this seemed like a good place to put it) and
>write that out to the batch file.
>
>At the end of ExecParallelHashJoinPartitionOuter(), I make the outer
>match status bitmap file. I use the final tuplenum count to determine
>the number of bytes to write to it. Each worker has a file with a
>bitmap which has the number of bytes required to represent the number
>of tuples in that batch.
>
>Because one worker may beat the other(s) and build the whole batch
>file for a batch before the others have a chance, I also make the
>outer match status bitmap file for workers who missed out in
>ExecParallelHashJoinOuterGetTuple() using the final tuplenum as well.
>

That seems like a perfectly sensible solution to me. I'm sure there are
ways to optimize it (say, having a bitmap optimized for sparse data, or
bitmap shared by all the workers or something like that), but that's
definitely not needed for v1.

Even having a bitmap per worker is pretty cheap. Assume we have 1B rows;
the bitmap is 1B/8 bytes = ~120MB per worker. So with 16 workers that's
~2GB, give or take. But with 100-byte rows, the original data is ~100GB. So
the bitmaps are not free, but it's not terrible either.

>>
>> 2.  Tricky problems relating to barriers and flow control.  First, let
>> me explain why PHJ doesn't support full/right outer joins yet.  At
>> first I thought it was going to be easy, because, although the shared
>> memory hash table is read-only after it has been built, it seems safe
>> to weaken that only slightly and let the match flag be set by any
>> process during probing: it's OK if two processes clobber each other's
>> writes, as the only transition is a single bit going strictly from 0
>> to 1, and there will certainly be a full memory barrier before anyone
>> tries to read those match bits.  Then during the scan for unmatched,
>> you just have to somehow dole out hash table buckets or ranges of
>> buckets to processes on a first-come-first-served basis.  But.... then
>> I crashed into the following problem:
>>
>> * You can't begin the scan for unmatched tuples until every process
>> has finished probing (ie until you have the final set of match bits).
>> * You can't wait for every process to finish probing, because any
>> process that has emitted a tuple might never come back if there is
>> another node that is also waiting for all processes (ie deadlock
>> against another PHJ doing the same thing), and probing is a phase that
>> emits tuples.
>>
>> Generally, it's not safe to emit tuples while you are attached to a
>> Barrier, unless you're only going to detach from it, not wait at it,
>> because emitting tuples lets the program counter escape your control.
>> Generally, it's not safe to detach from a Barrier while accessing
>> resources whose lifetime it controls, such as a hash table, because
>> then it might go away underneath you.
>>
>> The PHJ plans that are supported currently adhere to that programming
>> rule and so don't have a problem: after the Barrier reaches the
>> probing phase, processes never wait for each other again so they're
>> free to begin emitting tuples.  They just detach when they're done
>> probing, and the last to detach cleans up (frees the hash table etc).
>> If there is more than one batch, they detach from one batch and attach
>> to another when they're ready (each batch has its own Barrier), so we
>> can consider the batches to be entirely independent.
>>
>> There is probably a way to make a scan-for-unmatched-inner phase work,
>> possibly involving another Barrier or something like that, but I ran
>> out of time trying to figure it out and wanted to ship a working PHJ
>> for the more common plan types.  I suppose PHLJ will face two variants
>> of this problem: (1) you need to synchronise the loops (you can't dump
>> the hash table in preparation for the next loop until all have
>> finished probing for the current loop), and yet you've already emitted
>> tuples, so you're not allowed to wait for other processes and they're
>> not allowed to wait for you, and (2) you can't start the
>> scan-for-unmatched-outer until all the probe loops belonging to one
>> batch are done.  The first problem is sort of analogous to a problem I
>> faced with batches in the first place, which Robert and I found a
>> solution to by processing the batches in parallel, and could perhaps
>> be solved in the same way: run the loops in parallel (if that sounds
>> crazy, recall that every worker has its own quota of work_mem and the
>> data is entirely prepartitioned up front, which is why we are able to
>> run the batches in parallel; in constrast, single-batch mode makes a
>> hash table with a quota of nparticipants * work_mem).  The second
>> problem is sort of analogous to the existing scan-for-unmatched-inner
>> problem that I haven't solved.
>>
>>
>I "solved" these problem for now by having all workers except for one
>detach from the outer batch file after finishing probing. The last
>worker to arrive does not detach from the batch and instead iterates
>through all of the workers' outer match status files per participant
>shared mem SharedTuplestoreParticipant) and create a single unified
>bitmap. All the other workers continue to wait at the barrier until
>the sole remaining worker has finished with iterating through the
>outer match status bitmap files.
>

Why did you put solved in quotation marks? This seems like a reasonable
solution to me, at least for now, but the quotation marks kinda suggest
you think it's either not correct or not good enough. Or did I miss some
flaw that makes this unacceptable?

>Admittedly, I'm still fighting with this step a bit, but, my intent is
>to have all the backends wait until the lone remaining worker has
>created the unified bitmap, then, that worker, which is still attached
>to the outer batch will scan the outer batch file and the unified
>outer match status bitmap and emit unmatched tuples.
>

Makes sense, I think.

The one "issue" this probably has is that it serializes the last step, 
i.e. the search for unmatched tuples is done in a single process, instead
of parallelized over multiple workers. That's certainly unfortunate, but 
is that really an issue in practice? Probably not for queries with just a
small number of unmatched tuples. And for cases with many unmatched rows 
it's probably going to degrade to non-parallel case.

>I thought that the other workers can move on and stop waiting at the
>barrier once the lone remaining worker has scanned their outer match
>status files. All the probe loops would be done, and the worker that
>is emitting tuples is not referencing the inner side hashtable at all
>and only the outer batch file and the combined bitmap.
>

Why would the workers need to wait for the lone worker to scan their
bitmap file? Or do the files disappear with the workers, or something
like that? 

regards

-- 
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services




Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Melanie Plageman
Date:
So, I finally have a prototype to share of parallel hashloop fallback.

See the commit message for a full description of the functionality of the
patch.

This patch does contain refactoring of nodeHashjoin.

I have split the Parallel HashJoin and Serial HashJoin state machines
up, as they were diverging in my patch to a point that made for a
really cluttered ExecHashJoinImpl() (ExecHashJoinImpl() is now gone).

The reason I didn't do this refactoring in one patch and then put the
adaptive hashjoin code on top of it is that I might like to make
Parallel HashJoin and Serial HashJoin different nodes.

I think that has been discussed elsewhere, and I was looking to
understand the rationale for keeping them in the same node.

The patch is a rough prototype. Below are some of the high-level
pieces of work that I plan to do next. (there are many TODOs in the
code as well).

Some of the major outstanding work:

- correctness:
  - haven't tried it with anti-joins and don't think it works
  - number of batches is not deterministic from run-to-run

- performance:
  - join_hash.sql is *much* slower.
    While there are loads of performance fixes needed in the patch,
    the basic criteria for "falling back" is likely the culprit here.
  - There are many bottlenecks (there are several places where a
    barrier could be moved to somewhere less hot, an atomic used
    instead of a lock, or a method of coordination could be used to
    allow workers to do backend-local accounting and aggregate it)
  - need to make sure it does not create outer match status files when
    it shouldn't (inner joins, for example)

- testing:
  - many unexercised cases
  - add number of chunks to EXPLAIN (for users and for testing)

- refactoring:
  - The match status bitmap should have its own API or, at least,
    manipulation of it should be done in a centralized set of
    functions
  - Rename "chunk" (as in chunks of inner side) to something that is
    not already used in the context of memory chunks and, more
    importantly, SharedTuplestoreChunk
  - Make references to "hashloop fallback" and "adaptive hashjoin"
    more consistent
  - Rename adaptiveHashjoin.h/.c files and change what is in the files
    which are separate from nodeHashjoin.h/.c (depending on outcome of
    "new node")
  - The state machines are big and unwieldy now, so, there is probably
    some larger restructuring that could be done
  - Should probably use the ParallelHashJoinBatchAccessor to access
    the ParallelHashJoinBatch everywhere (realized this recently)

--
Melanie Plageman
Attachment

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Thomas Munro
Date:
On Mon, Dec 30, 2019 at 4:34 PM Melanie Plageman
<melanieplageman@gmail.com> wrote:
> So, I finally have a prototype to share of parallel hashloop fallback.

Hi Melanie,

Thanks for all your continued work on this!  I started looking at it
today; it's a difficult project and I think it'll take me a while to
grok.  I do have some early comments though:

* I am uneasy about BarrierArriveExplicitAndWait() (a variant of
BarrierArriveAndWait() that lets you skip directly to a given phase?);
perhaps you only needed that for a circular phase system, which you
could do with modular phase numbers, like PHJ_GROW_BATCHES_PHASE?  I
tried to make the barrier interfaces look like the libraries in other
parallel programming environments, and I'd be worried that the
explicit phase thing could easily lead to bugs.
* It seems a bit strange to have "outer_match_status_file" in
SharedTupleStore; something's gone awry layering-wise there.
* I'm not sure it's OK to wait at the end of each loop, as described
in the commit message:

    Workers probing a fallback batch will wait until all workers have
    finished probing before moving on so that an elected worker can read
    and combine the outer match status files into a single bitmap and use
    it to emit unmatched outer tuples after all chunks of the inner side
    have been processed.

Maybe I misunderstood completely, but that seems to break the
programming rule described in nodeHashjoin.c's comment beginning "To
avoid deadlocks, ...".  To recap: (1) When you emit a tuple, the
program counter escapes to some other node, and maybe that other node
waits for thee, (2) Maybe the leader is waiting for you but you're
waiting for it to drain its queue so you can emit a tuple (I learned a
proper name for this: "flow control deadlock").  That's why the
current code only ever detaches (a non-waiting operation) after it's
begun emitting tuples (that is, the probing phase).  It just moves
onto another batch.  That's not a solution here: you can't simply move
to another loop, loops are not independent of each other like batches.
It's possible that barriers are not the right tool for this part of
the problem, or that there is a way to use a barrier that you don't
remain attached to while emitting, or that we should remove the
deadlock risks another way entirely[1] but I'm not sure.  Furthermore,
the new code in ExecParallelHashJoinNewBatch() appears to break the
rule even in the non-looping case (it calls BarrierArriveAndWait() in
ExecParallelHashJoinNewBatch(), where the existing code just
detaches).

> This patch does contain refactoring of nodeHashjoin.
>
> I have split the Parallel HashJoin and Serial HashJoin state machines
> up, as they were diverging in my patch to a point that made for a
> really cluttered ExecHashJoinImpl() (ExecHashJoinImpl() is now gone).

Hmm.  I'm rather keen on extending that technique further: I'd like
there to be more configuration points in the form of parameters to
that function, so that we write the algorithm just once but we
generate a bunch of specialised variants that are the best possible
machine code for each combination of parameters via constant-folding
using the "always inline" trick (steampunk C++ function templates).
My motivations for wanting to do that are: supporting different hash
sizes (CF commit e69d6445), removing branches for unused optimisations
(eg skew), and inlining common hash functions.  That isn't to say we
couldn't have two different templatoid functions from which many
others are specialised, but I feel like that's going to lead to a lot
of duplication.

> The reason I didn't do this refactoring in one patch and then put the
> adaptive hashjoin code on top of it is that I might like to make
> Parallel HashJoin and Serial HashJoin different nodes.
>
> I think that has been discussed elsewhere and was looking to
> understand the rationale for keeping them in the same node.

Well, there is a discussion about getting rid of the Hash node, since
it's so tightly coupled with Hash Join that it might as well not exist
as a separate entity.  (Incidentally, I noticed in someone's blog that
MySQL now shows Hash separately in its PostgreSQL-style EXPLAIN
output; now we'll remove it, CF the Dr Seuss story about the
Sneetches).  But as for Parallel Hash Join vs [Serial] Hash Join, I
think it makes sense to use the same node because they are
substantially the same thing, with optional extra magic, and I think
it's our job to figure out how to write code in a style that makes the
differences maintainable.  That fits into a general pattern that
"Parallel" is a mode, not a different node.  On the other hand, PHJ is
by far the most different from the original code, compared to things
like Parallel Sequential Scan etc.  FWIW I think we're probably in
relatively new territory here: as far as I know, other traditional
RDBMSs didn't really seem to have a concept like parallel-aware
executor nodes, because they tended to be based on partitioning, so
that the operators are all oblivious to parallelism and don't have to
share/coordinate anything at this level.  It seems that everyone is
now coming around to the view that shared hash table hash joins are a
good idea now that we have so many cores connected up to shared
memory.  Curiously, judging from another blog article I saw, on the
surface it looks like Oracle's brand new HASH JOIN SHARED is a
different operator than HASH JOIN (just an observation, I could be way
off and I don't know or want to know how that's done under the covers
in that system).

>   - number of batches is not deterministic from run-to-run

Yeah, I had a lot of fun with that sort of thing on the build farm
when PHJ was first committed, and the effects were different on
systems I don't have access to that have different sizeof() for
certain types.

>   - Rename "chunk" (as in chunks of inner side) to something that is
>     not already used in the context of memory chunks and, more
>     importantly, SharedTuplestoreChunk

+1.  Fragments?  Loops?  Blocks (from
https://en.wikipedia.org/wiki/Block_nested_loop, though, no, strike
that, blocks are also super overloaded).

[1]
https://www.postgresql.org/message-id/flat/CA%2BhUKG%2BA6ftXPz4oe92%2Bx8Er%2BxpGZqto70-Q_ERwRaSyA%3DafNg%40mail.gmail.com



Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Melanie Plageman
Date:


On Tue, Jan 7, 2020 at 4:14 PM Thomas Munro <thomas.munro@gmail.com> wrote:
* I am uneasy about BarrierArriveExplicitAndWait() (a variant of
BarrierArriveAndWait() that lets you skip directly to a given phase?);
perhaps you only needed that for a circular phase system, which you
could do with modular phase numbers, like PHJ_GROW_BATCHES_PHASE?  I
tried to make the barrier interfaces look like the libraries in other
parallel programming environments, and I'd be worried that the
explicit phase thing could easily lead to bugs.

So, I actually use it to circle back up to the first phase while
skipping the last phase.
So I couldn't do it with modular phase numbers and a loop.
The last phase detaches from the chunk barrier. I don't want to detach
from the chunk barrier if there are more chunks.
I basically need a way to only attach to the chunk barrier at the
beginning of the first chunk and only detach at the end of the last
chunk--not in between chunks. I will return from the function and
re-enter between chunks -- say between chunk 2 and chunk 3 of 5.

However, could this be solved by having more than one chunk
barrier?
A worker would attach to one chunk barrier and then when it moves to
the next chunk it would attach to the other chunk barrier and then
switch back when it switches to the next chunk. Then it could detach
and attach each time it enters/leaves the function.
 
* I'm not sure it's OK to wait at the end of each loop, as described
in the commit message:

    Workers probing a fallback batch will wait until all workers have
    finished probing before moving on so that an elected worker can read
    and combine the outer match status files into a single bitmap and use
    it to emit unmatched outer tuples after all chunks of the inner side
    have been processed.

Maybe I misunderstood completely, but that seems to break the
programming rule described in nodeHashjoin.c's comment beginning "To
avoid deadlocks, ...".  To recap: (1) When you emit a tuple, the
program counter escapes to some other node, and maybe that other node
waits for thee, (2) Maybe the leader is waiting for you but you're
waiting for it to drain its queue so you can emit a tuple (I learned a
proper name for this: "flow control deadlock").  That's why the
current code only ever detaches (a non-waiting operation) after it's
begun emitting tuples (that is, the probing phase).  It just moves
onto another batch.  That's not a solution here: you can't simply move
to another loop, loops are not independent of each other like batches.
It's possible that barriers are not the right tool for this part of
the problem, or that there is a way to use a barrier that you don't
remain attached to while emitting, or that we should remove the
deadlock risks another way entirely[1] but I'm not sure.  Furthermore,
the new code in ExecParallelHashJoinNewBatch() appears to break the
rule even in the non-looping case (it calls BarrierArriveAndWait() in
ExecParallelHashJoinNewBatch(), where the existing code just
detaches).

Yea, I think I'm totally breaking that rule.
Just to make sure I understand the way in which I am breaking that
rule:

In my patch, while attached to a chunk_barrier, worker1 emits a
matched tuple (control leaves the current node).  Meanwhile, worker2
has finished probing the chunk and is waiting on the chunk_barrier for
worker1.
How though could worker1 be waiting for worker2?

Is this only a problem when one of the barrier participants is the
leader and is reading from the tuple queue? (reading your tuple queue
deadlock hazard example in the thread [1] you referred to).
Basically is my deadlock hazard a tuple queue deadlock hazard?

I thought maybe this could be a problem with nested HJ nodes, but I'm
not sure.

As I understand it, this isn't a problem with current master with
batch barriers because while attached to a batch_barrier, a worker can
emit tuples. No other workers will wait on the batch barrier once they
have started probing.

I need to think more about the suggestions you provided in [1] about
nixing the tuple queue deadlock hazard.

However, hypothetically, if we decide we don't want to break the no
emitting tuples while attached to a barrier rule, how can we still
allow workers to coordinate while probing chunks of the batch
sequentially (1 chunk at a time)?

I could think of two options (both sound slow and bad):

Option 1:
Stash away the matched tuples in a tuplestore and emit them at the end
of the batch (incurring more writes).

Option 2:
Degenerate to 1 worker for fallback batches

Any other ideas?
 

>   - Rename "chunk" (as in chunks of inner side) to something that is
>     not already used in the context of memory chunks and, more
>     importantly, SharedTuplestoreChunk

+1.  Fragments?  Loops?  Blocks (from
https://en.wikipedia.org/wiki/Block_nested_loop, though, no, strike
that, blocks are also super overloaded).

Hmmm. I think loop is kinda confusing. "fragment" has potential.
I also thought of "piece". That is actually where I am leaning now.
What do you think?

[1] https://www.postgresql.org/message-id/flat/CA%2BhUKG%2BA6ftXPz4oe92%2Bx8Er%2BxpGZqto70-Q_ERwRaSyA%3DafNg%40mail.gmail.com

--
Melanie Plageman

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Melanie Plageman
Date:

On Tue, Jan 7, 2020 at 4:14 PM Thomas Munro <thomas.munro@gmail.com> wrote:
* I am uneasy about BarrierArriveExplicitAndWait() (a variant of
BarrierArriveAndWait() that lets you skip directly to a given phase?);
perhaps you only needed that for a circular phase system, which you
could do with modular phase numbers, like PHJ_GROW_BATCHES_PHASE?  I
tried to make the barrier interfaces look like the libraries in other
parallel programming environments, and I'd be worried that the
explicit phase thing could easily lead to bugs.

BarrierArriveExplicitAndWait() is gone now due to the refactor to
address the barrier waiting deadlock hazard (mentioned below).
 
* It seems a bit strange to have "outer_match_status_file" in
SharedTupleStore; something's gone awry layering-wise there.

outer_match_status_file is now out of the SharedTuplestore. Jesse
Zhang and I worked on a new API, SharedBits, for workers to
collaboratively make a bitmap and then used it for the outer match
status file and the combined bitmap file
(v4-0004-Add-SharedBits-API.patch).

The SharedBits API is modeled closely after the SharedTuplestore API.
It uses a control object in shared memory to synchronize access to
some files in a SharedFileset and maintains some participant-specific
shared state. The big difference (other than that the files are for
bitmaps and not tuples) is that each backend writes to its file in one
phase and a single backend reads from all of the files and combines
them in another phase.
In other words, it supports parallel write but not parallel scan (and
not concurrent read/write). This could definitely be modified in the
future.
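
To give a feel for the shape of it, the usage pattern is roughly as
below (these declarations are a sketch of the idea only, not the actual
names or signatures in v4-0004-Add-SharedBits-API.patch):

#include <stdbool.h>
#include <stdint.h>

typedef struct SharedFileset SharedFileset;          /* existing fileset API */
typedef struct SharedBits SharedBits;                /* control object in shm */
typedef struct SharedBitsAccessor SharedBitsAccessor;

/* setup, analogous to sts_initialize()/sts_attach() */
extern SharedBitsAccessor *sbits_initialize(SharedBits *sb, int participants,
                                            int my_participant_number,
                                            SharedFileset *fileset);
extern SharedBitsAccessor *sbits_attach(SharedBits *sb,
                                        int my_participant_number,
                                        SharedFileset *fileset);

/* write phase: each backend sets bits only in its own file */
extern void sbits_set_bit(SharedBitsAccessor *accessor, uint64_t bitnum);
extern void sbits_end_write(SharedBitsAccessor *accessor);

/* combine phase: one elected backend ORs all the files into one bitmap */
extern void sbits_combine(SharedBitsAccessor *accessor);
extern bool sbits_test_bit(SharedBitsAccessor *accessor, uint64_t bitnum);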
   
Also, the SharedBits uses a SharedFileset which uses BufFiles. This is
not the ideal API for the bitmap. The access pattern is small sequential
writes and random reads. It would also be nice to maintain the
fixed-size buffer but have an API that lets us write an arbitrary number
of bytes to it in bufsize chunks without incurring additional function
call overhead.
 
* I'm not sure it's OK to wait at the end of each loop, as described
in the commit message:

    Workers probing a fallback batch will wait until all workers have
    finished probing before moving on so that an elected worker can read
    and combine the outer match status files into a single bitmap and use
    it to emit unmatched outer tuples after all chunks of the inner side
    have been processed.

Maybe I misunderstood completely, but that seems to break the
programming rule described in nodeHashjoin.c's comment beginning "To
avoid deadlocks, ...".  To recap: (1) When you emit a tuple, the
program counter escapes to some other node, and maybe that other node
waits for thee, (2) Maybe the leader is waiting for you but you're
waiting for it to drain its queue so you can emit a tuple (I learned a
proper name for this: "flow control deadlock").  That's why the
current code only ever detaches (a non-waiting operation) after it's
begun emitting tuples (that is, the probing phase).  It just moves
onto another batch.  That's not a solution here: you can't simply move
to another loop, loops are not independent of each other like batches.
It's possible that barriers are not the right tool for this part of
the problem, or that there is a way to use a barrier that you don't
remain attached to while emitting, or that we should remove the
deadlock risks another way entirely[1] but I'm not sure.  Furthermore,
the new code in ExecParallelHashJoinNewBatch() appears to break the
rule even in the non-looping case (it calls BarrierArriveAndWait() in
ExecParallelHashJoinNewBatch(), where the existing code just
detaches).


So, after a more careful reading of the parallel full hashjoin email
[1], I think I understand the ways in which I am violating the rule in
nodeHashJoin.c.
I do have some questions about the potential solutions mentioned in
that thread; however, I'll pose those over there.

For adaptive hashjoin, for now, the options for addressing the barrier
wait hazard that Jesse and I came up with based on the PFHJ thread are:
- leader doesn't participate in fallback batches (has the downside of
  reduced parallelism and needing special casing when it ends up being
  the only worker because other workers get used for something else
  [like autovacuum])
- use some kind of spool to avoid deadlock
- the original solution I proposed in which all workers detach from
  the batch barrier (instead of waiting)

I revisited the original solution I proposed and realized that I had
not implemented it as advertised. By reverting to the original
design, I can skirt the issue for now.

In the original solution I suggested, I mentioned all workers would
detach from the batch barrier and the last to detach would combine the
bitmaps.  That was not what I actually implemented (my patch had all
the workers wait on the barrier).

I've changed to actually doing this--which addresses some of the
potential deadlock hazard.

The two deadlock waits causing the deadlock hazard were waiting on the
chunk barrier and waiting on the batch barrier.  In order to fully
address the deadlock hazard, Jesse and I came up with the following
solution (in v4-0003-Address-barrier-wait-deadlock-hazard.patch in the
attached patchset) to each:

chunk barrier wait:
- instead of waiting on the chunk barrier when it is not in its final
  state and then reusing it and jumping back to the initial state,
  initialize an array of chunk barriers, one per chunk, and workers
  only wait on a chunk barrier when it is in its final state. The last
  worker to arrive will increment the chunk number. All workers detach
  from the chunk barrier they are attached to and select the next
  chunk barrier

Jesse brought up that there isn't a safe time to reinitialize the
chunk barrier, so reusing it doesn't seem like a good idea.
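
The loop shape for probing a fallback batch then ends up roughly like
the sketch below (illustration only: the FallbackBatchSketch struct and
load_and_probe_chunk() are invented, and the wait-event argument to
BarrierArriveAndWait() is just 0 here; only the BarrierAttach() /
BarrierArriveAndWait() / BarrierDetach() calls refer to the existing
barrier primitives):

#include "postgres.h"
#include "storage/barrier.h"

typedef struct FallbackBatchSketch
{
    Barrier    *chunk_barriers;     /* array in shared memory, one per chunk */
    int         nchunks;
    int         current_chunk;      /* advanced only by the elected worker */
} FallbackBatchSketch;

extern void load_and_probe_chunk(FallbackBatchSketch *batch, int chunkno);

static void
probe_fallback_batch(FallbackBatchSketch *batch)
{
    while (batch->current_chunk < batch->nchunks)
    {
        Barrier    *chunk_barrier =
            &batch->chunk_barriers[batch->current_chunk];

        BarrierAttach(chunk_barrier);
        load_and_probe_chunk(batch, batch->current_chunk);

        /* Last worker to arrive advances the chunk number for everyone. */
        if (BarrierArriveAndWait(chunk_barrier, 0))
            batch->current_chunk++;
        BarrierDetach(chunk_barrier);
    }
}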

batch barrier wait:
- In order to mitigate the other cause of deadlock hazard (workers
  wait on the batch barrier after emitting tuples), now, in
  ExecParallelHashJoinNewBatch(), if we are attached to a batch
  barrier and it is a fallback batch, all workers will detach from the
  batch barrier and then end their scan of that batch.  The last
  worker to detach will combine the outer match status files, then it
  will detach from the batch, clean up the hashtable, and end its scan
  of the inner side.  Then it will return and proceed to emit
  unmatched outer tuples.
 
> This patch does contain refactoring of nodeHashjoin.
>
> I have split the Parallel HashJoin and Serial HashJoin state machines
> up, as they were diverging in my patch to a point that made for a
> really cluttered ExecHashJoinImpl() (ExecHashJoinImpl() is now gone).

Hmm.  I'm rather keen on extending that technique further: I'd like
there to be more configuration points in the form of parameters to
that function, so that we write the algorithm just once but we
generate a bunch of specialised variants that are the best possible
machine code for each combination of parameters via constant-folding
using the "always inline" trick (steampunk C++ function templates).
My motivations for wanting to do that are: supporting different hash
sizes (CF commit e69d6445), removing branches for unused optimisations
(eg skew), and inlining common hash functions.  That isn't to say we
couldn't have two different templatoid functions from which many
others are specialised, but I feel like that's going to lead to a lot
of duplication.


I'm okay with using templating. For now, while I am addressing large
TODO items with the patchset, I will keep them as separate functions.
Once it is in a better state, I will look at the overlap and explore
templating. The caveat here is if a lot of new commits start going
into nodeHashjoin.c and keeping this long-running branch rebased gets
painful.

The patchset has also been run through pg_indent, so,
v4-0001-Implement-Adaptive-Hashjoin.patch will look a bit different
than v3-0001-hashloop-fallback.patch, but, it is the same content.
v4-0002-Fixup-tupleMetadata-struct-issues.patch is just some other
fixups and small cosmetic changes.

The new big TODO is to make a file type that suits the SharedBits API
better--but I don't want to do that unless the idea is validated.

[1] https://www.postgresql.org/message-id/flat/CA+hUKG+A6ftXPz4oe92+x8Er+xpGZqto70-Q_ERwRaSyA=afNg@mail.gmail.com
Attachment

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Melanie Plageman
Date:
I've implemented avoiding rescanning all inner tuples for each stripe
in the attached patch:
v5-0005-Avoid-rescanning-inner-tuples-per-stripe.patch

Patchset is rebased--and I had my first merge conflicts as I contend
with maintaining this long-running branch with large differences
between it and current hashjoin. I think I'll need to reconsider the
changes I've made if I want to make it maintainable.

As for patch 0005, not rescanning inner tuples for every stripe,
basically, instead of reinitializing the SharedTuplestore for the
inner side for each stripe (I'm using "stripe" from now on, but I
haven't done any retroactive renaming yet) during fallback, each
participant's read_page is set to the beginning of the
SharedTuplestoreChunk which contains the end of one stripe and the
beginning of another.

Previously all inner tuples were scanned and only tuples from the
current stripe were loaded.

Each SharedTuplestoreAccessor now has a variable "start_page", which
is initialized when it is assigned its read_page (which will always be
the beginning of a SharedTuplestoreChunk).

While loading tuples into the hashtable, if a tuple is from a past
stripe, the worker skips it (that will happen when a stripe straddles
two SharedTuplestoreChunks). If a tuple is from the future, the worker
backs that SharedTuplestoreChunk out and sets the shared read_page (in
the shared SharedTuplestoreParticipant) back to its start_page.

There are a couple mechanisms to provide for synchronization that
address specific race conditions/synchronization points -- those
scenarios are laid out in the commit message.

The first is a rule that a worker can only set read_page to a
start_page which is less than the current value of read_page.

The second is a "rewound" flag in the SharedTuplestoreParticipant. It
indicates if this participant has been rewound during loading of the
current stripe. If it has, a worker cannot be assigned a
SharedTuplestoreChunk. This flag is reset between stripes.

In this patch, Hashjoin makes an unacceptable intrusion into the
SharedTuplestore API. I am looking for feedback on how to solve this.

Basically, because the SharedTuplestore does not know about stripes or
about HashJoin, the logic to decide if a tuple should be loaded into a
hashtable or not is in the stripe phase machine where tuples are loaded
into the hashtable.

So, to ensure that workers have read from all participant files before
assuming all tuples from a stripe are loaded, I have duplicated the
logic from sts_parallel_scan_next() which has workers get the next
participant file and added it into the body of the tuple loading
loop in the stripe phase machine (see sts_ready_for_next_stripe() and
sts_seen_all_participants()).

This clearly needs to be fixed and it is arguable that there are other
intrusions into the SharedTuplestore API in these patches.

One option is to write each stripe for each participant to a different
file, preserving the idea that a worker is done with a read_file when it
is at EOF.

Outside of addressing the relationship between SharedTuplestore,
stripes, and Hashjoin, I have re-prioritized the next steps for the
patch as follows:

Next Steps:
1) Rename "chunk" to "stripe"
2) refine fallback logic
3) refactor code to make it easier to keep it rebased
4) EXPLAIN ANALYZE instrumentation to show stripes probed by workers
5) anti/semi-join support

1)
The chunk/stripe thing is becoming extremely confusing.

2)
I re-prioritized refining the fallback logic because the premature
disabling of growth in serial hashjoin is making the join_hash test so
slow that it is slowing down iteration speed for me.

3)
I am wondering if Thomas Munro's idea to template-ize Hashjoin [1]
would make maintaining the diff easier, harder, or no different. The
code I've added made the main hashjoin state machine incredibly long,
so I broke it up into Parallel Hashjoin and Serial Hashjoin to make it
more manageable. This, of course, lends itself to difficult rebasing
(luckily only one small commit has been made to nodeHashjoin.c). If
the template-ization were to happen sooner, I could refactor my code
so that there were at least the same function names and the diffs
would be more clear.

4)
It is important that I have some way of knowing if I'm even exercising
code that I'm adding that involves multiple workers probing the same
stripes. As I make changes to the code, even though it will not
necessarily be deterministic, I can change the tests if I am no longer
able to get any of the concurrent behavior I'm looking for.

5)
Seems like it's time

[1] https://www.postgresql.org/message-id/CA%2BhUKGJjs6H77u%2BPL3ovMSowFZ8nib9Z%2BnHGNF6YNmw6osUU%2BA%40mail.gmail.com

--
Melanie Plageman
Attachment

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Melanie Plageman
Date:
I've attached a patch which should address some of the previous feedback
about code complexity. Two of my co-workers and I wrote what is
essentially a new prototype of the idea. It uses the main state machine
to route emitting unmatched tuples instead of introducing a separate
state. The logic for falling back is also more developed.

In addition to many assorted TODOs in the code, there are a few major
projects left:
- Batch 0 falling back
- Stripe barrier deadlock
- Performance improvements and testing

I will address the stripe barrier deadlock here. David is going to send
a separate email about batch 0 falling back.

There is a deadlock hazard in parallel hashjoin (pointed out by Thomas
Munro in the past). Workers attached to the stripe_barrier emit tuples
and then wait on that barrier.
I believe that can be addressed starting with the following
relatively unoptimized solution (a rough sketch follows the list):
- after probing a stripe in a batch, a worker sets the status of that
  batch to "tentatively done" and saves the stripe_barrier phase
- if that worker is not the only worker attached to that batch, it
  detaches from both stripe and batch barriers and moves on to other
  batches
- if that worker is the only worker attached to the batch, it will
  proceed to load the next stripe of that batch, and, once it has
  finished loading, it will set the status of the batch back to "not
  done" for itself
- when the other worker encounters that batch again, if the
  stripe_barrier phase has not moved forward, it will mark that batch as
  done for itself. If the stripe_barrier phase has moved forward, it can
  join in probing this batch for the current stripe.
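
A minimal sketch of the per-worker bookkeeping this implies (the enum,
struct, and function below are invented for illustration and are not
code from the patchset):

typedef enum BatchProbeStatus
{
    BATCH_NOT_DONE,             /* still has stripes this worker can probe */
    BATCH_TENTATIVELY_DONE,     /* probed current stripe; others may load more */
    BATCH_DONE                  /* nothing further for this worker to do */
} BatchProbeStatus;

typedef struct WorkerBatchStateSketch
{
    BatchProbeStatus status;
    int         saved_stripe_phase; /* stripe_barrier phase when tentatively done */
} WorkerBatchStateSketch;

/* Decision when a worker encounters a tentatively-done batch again. */
static void
revisit_batch(WorkerBatchStateSketch *state, int current_stripe_phase)
{
    if (state->status != BATCH_TENTATIVELY_DONE)
        return;
    if (current_stripe_phase == state->saved_stripe_phase)
        state->status = BATCH_DONE;     /* no new stripe was loaded meanwhile */
    else
        state->status = BATCH_NOT_DONE; /* a new stripe is up: join in probing */
}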
Attachment

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
David Kimura
Date:
On Wed, Apr 29, 2020 at 4:39 PM Melanie Plageman
<melanieplageman@gmail.com> wrote:
>
> In addition to many assorted TODOs in the code, there are a few major
> projects left:
> - Batch 0 falling back
> - Stripe barrier deadlock
> - Performance improvements and testing
>

Batch 0 never spills.  That behavior is an artifact of the existing design,
which, as an optimization, special-cases batch 0 to fill the initial hash
table. This means it can skip loading and doesn't need to create a batch file.

However, in the pathological case where all tuples hash to batch 0 there is no
way to redistribute those tuples to other batches. So, the existing hash join
implementation allows work_mem to be exceeded for batch 0.

In the adaptive hash join approach, there is another way to deal with a batch
that exceeds work_mem. If increasing the number of batches does not work, then
the batch can be split into stripes that will not exceed work_mem. Doing this
requires spilling the excess tuples to batch files. The following patch adds
logic to create a batch 0 file for serial hash join so that even in the
pathological case we do not need to exceed work_mem.
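
To illustrate the decision, a minimal sketch (invented names, not code
from the attached patch):

#include <stdbool.h>
#include <stddef.h>

typedef struct Batch0Sketch
{
    bool        fallback;       /* batch 0 has been marked as falling back */
    size_t      space_used;     /* current hash table memory usage */
    size_t      space_allowed;  /* roughly work_mem */
} Batch0Sketch;

/*
 * Once batch 0 is a fallback batch, tuples that would push the hash table
 * over work_mem go to the new batch 0 spill file instead of into memory.
 */
static bool
batch0_should_spill(const Batch0Sketch *b, size_t tuple_size)
{
    return b->fallback && b->space_used + tuple_size > b->space_allowed;
}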

Thanks,
David

Attachment

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Melanie Plageman
Date:

On Tue, Apr 28, 2020 at 11:50 PM Heikki Linnakangas <hlinnaka@iki.fi> wrote:
On 29/04/2020 05:03, Melanie Plageman wrote:
> I've attached a patch which should address some of the previous feedback
> about code complexity. Two of my co-workers and I wrote what is
> essentially a new prototype of the idea. It uses the main state machine
> to route emitting unmatched tuples instead of introducing a separate
> state. The logic for falling back is also more developed.

I haven't looked at the patch in detail, but thanks for the commit
message; it describes very well what this is all about. It would be nice
to copy that explanation to the top comment in nodeHashJoin.c in some
form. I think we're missing a high level explanation of how the batching
works even before this new patch, and that commit message does a good
job at it.


Thanks for taking a look, Heikki!

I made a few edits to the message and threw it into a draft patch (on
top of master, of course). I didn't want to junk up people's inboxes, so
I didn't start a separate thread, but it will be pretty hard to
collaboratively edit the comment (or ever register it for a commitfest)
if it is wedged into this thread. What do you think?

--
Melanie Plageman
Attachment

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Alvaro Herrera
Date:
On 2020-Apr-30, Melanie Plageman wrote:

> On Tue, Apr 28, 2020 at 11:50 PM Heikki Linnakangas <hlinnaka@iki.fi> wrote:

> > I haven't looked at the patch in detail, but thanks [...]

> Thanks for taking a look, Heikki!

Hmm.  We don't have Heikki's message in the archives.  In fact, the last
message from Heikki we seem to have in any list is
cca4e4dc-32ac-b9ab-039d-98dcb5650791@iki.fi dated February 19 in
pgsql-bugs.  I wonder if there's some problem between Heikki and the
lists.

-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Thomas Munro
Date:
On Fri, May 1, 2020 at 2:30 AM Melanie Plageman
<melanieplageman@gmail.com> wrote:
> I made a few edits to the message and threw it into a draft patch (on
> top of master, of course). I didn't want to junk up peoples' inboxes, so
> I didn't start a separate thread, but, it will be pretty hard to
> collaboratively edit the comment/ever register it for a commitfest if it
> is wedged into this thread. What do you think?

+1, this is a good description and I'm sure you're right about the
name of the algorithm.  It's a "hybrid" between a simple no partition
hash join, and partitioning like the Grace machine, since batch 0 is
processed directly without touching the disk.

You mention that PHJ finalises the number of batches during build
phase while SHJ can extend it later.  There's also a difference in the
probe phase: although inner batch 0 is loaded into the hash table
directly and not written to disk during the build phase (= classic
hybrid, just like the serial algorithm), outer batch 0 *is* written
out to disk at the start of the probe phase (unlike classic hybrid at
least as we have it for serial hash join).  That's because I couldn't
figure out how to begin emitting tuples before partitioning was
finished, without breaking the deadlock-avoidance programming rule
that you can't let the program counter escape from the node when
someone might wait for you.  So maybe it's erm, a hybrid between
hybrid and Grace...



Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
David Kimura
Date:
On Wed, Apr 29, 2020 at 4:44 PM David Kimura <david.g.kimura@gmail.com> wrote:
>
> Following patch adds logic to create a batch 0 file for serial hash join so
> that even in the pathological case we do not need to exceed work_mem.

Updated the patch to spill batch 0 tuples after it is marked as fallback.

A couple questions from looking more at serial code:

1) Does the current pattern of repartitioning batches *after* the previous
   hashtable insert has exceeded work_mem still make sense?

   In that case we allow ourselves to exceed work_mem by one tuple. If that
   no longer seems correct, then I think we can move the space-exceeded
   check in ExecHashTableInsert() to *before* the actual hashtable insert.

2) After batch 0 is marked fallback, does the logic to insert into its batch
   file fit better in MultiExecPrivateHash() or ExecHashTableInsert()?

   The latter already has logic to decide whether to insert into the
   hashtable or a batch file.

Thanks,
David

Attachment

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Melanie Plageman
Date:

On Tue, Apr 28, 2020 at 7:03 PM Melanie Plageman <melanieplageman@gmail.com> wrote:

There is a deadlock hazard in parallel hashjoin (pointed out by Thomas
Munro in the past): workers attached to the stripe_barrier emit tuples
and then wait on that barrier.
I believe that can be addressed starting with this
relatively unoptimized solution:
- after probing a stripe in a batch, a worker sets the status of that
  batch to "tentatively done" and saves the stripe_barrier phase
- if that worker is not the only worker attached to that batch, it
  detaches from both stripe and batch barriers and moves on to other
  batches
- if that worker is the only worker attached to the batch, it will
  proceed to load the next stripe of that batch, and, once it has
  finished loading, it will set the status of the batch back to "not
  done" for itself
- when the other worker encounters that batch again, if the
  stripe_barrier phase has not moved forward, it will mark that batch as
  done for itself. If the stripe_barrier phase has moved forward, it can
  join in probing this batch for the current stripe.

 
Just to follow-up on the stripe barrier deadlock, I've implemented a
solution and attached it.

There are three solutions I've thought about so far:

1)  leaders don't participate in fallback batches
2)  serial after stripe 0
      no worker can join a batch after any worker has left and only one
      worker can work on stripes after stripe 0
3)  provisionally complete batches
      After the end of stripe 0, all workers except the last worker
      detach from the stripe barrier, mark the batch as provisionally
      done, save the stripe barrier phase, and move on to another batch.
      Later, when one of these workers returns to the batch, if it is
      not already done, the worker checks to see if the phase of the
      stripe barrier has advanced. If the phase has advanced, it means
      that no one is waiting for that worker. The worker can join that
      batch. If the phase hasn't advanced, the worker won't risk
      deadlock and will simply mark the batch as done. The last worker
      executes the normal path -- participating in each stripe.

I've attached a patch to implement solution 3
v7-0002-Provisionally-detach-unless-last-worker.patch
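
To spell out the check in solution 3, here is a minimal standalone sketch.
The struct and function names are made up for illustration and are not the
actual barrier API; the real patch saves the stripe barrier phase and marks
the batch done per worker:

#include <stdbool.h>
#include <stdio.h>

/* Illustrative stand-ins only, not the actual barrier or batch structures. */
typedef struct FakeBatch
{
    int  stripe_phase;   /* current phase of the batch's stripe barrier */
    bool done;           /* this worker is finished with the batch */
} FakeBatch;

/* Worker provisionally leaves the batch and remembers the phase it saw. */
static int
provisionally_detach(FakeBatch *batch)
{
    return batch->stripe_phase;
}

/*
 * Worker revisits the batch later.  If the stripe barrier has advanced since
 * it left, nobody can be waiting on it, so it is safe to rejoin; otherwise
 * it marks the batch done for itself and moves on, avoiding the
 * emit-tuples-then-wait deadlock.
 */
static void
revisit(FakeBatch *batch, int saved_phase)
{
    if (batch->stripe_phase > saved_phase)
        printf("barrier advanced: rejoin and help with the current stripe\n");
    else
    {
        batch->done = true;
        printf("barrier unchanged: mark the batch done for this worker\n");
    }
}

int
main(void)
{
    FakeBatch   batch = {3, false};
    int         saved = provisionally_detach(&batch);

    revisit(&batch, saved);     /* phase unchanged: give up on the batch */
    batch.stripe_phase = 5;
    revisit(&batch, saved);     /* phase advanced: safe to rejoin */
    return 0;
}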

This isn't a very optimized version of this solution. It detaches from
the stripe barrier and closes the outer match status bitmap upon
provisional completion by a worker. However, I ran into some problems
keeping outer match status bitmaps open for multiple batches at a time.

I've also attached the original adaptive hashjoin patch with a couple
small tweaks (not quite meriting a patch version bump, but that seemed
like the easiest way).

--
Melanie Plageman
Attachment

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Melanie Plageman
Date:
I've attached a rebased patch which includes the "provisionally detach"
deadlock hazard fix approach as well as addresses some of the following
feedback from Jeff Davis provided off-list:

> Can you add some high-level comments that describe the algorithm and
> what the terms mean?

I added to the large comment at the top of nodeHashjoin.c. I've also
added comments to a few of the new members in some structs. Plus I've
added some in-line comments to assist the reviewer that may or may not
be overkill in a final version.

> Can you add some comments to describe what's happening when a batch is
> entering fallback mode?
...
> Can you add some comments describing tuple relocation?
...
> Can you describe somewhere what all the bits for outer matches are for?
All three done.

Also, we kept the batch 0 spilling patch David Kimura authored [1]
separate so it could be discussed on its own, because we still had some
questions about it.
It would be great to discuss those; however, keeping the patches separate
might be more confusing -- I'm not sure.

Attachment

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Melanie Plageman
Date:

On Wed, May 27, 2020 at 7:25 PM Melanie Plageman <melanieplageman@gmail.com> wrote:
I've attached a rebased patch which includes the "provisionally detach"
deadlock hazard fix approach

Alas, the "provisional detach" logic proved incorrect (see last point in
the list of changes included in the patch at bottom).
 
Also, we kept the batch 0 spilling patch David Kimura authored [1]
separate so it could be discussed separately because we still had some
questions.

The serial batch 0 spilling is in the attached patch. Parallel batch 0
spilling is still in a separate patch that David Kimura is working on.

I've attached a rebased and updated patch with a few fixes:

- semi-join fallback works now
- serial batch 0 spilling in main patch
- added instrumentation for stripes to the parallel case
- SharedBits uses same SharedFileset as SharedTuplestore
- reverted the optimization to allow workers to re-attach to a batch and
  help out with stripes if they are sure they pose no deadlock risk

For the last point, I discovered a pretty glaring problem with this
optimization: I did not include the bitmap created by a worker while
working on its first participating stripe in the final combined bitmap.
I was only combining the last bitmap file each worker worked on.

I had the workers make a new bitmap each time they attached to the
batch and participated, because having them keep a file open, tracking
information for a batch they are no longer attached to on the chance
that they might return and work on it, was a synchronization nightmare.
It was difficult to figure out when to close the file if they never
returned, and hard to make sure that the combining worker was actually
combining all the files from all participants who were ever active.

I am sure I can hack around those, but I think we need a better solution
overall. After reverting those changes, loading and probing of stripes
after stripe 0 is serial. This is not only sub-optimal, it also means
that all the synchronization variables and code complexity around
coordinating work on fallback batches is practically wasted.
So, they have to be able to collaborate on stripes after the first
stripe. This version of the patch has correct results and no deadlock
hazard, however, it lacks parallelism on stripes after stripe 0.
I am looking for ideas on how to address the deadlock hazard more
efficiently.

The next big TODOs are:
- come up with a better solution to the potential tuple emitting/barrier
  waiting deadlock issue
- parallel batch 0 spilling complete

--
Melanie Plageman
Attachment

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Tomas Vondra
Date:
On Mon, Jun 08, 2020 at 05:12:25PM -0700, Melanie Plageman wrote:
>On Wed, May 27, 2020 at 7:25 PM Melanie Plageman <melanieplageman@gmail.com>
>wrote:
>
>> I've attached a rebased patch which includes the "provisionally detach"
>> deadlock hazard fix approach
>>
>
>Alas, the "provisional detach" logic proved incorrect (see last point in
>the list of changes included in the patch at bottom).
>
>
>> Also, we kept the batch 0 spilling patch David Kimura authored [1]
>> separate so it could be discussed separately because we still had some
>> questions.
>>
>
>The serial batch 0 spilling is in the attached patch. Parallel batch 0
>spilling is still in a separate patch that David Kimura is working on.
>
>I've attached a rebased and updated patch with a few fixes:
>
>- semi-join fallback works now
>- serial batch 0 spilling in main patch
>- added instrumentation for stripes to the parallel case
>- SharedBits uses same SharedFileset as SharedTuplestore
>- reverted the optimization to allow workers to re-attach to a batch and
>  help out with stripes if they are sure they pose no deadlock risk
>
>For the last point, I discovered a pretty glaring problem with this
>optimization: I did not include the bitmap created by a worker while
>working on its first participating stripe in the final combined bitmap.
>I only was combining the last bitmap file each worker worked on.
>
>I had the workers make new bitmaps for each time that they attached to
>the batch and participated because having them keep an open file
>tracking information for a batch they are no longer attached to on the
>chance that they might return and work on that batch was a
>synchronization nightmare. It was difficult to figure out when to close
>the file if they never returned and hard to make sure that the combining
>worker is actually combining all the files from all participants who
>were ever active.
>
>I am sure I can hack around those, but I think we need a better solution
>overall. After reverting those changes, loading and probing of stripes
>after stripe 0 is serial. This is not only sub-optimal, it also means
>that all the synchronization variables and code complexity around
>coordinating work on fallback batches is practically wasted.
>So, they have to be able to collaborate on stripes after the first
>stripe. This version of the patch has correct results and no deadlock
>hazard, however, it lacks parallelism on stripes after stripe 0.
>I am looking for ideas on how to address the deadlock hazard more
>efficiently.
>
>The next big TODOs are:
>- come up with a better solution to the potential tuple emitting/barrier
>  waiting deadlock issue
>- parallel batch 0 spilling complete
>


Hi Melanie,

I started looking at the patch to refresh my knowledge both of this
patch and parallel hash join, but I think it needs a rebase. The
changes in 7897e3bb90 apparently touched some of the code. I assume
you're working on a patch addressing the remaining TODOS, right?

I see you've switched to "stripe" naming - I find that a bit confusing,
because when I hear stripe I think about RAID, where it means pieces of
data interleaved and stored on different devices. But maybe that's just
me and it's a good name. Maybe it'd be better to keep the naming and
only tweak it at the end, not to disrupt reviews unnecessarily.

Now, a couple comments / questions about the code.


nodeHash.c
----------


1) MultiExecPrivateHash says this

   /*
    * Not subject to skew optimization, so either insert normally
    * or save to batch file if it belongs to another stripe
    */

I wonder what it means to "belong to another stripe". I understand what
that means for batches, which are identified by batchno computed from
the hash value. But I thought "stripes" are just work_mem-sized pieces
of a batch, so I don't quite understand this. Especially when the code
does not actually check "which stripe" the row belongs to.


2) I find the fields hashloop_fallback rather confusing. We have one in
HashJoinTable (and it's an array of BufFile items) and another one in
ParallelHashJoinBatch (this time just bool).

I think HashJoinTable should be renamed to hashloopBatchFile (similarly
to the other BufFile arrays). Although I'm not sure why we even need
this file, when we have innerBatchFile? BufFile(s) are not exactly free,
in fact it's one of the problems for hashjoins with many batches.



3) I'm a bit puzzled about this formula in ExecHashIncreaseNumBatches

   childbatch = (1U << (my_log2(hashtable->nbatch) - 1)) | hashtable->curbatch;

and also about this comment

   /*
    * TODO: what to do about tuples that don't go to the child
    * batch or stay in the current batch? (this is why we are
    * counting tuples to child and curbatch with two diff
    * variables in case the tuples go to a batch that isn't the
    * child)
    */
   if (batchno == childbatch)
     childbatch_outgoing_tuples++;

I thought each old batch is split into two new ones, and the tuples
either stay in the current one, or are moved to the new one - which I
presume is the childbatch, although I haven't tried to decode that
formula. So where else could the tuple go, as the comment tried to
suggest?



regards

-- 
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Jesse Zhang
Date:
Hi Tomas,

On Tue, Jun 23, 2020 at 3:24 PM Tomas Vondra wrote:
>
> Now, a couple comments / questions about the code.
>
>
> nodeHash.c
> ----------
>
>
> 1) MultiExecPrivateHash says this
>
>    /*
>     * Not subject to skew optimization, so either insert normally
>     * or save to batch file if it belongs to another stripe
>     */
>
> I wonder what it means to "belong to another stripe". I understand what
> that means for batches, which are identified by batchno computed from
> the hash value. But I thought "stripes" are just work_mem-sized pieces
> of a batch, so I don't quite understand this. Especially when the code
> does not actually check "which stripe" the row belongs to.

I have to concur that "stripe" did inspire a RAID vibe when I heard it,
but it seemed to be a better name than what it replaces

> 3) I'm a bit puzzled about this formula in ExecHashIncreaseNumBatches
>
>    childbatch = (1U << (my_log2(hashtable->nbatch) - 1)) | hashtable->curbatch;
>
> and also about this comment
>
>    /*
>     * TODO: what to do about tuples that don't go to the child
>     * batch or stay in the current batch? (this is why we are
>     * counting tuples to child and curbatch with two diff
>     * variables in case the tuples go to a batch that isn't the
>     * child)
>     */
>    if (batchno == childbatch)
>      childbatch_outgoing_tuples++;
>
> I thought each old batch is split into two new ones, and the tuples
> either stay in the current one, or are moved to the new one - which I
> presume is the childbatch, although I haven't tried to decode that
> formula. So where else could the tuple go, as the comment tried to
> suggest?

True, every old batch is split into two new ones, if you only consider
tuples coming from the batch file that _still belong in there_; i.e.,
there can be tuples in the old batch file that belong to a future batch. As
an example, if the current nbatch = 8, and we want to expand to nbatch =
16, (old) batch 1 will split into (new) batch 1 and batch 9, but it can
already contain tuples that need to go into (current) batches 3, 5, and
7 (soon-to-be batches 11, 13, and 15).
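
For what it's worth, the formula is easier to see with numbers plugged in.
Here is a tiny standalone program (my_log2 reimplemented locally rather than
using PostgreSQL's, and assuming hashtable->nbatch is the already-doubled
batch count at the point the formula runs):

#include <stdio.h>

/* local stand-in for my_log2(): smallest k such that 2^k >= num */
static int
my_log2(long num)
{
    int     i = 0;
    long    limit = 1;

    while (limit < num)
    {
        limit <<= 1;
        i++;
    }
    return i;
}

int
main(void)
{
    unsigned int nbatch = 16;   /* assumed: already doubled from 8 */
    unsigned int curbatch;

    for (curbatch = 0; curbatch < nbatch / 2; curbatch++)
    {
        unsigned int childbatch = (1U << (my_log2(nbatch) - 1)) | curbatch;

        printf("old batch %u splits into %u and %u\n",
               curbatch, curbatch, childbatch);
    }
    return 0;
}

So childbatch is just curbatch with the old batch count ORed in as a new high
bit (batch 1 splits into 1 and 9, as in the example above); tuples whose
batchno is anything else are exactly the "neither current nor child" cases
the TODO comment is asking about.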

Cheers,
Jesse



Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Melanie Plageman
Date:

On Tue, Jun 23, 2020 at 3:24 PM Tomas Vondra <tomas.vondra@2ndquadrant.com> wrote:
I started looking at the patch to refresh my knowledge both of this
patch and parallel hash join, but I think it needs a rebase. The
changes in 7897e3bb90 apparently touched some of the code.

Thanks so much for the review, Tomas!

I've attached a rebased patch which also contains updates discussed
below.
 
I assume
you're working on a patch addressing the remaining TODOS, right?

I wanted to get some feedback on the patch before working through the
TODOs to make sure I was on the right track.

Now that you are reviewing this, I will focus all my attention
on addressing your feedback. If there are any TODOs that you feel are
most important, let me know, so I can start with those.

Otherwise, I will prioritize parallel batch 0 spilling.
David Kimura plans to do a bit of work on parallel hash join batch 0
spilling tomorrow. Whatever is left after that, I will pick up next
week. Parallel hash join batch 0 spilling is the last large TODO that I
had.

My plan was to then focus on the feedback (either about which TODOs are
most important or outside of the TODOs I've identified) I get from you
and anyone else who reviews this.
 

I see you've switched to "stripe" naming - I find that a bit confusing,
because when I hear stripe I think about RAID, where it means pieces of
data interleaved and stored on different devices. But maybe that's just
me and it's a good name. Maybe it'd be better to keep the naming and
only tweak it at the end, not to disrupt reviews unnecessarily.

I hear you about "stripe". I still quite like it, especially as compared
to its predecessor (originally, I called them chunks -- which is
impossible given that SharedTuplestoreChunks are a thing).

For ease of review, as you mentioned, I will keep the name for now. I am
open to changing it later, though.

I've been soliciting ideas for alternatives and, so far, folks have
suggested "stride", "step", "flock", "herd", "cohort", and "school". I'm
still on team "stripe" though, as it stands.
 

nodeHash.c
----------


1) MultiExecPrivateHash says this

   /*
    * Not subject to skew optimization, so either insert normally
    * or save to batch file if it belongs to another stripe
    */

I wonder what it means to "belong to another stripe". I understand what
that means for batches, which are identified by batchno computed from
the hash value. But I thought "stripes" are just work_mem-sized pieces
of a batch, so I don't quite understand this. Especially when the code
does not actually check "which stripe" the row belongs to.


I agree this was confusing.

"belongs to another stripe" meant here that if batch 0 falls back and we
are still loading it, once we've filled up work_mem, we need to start
saving those tuples to a spill file for batch 0. I've changed the
comment to this:

-        * or save to batch file if it belongs to another stripe
+       * or save to batch file if batch 0 falls back and we have
+       * already filled the hashtable up to space_allowed.


2) I find the fields hashloop_fallback rather confusing. We have one in
HashJoinTable (and it's array of BufFile items) and another one in
ParallelHashJoinBatch (this time just bool).

I think HashJoinTable should be renamed to hashloopBatchFile (similarly
to the other BufFile arrays).

I think you are right about the name. I've changed the name in
HashJoinTableData to hashloopBatchFile.

The array of BufFiles hashloop_fallback was only used by serial
hashjoin. The boolean hashloop_fallback variable is used only by
parallel hashjoin.

The reason I had them named the same thing is that I thought it would be
nice to have a variable with the same name to indicate if a batch "fell
back" for both parallel and serial hashjoin--especially since we check
it in the main hashjoin state machine used by parallel and serial
hashjoin.

In serial hashjoin, the BufFiles aren't identified by name, so I kept
them in that array. In parallel hashjoin, each ParallelHashJoinBatch has
the status saved (in the struct).
So, both represented the fall back status of a batch.

However, I agree with you, so I've renamed the serial one to
hashloopBatchFile.
 
Although I'm not sure why we even need
this file, when we have innerBatchFile? BufFile(s) are not exactly free,
in fact it's one of the problems for hashjoins with many batches.


Interesting -- it didn't even occur to me to combine the bitmap with the
inner side batch file data.
It definitely seems like a good idea to save the BufFile given that so
little data will likely go in it and that it has a 1-1 relationship with
inner side batches.

How might it work? Would you reserve some space at the beginning of the
file? When would you reserve the bytes (before adding tuples you won't
know how many bytes you need, so it might be hard to make sure there is
enough space.) Would all inner side files have space reserved or just
fallback batches?

--
Melanie Plageman
Attachment

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Tomas Vondra
Date:
On Thu, Jun 25, 2020 at 03:09:44PM -0700, Melanie Plageman wrote:
>On Tue, Jun 23, 2020 at 3:24 PM Tomas Vondra <tomas.vondra@2ndquadrant.com>
>wrote:
>
>> I started looking at the patch to refresh my knowledge both of this
>> patch and parallel hash join, but I think it needs a rebase. The
>> changes in 7897e3bb90 apparently touched some of the code.
>
>
>Thanks so much for the review, Tomas!
>
>I've attached a rebased patch which also contains updates discussed
>below.
>

Thanks.

>
>> I assume
>> you're working on a patch addressing the remaining TODOS, right?
>>
>
>I wanted to get some feedback on the patch before working through the
>TODOs to make sure I was on the right track.
>
>Now that you are reviewing this, I will focus all my attention
>on addressing your feedback. If there are any TODOs that you feel are
>most important, let me know, so I can start with those.
>
>Otherwise, I will prioritize parallel batch 0 spilling.

Feel free to work on the batch 0 spilling, please. I still need to get
familiar with various parts of the parallel hash join etc. so I don't
have any immediate feedback which TODOs to work on first.

>David Kimura plans to do a bit of work on parallel hash join batch 0
>spilling tomorrow. Whatever is left after that, I will pick up next
>week. Parallel hash join batch 0 spilling is the last large TODO that I
>had.
>
>My plan was to then focus on the feedback (either about which TODOs are
>most important or outside of the TODOs I've identified) I get from you
>and anyone else who reviews this.
>

OK.

>>
>> I see you've switched to "stripe" naming - I find that a bit confusing,
>> because when I hear stripe I think about RAID, where it means pieces of
>> data interleaved and stored on different devices. But maybe that's just
>> me and it's a good name. Maybe it'd be better to keep the naming and
>> only tweak it at the end, not to disrupt reviews unnecessarily.
>>
>
>I hear you about "stripe". I still quite like it, especially as compared
>to its predecessor (originally, I called them chunks -- which is
>impossible given that SharedTuplestoreChunks are a thing).
>

I don't think using chunks in one place means we can't use it elsewhere
in a different context. I'm sure we have "chunks" in other places. But
let's not bikeshed on this too much.

>For ease of review, as you mentioned, I will keep the name for now. I am
>open to changing it later, though.
>
>I've been soliciting ideas for alternatives and, so far, folks have
>suggested "stride", "step", "flock", "herd", "cohort", and "school". I'm
>still on team "stripe" though, as it stands.
>

;-)

>
>>
>> nodeHash.c
>> ----------
>>
>>
>> 1) MultiExecPrivateHash says this
>>
>>    /*
>>     * Not subject to skew optimization, so either insert normally
>>     * or save to batch file if it belongs to another stripe
>>     */
>>
>> I wonder what it means to "belong to another stripe". I understand what
>> that means for batches, which are identified by batchno computed from
>> the hash value. But I thought "stripes" are just work_mem-sized pieces
>> of a batch, so I don't quite understand this. Especially when the code
>> does not actually check "which stripe" the row belongs to.
>>
>>
>I agree this was confusing.
>
>"belongs to another stripe" meant here that if batch 0 falls back and we
>are still loading it, once we've filled up work_mem, we need to start
>saving those tuples to a spill file for batch 0. I've changed the
>comment to this:
>
>-        * or save to batch file if it belongs to another stripe
>+       * or save to batch file if batch 0 falls back and we have
>+       * already filled the hashtable up to space_allowed.
>

OK. Silly question - what does "batch 0 falls back" mean? Does it mean
that we realized the hash table for batch 0 would not fit into work_mem,
so we switched to the "hashloop" strategy?

>
>> 2) I find the fields hashloop_fallback rather confusing. We have one in
>> HashJoinTable (and it's array of BufFile items) and another one in
>> ParallelHashJoinBatch (this time just bool).
>>
>> I think HashJoinTable should be renamed to hashloopBatchFile (similarly
>> to the other BufFile arrays).
>
>
>I think you are right about the name. I've changed the name in
>HashJoinTableData to hashloopBatchFile.
>
>The array of BufFiles hashloop_fallback was only used by serial
>hashjoin. The boolean hashloop_fallback variable is used only by
>parallel hashjoin.
>
>The reason I had them named the same thing is that I thought it would be
>nice to have a variable with the same name to indicate if a batch "fell
>back" for both parallel and serial hashjoin--especially since we check
>it in the main hashjoin state machine used by parallel and serial
>hashjoin.
>
>In serial hashjoin, the BufFiles aren't identified by name, so I kept
>them in that array. In parallel hashjoin, each ParallelHashJoinBatch has
>the status saved (in the struct).
>So, both represented the fall back status of a batch.
>
>However, I agree with you, so I've renamed the serial one to
>hashloopBatchFile.
>

OK

>>
>> Although I'm not sure why we even need
>> this file, when we have innerBatchFile? BufFile(s) are not exactly free,
>> in fact it's one of the problems for hashjoins with many batches.
>>
>>
>Interesting -- it didn't even occur to me to combine the bitmap with the
>inner side batch file data.
>It definitely seems like a good idea to save the BufFile given that so
>little data will likely go in it and that it has a 1-1 relationship with
>inner side batches.
>
>How might it work? Would you reserve some space at the beginning of the
>file? When would you reserve the bytes (before adding tuples you won't
>know how many bytes you need, so it might be hard to make sure there is
>enough space.) Would all inner side files have space reserved or just
>fallback batches?
>

Oh! So the hashloopBatchFile is only used for the bitmap? I hadn't
realized that. In that case it probably makes sense to keep it separate
from the files with spilled tuples, interleaving that somehow would be
way too complex, I think.

However, do we need an array of those files? I thought we only need the
bitmap until we process all rows from each "stripe" and then we can
throw it away, right? Which would also mean we don't need to worry about
the memory usage too much, because the 8kB buffer will go away after
calling BufFileClose.
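
Just to make my mental model of the bitmap concrete, I'm imagining something
along these lines (purely illustrative, not the SharedBits API): one bit per
outer tuple of the batch, set whenever the tuple matches in any stripe, and
scanned once after the last stripe to emit the never-matched outer tuples:

#include <stdio.h>
#include <string.h>

#define NOUTER 16               /* outer tuples in the current batch */

static unsigned char matched[(NOUTER + 7) / 8];

static void
mark_matched(int tupno)
{
    matched[tupno / 8] |= 1 << (tupno % 8);
}

static int
was_matched(int tupno)
{
    return matched[tupno / 8] & (1 << (tupno % 8));
}

int
main(void)
{
    int     i;

    memset(matched, 0, sizeof(matched));

    /* pretend outer tuples 2, 5 and 9 matched during some stripe's probe */
    mark_matched(2);
    mark_matched(5);
    mark_matched(9);

    /* after the last stripe, a left join emits the unmatched outer tuples */
    for (i = 0; i < NOUTER; i++)
        if (!was_matched(i))
            printf("emit outer tuple %d with NULLs for the inner side\n", i);
    return 0;
}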


regards

-- 
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Melanie Plageman
Date:
Attached is the current version of adaptive hash join with two
significant changes as compared to v10:

1) Implements spilling of batch 0 for parallel-aware parallel hash join.
2) Moves "striping" of fallback batches from "build" to "load" stage
It includes several smaller changes as well.

Batch 0 spilling is necessary when the hash table for batch 0 cannot fit
in memory and allows us to use the "hashloop" strategy for batch 0.

Spilling of batch 0 necessitated the addition of a few new pieces of
code. The most noticeable one is probably the hash table eviction phase
machine. If batch 0 was marked as a "fallback" batch in
ExecParallelHashIncreaseNumBatches() PHJ_GROW_BATCHES_DECIDING phase,
any future attempt to insert a tuple that would exceed the space_allowed
triggers eviction of the hash table.
ExecParallelHashTableEvictBatch0() will evict all batch 0 tuples in
memory into spill files in a batch 0 inner SharedTuplestore.

This means that when repartitioning batch 0 in the future, both the
batch 0 spill file and the hash table need to be drained and relocated
into the new generation of batches and the hash table. If enough memory
is freed up from batch 0 tuples relocating to other batches, then it is
possible that tuples from the batch 0 spill files will go back into the
hash table.
After batch 0 is evicted, the build stage proceeds as normal.
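
As a toy standalone sketch of that insertion path (made-up names and byte
counters standing in for HashMemoryChunks and the SharedTuplestore; this is
one plausible reading of the behavior described above, not the actual code):

#include <stdbool.h>
#include <stdio.h>

typedef struct FakeBatch0
{
    bool   fallback;        /* batch 0 was marked to fall back */
    size_t space_used;      /* bytes of batch 0 tuples currently in memory */
    size_t space_allowed;   /* per-batch memory budget */
    size_t spilled;         /* bytes pushed out to the batch 0 spill store */
} FakeBatch0;

/* stand-in for evicting everything currently in memory to spill files */
static void
evict_batch0(FakeBatch0 *b)
{
    printf("evict %zu in-memory bytes to the batch 0 spill store\n",
           b->space_used);
    b->spilled += b->space_used;
    b->space_used = 0;
}

/*
 * If batch 0 has fallen back and this tuple would push the in-memory hash
 * table over its budget, evict what is in memory first, then insert into the
 * now-empty table and carry on building.
 */
static void
insert_batch0_tuple(FakeBatch0 *b, size_t tuple_bytes)
{
    if (b->fallback && b->space_used + tuple_bytes > b->space_allowed)
        evict_batch0(b);
    b->space_used += tuple_bytes;
}

int
main(void)
{
    FakeBatch0  b = {true, 0, 100, 0};
    int         i;

    for (i = 0; i < 8; i++)
        insert_batch0_tuple(&b, 30);
    printf("in memory: %zu bytes, spilled: %zu bytes\n",
           b.space_used, b.spilled);
    return 0;
}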

The main alternative to this design that we considered was to "close" the
hash table after it is full. That is, if batch 0 has been marked to fall
back, once it is full, all subsequent tuples pulled from the outer child
would bypass the hash table altogether and go directly into a spill
file.

We chose the hash table eviction route because I thought it might be
better to write chunks of the hashtable into a file together rather than
sporadically write new batch 0 tuples to spill files as they are
pulled out of the child node. However, since the same sts_puttuple() API
is used in both cases, it is highly possible this won't actually matter
and we will do the same amount of I/O.
Both designs involved changing the flow of the code for inserting and
repartitioning tuples, so I figured that I would choose one, do some
testing, and try the other one later after more discussion and review.

This patch also introduces a significant change to how tuples are split
into stripes. Previously, during the build stage, tuples were written to
spill files in the SharedTuplestore with a stripe number in the metadata
section of the MinimalTuple.
For a batch that had been designated a "fallback" batch,
once the space_allowed had been exhausted, the shared stripe number
would be incremented and the new stripe number was written in the tuple
metadata to the files. Then, during loading, tuples were only loaded
into the hashtable if their stripe number matched the current stripe number.

This had several downsides. It introduced a couple new shared variables --
the current stripe number for the batch and its size.
In master, during the normal mode of the "build" stage, shared variables
for the size or estimated_size of the batch are checked on each
allocation of a STS Chunk or HashMemoryChunk, however, during
repartitioning, because bailing out early was not an option, workers
could use backend-local variables to keep track of size and merge them
at the end of repartitioning. This wasn't possible if we needed accurate
stripe numbers written into the tuples. This meant that we had to add
new shared variable accesses to repartitioning.

To avoid this, Deep and I worked on moving the "striping" logic from the
"build" stage to the "load" stage for batches. Serial hash join already
did striping in this way. This patch now pauses loading once the
space_allowed has been exhausted for parallel hash join as well. The
tricky part was keeping track of multiple read_pages for a given file.
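
Roughly, the load-time striping for a single backend looks like the sketch
below (made-up constants and byte counters standing in for HashMemoryChunk
allocations; the parallel complications described next are exactly about
sharing the "next" position among workers):

#include <stdio.h>

#define NTUPLES     10      /* tuples in this batch's file */
#define TUPLE_BYTES 30
#define WORK_MEM    100     /* per-batch memory budget */

int
main(void)
{
    int     next = 0;       /* next tuple in the batch file still to load */
    int     stripe = 0;

    while (next < NTUPLES)
    {
        size_t  used = 0;

        /* load tuples until the budget would be exceeded, then pause */
        while (next < NTUPLES && used + TUPLE_BYTES <= WORK_MEM)
        {
            used += TUPLE_BYTES;
            next++;
        }
        printf("stripe %d: loaded up to tuple %d (%zu bytes), now probe\n",
               stripe, next, used);

        /* ... probe the outer side against this stripe, reset, repeat ... */
        stripe++;
    }
    return 0;
}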

When tuples had explicit stripe numbers, we simply rewound the read_page
in the SharedTuplestoreParticipant to the earliest SharedTuplestoreChunk
that anyone had read and relied on the stripe numbers to avoid loading
tuples more than once. Now, each worker participating in reading from
the SharedTuplestore could have received a read_page "assignment" (four
blocks, currently) and then failed to allocate a HashMemoryChunk. We
cannot risk rewinding the read_page because there could be
SharedTuplestoreChunks that have already been loaded in between ones
that have not.

The design we went with was to "overflow" the tuples from this
SharedTuplestoreChunk onto the end of the write_file which this worker
wrote--if it participated in writing this STS--or by making a new
write_file if it did not participate in writing. This entailed keeping
track of who participated in the write phase. SharedTuplestore
participation now has three "modes"-- reading, writing, and appending.
During appending, workers can write to their own file and read from any
file.
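
A rough sketch of the append-mode decision (illustrative names only, not the
SharedTuplestore API):

#include <stdbool.h>
#include <stdio.h>

/* the three participation modes mentioned above, as illustrative stand-ins */
typedef enum { STS_WRITE, STS_READ, STS_APPEND } FakeStsMode;

typedef struct FakeParticipant
{
    int  id;
    bool wrote_during_build;    /* did this worker write to the STS before? */
} FakeParticipant;

/*
 * While appending, a worker that cannot fit its current read assignment into
 * the hash table pushes the leftover tuples back out: onto the end of its own
 * write file if it helped write this tuplestore, otherwise into a new file.
 */
static const char *
overflow_target(const FakeParticipant *p)
{
    if (p->wrote_during_build)
        return "append to my existing write file";
    return "create a new write file of my own";
}

int
main(void)
{
    FakeParticipant writer = {0, true};
    FakeParticipant late_joiner = {1, false};

    printf("worker %d: %s\n", writer.id, overflow_target(&writer));
    printf("worker %d: %s\n", late_joiner.id, overflow_target(&late_joiner));
    return 0;
}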

One of the alternative designs I considered was to store the offset and
length of leftover blocks that still needed to be loaded into the hash
table in the SharedTuplestoreParticipant data structure. Then, workers
would pick up these "assignments". It is basically a
SharedTuplestoreParticipant work queue.
The main stumbling block I faced here was allocating a variable number of
things in shared memory. You don't know how many read participants will
read from the file and how many stripes there will be (until you've
loaded the file). In the worst case, you would need space for
nparticipants * nstripes - 1 offset/length combos.
Since I don't know how many stripes I have until I've loaded the file, I
can't allocate shared memory for this up front.

The downside of the "append overflow" design is that, currently, all
workers participating in loading a fallback batch write an overflow
chunk for every fallback stripe.
It seems like something could be done to check if there is space in the
hashtable before accepting an assignment of blocks to read from the
SharedTuplestore and moving the shared variable read_page. It might
reduce instances in which workers have to overflow. However, I tried
this and it is very intrusive on the SharedTuplestore API (it would have
to know about the hash table). Also, oversized tuples would not be
addressed by this pre-assignment check since memory is allocated a
HashMemoryChunk at a time. So, even if this was solved, you would need
overflow functionality.

One note is that I had to comment out a test in join_hash.sql which
inserts tuples larger than work_mem in size (each), because it no longer
successfully executes.
Also, the stripe number is not deterministic, so sometimes the tests that
compare fallback batches' number of stripes fail (also in join_hash.sql).

Major outstanding TODOs:
--
- Potential redesign of stripe loading pausing and resumption
- The instrumentation for parallel fallback batches has some problems
- Deadlock hazard avoidance design of the stripe barrier still needs work
- Assorted smaller TODOs in the code


On Thu, Jun 25, 2020 at 5:22 PM Tomas Vondra <tomas.vondra@2ndquadrant.com> wrote:
On Thu, Jun 25, 2020 at 03:09:44PM -0700, Melanie Plageman wrote:
>On Tue, Jun 23, 2020 at 3:24 PM Tomas Vondra <tomas.vondra@2ndquadrant.com>
>wrote:
>
>> I assume
>> you're working on a patch addressing the remaining TODOS, right?
>>
>
>I wanted to get some feedback on the patch before working through the
>TODOs to make sure I was on the right track.
>
>Now that you are reviewing this, I will focus all my attention
>on addressing your feedback. If there are any TODOs that you feel are
>most important, let me know, so I can start with those.
>
>Otherwise, I will prioritize parallel batch 0 spilling.

Feel free to work on the batch 0 spilling, please. I still need to get
familiar with various parts of the parallel hash join etc. so I don't
have any immediate feedback which TODOs to work on first.

>David Kimura plans to do a bit of work on parallel hash join batch 0
>spilling tomorrow. Whatever is left after that, I will pick up next
>week. Parallel hash join batch 0 spilling is the last large TODO that I
>had.
>
>My plan was to then focus on the feedback (either about which TODOs are
>most important or outside of the TODOs I've identified) I get from you
>and anyone else who reviews this.
>

OK.

See list of patch contents above.

Tomas, I wasn't sure if you would want a patchset which included a
commit with just the differences between this version and v10 since you
had already started reviewing it.
This commit [1] is on a branch off of my fork that has just the delta
between v10 and v11.
As a warning, I have added a few updates to comments and such after
squashing the two in my current branch (which is what is in this patch).
I didn't intend to maintain the commits separately as I felt it would be
more confusing for other reviewers.
 

>
>>
>> nodeHash.c
>> ----------
>>
>>
>> 1) MultiExecPrivateHash says this
>>
>>    /*
>>     * Not subject to skew optimization, so either insert normally
>>     * or save to batch file if it belongs to another stripe
>>     */
>>
>> I wonder what it means to "belong to another stripe". I understand what
>> that means for batches, which are identified by batchno computed from
>> the hash value. But I thought "stripes" are just work_mem-sized pieces
>> of a batch, so I don't quite understand this. Especially when the code
>> does not actually check "which stripe" the row belongs to.
>>
>>
>I agree this was confusing.
>
>"belongs to another stripe" meant here that if batch 0 falls back and we
>are still loading it, once we've filled up work_mem, we need to start
>saving those tuples to a spill file for batch 0. I've changed the
>comment to this:
>
>-        * or save to batch file if it belongs to another stripe
>+       * or save to batch file if batch 0 falls back and we have
>+       * already filled the hashtable up to space_allowed.
>

OK. Silly question - what does "batch 0 falls back" mean? Does it mean
that we realized the hash table for batch 0 would not fit into work_mem,
so we switched to the "hashloop" strategy?

Exactly.


>
>> 2) I find the fields hashloop_fallback rather confusing. We have one in
>> HashJoinTable (and it's array of BufFile items) and another one in
>> ParallelHashJoinBatch (this time just bool).
>>
>> I think HashJoinTable should be renamed to hashloopBatchFile (similarly
>> to the other BufFile arrays).
>
>
>I think you are right about the name. I've changed the name in
>HashJoinTableData to hashloopBatchFile.
>
>The array of BufFiles hashloop_fallback was only used by serial
>hashjoin. The boolean hashloop_fallback variable is used only by
>parallel hashjoin.
>
>The reason I had them named the same thing is that I thought it would be
>nice to have a variable with the same name to indicate if a batch "fell
>back" for both parallel and serial hashjoin--especially since we check
>it in the main hashjoin state machine used by parallel and serial
>hashjoin.
>
>In serial hashjoin, the BufFiles aren't identified by name, so I kept
>them in that array. In parallel hashjoin, each ParallelHashJoinBatch has
>the status saved (in the struct).
>So, both represented the fall back status of a batch.
>
>However, I agree with you, so I've renamed the serial one to
>hashloopBatchFile.
>

OK

>>
>> Although I'm not sure why we even need
>> this file, when we have innerBatchFile? BufFile(s) are not exactly free,
>> in fact it's one of the problems for hashjoins with many batches.
>>
>>
>Interesting -- it didn't even occur to me to combine the bitmap with the
>inner side batch file data.
>It definitely seems like a good idea to save the BufFile given that so
>little data will likely go in it and that it has a 1-1 relationship with
>inner side batches.
>
>How might it work? Would you reserve some space at the beginning of the
>file? When would you reserve the bytes (before adding tuples you won't
>know how many bytes you need, so it might be hard to make sure there is
>enough space.) Would all inner side files have space reserved or just
>fallback batches?
>

Oh! So the hashloopBatchFile is only used for the bitmap? I haven't
realized that. In that case it probably makes sense to keep it separate
from the files with spilled tuples, interleaving that somehow would be
way too complex, I think.

However, do we need an array of those files? I thought we only need the
bitmap until we process all rows from each "stripe" and then we can
throw it away, right? Which would also mean we don't need to worry about
the memory usage too much, because the 8kB buffer will go away after
calling BufFileClose.

 
 Good point! I will try this change.

Attachment

Re: Avoiding hash join batch explosions with extreme skew and weird stats

From
Michael Paquier
Date:
On Mon, Aug 31, 2020 at 03:13:06PM -0700, Melanie Plageman wrote:
> Attached is the current version of adaptive hash join with two
> significant changes as compared to v10:

The CF bot is complaining about a regression test failure:
@@ -2465,7 +2465,7 @@
  Gather (actual rows=469 loops=1)
    Workers Planned: 1
    Workers Launched: 1
-   ->  Parallel Hash Left Join (actual rows=234 loops=2)
+   ->  Parallel Hash Left Join (actual rows=235 loops=2)
--
Michael

Attachment