From 4a1d93b379966e939c1932a5fdd486d89bae4c7a Mon Sep 17 00:00:00 2001 From: Nazir Bilal Yavuz Date: Sat, 29 Mar 2025 20:17:42 +0300 Subject: [PATCH 1/2] Optimize autoprewarm with read streams We've measured 10% performance improvement, and this arranges to benefit automatically from future optimizations to the read_stream subsystem. --- contrib/pg_prewarm/autoprewarm.c | 197 ++++++++++++++++++++++--------- 1 file changed, 141 insertions(+), 56 deletions(-) diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c index 73485a2323c..142e9d5359b 100644 --- a/contrib/pg_prewarm/autoprewarm.c +++ b/contrib/pg_prewarm/autoprewarm.c @@ -41,6 +41,7 @@ #include "storage/latch.h" #include "storage/lwlock.h" #include "storage/procsignal.h" +#include "storage/read_stream.h" #include "storage/smgr.h" #include "tcop/tcopprot.h" #include "utils/guc.h" @@ -421,6 +422,93 @@ apw_load_buffers(void) (errmsg("autoprewarm successfully prewarmed %d of %d previously-loaded blocks", apw_state->prewarmed_blocks, num_elements))); } +struct apw_read_stream_private +{ + BlockInfoRecord *block_info; + BlockNumber nblocks; + BlockNumber max_pos; + BlockNumber pos; + Oid cur_database; + ForkNumber cur_forknum; + RelFileNumber cur_filenumber; +}; + +static BlockNumber +awp_read_stream_next_block(ReadStream *stream, + void *callback_private_data, + void *per_buffer_data) +{ + struct apw_read_stream_private *p = callback_private_data; + BlockInfoRecord cur_blk = p->block_info[p->pos]; + BlockNumber blocknum = InvalidBlockNumber; + + if (have_free_buffer() && + p->pos < p->max_pos && + cur_blk.blocknum < p->nblocks && + cur_blk.database == p->cur_database && + cur_blk.forknum == p->cur_forknum && + cur_blk.filenumber == p->cur_filenumber) + { + blocknum = cur_blk.blocknum; + } + + (p->pos)++; + return blocknum; +} + +/* + * Helper function to prewarm buffers in a relation by using read streams. + */ +static unsigned int +autoprewarm_prewarm_relation(Relation rel, + BlockNumber pos, + BlockNumber max_pos, + BlockNumber nblocks_in_fork, + BlockInfoRecord *block_info) +{ + struct apw_read_stream_private p; + ReadStream *stream; + unsigned int blocks_done = 0; + BlockInfoRecord first_block = block_info[pos]; + + p.pos = pos; + p.max_pos = max_pos; + p.block_info = block_info; + p.nblocks = nblocks_in_fork; + p.cur_database = first_block.database; + p.cur_forknum = first_block.forknum; + p.cur_filenumber = first_block.filenumber; + + stream = read_stream_begin_relation(READ_STREAM_FULL, + NULL, + rel, + first_block.forknum, + awp_read_stream_next_block, + &p, + 0); + + while (true) + { + Buffer buf; + + CHECK_FOR_INTERRUPTS(); + + buf = read_stream_next_buffer(stream, NULL); + if (BufferIsValid(buf)) + { + ReleaseBuffer(buf); + ++blocks_done; + } + else + { + Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer); + read_stream_end(stream); + break; + } + } + + return blocks_done; +} /* * Prewarm all blocks for one database (and possibly also global objects, if @@ -432,9 +520,10 @@ autoprewarm_database_main(Datum main_arg) int pos; BlockInfoRecord *block_info; Relation rel = NULL; - BlockNumber nblocks = 0; - BlockInfoRecord *old_blk = NULL; dsm_segment *seg; + Oid cur_database; + Oid cur_filenumber = InvalidOid; + BlockNumber nblocks_in_fork = InvalidBlockNumber; /* Establish signal handlers; once that's done, unblock signals. */ pqsignal(SIGTERM, die); @@ -451,32 +540,44 @@ autoprewarm_database_main(Datum main_arg) block_info = (BlockInfoRecord *) dsm_segment_address(seg); pos = apw_state->prewarm_start_idx; - /* - * Loop until we run out of blocks to prewarm or until we run out of free - * buffers. - */ - while (pos < apw_state->prewarm_stop_idx && have_free_buffer()) + cur_database = block_info[pos].database; + + /* Loop until we run out of blocks to prewarm. */ + while (pos < apw_state->prewarm_stop_idx) { - BlockInfoRecord *blk = &block_info[pos++]; - Buffer buf; + BlockInfoRecord *blk = &block_info[pos]; - CHECK_FOR_INTERRUPTS(); + /* Loop until we run we run out of free buffers. */ + if (!have_free_buffer()) + break; /* * Quit if we've reached records for another database. If previous * blocks are of some global objects, then continue pre-warming. */ - if (old_blk != NULL && old_blk->database != blk->database && - old_blk->database != 0) - break; + if (cur_database != blk->database) + { + if (cur_database == 0) + cur_database = blk->database; + else + break; + } + + /* Check whether blocknum is valid and within fork file size. */ + while (cur_filenumber == blk->filenumber && + blk->blocknum >= nblocks_in_fork) + { + /* Move to next forknum. */ + pos++; + continue; + } /* * As soon as we encounter a block of a new relation, close the old * relation. Note that rel will be NULL if try_relation_open failed * previously; in that case, there is nothing to close. */ - if (old_blk != NULL && old_blk->filenumber != blk->filenumber && - rel != NULL) + if (rel && cur_filenumber != blk->filenumber) { relation_close(rel, AccessShareLock); rel = NULL; @@ -487,70 +588,54 @@ autoprewarm_database_main(Datum main_arg) * Try to open each new relation, but only once, when we first * encounter it. If it's been dropped, skip the associated blocks. */ - if (old_blk == NULL || old_blk->filenumber != blk->filenumber) + if (!rel && cur_filenumber != blk->filenumber) { Oid reloid; - Assert(rel == NULL); StartTransactionCommand(); reloid = RelidByRelfilenumber(blk->tablespace, blk->filenumber); if (OidIsValid(reloid)) rel = try_relation_open(reloid, AccessShareLock); - if (!rel) - CommitTransactionCommand(); - } - if (!rel) - { - old_blk = blk; - continue; - } - - /* Once per fork, check for fork existence and size. */ - if (old_blk == NULL || - old_blk->filenumber != blk->filenumber || - old_blk->forknum != blk->forknum) - { /* - * smgrexists is not safe for illegal forknum, hence check whether - * the passed forknum is valid before using it in smgrexists. + * Update cur_filenumber although relation may not be opened. If + * not updated and if we can't open the relation when the file + * number is changed; we will end up unnecessarily trying to open + * relation for all the blocks that have the same file number. */ - if (blk->forknum > InvalidForkNumber && - blk->forknum <= MAX_FORKNUM && - smgrexists(RelationGetSmgr(rel), blk->forknum)) - nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum); - else - nblocks = 0; - } + cur_filenumber = blk->filenumber; - /* Check whether blocknum is valid and within fork file size. */ - if (blk->blocknum >= nblocks) - { - /* Move to next forknum. */ - old_blk = blk; - continue; + if (!rel) + CommitTransactionCommand(); } - /* Prewarm buffer. */ - buf = ReadBufferExtended(rel, blk->forknum, blk->blocknum, RBM_NORMAL, - NULL); - if (BufferIsValid(buf)) + if (rel && smgrexists(RelationGetSmgr(rel), blk->forknum)) { - apw_state->prewarmed_blocks++; - ReleaseBuffer(buf); + unsigned int nblocks_processed; + + nblocks_in_fork = RelationGetNumberOfBlocksInFork(rel, blk->forknum); + nblocks_processed = autoprewarm_prewarm_relation(rel, + pos, + apw_state->prewarm_stop_idx, + nblocks_in_fork, + block_info); + + apw_state->prewarmed_blocks += nblocks_processed; + /* Move pos forward by at least one */ + pos += Max(nblocks_processed, 1); + continue; } - old_blk = blk; + pos++; } - dsm_detach(seg); - - /* Release lock on previous relation. */ if (rel) { relation_close(rel, AccessShareLock); CommitTransactionCommand(); } + + dsm_detach(seg); } /* -- 2.43.0