Re: Logical Replication of sequences - Mailing list pgsql-hackers

From Peter Smith
Subject Re: Logical Replication of sequences
Date
Msg-id CAHut+PvwH2GXAiPoWePbSW6vsenZcFWuRxRCUbPXL8j+OQ0zSQ@mail.gmail.com
Whole thread Raw
In response to Logical Replication of sequences  (Amit Kapila <amit.kapila16@gmail.com>)
List pgsql-hackers
Hi Vignesh,

Here are some review comments for the patch v20241211-0004.

======
GENERAL

1.
There are more than a dozen places where the relation (relkind) is
checked to see if it is a SEQUENCE:

e.g. + get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE &&
e.g. + if (get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE)
e.g. + if (relkind == RELKIND_SEQUENCE && !get_sequences)
e.g. + if (relkind != RELKIND_SEQUENCE && !get_tables)
e.g. + relkind == RELKIND_SEQUENCE ? "sequence" : "table",
e.g. + if (relkind != RELKIND_SEQUENCE)
e.g. + relkind == RELKIND_SEQUENCE ? "sequence" : "table",
e.g. + if (get_rel_relkind(sub_remove_rels[off].relid) == RELKIND_SEQUENCE)
e.g. + if (get_rel_relkind(relid) != RELKIND_SEQUENCE)
e.g. + relkind != RELKIND_SEQUENCE)
e.g. + Assert(get_rel_relkind(rstate->relid) == RELKIND_SEQUENCE);
e.g. + Assert(relkind == RELKIND_SEQUENCE);
e.g. + if (get_rel_relkind(rstate->relid) == RELKIND_SEQUENCE)
e.g. + Assert(get_rel_relkind(rstate->relid) != RELKIND_SEQUENCE);

I am wondering if the code might be improved slightly by adding one new macro:

#define RELKIND_IS_SEQUENCE(relkind) ((relkind) == RELKIND_SEQUENCE)

======
Commit message

2.
1) CREATE SUBSCRIPTION
    - (PG17 command syntax is unchanged)
    - The subscriber retrieves sequences associated with publications.
    - Publisher sequences are added to pg_subscription_rel with INIT state.
    - Initiates the sequencesync worker (see above) to synchronize all
      sequences.

~

Shouldn't this say "Published sequences" instead of "Publisher sequences"?

I guess if the patch currently supports only ALL SEQUENCES then maybe
it amounts to the same thing, but IMO "Published" seems more correct.

~~~

3.
2) ALTER SUBSCRIPTION ... REFRESH PUBLICATION
    - (PG17 command syntax is unchanged)
    - Dropped publisher sequences are removed from pg_subscription_rel.
    - New publisher sequences are added to pg_subscription_rel with INIT state.
    - Initiates the sequencesync worker (see above) to synchronize only
      newly added sequences.

3) ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES
    - The patch introduces this new command to refresh all sequences
    - Dropped publisher sequences are removed from pg_subscription_rel.
    - New publisher sequences are added to pg_subscription_rel
    - All sequences in pg_subscription_rel are reset to INIT state.
    - Initiates the sequencesync worker (see above) to synchronize all
      sequences.

~

Ditto previous comment -- maybe those should say "Newly published
sequences" instead of "New publisher sequences"/

~~~

4.
Should there be some mention of the WARNING logged if sequence
parameter differences are detected?

======
src/backend/catalog/pg_subscription.c

GetSubscriptionRelations:

5.
+ * all_states:
+ * If getting tables, if all_states is true get all tables, otherwise
+ * only get tables that have not reached READY state.
+ * If getting sequences, if all_states is true get all sequences,
+ * otherwise only get sequences that are in INIT state.
+ *
+ * The returned list is palloc'ed in the current memory context.

and

- if (not_ready)
+ if (!all_states)
  ScanKeyInit(&skey[nkeys++],
  Anum_pg_subscription_rel_srsubstate,
  BTEqualStrategyNumber, F_CHARNE,
                    CharGetDatum(SUBREL_STATE_READY));
~

It was a bit confusing that the code for (!all_states) is excluding
everything that is not in SUBREL_STATE_READY, but OTOH the function
comment for sequences it said "that are in INIT state". Maybe that
function comment for sequence also should have said "that have not
reached READY state (i.e. are still in INIT state)" to better match
the code.

~~~

6.
+ /* Skip sequences if they were not requested */
+ if (relkind == RELKIND_SEQUENCE && !get_sequences)
+ continue;
+
+ /* Skip tables if they were not requested */
+ if (relkind != RELKIND_SEQUENCE && !get_tables)
+ continue;

Somehow, I feel this logic would seem simpler if expressed differently
to make it more explicit that the relation is either a table or a
sequence. e.g. by adding some variables.

SUGGESTION:

bool is_sequence;
bool is_table;

...

/* Relation is either a sequence or a table */
is_sequence = get_rel_relkind(subrel->srrelid) == RELKIND_SEQUENCE;
is_table = !is_sequence;

/* Skip sequences if they were not requested */
if (!get_sequences && is_sequence)
  continue;

/* Skip tables if they were not requested */
if (!get_tables && is_table)
  continue;

======
src/backend/commands/sequence.c

7.
 /*
- * Implement the 2 arg setval procedure.
- * See do_setval for discussion.
+ * Implement the 2 arg set sequence procedure.
+ * See SetSequence for discussion.
  */
 Datum
 setval_oid(PG_FUNCTION_ARGS)

~

Not sure this function comment is right. Shouldn't it still say
"Implement the 2 arg setval procedure."

~~~

8.
 /*
- * Implement the 3 arg setval procedure.
- * See do_setval for discussion.
+ * Implement the 3 arg set sequence procedure.
+ * See SetSequence for discussion.
  */
 Datum
 setval3_oid(PG_FUNCTION_ARGS)

~

Not sure this function comment is right. Shouldn't it still say
"Implement the 3 arg setval procedure."

======
src/backend/commands/subscriptioncmds.c

AlterSubscription_refresh:

9.
+ * If 'refresh_tables' is true, update the subscription by adding or removing
+ * tables that have been added or removed since the last subscription creation
+ * or refresh publication.
+ *
+ * If 'refresh_sequences' is true, update the subscription by adding
or removing
+ * sequences that have been added or removed since the last subscription
+ * creation or publication refresh.

The first para says "refresh publication". The second para says
"publication refresh", but I guess it should also be saying "refresh
publication".

~~~

10.
+ if (resync_all_sequences)
+ {
+
+ UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT,
+    InvalidXLogRecPtr);
+ ereport(DEBUG1,
+ errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
+ get_namespace_name(get_rel_namespace(relid)),
+ get_rel_name(relid),
+ sub->name));
+ }

Has unnecessary blank line.

~~~

AlterSubscription:

11.
Some of the generic error messages in this function are now
potentially misleading.

e.g. There are multiple places in this function that say "ALTER
SUBSCRIPTION ... REFRESH", meaning ALTER SUBSCRIPTION <subname>
REFRESH PUBLICATION, but not meaning ALTER SUBSCRIPTION <subname>
REFRESH PUBLICATION SEQUENCES, so possibly those need to be modified
to eliminate any ambiguity.

(Actually, maybe it is not only in this function -- the short form
"ALTER SUBSCRIPTION ... REFRESH" seems to be scattered in other
comments in this file also).

~~~

12.
- logicalrep_worker_stop(w->subid, w->relid);
+ /* Worker might have exited because of an error */
+ if (w->type == WORKERTYPE_UNKNOWN)
+ continue;
+
+ logicalrep_worker_stop(w->subid, w->relid, w->type);

It may be better to put that special case WORKERTYPE_UNKNOWN condition
as a quick exit within the logicalrep_worker_stop() function.

======
src/backend/replication/logical/launcher.c

logicalrep_worker_find:

13.
+ Assert(wtype == WORKERTYPE_TABLESYNC ||
+    wtype == WORKERTYPE_SEQUENCESYNC ||
+    wtype == WORKERTYPE_APPLY);
+

The master version of this function checked for
isParallelApplyWorker(w), but now if a WORKERTYPE_PARALLEL_APPLY ever
got to here it would fail the above Assert. So, is the patch code OK,
or do we still need to also account for a possible
WORKERTYPE_PARALLEL_APPLY reaching here?

~~~

logicalrep_worker_stop:

14.
worker = logicalrep_worker_find(subid, relid, wtype, false);

if (worker)
{
  Assert(!isParallelApplyWorker(worker));
  logicalrep_worker_stop_internal(worker, SIGTERM);
}

~

This code is not changed much from before, so it first finds the
worker, but then asserts that it must not be not a parallel apply
worker. But now, since the wtype is known and passed to the function
why not move the Assert up-front based on the wtype and before even
doing the 'find'?

======
.../replication/logical/sequencesync.c

ProcessSyncingSequencesForApply:

15.
+ /*
+ * Check if there is a sequence worker already running?
+ */
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+ syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
+ InvalidOid, WORKERTYPE_SEQUENCESYNC,
+ true);
+ if (syncworker)
+ {
+ /* Now safe to release the LWLock */
+ LWLockRelease(LogicalRepWorkerLock);
+ break;
+ }

15a.
Comment: /sequence worker/sequencesync worker/

~

15b.
Maybe it's better to call this variable 'sequencesync_worker' or
similar because sync worker is too generic

~~~

fetch_remote_sequence_data:

16.
+/*
+ * fetch_remote_sequence_data
+ *
+ * Retrieves sequence data (last_value, log_cnt, page_lsn, and is_called)
+ * from a remote node.

The SELECT of this function fetch a lot more columns, so why are only
these few mentioned in the function comment?

~

17.
+ res = walrcv_exec(conn, cmd.data, REMOTE_SEQ_COL_COUNT, tableRow);
+ pfree(cmd.data);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ errmsg("could not receive sequence list from the publisher: %s",
+    res->err));

That error msg does seem quite right. IIUC, this is just the data from
a single sequence; it is not a "sequence list".

~~~

report_mismatched_sequences:

18.
+static void
+report_mismatched_sequences(StringInfo mismatched_seqs)
+{
+ if (mismatched_seqs->len)
+ {
+ ereport(WARNING,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("parameters differ for the remote and local sequences (%s)
for subscription \"%s\"",
+    mismatched_seqs->data, MySubscription->name),
+ errhint("Alter/Re-create local sequences to have the same parameters
as the remote sequences."));
+
+ resetStringInfo(mismatched_seqs);
+ }
+}

I'm confused. The errhint says "Alter/Re-create local sequences to
have the same parameters", but in the function 'copy_sequence' there
was the code (below) that seems to be already setting the sequence
values (SetSequence) regardless of whether it detected
sequence_mismatch true/false.

+ *sequence_mismatch = !fetch_remote_sequence_data(conn, relid, remoteid,
+ nspname, relname,
+ &seq_log_cnt, &seq_is_called,
+ &seq_page_lsn, &seq_last_value);
+
+ SetSequence(RelationGetRelid(rel), seq_last_value, seq_is_called,
+ seq_log_cnt);

Is the setting regardless like that OK, or just going to lead to weird
integrity errors?

~~~

LogicalRepSyncSequences:

19.
+/*
+ * Start syncing the sequences in the sync worker.
+ */
+static void
+LogicalRepSyncSequences(void)

/sync worker/sequencesync worker/

~~~

20.
+ curr_seq++;
+
+ /*
+ * Have we reached the end of the current batch of sequences, or last
+ * remaining sequences to synchronize?
+ */
+ if (((curr_seq % MAX_SEQUENCES_SYNC_PER_BATCH) == 0) ||
+ curr_seq == seq_count)
+ {
+ /* LOG all the sequences synchronized during current batch. */
+ for (int i = (curr_seq - 1) - ((curr_seq - 1) % MAX_SEQUENCES_SYNC_PER_BATCH);
+ i < curr_seq; i++)
+ {

The calculation is quite tricky.

IMO this might all be simplified if you have another variable like
'batch_seq' that just ranges from 0 -- MAX_SEQUENCES_SYNC_PER_BATCH,
then do something like:

SUGGESTION:

cur_seq++;
batch_seq++;

if (batch_seq >= MAX_SEQUENCES_SYNC_PER_BATCH || cur_seq == seq_count)
{
  /* Process the batch. */

  ...

  /* LOG all the sequences synchronized during the current batch. */
  for (int i = 0; i < batch_seq; i++)
  {
    ...
  }

  /* Prepare for next batch */
  batch_seq = 0;
}

======
src/backend/replication/logical/syncutils.c

21.
 List    *table_states_not_ready = NIL;
+List    *sequence_states_not_ready = NIL;

I thought this declaration belonged more in the sequencesync.c file.

~~~

SyncProcessRelations:

22.
 /*
- * Process possible state change(s) of tables that are being synchronized.
+ * Process possible state change(s) of tables that are being synchronized and
+ * start new tablesync workers for the newly added tables and start new
+ * sequencesync worker for the newly added sequences.
  */

/added tables and start new sequencesync worker/added tables. Also,
start a new sequencesync worker/

~~~

FetchRelationStates:

23.
- * Note: If this function started the transaction (indicated by the parameter)
- * then it is the caller's responsibility to commit it.
+ * Returns true if subscription has 1 or more tables, else false.
  */
 bool
-FetchRelationStates(bool *started_tx)
+FetchRelationStates(void)

Partly because of the name (relations), I felt this might be better to
be a void function and the returned value would be passed back by
references (bool *has_tables).

======
src/backend/replication/logical/worker.c

SetupApplyOrSyncWorker:

24.
+
+ if (am_sequencesync_worker())
+ before_shmem_exit(logicalrep_seqsyncworker_failuretime, (Datum) 0);

There should be a comment saying what this callback is for.

======
Kind Regards,
Peter Smith.
Fujitsu Australia



pgsql-hackers by date:

Previous
From: Andres Freund
Date:
Subject: Re: AIO v2.0
Next
From: Jacob Champion
Date:
Subject: Re: [PoC] Let libpq reject unexpected authentication requests