Re: Minimal logical decoding on standbys - Mailing list pgsql-hackers

From Drouvot, Bertrand
Subject Re: Minimal logical decoding on standbys
Msg-id 86dccc77-0722-63b9-4c83-fb6928691387@gmail.com
In response to Re: Minimal logical decoding on standbys  (Andres Freund <andres@anarazel.de>)
Responses Re: Minimal logical decoding on standbys
Re: Minimal logical decoding on standbys
List pgsql-hackers
Hi,

On 4/2/23 10:10 PM, Andres Freund wrote:
> Hi,
> 
> Btw, most of the patches have some things that pgindent will change (and some
> that my editor will highlight). It wouldn't hurt to run pgindent for the later
> patches...

Done.

> 
> Pushed the WAL format change.
> 

Thanks!

> 
> On 2023-04-02 10:27:45 +0200, Drouvot, Bertrand wrote:
>>     5.3% doc/src/sgml/
>>     6.2% src/backend/access/transam/
>>     4.6% src/backend/replication/logical/
>>    55.6% src/backend/replication/
>>     4.4% src/backend/storage/ipc/
>>     6.9% src/backend/tcop/
>>     5.3% src/backend/
>>     3.8% src/include/catalog/
>>     5.3% src/include/replication/
> 
> I think it might be worth trying to split this up a bit.
> 

Okay, I've split it into two parts in the attached V56.

One part handles logical slot conflicts on standby. The other adds a new field to pg_stat_database_conflicts and to pg_replication_slots.

> 
>>           restart_lsn = s->data.restart_lsn;
>> -
>> -        /*
>> -         * If the slot is already invalid or is fresh enough, we don't need to
>> -         * do anything.
>> -         */
>> -        if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
>> +        slot_xmin = s->data.xmin;
>> +        slot_catalog_xmin = s->data.catalog_xmin;
>> +
>> +        /* the slot has been invalidated (logical decoding conflict case) */
>> +        if ((xid && ((LogicalReplicationSlotIsInvalid(s)) ||
>> +        /* or the xid is valid and this is a non conflicting slot */
>> +                     (TransactionIdIsValid(*xid) && !(LogicalReplicationSlotXidsConflict(slot_xmin, slot_catalog_xmin, *xid))))) ||
>> +        /* or the slot has been invalidated (obsolete LSN case) */
>> +            (!xid && (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)))
>>           {
> 
> This still looks nearly unreadable. I suggest moving comments outside of the
> if (), remove redundant parentheses, use a function to detect if the slot has
> been invalidated.
> 

I simplified it to:

         /*
          * If the slot is already invalid or is a non-conflicting slot, we don't
          * need to do anything.
          */
         islogical = xid ? true : false;

         if (SlotIsInvalid(s, islogical) || SlotIsNotConflicting(s, islogical, xid, &oldestLSN))

in the attached V56.
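For anyone reading without the patch applied, here is a standalone sketch of what the two helpers boil down to. Everything in it is an assumption: the trimmed-down SlotData struct and the xid_conflicts() helper are hypothetical, the bodies follow the condition quoted above rather than the actual V56 code, and xids are compared with a plain <= for brevity (PostgreSQL's TransactionIdPrecedesOrEquals() is wraparound-aware).

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-in types; in PostgreSQL these come from the core headers. */
typedef uint32_t TransactionId;
typedef uint64_t XLogRecPtr;

#define InvalidTransactionId ((TransactionId) 0)
#define InvalidXLogRecPtr    ((XLogRecPtr) 0)

/* Hypothetical, trimmed-down slot holding only the fields the check reads. */
typedef struct SlotData
{
    TransactionId xmin;
    TransactionId catalog_xmin;
    XLogRecPtr    restart_lsn;
} SlotData;

/* Already invalidated: xmins cleared (logical case) or restart_lsn cleared (LSN case). */
static inline bool
SlotIsInvalid(const SlotData *s, bool islogical)
{
    if (islogical)
        return s->xmin == InvalidTransactionId &&
               s->catalog_xmin == InvalidTransactionId;
    return s->restart_lsn == InvalidXLogRecPtr;
}

/* Hypothetical helper; plain <= ignores xid wraparound (simplification). */
static inline bool
xid_conflicts(TransactionId slot_xid, TransactionId horizon)
{
    return slot_xid != InvalidTransactionId && slot_xid <= horizon;
}

static inline bool
SlotIsNotConflicting(const SlotData *s, bool islogical,
                     const TransactionId *xid, const XLogRecPtr *oldestLSN)
{
    if (islogical)
    {
        /* An invalid horizon (wal_level lowered on the primary) conflicts with every logical slot. */
        return *xid != InvalidTransactionId &&
               !xid_conflicts(s->xmin, *xid) &&
               !xid_conflicts(s->catalog_xmin, *xid);
    }
    /* LSN case: the slot is still fresh enough. */
    return s->restart_lsn >= *oldestLSN;
}

/* True when the slot needs no action; xid == NULL selects the LSN rules. */
static bool
slot_needs_no_action(const SlotData *s, const TransactionId *xid,
                     XLogRecPtr oldestLSN)
{
    bool islogical = (xid != NULL);

    return SlotIsInvalid(s, islogical) ||
           SlotIsNotConflicting(s, islogical, xid, &oldestLSN);
}
```

With xid == NULL only the LSN rules apply; a non-NULL xid pointing at InvalidTransactionId makes every still-valid logical slot conflict, which is the wal_level case.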

> 
>> @@ -1329,16 +1345,45 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>>                */
>>               if (last_signaled_pid != active_pid)
>>               {
>> +                bool        send_signal = false;
>> +
>> +                initStringInfo(&err_msg);
>> +                initStringInfo(&err_detail);
>> +
>> +                appendStringInfo(&err_msg, "terminating process %d to release replication slot \"%s\"",
>> +                                 active_pid,
>> +                                 NameStr(slotname));
> 
> For this to be translatable you need to use _("message").

Thanks!

> 
> 
>> +                if (xid)
>> +                {
>> +                    appendStringInfo(&err_msg, " because it conflicts with recovery");
>> +                    send_signal = true;
>> +
>> +                    if (TransactionIdIsValid(*xid))
>> +                        appendStringInfo(&err_detail, "The slot conflicted with xid horizon %u.", *xid);
>> +                    else
>> +                        appendStringInfo(&err_detail, "Logical decoding on standby requires wal_level to be at least logical on the primary server");
>> +                }
>> +                else
>> +                {
>> +                    appendStringInfo(&err_detail, "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
>> +                                     LSN_FORMAT_ARGS(restart_lsn),
>> +                                     (unsigned long long) (oldestLSN - restart_lsn));
>> +                }
>> +
>>                   ereport(LOG,
>> -                        errmsg("terminating process %d to release replication slot \"%s\"",
>> -                               active_pid, NameStr(slotname)),
>> -                        errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
>> -                                  LSN_FORMAT_ARGS(restart_lsn),
>> -                                  (unsigned long long) (oldestLSN - restart_lsn)),
>> -                        errhint("You might need to increase max_slot_wal_keep_size."));
>> -
>> -                (void) kill(active_pid, SIGTERM);
>> +                        errmsg("%s", err_msg.data),
>> +                        errdetail("%s", err_detail.data),
>> +                        send_signal ? 0 : errhint("You might need to increase max_slot_wal_keep_size."));
>> +
>> +                if (send_signal)
>> +                    (void) SendProcSignal(active_pid, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, InvalidBackendId);
>> +                else
>> +                    (void) kill(active_pid, SIGTERM);
>> +
>>                   last_signaled_pid = active_pid;
>> +
>> +                pfree(err_msg.data);
>> +                pfree(err_detail.data);
>>               }
>>   
>>               /* Wait until the slot is released. */
>> @@ -1355,6 +1400,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>>           }
>>           else
>>           {
>> +            bool        hint = false;;
>> +
>> +            initStringInfo(&err_msg);
>> +            initStringInfo(&err_detail);
>> +
>>               /*
>>                * We hold the slot now and have already invalidated it; flush it
>>                * to ensure that state persists.
>> @@ -1370,14 +1420,37 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>>               ReplicationSlotMarkDirty();
>>               ReplicationSlotSave();
>>               ReplicationSlotRelease();
>> +            pgstat_drop_replslot(s);
>> +
>> +            appendStringInfo(&err_msg, "invalidating");
>> +
>> +            if (xid)
>> +            {
>> +                if (TransactionIdIsValid(*xid))
>> +                    appendStringInfo(&err_detail, "The slot conflicted with xid horizon %u.", *xid);
>> +                else
>> +                    appendStringInfo(&err_detail, "Logical decoding on standby requires wal_level to be at least logical on the primary server");
>> +            }
>> +            else
> 
> These are nearly the same messages as above. This is too much code to duplicate
> between terminating and invalidating. Put this into a helper or such.
> 

I added ReportTerminationInvalidation() in V56 for this purpose.
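Roughly, the shape of such a helper, as a standalone sketch rather than the V56 code: the SlotReport struct and build_slot_report() are hypothetical, snprintf() stands in for ereport() and StringInfo, and the "invalidating obsolete replication slot" wording is an assumption because the quoted patch is cut off after "invalidating". Each branch picks a complete format string instead of appending fragments, which is what makes wrapping the messages in _() for translation workable.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint32_t TransactionId;
typedef uint64_t XLogRecPtr;

#define InvalidTransactionId ((TransactionId) 0)

/* Hypothetical report record; the real helper would call ereport(LOG, ...). */
typedef struct SlotReport
{
    char msg[256];
    char detail[256];
    bool hint;          /* suggest raising max_slot_wal_keep_size? */
} SlotReport;

/*
 * Hypothetical helper shared by the "terminate" and "invalidate" paths.
 * xid == NULL means the LSN case; *xid == InvalidTransactionId means the
 * primary's wal_level was lowered.
 */
static void
build_slot_report(SlotReport *r, bool terminating, int pid,
                  const char *slotname, const TransactionId *xid,
                  XLogRecPtr restart_lsn, XLogRecPtr oldestLSN)
{
    memset(r, 0, sizeof(*r));

    if (!terminating)
        snprintf(r->msg, sizeof(r->msg),
                 "invalidating obsolete replication slot \"%s\"", slotname);
    else if (xid != NULL)
        snprintf(r->msg, sizeof(r->msg),
                 "terminating process %d to release replication slot \"%s\" because it conflicts with recovery",
                 pid, slotname);
    else
        snprintf(r->msg, sizeof(r->msg),
                 "terminating process %d to release replication slot \"%s\"",
                 pid, slotname);

    if (xid == NULL)
        snprintf(r->detail, sizeof(r->detail),
                 "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
                 (unsigned int) (restart_lsn >> 32), (unsigned int) restart_lsn,
                 (unsigned long long) (oldestLSN - restart_lsn));
    else if (*xid != InvalidTransactionId)
        snprintf(r->detail, sizeof(r->detail),
                 "The slot conflicted with xid horizon %u.", (unsigned int) *xid);
    else
        snprintf(r->detail, sizeof(r->detail),
                 "Logical decoding on standby requires wal_level to be at least logical on the primary server.");

    /* Only the LSN case is helped by retaining more WAL. */
    r->hint = (xid == NULL);
}
```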

> 
>> @@ -3099,6 +3102,31 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
>>                   /* Intentional fall through to session cancel */
>>                   /* FALLTHROUGH */
>>   
>> +            case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
> 
> The case: above is explicitly falling through. This makes no sense here as far
> as I can tell.

The PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT case contains an if (reason == PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT) check, so the case falling through into it does not take the logical-slot path. That seems OK to me.

Or are you saying that you'd prefer to see the PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT case somewhere else?
If so, where?
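To make the control flow concrete, here is a reduced, hypothetical version of the switch (simplified reason and action enums, not the real RecoveryConflictInterrupt()): the case above falls through into the LOGICALSLOT case, and the explicit reason test keeps the slot-specific branch from running for it.

```c
#include <stdbool.h>

/* Hypothetical subset of conflict reasons, loosely named after PostgreSQL's. */
typedef enum
{
    CONFLICT_BUFFERPIN,
    CONFLICT_LOGICALSLOT,
    CONFLICT_DATABASE
} ConflictReason;

/* Hypothetical outcomes of handling the interrupt. */
typedef enum
{
    ACTION_RELEASE_SLOT,    /* logical-slot-specific handling */
    ACTION_CANCEL_SESSION   /* common path reached via fall-through */
} ConflictAction;

static ConflictAction
handle_conflict(ConflictReason reason, bool holds_logical_slot)
{
    switch (reason)
    {
        case CONFLICT_BUFFERPIN:
            /* reason-specific checks would go here */
            /* FALLTHROUGH */

        case CONFLICT_LOGICALSLOT:
            /*
             * Reached both directly and by falling through from above; the
             * explicit reason test keeps CONFLICT_BUFFERPIN out of this branch.
             */
            if (reason == CONFLICT_LOGICALSLOT && holds_logical_slot)
                return ACTION_RELEASE_SLOT;
            /* FALLTHROUGH */

        case CONFLICT_DATABASE:
            return ACTION_CANCEL_SESSION;
    }
    /* Not reached for valid enum values; keeps the compiler satisfied. */
    return ACTION_CANCEL_SESSION;
}
```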

> I thought you did change this in response to my last comment
> about it?
> 

Yes.


>> index 8872c80cdf..013cd2b4d0 100644
>> --- a/src/include/replication/slot.h
>> +++ b/src/include/replication/slot.h
>> @@ -17,6 +17,17 @@
>>   #include "storage/spin.h"
>>   #include "replication/walreceiver.h"
>>   
>> +#define ObsoleteSlotIsInvalid(s) (!XLogRecPtrIsInvalid(s->data.invalidated_at) && \
>> +                                  XLogRecPtrIsInvalid(s->data.restart_lsn))
>> +
>> +#define LogicalReplicationSlotIsInvalid(s) (!TransactionIdIsValid(s->data.xmin) && \
>> +                                            !TransactionIdIsValid(s->data.catalog_xmin))
>> +
>> +#define SlotIsInvalid(s) (ObsoleteSlotIsInvalid(s) || LogicalReplicationSlotIsInvalid (s))
>> +
>> +#define LogicalReplicationSlotXidsConflict(slot_xmin, catalog_xmin, xid) \
>> +        ((TransactionIdIsValid(slot_xmin) && TransactionIdPrecedesOrEquals(slot_xmin, xid)) || \
>> +        (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid)))
> 
> Can you make these static inlines instead?
> 
> 

Done.
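For reference, a hedged sketch of what the static inline versions could look like, using stand-in typedefs and a trimmed-down, hypothetical ReplicationSlot so it compiles on its own. One side benefit: the quoted LogicalReplicationSlotXidsConflict macro takes catalog_xmin but its body uses slot_catalog_xmin, so it only works because the caller's local variable happens to have that name; as a function, every identifier has to be a parameter. TransactionIdPrecedesOrEquals() below mirrors PostgreSQL's modulo-2^32 comparison for normal xids, but is reimplemented here rather than taken from the tree.

```c
#include <stdbool.h>
#include <stdint.h>

/* Stand-in types and constants, matching PostgreSQL's values. */
typedef uint32_t TransactionId;
typedef uint64_t XLogRecPtr;

#define InvalidTransactionId     ((TransactionId) 0)
#define FirstNormalTransactionId ((TransactionId) 3)
#define InvalidXLogRecPtr        ((XLogRecPtr) 0)

/* Hypothetical, trimmed-down slot: only the fields the helpers read. */
typedef struct SlotPersistentData
{
    TransactionId xmin;
    TransactionId catalog_xmin;
    XLogRecPtr    restart_lsn;
    XLogRecPtr    invalidated_at;
} SlotPersistentData;

typedef struct ReplicationSlot
{
    SlotPersistentData data;
} ReplicationSlot;

static inline bool
TransactionIdIsValid(TransactionId xid)
{
    return xid != InvalidTransactionId;
}

/* Permanent xids compare numerically; normal xids compare modulo 2^32. */
static inline bool
TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
{
    if (id1 < FirstNormalTransactionId || id2 < FirstNormalTransactionId)
        return id1 <= id2;
    return (int32_t) (id1 - id2) <= 0;
}

static inline bool
ObsoleteSlotIsInvalid(const ReplicationSlot *s)
{
    return s->data.invalidated_at != InvalidXLogRecPtr &&
           s->data.restart_lsn == InvalidXLogRecPtr;
}

static inline bool
LogicalReplicationSlotIsInvalid(const ReplicationSlot *s)
{
    return !TransactionIdIsValid(s->data.xmin) &&
           !TransactionIdIsValid(s->data.catalog_xmin);
}

static inline bool
SlotIsInvalid(const ReplicationSlot *s)
{
    return ObsoleteSlotIsInvalid(s) || LogicalReplicationSlotIsInvalid(s);
}

/* Unlike the macro, every identifier used here is a parameter. */
static inline bool
LogicalReplicationSlotXidsConflict(TransactionId slot_xmin,
                                   TransactionId catalog_xmin,
                                   TransactionId xid)
{
    return (TransactionIdIsValid(slot_xmin) &&
            TransactionIdPrecedesOrEquals(slot_xmin, xid)) ||
           (TransactionIdIsValid(catalog_xmin) &&
            TransactionIdPrecedesOrEquals(catalog_xmin, xid));
}
```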

> 
> 
>> diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
>> index 8fe7bb65f1..8457eec4c4 100644
>> --- a/src/backend/replication/logical/decode.c
>> +++ b/src/backend/replication/logical/decode.c
>> @@ -152,11 +152,31 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
>>                * can restart from there.
>>                */
>>               break;
>> +        case XLOG_PARAMETER_CHANGE:
>> +        {
>> +            xl_parameter_change *xlrec =
>> +                (xl_parameter_change *) XLogRecGetData(buf->record);
>> +
>> +            /*
>> +             * If wal_level on primary is reduced to less than logical, then we
>> +             * want to prevent existing logical slots from being used.
>> +             * Existing logical slots on standby get invalidated when this WAL
>> +             * record is replayed; and further, slot creation fails when the
>> +             * wal level is not sufficient; but all these operations are not
>> +             * synchronized, so a logical slot may creep in while the wal_level
>> +             * is being reduced. Hence this extra check.
>> +             */
>> +            if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
>> +                ereport(ERROR,
>> +                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
>> +                         errmsg("logical decoding on standby requires wal_level "
>> +                                "to be at least logical on the primary server")));
> 
> Please don't break error messages into multiple lines, makes it harder to grep
> for.
>
Done.
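A standalone sketch of the check with the message kept on one line, assuming a hypothetical check_primary_wal_level() that returns the message instead of calling ereport(ERROR). The WalLevel enum reproduces PostgreSQL's ordering (minimal < replica < logical), which is what makes the < WAL_LEVEL_LOGICAL comparison work.

```c
#include <stddef.h>

/* Same ordering as PostgreSQL's WalLevel enum. */
typedef enum WalLevel
{
    WAL_LEVEL_MINIMAL = 0,
    WAL_LEVEL_REPLICA,
    WAL_LEVEL_LOGICAL
} WalLevel;

/*
 * Hypothetical stand-in for the XLOG_PARAMETER_CHANGE check in xlog_decode():
 * returns the error message rather than raising it with ereport(ERROR).
 * The message stays on one line so it can be found with grep.
 */
static const char *
check_primary_wal_level(WalLevel primary_wal_level)
{
    if (primary_wal_level < WAL_LEVEL_LOGICAL)
        return "logical decoding on standby requires wal_level to be at least logical on the primary server";
    return NULL;
}
```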

Regards,

-- 
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
