From 8184f054562f747387beb62f11b7508c4d5c80b3 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Fri, 9 Apr 2021 21:46:36 +0900 Subject: [PATCH] POC: Use HTAB for replication slot stats. --- src/backend/catalog/system_views.sql | 30 +-- src/backend/postmaster/pgstat.c | 273 +++++++++++----------- src/backend/replication/logical/logical.c | 16 +- src/backend/replication/slot.c | 17 +- src/backend/utils/adt/pgstatfuncs.c | 134 ++++++----- src/include/catalog/pg_proc.dat | 14 +- src/include/pgstat.h | 12 +- src/include/replication/slot.h | 2 +- src/test/regress/expected/rules.out | 8 +- 9 files changed, 267 insertions(+), 239 deletions(-) diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 6d78b33590..e5591794c2 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -866,20 +866,6 @@ CREATE VIEW pg_stat_replication AS JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid) LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid); -CREATE VIEW pg_stat_replication_slots AS - SELECT - s.slot_name, - s.spill_txns, - s.spill_count, - s.spill_bytes, - s.stream_txns, - s.stream_count, - s.stream_bytes, - s.total_txns, - s.total_bytes, - s.stats_reset - FROM pg_stat_get_replication_slots() AS s; - CREATE VIEW pg_stat_slru AS SELECT s.name, @@ -984,6 +970,22 @@ CREATE VIEW pg_replication_slots AS FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); +CREATE VIEW pg_stat_replication_slots AS + SELECT + s.slot_name, + s.total_txns, + s.total_bytes, + s.spill_txns, + s.spill_count, + s.spill_bytes, + s.stream_txns, + s.stream_count, + s.stream_bytes, + s.stats_reset + FROM pg_replication_slots as r, + LATERAL pg_stat_get_replication_slot(slot_name) as s + WHERE r.datoid IS NOT NULL; -- excluding physical slots + CREATE VIEW pg_stat_database AS SELECT D.oid AS datid, diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index e5b1fb045e..46eb4507c1 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -106,6 +106,7 @@ #define PGSTAT_DB_HASH_SIZE 16 #define PGSTAT_TAB_HASH_SIZE 512 #define PGSTAT_FUNCTION_HASH_SIZE 512 +#define PGSTAT_REPLSLOT_HASH_SIZE 32 /* ---------- @@ -278,8 +279,7 @@ static PgStat_ArchiverStats archiverStats; static PgStat_GlobalStats globalStats; static PgStat_WalStats walStats; static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS]; -static PgStat_ReplSlotStats *replSlotStats; -static int nReplSlotStats; +static HTAB *replSlotStats = NULL; static PgStat_RecoveryPrefetchStats recoveryPrefetchStats; /* @@ -319,8 +319,8 @@ static void backend_read_statsfile(void); static bool pgstat_write_statsfile_needed(void); static bool pgstat_db_requested(Oid databaseid); -static int pgstat_replslot_index(const char *name, bool create_it); -static void pgstat_reset_replslot(int i, TimestampTz ts); +static PgStat_ReplSlotEntry *pgstat_get_replslot_entry(NameData name, bool create_it); +static void pgstat_reset_replslot(PgStat_ReplSlotEntry *slotstats, TimestampTz ts); static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg); static void pgstat_send_funcstats(void); @@ -1109,6 +1109,24 @@ pgstat_vacuum_stat(void) /* Clean up */ hash_destroy(htab); + /* + * Check for all replication slots in stats hash table if they still exist. + * + * XXX: maybe we can skip this check until the number of entries in the + * replication stats hash exceeds max_replication_slots. + */ + if (replSlotStats) + { + PgStat_ReplSlotEntry *slotentry; + + hash_seq_init(&hstat, replSlotStats); + while ((slotentry = (PgStat_ReplSlotEntry *) hash_seq_search(&hstat)) != NULL) + { + if (SearchNamedReplicationSlot(NameStr(slotentry->slotname), true) == NULL) + pgstat_report_replslot_drop(NameStr(slotentry->slotname)); + } + } + /* * Lookup our own database entry; if not found, nothing more to do. */ @@ -1516,30 +1534,6 @@ pgstat_reset_replslot_counter(const char *name) if (name) { - ReplicationSlot *slot; - - /* - * Check if the slot exists with the given name. It is possible that by - * the time this message is executed the slot is dropped but at least - * this check will ensure that the given name is for a valid slot. - */ - LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); - slot = SearchNamedReplicationSlot(name); - LWLockRelease(ReplicationSlotControlLock); - - if (!slot) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("replication slot \"%s\" does not exist", - name))); - - /* - * Nothing to do for physical slots as we collect stats only for - * logical slots. - */ - if (SlotIsPhysical(slot)) - return; - namestrcpy(&msg.m_slotname, name); msg.clearall = false; } @@ -1813,7 +1807,7 @@ pgstat_report_tempfile(size_t filesize) * ---------- */ void -pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat) +pgstat_report_replslot(const PgStat_ReplSlotEntry *repSlotStat) { PgStat_MsgReplSlot msg; @@ -1823,14 +1817,14 @@ pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat) pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT); namestrcpy(&msg.m_slotname, NameStr(repSlotStat->slotname)); msg.m_drop = false; + msg.m_total_txns = repSlotStat->total_txns; + msg.m_total_bytes = repSlotStat->total_bytes; msg.m_spill_txns = repSlotStat->spill_txns; msg.m_spill_count = repSlotStat->spill_count;; msg.m_spill_bytes = repSlotStat->spill_bytes; msg.m_stream_txns = repSlotStat->stream_txns; msg.m_stream_count = repSlotStat->stream_count; msg.m_stream_bytes = repSlotStat->stream_bytes; - msg.m_total_txns = repSlotStat->total_txns; - msg.m_total_bytes = repSlotStat->total_bytes; pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); } @@ -2872,17 +2866,19 @@ pgstat_fetch_slru(void) * pgstat_fetch_replslot() - * * Support function for the SQL-callable pgstat* functions. Returns - * a pointer to the replication slot statistics struct and sets the - * number of entries in nslots_p. + * a pointer to the replication slot statistics struct. * --------- */ -PgStat_ReplSlotStats * -pgstat_fetch_replslot(int *nslots_p) +PgStat_ReplSlotEntry * +pgstat_fetch_replslot(NameData slotname) { + PgStat_ReplSlotEntry *slotent = NULL; + backend_read_statsfile(); - *nslots_p = nReplSlotStats; - return replSlotStats; + slotent = pgstat_get_replslot_entry(slotname, false); + + return slotent; } /* @@ -3654,7 +3650,6 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname; const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename; int rc; - int i; elog(DEBUG2, "writing stats file \"%s\"", statfile); @@ -3744,11 +3739,17 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) /* * Write replication slot stats struct */ - for (i = 0; i < nReplSlotStats; i++) + if (replSlotStats) { - fputc('R', fpout); - rc = fwrite(&replSlotStats[i], sizeof(PgStat_ReplSlotStats), 1, fpout); - (void) rc; /* we'll check for error with ferror */ + PgStat_ReplSlotEntry *slotentry; + + hash_seq_init(&hstat, replSlotStats); + while ((slotentry = (PgStat_ReplSlotEntry *) hash_seq_search(&hstat)) != NULL) + { + fputc('R', fpout); + rc = fwrite(slotentry, sizeof(PgStat_ReplSlotEntry), 1, fpout); + (void) rc; /* we'll check for error with ferror */ + } } /* @@ -3975,12 +3976,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); - /* Allocate the space for replication slot statistics */ - replSlotStats = MemoryContextAllocZero(pgStatLocalContext, - max_replication_slots - * sizeof(PgStat_ReplSlotStats)); - nReplSlotStats = 0; - /* * Clear out global, archiver, WAL and SLRU statistics so they start from * zero in case we can't load an existing statsfile. @@ -4005,12 +4000,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) for (i = 0; i < SLRU_NUM_ELEMENTS; i++) slruStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp; - /* - * Set the same reset timestamp for all replication slots too. - */ - for (i = 0; i < max_replication_slots; i++) - replSlotStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp; - /* * Try to open the stats file. If it doesn't exist, the backends simply * return zero for anything and the collector simply starts from scratch @@ -4197,21 +4186,27 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) break; /* - * 'R' A PgStat_ReplSlotStats struct describing a replication + * 'R' A PgStat_ReplSlotEntry struct describing a replication * slot follows. */ case 'R': - if (fread(&replSlotStats[nReplSlotStats], 1, sizeof(PgStat_ReplSlotStats), fpin) - != sizeof(PgStat_ReplSlotStats)) + { + PgStat_ReplSlotEntry slotstats; + PgStat_ReplSlotEntry *slotent; + + if (fread(&slotstats, 1, sizeof(PgStat_ReplSlotEntry), fpin) + != sizeof(PgStat_ReplSlotEntry)) { ereport(pgStatRunningInCollector ? LOG : WARNING, (errmsg("corrupted statistics file \"%s\"", statfile))); - memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats)); goto done; } - nReplSlotStats++; + + slotent = pgstat_get_replslot_entry(slotstats.slotname, true); + memcpy(slotent, &slotstats, sizeof(PgStat_ReplSlotEntry)); break; + } case 'E': goto done; @@ -4424,7 +4419,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, PgStat_ArchiverStats myArchiverStats; PgStat_WalStats myWalStats; PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS]; - PgStat_ReplSlotStats myReplSlotStats; + PgStat_ReplSlotEntry myReplSlotStats; PgStat_RecoveryPrefetchStats myRecoveryPrefetchStats; FILE *fpin; int32 format_id; @@ -4553,12 +4548,12 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, break; /* - * 'R' A PgStat_ReplSlotStats struct describing a replication + * 'R' A PgStat_ReplSlotEntry struct describing a replication * slot follows. */ case 'R': - if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotStats), fpin) - != sizeof(PgStat_ReplSlotStats)) + if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotEntry), fpin) + != sizeof(PgStat_ReplSlotEntry)) { ereport(pgStatRunningInCollector ? LOG : WARNING, (errmsg("corrupted statistics file \"%s\"", @@ -4765,7 +4760,6 @@ pgstat_clear_snapshot(void) pgStatLocalContext = NULL; pgStatDBHash = NULL; replSlotStats = NULL; - nReplSlotStats = 0; /* * Historically the backend_status.c facilities lived in this file, and @@ -5189,20 +5183,26 @@ static void pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, int len) { - int i; - int idx = -1; + PgStat_ReplSlotEntry *slotent; TimestampTz ts; + /* Return if we don't have replication slot statistics */ + if (replSlotStats == NULL) + return; + ts = GetCurrentTimestamp(); if (msg->clearall) { - for (i = 0; i < nReplSlotStats; i++) - pgstat_reset_replslot(i, ts); + HASH_SEQ_STATUS sstat; + + hash_seq_init(&sstat, replSlotStats); + while ((slotent = (PgStat_ReplSlotEntry *) hash_seq_search(&sstat)) != NULL) + pgstat_reset_replslot(slotent, ts); } else { /* Get the index of replication slot statistics to reset */ - idx = pgstat_replslot_index(NameStr(msg->m_slotname), false); + slotent = pgstat_get_replslot_entry(msg->m_slotname, false); /* * Nothing to do if the given slot entry is not found. This could @@ -5210,11 +5210,11 @@ pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, * corresponding statistics entry is also removed before receiving the * reset message. */ - if (idx < 0) + if (!slotent) return; /* Reset the stats for the requested replication slot */ - pgstat_reset_replslot(idx, ts); + pgstat_reset_replslot(slotent, ts); } } @@ -5532,46 +5532,29 @@ pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len) static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len) { - int idx; - - /* - * Get the index of replication slot statistics. On dropping, we don't - * create the new statistics. - */ - idx = pgstat_replslot_index(NameStr(msg->m_slotname), !msg->m_drop); - - /* - * The slot entry is not found or there is no space to accommodate the new - * entry. This could happen when the message for the creation of a slot - * reached before the drop message even though the actual operations - * happen in reverse order. In such a case, the next update of the - * statistics for the same slot will create the required entry. - */ - if (idx < 0) - return; - - /* it must be a valid replication slot index */ - Assert(idx < nReplSlotStats); - if (msg->m_drop) { /* Remove the replication slot statistics with the given name */ - if (idx < nReplSlotStats - 1) - memcpy(&replSlotStats[idx], &replSlotStats[nReplSlotStats - 1], - sizeof(PgStat_ReplSlotStats)); - nReplSlotStats--; + if (replSlotStats != NULL) + (void) hash_search(replSlotStats, (void *) NameStr(msg->m_slotname), + HASH_REMOVE, NULL); } else { + PgStat_ReplSlotEntry *slotent; + + slotent = pgstat_get_replslot_entry(msg->m_slotname, true); + Assert(slotent); + /* Update the replication slot statistics */ - replSlotStats[idx].spill_txns += msg->m_spill_txns; - replSlotStats[idx].spill_count += msg->m_spill_count; - replSlotStats[idx].spill_bytes += msg->m_spill_bytes; - replSlotStats[idx].stream_txns += msg->m_stream_txns; - replSlotStats[idx].stream_count += msg->m_stream_count; - replSlotStats[idx].stream_bytes += msg->m_stream_bytes; - replSlotStats[idx].total_txns += msg->m_total_txns; - replSlotStats[idx].total_bytes += msg->m_total_bytes; + slotent->total_txns += msg->m_total_txns; + slotent->total_bytes += msg->m_total_bytes; + slotent->spill_txns += msg->m_spill_txns; + slotent->spill_count += msg->m_spill_count; + slotent->spill_bytes += msg->m_spill_bytes; + slotent->stream_txns += msg->m_stream_txns; + slotent->stream_count += msg->m_stream_count; + slotent->stream_bytes += msg->m_stream_bytes; } } @@ -5749,59 +5732,79 @@ pgstat_db_requested(Oid databaseid) } /* ---------- - * pgstat_replslot_index + * pgstat_replslot_entry * - * Return the index of entry of a replication slot with the given name, or - * -1 if the slot is not found. + * Return the entry of replication slot stats with the given name. Return + * NULL if not found and the caller didn't request to create it. * * create_it tells whether to create the new slot entry if it is not found. * ---------- */ -static int -pgstat_replslot_index(const char *name, bool create_it) +static PgStat_ReplSlotEntry * +pgstat_get_replslot_entry(NameData name, bool create_it) { - int i; + PgStat_ReplSlotEntry *slotent; + bool found; - Assert(nReplSlotStats <= max_replication_slots); - for (i = 0; i < nReplSlotStats; i++) + /* + * Create the replication slot stats hash table if we don't have + * it already. + */ + if (replSlotStats == NULL) { - if (namestrcmp(&replSlotStats[i].slotname, name) == 0) - return i; /* found */ + HASHCTL hash_ctl; + + hash_ctl.keysize = sizeof(NameData); + hash_ctl.entrysize = sizeof(PgStat_ReplSlotEntry); + hash_ctl.hcxt = pgStatLocalContext; + + replSlotStats = hash_create("Replication slots hash", + PGSTAT_REPLSLOT_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); } - /* - * The slot is not found. We don't want to register the new statistics if - * the list is already full or the caller didn't request. - */ - if (i == max_replication_slots || !create_it) - return -1; + slotent = (PgStat_ReplSlotEntry *) hash_search(replSlotStats, + (void *) &name, + create_it ? HASH_ENTER : HASH_FIND, + &found); + + if (!slotent) + { + /* not found */ + Assert(!create_it && !found); + return NULL; + } - /* Register new slot */ - memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats)); - namestrcpy(&replSlotStats[nReplSlotStats].slotname, name); + /* initialize the entry */ + if (create_it && !found) + { + memset(slotent, 0, sizeof(PgStat_ReplSlotEntry)); + namestrcpy(&(slotent->slotname), NameStr(name)); + } - return nReplSlotStats++; + return slotent; } /* ---------- * pgstat_reset_replslot * - * Reset the replication slot stats at index 'i'. + * Reset the given replication slot stats. * ---------- */ static void -pgstat_reset_replslot(int i, TimestampTz ts) +pgstat_reset_replslot(PgStat_ReplSlotEntry *slotent, TimestampTz ts) { /* reset only counters. Don't clear slot name */ - replSlotStats[i].spill_txns = 0; - replSlotStats[i].spill_count = 0; - replSlotStats[i].spill_bytes = 0; - replSlotStats[i].stream_txns = 0; - replSlotStats[i].stream_count = 0; - replSlotStats[i].stream_bytes = 0; - replSlotStats[i].total_txns = 0; - replSlotStats[i].total_bytes = 0; - replSlotStats[i].stat_reset_timestamp = ts; + slotent->total_txns = 0; + slotent->total_bytes = 0; + slotent->spill_txns = 0; + slotent->spill_count = 0; + slotent->spill_bytes = 0; + slotent->stream_txns = 0; + slotent->stream_count = 0; + slotent->stream_bytes = 0; + slotent->stat_reset_timestamp = ts; } /* diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 20a99d3035..cc2a28a045 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1773,7 +1773,7 @@ void UpdateDecodingStats(LogicalDecodingContext *ctx) { ReorderBuffer *rb = ctx->reorder; - PgStat_ReplSlotStats repSlotStat; + PgStat_ReplSlotEntry repSlotStat; /* * Nothing to do if we don't have any replication stats to be sent. @@ -1783,32 +1783,32 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld %lld %lld", rb, + (long long) rb->totalTxns, + (long long) rb->totalBytes, (long long) rb->spillTxns, (long long) rb->spillCount, (long long) rb->spillBytes, (long long) rb->streamTxns, (long long) rb->streamCount, - (long long) rb->streamBytes, - (long long) rb->totalTxns, - (long long) rb->totalBytes); + (long long) rb->streamBytes); namestrcpy(&repSlotStat.slotname, NameStr(ctx->slot->data.name)); + repSlotStat.total_txns = rb->totalTxns; + repSlotStat.total_bytes = rb->totalBytes; repSlotStat.spill_txns = rb->spillTxns; repSlotStat.spill_count = rb->spillCount; repSlotStat.spill_bytes = rb->spillBytes; repSlotStat.stream_txns = rb->streamTxns; repSlotStat.stream_count = rb->streamCount; repSlotStat.stream_bytes = rb->streamBytes; - repSlotStat.total_txns = rb->totalTxns; - repSlotStat.total_bytes = rb->totalBytes; pgstat_report_replslot(&repSlotStat); + rb->totalTxns = 0; + rb->totalBytes = 0; rb->spillTxns = 0; rb->spillCount = 0; rb->spillBytes = 0; rb->streamTxns = 0; rb->streamCount = 0; rb->streamBytes = 0; - rb->totalTxns = 0; - rb->totalBytes = 0; } diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index f61b163f78..ea8e0305f8 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -329,8 +329,8 @@ ReplicationSlotCreate(const char *name, bool db_specific, */ if (SlotIsLogical(slot)) { - PgStat_ReplSlotStats repSlotStat; - MemSet(&repSlotStat, 0, sizeof(PgStat_ReplSlotStats)); + PgStat_ReplSlotEntry repSlotStat; + MemSet(&repSlotStat, 0, sizeof(PgStat_ReplSlotEntry)); namestrcpy(&repSlotStat.slotname, NameStr(slot->data.name)); pgstat_report_replslot(&repSlotStat); } @@ -349,17 +349,15 @@ ReplicationSlotCreate(const char *name, bool db_specific, * Search for the named replication slot. * * Return the replication slot if found, otherwise NULL. - * - * The caller must hold ReplicationSlotControlLock in shared mode. */ ReplicationSlot * -SearchNamedReplicationSlot(const char *name) +SearchNamedReplicationSlot(const char *name, bool need_lock) { int i; ReplicationSlot *slot = NULL; - Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, - LW_SHARED)); + if (need_lock) + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) { @@ -372,6 +370,9 @@ SearchNamedReplicationSlot(const char *name) } } + if (need_lock) + LWLockRelease(ReplicationSlotControlLock); + return slot; } @@ -416,7 +417,7 @@ retry: * Search for the slot with the specified name if the slot to acquire is * not given. If the slot is not found, we either return -1 or error out. */ - s = slot ? slot : SearchNamedReplicationSlot(name); + s = slot ? slot : SearchNamedReplicationSlot(name, false); if (s == NULL || !s->in_use) { LWLockRelease(ReplicationSlotControlLock); diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 2680190a40..f92c50dd27 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -24,6 +24,7 @@ #include "pgstat.h" #include "postmaster/bgworker_internals.h" #include "postmaster/postmaster.h" +#include "replication/slot.h" #include "storage/proc.h" #include "storage/procarray.h" #include "utils/acl.h" @@ -2207,8 +2208,32 @@ pg_stat_reset_replication_slot(PG_FUNCTION_ARGS) char *target = NULL; if (!PG_ARGISNULL(0)) + { + ReplicationSlot *slot; + target = text_to_cstring(PG_GETARG_TEXT_PP(0)); + /* + * Check if the slot exists with the given name. It is possible that by + * the time this message is executed the slot is dropped but at least + * this check will ensure that the given name is for a valid slot. + */ + slot = SearchNamedReplicationSlot(target, true); + + if (!slot) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("replication slot \"%s\" does not exist", + target))); + + /* + * Nothing to do for physical slots as we collect stats only for + * logical slots. + */ + if (SlotIsPhysical(slot)) + PG_RETURN_VOID(); + } + pgstat_reset_replslot_counter(target); PG_RETURN_VOID(); @@ -2280,73 +2305,68 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS) PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); } -/* Get the statistics for the replication slots */ +/* Get the statistics for the replication slot */ Datum -pg_stat_get_replication_slots(PG_FUNCTION_ARGS) +pg_stat_get_replication_slot(PG_FUNCTION_ARGS) { #define PG_STAT_GET_REPLICATION_SLOT_COLS 10 - ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + text *slotname_text = PG_GETARG_TEXT_P(0); + NameData slotname; TupleDesc tupdesc; - Tuplestorestate *tupstore; - MemoryContext per_query_ctx; - MemoryContext oldcontext; - PgStat_ReplSlotStats *slotstats; - int nstats; - int i; + Datum values[10]; + bool nulls[10]; + PgStat_ReplSlotEntry *slotent; - /* check to see if caller supports us returning a tuplestore */ - if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("set-valued function called in context that cannot accept a set"))); - if (!(rsinfo->allowedModes & SFRM_Materialize)) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("materialize mode required, but it is not allowed in this context"))); - - /* Build a tuple descriptor for our result type */ - if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) - elog(ERROR, "return type must be a row type"); - - per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; - oldcontext = MemoryContextSwitchTo(per_query_ctx); - - tupstore = tuplestore_begin_heap(true, false, work_mem); - rsinfo->returnMode = SFRM_Materialize; - rsinfo->setResult = tupstore; - rsinfo->setDesc = tupdesc; + /* Initialise values and NULL flags arrays */ + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); - MemoryContextSwitchTo(oldcontext); + /* Initialise attributes information in the tuple descriptor */ + tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_REPLICATION_SLOT_COLS); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "slot_name", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "total_txns", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "total_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "spill_txns", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "spill_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "spill_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 7, "stream_txns", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "stream_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 9, "stream_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset", + TIMESTAMPTZOID, -1, 0); - slotstats = pgstat_fetch_replslot(&nstats); - for (i = 0; i < nstats; i++) - { - Datum values[PG_STAT_GET_REPLICATION_SLOT_COLS]; - bool nulls[PG_STAT_GET_REPLICATION_SLOT_COLS]; - PgStat_ReplSlotStats *s = &(slotstats[i]); + BlessTupleDesc(tupdesc); - MemSet(values, 0, sizeof(values)); - MemSet(nulls, 0, sizeof(nulls)); + namestrcpy(&slotname, text_to_cstring(slotname_text)); + slotent = pgstat_fetch_replslot(slotname); - values[0] = CStringGetTextDatum(NameStr(s->slotname)); - values[1] = Int64GetDatum(s->spill_txns); - values[2] = Int64GetDatum(s->spill_count); - values[3] = Int64GetDatum(s->spill_bytes); - values[4] = Int64GetDatum(s->stream_txns); - values[5] = Int64GetDatum(s->stream_count); - values[6] = Int64GetDatum(s->stream_bytes); - values[7] = Int64GetDatum(s->total_txns); - values[8] = Int64GetDatum(s->total_bytes); - - if (s->stat_reset_timestamp == 0) - nulls[9] = true; - else - values[9] = TimestampTzGetDatum(s->stat_reset_timestamp); + if (!slotent) + PG_RETURN_NULL(); - tuplestore_putvalues(tupstore, tupdesc, values, nulls); - } + values[0] = CStringGetTextDatum(NameStr(slotent->slotname)); + values[1] = Int64GetDatum(slotent->total_txns); + values[2] = Int64GetDatum(slotent->total_bytes); + values[3] = Int64GetDatum(slotent->spill_txns); + values[4] = Int64GetDatum(slotent->spill_count); + values[5] = Int64GetDatum(slotent->spill_bytes); + values[6] = Int64GetDatum(slotent->stream_txns); + values[7] = Int64GetDatum(slotent->stream_count); + values[8] = Int64GetDatum(slotent->stream_bytes); - tuplestore_donestoring(tupstore); + if (slotent->stat_reset_timestamp == 0) + nulls[9] = true; + else + values[9] = TimestampTzGetDatum(slotent->stat_reset_timestamp); - return (Datum) 0; + /* Returns the record as Datum */ + PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); } diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 591753fe81..85720dcb0c 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5311,14 +5311,14 @@ proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', proargnames => '{pid,status,receive_start_lsn,receive_start_tli,written_lsn,flushed_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,sender_host,sender_port,conninfo}', prosrc => 'pg_stat_get_wal_receiver' }, -{ oid => '8595', descr => 'statistics: information about replication slots', - proname => 'pg_stat_get_replication_slots', prorows => '10', +{ oid => '8595', descr => 'statistics: information about replication slot', + proname => 'pg_stat_get_replication_slot', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', - prorettype => 'record', proargtypes => '', - proallargtypes => '{text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}', - prosrc => 'pg_stat_get_replication_slots' }, + prorettype => 'record', proargtypes => 'text', + proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,slot_name,total_txns,total_bytes,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}', + prosrc => 'pg_stat_get_replication_slot' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 2aeb3cded4..a6de486291 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -917,19 +917,19 @@ typedef struct PgStat_SLRUStats /* * Replication slot statistics kept in the stats collector */ -typedef struct PgStat_ReplSlotStats +typedef struct PgStat_ReplSlotEntry { NameData slotname; + PgStat_Counter total_txns; + PgStat_Counter total_bytes; PgStat_Counter spill_txns; PgStat_Counter spill_count; PgStat_Counter spill_bytes; PgStat_Counter stream_txns; PgStat_Counter stream_count; PgStat_Counter stream_bytes; - PgStat_Counter total_txns; - PgStat_Counter total_bytes; TimestampTz stat_reset_timestamp; -} PgStat_ReplSlotStats; +} PgStat_ReplSlotEntry; /* @@ -1031,7 +1031,7 @@ extern void pgstat_report_recovery_conflict(int reason); extern void pgstat_report_deadlock(void); extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount); extern void pgstat_report_checksum_failure(void); -extern void pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat); +extern void pgstat_report_replslot(const PgStat_ReplSlotEntry *repSlotStat); extern void pgstat_report_replslot_drop(const char *slotname); extern void pgstat_initialize(void); @@ -1129,7 +1129,7 @@ extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void); extern PgStat_GlobalStats *pgstat_fetch_global(void); extern PgStat_WalStats *pgstat_fetch_stat_wal(void); extern PgStat_SLRUStats *pgstat_fetch_slru(void); -extern PgStat_ReplSlotStats *pgstat_fetch_replslot(int *nslots_p); +extern PgStat_ReplSlotEntry *pgstat_fetch_replslot(NameData slotname); extern PgStat_RecoveryPrefetchStats *pgstat_fetch_recoveryprefetch(void); extern void pgstat_count_slru_page_zeroed(int slru_idx); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 1ad5e6c50d..357068403a 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -223,7 +223,7 @@ extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); extern void ReplicationSlotsDropDBSlots(Oid dboid); extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno); -extern ReplicationSlot *SearchNamedReplicationSlot(const char *name); +extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock); extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot); extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 6399f3feef..4380577b7d 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2062,16 +2062,18 @@ pg_stat_replication| SELECT s.pid, JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); pg_stat_replication_slots| SELECT s.slot_name, + s.total_txns, + s.total_bytes, s.spill_txns, s.spill_count, s.spill_bytes, s.stream_txns, s.stream_count, s.stream_bytes, - s.total_txns, - s.total_bytes, s.stats_reset - FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset); + FROM pg_replication_slots r, + LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, total_txns, total_bytes, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset) + WHERE (r.datoid IS NOT NULL); pg_stat_slru| SELECT s.name, s.blks_zeroed, s.blks_hit, -- 2.24.3 (Apple Git-128)