From 6615e10588c800a8ab0d72587996610ebf26faef Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Mon, 26 Feb 2024 15:41:32 -0500 Subject: [PATCH v5 5/5] Sequential scans and TID range scans stream reads Implementing streaming read support for heap sequential scans and TID range scans includes three parts: Allocate the streaming read object in heap_beginscan(). On rescan, reset the streaming read by release all pinned buffers and resetting the prefetch block. Implement a callback returning the next block to prefetch to the streaming read API. Invoke the streaming read API when a new page is needed and streaming reads are enabled. When the scan direction changes, reset the streaming read. ci-os-only: --- src/backend/access/heap/heapam.c | 88 ++++++++++++++++++++++++++++---- src/include/access/heapam.h | 16 +++++- 2 files changed, 93 insertions(+), 11 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index adde61fca60..9606d71f457 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -62,6 +62,7 @@ #include "storage/predicate.h" #include "storage/procarray.h" #include "storage/standby.h" +#include "storage/streaming_read.h" #include "utils/datum.h" #include "utils/inval.h" #include "utils/relcache.h" @@ -224,6 +225,29 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] = * ---------------------------------------------------------------- */ +static BlockNumber +heap_scan_pgsr_next(PgStreamingRead *pgsr, void *pgsr_private, + void *per_buffer_data) +{ + HeapScanDesc scan = (HeapScanDesc) pgsr_private; + + /* + * Hard-code ScanDirection to ForwardScanDirection since only forward + * scans support streaming reads. + */ + if (unlikely(!scan->rs_inited)) + { + scan->rs_prefetch_block = heapgettup_initial_block(scan, scan->rs_dir); + scan->rs_inited = true; + } + else + scan->rs_prefetch_block = heapgettup_advance_block(scan, + scan->rs_prefetch_block, + scan->rs_dir); + + return scan->rs_prefetch_block; +} + /* ---------------- * initscan - scan code common to heap_beginscan and heap_rescan * ---------------- @@ -326,6 +350,13 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) scan->rs_cbuf = InvalidBuffer; scan->rs_cblock = InvalidBlockNumber; + /* + * Initialize to ForwardScanDirection because it is most common and heap + * scans usually must go forwards before going backward. + */ + scan->rs_dir = ForwardScanDirection; + scan->rs_prefetch_block = InvalidBlockNumber; + /* page-at-a-time fields are always invalid when not rs_inited */ /* @@ -468,6 +499,8 @@ heapbuildvis(TableScanDesc sscan) static inline void heapfetchbuf(HeapScanDesc scan, ScanDirection dir) { + Assert(scan->rs_pgsr); + /* release previous scan buffer, if any */ if (BufferIsValid(scan->rs_cbuf)) { @@ -482,19 +515,23 @@ heapfetchbuf(HeapScanDesc scan, ScanDirection dir) */ CHECK_FOR_INTERRUPTS(); - if (unlikely(!scan->rs_inited)) + /* + * If the scan direction is changing, reset the prefetch block to the + * current block. Otherwise, we will incorrectly prefetch the blocks + * between the prefetch block and the current block again before + * prefetching blocks in the new, correct scan direction. + */ + if (unlikely(scan->rs_dir != dir)) { - scan->rs_cblock = heapgettup_initial_block(scan, dir); - Assert(scan->rs_cblock != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf)); - scan->rs_inited = true; + scan->rs_prefetch_block = scan->rs_cblock; + pg_streaming_read_reset(scan->rs_pgsr); } - else - scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock, dir); - /* read block if valid */ - if (BlockNumberIsValid(scan->rs_cblock)) - scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, - scan->rs_cblock, RBM_NORMAL, scan->rs_strategy); + scan->rs_dir = dir; + + scan->rs_cbuf = pg_streaming_read_buffer_get_next(scan->rs_pgsr, NULL); + if (BufferIsValid(scan->rs_cbuf)) + scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf); } /* @@ -823,6 +860,7 @@ continue_page: scan->rs_cbuf = InvalidBuffer; scan->rs_cblock = InvalidBlockNumber; + scan->rs_prefetch_block = InvalidBlockNumber; tuple->t_data = NULL; scan->rs_inited = false; } @@ -913,6 +951,7 @@ continue_page: ReleaseBuffer(scan->rs_cbuf); scan->rs_cbuf = InvalidBuffer; scan->rs_cblock = InvalidBlockNumber; + scan->rs_prefetch_block = InvalidBlockNumber; tuple->t_data = NULL; scan->rs_inited = false; } @@ -995,6 +1034,8 @@ heap_beginscan(Relation relation, Snapshot snapshot, else scan->rs_parallelworkerdata = NULL; + scan->rs_pgsr = NULL; + /* * we do this here instead of in initscan() because heap_rescan also calls * initscan() and we don't want to allocate memory again @@ -1006,6 +1047,22 @@ heap_beginscan(Relation relation, Snapshot snapshot, initscan(scan, key, false); + /* + * We do not know the scan direction yet. If the scan does not end up + * being a forward scan, the streaming read object will be freed. + */ + if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN || + scan->rs_base.rs_flags & SO_TYPE_TIDRANGESCAN) + { + scan->rs_pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_SEQUENTIAL, + scan, + 0, + scan->rs_strategy, + BMR_REL(scan->rs_base.rs_rd), + MAIN_FORKNUM, + heap_scan_pgsr_next); + } + return (TableScanDesc) scan; } @@ -1044,6 +1101,14 @@ heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params, * reinitialize scan descriptor */ initscan(scan, key, true); + + /* + * The streaming read object is reset on rescan. This must be done after + * initscan(), as some state referred to by pg_streaming_read_reset() is + * reset in initscan(). + */ + if (scan->rs_pgsr) + pg_streaming_read_reset(scan->rs_pgsr); } void @@ -1059,6 +1124,9 @@ heap_endscan(TableScanDesc sscan) if (BufferIsValid(scan->rs_cbuf)) ReleaseBuffer(scan->rs_cbuf); + if (scan->rs_pgsr) + pg_streaming_read_free(scan->rs_pgsr); + /* * decrement relation reference count and free scan descriptor storage */ diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index e2b1b2a3ad9..a476ae62a8f 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -72,9 +72,23 @@ typedef struct HeapScanDescData */ ParallelBlockTableScanWorkerData *rs_parallelworkerdata; - /* these fields only used in page-at-a-time mode and for bitmap scans */ + /* only used in page-at-a-time mode and for bitmap scans */ int rs_cindex; /* current tuple's index in vistuples */ int rs_ntuples; /* number of visible tuples on page */ + + /* + * Fields used for streaming reads by sequential scans and TID range + * scans. The streaming read object is allocated at the beginning of the + * scan and reset on rescan or when the scan direction changes. The scan + * direction is saved each time a new page is requested. If the scan + * direction changes from one page to the next, the streaming read object + * releases all previously pinned buffers and resets the prefetch block. + */ + BlockNumber rs_prefetch_block; + ScanDirection rs_dir; + struct PgStreamingRead *rs_pgsr; + + /* only used in page-at-a-time mode and for bitmap scans */ OffsetNumber rs_vistuples[MaxHeapTuplesPerPage]; /* their offsets */ } HeapScanDescData; typedef struct HeapScanDescData *HeapScanDesc; -- 2.40.1