Re: Synchronizing slots from primary to standby

From:           Peter Smith
Subject:        Re: Synchronizing slots from primary to standby
Msg-id:         CAHut+PsFs=Qyi+qBXixc78Hc6DeKvS2n5+Qpk1Perhq73j+Log@mail.gmail.com
In response to: Re: Synchronizing slots from primary to standby (shveta malik <shveta.malik@gmail.com>)
Responses:      Re: Synchronizing slots from primary to standby
List:           pgsql-hackers
Hi Shveta. Here are some comments for patch v14-0002.

The patch is large, so my code review is a WIP... more later next week...

======
GENERAL

1. Patch size

The patch is 2700 lines. Is it possible to break this up into smaller
self-contained parts to make the reviews more manageable?

~~~

2. PG Docs

I guess there are missing PG Docs for this patch. E.g. there are new
GUCs added but I see no documentation yet for them.

~

3. Terms

There are variations of what to call the sync worker
- "Slot sync worker" or
- "slot-sync worker" or
- "slot synchronization worker" or
- "slot-synchronization worker"
- and others

These are all in the comments and messages etc. Better to
search/replace to make a consistent term everywhere. FWIW, I
preferred just to call it "slot-sync worker".

~

4. typedefs

I think multiple new typedefs are added by this patch. IIUC, those
should be included in the file typedefs.list so that pgindent will
work properly.

~

5. max_slot_sync_workers GUC

There is already a 'max_sync_workers_per_subscription', but that one
is for "tablesync" workers. IMO it is potentially confusing now that
both these GUCs have 'sync_workers' in the name. I think it would be
less ambiguous to change your new GUC to 'max_slotsync_workers'.

======
Commit Message

6. Overview?

I felt the information in this commit message is describing details
of what changes are in this patch, but there is no synopsis about the
*purpose* of this patch as a whole. E.g. what is it for? It seemed
like there should be some introductory paragraph up-front before
describing all the specifics.

~~~

7.

For slots to be synchronised, another GUC is added:
synchronize_slot_names: This is a runtime modifiable GUC.

~

If this is added by this patch then how come there is some SGML
describing the same GUC in patch v14-0001? What is the relationship?

~~~

8.

Let us say slots mentioned in 'synchronize_slot_names' on primary
belongs to 10 DBs and say the new GUC is set at default value of 2,
then each worker will manage 5 dbs and will keep on synching the
slots for them.

~

/the new GUC is set at default value of 2/'max_slot_sync_workers' is 2/

~~~

9.

If a new DB is found by replication launcher, it will assign this new
db to the worker handling the minimum number of dbs currently (or
first worker in case of equal count)

~

Hmm. Isn't this only describing cases where 'max_slot_sync_workers'
was exceeded? Otherwise, you should just launch a brand new
sync-worker, right?

~~~

10.

Each worker slot will have its own dbids list.

~

It seems confusing to say "worker slot" when already talking about
workers and slots. Can you reword that more like "Each slot-sync
worker will have its own dbids list"?

======
src/backend/postmaster/bgworker.c

======
.../libpqwalreceiver/libpqwalreceiver.c

11. libpqrcv_list_db_for_logical_slots

+/*
+ * List DB for logical slots
+ *
+ * It gets the list of unique DBIDs for logical slots mentioned in slot_names
+ * from primary.
+ */
+static List *
+libpqrcv_list_db_for_logical_slots(WalReceiverConn *conn,

Comment needs some minor tweaking.

~~~

12.

+	if (strcmp(slot_names, "") != 0 && strcmp(slot_names, "*") != 0)
+	{
+		char	   *rawname;
+		List	   *namelist;
+		ListCell   *lc;
+
+		appendStringInfoChar(&s, ' ');
+		rawname = pstrdup(slot_names);
+		SplitIdentifierString(rawname, ',', &namelist);
+		foreach (lc, namelist)
+		{
+			if (lc != list_head(namelist))
+				appendStringInfoChar(&s, ',');
+			appendStringInfo(&s, "%s",
+							 quote_identifier(lfirst(lc)));
+		}
+	}

/rawname/rawnames/

~~~

13.

+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		PQclear(res);
+		ereport(ERROR,
+				(errmsg("could not receive list of slots the primary server: %s",
+						pchomp(PQerrorMessage(conn->streamConn)))));
+	}

/the primary server/from the primary server/

~~~

14.

+	if (PQnfields(res) < 1)
+	{
+		int			nfields = PQnfields(res);
+
+		PQclear(res);
+		ereport(ERROR,
+				(errmsg("invalid response from primary server"),
+				 errdetail("Could not get list of slots: got %d fields, "
+						   "expected %d or more fields.",
+						   nfields, 1)));
+	}

This code seems over-complicated. If it is < 1 then it can only be
zero, right? So then what is the point of calculating and displaying
the 'nfields' which can only be 0?
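Maybe it can be reduced to something like the below (just a sketch;
the errdetail wording is only illustrative, not taken from the
patch):

	/* Sketch only: message wording here is illustrative */
	if (PQnfields(res) < 1)
	{
		PQclear(res);
		ereport(ERROR,
				(errmsg("invalid response from primary server"),
				 errdetail("Could not get list of slots: the result has no fields.")));
	}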
~~~

15.

+	ntuples = PQntuples(res);
+	for (int i = 0; i < ntuples; i++)
+	{
+
+		slot_data = palloc0(sizeof(WalRecvReplicationSlotDbData));
+		if (!PQgetisnull(res, i, 0))
+			slot_data->database = atooid(PQgetvalue(res, i, 0));
+
+		slot_data->last_sync_time = 0;
+		slotlist = lappend(slotlist, slot_data);
+	}

15a. Unnecessary blank line in for-block.

~~~

15b. Unnecessary assignment to 'last_sync_time' because the whole
structure was palloc0'd just 2 lines above.
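i.e. the loop could probably be reduced to something like this
(sketch only, reusing the same names as the patch):

	ntuples = PQntuples(res);
	for (int i = 0; i < ntuples; i++)
	{
		/* palloc0 already zeroes last_sync_time */
		slot_data = palloc0(sizeof(WalRecvReplicationSlotDbData));
		if (!PQgetisnull(res, i, 0))
			slot_data->database = atooid(PQgetvalue(res, i, 0));

		slotlist = lappend(slotlist, slot_data);
	}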
======
src/backend/replication/logical/Makefile

======
src/backend/replication/logical/launcher.c

16.

+/*
+ * Initial and incremental allocation size for dbids array for each
+ * SlotSyncWorker in dynamic shared memory i.e. we start with this size
+ * and once it is exhausted, dbids is rellocated with size incremented
+ * by ALLOC_DB_PER_WORKER
+ */
+#define ALLOC_DB_PER_WORKER 100

I felt it might be simpler to just separate these values instead of
having to describe how you make use of the same constant for 2
meanings. For example,

#define DB_PER_WORKER_ALLOC_INIT 100
#define DB_PER_WORKER_ALLOC_EXTRA 100

~~~

17.

 static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
-
 /*

Unnecessary whitespace change.

~~~

18. ApplyLauncherShmemInit

 	bool		found;
+	bool		foundSlotSync;

I think it is simpler to just use the same 'found' variable again.

~~~

19. ApplyLauncherShmemInit

+	/* Allocate shared-memory for slot-sync workers pool now */
+	LogicalRepCtx->ss_workers = (SlotSyncWorker *)
+		ShmemInitStruct("Replication slot synchronization workers",
+						mul_size(max_slot_sync_workers, sizeof(SlotSyncWorker)),
+						&foundSlotSync);
+
+	if (!foundSlotSync)
+	{
+		int			slot;
+
+		for (slot = 0; slot < max_slot_sync_workers; slot++)
+		{
+			SlotSyncWorker *worker = &LogicalRepCtx->ss_workers[slot];
+
+			memset(worker, 0, sizeof(SlotSyncWorker));
+		}
+	}

Why is the memset in a loop? Can't we just zap the whole ss_workers
array in one go using that same mul_size Size?

SUGGESTION

Size		ssw_size = mul_size(max_slot_sync_workers, sizeof(SlotSyncWorker));

if (!found)
	memset(LogicalRepCtx->ss_workers, 0, ssw_size);

======
.../replication/logical/logicalfuncs.c

======
src/backend/replication/logical/meson.build

======
src/backend/replication/logical/slotsync.c

======
src/backend/replication/logical/tablesync.c

======
src/backend/replication/repl_gram.y

======
src/backend/replication/repl_scanner.l

======
src/backend/replication/slot.c

======
src/backend/replication/walsender.c

======
src/backend/storage/lmgr/lwlocknames.txt

======
.../utils/activity/wait_event_names.txt

======
src/backend/utils/misc/guc_tables.c

20.

+	{
+		{"max_slot_sync_workers",
+			PGC_SIGHUP,
+			REPLICATION_STANDBY,
+			gettext_noop("Maximum number of slots synchronization workers "
+						 "on a standby."),
+			NULL,
+		},
+		&max_slot_sync_workers,
+		2, 0, MAX_SLOT_SYNC_WORKER_LIMIT,
+		NULL, NULL, NULL
+	},
+

/slots synchronization/slot synchronization/
OR
/slots synchronization/slot-sync/

======
src/backend/utils/misc/postgresql.conf.sample

21.

+#max_slot_sync_workers = 2	# max number of slot synchronization workers

Should this comment match the guc_tables.c text? E.g. should it say
"... on a standby"?

======
src/include/commands/subscriptioncmds.h

======
src/include/nodes/replnodes.h

======
src/include/postmaster/bgworker_internals.h

======
src/include/replication/logicallauncher.h

======
src/include/replication/logicalworker.h

======
src/include/replication/slot.h

22.

+
+	/*
+	 * Is standby synced slot?
+	 */
+	bool		synced;
 } ReplicationSlotPersistentData;

Comment is unclear:
- does it mean "has this primary slot been synced to the standby"?
- does it mean "this is a slot created by a slot-sync worker"?
- something else?

======
src/include/replication/walreceiver.h

23.

+/*
+ * Slot's DBids receiver from remote.
+ */
+typedef struct WalRecvReplicationSlotDbData
+{
+	Oid			database;
+	TimestampTz last_sync_time;
+} WalRecvReplicationSlotDbData;
+

Is that comment correct? Or should it be more like "The slot's DBid
received from remote."? Anyway, that comment seems more for the
'database' field only, not a structure-level comment.

~~~

24.

 	walrcv_get_conninfo_fn walrcv_get_conninfo;
 	walrcv_get_senderinfo_fn walrcv_get_senderinfo;
 	walrcv_identify_system_fn walrcv_identify_system;
+	walrcv_list_db_for_logical_slots_fn walrcv_list_db_for_logical_slots;
 	walrcv_server_version_fn walrcv_server_version;
 	walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
 	walrcv_startstreaming_fn walrcv_startstreaming;

This function name doesn't seem consistent with the existing names.
Something like 'walrcv_get_dbinfo_for_logical_slots_fn' might be
better?

======
src/include/replication/worker_internal.h

25.

+typedef struct SlotSyncWorkerWatchSlot
+{
+	NameData	slot_name;
+	XLogRecPtr	confirmed_lsn;
+	int			inactivity_count;
+} SlotSyncWorkerWatchSlot;

I did not find any reference to this typedef except in the following
struct for SlotSyncWorker. So why not just make this a nested
structure within 'SlotSyncWorker' instead?

~~~

26.

+typedef struct SlotSyncWorker
+{
+	/* Time at which this worker was launched. */
+	TimestampTz launch_time;
+
+	/* Indicates if this slot is used or free. */
+	bool		in_use;
+
+	/* The slot in worker pool to which it is attached */
+	int			slot;
+
+	/* Increased every time the slot is taken by new worker. */
+	uint16		generation;
+
+	/* Pointer to proc array. NULL if not running. */
+	PGPROC	   *proc;
+
+	/* User to use for connection (will be same as owner of subscription). */
+	Oid			userid;
+
+	/* Database id to connect to. */
+	Oid			dbid;
+
+	/* Count of Database ids it manages */
+	uint32		dbcount;
+
+	/* DSA for dbids */
+	dsa_area   *dbids_dsa;
+
+	/* dsa_pointer for database ids it manages */
+	dsa_pointer dbids_dp;
+
+	/* Mutex to access dbids in dsa */
+	slock_t		mutex;
+
+	/* Info about slot being monitored for worker's naptime purpose */
+	SlotSyncWorkerWatchSlot monitor;
+} SlotSyncWorker;

There seems an awful lot about this struct which is common with the
'LogicalRepWorker' struct. It seems a shame not to make use of the
commonality instead of all the cut/paste here. E.g. can it be
rearranged so all these common fields are shared:
- launch_time
- in_use
- slot
- generation
- proc
- userid
- dbid
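To illustrate the idea, here is a rough sketch (the
'LogicalWorkerCommon' name and field grouping are just invented for
illustration; 'LogicalRepWorker' would embed the same struct):

/* Sketch only: 'LogicalWorkerCommon' is a made-up name, not from the patch */
typedef struct LogicalWorkerCommon
{
	TimestampTz launch_time;	/* time at which this worker was launched */
	bool		in_use;			/* is this pool entry used or free? */
	int			slot;			/* index of this entry in the worker pool */
	uint16		generation;		/* bumped every time the entry is reused */
	PGPROC	   *proc;			/* pointer to proc array; NULL if not running */
	Oid			userid;			/* user to use for the connection */
	Oid			dbid;			/* database id to connect to */
} LogicalWorkerCommon;

typedef struct SlotSyncWorker
{
	LogicalWorkerCommon common;	/* fields shared with LogicalRepWorker */

	uint32		dbcount;		/* count of database ids it manages */
	dsa_area   *dbids_dsa;		/* DSA for dbids */
	dsa_pointer dbids_dp;		/* dsa_pointer for database ids it manages */
	slock_t		mutex;			/* mutex to access dbids in dsa */

	SlotSyncWorkerWatchSlot monitor;	/* slot being monitored (see #25) */
} SlotSyncWorker;

That way the pool-management code could potentially treat both kinds
of workers uniformly via the common part.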
======
src/include/storage/lwlock.h

27.

 	LWTRANCHE_LAUNCHER_HASH,
-	LWTRANCHE_FIRST_USER_DEFINED
+	LWTRANCHE_FIRST_USER_DEFINED,
+	LWTRANCHE_SLOT_SYNC_DSA
 } BuiltinTrancheIds;

Isn't 'LWTRANCHE_FIRST_USER_DEFINED' supposed to be the last enum?

======
src/test/recovery/meson.build

======
src/test/recovery/t/051_slot_sync.pl

28.

+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+use strict;

Wrong copyright date.

~~~

29.

+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+my $node_phys_standby = PostgreSQL::Test::Cluster->new('phys_standby');
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');

29a. Can't all the subroutines be up-front? Then this can move to be
with the other node initialisation code that comes next.

~

29b. Add a comment, something like:

# Setup nodes

~~~

30.

+# Check conflicting status in pg_replication_slots.
+sub check_slots_conflicting_status
+{
+	my $res = $node_phys_standby->safe_psql(
+		'postgres', qq(
+		select bool_and(conflicting) from pg_replication_slots;));
+
+	is($res, 't',
+		"Logical slot is reported as conflicting");
+}

Doesn't bool_and() mean this returns false if only some (but not all)
slots are conflicting - is that intentional? Or is this subroutine
only expecting to test one slot, in which case maybe the SQL should
also include the 'slot_name'?

~~~

31.

+$node_primary->start;
+$node_primary->psql('postgres', q{SELECT pg_create_physical_replication_slot('pslot1');});
+
+$node_primary->backup('backup');
+
+$node_phys_standby->init_from_backup($node_primary, 'backup', has_streaming => 1);
+$node_phys_standby->append_conf('postgresql.conf', q{
+synchronize_slot_names = '*'
+primary_slot_name = 'pslot1'
+hot_standby_feedback = off
+});
+$node_phys_standby->start;
+
+$node_primary->safe_psql('postgres', "CREATE TABLE t1 (a int PRIMARY KEY)");
+$node_primary->safe_psql('postgres', "INSERT INTO t1 VALUES (1), (2), (3)");

The comments seem mostly to describe details about what the
expectations are at each test step. IMO there also needs to be a
larger "overview" comment to describe more generally *what* this is
testing, and *how* it is testing it. E.g. it is hard to understand
the test without being already familiar with the patch.

------
Kind Regards,
Peter Smith.
Fujitsu Australia