Re: Synchronizing slots from primary to standby - Mailing list pgsql-hackers
From | shveta malik |
---|---|
Subject | Re: Synchronizing slots from primary to standby |
Date | |
Msg-id | CAJpy0uDTJFWFzz3=Qo75AG2SugjQ3dvVkOJChii=1vfHfnjWuQ@mail.gmail.com |
In response to | Re: Synchronizing slots from primary to standby (Peter Smith <smithpb2250@gmail.com>) |
Responses | Re: Synchronizing slots from primary to standby |
List | pgsql-hackers |
On Wed, Oct 4, 2023 at 8:53 AM Peter Smith <smithpb2250@gmail.com> wrote:
>
> Here are some review comments for v20-0002.
>

Thanks Peter for the feedback. Comments from 31 to the end are addressed
in v22. The first 30 comments will be addressed in the next version.

> ======
> 1. GENERAL - errmsg/elog messages
>
> There are a lot of minor problems and/or quirks across all the
> message texts. Here is a summary of some I found:
>
> ERROR
> errmsg("could not receive list of slots from the primary server: %s",
> errmsg("invalid response from primary server"),
> errmsg("invalid connection string syntax: %s",
> errmsg("replication slot-sync worker slot %d is empty, cannot attach",
> errmsg("replication slot-sync worker slot %d is already used by
> another worker, cannot attach",
> errmsg("replication slot-sync worker slot %d is already used by
> another worker, cannot attach",
> errmsg("could not connect to the primary server: %s",
>
> errmsg("operation not permitted on replication slots on standby which
> are synchronized from primary")));
> /primary/the primary/
>
> errmsg("could not fetch invalidation cuase for slot \"%s\" from primary: %s",
> /cuase/cause/
> /primary/the primary/
>
> errmsg("slot \"%s\" disapeared from the primary",
> /disapeared/disappeared/
>
> errmsg("could not fetch slot info from the primary: %s",
> errmsg("could not connect to the primary server: %s", err)));
> errmsg("could not map dynamic shared memory segment for slot-sync worker")));
>
> errmsg("physical replication slot %s found in synchronize_slot_names",
> slot name not quoted?
> ---
>
> WARNING
> errmsg("out of background worker slots"),
>
> errmsg("Replication slot-sync worker failed to attach to worker-pool slot %d",
> case?
>
> errmsg("Removed database %d from replication slot-sync worker %d;
> dbcount now: %d",
> case?
>
> errmsg("Skipping slots synchronization as primary_slot_name is not set."));
> case?
>
> errmsg("Skipping slots synchronization as hot_standby_feedback is off."));
> case?
>
> errmsg("Skipping slots synchronization as dbname is not specified in
> primary_conninfo."));
> case?
>
> errmsg("slot-sync wait for slot %s interrupted by promotion, slot
> creation aborted",
>
> errmsg("could not fetch slot info for slot \"%s\" from primary: %s",
> /primary/the primary/
>
> errmsg("slot \"%s\" disappeared from the primary, aborting slot creation",
> errmsg("slot \"%s\" invalidated on primary, aborting slot creation",
>
> errmsg("slot-sync for slot %s interrupted by promotion, sync not possible",
> slot name not quoted?
>
> errmsg("skipping sync of slot \"%s\" as the received slot-sync lsn
> %X/%X is ahead of the standby position %X/%X",
>
> errmsg("not synchronizing slot %s; synchronization would move it backward",
> slot name not quoted?
> /backward/backwards/
>
> ---
>
> LOG
> errmsg("Added database %d to replication slot-sync worker %d; dbcount now: %d",
> errmsg("Added database %d to replication slot-sync worker %d; dbcount now: %d",
> errmsg("Stopping replication slot-sync worker %d",
> errmsg("waiting for remote slot \"%s\" LSN (%u/%X) and catalog xmin
> (%u) to pass local slot LSN (%u/%X) and and catalog xmin (%u)",
>
> errmsg("wait over for remote slot \"%s\" as its LSN (%X/%X)and catalog
> xmin (%u) has now passed local slot LSN (%X/%X) and catalog xmin
> (%u)",
> missing spaces?
>
> elog(LOG, "Dropped replication slot \"%s\" ",
> extra space?
> why this one is elog but others are not?
>
> elog(LOG, "Replication slot-sync worker %d is shutting down on
> receiving SIGINT", MySlotSyncWorker->slot);
> case?
> why this one is elog but others are not?
>
> elog(LOG, "Replication slot-sync worker %d started", worker_slot);
> case?
> why this one is elog but others are not?
> ----
>
> DEBUG1
> errmsg("allocated dsa for slot-sync worker for dbcount: %d"
> worker number not given?
> should be elog?
>
> errmsg_internal("logical replication launcher started")
> should be elog?
>
> ----
>
> DEBUG2
> elog(DEBUG2, "slot-sync worker%d's query:%s \n",
> missing space after 'worker'
> extra space before \n
>
> ======
> .../libpqwalreceiver/libpqwalreceiver.c
>
> 2. libpqrcv_get_dbname_from_conninfo
>
> +/*
> + * Get database name from primary conninfo.
> + *
> + * If dbanme is not found in connInfo, return NULL value.
> + * The caller should take care of handling NULL value.
> + */
> +static char *
> +libpqrcv_get_dbname_from_conninfo(const char *connInfo)
>
> 2a.
> /dbanme/dbname/
>
> ~
>
> 2b.
> "The caller should take care of handling NULL value."
>
> IMO this is not very useful; it's like saying "caller must handle
> function return values".
>
> ~~~
>
> 3.
> + for (opt = opts; opt->keyword != NULL; ++opt)
> + {
> +     /* Ignore connection options that are not present. */
> +     if (opt->val == NULL)
> +         continue;
> +
> +     if (strcmp(opt->keyword, "dbname") == 0 && opt->val[0] != '\0')
> +     {
> +         dbname = pstrdup(opt->val);
> +     }
> + }
>
> 3a.
> If there are multiple "dbname" in the conninfo then it will be the
> LAST one that is returned.
>
> Judging by my quick syntax experiment (below) this seemed like the
> correct thing to do, but I think there should be some comment to
> explain about it.
>
> test_sub=# create subscription sub1 connection 'dbname=foo dbname=bar
> dbname=test_pub' publication pub1;
> 2023-09-28 19:15:15.012 AEST [23997] WARNING: subscriptions created
> by regression test cases should have names starting with "regress_"
> WARNING: subscriptions created by regression test cases should have
> names starting with "regress_"
> NOTICE: created replication slot "sub1" on publisher
> CREATE SUBSCRIPTION
>
> ~
>
> 3b.
> The block brackets {} are not needed for the single statement.
>
> ~
>
> 3c.
> Since there is only one keyword of interest here it seemed overkill to
> have a separate 'continue' check. Why not do everything in one line:
>
> for (opt = opts; opt->keyword != NULL; ++opt)
> {
>     if (strcmp(opt->keyword, "dbname") == 0 && opt->val && opt->val[0] != '\0')
>         dbname = pstrdup(opt->val);
> }
>
> ======
> src/backend/replication/logical/launcher.c
>
> 4.
> +/*
> + * The local variables to store the current values of slot-sync related GUCs
> + * before each ConfigReload.
> + */
> +static char *PrimaryConnInfoPreReload = NULL;
> +static char *PrimarySlotNamePreReload = NULL;
> +static char *SyncSlotNamesPreReload = NULL;
>
> /The local variables/Local variables/
>
> ~~~
>
> 5. fwd declare
>
>  static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
> +static void slotsync_worker_cleanup(SlotSyncWorker *worker);
>  static int logicalrep_pa_worker_count(Oid subid);
>
> 5a.
> Hmm, I think there were a lot more added static functions than just
> this one.
>
> e.g. what about all these?
> static SlotSyncWorker *slotsync_worker_find
> static dsa_handle slotsync_dsa_setup
> static bool slotsync_worker_launch_or_reuse
> static void slotsync_worker_stop_internal
> static void slotsync_workers_stop
> static void slotsync_remove_obsolete_dbs
> static WalReceiverConn *primary_connect
> static void SaveCurrentSlotSyncConfigs
> static bool SlotSyncConfigsChanged
> static void ApplyLauncherStartSlotSync
> static void ApplyLauncherStartSubs
>
> ~
>
> 5b.
> There are inconsistent name styles used for the new static functions --
> e.g. snake_case versus CamelCase.
>
> ~~~
>
> 6. WaitForReplicationWorkerAttach
>
>   int rc;
> + bool is_slotsync_worker = (lock == SlotSyncWorkerLock) ? true : false;
>
> This seemed a hacky way to distinguish the sync-slot workers from
> other kinds of workers. Wouldn't it be better to pass another
> parameter to this function?
>
> ~~~
>
> 7. slotsync_worker_attach
>
> It looks like almost a clone of the logicalrep_worker_attach. Seems a
> shame if it cannot make use of common code.
>
> ~~~
>
> 8. slotsync_worker_find
>
> + * Walks the slot-sync workers pool and searches for one that matches given
> + * dbid. Since one worker can manage multiple dbs, so it walks the db array in
> + * each worker to find the match.
>
> 8a.
> SUGGESTION
> Searches the slot-sync worker pool for the worker who manages the
> specified dbid. Because a worker can manage multiple dbs, also walk
> the db array of each worker to find the match.
>
> ~
>
> 8b.
> Should the comment also say something like "Returns NULL if no
> matching worker is found."
>
> ~~~
>
> 9.
> + /* Search for attached worker for a given dbid */
>
> SUGGESTION
> Search for an attached worker managing the given dbid.
>
> ~~~
>
> 10.
> +{
> + int i;
> + SlotSyncWorker *res = NULL;
> + Oid *dbids;
> +
> + Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED));
> +
> + /* Search for attached worker for a given dbid */
> + for (i = 0; i < max_slotsync_workers; i++)
> + {
> +     SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i];
> +     int cnt;
> +
> +     if (!w->hdr.in_use)
> +         continue;
> +
> +     dbids = (Oid *) dsa_get_address(w->dbids_dsa, w->dbids_dp);
> +     for (cnt = 0; cnt < w->dbcount; cnt++)
> +     {
> +         Oid wdbid = dbids[cnt];
> +
> +         if (wdbid == dbid)
> +         {
> +             res = w;
> +             break;
> +         }
> +     }
> +
> +     /* If worker is found, break the outer loop */
> +     if (res)
> +         break;
> + }
> +
> + return res;
> +}
>
> IMO this logic can be simplified a lot:
> - by not using the 'res' variable; directly return instead.
> - also moved the 'dbids' declaration.
> - and 'cnt' variable seems not meaningful; replace with 'dbidx' for
> the db array index IMO.
>
> For example (25 lines instead of 35 lines)
>
> {
>     int i;
>
>     Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED));
>
>     /* Search for an attached worker managing the given dbid. */
>     for (i = 0; i < max_slotsync_workers; i++)
>     {
>         SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i];
>         int dbidx;
>         Oid *dbids;
>
>         if (!w->hdr.in_use)
>             continue;
>
>         dbids = (Oid *) dsa_get_address(w->dbids_dsa, w->dbids_dp);
>         for (dbidx = 0; dbidx < w->dbcount; dbidx++)
>         {
>             if (dbids[dbidx] == dbid)
>                 return w;
>         }
>     }
>
>     return NULL;
> }
>
> ~~~
>
> 11. slotsync_dsa_setup
>
> +/*
> + * Setup DSA for slot-sync worker.
> + *
> + * DSA is needed for dbids array. Since max number of dbs a worker can manage
> + * is not known, so initially fixed size to hold DB_PER_WORKER_ALLOC_INIT
> + * dbs is allocated. If this size is exhausted, it can be extended using
> + * dsa free and allocate routines.
> + */
> +static dsa_handle
> +slotsync_dsa_setup(SlotSyncWorker *worker, int alloc_db_count)
>
> 11a.
> SUGGESTION
> DSA is used for the dbids array. Because the maximum number of dbs a
> worker can manage is not known, initially enough memory for
> DB_PER_WORKER_ALLOC_INIT dbs is allocated. If this size is exhausted,
> it can be extended using dsa free and allocate routines.
>
> ~
>
> 11b.
> It doesn't make sense for the comment to say DB_PER_WORKER_ALLOC_INIT
> is the initial allocation, but then the function has a parameter
> 'alloc_db_count' (which is always passed as DB_PER_WORKER_ALLOC_INIT).
> IMO remove the 2nd parameter from this function and hardwire the
> initial allocation same as what the function comment says.
>
> ~~~
>
> 12.
> + /* Be sure any memory allocated by DSA routines is persistent. */
> + oldcontext = MemoryContextSwitchTo(TopMemoryContext);
>
> /Be sure any memory/Ensure the memory/
>
> ~~~
>
> 13. slotsync_worker_launch_or_reuse
>
> +/*
> + * Slot-sync worker launch or reuse
> + *
> + * Start new slot-sync background worker from the pool of available workers
> + * going by max_slotsync_workers count. If the worker pool is exhausted,
> + * reuse the existing worker with minimum number of dbs. The idea is to
> + * always distribute the dbs equally among launched workers.
> + * If initially allocated dbids array is exhausted for the selected worker,
> + * reallocate the dbids array with increased size and copy the existing
> + * dbids to it and assign the new one as well.
> + *
> + * Returns true on success, false on failure.
> + */
>
> /going by/limited by/ (??)
>
> ~~~
>
> 14.
> + BackgroundWorker bgw;
> + BackgroundWorkerHandle *bgw_handle;
> + uint16 generation;
> + SlotSyncWorker *worker = NULL;
> + uint32 mindbcnt = 0;
> + uint32 alloc_count = 0;
> + uint32 copied_dbcnt = 0;
> + Oid *copied_dbids = NULL;
> + int worker_slot = -1;
> + dsa_handle handle;
> + Oid *dbids;
> + int i;
> + bool attach;
>
> IIUC many of these variables can be declared at a different scope in
> this function, so they will be closer to where they are used.
>
> ~~~
>
> 15.
> + /*
> +  * We need to do the modification of the shared memory under lock so that
> +  * we have consistent view.
> +  */
> + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE);
>
> The current comment seems too much.
>
> SUGGESTION
> The shared memory must only be modified under lock.
>
> ~~~
>
> 16.
> + /* Find unused worker slot. */
> + for (i = 0; i < max_slotsync_workers; i++)
> + {
> +     SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i];
> +
> +     if (!w->hdr.in_use)
> +     {
> +         worker = w;
> +         worker_slot = i;
> +         break;
> +     }
> + }
> +
> + /*
> +  * If all the workers are currently in use. Find the one with minimum
> +  * number of dbs and use that.
> +  */
> + if (!worker)
> + {
> +     for (i = 0; i < max_slotsync_workers; i++)
> +     {
> +         SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i];
> +
> +         if (i == 0)
> +         {
> +             mindbcnt = w->dbcount;
> +             worker = w;
> +             worker_slot = i;
> +         }
> +         else if (w->dbcount < mindbcnt)
> +         {
> +             mindbcnt = w->dbcount;
> +             worker = w;
> +             worker_slot = i;
> +         }
> +     }
> + }
>
> Why not combine these 2 loops, to avoid iterating over the same slots
> twice? Then, exit the loop immediately if an unused worker is found;
> otherwise, if you reach the end of the loop without finding anything
> unused, you will already know the one having the least dbs.
>
> ~~~
>
> 17.
> + /* Remember the old dbids before we reallocate dsa. */
> + copied_dbcnt = worker->dbcount;
> + copied_dbids = (Oid *) palloc0(worker->dbcount * sizeof(Oid));
> + memcpy(copied_dbids, dbids, worker->dbcount * sizeof(Oid));
>
> 17a.
> Who frees this copied_dbids memory when you are finished needing it?
> It seems allocated in the TopMemoryContext so IIUC this is a leak.
>
> ~
>
> 17b.
> These are the 'old' values. Not the 'copied' values. The copied_xxx
> variable names seem misleading.
>
> ~~~
>
> 18.
> + /* Prepare the new worker. */
> + worker->hdr.launch_time = GetCurrentTimestamp();
> + worker->hdr.in_use = true;
>
> If a new worker is required then the launch_time is set like above.
>
> + {
> +     slot_db_data->last_launch_time = now;
> +
> +     slotsync_worker_launch_or_reuse(slot_db_data->database);
> + }
>
> Meanwhile, at the caller of slotsync_worker_launch_or_reuse(), the
> dbid launch_time was already set as well. And those two timestamps are
> almost (but not quite) the same value. Isn't that a bit strange?
>
> ~~~
>
> 19.
> + /* Initial DSA setup for dbids array to hold DB_PER_WORKER_ALLOC_INIT dbs */
> + handle = slotsync_dsa_setup(worker, DB_PER_WORKER_ALLOC_INIT);
> + dbids = (Oid *) dsa_get_address(worker->dbids_dsa, worker->dbids_dp);
> +
> + dbids[worker->dbcount++] = dbid;
>
> Where was this worker->dbcount assigned to 0?
>
> Maybe it's better to do this explicitly under the "/* Prepare the new
> worker. */" comment.
>
> ~~~
>
> 20.
> + if (!attach)
> +     ereport(WARNING,
> +             (errmsg("Replication slot-sync worker failed to attach to "
> +                     "worker-pool slot %d", worker_slot)));
> +
> + /* Attach is done, now safe to log that the worker is managing dbid */
> + if (attach)
> +     ereport(LOG,
> +             (errmsg("Added database %d to replication slot-sync "
> +                     "worker %d; dbcount now: %d",
> +                     dbid, worker_slot, worker->dbcount)));
>
> 20a.
> IMO this should be coded as "if (attach) ...; else ..."
>
> ~
>
> 20b.
> In other code if it failed to register then slotsync_worker_cleanup
> code is called. How come similar code is not done when it fails to
> attach?
>
> ~~~
>
> 21. slotsync_worker_stop_internal
>
> +/*
> + * Internal function to stop the slot-sync worker and wait until it detaches
> + * from the slot-sync worker-pool slot.
> + */
> +static void
> +slotsync_worker_stop_internal(SlotSyncWorker *worker)
>
> IIUC this function does a bit more than what the function comment
> says. IIUC (again) I think the "detached" worker slot will still be
> flagged as 'inUse' but this function then does the extra step of
> calling slotsync_worker_cleanup() function to make the worker slot
> available for next process that needs it, am I correct?
>
> In this regard, this function seems a lot more like
> logicalrep_worker_detach() function comment, so there seems some kind
> of muddling of the different function names here... (??).
>
> ~~~
>
> 22. slotsync_remove_obsolete_dbs
>
> This function says:
> +/*
> + * Slot-sync workers remove obsolete DBs from db-list
> + *
> + * If the DBIds fetched from the primary are lesser than the ones being managed
> + * by slot-sync workers, remove extra dbs from worker's db-list. This may happen
> + * if some slots are removed on primary but 'synchronize_slot_names' has not
> + * been changed yet.
> + */
> +static void
> +slotsync_remove_obsolete_dbs(List *remote_dbs)
>
> But, there was another similar logic function too:
>
> +/*
> + * Drop obsolete slots
> + *
> + * Drop the slots which no longer need to be synced i.e. these either
> + * do not exist on primary or are no longer part of synchronize_slot_names.
> + *
> + * Also drop the slots which are valid on primary and got invalidated
> + * on standby due to conflict (say required rows removed on primary).
> + * The assumption is, these will get recreated in next sync-cycle and
> + * it is okay to drop and recreate such slots as long as these are not
> + * consumable on standby (which is the case currently).
> + */
> +static void
> +drop_obsolete_slots(Oid *dbids, List *remote_slot_list)
>
> Those function header comments suggest these have a lot of overlapping
> functionality.
>
> Can't those 2 functions be combined? Or maybe one delegate to the other?
>
> ~~~
>
> 23.
> + ListCell *lc;
> + Oid *dbids;
> + int widx;
> + int dbidx;
> + int i;
>
> Scope of some of these variable declarations can be different so they
> are declared closer to where they are used.
>
> ~~~
>
> 24.
> + /* If not found, then delete this db from worker's db-list */
> + if (!found)
> + {
> +     for (i = dbidx; i < worker->dbcount; i++)
> +     {
> +         /* Shift the DBs and get rid of wdbid */
> +         if (i < (worker->dbcount - 1))
> +             dbids[i] = dbids[i + 1];
> +     }
>
> IIUC, that shift/loop could just have been a memmove() call to remove
> one Oid element.
>
> ~~~
>
> 25.
> + /* If dbcount for any worker has become 0, shut it down */
> + for (widx = 0; widx < max_slotsync_workers; widx++)
> + {
> +     SlotSyncWorker *worker = &LogicalRepCtx->ss_workers[widx];
> +
> +     if (worker->hdr.in_use && !worker->dbcount)
> +         slotsync_worker_stop_internal(worker);
> + }
>
> Is it safe to stop this unguarded by SlotSyncWorkerLock locking? Is
> there a window where another dbid decides to reuse this worker at the
> same time this process is about to stop it?
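
For reference on comment 24, here is a minimal standalone sketch of what
the memmove() variant might look like. The helper name remove_dbid_at and
the Oid typedef are stand-ins made up for illustration; this is not code
from the patch, and it ignores the locking and DSA details discussed
elsewhere in the review.

```c
#include <assert.h>
#include <string.h>

/* Stand-in for PostgreSQL's Oid type (an unsigned 32-bit integer). */
typedef unsigned int Oid;

/*
 * Hypothetical helper: remove the element at index idx from an array
 * holding *count Oids by shifting the tail down one position.
 */
void
remove_dbid_at(Oid *dbids, int *count, int idx)
{
    int ntail;

    assert(idx >= 0 && idx < *count);

    /* Number of elements that follow the one being removed. */
    ntail = *count - idx - 1;

    /* memmove() is safe for the overlapping source and destination. */
    if (ntail > 0)
        memmove(&dbids[idx], &dbids[idx + 1], ntail * sizeof(Oid));

    (*count)--;
}
```

Decrementing the count inside the helper keeps the array and its length
in step, which the open-coded loop leaves to the caller.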
>
> ~~~
>
> 26. primary_connect
>
> +/*
> + * Connect to primary server for slotsync purpose and return the connection
> + * info. Disconnect previous connection if provided in wrconn_prev.
> + */
>
> /primary server/the primary server/
>
> ~~~
>
> 27.
> + if (!RecoveryInProgress())
> +     return NULL;
> +
> + if (max_slotsync_workers == 0)
> +     return NULL;
> +
> + if (strcmp(synchronize_slot_names, "") == 0)
> +     return NULL;
> +
> + /* The primary_slot_name is not set */
> + if (!WalRcv || WalRcv->slotname[0] == '\0')
> + {
> +     ereport(WARNING,
> +             errmsg("Skipping slots synchronization as primary_slot_name "
> +                    "is not set."));
> +     return NULL;
> + }
> +
> + /* The hot_standby_feedback must be ON for slot-sync to work */
> + if (!hot_standby_feedback)
> + {
> +     ereport(WARNING,
> +             errmsg("Skipping slots synchronization as hot_standby_feedback "
> +                    "is off."));
> +     return NULL;
> + }
>
> How come some of these checks are giving a WARNING that slot
> synchronization will be skipped, but others are just silently
> returning NULL?
>
> ~~~
>
> 28. SaveCurrentSlotSyncConfigs
>
> +static void
> +SaveCurrentSlotSyncConfigs()
> +{
> + PrimaryConnInfoPreReload = pstrdup(PrimaryConnInfo);
> + PrimarySlotNamePreReload = pstrdup(WalRcv->slotname);
> + SyncSlotNamesPreReload = pstrdup(synchronize_slot_names);
> +}
>
> Shouldn't this code also do pfree first? Otherwise these will slowly
> leak every time this function is called, right?
>
> ~~~
>
> 29. SlotSyncConfigsChanged
>
> +static bool
> +SlotSyncConfigsChanged()
> +{
> + if (strcmp(PrimaryConnInfoPreReload, PrimaryConnInfo) != 0)
> +     return true;
> +
> + if (strcmp(PrimarySlotNamePreReload, WalRcv->slotname) != 0)
> +     return true;
> +
> + if (strcmp(SyncSlotNamesPreReload, synchronize_slot_names) != 0)
> +     return true;
>
> I felt those can all be combined to have 1 return instead of 3.
>
> ~~~
>
> 30.
> + /*
> +  * If we have reached this stage, it means original value of
> +  * hot_standby_feedback was 'true', so consider it changed if 'false' now.
> +  */
> + if (!hot_standby_feedback)
> +     return true;
>
> "If we have reached this stage" seems a bit vague. Can this have some
> more explanation? And, maybe also an Assert(hot_standby_feedback); is
> helpful in the calling code (before the config is reloaded)?
>
> ~~~
>
> 31. ApplyLauncherStartSlotSync
>
> + * It connects to primary, get the list of DBIDs for slots configured in
> + * synchronize_slot_names. It then launces the slot-sync workers as per
> + * max_slotsync_workers and then assign the DBs equally to the workers
> + * launched.
> + */
>
> SUGGESTION (fix typos etc)
> Connect to the primary, to get the list of DBIDs for slots configured
> in synchronize_slot_names. Then launch slot-sync workers (limited by
> max_slotsync_workers) where the DBs are distributed equally among
> those workers.
>
> ~~~
>
> 32.
> +static void
> +ApplyLauncherStartSlotSync(long *wait_time, WalReceiverConn *wrconn)
>
> Why does this function even have 'Apply' in the name when it is
> nothing to do with an apply worker; looks like some cut/paste
> hangover. How about calling it something like 'LaunchSlotSyncWorkers'
>
> ~~~
>
> 33.
> + /* If connection is NULL due to lack of correct configurations, return */
> + if (!wrconn)
> +     return;
>
> IMO it would be better to Assert wrconn in this function. If it is
> NULL then it should be checked at the caller, otherwise it just raises
> more questions -- like "who logged the warning about bad
> configuration" etc (which I already questioned for the NULL returns of
> primary_connect).
>
> ~~~
>
> 34.
> + if (!OidIsValid(slot_db_data->database))
> +     continue;
>
> This represents some kind of integrity error doesn't it? Is it really
> OK just to silently skip such a thing?
>
> ~~~
>
> 35.
> + /*
> +  * If the worker is eligible to start now, launch it. Otherwise,
> +  * adjust wait_time so that we'll wake up as soon as it can be
> +  * started.
> +  *
> +  * Each apply worker can only be restarted once per
> +  * wal_retrieve_retry_interval, so that errors do not cause us to
> +  * repeatedly restart the worker as fast as possible.
> +  */
>
> 35a.
> I found the "we" part of "so that we'll wake up..." to be a bit
> misleading. There is no waiting in this function; that wait value is
> handed back to the caller to deal with. TBH, I did not really
> understand why it is even necessary to separate the waiting
> calculation *per-worker* like this. It seems to overcomplicate things
> and it might even give results like 1st worker is not started but last
> worker is started (if enough time elapsed in the loop). Why can't all
> this wait logic be done one time up front, and either (a) start all
> necessary workers, or (b) start none of them and wait a bit longer.
>
> ~
>
> 35b.
> "Each apply worker". Why is this talking about "apply" workers? Maybe
> cut/paste error?
>
> ~~~
>
> 36.
> + last_launch_tried = slot_db_data->last_launch_time;
> + now = GetCurrentTimestamp();
> + if (last_launch_tried == 0 ||
> +     (elapsed = TimestampDifferenceMilliseconds(last_launch_tried, now)) >=
> +     wal_retrieve_retry_interval)
> + {
> +     slot_db_data->last_launch_time = now;
> +
> +     slotsync_worker_launch_or_reuse(slot_db_data->database);
> + }
> + else
> + {
> +     *wait_time = Min(*wait_time,
> +                      wal_retrieve_retry_interval - elapsed);
> + }
>
> 36a.
> IMO this might be simpler if you add another variable like bool 'launch_now':
>
> last_launch_tried = ...
> now = ...
> elapsed = ...
> launch_now = elapsed >= wal_retrieve_retry_interval;
>
> ~
>
> 36b.
> Do you really care about checking "last_launch_tried == 0"? If it
> really is zero, then I thought the elapsed check should be enough.
>
> ~
>
> 36c.
> Does this 'last_launch_time' really need to be in some shared memory?
> Won't a static variable suffice?
>
>
> ~~~
>
> 37. ApplyLauncherStartSubs
>
> Wouldn't a better name for the function be something like
> 'LaunchSubscriptionApplyWorker'? (it is a better match for the
> suggested LaunchSlotSyncWorkers)
>
>
> ~~~
>
> 38. ApplyLauncherMain
>
> Now that this is not only for Apply worker but also for SlotSync
> workers, maybe this function should be renamed as just LauncherMain,
> or something equally generic?
>
> ~~~
>
> 39.
> + load_file("libpqwalreceiver", false);
> +
> + wrconn = primary_connect(NULL);
> +
>
> This connection did not exist in the HEAD code so I think it is added
> only for the slot-sync logic. IIUC it is still doing nothing for the
> non-slot-sync cases because primary_connect will silently return in
> that case:
>
> + if (!RecoveryInProgress())
> +     return NULL;
>
> IMO this is too sneaky, and it is misleading to see the normal apply
> worker launch apparently connecting to something when it is not really
> doing so AFAIK. I think these conditions should be done explicitly here
> at the caller to remove any such ambiguity.
>
> ~~~
>
> 40.
> + if (!RecoveryInProgress())
> +     ApplyLauncherStartSubs(&wait_time);
> + else
> +     ApplyLauncherStartSlotSync(&wait_time, wrconn);
>
> 40a.
> IMO this is deserving of a comment to explain why RecoveryInProgress
> means to perform the slot-synchronization.
>
> ~
>
> 40b.
> Also, better to have positive check RecoveryInProgress() instead of
> !RecoveryInProgress()
>
> ~~~
>
> 41.
>   if (ConfigReloadPending)
>   {
> +     bool ssConfigChanged = false;
> +
> +     SaveCurrentSlotSyncConfigs();
> +
>       ConfigReloadPending = false;
>       ProcessConfigFile(PGC_SIGHUP);
> +
> +     /*
> +      * Stop the slot-sync workers if any of the related GUCs changed.
> +      * These will be relaunched as per the new values during next
> +      * sync-cycle.
> +      */
> +     ssConfigChanged = SlotSyncConfigsChanged();
> +     if (ssConfigChanged)
> +         slotsync_workers_stop();
> +
> +     /* Reconnect in case primary_conninfo has changed */
> +     wrconn = primary_connect(wrconn);
>   }
> }
>
> ~
>
> 41a.
> The 'ssConfigChanged' assignment at declaration is not needed.
> Indeed, the whole variable is not really necessary because it is used
> only once.
>
> ~
>
> 41b.
> /as per the new values/using the new values/
>
> ~
>
> 41c.
> + /* Reconnect in case primary_conninfo has changed */
> + wrconn = primary_connect(wrconn);
>
> To avoid unnecessary reconnections, shouldn't this be done only if
> (ssConfigChanged).
>
> In fact, assuming the comment is correct, reconnect only if
> (strcmp(PrimaryConnInfoPreReload, PrimaryConnInfo) != 0)
>
>
> ======
> src/backend/replication/logical/slotsync.c
>
> 42. wait_for_primary_slot_catchup
>
> + ereport(LOG,
> +         errmsg("waiting for remote slot \"%s\" LSN (%u/%X) and catalog xmin"
> +                " (%u) to pass local slot LSN (%u/%X) and and catalog xmin (%u)",
> +                remote_slot->name,
> +                LSN_FORMAT_ARGS(remote_slot->restart_lsn),
> +                remote_slot->catalog_xmin,
> +                LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
> +                MyReplicationSlot->data.catalog_xmin));
>
> AFAIK it is usual for the LSN format string to be %X/%X (not %u/%X like here).
>
> ~~~
>
> 43.
> + appendStringInfo(&cmd,
> +                  "SELECT restart_lsn, confirmed_flush_lsn, catalog_xmin"
> +                  " FROM pg_catalog.pg_replication_slots"
> +                  " WHERE slot_name = %s",
> +                  quote_literal_cstr(remote_slot->name));
>
> double space before FROM?
>
> ~~~
>
> 44. synchronize_one_slot
>
> + /*
> +  * We might not have the WALs retained locally corresponding to
> +  * remote's restart_lsn if our local restart_lsn and/or local
> +  * catalog_xmin is ahead of remote's one. And thus we can not create
> +  * the local slot in sync with primary as that would mean moving local
> +  * slot backward. Thus wait for primary's restart_lsn and catalog_xmin
> +  * to catch up with the local ones and then do the sync.
> +  */
> + if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn ||
> +     TransactionIdPrecedes(remote_slot->catalog_xmin,
> +                           MyReplicationSlot->data.catalog_xmin))
> + {
> +     if (!wait_for_primary_slot_catchup(wrconn, remote_slot))
> +     {
> +         /*
> +          * The remote slot didn't catch up to locally reserved
> +          * position
> +          */
> +         ReplicationSlotRelease();
> +         CommitTransactionCommand();
> +         return;
> +     }
>
>
> SUGGESTION (comment is slightly simplified)
> If the local restart_lsn and/or local catalog_xmin is ahead of those
> on the remote then we cannot create the local slot in sync with
> primary because that would mean moving local slot backwards. In this
> case we will wait for primary's restart_lsn and catalog_xmin to catch
> up with the local one before attempting the sync.
>
> ======
> Kind Regards,
> Peter Smith.
> Fujitsu Australia
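
As a small illustration of comment 42: an LSN is a 64-bit value that is
conventionally printed as its high and low 32-bit halves, both in
hexadecimal, which is why %X/%X is expected rather than %u/%X. The sketch
below is standalone C; the helper name format_lsn is hypothetical, and in
PostgreSQL itself the two halves would come from LSN_FORMAT_ARGS().

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper: write lsn into buf as "HI/LO", with both 32-bit
 * halves in uppercase hexadecimal. Returns snprintf()'s result, i.e.
 * the number of characters the full string needs.
 */
int
format_lsn(uint64_t lsn, char *buf, size_t buflen)
{
    assert(buf != NULL);

    return snprintf(buf, buflen, "%X/%X",
                    (unsigned int) (lsn >> 32),          /* high half */
                    (unsigned int) (lsn & 0xFFFFFFFFu)); /* low half */
}
```

With %u for the first half, a high word such as 0x1A would be printed as
26, which no longer matches how LSNs are shown elsewhere.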