RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From houzj.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Msg-id OS0PR01MB57167C91F40088516C3C21D2941A9@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Peter Smith <smithpb2250@gmail.com>)
List pgsql-hackers
On Tue, Dec 6, 2022 7:57 AM Peter Smith <smithpb2250@gmail.com> wrote:
> Here are my review comments for patch v55-0002

Thanks for your comments.

> ======
> 
> .../replication/logical/applyparallelworker.c
> 
> 1. pa_can_start
> 
> @@ -276,9 +278,9 @@ pa_can_start(TransactionId xid)
>   /*
>   * Don't start a new parallel worker if user has set skiplsn as it's
>   * possible that user want to skip the streaming transaction. For
> - * streaming transaction, we need to spill the transaction to disk so that
> - * we can get the last LSN of the transaction to judge whether to skip
> - * before starting to apply the change.
> + * streaming transaction, we need to serialize the transaction to a file
> + * so that we can get the last LSN of the transaction to judge whether to
> + * skip before starting to apply the change.
>   */
>   if (!XLogRecPtrIsInvalid(MySubscription->skiplsn))
>   return false;
> 
> I think the wording change may belong in patch 0001 because it has 
> nothing to do with partial serializing.

Changed.

> ~~~
> 
> 2. pa_free_worker
> 
> + /*
> + * Stop the worker if there are enough workers in the pool.
> + *
> + * XXX The worker is also stopped if the leader apply worker needed to
> + * serialize part of the transaction data due to a send timeout. This is
> + * because the message could be partially written to the queue due to send
> + * timeout and there is no way to clean the queue other than resending the
> + * message until it succeeds. To avoid complexity, we directly stop the
> + * worker in this case.
> + */
> + if (winfo->serialize_changes ||
> + napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
> 
> Don't need to say "due to send timeout" 2 times in 2 sentences.
> 
> SUGGESTION
> XXX The worker is also stopped if the leader apply worker needed to 
> serialize part of the transaction data due to a send timeout. This is 
> because the message could be partially written to the queue but there 
> is no way to clean the queue other than resending the message until it 
> succeeds. Directly stopping the worker avoids needing this complexity.

Changed.

> 4.
> 
>  /*
> + * Replay the spooled messages in the parallel apply worker if the leader apply
> + * worker has finished serializing changes to the file.
> + */
> +static void
> +pa_spooled_messages(void)
> 
> I'm not 100% sure of the logic, so IMO maybe the comment should say a 
> bit more about how this works:
> 
> Specifically, let's say there was some timeout and the LA needed to 
> write the spool file, then let's say the PA timed out and found itself 
> inside this function. Now, let's say the LA is still busy writing the 
> file -- so what happens next?
> 
> Does this function simply return, then the main PA loop waits again, 
> then the times out again, then PA finds itself back inside this 
> function again... and that keeps happening over and over until 
> eventually the spool file is found FS_READY? Some explanatory comments 
> might help.

Slightly changed the logic and comment here.

> ~
> 
> 5.
> 
> + /*
> + * Check if changes have been serialized to a file. if so, read and apply
> + * them.
> + */
> + SpinLockAcquire(&MyParallelShared->mutex);
> + fileset_state = MyParallelShared->fileset_state; 
> + SpinLockRelease(&MyParallelShared->mutex);
> 
> "if so" -> "If so"

Changed.

> ~~~
> 
> 
> 6. pa_send_data
> 
> + *
> + * If the attempt to send data via shared memory times out, then we will switch
> + * to "PARTIAL_SERIALIZE mode" for the current transaction to prevent possible
> + * deadlocks with another parallel apply worker (refer to the comments atop
> + * applyparallelworker.c for details). This means that the current data and any
> + * subsequent data for this transaction will be serialized to a file.
>   */
>  void
>  pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
> 
> SUGGESTION (minor comment rearranging)
> 
> If the attempt to send data via shared memory times out, then we will 
> switch to "PARTIAL_SERIALIZE mode" for the current transaction -- this 
> means that the current data and any subsequent data for this 
> transaction will be serialized to a file. This is done to prevent 
> possible deadlocks with another parallel apply worker (refer to the 
> comments atop applyparallelworker.c for details).

Changed.

> ~
> 
> 7.
> 
> + /*
> + * Take the stream lock to make sure that the parallel apply worker
> + * will wait for the leader to release the stream lock until the
> + * end of the transaction.
> + */
> + pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
> 
> The comment doesn't sound right.
> 
> "until the end" -> "at the end" (??)

I think it means "the PA will wait ... until the end of the transaction".

> ~~~
> 
> 8. pa_stream_abort
> 
> @@ -1374,6 +1470,7 @@ pa_stream_abort(LogicalRepStreamAbortData
> *abort_data)
>   RollbackToSavepoint(spname);
>   CommitTransactionCommand();
>   subxactlist = list_truncate(subxactlist, i + 1);
> +
>   break;
>   }
>   }
> Spurious whitespace unrelated to this patch?

Changed.

> ======
> 
> src/backend/replication/logical/worker.c
> 
> 9. handle_streamed_transaction
> 
>   /*
> + * The parallel apply worker needs the xid in this message to decide
> + * whether to define a savepoint, so save the original message that has not
> + * moved the cursor after the xid. We will serailize this message to a file
> + * in PARTIAL_SERIALIZE mode.
> + */
> + original_msg = *s;
> 
> "serailize" -> "serialize"

Changed.

> ~~~
> 
> 10. apply_handle_stream_prepare
> 
> @@ -1245,6 +1265,7 @@ apply_handle_stream_prepare(StringInfo s)
>   LogicalRepPreparedTxnData prepare_data;
>   ParallelApplyWorkerInfo *winfo;
>   TransApplyAction apply_action;
> + StringInfoData original_msg = *s;
> 
> Should this include a longer explanation of why this copy is needed 
> (same as was done in handle_streamed_transaction)?

Added the below comment atop this variable.
```
Save the message before it is consumed.
```

> ~
> 
> 11.
> 
>   case TRANS_PARALLEL_APPLY:
> +
> + /*
> + * Close the file before committing if the parallel apply worker
> + * is applying spooled messages.
> + */
> + if (stream_fd)
> + stream_close_file();
> 
> 11a.
> 
> This comment seems worded backwards.
> 
> SUGGESTION
> If the parallel apply worker is applying spooled messages then close 
> the file before committing.

Changed.

> ~
> 
> 11b.
> 
> I'm confused - isn't there code doing exactly this (close file before
> commit) already in the apply_handle_stream_commit 
> TRANS_PARALLEL_APPLY?

I think there was a typo here.
Changed the action in the comment (committing -> preparing).

> ~
> 
> 13.
> 
> + serialize_stream_start(stream_xid, false); 
> + stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);
> 
> - end_replication_step();
>   break;
> 
> A spurious blank line is left before the break;

Changed.

> ~~~
> 
> 14. serialize_stream_stop
> 
> + /* We must be in a valid transaction state */ 
> + Assert(IsTransactionState());
> 
> The comment seems redundant. The code says the same.

Changed.

> ~
> 
> 17.
> 
> + /*
> + * No need to output the DEBUG message here in the parallel apply
> + * worker as similar messages will be output when handling STREAM_STOP
> + * message.
> + */
> + if (!am_parallel_apply_worker() && nchanges % 1000 == 0)
>   elog(DEBUG1, "replayed %d changes from file \"%s\"",
>   nchanges, path);
> 
> Instead of saying what you are not doing  ("No need to... in output 
> apply worker") wouldn't it make more sense to reverse it and say what 
> you are doing ("Only log DEBUG messages for the leader apply worker 
> because ...") and then the condition also becomes positive:
> 
> if (am_leader_apply_worker())
> {
> ...
> }

Removed this condition according to Amit's comment.

> ~
> 
> 18.
> 
> + if (am_parallel_apply_worker() &&
> + MyParallelShared->xact_state == PARALLEL_TRANS_FINISHED)
> + goto done;
> +
> + /*
> + * No need to output the DEBUG message here in the parallel apply
> + * worker as similar messages will be output when handling STREAM_STOP
> + * message.
> + */
> + if (!am_parallel_apply_worker() && nchanges % 1000 == 0)
>   elog(DEBUG1, "replayed %d changes from file \"%s\"",
>   nchanges, path);
>   }
> 
> - BufFileClose(fd);
> -
> + stream_close_file();
>   pfree(buffer);
>   pfree(s2.data);
> 
> +done:
>   elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
>   nchanges, path);
> 
> Shouldn't that "done:" label be *above* the pfree's. Otherwise, those 
> are going to be skipped over by the "goto done;".

After reconsidering, I think there is no need to pfree() these two variables here,
because they are allocated in the top-level transaction's memory context and will be
freed soon anyway. So, I just removed these pfree() calls.

> ======
> 
> src/include/replication/worker_internal.h
> 
> 21. PartialFileSetState
> 
> 
> + * State of fileset in leader apply worker.
> + *
> + * FS_BUSY means that the leader is serializing changes to the file. FS_READY
> + * means that the leader has serialized all changes to the file and the file is
> + * ready to be read by a parallel apply worker.
> + */
> +typedef enum PartialFileSetState
> 
> "ready to be read" sounded a bit strange.
> 
> SUGGESTION
> ... to the file so it is now OK for a parallel apply worker to read it.

Changed.

Best regards,
Hou zj
