From e7494c21da24d856e113a19dcba8b3c9893055fb Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Tue, 24 Aug 2021 14:44:59 +0530 Subject: [PATCH v3 2/2] Better usage of fileset in apply worker Instead of using a separate fileset for each xid, use just one fileset for the whole lifetime of the worker. So for each xid, create buffiles under the same fileset and remove the files whenever we are done with those files. The subxact file needs to be created only once we get the first subtransaction, and for detecting that we have also extended the buffile open and buffile delete interfaces to tolerate missing files. --- src/backend/replication/logical/launcher.c | 2 +- src/backend/replication/logical/worker.c | 234 +++++------------------------ src/backend/storage/file/buffile.c | 23 ++- src/backend/utils/sort/logtape.c | 2 +- src/backend/utils/sort/sharedtuplestore.c | 3 +- src/include/storage/buffile.h | 5 +- 6 files changed, 64 insertions(+), 205 deletions(-) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 8b1772d..644a9c2 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -648,7 +648,7 @@ logicalrep_worker_onexit(int code, Datum arg) logicalrep_worker_detach(); - /* Cleanup filesets used for streaming transactions. */ + /* Cleanup fileset used for streaming transactions. */ logicalrep_worker_cleanupfileset(); ApplyLauncherWakeup(); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 07a2c90..77aaba1 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -221,20 +221,6 @@ typedef struct ApplyExecutionData PartitionTupleRouting *proute; /* partition routing info */ } ApplyExecutionData; -/* - * Stream xid hash entry. Whenever we see a new xid we create this entry in the - * xidhash and along with it create the streaming file and store the fileset handle. 
- * The subxact file is created iff there is any subxact info under this xid. This - * entry is used on the subsequent streams for the xid to get the corresponding - * fileset handles, so storing them in hash makes the search faster. - */ -typedef struct StreamXidHash -{ - TransactionId xid; /* xid is the hash key and must be first */ - FileSet *stream_fileset; /* file set for stream data */ - FileSet *subxact_fileset; /* file set for subxact info */ -} StreamXidHash; - static MemoryContext ApplyMessageContext = NULL; MemoryContext ApplyContext = NULL; @@ -255,10 +241,11 @@ static bool in_streamed_transaction = false; static TransactionId stream_xid = InvalidTransactionId; /* - * Hash table for storing the streaming xid information along with filesets - * for streaming and subxact files. + * Fileset for storing the changes and subxact information for the streaming + * transaction. We will use only one fileset and for each xid a separate + * changes and subxact files will be created under the same fileset. */ -static HTAB *xidhash = NULL; +static FileSet *xidfileset = NULL; /* BufFile handle of the current streaming file */ static BufFile *stream_fd = NULL; @@ -1129,7 +1116,6 @@ static void apply_handle_stream_start(StringInfo s) { bool first_segment; - HASHCTL hash_ctl; if (in_streamed_transaction) ereport(ERROR, @@ -1157,17 +1143,21 @@ apply_handle_stream_start(StringInfo s) errmsg_internal("invalid transaction ID in streamed replication transaction"))); /* - * Initialize the xidhash table if we haven't yet. This will be used for - * the entire duration of the apply worker so create it in permanent - * context. + * Initialize the xidfileset if we haven't yet. This will be used for the + * entire duration of the apply worker so create it in permanent context. 
*/ - if (xidhash == NULL) + if (xidfileset == NULL) { - hash_ctl.keysize = sizeof(TransactionId); - hash_ctl.entrysize = sizeof(StreamXidHash); - hash_ctl.hcxt = ApplyContext; - xidhash = hash_create("StreamXidHash", 1024, &hash_ctl, - HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + MemoryContext oldctx; + + /* + * We need to keep the fileset for the worker lifetime so need to + * allocate it in a persistent context. + */ + oldctx = MemoryContextSwitchTo(ApplyContext); + xidfileset = palloc(sizeof(FileSet)); + FileSetInit(xidfileset); + MemoryContextSwitchTo(oldctx); } /* open the spool file for this transaction */ @@ -1258,7 +1248,6 @@ apply_handle_stream_abort(StringInfo s) BufFile *fd; bool found = false; char path[MAXPGPATH]; - StreamXidHash *ent; subidx = -1; begin_replication_step(); @@ -1287,19 +1276,9 @@ apply_handle_stream_abort(StringInfo s) return; } - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_FIND, - NULL); - if (!ent) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("transaction %u not found in stream XID hash table", - xid))); - /* open the changes file */ changes_filename(path, MyLogicalRepWorker->subid, xid); - fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR); + fd = BufFileOpenFileSet(xidfileset, path, O_RDWR, false); /* OK, truncate the file at the right offset */ BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno, @@ -1327,7 +1306,6 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) int nchanges; char path[MAXPGPATH]; char *buffer = NULL; - StreamXidHash *ent; MemoryContext oldcxt; BufFile *fd; @@ -1345,17 +1323,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) changes_filename(path, MyLogicalRepWorker->subid, xid); elog(DEBUG1, "replaying changes from file \"%s\"", path); - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_FIND, - NULL); - if (!ent) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("transaction 
%u not found in stream XID hash table", - xid))); - - fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDONLY); + fd = BufFileOpenFileSet(xidfileset, path, O_RDONLY, false); buffer = palloc(BLCKSZ); initStringInfo(&s2); @@ -2509,30 +2477,13 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply) } /* - * Cleanup filesets. + * Cleanup fileset. */ void logicalrep_worker_cleanupfileset() { - HASH_SEQ_STATUS status; - StreamXidHash *hentry; - - /* - * Scan the xidhash table if created and from each entry delete stream - * fileset and the subxact fileset. - */ - if (xidhash) - { - hash_seq_init(&status, xidhash); - while ((hentry = (StreamXidHash *) hash_seq_search(&status)) != NULL) - { - FileSetDeleteAll(hentry->stream_fileset); - - /* Delete the subxact fileset only if it is created */ - if (hentry->subxact_fileset) - FileSetDeleteAll(hentry->subxact_fileset); - } - } + if (xidfileset != NULL) + FileSetDeleteAll(xidfileset); } /* @@ -2984,18 +2935,11 @@ subxact_info_write(Oid subid, TransactionId xid) { char path[MAXPGPATH]; Size len; - StreamXidHash *ent; BufFile *fd; Assert(TransactionIdIsValid(xid)); - /* Find the xid entry in the xidhash */ - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_FIND, - NULL); - /* By this time we must have created the transaction entry */ - Assert(ent); + subxact_filename(path, subid, xid); /* * If there is no subtransaction then nothing to do, but if already have @@ -3003,39 +2947,18 @@ subxact_info_write(Oid subid, TransactionId xid) */ if (subxact_data.nsubxacts == 0) { - if (ent->subxact_fileset) - { - cleanup_subxact_info(); - FileSetDeleteAll(ent->subxact_fileset); - pfree(ent->subxact_fileset); - ent->subxact_fileset = NULL; - } + cleanup_subxact_info(); + BufFileDeleteFileSet(xidfileset, path, true); + return; } subxact_filename(path, subid, xid); - /* - * Create the subxact file if it not already created, otherwise open the - * existing file. 
- */ - if (ent->subxact_fileset == NULL) - { - MemoryContext oldctx; - - /* - * We need to maintain fileset across multiple stream start/stop calls. - * So, need to allocate it in a persistent context. - */ - oldctx = MemoryContextSwitchTo(ApplyContext); - ent->subxact_fileset = palloc(sizeof(FileSet)); - FileSetInit(ent->subxact_fileset); - MemoryContextSwitchTo(oldctx); - - fd = BufFileCreateFileSet(ent->subxact_fileset, path); - } - else - fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDWR); + /* Try to open the subxact file, if it doesn't exist then create it */ + fd = BufFileOpenFileSet(xidfileset, path, O_RDWR, true); + if (fd == NULL) + fd = BufFileCreateFileSet(xidfileset, path); len = sizeof(SubXactInfo) * subxact_data.nsubxacts; @@ -3062,34 +2985,17 @@ subxact_info_read(Oid subid, TransactionId xid) char path[MAXPGPATH]; Size len; BufFile *fd; - StreamXidHash *ent; MemoryContext oldctx; Assert(!subxact_data.subxacts); Assert(subxact_data.nsubxacts == 0); Assert(subxact_data.nsubxacts_max == 0); - /* Find the stream xid entry in the xidhash */ - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_FIND, - NULL); - if (!ent) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("transaction %u not found in stream XID hash table", - xid))); - - /* - * If subxact_fileset is not valid that mean we don't have any subxact - * info - */ - if (ent->subxact_fileset == NULL) - return; - subxact_filename(path, subid, xid); - fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDONLY); + fd = BufFileOpenFileSet(xidfileset, path, O_RDONLY, true); + if (fd == NULL) + return; /* read number of subxact items */ if (BufFileRead(fd, &subxact_data.nsubxacts, @@ -3231,36 +3137,13 @@ static void stream_cleanup_files(Oid subid, TransactionId xid) { char path[MAXPGPATH]; - StreamXidHash *ent; - - /* Find the xid entry in the xidhash */ - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_FIND, - NULL); - if (!ent) 
- ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("transaction %u not found in stream XID hash table", - xid))); /* Delete the change file and release the stream fileset memory */ changes_filename(path, subid, xid); - FileSetDeleteAll(ent->stream_fileset); - pfree(ent->stream_fileset); - ent->stream_fileset = NULL; - - /* Delete the subxact file and release the memory, if it exist */ - if (ent->subxact_fileset) - { - subxact_filename(path, subid, xid); - FileSetDeleteAll(ent->subxact_fileset); - pfree(ent->subxact_fileset); - ent->subxact_fileset = NULL; - } + BufFileDeleteFileSet(xidfileset, path, false); - /* Remove the xid entry from the stream xid hash */ - hash_search(xidhash, (void *) &xid, HASH_REMOVE, NULL); + subxact_filename(path, subid, xid); + BufFileDeleteFileSet(xidfileset, path, true); } /* @@ -3270,8 +3153,8 @@ stream_cleanup_files(Oid subid, TransactionId xid) * * Open a file for streamed changes from a toplevel transaction identified * by stream_xid (global variable). If it's the first chunk of streamed - * changes for this transaction, initialize the fileset and create the buffile, - * otherwise open the previously created file. + * changes for this transaction, create the buffile, otherwise open the + * previously created file. * * This can only be called at the beginning of a "streaming" block, i.e. * between stream_start/stream_stop messages from the upstream. 
@@ -3280,20 +3163,13 @@ static void stream_open_file(Oid subid, TransactionId xid, bool first_segment) { char path[MAXPGPATH]; - bool found; MemoryContext oldcxt; - StreamXidHash *ent; Assert(in_streamed_transaction); Assert(OidIsValid(subid)); Assert(TransactionIdIsValid(xid)); Assert(stream_fd == NULL); - /* create or find the xid entry in the xidhash */ - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_ENTER, - &found); changes_filename(path, subid, xid); elog(DEBUG1, "opening file \"%s\" for streamed changes", path); @@ -3310,44 +3186,14 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) * writing, in append mode. */ if (first_segment) - { - MemoryContext savectx; - FileSet *fileset; - - if (found) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("incorrect first-segment flag for streamed replication transaction"))); - - /* - * We need to maintain fileset across multiple stream start/stop calls. - * So, need to allocate it in a persistent context. - */ - savectx = MemoryContextSwitchTo(ApplyContext); - fileset = palloc(sizeof(FileSet)); - - FileSetInit(fileset); - MemoryContextSwitchTo(savectx); - - stream_fd = BufFileCreateFileSet(fileset, path); - - /* Remember the fileset for the next stream of the same transaction */ - ent->xid = xid; - ent->stream_fileset = fileset; - ent->subxact_fileset = NULL; - } + stream_fd = BufFileCreateFileSet(xidfileset, path); else { - if (!found) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("incorrect first-segment flag for streamed replication transaction"))); - /* * Open the file and seek to the end of the file because we always * append the changes file. 
*/ - stream_fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR); + stream_fd = BufFileOpenFileSet(xidfileset, path, O_RDWR, false); BufFileSeek(stream_fd, 0, 0, SEEK_END); } diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c index df3e099..1cf9733 100644 --- a/src/backend/storage/file/buffile.c +++ b/src/backend/storage/file/buffile.c @@ -278,10 +278,12 @@ BufFileCreateFileSet(FileSet *fileset, const char *name) * with BufFileCreateFileSet in the same FileSet using the same name. * The backend that created the file must have called BufFileClose() or * BufFileExportFileSet() to make sure that it is ready to be opened by other - * backends and render it read-only. + * backends and render it read-only. If missing_ok is true then it will return + * NULL if file doesn't exist otherwise error. */ BufFile * -BufFileOpenFileSet(FileSet *fileset, const char *name, int mode) +BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, + bool missing_ok) { BufFile *file; char segment_name[MAXPGPATH]; @@ -318,10 +320,18 @@ BufFileOpenFileSet(FileSet *fileset, const char *name, int mode) * name. */ if (nfiles == 0) + { + /* free the memory */ + pfree(files); + + if (missing_ok) + return NULL; + ereport(ERROR, (errcode_for_file_access(), errmsg("could not open temporary file \"%s\" from BufFile \"%s\": %m", segment_name, name))); + } file = makeBufFileCommon(nfiles); file->files = files; @@ -341,10 +351,11 @@ BufFileOpenFileSet(FileSet *fileset, const char *name, int mode) * the FileSet to be cleaned up. * * Only one backend should attempt to delete a given name, and should know - * that it exists and has been exported or closed. + * that it exists and has been exported or closed otherwise missing_ok should + * be passed true. 
*/ void -BufFileDeleteFileSet(FileSet *fileset, const char *name) +BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok) { char segment_name[MAXPGPATH]; int segment = 0; @@ -358,7 +369,7 @@ BufFileDeleteFileSet(FileSet *fileset, const char *name) for (;;) { FileSetSegmentName(segment_name, name, segment); - if (!FileSetDelete(fileset, segment_name, true)) + if (!FileSetDelete(fileset, segment_name, !missing_ok)) break; found = true; ++segment; @@ -366,7 +377,7 @@ BufFileDeleteFileSet(FileSet *fileset, const char *name) CHECK_FOR_INTERRUPTS(); } - if (!found) + if (!found && !missing_ok) elog(ERROR, "could not delete unknown BufFile \"%s\"", name); } diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c index f7994d7..debf12e1 100644 --- a/src/backend/utils/sort/logtape.c +++ b/src/backend/utils/sort/logtape.c @@ -564,7 +564,7 @@ ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared, lt = <s->tapes[i]; pg_itoa(i, filename); - file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY); + file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY, false); filesize = BufFileSize(file); /* diff --git a/src/backend/utils/sort/sharedtuplestore.c b/src/backend/utils/sort/sharedtuplestore.c index 72acd54..8c5135c 100644 --- a/src/backend/utils/sort/sharedtuplestore.c +++ b/src/backend/utils/sort/sharedtuplestore.c @@ -560,7 +560,8 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data) sts_filename(name, accessor, accessor->read_participant); accessor->read_file = - BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY); + BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY, + false); } /* Seek and load the chunk header. 
*/ diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h index 032a823..5e9df44 100644 --- a/src/include/storage/buffile.h +++ b/src/include/storage/buffile.h @@ -49,8 +49,9 @@ extern long BufFileAppend(BufFile *target, BufFile *source); extern BufFile *BufFileCreateFileSet(FileSet *fileset, const char *name); extern void BufFileExportFileSet(BufFile *file); extern BufFile *BufFileOpenFileSet(FileSet *fileset, const char *name, - int mode); -extern void BufFileDeleteFileSet(FileSet *fileset, const char *name); + int mode, bool missing_ok); +extern void BufFileDeleteFileSet(FileSet *fileset, const char *name, + bool missing_ok); extern void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset); #endif /* BUFFILE_H */ -- 1.8.3.1