RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From kuroda.hayato@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Date
Msg-id TYAPR01MB5866AD46CD08A314271D900AF5449@TYAPR01MB5866.jpnprd01.prod.outlook.com
In response to RE: Perform streaming logical transactions by background workers and parallel apply  ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>)
Responses Re: Perform streaming logical transactions by background workers and parallel apply
RE: Perform streaming logical transactions by background workers and parallel apply
RE: Perform streaming logical transactions by background workers and parallel apply
List pgsql-hackers
Dear Hou-san,

Thank you for updating the patch! The following are my comments on v28-0001.
I will dig into the patch further, but I am sending these partial comments now to keep the thread active.

===
For applyparallelworker.c

01. filename
The word order of the filename does not seem right,
because you named the new worker "parallel apply worker".

02. global variable

```
+/* Parallel apply workers hash table (initialized on first use). */
+static HTAB *ParallelApplyWorkersHash = NULL;
+
+/*
+ * List that stores the information of parallel apply workers that were
+ * started. Newly added worker information will be removed from the list at the
+ * end of the transaction when there are enough workers in the pool. Besides,
+ * exited workers will be removed from the list after being detected.
+ */
+static List *ParallelApplyWorkersList = NIL;
```

Could you add a description of the difference between the list and the hash table?
IIUC the hash table stores the parallel apply workers that
are assigned to transactions, and the list stores all live ones.


03. parallel_apply_find_worker

```
+       /* Return the cached parallel apply worker if valid. */
+       if (stream_apply_worker != NULL)
+               return stream_apply_worker;
```

This is just a question:
why is the given xid not compared with the xid assigned to the worker here?
Is there any chance of finding the wrong worker?


04. parallel_apply_start_worker

```
+/*
+ * Start a parallel apply worker that will be used for the specified xid.
+ *
+ * If a parallel apply worker is not in use then re-use it, otherwise start a
+ * fresh one. Cache the worker information in ParallelApplyWorkersHash keyed by
+ * the specified xid.
+ */
+void
+parallel_apply_start_worker(TransactionId xid)
```

"parallel_apply_start_worker" should be "start_parallel_apply_worker", I think


05. parallel_apply_stream_abort

```
        for (i = list_length(subxactlist) - 1; i >= 0; i--)
        {
            xid = list_nth_xid(subxactlist, i);
            if (xid == subxid)
            {
                found = true;
                break;
            }
        }
```

Please do not reuse the xid variable here; declare and use another variable in the else block, or something similar.
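
As a rough sketch of what I mean (standalone C, not the patch code): the array-based lookup, the `TransactionId` typedef, and the names `subxact_is_known` and `cur_xid` are all assumptions made for illustration. The point is only that the loop gets its own variable, so the caller's xid is never overwritten.

```c
#include <stdbool.h>
#include <stdint.h>

/* Stand-in for PostgreSQL's TransactionId; an assumption for this sketch. */
typedef uint32_t TransactionId;

/*
 * Search subxacts[] from the end for subxid.
 *
 * The loop uses its own variable (cur_xid), so whatever xid the caller is
 * tracking for the top-level transaction is left untouched.
 */
bool
subxact_is_known(const TransactionId *subxacts, int nsubxacts,
                 TransactionId subxid)
{
    for (int i = nsubxacts - 1; i >= 0; i--)
    {
        TransactionId cur_xid = subxacts[i];

        if (cur_xid == subxid)
            return true;
    }

    return false;
}
```

In the patch itself, the equivalent would be a local declared inside the else block and assigned from list_nth_xid().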

06. parallel_apply_free_worker

```
+       if (napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
+       {
```

Please add a comment like: "Do we have enough workers in the pool?" or something.

===
For worker.c

07. general

In many places an if-else chain is used to dispatch on apply_action, but I think these should be rewritten as switch-case statements.
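
To illustrate the shape I have in mind, here is a minimal standalone sketch. The two `TA_*_PARALLEL_WORKER` values appear in the patch; the enum type name `TransApplyAction`, the value `TA_SERIALIZE_TO_FILE`, and the function `describe_apply_action` are hypothetical names used only for this example.

```c
/* Hypothetical enum; only the two *_PARALLEL_WORKER names are from the patch. */
typedef enum
{
    TA_SERIALIZE_TO_FILE,        /* hypothetical: spill changes to disk */
    TA_APPLY_IN_PARALLEL_WORKER,
    TA_SEND_TO_PARALLEL_WORKER
} TransApplyAction;

/*
 * Dispatch on the action with a switch. There is deliberately no default
 * label, so a compiler with -Wswitch can warn when a new enum value is
 * added but not handled here.
 */
const char *
describe_apply_action(TransApplyAction action)
{
    switch (action)
    {
        case TA_SERIALIZE_TO_FILE:
            return "serialize to file";

        case TA_APPLY_IN_PARALLEL_WORKER:
            return "apply in parallel worker";

        case TA_SEND_TO_PARALLEL_WORKER:
            return "send to parallel worker";
    }

    /* Reached only if an out-of-range value was passed in. */
    return "unknown";
}
```

Compared with an if-else chain, a missing case is much easier to notice this way.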

08. global variable

```
-static bool in_streamed_transaction = false;
+bool in_streamed_transaction = false;
```

a.

It seems that in_streamed_transaction is used only in worker.c, so it can remain a static variable.

b.

That flag is set only when an apply worker spills the transaction to disk.
How about renaming "in_streamed_transaction" to "in_spilled_transaction"?

09.  apply_handle_stream_prepare

```
-       elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
```

I think this debug message is still useful.

10. apply_handle_stream_stop

```
+       if (apply_action == TA_APPLY_IN_PARALLEL_WORKER)
+       {
+               pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
+       }
+       else if (apply_action == TA_SEND_TO_PARALLEL_WORKER)
+       {
```

The ordering of STREAM {STOP, START} messages is checked only when an apply worker spills the transaction to disk
(this is done via in_streamed_transaction).
I think a check should be added for the parallel-worker paths too, like if (!stream_apply_worker) or something.
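
A tiny standalone sketch of the combined check follows. It assumes (IIUC) that stream_apply_worker is non-NULL exactly while a streamed chunk is being handed to a parallel apply worker; the helper name `inside_stream_block` is hypothetical and the worker pointer is passed as an opaque `const void *` only to keep the example self-contained.

```c
#include <stdbool.h>
#include <stddef.h>

/*
 * Return true if we are between STREAM START and STREAM STOP on either
 * path: spilling to disk (in_streamed_transaction) or handing changes to
 * a parallel apply worker (stream_apply_worker != NULL).
 *
 * Both parameters mirror variables in the patch; this helper itself is
 * an illustration and does not exist there.
 */
bool
inside_stream_block(bool in_streamed_transaction,
                    const void *stream_apply_worker)
{
    return in_streamed_transaction || stream_apply_worker != NULL;
}
```

A STREAM STOP handler could then raise the protocol-violation error when this returns false, and the STREAM ABORT / STREAM COMMIT handlers when it returns true.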

11. apply_handle_stream_abort

```
+       if (in_streamed_transaction)
+               ereport(ERROR,
+                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                errmsg_internal("STREAM ABORT message without STREAM STOP")));
```

I think a check on stream_apply_worker should be added here as well.

12. apply_handle_stream_commit

a.

```
    if (in_streamed_transaction)
        ereport(ERROR,
                (errcode(ERRCODE_PROTOCOL_VIOLATION),
                 errmsg_internal("STREAM COMMIT message without STREAM STOP")));
```

I think a check on stream_apply_worker should be added here as well.

b. 

```
-       elog(DEBUG1, "received commit for streamed transaction %u", xid);
```

I think this debug message is still useful.

===
For launcher.c

13. logicalrep_worker_stop_by_slot

```
+       LogicalRepWorker *worker = &LogicalRepCtx->workers[slot_no];
+
+       LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+       /* Return if the generation doesn't match or the worker is not alive. */
+       if (worker->generation != generation ||
+               worker->proc == NULL)
+               return;
+
```

a.

LWLockAcquire(LogicalRepWorkerLock) is needed before reading slots.

b. 

LWLockRelease(LogicalRepWorkerLock) is needed on the early-return path too, i.e. even if the worker is not found.
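
Both points can be sketched in standalone C as below. This is not PostgreSQL code: the counter-based mock_lock_acquire()/mock_lock_release() stand in for LWLockAcquire()/LWLockRelease(), and `MockWorker`, `workers`, and `stop_worker_by_slot` are hypothetical names. The sketch only shows the control flow: take the lock before touching the slot, and release it on every path.

```c
#include <stdbool.h>
#include <stdint.h>

/* Mock lock: a depth counter standing in for LogicalRepWorkerLock. */
static int lock_depth = 0;

static void mock_lock_acquire(void) { lock_depth++; }
static void mock_lock_release(void) { lock_depth--; }

/* Hypothetical, simplified stand-in for a worker slot. */
typedef struct
{
    uint16_t generation;
    bool     alive;             /* stands in for worker->proc != NULL */
} MockWorker;

static MockWorker workers[4];

/*
 * Stop the worker in the given slot if the generation matches and it is
 * still alive. Returns true if such a worker was found.
 */
bool
stop_worker_by_slot(int slot_no, uint16_t generation)
{
    bool        found = false;
    MockWorker *worker;

    /* (a) Take the lock before touching the shared slot array. */
    mock_lock_acquire();

    worker = &workers[slot_no];

    if (worker->generation == generation && worker->alive)
    {
        worker->alive = false;  /* placeholder for the real stop logic */
        found = true;
    }

    /* (b) Single exit point: the lock is released even if not found. */
    mock_lock_release();

    return found;
}
```

With a single exit point, the "not found" case cannot return while still holding the lock.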



Best Regards,
Hayato Kuroda
FUJITSU LIMITED

