From ac2dd3de67deef67bc15207944e72263c5625905 Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Wed, 6 Aug 2025 16:23:13 +1200 Subject: [PATCH 1/2] Experiment: Peeking ahead in I/O streams. Allow consumers of self-referential block streams to install a callback that gets a chance to "peek" at each block as soon as possible. Consumers of self-referential block streams can use this to feed navigational information to the block number callback much earlier in the pipeline while they are still consuming older blocks. This can't help if there is an IO stall in the way of progress at every jump, but some mixed hit/miss patterns may be able to generate some extra concurrency this way. XXX highly experimental idea exploration... --- src/backend/storage/aio/read_stream.c | 74 ++++++++++++++++++++++++--- src/include/storage/bufmgr.h | 13 ++++- src/include/storage/read_stream.h | 8 +++ 3 files changed, 87 insertions(+), 8 deletions(-) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 0e7f5557f5c..2c64ce9bbd1 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -114,9 +114,12 @@ struct ReadStream /* * The callback that will tell us which block numbers to read, and an - * opaque pointer that will be pass to it for its own purposes. + * opaque pointer that will be pass to it for its own purposes. Optional + * peek_callback will have a chance to examine data in buffers some time + * before they are returned. */ ReadStreamBlockNumberCB callback; + ReadStreamPeekCB peek_callback; void *callback_private_data; /* Next expected block, for detecting sequential access. */ @@ -140,6 +143,7 @@ struct ReadStream /* Circular queue of buffers. 
*/ int16 oldest_buffer_index; /* Next pinned buffer to return */ + int16 peek_buffer_index; /* Next pinned buffer to peek at */ int16 next_buffer_index; /* Index of next buffer to pin */ Buffer buffers[FLEXIBLE_ARRAY_MEMBER]; }; @@ -216,6 +220,26 @@ read_stream_unget_block(ReadStream *stream, BlockNumber blocknum) stream->buffered_blocknum = blocknum; } +/* + * Pass as many pinned and valid buffers as we can to peek_callback. + */ +static void +read_stream_peek(ReadStream *stream) +{ + while (stream->peek_buffer_index != stream->next_buffer_index && + !(stream->ios_in_progress > 0 && + stream->peek_buffer_index == + stream->ios[stream->oldest_io_index].buffer_index)) + { + stream->peek_callback(stream, + stream->callback_private_data, + get_per_buffer_data(stream, stream->peek_buffer_index), + stream->buffers[stream->peek_buffer_index]); + if (++stream->peek_buffer_index == stream->queue_size) + stream->peek_buffer_index = 0; + } +} + /* * Start as much of the current pending read as we can. If we have to split it * because of the per-backend buffer limit, or the buffer manager decides to @@ -429,6 +453,14 @@ read_stream_look_ahead(ReadStream *stream) continue; } + /* + * New navigation information might be available if we got a cache hit + * in a self-referential stream, so invoke peek_callback before every + * block number callback. + */ + if (stream->peek_callback) + read_stream_peek(stream); + /* * See which block the callback wants next in the stream. We need to * compute the index of the Nth block of the pending read including @@ -791,6 +823,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) Assert(stream->distance == 1); Assert(stream->pending_read_nblocks == 0); Assert(stream->per_buffer_data_size == 0); + Assert(stream->peek_callback == NULL); Assert(stream->initialized_buffers > stream->oldest_buffer_index); /* We're going to return the buffer we pinned last time. 
*/ @@ -888,15 +921,24 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) Assert(BufferIsValid(buffer)); /* Do we have to wait for an associated I/O first? */ - if (stream->ios_in_progress > 0 && - stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index) + while (stream->ios_in_progress > 0) { int16 io_index = stream->oldest_io_index; int32 distance; /* wider temporary value, clamped below */ - /* Sanity check that we still agree on the buffers. */ - Assert(stream->ios[io_index].op.buffers == - &stream->buffers[oldest_buffer_index]); + /* + * If the oldest IO covers the buffer we're returning, we have to wait + * for it. Otherwise, process as many as we can opportunistically + * without stalling. + * + * XXX This works for io_method=worker because its IOs' progress is + * autonomous. For io_method=io_uring it doesn't because this backend + * has to drain the completion queue. In theory you could check with + * a cheap load if the cqe has data. + */ + if (stream->ios[stream->oldest_io_index].buffer_index != oldest_buffer_index && + WaitReadBuffersMightStall(&stream->ios[io_index].op)) + break; WaitReadBuffers(&stream->ios[io_index].op); @@ -920,6 +962,13 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->seq_until_processed = InvalidBlockNumber; } + /* + * Invoke peek_callback so it can see the oldest buffer, before we zap it. + * XXX Could reorder things and not need this + */ + if (stream->peek_callback) + read_stream_peek(stream); + /* * We must zap this queue entry, or else it would appear as a forwarded * buffer. 
If it's potentially in the overflow zone (ie from a @@ -977,7 +1026,8 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->pinned_buffers == 1 && stream->distance == 1 && stream->pending_read_nblocks == 0 && - stream->per_buffer_data_size == 0) + stream->per_buffer_data_size == 0 && + stream->peek_callback == NULL) { stream->fast_path = true; } @@ -1057,3 +1107,13 @@ read_stream_end(ReadStream *stream) read_stream_reset(stream); pfree(stream); } + +/* + * Install a callback that will have a chance to examine buffers as soon as + * possible. + */ +void +read_stream_set_peek_callback(ReadStream *stream, ReadStreamPeekCB peek_callback) +{ + stream->peek_callback = peek_callback; +} diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 41fdc1e7693..4ea522e0d4a 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -15,7 +15,7 @@ #define BUFMGR_H #include "port/pg_iovec.h" -#include "storage/aio_types.h" +#include "storage/aio.h" #include "storage/block.h" #include "storage/buf.h" #include "storage/bufpage.h" @@ -228,6 +228,17 @@ extern bool StartReadBuffers(ReadBuffersOperation *operation, int flags); extern void WaitReadBuffers(ReadBuffersOperation *operation); +/* + * Return true if a ReadBuffersOperation is not known to be physically + * completed already. 
+ */ +static inline bool +WaitReadBuffersMightStall(ReadBuffersOperation *operation) +{ + return !pgaio_wref_valid(&operation->io_wref) || + !pgaio_wref_check_done(&operation->io_wref); +} + extern void ReleaseBuffer(Buffer buffer); extern void UnlockReleaseBuffer(Buffer buffer); extern bool BufferIsExclusiveLocked(Buffer buffer); diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h index 9b0d65161d0..c735bb0c6ab 100644 --- a/src/include/storage/read_stream.h +++ b/src/include/storage/read_stream.h @@ -78,6 +78,12 @@ typedef BlockNumber (*ReadStreamBlockNumberCB) (ReadStream *stream, void *callback_private_data, void *per_buffer_data); +/* Callback that has a chance to peek at buffers when they are ready. */ +typedef void (*ReadStreamPeekCB) (ReadStream *stream, + void *callback_private_data, + void *per_buffer_data, + Buffer buffer); + extern BlockNumber block_range_read_stream_cb(ReadStream *stream, void *callback_private_data, void *per_buffer_data); @@ -99,6 +105,8 @@ extern ReadStream *read_stream_begin_smgr_relation(int flags, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size); +extern void read_stream_set_peek_callback(ReadStream *stream, + ReadStreamPeekCB peek_callback); extern void read_stream_reset(ReadStream *stream); extern void read_stream_end(ReadStream *stream); -- 2.39.5 (Apple Git-154)