From 1ae6e34d38e236cf350d340dd23c168dbba612f8 Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Thu, 30 Apr 2020 10:08:38 -0700 Subject: [PATCH v7 2/2] Provisionally detach unless last worker To solve the deadlock hazard of waiting on the stripe_barrier after emitting tuples, provisionally detach from the stripe_barrier if you are not the last worker. Save the state that the stripe_barrier was in. Later, check this batch again and, if the stripe_barrier has not moved forward since you last worked on it, call it done and detach for good. Note that this patch could be much more efficient if workers did not detach from the stripe barrier and close their outer match status bitmaps after failing to be the last worker. When they rejoin, they will have to create new bitmaps and re-attach to the stripe barrier. Originally, this patch had workers keep their bitmaps open, however, there were some synchronization problems with workers having outer match status bitmaps for multiple batches open at the same time. --- src/backend/executor/nodeHash.c | 6 ++- src/backend/executor/nodeHashjoin.c | 82 +++++++++++++++++++++++++---- src/backend/storage/ipc/barrier.c | 2 +- src/include/executor/hashjoin.h | 11 +++- 4 files changed, 87 insertions(+), 14 deletions(-) diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index ebfd8f8410..25bfcbace5 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -3139,6 +3139,9 @@ ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch) BarrierArriveAndWait(&shared->stripe_barrier, 0); BarrierDetach(&shared->stripe_barrier); } + accessor->last_participating_stripe_phase = -3; + /* why isn't done initialized here ? */ + accessor->done = -1; /* Initialize accessor state. All members were zero-initialized. */ accessor->shared = shared; @@ -3241,7 +3244,8 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable) accessor->shared = shared; accessor->preallocated = 0; - accessor->done = false; + accessor->done = -1; + accessor->last_participating_stripe_phase = -3; accessor->inner_tuples = sts_attach(ParallelHashJoinBatchInner(shared), ParallelWorkerNumber + 1, diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index b87d32ad8e..87a854572d 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -1276,6 +1276,7 @@ ExecHashJoinLoadStripe(HashJoinState *hjstate) * possible for hashtable->nbatch to be increased here! */ uint32 hashTupleSize; + /* * TODO: wouldn't it be cool if this returned the size of the tuple * inserted @@ -1360,9 +1361,17 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) if (hashtable->curbatch >= 0) { + ParallelHashJoinBatchAccessor *batch_accessor = &hashtable->batches[hashtable->curbatch]; if (IsHashloopFallback(hashtable)) + { sb_end_write(hashtable->batches[hashtable->curbatch].sba); - hashtable->batches[hashtable->curbatch].done = true; + if (batch_accessor->last_participating_stripe_phase > -3) + batch_accessor->done = 0; + else + batch_accessor->done = 1; + } + else + batch_accessor->done = 1; ExecHashTableDetachBatch(hashtable); } @@ -1376,7 +1385,7 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) hashtable->nbatch; do { - if (!hashtable->batches[batchno].done) + if (hashtable->batches[batchno].done != 1) { Barrier *batch_barrier = &hashtable->batches[batchno].shared->batch_barrier; @@ -1413,8 +1422,21 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) sb_initialize_accessor(hashtable->batches[hashtable->curbatch].sba, sts_get_tuplenum(hashtable->batches[hashtable->curbatch].outer_tuples)); hashtable->curstripe = -1; - ExecParallelHashJoinLoadStripe(hjstate); - return true; + if (ExecParallelHashJoinLoadStripe(hjstate)) + return true; + /* + * ExecParallelHashJoinLoadStripe() will return false from + * here when no more work can be done by this worker on + * this batch. Until further optimized, this worker will + * have detached from the stripe_barrier and should close + * its outer match statuses bitmap and then detach from the + * batch. In order to reuse the code below, fall through, + * even though the phase will not have been advanced + */ + if (hashtable->batches[batchno].shared->hashloop_fallback) + sb_end_write(hashtable->batches[batchno].sba); + + /* Fall through. */ case PHJ_BATCH_DONE: @@ -1423,7 +1445,7 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) * remain). */ BarrierDetach(batch_barrier); - hashtable->batches[batchno].done = true; + hashtable->batches[batchno].done = 1; hashtable->curbatch = -1; break; @@ -1461,11 +1483,49 @@ ExecParallelHashJoinLoadStripe(HashJoinState *hjstate) if (hashtable->curstripe >= 0) { - BarrierArriveAndWait(stripe_barrier, WAIT_EVENT_HASH_STRIPE_PROBING); + /* + * After finishing with participating in a stripe, if a worker is the + * only one working on a batch, it will continue working on it. + * However, if a worker is not the only worker working on a batch, it + * would risk deadlock if it waits on the barrier. Instead, it saves + * the current stripe phase and move on. Later, when it comes back to + * this batch, if the stripe phase hasn't advanced from when it last + * participated, it will mark the batch done and never return. If the + * stripe barrier has advanced, then, it will participate again in the + * batch. + */ + if (!BarrierArriveAndDetach(stripe_barrier)) + { + hashtable->batches[batchno].last_participating_stripe_phase = BarrierPhase(stripe_barrier); + sb_end_write(hashtable->batches[hashtable->curbatch].sba); + hashtable->curstripe = -1; + return false; + } + + /* + * This isn't a race condition if no other workers can stay attached to + * this barrier in the intervening time. Basically, if you attach to a + * stripe barrier in the PHJ_STRIPE_DONE phase, + * detach immediately and move on. + */ + BarrierAttach(stripe_barrier); } else if (hashtable->curstripe == -1) { - int phase = BarrierAttach(stripe_barrier); + ParallelHashJoinBatchAccessor *batch_accessor = &hashtable->batches[batchno]; + int phase; + + phase = BarrierAttach(stripe_barrier); + + /* + * If the phase hasn't advanced since the last time this worker + * checked, detach and return to pick another batch. Only check this + * if the worker has worked on this batch before. Workers are not permitted + * to join after the batch has progressed past its first stripe. + */ + if (batch_accessor->done == 0 && + batch_accessor->last_participating_stripe_phase == phase) + return ExecHashTableDetachStripe(hashtable); /* * If a worker enters this phase machine on a stripe number greater @@ -1474,10 +1534,10 @@ ExecParallelHashJoinLoadStripe(HashJoinState *hjstate) * fallback Either way the worker can't contribute so just detach and * move on. */ - if (PHJ_STRIPE_NUMBER(phase) > batch->maximum_stripe_number) - return ExecHashTableDetachStripe(hashtable); - hashtable->curstripe = PHJ_STRIPE_NUMBER(phase); + if (PHJ_STRIPE_NUMBER(phase) > batch->maximum_stripe_number || + PHJ_STRIPE_PHASE(phase) == PHJ_STRIPE_DONE) + return ExecHashTableDetachStripe(hashtable); } else if (hashtable->curstripe == -2) { @@ -1490,6 +1550,8 @@ ExecParallelHashJoinLoadStripe(HashJoinState *hjstate) return ExecHashTableDetachStripe(hashtable); } + hashtable->curstripe = PHJ_STRIPE_NUMBER(BarrierPhase(stripe_barrier)); + /* * The outer side is exhausted and either 1) the current stripe of the * inner side is exhausted and it is time to advance the stripe 2) the diff --git a/src/backend/storage/ipc/barrier.c b/src/backend/storage/ipc/barrier.c index 3e200e02cc..2bfd7e6052 100644 --- a/src/backend/storage/ipc/barrier.c +++ b/src/backend/storage/ipc/barrier.c @@ -308,4 +308,4 @@ BarrierDetachImpl(Barrier *barrier, bool arrive) ConditionVariableBroadcast(&barrier->condition_variable); return last; -} +} \ No newline at end of file diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index 9ffcd84806..8d232a1304 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -224,10 +224,12 @@ typedef struct ParallelHashJoinBatchAccessor size_t old_ntuples; /* how many tuples before repartitioning? */ bool at_least_one_chunk; /* has this backend allocated a chunk? */ - bool done; /* flag to remember that a batch is done */ + int done; /* flag to remember that a batch is done */ + /* -1 for not done, 0 for tentatively done, 1 for done */ SharedTuplestoreAccessor *inner_tuples; SharedTuplestoreAccessor *outer_tuples; SharedBitsAccessor *sba; + int last_participating_stripe_phase; } ParallelHashJoinBatchAccessor; /* @@ -362,7 +364,12 @@ typedef struct HashJoinTableData */ BufFile **hashloop_fallback; /* outer match status files if fall back */ List *fallback_batches_stats; /* per hashjoin batch statistics */ - int curstripe; /* current stripe #; 0 on 1st pass, -2 on phantom stripe */ + + /* + * current stripe #; 0 during 1st pass, -1 when detached -2 on phantom + * stripe + */ + int curstripe; /* * Info about the datatype-specific hash functions for the datatypes being -- 2.20.1