Re: Minimal logical decoding on standbys - Mailing list pgsql-hackers

From Andres Freund
Subject Re: Minimal logical decoding on standbys
Msg-id 20230405182835.ehufe4fj2zx3pjix@awork3.anarazel.de
In response to Re: Minimal logical decoding on standbys  ("Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>)
Responses Re: Minimal logical decoding on standbys  ("Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>)
List pgsql-hackers
Hi,

On 2023-04-05 17:56:14 +0200, Drouvot, Bertrand wrote:

> @@ -7963,6 +7963,23 @@ xlog_redo(XLogReaderState *record)
>          /* Update our copy of the parameters in pg_control */
>          memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
>
> +        /*
> +         * Invalidate logical slots if we are in hot standby and the primary
> +         * does not have a WAL level sufficient for logical decoding. No need
> +         * to search for potentially conflicting logically slots if standby is
> +         * running with wal_level lower than logical, because in that case, we
> +         * would have either disallowed creation of logical slots or
> +         * invalidated existing ones.
> +         */
> +        if (InRecovery && InHotStandby &&
> +            xlrec.wal_level < WAL_LEVEL_LOGICAL &&
> +            wal_level >= WAL_LEVEL_LOGICAL)
> +        {
> +            TransactionId ConflictHorizon = InvalidTransactionId;
> +
> +            InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, InvalidOid, &ConflictHorizon);
> +        }

I mentioned this before, but I still don't understand why
InvalidateObsoleteReplicationSlots() accepts ConflictHorizon as a
pointer. It's not even modified, as far as I can see?


>  /*
>   * Report shared-memory space needed by ReplicationSlotsShmemInit.
>   */
> @@ -855,8 +862,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
>          SpinLockAcquire(&s->mutex);
>          effective_xmin = s->effective_xmin;
>          effective_catalog_xmin = s->effective_catalog_xmin;
> -        invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
> -                       XLogRecPtrIsInvalid(s->data.restart_lsn));
> +        invalidated = ObsoleteSlotIsInvalid(s, true) || LogicalReplicationSlotIsInvalid(s);
>          SpinLockRelease(&s->mutex);

I don't understand why we need to have two different functions for this.
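One way to get down to a single function, sketched with simplified stand-in types: record *why* a slot was invalidated in one field, named after the required resource that went away (WAL, or catalog rows covered by the xid horizon), so that one predicate answers "is this slot invalid?" for every case. The struct, the enum, and the function names below are hypothetical and not taken from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;

/* Hypothetical: why a slot was invalidated, if at all. */
typedef enum SlotInvalidationCause
{
    SLOT_INVAL_NONE,            /* slot is still valid */
    SLOT_INVAL_WAL_REMOVED,     /* required WAL was removed */
    SLOT_INVAL_XID_HORIZON,     /* required catalog rows were removed */
    SLOT_INVAL_WAL_LEVEL        /* primary's wal_level is too low for decoding */
} SlotInvalidationCause;

/* Hypothetical, heavily simplified slot. */
typedef struct SketchSlot
{
    XLogRecPtr  restart_lsn;
    SlotInvalidationCause invalidated;
} SketchSlot;

/* One predicate, whatever the cause of invalidation. */
static inline bool
SlotIsInvalid(const SketchSlot *s)
{
    return s->invalidated != SLOT_INVAL_NONE;
}

/* Every invalidation path records its cause the same way. */
static inline void
SlotMarkInvalid(SketchSlot *s, SlotInvalidationCause cause)
{
    assert(cause != SLOT_INVAL_NONE);
    s->invalidated = cause;
}
```

With this shape, ReplicationSlotsComputeRequiredXmin() would only need a single `SlotIsInvalid(s)` check under the spinlock.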


>          /* invalidated slots need not apply */
> @@ -1225,28 +1231,92 @@ ReplicationSlotReserveWal(void)
>      }
>  }
>
> +
> +/*
> + * Report terminating or conflicting message.
> + *
> + * For both, logical conflict on standby and obsolete slot are handled.
> + */
> +static void
> +ReportTerminationInvalidation(bool terminating, bool check_on_xid, int pid,
> +                              NameData slotname, TransactionId *xid,
> +                              XLogRecPtr restart_lsn, XLogRecPtr oldestLSN)
> +{
> +    StringInfoData err_msg;
> +    StringInfoData err_detail;
> +    bool        hint = false;
> +
> +    initStringInfo(&err_detail);
> +
> +    if (check_on_xid)
> +    {
> +        if (!terminating)
> +        {
> +            initStringInfo(&err_msg);
> +            appendStringInfo(&err_msg, _("invalidating replication slot \"%s\" because it conflicts with recovery"),
> +                             NameStr(slotname));

I still don't think the main error message should differ between invalidating
a slot due to a recovery conflict and due to max_slot_wal_keep_size.
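A minimal sketch of that split, assuming the cause is known when the message is built: the primary message is identical for every cause and only the detail line says which required resource went away. The enum, the function, and the exact wording are hypothetical, and plain `snprintf()` stands in for ereport()/errmsg()/errdetail().

```c
#include <assert.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint64_t XLogRecPtr;
typedef uint32_t TransactionId;

/* Hypothetical causes; only the two discussed here. */
typedef enum SlotInvalidationCause
{
    SLOT_INVAL_WAL_REMOVED,
    SLOT_INVAL_XID_HORIZON
} SlotInvalidationCause;

static void
FormatSlotInvalidation(const char *slotname, SlotInvalidationCause cause,
                       XLogRecPtr restart_lsn, TransactionId horizon,
                       char *msg, size_t msglen,
                       char *detail, size_t detaillen)
{
    assert(slotname != NULL && strlen(slotname) > 0);

    /* Same primary message whatever the cause. */
    snprintf(msg, msglen, "invalidating replication slot \"%s\"", slotname);

    /* Only the detail says which required resource went away. */
    switch (cause)
    {
        case SLOT_INVAL_WAL_REMOVED:
            snprintf(detail, detaillen,
                     "The slot's restart_lsn %X/%X requires WAL that has been removed.",
                     (unsigned) (restart_lsn >> 32), (unsigned) restart_lsn);
            break;
        case SLOT_INVAL_XID_HORIZON:
            snprintf(detail, detaillen,
                     "The slot conflicts with xid horizon %" PRIu32 ".",
                     horizon);
            break;
    }
}
```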

> +
>  /*
> - * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
> - * and mark it invalid, if necessary and possible.
> + * Helper for InvalidateObsoleteReplicationSlots
> + *
> + * Acquires the given slot and mark it invalid, if necessary and possible.
>   *
>   * Returns whether ReplicationSlotControlLock was released in the interim (and
>   * in that case we're not holding the lock at return, otherwise we are).
>   *
> - * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
> + * Sets *invalidated true if an obsolete slot was invalidated. (Untouched otherwise.)

What's the point of making this specific to "obsolete slots"?


>   * This is inherently racy, because we release the LWLock
>   * for syscalls, so caller must restart if we return true.
>   */
>  static bool
>  InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> -                               bool *invalidated)
> +                               bool *invalidated, TransactionId *xid)
>  {
>      int            last_signaled_pid = 0;
>      bool        released_lock = false;
> +    bool        check_on_xid;
> +
> +    check_on_xid = xid ? true : false;
>
>      for (;;)
>      {
>          XLogRecPtr    restart_lsn;
> +
>          NameData    slotname;
>          int            active_pid = 0;
>
> @@ -1263,19 +1333,20 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>           * Check if the slot needs to be invalidated. If it needs to be
>           * invalidated, and is not currently acquired, acquire it and mark it
>           * as having been invalidated.  We do this with the spinlock held to
> -         * avoid race conditions -- for example the restart_lsn could move
> -         * forward, or the slot could be dropped.
> +         * avoid race conditions -- for example the restart_lsn (or the
> +         * xmin(s) could) move forward or the slot could be dropped.
>           */
>          SpinLockAcquire(&s->mutex);
>
>          restart_lsn = s->data.restart_lsn;
>
>          /*
> -         * If the slot is already invalid or is fresh enough, we don't need to
> -         * do anything.
> +         * If the slot is already invalid or is a non conflicting slot, we
> +         * don't need to do anything.
>           */
> -        if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
> +        if (DoNotInvalidateSlot(s, xid, &oldestLSN))

DoNotInvalidateSlot() seems odd to me, and makes the code harder to
understand. I'd make it something like:

if (!SlotIsInvalid(s) && (
      LogicalSlotConflictsWith(s, xid) ||
      SlotConflictsWithLSN(s, lsn)))


>  /*
> - * Mark any slot that points to an LSN older than the given segment
> - * as invalid; it requires WAL that's about to be removed.
> + * Invalidate Obsolete slots or resolve recovery conflicts with logical slots.

I don't like that this spreads "obsolete slots" around further - it's very
unspecific. A logical slot that needs to be removed due to an xid conflict is
just as obsolete as one that needs to be removed due to max_slot_wal_keep_size.

I'd rephrase this to be about required resources getting removed, or something
along those lines: one case of that is WAL, another case is xids.

>  restart:
>      LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
> @@ -1414,21 +1505,35 @@ restart:
>          if (!s->in_use)
>              continue;
>
> -        if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
> +        if (xid)
>          {
> -            /* if the lock was released, start from scratch */
> -            goto restart;
> +            /* we are only dealing with *logical* slot conflicts */
> +            if (!SlotIsLogical(s))
> +                continue;
> +
> +            /*
> +             * not the database of interest and we don't want all the
> +             * database, skip
> +             */
> +            if (s->data.database != dboid && TransactionIdIsValid(*xid))
> +                continue;

ISTM that this should be in InvalidatePossiblyObsoleteSlot().
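A sketch of what moving the filter into the per-slot helper could look like, so that the caller's loop only skips unused slots. It assumes xid and oldestLSN are passed by value, with a valid oldestLSN meaning "WAL removal", a valid xid meaning "conflict in database dboid", and neither meaning "wal_level change, all logical slots"; that reading mirrors the patch's checks, but the names `SlotIsAffected` and `CountAffectedSlots` are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t Oid;
typedef uint32_t TransactionId;
typedef uint64_t XLogRecPtr;

#define InvalidOid ((Oid) 0)
#define InvalidTransactionId ((TransactionId) 0)
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Hypothetical, heavily simplified slot. */
typedef struct SketchSlot
{
    bool        in_use;
    bool        is_logical;
    Oid         database;
} SketchSlot;

/* Would live inside the per-slot helper: is this slot affected at all? */
static bool
SlotIsAffected(const SketchSlot *s, XLogRecPtr oldestLSN,
               TransactionId xid, Oid dboid)
{
    assert(s != NULL);

    /* WAL removal affects every slot, physical or logical. */
    if (oldestLSN != InvalidXLogRecPtr)
        return true;

    /* xid and wal_level conflicts only concern logical slots. */
    if (!s->is_logical)
        return false;

    /* A valid horizon is database-specific; no horizon means all databases. */
    if (xid != InvalidTransactionId && s->database != dboid)
        return false;

    return true;
}

/* The caller's loop no longer needs to know about xids or databases. */
static int
CountAffectedSlots(const SketchSlot *slots, size_t nslots,
                   XLogRecPtr oldestLSN, TransactionId xid, Oid dboid)
{
    int         n = 0;

    for (size_t i = 0; i < nslots; i++)
    {
        if (!slots[i].in_use)
            continue;
        if (SlotIsAffected(&slots[i], oldestLSN, xid, dboid))
            n++;
    }
    return n;
}
```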


>      /*
> -     * If any slots have been invalidated, recalculate the resource limits.
> +     * If any slots have been invalidated, recalculate the required xmin and
> +     * the required lsn (if appropriate).
>       */
>      if (invalidated)
>      {
>          ReplicationSlotsComputeRequiredXmin(false);
> -        ReplicationSlotsComputeRequiredLSN();
> +        if (!xid)
> +            ReplicationSlotsComputeRequiredLSN();
>      }

Why make this conditional? If we invalidated a logical slot, we also don't
require as much WAL anymore, no?
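To illustrate why the LSN recomputation shouldn't depend on the cause: a logical slot invalidated because of an xid conflict may well be the slot holding back WAL, so dropping it from the computation lets the required LSN advance. This is a simplified sketch; `ComputeRequiredLSN` and the struct are hypothetical stand-ins for ReplicationSlotsComputeRequiredLSN(), not its actual implementation.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;

#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Hypothetical, heavily simplified slot. */
typedef struct SketchSlot
{
    bool        is_logical;
    bool        invalidated;
    XLogRecPtr  restart_lsn;
} SketchSlot;

/* Oldest WAL position still needed by any valid slot. */
static XLogRecPtr
ComputeRequiredLSN(const SketchSlot *slots, size_t n)
{
    XLogRecPtr  min = InvalidXLogRecPtr;

    assert(slots != NULL || n == 0);

    for (size_t i = 0; i < n; i++)
    {
        const SketchSlot *s = &slots[i];

        /* Invalidated slots, logical or physical, no longer hold back WAL. */
        if (s->invalidated || s->restart_lsn == InvalidXLogRecPtr)
            continue;
        if (min == InvalidXLogRecPtr || s->restart_lsn < min)
            min = s->restart_lsn;
    }
    return min;
}
```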


> @@ -491,6 +493,9 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
>                                             PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
>                                             WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
>                                             true);
> +
> +    if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel)
> +        InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, locator.dbOid, &snapshotConflictHorizon);
>  }

Hm. Is there a reason for doing this before resolving conflicts with existing
sessions?


Another issue: ResolveRecoveryConflictWithVirtualXIDs() takes
WaitExceedsMaxStandbyDelay() into account, but
InvalidateObsoleteReplicationSlots() does not. I think that's ok, because the
setup should prevent this case from being reached in normal paths, but at
least there should be a comment documenting this.



> +static inline bool
> +LogicalReplicationSlotXidsConflict(ReplicationSlot *s, TransactionId xid)
> +{
> +    TransactionId slot_effective_xmin;
> +    TransactionId slot_catalog_xmin;
> +
> +    slot_effective_xmin = s->effective_xmin;
> +    slot_catalog_xmin = s->data.catalog_xmin;
> +
> +    return (((TransactionIdIsValid(slot_effective_xmin) && TransactionIdPrecedesOrEquals(slot_effective_xmin, xid)) ||
> +             (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid))));
> +}

return -ETOOMANYPARENS


> +static inline bool
> +SlotIsFreshEnough(ReplicationSlot *s, XLogRecPtr oldestLSN)
> +{
> +    return (s->data.restart_lsn >= oldestLSN);
> +}
> +
> +static inline bool
> +LogicalSlotIsNotConflicting(ReplicationSlot *s, TransactionId *xid)
> +{
> +    return (TransactionIdIsValid(*xid) && !LogicalReplicationSlotXidsConflict(s, *xid));
> +}
> +
> +static inline bool
> +DoNotInvalidateSlot(ReplicationSlot *s, TransactionId *xid, XLogRecPtr *oldestLSN)
> +{
> +    if (xid)
> +        return (LogicalReplicationSlotIsInvalid(s) || LogicalSlotIsNotConflicting(s, xid));
> +    else
> +        return (ObsoleteSlotIsInvalid(s, false) || SlotIsFreshEnough(s, *oldestLSN));
> +
> +}

See above for some more comments. But please don't accept stuff via pointer if
you don't have a reason for it. There's no reason for it for xid and oldestLSN
afaict.


> diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
> index dbe9394762..186e4ef600 100644
> --- a/src/backend/access/transam/xlogrecovery.c
> +++ b/src/backend/access/transam/xlogrecovery.c
> @@ -1935,6 +1935,30 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
>      XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
>      SpinLockRelease(&XLogRecoveryCtl->info_lck);
>
> +    /*
> +     * Wakeup walsenders:
> +     *
> +     * On the standby, the WAL is flushed first (which will only wake up
> +     * physical walsenders) and then applied, which will only wake up logical
> +     * walsenders.
> +     * Indeed, logical walsenders on standby can't decode and send data until
> +     * it's been applied.
> +     *
> +     * Physical walsenders don't need to be waked up during replay unless

s/waked/woken/

> +     * cascading replication is allowed and time line change occured (so that
> +     * they can notice that they are on a new time line).
> +     *
> +     * That's why the wake up conditions are for:
> +     *
> +     *  - physical walsenders in case of new time line and cascade
> +     *  replication is allowed.
> +     *  - logical walsenders in case of new time line or recovery is in progress
> +     *  (logical decoding on standby).
> +     */
> +    WalSndWakeup(switchedTLI && AllowCascadeReplication(),
> +                 switchedTLI || RecoveryInProgress());

I don't think it's possible to get here without RecoveryInProgress() being
true. So we don't need that condition.


> @@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
>          /* Signal the startup process and walsender that new WAL has arrived */
>          WakeupRecovery();
>          if (AllowCascadeReplication())
> -            WalSndWakeup();
> +            WalSndWakeup(true, !RecoveryInProgress());

Same comment as earlier.


>          /* Report XLOG streaming progress in PS display */
>          if (update_process_title)
> diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
> index 2d908d1de2..5c68ebb79e 100644
> --- a/src/backend/replication/walsender.c
> +++ b/src/backend/replication/walsender.c
> @@ -2628,6 +2628,23 @@ InitWalSenderSlot(void)
>              walsnd->sync_standby_priority = 0;
>              walsnd->latch = &MyProc->procLatch;
>              walsnd->replyTime = 0;
> +
> +            /*
> +             * The kind assignment is done here and not in StartReplication()
> +             * and StartLogicalReplication(). Indeed, the logical walsender
> +             * needs to read WAL records (like snapshot of running
> +             * transactions) during the slot creation. So it needs to be woken
> +             * up based on its kind.
> +             *
> +             * The kind assignment could also be done in StartReplication(),
> +             * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
> +             * seems better to set it on one place.
> +             */

Doesn't that mean we'll wake up logical walsenders even if they're doing
normal query processing?


> +            if (MyDatabaseId == InvalidOid)
> +                walsnd->kind = REPLICATION_KIND_PHYSICAL;
> +            else
> +                walsnd->kind = REPLICATION_KIND_LOGICAL;
> +
>              SpinLockRelease(&walsnd->mutex);
>              /* don't need the lock anymore */
>              MyWalSnd = (WalSnd *) walsnd;
> @@ -3310,30 +3327,39 @@ WalSndShmemInit(void)
>  }
>
>  /*
> - * Wake up all walsenders
> + * Wake up physical, logical or both walsenders kind
> + *
> + * The distinction between physical and logical walsenders is done, because:
> + * - physical walsenders can't send data until it's been flushed
> + * - logical walsenders on standby can't decode and send data until it's been
> + * applied
> + *
> + * For cascading replication we need to wake up physical
> + * walsenders separately from logical walsenders (see the comment before calling
> + * WalSndWakeup() in ApplyWalRecord() for more details).
>   *
>   * This will be called inside critical sections, so throwing an error is not
>   * advisable.
>   */
>  void
> -WalSndWakeup(void)
> +WalSndWakeup(bool physical, bool logical)
>  {
>      int            i;
>
>      for (i = 0; i < max_wal_senders; i++)
>      {
>          Latch       *latch;
> +        ReplicationKind kind;
>          WalSnd       *walsnd = &WalSndCtl->walsnds[i];
>
> -        /*
> -         * Get latch pointer with spinlock held, for the unlikely case that
> -         * pointer reads aren't atomic (as they're 8 bytes).
> -         */
> +        /* get latch pointer and kind with spinlock helds */
>          SpinLockAcquire(&walsnd->mutex);
>          latch = walsnd->latch;
> +        kind = walsnd->kind;
>          SpinLockRelease(&walsnd->mutex);
>
> -        if (latch != NULL)
> +        if (latch != NULL && ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
> +                              (logical && kind == REPLICATION_KIND_LOGICAL)))
>              SetLatch(latch);
>      }
>  }

I'd consider rewriting this to something like:

if (latch == NULL)
    continue;

if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
    (logical && kind == REPLICATION_KIND_LOGICAL))
    SetLatch(latch);
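A self-contained sketch of the suggested loop shape, with stand-in types: the latch just records that it was set, and the spinlock-protected reads of `latch` and `kind` are reduced to plain reads. `SketchLatch`, `SketchWalSnd`, and `SketchWalSndWakeup` are hypothetical names, not walsender.c code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum ReplicationKind
{
    REPLICATION_KIND_PHYSICAL,
    REPLICATION_KIND_LOGICAL
} ReplicationKind;

/* Stand-in for a latch: only records that it was set. */
typedef struct SketchLatch
{
    bool        is_set;
} SketchLatch;

typedef struct SketchWalSnd
{
    SketchLatch *latch;         /* NULL if the walsender slot is unused */
    ReplicationKind kind;
} SketchWalSnd;

static void
SketchSetLatch(SketchLatch *latch)
{
    latch->is_set = true;
}

/* Spinlocks omitted: the real loop reads latch and kind under walsnd->mutex. */
static void
SketchWalSndWakeup(SketchWalSnd *walsnds, int n, bool physical, bool logical)
{
    assert(walsnds != NULL || n == 0);

    for (int i = 0; i < n; i++)
    {
        SketchLatch *latch = walsnds[i].latch;
        ReplicationKind kind = walsnds[i].kind;

        if (latch == NULL)
            continue;

        if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
            (logical && kind == REPLICATION_KIND_LOGICAL))
            SketchSetLatch(latch);
    }
}
```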



Greetings,

Andres Freund


