Thread: Reigning in ExecParallelHashRepartitionFirst
Last week as I was working on adaptive hash join [1] and trying to get
parallel adaptive hash join batch 0 to spill correctly, I noticed what
seemed like a problem with the code to repartition batch 0.
If we run out of space while inserting tuples into the hashtable during
the build phase of parallel hash join and proceed to increase the number
of batches, we need to repartition all of the tuples from the old
generation (when nbatch was x) and move them to their new homes in the
new generation (when nbatch is 2x). Before we do this repartitioning we
disable growth in the number of batches.
Then we repartition the tuples from the hashtable, inserting them either
back into the hashtable or into a batch file. While inserting them into
the hashtable, we call ExecParallelHashTupleAlloc(), and, if there is no
space for the current tuple in the current chunk and growth in the
number of batches is disabled, we go ahead and allocate a new chunk of
memory -- regardless of whether or not we will exceed the space limit.
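To make that concrete, here is a standalone model of the decision (the struct and names are stand-ins for illustration, not the actual nodeHash.c code, which operates on shared state with atomics):

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Illustrative stand-ins only; the real state lives in hashjoin.h. */
typedef enum
{
    GROWTH_OK,
    GROWTH_NEED_MORE_BATCHES,
    GROWTH_DISABLED
} growth_state;

typedef struct hash_build_ctx
{
    size_t       size;          /* bytes currently charged to the batch */
    size_t       space_allowed; /* budget derived from work_mem */
    growth_state growth;
} hash_build_ctx;

/*
 * Simplified model of the decision in ExecParallelHashTupleAlloc() on
 * master: when the current chunk is full and batch growth is disabled
 * (as it is during repartitioning), a new chunk is allocated without
 * consulting the space limit at all.
 */
static bool
master_allocates_new_chunk(const hash_build_ctx *ctx, size_t chunk_size)
{
    if (ctx->growth == GROWTH_DISABLED)
        return true;            /* no space check: the behavior at issue */
    return ctx->size + chunk_size <= ctx->space_allowed;
}
```

With growth disabled, the function says yes even when the budget is already blown, which is exactly the path the test case below exercises.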
Below, I've included a test case, which, on master, results in an error
while trying to allocate shared memory. I use a custom data type whose
hash function ensures that the tuples will go to batch 0. With my
attached patch, this test case no longer errors out.
I discussed this with Thomas Munro, and it seems this is not the
desired behavior. We also discussed how abandoning a repartitioning
effort once we know it is doomed is an optimization anyway.
To start with, I've attached a patch which bails out of the
ExecParallelHashRepartitionFirst() attempt when allocating a new chunk
of memory would exceed the space limit. We skip
ExecParallelHashRepartitionRest() and engage in the deciding phase as
before.
This means that we will disable growth in the number of batches
if all of the tuples that we attempted to load back into the hashtable
from the evicted tuple queue would stay resident in the hashtable.
Otherwise, we set growth to indicate that we need to try increasing the
number of batches again and return; NULL eventually propagates back to
the original allocation call, indicating that repartitioning must be
retried.
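The intended deciding-phase logic can be sketched standalone like this (names and the exact resident-tuple test are invented for illustration; this is not the patch itself):

```c
#include <assert.h>
#include <stddef.h>

typedef enum
{
    DISABLE_GROWTH,             /* further batch doublings won't help */
    GROW_AGAIN                  /* double nbatch and retry repartitioning */
} growth_decision;

/*
 * Standalone sketch, not the actual patch: after an abandoned
 * repartitioning attempt, if every tuple we tried to load back into the
 * hashtable stayed resident (none went out to batch files), doubling
 * the batch count again cannot relieve the pressure -- the data is
 * skewed into one batch -- so growth is disabled.  Otherwise we request
 * another doubling, and the caller retries repartitioning.
 */
static growth_decision
decide_after_repartition(size_t tuples_resident, size_t tuples_spilled)
{
    if (tuples_spilled == 0 && tuples_resident > 0)
        return DISABLE_GROWTH;
    return GROW_AGAIN;
}
```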
It's important to note that if we disable growth in the deciding phase
due to skew, batch 0 (and any subsequent batch whose tuples do not fit
in the allowed space) will simply exceed the space limit while building
the hashtable. This patch does not fix that.
Thomas and I also discussed the potential optimization of bailing out
while repartitioning the other batches (after batch 0) in
ExecParallelHashRepartitionRest(). This would be a good optimization;
however, it doesn't address a "bug" in the way that bailing out in
ExecParallelHashRepartitionFirst() does. Also, I hacked on a few
versions of this optimization, and it requires more thought. I would
like to propose it in a separate patch and thread.
One note about the code of the attached patch: I added a variable to the
ParallelHashJoinState structure indicating that repartitioning should be
abandoned. Workers only need to check it before allocating a new chunk
of memory during repartitioning. I considered making it a
ParallelHashGrowth stage instead, but I wasn't sure that made sense.
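Sketched standalone, the coordination looks something like this (the field and function names are invented for illustration and don't match the patch):

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/*
 * Hypothetical sketch: one flag lives in the shared join state.  The
 * worker that first notices a chunk allocation would exceed the budget
 * sets it, and every worker checks it before allocating another chunk
 * while repartitioning.  The real ParallelHashJoinState has many other
 * fields and different naming.
 */
typedef struct shared_join_state
{
    atomic_bool abandon_repartition;
} shared_join_state;

/* Called by the worker that detects the overflow. */
static void
signal_abandon(shared_join_state *pstate)
{
    atomic_store(&pstate->abandon_repartition, true);
}

/* Checked by all workers before grabbing a new chunk of memory. */
static bool
should_abandon(shared_join_state *pstate)
{
    return atomic_load(&pstate->abandon_repartition);
}
```

A single shared flag keeps the check cheap on the allocation path, which is why I preferred it over threading a new phase through the growth barrier.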
[1] https://www.postgresql.org/message-id/flat/CA%2BhUKGJvYFCcF8vTHFSQQB_F8oGRsBp3JdZAPWbORZgfAPk5Sw%40mail.gmail.com#1156516651bb2587da3909cf1db29952
--------------------------------
Test Case
--------------------------------
DROP TYPE IF EXISTS stub CASCADE;
CREATE TYPE stub AS (value CHAR(8098));
CREATE FUNCTION stub_hash(item stub)
RETURNS INTEGER AS $$
BEGIN
RETURN 0;
END; $$ LANGUAGE plpgsql IMMUTABLE LEAKPROOF STRICT PARALLEL SAFE;
CREATE FUNCTION stub_eq(item1 stub, item2 stub)
RETURNS BOOLEAN AS $$
BEGIN
RETURN item1.value = item2.value;
END; $$ LANGUAGE plpgsql IMMUTABLE LEAKPROOF STRICT PARALLEL SAFE;
CREATE OPERATOR = (
FUNCTION = stub_eq,
LEFTARG = stub,
RIGHTARG = stub,
COMMUTATOR = =,
HASHES, MERGES
);
CREATE OPERATOR CLASS stub_hash_ops
DEFAULT FOR TYPE stub USING hash AS
OPERATOR 1 =(stub, stub),
FUNCTION 1 stub_hash(stub);
DROP TABLE IF EXISTS probeside_batch0;
CREATE TABLE probeside_batch0(a stub);
ALTER TABLE probeside_batch0 ALTER COLUMN a SET STORAGE PLAIN;
INSERT INTO probeside_batch0 SELECT '("")' FROM generate_series(1, 13);
DROP TABLE IF EXISTS hashside_wide_batch0;
CREATE TABLE hashside_wide_batch0(a stub, id int);
ALTER TABLE hashside_wide_batch0 ALTER COLUMN a SET STORAGE PLAIN;
INSERT INTO hashside_wide_batch0 SELECT '("")', 22 FROM generate_series(1, 200);
ANALYZE probeside_batch0, hashside_wide_batch0;
set min_parallel_table_scan_size = 0;
set parallel_setup_cost = 0;
set enable_hashjoin = on;
set max_parallel_workers_per_gather = 1;
set enable_parallel_hash = on;
set work_mem = '64kB';
explain (analyze, costs off)
SELECT TRIM((probeside_batch0.a).value),
hashside_wide_batch0.id,
hashside_wide_batch0.ctid as innerctid,
TRIM((hashside_wide_batch0.a).value), probeside_batch0.ctid as outerctid
FROM probeside_batch0
LEFT OUTER JOIN hashside_wide_batch0 USING (a);
---------------------------------
--
Melanie Plageman
s/reign/rein/ in $subject
On Thu, Jul 9, 2020 at 8:17 AM Melanie Plageman
<melanieplageman@gmail.com> wrote:
> Then we repartition the tuples from the hashtable, inserting them either
> back into the hashtable or into a batch file. While inserting them into
> the hashtable, we call ExecParallelHashTupleAlloc(), and, if there is no
> space for the current tuple in the current chunk and growth in the
> number of batches is disabled, we go ahead and allocate a new chunk of
> memory -- regardless of whether or not we will exceed the space limit.

Hmm.  It shouldn't really be possible for
ExecParallelHashRepartitionFirst() to run out of memory anyway,
considering that the input of that operation previously fit (just... I
mean we started repartitioning because one more chunk would have pushed
us over the edge, but the tuples so far fit, and we'll insert them in
the same order for each input chunk, possibly filtering some out).
Perhaps you reached this condition because batches[0].shared->size
finishes up accounting for the memory used by the bucket array in
PHJ_GROW_BUCKETS_ELECTING, but didn't originally account for it in
generation 0, so what previously appeared to fit no longer does :-(.
I'll look into that.