RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From wangw.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Date
Msg-id OS3PR01MB627567CF4B96A23DAFAF77269E4E9@OS3PR01MB6275.jpnprd01.prod.outlook.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Peter Smith <smithpb2250@gmail.com>)
List pgsql-hackers
On Wed, Sep 21, 2022 at 17:25 Peter Smith <smithpb2250@gmail.com> wrote:
> Here are some review comments for patch v30-0001.

Thanks for your comments.

> ======
> 
> 1. Commit message
> 
> In addition, the patch extends the logical replication STREAM_ABORT message
> so
> that abort_time and abort_lsn can also be sent which can be used to update the
> replication origin in parallel apply worker when the streaming transaction is
> aborted. Because this message extension is needed to support parallel
> streaming, meaning that parallel streaming is not supported for publications on
> servers < PG16.
> 
> "meaning that parallel streaming is not supported" -> "parallel
> streaming is not supported"

Improved as suggested.
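
To make the message extension easier to follow, here is a rough sketch of the
per-abort data carried by the extended STREAM_ABORT message. The field names
follow the patch; the exact struct layout and ordering may differ.

typedef struct LogicalRepStreamAbortData
{
    TransactionId xid;          /* top-level transaction being aborted */
    TransactionId subxid;       /* aborted subtransaction (may equal xid) */
    XLogRecPtr    abort_lsn;    /* LSN of the abort record (new in PG16) */
    TimestampTz   abort_time;   /* abort timestamp (new in PG16) */
} LogicalRepStreamAbortData;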

> ======
> 
> 2. doc/src/sgml/logical-replication.sgml
> 
> @@ -1611,8 +1622,12 @@ CONTEXT:  processing remote data for
> replication origin "pg_16395" during "INSER
>     to the subscriber, plus some reserve for table synchronization.
>     <varname>max_logical_replication_workers</varname> must be set to at
> least
>     the number of subscriptions, again plus some reserve for the table
> -   synchronization.  Additionally the
> <varname>max_worker_processes</varname>
> -   may need to be adjusted to accommodate for replication workers, at least
> +   synchronization. In addition, if the subscription parameter
> +   <literal>streaming</literal> is set to <literal>parallel</literal>, please
> +   increase <literal>max_logical_replication_workers</literal> according to
> +   the desired number of parallel apply workers.  Additionally the
> +   <varname>max_worker_processes</varname> may need to be adjusted to
> +   accommodate for replication workers, at least
>     (<varname>max_logical_replication_workers</varname>
>     + <literal>1</literal>).  Note that some extensions and parallel queries
>     also take worker slots from <varname>max_worker_processes</varname>.
> 
> IMO it looks a bit strange to have "In addition" followed by "Additionally".
> 
> Also, "to accommodate for replication workers"? seems like a typo (but
> it is not caused by your patch)
> 
> BEFORE
> In addition, if the subscription parameter streaming is set to
> parallel, please increase max_logical_replication_workers according to
> the desired number of parallel apply workers.
> 
> AFTER (???)
> If the subscription parameter streaming is set to parallel,
> max_logical_replication_workers should be increased according to the
> desired number of parallel apply workers.

=> Reword
Improved as suggested.

=> typo?
Sorry, I am not sure what you mean. Do you mean
s/replication workers/workers for subscriptions/, or something else?
Since this wording is not introduced by this patch, I think we should improve
it in a separate thread.
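
Just to illustrate the reworded sizing advice, here is a hypothetical
postgresql.conf fragment. The numbers are examples only, not taken from the
patch or the documentation:

# One subscription with streaming = parallel, expecting up to two
# parallel apply workers, plus some reserve for table synchronization.
max_logical_replication_workers = 6
# At least max_logical_replication_workers + 1; note that extensions and
# parallel queries also take worker slots from this.
max_worker_processes = 8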

> ======
> 
> 4. .../replication/logical/applyparallelworker.c - parallel_apply_free_worker
> 
> + winfo->in_use = false;
> +
> + /* Are there enough workers in the pool? */
> + if (napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
> + {
> 
> I felt the comment/logic about "enough" needs a bit more description.
> At least it should say to refer to the more detailed explanation atop
> worker.c

Added a related comment atop this function.
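
For reference, the idea behind the check is roughly as follows. This is only
a sketch; parallel_apply_stop_worker is a hypothetical helper name here, not
necessarily what the patch uses.

    winfo->in_use = false;

    /*
     * Keep at most half of max_parallel_apply_workers_per_subscription
     * workers cached for re-use; if the pool already holds more than that,
     * stop this worker instead of returning it to the pool.  See the
     * detailed explanation atop worker.c.
     */
    if (napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
        parallel_apply_stop_worker(winfo);    /* hypothetical helper */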

> ======
> 
> 5. .../replication/logical/applyparallelworker.c - parallel_apply_setup_dsm
> 
> + /*
> + * Estimate how much shared memory we need.
> + *
> + * Because the TOC machinery may choose to insert padding of oddly-sized
> + * requests, we must estimate each chunk separately.
> + *
> + * We need one key to register the location of the header, and we need two
> + * other keys to track of the locations of the message queue and the error
> + * message queue.
> + */
> 
> "track of" -> "keep track of" ?

Improved.
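
For anyone reviewing the estimation logic, it follows the usual shm_toc
pattern, roughly like below. The chunk sizes and the names queue_size,
error_queue_size and ParallelApplyWorkerShared are placeholders here, not
necessarily the exact ones used in the patch.

    shm_toc_estimator e;
    Size        segsize;

    shm_toc_initialize_estimator(&e);
    shm_toc_estimate_chunk(&e, sizeof(ParallelApplyWorkerShared)); /* header */
    shm_toc_estimate_chunk(&e, queue_size);        /* message queue */
    shm_toc_estimate_chunk(&e, error_queue_size);  /* error message queue */
    shm_toc_estimate_keys(&e, 3);   /* one key per chunk above */
    segsize = shm_toc_estimate(&e);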

> ======
> 
> 6. src/backend/replication/logical/launcher.c  - logicalrep_worker_detach
> 
>  logicalrep_worker_detach(void)
>  {
> + /* Stop the parallel apply workers. */
> + if (!am_parallel_apply_worker() && !am_tablesync_worker())
> + {
> + List    *workers;
> + ListCell   *lc;
> 
> The condition is not very obvious. This is why I previously suggested
> adding another macro/function like 'isLeaderApplyWorker'. In the
> absence of that, then I think the comment needs to be more
> descriptive.
> 
> SUGGESTION
> If this is the leader apply worker then stop the parallel apply workers.

Added the new function am_leader_apply_worker.
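It is defined along these lines (a sketch; the form in the patch may differ
slightly):

static inline bool
am_leader_apply_worker(void)
{
    /*
     * A leader apply worker is neither a tablesync worker nor a parallel
     * apply worker.
     */
    return (!am_tablesync_worker() && !am_parallel_apply_worker());
}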

> ======
> 
> 7. src/backend/replication/logical/proto.c - logicalrep_read_stream_abort
> 
>  void
>  logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
> -   TransactionId subxid)
> +   TransactionId subxid, XLogRecPtr abort_lsn,
> +   TimestampTz abort_time, bool abort_info)
>  {
>   pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
> 
> @@ -1175,19 +1179,40 @@ logicalrep_write_stream_abort(StringInfo out,
> TransactionId xid,
>   /* transaction ID */
>   pq_sendint32(out, xid);
>   pq_sendint32(out, subxid);
> +
> + if (abort_info)
> + {
> + pq_sendint64(out, abort_lsn);
> + pq_sendint64(out, abort_time);
> + }
> 
> 
> The new param name 'abort_info' seems misleading.
> 
> Maybe a name like 'write_abort_info' is better?

Improved as suggested.

> ~~~
> 
> 8. src/backend/replication/logical/proto.c - logicalrep_read_stream_abort
> 
> +logicalrep_read_stream_abort(StringInfo in,
> + LogicalRepStreamAbortData *abort_data,
> + bool read_abort_lsn)
>  {
> - Assert(xid && subxid);
> + Assert(abort_data);
> +
> + abort_data->xid = pq_getmsgint(in, 4);
> + abort_data->subxid = pq_getmsgint(in, 4);
> 
> - *xid = pq_getmsgint(in, 4);
> - *subxid = pq_getmsgint(in, 4);
> + if (read_abort_lsn)
> + {
> + abort_data->abort_lsn = pq_getmsgint64(in);
> + abort_data->abort_time = pq_getmsgint64(in);
> + }
> 
> This name 'read_abort_lsn' is inconsistent with the 'abort_info' of
> the logicalrep_write_stream_abort.
> 
> I suggest change these to 'read_abort_info/write_abort_info'

Improved as suggested.

> ======
> 
> 9. src/backend/replication/logical/worker.c - file header comment
> 
> + * information is added to the ParallelApplyWorkersList. Once the worker
> + * finishes applying the transaction, we mark it available for use. Now,
> + * before starting a new worker to apply the streaming transaction, we check
> + * the list and use any worker, if available. Note that we maintain a maximum
> 
> 9a.
> "available for use." -> "available for re-use."
> 
> ~
> 
> 9b.
> "we check the list and use any worker, if available" -> "we check the
> list for any available worker"

Improved as suggested.
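
To restate the re-use logic described in that header comment in code form,
here is a sketch using the names from the quoted fragments; the actual lookup
in the patch may differ.

    ParallelApplyWorkerInfo *winfo = NULL;
    ListCell   *lc;

    /* Look for a free worker in the pool before starting a new one. */
    foreach(lc, ParallelApplyWorkersList)
    {
        ParallelApplyWorkerInfo *cur = (ParallelApplyWorkerInfo *) lfirst(lc);

        if (!cur->in_use)
        {
            winfo = cur;
            break;
        }
    }

    if (winfo == NULL)
    {
        /* No free worker; start a new one, subject to the maximum. */
    }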

> ~~~
> 
> 10. src/backend/replication/logical/worker.c - handle_streamed_transaction
> 
> + /* write the change to the current file */
> + stream_write_change(action, s);
> + return true;
> 
> Uppercase the comment.

Improved as suggested.

> ~~~
> 
> 11. src/backend/replication/logical/worker.c - apply_handle_stream_abort
> 
> +static void
> +apply_handle_stream_abort(StringInfo s)
> +{
> + TransactionId xid;
> + TransactionId subxid;
> + LogicalRepStreamAbortData abort_data;
> + bool read_abort_lsn = false;
> + ParallelApplyWorkerInfo *winfo = NULL;
> + TransApplyAction apply_action;
> 
> The variable 'read_abort_lsn' name ought to be changed to match
> consistently the parameter name.

Improved as suggested.

> ======
> 
> 12. src/backend/replication/pgoutput/pgoutput.c - pgoutput_stream_abort
> 
> @@ -1843,6 +1850,8 @@ pgoutput_stream_abort(struct
> LogicalDecodingContext *ctx,
>     XLogRecPtr abort_lsn)
>  {
>   ReorderBufferTXN *toptxn;
> + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
> + bool abort_info = (data->streaming == SUBSTREAM_PARALLEL);
> 
> The variable 'abort_info' name ought to be changed to be
> 'write_abort_info' (as suggested above) to match consistently the
> parameter name.

Improved as suggested.

Attached the new patch set.

Regards,
Wang wei
