Re: [HACKERS] Parallel Hash take II

From: Andres Freund
Subject: Re: [HACKERS] Parallel Hash take II
Msg-id: 20171108034032.jxjw4xxtxhxzfakx@alap3.anarazel.de
In response to: Re: [HACKERS] Parallel Hash take II (Thomas Munro <thomas.munro@enterprisedb.com>)
List: pgsql-hackers

Hi,
* avoids wasting memory on duplicated hash tables
* avoids wasting disk space on duplicated batch files
* avoids wasting CPU executing duplicate subplans

What's the last one referring to?





+static void
+MultiExecParallelHash(HashState *node)
+{

+    switch (BarrierPhase(build_barrier))
+    {
+        case PHJ_BUILD_ALLOCATING:
+
+            /*
+             * Either I just allocated the initial hash table in
+             * ExecHashTableCreate(), or someone else is doing that.  Either
+             * way, wait for everyone to arrive here so we can proceed, and
+             * then fall through.
+             */
+            BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATING);

Can you add a /* fallthrough */ comment here? Gcc warns if you
don't. While we currently have lots of other places without the
annotation, it seems reasonable to have it in new code.
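
E.g. (just sketching where the annotation would go, using the code from
the quoted hunk):

            BarrierArriveAndWait(build_barrier,
                                 WAIT_EVENT_HASH_BUILD_ALLOCATING);

            /* fallthrough */

        case PHJ_BUILD_HASHING_INNER:
            ...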


+        case PHJ_BUILD_HASHING_INNER:
+
+            /*
+             * It's time to begin hashing, or if we just arrived here then
+             * hashing is already underway, so join in that effort.  While
+             * hashing we have to be prepared to help increase the number of
+             * batches or buckets at any time, and if we arrived here when
+             * that was already underway we'll have to help complete that work
+             * immediately so that it's safe to access batches and buckets
+             * below.
+             */
+            if (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)) !=
+                PHJ_GROW_BATCHES_ELECTING)
+                ExecParallelHashIncreaseNumBatches(hashtable);
+            if (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(&pstate->grow_buckets_barrier)) !=
+                PHJ_GROW_BUCKETS_ELECTING)
+                ExecParallelHashIncreaseNumBuckets(hashtable);
+            ExecParallelHashEnsureBatchAccessors(hashtable);

"accessors" sounds a bit weird for a bunch of pointers, but maybe that's
just my ESL senses tingling wrongly.



/* ----------------------------------------------------------------
@@ -240,12 +427,15 @@ ExecEndHash(HashState *node)
 * ----------------------------------------------------------------
 */
HashJoinTable
-ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
+ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)

+    /*
+     * Parallel Hash tries to use the combined work_mem of all workers to
+     * avoid the need to batch.  If that won't work, it falls back to work_mem
+     * per worker and tries to process batches in parallel.
+     */

One day we're going to need a better approach to this. I have no idea
how, but this per-node, and now per-node * max_parallelism, approach has
only implementation simplicity as its benefit.





+static HashJoinTuple
+ExecParallelHashLoadTuple(HashJoinTable hashtable, MinimalTuple tuple,
+                          dsa_pointer *shared)
+{

+static void
+ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
+{



+/*
+ * Get the first tuple in a given bucket identified by number.
+ */
+static HashJoinTuple
+ExecHashFirstTupleInBucket(HashJoinTable hashtable, int bucketno)
+{
+    if (hashtable->parallel_state)
+    {
+        dsa_pointer p =
+        dsa_pointer_atomic_read(&hashtable->buckets.shared[bucketno]);

Can you make this, and possibly a few other places, more readable by
introducing a temporary variable?
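
I.e. something along these lines (identifiers taken from the quoted
hunk, rest elided):

    if (hashtable->parallel_state)
    {
        dsa_pointer_atomic *bucket = &hashtable->buckets.shared[bucketno];
        dsa_pointer first = dsa_pointer_atomic_read(bucket);

        ...
    }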


+/*
+ * Insert a tuple at the front of a chain of tuples in DSA memory atomically.
+ */
+static void
+ExecParallelHashPushTuple(dsa_pointer_atomic *head,
+                          HashJoinTuple tuple,
+                          dsa_pointer tuple_shared)
+{
+    do
+    {
+        tuple->next.shared = dsa_pointer_atomic_read(head);
+    } while (!dsa_pointer_atomic_compare_exchange(head,
+                                                  &tuple->next.shared,
+                                                  tuple_shared));
+}

This is hard to read.
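
Perhaps something roughly like this would be easier to follow (just a
sketch, relying on compare_exchange updating the expected value on
failure):

    dsa_pointer current_head = dsa_pointer_atomic_read(head);

    for (;;)
    {
        /* Point the new tuple at the current head of the chain. */
        tuple->next.shared = current_head;

        /*
         * Try to swing the head to the new tuple.  On failure, current_head
         * has been updated to whatever another backend installed, so just
         * retry.
         */
        if (dsa_pointer_atomic_compare_exchange(head, &current_head,
                                                tuple_shared))
            break;
    }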


+ * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
+ * be used repeatedly as required to coordinate expansions in the number of
+ * batches or buckets.  Their phases are as follows:
+ *
+ *   PHJ_GROW_BATCHES_ELECTING       -- initial state
+ *   PHJ_GROW_BATCHES_ALLOCATING     -- one allocates new batches
+ *   PHJ_GROW_BATCHES_REPARTITIONING -- all rep
s/rep/repartition/?
#include "access/htup_details.h"
+#include "access/parallel.h"#include "executor/executor.h"#include "executor/hashjoin.h"#include
"executor/nodeHash.h"#include"executor/nodeHashjoin.h"#include "miscadmin.h"
 
+#include "pgstat.h"#include "utils/memutils.h"
-
+#include "utils/sharedtuplestore.h"

deletes a separator newline.


@@ -138,6 +236,18 @@ ExecHashJoin(PlanState *pstate)
                    /* no chance to not build the hash table */
                    node->hj_FirstOuterTupleSlot = NULL;
                }
+                else if (hashNode->parallel_state != NULL)
+                {
+                    /*
+                     * The empty-outer optimization is not implemented for
+                     * shared hash tables, because no one participant can
+                     * determine that there are no outer tuples, and it's not
+                     * yet clear that it's worth the synchronization overhead
+                     * of reaching consensus to figure that out.  So we have
+                     * to build the hash table.
+                     */
+                    node->hj_FirstOuterTupleSlot = NULL;
+                }

Hm. Isn't MultiExecParallelHash already doing so?


-                node->hj_JoinState = HJ_NEED_NEW_OUTER;
+                if (hashtable->parallel_state)
+                {
+                    Barrier    *build_barrier;
+
+                    build_barrier = &hashtable->parallel_state->build_barrier;
+                    if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
+                    {
+                        /*
+                         * If multi-batch, we need to hash the outer relation
+                         * up front.
+                         */
+                        if (hashtable->nbatch > 1)
+                            ExecParallelHashJoinPartitionOuter(node);
+                        BarrierArriveAndWait(build_barrier,
+                                             WAIT_EVENT_HASH_BUILD_HASHING_OUTER);
+                    }
+                    Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
+
+                    /* Each backend should now select a batch to work on. */
+                    hashtable->curbatch = -1;
+                    node->hj_JoinState = HJ_NEED_NEW_BATCH;
+
+                    continue;
+                }
+                else
+                    node->hj_JoinState = HJ_NEED_NEW_OUTER;

You know what I'm going to say about all these branches, and sigh.

If we don't split this into two versions, we should at least store
hashNode->parallel_state in a local variable, so the compiler doesn't
have to pull it out of memory after every external function call (of
which there are a lot). In common cases it'll end up in a callee-saved
register, and most of the called functions won't be too register
starved (on x86-64).
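
I.e. something like this near the top (sketch only; the struct name is
whatever the patch uses for the shared hash join state):

    /* type name assumed from the patch's shared state struct */
    ParallelHashJoinState *parallel_state = hashNode->parallel_state;

    ...

    if (parallel_state)
    {
        ...
    }
    else
        node->hj_JoinState = HJ_NEED_NEW_OUTER;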



+/*
+ * Choose a batch to work on, and attach to it.  Returns true if successful,
+ * false if there are no more batches.
+ */
+static bool
+ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
+{


+                    /*
+                     * This batch is ready to probe.  Return control to
+                     * caller. We stay attached to batch_barrier so that the
+                     * hash table stays alive until everyone's finish probing

*finished?


+                case PHJ_BATCH_DONE:
+
+                    /*
+                     * Already done.  Detach and go around again (if any
+                     * remain).
+                     */
+                    BarrierDetach(batch_barrier);
+
+                    /*
+                     * We didn't work on this batch, but we need to observe
+                     * its size for EXPLAIN.
+                     */
+                    ExecParallelHashUpdateSpacePeak(hashtable, batchno);
+                    hashtable->batches[batchno].done = true;
+                    hashtable->curbatch = -1;
+                    break;

Hm, maybe I'm missing something, but why is it guaranteed that "we
didn't work on this batch"?



+void
+ExecShutdownHashJoin(HashJoinState *node)
+{
+    /*
+     * By the time ExecEndHashJoin runs in a worker, shared memory has been
+     * destroyed.  So this is our last chance to do any shared memory cleanup.
+     */

This comment doesn't really make much sense to me.


+void
+ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt)
+{

could use a header comment.
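
E.g., in the style used for the file's other entry points (the
description is just my guess at the function's purpose):

/* ----------------------------------------------------------------
 *        ExecHashJoinReInitializeDSM
 *
 *        Reset shared state before beginning a rescan.
 * ----------------------------------------------------------------
 */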



a) The executor side is starting to look good.
b) This is a lot of code.
c) I'm tired, planner has to wait till tomorrow.

- Andres


