RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From houzj.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Msg-id OS0PR01MB5716A8B3EDCABAFC3031769B94559@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Amit Kapila <amit.kapila16@gmail.com>)
List pgsql-hackers
On Monday, September 26, 2022 6:58 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> 
> On Mon, Sep 26, 2022 at 8:41 AM wangw.fnst@fujitsu.com
> <wangw.fnst@fujitsu.com> wrote:
> >
> > On Thur, Sep 22, 2022 at 18:12 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > > 3.
> > > ApplyWorkerMain()
> > > {
> > > ...
> > > ...
> > > +
> > > + if (server_version >= 160000 &&
> > > + MySubscription->stream == SUBSTREAM_PARALLEL)
> > > + options.proto.logical.streaming = pstrdup("parallel");
> > >
> > > After deciding here whether the parallel streaming mode is enabled
> > > or not, we recheck the same thing in apply_handle_stream_abort() and
> > > parallel_apply_can_start(). In parallel_apply_can_start(), we do it
> > > via two different checks. How about storing this information say in
> > > structure MyLogicalRepWorker in ApplyWorkerMain() and then use it at
> > > other places?
> >
> > Improved as suggested.
> > Added a new flag "in_parallel_apply" to structure MyLogicalRepWorker.
> >
> 
> Can we name the variable in_parallel_apply as parallel_apply and set it in
> logicalrep_worker_launch() instead of in ParallelApplyWorkerMain()?

Changed.

> Few other comments:
> ==================
> 1.
> + if (is_subworker &&
> + nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
> + {
> + LWLockRelease(LogicalRepWorkerLock);
> +
> + ereport(DEBUG1,
> + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
> + errmsg("out of parallel apply workers"),
> + errhint("You might need to increase max_parallel_apply_workers_per_subscription.")));
> 
> I think it is better to keep the level of this as LOG. Similar messages at other
> places use WARNING or LOG. Here, I prefer LOG because the system can still
> proceed without blocking anything.

Changed.
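The reasoning for LOG rather than WARNING is that running out of parallel apply workers is not a failure: the leader can still proceed without one. A minimal sketch of such a limit check, assuming a hypothetical `LauncherState` struct and `try_reserve_parallel_worker()` function; the message and hint text come from the patch, while the fallback behaviour of the caller is an assumption.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical stand-in for the launcher's shared bookkeeping. */
typedef struct LauncherState
{
    int nparallelapplyworkers;  /* workers running for this subscription */
    int max_per_subscription;   /* models max_parallel_apply_workers_per_subscription */
} LauncherState;

/*
 * Try to reserve a slot for a new parallel apply worker.  Hitting the
 * limit is reported at LOG level because the caller is assumed to fall
 * back to applying the transaction without a parallel worker.
 */
static bool
try_reserve_parallel_worker(LauncherState *st)
{
    assert(st != NULL);

    if (st->nparallelapplyworkers >= st->max_per_subscription)
    {
        /* The patch releases LogicalRepWorkerLock before logging. */
        fprintf(stderr,
                "LOG:  out of parallel apply workers\n"
                "HINT:  You might need to increase "
                "max_parallel_apply_workers_per_subscription.\n");
        return false;
    }

    st->nparallelapplyworkers++;
    return true;
}
```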

> 2.
> +/* Reset replication origin tracking. */
> +void
> +parallel_apply_replorigin_reset(void)
> +{
> + bool started_tx = false;
> +
> + /* This function might be called inside or outside of transaction. */
> + if (!IsTransactionState())
> + {
> + StartTransactionCommand();
> + started_tx = true;
> + }
> 
> Why do we need a transaction in this function?

I don't think we need it, so I removed it in the new version of the patch.
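One way to see why no transaction is needed: resetting origin tracking clears session state and reads no catalogs, so there is nothing for a transaction to protect. A simplified sketch, assuming the state is just three process-local variables modelled on replorigin_session_origin and friends; all names below are hypothetical, and the real function may additionally release the shared origin slot, which (as far as I know) is guarded by a lock rather than a transaction.

```c
#include <assert.h>
#include <stdint.h>

typedef uint16_t RepOriginIdModel;  /* hypothetical stand-in for RepOriginId */
typedef uint64_t LsnModel;          /* hypothetical stand-in for XLogRecPtr */

#define INVALID_ORIGIN ((RepOriginIdModel) 0)
#define INVALID_LSN    ((LsnModel) 0)

/* Hypothetical process-local session state. */
static RepOriginIdModel session_origin = INVALID_ORIGIN;
static LsnModel session_origin_lsn = INVALID_LSN;
static int64_t session_origin_timestamp = 0;

/* Set up tracking; in this model an origin must not already be set up. */
static void
replorigin_tracking_setup(RepOriginIdModel origin)
{
    assert(session_origin == INVALID_ORIGIN);
    assert(origin != INVALID_ORIGIN);
    session_origin = origin;
}

/*
 * Reset tracking.  Only in-memory variables change, so the function
 * behaves the same whether or not a transaction is open.
 */
static void
replorigin_tracking_reset(void)
{
    session_origin = INVALID_ORIGIN;
    session_origin_lsn = INVALID_LSN;
    session_origin_timestamp = 0;
}
```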

> 3. Few suggestions to improve in the patch:
> diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
> index 1623c9e2fa..d9c519dfab 100644
> --- a/src/backend/replication/logical/worker.c
> +++ b/src/backend/replication/logical/worker.c
> @@ -1264,6 +1264,10 @@ apply_handle_stream_prepare(StringInfo s)
>   case TRANS_LEADER_SEND_TO_PARALLEL:
>   Assert(winfo);
> 
> + /*
> + * The origin can be active only in one process. See
> + * apply_handle_stream_commit.
> + */
>   parallel_apply_replorigin_reset();
> 
>   /* Send STREAM PREPARE message to the parallel apply worker. */
> @@ -1623,12 +1627,7 @@ apply_handle_stream_abort(StringInfo s)
>   (errcode(ERRCODE_PROTOCOL_VIOLATION),
>   errmsg_internal("STREAM ABORT message without STREAM STOP")));
> 
> - /*
> - * Check whether the publisher sends abort_lsn and abort_time.
> - *
> - * Note that the parallel apply worker is only started when the publisher
> - * sends abort_lsn and abort_time.
> - */
> + /* We receive abort information only when we can apply in parallel. */
>   if (MyLogicalRepWorker->in_parallel_apply)
>   read_abort_info = true;
> 
> @@ -1656,7 +1655,13 @@ apply_handle_stream_abort(StringInfo s)
>   Assert(winfo);
> 
>   if (subxid == xid)
> + {
> + /*
> + * The origin can be active only in one process. See
> + * apply_handle_stream_commit.
> + */
>   parallel_apply_replorigin_reset();
> + }
> 
>   /* Send STREAM ABORT message to the parallel apply worker. */
>   parallel_apply_send_data(winfo, s->len, s->data);
> @@ -1858,6 +1863,12 @@ apply_handle_stream_commit(StringInfo s)
>   case TRANS_LEADER_SEND_TO_PARALLEL:
>   Assert(winfo);
> 
> + /*
> + * We need to reset the replication origin before sending the commit
> + * message and set it up again after confirming that parallel worker
> + * has processed the message. This is required because origin can be
> + * active only in one process at-a-time.
> + */
>   parallel_apply_replorigin_reset();
> 
>   /* Send STREAM COMMIT message to the parallel apply worker. */
> diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
> index 4cbfb43492..2bd9664f86 100644
> --- a/src/include/replication/worker_internal.h
> +++ b/src/include/replication/worker_internal.h
> @@ -70,11 +70,7 @@ typedef struct LogicalRepWorker
>   */
>   pid_t apply_leader_pid;
> 
> - /*
> - * Indicates whether to use parallel apply workers.
> - *
> - * Determined based on streaming parameter and publisher version.
> - */
> + /* Indicates whether apply can be performed parallelly. */
>   bool in_parallel_apply;
> 

Merged, thanks.
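The comments merged above rest on one rule: a replication origin can be active in only one process at a time, so the leader resets it before sending STREAM COMMIT and sets it up again after the parallel worker confirms the message was processed. A minimal single-threaded model of that handoff, assuming a hypothetical `OriginSlot` with an `acquired_by` pid (0 meaning free); `origin_acquire()`, `origin_release()` and `handoff_commit()` are illustrative names, not PostgreSQL functions, and the real code coordinates across processes through shared memory and locks.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model of one shared replication origin slot. */
typedef struct OriginSlot
{
    int acquired_by;    /* pid holding the origin, or 0 if free */
} OriginSlot;

/* Activate the origin in process `pid`; fails if another process holds it. */
static bool
origin_acquire(OriginSlot *slot, int pid)
{
    if (slot->acquired_by != 0 && slot->acquired_by != pid)
        return false;
    slot->acquired_by = pid;
    return true;
}

/* Release the origin; only the current holder may do so. */
static void
origin_release(OriginSlot *slot, int pid)
{
    assert(slot->acquired_by == pid);
    slot->acquired_by = 0;
}

/*
 * Leader-side STREAM COMMIT in this model: release the origin before
 * handing over, let the worker hold it while applying, then reacquire it
 * once the worker has confirmed the commit.
 */
static bool
handoff_commit(OriginSlot *slot, int leader, int worker)
{
    origin_release(slot, leader);       /* cf. parallel_apply_replorigin_reset() */

    /* The worker can activate the origin only because the leader let go. */
    if (!origin_acquire(slot, worker))
        return false;
    /* ... worker applies and commits the streamed transaction ... */
    origin_release(slot, worker);

    /* After confirmation, the leader sets the origin up again. */
    return origin_acquire(slot, leader);
}
```

Without the initial release, `origin_acquire()` in the worker would fail, which is the situation the "origin can be active only in one process" comments guard against.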

Best regards,
Hou zj
