From 2bcbe0ffc5df81b38e6c6f0093eb486669c7a3b2 Mon Sep 17 00:00:00 2001 From: kommih Date: Fri, 3 Aug 2018 11:07:52 +1000 Subject: [PATCH 1/2] COPY's multi_insert path deal with bunch of slots Support of passing slots instead of tuples when doing multi insert. --- src/backend/access/heap/heapam.c | 31 ++++++---- src/backend/commands/copy.c | 100 +++++++++++++++---------------- src/include/access/heapam.h | 3 +- src/include/access/tableam.h | 6 +- 4 files changed, 74 insertions(+), 66 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 40c1a5432d..7d0d1dc234 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -2648,7 +2648,7 @@ heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, * temporary context before calling this, if that's a problem. */ void -heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, +heap_multi_insert(Relation relation, TupleTableSlot **slots, int nslots, CommandId cid, int options, BulkInsertState bistate) { TransactionId xid = GetCurrentTransactionId(); @@ -2666,11 +2666,18 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, saveFreeSpace = RelationGetTargetPageFreeSpace(relation, HEAP_DEFAULT_FILLFACTOR); - /* Toast and set header data in all the tuples */ - heaptuples = palloc(ntuples * sizeof(HeapTuple)); - for (i = 0; i < ntuples; i++) - heaptuples[i] = heap_prepare_insert(relation, tuples[i], + /* Toast and set header data in all the slots */ + heaptuples = palloc(nslots * sizeof(HeapTuple)); + for (i = 0; i < nslots; i++) + { + heaptuples[i] = heap_prepare_insert(relation, ExecGetHeapTupleFromSlot(slots[i]), xid, cid, options); + if (slots[i]->tts_tupleOid != InvalidOid) + HeapTupleSetOid(heaptuples[i], slots[i]->tts_tupleOid); + + if (slots[i]->tts_tableOid != InvalidOid) + heaptuples[i]->t_tableOid = slots[i]->tts_tableOid; + } /* * Allocate some memory to use for constructing the WAL record. Using @@ -2706,7 +2713,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, CheckForSerializableConflictIn(relation, NULL, InvalidBuffer); ndone = 0; - while (ndone < ntuples) + while (ndone < nslots) { Buffer buffer; Buffer vmbuffer = InvalidBuffer; @@ -2732,7 +2739,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, * Put that on the page, and then as many other tuples as fit. */ RelationPutHeapTuple(relation, buffer, heaptuples[ndone], false); - for (nthispage = 1; ndone + nthispage < ntuples; nthispage++) + for (nthispage = 1; ndone + nthispage < nslots; nthispage++) { HeapTuple heaptup = heaptuples[ndone + nthispage]; @@ -2841,7 +2848,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, * emitted by this call to heap_multi_insert(). Needed for logical * decoding so it knows when to cleanup temporary data. */ - if (ndone + nthispage == ntuples) + if (ndone + nthispage == nslots) xlrec->flags |= XLH_INSERT_LAST_IN_MULTI; if (init) @@ -2904,7 +2911,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, */ if (IsCatalogRelation(relation)) { - for (i = 0; i < ntuples; i++) + for (i = 0; i < nslots; i++) CacheInvalidateHeapTuple(relation, heaptuples[i], NULL); } @@ -2913,10 +2920,10 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, * nothing for untoasted tuples (tuples[i] == heaptuples[i)], but it's * probably faster to always copy than check. */ - for (i = 0; i < ntuples; i++) - tuples[i]->t_self = heaptuples[i]->t_self; + for (i = 0; i < nslots; i++) + slots[i]->tts_tid = heaptuples[i]->t_self; - pgstat_count_heap_insert(relation, ntuples); + pgstat_count_heap_insert(relation, nslots); } /* diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index d49734ddab..62ee8cfea7 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -306,9 +306,9 @@ static void CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls); static void CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid, int hi_options, - ResultRelInfo *resultRelInfo, TupleTableSlot *myslot, + ResultRelInfo *resultRelInfo, BulkInsertState bistate, - int nBufferedTuples, HeapTuple *bufferedTuples, + int nslots, TupleTableSlot **slots, uint64 firstBufferedLineNo); static bool CopyReadLine(CopyState cstate); static bool CopyReadLineText(CopyState cstate); @@ -2310,7 +2310,6 @@ CopyFrom(CopyState cstate) EState *estate = CreateExecutorState(); /* for ExecConstraints() */ ModifyTableState *mtstate; ExprContext *econtext; - TupleTableSlot *myslot; MemoryContext oldcontext = CurrentMemoryContext; ErrorContextCallback errcallback; @@ -2319,12 +2318,11 @@ CopyFrom(CopyState cstate) void *bistate; uint64 processed = 0; bool useHeapMultiInsert; - int nBufferedTuples = 0; + int nslots = 0; int prev_leaf_part_index = -1; -#define MAX_BUFFERED_TUPLES 1000 - HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */ - Size bufferedTuplesSize = 0; +#define MAX_BUFFERED_SLOTS 1000 + TupleTableSlot **slots = NULL; /* initialize to silence warning */ uint64 firstBufferedLineNo = 0; Assert(cstate->rel); @@ -2467,10 +2465,6 @@ CopyFrom(CopyState cstate) estate->es_result_relation_info = resultRelInfo; estate->es_range_table = cstate->range_table; - /* Set up a tuple slot too */ - myslot = ExecInitExtraTupleSlot(estate, tupDesc, - TTS_TYPE_HEAPTUPLE); - /* * Set up a ModifyTableState so we can let FDW(s) init themselves for * foreign-table result relation(s). @@ -2541,7 +2535,7 @@ CopyFrom(CopyState cstate) else { useHeapMultiInsert = true; - bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple)); + slots = palloc(MAX_BUFFERED_SLOTS * sizeof(TupleTableSlot *)); } /* @@ -2569,11 +2563,17 @@ CopyFrom(CopyState cstate) TupleTableSlot *slot; bool skip_tuple; Oid loaded_oid = InvalidOid; + int natts = resultRelInfo->ri_RelationDesc->rd_att->natts; + int cnt; CHECK_FOR_INTERRUPTS(); - if (nBufferedTuples == 0) + if (nslots == 0) { + /* Reset Tupletable slots if any */ + ExecResetTupleTable(estate->es_tupleTable, false); + estate->es_tupleTable = NIL; + /* * Reset the per-tuple exprcontext. We can only do this if the * tuple buffer is empty. (Calling the context the per-tuple @@ -2588,25 +2588,32 @@ CopyFrom(CopyState cstate) if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid)) break; - /* And now we can form the input tuple. */ - tuple = heap_form_tuple(tupDesc, values, nulls); + slot = ExecInitExtraTupleSlot(estate, + RelationGetDescr(resultRelInfo->ri_RelationDesc), + useHeapMultiInsert ? TTS_TYPE_VIRTUAL : TTS_TYPE_HEAPTUPLE); + + /* Directly store the values/nulls array in the slot */ + memcpy(slot->tts_isnull, nulls, sizeof(bool) * natts); + for (cnt = 0; cnt < natts; cnt++) + { + if (!slot->tts_isnull[cnt]) + slot->tts_values[cnt] = values[cnt]; + } + + ExecStoreVirtualTuple(slot); if (loaded_oid != InvalidOid) - HeapTupleSetOid(tuple, loaded_oid); + slot->tts_tupleOid = loaded_oid; /* * Constraints might reference the tableoid column, so initialize * t_tableOid before evaluating them. */ - tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); + slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); /* Triggers and stuff need to be invoked in query context. */ MemoryContextSwitchTo(oldcontext); - /* Place tuple in tuple slot --- but slot shouldn't free it */ - slot = myslot; - ExecStoreTuple(tuple, slot, InvalidBuffer, false); - /* Determine the partition to heap_insert the tuple into */ if (cstate->partition_tuple_routing) { @@ -2659,6 +2666,9 @@ CopyFrom(CopyState cstate) */ estate->es_result_relation_info = resultRelInfo; + /* FIXME: Get the HeapTuple from slot */ + tuple = ExecGetHeapTupleFromSlot(slot); + /* * If we're capturing transition tuples, we might need to convert * from the partition rowtype to parent rowtype. @@ -2699,6 +2709,7 @@ CopyFrom(CopyState cstate) &slot); tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); + slot->tts_tableOid = tuple->t_tableOid; } skip_tuple = false; @@ -2709,8 +2720,6 @@ CopyFrom(CopyState cstate) { if (!ExecBRInsertTriggers(estate, resultRelInfo, slot)) skip_tuple = true; /* "do nothing" */ - else /* trigger might have changed tuple */ - tuple = ExecGetHeapTupleFromSlot(slot); } if (!skip_tuple) @@ -2746,10 +2755,9 @@ CopyFrom(CopyState cstate) if (useHeapMultiInsert) { /* Add this tuple to the tuple buffer */ - if (nBufferedTuples == 0) + if (nslots == 0) firstBufferedLineNo = cstate->cur_lineno; - bufferedTuples[nBufferedTuples++] = tuple; - bufferedTuplesSize += tuple->t_len; + slots[nslots++] = slot; /* * If the buffer filled up, flush it. Also flush if the @@ -2757,15 +2765,13 @@ CopyFrom(CopyState cstate) * large, to avoid using large amounts of memory for the * buffer when the tuples are exceptionally wide. */ - if (nBufferedTuples == MAX_BUFFERED_TUPLES || - bufferedTuplesSize > 65535) + if (nslots == MAX_BUFFERED_SLOTS) { CopyFromInsertBatch(cstate, estate, mycid, hi_options, - resultRelInfo, myslot, bistate, - nBufferedTuples, bufferedTuples, + resultRelInfo, bistate, + nslots, slots, firstBufferedLineNo); - nBufferedTuples = 0; - bufferedTuplesSize = 0; + nslots = 0; } } else @@ -2783,15 +2789,12 @@ CopyFrom(CopyState cstate) if (slot == NULL) /* "do nothing" */ goto next_tuple; - /* FDW might have changed tuple */ - tuple = ExecGetHeapTupleFromSlot(slot); - /* * AFTER ROW Triggers might reference the tableoid * column, so initialize t_tableOid before evaluating * them. */ - tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); + slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); } else { @@ -2834,10 +2837,10 @@ next_tuple: } /* Flush any remaining buffered tuples */ - if (nBufferedTuples > 0) + if (nslots > 0) CopyFromInsertBatch(cstate, estate, mycid, hi_options, - resultRelInfo, myslot, bistate, - nBufferedTuples, bufferedTuples, + resultRelInfo, bistate, + nslots, slots, firstBufferedLineNo); /* Done, clean up */ @@ -2900,8 +2903,7 @@ next_tuple: static void CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid, int hi_options, ResultRelInfo *resultRelInfo, - TupleTableSlot *myslot, BulkInsertState bistate, - int nBufferedTuples, HeapTuple *bufferedTuples, + BulkInsertState bistate, int nslots, TupleTableSlot **slots, uint64 firstBufferedLineNo) { MemoryContext oldcontext; @@ -2921,8 +2923,8 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid, */ oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); table_multi_insert(cstate->rel, - bufferedTuples, - nBufferedTuples, + slots, + nslots, mycid, hi_options, bistate); @@ -2934,16 +2936,15 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid, */ if (resultRelInfo->ri_NumIndices > 0) { - for (i = 0; i < nBufferedTuples; i++) + for (i = 0; i < nslots; i++) { List *recheckIndexes; cstate->cur_lineno = firstBufferedLineNo + i; - ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false); recheckIndexes = - ExecInsertIndexTuples(myslot, estate, false, NULL, NIL); + ExecInsertIndexTuples(slots[i], estate, false, NULL, NIL); ExecARInsertTriggers(estate, resultRelInfo, - myslot, + slots[i], recheckIndexes, cstate->transition_capture); list_free(recheckIndexes); } @@ -2957,12 +2958,11 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid, (resultRelInfo->ri_TrigDesc->trig_insert_after_row || resultRelInfo->ri_TrigDesc->trig_insert_new_table)) { - for (i = 0; i < nBufferedTuples; i++) + for (i = 0; i < nslots; i++) { cstate->cur_lineno = firstBufferedLineNo + i; - ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false); ExecARInsertTriggers(estate, resultRelInfo, - myslot, + slots[i], NIL, cstate->transition_capture); } } diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 5f89b5b174..a36a0c49e9 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -16,6 +16,7 @@ #include "access/sdir.h" #include "access/skey.h" +#include "executor/tuptable.h" #include "nodes/lockoptions.h" #include "nodes/primnodes.h" #include "storage/bufpage.h" @@ -168,7 +169,7 @@ extern void ReleaseBulkInsertStatePin(BulkInsertState bistate); extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid, int options, BulkInsertState bistate); -extern void heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, +extern void heap_multi_insert(Relation relation, TupleTableSlot **slots, int nslots, CommandId cid, int options, BulkInsertState bistate); extern HTSU_Result heap_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot crosscheck, bool wait, diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index df60ba3316..9912a171fb 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -103,7 +103,7 @@ typedef HTSU_Result (*TupleLock_function) (Relation relation, uint8 flags, HeapUpdateFailureData *hufd); -typedef void (*MultiInsert_function) (Relation relation, HeapTuple *tuples, int ntuples, +typedef void (*MultiInsert_function) (Relation relation, TupleTableSlot **slots, int nslots, CommandId cid, int options, BulkInsertState bistate); typedef void (*TupleGetLatestTid_function) (Relation relation, @@ -611,10 +611,10 @@ table_fetch_follow_check(Relation rel, * table_multi_insert - insert multiple tuple into a table */ static inline void -table_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, +table_multi_insert(Relation relation, TupleTableSlot **slots, int nslots, CommandId cid, int options, BulkInsertState bistate) { - relation->rd_tableamroutine->multi_insert(relation, tuples, ntuples, + relation->rd_tableamroutine->multi_insert(relation, slots, nslots, cid, options, bistate); } -- 2.18.0.windows.1