From e21754e663e5bebfe005dd95d8e61184d4e18b05 Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Tue, 25 Feb 2025 16:16:24 +0100 Subject: [PATCH v20250303 4/4] WIP: parallel inserts into GIN index --- src/backend/access/gin/gininsert.c | 450 +++++++++++------- .../utils/activity/wait_event_names.txt | 2 + 2 files changed, 286 insertions(+), 166 deletions(-) diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c index e873443784a..750c0c3270d 100644 --- a/src/backend/access/gin/gininsert.c +++ b/src/backend/access/gin/gininsert.c @@ -26,7 +26,9 @@ #include "miscadmin.h" #include "nodes/execnodes.h" #include "pgstat.h" +#include "storage/barrier.h" #include "storage/bufmgr.h" +#include "storage/buffile.h" #include "storage/predicate.h" #include "tcop/tcopprot.h" /* pgrminclude ignore */ #include "utils/datum.h" @@ -42,6 +44,11 @@ #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xB000000000000004) #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xB000000000000005) +/* The phases for parallel builds, used by build_barrier. */ +#define GIN_BUILD_INIT 0 +#define GIN_BUILD_SCAN 1 +#define GIN_BUILD_PARTITION 2 + /* * Status for index builds performed in parallel. This is allocated in a * dynamic shared memory segment. @@ -88,6 +95,9 @@ typedef struct GinBuildShared double reltuples; double indtuples; + Barrier build_barrier; + SharedFileSet fileset; /* space for shared temporary files */ + /* * ParallelTableScanDescData data follows. 
Can't directly embed here, as * implementations of the parallel table scan desc interface might need @@ -173,7 +183,6 @@ static void _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relati static void _gin_end_parallel(GinLeader *ginleader, GinBuildState *state); static Size _gin_parallel_estimate_shared(Relation heap, Snapshot snapshot); static double _gin_parallel_heapscan(GinBuildState *buildstate); -static double _gin_parallel_merge(GinBuildState *buildstate); static void _gin_leader_participate_as_worker(GinBuildState *buildstate, Relation heap, Relation index); static void _gin_parallel_scan_and_build(GinBuildState *buildstate, @@ -189,6 +198,12 @@ static GinTuple *_gin_build_tuple(OffsetNumber attrnum, unsigned char category, Datum key, int16 typlen, bool typbyval, ItemPointerData *items, uint32 nitems); +static double _gin_partition_sorted_data(GinBuildState *state); +static void _gin_parallel_insert(GinBuildState *state, + GinBuildShared *ginshared, + Relation heap, Relation index, + bool progress); + /* * Adds array of item pointers to tuple's posting list, or * creates posting tree and tuple pointing to tree in case @@ -699,8 +714,12 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo) maintenance_work_mem, coordinate, TUPLESORT_NONE); - /* scan the relation in parallel and merge per-worker results */ - reltuples = _gin_parallel_merge(state); + /* partition the sorted data */ + reltuples = _gin_partition_sorted_data(state); + + /* do the insert for the leader's partition */ + _gin_parallel_insert(state, state->bs_leader->ginshared, + heap, index, true); _gin_end_parallel(state->bs_leader, state); } @@ -989,6 +1008,12 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, ginshared->reltuples = 0.0; ginshared->indtuples = 0.0; + /* used to wait for data to insert */ + BarrierInit(&ginshared->build_barrier, scantuplesortstates); + + /* Set up the space we'll use for shared temporary files. 
*/ + SharedFileSetInit(&ginshared->fileset, pcxt->seg); + table_parallelscan_initialize(heap, ParallelTableScanFromGinBuildShared(ginshared), snapshot); @@ -1056,6 +1081,11 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, * sure that the failure-to-start case will not hang forever. */ WaitForParallelWorkersToAttach(pcxt); + + /* wait for workers to read the data and add them to tuplesort */ + if (BarrierArriveAndWait(&ginshared->build_barrier, + WAIT_EVENT_GIN_BUILD_SCAN)) + elog(LOG, "data scanned, leader continues"); } /* @@ -1069,6 +1099,8 @@ _gin_end_parallel(GinLeader *ginleader, GinBuildState *state) /* Shutdown worker processes */ WaitForParallelWorkersToFinish(ginleader->pcxt); + SharedFileSetDeleteAll(&ginleader->ginshared->fileset); + /* * Next, accumulate WAL usage. (This must wait for the workers to finish, * or we might get incomplete data.) @@ -1713,169 +1745,6 @@ GinBufferCanAddKey(GinBuffer *buffer, GinTuple *tup) return GinBufferKeyEquals(buffer, tup); } -/* - * Within leader, wait for end of heap scan and merge per-worker results. - * - * After waiting for all workers to finish, merge the per-worker results into - * the complete index. The results from each worker are sorted by block number - * (start of the page range). While combinig the per-worker results we merge - * summaries for the same page range, and also fill-in empty summaries for - * ranges without any tuples. - * - * Returns the total number of heap tuples scanned. 
- */ -static double -_gin_parallel_merge(GinBuildState *state) -{ - GinTuple *tup; - Size tuplen; - double reltuples = 0; - GinBuffer *buffer; - - /* GIN tuples from workers, merged by leader */ - double numtuples = 0; - - /* wait for workers to scan table and produce partial results */ - reltuples = _gin_parallel_heapscan(state); - - /* Execute the sort */ - pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE, - PROGRESS_GIN_PHASE_PERFORMSORT_2); - - /* do the actual sort in the leader */ - tuplesort_performsort(state->bs_sortstate); - - /* - * Initialize buffer to combine entries for the same key. - * - * The leader is allowed to use the whole maintenance_work_mem buffer to - * combine data. The parallel workers already completed. - */ - buffer = GinBufferInit(state->ginstate.index); - - /* - * Set the progress target for the next phase. Reset the block number - * values set by table_index_build_scan - */ - { - const int progress_index[] = { - PROGRESS_CREATEIDX_SUBPHASE, - PROGRESS_CREATEIDX_TUPLES_TOTAL, - PROGRESS_SCAN_BLOCKS_TOTAL, - PROGRESS_SCAN_BLOCKS_DONE - }; - const int64 progress_vals[] = { - PROGRESS_GIN_PHASE_MERGE_2, - state->bs_numtuples, - 0, 0 - }; - - pgstat_progress_update_multi_param(4, progress_index, progress_vals); - } - - /* - * Read the GIN tuples from the shared tuplesort, sorted by category and - * key. That probably gives us order matching how data is organized in the - * index. - * - * We don't insert the GIN tuples right away, but instead accumulate as - * many TIDs for the same key as possible, and then insert that at once. - * This way we don't need to decompress/recompress the posting lists, etc. - */ - while ((tup = tuplesort_getgintuple(state->bs_sortstate, &tuplen, true)) != NULL) - { - CHECK_FOR_INTERRUPTS(); - - /* - * If the buffer can accept the new GIN tuple, just store it there and - * we're done. If it's a different key (or maybe too much data) flush - * the current contents into the index first. 
- */ - if (!GinBufferCanAddKey(buffer, tup)) - { - /* - * Buffer is not empty and it's storing a different key - flush - * the data into the insert, and start a new entry for current - * GinTuple. - */ - AssertCheckItemPointers(buffer); - Assert(!PointerIsValid(buffer->cached)); - - ginEntryInsert(&state->ginstate, - buffer->attnum, buffer->key, buffer->category, - buffer->items, buffer->nitems, &state->buildStats); - - /* discard the existing data */ - GinBufferReset(buffer); - } - - /* - * We're about to add a GIN tuple to the buffer - check the memory - * limit first, and maybe write out some of the data into the index - * first, if needed (and possible). We only flush the part of the TID - * list that we know won't change, and only if there's enough data for - * compression to work well. - */ - if (GinBufferShouldTrim(buffer, tup)) - { - Assert(buffer->nfrozen > 0); - - /* - * Buffer is not empty and it's storing a different key - flush - * the data into the insert, and start a new entry for current - * GinTuple. - */ - AssertCheckItemPointers(buffer); - Assert(!PointerIsValid(buffer->cached)); - - ginEntryInsert(&state->ginstate, - buffer->attnum, buffer->key, buffer->category, - buffer->items, buffer->nfrozen, &state->buildStats); - - /* truncate the data we've just discarded */ - GinBufferTrim(buffer); - } - - /* - * Remember data for the current tuple (either remember the new key, - * or append if to the existing data). 
- */ - GinBufferMergeTuple(buffer, tup); - - if (buffer->cached) - GinBufferUnpackCached(buffer, 0); - - /* Report progress */ - pgstat_progress_update_param(PROGRESS_CREATEIDX_TUPLES_DONE, - ++numtuples); - } - - /* flush data remaining in the buffer (for the last key) */ - if (!GinBufferIsEmpty(buffer)) - { - AssertCheckItemPointers(buffer); - Assert(!PointerIsValid(buffer->cached)); - - ginEntryInsert(&state->ginstate, - buffer->attnum, buffer->key, buffer->category, - buffer->items, buffer->nitems, &state->buildStats); - - /* discard the existing data */ - GinBufferReset(buffer); - - /* Report progress */ - pgstat_progress_update_param(PROGRESS_CREATEIDX_TUPLES_DONE, - ++numtuples); - } - - /* relase all the memory */ - GinBufferFree(buffer); - - tuplesort_end(state->bs_sortstate); - - return reltuples; -} - /* * Returns size of shared memory required to store state for a parallel * gin index build based on the snapshot its parallel scan will use. @@ -2093,6 +1962,9 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) /* Prepare to track buffer usage during parallel execution */ InstrStartParallelQuery(); + /* attach to the fileset too */ + SharedFileSetAttach(&ginshared->fileset, seg); + /* * Might as well use reliable figure when doling out maintenance_work_mem * (when requested number of workers were not launched, this will be @@ -2103,6 +1975,20 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) _gin_parallel_scan_and_build(&buildstate, ginshared, sharedsort, heapRel, indexRel, sortmem, false); + /* wait for workers to read the data and add them to tuplesort */ + if (BarrierArriveAndWait(&ginshared->build_barrier, + WAIT_EVENT_GIN_BUILD_SCAN)) + elog(LOG, "data scanned by workers, leader continues"); + + /* leader sorts and partitions the data */ + + /* wait for the leader to partition the data */ + if (BarrierArriveAndWait(&ginshared->build_barrier, + WAIT_EVENT_GIN_BUILD_PARTITION)) + elog(LOG, "data partitioned by leader, worker 
continues"); + + _gin_parallel_insert(&buildstate, ginshared, heapRel, indexRel, false); + /* Report WAL/buffer usage during parallel execution */ bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); @@ -2375,3 +2261,235 @@ _gin_compare_tuples(GinTuple *a, GinTuple *b, SortSupport ssup) return ItemPointerCompare(GinTupleGetFirst(a), GinTupleGetFirst(b)); } + +static double +_gin_partition_sorted_data(GinBuildState *state) +{ + GinTuple *tup; + Size tuplen; + GinBuildShared *shared = state->bs_leader->ginshared; + BufFile **files; + int64 fileidx = 0; + double reltuples; + + /* how many tuples per worker */ + int64 worker_tuples = (state->indtuples / shared->scantuplesortstates) + 1; + int64 remaining = Min(worker_tuples, 1000); + int64 ntmp = 0; + + /* wait for workers to scan table and produce partial results */ + reltuples = _gin_parallel_heapscan(state); + + /* do the actual sort in the leader */ + tuplesort_performsort(state->bs_sortstate); + + /* Allocate BufFiles, one for each participant. */ + files = palloc0_array(BufFile *, shared->scantuplesortstates); + + for (int i = 0; i < shared->scantuplesortstates; i++) + { + char fname[MAXPGPATH]; + + sprintf(fname, "worker-%d", i); + + files[i] = BufFileCreateFileSet(&shared->fileset.fs, fname); + } + + /* + * Read the GIN tuples from the shared tuplesort, sorted by category and + * key. That probably gives us order matching how data is organized in the + * index. + * + * We don't insert the GIN tuples right away, but instead accumulate as + * many TIDs for the same key as possible, and then insert that at once. + * This way we don't need to decompress/recompress the posting lists, etc. + * + * XXX Maybe we should sort by key first, then by category? The idea is + * that if this matches the order of the keys in the index, we'd insert + * the entries in order better matching the index. 
+ */ + while ((tup = tuplesort_getgintuple(state->bs_sortstate, &tuplen, true)) != NULL) + { + ntmp++; + + CHECK_FOR_INTERRUPTS(); + + /* + * FIXME Maybe move to next partition only when the index key changes? + * Otherwise we might have issues with 'could not fit onto page' when + * adding overlapping TID lists to the index. But maybe it can't with + * the merging of data in the tuplesort? + */ + + BufFileWrite(files[fileidx], &tuplen, sizeof(tuplen)); + BufFileWrite(files[fileidx], tup, tuplen); + + remaining--; + + /* move to the next file */ + if (remaining == 0) + { + remaining = Min(worker_tuples, 1000); + fileidx++; + fileidx = fileidx % shared->scantuplesortstates; + } + } + + /* close the files */ + for (int i = 0; i < shared->scantuplesortstates; i++) + { + BufFileClose(files[i]); + } + + /* and also close the tuplesort */ + tuplesort_end(state->bs_sortstate); + + /* wait for the leader to partition the data */ + if (BarrierArriveAndWait(&shared->build_barrier, + WAIT_EVENT_GIN_BUILD_PARTITION)) + elog(LOG, "data partitioned, leader continues"); + + return reltuples; +} + +static void +_gin_parallel_insert(GinBuildState *state, GinBuildShared *ginshared, + Relation heap, Relation index, bool progress) +{ + GinBuffer *buffer; + GinTuple *tup; + Size len; + + BufFile *file; + char fname[MAXPGPATH]; + char *buff; + int64 ntuples = 0; + Size maxlen; + + /* + * Initialize buffer to combine entries for the same key. + * + * The leader is allowed to use the whole maintenance_work_mem buffer to + * combine data. The parallel workers already completed. 
+ */ + buffer = GinBufferInit(state->ginstate.index); + + + sprintf(fname, "worker-%d", ParallelWorkerNumber + 1); + file = BufFileOpenFileSet(&ginshared->fileset.fs, fname, O_RDONLY, false); + + /* 8kB seems like a reasonable starting point */ + maxlen = 8192; + buff = palloc(maxlen); + + while (true) + { + size_t ret; + + ret = BufFileRead(file, &len, sizeof(len)); + + if (ret == 0) + break; + if (ret != sizeof(len)) + elog(ERROR, "incorrect data %zu %zu", ret, sizeof(len)); + + /* maybe resize the buffer */ + if (maxlen < len) + { + while (maxlen < len) + maxlen *= 2; + + buff = repalloc(buff, maxlen); + } + + tup = (GinTuple *) buff; + + + BufFileReadExact(file, tup, len); + + ntuples++; + + if (ntuples % 100000 == 0) + elog(LOG, "inserted " INT64_FORMAT " tuples", ntuples); + + CHECK_FOR_INTERRUPTS(); + + /* + * If the buffer can accept the new GIN tuple, just store it there and + * we're done. If it's a different key (or maybe too much data) flush + * the current contents into the index first. + */ + if (!GinBufferCanAddKey(buffer, tup)) + { + /* + * Buffer is not empty and it's storing a different key - flush + * the data into the insert, and start a new entry for current + * GinTuple. + */ + AssertCheckItemPointers(buffer); + Assert(!PointerIsValid(buffer->cached)); + + ginEntryInsert(&state->ginstate, + buffer->attnum, buffer->key, buffer->category, + buffer->items, buffer->nitems, &state->buildStats); + + /* discard the existing data */ + GinBufferReset(buffer); + } + + /* + * We're about to add a GIN tuple to the buffer - check the memory + * limit first, and maybe write out some of the data into the index + * first, if needed (and possible). We only flush the part of the TID + * list that we know won't change, and only if there's enough data for + * compression to work well. 
*/ + if (GinBufferShouldTrim(buffer, tup)) + { + Assert(buffer->nfrozen > 0); + + /* + * Buffer is not empty and it's storing a different key - flush + * the data into the insert, and start a new entry for current + * GinTuple. + */ + AssertCheckItemPointers(buffer); + Assert(!PointerIsValid(buffer->cached)); + + ginEntryInsert(&state->ginstate, + buffer->attnum, buffer->key, buffer->category, + buffer->items, buffer->nfrozen, &state->buildStats); + + /* truncate the data we've just discarded */ + GinBufferTrim(buffer); + } + + /* + * Remember data for the current tuple (either remember the new key, + * or append it to the existing data). + */ + GinBufferMergeTuple(buffer, tup); + + if (buffer->cached) + GinBufferUnpackCached(buffer, 0); + } + + /* flush data remaining in the buffer (for the last key) */ + if (!GinBufferIsEmpty(buffer)) + { + AssertCheckItemPointers(buffer); + + Assert(!PointerIsValid(buffer->cached)); + ginEntryInsert(&state->ginstate, + buffer->attnum, buffer->key, buffer->category, + buffer->items, buffer->nitems, &state->buildStats); + + /* discard the existing data */ + GinBufferReset(buffer); + } + + /* release all the memory */ + GinBufferFree(buffer); + + BufFileClose(file); +} diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index e199f071628..afb9be848a0 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -116,6 +116,8 @@ CHECKPOINT_DELAY_START "Waiting for a backend that blocks a checkpoint from star CHECKPOINT_DONE "Waiting for a checkpoint to complete." CHECKPOINT_START "Waiting for a checkpoint to start." EXECUTE_GATHER "Waiting for activity from a child process while executing a Gather plan node." +GIN_BUILD_PARTITION "Waiting for partition of data during parallel GIN index build." +GIN_BUILD_SCAN "Waiting for scan of data during parallel GIN index build." 
HASH_BATCH_ALLOCATE "Waiting for an elected Parallel Hash participant to allocate a hash table." HASH_BATCH_ELECT "Waiting to elect a Parallel Hash participant to allocate a hash table." HASH_BATCH_LOAD "Waiting for other Parallel Hash participants to finish loading a hash table." -- 2.48.1