From c643190d1faaca87d9e8917252af9c87686b7fa3 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Thu, 19 Mar 2026 23:06:58 -0400 Subject: [PATCH v19 15/18] Hacky implementation of making read_stream_reset()/end() not wait for IO Not waiting for IO during read_stream_reset() can be important for performance in cases where read streams are frequently reset before the end is reached. Current users do not commonly do that, but the upcoming work to use a read stream to prefetch table blocks as part of index scans can do so frequently in some query patterns. E.g. if there is an index scan on the inner side of a nested loop. FIXME: This implementation is problematic though, as there is nothing forcing the discarded IOs to ever be completed if the backend goes idle. That's at the very least problematic because it leads to the underlying buffers continuing to be pinned and the IOs showing up in the pg_aios view. Author: Reviewed-by: Discussion: https://postgr.es/m/ Backpatch: --- src/include/storage/aio.h | 1 + src/backend/storage/aio/aio.c | 27 ++++++++++++++++++++++ src/backend/storage/aio/read_stream.c | 32 ++++++++++++++++++++++----- 3 files changed, 54 insertions(+), 6 deletions(-) diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h index ec543b784..c184e97a9 100644 --- a/src/include/storage/aio.h +++ b/src/include/storage/aio.h @@ -328,6 +328,7 @@ extern int pgaio_wref_get_id(PgAioWaitRef *iow); extern void pgaio_wref_wait(PgAioWaitRef *iow); extern bool pgaio_wref_check_done(PgAioWaitRef *iow); +extern void pgaio_wref_discard_result(PgAioWaitRef *iow); /* -------------------------------------------------------------------------------- diff --git a/src/backend/storage/aio/aio.c b/src/backend/storage/aio/aio.c index 4e742038d..49eb677ad 100644 --- a/src/backend/storage/aio/aio.c +++ b/src/backend/storage/aio/aio.c @@ -1055,6 +1055,33 @@ pgaio_wref_check_done(PgAioWaitRef *iow) return false; } +void +pgaio_wref_discard_result(PgAioWaitRef *iow) +{ + uint64 ref_generation; + bool am_owner; + PgAioHandle *ioh; + PgAioHandleState state; + + ioh = pgaio_io_from_wref(iow, &ref_generation); + + am_owner = ioh->owner_procno == MyProcNumber; + + if (!am_owner) + elog(ERROR, "not you"); + + if (pgaio_io_was_recycled(ioh, ref_generation, &state)) + return; + + pgaio_debug_io(DEBUG2, ioh, + "discarding result %p", + ioh->report_return); + + if (ioh->resowner) + pgaio_io_release_resowner(&ioh->resowner_node, false); +} + + /* -------------------------------------------------------------------------------- diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 49971833d..144b3613c 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -524,7 +524,7 @@ read_stream_look_ahead(ReadStream *stream) (stream->pending_read_nblocks == stream->io_combine_limit || (stream->pending_read_nblocks >= stream->distance && stream->pinned_buffers == 0) || - stream->distance == 0) && + stream->distance <= 0) && stream->ios_in_progress < stream->max_ios) read_stream_start_pending_read(stream); @@ -534,7 +534,7 @@ read_stream_look_ahead(ReadStream *stream) * stream. In the worst case we can always make progress one buffer at a * time. */ - Assert(stream->pinned_buffers > 0 || stream->distance == 0); + Assert(stream->pinned_buffers > 0 || stream->distance <= 0); if (stream->batch_mode) pgaio_exit_batchmode(); @@ -916,7 +916,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) Assert(stream->oldest_buffer_index == stream->next_buffer_index); /* End of stream reached? */ - if (stream->distance == 0) + if (stream->distance <= 0) return InvalidBuffer; /* @@ -930,7 +930,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) /* End of stream reached? */ if (stream->pinned_buffers == 0) { - Assert(stream->distance == 0); + Assert(stream->distance <= 0); return InvalidBuffer; } } @@ -958,7 +958,27 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) Assert(stream->ios[io_index].op.buffers == &stream->buffers[oldest_buffer_index]); - needed_wait = WaitReadBuffers(&stream->ios[io_index].op); + /* + * If the stream has been reset, don't even wait for the IO, just + * discard it. + */ + if (stream->distance < 0) + { + if (pgaio_wref_valid(&stream->ios[io_index].op.io_wref) && + !stream->ios[io_index].op.foreign_io) + { + pgaio_wref_discard_result(&stream->ios[io_index].op.io_wref); + pgaio_wref_clear(&stream->ios[io_index].op.io_wref); + } + else + WaitReadBuffers(&stream->ios[io_index].op); + + needed_wait = false; + } + else + { + needed_wait = WaitReadBuffers(&stream->ios[io_index].op); + } Assert(stream->ios_in_progress > 0); stream->ios_in_progress--; @@ -1152,7 +1172,7 @@ read_stream_reset(ReadStream *stream) Buffer buffer; /* Stop looking ahead. */ - stream->distance = 0; + stream->distance = -1; /* Forget buffered block number and fast path state. */ stream->buffered_blocknum = InvalidBlockNumber; -- 2.53.0