RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers
From | houzj.fnst@fujitsu.com |
---|---|
Subject | RE: Perform streaming logical transactions by background workers and parallel apply |
Date | |
Msg-id | OS0PR01MB5716921D5C84F80FD0C59A0794199@OS0PR01MB5716.jpnprd01.prod.outlook.com |
In response to | RE: Perform streaming logical transactions by background workers and parallel apply ("Hayato Kuroda (Fujitsu)" <kuroda.hayato@fujitsu.com>) |
List | pgsql-hackers |
On Friday, December 2, 2022 7:27 PM Kuroda, Hayato/黒田 隼人 <kuroda.hayato@fujitsu.com> wrote:
>
> Dear Hou,
>
> Thanks for making the patch. The following are my comments for v54-0003 and
> 0004.

Thanks for the comments!

>
> 0003
>
> pa_free_worker()
>
> + /* Unlink any files that were needed to serialize partial changes. */
> + if (winfo->serialize_changes)
> +     stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);
> +
>
> I think this part is not needed, because the LA cannot reach here if
> winfo->serialize_changes is true. Moreover, stream_cleanup_files() is done in
> pa_free_worker_info().

Removed.

> LogicalParallelApplyLoop()
>
> The parallel apply worker wakes up every 0.1s even if we are in the
> PARTIAL_SERIALIZE mode. Do you have an idea to reduce that?

After entering PARTIAL_SERIALIZE mode, the parallel apply worker will usually
be waiting on the stream lock rather than waking up periodically.

> ```
> + pa_spooled_messages();
> ```
>
> Comments are needed here, like "Changes may be serialize...".

Added.

> pa_stream_abort()
>
> ```
> + /*
> +  * Reopen the file and set the file position to the saved
> +  * position.
> +  */
> + if (reopen_stream_fd)
> + {
> +     char path[MAXPGPATH];
> +
> +     changes_filename(path, MyLogicalRepWorker->subid, xid);
> +     stream_fd = BufFileOpenFileSet(&MyParallelShared->fileset,
> +                                    path, O_RDONLY, false);
> +     BufFileSeek(stream_fd, fileno, offset, SEEK_SET);
> + }
> ```
>
> MyParallelShared->serialize_changes may be used instead of reopen_stream_fd.

This code has been removed.

>
> ```
> + /*
> +  * It is possible that while sending this change to parallel apply
> +  * worker we need to switch to serialize mode.
> +  */
> + if (winfo->serialize_changes)
> +     pa_set_fileset_state(winfo->shared, FS_READY);
> ```
>
> The same code appears in three places; can we combine it into a common part?

This code has been slightly refactored.
> apply_spooled_messages()
>
> ```
> + /*
> +  * Break the loop if the parallel apply worker has finished applying
> +  * the transaction. The parallel apply worker should have closed the
> +  * file before committing.
> +  */
> + if (am_parallel_apply_worker() &&
> +     MyParallelShared->xact_state == PARALLEL_TRANS_FINISHED)
> +     goto done;
> ```
>
> I think pfree(buffer) and pfree(s2.data) should not be skipped.
> And this part should be below "nchanges++;".

buffer and s2.data were allocated in the top-level transaction's memory
context, so they will be freed automatically shortly afterwards, when
STREAM COMMIT is handled.

>
> 0004
>
> set_subscription_retry()
>
> ```
> + LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
> +                  AccessShareLock);
> +
> ```
>
> I think AccessExclusiveLock should be acquired instead of AccessShareLock.
> In AlterSubscription(), LockSharedObject(AccessExclusiveLock) seems to be
> used.

Changed.

Best regards,
Hou zj