diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index de5bb9194e..dc7a2b12b5 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -2106,7 +2106,7 @@ heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, * temporary context before calling this, if that's a problem. */ void -heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, +heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, CommandId cid, int options, BulkInsertState bistate) { TransactionId xid = GetCurrentTransactionId(); @@ -2127,11 +2127,18 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, saveFreeSpace = RelationGetTargetPageFreeSpace(relation, HEAP_DEFAULT_FILLFACTOR); - /* Toast and set header data in all the tuples */ + /* Toast and set header data in all the slots */ heaptuples = palloc(ntuples * sizeof(HeapTuple)); for (i = 0; i < ntuples; i++) - heaptuples[i] = heap_prepare_insert(relation, tuples[i], - xid, cid, options); + { + HeapTuple tuple; + + tuple = ExecFetchSlotHeapTuple(slots[i], true, NULL); + slots[i]->tts_tableOid = RelationGetRelid(relation); + tuple->t_tableOid = slots[i]->tts_tableOid; + heaptuples[i] = heap_prepare_insert(relation, tuple, xid, cid, + options); + } /* * We're about to do the actual inserts -- but check for conflict first, @@ -2361,13 +2368,9 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, CacheInvalidateHeapTuple(relation, heaptuples[i], NULL); } - /* - * Copy t_self fields back to the caller's original tuples. This does - * nothing for untoasted tuples (tuples[i] == heaptuples[i)], but it's - * probably faster to always copy than check. - */ + /* copy t_self fields back to the caller's slots */ for (i = 0; i < ntuples; i++) - tuples[i]->t_self = heaptuples[i]->t_self; + slots[i]->tts_tid = heaptuples[i]->t_self; pgstat_count_heap_insert(relation, ntuples); } diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index cf6e35670b..aa3cb7e7b3 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -2010,6 +2010,7 @@ static const TableAmRoutine heapam_methods = { .tuple_complete_speculative = heapam_tuple_complete_speculative, .tuple_delete = heapam_tuple_delete, .tuple_update = heapam_tuple_update, + .multi_insert = heap_multi_insert, .tuple_lock = heapam_tuple_lock, .tuple_fetch_row_version = heapam_fetch_row_version, diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 27d3a012af..6bd6e6d68f 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -316,13 +316,12 @@ static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query, static void EndCopyTo(CopyState cstate); static uint64 DoCopyTo(CopyState cstate); static uint64 CopyTo(CopyState cstate); -static void CopyOneRowTo(CopyState cstate, - Datum *values, bool *nulls); +static void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot); static void CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid, int hi_options, - ResultRelInfo *resultRelInfo, TupleTableSlot *myslot, + ResultRelInfo *resultRelInfo, BulkInsertState bistate, - int nBufferedTuples, HeapTuple *bufferedTuples, + int nBufferedTuples, TupleTableSlot **bufferedSlots, uint64 firstBufferedLineNo); static bool CopyReadLine(CopyState cstate); static bool CopyReadLineText(CopyState cstate); @@ -2073,33 +2072,27 @@ CopyTo(CopyState cstate) if 
(cstate->rel) { - Datum *values; - bool *nulls; + TupleTableSlot *slot; TableScanDesc scandesc; - HeapTuple tuple; - - values = (Datum *) palloc(num_phys_attrs * sizeof(Datum)); - nulls = (bool *) palloc(num_phys_attrs * sizeof(bool)); scandesc = table_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL); + slot = table_slot_create(cstate->rel, NULL); processed = 0; - while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL) + while (table_scan_getnextslot(scandesc, ForwardScanDirection, slot)) { CHECK_FOR_INTERRUPTS(); - /* Deconstruct the tuple ... faster than repeated heap_getattr */ - heap_deform_tuple(tuple, tupDesc, values, nulls); + /* Deconstruct the tuple ... */ + slot_getallattrs(slot); /* Format and send the data */ - CopyOneRowTo(cstate, values, nulls); + CopyOneRowTo(cstate, slot); processed++; } + ExecDropSingleTupleTableSlot(slot); table_endscan(scandesc); - - pfree(values); - pfree(nulls); } else { @@ -2125,7 +2118,7 @@ CopyTo(CopyState cstate) * Emit one row during CopyTo(). */ static void -CopyOneRowTo(CopyState cstate, Datum *values, bool *nulls) +CopyOneRowTo(CopyState cstate, TupleTableSlot *slot) { bool need_delim = false; FmgrInfo *out_functions = cstate->out_functions; @@ -2142,11 +2135,14 @@ CopyOneRowTo(CopyState cstate, Datum *values, bool *nulls) CopySendInt16(cstate, list_length(cstate->attnumlist)); } + /* Make sure the tuple is fully deconstructed */ + slot_getallattrs(slot); + foreach(cur, cstate->attnumlist) { int attnum = lfirst_int(cur); - Datum value = values[attnum - 1]; - bool isnull = nulls[attnum - 1]; + Datum value = slot->tts_values[attnum - 1]; + bool isnull = slot->tts_isnull[attnum - 1]; if (!cstate->binary) { @@ -2305,25 +2301,219 @@ limit_printout_length(const char *str) return res; } +#define MAX_BUFFERED_TUPLES 1000 +#define MAX_BUFFERED_BYTES 65535 +#define MAX_PARTITION_BUFFERS 16 + +typedef struct { + Oid relid; + TupleTableSlot **slots; + ResultRelInfo *resultRelInfo; + int nused; +} CopyMultiInsertBuffer; + +typedef struct { + HTAB *multiInsertBufferTab; + CopyMultiInsertBuffer *buffer; + int bufferedTuples; + int bufferedBytes; + int nbuffers; +} CopyMultiInsertInfo; + + +static void +CopyMultiInsertInfo_Init(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, + bool partitioned) +{ + if (partitioned) + { + HASHCTL ctl; + HTAB *htab; + + memset(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(Oid); + ctl.entrysize = sizeof(CopyMultiInsertBuffer); + ctl.hcxt = CurrentMemoryContext; + + htab = hash_create("Copy multi-insert buffer table", MAX_PARTITION_BUFFERS, + &ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + + miinfo->multiInsertBufferTab = htab; + miinfo->buffer = NULL; + miinfo->nbuffers = 0; + } + else + { + CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer)); + + buffer->relid = RelationGetRelid(rri->ri_RelationDesc); + buffer->slots = palloc0(MAX_BUFFERED_TUPLES * sizeof(TupleTableSlot *)); + buffer->resultRelInfo = rri; + buffer->nused = 0; + miinfo->multiInsertBufferTab = NULL; + miinfo->buffer = buffer; + miinfo->nbuffers = 1; + } + + miinfo->bufferedTuples = 0; + miinfo->bufferedBytes = 0; +} + +static inline void +CopyMultiInsertInfo_SetCurrentBuffer(CopyMultiInsertInfo *miinfo, + ResultRelInfo *rri) +{ + CopyMultiInsertBuffer *buffer; + Oid relid = RelationGetRelid(rri->ri_RelationDesc); + bool found; + + Assert(miinfo->multiInsertBufferTab != NULL); + + buffer = hash_search(miinfo->multiInsertBufferTab, &relid, HASH_ENTER, &found); + + if (!found) + { + buffer->relid = 
relid; + buffer->slots = palloc0(MAX_BUFFERED_TUPLES * sizeof(TupleTableSlot *)); + buffer->resultRelInfo = rri; + buffer->nused = 0; + miinfo->nbuffers++; + } + + miinfo->buffer = buffer; +} + +static inline bool +CopyMultiInsertInfo_IsFull(CopyMultiInsertInfo *miinfo) +{ + if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES || + miinfo->bufferedBytes >= MAX_BUFFERED_BYTES || + miinfo->nbuffers >= MAX_PARTITION_BUFFERS) + return true; + return false; +} + +static inline bool +CopyMultiInsertInfo_IsEmpty(CopyMultiInsertInfo *miinfo) +{ + return miinfo->bufferedTuples == 0; +} + +static inline void +CopyMultiInsertInfo_FlushSingleBuffer(CopyMultiInsertBuffer *buffer, CopyState cstate, EState *estate, + CommandId mycid, int hi_options, + BulkInsertState bistate, uint64 firstBufferedLineNo) +{ + CopyFromInsertBatch(cstate, estate, mycid, hi_options, + buffer->resultRelInfo, bistate, + buffer->nused, + buffer->slots, + firstBufferedLineNo); + buffer->nused = 0; + + /* + * Make sure the bulk insert mechanism gets a new + * buffer when the partition being inserted into changes. + */ + ReleaseBulkInsertStatePin(bistate); +} + +static inline void +CopyMultiInsertBuffer_RemoveBuffer(CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer) +{ + Oid relid = buffer->relid; + int i = 0; + + while (buffer->slots[i] != NULL) + ExecClearTuple(buffer->slots[i++]); + pfree(buffer->slots); + + hash_search(miinfo->multiInsertBufferTab, (void *) &relid, HASH_REMOVE, + NULL); + miinfo->nbuffers--; +} + + +static inline void +CopyMultiInsertInfo_Flush(CopyMultiInsertInfo *miinfo, CopyState cstate, EState *estate, + CommandId mycid, int hi_options, + BulkInsertState bistate, uint64 firstBufferedLineNo) +{ + if (miinfo->multiInsertBufferTab == NULL) + CopyMultiInsertInfo_FlushSingleBuffer(miinfo->buffer, cstate, estate, + mycid, hi_options, bistate, + firstBufferedLineNo); + else + { + HASH_SEQ_STATUS status; + CopyMultiInsertBuffer *buffer; + + hash_seq_init(&status, miinfo->multiInsertBufferTab); + + while ((buffer = (CopyMultiInsertBuffer *) hash_seq_search(&status)) != NULL) + { + if (buffer->nused > 0) + { + /* Flush the buffer if it was used */ + CopyMultiInsertInfo_FlushSingleBuffer(buffer, cstate, estate, + mycid, hi_options, + bistate, + firstBufferedLineNo); + } + else + { + /* + * Otherwise just remove it. If we saw no tuples for it this + * batch, then it's likely best to make way for buffers for + * other partitions. + */ + CopyMultiInsertBuffer_RemoveBuffer(miinfo, buffer); + } + } + } + + miinfo->bufferedTuples = 0; + miinfo->bufferedBytes = 0; +} + +static inline TupleTableSlot * +CopyMultiInsertInfo_NextFreeSlot(CopyMultiInsertInfo *miinfo, + ResultRelInfo *rri, List **tupleTable) +{ + CopyMultiInsertBuffer *buffer = miinfo->buffer; + + if (buffer->slots[buffer->nused] == NULL) + buffer->slots[buffer->nused] = table_slot_create(rri->ri_RelationDesc, tupleTable); + return buffer->slots[buffer->nused]; +} + +static inline void +CopyMultiInsertInfo_Store(CopyMultiInsertInfo *miinfo, TupleTableSlot *slot, + int tuplen) +{ + CopyMultiInsertBuffer *buffer = miinfo->buffer; + + Assert(slot == buffer->slots[buffer->nused]); + + buffer->nused++; + miinfo->bufferedTuples++; + miinfo->bufferedBytes += tuplen; +} + /* * Copy FROM file to relation. 
*/ uint64 CopyFrom(CopyState cstate) { - HeapTuple tuple; - TupleDesc tupDesc; - Datum *values; - bool *nulls; ResultRelInfo *resultRelInfo; ResultRelInfo *target_resultRelInfo; ResultRelInfo *prevResultRelInfo = NULL; EState *estate = CreateExecutorState(); /* for ExecConstraints() */ ModifyTableState *mtstate; ExprContext *econtext; - TupleTableSlot *myslot; + TupleTableSlot *singleslot = NULL; MemoryContext oldcontext = CurrentMemoryContext; - MemoryContext batchcontext; PartitionTupleRouting *proute = NULL; ErrorContextCallback errcallback; @@ -2331,23 +2521,16 @@ CopyFrom(CopyState cstate) int hi_options = 0; /* start with default heap_insert options */ BulkInsertState bistate; CopyInsertMethod insertMethod; + CopyMultiInsertInfo multiInsertInfo; uint64 processed = 0; - int nBufferedTuples = 0; bool has_before_insert_row_trig; bool has_instead_insert_row_trig; bool leafpart_use_multi_insert = false; - -#define MAX_BUFFERED_TUPLES 1000 -#define RECHECK_MULTI_INSERT_THRESHOLD 1000 - HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */ - Size bufferedTuplesSize = 0; uint64 firstBufferedLineNo = 0; - uint64 lastPartitionSampleLineNo = 0; - uint64 nPartitionChanges = 0; - double avgTuplesPerPartChange = 0; - Assert(cstate->rel); + memset(&multiInsertInfo, 0, sizeof(CopyMultiInsertInfo)); + /* * The target must be a plain, foreign, or partitioned relation, or have * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only @@ -2382,8 +2565,6 @@ CopyFrom(CopyState cstate) RelationGetRelationName(cstate->rel)))); } - tupDesc = RelationGetDescr(cstate->rel); - /*---------- * Check to see if we can avoid writing WAL * @@ -2518,10 +2699,6 @@ CopyFrom(CopyState cstate) ExecInitRangeTable(estate, cstate->range_table); - /* Set up a tuple slot too */ - myslot = ExecInitExtraTupleSlot(estate, tupDesc, - &TTSOpsHeapTuple); - /* * Set up a ModifyTableState so we can let FDW(s) init themselves for * foreign-table result relation(s). @@ -2565,10 +2742,11 @@ CopyFrom(CopyState cstate) &mtstate->ps); /* - * It's more efficient to prepare a bunch of tuples for insertion, and - * insert them in one heap_multi_insert() call, than call heap_insert() - * separately for every tuple. However, there are a number of reasons why - * we might not be able to do this. These are explained below. + * It's generally more efficient to prepare a bunch of tuples for + * insertion, and insert them in one table_multi_insert() call, than call + * table_insert() separately for every tuple. However, there are a number + * of reasons why we might not be able to do this. These are explained + * below. */ if (resultRelInfo->ri_TrigDesc != NULL && (resultRelInfo->ri_TrigDesc->trig_insert_before_row || @@ -2622,8 +2800,7 @@ CopyFrom(CopyState cstate) { /* * For partitioned tables, we may still be able to perform bulk - * inserts for sets of consecutive tuples which belong to the same - * partition. However, the possibility of this depends on which types + * inserts. However, the possibility of this depends on which types * of triggers exist on the partition. We must disable bulk inserts * if the partition is a foreign table or it has any before row insert * or insert instead triggers (same as we checked above for the parent @@ -2632,18 +2809,26 @@ CopyFrom(CopyState cstate) * have the intermediate insert method of CIM_MULTI_CONDITIONAL to * flag that we must later determine if we can use bulk-inserts for * the partition being inserted into. 
- * - * Normally, when performing bulk inserts we just flush the insert - * buffer whenever it becomes full, but for the partitioned table - * case, we flush it whenever the current tuple does not belong to the - * same partition as the previous tuple. */ if (proute) insertMethod = CIM_MULTI_CONDITIONAL; else insertMethod = CIM_MULTI; - bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple)); + CopyMultiInsertInfo_Init(&multiInsertInfo, resultRelInfo, + proute != NULL); + } + + /* + * If not using batch mode (which allocates slots as needed) set up a + * tuple slot too. When inserting into a partitioned table, we also need + * one, even if we might batch insert, to read the tuple in the root + * partition's form. + */ + if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL) + { + singleslot = table_slot_create(resultRelInfo->ri_RelationDesc, + &estate->es_tupleTable); } has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc && @@ -2660,9 +2845,6 @@ CopyFrom(CopyState cstate) */ ExecBSInsertTriggers(estate, resultRelInfo); - values = (Datum *) palloc(tupDesc->natts * sizeof(Datum)); - nulls = (bool *) palloc(tupDesc->natts * sizeof(bool)); - bistate = GetBulkInsertState(); econtext = GetPerTupleExprContext(estate); @@ -2672,17 +2854,9 @@ CopyFrom(CopyState cstate) errcallback.previous = error_context_stack; error_context_stack = &errcallback; - /* - * Set up memory context for batches. For cases without batching we could - * use the per-tuple context, but it's simpler to just use it every time. - */ - batchcontext = AllocSetContextCreate(CurrentMemoryContext, - "batch context", - ALLOCSET_DEFAULT_SIZES); - for (;;) { - TupleTableSlot *slot; + TupleTableSlot *myslot; bool skip_tuple; CHECK_FOR_INTERRUPTS(); @@ -2693,20 +2867,34 @@ CopyFrom(CopyState cstate) */ ResetPerTupleExprContext(estate); + if (insertMethod == CIM_SINGLE || proute) + { + myslot = singleslot; + Assert(myslot != NULL); + } + else + { + Assert(resultRelInfo == target_resultRelInfo); + Assert(insertMethod == CIM_MULTI); + + myslot = CopyMultiInsertInfo_NextFreeSlot(&multiInsertInfo, + resultRelInfo, + &estate->es_tupleTable); + } + /* * Switch to per-tuple context before calling NextCopyFrom, which does * evaluate default expressions etc. and requires per-tuple context. */ MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - if (!NextCopyFrom(cstate, econtext, values, nulls)) - break; + ExecClearTuple(myslot); - /* Switch into per-batch memory context before forming the tuple. */ - MemoryContextSwitchTo(batchcontext); + /* Directly store the values/nulls array in the slot */ + if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull)) + break; - /* And now we can form the input tuple. */ - tuple = heap_form_tuple(tupDesc, values, nulls); + ExecStoreVirtualTuple(myslot); /* * Constraints might reference the tableoid column, so (re-)initialize @@ -2717,18 +2905,15 @@ CopyFrom(CopyState cstate) /* Triggers and stuff need to be invoked in query context. 
*/ MemoryContextSwitchTo(oldcontext); - /* Place tuple in tuple slot --- but slot shouldn't free it */ - slot = myslot; - ExecStoreHeapTuple(tuple, slot, false); - if (cstate->whereClause) { econtext->ecxt_scantuple = myslot; + /* Skip items that don't match the COPY's WHERE clause */ if (!ExecQual(cstate->qualexpr, econtext)) continue; } - /* Determine the partition to heap_insert the tuple into */ + /* Determine the partition to table_insert the tuple into */ if (proute) { TupleConversionMap *map; @@ -2739,80 +2924,10 @@ CopyFrom(CopyState cstate) * if the found partition is not suitable for INSERTs. */ resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo, - proute, slot, estate); + proute, myslot, estate); if (prevResultRelInfo != resultRelInfo) { - /* Check if we can multi-insert into this partition */ - if (insertMethod == CIM_MULTI_CONDITIONAL) - { - /* - * When performing bulk-inserts into partitioned tables we - * must insert the tuples seen so far to the heap whenever - * the partition changes. - */ - if (nBufferedTuples > 0) - { - MemoryContext oldcontext; - - CopyFromInsertBatch(cstate, estate, mycid, hi_options, - prevResultRelInfo, myslot, bistate, - nBufferedTuples, bufferedTuples, - firstBufferedLineNo); - nBufferedTuples = 0; - bufferedTuplesSize = 0; - - /* - * The tuple is already allocated in the batch context, which - * we want to reset. So to keep the tuple we copy it into the - * short-lived (per-tuple) context, reset the batch context - * and then copy it back into the per-batch one. - */ - oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - tuple = heap_copytuple(tuple); - MemoryContextSwitchTo(oldcontext); - - /* cleanup the old batch */ - MemoryContextReset(batchcontext); - - /* copy the tuple back to the per-batch context */ - oldcontext = MemoryContextSwitchTo(batchcontext); - tuple = heap_copytuple(tuple); - MemoryContextSwitchTo(oldcontext); - - /* - * Also push the tuple copy to the slot (resetting the context - * invalidated the slot contents). - */ - ExecStoreHeapTuple(tuple, slot, false); - } - - nPartitionChanges++; - - /* - * Here we adaptively enable multi-inserts based on the - * average number of tuples from recent multi-insert - * batches. We recalculate the average every - * RECHECK_MULTI_INSERT_THRESHOLD tuples instead of taking - * the average over the whole copy. This allows us to - * enable multi-inserts when we get periods in the copy - * stream that have tuples commonly belonging to the same - * partition, and disable when the partition is changing - * too often. - */ - if (unlikely(lastPartitionSampleLineNo <= (cstate->cur_lineno - - RECHECK_MULTI_INSERT_THRESHOLD) - && cstate->cur_lineno >= RECHECK_MULTI_INSERT_THRESHOLD)) - { - avgTuplesPerPartChange = - (cstate->cur_lineno - lastPartitionSampleLineNo) / - (double) nPartitionChanges; - - lastPartitionSampleLineNo = cstate->cur_lineno; - nPartitionChanges = 0; - } - } - /* Determine which triggers exist on this partition */ has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_insert_before_row); @@ -2821,23 +2936,19 @@ CopyFrom(CopyState cstate) resultRelInfo->ri_TrigDesc->trig_insert_instead_row); /* - * Tests have shown that using multi-inserts when the - * partition changes on every tuple slightly decreases the - * performance, however, there are benefits even when only - * some batches have just 2 tuples, so let's enable - * multi-inserts even when the average is quite low. 
+ * Disable multi-inserts when the partition has BEFORE/INSTEAD + * OF triggers, or if the partition is a foreign partition. */ leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL && - avgTuplesPerPartChange >= 1.3 && - !has_before_insert_row_trig && - !has_instead_insert_row_trig && - resultRelInfo->ri_FdwRoutine == NULL; + !has_before_insert_row_trig && + !has_instead_insert_row_trig && + resultRelInfo->ri_FdwRoutine == NULL; + + /* Set the multi-insert buffer to use for this partition. */ + if (leafpart_use_multi_insert) + CopyMultiInsertInfo_SetCurrentBuffer(&multiInsertInfo, + resultRelInfo); - /* - * We'd better make the bulk insert mechanism gets a new - * buffer when the partition being inserted into changes. - */ - ReleaseBulkInsertStatePin(bistate); prevResultRelInfo = resultRelInfo; } @@ -2879,26 +2990,49 @@ * rowtype. */ map = resultRelInfo->ri_PartitionInfo->pi_RootToPartitionMap; - if (map != NULL) + if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert) { - TupleTableSlot *new_slot; - MemoryContext oldcontext; - - new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot; - Assert(new_slot != NULL); - - slot = execute_attr_map_slot(map->attrMap, slot, new_slot); + /* non-batch insert */ + if (map != NULL) + { + TupleTableSlot *new_slot; + new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot; + myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot); + } + } + else + { /* - * Get the tuple in the per-batch context, so that it will be - * freed after each batch insert. + * Batch insert into partitioned table. */ - oldcontext = MemoryContextSwitchTo(batchcontext); - tuple = ExecCopySlotHeapTuple(slot); - MemoryContextSwitchTo(oldcontext); + TupleTableSlot *nextslot; + + /* no other path available for partitioned table */ + Assert(insertMethod == CIM_MULTI_CONDITIONAL); + + nextslot = CopyMultiInsertInfo_NextFreeSlot(&multiInsertInfo, + resultRelInfo, + &estate->es_tupleTable); + + if (map != NULL) + myslot = execute_attr_map_slot(map->attrMap, myslot, nextslot); + else + { + /* + * This looks more expensive than it is (Believe me, I + * optimized it away. Twice). The input is in virtual + * form, and we'll materialize the slot below - for most + * slot types the copy performs the work materialization + * would later require anyway. 
+ */ + ExecCopySlot(nextslot, myslot); + myslot = nextslot; + } } - slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); + /* ensure that triggers etc see the right relation */ + myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); } skip_tuple = false; @@ -2906,7 +3040,7 @@ CopyFrom(CopyState cstate) /* BEFORE ROW INSERT Triggers */ if (has_before_insert_row_trig) { - if (!ExecBRInsertTriggers(estate, resultRelInfo, slot)) + if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot)) skip_tuple = true; /* "do nothing" */ } @@ -2919,7 +3053,7 @@ CopyFrom(CopyState cstate) */ if (has_instead_insert_row_trig) { - ExecIRInsertTriggers(estate, resultRelInfo, slot); + ExecIRInsertTriggers(estate, resultRelInfo, myslot); } else { @@ -2931,12 +3065,7 @@ CopyFrom(CopyState cstate) */ if (resultRelInfo->ri_RelationDesc->rd_att->constr && resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored) - { - ExecComputeStoredGenerated(estate, slot); - MemoryContextSwitchTo(batchcontext); - tuple = ExecCopySlotHeapTuple(slot); - MemoryContextSwitchTo(oldcontext); - } + ExecComputeStoredGenerated(estate, myslot); /* * If the target is a plain table, check the constraints of @@ -2944,7 +3073,7 @@ CopyFrom(CopyState cstate) */ if (resultRelInfo->ri_FdwRoutine == NULL && resultRelInfo->ri_RelationDesc->rd_att->constr) - ExecConstraints(resultRelInfo, slot, estate); + ExecConstraints(resultRelInfo, myslot, estate); /* * Also check the tuple against the partition constraint, if @@ -2954,7 +3083,7 @@ CopyFrom(CopyState cstate) */ if (resultRelInfo->ri_PartitionCheck && (proute == NULL || has_before_insert_row_trig)) - ExecPartitionCheck(resultRelInfo, slot, estate, true); + ExecPartitionCheck(resultRelInfo, myslot, estate, true); /* * Perform multi-inserts when enabled, or when loading a @@ -2964,30 +3093,21 @@ CopyFrom(CopyState cstate) if (insertMethod == CIM_MULTI || leafpart_use_multi_insert) { /* Add this tuple to the tuple buffer */ - if (nBufferedTuples == 0) + if (CopyMultiInsertInfo_IsEmpty(&multiInsertInfo)) firstBufferedLineNo = cstate->cur_lineno; - bufferedTuples[nBufferedTuples++] = tuple; - bufferedTuplesSize += tuple->t_len; /* - * If the buffer filled up, flush it. Also flush if the - * total size of all the tuples in the buffer becomes - * large, to avoid using large amounts of memory for the - * buffer when the tuples are exceptionally wide. + * The slot previously might point into the per-tuple + * context. For batching it needs to be longer lived. */ - if (nBufferedTuples == MAX_BUFFERED_TUPLES || - bufferedTuplesSize > 65535) - { - CopyFromInsertBatch(cstate, estate, mycid, hi_options, - resultRelInfo, myslot, bistate, - nBufferedTuples, bufferedTuples, - firstBufferedLineNo); - nBufferedTuples = 0; - bufferedTuplesSize = 0; - - /* free memory occupied by tuples from the batch */ - MemoryContextReset(batchcontext); - } + ExecMaterializeSlot(myslot); + + CopyMultiInsertInfo_Store(&multiInsertInfo, myslot, cstate->line_buf.len); + + /* If the buffer filled up, flush it. 
*/ + if (CopyMultiInsertInfo_IsFull(&multiInsertInfo)) + CopyMultiInsertInfo_Flush(&multiInsertInfo, cstate, estate, mycid, hi_options, + bistate, firstBufferedLineNo); } else { @@ -2996,12 +3116,12 @@ CopyFrom(CopyState cstate) /* OK, store the tuple */ if (resultRelInfo->ri_FdwRoutine != NULL) { - slot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate, - resultRelInfo, - slot, - NULL); + myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate, - resultRelInfo, + resultRelInfo, + myslot, + NULL); - if (slot == NULL) /* "do nothing" */ + if (myslot == NULL) /* "do nothing" */ continue; /* next tuple please */ /* @@ -3009,27 +3129,26 @@ * column, so (re-)initialize tts_tableOid before * evaluating them. */ - slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); + myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); } else { - tuple = ExecFetchSlotHeapTuple(slot, true, NULL); - heap_insert(resultRelInfo->ri_RelationDesc, tuple, - mycid, hi_options, bistate); - ItemPointerCopy(&tuple->t_self, &slot->tts_tid); - slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); + /* Insert the tuple itself; index entries are created below */ + table_insert(resultRelInfo->ri_RelationDesc, myslot, + mycid, hi_options, bistate); } + /* And create index entries for it */ if (resultRelInfo->ri_NumIndices > 0) - recheckIndexes = ExecInsertIndexTuples(slot, + recheckIndexes = ExecInsertIndexTuples(myslot, estate, false, NULL, NIL); /* AFTER ROW INSERT Triggers */ - ExecARInsertTriggers(estate, resultRelInfo, slot, + ExecARInsertTriggers(estate, resultRelInfo, myslot, recheckIndexes, cstate->transition_capture); list_free(recheckIndexes); @@ -3046,31 +3165,15 @@ } /* Flush any remaining buffered tuples */ - if (nBufferedTuples > 0) - { - if (insertMethod == CIM_MULTI_CONDITIONAL) - { - CopyFromInsertBatch(cstate, estate, mycid, hi_options, - prevResultRelInfo, myslot, bistate, - nBufferedTuples, bufferedTuples, - firstBufferedLineNo); - } - else - CopyFromInsertBatch(cstate, estate, mycid, hi_options, - resultRelInfo, myslot, bistate, - nBufferedTuples, bufferedTuples, - firstBufferedLineNo); - } + if (insertMethod != CIM_SINGLE && !CopyMultiInsertInfo_IsEmpty(&multiInsertInfo)) + CopyMultiInsertInfo_Flush(&multiInsertInfo, cstate, estate, mycid, hi_options, + bistate, firstBufferedLineNo); /* Done, clean up */ error_context_stack = errcallback.previous; - FreeBulkInsertState(bistate); - MemoryContextSwitchTo(oldcontext); - MemoryContextDelete(batchcontext); - /* * In the old protocol, tell pqcomm that we can process normal protocol * messages again. @@ -3084,9 +3187,6 @@ /* Handle queued AFTER triggers */ AfterTriggerEndQuery(estate); - pfree(values); - pfree(nulls); - ExecResetTupleTable(estate->es_tupleTable, false); /* Allow the FDW to shut down */ @@ -3124,8 +3224,7 @@ static void CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid, int hi_options, ResultRelInfo *resultRelInfo, - TupleTableSlot *myslot, BulkInsertState bistate, - int nBufferedTuples, HeapTuple *bufferedTuples, + BulkInsertState bistate, int nBufferedTuples, TupleTableSlot **bufferedSlots, uint64 firstBufferedLineNo) { MemoryContext oldcontext; @@ -3145,12 +3244,12 @@ * before calling it. 
*/ oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - heap_multi_insert(resultRelInfo->ri_RelationDesc, - bufferedTuples, - nBufferedTuples, - mycid, - hi_options, - bistate); + table_multi_insert(resultRelInfo->ri_RelationDesc, + bufferedSlots, + nBufferedTuples, + mycid, + hi_options, + bistate); MemoryContextSwitchTo(oldcontext); /* @@ -3164,12 +3263,11 @@ List *recheckIndexes; cstate->cur_lineno = firstBufferedLineNo + i; - ExecStoreHeapTuple(bufferedTuples[i], myslot, false); recheckIndexes = - ExecInsertIndexTuples(myslot, - estate, false, NULL, NIL); + ExecInsertIndexTuples(bufferedSlots[i], estate, false, NULL, + NIL); ExecARInsertTriggers(estate, resultRelInfo, - myslot, + bufferedSlots[i], recheckIndexes, cstate->transition_capture); list_free(recheckIndexes); } @@ -3186,13 +3284,15 @@ for (i = 0; i < nBufferedTuples; i++) { cstate->cur_lineno = firstBufferedLineNo + i; - ExecStoreHeapTuple(bufferedTuples[i], myslot, false); ExecARInsertTriggers(estate, resultRelInfo, - myslot, + bufferedSlots[i], NIL, cstate->transition_capture); } } + for (i = 0; i < nBufferedTuples; i++) + ExecClearTuple(bufferedSlots[i]); + /* reset cur_lineno and line_buf_valid to what they were */ cstate->line_buf_valid = line_buf_valid; cstate->cur_lineno = save_cur_lineno; @@ -4995,11 +5095,8 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self) DR_copy *myState = (DR_copy *) self; CopyState cstate = myState->cstate; - /* Make sure the tuple is fully deconstructed */ - slot_getallattrs(slot); - - /* And send the data */ - CopyOneRowTo(cstate, slot->tts_values, slot->tts_isnull); + /* Send the data */ + CopyOneRowTo(cstate, slot); myState->processed++; return true; diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 4c077755d5..ed0e2de144 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -36,6 +36,7 @@ #define HEAP_INSERT_SPECULATIVE 0x0010 typedef struct BulkInsertStateData *BulkInsertState; +struct TupleTableSlot; #define MaxLockTupleMode LockTupleExclusive @@ -143,7 +144,7 @@ extern void ReleaseBulkInsertStatePin(BulkInsertState bistate); extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid, int options, BulkInsertState bistate); -extern void heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, +extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, int ntuples, CommandId cid, int options, BulkInsertState bistate); extern TM_Result heap_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot crosscheck, bool wait, diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 873ad15313..3e06d78eea 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -322,6 +322,10 @@ typedef struct TableAmRoutine * ------------------------------------------------------------------------ */ + /* see table_multi_insert() for reference about parameters */ + void (*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots, + CommandId cid, int options, struct BulkInsertStateData *bistate); + /* see table_insert() for reference about parameters */ void (*tuple_insert) (Relation rel, TupleTableSlot *slot, CommandId cid, int options, struct BulkInsertStateData *bistate); @@ -1021,6 +1024,17 @@ table_update(Relation rel, ItemPointer otid, TupleTableSlot *slot, lockmode, update_indexes); } +/* + * table_multi_insert - insert multiple tuples into a table 
+ */ +static inline void +table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots, + CommandId cid, int options, struct BulkInsertStateData *bistate) +{ + rel->rd_tableam->multi_insert(rel, slots, nslots, + cid, options, bistate); +} + /* * Lock a tuple in the specified mode. * diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index dbd7ed0363..6f901dc8a9 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -479,6 +479,12 @@ typedef struct ResultRelInfo /* relation descriptor for root partitioned table */ Relation ri_PartitionRoot; + /* + * When batch inserting into a table, it might be necessary to have one + * slot per input row, up to the largest possible batch size. + */ + TupleTableSlot **ri_batchInsertSlots; + /* Additional information specific to partition tuple routing */ struct PartitionRoutingInfo *ri_PartitionInfo; } ResultRelInfo;
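Note for reviewers (not part of the patch): below is a minimal sketch of how a caller outside COPY might drive the new slot-based batch API. The function bulk_load_example and its parameter list are hypothetical; the sketch assumes server context, a valid Relation and EState, and per-row values/nulls arrays that match the relation's tuple descriptor.

/* Hypothetical illustration only; not part of the patch. */
#include "postgres.h"
#include "access/heapam.h"		/* GetBulkInsertState() */
#include "access/tableam.h"
#include "access/xact.h"
#include "executor/tuptable.h"
#include "nodes/execnodes.h"

static void
bulk_load_example(Relation rel, EState *estate,
				  Datum **values, bool **nulls, int nrows)
{
	BulkInsertState bistate = GetBulkInsertState();
	TupleTableSlot **slots = palloc(nrows * sizeof(TupleTableSlot *));
	CommandId	cid = GetCurrentCommandId(true);
	int			i;

	for (i = 0; i < nrows; i++)
	{
		/* One slot per buffered row, in the table AM's preferred slot type */
		slots[i] = table_slot_create(rel, &estate->es_tupleTable);
		int			natts = slots[i]->tts_tupleDescriptor->natts;

		/* Fill the slot's virtual representation directly */
		ExecClearTuple(slots[i]);
		memcpy(slots[i]->tts_values, values[i], natts * sizeof(Datum));
		memcpy(slots[i]->tts_isnull, nulls[i], natts * sizeof(bool));
		ExecStoreVirtualTuple(slots[i]);

		/*
		 * The slot contents must not point into short-lived memory by the
		 * time the batch is flushed, hence the materialization - the same
		 * step CopyFrom() performs before CopyMultiInsertInfo_Store().
		 */
		ExecMaterializeSlot(slots[i]);
	}

	/*
	 * One AM call for the whole batch; on return each slot's tts_tid
	 * holds the TID its row was inserted at (see heap_multi_insert).
	 */
	table_multi_insert(rel, slots, nrows, cid, 0 /* options */ , bistate);

	FreeBulkInsertState(bistate);
	pfree(slots);
}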