Re: Logical Replication of sequences - Mailing list pgsql-hackers

From vignesh C
Subject Re: Logical Replication of sequences
Msg-id CALDaNm18siwD6Mamv8Dd8ubwSCw3Fi6SnB4B3Lr+4R7snLkfeA@mail.gmail.com
In response to Re: Logical Replication of sequences  (Peter Smith <smithpb2250@gmail.com>)
Responses Re: Logical Replication of sequences
List pgsql-hackers
On Wed, 29 Oct 2025 at 08:17, Peter Smith <smithpb2250@gmail.com> wrote:
>
> Hi Vignesh,
>
> Some more review comments for v20251029-0001.
>
> ======
> .../replication/logical/sequencesync.c
>
> 1.
> + * The apply worker periodically scans pg_subscription_rel for sequences in
> + * INIT state. When such sequences are found, it spawns a
> + * sequencesync worker to handle synchronization.
>
> I did not see anything in this header comment that says there is
> currently no more than 1 sequencesync worker. The above part of the
> comment doesn't make that clear.

Modified

> ~~~
>
> 2.
> + * Handle sequence synchronization cooperation from the apply worker.
>
> Is it simpler just to say:
> Apply worker determines if sequence synchronization is needed.

Modified

> ~~~
>
> report_error_sequences:
>
> 3.
> +static void
> +report_error_sequences(StringInfo insuffperm_seqs, StringInfo mismatched_seqs,
> +    StringInfo missing_seqs)
>
> Function name seems strange. How about 'ereport_sequence_errors'?

I renamed it to report_sequence_errors, which I prefer over ereport_sequence_errors.

> ~~~
>
> 4.
> + if (mismatched_seqs->len)
> + {
> + if (insuffperm_seqs->len)
> + {
> + appendStringInfo(combined_error_detail, "; mismatched sequence(s) on subscriber: (%s)",
> + mismatched_seqs->data);
> + appendStringInfoString(combined_error_hint, " For mismatched sequences, alter or re-create local sequences to match the publisher's parameters.");
> + }
> + else
> + {
> + appendStringInfo(combined_error_detail, "Mismatched sequence(s) on subscriber: (%s)",
> + mismatched_seqs->data);
> + appendStringInfoString(combined_error_hint, "For mismatched sequences, alter or re-create local sequences to match the publisher's parameters.");
> + }
> + }
> +
> + if (missing_seqs->len)
> + {
> + if (insuffperm_seqs->len || mismatched_seqs->len)
> + {
> + appendStringInfo(combined_error_detail, "; missing sequence(s) on publisher: (%s)",
> + missing_seqs->data);
> + appendStringInfoString(combined_error_hint, " For missing sequences, remove them locally or run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to refresh the subscription.");
> + }
> + else
> + {
> + appendStringInfo(combined_error_detail, "Missing sequence(s) on publisher: (%s)",
> + missing_seqs->data);
> + appendStringInfoString(combined_error_hint, "For missing sequences, remove them locally or run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to refresh the subscription.");
> + }
> + }
>
>
> This logic has a lot of duplication just to handle the separate
> multiple details/hints. I think it can be simplified a lot.
>
> SUGGESTION
> if (mismatched_seqs->len)
> {
>   if (combined_error_detail->len)
>   {
>     appendStringInfo(combined_error_detail, "; ");
>     appendStringInfoChar(combined_error_hint, ' ');
>   }
>   appendStringInfo(combined_error_detail, "Mismatched sequence(s) ...);
>   appendStringInfoString(combined_error_hint, "For mismatched sequences, ...);
> }
> if (missing_seqs->len)
> {
>   if (combined_error_detail->len)
>   {
>     appendStringInfo(combined_error_detail, "; ");
>     appendStringInfoChar(combined_error_hint, ' ');
>   }
>   appendStringInfo(combined_error_detail, "Missing sequence(s) on ...);
>   appendStringInfoString(combined_error_hint, "For missing sequences, remove ...);
> }

Modified
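A standalone sketch of the separator pattern from suggestion 4, assuming a hypothetical fixed-size `Buf` type in place of PostgreSQL's StringInfo so that it compiles without the server headers; `append_problem()` is likewise an illustrative name, not a function from the patch. A category is added only when its list is non-empty, and the "; " / " " separators are emitted only if something was already written:

```c
#include <assert.h>
#include <string.h>

/* Hypothetical fixed-size stand-in for PostgreSQL's StringInfo. */
typedef struct
{
    char    data[512];
    size_t  len;
} Buf;

/* Append a string, truncating rather than overflowing the buffer. */
static void
buf_append(Buf *b, const char *s)
{
    assert(b->len < sizeof(b->data));

    size_t  room = sizeof(b->data) - b->len - 1;
    size_t  n = strlen(s);

    if (n > room)
        n = room;
    memcpy(b->data + b->len, s, n);
    b->len += n;
    b->data[b->len] = '\0';
}

/*
 * Add one problem category to the detail and hint buffers, inserting the
 * separators only when an earlier category has already been written.
 */
static void
append_problem(Buf *detail, Buf *hint, const char *label,
               const char *list, const char *advice)
{
    if (list == NULL || list[0] == '\0')
        return;                 /* nothing to report for this category */

    if (detail->len > 0)
    {
        buf_append(detail, "; ");
        buf_append(hint, " ");
    }
    buf_append(detail, label);
    buf_append(detail, " (");
    buf_append(detail, list);
    buf_append(detail, ")");
    buf_append(hint, advice);
}
```

The same two-line separator block is then shared by every category, which is what removes the if/else duplication.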

> ~~~
>
> get_remote_sequence_info:
>
> 5.
> +static void
> +get_remote_sequence_info(TupleTableSlot *slot, LogicalRepSeqHashKey *key,
> + int64 *last_value, bool *is_called,
> + XLogRecPtr *page_lsn, Oid *remote_typid,
> + int64 *remote_start, int64 *remote_increment,
> + int64 *remote_min, int64 *remote_max,
> + bool *remote_cycle)
>
> I felt this code might be better if you would introduce a new
> structure (or add to an existing one?) to hold all the members instead
> of declaring a dozen variables and passing them as parameters. So, a
> cleaner function signature here might be like:
>
> static SequenceInfo get_remote_sequence_info(TupleTableSlot *slot).
>
> This may also allow you to simplify other code that passes so many
> members as parameters -- e.g. also the validation function.

I will address this in the next version or a little later.
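For suggestion 5, one possible shape is a single struct returned by value. This sketch is hypothetical: `SequenceInfo` and its field names are illustrative, and the columns are taken as text strings in an assumed order rather than read from a TupleTableSlot. page_lsn is given as a plain decimal integer for simplicity, unlike the real pg_lsn text form.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical bundle of the remote sequence fields. */
typedef struct SequenceInfo
{
    int64_t     last_value;
    bool        is_called;
    uint64_t    page_lsn;       /* simplified: plain integer, not pg_lsn text */
    uint32_t    typid;
    int64_t     start;
    int64_t     increment;
    int64_t     min;
    int64_t     max;
    bool        cycle;
} SequenceInfo;

/*
 * Stand-in for get_remote_sequence_info(): takes nine text columns in an
 * assumed fixed order and returns them decoded into one struct.
 */
static SequenceInfo
get_remote_sequence_info(const char *cols[])
{
    SequenceInfo info;

    assert(cols != NULL);
    info.last_value = strtoll(cols[0], NULL, 10);
    info.is_called = strcmp(cols[1], "t") == 0;
    info.page_lsn = strtoull(cols[2], NULL, 10);
    info.typid = (uint32_t) strtoul(cols[3], NULL, 10);
    info.start = strtoll(cols[4], NULL, 10);
    info.increment = strtoll(cols[5], NULL, 10);
    info.min = strtoll(cols[6], NULL, 10);
    info.max = strtoll(cols[7], NULL, 10);
    info.cycle = strcmp(cols[8], "t") == 0;
    return info;
}
```

Callers, including the validation function, could then take a single `const SequenceInfo *` instead of a long parameter list.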

> ~~~
> validate_sequence:
>
> 6.
> + /* Sequence was concurrently dropped */
> + if (!sequence_rel)
> + return COPYSEQ_SKIPPED;
> +
> + /* Sequence was concurrently dropped */
> + tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo->localrelid));
> + if (!HeapTupleIsValid(tup))
> + return COPYSEQ_SKIPPED;
> +
> + /* Sequence was concurrently invalidated */
> + if (!seqinfo->entry_valid)
> + {
> + ReleaseSysCache(tup);
> + return COPYSEQ_SKIPPED;
> + }
>
> 6a.
> All those comments might be better written as questions. e.g.
> /* Sequence was concurrently dropped? */
> /* Sequence was concurrently dropped? */
> /* Sequence was concurrently invalidated? */

Modified

> ~
>
> 6b.
> + /* Sequence was concurrently dropped */
> + tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo->localrelid));
> + if (!HeapTupleIsValid(tup))
> + return COPYSEQ_SKIPPED;
>
> I think the 2nd comment belongs after the tup assignment.

Modified

> ~
>
> 6c.
> + local_seq = (Form_pg_sequence) GETSTRUCT(tup);
> + if (local_seq->seqtypid != remote_typid ||
> + local_seq->seqstart != remote_start ||
> + local_seq->seqincrement != remote_increment ||
> + local_seq->seqmin != remote_min ||
> + local_seq->seqmax != remote_max ||
> + local_seq->seqcycle != remote_cycle)
> + result = COPYSEQ_MISMATCH;
>
> This should have a comment like the others. e.g.
> /* Sequence parameters for remote/local are the same? */

Modified
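The check order discussed in comment 6, with the question-style comments, can be sketched standalone as follows. `SeqParams`, `validate_sequence_sketch()` and `COPYSEQ_SUCCESS` are assumptions for illustration; a NULL `local` pointer models the relation or syscache lookup failing because the sequence was concurrently dropped.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* COPYSEQ_SUCCESS is an assumed name; the other two appear in the patch. */
typedef enum
{
    COPYSEQ_SUCCESS,
    COPYSEQ_MISMATCH,
    COPYSEQ_SKIPPED
} CopySeqResult;

/* Hypothetical subset of the sequence parameters that must match. */
typedef struct
{
    uint32_t    typid;
    int64_t     start, increment, min, max;
    bool        cycle;
} SeqParams;

static CopySeqResult
validate_sequence_sketch(const SeqParams *local, bool entry_valid,
                         const SeqParams *remote)
{
    assert(remote != NULL);

    /* Sequence was concurrently dropped? */
    if (local == NULL)
        return COPYSEQ_SKIPPED;

    /* Sequence was concurrently invalidated? */
    if (!entry_valid)
        return COPYSEQ_SKIPPED;

    /* Sequence parameters for remote/local are the same? */
    if (local->typid != remote->typid ||
        local->start != remote->start ||
        local->increment != remote->increment ||
        local->min != remote->min ||
        local->max != remote->max ||
        local->cycle != remote->cycle)
        return COPYSEQ_MISMATCH;

    return COPYSEQ_SUCCESS;
}
```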

> ~~~
>
> copy_sequence:
>
> 7.
> +/*
> + * Apply remote sequence state to local sequence and mark it as synchronized.
> + */
>
> Is it better to explicitly name the state here too? e.g.
>
> /and mark it as synchronized/and mark it as synchronized (READY)/

Modified

> ~~~
>
> 8.
> + /*
> + * Make sure that the sequence is copied as table owner, unless the user
> + * has opted out of that behaviour.
> + */
> + if (!MySubscription->runasowner)
> + SwitchToUntrustedUser(seqinfo->seqowner, &ucxt);
>
> 8a.
> Is "table owner" the correct term here?

Changed it to "sequence owner".

> ~
>
> 8b.
> Why not use the 'run_as_owner' variable?

Modified

> ~~~
>
> copy_sequences:
>
> 9.
> + ereport(ERROR,
> + errcode(ERRCODE_CONNECTION_FAILURE),
> + errmsg("could not receive list of sequence information from the publisher: %s",
> +    res->err));
>
> How about just:
> "could not fetch sequence information from the publisher: %s",

Modified

> ~~~
>
> 10.
> + CopySeqResult result;
>
> 'result' seems a slightly meaningless variable name since this is not
> even returned from the function.

Changed it

> ~~~
>
> sequencesync_list_invalidate_cb:
>
> 11.
> + if (reloid != InvalidOid)
>
> Use macro !OidIsValid(reloid)

Modified
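On comment 11: the quoted test `reloid != InvalidOid` corresponds to `OidIsValid(reloid)`; the negated form `!OidIsValid(reloid)` would select the opposite branch. A minimal check, with `Oid`, `InvalidOid` and `OidIsValid` reproduced along the lines of PostgreSQL's definitions so the snippet compiles on its own (`is_specific_relation()` is a hypothetical helper):

```c
#include <assert.h>
#include <stdbool.h>

/* Reproduced here so the sketch does not need the PostgreSQL headers. */
typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)
#define OidIsValid(objectId)  ((bool) ((objectId) != InvalidOid))

/* True when a specific relation was passed, as in the quoted check. */
static bool
is_specific_relation(Oid reloid)
{
    return OidIsValid(reloid);  /* same as reloid != InvalidOid */
}
```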

> ~
>
> 12.
> + hash_seq_init(&status, sequences_to_copy);
> +
>
> This init is common for the if/else so could be done outside.

This code has been removed.

> ~~~
>
> LogicalRepSeqMatchFunc:
>
> 13.
> + /* Compare by namespace name first */
> + cmp = strcmp(k1->nspname, k2->nspname);
> + if (cmp != 0)
> + return cmp;
> +
> + /* If namespace names are equal, compare by sequence name */
> + return strcmp(k1->seqname, k2->seqname);
>
> A simpler way to write this comparator might look like:
>
> /* Compare by namespace name first, then by sequence name */
> cmp = strcmp(k1->nspname, k2->nspname);
> if (cmp == 0)
>   cmp = strcmp(k1->seqname, k2->seqname);
>
> return cmp;

This code has been removed.
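The comparator simplification from comment 13 (for code since removed from the patch), in standalone form. `SeqKey` is a hypothetical key type, and qsort() is used only to exercise the comparator; that is an assumption for testing, not how the patch called it.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical key: namespace name plus sequence name. */
typedef struct
{
    const char *nspname;
    const char *seqname;
} SeqKey;

/* Compare by namespace name first, then by sequence name. */
static int
seq_key_cmp(const void *a, const void *b)
{
    const SeqKey *k1 = a;
    const SeqKey *k2 = b;
    int         cmp = strcmp(k1->nspname, k2->nspname);

    assert(k1->seqname != NULL && k2->seqname != NULL);
    if (cmp == 0)
        cmp = strcmp(k1->seqname, k2->seqname);
    return cmp;
}
```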

> ~~~
>
> LogicalRepSyncSequences:
>
> 14.
> +/*
> + * Start syncing the sequences in the sequencesync worker.
> + */
> +static void
> +LogicalRepSyncSequences(void)
>
> Might need a different function comment. This one seems almost the
> same as the function comment for copy_sequences().

Modified

> ~~~
>
> 15.
> + key.seqname = RelationGetRelationName(sequence_rel);
> + key.nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
> +
> + /* Allocate the tracking info in a permanent memory context. */
> + oldctx = MemoryContextSwitchTo(CacheMemoryContext);
> +
> + seq_entry = hash_search(sequences_to_copy, &key, HASH_ENTER, &found);
> + Assert(!found);
> +
> + memset(seq_entry, 0, sizeof(LogicalRepSequenceInfo));
> +
> + seq_entry->localrelid = subrel->srrelid;
> + seq_entry->remote_seq_queried = false;
> + seq_entry->seqowner = sequence_rel->rd_rel->relowner;
> + seq_entry->entry_valid = true;
>
> Nit. It seems more natural to put the nspname before the seqname.
>
> So,
> key.nspname = ...
> key.seqname = ...
>
> and
> seq_entry->nspname = pstrdup(key.nspname);
> seq_entry->seqname = pstrdup(key.seqname);

This code has been removed. There was one more place where the
ordering was different; I modified it there.

> ~~~
>
> LogicalRepSyncSequences:
>
> 16.
> + /* If there are any sequences that need to be copied */
> + if (hash_get_num_entries(sequences_to_copy))
> + copy_sequences(LogRepWorkerWalRcvConn, subid);
>
> If there are no sequences to copy then AFAICT (from
> SequenceSyncWorkerMain) this sequencesync worker is just going to
> stop and exit, isn't it? So, why didn't we check this earlier? That
> could have avoided making a publisher connection
> LogRepWorkerWalRcvConn for no reason.

Modified
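The early exit from comment 16 amounts to counting the sequences to copy before opening a publisher connection. In this sketch, `connect_to_publisher()`, the counters, and the boolean array of INIT flags are hypothetical stand-ins for the real catalog scan and walreceiver connection:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical counters standing in for the real connection/copy work. */
static int  connections_opened = 0;
static int  sequences_copied = 0;

static void
connect_to_publisher(void)
{
    connections_opened++;
}

/*
 * Count the sequences still in INIT state first, and return early
 * (without connecting to the publisher) if there are none.
 */
static void
sync_sequences_sketch(const bool *is_init, size_t nseqs)
{
    size_t      ntocopy = 0;

    assert(is_init != NULL || nseqs == 0);
    for (size_t i = 0; i < nseqs; i++)
        if (is_init[i])
            ntocopy++;

    if (ntocopy == 0)
        return;                 /* nothing to do: skip the connection */

    connect_to_publisher();
    sequences_copied += (int) ntocopy;
}
```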

> ======
> src/backend/replication/logical/syncutils.c
>
> FinishSyncWorker:
>
> 17.
> + if (am_tablesync_worker())
> + ereport(LOG,...));
> + else
> + ereport(LOG,...);
>
> + if (am_sequencesync_worker())
> + {
> ...
> + }
> + else
> + {
> ...
> + }
>
> Using different ordering for these if/else conditions makes the logic
> appear more complex. The second condition should be rearranged to be
> same order as the first: if (am_tablesync_worker()) ... else ...;

Modified

> ~~~
>
> launch_sync_worker:
>
> 18.
> + (void) logicalrep_worker_launch((OidIsValid(relid)) ? WORKERTYPE_TABLESYNC : WORKERTYPE_SEQUENCESYNC,
> + MyLogicalRepWorker->dbid,
> + MySubscription->oid,
> + MySubscription->name,
> + MyLogicalRepWorker->userid,
> + relid, DSM_HANDLE_INVALID, false);
>
> I noticed you chose not to pass 'wtype' as a parameter here, but
> instead do the ternary to figure out the wtype.  AFAICT the caller
> already knows it, so why not just pass it in, instead of figuring it
> out again?

Modified
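For comment 18, passing the worker type explicitly might look like this. `launch_worker_stub()` is a hypothetical stand-in for logicalrep_worker_launch(), and the assertion encodes the relationship implied by the original ternary (a valid relid means tablesync, InvalidOid means sequencesync):

```c
#include <assert.h>

typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)

/* Subset of worker types; the names come from the quoted patch. */
typedef enum
{
    WORKERTYPE_TABLESYNC,
    WORKERTYPE_SEQUENCESYNC
} WorkerType;

/* Hypothetical stand-in for logicalrep_worker_launch(): records its args. */
static WorkerType last_launched_type;
static Oid  last_launched_relid;

static void
launch_worker_stub(WorkerType wtype, Oid relid)
{
    last_launched_type = wtype;
    last_launched_relid = relid;
}

/* The caller already knows the worker type, so it simply passes it in. */
static void
launch_sync_worker_sketch(WorkerType wtype, Oid relid)
{
    assert((wtype == WORKERTYPE_TABLESYNC) == (relid != InvalidOid));
    launch_worker_stub(wtype, relid);
}
```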

Shveta's comments from [1] have also been addressed in this version.
Dilip's merge suggestion from [2] has also been addressed in this version.

The attached v20251029_2 patch includes these fixes.

[1] - https://www.postgresql.org/message-id/CAJpy0uCkt4V95un1025xV%2BBoLOXg0DTk418Di_f6gerpuezBmA%40mail.gmail.com
[2] - https://www.postgresql.org/message-id/CAFiTN-v1mm%3DwAMBVT82Ok9YrGG7o-wszxw4RHsmKk1oP%2B%3DrJnA%40mail.gmail.com

Regards,
Vignesh
