Re: COPY FROM WHEN condition - Mailing list pgsql-hackers

From Andres Freund
Subject Re: COPY FROM WHEN condition
Msg-id 20190402175753.ig3gfyws4wdurmrm@alap3.anarazel.de
In response to Re: COPY FROM WHEN condition  (David Rowley <david.rowley@2ndquadrant.com>)
Responses Re: COPY FROM WHEN condition
List pgsql-hackers
On 2019-04-03 06:41:49 +1300, David Rowley wrote:
> However, I've ended up not doing it that way as the patch requires
> more than just an array of TupleTableSlots to be stored in the
> ResultRelInfo, it'll pretty much need all of what I have in
> CopyMultiInsertBuffer, which includes line numbers for error
> reporting, a BulkInsertState and a counter to track how many of the
> slots are used.  I had thoughts that we could just tag the whole
> CopyMultiInsertBuffer in ResultRelInfo, but that requires moving it
> somewhere a bit more generic. Another thought would be that we have
> something like "void *extra;" in ResultRelInfo that we can just borrow
> for copy.c.  It may be interesting to try this out to see if it saves
> much in the way of performance.

Hm, we could just forward declare struct CopyMultiInsertBuffer in a
header, and only define it in copy.c. That doesn't sound insane to me.
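Something like this, i.e. the header only ever sees an opaque type and code
outside copy.c just passes around a pointer (the header picked here and the
member name are only illustrations, not a concrete proposal):

/* e.g. in nodes/execnodes.h */
struct CopyMultiInsertBuffer;   /* defined only in commands/copy.c */

/* e.g. a member inside ResultRelInfo, name invented here */
struct CopyMultiInsertBuffer *ri_CopyMultiInsertBuffer;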


> I've got the patch into a working state now and wrote a bunch of
> comments. I've not really done a thorough read back of the code again.
> I normally wait until the light is coming from a different angle
> before doing that, but there does not seem to be much time to wait for
> that in this case, so here's v2.  Performance seems to be about the
> same as what I reported yesterday.

Cool.


> +/*
> + * Multi-Insert handling code for COPY FROM.  Inserts are performed in
> + * batches of up to MAX_BUFFERED_TUPLES.

This should probably be moved to the top of the file.


> + * When COPY FROM is used on a partitioned table, we attempt to maintain
> + * only MAX_PARTITION_BUFFERS buffers at once.  Buffers that are completely
> + * unused in each batch are removed so that we end up not keeping buffers for
> + * partitions that we might not insert into again.
> + */
> +#define MAX_BUFFERED_TUPLES        1000
> +#define MAX_BUFFERED_BYTES        65535
> +#define MAX_PARTITION_BUFFERS    15

> +/*
> + * CopyMultiInsertBuffer
> + *        Stores multi-insert data related to a single relation in CopyFrom.
> + */
> +typedef struct

Please don't create anonymous structs - they can't be forward declared,
and some debugging tools handle them worse than named structs.  Naming
the struct also makes spelling out CopyMultiInsertBuffer in the comment
less necessary.


> +{
> +    Oid            relid;            /* Relation ID this insert data is for */
> +    TupleTableSlot **slots;        /* Array of MAX_BUFFERED_TUPLES to store
> +                                 * tuples */
> +    uint64       *linenos;        /* Line # of tuple in copy stream */

It could make sense to allocate those two together, or even as part of
the CopyMultiInsertBuffer itself, to reduce allocator overhead.
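For the latter, roughly (sketch only, members as in the patch; this also
gives the struct a name, per the previous point):

struct CopyMultiInsertBuffer
{
    Oid         relid;          /* Relation ID this insert data is for */
    TupleTableSlot *slots[MAX_BUFFERED_TUPLES];     /* buffered tuples */
    uint64      linenos[MAX_BUFFERED_TUPLES];       /* line # of each tuple */
    ResultRelInfo *resultRelInfo;
    BulkInsertState bistate;
    int         nused;          /* number of 'slots' currently in use */
};

That way a single palloc covers the buffer and both per-tuple arrays.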


> +    else
> +    {
> +        CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
> +
> +        /* Non-partitioned table.  Just setup the buffer for the table. */
> +        buffer->relid = RelationGetRelid(rri->ri_RelationDesc);
> +        buffer->slots = palloc0(MAX_BUFFERED_TUPLES * sizeof(TupleTableSlot *));
> +        buffer->linenos = palloc(MAX_BUFFERED_TUPLES * sizeof(uint64));
> +        buffer->resultRelInfo = rri;
> +        buffer->bistate = GetBulkInsertState();
> +        buffer->nused = 0;
> +        miinfo->multiInsertBufferTab = NULL;
> +        miinfo->buffer = buffer;
> +        miinfo->nbuffers = 1;
> +    }

Can this be moved into a helper function?
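E.g. something along these lines (the name and signature are just an
illustration):

static CopyMultiInsertBuffer *
CopyMultiInsertBuffer_Init(ResultRelInfo *rri)
{
    CopyMultiInsertBuffer *buffer;

    buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
    buffer->relid = RelationGetRelid(rri->ri_RelationDesc);
    buffer->slots = palloc0(MAX_BUFFERED_TUPLES * sizeof(TupleTableSlot *));
    buffer->linenos = palloc(MAX_BUFFERED_TUPLES * sizeof(uint64));
    buffer->resultRelInfo = rri;
    buffer->bistate = GetBulkInsertState();
    buffer->nused = 0;

    return buffer;
}

Then the non-partitioned branch above shrinks to the miinfo bookkeeping plus
one call, and presumably the partitioned path could reuse it too.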


> +    /*
> +     * heap_multi_insert leaks memory, so switch to short-lived memory context
> +     * before calling it.
> +     */

s/heap_multi_insert/table_multi_insert/



> +    /*
> +     * If there are any indexes, update them for all the inserted tuples, and
> +     * run AFTER ROW INSERT triggers.
> +     */
> +    if (resultRelInfo->ri_NumIndices > 0)
> +    {
> +        for (i = 0; i < nBufferedTuples; i++)
> +        {
> +            List       *recheckIndexes;
> +
> +            cstate->cur_lineno = buffer->linenos[i];
> +            recheckIndexes =
> +                ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL,
> +                                      NIL);
> +            ExecARInsertTriggers(estate, resultRelInfo,
> +                                 buffer->slots[i],
> +                                 recheckIndexes, cstate->transition_capture);
> +            list_free(recheckIndexes);
> +        }
> +    }
> +
> +    /*
> +     * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
> +     * anyway.
> +     */
> +    else if (resultRelInfo->ri_TrigDesc != NULL &&
> +             (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
> +              resultRelInfo->ri_TrigDesc->trig_insert_new_table))
> +    {
> +        for (i = 0; i < nBufferedTuples; i++)
> +        {
> +            cstate->cur_lineno = buffer->linenos[i];
> +            ExecARInsertTriggers(estate, resultRelInfo,
> +                                 bufferedSlots[i],
> +                                 NIL, cstate->transition_capture);
> +        }
> +    }

> +    for (i = 0; i < nBufferedTuples; i++)
> +        ExecClearTuple(bufferedSlots[i]);

I wonder about combining these loops somehow. But it's probably ok.
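For what it's worth, a single loop could look roughly like this (sketch only,
and whether it's actually more readable is debatable):

    bool        has_indexes = (resultRelInfo->ri_NumIndices > 0);
    bool        has_ar_triggers = (resultRelInfo->ri_TrigDesc != NULL &&
                                   (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
                                    resultRelInfo->ri_TrigDesc->trig_insert_new_table));

    for (i = 0; i < nBufferedTuples; i++)
    {
        TupleTableSlot *slot = buffer->slots[i];
        List       *recheckIndexes = NIL;

        cstate->cur_lineno = buffer->linenos[i];

        if (has_indexes)
            recheckIndexes = ExecInsertIndexTuples(slot, estate, false,
                                                   NULL, NIL);

        if (has_indexes || has_ar_triggers)
            ExecARInsertTriggers(estate, resultRelInfo, slot,
                                 recheckIndexes, cstate->transition_capture);

        list_free(recheckIndexes);
        ExecClearTuple(slot);
    }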


> +/*
> + * CopyMultiInsertBuffer_Cleanup
> + *        Drop used slots and free member for this buffer.  The buffer
> + *        must be flushed before cleanup.
> + */
> +static inline void
> +CopyMultiInsertBuffer_Cleanup(CopyMultiInsertBuffer *buffer)
> +{
> +    int            i;
> +
> +    ReleaseBulkInsertStatePin(buffer->bistate);

Shouldn't this FreeBulkInsertState() rather than
ReleaseBulkInsertStatePin()?
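I.e., assuming nothing else needs the BulkInsertState afterwards:

    /* releases the buffer pin and frees the BulkInsertState itself */
    FreeBulkInsertState(buffer->bistate);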


> +
> +/*
> + * CopyMultiInsertBuffer_RemoveBuffer
> + *        Remove a buffer from being tracked by miinfo
> + */
> +static inline void
> +CopyMultiInsertBuffer_RemoveBuffer(CopyMultiInsertInfo *miinfo,
> +                                   CopyMultiInsertBuffer *buffer)
> +{
> +    Oid            relid = buffer->relid;
> +
> +    CopyMultiInsertBuffer_Cleanup(buffer);
> +
> +    hash_search(miinfo->multiInsertBufferTab, (void *) &relid, HASH_REMOVE,
> +                NULL);
> +    miinfo->nbuffers--;

Aren't we leaking the CopyMultiInsertBuffer itself here?
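Assuming the hash entry just points at the separately palloc'd buffer (as the
non-partitioned path above would suggest), something like this would plug it:

    CopyMultiInsertBuffer_Cleanup(buffer);

    hash_search(miinfo->multiInsertBufferTab, (void *) &relid, HASH_REMOVE,
                NULL);
    miinfo->nbuffers--;

    /* the hash entry is gone, but the buffer allocation itself remains */
    pfree(buffer);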



> +/*
> + * CopyMultiInsertInfo_Flush
> + *        Write out all stored tuples in all buffers to the tables.
> + *
> + * To save us from ending up with buffers for 1000s of partitions we remove
> + * buffers belonging to partitions that we've seen no tuples for in this batch

That seems a little naive (imagine you have like 5 partitions, and we
constantly cycle through 2-3 of them per batch).  It's probably OK for
this version.  I guess I'd only do that cleanup if we're actually
flushing due to the number of partitions.
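Roughly the following shape, although the extra parameter, the helper names
and the assumption that the hash entries are the buffers themselves are all
guesses on my part:

static void
CopyMultiInsertInfo_Flush(CopyMultiInsertInfo *miinfo, CopyState cstate,
                          EState *estate, bool hit_partition_limit)
{
    HASH_SEQ_STATUS status;
    CopyMultiInsertBuffer *buffer;

    hash_seq_init(&status, miinfo->multiInsertBufferTab);
    while ((buffer = (CopyMultiInsertBuffer *) hash_seq_search(&status)) != NULL)
    {
        if (buffer->nused > 0)
            CopyMultiInsertBuffer_Flush(miinfo, cstate, estate, buffer);
        else if (hit_partition_limit)
            CopyMultiInsertBuffer_RemoveBuffer(miinfo, buffer);
    }
}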



>          if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
>          {
>              ereport(ERROR,
> -                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
> -                    errmsg("cannot perform FREEZE on a partitioned table")));
> +                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
> +                     errmsg("cannot perform FREEZE on a partitioned table")));
>          }

accidental hunk?

Greetings,

Andres Freund


