diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 0c3e2b0..3a1b1f7 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -167,6 +167,9 @@ static const struct } }; +/* Number of consecutive blocks to allocate per parallel seq scan batch */ +#define PARALLEL_SEQSCAN_PAGES_PER_BATCH 10 + /* Get the LOCKMODE for a given MultiXactStatus */ #define LOCKMODE_from_mxstatus(status) \ (tupleLockExtraInfo[TUPLOCK_from_mxstatus((status))].hwlock) @@ -298,6 +301,9 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) scan->rs_cbuf = InvalidBuffer; scan->rs_cblock = InvalidBlockNumber; + scan->rs_batchcurrblock = InvalidBlockNumber; + scan->rs_batchlastblock = InvalidBlockNumber; + /* page-at-a-time fields are always invalid when not rs_inited */ /* @@ -1676,6 +1682,18 @@ heap_parallelscan_nextpage(HeapScanDesc scan) BlockNumber report_page = InvalidBlockNumber; ParallelHeapScanDesc parallel_scan; + /* + * First attempt to read the next block from a sequence of blocks previously + * assigned to this worker. If only a single block is ever assigned + * then we'll never find a block here. + */ + if (scan->rs_batchcurrblock != InvalidBlockNumber && + scan->rs_batchcurrblock < scan->rs_batchlastblock) + { + scan->rs_batchcurrblock++; + return scan->rs_batchcurrblock; + } + Assert(scan->rs_parallel); parallel_scan = scan->rs_parallel; @@ -1712,18 +1730,45 @@ retry: * The current block number is the next one that needs to be scanned, * unless it's InvalidBlockNumber already, in which case there are no more * blocks to scan. After remembering the current value, we must advance - * it so that the next call to this function returns the next block to be - * scanned. + * it so that it's set to the next block which needs to be scanned. 
+ * + * To try to reduce contention on the mutex we allocate a range of blocks + * to each worker so that they need only acquire the lock again once they've + * processed all of the blocks in their allocated range. */ page = parallel_scan->phs_cblock; if (page != InvalidBlockNumber) { - parallel_scan->phs_cblock++; + scan->rs_batchcurrblock = parallel_scan->phs_cblock; + scan->rs_batchlastblock = scan->rs_batchcurrblock + + PARALLEL_SEQSCAN_PAGES_PER_BATCH - 1; + + parallel_scan->phs_cblock += PARALLEL_SEQSCAN_PAGES_PER_BATCH; + if (parallel_scan->phs_cblock >= scan->rs_nblocks) - parallel_scan->phs_cblock = 0; - if (parallel_scan->phs_cblock == parallel_scan->phs_startblock) + { + /* + * We've stepped off the end of the range of blocks to scan; we'll + * need to shorten this batch to just the remaining blocks. + */ + scan->rs_batchlastblock = scan->rs_nblocks - 1; + + /* If we started the scan at block 0 then we're done */ + if (parallel_scan->phs_startblock == 0) + { + parallel_scan->phs_cblock = InvalidBlockNumber; + report_page = 0; + } + else + parallel_scan->phs_cblock = 0; + } + + if (parallel_scan->phs_startblock > 0 && + scan->rs_batchcurrblock < parallel_scan->phs_startblock && + scan->rs_batchlastblock >= parallel_scan->phs_startblock - 1) { parallel_scan->phs_cblock = InvalidBlockNumber; + scan->rs_batchlastblock = parallel_scan->phs_startblock - 1; report_page = parallel_scan->phs_startblock; } } diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h index 3fc726d..5be3ef6 100644 --- a/src/include/access/relscan.h +++ b/src/include/access/relscan.h @@ -71,6 +71,10 @@ typedef struct HeapScanDescData /* NB: if rs_cbuf is not InvalidBuffer, we hold a pin on that buffer */ ParallelHeapScanDesc rs_parallel; /* parallel scan information */ + /* Used to allocate a range of blocks for a parallel worker to scan */ + BlockNumber rs_batchcurrblock; /* current parallel block in batch */ + BlockNumber rs_batchlastblock; /* final parallel block in batch */ 
+ /* these fields only used in page-at-a-time mode and for bitmap scans */ int rs_cindex; /* current tuple's index in vistuples */ int rs_ntuples; /* number of visible tuples on page */