From d572c9f86aee9fdebd91ec7f17662441e9d1e4c8 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Thu, 19 Mar 2026 23:06:58 -0400 Subject: [PATCH v17 18/18] Dirty hack to make read_stream_reset()/end() not wait for IO Author: Reviewed-by: Discussion: https://postgr.es/m/ Backpatch: --- src/include/storage/aio.h | 1 + src/backend/storage/aio/aio.c | 27 ++++++++++++++++++++++ src/backend/storage/aio/read_stream.c | 32 ++++++++++++++++++++++----- 3 files changed, 54 insertions(+), 6 deletions(-) diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h index ec543b784..c184e97a9 100644 --- a/src/include/storage/aio.h +++ b/src/include/storage/aio.h @@ -328,6 +328,7 @@ extern int pgaio_wref_get_id(PgAioWaitRef *iow); extern void pgaio_wref_wait(PgAioWaitRef *iow); extern bool pgaio_wref_check_done(PgAioWaitRef *iow); +extern void pgaio_wref_discard_result(PgAioWaitRef *iow); /* -------------------------------------------------------------------------------- diff --git a/src/backend/storage/aio/aio.c b/src/backend/storage/aio/aio.c index 4e742038d..49eb677ad 100644 --- a/src/backend/storage/aio/aio.c +++ b/src/backend/storage/aio/aio.c @@ -1055,6 +1055,33 @@ pgaio_wref_check_done(PgAioWaitRef *iow) return false; } +void +pgaio_wref_discard_result(PgAioWaitRef *iow) +{ + uint64 ref_generation; + bool am_owner; + PgAioHandle *ioh; + PgAioHandleState state; + + ioh = pgaio_io_from_wref(iow, &ref_generation); + + am_owner = ioh->owner_procno == MyProcNumber; + + if (!am_owner) + elog(ERROR, "not you"); + + if (pgaio_io_was_recycled(ioh, ref_generation, &state)) + return; + + pgaio_debug_io(DEBUG2, ioh, + "discarding result %p", + ioh->report_return); + + if (ioh->resowner) + pgaio_io_release_resowner(&ioh->resowner_node, false); +} + + /* -------------------------------------------------------------------------------- diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index e3c16bd17..d30cda868 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -524,7 +524,7 @@ read_stream_look_ahead(ReadStream *stream) (stream->pending_read_nblocks == stream->io_combine_limit || (stream->pending_read_nblocks >= stream->distance && stream->pinned_buffers == 0) || - stream->distance == 0) && + stream->distance <= 0) && stream->ios_in_progress < stream->max_ios) read_stream_start_pending_read(stream); @@ -534,7 +534,7 @@ read_stream_look_ahead(ReadStream *stream) * stream. In the worst case we can always make progress one buffer at a * time. */ - Assert(stream->pinned_buffers > 0 || stream->distance == 0); + Assert(stream->pinned_buffers > 0 || stream->distance <= 0); if (stream->batch_mode) pgaio_exit_batchmode(); @@ -910,7 +910,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) Assert(stream->oldest_buffer_index == stream->next_buffer_index); /* End of stream reached? */ - if (stream->distance == 0) + if (stream->distance <= 0) return InvalidBuffer; /* @@ -924,7 +924,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) /* End of stream reached? */ if (stream->pinned_buffers == 0) { - Assert(stream->distance == 0); + Assert(stream->distance <= 0); return InvalidBuffer; } } @@ -952,7 +952,27 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) Assert(stream->ios[io_index].op.buffers == &stream->buffers[oldest_buffer_index]); - needed_wait = WaitReadBuffers(&stream->ios[io_index].op); + /* + * If the stream has been reset, don't even wait for the IO, just + * discard it. + */ + if (stream->distance < 0) + { + if (pgaio_wref_valid(&stream->ios[io_index].op.io_wref) && + !stream->ios[io_index].op.foreign_io) + { + pgaio_wref_discard_result(&stream->ios[io_index].op.io_wref); + pgaio_wref_clear(&stream->ios[io_index].op.io_wref); + } + else + WaitReadBuffers(&stream->ios[io_index].op); + + needed_wait = false; + } + else + { + needed_wait = WaitReadBuffers(&stream->ios[io_index].op); + } Assert(stream->ios_in_progress > 0); stream->ios_in_progress--; @@ -1129,7 +1149,7 @@ read_stream_reset(ReadStream *stream) Buffer buffer; /* Stop looking ahead. */ - stream->distance = 0; + stream->distance = -1; /* Forget buffered block number and fast path state. */ stream->buffered_blocknum = InvalidBlockNumber; -- 2.53.0