From 62416fee879a1ea6a7ca7ec132227701d35d7468 Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Mon, 9 Jan 2023 17:33:43 -0500 Subject: [PATCH v7 2/6] Add heapgettup_initial_block() helper --- src/backend/access/heap/heapam.c | 226 ++++++++++++++----------------- 1 file changed, 102 insertions(+), 124 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 75f68b4e79..ff2e64822a 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -484,6 +484,67 @@ heapgetpage(TableScanDesc sscan, BlockNumber block) } +/* + * heapgettup_initial_block - helper for heapgettup() and heapgettup_pagemode() + * + * Determines and returns the first block to scan, given the scan direction and + * whether or not it is parallel. If the relation is empty or the current + * parallel worker finds the scan has already been completed by other workers, + * return InvalidBlockNumber. + * + * Note that this helper does set state in the scan descriptor. + */ +static inline BlockNumber +heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir) +{ + Assert(!ScanDirectionIsNoMovement(dir)); + Assert(!scan->rs_inited); + + /* return null immediately if relation is empty */ + if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0) + return InvalidBlockNumber; + + scan->rs_inited = true; + + /* forward and serial */ + if (ScanDirectionIsForward(dir) && scan->rs_base.rs_parallel == NULL) + return scan->rs_startblock; + + /* forward and parallel */ + if (ScanDirectionIsForward(dir)) + { + table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, + scan->rs_parallelworkerdata, + (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel); + + return table_block_parallelscan_nextpage(scan->rs_base.rs_rd, + scan->rs_parallelworkerdata, + (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel); + } + + /* backward parallel scan not supported */ + Assert(scan->rs_base.rs_parallel == NULL); + + /* + * Disable reporting to syncscan logic in a backwards scan; it's not very + * likely anyone else is doing the same thing at the same time, and much + * more likely that we'll just bollix things for forward scanners. + */ + scan->rs_base.rs_flags &= ~SO_ALLOW_SYNC; + + /* + * Start from last page of the scan. Ensure we take into account + * rs_numblocks if it's been adjusted by heap_setscanlimits(). + */ + if (scan->rs_numblocks != InvalidBlockNumber) + return (scan->rs_startblock + scan->rs_numblocks - 1) % scan->rs_nblocks; + + if (scan->rs_startblock > 0) + return scan->rs_startblock - 1; + + return scan->rs_nblocks - 1; +} + /* ---------------- * heapgettup - fetch next heap tuple * @@ -534,41 +595,21 @@ heapgettup(HeapScanDesc scan, { if (!scan->rs_inited) { + block = heapgettup_initial_block(scan, dir); + /* - * return null immediately if relation is empty + * Check if we have reached the end of the scan already. This + * could happen if the table is empty or if the other workers in a + * parallel scan have already finished the scan. */ - if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0) + if (block == InvalidBlockNumber) { Assert(!BufferIsValid(scan->rs_cbuf)); tuple->t_data = NULL; return; } - if (scan->rs_base.rs_parallel != NULL) - { - ParallelBlockTableScanDesc pbscan = - (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; - ParallelBlockTableScanWorker pbscanwork = - scan->rs_parallelworkerdata; - - table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, - pbscanwork, pbscan); - - block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, - pbscanwork, pbscan); - - /* Other processes might have already finished the scan. */ - if (block == InvalidBlockNumber) - { - Assert(!BufferIsValid(scan->rs_cbuf)); - tuple->t_data = NULL; - return; - } - } - else - block = scan->rs_startblock; /* first page */ heapgetpage((TableScanDesc) scan, block); - lineoff = FirstOffsetNumber; /* first offnum */ - scan->rs_inited = true; + lineoff = FirstOffsetNumber; } else { @@ -589,60 +630,39 @@ heapgettup(HeapScanDesc scan, } else { - /* backward parallel scan not supported */ - Assert(scan->rs_base.rs_parallel == NULL); - if (!scan->rs_inited) { + block = heapgettup_initial_block(scan, dir); + /* - * return null immediately if relation is empty + * Check if we have reached the end of the scan already. This + * could happen if the table is empty. */ - if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0) + if (block == InvalidBlockNumber) { Assert(!BufferIsValid(scan->rs_cbuf)); tuple->t_data = NULL; return; } - /* - * Disable reporting to syncscan logic in a backwards scan; it's - * not very likely anyone else is doing the same thing at the same - * time, and much more likely that we'll just bollix things for - * forward scanners. - */ - scan->rs_base.rs_flags &= ~SO_ALLOW_SYNC; - - /* - * Start from last page of the scan. Ensure we take into account - * rs_numblocks if it's been adjusted by heap_setscanlimits(). - */ - if (scan->rs_numblocks != InvalidBlockNumber) - block = (scan->rs_startblock + scan->rs_numblocks - 1) % scan->rs_nblocks; - else if (scan->rs_startblock > 0) - block = scan->rs_startblock - 1; - else - block = scan->rs_nblocks - 1; heapgetpage((TableScanDesc) scan, block); + LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); + + page = BufferGetPage(scan->rs_cbuf); + TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, page); + lines = PageGetMaxOffsetNumber(page); + lineoff = lines; /* final offnum */ } else { /* continue from previously returned page/tuple */ block = scan->rs_cblock; /* current page */ - } - - LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); + LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); - page = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, page); - lines = PageGetMaxOffsetNumber(page); + page = BufferGetPage(scan->rs_cbuf); + TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, page); + lines = PageGetMaxOffsetNumber(page); - if (!scan->rs_inited) - { - lineoff = lines; /* final offnum */ - scan->rs_inited = true; - } - else - { /* * The previous returned tuple may have been vacuumed since the * previous scan when we use a non-MVCC snapshot, so we must @@ -847,41 +867,21 @@ heapgettup_pagemode(HeapScanDesc scan, { if (!scan->rs_inited) { + block = heapgettup_initial_block(scan, dir); + /* - * return null immediately if relation is empty + * Check if we have reached the end of the scan already. This + * could happen if the table is empty or if the other workers in a + * parallel scan have already finished the scan. */ - if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0) + if (block == InvalidBlockNumber) { Assert(!BufferIsValid(scan->rs_cbuf)); tuple->t_data = NULL; return; } - if (scan->rs_base.rs_parallel != NULL) - { - ParallelBlockTableScanDesc pbscan = - (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; - ParallelBlockTableScanWorker pbscanwork = - scan->rs_parallelworkerdata; - - table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, - pbscanwork, pbscan); - - block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, - pbscanwork, pbscan); - - /* Other processes might have already finished the scan. */ - if (block == InvalidBlockNumber) - { - Assert(!BufferIsValid(scan->rs_cbuf)); - tuple->t_data = NULL; - return; - } - } - else - block = scan->rs_startblock; /* first page */ heapgetpage((TableScanDesc) scan, block); lineindex = 0; - scan->rs_inited = true; } else { @@ -899,60 +899,38 @@ heapgettup_pagemode(HeapScanDesc scan, } else { - /* backward parallel scan not supported */ - Assert(scan->rs_base.rs_parallel == NULL); - if (!scan->rs_inited) { + block = heapgettup_initial_block(scan, dir); + /* - * return null immediately if relation is empty + * Check if we have reached the end of the scan already. This + * could happen if the table is empty. */ - if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0) + if (block == InvalidBlockNumber) { Assert(!BufferIsValid(scan->rs_cbuf)); tuple->t_data = NULL; return; } - /* - * Disable reporting to syncscan logic in a backwards scan; it's - * not very likely anyone else is doing the same thing at the same - * time, and much more likely that we'll just bollix things for - * forward scanners. - */ - scan->rs_base.rs_flags &= ~SO_ALLOW_SYNC; - - /* - * Start from last page of the scan. Ensure we take into account - * rs_numblocks if it's been adjusted by heap_setscanlimits(). - */ - if (scan->rs_numblocks != InvalidBlockNumber) - block = (scan->rs_startblock + scan->rs_numblocks - 1) % scan->rs_nblocks; - else if (scan->rs_startblock > 0) - block = scan->rs_startblock - 1; - else - block = scan->rs_nblocks - 1; heapgetpage((TableScanDesc) scan, block); + page = BufferGetPage(scan->rs_cbuf); + TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page); + lines = scan->rs_ntuples; + lineindex = lines - 1; } else { /* continue from previously returned page/tuple */ block = scan->rs_cblock; /* current page */ - } - page = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page); - lines = scan->rs_ntuples; - - if (!scan->rs_inited) - { - lineindex = lines - 1; - scan->rs_inited = true; - } - else - { + page = BufferGetPage(scan->rs_cbuf); + TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page); + lines = scan->rs_ntuples; lineindex = scan->rs_cindex - 1; } + /* block and lineindex now reference the previous visible tid */ linesleft = lineindex + 1; -- 2.37.2