Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions - Mailing list pgsql-hackers
From | Dilip Kumar |
---|---|
Subject | Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions |
Date | |
Msg-id | CAFiTN-u1MCP5gj=d=up3sokzm05G1y=7sYXF=hrjufAsS9dn3g@mail.gmail.com |
In response to | Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions (Amit Kapila <amit.kapila16@gmail.com>) |
Responses | Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions |
List | pgsql-hackers |
On Tue, Jun 16, 2020 at 2:37 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Mon, Jun 15, 2020 at 6:29 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > I have few more comments on the patch
> > 0013-Change-buffile-interface-required-for-streaming-.patch:
> >
>
> Review comments on 0014-Worker-tempfile-use-the-shared-buffile-infrastru:
> 1.
> The subxact file is only create if there
> + * are any suxact info under this xid.
> + */
> +typedef struct StreamXidHash
>
> Lets slightly reword the part of the comment as "The subxact file is
> created iff there is any suxact info under this xid."

Done

>
> 2.
> @@ -710,6 +740,9 @@ apply_handle_stream_stop(StringInfo s)
>   subxact_info_write(MyLogicalRepWorker->subid, stream_xid);
>   stream_close_file();
>
> + /* Commit the per-stream transaction */
> + CommitTransactionCommand();
>
> Before calling commit, ensure that we are in a valid transaction.  I
> think we can have an Assert for IsTransactionState().

Done

> 3.
> @@ -761,11 +791,13 @@ apply_handle_stream_abort(StringInfo s)
>
>   int64 i;
>   int64 subidx;
> - int fd;
> + BufFile *fd;
>   bool found = false;
>   char path[MAXPGPATH];
> + StreamXidHash *ent;
>
>   subidx = -1;
> + ensure_transaction();
>   subxact_info_read(MyLogicalRepWorker->subid, xid);
>
> Why to call ensure_transaction here?  Is there any reason that we
> won't have a valid transaction by now?  If not, then its better to
> have an Assert for IsTransactionState().

We only start a transaction at stream_start and commit it at
stream_stop, so at stream_abort we will not have a transaction.

> 4.
> - if (write(fd, &nsubxacts, sizeof(nsubxacts)) != sizeof(nsubxacts))
> + if (BufFileWrite(fd, &nsubxacts, sizeof(nsubxacts)) != sizeof(nsubxacts))
>   {
> - int save_errno = errno;
> + int save_errno = errno;
>
> - CloseTransientFile(fd);
> + BufFileClose(fd);
>
> On error, won't these files be close automatically?  If so, why at
> this place and before other errors, we need to close this?

Yes, that's correct.  I have fixed those.

> 5.
> if ((len > 0) && ((BufFileRead(fd, subxacts, len)) != len))
> {
>   int save_errno = errno;
>
>   BufFileClose(fd);
>   errno = save_errno;
>   ereport(ERROR,
>     (errcode_for_file_access(),
>     errmsg("could not read file \"%s\": %m",
>
> Can we change the error message to "could not read from streaming
> transactions file .." or something like that and similarly we can
> change the message for failure in reading changes file?

Done

> 6.
> if (BufFileWrite(fd, &nsubxacts, sizeof(nsubxacts)) != sizeof(nsubxacts))
> {
>   int save_errno = errno;
>
>   BufFileClose(fd);
>   errno = save_errno;
>   ereport(ERROR,
>     (errcode_for_file_access(),
>     errmsg("could not write to file \"%s\": %m",
>
> Similar to previous, can we change it to "could not write to streaming
> transactions file

BufFileWrite no longer returns failure.

> 7.
> @@ -2855,17 +2844,32 @@ stream_open_file(Oid subid, TransactionId xid,
> bool first_segment)
>   * for writing, in append mode.
>   */
>   if (first_segment)
> - flags = (O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
> - else
> - flags = (O_WRONLY | O_APPEND | PG_BINARY);
> + {
> + /*
> + * Shared fileset handle must be allocated in the persistent context.
> + */
> + SharedFileSet *fileset =
> + MemoryContextAlloc(ApplyContext, sizeof(SharedFileSet));
>
> - stream_fd = OpenTransientFile(path, flags);
> + PrepareTempTablespaces();
> + SharedFileSetInit(fileset, NULL);
>
> Why are we calling PrepareTempTablespaces here? It is already called
> in SharedFileSetInit.

My bad.  First I tried using SharedFileSetInit but later it got changed
and I forgot to remove this part.

> 8.
> + /*
> + * Start a transaction on stream start, this transaction will be committed
> + * on the stream stop. We need the transaction for handling the buffile,
> + * used for serializing the streaming data and subxact info.
> + */
> + ensure_transaction();
>
> I think we need this for PrepareTempTablespaces to set the
> temptablespaces.  Also, isn't it required for a cleanup of buffile
> resources at the transaction end?  Are there any other reasons for it
> as well?  The comment should be a bit more clear for why we need a
> transaction here.

I am not sure it makes sense to add a comment here explaining why
buffile and sharedfileset need a transaction.  Do you think we should
add a comment in the buffile/shared fileset API saying that it should
be called under a transaction?

> 9.
> * Open a file for streamed changes from a toplevel transaction identified
> * by stream_xid (global variable). If it's the first chunk of streamed
> * changes for this transaction, perform cleanup by removing existing
> * files after a possible previous crash.
> ..
> stream_open_file(Oid subid, TransactionId xid, bool first_segment)
>
> The above part comment atop stream_open_file needs to be changed after
> new implementation.

Done

> 10.
> * enabled.  This context is reeset on each stream stop.
> */
> LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
>
> /reeset/reset

Done

> 11.
> stream_cleanup_files(Oid subid, TransactionId xid, bool missing_ok)
> {
> ..
> + /* No entry created for this xid so simply return. */
> + if (ent == NULL)
> + return;
> ..
> }
>
> Is there any reason or scenario where this ent can be NULL?  If not,
> it will be better to have an Assert for the same.

Right, it should be an assert.  Even if all the changes are ignored for
the top transaction, we should have sent the stream_start.

> 12.
> subxact_info_write(Oid subid, TransactionId xid)
> {
> ..
> + /*
> + * If there is no subtransaction then nothing to do, but if already have
> + * subxact file then delete that.
> + */
> + if (nsubxacts == 0)
>   {
> - ereport(ERROR,
> - (errcode_for_file_access(),
> - errmsg("could not create file \"%s\": %m",
> - path)));
> + if (ent->subxact_fileset)
> + {
> + cleanup_subxact_info();
> + BufFileDeleteShared(ent->subxact_fileset, path);
> + ent->subxact_fileset = NULL;
> ..
> }
>
> Here don't we need to free the subxact_fileset before setting it to NULL?

Yes, done

> 13.
> + /*
> + * Scan complete hash and delete the underlying files for the the xids.
> + * Also delete the memory for the shared file sets.
> + */
>
> /the the/the.  Instead of "delete the memory", it would be better to
> say "release the memory".

Done

>
> 14.
> + /*
> + * We might not have created the suxact fileset if there is no sub
> + * transaction.
> + */
>
> /suxact/subxact

Done

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
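
For readers following comments 2 and 3, here is a minimal toy sketch of the transaction lifetime described in the replies: a transaction is open only between stream_start and stream_stop, so stream_abort has to start its own. This is not code from the patch. The `toy_` functions and the `in_transaction` flag are hypothetical stand-ins for ensure_transaction(), CommitTransactionCommand() and IsTransactionState(), and having the abort handler commit at the end is an assumption made only to keep the toy state clean.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy stand-in for the backend's transaction state; NOT the PostgreSQL API. */
static bool in_transaction = false;

/*
 * Hypothetical analogue of ensure_transaction(): start a transaction only
 * if none is open.  Returns true when it actually had to start one.
 */
static bool
toy_ensure_transaction(void)
{
    if (in_transaction)
        return false;
    in_transaction = true;
    return true;
}

/* Hypothetical analogue of CommitTransactionCommand(). */
static void
toy_commit_transaction(void)
{
    /* Mirrors the Assert(IsTransactionState()) suggested in comment 2. */
    assert(in_transaction);
    in_transaction = false;
}

/* stream_start opens the per-stream transaction ... */
static void
toy_stream_start(void)
{
    toy_ensure_transaction();
}

/* ... and stream_stop commits it, so nothing is open afterwards. */
static void
toy_stream_stop(void)
{
    toy_commit_transaction();
}

/*
 * stream_abort arrives outside the start/stop window, so it cannot merely
 * assert that a transaction exists; it has to start one.  Returns whether
 * it needed to.  The commit at the end is an assumption of this toy.
 */
static bool
toy_stream_abort(void)
{
    bool started = toy_ensure_transaction();

    /* reading subxact info and truncating the changes file would go here */

    toy_commit_transaction();
    return started;
}
```

Calling toy_stream_start(), then toy_stream_stop(), then toy_stream_abort() makes the abort handler report that it had to open a transaction itself, which is why an Assert for IsTransactionState() would not fit at the top of that handler.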