diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 22b3f5f..3f08b79 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -1649,7 +1649,8 @@ heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation, * ---------------- */ HeapScanDesc -heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan) +heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan, + int nkeys, ScanKey key) { Snapshot snapshot; @@ -1657,7 +1658,7 @@ heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan) snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data); RegisterSnapshot(snapshot); - return heap_beginscan_internal(relation, snapshot, 0, NULL, parallel_scan, + return heap_beginscan_internal(relation, snapshot, nkeys, key, parallel_scan, true, true, true, false, false, true); } diff --git a/src/backend/executor/execScan.c b/src/backend/executor/execScan.c index fb0013d..3a12ec4 100644 --- a/src/backend/executor/execScan.c +++ b/src/backend/executor/execScan.c @@ -205,7 +205,7 @@ ExecScan(ScanState *node, * when the qual is nil ... saves only a few cycles, but they add up * ... */ - if (!qual || ExecQual(qual, econtext, false)) + if (!node->ps.qual || ExecQual(node->ps.qual, econtext, false)) { /* * Found a satisfactory scan tuple. diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index 00bf3a5..af716e2 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -33,6 +33,9 @@ static void InitScanRelation(SeqScanState *node, EState *estate, int eflags); static TupleTableSlot *SeqNext(SeqScanState *node); +static void get_scankey_from_qual(List *qual, PlanState *ps, int *nkeys, + ScanKey *rs_keys); +static bool extract_var_and_const(List *args, Var **var, Const **cons); /* ---------------------------------------------------------------- * Scan Support @@ -64,13 +67,23 @@ SeqNext(SeqScanState *node) if (scandesc == NULL) { + int nkeys = 0; + ScanKey keys = NULL; + + /* + * Pull out scan key from qual list + */ + get_scankey_from_qual(node->ss.ps.plan->qual, &node->ss.ps, + &nkeys, &keys); + /* * We reach here if the scan is not parallel, or if we're executing a * scan that was intended to be parallel serially. */ scandesc = heap_beginscan(node->ss.ss_currentRelation, estate->es_snapshot, - 0, NULL); + nkeys, keys); + node->ss.ss_currentScanDesc = scandesc; } @@ -317,14 +330,24 @@ ExecSeqScanInitializeDSM(SeqScanState *node, { EState *estate = node->ss.ps.state; ParallelHeapScanDesc pscan; + int nkeys = 0; + ScanKey keys = NULL; + + /* + * Pull out scan key from qual list + */ + get_scankey_from_qual(node->ss.ps.plan->qual, &node->ss.ps, + &nkeys, &keys); + pscan = shm_toc_allocate(pcxt->toc, node->pscan_len); heap_parallelscan_initialize(pscan, node->ss.ss_currentRelation, estate->es_snapshot); shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan); - node->ss.ss_currentScanDesc = - heap_beginscan_parallel(node->ss.ss_currentRelation, pscan); + node->ss.ss_currentScanDesc = heap_beginscan_parallel( + node->ss.ss_currentRelation, pscan, + nkeys, keys); } /* ---------------------------------------------------------------- @@ -337,8 +360,124 @@ void ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc) { ParallelHeapScanDesc pscan; + int nkeys = 0; + ScanKey keys = NULL; pscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id); - node->ss.ss_currentScanDesc = - heap_beginscan_parallel(node->ss.ss_currentRelation, pscan); + + /* + * Pull out scan key from qual list + */ + get_scankey_from_qual(node->ss.ps.plan->qual, &node->ss.ps, + &nkeys, &keys); + + node->ss.ss_currentScanDesc = heap_beginscan_parallel( + node->ss.ss_currentRelation, pscan, + nkeys, keys); +} + +/* + * extract_var_and_const + * + * Get var and const expression from arg list. + */ +static bool +extract_var_and_const(List *args, Var **var, Const **cons) +{ + Expr *larg; + Expr *rarg; + + /* for POC only supports var op const so only 2 arguments */ + if (list_length(args) != 2) + return false; + + larg = linitial(args); + rarg = lsecond(args); + + if (IsA(larg, Var) || IsA(larg, RelabelType)) + { + if (IsA(larg, Var)) + *var = (Var*)larg; + else if (IsA(((RelabelType*)larg)->arg, Var)) + *var = (Var*)((RelabelType*)larg)->arg; + else + return false; + + if (!IsA(rarg, Const)) + return false; + + *cons = (Const*)rarg; + } + else + return false; + + return true; +} + +/* + * get_scankey_from_qual + * + * Currently in POC we only supports the qual of type (var op const) + * so that we can use exising heap scan key pushdown framework. + */ +static void +get_scankey_from_qual(List *qual, PlanState *ps, int *nkeys, + ScanKey *rs_keys) +{ + ListCell *l, *l1; + Expr *expr; + ListCell *prev = NULL; + ScanKey key; + Var *var; + Const *con; + + /* + * Validate each qual whether this qual can be push down to heap node + * or not, If this can be pushed down then create a ScanKey entry + * and delete it from qual list of PlanState + */ + forboth(l, qual, l1, ps->qual) + { + expr = (Expr *) lfirst(l); + + if (IsA(expr, OpExpr)) + { + OpExpr *opexpr = (OpExpr *) expr; + + /* Extract the var and const expression from arg list. */ + if (!extract_var_and_const(opexpr->args, &var, &con)) + { + prev = l1; + continue; + } + + /* We don't yet have memory for key */ + if (*rs_keys == NULL) + { + *rs_keys = (ScanKey) palloc(sizeof(ScanKeyData) * + list_length(qual)); + key = *rs_keys; + } + + /* Create scan key entry */ + ScanKeyInit(key, + var->varattno, + BTEqualStrategyNumber, + opexpr->opfuncid, + con->constvalue); + + key->sk_collation = opexpr->inputcollid; + + key++; + (*nkeys)++; + + /* Only one qual in list so set it to NULL */ + if (list_length(ps->qual) == 1) + ps->qual = NIL; + else + list_delete_cell(ps->qual, l1, prev); + } + + prev = l1; + } } diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index b3a595c..7288555 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -130,7 +130,9 @@ extern HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction); extern Size heap_parallelscan_estimate(Snapshot snapshot); extern void heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation, Snapshot snapshot); -extern HeapScanDesc heap_beginscan_parallel(Relation, ParallelHeapScanDesc); +extern HeapScanDesc heap_beginscan_parallel(Relation relation, + ParallelHeapScanDesc parallel_scan, int nkeys, + ScanKey key); extern bool heap_fetch(Relation relation, Snapshot snapshot, HeapTuple tuple, Buffer *userbuf, bool keep_buf,