Re: COPY FROM WHEN condition - Mailing list pgsql-hackers

From:           Andres Freund
Subject:        Re: COPY FROM WHEN condition
Msg-id:         87ef6kl2gf.fsf@alap4.lan
In response to: Re: COPY FROM WHEN condition (David Rowley <david.rowley@2ndquadrant.com>)
Responses:      Re: COPY FROM WHEN condition
List:           pgsql-hackers
On 2019-04-03 06:41:49 +1300, David Rowley wrote:
> However, I've ended up not doing it that way as the patch requires
> more than just an array of TupleTableSlots to be stored in the
> ResultRelInfo, it'll pretty much need all of what I have in
> CopyMultiInsertBuffer, which includes line numbers for error
> reporting, a BulkInsertState and a counter to track how many of the
> slots are used. I had thoughts that we could just tag the whole
> CopyMultiInsertBuffer in ResultRelInfo, but that requires moving it
> somewhere a bit more generic. Another thought would be that we have
> something like "void *extra;" in ResultRelInfo that we can just borrow
> for copy.c. It may be interesting to try this out to see if it saves
> much in the way of performance.

Hm, we could just forward declare struct CopyMultiInsertBuffer in a
header, and only define it in copy.c. That doesn't sound insane to me.
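For illustration, the opaque-struct arrangement could look roughly like
this (just a sketch, not the patch itself; the header location and the
flush function are placeholders):

	/* In a header, say commands/copy.h: declare, but don't define. */
	struct CopyMultiInsertBuffer;

	/* Other code can pass pointers around without seeing the definition. */
	extern void CopyMultiInsertBufferFlush(struct CopyMultiInsertBuffer *buffer);

	/* In copy.c only: the full definition. */
	struct CopyMultiInsertBuffer
	{
		TupleTableSlot **slots;		/* buffered tuples */
		uint64	   *linenos;		/* line numbers, for error reporting */
		BulkInsertState bistate;	/* bulk-insert state for this relation */
		int			nused;			/* number of slots currently in use */
	};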
> I've got the patch into a working state now and wrote a bunch of
> comments. I've not really done a thorough read back of the code again.
> I normally wait until the light is coming from a different angle
> before doing that, but there does not seem to be much time to wait for
> that in this case, so here's v2. Performance seems to be about the
> same as what I reported yesterday.

Cool.

> +/*
> + * Multi-Insert handling code for COPY FROM. Inserts are performed in
> + * batches of up to MAX_BUFFERED_TUPLES.

This should probably be moved to the top of the file.

> + * When COPY FROM is used on a partitioned table, we attempt to maintain
> + * only MAX_PARTITION_BUFFERS buffers at once. Buffers that are completely
> + * unused in each batch are removed so that we end up not keeping buffers for
> + * partitions that we might not insert into again.
> + */
> +#define MAX_BUFFERED_TUPLES		1000
> +#define MAX_BUFFERED_BYTES		65535
> +#define MAX_PARTITION_BUFFERS	15
> +
> +/*
> + * CopyMultiInsertBuffer
> + *		Stores multi-insert data related to a single relation in CopyFrom.
> + */
> +typedef struct

Please don't create anonymous structs - they can't be forward declared,
and some debugging tools handle them worse than named structs. And it
makes naming CopyMultiInsertBuffer in the comment less necessary.

> +{
> +	Oid			relid;			/* Relation ID this insert data is for */
> +	TupleTableSlot **slots;		/* Array of MAX_BUFFERED_TUPLES to store
> +								 * tuples */
> +	uint64	   *linenos;		/* Line # of tuple in copy stream */

It could make sense to allocate those two together, or even as part of
the CopyMultiInsertBuffer itself, to reduce allocator overhead.
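For instance, since both arrays have the fixed MAX_BUFFERED_TUPLES
bound, they could live inside the struct itself, so one palloc covers
everything (sketch only; the field order here is made up):

	typedef struct CopyMultiInsertBuffer
	{
		TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* buffered tuples */
		uint64		linenos[MAX_BUFFERED_TUPLES];	/* line # of tuple in copy
													 * stream */
		Oid			relid;			/* Relation ID this insert data is for */
		BulkInsertState bistate;	/* BulkInsertState for this relation */
		int			nused;			/* number of 'slots' containing tuples */
	} CopyMultiInsertBuffer;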
> +	else
> +	{
> +		CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
> +
> +		/* Non-partitioned table. Just setup the buffer for the table. */
> +		buffer->relid = RelationGetRelid(rri->ri_RelationDesc);
> +		buffer->slots = palloc0(MAX_BUFFERED_TUPLES * sizeof(TupleTableSlot *));
> +		buffer->linenos = palloc(MAX_BUFFERED_TUPLES * sizeof(uint64));
> +		buffer->resultRelInfo = rri;
> +		buffer->bistate = GetBulkInsertState();
> +		buffer->nused = 0;
> +		miinfo->multiInsertBufferTab = NULL;
> +		miinfo->buffer = buffer;
> +		miinfo->nbuffers = 1;
> +	}

Can this be moved into a helper function?
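Something like the following, say (the helper name is invented here),
which the partitioned and non-partitioned paths could then share:

	static CopyMultiInsertBuffer *
	CopyMultiInsertBuffer_Init(ResultRelInfo *rri)
	{
		CopyMultiInsertBuffer *buffer;

		buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
		buffer->relid = RelationGetRelid(rri->ri_RelationDesc);
		buffer->slots = palloc0(MAX_BUFFERED_TUPLES * sizeof(TupleTableSlot *));
		buffer->linenos = palloc(MAX_BUFFERED_TUPLES * sizeof(uint64));
		buffer->resultRelInfo = rri;
		buffer->bistate = GetBulkInsertState();
		buffer->nused = 0;

		return buffer;
	}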
> +	/*
> +	 * heap_multi_insert leaks memory, so switch to short-lived memory context
> +	 * before calling it.
> +	 */

s/heap_multi_insert/table_multi_insert/

> +	/*
> +	 * If there are any indexes, update them for all the inserted tuples, and
> +	 * run AFTER ROW INSERT triggers.
> +	 */
> +	if (resultRelInfo->ri_NumIndices > 0)
> +	{
> +		for (i = 0; i < nBufferedTuples; i++)
> +		{
> +			List	   *recheckIndexes;
> +
> +			cstate->cur_lineno = buffer->linenos[i];
> +			recheckIndexes =
> +				ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL,
> +									  NIL);
> +			ExecARInsertTriggers(estate, resultRelInfo,
> +								 buffer->slots[i],
> +								 recheckIndexes, cstate->transition_capture);
> +			list_free(recheckIndexes);
> +		}
> +	}
> +
> +	/*
> +	 * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
> +	 * anyway.
> +	 */
> +	else if (resultRelInfo->ri_TrigDesc != NULL &&
> +			 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
> +			  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
> +	{
> +		for (i = 0; i < nBufferedTuples; i++)
> +		{
> +			cstate->cur_lineno = buffer->linenos[i];
> +			ExecARInsertTriggers(estate, resultRelInfo,
> +								 bufferedSlots[i],
> +								 NIL, cstate->transition_capture);
> +		}
> +	}
> +
> +	for (i = 0; i < nBufferedTuples; i++)
> +		ExecClearTuple(bufferedSlots[i]);

I wonder about combining these loops somehow. But it's probably ok.
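E.g. roughly like this (an untested sketch; whether the extra per-tuple
branches cost anything in the common case would need measuring):

	for (i = 0; i < nBufferedTuples; i++)
	{
		TupleTableSlot *slot = buffer->slots[i];
		List	   *recheckIndexes = NIL;

		cstate->cur_lineno = buffer->linenos[i];

		if (resultRelInfo->ri_NumIndices > 0)
			recheckIndexes = ExecInsertIndexTuples(slot, estate, false,
												   NULL, NIL);

		/* Fire AFTER ROW INSERT triggers where applicable. */
		if (resultRelInfo->ri_NumIndices > 0 ||
			(resultRelInfo->ri_TrigDesc != NULL &&
			 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
			  resultRelInfo->ri_TrigDesc->trig_insert_new_table)))
			ExecARInsertTriggers(estate, resultRelInfo, slot,
								 recheckIndexes, cstate->transition_capture);

		list_free(recheckIndexes);
		ExecClearTuple(slot);
	}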
> +/*
> + * CopyMultiInsertBuffer_Cleanup
> + *		Drop used slots and free member for this buffer. The buffer
> + *		must be flushed before cleanup.
> + */
> +static inline void
> +CopyMultiInsertBuffer_Cleanup(CopyMultiInsertBuffer *buffer)
> +{
> +	int			i;
> +
> +	ReleaseBulkInsertStatePin(buffer->bistate);

Shouldn't this FreeBulkInsertState() rather than ReleaseBulkInsertStatePin()?
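(For reference: FreeBulkInsertState() releases the pinned buffer, frees
the access strategy, and pfrees the state itself, so in a teardown path
it subsumes the pin release. The presumed fix would just be:

	FreeBulkInsertState(buffer->bistate);	/* releases pin, frees state */

)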
> +
> +/*
> + * CopyMultiInsertBuffer_RemoveBuffer
> + *		Remove a buffer from being tracked by miinfo
> + */
> +static inline void
> +CopyMultiInsertBuffer_RemoveBuffer(CopyMultiInsertInfo *miinfo,
> +								   CopyMultiInsertBuffer *buffer)
> +{
> +	Oid			relid = buffer->relid;
> +
> +	CopyMultiInsertBuffer_Cleanup(buffer);
> +
> +	hash_search(miinfo->multiInsertBufferTab, (void *) &relid, HASH_REMOVE,
> +				NULL);
> +	miinfo->nbuffers--;

Aren't we leaking the CopyMultiInsertBuffer itself here?
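Assuming the buffer is a separately palloc'd chunk rather than the hash
entry itself (as in the non-partitioned path quoted earlier), removing
the hash entry wouldn't free it; presumably something like this is
needed (sketch):

	CopyMultiInsertBuffer_Cleanup(buffer);

	hash_search(miinfo->multiInsertBufferTab, (void *) &relid, HASH_REMOVE,
				NULL);
	pfree(buffer);				/* free the buffer itself, not just its members */
	miinfo->nbuffers--;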
> +}
> +
> +/*
> + * CopyMultiInsertInfo_Flush
> + *		Write out all stored tuples in all buffers out to the tables.
> + *
> + * To save us from ending up with buffers for 1000s of partitions we remove
> + * buffers belonging to partitions that we've seen no tuples for in this batch
> + */
> +static inline void
> +CopyMultiInsertInfo_Flush(CopyMultiInsertInfo *miinfo, CopyState cstate,
> +						  EState *estate, CommandId mycid, int ti_options)
> +{
> +	CopyMultiInsertBuffer *buffer;
> +
> +	/*
> +	 * If just storing buffers for a non-partitioned table, then just flush
> +	 * that buffer.
> +	 */
> +	if (miinfo->multiInsertBufferTab == NULL)
> +	{
> +		buffer = miinfo->buffer;
> +
> +		CopyMultiInsertInfo_FlushSingleBuffer(buffer, cstate, estate, mycid,
> +											  ti_options);
> +	}
> +	else
> +	{
> +		HASH_SEQ_STATUS status;
> +
> +		/*
> +		 * Otherwise make a pass over the hash table and flush all buffers
> +		 * that have any tuples stored in them.
> +		 */
> +		hash_seq_init(&status, miinfo->multiInsertBufferTab);
> +
> +		while ((buffer = (CopyMultiInsertBuffer *) hash_seq_search(&status)) != NULL)
> +		{
> +			if (buffer->nused > 0)
> +			{
> +				/* Flush the buffer if it was used */
> +				CopyMultiInsertInfo_FlushSingleBuffer(buffer, cstate, estate,
> +													  mycid, ti_options);
> +			}
> +			else
> +			{
> +				/*
> +				 * Otherwise just remove it. If we saw no tuples for it this
> +				 * batch, then likely its best to make way for buffers for
> +				 * other partitions.
> +				 */
> +				CopyMultiInsertBuffer_RemoveBuffer(miinfo, buffer);
> +			}
> +		}
> +	}
> +
> +	miinfo->bufferedTuples = 0;
> +	miinfo->bufferedBytes = 0;
> +}
> +
> +/*
> + * CopyMultiInsertInfo_Cleanup
> + *		Cleanup allocated buffers and free memory
> + */
> +static inline void
> +CopyMultiInsertInfo_Cleanup(CopyMultiInsertInfo *miinfo)
> +{
> +	if (miinfo->multiInsertBufferTab == NULL)
> +		CopyMultiInsertBuffer_Cleanup(miinfo->buffer);
> +	else
> +	{
> +		HASH_SEQ_STATUS status;
> +		CopyMultiInsertBuffer *buffer;
> +
> +		hash_seq_init(&status, miinfo->multiInsertBufferTab);
> +
> +		while ((buffer = (CopyMultiInsertBuffer *) hash_seq_search(&status)) != NULL)
> +		{
> +			Assert(buffer->nused == 0);
> +			CopyMultiInsertBuffer_Cleanup(buffer);
> +		}
> +
> +		hash_destroy(miinfo->multiInsertBufferTab);
> +	}
> +}
> +
> +/*
> + * CopyMultiInsertInfo_NextFreeSlot
> + *		Get the next TupleTableSlot that the next tuple should be stored in.
> + *
> + * Callers must ensure that the buffer is not full.
> + */
> +static inline TupleTableSlot *
> +CopyMultiInsertInfo_NextFreeSlot(CopyMultiInsertInfo *miinfo,
> +								 ResultRelInfo *rri)
> +{
> +	CopyMultiInsertBuffer *buffer = miinfo->buffer;
> +	int			nused = buffer->nused;
> +
> +	Assert(nused < MAX_BUFFERED_TUPLES);
> +
> +	if (buffer->slots[nused] == NULL)
> +		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
> +	return buffer->slots[nused];
> +}
> +
> +/*
> + * CopyMultiInsertInfo_Store
> + *		Consume the previously reserved TupleTableSlot that was reserved by
> + *		CopyMultiInsertInfo_NextFreeSlot.
> + */
> +static inline void
> +CopyMultiInsertInfo_Store(CopyMultiInsertInfo *miinfo, TupleTableSlot *slot,
> +						  int tuplen, uint64 lineno)
> +{
> +	CopyMultiInsertBuffer *buffer = miinfo->buffer;
> +
> +	Assert(slot == buffer->slots[buffer->nused]);
> +
> +	/* Store the line number so we can properly report any errors later */
> +	buffer->linenos[buffer->nused] = lineno;
> +
> +	/* Record this slot as being used */
> +	buffer->nused++;
> +
> +	/* Update how many tuples are stored and their size */
> +	miinfo->bufferedTuples++;
> +	miinfo->bufferedBytes += tuplen;
> +}
> +
>  /*
>   * Copy FROM file to relation.
>   */
>  uint64
>  CopyFrom(CopyState cstate)
>  {
> -	HeapTuple	tuple;
> -	TupleDesc	tupDesc;
> -	Datum	   *values;
> -	bool	   *nulls;
>  	ResultRelInfo *resultRelInfo;
>  	ResultRelInfo *target_resultRelInfo;
>  	ResultRelInfo *prevResultRelInfo = NULL;
>  	EState	   *estate = CreateExecutorState(); /* for ExecConstraints() */
>  	ModifyTableState *mtstate;
>  	ExprContext *econtext;
> -	TupleTableSlot *myslot;
> +	TupleTableSlot *singleslot = NULL;
>  	MemoryContext oldcontext = CurrentMemoryContext;
> -	MemoryContext batchcontext;
>
>  	PartitionTupleRouting *proute = NULL;
>  	ErrorContextCallback errcallback;
>  	CommandId	mycid = GetCurrentCommandId(true);
>  	int			ti_options = 0; /* start with default table_insert options */
> -	BulkInsertState bistate;
> +	BulkInsertState bistate = NULL;
>  	CopyInsertMethod insertMethod;
> +	CopyMultiInsertInfo multiInsertInfo;
>  	uint64		processed = 0;
> -	int			nBufferedTuples = 0;
>  	bool		has_before_insert_row_trig;
>  	bool		has_instead_insert_row_trig;
>  	bool		leafpart_use_multi_insert = false;
>
> -#define MAX_BUFFERED_TUPLES 1000
> -#define RECHECK_MULTI_INSERT_THRESHOLD 1000
> -	HeapTuple  *bufferedTuples = NULL;	/* initialize to silence warning */
> -	Size		bufferedTuplesSize = 0;
> -	uint64		firstBufferedLineNo = 0;
> -	uint64		lastPartitionSampleLineNo = 0;
> -	uint64		nPartitionChanges = 0;
> -	double		avgTuplesPerPartChange = 0;
> -
>  	Assert(cstate->rel);
>
> +	memset(&multiInsertInfo, 0, sizeof(CopyMultiInsertInfo));
> +
>  	/*
>  	 * The target must be a plain, foreign, or partitioned relation, or have
>  	 * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
> @@ -2382,8 +2767,6 @@ CopyFrom(CopyState cstate)
>  						RelationGetRelationName(cstate->rel))));
>  	}
>
> -	tupDesc = RelationGetDescr(cstate->rel);
> -
>  	/*----------
>  	 * Check to see if we can avoid writing WAL
>  	 *
> @@ -2467,8 +2850,8 @@ CopyFrom(CopyState cstate)
>  	if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
>  	{
>  		ereport(ERROR,
> -			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
> -			 errmsg("cannot perform FREEZE on a partitioned table")));
> +				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
> +				 errmsg("cannot perform FREEZE on a partitioned table")));
>  	}
>
>  	/*
> @@ -2518,10 +2901,6 @@ CopyFrom(CopyState cstate)
>
>  	ExecInitRangeTable(estate, cstate->range_table);
>
> -	/* Set up a tuple slot too */
> -	myslot = ExecInitExtraTupleSlot(estate, tupDesc,
> -									&TTSOpsHeapTuple);
> -
>  	/*
>  	 * Set up a ModifyTableState so we can let FDW(s) init themselves for
>  	 * foreign-table result relation(s).
> @@ -2565,10 +2944,11 @@ CopyFrom(CopyState cstate)
>  									&mtstate->ps);
>
>  	/*
> -	 * It's more efficient to prepare a bunch of tuples for insertion, and
> -	 * insert them in one heap_multi_insert() call, than call heap_insert()
> -	 * separately for every tuple. However, there are a number of reasons why
> -	 * we might not be able to do this. These are explained below.
> +	 * It's generally more efficient to prepare a bunch of tuples for
> +	 * insertion, and insert them in one table_multi_insert() call, than call
> +	 * table_insert() separately for every tuple. However, there are a number
> +	 * of reasons why we might not be able to do this. These are explained
> +	 * below.
>  	 */
>  	if (resultRelInfo->ri_TrigDesc != NULL &&
>  		(resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
> @@ -2589,8 +2969,8 @@ CopyFrom(CopyState cstate)
>  		 * For partitioned tables we can't support multi-inserts when there
>  		 * are any statement level insert triggers. It might be possible to
>  		 * allow partitioned tables with such triggers in the future, but for
> -		 * now, CopyFromInsertBatch expects that any before row insert and
> -		 * statement level insert triggers are on the same relation.
> +		 * now, CopyMultiInsertInfo_Flush expects that any before row insert
> +		 * and statement level insert triggers are on the same relation.
>  		 */
>  		insertMethod = CIM_SINGLE;
>  	}
> @@ -2622,8 +3002,7 @@ CopyFrom(CopyState cstate)
>  	{
>  		/*
>  		 * For partitioned tables, we may still be able to perform bulk
> -		 * inserts for sets of consecutive tuples which belong to the same
> -		 * partition. However, the possibility of this depends on which types
> +		 * inserts. However, the possibility of this depends on which types
>  		 * of triggers exist on the partition. We must disable bulk inserts
>  		 * if the partition is a foreign table or it has any before row insert
>  		 * or insert instead triggers (same as we checked above for the parent
> @@ -2632,18 +3011,27 @@ CopyFrom(CopyState cstate)
>  		 * have the intermediate insert method of CIM_MULTI_CONDITIONAL to
>  		 * flag that we must later determine if we can use bulk-inserts for
>  		 * the partition being inserted into.
> -		 *
> -		 * Normally, when performing bulk inserts we just flush the insert
> -		 * buffer whenever it becomes full, but for the partitioned table
> -		 * case, we flush it whenever the current tuple does not belong to the
> -		 * same partition as the previous tuple.
>  		 */
>  		if (proute)
>  			insertMethod = CIM_MULTI_CONDITIONAL;
>  		else
>  			insertMethod = CIM_MULTI;
>
> -		bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
> +		CopyMultiInsertInfo_Init(&multiInsertInfo, resultRelInfo,
> +								 proute != NULL);
> +	}
> +
> +	/*
> +	 * If not using batch mode (which allocates slots as needed) set up a
> +	 * tuple slot too. When inserting into a partitioned table, we also need
> +	 * one, even if we might batch insert, to read the tuple in the root
> +	 * partition's form.
> +	 */
> +	if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
> +	{
> +		singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
> +									   &estate->es_tupleTable);
> +		bistate = GetBulkInsertState();
>  	}
>
>  	has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
> @@ -2660,10 +3048,6 @@ CopyFrom(CopyState cstate)
>  	 */
>  	ExecBSInsertTriggers(estate, resultRelInfo);
>
> -	values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
> -	nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
> -
> -	bistate = GetBulkInsertState();
>  	econtext = GetPerTupleExprContext(estate);
>
>  	/* Set up callback to identify error line number */
> @@ -2672,17 +3056,9 @@ CopyFrom(CopyState cstate)
>  	errcallback.previous = error_context_stack;
>  	error_context_stack = &errcallback;
>
> -	/*
> -	 * Set up memory context for batches. For cases without batching we could
> -	 * use the per-tuple context, but it's simpler to just use it every time.
> -	 */
> -	batchcontext = AllocSetContextCreate(CurrentMemoryContext,
> -										 "batch context",
> -										 ALLOCSET_DEFAULT_SIZES);
> -
>  	for (;;)
>  	{
> -		TupleTableSlot *slot;
> +		TupleTableSlot *myslot;
>  		bool		skip_tuple;
>
>  		CHECK_FOR_INTERRUPTS();
> @@ -2693,20 +3069,33 @@ CopyFrom(CopyState cstate)
>  		 */
>  		ResetPerTupleExprContext(estate);
>
> +		if (insertMethod == CIM_SINGLE || proute)
> +		{
> +			myslot = singleslot;
> +			Assert(myslot != NULL);
> +		}
> +		else
> +		{
> +			Assert(resultRelInfo == target_resultRelInfo);
> +			Assert(insertMethod == CIM_MULTI);
> +
> +			myslot = CopyMultiInsertInfo_NextFreeSlot(&multiInsertInfo,
> +													  resultRelInfo);
> +		}
> +
>  		/*
>  		 * Switch to per-tuple context before calling NextCopyFrom, which does
>  		 * evaluate default expressions etc. and requires per-tuple context.
>  		 */
>  		MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
>
> -		if (!NextCopyFrom(cstate, econtext, values, nulls))
> -			break;
> +		ExecClearTuple(myslot);
>
> -		/* Switch into per-batch memory context before forming the tuple. */
> -		MemoryContextSwitchTo(batchcontext);
> +		/* Directly store the values/nulls array in the slot */
> +		if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
> +			break;
>
> -		/* And now we can form the input tuple. */
> -		tuple = heap_form_tuple(tupDesc, values, nulls);
> +		ExecStoreVirtualTuple(myslot);
>
>  		/*
>  		 * Constraints might reference the tableoid column, so (re-)initialize
> @@ -2717,18 +3106,15 @@ CopyFrom(CopyState cstate)
>  		/* Triggers and stuff need to be invoked in query context. */
>  		MemoryContextSwitchTo(oldcontext);
>
> -		/* Place tuple in tuple slot --- but slot shouldn't free it */
> -		slot = myslot;
> -		ExecStoreHeapTuple(tuple, slot, false);
> -
>  		if (cstate->whereClause)
>  		{
>  			econtext->ecxt_scantuple = myslot;
> +			/* Skip items that don't match the COPY's WHERE clause */
>  			if (!ExecQual(cstate->qualexpr, econtext))
>  				continue;
>  		}
>
> -		/* Determine the partition to heap_insert the tuple into */
> +		/* Determine the partition to table_insert the tuple into */
>  		if (proute)
>  		{
>  			TupleConversionMap *map;
> @@ -2739,80 +3125,10 @@ CopyFrom(CopyState cstate)
>  			 * if the found partition is not suitable for INSERTs.
>  			 */
>  			resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
> -											  proute, slot, estate);
> +											  proute, myslot, estate);
>
>  			if (prevResultRelInfo != resultRelInfo)
>  			{
> -				/* Check if we can multi-insert into this partition */
> -				if (insertMethod == CIM_MULTI_CONDITIONAL)
> -				{
> -					/*
> -					 * When performing bulk-inserts into partitioned tables we
> -					 * must insert the tuples seen so far to the heap whenever
> -					 * the partition changes.
> -					 */
> -					if (nBufferedTuples > 0)
> -					{
> -						MemoryContext oldcontext;
> -
> -						CopyFromInsertBatch(cstate, estate, mycid, ti_options,
> -											prevResultRelInfo, myslot, bistate,
> -											nBufferedTuples, bufferedTuples,
> -											firstBufferedLineNo);
> -						nBufferedTuples = 0;
> -						bufferedTuplesSize = 0;
> -
> -						/*
> -						 * The tuple is already allocated in the batch context, which
> -						 * we want to reset. So to keep the tuple we copy it into the
> -						 * short-lived (per-tuple) context, reset the batch context
> -						 * and then copy it back into the per-batch one.
> -						 */
> -						oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
> -						tuple = heap_copytuple(tuple);
> -						MemoryContextSwitchTo(oldcontext);
> -
> -						/* cleanup the old batch */
> -						MemoryContextReset(batchcontext);
> -
> -						/* copy the tuple back to the per-batch context */
> -						oldcontext = MemoryContextSwitchTo(batchcontext);
> -						tuple = heap_copytuple(tuple);
> -						MemoryContextSwitchTo(oldcontext);
> -
> -						/*
> -						 * Also push the tuple copy to the slot (resetting the context
> -						 * invalidated the slot contents).
> -						 */
> -						ExecStoreHeapTuple(tuple, slot, false);
> -					}
> -
> -					nPartitionChanges++;
> -
> -					/*
> -					 * Here we adaptively enable multi-inserts based on the
> -					 * average number of tuples from recent multi-insert
> -					 * batches. We recalculate the average every
> -					 * RECHECK_MULTI_INSERT_THRESHOLD tuples instead of taking
> -					 * the average over the whole copy. This allows us to
> -					 * enable multi-inserts when we get periods in the copy
> -					 * stream that have tuples commonly belonging to the same
> -					 * partition, and disable when the partition is changing
> -					 * too often.
> -					 */
> -					if (unlikely(lastPartitionSampleLineNo <= (cstate->cur_lineno -
> -								 RECHECK_MULTI_INSERT_THRESHOLD)
> -								 && cstate->cur_lineno >= RECHECK_MULTI_INSERT_THRESHOLD))
> -					{
> -						avgTuplesPerPartChange =
> -							(cstate->cur_lineno - lastPartitionSampleLineNo) /
> -							(double) nPartitionChanges;
> -
> -						lastPartitionSampleLineNo = cstate->cur_lineno;
> -						nPartitionChanges = 0;
> -					}
> -				}
> -
>  				/* Determine which triggers exist on this partition */
>  				has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
>  											  resultRelInfo->ri_TrigDesc->trig_insert_before_row);
> @@ -2821,22 +3137,19 @@ CopyFrom(CopyState cstate)
>  											  resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
>
>  				/*
> -				 * Tests have shown that using multi-inserts when the
> -				 * partition changes on every tuple slightly decreases the
> -				 * performance, however, there are benefits even when only
> -				 * some batches have just 2 tuples, so let's enable
> -				 * multi-inserts even when the average is quite low.
> +				 * Enable multi-inserts when the partition has BEFORE/INSTEAD
> +				 * OF triggers, or if the partition is a foreign partition.
>  				 */
>  				leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
> -					avgTuplesPerPartChange >= 1.3 &&
>  					!has_before_insert_row_trig &&
>  					!has_instead_insert_row_trig &&
>  					resultRelInfo->ri_FdwRoutine == NULL;
>
> -				/*
> -				 * We'd better make the bulk insert mechanism gets a new
> -				 * buffer when the partition being inserted into changes.
> -				 */
> +				/* Set the multi-insert buffer to use for this partition. */
> +				if (leafpart_use_multi_insert)
> +					CopyMultiInsertInfo_SetCurrentBuffer(&multiInsertInfo,
> +														 resultRelInfo);
> +
>  				ReleaseBulkInsertStatePin(bistate);
>  				prevResultRelInfo = resultRelInfo;
>  			}
> @@ -2879,26 +3192,48 @@ CopyFrom(CopyState cstate)
>  			 * rowtype.
>  			 */
>  			map = resultRelInfo->ri_PartitionInfo->pi_RootToPartitionMap;
> -			if (map != NULL)
> +			if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
>  			{
> -				TupleTableSlot *new_slot;
> -				MemoryContext oldcontext;
> -
> -				new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
> -				Assert(new_slot != NULL);
> -
> -				slot = execute_attr_map_slot(map->attrMap, slot, new_slot);
> +				/* non batch insert */
> +				if (map != NULL)
> +				{
> +					TupleTableSlot *new_slot;
>
> +					new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
> +					myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
> +				}
> +			}
> +			else
> +			{
>  				/*
> -				 * Get the tuple in the per-batch context, so that it will be
> -				 * freed after each batch insert.
> +				 * Batch insert into partitioned table.
>  				 */
> -				oldcontext = MemoryContextSwitchTo(batchcontext);
> -				tuple = ExecCopySlotHeapTuple(slot);
> -				MemoryContextSwitchTo(oldcontext);
> +				TupleTableSlot *nextslot;
> +
> +				/* no other path available for partitioned table */
> +				Assert(insertMethod == CIM_MULTI_CONDITIONAL);
> +
> +				nextslot = CopyMultiInsertInfo_NextFreeSlot(&multiInsertInfo,
> +															resultRelInfo);
> +
> +				if (map != NULL)
> +					myslot = execute_attr_map_slot(map->attrMap, myslot, nextslot);
> +				else
> +				{
> +					/*
> +					 * This looks more expensive than it is (Believe me, I
> +					 * optimized it away. Twice). The input is in virtual
> +					 * form, and we'll materialize the slot below - for most
> +					 * slot types the copy performs the work materialization
> +					 * would later require anyway.
> +					 */
> +					ExecCopySlot(nextslot, myslot);
> +					myslot = nextslot;
> +				}
>  			}
>
> -			slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
> +			/* ensure that triggers etc see the right relation */
> +			myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
>  		}
>
>  		skip_tuple = false;
> @@ -2906,7 +3241,7 @@ CopyFrom(CopyState cstate)
>  		/* BEFORE ROW INSERT Triggers */
>  		if (has_before_insert_row_trig)
>  		{
> -			if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
> +			if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
>  				skip_tuple = true;	/* "do nothing" */
>  		}
>
> @@ -2919,7 +3254,7 @@ CopyFrom(CopyState cstate)
>  			 */
>  			if (has_instead_insert_row_trig)
>  			{
> -				ExecIRInsertTriggers(estate, resultRelInfo, slot);
> +				ExecIRInsertTriggers(estate, resultRelInfo, myslot);
>  			}
>  			else
>  			{
> @@ -2931,12 +3266,7 @@ CopyFrom(CopyState cstate)
>  				 */
>  				if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
>  					resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
> -				{
> -					ExecComputeStoredGenerated(estate, slot);
> -					MemoryContextSwitchTo(batchcontext);
> -					tuple = ExecCopySlotHeapTuple(slot);
> -					MemoryContextSwitchTo(oldcontext);
> -				}
> +					ExecComputeStoredGenerated(estate, myslot);
>
>  				/*
>  				 * If the target is a plain table, check the constraints of
> @@ -2944,7 +3274,7 @@ CopyFrom(CopyState cstate)
>  				 */
>  				if (resultRelInfo->ri_FdwRoutine == NULL &&
>  					resultRelInfo->ri_RelationDesc->rd_att->constr)
> -					ExecConstraints(resultRelInfo, slot, estate);
> +					ExecConstraints(resultRelInfo, myslot, estate);
>
>  				/*
>  				 * Also check the tuple against the partition constraint, if
> @@ -2954,7 +3284,7 @@ CopyFrom(CopyState cstate)
>  				 */
>  				if (resultRelInfo->ri_PartitionCheck &&
>  					(proute == NULL || has_before_insert_row_trig))
> -					ExecPartitionCheck(resultRelInfo, slot, estate, true);
> +					ExecPartitionCheck(resultRelInfo, myslot, estate, true);
>
>  				/*
>  				 * Perform multi-inserts when enabled, or when loading a
> @@ -2963,31 +3293,21 @@ CopyFrom(CopyState cstate)
>  				 */
>  				if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
>  				{
> -					/* Add this tuple to the tuple buffer */
> -					if (nBufferedTuples == 0)
> -						firstBufferedLineNo = cstate->cur_lineno;
> -					bufferedTuples[nBufferedTuples++] = tuple;
> -					bufferedTuplesSize += tuple->t_len;
> -
>  					/*
> -					 * If the buffer filled up, flush it. Also flush if the
> -					 * total size of all the tuples in the buffer becomes
> -					 * large, to avoid using large amounts of memory for the
> -					 * buffer when the tuples are exceptionally wide.
> +					 * The slot previously might point into the per-tuple
> +					 * context. For batching it needs to be longer lived.
>  					 */
> -					if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
> -						bufferedTuplesSize > 65535)
> -					{
> -						CopyFromInsertBatch(cstate, estate, mycid, ti_options,
> -											resultRelInfo, myslot, bistate,
> -											nBufferedTuples, bufferedTuples,
> -											firstBufferedLineNo);
> -						nBufferedTuples = 0;
> -						bufferedTuplesSize = 0;
> -
> -						/* free memory occupied by tuples from the batch */
> -						MemoryContextReset(batchcontext);
> -					}
> +					ExecMaterializeSlot(myslot);
> +
> +					/* Add this tuple to the tuple buffer */
> +					CopyMultiInsertInfo_Store(&multiInsertInfo, myslot,
> +											  cstate->line_buf.len,
> +											  cstate->cur_lineno);
> +
> +					/* If the buffer filled up, flush it. */
> +					if (CopyMultiInsertInfo_IsFull(&multiInsertInfo))
> +						CopyMultiInsertInfo_Flush(&multiInsertInfo, cstate,
> +												  estate, mycid, ti_options);
>  				}
>  				else
>  				{
> @@ -2996,12 +3316,12 @@ CopyFrom(CopyState cstate)
>  					/* OK, store the tuple */
>  					if (resultRelInfo->ri_FdwRoutine != NULL)
>  					{
> -						slot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
> -																			   resultRelInfo,
> -																			   slot,
> -																			   NULL);
> +						myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
> +																				  resultRelInfo,
> +																				  myslot,
> +																				  NULL);
>
> -						if (slot == NULL)	/* "do nothing" */
> +						if (myslot == NULL) /* "do nothing" */
>  							continue;	/* next tuple please */
>
>  						/*
> @@ -3009,27 +3329,26 @@ CopyFrom(CopyState cstate)
>  						 * column, so (re-)initialize tts_tableOid before
>  						 * evaluating them.
>  						 */
> -						slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
> +						myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
>  					}
>  					else
>  					{
> -						tuple = ExecFetchSlotHeapTuple(slot, true, NULL);
> -						heap_insert(resultRelInfo->ri_RelationDesc, tuple,
> -									mycid, ti_options, bistate);
> -						ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
> -						slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
> +						/* OK, store the tuple and create index entries for it */
> +						table_insert(resultRelInfo->ri_RelationDesc, myslot,
> +									 mycid, ti_options, bistate);
>  					}
>
> +
>  					/* And create index entries for it */
>  					if (resultRelInfo->ri_NumIndices > 0)
> -						recheckIndexes = ExecInsertIndexTuples(slot,
> +						recheckIndexes = ExecInsertIndexTuples(myslot,
>  															   estate,
>  															   false,
>  															   NULL,
>  															   NIL);
>
>  					/* AFTER ROW INSERT Triggers */
> -					ExecARInsertTriggers(estate, resultRelInfo, slot,
> +					ExecARInsertTriggers(estate, resultRelInfo, myslot,
>  										 recheckIndexes, cstate->transition_capture);
>
>  					list_free(recheckIndexes);
> @@ -3045,32 +3364,25 @@ CopyFrom(CopyState cstate)
>  		}
>  	}
>
> -	/* Flush any remaining buffered tuples */
> -	if (nBufferedTuples > 0)
> +	if (insertMethod != CIM_SINGLE)
>  	{
> -		if (insertMethod == CIM_MULTI_CONDITIONAL)
> -		{
> -			CopyFromInsertBatch(cstate, estate, mycid, ti_options,
> -								prevResultRelInfo, myslot, bistate,
> -								nBufferedTuples, bufferedTuples,
> -								firstBufferedLineNo);
> -		}
> -		else
> -			CopyFromInsertBatch(cstate, estate, mycid, ti_options,
> -								resultRelInfo, myslot, bistate,
> -								nBufferedTuples, bufferedTuples,
> -								firstBufferedLineNo);
> +		/* Flush any remaining buffered tuples */
> +		if (!CopyMultiInsertInfo_IsEmpty(&multiInsertInfo))
> +			CopyMultiInsertInfo_Flush(&multiInsertInfo, cstate, estate, mycid,
> +									  ti_options);
> +
> +		/* Tear down the multi-insert data */
> +		CopyMultiInsertInfo_Cleanup(&multiInsertInfo);
>  	}
>
>  	/* Done, clean up */
>  	error_context_stack = errcallback.previous;
>
> -	FreeBulkInsertState(bistate);
> +	if (bistate != NULL)
> +		ReleaseBulkInsertStatePin(bistate);
>
>  	MemoryContextSwitchTo(oldcontext);
>
> -	MemoryContextDelete(batchcontext);
> -
>  	/*
>  	 * In the old protocol, tell pqcomm that we can process normal protocol
>  	 * messages again.
> @@ -3084,9 +3396,6 @@ CopyFrom(CopyState cstate)
>  	/* Handle queued AFTER triggers */
>  	AfterTriggerEndQuery(estate);
>
> -	pfree(values);
> -	pfree(nulls);
> -
>  	ExecResetTupleTable(estate->es_tupleTable, false);
>
>  	/* Allow the FDW to shut down */
> @@ -3111,88 +3420,6 @@ CopyFrom(CopyState cstate)
>  	return processed;
>  }
>
> -/*
> - * A subroutine of CopyFrom, to write the current batch of buffered heap
> - * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
> - * triggers.
> - */
> -static void
> -CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
> -					int ti_options, ResultRelInfo *resultRelInfo,
> -					TupleTableSlot *myslot, BulkInsertState bistate,
> -					int nBufferedTuples, HeapTuple *bufferedTuples,
> -					uint64 firstBufferedLineNo)
> -{
> -	MemoryContext oldcontext;
> -	int			i;
> -	uint64		save_cur_lineno;
> -	bool		line_buf_valid = cstate->line_buf_valid;
> -
> -	/*
> -	 * Print error context information correctly, if one of the operations
> -	 * below fail.
> -	 */
> -	cstate->line_buf_valid = false;
> -	save_cur_lineno = cstate->cur_lineno;
> -
> -	/*
> -	 * heap_multi_insert leaks memory, so switch to short-lived memory context
> -	 * before calling it.
> -	 */
> -	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
> -	heap_multi_insert(resultRelInfo->ri_RelationDesc,
> -					  bufferedTuples,
> -					  nBufferedTuples,
> -					  mycid,
> -					  ti_options,
> -					  bistate);
> -	MemoryContextSwitchTo(oldcontext);
> -
> -	/*
> -	 * If there are any indexes, update them for all the inserted tuples, and
> -	 * run AFTER ROW INSERT triggers.
> -	 */
> -	if (resultRelInfo->ri_NumIndices > 0)
> -	{
> -		for (i = 0; i < nBufferedTuples; i++)
> -		{
> -			List	   *recheckIndexes;
> -
> -			cstate->cur_lineno = firstBufferedLineNo + i;
> -			ExecStoreHeapTuple(bufferedTuples[i], myslot, false);
> -			recheckIndexes =
> -				ExecInsertIndexTuples(myslot,
> -									  estate, false, NULL, NIL);
> -			ExecARInsertTriggers(estate, resultRelInfo,
> -								 myslot,
> -								 recheckIndexes, cstate->transition_capture);
> -			list_free(recheckIndexes);
> -		}
> -	}
> -
> -	/*
> -	 * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
> -	 * anyway.
> -	 */
> -	else if (resultRelInfo->ri_TrigDesc != NULL &&
> -			 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
> -			  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
> -	{
> -		for (i = 0; i < nBufferedTuples; i++)
> -		{
> -			cstate->cur_lineno = firstBufferedLineNo + i;
> -			ExecStoreHeapTuple(bufferedTuples[i], myslot, false);
> -			ExecARInsertTriggers(estate, resultRelInfo,
> -								 myslot,
> -								 NIL, cstate->transition_capture);
> -		}
> -	}
> -
> -	/* reset cur_lineno and line_buf_valid to what they were */
> -	cstate->line_buf_valid = line_buf_valid;
> -	cstate->cur_lineno = save_cur_lineno;
> -}
> -
>  /*
>   * Setup to read tuples from a file for COPY FROM.
>   *
> @@ -4990,11 +5217,8 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
>  	DR_copy    *myState = (DR_copy *) self;
>  	CopyState	cstate = myState->cstate;
>
> -	/* Make sure the tuple is fully deconstructed */
> -	slot_getallattrs(slot);
> -
> -	/* And send the data */
> -	CopyOneRowTo(cstate, slot->tts_values, slot->tts_isnull);
> +	/* Send the data */
> +	CopyOneRowTo(cstate, slot);
>  	myState->processed++;
>
>  	return true;
> diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
> index 4c077755d5..ed0e2de144 100644
> --- a/src/include/access/heapam.h
> +++ b/src/include/access/heapam.h
> @@ -36,6 +36,7 @@
>  #define HEAP_INSERT_SPECULATIVE 0x0010
>
>  typedef struct BulkInsertStateData *BulkInsertState;
> +struct TupleTableSlot;
>
>  #define MaxLockTupleMode	LockTupleExclusive
>
> @@ -143,7 +144,7 @@ extern void ReleaseBulkInsertStatePin(BulkInsertState bistate);
>
>  extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
>  						int options, BulkInsertState bistate);
> -extern void heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
> +extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, int ntuples,
>  							  CommandId cid, int options, BulkInsertState bistate);
>  extern TM_Result heap_delete(Relation relation, ItemPointer tid,
>  							 CommandId cid, Snapshot crosscheck, bool wait,
> diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
> index 4efe178ed1..c2fdedc551 100644
> --- a/src/include/access/tableam.h
> +++ b/src/include/access/tableam.h
> @@ -328,6 +328,9 @@ typedef struct TableAmRoutine
>  	 * ------------------------------------------------------------------------
>  	 */
>
> +	void		(*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots,
> +								 CommandId cid, int options, struct BulkInsertStateData *bistate);
> +
>  	/* see table_insert() for reference about parameters */
>  	void		(*tuple_insert) (Relation rel, TupleTableSlot *slot,
>  								 CommandId cid, int options,
> @@ -1157,6 +1160,17 @@ table_update(Relation rel, ItemPointer otid, TupleTableSlot *slot,
>  							   lockmode, update_indexes);
>  }
>
> +/*
> + * table_multi_insert - insert multiple tuple into a table
> + */
> +static inline void
> +table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
> +				   CommandId cid, int options, struct BulkInsertStateData *bistate)
> +{
> +	rel->rd_tableam->multi_insert(rel, slots, nslots,
> +								  cid, options, bistate);
> +}
> +
>  /*
>   * Lock a tuple in the specified mode.
>   *

Greetings,

Andres Freund