Re: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From Masahiko Sawada
Subject Re: Perform streaming logical transactions by background workers and parallel apply
Msg-id CAD21AoDHfhRwWvL6TaRiseJsXt5--Jcd11M6CdM0CJ9=6xgEyA@mail.gmail.com
In response to RE: Perform streaming logical transactions by background workers and parallel apply  ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>)
Responses Re: Perform streaming logical transactions by background workers and parallel apply  (Amit Kapila <amit.kapila16@gmail.com>)
List pgsql-hackers
On Thu, Dec 1, 2022 at 7:17 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:
>
> On Thursday, December 1, 2022 3:58 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
> >
> > On Wed, Nov 30, 2022 at 10:51 PM houzj.fnst@fujitsu.com
> > <houzj.fnst@fujitsu.com> wrote:
> > >
> > > On Wednesday, November 30, 2022 9:41 PM houzj.fnst@fujitsu.com
> > <houzj.fnst@fujitsu.com> wrote:
> > > >
> > > > On Tuesday, November 29, 2022 8:34 PM Amit Kapila
> > > > > Review comments on v53-0001*
> > > >
> > > > Attach the new version patch set.
> > >
> > > Sorry, there were some mistakes in the previous patch set.
> > > Here is the correct V54 patch set. I also ran pgindent for the patch set.
> > >
> >
> > Thank you for updating the patches. Here are random review comments for
> > 0001 and 0002 patches.
>
> Thanks for the comments!
>
> >
> > ereport(ERROR,
> >                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> >                  errmsg("logical replication parallel apply worker exited abnormally"),
> >                  errcontext("%s", edata.context)));
> >
> > and
> >
> > ereport(ERROR,
> >                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> >                  errmsg("logical replication parallel apply worker exited because of subscription information change")));
> >
> > I'm not sure ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE is appropriate
> > here. Given that parallel apply worker has already reported the error message
> > with the error code, I think we don't need to set the errorcode for the logs
> > from the leader process.
> >
> > Also, I'm not sure the term "exited abnormally" is appropriate since we use it
> > when the server crashes for example. I think ERRORs reported here don't mean
> > that in general.
>
> How about reporting "xxx worker exited due to error" ?

Sounds better to me.

>
> > ---
> > if (am_parallel_apply_worker() && on_subinfo_change) {
> >     /*
> >      * If a parallel apply worker exits due to the subscription
> >      * information change, we notify the leader apply worker so that the
> >      * leader can report more meaningful message in time and restart the
> >      * logical replication.
> >      */
> >     pq_putmessage('X', NULL, 0);
> > }
> >
> > and
> >
> > ereport(ERROR,
> >                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> >                  errmsg("logical replication parallel apply worker exited because of subscription information change")));
> >
> > Do we really need an additional message in case of 'X'? When we call
> > apply_worker_clean_exit with on_subinfo_change = true, we have reported the
> > error message such as:
> >
> > ereport(LOG,
> >         (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
> >                 MySubscription->name)));
> >
> > I think that reporting a similar message from the leader might not be
> > meaningful for users.
>
> The intention is to let the leader report a more meaningful message if a worker
> exited due to a subinfo change. Otherwise, the leader is likely to report an
> error like "lost connection ... to parallel apply worker" when trying to send
> data via shared memory if the worker exited. What do you think?

Agreed. But do we need to have the leader exit with an error in spite
of the fact that the worker cleanly exits? If the leader exits with an
error, the subscription will be disabled if disable_on_error is true,
right?

And what do you think about the error code?

>
> > ---
> > -                if (options->proto.logical.streaming &&
> > -                        PQserverVersion(conn->streamConn) >= 140000)
> > -                        appendStringInfoString(&cmd, ", streaming 'on'");
> > +                if (options->proto.logical.streaming_str)
> > +                        appendStringInfo(&cmd, ", streaming '%s'",
> > +                                         options->proto.logical.streaming_str);
> >
> > and
> >
> > +        /*
> > +         * Assign the appropriate option value for streaming option according to
> > +         * the 'streaming' mode and the publisher's ability to support that mode.
> > +         */
> > +        if (server_version >= 160000 &&
> > +                MySubscription->stream == SUBSTREAM_PARALLEL)
> > +        {
> > +                options.proto.logical.streaming_str = pstrdup("parallel");
> > +                MyLogicalRepWorker->parallel_apply = true;
> > +        }
> > +        else if (server_version >= 140000 &&
> > +                         MySubscription->stream != SUBSTREAM_OFF)
> > +        {
> > +                options.proto.logical.streaming_str = pstrdup("on");
> > +                MyLogicalRepWorker->parallel_apply = false;
> > +        }
> > +        else
> > +        {
> > +                options.proto.logical.streaming_str = NULL;
> > +                MyLogicalRepWorker->parallel_apply = false;
> > +        }
> >
> > This change moves the code that adjusts the streaming option based on
> > the publisher server version from libpqwalreceiver.c to worker.c.
> > On the other hand, similar logic for other parameters such as "two_phase"
> > and "origin" is still done in libpqwalreceiver.c. How about passing
> > MySubscription->stream via WalRcvStreamOptions and constructing a
> > streaming option string in libpqrcv_startstreaming()?
> > In ApplyWorkerMain(), we just need to set
> > MyLogicalRepWorker->parallel_apply = true if (server_version >= 160000
> > && MySubscription->stream == SUBSTREAM_PARALLEL). We won't need
> > pstrdup for "parallel" and "on", and it's more consistent with other parameters.
>
> Thanks for the suggestion. I thought about the same idea before, but it seems
> we would need to introduce "pg_subscription.h" into libpqwalreceiver.c. The
> libpqwalreceiver.c looks like a common place, so I am not sure whether it is
> better to expose the details of the streaming option to it.

Right. It means that all enum parameters of WalRcvStreamOptions need
to be handled in the caller (e.g. worker.c), whereas other
parameters are handled in libpqwalreceiver.c. It's not elegant, but I
have no better idea for that.
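
To illustrate the split discussed above, here is a rough standalone
sketch, not the patch itself. The caller decides the option value from
the publisher version and the subscription's streaming mode, and the
common code only formats whatever string it is given. StreamMode,
StreamChoice, choose_streaming() and append_streaming_option() are
hypothetical names made up for this example; the version thresholds
(140000 and 160000) are taken from the quoted hunk.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical stand-in for the subscription's streaming modes. */
typedef enum
{
	STREAM_OFF,
	STREAM_ON,
	STREAM_PARALLEL
} StreamMode;

/* Hypothetical result of the caller-side decision. */
typedef struct
{
	const char *streaming_str;	/* NULL means: do not send the option */
	int			parallel_apply; /* 1 if parallel apply is to be used */
} StreamChoice;

/*
 * Caller side (worker.c in the discussion): pick the option value that
 * the publisher can support for the requested mode.
 */
StreamChoice
choose_streaming(int server_version, StreamMode mode)
{
	StreamChoice choice = {NULL, 0};

	if (server_version >= 160000 && mode == STREAM_PARALLEL)
	{
		choice.streaming_str = "parallel";
		choice.parallel_apply = 1;
	}
	else if (server_version >= 140000 && mode != STREAM_OFF)
		choice.streaming_str = "on";

	return choice;
}

/*
 * Common side (libpqwalreceiver.c in the discussion): format the option
 * without knowing anything about the enum. Returns the number of
 * characters snprintf() would write, or 0 if the option is omitted.
 */
int
append_streaming_option(char *buf, size_t buflen, const char *streaming_str)
{
	assert(buf != NULL && buflen > 0);

	if (streaming_str == NULL)
	{
		buf[0] = '\0';
		return 0;
	}
	return snprintf(buf, buflen, ", streaming '%s'", streaming_str);
}
```

With this shape, the common code has no need for pg_subscription.h, at
the cost of the caller owning the version check for this one parameter.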

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com


