Re: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From Peter Smith
Subject Re: Perform streaming logical transactions by background workers and parallel apply
Date
Msg-id CAHut+Pt40V32xwDZ6qpuh6SOuk8UOxyAFfL70bgJVv5VC66QLA@mail.gmail.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Amit Kapila <amit.kapila16@gmail.com>)
Responses Re: Perform streaming logical transactions by background workers and parallel apply
List pgsql-hackers
On Thu, Aug 18, 2022 at 6:57 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Thu, Aug 18, 2022 at 11:59 AM Peter Smith <smithpb2250@gmail.com> wrote:
> >
> > Here are my review comments for patch v21-0001:
> >
> > 4. Commit message
> >
> > In addition, the patch extends the logical replication STREAM_ABORT message so
> > that abort_time and abort_lsn can also be sent which can be used to update the
> > replication origin in apply background worker when the streaming transaction is
> > aborted.
> >
> > 4a.
> > Should this para also mention something about the introduction of
> > protocol version 4?
> >
> > 4b.
> > Should this para also mention that these extensions are not strictly
> > mandatory for the parallel streaming to still work?
> >
>
> Without parallel streaming/apply, we don't need to send this extra
> message. So, I don't think it will be correct to say that.

See my reply to 47a below.

>
> >
> > 46. src/backend/replication/logical/worker.c - apply_error_callback
> >
> > + if (errarg->remote_attnum < 0)
> > + {
> > + if (XLogRecPtrIsInvalid(errarg->finish_lsn))
> > + errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
> > +    errarg->origin_name,
> > +    logicalrep_message_type(errarg->command),
> > +    errarg->rel->remoterel.nspname,
> > +    errarg->rel->remoterel.relname,
> > +    errarg->remote_xid);
> > + else
> > + errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u finished at %X/%X",
> > +    errarg->origin_name,
> > +    logicalrep_message_type(errarg->command),
> > +    errarg->rel->remoterel.nspname,
> > +    errarg->rel->remoterel.relname,
> > +    errarg->remote_xid,
> > +    LSN_FORMAT_ARGS(errarg->finish_lsn));
> > + }
> > + else
> > + {
> > + if (XLogRecPtrIsInvalid(errarg->finish_lsn))
> > + errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
> > +    errarg->origin_name,
> > +    logicalrep_message_type(errarg->command),
> > +    errarg->rel->remoterel.nspname,
> > +    errarg->rel->remoterel.relname,
> > +    errarg->rel->remoterel.attnames[errarg->remote_attnum],
> > +    errarg->remote_xid);
> > + else
> > + errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X",
> > +    errarg->origin_name,
> > +    logicalrep_message_type(errarg->command),
> > +    errarg->rel->remoterel.nspname,
> > +    errarg->rel->remoterel.relname,
> > +    errarg->rel->remoterel.attnames[errarg->remote_attnum],
> > +    errarg->remote_xid,
> > +    LSN_FORMAT_ARGS(errarg->finish_lsn));
> > + }
> > + }
> >
> > Hou-san had asked [3](comment #14) me how the above code can be
> > shortened. Below is one idea, but maybe you won't like it ;-)
> >
> > #define MSG_O_T_S_R "processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" "
> > #define O_T_S_R\
> > errarg->origin_name,\
> > logicalrep_message_type(errarg->command),\
> > errarg->rel->remoterel.nspname,\
> > errarg->rel->remoterel.relname
> >
> > if (errarg->remote_attnum < 0)
> > {
> > if (XLogRecPtrIsInvalid(errarg->finish_lsn))
> > errcontext(MSG_O_T_S_R "in transaction %u",
> >    O_T_S_R,
> >    errarg->remote_xid);
> > else
> > errcontext(MSG_O_T_S_R "in transaction %u finished at %X/%X",
> >    O_T_S_R,
> >    errarg->remote_xid,
> >    LSN_FORMAT_ARGS(errarg->finish_lsn));
> > }
> > else
> > {
> > if (XLogRecPtrIsInvalid(errarg->finish_lsn))
> > errcontext(MSG_O_T_S_R "column \"%s\" in transaction %u",
> >    O_T_S_R,
> >    errarg->rel->remoterel.attnames[errarg->remote_attnum],
> >    errarg->remote_xid);
> > else
> > errcontext(MSG_O_T_S_R "column \"%s\" in transaction %u finished at %X/%X",
> >    O_T_S_R,
> >    errarg->rel->remoterel.attnames[errarg->remote_attnum],
> >    errarg->remote_xid,
> >    LSN_FORMAT_ARGS(errarg->finish_lsn));
> > }
> > #undef O_T_S_R
> > #undef MSG_O_T_S_R
> >
> > ======
> >
>
> I don't like this much. I think this reduces readability.

I agree. That wasn't a very serious suggestion :-)

>
> > 47. src/include/replication/logicalproto.h
> >
> > @@ -32,12 +32,17 @@
> >   *
> >   * LOGICALREP_PROTO_TWOPHASE_VERSION_NUM is the minimum protocol version with
> >   * support for two-phase commit decoding (at prepare time). Introduced in PG15.
> > + *
> > + * LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM is the minimum protocol version
> > + * with support for streaming large transactions using apply background
> > + * workers. Introduced in PG16.
> >   */
> >  #define LOGICALREP_PROTO_MIN_VERSION_NUM 1
> >  #define LOGICALREP_PROTO_VERSION_NUM 1
> >  #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
> >  #define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3
> > -#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
> > +#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM 4
> > +#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
> >
> > 47a.
> > I don't think that comment is strictly true. IIUC the new protocol
> > version 4 is currently only affecting the *extra* STREAM_ABORT members
> > – but in fact streaming=parallel is still functional without using
> > those extra members, isn't it? So maybe this description needed to be
> > modified a bit to be more accurate?
> >
>
> The reason for sending this extra abort members is to ensure that
> after aborting the transaction, if the subscriber/apply worker
> restarts, it doesn't need to request the transaction again. Do you
> have suggestions for improving this comment?
>

I gave three review comments for v21-0001 that were all related to
this same point:
i- #4b (commit message)
ii- #7 (protocol pgdocs)
iii- #47a (code comment)

The point was:
AFAIK protocol 4 only lets the parallel streaming logic behave
*better* when it handles restarts after aborts. That does not make
protocol 4 a *prerequisite* for streaming=parallel to work in the
first place. I thought that a PG15 publisher and a PG16 subscriber can
still use streaming=parallel with protocol 3; it just won't be quite
as clever at handling restarts after an abort as protocol 4
(PG16 -> PG16) would be.
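
To make the distinction concrete, here is a rough standalone sketch of
how I picture it. This is not code from the patch: the two helper
functions are hypothetical names I made up for illustration, and the
whole thing assumes my understanding above is correct. Only the version
constants are taken from the logicalproto.h hunk quoted in #47.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Constants as they appear in the quoted logicalproto.h hunk. */
#define LOGICALREP_PROTO_MIN_VERSION_NUM 1
#define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM 4

/*
 * Hypothetical helper (not in the patch): can the subscriber apply
 * streamed transactions in parallel at all?  ASSUMPTION: this only
 * needs the publisher to stream in-progress transactions, i.e.
 * protocol version 2 or later.
 */
static bool
parallel_apply_possible(uint32_t proto_version)
{
    assert(proto_version >= LOGICALREP_PROTO_MIN_VERSION_NUM);
    return proto_version >= LOGICALREP_PROTO_STREAM_VERSION_NUM;
}

/*
 * Hypothetical helper (not in the patch): does STREAM_ABORT carry the
 * extra abort_lsn and abort_time members?  Per the commit message,
 * these let the apply background worker update the replication origin
 * on abort, which is what improves restart handling.
 */
static bool
stream_abort_has_extra_fields(uint32_t proto_version)
{
    assert(proto_version >= LOGICALREP_PROTO_MIN_VERSION_NUM);
    return proto_version >= LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM;
}
```

Under that assumption, a PG15 -> PG16 pair negotiating protocol 3 gets
true from the first helper and false from the second, which is exactly
the "works, but less clever after an abort" case I am describing.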

If the above is correct, then the code comment can be changed to
something like this:

BEFORE
LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM is the minimum protocol
version with support for streaming large transactions using apply
background workers. Introduced in PG16.

AFTER
LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM improves how subscription
parameter streaming=parallel (introduced in PG16) will handle restarts
after aborts. Introduced in PG16.

~

The protocol pgdocs might be changed similarly...

BEFORE
Version <literal>4</literal> is supported only for server version 16
and above, and it allows applying stream of large in-progress
transactions in parallel.

AFTER
Version <literal>4</literal> is supported only for server version 16
and above, and it improves how subscription parameter
streaming=parallel (introduced in PG16) will handle restarts after
aborts.

~~

And similar text again for the commit message...

------
Kind Regards,
Peter Smith.
Fujitsu Australia.


