Re: Synchronizing slots from primary to standby - Mailing list pgsql-hackers
From: Peter Smith
Subject: Re: Synchronizing slots from primary to standby
Msg-id: CAHut+PvX1q8CyO+QHUTnn_aPg3PM9eDUOdQGyWdKFKVoDfo67g@mail.gmail.com
In response to: Re: Synchronizing slots from primary to standby (shveta malik <shveta.malik@gmail.com>)
Responses: Re: Synchronizing slots from primary to standby
List: pgsql-hackers
Hi. Here are some review comments for v17-0002. This review is a WIP and a long way from complete, but I wanted to send what I have so far (while it is still current with your latest posted patches).

======

1. GENERAL - loop variable declaration

There are some code examples like the one below where the loop variable is declared within the for. AFAIK this style of declaration is atypical for the PG source.

+ /* Find unused worker slot. */
+ for (int i = 0; i < max_slotsync_workers; i++)

Search/Replace.

~~~

2. GENERAL - from primary

There are multiple examples in messages and comments that say "from primary". I felt most would be better to say "from the primary".

Search/Replace.

~~~

3. GENERAL - pgindent

There are lots of examples of function arguments like "* worker" (with space) which changed to "*worker" (without space) in v16 and then changed back to "* worker" with space in v17. Can all these toggles be cleaned up by running pgindent?

======

Commit message.

4.
This patch attempts to implement logical replication slots synchronization
from primary server to physical standby so that logical subscribers are not
blocked after failover. Now-on, all the logical replication slots created on
primary (assuming configurations are appropriate) are automatically created
on physical standbys and are synced periodically. This has been acheived by
starting slot-sync worker(s) on standby server which pings primary at
regular intervals to get the logical slots information and create/update the
slots locally.

SUGGESTION (just minor rewording)
This patch implements synchronization of logical replication slots from the
primary server to the physical standby so that logical subscribers are not
blocked after failover. All the logical replication slots on the primary
(assuming configurations are appropriate) are automatically created on the
physical standbys and are synced periodically. Slot-sync worker(s) on the
standby server ping the primary at regular intervals to get the necessary
logical slot information and create/update the slots locally.

~

5.
For max number of slot-sync workers on standby, new GUC max_slotsync_workers
has been added, default value and max value is kept at 2 and 50 respectively.
This parameter can only be set at server start.

5a.
SUGGESTION (minor rewording)
A new GUC 'max_slotsync_workers' defines the maximum number of slot-sync
workers on the standby: default value = 2, max value = 50. This parameter
can only be set at server start.

~

5b.
Actually, I think mentioning the values 2 and 50 here might be too much
detail, but I left it anyway. Consider removing that.

~~~

6.
Now replication launcher on physical standby queries primary to get list of
dbids which belong to slots mentioned in GUC 'synchronize_slot_names'. Once
it gets the dbids, if dbids < max_slotsync_workers, it starts only that many
workers and if dbids > max_slotsync_workers, it starts max_slotsync_workers
and divides the work equally among them. Each worker is then responsible to
keep on syncing the concerned logical slots belonging to the DBs assigned
to it.

~

6a.
SUGGESTION (first sentence)
Now the replication launcher on the physical standby queries the primary to
get the list of dbids that belong to the...

~

6b.
"concerned"?? This word reads oddly here; consider "relevant", or just drop
it.

~~~

7.
Let us say slots mentioned in 'synchronize_slot_names' on primary belongs to
4 DBs and say 'max_slotsync_workers' is 4, then a new worker will be launched
for each db. If a new logical slot with a different DB is found by
replication launcher, it will assign this new db to the worker handling the
minimum number of dbs currently (or first worker in case of equal count).

~

/Let us say/For example, let's say/
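To make the assignment rule above concrete, here is a rough sketch (mine, not the patch's code) of picking the worker with the minimum dbcount. The SlotSyncWorker pool array and 'max_slotsync_workers' are assumed from the patch; the function name is hypothetical. It also happens to demonstrate the loop variable style from comment #1.

/*
 * Rough sketch, not the patch's implementation: choose the slot-sync
 * worker currently managing the fewest databases.  Using "<" (not "<=")
 * means the first worker wins on equal counts, matching the quoted
 * description.
 */
static SlotSyncWorker *
find_least_loaded_worker(SlotSyncWorker *pool)
{
    SlotSyncWorker *best = NULL;
    int         i;          /* declared outside the for, per comment #1 */

    for (i = 0; i < max_slotsync_workers; i++)
    {
        if (best == NULL || pool[i].dbcount < best->dbcount)
            best = &pool[i];
    }

    return best;
}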
~~~

8.
The naptime of worker is tuned as per the activity on primary. Each worker
starts with naptime of 10ms and if no activity is observed on primary for
some time, then naptime is increased to 10sec. And if activity is observed
again, naptime is reduced back to 10ms. Each worker does it by choosing one
slot (first one assigned to it) for monitoring purpose. If there is no change
in lsn of that slot for say over 10 sync-checks, naptime is increased to
10sec and as soon as a change is observed, naptime is reduced back to 10ms.

~

/as per the activity on primary/according to the activity on the primary/
/is observed on primary/is observed on the primary/
/Each worker does it by choosing one slot/Each worker uses one slot/

~~~

9.
If there is any change in synchronize_slot_names, then the slots which are no
longer part of it or the ones which no longer exist on primary will be
dropped by slot-sync workers on physical standbys.

~

9a.
/on primary/on the primary/
/which no longer exist/that no longer exist/

~

9b.
I didn't really understand why this says "or the ones which no longer exist".
IIUC (from the prior paragraph) such slots would already be
invalidated/removed by the sync-slot worker in due course -- i.e. we don't
need to wait for some change to the 'synchronize_slot_names' list to trigger
that deletion, right?

======

doc/src/sgml/config.sgml

10.
+     <varlistentry id="guc-max-slotsync-workers" xreflabel="max_slotsync_workers">
+      <term><varname>max_slotsync_workers</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>max_slotsync_workers</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Specifies maximum number of slot synchronization workers.
+       </para>
+       <para>
+        Slot synchronization workers are taken from the pool defined by
+        <varname>max_worker_processes</varname>.
+       </para>
+       <para>
+        The default value is 2. This parameter can only be set at server
+        start.
+       </para>
+      </listitem>
+     </varlistentry>

This looks OK, but IMO there also needs to be some larger description (here
or elsewhere?) about this feature more generally. Otherwise, why would the
user change 'max_slotsync_workers' when there is nothing to say what "slot
synchronization workers" are for?

======

src/backend/postmaster/bgworker.c

11.
  { "ApplyWorkerMain", ApplyWorkerMain },
+ {
+  "ReplSlotSyncMain", ReplSlotSyncMain
+ },
  { "ParallelApplyWorkerMain", ParallelApplyWorkerMain },

~

I thought this entry point name/function should include the word "Worker",
the same as for the others.

======

.../libpqwalreceiver/libpqwalreceiver.c

12.
+/*
+ * Get DB info for logical slots
+ *
+ * It gets the DBIDs for slot_names from primary. The list obatined has no
+ * duplicacy of DBIds.
+ */
+static List *
+libpqrcv_get_dbinfo_for_logical_slots(WalReceiverConn *conn,
+                                      const char *slot_names)

12a.
typo /obatined/

SUGGESTION
The returned list has no duplicates.

~

12b.
I did not recognise any part of the function logic ensuring no duplicates are
returned. IIUC it is actually within the logic of LIST_DBID_FOR_LOGICAL_SLOTS
that this is handled, so maybe the comment can mention that.
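FWIW, if the receiver side ever wanted to guarantee uniqueness defensively as well, a sketch could be as simple as the following, using list_append_unique_oid() from pg_list.h. This is only an illustration (the function name and the Oid-array input are hypothetical), not a claim about how the patch does it.

#include "nodes/pg_list.h"

/*
 * Rough sketch: build a duplicate-free List of database OIDs.
 * list_append_unique_oid() only appends the OID if it is not already
 * a member, so duplicates in the input are silently dropped.
 */
static List *
collect_unique_dbids(const Oid *dbids, int ndbids)
{
    List       *result = NIL;
    int         i;

    for (i = 0; i < ndbids; i++)
        result = list_append_unique_oid(result, dbids[i]);

    return result;
}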
~~~

13. libpqrcv_get_dbinfo_for_logical_slots

+ if (PQnfields(res) != 1)
+ {
+  int nfields = PQnfields(res);
+
+  PQclear(res);
+  ereport(ERROR,
+          (errmsg("invalid response from primary server"),
+           errdetail("Could not get list of slots: got %d fields, "
+                     "expected %d fields.",
+                     nfields, 1)));
+ }

Something seems not right about the message. The plurality of the "expected"
part is wrong, and if it can only be 1 then why use substitution?

======

src/backend/replication/logical/Makefile

OK

======

src/backend/replication/logical/launcher.c

14. slot_sync_worker_stop

+static void
+slot_sync_worker_stop(SlotSyncWorker *worker)
+{
+
+ Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED));
...
+  LWLockAcquire(SlotSyncWorkerLock, LW_SHARED);
+ }
+
+}

Unnecessary whitespace at the top and bottom of this function.

~~~

15. slot_sync_worker_launch_or_reuse

+ /* Find unused worker slot. */
+ for (int i = 0; i < max_slotsync_workers; i++)

loop variable declaration.

~~~

16. slot_sync_worker_launch_or_reuse

+ if (!worker)
+ {
+  for (int i = 0; i < max_slotsync_workers; i++)

loop variable declaration.

~~~

17. slot_sync_remove_obsolete_dbs

+ /* Traverse slot-sync-workers to validate the DBs */
+ for (int widx = 0; widx < max_slotsync_workers; widx++)
+ {

loop variable declaration.

~

18.
+ for (int dbidx = 0; dbidx < worker->dbcount;)
+ {

loop variable declaration.

~

19.
+ for (int i = dbidx; i < worker->dbcount; i++)
+ {

loop variable declaration.

~

20.
+ /* If dbcount for any worker has become 0, shut it down */
+ for (int widx = 0; widx < max_slotsync_workers; widx++)
+ {

loop variable declaration.

~

21.
+ }
+
+}
+

Unnecessary whitespace at the end of the function body.

~~~

22. ApplyLauncherStartSubs

+static void
+ApplyLauncherStartSubs(long *wait_time)
+{

Missing function comment.

======

.../replication/logical/logicalfuncs.c

OK

======

src/backend/replication/logical/meson.build

OK

======

src/backend/replication/logical/slotsync.c

23.
+/*-------------------------------------------------------------------------
+ * slotsync.c
+ *     PostgreSQL worker for synchronizing slots to a standby from primary
+ *
+ * Copyright (c) 2016-2018, PostgreSQL Global Development Group
+ *

Wrong copyright date?

~~~

24.
+ * This file contains the code for slot-sync worker on physical standby that
+ * fetches the logical replication slots information from primary server
+ * (PrimaryConnInfo) and creates the slots on standby and synchronizes them
+ * periodically. It synchronizes only the slots configured in
+ * 'synchronize_slot_names'.

SUGGESTION
This file contains the code for slot-sync workers on physical standby to
fetch logical replication slot information from the primary server
(PrimaryConnInfo), create the slots on the standby, and synchronize them
periodically. Slot-sync workers only synchronize slots configured in
'synchronize_slot_names'.

~~~

25.
+ * It takes a nap of WORKER_DEFAULT_NAPTIME before every next synchronization.
+ * If there is no acitivity observed on primary for sometime, it increases the
+ * naptime to WORKER_INACTIVITY_NAPTIME and as soon as any activity is observed,
+ * it brings back the naptime to default value.

SUGGESTION (2nd sentence)
If there is no activity observed on the primary for some time, the naptime is
increased to WORKER_INACTIVITY_NAPTIME, but if any activity is observed, the
naptime reverts to the default value.
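To illustrate the naptime behaviour described in #8 and in the comment above, here is a rough sketch. The constants come from the patch description, but the helper name and the exact shape are mine, so treat it as a sketch only.

#include "access/xlogdefs.h"        /* XLogRecPtr */

#define WORKER_DEFAULT_NAPTIME      10      /* ms */
#define WORKER_INACTIVITY_NAPTIME   10000   /* ms */
#define WORKER_INACTIVITY_THRESHOLD 10      /* consecutive idle sync-checks */

/*
 * Rough sketch, not the patch's code: bump the naptime after
 * WORKER_INACTIVITY_THRESHOLD sync-checks with no LSN movement on the
 * monitored slot, and drop straight back to the default as soon as the
 * LSN moves.
 */
static long
adjust_naptime(XLogRecPtr prev_lsn, XLogRecPtr cur_lsn,
               int *inactivity_count, long naptime)
{
    if (cur_lsn != prev_lsn)
    {
        /* activity observed: revert to the default immediately */
        *inactivity_count = 0;
        return WORKER_DEFAULT_NAPTIME;
    }

    if (++(*inactivity_count) >= WORKER_INACTIVITY_THRESHOLD)
        return WORKER_INACTIVITY_NAPTIME;

    return naptime;
}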
~~~

26.
+typedef struct RemoteSlot
+{
+ char *name;
+ char *plugin;
+ char *database;
+ bool two_phase;
+ bool conflicting;
+ XLogRecPtr restart_lsn;
+ XLogRecPtr confirmed_lsn;
+ TransactionId catalog_xmin;
+
+ /* RS_INVAL_NONE if valid, or the reason of invalidation */
+ ReplicationSlotInvalidationCause invalidated;
+} RemoteSlot;

This deserves at least a struct-level comment.

~~~

27.
+/*
+ * Inactivity Threshold Count before increasing naptime of worker.
+ *
+ * If the lsn of slot being monitored did not change for these many times,
+ * then increase naptime of current worker from WORKER_DEFAULT_NAPTIME to
+ * WORKER_INACTIVITY_NAPTIME.
+ */
+#define WORKER_INACTIVITY_THRESHOLD 10

I felt this constant would be better expressed as a time interval instead of
a magic number. You can easily derive that loop count anyway in the code
logic. e.g. here the comment would be "If the lsn of the slot being monitored
did not change for XXXms then...".

~~~

28. wait_for_primary_slot_catchup

+/*
+ * Wait for remote slot to pass localy reserved position.
+ */
+static void
+wait_for_primary_slot_catchup(WalReceiverConn *wrconn, char *slot_name,
+                              XLogRecPtr min_lsn)

/localy/locally/

~~~

29. wait_for_primary_slot_catchup

+ ereport(ERROR,
+         (errmsg("slot \"%s\" disapeared from primary",
+                 slot_name)));

/disapeared/disappeared/

~~~

30. ReplSlotSyncMain

+ if (!dsa)
+  ereport(ERROR,
+          (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+           errmsg("could not map dynamic shared memory "
+                  "segment for slot-sync worker")));
+
+
+ /* Primary initialization is complete. Now, attach to our slot. */

Unnecessary double whitespace.

======

src/backend/replication/logical/tablesync.c

OK

======

src/backend/replication/repl_gram.y

OK

======

src/backend/replication/repl_scanner.l

OK

======

src/backend/replication/slot.c

======

src/backend/replication/slotfuncs.c

31.
+/*
+ * SQL function for getting invalidation cause of a slot.
+ *
+ * Returns ReplicationSlotInvalidationCause enum value for valid slot_name;
+ * returns NULL if slot with given name is not found.
+ */
+Datum
+pg_get_invalidation_cause(PG_FUNCTION_ARGS)
+{
+ Name name = PG_GETARG_NAME(0);
+ ReplicationSlotInvalidationCause cause;
+ int slotno;
+
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ for (slotno = 0; slotno < max_replication_slots; slotno++)
+ {
+  ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[slotno];
+  if (strcmp(NameStr(s->data.name), NameStr(*name)) == 0)
+  {
+   cause = s->data.invalidated;
+   PG_RETURN_INT16(cause);
+  }
+ }
+ LWLockRelease(ReplicationSlotControlLock);
+
+ PG_RETURN_NULL();
+}

31a.
There seems to be no check if the slot actually is invalidated. I guess in
that case the function just returns the enum value RS_INVAL_NONE, but should
that be mentioned in the function header comment?

~

31b.
Seems a poor choice of function name -- it does not even have the word "slot"
in the name (??).

~

31c.
IMO it is better to have a blank line after the declaration in the loop.

~

31d.
Might be simpler just to remove that 'cause' variable. It's not doing much.
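Putting 31c and 31d together, the loop could be reduced to something like the sketch below (mine, not a demand; the function name is kept as-is here, but see 31b). Note it also releases the lock on the early-return path, which the quoted version appears to skip.

Datum
pg_get_invalidation_cause(PG_FUNCTION_ARGS)
{
    Name        name = PG_GETARG_NAME(0);
    int         slotno;

    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    for (slotno = 0; slotno < max_replication_slots; slotno++)
    {
        ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[slotno];

        if (strcmp(NameStr(s->data.name), NameStr(*name)) == 0)
        {
            int16       cause = s->data.invalidated;   /* RS_INVAL_NONE if valid */

            /* release before returning; see note above */
            LWLockRelease(ReplicationSlotControlLock);
            PG_RETURN_INT16(cause);
        }
    }
    LWLockRelease(ReplicationSlotControlLock);

    PG_RETURN_NULL();
}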
======

src/backend/replication/walsender.c

32. ListSlotDatabaseOIDs

+/*
+ * Handle the LIST_SLOT_DATABASE_OIDS command.
+ */
+static void
+ListSlotDatabaseOIDs(ListDBForLogicalSlotsCmd *cmd)

32a.
The function-level comment seems too terse. Just saying "handle the command"
does not describe what this function is actually doing and how it does it.

~

32b.
Is "LIST_SLOT_DATABASE_OIDS" even the correct name? I don't see that anywhere
else in this patch. AFAICT it should be "LIST_DBID_FOR_LOGICAL_SLOTS".

~

33. ListSlotDatabaseOIDs - comments

The comments in the body of this function begin inconsistently with
uppercase/lowercase.

~

34. ListSlotDatabaseOIDs - sorting/logic

Maybe explain better the reason for having the qsort and other logic. TBH, I
was not sure of the necessity for the names lists and the sorting and bsearch
logic. AFAICT these are all *only* used to check for uniqueness and existence
of the slot name. So I was wondering if a hashmap keyed by the slot name
might be more appropriate and also simpler than all this list
sorting/searching.

~

35. ListSlotDatabaseOIDs

+ for (int slotno = 0; slotno < max_replication_slots; slotno++)
+ {

loop variable declaration.

======

src/backend/storage/lmgr/lwlock.c

OK

======

src/backend/storage/lmgr/lwlocknames.txt

OK

======

.../utils/activity/wait_event_names.txt

TODO

======

src/backend/utils/misc/guc_tables.c

OK

======

src/backend/utils/misc/postgresql.conf.sample

36.
 # primary to streaming replication standby server
+#max_slotsync_workers = 2 # max number of slot synchronization workers on a standby

IMO it is better to say "maximum" instead of "max" in the comment. (Make sure
the GUC description text is identical.)

======

src/include/catalog/pg_proc.dat

37.
+{ oid => '6312', descr => 'get invalidate cause of a replication slot',
+  proname => 'pg_get_invalidation_cause', provolatile => 's', proisstrict => 't',
+  prorettype => 'int2', proargtypes => 'name',
+  prosrc => 'pg_get_invalidation_cause' },

37a.
SUGGESTION (descr)
what caused the replication slot to become invalid

~

37b.
'pg_get_invalidation_cause' seemed like a poor function name because it
doesn't have any context -- not even the word "slot" in it.

======

src/include/commands/subscriptioncmds.h

OK

======

src/include/nodes/replnodes.h

OK

======

src/include/postmaster/bgworker_internals.h

38.
 #define MAX_PARALLEL_WORKER_LIMIT 1024
+#define MAX_SLOT_SYNC_WORKER_LIMIT 50

Consider SLOTSYNC instead of SLOT_SYNC for consistency with other names of
this worker.

======

OK

======

src/include/replication/logicalworker.h

39.
 extern void ApplyWorkerMain(Datum main_arg);
 extern void ParallelApplyWorkerMain(Datum main_arg);
 extern void TablesyncWorkerMain(Datum main_arg);
+extern void ReplSlotSyncMain(Datum main_arg);

The name is not consistent with the others nearby. At least it should include
the word "Worker" like everything else does.

======

src/include/replication/slot.h

OK

======

src/include/replication/walreceiver.h

40.
+/*
+ * Slot's DBid related data
+ */
+typedef struct WalRcvRepSlotDbData
+{
+ Oid database;               /* Slot's DBid received from remote */
+ TimestampTz last_sync_time; /* The last time we tried to launch sync
+                              * worker for above Dbid */
+} WalRcvRepSlotDbData;
+

Is that comment about field 'last_sync_time' correct? I thought this field is
the last time the slot was synced -- not the last time the worker was
launched.

======

src/include/replication/worker_internal.h

41.
- /* User to use for connection (will be same as owner of subscription). */
+ /* User to use for connection (will be same as owner of subscription
+ * in case of LogicalRep worker). */
  Oid userid;
+} WorkerHeader;

41a.
This is not the normal style for a multi-line comment.

~

41b.
I wondered if the name "WorkerHeader" is just a bit *too* generic and might
cause future trouble because of the vague name.
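For 41a, the conventional multi-line comment form would be something like this (comment wording unchanged):

    /*
     * User to use for connection (will be same as owner of subscription
     * in case of LogicalRep worker).
     */
    Oid         userid;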
~~~

42.
+typedef struct LogicalRepWorker
+{
+ WorkerHeader header;
+
+ /* What type of worker is this? */
+ LogicalRepWorkerType type;

  /* Subscription id for the worker. */
  Oid subid;
@@ -77,7 +84,7 @@ typedef struct LogicalRepWorker
  * would be created for each transaction which will be deleted after the
  * transaction is finished.
  */
- FileSet *stream_fileset;
+ struct FileSet *stream_fileset;

 /*
  * PID of leader apply worker if this slot is used for a parallel apply
@@ -96,6 +103,32 @@ typedef struct LogicalRepWorker
  TimestampTz reply_time;
 } LogicalRepWorker;

42a.
I suggest having some struct-level comments.

~

42b.
The field name 'header' is propagated all over the place. So, IMO calling it
'hdr' instead of 'header' might be slightly less intrusive. I think there are
lots of precedents for calling headers 'hdr'.

~

42c.
Why was the FileSet field changed to struct FileSet? Aren't the struct/typedef
defined in the same place?

~~~

43.
+typedef struct SlotSyncWorker
+{
+ WorkerHeader header;
+
+ /* The slot in worker pool to which it is attached */
+ int slot;
+
+ /* Count of Database ids it manages */
+ uint32 dbcount;
+
+ /* DSA for dbids */
+ dsa_area *dbids_dsa;
+
+ /* dsa_pointer for database ids it manages */
+ dsa_pointer dbids_dp;
+
+ /* Info about slot being monitored for worker's naptime purpose */
+ struct SlotSyncWorkerWatchSlot
+ {
+  NameData slot_name;
+  XLogRecPtr confirmed_lsn;
+  int inactivity_count;
+ } monitoring_info;
+
+} SlotSyncWorker;

43a.
I suggest having some struct-level comments. (A sketch appears at the end of
this review.)

~

43b.
IMO it will avoid ambiguities to be more explicit in the comments instead of
just saying "it" everywhere.

+ /* The slot in worker pool to which it is attached */
+ /* Count of Database ids it manages */
+ /* dsa_pointer for database ids it manages */

~

43c.
There is inconsistent wording and case in these comments. Just pick one term
to use everywhere.
"Database ids"
"database ids"
"dbids"

~~~

44. GENERAL - restructuring of common structs in worker_internal.h

The field name 'header' is propagated all over the place. It is OK, and I
guess there is no choice, but IMO calling it 'hdr' instead of 'header' might
be slightly less intrusive. I think there are lots of precedents for calling
headers 'hdr'.

======

src/include/storage/lwlock.h

======

src/tools/pgindent/typedefs.list

45.
Missing the typedef WorkerHeader?
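Finally, to tie together 42a and 43a-43c, here is one possible shape for the SlotSyncWorker declaration, with a struct-level comment, 'hdr' naming, and consistent "database ids" wording. The comment text is mine and only illustrative, not a requirement.

/*
 * SlotSyncWorker: shared state for one slot-sync worker.
 *
 * Each worker is assigned a set of database ids and synchronizes the
 * logical slots belonging to those databases.
 */
typedef struct SlotSyncWorker
{
    WorkerHeader hdr;           /* common worker state; see comment #44 */

    /* The slot in the worker pool to which this worker is attached */
    int         slot;

    /* Count of database ids this worker manages */
    uint32      dbcount;

    /* DSA for the database ids */
    dsa_area   *dbids_dsa;

    /* dsa_pointer for the database ids this worker manages */
    dsa_pointer dbids_dp;

    /* Info about the slot monitored for this worker's naptime purpose */
    struct SlotSyncWorkerWatchSlot
    {
        NameData    slot_name;
        XLogRecPtr  confirmed_lsn;
        int         inactivity_count;
    }           monitoring_info;
} SlotSyncWorker;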
======

Kind Regards,
Peter Smith.
Fujitsu Australia