From b58f5afd7e2d863445b4ecf9fbd750ac0b2606cb Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Fri, 26 Jan 2024 18:20:17 +0000 Subject: [PATCH v2] Track inactive replication slot information Currently postgres doesn't track metrics like the time at which the slot became inactive, and the total number of times the slot became inactive in its lifetime. This commit adds two new metrics inactive_at of type timestamptz and inactive_count of type numeric to ReplicationSlotPersistentData. Whenever a slot becomes inactive, the current timestamp and inactive count are persisted to disk. These metrics are useful in the following ways: - To improve replication slot monitoring tools. For instance, one can build a monitoring tool that signals a) when replication slots is lying inactive for a day or so using inactive_at metric, b) when a replication slot is becoming inactive too frequently using inactive_at metric. - To implement timeout-based inactive replication slot management capability in postgres. Increases SLOT_VERSION due to the added two new metrics. --- doc/src/sgml/system-views.sgml | 20 +++++++++++ src/backend/catalog/system_views.sql | 4 ++- src/backend/replication/slot.c | 50 +++++++++++++++++++++++----- src/backend/replication/slotfuncs.c | 15 ++++++++- src/include/catalog/pg_proc.dat | 6 ++-- src/include/replication/slot.h | 6 ++++ src/test/regress/expected/rules.out | 6 ++-- 7 files changed, 91 insertions(+), 16 deletions(-) diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index c61312793c..75f99f4ca0 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2566,6 +2566,26 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx Always false for physical slots. + + + + inactive_at timestamptz + + + The time at which the slot became inactive. + NULL if the slot is currently actively being + used. + + + + + + inactive_count numeric + + + The total number of times the slot became inactive in its lifetime. + + diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index d78077b936..caa6db720c 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1024,7 +1024,9 @@ CREATE VIEW pg_replication_slots AS L.safe_wal_size, L.two_phase, L.invalidation_reason, - L.failover + L.failover, + L.inactive_at, + L.inactive_count FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 02a14ec210..bf7429ba3f 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -90,7 +90,7 @@ typedef struct ReplicationSlotOnDisk sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize #define SLOT_MAGIC 0x1051CA1 /* format identifier */ -#define SLOT_VERSION 4 /* version for new files */ +#define SLOT_VERSION 5 /* version for new files */ /* Control array for replication slot management */ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; @@ -315,6 +315,8 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.two_phase = two_phase; slot->data.two_phase_at = InvalidXLogRecPtr; slot->data.failover = failover; + slot->data.inactive_at = 0; + slot->data.inactive_count = 0; /* and then data only present in shared memory */ slot->just_dirtied = false; @@ -544,6 +546,17 @@ retry: if (am_walsender) { + if (s->data.persistency == RS_PERSISTENT) + { + SpinLockAcquire(&s->mutex); + s->data.inactive_at = 0; + SpinLockRelease(&s->mutex); + + /* Write this slot to disk */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + } + ereport(log_replication_commands ? LOG : DEBUG1, SlotIsLogical(s) ? errmsg("acquired logical replication slot \"%s\"", @@ -611,16 +624,27 @@ ReplicationSlotRelease(void) ConditionVariableBroadcast(&slot->active_cv); } - MyReplicationSlot = NULL; - - /* might not have been set when we've been a plain slot */ - LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); - MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING; - ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags; - LWLockRelease(ProcArrayLock); - if (am_walsender) { + if (slot->data.persistency == RS_PERSISTENT) + { + SpinLockAcquire(&slot->mutex); + slot->data.inactive_at = GetCurrentTimestamp(); + + /* + * XXX: Can inactive_count of type uint64 ever overflow? It takes + * about a half-billion years for inactive_count to overflow even + * if slot becomes inactive for every 1 millisecond. So, using + * pg_add_u64_overflow might be an overkill. + */ + slot->data.inactive_count++; + SpinLockRelease(&slot->mutex); + + /* Write this slot to disk */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + } + ereport(log_replication_commands ? LOG : DEBUG1, is_logical ? errmsg("released logical replication slot \"%s\"", @@ -630,6 +654,14 @@ ReplicationSlotRelease(void) pfree(slotname); } + + MyReplicationSlot = NULL; + + /* might not have been set when we've been a plain slot */ + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING; + ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags; + LWLockRelease(ProcArrayLock); } /* diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index e53aeb37c9..3c53f4ac48 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -237,10 +237,11 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 16 +#define PG_GET_REPLICATION_SLOTS_COLS 18 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; + char buf[256]; /* * We don't require any special permission to see this function's data @@ -428,6 +429,18 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) values[i++] = BoolGetDatum(slot_contents.data.failover); + if (slot_contents.data.inactive_at > 0) + values[i++] = TimestampTzGetDatum(slot_contents.data.inactive_at); + else + nulls[i++] = true; + + /* Convert to numeric. */ + snprintf(buf, sizeof buf, UINT64_FORMAT, slot_contents.data.inactive_count); + values[i++] = DirectFunctionCall3(numeric_in, + CStringGetDatum(buf), + ObjectIdGetDatum(0), + Int32GetDatum(-1)); + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index de1115baa0..52e9fc4971 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11127,9 +11127,9 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,invalidation_reason,failover}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool,timestamptz,numeric}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,invalidation_reason,failover,inactive_at,inactive_count}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index db9bb22266..a7372d3bd5 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -117,6 +117,12 @@ typedef struct ReplicationSlotPersistentData * for logical slots on the primary server. */ bool failover; + + /* When did this slot become inactive last time? */ + TimestampTz inactive_at; + + /* How many times the slot has been inactive? */ + uint64 inactive_count; } ReplicationSlotPersistentData; /* diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 022f9bccb0..4a3cb182e6 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1474,8 +1474,10 @@ pg_replication_slots| SELECT l.slot_name, l.safe_wal_size, l.two_phase, l.invalidation_reason, - l.failover - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, invalidation_reason, failover) + l.failover, + l.inactive_at, + l.inactive_count + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, invalidation_reason, failover, inactive_at, inactive_count) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, -- 2.34.1