From 7db3df60698da30bfec4b8cca76342dc6e779f81 Mon Sep 17 00:00:00 2001
From: Nazir Bilal Yavuz
Date: Sat, 29 Mar 2025 20:17:42 +0300
Subject: [PATCH v8 1/3] Optimize autoprewarm with read streams

We've measured a 10% performance improvement, and this arranges to benefit
automatically from future optimizations to the read_stream subsystem.
---
Review fixups folded into this revision:
- apw_read_stream_next_block() (previously misspelled awp_...) checks pos
  against max_pos before it reads block_info[pos].
- The skip loops in autoprewarm_database_main() are bounded by
  prewarm_stop_idx, and the "next filenumber" loop no longer steps past the
  first record of the following relation.
- Pass apw_state->prewarm_stop_idx instead of the undeclared stop_idx.
- Restore the forknum range check in front of smgrexists() and keep
  CHECK_FOR_INTERRUPTS() in the main loop.
- The blob ids on the "index" line below were not regenerated.

 contrib/pg_prewarm/autoprewarm.c | 210 ++++++++++++++++++++++++-------
 1 file changed, 166 insertions(+), 44 deletions(-)

diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c
index 73485a2323c..7154303f0ba 100644
--- a/contrib/pg_prewarm/autoprewarm.c
+++ b/contrib/pg_prewarm/autoprewarm.c
@@ -41,6 +41,7 @@
 #include "storage/latch.h"
 #include "storage/lwlock.h"
 #include "storage/procsignal.h"
+#include "storage/read_stream.h"
 #include "storage/smgr.h"
 #include "tcop/tcopprot.h"
 #include "utils/guc.h"
@@ -421,6 +422,114 @@ apw_load_buffers(void)
 				(errmsg("autoprewarm successfully prewarmed %d of %d previously-loaded blocks",
 						apw_state->prewarmed_blocks, num_elements)));
 }
+
+/*
+ * State shared between autoprewarm_prewarm_relation() and its read stream
+ * callback, apw_read_stream_next_block().
+ */
+struct apw_read_stream_private
+{
+	BlockInfoRecord *block_info;	/* records to prewarm, indexed by pos */
+	BlockNumber nblocks;		/* size of the fork being read, in blocks */
+	BlockNumber max_pos;		/* records at or past this index are off-limits */
+	BlockNumber pos;			/* index of the next record to examine */
+	Oid			cur_database;	/* database of the first record */
+	ForkNumber	cur_forknum;	/* fork of the first record */
+	RelFileNumber cur_filenumber;	/* relfilenumber of the first record */
+};
+
+/*
+ * Read stream callback: return the block number of the next record, or
+ * InvalidBlockNumber to end the stream.
+ *
+ * The stream ends when the records are exhausted, when no free buffers
+ * remain, when a record's block number is past the end of the fork, or when a
+ * record belongs to a different database, relation or fork than the first one.
+ */
+static BlockNumber
+apw_read_stream_next_block(ReadStream *stream,
+						   void *callback_private_data,
+						   void *per_buffer_data)
+{
+	struct apw_read_stream_private *p = callback_private_data;
+	BlockNumber blocknum = InvalidBlockNumber;
+
+	/*
+	 * Check the position before touching the record: pos can be equal to
+	 * max_pos here, and block_info[max_pos] may lie beyond the array.
+	 */
+	if (p->pos < p->max_pos && have_free_buffer())
+	{
+		BlockInfoRecord *cur_blk = &p->block_info[p->pos];
+
+		if (cur_blk->blocknum < p->nblocks &&
+			cur_blk->database == p->cur_database &&
+			cur_blk->filenumber == p->cur_filenumber &&
+			cur_blk->forknum == p->cur_forknum)
+			blocknum = cur_blk->blocknum;
+	}
+
+	p->pos++;
+	return blocknum;
+}
+
+/*
+ * Helper function to prewarm buffers in a relation by using read streams.
+ *
+ * Starting at block_info[pos], reads consecutive records that belong to the
+ * same database, relation and fork as that first record, stopping before
+ * max_pos.  Returns the number of buffers read.
+ */
+static unsigned int
+autoprewarm_prewarm_relation(Relation rel,
+							 BlockNumber pos,
+							 BlockNumber max_pos,
+							 BlockNumber nblocks_in_fork,
+							 BlockInfoRecord *block_info)
+{
+	struct apw_read_stream_private p;
+	ReadStream *stream;
+	unsigned int blocks_done = 0;
+	BlockInfoRecord first_block = block_info[pos];
+
+	p.pos = pos;
+	p.max_pos = max_pos;
+	p.block_info = block_info;
+	p.nblocks = nblocks_in_fork;
+	p.cur_database = first_block.database;
+	p.cur_forknum = first_block.forknum;
+	p.cur_filenumber = first_block.filenumber;
+
+	stream = read_stream_begin_relation(READ_STREAM_FULL,
+										NULL,
+										rel,
+										first_block.forknum,
+										apw_read_stream_next_block,
+										&p,
+										0);
+
+	while (true)
+	{
+		Buffer		buf;
+
+		CHECK_FOR_INTERRUPTS();
+
+		buf = read_stream_next_buffer(stream, NULL);
+		if (BufferIsValid(buf))
+		{
+			ReleaseBuffer(buf);
+			++blocks_done;
+		}
+		else
+		{
+			Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+			read_stream_end(stream);
+			break;
+		}
+	}
+
+	return blocks_done;
+}
 
 /*
  * Prewarm all blocks for one database (and possibly also global objects, if
@@ -432,9 +541,9 @@ autoprewarm_database_main(Datum main_arg)
 	int			pos;
 	BlockInfoRecord *block_info;
 	Relation	rel = NULL;
-	BlockNumber nblocks = 0;
-	BlockInfoRecord *old_blk = NULL;
 	dsm_segment *seg;
+	Oid			cur_database;
+	BlockNumber nblocks_in_fork;
 
 	/* Establish signal handlers; once that's done, unblock signals. */
 	pqsignal(SIGTERM, die);
@@ -451,32 +560,36 @@ autoprewarm_database_main(Datum main_arg)
 	block_info = (BlockInfoRecord *) dsm_segment_address(seg);
 	pos = apw_state->prewarm_start_idx;
 
+	cur_database = block_info[pos].database;
+
 	/*
 	 * Loop until we run out of blocks to prewarm or until we run out of free
 	 * buffers.
 	 */
 	while (pos < apw_state->prewarm_stop_idx && have_free_buffer())
 	{
-		BlockInfoRecord *blk = &block_info[pos++];
-		Buffer		buf;
+		BlockInfoRecord *blk = &block_info[pos];
 
 		CHECK_FOR_INTERRUPTS();
 
 		/*
 		 * Quit if we've reached records for another database. If previous
 		 * blocks are of some global objects, then continue pre-warming.
 		 */
-		if (old_blk != NULL && old_blk->database != blk->database &&
-			old_blk->database != 0)
-			break;
+		if (cur_database != blk->database)
+		{
+			if (cur_database == 0)
+				cur_database = blk->database;
+			else
+				break;
+		}
 
 		/*
-		 * As soon as we encounter a block of a new relation, close the old
-		 * relation. Note that rel will be NULL if try_relation_open failed
-		 * previously; in that case, there is nothing to close.
+		 * Close the old relation. Note that rel will be NULL if
+		 * try_relation_open failed previously; in that case, there is nothing
+		 * to close.
 		 */
-		if (old_blk != NULL && old_blk->filenumber != blk->filenumber &&
-			rel != NULL)
+		if (rel)
 		{
 			relation_close(rel, AccessShareLock);
 			rel = NULL;
@@ -487,60 +600,69 @@ autoprewarm_database_main(Datum main_arg)
 		 * Try to open each new relation, but only once, when we first
 		 * encounter it. If it's been dropped, skip the associated blocks.
 		 */
-		if (old_blk == NULL || old_blk->filenumber != blk->filenumber)
+		if (!rel)
 		{
 			Oid			reloid;
 
-			Assert(rel == NULL);
 			StartTransactionCommand();
 			reloid = RelidByRelfilenumber(blk->tablespace, blk->filenumber);
 			if (OidIsValid(reloid))
 				rel = try_relation_open(reloid, AccessShareLock);
 
 			if (!rel)
+			{
 				CommitTransactionCommand();
-		}
-		if (!rel)
-		{
-			old_blk = blk;
-			continue;
-		}
 
-		/* Once per fork, check for fork existence and size. */
-		if (old_blk == NULL ||
-			old_blk->filenumber != blk->filenumber ||
-			old_blk->forknum != blk->forknum)
-		{
+				/*
+				 * Move to next filenumber.  Stop at the first record of
+				 * another relation without consuming it, and never read
+				 * records at or past prewarm_stop_idx.
+				 */
+				while (pos < apw_state->prewarm_stop_idx &&
+					   block_info[pos].database == blk->database &&
+					   block_info[pos].filenumber == blk->filenumber)
+					pos++;
+
+				continue;
+			}
+
 			/*
 			 * smgrexists is not safe for illegal forknum, hence check whether
 			 * the passed forknum is valid before using it in smgrexists.
 			 */
 			if (blk->forknum > InvalidForkNumber &&
 				blk->forknum <= MAX_FORKNUM &&
 				smgrexists(RelationGetSmgr(rel), blk->forknum))
-				nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum);
-			else
-				nblocks = 0;
-		}
+			{
+				unsigned int nblocks_processed;
 
-		/* Check whether blocknum is valid and within fork file size. */
-		if (blk->blocknum >= nblocks)
-		{
-			/* Move to next forknum. */
-			old_blk = blk;
-			continue;
-		}
+				nblocks_in_fork = RelationGetNumberOfBlocksInFork(rel, blk->forknum);
+				nblocks_processed = autoprewarm_prewarm_relation(rel,
+																 pos,
+																 apw_state->prewarm_stop_idx,
+																 nblocks_in_fork,
+																 block_info);
 
-		/* Prewarm buffer. */
-		buf = ReadBufferExtended(rel, blk->forknum, blk->blocknum, RBM_NORMAL,
-								 NULL);
-		if (BufferIsValid(buf))
-		{
-			apw_state->prewarmed_blocks++;
-			ReleaseBuffer(buf);
+				apw_state->prewarmed_blocks += nblocks_processed;
+
+				/* Move pos forward by at least one. */
+				pos += Max(nblocks_processed, 1);
+
+				/*
+				 * Move to next forknum, again without reading records at or
+				 * past prewarm_stop_idx.
+				 */
+				while (pos < apw_state->prewarm_stop_idx &&
+					   block_info[pos].database == blk->database &&
+					   block_info[pos].filenumber == blk->filenumber &&
+					   block_info[pos].forknum == blk->forknum)
+					pos++;
+
+				continue;
+			}
 		}
 
-		old_blk = blk;
+		pos++;
 	}
 
 	dsm_detach(seg);
-- 
2.43.0