RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers
From | wangw.fnst@fujitsu.com |
---|---|
Subject | RE: Perform streaming logical transactions by background workers and parallel apply |
Date | |
Msg-id | OS3PR01MB627567CF4B96A23DAFAF77269E4E9@OS3PR01MB6275.jpnprd01.prod.outlook.com |
In response to | Re: Perform streaming logical transactions by background workers and parallel apply (Peter Smith <smithpb2250@gmail.com>) |
Responses | RE: Perform streaming logical transactions by background workers and parallel apply; Re: Perform streaming logical transactions by background workers and parallel apply |
List | pgsql-hackers |
On Wed, Sep 21, 2022 at 17:25 PM Peter Smith <smithpb2250@gmail.com> wrote:
> Here are some review comments for patch v30-0001.

Thanks for your comments.

> ======
>
> 1. Commit message
>
> In addition, the patch extends the logical replication STREAM_ABORT message so
> that abort_time and abort_lsn can also be sent which can be used to update the
> replication origin in parallel apply worker when the streaming transaction is
> aborted. Because this message extension is needed to support parallel
> streaming, meaning that parallel streaming is not supported for publications on
> servers < PG16.
>
> "meaning that parallel streaming is not supported" -> "parallel
> streaming is not supported"

Improved as suggested.

> ======
>
> 2. doc/src/sgml/logical-replication.sgml
>
> @@ -1611,8 +1622,12 @@ CONTEXT: processing remote data for
> replication origin "pg_16395" during "INSER
>      to the subscriber, plus some reserve for table synchronization.
>      <varname>max_logical_replication_workers</varname> must be set to at least
>      the number of subscriptions, again plus some reserve for the table
> -    synchronization. Additionally the <varname>max_worker_processes</varname>
> -    may need to be adjusted to accommodate for replication workers, at least
> +    synchronization. In addition, if the subscription parameter
> +    <literal>streaming</literal> is set to <literal>parallel</literal>, please
> +    increase <literal>max_logical_replication_workers</literal> according to
> +    the desired number of parallel apply workers. Additionally the
> +    <varname>max_worker_processes</varname> may need to be adjusted to
> +    accommodate for replication workers, at least
>      (<varname>max_logical_replication_workers</varname>
>      + <literal>1</literal>). Note that some extensions and parallel queries
>      also take worker slots from <varname>max_worker_processes</varname>.
>
> IMO it looks a bit strange to have "In addition" followed by "Additionally".
>
> Also, "to accommodate for replication workers"? seems like a typo (but
> it is not caused by your patch)
>
> BEFORE
> In addition, if the subscription parameter streaming is set to
> parallel, please increase max_logical_replication_workers according to
> the desired number of parallel apply workers.
>
> AFTER (???)
> If the subscription parameter streaming is set to parallel,
> max_logical_replication_workers should be increased according to the
> desired number of parallel apply workers.

=> Reword
Improved as suggested.

=> typo?
Sorry, I am not sure. Do you mean
s/replication workers/workers for subscriptions/ or something else?
I think we should improve it in a new thread.

> ======
>
> 4. .../replication/logical/applyparallelworker.c - parallel_apply_free_worker
>
> +    winfo->in_use = false;
> +
> +    /* Are there enough workers in the pool? */
> +    if (napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
> +    {
>
> I felt the comment/logic about "enough" needs a bit more description.
> At least it should say to refer to the more detailed explanation atop
> worker.c

Added a related comment atop this function.

> ======
>
> 5. .../replication/logical/applyparallelworker.c - parallel_apply_setup_dsm
>
> +    /*
> +     * Estimate how much shared memory we need.
> +     *
> +     * Because the TOC machinery may choose to insert padding of oddly-sized
> +     * requests, we must estimate each chunk separately.
> +     *
> +     * We need one key to register the location of the header, and we need two
> +     * other keys to track of the locations of the message queue and the error
> +     * message queue.
> +     */
>
> "track of" -> "keep track of" ?

Improved.

> ======
>
> 6. src/backend/replication/logical/launcher.c - logicalrep_worker_detach
>
>  logicalrep_worker_detach(void)
>  {
> +    /* Stop the parallel apply workers. */
> +    if (!am_parallel_apply_worker() && !am_tablesync_worker())
> +    {
> +        List       *workers;
> +        ListCell   *lc;
>
> The condition is not very obvious. This is why I previously suggested
> adding another macro/function like 'isLeaderApplyWorker'. In the
> absence of that, then I think the comment needs to be more
> descriptive.
>
> SUGGESTION
> If this is the leader apply worker then stop the parallel apply workers.

Added the new function am_leader_apply_worker.

> ======
>
> 7. src/backend/replication/logical/proto.c - logicalrep_read_stream_abort
>
>  void
>  logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
> -                              TransactionId subxid)
> +                              TransactionId subxid, XLogRecPtr abort_lsn,
> +                              TimestampTz abort_time, bool abort_info)
>  {
>      pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
>
> @@ -1175,19 +1179,40 @@ logicalrep_write_stream_abort(StringInfo out,
> TransactionId xid,
>      /* transaction ID */
>      pq_sendint32(out, xid);
>      pq_sendint32(out, subxid);
> +
> +    if (abort_info)
> +    {
> +        pq_sendint64(out, abort_lsn);
> +        pq_sendint64(out, abort_time);
> +    }
>
>
> The new param name 'abort_info' seems misleading.
>
> Maybe a name like 'write_abort_info' is better?

Improved as suggested.

> ~~~
>
> 8. src/backend/replication/logical/proto.c - logicalrep_read_stream_abort
>
> +logicalrep_read_stream_abort(StringInfo in,
> +                             LogicalRepStreamAbortData *abort_data,
> +                             bool read_abort_lsn)
>  {
> -    Assert(xid && subxid);
> +    Assert(abort_data);
> +
> +    abort_data->xid = pq_getmsgint(in, 4);
> +    abort_data->subxid = pq_getmsgint(in, 4);
>
> -    *xid = pq_getmsgint(in, 4);
> -    *subxid = pq_getmsgint(in, 4);
> +    if (read_abort_lsn)
> +    {
> +        abort_data->abort_lsn = pq_getmsgint64(in);
> +        abort_data->abort_time = pq_getmsgint64(in);
> +    }
>
> This name 'read_abort_lsn' is inconsistent with the 'abort_info' of
> the logicalrep_write_stream_abort.
>
> I suggest change these to 'read_abort_info/write_abort_info'

Improved as suggested.

> ======
>
> 9. src/backend/replication/logical/worker.c - file header comment
>
> + * information is added to the ParallelApplyWorkersList. Once the worker
> + * finishes applying the transaction, we mark it available for use. Now,
> + * before starting a new worker to apply the streaming transaction, we check
> + * the list and use any worker, if available. Note that we maintain a maximum
>
> 9a.
> "available for use." -> "available for re-use."
>
> ~
>
> 9b.
> "we check the list and use any worker, if available" -> "we check the
> list for any available worker"

Improved as suggested.

> ~~~
>
> 10. src/backend/replication/logical/worker.c - handle_streamed_transaction
>
> +    /* write the change to the current file */
> +    stream_write_change(action, s);
> +    return true;
>
> Uppercase the comment.

Improved as suggested.

> ~~~
>
> 11. src/backend/replication/logical/worker.c - apply_handle_stream_abort
>
> +static void
> +apply_handle_stream_abort(StringInfo s)
> +{
> +    TransactionId xid;
> +    TransactionId subxid;
> +    LogicalRepStreamAbortData abort_data;
> +    bool        read_abort_lsn = false;
> +    ParallelApplyWorkerInfo *winfo = NULL;
> +    TransApplyAction apply_action;
>
> The variable 'read_abort_lsn' name ought to be changed to match
> consistently the parameter name.

Improved as suggested.

> ======
>
> 12. src/backend/replication/pgoutput/pgoutput.c - pgoutput_stream_abort
>
> @@ -1843,6 +1850,8 @@ pgoutput_stream_abort(struct
> LogicalDecodingContext *ctx,
>                        XLogRecPtr abort_lsn)
>  {
>      ReorderBufferTXN *toptxn;
> +    PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
> +    bool        abort_info = (data->streaming == SUBSTREAM_PARALLEL);
>
> The variable 'abort_info' name ought to be changed to be
> 'write_abort_info' (as suggested above) to match consistently the
> parameter name.

Improved as suggested.

Attached is the new patch set.

Regards,
Wang wei
Attachment
- v33-0001-Perform-streaming-logical-transactions-by-parall.patch
- v33-0002-Test-streaming-parallel-option-in-tap-test.patch
- v33-0003-Add-some-checks-before-using-parallel-apply-work.patch
- v33-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch
- v33-0005-Add-a-main_worker_pid-to-pg_stat_subscription.patch
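
As a reading aid for items 4 and 6 above, the two conditions discussed there can be written as small named predicates. This is only a sketch under stated assumptions: DemoWorkerKind and the demo_* functions are hypothetical, and the quoted hunk does not show what happens inside the "enough workers" branch, so the sketch stops at the test itself.

```c
#include <stdbool.h>

/* Hypothetical classification of logical replication workers. */
typedef enum
{
    DEMO_WORKER_LEADER_APPLY,
    DEMO_WORKER_PARALLEL_APPLY,
    DEMO_WORKER_TABLESYNC
} DemoWorkerKind;

/*
 * Item 6: a worker that is neither a parallel apply worker nor a
 * tablesync worker is treated as the leader apply worker.
 */
bool
demo_am_leader_apply_worker(DemoWorkerKind kind)
{
    return kind != DEMO_WORKER_PARALLEL_APPLY &&
           kind != DEMO_WORKER_TABLESYNC;
}

/*
 * Item 4: the pool counts as holding "enough" workers once it has more
 * than half of the per-subscription maximum (integer division, as in
 * the quoted condition).
 */
bool
demo_pool_has_enough_workers(int napplyworkers, int max_per_subscription)
{
    return napplyworkers > (max_per_subscription / 2);
}
```

With a maximum of 5, for example, the integer division gives a threshold of 2, so the predicate becomes true from the third pooled worker onwards.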