From 39ac7acb4944758ae9c1c243e9da09f99dafade0 Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Tue, 18 Mar 2025 14:40:06 -0400 Subject: [PATCH v2.11 10/27] Support buffer forwarding in read_stream.c. In preparation for a following change to the buffer manager, teach streams to keep track of buffers that were "forwarded" from one call to StartReadBuffers() to the next. Since StartReadBuffers()'s buffers argument will become an in/out argument, we need to initialize the buffer queue entries with InvalidBuffer. We don't want to do that up front, because we try to keep stream initialization cheap for code that uses the fast path and stays in one single buffer queue element. Satisfy both goals by initializing the queue incrementally on the first cycle. Discussion: https://postgr.es/m/CA%2BhUKGK_%3D4CVmMHvsHjOVrK6t4F%3DLBpFzsrr3R%2BaJYN8kcTfWg%40mail.gmail.com --- src/backend/storage/aio/read_stream.c | 103 +++++++++++++++++++++++--- 1 file changed, 93 insertions(+), 10 deletions(-) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 45bdf819d57..18ecf4affc7 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -95,8 +95,10 @@ struct ReadStream int16 ios_in_progress; int16 queue_size; int16 max_pinned_buffers; + int16 forwarded_buffers; int16 pinned_buffers; int16 distance; + int16 initialized_buffers; bool advice_enabled; bool temporary; @@ -224,8 +226,10 @@ static bool read_stream_start_pending_read(ReadStream *stream) { bool need_wait; + int requested_nblocks; int nblocks; int flags; + int forwarded; int16 io_index; int16 overflow; int16 buffer_index; @@ -272,11 +276,20 @@ read_stream_start_pending_read(ReadStream *stream) } } - /* How many more buffers is this backend allowed? */ + /* + * How many more buffers is this backend allowed? + * + * If we already have some forwarded buffers, we can certainly use those. 
+ * They are already pinned, and are mapped to the starting blocks of the + * pending read, they just don't have any I/O started yet and are not + * counted in stream->pinned_buffers. + */ if (stream->temporary) buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX); else buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX); + Assert(stream->forwarded_buffers <= stream->pending_read_nblocks); + buffer_limit += stream->forwarded_buffers; if (buffer_limit == 0 && stream->pinned_buffers == 0) buffer_limit = 1; /* guarantee progress */ @@ -303,8 +316,31 @@ read_stream_start_pending_read(ReadStream *stream) * We say how many blocks we want to read, but it may be smaller on return * if the buffer manager decides to shorten the read. */ + requested_nblocks = Min(buffer_limit, stream->pending_read_nblocks); + nblocks = requested_nblocks; buffer_index = stream->next_buffer_index; io_index = stream->next_io_index; + + /* + * The first time around the queue we initialize it as we go, including + * the overflow zone, because otherwise the entries would appear as + * forwarded buffers. This avoids initializing the whole queue up front + * in cases where it is large but we don't ever use it due to the + * all-cached fast path or small scans. + */ + while (stream->initialized_buffers < buffer_index + nblocks) + stream->buffers[stream->initialized_buffers++] = InvalidBuffer; + + /* + * Start the I/O. Any buffers that are not InvalidBuffer will be + * interpreted as already pinned, forwarded by an earlier call to + * StartReadBuffers(), and must map to the expected blocks. The nblocks + * value may be smaller on return indicating the size of the I/O that + * could be started. Buffers beyond the output nblocks number may also + * have been pinned without starting I/O due to various edge cases. In + * that case we'll just leave them in the queue ahead of us, "forwarded" + * to the next call, avoiding the need to unpin/repin. 
+ */ need_wait = StartReadBuffers(&stream->ios[io_index].op, &stream->buffers[buffer_index], stream->pending_read_blocknum, @@ -333,16 +369,35 @@ read_stream_start_pending_read(ReadStream *stream) stream->seq_blocknum = stream->pending_read_blocknum + nblocks; } + /* + * How many pins were acquired but forwarded to the next call? These need + * to be passed to the next StartReadBuffers() call, or released if the + * stream ends early. We need the number for accounting purposes, since + * they are not counted in stream->pinned_buffers but we already hold + * them. + */ + forwarded = 0; + while (nblocks + forwarded < requested_nblocks && + stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer) + forwarded++; + stream->forwarded_buffers = forwarded; + /* * We gave a contiguous range of buffer space to StartReadBuffers(), but - * we want it to wrap around at queue_size. Slide overflowing buffers to - * the front of the array. + * we want it to wrap around at queue_size. Copy overflowing buffers to + * the front of the array where they'll be consumed, but also leave a copy + * in the overflow zone which the I/O operation has a pointer to (it needs + * a contiguous array). Both copies will be cleared when the buffers are + * handed to the consumer. */ - overflow = (buffer_index + nblocks) - stream->queue_size; + overflow = (buffer_index + nblocks + forwarded) - stream->queue_size; if (overflow > 0) - memmove(&stream->buffers[0], - &stream->buffers[stream->queue_size], - sizeof(stream->buffers[0]) * overflow); + { + Assert(overflow < stream->queue_size); /* can't overlap */ + memcpy(&stream->buffers[0], + &stream->buffers[stream->queue_size], + sizeof(stream->buffers[0]) * overflow); + } /* Compute location of start of next read, without using % operator. */ buffer_index += nblocks; @@ -719,10 +774,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) /* Fast path assumptions. 
*/ Assert(stream->ios_in_progress == 0); + Assert(stream->forwarded_buffers == 0); Assert(stream->pinned_buffers == 1); Assert(stream->distance == 1); Assert(stream->pending_read_nblocks == 0); Assert(stream->per_buffer_data_size == 0); + Assert(stream->initialized_buffers > stream->oldest_buffer_index); /* We're going to return the buffer we pinned last time. */ oldest_buffer_index = stream->oldest_buffer_index; @@ -771,6 +828,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->distance = 0; stream->oldest_buffer_index = stream->next_buffer_index; stream->pinned_buffers = 0; + stream->buffers[oldest_buffer_index] = InvalidBuffer; } stream->fast_path = false; @@ -846,10 +904,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->seq_until_processed = InvalidBlockNumber; } -#ifdef CLOBBER_FREED_MEMORY - /* Clobber old buffer for debugging purposes. */ + /* + * We must zap this queue entry, or else it would appear as a forwarded + * buffer. If it's potentially in the overflow zone (ie it wrapped around + * the queue), also zap that copy. + */ stream->buffers[oldest_buffer_index] = InvalidBuffer; -#endif + if (oldest_buffer_index < io_combine_limit - 1) + stream->buffers[stream->queue_size + oldest_buffer_index] = + InvalidBuffer; #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND) @@ -894,6 +957,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) #ifndef READ_STREAM_DISABLE_FAST_PATH /* See if we can take the fast path for all-cached scans next time. */ if (stream->ios_in_progress == 0 && + stream->forwarded_buffers == 0 && stream->pinned_buffers == 1 && stream->distance == 1 && stream->pending_read_nblocks == 0 && @@ -929,6 +993,7 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy) void read_stream_reset(ReadStream *stream) { + int16 index; Buffer buffer; /* Stop looking ahead. 
*/ @@ -942,6 +1007,24 @@ read_stream_reset(ReadStream *stream) while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer) ReleaseBuffer(buffer); + /* Unpin any unused forwarded buffers. */ + index = stream->next_buffer_index; + while (index < stream->initialized_buffers && + (buffer = stream->buffers[index]) != InvalidBuffer) + { + Assert(stream->forwarded_buffers > 0); + stream->forwarded_buffers--; + ReleaseBuffer(buffer); + + stream->buffers[index] = InvalidBuffer; + if (index < io_combine_limit - 1) + stream->buffers[stream->queue_size + index] = InvalidBuffer; + + if (++index == stream->queue_size) + index = 0; + } + + Assert(stream->forwarded_buffers == 0); Assert(stream->pinned_buffers == 0); Assert(stream->ios_in_progress == 0); -- 2.48.1.76.g4e746b1a31.dirty