Re: Minimal logical decoding on standbys - Mailing list pgsql-hackers

From Andres Freund
Subject Re: Minimal logical decoding on standbys
Msg-id 20230330070431.tojeodaeo4zmpofu@awork3.anarazel.de
In response to Re: Minimal logical decoding on standbys  ("Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>)
Responses Re: Minimal logical decoding on standbys  ("Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>)
List pgsql-hackers
Hi,

On 2023-03-04 12:19:57 +0100, Drouvot, Bertrand wrote:
> Subject: [PATCH v52 1/6] Add info in WAL records in preparation for logical
>  slot conflict handling.
> 
> Overall design:
> 
> 1. We want to enable logical decoding on standbys, but replay of WAL
> from the primary might remove data that is needed by logical decoding,
> causing error(s) on the standby. To prevent those errors, a new replication
> conflict scenario needs to be addressed (as much as hot standby does).
> 
> 2. Our chosen strategy for dealing with this type of replication slot
> is to invalidate logical slots for which needed data has been removed.
> 
> 3. To do this we need the latestRemovedXid for each change, just as we
> do for physical replication conflicts, but we also need to know
> whether any particular change was to data that logical replication
> might access. That way, during WAL replay, we know when there is a risk of
> conflict and, if so, if there is a conflict.
> 
> 4. We can't rely on the standby's relcache entries for this purpose in
> any way, because the startup process can't access catalog contents.
> 
> 5. Therefore every WAL record that potentially removes data from the
> index or heap must carry a flag indicating whether or not it is one
> that might be accessed during logical decoding.
> 
> Why do we need this for logical decoding on standby?
> 
> First, let's forget about logical decoding on standby and recall that
> on a primary database, any catalog rows that may be needed by a logical
> decoding replication slot are not removed.
> 
> This is done thanks to the catalog_xmin associated with the logical
> replication slot.
> 
> But, with logical decoding on standby, in the following cases:
> 
> - hot_standby_feedback is off
> - hot_standby_feedback is on but there is no physical slot between
>   the primary and the standby. Then, hot_standby_feedback will work,
>   but only while the connection is alive (for example a node restart
>   would break it)
> 
> Then, the primary may delete system catalog rows that could be needed
> by the logical decoding on the standby (as it does not know about the
> catalog_xmin on the standby).
> 
> So, it’s mandatory to identify those rows and invalidate the slots
> that may need them if any. Identifying those rows is the purpose of
> this commit.

This is a very nice commit message.


> Implementation:
> 
> When a WAL replay on standby indicates that a catalog table tuple is
> to be deleted by an xid that is greater than a logical slot's
> catalog_xmin, then that means the slot's catalog_xmin conflicts with
> the xid, and we need to handle the conflict. While subsequent commits
> will do the actual conflict handling, this commit adds a new field
> isCatalogRel in such WAL records (and a new bit set in the
> xl_heap_visible flags field), that is true for catalog tables, so as to
> arrange for conflict handling.
> 
> The affected WAL records are the ones that already contain the
> snapshotConflictHorizon field, namely:
> 
> - gistxlogDelete
> - gistxlogPageReuse
> - xl_hash_vacuum_one_page
> - xl_heap_prune
> - xl_heap_freeze_page
> - xl_heap_visible
> - xl_btree_reuse_page
> - xl_btree_delete
> - spgxlogVacuumRedirect
> 
> Due to this new field being added, xl_hash_vacuum_one_page and
> gistxlogDelete do now contain the offsets to be deleted as a
> FLEXIBLE_ARRAY_MEMBER. This is needed to ensure correct alignment.
> It's not needed on the other structs where isCatalogRel has
> been added.
> 
> Author: Andres Freund (in an older version), Amit Khandekar, Bertrand
> Drouvot

I think you're first author on this one by now.


I think this commit is ready to go. Unless somebody thinks differently, I
think I might push it tomorrow.


> Subject: [PATCH v52 2/6] Handle logical slot conflicts on standby.


> @@ -6807,7 +6808,8 @@ CreateCheckPoint(int flags)
>       */
>      XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
>      KeepLogSeg(recptr, &_logSegNo);
> -    if (InvalidateObsoleteReplicationSlots(_logSegNo))
> +    InvalidateObsoleteReplicationSlots(_logSegNo, &invalidated, InvalidOid, NULL);
> +    if (invalidated)
>      {
>          /*
>           * Some slots have been invalidated; recalculate the old-segment

I don't really understand why you changed InvalidateObsoleteReplicationSlots
to return void instead of bool, and then added an output boolean argument via
a pointer?



> @@ -7964,6 +7968,22 @@ xlog_redo(XLogReaderState *record)
>          /* Update our copy of the parameters in pg_control */
>          memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
>  
> +        /*
> +         * Invalidate logical slots if we are in hot standby and the primary does not
> +         * have a WAL level sufficient for logical decoding. No need to search
> +         * for potentially conflicting logically slots if standby is running
> +         * with wal_level lower than logical, because in that case, we would
> +         * have either disallowed creation of logical slots or invalidated existing
> +         * ones.
> +         */
> +        if (InRecovery && InHotStandby &&
> +            xlrec.wal_level < WAL_LEVEL_LOGICAL &&
> +            wal_level >= WAL_LEVEL_LOGICAL)
> +        {
> +            TransactionId ConflictHorizon = InvalidTransactionId;
> +            InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, NULL, InvalidOid, &ConflictHorizon);
> +        }
> +

Are there races around changing wal_level?



> @@ -855,8 +855,10 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
>          SpinLockAcquire(&s->mutex);
>          effective_xmin = s->effective_xmin;
>          effective_catalog_xmin = s->effective_catalog_xmin;
> -        invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
> -                       XLogRecPtrIsInvalid(s->data.restart_lsn));
> +        invalidated = ((!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
> +                        XLogRecPtrIsInvalid(s->data.restart_lsn))
> +                       || (!TransactionIdIsValid(s->data.xmin) &&
> +                           !TransactionIdIsValid(s->data.catalog_xmin)));
>          SpinLockRelease(&s->mutex);
>  
>          /* invalidated slots need not apply */

I still would like a wrapper function to determine whether a slot has been
invalidated. This is too complicated to be repeated in other places.
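
For illustration only, a minimal sketch of what such a wrapper could look
like. It merely mirrors the condition in the hunk above; the struct and the
function names (SlotDataSketch, SlotSketchIsInvalidated, ...) are hypothetical
stand-ins so the example compiles on its own, not names from the patch or
from PostgreSQL.

```c
#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-ins for the PostgreSQL definitions (assumptions). */
typedef uint64_t XLogRecPtr;
typedef uint32_t TransactionId;

#define InvalidXLogRecPtr       ((XLogRecPtr) 0)
#define InvalidTransactionId    ((TransactionId) 0)
#define XLogRecPtrIsInvalid(r)  ((r) == InvalidXLogRecPtr)
#define TransactionIdIsValid(x) ((x) != InvalidTransactionId)

/* Hypothetical subset of the slot's persistent data used by the check. */
typedef struct SlotDataSketch
{
    XLogRecPtr    restart_lsn;
    XLogRecPtr    invalidated_at;
    TransactionId xmin;
    TransactionId catalog_xmin;
} SlotDataSketch;

/* Obsolete-LSN case: invalidated_at is set and restart_lsn was cleared. */
bool
SlotSketchIsObsolete(const SlotDataSketch *d)
{
    return !XLogRecPtrIsInvalid(d->invalidated_at) &&
        XLogRecPtrIsInvalid(d->restart_lsn);
}

/* Conflict case: both xmin and catalog_xmin were cleared. */
bool
SlotSketchIsConflicted(const SlotDataSketch *d)
{
    return !TransactionIdIsValid(d->xmin) &&
        !TransactionIdIsValid(d->catalog_xmin);
}

/* Single entry point, so callers never spell out the condition themselves. */
bool
SlotSketchIsInvalidated(const SlotDataSketch *d)
{
    return SlotSketchIsObsolete(d) || SlotSketchIsConflicted(d);
}
```

A caller would copy the fields under the spinlock, as the hunk does, and then
ask the single predicate instead of repeating the four-way condition.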


> @@ -1224,20 +1226,21 @@ ReplicationSlotReserveWal(void)
>  }
>  
>  /*
> - * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
> - * and mark it invalid, if necessary and possible.
> + * Helper for InvalidateObsoleteReplicationSlots
> + *
> + * Acquires the given slot and mark it invalid, if necessary and possible.
>   *
>   * Returns whether ReplicationSlotControlLock was released in the interim (and
>   * in that case we're not holding the lock at return, otherwise we are).
>   *
> - * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
> + * Sets *invalidated true if an obsolete slot was invalidated. (Untouched otherwise.)
>   *
>   * This is inherently racy, because we release the LWLock
>   * for syscalls, so caller must restart if we return true.
>   */
>  static bool
> -InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> -                               bool *invalidated)
> +InvalidatePossiblyObsoleteOrConflictingLogicalSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> +                                                   bool *invalidated, TransactionId *xid)

This is too long a name. I'd probably just leave it at the old name.



> @@ -1261,18 +1267,33 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>           * Check if the slot needs to be invalidated. If it needs to be
>           * invalidated, and is not currently acquired, acquire it and mark it
>           * as having been invalidated.  We do this with the spinlock held to
> -         * avoid race conditions -- for example the restart_lsn could move
> -         * forward, or the slot could be dropped.
> +         * avoid race conditions -- for example the restart_lsn (or the
> +         * xmin(s) could) move forward or the slot could be dropped.
>           */
>          SpinLockAcquire(&s->mutex);
>  
>          restart_lsn = s->data.restart_lsn;
> +        slot_xmin = s->data.xmin;
> +        slot_catalog_xmin = s->data.catalog_xmin;
> +
> +        /* slot has been invalidated (logical decoding conflict case) */
> +        if ((xid &&
> +             ((LogicalReplicationSlotIsInvalid(s))
> +              ||
>  

Uh, huh?

That's very odd formatting.

>          /*
> -         * If the slot is already invalid or is fresh enough, we don't need to
> -         * do anything.
> +         * We are not forcing for invalidation because the xid is valid and
> +         * this is a non conflicting slot.
>           */
> -        if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
> +              (TransactionIdIsValid(*xid) && !(
> +                                               (TransactionIdIsValid(slot_xmin) && TransactionIdPrecedesOrEquals(slot_xmin,*xid))
> +                                               ||
> +                                               (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin,*xid))
> +                                               ))
> +              ))
> +            ||
> +        /* slot has been invalidated (obsolete LSN case) */
> +            (!xid && (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)))
>          {
>              SpinLockRelease(&s->mutex);
>              if (released_lock)


This needs some cleanup.
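
One possible direction for such a cleanup, sketched under assumptions: split
the combined condition into small named predicates, one per invalidation
reason. The function names below are hypothetical, the types are simplified
stand-ins, and xid_precedes_or_equals is only modelled on the wraparound-aware
comparison that TransactionIdPrecedesOrEquals() performs. The check for an
already-invalidated slot is left to the caller.

```c
#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-ins for the PostgreSQL definitions (assumptions). */
typedef uint64_t XLogRecPtr;
typedef uint32_t TransactionId;

#define InvalidXLogRecPtr        ((XLogRecPtr) 0)
#define InvalidTransactionId     ((TransactionId) 0)
#define FirstNormalTransactionId ((TransactionId) 3)

/* Wraparound-aware "a <= b" for transaction ids. */
bool
xid_precedes_or_equals(TransactionId a, TransactionId b)
{
    /* Special (non-normal) xids compare as plain unsigned integers. */
    if (a < FirstNormalTransactionId || b < FirstNormalTransactionId)
        return a <= b;
    return (int32_t) (a - b) <= 0;
}

/*
 * Conflict case. An invalid horizon means "invalidate unconditionally",
 * which is how the wal_level change is signalled in the hunk above.
 */
bool
slot_conflicts_with_horizon(TransactionId slot_xmin,
                            TransactionId slot_catalog_xmin,
                            TransactionId horizon)
{
    if (horizon == InvalidTransactionId)
        return true;

    return (slot_xmin != InvalidTransactionId &&
            xid_precedes_or_equals(slot_xmin, horizon)) ||
        (slot_catalog_xmin != InvalidTransactionId &&
         xid_precedes_or_equals(slot_catalog_xmin, horizon));
}

/* Obsolete-LSN case: a still-valid restart_lsn that fell behind oldest_lsn. */
bool
slot_is_obsolete(XLogRecPtr restart_lsn, XLogRecPtr oldest_lsn)
{
    return restart_lsn != InvalidXLogRecPtr && restart_lsn < oldest_lsn;
}
```

With predicates like these, the "nothing to do" test in the loop reduces to a
short if/else on the invalidation reason rather than one nested expression.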


> @@ -1292,9 +1313,16 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>          {
>              MyReplicationSlot = s;
>              s->active_pid = MyProcPid;
> -            s->data.invalidated_at = restart_lsn;
> -            s->data.restart_lsn = InvalidXLogRecPtr;
> -
> +            if (xid)
> +            {
> +                s->data.xmin = InvalidTransactionId;
> +                s->data.catalog_xmin = InvalidTransactionId;
> +            }
> +            else
> +            {
> +                s->data.invalidated_at = restart_lsn;
> +                s->data.restart_lsn = InvalidXLogRecPtr;
> +            }
>              /* Let caller know */
>              *invalidated = true;
>          }
> @@ -1327,15 +1355,39 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>               */
>              if (last_signaled_pid != active_pid)
>              {
> -                ereport(LOG,
> -                        errmsg("terminating process %d to release replication slot \"%s\"",
> -                               active_pid, NameStr(slotname)),
> -                        errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
> -                                  LSN_FORMAT_ARGS(restart_lsn),
> -                                  (unsigned long long) (oldestLSN - restart_lsn)),
> -                        errhint("You might need to increase max_slot_wal_keep_size."));
> +                if (xid)
> +                {
> +                    if (TransactionIdIsValid(*xid))
> +                    {
> +                        ereport(LOG,
> +                                errmsg("terminating process %d because replication slot \"%s\" conflicts with recovery",
> +                                       active_pid, NameStr(slotname)),
> +                                errdetail("The slot conflicted with xid horizon %u.",
> +                                          *xid));
> +                    }
> +                    else
> +                    {
> +                        ereport(LOG,
> +                                errmsg("terminating process %d because replication slot \"%s\" conflicts with recovery",
> +                                       active_pid, NameStr(slotname)),
> +                                errdetail("Logical decoding on standby requires wal_level to be at least logical on the primary server"));
> +                    }
> +
> +                    (void) SendProcSignal(active_pid, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, InvalidBackendId);
> +                }
> +                else
> +                {
> +                    ereport(LOG,
> +                            errmsg("terminating process %d to release replication slot \"%s\"",
> +                                   active_pid, NameStr(slotname)),
> +                            errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
> +                                      LSN_FORMAT_ARGS(restart_lsn),
> +                                      (unsigned long long) (oldestLSN - restart_lsn)),
> +                            errhint("You might need to increase max_slot_wal_keep_size."));
> +
> +                    (void) kill(active_pid, SIGTERM);

I think it ought to be possible to deduplicate this a fair bit. For one, two of
the errmsg()s above are identical.  But I think this could be consolidated
further, e.g. by using the same message style for the three cases, and passing
in a separately translated reason for the termination?
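
A rough sketch of that consolidation, for illustration only: the three cases
share one code path and only the detail text depends on the reason. The enum
and the function name are hypothetical, and plain snprintf() stands in for the
server's message and translation machinery; the message texts are the ones
from the hunk above.

```c
#include <stdint.h>
#include <stdio.h>

/* Hypothetical enumeration of the three invalidation reasons. */
typedef enum SlotInvalidationCauseSketch
{
    CAUSE_WAL_REMOVED,          /* restart_lsn fell behind the limit */
    CAUSE_XID_HORIZON,          /* conflict with a snapshot horizon */
    CAUSE_WAL_LEVEL             /* primary's wal_level dropped below logical */
} SlotInvalidationCauseSketch;

/*
 * Build the reason-specific detail text into buf. Returns what snprintf()
 * returns, or -1 for an unknown cause.
 */
int
format_invalidation_detail(char *buf, size_t len,
                           SlotInvalidationCauseSketch cause,
                           uint64_t restart_lsn, uint64_t oldest_lsn,
                           uint32_t xid)
{
    switch (cause)
    {
        case CAUSE_WAL_REMOVED:
            return snprintf(buf, len,
                            "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
                            (unsigned int) (restart_lsn >> 32),
                            (unsigned int) restart_lsn,
                            (unsigned long long) (oldest_lsn - restart_lsn));
        case CAUSE_XID_HORIZON:
            return snprintf(buf, len,
                            "The slot conflicted with xid horizon %u.",
                            (unsigned int) xid);
        case CAUSE_WAL_LEVEL:
            return snprintf(buf, len,
                            "Logical decoding on standby requires wal_level to be at least logical on the primary server");
    }
    return -1;
}
```

The "terminating process" and "invalidating slot" call sites could then each
emit a single message and attach the prepared detail, instead of carrying
three near-identical ereport() blocks apiece.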


> +                }
>  
> -                (void) kill(active_pid, SIGTERM);
>                  last_signaled_pid = active_pid;
>              }
>  
> @@ -1369,13 +1421,33 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>              ReplicationSlotSave();
>              ReplicationSlotRelease();
>  
> -            ereport(LOG,
> -                    errmsg("invalidating obsolete replication slot \"%s\"",
> -                           NameStr(slotname)),
> -                    errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
> -                              LSN_FORMAT_ARGS(restart_lsn),
> -                              (unsigned long long) (oldestLSN - restart_lsn)),
> -                    errhint("You might need to increase max_slot_wal_keep_size."));
> +            if (xid)
> +            {
> +                pgstat_drop_replslot(s);

Why is this done here now?


> +                if (TransactionIdIsValid(*xid))
> +                {
> +                    ereport(LOG,
> +                            errmsg("invalidating slot \"%s\" because it conflicts with recovery", NameStr(slotname)),
> +                            errdetail("The slot conflicted with xid horizon %u.", *xid));
> +                }
> +                else
> +                {
> +                    ereport(LOG,
> +                            errmsg("invalidating slot \"%s\" because it conflicts with recovery", NameStr(slotname)),
> +                            errdetail("Logical decoding on standby requires wal_level to be at least logical on the primary server"));
> +                }
> +            }
> +            else
> +            {
> +                ereport(LOG,
> +                        errmsg("invalidating obsolete replication slot \"%s\"",
> +                               NameStr(slotname)),
> +                        errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
> +                                  LSN_FORMAT_ARGS(restart_lsn),
> +                                  (unsigned long long) (oldestLSN - restart_lsn)),
> +                        errhint("You might need to increase max_slot_wal_keep_size."));
> +            }
>

I don't like all these repeated elogs...



> @@ -3057,6 +3060,27 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
>              case PROCSIG_RECOVERY_CONFLICT_LOCK:
>              case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
>              case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
> +            case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
> +
> +                /*
> +                 * For conflicts that require a logical slot to be
> +                 * invalidated, the requirement is for the signal receiver to
> +                 * release the slot, so that it could be invalidated by the
> +                 * signal sender. So for normal backends, the transaction
> +                 * should be aborted, just like for other recovery conflicts.
> +                 * But if it's walsender on standby, we don't want to go
> +                 * through the following IsTransactionOrTransactionBlock()
> +                 * check, so break here.
> +                 */
> +                if (am_cascading_walsender &&
> +                    reason == PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT &&
> +                    MyReplicationSlot && SlotIsLogical(MyReplicationSlot))
> +                {
> +                    RecoveryConflictPending = true;
> +                    QueryCancelPending = true;
> +                    InterruptPending = true;
> +                    break;
> +                }
>  
>                  /*
>                   * If we aren't in a transaction any longer then ignore.

I can't see any reason for this to be mixed into the same case "body" as LOCK
etc?


> diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
> index 38c6f18886..290d4b45f4 100644
> --- a/src/backend/replication/slot.c
> +++ b/src/backend/replication/slot.c
> @@ -51,6 +51,7 @@
>  #include "storage/proc.h"
>  #include "storage/procarray.h"
>  #include "utils/builtins.h"
> +#include "access/xlogrecovery.h"

Add new includes in the "alphabetically" right place...



Greetings,

Andres Freund


