RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers
From | houzj.fnst@fujitsu.com |
---|---|
Subject | RE: Perform streaming logical transactions by background workers and parallel apply |
Date | |
Msg-id | OS0PR01MB57169DAA9A2A6E68EE5E05F094E19@OS0PR01MB5716.jpnprd01.prod.outlook.com |
In response to | Re: Perform streaming logical transactions by background workers and parallel apply (Amit Kapila <amit.kapila16@gmail.com>) |
Responses | Re: Perform streaming logical transactions by background workers and parallel apply (Amit Kapila <amit.kapila16@gmail.com>); Re: Perform streaming logical transactions by background workers and parallel apply (Masahiko Sawada <sawada.mshk@gmail.com>) |
List | pgsql-hackers |
On Wednesday, December 14, 2022 2:49 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Wed, Dec 14, 2022 at 9:50 AM houzj.fnst@fujitsu.com <houzj.fnst@fujitsu.com> wrote:
> >
> > On Tuesday, December 13, 2022 11:25 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
> > >
> > > Here are comments on v59 0001, 0002 patches:
> >
> > Thanks for the comments!
> >
> > > +void
> > > +pa_increment_stream_block(ParallelApplyWorkerShared *wshared)
> > > +{
> > > +    while (1)
> > > +    {
> > > +        SpinLockAcquire(&wshared->mutex);
> > > +
> > > +        /*
> > > +         * Don't try to increment the count if the parallel apply worker is
> > > +         * taking the stream lock. Otherwise, there would be a race condition
> > > +         * that the parallel apply worker checks there is no pending streaming
> > > +         * block and before it actually starts waiting on a lock, the leader
> > > +         * sends another streaming block and takes the stream lock again. In
> > > +         * this case, the parallel apply worker will start waiting for the next
> > > +         * streaming block whereas there is actually a pending streaming block
> > > +         * available.
> > > +         */
> > > +        if (!wshared->pa_wait_for_stream)
> > > +        {
> > > +            wshared->pending_stream_count++;
> > > +            SpinLockRelease(&wshared->mutex);
> > > +            break;
> > > +        }
> > > +
> > > +        SpinLockRelease(&wshared->mutex);
> > > +    }
> > > +}
> > >
> > > I think we should add an assertion to check if we don't hold the stream lock.
> > >
> > > I think that waiting for pa_wait_for_stream to be false in a busy
> > > loop is not a good idea. It's not interruptible and there is no
> > > guarantee that we can break from this loop in a short time. For
> > > instance, if PA executes pa_decr_and_wait_stream_block() a bit
> > > earlier than LA executes pa_increment_stream_block(), LA has to wait
> > > for PA to acquire and release the stream lock in a busy loop. It
> > > should not be long in normal cases but the duration LA needs to wait
> > > for PA depends on PA, which could be long. Also what if PA raises an
> > > error in pa_lock_stream() due to some reasons? I think LA won't be
> > > able to detect the failure.
> > >
> > > I think we should at least make it interruptible and maybe need to
> > > add some sleep. Or perhaps we can use the condition variable for
> > > this case.
> > >
>
> Or we can leave out this while (true) logic altogether for the first
> version and have a comment to explain this race. Anyway, after
> restarting, it will probably be solved. We can always change this part
> of the code later if this really turns out to be problematic.

Agreed, and reverted this part.

>
> > Thanks for the analysis, I will research this part.
> >
> > > ---
> > > In worker.c, we have the following common pattern:
> > >
> > > case TRANS_LEADER_PARTIAL_SERIALIZE:
> > >     write change to the file;
> > >     do some work;
> > >     break;
> > >
> > > case TRANS_LEADER_SEND_TO_PARALLEL:
> > >     pa_send_data();
> > >
> > >     if (winfo->serialize_changes)
> > >     {
> > >         do some work required after writing changes to the file.
> > >     }
> > >     :
> > >     break;
> > >
> > > IIUC there are two different paths for partial serialization: (a)
> > > where apply_action is TRANS_LEADER_PARTIAL_SERIALIZE, and (b) where
> > > apply_action is TRANS_LEADER_SEND_TO_PARALLEL and
> > > winfo->serialize_changes became true. And we need to match what we
> > > do in (a) and (b). Rather than having two different paths for the
> > > same case, how about falling through TRANS_LEADER_PARTIAL_SERIALIZE
> > > when we could not send the changes? That is, pa_send_data() just
> > > returns false when the timeout exceeds and we need to switch to
> > > serialize changes, otherwise returns true. If it returns false, we
> > > prepare for switching to serialize changes such as initializing
> > > fileset, and fall through TRANS_LEADER_PARTIAL_SERIALIZE case. The
> > > code would be like:
> > >
> > > case TRANS_LEADER_SEND_TO_PARALLEL:
> > >     ret = pa_send_data();
> > >
> > >     if (ret)
> > >     {
> > >         do work for sending changes to PA.
> > >         break;
> > >     }
> > >
> > >     /* prepare for switching to serialize changes */
> > >     winfo->serialize_changes = true;
> > >     initialize fileset;
> > >     acquire stream lock if necessary;
> > >
> > >     /* FALLTHROUGH */
> > > case TRANS_LEADER_PARTIAL_SERIALIZE:
> > >     do work for serializing changes;
> > >     break;
> >
> > I think that the suggestion is to extract the code that switches to
> > serialize mode out of pa_send_data(), and then we need to add that
> > logic in all the functions which call pa_send_data(). I am not sure
> > if it looks better as it might introduce some more code in each
> > handling function.
> >
>
> How about extracting the common code from apply_handle_stream_commit
> and apply_handle_stream_prepare to a separate function say
> pa_xact_finish_common()? I see there is a lot of common code (unlock
> the stream, wait for the finish, store flush location, free worker
> info) in both the functions for TRANS_LEADER_PARTIAL_SERIALIZE and
> TRANS_LEADER_SEND_TO_PARALLEL cases.

Agreed, changed. I also addressed Sawada-san's comment by extracting the
code that switches to serialize mode out of pa_send_data().

> >
> > > ---
> > > void
> > > pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
> > > {
> > >     LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
> > >                                    PARALLEL_APPLY_LOCK_STREAM,
> > >                                    lockmode);
> > > }
> > >
> > > I think since we don't need to let the caller specify the lock
> > > mode but need only shared and exclusive modes, we can make it simple
> > > by having a boolean argument say shared instead of lockmode.
> >
> > I personally think passing the lockmode would make the code more clear
> > than passing a Boolean value.
> >
>
> +1.
>
> I have made a few changes in the newly added comments and function name
> in the attached patch. Kindly include this if you find the changes okay.

Thanks, I have checked and merged it.

Attached is the new version of the patch set, which addresses all
comments so far.

Best regards,
Hou zj
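
For readers following the fall-through suggestion quoted above, here is a minimal standalone C sketch of that control flow. It is an illustration only and not code from the patch set: `WorkerInfo`, `send_data()`, `handle_change()` and the `queue_free`, `sent` and `serialized` fields are hypothetical stand-ins, and the real switch to serialize mode (initializing the fileset, acquiring the stream lock) is reduced to setting a flag.

```c
#include <assert.h>
#include <stdbool.h>

/* Enum names follow the thread; everything else is a hypothetical stand-in. */
typedef enum
{
    TRANS_LEADER_PARTIAL_SERIALIZE,
    TRANS_LEADER_SEND_TO_PARALLEL
} TransApplyAction;

typedef struct
{
    bool serialize_changes; /* set once the leader gives up sending */
    int  queue_free;        /* assumed: free slots in the queue to the worker */
    int  sent;              /* changes handed to the parallel apply worker */
    int  serialized;        /* changes "written to the file" */
} WorkerInfo;

/*
 * Stand-in for pa_send_data() as proposed in the review: return true if the
 * change was sent, false if the leader must switch to serializing.
 */
static bool
send_data(WorkerInfo *winfo)
{
    if (winfo->queue_free == 0)
        return false;           /* models the send timeout */
    winfo->queue_free--;
    winfo->sent++;
    return true;
}

/* Handle one change; returns the action to use for the next change. */
static TransApplyAction
handle_change(WorkerInfo *winfo, TransApplyAction action)
{
    switch (action)
    {
        case TRANS_LEADER_SEND_TO_PARALLEL:
            if (send_data(winfo))
                break;          /* sent to the worker, nothing more to do */

            /* Prepare for switching to serialize mode (simplified). */
            winfo->serialize_changes = true;
            action = TRANS_LEADER_PARTIAL_SERIALIZE;

            /* FALLTHROUGH */
        case TRANS_LEADER_PARTIAL_SERIALIZE:
            /* Single code path for serializing, however we got here. */
            assert(winfo->serialize_changes);
            winfo->serialized++;
            break;
    }
    return action;
}
```

A caller would feed each change through `handle_change()` and carry the returned action forward, so the change that failed to send is serialized by the same code path that serves every later change.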
Attachment
- v61-0007-Add-a-main_worker_pid-to-pg_stat_subscription.patch
- v61-0001-Perform-streaming-logical-transactions-by-parall.patch
- v61-0002-Serialize-partial-changes-to-a-file-when-the-att.patch
- v61-0003-Test-streaming-parallel-option-in-tap-test.patch
- v61-0004-Allow-streaming-every-change-without-waiting-til.patch
- v61-0005-Add-GUC-stream_serialize_threshold-and-test-seri.patch
- v61-0006-Retry-to-apply-streaming-xact-only-in-apply-work.patch