Re: COPY FROM WHEN condition - Mailing list pgsql-hackers

From: Andres Freund
Subject: Re: COPY FROM WHEN condition
Msg-id: 87ef6kl2gf.fsf@alap4.lan
In response to: Re: COPY FROM WHEN condition (David Rowley <david.rowley@2ndquadrant.com>)
List: pgsql-hackers
On 2019-04-03 06:41:49 +1300, David Rowley wrote:
> However, I've ended up not doing it that way as the patch requires
> more than just an array of TupleTableSlots to be stored in the
> ResultRelInfo, it'll pretty much need all of what I have in
> CopyMultiInsertBuffer, which includes line numbers for error
> reporting, a BulkInsertState and a counter to track how many of the
> slots are used.  I had thoughts that we could just tag the whole
> CopyMultiInsertBuffer in ResultRelInfo, but that requires moving it
> somewhere a bit more generic. Another thought would be that we have
> something like "void *extra;" in ResultRelInfo that we can just borrow
> for copy.c.  It may be interesting to try this out to see if it saves
> much in the way of performance.

Hm, we could just forward declare struct CopyMultiInsertBuffer in a
header, and only define it in copy.c. That doesn't sound insane to me.
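Roughly this shape, I mean (names and fields here are illustrative stand-ins, not the actual patch's declarations):

```c
#include <stdlib.h>

/* --- in a header: only an opaque forward declaration is exposed --- */
struct CopyMultiInsertBuffer;   /* defined privately in copy.c */

typedef struct ResultRelInfoSketch
{
    /* hypothetical stand-in for ResultRelInfo; real fields omitted */
    struct CopyMultiInsertBuffer *ri_CopyMultiInsertBuffer;
} ResultRelInfoSketch;

/* --- in copy.c: the full definition stays file-private --- */
struct CopyMultiInsertBuffer
{
    int nused;                  /* other members (slots, linenos, ...) elided */
};

static struct CopyMultiInsertBuffer *
buffer_attach(ResultRelInfoSketch *rri)
{
    rri->ri_CopyMultiInsertBuffer =
        calloc(1, sizeof(struct CopyMultiInsertBuffer));
    return rri->ri_CopyMultiInsertBuffer;
}
```

Code outside copy.c can store and pass the pointer around without ever seeing the struct's members.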


> I've got the patch into a working state now and wrote a bunch of
> comments. I've not really done a thorough read back of the code again.
> I normally wait until the light is coming from a different angle
> before doing that, but there does not seem to be much time to wait for
> that in this case, so here's v2.  Performance seems to be about the
> same as what I reported yesterday.

Cool.


> +/*
> + * Multi-Insert handling code for COPY FROM.  Inserts are performed in
> + * batches of up to MAX_BUFFERED_TUPLES.

This should probably be moved to the top of the file.


> + * When COPY FROM is used on a partitioned table, we attempt to maintain
> + * only MAX_PARTITION_BUFFERS buffers at once.  Buffers that are completely
> + * unused in each batch are removed so that we end up not keeping buffers for
> + * partitions that we might not insert into again.
> + */
> +#define MAX_BUFFERED_TUPLES        1000
> +#define MAX_BUFFERED_BYTES        65535
> +#define MAX_PARTITION_BUFFERS    15

> +/*
> + * CopyMultiInsertBuffer
> + *        Stores multi-insert data related to a single relation in CopyFrom.
> + */
> +typedef struct

Please don't create anonymous structs - they can't be forward declared,
and some debugging tools handle them worse than named structs.  Naming
the struct also makes repeating CopyMultiInsertBuffer in the comment
less necessary.


> +{
> +    Oid            relid;            /* Relation ID this insert data is for */
> +    TupleTableSlot **slots;        /* Array of MAX_BUFFERED_TUPLES to store
> +                                 * tuples */
> +    uint64       *linenos;        /* Line # of tuple in copy stream */

It could make sense to allocate those two together, or even as part of
the CopyMultiInsertBuffer itself, to reduce allocator overhead.
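For what it's worth, a single-allocation layout could look roughly like this (stand-in types, C99 flexible array member; malloc/calloc stand in for palloc. A real version would want MAXALIGN for the trailing array, which happens to hold on 64-bit here because the pointer array keeps 8-byte alignment):

```c
#include <stdint.h>
#include <stdlib.h>

#define MAX_BUFFERED_TUPLES 1000

typedef struct TupleTableSlotSketch TupleTableSlotSketch;   /* opaque stand-in */

typedef struct BufferSketch
{
    int                    nused;
    uint64_t              *linenos;     /* points into the same allocation */
    TupleTableSlotSketch  *slots[];     /* C99 flexible array member */
} BufferSketch;

/* One calloc covers the struct, the slot array and the lineno array,
 * replacing the three separate allocations in the quoted code. */
static BufferSketch *
buffer_create(void)
{
    BufferSketch *buf =
        calloc(1, sizeof(BufferSketch)
               + MAX_BUFFERED_TUPLES * sizeof(TupleTableSlotSketch *)
               + MAX_BUFFERED_TUPLES * sizeof(uint64_t));

    /* the lineno array lives directly after the slot array */
    buf->linenos = (uint64_t *) &buf->slots[MAX_BUFFERED_TUPLES];
    return buf;
}
```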


> +    else
> +    {
> +        CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
> +
> +        /* Non-partitioned table.  Just setup the buffer for the table. */
> +        buffer->relid = RelationGetRelid(rri->ri_RelationDesc);
> +        buffer->slots = palloc0(MAX_BUFFERED_TUPLES * sizeof(TupleTableSlot *));
> +        buffer->linenos = palloc(MAX_BUFFERED_TUPLES * sizeof(uint64));
> +        buffer->resultRelInfo = rri;
> +        buffer->bistate = GetBulkInsertState();
> +        buffer->nused = 0;
> +        miinfo->multiInsertBufferTab = NULL;
> +        miinfo->buffer = buffer;
> +        miinfo->nbuffers = 1;
> +    }

Can this be moved into a helper function?
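Something along these lines, presumably (self-contained stand-ins for the PostgreSQL types, so this only shows the shape; malloc/calloc stand in for palloc/palloc0, and the BulkInsertState setup is elided):

```c
#include <stdint.h>
#include <stdlib.h>

#define MAX_BUFFERED_TUPLES 1000

/* hypothetical stand-ins for the types the real helper would take */
typedef struct TupleTableSlotSketch TupleTableSlotSketch;
typedef struct ResultRelInfoSketch { uint32_t relid; } ResultRelInfoSketch;

typedef struct CopyMultiInsertBufferSketch
{
    uint32_t               relid;
    TupleTableSlotSketch **slots;
    uint64_t              *linenos;
    ResultRelInfoSketch   *resultRelInfo;
    int                    nused;
} CopyMultiInsertBufferSketch;

/* The helper: both the partitioned and the non-partitioned paths of
 * the init code could call this instead of duplicating the
 * field-by-field setup. */
static CopyMultiInsertBufferSketch *
buffer_init(ResultRelInfoSketch *rri)
{
    CopyMultiInsertBufferSketch *buffer =
        malloc(sizeof(CopyMultiInsertBufferSketch));

    buffer->relid = rri->relid;
    buffer->slots = calloc(MAX_BUFFERED_TUPLES, sizeof(TupleTableSlotSketch *));
    buffer->linenos = malloc(MAX_BUFFERED_TUPLES * sizeof(uint64_t));
    buffer->resultRelInfo = rri;
    buffer->nused = 0;
    return buffer;
}
```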


> +    /*
> +     * heap_multi_insert leaks memory, so switch to short-lived memory context
> +     * before calling it.
> +     */

s/heap_multi_insert/table_multi_insert/



> +    /*
> +     * If there are any indexes, update them for all the inserted tuples, and
> +     * run AFTER ROW INSERT triggers.
> +     */
> +    if (resultRelInfo->ri_NumIndices > 0)
> +    {
> +        for (i = 0; i < nBufferedTuples; i++)
> +        {
> +            List       *recheckIndexes;
> +
> +            cstate->cur_lineno = buffer->linenos[i];
> +            recheckIndexes =
> +                ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL,
> +                                      NIL);
> +            ExecARInsertTriggers(estate, resultRelInfo,
> +                                 buffer->slots[i],
> +                                 recheckIndexes, cstate->transition_capture);
> +            list_free(recheckIndexes);
> +        }
> +    }
> +
> +    /*
> +     * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
> +     * anyway.
> +     */
> +    else if (resultRelInfo->ri_TrigDesc != NULL &&
> +             (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
> +              resultRelInfo->ri_TrigDesc->trig_insert_new_table))
> +    {
> +        for (i = 0; i < nBufferedTuples; i++)
> +        {
> +            cstate->cur_lineno = buffer->linenos[i];
> +            ExecARInsertTriggers(estate, resultRelInfo,
> +                                 bufferedSlots[i],
> +                                 NIL, cstate->transition_capture);
> +        }
> +    }

> +    for (i = 0; i < nBufferedTuples; i++)
> +        ExecClearTuple(bufferedSlots[i]);

I wonder about combining these loops somehow. But it's probably ok.


> +/*
> + * CopyMultiInsertBuffer_Cleanup
> + *        Drop used slots and free member for this buffer.  The buffer
> + *        must be flushed before cleanup.
> + */
> +static inline void
> +CopyMultiInsertBuffer_Cleanup(CopyMultiInsertBuffer *buffer)
> +{
> +    int            i;
> +
> +    ReleaseBulkInsertStatePin(buffer->bistate);

Shouldn't this FreeBulkInsertState() rather than
ReleaseBulkInsertStatePin()?


> +
> +/*
> + * CopyMultiInsertBuffer_RemoveBuffer
> + *        Remove a buffer from being tracked by miinfo
> + */
> +static inline void
> +CopyMultiInsertBuffer_RemoveBuffer(CopyMultiInsertInfo *miinfo,
> +                                   CopyMultiInsertBuffer *buffer)
> +{
> +    Oid            relid = buffer->relid;
> +
> +    CopyMultiInsertBuffer_Cleanup(buffer);
> +
> +    hash_search(miinfo->multiInsertBufferTab, (void *) &relid, HASH_REMOVE,
> +                NULL);
> +    miinfo->nbuffers--;

Aren't we leaking the CopyMultiInsertBuffer itself here?
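I.e. something like this, as a minimal model of the ownership question (malloc/free stand in for palloc/pfree, and a plain pointer array stands in for the dynahash table):

```c
#include <stdint.h>
#include <stdlib.h>

typedef struct BufSketch { uint32_t relid; } BufSketch;

typedef struct InfoSketch
{
    BufSketch *tab[4];          /* stand-in for multiInsertBufferTab */
    int        nbuffers;
} InfoSketch;

/* Removing the hash entry does not free the buffer it pointed at,
 * since the buffer is a separate allocation. */
static void
remove_buffer(InfoSketch *miinfo, int slot)
{
    BufSketch *buffer = miinfo->tab[slot];

    miinfo->tab[slot] = NULL;   /* hash_search(..., HASH_REMOVE, ...) */
    free(buffer);               /* the pfree(buffer) the quoted code lacks */
    miinfo->nbuffers--;
}
```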

> +}
> +
> +/*
> + * CopyMultiInsertInfo_Flush
> + *        Write out all stored tuples in all buffers out to the tables.
> + *
> + * To save us from ending up with buffers for 1000s of partitions we remove
> + * buffers belonging to partitions that we've seen no tuples for in this batch
> + */
> +static inline void
> +CopyMultiInsertInfo_Flush(CopyMultiInsertInfo *miinfo, CopyState cstate,
> +                          EState *estate, CommandId mycid, int ti_options)
> +{
> +    CopyMultiInsertBuffer *buffer;
> +
> +    /*
> +     * If just storing buffers for a non-partitioned table, then just flush
> +     * that buffer.
> +     */
> +    if (miinfo->multiInsertBufferTab == NULL)
> +    {
> +        buffer = miinfo->buffer;
> +
> +        CopyMultiInsertInfo_FlushSingleBuffer(buffer, cstate, estate, mycid,
> +                                              ti_options);
> +    }
> +    else
> +    {
> +        HASH_SEQ_STATUS status;
> +
> +        /*
> +         * Otherwise make a pass over the hash table and flush all buffers
> +         * that have any tuples stored in them.
> +         */
> +        hash_seq_init(&status, miinfo->multiInsertBufferTab);
> +
> +        while ((buffer = (CopyMultiInsertBuffer *) hash_seq_search(&status)) != NULL)
> +        {
> +            if (buffer->nused > 0)
> +            {
> +                /* Flush the buffer if it was used */
> +                CopyMultiInsertInfo_FlushSingleBuffer(buffer, cstate, estate,
> +                                                      mycid, ti_options);
> +            }
> +            else
> +            {
> +                /*
> +                 * Otherwise just remove it.  If we saw no tuples for it this
> +                 * batch, then likely its best to make way for buffers for
> +                 * other partitions.
> +                 */
> +                CopyMultiInsertBuffer_RemoveBuffer(miinfo, buffer);
> +            }
> +        }
> +    }
> +
> +    miinfo->bufferedTuples = 0;
> +    miinfo->bufferedBytes = 0;
> +}
> +
> +/*
> + * CopyMultiInsertInfo_Cleanup
> + *        Cleanup allocated buffers and free memory
> + */
> +static inline void
> +CopyMultiInsertInfo_Cleanup(CopyMultiInsertInfo *miinfo)
> +{
> +    if (miinfo->multiInsertBufferTab == NULL)
> +        CopyMultiInsertBuffer_Cleanup(miinfo->buffer);
> +    else
> +    {
> +        HASH_SEQ_STATUS status;
> +        CopyMultiInsertBuffer *buffer;
> +
> +        hash_seq_init(&status, miinfo->multiInsertBufferTab);
> +
> +        while ((buffer = (CopyMultiInsertBuffer *) hash_seq_search(&status)) != NULL)
> +        {
> +            Assert(buffer->nused == 0);
> +            CopyMultiInsertBuffer_Cleanup(buffer);
> +        }
> +
> +        hash_destroy(miinfo->multiInsertBufferTab);
> +    }
> +}
> +
> +/*
> + * CopyMultiInsertInfo_NextFreeSlot
> + *        Get the next TupleTableSlot that the next tuple should be stored in.
> + *
> + * Callers must ensure that the buffer is not full.
> + */
> +static inline TupleTableSlot *
> +CopyMultiInsertInfo_NextFreeSlot(CopyMultiInsertInfo *miinfo,
> +                                 ResultRelInfo *rri)
> +{
> +    CopyMultiInsertBuffer *buffer = miinfo->buffer;
> +    int            nused = buffer->nused;
> +
> +    Assert(nused < MAX_BUFFERED_TUPLES);
> +
> +    if (buffer->slots[nused] == NULL)
> +        buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
> +    return buffer->slots[nused];
> +}
> +
> +/*
> + * CopyMultiInsertInfo_Store
> + *        Consume the previously reserved TupleTableSlot that was reserved by
> + *        CopyMultiInsertInfo_NextFreeSlot.
> + */
> +static inline void
> +CopyMultiInsertInfo_Store(CopyMultiInsertInfo *miinfo, TupleTableSlot *slot,
> +                          int tuplen, uint64 lineno)
> +{
> +    CopyMultiInsertBuffer *buffer = miinfo->buffer;
> +
> +    Assert(slot == buffer->slots[buffer->nused]);
> +
> +    /* Store the line number so we can properly report any errors later */
> +    buffer->linenos[buffer->nused] = lineno;
> +
> +    /* Record this slot as being used */
> +    buffer->nused++;
> +
> +    /* Update how many tuples are stored and their size */
> +    miinfo->bufferedTuples++;
> +    miinfo->bufferedBytes += tuplen;
> +}
> +
>  /*
>   * Copy FROM file to relation.
>   */
>  uint64
>  CopyFrom(CopyState cstate)
>  {
> -    HeapTuple    tuple;
> -    TupleDesc    tupDesc;
> -    Datum       *values;
> -    bool       *nulls;
>      ResultRelInfo *resultRelInfo;
>      ResultRelInfo *target_resultRelInfo;
>      ResultRelInfo *prevResultRelInfo = NULL;
>      EState       *estate = CreateExecutorState(); /* for ExecConstraints() */
>      ModifyTableState *mtstate;
>      ExprContext *econtext;
> -    TupleTableSlot *myslot;
> +    TupleTableSlot *singleslot = NULL;
>      MemoryContext oldcontext = CurrentMemoryContext;
> -    MemoryContext batchcontext;
>  
>      PartitionTupleRouting *proute = NULL;
>      ErrorContextCallback errcallback;
>      CommandId    mycid = GetCurrentCommandId(true);
>      int            ti_options = 0; /* start with default table_insert options */
> -    BulkInsertState bistate;
> +    BulkInsertState bistate = NULL;
>      CopyInsertMethod insertMethod;
> +    CopyMultiInsertInfo multiInsertInfo;
>      uint64        processed = 0;
> -    int            nBufferedTuples = 0;
>      bool        has_before_insert_row_trig;
>      bool        has_instead_insert_row_trig;
>      bool        leafpart_use_multi_insert = false;
>  
> -#define MAX_BUFFERED_TUPLES 1000
> -#define RECHECK_MULTI_INSERT_THRESHOLD 1000
> -    HeapTuple  *bufferedTuples = NULL;    /* initialize to silence warning */
> -    Size        bufferedTuplesSize = 0;
> -    uint64        firstBufferedLineNo = 0;
> -    uint64        lastPartitionSampleLineNo = 0;
> -    uint64        nPartitionChanges = 0;
> -    double        avgTuplesPerPartChange = 0;
> -
>      Assert(cstate->rel);
>  
> +    memset(&multiInsertInfo, 0, sizeof(CopyMultiInsertInfo));
> +
>      /*
>       * The target must be a plain, foreign, or partitioned relation, or have
>       * an INSTEAD OF INSERT row trigger.  (Currently, such triggers are only
> @@ -2382,8 +2767,6 @@ CopyFrom(CopyState cstate)
>                              RelationGetRelationName(cstate->rel))));
>      }
>  
> -    tupDesc = RelationGetDescr(cstate->rel);
> -
>      /*----------
>       * Check to see if we can avoid writing WAL
>       *
> @@ -2467,8 +2850,8 @@ CopyFrom(CopyState cstate)
>          if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
>          {
>              ereport(ERROR,
> -                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
> -                    errmsg("cannot perform FREEZE on a partitioned table")));
> +                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
> +                     errmsg("cannot perform FREEZE on a partitioned table")));
>          }
>  
>          /*
> @@ -2518,10 +2901,6 @@ CopyFrom(CopyState cstate)
>  
>      ExecInitRangeTable(estate, cstate->range_table);
>  
> -    /* Set up a tuple slot too */
> -    myslot = ExecInitExtraTupleSlot(estate, tupDesc,
> -                                    &TTSOpsHeapTuple);
> -
>      /*
>       * Set up a ModifyTableState so we can let FDW(s) init themselves for
>       * foreign-table result relation(s).
> @@ -2565,10 +2944,11 @@ CopyFrom(CopyState cstate)
>                                                  &mtstate->ps);
>  
>      /*
> -     * It's more efficient to prepare a bunch of tuples for insertion, and
> -     * insert them in one heap_multi_insert() call, than call heap_insert()
> -     * separately for every tuple. However, there are a number of reasons why
> -     * we might not be able to do this.  These are explained below.
> +     * It's generally more efficient to prepare a bunch of tuples for
> +     * insertion, and insert them in one table_multi_insert() call, than call
> +     * table_insert() separately for every tuple. However, there are a number
> +     * of reasons why we might not be able to do this.  These are explained
> +     * below.
>       */
>      if (resultRelInfo->ri_TrigDesc != NULL &&
>          (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
> @@ -2589,8 +2969,8 @@ CopyFrom(CopyState cstate)
>           * For partitioned tables we can't support multi-inserts when there
>           * are any statement level insert triggers. It might be possible to
>           * allow partitioned tables with such triggers in the future, but for
> -         * now, CopyFromInsertBatch expects that any before row insert and
> -         * statement level insert triggers are on the same relation.
> +         * now, CopyMultiInsertInfo_Flush expects that any before row insert
> +         * and statement level insert triggers are on the same relation.
>           */
>          insertMethod = CIM_SINGLE;
>      }
> @@ -2622,8 +3002,7 @@ CopyFrom(CopyState cstate)
>      {
>          /*
>           * For partitioned tables, we may still be able to perform bulk
> -         * inserts for sets of consecutive tuples which belong to the same
> -         * partition.  However, the possibility of this depends on which types
> +         * inserts.  However, the possibility of this depends on which types
>           * of triggers exist on the partition.  We must disable bulk inserts
>           * if the partition is a foreign table or it has any before row insert
>           * or insert instead triggers (same as we checked above for the parent
> @@ -2632,18 +3011,27 @@ CopyFrom(CopyState cstate)
>           * have the intermediate insert method of CIM_MULTI_CONDITIONAL to
>           * flag that we must later determine if we can use bulk-inserts for
>           * the partition being inserted into.
> -         *
> -         * Normally, when performing bulk inserts we just flush the insert
> -         * buffer whenever it becomes full, but for the partitioned table
> -         * case, we flush it whenever the current tuple does not belong to the
> -         * same partition as the previous tuple.
>           */
>          if (proute)
>              insertMethod = CIM_MULTI_CONDITIONAL;
>          else
>              insertMethod = CIM_MULTI;
>  
> -        bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
> +        CopyMultiInsertInfo_Init(&multiInsertInfo, resultRelInfo,
> +                                 proute != NULL);
> +    }
> +
> +    /*
> +     * If not using batch mode (which allocates slots as needed) set up a
> +     * tuple slot too. When inserting into a partitioned table, we also need
> +     * one, even if we might batch insert, to read the tuple in the root
> +     * partition's form.
> +     */
> +    if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
> +    {
> +        singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
> +                                       &estate->es_tupleTable);
> +        bistate = GetBulkInsertState();
>      }
>  
>      has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
> @@ -2660,10 +3048,6 @@ CopyFrom(CopyState cstate)
>       */
>      ExecBSInsertTriggers(estate, resultRelInfo);
>  
> -    values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
> -    nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
> -
> -    bistate = GetBulkInsertState();
>      econtext = GetPerTupleExprContext(estate);
>  
>      /* Set up callback to identify error line number */
> @@ -2672,17 +3056,9 @@ CopyFrom(CopyState cstate)
>      errcallback.previous = error_context_stack;
>      error_context_stack = &errcallback;
>  
> -    /*
> -     * Set up memory context for batches. For cases without batching we could
> -     * use the per-tuple context, but it's simpler to just use it every time.
> -     */
> -    batchcontext = AllocSetContextCreate(CurrentMemoryContext,
> -                                         "batch context",
> -                                         ALLOCSET_DEFAULT_SIZES);
> -
>      for (;;)
>      {
> -        TupleTableSlot *slot;
> +        TupleTableSlot *myslot;
>          bool        skip_tuple;
>  
>          CHECK_FOR_INTERRUPTS();
> @@ -2693,20 +3069,33 @@ CopyFrom(CopyState cstate)
>           */
>          ResetPerTupleExprContext(estate);
>  
> +        if (insertMethod == CIM_SINGLE || proute)
> +        {
> +            myslot = singleslot;
> +            Assert(myslot != NULL);
> +        }
> +        else
> +        {
> +            Assert(resultRelInfo == target_resultRelInfo);
> +            Assert(insertMethod == CIM_MULTI);
> +
> +            myslot = CopyMultiInsertInfo_NextFreeSlot(&multiInsertInfo,
> +                                                      resultRelInfo);
> +        }
> +
>          /*
>           * Switch to per-tuple context before calling NextCopyFrom, which does
>           * evaluate default expressions etc. and requires per-tuple context.
>           */
>          MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
>  
> -        if (!NextCopyFrom(cstate, econtext, values, nulls))
> -            break;
> +        ExecClearTuple(myslot);
>  
> -        /* Switch into per-batch memory context before forming the tuple. */
> -        MemoryContextSwitchTo(batchcontext);
> +        /* Directly store the values/nulls array in the slot */
> +        if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
> +            break;
>  
> -        /* And now we can form the input tuple. */
> -        tuple = heap_form_tuple(tupDesc, values, nulls);
> +        ExecStoreVirtualTuple(myslot);
>  
>          /*
>           * Constraints might reference the tableoid column, so (re-)initialize
> @@ -2717,18 +3106,15 @@ CopyFrom(CopyState cstate)
>          /* Triggers and stuff need to be invoked in query context. */
>          MemoryContextSwitchTo(oldcontext);
>  
> -        /* Place tuple in tuple slot --- but slot shouldn't free it */
> -        slot = myslot;
> -        ExecStoreHeapTuple(tuple, slot, false);
> -
>          if (cstate->whereClause)
>          {
>              econtext->ecxt_scantuple = myslot;
> +            /* Skip items that don't match the COPY's WHERE clause */
>              if (!ExecQual(cstate->qualexpr, econtext))
>                  continue;
>          }
>  
> -        /* Determine the partition to heap_insert the tuple into */
> +        /* Determine the partition to table_insert the tuple into */
>          if (proute)
>          {
>              TupleConversionMap *map;
> @@ -2739,80 +3125,10 @@ CopyFrom(CopyState cstate)
>               * if the found partition is not suitable for INSERTs.
>               */
>              resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
> -                                              proute, slot, estate);
> +                                              proute, myslot, estate);
>  
>              if (prevResultRelInfo != resultRelInfo)
>              {
> -                /* Check if we can multi-insert into this partition */
> -                if (insertMethod == CIM_MULTI_CONDITIONAL)
> -                {
> -                    /*
> -                     * When performing bulk-inserts into partitioned tables we
> -                     * must insert the tuples seen so far to the heap whenever
> -                     * the partition changes.
> -                     */
> -                    if (nBufferedTuples > 0)
> -                    {
> -                        MemoryContext    oldcontext;
> -
> -                        CopyFromInsertBatch(cstate, estate, mycid, ti_options,
> -                                            prevResultRelInfo, myslot, bistate,
> -                                            nBufferedTuples, bufferedTuples,
> -                                            firstBufferedLineNo);
> -                        nBufferedTuples = 0;
> -                        bufferedTuplesSize = 0;
> -
> -                        /*
> -                         * The tuple is already allocated in the batch context, which
> -                         * we want to reset.  So to keep the tuple we copy it into the
> -                         * short-lived (per-tuple) context, reset the batch context
> -                         * and then copy it back into the per-batch one.
> -                         */
> -                        oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
> -                        tuple = heap_copytuple(tuple);
> -                        MemoryContextSwitchTo(oldcontext);
> -
> -                        /* cleanup the old batch */
> -                        MemoryContextReset(batchcontext);
> -
> -                        /* copy the tuple back to the per-batch context */
> -                        oldcontext = MemoryContextSwitchTo(batchcontext);
> -                        tuple = heap_copytuple(tuple);
> -                        MemoryContextSwitchTo(oldcontext);
> -
> -                        /*
> -                         * Also push the tuple copy to the slot (resetting the context
> -                         * invalidated the slot contents).
> -                         */
> -                        ExecStoreHeapTuple(tuple, slot, false);
> -                    }
> -
> -                    nPartitionChanges++;
> -
> -                    /*
> -                     * Here we adaptively enable multi-inserts based on the
> -                     * average number of tuples from recent multi-insert
> -                     * batches.  We recalculate the average every
> -                     * RECHECK_MULTI_INSERT_THRESHOLD tuples instead of taking
> -                     * the average over the whole copy.  This allows us to
> -                     * enable multi-inserts when we get periods in the copy
> -                     * stream that have tuples commonly belonging to the same
> -                     * partition, and disable when the partition is changing
> -                     * too often.
> -                     */
> -                    if (unlikely(lastPartitionSampleLineNo <= (cstate->cur_lineno -
> -                                                               RECHECK_MULTI_INSERT_THRESHOLD)
> -                                 && cstate->cur_lineno >= RECHECK_MULTI_INSERT_THRESHOLD))
> -                    {
> -                        avgTuplesPerPartChange =
> -                            (cstate->cur_lineno - lastPartitionSampleLineNo) /
> -                            (double) nPartitionChanges;
> -
> -                        lastPartitionSampleLineNo = cstate->cur_lineno;
> -                        nPartitionChanges = 0;
> -                    }
> -                }
> -
>                  /* Determine which triggers exist on this partition */
>                  has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
>                                                resultRelInfo->ri_TrigDesc->trig_insert_before_row);
> @@ -2821,22 +3137,19 @@ CopyFrom(CopyState cstate)
>                                                 resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
>  
>                  /*
> -                 * Tests have shown that using multi-inserts when the
> -                 * partition changes on every tuple slightly decreases the
> -                 * performance, however, there are benefits even when only
> -                 * some batches have just 2 tuples, so let's enable
> -                 * multi-inserts even when the average is quite low.
> +                 * Enable multi-inserts when the partition has BEFORE/INSTEAD
> +                 * OF triggers, or if the partition is a foreign partition.
>                   */
>                  leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
> -                    avgTuplesPerPartChange >= 1.3 &&
>                      !has_before_insert_row_trig &&
>                      !has_instead_insert_row_trig &&
>                      resultRelInfo->ri_FdwRoutine == NULL;
>  
> -                /*
> -                 * We'd better make the bulk insert mechanism gets a new
> -                 * buffer when the partition being inserted into changes.
> -                 */
> +                /* Set the multi-insert buffer to use for this partition. */
> +                if (leafpart_use_multi_insert)
> +                    CopyMultiInsertInfo_SetCurrentBuffer(&multiInsertInfo,
> +                                                         resultRelInfo);
> +
>                  ReleaseBulkInsertStatePin(bistate);
>                  prevResultRelInfo = resultRelInfo;
>              }
> @@ -2879,26 +3192,48 @@ CopyFrom(CopyState cstate)
>               * rowtype.
>               */
>              map = resultRelInfo->ri_PartitionInfo->pi_RootToPartitionMap;
> -            if (map != NULL)
> +            if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
>              {
> -                TupleTableSlot *new_slot;
> -                MemoryContext oldcontext;
> -
> -                new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
> -                Assert(new_slot != NULL);
> -
> -                slot = execute_attr_map_slot(map->attrMap, slot, new_slot);
> +                /* non batch insert */
> +                if (map != NULL)
> +                {
> +                    TupleTableSlot *new_slot;
>  
> +                    new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
> +                    myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
> +                }
> +            }
> +            else
> +            {
>                  /*
> -                 * Get the tuple in the per-batch context, so that it will be
> -                 * freed after each batch insert.
> +                 * Batch insert into partitioned table.
>                   */
> -                oldcontext = MemoryContextSwitchTo(batchcontext);
> -                tuple = ExecCopySlotHeapTuple(slot);
> -                MemoryContextSwitchTo(oldcontext);
> +                TupleTableSlot *nextslot;
> +
> +                /* no other path available for partitioned table */
> +                Assert(insertMethod == CIM_MULTI_CONDITIONAL);
> +
> +                nextslot = CopyMultiInsertInfo_NextFreeSlot(&multiInsertInfo,
> +                                                            resultRelInfo);
> +
> +                if (map != NULL)
> +                    myslot = execute_attr_map_slot(map->attrMap, myslot, nextslot);
> +                else
> +                {
> +                    /*
> +                     * This looks more expensive than it is (Believe me, I
> +                     * optimized it away. Twice). The input is in virtual
> +                     * form, and we'll materialize the slot below - for most
> +                     * slot types the copy performs the work materialization
> +                     * would later require anyway.
> +                     */
> +                    ExecCopySlot(nextslot, myslot);
> +                    myslot = nextslot;
> +                }
>              }
>  
> -            slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
> +            /* ensure that triggers etc see the right relation  */
> +            myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
>          }
>  
>          skip_tuple = false;
> @@ -2906,7 +3241,7 @@ CopyFrom(CopyState cstate)
>          /* BEFORE ROW INSERT Triggers */
>          if (has_before_insert_row_trig)
>          {
> -            if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
> +            if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
>                  skip_tuple = true;    /* "do nothing" */
>          }
>  
> @@ -2919,7 +3254,7 @@ CopyFrom(CopyState cstate)
>               */
>              if (has_instead_insert_row_trig)
>              {
> -                ExecIRInsertTriggers(estate, resultRelInfo, slot);
> +                ExecIRInsertTriggers(estate, resultRelInfo, myslot);
>              }
>              else
>              {
> @@ -2931,12 +3266,7 @@ CopyFrom(CopyState cstate)
>                   */
>                  if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
>                      resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
> -                {
> -                    ExecComputeStoredGenerated(estate, slot);
> -                    MemoryContextSwitchTo(batchcontext);
> -                    tuple = ExecCopySlotHeapTuple(slot);
> -                    MemoryContextSwitchTo(oldcontext);
> -                }
> +                    ExecComputeStoredGenerated(estate, myslot);
>  
>                  /*
>                   * If the target is a plain table, check the constraints of
> @@ -2944,7 +3274,7 @@ CopyFrom(CopyState cstate)
>                   */
>                  if (resultRelInfo->ri_FdwRoutine == NULL &&
>                      resultRelInfo->ri_RelationDesc->rd_att->constr)
> -                    ExecConstraints(resultRelInfo, slot, estate);
> +                    ExecConstraints(resultRelInfo, myslot, estate);
>  
>                  /*
>                   * Also check the tuple against the partition constraint, if
> @@ -2954,7 +3284,7 @@ CopyFrom(CopyState cstate)
>                   */
>                  if (resultRelInfo->ri_PartitionCheck &&
>                      (proute == NULL || has_before_insert_row_trig))
> -                    ExecPartitionCheck(resultRelInfo, slot, estate, true);
> +                    ExecPartitionCheck(resultRelInfo, myslot, estate, true);
>  
>                  /*
>                   * Perform multi-inserts when enabled, or when loading a
> @@ -2963,31 +3293,21 @@ CopyFrom(CopyState cstate)
>                   */
>                  if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
>                  {
> -                    /* Add this tuple to the tuple buffer */
> -                    if (nBufferedTuples == 0)
> -                        firstBufferedLineNo = cstate->cur_lineno;
> -                    bufferedTuples[nBufferedTuples++] = tuple;
> -                    bufferedTuplesSize += tuple->t_len;
> -
>                      /*
> -                     * If the buffer filled up, flush it.  Also flush if the
> -                     * total size of all the tuples in the buffer becomes
> -                     * large, to avoid using large amounts of memory for the
> -                     * buffer when the tuples are exceptionally wide.
> +                     * The slot previously might point into the per-tuple
> +                     * context. For batching it needs to be longer lived.
>                       */
> -                    if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
> -                        bufferedTuplesSize > 65535)
> -                    {
> -                        CopyFromInsertBatch(cstate, estate, mycid, ti_options,
> -                                            resultRelInfo, myslot, bistate,
> -                                            nBufferedTuples, bufferedTuples,
> -                                            firstBufferedLineNo);
> -                        nBufferedTuples = 0;
> -                        bufferedTuplesSize = 0;
> -
> -                        /* free memory occupied by tuples from the batch */
> -                        MemoryContextReset(batchcontext);
> -                    }
> +                    ExecMaterializeSlot(myslot);
> +
> +                    /* Add this tuple to the tuple buffer */
> +                    CopyMultiInsertInfo_Store(&multiInsertInfo, myslot,
> +                                              cstate->line_buf.len,
> +                                              cstate->cur_lineno);
> +
> +                    /* If the buffer filled up, flush it. */
> +                    if (CopyMultiInsertInfo_IsFull(&multiInsertInfo))
> +                        CopyMultiInsertInfo_Flush(&multiInsertInfo, cstate,
> +                                                  estate, mycid, ti_options);
>                  }
>                  else
>                  {
> @@ -2996,12 +3316,12 @@ CopyFrom(CopyState cstate)
>                      /* OK, store the tuple */
>                      if (resultRelInfo->ri_FdwRoutine != NULL)
>                      {
> -                        slot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
> -                                                                               resultRelInfo,
> -                                                                               slot,
> -                                                                               NULL);
> +                        myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
> +                                                                                 resultRelInfo,
> +                                                                                 myslot,
> +                                                                                 NULL);
>  
> -                        if (slot == NULL)    /* "do nothing" */
> +                        if (myslot == NULL) /* "do nothing" */
>                              continue;    /* next tuple please */
>  
>                          /*
> @@ -3009,27 +3329,26 @@ CopyFrom(CopyState cstate)
>                           * column, so (re-)initialize tts_tableOid before
>                           * evaluating them.
>                           */
> -                        slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
> +                        myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
>                      }
>                      else
>                      {
> -                        tuple = ExecFetchSlotHeapTuple(slot, true, NULL);
> -                        heap_insert(resultRelInfo->ri_RelationDesc, tuple,
> -                                    mycid, ti_options, bistate);
> -                        ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
> -                        slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
> +                        /* OK, store the tuple and create index entries for it */
> +                        table_insert(resultRelInfo->ri_RelationDesc, myslot,
> +                                     mycid, ti_options, bistate);
>                      }
>  
> +
>                      /* And create index entries for it */
>                      if (resultRelInfo->ri_NumIndices > 0)
> -                        recheckIndexes = ExecInsertIndexTuples(slot,
> +                        recheckIndexes = ExecInsertIndexTuples(myslot,
>                                                                 estate,
>                                                                 false,
>                                                                 NULL,
>                                                                 NIL);
>  
>                      /* AFTER ROW INSERT Triggers */
> -                    ExecARInsertTriggers(estate, resultRelInfo, slot,
> +                    ExecARInsertTriggers(estate, resultRelInfo, myslot,
>                                           recheckIndexes, cstate->transition_capture);
>  
>                      list_free(recheckIndexes);
> @@ -3045,32 +3364,25 @@ CopyFrom(CopyState cstate)
>          }
>      }
>  
> -    /* Flush any remaining buffered tuples */
> -    if (nBufferedTuples > 0)
> +    if (insertMethod != CIM_SINGLE)
>      {
> -        if (insertMethod == CIM_MULTI_CONDITIONAL)
> -        {
> -            CopyFromInsertBatch(cstate, estate, mycid, ti_options,
> -                                prevResultRelInfo, myslot, bistate,
> -                                nBufferedTuples, bufferedTuples,
> -                                firstBufferedLineNo);
> -        }
> -        else
> -            CopyFromInsertBatch(cstate, estate, mycid, ti_options,
> -                                resultRelInfo, myslot, bistate,
> -                                nBufferedTuples, bufferedTuples,
> -                                firstBufferedLineNo);
> +        /* Flush any remaining buffered tuples */
> +        if (!CopyMultiInsertInfo_IsEmpty(&multiInsertInfo))
> +            CopyMultiInsertInfo_Flush(&multiInsertInfo, cstate, estate, mycid,
> +                                      ti_options);
> +
> +        /* Tear down the multi-insert data */
> +        CopyMultiInsertInfo_Cleanup(&multiInsertInfo);
>      }
>  
>      /* Done, clean up */
>      error_context_stack = errcallback.previous;
>  
> -    FreeBulkInsertState(bistate);
> +    if (bistate != NULL)
> +        ReleaseBulkInsertStatePin(bistate);
>  
>      MemoryContextSwitchTo(oldcontext);
>  
> -    MemoryContextDelete(batchcontext);
> -
>      /*
>       * In the old protocol, tell pqcomm that we can process normal protocol
>       * messages again.
> @@ -3084,9 +3396,6 @@ CopyFrom(CopyState cstate)
>      /* Handle queued AFTER triggers */
>      AfterTriggerEndQuery(estate);
>  
> -    pfree(values);
> -    pfree(nulls);
> -
>      ExecResetTupleTable(estate->es_tupleTable, false);
>  
>      /* Allow the FDW to shut down */
> @@ -3111,88 +3420,6 @@ CopyFrom(CopyState cstate)
>      return processed;
>  }
>  
> -/*
> - * A subroutine of CopyFrom, to write the current batch of buffered heap
> - * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
> - * triggers.
> - */
> -static void
> -CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
> -                    int ti_options, ResultRelInfo *resultRelInfo,
> -                    TupleTableSlot *myslot, BulkInsertState bistate,
> -                    int nBufferedTuples, HeapTuple *bufferedTuples,
> -                    uint64 firstBufferedLineNo)
> -{
> -    MemoryContext oldcontext;
> -    int            i;
> -    uint64        save_cur_lineno;
> -    bool        line_buf_valid = cstate->line_buf_valid;
> -
> -    /*
> -     * Print error context information correctly, if one of the operations
> -     * below fail.
> -     */
> -    cstate->line_buf_valid = false;
> -    save_cur_lineno = cstate->cur_lineno;
> -
> -    /*
> -     * heap_multi_insert leaks memory, so switch to short-lived memory context
> -     * before calling it.
> -     */
> -    oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
> -    heap_multi_insert(resultRelInfo->ri_RelationDesc,
> -                      bufferedTuples,
> -                      nBufferedTuples,
> -                      mycid,
> -                      ti_options,
> -                      bistate);
> -    MemoryContextSwitchTo(oldcontext);
> -
> -    /*
> -     * If there are any indexes, update them for all the inserted tuples, and
> -     * run AFTER ROW INSERT triggers.
> -     */
> -    if (resultRelInfo->ri_NumIndices > 0)
> -    {
> -        for (i = 0; i < nBufferedTuples; i++)
> -        {
> -            List       *recheckIndexes;
> -
> -            cstate->cur_lineno = firstBufferedLineNo + i;
> -            ExecStoreHeapTuple(bufferedTuples[i], myslot, false);
> -            recheckIndexes =
> -                ExecInsertIndexTuples(myslot,
> -                                      estate, false, NULL, NIL);
> -            ExecARInsertTriggers(estate, resultRelInfo,
> -                                 myslot,
> -                                 recheckIndexes, cstate->transition_capture);
> -            list_free(recheckIndexes);
> -        }
> -    }
> -
> -    /*
> -     * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
> -     * anyway.
> -     */
> -    else if (resultRelInfo->ri_TrigDesc != NULL &&
> -             (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
> -              resultRelInfo->ri_TrigDesc->trig_insert_new_table))
> -    {
> -        for (i = 0; i < nBufferedTuples; i++)
> -        {
> -            cstate->cur_lineno = firstBufferedLineNo + i;
> -            ExecStoreHeapTuple(bufferedTuples[i], myslot, false);
> -            ExecARInsertTriggers(estate, resultRelInfo,
> -                                 myslot,
> -                                 NIL, cstate->transition_capture);
> -        }
> -    }
> -
> -    /* reset cur_lineno and line_buf_valid to what they were */
> -    cstate->line_buf_valid = line_buf_valid;
> -    cstate->cur_lineno = save_cur_lineno;
> -}
> -
>  /*
>   * Setup to read tuples from a file for COPY FROM.
>   *
> @@ -4990,11 +5217,8 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
>      DR_copy    *myState = (DR_copy *) self;
>      CopyState    cstate = myState->cstate;
>  
> -    /* Make sure the tuple is fully deconstructed */
> -    slot_getallattrs(slot);
> -
> -    /* And send the data */
> -    CopyOneRowTo(cstate, slot->tts_values, slot->tts_isnull);
> +    /* Send the data */
> +    CopyOneRowTo(cstate, slot);
>      myState->processed++;
>  
>      return true;
> diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
> index 4c077755d5..ed0e2de144 100644
> --- a/src/include/access/heapam.h
> +++ b/src/include/access/heapam.h
> @@ -36,6 +36,7 @@
>  #define HEAP_INSERT_SPECULATIVE 0x0010
>  
>  typedef struct BulkInsertStateData *BulkInsertState;
> +struct TupleTableSlot;
>  
>  #define MaxLockTupleMode    LockTupleExclusive
>  
> @@ -143,7 +144,7 @@ extern void ReleaseBulkInsertStatePin(BulkInsertState bistate);
>  
>  extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
>              int options, BulkInsertState bistate);
> -extern void heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
> +extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, int ntuples,
>                    CommandId cid, int options, BulkInsertState bistate);
>  extern TM_Result heap_delete(Relation relation, ItemPointer tid,
>              CommandId cid, Snapshot crosscheck, bool wait,
> diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
> index 4efe178ed1..c2fdedc551 100644
> --- a/src/include/access/tableam.h
> +++ b/src/include/access/tableam.h
> @@ -328,6 +328,9 @@ typedef struct TableAmRoutine
>       * ------------------------------------------------------------------------
>       */
>  
> +    void        (*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots,
> +                                 CommandId cid, int options, struct BulkInsertStateData *bistate);
> +
>      /* see table_insert() for reference about parameters */
>      void        (*tuple_insert) (Relation rel, TupleTableSlot *slot,
>                                   CommandId cid, int options,
> @@ -1157,6 +1160,17 @@ table_update(Relation rel, ItemPointer otid, TupleTableSlot *slot,
>                                           lockmode, update_indexes);
>  }
>  
> +/*
> + *    table_multi_insert    - insert multiple tuples into a table
> + */
> +static inline void
> +table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
> +                   CommandId cid, int options, struct BulkInsertStateData *bistate)
> +{
> +    rel->rd_tableam->multi_insert(rel, slots, nslots,
> +                                  cid, options, bistate);
> +}
> +
>  /*
>   * Lock a tuple in the specified mode.
>   *

Greetings,

Andres Freund



