From 6d34cbee84b06aa27d6c73426f29ef0d50dadb3a Mon Sep 17 00:00:00 2001
From: Melanie Plageman
Date: Tue, 29 Sep 2020 13:47:57 -0700
Subject: [PATCH v2] Support Parallel FOJ and ROJ

To support parallel full and right outer hash joins (FOJ and ROJ):

- Re-enable setting the match bit for tuples in the hash table.

- Have a single worker prepare for the unmatched inner tuple scan in
  HJ_NEED_NEW_OUTER and transition to HJ_FILL_INNER_TUPLES, avoiding
  deadlock.  ExecParallelScanHashTableForUnmatched() is no longer
  executed by multiple workers: a single worker scans each
  HashMemoryChunk in the hash table, freeing it after finishing with it.

- To align parallel and serial hash join, change
  ExecScanHashTableForUnmatched() to also scan HashMemoryChunks for the
  unmatched tuple scan, instead of iterating over the buckets.
---
 src/backend/executor/nodeHash.c         | 195 ++++++++++++++++++------
 src/backend/executor/nodeHashjoin.c     |  61 ++++----
 src/backend/optimizer/path/joinpath.c   |  14 +-
 src/backend/postmaster/pgstat.c         |   3 +
 src/backend/storage/ipc/barrier.c       |  22 ++-
 src/include/executor/hashjoin.h         |  13 +-
 src/include/executor/nodeHash.h         |   3 +
 src/include/pgstat.h                    |   1 +
 src/include/storage/barrier.h           |   1 +
 src/test/regress/expected/join_hash.out |  56 ++++++-
 src/test/regress/sql/join_hash.sql      |  23 ++-
 11 files changed, 302 insertions(+), 90 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index ea69eeb2a1..42cb8514d3 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -510,6 +510,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 		hashtable->spaceAllowed * SKEW_HASH_MEM_PERCENT / 100;
 	hashtable->chunks = NULL;
 	hashtable->current_chunk = NULL;
+	hashtable->current_chunk_idx = 0;
 	hashtable->parallel_state = state->parallel_state;
 	hashtable->area = state->ps.state->es_query_dsa;
 	hashtable->batches = NULL;
@@ -2053,9 +2054,58 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 	 * hj_CurTuple: last tuple returned, or NULL to start next bucket
 	 *----------
 	 */
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+
+	hjstate->hj_CurBucketNo = 0;
+	hjstate->hj_CurSkewBucketNo = 0;
+	hjstate->hj_CurTuple = NULL;
+	hashtable->current_chunk = hashtable->chunks;
+	hashtable->current_chunk_idx = 0;
+}
+
+/*
+ * ExecParallelPrepHashTableForUnmatched
+ *		set up for a series of ExecParallelScanHashTableForUnmatched calls
+ *		return true if this worker is elected to do the unmatched inner scan
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
+{
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	int			curbatch = hashtable->curbatch;
+	ParallelHashJoinBatchAccessor *batch_accessor;
+	ParallelHashJoinBatch *batch;
+	bool		last = false;
 
 	hjstate->hj_CurBucketNo = 0;
 	hjstate->hj_CurSkewBucketNo = 0;
 	hjstate->hj_CurTuple = NULL;
+	if (curbatch < 0)
+		return false;
+	batch_accessor = &hashtable->batches[curbatch];
+	batch = batch_accessor->shared;
+	last = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
+	if (!last)
+	{
+		batch_accessor->done = true;
+		/* Make sure any temporary files are closed. */
+		sts_end_parallel_scan(batch_accessor->inner_tuples);
+		sts_end_parallel_scan(batch_accessor->outer_tuples);
+		/*
+		 * Track the largest batch we've been attached to.  Though each
+		 * backend might see a different subset of batches, explain.c will
+		 * scan the results from all backends to find the largest value.
+		 */
+		hashtable->spacePeak =
+			Max(hashtable->spacePeak, batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
+		hashtable->curbatch = -1;
+	}
+	else
+	{
+		batch_accessor->shared_chunk = batch->chunks;
+		batch_accessor->current_chunk = dsa_get_address(hashtable->area, batch_accessor->shared_chunk);
+		batch_accessor->current_chunk_idx = 0;
+	}
+	return last;
 }
 
 /*
@@ -2069,63 +2119,118 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 bool
 ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
 {
-	HashJoinTable hashtable = hjstate->hj_HashTable;
-	HashJoinTuple hashTuple = hjstate->hj_CurTuple;
+	HashMemoryChunk next;
+	HashJoinTable hashtable = hjstate->hj_HashTable;
 
-	for (;;)
+	while (hashtable->current_chunk)
 	{
-		/*
-		 * hj_CurTuple is the address of the tuple last returned from the
-		 * current bucket, or NULL if it's time to start scanning a new
-		 * bucket.
-		 */
-		if (hashTuple != NULL)
-			hashTuple = hashTuple->next.unshared;
-		else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
+		while (hashtable->current_chunk_idx < hashtable->current_chunk->used)
 		{
-			hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
-			hjstate->hj_CurBucketNo++;
-		}
-		else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
-		{
-			int			j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
+			HashJoinTuple hashTuple = (HashJoinTuple) (
+				HASH_CHUNK_DATA(hashtable->current_chunk) +
+				hashtable->current_chunk_idx
+			);
+			MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
+			int			hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
+
+			/* next tuple in this chunk */
+			hashtable->current_chunk_idx += MAXALIGN(hashTupleSize);
+
+			if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+				continue;
 
-			hashTuple = hashtable->skewBucket[j]->tuples;
-			hjstate->hj_CurSkewBucketNo++;
+			/* insert hashtable's tuple into exec slot */
+			econtext->ecxt_innertuple =
+				ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+									  hjstate->hj_HashTupleSlot,
+									  false);
+
+			/*
+			 * Reset temp memory each time; although this function doesn't
+			 * do any qual eval, the caller will, so let's keep it
+			 * parallel to ExecScanHashBucket.
+			 */
+			ResetExprContext(econtext);
+
+			hjstate->hj_CurTuple = hashTuple;
+			return true;
 		}
-		else
-			break;				/* finished all buckets */
 
-		while (hashTuple != NULL)
+		next = hashtable->current_chunk->next.unshared;
+		hashtable->current_chunk = next;
+		hashtable->current_chunk_idx = 0;
+
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	/*
+	 * no more unmatched tuples
+	 */
+	return false;
+}
+
+/*
+ * ExecParallelScanHashTableForUnmatched
+ *		scan the hash table for unmatched inner tuples, done by one worker
+ *
+ * On success, the inner tuple is stored into hjstate->hj_CurTuple and
+ * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
+ * for the latter.
+ */
+bool
+ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+									  ExprContext *econtext)
+{
+	dsa_pointer next;
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	int			curbatch = hashtable->curbatch;
+	ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[curbatch];
+
+	/*
+	 * Only one worker may execute this function.  Since tuples have already
+	 * been emitted, it is hazardous for workers to wait at the batch
+	 * barrier again.  Instead, all workers except the last detach, and the
+	 * last one alone conducts this unmatched inner tuple scan.
+	 */
+	Assert(BarrierParticipants(&accessor->shared->batch_barrier) == 1);
+	while (accessor->current_chunk)
+	{
+		while (accessor->current_chunk_idx < accessor->current_chunk->used)
 		{
-			if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
-			{
-				TupleTableSlot *inntuple;
+			HashJoinTuple hashTuple = (HashJoinTuple) (
+				HASH_CHUNK_DATA(accessor->current_chunk) + accessor->current_chunk_idx
+			);
+			accessor->current_chunk_idx += MAXALIGN(HJTUPLE_OVERHEAD + HJTUPLE_MINTUPLE(hashTuple)->t_len);
 
-				/* insert hashtable's tuple into exec slot */
-				inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
-												 hjstate->hj_HashTupleSlot,
-												 false);	/* do not pfree */
-				econtext->ecxt_innertuple = inntuple;
+			if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+				continue;
 
-				/*
-				 * Reset temp memory each time; although this function doesn't
-				 * do any qual eval, the caller will, so let's keep it
-				 * parallel to ExecScanHashBucket.
-				 */
-				ResetExprContext(econtext);
+			/* insert hashtable's tuple into exec slot */
+			econtext->ecxt_innertuple =
+				ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+									  hjstate->hj_HashTupleSlot, false);
 
-				hjstate->hj_CurTuple = hashTuple;
-				return true;
-			}
+			/*
+			 * Reset temp memory each time; although this function doesn't
+			 * do any qual eval, the caller will, so let's keep it parallel to
+			 * ExecScanHashBucket.
+			 */
+			ResetExprContext(econtext);
 
-			hashTuple = hashTuple->next.unshared;
+			hjstate->hj_CurTuple = hashTuple;
+			return true;
 		}
 
-		/* allow this loop to be cancellable */
+		next = accessor->current_chunk->next.shared;
+		dsa_free(hashtable->area, accessor->shared_chunk);
+		accessor->shared_chunk = next;
+		accessor->current_chunk = dsa_get_address(hashtable->area, accessor->shared_chunk);
+		accessor->current_chunk_idx = 0;
+
 		CHECK_FOR_INTERRUPTS();
 	}
+	accessor->shared->chunks = InvalidDsaPointer;
 
 	/*
 	 * no more unmatched tuples
 	 */
@@ -3131,13 +3236,6 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
 		/* Detach from the batch we were last working on. */
 		if (BarrierArriveAndDetach(&batch->batch_barrier))
 		{
-			/*
-			 * Technically we shouldn't access the barrier because we're no
-			 * longer attached, but since there is no way it's moving after
-			 * this point it seems safe to make the following assertion.
-			 */
-			Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE);
-
 			/* Free shared chunks and buckets. */
 			while (DsaPointerIsValid(batch->chunks))
 			{
@@ -3271,6 +3369,9 @@ ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
 	hashtable->current_chunk = NULL;
 	hashtable->current_chunk_shared = InvalidDsaPointer;
 	hashtable->batches[batchno].at_least_one_chunk = false;
+	hashtable->batches[batchno].shared_chunk = InvalidDsaPointer;
+	hashtable->batches[batchno].current_chunk = NULL;
+	hashtable->batches[batchno].current_chunk_idx = 0;
 }
 
 /*
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 5532b91a71..7a2b5275bb 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -82,7 +82,8 @@
  *   PHJ_BATCH_ALLOCATING     -- one allocates buckets
  *   PHJ_BATCH_LOADING        -- all load the hash table from disk
  *   PHJ_BATCH_PROBING        -- all probe
- *   PHJ_BATCH_DONE           -- end
+ *   PHJ_BATCH_DONE           -- queries not requiring inner fill done
+ *   PHJ_BATCH_FILL_INNER_DONE -- inner fill completed, all queries done
 *
 * Batch 0 is a special case, because it starts out in phase
 * PHJ_BATCH_PROBING; populating batch 0's hash table is done during
@@ -99,7 +100,9 @@
 * while attached to a barrier, unless the barrier has reached its final
 * state.  In the slightly special case of the per-batch barrier, we return
 * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
- * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
+ * BarrierArriveAndDetach() or BarrierArriveAndDetachExceptLast() to advance
+ * it to PHJ_BATCH_DONE or PHJ_BATCH_FILL_INNER_DONE, depending on whether
+ * the join requires a scan for unmatched inner tuples, without waiting.
 *
 *-------------------------------------------------------------------------
 */
@@ -360,9 +363,19 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				/* end of batch, or maybe whole join */
 				if (HJ_FILL_INNER(node))
 				{
-					/* set up to scan for unmatched inner tuples */
-					ExecPrepHashTableForUnmatched(node);
-					node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+					if (parallel)
+					{
+						if (ExecParallelPrepHashTableForUnmatched(node))
+							node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+						else
+							node->hj_JoinState = HJ_NEED_NEW_BATCH;
+					}
+					else
+					{
+						/* set up to scan for unmatched inner tuples */
+						ExecPrepHashTableForUnmatched(node);
+						node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+					}
 				}
 				else
 					node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -455,25 +468,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				{
 					node->hj_MatchedOuter = true;
 
-					if (parallel)
-					{
-						/*
-						 * Full/right outer joins are currently not supported
-						 * for parallel joins, so we don't need to set the
-						 * match bit.  Experiments show that it's worth
-						 * avoiding the shared memory traffic on large
-						 * systems.
-						 */
-						Assert(!HJ_FILL_INNER(node));
-					}
-					else
-					{
-						/*
-						 * This is really only needed if HJ_FILL_INNER(node),
-						 * but we'll avoid the branch and just set it always.
-						 */
+
+					/*
+					 * Needed only if HJ_FILL_INNER(node), but we set it always;
+					 * testing the bit first avoids shared-memory traffic.
+					 */
+					if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
 						HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
-					}
 
 					/* In an antijoin, we never return a matched tuple */
 					if (node->js.jointype == JOIN_ANTI)
@@ -531,7 +532,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				 * so any unmatched inner tuples in the hashtable have to be
 				 * emitted before we continue to the next batch.
 				 */
-				if (!ExecScanHashTableForUnmatched(node, econtext))
+				if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
+					  : ExecScanHashTableForUnmatched(node, econtext)))
 				{
 					/* no more unmatched tuples */
 					node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -1173,15 +1175,18 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
 					 * hash table stays alive until everyone's finished
 					 * probing it, but no participant is allowed to wait at
 					 * this barrier again (or else a deadlock could occur).
-					 * All attached participants must eventually call
-					 * BarrierArriveAndDetach() so that the final phase
-					 * PHJ_BATCH_DONE can be reached.
+					 * All attached participants must eventually detach from
+					 * the barrier, and one worker must advance the phase,
+					 * so that the final phase is reached.
 					 */
 					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
 					sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
 					return true;
 
 				case PHJ_BATCH_DONE:
+					/* Fall through. */
+
+				case PHJ_BATCH_FILL_INNER_DONE:
 
 					/*
 					 * Already done.  Detach and go around again (if any
diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c
index db54a6ba2e..cbc8c2ad83 100644
--- a/src/backend/optimizer/path/joinpath.c
+++ b/src/backend/optimizer/path/joinpath.c
@@ -1845,15 +1845,9 @@ hash_inner_and_outer(PlannerInfo *root,
 	 * able to properly guarantee uniqueness.  Similarly, we can't handle
 	 * JOIN_FULL and JOIN_RIGHT, because they can produce false null
 	 * extended rows.  Also, the resulting path must not be parameterized.
-	 * We would be able to support JOIN_FULL and JOIN_RIGHT for Parallel
-	 * Hash, since in that case we're back to a single hash table with a
-	 * single set of match bits for each batch, but that will require
-	 * figuring out a deadlock-free way to wait for the probe to finish.
 	 */
 	if (joinrel->consider_parallel &&
 		save_jointype != JOIN_UNIQUE_OUTER &&
 		outerrel->partial_pathlist != NIL &&
 		bms_is_empty(joinrel->lateral_relids))
 	{
@@ -1887,9 +1881,13 @@ hash_inner_and_outer(PlannerInfo *root,
 		 * total inner path will also be parallel-safe, but if not, we'll
 		 * have to search for the cheapest safe, unparameterized inner
 		 * path.  If doing JOIN_UNIQUE_INNER, we can't use any alternative
-		 * inner path.
+		 * inner path.  If the join is full or right, we can't use a
+		 * parallel-oblivious hash join (one hash table built per backend),
+		 * because no one backend would have all of the match bits.
 		 */
-		if (cheapest_total_inner->parallel_safe)
+		if (save_jointype == JOIN_FULL || save_jointype == JOIN_RIGHT)
+			cheapest_safe_inner = NULL;
+		else if (cheapest_total_inner->parallel_safe)
 			cheapest_safe_inner = cheapest_total_inner;
 		else if (save_jointype != JOIN_UNIQUE_INNER)
 			cheapest_safe_inner =
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index e6be2b7836..f6f242d806 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3782,6 +3782,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 		case WAIT_EVENT_HASH_BATCH_LOAD:
 			event_name = "HashBatchLoad";
 			break;
+		case WAIT_EVENT_HASH_BATCH_PROBE:
+			event_name = "HashBatchProbe";
+			break;
 		case WAIT_EVENT_HASH_BUILD_ALLOCATE:
 			event_name = "HashBuildAllocate";
 			break;
diff --git a/src/backend/storage/ipc/barrier.c b/src/backend/storage/ipc/barrier.c
index 3e200e02cc..2e7b0687ef 100644
--- a/src/backend/storage/ipc/barrier.c
+++ b/src/backend/storage/ipc/barrier.c
@@ -204,6 +204,28 @@ BarrierArriveAndDetach(Barrier *barrier)
 {
 	return BarrierDetachImpl(barrier, true);
 }
+
+/*
+ * Upon arriving at the barrier, if this worker is not the last one still
+ * attached, detach from the barrier and return false.  If this worker is
+ * the last one, remain attached and advance the phase of the barrier,
+ * returning true to indicate that this worker is the "elected" worker,
+ * the only one still attached to the barrier.
+ */
+bool
+BarrierArriveAndDetachExceptLast(Barrier *barrier)
+{
+	SpinLockAcquire(&barrier->mutex);
+	if (barrier->participants > 1)
+	{
+		--barrier->participants;
+		SpinLockRelease(&barrier->mutex);
+		return false;
+	}
+	Assert(barrier->participants == 1);
+	++barrier->phase;
+	SpinLockRelease(&barrier->mutex);
+	return true;
+}
 
 /*
  * Attach to a barrier.  All waiting participants will now wait for this
@@ -221,7 +243,6 @@ BarrierAttach(Barrier *barrier)
 	SpinLockAcquire(&barrier->mutex);
 	++barrier->participants;
 	phase = barrier->phase;
 	SpinLockRelease(&barrier->mutex);
-
 	return phase;
 }
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index eb5daba36b..2af7228ef0 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -205,6 +205,14 @@ typedef struct ParallelHashJoinBatchAccessor
 	bool		at_least_one_chunk; /* has this backend allocated a chunk? */
 
 	bool		done;			/* flag to remember that a batch is done */
+	/*
+	 * While doing the unmatched inner scan, the elected worker emits
+	 * tuples, so we must remember its position in the hash table: the
+	 * current chunk and the offset of the next tuple within it.
+	 */
+	dsa_pointer shared_chunk;
+	HashMemoryChunk current_chunk;
+	size_t		current_chunk_idx;
 	SharedTuplestoreAccessor *inner_tuples;
 	SharedTuplestoreAccessor *outer_tuples;
 } ParallelHashJoinBatchAccessor;
@@ -265,7 +273,8 @@ typedef struct ParallelHashJoinState
 #define PHJ_BATCH_ALLOCATING			1
 #define PHJ_BATCH_LOADING				2
 #define PHJ_BATCH_PROBING				3
-#define PHJ_BATCH_DONE					4
+#define PHJ_BATCH_DONE					4
+#define PHJ_BATCH_FILL_INNER_DONE		5
 
 /* The phases of batch growth while hashing, for grow_batches_barrier. */
 #define PHJ_GROW_BATCHES_ELECTING		0
@@ -351,6 +360,8 @@ typedef struct HashJoinTableData
 
 	/* used for dense allocation of tuples (into linked chunks) */
 	HashMemoryChunk chunks;		/* one list for the whole batch */
 
+	size_t		current_chunk_idx;	/* index within chunk for serial unmatched scan */
+
 	/* Shared and private state for Parallel Hash. */
 	HashMemoryChunk current_chunk;	/* this backend's current chunk */
 	dsa_area   *area;			/* DSA area to allocate memory from */
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 2db4e2f672..a642736d54 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -56,8 +56,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
+extern bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate);
 extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate,
 										  ExprContext *econtext);
+extern bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+												  ExprContext *econtext);
 extern void ExecHashTableReset(HashJoinTable hashtable);
 extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable);
 extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 0dfbac46b4..62d5f1d16b 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -856,6 +856,7 @@ typedef enum
 	WAIT_EVENT_HASH_BATCH_ALLOCATE,
 	WAIT_EVENT_HASH_BATCH_ELECT,
 	WAIT_EVENT_HASH_BATCH_LOAD,
+	WAIT_EVENT_HASH_BATCH_PROBE,
 	WAIT_EVENT_HASH_BUILD_ALLOCATE,
 	WAIT_EVENT_HASH_BUILD_ELECT,
 	WAIT_EVENT_HASH_BUILD_HASH_INNER,
diff --git a/src/include/storage/barrier.h b/src/include/storage/barrier.h
index d71927cc2f..e0de24378b 100644
--- a/src/include/storage/barrier.h
+++ b/src/include/storage/barrier.h
@@ -37,6 +37,7 @@ typedef struct Barrier
 extern void BarrierInit(Barrier *barrier, int num_workers);
 extern bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info);
 extern bool BarrierArriveAndDetach(Barrier *barrier);
+extern bool BarrierArriveAndDetachExceptLast(Barrier *barrier);
 extern int	BarrierAttach(Barrier *barrier);
 extern bool BarrierDetach(Barrier *barrier);
 extern int	BarrierPhase(Barrier *barrier);
diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out
index 3a91c144a2..4ca0e01756 100644
--- a/src/test/regress/expected/join_hash.out
+++ b/src/test/regress/expected/join_hash.out
@@ -767,8 +767,9 @@ select count(*) from simple r full outer join simple s using (id);
 (1 row)
 
 rollback to settings;
--- parallelism not possible with parallel-oblivious full hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
   select count(*) from simple r full outer join simple s using (id);
@@ -788,6 +789,31 @@ select count(*) from simple r full outer join simple s using (id);
  count 
 -------
  20000
 (1 row)
 
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+  select count(*) from simple r full outer join simple s using (id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
 rollback to settings;
@@ -812,8 +838,9 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 (1 row)
 
 rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
   select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
@@ -833,6 +860,31 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
  count 
 -------
  40000
 (1 row)
 
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+  select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: ((0 - s.id) = r.id)
+                     ->  Parallel Seq Scan on simple s
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple r
+(9 rows)
+
+select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count 
+-------
+ 40000
+(1 row)
+
 rollback to settings;
 -- exercise special code paths for huge tuples (note use of non-strict
 -- expression and left join required to get the detoasted tuple into
diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql
index 68c1a8c7b6..504b3611ca 100644
--- a/src/test/regress/sql/join_hash.sql
+++ b/src/test/regress/sql/join_hash.sql
@@ -418,7 +418,16 @@ explain (costs off)
   select count(*) from simple r full outer join simple s using (id);
 rollback to settings;
 
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+  select count(*) from simple r full outer join simple s using (id);
+select count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- parallelism is possible with parallel-aware full hash join
 savepoint settings;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
@@ -436,14 +445,24 @@ explain (costs off)
   select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 rollback to settings;
 
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
   select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 rollback to settings;
 
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+  select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+
 -- exercise special code paths for huge tuples (note use of non-strict
 -- expression and left join required to get the detoasted tuple into
 -- the hash table)
-- 
2.25.1
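
A note on the election protocol used above: BarrierArriveAndDetachExceptLast()
lets every probing worker leave the batch barrier without waiting. All but the
last participant detach immediately and move on, while the single remaining
participant advances the barrier phase and stays attached to perform the
unmatched inner scan alone, so no one ever blocks on the barrier after tuples
have been emitted. The standalone sketch below (plain C with POSIX threads,
independent of PostgreSQL's barrier.c; the MiniBarrier type and all function
names are invented for illustration) mimics that election:

	#include <pthread.h>
	#include <stdbool.h>
	#include <stdio.h>

	typedef struct MiniBarrier
	{
		pthread_mutex_t mutex;
		int			participants;
		int			phase;
	} MiniBarrier;

	/* Mirrors the semantics of BarrierArriveAndDetachExceptLast(). */
	static bool
	mini_barrier_arrive_and_detach_except_last(MiniBarrier *b)
	{
		bool		last;

		pthread_mutex_lock(&b->mutex);
		if (b->participants > 1)
		{
			--b->participants;	/* detach without waiting: no deadlock */
			last = false;
		}
		else
		{
			++b->phase;			/* sole survivor advances the phase */
			last = true;
		}
		pthread_mutex_unlock(&b->mutex);
		return last;
	}

	static MiniBarrier barrier = {PTHREAD_MUTEX_INITIALIZER, 0, 0};

	static void *
	worker(void *arg)
	{
		int			id = (int) (long) arg;

		/* ... probe phase: this worker may already have emitted tuples ... */
		if (mini_barrier_arrive_and_detach_except_last(&barrier))
			printf("worker %d: elected, scans for unmatched inner tuples\n", id);
		else
			printf("worker %d: detached, moves on to the next batch\n", id);
		return NULL;
	}

	int
	main(void)
	{
		pthread_t	threads[4];

		barrier.participants = 4;	/* all four workers start attached */
		for (long i = 0; i < 4; i++)
			pthread_create(&threads[i], NULL, worker, (void *) i);
		for (int i = 0; i < 4; i++)
			pthread_join(threads[i], NULL);
		return 0;
	}

Exactly one worker prints the "elected" line, regardless of arrival order,
which is what makes it safe for the join to have emitted tuples before this
point.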