Re: Parallel Full Hash Join - Mailing list pgsql-hackers
From | Melanie Plageman |
---|---|
Subject | Re: Parallel Full Hash Join |
Date | |
Msg-id | CAAKRu_bSJfCSNFBk+sKaE-ViFNdo_qvqW1MDEedbbG=99PrPHA@mail.gmail.com Whole thread Raw |
In response to | Parallel Full Hash Join (Thomas Munro <thomas.munro@gmail.com>) |
Responses |
Re: Parallel Full Hash Join
|
List | pgsql-hackers |
On Wed, Sep 11, 2019 at 11:23 PM Thomas Munro <thomas.munro@gmail.com> wrote:
While thinking about looping hash joins (an alternative strategy for
limiting hash join memory usage currently being investigated by
Melanie Plageman in a nearby thread[1]), the topic of parallel query
deadlock hazards came back to haunt me. I wanted to illustrate the
problems I'm aware of with the concrete code where I ran into this
stuff, so here is a new-but-still-broken implementation of $SUBJECT.
This was removed from the original PHJ submission when I got stuck and
ran out of time in the release cycle for 11. Since the original
discussion is buried in long threads and some of it was also a bit
confused, here's a fresh description of the problems as I see them.
Hopefully these thoughts might help Melanie's project move forward,
because it's closely related, but I didn't want to dump another patch
into that other thread. Hence this new thread.
I haven't succeeded in actually observing a deadlock with the attached
patch (though I did last year, very rarely), but I also haven't tried
very hard. The patch seems to produce the right answers and is pretty
scalable, so it's really frustrating not to be able to get it over the
line.
Tuple queue deadlock hazard:
If the leader process is executing the subplan itself and waiting for
all processes to arrive in ExecParallelHashEndProbe() (in this patch)
while another process has filled up its tuple queue and is waiting for
the leader to read some tuples an unblock it, they will deadlock
forever. That can't happen in the the committed version of PHJ,
because it never waits for barriers after it has begun emitting
tuples.
Some possible ways to fix this:
1. You could probably make it so that the PHJ_BATCH_SCAN_INNER phase
in this patch (the scan for unmatched tuples) is executed by only one
process, using the "detach-and-see-if-you-were-last" trick. Melanie
proposed that for an equivalent problem in the looping hash join. I
think it probably works, but it gives up a lot of parallelism and thus
won't scale as nicely as the attached patch.
I have attached a patch which implements this
(v1-0001-Parallel-FOJ-ROJ-single-worker-scan-buckets.patch).
For starters, in order to support parallel FOJ and ROJ, I re-enabled
setting the match bit for the tuples in the hashtable which
3e4818e9dd5be294d97c disabled. I did so using the code suggested in [1],
reading the match bit to see if it is already set before setting it.
Then, workers except for the last worker detach after exhausting the
outer side of a batch, leaving one worker to proceed to HJ_FILL_INNER
and do the scan of the hash table and emit unmatched inner tuples.
I have also attached a variant on this patch which I am proposing to
replace it (v1-0001-Parallel-FOJ-ROJ-single-worker-scan-chunks.patch)
which has a new ExecParallelScanHashTableForUnmatched() in which the
single worker doing the unmatched scan scans one HashMemoryChunk at a
time and then frees them as it goes. I thought this might perform better
than the version which uses the buckets because 1) it should do a bit
less pointer chasing and 2) it frees each chunk of the hash table as it
scans it which (maybe) would save a bit of time during
ExecHashTableDetachBatch() when it goes through and frees the hash
table, but, my preliminary tests showed a negligible difference between
this and the version using buckets. I will do a bit more testing,
though.
I tried a few other variants of these patches, including one in which
the workers detach from the batch inside of the batch loading and
probing phase machine, ExecParallelHashJoinNewBatch(). This meant that
all workers transition to HJ_FILL_INNER and then HJ_NEED_NEW_BATCH in
order to detach in the batch phase machine. This, however, involved
adding a lot of new variables to distinguish whether or or not the
unmatched outer scan was already done, whether or not the current worker
was the worker elected to do the scan, etc. Overall, it is probably
incorrect to use the HJ_NEED_NEW_BATCH state in this way. I had
originally tried this to avoid operating on the batch_barrier in the
main hash join state machine. I've found that the more different places
we add code attaching and detaching to the batch_barrier (and other PHJ
barriers, for that matter), the harder it is to debug the code, however,
I think in this case it is required.
(v1-0001-Parallel-FOJ-ROJ-single-worker-scan-buckets.patch).
For starters, in order to support parallel FOJ and ROJ, I re-enabled
setting the match bit for the tuples in the hashtable which
3e4818e9dd5be294d97c disabled. I did so using the code suggested in [1],
reading the match bit to see if it is already set before setting it.
Then, workers except for the last worker detach after exhausting the
outer side of a batch, leaving one worker to proceed to HJ_FILL_INNER
and do the scan of the hash table and emit unmatched inner tuples.
I have also attached a variant on this patch which I am proposing to
replace it (v1-0001-Parallel-FOJ-ROJ-single-worker-scan-chunks.patch)
which has a new ExecParallelScanHashTableForUnmatched() in which the
single worker doing the unmatched scan scans one HashMemoryChunk at a
time and then frees them as it goes. I thought this might perform better
than the version which uses the buckets because 1) it should do a bit
less pointer chasing and 2) it frees each chunk of the hash table as it
scans it which (maybe) would save a bit of time during
ExecHashTableDetachBatch() when it goes through and frees the hash
table, but, my preliminary tests showed a negligible difference between
this and the version using buckets. I will do a bit more testing,
though.
I tried a few other variants of these patches, including one in which
the workers detach from the batch inside of the batch loading and
probing phase machine, ExecParallelHashJoinNewBatch(). This meant that
all workers transition to HJ_FILL_INNER and then HJ_NEED_NEW_BATCH in
order to detach in the batch phase machine. This, however, involved
adding a lot of new variables to distinguish whether or or not the
unmatched outer scan was already done, whether or not the current worker
was the worker elected to do the scan, etc. Overall, it is probably
incorrect to use the HJ_NEED_NEW_BATCH state in this way. I had
originally tried this to avoid operating on the batch_barrier in the
main hash join state machine. I've found that the more different places
we add code attaching and detaching to the batch_barrier (and other PHJ
barriers, for that matter), the harder it is to debug the code, however,
I think in this case it is required.
2. You could probably make it so that only the leader process drops
out of executing the inner unmatched scan, and then I think you
wouldn't have this very specific problem at the cost of losing some
(but not all) parallelism (ie the leader), but there might be other
variants of the problem. For example, a GatherMerge leader process
might be blocked waiting for the next tuple for a tuple from P1, while
P2 is try to write to a full queue, and P1 waits for P2.
3. You could introduce some kind of overflow for tuple queues, so
that tuple queues can never block because they're full (until you run
out of extra memory buffers or disk and error out). I haven't
seriously looked into this but I'm starting to suspect it's the
industrial strength general solution to the problem and variants of it
that show up in other parallelism projects (Parallel Repartition). As
Robert mentioned last time I talked about this[2], you'd probably only
want to allow spooling (rather than waiting) when the leader is
actually waiting for other processes; I'm not sure how exactly to
control that.
4. <thinking-really-big>Goetz Graefe's writing about parallel sorting
comes close to this topic, which he calls flow control deadlocks. He
mentions the possibility of infinite spooling like (3) as a solution.
He's describing a world where producers and consumers are running
concurrently, and the consumer doesn't just decide to start running
the subplan (what we call "leader participation"), so he doesn't
actually have a problem like Gather deadlock. He describes
planner-enforced rules that allow deadlock free execution even with
fixed-size tuple queue flow control by careful controlling where
order-forcing operators are allowed to appear, so he doesn't have a
problem like Gather Merge deadlock. I'm not proposing we should
create a whole bunch of producer and consumer processes to run
different plan fragments, but I think you can virtualise the general
idea in an async executor with "streams", and that also solves other
problems when you start working with partitions in a world where it's
not even sure how many workers will show up. I see this as a long
term architectural goal requiring vast amounts of energy to achieve,
hence my new interest in (3) for now.</thinking-really-big>
Hypothetical inter-node deadlock hazard:
Right now I think it is the case the whenever any node begins pulling
tuples from a subplan, it continues to do so until either the query
ends early or the subplan runs out of tuples. For example, Append
processes its subplans one at a time until they're done -- it doesn't
jump back and forth. Parallel Append doesn't necessarily run them in
the order that they appear in the plan, but it still runs each one to
completion before picking another one. If we ever had a node that
didn't adhere to that rule, then two Parallel Full Hash Join nodes
could dead lock, if some of the workers were stuck waiting in one
while some were stuck waiting in the other.
If we were happy to decree that that is a rule of the current
PostgreSQL executor, then this hypothetical problem would go away.
For example, consider the old patch I recently rebased[3] to allow
Append over a bunch of FDWs representing remote shards to return
tuples as soon as they're ready, not necessarily sequentially (and I
think several others have worked on similar patches). To be
committable under such a rule that applies globally to the whole
executor, that patch would only be allowed to *start* them in any
order, but once it's started pulling tuples from a given subplan it'd
have to pull them all to completion before considering another node.
(Again, that problem goes away in an async model like (4), which will
also be able to do much more interesting things with FDWs, and it's
the FDW thing that I think generates more interest in async execution
than my rambling about abstract parallel query problems.)
The leader exclusion tactics and the spooling idea don't solve the
execution order deadlock possibility, so, this "all except last detach
and last does unmatched inner scan" seems like the best way to solve
both types of deadlock.
There is another option that could maintain some parallelism for the
unmatched inner scan.
This method is exactly like the "all except last detach and last does
unmatched inner scan" method from the perspective of the main hash join
state machine. The difference is in ExecParallelHashJoinNewBatch(). In
the batch_barrier phase machine, workers loop around looking for batches
that are not done.
In this "detach for now" method, all workers except the last one detach
from a batch after exhausting the outer side. They will mark the batch
they were just working on as "provisionally done" (as opposed to
"done"). The last worker advances the batch_barrier from
PHJ_BATCH_PROBING to PHJ_BATCH_SCAN_INNER.
All detached workers then proceed to HJ_NEED_NEW_BATCH and try to find
another batch to work on. If there are no batches that are neither
"done" or "provisionally done", then the worker will re-attach to
batches that are "provisionally done" and attempt to join in conducting
the unmatched inner scan. Once it finishes its worker there, it will
return to HJ_NEED_NEW_BATCH, enter ExecParallelHashJoinNewBatch() and
mark the batch as "done".
Because the worker detached from the batch, this method solves the tuple
queue flow control deadlock issue--this worker could not be attempting
to emit a tuple while the leader waits at the barrier for it. There is
no waiting at the barrier.
However, it is unclear to me whether or not this method will be at risk
of inter-node deadlock/execution order deadlock. It seems like this is
not more at risk than the existing code is for this issue.
If a worker never returns to the HashJoin after leaving to emit a tuple,
in any of the methods (and in master), the query would not finish
correctly because the workers are attached to the batch_barrier while
emitting tuples and, though they may not wait at this barrier again, the
hashtable is cleaned up by the last participant to detach, and this
would not happen if it doesn't return to the batch phase machine. I'm
not sure if this exhibits the problematic behavior detailed above, but,
if it does, it is not unique to this method.
execution order deadlock possibility, so, this "all except last detach
and last does unmatched inner scan" seems like the best way to solve
both types of deadlock.
There is another option that could maintain some parallelism for the
unmatched inner scan.
This method is exactly like the "all except last detach and last does
unmatched inner scan" method from the perspective of the main hash join
state machine. The difference is in ExecParallelHashJoinNewBatch(). In
the batch_barrier phase machine, workers loop around looking for batches
that are not done.
In this "detach for now" method, all workers except the last one detach
from a batch after exhausting the outer side. They will mark the batch
they were just working on as "provisionally done" (as opposed to
"done"). The last worker advances the batch_barrier from
PHJ_BATCH_PROBING to PHJ_BATCH_SCAN_INNER.
All detached workers then proceed to HJ_NEED_NEW_BATCH and try to find
another batch to work on. If there are no batches that are neither
"done" or "provisionally done", then the worker will re-attach to
batches that are "provisionally done" and attempt to join in conducting
the unmatched inner scan. Once it finishes its worker there, it will
return to HJ_NEED_NEW_BATCH, enter ExecParallelHashJoinNewBatch() and
mark the batch as "done".
Because the worker detached from the batch, this method solves the tuple
queue flow control deadlock issue--this worker could not be attempting
to emit a tuple while the leader waits at the barrier for it. There is
no waiting at the barrier.
However, it is unclear to me whether or not this method will be at risk
of inter-node deadlock/execution order deadlock. It seems like this is
not more at risk than the existing code is for this issue.
If a worker never returns to the HashJoin after leaving to emit a tuple,
in any of the methods (and in master), the query would not finish
correctly because the workers are attached to the batch_barrier while
emitting tuples and, though they may not wait at this barrier again, the
hashtable is cleaned up by the last participant to detach, and this
would not happen if it doesn't return to the batch phase machine. I'm
not sure if this exhibits the problematic behavior detailed above, but,
if it does, it is not unique to this method.
Some other notes on the patch:
Aside from the deadlock problem, there are some minor details to tidy
up (handling of late starters probably not quite right, rescans not
yet considered).
These would not be an issue with only one worker doing the scan but
would have to be handled in a potential new parallel-enabled solution
like I suggested above.
would have to be handled in a potential new parallel-enabled solution
like I suggested above.
There is a fun hard-coded parameter that controls
the parallel step size in terms of cache lines for the unmatched scan;
I found that 8 was a lot faster than 4, but no slower than 128 on my
laptop, so I set it to 8.
I didn't add this cache line optimization to my chunk scanning method. I
could do so. Do you think it is more relevant, less relevant, or the
same if only one worker is doing the unmatched inner scan?
could do so. Do you think it is more relevant, less relevant, or the
same if only one worker is doing the unmatched inner scan?
More thoughts along those micro-optimistic
lines: instead of match bit in the header, you could tag the pointer
and sometimes avoid having to follow it, and you could prefetch next
non-matching tuple's cacheline by looking a head a bit.
I would be happy to try doing this once we get the rest of the patch
ironed out so that seeing how much of a performance difference it makes
is more straightforward.
ironed out so that seeing how much of a performance difference it makes
is more straightforward.
[1] https://www.postgresql.org/message-id/flat/CA%2BhUKGKWWmf%3DWELLG%3DaUGbcugRaSQbtm0tKYiBut-B2rVKX63g%40mail.gmail.com
[2] https://www.postgresql.org/message-id/CA%2BTgmoY4LogYcg1y5JPtto_fL-DBUqvxRiZRndDC70iFiVsVFQ%40mail.gmail.com
[3] https://www.postgresql.org/message-id/flat/CA%2BhUKGLBRyu0rHrDCMC4%3DRn3252gogyp1SjOgG8SEKKZv%3DFwfQ%40mail.gmail.com
-- Melanie
Attachment
pgsql-hackers by date: