
From Andres Freund
Subject Re: WIP: [[Parallel] Shared] Hash
Date
Msg-id 20170328053303.cimag42pspossj5a@alap3.anarazel.de
In response to Re: [HACKERS] WIP: [[Parallel] Shared] Hash  (Thomas Munro <thomas.munro@enterprisedb.com>)
List pgsql-hackers
On 2017-03-23 20:35:09 +1300, Thomas Munro wrote:
> Here is a new patch series responding to feedback from Peter and Andres:

Here's a review of 0007 & 0010 together - they're going to have to be
applied together anyway...


diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index ac339fb566..775c9126c7 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3814,6 +3814,21 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
      </listitem>
     </varlistentry>
 
+     <varlistentry id="guc-cpu-shared-tuple-cost" xreflabel="cpu_shared_tuple_cost">
+      <term><varname>cpu_shared_tuple_cost</varname> (<type>floating point</type>)
+      <indexterm>
+       <primary><varname>cpu_shared_tuple_cost</> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Sets the planner's estimate of the cost of sharing rows in
+        memory during a parallel query.
+        The default is 0.001.
+       </para>
+      </listitem>
+     </varlistentry>
+

Isn't that really low in comparison to the other costs? I think
specifying a bit more what this actually measures would be good too - is
it putting the tuple in shared memory? Is it accessing it?


+     <varlistentry id="guc-cpu-synchronization-cost" xreflabel="cpu_synchronization_cost">
+      <term><varname>cpu_synchronization_cost</varname> (<type>floating point</type>)
+      <indexterm>
+       <primary><varname>cpu_synchronization_cost</> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Sets the planner's estimate of the cost of waiting at synchronization
+        points for other processes while executing parallel queries.
+        The default is 1.0.
+       </para>
+      </listitem>
+     </varlistentry>

Isn't this also really cheap in comparison to a, probably cached, seq
page read?
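For reference, the defaults these two new GUCs would be weighed against
(quoting src/include/optimizer/cost.h in the current tree):

    #define DEFAULT_SEQ_PAGE_COST  1.0
    #define DEFAULT_RANDOM_PAGE_COST  4.0
    #define DEFAULT_CPU_TUPLE_COST  0.01
    #define DEFAULT_CPU_INDEX_TUPLE_COST 0.005
    #define DEFAULT_CPU_OPERATOR_COST  0.0025
    #define DEFAULT_PARALLEL_TUPLE_COST 0.1
    #define DEFAULT_PARALLEL_SETUP_COST  1000.0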


+    if (HashJoinTableIsShared(hashtable))
+    {
+        /*
+         * Synchronize parallel hash table builds.  At this stage we know that
+         * the shared hash table has been created, but we don't know if our
+         * peers are still in MultiExecHash and if so how far through.  We use
+         * the phase to synchronize with them.
+         */
+        barrier = &hashtable->shared->barrier;
+
+        switch (BarrierPhase(barrier))
+        {
+        case PHJ_PHASE_BEGINNING:

Note pgindent will indent this further.  Might be worthwhile to try to
pgindent the file, revert some of the unintended damage.

    /*
     * set expression context
     */

I'd still like this to be moved to the start.


@@ -126,17 +202,79 @@ MultiExecHash(HashState *node)
                /* Not subject to skew optimization, so insert normally */
                ExecHashTableInsert(hashtable, slot, hashvalue);
            }

-            hashtable->totalTuples += 1;
+            hashtable->partialTuples += 1;
+            if (!HashJoinTableIsShared(hashtable))
+                hashtable->totalTuples += 1;
        }
    }

FWIW, I'd put HashJoinTableIsShared() into a local var - the compiler
won't be able to do that on its own because external function calls
could invalidate the result.
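Something along these lines, i.e. just hoisting it above the existing loop
(a rough sketch; the local variable name is made up):

    bool        table_is_shared = HashJoinTableIsShared(hashtable);

    for (;;)
    {
        slot = ExecProcNode(outerNode);
        if (TupIsNull(slot))
            break;

        /* ... hashing and ExecHashTableInsert() as before ... */

        hashtable->partialTuples += 1;
        if (!table_is_shared)
            hashtable->totalTuples += 1;
    }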

That brings me to a related topic: Have you measured whether your
changes cause performance differences?


+    finish_loading(hashtable);

I find the sudden switch to a different naming scheme in the same file a
bit jarring.


+    if (HashJoinTableIsShared(hashtable))
+        BarrierDetach(&hashtable->shared->shrink_barrier);
+
+    if (HashJoinTableIsShared(hashtable))
+    {

Consecutive if blocks with the same condition...


+        bool elected_to_resize;
+
+        /*
+         * Wait for all backends to finish building.  If only one worker is
+         * running the building phase because of a non-partial inner plan, the
+         * other workers will pile up here waiting.  If multiple worker are
+         * building, they should finish close to each other in time.
+         */

That comment is outdated, isn't it?


    /* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
-    if (hashtable->nbuckets != hashtable->nbuckets_optimal)
-        ExecHashIncreaseNumBuckets(hashtable);
+    ExecHashUpdate(hashtable);
+    ExecHashIncreaseNumBuckets(hashtable);

So this now doesn't actually increase the number of buckets anymore.

+ reinsert:
+    /* If the table was resized, insert tuples into the new buckets. */
+    ExecHashUpdate(hashtable);
+    ExecHashReinsertAll(hashtable);

ReinsertAll just happens to do nothing if we didn't have to
resize... Not entirely obvious; it sure reads as if it were unconditional.
Also, it's not actually "All" when batching is in use, no?


+ post_resize:
+    if (HashJoinTableIsShared(hashtable))
+    {
+        Assert(BarrierPhase(barrier) == PHJ_PHASE_RESIZING);
+        BarrierWait(barrier, WAIT_EVENT_HASH_RESIZING);
+        Assert(BarrierPhase(barrier) == PHJ_PHASE_REINSERTING);
+    }
+
+ reinsert:
+    /* If the table was resized, insert tuples into the new buckets. */
+    ExecHashUpdate(hashtable);
+    ExecHashReinsertAll(hashtable);

Hm.  So even non-resizing backends reach this - but they happen to not
do anything because there's no work queued up, right?  That's, uh, not
obvious.
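If it stays this way, maybe at least spell it out at the call site,
something like:

    /*
     * Participants that didn't resize the table also pass through here; for
     * them ExecHashReinsertAll() finds no work queued up and returns without
     * doing anything.
     */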



For me the code here would be a good bit easier to read if we had separate
MultiExecHash and MultiExecParallelHash functions.  Half of MultiExecHash is
just if(IsShared) blocks, and having two copies would avoid potential
slowdowns.
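I.e. make MultiExecHash() a thin dispatcher, roughly like this (the two
helper names below are made up):

    Node *
    MultiExecHash(HashState *node)
    {
        if (HashJoinTableIsShared(node->hashtable))
            return MultiExecParallelHash(node);
        else
            return MultiExecPrivateHash(node);
    }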



+        /*
+         * Set up for skew optimization, if possible and there's a need for
+         * more than one batch.  (In a one-batch join, there's no point in
+         * it.)
+         */
+        if (nbatch > 1)
+            ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);

So there's no equivalent to the skew optimization for parallel query
yet...  It doesn't sound like that should be particularly hard on first
blush?

static void
-ExecHashIncreaseNumBatches(HashJoinTable hashtable)
+ExecHashIncreaseNumBatches(HashJoinTable hashtable, int nbatch)

So this doesn't actually increase the number of batches anymore...  At
the very least this should mention that the main work is done in
ExecHashShrink.
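E.g. something along these lines in its header comment:

    /*
     * ExecHashIncreaseNumBatches
     *      Prepare to grow the number of batches to nbatch.  The actual work
     *      of moving tuples out of memory into future batches is done in
     *      ExecHashShrink().
     */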

+/*
+ * Process the queue of chunks whose tuples need to be redistributed into the
+ * correct batches until it is empty.  In the best case this will shrink the
+ * hash table, keeping about half of the tuples in memory and sending the rest
+ * to a future batch.
+ */
+static void
+ExecHashShrink(HashJoinTable hashtable)

Should mention this really only is meaningful after
ExecHashIncreaseNumBatches has run.


+{
+    long        ninmemory;
+    long        nfreed;
+    dsa_pointer chunk_shared;
+    HashMemoryChunk chunk;
-    /* If know we need to resize nbuckets, we can do it while rebatching. */
-    if (hashtable->nbuckets_optimal != hashtable->nbuckets)
+    if (HashJoinTableIsShared(hashtable))
    {
-        /* we never decrease the number of buckets */
-        Assert(hashtable->nbuckets_optimal > hashtable->nbuckets);
+        /*
+         * Since a newly launched participant could arrive while shrinking is
+         * already underway, we need to be able to jump to the correct place
+         * in this function.
+         */
+        switch (PHJ_SHRINK_PHASE(BarrierPhase(&hashtable->shared->shrink_barrier)))
+        {
+        case PHJ_SHRINK_PHASE_BEGINNING: /* likely case */
+            break;
+        case PHJ_SHRINK_PHASE_CLEARING:
+            goto clearing;
+        case PHJ_SHRINK_PHASE_WORKING:
+            goto working;
+        case PHJ_SHRINK_PHASE_DECIDING:
+            goto deciding;
+        }

Hm, so we jump into different nesting levels here :/
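One way to keep everything at the same nesting level would be to dispatch
once and then fall through flat sections, roughly (the ExecHashShrink*
helpers are invented, just to show the shape):

    switch (PHJ_SHRINK_PHASE(BarrierPhase(&hashtable->shared->shrink_barrier)))
    {
        case PHJ_SHRINK_PHASE_BEGINNING:
            ExecHashShrinkBegin(hashtable);     /* invented */
            /* FALL THRU */
        case PHJ_SHRINK_PHASE_CLEARING:
            ExecHashShrinkClear(hashtable);     /* invented */
            /* FALL THRU */
        case PHJ_SHRINK_PHASE_WORKING:
            ExecHashShrinkWork(hashtable);      /* invented */
            /* FALL THRU */
        case PHJ_SHRINK_PHASE_DECIDING:
            ExecHashShrinkDecide(hashtable);    /* invented */
            break;
    }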


ok, ENOTIME for today...



diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index f2c885afbe..87d8f3766e 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -6,10 +6,78 @@
  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- *
  * IDENTIFICATION
  *      src/backend/executor/nodeHashjoin.c
  *
+ * NOTES:
+ *
+ * PARALLELISM
+ *
+ * Hash joins can participate in parallel queries in two ways: in
+ * non-parallel-aware mode, where each backend builds an identical hash table
+ * and then probes it with a partial outer relation, or parallel-aware mode
+ * where there is a shared hash table that all participants help to build.  A
+ * parallel-aware hash join can save time and space by dividing the work up
+ * and sharing the result, but has extra communication overheads.

There's a third, right?  The hashjoin, and everything below it, could
also not be parallel, but above it could be some parallel aware node
(e.g. a parallel aware HJ).


+ * In both cases, hash joins use a private state machine to track progress
+ * through the hash join algorithm.

That's not really parallel specific, right?  Perhaps just say that
parallel HJs use the normal state machine?


+ * In a parallel-aware hash join, there is also a shared 'phase' which
+ * co-operating backends use to synchronize their local state machine and
+ * program counter with the multi-process join.  The phase is managed by a
+ * 'barrier' IPC primitive.

Hm. I wonder if 'phase' shouldn't just be named
sharedHashJoinState. Might be a bit easier to understand than a
different terminology.

+ * The phases are as follows:
+ *
+ *   PHJ_PHASE_BEGINNING   -- initial phase, before any participant acts
+ *   PHJ_PHASE_CREATING       -- one participant creates the shmem hash table
+ *   PHJ_PHASE_BUILDING       -- all participants build the hash table
+ *   PHJ_PHASE_RESIZING       -- one participant decides whether to expand buckets
+ *   PHJ_PHASE_REINSERTING -- all participants reinsert tuples if necessary
+ *   PHJ_PHASE_PROBING       -- all participants probe the hash table
+ *   PHJ_PHASE_UNMATCHED   -- all participants scan for unmatched tuples

I think somewhere here - and probably around the sites where it happens - it
should be mentioned that state transitions are done kinda implicitly via
BarrierWait progressing to the numerically next phase. That's not entirely
obvious (and actually limits what the barrier mechanism can be used for...).
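I.e. the idiom that recurs in the hunks above is effectively:

    Assert(BarrierPhase(barrier) == PHJ_PHASE_RESIZING);
    BarrierWait(barrier, WAIT_EVENT_HASH_RESIZING);
    /* the wait itself moved everyone to the numerically next phase */
    Assert(BarrierPhase(barrier) == PHJ_PHASE_REINSERTING);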


- Andres


