From 38ccd93b4a70d65e16da3303e9f94851c3a3fb5a Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Sun, 8 Jun 2025 18:53:12 +0200 Subject: [PATCH v20250911 2/8] NUMA: clocksweep partitioning Similar to the freelist patch - partition the "clocksweep" algorithm to work on the sequence of smaller partitions, one by one. It extends the "pg_buffercache_partitions" view to include information about the clocksweep activity. Note: This needs some sort of "balancing" when one of the partitions is much busier than the rest (e.g. because there's a single backend consuming a lot of buffers from it). Note: There's a problem with some tests running out of unpinned buffers, due to (intentionally) setting shared buffers very low. That happens because StrategyGetBuffer() only searches a single partition, and it has a couple more issues. --- .../pg_buffercache--1.6--1.7.sql | 8 +- contrib/pg_buffercache/pg_buffercache_pages.c | 32 +- src/backend/storage/buffer/buf_init.c | 11 + src/backend/storage/buffer/bufmgr.c | 186 +++++---- src/backend/storage/buffer/freelist.c | 353 ++++++++++++++++-- src/include/storage/buf_internals.h | 5 +- src/include/storage/bufmgr.h | 5 + src/test/recovery/t/027_stream_regress.pl | 5 + src/tools/pgindent/typedefs.list | 1 + 9 files changed, 503 insertions(+), 103 deletions(-) diff --git a/contrib/pg_buffercache/pg_buffercache--1.6--1.7.sql b/contrib/pg_buffercache/pg_buffercache--1.6--1.7.sql index fb9003c011e..6676e807034 100644 --- a/contrib/pg_buffercache/pg_buffercache--1.6--1.7.sql +++ b/contrib/pg_buffercache/pg_buffercache--1.6--1.7.sql @@ -16,7 +16,13 @@ CREATE VIEW pg_buffercache_partitions AS numa_node integer, -- NUMA node of the partitioon num_buffers integer, -- number of buffers in the partition first_buffer integer, -- first buffer of partition - last_buffer integer); -- last buffer of partition + last_buffer integer, -- last buffer of partition + + -- clocksweep counters + num_passes bigint, -- clocksweep passes + next_buffer integer, -- next 
victim buffer for clocksweep + total_allocs bigint, -- handled allocs (running total) + num_allocs bigint); -- handled allocs (current cycle) -- Don't want these to be available to public. REVOKE ALL ON FUNCTION pg_buffercache_partitions() FROM PUBLIC; diff --git a/contrib/pg_buffercache/pg_buffercache_pages.c b/contrib/pg_buffercache/pg_buffercache_pages.c index 8a0a4bd5cd6..c9dfc8a1b82 100644 --- a/contrib/pg_buffercache/pg_buffercache_pages.c +++ b/contrib/pg_buffercache/pg_buffercache_pages.c @@ -27,7 +27,7 @@ #define NUM_BUFFERCACHE_EVICT_ALL_ELEM 3 #define NUM_BUFFERCACHE_NUMA_ELEM 3 -#define NUM_BUFFERCACHE_PARTITIONS_ELEM 5 +#define NUM_BUFFERCACHE_PARTITIONS_ELEM 9 PG_MODULE_MAGIC_EXT( .name = "pg_buffercache", @@ -818,6 +818,14 @@ pg_buffercache_partitions(PG_FUNCTION_ARGS) INT4OID, -1, 0); TupleDescInitEntry(tupledesc, (AttrNumber) 5, "last_buffer", INT4OID, -1, 0); + TupleDescInitEntry(tupledesc, (AttrNumber) 6, "num_passes", + INT8OID, -1, 0); + TupleDescInitEntry(tupledesc, (AttrNumber) 7, "next_buffer", + INT4OID, -1, 0); + TupleDescInitEntry(tupledesc, (AttrNumber) 8, "total_allocs", + INT8OID, -1, 0); + TupleDescInitEntry(tupledesc, (AttrNumber) 9, "num_allocs", + INT8OID, -1, 0); funcctx->user_fctx = BlessTupleDesc(tupledesc); @@ -839,12 +847,22 @@ pg_buffercache_partitions(PG_FUNCTION_ARGS) first_buffer, last_buffer; + uint64 buffer_total_allocs; + + uint32 complete_passes, + next_victim_buffer, + buffer_allocs; + Datum values[NUM_BUFFERCACHE_PARTITIONS_ELEM]; bool nulls[NUM_BUFFERCACHE_PARTITIONS_ELEM]; BufferPartitionGet(i, &numa_node, &num_buffers, &first_buffer, &last_buffer); + ClockSweepPartitionGetInfo(i, + &complete_passes, &next_victim_buffer, + &buffer_total_allocs, &buffer_allocs); + values[0] = Int32GetDatum(i); nulls[0] = false; @@ -860,6 +878,18 @@ pg_buffercache_partitions(PG_FUNCTION_ARGS) values[4] = Int32GetDatum(last_buffer); nulls[4] = false; + values[5] = Int64GetDatum(complete_passes); + nulls[5] = false; + + values[6] = 
Int32GetDatum(next_victim_buffer); + nulls[6] = false; + + values[7] = Int64GetDatum(buffer_total_allocs); + nulls[7] = false; + + values[8] = Int64GetDatum(buffer_allocs); + nulls[8] = false; + /* Build and return the tuple. */ tuple = heap_form_tuple((TupleDesc) funcctx->user_fctx, values, nulls); result = HeapTupleGetDatum(tuple); diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c index f51c7db7855..dd9f51529b4 100644 --- a/src/backend/storage/buffer/buf_init.c +++ b/src/backend/storage/buffer/buf_init.c @@ -716,6 +716,17 @@ BufferPartitionGet(int idx, int *node, int *num_buffers, elog(ERROR, "invalid partition index"); } +/* return parameters before the partitions are initialized (during sizing) */ +void +BufferPartitionParams(int *num_partitions, int *num_nodes) +{ + if (num_partitions) + *num_partitions = numa_partitions; + + if (num_nodes) + *num_nodes = numa_nodes; +} + /* XXX the GUC hooks should probably be somewhere else? */ bool check_debug_numa(char **newval, void **extra, GucSource source) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index fe470de63f2..121134bb94c 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -3580,33 +3580,29 @@ BufferSync(int flags) } /* - * BgBufferSync -- Write out some dirty buffers in the pool. + * Information saved between calls so we can determine the strategy + * point's advance rate and avoid scanning already-cleaned buffers. * - * This is called periodically by the background writer process. + * XXX One value per partition. We don't know how many partitions are + * there, so allocate 32, should be enough for the PoC patch. * - * Returns true if it's appropriate for the bgwriter process to go into - * low-power hibernation mode. 
(This happens if the strategy clock-sweep - * has been "lapped" and no buffer allocations have occurred recently, - * or if the bgwriter has been effectively disabled by setting - * bgwriter_lru_maxpages to 0.) + * XXX might be better to have a per-partition struct with all the info */ -bool -BgBufferSync(WritebackContext *wb_context) +#define MAX_CLOCKSWEEP_PARTITIONS 32 +static bool saved_info_valid = false; +static int prev_strategy_buf_id[MAX_CLOCKSWEEP_PARTITIONS]; +static uint32 prev_strategy_passes[MAX_CLOCKSWEEP_PARTITIONS]; +static int next_to_clean[MAX_CLOCKSWEEP_PARTITIONS]; +static uint32 next_passes[MAX_CLOCKSWEEP_PARTITIONS]; + + +static bool +BgBufferSyncPartition(WritebackContext *wb_context, int num_partitions, + int partition, int recent_alloc_partition) { /* info obtained from freelist.c */ int strategy_buf_id; uint32 strategy_passes; - uint32 recent_alloc; - - /* - * Information saved between calls so we can determine the strategy - * point's advance rate and avoid scanning already-cleaned buffers. - */ - static bool saved_info_valid = false; - static int prev_strategy_buf_id; - static uint32 prev_strategy_passes; - static int next_to_clean; - static uint32 next_passes; /* Moving averages of allocation rate and clean-buffer density */ static float smoothed_alloc = 0; @@ -3634,25 +3630,16 @@ BgBufferSync(WritebackContext *wb_context) long new_strategy_delta; uint32 new_recent_alloc; + /* buffer range for the clocksweep partition */ + int first_buffer; + int num_buffers; + /* * Find out where the clock-sweep currently is, and how many buffer * allocations have happened since our last call. */ - strategy_buf_id = StrategySyncStart(&strategy_passes, &recent_alloc); - - /* Report buffer alloc counts to pgstat */ - PendingBgWriterStats.buf_alloc += recent_alloc; - - /* - * If we're not running the LRU scan, just stop after doing the stats - * stuff. We mark the saved state invalid so that we can recover sanely - * if LRU scan is turned back on later. 
- */ - if (bgwriter_lru_maxpages <= 0) - { - saved_info_valid = false; - return true; - } + strategy_buf_id = StrategySyncStart(partition, &strategy_passes, + &first_buffer, &num_buffers); /* * Compute strategy_delta = how many buffers have been scanned by the @@ -3664,17 +3651,17 @@ BgBufferSync(WritebackContext *wb_context) */ if (saved_info_valid) { - int32 passes_delta = strategy_passes - prev_strategy_passes; + int32 passes_delta = strategy_passes - prev_strategy_passes[partition]; - strategy_delta = strategy_buf_id - prev_strategy_buf_id; - strategy_delta += (long) passes_delta * NBuffers; + strategy_delta = strategy_buf_id - prev_strategy_buf_id[partition]; + strategy_delta += (long) passes_delta * num_buffers; Assert(strategy_delta >= 0); - if ((int32) (next_passes - strategy_passes) > 0) + if ((int32) (next_passes[partition] - strategy_passes) > 0) { /* we're one pass ahead of the strategy point */ - bufs_to_lap = strategy_buf_id - next_to_clean; + bufs_to_lap = strategy_buf_id - next_to_clean[partition]; #ifdef BGW_DEBUG elog(DEBUG2, "bgwriter ahead: bgw %u-%u strategy %u-%u delta=%ld lap=%d", next_passes, next_to_clean, @@ -3682,11 +3669,11 @@ BgBufferSync(WritebackContext *wb_context) strategy_delta, bufs_to_lap); #endif } - else if (next_passes == strategy_passes && - next_to_clean >= strategy_buf_id) + else if (next_passes[partition] == strategy_passes && + next_to_clean[partition] >= strategy_buf_id) { /* on same pass, but ahead or at least not behind */ - bufs_to_lap = NBuffers - (next_to_clean - strategy_buf_id); + bufs_to_lap = num_buffers - (next_to_clean[partition] - strategy_buf_id); #ifdef BGW_DEBUG elog(DEBUG2, "bgwriter ahead: bgw %u-%u strategy %u-%u delta=%ld lap=%d", next_passes, next_to_clean, @@ -3706,9 +3693,9 @@ BgBufferSync(WritebackContext *wb_context) strategy_passes, strategy_buf_id, strategy_delta); #endif - next_to_clean = strategy_buf_id; - next_passes = strategy_passes; - bufs_to_lap = NBuffers; + next_to_clean[partition] = 
strategy_buf_id; + next_passes[partition] = strategy_passes; + bufs_to_lap = num_buffers; } } else @@ -3722,15 +3709,16 @@ BgBufferSync(WritebackContext *wb_context) strategy_passes, strategy_buf_id); #endif strategy_delta = 0; - next_to_clean = strategy_buf_id; - next_passes = strategy_passes; - bufs_to_lap = NBuffers; + next_to_clean[partition] = strategy_buf_id; + next_passes[partition] = strategy_passes; + bufs_to_lap = num_buffers; } /* Update saved info for next time */ - prev_strategy_buf_id = strategy_buf_id; - prev_strategy_passes = strategy_passes; - saved_info_valid = true; + prev_strategy_buf_id[partition] = strategy_buf_id; + prev_strategy_passes[partition] = strategy_passes; + /* XXX this needs to happen only after all partitions */ + /* saved_info_valid = true; */ /* * Compute how many buffers had to be scanned for each new allocation, ie, @@ -3738,9 +3726,9 @@ BgBufferSync(WritebackContext *wb_context) * * If the strategy point didn't move, we don't update the density estimate */ - if (strategy_delta > 0 && recent_alloc > 0) + if (strategy_delta > 0 && recent_alloc_partition > 0) { - scans_per_alloc = (float) strategy_delta / (float) recent_alloc; + scans_per_alloc = (float) strategy_delta / (float) recent_alloc_partition; smoothed_density += (scans_per_alloc - smoothed_density) / smoothing_samples; } @@ -3750,7 +3738,7 @@ BgBufferSync(WritebackContext *wb_context) * strategy point and where we've scanned ahead to, based on the smoothed * density estimate. */ - bufs_ahead = NBuffers - bufs_to_lap; + bufs_ahead = num_buffers - bufs_to_lap; reusable_buffers_est = (float) bufs_ahead / smoothed_density; /* @@ -3758,10 +3746,10 @@ BgBufferSync(WritebackContext *wb_context) * a true average we want a fast-attack, slow-decline behavior: we * immediately follow any increase. 
*/ - if (smoothed_alloc <= (float) recent_alloc) - smoothed_alloc = recent_alloc; + if (smoothed_alloc <= (float) recent_alloc_partition) + smoothed_alloc = recent_alloc_partition; else - smoothed_alloc += ((float) recent_alloc - smoothed_alloc) / + smoothed_alloc += ((float) recent_alloc_partition - smoothed_alloc) / smoothing_samples; /* Scale the estimate by a GUC to allow more aggressive tuning. */ @@ -3788,7 +3776,7 @@ BgBufferSync(WritebackContext *wb_context) * the BGW will be called during the scan_whole_pool time; slice the * buffer pool into that many sections. */ - min_scan_buffers = (int) (NBuffers / (scan_whole_pool_milliseconds / BgWriterDelay)); + min_scan_buffers = (int) (num_buffers / (scan_whole_pool_milliseconds / BgWriterDelay)); if (upcoming_alloc_est < (min_scan_buffers + reusable_buffers_est)) { @@ -3813,20 +3801,20 @@ BgBufferSync(WritebackContext *wb_context) /* Execute the LRU scan */ while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est) { - int sync_state = SyncOneBuffer(next_to_clean, true, + int sync_state = SyncOneBuffer(next_to_clean[partition], true, wb_context); - if (++next_to_clean >= NBuffers) + if (++next_to_clean[partition] >= (first_buffer + num_buffers)) { - next_to_clean = 0; - next_passes++; + next_to_clean[partition] = first_buffer; + next_passes[partition]++; } num_to_scan--; if (sync_state & BUF_WRITTEN) { reusable_buffers++; - if (++num_written >= bgwriter_lru_maxpages) + if (++num_written >= (bgwriter_lru_maxpages / num_partitions)) { PendingBgWriterStats.maxwritten_clean++; break; @@ -3840,7 +3828,7 @@ BgBufferSync(WritebackContext *wb_context) #ifdef BGW_DEBUG elog(DEBUG1, "bgwriter: recent_alloc=%u smoothed=%.2f delta=%ld ahead=%d density=%.2f reusable_est=%d upcoming_est=%d scanned=%d wrote=%d reusable=%d", - recent_alloc, smoothed_alloc, strategy_delta, bufs_ahead, + recent_alloc_partition, smoothed_alloc, strategy_delta, bufs_ahead, smoothed_density, reusable_buffers_est, upcoming_alloc_est, 
bufs_to_lap - num_to_scan, num_written, @@ -3870,8 +3858,74 @@ BgBufferSync(WritebackContext *wb_context) #endif } + /* can this partition hibernate */ + return (bufs_to_lap == 0 && recent_alloc_partition == 0); +} + +/* + * BgBufferSync -- Write out some dirty buffers in the pool. + * + * This is called periodically by the background writer process. + * + * Returns true if it's appropriate for the bgwriter process to go into + * low-power hibernation mode. (This happens if the strategy clock-sweep + * has been "lapped" and no buffer allocations have occurred recently, + * or if the bgwriter has been effectively disabled by setting + * bgwriter_lru_maxpages to 0.) + */ +bool +BgBufferSync(WritebackContext *wb_context) +{ + /* info obtained from freelist.c */ + uint32 recent_alloc; + uint32 recent_alloc_partition; + int num_partitions; + + /* assume we can hibernate, any partition can set to false */ + bool hibernate = true; + + /* get the number of clocksweep partitions, and total alloc count */ + StrategySyncPrepare(&num_partitions, &recent_alloc); + + Assert(num_partitions <= MAX_CLOCKSWEEP_PARTITIONS); + + /* Report buffer alloc counts to pgstat */ + PendingBgWriterStats.buf_alloc += recent_alloc; + + /* average alloc buffers per partition */ + recent_alloc_partition = (recent_alloc / num_partitions); + + /* + * If we're not running the LRU scan, just stop after doing the stats + * stuff. We mark the saved state invalid so that we can recover sanely + * if LRU scan is turned back on later. + */ + if (bgwriter_lru_maxpages <= 0) + { + saved_info_valid = false; + return true; + } + + /* + * now process the clocksweep partitions, one by one, using the same + * cleanup that we used for all buffers + * + * XXX Maybe we should randomize the order of partitions a bit, so that we + * don't start from partition 0 all the time? Perhaps not entirely, but at + * least pick a random starting point? 
+ */ + for (int partition = 0; partition < num_partitions; partition++) + { + /* hibernate if all partitions can hibernate */ + hibernate &= BgBufferSyncPartition(wb_context, num_partitions, + partition, recent_alloc_partition); + } + + /* now that we've scanned all partitions, mark the cached info as valid */ + saved_info_valid = true; + /* Return true if OK to hibernate */ - return (bufs_to_lap == 0 && recent_alloc == 0); + return hibernate; } /* diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c index 7d59a92bd1a..d5f8f28f562 100644 --- a/src/backend/storage/buffer/freelist.c +++ b/src/backend/storage/buffer/freelist.c @@ -15,27 +15,47 @@ */ #include "postgres.h" +#ifdef USE_LIBNUMA +#include +#endif + +#ifdef USE_LIBNUMA +#include +#include +#endif + #include "pgstat.h" #include "port/atomics.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" +#include "storage/ipc.h" #include "storage/proc.h" #define INT_ACCESS_ONCE(var) ((int)(*((volatile int *)&(var)))) /* - * The shared freelist control information. + * Information about one partition of the ClockSweep (on a subset of buffers). + * + * XXX Should be careful to align this to cachelines, etc. */ typedef struct { /* Spinlock: protects the values below */ - slock_t buffer_strategy_lock; + slock_t clock_sweep_lock; + + /* range for this clock weep partition */ + int32 firstBuffer; + int32 numBuffers; /* * clock-sweep hand: index of next buffer to consider grabbing. Note that * this isn't a concrete buffer - we only ever increase the value. So, to * get an actual buffer, it needs to be used modulo NBuffers. + * + * XXX This is relative to firstBuffer, so needs to be offset properly. 
+ * + * XXX firstBuffer + (nextVictimBuffer % numBuffers) */ pg_atomic_uint32 nextVictimBuffer; @@ -46,11 +66,34 @@ typedef struct uint32 completePasses; /* Complete cycles of the clock-sweep */ pg_atomic_uint32 numBufferAllocs; /* Buffers allocated since last reset */ + /* running total of allocs */ + pg_atomic_uint64 numTotalAllocs; + +} ClockSweep; + +/* + * The shared freelist control information. + */ +typedef struct +{ + /* Spinlock: protects the values below */ + slock_t buffer_strategy_lock; + /* * Bgworker process to be notified upon activity or -1 if none. See * StrategyNotifyBgWriter. */ int bgwprocno; + // the _attribute_ does not work on Windows, it seems + //int __attribute__((aligned(64))) bgwprocno; + + /* info about freelist partitioning */ + int num_nodes; /* effectively number of NUMA nodes */ + int num_partitions; + int num_partitions_per_node; + + /* clocksweep partitions */ + ClockSweep sweeps[FLEXIBLE_ARRAY_MEMBER]; } BufferStrategyControl; /* Pointers to shared state */ @@ -89,6 +132,7 @@ static BufferDesc *GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state); static void AddBufferToRing(BufferAccessStrategy strategy, BufferDesc *buf); +static ClockSweep *ChooseClockSweep(void); /* * ClockSweepTick - Helper routine for StrategyGetBuffer() @@ -100,6 +144,7 @@ static inline uint32 ClockSweepTick(void) { uint32 victim; + ClockSweep *sweep = ChooseClockSweep(); /* * Atomically move hand ahead one buffer - if there's several processes @@ -107,14 +152,14 @@ ClockSweepTick(void) * apparent order. 
*/ victim = - pg_atomic_fetch_add_u32(&StrategyControl->nextVictimBuffer, 1); + pg_atomic_fetch_add_u32(&sweep->nextVictimBuffer, 1); - if (victim >= NBuffers) + if (victim >= sweep->numBuffers) { uint32 originalVictim = victim; /* always wrap what we look up in BufferDescriptors */ - victim = victim % NBuffers; + victim = victim % sweep->numBuffers; /* * If we're the one that just caused a wraparound, force @@ -140,19 +185,117 @@ ClockSweepTick(void) * could lead to an overflow of nextVictimBuffers, but that's * highly unlikely and wouldn't be particularly harmful. */ - SpinLockAcquire(&StrategyControl->buffer_strategy_lock); + SpinLockAcquire(&sweep->clock_sweep_lock); - wrapped = expected % NBuffers; + wrapped = expected % sweep->numBuffers; - success = pg_atomic_compare_exchange_u32(&StrategyControl->nextVictimBuffer, + success = pg_atomic_compare_exchange_u32(&sweep->nextVictimBuffer, &expected, wrapped); if (success) - StrategyControl->completePasses++; - SpinLockRelease(&StrategyControl->buffer_strategy_lock); + sweep->completePasses++; + SpinLockRelease(&sweep->clock_sweep_lock); } } } - return victim; + + /* XXX buffer IDs are 1-based, we're calculating 0-based indexes */ + Assert(BufferIsValid(1 + sweep->firstBuffer + (victim % sweep->numBuffers))); + + return sweep->firstBuffer + victim; +} + +/* + * calculate_partition_index + * calculate the buffer / clock-sweep partition to use + * + * With libnuma, use the NUMA node and CPU to pick the partition. Otherwise + * use just PID instead of CPU (we assume everything is a single NUMA node). + */ +static int +calculate_partition_index(void) +{ + int cpu, + node, + index; + + /* + * The buffers are partitioned, so determine the CPU/NUMA node, and pick a + * partition based on that. + * + * Without NUMA assume everything is a single NUMA node, and we pick the + * partition based on PID (we may not have sched_getcpu). 
+ */ +#ifdef USE_LIBNUMA + cpu = sched_getcpu(); + + if (cpu < 0) + elog(ERROR, "sched_getcpu failed: %m"); + + node = numa_node_of_cpu(cpu); +#else + cpu = MyProcPid; + node = 0; +#endif + + Assert(StrategyControl->num_partitions == + (StrategyControl->num_nodes * StrategyControl->num_partitions_per_node)); + + /* + * XXX We should't get nodes that we haven't considered while building the + * partitions. Maybe if we allow this (e.g. due to support adjusting the + * NUMA stuff at runtime), we should just do our best to minimize the + * conflicts somehow. But it'll make the mapping harder, so for now we + * ignore it. + */ + if (node > StrategyControl->num_nodes) + elog(ERROR, "node out of range: %d > %u", cpu, StrategyControl->num_nodes); + + /* + * Find the partition. If we have a single partition per node, we can + * calculate the index directly from node. Otherwise we need to do two + * steps, using node and then cpu. + */ + if (StrategyControl->num_partitions_per_node == 1) + { + /* fast-path */ + index = (node % StrategyControl->num_partitions); + } + else + { + int index_group, + index_part; + + /* two steps - calculate group from node, partition from cpu */ + index_group = (node % StrategyControl->num_nodes); + index_part = (cpu % StrategyControl->num_partitions_per_node); + + index = (index_group * StrategyControl->num_partitions_per_node) + + index_part; + } + + return index; +} + +/* + * ChooseClockSweep + * pick a clocksweep partition based on NUMA node and CPU + * + * The number of clocksweep partitions may not match the number of NUMA + * nodes, but it should not be lower. Each partition should be mapped to + * a single NUMA node, but a node may have multiple partitions. If there + * are multiple partitions per node (all nodes have the same number of + * partitions), we pick the partition using CPU. + * + * XXX Maybe we should do both the total and "per group" counts a power of + * two? 
That'd allow using shifts instead of divisions in the calculation, + * and that's cheaper. But how would that deal with odd number of nodes? + */ +static ClockSweep * +ChooseClockSweep(void) +{ + int index = calculate_partition_index(); + + return &StrategyControl->sweeps[index]; } /* @@ -222,9 +365,35 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r * the rate of buffer consumption. Note that buffers recycled by a * strategy object are intentionally not counted here. */ - pg_atomic_fetch_add_u32(&StrategyControl->numBufferAllocs, 1); + pg_atomic_fetch_add_u32(&ChooseClockSweep()->numBufferAllocs, 1); - /* Use the "clock sweep" algorithm to find a free buffer */ + /* + * Use the "clock sweep" algorithm to find a free buffer + * + * XXX Note that ClockSweepTick() is NUMA-aware, i.e. it only looks at + * buffers from a single partition, aligned with the NUMA node. That means + * it only accesses buffers from the same NUMA node. + * + * XXX That also means each process "sweeps" only a fraction of buffers, + * even if the other buffers are better candidates for eviction. Maybe + * there should be some logic to "steal" buffers from other freelists or + * other nodes? + * + * XXX Would that also mean we'd have multiple bgwriters, one for each + * node, or would one bgwriter handle all of that? + * + * XXX This only searches a single partition, which can result in "no + * unpinned buffers available" even if there are buffers in other + * partitions. Should be fixed by falling back to other partitions if + * needed. + * + * XXX Also, the trycounter should not be set to NBuffers, but to buffer + * count for that one partition. In fact, this should not call ClockSweepTick + * for every iteration. The call is likely quite expensive (does a lot + * of stuff), and also may return a different partition on each call. + * We should just do it once, and then do the for(;;) loop. 
And then + * maybe advance to the next partition, until we scan through all of them. + */ trycounter = NBuffers; for (;;) { @@ -269,6 +438,46 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r } } +/* + * StrategySyncPrepare -- prepare for sync of all partitions + * + * Determine the number of clocksweep partitions, and calculate the recent + * buffers allocs (as a sum of all the partitions). This allows BgBufferSync + * to calculate average number of allocations per partition for the next + * sync cycle. + * + * In addition it returns the count of recent buffer allocs, which is a total + * summed from all partitions. The alloc counts are reset after being read, + * as the partitions are walked. + */ +void +StrategySyncPrepare(int *num_parts, uint32 *num_buf_alloc) +{ + *num_buf_alloc = 0; + *num_parts = StrategyControl->num_partitions; + + /* + * We lock the partitions one by one, so not exacly in sync, but that + * should be fine. We're only looking for heuristics anyway. + */ + for (int i = 0; i < StrategyControl->num_partitions; i++) + { + ClockSweep *sweep = &StrategyControl->sweeps[i]; + + SpinLockAcquire(&sweep->clock_sweep_lock); + if (num_buf_alloc) + { + uint32 allocs = pg_atomic_exchange_u32(&sweep->numBufferAllocs, 0); + + /* include the count in the running total */ + pg_atomic_fetch_add_u64(&sweep->numTotalAllocs, allocs); + + *num_buf_alloc += allocs; + } + SpinLockRelease(&sweep->clock_sweep_lock); + } +} + /* * StrategySyncStart -- tell BgBufferSync where to start syncing * @@ -276,37 +485,44 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r * BgBufferSync() will proceed circularly around the buffer array from there. * * In addition, we return the completed-pass count (which is effectively - * the higher-order bits of nextVictimBuffer) and the count of recent buffer - * allocs if non-NULL pointers are passed. The alloc count is reset after - * being read. 
+ * the higher-order bits of nextVictimBuffer). + * + * This only considers a single clocksweep partition, as BgBufferSync looks + * at them one by one. */ int -StrategySyncStart(uint32 *complete_passes, uint32 *num_buf_alloc) +StrategySyncStart(int partition, uint32 *complete_passes, + int *first_buffer, int *num_buffers) { uint32 nextVictimBuffer; int result; + ClockSweep *sweep = &StrategyControl->sweeps[partition]; - SpinLockAcquire(&StrategyControl->buffer_strategy_lock); - nextVictimBuffer = pg_atomic_read_u32(&StrategyControl->nextVictimBuffer); - result = nextVictimBuffer % NBuffers; + Assert((partition >= 0) && (partition < StrategyControl->num_partitions)); + + SpinLockAcquire(&sweep->clock_sweep_lock); + nextVictimBuffer = pg_atomic_read_u32(&sweep->nextVictimBuffer); + result = nextVictimBuffer % sweep->numBuffers; + + *first_buffer = sweep->firstBuffer; + *num_buffers = sweep->numBuffers; if (complete_passes) { - *complete_passes = StrategyControl->completePasses; + *complete_passes = sweep->completePasses; /* * Additionally add the number of wraparounds that happened before * completePasses could be incremented. C.f. ClockSweepTick(). */ - *complete_passes += nextVictimBuffer / NBuffers; + *complete_passes += nextVictimBuffer / sweep->numBuffers; } + SpinLockRelease(&sweep->clock_sweep_lock); - if (num_buf_alloc) - { - *num_buf_alloc = pg_atomic_exchange_u32(&StrategyControl->numBufferAllocs, 0); - } - SpinLockRelease(&StrategyControl->buffer_strategy_lock); - return result; + /* XXX buffer IDs start at 1, we're calculating 0-based indexes */ + Assert(BufferIsValid(1 + sweep->firstBuffer + result)); + + return sweep->firstBuffer + result; } /* @@ -343,6 +559,9 @@ Size StrategyShmemSize(void) { Size size = 0; + int num_partitions; + + BufferPartitionParams(&num_partitions, NULL); /* size of lookup hash table ... 
see comment in StrategyInitialize */ size = add_size(size, BufTableShmemSize(NBuffers + NUM_BUFFER_PARTITIONS)); @@ -350,6 +569,10 @@ StrategyShmemSize(void) /* size of the shared replacement strategy control block */ size = add_size(size, MAXALIGN(sizeof(BufferStrategyControl))); + /* size of clocksweep partitions (at least one per NUMA node) */ + size = add_size(size, MAXALIGN(mul_size(sizeof(ClockSweep), + num_partitions))); + return size; } @@ -365,6 +588,18 @@ StrategyInitialize(bool init) { bool found; + int num_nodes; + int num_partitions; + int num_partitions_per_node; + + num_partitions = BufferPartitionCount(); + num_nodes = BufferPartitionNodes(); + + /* always a multiple of NUMA nodes */ + Assert(num_partitions % num_nodes == 0); + + num_partitions_per_node = (num_partitions / num_nodes); + /* * Initialize the shared buffer lookup hashtable. * @@ -382,7 +617,8 @@ StrategyInitialize(bool init) */ StrategyControl = (BufferStrategyControl *) ShmemInitStruct("Buffer Strategy Status", - sizeof(BufferStrategyControl), + MAXALIGN(offsetof(BufferStrategyControl, sweeps)) + + MAXALIGN(sizeof(ClockSweep) * num_partitions), &found); if (!found) @@ -394,15 +630,44 @@ StrategyInitialize(bool init) SpinLockInit(&StrategyControl->buffer_strategy_lock); - /* Initialize the clock-sweep pointer */ - pg_atomic_init_u32(&StrategyControl->nextVictimBuffer, 0); + /* Initialize the clock sweep pointers (for all partitions) */ + for (int i = 0; i < num_partitions; i++) + { + int node, + num_buffers, + first_buffer, + last_buffer; + + SpinLockInit(&StrategyControl->sweeps[i].clock_sweep_lock); + + pg_atomic_init_u32(&StrategyControl->sweeps[i].nextVictimBuffer, 0); - /* Clear statistics */ - StrategyControl->completePasses = 0; - pg_atomic_init_u32(&StrategyControl->numBufferAllocs, 0); + /* get info about the buffer partition */ + BufferPartitionGet(i, &node, &num_buffers, + &first_buffer, &last_buffer); + + /* + * FIXME This may not quite right, because if NBuffers is not a + 
* perfect multiple of numBuffers, the last partition will have + * numBuffers set too high. buf_init handles this by tracking the + * remaining number of buffers, and not overflowing. + */ + StrategyControl->sweeps[i].numBuffers = num_buffers; + StrategyControl->sweeps[i].firstBuffer = first_buffer; + + /* Clear statistics */ + StrategyControl->sweeps[i].completePasses = 0; + pg_atomic_init_u32(&StrategyControl->sweeps[i].numBufferAllocs, 0); + pg_atomic_init_u64(&StrategyControl->sweeps[i].numTotalAllocs, 0); + } /* No pending notification */ StrategyControl->bgwprocno = -1; + + /* initialize the partitioned clocksweep */ + StrategyControl->num_partitions = num_partitions; + StrategyControl->num_nodes = num_nodes; + StrategyControl->num_partitions_per_node = num_partitions_per_node; } else Assert(!init); @@ -739,3 +1004,23 @@ StrategyRejectBuffer(BufferAccessStrategy strategy, BufferDesc *buf, bool from_r return true; } + +void +ClockSweepPartitionGetInfo(int idx, + uint32 *complete_passes, uint32 *next_victim_buffer, + uint64 *buffer_total_allocs, uint32 *buffer_allocs) +{ + ClockSweep *sweep = &StrategyControl->sweeps[idx]; + + Assert((idx >= 0) && (idx < StrategyControl->num_partitions)); + + /* get the clocksweep stats */ + *complete_passes = sweep->completePasses; + *next_victim_buffer = pg_atomic_read_u32(&sweep->nextVictimBuffer); + + *buffer_allocs = pg_atomic_read_u32(&sweep->numBufferAllocs); + *buffer_total_allocs = pg_atomic_read_u64(&sweep->numTotalAllocs); + + /* calculate the actual buffer ID */ + *next_victim_buffer = sweep->firstBuffer + (*next_victim_buffer % sweep->numBuffers); +} diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index 294188e21c5..5cce690933b 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -439,7 +439,9 @@ extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy, extern bool StrategyRejectBuffer(BufferAccessStrategy strategy, BufferDesc 
*buf, bool from_ring); -extern int StrategySyncStart(uint32 *complete_passes, uint32 *num_buf_alloc); +extern void StrategySyncPrepare(int *num_parts, uint32 *num_buf_alloc); +extern int StrategySyncStart(int partition, uint32 *complete_passes, + int *first_buffer, int *num_buffers); extern void StrategyNotifyBgWriter(int bgwprocno); extern Size StrategyShmemSize(void); @@ -485,5 +487,6 @@ extern int BufferPartitionCount(void); extern int BufferPartitionNodes(void); extern void BufferPartitionGet(int idx, int *node, int *num_buffers, int *first_buffer, int *last_buffer); +extern void BufferPartitionParams(int *num_partitions, int *num_nodes); #endif /* BUFMGR_INTERNALS_H */ diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index e52fca9e483..9ade69e53b5 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -355,6 +355,11 @@ extern int GetAccessStrategyBufferCount(BufferAccessStrategy strategy); extern int GetAccessStrategyPinLimit(BufferAccessStrategy strategy); extern void FreeAccessStrategy(BufferAccessStrategy strategy); +extern void ClockSweepPartitionGetInfo(int idx, + uint32 *complete_passes, + uint32 *next_victim_buffer, + uint64 *buffer_total_allocs, + uint32 *buffer_allocs); /* inline functions */ diff --git a/src/test/recovery/t/027_stream_regress.pl b/src/test/recovery/t/027_stream_regress.pl index 589c79d97d3..98b146ed4b7 100644 --- a/src/test/recovery/t/027_stream_regress.pl +++ b/src/test/recovery/t/027_stream_regress.pl @@ -18,6 +18,11 @@ $node_primary->adjust_conf('postgresql.conf', 'max_connections', '25'); $node_primary->append_conf('postgresql.conf', 'max_prepared_transactions = 10'); +# The default is 1MB, which is not enough with clock-sweep partitioning. +# Increase to 32MB, so that we don't get "no unpinned buffers". +$node_primary->append_conf('postgresql.conf', + 'shared_buffers = 32MB'); + # Enable pg_stat_statements to force tests to do query jumbling. 
# pg_stat_statements.max should be large enough to hold all the entries # of the regression database. diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 50195718294..b68f75b7f31 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -427,6 +427,7 @@ ClientCertName ClientConnectionInfo ClientData ClientSocket +ClockSweep ClonePtrType ClosePortalStmt ClosePtrType -- 2.51.0