Re: Minimal logical decoding on standbys - Mailing list pgsql-hackers

From Drouvot, Bertrand
Subject Re: Minimal logical decoding on standbys
Msg-id b51f01b6-7ff4-fc5d-8068-6d94f9558235@gmail.com
In response to Re: Minimal logical decoding on standbys  (Andres Freund <andres@anarazel.de>)
Responses Re: Minimal logical decoding on standbys
List pgsql-hackers
Hi,

On 4/5/23 8:28 PM, Andres Freund wrote:
> Hi,
> 
> On 2023-04-05 17:56:14 +0200, Drouvot, Bertrand wrote:
> 
>> @@ -7963,6 +7963,23 @@ xlog_redo(XLogReaderState *record)
>>           /* Update our copy of the parameters in pg_control */
>>           memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
>>
>> +        /*
>> +         * Invalidate logical slots if we are in hot standby and the primary
>> +         * does not have a WAL level sufficient for logical decoding. No need
>> +         * to search for potentially conflicting logically slots if standby is
>> +         * running with wal_level lower than logical, because in that case, we
>> +         * would have either disallowed creation of logical slots or
>> +         * invalidated existing ones.
>> +         */
>> +        if (InRecovery && InHotStandby &&
>> +            xlrec.wal_level < WAL_LEVEL_LOGICAL &&
>> +            wal_level >= WAL_LEVEL_LOGICAL)
>> +        {
>> +            TransactionId ConflictHorizon = InvalidTransactionId;
>> +
>> +            InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, InvalidOid, &ConflictHorizon);
>> +        }
> 
> I mentioned this before, 

Sorry, I probably missed it.

> but I still don't understand why
> InvalidateObsoleteReplicationSlots() accepts ConflictHorizon as a
> pointer. It's not even modified, as far as I can see?
> 

The initial goal was to be able to check whether the xid pointer was NULL and also whether *xid
was a valid xid or not, so that a single parameter could drive three different cases.

That's how we decided whether or not we are in the "wal_level < logical on primary" conflict case in
ReportTerminationInvalidation().

I agree that passing a pointer is not the best approach (as there is a "risk" of modifying the value it points to),
so the attached V62 adds an extra bool to InvalidateObsoleteReplicationSlots() instead.

Also replacing InvalidXLogRecPtr with 0, as it sounds odd to use the "InvalidXLogRecPtr"
naming for an XLogSegNo.

> 
>>   /*
>>    * Report shared-memory space needed by ReplicationSlotsShmemInit.
>>    */
>> @@ -855,8 +862,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
>>           SpinLockAcquire(&s->mutex);
>>           effective_xmin = s->effective_xmin;
>>           effective_catalog_xmin = s->effective_catalog_xmin;
>> -        invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
>> -                       XLogRecPtrIsInvalid(s->data.restart_lsn));
>> +        invalidated = ObsoleteSlotIsInvalid(s, true) || LogicalReplicationSlotIsInvalid(s);
>>           SpinLockRelease(&s->mutex);
> 
> I don't understand why we need to have two different functions for this.
> 

LogicalReplicationSlotIsInvalid() was created to provide error messages different from
".....because it exceeded the maximum reserved size" in StartLogicalReplication()
and "This slot has never previously reserved WAL" in pg_logical_slot_get_changes_guts().

So basically, it distinguishes the logical conflict cases from the max_slot_wal_keep_size related messages.

> 
>>           /* invalidated slots need not apply */
>> @@ -1225,28 +1231,92 @@ ReplicationSlotReserveWal(void)
>>       }
>>   }
>>
>> +
>> +/*
>> + * Report terminating or conflicting message.
>> + *
>> + * For both, logical conflict on standby and obsolete slot are handled.
>> + */
>> +static void
>> +ReportTerminationInvalidation(bool terminating, bool check_on_xid, int pid,
>> +                              NameData slotname, TransactionId *xid,
>> +                              XLogRecPtr restart_lsn, XLogRecPtr oldestLSN)
>> +{
>> +    StringInfoData err_msg;
>> +    StringInfoData err_detail;
>> +    bool        hint = false;
>> +
>> +    initStringInfo(&err_detail);
>> +
>> +    if (check_on_xid)
>> +    {
>> +        if (!terminating)
>> +        {
>> +            initStringInfo(&err_msg);
>> +            appendStringInfo(&err_msg, _("invalidating replication slot \"%s\" because it conflicts with recovery"),
>> +                             NameStr(slotname));
> 
> I still don't think the main error message should differ between invalidating
> a slot due recovery and max_slot_wal_keep_size.

Okay. I gave it a second thought and I agree that "obsolete" also makes
sense for the xid conflict case. So, done that way in V62.

> 
>> +
>>   /*
>> - * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
>> - * and mark it invalid, if necessary and possible.
>> + * Helper for InvalidateObsoleteReplicationSlots
>> + *
>> + * Acquires the given slot and mark it invalid, if necessary and possible.
>>    *
>>    * Returns whether ReplicationSlotControlLock was released in the interim (and
>>    * in that case we're not holding the lock at return, otherwise we are).
>>    *
>> - * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
>> + * Sets *invalidated true if an obsolete slot was invalidated. (Untouched otherwise.)
> 
> What's the point of making this specific to "obsolete slots"?

There is none; it probably comes from a previous version/experiment.
Removed in V62, thanks!

> 
> 
>>    * This is inherently racy, because we release the LWLock
>>    * for syscalls, so caller must restart if we return true.
>>    */
>>   static bool
>>   InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>> -                               bool *invalidated)
>> +                               bool *invalidated, TransactionId *xid)
>>   {
>>       int            last_signaled_pid = 0;
>>       bool        released_lock = false;
>> +    bool        check_on_xid;
>> +
>> +    check_on_xid = xid ? true : false;
>>
>>       for (;;)
>>       {
>>           XLogRecPtr    restart_lsn;
>> +
>>           NameData    slotname;
>>           int            active_pid = 0;
>>
>> @@ -1263,19 +1333,20 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>>            * Check if the slot needs to be invalidated. If it needs to be
>>            * invalidated, and is not currently acquired, acquire it and mark it
>>            * as having been invalidated.  We do this with the spinlock held to
>> -         * avoid race conditions -- for example the restart_lsn could move
>> -         * forward, or the slot could be dropped.
>> +         * avoid race conditions -- for example the restart_lsn (or the
>> +         * xmin(s) could) move forward or the slot could be dropped.
>>            */
>>           SpinLockAcquire(&s->mutex);
>>
>>           restart_lsn = s->data.restart_lsn;
>>
>>           /*
>> -         * If the slot is already invalid or is fresh enough, we don't need to
>> -         * do anything.
>> +         * If the slot is already invalid or is a non conflicting slot, we
>> +         * don't need to do anything.
>>            */
>> -        if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
>> +        if (DoNotInvalidateSlot(s, xid, &oldestLSN))
> 
> DoNotInvalidateSlot() seems odd to me, and makes the code harder to
> understand. I'd make it something like:
> 
> if (!SlotIsInvalid(s) && (
>        LogicalSlotConflictsWith(s, xid) ||
>        SlotConflictsWithLSN(s, lsn)))
> 

I think that's a matter of taste (having a single function was suggested
by Amit up-thread).

I prefer having a single function, as it seems easier to me to see
whether or not we are checking on the xid.

> 
>>   /*
>> - * Mark any slot that points to an LSN older than the given segment
>> - * as invalid; it requires WAL that's about to be removed.
>> + * Invalidate Obsolete slots or resolve recovery conflicts with logical slots.
> 
> I don't like that this spreads "obsolete slots" around further - it's very
> unspecific. A logical slot that needs to be removed due to an xid conflict is
> just as obsolete as one that needs to be removed due to max_slot_wal_keep_size.
> 
> I'd rephrase this to be about required resources getting removed or such, one
> case of that is WAL another case is xids.
> 

Agree. Re-worded in V62.


>>   restart:
>>       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
>> @@ -1414,21 +1505,35 @@ restart:
>>           if (!s->in_use)
>>               continue;
>>
>> -        if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
>> +        if (xid)
>>           {
>> -            /* if the lock was released, start from scratch */
>> -            goto restart;
>> +            /* we are only dealing with *logical* slot conflicts */
>> +            if (!SlotIsLogical(s))
>> +                continue;
>> +
>> +            /*
>> +             * not the database of interest and we don't want all the
>> +             * database, skip
>> +             */
>> +            if (s->data.database != dboid && TransactionIdIsValid(*xid))
>> +                continue;
> 
> ISTM that this should be in InvalidatePossiblyObsoleteSlot().
> 

Agree, done in V62.

> 
>>       /*
>> -     * If any slots have been invalidated, recalculate the resource limits.
>> +     * If any slots have been invalidated, recalculate the required xmin and
>> +     * the required lsn (if appropriate).
>>        */
>>       if (invalidated)
>>       {
>>           ReplicationSlotsComputeRequiredXmin(false);
>> -        ReplicationSlotsComputeRequiredLSN();
>> +        if (!xid)
>> +            ReplicationSlotsComputeRequiredLSN();
>>       }
> 
> Why make this conditional? If we invalidated a logical slot, we also don't
> require as much WAL anymore, no?
> 

Agree, done in V62.

> 
>> @@ -491,6 +493,9 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
>>                                              PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
>>                                              WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
>>                                              true);
>> +
>> +    if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel)
>> +        InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, locator.dbOid, &snapshotConflictHorizon);
>>   }
> 
> Hm. Is there a reason for doing this before resolving conflicts with existing
> sessions?
> 

Do you mean you'd prefer to call InvalidateObsoleteReplicationSlots() before ResolveRecoveryConflictWithVirtualXIDs()?

> 
> Another issue: ResolveRecoveryConflictWithVirtualXIDs() takes
> WaitExceedsMaxStandbyDelay() into account, but
> InvalidateObsoleteReplicationSlots() does not.

humm, good point.

> I think that's ok, because the
> setup should prevent this case from being reached in normal paths, but at
> least there should be a comment documenting this.
> 

I started to add the comment in InvalidateObsoleteReplicationSlots(), but I'm not
sure what you mean by "the setup should prevent this case from being reached in normal paths"
(so I left "XXXX" in the comment for now).

Did you mean that hot_standby_feedback (hsf) and a physical slot between the primary and the standby should be in place?
Could you please elaborate?

> 
> 
>> +static inline bool
>> +LogicalReplicationSlotXidsConflict(ReplicationSlot *s, TransactionId xid)
>> +{
>> +    TransactionId slot_effective_xmin;
>> +    TransactionId slot_catalog_xmin;
>> +
>> +    slot_effective_xmin = s->effective_xmin;
>> +    slot_catalog_xmin = s->data.catalog_xmin;
>> +
>> +    return (((TransactionIdIsValid(slot_effective_xmin) && TransactionIdPrecedesOrEquals(slot_effective_xmin, xid)) ||
>> +             (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid))));
>> +}
> 
> return -ETOOMANYPARENS
> 

I gave it a try to make it better in V62.

> 
>> +static inline bool
>> +SlotIsFreshEnough(ReplicationSlot *s, XLogRecPtr oldestLSN)
>> +{
>> +    return (s->data.restart_lsn >= oldestLSN);
>> +}
>> +
>> +static inline bool
>> +LogicalSlotIsNotConflicting(ReplicationSlot *s, TransactionId *xid)
>> +{
>> +    return (TransactionIdIsValid(*xid) && !LogicalReplicationSlotXidsConflict(s, *xid));
>> +}
>> +
>> +static inline bool
>> +DoNotInvalidateSlot(ReplicationSlot *s, TransactionId *xid, XLogRecPtr *oldestLSN)
>> +{
>> +    if (xid)
>> +        return (LogicalReplicationSlotIsInvalid(s) || LogicalSlotIsNotConflicting(s, xid));
>> +    else
>> +        return (ObsoleteSlotIsInvalid(s, false) || SlotIsFreshEnough(s, *oldestLSN));
>> +
>> +}
> 
> See above for some more comments. But please don't accept stuff via pointer if
> you don't have a reason for it. There's no reason for it for xid and oldestLSN
> afaict.

Agreed that there is no reason for oldestLSN. Changed in V62.
As for the xid, I explained above why I used a pointer, but I found a way to remove the need for it
in V62 (as explained above).

> 
> 
>> diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
>> index dbe9394762..186e4ef600 100644
>> --- a/src/backend/access/transam/xlogrecovery.c
>> +++ b/src/backend/access/transam/xlogrecovery.c
>> @@ -1935,6 +1935,30 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
>>       XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
>>       SpinLockRelease(&XLogRecoveryCtl->info_lck);
>>
>> +    /*
>> +     * Wakeup walsenders:
>> +     *
>> +     * On the standby, the WAL is flushed first (which will only wake up
>> +     * physical walsenders) and then applied, which will only wake up logical
>> +     * walsenders.
>> +     * Indeed, logical walsenders on standby can't decode and send data until
>> +     * it's been applied.
>> +     *
>> +     * Physical walsenders don't need to be waked up during replay unless
> 
> s/waked/woken/

Thanks, fixed.

>> +     * cascading replication is allowed and time line change occured (so that
>> +     * they can notice that they are on a new time line).
>> +     *
>> +     * That's why the wake up conditions are for:
>> +     *
>> +     *  - physical walsenders in case of new time line and cascade
>> +     *  replication is allowed.
>> +     *  - logical walsenders in case of new time line or recovery is in progress
>> +     *  (logical decoding on standby).
>> +     */
>> +    WalSndWakeup(switchedTLI && AllowCascadeReplication(),
>> +                 switchedTLI || RecoveryInProgress());
> 
> I don't think it's possible to get here without RecoveryInProgress() being
> true. So we don't need that condition.

Right, so V62 uses "true" instead, as we don't want to rely only on a timeline change
to wake up a logical walsender.

> 
> 
>> @@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
>>           /* Signal the startup process and walsender that new WAL has arrived */
>>           WakeupRecovery();
>>           if (AllowCascadeReplication())
>> -            WalSndWakeup();
>> +            WalSndWakeup(true, !RecoveryInProgress());
> 
> Same comment as earlier.

done.

> 
> 
>>           /* Report XLOG streaming progress in PS display */
>>           if (update_process_title)
>> diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
>> index 2d908d1de2..5c68ebb79e 100644
>> --- a/src/backend/replication/walsender.c
>> +++ b/src/backend/replication/walsender.c
>> @@ -2628,6 +2628,23 @@ InitWalSenderSlot(void)
>>               walsnd->sync_standby_priority = 0;
>>               walsnd->latch = &MyProc->procLatch;
>>               walsnd->replyTime = 0;
>> +
>> +            /*
>> +             * The kind assignment is done here and not in StartReplication()
>> +             * and StartLogicalReplication(). Indeed, the logical walsender
>> +             * needs to read WAL records (like snapshot of running
>> +             * transactions) during the slot creation. So it needs to be woken
>> +             * up based on its kind.
>> +             *
>> +             * The kind assignment could also be done in StartReplication(),
>> +             * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
>> +             * seems better to set it on one place.
>> +             */
> 
> Doesn't that mean we'll wake up logical walsenders even if they're doing
> normal query processing?
> 

I'm not following what you mean here.

> 
>> +            if (MyDatabaseId == InvalidOid)
>> +                walsnd->kind = REPLICATION_KIND_PHYSICAL;
>> +            else
>> +                walsnd->kind = REPLICATION_KIND_LOGICAL;
>> +
>>               SpinLockRelease(&walsnd->mutex);
>>               /* don't need the lock anymore */
>>               MyWalSnd = (WalSnd *) walsnd;
>> @@ -3310,30 +3327,39 @@ WalSndShmemInit(void)
>>   }
>>
>>   /*
>> - * Wake up all walsenders
>> + * Wake up physical, logical or both walsenders kind
>> + *
>> + * The distinction between physical and logical walsenders is done, because:
>> + * - physical walsenders can't send data until it's been flushed
>> + * - logical walsenders on standby can't decode and send data until it's been
>> + * applied
>> + *
>> + * For cascading replication we need to wake up physical
>> + * walsenders separately from logical walsenders (see the comment before calling
>> + * WalSndWakeup() in ApplyWalRecord() for more details).
>>    *
>>    * This will be called inside critical sections, so throwing an error is not
>>    * advisable.
>>    */
>>   void
>> -WalSndWakeup(void)
>> +WalSndWakeup(bool physical, bool logical)
>>   {
>>       int            i;
>>
>>       for (i = 0; i < max_wal_senders; i++)
>>       {
>>           Latch       *latch;
>> +        ReplicationKind kind;
>>           WalSnd       *walsnd = &WalSndCtl->walsnds[i];
>>
>> -        /*
>> -         * Get latch pointer with spinlock held, for the unlikely case that
>> -         * pointer reads aren't atomic (as they're 8 bytes).
>> -         */
>> +        /* get latch pointer and kind with spinlock helds */
>>           SpinLockAcquire(&walsnd->mutex);
>>           latch = walsnd->latch;
>> +        kind = walsnd->kind;
>>           SpinLockRelease(&walsnd->mutex);
>>
>> -        if (latch != NULL)
>> +        if (latch != NULL && ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
>> +                              (logical && kind == REPLICATION_KIND_LOGICAL)))
>>               SetLatch(latch);
>>       }
>>   }
> 
> I'd consider rewriting this to something like:
> 
> if (latch == NULL)
>      continue;
> 
> if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
>     (logical && kind == REPLICATION_KIND_LOGICAL))
>      SetLatch(latch);
> 

Yeah better, done.

Regards,

-- 
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
