Re: Minimal logical decoding on standbys - Mailing list pgsql-hackers
From | Drouvot, Bertrand |
---|---|
Subject | Re: Minimal logical decoding on standbys |
Date | |
Msg-id | 86dccc77-0722-63b9-4c83-fb6928691387@gmail.com |
In response to | Re: Minimal logical decoding on standbys (Andres Freund <andres@anarazel.de>) |
Responses | Re: Minimal logical decoding on standbys; Re: Minimal logical decoding on standbys |
List | pgsql-hackers |
Hi,

On 4/2/23 10:10 PM, Andres Freund wrote:
> Hi,
>
> Btw, most of the patches have some things that pgindent will change (and some
> that my editor will highlight). It wouldn't hurt to run pgindent for the later
> patches...

Done.

>
> Pushed the WAL format change.
>

Thanks!

>
> On 2023-04-02 10:27:45 +0200, Drouvot, Bertrand wrote:
>>    5.3% doc/src/sgml/
>>    6.2% src/backend/access/transam/
>>    4.6% src/backend/replication/logical/
>>   55.6% src/backend/replication/
>>    4.4% src/backend/storage/ipc/
>>    6.9% src/backend/tcop/
>>    5.3% src/backend/
>>    3.8% src/include/catalog/
>>    5.3% src/include/replication/
>
> I think it might be worth trying to split this up a bit.
>

Okay. Split into 2 parts in the enclosed V56: one part to handle logical slot
conflicts on standby, and one part to arrange for a new
pg_stat_database_conflicts and pg_replication_slots field.

>
>>          restart_lsn = s->data.restart_lsn;
>> -
>> -        /*
>> -         * If the slot is already invalid or is fresh enough, we don't need to
>> -         * do anything.
>> -         */
>> -        if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
>> +        slot_xmin = s->data.xmin;
>> +        slot_catalog_xmin = s->data.catalog_xmin;
>> +
>> +        /* the slot has been invalidated (logical decoding conflict case) */
>> +        if ((xid && ((LogicalReplicationSlotIsInvalid(s)) ||
>> +        /* or the xid is valid and this is a non conflicting slot */
>> +             (TransactionIdIsValid(*xid) && !(LogicalReplicationSlotXidsConflict(slot_xmin, slot_catalog_xmin,*xid))))) ||
>> +        /* or the slot has been invalidated (obsolete LSN case) */
>> +            (!xid && (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)))
>>          {
>
> This still looks nearly unreadable. I suggest moving comments outside of the
> if (), remove redundant parentheses, use a function to detect if the slot has
> been invalidated.
>

In the attached V56 I made it as simple as:

        /*
         * If the slot is already invalid or is a non conflicting slot, we don't
         * need to do anything.
         */
        islogical = xid ? true : false;

        if (SlotIsInvalid(s, islogical) || SlotIsNotConflicting(s, islogical, xid, &oldestLSN))

>
>> @@ -1329,16 +1345,45 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>>               */
>>              if (last_signaled_pid != active_pid)
>>              {
>> +                bool        send_signal = false;
>> +
>> +                initStringInfo(&err_msg);
>> +                initStringInfo(&err_detail);
>> +
>> +                appendStringInfo(&err_msg, "terminating process %d to release replication slot \"%s\"",
>> +                                 active_pid,
>> +                                 NameStr(slotname));
>
> For this to be translatable you need to use _("message").

Thanks!

>
>
>> +                if (xid)
>> +                {
>> +                    appendStringInfo(&err_msg, " because it conflicts with recovery");
>> +                    send_signal = true;
>> +
>> +                    if (TransactionIdIsValid(*xid))
>> +                        appendStringInfo(&err_detail, "The slot conflicted with xid horizon %u.", *xid);
>> +                    else
>> +                        appendStringInfo(&err_detail, "Logical decoding on standby requires wal_level to be at least logical on the primary server");
>> +                }
>> +                else
>> +                {
>> +                    appendStringInfo(&err_detail, "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
>> +                                     LSN_FORMAT_ARGS(restart_lsn),
>> +                                     (unsigned long long) (oldestLSN - restart_lsn));
>> +                }
>> +
>>                  ereport(LOG,
>> -                        errmsg("terminating process %d to release replication slot \"%s\"",
>> -                               active_pid, NameStr(slotname)),
>> -                        errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
>> -                                  LSN_FORMAT_ARGS(restart_lsn),
>> -                                  (unsigned long long) (oldestLSN - restart_lsn)),
>> -                        errhint("You might need to increase max_slot_wal_keep_size."));
>> -
>> -                (void) kill(active_pid, SIGTERM);
>> +                        errmsg("%s", err_msg.data),
>> +                        errdetail("%s", err_detail.data),
>> +                        send_signal ? 0 : errhint("You might need to increase max_slot_wal_keep_size."));
>> +
>> +                if (send_signal)
>> +                    (void) SendProcSignal(active_pid, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, InvalidBackendId);
>> +                else
>> +                    (void) kill(active_pid, SIGTERM);
>> +
>>                  last_signaled_pid = active_pid;
>> +
>> +                pfree(err_msg.data);
>> +                pfree(err_detail.data);
>>              }
>>
>>              /* Wait until the slot is released. */
>> @@ -1355,6 +1400,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>>          }
>>          else
>>          {
>> +            bool hint = false;;
>> +
>> +            initStringInfo(&err_msg);
>> +            initStringInfo(&err_detail);
>> +
>>              /*
>>               * We hold the slot now and have already invalidated it; flush it
>>               * to ensure that state persists.
>> @@ -1370,14 +1420,37 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>>              ReplicationSlotMarkDirty();
>>              ReplicationSlotSave();
>>              ReplicationSlotRelease();
>> +            pgstat_drop_replslot(s);
>> +
>> +            appendStringInfo(&err_msg, "invalidating");
>> +
>> +            if (xid)
>> +            {
>> +                if (TransactionIdIsValid(*xid))
>> +                    appendStringInfo(&err_detail, "The slot conflicted with xid horizon %u.", *xid);
>> +                else
>> +                    appendStringInfo(&err_detail, "Logical decoding on standby requires wal_level to be at least logical on the primary server");
>> +            }
>> +            else
>
> These are nearly the same messages as above. This is too much code to duplicate
> between terminating and invalidating. Put this into a helper or such.
>

ReportTerminationInvalidation() added in V56 for this purpose.

>
>> @@ -3099,6 +3102,31 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
>>                  /* Intentional fall through to session cancel */
>>                  /* FALLTHROUGH */
>>
>> +            case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
>
> The case: above is explicitly falling through. This makes no sense here as far
> as I can tell.

There is an if "reason == PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT" in the
PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT case, so that seems ok to me.

Or are you saying that you'd prefer to see the
PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT case somewhere else? If so, where?

> I thought you did change this in response to my last comment
> about it?
>

Yes.

>> index 8872c80cdf..013cd2b4d0 100644
>> --- a/src/include/replication/slot.h
>> +++ b/src/include/replication/slot.h
>> @@ -17,6 +17,17 @@
>>  #include "storage/spin.h"
>>  #include "replication/walreceiver.h"
>>
>> +#define ObsoleteSlotIsInvalid(s) (!XLogRecPtrIsInvalid(s->data.invalidated_at) && \
>> +                                  XLogRecPtrIsInvalid(s->data.restart_lsn))
>> +
>> +#define LogicalReplicationSlotIsInvalid(s) (!TransactionIdIsValid(s->data.xmin) && \
>> +                                            !TransactionIdIsValid(s->data.catalog_xmin))
>> +
>> +#define SlotIsInvalid(s) (ObsoleteSlotIsInvalid(s) || LogicalReplicationSlotIsInvalid (s))
>> +
>> +#define LogicalReplicationSlotXidsConflict(slot_xmin, catalog_xmin, xid) \
>> +        ((TransactionIdIsValid(slot_xmin) && TransactionIdPrecedesOrEquals(slot_xmin, xid)) || \
>> +        (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid)))
>
> Can you make these static inlines instead?
>
>

Done.

>
>
>> diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
>> index 8fe7bb65f1..8457eec4c4 100644
>> --- a/src/backend/replication/logical/decode.c
>> +++ b/src/backend/replication/logical/decode.c
>> @@ -152,11 +152,31 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
>>               * can restart from there.
>>               */
>>              break;
>> +        case XLOG_PARAMETER_CHANGE:
>> +        {
>> +            xl_parameter_change *xlrec =
>> +                (xl_parameter_change *) XLogRecGetData(buf->record);
>> +
>> +            /*
>> +             * If wal_level on primary is reduced to less than logical, then we
>> +             * want to prevent existing logical slots from being used.
>> +             * Existing logical slots on standby get invalidated when this WAL
>> +             * record is replayed; and further, slot creation fails when the
>> +             * wal level is not sufficient; but all these operations are not
>> +             * synchronized, so a logical slot may creep in while the wal_level
>> +             * is being reduced. Hence this extra check.
>> +             */
>> +            if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
>> +                ereport(ERROR,
>> +                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
>> +                         errmsg("logical decoding on standby requires wal_level "
>> +                                "to be at least logical on the primary server")));
>
> Please don't break error messages into multiple lines, makes it harder to grep
> for.
>

Done.

Regards,

--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
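
For readers following the "static inlines instead" request on the quoted slot.h macros, here is a minimal sketch of what such a conversion might look like. It is not the code from V56. The typedefs, the SketchSlot struct and every function name below are stand-ins invented so the example compiles on its own, and xid_precedes_or_equals() only models the wraparound-aware comparison PostgreSQL applies to normal xids. The conversion also makes one thing visible: the quoted LogicalReplicationSlotXidsConflict macro declares a parameter named catalog_xmin while its body refers to slot_catalog_xmin, which only works if the caller happens to have a variable of that name. A function's parameter list rules that out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-ins for PostgreSQL types (assumptions, so the sketch compiles alone). */
typedef uint32_t TransactionId;
typedef uint64_t XLogRecPtr;

#define InvalidTransactionId     ((TransactionId) 0)
#define FirstNormalTransactionId ((TransactionId) 3)
#define InvalidXLogRecPtr        ((XLogRecPtr) 0)

/* Hypothetical reduced slot: only the fields the quoted macros look at. */
typedef struct SketchSlot
{
    TransactionId xmin;
    TransactionId catalog_xmin;
    XLogRecPtr    restart_lsn;
    XLogRecPtr    invalidated_at;
} SketchSlot;

static inline bool
xid_is_valid(TransactionId xid)
{
    return xid != InvalidTransactionId;
}

/* "a <= b" with wraparound for normal xids; plain compare for special ones. */
static inline bool
xid_precedes_or_equals(TransactionId a, TransactionId b)
{
    if (a < FirstNormalTransactionId || b < FirstNormalTransactionId)
        return a <= b;
    return (int32_t) (a - b) <= 0;
}

/* Obsolete-LSN case: invalidated_at is set and restart_lsn has been cleared. */
static inline bool
obsolete_slot_is_invalid(const SketchSlot *s)
{
    assert(s != NULL);
    return s->invalidated_at != InvalidXLogRecPtr &&
        s->restart_lsn == InvalidXLogRecPtr;
}

/* Logical-conflict case: both xmin and catalog_xmin have been cleared. */
static inline bool
logical_slot_is_invalid(const SketchSlot *s)
{
    assert(s != NULL);
    return !xid_is_valid(s->xmin) && !xid_is_valid(s->catalog_xmin);
}

/* True if either horizon held by the slot is at or before the given xid. */
static inline bool
logical_slot_xids_conflict(TransactionId slot_xmin,
                           TransactionId slot_catalog_xmin,
                           TransactionId xid)
{
    return (xid_is_valid(slot_xmin) &&
            xid_precedes_or_equals(slot_xmin, xid)) ||
        (xid_is_valid(slot_catalog_xmin) &&
         xid_precedes_or_equals(slot_catalog_xmin, xid));
}
```

Compared with the macros, each argument is evaluated exactly once and type-checked, and the pointer dereference no longer depends on how the caller spells the argument.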
Attachment
- v56-0006-Doc-changes-describing-details-about-logical-dec.patch
- v56-0005-New-TAP-test-for-logical-decoding-on-standby.patch
- v56-0004-Fixing-Walsender-corner-case-with-logical-decodi.patch
- v56-0003-Allow-logical-decoding-on-standby.patch
- v56-0002-Arrange-for-a-new-pg_stat_database_conflicts-and.patch
- v56-0001-Handle-logical-slot-conflicts-on-standby.patch