From d764f828e1cbbf6a69b9adf929282cb12b501ddb Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Mon, 23 Aug 2021 15:07:28 +0530 Subject: [PATCH v2 2/2] Better usage of sharedfileset in apply worker Instead of using a separate shared fileset for each xid, use one shared fileset for whole lifetime of the worker. So for each xid, just create shared buffile under that shared fileset and remove the file whenever we are done with the file. For subxact file we only need to create once we get the first subtransaction and for detecting that we also extend the buffile open and buffile delete interfaces to allow the missing files. --- src/backend/replication/logical/worker.c | 238 ++++++------------------------ src/backend/storage/file/buffile.c | 23 ++- src/backend/utils/sort/logtape.c | 2 +- src/backend/utils/sort/sharedtuplestore.c | 3 +- src/include/storage/buffile.h | 5 +- 5 files changed, 65 insertions(+), 206 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index d98ebab..f991e09 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -221,20 +221,6 @@ typedef struct ApplyExecutionData PartitionTupleRouting *proute; /* partition routing info */ } ApplyExecutionData; -/* - * Stream xid hash entry. Whenever we see a new xid we create this entry in the - * xidhash and along with it create the streaming file and store the fileset handle. - * The subxact file is created iff there is any subxact info under this xid. This - * entry is used on the subsequent streams for the xid to get the corresponding - * fileset handles, so storing them in hash makes the search faster. - */ -typedef struct StreamXidHash -{ - TransactionId xid; /* xid is the hash key and must be first */ - FileSet *stream_fileset; /* file set for stream data */ - FileSet *subxact_fileset; /* file set for subxact info */ -} StreamXidHash; - static MemoryContext ApplyMessageContext = NULL; MemoryContext ApplyContext = NULL; @@ -255,10 +241,11 @@ static bool in_streamed_transaction = false; static TransactionId stream_xid = InvalidTransactionId; /* - * Hash table for storing the streaming xid information along with filesets - * for streaming and subxact files. + * Fileset for storing the changes and subxact information for the streaming + * transaction. We will use only one fileset and for each xid a separate + * changes and subxact files will be created under the same fileset. */ -static HTAB *xidhash = NULL; +static FileSet *xidfileset = NULL; /* BufFile handle of the current streaming file */ static BufFile *stream_fd = NULL; @@ -1129,7 +1116,6 @@ static void apply_handle_stream_start(StringInfo s) { bool first_segment; - HASHCTL hash_ctl; if (in_streamed_transaction) ereport(ERROR, @@ -1157,17 +1143,21 @@ apply_handle_stream_start(StringInfo s) errmsg_internal("invalid transaction ID in streamed replication transaction"))); /* - * Initialize the xidhash table if we haven't yet. This will be used for - * the entire duration of the apply worker so create it in permanent - * context. + * Initialize the xidfileset if we haven't yet. This will be used for the + * entire duration of the apply worker so create it in permanent context. */ - if (xidhash == NULL) + if (xidfileset == NULL) { - hash_ctl.keysize = sizeof(TransactionId); - hash_ctl.entrysize = sizeof(StreamXidHash); - hash_ctl.hcxt = ApplyContext; - xidhash = hash_create("StreamXidHash", 1024, &hash_ctl, - HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + MemoryContext oldctx; + + /* + * We need to keep the fileset for the worker lifetime so need to + * allocate it in a persistent context. + */ + oldctx = MemoryContextSwitchTo(ApplyContext); + xidfileset = palloc(sizeof(FileSet)); + FileSetInit(xidfileset); + MemoryContextSwitchTo(oldctx); } /* open the spool file for this transaction */ @@ -1258,7 +1248,6 @@ apply_handle_stream_abort(StringInfo s) BufFile *fd; bool found = false; char path[MAXPGPATH]; - StreamXidHash *ent; subidx = -1; begin_replication_step(); @@ -1287,19 +1276,9 @@ apply_handle_stream_abort(StringInfo s) return; } - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_FIND, - NULL); - if (!ent) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("transaction %u not found in stream XID hash table", - xid))); - /* open the changes file */ changes_filename(path, MyLogicalRepWorker->subid, xid); - fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR); + fd = BufFileOpenFileSet(xidfileset, path, O_RDWR, false); /* OK, truncate the file at the right offset */ BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno, @@ -1327,7 +1306,6 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) int nchanges; char path[MAXPGPATH]; char *buffer = NULL; - StreamXidHash *ent; MemoryContext oldcxt; BufFile *fd; @@ -1345,17 +1323,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) changes_filename(path, MyLogicalRepWorker->subid, xid); elog(DEBUG1, "replaying changes from file \"%s\"", path); - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_FIND, - NULL); - if (!ent) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("transaction %u not found in stream XID hash table", - xid))); - - fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDONLY); + fd = BufFileOpenFileSet(xidfileset, path, O_RDONLY, false); buffer = palloc(BLCKSZ); initStringInfo(&s2); @@ -2508,31 +2476,14 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply) } } - /* - * Cleanup filesets. +/* + * Cleanup fileset if created. */ static void worker_cleanup(int code, Datum arg) { - HASH_SEQ_STATUS status; - StreamXidHash *hentry; - - /* - * Scan the xidhash table if created and from each entry delete stream - * fileset and the subxact fileset. - */ - if (xidhash) - { - hash_seq_init(&status, xidhash); - while ((hentry = (StreamXidHash *) hash_seq_search(&status)) != NULL) - { - FileSetDeleteAll(hentry->stream_fileset); - - /* Delete the subxact fileset only if it is created */ - if (hentry->subxact_fileset) - FileSetDeleteAll(hentry->subxact_fileset); - } - } + if (xidfileset != NULL) + FileSetDeleteAll(xidfileset); } /* @@ -2562,7 +2513,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) ALLOCSET_DEFAULT_SIZES); /* - * Register before-shmem-exit hook to ensure filesets are dropped while we + * Register before-shmem-exit hook to ensure fileset is dropped while we * can still report stats for underlying temporary files. */ before_shmem_exit(worker_cleanup, (Datum) 0); @@ -2990,18 +2941,11 @@ subxact_info_write(Oid subid, TransactionId xid) { char path[MAXPGPATH]; Size len; - StreamXidHash *ent; BufFile *fd; Assert(TransactionIdIsValid(xid)); - /* Find the xid entry in the xidhash */ - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_FIND, - NULL); - /* By this time we must have created the transaction entry */ - Assert(ent); + subxact_filename(path, subid, xid); /* * If there is no subtransaction then nothing to do, but if already have @@ -3009,39 +2953,18 @@ subxact_info_write(Oid subid, TransactionId xid) */ if (subxact_data.nsubxacts == 0) { - if (ent->subxact_fileset) - { - cleanup_subxact_info(); - FileSetDeleteAll(ent->subxact_fileset); - pfree(ent->subxact_fileset); - ent->subxact_fileset = NULL; - } + cleanup_subxact_info(); + BufFileDeleteFileSet(xidfileset, path, true); + return; } subxact_filename(path, subid, xid); - /* - * Create the subxact file if it not already created, otherwise open the - * existing file. - */ - if (ent->subxact_fileset == NULL) - { - MemoryContext oldctx; - - /* - * We need to maintain fileset across multiple stream start/stop calls. - * So, need to allocate it in a persistent context. - */ - oldctx = MemoryContextSwitchTo(ApplyContext); - ent->subxact_fileset = palloc(sizeof(FileSet)); - FileSetInit(ent->subxact_fileset); - MemoryContextSwitchTo(oldctx); - - fd = BufFileCreateFileSet(ent->subxact_fileset, path); - } - else - fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDWR); + /* Try to open the subxact file, if it doesn't exist then create it */ + fd = BufFileOpenFileSet(xidfileset, path, O_RDWR, true); + if (fd == NULL) + fd = BufFileCreateFileSet(xidfileset, path); len = sizeof(SubXactInfo) * subxact_data.nsubxacts; @@ -3068,34 +2991,17 @@ subxact_info_read(Oid subid, TransactionId xid) char path[MAXPGPATH]; Size len; BufFile *fd; - StreamXidHash *ent; MemoryContext oldctx; Assert(!subxact_data.subxacts); Assert(subxact_data.nsubxacts == 0); Assert(subxact_data.nsubxacts_max == 0); - /* Find the stream xid entry in the xidhash */ - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_FIND, - NULL); - if (!ent) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("transaction %u not found in stream XID hash table", - xid))); - - /* - * If subxact_fileset is not valid that mean we don't have any subxact - * info - */ - if (ent->subxact_fileset == NULL) - return; - subxact_filename(path, subid, xid); - fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDONLY); + fd = BufFileOpenFileSet(xidfileset, path, O_RDONLY, true); + if (fd == NULL) + return; /* read number of subxact items */ if (BufFileRead(fd, &subxact_data.nsubxacts, @@ -3237,36 +3143,13 @@ static void stream_cleanup_files(Oid subid, TransactionId xid) { char path[MAXPGPATH]; - StreamXidHash *ent; - - /* Find the xid entry in the xidhash */ - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_FIND, - NULL); - if (!ent) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("transaction %u not found in stream XID hash table", - xid))); /* Delete the change file and release the stream fileset memory */ changes_filename(path, subid, xid); - FileSetDeleteAll(ent->stream_fileset); - pfree(ent->stream_fileset); - ent->stream_fileset = NULL; - - /* Delete the subxact file and release the memory, if it exist */ - if (ent->subxact_fileset) - { - subxact_filename(path, subid, xid); - FileSetDeleteAll(ent->subxact_fileset); - pfree(ent->subxact_fileset); - ent->subxact_fileset = NULL; - } + BufFileDeleteFileSet(xidfileset, path, false); - /* Remove the xid entry from the stream xid hash */ - hash_search(xidhash, (void *) &xid, HASH_REMOVE, NULL); + subxact_filename(path, subid, xid); + BufFileDeleteFileSet(xidfileset, path, true); } /* @@ -3276,8 +3159,8 @@ stream_cleanup_files(Oid subid, TransactionId xid) * * Open a file for streamed changes from a toplevel transaction identified * by stream_xid (global variable). If it's the first chunk of streamed - * changes for this transaction, initialize the fileset and create the buffile, - * otherwise open the previously created file. + * changes for this transaction, create the buffile, otherwise open the + * previously created file. * * This can only be called at the beginning of a "streaming" block, i.e. * between stream_start/stream_stop messages from the upstream. @@ -3286,20 +3169,13 @@ static void stream_open_file(Oid subid, TransactionId xid, bool first_segment) { char path[MAXPGPATH]; - bool found; MemoryContext oldcxt; - StreamXidHash *ent; Assert(in_streamed_transaction); Assert(OidIsValid(subid)); Assert(TransactionIdIsValid(xid)); Assert(stream_fd == NULL); - /* create or find the xid entry in the xidhash */ - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_ENTER, - &found); changes_filename(path, subid, xid); elog(DEBUG1, "opening file \"%s\" for streamed changes", path); @@ -3316,44 +3192,14 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) * writing, in append mode. */ if (first_segment) - { - MemoryContext savectx; - FileSet *fileset; - - if (found) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("incorrect first-segment flag for streamed replication transaction"))); - - /* - * We need to maintain fileset across multiple stream start/stop calls. - * So, need to allocate it in a persistent context. - */ - savectx = MemoryContextSwitchTo(ApplyContext); - fileset = palloc(sizeof(FileSet)); - - FileSetInit(fileset); - MemoryContextSwitchTo(savectx); - - stream_fd = BufFileCreateFileSet(fileset, path); - - /* Remember the fileset for the next stream of the same transaction */ - ent->xid = xid; - ent->stream_fileset = fileset; - ent->subxact_fileset = NULL; - } + stream_fd = BufFileCreateFileSet(xidfileset, path); else { - if (!found) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("incorrect first-segment flag for streamed replication transaction"))); - /* * Open the file and seek to the end of the file because we always * append the changes file. */ - stream_fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR); + stream_fd = BufFileOpenFileSet(xidfileset, path, O_RDWR, false); BufFileSeek(stream_fd, 0, 0, SEEK_END); } diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c index b9ca298..dbaa0e6 100644 --- a/src/backend/storage/file/buffile.c +++ b/src/backend/storage/file/buffile.c @@ -278,10 +278,12 @@ BufFileCreateFileSet(FileSet *fileset, const char *name) * with BufFileCreateFileSet in the same FileSet using the same name. * The backend that created the file must have called BufFileClose() or * BufFileExportFileSet() to make sure that it is ready to be opened by other - * backends and render it read-only. + * backends and render it read-only. If missing_ok is true then it will return + * NULL if file doesn't exist otherwise error. */ BufFile * -BufFileOpenFileSet(FileSet *fileset, const char *name, int mode) +BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, + bool missing_ok) { BufFile *file; char segment_name[MAXPGPATH]; @@ -318,10 +320,18 @@ BufFileOpenFileSet(FileSet *fileset, const char *name, int mode) * name. */ if (nfiles == 0) + { + /* free the memory */ + pfree(files); + + if (missing_ok) + return NULL; + ereport(ERROR, (errcode_for_file_access(), errmsg("could not open temporary file \"%s\" from BufFile \"%s\": %m", segment_name, name))); + } file = makeBufFileCommon(nfiles); file->files = files; @@ -341,10 +351,11 @@ BufFileOpenFileSet(FileSet *fileset, const char *name, int mode) * the FileSet to be cleaned up. * * Only one backend should attempt to delete a given name, and should know - * that it exists and has been exported or closed. + * that it exists and has been exported or closed otherwise missing_ok should + * be passed true. */ void -BufFileDeleteFileSet(FileSet *fileset, const char *name) +BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok) { char segment_name[MAXPGPATH]; int segment = 0; @@ -358,7 +369,7 @@ BufFileDeleteFileSet(FileSet *fileset, const char *name) for (;;) { SegmentName(segment_name, name, segment); - if (!FileSetDelete(fileset, segment_name, true)) + if (!FileSetDelete(fileset, segment_name, !missing_ok)) break; found = true; ++segment; @@ -366,7 +377,7 @@ BufFileDeleteFileSet(FileSet *fileset, const char *name) CHECK_FOR_INTERRUPTS(); } - if (!found) + if (!found && !missing_ok) elog(ERROR, "could not delete unknown BufFile \"%s\"", name); } diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c index f7994d7..debf12e1 100644 --- a/src/backend/utils/sort/logtape.c +++ b/src/backend/utils/sort/logtape.c @@ -564,7 +564,7 @@ ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared, lt = <s->tapes[i]; pg_itoa(i, filename); - file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY); + file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY, false); filesize = BufFileSize(file); /* diff --git a/src/backend/utils/sort/sharedtuplestore.c b/src/backend/utils/sort/sharedtuplestore.c index 72acd54..8c5135c 100644 --- a/src/backend/utils/sort/sharedtuplestore.c +++ b/src/backend/utils/sort/sharedtuplestore.c @@ -560,7 +560,8 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data) sts_filename(name, accessor, accessor->read_participant); accessor->read_file = - BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY); + BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY, + false); } /* Seek and load the chunk header. */ diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h index 032a823..5e9df44 100644 --- a/src/include/storage/buffile.h +++ b/src/include/storage/buffile.h @@ -49,8 +49,9 @@ extern long BufFileAppend(BufFile *target, BufFile *source); extern BufFile *BufFileCreateFileSet(FileSet *fileset, const char *name); extern void BufFileExportFileSet(BufFile *file); extern BufFile *BufFileOpenFileSet(FileSet *fileset, const char *name, - int mode); -extern void BufFileDeleteFileSet(FileSet *fileset, const char *name); + int mode, bool missing_ok); +extern void BufFileDeleteFileSet(FileSet *fileset, const char *name, + bool missing_ok); extern void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset); #endif /* BUFFILE_H */ -- 1.8.3.1