Re: Minimal logical decoding on standbys - Mailing list pgsql-hackers
From | Drouvot, Bertrand |
---|---|
Subject | Re: Minimal logical decoding on standbys |
Date | |
Msg-id | b51f01b6-7ff4-fc5d-8068-6d94f9558235@gmail.com |
In response to | Re: Minimal logical decoding on standbys (Andres Freund <andres@anarazel.de>) |
Responses | Re: Minimal logical decoding on standbys |
List | pgsql-hackers |
Hi,

On 4/5/23 8:28 PM, Andres Freund wrote:
> Hi,
>
> On 2023-04-05 17:56:14 +0200, Drouvot, Bertrand wrote:
>
>> @@ -7963,6 +7963,23 @@ xlog_redo(XLogReaderState *record)
>> /* Update our copy of the parameters in pg_control */
>> memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
>>
>> + /*
>> + * Invalidate logical slots if we are in hot standby and the primary
>> + * does not have a WAL level sufficient for logical decoding. No need
>> + * to search for potentially conflicting logically slots if standby is
>> + * running with wal_level lower than logical, because in that case, we
>> + * would have either disallowed creation of logical slots or
>> + * invalidated existing ones.
>> + */
>> + if (InRecovery && InHotStandby &&
>> + xlrec.wal_level < WAL_LEVEL_LOGICAL &&
>> + wal_level >= WAL_LEVEL_LOGICAL)
>> + {
>> + TransactionId ConflictHorizon = InvalidTransactionId;
>> +
>> + InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, InvalidOid, &ConflictHorizon);
>> + }
>
> I mentioned this before,

Sorry, I probably missed it.

> but I still don't understand why
> InvalidateObsoleteReplicationSlots() accepts ConflictHorizon as a
> pointer. It's not even modified, as far as I can see?
>

The initial goal was to be able to check whether the xid pointer was NULL and also whether *xid was a valid xid or not, so basically to do three checks with the same parameter. That's how we decided whether or not we are in the "wal_level < logical on the primary" conflict case in ReportTerminationInvalidation().

I agree that passing a pointer is not the best approach (as there is a "risk" of modifying the value it points to), so the attached V62 adds an extra bool to InvalidateObsoleteReplicationSlots() instead.

V62 also replaces InvalidXLogRecPtr with 0, as it sounds odd to use the "InvalidXLogRecPtr" name for an XLogSegNo.

>
>> /*
>> * Report shared-memory space needed by ReplicationSlotsShmemInit.
>> */
>> @@ -855,8 +862,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
>> SpinLockAcquire(&s->mutex);
>> effective_xmin = s->effective_xmin;
>> effective_catalog_xmin = s->effective_catalog_xmin;
>> - invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
>> - XLogRecPtrIsInvalid(s->data.restart_lsn));
>> + invalidated = ObsoleteSlotIsInvalid(s, true) || LogicalReplicationSlotIsInvalid(s);
>> SpinLockRelease(&s->mutex);
>
> I don't understand why we need to have two different functions for this.
>

LogicalReplicationSlotIsInvalid() was created to provide an error message different from ".....because it exceeded the maximum reserved size" in StartLogicalReplication() and "This slot has never previously reserved WAL" in pg_logical_slot_get_changes_guts(). So basically it distinguishes them from the max_slot_wal_keep_size-related messages.

>
>> /* invalidated slots need not apply */
>> @@ -1225,28 +1231,92 @@ ReplicationSlotReserveWal(void)
>> }
>> }
>>
>> +
>> +/*
>> + * Report terminating or conflicting message.
>> + *
>> + * For both, logical conflict on standby and obsolete slot are handled.
>> + */
>> +static void
>> +ReportTerminationInvalidation(bool terminating, bool check_on_xid, int pid,
>> + NameData slotname, TransactionId *xid,
>> + XLogRecPtr restart_lsn, XLogRecPtr oldestLSN)
>> +{
>> + StringInfoData err_msg;
>> + StringInfoData err_detail;
>> + bool hint = false;
>> +
>> + initStringInfo(&err_detail);
>> +
>> + if (check_on_xid)
>> + {
>> + if (!terminating)
>> + {
>> + initStringInfo(&err_msg);
>> + appendStringInfo(&err_msg, _("invalidating replication slot \"%s\" because it conflicts with recovery"),
>> + NameStr(slotname));
>
> I still don't think the main error message should differ between invalidating
> a slot due recovery and max_slot_wal_keep_size.

Okay. I gave it a second thought and I agree that "obsolete" also makes sense for the xid conflict case. So, done that way in V62.
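Going back to the ConflictHorizon pointer: it encoded three states (NULL, pointing to an invalid xid, pointing to a valid xid), and that is what selected the message in ReportTerminationInvalidation(). The following is a minimal, self-contained sketch, not the V62 code, of how a by-value xid plus an explicit bool keeps the same three cases apart. The typedefs are simplified stand-ins for PostgreSQL's, and InvalidationCause and classify_invalidation() are hypothetical names.

```c
#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-ins for PostgreSQL's typedefs and macros. */
typedef uint32_t TransactionId;
#define InvalidTransactionId ((TransactionId) 0)
#define TransactionIdIsValid(xid) ((xid) != InvalidTransactionId)

/* Hypothetical: the three reasons a slot may be invalidated. */
typedef enum InvalidationCause
{
    CAUSE_WAL_REMOVED,          /* max_slot_wal_keep_size exceeded */
    CAUSE_XID_CONFLICT,         /* required catalog rows were removed */
    CAUSE_WAL_LEVEL_TOO_LOW     /* primary's wal_level < logical */
} InvalidationCause;

/*
 * Hypothetical helper. The pointer-based API distinguished xid == NULL,
 * *xid invalid and *xid valid; here an explicit flag plus a by-value xid
 * carries the same information, and the callee cannot modify the
 * caller's variable.
 */
static InvalidationCause
classify_invalidation(bool check_on_xid, TransactionId xid)
{
    if (!check_on_xid)
        return CAUSE_WAL_REMOVED;
    if (TransactionIdIsValid(xid))
        return CAUSE_XID_CONFLICT;
    return CAUSE_WAL_LEVEL_TOO_LOW;
}
```

Under this sketch, the xlog_redo() caller quoted earlier would pass true with InvalidTransactionId, which selects the wal_level case. The max_slot_wal_keep_size path would pass false.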
>
>> +
>> /*
>> - * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
>> - * and mark it invalid, if necessary and possible.
>> + * Helper for InvalidateObsoleteReplicationSlots
>> + *
>> + * Acquires the given slot and mark it invalid, if necessary and possible.
>> *
>> * Returns whether ReplicationSlotControlLock was released in the interim (and
>> * in that case we're not holding the lock at return, otherwise we are).
>> *
>> - * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
>> + * Sets *invalidated true if an obsolete slot was invalidated. (Untouched otherwise.)
>
> What's the point of making this specific to "obsolete slots"?

There is none. It is probably a leftover from a previous version/experiment. Removed in V62, thanks!

>
>
>> * This is inherently racy, because we release the LWLock
>> * for syscalls, so caller must restart if we return true.
>> */
>> static bool
>> InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>> - bool *invalidated)
>> + bool *invalidated, TransactionId *xid)
>> {
>> int last_signaled_pid = 0;
>> bool released_lock = false;
>> + bool check_on_xid;
>> +
>> + check_on_xid = xid ? true : false;
>>
>> for (;;)
>> {
>> XLogRecPtr restart_lsn;
>> +
>> NameData slotname;
>> int active_pid = 0;
>>
>> @@ -1263,19 +1333,20 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>> * Check if the slot needs to be invalidated. If it needs to be
>> * invalidated, and is not currently acquired, acquire it and mark it
>> * as having been invalidated. We do this with the spinlock held to
>> - * avoid race conditions -- for example the restart_lsn could move
>> - * forward, or the slot could be dropped.
>> + * avoid race conditions -- for example the restart_lsn (or the
>> + * xmin(s) could) move forward or the slot could be dropped.
>> */
>> SpinLockAcquire(&s->mutex);
>>
>> restart_lsn = s->data.restart_lsn;
>>
>> /*
>> - * If the slot is already invalid or is fresh enough, we don't need to
>> - * do anything.
>> + * If the slot is already invalid or is a non conflicting slot, we
>> + * don't need to do anything.
>> */
>> - if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
>> + if (DoNotInvalidateSlot(s, xid, &oldestLSN))
>
> DoNotInvalidateSlot() seems odd to me, and makes the code harder to
> understand. I'd make it something like:
>
> if (!SlotIsInvalid(s) && (
> LogicalSlotConflictsWith(s, xid) ||
> SlotConflictsWithLSN(s, lsn)))
>

I think that's a matter of taste (having a single function was suggested by Amit up-thread). I prefer having a single function, as it seems easier to me to see whether we want to check on the xid or not.

>
>> /*
>> - * Mark any slot that points to an LSN older than the given segment
>> - * as invalid; it requires WAL that's about to be removed.
>> + * Invalidate Obsolete slots or resolve recovery conflicts with logical slots.
>
> I don't like that this spreads "obsolete slots" around further - it's very
> unspecific. A logical slot that needs to be removed due to an xid conflict is
> just as obsolete as one that needs to be removed due to max_slot_wal_keep_size.
>
> I'd rephrase this to be about required resources getting removed or such, one
> case of that is WAL another case is xids.
>

Agree. Re-worded in V62.
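For comparison, here is a minimal sketch of the positive-form condition suggested above. It passes xid and oldestLSN by value and uses fewer parentheses in the xmin check, while keeping a check_on_xid branch as the single-function approach does (rather than OR-ing both conflicts together). SlotSketch and SlotNeedsInvalidation() are hypothetical. LogicalSlotConflictsWith() and SlotConflictsWithLSN() borrow their names from the suggestion, but their bodies here are an assumption. TransactionIdPrecedesOrEquals() follows the wraparound-aware logic of PostgreSQL's transam.c, with simplified typedefs.

```c
#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-ins for PostgreSQL's typedefs and macros. */
typedef uint64_t XLogRecPtr;
typedef uint32_t TransactionId;

#define InvalidXLogRecPtr ((XLogRecPtr) 0)
#define XLogRecPtrIsInvalid(r) ((r) == InvalidXLogRecPtr)
#define InvalidTransactionId ((TransactionId) 0)
#define FirstNormalTransactionId ((TransactionId) 3)
#define TransactionIdIsValid(x) ((x) != InvalidTransactionId)
#define TransactionIdIsNormal(x) ((x) >= FirstNormalTransactionId)

static bool
TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
{
    /* Permanent (non-normal) xids compare by plain value. */
    if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))
        return id1 <= id2;
    /* Otherwise compare modulo 2^32, so wraparound is handled. */
    return (int32_t) (id1 - id2) <= 0;
}

/* Hypothetical, trimmed-down slot with only the fields the checks read. */
typedef struct SlotSketch
{
    bool          invalidated;      /* hypothetical "already invalid" flag */
    XLogRecPtr    restart_lsn;
    TransactionId effective_xmin;
    TransactionId catalog_xmin;
} SlotSketch;

/* A horizon conflicts if it is set and not newer than the conflict xid. */
static bool
XminConflicts(TransactionId slot_xmin, TransactionId xid)
{
    return TransactionIdIsValid(slot_xmin) &&
        TransactionIdPrecedesOrEquals(slot_xmin, xid);
}

static bool
LogicalSlotConflictsWith(const SlotSketch *s, TransactionId xid)
{
    return XminConflicts(s->effective_xmin, xid) ||
        XminConflicts(s->catalog_xmin, xid);
}

/* The slot still needs WAL older than oldestLSN, which is being removed. */
static bool
SlotConflictsWithLSN(const SlotSketch *s, XLogRecPtr oldestLSN)
{
    return !XLogRecPtrIsInvalid(s->restart_lsn) && s->restart_lsn < oldestLSN;
}

/* Positive form of the check, with everything passed by value. */
static bool
SlotNeedsInvalidation(const SlotSketch *s, bool check_on_xid,
                      TransactionId xid, XLogRecPtr oldestLSN)
{
    if (s->invalidated)
        return false;
    if (check_on_xid)
        return !TransactionIdIsValid(xid) || LogicalSlotConflictsWith(s, xid);
    return SlotConflictsWithLSN(s, oldestLSN);
}
```

In this sketch, an invalid xid conflicts with every slot that reaches the check. That is the negation of the quoted LogicalSlotIsNotConflicting() and corresponds to the wal_level case in xlog_redo().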
>> restart:
>> LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
>> @@ -1414,21 +1505,35 @@ restart:
>> if (!s->in_use)
>> continue;
>>
>> - if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
>> + if (xid)
>> {
>> - /* if the lock was released, start from scratch */
>> - goto restart;
>> + /* we are only dealing with *logical* slot conflicts */
>> + if (!SlotIsLogical(s))
>> + continue;
>> +
>> + /*
>> + * not the database of interest and we don't want all the
>> + * database, skip
>> + */
>> + if (s->data.database != dboid && TransactionIdIsValid(*xid))
>> + continue;
>
> ISTM that this should be in InvalidatePossiblyObsoleteSlot().
>

Agree, done in V62.

>
>> /*
>> - * If any slots have been invalidated, recalculate the resource limits.
>> + * If any slots have been invalidated, recalculate the required xmin and
>> + * the required lsn (if appropriate).
>> */
>> if (invalidated)
>> {
>> ReplicationSlotsComputeRequiredXmin(false);
>> - ReplicationSlotsComputeRequiredLSN();
>> + if (!xid)
>> + ReplicationSlotsComputeRequiredLSN();
>> }
>
> Why make this conditional? If we invalidated a logical slot, we also don't
> require as much WAL anymore, no?
>

Agree, done in V62.

>
>> @@ -491,6 +493,9 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
>> PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
>> WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
>> true);
>> +
>> + if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel)
>> + InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, locator.dbOid, &snapshotConflictHorizon);
>> }
>
> Hm. Is there a reason for doing this before resolving conflicts with existing
> sessions?
>

Do you mean you'd prefer to call InvalidateObsoleteReplicationSlots() before ResolveRecoveryConflictWithVirtualXIDs()?

>
> Another issue: ResolveRecoveryConflictWithVirtualXIDs() takes
> WaitExceedsMaxStandbyDelay() into account, but
> InvalidateObsoleteReplicationSlots() does not.

Hmm, good point.
> I think that's ok, because the
> setup should prevent this case from being reached in normal paths, but at
> least there should be a comment documenting this.
>

I started to add the comment in InvalidateObsoleteReplicationSlots(), but I'm not sure what you mean by "the setup should prevent this case from being reached in normal paths" (so I left "XXXX" in the comment for now). Did you mean that hsf (hot_standby_feedback) and a physical slot between the primary and the standby should be in place? Could you please elaborate?

>
>
>> +static inline bool
>> +LogicalReplicationSlotXidsConflict(ReplicationSlot *s, TransactionId xid)
>> +{
>> + TransactionId slot_effective_xmin;
>> + TransactionId slot_catalog_xmin;
>> +
>> + slot_effective_xmin = s->effective_xmin;
>> + slot_catalog_xmin = s->data.catalog_xmin;
>> +
>> + return (((TransactionIdIsValid(slot_effective_xmin) && TransactionIdPrecedesOrEquals(slot_effective_xmin, xid)) ||
>> + (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid))));
>> +}
>
> return -ETOOMANYPARENS
>

Gave it a try to make it better in V62.

>
>> +static inline bool
>> +SlotIsFreshEnough(ReplicationSlot *s, XLogRecPtr oldestLSN)
>> +{
>> + return (s->data.restart_lsn >= oldestLSN);
>> +}
>> +
>> +static inline bool
>> +LogicalSlotIsNotConflicting(ReplicationSlot *s, TransactionId *xid)
>> +{
>> + return (TransactionIdIsValid(*xid) && !LogicalReplicationSlotXidsConflict(s, *xid));
>> +}
>> +
>> +static inline bool
>> +DoNotInvalidateSlot(ReplicationSlot *s, TransactionId *xid, XLogRecPtr *oldestLSN)
>> +{
>> + if (xid)
>> + return (LogicalReplicationSlotIsInvalid(s) || LogicalSlotIsNotConflicting(s, xid));
>> + else
>> + return (ObsoleteSlotIsInvalid(s, false) || SlotIsFreshEnough(s, *oldestLSN));
>> +
>> +}
>
> See above for some more comments. But please don't accept stuff via pointer if
> you don't have a reason for it. There's no reason for it for xid and oldestLSN
> afaict.

Agree that there is no reason for oldestLSN.
Changing in V62. As for the xid, I explained above why I used a pointer; V62 removes the need for it with the extra bool mentioned above.

>
>
>> diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
>> index dbe9394762..186e4ef600 100644
>> --- a/src/backend/access/transam/xlogrecovery.c
>> +++ b/src/backend/access/transam/xlogrecovery.c
>> @@ -1935,6 +1935,30 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
>> XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
>> SpinLockRelease(&XLogRecoveryCtl->info_lck);
>>
>> + /*
>> + * Wakeup walsenders:
>> + *
>> + * On the standby, the WAL is flushed first (which will only wake up
>> + * physical walsenders) and then applied, which will only wake up logical
>> + * walsenders.
>> + * Indeed, logical walsenders on standby can't decode and send data until
>> + * it's been applied.
>> + *
>> + * Physical walsenders don't need to be waked up during replay unless
>
> s/waked/woken/

Thanks, fixed.

>> + * cascading replication is allowed and time line change occured (so that
>> + * they can notice that they are on a new time line).
>> + *
>> + * That's why the wake up conditions are for:
>> + *
>> + * - physical walsenders in case of new time line and cascade
>> + * replication is allowed.
>> + * - logical walsenders in case of new time line or recovery is in progress
>> + * (logical decoding on standby).
>> + */
>> + WalSndWakeup(switchedTLI && AllowCascadeReplication(),
>> + switchedTLI || RecoveryInProgress());
>
> I don't think it's possible to get here without RecoveryInProgress() being
> true. So we don't need that condition.

Right, so V62 uses "true" instead, as we don't want to rely only on a time line change for a logical walsender.
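The resulting apply-side wakeup decision can be modeled in isolation. This hypothetical sketch (WakeupTargets and apply_wakeup_targets() are not PostgreSQL names) computes the two arguments that ApplyWalRecord() would pass to WalSndWakeup() under the change described above. Logical walsenders are always woken, because recovery is necessarily in progress at that point. Physical walsenders are woken only on a time line switch, and only when cascading replication is allowed.

```c
#include <stdbool.h>

/* Hypothetical pair of flags mirroring WalSndWakeup(physical, logical). */
typedef struct WakeupTargets
{
    bool physical;
    bool logical;
} WakeupTargets;

/*
 * Hypothetical model of the decision after a record has been applied on a
 * standby. RecoveryInProgress() is always true here, so the logical flag
 * no longer depends on it or on switched_tli.
 */
static WakeupTargets
apply_wakeup_targets(bool switched_tli, bool allow_cascade)
{
    WakeupTargets t;

    t.physical = switched_tli && allow_cascade;
    t.logical = true;
    return t;
}
```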
>
>
>> @@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
>> /* Signal the startup process and walsender that new WAL has arrived */
>> WakeupRecovery();
>> if (AllowCascadeReplication())
>> - WalSndWakeup();
>> + WalSndWakeup(true, !RecoveryInProgress());
>
> Same comment as earlier.

Done.

>
>
>> /* Report XLOG streaming progress in PS display */
>> if (update_process_title)
>> diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
>> index 2d908d1de2..5c68ebb79e 100644
>> --- a/src/backend/replication/walsender.c
>> +++ b/src/backend/replication/walsender.c
>> @@ -2628,6 +2628,23 @@ InitWalSenderSlot(void)
>> walsnd->sync_standby_priority = 0;
>> walsnd->latch = &MyProc->procLatch;
>> walsnd->replyTime = 0;
>> +
>> + /*
>> + * The kind assignment is done here and not in StartReplication()
>> + * and StartLogicalReplication(). Indeed, the logical walsender
>> + * needs to read WAL records (like snapshot of running
>> + * transactions) during the slot creation. So it needs to be woken
>> + * up based on its kind.
>> + *
>> + * The kind assignment could also be done in StartReplication(),
>> + * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
>> + * seems better to set it on one place.
>> + */
>
> Doesn't that mean we'll wake up logical walsenders even if they're doing
> normal query processing?
>

I'm not following what you mean here.
>
>> + if (MyDatabaseId == InvalidOid)
>> + walsnd->kind = REPLICATION_KIND_PHYSICAL;
>> + else
>> + walsnd->kind = REPLICATION_KIND_LOGICAL;
>> +
>> SpinLockRelease(&walsnd->mutex);
>> /* don't need the lock anymore */
>> MyWalSnd = (WalSnd *) walsnd;
>> @@ -3310,30 +3327,39 @@ WalSndShmemInit(void)
>> }
>>
>> /*
>> - * Wake up all walsenders
>> + * Wake up physical, logical or both walsenders kind
>> + *
>> + * The distinction between physical and logical walsenders is done, because:
>> + * - physical walsenders can't send data until it's been flushed
>> + * - logical walsenders on standby can't decode and send data until it's been
>> + * applied
>> + *
>> + * For cascading replication we need to wake up physical
>> + * walsenders separately from logical walsenders (see the comment before calling
>> + * WalSndWakeup() in ApplyWalRecord() for more details).
>> *
>> * This will be called inside critical sections, so throwing an error is not
>> * advisable.
>> */
>> void
>> -WalSndWakeup(void)
>> +WalSndWakeup(bool physical, bool logical)
>> {
>> int i;
>>
>> for (i = 0; i < max_wal_senders; i++)
>> {
>> Latch *latch;
>> + ReplicationKind kind;
>> WalSnd *walsnd = &WalSndCtl->walsnds[i];
>>
>> - /*
>> - * Get latch pointer with spinlock held, for the unlikely case that
>> - * pointer reads aren't atomic (as they're 8 bytes).
>> - */
>> + /* get latch pointer and kind with spinlock helds */
>> SpinLockAcquire(&walsnd->mutex);
>> latch = walsnd->latch;
>> + kind = walsnd->kind;
>> SpinLockRelease(&walsnd->mutex);
>>
>> - if (latch != NULL)
>> + if (latch != NULL && ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
>> + (logical && kind == REPLICATION_KIND_LOGICAL)))
>> SetLatch(latch);
>> }
>> }
>
> I'd consider rewriting this to something like:
>
> if (latch == NULL)
> continue;
>
> if ((physical && kind == REPLICATION_KIND_PHYSICAL)) ||
> (logical && kind == REPLICATION_KIND_LOGICAL)
> SetLatch(latch)
>

Yeah, better. Done.
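The suggested loop shape and the kind assignment from InitWalSenderSlot() can be combined into a minimal single-process sketch. It is not the V62 code. The structs are hypothetical stand-ins, there is no spinlock, and setting a flag replaces SetLatch(). The parentheses in the quoted suggestion are unbalanced, so the sketch uses a balanced version of the same condition.

```c
#include <stdbool.h>
#include <stddef.h>

/* Simplified stand-ins; the real structs carry much more state. */
typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)

typedef enum ReplicationKind
{
    REPLICATION_KIND_PHYSICAL,
    REPLICATION_KIND_LOGICAL
} ReplicationKind;

typedef struct LatchSketch
{
    bool is_set;
} LatchSketch;

typedef struct WalSndSketch
{
    LatchSketch    *latch;      /* NULL when the walsender slot is unused */
    ReplicationKind kind;
} WalSndSketch;

/* Mirrors the InitWalSenderSlot() hunk: no database means physical. */
static ReplicationKind
walsender_kind(Oid database_id)
{
    return database_id == InvalidOid ? REPLICATION_KIND_PHYSICAL
                                     : REPLICATION_KIND_LOGICAL;
}

/*
 * Suggested shape, with balanced parentheses. The real code reads latch
 * and kind while holding walsnd->mutex and then calls SetLatch().
 */
static void
wal_snd_wakeup_sketch(WalSndSketch *walsnds, int n, bool physical, bool logical)
{
    for (int i = 0; i < n; i++)
    {
        LatchSketch    *latch = walsnds[i].latch;
        ReplicationKind kind = walsnds[i].kind;

        if (latch == NULL)
            continue;

        if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
            (logical && kind == REPLICATION_KIND_LOGICAL))
            latch->is_set = true;   /* stand-in for SetLatch(latch) */
    }
}
```

The early continue skips an unused slot (NULL latch) before its kind is looked at. That keeps the wakeup condition itself to the two kind checks.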
Regards,

--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
Attachment
- v62-0006-Doc-changes-describing-details-about-logical-dec.patch
- v62-0005-New-TAP-test-for-logical-decoding-on-standby.patch
- v62-0004-For-cascading-replication-wake-up-physical-walse.patch
- v62-0003-Allow-logical-decoding-on-standby.patch
- v62-0002-Arrange-for-a-new-pg_stat_database_conflicts-and.patch
- v62-0001-Handle-logical-slot-conflicts-on-standby.patch