RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From Hayato Kuroda (Fujitsu)
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Msg-id TYAPR01MB5866235A3B754897C03BCA1AF5179@TYAPR01MB5866.jpnprd01.prod.outlook.com
In response to RE: Perform streaming logical transactions by background workers and parallel apply  ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>)
Responses Re: Perform streaming logical transactions by background workers and parallel apply  (Amit Kapila <amit.kapila16@gmail.com>)
RE: Perform streaming logical transactions by background workers and parallel apply  ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>)
List pgsql-hackers
Dear Hou,

Thanks for making the patch. The following are my comments on v54-0003 and 0004.

0003

pa_free_worker()

```
+       /* Unlink any files that were needed to serialize partial changes. */
+       if (winfo->serialize_changes)
+               stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);
+
```

I think this part is not needed, because the leader apply worker (LA) cannot reach here if winfo->serialize_changes is true. Moreover,
stream_cleanup_files() is already called in pa_free_worker_info().
 

LogicalParallelApplyLoop()

The parallel apply worker wakes up every 0.1s even if we are in the PARTIAL_SERIALIZE mode. Do you have any idea how to reduce
that?

```
+                       pa_spooled_messages();
```

A comment is needed here, something like "Changes may be serialized...".

pa_stream_abort()

```
+                               /*
+                                * Reopen the file and set the file position to the saved
+                                * position.
+                                */
+                               if (reopen_stream_fd)
+                               {
+                                       char            path[MAXPGPATH];
+
+                                       changes_filename(path, MyLogicalRepWorker->subid, xid);
+                                       stream_fd = BufFileOpenFileSet(&MyParallelShared->fileset,
+                                                                                                  path, O_RDONLY, false);
+                                       BufFileSeek(stream_fd, fileno, offset, SEEK_SET);
+                               }
```

MyParallelShared->serialize_changes may be used instead of reopen_stream_fd.


worker.c

```
-#include "storage/buffile.h"
```

I think this include should not be removed.


handle_streamed_transaction()

```
+                       if (apply_action == TRANS_LEADER_SEND_TO_PARALLEL)
+                               pa_send_data(winfo, s->len, s->data);
+                       else
+                               stream_write_change(action, &original_msg);
```

Comments are needed here; 0001 has them, but they were removed in 0002.
The same applies to some similar lines elsewhere.


```
+                       /*
+                        * It is possible that while sending this change to parallel apply
+                        * worker we need to switch to serialize mode.
+                        */
+                       if (winfo->serialize_changes)
+                               pa_set_fileset_state(winfo->shared, FS_READY);
```

The same code appears in three places; can we combine them into a common function?
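
As a rough illustration of what such a common function could look like, here is a standalone sketch that compiles outside the PostgreSQL tree. The helper name pa_mark_fileset_ready_if_serialized, the "Sketch" types, and the FS_NOT_READY value are assumptions made up for this example; only serialize_changes, pa_set_fileset_state() and FS_READY come from the patch, and the real definitions differ.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Stand-in types: simplified assumptions, not the patch's real definitions. */
typedef enum
{
	FS_NOT_READY,				/* hypothetical "anything but ready" state */
	FS_READY
} FileSetStateSketch;

typedef struct
{
	FileSetStateSketch fileset_state;
} ParallelApplyWorkerSharedSketch;

typedef struct
{
	bool		serialize_changes;
	ParallelApplyWorkerSharedSketch *shared;
} ParallelApplyWorkerInfoSketch;

/* Stand-in for pa_set_fileset_state(); it only records the new state. */
static void
pa_set_fileset_state_sketch(ParallelApplyWorkerSharedSketch *shared,
							FileSetStateSketch state)
{
	shared->fileset_state = state;
}

/*
 * Hypothetical helper: if sending the change made the leader switch to
 * serialize mode, tell the parallel apply worker that the file is ready.
 * Returns true if the state was set.
 */
static bool
pa_mark_fileset_ready_if_serialized(ParallelApplyWorkerInfoSketch *winfo)
{
	assert(winfo != NULL && winfo->shared != NULL);

	if (!winfo->serialize_changes)
		return false;

	pa_set_fileset_state_sketch(winfo->shared, FS_READY);
	return true;
}
```

Each of the three call sites would then shrink to a single call, and the explanatory comment would live in one place.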

apply_spooled_messages()

```
+               /*
+                * Break the loop if the parallel apply worker has finished applying
+                * the transaction. The parallel apply worker should have closed the
+                * file before committing.
+                */
+               if (am_parallel_apply_worker() &&
+                       MyParallelShared->xact_state == PARALLEL_TRANS_FINISHED)
+                       goto done;
```

I think pfree(buffer) and pfree(s2.data) should not be skipped.
Also, this part should be placed below "nchanges++;".
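
To illustrate the concern, here is a minimal standalone sketch of a replay loop whose early exit still reaches the frees and is checked only after the counter is updated. It uses malloc/free in place of palloc/pfree; the function names, the finished_after parameter, and the allocation counter are hypothetical, and this is not the patch's code.

```c
#include <assert.h>
#include <stdlib.h>

/* Counts live allocations so a leak on the early-exit path is visible. */
static int	live_allocs = 0;

static void *
sketch_alloc(size_t size)
{
	void	   *p = malloc(size);

	assert(p != NULL);
	live_allocs++;
	return p;
}

static void
sketch_free(void *p)
{
	free(p);
	live_allocs--;
}

/*
 * Hypothetical replay loop. "finished_after" stands in for the
 * xact_state == PARALLEL_TRANS_FINISHED check: the loop stops once that
 * many changes have been applied. Returns the number of changes applied.
 */
static int
replay_changes_sketch(int total_changes, int finished_after)
{
	char	   *buffer = sketch_alloc(64);	/* stands in for "buffer" */
	char	   *s2_data = sketch_alloc(64); /* stands in for "s2.data" */
	int			nchanges = 0;

	while (nchanges < total_changes)
	{
		/* ... read one change into buffer and apply it ... */
		nchanges++;

		/* Early-exit check placed after the counter is updated. */
		if (nchanges >= finished_after)
			break;
	}

	/* Both exit paths reach the frees, so nothing is skipped. */
	sketch_free(buffer);
	sketch_free(s2_data);
	return nchanges;
}
```

Leaving the loop with break, or jumping to a label placed before the frees, keeps the cleanup on every path.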


0004

set_subscription_retry()

```
+       LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
+                                        AccessShareLock);
+
```

I think AccessExclusiveLock should be acquired instead of AccessShareLock.
In AlterSubscription(), LockSharedObject(AccessExclusiveLock) seems to be used.
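
For reference, the difference between the two modes can be sketched as a tiny conflict check. Only these two of PostgreSQL's eight lock modes are modeled, and the enum and function names are made up for this example; the real conflict table lives in the lock manager.

```c
#include <assert.h>
#include <stdbool.h>

/* Only the two lock modes discussed here; names are hypothetical. */
typedef enum
{
	SKETCH_ACCESS_SHARE,
	SKETCH_ACCESS_EXCLUSIVE
} SketchLockMode;

/*
 * AccessShareLock conflicts only with AccessExclusiveLock, while
 * AccessExclusiveLock conflicts with every mode, including itself.
 */
static bool
sketch_locks_conflict(SketchLockMode held, SketchLockMode requested)
{
	assert(held == SKETCH_ACCESS_SHARE || held == SKETCH_ACCESS_EXCLUSIVE);
	assert(requested == SKETCH_ACCESS_SHARE ||
		   requested == SKETCH_ACCESS_EXCLUSIVE);

	return held == SKETCH_ACCESS_EXCLUSIVE ||
		requested == SKETCH_ACCESS_EXCLUSIVE;
}
```

Under this model, two sessions that each take AccessShareLock on the same subscription do not block each other, whereas a session requesting AccessExclusiveLock waits for every other holder.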

Best Regards,
Hayato Kuroda
FUJITSU LIMITED

