Re: Minimal logical decoding on standbys - Mailing list pgsql-hackers

From Melanie Plageman
Subject Re: Minimal logical decoding on standbys
Date
Msg-id 20230407223204.vj5t6ligooo6xa6e@liskov
In response to Re: Minimal logical decoding on standbys  (Andres Freund <andres@anarazel.de>)
Responses Re: Minimal logical decoding on standbys  (Andres Freund <andres@anarazel.de>)
List pgsql-hackers
Code review only of 0001-0005.

I noticed you had two 0008, btw.

On Fri, Apr 07, 2023 at 11:12:26AM -0700, Andres Freund wrote:
> Hi,
> 
> On 2023-04-07 08:47:57 -0700, Andres Freund wrote:
> > Integrated all of these.
> 
> From 0e038eb5dfddec500fbf4625775d1fa508a208f6 Mon Sep 17 00:00:00 2001
> From: Andres Freund <andres@anarazel.de>
> Date: Thu, 6 Apr 2023 20:00:07 -0700
> Subject: [PATCH va67 1/9] Replace a replication slot's invalidated_at LSN with
>  an enum
> 
> diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
> index 8872c80cdfe..ebcb637baed 100644
> --- a/src/include/replication/slot.h
> +++ b/src/include/replication/slot.h
> @@ -37,6 +37,17 @@ typedef enum ReplicationSlotPersistency
>      RS_TEMPORARY
>  } ReplicationSlotPersistency;
>  
> +/*
> + * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the
> + * 'invalidated' field is set to a value other than _NONE.
> + */
> +typedef enum ReplicationSlotInvalidationCause
> +{
> +    RS_INVAL_NONE,
> +    /* required WAL has been removed */

I just wonder if RS_INVAL_WAL is too generic. Something like
RS_INVAL_WAL_MISSING or similar may be better since it seems there are
other invalidation causes that may be related to WAL.

> +    RS_INVAL_WAL,
> +} ReplicationSlotInvalidationCause;
> +

0002 LGTM

> From 52c25cc15abc4470d19e305d245b9362e6b8d6a3 Mon Sep 17 00:00:00 2001
> From: Andres Freund <andres@anarazel.de>
> Date: Fri, 7 Apr 2023 09:32:48 -0700
> Subject: [PATCH va67 3/9] Support invalidating replication slots due to
>  horizon and wal_level
> MIME-Version: 1.0
> Content-Type: text/plain; charset=UTF-8
> Content-Transfer-Encoding: 8bit
> 
> Needed for supporting logical decoding on a standby. The new invalidation
> methods will be used in a subsequent commit.
> 

You probably are aware, but applying 0003 and 0004 both gives me two
warnings:

warning: 1 line adds whitespace errors.
Warning: commit message did not conform to UTF-8.
You may want to amend it after fixing the message, or set the config
variable i18n.commitEncoding to the encoding your project uses.

> diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
> index df23b7ed31e..c2a9accebf6 100644
> --- a/src/backend/replication/slot.c
> +++ b/src/backend/replication/slot.c
> @@ -1241,8 +1241,58 @@ ReplicationSlotReserveWal(void)
>  }
>  
>  /*
> - * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
> - * and mark it invalid, if necessary and possible.
> + * Report that replication slot needs to be invalidated
> + */
> +static void
> +ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
> +                       bool terminating,
> +                       int pid,
> +                       NameData slotname,
> +                       XLogRecPtr restart_lsn,
> +                       XLogRecPtr oldestLSN,
> +                       TransactionId snapshotConflictHorizon)
> +{
> +    StringInfoData err_detail;
> +    bool        hint = false;
> +
> +    initStringInfo(&err_detail);
> +
> +    switch (cause)
> +    {
> +        case RS_INVAL_WAL:
> +            hint = true;
> +            appendStringInfo(&err_detail, _("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes."),
> +                             LSN_FORMAT_ARGS(restart_lsn),

I'm not sure what the below cast is meant to do. If you are trying to
protect against overflow/underflow, I think you'd need to cast before
doing the subtraction.

> +                             (unsigned long long) (oldestLSN - restart_lsn));
> +            break;
> +        case RS_INVAL_HORIZON:
> +            appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
> +                             snapshotConflictHorizon);
> +            break;
> +
> +        case RS_INVAL_WAL_LEVEL:
> +            appendStringInfo(&err_detail, _("Logical decoding on standby requires wal_level to be at least logical on the primary server"));
> +            break;
> +        case RS_INVAL_NONE:
> +            pg_unreachable();
> +    }
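
On the cast in the RS_INVAL_WAL branch above, a small standalone sketch
(not from the patch) may help frame the question. It assumes only that
XLogRecPtr is an unsigned 64-bit integer; the helper names are made up for
illustration. Casting the finished subtraction changes the type handed to
%llu but cannot undo a wraparound, so guarding against
restart_lsn > oldestLSN would need an explicit comparison:

```c
#include <stdint.h>

/* Stand-in for PostgreSQL's XLogRecPtr (an unsigned 64-bit value). */
typedef uint64_t XLogRecPtr;

/*
 * Same shape as the patch: subtract first, cast afterwards. If restart_lsn
 * is larger than oldestLSN, the unsigned subtraction has already wrapped
 * by the time the cast is applied.
 */
static unsigned long long
lsn_gap_cast_after(XLogRecPtr oldestLSN, XLogRecPtr restart_lsn)
{
    return (unsigned long long) (oldestLSN - restart_lsn);
}

/*
 * Hypothetical guarded variant: compare before subtracting and report a
 * gap of zero when the slot is not actually behind.
 */
static unsigned long long
lsn_gap_checked(XLogRecPtr oldestLSN, XLogRecPtr restart_lsn)
{
    if (restart_lsn >= oldestLSN)
        return 0;
    return (unsigned long long) (oldestLSN - restart_lsn);
}
```

With oldestLSN = 40 and restart_lsn = 100, the first helper returns
2^64 - 60 while the second returns 0.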

This ereport is quite hard to read. Is there any simplification you can
do of the ternaries without undue duplication?

> +    ereport(LOG,
> +            terminating ?
> +            errmsg("terminating process %d to release replication slot \"%s\"",
> +                   pid, NameStr(slotname)) :
> +            errmsg("invalidating obsolete replication slot \"%s\"",
> +                   NameStr(slotname)),
> +            errdetail_internal("%s", err_detail.data),
> +            hint ? errhint("You might need to increase max_slot_wal_keep_size.") : 0);
> +
> +    pfree(err_detail.data);
> +}
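
One possible shape for the ereport above, sketched outside PostgreSQL: pick
the primary message in a plain if/else before the report call, so the call
itself carries no ternary. Here snprintf merely stands in for errmsg(), and
format_invalidation_msg is a hypothetical helper, so translation handling
and the errhint case are ignored:

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/*
 * Write the primary log message into buf. The two branches mirror the two
 * errmsg() calls in the patch; everything else about the report would stay
 * the same regardless of which branch was taken.
 */
static int
format_invalidation_msg(char *buf, size_t buflen, bool terminating,
                        int pid, const char *slotname)
{
    if (terminating)
        return snprintf(buf, buflen,
                        "terminating process %d to release replication slot \"%s\"",
                        pid, slotname);

    return snprintf(buf, buflen,
                    "invalidating obsolete replication slot \"%s\"",
                    slotname);
}
```

Whether this reads better than the ternary in the real function depends on
how errmsg() translation is meant to be handled there, which this sketch
does not address.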
> +
> +/*
> + * Helper for InvalidateObsoleteReplicationSlots
> + *
> + * Acquires the given slot and mark it invalid, if necessary and possible.
>   *
>   * Returns whether ReplicationSlotControlLock was released in the interim (and
>   * in that case we're not holding the lock at return, otherwise we are).
> @@ -1253,7 +1303,10 @@ ReplicationSlotReserveWal(void)
>   * for syscalls, so caller must restart if we return true.
>   */
>  static bool
> -InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> +InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
> +                               ReplicationSlot *s,
> +                               XLogRecPtr oldestLSN,
> +                               Oid dboid, TransactionId snapshotConflictHorizon,
>                                 bool *invalidated)
>  {
>      int            last_signaled_pid = 0;
> @@ -1264,6 +1317,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>          XLogRecPtr    restart_lsn;
>          NameData    slotname;
>          int            active_pid = 0;
> +        ReplicationSlotInvalidationCause conflict = RS_INVAL_NONE;
>  
>          Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
>  
> @@ -1286,10 +1340,45 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>          restart_lsn = s->data.restart_lsn;
>  
>          /*
> -         * If the slot is already invalid or is fresh enough, we don't need to
> -         * do anything.
> +         * If the slot is already invalid or is a non conflicting slot, we
> +         * don't need to do anything.
>           */
> -        if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
> +        if (s->data.invalidated == RS_INVAL_NONE)
> +        {
> +            switch (cause)
> +            {
> +                case RS_INVAL_WAL:
> +                    if (s->data.restart_lsn != InvalidXLogRecPtr &&
> +                        s->data.restart_lsn < oldestLSN)
> +                        conflict = cause;
> +                    break;

Should the below be an error? a physical slot with RS_INVAL_HORIZON
invalidation cause?

> +                case RS_INVAL_HORIZON:
> +                    if (!SlotIsLogical(s))
> +                        break;
> +                    /* invalid DB oid signals a shared relation */
> +                    if (dboid != InvalidOid && dboid != s->data.database)
> +                        break;
> +                    if (TransactionIdIsValid(s->effective_xmin) &&
> +                        TransactionIdPrecedesOrEquals(s->effective_xmin,
> +                                                      snapshotConflictHorizon))
> +                        conflict = cause;
> +                    else if (TransactionIdIsValid(s->effective_catalog_xmin) &&
> +                             TransactionIdPrecedesOrEquals(s->effective_catalog_xmin,
> +                                                           snapshotConflictHorizon))
> +                        conflict = cause;
> +                    break;
> +                case RS_INVAL_WAL_LEVEL:
> +                    if (SlotIsLogical(s))
> +                        conflict = cause;
> +                    break;

Having all three of default, pg_unreachable(), and break seems a bit like
overkill. Perhaps remove the break?

> +                default:
> +                    pg_unreachable();
> +                    break;
> +            }
> +        }
> +

> @@ -1390,14 +1476,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>              ReplicationSlotMarkDirty();
>              ReplicationSlotSave();
>              ReplicationSlotRelease();
> +            pgstat_drop_replslot(s);
>  
> -            ereport(LOG,
> -                    errmsg("invalidating obsolete replication slot \"%s\"",
> -                           NameStr(slotname)),
> -                    errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
> -                              LSN_FORMAT_ARGS(restart_lsn),
> -                              (unsigned long long) (oldestLSN - restart_lsn)),
> -                    errhint("You might need to increase max_slot_wal_keep_size."));
> +            ReportSlotInvalidation(conflict, false, active_pid,
> +                                   slotname, restart_lsn,
> +                                   oldestLSN, snapshotConflictHorizon);
>  
>              /* done with this slot for now */
>              break;
> @@ -1410,19 +1493,33 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>  }
>  
>  /*
> - * Mark any slot that points to an LSN older than the given segment
> - * as invalid; it requires WAL that's about to be removed.
> + * Invalidate slots that require resources about to be removed.
>   *
>   * Returns true when any slot have got invalidated.
>   *
> + * Whether a slot needs to be invalidated depends on the cause. A slot is
> + * removed if it:
> + * - RS_INVAL_WAL: requires a LSN older than the given segment
> + * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon, in the given db
> +     dboid may be InvalidOid for shared relations

The comma above reduces readability.

Is this what you mean?

RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
db; dboid may be InvalidOid for shared relations

> From 311a1d8f9c2d1acf0c22e091d53f7a533073c8b7 Mon Sep 17 00:00:00 2001
> From: Andres Freund <andres@anarazel.de>
> Date: Fri, 7 Apr 2023 09:56:02 -0700
> Subject: [PATCH va67 4/9] Handle logical slot conflicts on standby
> MIME-Version: 1.0
> Content-Type: text/plain; charset=UTF-8
> Content-Transfer-Encoding: 8bit
> 
> During WAL replay on standby, when slot conflict is identified, invalidate
> such slots. Also do the same thing if wal_level on the primary server is
> reduced to below logical and there are existing logical slots on
> standby. Introduce a new ProcSignalReason value for slot conflict recovery.
> 
> Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>
> Author: Andres Freund <andres@anarazel.de>
> Author: Amit Khandekar <amitdkhan.pg@gmail.com> (in an older version)
> Reviewed-by: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>
> Reviewed-by: Andres Freund <andres@anarazel.de>
> Reviewed-by: Robert Haas <robertmhaas@gmail.com>
> Reviewed-by: Fabrízio de Royes Mello <fabriziomello@gmail.com>
> Reviewed-by: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
> Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org>
> Discussion: https://postgr.es/m/20230407075009.igg7be27ha2htkbt@awork3.anarazel.de
> ---
>  src/include/storage/procsignal.h     |  1 +
>  src/include/storage/standby.h        |  2 ++
>  src/backend/access/gist/gistxlog.c   |  2 ++
>  src/backend/access/hash/hash_xlog.c  |  1 +
>  src/backend/access/heap/heapam.c     |  3 +++
>  src/backend/access/nbtree/nbtxlog.c  |  2 ++
>  src/backend/access/spgist/spgxlog.c  |  1 +
>  src/backend/access/transam/xlog.c    | 15 +++++++++++++++
>  src/backend/replication/slot.c       |  8 +++++++-
>  src/backend/storage/ipc/procsignal.c |  3 +++
>  src/backend/storage/ipc/standby.c    | 20 +++++++++++++++++++-
>  src/backend/tcop/postgres.c          |  9 +++++++++
>  12 files changed, 65 insertions(+), 2 deletions(-)
> 
> diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
> index 905af2231ba..2f52100b009 100644
> --- a/src/include/storage/procsignal.h
> +++ b/src/include/storage/procsignal.h
> @@ -42,6 +42,7 @@ typedef enum
>      PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
>      PROCSIG_RECOVERY_CONFLICT_LOCK,
>      PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
> +    PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
>      PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
>      PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
>  
> diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
> index 2effdea126f..41f4dc372e6 100644
> --- a/src/include/storage/standby.h
> +++ b/src/include/storage/standby.h
> @@ -30,8 +30,10 @@ extern void InitRecoveryTransactionEnvironment(void);
>  extern void ShutdownRecoveryTransactionEnvironment(void);
>  
>  extern void ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
> +                                                bool isCatalogRel,
>                                                  RelFileLocator locator);
>  extern void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHorizon,
> +                                                       bool isCatalogRel,
>                                                         RelFileLocator locator);
>  extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
>  extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
> diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
> index b7678f3c144..9a86fb3feff 100644
> --- a/src/backend/access/gist/gistxlog.c
> +++ b/src/backend/access/gist/gistxlog.c
> @@ -197,6 +197,7 @@ gistRedoDeleteRecord(XLogReaderState *record)
>          XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
>  
>          ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
> +                                            xldata->isCatalogRel,
>                                              rlocator);
>      }
>  
> @@ -390,6 +391,7 @@ gistRedoPageReuse(XLogReaderState *record)
>       */
>      if (InHotStandby)
>          ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
> +                                                   xlrec->isCatalogRel,
>                                                     xlrec->locator);
>  }
>  
> diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c
> index f2dd9be8d3f..e8e06c62a95 100644
> --- a/src/backend/access/hash/hash_xlog.c
> +++ b/src/backend/access/hash/hash_xlog.c
> @@ -1003,6 +1003,7 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
>  
>          XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
>          ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
> +                                            xldata->isCatalogRel,
>                                              rlocator);
>      }
>  
> diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
> index 8b13e3f8925..f389ceee1ea 100644
> --- a/src/backend/access/heap/heapam.c
> +++ b/src/backend/access/heap/heapam.c
> @@ -8769,6 +8769,7 @@ heap_xlog_prune(XLogReaderState *record)
>       */
>      if (InHotStandby)
>          ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
> +                                            xlrec->isCatalogRel,
>                                              rlocator);
>  
>      /*
> @@ -8940,6 +8941,7 @@ heap_xlog_visible(XLogReaderState *record)
>       */
>      if (InHotStandby)
>          ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
> +                                            xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL,
>                                              rlocator);
>  
>      /*
> @@ -9061,6 +9063,7 @@ heap_xlog_freeze_page(XLogReaderState *record)
>  
>          XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
>          ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
> +                                            xlrec->isCatalogRel,
>                                              rlocator);
>      }
>  
> diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
> index 414ca4f6deb..c87e46ed66e 100644
> --- a/src/backend/access/nbtree/nbtxlog.c
> +++ b/src/backend/access/nbtree/nbtxlog.c
> @@ -669,6 +669,7 @@ btree_xlog_delete(XLogReaderState *record)
>          XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
>  
>          ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
> +                                            xlrec->isCatalogRel,
>                                              rlocator);
>      }
>  
> @@ -1007,6 +1008,7 @@ btree_xlog_reuse_page(XLogReaderState *record)
>  
>      if (InHotStandby)
>          ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
> +                                                   xlrec->isCatalogRel,
>                                                     xlrec->locator);
>  }
>  
> diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c
> index b071b59c8ac..459ac929ba5 100644
> --- a/src/backend/access/spgist/spgxlog.c
> +++ b/src/backend/access/spgist/spgxlog.c
> @@ -879,6 +879,7 @@ spgRedoVacuumRedirect(XLogReaderState *record)
>  
>          XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
>          ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
> +                                            xldata->isCatalogRel,
>                                              locator);
>      }
>  
> diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
> index 1485e8f9ca9..5227fc675c8 100644
> --- a/src/backend/access/transam/xlog.c
> +++ b/src/backend/access/transam/xlog.c
> @@ -7965,6 +7965,21 @@ xlog_redo(XLogReaderState *record)
>          /* Update our copy of the parameters in pg_control */
>          memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
>  
> +        /*
> +         * Invalidate logical slots if we are in hot standby and the primary
> +         * does not have a WAL level sufficient for logical decoding. No need
> +         * to search for potentially conflicting logically slots if standby is
> +         * running with wal_level lower than logical, because in that case, we
> +         * would have either disallowed creation of logical slots or
> +         * invalidated existing ones.
> +         */
> +        if (InRecovery && InHotStandby &&
> +            xlrec.wal_level < WAL_LEVEL_LOGICAL &&
> +            wal_level >= WAL_LEVEL_LOGICAL)
> +            InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_LEVEL,
> +                                               0, InvalidOid,
> +                                               InvalidTransactionId);
> +
>          LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
>          ControlFile->MaxConnections = xlrec.MaxConnections;
>          ControlFile->max_worker_processes = xlrec.max_worker_processes;
> diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
> index c2a9accebf6..1b1b51e21ed 100644
> --- a/src/backend/replication/slot.c
> +++ b/src/backend/replication/slot.c
> @@ -1443,7 +1443,13 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
>                                         slotname, restart_lsn,
>                                         oldestLSN, snapshotConflictHorizon);
>  
> -                (void) kill(active_pid, SIGTERM);
> +                if (MyBackendType == B_STARTUP)

Is SendProcSignal() marked warn_unused_result or something? I don't see
other callers that ignore its return value casting it to void.

> +                    (void) SendProcSignal(active_pid,
> +                                          PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
> +                                          InvalidBackendId);
> +                else
> +                    (void) kill(active_pid, SIGTERM);
> +
>                  last_signaled_pid = active_pid;
>              }
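
For context on the warn_unused_result question, a minimal sketch using
GCC/Clang attribute syntax; the demo_* names are hypothetical and are not
PostgreSQL functions. PostgreSQL wraps this attribute as pg_nodiscard. A
function declared with it draws a compiler warning when its result is
dropped; for an unmarked function, a (void) cast is purely documentation:

```c
#if defined(__GNUC__) || defined(__clang__)
#define demo_nodiscard __attribute__((warn_unused_result))
#else
#define demo_nodiscard
#endif

/* Marked: callers are expected to look at the result. */
demo_nodiscard static int
demo_marked_send(int pid)
{
    return pid > 0 ? 0 : -1;
}

/* Unmarked: ignoring the result is silent with or without a cast. */
static int
demo_plain_send(int pid)
{
    return pid > 0 ? 0 : -1;
}

static int
demo_caller(int pid)
{
    (void) demo_plain_send(pid);    /* cast only documents intent */
    return demo_marked_send(pid);   /* result is consumed, so no warning */
}
```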

> diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
> index 9f56b4e95cf..3b5d654347e 100644
> --- a/src/backend/storage/ipc/standby.c
> +++ b/src/backend/storage/ipc/standby.c
> @@ -24,6 +24,7 @@
>  #include "access/xlogutils.h"
>  #include "miscadmin.h"
>  #include "pgstat.h"
> +#include "replication/slot.h"
>  #include "storage/bufmgr.h"
>  #include "storage/lmgr.h"
>  #include "storage/proc.h"
> @@ -466,6 +467,7 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
>   */
>  void
>  ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
> +                                    bool isCatalogRel,
>                                      RelFileLocator locator)
>  {
>      VirtualTransactionId *backends;
> @@ -491,6 +493,16 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
>                                             PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
>                                             WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
>                                             true);
> +
> +    /*
> +     * Note that WaitExceedsMaxStandbyDelay() is not taken into account here
> +     * (as opposed to ResolveRecoveryConflictWithVirtualXIDs() above). That
> +     * seems OK, given that this kind of conflict should not normally be

do you mean "when using a physical replication slot"?

> +     * reached, e.g. by using a physical replication slot.
> +     */
> +    if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel)
> +        InvalidateObsoleteReplicationSlots(RS_INVAL_HORIZON, 0, locator.dbOid,
> +                                           snapshotConflictHorizon);
>  }


0005 LGTM

- Melanie


