Re: Logical Replication of sequences - Mailing list pgsql-hackers

From Chao Li
Subject Re: Logical Replication of sequences
Date
Msg-id BABE39BA-6893-4D65-8432-5523960FFC2B@gmail.com
In response to Re: Logical Replication of sequences  (vignesh C <vignesh21@gmail.com>)
List pgsql-hackers
> On Oct 24, 2025, at 23:22, vignesh C <vignesh21@gmail.com> wrote:
>
> Regards,
> Vignesh
>
<v20251024-0001-Rename-sync_error_count-to-tbl_sync_error_.patch><v20251024-0002-Add-worker-type-argument-to-logicalrep_wor.patch><v20251024-0003-New-worker-for-sequence-synchronization-du.patch><v20251024-0004-Documentation-for-sequence-synchronization.patch>


The changes in 0001 are straightforward and look good. I haven’t reviewed 0004 yet. I have a few comments on 0002 and 0003.

1 - 0002
```
  * We are only interested in the leader apply worker or table sync worker.
+ * For apply workers, the relid should be set to InvalidOid, as they manage
+ * changes across all tables and sequences. For table sync workers, the relid
+ * should be set to the OID of the relation being synchronized.
  */
 LogicalRepWorker *
-logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
+logicalrep_worker_find(Oid subid, Oid relid, LogicalRepWorkerType wtype,
+                       bool only_running)
 {
     int            i;
     LogicalRepWorker *res = NULL;

     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
```

The comment says that “for apply workers, the relid should be set to InvalidOid”, so is it worth adding an assert for
that?
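
To illustrate the kind of check I mean, here is a minimal standalone sketch. The type definitions are stand-ins and the helper name worker_find_args_ok is made up for illustration; none of this is the real code from the tree. In the patch, I would expect this to be a single Assert() near the top of logicalrep_worker_find() rather than a separate function.

```c
#include <assert.h>
#include <stdbool.h>

/* Stand-in definitions for illustration only; not the real PostgreSQL ones. */
typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)
#define OidIsValid(oid) ((oid) != InvalidOid)

typedef enum LogicalRepWorkerType
{
    WORKERTYPE_TABLESYNC,
    WORKERTYPE_APPLY
} LogicalRepWorkerType;

/*
 * Hypothetical helper: returns true when relid is consistent with wtype,
 * i.e. apply workers are always looked up with InvalidOid.
 */
static bool
worker_find_args_ok(Oid relid, LogicalRepWorkerType wtype)
{
    if (wtype == WORKERTYPE_APPLY)
        return !OidIsValid(relid);

    /* Other worker types are not restricted by this sketch. */
    return true;
}
```

Something along the lines of Assert(wtype != WORKERTYPE_APPLY || !OidIsValid(relid)) would express the same condition inline.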

2 - 0002
```
-    /* Search for attached worker for a given subscription id. */
+    /* Search for the attached worker matching the specified criteria. */
     for (i = 0; i < max_logical_replication_workers; i++)
     {
```

A minor issue with the comment:

* We are not searching for a specific worker, so “the” should be “a”.
* “attached” is confusing. In the old comment, “attached” was tied to “a given subscription id”, but now, attached to what?

So my suggested revision is:

“/* Search for a logical replication worker matching the specified criteria */”

3 - 0002
```
 /*
  * Stop the logical replication worker for subid/relid, if any.
+ *
+ * Similar to logicalrep_worker_find, relid should be set to a valid OID only
+ * for table sync workers.
  */
 void
-logicalrep_worker_stop(Oid subid, Oid relid)
+logicalrep_worker_stop(Oid subid, Oid relid, LogicalRepWorkerType wtype)
```

The comment should be updated: subid/relid => subid/relid/wtype.

4 - 0002
```
@@ -477,7 +477,8 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

             syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
-                                                rstate->relid, false);
+                                                rstate->relid,
+                                                WORKERTYPE_TABLESYNC, true);
```

Why was only_running changed from false to true? This commit adds a new worker type, but it doesn’t seem intended to
change the existing logic.

5 - 0003
```
+/*
+ * Reset the last_seqsync_start_time of the sequencesync worker in the
+ * subscription's apply worker.
+ */
+void
+logicalrep_reset_seqsync_start_time(void)
+{
+    LogicalRepWorker *worker;
+
+    LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+    /*
+     * Set the last_seqsync_start_time for the sequence worker in the apply
+     * worker instead of the sequence sync worker, as the sequence sync worker
+     * has finished and is about to exit.
+     */
+    worker = logicalrep_worker_find(MyLogicalRepWorker->subid, InvalidOid,
+                                    WORKERTYPE_APPLY, true);
+    if (worker)
+        worker->last_seqsync_start_time = 0;
+
+    LWLockRelease(LogicalRepWorkerLock);
+}
```

Two comments for this new function:

* The function comment and the in-code comment are redundant. I suggest moving the in-code comment into the function comment.
* Why is LW_SHARED used? We are writing worker->last_seqsync_start_time, so shouldn’t LW_EXCLUSIVE be used?

6 - 0003
```
+    /*
+     * Count running sync workers for this subscription, while we have the
+     * lock.
+     */
+    nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+    LWLockRelease(LogicalRepWorkerLock);
+
+    launch_sync_worker(nsyncworkers, InvalidOid,
+                       &MyLogicalRepWorker->last_seqsync_start_time);
```

I think there could be a race condition here. Because the lock is acquired in LW_SHARED mode, multiple callers may get
the same nsyncworkers. Each caller then launches a sync worker based on a value that may already be stale, because
another worker might be started between LWLockRelease() and launch_sync_worker().

But if that is not the case, and only one caller can ever call ProcessSyncingSequencesForApply(), then why is the lock
needed?

7 - 0003
```
+    if (insuffperm_seqs->len)
+    {
+        appendStringInfo(combined_error_detail, "Insufficient permission for sequence(s): (%s)",
+                         insuffperm_seqs->data);
+        appendStringInfoString(combined_error_hint, "Grant permissions for the sequence(s).");
+    }
```

“Grant permissions” is unclear. Should it be “Grant UPDATE privilege”?

8 - 0003
```
+            appendStringInfoString(combined_error_hint, " For mismatched sequences, alter or re-create local sequences to have matching parameters as publishers.");
```

“To have matching parameters as publishers” does not read well grammatically. Maybe revise it to “to match the
publisher’s parameters”.

9 - 0003
```
+        /*
+         * current_indexes is not incremented sequentially because some
+         * sequences may be missing, and the number of fetched rows may not
+         * match the batch size. The `hash_search` with HASH_REMOVE takes care
+         * of the count.
+         */
```

Typo: current_indexes => current_index

10 - 0003
```
-    /* Find the leader apply worker and signal it. */
-    logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+    /*
+     * This is a clean exit of the sequencesync worker; reset the
+     * last_seqsync_start_time.
+     */
+    if (wtype == WORKERTYPE_SEQUENCESYNC)
+        logicalrep_reset_seqsync_start_time();
+    else
+        /* Find the leader apply worker and signal it. */
+        logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
```

The comment “This is a clean exit of the sequencesync worker” is specific to the “if” branch, so I suggest moving it
into the “if”. Also, the “This is a clean exit of the sequencesync worker” part is not really needed; it would be enough
to keep the comment consistent with the one in the “else”.
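
Roughly, the layout I have in mind looks like the standalone sketch below. The stub functions, the call counters, and the wrapper name finish_sync_worker_tail are made up so the snippet compiles on its own; only the placement of the two comments is the point.

```c
#include <assert.h>

/* Stand-ins for illustration only; the real definitions live in the patch. */
typedef enum LogicalRepWorkerType
{
    WORKERTYPE_TABLESYNC,
    WORKERTYPE_SEQUENCESYNC
} LogicalRepWorkerType;

/* Counters so the stubs have an observable effect. */
static int reset_calls = 0;
static int wakeup_calls = 0;

/* Stub standing in for logicalrep_reset_seqsync_start_time(). */
static void
reset_seqsync_start_time_stub(void)
{
    reset_calls++;
}

/* Stub standing in for logicalrep_worker_wakeup(subid, InvalidOid). */
static void
worker_wakeup_stub(void)
{
    wakeup_calls++;
}

/* Hypothetical wrapper showing each comment inside the branch it describes. */
static void
finish_sync_worker_tail(LogicalRepWorkerType wtype)
{
    if (wtype == WORKERTYPE_SEQUENCESYNC)
    {
        /* Reset the last_seqsync_start_time. */
        reset_seqsync_start_time_stub();
    }
    else
    {
        /* Find the leader apply worker and signal it. */
        worker_wakeup_stub();
    }
}
```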

11 - 0003
```
+void
+launch_sync_worker(int nsyncworkers, Oid relid, TimestampTz *last_start_time)
+{
+    /* If there is a free sync worker slot, start a new sync worker */
+    if (nsyncworkers < max_sync_workers_per_subscription)
+    {
```

The entire function body is under an “if”, so we could do “if (!…) return” instead, which saves a level of indentation.
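
As a standalone sketch of the early-return shape: the stub, the counter, the simplified signature, and the value given to max_sync_workers_per_subscription below are assumptions for illustration, and the real function of course takes relid and last_start_time as well.

```c
#include <assert.h>

/* Stand-in for the GUC; the value is arbitrary and for illustration only. */
static int max_sync_workers_per_subscription = 2;

/* Hypothetical stub that records launches instead of starting a worker. */
static int launches = 0;

static void
start_sync_worker_stub(void)
{
    launches++;
}

/* Simplified, hypothetical variant of launch_sync_worker(). */
static void
launch_sync_worker_sketch(int nsyncworkers)
{
    /* Nothing to do if there is no free sync worker slot. */
    if (nsyncworkers >= max_sync_workers_per_subscription)
        return;

    /* There is a free slot, so start a new sync worker. */
    start_sync_worker_stub();
}
```

The rest of the body then sits at the top indentation level of the function.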

Best regards,
—
Chao Li (Evan)
HighGo Software Co., Ltd.
https://www.highgo.com/






