RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers
From | houzj.fnst@fujitsu.com |
---|---|
Subject | RE: Perform streaming logical transactions by background workers and parallel apply |
Date | |
Msg-id | OS0PR01MB57166F15F4542FAE69A74EC094739@OS0PR01MB5716.jpnprd01.prod.outlook.com |
In response to | Re: Perform streaming logical transactions by background workers and parallel apply (Amit Kapila <amit.kapila16@gmail.com>) |
Responses |
RE: Perform streaming logical transactions by background workers and parallel apply
Re: Perform streaming logical transactions by background workers and parallel apply |
List | pgsql-hackers |
On Friday, August 19, 2022 4:49 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Thu, Aug 18, 2022 at 5:14 PM Amit Kapila <amit.kapila16@gmail.com>
> wrote:
> >
> > On Wed, Aug 17, 2022 at 11:58 AM wangw.fnst@fujitsu.com
> > <wangw.fnst@fujitsu.com> wrote:
> > >
> > > Attach the new patches.
> > >
> >
> > Few comments on v23-0001
> > =======================
> >
>
> Some more comments on v23-0001
> ============================
> 1.
> static bool
> handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
> {
> ...
> - /* not in streaming mode */
> - if (!in_streamed_transaction)
> + /* Not in streaming mode and not in apply background worker. */
> + if (!(in_streamed_transaction || am_apply_bgworker()))
> return false;
>
> This check appears a bit strange because ideally in bgworker
> in_streamed_transaction should be false. I think we should set
> in_streamed_transaction to true in apply_handle_stream_start() only when
> we are going to write to file. Is there a reason for not doing the same?

No, I removed this.

> 2.
> + {
> + /* This is the main apply worker. */
> + ApplyBgworkerInfo *wstate = apply_bgworker_find(xid);
> +
> + /*
> + * Check if we are processing this transaction using an apply
> + * background worker and if so, send the changes to that worker.
> + */
> + if (wstate)
> + {
> + /* Send STREAM ABORT message to the apply background worker. */
> + apply_bgworker_send_data(wstate, s->len, s->data);
>
> Why at some places the patch needs to separately fetch ApplyBgworkerInfo
> whereas at other places it directly uses stream_apply_worker to pass the
> data to bgworker.
> 3. Why apply_handle_stream_abort() or apply_handle_stream_prepare()
> doesn't use apply_bgworker_active() to identify whether it needs to send
> the information to bgworker?

I think stream_apply_worker is only valid between STREAM_START and
STREAM_END, but that does not seem clear from the code, so I added some
comments and slightly refactored the code.

> 4. In apply_handle_stream_prepare(), apply_handle_stream_abort(), and some
> other similar functions, the patch handles three cases (a) apply
> background worker, (b) sending data to bgworker, (c) handling for
> streamed transaction in apply worker. I think the code will look better
> if you move the respective code for all three cases into separate
> functions. Surely, if the code to deal with each of the cases is less
> then we don't need to move it to a separate function.

Refactored and simplified.

> 5.
> @@ -1088,24 +1177,78 @@ apply_handle_stream_prepare(StringInfo s)
> {
> ...
> + in_remote_transaction = false;
> +
> + /* Unlink the files with serialized changes and subxact info. */
> + stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
> }
> }
>
> in_remote_transaction = false;
> ...
>
> We don't need to set in_remote_transaction to false in multiple places.

Removed.

> 6.
> @@ -1177,36 +1311,93 @@ apply_handle_stream_start(StringInfo s)
> {
> ...
> ...
> + if (am_apply_bgworker())
> {
> - MemoryContext oldctx;
> -
> - oldctx = MemoryContextSwitchTo(ApplyContext);
> + /*
> + * Make sure the handle apply_dispatch methods are aware we're in a
> + * remote transaction.
> + */
> + in_remote_transaction = true;
>
> - MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
> - FileSetInit(MyLogicalRepWorker->stream_fileset);
> + /* Begin the transaction. */
> + AcceptInvalidationMessages();
> + maybe_reread_subscription();
>
> - MemoryContextSwitchTo(oldctx);
> + StartTransactionCommand();
> + BeginTransactionBlock();
> + CommitTransactionCommand();
> }
> ...
>
> Why do we need to start a transaction here? Why can't it be done via
> begin_replication_step() during the first operation apply? Is it because
> we may need to define a save point in bgworker and we don't have that
> information beforehand? If so, then also, can't it be handled by
> begin_replication_step() either by explicitly passing the information or
> checking it there and then starting a transaction block? In any case,
> please add a few comments to explain why this separate handling is
> required for bgworker?

The transaction block is used to define the savepoint, and I moved this
code to the place where the savepoint is defined, which looks better now.

> 7. When we are already setting bgworker status as APPLY_BGWORKER_BUSY in
> apply_bgworker_setup_dsm() then why do we need to set it again in
> apply_bgworker_start()?

Removed.

> 8. It is not clear to me how APPLY_BGWORKER_EXIT status is used. Is it
> required for the cases where bgworker exits due to some error and then
> apply worker uses it to detect that and exits? How other bgworkers would
> notice this, is it done via apply_bgworker_check_status()?

It was used to detect the unexpected exit of a bgworker. I have changed
this design, which is now similar to what we have in parallel query.

Attached is the new version of the patch set (v24), which addresses the
above comments.

Besides, I added some logic that tries to stop the bgworker at transaction
end if there are enough workers in the pool.

Best regards,
Hou zj
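Editorial sketch (not part of the patch): the answer to comments 2 and 3 rests on stream_apply_worker being valid only between STREAM_START and STREAM_END. The toy model below illustrates that idea under stated assumptions. Every name in it (ToyWorker, toy_pool, toy_stream_worker, and the toy_* functions) is hypothetical and stands in for, but does not reproduce, the patch's ApplyBgworkerInfo, apply_bgworker_find() and stream_apply_worker. It also simplifies STREAM ABORT to a whole-transaction abort.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Toy stand-ins for illustration only; none of these are the patch's types. */
typedef uint32_t ToyXid;            /* 0 means "slot is free" */

typedef struct ToyWorker
{
    ToyXid xid;                     /* transaction assigned to this worker */
    int    nmsgs;                   /* messages forwarded to it so far */
} ToyWorker;

#define TOY_POOL_SIZE 4
static ToyWorker toy_pool[TOY_POOL_SIZE];

/* Cached worker for the chunk being streamed; NULL outside start/stop. */
static ToyWorker *toy_stream_worker = NULL;

/* Look up the worker that owns xid, or return NULL if there is none. */
ToyWorker *
toy_worker_find(ToyXid xid)
{
    if (xid == 0)
        return NULL;
    for (size_t i = 0; i < TOY_POOL_SIZE; i++)
        if (toy_pool[i].xid == xid)
            return &toy_pool[i];
    return NULL;
}

/* Stream start: find or assign a worker and cache it for this chunk. */
void
toy_stream_start(ToyXid xid)
{
    assert(xid != 0 && toy_stream_worker == NULL);
    toy_stream_worker = toy_worker_find(xid);
    for (size_t i = 0; toy_stream_worker == NULL && i < TOY_POOL_SIZE; i++)
    {
        if (toy_pool[i].xid == 0)
        {
            toy_pool[i].xid = xid;
            toy_pool[i].nmsgs = 0;
            toy_stream_worker = &toy_pool[i];
        }
    }
}

/* A change inside a chunk: the cached pointer suffices. Returns 1 if forwarded. */
int
toy_stream_change(void)
{
    if (toy_stream_worker == NULL)
        return 0;               /* no worker: the caller handles it itself */
    toy_stream_worker->nmsgs++;
    return 1;
}

/* Stream end: the cached pointer stops being valid here. */
void
toy_stream_stop(void)
{
    toy_stream_worker = NULL;
}

/* Abort arrives outside any chunk, so it has to look the worker up by xid. */
int
toy_stream_abort(ToyXid xid)
{
    ToyWorker *w = toy_worker_find(xid);

    if (w == NULL)
        return 0;
    w->nmsgs++;                 /* forward the abort message */
    w->xid = 0;                 /* release the slot (whole-transaction abort) */
    return 1;
}
```

In this model, calling toy_stream_start(100), toy_stream_change() and toy_stream_stop() leaves the cached pointer cleared, so a later toy_stream_abort(100) can only reach the worker through toy_worker_find(). That is the asymmetry the review comments asked about: messages inside a chunk can use the cache, while messages that arrive between chunks need an explicit lookup.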
Attachment
- v24-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch
- v24-0001-Perform-streaming-logical-transactions-by-backgr.patch
- v24-0002-Test-streaming-parallel-option-in-tap-test.patch
- v24-0003-Add-some-checks-before-using-apply-background-wo.patch
- v24-0005-Add-a-main_worker_pid-to-pg_stat_subscription.patch