RE: Perform streaming logical transactions by background workers and parallel apply

From houzj.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Date
Msg-id OS0PR01MB57161DD88F755587E8FE2C3B947F9@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Amit Kapila <amit.kapila16@gmail.com>)
Responses RE: Perform streaming logical transactions by background workers and parallel apply
List pgsql-hackers
On Friday, September 2, 2022 2:10 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> 
> On Thu, Sep 1, 2022 at 4:53 PM houzj.fnst@fujitsu.com
> <houzj.fnst@fujitsu.com> wrote:
> >
> 
> Review of v27-0001*:

Thanks for the comments.

> ================
> 1. I feel the usage of in_remote_transaction and in_use flags is slightly complex.
> IIUC, the patch uses in_use flag to ensure commit ordering by waiting for it to
> become false before proceeding in transaction finish commands in leader
> apply worker. If so, I think it is better to name it in_parallel_apply_xact and set it
> to true only when we start applying xact in parallel apply worker and set it to
> false when we finish the xact in parallel apply worker. It can be initialized to false
> while setting up DSM. Also, accordingly change the function
> parallel_apply_wait_for_free() to parallel_apply_wait_for_xact_finish and
> parallel_apply_set_idle to parallel_apply_set_xact_finish. We can change the
> name of the in_remote_transaction flag to in_use.

Agreed. One thing I found while addressing this is that there can be a race
condition if the flag is set in the parallel apply worker: the leader may have
already started waiting for the parallel apply worker to finish processing the
transaction (i.e. for in_parallel_apply_xact to become false) while the worker
has not yet processed the first STREAM_START and therefore has not set
in_parallel_apply_xact to true.
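
To make the window concrete, here is a minimal sketch of the leader-side wait
under the suggested naming. The struct definition, latch handling, and wait
event constant are simplified placeholders, not the patch as posted:

#include "postgres.h"
#include "miscadmin.h"
#include "storage/latch.h"
#include "storage/spin.h"
#include "utils/wait_event.h"

/* Minimal stand-in for the patch's shared struct, using the suggested names. */
typedef struct ParallelApplyWorkerShared
{
	slock_t		mutex;
	bool		in_parallel_apply_xact;	/* worker is applying a streamed xact */
} ParallelApplyWorkerShared;

/* Leader side: wait for the parallel apply worker to finish the transaction. */
static void
parallel_apply_wait_for_xact_finish(ParallelApplyWorkerShared *shared)
{
	for (;;)
	{
		bool		in_xact;

		SpinLockAcquire(&shared->mutex);
		in_xact = shared->in_parallel_apply_xact;
		SpinLockRelease(&shared->mutex);

		/*
		 * The race: if the worker only sets the flag to true when it sees
		 * its first STREAM_START, the leader can reach this check earlier,
		 * observe false, and wrongly conclude the xact already finished.
		 */
		if (!in_xact)
			return;

		(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
						 -1L, WAIT_EVENT_MQ_INTERNAL);
		ResetLatch(MyLatch);
	}
}

One possible way to close the window, noted here only as an illustration of
the fix direction, is to have the leader set the flag before handing off the
first STREAM_START, so it can never observe a stale false.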

> Please explain about these flags in the struct where they are declared.
> 
> 2. The worker_id in the ParallelApplyWorkerShared struct could hold wrong
> information after a worker is reused from the pool, because we could have
> removed some other worker from the ParallelApplyWorkersList, which will
> make the value of worker_id wrong. For error/debug messages, we can
> probably use the LSN if available, or the OID of the subscription if
> required. I thought of using the xid as well, but I think it is better to
> avoid that in messages as it can wrap around. If the patch uses the xid in
> other messages, it is better to either use it along with the LSN or use
> only the LSN.
> 3.
> elog(ERROR, "[Parallel Apply Worker #%u] unexpected message \"%c\"",
> + shared->worker_id, c);
> 
> Also, I am not sure whether the above style (use of []) of messages is good. Did
> you follow the usage from some other place?
> 4.
> apply_handle_stream_stop(StringInfo s)
> {
> ...
> + if (apply_action == TA_APPLY_IN_PARALLEL_WORKER)
> + {
> + elog(DEBUG1, "[Parallel Apply Worker #%u] ended processing streaming chunk, "
> + "waiting on shm_mq_receive", MyParallelShared->worker_id);
> ...
> 
> I don't understand the relevance of "waiting on shm_mq_receive" in the
> above message because AFAICS, here we are not waiting on any receive
> call.
> 
> 5. I suggest you please go through all the ERROR/LOG/DEBUG messages in
> the patch and try to improve them based on the above comments.

I removed the worker_id, and also removed or improved some DEBUG/ERROR
messages that I thought were unclear or had no counterpart in the existing code.
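
As an illustration of the suggested direction (the helper and variable names
below are hypothetical), identifying a streamed transaction by its xid
together with its finish LSN, rather than by a pool-relative worker_id, could
look like:

#include "postgres.h"
#include "access/xlogdefs.h"	/* XLogRecPtr, LSN_FORMAT_ARGS */

/* Hypothetical helper: report a finished streamed transaction by xid plus
 * finish LSN instead of a worker_id that can go stale after pool reuse. */
static void
report_streaming_xact_done(TransactionId xid, XLogRecPtr finish_lsn)
{
	elog(DEBUG1,
		 "finished applying streaming transaction with xid %u, finish LSN %X/%X",
		 xid, LSN_FORMAT_ARGS(finish_lsn));
}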

> 6.
> + * The dynamic shared memory segment will contain (1) a shm_mq that can
> + * be used to send errors (and other messages reported via elog/ereport)
> + * from the parallel apply worker to leader apply worker (2) another
> + * shm_mq that can be used to send changes in the transaction from leader
> + * apply worker to parallel apply worker
> 
> Here, it would be better to switch (1) and (2). I feel it is better to
> explain first about how the main apply information is exchanged among
> workers.

Exchanged.
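
For reference, a condensed sketch of how the two queues can be laid out in
the segment with the shm_toc machinery. The TOC keys, queue size, and helper
name are illustrative, not the patch's actual values:

#include "postgres.h"
#include "storage/dsm.h"
#include "storage/proc.h"		/* MyProc */
#include "storage/shm_mq.h"
#include "storage/shm_toc.h"

#define PA_KEY_MQ			1	/* leader -> worker: transaction changes */
#define PA_KEY_ERROR_MQ		2	/* worker -> leader: errors and messages */
#define PA_QUEUE_SIZE		(16 * 1024)

/* Create both queues in an already-sized segment and attach the leader. */
static void
setup_parallel_apply_queues(dsm_segment *seg, shm_toc *toc,
							shm_mq_handle **mq_handle,
							shm_mq_handle **error_mq_handle)
{
	shm_mq	   *mq;

	/* Changes queue: the leader sends, the parallel apply worker receives. */
	mq = shm_mq_create(shm_toc_allocate(toc, PA_QUEUE_SIZE), PA_QUEUE_SIZE);
	shm_toc_insert(toc, PA_KEY_MQ, mq);
	shm_mq_set_sender(mq, MyProc);
	*mq_handle = shm_mq_attach(mq, seg, NULL);

	/* Error queue: the worker sends, the leader receives. */
	mq = shm_mq_create(shm_toc_allocate(toc, PA_QUEUE_SIZE), PA_QUEUE_SIZE);
	shm_toc_insert(toc, PA_KEY_ERROR_MQ, mq);
	shm_mq_set_receiver(mq, MyProc);
	*error_mq_handle = shm_mq_attach(mq, seg, NULL);
}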

> 7.
> + /* Try to get a free parallel apply worker. */
> + foreach(lc, ParallelApplyWorkersList)
> + {
> + ParallelApplyWorkerInfo *tmp_winfo;
> +
> + tmp_winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
> +
> + if (tmp_winfo->error_mq_handle == NULL)
> + {
> + /*
> + * Release the worker information and try next one if the parallel
> + * apply worker exited cleanly.
> + */
> + ParallelApplyWorkersList =
> + foreach_delete_current(ParallelApplyWorkersList, lc);
> + shm_mq_detach(tmp_winfo->mq_handle);
> + dsm_detach(tmp_winfo->dsm_seg);
> + pfree(tmp_winfo);
> +
> + continue;
> + }
> +
> + if (!tmp_winfo->in_remote_transaction)
> + {
> + winfo = tmp_winfo;
> + break;
> + }
> + }
> 
> Can we write it as if ... else if? If so, then we don't need the
> continue in the first branch. And, can we add some more comments to
> explain these cases?

Changed.
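
In case it helps review, the reworked loop is shaped roughly like this
(condensed; the helper name is hypothetical, and in_use follows the renaming
from point 1):

#include "postgres.h"
#include "nodes/pg_list.h"
#include "storage/dsm.h"
#include "storage/shm_mq.h"

/* Condensed stand-in for the patch's per-worker bookkeeping struct. */
typedef struct ParallelApplyWorkerInfo
{
	shm_mq_handle *mq_handle;		/* changes queue */
	shm_mq_handle *error_mq_handle; /* NULL once the worker has exited */
	dsm_segment *dsm_seg;
	bool		in_use;				/* renamed from in_remote_transaction */
} ParallelApplyWorkerInfo;

static List *ParallelApplyWorkersList = NIL;

/* Hypothetical helper: return a free pooled worker, or NULL if none. */
static ParallelApplyWorkerInfo *
pa_find_free_worker(void)
{
	ListCell   *lc;

	foreach(lc, ParallelApplyWorkersList)
	{
		ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);

		if (winfo->error_mq_handle == NULL)
		{
			/* The worker exited cleanly: drop its entry and keep scanning. */
			ParallelApplyWorkersList =
				foreach_delete_current(ParallelApplyWorkersList, lc);
			shm_mq_detach(winfo->mq_handle);
			dsm_detach(winfo->dsm_seg);
			pfree(winfo);
		}
		else if (!winfo->in_use)
		{
			/* The worker is alive and idle: reuse it. */
			return winfo;
		}
	}

	return NULL;				/* caller launches a new worker */
}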


Attached is the new version of the patch set, which addresses the above
comments and also fixes another problem that occurred when subscribing to a
lower-version publisher.

Best regards,
Hou zj
