Re: Parallel Full Hash Join - Mailing list pgsql-hackers

From Melanie Plageman
Subject Re: Parallel Full Hash Join
Date
Msg-id CAAKRu_beHSa1Xyuaw1DHh6ycCCHWKbOfJLEqT9aNTT76N44X3w@mail.gmail.com
In response to Re: Parallel Full Hash Join  (Thomas Munro <thomas.munro@gmail.com>)
Responses Re: Parallel Full Hash Join  (Melanie Plageman <melanieplageman@gmail.com>)
List pgsql-hackers

On Mon, Sep 21, 2020 at 8:34 PM Thomas Munro <thomas.munro@gmail.com> wrote:
On Tue, Sep 22, 2020 at 8:49 AM Melanie Plageman
<melanieplageman@gmail.com> wrote:
> On Wed, Sep 11, 2019 at 11:23 PM Thomas Munro <thomas.munro@gmail.com> wrote:

I took it for a very quick spin and saw simple cases working nicely,
but TPC-DS queries 51 and 97 (which contain full joins) couldn't be
convinced to use it.  Hmm.

Thanks for taking a look, Thomas!

Both query 51 and query 97 have full outer joins of two CTEs, each of
which is an aggregate query.

During planning, when constructing the joinrel and choosing paths in
hash_inner_and_outer(), we don't consider parallel hash join paths
because the outerrel and innerrel do not have partial_pathlists.

This code

  if (joinrel->consider_parallel &&
      save_jointype != JOIN_UNIQUE_OUTER &&
      outerrel->partial_pathlist != NIL &&
      bms_is_empty(joinrel->lateral_relids))

gates the code to generate partial paths for hash join.

My understanding of this is that if the inner and outer rels don't have
partial paths, then they can't be scanned in parallel, so the join can't
be executed in parallel.
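
For the parallel-aware case, the inner rel needs a partial path as well;
paraphrasing the rest of that block in hash_inner_and_outer() from memory
(so the details may not match the source exactly), it looks roughly like:

  cheapest_partial_outer =
    (Path *) linitial(outerrel->partial_pathlist);

  /* A parallel-aware hash join also needs a partial path on the inner side. */
  if (innerrel->partial_pathlist != NIL &&
      save_jointype != JOIN_UNIQUE_INNER &&
      enable_parallel_hash)
  {
    cheapest_partial_inner =
      (Path *) linitial(innerrel->partial_pathlist);
    try_partial_hashjoin_path(root, joinrel,
                              cheapest_partial_outer,
                              cheapest_partial_inner,
                              hashclauses, jointype, extra,
                              true /* parallel_hash */);
  }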

For the two TPC-DS queries, even if they use parallel aggs, the finalize
agg will have to be done by a single worker, so I don't think they could
be joined with a parallel hash join.

I added some logging inside the "if" statement and ran join_hash.sql in
regress to see what nodes were typically in the pathlist and partial
pathlist. All of them had basically just sequential scans as the outer
and inner rel paths. The regress examples are deliberately minimal, so
this probably wasn't the best place to look for more complex rels that
can be joined with a parallel hash join.
 

>> Some other notes on the patch:

From a quick peek:

+/*
+ * Upon arriving at the barrier, if this worker is not the last worker
+ * attached, detach from the barrier and return false. If this worker is the
+ * last worker, remain attached and advance the phase of the barrier, return
+ * true to indicate you are the last or "elected" worker who is still
+ * attached to the barrier.
+ * Another name I considered was BarrierUniqueify or BarrierSoloAssign
+ */
+bool
+BarrierDetachOrElect(Barrier *barrier)

I tried to find some existing naming in writing about
barriers/phasers, but nothing is jumping out at me.  I think a lot of
this stuff comes from super computing where I guess "make all of the
threads give up except one" isn't a primitive they'd be too excited
about :-)

BarrierArriveAndElectOrDetach()... gah, no.

You're right that Arrive should be in there.
So, I went with BarrierArriveAndDetachExceptLast().
It's specific, if not clever.
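
Roughly, the shape I have in mind is a thin wrapper over the existing
Barrier fields in storage/barrier.h, something like this (sketch, not the
exact patch code):

  bool
  BarrierArriveAndDetachExceptLast(Barrier *barrier)
  {
    SpinLockAcquire(&barrier->mutex);
    if (barrier->participants > 1)
    {
      /* Not the last worker attached: just detach and report that. */
      --barrier->participants;
      SpinLockRelease(&barrier->mutex);
      return false;
    }

    /* We are the last participant: advance the phase and stay attached. */
    Assert(barrier->participants == 1);
    ++barrier->phase;
    SpinLockRelease(&barrier->mutex);
    return true;
  }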
 

+    last = BarrierDetachOrElect(&batch->batch_barrier);

It'd be nice to add some assertions after that, in the 'last' path,
that there's only one participant and that the phase is as expected,
just to make it even clearer to the reader, and a comment in the other
path that we are no longer attached.

Assert and comment added to the single worker path.
The other path is just back to HJ_NEED_NEW_BATCH and workers will detach
there as before, so I'm not sure where we could add the comment about
the other workers detaching.
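
Concretely, the single-worker path now has roughly this shape (sketch;
PHJ_BATCH_SCAN_INNER is a placeholder name for the phase, not what the
patch actually calls it):

  if (BarrierArriveAndDetachExceptLast(&batch->batch_barrier))
  {
    /* We are now the only worker attached to this batch's barrier. */
    Assert(BarrierParticipants(&batch->batch_barrier) == 1);
    Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN_INNER);

    /* ... scan the hash table for unmatched inner tuples alone ... */
  }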
 

+    hjstate->hj_AllocatedBucketRange = 0;
...
+    pg_atomic_uint32 bucket;    /* bucket allocator for unmatched inner scan */
...
+                //volatile int mybp = 0; while (mybp == 0)

Some leftover fragments of the bucket-scan version and debugging stuff.

cleaned up (and rebased).

I also changed ExecScanHashTableForUnmatched() to scan HashMemoryChunks
in the hashtable instead of using the buckets to align parallel and
serial hash join code.
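
The core of the chunk-based scan loop looks roughly like this in the
serial case (sketch, not the exact patch code):

  for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next.unshared)
  {
    size_t idx = 0;

    /* Walk the densely packed tuples stored in this chunk. */
    while (idx < chunk->used)
    {
      HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx);
      MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);

      if (!HeapTupleHeaderHasMatch(tuple))
      {
        /* ... emit this inner tuple as unmatched ... */
      }

      /* Advance past this tuple to the next one in the chunk. */
      idx += MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
    }
  }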

Originally, I had that code free the chunks of the hashtable after
finishing scanning them; however, I noticed this query from regress
failing:

select * from
(values (1, array[10,20]), (2, array[20,30])) as v1(v1x,v1ys)
left join (values (1, 10), (2, 20)) as v2(v2x,v2y) on v2x = v1x
left join unnest(v1ys) as u1(u1y) on u1y = v2y;

This is because the hash join gets rescanned and, since there is only
one batch, ExecReScanHashJoin() reuses the same hashtable.

                         QUERY PLAN                          
-------------------------------------------------------------
 Nested Loop Left Join
   ->  Values Scan on "*VALUES*"
   ->  Hash Right Join
         Hash Cond: (u1.u1y = "*VALUES*_1".column2)
         Filter: ("*VALUES*_1".column1 = "*VALUES*".column1)
         ->  Function Scan on unnest u1
         ->  Hash
               ->  Values Scan on "*VALUES*_1"

I was freeing the hashtable as I scanned each chunk, which clearly
doesn't work for a single batch hash join which gets rescanned.

I don't see anything specific to parallel hash join in ExecReScanHashJoin(),
so it seems like the same rules apply to parallel hash join, and I will
have to remove the logic that frees the hash table after scanning each
chunk from the parallel function as well.
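
For reference, the single-batch reuse branch of ExecReScanHashJoin() is
roughly the following (paraphrased from memory, so it may not match the
source exactly):

  if (node->hj_HashTable != NULL)
  {
    if (node->hj_HashTable->nbatch == 1 &&
        innerPlanState(node)->chgParam == NULL)
    {
      /*
       * Reuse the existing hash table instead of rebuilding it, so the
       * chunks have to survive the unmatched scan intact.  For right/full
       * joins, only the per-tuple match flags get reset.
       */
      if (HJ_FILL_INNER(node))
        ExecHashTableResetMatchFlags(node->hj_HashTable);
      ...
    }
    ...
  }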

In addition, I still need to go through the patch with a fine-tooth comb
(refine the comments and variable names and such), but I just wanted to
check that these changes were in line with what you were thinking first.

Regards,
Melanie (Microsoft)
