From 6518212583e24b017375512701d9fefa6de20e42 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Wed, 10 Mar 2021 09:53:48 +0530 Subject: [PATCH v6 1/3] New Table AMs for Multi and Single Inserts This patch introduces new table access method (table AM) callbacks for multi and single inserts. It also implements these callbacks for the heap AM by rearranging code that currently lives outside of it. The main design goal of these new APIs is to give table AM developers the flexibility to implement multi insert logic that depends on the underlying storage engine. Currently, we follow the same multi insert logic for all the underlying storage engines, such as when and how to flush the buffered tuples and how to calculate the tuple size, and this logic doesn't take the capabilities of the underlying storage engine into account. With these APIs we can also avoid duplicating multi insert code (for existing COPY, and upcoming CTAS, CREATE/REFRESH MAT VIEW and INSERT SELECTs), and we can move bulk insert state allocation and deallocation inside them. --- src/backend/access/heap/heapam.c | 212 +++++++++++++++++++++++ src/backend/access/heap/heapam_handler.c | 5 + src/backend/access/table/tableamapi.c | 7 + src/backend/executor/execTuples.c | 83 ++++++++- src/include/access/heapam.h | 49 +++++- src/include/access/tableam.h | 87 ++++++++++ src/include/executor/tuptable.h | 1 + 7 files changed, 438 insertions(+), 6 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 3b435c107d..d8bfe17f22 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -67,6 +67,7 @@ #include "utils/datum.h" #include "utils/inval.h" #include "utils/lsyscache.h" +#include "utils/memutils.h" #include "utils/relcache.h" #include "utils/snapmgr.h" #include "utils/spccache.h" @@ -2669,6 +2670,217 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, pgstat_count_heap_insert(relation, ntuples); } +/* + * heap_insert_begin - allocate and initialize TableInsertState + * + * For single
inserts: + * 1) Specify is_multi as false, then multi insert state will be NULL. + * + * For multi inserts: + * 1) Specify is_multi as true, then multi insert state will be allocated and + * initialized. + * + * Other input parameters i.e. relation, command id, options are common for + * both single and multi inserts. bistate is left NULL for the caller to set. + */ +TableInsertState * +heap_insert_begin(Relation rel, CommandId cid, int options, bool is_multi) +{ + TableInsertState *state; + + state = palloc0(sizeof(TableInsertState)); + state->rel = rel; + state->cid = cid; + state->options = options; + /* Below parameters are not used for single inserts. */ + state->mi_slots = NULL; + state->mistate = NULL; + state->mi_cur_slots = 0; + state->flushed = false; + + if (is_multi) + { + HeapMultiInsertState *mistate; + + mistate = palloc(sizeof(HeapMultiInsertState)); + state->mi_slots = + palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); + mistate->max_slots = MAX_BUFFERED_TUPLES; + mistate->max_size = MAX_BUFFERED_BYTES; + mistate->cur_size = 0; + /* + * Create a temporary memory context so that we can reset once per + * multi insert batch. + */ + mistate->context = AllocSetContextCreate(CurrentMemoryContext, + "heap_multi_insert", + ALLOCSET_DEFAULT_SIZES); + state->mistate = mistate; + } + + return state; +} + +/* + * heap_insert_v2 - insert single tuple into a heap + * + * Insert tuple from slot into table. This is like heap_insert(), the only + * difference is that the parameters for insertion are inside table insert + * state structure.
+ */ +void +heap_insert_v2(TableInsertState *state, TupleTableSlot *slot) +{ + bool shouldFree = true; + HeapTuple tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree); + + Assert(state); + + /* Update both the slot and the fetched tuple with the table oid */ + slot->tts_tableOid = RelationGetRelid(state->rel); + tuple->t_tableOid = slot->tts_tableOid; + + /* Perform insertion, and copy the resulting ItemPointer into the slot */ + heap_insert(state->rel, tuple, state->cid, state->options, state->bistate); + ItemPointerCopy(&tuple->t_self, &slot->tts_tid); + + if (shouldFree) + pfree(tuple); +} + +/* + * heap_multi_insert_v2 - insert multiple tuples into a heap + * + * Copy the tuple into the next free buffered slot and add its size to the + * running total, then flush i.e. insert the so far buffered tuples into the + * heap if a limit below has been reached. + * + * Flush can happen: + * 1) either if all the buffered slots are filled up + * 2) or if total tuple size of the currently buffered slots is >= max_size + */ +void +heap_multi_insert_v2(TableInsertState *state, TupleTableSlot *slot) +{ + TupleTableSlot *batchslot; + HeapMultiInsertState *mistate; + Size sz; + + Assert(state); + + mistate = (HeapMultiInsertState *)state->mistate; + + Assert(mistate && state->mi_slots); + + /* A flush happened on an earlier call; start filling a fresh batch. */ + if (state->flushed) + { + state->mi_cur_slots = 0; + state->flushed = false; + } + + Assert(state->mi_cur_slots < mistate->max_slots); + + if (state->mi_slots[state->mi_cur_slots] == NULL) + state->mi_slots[state->mi_cur_slots] = + table_slot_create(state->rel, NULL); + + batchslot = state->mi_slots[state->mi_cur_slots]; + + ExecClearTuple(batchslot); + ExecCopySlot(batchslot, slot); + + /* + * Calculate tuple size after original slot is copied, because the copied + * slot type and tuple size may change.
+ */ + sz = GetTupleSize(batchslot, mistate->max_size); + + Assert(sz > 0); + + state->mi_cur_slots++; + mistate->cur_size += sz; + + if (state->mi_cur_slots >= mistate->max_slots || + mistate->cur_size >= mistate->max_size) + heap_multi_insert_flush(state); +} + +/* + * heap_multi_insert_flush - flush buffered tuples, if any, into a heap + * + * Insert the buffered tuples, set state->flushed to tell the caller about it + * and zero cur_size. If already flushed, just reset flushed and mi_cur_slots. + */ +void +heap_multi_insert_flush(TableInsertState *state) +{ + HeapMultiInsertState *mistate; + MemoryContext oldcontext; + + Assert(state); + + mistate = (HeapMultiInsertState *)state->mistate; + + Assert(mistate && state->mi_slots && state->mi_cur_slots >= 0 && + mistate->context); + + if (state->flushed) + { + state->mi_cur_slots = 0; + state->flushed = false; + return; + } + + oldcontext = MemoryContextSwitchTo(mistate->context); + heap_multi_insert(state->rel, state->mi_slots, state->mi_cur_slots, + state->cid, state->options, state->bistate); + MemoryContextReset(mistate->context); + MemoryContextSwitchTo(oldcontext); + + state->flushed = true; + mistate->cur_size = 0; +} + +/* + * heap_insert_end - clean up TableInsertState + * + * For multi inserts, make sure to flush all the remaining buffers with + * heap_multi_insert_flush() before calling this function. + * + * In this function, buffered slots are dropped, short-lived memory context is + * deleted, mistate and TableInsertState are freed up. + */ +void +heap_insert_end(TableInsertState *state) +{ + HeapMultiInsertState *mistate; + int i; + + Assert(state); + + mistate = (HeapMultiInsertState *)state->mistate; + + if (!mistate) + { + pfree(state); + return; + } + + Assert(state->mi_slots && mistate->context); + + /* Ensure that the buffers have been flushed before.
*/ + Assert(state->mi_cur_slots == 0 || state->flushed); + + for (i = 0; i < mistate->max_slots && state->mi_slots[i] != NULL; i++) + ExecDropSingleTupleTableSlot(state->mi_slots[i]); + + MemoryContextDelete(mistate->context); + pfree(mistate); + pfree(state->mi_slots); + pfree(state); +} + /* * simple_heap_insert - insert a tuple * diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index bd5faf0c1f..655de8e6b7 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -2558,6 +2558,11 @@ static const TableAmRoutine heapam_methods = { .tuple_insert_speculative = heapam_tuple_insert_speculative, .tuple_complete_speculative = heapam_tuple_complete_speculative, .multi_insert = heap_multi_insert, + .tuple_insert_begin = heap_insert_begin, + .tuple_insert_v2 = heap_insert_v2, + .multi_insert_v2 = heap_multi_insert_v2, + .multi_insert_flush = heap_multi_insert_flush, + .tuple_insert_end = heap_insert_end, .tuple_delete = heapam_tuple_delete, .tuple_update = heapam_tuple_update, .tuple_lock = heapam_tuple_lock, diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c index 325ecdc122..95f1f9b6a0 100644 --- a/src/backend/access/table/tableamapi.c +++ b/src/backend/access/table/tableamapi.c @@ -78,6 +78,13 @@ GetTableAmRoutine(Oid amhandler) Assert(routine->tuple_complete_speculative != NULL); Assert(routine->multi_insert != NULL); + + Assert(routine->tuple_insert_begin != NULL); + Assert(routine->tuple_insert_v2 != NULL); + Assert(routine->multi_insert_v2 != NULL); + Assert(routine->multi_insert_flush != NULL); + Assert(routine->tuple_insert_end != NULL); + Assert(routine->tuple_delete != NULL); Assert(routine->tuple_update != NULL); Assert(routine->tuple_lock != NULL); diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c index 73c35df9c9..79ae22455a 100644 --- a/src/backend/executor/execTuples.c +++ 
b/src/backend/executor/execTuples.c @@ -159,7 +159,11 @@ tts_virtual_materialize(TupleTableSlot *slot) if (TTS_SHOULDFREE(slot)) return; - /* compute size of memory required */ + /* + * Compute size of memory required. Similar size calculation code is used + * in GetTupleSize(), hence ensure to have the same changes or fixes here + * and also there. + */ for (int natt = 0; natt < desc->natts; natt++) { Form_pg_attribute att = TupleDescAttr(desc, natt); @@ -1239,6 +1243,83 @@ ExecDropSingleTupleTableSlot(TupleTableSlot *slot) pfree(slot); } +/* + * GetTupleSize - Compute the tuple size given a table slot. + * + * For heap tuple, buffer tuple and minimal tuple slot types return the actual + * tuple size that exists. For virtual tuple, the size is calculated as the + * slot does not have the tuple size. If the computed size exceeds the given + * maxsize for the virtual tuple, this function exits, not investing time in + * further unnecessary calculation. + * + * Important Notes: + * 1) Size calculation code for virtual slots is modeled on + * tts_virtual_materialize() but also counts by-value attributes; keep any + * changes or fixes to the common parts in sync. + * 2) Currently, GetTupleSize() handles the existing heap, buffer, minimal and + * virtual slots. Ensure to add related code in case any new slot type is + * introduced. + */ +Size +GetTupleSize(TupleTableSlot *slot, Size maxsize) +{ + Size sz = 0; + HeapTuple tuple = NULL; + + if (TTS_IS_HEAPTUPLE(slot)) + tuple = ((HeapTupleTableSlot *) slot)->tuple; + else if (TTS_IS_BUFFERTUPLE(slot)) + tuple = ((BufferHeapTupleTableSlot *) slot)->base.tuple; + else if (TTS_IS_MINIMALTUPLE(slot)) + tuple = ((MinimalTupleTableSlot *) slot)->tuple; + else if (TTS_IS_VIRTUAL(slot)) + { + /* + * Size calculation code used here is modeled on + * tts_virtual_materialize(), ensure to have the same changes or fixes + * here and also there.
+ */ + TupleDesc desc = slot->tts_tupleDescriptor; + + for (int natt = 0; natt < desc->natts; natt++) + { + Form_pg_attribute att = TupleDescAttr(desc, natt); + Datum val; + + /* + * NULLs take no space; non-NULL attributes are counted once below. + */ + if (slot->tts_isnull[natt]) + continue; + + val = slot->tts_values[natt]; + + if (att->attlen == -1 && + VARATT_IS_EXTERNAL_EXPANDED(DatumGetPointer(val))) + { + sz = att_align_nominal(sz, att->attalign); + sz += EOH_get_flat_size(DatumGetEOHP(val)); + } + else + { + sz = att_align_nominal(sz, att->attalign); + sz = att_addlength_datum(sz, att->attlen, val); + } + + /* + * We are not interested in proceeding further if the computed size + * crosses maxsize limit that we are looking for. + */ + if (maxsize != 0 && sz >= maxsize) + break; + } + } + + if (tuple != NULL && !TTS_IS_VIRTUAL(slot)) + sz = tuple->t_len; + + return sz; +} /* ---------------------------------------------------------------- * tuple table slot accessor functions diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index bc0936bc2d..da74ab072d 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -36,11 +36,26 @@ #define HEAP_INSERT_NO_LOGICAL TABLE_INSERT_NO_LOGICAL #define HEAP_INSERT_SPECULATIVE 0x0010 -typedef struct BulkInsertStateData *BulkInsertState; struct TupleTableSlot; #define MaxLockTupleMode LockTupleExclusive +/* + * No more than this many tuples per single multi insert batch + * + * Caution: Don't make this too big, as we could end up with this many tuples + * stored in multi insert buffer. Increasing this can cause quadratic growth in + * memory requirements during copies into partitioned tables with a large + * number of partitions. + */ +#define MAX_BUFFERED_TUPLES 1000 + +/* + * Flush multi insert buffers if there are >= this many bytes, as counted by + * the size of the tuples buffered. + */ +#define MAX_BUFFERED_BYTES 65535 + /* * Descriptor for heap table scans.
*/ @@ -93,6 +108,25 @@ typedef enum HEAPTUPLE_DELETE_IN_PROGRESS /* deleting xact is still in progress */ } HTSV_Result; +/* Holds multi insert state for heap access method. */ +typedef struct HeapMultiInsertState +{ + /* Short-lived memory context that flushes run in; reset after each flush. */ + MemoryContext context; + /* Maximum number of slots that can be buffered. */ + int32 max_slots; + /* + * Maximum size (in bytes) of all the tuples that a single batch of + * buffered slots can hold. + */ + int64 max_size; + /* + * Total tuple size (in bytes) of the slots that are currently buffered. + * The buffered slots are flushed when cur_size >= max_size. + */ + int64 cur_size; +} HeapMultiInsertState; + /* ---------------- * function prototypes for heap access method * @@ -134,15 +168,20 @@ extern bool heap_hot_search_buffer(ItemPointer tid, Relation relation, extern void heap_get_latest_tid(TableScanDesc scan, ItemPointer tid); -extern BulkInsertState GetBulkInsertState(void); -extern void FreeBulkInsertState(BulkInsertState); -extern void ReleaseBulkInsertStatePin(BulkInsertState bistate); - extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid, int options, BulkInsertState bistate); extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, int ntuples, CommandId cid, int options, BulkInsertState bistate); + +extern TableInsertState* heap_insert_begin(Relation rel, CommandId cid, + int options, bool is_multi); +extern void heap_insert_v2(TableInsertState *state, TupleTableSlot *slot); +extern void heap_multi_insert_v2(TableInsertState *state, + TupleTableSlot *slot); +extern void heap_multi_insert_flush(TableInsertState *state); +extern void heap_insert_end(TableInsertState *state); + extern TM_Result heap_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot crosscheck, bool wait, struct TM_FailureData *tmfd, bool changingPart); diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 414b6b4d57..2a1470a7b6 100644
--- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -229,6 +229,32 @@ typedef struct TM_IndexDeleteOp TM_IndexStatus *status; } TM_IndexDeleteOp; +/* Holds the insert state shared by the single and multi insert APIs. */ +typedef struct TableInsertState +{ + Relation rel; + /* Bulk insert state if requested, otherwise NULL. */ + struct BulkInsertStateData *bistate; + CommandId cid; + int options; + /* Below members are only used for multi inserts. */ + /* Array of buffered slots. */ + TupleTableSlot **mi_slots; + /* Number of slots that are currently buffered. */ + int32 mi_cur_slots; + /* + * Access method specific information such as parameters that are needed + * for buffering and flushing decisions can go here. + */ + void *mistate; + /* + * This member indicates whether or not the buffered slots have been + * flushed to a table. Used by callers of multi insert API for inserting + * into indexes or executing after row triggers, if any. + */ + bool flushed; +}TableInsertState; + /* "options" flag bits for table_tuple_insert */ /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */ #define TABLE_INSERT_SKIP_FSM 0x0002 @@ -504,6 +530,17 @@ typedef struct TableAmRoutine void (*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots, CommandId cid, int options, struct BulkInsertStateData *bistate); + TableInsertState* (*tuple_insert_begin) (Relation rel, CommandId cid, + int options, bool is_multi); + + void (*tuple_insert_v2) (TableInsertState *state, TupleTableSlot *slot); + + void (*multi_insert_v2) (TableInsertState *state, TupleTableSlot *slot); + + void (*multi_insert_flush) (TableInsertState *state); + + void (*tuple_insert_end) (TableInsertState *state); + /* see table_tuple_delete() for reference about parameters */ TM_Result (*tuple_delete) (Relation rel, ItemPointer tid, @@ -851,6 +888,8 @@ typedef struct TableAmRoutine } TableAmRoutine; +typedef struct BulkInsertStateData *BulkInsertState; + /* 
---------------------------------------------------------------------------- * Slot functions. * ---------------------------------------------------------------------------- @@ -869,6 +908,10 @@ extern const TupleTableSlotOps *table_slot_callbacks(Relation rel); */ extern TupleTableSlot *table_slot_create(Relation rel, List **reglist); +/* Bulk insert state functions (AM independent). */ +extern BulkInsertState GetBulkInsertState(void); +extern void FreeBulkInsertState(BulkInsertState); +extern void ReleaseBulkInsertStatePin(BulkInsertState bistate); /* ---------------------------------------------------------------------------- * Table scan functions. @@ -1430,6 +1473,50 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots, cid, options, bistate); } +static inline TableInsertState* +table_insert_begin(Relation rel, CommandId cid, int options, + bool alloc_bistate, bool is_multi) +{ + TableInsertState *state = rel->rd_tableam->tuple_insert_begin(rel, cid, + options, is_multi); + + /* Set up the bulk insert state here, since it's AM independent. */ + if (alloc_bistate) + state->bistate = GetBulkInsertState(); + else + state->bistate = NULL; + + return state; +} + +static inline void +table_tuple_insert_v2(TableInsertState *state, TupleTableSlot *slot) +{ + state->rel->rd_tableam->tuple_insert_v2(state, slot); +} + +static inline void +table_multi_insert_v2(TableInsertState *state, TupleTableSlot *slot) +{ + state->rel->rd_tableam->multi_insert_v2(state, slot); +} + +static inline void +table_multi_insert_flush(TableInsertState *state) +{ + state->rel->rd_tableam->multi_insert_flush(state); +} + +static inline void +table_insert_end(TableInsertState *state) +{ + /* Free the AM independent bulk insert state, then let the AM clean up. */ + if (state->bistate) + FreeBulkInsertState(state->bistate); + + state->rel->rd_tableam->tuple_insert_end(state); +} + /* * Delete a tuple.
* diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h index 679e57fbdd..1f59614183 100644 --- a/src/include/executor/tuptable.h +++ b/src/include/executor/tuptable.h @@ -330,6 +330,7 @@ extern void slot_getmissingattrs(TupleTableSlot *slot, int startAttNum, int lastAttNum); extern void slot_getsomeattrs_int(TupleTableSlot *slot, int attnum); +extern Size GetTupleSize(TupleTableSlot *slot, Size maxsize); #ifndef FRONTEND -- 2.25.1