Re: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From Amit Kapila
Subject Re: Perform streaming logical transactions by background workers and parallel apply
Msg-id CAA4eK1KpuQAk_fiqVXy16WkDrKPBwA9E61VpvLfkse-o31NNVA@mail.gmail.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Peter Smith <smithpb2250@gmail.com>)
Responses Re: Perform streaming logical transactions by background workers and parallel apply
List pgsql-hackers
On Thu, Aug 18, 2022 at 11:59 AM Peter Smith <smithpb2250@gmail.com> wrote:
>
> Here are my review comments for patch v21-0001:
>
> 4. Commit message
>
> In addition, the patch extends the logical replication STREAM_ABORT message so
> that abort_time and abort_lsn can also be sent which can be used to update the
> replication origin in apply background worker when the streaming transaction is
> aborted.
>
> 4a.
> Should this para also mention something about the introduction of
> protocol version 4?
>
> 4b.
> Should this para also mention that these extensions are not strictly
> mandatory for the parallel streaming to still work?
>

Without parallel streaming/apply, we don't need to send these extra
fields, so I don't think it would be correct to say that.
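
As a rough illustration (a sketch, not the patch's code), the publisher only needs to append the extra abort_lsn and abort_time fields when the subscriber has negotiated protocol version 4 and parallel apply is in use. Otherwise the message keeps its older layout. MsgBuf, buf_put_uint, write_stream_abort and PROTO_STREAM_PARALLEL_VERSION_NUM are hypothetical names. The byte layout (type byte 'A', two Int32 xids, then two Int64 fields) is assumed from the STREAM_ABORT description in the PostgreSQL protocol documentation rather than taken from v21-0001.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM. */
#define PROTO_STREAM_PARALLEL_VERSION_NUM 4

/* Hypothetical fixed-size output buffer (PostgreSQL itself uses StringInfo). */
typedef struct
{
    uint8_t data[64];
    size_t  len;
} MsgBuf;

/* Append the low nbytes of val in network byte order (big-endian). */
static void
buf_put_uint(MsgBuf *buf, uint64_t val, int nbytes)
{
    assert(buf->len + (size_t) nbytes <= sizeof(buf->data));
    for (int i = nbytes - 1; i >= 0; i--)
        buf->data[buf->len++] = (uint8_t) (val >> (8 * i));
}

/* Sketch: build a STREAM_ABORT message, adding abort info only when needed. */
static void
write_stream_abort(MsgBuf *buf, int proto_version, bool parallel_apply,
                   uint32_t xid, uint32_t subxid,
                   uint64_t abort_lsn, int64_t abort_time)
{
    buf->len = 0;
    buf->data[buf->len++] = 'A';        /* STREAM_ABORT message type */
    buf_put_uint(buf, xid, 4);          /* toplevel transaction xid */
    buf_put_uint(buf, subxid, 4);       /* aborted (sub)transaction xid */

    /* Extra fields: only useful to a parallel apply worker. */
    if (proto_version >= PROTO_STREAM_PARALLEL_VERSION_NUM && parallel_apply)
    {
        buf_put_uint(buf, abort_lsn, 8);
        buf_put_uint(buf, (uint64_t) abort_time, 8);
    }
}
```

With protocol version 2 or 3, or without parallel apply, the message stays 9 bytes; with both, it grows to 25 bytes.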

>
> 46. src/backend/replication/logical/worker.c - apply_error_callback
>
> + if (errarg->remote_attnum < 0)
> + {
> + if (XLogRecPtrIsInvalid(errarg->finish_lsn))
> + errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
> +    errarg->origin_name,
> +    logicalrep_message_type(errarg->command),
> +    errarg->rel->remoterel.nspname,
> +    errarg->rel->remoterel.relname,
> +    errarg->remote_xid);
> + else
> + errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u finished at %X/%X",
> +    errarg->origin_name,
> +    logicalrep_message_type(errarg->command),
> +    errarg->rel->remoterel.nspname,
> +    errarg->rel->remoterel.relname,
> +    errarg->remote_xid,
> +    LSN_FORMAT_ARGS(errarg->finish_lsn));
> + }
> + else
> + {
> + if (XLogRecPtrIsInvalid(errarg->finish_lsn))
> + errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
> +    errarg->origin_name,
> +    logicalrep_message_type(errarg->command),
> +    errarg->rel->remoterel.nspname,
> +    errarg->rel->remoterel.relname,
> +    errarg->rel->remoterel.attnames[errarg->remote_attnum],
> +    errarg->remote_xid);
> + else
> + errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X",
> +    errarg->origin_name,
> +    logicalrep_message_type(errarg->command),
> +    errarg->rel->remoterel.nspname,
> +    errarg->rel->remoterel.relname,
> +    errarg->rel->remoterel.attnames[errarg->remote_attnum],
> +    errarg->remote_xid,
> +    LSN_FORMAT_ARGS(errarg->finish_lsn));
> + }
> + }
>
> Hou-san had asked me [3] (comment #14) how the above code could be
> shortened. Below is one idea, but maybe you won't like it ;-)
>
> #define MSG_O_T_S_R "processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" "
> #define O_T_S_R\
> errarg->origin_name,\
> logicalrep_message_type(errarg->command),\
> errarg->rel->remoterel.nspname,\
> errarg->rel->remoterel.relname
>
> if (errarg->remote_attnum < 0)
> {
> if (XLogRecPtrIsInvalid(errarg->finish_lsn))
> errcontext(MSG_O_T_S_R "in transaction %u",
>    O_T_S_R,
>    errarg->remote_xid);
> else
> errcontext(MSG_O_T_S_R "in transaction %u finished at %X/%X",
>    O_T_S_R,
>    errarg->remote_xid,
>    LSN_FORMAT_ARGS(errarg->finish_lsn));
> }
> else
> {
> if (XLogRecPtrIsInvalid(errarg->finish_lsn))
> errcontext(MSG_O_T_S_R "column \"%s\" in transaction %u",
>    O_T_S_R,
>    errarg->rel->remoterel.attnames[errarg->remote_attnum],
>    errarg->remote_xid);
> else
> errcontext(MSG_O_T_S_R "column \"%s\" in transaction %u finished at %X/%X",
>    O_T_S_R,
>    errarg->rel->remoterel.attnames[errarg->remote_attnum],
>    errarg->remote_xid,
>    LSN_FORMAT_ARGS(errarg->finish_lsn));
> }
> #undef O_T_S_R
> #undef MSG_O_T_S_R
>
> ======
>

I don't like this much. I think this reduces readability.

> 47. src/include/replication/logicalproto.h
>
> @@ -32,12 +32,17 @@
>   *
>   * LOGICALREP_PROTO_TWOPHASE_VERSION_NUM is the minimum protocol version with
>   * support for two-phase commit decoding (at prepare time). Introduced in PG15.
> + *
> + * LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM is the minimum protocol version
> + * with support for streaming large transactions using apply background
> + * workers. Introduced in PG16.
>   */
>  #define LOGICALREP_PROTO_MIN_VERSION_NUM 1
>  #define LOGICALREP_PROTO_VERSION_NUM 1
>  #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
>  #define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3
> -#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
> +#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM 4
> +#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
>
> 47a.
> I don't think that comment is strictly true. IIUC the new protocol
> version 4 currently only affects the *extra* STREAM_ABORT members,
> but in fact streaming=parallel is still functional without using
> those extra members, isn't it? So maybe this description needs to be
> modified a bit to be more accurate?
>

The reason for sending these extra abort members is to ensure that,
if the subscriber/apply worker restarts after the transaction has been
aborted, it doesn't need to request the transaction again. Do you
have suggestions for improving this comment?
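
A simplified model of that reasoning, in C: if the apply worker advances its replication origin to abort_lsn when it handles the abort, a restart resumes after that point and the aborted transaction need not be requested again. OriginProgress, handle_stream_abort and needs_resend_after_restart are hypothetical names, and the real replication origin machinery tracks more state than this sketch does.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t Lsn;           /* hypothetical stand-in for XLogRecPtr */

/* Hypothetical model of the subscriber's replication origin progress. */
typedef struct
{
    Lsn remote_lsn;             /* last remote LSN known to be processed */
} OriginProgress;

/*
 * On STREAM_ABORT: if the message carried abort_lsn, record progress past
 * the aborted transaction. Without it, the origin cannot be advanced.
 */
static void
handle_stream_abort(OriginProgress *origin, Lsn abort_lsn, bool has_abort_lsn)
{
    if (has_abort_lsn && abort_lsn > origin->remote_lsn)
        origin->remote_lsn = abort_lsn;
}

/*
 * In this simplified model, after a restart the subscriber resumes from
 * remote_lsn, so a transaction ending at or before it is not sent again.
 */
static bool
needs_resend_after_restart(const OriginProgress *origin, Lsn txn_end_lsn)
{
    return txn_end_lsn > origin->remote_lsn;
}
```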

>
> 52.
>
> +/* Apply background worker setup and interactions */
> +extern ApplyBgworkerInfo *apply_bgworker_start(TransactionId xid);
> +extern ApplyBgworkerInfo *apply_bgworker_find(TransactionId xid);
> +extern void apply_bgworker_wait_for(ApplyBgworkerInfo *wstate,
> + ApplyBgworkerStatus wait_for_status);
> +extern void apply_bgworker_send_data(ApplyBgworkerInfo *wstate, Size nbytes,
> + const void *data);
> +extern void apply_bgworker_free(ApplyBgworkerInfo *wstate);
> +extern void apply_bgworker_check_status(void);
> +extern void apply_bgworker_set_status(ApplyBgworkerStatus status);
> +extern void apply_bgworker_subxact_info_add(TransactionId current_xid);
> +extern void apply_bgworker_savepoint_name(Oid suboid, Oid relid,
> +   char *spname, int szsp);
>
> This big block of similarly named externs might as well be in
> alphabetical order instead of an apparently random one.
>

I think it is better to group them by related functionality (if they
are not already grouped that way) rather than order them alphabetically.

--
With Regards,
Amit Kapila.


