Re: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From Masahiko Sawada
Subject Re: Perform streaming logical transactions by background workers and parallel apply
Date
Msg-id CAD21AoBX_7wybs9rVF1Y7AzUOMk6eG8301zjnif_uE1vqC85mw@mail.gmail.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Amit Kapila <amit.kapila16@gmail.com>)
Responses Re: Perform streaming logical transactions by background workers and parallel apply  (Peter Smith <smithpb2250@gmail.com>)
List pgsql-hackers
On Thu, Dec 8, 2022 at 4:42 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Thu, Dec 8, 2022 at 12:42 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
> >
> > On Wed, Dec 7, 2022 at 10:03 PM houzj.fnst@fujitsu.com
> > <houzj.fnst@fujitsu.com> wrote:
> > >
> > >
> > > > +static void
> > > > +ProcessParallelApplyInterrupts(void)
> > > > +{
> > > > +        CHECK_FOR_INTERRUPTS();
> > > > +
> > > > +        if (ShutdownRequestPending)
> > > > +        {
> > > > +                ereport(LOG,
> > > > +                                (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
> > > > +                                                MySubscription->name)));
> > > > +
> > > > +                apply_worker_clean_exit(false);
> > > > +        }
> > > > +
> > > > +        if (ConfigReloadPending)
> > > > +        {
> > > > +                ConfigReloadPending = false;
> > > > +                ProcessConfigFile(PGC_SIGHUP);
> > > > +        }
> > > > +}
> > > >
> > > > I personally think that we don't need to have a function to do only
> > > > these few things.
> > >
> > > I thought that introducing a new function would make the handling of
> > > worker-specific interrupt logic similar to other existing ones, like
> > > ProcessWalRcvInterrupts() in walreceiver.c and HandlePgArchInterrupts() in
> > > pgarch.c ...
> >
> > I think the difference from them is that there is only one place to
> > call ProcessParallelApplyInterrupts().
> >
>
> But I feel it is better to isolate this code in a separate function.
> What if we decide to extend it further by having some logic to stop
> workers after reloading of config?

I think we can separate the function at that time. But let's keep the
current code, since you and Hou both prefer it. I'm not going to
insist on that.
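For readers following the ProcessParallelApplyInterrupts() debate, here is a minimal, self-contained sketch of the general pattern a dedicated interrupt-processing helper follows: signal handlers only set `volatile sig_atomic_t` flags, and the worker's main loop calls one function at a safe point to check and clear them. It is not the patch's code; all names (`shutdown_request_pending`, `config_reload_pending`, `reload_config`, `install_worker_signal_handlers`, `process_worker_interrupts`) are hypothetical stand-ins for PostgreSQL's `ShutdownRequestPending`, `ConfigReloadPending` and `ProcessConfigFile(PGC_SIGHUP)`.

```c
#include <signal.h>
#include <stdbool.h>

/*
 * Flags set asynchronously by signal handlers.  Hypothetical stand-ins for
 * PostgreSQL's ShutdownRequestPending and ConfigReloadPending.
 */
static volatile sig_atomic_t shutdown_request_pending = 0;
static volatile sig_atomic_t config_reload_pending = 0;

/* Counts reloads so the behaviour can be observed. */
static int reload_count = 0;

/* Handlers do nothing but set a flag; the real work happens later. */
static void
handle_sigterm(int signo)
{
    (void) signo;
    shutdown_request_pending = 1;
}

static void
handle_sighup(int signo)
{
    (void) signo;
    config_reload_pending = 1;
}

/* Hypothetical stand-in for ProcessConfigFile(PGC_SIGHUP). */
static void
reload_config(void)
{
    reload_count++;
}

static void
install_worker_signal_handlers(void)
{
    signal(SIGTERM, handle_sigterm);
    signal(SIGHUP, handle_sighup);
}

/*
 * Called at a safe point in the worker's main loop.  Returns false when a
 * shutdown was requested, so the caller can clean up and exit; otherwise
 * handles any pending reload and returns true.
 */
static bool
process_worker_interrupts(void)
{
    if (shutdown_request_pending)
        return false;

    if (config_reload_pending)
    {
        config_reload_pending = 0;  /* clear first so a new SIGHUP is not lost */
        reload_config();
    }
    return true;
}
```

A worker's loop would call `process_worker_interrupts()` once per iteration and exit cleanly when it returns false. Keeping the checks in one function means that any later extension (such as acting on new settings after a reload, as suggested above) has a single place to go, even if there is only one caller today.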

>
> > >
> > > > ---
> > > > server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
> > > > options.proto.logical.proto_version =
> > > > +        server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
> > > >          server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
> > > >          server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
> > > >          LOGICALREP_PROTO_VERSION_NUM;
> > > >
> > > > Instead of always using the new protocol version, I think we can use
> > > > LOGICALREP_PROTO_TWOPHASE_VERSION_NUM if the streaming is not
> > > > 'parallel'. That way, we don't need to change the protocol version check
> > > > logic in pgoutput.c and don't need to expose defGetStreamingMode().
> > > > What do you think?
> > >
> > > I think that some users may also use the new version number when fetching
> > > changes (via pg_logical_slot_peek_binary_changes or other functions), so I feel
> > > it is fine to keep the check for the new version number.
> > >
> > > Besides, I feel that even if we don't use the new version number, we still need
> > > to use defGetStreamingMode to check whether parallel mode is in use, as we need
> > > to send abort_lsn when parallel is in use. I might be missing something, sorry
> > > for that. Can you please explain the idea a bit?
> >
> > My idea is that we use LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM if
> > (server_version >= 160000 && MySubscription->stream ==
> > SUBSTREAM_PARALLEL). If the stream is SUBSTREAM_ON, we use
> > LOGICALREP_PROTO_TWOPHASE_VERSION_NUM even if server_version is
> > 160000. That way, in pgoutput.c, we can send abort_lsn if the protocol
> > version is LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM. We don't need
> > to send "streaming = parallel" to the publisher since the publisher
> > can decide whether or not to send abort_lsn based on the protocol
> > version (still needs to send "streaming = on" though). I might be
> > missing something.
> >
>
> What if we decide to send some more additional information as part of
> another patch like we are discussing in the thread [1]? Now, we won't
> be able to decide the version number based on just the streaming
> option. Also, in such a case, even for
> LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM, it may not be a good
> idea to send additional abort information unless the user has used the
> streaming=parallel option.

If we're going to send the additional information, it makes sense to
send streaming=parallel. But the next question that comes to mind is:
why do we need to increase the protocol version for the parallel apply
feature? If sending the additional information is also controlled by an
option like "streaming", we can decide what to send based on these
options, no?
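To make the positions in this exchange concrete, here is a minimal sketch contrasting (a) the posted patch, which picks the protocol version purely from the server version; (b) the proposal to request the parallel protocol version only when `streaming = parallel`; and (c) two ways the publisher could decide whether to send abort_lsn: from the negotiated version alone, or from the streaming option alone. The numeric values of the `LOGICALREP_PROTO_*` constants are assumptions for illustration, the `StreamMode` enum is a hypothetical stand-in for `SUBSTREAM_ON` / `SUBSTREAM_PARALLEL`, and all function names are hypothetical.

```c
#include <stdbool.h>

/* Assumed protocol numbers, mirroring the names used in the thread. */
#define LOGICALREP_PROTO_VERSION_NUM                 1
#define LOGICALREP_PROTO_STREAM_VERSION_NUM          2
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM        3
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM 4

/* Hypothetical stand-in for the subscription's streaming option. */
typedef enum
{
    STREAM_OFF,
    STREAM_ON,
    STREAM_PARALLEL
} StreamMode;

/* (a) Patch as posted: highest version the server supports. */
static int
proto_version_by_server(int server_version)
{
    return server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
           server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
           server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
           LOGICALREP_PROTO_VERSION_NUM;
}

/* (b) Proposal: use the parallel version only for streaming = parallel. */
static int
proto_version_by_stream_mode(int server_version, StreamMode stream)
{
    if (server_version >= 160000 && stream == STREAM_PARALLEL)
        return LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM;
    if (server_version >= 150000)
        return LOGICALREP_PROTO_TWOPHASE_VERSION_NUM;  /* even on 16+ */
    return proto_version_by_server(server_version);    /* pre-15: unchanged */
}

/* (c1) Publisher, version-driven: abort_lsn implied by the version. */
static bool
send_abort_lsn_by_version(int proto_version)
{
    return proto_version >= LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM;
}

/* (c2) Publisher, option-driven: abort_lsn requested by the option. */
static bool
send_abort_lsn_by_option(StreamMode stream)
{
    return stream == STREAM_PARALLEL;
}
```

The trade-off being argued: with (b) plus (c1), each new piece of optional information would tie the version number to a particular option value, which is Amit's concern about the additional information discussed in [1]; with (c2), the publisher keys off the option itself, which is why the question arises whether a version bump is needed at all.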

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com