Re: Minimal logical decoding on standbys - Mailing list pgsql-hackers

From Drouvot, Bertrand
Subject Re: Minimal logical decoding on standbys
Date
Msg-id da7184cf-c7b9-c333-801e-0e7507a23ddf@gmail.com
In response to Re: Minimal logical decoding on standbys  (Andres Freund <andres@anarazel.de>)
Responses Re: Minimal logical decoding on standbys  (Andres Freund <andres@anarazel.de>)
List pgsql-hackers
Hi,

On 3/30/23 9:04 AM, Andres Freund wrote:
> Hi,
> 
> On 2023-03-04 12:19:57 +0100, Drouvot, Bertrand wrote:
>> Subject: [PATCH v52 1/6] Add info in WAL records in preparation for logical
>>   slot conflict handling.
> 
> This is a very nice commit message.

Thanks! Melanie and Robert provided great feedback and input that helped make it
what it is now.
  
> I think this commit is ready to go. Unless somebody thinks differently, I
> think I might push it tomorrow.

Great! Once done, I'll submit a new patch so that GlobalVisTestFor() can make
use of the heap relation in vacuumRedirectAndPlaceholder() (which will be possible
once 0001 is committed).

> 
>> Subject: [PATCH v52 2/6] Handle logical slot conflicts on standby.
> 
> 
>> @@ -6807,7 +6808,8 @@ CreateCheckPoint(int flags)
>>        */
>>       XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
>>       KeepLogSeg(recptr, &_logSegNo);
>> -    if (InvalidateObsoleteReplicationSlots(_logSegNo))
>> +    InvalidateObsoleteReplicationSlots(_logSegNo, &invalidated, InvalidOid, NULL);
>> +    if (invalidated)
>>       {
>>           /*
>>            * Some slots have been invalidated; recalculate the old-segment
> 
> I don't really understand why you changed InvalidateObsoleteReplicationSlots
> to return void instead of bool, and then added an output boolean argument via
> a pointer?
> 
> 

I gave it a second thought and it looks like I overcomplicated that part. I removed the
pointer parameter in V53 attached (and it now returns bool as before).

> 
>> @@ -7964,6 +7968,22 @@ xlog_redo(XLogReaderState *record)
>>           /* Update our copy of the parameters in pg_control */
>>           memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
>>   
>> +        /*
>> +         * Invalidate logical slots if we are in hot standby and the primary does not
>> +         * have a WAL level sufficient for logical decoding. No need to search
>> +         * for potentially conflicting logically slots if standby is running
>> +         * with wal_level lower than logical, because in that case, we would
>> +         * have either disallowed creation of logical slots or invalidated existing
>> +         * ones.
>> +         */
>> +        if (InRecovery && InHotStandby &&
>> +            xlrec.wal_level < WAL_LEVEL_LOGICAL &&
>> +            wal_level >= WAL_LEVEL_LOGICAL)
>> +        {
>> +            TransactionId ConflictHorizon = InvalidTransactionId;
>> +            InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, NULL, InvalidOid, &ConflictHorizon);
>> +        }
>> +
> 
> Are there races around changing wal_level?
> 

Humm, not that I can think of right now. Do you have one/some in mind?

> 
>> @@ -855,8 +855,10 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
>>           SpinLockAcquire(&s->mutex);
>>           effective_xmin = s->effective_xmin;
>>           effective_catalog_xmin = s->effective_catalog_xmin;
>> -        invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
>> -                       XLogRecPtrIsInvalid(s->data.restart_lsn));
>> +        invalidated = ((!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
>> +                        XLogRecPtrIsInvalid(s->data.restart_lsn))
>> +                       || (!TransactionIdIsValid(s->data.xmin) &&
>> +                           !TransactionIdIsValid(s->data.catalog_xmin)));
>>           SpinLockRelease(&s->mutex);
>>   
>>           /* invalidated slots need not apply */
> 
> I still would like a wrapper function to determine whether a slot has been
> invalidated. This is too complicated to be repeated in other places.
> 
> 

Agreed, so I added ObsoleteSlotIsInvalid() and SlotIsInvalid() in V53 attached.

ObsoleteSlotIsInvalid() could also be done in a dedicated patch outside this patch series, though.


>> @@ -1224,20 +1226,21 @@ ReplicationSlotReserveWal(void)
>>   }
>>   
>>   /*
>> - * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
>> - * and mark it invalid, if necessary and possible.
>> + * Helper for InvalidateObsoleteReplicationSlots
>> + *
>> + * Acquires the given slot and mark it invalid, if necessary and possible.
>>    *
>>    * Returns whether ReplicationSlotControlLock was released in the interim (and
>>    * in that case we're not holding the lock at return, otherwise we are).
>>    *
>> - * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
>> + * Sets *invalidated true if an obsolete slot was invalidated. (Untouched otherwise.)
>>    *
>>    * This is inherently racy, because we release the LWLock
>>    * for syscalls, so caller must restart if we return true.
>>    */
>>   static bool
>> -InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>> -                               bool *invalidated)
>> +InvalidatePossiblyObsoleteOrConflictingLogicalSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>> +                                                   bool *invalidated, TransactionId *xid)
> 
> This is too long a name. I'd probably just leave it at the old name.
> 
> 

Done in V53 attached.

> 
>> @@ -1261,18 +1267,33 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>>            * Check if the slot needs to be invalidated. If it needs to be
>>            * invalidated, and is not currently acquired, acquire it and mark it
>>            * as having been invalidated.  We do this with the spinlock held to
>> -         * avoid race conditions -- for example the restart_lsn could move
>> -         * forward, or the slot could be dropped.
>> +         * avoid race conditions -- for example the restart_lsn (or the
>> +         * xmin(s) could) move forward or the slot could be dropped.
>>            */
>>           SpinLockAcquire(&s->mutex);
>>   
>>           restart_lsn = s->data.restart_lsn;
>> +        slot_xmin = s->data.xmin;
>> +        slot_catalog_xmin = s->data.catalog_xmin;
>> +
>> +        /* slot has been invalidated (logical decoding conflict case) */
>> +        if ((xid &&
>> +             ((LogicalReplicationSlotIsInvalid(s))
>> +              ||
>>   
> 
> Uh, huh?
> 
> That's very odd formatting.
> 
>>           /*
>> -         * If the slot is already invalid or is fresh enough, we don't need to
>> -         * do anything.
>> +         * We are not forcing for invalidation because the xid is valid and
>> +         * this is a non conflicting slot.
>>            */
>> -        if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
>> +              (TransactionIdIsValid(*xid) && !(
>> +                                               (TransactionIdIsValid(slot_xmin) && TransactionIdPrecedesOrEquals(slot_xmin,*xid))
>> +                                               ||
>> +                                               (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin,*xid))
>> +                                               ))
>> +              ))
>> +            ||
>> +        /* slot has been invalidated (obsolete LSN case) */
>> +            (!xid && (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)))
>>           {
>>               SpinLockRelease(&s->mutex);
>>               if (released_lock)
> 
> 
> This needs some cleanup.

Added a new macro LogicalReplicationSlotXidsConflict() and reformatted a bit.
Also ran pgindent on it, hope it's cleaner now.
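
As a rough sketch of the condition such a macro factors out: the standalone C below restates the xmin / catalog_xmin test from the quoted hunk as a function. The function names and the simplified comparison helper are assumptions for illustration. The real code would use TransactionIdIsValid() and TransactionIdPrecedesOrEquals() from the PostgreSQL headers, and the actual macro body in V53 may differ.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define InvalidTransactionId      ((TransactionId) 0)
#define FirstNormalTransactionId  ((TransactionId) 3)

static bool
xid_is_valid(TransactionId xid)
{
    return xid != InvalidTransactionId;
}

/*
 * Simplified stand-in for TransactionIdPrecedesOrEquals(): normal xids are
 * compared modulo 2^32 so that wraparound is handled, special xids are
 * compared as plain unsigned values.
 */
static bool
xid_precedes_or_equals(TransactionId a, TransactionId b)
{
    if (a < FirstNormalTransactionId || b < FirstNormalTransactionId)
        return a <= b;
    return (int32_t) (a - b) <= 0;
}

/*
 * A slot conflicts with the given horizon if either of its xmins is valid
 * and not newer than that horizon.
 */
static bool
slot_xids_conflict(TransactionId slot_xmin, TransactionId slot_catalog_xmin,
                   TransactionId horizon)
{
    assert(xid_is_valid(horizon));
    return (xid_is_valid(slot_xmin) &&
            xid_precedes_or_equals(slot_xmin, horizon)) ||
           (xid_is_valid(slot_catalog_xmin) &&
            xid_precedes_or_equals(slot_catalog_xmin, horizon));
}
```

Naming the condition keeps the "does not need invalidation" test in InvalidatePossiblyObsoleteSlot() to a single negated call, which is easier to read than the nested expression in the hunk.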

> 
> 
>> @@ -1292,9 +1313,16 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>>           {
>>               MyReplicationSlot = s;
>>               s->active_pid = MyProcPid;
>> -            s->data.invalidated_at = restart_lsn;
>> -            s->data.restart_lsn = InvalidXLogRecPtr;
>> -
>> +            if (xid)
>> +            {
>> +                s->data.xmin = InvalidTransactionId;
>> +                s->data.catalog_xmin = InvalidTransactionId;
>> +            }
>> +            else
>> +            {
>> +                s->data.invalidated_at = restart_lsn;
>> +                s->data.restart_lsn = InvalidXLogRecPtr;
>> +            }
>>               /* Let caller know */
>>               *invalidated = true;
>>           }
>> @@ -1327,15 +1355,39 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>>                */
>>               if (last_signaled_pid != active_pid)
>>               {
>> -                ereport(LOG,
>> -                        errmsg("terminating process %d to release replication slot \"%s\"",
>> -                               active_pid, NameStr(slotname)),
>> -                        errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
>> -                                  LSN_FORMAT_ARGS(restart_lsn),
>> -                                  (unsigned long long) (oldestLSN - restart_lsn)),
>> -                        errhint("You might need to increase max_slot_wal_keep_size."));
>> +                if (xid)
>> +                {
>> +                    if (TransactionIdIsValid(*xid))
>> +                    {
>> +                        ereport(LOG,
>> +                                errmsg("terminating process %d because replication slot \"%s\" conflicts with recovery",
>> +                                       active_pid, NameStr(slotname)),
>> +                                errdetail("The slot conflicted with xid horizon %u.",
>> +                                          *xid));
>> +                    }
>> +                    else
>> +                    {
>> +                        ereport(LOG,
>> +                                errmsg("terminating process %d because replication slot \"%s\" conflicts with recovery",
>> +                                       active_pid, NameStr(slotname)),
>> +                                errdetail("Logical decoding on standby requires wal_level to be at least logical on the primary server"));
>> +                    }
>> +
>> +                    (void) SendProcSignal(active_pid, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, InvalidBackendId);
>> +                }
>> +                else
>> +                {
>> +                    ereport(LOG,
>> +                            errmsg("terminating process %d to release replication slot \"%s\"",
>> +                                   active_pid, NameStr(slotname)),
>> +                            errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
>> +                                      LSN_FORMAT_ARGS(restart_lsn),
>> +                                      (unsigned long long) (oldestLSN - restart_lsn)),
>> +                            errhint("You might need to increase max_slot_wal_keep_size."));
>> +
>> +                    (void) kill(active_pid, SIGTERM);
> 
> I think it ought be possible to deduplicate this a fair bit. For one, two of
> the errmsg()s above are identical.  But I think this could be consolidated
> further, e.g. by using the same message style for the three cases, and passing
> in a separately translated reason for the termination?
> 

Deduplication is done in V53, so that there is a single ereport() call.
I'm not sure the translation handling is right the way I did it; please advise if it is not.
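
One possible shape for this kind of consolidation, shown as a standalone sketch rather than the V53 code: the cause-specific detail text is built first, and a single reporting call then uses it. The InvalidationCause enum, both function names and the fprintf-based output are assumptions. A real patch would go through ereport() and errdetail_internal() or similar, and translation handling is left out here.

```c
#include <assert.h>
#include <inttypes.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical cause codes; the names are not taken from the patch. */
typedef enum
{
    CAUSE_WAL_REMOVED,   /* restart_lsn fell behind the retention limit */
    CAUSE_XID_HORIZON,   /* slot xmins conflict with a replayed horizon */
    CAUSE_WAL_LEVEL      /* primary's wal_level dropped below logical */
} InvalidationCause;

/* Write the cause-specific detail into buf; returns snprintf()'s result. */
static int
format_invalidation_detail(char *buf, size_t buflen, InvalidationCause cause,
                           uint64_t restart_lsn, uint64_t oldest_lsn,
                           uint32_t horizon)
{
    assert(buf != NULL && buflen > 0);

    switch (cause)
    {
        case CAUSE_WAL_REMOVED:
            return snprintf(buf, buflen,
                            "The slot's restart_lsn %X/%X exceeds the limit by %" PRIu64 " bytes.",
                            (unsigned int) (restart_lsn >> 32),
                            (unsigned int) (restart_lsn & 0xFFFFFFFFu),
                            oldest_lsn - restart_lsn);
        case CAUSE_XID_HORIZON:
            return snprintf(buf, buflen,
                            "The slot conflicted with xid horizon %" PRIu32 ".",
                            horizon);
        case CAUSE_WAL_LEVEL:
            return snprintf(buf, buflen,
                            "Logical decoding on standby requires wal_level "
                            "to be at least logical on the primary server.");
    }
    return -1;
}

/* Single reporting site shared by all three causes. */
static void
report_invalidation(FILE *out, const char *slotname, InvalidationCause cause,
                    uint64_t restart_lsn, uint64_t oldest_lsn, uint32_t horizon)
{
    char detail[256];

    format_invalidation_detail(detail, sizeof(detail), cause,
                               restart_lsn, oldest_lsn, horizon);
    fprintf(out, "LOG:  invalidating replication slot \"%s\"\nDETAIL:  %s\n",
            slotname, detail);
}
```

The same detail builder could serve both the "terminating process" and the "invalidating slot" messages, which is where the quoted hunks repeat themselves.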

> 
>> +                }
>>   
>> -                (void) kill(active_pid, SIGTERM);
>>                   last_signaled_pid = active_pid;
>>               }
>>   
>> @@ -1369,13 +1421,33 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>>               ReplicationSlotSave();
>>               ReplicationSlotRelease();
>>   
>> -            ereport(LOG,
>> -                    errmsg("invalidating obsolete replication slot \"%s\"",
>> -                           NameStr(slotname)),
>> -                    errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
>> -                              LSN_FORMAT_ARGS(restart_lsn),
>> -                              (unsigned long long) (oldestLSN - restart_lsn)),
>> -                    errhint("You might need to increase max_slot_wal_keep_size."));
>> +            if (xid)
>> +            {
>> +                pgstat_drop_replslot(s);
> 
> Why is this done here now?
> 
> 

Oops, moved above the if() in V53.

>> +                if (TransactionIdIsValid(*xid))
>> +                {
>> +                    ereport(LOG,
>> +                            errmsg("invalidating slot \"%s\" because it conflicts with recovery", NameStr(slotname)),
>> +                            errdetail("The slot conflicted with xid horizon %u.", *xid));
>> +                }
>> +                else
>> +                {
>> +                    ereport(LOG,
>> +                            errmsg("invalidating slot \"%s\" because it conflicts with recovery", NameStr(slotname)),
>> +                            errdetail("Logical decoding on standby requires wal_level to be at least logical on the primary server"));
>> +                }
>> +            }
>> +            else
>> +            {
>> +                ereport(LOG,
>> +                        errmsg("invalidating obsolete replication slot \"%s\"",
>> +                               NameStr(slotname)),
>> +                        errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
>> +                                  LSN_FORMAT_ARGS(restart_lsn),
>> +                                  (unsigned long long) (oldestLSN - restart_lsn)),
>> +                        errhint("You might need to increase max_slot_wal_keep_size."));
>> +            }
>>
> 
> I don't like all these repeated elogs...

Deduplication is done in V53, so that there is a single ereport() call.
I'm not sure the translation handling is right the way I did it; please advise if it is not.

> 
> 
> 
>> @@ -3057,6 +3060,27 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
>>               case PROCSIG_RECOVERY_CONFLICT_LOCK:
>>               case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
>>               case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
>> +            case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
>> +
>> +                /*
>> +                 * For conflicts that require a logical slot to be
>> +                 * invalidated, the requirement is for the signal receiver to
>> +                 * release the slot, so that it could be invalidated by the
>> +                 * signal sender. So for normal backends, the transaction
>> +                 * should be aborted, just like for other recovery conflicts.
>> +                 * But if it's walsender on standby, we don't want to go
>> +                 * through the following IsTransactionOrTransactionBlock()
>> +                 * check, so break here.
>> +                 */
>> +                if (am_cascading_walsender &&
>> +                    reason == PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT &&
>> +                    MyReplicationSlot && SlotIsLogical(MyReplicationSlot))
>> +                {
>> +                    RecoveryConflictPending = true;
>> +                    QueryCancelPending = true;
>> +                    InterruptPending = true;
>> +                    break;
>> +                }
>>   
>>                   /*
>>                    * If we aren't in a transaction any longer then ignore.
> 
> I can't see any reason for this to be mixed into the same case "body" as LOCK
> etc?
> 

Oh right, nice catch. I don't know how it ended up done that way. Fixed in V53.

> 
>> diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
>> index 38c6f18886..290d4b45f4 100644
>> --- a/src/backend/replication/slot.c
>> +++ b/src/backend/replication/slot.c
>> @@ -51,6 +51,7 @@
>>   #include "storage/proc.h"
>>   #include "storage/procarray.h"
>>   #include "utils/builtins.h"
>> +#include "access/xlogrecovery.h"
> 
> Add new includes in the "alphabetically" right place...

Fixed in 0003 in V53, and in the other sub-patches where it was needed.

Regards,

-- 
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com