From fffe68285db092c14a15cd6f1c0bd4e932c7527b Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Thu, 30 Jan 2025 11:42:03 +1300 Subject: [PATCH v2.5 13/30] Support buffer forwarding in read_stream.c. In preparation for a following change to the buffer manager, teach read stream to keep track of buffers that were "forwarded" from one call to StartReadBuffers() to the next. Since StartReadBuffers() buffers argument will become an in/out argument, we need to initialize the buffer queue entries with InvalidBuffer. We don't want to do that up front, because we try to keep stream initialization cheap and code that uses the fast path stays in one single buffer queue element. Satisfy both goals by initializing the queue incrementally on the first cycle. --- src/backend/storage/aio/read_stream.c | 108 ++++++++++++++++++++++---- 1 file changed, 94 insertions(+), 14 deletions(-) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 6a39bd1d92d..e58e0edf221 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -112,8 +112,10 @@ struct ReadStream int16 ios_in_progress; int16 queue_size; int16 max_pinned_buffers; + int16 forwarded_buffers; int16 pinned_buffers; int16 distance; + int16 initialized_buffers; bool advice_enabled; bool temporary; @@ -280,7 +282,9 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) { bool need_wait; + int requested_nblocks; int nblocks; + int forwarded; int flags; int16 io_index; int16 overflow; @@ -312,13 +316,34 @@ read_stream_start_pending_read(ReadStream *stream, flags = 0; /* - * We say how many blocks we want to read, but may be smaller on return. - * On memory-constrained systems we may be also have to ask for a smaller - * read ourselves. + * On buffer-constrained systems we may need to limit the I/O size by the + * available pin count. */ + requested_nblocks = Min(buffer_limit, stream->pending_read_nblocks); + nblocks = requested_nblocks; buffer_index = stream->next_buffer_index; io_index = stream->next_io_index; - nblocks = Min(buffer_limit, stream->pending_read_nblocks); + + /* + * The first time around the queue we initialize it as we go, including + * the overflow zone, because otherwise the entries would appear as + * forwarded buffers. This avoids initializing the whole queue up front + * in cases where it is large but we don't ever use it due to the + * all-cached fast path or small scans. + */ + while (stream->initialized_buffers < buffer_index + nblocks) + stream->buffers[stream->initialized_buffers++] = InvalidBuffer; + + /* + * Start the I/O. Any buffers that are not InvalidBuffer will be + * interpreted as already pinned, forwarded by an earlier call to + * StartReadBuffers(), and must map to the expected blocks. The nblocks + * value may be smaller on return indicating the size of the I/O that + * could be started. Buffers beyond the output nblocks number may also + * have been pinned without starting I/O due to various edge cases. In + * that case we'll just leave them in the queue ahead of us, "forwarded" + * to the next call, avoiding the need to unpin/repin. + */ need_wait = StartReadBuffers(&stream->ios[io_index].op, &stream->buffers[buffer_index], stream->pending_read_blocknum, @@ -347,16 +372,35 @@ read_stream_start_pending_read(ReadStream *stream, stream->seq_blocknum = stream->pending_read_blocknum + nblocks; } + /* + * How many pins were acquired but forwarded to the next call? These need + * to be passed to the next StartReadBuffers() call, or released if the + * stream ends early. We need the number for accounting purposes, since + * they are not counted in stream->pinned_buffers but we already hold + * them. + */ + forwarded = 0; + while (nblocks + forwarded < requested_nblocks && + stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer) + forwarded++; + stream->forwarded_buffers = forwarded; + /* * We gave a contiguous range of buffer space to StartReadBuffers(), but - * we want it to wrap around at queue_size. Slide overflowing buffers to - * the front of the array. + * we want it to wrap around at queue_size. Copy overflowing buffers to + * the front of the array where they'll be consumed, but also leave a copy + * in the overflow zone which the I/O operation has a pointer to (it needs + * a contiguous array). Both copies will be cleared when the buffers are + * handed to the consumer. */ - overflow = (buffer_index + nblocks) - stream->queue_size; + overflow = (buffer_index + nblocks + forwarded) - stream->queue_size; if (overflow > 0) - memmove(&stream->buffers[0], - &stream->buffers[stream->queue_size], - sizeof(stream->buffers[0]) * overflow); + { + Assert(overflow < stream->queue_size); /* can't overlap */ + memcpy(&stream->buffers[0], + &stream->buffers[stream->queue_size], + sizeof(stream->buffers[0]) * overflow); + } /* Move to the location of start of next read. */ read_stream_index_advance_n(stream, &buffer_index, nblocks); @@ -381,6 +425,15 @@ read_stream_get_buffer_limit(ReadStream *stream) else buffers = GetAdditionalPinLimit(); + /* + * If we already have some forwarded buffers, we can certainly use those. + * They are already pinned, and are mapped to the starting blocks of the + * pending read, they just don't have any I/O started yet and are not + * counted in stream->pinned_buffers. + */ + Assert(stream->forwarded_buffers <= stream->pending_read_nblocks); + buffers += stream->forwarded_buffers; + /* * Each stream is always allowed to try to acquire one pin if it doesn't * hold one already. This is needed to guarantee progress, and just like @@ -389,7 +442,7 @@ read_stream_get_buffer_limit(ReadStream *stream) * pin, ie the buffer pool is simply too small for the workload. */ if (buffers == 0 && stream->pinned_buffers == 0) - return 1; + buffers = 1; /* * Otherwise, see how many additional pins the backend can currently pin, @@ -755,10 +808,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) /* Fast path assumptions. */ Assert(stream->ios_in_progress == 0); + Assert(stream->forwarded_buffers == 0); Assert(stream->pinned_buffers == 1); Assert(stream->distance == 1); Assert(stream->pending_read_nblocks == 0); Assert(stream->per_buffer_data_size == 0); + Assert(stream->initialized_buffers > stream->oldest_buffer_index); /* We're going to return the buffer we pinned last time. */ oldest_buffer_index = stream->oldest_buffer_index; @@ -807,6 +862,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->distance = 0; stream->oldest_buffer_index = stream->next_buffer_index; stream->pinned_buffers = 0; + stream->buffers[oldest_buffer_index] = InvalidBuffer; } stream->fast_path = false; @@ -891,10 +947,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) } } -#ifdef CLOBBER_FREED_MEMORY - /* Clobber old buffer for debugging purposes. */ + /* + * We must zap this queue entry, or else it would appear as a forwarded + * buffer. If it's potentially in the overflow zone (ie it wrapped around + * the queue), also zap that copy. + */ stream->buffers[oldest_buffer_index] = InvalidBuffer; -#endif + if (oldest_buffer_index < io_combine_limit - 1) + stream->buffers[stream->queue_size + oldest_buffer_index] = + InvalidBuffer; #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND) @@ -937,6 +998,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) #ifndef READ_STREAM_DISABLE_FAST_PATH /* See if we can take the fast path for all-cached scans next time. */ if (stream->ios_in_progress == 0 && + stream->forwarded_buffers == 0 && stream->pinned_buffers == 1 && stream->distance == 1 && stream->pending_read_nblocks == 0 && @@ -972,6 +1034,7 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy) void read_stream_reset(ReadStream *stream) { + int16 index; Buffer buffer; /* Stop looking ahead. */ @@ -985,6 +1048,23 @@ read_stream_reset(ReadStream *stream) while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer) ReleaseBuffer(buffer); + /* Unpin any unused forwarded buffers. */ + index = stream->next_buffer_index; + while (index < stream->initialized_buffers && + (buffer = stream->buffers[index]) != InvalidBuffer) + { + Assert(stream->forwarded_buffers > 0); + stream->forwarded_buffers--; + ReleaseBuffer(buffer); + + stream->buffers[index] = InvalidBuffer; + if (index < io_combine_limit - 1) + stream->buffers[stream->queue_size + index] = InvalidBuffer; + + read_stream_index_advance(stream, &index); + } + + Assert(stream->forwarded_buffers == 0); Assert(stream->pinned_buffers == 0); Assert(stream->ios_in_progress == 0); -- 2.48.1.76.g4e746b1a31.dirty