Re: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers
| From | Amit Kapila |
| --- | --- |
| Subject | Re: Perform streaming logical transactions by background workers and parallel apply |
| Date | |
| Msg-id | CAA4eK1JERguGiYoid9CMEOamZoD4jF_R8+mkNVS=mV-X=CXe=A@mail.gmail.com |
| In response to | RE: Perform streaming logical transactions by background workers and parallel apply ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>) |
| Responses | RE: Perform streaming logical transactions by background workers and parallel apply |
| List | pgsql-hackers |
On Wed, Aug 24, 2022 at 7:17 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:
>
> On Friday, August 19, 2022 4:49 PM Amit Kapila <amit.kapila16@gmail.com>
> >
> > 8. It is not clear to me how APPLY_BGWORKER_EXIT status is used. Is it required
> > for the cases where bgworker exists due to some error and then apply worker
> > uses it to detect that and exits? How other bgworkers would notice this, is it
> > done via apply_bgworker_check_status()?
>
> It was used to detect the unexpected exit of bgworker and I have changed the design
> of this which is now similar to what we have in parallel query.
>

Thanks, this looks better.

> Attach the new version patch set(v24) which address above comments.
> Besides, I added some logic which try to stop the bgworker at transaction end
> if there are enough workers in the pool.
>

I think this deserves an explanation in worker.c under the title
"Separate background workers" in the patch.

Review comments for v24-0001
=========================
1.
+ * cost of searhing the hash table

/searhing/searching

2.
+/*
+ * Apply background worker states.
+ */
+typedef enum ApplyBgworkerState
+{
+ APPLY_BGWORKER_BUSY, /* assigned to a transaction */
+ APPLY_BGWORKER_FINISHED /* transaction is completed */
+} ApplyBgworkerState;

Now that there are just two states, can we think of representing them via a
flag ('available'/'in_use'), or do you see a downside with that as compared
to the current approach?

3.
-replorigin_session_setup(RepOriginId node)
+replorigin_session_setup(RepOriginId node, int apply_leader_pid)

I have mentioned previously that we don't need anything specific to apply
worker/leader in this API, so why this change? Another idea that occurred
to me is: can we use replorigin_session_reset() before sending the commit
message to the bgworker and then do the session setup in the bgworker only
to handle the commit/abort/prepare message? We also need to set it again
for the leader apply worker after the leader worker completes the wait for
the bgworker to finish the commit handling.

4.
Unlike parallel query, here we seem to be creating a separate DSM for each
worker, and probably the difference is due to the fact that here we don't
know upfront how many workers will actually be required. If so, can we
write some comments for the same in worker.c where you have explained the
parallel bgworker stuff?

5.
 /*
- * Handle streamed transactions.
+ * Handle streamed transactions for both the main apply worker and the apply
+ * background workers.

Shall we use "leader apply worker" in the above comment? Also, check other
places in the patch for similar changes.

6.
+ else
+ {
- /* open the spool file for this transaction */
- stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
+ /* notify handle methods we're processing a remote transaction */
+ in_streamed_transaction = true;

There is a spurious line after else {. Also, the comment could be slightly
improved: "/* notify handle methods we're processing a remote in-progress
transaction */"

7.
The checks in various apply_handle_stream_* functions have improved as
compared to the previous version, but I think we can still improve those.
One idea could be to use a separate function to decide the action we want
to take and then, based on it, the caller can take appropriate action.
Using a similar idea, we can improve the checks in
handle_streamed_transaction() as well.
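To be a bit more concrete about the previous point, here is a rough,
untested sketch of the kind of helper I am thinking of. Only
apply_bgworker_find() is from the patch; StreamApplyAction,
get_stream_apply_action(), am_apply_bgworker(), and the ApplyBgworkerInfo
type name are made up purely for illustration:

/*
 * Possible actions for a streamed message, decided in one place so that
 * handle_streamed_transaction() and the apply_handle_stream_* functions
 * can share the same checks.
 */
typedef enum StreamApplyAction
{
	TRANS_LEADER_SERIALIZE,			/* leader: spool the change to a file */
	TRANS_LEADER_SEND_TO_BGWORKER,	/* leader: pass the change to a bgworker */
	TRANS_BGWORKER_APPLY			/* we are the bgworker: apply the change */
} StreamApplyAction;

static StreamApplyAction
get_stream_apply_action(TransactionId xid, ApplyBgworkerInfo **winfo)
{
	*winfo = NULL;

	/* Hypothetical check: are we running inside an apply bgworker? */
	if (am_apply_bgworker())
		return TRANS_BGWORKER_APPLY;

	/* Leader: use the bgworker assigned to this transaction, if any. */
	*winfo = apply_bgworker_find(xid);
	if (*winfo != NULL)
		return TRANS_LEADER_SEND_TO_BGWORKER;

	/* Otherwise the leader serializes the change to the spool file. */
	return TRANS_LEADER_SERIALIZE;
}

The callers could then simply switch on the returned action instead of
repeating the same if/else checks in each handler.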
8.
+ else if ((winfo = apply_bgworker_find(xid)))
+ {
+ /* Send STREAM ABORT message to the apply background worker. */
+ apply_bgworker_send_data(winfo, s->len, s->data);
+
+ /*
+ * After sending the data to the apply background worker, wait for
+ * that worker to finish. This is necessary to maintain commit
+ * order which avoids failures due to transaction dependencies and
+ * deadlocks.
+ */
+ if (subxid == xid)
+ {
+ apply_bgworker_wait_for(winfo, APPLY_BGWORKER_FINISHED);
+ apply_bgworker_free(winfo);
+ }
+ }
+ else
+ /*
+ * We are in main apply worker and the transaction has been
+ * serialized to file.
+ */
+ serialize_stream_abort(xid, subxid);

In the last else block, you can use {} to make it consistent with the other
if/else checks.

9.
+void
+ApplyBgworkerMain(Datum main_arg)
+{
+ volatile ApplyBgworkerShared *shared;
+
+ dsm_handle handle;

Is there a need to keep this empty line between the above two declarations?

10.
+ /*
+ * Attach to the message queue.
+ */
+ mq = shm_toc_lookup(toc, APPLY_BGWORKER_KEY_ERROR_QUEUE, false);

Here, we should say error queue in the comments.

11.
+ /*
+ * Attach to the message queue.
+ */
+ mq = shm_toc_lookup(toc, APPLY_BGWORKER_KEY_ERROR_QUEUE, false);
+ shm_mq_set_sender(mq, MyProc);
+ error_mqh = shm_mq_attach(mq, seg, NULL);
+ pq_redirect_to_shm_mq(seg, error_mqh);
+
+ /*
+ * Now, we have initialized DSM. Attach to slot.
+ */
+ logicalrep_worker_attach(worker_slot);
+ MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation;
+ MyParallelShared->logicalrep_worker_slot_no = worker_slot;
+
+ pq_set_parallel_leader(MyLogicalRepWorker->apply_leader_pid,
+ InvalidBackendId);

Is there a reason to set parallel_leader immediately after
pq_redirect_to_shm_mq(), as we are doing in parallel.c?

12.
 if (pq_mq_parallel_leader_pid != 0)
+ {
  SendProcSignal(pq_mq_parallel_leader_pid,
     PROCSIG_PARALLEL_MESSAGE,
     pq_mq_parallel_leader_backend_id);

+ /*
+ * XXX maybe we can reuse the PROCSIG_PARALLEL_MESSAGE instead of
+ * introducing a new signal reason.
+ */
+ SendProcSignal(pq_mq_parallel_leader_pid,
+    PROCSIG_APPLY_BGWORKER_MESSAGE,
+    pq_mq_parallel_leader_backend_id);
+ }

I think we don't need to send both signals. Here, we can check if this is a
parallel worker (IsParallelWorker), then send PROCSIG_PARALLEL_MESSAGE;
otherwise, send PROCSIG_APPLY_BGWORKER_MESSAGE. In the else part, we can
have an assert to ensure it is an apply bgworker.

--
With Regards,
Amit Kapila.
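P.S. A minimal, untested sketch of the branching I am suggesting in point 12
above; am_apply_bgworker() is a made-up helper name here standing for "this
process is an apply background worker", everything else is taken from the
quoted hunk or from core:

	if (pq_mq_parallel_leader_pid != 0)
	{
		if (IsParallelWorker())
			SendProcSignal(pq_mq_parallel_leader_pid,
						   PROCSIG_PARALLEL_MESSAGE,
						   pq_mq_parallel_leader_backend_id);
		else
		{
			/* Only an apply bgworker should reach here. */
			Assert(am_apply_bgworker());
			SendProcSignal(pq_mq_parallel_leader_pid,
						   PROCSIG_APPLY_BGWORKER_MESSAGE,
						   pq_mq_parallel_leader_backend_id);
		}
	}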