Re: Synchronizing slots from primary to standby - Mailing list pgsql-hackers
From | Ajin Cherian |
---|---|
Subject | Re: Synchronizing slots from primary to standby |
Date | |
Msg-id | CAFPTHDaqn+m47_vkAToQD6Pe8diut0F0g0bSr8PdcuW6cbSSkQ@mail.gmail.com |
In response to | Re: Synchronizing slots from primary to standby (Peter Smith <smithpb2250@gmail.com>) |
List | pgsql-hackers |
On Wed, Oct 4, 2023 at 2:23 PM Peter Smith <smithpb2250@gmail.com> wrote:
>
> Here are some review comments for v20-0002.
>
These comments below have been addressed in patch v24 posted by Shveta.
> ======
> 1. GENERAL - errmsg/elog messages
>
> There are a lot of minor problems and/or quirks across all the
> message texts. Here is a summary of some I found:
>
> ERROR
> errmsg("could not receive list of slots from the primary server: %s",
> errmsg("invalid response from primary server"),
> errmsg("invalid connection string syntax: %s",
> errmsg("replication slot-sync worker slot %d is empty, cannot attach",
> errmsg("replication slot-sync worker slot %d is already used by
> another worker, cannot attach",
> errmsg("replication slot-sync worker slot %d is already used by
> another worker, cannot attach",
> errmsg("could not connect to the primary server: %s",
>
> errmsg("operation not permitted on replication slots on standby which
> are synchronized from primary")));
> /primary/the primary/
>
== comment no longer part of patch.
> errmsg("could not fetch invalidation cuase for slot \"%s\" from primary: %s",
> /cuase/cause/
> /primary/the primary/
>
== fixed
> errmsg("slot \"%s\" disapeared from the primary",
> /disapeared/disappeared/
>
== fixed
> errmsg("could not fetch slot info from the primary: %s",
> errmsg("could not connect to the primary server: %s", err)));
> errmsg("could not map dynamic shared memory segment for slot-sync worker")));
>
> errmsg("physical replication slot %s found in synchronize_slot_names",
> slot name not quoted?
> ---
== comment no longer part of patch
>
> WARNING
> errmsg("out of background worker slots"),
>
> errmsg("Replication slot-sync worker failed to attach to worker-pool slot %d",
> case?
>
> errmsg("Removed database %d from replication slot-sync worker %d;
> dbcount now: %d",
> case?
>
> errmsg("Skipping slots synchronization as primary_slot_name is not set."));
> case?
>
> errmsg("Skipping slots synchronization as hot_standby_feedback is off."));
> case?
>
> errmsg("Skipping slots synchronization as dbname is not specified in
> primary_conninfo."));
> case?
>
> errmsg("slot-sync wait for slot %s interrupted by promotion, slot
> creation aborted",
>
> errmsg("could not fetch slot info for slot \"%s\" from primary: %s",
> /primary/the primary/
>
> errmsg("slot \"%s\" disappeared from the primary, aborting slot creation",
> errmsg("slot \"%s\" invalidated on primary, aborting slot creation",
>
> errmsg("slot-sync for slot %s interrupted by promotion, sync not possible",
> slot name not quoted?
>
== fixed
> errmsg("skipping sync of slot \"%s\" as the received slot-sync lsn
> %X/%X is ahead of the standby position %X/%X",
>
> errmsg("not synchronizing slot %s; synchronization would move it backward",
> slot name not quoted?
> /backward/backwards/
>
== comment is no longer part of the patch.
> ---
>
> LOG
> errmsg("Added database %d to replication slot-sync worker %d; dbcount now: %d",
> errmsg("Added database %d to replication slot-sync worker %d; dbcount now: %d",
> errmsg("Stopping replication slot-sync worker %d",
> errmsg("waiting for remote slot \"%s\" LSN (%u/%X) and catalog xmin
> (%u) to pass local slot LSN (%u/%X) and and catalog xmin (%u)",
>
> errmsg("wait over for remote slot \"%s\" as its LSN (%X/%X)and catalog
> xmin (%u) has now passed local slot LSN (%X/%X) and catalog xmin
> (%u)",
> missing spaces?
>
== fixed
> elog(LOG, "Dropped replication slot \"%s\" ",
> extra space?
> why this one is elog but others are not?
>
> elog(LOG, "Replication slot-sync worker %d is shutting down on
> receiving SIGINT", MySlotSyncWorker->slot);
> case?
> why this one is elog but others are not?
>
> elog(LOG, "Replication slot-sync worker %d started", worker_slot);
> case?
> why this one is elog but others are not?
> ----
>
== changed these to ereports.
> DEBUG1
> errmsg("allocated dsa for slot-sync worker for dbcount: %d"
> worker number not given?
> should be elog?
>
> errmsg_internal("logical replication launcher started")
> should be elog?
>
== changed to elog
> ----
>
> DEBUG2
> elog(DEBUG2, "slot-sync worker%d's query:%s \n",
> missing space after 'worker'
> extra space before \n
>
> ======
> .../libpqwalreceiver/libpqwalreceiver.c
>
> 2. libpqrcv_get_dbname_from_conninfo
>
> +/*
> + * Get database name from primary conninfo.
> + *
> + * If dbanme is not found in connInfo, return NULL value.
> + * The caller should take care of handling NULL value.
> + */
> +static char *
> +libpqrcv_get_dbname_from_conninfo(const char *connInfo)
>
> 2a.
> /dbanme/dbname/
>
== fixed
> ~
>
> 2b.
> "The caller should take care of handling NULL value."
>
> IMO this is not very useful; it's like saying "caller must handle
> function return values".
>
== removed
> ~~~
>
> 3.
> +    for (opt = opts; opt->keyword != NULL; ++opt)
> +    {
> +        /* Ignore connection options that are not present. */
> +        if (opt->val == NULL)
> +            continue;
> +
> +        if (strcmp(opt->keyword, "dbname") == 0 && opt->val[0] != '\0')
> +        {
> +            dbname = pstrdup(opt->val);
> +        }
> +    }
>
> 3a.
> If there are multiple "dbname" in the conninfo then it will be the
> LAST one that is returned.
>
> Judging by my quick syntax experiment (below) this seemed like the
> correct thing to do, but I think there should be some comment to
> explain about it.
>
> test_sub=# create subscription sub1 connection 'dbname=foo dbname=bar
> dbname=test_pub' publication pub1;
> 2023-09-28 19:15:15.012 AEST [23997] WARNING: subscriptions created
> by regression test cases should have names starting with "regress_"
> WARNING: subscriptions created by regression test cases should have
> names starting with "regress_"
> NOTICE: created replication slot "sub1" on publisher
> CREATE SUBSCRIPTION
>
== added a comment saying that the last dbname would be selected.
> ~
>
> 3b.
> The block brackets {} are not needed for the single statement.
>
== fixed
> ~
>
> 3c.
> Since there is only one keyword of interest here it seemed overkill to
> have a separate 'continue' check. Why not do everything in one line:
>
> for (opt = opts; opt->keyword != NULL; ++opt)
> {
>     if (strcmp(opt->keyword, "dbname") == 0 && opt->val && opt->val[0] != '\0')
>         dbname = pstrdup(opt->val);
> }
>
== fixed.
> ======
> src/backend/replication/logical/launcher.c
>
> 4.
> +/*
> + * The local variables to store the current values of slot-sync related GUCs
> + * before each ConfigReload.
> + */
> +static char *PrimaryConnInfoPreReload = NULL;
> +static char *PrimarySlotNamePreReload = NULL;
> +static char *SyncSlotNamesPreReload = NULL;
>
> /The local variables/Local variables/
>
== fixed.
> ~~~
>
> 5. fwd declare
>
>  static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
> +static void slotsync_worker_cleanup(SlotSyncWorker *worker);
>  static int logicalrep_pa_worker_count(Oid subid);
>
> 5a.
> Hmmn, I think there were lot more added static functions than just this one.
>
> e.g. what about all these?
> static SlotSyncWorker *slotsync_worker_find
> static dsa_handle slotsync_dsa_setup
> static bool slotsync_worker_launch_or_reuse
> static void slotsync_worker_stop_internal
> static void slotsync_workers_stop
> static void slotsync_remove_obsolete_dbs
> static WalReceiverConn *primary_connect
> static void SaveCurrentSlotSyncConfigs
> static bool SlotSyncConfigsChanged
> static void ApplyLauncherStartSlotSync
> static void ApplyLauncherStartSubs
>
> ~
>
> 5b.
> There are inconsistent name style used for the new static functions --
> e.g. snake_case versus CamelCase.
>
== fixed.
> ~~~
>
> 6. WaitForReplicationWorkerAttach
>
>      int rc;
> +    bool is_slotsync_worker = (lock == SlotSyncWorkerLock) ? true : false;
>
> This seemed a hacky way to distinguish the sync-slot workers from
> other kinds of workers.
> Wouldn't it be better to pass another
> parameter to this function?
>
== This was discussed and this seemed to be a simple way of doing this.
> ~~~
>
> 7. slotsync_worker_attach
>
> It looks like almost a clone of the logicalrep_worker_attach. Seems a
> shame if cannot make use of common code.
>
== this was attempted but was found to require a lot of if conditions.
> ~~~
>
> 8. slotsync_worker_find
>
> + * Walks the slot-sync workers pool and searches for one that matches given
> + * dbid. Since one worker can manage multiple dbs, so it walks the db array in
> + * each worker to find the match.
>
> 8a.
> SUGGESTION
> Searches the slot-sync worker pool for the worker who manages the
> specified dbid. Because a worker can manage multiple dbs, also walk
> the db array of each worker to find the match.
>
> ~
>
> 8b.
> Should the comment also say something like "Returns NULL if no
> matching worker is found."
>
== fixed
> ~~~
>
> 9.
> +    /* Search for attached worker for a given dbid */
>
> SUGGESTION
> Search for an attached worker managing the given dbid.
>
== fixed
> ~~~
>
> 10.
> +{
> +    int i;
> +    SlotSyncWorker *res = NULL;
> +    Oid *dbids;
> +
> +    Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED));
> +
> +    /* Search for attached worker for a given dbid */
> +    for (i = 0; i < max_slotsync_workers; i++)
> +    {
> +        SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i];
> +        int cnt;
> +
> +        if (!w->hdr.in_use)
> +            continue;
> +
> +        dbids = (Oid *) dsa_get_address(w->dbids_dsa, w->dbids_dp);
> +        for (cnt = 0; cnt < w->dbcount; cnt++)
> +        {
> +            Oid wdbid = dbids[cnt];
> +
> +            if (wdbid == dbid)
> +            {
> +                res = w;
> +                break;
> +            }
> +        }
> +
> +        /* If worker is found, break the outer loop */
> +        if (res)
> +            break;
> +    }
> +
> +    return res;
> +}
>
> IMO this logic can be simplified a lot:
> - by not using the 'res' variable; directly return instead.
> - also moved the 'dbids' declaration.
> - and 'cnt' variable seems not meaningful; replace with 'dbidx' for
> the db array index IMO.
>
> For example (25 lines instead of 35 lines)
>
> {
>     int i;
>
>     Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED));
>
>     /* Search for an attached worker managing the given dbid. */
>     for (i = 0; i < max_slotsync_workers; i++)
>     {
>         SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i];
>         int dbidx;
>         Oid *dbids;
>
>         if (!w->hdr.in_use)
>             continue;
>
>         dbids = (Oid *) dsa_get_address(w->dbids_dsa, w->dbids_dp);
>         for (dbidx = 0; dbidx < w->dbcount; dbidx++)
>         {
>             if (dbids[dbidx] == dbid)
>                 return w;
>         }
>     }
>
>     return NULL;
> }
>
== fixed
> ~~~
>
> 11. slot_sync_dsa_setup
>
> +/*
> + * Setup DSA for slot-sync worker.
> + *
> + * DSA is needed for dbids array. Since max number of dbs a worker can manage
> + * is not known, so initially fixed size to hold DB_PER_WORKER_ALLOC_INIT
> + * dbs is allocated. If this size is exhausted, it can be extended using
> + * dsa free and allocate routines.
> + */
> +static dsa_handle
> +slotsync_dsa_setup(SlotSyncWorker *worker, int alloc_db_count)
>
> 11a.
> SUGGESTION
> DSA is used for the dbids array. Because the maximum number of dbs a
> worker can manage is not known, initially enough memory for
> DB_PER_WORKER_ALLOC_INIT dbs is allocated. If this size is exhausted,
> it can be extended using dsa free and allocate routines.
>
== fixed
> ~
>
> 11b.
> It doesn't make sense for the comment to say DB_PER_WORKER_ALLOC_INIT
> is the initial allocation, but then the function has a parameter
> 'alloc_db_count' (which is always passed as DB_PER_WORKER_ALLOC_INIT).
> IMO remove the 2nd parameter from this function and hardwire the
> initial allocation same as what the function comment says.
>
== fixed
> ~~~
>
> 12.
> +    /* Be sure any memory allocated by DSA routines is persistent. */
> +    oldcontext = MemoryContextSwitchTo(TopMemoryContext);
>
> /Be sure any memory/Ensure the memory/
>
== fixed
> ~~~
>
> 13.
> slotsync_worker_launch_or_reuse
>
> +/*
> + * Slot-sync worker launch or reuse
> + *
> + * Start new slot-sync background worker from the pool of available workers
> + * going by max_slotsync_workers count. If the worker pool is exhausted,
> + * reuse the existing worker with minimum number of dbs. The idea is to
> + * always distribute the dbs equally among launched workers.
> + * If initially allocated dbids array is exhausted for the selected worker,
> + * reallocate the dbids array with increased size and copy the existing
> + * dbids to it and assign the new one as well.
> + *
> + * Returns true on success, false on failure.
> + */
>
> /going by/limited by/ (??)
>
== fixed
> ~~~
>
> 14.
> +    BackgroundWorker bgw;
> +    BackgroundWorkerHandle *bgw_handle;
> +    uint16 generation;
> +    SlotSyncWorker *worker = NULL;
> +    uint32 mindbcnt = 0;
> +    uint32 alloc_count = 0;
> +    uint32 copied_dbcnt = 0;
> +    Oid *copied_dbids = NULL;
> +    int worker_slot = -1;
> +    dsa_handle handle;
> +    Oid *dbids;
> +    int i;
> +    bool attach;
>
> IIUC many of these variables can be declared at a different scope in
> this function, so they will be closer to where they are used.
>
== fixed
> ~~~
>
> 15.
> +    /*
> +     * We need to do the modification of the shared memory under lock so that
> +     * we have consistent view.
> +     */
> +    LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE);
>
> The current comment seems too much.
>
> SUGGESTION
> The shared memory must only be modified under lock.
>
== fixed
> ~~~
>
> 16.
> +    /* Find unused worker slot. */
> +    for (i = 0; i < max_slotsync_workers; i++)
> +    {
> +        SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i];
> +
> +        if (!w->hdr.in_use)
> +        {
> +            worker = w;
> +            worker_slot = i;
> +            break;
> +        }
> +    }
> +
> +    /*
> +     * If all the workers are currently in use. Find the one with minimum
> +     * number of dbs and use that.
> +     */
> +    if (!worker)
> +    {
> +        for (i = 0; i < max_slotsync_workers; i++)
> +        {
> +            SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i];
> +
> +            if (i == 0)
> +            {
> +                mindbcnt = w->dbcount;
> +                worker = w;
> +                worker_slot = i;
> +            }
> +            else if (w->dbcount < mindbcnt)
> +            {
> +                mindbcnt = w->dbcount;
> +                worker = w;
> +                worker_slot = i;
> +            }
> +        }
> +    }
>
> Why not combine these 2 loops, to avoid iterating over the same slots
> twice? Then, exit the loop immediately if unused worker found,
> otherwise if reach the end of loop having not found anything unused
> then you will already know the one having least dbs.
>
== fixed
> ~~~
>
> 17.
> +    /* Remember the old dbids before we reallocate dsa. */
> +    copied_dbcnt = worker->dbcount;
> +    copied_dbids = (Oid *) palloc0(worker->dbcount * sizeof(Oid));
> +    memcpy(copied_dbids, dbids, worker->dbcount * sizeof(Oid));
>
> 17a.
> Who frees this copied_dbids memory when you are finished needed it. It
> seems allocated in the TopMemoryContext so IIUC this is a leak.
>
== fixed
> ~
>
> 17b.
> These are the 'old' values. Not the 'copied' values. The copied_xxx
> variable names seem misleading.
>
== fixed
> ~~~
>
> 18.
> +    /* Prepare the new worker. */
> +    worker->hdr.launch_time = GetCurrentTimestamp();
> +    worker->hdr.in_use = true;
>
> If a new worker is required then the launch_time is set like above.
>
> +    {
> +        slot_db_data->last_launch_time = now;
> +
> +        slotsync_worker_launch_or_reuse(slot_db_data->database);
> +    }
>
> Meanwhile, at the caller of slotsync_worker_launch_or_reuse(), the
> dbid launch_time was already set as well. And those two timestamps are
> almost (but not quite) the same value. Isn't that a bit strange?
>
== in the caller, the purpose of the timestamp is to calculate how long to wait before retrying.
> ~~~
>
> 19.
> +    /* Initial DSA setup for dbids array to hold DB_PER_WORKER_ALLOC_INIT dbs */
> +    handle = slotsync_dsa_setup(worker, DB_PER_WORKER_ALLOC_INIT);
> +    dbids = (Oid *) dsa_get_address(worker->dbids_dsa, worker->dbids_dp);
> +
> +    dbids[worker->dbcount++] = dbid;
>
> Where was this worker->dbcount assigned to 0?
>
> Maybe it's better to do this explicitly under the "/* Prepare the new
> worker. */" comment.
>
== dbcount is assigned 0 in the function called two lines above - slotsync_dsa_setup()
> ~~~
>
> 20.
> +    if (!attach)
> +        ereport(WARNING,
> +                (errmsg("Replication slot-sync worker failed to attach to "
> +                        "worker-pool slot %d", worker_slot)));
> +
> +    /* Attach is done, now safe to log that the worker is managing dbid */
> +    if (attach)
> +        ereport(LOG,
> +                (errmsg("Added database %d to replication slot-sync "
> +                        "worker %d; dbcount now: %d",
> +                        dbid, worker_slot, worker->dbcount)));
>
> 20a.
> IMO this should be coded as "if (attach) ...; else ..."
>
== fixed.
> ~
>
> 99b.
> In other code if it failed to register then slotsync_worker_cleanup
> code is called. How come similar code is not done when fails to
> attach?
>
== WaitForReplicationWorkerAttach does the cleanup before returning false.
> ~~~
>
> 21. slotsync_worker_stop_internal
>
> +/*
> + * Internal function to stop the slot-sync worker and wait until it detaches
> + * from the slot-sync worker-pool slot.
> + */
> +static void
> +slotsync_worker_stop_internal(SlotSyncWorker *worker)
>
> IIUC this function does a bit more than what the function comment
> says. IIUC (again) I think the "detached" worker slot will still be
> flagged as 'inUse' but this function then does the extra step of
> calling slotsync_worker_cleanup() function to make the worker slot
> available for next process that needs it, am I correct?
>
> In this regard, this function seems a lot more like
> logicalrep_worker_detach() function comment, so there seems some kind
> of muddling of the different function names here... (??).
== modified the comment to mention the cleanup.
> ~~~
>
> 22. slotsync_remove_obsolete_dbs
>
> This function says:
> +/*
> + * Slot-sync workers remove obsolete DBs from db-list
> + *
> + * If the DBIds fetched from the primary are lesser than the ones being managed
> + * by slot-sync workers, remove extra dbs from worker's db-list. This may happen
> + * if some slots are removed on primary but 'synchronize_slot_names' has not
> + * been changed yet.
> + */
> +static void
> +slotsync_remove_obsolete_dbs(List *remote_dbs)
>
> But, there was another similar logic function too:
>
> +/*
> + * Drop obsolete slots
> + *
> + * Drop the slots which no longer need to be synced i.e. these either
> + * do not exist on primary or are no longer part of synchronize_slot_names.
> + *
> + * Also drop the slots which are valid on primary and got invalidated
> + * on standby due to conflict (say required rows removed on primary).
> + * The assumption is, these will get recreated in next sync-cycle and
> + * it is okay to drop and recreate such slots as long as these are not
> + * consumable on standby (which is the case currently).
> + */
> +static void
> +drop_obsolete_slots(Oid *dbids, List *remote_slot_list)
>
> Those function header comments suggest these have a lot of overlapping
> functionality.
>
> Can't those 2 functions be combined? Or maybe one delegate to the other?
>
== One is called by the launcher, and the other is called by the slotsync worker. While one prunes the list of dbs that needs to be passed to each slot-sync worker, the other prunes the list of slots each slot-sync worker handles in its dblist. Both are different.
> ~~~
>
> 23.
> +    ListCell *lc;
> +    Oid *dbids;
> +    int widx;
> +    int dbidx;
> +    int i;
>
> Scope of some of these variable declarations can be different so they
> are declared closer to where they are used.
>
== fixed
> ~~~
>
> 24.
> +    /* If not found, then delete this db from worker's db-list */
> +    if (!found)
> +    {
> +        for (i = dbidx; i < worker->dbcount; i++)
> +        {
> +            /* Shift the DBs and get rid of wdbid */
> +            if (i < (worker->dbcount - 1))
> +                dbids[i] = dbids[i + 1];
> +        }
>
> IIUC, that shift/loop could just have been a memmove() call to remove
> one Oid element.
>
== fixed
> ~~~
>
> 25.
> +    /* If dbcount for any worker has become 0, shut it down */
> +    for (widx = 0; widx < max_slotsync_workers; widx++)
> +    {
> +        SlotSyncWorker *worker = &LogicalRepCtx->ss_workers[widx];
> +
> +        if (worker->hdr.in_use && !worker->dbcount)
> +            slotsync_worker_stop_internal(worker);
> +    }
>
> Is it safe to stop this unguarded by SlotSyncWorkerLock locking? Is
> there a window where another dbid decides to reuse this worker at the
> same time this process is about to stop it?
>
== Only the launcher can do this, and there is only one launcher.
> ~~~
>
> 26. primary_connect
>
> +/*
> + * Connect to primary server for slotsync purpose and return the connection
> + * info. Disconnect previous connection if provided in wrconn_prev.
> + */
>
> /primary server/the primary server/
>
== fixed
> ~~~
>
> 27.
> +    if (!RecoveryInProgress())
> +        return NULL;
> +
> +    if (max_slotsync_workers == 0)
> +        return NULL;
> +
> +    if (strcmp(synchronize_slot_names, "") == 0)
> +        return NULL;
> +
> +    /* The primary_slot_name is not set */
> +    if (!WalRcv || WalRcv->slotname[0] == '\0')
> +    {
> +        ereport(WARNING,
> +                errmsg("Skipping slots synchronization as primary_slot_name "
> +                       "is not set."));
> +        return NULL;
> +    }
> +
> +    /* The hot_standby_feedback must be ON for slot-sync to work */
> +    if (!hot_standby_feedback)
> +    {
> +        ereport(WARNING,
> +                errmsg("Skipping slots synchronization as hot_standby_feedback "
> +                       "is off."));
> +        return NULL;
> +    }
>
> How come some of these checks giving WARNING that slot synchronization
> will be skipped, but others are just silently returning NULL?
== primary_slot_name and hot_standby_feedback are not GUCs exclusive to slot synchronization, they are previously existing - so warning only for them. The others are specific to slot synchronization, so if users set them (which shows that the user intends to use sync-slot), then warning to let the user know that these others also need to be set.
> ~~~
>
> 28. SaveCurrentSlotSyncConfigs
>
> +static void
> +SaveCurrentSlotSyncConfigs()
> +{
> +    PrimaryConnInfoPreReload = pstrdup(PrimaryConnInfo);
> +    PrimarySlotNamePreReload = pstrdup(WalRcv->slotname);
> +    SyncSlotNamesPreReload = pstrdup(synchronize_slot_names);
> +}
>
> Shouldn't this code also do pfree first? Otherwise these will slowly
> leak every time this function is called, right?
>
== fixed
> ~~~
>
> 29. SlotSyncConfigsChanged
>
> +static bool
> +SlotSyncConfigsChanged()
> +{
> +    if (strcmp(PrimaryConnInfoPreReload, PrimaryConnInfo) != 0)
> +        return true;
> +
> +    if (strcmp(PrimarySlotNamePreReload, WalRcv->slotname) != 0)
> +        return true;
> +
> +    if (strcmp(SyncSlotNamesPreReload, synchronize_slot_names) != 0)
> +        return true;
>
> I felt those can all be combined to have 1 return instead of 3.
>
== fixed
> ~~~
>
> 30.
> +    /*
> +     * If we have reached this stage, it means original value of
> +     * hot_standby_feedback was 'true', so consider it changed if 'false' now.
> +     */
> +    if (!hot_standby_feedback)
> +        return true;
>
> "If we have reached this stage" seems a bit vague. Can this have some
> more explanation? And, maybe also an Assert(hot_standby_feedback); is
> helpful in the calling code (before the config is reloaded)?
>
== rewrote this without that comment.

regards,
Ajin Cherian
Fujitsu Australia
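
For readers following items 28 and 29, the two fixes (free the previous copy before saving a new one, and fold the three comparisons into a single return) can be sketched in standalone C roughly as below. This is only an illustration under assumptions, not the code from the v24 patch: it uses plain malloc/free in place of PostgreSQL's palloc/pstrdup/pfree, and the names ConfigSnapshot, replace_copy, snapshot_save and snapshot_changed are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the three *PreReload variables in the review. */
typedef struct ConfigSnapshot
{
    char *conninfo;
    char *slotname;
    char *sync_slot_names;
} ConfigSnapshot;

/*
 * Replace *dst with a fresh copy of src and release the previous copy.
 * The copy is made before the free so that src may alias *dst.
 * Assumption: malloc/free stand in for palloc/pfree here.
 */
void
replace_copy(char **dst, const char *src)
{
    size_t len;
    char  *copy;

    assert(src != NULL);
    len = strlen(src) + 1;
    copy = malloc(len);
    if (copy == NULL)
        abort();
    memcpy(copy, src, len);

    free(*dst);                 /* free(NULL) is a no-op on first use */
    *dst = copy;
}

/* Save the current values; repeated calls do not leak the old copies. */
void
snapshot_save(ConfigSnapshot *snap, const char *conninfo,
              const char *slotname, const char *sync_slot_names)
{
    replace_copy(&snap->conninfo, conninfo);
    replace_copy(&snap->slotname, slotname);
    replace_copy(&snap->sync_slot_names, sync_slot_names);
}

/* One return instead of three: true if any saved value differs. */
bool
snapshot_changed(const ConfigSnapshot *snap, const char *conninfo,
                 const char *slotname, const char *sync_slot_names)
{
    return strcmp(snap->conninfo, conninfo) != 0 ||
           strcmp(snap->slotname, slotname) != 0 ||
           strcmp(snap->sync_slot_names, sync_slot_names) != 0;
}
```

A caller would zero-initialize a ConfigSnapshot, call snapshot_save() before each config reload, and call snapshot_changed() afterwards with the reloaded values.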