From 8bc3a52c8bd2c94489a7f865bf366ad11642fd9b Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Fri, 24 Jan 2020 11:17:49 -0800 Subject: [PATCH v4 4/4] Add SharedBits API Add SharedBits API--a way for workers to collaboratively make a bitmap. The SharedBits store is currently meant for each backend to write to its own bitmap file in one phase and for a single worker to combine all of the bitmaps into a combined bitmap in another phase. In other words, it supports parallel write but not parallel scan (and not concurrent read/write). This could be modified in the future. Also, the SharedBits uses a SharedFileset which uses BufFiles. This is not the ideal API for the bitmap. The access pattern is small sequential writes and random reads. It would also be nice to maintain the fixed size buffer but have an API that let us write an arbitrary number of bytes to it in bufsize chunks without incurring additional function call overhead. This commit also moves the outer match status file and combined_bitmap into a new SharedBits store. Co-authored-by: Jesse Zhang --- src/backend/executor/adaptiveHashjoin.c | 18 +- src/backend/executor/nodeHash.c | 8 +- src/backend/executor/nodeHashjoin.c | 169 ++++++------- src/backend/storage/file/buffile.c | 51 ---- src/backend/utils/sort/Makefile | 1 + src/backend/utils/sort/sharedbits.c | 276 ++++++++++++++++++++++ src/backend/utils/sort/sharedtuplestore.c | 122 +--------- src/include/executor/hashjoin.h | 13 +- src/include/storage/buffile.h | 1 - src/include/utils/sharedbits.h | 40 ++++ src/include/utils/sharedtuplestore.h | 7 +- 11 files changed, 423 insertions(+), 283 deletions(-) create mode 100644 src/backend/utils/sort/sharedbits.c create mode 100644 src/include/utils/sharedbits.h diff --git a/src/backend/executor/adaptiveHashjoin.c b/src/backend/executor/adaptiveHashjoin.c index 45846a076916..6c6e27e55e49 100644 --- a/src/backend/executor/adaptiveHashjoin.c +++ b/src/backend/executor/adaptiveHashjoin.c @@ -13,9 +13,6 @@ #include "executor/adaptiveHashjoin.h" - - - bool ExecParallelHashJoinNewChunk(HashJoinState *hjstate, bool advance_from_probing) { @@ -291,10 +288,9 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) ExecHashTableDetachBatch(hashtable); } - else if (accessor->combined_bitmap != NULL) + else if (sb_combined_exists(accessor->sba)) { - BufFileClose(accessor->combined_bitmap); - accessor->combined_bitmap = NULL; + sb_end_read(accessor->sba); accessor->done = true; /* @@ -308,7 +304,7 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) else { - sts_close_outer_match_status_file(accessor->outer_tuples); + sb_end_write(accessor->sba); /* * If all workers (including this one) have finished probing the @@ -329,7 +325,7 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) * reach here. This worker must do some final cleanup and then * detach from the batch */ - accessor->combined_bitmap = sts_combine_outer_match_status_files(accessor->outer_tuples); + sb_combine(accessor->sba); ExecHashTableLoopDetachBatchForChosen(hashtable); hjstate->last_worker = true; return true; @@ -410,7 +406,11 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) * to by this worker and readable by any worker */ if (hashtable->batches[batchno].shared->parallel_hashloop_fallback) - sts_make_outer_match_status_file(hashtable->batches[batchno].outer_tuples); + { + ParallelHashJoinBatchAccessor *accessor = hashtable->batches + hashtable->curbatch; + + sb_initialize_accessor(accessor->sba, sts_get_tuplenum(accessor->outer_tuples)); + } return true; diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index afdc31a3b30c..51050ce47edb 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -3052,7 +3052,9 @@ ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch) { ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i]; ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i); + SharedBits *sbits = ParallelHashJoinBatchOuterBits(shared, pstate->nparticipants); char name[MAXPGPATH]; + char sbname[MAXPGPATH]; shared->parallel_hashloop_fallback = false; LWLockInitialize(&shared->lock, @@ -3098,6 +3100,9 @@ ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch) SHARED_TUPLESTORE_SINGLE_PASS, &pstate->fileset, name); + snprintf(sbname, MAXPGPATH, "%s.bitmaps", name); + accessor->sba = sb_initialize(sbits, pstate->nparticipants, + ParallelWorkerNumber + 1, &pstate->sbfileset, sbname); } MemoryContextSwitchTo(oldcxt); @@ -3169,11 +3174,11 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable) { ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i]; ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i); + SharedBits *sbits = ParallelHashJoinBatchOuterBits(shared, pstate->nparticipants); accessor->shared = shared; accessor->preallocated = 0; accessor->done = false; - accessor->combined_bitmap = NULL; accessor->inner_tuples = sts_attach(ParallelHashJoinBatchInner(shared), ParallelWorkerNumber + 1, @@ -3183,6 +3188,7 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable) pstate->nparticipants), ParallelWorkerNumber + 1, &pstate->fileset); + accessor->sba = sb_attach(sbits, ParallelWorkerNumber + 1, &pstate->sbfileset); } MemoryContextSwitchTo(oldcxt); diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index a454cba54543..e282fb368ce7 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -908,86 +908,91 @@ ExecParallelHashJoin(PlanState *pstate) /* FALL THRU */ case HJ_SCAN_BUCKET: - - /* - * Scan the selected hash bucket for matches to current outer - */ - phj_batch = node->hj_HashTable->batches[node->hj_HashTable->curbatch].shared; - - if (!ExecParallelScanHashBucket(node, econtext)) { /* - * The current outer tuple has run out of matches, so - * check whether to emit a dummy outer-join tuple. Whether - * we emit one or not, the next state is NEED_NEW_OUTER. + * Scan the selected hash bucket for matches to current + * outer */ - node->hj_JoinState = HJ_NEED_NEW_OUTER; - if (!phj_batch->parallel_hashloop_fallback) + ParallelHashJoinBatchAccessor *accessor = + &node->hj_HashTable->batches[node->hj_HashTable->curbatch]; + + phj_batch = accessor->shared; + + if (!ExecParallelScanHashBucket(node, econtext)) { - TupleTableSlot *slot = emitUnmatchedOuterTuple(otherqual, econtext, node); + /* + * The current outer tuple has run out of matches, so + * check whether to emit a dummy outer-join tuple. + * Whether we emit one or not, the next state is + * NEED_NEW_OUTER. + */ + node->hj_JoinState = HJ_NEED_NEW_OUTER; + if (!phj_batch->parallel_hashloop_fallback) + { + TupleTableSlot *slot = emitUnmatchedOuterTuple(otherqual, econtext, node); - if (slot != NULL) - return slot; + if (slot != NULL) + return slot; + } + continue; } - continue; - } - /* - * We've got a match, but still need to test non-hashed quals. - * ExecScanHashBucket already set up all the state needed to - * call ExecQual. - * - * If we pass the qual, then save state for next call and have - * ExecProject form the projection, store it in the tuple - * table, and return the slot. - * - * Only the joinquals determine tuple match status, but all - * quals must pass to actually return the tuple. - */ - if (joinqual != NULL && !ExecQual(joinqual, econtext)) - { - InstrCountFiltered1(node, 1); - break; - } + /* + * We've got a match, but still need to test non-hashed + * quals. ExecScanHashBucket already set up all the state + * needed to call ExecQual. + * + * If we pass the qual, then save state for next call and + * have ExecProject form the projection, store it in the + * tuple table, and return the slot. + * + * Only the joinquals determine tuple match status, but + * all quals must pass to actually return the tuple. + */ + if (joinqual != NULL && !ExecQual(joinqual, econtext)) + { + InstrCountFiltered1(node, 1); + break; + } - node->hj_MatchedOuter = true; - HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)); + node->hj_MatchedOuter = true; + HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)); - /* - * TODO: how does this interact with PAHJ -- do I need to set - * matchbit? - */ - /* In an antijoin, we never return a matched tuple */ - if (node->js.jointype == JOIN_ANTI) - { - node->hj_JoinState = HJ_NEED_NEW_OUTER; - continue; - } + /* + * TODO: how does this interact with PAHJ -- do I need to + * set matchbit? + */ + /* In an antijoin, we never return a matched tuple */ + if (node->js.jointype == JOIN_ANTI) + { + node->hj_JoinState = HJ_NEED_NEW_OUTER; + continue; + } - /* - * If we only need to join to the first matching inner tuple, - * then consider returning this one, but after that continue - * with next outer tuple. - */ - if (node->js.single_match) - node->hj_JoinState = HJ_NEED_NEW_OUTER; + /* + * If we only need to join to the first matching inner + * tuple, then consider returning this one, but after that + * continue with next outer tuple. + */ + if (node->js.single_match) + node->hj_JoinState = HJ_NEED_NEW_OUTER; - /* - * Set the match bit for this outer tuple in the match status - * file - */ - if (phj_batch->parallel_hashloop_fallback) - { - sts_set_outer_match_status(hashtable->batches[hashtable->curbatch].outer_tuples, - econtext->ecxt_outertuple->tuplenum); + /* + * Set the match bit for this outer tuple in the match + * status file + */ + if (phj_batch->parallel_hashloop_fallback) + { + sb_setbit(accessor->sba, + econtext->ecxt_outertuple->tuplenum); + } + if (otherqual == NULL || ExecQual(otherqual, econtext)) + return ExecProject(node->js.ps.ps_ProjInfo); + else + InstrCountFiltered2(node, 1); + break; } - if (otherqual == NULL || ExecQual(otherqual, econtext)) - return ExecProject(node->js.ps.ps_ProjInfo); - else - InstrCountFiltered2(node, 1); - break; - case HJ_FILL_INNER_TUPLES: /* @@ -1072,8 +1077,6 @@ ExecParallelHashJoin(PlanState *pstate) ParallelHashJoinBatchAccessor *batch_accessor = &node->hj_HashTable->batches[node->hj_HashTable->curbatch]; - Assert(batch_accessor->combined_bitmap != NULL); - /* * TODO: there should be a way to know the current batch * for the purposes of getting @@ -1092,33 +1095,10 @@ ExecParallelHashJoin(PlanState *pstate) { tupleMetadata metadata; - if ((tuple = - sts_parallel_scan_next(outer_acc, &metadata)) == - NULL) + if ((tuple = sts_parallel_scan_next(outer_acc, &metadata)) == NULL) break; - uint32 bytenum = metadata.tupleid / 8; - unsigned char bit = metadata.tupleid % 8; - unsigned char byte_to_check = 0; - - /* seek to byte to check */ - if (BufFileSeek(batch_accessor->combined_bitmap, - 0, - bytenum, - SEEK_SET)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg( - "could not rewind shared outer temporary file: %m"))); - /* read byte containing ntuple bit */ - if (BufFileRead(batch_accessor->combined_bitmap, &byte_to_check, 1) == - 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg( - "could not read byte in outer match status bitmap: %m."))); - /* if bit is set */ - bool match = ((byte_to_check) >> bit) & 1; + bool match = sb_checkbit(batch_accessor->sba, metadata.tupleid); if (!match) break; @@ -1990,6 +1970,7 @@ ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt) /* Set up the space we'll use for shared temporary files. */ SharedFileSetInit(&pstate->fileset, pcxt->seg); + SharedFileSetInit(&pstate->sbfileset, pcxt->seg); /* Initialize the shared state in the hash node. */ hashNode = (HashState *) innerPlanState(state); diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c index cb49329d3fb1..f0e920b41618 100644 --- a/src/backend/storage/file/buffile.c +++ b/src/backend/storage/file/buffile.c @@ -269,57 +269,6 @@ BufFileCreateShared(SharedFileSet *fileset, const char *name) return file; } -/* - * Open a shared file created by any backend if it exists, otherwise return NULL - */ -BufFile * -BufFileOpenSharedIfExists(SharedFileSet *fileset, const char *name) -{ - BufFile *file; - char segment_name[MAXPGPATH]; - Size capacity = 16; - File *files; - int nfiles = 0; - - files = palloc(sizeof(File) * capacity); - - /* - * We don't know how many segments there are, so we'll probe the - * filesystem to find out. - */ - for (;;) - { - /* See if we need to expand our file segment array. */ - if (nfiles + 1 > capacity) - { - capacity *= 2; - files = repalloc(files, sizeof(File) * capacity); - } - /* Try to load a segment. */ - SharedSegmentName(segment_name, name, nfiles); - files[nfiles] = SharedFileSetOpen(fileset, segment_name); - if (files[nfiles] <= 0) - break; - ++nfiles; - - CHECK_FOR_INTERRUPTS(); - } - - /* - * If we didn't find any files at all, then no BufFile exists with this - * name. - */ - if (nfiles == 0) - return NULL; - file = makeBufFileCommon(nfiles); - file->files = files; - file->readOnly = true; /* Can't write to files opened this way */ - file->fileset = fileset; - file->name = pstrdup(name); - - return file; -} - /* * Open a file that was previously created in another backend (or this one) * with BufFileCreateShared in the same SharedFileSet using the same name. diff --git a/src/backend/utils/sort/Makefile b/src/backend/utils/sort/Makefile index 7ac3659261e3..f11fe85aeb31 100644 --- a/src/backend/utils/sort/Makefile +++ b/src/backend/utils/sort/Makefile @@ -16,6 +16,7 @@ override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS) OBJS = \ logtape.o \ + sharedbits.o \ sharedtuplestore.o \ sortsupport.o \ tuplesort.o \ diff --git a/src/backend/utils/sort/sharedbits.c b/src/backend/utils/sort/sharedbits.c new file mode 100644 index 000000000000..9d04d6b23661 --- /dev/null +++ b/src/backend/utils/sort/sharedbits.c @@ -0,0 +1,276 @@ +#include "postgres.h" +#include "storage/buffile.h" +#include "utils/sharedbits.h" + +/* TODO: put a comment about not currently supporting parallel scan of the SharedBits */ + +/* Per-participant shared state */ +struct SharedBitsParticipant +{ + bool present; + bool writing; +}; + +/* Shared control object */ +struct SharedBits +{ + int nparticipants; /* Number of participants that can write. */ + int64 nbits; + char name[NAMEDATALEN]; /* A name for this tuplestore. */ + + SharedBitsParticipant participants[FLEXIBLE_ARRAY_MEMBER]; +}; + +/* backend-local state */ +struct SharedBitsAccessor +{ + int participant; + SharedBits *bits; + SharedFileSet *fileset; + BufFile *write_file; + BufFile *combined; +}; + +SharedBitsAccessor * +sb_attach(SharedBits *sbits, int my_participant_number, SharedFileSet *fileset) +{ + SharedBitsAccessor *accessor = palloc0(sizeof(SharedBitsAccessor)); + + accessor->participant = my_participant_number; + accessor->bits = sbits; + accessor->fileset = fileset; + accessor->write_file = NULL; + accessor->combined = NULL; + return accessor; +} + +SharedBitsAccessor * +sb_initialize(SharedBits *sbits, + int participants, + int my_participant_number, + SharedFileSet *fileset, + char *name) +{ + SharedBitsAccessor *accessor; + + sbits->nparticipants = participants; + strcpy(sbits->name, name); + sbits->nbits = 0; /* TODO: maybe delete this */ + + accessor = palloc0(sizeof(SharedBitsAccessor)); + accessor->participant = my_participant_number; + accessor->bits = sbits; + accessor->fileset = fileset; + accessor->write_file = NULL; + accessor->combined = NULL; + return accessor; +} + +/* TODO: is "initialize_accessor" a clear enough API for this? (making the file)? */ +void +sb_initialize_accessor(SharedBitsAccessor *accessor, uint32 nbits) +{ + char name[MAXPGPATH]; + + snprintf(name, MAXPGPATH, "%s.p%d.bitmap", accessor->bits->name, accessor->participant); + + accessor->write_file = + BufFileCreateShared(accessor->fileset, name); + + accessor->bits->participants[accessor->participant].present = true; + /* TODO: check this math. tuplenumber will be too high. */ + uint32 num_to_write = nbits / 8 + 1; + + /* + * TODO: add tests that could exercise a problem with junk being written + * to bitmap + */ + + /* + * TODO: is there a better way to write the bytes to the file without + * calling + */ + + /* + * BufFileWrite() like this? palloc()ing an undetermined number of bytes + * feels + */ + + /* + * like it is against the spirit of this patch to begin with, but the many + * function + */ + /* calls seem expensive */ + for (int i = 0; i < num_to_write; i++) + { + unsigned char byteToWrite = 0; + + BufFileWrite(accessor->write_file, &byteToWrite, 1); + } + + if (BufFileSeek(accessor->write_file, 0, 0L, SEEK_SET)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not rewind hash-join temporary file: %m"))); +} + +size_t +sb_estimate(int participants) +{ + return offsetof(SharedBits, participants) + participants * sizeof(SharedBitsParticipant); +} + + +void +sb_setbit(SharedBitsAccessor *accessor, uint64 bit) +{ + Assert(accessor->write_file); + SharedBitsParticipant *const participant = + &accessor->bits->participants[accessor->participant]; + + if (!participant->writing) + participant->writing = true; + unsigned char current_outer_byte; + + BufFileSeek(accessor->write_file, 0, bit / 8, SEEK_SET); + BufFileRead(accessor->write_file, ¤t_outer_byte, 1); + + current_outer_byte |= 1U << (bit % 8); + + /* TODO: don't seek back one but instead seek explicitly to that byte */ + BufFileSeek(accessor->write_file, 0, -1, SEEK_CUR); + BufFileWrite(accessor->write_file, ¤t_outer_byte, 1); +} + +bool +sb_checkbit(SharedBitsAccessor *accessor, uint32 n) +{ + Assert(accessor->combined); + uint32 bytenum = n / 8; + unsigned char bit = n % 8; + unsigned char byte_to_check = 0; + + /* seek to byte to check */ + if (BufFileSeek(accessor->combined, + 0, + bytenum, + SEEK_SET)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg( + "could not rewind shared outer temporary file: %m"))); + /* read byte containing ntuple bit */ + if (BufFileRead(accessor->combined, &byte_to_check, 1) == 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg( + "could not read byte in outer match status bitmap: %m."))); + /* if bit is set */ + bool match = ((byte_to_check) >> bit) & 1; + + return match; +} + +BufFile * +sb_combine(SharedBitsAccessor *accessor) +{ + /* TODO: this tries to close an outer match status file for */ + /* each participant in the tuplestore. technically, only participants */ + /* in the barrier could have outer match status files, however, */ + /* all but one participant continue on and detach from the barrier */ + /* so we won't have a reliable way to close only files for those attached */ + /* to the barrier */ + int nbparticipants = 0; + + for (int l = 0; l < accessor->bits->nparticipants; l++) + { + SharedBitsParticipant participant = accessor->bits->participants[l]; + + if (participant.present) + { + Assert(!participant.writing); + nbparticipants++; + } + } + BufFile **statuses = palloc(sizeof(BufFile *) * nbparticipants); + + /* + * Open the bitmap shared BufFile from each participant. TODO: explain why + * file can be NULLs + */ + int statuses_length = 0; + + for (int i = 0; i < accessor->bits->nparticipants; i++) + { + char bitmap_filename[MAXPGPATH]; + + /* TODO: make a function that will do this */ + snprintf(bitmap_filename, MAXPGPATH, "%s.p%d.bitmap", accessor->bits->name, i); + + if (!accessor->bits->participants[i].present) + continue; + BufFile *file = BufFileOpenShared(accessor->fileset, bitmap_filename); + + Assert(file); + + statuses[statuses_length++] = file; + } + + BufFile *combined_bitmap_file = BufFileCreateTemp(false); + + for (int64 cur = 0; cur < BufFileSize(statuses[0]); cur++) /* make it while not EOF */ + { + unsigned char combined_byte = 0; + + for (int i = 0; i < statuses_length; i++) + { + unsigned char read_byte; + + BufFileRead(statuses[i], &read_byte, 1); + combined_byte |= read_byte; + } + + BufFileWrite(combined_bitmap_file, &combined_byte, 1); + } + + if (BufFileSeek(combined_bitmap_file, 0, 0L, SEEK_SET)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not rewind hash-join temporary file: %m"))); + + for (int i = 0; i < statuses_length; i++) + BufFileClose(statuses[i]); + pfree(statuses); + + accessor->combined = combined_bitmap_file; + return combined_bitmap_file; +} + +/* TODO: this is an API leak. We should be able to use something in the hashjoin state */ +/* to indicate that the worker is the elected worker */ +/* We tried using last_worker, but the problem is that last_worker can be false when */ +/* there is a combined file (meaning this is the last worker), so, clearly, something needs */ +/* to change about the flag. it is not expressing what it was meant to express. */ +bool +sb_combined_exists(SharedBitsAccessor *accessor) +{ + return accessor->combined != NULL; +} + +void +sb_end_write(SharedBitsAccessor *sba) +{ + SharedBitsParticipant + *const participant = &sba->bits->participants[sba->participant]; + + participant->writing = false; + BufFileClose(sba->write_file); + sba->write_file = NULL; +} + +void +sb_end_read(SharedBitsAccessor *accessor) +{ + BufFileClose(accessor->combined); + accessor->combined = NULL; +} diff --git a/src/backend/utils/sort/sharedtuplestore.c b/src/backend/utils/sort/sharedtuplestore.c index 0e5e9db82034..045b8eca80dc 100644 --- a/src/backend/utils/sort/sharedtuplestore.c +++ b/src/backend/utils/sort/sharedtuplestore.c @@ -98,15 +98,10 @@ struct SharedTuplestoreAccessor BlockNumber write_page; /* The next page to write to. */ char *write_pointer; /* Current write pointer within chunk. */ char *write_end; /* One past the end of the current chunk. */ - - /* Bitmap of matched outer tuples (currently only used for hashjoin). */ - BufFile *outer_match_status_file; }; static void sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant); -static void - sts_bitmap_filename(char *name, SharedTuplestoreAccessor *accessor, int participant); /* * Return the amount of shared memory required to hold SharedTuplestore for a @@ -178,7 +173,6 @@ sts_initialize(SharedTuplestore *sts, int participants, accessor->sts = sts; accessor->fileset = fileset; accessor->context = CurrentMemoryContext; - accessor->outer_match_status_file = NULL; return accessor; } @@ -641,120 +635,10 @@ sts_increment_tuplenum(SharedTuplestoreAccessor *accessor) return pg_atomic_fetch_add_u32(&accessor->sts->ntuples, 1); } -void -sts_make_outer_match_status_file(SharedTuplestoreAccessor *accessor) -{ - uint32 tuplenum = pg_atomic_read_u32(&accessor->sts->ntuples); - - /* don't make the outer match status file if there are no tuples */ - if (tuplenum == 0) - return; - - char name[MAXPGPATH]; - - sts_bitmap_filename(name, accessor, accessor->participant); - - accessor->outer_match_status_file = BufFileCreateShared(accessor->fileset, name); - - /* TODO: check this math. tuplenumber will be too high. */ - uint32 num_to_write = tuplenum / 8 + 1; - - unsigned char byteToWrite = 0; - - BufFileWrite(accessor->outer_match_status_file, &byteToWrite, num_to_write); - - if (BufFileSeek(accessor->outer_match_status_file, 0, 0L, SEEK_SET)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not rewind hash-join temporary file: %m"))); -} - -void -sts_set_outer_match_status(SharedTuplestoreAccessor *accessor, uint32 tuplenum) -{ - BufFile *parallel_outer_matchstatuses = accessor->outer_match_status_file; - unsigned char current_outer_byte; - - BufFileSeek(parallel_outer_matchstatuses, 0, tuplenum / 8, SEEK_SET); - BufFileRead(parallel_outer_matchstatuses, ¤t_outer_byte, 1); - - current_outer_byte |= 1U << (tuplenum % 8); - - if (BufFileSeek(parallel_outer_matchstatuses, 0, -1, SEEK_CUR) != 0) - elog(ERROR, "there is a problem with outer match status file. pid %i.", MyProcPid); - BufFileWrite(parallel_outer_matchstatuses, ¤t_outer_byte, 1); -} - -void -sts_close_outer_match_status_file(SharedTuplestoreAccessor *accessor) -{ - BufFileClose(accessor->outer_match_status_file); -} - -BufFile * -sts_combine_outer_match_status_files(SharedTuplestoreAccessor *accessor) -{ - /* TODO: this tries to close an outer match status file for */ - /* each participant in the tuplestore. technically, only participants */ - /* in the barrier could have outer match status files, however, */ - /* all but one participant continue on and detach from the barrier */ - /* so we won't have a reliable way to close only files for those attached */ - /* to the barrier */ - BufFile **statuses = palloc(sizeof(BufFile *) * accessor->sts->nparticipants); - - /* - * Open the bitmap shared BufFile from each participant. TODO: explain why - * file can be NULLs - */ - int statuses_length = 0; - - for (int i = 0; i < accessor->sts->nparticipants; i++) - { - char bitmap_filename[MAXPGPATH]; - - sts_bitmap_filename(bitmap_filename, accessor, i); - BufFile *file = BufFileOpenSharedIfExists(accessor->fileset, bitmap_filename); - - if (file != NULL) - statuses[statuses_length++] = file; - } - - BufFile *combined_bitmap_file = BufFileCreateTemp(false); - - for (int64 cur = 0; cur < BufFileSize(statuses[0]); cur++) - /* make it while not */ - EOF - { - unsigned char combined_byte = 0; - - for (int i = 0; i < statuses_length; i++) - { - unsigned char read_byte; - - BufFileRead(statuses[i], &read_byte, 1); - combined_byte |= read_byte; - } - - BufFileWrite(combined_bitmap_file, &combined_byte, 1); - } - - if (BufFileSeek(combined_bitmap_file, 0, 0L, SEEK_SET)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not rewind hash-join temporary file: %m"))); - - for (int i = 0; i < statuses_length; i++) - BufFileClose(statuses[i]); - pfree(statuses); - - return combined_bitmap_file; -} - - -static void -sts_bitmap_filename(char *name, SharedTuplestoreAccessor *accessor, int participant) +uint32 +sts_get_tuplenum(SharedTuplestoreAccessor *accessor) { - snprintf(name, MAXPGPATH, "%s.p%d.bitmap", accessor->sts->name, participant); + return pg_atomic_read_u32(&accessor->sts->ntuples); } /* diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index b2cc12dc19be..164a97ef9625 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -19,6 +19,7 @@ #include "storage/barrier.h" #include "storage/buffile.h" #include "storage/lwlock.h" +#include "utils/sharedbits.h" /* ---------------------------------------------------------------- * hash-join hash table structures @@ -193,10 +194,17 @@ typedef struct ParallelHashJoinBatch ((char *) ParallelHashJoinBatchInner(batch) + \ MAXALIGN(sts_estimate(nparticipants)))) +/* Accessor for sharedbits following a ParallelHashJoinBatch. */ +#define ParallelHashJoinBatchOuterBits(batch, nparticipants) \ + ((SharedBits *) \ + ((char *) ParallelHashJoinBatchOuter(batch, nparticipants) + \ + MAXALIGN(sts_estimate(nparticipants)))) + /* Total size of a ParallelHashJoinBatch and tuplestores. */ #define EstimateParallelHashJoinBatch(hashtable) \ (MAXALIGN(sizeof(ParallelHashJoinBatch)) + \ - MAXALIGN(sts_estimate((hashtable)->parallel_state->nparticipants)) * 2) + MAXALIGN(sts_estimate((hashtable)->parallel_state->nparticipants)) * 2 + \ + MAXALIGN(sb_estimate((hashtable)->parallel_state->nparticipants))) /* Accessor for the nth ParallelHashJoinBatch given the base. */ #define NthParallelHashJoinBatch(base, n) \ @@ -221,9 +229,9 @@ typedef struct ParallelHashJoinBatchAccessor bool at_least_one_chunk; /* has this backend allocated a chunk? */ bool done; /* flag to remember that a batch is done */ - BufFile *combined_bitmap; /* for Adaptive Hashjoin only */ SharedTuplestoreAccessor *inner_tuples; SharedTuplestoreAccessor *outer_tuples; + SharedBitsAccessor *sba; } ParallelHashJoinBatchAccessor; /* @@ -270,6 +278,7 @@ typedef struct ParallelHashJoinState pg_atomic_uint32 distributor; /* counter for load balancing */ SharedFileSet fileset; /* space for shared temporary files */ + SharedFileSet sbfileset; } ParallelHashJoinState; /* The phases for building batches, used by build_barrier. */ diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h index f790f7e12186..82c0f8361154 100644 --- a/src/include/storage/buffile.h +++ b/src/include/storage/buffile.h @@ -48,7 +48,6 @@ extern long BufFileAppend(BufFile *target, BufFile *source); extern BufFile *BufFileCreateShared(SharedFileSet *fileset, const char *name); extern void BufFileExportShared(BufFile *file); -extern BufFile *BufFileOpenSharedIfExists(SharedFileSet *fileset, const char *name); extern BufFile *BufFileOpenShared(SharedFileSet *fileset, const char *name); extern void BufFileDeleteShared(SharedFileSet *fileset, const char *name); diff --git a/src/include/utils/sharedbits.h b/src/include/utils/sharedbits.h new file mode 100644 index 000000000000..a554a59a38b8 --- /dev/null +++ b/src/include/utils/sharedbits.h @@ -0,0 +1,40 @@ +/*------------------------------------------------------------------------- + * + * sharedbits.h + * Simple mechanism for sharing bits between backends. + * + * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/utils/sharedbits.h + * + *------------------------------------------------------------------------- + */ +#ifndef SHAREDBITS_H +#define SHAREDBITS_H + +#include "storage/sharedfileset.h" + +struct SharedBits; +typedef struct SharedBits SharedBits; + +struct SharedBitsParticipant; +typedef struct SharedBitsParticipant SharedBitsParticipant; + +struct SharedBitsAccessor; +typedef struct SharedBitsAccessor SharedBitsAccessor; + +extern SharedBitsAccessor *sb_attach(SharedBits *sbits, int my_participant_number, SharedFileSet *fileset); +extern SharedBitsAccessor *sb_initialize(SharedBits *sbits, int participants, int my_participant_number, SharedFileSet *fileset, char *name); +extern void sb_initialize_accessor(SharedBitsAccessor *accessor, uint32 nbits); +extern size_t sb_estimate(int participants); + +extern void sb_setbit(SharedBitsAccessor *accessor, uint64 bit); +extern bool sb_checkbit(SharedBitsAccessor *accessor, uint32 n); +extern BufFile *sb_combine(SharedBitsAccessor *accessor); +extern bool sb_combined_exists(SharedBitsAccessor *accessor); + +extern void sb_end_write(SharedBitsAccessor *sba); +extern void sb_end_read(SharedBitsAccessor *accessor); + +#endif /* SHAREDBITS_H */ diff --git a/src/include/utils/sharedtuplestore.h b/src/include/utils/sharedtuplestore.h index 8b2433e5c4b0..5e78f4bb15b7 100644 --- a/src/include/utils/sharedtuplestore.h +++ b/src/include/utils/sharedtuplestore.h @@ -71,11 +71,6 @@ extern MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, extern uint32 sts_increment_tuplenum(SharedTuplestoreAccessor *accessor); - -extern void sts_make_outer_match_status_file(SharedTuplestoreAccessor *accessor); -extern void sts_set_outer_match_status(SharedTuplestoreAccessor *accessor, uint32 tuplenum); -extern void sts_close_outer_match_status_file(SharedTuplestoreAccessor *accessor); -extern BufFile *sts_combine_outer_match_status_files(SharedTuplestoreAccessor *accessor); - +extern uint32 sts_get_tuplenum(SharedTuplestoreAccessor *accessor); #endif /* SHAREDTUPLESTORE_H */ -- 2.25.0