Re: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From Masahiko Sawada
Subject Re: Perform streaming logical transactions by background workers and parallel apply
Date
Msg-id CAD21AoAD6uC_X4FhKYFJHdCZQu9rm7c5=hvTPeghBUf=Ybga9Q@mail.gmail.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Amit Kapila <amit.kapila16@gmail.com>)
List pgsql-hackers
On Fri, Dec 9, 2022 at 3:05 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Fri, Dec 9, 2022 at 7:45 AM Peter Smith <smithpb2250@gmail.com> wrote:
> >
> > On Thu, Dec 8, 2022 at 7:43 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
> > >
> > > On Thu, Dec 8, 2022 at 4:42 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> > > >
> > > > On Thu, Dec 8, 2022 at 12:42 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
> > > > >
> > > > > On Wed, Dec 7, 2022 at 10:03 PM houzj.fnst@fujitsu.com
> > > > > <houzj.fnst@fujitsu.com> wrote:
> > > > > >
> > > > > >
> > > > > > > +static void
> > > > > > > +ProcessParallelApplyInterrupts(void)
> > > > > > > +{
> > > > > > > +        CHECK_FOR_INTERRUPTS();
> > > > > > > +
> > > > > > > +        if (ShutdownRequestPending)
> > > > > > > +        {
> > > > > > > +                ereport(LOG,
> > > > > > > +                                (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
> > > > > > > +                                                MySubscription->name)));
> > > > > > > +
> > > > > > > +                apply_worker_clean_exit(false);
> > > > > > > +        }
> > > > > > > +
> > > > > > > +        if (ConfigReloadPending)
> > > > > > > +        {
> > > > > > > +                ConfigReloadPending = false;
> > > > > > > +                ProcessConfigFile(PGC_SIGHUP);
> > > > > > > +        }
> > > > > > > +}
> > > > > > >
> > > > > > > I personally think that we don't need to have a function to do only
> > > > > > > these few things.
> > > > > >
> > > > > > I thought that introducing a new function makes the handling of worker-specific
> > > > > > interrupt logic similar to other existing ones, like
> > > > > > ProcessWalRcvInterrupts() in walreceiver.c and HandlePgArchInterrupts() in
> > > > > > pgarch.c ...
> > > > >
> > > > > I think the difference from them is that there is only one place to
> > > > > call ProcessParallelApplyInterrupts().
> > > > >
> > > >
> > > > But I feel it is better to isolate this code in a separate function.
> > > > What if we decide to extend it further by having some logic to stop
> > > > workers after reloading of config?
> > >
> > > I think we can separate the function at that time. But let's keep the
> > > current code as you and Hou agree with the current code. I'm not going
> > > to insist on that.
> > >
> > > >
> > > > > >
> > > > > > > ---
> > > > > > > server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
> > > > > > > options.proto.logical.proto_version =
> > > > > > > +                server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
> > > > > > >         server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
> > > > > > >         server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
> > > > > > >         LOGICALREP_PROTO_VERSION_NUM;
> > > > > > >
> > > > > > > Instead of always using the new protocol version, I think we can use
> > > > > > > LOGICALREP_PROTO_TWOPHASE_VERSION_NUM if the streaming is not
> > > > > > > 'parallel'. That way, we don't need to change the protocol version check
> > > > > > > logic in pgoutput.c and don't need to expose defGetStreamingMode().
> > > > > > > What do you think?
> > > > > >
> > > > > > I think that some users can also use the new version number when trying to get
> > > > > > changes (via pg_logical_slot_peek_binary_changes or other functions), so I feel
> > > > > > leaving the check for the new version number is fine.
> > > > > >
> > > > > > Besides, I feel that even if we don't use the new version number, we still need to use
> > > > > > defGetStreamingMode to check whether parallel mode is in use, as we need to send
> > > > > > abort_lsn when parallel mode is in use. I might be missing something, sorry for
> > > > > > that. Can you please explain the idea a bit?
> > > > >
> > > > > My idea is that we use LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM if
> > > > > (server_version >= 160000 && MySubscription->stream ==
> > > > > SUBSTREAM_PARALLEL). If the stream is SUBSTREAM_ON, we use
> > > > > LOGICALREP_PROTO_TWOPHASE_VERSION_NUM even if server_version is
> > > > > 160000. That way, in pgoutput.c, we can send abort_lsn if the protocol
> > > > > version is LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM. We don't need
> > > > > to send "streaming = parallel" to the publisher since the publisher
> > > > > can decide whether or not to send abort_lsn based on the protocol
> > > > > version (still needs to send "streaming = on" though). I might be
> > > > > missing something.
> > > > >
> > > >
> > > > What if we decide to send some more additional information as part of
> > > > another patch like we are discussing in the thread [1]? Now, we won't
> > > > be able to decide the version number based on just the streaming
> > > > option. Also, in such a case, even for
> > > > LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM, it may not be a good
> > > > idea to send additional abort information unless the user has used the
> > > > streaming=parallel option.
> > >
> > > If we're going to send the additional information, it makes sense to
> > > send streaming=parallel. But the next question that came to me is why we
> > > need to increase the protocol version for the parallel apply feature? If
> > > sending the additional information is also controlled by an option
> > > like "streaming", we can decide what we send based on these options,
> > > no?
> > >
> >
> > AFAIK the protocol version defines what protocol message bytes are
> > transmitted on the wire. So I thought the protocol version should
> > *always* be updated whenever the message format changes. In other
> > words, I don't think we ought to be transmitting different protocol
> > message formats unless it is a different protocol version.
> >
> > Whether the pub/sub implementation actually needs to check that
> > protocol version or whether we happen to have some alternative knob we
> > can check doesn't change what the protocol version is supposed to
> > mean. And the PGDOCS [1] and [2] currently have clear field notes
> > about when those fields are present (e.g. "This field is available
> > since protocol version XXX"), but if hypothetically you don't change
> > the protocol version for some new fields then now the message format
> > becomes tied to the built-in implementation of pub/sub -- now what
> > field note will you say instead to explain that?
> >
>
> I think the protocol version acts as a backstop to not send some
> information which clients don't understand. Now, the other way is to
> believe the client when it sends a particular option (say streaming =
> on (aka allow sending in-progress transactions)) that it will
> understand additional information for that feature but the protocol
> version acts as a backstop in that case.

Yeah, it seems that this is how the logical replication protocol has
been working: new logical replication protocol versions are backward
compatible. I was thinking that the protocol version needs to be bumped
when there is no compatibility, i.e., when most clients need to change to
support the new protocol.
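
To make the "backstop" idea concrete, here is a minimal C sketch (not the patch's code) of a publisher serializing a simplified stream-abort message. It appends abort_lsn/abort_time only when the negotiated version is at least the parallel-streaming version *and* the subscriber asked for streaming = parallel. The constant names, the byte layout, and write_stream_abort() are hypothetical stand-ins, not PostgreSQL's actual functions or wire format.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins mirroring the version ladder quoted above. */
enum
{
    PROTO_VERSION_NUM = 1,
    PROTO_STREAM_VERSION_NUM = 2,
    PROTO_TWOPHASE_VERSION_NUM = 3,
    PROTO_STREAM_PARALLEL_VERSION_NUM = 4
};

/* 'A' + xid + subxid + abort_lsn + abort_time (simplified layout) */
#define STREAM_ABORT_MAX_LEN (1 + 4 + 4 + 8 + 8)

/* Append a big-endian 32-bit integer; returns the new offset. */
static size_t
put_u32(unsigned char *buf, size_t off, uint32_t v)
{
    for (int shift = 24; shift >= 0; shift -= 8)
        buf[off++] = (unsigned char) (v >> shift);
    return off;
}

/* Append a big-endian 64-bit integer; returns the new offset. */
static size_t
put_u64(unsigned char *buf, size_t off, uint64_t v)
{
    for (int shift = 56; shift >= 0; shift -= 8)
        buf[off++] = (unsigned char) (v >> shift);
    return off;
}

/*
 * Serialize a simplified stream-abort message into buf (which must hold
 * STREAM_ABORT_MAX_LEN bytes) and return its length.  The extra abort
 * fields are written only if the negotiated version is new enough AND the
 * subscriber requested streaming = parallel, so a client that negotiated
 * an older version never receives bytes it cannot parse.
 */
static size_t
write_stream_abort(unsigned char *buf, uint32_t proto_version,
                   bool streaming_parallel, uint32_t xid, uint32_t subxid,
                   uint64_t abort_lsn, int64_t abort_time)
{
    size_t off = 0;

    /* Stream messages exist only from the streaming-capable version on. */
    assert(proto_version >= PROTO_STREAM_VERSION_NUM);

    buf[off++] = 'A';
    off = put_u32(buf, off, xid);
    off = put_u32(buf, off, subxid);

    /* The version is the backstop; the option says the client wants it. */
    if (proto_version >= PROTO_STREAM_PARALLEL_VERSION_NUM && streaming_parallel)
    {
        off = put_u64(buf, off, abort_lsn);
        off = put_u64(buf, off, (uint64_t) abort_time);
    }
    return off;
}
```

With version 3 the message stays at 9 bytes even if parallel streaming is requested. With version 4 plus streaming = parallel it grows to 25 bytes. Both conditions are needed: the version alone would send the fields to a subscriber that never asked for them, and the option alone would have no backstop.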

> As Peter mentioned, it will
> be easier to explain the additional information we are sending across
> different versions without relying on additional options for pub/sub.
> Having said that, we can send additional required information based on
> just the new option but I felt it is better to bump the protocol
> version along with it unless we see any downside to it. What do you
> think?

I agree with bumping the protocol version.
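
For reference, here is a minimal sketch of how the subscriber side could look with the bump. The version is chosen purely from the publisher's server version (the same ladder as the quoted patch), while the streaming option is sent separately. StreamMode, choose_proto_version(), and choose_streaming_option() are hypothetical names. The fallback from "parallel" to "on" for pre-16 publishers is an assumption made for illustration.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins mirroring the constants in the quoted patch. */
enum
{
    PROTO_VERSION_NUM = 1,
    PROTO_STREAM_VERSION_NUM = 2,
    PROTO_TWOPHASE_VERSION_NUM = 3,
    PROTO_STREAM_PARALLEL_VERSION_NUM = 4
};

/* Hypothetical subscription-level streaming setting. */
typedef enum
{
    STREAM_OFF,
    STREAM_ON,
    STREAM_PARALLEL
} StreamMode;

/* Pick the highest protocol version the publisher's server supports. */
static int
choose_proto_version(int server_version)
{
    /* Native logical replication exists only from PostgreSQL 10 onward. */
    assert(server_version >= 100000);

    return server_version >= 160000 ? PROTO_STREAM_PARALLEL_VERSION_NUM :
           server_version >= 150000 ? PROTO_TWOPHASE_VERSION_NUM :
           server_version >= 140000 ? PROTO_STREAM_VERSION_NUM :
           PROTO_VERSION_NUM;
}

/*
 * Value of the "streaming" option to send, or NULL to omit it.
 * Assumption: a parallel subscription falls back to plain "on" against a
 * publisher that predates the parallel-streaming version.
 */
static const char *
choose_streaming_option(StreamMode mode, int server_version)
{
    if (mode == STREAM_PARALLEL && server_version >= 160000)
        return "parallel";
    if (mode != STREAM_OFF && server_version >= 140000)
        return "on";
    return NULL;
}
```

Keeping the two choices independent means the publisher can rely on the version as the backstop and on the option as the client's request, which is the combination discussed above.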

Regards,

-- 
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com


