RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers
From: houzj.fnst@fujitsu.com
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Msg-id: OS0PR01MB5716DB4C67394D1676429D3594769@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to: Re: Perform streaming logical transactions by background workers and parallel apply (Amit Kapila <amit.kapila16@gmail.com>)
Responses: Re: Perform streaming logical transactions by background workers and parallel apply
List: pgsql-hackers
On Thursday, August 25, 2022 7:33 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Wed, Aug 24, 2022 at 7:17 PM houzj.fnst@fujitsu.com
> <houzj.fnst@fujitsu.com> wrote:
> >
> > On Friday, August 19, 2022 4:49 PM Amit Kapila <amit.kapila16@gmail.com>
> > >
> > > 8. It is not clear to me how the APPLY_BGWORKER_EXIT status is used. Is it
> > > required for the cases where the bgworker exits due to some error and then
> > > the apply worker uses it to detect that and exits? How would other bgworkers
> > > notice this, is it done via apply_bgworker_check_status()?
> >
> > It was used to detect the unexpected exit of a bgworker, and I have changed
> > the design of this, which is now similar to what we have in parallel query.
>
> Thanks, this looks better.
>
> > Attach the new version patch set (v24) which addresses the above comments.
> > Besides, I added some logic which tries to stop the bgworker at transaction
> > end if there are enough workers in the pool.
>
> I think this deserves an explanation in worker.c under the title:
> "Separate background workers" in the patch.
>
> Review comments for v24-0001

Thanks for the comments.

> =========================
> 1.
> + * cost of searhing the hash table
>
> /searhing/searching

Fixed.

> 2.
> +/*
> + * Apply background worker states.
> + */
> +typedef enum ApplyBgworkerState
> +{
> +	APPLY_BGWORKER_BUSY,		/* assigned to a transaction */
> +	APPLY_BGWORKER_FINISHED		/* transaction is completed */
> +} ApplyBgworkerState;
>
> Now that there are just two states, can we think to represent them
> via a flag ('available'/'in_use'), or do you see a downside with that
> as compared to the current approach?

Changed to in_use.

> 3.
> -replorigin_session_setup(RepOriginId node)
> +replorigin_session_setup(RepOriginId node, int apply_leader_pid)
>
> I have mentioned previously that we don't need anything specific to
> the apply worker/leader in this API, so why this change?
> The other idea that occurred to me is that we can use replorigin_session_reset()
> before sending the commit message to the bgworker and then do the session
> setup in the bgworker only to handle the commit/abort/prepare message. We
> also need to set it again for the leader apply worker after the leader
> worker completes the wait for the bgworker to finish the commit handling.

I have reverted the changes related to replorigin_session_setup and used the
suggested approach. I also did some simple performance tests for this approach
and didn't see any obvious overhead, as replorigin_session_setup is invoked only
once per streamed transaction.

> 4. Unlike parallel query, here we seem to be creating a separate DSM for
> each worker, and probably the difference is due to the fact that here
> we don't know upfront how many workers will actually be required. If
> so, can we write some comments for the same in worker.c, where you have
> explained the parallel bgworker stuff?

Added.

> 5.
>  /*
> - * Handle streamed transactions.
> + * Handle streamed transactions for both the main apply worker and the apply
> + * background workers.
>
> Shall we use "leader apply worker" in the above comment? Also, check
> other places in the patch for similar changes.

Changed.

> 6.
> +	else
> +	{
>
> -	/* open the spool file for this transaction */
> -	stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
> +		/* notify handle methods we're processing a remote transaction */
> +		in_streamed_transaction = true;
>
> There is a spurious line after "else {". Also, the comment could be
> slightly improved: "/* notify handle methods we're processing a remote
> in-progress transaction */"

Changed.

> 7. The checks in various apply_handle_stream_* functions have improved
> as compared to the previous version, but I think we can still improve
> those. One idea could be to use a separate function to decide the
> action we want to take, and then based on it, the caller can take
> appropriate action.
> Using a similar idea, we can improve the checks in
> handle_streamed_transaction() as well.

Improved as suggested.

> 8.
> +	else if ((winfo = apply_bgworker_find(xid)))
> +	{
> +		/* Send STREAM ABORT message to the apply background worker. */
> +		apply_bgworker_send_data(winfo, s->len, s->data);
> +
> +		/*
> +		 * After sending the data to the apply background worker, wait for
> +		 * that worker to finish. This is necessary to maintain commit
> +		 * order which avoids failures due to transaction dependencies and
> +		 * deadlocks.
> +		 */
> +		if (subxid == xid)
> +		{
> +			apply_bgworker_wait_for(winfo, APPLY_BGWORKER_FINISHED);
> +			apply_bgworker_free(winfo);
> +		}
> +	}
> +	else
> +		/*
> +		 * We are in main apply worker and the transaction has been
> +		 * serialized to file.
> +		 */
> +		serialize_stream_abort(xid, subxid);
>
> In the last else block, you can use {} to make it consistent with the
> other if/else checks.
>
> 9.
> +void
> +ApplyBgworkerMain(Datum main_arg)
> +{
> +	volatile ApplyBgworkerShared *shared;
> +
> +	dsm_handle	handle;
>
> Is there a need to keep this empty line between the above two declarations?

Removed.

> 10.
> +	/*
> +	 * Attach to the message queue.
> +	 */
> +	mq = shm_toc_lookup(toc, APPLY_BGWORKER_KEY_ERROR_QUEUE, false);
>
> Here, we should say "error queue" in the comment.

Fixed.

> 11.
> +	/*
> +	 * Attach to the message queue.
> +	 */
> +	mq = shm_toc_lookup(toc, APPLY_BGWORKER_KEY_ERROR_QUEUE, false);
> +	shm_mq_set_sender(mq, MyProc);
> +	error_mqh = shm_mq_attach(mq, seg, NULL);
> +	pq_redirect_to_shm_mq(seg, error_mqh);
> +
> +	/*
> +	 * Now, we have initialized DSM. Attach to slot.
> +	 */
> +	logicalrep_worker_attach(worker_slot);
> +	MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation;
> +	MyParallelShared->logicalrep_worker_slot_no = worker_slot;
> +
> +	pq_set_parallel_leader(MyLogicalRepWorker->apply_leader_pid,
> +						   InvalidBackendId);
>
> Is there a reason to set the parallel leader immediately after
> pq_redirect_to_shm_mq(), as we do in parallel.c?

Moved the code.

> 12.
>  	if (pq_mq_parallel_leader_pid != 0)
> +	{
>  		SendProcSignal(pq_mq_parallel_leader_pid,
>  					   PROCSIG_PARALLEL_MESSAGE,
>  					   pq_mq_parallel_leader_backend_id);
>
> +		/*
> +		 * XXX maybe we can reuse the PROCSIG_PARALLEL_MESSAGE instead of
> +		 * introducing a new signal reason.
> +		 */
> +		SendProcSignal(pq_mq_parallel_leader_pid,
> +					   PROCSIG_APPLY_BGWORKER_MESSAGE,
> +					   pq_mq_parallel_leader_backend_id);
> +	}
>
> I think we don't need to send both signals. Here, we can check if this
> is a parallel worker (IsParallelWorker), then send
> PROCSIG_PARALLEL_MESSAGE; otherwise, send the
> PROCSIG_APPLY_BGWORKER_MESSAGE message. In the else part, we can have
> an assert to ensure it is an apply bgworker.

Changed.

Attached is the new version patch set, which addresses the above comments as
well as the comments from Amit [1] and Kuroda-san [2]. As discussed, I also
renamed all the "apply background worker" and related stuff to "apply parallel
worker".

[1] https://www.postgresql.org/message-id/CAA4eK1%2B_oHZHoDooAR7QcYD2CeTUWNSwkqVcLWC2iQijAJC4Cg%40mail.gmail.com
[2] https://www.postgresql.org/message-id/TYAPR01MB58666A97D40AB8919D106AD5F5709%40TYAPR01MB5866.jpnprd01.prod.outlook.com

Best regards,
Hou zj
Attachment
- v25-0005-Add-a-main_worker_pid-to-pg_stat_subscription.patch
- v25-0001-Perform-streaming-logical-transactions-by-parall.patch
- v25-0002-Test-streaming-parallel-option-in-tap-test.patch
- v25-0003-Add-some-checks-before-using-apply-parallel-work.patch
- v25-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch