From 6803736e5695ab0ef06d263e9ba260db02d3b80c Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Tue, 1 Aug 2023 09:38:47 +0000 Subject: [PATCH v7] New table AMs for single and multi inserts --- src/backend/access/heap/heapam.c | 180 +++++++++++++++++++++++ src/backend/access/heap/heapam_handler.c | 6 + src/include/access/heapam.h | 45 ++++++ src/include/access/tableam.h | 107 ++++++++++++++ 4 files changed, 338 insertions(+) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 7ed72abe59..ba4347026a 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -68,6 +68,7 @@ #include "utils/datum.h" #include "utils/inval.h" #include "utils/lsyscache.h" +#include "utils/memutils.h" #include "utils/relcache.h" #include "utils/snapmgr.h" #include "utils/spccache.h" @@ -75,6 +76,7 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, CommandId cid, int options); +static void heap_multi_insert_flush(TableInsertState *state); static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, Buffer newbuf, HeapTuple oldtup, HeapTuple newtup, HeapTuple old_key_tuple, @@ -2443,6 +2445,184 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, pgstat_count_heap_insert(relation, ntuples); } +/* + * Initialize state required for an insert a single tuple or multiple tuples + * into a heap. + */ +TableInsertState * +heap_insert_begin(Relation rel, CommandId cid, int table_am_flags, + int table_insert_flags) +{ + TableInsertState *tistate; + + tistate = (TableInsertState *) palloc0(sizeof(TableInsertState)); + tistate->rel = rel; + tistate->cid = cid; + tistate->table_am_flags = table_am_flags; + tistate->table_insert_flags = table_insert_flags; + + if ((table_am_flags & TABLEAM_USE_MULTI_INSERTS) != 0 || + (table_am_flags & TABLEAM_USE_BULKWRITE_BUFFER_ACCESS_STRATEGY)) + { + tistate->table_am_data = + (HeapInsertState *) palloc0(sizeof(HeapInsertState)); + } + + if ((table_am_flags & TABLEAM_USE_MULTI_INSERTS) != 0) + { + ((HeapInsertState *) tistate->table_am_data)->mistate = + (HeapMultiInsertState *) palloc0(sizeof(HeapMultiInsertState)); + + ((HeapInsertState *) tistate->table_am_data)->mistate->slots = + palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS); + + ((HeapInsertState *) tistate->table_am_data)->mistate->context = + AllocSetContextCreate(CurrentMemoryContext, + "heap_multi_insert_v2 memory context", + ALLOCSET_DEFAULT_SIZES); + } + + if ((table_am_flags & TABLEAM_USE_BULKWRITE_BUFFER_ACCESS_STRATEGY) != 0) + ((HeapInsertState *) tistate->table_am_data)->bistate = GetBulkInsertState(); + + return tistate; +} + +/* + * Insert a single tuple into a heap. + */ +void +heap_insert_v2(TableInsertState *state, TupleTableSlot *slot) +{ + bool shouldFree = true; + HeapTuple tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree); + BulkInsertState bistate = NULL; + + /* Update tuple with table oid */ + slot->tts_tableOid = RelationGetRelid(state->rel); + tuple->t_tableOid = slot->tts_tableOid; + + if (state->table_am_data != NULL && + ((HeapInsertState *) state->table_am_data)->bistate != NULL) + { + bistate = ((HeapInsertState *) state->table_am_data)->bistate; + } + + /* Perform insertion, and copy the resulting ItemPointer */ + heap_insert(state->rel, tuple, state->cid, state->table_insert_flags, + bistate); + ItemPointerCopy(&tuple->t_self, &slot->tts_tid); + + if (shouldFree) + pfree(tuple); +} + +/* + * Store passed-in tuple into in-memory buffered slots. When full, insert + * multiple tuples from the buffers into heap. + */ +void +heap_multi_insert_v2(TableInsertState *state, TupleTableSlot *slot) +{ + TupleTableSlot *dstslot; + HeapMultiInsertState *mistate; + + Assert(state->table_am_data != NULL && + ((HeapInsertState *) state->table_am_data)->mistate != NULL); + + mistate = ((HeapInsertState *) state->table_am_data)->mistate; + dstslot = mistate->slots[mistate->cur_slots]; + + if (dstslot == NULL) + { + dstslot = table_slot_create(state->rel, NULL); + mistate->slots[mistate->cur_slots] = dstslot; + } + + ExecClearTuple(dstslot); + ExecCopySlot(dstslot, slot); + mistate->cur_slots++; + + /* + * When passed-in slot is already materialized, memory allocated in slot's + * memory context is a close approximation for us to track the required + * space for the tuple in slot. + * + * For non-materialized slots, the flushing decision happens solely on the + * number of tuples stored in the buffer. + */ + if (TTS_SHOULDFREE(slot)) + mistate->cur_size += MemoryContextMemAllocated(slot->tts_mcxt, false); + + if (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS || + mistate->cur_size >= HEAP_MAX_BUFFERED_BYTES) + heap_multi_insert_flush(state); +} + +/* + * Clean up state used to insert a single or multiple tuples into a heap. + */ +void +heap_insert_end(TableInsertState *state) +{ + if (state->table_am_data != NULL && + ((HeapInsertState *) state->table_am_data)->mistate != NULL) + { + HeapMultiInsertState *mistate = + ((HeapInsertState *) state->table_am_data)->mistate; + + /* Insert remaining tuples from multi-insert buffers */ + if (mistate->cur_slots > 0 || mistate->cur_size > 0) + heap_multi_insert_flush(state); + + MemoryContextDelete(mistate->context); + + for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++) + ExecDropSingleTupleTableSlot(mistate->slots[i]); + + pfree(mistate); + ((HeapInsertState *) state->table_am_data)->mistate = NULL; + } + + if (state->table_am_data != NULL && + ((HeapInsertState *) state->table_am_data)->bistate != NULL) + { + FreeBulkInsertState(((HeapInsertState *) state->table_am_data)->bistate); + } + + pfree(state->table_am_data); + state->table_am_data = NULL; + pfree(state); +} + +/* + * Insert multiple tuples from in-memory buffered slots into heap. + */ +static void +heap_multi_insert_flush(TableInsertState *state) +{ + HeapMultiInsertState *mistate; + BulkInsertState bistate = NULL; + MemoryContext oldcontext; + + mistate = ((HeapInsertState *) state->table_am_data)->mistate; + + if (state->table_am_data != NULL && + ((HeapInsertState *) state->table_am_data)->bistate != NULL) + { + bistate = ((HeapInsertState *) state->table_am_data)->bistate; + } + + oldcontext = MemoryContextSwitchTo(mistate->context); + heap_multi_insert(state->rel, mistate->slots, mistate->cur_slots, + state->cid, state->table_insert_flags, bistate); + MemoryContextSwitchTo(oldcontext); + MemoryContextReset(mistate->context); + + mistate->cur_slots = 0; + mistate->cur_size = 0; +} + /* * simple_heap_insert - insert a tuple * diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index 5a17112c91..6f144d88dd 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -2568,6 +2568,12 @@ static const TableAmRoutine heapam_methods = { .tuple_insert_speculative = heapam_tuple_insert_speculative, .tuple_complete_speculative = heapam_tuple_complete_speculative, .multi_insert = heap_multi_insert, + + .tuple_insert_begin = heap_insert_begin, + .tuple_insert_v2 = heap_insert_v2, + .tuple_multi_insert_v2 = heap_multi_insert_v2, + .tuple_insert_end = heap_insert_end, + .tuple_delete = heapam_tuple_delete, .tuple_update = heapam_tuple_update, .tuple_lock = heapam_tuple_lock, diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index faf5026519..a1ea26cbd6 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -191,6 +191,40 @@ typedef struct HeapPageFreeze } HeapPageFreeze; +/* + * Maximum number of slots that multi-insert buffers can hold. + * + * Caution: Don't make this too big, as we could end up with this many tuples + * stored in multi insert buffer. For instance, increasing this can cause + * quadratic growth in memory requirements during copies into partitioned + * tables with a large number of partitions. + */ +#define HEAP_MAX_BUFFERED_SLOTS 1000 + +/* Maximum size of all tuples that multi-insert buffers can hold */ +#define HEAP_MAX_BUFFERED_BYTES 65535 + +typedef struct HeapMultiInsertState +{ + /* Memory context to use for flushing multi-insert buffers */ + MemoryContext context; + + /* Array of buffered slots */ + TupleTableSlot **slots; + + /* Number of slots that multi-insert buffers currently hold */ + int cur_slots; + + /* Size of all tuples that multi-insert buffers currently hold */ + Size cur_size; +} HeapMultiInsertState; + +typedef struct HeapInsertState +{ + struct BulkInsertStateData *bistate; + HeapMultiInsertState *mistate; +} HeapInsertState; + /* ---------------- * function prototypes for heap access method * @@ -241,6 +275,17 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid, extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, int ntuples, CommandId cid, int options, BulkInsertState bistate); + +extern TableInsertState* heap_insert_begin(Relation rel, + CommandId cid, + int table_am_flags, + int table_insert_flags); +extern void heap_insert_v2(TableInsertState *state, + TupleTableSlot *slot); +extern void heap_multi_insert_v2(TableInsertState *state, + TupleTableSlot *slot); +extern void heap_insert_end(TableInsertState *state); + extern TM_Result heap_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot crosscheck, bool wait, struct TM_FailureData *tmfd, bool changingPart); diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 230bc39cc0..5ea3eeee8a 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -247,6 +247,35 @@ typedef struct TM_IndexDeleteOp TM_IndexStatus *status; } TM_IndexDeleteOp; +/* Use multi (buffer multiple tuples and insert them at once) inserts */ +#define TABLEAM_USE_MULTI_INSERTS 0x000001 + +/* Use BAS_BULKWRITE buffer access strategy */ +#define TABLEAM_USE_BULKWRITE_BUFFER_ACCESS_STRATEGY 0x000002 + + +/* Holds table insert state. */ +typedef struct TableInsertState +{ + /* Table AM-agnostic data starts here */ + Relation rel; /* Target relation */ + + /* + * Command ID for this insertion. If required, change this for each pass of + * insert functions. + */ + CommandId cid; + + /* Table AM options (TABLEAM_XXX macros) */ + int table_am_flags; + + /* table_tuple_insert performance options (TABLE_INSERT_XXX macros) */ + int table_insert_flags; + + /* Table AM specific data starts here */ + void *table_am_data; +} TableInsertState; + /* "options" flag bits for table_tuple_insert */ /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */ #define TABLE_INSERT_SKIP_FSM 0x0002 @@ -522,6 +551,19 @@ typedef struct TableAmRoutine void (*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots, CommandId cid, int options, struct BulkInsertStateData *bistate); + TableInsertState *(*tuple_insert_begin) (Relation rel, + CommandId cid, + int table_am_flags, + int table_insert_flags); + + void (*tuple_insert_v2) (TableInsertState *state, + TupleTableSlot *slot); + + void (*tuple_multi_insert_v2) (TableInsertState *state, + TupleTableSlot *slot); + + void (*tuple_insert_end) (TableInsertState *state); + /* see table_tuple_delete() for reference about parameters */ TM_Result (*tuple_delete) (Relation rel, ItemPointer tid, @@ -1456,6 +1498,71 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots, cid, options, bistate); } +static inline TableInsertState * +table_insert_begin(Relation rel, CommandId cid, int table_am_flags, + int table_insert_flags) +{ + /* XXX: Really it doesn't have to be an optional callback */ + if (rel->rd_tableam && rel->rd_tableam->tuple_insert_begin) + { + return rel->rd_tableam->tuple_insert_begin(rel, cid, table_am_flags, + table_insert_flags); + } + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_insert_begin access method is not implemented for relation \"%s\"", + RelationGetRelationName(rel))); +} + +static inline void +table_tuple_insert_v2(TableInsertState *state, TupleTableSlot *slot) +{ + /* XXX: Really it doesn't have to be an optional callback */ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_insert_begin) + { + return state->rel->rd_tableam->tuple_insert_v2(state, slot); + } + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_tuple_insert_v2 access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel))); +} + +static inline void +table_multi_insert_v2(TableInsertState *state, TupleTableSlot *slot) +{ + /* XXX: Really it doesn't have to be an optional callback */ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_insert_begin) + { + return state->rel->rd_tableam->tuple_multi_insert_v2(state, slot); + } + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_multi_insert_v2 access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel))); +} + +static inline void +table_insert_end(TableInsertState *state) +{ + /* XXX: Really it doesn't have to be an optional callback */ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_insert_begin) + { + return state->rel->rd_tableam->tuple_insert_end(state); + } + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_insert_end access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel))); +} + /* * Delete a tuple. * -- 2.34.1