From c3858e25c03f8f949cf1564e52236b5647427cbb Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 3 Apr 2026 13:01:26 -0400 Subject: [PATCH v5 3/5] read stream: Split decision about look ahead for AIO and combining In a subsequent commit the read-ahead distance will only be increased when waiting for IO. Without further work that would cause a regression: as IO combining and read-ahead are currently controlled by the same mechanism, we would end up not allowing IO combining when never needing to wait for IO (as the distance ends up too small to allow for full-sized IOs), which can increase CPU overhead. A typical reason to not have to wait for IO completion at a low look-ahead distance is the use of io_uring with the to-be-read data in the page cache. But even with io_method=worker, the IO submission rate may be low enough for the workers to keep up. One might think that we could just always perform IO combining, but doing so at the start of a scan can cause performance regressions: 1) Performing a large IO commonly has higher latency than performing smaller IOs. That is not a problem once we are reading ahead far enough, but at the start of a stream it can lead to longer waits for IO completion. 2) Sometimes read streams will not be read to completion. Immediately starting with full-sized IOs leads to more wasted effort. This is not commonly an issue with existing read stream users, but the upcoming use of read streams to fetch table pages as part of an index scan frequently encounters this. One of the comments in read_stream_should_look_ahead() refers to a motivation that only really exists as of the next commit, but without that comment the code doesn't make sense on its own. 
Reviewed-by: Nazir Bilal Yavuz Reviewed-by: Melanie Plageman Discussion: https://postgr.es/m/f3xxfrkafjxpyqxywcxricxgyizjirfceychyxsgn7bwjp5eda@kwbduhy7tfmu Discussion: https://postgr.es/m/CA+hUKGL2PhFyDoqrHefqasOnaXhSg48t1phs3VM8BAdrZqKZkw@mail.gmail.com --- src/backend/storage/aio/read_stream.c | 141 +++++++++++++++++++------- 1 file changed, 107 insertions(+), 34 deletions(-) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 3499776d210..a63e66988e1 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -98,10 +98,23 @@ struct ReadStream int16 max_pinned_buffers; int16 forwarded_buffers; int16 pinned_buffers; - int16 distance; + + /* + * Limit of how far, in blocks, to look-ahead for IO combining and for + * read-ahead. + * + * The limits for read-ahead and combining are handled separately to allow + * for IO combining even in cases where the IO subsystem can keep up at a + * low read-ahead distance, as doing larger IOs is more efficient. + * + * Set to 0 when the end of the stream is reached. + */ + int16 combine_distance; + int16 readahead_distance; uint16 distance_decay_holdoff; int16 initialized_buffers; - int16 resume_distance; + int16 resume_readahead_distance; + int16 resume_combine_distance; int read_buffers_flags; bool sync_mode; /* using io_method=sync */ bool batch_mode; /* READ_STREAM_USE_BATCHING */ @@ -332,8 +345,8 @@ read_stream_start_pending_read(ReadStream *stream) /* Shrink distance: no more look-ahead until buffers are released. */ new_distance = stream->pinned_buffers + buffer_limit; - if (stream->distance > new_distance) - stream->distance = new_distance; + if (stream->readahead_distance > new_distance) + stream->readahead_distance = new_distance; /* Unless we have nothing to give the consumer, stop here. 
*/ if (stream->pinned_buffers > 0) @@ -374,12 +387,23 @@ read_stream_start_pending_read(ReadStream *stream) * perform IO asynchronously when starting out with a small look-ahead * distance. */ - if (stream->distance > 1 && stream->ios_in_progress == 0) + if (stream->ios_in_progress == 0) { - if (stream->distance_decay_holdoff == 0) - stream->distance--; - else + if (stream->distance_decay_holdoff > 0) stream->distance_decay_holdoff--; + else + { + if (stream->readahead_distance > 1) + stream->readahead_distance--; + + /* + * XXX: Should we actually reduce this at any time other than + * a reset? For now we have to, as this is also a condition + * for re-enabling fast_path. + */ + if (stream->combine_distance > 1) + stream->combine_distance--; + } } } else @@ -448,20 +472,42 @@ static inline bool read_stream_should_look_ahead(ReadStream *stream) { /* If the callback has signaled end-of-stream, we're done */ - if (stream->distance == 0) + if (stream->readahead_distance == 0) return false; /* never start more IOs than our cap */ if (stream->ios_in_progress >= stream->max_ios) return false; + /* + * Allow looking further ahead if we are in the process of building a + * larger IO, the IO is not yet big enough and we don't yet have IO in + * flight. Note that this is allowed even if we have reached the + * read-ahead limit (but not the buffer pin limit, combine_distance is + * capped by it and we are checking for pinned_buffers == 0). + * + * The reason this is restricted to not yet having an IO in flight is that + * once we are actually reading ahead, we will not issue IOs before they + * have reached the full size (or can't be grown further). But we *have* + * to issue an IO once pinned_buffers == 0, otherwise there won't be a + * buffer to return to the caller. + * + * This is important for cases where either effective_io_concurrency is + * low or we never need to wait for IO and thus are not increasing the + * distance. 
Without this we would end up with lots of small IOs. + */ + if (stream->pending_read_nblocks > 0 && + stream->pinned_buffers == 0 && + stream->pending_read_nblocks < stream->combine_distance) + return true; + /* * Don't start more read-ahead if that'd put us over the distance limit * for doing read-ahead. As stream->distance is capped by * max_pinned_buffers, this prevents us from looking ahead so far that it * would put us over the pin limit. */ - if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->distance) + if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->readahead_distance) return false; return true; @@ -490,14 +536,14 @@ read_stream_should_issue_now(ReadStream *stream) * If the callback has signaled end-of-stream, start the pending read * immediately. There is no further potential for IO combining. */ - if (stream->distance == 0) + if (stream->readahead_distance == 0) return true; /* - * If we've already reached io_combine_limit, there's no chance of growing + * If we've already reached combine_distance, there's no chance of growing * the read further. */ - if (pending_read_nblocks >= stream->io_combine_limit) + if (pending_read_nblocks >= stream->combine_distance) return true; /* @@ -550,7 +596,8 @@ read_stream_look_ahead(ReadStream *stream) if (blocknum == InvalidBlockNumber) { /* End of stream. */ - stream->distance = 0; + stream->readahead_distance = 0; + stream->combine_distance = 0; break; } @@ -597,7 +644,7 @@ read_stream_look_ahead(ReadStream *stream) * stream. In the worst case we can always make progress one buffer at a * time. */ - Assert(stream->pinned_buffers > 0 || stream->distance == 0); + Assert(stream->pinned_buffers > 0 || stream->readahead_distance == 0); if (stream->batch_mode) pgaio_exit_batchmode(); @@ -787,10 +834,17 @@ read_stream_begin_impl(int flags, * doing full io_combine_limit sized reads. 
*/ if (flags & READ_STREAM_FULL) - stream->distance = Min(max_pinned_buffers, stream->io_combine_limit); + { + stream->readahead_distance = Min(max_pinned_buffers, stream->io_combine_limit); + stream->combine_distance = Min(max_pinned_buffers, stream->io_combine_limit); + } else - stream->distance = 1; - stream->resume_distance = stream->distance; + { + stream->readahead_distance = 1; + stream->combine_distance = 1; + } + stream->resume_readahead_distance = stream->readahead_distance; + stream->resume_combine_distance = stream->combine_distance; /* * Since we always access the same relation, we can initialize parts of @@ -889,7 +943,8 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) Assert(stream->ios_in_progress == 0); Assert(stream->forwarded_buffers == 0); Assert(stream->pinned_buffers == 1); - Assert(stream->distance == 1); + Assert(stream->readahead_distance == 1); + Assert(stream->combine_distance == 1); Assert(stream->pending_read_nblocks == 0); Assert(stream->per_buffer_data_size == 0); Assert(stream->initialized_buffers > stream->oldest_buffer_index); @@ -963,7 +1018,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) else { /* No more blocks, end of stream. */ - stream->distance = 0; + stream->readahead_distance = 0; stream->oldest_buffer_index = stream->next_buffer_index; stream->pinned_buffers = 0; stream->buffers[oldest_buffer_index] = InvalidBuffer; @@ -979,7 +1034,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) Assert(stream->oldest_buffer_index == stream->next_buffer_index); /* End of stream reached? */ - if (stream->distance == 0) + if (stream->readahead_distance == 0) return InvalidBuffer; /* @@ -993,7 +1048,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) /* End of stream reached? 
*/ if (stream->pinned_buffers == 0) { - Assert(stream->distance == 0); + Assert(stream->readahead_distance == 0); return InvalidBuffer; } } @@ -1014,7 +1069,10 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index) { int16 io_index = stream->oldest_io_index; - int32 distance; /* wider temporary value, clamped below */ + + /* wider temporary values, clamped below */ + int32 readahead_distance; + int32 combine_distance; /* Sanity check that we still agree on the buffers. */ Assert(stream->ios[io_index].op.buffers == @@ -1027,10 +1085,18 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) if (++stream->oldest_io_index == stream->max_ios) stream->oldest_io_index = 0; - /* Look-ahead distance ramps up rapidly after we do I/O. */ - distance = stream->distance * 2; - distance = Min(distance, stream->max_pinned_buffers); - stream->distance = distance; + /* + * Read-ahead and IO combining distances ramp up rapidly after we do + * I/O. 
+ */ + readahead_distance = stream->readahead_distance * 2; + readahead_distance = Min(readahead_distance, stream->max_pinned_buffers); + stream->readahead_distance = readahead_distance; + + combine_distance = stream->combine_distance * 2; + combine_distance = Min(combine_distance, stream->io_combine_limit); + combine_distance = Min(combine_distance, stream->max_pinned_buffers); + stream->combine_distance = combine_distance; /* * As we needed IO, prevent distance from being reduced within our @@ -1111,7 +1177,8 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) if (stream->ios_in_progress == 0 && stream->forwarded_buffers == 0 && stream->pinned_buffers == 1 && - stream->distance == 1 && + stream->readahead_distance == 1 && + stream->combine_distance == 1 && stream->pending_read_nblocks == 0 && stream->per_buffer_data_size == 0) { @@ -1157,8 +1224,10 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy) BlockNumber read_stream_pause(ReadStream *stream) { - stream->resume_distance = stream->distance; - stream->distance = 0; + stream->resume_readahead_distance = stream->readahead_distance; + stream->resume_combine_distance = stream->combine_distance; + stream->readahead_distance = 0; + stream->combine_distance = 0; return InvalidBlockNumber; } @@ -1170,7 +1239,8 @@ read_stream_pause(ReadStream *stream) void read_stream_resume(ReadStream *stream) { - stream->distance = stream->resume_distance; + stream->readahead_distance = stream->resume_readahead_distance; + stream->combine_distance = stream->resume_combine_distance; } /* @@ -1186,7 +1256,8 @@ read_stream_reset(ReadStream *stream) Buffer buffer; /* Stop looking ahead. */ - stream->distance = 0; + stream->readahead_distance = 0; + stream->combine_distance = 0; /* Forget buffered block number and fast path state. 
*/ stream->buffered_blocknum = InvalidBlockNumber; @@ -1218,8 +1289,10 @@ read_stream_reset(ReadStream *stream) Assert(stream->ios_in_progress == 0); /* Start off assuming data is cached. */ - stream->distance = 1; - stream->resume_distance = stream->distance; + stream->readahead_distance = 1; + stream->combine_distance = 1; + stream->resume_readahead_distance = stream->readahead_distance; + stream->resume_combine_distance = stream->combine_distance; stream->distance_decay_holdoff = 0; } -- 2.53.0.1.gb2826b52eb