RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From wangw.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Msg-id OS3PR01MB6275728D53FF4D5165D2C14A9E499@OS3PR01MB6275.jpnprd01.prod.outlook.com
In response to RE: Perform streaming logical transactions by background workers and parallel apply  ("kuroda.hayato@fujitsu.com" <kuroda.hayato@fujitsu.com>)
List pgsql-hackers
On Mon, Sep 12, 2022 at 18:58 PM Kuroda, Hayato/黒田 隼人 <kuroda.hayato@fujitsu.com> wrote:
> Dear Hou-san,
> 
> Thank you for updating the patch! The following are comments on v28-0001.
> I will dig into your patch further, but I am sending these partial comments now to keep the thread active.

Thanks for your comments.

> ===
> For applyparallelworker.c
> 
> 01. filename
> The word ordering of the filename does not seem good,
> because you defined the new worker as "parallel apply worker".

As Amit said, I kept it consistent with the naming format of the other files.

> 02. global variable
> 
> ```
> +/* Parallel apply workers hash table (initialized on first use). */
> +static HTAB *ParallelApplyWorkersHash = NULL;
> +
> +/*
> + * List that stores the information of parallel apply workers that were
> + * started. Newly added worker information will be removed from the list at the
> + * end of the transaction when there are enough workers in the pool. Besides,
> + * exited workers will be removed from the list after being detected.
> + */
> +static List *ParallelApplyWorkersList = NIL;
> ```
> 
> Could you add a description of the difference between the list and the hash table?
> IIUC the hash table stores the parallel workers that
> are assigned to transactions, and the list stores all the alive ones.

I modified the comments above ParallelApplyWorkersList.
I think the difference between these two variables can also be understood by
referring to the functions parallel_apply_start_worker and
parallel_apply_free_worker.
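For illustration, here is a minimal standalone sketch of that division of labour. This is not the patch's code: a fixed array stands in for ParallelApplyWorkersList (every live worker, busy or idle), and a small xid-keyed table stands in for ParallelApplyWorkersHash (only workers currently assigned to a transaction). The struct, sizes and sketch_* helpers are hypothetical, and a linear scan replaces the real hash table for brevity.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;   /* same width as PostgreSQL's xid */

/* Hypothetical, simplified worker descriptor. */
typedef struct SketchWorker
{
    bool          in_use;   /* currently assigned to a transaction? */
    TransactionId xid;      /* meaningful only while in_use */
} SketchWorker;

#define MAX_WORKERS 8

/* Stand-in for ParallelApplyWorkersList: every live worker, busy or idle. */
static SketchWorker pool[MAX_WORKERS];
static int npool = 0;

/*
 * Stand-in for ParallelApplyWorkersHash: only workers assigned to a
 * transaction, keyed by xid (linear scan instead of a real hash table).
 */
static SketchWorker *assigned[MAX_WORKERS];
static int nassigned = 0;

/* Reuse an idle worker from the pool, or "start" a fresh one. */
static SketchWorker *
sketch_start_worker(TransactionId xid)
{
    SketchWorker *w = NULL;

    for (int i = 0; i < npool; i++)
    {
        if (!pool[i].in_use)
        {
            w = &pool[i];
            break;
        }
    }
    if (w == NULL)
    {
        if (npool == MAX_WORKERS)
            return NULL;            /* no free slot: caller must fall back */
        w = &pool[npool++];         /* new worker joins the pool */
    }
    w->in_use = true;
    w->xid = xid;
    assigned[nassigned++] = w;      /* now findable by xid */
    return w;
}

/* Look up the worker assigned to xid; idle pool members are never found. */
static SketchWorker *
sketch_find_worker(TransactionId xid)
{
    for (int i = 0; i < nassigned; i++)
        if (assigned[i]->xid == xid)
            return assigned[i];
    return NULL;
}

/* Transaction finished: drop it from the xid map but keep it in the pool. */
static void
sketch_free_worker(SketchWorker *w)
{
    assert(w->in_use);
    for (int i = 0; i < nassigned; i++)
    {
        if (assigned[i] == w)
        {
            assigned[i] = assigned[--nassigned];
            break;
        }
    }
    w->in_use = false;
}
```

After sketch_free_worker() the worker is no longer reachable by xid but still sits in the pool, so a later sketch_start_worker() can reuse it instead of launching a new process.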

> 03. parallel_apply_find_worker
> 
> ```
> +       /* Return the cached parallel apply worker if valid. */
> +       if (stream_apply_worker != NULL)
> +               return stream_apply_worker;
> ```
> 
> This is just a question:
> why are the given xid and the xid assigned to the worker not compared here?
> Is there a chance of finding the wrong worker?

I think it is okay not to check the worker's xid here.
Please refer to the comments above `stream_apply_worker`.
"stream_apply_worker" is only returned within a stream block, which means
the xid is the same as the xid in the STREAM_START message.
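The invariant can be illustrated with a standalone sketch (hypothetical names and simplified types, not the patch's code): the cache is set at STREAM_START and cleared at STREAM_STOP, so any lookup that hits the cache necessarily concerns the STREAM_START xid. An assertion documents that invariant rather than re-checking the xid at run time.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Hypothetical, simplified worker descriptor. */
typedef struct SketchWorker
{
    TransactionId xid;
} SketchWorker;

/* Valid only between STREAM_START and STREAM_STOP (cf. stream_apply_worker). */
static SketchWorker *cached_worker = NULL;
static TransactionId stream_xid = 0;   /* xid from the current STREAM_START */

static void
sketch_stream_start(SketchWorker *w, TransactionId xid)
{
    stream_xid = xid;
    cached_worker = w;
}

static void
sketch_stream_stop(void)
{
    cached_worker = NULL;   /* outside a stream block the cache is never used */
}

static SketchWorker *
sketch_find_cached_worker(TransactionId xid)
{
    if (cached_worker == NULL)
        return NULL;        /* caller falls back to the xid-keyed lookup */

    /* Inside a stream block every message belongs to stream_xid. */
    assert(xid == stream_xid && cached_worker->xid == xid);
    return cached_worker;
}
```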

> 04. parallel_apply_start_worker
> 
> ```
> +/*
> + * Start a parallel apply worker that will be used for the specified xid.
> + *
> + * If a parallel apply worker is not in use then re-use it, otherwise start a
> + * fresh one. Cache the worker information in ParallelApplyWorkersHash keyed by
> + * the specified xid.
> + */
> +void
> +parallel_apply_start_worker(TransactionId xid)
> ```
> 
> I think "parallel_apply_start_worker" should be "start_parallel_apply_worker".

For code readability, the related functions are all named following this pattern:
`parallel_apply_.*_worker`.

> 05. parallel_apply_stream_abort
> 
> ```
>         for (i = list_length(subxactlist) - 1; i >= 0; i--)
>         {
>             xid = list_nth_xid(subxactlist, i);
>             if (xid == subxid)
>             {
>                 found = true;
>                 break;
>             }
>         }
> ```
> 
> Please do not reuse xid; declare and use another variable in the else block or
> something similar.

I added a temporary variable "xid_tmp" inside the for loop.
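A standalone sketch of the resulting loop shape, assuming a plain array in place of the List and list_nth_xid() used by the patch: the search runs from the most recent subtransaction backwards and uses a loop-local xid_tmp, so the caller's xid is left untouched. sketch_find_subxact is a hypothetical helper.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/*
 * Return the index of subxid in subxacts[0..n-1], searching from the end
 * (most recent subtransaction first), or -1 if it is not present.
 */
static int
sketch_find_subxact(const TransactionId *subxacts, int n, TransactionId subxid)
{
    assert(n >= 0);
    for (int i = n - 1; i >= 0; i--)
    {
        TransactionId xid_tmp = subxacts[i];   /* never clobbers the caller's xid */

        if (xid_tmp == subxid)
            return i;
    }
    return -1;
}
```

Returning the index rather than only a found flag would also let a caller truncate the list at that position.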

> 06. parallel_apply_free_worker
> 
> ```
> +       if (napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
> +       {
> ```
> 
> Please add a comment like "Do we have enough workers in the pool?" or
> something similar.

I added the following comment as you suggested:
`Are there enough workers in the pool?`
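A minimal sketch of the condition quoted in #06 with that comment attached. The helper name is hypothetical; the expression mirrors `napplyworkers > (max_parallel_apply_workers_per_subscription / 2)`, including C's truncating integer division.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical helper: should a worker whose transaction just finished be
 * stopped instead of being kept in the pool?
 */
static bool
sketch_pool_has_enough_workers(int napplyworkers, int max_per_subscription)
{
    assert(napplyworkers >= 0 && max_per_subscription >= 0);

    /* Are there enough workers in the pool? (integer division truncates) */
    return napplyworkers > max_per_subscription / 2;
}
```

For example, with a limit of 5, 5 / 2 truncates to 2, so the condition holds once 3 or more workers are counted.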

> For worker.c
> 
> 07. general
> 
> In many places, if-else statements are used for apply_action, but I think they should
> be rewritten as switch-case statements.

Changed.
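An illustrative sketch of the switch-based dispatch, using only the three apply_action values that appear in this thread (the real enum may have more). The returned labels only paraphrase what each path does at STREAM STOP; they are not taken from the patch.

```c
#include <assert.h>
#include <stddef.h>

/* The three actions named in this thread; the real enum may have more. */
typedef enum SketchApplyAction
{
    TA_APPLY_IN_PARALLEL_WORKER,
    TA_SEND_TO_PARALLEL_WORKER,
    TA_SERIALIZE_TO_FILE
} SketchApplyAction;

/* Describe the STREAM STOP handling for each action (illustrative only). */
static const char *
sketch_stream_stop_action(SketchApplyAction apply_action)
{
    switch (apply_action)
    {
        case TA_APPLY_IN_PARALLEL_WORKER:
            return "parallel worker: report idle in transaction";
        case TA_SEND_TO_PARALLEL_WORKER:
            return "leader: forward STREAM STOP to the parallel worker";
        case TA_SERIALIZE_TO_FILE:
            return "leader: close the spill file";
    }

    /* Not reached for valid enum values. */
    assert(!"unexpected apply action");
    return NULL;
}
```

Leaving out a `default:` label lets GCC's and Clang's -Wswitch (enabled by -Wall) warn if a new action value is added but not handled.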

> 08. global variable
> 
> ```
> -static bool in_streamed_transaction = false;
> +bool in_streamed_transaction = false;
> ```
> 
> a.
> 
> It seems that in_streamed_transaction is used only in worker.c, so we can
> change it to a static variable.
> 
> b.
> 
> That flag is set only when an apply worker spills the transaction to disk.
> How about renaming "in_streamed_transaction" to "in_spilled_transaction"?

=>8a.
Improved.

=>8b.
I am not sure whether we should rename this existing variable for this purpose, so I
kept the name.

> 09.  apply_handle_stream_prepare
> 
> ```
> -       elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
> ```
> 
> I think this debug message is still useful.

Since I do not think it is appropriate to log the xid here, I added back the
following message instead: `finished processing the transaction finish command`.

> 10. apply_handle_stream_stop
> 
> ```
> +       if (apply_action == TA_APPLY_IN_PARALLEL_WORKER)
> +       {
> +               pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
> +       }
> +       else if (apply_action == TA_SEND_TO_PARALLEL_WORKER)
> +       {
> ```
> 
> The ordering of STREAM {STOP, START} is checked only when an apply
> worker spills the transaction to disk.
> (This is done via in_streamed_transaction.)
> I think checks should be added here, like if (!stream_apply_worker) or
> something similar.
>
> 11. apply_handle_stream_abort
> 
> ```
> +       if (in_streamed_transaction)
> +               ereport(ERROR,
> +                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
>                                errmsg_internal("STREAM ABORT message without STREAM STOP")));
> ```
> 
> I think the check by stream_apply_worker should be added.

Because "in_streamed_transaction" is only used for non-parallel apply,
I used stream_apply_worker to check the ordering of STREAM {STOP,
START}.
BTW, I moved the reset of in_streamed_transaction into the
`else if (apply_action == TA_SERIALIZE_TO_FILE)` block.
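A standalone sketch of ordering checks that cover both paths, assuming (as described above) that in_streamed_transaction tracks the serialize-to-file path and stream_apply_worker tracks the parallel path. The enum and sketch_* helpers are hypothetical; in the real code a violation would be reported with ereport(ERROR, ...) using ERRCODE_PROTOCOL_VIOLATION, as in the quoted snippets.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Simplified leader-side state, named after the variables in this thread. */
static bool  in_streamed_transaction = false;  /* serialize-to-file path */
static void *stream_apply_worker = NULL;       /* parallel-apply path */

typedef enum SketchCheck
{
    SKETCH_OK,
    SKETCH_STOP_WITHOUT_START,   /* e.g. "STREAM STOP message without STREAM START" */
    SKETCH_END_WITHOUT_STOP      /* "STREAM ABORT/COMMIT message without STREAM STOP" */
} SketchCheck;

/* True while a STREAM START ... STREAM STOP block is open on either path. */
static bool
sketch_in_stream_block(void)
{
    return in_streamed_transaction || stream_apply_worker != NULL;
}

static void
sketch_stream_start(bool parallel, void *worker)
{
    /* A nested STREAM START would itself be a protocol violation. */
    assert(!sketch_in_stream_block());
    if (parallel)
        stream_apply_worker = worker;
    else
        in_streamed_transaction = true;
}

static void
sketch_stream_stop(void)
{
    stream_apply_worker = NULL;
    in_streamed_transaction = false;
}

/* STREAM STOP is only valid inside an open block. */
static SketchCheck
sketch_check_stream_stop(void)
{
    return sketch_in_stream_block() ? SKETCH_OK : SKETCH_STOP_WITHOUT_START;
}

/* STREAM ABORT and STREAM COMMIT are only valid after STREAM STOP. */
static SketchCheck
sketch_check_stream_end(void)
{
    return sketch_in_stream_block() ? SKETCH_END_WITHOUT_STOP : SKETCH_OK;
}
```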

> 12. apply_handle_stream_commit
> 
> a.
> 
> ```
>     if (in_streamed_transaction)
>         ereport(ERROR,
>                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
>                  errmsg_internal("STREAM COMMIT message without STREAM STOP")));
> ```
> 
> I think the check by stream_apply_worker should be added.
> 
> b.
> 
> ```
> -       elog(DEBUG1, "received commit for streamed transaction %u", xid);
> ```
> 
> I think this debug message is still useful.

=>12a.
See the replies to #10 and #11.

=>12b.
See the reply to #09.

> ===
> For launcher.c
> 
> 13. logicalrep_worker_stop_by_slot
> 
> ```
> +       LogicalRepWorker *worker = &LogicalRepCtx->workers[slot_no];
> +
> +       LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
> +
> +       /* Return if the generation doesn't match or the worker is not alive. */
> +       if (worker->generation != generation ||
> +               worker->proc == NULL)
> +               return;
> +
> ```
> 
> a.
> 
> LWLockAcquire(LogicalRepWorkerLock) is needed before reading slots.
> 
> b.
> 
> LWLockRelease(LogicalRepWorkerLock) is needed even if the worker is not found.

Fixed.
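For reference, a standalone sketch of the lock discipline from #13. It is not the patch's code: a holder counter stands in for LWLockAcquire/LWLockRelease, and the slot struct and sketch_* names are hypothetical. The lock is taken before the slot is read and released on a single exit path, whether or not the worker is found.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-in for a LogicalRepWorker slot. */
typedef struct SketchSlot
{
    uint16_t generation;
    void    *proc;            /* NULL when the worker is not alive */
    bool     stop_requested;
} SketchSlot;

/* A holder counter stands in for LWLockAcquire/LWLockRelease. */
static int lock_holders = 0;

static void
sketch_lock_acquire(void)
{
    lock_holders++;
}

static void
sketch_lock_release(void)
{
    assert(lock_holders > 0);
    lock_holders--;
}

static bool
sketch_stop_by_slot(SketchSlot *slots, int slot_no, uint16_t generation)
{
    SketchSlot *worker;
    bool        stopped = false;

    /* (a) Take the lock before reading anything from the slot. */
    sketch_lock_acquire();
    worker = &slots[slot_no];

    /* Act only if the generation matches and the worker is alive. */
    if (worker->generation == generation && worker->proc != NULL)
    {
        worker->stop_requested = true;
        stopped = true;
    }

    /* (b) Single exit point: released whether or not the worker was found. */
    sketch_lock_release();
    return stopped;
}
```

Funnelling every outcome through one release point, instead of returning from inside the locked region, avoids the missing-release problem from 13(b) by construction.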

The new patches are attached in [1].

[1] -
https://www.postgresql.org/message-id/OS3PR01MB6275F145878B4A44586C46CE9E499%40OS3PR01MB6275.jpnprd01.prod.outlook.com

Regards,
Wang wei
