
From: Hayato Kuroda (Fujitsu)
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Msg-id: TYAPR01MB5866AD420FB3E290F13EDD17F53C9@TYAPR01MB5866.jpnprd01.prod.outlook.com
In response to: RE: Perform streaming logical transactions by background workers and parallel apply ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>)
List: pgsql-hackers

Dear Hou,

Here are my comments so far. I would like to look at the patch further, but I am sending what I have for now.

===
worker.c

01. typedef enum TransApplyAction

```
/*
 * What action to take for the transaction.
 *
 * TRANS_LEADER_APPLY means that we are in the leader apply worker and changes
 * of the transaction are applied directly in the worker.
 *
 * TRANS_LEADER_SERIALIZE means that we are in the leader apply worker or table
 * sync worker. Changes are written to temporary files and then applied when
 * the final commit arrives.
 *
 * TRANS_LEADER_SEND_TO_PARALLEL means that we are in the leader apply worker
 * and need to send the changes to the parallel apply worker.
 *
 * TRANS_PARALLEL_APPLY means that we are in the parallel apply worker and
 * changes of the transaction are applied directly in the worker.
 */
```

TRANS_LEADER_PARTIAL_SERIALIZE should also be listed in this comment.
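
For example, something along these lines could be added (the wording is just a sketch based on my reading of the patch):

```
 * TRANS_LEADER_PARTIAL_SERIALIZE means that we are in the leader apply worker
 * and have sent some changes to the parallel apply worker, but the remaining
 * changes are serialized to a temporary file because the worker could not
 * accept them in time; the parallel apply worker applies those serialized
 * changes when the final commit arrives.
```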

02. handle_streamed_transaction()

```
+       StringInfoData  origin_msg;
...
+       origin_msg = *s;
...
+                               /* Write the change to the current file */
+                               stream_write_change(action,
+                                                                       apply_action == TRANS_LEADER_SERIALIZE ?
+                                                                       s : &origin_msg);
```

I'm not sure why origin_msg is needed. Can we remove the conditional operator?
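
In other words, if the two StringInfos are interchangeable here, could this simply be the following (a sketch)?

```
                                /* Write the change to the current file */
                                stream_write_change(action, s);
```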


03. apply_handle_stream_start()

```
+ * XXX We can avoid sending pairs of the START/STOP messages to the parallel
+ * worker because unlike apply worker it will process only one transaction at a
+ * time. However, it is not clear whether any optimization is worthwhile
+ * because these messages are sent only when the logical_decoding_work_mem
+ * threshold is exceeded.
```

This comment should be updated, because the PA must acquire and release locks when it handles these messages.


04. apply_handle_stream_prepare()

```
+                       /*
+                        * After sending the data to the parallel apply worker, wait for
+                        * that worker to finish. This is necessary to maintain commit
+                        * order which avoids failures due to transaction dependencies and
+                        * deadlocks.
+                        */
+                       parallel_apply_wait_for_xact_finish(winfo->shared);
```

This comment does not seem correct here: the LA may not send the data to the parallel apply worker but instead spill the changes to a file.
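
For example, the comment could cover both cases (just suggested wording):

```
                        /*
                         * After sending the data to the parallel apply worker, or after
                         * serializing the remaining changes to a file, wait for that
                         * worker to finish. This is necessary to maintain commit order
                         * which avoids failures due to transaction dependencies and
                         * deadlocks.
                         */
                        parallel_apply_wait_for_xact_finish(winfo->shared);
```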

05. apply_handle_stream_commit()

```
+                       if (apply_action == TRANS_LEADER_PARTIAL_SERIALIZE)
+                               stream_cleanup_files(MyLogicalRepWorker->subid, xid);
```

I'm not sure whether the stream files should be removed by the LA or by the PAs. Could you tell me why you chose the LA?

===
applyparallelworker.c

06. parallel_apply_can_start()

```
+       if (switching_to_serialize)
+               return false;
```

Could you add a comment here, something like:
Don't start a new parallel apply worker if the leader apply worker is temporarily spilling changes to disk.
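
That is, in code form it might look like:

```
        /*
         * Don't start a new parallel apply worker if the leader apply worker
         * is temporarily serializing changes to disk.
         */
        if (switching_to_serialize)
                return false;
```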

07. parallel_apply_start_worker()

```
+       /*
+        * Set the xact_state flag in the leader instead of the
+        * parallel apply worker to avoid the race condition where the leader has
+        * already started waiting for the parallel apply worker to finish
+        * processing the transaction while the child process has not yet
+        * processed the first STREAM_START and has not set the
+        * xact_state to true.
+        */
```

I think the word "flag" should be reserved for booleans, so this comment should be reworded.
(There are many such code comments; all of them should be adjusted.)
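
For example, the comment above could be reworded as (just a sketch):

```
        /*
         * Set the transaction state (xact_state) in the leader instead of the
         * parallel apply worker to avoid the race condition where the leader
         * has already started waiting for the parallel apply worker to finish
         * processing the transaction while the child process has not yet
         * processed the first STREAM_START and has not yet updated xact_state.
         */
```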


08. parallel_apply_get_unique_id()

```
+/*
+ * Returns the unique id among all parallel apply workers in the subscriber.
+ */
+static uint16
+parallel_apply_get_unique_id()
```

I think this function is inefficient: its cost increases linearly as the number of PAs grows. Perhaps the Bitmapset data structure could be used instead.
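
For instance, the leader could track the ids that are in use in a Bitmapset and hand out the smallest free one. A rough sketch (the variable name is made up, and the set would have to live in a long-lived memory context):

```
#include "nodes/bitmapset.h"

/* ids currently assigned to parallel apply workers (hypothetical) */
static Bitmapset *pa_used_ids = NULL;

/*
 * Returns the unique id among all parallel apply workers in the subscriber.
 */
static uint16
parallel_apply_get_unique_id(void)
{
	int		id = 0;

	/* find the smallest id that is not currently in use */
	while (bms_is_member(id, pa_used_ids))
		id++;

	pa_used_ids = bms_add_member(pa_used_ids, id);

	return (uint16) id;
}

/* ... and on worker exit: pa_used_ids = bms_del_member(pa_used_ids, id); */
```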
 

09. parallel_apply_send_data()

```
#define CHANGES_THRESHOLD    1000
#define SHM_SEND_TIMEOUT_MS    10000
```

I think the timeout may be too long. Could you explain the reasoning behind these values?


10. parallel_apply_send_data()

```
            /*
             * Close the stream file if not in a streaming block, the file will
             * be reopened later.
             */
            if (!stream_apply_worker)
                serialize_stream_stop(winfo->shared->xid);
```

a.
IIUC, the places where the LA tries to send data while stream_apply_worker is NULL are:
* apply_handle_stream_prepare, 
* apply_handle_stream_start, 
* apply_handle_stream_abort, and
* apply_handle_stream_commit.
In these cases the TransApplyAction may be TRANS_LEADER_SEND_TO_PARALLEL. When should the file be closed?

b.
Even if this is needed, I think the called function should be renamed, because the LA may not be handling a STREAM_STOP
message here. How about close_stream_file() or something similar?
 


11. parallel_apply_send_data()

```
            /* Initialize the stream fileset. */
            serialize_stream_start(winfo->shared->xid, true);
```

I think the called function should be renamed, because the LA may not be handling a STREAM_START message here.
How about open_stream_file() or something similar?
 

12. parallel_apply_send_data()

```
        if (++retry >= CHANGES_THRESHOLD)
        {
            MemoryContext oldcontext;
            StringInfoData msg;
...
            initStringInfo(&msg);
            appendBinaryStringInfo(&msg, data, nbytes);
...
            switching_to_serialize = true;
            apply_dispatch(&msg);
            switching_to_serialize = false;

            break;
        }
```

pfree(msg.data) may be needed.
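
For example (a sketch of where it could go):

```
            switching_to_serialize = true;
            apply_dispatch(&msg);
            switching_to_serialize = false;

            /* free the locally copied message */
            pfree(msg.data);

            break;
```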

===
13. worker_internal.h

```
+       pg_atomic_uint32        left_message;
```


ParallelApplyWorkerShared is already protected by a mutex. Why did you add an atomic variable to the data structure?
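
In other words, couldn't it be accessed under the existing mutex like the other members? A rough sketch, assuming the mutex is the usual slock_t and left_message were a plain uint32:

```
        SpinLockAcquire(&winfo->shared->mutex);
        winfo->shared->left_message++;
        SpinLockRelease(&winfo->shared->mutex);
```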

===
14. typedefs.list

ParallelTransState should be added.

===
15. General

I have already mentioned this to you directly, but I am pointing it out here again for the benefit of other members.
I was able to cause a deadlock with two PAs. The deadlock was indeed detected and resolved by the lock manager, but the
output is not very helpful. The following lines are copied from the log; note that the statements executed by the apply
workers are not reported. Can we extend the output, or is that out of scope?
 


```
2022-11-07 11:11:27.449 UTC [11262] ERROR:  deadlock detected
2022-11-07 11:11:27.449 UTC [11262] DETAIL:  Process 11262 waits for AccessExclusiveLock on object 16393 of class 6100 of database 0; blocked by process 11320.
        Process 11320 waits for ShareLock on transaction 742; blocked by process 11266.
        Process 11266 waits for AccessShareLock on object 16393 of class 6100 of database 0; blocked by process 11262.
        Process 11262: <command string not enabled>
        Process 11320: <command string not enabled>
        Process 11266: <command string not enabled>
```


Best Regards,
Hayato Kuroda
FUJITSU LIMITED

