Re: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers
From | Amit Kapila |
---|---|
Subject | Re: Perform streaming logical transactions by background workers and parallel apply |
Date | |
Msg-id | CAA4eK1+UK0eN9hqU1JqY5WR5-YNbh6_2t8Zvd3bXpViQSE2+Rw@mail.gmail.com |
In response to | RE: Perform streaming logical transactions by background workers and parallel apply ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>) |
Responses |
Re: Perform streaming logical transactions by background workers and parallel apply
RE: Perform streaming logical transactions by background workers and parallel apply |
List | pgsql-hackers |
On Thu, Nov 3, 2022 at 6:36 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:
>
> Thanks for the analysis and summary !
>
> I tried to implement the above idea and here is the patch set.
>

Few comments on v42-0001
===========================
1.
+ /*
+  * Set the xact_state flag in the leader instead of the
+  * parallel apply worker to avoid the race condition where the leader has
+  * already started waiting for the parallel apply worker to finish
+  * processing the transaction while the child process has not yet
+  * processed the first STREAM_START and has not set the
+  * xact_state to true.
+  */
+ SpinLockAcquire(&winfo->shared->mutex);
+ winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;

The comment and the code for xact_state don't seem to match: the comment
talks about setting the flag to true, but the code assigns
PARALLEL_TRANS_UNKNOWN.

2.
+ * progress. This could happend as we don't wait for transaction rollback
+ * to finish.
+ */

/happend/happen

3.
+/* Helper function to release a lock with lockid */
+void
+parallel_apply_lock(uint16 lockid)
...
...
+/* Helper function to take a lock with lockid */
+void
+parallel_apply_unlock(uint16 lockid)

Here, the comments seem to be reversed.

4.
+parallel_apply_lock(uint16 lockid)
+{
+ MemoryContext oldcontext;
+
+ if (list_member_int(ParallelApplyLockids, lockid))
+     return;
+
+ LockSharedObjectForSession(SubscriptionRelationId, MySubscription->oid,
+                            lockid, am_leader_apply_worker() ?
+                            AccessExclusiveLock:
+                            AccessShareLock);

This appears odd to me because it forecloses the option for the parallel
apply worker to ever acquire this lock in exclusive mode. I think it
would be better to have lock_mode as one of the parameters in this API.

5.
+ * Inintialize fileset if not yet and open the file.
+ */
+void
+serialize_stream_start(TransactionId xid, bool first_segment)

Typo. /Inintialize/Initialize

6.
parallel_apply_setup_dsm()
{
...
+ shared->xact_state = false;

xact_state should be set with one of the values of ParallelTransState.

7.
/*
+ * Don't use SharedFileSet here because the fileset is shared by the leader
+ * worker and the fileset in leader need to survive after releasing the
+ * shared memory

This comment seems a bit unclear to me. Should there be an 'and' between
'leader' and 'worker'? If so, then the following 'and' won't make sense.

8.
+apply_handle_stream_stop(StringInfo s)
{
...
+ case TRANS_PARALLEL_APPLY:
+
+     /*
+      * If there is no message left, wait for the leader to release the
+      * lock and send more messages.
+      */
+     if (pg_atomic_sub_fetch_u32(&(MyParallelShared->left_message), 1) == 0)
+         parallel_apply_lock(MyParallelShared->stream_lock_id);

As per Sawada-San's email [1], this lock should be released immediately
after we acquire it. If we do so, then we don't need to unlock
separately in apply_handle_stream_start() in the code below and at
similar places in stream_prepare, stream_commit, and stream_abort. Is
there a reason for doing it differently?

apply_handle_stream_start(StringInfo s)
{
...
+ case TRANS_PARALLEL_APPLY:
...
+     /*
+      * Unlock the shared object lock so that the leader apply worker
+      * can continue to send changes.
+      */
+     parallel_apply_unlock(MyParallelShared->stream_lock_id);

9.
+parallel_apply_spooled_messages(void)
{
...
+ if (fileset_valid)
+ {
+     in_streamed_transaction = false;
+
+     parallel_apply_lock(MyParallelShared->transaction_lock_id);

Is there a reason to acquire this lock here if the parallel apply worker
will acquire it at stream_start?

10.
+ winfo->shared->stream_lock_id = parallel_apply_get_unique_id();
+ winfo->shared->transaction_lock_id = parallel_apply_get_unique_id();

Why can't we use xid (remote_xid) for one of these and local_xid (one
generated by parallel apply) for the other? I was a bit worried about
the local_xid because it will be generated only after applying the first
message, but the patch already seems to be waiting for it in
parallel_apply_wait_for_xact_finish, as seen in the code below.

+void
+parallel_apply_wait_for_xact_finish(ParallelApplyWorkerShared *wshared)
+{
+ /*
+  * Wait until the parallel apply worker handles the first message and
+  * set the flag to true.
+  */
+ parallel_apply_wait_for_in_xact(wshared, PARALLEL_TRANS_STARTED);
+
+ /* Wait for the transaction lock to be released. */
+ parallel_apply_lock(wshared->transaction_lock_id);

[1] - https://www.postgresql.org/message-id/CAD21AoCWovvhGBD2uKcQqbk6px6apswuBrs6dR9%2BWhP1j2LdsQ%40mail.gmail.com

--
With Regards,
Amit Kapila.
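
To illustrate comment 4, here is a minimal standalone sketch of a lock helper that takes the lock mode as a parameter instead of deriving it from am_leader_apply_worker(). It is not code from the patch. All sketch_* and SKETCH_* names are hypothetical, the two mode constants merely stand in for AccessShareLock and AccessExclusiveLock, and the array is an assumed replacement for the ParallelApplyLockids list. Where real code would call LockSharedObjectForSession(), the sketch only records the request, so it shows the shape of the API and not actual lock waits.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Stand-ins for the two lock modes named in the review (hypothetical names). */
typedef enum SketchLockMode
{
    SKETCH_ACCESS_SHARE = 1,
    SKETCH_ACCESS_EXCLUSIVE = 8
} SketchLockMode;

#define SKETCH_MAX_LOCKS 16

typedef struct SketchHeldLock
{
    uint16_t       lockid;
    SketchLockMode mode;
} SketchHeldLock;

/* Assumed per-process bookkeeping, playing the role of ParallelApplyLockids. */
static SketchHeldLock held_locks[SKETCH_MAX_LOCKS];
static int            n_held_locks = 0;

/* Return the index of (lockid, mode) in held_locks, or -1 if it is not held. */
static int
sketch_find_lock(uint16_t lockid, SketchLockMode mode)
{
    for (int i = 0; i < n_held_locks; i++)
    {
        if (held_locks[i].lockid == lockid && held_locks[i].mode == mode)
            return i;
    }
    return -1;
}

/*
 * Take a lock with lockid in the mode chosen by the caller.
 * Real code would pass the mode on to LockSharedObjectForSession() at this
 * point; the sketch only records that the lock is held.
 * Returns true if the lock was newly taken, false if it was already held.
 */
bool
sketch_parallel_apply_lock(uint16_t lockid, SketchLockMode mode)
{
    if (sketch_find_lock(lockid, mode) >= 0)
        return false;

    assert(n_held_locks < SKETCH_MAX_LOCKS);
    held_locks[n_held_locks].lockid = lockid;
    held_locks[n_held_locks].mode = mode;
    n_held_locks++;
    return true;
}

/*
 * Release a lock with lockid that was taken in the given mode.
 * Returns true if it was held and is now released, false otherwise.
 */
bool
sketch_parallel_apply_unlock(uint16_t lockid, SketchLockMode mode)
{
    int idx = sketch_find_lock(lockid, mode);

    if (idx < 0)
        return false;

    /* Fill the freed slot with the last entry to keep the array dense. */
    held_locks[idx] = held_locks[n_held_locks - 1];
    n_held_locks--;
    return true;
}
```

With this shape, the leader could request SKETCH_ACCESS_EXCLUSIVE and a parallel apply worker SKETCH_ACCESS_SHARE at their respective call sites, and nothing in the helper would prevent a worker from asking for the exclusive mode later.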