RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From houzj.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Msg-id OS0PR01MB5716C10E35097026E12BB98A94019@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to RE: Perform streaming logical transactions by background workers and parallel apply  ("Hayato Kuroda (Fujitsu)" <kuroda.hayato@fujitsu.com>)
List pgsql-hackers
On Monday, November 7, 2022 7:43 PM Kuroda, Hayato/黒田 隼人 <kuroda.hayato@fujitsu.com> wrote:
> 
> Dear Hou,
> 
> The following are my comments. I want to consider the patch more, but I am
> sending what I have so far.

Thanks for the comments.

> 
> ===
> worker.c
> 
> 01. typedef enum TransApplyAction
> 
> ```
> /*
>  * What action to take for the transaction.
>  *
>  * TRANS_LEADER_APPLY means that we are in the leader apply worker and
>  * changes of the transaction are applied directly in the worker.
>  *
>  * TRANS_LEADER_SERIALIZE means that we are in the leader apply worker or
>  * table sync worker. Changes are written to temporary files and then applied
>  * when the final commit arrives.
>  *
>  * TRANS_LEADER_SEND_TO_PARALLEL means that we are in the leader apply worker
>  * and need to send the changes to the parallel apply worker.
>  *
>  * TRANS_PARALLEL_APPLY means that we are in the parallel apply worker and
>  * changes of the transaction are applied directly in the worker.
>  */
> ```
> 
> TRANS_LEADER_PARTIAL_SERIALIZE should be listed here as well.
> 

Added.

> 02. handle_streamed_transaction()
> 
> ```
> +       StringInfoData  origin_msg;
> ...
> +       origin_msg = *s;
> ...
> +                               /* Write the change to the current file */
> +                               stream_write_change(action,
> +                                                   apply_action == TRANS_LEADER_SERIALIZE ?
> +                                                   s : &origin_msg);
> ```
> 
> I'm not sure why origin_msg is needed. Can we remove the conditional
> operator?

Currently, the parallel apply worker needs the transaction xid of this change to
define a savepoint, so we need to write the original message (which still contains
the xid) to the file.
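
For illustration, here is a minimal sketch of that idea based on the hunk quoted
above (the fragment and comments are mine, not the patch's exact code;
apply_action and the helper names are taken from the quoted context):

```
/*
 * Illustrative fragment, not the patch code: take a copy of the incoming
 * message before the xid is consumed, so that the file written for a
 * parallel apply worker still carries the xid it needs to define a
 * savepoint for the subtransaction.
 */
StringInfoData	origin_msg = *s;			/* shallow copy; cursor still at the xid */
TransactionId	current_xid = pq_getmsgint(s, 4);	/* consumes the xid from 's' */

/*
 * For TRANS_LEADER_SERIALIZE the leader itself replays the file and has
 * already recorded the xid, so the consumed message 's' is written; for the
 * partial-serialize case the un-consumed copy is written so that the
 * parallel apply worker can re-read the xid from the file.
 */
stream_write_change(action,
					apply_action == TRANS_LEADER_SERIALIZE ? s : &origin_msg);
```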

> 
> 03. apply_handle_stream_start()
> 
> ```
> + * XXX We can avoid sending pairs of the START/STOP messages to the parallel
> + * worker because unlike the apply worker it will process only one transaction
> + * at a time. However, it is not clear whether any optimization is worthwhile
> + * because these messages are sent only when the logical_decoding_work_mem
> + * threshold is exceeded.
> ```
> 
> This comment should be modified because PA must acquire and release locks at
> that time.
> 
> 
> 04. apply_handle_stream_prepare()
> 
> ```
> +                       /*
> +                        * After sending the data to the parallel apply worker,
> +                        * wait for that worker to finish. This is necessary to
> +                        * maintain commit order which avoids failures due to
> +                        * transaction dependencies and deadlocks.
> +                        */
> +                       parallel_apply_wait_for_xact_finish(winfo->shared);
> ```
> 
> This seems not to be correct: the LA may not send the data but instead spill the
> changes to a file.

Changed.

> 05. apply_handle_stream_commit()
> 
> ```
> +                       if (apply_action == TRANS_LEADER_PARTIAL_SERIALIZE)
> +                               stream_cleanup_files(MyLogicalRepWorker->subid, xid);
> ```
> 
> I'm not sure whether the stream files should be removed by the LA or the PAs.
> Could you tell me why you chose the LA?

I think it is more natural for only the LA to create/write/delete the file, with the
PA only needing to read from it.
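
As a minimal sketch of that division of labor (both calls are taken from the hunks
quoted above; their placement here is illustrative, not the exact patch code):

```
/*
 * Leader apply worker, partial-serialize commit path (illustrative
 * arrangement): wait until the PA has finished reading and applying the
 * spilled transaction, then remove the stream files that the LA created
 * earlier.  The PA itself never creates or unlinks these files; it only
 * reads them.
 */
parallel_apply_wait_for_xact_finish(winfo->shared);
stream_cleanup_files(MyLogicalRepWorker->subid, xid);
```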

> ===
> applyparallelworker.c
> 
> 05. parallel_apply_can_start()
> 
> ```
> +       if (switching_to_serialize)
> +               return false;
> ```
> 
> Could you add a comment like:
> Don't start a new parallel apply worker if the leader apply worker has been
> spilling changes to the disk temporarily.

This code has been removed.

> 06. parallel_apply_start_worker()
> 
> ```
> +       /*
> +        * Set the xact_state flag in the leader instead of the parallel apply
> +        * worker to avoid the race condition where the leader has already
> +        * started waiting for the parallel apply worker to finish processing
> +        * the transaction while the child process has not yet processed the
> +        * first STREAM_START and has not set the xact_state to true.
> +        */
> 
> I think the word "flag" should only be used for booleans, so the comment should
> be modified.
> (There are many such code comments; all of them should be modified.)

Changed.

> 
> 07. parallel_apply_get_unique_id()
> 
> ```
> +/*
> + * Returns the unique id among all parallel apply workers in the subscriber.
> + */
> +static uint16
> +parallel_apply_get_unique_id()
> ```
> 
> I think this function is inefficient: its computational complexity increases
> linearly with the number of PAs. Perhaps the Bitmapset data structure could be
> used instead.

This function has been removed.

> 08. parallel_apply_send_data()
> 
> ```
> #define CHANGES_THRESHOLD    1000
> #define SHM_SEND_TIMEOUT_MS    10000
> ```
> 
> I think the timeout may be too long. Could you tell me the background about it?

Serializing data to a file would affect performance, so I tried to make it unlikely
to happen unless the PA is really blocked by another PA or BA.
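
Roughly, the loop being discussed looks like the sketch below (the two constants
are from the quoted hunk, but the helper name, the wait interval, and the exact
interplay of the constants are my assumptions for illustration, not the patch's
code):

```
#define SHM_SEND_TIMEOUT_MS		10000	/* from the quoted hunk */
#define CHANGES_THRESHOLD		1000	/* from the quoted hunk */

/*
 * Hypothetical helper: hand one change to the parallel apply worker, and
 * report failure once the worker has looked blocked for long enough so the
 * caller can fall back to serializing the remaining changes to a file.
 */
static bool
send_change_or_give_up(shm_mq_handle *mqh, Size nbytes, const void *data)
{
	int			retry = 0;

	for (;;)
	{
		shm_mq_result result = shm_mq_send(mqh, nbytes, data, true, true);

		if (result == SHM_MQ_SUCCESS)
			return true;
		else if (result == SHM_MQ_DETACHED)
			ereport(ERROR,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("lost connection to the parallel apply worker")));

		/* The queue is full, i.e. the PA is not consuming changes. */
		if (++retry >= CHANGES_THRESHOLD)
			return false;		/* caller switches to serializing to a file */

		/* Wait a little before retrying (interval chosen for illustration). */
		(void) WaitLatch(MyLatch,
						 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
						 SHM_SEND_TIMEOUT_MS / CHANGES_THRESHOLD,
						 WAIT_EVENT_MQ_SEND);
		ResetLatch(MyLatch);
	}
}
```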

> 09. parallel_apply_send_data()
> 
> ```
>             /*
>              * Close the stream file if not in a streaming block, the
> file will
>              * be reopened later.
>              */
>             if (!stream_apply_worker)
>                 serialize_stream_stop(winfo->shared->xid);
> ```
> 
> a.
> IIUC the timings when LA tries to send data but stream_apply_worker is NULL
> are:
> * apply_handle_stream_prepare,
> * apply_handle_stream_start,
> * apply_handle_stream_abort, and
> * apply_handle_stream_commit.
> And at that time the state of TransApplyAction may be
> TRANS_LEADER_SEND_TO_PARALLEL. When should the file be closed?

Changed to check a different condition.

> b.
> Even if this is needed, I think the name of the called function should be changed,
> since the LA may not be handling a STREAM_STOP message here. How about
> close_stream_file() or something similar?
> 
> 
> 10. parallel_apply_send_data()
> 
> ```
>             /* Initialize the stream fileset. */
>             serialize_stream_start(winfo->shared->xid, true);
> ```
> 
> I think the name of the called function should be changed, since the LA may not
> be handling a STREAM_START message here. How about open_stream_file() or
> something similar?
> 
> 11. parallel_apply_send_data()
> 
> ```
>         if (++retry >= CHANGES_THRESHOLD)
>         {
>             MemoryContext oldcontext;
>             StringInfoData msg;
> ...
>             initStringInfo(&msg);
>             appendBinaryStringInfo(&msg, data, nbytes);
> ...
>             switching_to_serialize = true;
>             apply_dispatch(&msg);
>             switching_to_serialize = false;
> 
>             break;
>         }
> ```
> 
> pfree(msg.data) may be needed.
> 
> ===
> 12. worker_internal.h
> 
> ```
> +       pg_atomic_uint32        left_message;
> ```
> 
> 
> ParallelApplyWorkerShared is already protected by a mutex. Why did you add an
> atomic variable to the data structure?

I personally feel this value is modified more frequently, so I used an atomic
variable here.
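
As a rough illustration (the field name is from the quoted hunk; MyParallelShared
stands in for however the parallel apply worker reaches the shared struct and is
not necessarily the patch's name), the hot path only needs lock-free atomics,
while the less frequently updated state can stay behind the mutex:

```
/* Leader apply worker: one more change handed over to the PA. */
pg_atomic_add_fetch_u32(&winfo->shared->left_message, 1);

/* Parallel apply worker: one queued change has been applied. */
pg_atomic_sub_fetch_u32(&MyParallelShared->left_message, 1);
```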

> ===
> 13. typedefs.list
> 
> ParallelTransState should be added.

Added.

> ===
> 14. General
> 
> I have already mentioned this to you directly, but I point it out again to notify
> other members.
> I caused a deadlock with two PAs. It was indeed resolved by the lmgr, but the
> output did not seem helpful. The following was copied from the log, and we can
> see that the commands executed by the apply workers were not output. Can we
> extend this, or is it out of scope?
> 
> 
> ```
> 2022-11-07 11:11:27.449 UTC [11262] ERROR:  deadlock detected
> 2022-11-07 11:11:27.449 UTC [11262] DETAIL:  Process 11262 waits for AccessExclusiveLock on object 16393 of class 6100 of database 0; blocked by process 11320.
>         Process 11320 waits for ShareLock on transaction 742; blocked by process 11266.
>         Process 11266 waits for AccessShareLock on object 16393 of class 6100 of database 0; blocked by process 11262.
>         Process 11262: <command string not enabled>
>         Process 11320: <command string not enabled>
>         Process 11266: <command string not enabled>
> ```

On HEAD, an apply worker can also cause a deadlock with a user backend. For example:
Tx1 (backend)
begin;
insert into tbl1 values (100);
        Tx2 (replaying streaming transaction)
        begin;
        insert into tbl1 values (1);
        delete from tbl2;
insert into tbl1 values (1);
        insert into tbl1 values (100);

logical replication worker ERROR:  deadlock detected
logical replication worker DETAIL:  Process 2158391 waits for ShareLock on transaction 749; blocked by process 2158410.
        Process 2158410 waits for ShareLock on transaction 750; blocked by process 2158391.
        Process 2158391: <command string not enabled>
        Process 2158410: insert into tbl1 values (1);

So, this looks like existing behavior. I agree that it would be better to show
something, but maybe we can do that as a separate patch.

Best regards,
Hou zj
