From: Peter Smith
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Msg-id: CAHut+PuVjRgGr4saN7qwq0oB8DANHVR7UfDiciB1Q3cYN54F6A@mail.gmail.com
In response to: RE: Perform streaming logical transactions by background workers and parallel apply ("wangw.fnst@fujitsu.com" <wangw.fnst@fujitsu.com>)
Here are some review comments for patch v30-0001.

======

1. Commit message

In addition, the patch extends the logical replication STREAM_ABORT message so
that abort_time and abort_lsn can also be sent which can be used to update the
replication origin in parallel apply worker when the streaming transaction is
aborted. Because this message extension is needed to support parallel
streaming, meaning that parallel streaming is not supported for publications on
servers < PG16.

"meaning that parallel streaming is not supported" -> "parallel
streaming is not supported"

======

2. doc/src/sgml/logical-replication.sgml

@@ -1611,8 +1622,12 @@ CONTEXT:  processing remote data for
replication origin "pg_16395" during "INSER
    to the subscriber, plus some reserve for table synchronization.
    <varname>max_logical_replication_workers</varname> must be set to at least
    the number of subscriptions, again plus some reserve for the table
-   synchronization.  Additionally the <varname>max_worker_processes</varname>
-   may need to be adjusted to accommodate for replication workers, at least
+   synchronization. In addition, if the subscription parameter
+   <literal>streaming</literal> is set to <literal>parallel</literal>, please
+   increase <literal>max_logical_replication_workers</literal> according to
+   the desired number of parallel apply workers.  Additionally the
+   <varname>max_worker_processes</varname> may need to be adjusted to
+   accommodate for replication workers, at least
    (<varname>max_logical_replication_workers</varname>
    + <literal>1</literal>).  Note that some extensions and parallel queries
    also take worker slots from <varname>max_worker_processes</varname>.

IMO it looks a bit strange to have "In addition" followed by "Additionally".

Also, "to accommodate for replication workers"? seems like a typo (but
it is not caused by your patch)

BEFORE
In addition, if the subscription parameter streaming is set to
parallel, please increase max_logical_replication_workers according to
the desired number of parallel apply workers.

AFTER (???)
If the subscription parameter streaming is set to parallel,
max_logical_replication_workers should be increased according to the
desired number of parallel apply workers.
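
Expressed with the surrounding SGML markup, that might be something
like:

   If the subscription parameter <literal>streaming</literal> is set to
   <literal>parallel</literal>,
   <varname>max_logical_replication_workers</varname> should be
   increased according to the desired number of parallel apply workers.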

======

3. .../replication/logical/applyparallelworker.c - parallel_apply_can_start

+/*
+ * Returns true, if it is allowed to start a parallel apply worker, false,
+ * otherwise.
+ */
+static bool
+parallel_apply_can_start(TransactionId xid)

Seems a slightly complicated comment for a simple boolean function.

SUGGESTION
Returns true/false if it is OK to start a parallel apply worker.
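
i.e.

/*
 * Returns true/false if it is OK to start a parallel apply worker.
 */
static bool
parallel_apply_can_start(TransactionId xid)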

======

4. .../replication/logical/applyparallelworker.c - parallel_apply_free_worker

+ winfo->in_use = false;
+
+ /* Are there enough workers in the pool? */
+ if (napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
+ {

I felt the comment/logic about "enough" needed a bit more description.
At the least, it should refer the reader to the more detailed
explanation atop worker.c.
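
For example (the wording is just a sketch, to be adapted to the
worker.c explanation):

/*
 * We don't retain more than half of
 * max_parallel_apply_workers_per_subscription workers in the pool. See
 * the more detailed explanation atop worker.c.
 */
if (napplyworkers > (max_parallel_apply_workers_per_subscription / 2))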

======

5. .../replication/logical/applyparallelworker.c - parallel_apply_setup_dsm

+ /*
+ * Estimate how much shared memory we need.
+ *
+ * Because the TOC machinery may choose to insert padding of oddly-sized
+ * requests, we must estimate each chunk separately.
+ *
+ * We need one key to register the location of the header, and we need two
+ * other keys to track of the locations of the message queue and the error
+ * message queue.
+ */

"track of" -> "keep track of" ?

======

6. src/backend/replication/logical/launcher.c - logicalrep_worker_detach

 logicalrep_worker_detach(void)
 {
+ /* Stop the parallel apply workers. */
+ if (!am_parallel_apply_worker() && !am_tablesync_worker())
+ {
+ List    *workers;
+ ListCell   *lc;

The condition is not very obvious. This is why I previously suggested
adding another macro/function like 'isLeaderApplyWorker'. In the
absence of that, I think the comment needs to be more descriptive.

SUGGESTION
If this is the leader apply worker then stop the parallel apply workers.
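
FWIW, the helper previously suggested could be as simple as this (just
a sketch):

static inline bool
isLeaderApplyWorker(void)
{
	return !am_parallel_apply_worker() && !am_tablesync_worker();
}

Then the condition becomes self-explanatory: if (isLeaderApplyWorker())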

======

7. src/backend/replication/logical/proto.c - logicalrep_write_stream_abort

 void
 logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
-   TransactionId subxid)
+   TransactionId subxid, XLogRecPtr abort_lsn,
+   TimestampTz abort_time, bool abort_info)
 {
  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);

@@ -1175,19 +1179,40 @@ logicalrep_write_stream_abort(StringInfo out,
TransactionId xid,
  /* transaction ID */
  pq_sendint32(out, xid);
  pq_sendint32(out, subxid);
+
+ if (abort_info)
+ {
+ pq_sendint64(out, abort_lsn);
+ pq_sendint64(out, abort_time);
+ }


The new param name 'abort_info' seems misleading.

Maybe a name like 'write_abort_info' is better?
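
i.e. the signature would then become:

void
logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
                              TransactionId subxid, XLogRecPtr abort_lsn,
                              TimestampTz abort_time, bool write_abort_info)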

~~~

8. src/backend/replication/logical/proto.c - logicalrep_read_stream_abort

+logicalrep_read_stream_abort(StringInfo in,
+ LogicalRepStreamAbortData *abort_data,
+ bool read_abort_lsn)
 {
- Assert(xid && subxid);
+ Assert(abort_data);
+
+ abort_data->xid = pq_getmsgint(in, 4);
+ abort_data->subxid = pq_getmsgint(in, 4);

- *xid = pq_getmsgint(in, 4);
- *subxid = pq_getmsgint(in, 4);
+ if (read_abort_lsn)
+ {
+ abort_data->abort_lsn = pq_getmsgint64(in);
+ abort_data->abort_time = pq_getmsgint64(in);
+ }

This name 'read_abort_lsn' is inconsistent with the 'abort_info'
parameter of logicalrep_write_stream_abort.

I suggest changing these to 'read_abort_info'/'write_abort_info'.
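
i.e.

logicalrep_read_stream_abort(StringInfo in,
                             LogicalRepStreamAbortData *abort_data,
                             bool read_abort_info)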

======

9. src/backend/replication/logical/worker.c - file header comment

+ * information is added to the ParallelApplyWorkersList. Once the worker
+ * finishes applying the transaction, we mark it available for use. Now,
+ * before starting a new worker to apply the streaming transaction, we check
+ * the list and use any worker, if available. Note that we maintain a maximum

9a.
"available for use." -> "available for re-use."

~

9b.
"we check the list and use any worker, if available" -> "we check the
list for any available worker"

~~~

10. src/backend/replication/logical/worker.c - handle_streamed_transaction

+ /* write the change to the current file */
+ stream_write_change(action, s);
+ return true;

Uppercase the comment.

~~~

11. src/backend/replication/logical/worker.c - apply_handle_stream_abort

+static void
+apply_handle_stream_abort(StringInfo s)
+{
+ TransactionId xid;
+ TransactionId subxid;
+ LogicalRepStreamAbortData abort_data;
+ bool read_abort_lsn = false;
+ ParallelApplyWorkerInfo *winfo = NULL;
+ TransApplyAction apply_action;

The variable name 'read_abort_lsn' ought to be changed to
'read_abort_info' (as suggested above) to be consistent with the
parameter name.

======

12. src/backend/replication/pgoutput/pgoutput.c - pgoutput_stream_abort

@@ -1843,6 +1850,8 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
    XLogRecPtr abort_lsn)
 {
  ReorderBufferTXN *toptxn;
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ bool abort_info = (data->streaming == SUBSTREAM_PARALLEL);

The variable name 'abort_info' ought to be changed to
'write_abort_info' (as suggested above) to be consistent with the
parameter name.

======

13. src/include/replication/worker_internal.h

+ /*
+ * Indicates whether the worker is available to be used for parallel apply
+ * transaction?
+ */
+ bool in_use;

This comment seems backward for this member's name.

SUGGESTION (something like...)
Indicates whether this ParallelApplyWorkerInfo is currently being used
by a parallel apply worker processing a transaction. (If this flag is
false then it means the ParallelApplyWorkerInfo is available for
re-use by another parallel apply worker.)
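
Rendered as the struct comment, that might look like:

/*
 * Indicates whether the ParallelApplyWorkerInfo is currently being used
 * by a parallel apply worker processing a transaction. (If false, this
 * ParallelApplyWorkerInfo is available for re-use by another parallel
 * apply worker.)
 */
bool		in_use;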


------
Kind Regards,
Peter Smith.
Fujitsu Australia


