RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From houzj.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Date
Msg-id OS0PR01MB571642A0A2D40FA402B999D794739@OS0PR01MB5716.jpnprd01.prod.outlook.com
Whole thread Raw
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Peter Smith <smithpb2250@gmail.com>)
List pgsql-hackers
On Thur, Aug 18, 2022 11:44 AM Peter Smith <smithpb2250@gmail.com> wrote:
> Here are my review comments for patch v21-0001:
> 
> Note - There are some "general" comments which will result in lots of 
> smaller changes. The subsequent "detailed" review comments have some 
> overlap with these general comments but I expect some will be missed 
> so please search/replace to fix all code related to those general 
> comments.

Thanks for your comments.

> 1. GENERAL - main_worker_pid and replorigin_session_setup
> 
> Quite a few of my subsequent review comments below are related to the 
> somewhat tricky (IMO) change to the code for this area. Here is a 
> summary of some things that can be done to clean/simplify this logic.
> 
> 1a.
> Make the existing replorigin_session_setup function just be a wrapper 
> that delegates to the other function passing the acquired_by as 0.
> This is because in every case but one (in the apply bg worker main) we 
> are always passing 0, and IMO there is no need to spread the messy 
> extra param to places that do not use it.

Not sure about this. I feel interface change should
be fine in major release.

> 17. src/backend/replication/logical/applybgworker.c - 
> LogicalApplyBgworkerMain
> 
> + MyLogicalRepWorker->last_send_time = MyLogicalRepWorker-
> >last_recv_time =
> + MyLogicalRepWorker->reply_time = 0;
> +
> + InitializeApplyWorker();
> 
> Lots of things happen within InitializeApplyWorker(). I think this 
> call deserves at least some comment to say it does lots of common 
> initialization. And same for the other caller or this in the apply 
> main worker.

I feel we can refer to the comments above/in the function InitializeApplyWorker.

> 19.
> + toc = shm_toc_create(PG_LOGICAL_APPLY_SHM_MAGIC,
> dsm_segment_address(seg),
> + segsize);

Since toc is just same as the input address which I think should not be NULL.
I think it's fine to skip the check here like what we did in other codes.

shm_toc_create(uint64 magic, void *address, Size nbytes)
{
    shm_toc    *toc = (shm_toc *) address;

> 20. src/backend/replication/logical/applybgworker.c - 
> apply_bgworker_setup
> 
> I think this function could be refactored to be cleaner and share more 
> common logic.
> 
> SUGGESTION
> 
> /* Setup shared memory, and attempt launch. */ if 
> (apply_bgworker_setup_dsm(wstate))
> {
> bool launched;
> launched = logicalrep_worker_launch(MyLogicalRepWorker->dbid,
> MySubscription->oid,
> MySubscription->name,
> MyLogicalRepWorker->userid,
> InvalidOid,
> dsm_segment_handle(wstate->dsm_seg));
> if (launched)
> {
> ApplyBgworkersList = lappend(ApplyBgworkersList, wstate); 
> MemoryContextSwitchTo(oldcontext);
> return wstate;
> }
> else
> {
> dsm_detach(wstate->dsm_seg);
> wstate->dsm_seg = NULL;
> }
> }
> 
> pfree(wstate);
> MemoryContextSwitchTo(oldcontext);
> return NULL;

Not sure about this.

> 36. src/backend/replication/logical/tablesync.c - 
> process_syncing_tables
> 
> @@ -589,6 +590,9 @@ process_syncing_tables_for_apply(XLogRecPtr
> current_lsn)
>  void
>  process_syncing_tables(XLogRecPtr current_lsn)  {
> + if (am_apply_bgworker())
> + return;
> +
> 
> Perhaps should be a comment to describe why process_syncing_tables 
> should be skipped for the apply background worker?

I might refactor this function soon, so didn't change for now.
But I will consider it.

> 39. src/backend/replication/logical/worker.c - 
> handle_streamed_transaction
> 
> + /* Not in streaming mode and not in apply background worker. */ if 
> + (!(in_streamed_transaction || am_apply_bgworker()))
>   return false;
> IMO if you wanted to write the comment in that way then the code 
> should have matched it more closely like:
> if (!in_streamed_transaction && !am_apply_bgworker())
> 
> OTOH, if you want to keep the code as-is then the comment should be 
> worded slightly differently.

I feel both the in_streamed_transaction flag and in bgworker indicate that
we are in streaming mode. So it seems the original /* Not in streaming mode */
Should be fine.

> 44. src/backend/replication/logical/worker.c - InitializeApplyWorker
> 
> 
> +/*
> + * Initialize the databse connection, in-memory subscription and 
> +necessary
> + * config options.
> + */
>  void
> -ApplyWorkerMain(Datum main_arg)
> 44b.
> Should there be some more explanation in this comment to say that this 
> is common code for both the appl main workers and apply background 
> workers?
> 
> 44c.
> Following on from #44b, consider renaming this to something like
> CommonApplyWorkerInit() to emphasize it is called from multiple 
> places?

Not sure about this. if we change the bgworker name to parallel
apply worker in the future, it might be worth emphasizing this. So
I will consider this.

> 52.
> 
> +/* Apply background worker setup and interactions */ extern 
> +ApplyBgworkerInfo *apply_bgworker_start(TransactionId xid); extern 
> +ApplyBgworkerInfo *apply_bgworker_find(TransactionId xid); extern 
> +void apply_bgworker_wait_for(ApplyBgworkerInfo *wstate,  
> +ApplyBgworkerStatus wait_for_status); extern void 
> +apply_bgworker_send_data(ApplyBgworkerInfo *wstate, Size
> nbytes,
> + const void *data);
> +extern void apply_bgworker_free(ApplyBgworkerInfo *wstate); extern 
> +void apply_bgworker_check_status(void);
> +extern void apply_bgworker_set_status(ApplyBgworkerStatus status); 
> +extern void apply_bgworker_subxact_info_add(TransactionId 
> +current_xid); extern void apply_bgworker_savepoint_name(Oid suboid, Oid relid,
> +   char *spname, int szsp);
> 
> This big block of similarly named externs might as well be in 
> alphabetical order instead of apparently random.

I think Amit has a good idea in [2].
So I tried to reorder these based on related functionality.

The reply to your comments #4.2 for patch 0004 in [3]:
> 4.2
> 
> @@ -166,17 +175,6 @@ CREATE TRIGGER tri_tab1_unsafe  BEFORE INSERT ON 
> public.test_tab1  FOR EACH ROW EXECUTE PROCEDURE 
> trigger_func_tab1_unsafe();  ALTER TABLE test_tab1 ENABLE REPLICA 
> TRIGGER tri_tab1_unsafe;
> -
> -CREATE FUNCTION trigger_func_tab1_safe() RETURNS TRIGGER AS \$\$
> -  BEGIN
> -    RAISE NOTICE 'test for safe trigger function';
> - RETURN NEW;
> -  END
> -\$\$ language plpgsql;
> -ALTER FUNCTION trigger_func_tab1_safe IMMUTABLE; -CREATE TRIGGER 
> tri_tab1_safe -BEFORE INSERT ON public.test_tab1 -FOR EACH ROW EXECUTE 
> PROCEDURE trigger_func_tab1_safe();  });
> 
> I didn't understand why all this trigger_func_tab1_safe which was 
> added in patch 0003 is now getting removed in patch 0004. Maybe there 
> is some good reason, but it doesn't seem right to be adding code in 
> one patch and then removing it again in the next patch.

Because in 0003 we need to manually do something to let the test recover
from the constraint failure, while in 0004 it can automatically retry.

The rest of your comments are improved as suggested.

[1] - https://www.postgresql.org/message-id/CAHut%2BPuAxW57fowiMrn%3D3%3D53sagmehiTSW0o1Q52MpR3phUmyw%40mail.gmail.com
[2] - https://www.postgresql.org/message-id/CAA4eK1KpuQAk_fiqVXy16WkDrKPBwA9E61VpvLfkse-o31NNVA%40mail.gmail.com
[3] - https://www.postgresql.org/message-id/CAHut%2BPtCRkTT_KNaqA5Fn6_T38BXtFn4Eb3Ct-AbNko91s-cjQ%40mail.gmail.com

Best regards,
Hou zj

pgsql-hackers by date:

Previous
From: Bruce Momjian
Date:
Subject: Re: Tracking last scan time
Next
From: tushar
Date:
Subject: Re: replacing role-level NOINHERIT with a grant-level option