RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers
From: houzj.fnst@fujitsu.com
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Msg-id: OS0PR01MB57167C91F40088516C3C21D2941A9@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to: Re: Perform streaming logical transactions by background workers and parallel apply (Peter Smith <smithpb2250@gmail.com>)
List: pgsql-hackers
On Tue, Dec 6, 2022 7:57 AM Peter Smith <smithpb2250@gmail.com> wrote:
> Here are my review comments for patch v55-0002

Thanks for your comments.

> ======
>
> .../replication/logical/applyparallelworker.c
>
> 1. pa_can_start
>
> @@ -276,9 +278,9 @@ pa_can_start(TransactionId xid)
>   /*
>    * Don't start a new parallel worker if user has set skiplsn as it's
>    * possible that user want to skip the streaming transaction. For
> -  * streaming transaction, we need to spill the transaction to disk so that
> -  * we can get the last LSN of the transaction to judge whether to skip
> -  * before starting to apply the change.
> +  * streaming transaction, we need to serialize the transaction to a file
> +  * so that we can get the last LSN of the transaction to judge whether to
> +  * skip before starting to apply the change.
>    */
>   if (!XLogRecPtrIsInvalid(MySubscription->skiplsn))
>     return false;
>
> I think the wording change may belong in patch 0001 because it has
> nothing to do with partial serializing.

Changed.

> ~~~
>
> 2. pa_free_worker
>
> + /*
> +  * Stop the worker if there are enough workers in the pool.
> +  *
> +  * XXX The worker is also stopped if the leader apply worker needed to
> +  * serialize part of the transaction data due to a send timeout. This is
> +  * because the message could be partially written to the queue due to send
> +  * timeout and there is no way to clean the queue other than resending the
> +  * message until it succeeds. To avoid complexity, we directly stop the
> +  * worker in this case.
> +  */
> + if (winfo->serialize_changes ||
> +     napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
>
> Don't need to say "due to send timeout" 2 times in 2 sentences.
>
> SUGGESTION
> XXX The worker is also stopped if the leader apply worker needed to
> serialize part of the transaction data due to a send timeout. This is
> because the message could be partially written to the queue but there
> is no way to clean the queue other than resending the message until it
> succeeds. Directly stopping the worker avoids needing this complexity.

Changed.

> 4.
>
>  /*
> + * Replay the spooled messages in the parallel apply worker if the leader apply
> + * worker has finished serializing changes to the file.
> + */
> +static void
> +pa_spooled_messages(void)
>
> I'm not 100% sure of the logic, so IMO maybe the comment should say a
> bit more about how this works:
>
> Specifically, let's say there was some timeout and the LA needed to
> write the spool file, then let's say the PA timed out and found itself
> inside this function. Now, let's say the LA is still busy writing the
> file -- so what happens next?
>
> Does this function simply return, then the main PA loop waits again,
> then the times out again, then PA finds itself back inside this
> function again... and that keeps happening over and over until
> eventually the spool file is found FS_READY? Some explanatory comments
> might help.

Slightly changed the logic and comment here.

> ~
>
> 5.
>
> + /*
> +  * Check if changes have been serialized to a file. if so, read and apply
> +  * them.
> +  */
> + SpinLockAcquire(&MyParallelShared->mutex);
> + fileset_state = MyParallelShared->fileset_state;
> + SpinLockRelease(&MyParallelShared->mutex);
>
> "if so" -> "If so"

Changed.

> ~~~
>
> 6. pa_send_data
>
> + *
> + * If the attempt to send data via shared memory times out, then we will switch
> + * to "PARTIAL_SERIALIZE mode" for the current transaction to prevent possible
> + * deadlocks with another parallel apply worker (refer to the comments atop
> + * applyparallelworker.c for details). This means that the current data and any
> + * subsequent data for this transaction will be serialized to a file.
>   */
>  void
>  pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
>
> SUGGESTION (minor comment rearranging)
>
> If the attempt to send data via shared memory times out, then we will
> switch to "PARTIAL_SERIALIZE mode" for the current transaction -- this
> means that the current data and any subsequent data for this
> transaction will be serialized to a file. This is done to prevent
> possible deadlocks with another parallel apply worker (refer to the
> comments atop applyparallelworker.c for details).

Changed.

> ~
>
> 7.
>
> + /*
> +  * Take the stream lock to make sure that the parallel apply worker
> +  * will wait for the leader to release the stream lock until the
> +  * end of the transaction.
> +  */
> + pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
>
> The comment doesn't sound right.
>
> "until the end" -> "at the end" (??)

I think it means "PA wait ... until the end of transaction".

> ~~~
>
> 8. pa_stream_abort
>
> @@ -1374,6 +1470,7 @@ pa_stream_abort(LogicalRepStreamAbortData *abort_data)
>   RollbackToSavepoint(spname);
>   CommitTransactionCommand();
>   subxactlist = list_truncate(subxactlist, i + 1);
> +
>   break;
>   }
>  }
>
> Spurious whitespace unrelated to this patch?

Changed.

> ======
>
> src/backend/replication/logical/worker.c
>
> 9. handle_streamed_transaction
>
>  /*
> + * The parallel apply worker needs the xid in this message to decide
> + * whether to define a savepoint, so save the original message that has not
> + * moved the cursor after the xid. We will serailize this message to a file
> + * in PARTIAL_SERIALIZE mode.
> + */
> + original_msg = *s;
>
> "serailize" -> "serialize"

Changed.

> ~~~
>
> 10. apply_handle_stream_prepare
>
> @@ -1245,6 +1265,7 @@ apply_handle_stream_prepare(StringInfo s)
>   LogicalRepPreparedTxnData prepare_data;
>   ParallelApplyWorkerInfo *winfo;
>   TransApplyAction apply_action;
> + StringInfoData original_msg = *s;
>
> Should this include a longer explanation of why this copy is needed
> (same as was done in handle_streamed_transaction)?

Added the below comment atop this variable.

```
Save the message before it is consumed.
```

> ~
>
> 11.
>
>   case TRANS_PARALLEL_APPLY:
> +
> +   /*
> +    * Close the file before committing if the parallel apply worker
> +    * is applying spooled messages.
> +    */
> +   if (stream_fd)
> +     stream_close_file();
>
> 11a.
>
> This comment seems worded backwards.
>
> SUGGESTION
> If the parallel apply worker is applying spooled messages then close
> the file before committing.

Changed.

> ~
>
> 11b.
>
> I'm confused - isn't there code doing exactly this (close file before
> commit) already in the apply_handle_stream_commit
> TRANS_PARALLEL_APPLY?

I think here is a typo. Changed the action in the comment. (committing -> preparing)

> ~
>
> 13.
>
> +   serialize_stream_start(stream_xid, false);
> +   stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);
>
> -   end_replication_step();
>     break;
>
> A spurious blank line is left before the break;

Changed.

> ~~~
>
> 14. serialize_stream_stop
>
> + /* We must be in a valid transaction state */
> + Assert(IsTransactionState());
>
> The comment seems redundant. The code says the same.

Changed.

> ~
>
> 17.
>
> + /*
> +  * No need to output the DEBUG message here in the parallel apply
> +  * worker as similar messages will be output when handling STREAM_STOP
> +  * message.
> +  */
> + if (!am_parallel_apply_worker() && nchanges % 1000 == 0)
>     elog(DEBUG1, "replayed %d changes from file \"%s\"",
>        nchanges, path);
>
> Instead of saying what you are not doing ("No need to... in output
> apply worker") wouldn't it make more sense to reverse it and say what
> you are doing ("Only log DEBUG messages for the leader apply worker
> because ...") and then the condition also becomes positive:
>
> if (am_leader_apply_worker())
> {
> ...
> }

Removed this condition according to Amit's comment.

> ~
>
> 18.
>
> + if (am_parallel_apply_worker() &&
> +     MyParallelShared->xact_state == PARALLEL_TRANS_FINISHED)
> +   goto done;
> +
> + /*
> +  * No need to output the DEBUG message here in the parallel apply
> +  * worker as similar messages will be output when handling STREAM_STOP
> +  * message.
> +  */
> + if (!am_parallel_apply_worker() && nchanges % 1000 == 0)
>     elog(DEBUG1, "replayed %d changes from file \"%s\"",
>        nchanges, path);
>  }
>
> - BufFileClose(fd);
> -
> + stream_close_file();
>   pfree(buffer);
>   pfree(s2.data);
>
> +done:
>   elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
>      nchanges, path);
>
> Shouldn't that "done:" label be *above* the pfree's. Otherwise, those
> are going to be skipped over by the "goto done;".

After reconsidering, I think there is no need to 'pfree' these two variables here, because they are allocated in the top-level transaction's context and will be freed very soon. So, I just removed these pfree().

> ======
>
> src/include/replication/worker_internal.h
>
> 21. PartialFileSetState
>
> + * State of fileset in leader apply worker.
> + *
> + * FS_BUSY means that the leader is serializing changes to the file. FS_READY
> + * means that the leader has serialized all changes to the file and the file is
> + * ready to be read by a parallel apply worker.
> + */
> +typedef enum PartialFileSetState
>
> "ready to be read" sounded a bit strange.
>
> SUGGESTION
> ... to the file so it is now OK for a parallel apply worker to read it.

Changed.

Best regards,
Hou zj