From 22b01b7e514cf975bb70d14918dcb6611a09bbd4 Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Thu, 2 Jul 2020 17:02:48 -0700 Subject: [PATCH v1] Bail out of repartitioning batch 0 when space is exhausted Previously, ExecParallelHashTupleAlloc() would allocate a new chunk of shared memory while repartitioning batch 0 -- even if it would exceed the space limit. This patch adds a new exception to allow the executor to bail out of repartitioning when growth is only disabled because we are in the middle of repartitioning. --- src/backend/executor/nodeHash.c | 30 ++++++++++++++++++++++++++++- src/backend/executor/nodeHashjoin.c | 1 + src/include/executor/hashjoin.h | 1 + 3 files changed, 31 insertions(+), 1 deletion(-) diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 45b342011f..9c40d010f3 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -1083,6 +1083,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable) int new_nbatch; int i; + pstate->abandon_repartitioning = false; /* Move the old batch out of the way. */ old_batch0 = hashtable->batches[0].shared; pstate->old_batches = pstate->batches; @@ -1195,7 +1196,8 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable) ExecParallelHashTableSetCurrentBatch(hashtable, 0); /* Then partition, flush counters. */ ExecParallelHashRepartitionFirst(hashtable); - ExecParallelHashRepartitionRest(hashtable); + if (!pstate->abandon_repartitioning) + ExecParallelHashRepartitionRest(hashtable); ExecParallelHashMergeCounters(hashtable); /* Wait for the above to be finished. */ BarrierArriveAndWait(&pstate->grow_batches_barrier, @@ -1302,6 +1304,11 @@ ExecParallelHashRepartitionFirst(HashJoinTable hashtable) ExecParallelHashTupleAlloc(hashtable, HJTUPLE_OVERHEAD + tuple->t_len, &shared); + if (!copyTuple) + { + Assert(hashtable->parallel_state->abandon_repartitioning); + return; + } copyTuple->hashvalue = hashTuple->hashvalue; memcpy(HJTUPLE_MINTUPLE(copyTuple), tuple, tuple->t_len); ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], @@ -1759,6 +1766,8 @@ ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, hashTuple = ExecParallelHashTupleAlloc(hashtable, HJTUPLE_OVERHEAD + tuple->t_len, &shared); + /* After finishing with the build phase, this function should never fail */ + Assert(hashTuple); hashTuple->hashvalue = hashvalue; memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); @@ -2847,6 +2856,25 @@ ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size, else chunk_size = HASH_CHUNK_SIZE; + /* + * If we are already repartitioning and the space is full, we need to + * abandon this repartitioning attempt and either start a new attempt with + * double the number of batches or disable growth and resign ourselves to + * exceeding the space allowed. We will decide during + * PHJ_GROW_BATCHES_DECIDING phase which of these to do. + */ + if (pstate->growth == PHJ_GROWTH_DISABLED && + PHJ_GROW_BATCHES_PHASE(BarrierPhase(&pstate->grow_batches_barrier)) == + PHJ_GROW_BATCHES_REPARTITIONING && + hashtable->batches[0].at_least_one_chunk && + hashtable->batches[0].shared->size + chunk_size > pstate->space_allowed) + { + pstate->abandon_repartitioning = true; + hashtable->batches[0].shared->space_exhausted = true; + LWLockRelease(&pstate->lock); + return NULL; + } + /* Check if it's time to grow batches or buckets. */ if (pstate->growth != PHJ_GROWTH_DISABLED) { diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 9bb23fef1a..39df3cbbc6 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -1480,6 +1480,7 @@ ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt) pstate->total_tuples = 0; LWLockInitialize(&pstate->lock, LWTRANCHE_PARALLEL_HASH_JOIN); + pstate->abandon_repartitioning = false; BarrierInit(&pstate->build_barrier, 0); BarrierInit(&pstate->grow_batches_barrier, 0); BarrierInit(&pstate->grow_buckets_barrier, 0); diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index 79b634e8ed..5bdbd4a3c7 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -244,6 +244,7 @@ typedef struct ParallelHashJoinState size_t space_allowed; size_t total_tuples; /* total number of inner tuples */ LWLock lock; /* lock protecting the above */ + bool abandon_repartitioning; Barrier build_barrier; /* synchronization for the build phases */ Barrier grow_batches_barrier; -- 2.20.1