Re: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From Masahiko Sawada
Subject Re: Perform streaming logical transactions by background workers and parallel apply
Date
Msg-id CAD21AoCs3zDutS5ft0ixH_K-Q4MT7L68Yg-gHnvGTjGoGQrt8Q@mail.gmail.com
In response to RE: Perform streaming logical transactions by background workers and parallel apply  ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>)
Responses Re: Perform streaming logical transactions by background workers and parallel apply  (Amit Kapila <amit.kapila16@gmail.com>)
List pgsql-hackers
On Wed, Dec 7, 2022 at 10:03 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:
>
> On Wednesday, December 7, 2022 7:51 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
> >
> > On Mon, Dec 5, 2022 at 1:29 PM houzj.fnst@fujitsu.com
> > <houzj.fnst@fujitsu.com> wrote:
> > >
> > > On Sunday, December 4, 2022 7:17 PM houzj.fnst@fujitsu.com <houzj.fnst@fujitsu.com> wrote:
> > > >
> > > > On Thursday, December 1, 2022 8:40 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> > > > > Some other comments:
> > > > ...
> > > > Attach the new version patch set which addressed most of the comments
> > > > received so far except some comments being discussed[1].
> > > > [1] https://www.postgresql.org/message-id/OS0PR01MB57167BF64FC0891734C8E81A94149%40OS0PR01MB5716.jpnprd01.prod.outlook.com
> > >
> > > Attach a new version patch set which fixed a testcase failure on CFbot.
> >
> > Here are some comments on v56 0001, 0002 patches. Please ignore
> > comments if you already incorporated them in v57.
>
> Thanks for the comments!
>
> > +static void
> > +ProcessParallelApplyInterrupts(void)
> > +{
> > +        CHECK_FOR_INTERRUPTS();
> > +
> > +        if (ShutdownRequestPending)
> > +        {
> > +                ereport(LOG,
> > +                                (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
> > +                                                MySubscription->name)));
> > +
> > +                apply_worker_clean_exit(false);
> > +        }
> > +
> > +        if (ConfigReloadPending)
> > +        {
> > +                ConfigReloadPending = false;
> > +                ProcessConfigFile(PGC_SIGHUP);
> > +        }
> > +}
> >
> > I personally think that we don't need to have a function to do only
> > these few things.
>
> I thought that introducing a new function makes the handling of worker-specific
> interrupt logic similar to other existing ones, like
> ProcessWalRcvInterrupts() in walreceiver.c and HandlePgArchInterrupts() in
> pgarch.c ...

I think the difference from them is that there is only one place to
call ProcessParallelApplyInterrupts().
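
To illustrate (just a rough sketch based on the snippet quoted above, not
a concrete proposal), the same handling could simply live in the parallel
apply worker's main loop:

    for (;;)
    {
        CHECK_FOR_INTERRUPTS();

        /* Exit cleanly if a shutdown has been requested. */
        if (ShutdownRequestPending)
        {
            ereport(LOG,
                    (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
                            MySubscription->name)));
            apply_worker_clean_exit(false);
        }

        /* Reload the configuration file if requested. */
        if (ConfigReloadPending)
        {
            ConfigReloadPending = false;
            ProcessConfigFile(PGC_SIGHUP);
        }

        /* ... wait for and apply the next incoming message ... */
    }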

>
> >
> > Should we change the names to something like
> > LOGICALREP_STREAM_PARALLEL?
>
> Agreed, will change.
>
> > ---
> > + * The lock graph for the above example will look as follows:
> > + * LA (waiting to acquire the lock on the unique index) -> PA (waiting to
> > + * acquire the lock on the remote transaction) -> LA
> >
> > and
> >
> > + * The lock graph for the above example will look as follows:
> > + * LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire the
> > + * lock due to unique index constraint) -> PA-1 (waiting to acquire the stream
> > + * lock) -> LA
> >
> > "(waiting to acquire the lock on the remote transaction)" in the first
> > example and "(waiting to acquire the stream lock)" in the second
> > example have the same meaning, right? If so, I think we should use
> > one of the two terms consistently.
>
> Will change.
>
> > ---
> > +        bool            write_abort_info = (data->streaming == SUBSTREAM_PARALLEL);
> >
> > I think that instead of setting write_abort_info every time
> > pgoutput_stream_abort() is called, we can set it once, probably in
> > PGOutputData, at startup.
>
> I thought that since we already have a "stream" flag in PGOutputData, I am not
> sure if it would be better to introduce another flag for the same option.

I see your point. Another way is to have it as a static variable like
publish_no_origin. But since it's a trivial change, I'm also fine with
the current code.
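
For example (a sketch only; the variable name here is made up), pgoutput.c
could handle it the same way as publish_no_origin:

    /* Set once, based on the streaming option given at startup. */
    static bool publish_abort_info = false;

    /* ... in the startup/parameter-parsing path: */
    publish_abort_info = (data->streaming == SUBSTREAM_PARALLEL);

    /* ... and in pgoutput_stream_abort(), instead of recomputing it: */
    if (publish_abort_info)
    {
        /* include abort_lsn (and abort time) in the stream-abort message */
    }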

>
> > ---
> >         server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
> >         options.proto.logical.proto_version =
> > +                server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
> >                 server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
> >                 server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
> >                 LOGICALREP_PROTO_VERSION_NUM;
> >
> > Instead of always using the new protocol version, I think we can use
> > LOGICALREP_PROTO_TWOPHASE_VERSION_NUM if the streaming is not
> > 'parallel'. That way, we don't need to change the protocol version check
> > logic in pgoutput.c and don't need to expose defGetStreamingMode().
> > What do you think?
>
> I think that some users may also use the new version number when trying to get
> changes (via pg_logical_slot_peek_binary_changes or other functions), so I feel
> leaving the check for the new version number is fine.
>
> Besides, I feel that even if we don't use the new version number, we still need
> defGetStreamingMode to check whether parallel mode is in use, as we need to send
> abort_lsn when parallel is in use. I might be missing something, sorry for
> that. Can you please explain the idea a bit?

My idea is that we use LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM if
(server_version >= 160000 && MySubscription->stream == SUBSTREAM_PARALLEL).
If the stream option is SUBSTREAM_ON, we use
LOGICALREP_PROTO_TWOPHASE_VERSION_NUM even if server_version is 160000 or
newer. That way, pgoutput.c can send abort_lsn if the protocol version is
LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM. We don't need to send
"streaming = parallel" to the publisher since the publisher can decide
whether or not to send abort_lsn based on the protocol version (we still
need to send "streaming = on", though). I might be missing something.

My question came from the fact that the difference between
LOGICALREP_PROTO_TWOPHASE_VERSION_NUM and
LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM is just whether or not
abort_lsn is sent, and there are two knobs controlling that. IIUC even if
we use the new protocol version, the data actually sent during logical
replication is the same as with the previous protocol version as long as
streaming is not 'parallel'. So I thought that we should either not send
'parallel' to the publisher (i.e., send abort_lsn based on the protocol
version) or not introduce a new protocol version (i.e., send abort_lsn
based on the streaming option).
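
In code, the idea would look roughly like this on the subscriber side (a
sketch only, reusing the names from the hunk quoted above):

    server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
    options.proto.logical.proto_version =
            (server_version >= 160000 &&
             MySubscription->stream == SUBSTREAM_PARALLEL) ?
                    LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
            server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
            server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
            LOGICALREP_PROTO_VERSION_NUM;

With that, pgoutput.c could decide whether to include abort_lsn purely from
the protocol version it already parses, without looking at the streaming
option.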

>
> > ---
> > When max_parallel_apply_workers_per_subscription is changed to a value
> > lower than the number of parallel workers running at that time, do we
> > need to stop the extra workers?
>
> I think we can do this, e.g., by adding a check in the main loop of the leader worker
> and checking every time after reloading the configuration. OTOH, we will also stop the worker
> after finishing a transaction, so I am slightly unsure whether we need to add another check here.
> But I am fine with adding it if you think it would be better.
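
Just to clarify what I had in mind, something like the following in the
leader's main loop would be enough (the helper name is hypothetical):

    if (ConfigReloadPending)
    {
        ConfigReloadPending = false;
        ProcessConfigFile(PGC_SIGHUP);

        /*
         * If max_parallel_apply_workers_per_subscription has been lowered,
         * stop idle parallel apply workers exceeding the new limit.
         */
        pa_stop_surplus_workers();
    }
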
>
>
> > ---
> > If the value of max_parallel_apply_workers_per_subscription is not
> > sufficient, we get the LOG "out of parallel apply workers" every time
> > the apply worker fails to launch a parallel worker. But do we really need
> > this log? It seems inconsistent with the
> > max_sync_workers_per_subscription behavior. I think we can check if
> > the number of running parallel workers is less than
> > max_parallel_apply_workers_per_subscription before calling
> > logicalrep_worker_launch(). What do you think?
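
To elaborate on what I meant, the leader could do something like this (a
sketch only; the counter name is made up):

    /*
     * Don't try to start a new parallel apply worker when we are already
     * at the limit, so that "out of parallel apply workers" is not logged
     * every time.
     */
    if (nparallelapplyworkers < max_parallel_apply_workers_per_subscription)
    {
        /* launch a parallel apply worker for this streamed transaction */
    }
    else
    {
        /* fall back to applying the streamed transaction in the leader */
    }
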
> >
> > ---
> > +        if (server_version >= 160000 &&
> > +                MySubscription->stream == SUBSTREAM_PARALLEL)
> > +        {
> > +                options.proto.logical.streaming_str = pstrdup("parallel");
> > +                MyLogicalRepWorker->parallel_apply = true;
> > +        }
> > +        else if (server_version >= 140000 &&
> > +                         MySubscription->stream != SUBSTREAM_OFF)
> > +        {
> > +                options.proto.logical.streaming_str = pstrdup("on");
> > +                MyLogicalRepWorker->parallel_apply = false;
> > +        }
> >
> > I think we don't need to use pstrdup().
>
> Will remove.
>
> > ---
> > -       BeginTransactionBlock();
> > -       CommitTransactionCommand(); /* Completes the preceding Begin command. */
> > +       if (!IsTransactionBlock())
> > +       {
> > +               BeginTransactionBlock();
> > +               CommitTransactionCommand(); /* Completes the preceding Begin command. */
> > +       }
> >
> > Do we need this change? In my environment, 'make check-world' passes
> > without this change.
>
> We will start a transaction block when defining the savepoint, and we will get
> a warning[1] if we enter this function later. I think there would be some WARNINGs in
> the log of the 022_twophase_cascade test if we remove this check.

Thanks, I understood.

Regards,

-- 
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com


