Re: Logical Replication of sequences - Mailing list pgsql-hackers
| From | Chao Li |
|---|---|
| Subject | Re: Logical Replication of sequences |
| Date | |
| Msg-id | BABE39BA-6893-4D65-8432-5523960FFC2B@gmail.com |
| In response to | Re: Logical Replication of sequences (vignesh C <vignesh21@gmail.com>) |
| List | pgsql-hackers |
> On Oct 24, 2025, at 23:22, vignesh C <vignesh21@gmail.com> wrote:
>
> Regards,
> Vignesh
>
<v20251024-0001-Rename-sync_error_count-to-tbl_sync_error_.patch><v20251024-0002-Add-worker-type-argument-to-logicalrep_wor.patch><v20251024-0003-New-worker-for-sequence-synchronization-du.patch><v20251024-0004-Documentation-for-sequence-synchronization.patch>
The changes in 0001 are straightforward and look good. I haven’t reviewed 0004 yet. I have a few comments on 0002 and 0003.
1 - 0002
```
* We are only interested in the leader apply worker or table sync worker.
+ * For apply workers, the relid should be set to InvalidOid, as they manage
+ * changes across all tables and sequences. For table sync workers, the relid
+ * should be set to the OID of the relation being synchronized.
*/
LogicalRepWorker *
-logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
+logicalrep_worker_find(Oid subid, Oid relid, LogicalRepWorkerType wtype,
+ bool only_running)
{
int i;
LogicalRepWorker *res = NULL;
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
```
The comment says that “for apply workers, the relid should be set to InvalidOid”, so is it worth adding an assert for that?
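A minimal sketch of what such an assertion could check. It uses simplified stand-ins (not PostgreSQL's real headers) for `Oid`, `InvalidOid`, `OidIsValid` and `LogicalRepWorkerType`, and the standard `assert()` in place of PostgreSQL's `Assert()`. The names `worker_find_args_valid` and `check_worker_find_args` are hypothetical; they only encode the contract stated in the new function comment.

```c
#include <assert.h>
#include <stdbool.h>

/* Simplified stand-ins for PostgreSQL definitions (assumption, not the real headers). */
typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)
#define OidIsValid(objectId) ((bool) ((objectId) != InvalidOid))

typedef enum LogicalRepWorkerType
{
	WORKERTYPE_UNKNOWN = 0,
	WORKERTYPE_TABLESYNC,
	WORKERTYPE_APPLY,
	WORKERTYPE_PARALLEL_APPLY,
	WORKERTYPE_SEQUENCESYNC
} LogicalRepWorkerType;

/*
 * Hypothetical helper encoding the documented contract: apply workers are
 * looked up with InvalidOid, table sync workers with a valid relation OID.
 */
static bool
worker_find_args_valid(Oid relid, LogicalRepWorkerType wtype)
{
	if (wtype == WORKERTYPE_APPLY)
		return !OidIsValid(relid);
	if (wtype == WORKERTYPE_TABLESYNC)
		return OidIsValid(relid);
	return true;			/* other worker types are not constrained here */
}

/* Where the check would sit at the top of logicalrep_worker_find(). */
static void
check_worker_find_args(Oid subid, Oid relid, LogicalRepWorkerType wtype)
{
	(void) subid;
	assert(worker_find_args_valid(relid, wtype));
}
```

In the patch itself this would collapse to a single line such as `Assert(wtype != WORKERTYPE_APPLY || !OidIsValid(relid));`.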
2 - 0002
```
- /* Search for attached worker for a given subscription id. */
+ /* Search for the attached worker matching the specified criteria. */
for (i = 0; i < max_logical_replication_workers; i++)
{
```
Minor issues with the comment:
* We are not searching for a specific worker, so “the” should be “a”.
* “attached” is confusing. In the old comment, “attached” was tied to “a given subscription id”, but now, attached to what?
So, suggested revision:
“/* Search for a logical replication worker matching the specified criteria */”
3 - 0002
```
/*
* Stop the logical replication worker for subid/relid, if any.
+ *
+ * Similar to logicalrep_worker_find, relid should be set to a valid OID only
+ * for table sync workers.
*/
void
-logicalrep_worker_stop(Oid subid, Oid relid)
+logicalrep_worker_stop(Oid subid, Oid relid, LogicalRepWorkerType wtype)
```
The comment should be updated: subid/relid => subid/relid/wtype.
4 - 0002
```
@@ -477,7 +477,8 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
- rstate->relid, false);
+ rstate->relid,
+ WORKERTYPE_TABLESYNC, true);
```
Why was only_running changed from false to true? This commit adds a new worker type and is not meant to change the existing logic.
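To make the behavioral difference concrete, here is a simplified, self-contained model of the slot-matching test in logicalrep_worker_find(). The `(!only_running || w->proc)` condition reflects my reading of the existing launcher code, but `WorkerSlot` and `find_worker` are hypothetical stand-ins for LogicalRepWorker and the real function. With only_running = false, a slot that is in use but whose process has not attached yet still matches; with true it does not, so a table sync worker that is still starting up would no longer be found at this call site.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;

/* Hypothetical stand-in for the LogicalRepWorker fields used by the lookup. */
typedef struct WorkerSlot
{
	bool		in_use;		/* slot has been reserved by the launcher */
	void	   *proc;		/* NULL until the worker process attaches */
	Oid			subid;
	Oid			relid;
} WorkerSlot;

/*
 * Simplified model of the matching loop. The real function also asserts
 * that LogicalRepWorkerLock is held; that is omitted here.
 */
static WorkerSlot *
find_worker(WorkerSlot *slots, int nslots, Oid subid, Oid relid,
			bool only_running)
{
	assert(slots != NULL || nslots == 0);

	for (int i = 0; i < nslots; i++)
	{
		WorkerSlot *w = &slots[i];

		/* only_running = true additionally requires an attached process. */
		if (w->in_use && w->subid == subid && w->relid == relid &&
			(!only_running || w->proc != NULL))
			return w;
	}
	return NULL;
}
```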
5 - 0003
```
+/*
+ * Reset the last_seqsync_start_time of the sequencesync worker in the
+ * subscription's apply worker.
+ */
+void
+logicalrep_reset_seqsync_start_time(void)
+{
+ LogicalRepWorker *worker;
+
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+ /*
+ * Set the last_seqsync_start_time for the sequence worker in the apply
+ * worker instead of the sequence sync worker, as the sequence sync worker
+ * has finished and is about to exit.
+ */
+ worker = logicalrep_worker_find(MyLogicalRepWorker->subid, InvalidOid,
+ WORKERTYPE_APPLY, true);
+ if (worker)
+ worker->last_seqsync_start_time = 0;
+
+ LWLockRelease(LogicalRepWorkerLock);
+}
```
Two comments on this new function:
* The function comment and the in-code comment are redundant. I suggest moving the in-code comment into the function comment.
* Why is LW_SHARED used? We are writing worker->last_seqsync_start_time, so shouldn’t LW_EXCLUSIVE be used?
6 - 0003
```
+ /*
+ * Count running sync workers for this subscription, while we have the
+ * lock.
+ */
+ nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+ LWLockRelease(LogicalRepWorkerLock);
+
+ launch_sync_worker(nsyncworkers, InvalidOid,
+ &MyLogicalRepWorker->last_seqsync_start_time);
```
I think there could be a race condition here. Because the lock is acquired in LW_SHARED mode, multiple callers may get the same nsyncworkers. The sync worker is then launched based on a possibly stale nsyncworkers, because another worker might be started between LWLockRelease() and launch_sync_worker().
If, on the other hand, only one caller can ever call ProcessSyncingSequencesForApply(), then why is the lock needed?
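One general way to close such a window, shown as a simplified model rather than the patch's code: do the limit check and the slot reservation inside the same exclusive critical section, so two callers cannot both act on the same count. A pthread mutex stands in for LogicalRepWorkerLock, and `try_reserve_sync_slot`, `running_sync_workers` and `max_sync_workers` are hypothetical names standing in for logicalrep_sync_worker_count() and max_sync_workers_per_subscription.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical stand-ins for the shared worker state and the GUC limit. */
static pthread_mutex_t worker_lock = PTHREAD_MUTEX_INITIALIZER;
static int running_sync_workers = 0;
static int max_sync_workers = 2;

/*
 * Check the limit and reserve a slot atomically: both happen while the
 * lock is held, so a concurrent caller always sees the updated count.
 */
static bool
try_reserve_sync_slot(void)
{
	bool		reserved = false;

	pthread_mutex_lock(&worker_lock);
	if (running_sync_workers < max_sync_workers)
	{
		running_sync_workers++;
		reserved = true;
	}
	assert(running_sync_workers <= max_sync_workers);
	pthread_mutex_unlock(&worker_lock);

	return reserved;
}
```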
7 - 0003
```
+ if (insuffperm_seqs->len)
+ {
+ appendStringInfo(combined_error_detail, "Insufficient permission for sequence(s): (%s)",
+ insuffperm_seqs->data);
+ appendStringInfoString(combined_error_hint, "Grant permissions for the sequence(s).");
+ }
```
“Grant permissions” is unclear. Should it be “Grant UPDATE privilege”?
8 - 0003
```
+ appendStringInfoString(combined_error_hint, " For mismatched sequences, alter or re-create local sequences to have matching parameters as publishers.");
```
“To have matching parameters as publishers” is grammatically awkward. Maybe revise it to “to match the publisher’s parameters”.
9 - 0003
```
+ /*
+ * current_indexes is not incremented sequentially because some
+ * sequences may be missing, and the number of fetched rows may not
+ * match the batch size. The `hash_search` with HASH_REMOVE takes care
+ * of the count.
+ */
```
Typo: current_indexes => current_index
10 - 0003
```
- /* Find the leader apply worker and signal it. */
- logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+ /*
+ * This is a clean exit of the sequencesync worker; reset the
+ * last_seqsync_start_time.
+ */
+ if (wtype == WORKERTYPE_SEQUENCESYNC)
+ logicalrep_reset_seqsync_start_time();
+ else
+ /* Find the leader apply worker and signal it. */
+ logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
```
The comment “This is a clean exit of the sequencesync worker” is specific to the “if” branch, so I suggest moving it into the “if”. Also, the “This is a clean exit of the sequencesync worker” part is not needed; keep the comment short, consistent with the one in the “else” branch.
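For illustration, the suggested layout with each comment inside its own branch. The stub functions, the counters and the name `finish_sync_worker_tail` are hypothetical stand-ins so the fragment compiles on its own; in the patch the two calls are logicalrep_reset_seqsync_start_time() and logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid).

```c
#include <assert.h>

typedef enum LogicalRepWorkerType
{
	WORKERTYPE_UNKNOWN = 0,
	WORKERTYPE_TABLESYNC,
	WORKERTYPE_APPLY,
	WORKERTYPE_PARALLEL_APPLY,
	WORKERTYPE_SEQUENCESYNC
} LogicalRepWorkerType;

/* Hypothetical stubs that record which branch ran. */
static int reset_calls = 0;
static int wakeup_calls = 0;

static void
reset_seqsync_start_time_stub(void)
{
	reset_calls++;
}

static void
worker_wakeup_stub(void)
{
	wakeup_calls++;
}

static void
finish_sync_worker_tail(LogicalRepWorkerType wtype)
{
	assert(wtype == WORKERTYPE_TABLESYNC || wtype == WORKERTYPE_SEQUENCESYNC);

	if (wtype == WORKERTYPE_SEQUENCESYNC)
	{
		/* Reset the last_seqsync_start_time. */
		reset_seqsync_start_time_stub();
	}
	else
	{
		/* Find the leader apply worker and signal it. */
		worker_wakeup_stub();
	}
}
```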
11 - 0003
```
+void
+launch_sync_worker(int nsyncworkers, Oid relid, TimestampTz *last_start_time)
+{
+ /* If there is a free sync worker slot, start a new sync worker */
+ if (nsyncworkers < max_sync_workers_per_subscription)
+ {
```
The entire function body is under an “if”, so we can use “if (!…) return;” instead, which saves a level of indentation.
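A sketch of the early-return form. `max_sync_workers_per_subscription` is a plain static stand-in for the GUC, and the `launches` counter is a hypothetical placeholder for the rest of the body, which the excerpt does not show and which is therefore elided here.

```c
#include <assert.h>

typedef unsigned int Oid;
typedef long long TimestampTz;

/* Hypothetical stand-ins for the GUC and for the real launch logic. */
static int max_sync_workers_per_subscription = 2;
static int launches = 0;

static void
launch_sync_worker(int nsyncworkers, Oid relid, TimestampTz *last_start_time)
{
	assert(nsyncworkers >= 0);

	/* If there is no free sync worker slot, there is nothing to do. */
	if (nsyncworkers >= max_sync_workers_per_subscription)
		return;

	/* ... former body of the "if", now one indentation level shallower ... */
	(void) relid;
	(void) last_start_time;
	launches++;
}
```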
Best regards,
—
Chao Li (Evan)
HighGo Software Co., Ltd.
https://www.highgo.com/