Re: Minimal logical decoding on standbys - Mailing list pgsql-hackers
From | Melanie Plageman |
---|---|
Subject | Re: Minimal logical decoding on standbys |
Date | |
Msg-id | 20230407223204.vj5t6ligooo6xa6e@liskov Whole thread Raw |
In response to | Re: Minimal logical decoding on standbys (Andres Freund <andres@anarazel.de>) |
Responses |
Re: Minimal logical decoding on standbys
|
List | pgsql-hackers |
Code review only of 0001-0005. I noticed you had two 0008, btw. On Fri, Apr 07, 2023 at 11:12:26AM -0700, Andres Freund wrote: > Hi, > > On 2023-04-07 08:47:57 -0700, Andres Freund wrote: > > Integrated all of these. > > From 0e038eb5dfddec500fbf4625775d1fa508a208f6 Mon Sep 17 00:00:00 2001 > From: Andres Freund <andres@anarazel.de> > Date: Thu, 6 Apr 2023 20:00:07 -0700 > Subject: [PATCH va67 1/9] Replace a replication slot's invalidated_at LSN with > an enum > > diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h > index 8872c80cdfe..ebcb637baed 100644 > --- a/src/include/replication/slot.h > +++ b/src/include/replication/slot.h > @@ -37,6 +37,17 @@ typedef enum ReplicationSlotPersistency > RS_TEMPORARY > } ReplicationSlotPersistency; > > +/* > + * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the > + * 'invalidated' field is set to a value other than _NONE. > + */ > +typedef enum ReplicationSlotInvalidationCause > +{ > + RS_INVAL_NONE, > + /* required WAL has been removed */ I just wonder if RS_INVAL_WAL is too generic. Something like RS_INVAL_WAL_MISSING or similar may be better since it seems there are other invalidation causes that may be related to WAL. > + RS_INVAL_WAL, > +} ReplicationSlotInvalidationCause; > + 0002 LGTM > From 52c25cc15abc4470d19e305d245b9362e6b8d6a3 Mon Sep 17 00:00:00 2001 > From: Andres Freund <andres@anarazel.de> > Date: Fri, 7 Apr 2023 09:32:48 -0700 > Subject: [PATCH va67 3/9] Support invalidating replication slots due to > horizon and wal_level > MIME-Version: 1.0 > Content-Type: text/plain; charset=UTF-8 > Content-Transfer-Encoding: 8bit > > Needed for supporting logical decoding on a standby. The new invalidation > methods will be used in a subsequent commit. > You probably are aware, but applying 0003 and 0004 both gives me two warnings: warning: 1 line adds whitespace errors. Warning: commit message did not conform to UTF-8. 
You may want to amend it after fixing the message, or set the config variable i18n.commitEncoding to the encoding your project uses. > diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c > index df23b7ed31e..c2a9accebf6 100644 > --- a/src/backend/replication/slot.c > +++ b/src/backend/replication/slot.c > @@ -1241,8 +1241,58 @@ ReplicationSlotReserveWal(void) > } > > /* > - * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot > - * and mark it invalid, if necessary and possible. > + * Report that replication slot needs to be invalidated > + */ > +static void > +ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, > + bool terminating, > + int pid, > + NameData slotname, > + XLogRecPtr restart_lsn, > + XLogRecPtr oldestLSN, > + TransactionId snapshotConflictHorizon) > +{ > + StringInfoData err_detail; > + bool hint = false; > + > + initStringInfo(&err_detail); > + > + switch (cause) > + { > + case RS_INVAL_WAL: > + hint = true; > + appendStringInfo(&err_detail, _("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes."), > + LSN_FORMAT_ARGS(restart_lsn), I'm not sure what the below cast is meant to do. If you are trying to protect against overflow/underflow, I think you'd need to cast before doing the subtraction. > + (unsigned long long) (oldestLSN - restart_lsn)); > + break; > + case RS_INVAL_HORIZON: > + appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."), > + snapshotConflictHorizon); > + break; > + > + case RS_INVAL_WAL_LEVEL: > + appendStringInfo(&err_detail, _("Logical decoding on standby requires wal_level to be at least logical onthe primary server")); > + break; > + case RS_INVAL_NONE: > + pg_unreachable(); > + } This ereport is quite hard to read. Is there any simplification you can do of the ternaries without undue duplication? > + ereport(LOG, > + terminating ? 
> + errmsg("terminating process %d to release replication slot \"%s\"", > + pid, NameStr(slotname)) : > + errmsg("invalidating obsolete replication slot \"%s\"", > + NameStr(slotname)), > + errdetail_internal("%s", err_detail.data), > + hint ? errhint("You might need to increase max_slot_wal_keep_size.") : 0); > + > + pfree(err_detail.data); > +} > + > +/* > + * Helper for InvalidateObsoleteReplicationSlots > + * > + * Acquires the given slot and mark it invalid, if necessary and possible. > * > * Returns whether ReplicationSlotControlLock was released in the interim (and > * in that case we're not holding the lock at return, otherwise we are). > @@ -1253,7 +1303,10 @@ ReplicationSlotReserveWal(void) > * for syscalls, so caller must restart if we return true. > */ > static bool > -InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, > +InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, > + ReplicationSlot *s, > + XLogRecPtr oldestLSN, > + Oid dboid, TransactionId snapshotConflictHorizon, > bool *invalidated) > { > int last_signaled_pid = 0; > @@ -1264,6 +1317,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, > XLogRecPtr restart_lsn; > NameData slotname; > int active_pid = 0; > + ReplicationSlotInvalidationCause conflict = RS_INVAL_NONE; > > Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED)); > > @@ -1286,10 +1340,45 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, > restart_lsn = s->data.restart_lsn; > > /* > - * If the slot is already invalid or is fresh enough, we don't need to > - * do anything. > + * If the slot is already invalid or is a non conflicting slot, we > + * don't need to do anything. 
> */ > - if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN) > + if (s->data.invalidated == RS_INVAL_NONE) > + { > + switch (cause) > + { > + case RS_INVAL_WAL: > + if (s->data.restart_lsn != InvalidXLogRecPtr && > + s->data.restart_lsn < oldestLSN) > + conflict = cause; > + break; Should the below be an error? a physical slot with RS_INVAL_HORIZON invalidation cause? > + case RS_INVAL_HORIZON: > + if (!SlotIsLogical(s)) > + break; > + /* invalid DB oid signals a shared relation */ > + if (dboid != InvalidOid && dboid != s->data.database) > + break; > + if (TransactionIdIsValid(s->effective_xmin) && > + TransactionIdPrecedesOrEquals(s->effective_xmin, > + snapshotConflictHorizon)) > + conflict = cause; > + else if (TransactionIdIsValid(s->effective_catalog_xmin) && > + TransactionIdPrecedesOrEquals(s->effective_catalog_xmin, > + snapshotConflictHorizon)) > + conflict = cause; > + break; > + case RS_INVAL_WAL_LEVEL: > + if (SlotIsLogical(s)) > + conflict = cause; > + break; All three of default, pg_unreachable(), and break seems a bit like overkill. Perhaps remove the break? 
> + default: > + pg_unreachable(); > + break; > + } > + } > + > @@ -1390,14 +1476,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, > ReplicationSlotMarkDirty(); > ReplicationSlotSave(); > ReplicationSlotRelease(); > + pgstat_drop_replslot(s); > > - ereport(LOG, > - errmsg("invalidating obsolete replication slot \"%s\"", > - NameStr(slotname)), > - errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.", > - LSN_FORMAT_ARGS(restart_lsn), > - (unsigned long long) (oldestLSN - restart_lsn)), > - errhint("You might need to increase max_slot_wal_keep_size.")); > + ReportSlotInvalidation(conflict, false, active_pid, > + slotname, restart_lsn, > + oldestLSN, snapshotConflictHorizon); > > /* done with this slot for now */ > break; > @@ -1410,19 +1493,33 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, > } > > /* > - * Mark any slot that points to an LSN older than the given segment > - * as invalid; it requires WAL that's about to be removed. > + * Invalidate slots that require resources about to be removed. > * > * Returns true when any slot have got invalidated. > * > + * Whether a slot needs to be invalidated depends on the cause. A slot is > + * removed if it: > + * - RS_INVAL_WAL: requires a LSN older than the given segment > + * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon, in the given db > + dboid may be InvalidOid for shared relations the comma above reduces readability is this what you mean? 
RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given db; dboid may be InvalidOid for shared relations > From 311a1d8f9c2d1acf0c22e091d53f7a533073c8b7 Mon Sep 17 00:00:00 2001 > From: Andres Freund <andres@anarazel.de> > Date: Fri, 7 Apr 2023 09:56:02 -0700 > Subject: [PATCH va67 4/9] Handle logical slot conflicts on standby > MIME-Version: 1.0 > Content-Type: text/plain; charset=UTF-8 > Content-Transfer-Encoding: 8bit > > During WAL replay on standby, when slot conflict is identified, invalidate > such slots. Also do the same thing if wal_level on the primary server is > reduced to below logical and there are existing logical slots on > standby. Introduce a new ProcSignalReason value for slot conflict recovery. > > Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com> > Author: Andres Freund <andres@anarazel.de> > Author: Amit Khandekar <amitdkhan.pg@gmail.com> (in an older version) > Reviewed-by: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com> > Reviewed-by: Andres Freund <andres@anarazel.de> > Reviewed-by: Robert Haas <robertmhaas@gmail.com> > Reviewed-by: Fabrízio de Royes Mello <fabriziomello@gmail.com> > Reviewed-by: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> > Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> > Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org> > Discussion: https://postgr.es/m/20230407075009.igg7be27ha2htkbt@awork3.anarazel.de > --- > src/include/storage/procsignal.h | 1 + > src/include/storage/standby.h | 2 ++ > src/backend/access/gist/gistxlog.c | 2 ++ > src/backend/access/hash/hash_xlog.c | 1 + > src/backend/access/heap/heapam.c | 3 +++ > src/backend/access/nbtree/nbtxlog.c | 2 ++ > src/backend/access/spgist/spgxlog.c | 1 + > src/backend/access/transam/xlog.c | 15 +++++++++++++++ > src/backend/replication/slot.c | 8 +++++++- > src/backend/storage/ipc/procsignal.c | 3 +++ > src/backend/storage/ipc/standby.c | 20 +++++++++++++++++++- > src/backend/tcop/postgres.c | 9 +++++++++ > 12 files 
changed, 65 insertions(+), 2 deletions(-) > > diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h > index 905af2231ba..2f52100b009 100644 > --- a/src/include/storage/procsignal.h > +++ b/src/include/storage/procsignal.h > @@ -42,6 +42,7 @@ typedef enum > PROCSIG_RECOVERY_CONFLICT_TABLESPACE, > PROCSIG_RECOVERY_CONFLICT_LOCK, > PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, > + PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, > PROCSIG_RECOVERY_CONFLICT_BUFFERPIN, > PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK, > > diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h > index 2effdea126f..41f4dc372e6 100644 > --- a/src/include/storage/standby.h > +++ b/src/include/storage/standby.h > @@ -30,8 +30,10 @@ extern void InitRecoveryTransactionEnvironment(void); > extern void ShutdownRecoveryTransactionEnvironment(void); > > extern void ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, > + bool isCatalogRel, > RelFileLocator locator); > extern void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHorizon, > + bool isCatalogRel, > RelFileLocator locator); > extern void ResolveRecoveryConflictWithTablespace(Oid tsid); > extern void ResolveRecoveryConflictWithDatabase(Oid dbid); > diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c > index b7678f3c144..9a86fb3feff 100644 > --- a/src/backend/access/gist/gistxlog.c > +++ b/src/backend/access/gist/gistxlog.c > @@ -197,6 +197,7 @@ gistRedoDeleteRecord(XLogReaderState *record) > XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL); > > ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon, > + xldata->isCatalogRel, > rlocator); > } > > @@ -390,6 +391,7 @@ gistRedoPageReuse(XLogReaderState *record) > */ > if (InHotStandby) > ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon, > + xlrec->isCatalogRel, > xlrec->locator); > } > > diff --git a/src/backend/access/hash/hash_xlog.c 
b/src/backend/access/hash/hash_xlog.c > index f2dd9be8d3f..e8e06c62a95 100644 > --- a/src/backend/access/hash/hash_xlog.c > +++ b/src/backend/access/hash/hash_xlog.c > @@ -1003,6 +1003,7 @@ hash_xlog_vacuum_one_page(XLogReaderState *record) > > XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL); > ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon, > + xldata->isCatalogRel, > rlocator); > } > > diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c > index 8b13e3f8925..f389ceee1ea 100644 > --- a/src/backend/access/heap/heapam.c > +++ b/src/backend/access/heap/heapam.c > @@ -8769,6 +8769,7 @@ heap_xlog_prune(XLogReaderState *record) > */ > if (InHotStandby) > ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon, > + xlrec->isCatalogRel, > rlocator); > > /* > @@ -8940,6 +8941,7 @@ heap_xlog_visible(XLogReaderState *record) > */ > if (InHotStandby) > ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon, > + xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL, > rlocator); > > /* > @@ -9061,6 +9063,7 @@ heap_xlog_freeze_page(XLogReaderState *record) > > XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL); > ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon, > + xlrec->isCatalogRel, > rlocator); > } > > diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c > index 414ca4f6deb..c87e46ed66e 100644 > --- a/src/backend/access/nbtree/nbtxlog.c > +++ b/src/backend/access/nbtree/nbtxlog.c > @@ -669,6 +669,7 @@ btree_xlog_delete(XLogReaderState *record) > XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL); > > ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon, > + xlrec->isCatalogRel, > rlocator); > } > > @@ -1007,6 +1008,7 @@ btree_xlog_reuse_page(XLogReaderState *record) > > if (InHotStandby) > ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon, > + xlrec->isCatalogRel, > xlrec->locator); > } > > diff --git 
a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c > index b071b59c8ac..459ac929ba5 100644 > --- a/src/backend/access/spgist/spgxlog.c > +++ b/src/backend/access/spgist/spgxlog.c > @@ -879,6 +879,7 @@ spgRedoVacuumRedirect(XLogReaderState *record) > > XLogRecGetBlockTag(record, 0, &locator, NULL, NULL); > ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon, > + xldata->isCatalogRel, > locator); > } > > diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c > index 1485e8f9ca9..5227fc675c8 100644 > --- a/src/backend/access/transam/xlog.c > +++ b/src/backend/access/transam/xlog.c > @@ -7965,6 +7965,21 @@ xlog_redo(XLogReaderState *record) > /* Update our copy of the parameters in pg_control */ > memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change)); > > + /* > + * Invalidate logical slots if we are in hot standby and the primary > + * does not have a WAL level sufficient for logical decoding. No need > + * to search for potentially conflicting logically slots if standby is > + * running with wal_level lower than logical, because in that case, we > + * would have either disallowed creation of logical slots or > + * invalidated existing ones. 
> + */ > + if (InRecovery && InHotStandby && > + xlrec.wal_level < WAL_LEVEL_LOGICAL && > + wal_level >= WAL_LEVEL_LOGICAL) > + InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_LEVEL, > + 0, InvalidOid, > + InvalidTransactionId); > + > LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); > ControlFile->MaxConnections = xlrec.MaxConnections; > ControlFile->max_worker_processes = xlrec.max_worker_processes; > diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c > index c2a9accebf6..1b1b51e21ed 100644 > --- a/src/backend/replication/slot.c > +++ b/src/backend/replication/slot.c > @@ -1443,7 +1443,13 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, > slotname, restart_lsn, > oldestLSN, snapshotConflictHorizon); > > - (void) kill(active_pid, SIGTERM); > + if (MyBackendType == B_STARTUP) Is SendProcSignal() marked warn_unused_result or something? I don't see other callers who don't use its return value void casting it. > + (void) SendProcSignal(active_pid, > + PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, > + InvalidBackendId); > + else > + (void) kill(active_pid, SIGTERM); > + > last_signaled_pid = active_pid; > } > diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c > index 9f56b4e95cf..3b5d654347e 100644 > --- a/src/backend/storage/ipc/standby.c > +++ b/src/backend/storage/ipc/standby.c > @@ -24,6 +24,7 @@ > #include "access/xlogutils.h" > #include "miscadmin.h" > #include "pgstat.h" > +#include "replication/slot.h" > #include "storage/bufmgr.h" > #include "storage/lmgr.h" > #include "storage/proc.h" > @@ -466,6 +467,7 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist, > */ > void > ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, > + bool isCatalogRel, > RelFileLocator locator) > { > VirtualTransactionId *backends; > @@ -491,6 +493,16 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, > PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, > 
WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT, > true); > + > + /* > + * Note that WaitExceedsMaxStandbyDelay() is not taken into account here > + * (as opposed to ResolveRecoveryConflictWithVirtualXIDs() above). That > + * seems OK, given that this kind of conflict should not normally be do you mean "when using a physical replication slot"? > + * reached, e.g. by using a physical replication slot. > + */ > + if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel) > + InvalidateObsoleteReplicationSlots(RS_INVAL_HORIZON, 0, locator.dbOid, > + snapshotConflictHorizon); > } 0005 LGTM - Melanie
pgsql-hackers by date: