From 7b474f3b4fad7fcf092bae20e67cb020846be30f Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Mon, 29 Jan 2024 11:50:01 -0500 Subject: [PATCH v8 2/3] Replace blocks with buffers in heapgettup control flow Future commits will introduce the sequential scan streaming read user which will implement a callback returning the next block to read. Sequential scans previously looped through the blocks in the relation, synchronously reading in a block and then processing it. An InvalidBlockNumber returned by heapgettup_advance_block() meant that the relation was exhausted and all blocks had been processed. The streaming read API may exhaust the blocks in a relation (having read all of them into buffers) before they have all been processed by the sequential scan. As such, the sequential scan should continue processing blocks until heapfetchbuf() returns InvalidBuffer. Note that this commit does not implement the streaming read API user. It simply restructures heapgettup() and heapgettup_pagemode() to use buffers instead of blocks for control flow. --- src/backend/access/heap/heapam.c | 79 ++++++++++++++------------------ 1 file changed, 35 insertions(+), 44 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index bf2b9b19e72..6b26f5bf8af 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -83,6 +83,9 @@ static Bitmapset *HeapDetermineColumnsInfo(Relation relation, static bool heap_acquire_tuplock(Relation relation, ItemPointer tid, LockTupleMode mode, LockWaitPolicy wait_policy, bool *have_tuple_lock); +static inline BlockNumber heapgettup_advance_block(HeapScanDesc scan, + BlockNumber block, ScanDirection dir); +static inline BlockNumber heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir); static void compute_new_xmax_infomask(TransactionId xmax, uint16 old_infomask, uint16 old_infomask2, TransactionId add_to_xmax, LockTupleMode mode, bool is_update, @@ -456,14 +459,12 @@ heapbuildvis(TableScanDesc sscan) /* * heapfetchbuf - subroutine for heapgettup() * - * This routine reads the specified block of the relation into a buffer and - * returns with that pinned buffer saved in the scan descriptor. + * This routine reads the next block of the relation into a buffer and returns + * with that pinned buffer saved in the scan descriptor. */ static inline void -heapfetchbuf(HeapScanDesc scan, BlockNumber block) +heapfetchbuf(HeapScanDesc scan, ScanDirection dir) { - Assert(block < scan->rs_nblocks); - /* release previous scan buffer, if any */ if (BufferIsValid(scan->rs_cbuf)) { @@ -478,10 +479,19 @@ heapfetchbuf(HeapScanDesc scan, BlockNumber block) */ CHECK_FOR_INTERRUPTS(); - /* read page using selected strategy */ - scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, block, - RBM_NORMAL, scan->rs_strategy); - scan->rs_cblock = block; + if (unlikely(!scan->rs_inited)) + { + scan->rs_cblock = heapgettup_initial_block(scan, dir); + Assert(scan->rs_cblock != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf)); + scan->rs_inited = true; + } + else + scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock, dir); + + /* read block if valid */ + if (BlockNumberIsValid(scan->rs_cblock)) + scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, + scan->rs_cblock, RBM_NORMAL, scan->rs_strategy); } /* @@ -491,7 +501,7 @@ heapfetchbuf(HeapScanDesc scan, BlockNumber block) * occur with empty tables and in parallel scans when parallel workers get all * of the pages before we can get a chance to get our first page. */ -static BlockNumber +BlockNumber heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir) { Assert(!scan->rs_inited); @@ -631,7 +641,7 @@ heapgettup_continue_page(HeapScanDesc scan, ScanDirection dir, int *linesleft, * This also adjusts rs_numblocks when a limit has been imposed by * heap_setscanlimits(). */ -static inline BlockNumber +BlockNumber heapgettup_advance_block(HeapScanDesc scan, BlockNumber block, ScanDirection dir) { if (ScanDirectionIsForward(dir)) @@ -729,23 +739,13 @@ heapgettup(HeapScanDesc scan, ScanKey key) { HeapTuple tuple = &(scan->rs_ctup); - BlockNumber block; Page page; OffsetNumber lineoff; int linesleft; - if (unlikely(!scan->rs_inited)) - { - block = heapgettup_initial_block(scan, dir); - /* ensure rs_cbuf is invalid when we get InvalidBlockNumber */ - Assert(block != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf)); - scan->rs_inited = true; - } - else + if (likely(scan->rs_inited)) { /* continue from previously returned page/tuple */ - block = scan->rs_cblock; - LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); page = heapgettup_continue_page(scan, dir, &linesleft, &lineoff); goto continue_page; @@ -755,9 +755,12 @@ heapgettup(HeapScanDesc scan, * advance the scan until we find a qualifying tuple or run out of stuff * to scan */ - while (block != InvalidBlockNumber) + while (true) { - heapfetchbuf(scan, block); + heapfetchbuf(scan, dir); + if (!BufferIsValid(scan->rs_cbuf)) + break; + Assert(BufferGetBlockNumber(scan->rs_cbuf) == scan->rs_cblock); LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); page = heapgettup_start_page(scan, dir, &linesleft, &lineoff); continue_page: @@ -779,7 +782,7 @@ continue_page: tuple->t_data = (HeapTupleHeader) PageGetItem(page, lpp); tuple->t_len = ItemIdGetLength(lpp); - ItemPointerSet(&(tuple->t_self), block, lineoff); + ItemPointerSet(&(tuple->t_self), scan->rs_cblock, lineoff); visible = HeapTupleSatisfiesVisibility(tuple, scan->rs_base.rs_snapshot, @@ -809,9 +812,6 @@ continue_page: * it's time to move to the next. */ LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK); - - /* get the BlockNumber to scan next */ - block = heapgettup_advance_block(scan, block, dir); } /* end of scan */ @@ -844,22 +844,13 @@ heapgettup_pagemode(HeapScanDesc scan, ScanKey key) { HeapTuple tuple = &(scan->rs_ctup); - BlockNumber block; Page page; int lineindex; int linesleft; - if (unlikely(!scan->rs_inited)) - { - block = heapgettup_initial_block(scan, dir); - /* ensure rs_cbuf is invalid when we get InvalidBlockNumber */ - Assert(block != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf)); - scan->rs_inited = true; - } - else + if (likely(scan->rs_inited)) { /* continue from previously returned page/tuple */ - block = scan->rs_cblock; /* current page */ page = BufferGetPage(scan->rs_cbuf); lineindex = scan->rs_cindex + dir; @@ -876,9 +867,12 @@ heapgettup_pagemode(HeapScanDesc scan, * advance the scan until we find a qualifying tuple or run out of stuff * to scan */ - while (block != InvalidBlockNumber) + while (true) { - heapfetchbuf(scan, block); + heapfetchbuf(scan, dir); + if (!BufferIsValid(scan->rs_cbuf)) + break; + Assert(BufferGetBlockNumber(scan->rs_cbuf) == scan->rs_cblock); heapbuildvis((TableScanDesc) scan); page = BufferGetPage(scan->rs_cbuf); linesleft = scan->rs_ntuples; @@ -898,7 +892,7 @@ continue_page: tuple->t_data = (HeapTupleHeader) PageGetItem(page, lpp); tuple->t_len = ItemIdGetLength(lpp); - ItemPointerSet(&(tuple->t_self), block, lineoff); + ItemPointerSet(&(tuple->t_self), scan->rs_cblock, lineoff); /* skip any tuples that don't match the scan key */ if (key != NULL && @@ -909,9 +903,6 @@ continue_page: scan->rs_cindex = lineindex; return; } - - /* get the BlockNumber to scan next */ - block = heapgettup_advance_block(scan, block, dir); } /* end of scan */ -- 2.40.1