RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From kuroda.hayato@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Date
Msg-id TYAPR01MB5866C7ED7046BBFC1B8FDD26F55C9@TYAPR01MB5866.jpnprd01.prod.outlook.com
In response to RE: Perform streaming logical transactions by background workers and parallel apply  ("kuroda.hayato@fujitsu.com" <kuroda.hayato@fujitsu.com>)
Responses RE: Perform streaming logical transactions by background workers and parallel apply  ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>)
Re: Perform streaming logical transactions by background workers and parallel apply  (Amit Kapila <amit.kapila16@gmail.com>)
List pgsql-hackers
Dear Hou,

Here are my comments on v35-0001.

01. catalog.sgml

```
+       Controls how to handle the streaming of in-progress transactions:
+       <literal>f</literal> = disallow streaming of in-progress transactions,
+       <literal>t</literal> = spill the changes of in-progress transactions to
+       disk and apply at once after the transaction is committed on the
+       publisher,
+       <literal>p</literal> = apply changes directly using a parallel apply
+       worker if available (same as 't' if no worker is available)
```

I'm not sure why 't' means "spill the changes to file". Is it for compatibility?

~~~
02. applyworker.c - parallel_apply_stream_abort

The argument abort_data is not modified in the function, so maybe a "const" qualifier should be added.
(Other functions should also be checked...)
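
To illustrate the suggestion, here is a minimal sketch. The struct and the function are hypothetical stand-ins, not the patch's actual definitions; the point is only that a pointer argument the function never writes through can be const-qualified.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the abort data passed to the function. */
typedef struct StreamAbortDataSketch
{
    uint32_t xid;       /* top-level transaction ID */
    uint32_t subxid;    /* ID of the (sub)transaction being aborted */
} StreamAbortDataSketch;

/*
 * The function only reads abort_data, so the pointer is const-qualified.
 * Returns true when the whole top-level transaction is being aborted.
 */
static bool
stream_abort_is_toplevel(const StreamAbortDataSketch *abort_data)
{
    return abort_data->xid == abort_data->subxid;
}
```

With the qualifier in place, an accidental write such as abort_data->xid = 0 inside the function would be rejected by the compiler.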

~~~
03. applyparallelworker.c - parallel_apply_find_worker

```
+       ParallelApplyWorkerEntry *entry = NULL;
```

This may not have to be initialized here.

~~~
04. applyparallelworker.c - HandleParallelApplyMessages

```
+       static MemoryContext hpm_context = NULL;
```

I think "hpm" stands for "handle parallel message", so here it should be "hpam" ("handle parallel apply message").

~~~
05. launcher.c - logicalrep_worker_launch()

```
    if (is_subworker)
        snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
    else
        snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
```

I'm not sure why there are only two bgw_type values even though there are three types of apply workers. Is it for compatibility?

~~~
06. launcher.c - logicalrep_worker_stop_by_slot

An assertion like Assert(slot_no >= 0 && slot_no < max_logical_replication_workers) should be added at the top of this
function.
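
A rough sketch of what I mean follows. It uses the standard assert() in place of Assert(), and the array, the limit variable, and the function name are hypothetical stand-ins rather than the real launcher.c definitions.

```c
#include <assert.h>

/* Hypothetical stand-in for max_logical_replication_workers. */
static int max_workers_sketch = 4;

/* Hypothetical per-slot "in use" flags standing in for the worker array. */
static int worker_in_use_sketch[4] = {1, 0, 1, 0};

/*
 * Sketch of a stop-by-slot function: validate the slot number first,
 * then touch the array. Returns 1 if a worker was stopped, 0 if the
 * slot was already free.
 */
static int
stop_by_slot_sketch(int slot_no)
{
    assert(slot_no >= 0 && slot_no < max_workers_sketch);

    if (!worker_in_use_sketch[slot_no])
        return 0;

    worker_in_use_sketch[slot_no] = 0;
    return 1;
}
```

The check costs nothing in non-assert builds and catches an out-of-range slot number before the array is indexed.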

~~~
07. launcher.c - logicalrep_worker_stop_internal

```
+/*
+ * Workhorse for logicalrep_worker_stop(), logicalrep_worker_detach() and
+ * logicalrep_worker_stop_by_slot(). Stop the worker and wait for it to die.
+ */
+static void
+logicalrep_worker_stop_internal(LogicalRepWorker *worker)
```

I think logicalrep_worker_stop_internal() may not be the "workhorse" for logicalrep_worker_detach(). In that function the
internal function is called for parallel apply workers, and it is not the main part of the detach function.

~~~
08. worker.c - handle_streamed_transaction()

```
+       TransactionId current_xid = InvalidTransactionId;
```

This initialization is not needed. The variable is not used in non-streaming mode, and otherwise it is assigned before use.

~~~
09. worker.c - handle_streamed_transaction()

```
+               case TRANS_PARALLEL_APPLY:
+                       /* Define a savepoint for a subxact if needed. */
+                       parallel_apply_start_subtrans(current_xid, stream_xid);
+                       return false;
```

Following the other case blocks, Assert(am_parallel_apply_worker()) could be added at the top of this part.
The same suggestion applies to the other switch-case statements.

~~~
10. worker.c - apply_handle_stream_start

```
+ *
+ * XXX We can avoid sending pair of the START/STOP messages to the parallel
+ * worker because unlike apply worker it will process only one
+ * transaction-at-a-time. However, it is not clear whether that is worth the
+ * effort because it is sent after logical_decoding_work_mem changes.
```

I can understand that the START message is not needed, but is STOP really removable? If the leader does not send STOP to its
child, does it lose the chance to change the worker state to IDLE_IN_TRANSACTION?

~~~
11. worker.c - apply_handle_stream_start

Currently the number of received chunks is not counted, but it could be if a variable "nchunks" were defined and
incremented in apply_handle_stream_start(). This info may be useful for determining an appropriate
logical_decoding_work_mem for workloads. What do you think?
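
In its simplest form the idea could look like the sketch below. The function names are hypothetical stand-ins for the stream start and commit/abort handlers, and the sketch assumes one counter for a single streamed transaction, ignoring interleaved transactions.

```c
#include <stdint.h>

/* Hypothetical counter: stream chunks received for the current transaction. */
static uint64_t nchunks = 0;

/* Stand-in for apply_handle_stream_start(): count each chunk that begins. */
static void
stream_start_sketch(void)
{
    nchunks++;
}

/*
 * Stand-in for the commit/abort handling: hand back the number of chunks
 * received (e.g. for a log message) and reset the counter.
 */
static uint64_t
stream_finish_sketch(void)
{
    uint64_t received = nchunks;

    nchunks = 0;
    return received;
}
```

A transaction that arrives in many chunks would suggest that logical_decoding_work_mem is small for that workload.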
 

~~~
12. worker.c - get_transaction_apply_action

{} are not needed.


Best Regards,
Hayato Kuroda
FUJITSU LIMITED

