From cfccafec650a77c53b1d78180b52db31742181ff Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Wed, 27 Mar 2024 20:25:06 -0400 Subject: [PATCH v8 3/3] Sequential scans and TID range scans stream reads Implementing streaming read support for heap sequential scans and TID range scans includes three parts: Allocate the read stream object in heap_beginscan(). On rescan, reset the stream by releasing all pinned buffers and resetting the prefetch block. Implement a callback returning the next block to prefetch to the read stream infrastructure. Invoke the read stream API when a new page is needed. When the scan direction changes, reset the stream. --- src/backend/access/heap/heapam.c | 94 ++++++++++++++++++++++++++++---- src/include/access/heapam.h | 15 +++++ 2 files changed, 97 insertions(+), 12 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 6b26f5bf8af..3546f637c13 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -221,6 +221,25 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] = * ---------------------------------------------------------------- */ +static BlockNumber +heap_scan_stream_read_next(ReadStream *pgsr, void *private_data, + void *per_buffer_data) +{ + HeapScanDesc scan = (HeapScanDesc) private_data; + + if (unlikely(!scan->rs_inited)) + { + scan->rs_prefetch_block = heapgettup_initial_block(scan, scan->rs_dir); + scan->rs_inited = true; + } + else + scan->rs_prefetch_block = heapgettup_advance_block(scan, + scan->rs_prefetch_block, + scan->rs_dir); + + return scan->rs_prefetch_block; +} + /* ---------------- * initscan - scan code common to heap_beginscan and heap_rescan * ---------------- @@ -323,6 +342,13 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) scan->rs_cbuf = InvalidBuffer; scan->rs_cblock = InvalidBlockNumber; + /* + * Initialize to ForwardScanDirection because it is most common and heap + * scans usually must go 
forwards before going backward. + */ + scan->rs_dir = ForwardScanDirection; + scan->rs_prefetch_block = InvalidBlockNumber; + /* page-at-a-time fields are always invalid when not rs_inited */ /* @@ -459,12 +485,14 @@ heapbuildvis(TableScanDesc sscan) /* * heapfetchbuf - subroutine for heapgettup() * - * This routine reads the next block of the relation into a buffer and returns - * with that pinned buffer saved in the scan descriptor. + * This routine gets the next block of the relation from the read stream + * and saves that pinned buffer in the scan descriptor. */ static inline void heapfetchbuf(HeapScanDesc scan, ScanDirection dir) { + Assert(scan->rs_read_stream); + /* release previous scan buffer, if any */ if (BufferIsValid(scan->rs_cbuf)) { @@ -479,19 +507,23 @@ heapfetchbuf(HeapScanDesc scan, ScanDirection dir) */ CHECK_FOR_INTERRUPTS(); - if (unlikely(!scan->rs_inited)) + /* + * If the scan direction is changing, reset the prefetch block to the + * current block. Otherwise, we will incorrectly prefetch the blocks + * between the prefetch block and the current block again before + * prefetching blocks in the new, correct scan direction. 
+ */ + if (unlikely(scan->rs_dir != dir)) { - scan->rs_cblock = heapgettup_initial_block(scan, dir); - Assert(scan->rs_cblock != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf)); - scan->rs_inited = true; + scan->rs_prefetch_block = scan->rs_cblock; + read_stream_reset(scan->rs_read_stream); } - else - scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock, dir); - /* read block if valid */ - if (BlockNumberIsValid(scan->rs_cblock)) - scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, - scan->rs_cblock, RBM_NORMAL, scan->rs_strategy); + scan->rs_dir = dir; + + scan->rs_cbuf = read_stream_next_buffer(scan->rs_read_stream, NULL); + if (BufferIsValid(scan->rs_cbuf)) + scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf); } /* @@ -820,6 +852,7 @@ continue_page: scan->rs_cbuf = InvalidBuffer; scan->rs_cblock = InvalidBlockNumber; + scan->rs_prefetch_block = InvalidBlockNumber; tuple->t_data = NULL; scan->rs_inited = false; } @@ -910,6 +943,7 @@ continue_page: ReleaseBuffer(scan->rs_cbuf); scan->rs_cbuf = InvalidBuffer; scan->rs_cblock = InvalidBlockNumber; + scan->rs_prefetch_block = InvalidBlockNumber; tuple->t_data = NULL; scan->rs_inited = false; } @@ -1003,6 +1037,28 @@ heap_beginscan(Relation relation, Snapshot snapshot, initscan(scan, key, false); + scan->rs_read_stream = NULL; + + /* + * For sequential scans and TID range scans, we will set up a read stream. + * We do not know the scan direction yet. If the scan direction later + * changes, the read stream is reset rather than freed. This should be + * done after initscan() because initscan() allocates the + * BufferAccessStrategy object. 
+ */ + if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN || + scan->rs_base.rs_flags & SO_TYPE_TIDRANGESCAN) + { + scan->rs_read_stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL, + scan->rs_strategy, + scan->rs_base.rs_rd, + MAIN_FORKNUM, + heap_scan_stream_read_next, + scan, + 0); + } + + return (TableScanDesc) scan; } @@ -1037,6 +1093,14 @@ heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params, if (BufferIsValid(scan->rs_cbuf)) ReleaseBuffer(scan->rs_cbuf); + /* + * The read stream is reset on rescan. This must be done before + * initscan(), as some state referred to by read_stream_reset() is reset + * in initscan(). + */ + if (scan->rs_read_stream) + read_stream_reset(scan->rs_read_stream); + /* * reinitialize scan descriptor */ @@ -1056,6 +1120,12 @@ heap_endscan(TableScanDesc sscan) if (BufferIsValid(scan->rs_cbuf)) ReleaseBuffer(scan->rs_cbuf); + /* + * Must free the read stream before freeing the BufferAccessStrategy. + */ + if (scan->rs_read_stream) + read_stream_end(scan->rs_read_stream); + /* * decrement relation reference count and free scan descriptor storage */ diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 4d324c78e5b..41d32d5d95d 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -25,6 +25,7 @@ #include "storage/bufpage.h" #include "storage/dsm.h" #include "storage/lockdefs.h" +#include "storage/read_stream.h" #include "storage/shm_toc.h" #include "utils/relcache.h" #include "utils/snapshot.h" @@ -70,6 +71,20 @@ typedef struct HeapScanDescData HeapTupleData rs_ctup; /* current tuple in scan, if any */ + /* For scans that stream reads */ + ReadStream *rs_read_stream; + + /* + * For sequential scans and TID range scans to stream reads. The read + * stream is allocated at the beginning of the scan and reset on rescan or + * when the scan direction changes. The scan direction is saved each time + * a new page is requested. 
If the scan direction changes from one page to + * the next, the read stream releases all previously pinned buffers and + * resets the prefetch block. + */ + ScanDirection rs_dir; + BlockNumber rs_prefetch_block; + /* * For parallel scans to store page allocation data. NULL when not * performing a parallel scan. -- 2.40.1