From 549bd36fc39282503d2ab83f7827437ddf6f3e1b Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Thu, 12 Sep 2019 00:30:49 +1200 Subject: [PATCH] WIP: Add support for Parallel Full Hash Join. This has an unsolved problem: it's dangerous to run BarrierArriveAndWait() when you're in a phase that has emitted tuples, because some process P might be blocked writing to a full tuple queue, while the leader process, which should be reading it, is also running the subplan and is waiting for P! --- src/backend/executor/nodeHash.c | 90 +++++++++++++++++++++++-- src/backend/executor/nodeHashjoin.c | 35 ++++++++-- src/backend/optimizer/path/joinpath.c | 9 +-- src/backend/postmaster/pgstat.c | 3 + src/backend/storage/ipc/barrier.c | 1 - src/include/executor/hashjoin.h | 5 +- src/include/executor/nodeHash.h | 2 + src/include/nodes/execnodes.h | 2 + src/include/pgstat.h | 1 + src/test/regress/expected/join_hash.out | 56 ++++++++++++++- src/test/regress/sql/join_hash.sql | 23 ++++++- 11 files changed, 204 insertions(+), 23 deletions(-) diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 224cbb32ba..c366a523c6 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -2050,6 +2050,7 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate) hjstate->hj_CurBucketNo = 0; hjstate->hj_CurSkewBucketNo = 0; hjstate->hj_CurTuple = NULL; + hjstate->hj_AllocatedBucketRange = 0; } /* @@ -2126,6 +2127,87 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext) return false; } +/* + * ExecParallelScanHashTableForUnmatched + * scan the hash table for unmatched inner tuples, in parallel + * + * On success, the inner tuple is stored into hjstate->hj_CurTuple and + * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot + * for the latter. + */ +bool +ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate, + ExprContext *econtext) +{ + HashJoinTable hashtable = hjstate->hj_HashTable; + HashJoinTuple hashTuple = hjstate->hj_CurTuple; + + for (;;) + { + /* + * hj_CurTuple is the address of the tuple last returned from the + * current bucket, or NULL if it's time to start scanning a new + * bucket. + */ + if (hashTuple != NULL) + hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple); + else if (hjstate->hj_CurBucketNo < hjstate->hj_AllocatedBucketRange) + hashTuple = ExecParallelHashFirstTuple(hashtable, + hjstate->hj_CurBucketNo++); + else if (hjstate->hj_CurBucketNo < hashtable->nbuckets) + { + /* + * Allocate a few cachelines' worth of buckets and loop around. + * Testing shows that 8 is a good factor. + */ + int step = (PG_CACHE_LINE_SIZE * 8) / sizeof(dsa_pointer_atomic); + + hjstate->hj_CurBucketNo = + pg_atomic_fetch_add_u32(&hashtable->batches[hashtable->curbatch].shared->bucket, + step); + hjstate->hj_AllocatedBucketRange = + Min(hjstate->hj_CurBucketNo + step, hashtable->nbuckets); + } + else + break; /* finished all buckets */ + + while (hashTuple != NULL) + { + if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple))) + { + TupleTableSlot *inntuple; + + /* insert hashtable's tuple into exec slot */ + inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple), + hjstate->hj_HashTupleSlot, + false); /* do not pfree */ + econtext->ecxt_innertuple = inntuple; + + /* + * Reset temp memory each time; although this function doesn't + * do any qual eval, the caller will, so let's keep it + * parallel to ExecScanHashBucket. 
+ */ + ResetExprContext(econtext); + + hjstate->hj_CurTuple = hashTuple; + return true; + } + + hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple); + } + + /* allow this loop to be cancellable */ + CHECK_FOR_INTERRUPTS(); + } + + /* + * no more unmatched tuples + */ + return false; +} + + /* * ExecHashTableReset * @@ -2937,6 +3019,7 @@ ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch) * up the Barrier. */ BarrierInit(&shared->batch_barrier, 0); + pg_atomic_init_u32(&shared->bucket, 0); if (i == 0) { /* Batch 0 doesn't need to be loaded. */ @@ -3097,13 +3180,6 @@ ExecHashTableDetachBatch(HashJoinTable hashtable) /* Detach from the batch we were last working on. */ if (BarrierArriveAndDetach(&batch->batch_barrier)) { - /* - * Technically we shouldn't access the barrier because we're no - * longer attached, but since there is no way it's moving after - * this point it seems safe to make the following assertion. - */ - Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE); - /* Free shared chunks and buckets. */ while (DsaPointerIsValid(batch->chunks)) { diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index ec37558c12..1c9a40dcff 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -82,6 +82,7 @@ * PHJ_BATCH_ALLOCATING -- one allocates buckets * PHJ_BATCH_LOADING -- all load the hash table from disk * PHJ_BATCH_PROBING -- all probe + * PHJ_BATCH_SCAN_INNER -- scan unmatched inner tuples * PHJ_BATCH_DONE -- end * * Batch 0 is a special case, because it starts out in phase @@ -97,9 +98,9 @@ * all other backends attached to it are actively executing the node or have * already arrived. Practically, that means that we never return a tuple * while attached to a barrier, unless the barrier has reached its final - * state. In the slightly special case of the per-batch barrier, we return - * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use - * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting. + * state. + * + * TODO: WIP: That's now not true, because of PHJ_SCAN_INNER. * *------------------------------------------------------------------------- */ @@ -144,6 +145,7 @@ static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate, uint32 *hashvalue, TupleTableSlot *tupleSlot); static bool ExecHashJoinNewBatch(HashJoinState *hjstate); +static void ExecParallelHashEndProbe(HashJoinState *hjstate); static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate); static void ExecParallelHashJoinPartitionOuter(HashJoinState *node); @@ -358,6 +360,9 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) if (TupIsNull(outerTupleSlot)) { /* end of batch, or maybe whole join */ + if (parallel) + ExecParallelHashEndProbe(node); + if (HJ_FILL_INNER(node)) { /* set up to scan for unmatched inner tuples */ @@ -512,7 +517,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) * so any unmatched inner tuples in the hashtable have to be * emitted before we continue to the next batch. */ - if (!ExecScanHashTableForUnmatched(node, econtext)) + if (!(parallel ? 
ExecParallelScanHashTableForUnmatched(node, econtext) + : ExecScanHashTableForUnmatched(node, econtext))) { /* no more unmatched tuples */ node->hj_JoinState = HJ_NEED_NEW_BATCH; @@ -723,6 +729,7 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags) hjstate->hj_CurBucketNo = 0; hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO; hjstate->hj_CurTuple = NULL; + hjstate->hj_AllocatedBucketRange = 0; hjstate->hj_OuterHashKeys = ExecInitExprList(node->hashkeys, (PlanState *) hjstate); @@ -1060,6 +1067,18 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) return true; } +static void +ExecParallelHashEndProbe(HashJoinState *hjstate) +{ + HashJoinTable hashtable = hjstate->hj_HashTable; + Barrier *batch_barrier = + &hashtable->batches[hashtable->curbatch].shared->batch_barrier; + + Assert(BarrierPhase(batch_barrier) == PHJ_BATCH_PROBING); + BarrierArriveAndWait(batch_barrier, WAIT_EVENT_HASH_BATCH_PROBING); + Assert(BarrierPhase(batch_barrier) == PHJ_BATCH_SCAN_INNER); +} + /* * Choose a batch to work on, and attach to it. Returns true if successful, * false if there are no more batches. @@ -1155,13 +1174,16 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) * probing it, but no participant is allowed to wait at * this barrier again (or else a deadlock could occur). * All attached participants must eventually call - * BarrierArriveAndDetach() so that the final phase - * PHJ_BATCH_DONE can be reached. + * BarrierArriveAndDetach() so that the next phase can be + * reached. */ ExecParallelHashTableSetCurrentBatch(hashtable, batchno); sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples); return true; + case PHJ_BATCH_SCAN_INNER: + /* TODO -- not right */ + case PHJ_BATCH_DONE: /* @@ -1335,6 +1357,7 @@ ExecReScanHashJoin(HashJoinState *node) node->hj_CurBucketNo = 0; node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO; node->hj_CurTuple = NULL; + node->hj_AllocatedBucketRange = 0; node->hj_MatchedOuter = false; node->hj_FirstOuterTupleSlot = NULL; diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c index dc28b56e74..ef613d7696 100644 --- a/src/backend/optimizer/path/joinpath.c +++ b/src/backend/optimizer/path/joinpath.c @@ -1853,8 +1853,6 @@ hash_inner_and_outer(PlannerInfo *root, */ if (joinrel->consider_parallel && save_jointype != JOIN_UNIQUE_OUTER && - save_jointype != JOIN_FULL && - save_jointype != JOIN_RIGHT && outerrel->partial_pathlist != NIL && bms_is_empty(joinrel->lateral_relids)) { @@ -1888,9 +1886,12 @@ hash_inner_and_outer(PlannerInfo *root, * total inner path will also be parallel-safe, but if not, we'll * have to search for the cheapest safe, unparameterized inner * path. If doing JOIN_UNIQUE_INNER, we can't use any alternative - * inner path. + * inner path. If full or right join, we can't use parallelism + * at all because no one process has all the match bits. 
*/ - if (cheapest_total_inner->parallel_safe) + if (save_jointype == JOIN_FULL || save_jointype == JOIN_RIGHT) + cheapest_safe_inner = NULL; + else if (cheapest_total_inner->parallel_safe) cheapest_safe_inner = cheapest_total_inner; else if (save_jointype != JOIN_UNIQUE_INNER) cheapest_safe_inner = diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 011076c3e3..d64cb976db 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3774,6 +3774,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_HASH_BATCH_LOADING: event_name = "Hash/Batch/Loading"; break; + case WAIT_EVENT_HASH_BATCH_PROBING: + event_name = "Hash/Batch/Probing"; + break; case WAIT_EVENT_HASH_BUILD_ALLOCATING: event_name = "Hash/Build/Allocating"; break; diff --git a/src/backend/storage/ipc/barrier.c b/src/backend/storage/ipc/barrier.c index 83cbe33107..170d002444 100644 --- a/src/backend/storage/ipc/barrier.c +++ b/src/backend/storage/ipc/barrier.c @@ -221,7 +221,6 @@ BarrierAttach(Barrier *barrier) ++barrier->participants; phase = barrier->phase; SpinLockRelease(&barrier->mutex); - return phase; } diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index 2c94b926d3..f68f4568df 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -160,6 +160,8 @@ typedef struct ParallelHashJoinBatch size_t old_ntuples; /* number of tuples before repartitioning */ bool space_exhausted; + pg_atomic_uint32 bucket; /* bucket allocator for unmatched inner scan */ + /* * Variable-sized SharedTuplestore objects follow this struct in memory. * See the accessor macros below. @@ -265,7 +267,8 @@ typedef struct ParallelHashJoinState #define PHJ_BATCH_ALLOCATING 1 #define PHJ_BATCH_LOADING 2 #define PHJ_BATCH_PROBING 3 -#define PHJ_BATCH_DONE 4 +#define PHJ_BATCH_SCAN_INNER 4 +#define PHJ_BATCH_DONE 5 /* The phases of batch growth while hashing, for grow_batches_barrier. */ #define PHJ_GROW_BATCHES_ELECTING 0 diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h index fc80f03aa8..94b0be380a 100644 --- a/src/include/executor/nodeHash.h +++ b/src/include/executor/nodeHash.h @@ -58,6 +58,8 @@ extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econ extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate); extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext); +extern bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate, + ExprContext *econtext); extern void ExecHashTableReset(HashJoinTable hashtable); extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable); extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index b593d22c48..bdf58cd2ee 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1913,6 +1913,7 @@ typedef struct MergeJoinState * tuple, or NULL if starting search * (hj_CurXXX variables are undefined if * OuterTupleSlot is empty!) 
+ * hj_AllocatedBucketRange range allocated for parallel unmatched scan * hj_OuterTupleSlot tuple slot for outer tuples * hj_HashTupleSlot tuple slot for inner (hashed) tuples * hj_NullOuterTupleSlot prepared null tuple for right/full outer joins @@ -1939,6 +1940,7 @@ typedef struct HashJoinState uint32 hj_CurHashValue; int hj_CurBucketNo; int hj_CurSkewBucketNo; + int hj_AllocatedBucketRange; HashJoinTuple hj_CurTuple; TupleTableSlot *hj_OuterTupleSlot; TupleTableSlot *hj_HashTupleSlot; diff --git a/src/include/pgstat.h b/src/include/pgstat.h index fe076d823d..ef2fbe39ca 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -827,6 +827,7 @@ typedef enum WAIT_EVENT_HASH_BATCH_ALLOCATING, WAIT_EVENT_HASH_BATCH_ELECTING, WAIT_EVENT_HASH_BATCH_LOADING, + WAIT_EVENT_HASH_BATCH_PROBING, WAIT_EVENT_HASH_BUILD_ALLOCATING, WAIT_EVENT_HASH_BUILD_ELECTING, WAIT_EVENT_HASH_BUILD_HASHING_INNER, diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out index 3a91c144a2..4ca0e01756 100644 --- a/src/test/regress/expected/join_hash.out +++ b/src/test/regress/expected/join_hash.out @@ -767,8 +767,9 @@ select count(*) from simple r full outer join simple s using (id); (1 row) rollback to settings; --- parallelism not possible with parallel-oblivious outer hash join +-- parallelism not possible with parallel-oblivious full hash join savepoint settings; +set enable_parallel_hash = off; set local max_parallel_workers_per_gather = 2; explain (costs off) select count(*) from simple r full outer join simple s using (id); @@ -788,6 +789,31 @@ select count(*) from simple r full outer join simple s using (id); 20000 (1 row) +rollback to settings; +-- parallelism is possible with parallel-aware full hash join +savepoint settings; +set local max_parallel_workers_per_gather = 2; +explain (costs off) + select count(*) from simple r full outer join simple s using (id); + QUERY PLAN +------------------------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 2 + -> Partial Aggregate + -> Parallel Hash Full Join + Hash Cond: (r.id = s.id) + -> Parallel Seq Scan on simple r + -> Parallel Hash + -> Parallel Seq Scan on simple s +(9 rows) + +select count(*) from simple r full outer join simple s using (id); + count +------- + 20000 +(1 row) + rollback to settings; -- An full outer join where every record is not matched. 
-- non-parallel @@ -812,8 +838,9 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); (1 row) rollback to settings; --- parallelism not possible with parallel-oblivious outer hash join +-- parallelism not possible with parallel-oblivious full hash join savepoint settings; +set enable_parallel_hash = off; set local max_parallel_workers_per_gather = 2; explain (costs off) select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); @@ -833,6 +860,31 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); 40000 (1 row) +rollback to settings; +-- parallelism is possible with parallel-aware full hash join +savepoint settings; +set local max_parallel_workers_per_gather = 2; +explain (costs off) + select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); + QUERY PLAN +------------------------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 2 + -> Partial Aggregate + -> Parallel Hash Full Join + Hash Cond: ((0 - s.id) = r.id) + -> Parallel Seq Scan on simple s + -> Parallel Hash + -> Parallel Seq Scan on simple r +(9 rows) + +select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); + count +------- + 40000 +(1 row) + rollback to settings; -- exercise special code paths for huge tuples (note use of non-strict -- expression and left join required to get the detoasted tuple into diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql index 68c1a8c7b6..504b3611ca 100644 --- a/src/test/regress/sql/join_hash.sql +++ b/src/test/regress/sql/join_hash.sql @@ -418,7 +418,16 @@ explain (costs off) select count(*) from simple r full outer join simple s using (id); rollback to settings; --- parallelism not possible with parallel-oblivious outer hash join +-- parallelism not possible with parallel-oblivious full hash join +savepoint settings; +set enable_parallel_hash = off; +set local max_parallel_workers_per_gather = 2; +explain (costs off) + select count(*) from simple r full outer join simple s using (id); +select count(*) from simple r full outer join simple s using (id); +rollback to settings; + +-- parallelism is possible with parallel-aware full hash join savepoint settings; set local max_parallel_workers_per_gather = 2; explain (costs off) @@ -436,14 +445,24 @@ explain (costs off) select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); rollback to settings; --- parallelism not possible with parallel-oblivious outer hash join +-- parallelism not possible with parallel-oblivious full hash join savepoint settings; +set enable_parallel_hash = off; set local max_parallel_workers_per_gather = 2; explain (costs off) select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); rollback to settings; +-- parallelism is possible with parallel-aware full hash join +savepoint settings; +set local max_parallel_workers_per_gather = 2; +explain (costs off) + select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); +select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); +rollback to settings; + + -- exercise special code paths for huge tuples (note use of non-strict -- expression and left join required to get the detoasted tuple into -- the hash table) -- 2.22.0
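
The heart of ExecParallelScanHashTableForUnmatched() above is a lock-free
work allocator: rather than synchronizing on each bucket, every participant
claims a contiguous range of buckets with a single pg_atomic_fetch_add_u32()
on the shared counter (the new ParallelHashJoinBatch->bucket field), then
scans its range privately. The following standalone sketch shows the same
claiming scheme in isolation; it is plain C11 rather than PostgreSQL code,
and the names NBUCKETS, STEP, and claim_next_range are invented for the
illustration:

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define NBUCKETS 1000			/* size of the (simulated) bucket array */
#define STEP 64					/* buckets claimed per atomic increment */

/* Shared allocator; plays the role of shared->bucket in the patch. */
static atomic_uint next_bucket;

/*
 * Claim the next range of buckets.  Returns false once the bucket array
 * is exhausted; otherwise sets *start and *end (exclusive).
 */
static bool
claim_next_range(unsigned *start, unsigned *end)
{
	unsigned	base = atomic_fetch_add(&next_bucket, STEP);

	if (base >= NBUCKETS)
		return false;
	*start = base;
	*end = base + STEP < NBUCKETS ? base + STEP : NBUCKETS;
	return true;
}

int
main(void)
{
	unsigned	start;
	unsigned	end;
	unsigned	scanned = 0;

	/*
	 * Single-threaded driver for brevity; in the executor each
	 * participant runs this loop concurrently, and the fetch-add
	 * guarantees the claimed ranges partition the bucket array exactly,
	 * with no bucket scanned twice and none skipped.
	 */
	while (claim_next_range(&start, &end))
		for (unsigned i = start; i < end; i++)
			scanned++;			/* stand-in for scanning bucket i for
								 * unmatched tuples */

	printf("scanned %u buckets\n", scanned);
	return 0;
}

Claiming several cachelines' worth of buckets per fetch-add, as the patch
does with step = (PG_CACHE_LINE_SIZE * 8) / sizeof(dsa_pointer_atomic),
keeps contention on the shared counter low while still spreading the
unmatched-inner scan evenly across participants.  Note that this allocator
is only safe because every participant that reaches PHJ_BATCH_SCAN_INNER
has already finished probing; the unsolved problem described in the commit
message is about getting them there, since a participant that waits at the
batch barrier after emitting tuples can deadlock against a full tuple queue.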