From 3221de7b11f0b1df281b5c9076a639f0731d83bf Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Tue, 1 Apr 2025 18:07:38 -0400 Subject: [PATCH v10 3/3] Use streaming read I/O in autoprewarm Make a read stream for each valid fork of each valid relation represented in the autoprewarm dump file and prewarm those blocks through the read stream API instead of by directly invoking ReadBuffer(). Co-authored-by: Nazir Bilal Yavuz Co-authored-by: Melanie Plageman Discussion: https://postgr.es/m/flat/CAN55FZ3n8Gd%2BhajbL%3D5UkGzu_aHGRqnn%2BxktXq2fuds%3D1AOR6Q%40mail.gmail.com --- contrib/pg_prewarm/autoprewarm.c | 125 +++++++++++++++++++++++++------ src/tools/pgindent/typedefs.list | 1 + 2 files changed, 104 insertions(+), 22 deletions(-) diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c index 0722d7bf457..5d21314120f 100644 --- a/contrib/pg_prewarm/autoprewarm.c +++ b/contrib/pg_prewarm/autoprewarm.c @@ -41,6 +41,7 @@ #include "storage/latch.h" #include "storage/lwlock.h" #include "storage/procsignal.h" +#include "storage/read_stream.h" #include "storage/smgr.h" #include "tcop/tcopprot.h" #include "utils/guc.h" @@ -75,6 +76,28 @@ typedef struct AutoPrewarmSharedState int prewarmed_blocks; } AutoPrewarmSharedState; +/* + * Private data passed through the read stream API for our use in the + * callback. + */ +typedef struct AutoPrewarmReadStreamData +{ + /* The array of records containing the blocks we should prewarm. */ + BlockInfoRecord *block_info; + + /* + * `pos` is the read stream callback's index into block_info. Because the + * read stream may read ahead, pos is likely to be ahead of the index in + * the main loop in autoprewarm_database_main(). + */ + int pos; + Oid database; + RelFileNumber filenumber; + ForkNumber forknum; + BlockNumber nblocks; +} AutoPrewarmReadStreamData; + + PGDLLEXPORT void autoprewarm_main(Datum main_arg); PGDLLEXPORT void autoprewarm_database_main(Datum main_arg); @@ -419,6 +442,55 @@ apw_load_buffers(void) apw_state->prewarmed_blocks, num_elements))); } +/* + * Return the next block number of a specific relation and fork to read + * according to the array of BlockInfoRecord. + */ +static BlockNumber +apw_read_stream_next_block(ReadStream *stream, + void *callback_private_data, + void *per_buffer_data) +{ + AutoPrewarmReadStreamData *p = callback_private_data; + + while (p->pos < apw_state->prewarm_stop_idx) + { + BlockInfoRecord blk = p->block_info[p->pos]; + + CHECK_FOR_INTERRUPTS(); + + if (!have_free_buffer()) + p->pos = apw_state->prewarm_stop_idx; + + if (p->pos >= apw_state->prewarm_stop_idx) + return InvalidBlockNumber; + + if (blk.database != p->database) + return InvalidBlockNumber; + + if (blk.filenumber != p->filenumber) + return InvalidBlockNumber; + + if (blk.forknum != p->forknum) + return InvalidBlockNumber; + + p->pos++; + + /* + * Check whether blocknum is valid and within fork file size. + * Fast-forward through any invalid blocks. We want `p->pos` to + * reflect the location of the next relation or fork before ending the + * stream. + */ + if (blk.blocknum >= p->nblocks) + continue; + + return blk.blocknum; + } + + return InvalidBlockNumber; +} + /* * Prewarm all blocks for one database or global objects (while connected to a * valid database). @@ -467,8 +539,6 @@ autoprewarm_database_main(Datum main_arg) Oid reloid; Relation rel; - CHECK_FOR_INTERRUPTS(); - StartTransactionCommand(); reloid = RelidByRelfilenumber(blk->tablespace, blk->filenumber); @@ -509,6 +579,8 @@ autoprewarm_database_main(Datum main_arg) { ForkNumber forknum = blk->forknum; BlockNumber nblocks; + struct AutoPrewarmReadStreamData p; + ReadStream *stream; Buffer buf; /* @@ -539,32 +611,41 @@ autoprewarm_database_main(Datum main_arg) nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum); - /* Check whether blocknum is valid and within fork file size. */ - if (blk->blocknum >= nblocks) + p = (struct AutoPrewarmReadStreamData) { - blk = &block_info[++i]; - continue; - } + .block_info = block_info, + .pos = i, + .database = database, + .filenumber = filenumber, + .forknum = forknum, + .nblocks = nblocks, + }; + + stream = read_stream_begin_relation(READ_STREAM_FULL, + NULL, + rel, + p.forknum, + apw_read_stream_next_block, + &p, + 0); - /* Prewarm buffers. */ - while (i < apw_state->prewarm_stop_idx && - blk->database == database && - blk->filenumber == filenumber && - blk->forknum == forknum && - have_free_buffer()) + /* + * Loop until we've prewarmed all the blocks from this fork. The + * read stream callback will check that we still have free buffers + * before requesting each block from the read stream API. + */ + while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer) { - CHECK_FOR_INTERRUPTS(); - - buf = ReadBufferExtended(rel, blk->forknum, blk->blocknum, RBM_NORMAL, - NULL); - - blk = &block_info[++i]; - if (!BufferIsValid(buf)) - break; - apw_state->prewarmed_blocks++; ReleaseBuffer(buf); } + + Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer); + read_stream_end(stream); + + /* Advance i past all the blocks just prewarmed. */ + i = p.pos; + blk = &block_info[i]; } relation_close(rel, AccessShareLock); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 8f28d8ff28e..5ac290fae78 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -175,6 +175,7 @@ AttributeOpts AuthRequest AuthToken AutoPrewarmSharedState +AutoPrewarmReadStreamData AutoVacOpts AutoVacuumShmemStruct AutoVacuumWorkItem -- 2.34.1