Re: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From Peter Smith
Subject Re: Perform streaming logical transactions by background workers and parallel apply
Msg-id CAHut+PuccmYwfcxtH0K3RMQUNcfwJKAnaUENiyz-u3DijTwkFw@mail.gmail.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Masahiko Sawada <sawada.mshk@gmail.com>)
Responses Re: Perform streaming logical transactions by background workers and parallel apply  (Amit Kapila <amit.kapila16@gmail.com>)
List pgsql-hackers
On Thu, Dec 8, 2022 at 7:43 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
>
> On Thu, Dec 8, 2022 at 4:42 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > On Thu, Dec 8, 2022 at 12:42 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
> > >
> > > On Wed, Dec 7, 2022 at 10:03 PM houzj.fnst@fujitsu.com
> > > <houzj.fnst@fujitsu.com> wrote:
> > > >
> > > >
> > > > > +static void
> > > > > +ProcessParallelApplyInterrupts(void)
> > > > > +{
> > > > > +        CHECK_FOR_INTERRUPTS();
> > > > > +
> > > > > +        if (ShutdownRequestPending)
> > > > > +        {
> > > > > +                ereport(LOG,
> > > > > > +                                (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
> > > > > > +                                                MySubscription->name)));
> > > > > +
> > > > > +                apply_worker_clean_exit(false);
> > > > > +        }
> > > > > +
> > > > > +        if (ConfigReloadPending)
> > > > > +        {
> > > > > +                ConfigReloadPending = false;
> > > > > +                ProcessConfigFile(PGC_SIGHUP);
> > > > > +        }
> > > > > +}
> > > > >
> > > > > I personally think that we don't need to have a function to do only
> > > > > these few things.
> > > >
> > > > I thought that introducing a new function would make the handling of
> > > > worker-specific interrupts similar to the existing ones, like
> > > > ProcessWalRcvInterrupts() in walreceiver.c and HandlePgArchInterrupts() in
> > > > pgarch.c ...
> > >
> > > I think the difference from them is that there is only one place to
> > > call ProcessParallelApplyInterrupts().
> > >
> >
> > But I feel it is better to isolate this code in a separate function.
> > What if we decide to extend it further by having some logic to stop
> > workers after reloading of config?
>
> I think we can separate the function at that time. But let's keep the
> current code as you and Hou agree with the current code. I'm not going
> to insist on that.
>
> >
> > > >
> > > > > ---
> > > > > server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
> > > > > options.proto.logical.proto_version =
> > > > > +                server_version >= 160000 ?
> > > > > LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
> > > > >         server_version >= 150000 ?
> > > > > LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
> > > > >         server_version >= 140000 ?
> > > > > LOGICALREP_PROTO_STREAM_VERSION_NUM :
> > > > >         LOGICALREP_PROTO_VERSION_NUM;
> > > > >
> > > > > Instead of always using the new protocol version, I think we can use
> > > > > LOGICALREP_PROTO_TWOPHASE_VERSION_NUM if the streaming is not
> > > > > > 'parallel'. That way, we don't need to change protocol version check
> > > > > logic in pgoutput.c and don't need to expose defGetStreamingMode().
> > > > > What do you think?
> > > >
> > > > I think that some users can also use the new version number when trying to get
> > > > changes (via pg_logical_slot_peek_binary_changes or other functions), so I feel
> > > > that leaving the check for the new version number seems fine.
> > > >
> > > > Besides, I feel that even if we don't use the new version number, we still need to
> > > > use defGetStreamingMode to check whether parallel mode is in use, as we need to
> > > > send abort_lsn when parallel is in use. I might be missing something, sorry for
> > > > that. Can you please explain the idea a bit?
> > >
> > > My idea is that we use LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM if
> > > (server_version >= 160000 && MySubscription->stream ==
> > > SUBSTREAM_PARALLEL). If the stream is SUBSTREAM_ON, we use
> > > LOGICALREP_PROTO_TWOPHASE_VERSION_NUM even if server_version is
> > > 160000. That way, in pgoutput.c, we can send abort_lsn if the protocol
> > > version is LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM. We don't need
> > > to send "streaming = parallel" to the publisher since the publisher
> > > can decide whether or not to send abort_lsn based on the protocol
> > > version (still needs to send "streaming = on" though). I might be
> > > missing something.
> > >
> >
> > What if we decide to send some more additional information as part of
> > another patch like we are discussing in the thread [1]? Now, we won't
> > be able to decide the version number based on just the streaming
> > option. Also, in such a case, even for
> > LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM, it may not be a good
> > idea to send additional abort information unless the user has used the
> > streaming=parallel option.
>
> If we're going to send the additional information, it makes sense to
> send streaming=parallel. But the next question that comes to mind is
> why we need to increase the protocol version for the parallel apply
> feature. If sending the additional information is also controlled by
> an option like "streaming", we can decide what we send based on these
> options, no?
>

AFAIK the protocol version defines what protocol message bytes are
transmitted on the wire. So I thought the protocol version should
*always* be updated whenever the message format changes. In other
words, I don't think we ought to be transmitting different protocol
message formats unless it is a different protocol version.
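
To illustrate, here is a minimal sketch (not PostgreSQL source) of an
encoder whose on-the-wire layout depends only on the negotiated protocol
version. The names put_be and encode_stream_abort are hypothetical, the
constant assumes the parallel streaming version is 4, and the extra abort
LSN and timestamp fields follow what is being proposed in this thread:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed value, mirroring LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM. */
#define PROTO_STREAM_PARALLEL_VERSION_NUM 4

/* Hypothetical helper: append 'nbytes' bytes of 'value' in network order. */
static size_t
put_be(uint8_t *buf, size_t off, uint64_t value, int nbytes)
{
    for (int i = nbytes - 1; i >= 0; i--)
        buf[off++] = (uint8_t) (value >> (8 * i));
    return off;
}

/*
 * Hypothetical encoder for a Stream Abort message.  The byte layout is a
 * function of proto_version alone: versions below 4 get the 9-byte form,
 * version 4 and later get the 25-byte form with abort LSN and timestamp.
 * Returns the number of bytes written.
 */
size_t
encode_stream_abort(uint8_t *buf, size_t buflen, int proto_version,
                    uint32_t xid, uint32_t subxid,
                    uint64_t abort_lsn, int64_t abort_time)
{
    size_t      off = 0;

    assert(buflen >= 25);       /* room for the longest form */

    buf[off++] = 'A';           /* message type byte */
    off = put_be(buf, off, xid, 4);
    off = put_be(buf, off, subxid, 4);

    if (proto_version >= PROTO_STREAM_PARALLEL_VERSION_NUM)
    {
        off = put_be(buf, off, abort_lsn, 8);
        off = put_be(buf, off, (uint64_t) abort_time, 8);
    }

    return off;
}
```

This deliberately ignores the "streaming" option and gates purely on the
version, which is a simplification made only to show the principle.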

Whether the pub/sub implementation actually needs to check the
protocol version, or whether there happens to be some alternative knob
it can check instead, doesn't change what the protocol version is
supposed to mean. The PGDOCS [1] and [2] currently have clear field
notes saying when each field is present (e.g. "This field is available
since protocol version XXX"). If, hypothetically, new fields were added
without changing the protocol version, then the message format would
become tied to the built-in implementation of pub/sub. What field note
would you write instead to explain that?
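
As a rough sketch of why this matters to a consumer that is not the
built-in subscriber, the decoder below knows only the negotiated protocol
version and must work out from that alone which fields to expect. It is an
illustration, not PostgreSQL code: StreamAbort, get_be and
decode_stream_abort are hypothetical names, and the 9-byte and 25-byte
layouts assume the extra abort fields proposed in this thread arrive with
version 4:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed value, mirroring LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM. */
#define PROTO_STREAM_PARALLEL_VERSION_NUM 4

/* Hypothetical in-memory form of a Stream Abort message. */
typedef struct StreamAbort
{
    uint32_t    xid;
    uint32_t    subxid;
    bool        has_abort_info; /* true only for the extended layout */
    uint64_t    abort_lsn;
    int64_t     abort_time;
} StreamAbort;

/* Hypothetical helper: read 'nbytes' bytes in network order. */
static uint64_t
get_be(const uint8_t *buf, size_t off, int nbytes)
{
    uint64_t    v = 0;

    for (int i = 0; i < nbytes; i++)
        v = (v << 8) | buf[off + i];
    return v;
}

/*
 * Decode a Stream Abort message using nothing but the protocol version to
 * choose the layout.  Returns false if the type byte or the length does
 * not match what that version promises.
 */
bool
decode_stream_abort(const uint8_t *buf, size_t len, int proto_version,
                    StreamAbort *out)
{
    bool        extended = proto_version >= PROTO_STREAM_PARALLEL_VERSION_NUM;
    size_t      expected = extended ? 25 : 9;

    assert(buf != NULL && out != NULL);

    if (len != expected || buf[0] != 'A')
        return false;

    out->xid = (uint32_t) get_be(buf, 1, 4);
    out->subxid = (uint32_t) get_be(buf, 5, 4);
    out->has_abort_info = extended;
    out->abort_lsn = extended ? get_be(buf, 9, 8) : 0;
    out->abort_time = extended ? (int64_t) get_be(buf, 17, 8) : 0;
    return true;
}
```

If the sender attached the extra fields without a version bump, a decoder
like this would reject the longer message, since nothing it negotiated
told it to expect them.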

------
[1] https://www.postgresql.org/docs/current/protocol-logical-replication.html
[2] https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html

Kind Regards,
Peter Smith.
Fujitsu Australia.


