From 2de89705c6b2d03020988db0cc8857a0bf19b38e Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy
Date: Mon, 25 Mar 2024 07:09:25 +0000
Subject: [PATCH v14 1/3] Introduce table modify access methods

---
 src/backend/access/heap/heapam.c         | 208 +++++++++++++++++++++++
 src/backend/access/heap/heapam_handler.c |   6 +
 src/include/access/heapam.h              |  48 +++++
 src/include/access/tableam.h             | 119 +++++++++++++
 src/tools/pgindent/typedefs.list         |   4 +
 5 files changed, 385 insertions(+)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 34bc60f625..d1ef2464ef 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -64,6 +64,7 @@
 #include "storage/standby.h"
 #include "utils/datum.h"
 #include "utils/inval.h"
+#include "utils/memutils.h"
 #include "utils/relcache.h"
 #include "utils/snapmgr.h"
 #include "utils/spccache.h"
@@ -2442,6 +2443,213 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 	pgstat_count_heap_insert(relation, ntuples);
 }
 
+/*
+ * heap_modify_begin - set up state for a series of heap modifications
+ *
+ * Creates a dedicated memory context as a child of CurrentMemoryContext and
+ * allocates the returned TableModifyState in that context; the remaining
+ * setup also runs with that context current.
+ *
+ * For TM_KIND_INSERT a HeapInsertState is attached to state->data.  Its
+ * mistate member is set up only when TM_FLAG_MULTI_INSERTS is given, and
+ * its bistate member only when TM_FLAG_BAS_BULKWRITE is given; each is NULL
+ * otherwise.  For any other kind, state->data is left NULL.
+ *
+ * Release the state with heap_modify_end().
+ */
+TableModifyState *
+heap_modify_begin(TableModifyKind kind, Relation rel, int flags)
+{
+	TableModifyState *state;
+	MemoryContext context;
+	MemoryContext oldcontext;
+
+	context = AllocSetContextCreate(CurrentMemoryContext,
+									"heap_modify memory context",
+									ALLOCSET_DEFAULT_SIZES);
+
+	oldcontext = MemoryContextSwitchTo(context);
+
+	state = palloc0(sizeof(TableModifyState));
+	state->kind = kind;
+	state->rel = rel;
+	state->flags = flags;
+	state->mctx = context;
+
+	if (kind == TM_KIND_INSERT)
+	{
+		HeapInsertState *istate;
+
+		istate = (HeapInsertState *) palloc0(sizeof(HeapInsertState));
+		istate->bistate = NULL;
+		istate->mistate = NULL;
+		state->data = istate;
+
+		if ((flags & TM_FLAG_MULTI_INSERTS) != 0)
+		{
+			HeapMultiInsertState *mistate;
+
+			mistate = (HeapMultiInsertState *) palloc0(sizeof(HeapMultiInsertState));
+			mistate->slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS);
+			istate->mistate = mistate;
+		}
+
+		if ((flags & TM_FLAG_BAS_BULKWRITE) != 0)
+			istate->bistate = GetBulkInsertState();
+	}
+
+	MemoryContextSwitchTo(oldcontext);
+
+	return state;
+}
+
+/*
+ * heap_modify_buffer_insert - buffer one tuple for a later multi-insert
+ *
+ * Copies the tuple in "slot" into the next free buffered slot.  Once the
+ * number of buffered slots reaches HEAP_MAX_BUFFERED_SLOTS, or the tracked
+ * size reaches HEAP_MAX_BUFFERED_BYTES, the buffered tuples are inserted into
+ * the heap via heap_modify_buffer_flush() using the given cid and options.
+ *
+ * The state must have been created with TM_KIND_INSERT and
+ * TM_FLAG_MULTI_INSERTS.
+ */
+void
+heap_modify_buffer_insert(TableModifyState *state, CommandId cid,
+						  int options, TupleTableSlot *slot)
+{
+	TupleTableSlot *dstslot;
+	HeapInsertState *istate;
+	HeapMultiInsertState *mistate;
+	MemoryContext oldcontext;
+
+	Assert(state->kind == TM_KIND_INSERT);
+	istate = (HeapInsertState *) state->data;
+	Assert(istate->mistate != NULL);
+	mistate = istate->mistate;
+
+	/*
+	 * istate->bistate may legitimately be NULL here: heap_modify_begin()
+	 * creates it only when TM_FLAG_BAS_BULKWRITE was requested.
+	 */
+
+	oldcontext = MemoryContextSwitchTo(state->mctx);
+
+	dstslot = mistate->slots[mistate->cur_slots];
+	if (dstslot == NULL)
+	{
+		/*
+		 * The buffered slots are virtual tuple slots, to leverage the
+		 * optimization they provide to minimize physical data copying.  The
+		 * virtual slot gets materialized when we copy (via below
+		 * ExecCopySlot) the tuples from the source slot which can be of any
+		 * type. This way, it is ensured that the tuple storage doesn't depend
+		 * on external memory, because all the datums that aren't passed by
+		 * value are copied into the slot's memory context.
+		 */
+		dstslot = MakeTupleTableSlot(RelationGetDescr(state->rel),
+									 &TTSOpsVirtual);
+		mistate->slots[mistate->cur_slots] = dstslot;
+	}
+
+	ExecClearTuple(dstslot);
+	ExecCopySlot(dstslot, slot);
+
+	mistate->cur_slots++;
+
+	/*
+	 * Memory allocated for the whole tuple is in the slot's memory context,
+	 * so use it to keep track of the total space occupied by all buffered
+	 * tuples.
+	 *
+	 * NOTE(review): the slot is created above while state->mctx is current,
+	 * so tts_mcxt is presumably state->mctx itself.  If so, every call adds
+	 * the allocation of the whole context rather than of this one tuple, and
+	 * the byte limit is reached much earlier than intended -- confirm.
+	 */
+	if (TTS_SHOULDFREE(dstslot))
+		mistate->cur_size += MemoryContextMemAllocated(dstslot->tts_mcxt, false);
+
+	if (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS ||
+		mistate->cur_size >= HEAP_MAX_BUFFERED_BYTES)
+		heap_modify_buffer_flush(state, cid, options);
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * heap_modify_buffer_flush - insert all currently buffered tuples into heap
+ *
+ * Passes the buffered slots to heap_multi_insert() with the given cid and
+ * options, then resets the buffered-slot count and size.  The slots
+ * themselves are kept for reuse.  Does nothing if no tuples are buffered.
+ */
+void
+heap_modify_buffer_flush(TableModifyState *state, CommandId cid,
+						 int options)
+{
+	HeapInsertState *istate;
+	HeapMultiInsertState *mistate;
+	MemoryContext oldcontext;
+
+	Assert(state->kind == TM_KIND_INSERT);
+	istate = (HeapInsertState *) state->data;
+	Assert(istate->mistate != NULL);
+	mistate = istate->mistate;
+
+	/* Quick exit if there is nothing buffered. */
+	if (mistate->cur_slots == 0)
+		return;
+
+	oldcontext = MemoryContextSwitchTo(state->mctx);
+
+	/*
+	 * istate->bistate is NULL unless TM_FLAG_BAS_BULKWRITE was requested;
+	 * heap_multi_insert() is then called without a bulk-insert state.
+	 */
+	heap_multi_insert(state->rel, mistate->slots, mistate->cur_slots,
+					  cid, options, istate->bistate);
+
+	mistate->cur_slots = 0;
+	mistate->cur_size = 0;
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * heap_modify_end - release everything set up by heap_modify_begin()
+ *
+ * Drops the buffered slots, frees the bulk-insert state if there is one, and
+ * deletes the state's memory context, in which "state" itself was allocated.
+ *
+ * Buffered tuples are not flushed here; the caller must have called
+ * heap_modify_buffer_flush() beforehand (checked by assertion).
+ */
+void
+heap_modify_end(TableModifyState *state)
+{
+	if (state->kind == TM_KIND_INSERT)
+	{
+		HeapInsertState *istate = (HeapInsertState *) state->data;
+
+		if (istate->mistate != NULL)
+		{
+			HeapMultiInsertState *mistate = istate->mistate;
+
+			Assert(mistate->cur_slots == 0 &&
+				   mistate->cur_size == 0);
+
+			for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++)
+				ExecDropSingleTupleTableSlot(mistate->slots[i]);
+		}
+
+		if (istate->bistate != NULL)
+			FreeBulkInsertState(istate->bistate);
+	}
+
+	MemoryContextDelete(state->mctx);
+}
+
 /*
  *	simple_heap_insert - insert a tuple
  *
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 2b7c702642..4437425de9 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2564,6 +2564,12 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_insert_speculative = heapam_tuple_insert_speculative,
 	.tuple_complete_speculative = heapam_tuple_complete_speculative,
 	.multi_insert = heap_multi_insert,
+
+	.tuple_modify_begin = heap_modify_begin,
+	.tuple_modify_buffer_insert = heap_modify_buffer_insert,
+	.tuple_modify_buffer_flush = heap_modify_buffer_flush,
+	.tuple_modify_end = heap_modify_end,
+
 	.tuple_delete = heapam_tuple_delete,
 	.tuple_update = heapam_tuple_update,
 	.tuple_lock = heapam_tuple_lock,
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 4b133f6859..2b526550df 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -225,6 +225,38 @@ htsv_get_valid_status(int status)
 	return (HTSV_Result) status;
 }
 
+/*
+ * Maximum number of slots that multi-insert buffers can hold.
+ *
+ * Caution: Don't make this too big, as we could end up with this many tuples
+ * stored in multi insert buffer. For instance, increasing this can cause
+ * quadratic growth in memory requirements during copies into partitioned
+ * tables with a large number of partitions.
+ */
+#define HEAP_MAX_BUFFERED_SLOTS	1000
+
+/* Maximum size of all tuples that multi-insert buffers can hold */
+#define HEAP_MAX_BUFFERED_BYTES	65535
+
+typedef struct HeapMultiInsertState
+{
+	/* Array of buffered slots */
+	TupleTableSlot **slots;
+
+	/* Number of buffered slots currently held */
+	int			cur_slots;
+
+	/* Approximate size of all tuples currently held in buffered slots */
+	Size		cur_size;
+} HeapMultiInsertState;
+
+typedef struct HeapInsertState
+{
+	struct BulkInsertStateData *bistate;
+	HeapMultiInsertState *mistate;
+} HeapInsertState;
+
+
 /* ----------------
  *		function prototypes for heap access method
  *
@@ -275,6 +307,22 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
 							  int ntuples, CommandId cid, int options,
 							  BulkInsertState bistate);
+
+extern TableModifyState *heap_modify_begin(TableModifyKind kind,
+										   Relation rel,
+										   int flags);
+
+extern void heap_modify_buffer_insert(TableModifyState *state,
+									  CommandId cid,
+									  int options,
+									  TupleTableSlot *slot);
+
+extern void heap_modify_buffer_flush(TableModifyState *state,
+									 CommandId cid,
+									 int options);
+
+extern void heap_modify_end(TableModifyState *state);
+
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
 							 CommandId cid, Snapshot crosscheck, bool wait,
 							 struct TM_FailureData *tmfd, bool changingPart);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 65834caeb1..3fc6d93555 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -247,6 +247,33 @@ typedef struct TM_IndexDeleteOp
 	TM_IndexStatus *status;
 } TM_IndexDeleteOp;
 
+/* Holds table modify kind */
+typedef enum TableModifyKind
+{
+	TM_KIND_NONE,
+	TM_KIND_INSERT
+} TableModifyKind;
+
+/* Table modify flags */
+
+/* Use multi inserts, i.e. buffer multiple tuples and insert them at once */
+#define TM_FLAG_MULTI_INSERTS	0x000001
+
+/* Use BAS_BULKWRITE buffer access strategy */
+#define TM_FLAG_BAS_BULKWRITE	0x000002
+
+/* Holds table modify state */
+typedef struct TableModifyState
+{
+	TableModifyKind kind;
+	Relation	rel;
+	int			flags;
+	MemoryContext mctx;
+
+	/* Table AM specific data starts here */
+	void	   *data;
+} TableModifyState;
+
 /* "options" flag bits for table_tuple_insert */
 /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */
 #define TABLE_INSERT_SKIP_FSM		0x0002
@@ -522,6 +549,21 @@ typedef struct TableAmRoutine
 	void		(*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots,
 								 CommandId cid, int options, struct BulkInsertStateData *bistate);
 
+	TableModifyState *(*tuple_modify_begin) (TableModifyKind kind,
+											 Relation rel,
+											 int flags);
+
+	void		(*tuple_modify_buffer_insert) (TableModifyState *state,
+											   CommandId cid,
+											   int options,
+											   TupleTableSlot *slot);
+
+	void		(*tuple_modify_buffer_flush) (TableModifyState *state,
+											  CommandId cid,
+											  int options);
+
+	void		(*tuple_modify_end) (TableModifyState *state);
+
 	/* see table_tuple_delete() for reference about parameters */
 	TM_Result	(*tuple_delete) (Relation rel,
 								 ItemPointer tid,
@@ -1462,6 +1504,83 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
 								  cid, options, bistate);
 }
 
+/*
+ * Begin a series of modifications of the given kind on "rel" through the
+ * table AM's tuple_modify_begin callback; error out if the AM lacks it.
+ */
+static inline TableModifyState *
+table_modify_begin(TableModifyKind kind, Relation rel, int flags)
+{
+	if (rel->rd_tableam && rel->rd_tableam->tuple_modify_begin)
+	{
+		return rel->rd_tableam->tuple_modify_begin(kind, rel, flags);
+	}
+	else
+	{
+		elog(ERROR, "table_modify_begin access method is not implemented for relation \"%s\"",
+			 RelationGetRelationName(rel));
+		return NULL;			/* keep compiler quiet */
+	}
+}
+
+/*
+ * Hand one tuple to the table AM's tuple_modify_buffer_insert callback of
+ * state->rel; error out if the AM lacks it.
+ */
+static inline void
+table_modify_buffer_insert(TableModifyState *state, CommandId cid,
+						   int options, TupleTableSlot *slot)
+{
+	if (state->rel->rd_tableam &&
+		state->rel->rd_tableam->tuple_modify_buffer_insert)
+	{
+		state->rel->rd_tableam->tuple_modify_buffer_insert(state,
+														   cid,
+														   options,
+														   slot);
+	}
+	else
+		elog(ERROR, "table_modify_buffer_insert access method is not implemented for relation \"%s\"",
+			 RelationGetRelationName(state->rel));
+}
+
+/*
+ * Invoke the table AM's tuple_modify_buffer_flush callback of state->rel;
+ * error out if the AM lacks it.
+ */
+static inline void
+table_modify_buffer_flush(TableModifyState *state, CommandId cid,
+						  int options)
+{
+	if (state->rel->rd_tableam &&
+		state->rel->rd_tableam->tuple_modify_buffer_flush)
+	{
+		state->rel->rd_tableam->tuple_modify_buffer_flush(state,
+														  cid,
+														  options);
+	}
+	else
+		elog(ERROR, "table_modify_buffer_flush access method is not implemented for relation \"%s\"",
+			 RelationGetRelationName(state->rel));
+}
+
+/*
+ * Invoke the table AM's tuple_modify_end callback of state->rel; error out
+ * if the AM lacks it.
+ */
+static inline void
+table_modify_end(TableModifyState *state)
+{
+	if (state->rel->rd_tableam &&
+		state->rel->rd_tableam->tuple_modify_end)
+	{
+		state->rel->rd_tableam->tuple_modify_end(state);
+	}
+	else
+		elog(ERROR, "table_modify_end access method is not implemented for relation \"%s\"",
+			 RelationGetRelationName(state->rel));
+}
+
 /*
  *	Delete a tuple.
  *
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e2a0525dd4..8396ec4ff0 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1122,6 +1122,8 @@ HeadlineJsonState
 HeadlineParsedText
 HeadlineWordEntry
 HeapCheckContext
+HeapInsertState
+HeapMultiInsertState
 HeapPageFreeze
 HeapScanDesc
 HeapTuple
@@ -2808,6 +2810,8 @@ TableFuncScan
 TableFuncScanState
 TableInfo
 TableLikeClause
+TableModifyKind
+TableModifyState
 TableSampleClause
 TableScanDesc
 TableScanDescData
-- 
2.34.1