From cf41f56ba0d8c7f3bf92242020f742f201cf08d6 Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Tue, 18 Nov 2025 14:24:12 -0500 Subject: [PATCH] resume --- src/backend/storage/aio/read_stream.c | 19 +++++++++++++++++++ src/include/storage/read_stream.h | 1 + 2 files changed, 20 insertions(+) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 031fde9f4cb..75bf92dc683 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -100,6 +100,7 @@ struct ReadStream int16 pinned_buffers; int16 distance; int16 initialized_buffers; + int16 resume_distance; int read_buffers_flags; bool sync_mode; /* using io_method=sync */ bool batch_mode; /* READ_STREAM_USE_BATCHING */ @@ -464,6 +465,7 @@ read_stream_look_ahead(ReadStream *stream) if (blocknum == InvalidBlockNumber) { /* End of stream. */ + stream->resume_distance = stream->distance; stream->distance = 0; break; } @@ -711,6 +713,7 @@ read_stream_begin_impl(int flags, stream->distance = Min(max_pinned_buffers, stream->io_combine_limit); else stream->distance = 1; + stream->resume_distance = stream->distance; /* * Since we always access the same relation, we can initialize parts of @@ -862,6 +865,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) else { /* No more blocks, end of stream. */ + stream->resume_distance = stream->distance; stream->distance = 0; stream->oldest_buffer_index = stream->next_buffer_index; stream->pinned_buffers = 0; @@ -1034,6 +1038,19 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy) return read_stream_get_block(stream, NULL); } +/* + * Resume looking ahead after the block number callback reported end-of-stream. + * This is useful for streams of self-referential blocks, after a buffer needed + * to be consumed and examined to find more block numbers. + */ +void +read_stream_resume(ReadStream *stream) +{ + if (stream->pinned_buffers > 0) + elog(ERROR, "read stream must be exhausted before resuming"); + stream->distance = stream->resume_distance; +} + /* * Reset a read stream by releasing any queued up buffers, allowing the stream * to be used again for different blocks. This can be used to clear an @@ -1047,6 +1064,8 @@ read_stream_reset(ReadStream *stream) Buffer buffer; /* Stop looking ahead. */ + if (stream->distance > 0) + stream->resume_distance = stream->distance; stream->distance = 0; /* Forget buffered block number and fast path state. */ diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h index 9b0d65161d0..e29ac50fc9e 100644 --- a/src/include/storage/read_stream.h +++ b/src/include/storage/read_stream.h @@ -99,6 +99,7 @@ extern ReadStream *read_stream_begin_smgr_relation(int flags, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size); +extern void read_stream_resume(ReadStream *stream); extern void read_stream_reset(ReadStream *stream); extern void read_stream_end(ReadStream *stream); -- 2.47.3