RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From houzj.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Date
Msg-id OS0PR01MB57160B0C0FDDCED638639696942B9@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Peter Smith <smithpb2250@gmail.com>)
Responses RE: Perform streaming logical transactions by background workers and parallel apply  ("kuroda.hayato@fujitsu.com" <kuroda.hayato@fujitsu.com>)
Re: Perform streaming logical transactions by background workers and parallel apply  (Peter Smith <smithpb2250@gmail.com>)
List pgsql-hackers
On Tuesday, October 18, 2022 10:36 AM Peter Smith <smithpb2250@gmail.com> wrote:
> Hi, here are my review comments for patch v38-0001.

Thanks for your comments.

> ======
> 
> .../replication/logical/applyparallelworker.c
> 
> 1. parallel_apply_start_worker
> 
> + /* Try to get a free parallel apply worker. */
> + foreach(lc, ParallelApplyWorkersList)
> + {
> +     ParallelApplyWorkerInfo *tmp_winfo;
> +
> +     tmp_winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
> +
> +     if (!tmp_winfo->in_use)
> +     {
> +         /* Found a worker that has not been assigned a transaction. */
> +         winfo = tmp_winfo;
> +         break;
> +     }
> + }
> 
> The "Found a worker..." comment seems redundant because it's already 
> clear from the prior comment and the 'in_use' member what this code is 
> doing.

Removed.
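For illustration, here is a minimal self-contained sketch of the same free-worker search with the redundant comment dropped. It assumes a plain array in place of PostgreSQL's `List`/`foreach()` and a hypothetical `WorkerInfo` struct standing in for `ParallelApplyWorkerInfo`; it is not the patch's code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified stand-in for ParallelApplyWorkerInfo. */
typedef struct WorkerInfo
{
    int  id;
    bool in_use;    /* true while the worker is assigned a transaction */
} WorkerInfo;

/*
 * Return the first worker that is not assigned a transaction,
 * or NULL if every worker in the pool is busy.
 */
static WorkerInfo *
find_free_worker(WorkerInfo *workers, size_t nworkers)
{
    assert(workers != NULL || nworkers == 0);

    /* Try to get a free parallel apply worker. */
    for (size_t i = 0; i < nworkers; i++)
    {
        if (!workers[i].in_use)
            return &workers[i];
    }

    return NULL;
}
```

Returning directly from inside the loop removes the need for the `tmp_winfo`/`break` pair; a NULL result simply signals that no idle worker exists.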

> ~~~
> 
> 2. LogicalParallelApplyLoop
> 
> + void    *data;
> + Size len;
> + int c;
> + int rc;
> + StringInfoData s;
> + MemoryContext oldctx;
> 
> Several of these vars (like 'c', 'rc', 's') can be declared deeper - 
> e.g. only in the scope where they are actually used.

Changed.

> ~~~
> 
> 3.
> 
> + /* Ensure we are reading the data into our memory context. */
> + oldctx = MemoryContextSwitchTo(ApplyMessageContext);
> 
> Doesn't something need to switch back to this 'oldctx' prior to 
> breaking out of the for(;;) loop?
> 
> ~~~
> 
> 4.
> 
> + apply_dispatch(&s);
> +
> + MemoryContextReset(ApplyMessageContext);
> 
> Isn't this broken now? Since you've removed the
> MemoryContextSwitchTo(oldctx), the next iteration will switch to
> ApplyMessageContext again, which will overwrite and lose knowledge of
> the original 'oldctx' (??)

Sorry for the miss, fixed.
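The failure mode in comments 3 and 4 is that, without the restore, the second iteration's `MemoryContextSwitchTo(ApplyMessageContext)` returns `ApplyMessageContext` itself, so the caller's original context is lost. Below is a self-contained sketch of the save/switch/reset/restore pattern, using hypothetical `MockContext`, `mock_switch_to()` and `mock_reset()` helpers that only imitate the contract of PostgreSQL's `MemoryContextSwitchTo()` and `MemoryContextReset()`. Declaring `oldctx` inside the loop body also follows the narrower-scope suggestion in comment 2.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical mock of a memory context; not PostgreSQL's MemoryContext. */
typedef struct MockContext
{
    const char *name;
    int         nallocs;    /* allocations since the last reset */
} MockContext;

static MockContext *CurrentContext;

/* Imitates MemoryContextSwitchTo(): install ctx, return the previous one. */
static MockContext *
mock_switch_to(MockContext *ctx)
{
    MockContext *old = CurrentContext;

    CurrentContext = ctx;
    return old;
}

/* Imitates MemoryContextReset(): forget everything allocated in ctx. */
static void
mock_reset(MockContext *ctx)
{
    ctx->nallocs = 0;
}

/* Stand-in for reading a message and apply_dispatch() allocating memory. */
static void
mock_dispatch(void)
{
    assert(CurrentContext != NULL);
    CurrentContext->nallocs++;
}

/*
 * Process nmsgs messages. Each iteration saves the caller's context,
 * works in msg_ctx, resets it and switches back, so the saved context
 * is never replaced by msg_ctx on the following iteration.
 */
static void
apply_loop(MockContext *msg_ctx, int nmsgs)
{
    for (int i = 0; i < nmsgs; i++)
    {
        /* Ensure we are reading the data into our memory context. */
        MockContext *oldctx = mock_switch_to(msg_ctx);

        mock_dispatch();

        mock_reset(msg_ctx);
        mock_switch_to(oldctx);
    }
}
```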

> ~~
> 
> 5.
> 
> Maybe this is a silly idea, I'm not sure. Because this is an infinite
> loop, instead of the multiple calls to
> MemoryContextReset(ApplyMessageContext), maybe there can be just a
> single call to it immediately before you switch to that context in the
> first place. The effect will be the same, won't it?
> 
> e.g.
> + /* Ensure we are reading the data into our memory context. */
> + MemoryContextReset(ApplyMessageContext);   <=== THIS
> + oldctx = MemoryContextSwitchTo(ApplyMessageContext);

In the SHM_MQ_WOULD_BLOCK branch we invoke WaitLatch, so I think it is better
to reset the memory context before waiting, to avoid holding on to memory that
is no longer needed for longer than necessary (although this doesn't matter
much in practice). So I haven't changed this for now.

> ~~~
> 
> 6.
> 
> The code logic has been flip-flopping for several versions. I think if 
> you are going to check all the return types of shm_mq_receive then 
> using a switch(shmq_res) might be a better way than having multiple 
> if/else with some Asserts.

Changed.
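Here is a minimal sketch of the switch-based structure suggested in comment 6. The enum below is a local mirror of the three `shm_mq_result` values from PostgreSQL's `shm_mq.h`, and `loop_action`/`handle_receive_result()` are hypothetical names used only for illustration; the real loop would act on each case directly (apply the message, reset `ApplyMessageContext` and call `WaitLatch()`, or report the detach).

```c
#include <assert.h>

/* Local mirror of the three shm_mq_result values in PostgreSQL's shm_mq.h. */
typedef enum
{
    SHM_MQ_SUCCESS,
    SHM_MQ_WOULD_BLOCK,
    SHM_MQ_DETACHED
} shm_mq_result;

/* Hypothetical summary of what one loop iteration should do. */
typedef enum
{
    ACTION_APPLY,   /* dispatch the received message */
    ACTION_WAIT,    /* reset per-message memory, then wait on the latch */
    ACTION_ERROR    /* the leader detached from the queue */
} loop_action;

static loop_action
handle_receive_result(shm_mq_result shmq_res)
{
    switch (shmq_res)
    {
        case SHM_MQ_SUCCESS:
            return ACTION_APPLY;
        case SHM_MQ_WOULD_BLOCK:
            /* Free per-message memory before sleeping (see reply to 5). */
            return ACTION_WAIT;
        case SHM_MQ_DETACHED:
            return ACTION_ERROR;
    }

    /* Unreachable while every enum value is handled above. */
    assert(!"unexpected shm_mq_result");
    return ACTION_ERROR;
}
```

Leaving out a `default:` label lets GCC's `-Wswitch` (enabled by `-Wall`) warn if a new enum value is ever added, which is one argument for the switch over an if/else chain with Asserts.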

> ======
> 
> src/backend/replication/logical/launcher.c
> 
> 7. logicalrep_worker_launch
> 
> Previously I'd suggested ([1] #12) that the process name should change 
> for consistency, and AFAIK Amit also said [2] that would be OK, but 
> this change is still not done in the current patch.

Changed.

> ======
> 
> src/backend/replication/logical/worker.c
> 
> 8. should_apply_changes_for_rel
> 
>  * Should this worker apply changes for given relation.
>  *
>  * This is mainly needed for initial relation data sync as that runs in
>  * separate worker process running in parallel and we need some way to skip
>  * changes coming to the main apply worker during the sync of a table.
> 
> This existing comment refers to the "main apply worker". IMO it should 
> say "leader apply worker" to keep all the terminology consistent.

Changed.

> ~~~
> 
> 9. apply_handle_stream_start
> 
> + *
> + * XXX We can avoid sending pairs of the START/STOP messages to the parallel
> + * worker because unlike apply worker it will process only one transaction at a
> + * time. However, it is not clear whether that is worth the effort because it
> + * is sent after logical_decoding_work_mem changes.
>   */
>  static void
>  apply_handle_stream_start(StringInfo s)
> 
> As previously mentioned ([1] #13b) it's not obvious to me what that 
> last sentence means. e.g. "because it is sent"  - what is "it"?

Changed as per Amit's suggestion in [1].

> ~~~
> 
> 11.
> 
> + /*
> + * Assign the appropriate streaming flag according to the 'streaming' mode
> + * and the publisher's ability to support that mode.
> + */
> 
> Maybe "streaming flag" ->  "streaming string/flag". (sorry, it was my 
> bad suggestion last time)

Improved.
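A hedged sketch of the selection described in comment 11: choose the string to send for the `streaming` option from the subscription's mode and the publisher's version, or send nothing. The enum, the function name and the version thresholds (140000 for plain streaming, 160000 for `parallel`) are assumptions for illustration, not taken from the patch.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical subscription 'streaming' modes. */
typedef enum
{
    STREAM_OFF,
    STREAM_ON,
    STREAM_PARALLEL
} stream_mode;

/*
 * Return the value to send as the 'streaming' option, or NULL to omit it.
 * ASSUMPTION: publishers >= 140000 accept "on", and only publishers
 * >= 160000 accept "parallel"; older ones fall back to plain streaming.
 */
static const char *
choose_streaming_str(stream_mode mode, int server_version)
{
    assert(server_version > 0);

    if (mode == STREAM_PARALLEL && server_version >= 160000)
        return "parallel";
    if (mode != STREAM_OFF && server_version >= 140000)
        return "on";
    return NULL;
}
```

Returning a string (or NULL) rather than a boolean is what makes "streaming string/flag" the more accurate name: the same variable both decides whether the option is sent and carries its value.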

Attached is the new version patch set.

[1] - https://www.postgresql.org/message-id/CAA4eK1%2BqwbD419%3DKgRTLRVj5zQhbM%3Dbfi-cvWG3HkORktb4-YA%40mail.gmail.com

Best Regards
Hou Zhijie

Attachment
