From d83bf5bc0bfc5f45d85e38a876a7db94f12803da Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Tue, 23 Apr 2024 05:18:58 +0000 Subject: [PATCH v19 5/6] Use new multi insert Table AM for COPY FROM --- src/backend/commands/copyfrom.c | 230 +++++++++++++++-------- src/include/commands/copyfrom_internal.h | 4 +- src/tools/pgindent/typedefs.list | 1 + 3 files changed, 153 insertions(+), 82 deletions(-) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index ce4d62e707..8572c5b730 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -71,13 +71,21 @@ /* Trim the list of buffers back down to this number after flushing */ #define MAX_PARTITION_BUFFERS 32 +typedef struct CopyModifyBufferFlushContext +{ + CopyFromState cstate; + ResultRelInfo *resultRelInfo; + EState *estate; +} CopyModifyBufferFlushContext; + /* Stores multi-insert data related to a single relation in CopyFrom. */ typedef struct CopyMultiInsertBuffer { - TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */ + TableModifyState *mstate; /* Table insert state; NULL if foreign table */ + TupleTableSlot **slots; /* Array to store tuples */ ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */ - BulkInsertState bistate; /* BulkInsertState for this rel if plain - * table; NULL if foreign table */ + TupleTableSlot *multislot; + CopyModifyBufferFlushContext *modify_buffer_flush_context; int nused; /* number of 'slots' containing tuples */ uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy * stream */ @@ -99,6 +107,7 @@ typedef struct CopyMultiInsertInfo int ti_options; /* table insert options */ } CopyMultiInsertInfo; +static void CopyModifyBufferFlushCallback(void *context, TupleTableSlot **slots, int nslots); /* non-export function prototypes */ static void ClosePipeFromProgram(CopyFromState cstate); @@ -218,14 +227,38 @@ CopyLimitPrintoutLength(const char *str) * ResultRelInfo. */ static CopyMultiInsertBuffer * -CopyMultiInsertBufferInit(ResultRelInfo *rri) +CopyMultiInsertBufferInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, + CopyFromState cstate, EState *estate) { CopyMultiInsertBuffer *buffer; buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer)); - memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); + + if (rri->ri_FdwRoutine == NULL) + { + buffer->modify_buffer_flush_context = (CopyModifyBufferFlushContext *) palloc(sizeof(CopyModifyBufferFlushContext)); + buffer->modify_buffer_flush_context->cstate = cstate; + buffer->modify_buffer_flush_context->resultRelInfo = rri; + buffer->modify_buffer_flush_context->estate = estate; + + buffer->mstate = table_modify_begin(rri->ri_RelationDesc, + TM_FLAG_MULTI_INSERTS | + TM_FLAG_BAS_BULKWRITE, + miinfo->mycid, + miinfo->ti_options, + CopyModifyBufferFlushCallback, + buffer->modify_buffer_flush_context); + buffer->slots = NULL; + buffer->multislot = NULL; + } + else + { + buffer->mstate = NULL; + buffer->slots = palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); + buffer->multislot = NULL; + } + buffer->resultRelInfo = rri; - buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL; buffer->nused = 0; return buffer; @@ -236,11 +269,12 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri) */ static inline void CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, - ResultRelInfo *rri) + ResultRelInfo *rri, CopyFromState cstate, + EState *estate) { CopyMultiInsertBuffer *buffer; - buffer = CopyMultiInsertBufferInit(rri); + buffer = CopyMultiInsertBufferInit(miinfo, rri, cstate, estate); /* Setup back-link so we can easily find this buffer again */ rri->ri_CopyMultiInsertBuffer = buffer; @@ -273,7 +307,7 @@ CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, * tuples their way for the first time. */ if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) - CopyMultiInsertInfoSetupBuffer(miinfo, rri); + CopyMultiInsertInfoSetupBuffer(miinfo, rri, cstate, estate); } /* @@ -317,8 +351,6 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, int batch_size = resultRelInfo->ri_BatchSize; int sent = 0; - Assert(buffer->bistate == NULL); - /* Ensure that the FDW supports batching and it's enabled */ Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert); Assert(batch_size > 1); @@ -390,13 +422,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, } else { - CommandId mycid = miinfo->mycid; - int ti_options = miinfo->ti_options; bool line_buf_valid = cstate->line_buf_valid; uint64 save_cur_lineno = cstate->cur_lineno; - MemoryContext oldcontext; - - Assert(buffer->bistate != NULL); /* * Print error context information correctly, if one of the operations @@ -404,56 +431,12 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, */ cstate->line_buf_valid = false; + table_modify_buffer_flush(buffer->mstate); + /* - * table_multi_insert may leak memory, so switch to short-lived memory - * context before calling it. + * Indexes are updated and AFTER ROW INSERT triggers (if any) are run + * in the flush callback CopyModifyBufferFlushCallback. */ - oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - table_multi_insert(resultRelInfo->ri_RelationDesc, - slots, - nused, - mycid, - ti_options, - buffer->bistate); - MemoryContextSwitchTo(oldcontext); - - for (i = 0; i < nused; i++) - { - /* - * If there are any indexes, update them for all the inserted - * tuples, and run AFTER ROW INSERT triggers. - */ - if (resultRelInfo->ri_NumIndices > 0) - { - List *recheckIndexes; - - cstate->cur_lineno = buffer->linenos[i]; - recheckIndexes = - ExecInsertIndexTuples(resultRelInfo, - buffer->slots[i], estate, false, - false, NULL, NIL, false); - ExecARInsertTriggers(estate, resultRelInfo, - slots[i], recheckIndexes, - cstate->transition_capture); - list_free(recheckIndexes); - } - - /* - * There's no indexes, but see if we need to run AFTER ROW INSERT - * triggers anyway. - */ - else if (resultRelInfo->ri_TrigDesc != NULL && - (resultRelInfo->ri_TrigDesc->trig_insert_after_row || - resultRelInfo->ri_TrigDesc->trig_insert_new_table)) - { - cstate->cur_lineno = buffer->linenos[i]; - ExecARInsertTriggers(estate, resultRelInfo, - slots[i], NIL, - cstate->transition_capture); - } - - ExecClearTuple(slots[i]); - } /* Update the row counter and progress of the COPY command */ *processed += nused; @@ -469,6 +452,64 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, buffer->nused = 0; } +static void +CopyModifyBufferFlushCallback(void *context, TupleTableSlot **slots, int nslots) +{ + CopyModifyBufferFlushContext *ctx = (CopyModifyBufferFlushContext *) context; + CopyFromState cstate = ctx->cstate; + ResultRelInfo *resultRelInfo = ctx->resultRelInfo; + EState *estate = ctx->estate; + CopyMultiInsertBuffer *buffer = resultRelInfo->ri_CopyMultiInsertBuffer; + int i; + + if (nslots <= 0) + return; + + /* Quick exit if no indexes or no triggers */ + if (!(resultRelInfo->ri_NumIndices > 0 || + (resultRelInfo->ri_TrigDesc != NULL && + (resultRelInfo->ri_TrigDesc->trig_insert_after_row || + resultRelInfo->ri_TrigDesc->trig_insert_new_table)))) + return; + + /* Caller must take care of opening and closing the indices */ + for (i = 0; i < nslots; i++) + { + /* + * If there are any indexes, update them for all the inserted tuples, + * and run AFTER ROW INSERT triggers. + */ + if (resultRelInfo->ri_NumIndices > 0) + { + List *recheckIndexes; + + cstate->cur_lineno = buffer->linenos[i]; + recheckIndexes = + ExecInsertIndexTuples(resultRelInfo, + slots[i], estate, false, + false, NULL, NIL, false); + ExecARInsertTriggers(estate, resultRelInfo, + slots[i], recheckIndexes, + cstate->transition_capture); + list_free(recheckIndexes); + } + + /* + * There's no indexes, but see if we need to run AFTER ROW INSERT + * triggers anyway. + */ + else if (resultRelInfo->ri_TrigDesc != NULL && + (resultRelInfo->ri_TrigDesc->trig_insert_after_row || + resultRelInfo->ri_TrigDesc->trig_insert_new_table)) + { + cstate->cur_lineno = buffer->linenos[i]; + ExecARInsertTriggers(estate, resultRelInfo, + slots[i], NIL, + cstate->transition_capture); + } + } +} + /* * Drop used slots and free member for this buffer. * @@ -489,19 +530,18 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, if (resultRelInfo->ri_FdwRoutine == NULL) { - Assert(buffer->bistate != NULL); - FreeBulkInsertState(buffer->bistate); + table_modify_end(buffer->mstate); + ExecDropSingleTupleTableSlot(buffer->multislot); + pfree(buffer->modify_buffer_flush_context); } else - Assert(buffer->bistate == NULL); - - /* Since we only create slots on demand, just drop the non-null ones. */ - for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) - ExecDropSingleTupleTableSlot(buffer->slots[i]); + { + /* Since we only create slots on demand, just drop the non-null ones. */ + for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) + ExecDropSingleTupleTableSlot(buffer->slots[i]); - if (resultRelInfo->ri_FdwRoutine == NULL) - table_finish_bulk_insert(resultRelInfo->ri_RelationDesc, - miinfo->ti_options); + pfree(buffer->slots); + } pfree(buffer); } @@ -588,13 +628,32 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo, { CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; int nused = buffer->nused; + TupleTableSlot *slot; Assert(buffer != NULL); Assert(nused < MAX_BUFFERED_TUPLES); - if (buffer->slots[nused] == NULL) - buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL); - return buffer->slots[nused]; + if (rri->ri_FdwRoutine == NULL) + { + if (buffer->multislot == NULL) + buffer->multislot = MakeTupleTableSlot(RelationGetDescr(rri->ri_RelationDesc), + &TTSOpsVirtual); + + /* Caller must clear the slot */ + slot = buffer->multislot; + } + else + { + if (buffer->slots[nused] == NULL) + { + slot = table_slot_create(rri->ri_RelationDesc, NULL); + buffer->slots[nused] = slot; + } + else + slot = buffer->slots[nused]; + } + + return slot; } /* @@ -608,7 +667,17 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; Assert(buffer != NULL); - Assert(slot == buffer->slots[buffer->nused]); + + if (rri->ri_FdwRoutine == NULL) + { + Assert(slot == buffer->multislot); + table_modify_buffer_insert(buffer->mstate, slot); + } + +#ifdef USE_ASSERT_CHECKING + if (rri->ri_FdwRoutine != NULL) + Assert(slot == buffer->slots[buffer->nused]); +#endif /* Store the line number so we can properly report any errors later */ buffer->linenos[buffer->nused] = lineno; @@ -830,7 +899,7 @@ CopyFrom(CopyFromState cstate) /* * It's generally more efficient to prepare a bunch of tuples for * insertion, and insert them in one - * table_multi_insert()/ExecForeignBatchInsert() call, than call + * table_modify_buffer_insert()/ExecForeignBatchInsert() call, than call * table_tuple_insert()/ExecForeignInsert() separately for every tuple. * However, there are a number of reasons why we might not be able to do * this. These are explained below. @@ -1080,7 +1149,8 @@ CopyFrom(CopyFromState cstate) { if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL) CopyMultiInsertInfoSetupBuffer(&multiInsertInfo, - resultRelInfo); + resultRelInfo, cstate, + estate); } else if (insertMethod == CIM_MULTI_CONDITIONAL && !CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index cad52fcc78..14addbc6f6 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -46,9 +46,9 @@ typedef enum EolType typedef enum CopyInsertMethod { CIM_SINGLE, /* use table_tuple_insert or ExecForeignInsert */ - CIM_MULTI, /* always use table_multi_insert or + CIM_MULTI, /* always use table_modify_buffer_insert or * ExecForeignBatchInsert */ - CIM_MULTI_CONDITIONAL, /* use table_multi_insert or + CIM_MULTI_CONDITIONAL, /* use table_modify_buffer_insert or * ExecForeignBatchInsert only if valid */ } CopyInsertMethod; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 70f23808e2..bd8c87be33 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -486,6 +486,7 @@ CopyHeaderChoice CopyInsertMethod CopyMethod CopyLogVerbosityChoice +CopyModifyBufferFlushContext CopyMultiInsertBuffer CopyMultiInsertInfo CopyOnErrorChoice -- 2.34.1