Re: Parallel Full Hash Join - Mailing list pgsql-hackers
From:           Melanie Plageman
Subject:        Re: Parallel Full Hash Join
Msg-id:         20230325205159.lzqhplrbrdqeug6o@liskov
In response to: Re: Parallel Full Hash Join (Thomas Munro <thomas.munro@gmail.com>)
Responses:      Re: Parallel Full Hash Join
List:           pgsql-hackers
On Sat, Mar 25, 2023 at 09:21:34AM +1300, Thomas Munro wrote:
> * reuse the same umatched_scan_{chunk,idx} variables as above
> * rename the list of chunks to scan to work_queue
> * fix race/memory leak if we see PHJ_BATCH_SCAN when we attach (it
>   wasn't OK to just fall through)

ah, good catch.

> I don't love the way that both ExecHashTableDetachBatch() and
> ExecParallelPrepHashTableForUnmatched() duplicate logic relating to
> the _SCAN/_FREE protocol, but I'm struggling to find a better idea.
> Perhaps I just need more coffee.

I'm not sure if I have strong feelings either way. To confirm I
understand, though: in ExecHashTableDetachBatch(), the call to
BarrierArriveAndDetachExceptLast() serves only to advance the barrier
phase through _SCAN, right? It doesn't really matter if this worker is
the last worker since BarrierArriveAndDetach() handles that for us.

There isn't another barrier function to do this (and I mostly think it
is fine), but I did have to think on it for a bit.

Oh, and, unrelated, but it is maybe worth updating the BarrierAttach()
function comment to mention BarrierArriveAndDetachExceptLast().
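As an aside, the way I convinced myself of that was with a tiny
standalone toy, included below for posterity. It is emphatically not the
patch (no PostgreSQL barrier API; the names and the C11 atomic counter
standing in for the barrier's attached count are made up). It only
models the behavior described in the commit message further down: all
but the last worker to arrive detach immediately and are free to go,
while the last one does the serial unmatched scan, so nobody ever waits.

/* toy_detach_except_last.c -- toy model only, NOT PostgreSQL code */
#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

#define NWORKERS 4

/* workers still "attached" to the imaginary batch barrier */
static atomic_int attached = NWORKERS;

static void *
worker(void *arg)
{
    int         id = (int) (intptr_t) arg;

    /* ... probing this batch would happen here ... */

    /*
     * "Arrive and detach except last": whoever brings the count to zero
     * is the last to arrive and performs the serial scan for unmatched
     * tuples; everyone else leaves without waiting and can go work on
     * another batch, so there is no deadlock hazard at this point.
     */
    if (atomic_fetch_sub(&attached, 1) == 1)
        printf("worker %d: last to arrive, scanning for unmatched tuples\n", id);
    else
        printf("worker %d: detached, moving on\n", id);
    return NULL;
}

int
main(void)
{
    pthread_t   workers[NWORKERS];

    for (int i = 0; i < NWORKERS; i++)
        pthread_create(&workers[i], NULL, worker, (void *) (intptr_t) i);
    for (int i = 0; i < NWORKERS; i++)
        pthread_join(workers[i], NULL);
    return 0;
}

(Build with cc -pthread; it prints one "last to arrive" line and three
"detached" lines in some order.)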
> I think your idea of opportunistically joining the scan if it's
> already running makes sense to explore for a later step, ie to make
> multi-batch PHFJ fully fair, and I think that should be a fairly easy
> code change, and I put in some comments where changes would be needed.

makes sense.

I have some very minor pieces of feedback, mainly about extraneous
commas that made me uncomfortable ;)

> From 8b526377eb4a4685628624e75743aedf37dd5bfe Mon Sep 17 00:00:00 2001
> From: Thomas Munro <thomas.munro@gmail.com>
> Date: Fri, 24 Mar 2023 14:19:07 +1300
> Subject: [PATCH v12 1/2] Scan for unmatched hash join tuples in memory order.
>
> In a full/right outer join, we need to scan every tuple in the hash
> table to find the ones that were not matched while probing, so that we

Given how you are using the word "so" here, I think that comma before it
is not needed.

> @@ -2083,58 +2079,45 @@ bool
>  ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
>  {
>  	HashJoinTable hashtable = hjstate->hj_HashTable;
> -	HashJoinTuple hashTuple = hjstate->hj_CurTuple;
> +	HashMemoryChunk chunk;
>
> -	for (;;)
> +	while ((chunk = hashtable->unmatched_scan_chunk))
>  	{
> -		/*
> -		 * hj_CurTuple is the address of the tuple last returned from the
> -		 * current bucket, or NULL if it's time to start scanning a new
> -		 * bucket.
> -		 */
> -		if (hashTuple != NULL)
> -			hashTuple = hashTuple->next.unshared;
> -		else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
> -		{
> -			hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
> -			hjstate->hj_CurBucketNo++;
> -		}
> -		else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
> +		while (hashtable->unmatched_scan_idx < chunk->used)
>  		{
> -			int			j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
> +			HashJoinTuple hashTuple = (HashJoinTuple)
> +				(HASH_CHUNK_DATA(hashtable->unmatched_scan_chunk) +
> +				 hashtable->unmatched_scan_idx);
>
> -			hashTuple = hashtable->skewBucket[j]->tuples;
> -			hjstate->hj_CurSkewBucketNo++;
> -		}
> -		else
> -			break;				/* finished all buckets */
> +			MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
> +			int			hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
>
> -		while (hashTuple != NULL)
> -		{
> -			if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
> -			{
> -				TupleTableSlot *inntuple;
> +			/* next tuple in this chunk */
> +			hashtable->unmatched_scan_idx += MAXALIGN(hashTupleSize);
>
> -				/* insert hashtable's tuple into exec slot */
> -				inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
> -												 hjstate->hj_HashTupleSlot,
> -												 false);	/* do not pfree */
> -				econtext->ecxt_innertuple = inntuple;
> +			if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
> +				continue;
>
> -				/*
> -				 * Reset temp memory each time; although this function doesn't
> -				 * do any qual eval, the caller will, so let's keep it
> -				 * parallel to ExecScanHashBucket.
> -				 */
> -				ResetExprContext(econtext);

I don't think I had done this before. Good call.

> +			/* insert hashtable's tuple into exec slot */
> +			econtext->ecxt_innertuple =
> +				ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
> +									  hjstate->hj_HashTupleSlot,
> +									  false);
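And, purely as an aside for anyone following along: the chunk-order scan
clicked for me once I reduced it to the little self-contained toy below.
Again, this is not the patch and not PostgreSQL's data structures
(made-up structs, one process, no skew buckets, no shared memory); it
just shows "walk each allocation chunk front to back and emit whatever
the probe phase never marked as matched" instead of chasing bucket
chains all over memory.

/* toy_memory_order_scan.c -- toy model only, NOT PostgreSQL code */
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define CHUNK_SIZE 4096

/* a fake "tuple", with the match flag a probe phase would have set */
typedef struct ToyTuple
{
    bool        matched;
    int         key;
} ToyTuple;

/* a bump-allocated chunk, linked like the hash table's chunk list */
typedef struct ToyChunk
{
    struct ToyChunk *next;
    size_t      used;           /* bytes used in data[] */
    char        data[CHUNK_SIZE];
} ToyChunk;

static ToyChunk *
new_chunk(ToyChunk *head)
{
    ToyChunk   *chunk = calloc(1, sizeof(ToyChunk));

    chunk->next = head;         /* newest chunk goes at the head */
    return chunk;
}

int
main(void)
{
    ToyChunk   *chunks = new_chunk(NULL);

    /* "build": bump-allocate tuples into chunks, marking some matched */
    for (int key = 0; key < 1000; key++)
    {
        ToyTuple    tup = {.matched = (key % 3 == 0), .key = key};

        if (chunks->used + sizeof(ToyTuple) > CHUNK_SIZE)
            chunks = new_chunk(chunks);
        memcpy(chunks->data + chunks->used, &tup, sizeof(ToyTuple));
        chunks->used += sizeof(ToyTuple);
    }

    /* "unmatched scan": walk chunks in memory order, no bucket chasing */
    int         unmatched = 0;

    for (ToyChunk *chunk = chunks; chunk != NULL; chunk = chunk->next)
    {
        for (size_t idx = 0; idx + sizeof(ToyTuple) <= chunk->used;
             idx += sizeof(ToyTuple))
        {
            ToyTuple   *tup = (ToyTuple *) (chunk->data + idx);

            if (!tup->matched)
                unmatched++;    /* a real join would emit this tuple here */
        }
    }

    printf("unmatched tuples: %d\n", unmatched);
    return 0;
}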
> From 6f4e82f0569e5b388440ca0ef268dd307388e8f8 Mon Sep 17 00:00:00 2001
> From: Thomas Munro <thomas.munro@gmail.com>
> Date: Fri, 24 Mar 2023 15:23:14 +1300
> Subject: [PATCH v12 2/2] Parallel Hash Full Join.
>
> Full and right outer joins were not supported in the initial
> implementation of Parallel Hash Join, because of deadlock hazards (see

no comma needed before the "because" here

> discussion). Therefore FULL JOIN inhibited page-based parallelism,
> as the other join strategies can't do it either.

I actually don't quite understand what this means? It's been a while for
me, so perhaps I'm being dense, but what is page-based parallelism?
Also, I would put a comma after "Therefore" :)

> Add a new PHJ phase PHJ_BATCH_SCAN that scans for unmatched tuples on
> the inner side of one batch's hash table. For now, sidestep the
> deadlock problem by terminating parallelism there. The last process to
> arrive at that phase emits the unmatched tuples, while others detach and
> are free to go and work on other batches, if there are any, but
> otherwise they finish the join early.
>
> That unfairness is considered acceptable for now, because it's better
> than no parallelism at all. The build and probe phases are run in
> parallel, and the new scan-for-unmatched phase, while serial, is usually
> applied to the smaller of the two relations and is either limited by
> some multiple of work_mem, or it's too big and is partitioned into
> batches and then the situation is improved by batch-level parallelism.
> In future work on deadlock avoidance strategies, we may find a way to
> parallelize the new phase safely.

Is it worth mentioning something about parallel-oblivious parallel hash
join not being able to do this still? Or is that obvious?

> @@ -2908,6 +3042,12 @@ ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size,
>  		chunk->next.shared = hashtable->batches[curbatch].shared->chunks;
>  		hashtable->batches[curbatch].shared->chunks = chunk_shared;
>
> +		/*
> +		 * Also make this the head of the work_queue list. This is used as a
> +		 * cursor for scanning all chunks in the batch.
> +		 */
> +		hashtable->batches[curbatch].shared->work_queue = chunk_shared;
> +
>  		if (size <= HASH_CHUNK_THRESHOLD)
>  		{
>  			/*

> @@ -3116,18 +3256,31 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
>  	{
>  		int			curbatch = hashtable->curbatch;
>  		ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
> +		bool		attached = true;
>
>  		/* Make sure any temporary files are closed. */
>  		sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
>  		sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
>
> -		/* Detach from the batch we were last working on. */
> -		if (BarrierArriveAndDetach(&batch->batch_barrier))
> +		/* After attaching we always get at least to PHJ_BATCH_PROBE. */
> +		Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE ||
> +			   BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
> +
> +		/*
> +		 * Even if we aren't doing a full/right outer join, we'll step through
> +		 * the PHJ_BATCH_SCAN phase just to maintain the invariant that freeing
> +		 * happens in PHJ_BATCH_FREE, but that'll be wait-free.
> +		 */
> +		if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE)

full/right joins should never fall into this code path, right? If so,
would we be able to assert about that? Maybe it doesn't make sense,
though...

> +			attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
> +		if (attached && BarrierArriveAndDetach(&batch->batch_barrier))
>  		{
>  			/*
> -			 * Technically we shouldn't access the barrier because we're no
> -			 * longer attached, but since there is no way it's moving after
> -			 * this point it seems safe to make the following assertion.
> +			 * We are not longer attached to the batch barrier, but we're the
> +			 * process that was chosen to free resources and it's safe to
> +			 * assert the current phase. The ParallelHashJoinBatch can't go
> +			 * away underneath us while we are attached to the build barrier,
> +			 * making this access safe.
>  			 */
>  			Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);

Otherwise, LGTM.

- Melanie