Re: WIP: [[Parallel] Shared] Hash - Mailing list pgsql-hackers

| From | Andres Freund |
|---|---|
| Subject | Re: WIP: [[Parallel] Shared] Hash |
| Msg-id | 20170328203141.3ixg2nmdtxtsk4cq@alap3.anarazel.de |
| In response to | Re: [HACKERS] WIP: [[Parallel] Shared] Hash (Thomas Munro <thomas.munro@enterprisedb.com>) |
| List | pgsql-hackers |
Hi,
On 2017-03-27 22:33:03 -0700, Andres Freund wrote:
> On 2017-03-23 20:35:09 +1300, Thomas Munro wrote:
> > Here is a new patch series responding to feedback from Peter and Andres:
>
> Here's a review of 0007 & 0010 together - they're going to have to be
> applied together anyway...
> ...
> ok, ENOTIME for today...
Continuing where I dropped off tiredly yesterday.
- ExecHashJoinSaveTuple(tuple,
- hashvalue,
- &hashtable->innerBatchFile[batchno]);
+ if (HashJoinTableIsShared(hashtable))
+ sts_puttuple(hashtable->shared_inner_batches, batchno, &hashvalue,
+ tuple);
+ else
+ ExecHashJoinSaveTuple(tuple,
+ hashvalue,
+ &hashtable->innerBatchFile[batchno]); }}
Why isn't this done inside of ExecHashJoinSaveTuple?
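To illustrate the shape being suggested: the shared/private branch could live inside the save function itself, so call sites stay unchanged. A minimal sketch with stand-in types and stub write paths (`Table`, `put_shared`, `put_private` are hypothetical, not the patch's real API):

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for HashJoinTable and its two write paths;
 * only the dispatch shape matters here. */
typedef struct
{
	bool		shared;			/* is this a shared hash table? */
	int			shared_writes;	/* counts sts_puttuple-style writes */
	int			private_writes; /* counts BufFile-style writes */
} Table;

static void
put_shared(Table *t)
{
	t->shared_writes++;
}

static void
put_private(Table *t)
{
	t->private_writes++;
}

/* One entry point hides the shared/private decision, so callers
 * never need the if/else. */
static void
save_tuple(Table *t)
{
	if (t->shared)
		put_shared(t);
	else
		put_private(t);
}
```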
@@ -1280,6 +1785,68 @@ ExecHashTableReset(HashJoinTable hashtable)
+ /* Rewind the shared read heads for this batch, inner and outer. */
+ sts_prepare_parallel_read(hashtable->shared_inner_batches,
+ curbatch);
+ sts_prepare_parallel_read(hashtable->shared_outer_batches,
+ curbatch);
It feels somewhat wrong to do this here rather than at the call sites.
+ }
+
+ /*
+ * Each participant needs to make sure that data it has written for
+ * this partition is now read-only and visible to other participants.
+ */
+ sts_end_write(hashtable->shared_inner_batches, curbatch);
+ sts_end_write(hashtable->shared_outer_batches, curbatch);
+
+ /*
+ * Wait again, so that all workers see the new hash table and can
+ * safely read from batch files from any participant because they have
+ * all ended writing.
+ */
+ Assert(BarrierPhase(&hashtable->shared->barrier) ==
+ PHJ_PHASE_RESETTING_BATCH(curbatch));
+ BarrierWait(&hashtable->shared->barrier, WAIT_EVENT_HASH_RESETTING);
+ Assert(BarrierPhase(&hashtable->shared->barrier) ==
+ PHJ_PHASE_LOADING_BATCH(curbatch));
+ ExecHashUpdate(hashtable);
+
+ /* Forget the current chunks. */
+ hashtable->current_chunk = NULL;
+ return;
+	}

 /*
  * Release all the hash buckets and tuples acquired in the prior pass, and
@@ -1289,10 +1856,10 @@ ExecHashTableReset(HashJoinTable hashtable)
 	oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);

 	/* Reallocate and reinitialize the hash bucket headers. */
- hashtable->buckets = (HashJoinTuple *)
- palloc0(nbuckets * sizeof(HashJoinTuple));
+ hashtable->buckets = (HashJoinBucketHead *)
+ palloc0(nbuckets * sizeof(HashJoinBucketHead));
- hashtable->spaceUsed = nbuckets * sizeof(HashJoinTuple);
+	hashtable->spaceUsed = nbuckets * sizeof(HashJoinBucketHead);

 	/* Cannot be more than our previous peak; we had this size before. */
 	Assert(hashtable->spaceUsed <= hashtable->spacePeak);
@@ -1301,6 +1868,22 @@ ExecHashTableReset(HashJoinTable hashtable)
 	/* Forget the chunks (the memory was freed by the context reset above). */
 	hashtable->chunks = NULL;
+
+ /* Rewind the shared read heads for this batch, inner and outer. */
+ if (hashtable->innerBatchFile[curbatch] != NULL)
+ {
+ if (BufFileSeek(hashtable->innerBatchFile[curbatch], 0, 0L, SEEK_SET))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not rewind hash-join temporary file: %m")));
+ }
+ if (hashtable->outerBatchFile[curbatch] != NULL)
+ {
+ if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not rewind hash-join temporary file: %m")));
+	}
 }

 /*
@@ -1310,12 +1893,21 @@ ExecHashTableReset(HashJoinTable hashtable)
 void
 ExecHashTableResetMatchFlags(HashJoinTable hashtable)
 {
+	dsa_pointer	chunk_shared = InvalidDsaPointer;
 	HashMemoryChunk chunk;
 	HashJoinTuple tuple;
 	int			i;

 	/* Reset all flags in the main table ... */
- chunk = hashtable->chunks;
+ if (HashJoinTableIsShared(hashtable))
+ {
+ /* This only runs in the leader during rescan initialization. */
+ Assert(!IsParallelWorker());
+ hashtable->shared->chunk_work_queue = hashtable->shared->chunks;
+ chunk = pop_chunk_queue(hashtable, &chunk_shared);
+ }
+ else
+ chunk = hashtable->chunks;
Hm - doesn't pop_chunk_queue empty the work queue?
+/*
+ * Load a tuple into shared dense storage, like 'load_private_tuple'. This
+ * version is for shared hash tables.
+ */
+static HashJoinTuple
+load_shared_tuple(HashJoinTable hashtable, MinimalTuple tuple,
+ dsa_pointer *shared, bool respect_work_mem)
+{
Hm. Are there issues with "blessed" records being stored in shared
memory? I seem to recall you talking about it, but I see nothing
addressing the issue here? (later) Ah, I see - you just prohibit
parallelism in that case - might be worth pointing to that.
+ /* Check if some other participant has increased nbatch. */
+ if (hashtable->shared->nbatch > hashtable->nbatch)
+ {
+ Assert(respect_work_mem);
+ ExecHashIncreaseNumBatches(hashtable, hashtable->shared->nbatch);
+ }
+
+ /* Check if we need to help shrinking. */
+ if (hashtable->shared->shrink_needed && respect_work_mem)
+ {
+ hashtable->current_chunk = NULL;
+ LWLockRelease(&hashtable->shared->chunk_lock);
+ return NULL;
+ }
+
+ /* Oversized tuples get their own chunk. */
+ if (size > HASH_CHUNK_THRESHOLD)
+ chunk_size = size + HASH_CHUNK_HEADER_SIZE;
+ else
+ chunk_size = HASH_CHUNK_SIZE;
+
+ /* If appropriate, check if work_mem would be exceeded by a new chunk. */
+ if (respect_work_mem &&
+ hashtable->shared->grow_enabled &&
+ hashtable->shared->nbatch <= MAX_BATCHES_BEFORE_INCREASES_STOP &&
+ (hashtable->shared->size +
+ chunk_size) > (work_mem * 1024L *
+ hashtable->shared->planned_participants))
+ {
+ /*
+ * It would be exceeded. Let's increase the number of batches, so we
+ * can try to shrink the hash table.
+ */
+ hashtable->shared->nbatch *= 2;
+ ExecHashIncreaseNumBatches(hashtable, hashtable->shared->nbatch);
+ hashtable->shared->chunk_work_queue = hashtable->shared->chunks;
+ hashtable->shared->chunks = InvalidDsaPointer;
+ hashtable->shared->shrink_needed = true;
+ hashtable->current_chunk = NULL;
+ LWLockRelease(&hashtable->shared->chunk_lock);
+
+ /* The caller needs to shrink the hash table. */
+ return NULL;
+ }
Hm - we could end up calling ExecHashIncreaseNumBatches twice here?
Probably harmless.
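It is harmless as long as the function treats its argument as a target batch count rather than an increment. A toy model of that contract (all names here are illustrative, not the patch's code):

```c
#include <assert.h>

/* Toy model: increasing to a target batch count is a no-op once the
 * table has already reached it, so a repeated call does no extra work. */
typedef struct
{
	int			nbatch;
	int			resizes;		/* how many real resizes happened */
} HashModel;

static void
increase_num_batches(HashModel *h, int target)
{
	if (h->nbatch >= target)
		return;					/* already caught up: harmless no-op */
	h->nbatch = target;
	h->resizes++;
}
```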
 /* ----------------------------------------------------------------
  *		ExecHashJoin
@@ -129,6 +200,14 @@ ExecHashJoin(HashJoinState *node)
 				/* no chance to not build the hash table */
 				node->hj_FirstOuterTupleSlot = NULL;
 			}
+ else if (hashNode->shared_table_data != NULL)
+ {
+ /*
+ * The empty-outer optimization is not implemented for
+ * shared hash tables yet.
+ */
+ node->hj_FirstOuterTupleSlot = NULL;
Hm, why is this checking for the shared-ness of the join in a different
manner?
+ if (HashJoinTableIsShared(hashtable))
+ {
+ /*
+ * An important optimization: if this is a
+ * single-batch join and not an outer join, there is
+ * no reason to synchronize again when we've finished
+ * probing.
+ */
+ Assert(BarrierPhase(&hashtable->shared->barrier) ==
+ PHJ_PHASE_PROBING_BATCH(hashtable->curbatch));
+ if (hashtable->nbatch == 1 && !HJ_FILL_INNER(node))
+ return NULL; /* end of join */
+
+ /*
+ * Check if we are a leader that can't go further than
+ * probing the first batch, to avoid risk of deadlock
+ * against workers.
+ */
+ if (!LeaderGateCanContinue(&hashtable->shared->leader_gate))
+ {
+ /*
+ * Other backends will need to handle all future
+ * batches written by me. We don't detach until
+ * after we've finished writing to all batches so
+ * that they are flushed, otherwise another
+ * participant might try to read them too soon.
+ */
+ sts_end_write_all_partitions(hashNode->shared_inner_batches);
+ sts_end_write_all_partitions(hashNode->shared_outer_batches);
+ BarrierDetach(&hashtable->shared->barrier);
+ hashtable->detached_early = true;
+ return NULL;
+ }
+
+ /*
+ * We can't start searching for unmatched tuples until
+ * all participants have finished probing, so we
+ * synchronize here.
+ */
+ Assert(BarrierPhase(&hashtable->shared->barrier) ==
+ PHJ_PHASE_PROBING_BATCH(hashtable->curbatch));
+ if (BarrierWait(&hashtable->shared->barrier,
+ WAIT_EVENT_HASHJOIN_PROBING))
+ {
+ /* Serial phase: prepare for unmatched. */
+ if (HJ_FILL_INNER(node))
+ {
+ hashtable->shared->chunk_work_queue =
+ hashtable->shared->chunks;
+ hashtable->shared->chunks = InvalidDsaPointer;
+ }
+ }
Couldn't we skip that if this isn't an outer join? Not sure if the
complication would be worth it...
+void
+ExecShutdownHashJoin(HashJoinState *node)
+{
+ /*
+ * By the time ExecEndHashJoin runs in a work, shared memory has been
s/work/worker/
+ * destroyed. So this is our last chance to do any shared memory cleanup.
+ */
+ if (node->hj_HashTable)
+ ExecHashTableDetach(node->hj_HashTable);
+}
+	 * There is no extra charge
+ * for probing the hash table for outer path row, on the basis that
+ * read-only access to a shared hash table shouldn't be any more
+ * expensive.
+ */
Hm, that's debatable. !shared will mostly be on the local numa node,
shared probably not.
 * Get hash table size that executor would use for inner relation.
 *
+ * Shared hash tables are allowed to use the work_mem of all participants
+ * combined to make up for the fact that there is only one copy shared by
+ * all.
Hm. I don't quite understand that reasoning.
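For reference, the check quoted earlier seems to compute the following rule, here in isolation (function and parameter names are mine, not the patch's): one shared copy is allowed to spend `work_mem` times the number of planned participants, where n private copies would each have spent `work_mem`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Would adding chunk_size bytes to a shared hash table exceed the
 * combined budget of work_mem (in kB) times the number of planned
 * participants?  Sketch of the budget rule, in isolation. */
static bool
would_exceed_budget(size_t current_size, size_t chunk_size,
					size_t work_mem_kb, int participants)
{
	size_t		budget = work_mem_kb * 1024 * (size_t) participants;

	return current_size + chunk_size > budget;
}
```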
 * XXX for the moment, always assume that skew optimization will be
 * performed.  As long as SKEW_WORK_MEM_PERCENT is small, it's not worth
 * trying to determine that for sure.
If we don't do skew for parallelism, should we skip that bit?
- Andres