From 03fd67881b935815cff42e4dbe040f4f01b591d9 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Mon, 30 Mar 2026 12:25:07 -0400 Subject: [PATCH v19 16/18] WIP: read stream: Split decision about look ahead for AIO and combining Previous commits caused a regression due to the this conflation. This is a first attempt at fixing the problem. Needs significant reordering and splitting if it works out. Author: Reviewed-by: Discussion: https://postgr.es/m/ Backpatch: --- src/backend/storage/aio/read_stream.c | 242 +++++++++++++++++++++----- 1 file changed, 195 insertions(+), 47 deletions(-) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 144b3613c..1c375edad 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -98,10 +98,14 @@ struct ReadStream int16 max_pinned_buffers; int16 forwarded_buffers; int16 pinned_buffers; - int16 distance; + /* limit of how far to read ahead for IO combining */ + int16 combine_distance; + /* limit of how far to read ahead for starting IO early */ + int16 readahead_distance; uint16 distance_decay_holdoff; int16 initialized_buffers; - int16 resume_distance; + int16 resume_readahead_distance; + int16 resume_combine_distance; int read_buffers_flags; bool sync_mode; /* using io_method=sync */ bool batch_mode; /* READ_STREAM_USE_BATCHING */ @@ -332,8 +336,8 @@ read_stream_start_pending_read(ReadStream *stream) /* Shrink distance: no more look-ahead until buffers are released. */ new_distance = stream->pinned_buffers + buffer_limit; - if (stream->distance > new_distance) - stream->distance = new_distance; + if (stream->readahead_distance > new_distance) + stream->readahead_distance = new_distance; /* Unless we have nothing to give the consumer, stop here. */ if (stream->pinned_buffers > 0) @@ -374,12 +378,23 @@ read_stream_start_pending_read(ReadStream *stream) * perform IO asynchronously when starting out with a small look-ahead * distance. */ - if (stream->distance > 1 && stream->ios_in_progress == 0) + if (stream->ios_in_progress == 0) { - if (stream->distance_decay_holdoff == 0) - stream->distance--; - else + if (stream->distance_decay_holdoff > 0) stream->distance_decay_holdoff--; + else + { + if (stream->readahead_distance > 1) + stream->readahead_distance--; + + /* + * XXX: Should we actually reduce this at any time other than + * a reset? For now we have to, as this is also a condition + * for re-enabling fast_path. + */ + if (stream->combine_distance > 1) + stream->combine_distance--; + } } } else @@ -440,6 +455,101 @@ read_stream_start_pending_read(ReadStream *stream) return true; } +/* + * Should we continue to perform look ahead? The look ahead may allow us to + * make the pending IO larger via IO combining or to issue more read ahead. + */ +static bool +read_stream_should_look_ahead(ReadStream *stream) +{ + /* never start more IOs than our cap */ + if (stream->ios_in_progress >= stream->max_ios) + return false; + + /* If the callback has signaled end-of-stream, we're done */ + if (stream->readahead_distance <= 0) + return false; + + /* never pin more buffers than allowed */ + if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->max_pinned_buffers) + return false; + + /* + * Allow looking further ahead if we have an the process of building a + * larger IO, the IO is not yet big enough and we don't yet have IO in + * flight. Note that this is allowed even if we are reaching the + * readahead limit (but not the buffer pin limit). + * + * This is important for cases where either effective_io_concurrency is + * low or we never need to wait for IO and thus are not increasing the + * distance. Without this we would end up with lots of small IOs. + */ + if (stream->pending_read_nblocks > 0 && + stream->pinned_buffers == 0 && + stream->pending_read_nblocks < stream->combine_distance) + return true; + + /* + * Don't start more readahead if that'd put us over the limit for doing + * readahead. + */ + if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->readahead_distance) + return false; + + return true; +} + + +/* + * We don't start the pending read just because we've hit the distance limit, + * preferring to give it another chance to grow to full io_combine_limit size + * once more buffers have been consumed. But this is not desirable in all + * situations - see below. + */ +static bool +read_stream_should_issue_now(ReadStream *stream) +{ + int16 pending_read_nblocks = stream->pending_read_nblocks; + + /* no IO to issue */ + if (pending_read_nblocks == 0) + return false; + + /* never start more IOs than our cap */ + if (stream->ios_in_progress >= stream->max_ios) + return false; + + /* + * If the callback has signaled end-of-stream, start the read + * immediately. There's no deferring it for later. + */ + if (stream->readahead_distance <= 0) + return true; + + /* + * If we've already reached io_combine_limit, there's no chance of growing + * the read further. + */ + if (pending_read_nblocks >= stream->io_combine_limit) + return true; + + /* same if capped not by io_combine_limit but combine_distance */ + if (stream->combine_distance > 0 && + pending_read_nblocks >= stream->combine_distance) + return true; + + /* + * If we currently have no reads in flight or prepared, issue the IO once + * we stopped looking ahead. This ensures there's always at least one IO + * prepared. + */ + if (stream->pinned_buffers == 0 && + !read_stream_should_look_ahead(stream)) + return true; + + return false; +} + static void read_stream_look_ahead(ReadStream *stream) { @@ -452,14 +562,13 @@ read_stream_look_ahead(ReadStream *stream) if (stream->batch_mode) pgaio_enter_batchmode(); - while (stream->ios_in_progress < stream->max_ios && - stream->pinned_buffers + stream->pending_read_nblocks < stream->distance) + while (read_stream_should_look_ahead(stream)) { BlockNumber blocknum; int16 buffer_index; void *per_buffer_data; - if (stream->pending_read_nblocks == stream->io_combine_limit) + if (read_stream_should_issue_now(stream)) { read_stream_start_pending_read(stream); continue; @@ -479,7 +588,8 @@ read_stream_look_ahead(ReadStream *stream) if (blocknum == InvalidBlockNumber) { /* End of stream. */ - stream->distance = 0; + stream->readahead_distance = 0; + stream->combine_distance = 0; break; } @@ -511,21 +621,13 @@ read_stream_look_ahead(ReadStream *stream) } /* - * We don't start the pending read just because we've hit the distance - * limit, preferring to give it another chance to grow to full - * io_combine_limit size once more buffers have been consumed. However, - * if we've already reached io_combine_limit, or we've reached the - * distance limit and there isn't anything pinned yet, or the callback has - * signaled end-of-stream, we start the read immediately. Note that the - * pending read can exceed the distance goal, if the latter was reduced - * after hitting the per-backend buffer limit. + * Check if the pending read should be issued now, or if we should give it + * another chance to grow to the full size. + * + * Note that the pending read can exceed the distance goal, if the latter + * was reduced after hitting the per-backend buffer limit. */ - if (stream->pending_read_nblocks > 0 && - (stream->pending_read_nblocks == stream->io_combine_limit || - (stream->pending_read_nblocks >= stream->distance && - stream->pinned_buffers == 0) || - stream->distance <= 0) && - stream->ios_in_progress < stream->max_ios) + if (read_stream_should_issue_now(stream)) read_stream_start_pending_read(stream); /* @@ -534,7 +636,7 @@ read_stream_look_ahead(ReadStream *stream) * stream. In the worst case we can always make progress one buffer at a * time. */ - Assert(stream->pinned_buffers > 0 || stream->distance <= 0); + Assert(stream->pinned_buffers > 0 || stream->readahead_distance <= 0); if (stream->batch_mode) pgaio_exit_batchmode(); @@ -724,10 +826,17 @@ read_stream_begin_impl(int flags, * doing full io_combine_limit sized reads. */ if (flags & READ_STREAM_FULL) - stream->distance = Min(max_pinned_buffers, stream->io_combine_limit); + { + stream->readahead_distance = Min(max_pinned_buffers, stream->io_combine_limit); + stream->combine_distance = stream->io_combine_limit; + } else - stream->distance = 1; - stream->resume_distance = stream->distance; + { + stream->readahead_distance = 1; + stream->combine_distance = 1; + } + stream->resume_readahead_distance = stream->readahead_distance; + stream->resume_combine_distance = stream->combine_distance; /* * Since we always access the same relation, we can initialize parts of @@ -826,7 +935,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) Assert(stream->ios_in_progress == 0); Assert(stream->forwarded_buffers == 0); Assert(stream->pinned_buffers == 1); - Assert(stream->distance == 1); + Assert(stream->readahead_distance == 1); Assert(stream->pending_read_nblocks == 0); Assert(stream->per_buffer_data_size == 0); Assert(stream->initialized_buffers > stream->oldest_buffer_index); @@ -900,7 +1009,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) else { /* No more blocks, end of stream. */ - stream->distance = 0; + stream->readahead_distance = 0; stream->oldest_buffer_index = stream->next_buffer_index; stream->pinned_buffers = 0; stream->buffers[oldest_buffer_index] = InvalidBuffer; @@ -916,7 +1025,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) Assert(stream->oldest_buffer_index == stream->next_buffer_index); /* End of stream reached? */ - if (stream->distance <= 0) + if (stream->readahead_distance <= 0) return InvalidBuffer; /* @@ -930,7 +1039,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) /* End of stream reached? */ if (stream->pinned_buffers == 0) { - Assert(stream->distance <= 0); + Assert(stream->readahead_distance <= 0); return InvalidBuffer; } } @@ -951,7 +1060,6 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index) { int16 io_index = stream->oldest_io_index; - int32 distance; /* wider temporary value, clamped below */ bool needed_wait; /* Sanity check that we still agree on the buffers. */ @@ -962,7 +1070,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) * If the stream has been reset, don't even wait for the IO, just * discard it. */ - if (stream->distance < 0) + if (stream->readahead_distance < 0) { if (pgaio_wref_valid(&stream->ios[io_index].op.io_wref) && !stream->ios[io_index].op.foreign_io) @@ -1011,11 +1119,38 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) * the stream, as stream->distance == 0 is used to keep track of * having reached the end. */ - if (stream->distance > 0 && needed_wait) + if (stream->readahead_distance > 0 && needed_wait) { - distance = stream->distance * 2; - distance = Min(distance, stream->max_pinned_buffers); - stream->distance = distance; + /* wider temporary value, due to oveflow risk */ + int32 readahead_distance; + + readahead_distance = stream->readahead_distance * 2; + readahead_distance = Min(readahead_distance, stream->max_pinned_buffers); + stream->readahead_distance = readahead_distance; + } + + /* + * Whether we needed to wait or not, allow for more IO combining if we + * needed to do IO. The reason to do so independent of needing to wait + * is that when the data is resident in the kernel page cache, IO + * combining reduces the syscall / dispatch overhead, making it + * worthwhile regardless of needing to wait. + * + * It is also important with io_uring as it will never signal the need + * to wait for reads if all the data is in the page cache. There are + * heuristics to deal with that in method_io_uring.c, but they only + * work when the IO gets large enough. + */ + if (stream->combine_distance > 0 && + stream->combine_distance < stream->io_combine_limit) + { + /* wider temporary value, due to oveflow risk */ + int32 combine_distance; + + combine_distance = stream->combine_distance * 2; + combine_distance = Min(combine_distance, stream->io_combine_limit); + combine_distance = Min(combine_distance, stream->max_pinned_buffers); + stream->combine_distance = combine_distance; } /* @@ -1094,10 +1229,18 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) #ifndef READ_STREAM_DISABLE_FAST_PATH /* See if we can take the fast path for all-cached scans next time. */ + /* + * FIXME: It's way too easy to wrongly fast path. I'm pretty sure there's + * several pre-existing cases where it triggers because we are not issuing + * additional prefetching (e.g. because of a small + * effective_io_concurrency) and thus stream->pinned_buffers stays at 1 + * after read_stream_look_ahead(). + */ if (stream->ios_in_progress == 0 && stream->forwarded_buffers == 0 && stream->pinned_buffers == 1 && - stream->distance == 1 && + stream->readahead_distance == 1 && + stream->combine_distance == 1 && stream->pending_read_nblocks == 0 && stream->per_buffer_data_size == 0) { @@ -1143,8 +1286,9 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy) BlockNumber read_stream_pause(ReadStream *stream) { - stream->resume_distance = stream->distance; - stream->distance = 0; + stream->resume_readahead_distance = stream->readahead_distance; + stream->resume_combine_distance = stream->combine_distance; + stream->readahead_distance = 0; return InvalidBlockNumber; } @@ -1156,7 +1300,8 @@ read_stream_pause(ReadStream *stream) void read_stream_resume(ReadStream *stream) { - stream->distance = stream->resume_distance; + stream->readahead_distance = stream->resume_readahead_distance; + stream->combine_distance = stream->resume_combine_distance; } /* @@ -1172,7 +1317,8 @@ read_stream_reset(ReadStream *stream) Buffer buffer; /* Stop looking ahead. */ - stream->distance = -1; + stream->readahead_distance = -1; + stream->combine_distance = -1; /* Forget buffered block number and fast path state. */ stream->buffered_blocknum = InvalidBlockNumber; @@ -1204,8 +1350,10 @@ read_stream_reset(ReadStream *stream) Assert(stream->ios_in_progress == 0); /* Start off assuming data is cached. */ - stream->distance = 1; - stream->resume_distance = stream->distance; + stream->readahead_distance = 1; + stream->combine_distance = 1; + stream->resume_readahead_distance = stream->readahead_distance; + stream->resume_combine_distance = stream->combine_distance; stream->distance_decay_holdoff = 0; } -- 2.53.0