RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From houzj.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Msg-id OS0PR01MB5716E527412A3481F90B4397941A9@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Masahiko Sawada <sawada.mshk@gmail.com>)
Responses Re: Perform streaming logical transactions by background workers and parallel apply  (Amit Kapila <amit.kapila16@gmail.com>)
Re: Perform streaming logical transactions by background workers and parallel apply  (Masahiko Sawada <sawada.mshk@gmail.com>)
List pgsql-hackers
On Wednesday, December 7, 2022 7:51 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
> 
> On Mon, Dec 5, 2022 at 1:29 PM houzj.fnst@fujitsu.com
> <houzj.fnst@fujitsu.com> wrote:
> >
> > On Sunday, December 4, 2022 7:17 PM houzj.fnst@fujitsu.com <houzj.fnst@fujitsu.com>
> > >
> > > Thursday, December 1, 2022 8:40 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> > > > Some other comments:
> > > ...
> > > Attach the new version patch set which addressed most of the comments
> > > received so far except some comments being discussed[1].
> > > [1] https://www.postgresql.org/message-id/OS0PR01MB57167BF64FC0891734C8E81A94149%40OS0PR01MB5716.jpnprd01.prod.outlook.com
> >
> > Attach a new version patch set which fixed a testcase failure on CFbot.
> 
> Here are some comments on v56 0001, 0002 patches. Please ignore
> comments if you already incorporated them in v57.

Thanks for the comments!

> +static void
> +ProcessParallelApplyInterrupts(void)
> +{
> +        CHECK_FOR_INTERRUPTS();
> +
> +        if (ShutdownRequestPending)
> +        {
> +                ereport(LOG,
> +                                (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
> +                                                MySubscription->name)));
> +
> +                apply_worker_clean_exit(false);
> +        }
> +
> +        if (ConfigReloadPending)
> +        {
> +                ConfigReloadPending = false;
> +                ProcessConfigFile(PGC_SIGHUP);
> +        }
> +}
> 
> I personally think that we don't need to have a function to do only
> these few things.

I thought that introducing a new function would make the handling of
worker-specific interrupts consistent with other existing workers, like
ProcessWalRcvInterrupts() in walreceiver.c and HandlePgArchInterrupts() in
pgarch.c ...

> 
> Should we change the names to something like
> LOGICALREP_STREAM_PARALLEL?

Agreed, will change.

> ---
> + * The lock graph for the above example will look as follows:
> + * LA (waiting to acquire the lock on the unique index) -> PA (waiting to
> + * acquire the lock on the remote transaction) -> LA
> 
> and
> 
> + * The lock graph for the above example will look as follows:
> + * LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire the
> + * lock due to unique index constraint) -> PA-1 (waiting to acquire the stream
> + * lock) -> LA
> 
> "(waiting to acquire the lock on the remote transaction)" in the first
> example and "(waiting to acquire the stream lock)" in the second
> example have the same meaning, right? If so, I think we should use
> one of the terms consistently.

Will change.
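The lock graphs in those comments describe wait-for cycles (LA -> PA -> LA, and LA -> PA-2 -> PA-1 -> LA). As a toy illustration only, and far simpler than PostgreSQL's real deadlock detector in deadlock.c, the sketch below follows a chain in which each process waits on at most one other and reports whether the starting process lies on a cycle. has_wait_cycle(), NO_WAIT and the integer process ids are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

#define MAX_PROCS 8
#define NO_WAIT (-1)

/*
 * waits_for[i] is the process that process i is blocked on, or NO_WAIT.
 * Each process waits on at most one holder here, which is enough to model
 * the example chains (a real wait-for graph can have several edges per
 * waiter). Returns true if `start` is part of a wait cycle, i.e. deadlocked.
 */
static bool has_wait_cycle(const int waits_for[], int nprocs, int start)
{
    assert(nprocs <= MAX_PROCS && start >= 0 && start < nprocs);
    int cur = start;

    /* If start is on a cycle, we get back to it within nprocs steps. */
    for (int steps = 0; steps < nprocs; steps++)
    {
        cur = waits_for[cur];
        if (cur == NO_WAIT)
            return false;   /* chain ends at a process that is not waiting */
        if (cur == start)
            return true;    /* back at the starting process: deadlock */
    }
    return false;
}
```

With LA = 0, PA-1 = 1 and PA-2 = 2, the second example corresponds to waits_for = {2, 0, 1} read as LA waits on PA-2, PA-1 waits on LA, PA-2 waits on PA-1.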

> ---
> +        bool           write_abort_info = (data->streaming == SUBSTREAM_PARALLEL);
> 
> I think that instead of setting write_abort_info every time when
> pgoutput_stream_abort() is called, we can set it once, probably in
> PGOutputData, at startup.

Since we already have a "stream" flag in PGOutputData, I am not sure it would
be better to introduce another flag for the same option.


> ---
> server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
> options.proto.logical.proto_version =
> +                server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
>         server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
>         server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
>         LOGICALREP_PROTO_VERSION_NUM;
> 
> Instead of always using the new protocol version, I think we can use
> LOGICALREP_PROTO_TWOPHASE_VERSION_NUM if the streaming is not
> 'parallel'. That way, we don't need to change protocol version check
> logic in pgoutput.c and don't need to expose defGetStreamingMode().
> What do you think?

I think some users may also use the new version number when fetching
changes (via pg_logical_slot_peek_binary_changes() or other functions), so I
feel it is fine to leave the check for the new version number in place.

Besides, even if we don't use the new version number, I feel we still need
defGetStreamingMode() to check whether parallel mode is in use, because we need
to send abort_lsn in that case. I might be missing something, sorry for that.
Could you please explain the idea a bit more?
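A small sketch contrasting the two approaches: the quoted code always requests the newest version the publisher supports, while the review suggestion would cap it at the two-phase version unless streaming = parallel. The numeric values are assumed to match PostgreSQL 16's logicalproto.h and are for illustration only; both function names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

/* Assumed values, for illustration; see logicalproto.h for the real ones. */
#define LOGICALREP_PROTO_VERSION_NUM                 1
#define LOGICALREP_PROTO_STREAM_VERSION_NUM          2
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM        3
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM 4

/* As in the quoted patch: the newest version the publisher supports. */
static int proto_version_by_server(int server_version)
{
    assert(server_version > 0);
    return server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
           server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
           server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
           LOGICALREP_PROTO_VERSION_NUM;
}

/* The review suggestion: request the new version only for streaming = parallel. */
static int proto_version_suggested(int server_version, bool parallel)
{
    int version = proto_version_by_server(server_version);

    if (!parallel && version > LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
        version = LOGICALREP_PROTO_TWOPHASE_VERSION_NUM;
    return version;
}
```

The two only differ for a 16+ publisher with streaming other than parallel, which is exactly the case where pgoutput's version check would otherwise see the new number.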

> ---
> When max_parallel_apply_workers_per_subscription is changed to a value
> lower than the number of parallel workers running at that time, do we
> need to stop extra workers?

I think we can do this, e.g., by adding a check in the main loop of the leader
worker that runs every time after reloading the configuration. OTOH, we also stop
a worker after it finishes a transaction, so I am not quite sure we need to add
another check here. But I am fine with adding it if you think it would be better.
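If such a check were added, one possible shape is a pass after each configuration reload that stops only idle workers until the count fits the new limit, leaving busy workers to be stopped once their transaction finishes. WorkerSlot and trim_idle_workers() are hypothetical bookkeeping for this sketch; the real leader tracks its workers differently.

```c
#include <assert.h>
#include <stdbool.h>

#define MAX_WORKERS 16

/* Hypothetical bookkeeping for the leader's parallel apply workers. */
typedef struct
{
    bool in_use;   /* slot holds a running worker */
    bool busy;     /* worker is currently applying a transaction */
} WorkerSlot;

/*
 * Called after a config reload: stop idle workers until no more than
 * new_limit are running. Busy workers are skipped; they would be stopped
 * later, when their transaction finishes. Returns the number stopped.
 */
static int trim_idle_workers(WorkerSlot slots[], int nslots, int new_limit)
{
    assert(nslots <= MAX_WORKERS && new_limit >= 0);
    int running = 0;
    int stopped = 0;

    for (int i = 0; i < nslots; i++)
        if (slots[i].in_use)
            running++;

    for (int i = 0; i < nslots && running > new_limit; i++)
    {
        if (slots[i].in_use && !slots[i].busy)
        {
            slots[i].in_use = false;   /* real code would signal the worker to exit */
            running--;
            stopped++;
        }
    }
    return stopped;
}
```

Skipping busy workers avoids interrupting an in-progress transaction, which is why the existing stop-after-transaction logic may already cover most cases.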


> ---
> If a value of max_parallel_apply_workers_per_subscription is not
> sufficient, we get the LOG "out of parallel apply workers" every time
> when the apply worker doesn't launch a worker. But do we really need
> this log? It seems not consistent with
> max_sync_workers_per_subscription behavior. I think we can check if
> the number of running parallel workers is less than
> max_parallel_apply_workers_per_subscription before calling
> logicalrep_worker_launch(). What do you think?
> 
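The suggested alternative might look roughly like this: compare the number of running parallel apply workers against the limit before attempting a launch, similar to how the table sync code checks max_sync_workers_per_subscription, so that reaching the limit is a silent fallback rather than a logged launch failure. launch_worker() is a hypothetical stand-in for logicalrep_worker_launch().

```c
#include <assert.h>
#include <stdbool.h>

static int launch_count = 0;

/* Hypothetical stand-in for logicalrep_worker_launch(); always succeeds. */
static bool launch_worker(void)
{
    launch_count++;
    return true;
}

/*
 * Check the limit before launching, so that being at the limit is an
 * expected, silent outcome. Returns false when no worker was launched,
 * in which case the leader would fall back to handling the transaction
 * itself.
 */
static bool maybe_launch_parallel_worker(int nrunning, int max_per_subscription)
{
    assert(nrunning >= 0 && max_per_subscription >= 0);
    if (nrunning >= max_per_subscription)
        return false;
    return launch_worker();
}
```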
> ---
> +        if (server_version >= 160000 &&
> +                MySubscription->stream == SUBSTREAM_PARALLEL)
> +        {
> +                options.proto.logical.streaming_str = pstrdup("parallel");
> +                MyLogicalRepWorker->parallel_apply = true;
> +        }
> +        else if (server_version >= 140000 &&
> +                         MySubscription->stream != SUBSTREAM_OFF)
> +        {
> +                options.proto.logical.streaming_str = pstrdup("on");
> +                MyLogicalRepWorker->parallel_apply = false;
> +        }
> 
> I think we don't need to use pstrdup().

Will remove.

> ---
> -       BeginTransactionBlock();
> -       CommitTransactionCommand(); /* Completes the preceding Begin command. */
> +       if (!IsTransactionBlock())
> +       {
> +               BeginTransactionBlock();
> +               CommitTransactionCommand(); /* Completes the preceding Begin command. */
> +       }
> 
> Do we need this change? In my environment, 'make check-world' passes
> without this change.

We start a transaction block when defining the savepoint, so we will get a
warning[1] if we enter this function afterwards. I think there would be some
WARNINGs in the log of the "022_twophase_cascade" test if we removed this check.

[1] WARNING:  there is already a transaction in progress
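A simplified model of why the guard matters: BeginTransactionBlock() warns rather than errors when a block is already open, so calling it unconditionally after a savepoint has opened a block produces that warning. Everything below is a hypothetical model of the state, not backend code; the check on in_transaction_block plays the role of IsTransactionBlock().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical model of the backend's transaction-block state. */
static bool in_transaction_block = false;
static int warnings_emitted = 0;

/* Models BeginTransactionBlock(): if a block is already open (for example
 * one started while defining a savepoint), it only warns and returns. */
static void begin_transaction_block(void)
{
    if (in_transaction_block)
    {
        fprintf(stderr, "WARNING:  there is already a transaction in progress\n");
        warnings_emitted++;
        return;
    }
    in_transaction_block = true;
}

/* The guarded form from the patch: begin a block only if none is open. */
static void ensure_transaction_block(void)
{
    if (!in_transaction_block)      /* plays the role of IsTransactionBlock() */
        begin_transaction_block();
    assert(in_transaction_block);   /* either way, a block is now open */
}
```

Calling ensure_transaction_block() repeatedly is silent, whereas an unguarded second begin_transaction_block() emits the warning once.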

Best regards,
Hou zj
