Re: WIP: [[Parallel] Shared] Hash - Mailing list pgsql-hackers

From Andres Freund
Subject Re: WIP: [[Parallel] Shared] Hash
Date
Msg-id 20170328203141.3ixg2nmdtxtsk4cq@alap3.anarazel.de
In response to Re: [HACKERS] WIP: [[Parallel] Shared] Hash  (Thomas Munro <thomas.munro@enterprisedb.com>)
List pgsql-hackers
Hi,

On 2017-03-27 22:33:03 -0700, Andres Freund wrote:
> On 2017-03-23 20:35:09 +1300, Thomas Munro wrote:
> > Here is a new patch series responding to feedback from Peter and Andres:
> 
> Here's a review of 0007 & 0010 together - they're going to have to be
> applied together anyway...
> ...
> ok, ENOTIME for today...

Continuing where I dropped off tiredly yesterday.


-        ExecHashJoinSaveTuple(tuple,
-                              hashvalue,
-                              &hashtable->innerBatchFile[batchno]);
+        if (HashJoinTableIsShared(hashtable))
+            sts_puttuple(hashtable->shared_inner_batches, batchno, &hashvalue,
+                         tuple);
+        else
+            ExecHashJoinSaveTuple(tuple,
+                                  hashvalue,
+                                  &hashtable->innerBatchFile[batchno]);
     }
 }

Why isn't this done inside of ExecHashJoinSaveTuple?
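Something like this is roughly what I mean (just a sketch; the batchno /
inner arguments are invented by me, and the private branch is simply the
existing save logic inlined):

void
ExecHashJoinSaveTuple(HashJoinTable hashtable, MinimalTuple tuple,
                      uint32 hashvalue, int batchno, bool inner)
{
    if (HashJoinTableIsShared(hashtable))
    {
        /* Shared case: append to the shared tuplestore for this batch. */
        sts_puttuple(inner ? hashtable->shared_inner_batches
                     : hashtable->shared_outer_batches,
                     batchno, &hashvalue, tuple);
    }
    else
    {
        /* Private case: the existing BufFile path, unchanged. */
        BufFile   **fileptr = inner ? &hashtable->innerBatchFile[batchno]
                                    : &hashtable->outerBatchFile[batchno];

        if (*fileptr == NULL)
            *fileptr = BufFileCreateTemp(false);
        if (BufFileWrite(*fileptr, (void *) &hashvalue,
                         sizeof(uint32)) != sizeof(uint32) ||
            BufFileWrite(*fileptr, (void *) tuple,
                         tuple->t_len) != tuple->t_len)
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not write to hash-join temporary file: %m")));
    }
}

That way the callers in nodeHash.c / nodeHashjoin.c wouldn't need the branch
at every call site.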




@@ -1280,6 +1785,68 @@ ExecHashTableReset(HashJoinTable hashtable)

+            /* Rewind the shared read heads for this batch, inner and outer. */
+            sts_prepare_parallel_read(hashtable->shared_inner_batches,
+                                      curbatch);
+            sts_prepare_parallel_read(hashtable->shared_outer_batches,
+                                      curbatch);

It feels somewhat wrong to do this in here, rather than on the callsites.
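I.e. something like this at the callsite (sketch only; I'm assuming the
relevant caller is the batch-advancing code in nodeHashjoin.c):

    if (HashJoinTableIsShared(hashtable))
    {
        /* Rewind the shared read heads for this batch, inner and outer. */
        sts_prepare_parallel_read(hashtable->shared_inner_batches, curbatch);
        sts_prepare_parallel_read(hashtable->shared_outer_batches, curbatch);
    }
    ExecHashTableReset(hashtable);

That'd keep ExecHashTableReset() about the table itself.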

+        }
+
+        /*
+         * Each participant needs to make sure that data it has written for
+         * this partition is now read-only and visible to other participants.
+         */
+        sts_end_write(hashtable->shared_inner_batches, curbatch);
+        sts_end_write(hashtable->shared_outer_batches, curbatch);
+
+        /*
+         * Wait again, so that all workers see the new hash table and can
+         * safely read from batch files from any participant because they have
+         * all ended writing.
+         */
+        Assert(BarrierPhase(&hashtable->shared->barrier) ==
+               PHJ_PHASE_RESETTING_BATCH(curbatch));
+        BarrierWait(&hashtable->shared->barrier, WAIT_EVENT_HASH_RESETTING);
+        Assert(BarrierPhase(&hashtable->shared->barrier) ==
+               PHJ_PHASE_LOADING_BATCH(curbatch));
+        ExecHashUpdate(hashtable);
+
+        /* Forget the current chunks. */
+        hashtable->current_chunk = NULL;
+        return;
+    }

     /*
      * Release all the hash buckets and tuples acquired in the prior pass, and
@@ -1289,10 +1856,10 @@ ExecHashTableReset(HashJoinTable hashtable)
     oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);

     /* Reallocate and reinitialize the hash bucket headers. */
-    hashtable->buckets = (HashJoinTuple *)
-        palloc0(nbuckets * sizeof(HashJoinTuple));
+    hashtable->buckets = (HashJoinBucketHead *)
+        palloc0(nbuckets * sizeof(HashJoinBucketHead));
-    hashtable->spaceUsed = nbuckets * sizeof(HashJoinTuple);
+    hashtable->spaceUsed = nbuckets * sizeof(HashJoinBucketHead);

     /* Cannot be more than our previous peak; we had this size before. */
     Assert(hashtable->spaceUsed <= hashtable->spacePeak);
 
@@ -1301,6 +1868,22 @@ ExecHashTableReset(HashJoinTable hashtable)
     /* Forget the chunks (the memory was freed by the context reset above). */
     hashtable->chunks = NULL;
 
+
+    /* Rewind the shared read heads for this batch, inner and outer. */
+    if (hashtable->innerBatchFile[curbatch] != NULL)
+    {
+        if (BufFileSeek(hashtable->innerBatchFile[curbatch], 0, 0L, SEEK_SET))
+            ereport(ERROR,
+                    (errcode_for_file_access(),
+                   errmsg("could not rewind hash-join temporary file: %m")));
+    }
+    if (hashtable->outerBatchFile[curbatch] != NULL)
+    {
+        if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
+            ereport(ERROR,
+                    (errcode_for_file_access(),
+                   errmsg("could not rewind hash-join temporary file: %m")));
+    }
 }

 /*
@@ -1310,12 +1893,21 @@ ExecHashTableReset(HashJoinTable hashtable)
 void
 ExecHashTableResetMatchFlags(HashJoinTable hashtable)
 {
+    dsa_pointer chunk_shared = InvalidDsaPointer;
     HashMemoryChunk chunk;
     HashJoinTuple tuple;
     int         i;

     /* Reset all flags in the main table ... */
 
-    chunk = hashtable->chunks;
+    if (HashJoinTableIsShared(hashtable))
+    {
+        /* This only runs in the leader during rescan initialization. */
+        Assert(!IsParallelWorker());
+        hashtable->shared->chunk_work_queue = hashtable->shared->chunks;
+        chunk = pop_chunk_queue(hashtable, &chunk_shared);
+    }
+    else
+        chunk = hashtable->chunks;

Hm - doesn't pop_chunk_queue empty the work queue?


+/*
+ * Load a tuple into shared dense storage, like 'load_private_tuple'.  This
+ * version is for shared hash tables.
+ */
+static HashJoinTuple
+load_shared_tuple(HashJoinTable hashtable, MinimalTuple tuple,
+                  dsa_pointer *shared, bool respect_work_mem)
+{

Hm. Are there issues with "blessed" records being stored in shared
memory?  I seem to recall you talking about it, but I see nothing
addressing the issue here?    (later) Ah, I see - you just prohibit
parallelism in that case - might be worth pointing that out.


+    /* Check if some other participant has increased nbatch. */
+    if (hashtable->shared->nbatch > hashtable->nbatch)
+    {
+        Assert(respect_work_mem);
+        ExecHashIncreaseNumBatches(hashtable, hashtable->shared->nbatch);
+    }
+
+    /* Check if we need to help shrinking. */
+    if (hashtable->shared->shrink_needed && respect_work_mem)
+    {
+        hashtable->current_chunk = NULL;
+        LWLockRelease(&hashtable->shared->chunk_lock);
+        return NULL;
+    }
+
+    /* Oversized tuples get their own chunk. */
+    if (size > HASH_CHUNK_THRESHOLD)
+        chunk_size = size + HASH_CHUNK_HEADER_SIZE;
+    else
+        chunk_size = HASH_CHUNK_SIZE;
+
+    /* If appropriate, check if work_mem would be exceeded by a new chunk. */
+    if (respect_work_mem &&
+        hashtable->shared->grow_enabled &&
+        hashtable->shared->nbatch <= MAX_BATCHES_BEFORE_INCREASES_STOP &&
+        (hashtable->shared->size +
+         chunk_size) > (work_mem * 1024L *
+                        hashtable->shared->planned_participants))
+    {
+        /*
+         * It would be exceeded.  Let's increase the number of batches, so we
+         * can try to shrink the hash table.
+         */
+        hashtable->shared->nbatch *= 2;
+        ExecHashIncreaseNumBatches(hashtable, hashtable->shared->nbatch);
+        hashtable->shared->chunk_work_queue = hashtable->shared->chunks;
+        hashtable->shared->chunks = InvalidDsaPointer;
+        hashtable->shared->shrink_needed = true;
+        hashtable->current_chunk = NULL;
+        LWLockRelease(&hashtable->shared->chunk_lock);
+
+        /* The caller needs to shrink the hash table. */
+        return NULL;
+    }

Hm - we could end up calling ExecHashIncreaseNumBatches twice here?
Probably harmless.
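If we wanted to avoid it, deciding the final nbatch first and catching up
with a single call would do, roughly (sketch only; "work_mem_exceeded"
stands in for the chunk-size check further down):

    int     target_nbatch = hashtable->shared->nbatch;

    if (work_mem_exceeded)
        target_nbatch *= 2;

    if (target_nbatch > hashtable->nbatch)
    {
        hashtable->shared->nbatch = target_nbatch;
        ExecHashIncreaseNumBatches(hashtable, target_nbatch);
    }

But it's probably not worth restructuring just for that.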




/* ----------------------------------------------------------------
 *        ExecHashJoin
@@ -129,6 +200,14 @@ ExecHashJoin(HashJoinState *node)
                     /* no chance to not build the hash table */
                     node->hj_FirstOuterTupleSlot = NULL;
                 }
 
+                else if (hashNode->shared_table_data != NULL)
+                {
+                    /*
+                     * The empty-outer optimization is not implemented for
+                     * shared hash tables yet.
+                     */
+                    node->hj_FirstOuterTupleSlot = NULL;

Hm, why is this checking for the shared-ness of the join in a different
manner?


+                    if (HashJoinTableIsShared(hashtable))
+                    {
+                        /*
+                         * An important optimization: if this is a
+                         * single-batch join and not an outer join, there is
+                         * no reason to synchronize again when we've finished
+                         * probing.
+                         */
+                        Assert(BarrierPhase(&hashtable->shared->barrier) ==
+                               PHJ_PHASE_PROBING_BATCH(hashtable->curbatch));
+                        if (hashtable->nbatch == 1 && !HJ_FILL_INNER(node))
+                            return NULL;    /* end of join */
+
+                        /*
+                         * Check if we are a leader that can't go further than
+                         * probing the first batch, to avoid risk of deadlock
+                         * against workers.
+                         */
+                        if (!LeaderGateCanContinue(&hashtable->shared->leader_gate))
+                        {
+                            /*
+                             * Other backends will need to handle all future
+                             * batches written by me.  We don't detach until
+                             * after we've finished writing to all batches so
+                             * that they are flushed, otherwise another
+                             * participant might try to read them too soon.
+                             */
+                            sts_end_write_all_partitions(hashNode->shared_inner_batches);
+                            sts_end_write_all_partitions(hashNode->shared_outer_batches);
+                            BarrierDetach(&hashtable->shared->barrier);
+                            hashtable->detached_early = true;
+                            return NULL;
+                        }
+
+                        /*
+                         * We can't start searching for unmatched tuples until
+                         * all participants have finished probing, so we
+                         * synchronize here.
+                         */
+                        Assert(BarrierPhase(&hashtable->shared->barrier) ==
+                               PHJ_PHASE_PROBING_BATCH(hashtable->curbatch));
+                        if (BarrierWait(&hashtable->shared->barrier,
+                                        WAIT_EVENT_HASHJOIN_PROBING))
+                        {
+                            /* Serial phase: prepare for unmatched. */
+                            if (HJ_FILL_INNER(node))
+                            {
+                                hashtable->shared->chunk_work_queue =
+                                    hashtable->shared->chunks;
+                                hashtable->shared->chunks = InvalidDsaPointer;
+                            }
+                        }

Couldn't we skip that if this isn't an outer join?  Not sure if the
complication would be worth it...
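What I mean is roughly this (sketch; the barrier phase numbering would have
to cope with everyone skipping this wait, which may be exactly the
complication that makes it not worthwhile):

    if (HJ_FILL_INNER(node))
    {
        /* Only joins that need the unmatched scan pay for this wait. */
        if (BarrierWait(&hashtable->shared->barrier,
                        WAIT_EVENT_HASHJOIN_PROBING))
        {
            /* Serial phase: prepare for unmatched. */
            hashtable->shared->chunk_work_queue =
                hashtable->shared->chunks;
            hashtable->shared->chunks = InvalidDsaPointer;
        }
    }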


+void
+ExecShutdownHashJoin(HashJoinState *node)
+{
+    /*
+     * By the time ExecEndHashJoin runs in a work, shared memory has been

s/work/worker/

+     * destroyed.  So this is our last chance to do any shared memory cleanup.
+     */
+    if (node->hj_HashTable)
+        ExecHashTableDetach(node->hj_HashTable);
+}

+           There is no extra charge
+     * for probing the hash table for outer path row, on the basis that
+     * read-only access to a shared hash table shouldn't be any more
+     * expensive.
+     */

Hm, that's debatable. !shared will mostly be on the local NUMA node,
shared probably not.
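If we wanted to reflect that, even something crude in the hash join costing
would do - the flag and the 0.5 factor here are entirely made up, just to
illustrate:

    /* Charge a bit extra per probe when the table lives in shared memory. */
    if (shared_hash_table)
        run_cost += 0.5 * cpu_operator_cost * outer_path_rows;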


     * Get hash table size that executor would use for inner relation.
     *
+     * Shared hash tables are allowed to use the work_mem of all participants
+     * combined to make up for the fact that there is only one copy shared by
+     * all.

Hm. I don't quite understand that reasoning.

     * XXX for the moment, always assume that skew optimization will be
     * performed.  As long as SKEW_WORK_MEM_PERCENT is small, it's not worth
     * trying to determine that for sure.
 

If we don't do skew for parallelism, should we skip that bit?



- Andres


