From: shveta malik
Subject: Re: Improve pg_sync_replication_slots() to wait for primary to advance
Msg-id: CAJpy0uAB0NxV8MuoKOJr09HJ-iMyRO41VXyf3bPeY74SPVBdYw@mail.gmail.com
In response to: Re: Improve pg_sync_replication_slots() to wait for primary to advance (Ajin Cherian <itsajin@gmail.com>)
List: pgsql-hackers

On Thu, Aug 14, 2025 at 7:28 AM Ajin Cherian <itsajin@gmail.com> wrote:
>
> Patch v6 attached.
>

Thanks Ajin. Please find my comments:

1)
SyncReplicationSlots:

+    remote_slots = fetch_or_refresh_remote_slots(wrconn, NIL);
+
+    /* Retry until all slots are sync ready atleast */
+    for (;;)
+    {
+        bool        some_slot_updated = false;
+
+        /*
+         * Refresh the remote slot data. We keep using the original slot
+         * list, even if some slots are already sync ready, so that all
+         * slots are updated with the latest status from the primary.
+         */
+        remote_slots = fetch_or_refresh_remote_slots(wrconn, remote_slots);

When the API begins, it seems we are fetching the remote list twice before we
even sync it once. We can get rid of the 'fetch_or_refresh_remote_slots' call
outside the loop and retain the one inside. On the first call, remote_slots
will be NIL and thus it will fetch all slots; on subsequent calls, it will be
the already populated list. (See the rough sketch after comment 8.)

2)
SyncReplicationSlots:

+    /*
+     * The syscache access in fetch_or_refresh_remote_slots() needs a
+     * transaction env.
+     */
+    if (!IsTransactionState()) {
+        StartTransactionCommand();
+        started_tx = true;
+    }
+    if (started_tx)
+        CommitTransactionCommand();

Shall we move these two inside fetch_or_refresh_remote_slots() (for both the
worker and the API flow), similar to how validate_remote_info() also has it
inside?

3)
SyncReplicationSlots:

+        /* Done if all slots are atleast sync ready */
+        if (!SlotSyncCtx->slot_not_persisted)
+            break;
+        else
+        {
+            /* wait for 2 seconds before retrying */
+            wait_for_slot_activity(some_slot_updated, true);

No need to have an 'else' block here. The code can be written without the
'else', because the 'if', when true, breaks out of the loop.

4)
'fetch_or_refresh_remote_slots' can be renamed to simply 'fetch_remote_slots',
and a comment can then describe the extra argument, because ultimately we are
re-fetching some/all of the slots in both cases.

5)
In the case of the API, wait_for_slot_activity() does not change its wait time
based on 'some_slot_updated'. I think we can pull 'WaitLatch, ResetLatch' into
the API function itself and not change the worker's wait_for_slot_activity().

6)
fetch_or_refresh_remote_slots:

+    {
+        if (is_refresh)
+        {
+            ereport(WARNING,
+                    errmsg("could not fetch updated failover logical slots info"
+                           " from the primary server: %s",
+                           res->err));
+            pfree(query.data);
+            return remote_slot_list;    /* Return original list on refresh failure */
+        }
+        else
+        {
+            ereport(ERROR,
+                    errmsg("could not fetch failover logical slots info from the primary server: %s",
+                           res->err));
+        }
+    }

I think there is no need for different behaviour here for the worker and the
API. Since the worker errors out here, we can make the API error out as well.

7)
+fetch_or_refresh_remote_slots(WalReceiverConn *wrconn, List *remote_slot_list)

We can name the argument 'target_slot_list' and replace the name
'updated_slot_list' with 'remote_slot_list'.

8)
+    /* If refreshing, free the original list structures */
+    if (is_refresh)
+    {
+        foreach(lc, remote_slot_list)
+        {
+            RemoteSlot *old_slot = (RemoteSlot *) lfirst(lc);
+            pfree(old_slot);
+        }
+        list_free(remote_slot_list);
+    }

We can get rid of 'is_refresh' and simply check whether 'target_slot_list !=
NIL' and free it. We can use list_free_deep instead of freeing each element.
Having said that, it looks slightly odd to free the list in this function; I
will think more here. Meanwhile, we can do this.
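
To make 1) and 3) concrete, here is a rough standalone sketch of the loop
shape I have in mind. This is not patch code: fetch_remote_slots(),
sync_all_slots(), persistence_pending() and wait_before_retry() below are
trivial stubs standing in for the patch's fetch_or_refresh_remote_slots(),
synchronize_slots(), the SlotSyncCtx->slot_not_persisted check and
wait_for_slot_activity(), just so the shape compiles on its own:

#include <stdbool.h>
#include <stddef.h>

struct slot_list;               /* stand-in for the pg_list 'List' */

/* Trivial stubs so this sketch compiles standalone; not the patch's code */
static struct slot_list *
fetch_remote_slots(struct slot_list *target_slot_list)
{
    /* NULL (NIL) on the first call would mean "fetch all failover slots" */
    return target_slot_list;
}

static bool
sync_all_slots(struct slot_list *remote_slots)
{
    (void) remote_slots;
    return true;                /* pretend some slot was updated */
}

static bool
persistence_pending(void)
{
    return false;               /* pretend every slot became sync ready */
}

static void
wait_before_retry(bool some_slot_updated)
{
    (void) some_slot_updated;
}

/* The loop shape suggested in comments 1) and 3) */
static void
sync_slots_api_loop(void)
{
    struct slot_list *remote_slots = NULL;

    for (;;)
    {
        bool        some_slot_updated;

        /*
         * Single fetch point: the first pass fetches all failover slots,
         * later passes refresh the same list.
         */
        remote_slots = fetch_remote_slots(remote_slots);

        some_slot_updated = sync_all_slots(remote_slots);

        /* Done once every slot is at least sync ready */
        if (!persistence_pending())
            break;

        /* No 'else' needed: the 'if' above leaves the loop */
        wait_before_retry(some_slot_updated);
    }
}

int
main(void)
{
    sync_slots_api_loop();
    return 0;
}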
9)
-update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+update_and_persist_local_synced_slot(WalReceiverConn * wrconn,
+                                     RemoteSlot * remote_slot, Oid remote_dbid)

We can get rid of wrconn as we are not using it. The same applies to the
wrconn argument of synchronize_one_slot().

10)
+    /* used by pg_sync_replication_slots() API only */
+    bool        slot_not_persisted;

We can move the comment outside the structure. We can first define the field
and then say the above line.

11)
+    SlotSyncCtx->slot_not_persisted = false;

This may overwrite the 'slot_not_persisted' set for the previous slot and
ultimately make it 'false' at the end of the cycle, even though we had a few
not-persisted slots at the beginning of the cycle. Should it be:

    SlotSyncCtx->slot_not_persisted |= false;

(A small standalone illustration is in the PS below.)

12)
Shall we rename this to 'slot_persistence_pending' (based on many other
modules using similar names: detach_pending, send_pending, callback_pending)?

13)
-                 errmsg("could not synchronize replication slot \"%s\"",
-                        remote_slot->name),
-                 errdetail("Synchronization could lead to data loss, because the remote slot needs WAL at LSN %X/%08X and catalog xmin %u, but the standby has LSN %X/%08X and catalog xmin %u.",
-                           LSN_FORMAT_ARGS(remote_slot->restart_lsn),
-                           remote_slot->catalog_xmin,
-                           LSN_FORMAT_ARGS(slot->data.restart_lsn),
-                           slot->data.catalog_xmin));
+                 errmsg("Replication slot \"%s\" is not sync ready; will keep retrying",
+                        remote_slot->name),
+                 errdetail("Attempting Synchronization could lead to data loss, because the remote slot needs WAL at LSN %X/%08X and catalog xmin %u, but the standby has LSN %X/%08X and catalog xmin %u.",
+                           LSN_FORMAT_ARGS(remote_slot->restart_lsn),
+                           remote_slot->catalog_xmin,
+                           LSN_FORMAT_ARGS(slot->data.restart_lsn),
+                           slot->data.catalog_xmin));

We can retain the original message, as it was put in after a lot of
discussion; we can attempt to change it if others comment. The idea is that
since the worker dumps this message in each subsequent cycle (if such a
situation arises), the API can now do the same on the same basis, because it
too performs multiple cycles now. Earlier I had suggested changing it for the
API based on the 'continuing to wait..' messages, which are no longer there
now.

thanks
Shveta
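
PS: To illustrate the concern in 11), here is a tiny standalone example
(made-up data, not patch code) showing how a plain assignment loses the
'not persisted' state recorded for an earlier slot in the cycle, while
OR-ing it in preserves it:

#include <stdbool.h>
#include <stdio.h>

int
main(void)
{
    /* Pretend only the first slot of the cycle could not be persisted */
    bool        per_slot_not_persisted[] = {true, false, false};
    bool        plain_assign = false;
    bool        or_accumulate = false;

    for (int i = 0; i < 3; i++)
    {
        /* Plain assignment: the later 'false' overwrites the earlier 'true' */
        plain_assign = per_slot_not_persisted[i];

        /* OR-ing keeps the flag set once any slot in the cycle sets it */
        or_accumulate |= per_slot_not_persisted[i];
    }

    /* Prints: end of cycle: plain_assign=0 or_accumulate=1 */
    printf("end of cycle: plain_assign=%d or_accumulate=%d\n",
           plain_assign, or_accumulate);
    return 0;
}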