From f806c211d2d1128303f7ed29bf8d9b8395d3a859 Mon Sep 17 00:00:00 2001
From: Thomas Munro
Date: Mon, 27 Mar 2023 16:32:22 +1300
Subject: [PATCH v13 3/4] XXX fixup

---
 src/backend/executor/nodeHash.c     | 34 +++++++++++++++++++++++++++++
 src/backend/executor/nodeHashjoin.c |  2 ++
 src/include/executor/hashjoin.h     |  3 ++-
 3 files changed, 38 insertions(+), 1 deletion(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 58789be71a..6539b32d45 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -2116,6 +2116,17 @@ ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
 	Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
 	Assert(BarrierParticipants(&batch->batch_barrier) == 1);
 
+	/*
+	 * Has another process decided to give up early and command all processes
+	 * to skip the unmatched scan?
+	 */
+	if (batch->skip_unmatched)
+	{
+		hashtable->batches[hashtable->curbatch].done = true;
+		ExecHashTableDetachBatch(hashtable);
+		return false;
+	}
+
 	/*
 	 * See also ExecParallelHashJoinNewBatch()'s assertion that
 	 * batch->work_queue == batch->chunks.  That is, we are now ready to start
@@ -3211,6 +3222,7 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
 		accessor->shared = shared;
 		accessor->preallocated = 0;
 		accessor->done = false;
+		accessor->outer_eof = false;
 		accessor->inner_tuples =
 			sts_attach(ParallelHashJoinBatchInner(shared),
 					   ParallelWorkerNumber + 1,
@@ -3266,6 +3278,28 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
 		Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE ||
 			   BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
 
+		/*
+		 * If we're abandoning the PHJ_BATCH_PROBE phase early without having
+		 * reached the end of it, it means the plan doesn't want any more
+		 * tuples, and it is happy to abandon any tuples buffered in this
+		 * process's subplans.  For correctness, we can't allow any process to
+		 * execute the PHJ_BATCH_SCAN phase, because we will never have the
+		 * complete set of match bits.  Therefore we skip emitting unmatched
+		 * tuples in all backends (if this is a full/right join), as if those
+		 * tuples were all due to be emitted by this process and it has
+		 * abandoned them too.
+		 */
+		if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE &&
+			!hashtable->batches[curbatch].outer_eof)
+		{
+			/*
+			 * This flag may be written to by multiple backends during the
+			 * PHJ_BATCH_PROBE phase, but it will only be read in the
+			 * PHJ_BATCH_SCAN phase, so it requires no extra locking.
+			 */
+			batch->skip_unmatched = true;
+		}
+
 		/*
 		 * Even if we aren't doing a full/right outer join, we'll step through
 		 * the PHJ_BATCH_SCAN phase just to maintain the invariant that freeing
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 93bf0ad6e9..c8af59f106 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -972,6 +972,8 @@ ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
 	}
 
 	/* End of this batch */
+	hashtable->batches[curbatch].outer_eof = true;
+
 	return NULL;
 }
 
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 7615025d73..f5fcf7d4e6 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -160,6 +160,7 @@ typedef struct ParallelHashJoinBatch
 	size_t		ntuples;		/* number of tuples loaded */
 	size_t		old_ntuples;	/* number of tuples before repartitioning */
 	bool		space_exhausted;
+	bool		skip_unmatched; /* whether to abandon the unmatched scan */
 
 	/*
 	 * Variable-sized SharedTuplestore objects follow this struct in memory.
@@ -204,7 +205,7 @@ typedef struct ParallelHashJoinBatchAccessor
 	size_t		estimated_size; /* size of partition on disk */
 	size_t		old_ntuples;	/* how many tuples before repartitioning? */
 	bool		at_least_one_chunk; /* has this backend allocated a chunk? */
-
+	bool		outer_eof;		/* has this process hit end of batch? */
 	bool		done;			/* flag to remember that a batch is done */
 	SharedTuplestoreAccessor *inner_tuples;
 	SharedTuplestoreAccessor *outer_tuples;
-- 
2.39.2
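
Aside for reviewers, placed below the patch so git-am ignores it: the new
comment in ExecHashTableDetachBatch() rests on a barrier-phase argument.
Any number of backends may racily write the same value to skip_unmatched
while the batch barrier is still in PHJ_BATCH_PROBE, and the flag is only
read in PHJ_BATCH_SCAN, after every participant has moved past the barrier,
so the barrier itself provides the required ordering and no lock is needed.
The standalone C sketch below illustrates that pattern with POSIX threads;
it is an illustration only, not PostgreSQL code, and all names in it
(NWORKERS, phase_barrier, worker) are made up.

/*
 * Sketch of the "racy same-value write before a barrier, read after it"
 * pattern.  Build with: cc -pthread sketch.c
 */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

#define NWORKERS 4

static pthread_barrier_t phase_barrier;
static bool skip_unmatched;		/* written racily in "probe", read in "scan" */

static void *
worker(void *arg)
{
	int			id = (int) (long) arg;

	/*
	 * "Probe" phase: any worker that gives up early writes the flag.  All
	 * writers store the same value, so concurrent writes are harmless.
	 */
	if (id % 2 == 0)			/* pretend even-numbered workers give up */
		skip_unmatched = true;

	/*
	 * The barrier orders every write above before every read below, which
	 * is the same argument the patch makes for skip_unmatched.
	 */
	pthread_barrier_wait(&phase_barrier);

	/* "Scan" phase: all workers now agree on the flag's value. */
	if (skip_unmatched)
		printf("worker %d: skipping unmatched scan\n", id);
	return NULL;
}

int
main(void)
{
	pthread_t	threads[NWORKERS];

	pthread_barrier_init(&phase_barrier, NULL, NWORKERS);
	for (long i = 0; i < NWORKERS; i++)
		pthread_create(&threads[i], NULL, worker, (void *) i);
	for (int i = 0; i < NWORKERS; i++)
		pthread_join(threads[i], NULL);
	pthread_barrier_destroy(&phase_barrier);
	return 0;
}

In the patch itself the pthread barrier's role is played by
batch->batch_barrier: detaching backends set the flag while the barrier is
still in PHJ_BATCH_PROBE, and ExecParallelPrepHashTableForUnmatched() reads
it only once the barrier has advanced to PHJ_BATCH_SCAN with a single
participant remaining, as its two assertions show.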