diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 01fad38..2af778a 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1240,6 +1240,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser Waiting for parallel workers to finish computing. + ParallelBitmapScan + Waiting for leader backend to complete the TidBitmap. + + SafeSnapshot Waiting for a snapshot for a READ ONLY DEFERRABLE transaction. diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 1ce42ea..76a99a1 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -1753,6 +1753,22 @@ retry: } /* ---------------- + * heap_update_snapshot + * + * Update snapshot info in heap scan descriptor. + * ---------------- + */ +void +heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot) +{ + Assert(IsMVCCSnapshot(snapshot)); + + RegisterSnapshot(snapshot); + scan->rs_snapshot = snapshot; + scan->rs_temp_snap = true; +} + +/* ---------------- * heap_getnext - retrieve next tuple in scan * * Fix to work with index relations. diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index e01fe6d..090aaf3 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -29,6 +29,7 @@ #include "executor/nodeForeignscan.h" #include "executor/nodeSeqscan.h" #include "executor/tqueue.h" +#include "executor/nodeBitmapHeapscan.h" #include "nodes/nodeFuncs.h" #include "optimizer/planmain.h" #include "optimizer/planner.h" @@ -205,6 +206,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) ExecCustomScanEstimate((CustomScanState *) planstate, e->pcxt); break; + case T_BitmapHeapScanState: + ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate, + e->pcxt); + break; default: break; } @@ -257,6 +262,11 @@ ExecParallelInitializeDSM(PlanState *planstate, ExecCustomScanInitializeDSM((CustomScanState *) planstate, d->pcxt); break; + case T_BitmapHeapScanState: + ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate, + d->pcxt); + break; + default: break; } @@ -733,6 +743,10 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc) ExecCustomScanInitializeWorker((CustomScanState *) planstate, toc); break; + case T_BitmapHeapScanState: + ExecBitmapHeapInitializeWorker( + (BitmapHeapScanState *) planstate, toc); + break; default: break; } diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c index f18827d..b806cb5 100644 --- a/src/backend/executor/nodeBitmapHeapscan.c +++ b/src/backend/executor/nodeBitmapHeapscan.c @@ -48,10 +48,14 @@ #include "utils/snapmgr.h" #include "utils/tqual.h" +#include "miscadmin.h" +#include "pgstat.h" static TupleTableSlot *BitmapHeapNext(BitmapHeapScanState *node); static void bitgetpage(HeapScanDesc scan, TBMIterateResult *tbmres); +static bool pbms_is_leader(ParallelBitmapInfo * pbminfo); +static void pbms_set_parallel(PlanState *node); /* ---------------------------------------------------------------- * BitmapHeapNext @@ -68,11 +72,18 @@ BitmapHeapNext(BitmapHeapScanState *node) TBMIterator *tbmiterator; TBMIterateResult *tbmres; + ParallelBitmapInfo *pbminfo = node->parallel_bitmap; #ifdef USE_PREFETCH TBMIterator *prefetch_iterator; #endif OffsetNumber targoffset; TupleTableSlot *slot; + ParallelTBMInfo *parallel_tbm = NULL; + + /* Get the parallel TBM address in shared memory using offset */ + if (pbminfo) + parallel_tbm = (ParallelTBMInfo *) 
((char *) pbminfo + + pbminfo->tbm_offset); /* * extract necessary information from index scan node @@ -87,6 +98,35 @@ BitmapHeapNext(BitmapHeapScanState *node) prefetch_iterator = node->prefetch_iterator; #endif + /* -------------------------------------------------------------------- + * Parallel Bitmap Heap Scan Algorithm + * + * The first worker to see the state of ParallelBitmapInfo as PBM_INITIAL + * becomes leader and sets the state to PBM_INPROGRESS. All other workers + * see the state as PBM_INPROGRESS, and will wait for leader to finish + * building the TIDBitmap. + * + * Leader Processing: + * Create TIDBitmap using DSA memory. + * Restore local TIDBitmap variable information into + * ParallelBitmapInfo so that other worker can see those. + * Convert the TIDBitmap into shared chunk and page array + * Set state to PBM_FINISHED. + * Wake up other workers. + * + * Other Worker Processing: + * Wait until leader creates shared TIDBitmap. + * Copy TIDBitmap from info from ParallelBitmapInfo to local TIDBitmap. + * + * Iterate and process the pages. + * a) In this phase each worker will iterate over shared page and chunk + * array and select heap pages one by one. + * b) Since multiple workers are iterating over same page and chunk + * array we need to have a shared iterator, so we grab a LWLock + * and iterate within a lock. + * ---------------------------------------------------------------- + */ + /* * If we haven't yet performed the underlying index scan, do it, and begin * the iteration over the bitmap. @@ -101,36 +141,111 @@ BitmapHeapNext(BitmapHeapScanState *node) */ if (tbm == NULL) { - tbm = (TIDBitmap *) MultiExecProcNode(outerPlanState(node)); + /* + * Process the lower level node only if either we are running in non + * parallel mode or we are leader. + * + * In parallel mode leader will immediately come out of the function, + * but all other worker will be blocked until leader wake them up. + */ + if (pbminfo == NULL || pbms_is_leader(pbminfo)) + { + /* + * If we are in parallel mode recursively process the outer node + * and set parallel flag in lower level bitmap index scan. This + * flag will be used by bitmap index node to identify whether it + * needs to create a shared pagetable or local pagetable. + */ + if (pbminfo) + pbms_set_parallel(outerPlanState(node)); - if (!tbm || !IsA(tbm, TIDBitmap)) - elog(ERROR, "unrecognized result from subplan"); + tbm = (TIDBitmap *) MultiExecProcNode(outerPlanState(node)); + if (!tbm || !IsA(tbm, TIDBitmap)) + elog(ERROR, "unrecognized result from subplan"); + + /* + * Copy the local TIDBitmap information to the shared location so + * that other workers can use them. + */ + if (pbminfo) + tbm_update_shared_members(tbm, parallel_tbm); + } + else + { + /* + * By this time leader has already created the shared TBM. Here we + * need to create a local TBM and copy information from shared + * location. + */ + tbm = tbm_attach(parallel_tbm, node->ss.ps.state->es_query_dsa); + } node->tbm = tbm; - node->tbmiterator = tbmiterator = tbm_begin_iterate(tbm); + + /* + * If we are running in parallel mode then initialize a shared + * iterator otherwise a local iterator. 
+ */ + if (pbminfo) + node->tbmiterator = tbmiterator = + tbm_begin_shared_iterate(tbm, parallel_tbm, false); + else + node->tbmiterator = tbmiterator = tbm_begin_iterate(tbm); + node->tbmres = tbmres = NULL; #ifdef USE_PREFETCH if (node->prefetch_maximum > 0) { - node->prefetch_iterator = prefetch_iterator = tbm_begin_iterate(tbm); + if (pbminfo) + node->prefetch_iterator = prefetch_iterator = + tbm_begin_shared_iterate(tbm, parallel_tbm, true); + else + node->prefetch_iterator = prefetch_iterator = + tbm_begin_iterate(tbm); + node->prefetch_pages = 0; node->prefetch_target = -1; } #endif /* USE_PREFETCH */ + + /* + * copy local TBM information in shared memory before waking up the + * other workers. Other workers will create there own TBM and copy + * information from shared memory. + */ + if (pbminfo) + { + /* Change the state under a lock */ + SpinLockAcquire(&pbminfo->state_mutex); + pbminfo->state = PBM_FINISHED; + SpinLockRelease(&pbminfo->state_mutex); + + /* Wake up all other workers. */ + ConditionVariableBroadcast(&pbminfo->cv); + } } for (;;) { Page dp; ItemId lp; + bool need_prefetch = false; + int *prefetch_target; /* * Get next page of results if needed */ if (tbmres == NULL) { - node->tbmres = tbmres = tbm_iterate(tbmiterator); + /* + * If we are in parallel mode perform shared iterate otherwise + * local iterate. + */ + if (!pbminfo) + node->tbmres = tbmres = tbm_iterate(tbmiterator); + else + node->tbmres = tbmres = tbm_shared_iterate(tbmiterator); if (tbmres == NULL) { /* no more entries in the bitmap */ @@ -138,17 +253,50 @@ BitmapHeapNext(BitmapHeapScanState *node) } #ifdef USE_PREFETCH - if (node->prefetch_pages > 0) + /* + * If we are in parallel mode then acquire prefetch_mutex and + * check prefetch_pages from shared location. + */ + if (pbminfo) { + SpinLockAcquire(&pbminfo->prefetch_mutex); + /* The main iterator has closed the distance by one page */ - node->prefetch_pages--; + if (pbminfo->prefetch_pages > 0) + pbminfo->prefetch_pages--; + else + need_prefetch = true; + SpinLockRelease(&pbminfo->prefetch_mutex); } - else if (prefetch_iterator) + else { - /* Do not let the prefetch iterator get behind the main one */ - TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator); + /* The main iterator has closed the distance by one page */ + if (node->prefetch_pages > 0) + node->prefetch_pages--; + else + need_prefetch = true; + } + + if (prefetch_iterator && need_prefetch) + { + TBMIterateResult *tbmpre; - if (tbmpre == NULL || tbmpre->blockno != tbmres->blockno) + /* Do not let the prefetch iterator get behind the main one */ + if (!pbminfo) + tbmpre = tbm_iterate(prefetch_iterator); + else + tbmpre = tbm_shared_iterate(prefetch_iterator); + + /* + * In case of parallel mode we can only ensure that prefetch + * iterator is not behind the main iterator, but we can not + * ensure that the current blockno in the main iterator and + * in the prefetch iterator is same. It's possible that + * whatever blockno we are prefetching is getting processed + * by some other worker. + */ + if ((pbminfo == NULL) && + (tbmpre == NULL || tbmpre->blockno != tbmres->blockno)) elog(ERROR, "prefetch and main iterators are out of sync"); } #endif /* USE_PREFETCH */ @@ -183,19 +331,40 @@ BitmapHeapNext(BitmapHeapScanState *node) #ifdef USE_PREFETCH /* - * Increase prefetch target if it's not yet at the max. 
Note that - * we will increase it to zero after fetching the very first - * page/tuple, then to one after the second tuple is fetched, then - * it doubles as later pages are fetched. + * If we are in shared mode then use prefetch_target from shared + * location i.e pbminfo otherwise directly from node. */ - if (node->prefetch_target >= node->prefetch_maximum) - /* don't increase any further */ ; - else if (node->prefetch_target >= node->prefetch_maximum / 2) - node->prefetch_target = node->prefetch_maximum; - else if (node->prefetch_target > 0) - node->prefetch_target *= 2; + if (pbminfo == NULL) + prefetch_target = &node->prefetch_target; else - node->prefetch_target++; + prefetch_target = &pbminfo->prefetch_target; + + /* Increase prefetch target if it's not yet at the max. */ + if (*prefetch_target < node->prefetch_maximum) + { + /* If we are in parallel mode then grab prefetch_mutex */ + if (pbminfo != NULL) + SpinLockAcquire(&pbminfo->prefetch_mutex); + + /* + * Increase prefetch target if it's not yet at the max. Note + * that we will increase it to zero after fetching the very + * first page/tuple, then to one after the second tuple is + * fetched, then it doubles as later pages are fetched. + */ + if (*prefetch_target >= node->prefetch_maximum) + /* don't increase any further */ ; + else if (*prefetch_target >= node->prefetch_maximum / 2) + *prefetch_target = node->prefetch_maximum; + else if (*prefetch_target > 0) + *prefetch_target *= 2; + else + (*prefetch_target)++; + + if (pbminfo != NULL) + SpinLockRelease(&pbminfo->prefetch_mutex); + } + #endif /* USE_PREFETCH */ } else @@ -211,8 +380,25 @@ BitmapHeapNext(BitmapHeapScanState *node) * Try to prefetch at least a few pages even before we get to the * second page if we don't stop reading after the first tuple. */ - if (node->prefetch_target < node->prefetch_maximum) - node->prefetch_target++; + if (pbminfo == NULL) + { + if (node->prefetch_target < node->prefetch_maximum) + node->prefetch_target++; + } + else + { + /* + * If we are in parallel mode then grab prefetch_mutex before + * updating prefetch target. + */ + if (pbminfo->prefetch_target < node->prefetch_maximum) + { + SpinLockAcquire(&pbminfo->prefetch_mutex); + if (pbminfo->prefetch_target < node->prefetch_maximum) + pbminfo->prefetch_target++; + SpinLockRelease(&pbminfo->prefetch_mutex); + } + } #endif /* USE_PREFETCH */ } @@ -236,21 +422,59 @@ BitmapHeapNext(BitmapHeapScanState *node) */ if (prefetch_iterator) { - while (node->prefetch_pages < node->prefetch_target) + int *prefetch_pages; + int prefetch_target; + + /* + * If parallel bitmap info available means we are running in + * parallel mode. So use parallel iterator for prefetching. + */ + if (pbminfo) + { + prefetch_pages = &pbminfo->prefetch_pages; + prefetch_target = pbminfo->prefetch_target; + } + else { - TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator); + prefetch_pages = &node->prefetch_pages; + prefetch_target = node->prefetch_target; + } + + /* + * We are checking the prefetch_pages without mutex. Henceforth in + * case of parallel mode there can be some extra prefetch. Should + * we acquire mutex and recheck before iterating? 
+ */ + while (*prefetch_pages < prefetch_target) + { + TBMIterateResult *tbmpre; + + if (!pbminfo) + tbmpre = tbm_iterate(prefetch_iterator); + else + tbmpre = tbm_shared_iterate(prefetch_iterator); if (tbmpre == NULL) { /* No more pages to prefetch */ tbm_end_iterate(prefetch_iterator); + node->prefetch_iterator = prefetch_iterator = NULL; break; } - node->prefetch_pages++; + + if (pbminfo != NULL) + SpinLockAcquire(&pbminfo->prefetch_mutex); + + (*prefetch_pages)++; + + if (pbminfo != NULL) + SpinLockRelease(&pbminfo->prefetch_mutex); + PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno); } } + #endif /* USE_PREFETCH */ /* @@ -465,6 +689,16 @@ ExecReScanBitmapHeapScan(BitmapHeapScanState *node) node->tbmres = NULL; node->prefetch_iterator = NULL; + /* reset parallel bitmap scan info, if present */ + if (node->parallel_bitmap) + { + ParallelBitmapInfo *pbminfo = node->parallel_bitmap; + + pbminfo->state = PBM_INITIAL; + pbminfo->prefetch_pages = 0; + pbminfo->prefetch_target = -1; + } + ExecScanReScan(&node->ss); /* @@ -567,6 +801,7 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags) scanstate->prefetch_target = 0; /* may be updated below */ scanstate->prefetch_maximum = target_prefetch_pages; + scanstate->parallel_bitmap = NULL; /* * Miscellaneous initialization @@ -651,3 +886,168 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags) */ return scanstate; } + +/*---------------- + * pbms_is_leader + * + * Check if we are the first one to come here, if yes then + * we become leader, otherwise we need to wait until leader + * create the shared TID bitmap and wake us up. + * --------------- + */ +static bool +pbms_is_leader(ParallelBitmapInfo * pbminfo) +{ + bool needWait = false; + bool leader = false; + + while (1) + { + /*--------------- + * Check the current state + * If state is + * PBM_INITIAL : then we become leader and set it to PBM_INPROGRESS + * PBM_INPROGRESS : then we need to wait till leader creates BITMAP + * PBM_FINISHED : BITMAP is ready so no need to wait. + *--------------- + */ + SpinLockAcquire(&pbminfo->state_mutex); + + if (pbminfo->state == PBM_INITIAL) + { + pbminfo->state = PBM_INPROGRESS; + leader = true; + } + else if (pbminfo->state == PBM_INPROGRESS) + needWait = true; + else + needWait = false; + + SpinLockRelease(&pbminfo->state_mutex); + + /* If we are leader or leader has already created a TIDBITMAP */ + if (leader || !needWait) + break; + + /* Sleep until leader send wake up signal */ + ConditionVariableSleep(&pbminfo->cv, WAIT_EVENT_PARALLEL_BITMAP_SCAN); + } + + ConditionVariableCancelSleep(); + + return leader; +} + +/*------------------- + * pbms_set_parallel + * + * Parallel bitmap heap scan set below index scan node as parallel + * so that it can create shared TIDBitmap. + * ----------------- + */ +static void +pbms_set_parallel(PlanState *node) +{ + /* + * In case of BitmapOr or BitmapAnd set first bitmap index scan node as + * parallel, because only first node will create the main bitmap other + * bitmaps will be merged to the first bitmap so no need to create them in + * shared memory. 
+ */ + switch (node->type) + { + case T_BitmapIndexScanState: + ((BitmapIndexScanState *) node)->biss_Parallel = true; + break; + case T_BitmapOrState: + pbms_set_parallel(((BitmapOrState *) node)->bitmapplans[0]); + break; + case T_BitmapAndState: + pbms_set_parallel(((BitmapAndState *) node)->bitmapplans[0]); + break; + default: + break; + } +} + +/* ---------------------------------------------------------------- + * ExecBitmapHeapEstimate + * + * estimates the space required to serialize bitmap scan node. + * ---------------------------------------------------------------- + */ +void +ExecBitmapHeapEstimate(BitmapHeapScanState *node, + ParallelContext *pcxt) +{ + EState *estate = node->ss.ps.state; + Size offset = add_size(offsetof(ParallelBitmapInfo, + phs_snapshot_data), + EstimateSnapshotSpace(estate->es_snapshot)); + + /* Estimate the size for sharing parallel TBM info. */ + node->pscan_len = tbm_estimate_parallel_tbminfo(offset); + + shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + +/* ---------------------------------------------------------------- + * ExecBitmapHeapInitializeDSM + * + * Set up a parallel bitmap heap scan descriptor. + * ---------------------------------------------------------------- + */ +void +ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node, + ParallelContext *pcxt) +{ + ParallelBitmapInfo *pbminfo; + ParallelTBMInfo *parallel_tbm; + EState *estate = node->ss.ps.state; + Size offset = add_size(offsetof(ParallelBitmapInfo, + phs_snapshot_data), + EstimateSnapshotSpace(estate->es_snapshot)); + + pbminfo = shm_toc_allocate(pcxt->toc, node->pscan_len); + + /* Offset to parallel TBM info. */ + pbminfo->tbm_offset = offset; + + parallel_tbm = (ParallelTBMInfo *) (((char *) pbminfo) + offset); + tbm_init_shared_iterator(parallel_tbm); + + /* Initialize mutex to protect prefetch pages and target */ + SpinLockInit(&pbminfo->prefetch_mutex); + pbminfo->prefetch_pages = 0; + pbminfo->prefetch_target = -1; + + /* Initialize mutex to protect current state */ + SpinLockInit(&pbminfo->state_mutex); + pbminfo->state = PBM_INITIAL; + + ConditionVariableInit(&pbminfo->cv); + SerializeSnapshot(estate->es_snapshot, pbminfo->phs_snapshot_data); + shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pbminfo); + + node->parallel_bitmap = pbminfo; +} + +/* ---------------------------------------------------------------- + * ExecBitmapHeapInitializeWorker + * + * Copy relevant information from TOC into planstate. + * ---------------------------------------------------------------- + */ +void +ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, shm_toc *toc) +{ + ParallelBitmapInfo *pbminfo; + Snapshot snapshot; + + pbminfo = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id); + node->parallel_bitmap = pbminfo; + + snapshot = RestoreSnapshot(pbminfo->phs_snapshot_data); + heap_update_snapshot(node->ss.ss_currentScanDesc, snapshot); +} diff --git a/src/backend/executor/nodeBitmapIndexscan.c b/src/backend/executor/nodeBitmapIndexscan.c index 4274e9a..56f8376 100644 --- a/src/backend/executor/nodeBitmapIndexscan.c +++ b/src/backend/executor/nodeBitmapIndexscan.c @@ -82,6 +82,13 @@ MultiExecBitmapIndexScan(BitmapIndexScanState *node) } /* + * If parallel flag is set then set flag in TIDBitmap to indicate that we + * need a shared page table. 
+ */ + if (node->biss_Parallel) + tbm_set_parallel(tbm, node->ss.ps.state->es_query_dsa); + + /* * Get TIDs from index and insert into bitmap */ while (doscan) diff --git a/src/backend/nodes/tidbitmap.c b/src/backend/nodes/tidbitmap.c index 36102b5..bb321bd 100644 --- a/src/backend/nodes/tidbitmap.c +++ b/src/backend/nodes/tidbitmap.c @@ -42,7 +42,15 @@ #include "access/htup_details.h" #include "nodes/bitmapset.h" +#include "nodes/execnodes.h" #include "nodes/tidbitmap.h" +#include "storage/condition_variable.h" +#include "storage/lwlock.h" +#include "storage/shmem.h" +#include "storage/spin.h" +#include "utils/dsa.h" +#include "utils/hsearch.h" +#include "utils/relptr.h" /* * The maximum number of tuples per page is not large (typically 256 with @@ -102,6 +110,8 @@ typedef struct PagetableEntry bitmapword words[Max(WORDS_PER_PAGE, WORDS_PER_CHUNK)]; } PagetableEntry; +relptr_declare(PagetableEntry, RelptrPagetableEntry); + /* * We want to avoid the overhead of creating the hashtable, which is * comparatively large, when not necessary. Particularly when we are using a @@ -136,9 +146,17 @@ struct TIDBitmap bool iterating; /* tbm_begin_iterate called? */ uint32 lossify_start; /* offset to start lossifying hashtable at */ PagetableEntry entry1; /* used when status == TBM_ONE_PAGE */ + bool is_shared; /* need to build shared tbm if set */ + dsa_pointer dsa_data; /* dsa_pointer to the element array */ + int dsa_entries; /* actual allocated entries */ + dsa_area *area; /* reference to per-query shared memory area */ /* these are valid when iterating is true: */ PagetableEntry **spages; /* sorted exact-page list, or NULL */ PagetableEntry **schunks; /* sorted lossy-chunk list, or NULL */ + char *base; /* base pointer of the element array */ + RelptrPagetableEntry *relpages; /* page array of relptr */ + RelptrPagetableEntry *relchunks; /* chunk array of relptr */ + ParallelTBMInfo *parallel_tbm; /* reference to parallel TBM */ }; /* @@ -153,9 +171,43 @@ struct TBMIterator int spageptr; /* next spages index */ int schunkptr; /* next schunks index */ int schunkbit; /* next bit to check in current schunk */ + TBMSharedIterator *shareditr; /* reference to shared iterator info */ TBMIterateResult output; /* MUST BE LAST (because variable-size) */ }; +/* + * same as TBMIterator but, it will be allocated in shared memory. It also has + * LWLock to protect the shared members. However it don't have TBMIterateResult + * because results should be local for each worker. So for using shared + * iterator we need to use local TBMIterator and that will be wrapper around + * the shared iterator. Henceforth, shared member will be accessed from + * shared iterator and result will be in local iterator which is wrapper + * around shared iterator. + */ +struct TBMSharedIterator +{ + int spageptr; /* next spages index */ + int schunkptr; /* next schunks index */ + int schunkbit; /* next bit to check in current schunk */ + LWLock lock; /* lock to protect the shared access */ +}; + +/* + * Holds shared members of TIDBitmap for parallel bitmap scan. 
+ */ +struct ParallelTBMInfo +{ + dsa_pointer dsa_data; /* dsa pointers for all kind of pages */ + dsa_pointer spages; + dsa_pointer schunks; + int nentries; /* number of entries in pagetable */ + int maxentries; /* limit on same to meet maxbytes */ + int npages; /* number of exact entries in pagetable */ + int nchunks; /* number of lossy entries in pagetable */ + int dsa_entries; /* total item in dsa_pages */ + TBMSharedIterator tbmiterator; + TBMSharedIterator prefetch_iterator; +}; /* Local function prototypes */ static void tbm_union_page(TIDBitmap *a, const PagetableEntry *bpage); @@ -168,6 +220,10 @@ static bool tbm_page_is_lossy(const TIDBitmap *tbm, BlockNumber pageno); static void tbm_mark_page_lossy(TIDBitmap *tbm, BlockNumber pageno); static void tbm_lossify(TIDBitmap *tbm); static int tbm_comparator(const void *left, const void *right); +static int tbm_shared_comparator(const void *left, const void *right, + void *arg); +static void *tbm_alloc_shared(Size size, void *arg); +static void tbm_free_shared(void *pointer, void *arg); /* * Simple inline murmur hash implementation for the exact width required, for @@ -198,7 +254,6 @@ hash_blockno(BlockNumber b) #define SH_DECLARE #include "lib/simplehash.h" - /* * tbm_create - create an initially-empty bitmap * @@ -244,7 +299,11 @@ tbm_create_pagetable(TIDBitmap *tbm) Assert(tbm->status != TBM_HASH); Assert(tbm->pagetable == NULL); - tbm->pagetable = pagetable_create(tbm->mcxt, 128, NULL, NULL, NULL); + if (tbm->is_shared) + tbm->pagetable = pagetable_create(tbm->mcxt, 128, tbm_alloc_shared, + tbm_free_shared, tbm); + else + tbm->pagetable = pagetable_create(tbm->mcxt, 128, NULL, NULL, NULL); /* If entry1 is valid, push it into the hashtable */ if (tbm->status == TBM_ONE_PAGE) @@ -664,6 +723,87 @@ tbm_begin_iterate(TIDBitmap *tbm) return iterator; } +TBMIterator * +tbm_begin_shared_iterate(TIDBitmap *tbm, ParallelTBMInfo * parallel_tbm, + bool prefetch) +{ + TBMIterator *iterator; + + /* + * Create the TBMIterator struct, with enough trailing space to serve the + * needs of the TBMIterateResult sub-struct. + */ + iterator = (TBMIterator *) palloc(sizeof(TBMIterator) + + MAX_TUPLES_PER_PAGE * sizeof(OffsetNumber)); + iterator->tbm = tbm; + + if (prefetch) + iterator->shareditr = ¶llel_tbm->prefetch_iterator; + else + iterator->shareditr = ¶llel_tbm->tbmiterator; + + /* + * If we have a hashtable, create and fill the sorted page lists, unless + * we already did that for a previous iterator. Note that the lists are + * attached to the bitmap not the iterator, so they can be used by more + * than one iterator. This list is created in shared memory so that + * multiple workers can access it and perform shared iterate. + */ + if (tbm->status == TBM_HASH && !tbm->iterating) + { + pagetable_iterator i; + PagetableEntry *page; + RelptrPagetableEntry *relpages; + RelptrPagetableEntry *relchunks; + int npages; + int nchunks; + + /* + * Create page list and chunk list using relptr so that we can share + * this information across multiple workers. 
+ */ + if (tbm->npages) + parallel_tbm->spages = dsa_allocate(tbm->area, + tbm->npages * (sizeof(RelptrPagetableEntry))); + if (tbm->nchunks) + parallel_tbm->schunks = dsa_allocate(tbm->area, + tbm->nchunks * (sizeof(RelptrPagetableEntry))); + + relpages = dsa_get_address(tbm->area, parallel_tbm->spages); + relchunks = dsa_get_address(tbm->area, parallel_tbm->schunks); + + npages = nchunks = 0; + pagetable_start_iterate(tbm->pagetable, &i); + while ((page = pagetable_iterate(tbm->pagetable, &i)) != NULL) + { + if (page->ischunk) + relptr_store(tbm->base, relchunks[nchunks++], page); + else + relptr_store(tbm->base, relpages[npages++], page); + } + + Assert(npages == tbm->npages); + Assert(nchunks == tbm->nchunks); + if (npages > 1) + qsort_arg(relpages, npages, sizeof(RelptrPagetableEntry *), + tbm_shared_comparator, (void *) tbm->base); + if (nchunks > 1) + qsort_arg(relchunks, nchunks, sizeof(RelptrPagetableEntry *), + tbm_shared_comparator, (void *) tbm->base); + + iterator->schunkbit = 0; + iterator->schunkptr = 0; + iterator->spageptr = 0; + } + + tbm->relpages = dsa_get_address(tbm->area, parallel_tbm->spages); + tbm->relchunks = dsa_get_address(tbm->area, parallel_tbm->schunks); + + tbm->iterating = true; + + return iterator; +} + /* * tbm_iterate - scan through next page of a TIDBitmap * @@ -777,6 +917,125 @@ tbm_iterate(TBMIterator *iterator) } /* + * tbm_shared_iterate - same as tbm_iterate. Only difference is it's in + * shared memory and multiple worker operate on the shared iterator + * under a lock. + */ +TBMIterateResult * +tbm_shared_iterate(TBMIterator *tbmiterator) +{ + TBMSharedIterator *iterator = tbmiterator->shareditr; + TBMIterateResult *output = &(tbmiterator->output); + TIDBitmap *tbm = tbmiterator->tbm; + + Assert(tbm->iterating); + + LWLockAcquire(&iterator->lock, LW_EXCLUSIVE); + + /* + * If lossy chunk pages remain, make sure we've advanced schunkptr/ + * schunkbit to the next set bit. + */ + while (iterator->schunkptr < tbm->nchunks) + { + PagetableEntry *chunk = + relptr_access(tbm->base, tbm->relchunks[iterator->schunkptr]); + int schunkbit = iterator->schunkbit; + + while (schunkbit < PAGES_PER_CHUNK) + { + int wordnum = WORDNUM(schunkbit); + int bitnum = BITNUM(schunkbit); + + if ((chunk->words[wordnum] & ((bitmapword) 1 << bitnum)) != 0) + break; + schunkbit++; + } + if (schunkbit < PAGES_PER_CHUNK) + { + iterator->schunkbit = schunkbit; + break; + } + /* advance to next chunk */ + iterator->schunkptr++; + iterator->schunkbit = 0; + } + + /* + * If both chunk and per-page data remain, must output the numerically + * earlier page. 
+ */ + if (iterator->schunkptr < tbm->nchunks) + { + PagetableEntry *chunk = + relptr_access(tbm->base, tbm->relchunks[iterator->schunkptr]); + PagetableEntry *page = + relptr_access(tbm->base, tbm->relpages[iterator->spageptr]); + + BlockNumber chunk_blockno; + + chunk_blockno = chunk->blockno + iterator->schunkbit; + if (iterator->spageptr >= tbm->npages || chunk_blockno < page->blockno) + { + /* Return a lossy page indicator from the chunk */ + output->blockno = chunk_blockno; + output->ntuples = -1; + output->recheck = true; + iterator->schunkbit++; + + LWLockRelease(&iterator->lock); + return output; + } + } + + if (iterator->spageptr < tbm->npages) + { + PagetableEntry *page; + int ntuples; + int wordnum; + + /* In ONE_PAGE state, we don't allocate an spages[] array */ + if (tbm->status == TBM_ONE_PAGE) + page = &tbm->entry1; + else + page = relptr_access(tbm->base, tbm->relpages[iterator->spageptr]); + + /* scan bitmap to extract individual offset numbers */ + ntuples = 0; + for (wordnum = 0; wordnum < WORDS_PER_PAGE; wordnum++) + { + bitmapword w = page->words[wordnum]; + + if (w != 0) + { + int off = wordnum * BITS_PER_BITMAPWORD + 1; + + while (w != 0) + { + if (w & 1) + output->offsets[ntuples++] = (OffsetNumber) off; + off++; + w >>= 1; + } + } + } + output->blockno = page->blockno; + output->ntuples = ntuples; + output->recheck = page->recheck; + + iterator->spageptr++; + + LWLockRelease(&iterator->lock); + return output; + } + + LWLockRelease(&iterator->lock); + + /* Nothing more in the bitmap */ + return NULL; +} + +/* * tbm_end_iterate - finish an iteration over a TIDBitmap * * Currently this is just a pfree, but it might do more someday. (For @@ -1061,3 +1320,169 @@ tbm_comparator(const void *left, const void *right) return 1; return 0; } + +/* + * same as tbm_comparator, Only difference is that this will get relptr + * of PagetableEntry and it need to get actual PagetableEntry using relptr. + */ +static int +tbm_shared_comparator(const void *left, const void *right, void *arg) +{ + PagetableEntry *lpage; + PagetableEntry *rpage; + + lpage = relptr_access((char *) arg, *((RelptrPagetableEntry *) left)); + rpage = relptr_access((char *) arg, *((RelptrPagetableEntry *) right)); + + if (lpage->blockno < rpage->blockno) + return -1; + else if (lpage->blockno > rpage->blockno) + return 1; + return 0; +} + +/* + * tbm_update_shared_members + * + * Store leader's private tbm state to shared location. This must + * be called before waking up other workers. + */ +void +tbm_update_shared_members(TIDBitmap *tbm, ParallelTBMInfo * parallel_tbm) +{ + /* + * Copy private information to shared location before waking up the other + * workers. + */ + parallel_tbm->maxentries = tbm->maxentries; + parallel_tbm->nchunks = tbm->nchunks; + parallel_tbm->nentries = tbm->nentries; + parallel_tbm->npages = tbm->npages; + parallel_tbm->dsa_data = tbm->dsa_data; + parallel_tbm->dsa_entries = tbm->dsa_entries; + + tbm->base = dsa_get_address(tbm->area, tbm->dsa_data); +} + +/* + * tbm_set_parallel + * + * Mark tidbitmap as shared and also store DSA area in it. + * marking tidbitmap as shared is indication that it should create pagetable + * in shared memory using DSA. + */ +void +tbm_set_parallel(TIDBitmap *tbm, void *area) +{ + tbm->is_shared = true; + tbm->area = (dsa_area *) area; +} + +/* + * tbm_estimate_parallel_tbminfo + * + * Estimate size of ParallelTBMInfo. 
+ */ +Size +tbm_estimate_parallel_tbminfo(Size size) +{ + return add_size(size, sizeof(ParallelTBMInfo)); +} + +/* + * tbm_init_shared_iterator + * + * Initializes the lwlocks for the shared iterators. + */ +void +tbm_init_shared_iterator(ParallelTBMInfo * tbminfo) +{ + LWLockInitialize(&tbminfo->tbmiterator.lock, + LWTRANCHE_PARALLEL_BITMAP_ITERATOR); + + LWLockInitialize(&tbminfo->prefetch_iterator.lock, + LWTRANCHE_PARALLEL_BITMAP_ITERATOR); +} + +/* + * tbm_attach + * + * Create a local TIDBitmap for worker and Attach it to shared TID bitmap + * created by leader. + */ +TIDBitmap * +tbm_attach(ParallelTBMInfo * parallel_tbm, void *area) +{ + TIDBitmap *tbm = makeNode(TIDBitmap); + + tbm->mcxt = CurrentMemoryContext; + tbm->status = TBM_HASH; + tbm->nchunks = parallel_tbm->nchunks; + tbm->nentries = parallel_tbm->nentries; + tbm->npages = parallel_tbm->npages; + tbm->maxentries = parallel_tbm->maxentries; + tbm->dsa_data = parallel_tbm->dsa_data; + tbm->dsa_entries = parallel_tbm->dsa_entries; + tbm->base = dsa_get_address(area, tbm->dsa_data); + tbm->iterating = true; + + /* Mark the tbm as parallel and also keep the DSA reference in it. */ + tbm_set_parallel(tbm, area); + + return tbm; +} + +/* + * tbm_alloc_shared + * + * Memory allocation function for bitmap hash. + * It allocates memory from DSA and also stores dsa_pointer in the memory + * so that during free we can directly get the dsa_pointer and free it. + */ +static void * +tbm_alloc_shared(Size size, void *arg) +{ + TIDBitmap *tbm = arg; + dsa_pointer dsaptr; + char *ptr; + + /* Add the size for storing dsa_pointer */ + dsaptr = dsa_allocate(tbm->area, size + sizeof(dsa_pointer)); + + tbm->dsa_data = dsaptr; + + /* Keep track of actual number of entries */ + if (tbm->pagetable) + tbm->dsa_entries = tbm->pagetable->size; + + ptr = dsa_get_address(tbm->area, dsaptr); + memset(ptr, 0, size + sizeof(dsa_pointer)); + + /* Store dsa_pointer */ + *((dsa_pointer *) ptr) = dsaptr; + + return (ptr + sizeof(dsa_pointer)); +} + +/* + * tbm_free_shared + * + * Memory free function for tid bitmap + */ +static void +tbm_free_shared(void *pointer, void *arg) +{ + TIDBitmap *tbm = arg; + dsa_pointer dsa_data; + + /* + * If TBM is in iterating phase means pagetable was already created and we + * have come here during tbm_free. So we need not to do anything because + * by this time DSA would have been already freed. + */ + if (tbm->iterating) + return; + + dsa_data = *((dsa_pointer *) ((char *) pointer - sizeof(dsa_pointer))); + dsa_free(tbm->area, dsa_data); +} diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 5c18987..a2f6fbb 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -2875,6 +2875,31 @@ remove_unused_subquery_outputs(Query *subquery, RelOptInfo *rel) } /* + * create_partial_bitmap_paths + * Build partial access paths for parallel scan of a plain relation + */ +void +create_partial_bitmap_paths(PlannerInfo *root, RelOptInfo *rel, + Path *bitmapqual) +{ + int parallel_workers; + double pages_fetched; + + /* Compute heap pages for bitmap heap scan */ + pages_fetched = compute_bitmap_pages(root, rel, bitmapqual, 1.0, + NULL, NULL); + + parallel_workers = compute_parallel_worker(rel, pages_fetched); + + /* If any limit was set to zero, the user doesn't want a parallel scan. 
*/ + if (parallel_workers <= 0) + return; + + add_partial_path(rel, (Path *) create_bitmap_heap_path(root, rel, + bitmapqual, rel->lateral_relids, 1.0, parallel_workers)); +} + +/* * Compute the number of parallel workers that should be used to scan a * relation. "pages" is the number of pages from the relation that we * expect to scan. diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index a43daa7..f81e469 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -816,6 +816,7 @@ cost_bitmap_heap_scan(Path *path, PlannerInfo *root, RelOptInfo *baserel, QualCost qpqual_cost; Cost cpu_per_tuple; Cost cost_per_page; + Cost cpu_run_cost; double tuples_fetched; double pages_fetched; double spc_seq_page_cost, @@ -877,8 +878,32 @@ cost_bitmap_heap_scan(Path *path, PlannerInfo *root, RelOptInfo *baserel, startup_cost += qpqual_cost.startup; cpu_per_tuple = cpu_tuple_cost + qpqual_cost.per_tuple; + cpu_run_cost = cpu_per_tuple * tuples_fetched; - run_cost += cpu_per_tuple * tuples_fetched; + /* Adjust costing for parallelism, if used. */ + if (path->parallel_workers > 0) + { + double parallel_divisor = get_parallel_divisor(path); + + /* The CPU cost is divided among all the workers. */ + cpu_run_cost /= parallel_divisor; + + /* + * It may be possible to amortize some of the I/O cost, but probably + * not very much, because most operating systems already do aggressive + * prefetching. For now, we assume that the disk run cost can't be + * amortized at all. + */ + + /* + * In the case of a parallel plan, the row count needs to represent + * the number of tuples processed per worker. + */ + path->rows = clamp_row_est(path->rows / parallel_divisor); + } + + + run_cost += cpu_run_cost; /* tlist eval costs are paid per output row, not per tuple scanned */ startup_cost += path->pathtarget->cost.startup; diff --git a/src/backend/optimizer/path/indxpath.c b/src/backend/optimizer/path/indxpath.c index 5283468..630c501 100644 --- a/src/backend/optimizer/path/indxpath.c +++ b/src/backend/optimizer/path/indxpath.c @@ -337,8 +337,12 @@ create_index_paths(PlannerInfo *root, RelOptInfo *rel) bitmapqual = choose_bitmap_and(root, rel, bitindexpaths); bpath = create_bitmap_heap_path(root, rel, bitmapqual, - rel->lateral_relids, 1.0); + rel->lateral_relids, 1.0, 0); add_path(rel, (Path *) bpath); + + /* create a partial bitmap heap path */ + if (rel->consider_parallel && rel->lateral_relids == NULL) + create_partial_bitmap_paths(root, rel, bitmapqual); } /* @@ -410,7 +414,7 @@ create_index_paths(PlannerInfo *root, RelOptInfo *rel) required_outer = get_bitmap_tree_required_outer(bitmapqual); loop_count = get_loop_count(root, rel->relid, required_outer); bpath = create_bitmap_heap_path(root, rel, bitmapqual, - required_outer, loop_count); + required_outer, loop_count, 0); add_path(rel, (Path *) bpath); } } @@ -1557,6 +1561,11 @@ bitmap_scan_cost_est(PlannerInfo *root, RelOptInfo *rel, Path *ipath) bpath.path.pathkeys = NIL; bpath.bitmapqual = ipath; + /* + * Check the cost of temporary path without considering parallelism. + * Parallel bitmap heap path will be considered at later stage. + */ + bpath.path.parallel_workers = 0; cost_bitmap_heap_scan(&bpath.path, root, rel, bpath.path.param_info, ipath, @@ -1599,6 +1608,12 @@ bitmap_and_cost_est(PlannerInfo *root, RelOptInfo *rel, List *paths) bpath.path.pathkeys = NIL; bpath.bitmapqual = (Path *) &apath; + /* + * Check the cost of temporary path without considering parallelism. 
+ * Parallel bitmap heap path will be considered at later stage. + */ + bpath.path.parallel_workers = 0; + /* Now we can do cost_bitmap_heap_scan */ cost_bitmap_heap_scan(&bpath.path, root, rel, bpath.path.param_info, diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index fae1f67..221b5f1 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -2834,7 +2834,7 @@ create_bitmap_subplan(PlannerInfo *root, Path *bitmapqual, plan->plan_rows = clamp_row_est(ipath->indexselectivity * ipath->path.parent->tuples); plan->plan_width = 0; /* meaningless */ - plan->parallel_aware = false; + plan->parallel_aware = bitmapqual->parallel_aware; *qual = get_actual_clauses(ipath->indexclauses); *indexqual = get_actual_clauses(ipath->indexquals); foreach(l, ipath->indexinfo->indpred) diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index f440875..20b9a59 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -1071,7 +1071,8 @@ create_bitmap_heap_path(PlannerInfo *root, RelOptInfo *rel, Path *bitmapqual, Relids required_outer, - double loop_count) + double loop_count, + int parallel_degree) { BitmapHeapPath *pathnode = makeNode(BitmapHeapPath); @@ -1080,9 +1081,9 @@ create_bitmap_heap_path(PlannerInfo *root, pathnode->path.pathtarget = rel->reltarget; pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); - pathnode->path.parallel_aware = false; + pathnode->path.parallel_aware = parallel_degree > 0 ? true : false; pathnode->path.parallel_safe = rel->consider_parallel; - pathnode->path.parallel_workers = 0; + pathnode->path.parallel_workers = parallel_degree; pathnode->path.pathkeys = NIL; /* always unordered */ pathnode->bitmapqual = bitmapqual; @@ -3258,7 +3259,7 @@ reparameterize_path(PlannerInfo *root, Path *path, rel, bpath->bitmapqual, required_outer, - loop_count); + loop_count, 0); } case T_SubqueryScan: { diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 7176cf1..f8eef0b 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3392,6 +3392,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_PARALLEL_FINISH: event_name = "ParallelFinish"; break; + case WAIT_EVENT_PARALLEL_BITMAP_SCAN: + event_name = "ParallelBitmapScan"; + break; case WAIT_EVENT_SAFE_SNAPSHOT: event_name = "SafeSnapshot"; break; diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index ee7e05a..d7ebac4 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -178,6 +178,7 @@ extern void simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup); extern void heap_sync(Relation relation); +extern void heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot); /* in heap/pruneheap.c */ extern void heap_page_prune_opt(Relation relation, Buffer buffer); diff --git a/src/include/executor/nodeBitmapHeapscan.h b/src/include/executor/nodeBitmapHeapscan.h index d7659b9..465c58e 100644 --- a/src/include/executor/nodeBitmapHeapscan.h +++ b/src/include/executor/nodeBitmapHeapscan.h @@ -15,10 +15,17 @@ #define NODEBITMAPHEAPSCAN_H #include "nodes/execnodes.h" +#include "access/parallel.h" extern BitmapHeapScanState *ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags); extern TupleTableSlot *ExecBitmapHeapScan(BitmapHeapScanState *node); extern void ExecEndBitmapHeapScan(BitmapHeapScanState *node); extern void 
ExecReScanBitmapHeapScan(BitmapHeapScanState *node); +extern void ExecBitmapHeapEstimate(BitmapHeapScanState *node, + ParallelContext *pcxt); +extern void ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node, + ParallelContext *pcxt); +extern void ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, + shm_toc *toc); #endif /* NODEBITMAPHEAPSCAN_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index f9bcdd6..59a3971 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -26,6 +26,8 @@ #include "utils/sortsupport.h" #include "utils/tuplestore.h" #include "utils/tuplesort.h" +#include "nodes/tidbitmap.h" +#include "storage/condition_variable.h" /* ---------------- @@ -1423,6 +1425,61 @@ typedef struct IndexOnlyScanState long ioss_HeapFetches; } IndexOnlyScanState; + +/* ---------------- + * PBMState information : Current status of the TIDBitmap creation during + * parallel bitmap heap scan. + * + * PBM_INITIAL TIDBitmap creation is not yet started, so + * first worker to see this state will become + * leader and will create TIDbitmap. This will + * also set the state to PBM_INPROGRESS. + * PBM_INPROGRESS TIDBitmap creation is already in progress, + * so workers need to sleep until leader set the + * state to PBM_FINISHED and wake us up. + * PBM_FINISHED TIDBitmap creation is done, so now all worker + * can proceed to iterate over TIDBitmap. + * ---------------- + */ +typedef enum +{ + PBM_INITIAL, + PBM_INPROGRESS, + PBM_FINISHED +} PBMState; + +/* ---------------- + * ParallelBitmapInfo information + * relid OID of relation to scan + * tbmiterator main iterator + * prefetch_mutex mutual exclusion for prefetch members + * (prefetch_iterator, prefetch_pages and + * prefetch_target) + * prefetch_iterator iterator for scanning ahead of current pages + * prefetch_pages # pages prefetch iterator is ahead of current + * prefetch_target current target prefetch distance + * state_mutex mutual exclusion for state field + * cv conditional wait variable + * state current state of the TIDBitmap + * ptbm_offset offset in bytes of ParallelTIDBitmap. 
+ * phs_snapshot_data snapshot data shared to worker + * ---------------- + */ +typedef struct ParallelBitmapInfo +{ + Oid relid; + slock_t prefetch_mutex; + slock_t state_mutex; + int prefetch_pages; + int prefetch_target; + ConditionVariable cv; + PBMState state; + Size tbm_offset; + Size itr_offset; + Size prefetchitr_offset; + char phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER]; +} ParallelBitmapInfo; + /* ---------------- * BitmapIndexScanState information * @@ -1437,6 +1494,8 @@ typedef struct IndexOnlyScanState * RuntimeContext expr context for evaling runtime Skeys * RelationDesc index relation descriptor * ScanDesc index scan descriptor + * Parallel Under parallel Bitmap heap so need to create shared + * TIDBitmap * ---------------- */ typedef struct BitmapIndexScanState @@ -1453,6 +1512,7 @@ typedef struct BitmapIndexScanState ExprContext *biss_RuntimeContext; Relation biss_RelationDesc; IndexScanDesc biss_ScanDesc; + bool biss_Parallel; } BitmapIndexScanState; /* ---------------- @@ -1468,7 +1528,9 @@ typedef struct BitmapIndexScanState * prefetch_pages # pages prefetch iterator is ahead of current * prefetch_target current target prefetch distance * prefetch_maximum maximum value for prefetch_target - * ---------------- + * pscan_len size of the shared memory for parallel bitmap + * parallel_bitmap shared memory for parallel bitmap scan + *---------------- */ typedef struct BitmapHeapScanState { @@ -1483,6 +1545,8 @@ typedef struct BitmapHeapScanState int prefetch_pages; int prefetch_target; int prefetch_maximum; + Size pscan_len; + ParallelBitmapInfo *parallel_bitmap; } BitmapHeapScanState; /* ---------------- diff --git a/src/include/nodes/tidbitmap.h b/src/include/nodes/tidbitmap.h index 14992e0..647d7f4 100644 --- a/src/include/nodes/tidbitmap.h +++ b/src/include/nodes/tidbitmap.h @@ -33,6 +33,7 @@ typedef struct TIDBitmap TIDBitmap; /* Likewise, TBMIterator is private */ typedef struct TBMIterator TBMIterator; +typedef struct TBMSharedIterator TBMSharedIterator; /* Result structure for tbm_iterate */ typedef struct @@ -44,6 +45,11 @@ typedef struct OffsetNumber offsets[FLEXIBLE_ARRAY_MEMBER]; } TBMIterateResult; +/* + * Holds shared members of TIDBitmap for parallel bitmap scan. 
+ */ +typedef struct ParallelTBMInfo ParallelTBMInfo; + /* function prototypes in nodes/tidbitmap.c */ extern TIDBitmap *tbm_create(long maxbytes); @@ -62,5 +68,16 @@ extern bool tbm_is_empty(const TIDBitmap *tbm); extern TBMIterator *tbm_begin_iterate(TIDBitmap *tbm); extern TBMIterateResult *tbm_iterate(TBMIterator *iterator); extern void tbm_end_iterate(TBMIterator *iterator); +extern void tbm_restore_shared_members(TIDBitmap *tbm, + ParallelTBMInfo * stbm); +extern void tbm_update_shared_members(TIDBitmap *tbm, + ParallelTBMInfo * parallel_tbm); +void tbm_set_parallel(TIDBitmap *tbm, void *area); +extern Size tbm_estimate_parallel_tbminfo(Size size); +TIDBitmap *tbm_attach(ParallelTBMInfo * parallel_tbm, void *area); +TBMIterator *tbm_begin_shared_iterate(TIDBitmap *tbm, + ParallelTBMInfo * parallel_tbm, bool prefetch); +TBMIterateResult *tbm_shared_iterate(TBMIterator *iterator); +void tbm_init_shared_iterator(ParallelTBMInfo * tbminfo); #endif /* TIDBITMAP_H */ diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index 7b41317..cfef818 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -52,7 +52,8 @@ extern BitmapHeapPath *create_bitmap_heap_path(PlannerInfo *root, RelOptInfo *rel, Path *bitmapqual, Relids required_outer, - double loop_count); + double loop_count, + int parallel_degree); extern BitmapAndPath *create_bitmap_and_path(PlannerInfo *root, RelOptInfo *rel, List *bitmapquals); diff --git a/src/include/optimizer/paths.h b/src/include/optimizer/paths.h index 81a9be7..968d39a 100644 --- a/src/include/optimizer/paths.h +++ b/src/include/optimizer/paths.h @@ -53,6 +53,8 @@ extern RelOptInfo *standard_join_search(PlannerInfo *root, int levels_needed, List *initial_rels); extern void generate_gather_paths(PlannerInfo *root, RelOptInfo *rel); +extern void create_partial_bitmap_paths(PlannerInfo *root, RelOptInfo *rel, + Path *bitmapqual); #ifdef OPTIMIZER_DEBUG extern void debug_print_rel(PlannerInfo *root, RelOptInfo *rel); diff --git a/src/include/pgstat.h b/src/include/pgstat.h index de8225b..c52c864 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -786,6 +786,7 @@ typedef enum WAIT_EVENT_MQ_RECEIVE, WAIT_EVENT_MQ_SEND, WAIT_EVENT_PARALLEL_FINISH, + WAIT_EVENT_PARALLEL_BITMAP_SCAN, WAIT_EVENT_SAFE_SNAPSHOT, WAIT_EVENT_SYNC_REP } WaitEventIPC; diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index 8bd93c3..f1a2734 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -212,6 +212,7 @@ typedef enum BuiltinTrancheIds LWTRANCHE_LOCK_MANAGER, LWTRANCHE_PREDICATE_LOCK_MANAGER, LWTRANCHE_PARALLEL_QUERY_DSA, + LWTRANCHE_PARALLEL_BITMAP_ITERATOR, LWTRANCHE_FIRST_USER_DEFINED } BuiltinTrancheIds;
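
Note for reviewers (not part of the patch): below is a minimal, standalone sketch — plain C with pthreads, not PostgreSQL code — of the leader-election handshake that pbms_is_leader() and BitmapHeapNext() implement over ParallelBitmapInfo. The first participant to see PBM_INITIAL flips the state to PBM_INPROGRESS and builds the bitmap; everyone else sleeps until the state reaches PBM_FINISHED. All names here (SharedScanState, become_leader_or_wait, worker) are illustrative stand-ins, and a pthread mutex/condvar replaces the patch's spinlock + ConditionVariable.

/*
 * Standalone model of the ParallelBitmapInfo synchronization protocol:
 * first worker to observe PBM_INITIAL becomes leader, builds the bitmap,
 * publishes it, sets PBM_FINISHED and broadcasts; the others wait.
 *
 * Build with: cc -pthread model.c
 */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

typedef enum { PBM_INITIAL, PBM_INPROGRESS, PBM_FINISHED } PBMState;

typedef struct
{
	pthread_mutex_t mutex;			/* plays the role of state_mutex */
	pthread_cond_t	cv;				/* plays the role of the ConditionVariable */
	PBMState		state;
	int				shared_bitmap;	/* stand-in for the shared TIDBitmap */
} SharedScanState;

static SharedScanState scan = {
	PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, PBM_INITIAL, 0
};

/* Equivalent of pbms_is_leader(): elect one leader, make the rest wait. */
static bool
become_leader_or_wait(SharedScanState *s)
{
	bool		leader = false;

	pthread_mutex_lock(&s->mutex);
	if (s->state == PBM_INITIAL)
	{
		s->state = PBM_INPROGRESS;
		leader = true;
	}
	else
	{
		while (s->state != PBM_FINISHED)
			pthread_cond_wait(&s->cv, &s->mutex);
	}
	pthread_mutex_unlock(&s->mutex);
	return leader;
}

static void *
worker(void *arg)
{
	int			id = *(int *) arg;

	if (become_leader_or_wait(&scan))
	{
		/* Leader: build the "bitmap" and publish it ... */
		scan.shared_bitmap = 42;

		/* ... then mark the scan finished and wake everyone else. */
		pthread_mutex_lock(&scan.mutex);
		scan.state = PBM_FINISHED;
		pthread_cond_broadcast(&scan.cv);
		pthread_mutex_unlock(&scan.mutex);
		printf("worker %d built the shared bitmap\n", id);
	}
	else
		printf("worker %d attached to bitmap %d\n", id, scan.shared_bitmap);

	return NULL;
}

int
main(void)
{
	pthread_t	threads[4];
	int			ids[4];

	for (int i = 0; i < 4; i++)
	{
		ids[i] = i;
		pthread_create(&threads[i], NULL, worker, &ids[i]);
	}
	for (int i = 0; i < 4; i++)
		pthread_join(threads[i], NULL);
	return 0;
}

The actual patch keeps the state under state_mutex (a spinlock), has waiters re-check the state after each ConditionVariableSleep(), and has the leader publish the shared TIDBitmap via tbm_update_shared_members() before setting PBM_FINISHED and calling ConditionVariableBroadcast(). The model above compresses that into a single mutex/condvar pair but preserves the same ordering: publish first, then mark finished and wake the waiters.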