Re: Synchronizing slots from primary to standby - Mailing list pgsql-hackers

From Andres Freund
Subject Re: Synchronizing slots from primary to standby
Msg-id 20220207204557.74mgbhowydjco4mh@alap3.anarazel.de
In response to Re: Synchronizing slots from primary to standby  (Peter Eisentraut <peter.eisentraut@enterprisedb.com>)
List pgsql-hackers
Hi,

On 2022-01-03 14:46:52 +0100, Peter Eisentraut wrote:
> +static void
> +ApplyLauncherStartSlotSync(TimestampTz *last_start_time, long *wait_time)
> +{
> [...]
> +
> +    foreach(lc, slots)
> +    {
> +        WalRecvReplicationSlotData *slot_data = lfirst(lc);
> +        LogicalRepWorker *w;
> +
> +        if (!OidIsValid(slot_data->database))
> +            continue;
> +
> +        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
> +        w = logicalrep_worker_find(slot_data->database, InvalidOid,
> +                                   InvalidOid, false);
> +        LWLockRelease(LogicalRepWorkerLock);
> +
> +        if (w == NULL)
> +        {
> +            *last_start_time = now;
> +            *wait_time = wal_retrieve_retry_interval;
> +
> +            logicalrep_worker_launch(slot_data->database, InvalidOid, NULL,
> +                                     BOOTSTRAP_SUPERUSERID, InvalidOid);

Do we really need a dedicated worker for each single slot? That seems
excessively expensive.
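One way to avoid a dedicated worker per slot would be to group the slots by database and launch a single sync worker per database. A plain-C sketch of that grouping (the struct and field names here are stand-ins, not the patch's types):

```c
#include <stdlib.h>
#include <string.h>
#include <assert.h>

/* Hypothetical stand-in for WalRecvReplicationSlotData: just the fields
 * that matter for grouping (slot name and owning database OID). */
typedef struct SlotData
{
    const char  *name;
    unsigned int database;      /* 0 plays the role of InvalidOid */
} SlotData;

static int
cmp_by_database(const void *a, const void *b)
{
    const SlotData *sa = a;
    const SlotData *sb = b;

    if (sa->database != sb->database)
        return sa->database < sb->database ? -1 : 1;
    return strcmp(sa->name, sb->name);
}

/*
 * Sort the slots by database, then walk the sorted array: each run of
 * equal database OIDs needs only one worker launch, instead of one
 * launch per slot.  Slots with no database are skipped, as the patch
 * already does.
 */
static int
count_workers_needed(SlotData *slots, size_t n)
{
    unsigned int prev_db = 0;
    int          workers = 0;

    qsort(slots, n, sizeof(SlotData), cmp_by_database);
    for (size_t i = 0; i < n; i++)
    {
        if (slots[i].database == 0)
            continue;           /* no database: nothing to launch */
        if (slots[i].database != prev_db)
        {
            workers++;          /* first slot of this database */
            prev_db = slots[i].database;
        }
    }
    return workers;
}
```

The per-database worker would then iterate over all of its database's slots in one loop, amortizing the process-startup cost.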


> +++ b/src/backend/replication/logical/reorderbuffer.c
> [...]
> +static void
> +wait_for_standby_confirmation(XLogRecPtr commit_lsn)
> +{
> +    char       *rawname;
> +    List       *namelist;
> +    ListCell   *lc;
> +    XLogRecPtr    flush_pos = InvalidXLogRecPtr;
> +
> +    if (strcmp(standby_slot_names, "") == 0)
> +        return;
> +
> +    rawname = pstrdup(standby_slot_names);
> +    SplitIdentifierString(rawname, ',', &namelist);
> +
> +    while (true)
> +    {
> +        int            wait_slots_remaining;
> +        XLogRecPtr    oldest_flush_pos = InvalidXLogRecPtr;
> +        int            rc;
> +
> +        wait_slots_remaining = list_length(namelist);
> +
> +        LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
> +        for (int i = 0; i < max_replication_slots; i++)
> +        {
> +            ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
> +            bool        inlist;
> +
> +            if (!s->in_use)
> +                continue;
> +
> +            inlist = false;
> +            foreach (lc, namelist)
> +            {
> +                char *name = lfirst(lc);
> +                if (strcmp(name, NameStr(s->data.name)) == 0)
> +                {
> +                    inlist = true;
> +                    break;
> +                }
> +            }
> +            if (!inlist)
> +                continue;
> +
> +            SpinLockAcquire(&s->mutex);

It doesn't seem like a good idea to perform O(max_replication_slots *
#standby_slot_names) work on each decoded commit. Nor to
SplitIdentifierString(pstrdup(standby_slot_names)) every time.


> +            if (s->data.database == InvalidOid)
> +                /* Physical slots advance restart_lsn on flush and ignore confirmed_flush_lsn */
> +                flush_pos = s->data.restart_lsn;
> +            else
> +                /* For logical slots we must wait for commit and flush */
> +                flush_pos = s->data.confirmed_flush;
> +
> +            SpinLockRelease(&s->mutex);
> +
> +            /* We want to find out the min(flush pos) over all named slots */
> +            if (oldest_flush_pos == InvalidXLogRecPtr
> +                || oldest_flush_pos > flush_pos)
> +                oldest_flush_pos = flush_pos;
> +
> +            if (flush_pos >= commit_lsn && wait_slots_remaining > 0)
> +                wait_slots_remaining --;
> +        }
> +        LWLockRelease(ReplicationSlotControlLock);
> +
> +        if (wait_slots_remaining == 0)
> +            return;
> +
> +        rc = WaitLatch(MyLatch,
> +                       WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
> +                       1000L, PG_WAIT_EXTENSION);

I don't think it's a good idea to block here like this - no walsender-specific
handling is going to happen. E.g. not sending replies to receivers will cause
them to time out. And for the SQL functions this will cause blocking even
though the interface expects to return upon reaching the end of the WAL -
which this pretty much is.


I think this needs to be restructured so that you only do the checking of the
"up to this point" position when needed, rather than on every commit. We
already *have* a check for not replaying further than the flushed WAL
position; see the GetFlushRecPtr() calls in WalSndWaitForWal() and
pg_logical_slot_get_changes_guts(). I think you'd basically need to integrate
with that, rather than introduce blocking in reorderbuffer.c.
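The shape of that integration would be: treat the minimum flush position of the named standby slots as another bound on how far decoding may proceed, clamping the decode target the same way those callers already clamp to GetFlushRecPtr(). A plain-C sketch of the clamping logic (LSNs modeled as uint64; function names are illustrative, not PostgreSQL's):

```c
#include <stdint.h>
#include <assert.h>

typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Minimum flush position over the named standby slots, as it would be
 * read from shared memory. */
static XLogRecPtr
min_standby_flush_lsn(const XLogRecPtr *flush, int nslots)
{
    XLogRecPtr  min = InvalidXLogRecPtr;

    for (int i = 0; i < nslots; i++)
        if (min == InvalidXLogRecPtr || flush[i] < min)
            min = flush[i];
    return min;
}

/*
 * Instead of blocking in reorderbuffer.c at each commit, the WAL-reading
 * loop clamps its target: never decode past the point every named
 * standby has flushed, just as it already never decodes past the local
 * flush pointer.  The caller then waits (with its normal keepalive and
 * end-of-WAL handling) until the target advances.
 */
static XLogRecPtr
clamp_decode_target(XLogRecPtr flush_rec_ptr,
                    const XLogRecPtr *standby_flush, int nslots)
{
    XLogRecPtr  standby_min = min_standby_flush_lsn(standby_flush, nslots);

    if (standby_min != InvalidXLogRecPtr && standby_min < flush_rec_ptr)
        return standby_min;
    return flush_rec_ptr;
}
```

Because the wait then happens in the walsender's (or SQL function's) existing loop, replies to receivers keep flowing and the SQL interface still returns at its end-of-WAL point.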



> +        if (rc & WL_POSTMASTER_DEATH)
> +            proc_exit(1);

Should use WL_EXIT_ON_PM_DEATH these days; with that flag WaitLatch() exits on
postmaster death itself, so the explicit WL_POSTMASTER_DEATH check and
proc_exit(1) can be dropped.

Greetings,

Andres Freund


