Re: Logical Replication of sequences - Mailing list pgsql-hackers
| From | Peter Smith |
|---|---|
| Subject | Re: Logical Replication of sequences |
| Date | |
| Msg-id | CAHut+PspvmCwRaKnG=CAWZ4-Dz1YH1ZUQN14YARXvnR7yA3Y9g@mail.gmail.com |
| In response to | Re: Logical Replication of sequences (vignesh C <vignesh21@gmail.com>) |
| List | pgsql-hackers |
Hi Vignesh,
Some more review comments for v20251029-0001.
======
.../replication/logical/sequencesync.c
1.
+ * The apply worker periodically scans pg_subscription_rel for sequences in
+ * INIT state. When such sequences are found, it spawns a
+ * sequencesync worker to handle synchronization.
I did not see anything in this header comment saying that there is
currently at most one sequencesync worker. The quoted part of the
comment doesn't make that clear.
~~~
2.
+ * Handle sequence synchronization cooperation from the apply worker.
Is it simpler just to say:
Apply worker determines if sequence synchronization is needed.
~~~
report_error_sequences:
3.
+static void
+report_error_sequences(StringInfo insuffperm_seqs, StringInfo mismatched_seqs,
+ StringInfo missing_seqs)
Function name seems strange. How about 'ereport_sequence_errors'?
~~~
4.
+ if (mismatched_seqs->len)
+ {
+ if (insuffperm_seqs->len)
+ {
+ appendStringInfo(combined_error_detail, "; mismatched sequence(s) on subscriber: (%s)",
+ mismatched_seqs->data);
+ appendStringInfoString(combined_error_hint, " For mismatched sequences, alter or re-create local sequences to match the publisher's parameters.");
+ }
+ else
+ {
+ appendStringInfo(combined_error_detail, "Mismatched sequence(s) on subscriber: (%s)",
+ mismatched_seqs->data);
+ appendStringInfoString(combined_error_hint, "For mismatched sequences, alter or re-create local sequences to match the publisher's parameters.");
+ }
+ }
+
+ if (missing_seqs->len)
+ {
+ if (insuffperm_seqs->len || mismatched_seqs->len)
+ {
+ appendStringInfo(combined_error_detail, "; missing sequence(s) on publisher: (%s)",
+ missing_seqs->data);
+ appendStringInfoString(combined_error_hint, " For missing sequences, remove them locally or run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to refresh the subscription.");
+ }
+ else
+ {
+ appendStringInfo(combined_error_detail, "Missing sequence(s) on publisher: (%s)",
+ missing_seqs->data);
+ appendStringInfoString(combined_error_hint, "For missing sequences, remove them locally or run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to refresh the subscription.");
+ }
+ }
This logic has a lot of duplication just to handle the separators
between multiple details/hints. I think it can be simplified a lot.
SUGGESTION
if (mismatched_seqs->len)
{
    if (combined_error_detail->len)
    {
        appendStringInfoString(combined_error_detail, "; ");
        appendStringInfoChar(combined_error_hint, ' ');
    }
    appendStringInfo(combined_error_detail, "Mismatched sequence(s) ...");
    appendStringInfoString(combined_error_hint, "For mismatched sequences, ...");
}
if (missing_seqs->len)
{
    if (combined_error_detail->len)
    {
        appendStringInfoString(combined_error_detail, "; ");
        appendStringInfoChar(combined_error_hint, ' ');
    }
    appendStringInfo(combined_error_detail, "Missing sequence(s) on ...");
    appendStringInfoString(combined_error_hint, "For missing sequences, remove ...");
}
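For illustration only, here is a standalone sketch of the same "emit the separator only if something was already written" idea. `Buf`, `buf_append()` and `add_error_category()` are hypothetical stand-ins (real code would use `StringInfo` with `appendStringInfoString()`/`appendStringInfoChar()`); the same helper call would then cover the insufficient-permission, mismatched and missing categories in turn.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical stand-in for StringInfo: a fixed-size, NUL-terminated buffer. */
typedef struct Buf
{
    char    data[512];
    size_t  len;
} Buf;

static void
buf_append(Buf *buf, const char *s)
{
    size_t  n = strlen(s);

    assert(buf->len + n < sizeof(buf->data));  /* room for s plus its NUL */
    memcpy(buf->data + buf->len, s, n + 1);    /* copy including the NUL */
    buf->len += n;
}

/*
 * Append one detail/hint pair, prefixing "; " to the detail and " " to the
 * hint only if an earlier category has already written something.
 */
static void
add_error_category(Buf *detail, Buf *hint, const char *what,
                   const char *seqs, const char *hint_text)
{
    if (detail->len)
    {
        buf_append(detail, "; ");
        buf_append(hint, " ");
    }
    buf_append(detail, what);
    buf_append(detail, ": (");
    buf_append(detail, seqs);
    buf_append(detail, ")");
    buf_append(hint, hint_text);
}
```

With this shape, each category is a single call, and the ordering of categories alone decides which one gets the leading separator.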
~~~
get_remote_sequence_info:
5.
+static void
+get_remote_sequence_info(TupleTableSlot *slot, LogicalRepSeqHashKey *key,
+ int64 *last_value, bool *is_called,
+ XLogRecPtr *page_lsn, Oid *remote_typid,
+ int64 *remote_start, int64 *remote_increment,
+ int64 *remote_min, int64 *remote_max,
+ bool *remote_cycle)
I felt this code might be better if you introduced a new structure
(or added to an existing one?) to hold all the members, instead of
declaring a dozen variables and passing them all as parameters. So, a
cleaner function signature here might be like:
static SequenceInfo get_remote_sequence_info(TupleTableSlot *slot).
This may also allow you to simplify other code that passes so many
members as parameters, e.g. the validation function.
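A rough, standalone sketch of what such a structure might look like. `SequenceInfo` is only the hypothetical name used above; the field names simply mirror the current output parameters (the hash key is left out), and the `Oid`/`int64`/`XLogRecPtr` typedefs are local stand-ins so the snippet compiles on its own. `sequence_params_match()` is also hypothetical, showing how the parameter comparison in validate_sequence could then take two structs instead of six separate values.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Local stand-ins for PostgreSQL's typedefs, so the sketch compiles alone. */
typedef unsigned int Oid;
typedef int64_t int64;
typedef uint64_t XLogRecPtr;

/* Hypothetical: everything fetched for one remote sequence, in one place. */
typedef struct SequenceInfo
{
    int64       last_value;
    bool        is_called;
    XLogRecPtr  page_lsn;
    Oid         typid;
    int64       start;
    int64       increment;
    int64       min;
    int64       max;
    bool        cycle;
} SequenceInfo;

/* Sequence parameters for remote/local are the same? */
static bool
sequence_params_match(const SequenceInfo *local, const SequenceInfo *remote)
{
    assert(local != NULL && remote != NULL);

    /* Only the definition matters here, not last_value/is_called/page_lsn. */
    return local->typid == remote->typid &&
        local->start == remote->start &&
        local->increment == remote->increment &&
        local->min == remote->min &&
        local->max == remote->max &&
        local->cycle == remote->cycle;
}
```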
~~~
validate_sequence:
6.
+ /* Sequence was concurrently dropped */
+ if (!sequence_rel)
+ return COPYSEQ_SKIPPED;
+
+ /* Sequence was concurrently dropped */
+ tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo->localrelid));
+ if (!HeapTupleIsValid(tup))
+ return COPYSEQ_SKIPPED;
+
+ /* Sequence was concurrently invalidated */
+ if (!seqinfo->entry_valid)
+ {
+ ReleaseSysCache(tup);
+ return COPYSEQ_SKIPPED;
+ }
6a.
All those comments might be better written as questions. e.g.
/* Sequence was concurrently dropped? */
/* Sequence was concurrently dropped? */
/* Sequence was concurrently invalidated? */
~
6b.
+ /* Sequence was concurrently dropped */
+ tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo->localrelid));
+ if (!HeapTupleIsValid(tup))
+ return COPYSEQ_SKIPPED;
I think the 2nd comment belongs after the tup assignment.
~
6c.
+ local_seq = (Form_pg_sequence) GETSTRUCT(tup);
+ if (local_seq->seqtypid != remote_typid ||
+ local_seq->seqstart != remote_start ||
+ local_seq->seqincrement != remote_increment ||
+ local_seq->seqmin != remote_min ||
+ local_seq->seqmax != remote_max ||
+ local_seq->seqcycle != remote_cycle)
+ result = COPYSEQ_MISMATCH;
This should have a comment like the others. e.g.
/* Sequence parameters for remote/local are the same? */
~~~
copy_sequence:
7.
+/*
+ * Apply remote sequence state to local sequence and mark it as synchronized.
+ */
Is it better to explicitly name the state here too? e.g.
/and mark it as synchronized/and mark it as synchronized (READY)/
~~~
8.
+ /*
+ * Make sure that the sequence is copied as table owner, unless the user
+ * has opted out of that behaviour.
+ */
+ if (!MySubscription->runasowner)
+ SwitchToUntrustedUser(seqinfo->seqowner, &ucxt);
8a.
Is "table owner" the correct term here?
~
8b.
Why not use the 'run_as_owner' variable?
~~~
copy_sequences:
9.
+ ereport(ERROR,
+ errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not receive list of sequence information from the publisher: %s",
+ res->err));
How about just:
"could not fetch sequence information from the publisher: %s",
~~~
10.
+ CopySeqResult result;
'result' seems a rather meaningless variable name, since it is not
even returned from the function.
~~~
sequencesync_list_invalidate_cb:
11.
+ if (reloid != InvalidOid)
Use macro OidIsValid(reloid)
~
12.
+ hash_seq_init(&status, sequences_to_copy);
+
This init is common to both the if and else branches, so it could be
done once before them.
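For illustration only, a standalone sketch covering #11 and #12 together. The `Oid`, `InvalidOid` and `OidIsValid` lines mirror PostgreSQL's own definitions (copied so the snippet compiles on its own); note that `OidIsValid(x)` already means `x != InvalidOid`, so no negation is needed. The `SeqEntry` type and `invalidate_entries()` are hypothetical, and the plain array loop stands in for the `hash_seq_init()`/`hash_seq_search()` scan of `sequences_to_copy`: the scan setup is shared, and only the per-entry test depends on the OID.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Mirrors PostgreSQL's definitions so the sketch compiles standalone. */
typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)
#define OidIsValid(objectId) ((bool) ((objectId) != InvalidOid))

/* Hypothetical stand-in for a sequences_to_copy hash entry. */
typedef struct SeqEntry
{
    Oid     localrelid;
    bool    entry_valid;
} SeqEntry;

/*
 * Invalidate the entry for reloid, or every entry when reloid is
 * InvalidOid. The scan (here just the loop) is set up once for both cases.
 */
static void
invalidate_entries(SeqEntry *entries, size_t nentries, Oid reloid)
{
    assert(entries != NULL || nentries == 0);

    for (size_t i = 0; i < nentries; i++)
    {
        if (!OidIsValid(reloid) || entries[i].localrelid == reloid)
            entries[i].entry_valid = false;
    }
}
```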
~~~
LogicalRepSeqMatchFunc:
13.
+ /* Compare by namespace name first */
+ cmp = strcmp(k1->nspname, k2->nspname);
+ if (cmp != 0)
+ return cmp;
+
+ /* If namespace names are equal, compare by sequence name */
+ return strcmp(k1->seqname, k2->seqname);
A simpler way to write this comparator might look like:
/* Compare by namespace name first, then by sequence name */
cmp = strcmp(k1->nspname, k2->nspname);
if (cmp == 0)
cmp = strcmp(k1->seqname, k2->seqname);
return cmp;
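A standalone version of that comparator, for illustration. `SeqKey` is a hypothetical stand-in for `LogicalRepSeqHashKey` carrying just the two name fields; the `(key1, key2, keysize)` shape follows dynahash's match-function convention, where only zero (equal) versus nonzero (different) matters.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical key layout; only the two name fields matter here. */
typedef struct SeqKey
{
    const char *nspname;
    const char *seqname;
} SeqKey;

/* Returns 0 when both keys name the same sequence, nonzero otherwise. */
static int
seq_key_match(const void *key1, const void *key2, size_t keysize)
{
    const SeqKey *k1 = key1;
    const SeqKey *k2 = key2;
    int         cmp;

    assert(keysize == sizeof(SeqKey));

    /* Compare by namespace name first, then by sequence name */
    cmp = strcmp(k1->nspname, k2->nspname);
    if (cmp == 0)
        cmp = strcmp(k1->seqname, k2->seqname);
    return cmp;
}
```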
~~~
LogicalRepSyncSequences:
14.
+/*
+ * Start syncing the sequences in the sequencesync worker.
+ */
+static void
+LogicalRepSyncSequences(void)
Might need a different function comment. This one seems almost the
same as the function comment for copy_sequences().
~~~
15.
+ key.seqname = RelationGetRelationName(sequence_rel);
+ key.nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
+
+ /* Allocate the tracking info in a permanent memory context. */
+ oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+
+ seq_entry = hash_search(sequences_to_copy, &key, HASH_ENTER, &found);
+ Assert(!found);
+
+ memset(seq_entry, 0, sizeof(LogicalRepSequenceInfo));
+
+ seq_entry->localrelid = subrel->srrelid;
+ seq_entry->remote_seq_queried = false;
+ seq_entry->seqowner = sequence_rel->rd_rel->relowner;
+ seq_entry->entry_valid = true;
Nit. It seems more natural to put the nspname before the seqname.
So,
key.nspname = ...
key.seqname = ...
and
seq_entry->nspname = pstrdup(key.nspname);
seq_entry->seqname = pstrdup(key.seqname);
~~~
LogicalRepSyncSequences:
16.
+ /* If there are any sequences that need to be copied */
+ if (hash_get_num_entries(sequences_to_copy))
+ copy_sequences(LogRepWorkerWalRcvConn, subid);
If there are no sequences to copy then AFAICT (from
SequenceSyncWorkerMain) this sequencesync worker is just going to stop
and exit, isn't it? So why not check this earlier, and perhaps avoid
making the publisher connection LogRepWorkerWalRcvConn for no reason?
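To make the suggestion concrete, a standalone sketch of the ordering I mean. Everything here is hypothetical: `connect_to_publisher()` only counts calls in place of establishing `LogRepWorkerWalRcvConn`, and `sync_sequences()` takes the number of entries that would come from `hash_get_num_entries(sequences_to_copy)`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in: counts connections instead of opening one. */
static int connections_opened = 0;

static void
connect_to_publisher(void)
{
    connections_opened++;
}

/*
 * Do the cheap local check before the expensive remote connection.
 * Returns false when there was nothing to copy.
 */
static bool
sync_sequences(size_t nsequences_to_copy)
{
    if (nsequences_to_copy == 0)
        return false;           /* nothing to copy: exit without connecting */

    connect_to_publisher();
    /* ... copy_sequences() would run here ... */
    return true;
}
```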
======
src/backend/replication/logical/syncutils.c
FinishSyncWorker:
17.
+ if (am_tablesync_worker())
+ ereport(LOG,...));
+ else
+ ereport(LOG,...);
+ if (am_sequencesync_worker())
+ {
...
+ }
+ else
+ {
...
+ }
Using a different ordering for these if/else conditions makes the logic
appear more complex. The second condition should be rearranged into the
same order as the first: if (am_tablesync_worker()) ... else ...;
~~~
launch_sync_worker:
18.
+ (void) logicalrep_worker_launch((OidIsValid(relid)) ? WORKERTYPE_TABLESYNC : WORKERTYPE_SEQUENCESYNC,
+ MyLogicalRepWorker->dbid,
+ MySubscription->oid,
+ MySubscription->name,
+ MyLogicalRepWorker->userid,
+ relid, DSM_HANDLE_INVALID, false);
I noticed you chose not to pass 'wtype' as a parameter here, but
instead do the ternary to figure out the wtype. AFAICT the caller
already knows it, so why not just pass it in, instead of figuring it
out again?
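Something like the following is what I had in mind, as a standalone sketch. The enum is trimmed to the two values relevant here (the real `LogicalRepWorkerType` has more members), this `launch_sync_worker()` signature is hypothetical, and `fake_worker_launch()` only records its arguments instead of calling `logicalrep_worker_launch()`. The `assert()` keeps the invariant that the ternary currently relies on: a tablesync worker has a valid relid, and a sequencesync worker does not.

```c
#include <assert.h>

typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)

/* Trimmed for the sketch; the real enum has more members. */
typedef enum LogicalRepWorkerType
{
    WORKERTYPE_TABLESYNC,
    WORKERTYPE_SEQUENCESYNC,
} LogicalRepWorkerType;

static LogicalRepWorkerType last_launched_type;
static Oid  last_launched_relid;

/* Hypothetical stand-in for logicalrep_worker_launch(): records its inputs. */
static void
fake_worker_launch(LogicalRepWorkerType wtype, Oid relid)
{
    last_launched_type = wtype;
    last_launched_relid = relid;
}

/* The caller already knows which kind of worker it wants, so pass it in. */
static void
launch_sync_worker(LogicalRepWorkerType wtype, Oid relid)
{
    assert((wtype == WORKERTYPE_TABLESYNC) == (relid != InvalidOid));
    fake_worker_launch(wtype, relid);
}
```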
======
Kind Regards,
Peter Smith.
Fujitsu Australia