Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions - Mailing list pgsql-hackers
From | Amit Kapila |
---|---|
Subject | Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions |
Date | |
Msg-id | CAA4eK1K_nv2AvqQz0Fi9tBfKdmj3KnCVWU=R3zwhH2wqU8EVyA@mail.gmail.com |
In response to | Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions (Amit Kapila <amit.kapila16@gmail.com>) |
Responses |
Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions
Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions |
List | pgsql-hackers |
On Mon, Jun 15, 2020 at 6:29 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> I have few more comments on the patch
> 0013-Change-buffile-interface-required-for-streaming-.patch:
>

Review comments on 0014-Worker-tempfile-use-the-shared-buffile-infrastru:

1.
The subxact file is only create if there
+ * are any suxact info under this xid.
+ */
+typedef struct StreamXidHash

Let's slightly reword this part of the comment as "The subxact file is
created iff there is any subxact info under this xid."

2.
@@ -710,6 +740,9 @@ apply_handle_stream_stop(StringInfo s)
  subxact_info_write(MyLogicalRepWorker->subid, stream_xid);
  stream_close_file();

+ /* Commit the per-stream transaction */
+ CommitTransactionCommand();

Before calling commit, ensure that we are in a valid transaction. I
think we can have an Assert for IsTransactionState().

3.
@@ -761,11 +791,13 @@ apply_handle_stream_abort(StringInfo s)
  int64 i;
  int64 subidx;
- int fd;
+ BufFile *fd;
  bool found = false;
  char path[MAXPGPATH];
+ StreamXidHash *ent;

  subidx = -1;
+ ensure_transaction();
  subxact_info_read(MyLogicalRepWorker->subid, xid);

Why call ensure_transaction here? Is there any reason that we won't
have a valid transaction by now? If not, then it's better to have an
Assert for IsTransactionState().

4.
- if (write(fd, &nsubxacts, sizeof(nsubxacts)) != sizeof(nsubxacts))
+ if (BufFileWrite(fd, &nsubxacts, sizeof(nsubxacts)) != sizeof(nsubxacts))
  {
- int save_errno = errno;
+ int save_errno = errno;

- CloseTransientFile(fd);
+ BufFileClose(fd);

On error, won't these files be closed automatically? If so, why do we
need to close the file here and before the other errors?

5.
if ((len > 0) && ((BufFileRead(fd, subxacts, len)) != len))
{
    int save_errno = errno;

    BufFileClose(fd);
    errno = save_errno;
    ereport(ERROR,
            (errcode_for_file_access(),
             errmsg("could not read file \"%s\": %m",

Can we change the error message to "could not read from streaming
transactions file .."
or something like that, and similarly change the message for a failure
in reading the changes file?

6.
if (BufFileWrite(fd, &nsubxacts, sizeof(nsubxacts)) != sizeof(nsubxacts))
{
    int save_errno = errno;

    BufFileClose(fd);
    errno = save_errno;
    ereport(ERROR,
            (errcode_for_file_access(),
             errmsg("could not write to file \"%s\": %m",

Similar to the previous comment, can we change it to "could not write
to streaming transactions file

7.
@@ -2855,17 +2844,32 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
  * for writing, in append mode.
  */
  if (first_segment)
- flags = (O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
- else
- flags = (O_WRONLY | O_APPEND | PG_BINARY);
+ {
+ /*
+ * Shared fileset handle must be allocated in the persistent context.
+ */
+ SharedFileSet *fileset =
+ MemoryContextAlloc(ApplyContext, sizeof(SharedFileSet));

- stream_fd = OpenTransientFile(path, flags);
+ PrepareTempTablespaces();
+ SharedFileSetInit(fileset, NULL);

Why are we calling PrepareTempTablespaces here? It is already called
in SharedFileSetInit.

8.
+ /*
+ * Start a transaction on stream start, this transaction will be committed
+ * on the stream stop. We need the transaction for handling the buffile,
+ * used for serializing the streaming data and subxact info.
+ */
+ ensure_transaction();

I think we need this for PrepareTempTablespaces to set the
temptablespaces. Also, isn't it required for cleanup of buffile
resources at transaction end? Are there any other reasons for it as
well? The comment should be clearer about why we need a transaction
here.

9.
* Open a file for streamed changes from a toplevel transaction identified
* by stream_xid (global variable). If it's the first chunk of streamed
* changes for this transaction, perform cleanup by removing existing
* files after a possible previous crash.
..
stream_open_file(Oid subid, TransactionId xid, bool first_segment)

The above part of the comment atop stream_open_file needs to be changed
to match the new implementation.

10.
 * enabled. This context is reeset on each stream stop.
 */
LogicalStreamingContext = AllocSetContextCreate(ApplyContext,

/reeset/reset

11.
stream_cleanup_files(Oid subid, TransactionId xid, bool missing_ok)
{
..
+ /* No entry created for this xid so simply return. */
+ if (ent == NULL)
+ return;
..
}

Is there any reason or scenario where this ent can be NULL? If not,
it would be better to have an Assert for the same.

12.
subxact_info_write(Oid subid, TransactionId xid)
{
..
+ /*
+ * If there is no subtransaction then nothing to do, but if already have
+ * subxact file then delete that.
+ */
+ if (nsubxacts == 0)
  {
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not create file \"%s\": %m",
- path)));
+ if (ent->subxact_fileset)
+ {
+ cleanup_subxact_info();
+ BufFileDeleteShared(ent->subxact_fileset, path);
+ ent->subxact_fileset = NULL;
..
}

Here, don't we need to free the subxact_fileset before setting it to NULL?

13.
+ /*
+ * Scan complete hash and delete the underlying files for the the xids.
+ * Also delete the memory for the shared file sets.
+ */

/the the/the. Instead of "delete the memory", it would be better to
say "release the memory".

14.
+ /*
+ * We might not have created the suxact fileset if there is no sub
+ * transaction.
+ */

/suxact/subxact

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
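
The hunks quoted in comments 4 to 6 all follow the same shape: save errno, close the file, restore errno, then report. As a minimal standalone sketch of that shape, here it is with plain POSIX write() and close() rather than the BufFile API. The function name write_count_or_report and the return code in place of ereport() are assumptions made for illustration, not code from the patch.

```c
#include <errno.h>
#include <stdint.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Hypothetical helper, not taken from the patch: write a subxact count
 * to fd.  On a failed or short write, close the descriptor but preserve
 * the errno that describes the write failure, because close() may
 * overwrite errno before the caller gets to report the error.
 *
 * Returns 0 on success, -1 on failure with errno set for the write error.
 */
int
write_count_or_report(int fd, uint32_t nsubxacts)
{
    ssize_t written = write(fd, &nsubxacts, sizeof(nsubxacts));

    if (written != (ssize_t) sizeof(nsubxacts))
    {
        /* A short write does not set errno; assume we ran out of space. */
        int save_errno = (written < 0) ? errno : ENOSPC;

        close(fd);              /* may clobber errno */
        errno = save_errno;     /* restore it for the error message */
        return -1;
    }

    return 0;
}
```

Whether the explicit close is needed at all before raising the error is the open question in comment 4; the sketch only mirrors what the quoted code does.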