From 0640e2e4a9ffb286e713189e37b104f442047a3b Mon Sep 17 00:00:00 2001
From: ed
Date: Tue, 10 Mar 2026 23:41:58 +0300
Subject: [PATCH] Throttle read stream look-ahead against local buffer pin limit

---
 src/backend/storage/aio/read_stream.c | 37 +++++++++++-
 src/test/regress/expected/temp.out    | 87 +++++++++++++++++++++++++++
 src/test/regress/sql/temp.sql         | 79 ++++++++++++++++++++++++
 3 files changed, 201 insertions(+), 2 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 031fde9f4c..0ac575e1a6 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -428,6 +428,8 @@ read_stream_start_pending_read(ReadStream *stream)
 static void
 read_stream_look_ahead(ReadStream *stream)
 {
+	int buffer_limit;
+
 	/*
 	 * Allow amortizing the cost of submitting IO over multiple IOs. This
 	 * requires that we don't do any operations that could lead to a deadlock
@@ -437,8 +439,24 @@ read_stream_look_ahead(ReadStream *stream)
 	if (stream->batch_mode)
 		pgaio_enter_batchmode();
 
+	/*
+	 * Compute how many more buffers this backend is allowed to pin.
+	 * This is checked before each StartReadBuffers() call and also used
+	 * to throttle the outer loop so we don't queue more pending blocks
+	 * than we'll be able to pin.
+	 */
+	if (stream->temporary)
+		buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
+	else
+		buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
+	/* Always allow at least 1 if we hold no pins yet, for progress */
+	if (buffer_limit == 0 && stream->pinned_buffers == 0)
+		buffer_limit = 1;
+
+
+
 	while (stream->ios_in_progress < stream->max_ios &&
-		   stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
+		   stream->pinned_buffers + stream->pending_read_nblocks < stream->distance &&
+		   stream->pinned_buffers + stream->pending_read_nblocks < buffer_limit)
 	{
 		BlockNumber blocknum;
 		int16		buffer_index;
@@ -447,6 +465,13 @@ read_stream_look_ahead(ReadStream *stream)
 		if (stream->pending_read_nblocks == stream->io_combine_limit)
 		{
 			read_stream_start_pending_read(stream);
+			/* Re-check limit after pinning */
+			if (stream->temporary)
+				buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
+			else
+				buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
+			if (buffer_limit == 0 && stream->pinned_buffers == 0)
+				buffer_limit = 1;
 			continue;
 		}
 
@@ -488,6 +513,13 @@ read_stream_look_ahead(ReadStream *stream)
 					pgaio_exit_batchmode();
 				return;
 			}
+			/* Re-check limit after each pin */
+			if (stream->temporary)
+				buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
+			else
+				buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
+			if (buffer_limit == 0 && stream->pinned_buffers == 0)
+				buffer_limit = 1;
 		}
 
 		/* This is the start of a new pending read. */
@@ -509,7 +541,8 @@ read_stream_look_ahead(ReadStream *stream)
 		(stream->pending_read_nblocks == stream->io_combine_limit ||
 		 (stream->pending_read_nblocks >= stream->distance &&
 		  stream->pinned_buffers == 0) ||
-		 stream->distance == 0) &&
+		 stream->distance == 0 ||
+		 stream->pinned_buffers + stream->pending_read_nblocks >= buffer_limit) &&
 		stream->ios_in_progress < stream->max_ios)
 		read_stream_start_pending_read(stream);
 
diff --git a/src/test/regress/expected/temp.out b/src/test/regress/expected/temp.out
index 370361543b..be7f426fbe 100644
--- a/src/test/regress/expected/temp.out
+++ b/src/test/regress/expected/temp.out
@@ -566,3 +566,90 @@ SELECT count(*), max(a) max_a, min(a) min_a, max(cnt) max_cnt FROM test_temp;
 
 -- cleanup
 DROP FUNCTION test_temp_pin(int, int);
+-- Tests to verify the correct recovery from exhausting buffer pins
+\c
+SET temp_buffers = 100;
+CREATE TEMP TABLE tt1(val text);
+INSERT INTO tt1 SELECT repeat('x', 2000) FROM generate_series(1, 600);
+CREATE INDEX ON tt1(val);
+CREATE INDEX ON tt1((length(val)));
+CREATE TEMP TABLE tt2(val text);
+INSERT INTO tt2 SELECT val FROM tt1;
+SELECT count(*) FROM tt2;
+ count 
+-------
+   600
+(1 row)
+
+SELECT count(*) FROM tt2 WHERE val = repeat('x', 2000);
+ count 
+-------
+   600
+(1 row)
+
+DROP TABLE tt2;
+DROP TABLE tt1;
+-- Exhaust local buffers via many correlated subplans (row IN with correlated condition)
+-- verifies no "no empty local buffer available" error during heavy subquery execution
+\c
+SET effective_io_concurrency = 128;
+SET enable_mergejoin = off;
+CREATE TEMP TABLE tt_big(
+    id serial, f01 bytea, f02 bytea, f03 bytea,
+    f04 bytea, f05 boolean, val text
+);
+INSERT INTO tt_big
+SELECT g,
+    decode(md5(g::text), 'hex'),
+    decode(md5((g+1)::text), 'hex'),
+    decode(md5((g+2)::text), 'hex'),
+    decode(md5((g+3)::text), 'hex'),
+    (g % 2 = 0),
+    repeat('x', 500)
+FROM generate_series(1, 100000) g;
+CREATE TEMP TABLE tt_small(
+    s01 bytea, s02 bytea, s03 bytea, s04 boolean
+);
+INSERT INTO tt_small
+SELECT
+    decode(md5(g::text), 'hex'),
+    decode(md5((g+1)::text), 'hex'),
+    decode(md5((g+2)::text), 'hex'),
+    (g % 3 = 0)
+FROM generate_series(1, 500) g;
+CREATE TEMP TABLE tt_out(id int, f01 bytea, val numeric);
+-- Force correlated subplans (not semi-joins) by using row-value IN
+-- with correlated conditions, mimicking 1C query pattern
+INSERT INTO tt_out
+SELECT DISTINCT t1.id, t1.f01,
+    CAST(CASE
+        WHEN (t1.f02, t1.f03) IN
+            (SELECT t2.s02, t2.s03 FROM tt_small t2
+             WHERE t1.f01 = t2.s01 AND t2.s04 = true)
+        THEN 0
+        WHEN (t1.f03, t1.f04) IN
+            (SELECT t3.s02, t3.s03 FROM tt_small t3
+             WHERE t1.f01 = t3.s01 AND t3.s04 = false)
+        THEN 1
+        ELSE 2
+    END AS numeric)
+FROM tt_big t1
+LEFT JOIN tt_small s ON t1.f01 = s.s01
+WHERE CASE
+    WHEN (t1.f02, t1.f03) IN
+        (SELECT t4.s02, t4.s03 FROM tt_small t4
+         WHERE t1.f01 = t4.s01 AND t4.s04 = true)
+    THEN 0
+    WHEN (t1.f03, t1.f04) IN
+        (SELECT t5.s02, t5.s03 FROM tt_small t5
+         WHERE t1.f01 = t5.s01 AND t5.s04 = false)
+    THEN 1
+    ELSE 2
+END <> 0;
+SELECT count(*) FROM tt_out;
+ count 
+-------
+ 99834
+(1 row)
+
+DROP TABLE tt_out;
diff --git a/src/test/regress/sql/temp.sql b/src/test/regress/sql/temp.sql
index d50472ddce..e3063bab9a 100644
--- a/src/test/regress/sql/temp.sql
+++ b/src/test/regress/sql/temp.sql
@@ -418,3 +418,82 @@ SELECT count(*), max(a) max_a, min(a) min_a, max(cnt) max_cnt FROM test_temp;
 
 -- cleanup
 DROP FUNCTION test_temp_pin(int, int);
+
+-- Tests to verify the correct recovery from exhausting buffer pins
+\c
+SET temp_buffers = 100;
+CREATE TEMP TABLE tt1(val text);
+INSERT INTO tt1 SELECT repeat('x', 2000) FROM generate_series(1, 600);
+CREATE INDEX ON tt1(val);
+CREATE INDEX ON tt1((length(val)));
+CREATE TEMP TABLE tt2(val text);
+INSERT INTO tt2 SELECT val FROM tt1;
+SELECT count(*) FROM tt2;
+SELECT count(*) FROM tt2 WHERE val = repeat('x', 2000);
+DROP TABLE tt2;
+DROP TABLE tt1;
+
+-- Exhaust local buffers via many correlated subplans (row IN with correlated condition)
+-- verifies no "no empty local buffer available" error during heavy subquery execution
+\c
+SET effective_io_concurrency = 128;
+SET enable_mergejoin = off;
+
+CREATE TEMP TABLE tt_big(
+    id serial, f01 bytea, f02 bytea, f03 bytea,
+    f04 bytea, f05 boolean, val text
+);
+INSERT INTO tt_big
+SELECT g,
+    decode(md5(g::text), 'hex'),
+    decode(md5((g+1)::text), 'hex'),
+    decode(md5((g+2)::text), 'hex'),
+    decode(md5((g+3)::text), 'hex'),
+    (g % 2 = 0),
+    repeat('x', 500)
+FROM generate_series(1, 100000) g;
+
+CREATE TEMP TABLE tt_small(
+    s01 bytea, s02 bytea, s03 bytea, s04 boolean
+);
+INSERT INTO tt_small
+SELECT
+    decode(md5(g::text), 'hex'),
+    decode(md5((g+1)::text), 'hex'),
+    decode(md5((g+2)::text), 'hex'),
+    (g % 3 = 0)
+FROM generate_series(1, 500) g;
+
+CREATE TEMP TABLE tt_out(id int, f01 bytea, val numeric);
+
+-- Force correlated subplans (not semi-joins) by using row-value IN
+-- with correlated conditions, mimicking 1C query pattern
+INSERT INTO tt_out
+SELECT DISTINCT t1.id, t1.f01,
+    CAST(CASE
+        WHEN (t1.f02, t1.f03) IN
+            (SELECT t2.s02, t2.s03 FROM tt_small t2
+             WHERE t1.f01 = t2.s01 AND t2.s04 = true)
+        THEN 0
+        WHEN (t1.f03, t1.f04) IN
+            (SELECT t3.s02, t3.s03 FROM tt_small t3
+             WHERE t1.f01 = t3.s01 AND t3.s04 = false)
+        THEN 1
+        ELSE 2
+    END AS numeric)
+FROM tt_big t1
+LEFT JOIN tt_small s ON t1.f01 = s.s01
+WHERE CASE
+    WHEN (t1.f02, t1.f03) IN
+        (SELECT t4.s02, t4.s03 FROM tt_small t4
+         WHERE t1.f01 = t4.s01 AND t4.s04 = true)
+    THEN 0
+    WHEN (t1.f03, t1.f04) IN
+        (SELECT t5.s02, t5.s03 FROM tt_small t5
+         WHERE t1.f01 = t5.s01 AND t5.s04 = false)
+    THEN 1
+    ELSE 2
+END <> 0;
+
+SELECT count(*) FROM tt_out;
+DROP TABLE tt_out;
-- 
2.43.0