From e0d086eba083b105dfc59120d9a8d043602c1cb7 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Tue, 31 Mar 2026 12:45:41 -0400 Subject: [PATCH v4 06/14] WIP: read stream: Split decision about look ahead for AIO and combining "read_stream: Only increase distance when waiting for IO" caused a regression due to this conflation, because we ended up not allowing IO combining when never needing to wait for IO (as the distance ended up too small to allow for that), which increased CPU overhead. If we go this way, it probably should be moved ahead of the aforementioned commit. Author: Reviewed-by: Discussion: https://postgr.es/m/ Backpatch: --- src/backend/storage/aio/read_stream.c | 147 ++++++++++++++++++++------ 1 file changed, 115 insertions(+), 32 deletions(-) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 5d7bfb8fa00..9d22a119bef 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -98,10 +98,14 @@ struct ReadStream int16 max_pinned_buffers; int16 forwarded_buffers; int16 pinned_buffers; - int16 distance; + /* limit of how far to look ahead for IO combining */ + int16 combine_distance; + /* limit of how far to look ahead for starting IO early */ + int16 readahead_distance; uint16 distance_decay_holdoff; int16 initialized_buffers; - int16 resume_distance; + int16 resume_readahead_distance; + int16 resume_combine_distance; int read_buffers_flags; bool sync_mode; /* using io_method=sync */ bool batch_mode; /* READ_STREAM_USE_BATCHING */ @@ -332,8 +336,8 @@ read_stream_start_pending_read(ReadStream *stream) /* Shrink distance: no more look-ahead until buffers are released. */ new_distance = stream->pinned_buffers + buffer_limit; - if (stream->distance > new_distance) - stream->distance = new_distance; + if (stream->readahead_distance > new_distance) + stream->readahead_distance = new_distance; /* Unless we have nothing to give the consumer, stop here. 
*/ if (stream->pinned_buffers > 0) @@ -374,12 +378,23 @@ read_stream_start_pending_read(ReadStream *stream) * perform IO asynchronously when starting out with a small look-ahead * distance. */ - if (stream->distance > 1 && stream->ios_in_progress == 0) + if (stream->ios_in_progress == 0) { - if (stream->distance_decay_holdoff == 0) - stream->distance--; - else + if (stream->distance_decay_holdoff > 0) stream->distance_decay_holdoff--; + else + { + if (stream->readahead_distance > 1) + stream->readahead_distance--; + + /* + * XXX: Should we actually reduce this at any time other than + * a reset? For now we have to, as this is also a condition + * for re-enabling fast_path. + */ + if (stream->combine_distance > 1) + stream->combine_distance--; + } } } else @@ -452,18 +467,33 @@ read_stream_should_look_ahead(ReadStream *stream) return false; /* If the callback has signaled end-of-stream, we're done */ - if (stream->distance == 0) + if (stream->readahead_distance == 0) return false; /* never pin more buffers than allowed */ if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->max_pinned_buffers) return false; + /* + * Allow looking further ahead if we are in the process of building a + * larger IO, the IO is not yet big enough and we don't yet have IO in + * flight. Note that this is allowed even if we are reaching the + * read-ahead limit (but not the buffer pin limit). + * + * This is important for cases where either effective_io_concurrency is + * low or we never need to wait for IO and thus are not increasing the + * distance. Without this we would end up with lots of small IOs. + */ + if (stream->pending_read_nblocks > 0 && + stream->pinned_buffers == 0 && + stream->pending_read_nblocks < stream->combine_distance) + return true; + /* * Don't start more read-ahead if that'd put us over the distance limit * for doing read-ahead. 
*/ - if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->distance) + if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->readahead_distance) return false; return true; @@ -492,7 +522,7 @@ read_stream_should_issue_now(ReadStream *stream) * If the callback has signaled end-of-stream, start the pending read * immediately. There is no further potential for IO combining. */ - if (stream->distance == 0) + if (stream->readahead_distance == 0) return true; /* @@ -502,6 +532,11 @@ read_stream_should_issue_now(ReadStream *stream) if (pending_read_nblocks >= stream->io_combine_limit) return true; + /* same if capped not by io_combine_limit but combine_distance */ + if (stream->combine_distance > 0 && + pending_read_nblocks >= stream->combine_distance) + return true; + /* * If we currently have no reads in flight or prepared, issue the IO once * we are not looking ahead further. This ensures there's always at least @@ -552,7 +587,8 @@ read_stream_look_ahead(ReadStream *stream) if (blocknum == InvalidBlockNumber) { /* End of stream. */ - stream->distance = 0; + stream->readahead_distance = 0; + stream->combine_distance = 0; break; } @@ -599,7 +635,7 @@ read_stream_look_ahead(ReadStream *stream) * stream. In the worst case we can always make progress one buffer at a * time. */ - Assert(stream->pinned_buffers > 0 || stream->distance == 0); + Assert(stream->pinned_buffers > 0 || stream->readahead_distance == 0); if (stream->batch_mode) pgaio_exit_batchmode(); @@ -789,10 +825,17 @@ read_stream_begin_impl(int flags, * doing full io_combine_limit sized reads. 
*/ if (flags & READ_STREAM_FULL) - stream->distance = Min(max_pinned_buffers, stream->io_combine_limit); + { + stream->readahead_distance = Min(max_pinned_buffers, stream->io_combine_limit); + stream->combine_distance = stream->io_combine_limit; + } else - stream->distance = 1; - stream->resume_distance = stream->distance; + { + stream->readahead_distance = 1; + stream->combine_distance = 1; + } + stream->resume_readahead_distance = stream->readahead_distance; + stream->resume_combine_distance = stream->combine_distance; /* * Since we always access the same relation, we can initialize parts of @@ -891,7 +934,8 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) Assert(stream->ios_in_progress == 0); Assert(stream->forwarded_buffers == 0); Assert(stream->pinned_buffers == 1); - Assert(stream->distance == 1); + Assert(stream->readahead_distance == 1); + Assert(stream->combine_distance == 1); Assert(stream->pending_read_nblocks == 0); Assert(stream->per_buffer_data_size == 0); Assert(stream->initialized_buffers > stream->oldest_buffer_index); @@ -965,7 +1009,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) else { /* No more blocks, end of stream. */ - stream->distance = 0; + stream->readahead_distance = 0; stream->oldest_buffer_index = stream->next_buffer_index; stream->pinned_buffers = 0; stream->buffers[oldest_buffer_index] = InvalidBuffer; @@ -981,7 +1025,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) Assert(stream->oldest_buffer_index == stream->next_buffer_index); /* End of stream reached? */ - if (stream->distance == 0) + if (stream->readahead_distance == 0) return InvalidBuffer; /* @@ -995,7 +1039,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) /* End of stream reached? 
*/ if (stream->pinned_buffers == 0) { - Assert(stream->distance == 0); + Assert(stream->readahead_distance == 0); return InvalidBuffer; } } @@ -1016,7 +1060,6 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index) { int16 io_index = stream->oldest_io_index; - int32 distance; /* wider temporary value, clamped below */ bool needed_wait; /* Sanity check that we still agree on the buffers. */ @@ -1056,11 +1099,38 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) * the stream, as stream->distance == 0 is used to keep track of * having reached the end. */ - if (stream->distance > 0 && needed_wait) + if (stream->readahead_distance > 0 && needed_wait) { - distance = stream->distance * 2; - distance = Min(distance, stream->max_pinned_buffers); - stream->distance = distance; + /* wider temporary value, due to overflow risk */ + int32 readahead_distance; + + readahead_distance = stream->readahead_distance * 2; + readahead_distance = Min(readahead_distance, stream->max_pinned_buffers); + stream->readahead_distance = readahead_distance; + } + + /* + * Whether we needed to wait or not, allow for more IO combining if we + * needed to do IO. The reason to do so independent of needing to wait + * is that when the data is resident in the kernel page cache, IO + * combining reduces the syscall / dispatch overhead, making it + * worthwhile regardless of needing to wait. + * + * It is also important with io_uring as it will never signal the need + * to wait for reads if all the data is in the page cache. There are + * heuristics to deal with that in method_io_uring.c, but they only + * work when the IO gets large enough. 
+ */ + if (stream->combine_distance > 0 && + stream->combine_distance < stream->io_combine_limit) + { + /* wider temporary value, due to overflow risk */ + int32 combine_distance; + + combine_distance = stream->combine_distance * 2; + combine_distance = Min(combine_distance, stream->io_combine_limit); + combine_distance = Min(combine_distance, stream->max_pinned_buffers); + stream->combine_distance = combine_distance; } /* @@ -1139,10 +1209,18 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) #ifndef READ_STREAM_DISABLE_FAST_PATH /* See if we can take the fast path for all-cached scans next time. */ + /* + * FIXME: It's way too easy to wrongly fast path. I'm pretty sure there's + * several pre-existing cases where it triggers because we are not issuing + * additional prefetching (e.g. because of a small + * effective_io_concurrency) and thus stream->pinned_buffers stays at 1 + * after read_stream_look_ahead(). + */ if (stream->ios_in_progress == 0 && stream->forwarded_buffers == 0 && stream->pinned_buffers == 1 && - stream->distance == 1 && + stream->readahead_distance == 1 && + stream->combine_distance == 1 && stream->pending_read_nblocks == 0 && stream->per_buffer_data_size == 0) { @@ -1188,8 +1266,9 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy) BlockNumber read_stream_pause(ReadStream *stream) { - stream->resume_distance = stream->distance; - stream->distance = 0; + stream->resume_readahead_distance = stream->readahead_distance; + stream->resume_combine_distance = stream->combine_distance; + stream->readahead_distance = 0; return InvalidBlockNumber; } @@ -1201,7 +1280,8 @@ read_stream_pause(ReadStream *stream) void read_stream_resume(ReadStream *stream) { - stream->distance = stream->resume_distance; + stream->readahead_distance = stream->resume_readahead_distance; + stream->combine_distance = stream->resume_combine_distance; } /* @@ -1217,7 +1297,8 @@ read_stream_reset(ReadStream *stream) Buffer buffer; /* 
Stop looking ahead. */ - stream->distance = 0; + stream->readahead_distance = 0; + stream->combine_distance = 0; /* Forget buffered block number and fast path state. */ stream->buffered_blocknum = InvalidBlockNumber; @@ -1249,8 +1330,10 @@ read_stream_reset(ReadStream *stream) Assert(stream->ios_in_progress == 0); /* Start off assuming data is cached. */ - stream->distance = 1; - stream->resume_distance = stream->distance; + stream->readahead_distance = 1; + stream->combine_distance = 1; + stream->resume_readahead_distance = stream->readahead_distance; + stream->resume_combine_distance = stream->combine_distance; stream->distance_decay_holdoff = 0; } -- 2.53.0.1.gb2826b52eb