RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From: houzj.fnst@fujitsu.com
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Msg-id: OS0PR01MB57160A440C9821C48ABD4D9C94649@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to: Re: Perform streaming logical transactions by background workers and parallel apply (Amit Kapila <amit.kapila16@gmail.com>)
Responses: RE: Perform streaming logical transactions by background workers and parallel apply
List: pgsql-hackers
On Tuesday, August 9, 2022 7:00 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Thu, Aug 4, 2022 at 12:10 PM wangw.fnst@fujitsu.com
> <wangw.fnst@fujitsu.com> wrote:
> >
> > On Mon, Jul 25, 2022 at 21:50 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> > > Few comments on 0001:
> > > ======================
> >
> > Thanks for your comments.
>
> Review comments on v20-0001-Perform-streaming-logical-transactions-by-backgr
> ===============================================================
> 1.
> + <para>
> +  If set to <literal>on</literal>, the incoming changes are written to
> +  temporary files and then applied only after the transaction is
> +  committed on the publisher.
>
> It is not very clear that the transaction is applied when the commit is
> received by the subscriber. Can we slightly change it to: "If set to
> <literal>on</literal>, the incoming changes are written to temporary files
> and then applied only after the transaction is committed on the publisher
> and received by the subscriber."

Changed.

> 2.
>  /* First time through, initialize apply workers hashtable */
> + if (ApplyBgworkersHash == NULL)
> + {
> +     HASHCTL     ctl;
> +
> +     MemSet(&ctl, 0, sizeof(ctl));
> +     ctl.keysize = sizeof(TransactionId);
> +     ctl.entrysize = sizeof(ApplyBgworkerEntry);
> +     ctl.hcxt = ApplyContext;
> +
> +     ApplyBgworkersHash = hash_create("logical apply workers hash", 8, &ctl,
> +                                      HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
>
> I think it would be better if we start with probably 16 element hash table,
> 8 seems to be on the lower side.

Changed.

> 3.
> +/*
> + * Try to look up worker assigned before (see function apply_bgworker_get_free)
> + * inside ApplyBgworkersHash for requested xid.
> + */
> +ApplyBgworkerState *
> +apply_bgworker_find(TransactionId xid)
>
> The above comment is not very clear. There doesn't seem to be any function
> named apply_bgworker_get_free in the patch.
> Can we write this comment as:
> "Find the previously assigned worker for the given transaction, if any."

Changed the comments.

> 4.
>  /*
> + * Push apply error context callback. Fields will be filled applying a
> + * change.
> + */
>
> /Fields will be filled applying a change./Fields will be filled while
> applying a change.

Changed.

> 5.
> +void
> +ApplyBgworkerMain(Datum main_arg)
> +{
> ...
> ...
> +     StartTransactionCommand();
> +     oldcontext = MemoryContextSwitchTo(ApplyContext);
> +
> +     MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
> +     if (!MySubscription)
> +     {
> +         ereport(LOG,
> +                 (errmsg("logical replication apply worker for subscription %u will not "
> +                         "start because the subscription was removed during startup",
> +                         MyLogicalRepWorker->subid)));
> +         proc_exit(0);
> +     }
> +
> +     MySubscriptionValid = true;
> +     MemoryContextSwitchTo(oldcontext);
> +
> +     /* Setup synchronous commit according to the user's wishes */
> +     SetConfigOption("synchronous_commit", MySubscription->synccommit,
> +                     PGC_BACKEND, PGC_S_OVERRIDE);
> +
> +     /* Keep us informed about subscription changes. */
> +     CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
> +                                   subscription_change_cb,
> +                                   (Datum) 0);
> +
> +     CommitTransactionCommand();
> ...
>
> This part of the code appears to be the same as we have in ApplyWorkerMain()
> except that the patch doesn't check whether the subscription is enabled. Is
> there a reason to not have that check here as well? Then in ApplyWorkerMain(),
> we do LOG the type of worker that is also missing here. Unless there is a
> specific reason to have a different code here, we should move this part to a
> common function and call it both from ApplyWorkerMain() and
> ApplyBgworkerMain().
>
> 6. I think the code in ApplyBgworkerMain() to set session_replication_role,
> search_path, and connect to the database also appears to be the same in
> ApplyWorkerMain(). If so, that can also be moved to the common function
> mentioned in the previous point.
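[Editor's illustration, not part of the original mail: a minimal standalone sketch of the refactoring shape suggested in comments 5 and 6 above. All names here (`WorkerInitState`, `common_worker_init`, the boolean fields) are invented stand-ins for the PostgreSQL internals involved; the real code would call GetSubscription(), SetConfigOption(), and CacheRegisterSyscacheCallback() inside one shared routine used by both worker entry points.]

```c
#include <stdbool.h>

/* Invented stand-in for the state both worker entry points must set up. */
typedef struct WorkerInitState
{
    bool subscription_loaded;  /* GetSubscription() succeeded */
    bool enabled_checked;      /* subscription-enabled check performed */
    bool sync_commit_set;      /* synchronous_commit configured */
    bool callbacks_registered; /* syscache callbacks registered */
} WorkerInitState;

/*
 * One common initialization routine, called from both ApplyWorkerMain()
 * and ApplyBgworkerMain() instead of duplicating the sequence, so the
 * enabled check and worker-type LOG cannot diverge between the two.
 */
static void
common_worker_init(WorkerInitState *st)
{
    st->subscription_loaded = true;
    st->enabled_checked = true; /* the check both workers should perform */
    st->sync_commit_set = true;
    st->callbacks_registered = true;
}

static bool
worker_init_complete(const WorkerInitState *st)
{
    return st->subscription_loaded && st->enabled_checked &&
        st->sync_commit_set && st->callbacks_registered;
}
```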
>
> 7. I think we need to register for subscription rel map invalidation
> (invalidate_syncing_table_states) in ApplyBgworkerMain similar to
> ApplyWorkerMain. The reason is that we check the table state after processing
> a commit or similar change record via a call to process_syncing_tables.

Agreed and changed.

> 8. In apply_bgworker_setup_dsm(), we should have handling related to
> dsm_create failure due to max_segments reached as we have in
> InitializeParallelDSM(). We can follow the regular path of streaming
> transactions in case we are not able to create DSM instead of parallelizing it.

Changed.

> 9.
> +     shm_toc_initialize_estimator(&e);
> +     shm_toc_estimate_chunk(&e, sizeof(ApplyBgworkerShared));
> +     shm_toc_estimate_chunk(&e, (Size) queue_size);
> +
> +     shm_toc_estimate_keys(&e, 1 + 1);
>
> Here, you can directly write 2 instead of (1 + 1) stuff. It is quite clear
> that we need two keys here.

Changed.

> 10.
> apply_bgworker_wait_for()
> {
> ...
> +     /* Wait to be signalled. */
> +     WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
> +               WAIT_EVENT_LOGICAL_APPLY_BGWORKER_STATE_CHANGE);
> ...
> }
>
> Typecast with the void, if we don't care for the return value.

Changed.

> 11.
> +static void
> +apply_bgworker_shutdown(int code, Datum arg)
> +{
> +     SpinLockAcquire(&MyParallelShared->mutex);
> +     MyParallelShared->status = APPLY_BGWORKER_EXIT;
> +     SpinLockRelease(&MyParallelShared->mutex);
>
> Is there a reason to not use apply_bgworker_set_status() directly?

No, changed to use that function.

> 12.
> + * Special case is if the first change comes from subtransaction, then
> + * we check that current_xid differs from stream_xid.
> + */
> +void
> +apply_bgworker_subxact_info_add(TransactionId current_xid)
> +{
> +     if (current_xid != stream_xid &&
> +         !list_member_int(subxactlist, (int) current_xid))
> ...
> ...
>
> I don't understand the above comment. Does that mean we don't need to
> define a savepoint if the first change is from a subtransaction?
> Also, keep an empty line before the above comment.

After checking, I think this comment is not very clear, so I have removed it
and improved the other comments.

> 13.
> +void
> +apply_bgworker_subxact_info_add(TransactionId current_xid)
> +{
> +     if (current_xid != stream_xid &&
> +         !list_member_int(subxactlist, (int) current_xid))
> +     {
> +         MemoryContext oldctx;
> +         char        spname[MAXPGPATH];
> +
> +         snprintf(spname, MAXPGPATH, "savepoint_for_xid_%u", current_xid);
>
> To uniquely generate the savepoint name, it is better to append the
> subscription id as well? Something like pg_sp_<subid>_<xid>.

Changed.

> 14. The CommitTransactionCommand() call in apply_bgworker_subxact_info_add
> looks a bit odd as that function neither seems to be starting the transaction
> command nor has any comments explaining it. Shall we do it in the caller
> where it is more apparent to do the same?

I think the CommitTransactionCommand here is used to cooperate with
DefineSavepoint, because we need to invoke CommitTransactionCommand to start a
new subtransaction. I tried to add some comments to explain this.

> 15.
>  else
>      snprintf(bgw.bgw_name, BGW_MAXLEN,
>               "logical replication worker for subscription %u", subid);
> +
>  snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
>
> Spurious new line.

Removed.

> 16.
> @@ -1153,7 +1162,14 @@ replorigin_session_setup(RepOriginId node)
>
>      Assert(session_replication_state->roident != InvalidRepOriginId);
>
> -     session_replication_state->acquired_by = MyProcPid;
> +     if (must_acquire)
> +         session_replication_state->acquired_by = MyProcPid;
> +     else if (session_replication_state->acquired_by == 0)
> +         ereport(ERROR,
> +                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
> +                  errmsg("apply background worker could not find replication state slot for replication origin with OID %u",
> +                         node),
> +                  errdetail("There is no replication state slot set by its main apply worker.")));
>
> It is not a good idea to give apply workers specific messages from this API
> because I don't think we can assume this is used by only apply workers. It
> seems to me that if 'must_acquire' is false, then we should either give
> elog(ERROR, ..) or there should be an Assert for the same. I am not completely
> sure but maybe we can request the caller to supply the PID (which already has
> acquired this origin) in case must_acquire is false and then use it in
> Assert/elog to ensure the correct usage of API. What do you think?

Agreed. I think we can replace 'must_acquire' with the pid of the worker which
acquired this origin (called 'acquired_by'). We can use this pid to check and
report the error if needed.

> 17. The commit message can explain the abort-related new information this
> patch sends to the subscribers.

Added.

> 18.
> + * In streaming case (receiving a block of streamed transaction), for
> + * SUBSTREAM_ON mode, simply redirect it to a file for the proper toplevel
> + * transaction, and for SUBSTREAM_PARALLEL mode, send the changes to apply
> + * background workers (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
> + * changes will also be applied in main apply worker).
>
> In this, part of the comment "(LOGICAL_REP_MSG_RELATION or
> LOGICAL_REP_MSG_TYPE changes will also be applied in main apply worker)" is
> not very clear.
> Do you mean to say that these messages are applied by both main and
> background apply workers? If so, then please state the same explicitly.

Changed.

> 19.
> -     /* not in streaming mode */
> -     if (!in_streamed_transaction)
> +     /* Not in streaming mode */
> +     if (!(in_streamed_transaction || am_apply_bgworker()))
> ...
> ...
> -     /* write the change to the current file */
> +     /* Write the change to the current file */
>      stream_write_change(action, s);
>
> I don't see the need to change the above comments.

Removed the changes.

> 20.
> static bool
> handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
> {
> ...
> ...
> +     if (am_apply_bgworker())
> +     {
> +         /* Define a savepoint for a subxact if needed. */
> +         apply_bgworker_subxact_info_add(current_xid);
> +
> +         return false;
> +     }
> +
> +     if (apply_bgworker_active())
>
> Isn't it better to use else if in the above code and probably else for the
> remaining part of the code in this function?

Changed.

Attached is the new version (v21) patch set, which addresses all the comments
received so far.

Best regards,
Hou zj
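[Editor's illustration, not part of the original mail: a standalone sketch of the pg_sp_<subid>_<xid> savepoint naming scheme agreed in comment 13. The helper name and buffer size are invented for this example; the real patch builds the name inline in apply_bgworker_subxact_info_add(), using the subscription OID and transaction id so names are unique across subscriptions.]

```c
#include <stdio.h>
#include <stddef.h>

/* Invented buffer size; the patch uses MAXPGPATH for its name buffer. */
#define SPNAME_LEN 64

/*
 * Build a savepoint name of the form "pg_sp_<subid>_<xid>". Including the
 * subscription OID (not just the xid) keeps the name unique when multiple
 * subscriptions apply streamed transactions concurrently.
 */
static void
build_savepoint_name(unsigned int subid, unsigned int xid,
                     char *spname, size_t szlen)
{
    snprintf(spname, szlen, "pg_sp_%u_%u", subid, xid);
}
```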
Attachment
- v21-0001-Perform-streaming-logical-transactions-by-backgr.patch
- v21-0002-Test-streaming-parallel-option-in-tap-test.patch
- v21-0003-Add-some-checks-before-using-apply-background-wo.patch
- v21-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch
- v21-0005-Add-a-main_worker_pid-to-pg_stat_subscription.patch