From 7368136290a67e49f6bf0ad5773be67243e99637 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Fri, 20 Aug 2021 11:32:33 +0530 Subject: [PATCH v1 2/2] Better usage of sharedfileset in apply worker Instead of using a separate shared fileset for each xid, use one shared fileset for the whole lifetime of the worker. For each xid, just create a shared buffile under that fileset and remove the file whenever we are done with it. The subxact file only needs to be created once we see the first subtransaction; to detect whether it already exists, the buffile open and buffile delete interfaces are extended to tolerate missing files. --- src/backend/replication/logical/worker.c | 229 +++++++----------------------- src/backend/storage/file/buffile.c | 23 ++- src/backend/utils/sort/logtape.c | 2 +- src/backend/utils/sort/sharedtuplestore.c | 3 +- src/include/storage/buffile.h | 5 +- 5 files changed, 76 insertions(+), 186 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 9901cf6..77cad7f 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -221,20 +221,6 @@ typedef struct ApplyExecutionData PartitionTupleRouting *proute; /* partition routing info */ } ApplyExecutionData; -/* - * Stream xid hash entry. Whenever we see a new xid we create this entry in the - * xidhash and along with it create the streaming file and store the fileset handle. - * The subxact file is created iff there is any subxact info under this xid. This - * entry is used on the subsequent streams for the xid to get the corresponding - * fileset handles, so storing them in hash makes the search faster. 
- */ -typedef struct StreamXidHash -{ - TransactionId xid; /* xid is the hash key and must be first */ - FileSet *stream_fileset; /* shared file set for stream data */ - FileSet *subxact_fileset; /* shared file set for subxact info */ -} StreamXidHash; - static MemoryContext ApplyMessageContext = NULL; MemoryContext ApplyContext = NULL; @@ -255,10 +241,12 @@ static bool in_streamed_transaction = false; static TransactionId stream_xid = InvalidTransactionId; /* - * Hash table for storing the streaming xid information along with shared file - * set for streaming and subxact files. + * Fileset for storing the changes and subxact information for the streaming + * transaction. We will use only one fileset and for each xid a separate + * changes and subxact files will be created under the same fileset. */ -static HTAB *xidhash = NULL; +static FileSet *xidfileset = NULL; + /* BufFile handle of the current streaming file */ static BufFile *stream_fd = NULL; @@ -1129,7 +1117,6 @@ static void apply_handle_stream_start(StringInfo s) { bool first_segment; - HASHCTL hash_ctl; if (in_streamed_transaction) ereport(ERROR, @@ -1157,17 +1144,21 @@ apply_handle_stream_start(StringInfo s) errmsg_internal("invalid transaction ID in streamed replication transaction"))); /* - * Initialize the xidhash table if we haven't yet. This will be used for - * the entire duration of the apply worker so create it in permanent - * context. + * Initialize the xidfileset if we haven't yet. This will be used for the + * entire duration of the apply worker so create it in permanent context. 
*/ - if (xidhash == NULL) + if (xidfileset == NULL) { - hash_ctl.keysize = sizeof(TransactionId); - hash_ctl.entrysize = sizeof(StreamXidHash); - hash_ctl.hcxt = ApplyContext; - xidhash = hash_create("StreamXidHash", 1024, &hash_ctl, - HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + MemoryContext oldctx; + + /* + * We need to keep the shared fileset for the worker lifetime so, need + * to allocate it in a persistent context. + */ + oldctx = MemoryContextSwitchTo(ApplyContext); + xidfileset = palloc(sizeof(FileSet)); + FileSetInit(xidfileset); + MemoryContextSwitchTo(oldctx); } /* open the spool file for this transaction */ @@ -1258,7 +1249,6 @@ apply_handle_stream_abort(StringInfo s) BufFile *fd; bool found = false; char path[MAXPGPATH]; - StreamXidHash *ent; subidx = -1; begin_replication_step(); @@ -1287,19 +1277,9 @@ apply_handle_stream_abort(StringInfo s) return; } - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_FIND, - NULL); - if (!ent) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("transaction %u not found in stream XID hash table", - xid))); - /* open the changes file */ changes_filename(path, MyLogicalRepWorker->subid, xid); - fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR); + fd = BufFileOpenFileSet(xidfileset, path, O_RDWR, false); /* OK, truncate the file at the right offset */ BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno, @@ -1327,7 +1307,6 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) int nchanges; char path[MAXPGPATH]; char *buffer = NULL; - StreamXidHash *ent; MemoryContext oldcxt; BufFile *fd; @@ -1345,17 +1324,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) changes_filename(path, MyLogicalRepWorker->subid, xid); elog(DEBUG1, "replaying changes from file \"%s\"", path); - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_FIND, - NULL); - if (!ent) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - 
errmsg_internal("transaction %u not found in stream XID hash table", - xid))); - - fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDONLY); + fd = BufFileOpenFileSet(xidfileset, path, O_RDONLY, false); buffer = palloc(BLCKSZ); initStringInfo(&s2); @@ -2509,6 +2478,16 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply) } /* + * Cleanup fileset if created. + */ +static void +worker_cleanup(int code, Datum arg) +{ + if (xidfileset != NULL) + FileSetDeleteAll(xidfileset); +} + +/* * Apply main loop. */ static void @@ -2534,6 +2513,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received) "LogicalStreamingContext", ALLOCSET_DEFAULT_SIZES); + /* + * Register before-shmem-exit hook to ensure fileset is dropped while we + * can still report stats for underlying temporary files. + */ + before_shmem_exit(worker_cleanup, (Datum) 0); + /* mark as idle, before starting to loop */ pgstat_report_activity(STATE_IDLE, NULL); @@ -2957,18 +2942,11 @@ subxact_info_write(Oid subid, TransactionId xid) { char path[MAXPGPATH]; Size len; - StreamXidHash *ent; BufFile *fd; Assert(TransactionIdIsValid(xid)); - /* Find the xid entry in the xidhash */ - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_FIND, - NULL); - /* By this time we must have created the transaction entry */ - Assert(ent); + subxact_filename(path, subid, xid); /* * If there is no subtransaction then nothing to do, but if already have @@ -2976,39 +2954,15 @@ subxact_info_write(Oid subid, TransactionId xid) */ if (subxact_data.nsubxacts == 0) { - if (ent->subxact_fileset) - { - cleanup_subxact_info(); - FileSetDeleteAll(ent->subxact_fileset); - pfree(ent->subxact_fileset); - ent->subxact_fileset = NULL; - } + cleanup_subxact_info(); + BufFileDeleteFileSet(xidfileset, path, true); return; } - subxact_filename(path, subid, xid); - - /* - * Create the subxact file if it not already created, otherwise open the - * existing file. 
- */ - if (ent->subxact_fileset == NULL) - { - MemoryContext oldctx; - - /* - * We need to maintain shared fileset across multiple stream - * start/stop calls. So, need to allocate it in a persistent context. - */ - oldctx = MemoryContextSwitchTo(ApplyContext); - ent->subxact_fileset = palloc(sizeof(FileSet)); - FileSetInit(ent->subxact_fileset); - MemoryContextSwitchTo(oldctx); - - fd = BufFileCreateFileSet(ent->subxact_fileset, path); - } - else - fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDWR); + /* Try to open the subxact file, if it doesn't exist then create it */ + fd = BufFileOpenFileSet(xidfileset, path, O_RDWR, true); + if (fd == NULL) + fd = BufFileCreateFileSet(xidfileset, path); len = sizeof(SubXactInfo) * subxact_data.nsubxacts; @@ -3035,34 +2989,17 @@ subxact_info_read(Oid subid, TransactionId xid) char path[MAXPGPATH]; Size len; BufFile *fd; - StreamXidHash *ent; MemoryContext oldctx; Assert(!subxact_data.subxacts); Assert(subxact_data.nsubxacts == 0); Assert(subxact_data.nsubxacts_max == 0); - /* Find the stream xid entry in the xidhash */ - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_FIND, - NULL); - if (!ent) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("transaction %u not found in stream XID hash table", - xid))); - - /* - * If subxact_fileset is not valid that mean we don't have any subxact - * info - */ - if (ent->subxact_fileset == NULL) - return; - subxact_filename(path, subid, xid); - fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDONLY); + fd = BufFileOpenFileSet(xidfileset, path, O_RDONLY, true); + if (fd == NULL) + return; /* read number of subxact items */ if (BufFileRead(fd, &subxact_data.nsubxacts, @@ -3204,36 +3141,13 @@ static void stream_cleanup_files(Oid subid, TransactionId xid) { char path[MAXPGPATH]; - StreamXidHash *ent; - - /* Find the xid entry in the xidhash */ - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_FIND, - NULL); - if 
(!ent) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("transaction %u not found in stream XID hash table", - xid))); /* Delete the change file and release the stream fileset memory */ changes_filename(path, subid, xid); - FileSetDeleteAll(ent->stream_fileset); - pfree(ent->stream_fileset); - ent->stream_fileset = NULL; + BufFileDeleteFileSet(xidfileset, path, false); - /* Delete the subxact file and release the memory, if it exist */ - if (ent->subxact_fileset) - { - subxact_filename(path, subid, xid); - FileSetDeleteAll(ent->subxact_fileset); - pfree(ent->subxact_fileset); - ent->subxact_fileset = NULL; - } - - /* Remove the xid entry from the stream xid hash */ - hash_search(xidhash, (void *) &xid, HASH_REMOVE, NULL); + subxact_filename(path, subid, xid); + BufFileDeleteFileSet(xidfileset, path, true); } /* @@ -3243,8 +3157,8 @@ stream_cleanup_files(Oid subid, TransactionId xid) * * Open a file for streamed changes from a toplevel transaction identified * by stream_xid (global variable). If it's the first chunk of streamed - * changes for this transaction, initialize the shared fileset and create the - * buffile, otherwise open the previously created file. + * changes for this transaction, create the buffile, otherwise open the + * previously created file. * * This can only be called at the beginning of a "streaming" block, i.e. * between stream_start/stream_stop messages from the upstream. 
@@ -3253,20 +3167,13 @@ static void stream_open_file(Oid subid, TransactionId xid, bool first_segment) { char path[MAXPGPATH]; - bool found; MemoryContext oldcxt; - StreamXidHash *ent; Assert(in_streamed_transaction); Assert(OidIsValid(subid)); Assert(TransactionIdIsValid(xid)); Assert(stream_fd == NULL); - /* create or find the xid entry in the xidhash */ - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_ENTER, - &found); changes_filename(path, subid, xid); elog(DEBUG1, "opening file \"%s\" for streamed changes", path); @@ -3283,44 +3190,14 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) * writing, in append mode. */ if (first_segment) - { - MemoryContext savectx; - FileSet *fileset; - - if (found) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("incorrect first-segment flag for streamed replication transaction"))); - - /* - * We need to maintain shared fileset across multiple stream - * start/stop calls. So, need to allocate it in a persistent context. - */ - savectx = MemoryContextSwitchTo(ApplyContext); - fileset = palloc(sizeof(FileSet)); - - FileSetInit(fileset); - MemoryContextSwitchTo(savectx); - - stream_fd = BufFileCreateFileSet(fileset, path); - - /* Remember the fileset for the next stream of the same transaction */ - ent->xid = xid; - ent->stream_fileset = fileset; - ent->subxact_fileset = NULL; - } + stream_fd = BufFileCreateFileSet(xidfileset, path); else { - if (!found) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("incorrect first-segment flag for streamed replication transaction"))); - /* * Open the file and seek to the end of the file because we always * append the changes file. 
*/ - stream_fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR); + stream_fd = BufFileOpenFileSet(xidfileset, path, O_RDWR, false); BufFileSeek(stream_fd, 0, 0, SEEK_END); } diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c index 8e9307d..9b95f7f 100644 --- a/src/backend/storage/file/buffile.c +++ b/src/backend/storage/file/buffile.c @@ -278,10 +278,12 @@ BufFileCreateFileSet(FileSet *fileset, const char *name) * with BufFileCreateFileSet in the same FileSet using the same name. * The backend that created the file must have called BufFileClose() or * BufFileExportFileSet() to make sure that it is ready to be opened by other - * backends and render it read-only. + * backends and render it read-only. If missing_ok is true then it will return + * NULL if file doesn't exist otherwise error. */ BufFile * -BufFileOpenFileSet(FileSet *fileset, const char *name, int mode) +BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, + bool missing_ok) { BufFile *file; char segment_name[MAXPGPATH]; @@ -318,10 +320,18 @@ BufFileOpenFileSet(FileSet *fileset, const char *name, int mode) * name. */ if (nfiles == 0) + { + /* free the memory */ + pfree(files); + + if (missing_ok) + return NULL; + ereport(ERROR, (errcode_for_file_access(), errmsg("could not open temporary file \"%s\" from BufFile \"%s\": %m", segment_name, name))); + } file = makeBufFileCommon(nfiles); file->files = files; @@ -341,10 +351,11 @@ BufFileOpenFileSet(FileSet *fileset, const char *name, int mode) * the FileSet to be cleaned up. * * Only one backend should attempt to delete a given name, and should know - * that it exists and has been exported or closed. + * that it exists and has been exported or closed otherwise missing_ok should + * be passed true. 
 */ void -BufFileDeleteFileSet(FileSet *fileset, const char *name) +BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok) { char segment_name[MAXPGPATH]; int segment = 0; @@ -358,7 +369,7 @@ BufFileDeleteFileSet(FileSet *fileset, const char *name) for (;;) { SegmentName(segment_name, name, segment); - if (!FileSetDelete(fileset, segment_name, true)) + if (!FileSetDelete(fileset, segment_name, !missing_ok)) break; found = true; ++segment; @@ -366,7 +377,7 @@ BufFileDeleteFileSet(FileSet *fileset, const char *name) CHECK_FOR_INTERRUPTS(); } - if (!found) + if (!found && !missing_ok) elog(ERROR, "could not delete unknown shared BufFile \"%s\"", name); } diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c index f7994d7..debf12e1 100644 --- a/src/backend/utils/sort/logtape.c +++ b/src/backend/utils/sort/logtape.c @@ -564,7 +564,7 @@ ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared, lt = &lts->tapes[i]; pg_itoa(i, filename); - file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY); + file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY, false); filesize = BufFileSize(file); /* diff --git a/src/backend/utils/sort/sharedtuplestore.c b/src/backend/utils/sort/sharedtuplestore.c index 72acd54..8c5135c 100644 --- a/src/backend/utils/sort/sharedtuplestore.c +++ b/src/backend/utils/sort/sharedtuplestore.c @@ -560,7 +560,8 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data) sts_filename(name, accessor, accessor->read_participant); accessor->read_file = - BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY); + BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY, + false); } /* Seek and load the chunk header. 
*/ diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h index 032a823..5e9df44 100644 --- a/src/include/storage/buffile.h +++ b/src/include/storage/buffile.h @@ -49,8 +49,9 @@ extern long BufFileAppend(BufFile *target, BufFile *source); extern BufFile *BufFileCreateFileSet(FileSet *fileset, const char *name); extern void BufFileExportFileSet(BufFile *file); extern BufFile *BufFileOpenFileSet(FileSet *fileset, const char *name, - int mode); -extern void BufFileDeleteFileSet(FileSet *fileset, const char *name); + int mode, bool missing_ok); +extern void BufFileDeleteFileSet(FileSet *fileset, const char *name, + bool missing_ok); extern void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset); #endif /* BUFFILE_H */ -- 1.8.3.1