From 922b056074f24e7eb6df971f69b166fc476e38d3 Mon Sep 17 00:00:00 2001
From: Greg Burd
Date: Tue, 26 Aug 2025 13:40:19 -0400
Subject: [PATCH v14 3/3] Track buffer usage per-backend and use that to
 inform buffer management

Implement a comprehensive buffer pressure monitoring and management
system to improve PostgreSQL's buffer replacement efficiency under high
load.

Problems:
- Buffer pressure was only detectable reactively during allocation
  failures
- No visibility into which backends contribute most to buffer contention
- High usage counts force multiple clock-sweep passes under heavy load
- bgwriter had limited intelligence about system-wide buffer usage
  patterns

Solution:
- Add per-backend buffer usage counters
- Implement proactive buffer pressure calculation in bgwriter
- Add targeted buffer writing for high-usage backends (90th percentile)
- Adjust rate of usage count reduction when pressure exceeds 75%
  threshold

Benefits:
- Reduces buffer allocation stalls by proactively managing pressure
- Provides fair resource management by targeting high-usage backends
- Improves system responsiveness under memory pressure
- Maintains backward compatibility with existing buffer management
- Enables better observability of buffer usage patterns per backend
---
 src/backend/postmaster/bgwriter.c     | 320 ++++++++++++++++++++++++++
 src/backend/storage/buffer/buf_init.c |   3 +
 src/backend/storage/buffer/bufmgr.c   |   9 +
 src/backend/storage/buffer/freelist.c |  70 +++++-
 src/backend/storage/lmgr/proc.c       |   8 +
 src/include/storage/buf_internals.h   |   2 +
 src/include/storage/proc.h            |  10 +
 src/tools/pgindent/typedefs.list      |   2 +
 8 files changed, 420 insertions(+), 4 deletions(-)

diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index 72f5acceec7..3d44374f5ab 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -77,6 +77,283 @@ int			BgWriterDelay = 200;
 static TimestampTz last_snapshot_ts;
 static XLogRecPtr last_snapshot_lsn = InvalidXLogRecPtr;
 
+/*
+ * Collected buffer usage information.
+ */
+typedef struct BackendBufferStats
+{
+	int			backend_id;
+	uint64		usage_sum;
+	double		usage_ratio;
+} BackendBufferStats;
+
+static int
+compare_backend_usage(const void *a, const void *b)
+{
+	const BackendBufferStats *stat_a = (const BackendBufferStats *) a;
+	const BackendBufferStats *stat_b = (const BackendBufferStats *) b;
+
+	if (stat_a->usage_ratio < stat_b->usage_ratio)
+		return -1;
+	if (stat_a->usage_ratio > stat_b->usage_ratio)
+		return 1;
+	return 0;
+}
+
+static uint64
+CalculateSystemBufferPressure(BackendBufferStats *backend_stats[], int *num_backends)
+{
+	uint64		total_usage = 0;
+	int			active_backends = 0;
+	BackendBufferStats *stats;
+
+	/* Count active backends first */
+	for (int i = 0; i < ProcGlobal->allProcCount; i++)
+	{
+		PGPROC	   *proc = &ProcGlobal->allProcs[i];
+
+		if (proc->pid != 0 && proc->databaseId != InvalidOid)
+			active_backends++;
+	}
+
+	if (active_backends == 0)
+	{
+		*backend_stats = NULL;
+		*num_backends = 0;
+		return 0;
+	}
+
+	/* Allocate stats array */
+	stats = palloc(sizeof(BackendBufferStats) * active_backends);
+	*backend_stats = stats;
+	*num_backends = active_backends;
+
+	/* Collect stats from all active backends */
+	for (int i = 0, j = 0; i < ProcGlobal->allProcCount; i++)
+	{
+		PGPROC	   *proc = &ProcGlobal->allProcs[i];
+
+		if (proc->pid != 0 && proc->databaseId != InvalidOid)
+		{
+			uint64		usage_sum = pg_atomic_read_u32(&proc->bufferUsageSum);
+
+			stats[j].backend_id = i;
+			stats[j].usage_sum = usage_sum;
+			stats[j].usage_ratio = (double) usage_sum / NBuffers;
+			total_usage += usage_sum;
+			j++;
+		}
+	}
+
+	/* Sort by usage ratio for percentile calculation */
+	qsort(stats, active_backends, sizeof(BackendBufferStats),
+		  compare_backend_usage);
+
+	return total_usage;
+}
+
+static void
+GetHighUsageBackends(BackendBufferStats *stats, int num_backends,
+					 int **high_usage_backends, int *num_high_usage)
+{
+	int			percentile_90_idx = (int) (num_backends * 0.9);
+
+	*num_high_usage = num_backends - percentile_90_idx;
+
+	if (*num_high_usage > 0)
+	{
+		*high_usage_backends = palloc(sizeof(int) * (*num_high_usage));
+		for (int i = 0; i < *num_high_usage; i++)
+			(*high_usage_backends)[i] = stats[percentile_90_idx + i].backend_id;
+	}
+	else
+	{
+		*high_usage_backends = NULL;
+		*num_high_usage = 0;
+	}
+}
+
+/*
+ * Shared buffer sync function used by both main loop and aggressive writing
+ */
+static int
+SyncTargetedBuffers(WritebackContext *wb_context, int *target_backends,
+					int num_targets, int max_buffers)
+{
+	int			buffers_written = 0;
+	int			buffer_id;
+	BufferDesc *bufHdr;
+	uint32		buf_state;
+
+	/* If no specific targets, sync any dirty buffers */
+	if (target_backends == NULL || num_targets == 0)
+		return BgBufferSync(wb_context);
+
+	/* Scan through all buffers looking for dirty ones from target backends */
+	for (buffer_id = 0; buffer_id < NBuffers && buffers_written < max_buffers; buffer_id++)
+	{
+		uint32		dirty_backend;
+		bool		is_target;
+
+		bufHdr = GetBufferDescriptor(buffer_id);
+
+		/* Quick check if buffer is dirty */
+		buf_state = pg_atomic_read_u32(&bufHdr->state);
+		if (!(buf_state & BM_DIRTY))
+			continue;
+
+		/* Check if this buffer is from one of our target backends */
+		dirty_backend = pg_atomic_read_u32(&bufHdr->dirty_backend_id);
+		is_target = false;
+
+		for (int i = 0; i < num_targets; i++)
+			if (dirty_backend == target_backends[i])
+			{
+				is_target = true;
+				break;
+			}
+
+		if (!is_target)
+			continue;
+
+		/* Skip if buffer is pinned */
+		if (BUF_STATE_GET_REFCOUNT(buf_state) > 0)
+			continue;
+
+		/* Try to write this buffer using the writeback context */
+		ScheduleBufferTagForWriteback(wb_context,
+									  IOContextForStrategy(NULL),
+									  &bufHdr->tag);
+		buffers_written++;
+	}
+
+	/* Issue the actual writes */
+	if (buffers_written > 0)
+		IssuePendingWritebacks(wb_context, IOContextForStrategy(NULL));
+
+	return buffers_written;
+}
+
+static void
+AggressiveBufferWrite(WritebackContext *wb_context, int *high_usage_backends,
+					  int num_high_usage, bool critical)
+{
+	int			write_target = critical ? bgwriter_lru_maxpages * 3 : bgwriter_lru_maxpages * 2;
+	int			buffers_written = 0;
+
+	/* Focus on buffers from high-usage backends first */
+	buffers_written = SyncTargetedBuffers(wb_context, high_usage_backends,
+										  num_high_usage, write_target);
+
+	/* If still under target, write additional dirty buffers */
+	if (buffers_written < write_target)
+		BgBufferSync(wb_context);
+}
+
+/* In src/backend/postmaster/bgwriter.c - Enhanced UpdateBackendDecayRates */
+static void
+UpdateBackendDecayRates(BackendBufferStats *backend_stats, int num_backends,
+						double pressure_ratio, int *high_usage_backends, int num_high_usage)
+{
+	uint32		base_decay_rate;
+	uint64		total_usage = 0;
+	uint64		avg_usage;
+	int			i,
+				j;
+
+	/* Calculate base decay rate from system pressure */
+	if (pressure_ratio > 0.90)
+		/* Critical pressure - aggressive decay */
+		base_decay_rate = 3;
+	else if (pressure_ratio > 0.75)
+		/* High pressure */
+		base_decay_rate = 2;
+	else
+		/* Normal decay rate */
+		base_decay_rate = 1;
+
+	/* Calculate total usage for relative comparisons */
+	for (i = 0; i < num_backends; i++)
+		total_usage += backend_stats[i].usage_sum;
+	avg_usage = num_backends > 0 ? total_usage / num_backends : 0;
+
+	if (base_decay_rate > 1)
+		elog(DEBUG2, "Buffer pressure: %.2f%%, base decay rate: %u, avg usage: %llu",
+			 pressure_ratio * 100, base_decay_rate, (unsigned long long) avg_usage);
+
+	/* Update each backend's personalized decay rate */
+	for (i = 0; i < ProcGlobal->allProcCount; i++)
+	{
+		PGPROC	   *proc = &ProcGlobal->allProcs[i];
+
+		/* Only update active user backends */
+		if (proc->pid != 0 && proc->databaseId != InvalidOid)
+		{
+			uint32		backend_usage = pg_atomic_read_u32(&proc->bufferUsageSum);
+			uint32		personalized_rate = base_decay_rate;
+
+			/* Find this backend in the stats array */
+			BackendBufferStats *backend_stat = NULL;
+
+			for (j = 0; j < num_backends; j++)
+			{
+				if (backend_stats[j].backend_id == i)
+				{
+					backend_stat = &backend_stats[j];
+					break;
+				}
+			}
+
+			/*
+			 * Calculate personalized decay rate based on usage and
+			 * clock-sweep performance.
+			 */
+			if (backend_stat != NULL && avg_usage > 0)
+			{
+				double		usage_ratio = (double) backend_usage / avg_usage;
+
+				/* Get clock-sweep performance metrics */
+				uint32		search_count = pg_atomic_read_u32(&proc->bufferSearchCount);
+				uint64		total_distance = pg_atomic_read_u64(&proc->clockSweepDistance);
+				uint32		total_passes = pg_atomic_read_u32(&proc->clockSweepPasses);
+				uint64		total_time = pg_atomic_read_u64(&proc->clockSweepTimeMicros);
+
+				/* Calculate average search metrics */
+				double		avg_distance = search_count > 0 ? (double) total_distance / search_count : 0;
+				double		avg_passes = search_count > 0 ? (double) total_passes / search_count : 0;
+				double		avg_time = search_count > 0 ? (double) total_time / search_count : 0;
+
+				/* Adjust decay rate based on usage relative to average */
+				if (usage_ratio > 2.0)
+					/* High usage backends get more aggressive decay */
+					personalized_rate = Min(4, base_decay_rate + 2);
+				else if (usage_ratio > 1.5)
+					personalized_rate = Min(4, base_decay_rate + 1);
+				else if (usage_ratio < 0.5)
+					/* Low usage backends get less aggressive decay */
+					personalized_rate = Max(1, base_decay_rate > 1 ? base_decay_rate - 1 : 1);
+
+				/* Further adjust based on clock-sweep performance */
+				if (avg_distance > NBuffers * 0.5)
+					/* Searching more than half the buffer pool */
+					personalized_rate = Min(4, personalized_rate + 1);
+				if (avg_passes > 1.0)
+					/* Making multiple complete passes */
+					personalized_rate = Min(4, personalized_rate + 1);
+				if (avg_time > 1000.0)
+					/* Taking more than 1ms per search */
+					personalized_rate = Min(4, personalized_rate + 1);
+
+				elog(DEBUG2, "Backend %d: usage_ratio=%.2f, avg_distance=%.1f, avg_passes=%.2f, "
+					 "avg_time=%.1f us, decay_rate=%u",
+					 i, usage_ratio, avg_distance, avg_passes, avg_time, personalized_rate);
+			}
+
+			/* Update the backend's decay rate */
+			pg_atomic_write_u32(&proc->bufferDecayRate, personalized_rate);
+		}
+	}
+}
 
 /*
  * Main entry point for bgwriter process
@@ -222,6 +499,15 @@ BackgroundWriterMain(const void *startup_data, size_t startup_data_len)
 	 */
 	for (;;)
 	{
+		BackendBufferStats *backend_stats = NULL;
+		int			num_backends;
+		int		   *high_usage_backends = NULL;
+		int			num_high_usage;
+		uint64		max_possible;
+		uint64		total_usage;
+		double		pressure_ratio;
+		bool		high_pressure;
+		bool		critical_pressure;
 		bool		can_hibernate;
 		int			rc;
 
@@ -230,6 +516,35 @@ BackgroundWriterMain(const void *startup_data, size_t startup_data_len)
 
 		ProcessMainLoopInterrupts();
 
+		/* Calculate current buffer pressure */
+		total_usage = CalculateSystemBufferPressure(&backend_stats, &num_backends);
+		max_possible = (uint64) NBuffers * BM_MAX_USAGE_COUNT;
+		total_usage = total_usage > max_possible ? max_possible : total_usage;
+		pressure_ratio = (double) total_usage / max_possible;
+
+		/* Get high-usage backends (90th percentile) */
+		if (backend_stats != NULL)
+			GetHighUsageBackends(backend_stats, num_backends,
+								 &high_usage_backends, &num_high_usage);
+
+		/* Update global decay rate based on current pressure */
+		UpdateBackendDecayRates(backend_stats, num_backends, pressure_ratio,
+								high_usage_backends, num_high_usage);
+
+		/* Determine if proactive action is needed */
+		high_pressure = pressure_ratio > 0.75;	/* 75% threshold */
+		critical_pressure = pressure_ratio > 0.90;	/* 90% threshold */
+
+		if (high_pressure)
+		{
+			elog(LOG, "%s buffer pressure detected: %.2f%% (%d high-usage backends)",
+				 critical_pressure ? "Critical" : "High",
+				 pressure_ratio * 100, num_high_usage);
+
+			/* Aggressive writing of dirty buffers */
+			AggressiveBufferWrite(&wb_context, high_usage_backends, num_high_usage, critical_pressure);
+		}
+
 		/*
 		 * Do one cycle of dirty-buffer writing.
 		 */
@@ -294,6 +609,11 @@ BackgroundWriterMain(const void *startup_data, size_t startup_data_len)
 			}
 		}
 
+		if (backend_stats != NULL)
+			pfree(backend_stats);
+		if (high_usage_backends != NULL)
+			pfree(high_usage_backends);
+
 		/*
 		 * Sleep until we are signaled or BgWriterDelay has elapsed.
 		 *
diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c
index 6fd3a6bbac5..dfc5e1f5696 100644
--- a/src/backend/storage/buffer/buf_init.c
+++ b/src/backend/storage/buffer/buf_init.c
@@ -124,6 +124,9 @@ BufferManagerShmemInit(void)
 			pg_atomic_init_u32(&buf->state, 0);
 			buf->wait_backend_pgprocno = INVALID_PROC_NUMBER;
 
+			/* Initialize dirty backend tracking */
+			pg_atomic_init_u32(&buf->dirty_backend_id, INVALID_PROC_NUMBER);
+
 			buf->buf_id = i;
 
 			pgaio_wref_clear(&buf->io_wref);
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 719a5bb6f97..ffa52acfbeb 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -2136,6 +2136,8 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 	 * just like permanent relations.
 	 */
 	victim_buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
+	if (MyProc != NULL)
+		pg_atomic_add_fetch_u32(&MyProc->bufferUsageSum, 1);
 	if (relpersistence == RELPERSISTENCE_PERMANENT || forkNum == INIT_FORKNUM)
 		victim_buf_state |= BM_PERMANENT;
 
@@ -2781,6 +2783,8 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 		victim_buf_hdr->tag = tag;
 
 		buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
+		if (MyProc != NULL)
+			pg_atomic_add_fetch_u32(&MyProc->bufferUsageSum, 1);
 		if (bmr.relpersistence == RELPERSISTENCE_PERMANENT || fork == INIT_FORKNUM)
 			buf_state |= BM_PERMANENT;
 
@@ -2950,6 +2954,11 @@ MarkBufferDirty(Buffer buffer)
 		Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
 		buf_state |= BM_DIRTY | BM_JUST_DIRTIED;
 
+		/* Track which backend dirtied this buffer */
+		if (MyProc != NULL)
+			pg_atomic_write_u32(&bufHdr->dirty_backend_id,
+								MyProc - ProcGlobal->allProcs);
+
 		if (pg_atomic_compare_exchange_u32(&bufHdr->state, &old_buf_state,
 										   buf_state))
 			break;
diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c
index 7d59a92bd1a..7a7b8b1ab4e 100644
--- a/src/backend/storage/buffer/freelist.c
+++ b/src/backend/storage/buffer/freelist.c
@@ -81,7 +81,7 @@ typedef struct BufferAccessStrategyData
 	 * struct.
 	 */
 	Buffer		buffers[FLEXIBLE_ARRAY_MEMBER];
-} BufferAccessStrategyData;
+} BufferAccessStrategyData;
 
 
 /* Prototypes for internal functions */
@@ -174,6 +174,14 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r
 	int			bgwprocno;
 	int			trycounter;
 	uint32		local_buf_state;	/* to avoid repeated (de-)referencing */
+	uint32		backend_decay_rate;
+
+	/* Clock-sweep performance tracking */
+	instr_time	start_time,
+				end_time;
+	uint64		buffers_examined = 0;
+	uint32		complete_passes = 0;
+	uint32		initial_clock_hand;
 
 	*from_ring = false;
 
@@ -191,6 +199,18 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r
 		}
 	}
 
+	initial_clock_hand = pg_atomic_read_u32(&StrategyControl->nextVictimBuffer);
+	if (initial_clock_hand >= NBuffers)
+		initial_clock_hand %= NBuffers;
+
+	/* Start timing the buffer search */
+	INSTR_TIME_SET_CURRENT(start_time);
+
+	/* Get this backend's personalized decay rate */
+	backend_decay_rate = pg_atomic_read_u32(&MyProc->bufferDecayRate);
+	if (backend_decay_rate == 0)
+		backend_decay_rate = 1;
+
 	/*
 	 * If asked, we need to waken the bgwriter. Since we don't want to rely on
 	 * a spinlock for this we force a read from shared memory once, and then
@@ -228,7 +248,10 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r
 	trycounter = NBuffers;
 	for (;;)
 	{
-		buf = GetBufferDescriptor(ClockSweepTick());
+		uint32		hand = ClockSweepTick();
+
+		buf = GetBufferDescriptor(hand);
+		buffers_examined++;
 
 		/*
 		 * If the buffer is pinned or has a nonzero usage_count, we cannot use
@@ -238,18 +261,53 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r
 
 		if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0)
 		{
-			if (BUF_STATE_GET_USAGECOUNT(local_buf_state) != 0)
+			uint32		current_usage = BUF_STATE_GET_USAGECOUNT(local_buf_state);
+
+			if (current_usage != 0)
 			{
-				local_buf_state -= BUF_USAGECOUNT_ONE;
+				uint32		current_sum;
+				uint32		new_sum;
+				uint32		decay_amount = Min(current_usage, backend_decay_rate);
+
+				local_buf_state -= decay_amount * BUF_USAGECOUNT_ONE;
+
+				do
+				{
+					current_sum = pg_atomic_read_u32(&MyProc->bufferUsageSum);
+					if (current_sum < decay_amount)
+						new_sum = 0;
+					else
+						new_sum = current_sum - decay_amount;
+				} while (!pg_atomic_compare_exchange_u32(&MyProc->bufferUsageSum,
+														 &current_sum, new_sum));
 
 				trycounter = NBuffers;
 			}
 			else
 			{
+				uint64		search_time_micros;
+
+				INSTR_TIME_SET_CURRENT(end_time);
+				INSTR_TIME_SUBTRACT(end_time, start_time);
+				search_time_micros = INSTR_TIME_GET_MICROSEC(end_time);
+
+				/* Update this backend's clock-sweep performance metrics */
+				pg_atomic_add_fetch_u64(&MyProc->clockSweepDistance, buffers_examined);
+				pg_atomic_add_fetch_u32(&MyProc->clockSweepPasses, complete_passes);
+				pg_atomic_add_fetch_u64(&MyProc->clockSweepTimeMicros, search_time_micros);
+				pg_atomic_add_fetch_u32(&MyProc->bufferSearchCount, 1);
+
+				elog(DEBUG2, "Buffer search completed: examined=%llu, passes=%u, time=%llu us, decay_rate=%u",
+					 (unsigned long long) buffers_examined, complete_passes, (unsigned long long) search_time_micros, backend_decay_rate);
+
 				/* Found a usable buffer */
 				if (strategy != NULL)
 					AddBufferToRing(strategy, buf);
 				*buf_state = local_buf_state;
+
+				pg_atomic_add_fetch_u32(&MyProc->bufferUsageSum, 1);
+
 				return buf;
 			}
 		}
@@ -266,6 +324,9 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r
 			elog(ERROR, "no unpinned buffers available");
 		}
 		UnlockBufHdr(buf, local_buf_state);
+
+		if (buffers_examined > 1 && hand == initial_clock_hand)
+			complete_passes++;
 	}
 }
 
@@ -305,6 +366,7 @@ StrategySyncStart(uint32 *complete_passes, uint32 *num_buf_alloc)
 	{
 		*num_buf_alloc = pg_atomic_exchange_u32(&StrategyControl->numBufferAllocs, 0);
 	}
+
 	SpinLockRelease(&StrategyControl->buffer_strategy_lock);
 	return result;
 }
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index e9ef0fbfe32..fdb2554e3f5 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -528,6 +528,14 @@ InitProcess(void)
 	MyProc->clogGroupMemberLsn = InvalidXLogRecPtr;
 	Assert(pg_atomic_read_u32(&MyProc->clogGroupNext) == INVALID_PROC_NUMBER);
 
+	/* Initialize buffer usage tracking */
+	pg_atomic_init_u32(&MyProc->bufferUsageSum, 0);
+	pg_atomic_init_u32(&MyProc->bufferDecayRate, 1);
+	pg_atomic_init_u64(&MyProc->clockSweepDistance, 0);
+	pg_atomic_init_u32(&MyProc->clockSweepPasses, 0);
+	pg_atomic_init_u64(&MyProc->clockSweepTimeMicros, 0);
+	pg_atomic_init_u32(&MyProc->bufferSearchCount, 0);
+
 	/*
 	 * Acquire ownership of the PGPROC's latch, so that we can use WaitLatch
 	 * on it. That allows us to repoint the process latch, which so far
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 9fcc94ef02d..ac87bd90afd 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -266,6 +266,8 @@ typedef struct BufferDesc
 	PgAioWaitRef io_wref;		/* set iff AIO is in progress */
 
 	LWLock		content_lock;	/* to lock access to buffer contents */
+
+	pg_atomic_uint32 dirty_backend_id;	/* backend ID that last dirtied this buffer */
 } BufferDesc;
 
 /*
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index c6f5ebceefd..e5daaf99276 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -247,6 +247,16 @@ struct PGPROC
 	uint8		lwWaitMode;		/* lwlock mode being waited for */
 	proclist_node lwWaitLink;	/* position in LW lock wait list */
 
+	/* Per-backend buffer usage tracking */
+	pg_atomic_uint32 bufferUsageSum;	/* Running total of buffer usage */
+	pg_atomic_uint32 bufferDecayRate;	/* Per-tick usage decay rate */
+
+	/* Clock-sweep performance metrics */
+	pg_atomic_uint64 clockSweepDistance;	/* Total buffers examined */
+	pg_atomic_uint32 clockSweepPasses;	/* Complete clock passes */
+	pg_atomic_uint64 clockSweepTimeMicros;	/* Total time in microseconds */
+	pg_atomic_uint32 bufferSearchCount; /* Number of buffer searches */
+
 	/* Support for condition variables. */
 	proclist_node cvWaitLink;	/* position in CV wait list */
 
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index a13e8162890..518f7aa3a92 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -234,6 +234,7 @@ BTWriteState
 BUF_MEM
 BYTE
 BY_HANDLE_FILE_INFORMATION
+BackendBufferStats
 BackendParameters
 BackendStartupData
 BackendState
@@ -336,6 +337,7 @@ Bucket
 BufFile
 Buffer
 BufferAccessStrategy
+BufferAccessStrategyData
 BufferAccessStrategyType
 BufferCacheNumaContext
 BufferCacheNumaRec
-- 
2.49.0