Re: Parallel Full Hash Join - Mailing list pgsql-hackers

From: Melanie Plageman
Subject: Re: Parallel Full Hash Join
Msg-id: 20230325205159.lzqhplrbrdqeug6o@liskov
In response to: Re: Parallel Full Hash Join (Thomas Munro <thomas.munro@gmail.com>)
List: pgsql-hackers
On Sat, Mar 25, 2023 at 09:21:34AM +1300, Thomas Munro wrote:
>  * reuse the same umatched_scan_{chunk,idx} variables as above
>  * rename the list of chunks to scan to work_queue
>  * fix race/memory leak if we see PHJ_BATCH_SCAN when we attach (it
> wasn't OK to just fall through)

ah, good catch.

> I don't love the way that both ExecHashTableDetachBatch() and
> ExecParallelPrepHashTableForUnmatched() duplicate logic relating to
> the _SCAN/_FREE protocol, but I'm struggling to find a better idea.
> Perhaps I just need more coffee.

I'm not sure if I have strong feelings either way.
To confirm I understand, though: in ExecHashTableDetachBatch(), the call
to BarrierArriveAndDetachExceptLast() serves only to advance the barrier
phase through _SCAN, right? It doesn't really matter if this worker is
the last worker since BarrierArriveAndDetach() handles that for us.
There isn't another barrier function to do this (and I mostly think it
is fine), but I did have to think on it for a bit.
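
For my own notes, the sequence I'm picturing is roughly this (simplified;
the real code is in the hunk quoted further down):

    PHJ_BATCH_PROBE: BarrierArriveAndDetachExceptLast()
        -> all but the last worker detach and return false; the last
           worker stays attached and the barrier advances to PHJ_BATCH_SCAN
    PHJ_BATCH_SCAN: BarrierArriveAndDetach()
        -> the last worker to detach advances the barrier to PHJ_BATCH_FREE
           and is the one elected to free the batch's resources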

Oh, and, unrelated: it might be worth updating the BarrierAttach() function
comment to mention BarrierArriveAndDetachExceptLast().
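
Something along these lines, maybe (paraphrasing the existing comment from
memory rather than quoting it):

    /*
     * Attach to a barrier.  All waiting participants will now wait for this
     * participant to call BarrierArriveAndWait(), BarrierDetach(),
     * BarrierArriveAndDetach() or BarrierArriveAndDetachExceptLast().
     * Returns the current phase.
     */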

> I think your idea of opportunistically joining the scan if it's
> already running makes sense to explore for a later step, ie to make
> multi-batch PHFJ fully fair, and I think that should be a fairly easy
> code change, and I put in some comments where changes would be needed.

makes sense.

I have some very minor pieces of feedback, mainly about extraneous
commas that made me uncomfortable ;)

> From 8b526377eb4a4685628624e75743aedf37dd5bfe Mon Sep 17 00:00:00 2001
> From: Thomas Munro <thomas.munro@gmail.com>
> Date: Fri, 24 Mar 2023 14:19:07 +1300
> Subject: [PATCH v12 1/2] Scan for unmatched hash join tuples in memory order.
> 
> In a full/right outer join, we need to scan every tuple in the hash
> table to find the ones that were not matched while probing, so that we

Given how you are using the word "so" here, I think that comma before it
is not needed.

> @@ -2083,58 +2079,45 @@ bool
>  ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
>  {
>      HashJoinTable hashtable = hjstate->hj_HashTable;
> -    HashJoinTuple hashTuple = hjstate->hj_CurTuple;
> +    HashMemoryChunk chunk;
>  
> -    for (;;)
> +    while ((chunk = hashtable->unmatched_scan_chunk))
>      {
> -        /*
> -         * hj_CurTuple is the address of the tuple last returned from the
> -         * current bucket, or NULL if it's time to start scanning a new
> -         * bucket.
> -         */
> -        if (hashTuple != NULL)
> -            hashTuple = hashTuple->next.unshared;
> -        else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
> -        {
> -            hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
> -            hjstate->hj_CurBucketNo++;
> -        }
> -        else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
> +        while (hashtable->unmatched_scan_idx < chunk->used)
>          {
> -            int            j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
> +            HashJoinTuple hashTuple = (HashJoinTuple)
> +            (HASH_CHUNK_DATA(hashtable->unmatched_scan_chunk) +
> +             hashtable->unmatched_scan_idx);
>  
> -            hashTuple = hashtable->skewBucket[j]->tuples;
> -            hjstate->hj_CurSkewBucketNo++;
> -        }
> -        else
> -            break;                /* finished all buckets */
> +            MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
> +            int            hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
>  
> -        while (hashTuple != NULL)
> -        {
> -            if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
> -            {
> -                TupleTableSlot *inntuple;
> +            /* next tuple in this chunk */
> +            hashtable->unmatched_scan_idx += MAXALIGN(hashTupleSize);
>  
> -                /* insert hashtable's tuple into exec slot */
> -                inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
> -                                                 hjstate->hj_HashTupleSlot,
> -                                                 false);    /* do not pfree */
> -                econtext->ecxt_innertuple = inntuple;
> +            if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
> +                continue;
>  
> -                /*
> -                 * Reset temp memory each time; although this function doesn't
> -                 * do any qual eval, the caller will, so let's keep it
> -                 * parallel to ExecScanHashBucket.
> -                 */
> -                ResetExprContext(econtext);

I don't think I had done this before. Good call.

> +            /* insert hashtable's tuple into exec slot */
> +            econtext->ecxt_innertuple =
> +                ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
> +                                      hjstate->hj_HashTupleSlot,
> +                                      false);

> From 6f4e82f0569e5b388440ca0ef268dd307388e8f8 Mon Sep 17 00:00:00 2001
> From: Thomas Munro <thomas.munro@gmail.com>
> Date: Fri, 24 Mar 2023 15:23:14 +1300
> Subject: [PATCH v12 2/2] Parallel Hash Full Join.
> 
> Full and right outer joins were not supported in the initial
> implementation of Parallel Hash Join, because of deadlock hazards (see

no comma needed before the "because" here

> discussion).  Therefore FULL JOIN inhibited page-based parallelism,
> as the other join strategies can't do it either.

I actually don't quite understand what this means. It's been a while for
me, so perhaps I'm being dense, but what is page-based parallelism?
Also, I would put a comma after "Therefore" :)

> Add a new PHJ phase PHJ_BATCH_SCAN that scans for unmatched tuples on
> the inner side of one batch's hash table.  For now, sidestep the
> deadlock problem by terminating parallelism there.  The last process to
> arrive at that phase emits the unmatched tuples, while others detach and
> are free to go and work on other batches, if there are any, but
> otherwise they finish the join early.
> 
> That unfairness is considered acceptable for now, because it's better
> than no parallelism at all.  The build and probe phases are run in
> parallel, and the new scan-for-unmatched phase, while serial, is usually
> applied to the smaller of the two relations and is either limited by
> some multiple of work_mem, or it's too big and is partitioned into
> batches and then the situation is improved by batch-level parallelism.
> In future work on deadlock avoidance strategies, we may find a way to
> parallelize the new phase safely.

Is it worth mentioning something about parallel-oblivious parallel hash
join still not being able to do this? Or is that obvious?

>   *
> @@ -2908,6 +3042,12 @@ ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size,
>      chunk->next.shared = hashtable->batches[curbatch].shared->chunks;
>      hashtable->batches[curbatch].shared->chunks = chunk_shared;
>  
> +    /*
> +     * Also make this the head of the work_queue list.  This is used as a
> +     * cursor for scanning all chunks in the batch.
> +     */
> +    hashtable->batches[curbatch].shared->work_queue = chunk_shared;
> +
>      if (size <= HASH_CHUNK_THRESHOLD)
>      {
>          /*
> @@ -3116,18 +3256,31 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
>      {
>          int            curbatch = hashtable->curbatch;
>          ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
> +        bool        attached = true;
>  
>          /* Make sure any temporary files are closed. */
>          sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
>          sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
>  
> -        /* Detach from the batch we were last working on. */
> -        if (BarrierArriveAndDetach(&batch->batch_barrier))
> +        /* After attaching we always get at least to PHJ_BATCH_PROBE. */
> +        Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE ||
> +               BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
> +
> +        /*
> +         * Even if we aren't doing a full/right outer join, we'll step through
> +         * the PHJ_BATCH_SCAN phase just to maintain the invariant that freeing
> +         * happens in PHJ_BATCH_FREE, but that'll be wait-free.
> +         */
> +        if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE)

full/right joins should never fall into this code path, right?

If so, would we be able to assert about that? Maybe it doesn't make
sense, though...
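
For concreteness, what I had in mind is something like the below. Since
HJ_FILL_INNER() is local to nodeHashjoin.c and ExecHashTableDetachBatch()
only gets the HashJoinTable, this is just to illustrate the shape of the
check, and it may not be worth plumbing the state through:

    if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE)
    {
        /* hypothetical: a full/right join should already have moved this
         * batch on to PHJ_BATCH_SCAN before detaching */
        Assert(!HJ_FILL_INNER(hjstate));
        attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
    }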

> +            attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
> +        if (attached && BarrierArriveAndDetach(&batch->batch_barrier))
>          {
>              /*
> -             * Technically we shouldn't access the barrier because we're no
> -             * longer attached, but since there is no way it's moving after
> -             * this point it seems safe to make the following assertion.
> +             * We are no longer attached to the batch barrier, but we're the
> +             * process that was chosen to free resources and it's safe to
> +             * assert the current phase.  The ParallelHashJoinBatch can't go
> +             * away underneath us while we are attached to the build barrier,
> +             * making this access safe.
>               */
>              Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);

Otherwise, LGTM.

- Melanie


