From 75666da998aaa8fbc60d62ad8c160a5c227065e6 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Tue, 23 Apr 2024 04:12:29 +0000 Subject: [PATCH v19 1/6] Introduce new Table Access Methods for single and multi inserts --- src/backend/access/heap/heapam.c | 202 ++++++++++++++++++++++- src/backend/access/heap/heapam_handler.c | 6 + src/backend/access/table/tableam.c | 95 +++++++++++ src/backend/access/table/tableamapi.c | 10 ++ src/include/access/heapam.h | 44 +++++ src/include/access/tableam.h | 146 ++++++++++++++++ src/tools/pgindent/typedefs.list | 3 + 7 files changed, 505 insertions(+), 1 deletion(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 4a4cf76269..37c6ed232c 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -64,6 +64,7 @@ #include "storage/standby.h" #include "utils/datum.h" #include "utils/inval.h" +#include "utils/memutils.h" #include "utils/relcache.h" #include "utils/snapmgr.h" #include "utils/spccache.h" @@ -112,7 +113,7 @@ static int bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate); static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup); static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required, bool *copy); - +static void heap_modify_insert_end_callback(TableModifyState *state); /* * Each tuple lock mode has a corresponding heavyweight lock, and one or two @@ -2608,6 +2609,205 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, pgstat_count_heap_insert(relation, ntuples); } +/* + * Initialize heap modify state. 
+ */ +TableModifyState * +heap_modify_begin(Relation rel, int modify_flags, + CommandId cid, int options, + TableModifyBufferFlushCallback modify_buffer_flush_callback, + void *modify_buffer_flush_context) +{ + TableModifyState *state; + MemoryContext context; + MemoryContext oldcontext; + + context = AllocSetContextCreate(TopTransactionContext, + "heap_modify memory context", + ALLOCSET_DEFAULT_SIZES); + + oldcontext = MemoryContextSwitchTo(context); + state = palloc0(sizeof(TableModifyState)); + state->rel = rel; + state->modify_flags = modify_flags; + state->mem_cxt = context; + state->cid = cid; + state->options = options; + state->modify_buffer_flush_callback = modify_buffer_flush_callback; + state->modify_buffer_flush_context = modify_buffer_flush_context; + state->modify_end_callback = NULL; /* To be installed lazily */ + MemoryContextSwitchTo(oldcontext); + + return state; +} + +/* + * Store passed-in tuple into in-memory buffered slots. When full, insert + * multiple tuples from the buffers into heap. 
+ */ +void +heap_modify_buffer_insert(TableModifyState *state, + TupleTableSlot *slot) +{ + TupleTableSlot *dstslot; + HeapInsertState *istate; + HeapMultiInsertState *mistate; + MemoryContext oldcontext; + + oldcontext = MemoryContextSwitchTo(state->mem_cxt); + + /* First time through, initialize heap insert state */ + if (state->data == NULL) + { + istate = (HeapInsertState *) palloc0(sizeof(HeapInsertState)); + istate->bistate = NULL; + istate->mistate = NULL; + state->data = istate; + + if ((state->modify_flags & TM_FLAG_MULTI_INSERTS) != 0) + { + mistate = (HeapMultiInsertState *) palloc0(sizeof(HeapMultiInsertState)); + mistate->slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS); + istate->mistate = mistate; + mistate->mem_cxt = AllocSetContextCreate(CurrentMemoryContext, + "heap_multi_insert memory context", + ALLOCSET_DEFAULT_SIZES); + } + + if ((state->modify_flags & TM_FLAG_BAS_BULKWRITE) != 0) + istate->bistate = GetBulkInsertState(); + + state->modify_end_callback = heap_modify_insert_end_callback; + } + + istate = (HeapInsertState *) state->data; + Assert(istate->mistate != NULL); + mistate = istate->mistate; + + dstslot = mistate->slots[mistate->cur_slots]; + if (dstslot == NULL) + { + /* + * We use virtual tuple slots as the buffered slots to leverage the + * optimization they provide to minimize physical data copying. The + * virtual slot gets materialized when we copy (via below + * ExecCopySlot) the tuples from the source slot which can be of any + * type. This way, it is ensured that the tuple storage doesn't depend + * on external memory, because all the datums that aren't passed by + * value are copied into the slot's memory context. 
+ */ + dstslot = MakeTupleTableSlot(RelationGetDescr(state->rel), + &TTSOpsVirtual); + mistate->slots[mistate->cur_slots] = dstslot; + } + + ExecClearTuple(dstslot); + ExecCopySlot(dstslot, slot); + + mistate->cur_slots++; + + /* + * Memory allocated for the whole tuple is in the slot's memory context, so + * use it to keep track of the total space occupied by all buffered tuples. + */ + if (TTS_SHOULDFREE(dstslot)) + mistate->cur_size += MemoryContextMemAllocated(dstslot->tts_mcxt, false); + + if (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS || + mistate->cur_size >= HEAP_MAX_BUFFERED_BYTES) + heap_modify_buffer_flush(state); + + MemoryContextSwitchTo(oldcontext); +} + +/* + * Insert multiple tuples from in-memory buffered slots into heap. + */ +void +heap_modify_buffer_flush(TableModifyState *state) +{ + HeapInsertState *istate; + HeapMultiInsertState *mistate; + MemoryContext oldcontext; + + /* Quick exit if we haven't inserted anything yet */ + if (state->data == NULL) + return; + + istate = (HeapInsertState *) state->data; + Assert(istate->mistate != NULL); + mistate = istate->mistate; + + if (mistate->cur_slots == 0) + return; + + /* + * heap_multi_insert may leak memory, so switch to short-lived memory + * context before calling it. + */ + oldcontext = MemoryContextSwitchTo(mistate->mem_cxt); + + heap_multi_insert(state->rel, mistate->slots, mistate->cur_slots, + state->cid, state->options, istate->bistate); + + MemoryContextSwitchTo(oldcontext); + MemoryContextReset(mistate->mem_cxt); + + if (state->modify_buffer_flush_callback != NULL) + state->modify_buffer_flush_callback(state->modify_buffer_flush_context, + mistate->slots, mistate->cur_slots); + + mistate->cur_slots = 0; + mistate->cur_size = 0; +} + +/* + * Heap insert specific callback used for performing work at the end like + * flushing buffered tuples if any, cleaning up the insert state and buffered + * slots. 
+ */ +static void +heap_modify_insert_end_callback(TableModifyState *state) +{ + HeapInsertState *istate; + + /* Quick exit if we haven't inserted anything yet */ + if (state->data == NULL) + return; + + istate = (HeapInsertState *) state->data; + + if (istate->mistate != NULL) + { + HeapMultiInsertState *mistate = istate->mistate; + + heap_modify_buffer_flush(state); + + Assert(mistate->cur_slots == 0 && + mistate->cur_size == 0); + + for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++) + ExecDropSingleTupleTableSlot(mistate->slots[i]); + + MemoryContextDelete(mistate->mem_cxt); + } + + if (istate->bistate != NULL) + FreeBulkInsertState(istate->bistate); +} + +/* + * Clean heap modify state. + */ +void +heap_modify_end(TableModifyState *state) +{ + if (state->modify_end_callback != NULL) + state->modify_end_callback(state); + + MemoryContextDelete(state->mem_cxt); +} + /* * simple_heap_insert - insert a tuple * diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index 6f8b1b7929..eda0c73a16 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -2615,6 +2615,12 @@ static const TableAmRoutine heapam_methods = { .tuple_insert_speculative = heapam_tuple_insert_speculative, .tuple_complete_speculative = heapam_tuple_complete_speculative, .multi_insert = heap_multi_insert, + + .tuple_modify_begin = heap_modify_begin, + .tuple_modify_buffer_insert = heap_modify_buffer_insert, + .tuple_modify_buffer_flush = heap_modify_buffer_flush, + .tuple_modify_end = heap_modify_end, + .tuple_delete = heapam_tuple_delete, .tuple_update = heapam_tuple_update, .tuple_lock = heapam_tuple_lock, diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c index e57a0b7ea3..0e4ce1aca6 100644 --- a/src/backend/access/table/tableam.c +++ b/src/backend/access/table/tableam.c @@ -21,6 +21,7 @@ #include +#include "access/heapam.h" /* just for 
BulkInsertState */ #include "access/syncscan.h" #include "access/tableam.h" #include "access/xact.h" @@ -29,6 +30,7 @@ #include "storage/bufmgr.h" #include "storage/shmem.h" #include "storage/smgr.h" +#include "utils/memutils.h" /* * Constants to control the behavior of block allocation to parallel workers @@ -48,6 +50,7 @@ char *default_table_access_method = DEFAULT_TABLE_ACCESS_METHOD; bool synchronize_seqscans = true; +static void default_table_modify_insert_end_callback(TableModifyState *state); /* ---------------------------------------------------------------------------- * Slot functions. @@ -756,3 +759,95 @@ table_block_relation_estimate_size(Relation rel, int32 *attr_widths, else *allvisfrac = (double) relallvisible / curpages; } + +/* + * Initialize default table modify state. + */ +TableModifyState * +default_table_modify_begin(Relation rel, int modify_flags, + CommandId cid, int options, + TableModifyBufferFlushCallback modify_buffer_flush_callback, + void *modify_buffer_flush_context) +{ + TableModifyState *state; + MemoryContext context; + MemoryContext oldcontext; + + context = AllocSetContextCreate(CurrentMemoryContext, + "default_table_modify memory context", + ALLOCSET_DEFAULT_SIZES); + + oldcontext = MemoryContextSwitchTo(context); + state = palloc0(sizeof(TableModifyState)); + state->rel = rel; + state->modify_flags = modify_flags; + state->mem_cxt = context; + state->cid = cid; + state->options = options; + state->modify_end_callback = NULL; /* To be installed lazily */ + MemoryContextSwitchTo(oldcontext); + + return state; +} + +/* + * Default table modify implementation for inserts. 
+ */ +void +default_table_modify_buffer_insert(TableModifyState *state, + TupleTableSlot *slot) +{ + MemoryContext oldcontext; + + oldcontext = MemoryContextSwitchTo(state->mem_cxt); + + /* First time through, initialize default table modify state */ + if (state->data == NULL) + { + if ((state->modify_flags & TM_FLAG_BAS_BULKWRITE) != 0) + state->data = (BulkInsertState) GetBulkInsertState(); + + state->modify_end_callback = default_table_modify_insert_end_callback; + } + + /* Fallback to table AM single insert routine */ + table_tuple_insert(state->rel, + slot, + state->cid, + state->options, + (BulkInsertState) state->data); + + MemoryContextSwitchTo(oldcontext); +} + +/* + * Default table modify implementation for flush. + */ +void +default_table_modify_buffer_flush(TableModifyState *state) +{ + /* no-op */ +} + +/* + * Default table modify insert specific callback used for performing work at + * the end like cleaning up the bulk insert state. + */ +static void +default_table_modify_insert_end_callback(TableModifyState *state) +{ + if (state->data != NULL) + FreeBulkInsertState((BulkInsertState) state->data); +} + +/* + * Clean default table modify state. + */ +void +default_table_modify_end(TableModifyState *state) +{ + if (state->modify_end_callback != NULL) + state->modify_end_callback(state); + + MemoryContextDelete(state->mem_cxt); +} diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c index ce637a5a5d..96ac951af6 100644 --- a/src/backend/access/table/tableamapi.c +++ b/src/backend/access/table/tableamapi.c @@ -97,6 +97,16 @@ GetTableAmRoutine(Oid amhandler) Assert(routine->scan_sample_next_block != NULL); Assert(routine->scan_sample_next_tuple != NULL); + /* optional, but either all of them are defined or none. 
*/ + Assert((routine->tuple_modify_begin == NULL && + routine->tuple_modify_buffer_insert == NULL && + routine->tuple_modify_buffer_flush == NULL && + routine->tuple_modify_end == NULL) || + (routine->tuple_modify_begin != NULL && + routine->tuple_modify_buffer_insert != NULL && + routine->tuple_modify_buffer_flush != NULL && + routine->tuple_modify_end != NULL)); + return routine; } diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index c47a5045ce..c10ebbb5ea 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -271,6 +271,38 @@ typedef enum PRUNE_VACUUM_CLEANUP, /* VACUUM 2nd heap pass */ } PruneReason; +/* + * Maximum number of slots that multi-insert buffers can hold. + * + * Caution: Don't make this too big, as we could end up with this many tuples + * stored in multi insert buffer. + */ +#define HEAP_MAX_BUFFERED_SLOTS 1000 + +/* Maximum size of all tuples that multi-insert buffers can hold */ +#define HEAP_MAX_BUFFERED_BYTES 65535 + +typedef struct HeapMultiInsertState +{ + /* Array of buffered slots */ + TupleTableSlot **slots; + + /* Number of buffered slots currently held */ + int cur_slots; + + /* Approximate size of all tuples currently held in buffered slots */ + Size cur_size; + + MemoryContext mem_cxt; +} HeapMultiInsertState; + +typedef struct HeapInsertState +{ + struct BulkInsertStateData *bistate; + HeapMultiInsertState *mistate; +} HeapInsertState; + + /* ---------------- * function prototypes for heap access method * @@ -321,6 +353,18 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid, extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, int ntuples, CommandId cid, int options, BulkInsertState bistate); + +extern TableModifyState *heap_modify_begin(Relation rel, + int modify_flags, + CommandId cid, + int options, + TableModifyBufferFlushCallback modify_buffer_flush_callback, + void *modify_buffer_flush_context); +extern void 
heap_modify_buffer_insert(TableModifyState *state, + TupleTableSlot *slot); +extern void heap_modify_buffer_flush(TableModifyState *state); +extern void heap_modify_end(TableModifyState *state); + extern TM_Result heap_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot crosscheck, bool wait, struct TM_FailureData *tmfd, bool changingPart); diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 8e583b45cd..ddb6e6f3a5 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -255,6 +255,43 @@ typedef struct TM_IndexDeleteOp TM_IndexStatus *status; } TM_IndexDeleteOp; +/* Table modify flags */ + +/* Use multi inserts, i.e. buffer multiple tuples and insert them at once */ +#define TM_FLAG_MULTI_INSERTS 0x000001 + +/* Use BAS_BULKWRITE buffer access strategy */ +#define TM_FLAG_BAS_BULKWRITE 0x000002 + +struct TableModifyState; + +/* Callback invoked after each buffer flush with the slots just flushed */ +typedef void (*TableModifyBufferFlushCallback) (void *context, + TupleTableSlot **slots, + int nslots); + +/* Table AM specific callback that gets called in table_modify_end() */ +typedef void (*TableModifyEndCallback) (struct TableModifyState *state); + +/* Holds table modify state */ +typedef struct TableModifyState +{ + Relation rel; + int modify_flags; + MemoryContext mem_cxt; + CommandId cid; + int options; + + /* Flush callback and its context */ + TableModifyBufferFlushCallback modify_buffer_flush_callback; + void *modify_buffer_flush_context; + + /* Table AM specific data */ + void *data; + + TableModifyEndCallback modify_end_callback; +} TableModifyState; + /* "options" flag bits for table_tuple_insert */ /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */ #define TABLE_INSERT_SKIP_FSM 0x0002 @@ -578,6 +615,21 @@ typedef struct TableAmRoutine void (*finish_bulk_insert) (Relation rel, int options); + /* ------------------------------------------------------------------------ + * 
Table Modify related functions. + * ------------------------------------------------------------------------ + */ + TableModifyState *(*tuple_modify_begin) (Relation rel, + int modify_flags, + CommandId cid, + int options, + TableModifyBufferFlushCallback modify_buffer_flush_callback, + void *modify_buffer_flush_context); + void (*tuple_modify_buffer_insert) (TableModifyState *state, + TupleTableSlot *slot); + void (*tuple_modify_buffer_flush) (TableModifyState *state); + void (*tuple_modify_end) (TableModifyState *state); + /* ------------------------------------------------------------------------ * DDL related functionality. * ------------------------------------------------------------------------ @@ -1609,6 +1661,100 @@ table_finish_bulk_insert(Relation rel, int options) rel->rd_tableam->finish_bulk_insert(rel, options); } +/* ------------------------------------------------------------------------ + * Table Modify related functions. + * ------------------------------------------------------------------------ + */ +extern TableModifyState *default_table_modify_begin(Relation rel, int modify_flags, + CommandId cid, int options, + TableModifyBufferFlushCallback modify_buffer_flush_callback, + void *modify_buffer_flush_context); +extern void default_table_modify_buffer_insert(TableModifyState *state, + TupleTableSlot *slot); +extern void default_table_modify_buffer_flush(TableModifyState *state); +extern void default_table_modify_end(TableModifyState *state); + +static inline TableModifyState * +table_modify_begin(Relation rel, int modify_flags, CommandId cid, int options, + TableModifyBufferFlushCallback modify_buffer_flush_callback, + void *modify_buffer_flush_context) +{ + if (rel->rd_tableam && + rel->rd_tableam->tuple_modify_begin != NULL) + { + return rel->rd_tableam->tuple_modify_begin(rel, modify_flags, + cid, options, + modify_buffer_flush_callback, + modify_buffer_flush_context); + } + else if (rel->rd_tableam && + rel->rd_tableam->tuple_modify_begin == 
NULL) + { + /* Fallback to a default implementation */ + return default_table_modify_begin(rel, modify_flags, + cid, options, + modify_buffer_flush_callback, + modify_buffer_flush_context); + } + else + Assert(false); + + return NULL; +} + +static inline void +table_modify_buffer_insert(TableModifyState *state, TupleTableSlot *slot) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_modify_buffer_insert != NULL) + { + state->rel->rd_tableam->tuple_modify_buffer_insert(state, slot); + } + else if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_modify_buffer_insert == NULL) + { + /* Fallback to a default implementation */ + default_table_modify_buffer_insert(state, slot); + } + else + Assert(false); +} + +static inline void +table_modify_buffer_flush(TableModifyState *state) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_modify_buffer_flush != NULL) + { + state->rel->rd_tableam->tuple_modify_buffer_flush(state); + } + else if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_modify_buffer_flush == NULL) + { + /* Fallback to a default implementation */ + default_table_modify_buffer_flush(state); + } + else + Assert(false); +} + +static inline void +table_modify_end(TableModifyState *state) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_modify_end != NULL) + { + state->rel->rd_tableam->tuple_modify_end(state); + } + else if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_modify_end == NULL) + { + /* Fallback to a default implementation */ + default_table_modify_end(state); + } + else + Assert(false); +} /* ------------------------------------------------------------------------ * DDL related functionality. 
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index d551ada325..ebde07bcde 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1130,6 +1130,8 @@ HeadlineJsonState HeadlineParsedText HeadlineWordEntry HeapCheckContext +HeapInsertState +HeapMultiInsertState HeapPageFreeze HeapScanDesc HeapTuple @@ -2844,6 +2846,7 @@ TableFuncScanState TableFuncType TableInfo TableLikeClause +TableModifyState TableSampleClause TableScanDesc TableScanDescData -- 2.34.1