From 89ac3d5aae4f36c3c536dcd6b198e23b9f9e2024 Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Fri, 28 Jul 2023 22:35:32 +1200 Subject: [PATCH v1 06/14] Provide basic streaming read API. "Streaming reads" can be used as a more efficient replacement for ReadBuffer() calls. The client code supplies a callback that can say which block to read next, and then consumes individual buffers one at a time. This division allows the PgStreamingRead object to build larger calls to CompleteReadBuffers(), which in turn produce large preadv() calls instead of traditional single-block pread() calls. Unless the read is purely sequential, posix_fadvise() calls are also issued. This API is intended as a stepping stone, allowing for true asynchronous implementation in later work. Code that adapts to the streaming read API would automatically benefit. Author: Thomas Munro --- src/backend/storage/Makefile | 2 +- src/backend/storage/aio/Makefile | 14 + src/backend/storage/aio/meson.build | 5 + src/backend/storage/aio/streaming_read.c | 436 +++++++++++++++++++++++ src/backend/storage/buffer/bufmgr.c | 2 +- src/backend/storage/meson.build | 1 + src/include/storage/bufmgr.h | 1 + src/include/storage/streaming_read.h | 38 ++ src/tools/pgindent/typedefs.list | 2 + 9 files changed, 499 insertions(+), 2 deletions(-) create mode 100644 src/backend/storage/aio/Makefile create mode 100644 src/backend/storage/aio/meson.build create mode 100644 src/backend/storage/aio/streaming_read.c create mode 100644 src/include/storage/streaming_read.h diff --git a/src/backend/storage/Makefile b/src/backend/storage/Makefile index 8376cdfca2..eec03f6f2b 100644 --- a/src/backend/storage/Makefile +++ b/src/backend/storage/Makefile @@ -8,6 +8,6 @@ subdir = src/backend/storage top_builddir = ../../.. 
include $(top_builddir)/src/Makefile.global -SUBDIRS = buffer file freespace ipc large_object lmgr page smgr sync +SUBDIRS = aio buffer file freespace ipc large_object lmgr page smgr sync include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile new file mode 100644 index 0000000000..bcab44c802 --- /dev/null +++ b/src/backend/storage/aio/Makefile @@ -0,0 +1,14 @@ +# +# Makefile for storage/aio +# +# src/backend/storage/aio/Makefile +# + +subdir = src/backend/storage/aio +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global + +OBJS = \ + streaming_read.o + +include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build new file mode 100644 index 0000000000..156e87cab7 --- /dev/null +++ b/src/backend/storage/aio/meson.build @@ -0,0 +1,5 @@ +# Copyright (c) 2023, PostgreSQL Global Development Group + +backend_sources += files( + 'streaming_read.c', +) diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c new file mode 100644 index 0000000000..ee5a7e961d --- /dev/null +++ b/src/backend/storage/aio/streaming_read.c @@ -0,0 +1,436 @@ +#include "postgres.h" + +#include "storage/streaming_read.h" +#include "utils/rel.h" + +/* + * Element type for PgStreamingRead's circular array of clusters of buffers. + * + * For hits and RBM_WILL_ZERO, need_complete is false, we have just one + * buffer in each cluster, already pinned and ready for use. + * + * For misses that require a physical read, need_complete is true, and + * buffers[] holds a group of neighboring blocks, so we can complete them + * with a single call to CompleteReadBuffers(). We can also issue a single + * prefetch for it as soon as it has grown to its largest possible size, if + * our random access heuristics determine that is a good idea. 
+ */ +typedef struct PgStreamingReadCluster +{ + bool advice_issued; + bool need_complete; + + BufferManagerRelation bmr; + ForkNumber forknum; + BlockNumber blocknum; + int nblocks; + + int per_io_data_index[MAX_BUFFERS_PER_TRANSFER]; + bool need_advice[MAX_BUFFERS_PER_TRANSFER]; + Buffer buffers[MAX_BUFFERS_PER_TRANSFER]; +} PgStreamingReadCluster; + +struct PgStreamingRead +{ + int max_ios; + int ios_in_progress; + int ios_in_progress_trigger; + int max_pinned_buffers; + int pinned_buffers; + int pinned_buffers_trigger; + int next_tail_buffer; + bool finished; + uintptr_t pgsr_private; + PgStreamingReadBufferDetermineNextCB next_cb; + BufferAccessStrategy strategy; + + /* Next expected prefetch, for sequential prefetch avoidance. */ + BufferManagerRelation seq_bmr; + ForkNumber seq_forknum; + BlockNumber seq_blocknum; + + /* Space for optional per-I/O private data. */ + size_t per_io_data_size; + void *per_io_data; + int per_io_data_next; + + /* Circular buffer of clusters. */ + int size; + int head; + int tail; + PgStreamingReadCluster clusters[FLEXIBLE_ARRAY_MEMBER]; +}; + +PgStreamingRead * +pg_streaming_read_buffer_alloc(int max_ios, + size_t per_io_data_size, + uintptr_t pgsr_private, + BufferAccessStrategy strategy, + PgStreamingReadBufferDetermineNextCB determine_next_cb) +{ + PgStreamingRead *pgsr; + int size; + int max_pinned_buffers; + + Assert(max_ios > 0); + + /* + * We allow twice as many buffers to be pinned as I/Os. This allows us to + * look further ahead for blocks that need to be read in. + */ + max_pinned_buffers = max_ios * 2; + + /* Don't allow this backend to pin too many buffers. */ + LimitAdditionalPins((uint32 *) &max_pinned_buffers); + max_pinned_buffers = Max(2, max_pinned_buffers); + max_ios = max_pinned_buffers / 2; + Assert(max_ios > 0); + Assert(max_pinned_buffers > 0); + Assert(max_pinned_buffers > max_ios); + + /* + * pgsr->clusters is a circular buffer. When it is empty, head == tail. 
+ When it is full, there is an empty element between head and tail. Head + can also be empty (nblocks == 0). So we need two extra elements. + */ + size = max_pinned_buffers + 2; + + pgsr = (PgStreamingRead *) + palloc0(offsetof(PgStreamingRead, clusters) + + sizeof(pgsr->clusters[0]) * size); + + pgsr->per_io_data_size = per_io_data_size; + pgsr->max_ios = max_ios; + pgsr->max_pinned_buffers = max_pinned_buffers; + pgsr->pgsr_private = pgsr_private; + pgsr->strategy = strategy; + pgsr->next_cb = determine_next_cb; + pgsr->size = size; + + /* + * We look ahead when the number of pinned buffers falls below this + * number. This encourages the formation of large vectored reads. + */ + pgsr->pinned_buffers_trigger = + Max(max_ios, max_pinned_buffers - MAX_BUFFERS_PER_TRANSFER); + + /* Space for the callback to store extra data along with each block. */ + if (per_io_data_size) + pgsr->per_io_data = palloc(per_io_data_size * max_pinned_buffers); + + return pgsr; +} + +/* + * Issue WILLNEED advice for the head cluster, and allocate a new head + * cluster. + * + * We don't have true asynchronous I/O to actually submit, but this is + * equivalent because it might start I/O on systems that understand WILLNEED + * advice. We count it as an I/O in progress. + */ +static PgStreamingReadCluster * +pg_streaming_read_submit(PgStreamingRead *pgsr) +{ + PgStreamingReadCluster *head_cluster; + + head_cluster = &pgsr->clusters[pgsr->head]; + Assert(head_cluster->nblocks > 0); + +#ifdef USE_PREFETCH + + /* + * Don't bother with advice if there will be no call to + * CompleteReadBuffers() or direct I/O is enabled. + */ + if (head_cluster->need_complete && + (io_direct_flags & IO_DIRECT_DATA) == 0) + { + /* + * Purely sequential advice is known to hurt performance on some + * systems, so only issue it if this looks random. 
+ */ + if (head_cluster->bmr.smgr != pgsr->seq_bmr.smgr || + head_cluster->bmr.rel != pgsr->seq_bmr.rel || + head_cluster->forknum != pgsr->seq_forknum || + head_cluster->blocknum != pgsr->seq_blocknum) + { + SMgrRelation smgr = + head_cluster->bmr.smgr ? head_cluster->bmr.smgr + : RelationGetSmgr(head_cluster->bmr.rel); + + Assert(!head_cluster->advice_issued); + + for (int i = 0; i < head_cluster->nblocks; i++) + { + if (head_cluster->need_advice[i]) + { + BlockNumber first_blocknum = head_cluster->blocknum + i; + int nblocks = 1; + + /* + * How many adjacent blocks can we merge with to reduce + * system calls? Usually this is all of them, unless + * there are overlapping reads and our timing is unlucky. + */ + while ((i + 1) < head_cluster->nblocks && + head_cluster->need_advice[i + 1]) + { + nblocks++; + i++; + } + + smgrprefetch(smgr, + head_cluster->forknum, + first_blocknum, + nblocks); + } + + } + + /* + * Count this as an I/O that is concurrently in progress. We + * might have called smgrprefetch() more than once, if some of the + * buffers in the range were already in buffer pool but not valid + * yet, because of a concurrent read, but for now we choose to + * track this as one I/O. + */ + head_cluster->advice_issued = true; + pgsr->ios_in_progress++; + } + + /* Remember the point after this, for the above heuristics. */ + pgsr->seq_bmr = head_cluster->bmr; + pgsr->seq_forknum = head_cluster->forknum; + pgsr->seq_blocknum = head_cluster->blocknum + head_cluster->nblocks; + } +#endif + + /* Create a new head cluster. There must be space. */ + Assert(pgsr->size > pgsr->max_pinned_buffers); + Assert((pgsr->head + 1) % pgsr->size != pgsr->tail); + if (++pgsr->head == pgsr->size) + pgsr->head = 0; + head_cluster = &pgsr->clusters[pgsr->head]; + head_cluster->nblocks = 0; + + return head_cluster; +} + +void +pg_streaming_read_prefetch(PgStreamingRead *pgsr) +{ + /* If we're finished or can't start one more I/O, then no prefetching. 
*/ + if (pgsr->finished || pgsr->ios_in_progress == pgsr->max_ios) + return; + + /* + * We'll also wait until the number of pinned buffers falls below our + * trigger level, so that we have the chance to create a large cluster. + */ + if (pgsr->pinned_buffers >= pgsr->pinned_buffers_trigger) + return; + + do + { + BufferManagerRelation bmr; + ForkNumber forknum; + BlockNumber blocknum; + ReadBufferMode mode; + Buffer buffer; + bool found; + bool allocated; + bool need_complete; + PgStreamingReadCluster *head_cluster; + void *per_io_data; + + /* Do we have a full-sized cluster? */ + head_cluster = &pgsr->clusters[pgsr->head]; + if (head_cluster->nblocks == lengthof(head_cluster->buffers)) + { + Assert(head_cluster->need_complete); + head_cluster = pg_streaming_read_submit(pgsr); + + /* + * Give up now if I/O is saturated or we couldn't form another + * full cluster after this. + */ + if (pgsr->ios_in_progress == pgsr->max_ios || + pgsr->pinned_buffers >= pgsr->pinned_buffers_trigger) + break; + } + + per_io_data = (char *) pgsr->per_io_data + + pgsr->per_io_data_size * pgsr->per_io_data_next; + + /* + * Try to find out which block the callback wants to read next. False + * indicates end-of-stream (but the client can restart). + */ + if (!pgsr->next_cb(pgsr, pgsr->pgsr_private, per_io_data, + &bmr, &forknum, &blocknum, &mode)) + { + pgsr->finished = true; + break; + } + + Assert(mode == RBM_NORMAL || mode == RBM_WILL_ZERO); + Assert(pgsr->pinned_buffers < pgsr->max_pinned_buffers); + + buffer = PrepareReadBuffer(bmr, + forknum, + blocknum, + pgsr->strategy, + &found, + &allocated); + pgsr->pinned_buffers++; + + need_complete = !found && mode != RBM_WILL_ZERO; + + /* Is there a head cluster that we can't extend? 
*/ + head_cluster = &pgsr->clusters[pgsr->head]; + if (head_cluster->nblocks > 0 && + (!need_complete || + !head_cluster->need_complete || + head_cluster->bmr.smgr != bmr.smgr || + head_cluster->bmr.rel != bmr.rel || + head_cluster->forknum != forknum || + head_cluster->blocknum + head_cluster->nblocks != blocknum)) + { + /* Submit it so we can start a new one. */ + head_cluster = pg_streaming_read_submit(pgsr); + Assert(head_cluster->nblocks == 0); + } + + if (head_cluster->nblocks == 0) + { + /* Initialize the cluster. */ + head_cluster->bmr = bmr; + head_cluster->forknum = forknum; + head_cluster->blocknum = blocknum; + head_cluster->advice_issued = false; + head_cluster->need_complete = need_complete; + } + else + { + /* We'll extend an existing cluster by one buffer. */ + Assert(head_cluster->bmr.smgr == bmr.smgr); + Assert(head_cluster->bmr.rel == bmr.rel); + Assert(head_cluster->forknum == forknum); + Assert(head_cluster->blocknum + head_cluster->nblocks == blocknum); + Assert(head_cluster->need_complete); + } + + head_cluster->per_io_data_index[head_cluster->nblocks] = pgsr->per_io_data_next++; + head_cluster->need_advice[head_cluster->nblocks] = allocated; + head_cluster->buffers[head_cluster->nblocks] = buffer; + head_cluster->nblocks++; + + if (pgsr->per_io_data_next == pgsr->max_pinned_buffers) + pgsr->per_io_data_next = 0; + + } while (pgsr->ios_in_progress < pgsr->max_ios && + pgsr->pinned_buffers < pgsr->max_pinned_buffers); + + /* + * Initiate as soon as we can if we can't prepare any more reads right + * now. This makes sure we issue the advice as soon as possible, since + * any other backend that tries to read the same block won't do that. 
+ */ + if (pgsr->clusters[pgsr->head].nblocks > 0) + pg_streaming_read_submit(pgsr); +} + +void +pg_streaming_read_reset(PgStreamingRead *pgsr) +{ + pgsr->finished = false; +} + +Buffer +pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_io_data) +{ + pg_streaming_read_prefetch(pgsr); + + /* See if we have one buffer to return. */ + while (pgsr->tail != pgsr->head) + { + PgStreamingReadCluster *tail_cluster; + + tail_cluster = &pgsr->clusters[pgsr->tail]; + + /* + * Do we need to perform an I/O before returning the buffers from this + * cluster? + */ + if (tail_cluster->need_complete) + { + CompleteReadBuffers(tail_cluster->bmr, + tail_cluster->buffers, + tail_cluster->forknum, + tail_cluster->blocknum, + tail_cluster->nblocks, + false, + pgsr->strategy); + tail_cluster->need_complete = false; + + /* We only counted this I/O as running if we issued advice. */ + if (tail_cluster->advice_issued) + pgsr->ios_in_progress--; + } + + /* Are there more buffers available in this cluster? */ + if (pgsr->next_tail_buffer < tail_cluster->nblocks) + { + /* We are giving away ownership of this pinned buffer. */ + Assert(pgsr->pinned_buffers > 0); + pgsr->pinned_buffers--; + + if (per_io_data) + *per_io_data = (char *) pgsr->per_io_data + + tail_cluster->per_io_data_index[pgsr->next_tail_buffer] * + pgsr->per_io_data_size; + + return tail_cluster->buffers[pgsr->next_tail_buffer++]; + } + + /* Advance tail to next cluster, if there is one. */ + if (++pgsr->tail == pgsr->size) + pgsr->tail = 0; + pgsr->next_tail_buffer = 0; + } + + return InvalidBuffer; +} + +void +pg_streaming_read_free(PgStreamingRead *pgsr) +{ + Buffer buffer; + + /* Stop reading ahead, and unpin anything that wasn't consumed. 
*/ + pgsr->finished = true; + for (;;) + { + buffer = pg_streaming_read_buffer_get_next(pgsr, NULL); + if (buffer == InvalidBuffer) + break; + ReleaseBuffer(buffer); + } + + if (pgsr->per_io_data) + pfree(pgsr->per_io_data); + pfree(pgsr); +} + +int +pg_streaming_read_ios(PgStreamingRead *pgsr) +{ + return pgsr->ios_in_progress; +} + +int +pg_streaming_read_pins(PgStreamingRead *pgsr) +{ + return pgsr->pinned_buffers; +} diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index d647708f7f..678e3239a9 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1878,7 +1878,7 @@ again: * pessimistic, but outside of toy-sized shared_buffers it should allow * sufficient pins. */ -static void +void LimitAdditionalPins(uint32 *additional_pins) { uint32 max_backends; diff --git a/src/backend/storage/meson.build b/src/backend/storage/meson.build index 6ea9faa439..b196d9f885 100644 --- a/src/backend/storage/meson.build +++ b/src/backend/storage/meson.build @@ -1,5 +1,6 @@ # Copyright (c) 2022-2023, PostgreSQL Global Development Group +subdir('aio') subdir('buffer') subdir('file') subdir('freespace') diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 5a2f66ed47..435239955e 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -271,6 +271,7 @@ extern void AbortBufferIO(Buffer buffer); extern bool BgBufferSync(struct WritebackContext *wb_context); extern void TestForOldSnapshot_impl(Snapshot snapshot, Relation relation); +extern void LimitAdditionalPins(uint32 *additional_pins); /* in buf_init.c */ extern void InitBufferPool(void); diff --git a/src/include/storage/streaming_read.h b/src/include/storage/streaming_read.h new file mode 100644 index 0000000000..7717cdf056 --- /dev/null +++ b/src/include/storage/streaming_read.h @@ -0,0 +1,38 @@ +#ifndef STREAMING_READ_H +#define STREAMING_READ_H + +#include "storage/bufmgr.h" +#include "storage/fd.h" 
+#include "storage/smgr.h" + +/* + * For most sequential access, callers can use this size to build full-sized + * reads without pinning many extra buffers. + */ +#define PG_STREAMING_READ_DEFAULT_MAX_IOS MAX_BUFFERS_PER_TRANSFER + +struct PgStreamingRead; +typedef struct PgStreamingRead PgStreamingRead; + +typedef bool (*PgStreamingReadBufferDetermineNextCB) (PgStreamingRead *pgsr, + uintptr_t pgsr_private, + void *per_io_private, + BufferManagerRelation *bmr, + ForkNumber *forkNum, + BlockNumber *blockNum, + ReadBufferMode *mode); + +extern PgStreamingRead *pg_streaming_read_buffer_alloc(int max_ios, + size_t per_io_private_size, + uintptr_t pgsr_private, + BufferAccessStrategy strategy, + PgStreamingReadBufferDetermineNextCB determine_next_cb); + +extern void pg_streaming_read_prefetch(PgStreamingRead *pgsr); +extern Buffer pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_io_private); +extern void pg_streaming_read_reset(PgStreamingRead *pgsr); +extern void pg_streaming_read_free(PgStreamingRead *pgsr); + +extern int pg_streaming_read_ios(PgStreamingRead *pgsr); +extern int pg_streaming_read_pins(PgStreamingRead *pgsr); + +#endif diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 49a33c0387..a0752fa30e 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2076,6 +2076,8 @@ PgStat_TableCounts PgStat_TableStatus PgStat_TableXactStatus PgStat_WalStats +PgStreamingRead +PgStreamingReadCluster PgXmlErrorContext PgXmlStrictness Pg_finfo_record -- 2.39.2