Re: Logical Replication of sequences - Mailing list pgsql-hackers

From: Peter Smith
Subject: Re: Logical Replication of sequences
Msg-id: CAHut+PuDkR7thJy6JFiSYeYb5eEX+Gpcc3Qz9uc-cVj31GnswQ@mail.gmail.com
In response to: Re: Logical Replication of sequences (vignesh C <vignesh21@gmail.com>)
List: pgsql-hackers
Hi Vignesh. Here are the rest of my comments for patch v20240705-0003.
(Apologies for the length of this post; it was unavoidable because this
is the 1st review of a very large 1700-line patch.)

======
src/backend/catalog/pg_subscription.c

1. GetSubscriptionSequences

+/*
+ * Get the sequences for the subscription.
+ *
+ * The returned list is palloc'ed in the current memory context.
+ */

Is that comment right? The palloc seems to be done in
CacheMemoryContext, not in the current context.

~

2.
The code is very similar to the other function
GetSubscriptionRelations(). In fact, I did not understand how the 2
functions know what they are returning:

E.g. how does GetSubscriptionRelations not return sequences too?
E.g. how does GetSubscriptionSequences not return relations too?

======
src/backend/commands/subscriptioncmds.c

CreateSubscription:
nitpick - put the sequence logic *after* the relations logic, because
that is the order that seems used everywhere else.

~~~

3. AlterSubscription_refresh

- logicalrep_worker_stop(sub->oid, relid);
+ /* Stop the worker if relation kind is not sequence*/
+ if (relkind != RELKIND_SEQUENCE)
+     logicalrep_worker_stop(sub->oid, relid);

Can you give more reasons in the comment for why the stop is skipped
for sequence workers?

~

nitpick - period and space in the comment

~~~

4.
  for (off = 0; off < remove_rel_len; off++)
  {
      if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
-         sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
+         sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE &&
+         get_rel_relkind(sub_remove_rels[off].relid) != RELKIND_SEQUENCE)
      {

Would this new logic perhaps be better written as:

if (get_rel_relkind(sub_remove_rels[off].relid) == RELKIND_SEQUENCE)
    continue;

~~~

AlterSubscription_refreshsequences:
nitpick - rename to AlterSubscription_refresh_sequences

~

5.
There is significant code overlap between the existing
AlterSubscription_refresh and the new function
AlterSubscription_refreshsequences.
I wonder if it is better to try to combine the logic and just pass
another parameter to AlterSubscription_refresh saying to update the
existing sequences if necessary, particularly since
AlterSubscription_refresh is already tweaked to work for sequences. Of
course, the resulting combined function would be large and complex,
but maybe that would still be better than having giant slabs of nearly
identical cut/paste code. Thoughts?

~~~

check_publications_origin:
nitpick - move variable declarations

~~~

fetch_sequence_list:
nitpick - change /tablelist/seqlist/
nitpick - tweak the spaces of the SQL for alignment (similar to fetch_table_list)

~

6.
+ " WHERE s.pubname IN (");
+ first = true;
+ foreach_ptr(String, pubname, publications)
+ {
+     if (first)
+         first = false;
+     else
+         appendStringInfoString(&cmd, ", ");
+
+     appendStringInfoString(&cmd, quote_literal_cstr(pubname->sval));
+ }
+ appendStringInfoChar(&cmd, ')');

IMO this can be written much better by using the get_publications_str()
function to do all this list work.

======
src/backend/replication/logical/launcher.c

7. logicalrep_worker_find

/*
 * Walks the workers array and searches for one that matches given
 * subscription id and relid.
 *
 * We are only interested in the leader apply worker or table sync worker.
 */

The above function comment (not in patch 0003) is stale, because
AFAICT this is also going to return sequence workers if it finds one.

~~~

8. logicalrep_sequence_sync_worker_find

+/*
+ * Walks the workers array and searches for one that matches given
+ * subscription id.
+ *
+ * We are only interested in the sequence sync worker.
+ */
+LogicalRepWorker *
+logicalrep_sequence_sync_worker_find(Oid subid, bool only_running)

There are other similar functions for walking the workers array to
search for a worker. Instead of having different functions for
different cases, wouldn't it be cleaner to combine these into a single
function, where you pass a parameter (e.g.
a mask of worker types that you are interested in finding)?

~

nitpick - declare a for loop variable 'i'

~~~

9. logicalrep_apply_worker_find

+static LogicalRepWorker *
+logicalrep_apply_worker_find(Oid subid, bool only_running)

All the other find* functions assume the lock is already held
(Assert(LWLockHeldByMe(LogicalRepWorkerLock));), but this one is
different. IMO it might be better to acquire the lock in the caller so
that all the find* functions look the same. Anyway, that will help to
combine everything into one "find" worker function, as suggested in
the previous review comment #8.

~

nitpick - declare a for loop variable 'i'
nitpick - removed unnecessary parens in condition

~~~

10. logicalrep_worker_launch

/*----------
 * Sanity checks:
 * - must be valid worker type
 * - tablesync workers are only ones to have relid
 * - parallel apply worker is the only kind of subworker
 */

The above code comment (not in the 0003 patch) seems stale. This
should now also mention sequence sync workers, right?

~~~

11.
- Assert(is_tablesync_worker == OidIsValid(relid));
+ Assert(is_tablesync_worker == OidIsValid(relid) || is_sequencesync_worker == OidIsValid(relid));

IIUC there is only a single sequence sync worker for handling all the
sequences. So, what does the 'relid' actually mean here when there are
multiple sequences?

~~~

12. logicalrep_seqsyncworker_failuretime

+/*
+ * Set the sequence sync worker failure time
+ *
+ * Called on sequence sync worker failure exit.
+ */

12a.
The comment should be improved to make it more clear that the failure
time of the sync worker is stored with the *apply* worker. See also
other review comments in this post about this area -- perhaps all this
can be removed?

~

12b.
Curious if this had to be a separate exit handler, or if maybe this
could have been handled by the existing logicalrep_worker_onexit
handler. See also other review comments in this post about this area
-- perhaps all this can be removed?
======
.../replication/logical/sequencesync.c

13. fetch_sequence_data

13a.
The function comment has no explanation of what exactly the returned
value means. It seems like it is what you will assign as 'last_value'
on the subscriber side.

~

13b.
Some of the table functions like this are called like
'fetch_remote_table_info()'. Maybe it is better to do similar here
(e.g. include the word "remote" in the function name).

~

14.
The reason for the addition logic "(last_value + log_cnt)" is not
obvious. I am guessing it might be related to code from
'nextval_internal' (fetch = log = fetch + SEQ_LOG_VALS;), but it is
complicated. It is unfortunate that the field 'log_cnt' seems hardly
commented anywhere at all.

Also, I am not 100% sure if I trust the logic in the first place. The
caller of this function is doing:

sequence_value = fetch_sequence_data(conn, remoteid, &lsn);
/* sets the sequence with sequence_value */
SetSequenceLastValue(RelationGetRelid(rel), sequence_value);

Won't that mean you can get to a situation where the subscriber-side
result of lastval('s') can be *ahead* of lastval('s') on the
publisher? That doesn't seem good.

~~~

copy_sequence:
nitpick - ERROR message. Reword "for table..." to be more like the 2nd
error message immediately below.
nitpick - /RelationGetRelationName(rel)/relname/
nitpick - moved the Assert for 'relkind' to be nearer the assignment

~

15.
+ /*
+  * Logical replication of sequences is based on decoding WAL records,
+  * describing the "next" state of the sequence the current state in the
+  * relfilenode is yet to reach. But during the initial sync we read the
+  * current state, so we need to reconstruct the WAL record logged when we
+  * started the current batch of sequence values.
+  *
+  * Otherwise we might get duplicate values (on subscriber) if we failed
+  * over right after the sync.
+  */
+ sequence_value = fetch_sequence_data(conn, remoteid, &lsn);
+
+ /* sets the sequence with sequence_value */
+ SetSequenceLastValue(RelationGetRelid(rel), sequence_value);

(This is related to the earlier review comment #14 above.) IMO all
this tricky commentary belongs in the function header of
"fetch_sequence_data", where it should be describing that function's
return value.

~~~

LogicalRepSyncSequences:
nitpick - declare void param
nitpick - indentation
nitpick - wrapping
nitpick - /sequencerel/sequence_rel/
nitpick - blank lines

~

16.
+ if (check_enable_rls(RelationGetRelid(sequencerel), InvalidOid, false) == RLS_ENABLED)
+     ereport(ERROR,
+             errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+             errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
+                    GetUserNameFromId(GetUserId(), true),
+                    RelationGetRelationName(sequencerel)));

This should be reworded to refer to sequences instead of relations.
Maybe like:

user \"%s\" cannot replicate into sequence \"%s\" with row-level
security enabled

~

17.
The calculations involving the BATCH size seem a bit tricky.

e.g. in the 1st place it is doing:
(curr_seq % MAX_SEQUENCES_SYNC_PER_BATCH == 0)

e.g. in the 2nd place it is doing:
((next_seq % MAX_SEQUENCES_SYNC_PER_BATCH) == 0)

Maybe this batch logic can be simplified somehow by using a bool
variable for the calculation?

Also, where does the number 100 come from? Why not 1000? Why not 10?
Why have batching at all? Maybe there should be some comment to
describe the reason and the chosen value.

~

18.
+ next_seq = curr_seq + 1;
+ if (((next_seq % MAX_SEQUENCES_SYNC_PER_BATCH) == 0) || next_seq == seq_count)
+ {
+     /* LOG all the sequences synchronized during current batch.
*/
+     int i = curr_seq - (curr_seq % MAX_SEQUENCES_SYNC_PER_BATCH);
+     for (; i <= curr_seq; i++)
+     {
+         SubscriptionRelState *done_seq;
+         done_seq = (SubscriptionRelState *) lfirst(list_nth_cell(sequences, i));
+         ereport(LOG,
+                 errmsg("logical replication synchronization for subscription \"%s\", sequence \"%s\" has finished",
+                        get_subscription_name(subid, false), get_rel_name(done_seq->relid)));
+     }
+
+     CommitTransactionCommand();
+ }
+
+ curr_seq++;

I feel this batching logic needs more comments describing what you are
doing here.

~~~

SequencesyncWorkerMain:
nitpick - spaces in the function comment

======
src/backend/replication/logical/tablesync.c

19. finish_sync_worker

-finish_sync_worker(void)
+finish_sync_worker(bool istable)

IMO, for better readability (here and in the callers) the new
parameter should be the enum LogicalRepWorkerType. Since we have that
enum, we might as well make good use of it.

~

nitpick - /sequences synchronization worker/sequence synchronization worker/
nitpick - comment tweak

~

20.
+ char relkind;
+
+ if (!started_tx)
+ {
+     StartTransactionCommand();
+     started_tx = true;
+ }
+
+ relkind = get_rel_relkind(rstate->relid);
+ if (relkind == RELKIND_SEQUENCE)
+     continue;

I am wondering if it is possible to put the relkind check *before* the
TX code here, because in case there are *only* sequences then maybe
everything would be skipped and there would have been no need for any
TX at all in the first place.

~~~

process_syncing_sequences_for_apply:
nitpick - fix typo and slightly reword the function header comment.
Also /last start time/last failure time/
nitpick - tweak comments
nitpick - blank lines

~

21.
+ if (!started_tx)
+ {
+     StartTransactionCommand();
+     started_tx = true;
+ }
+
+ relkind = get_rel_relkind(rstate->relid);
+ if (relkind != RELKIND_SEQUENCE || rstate->state != SUBREL_STATE_INIT)
+     continue;

Wondering (like in review comment #20) if it is possible to swap
those, because maybe there was no reason for any TX if the other
condition would always continue.

~~~

22.
+ if (nsyncworkers < max_sync_workers_per_subscription)
+ {
+     TimestampTz now = GetCurrentTimestamp();
+     if (!MyLogicalRepWorker->sequencesync_failure_time ||
+         TimestampDifferenceExceeds(MyLogicalRepWorker->sequencesync_failure_time,
+                                    now, wal_retrieve_retry_interval))
+     {
+         MyLogicalRepWorker->sequencesync_failure_time = 0;

It seems to me that the 'sequencesync_failure_time' logic may be
unnecessarily complicated. Can't the same "throttling" be achieved by
storing the synchronization worker 'start time' instead of the 'fail
time'? In that case you won't have to mess around with considering
whether the sync worker failed or just exited normally, etc. You might
also be able to remove all the logicalrep_seqsyncworker_failuretime()
exit handler code.

~~~

process_syncing_tables:
nitpick - let's process tables before sequences (because all other
code is generally in this same order)
nitpick - removed some excessive comments about code that is not
supposed to happen

======
src/backend/replication/logical/worker.c

should_apply_changes_for_rel:
nitpick - IMO there were excessive comments for something that is not
going to happen

~~~

23. InitializeLogRepWorker

/*
 * Common initialization for leader apply worker, parallel apply worker and
 * tablesync worker.
 *
 * Initialize the database connection, in-memory subscription and necessary
 * config options.
 */

That comment (not part of patch 0003) is stale; it should now mention
the sequence sync worker as well, right?

~

nitpick - Tweak plural /sequences sync worker/sequence sync worker/

~~~

24.
SetupApplyOrSyncWorker

/* Common function to setup the leader apply or tablesync worker. */

That comment (not part of patch 0003) is stale; it should now mention
the sequence sync worker as well, right?

======
src/include/nodes/parsenodes.h

25.
  ALTER_SUBSCRIPTION_ADD_PUBLICATION,
  ALTER_SUBSCRIPTION_DROP_PUBLICATION,
  ALTER_SUBSCRIPTION_REFRESH,
+ ALTER_SUBSCRIPTION_REFRESH_PUBLICATION_SEQUENCES,

For consistency with your new enum, it would be better to also change
the existing enum name ALTER_SUBSCRIPTION_REFRESH ==>
ALTER_SUBSCRIPTION_REFRESH_PUBLICATION.

======
src/include/replication/logicalworker.h

nitpick - IMO should change the function name
/SequencesyncWorkerMain/SequenceSyncWorkerMain/, and in passing make
the same improvement to the TablesyncWorkerMain function name.

======
src/include/replication/worker_internal.h

26.
  WORKERTYPE_PARALLEL_APPLY,
+ WORKERTYPE_SEQUENCESYNC,
  } LogicalRepWorkerType;

AFAIK the enum order should not matter here, so it would be better to
put WORKERTYPE_SEQUENCESYNC directly after WORKERTYPE_TABLESYNC to
keep similar things together.

~

nitpick - IMO change the macro name
/isSequencesyncWorker/isSequenceSyncWorker/, and in passing make the
same improvement to the isTablesyncWorker macro name.

======
src/test/subscription/t/034_sequences.pl

nitpick - Copyright year
nitpick - Modify the "Create subscriber node" comment for consistency
nitpick - Modify comments slightly for the setup structure parts
nitpick - Add or remove various blank lines
nitpick - Since you have sequences 's2' and 's3', IMO it makes more
sense to call the original sequence 's1' instead of just 's'
nitpick - Rearrange so the CREATE PUBLICATION/SUBSCRIPTION can stay
together
nitpick - Modified some comment styles to clearly delineate all the
main "TEST" scenarios
nitpick - In the REFRESH PUBLICATION test the create new sequence and
update existing sequence can be combined (like you do in a later test).
nitpick - Changed some of the test messages for REFRESH PUBLICATION
which seemed wrong
nitpick - Added another test for 's1' in REFRESH PUBLICATION SEQUENCES
nitpick - Changed some of the test messages for REFRESH PUBLICATION
SEQUENCES which seemed wrong

~

27.
IIUC the preferred practice is to give these test object names a
'regress_' prefix.

~

28.
+# Check the data on subscriber
+$result = $node_subscriber->safe_psql(
+    'postgres', qq(
+    SELECT * FROM s;
+));
+
+is($result, '132|0|t', 'initial test data replicated');

28a.
Maybe it is better to say "SELECT last_value, log_cnt, is_called"
instead of "SELECT *"? Note - this occurs in a couple of places.

~

28b.
Can you explain why the expected sequence value is 132? AFAICT you
only called nextval('s') 100 times, so why isn't it 100? My guess is
that it is related to code in "nextval_internal" (fetch = log = fetch
+ SEQ_LOG_VALS;), but it kind of defies the expectations of the test,
so if it really is correct then it needs commentary.

Actually, I found other regression test code that deals with this:

-- log_cnt can be higher if there is a checkpoint just at the right
-- time, so just test for the expected range
SELECT last_value, log_cnt IN (31, 32) AS log_cnt_ok, is_called FROM foo_seq_new;

Do you have to do something similar? Or is this a bug? See my other
review comments for function fetch_sequence_data in sequencesync.c.

======

99.
Please also see the attached diffs patch which implements any nitpicks
mentioned above.

======
Kind Regards,
Peter Smith.
Fujitsu Australia