From ec809e08fe59ffc3eaee772d2269cb47f365c0a6 Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Wed, 7 Feb 2024 12:47:51 -0500 Subject: [PATCH v4 2/2] Index scans prefetch with streaming read API --- src/backend/access/heap/heapam_handler.c | 55 +++++++- src/backend/access/index/indexam.c | 104 ++++++++++++++- src/backend/executor/execMain.c | 4 + src/backend/executor/nodeIndexonlyscan.c | 153 ++++++++++++++++------- src/backend/executor/nodeIndexscan.c | 61 ++++++++- src/backend/optimizer/plan/createplan.c | 1 + src/backend/optimizer/plan/planner.c | 3 + src/backend/storage/aio/streaming_read.c | 11 +- src/include/access/genam.h | 41 ++++++ src/include/access/relscan.h | 11 ++ src/include/nodes/execnodes.h | 1 + src/include/nodes/pathnodes.h | 1 + src/include/nodes/plannodes.h | 1 + src/include/storage/streaming_read.h | 2 + src/tools/pgindent/typedefs.list | 2 + 15 files changed, 401 insertions(+), 50 deletions(-) diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index d15a02b2be..03e6b522a8 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -108,6 +108,12 @@ heapam_index_fetch_end(IndexFetchTableData *scan) pfree(hscan); } +/* + * For those using the streaming read user, tid is an output parameter set with + * the latest TID obtained from the streaming read API. For non-streaming read + * users, tid is an input parameter and contains the next block to be read from + * the heap. + */ static bool heapam_index_fetch_tuple(struct IndexFetchTableData *scan, ItemPointer tid, @@ -127,9 +133,52 @@ heapam_index_fetch_tuple(struct IndexFetchTableData *scan, /* Switch to correct buffer if we don't have it already */ Buffer prev_buf = hscan->xs_cbuf; - hscan->xs_cbuf = ReleaseAndReadBuffer(hscan->xs_cbuf, - hscan->xs_base.rel, - ItemPointerGetBlockNumber(tid)); + if (scan->pgsr) + { + TIDQueueItem *result; + + if (BufferIsValid(hscan->xs_cbuf)) + ReleaseBuffer(hscan->xs_cbuf); + + hscan->xs_cbuf = pg_streaming_read_buffer_get_next(scan->pgsr, (void **) &result); + if (!BufferIsValid(hscan->xs_cbuf)) + { + /* + * Invalidate the item pointer to allow the caller to + * distinguish between index_fetch_heap() returning false + * because the tuple is not visible and because the streaming + * read callback ran out of queue items. + */ + ItemPointerSetInvalid(tid); + return false; + } + + /* Set this for use below */ + *tid = result->tid; + + scan->tid = result->tid; + scan->recheck = result->recheck; + + if (scan->itup) + pfree(scan->itup); + scan->itup = NULL; + + if (scan->htup) + pfree(scan->htup); + scan->htup = NULL; + + if (result->itup) + scan->itup = CopyIndexTuple(result->itup); + if (result->htup) + scan->htup = heap_copytuple(result->htup); + } + else + { + hscan->xs_cbuf = ReleaseAndReadBuffer(hscan->xs_cbuf, + hscan->xs_base.rel, + ItemPointerGetBlockNumber(tid)); + } + /* * Prune page, but only if we weren't already on this page diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c index bbd499abcf..0bf50bcd83 100644 --- a/src/backend/access/index/indexam.c +++ b/src/backend/access/index/indexam.c @@ -248,6 +248,91 @@ index_insert_cleanup(Relation indexRelation, indexRelation->rd_indam->aminsertcleanup(indexInfo); } +void +tid_queue_reset(TIDQueue *q) +{ + q->head = q->tail = 0; +} + +TIDQueue * +tid_queue_alloc(int size) +{ + TIDQueue *result; + + result = palloc(sizeof(TIDQueue) + (sizeof(TIDQueueItem) * size)); + result->size = size; + tid_queue_reset(result); + return result; +} + +static TIDQueueItem +index_tid_dequeue(TIDQueue *tid_queue) +{ + TIDQueueItem result; + + Assert(tid_queue->tail > tid_queue->head); + result = tid_queue->data[tid_queue->head % tid_queue->size]; + tid_queue->head++; + + return result; +} + +void +index_tid_enqueue(TIDQueue *tid_queue, ItemPointer tid, bool recheck, + HeapTuple htup, IndexTuple itup) +{ + TIDQueueItem *cur; + + Assert(tid_queue->tail >= tid_queue->head); + Assert(!TID_QUEUE_FULL(tid_queue)); + cur = &tid_queue->data[tid_queue->tail % tid_queue->size]; + ItemPointerSet(&cur->tid, + ItemPointerGetBlockNumber(tid), ItemPointerGetOffsetNumber(tid)); + cur->recheck = recheck; + cur->itup = NULL; + cur->htup = NULL; + + if (itup) + cur->itup = CopyIndexTuple(itup); + + if (htup) + cur->htup = heap_copytuple(htup); + + tid_queue->tail++; +} + +static BlockNumber +index_pgsr_next_single(PgStreamingRead *pgsr, void *pgsr_private, void *per_buffer_data) +{ + IndexScanDesc scan = pgsr_private; + TIDQueueItem *result = per_buffer_data; + + scan->kill_prior_tuple = false; + scan->xs_heap_continue = false; + + if (TID_QUEUE_EMPTY(scan->tid_queue)) + return InvalidBlockNumber; + + *result = index_tid_dequeue(scan->tid_queue); + return ItemPointerGetBlockNumber(&result->tid); +} + +void +index_pgsr_alloc(IndexScanDesc scan) +{ + if (scan->xs_heapfetch->pgsr) + pg_streaming_read_free(scan->xs_heapfetch->pgsr); + scan->xs_heapfetch->pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_DEFAULT, + scan, + sizeof(TIDQueueItem), + NULL, + BMR_REL(scan->heapRelation), + MAIN_FORKNUM, + index_pgsr_next_single); + + pg_streaming_read_set_resumable(scan->xs_heapfetch->pgsr); +} + /* * index_beginscan - start a scan of an index with amgettuple * @@ -333,6 +418,8 @@ index_beginscan_internal(Relation indexRelation, /* Initialize information for parallel scan. */ scan->parallel_scan = pscan; scan->xs_temp_snap = temp_snap; + scan->index_done = false; + scan->tid_queue = NULL; return scan; } @@ -364,8 +451,12 @@ index_rescan(IndexScanDesc scan, if (scan->xs_heapfetch) table_index_fetch_reset(scan->xs_heapfetch); + scan->index_done = false; + scan->kill_prior_tuple = false; /* for safety */ scan->xs_heap_continue = false; + if (scan->tid_queue) + tid_queue_reset(scan->tid_queue); scan->indexRelation->rd_indam->amrescan(scan, keys, nkeys, orderbys, norderbys); @@ -384,10 +475,17 @@ index_endscan(IndexScanDesc scan) /* Release resources (like buffer pins) from table accesses */ if (scan->xs_heapfetch) { + if (scan->xs_heapfetch->pgsr) + pg_streaming_read_free(scan->xs_heapfetch->pgsr); + scan->xs_heapfetch->pgsr = NULL; table_index_fetch_end(scan->xs_heapfetch); scan->xs_heapfetch = NULL; } + if (scan->tid_queue) + pfree(scan->tid_queue); + scan->tid_queue = NULL; + /* End the AM's scan */ scan->indexRelation->rd_indam->amendscan(scan); @@ -530,6 +628,9 @@ index_parallelrescan(IndexScanDesc scan) if (scan->xs_heapfetch) table_index_fetch_reset(scan->xs_heapfetch); + if (scan->tid_queue) + tid_queue_reset(scan->tid_queue); + /* amparallelrescan is optional; assume no-op if not provided by AM */ if (scan->indexRelation->rd_indam->amparallelrescan != NULL) scan->indexRelation->rd_indam->amparallelrescan(scan); @@ -603,6 +704,7 @@ index_getnext_tid(IndexScanDesc scan, ScanDirection direction) if (scan->xs_heapfetch) table_index_fetch_reset(scan->xs_heapfetch); + scan->index_done = true; return NULL; } Assert(ItemPointerIsValid(&scan->xs_heaptid)); @@ -651,7 +753,7 @@ index_fetch_heap(IndexScanDesc scan, TupleTableSlot *slot) * recovery because it may violate MVCC to do so. See comments in * RelationGetIndexScan(). */ - if (!scan->xactStartedInRecovery) + if (!scan->xactStartedInRecovery && !scan->xs_heapfetch->pgsr) scan->kill_prior_tuple = all_dead; return found; diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 13a9b7da83..9e951a69ab 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -1652,6 +1652,10 @@ ExecutePlan(EState *estate, if (!execute_once) use_parallel_mode = false; + estate->es_use_prefetching = execute_once; + if (!planstate->plan->allow_prefetch) + estate->es_use_prefetching = false; + estate->es_use_parallel_mode = use_parallel_mode; if (use_parallel_mode) EnterParallelMode(); diff --git a/src/backend/executor/nodeIndexonlyscan.c b/src/backend/executor/nodeIndexonlyscan.c index 2c2c9c10b5..e3824c25e0 100644 --- a/src/backend/executor/nodeIndexonlyscan.c +++ b/src/backend/executor/nodeIndexonlyscan.c @@ -113,61 +113,109 @@ IndexOnlyNext(IndexOnlyScanState *node) node->ioss_NumOrderByKeys); } + if (!scandesc->tid_queue) + { + /* Fall back to a queue size of 1 for now */ + int queue_size = 1; + + if (estate->es_use_prefetching && ScanDirectionIsForward(direction)) + queue_size = TID_QUEUE_SIZE; + scandesc->tid_queue = tid_queue_alloc(queue_size); + index_pgsr_alloc(scandesc); + } + /* * OK, now that we have what we need, fetch the next tuple. */ - while ((tid = index_getnext_tid(scandesc, direction)) != NULL) + for (;;) { - bool tuple_from_heap = false; + bool tuple_from_heap = true; CHECK_FOR_INTERRUPTS(); - /* - * We can skip the heap fetch if the TID references a heap page on - * which all tuples are known visible to everybody. In any case, - * we'll use the index tuple not the heap tuple as the data source. - * - * Note on Memory Ordering Effects: visibilitymap_get_status does not - * lock the visibility map buffer, and therefore the result we read - * here could be slightly stale. However, it can't be stale enough to - * matter. - * - * We need to detect clearing a VM bit due to an insert right away, - * because the tuple is present in the index page but not visible. The - * reading of the TID by this scan (using a shared lock on the index - * buffer) is serialized with the insert of the TID into the index - * (using an exclusive lock on the index buffer). Because the VM bit - * is cleared before updating the index, and locking/unlocking of the - * index page acts as a full memory barrier, we are sure to see the - * cleared bit if we see a recently-inserted TID. - * - * Deletes do not update the index page (only VACUUM will clear out - * the TID), so the clearing of the VM bit by a delete is not - * serialized with this test below, and we may see a value that is - * significantly stale. However, we don't care about the delete right - * away, because the tuple is still visible until the deleting - * transaction commits or the statement ends (if it's our - * transaction). In either case, the lock on the VM buffer will have - * been released (acting as a write barrier) after clearing the bit. - * And for us to have a snapshot that includes the deleting - * transaction (making the tuple invisible), we must have acquired - * ProcArrayLock after that time, acting as a read barrier. - * - * It's worth going through this complexity to avoid needing to lock - * the VM buffer, which could cause significant contention. - */ - if (!VM_ALL_VISIBLE(scandesc->heapRelation, - ItemPointerGetBlockNumber(tid), - &node->ioss_VMBuffer)) + if (!scandesc->index_done) + { + while (!TID_QUEUE_FULL(scandesc->tid_queue)) + { + if ((tid = index_getnext_tid(scandesc, direction)) == NULL) + { + scandesc->index_done = true; + break; + } + + /* + * We can skip the heap fetch if the TID references a heap + * page on which all tuples are known visible to everybody. In + * any case, we'll use the index tuple not the heap tuple as + * the data source. + * + * Note on Memory Ordering Effects: visibilitymap_get_status + * does not lock the visibility map buffer, and therefore the + * result we read here could be slightly stale. However, it + * can't be stale enough to matter. + * + * We need to detect clearing a VM bit due to an insert right + * away, because the tuple is present in the index page but + * not visible. The reading of the TID by this scan (using a + * shared lock on the index buffer) is serialized with the + * insert of the TID into the index (using an exclusive lock + * on the index buffer). Because the VM bit is cleared before + * updating the index, and locking/unlocking of the index page + * acts as a full memory barrier, we are sure to see the + * cleared bit if we see a recently-inserted TID. + * + * Deletes do not update the index page (only VACUUM will + * clear out the TID), so the clearing of the VM bit by a + * delete is not serialized with this test below, and we may + * see a value that is significantly stale. However, we don't + * care about the delete right away, because the tuple is + * still visible until the deleting transaction commits or the + * statement ends (if it's our transaction). In either case, + * the lock on the VM buffer will have been released (acting + * as a write barrier) after clearing the bit. And for us to + * have a snapshot that includes the deleting transaction + * (making the tuple invisible), we must have acquired + * ProcArrayLock after that time, acting as a read barrier. + * + * It's worth going through this complexity to avoid needing + * to lock the VM buffer, which could cause significant + * contention. + */ + + if (VM_ALL_VISIBLE(scandesc->heapRelation, + ItemPointerGetBlockNumber(tid), + &node->ioss_VMBuffer)) + { + tuple_from_heap = false; + break; + } + + index_tid_enqueue(scandesc->tid_queue, tid, scandesc->xs_recheck, + scandesc->xs_hitup, scandesc->xs_itup); + } + } + + if (tuple_from_heap) { /* * Rats, we have to visit the heap to check visibility. */ InstrCountTuples2(node, 1); + if (!index_fetch_heap(scandesc, node->ioss_TableSlot)) - continue; /* no visible tuple, try next index entry */ + { + /* + * Either there is no visible tuple or the streaming read ran + * out of queue items and it is time to add more. + */ + if (ItemPointerIsValid(&scandesc->xs_heaptid)) + continue; - ExecClearTuple(node->ioss_TableSlot); + if (!scandesc->index_done) + continue; + + break; + } /* * Only MVCC snapshots are supported here, so there should be no @@ -185,15 +233,32 @@ IndexOnlyNext(IndexOnlyScanState *node) * entry might require a visit to the same heap page. */ - tuple_from_heap = true; + /* + * If we visit the underlying table, we need to reset the + * IndexScanDesc's fields to match the per tuple state returned by + * the streaming read API. The most recent index tuple fetched + * will not necessarily match the current TID being processed + * after returning from index_fetch_heap(). + */ + scandesc->xs_recheck = scandesc->xs_heapfetch->recheck; + scandesc->xs_heaptid = scandesc->xs_heapfetch->tid; + + scandesc->xs_hitup = scandesc->xs_heapfetch->htup; + scandesc->xs_itup = scandesc->xs_heapfetch->itup; } /* * Fill the scan tuple slot with data from the index. This might be - * provided in either HeapTuple or IndexTuple format. Conceivably an + * provided in either HeapTuple or IndexTuple format. Conceivably an * index AM might fill both fields, in which case we prefer the heap - * format, since it's probably a bit cheaper to fill a slot from. + * format, since it's probably a bit cheaper to fill a slot from. As + * soon as we encounter a tuple from an all visible block, we stop + * prefetching and yield the tuple. As such, we can use the IndexTuple + * and HeapTuple that the index AM filled in the scan descriptor + * instead of having to get them from the per tuple state yielded by + * the streaming read API. */ + ExecClearTuple(node->ioss_TableSlot); if (scandesc->xs_hitup) { /* diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c index 03142b4a94..aa72479df1 100644 --- a/src/backend/executor/nodeIndexscan.c +++ b/src/backend/executor/nodeIndexscan.c @@ -125,13 +125,72 @@ IndexNext(IndexScanState *node) node->iss_OrderByKeys, node->iss_NumOrderByKeys); } + if (!scandesc->tid_queue) + { + /* Fall back to a queue size of 1 for now */ + int queue_size = 1; + + if (estate->es_use_prefetching && ScanDirectionIsForward(direction)) + queue_size = TID_QUEUE_SIZE; + scandesc->tid_queue = tid_queue_alloc(queue_size); + index_pgsr_alloc(scandesc); + } + /* * ok, now that we have what we need, fetch the next tuple. */ - while (index_getnext_slot(scandesc, direction, slot)) + for (;;) { + ItemPointerData last_tid = scandesc->xs_heaptid; + + /* + * If we haven't exhausted TIDs from the index, then fill the queue + * with TIDs from the index until the queue is full. Mark the index as + * exhausted if we reach the end of it. + */ + if (!scandesc->index_done) + { + while (!TID_QUEUE_FULL(scandesc->tid_queue)) + { + ItemPointer tid; + + if ((tid = index_getnext_tid(scandesc, direction)) == NULL) + { + scandesc->index_done = true; + break; + } + + index_tid_enqueue(scandesc->tid_queue, tid, scandesc->xs_recheck, + NULL, NULL); + } + } + + if (scandesc->xs_heap_continue) + scandesc->xs_heaptid = last_tid; + + /* + * index_fetch_heap() returns false when either the tuple isn't + * visible or when there's no more to read + */ + if (!index_fetch_heap(scandesc, slot)) + { + if (ItemPointerIsValid(&scandesc->xs_heaptid)) + continue; + + if (!scandesc->index_done) + continue; + + if (scandesc->xs_heap_continue) + continue; + + break; + } + CHECK_FOR_INTERRUPTS(); + scandesc->xs_recheck = scandesc->xs_heapfetch->recheck; + scandesc->xs_heaptid = scandesc->xs_heapfetch->tid; + /* * If the index was lossy, we have to recheck the index quals using * the fetched tuple. diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 610f4a56d6..3360da8288 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -4719,6 +4719,7 @@ create_mergejoin_plan(PlannerInfo *root, /* Costs of sort and material steps are included in path cost already */ copy_generic_path_info(&join_plan->join.plan, &best_path->jpath.path); + root->allow_prefetch = false; return join_plan; } diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 2e2458b128..3be133f757 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -413,11 +413,14 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, root = subquery_planner(glob, parse, NULL, false, tuple_fraction); + root->allow_prefetch = true; + /* Select best Path and turn it into a Plan */ final_rel = fetch_upper_rel(root, UPPERREL_FINAL, NULL); best_path = get_cheapest_fractional_path(final_rel, tuple_fraction); top_plan = create_plan(root, best_path); + top_plan->allow_prefetch = root->allow_prefetch; /* * If creating a plan for a scrollable cursor, make sure it can run diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c index 19605090fe..aaccf25a7f 100644 --- a/src/backend/storage/aio/streaming_read.c +++ b/src/backend/storage/aio/streaming_read.c @@ -34,6 +34,7 @@ struct PgStreamingRead int pinned_buffers_trigger; int next_tail_buffer; bool finished; + bool resumable; void *pgsr_private; PgStreamingReadBufferCB callback; BufferAccessStrategy strategy; @@ -245,10 +246,12 @@ pg_streaming_read_new_range(PgStreamingRead *pgsr) static void pg_streaming_read_look_ahead(PgStreamingRead *pgsr) { + bool done = pgsr->finished && !pgsr->resumable; + /* * If we're finished or can't start more I/O, then don't look ahead. */ - if (pgsr->finished || pgsr->ios_in_progress == pgsr->max_ios) + if (done || pgsr->ios_in_progress == pgsr->max_ios) return; /* @@ -433,3 +436,9 @@ pg_streaming_read_free(PgStreamingRead *pgsr) pfree(pgsr->per_buffer_data); pfree(pgsr); } + +void +pg_streaming_read_set_resumable(PgStreamingRead *pgsr) +{ + pgsr->resumable = true; +} diff --git a/src/include/access/genam.h b/src/include/access/genam.h index 8026c2b36d..13d6fab318 100644 --- a/src/include/access/genam.h +++ b/src/include/access/genam.h @@ -14,6 +14,7 @@ #ifndef GENAM_H #define GENAM_H +#include "access/relscan.h" #include "access/sdir.h" #include "access/skey.h" #include "nodes/tidbitmap.h" @@ -24,6 +25,22 @@ /* We don't want this file to depend on execnodes.h. */ struct IndexInfo; +typedef struct TIDQueueItem +{ + ItemPointerData tid; + bool recheck; + IndexTuple itup; + HeapTuple htup; +} TIDQueueItem; + +typedef struct TIDQueue +{ + uint64 head; + uint64 tail; + int size; + TIDQueueItem data[FLEXIBLE_ARRAY_MEMBER /* size */ ]; +} TIDQueue; + /* * Struct for statistics returned by ambuild */ @@ -175,6 +192,30 @@ extern IndexScanDesc index_beginscan_parallel(Relation heaprel, ParallelIndexScanDesc pscan); extern ItemPointer index_getnext_tid(IndexScanDesc scan, ScanDirection direction); + +extern void index_pgsr_alloc(IndexScanDesc scan); + +extern void tid_queue_reset(TIDQueue *q); + +extern void index_tid_enqueue(TIDQueue *tid_queue, ItemPointer tid, bool recheck, + HeapTuple htup, IndexTuple itup); + +#define TID_QUEUE_SIZE 6 + +extern TIDQueue *tid_queue_alloc(int size); + +static inline bool +TID_QUEUE_FULL(TIDQueue *q) +{ + return q->tail - q->head == q->size; +} + +static inline bool +TID_QUEUE_EMPTY(TIDQueue *q) +{ + return q->head == q->tail; +} + struct TupleTableSlot; extern bool index_fetch_heap(IndexScanDesc scan, struct TupleTableSlot *slot); extern bool index_getnext_slot(IndexScanDesc scan, ScanDirection direction, diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h index 521043304a..66b3d90c83 100644 --- a/src/include/access/relscan.h +++ b/src/include/access/relscan.h @@ -18,6 +18,7 @@ #include "access/itup.h" #include "port/atomics.h" #include "storage/buf.h" +#include "storage/streaming_read.h" #include "storage/spin.h" #include "utils/relcache.h" @@ -104,8 +105,16 @@ typedef struct ParallelBlockTableScanWorkerData *ParallelBlockTableScanWorker; typedef struct IndexFetchTableData { Relation rel; + PgStreamingRead *pgsr; + + ItemPointerData tid; + bool recheck; + IndexTuple itup; + HeapTuple htup; } IndexFetchTableData; +typedef struct TIDQueue TIDQueue; + /* * We use the same IndexScanDescData structure for both amgettuple-based * and amgetbitmap-based index scans. Some fields are only relevant in @@ -148,6 +157,8 @@ typedef struct IndexScanDescData bool xs_heap_continue; /* T if must keep walking, potential * further results */ IndexFetchTableData *xs_heapfetch; + TIDQueue *tid_queue; + bool index_done; bool xs_recheck; /* T means scan keys must be rechecked */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 444a5f0fd5..a8042abf95 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -697,6 +697,7 @@ typedef struct EState struct EPQState *es_epq_active; bool es_use_parallel_mode; /* can we use parallel workers? */ + bool es_use_prefetching; /* The per-query shared memory area to use for parallel execution. */ struct dsa_area *es_query_dsa; diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h index 534692bee1..171066b14e 100644 --- a/src/include/nodes/pathnodes.h +++ b/src/include/nodes/pathnodes.h @@ -203,6 +203,7 @@ struct PlannerInfo /* 1 at the outermost Query */ Index query_level; + bool allow_prefetch; /* NULL at outermost Query */ PlannerInfo *parent_root pg_node_attr(read_write_ignore); diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index b4ef6bc44c..317de2d781 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -169,6 +169,7 @@ typedef struct Plan */ Bitmapset *extParam; Bitmapset *allParam; + bool allow_prefetch; } Plan; /* ---------------- diff --git a/src/include/storage/streaming_read.h b/src/include/storage/streaming_read.h index 40c3408c54..2288b7b5eb 100644 --- a/src/include/storage/streaming_read.h +++ b/src/include/storage/streaming_read.h @@ -42,4 +42,6 @@ extern void pg_streaming_read_prefetch(PgStreamingRead *pgsr); extern Buffer pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_buffer_private); extern void pg_streaming_read_free(PgStreamingRead *pgsr); +extern void pg_streaming_read_set_resumable(PgStreamingRead *pgsr); + #endif diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 0e34145187..ecc910ff35 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2726,6 +2726,8 @@ TBMSharedIteratorState TBMStatus TBlockState TIDBitmap +TIDQueue +TIDQueueItem TM_FailureData TM_IndexDelete TM_IndexDeleteOp -- 2.37.2