From 1a87519763a0fa67433a0049dcb3f9f021bd5e11 Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Mon, 29 Jan 2024 12:22:38 -0500 Subject: [PATCH v1 4/4] Sequential scans support streaming read Add streaming read support for sequential scans by implementing the streaming read callback, which gets the next block and saves it in the scan descriptor. The PgStreamingRead object is allocated in initscan(), which means it is freed and reallocated on each rescan; implementing a streaming read reset function is a TODO for that API. Currently, the streaming read API supports only forward scans, so if a scan switches from forward to backward, the PgStreamingRead object must be freed. This also means that a scan that switches from backward to forward will not use streaming reads. Distinguishing between a scan that has yet to be initialized, one that doesn't support streaming reads, and one that has switched scan direction is one reason why it is difficult to defer allocating the PgStreamingRead object until heapfetchbuf(). 
--- src/backend/access/heap/heapam.c | 106 ++++++++++++++++++++++++++++--- src/include/access/heapam.h | 3 + 2 files changed, 100 insertions(+), 9 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 9e3e6d8b52b..cc20e0f972c 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -65,6 +65,7 @@ #include "storage/smgr.h" #include "storage/spin.h" #include "storage/standby.h" +#include "storage/streaming_read.h" #include "utils/datum.h" #include "utils/inval.h" #include "utils/lsyscache.h" @@ -228,6 +229,27 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] = * ---------------------------------------------------------------- */ +static BlockNumber +heap_pgsr_next_single(PgStreamingRead *pgsr, void *pgsr_private, + void *per_buffer_data) +{ + HeapScanDesc scan = (HeapScanDesc) pgsr_private; + + /* Only forward scans support streaming reads */ + if (!scan->rs_inited) + { + scan->rs_prefetch_block = heapgettup_initial_block(scan, + ForwardScanDirection); + scan->rs_inited = true; + } + else + scan->rs_prefetch_block = heapgettup_advance_block(scan, + scan->rs_prefetch_block, + ForwardScanDirection); + + return scan->rs_prefetch_block; +} + /* ---------------- * initscan - scan code common to heap_beginscan and heap_rescan * ---------------- @@ -345,6 +367,36 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) */ if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN) pgstat_count_heap_scan(scan->rs_base.rs_rd); + + scan->rs_prefetch_block = InvalidBlockNumber; + + /* pgsr is freed and reallocated on rescan */ + if (scan->pgsr) + pg_streaming_read_free(scan->pgsr); + scan->pgsr = NULL; + scan->rs_prefetch_block = InvalidBlockNumber; + + /* + * This streaming read cannot be allocated in the per tuple memory context + * which is the current memory context during heapgettup[_pagemode](), as + * the per tuple context is often reset before the end of the query. 
There + * was discussion of allocating the pgsr when rs_inited is false. We could + * switch into a memory context that doesn't get reset to allocate it + * there, but 1) we probably want to reuse the pgsr across rescans and 2) + * we have to free the pgsr if the scan changes from forwards to a + * backwards scan anyway, so we better just allocate it here. + */ + if (!RelationUsesLocalBuffers(scan->rs_base.rs_rd) && + (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN)) + { + scan->pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_SEQUENTIAL, + scan, + 0, + scan->rs_strategy, + BMR_REL(scan->rs_base.rs_rd), + MAIN_FORKNUM, + heap_pgsr_next_single); + } } /* @@ -488,19 +540,41 @@ heapfetchbuf(TableScanDesc sscan, ScanDirection dir) */ CHECK_FOR_INTERRUPTS(); - if (!scan->rs_inited) + /* + * Backwards scans aren't supported with streaming read. At the time of + * allocation, the scan direction is not determined. Note that this means + * that if the scan switches from backwards to forwards, the forward scan + * will not use streaming reads + */ + if (!ScanDirectionIsForward(dir) && scan->pgsr) { - scan->rs_cblock = heapgettup_initial_block(scan, dir); - Assert(scan->rs_cblock != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf)); - scan->rs_inited = true; + pg_streaming_read_free(scan->pgsr); + scan->pgsr = NULL; + scan->rs_prefetch_block = InvalidBlockNumber; + } + + if (scan->pgsr) + { + scan->rs_cbuf = pg_streaming_read_buffer_get_next(scan->pgsr, NULL); + if (BufferIsValid(scan->rs_cbuf)) + scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf); } else - scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock, dir); + { + if (!scan->rs_inited) + { + scan->rs_cblock = heapgettup_initial_block(scan, dir); + Assert(scan->rs_cblock != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf)); + scan->rs_inited = true; + } + else + scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock, dir); - /* read block if valid */ - if (BlockNumberIsValid(scan->rs_cblock)) - 
scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, - scan->rs_cblock, RBM_NORMAL, scan->rs_strategy); + /* read block if valid */ + if (BlockNumberIsValid(scan->rs_cblock)) + scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, + scan->rs_cblock, RBM_NORMAL, scan->rs_strategy); + } } /* @@ -1001,6 +1075,15 @@ heap_beginscan(Relation relation, Snapshot snapshot, else scan->rs_parallelworkerdata = NULL; + /* + * TODO: implement pg_streaming_read_reset(), then allocate the streaming + * reads here. Currently, they are allocated in initscan() which will free + * and reallocate the pgsr on each rescan. Fixing this is especially + * important for nested loop join. For now, set this to NULL to ensure the + * streaming read is allocated in initscan(). + */ + scan->pgsr = NULL; + /* * we do this here instead of in initscan() because heap_rescan also calls * initscan() and we don't want to allocate memory again @@ -1065,6 +1148,11 @@ heap_endscan(TableScanDesc sscan) if (BufferIsValid(scan->rs_cbuf)) ReleaseBuffer(scan->rs_cbuf); + if (scan->pgsr) + pg_streaming_read_free(scan->pgsr); + scan->pgsr = NULL; + scan->rs_prefetch_block = InvalidBlockNumber; + /* * decrement relation reference count and free scan descriptor storage */ diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 4a3a017c33a..8e702454367 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -59,6 +59,7 @@ typedef struct HeapScanDescData bool rs_inited; /* false = scan not init'd yet */ OffsetNumber rs_coffset; /* current offset # in non-page-at-a-time mode */ BlockNumber rs_cblock; /* current block # in scan, if any */ + BlockNumber rs_prefetch_block; /* block being prefetched */ Buffer rs_cbuf; /* current buffer in scan, if any */ /* NB: if rs_cbuf is not InvalidBuffer, we hold a pin on that buffer */ @@ -72,6 +73,8 @@ typedef struct HeapScanDescData */ ParallelBlockTableScanWorkerData *rs_parallelworkerdata; + struct 
PgStreamingRead *pgsr; + /* these fields only used in page-at-a-time mode and for bitmap scans */ int rs_cindex; /* current tuple's index in vistuples */ int rs_ntuples; /* number of visible tuples on page */ -- 2.37.2