From 2df6f67d199cfc0e0aa321830df2ee712057c61a Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Mon, 21 Jul 2025 13:58:47 +1200 Subject: [PATCH 1/3] aio: Refactor read_stream.c end-of-stream state. Previously, stream->distance was set to 0 to indicate that the end of the stream had been reached. In preparation for a later commit that preserves the distance across reset operations, introduce a separate end_of_stream flag instead, so it survives across restart operations. No externally visible effects. Discussion: https://postgr.es/m/CA%2BhUKG%2BWWr4-8TYemyU%3DucQsNe6bUBN_Sq3mCnBoBtxaJ9w3ug%40mail.gmail.com --- src/backend/storage/aio/read_stream.c | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 0e7f5557f5c..5de6f83c253 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -137,6 +137,7 @@ struct ReadStream int16 next_io_index; bool fast_path; + bool end_of_stream; /* Circular queue of buffers. */ int16 oldest_buffer_index; /* Next pinned buffer to return */ @@ -429,6 +430,9 @@ read_stream_look_ahead(ReadStream *stream) continue; } + if (stream->end_of_stream) + break; + /* * See which block the callback wants next in the stream. We need to * compute the index of the Nth block of the pending read including @@ -443,7 +447,7 @@ read_stream_look_ahead(ReadStream *stream) if (blocknum == InvalidBlockNumber) { /* End of stream. */ - stream->distance = 0; + stream->end_of_stream = true; break; } @@ -488,7 +492,7 @@ read_stream_look_ahead(ReadStream *stream) (stream->pending_read_nblocks == stream->io_combine_limit || (stream->pending_read_nblocks >= stream->distance && stream->pinned_buffers == 0) || - stream->distance == 0) && + stream->end_of_stream) && stream->ios_in_progress < stream->max_ios) read_stream_start_pending_read(stream); @@ -498,7 +502,7 @@ read_stream_look_ahead(ReadStream *stream) * stream. In the worst case we can always make progress one buffer at a * time. */ - Assert(stream->pinned_buffers > 0 || stream->distance == 0); + Assert(stream->pinned_buffers > 0 || stream->end_of_stream); if (stream->batch_mode) pgaio_exit_batchmode(); @@ -789,6 +793,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) Assert(stream->forwarded_buffers == 0); Assert(stream->pinned_buffers == 1); Assert(stream->distance == 1); + Assert(stream->end_of_stream == false); Assert(stream->pending_read_nblocks == 0); Assert(stream->per_buffer_data_size == 0); Assert(stream->initialized_buffers > stream->oldest_buffer_index); @@ -841,7 +846,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) else { /* No more blocks, end of stream. */ - stream->distance = 0; + stream->end_of_stream = true; stream->oldest_buffer_index = stream->next_buffer_index; stream->pinned_buffers = 0; stream->buffers[oldest_buffer_index] = InvalidBuffer; @@ -857,7 +862,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) Assert(stream->oldest_buffer_index == stream->next_buffer_index); /* End of stream reached? */ - if (stream->distance == 0) + if (stream->end_of_stream) return InvalidBuffer; /* @@ -871,7 +876,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) /* End of stream reached? */ if (stream->pinned_buffers == 0) { - Assert(stream->distance == 0); + Assert(stream->end_of_stream); return InvalidBuffer; } } @@ -976,6 +981,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->forwarded_buffers == 0 && stream->pinned_buffers == 1 && stream->distance == 1 && + stream->end_of_stream == false && stream->pending_read_nblocks == 0 && stream->per_buffer_data_size == 0) { @@ -1013,7 +1019,7 @@ read_stream_reset(ReadStream *stream) Buffer buffer; /* Stop looking ahead. */ - stream->distance = 0; + stream->end_of_stream = true; /* Forget buffered block number and fast path state. */ stream->buffered_blocknum = InvalidBlockNumber; @@ -1046,6 +1052,7 @@ read_stream_reset(ReadStream *stream) /* Start off assuming data is cached. */ stream->distance = 1; + stream->end_of_stream = false; } /* -- 2.39.5 (Apple Git-154)