From 36b69e2028c8febb35ca2c45327384251c44b775 Mon Sep 17 00:00:00 2001 From: Alexandre Felipe Date: Wed, 11 Mar 2026 12:05:50 +0000 Subject: [PATCH 1/5] Benchmark buffer pinning --- src/test/modules/test_buffer_pin/Makefile | 18 + src/test/modules/test_buffer_pin/benchmark.py | 185 ++++++++++ .../modules/test_buffer_pin/requirements.txt | 9 + .../test_buffer_pin/test_buffer_pin--1.0.sql | 66 ++++ .../modules/test_buffer_pin/test_buffer_pin.c | 323 ++++++++++++++++++ .../test_buffer_pin/test_buffer_pin.control | 4 + 6 files changed, 605 insertions(+) create mode 100644 src/test/modules/test_buffer_pin/Makefile create mode 100755 src/test/modules/test_buffer_pin/benchmark.py create mode 100644 src/test/modules/test_buffer_pin/requirements.txt create mode 100644 src/test/modules/test_buffer_pin/test_buffer_pin--1.0.sql create mode 100644 src/test/modules/test_buffer_pin/test_buffer_pin.c create mode 100644 src/test/modules/test_buffer_pin/test_buffer_pin.control diff --git a/src/test/modules/test_buffer_pin/Makefile b/src/test/modules/test_buffer_pin/Makefile new file mode 100644 index 0000000000..2707f84c5b --- /dev/null +++ b/src/test/modules/test_buffer_pin/Makefile @@ -0,0 +1,18 @@ +MODULE_big = test_buffer_pin +OBJS = test_buffer_pin.o + +EXTENSION = test_buffer_pin +DATA = test_buffer_pin--1.0.sql + +PGFILEDESC = "test_buffer_pin - buffer pinning benchmarks" + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/test_buffer_pin +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/test_buffer_pin/benchmark.py b/src/test/modules/test_buffer_pin/benchmark.py new file mode 100755 index 0000000000..004044bc5c --- /dev/null +++ b/src/test/modules/test_buffer_pin/benchmark.py @@ -0,0 +1,185 @@ +#!/usr/bin/env python3 +""" +Buffer pinning benchmark - runs SQL benchmarks and saves results. 
+ +Usage: + python3 benchmark.py [options] + +Examples: + python3 benchmark.py --port 5434 --name pg18-baseline --title "PostgreSQL 18 Baseline" + python3 benchmark.py --port 5434 --name patch-0005 --title "With Patch 0005" +""" + +import argparse +import numpy as np +import pandas as pd +from sqlalchemy import create_engine, text +import matplotlib +matplotlib.use('Agg') +import matplotlib.pyplot as plt +from pathlib import Path + + +def geomspace_int(start, stop, num): + """Generate geometrically spaced integers, pushing duplicates forward.""" + raw = np.geomspace(start, stop, num) + result = [] + for v in raw: + candidate = max(int(round(v)), result[-1] + 1 if result else 1) + result.append(candidate) + return result + + +def ensure_test_table(conn, table_name, min_blocks): + """Create a table with enough blocks for the benchmark.""" + rows_needed = min_blocks * 226 + 10000 + print(f"Creating table {table_name} with ~{min_blocks} blocks...") + conn.execute(text(f"DROP TABLE IF EXISTS {table_name}")) + conn.execute(text(f"CREATE UNLOGGED TABLE {table_name} AS SELECT generate_series(1, {rows_needed}) AS id")) + result = conn.execute(text(f"SELECT pg_relation_size('{table_name}') / 8192 as blocks")).fetchone() + assert result[0] >= min_blocks, f"Table {table_name} has {result[0]} blocks, expected at least {min_blocks}" + +def run_benchmark(conn, func_template, buffer_counts): + """Run a benchmark function across all buffer counts and patterns in one query.""" + array_str = str(buffer_counts) + sql = text(f""" + SELECT + CASE WHEN r.random THEN 'random' ELSE 'sequential' END as pattern, + b.num_buffers, + percentile_cont(0.5) WITHIN GROUP (ORDER BY bench.per_op_ns) as median_ns + FROM + unnest(ARRAY[false, true]) AS r(random), + unnest(ARRAY[{array_str}]) AS b(num_buffers), + LATERAL {func_template} AS bench + WHERE bench.iteration > 0 + GROUP BY r.random, b.num_buffers + ORDER BY r.random, b.num_buffers + """) + return [dict(row._mapping) for row in 
conn.execute(sql)] + + +def save_results(results_dir, name, results_dict, title): + """Save benchmark results to CSV and generate SVG plot.""" + results_dir = Path(results_dir) + results_dir.mkdir(parents=True, exist_ok=True) + + all_data = [] + for bench_name, data in results_dict.items(): + df = pd.DataFrame(data) + df['benchmark'] = bench_name + all_data.append(df) + + combined_df = pd.concat(all_data, ignore_index=True) + + csv_path = results_dir / f"{name}.csv" + combined_df.to_csv(csv_path, index=False) + print(f"Saved CSV: {csv_path}") + return combined_df + + +def plot_results(results_dir, name, results_dict, title): + """Generate SVG plot from benchmark results.""" + results_dir = Path(results_dir) + + fig, ax = plt.subplots(figsize=(10, 6)) + + benchmarks = [ + ('read', 'blue', 'o', 'ReadBuffer/ReleaseBuffer'), + ('pinning', 'green', 's', 'IncrBufferRefCount/ReleaseBuffer'), + ('locking', 'purple', 'd', 'LockBuffer/UnlockBuffer'), + ('resowner', 'red', '^', 'ResourceOwner Remember/Forget'), + ] + + for bench_name, color, marker, label in benchmarks: + if bench_name not in results_dict: + continue + df = pd.DataFrame(results_dict[bench_name]) + + seq_data = df[df['pattern'] == 'sequential'] + ax.plot(seq_data['num_buffers'], seq_data['median_ns'], + color=color, linestyle='-', marker=marker, + linewidth=2, markersize=6, label=f'{label} (seq)') + + rand_data = df[df['pattern'] == 'random'] + ax.plot(rand_data['num_buffers'], rand_data['median_ns'], + color=color, linestyle='--', marker=marker, + linewidth=2, markersize=6, label=f'{label} (rand)') + + ax.set_xlabel('Number of Buffers (sliding window)') + ax.set_ylabel('Time per operation (ns)') + ax.set_title(title) + ax.set_ylim(0, None) + ax.grid(True, alpha=0.3) + ax.set_xscale('log', base=2) + ax.legend(fontsize=8, loc='upper left') + + plt.tight_layout() + svg_path = results_dir / f"{name}.svg" + plt.savefig(svg_path, format='svg') + plt.close() + print(f"Saved SVG: {svg_path}") + + +def main(): + 
parser = argparse.ArgumentParser(description='Buffer pinning benchmark') + parser.add_argument('--iterations', '-n', type=int, default=10000, + help='Number of operations per sample (default: 10000)') + parser.add_argument('--samples', type=int, default=50, + help='Number of samples per data point (default: 50)') + parser.add_argument('--port', type=int, default=5432, + help='PostgreSQL port (default: 5432)') + parser.add_argument('--points', type=int, default=50, + help='Number of data points (default: 50)') + parser.add_argument('--max-dist', type=int, default=500, + help='Maximum buffer count to test (default: 500)') + parser.add_argument('--name', default='benchmark', + help='Name for output files (default: benchmark)') + parser.add_argument('--title', default='PostgreSQL Buffer Pinning Performance', + help='Title for the plot') + parser.add_argument('--results-dir', default=None, + help='Directory for results (default: ./results)') + parser.add_argument('--table', default='bench_large', + help='Table name to use for benchmarks (default: bench_large)') + args = parser.parse_args() + + if args.results_dir is None: + args.results_dir = Path(__file__).parent / 'results' + + engine = create_engine(f'postgresql://localhost:{args.port}/postgres') + buffer_counts = geomspace_int(1, args.max_dist, args.points) + + min_blocks_needed = args.max_dist + 100 + + with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn: + conn.execute(text("CREATE EXTENSION IF NOT EXISTS test_buffer_pin")) + ensure_test_table(conn, args.table, min_blocks_needed) + + + n, s = args.iterations, args.samples + print(f"\nRunning benchmarks: {n} iterations, {s} samples, max {args.max_dist} buffers") + + results = {} + + print(" Running: bench_pinning...") + results['pinning'] = run_benchmark(conn, + f"bench_pinning('{args.table}', b.num_buffers, {n}, {s}, r.random)", + buffer_counts) + + print(" Running: bench_prefetch_pipeline...") + results['read'] = run_benchmark(conn, +
f"bench_prefetch_pipeline('{args.table}', b.num_buffers, {n}, {s}, r.random)", + buffer_counts) + + print(" Running: bench_resowner...") + results['resowner'] = run_benchmark(conn, + f"bench_resowner(b.num_buffers, {n}, {s}, r.random)", + buffer_counts) + + save_results(args.results_dir, args.name, results, args.title) + plot_results(args.results_dir, args.name, results, args.title) + + print(f"\nDone! Results saved to {args.results_dir}/{args.name}.*") + + +if __name__ == '__main__': + main() diff --git a/src/test/modules/test_buffer_pin/requirements.txt b/src/test/modules/test_buffer_pin/requirements.txt new file mode 100644 index 0000000000..e73ab56927 --- /dev/null +++ b/src/test/modules/test_buffer_pin/requirements.txt @@ -0,0 +1,9 @@ +# Requirements for benchmark.py +# pip install -r requirements.txt +# not enforcing versions, as it might simply +# work with your installed versions +numpy # tested with 1.24+ +pandas # tested with 2.0+ +sqlalchemy # tested with 2.0+ +matplotlib # tested with 3.7+ +psycopg2-binary # tested with 2.9+ diff --git a/src/test/modules/test_buffer_pin/test_buffer_pin--1.0.sql b/src/test/modules/test_buffer_pin/test_buffer_pin--1.0.sql new file mode 100644 index 0000000000..4c43c41496 --- /dev/null +++ b/src/test/modules/test_buffer_pin/test_buffer_pin--1.0.sql @@ -0,0 +1,66 @@ +/* test_buffer_pin--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION test_buffer_pin" to load this file. \quit + +CREATE FUNCTION bench_prefetch_pipeline( + relname text, + prefetch_dist int, + num_ops int, + iterations int, + random_access bool +) +RETURNS TABLE(iteration int, total_ns bigint, per_op_ns float8) +AS 'MODULE_PATHNAME', 'bench_prefetch_pipeline' +LANGUAGE C STRICT; + +COMMENT ON FUNCTION bench_prefetch_pipeline IS +'Benchmark buffer pinning with sliding window prefetch simulation. 
+Arguments: + relname - name of the relation to use + prefetch_dist - number of buffers to keep pinned (sliding window size) + num_ops - number of pin/unpin operations to perform + iterations - number of times to repeat the benchmark + random_access - if true, access blocks randomly; if false, sequentially +'; + +CREATE FUNCTION bench_pinning( + relname text, + num_buffers int, + num_ops int, + iterations int, + random_access bool +) +RETURNS TABLE(iteration int, total_ns bigint, per_op_ns float8) +AS 'MODULE_PATHNAME', 'bench_pinning' +LANGUAGE C STRICT; + +COMMENT ON FUNCTION bench_pinning IS +'Benchmark pure pin/unpin operations without buffer lookup. +Uses IncrBufferRefCount/ReleaseBuffer on pre-pinned buffers to isolate +refcount tracking overhead from buffer table lookups and I/O. +Arguments: + relname - name of the relation to use for pinning buffers + num_buffers - number of buffers to keep as base pins + num_ops - number of pin/unpin pairs to perform + iterations - number of times to repeat the benchmark + random_access - if true, access buffers randomly; if false, sequentially'; + +CREATE FUNCTION bench_resowner( + num_buffers int, + num_ops int, + iterations int, + random_access bool +) +RETURNS TABLE(iteration int, total_ns bigint, per_op_ns float8) +AS 'MODULE_PATHNAME', 'bench_resowner' +LANGUAGE C STRICT; + +COMMENT ON FUNCTION bench_resowner IS +'Benchmark ResourceOwner remember/forget operations only. +Uses fake resource values - no actual resources are tracked. 
+Arguments: + num_buffers - number of fake resources to track + num_ops - number of remember/forget pairs to perform + iterations - number of times to repeat the benchmark + random_access - if true, access resources randomly; if false, sequentially'; diff --git a/src/test/modules/test_buffer_pin/test_buffer_pin.c b/src/test/modules/test_buffer_pin/test_buffer_pin.c new file mode 100644 index 0000000000..7b6c91678f --- /dev/null +++ b/src/test/modules/test_buffer_pin/test_buffer_pin.c @@ -0,0 +1,323 @@ +/* + * test_buffer_pin.c - Buffer pinning benchmark + */ + +#include "postgres.h" + +#include "access/heapam.h" +#include "catalog/namespace.h" +#include "common/pg_prng.h" +#include "fmgr.h" +#include "funcapi.h" +#include "miscadmin.h" +#include "portability/instr_time.h" +#include "storage/buf_internals.h" +#include "storage/bufmgr.h" +#include "utils/builtins.h" +#include "utils/rel.h" +#include "utils/resowner.h" +#include "utils/varlena.h" + +PG_MODULE_MAGIC; + +PG_FUNCTION_INFO_V1(bench_prefetch_pipeline); +PG_FUNCTION_INFO_V1(bench_pinning); +PG_FUNCTION_INFO_V1(bench_resowner); + +/* Custom ResourceOwnerDesc for benchmark - does nothing on release */ +static void bench_release_resource(Datum res) { /* no-op */ } + +static const ResourceOwnerDesc bench_resowner_desc = { + .name = "BenchmarkResource", + .release_phase = RESOURCE_RELEASE_AFTER_LOCKS, + .release_priority = RELEASE_PRIO_FIRST, + .ReleaseResource = bench_release_resource, + .DebugPrint = NULL, +}; + +/* + * Generate an access sequence for benchmarking. + * If random_access is true, uses Fisher-Yates shuffle. + * Otherwise, generates sequential pattern modulo num_items. 
+ */ +static void +generate_access_sequence(int *sequence, int num_operations, int num_items, bool random_access) +{ + for (int i = 0; i < num_operations; i++) + sequence[i] = i % num_items; + + if (random_access) + { + for (int i = num_operations - 1; i > 0; i--) + { + int j = pg_prng_uint64_range(&pg_global_prng_state, 0, i); + int tmp = sequence[i]; + sequence[i] = sequence[j]; + sequence[j] = tmp; + } + } +} + + +/* + * bench_prefetch_pipeline - benchmark ReadBuffer/ReleaseBuffer in a sliding window + * + * Warms up the cache by reading the blocks that will be accessed. + * Precomputes a block sequence in which the buffers will be read. + * Then repeatedly times a scan following the block sequence, + * keeping a fixed distance of `num_buffers` pinned (plus ramp up and ramp down) +*/ +Datum +bench_prefetch_pipeline(PG_FUNCTION_ARGS) +{ + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + text *relname_text = PG_GETARG_TEXT_PP(0); + int num_buffers = PG_GETARG_INT32(1); + int num_operations = PG_GETARG_INT32(2); + int iterations = PG_GETARG_INT32(3); + bool random_access = PG_GETARG_BOOL(4); + + Oid relid; + Relation rel; + BlockNumber nblocks; + Buffer *pipeline; + int *block_sequence; + int iter; + + Tuplestorestate *tupstore; + TupleDesc tupdesc; + Datum values[3]; + bool nulls[3] = {false, false, false}; + + if (num_buffers < 1 || num_operations < num_buffers) + ereport(ERROR, (errmsg("invalid parameters"))); + + InitMaterializedSRF(fcinfo, 0); + tupstore = rsinfo->setResult; + tupdesc = rsinfo->setDesc; + + relid = RangeVarGetRelid(makeRangeVarFromNameList(textToQualifiedNameList(relname_text)), AccessShareLock, false); + rel = relation_open(relid, AccessShareLock); + nblocks = RelationGetNumberOfBlocks(rel); + + if (nblocks == 0) + ereport(ERROR, (errmsg("relation has no blocks"))); + + pipeline = palloc0(num_buffers * sizeof(Buffer)); + block_sequence = palloc(num_operations * sizeof(int)); + + /* Warm up */ + for (int i = 0; i < num_operations && i < (int) nblocks; i++) + { + Buffer
buf = ReadBuffer(rel, i); + ReleaseBuffer(buf); + } + generate_access_sequence(block_sequence, num_operations, nblocks, random_access); + + for (iter = 0; iter < iterations; iter++) + { + instr_time start_time, end_time; + int64 elapsed_ns; + + INSTR_TIME_SET_CURRENT(start_time); + + for (int i = 0; i < num_operations + num_buffers; i++) + { + if (i >= num_buffers) + ReleaseBuffer(pipeline[(i - num_buffers) % num_buffers]); + if (i < num_operations) + pipeline[i % num_buffers] = ReadBuffer(rel, block_sequence[i]); + } + + INSTR_TIME_SET_CURRENT(end_time); + INSTR_TIME_SUBTRACT(end_time, start_time); + elapsed_ns = INSTR_TIME_GET_NANOSEC(end_time); + + values[0] = Int32GetDatum(iter); + values[1] = Int64GetDatum(elapsed_ns); + values[2] = Float8GetDatum((double) elapsed_ns / num_operations); + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + } + + pfree(pipeline); + pfree(block_sequence); + relation_close(rel, AccessShareLock); + + return (Datum) 0; +} + +/* + * bench_pinning - benchmark pure pin/unpin operations + * + * Pre-reads buffers into cache, then times IncrBufferRefCount/ReleaseBuffer + * cycles in a pipelined fashion. 
This is intended to separate the + * pin count dependent part of the ReadBuffer/ReleaseBuffer operations + */ +Datum +bench_pinning(PG_FUNCTION_ARGS) +{ + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + text *relname_text = PG_GETARG_TEXT_PP(0); + int num_buffers = PG_GETARG_INT32(1); + int num_operations = PG_GETARG_INT32(2); + int iterations = PG_GETARG_INT32(3); + bool random_access = PG_GETARG_BOOL(4); + + Oid relid; + Relation rel; + BlockNumber nblocks; + Buffer *base_buffers; + Buffer *pipeline; + int *access_sequence; + int iter; + + Tuplestorestate *tupstore; + TupleDesc tupdesc; + Datum values[3]; + bool nulls[3] = {false, false, false}; + + if (num_buffers < 1 || num_operations < num_buffers) + ereport(ERROR, (errmsg("invalid parameters"))); + + InitMaterializedSRF(fcinfo, 0); + tupstore = rsinfo->setResult; + tupdesc = rsinfo->setDesc; + + relid = RangeVarGetRelid(makeRangeVarFromNameList(textToQualifiedNameList(relname_text)), AccessShareLock, false); + rel = relation_open(relid, AccessShareLock); + nblocks = RelationGetNumberOfBlocks(rel); + + if ((BlockNumber) num_buffers > nblocks) + ereport(ERROR, (errmsg("not enough blocks in relation"))); + + base_buffers = palloc(num_buffers * sizeof(Buffer)); + pipeline = palloc(num_buffers * sizeof(Buffer)); + access_sequence = palloc(num_operations * sizeof(int)); + + generate_access_sequence(access_sequence, num_operations, num_buffers, random_access); + + /* Pin the buffers as base pins (keeps them in cache) */ + for (int i = 0; i < num_buffers; i++) + base_buffers[i] = ReadBuffer(rel, i); + + for (iter = 0; iter < iterations; iter++) + { + instr_time start_time, end_time; + int64 elapsed_ns; + + INSTR_TIME_SET_CURRENT(start_time); + + for (int i = 0; i < num_operations + num_buffers; i++) + { + if (i >= num_buffers) + ReleaseBuffer(pipeline[(i - num_buffers) % num_buffers]); + if (i < num_operations) + { + Buffer buf = base_buffers[access_sequence[i]]; + IncrBufferRefCount(buf); + 
pipeline[i % num_buffers] = buf; + } + } + + INSTR_TIME_SET_CURRENT(end_time); + INSTR_TIME_SUBTRACT(end_time, start_time); + elapsed_ns = INSTR_TIME_GET_NANOSEC(end_time); + + values[0] = Int32GetDatum(iter); + values[1] = Int64GetDatum(elapsed_ns); + values[2] = Float8GetDatum((double) elapsed_ns / num_operations); + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + } + + /* Release base pins */ + for (int i = 0; i < num_buffers; i++) + ReleaseBuffer(base_buffers[i]); + + pfree(access_sequence); + pfree(pipeline); + pfree(base_buffers); + relation_close(rel, AccessShareLock); + + return (Datum) 0; +} + +/* + * bench_resowner - benchmark ResourceOwner remember/forget operations + * + * Uses fake resource values to test pure ResourceOwner tracking overhead + * without any actual resource operations. Uses same pipelined structure. + */ +Datum +bench_resowner(PG_FUNCTION_ARGS) +{ + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + int num_buffers = PG_GETARG_INT32(0); + int num_operations = PG_GETARG_INT32(1); + int iterations = PG_GETARG_INT32(2); + bool random_access = PG_GETARG_BOOL(3); + + Datum *resources; + Datum *pipeline; + int *access_sequence; + int iter; + + Tuplestorestate *tupstore; + TupleDesc tupdesc; + Datum values[3]; + bool nulls[3] = {false, false, false}; + + if (num_buffers < 1 || num_operations < num_buffers) + ereport(ERROR, (errmsg("invalid parameters"))); + + InitMaterializedSRF(fcinfo, 0); + tupstore = rsinfo->setResult; + tupdesc = rsinfo->setDesc; + + resources = palloc(num_buffers * sizeof(Datum)); + pipeline = palloc(num_buffers * sizeof(Datum)); + access_sequence = palloc(num_operations * sizeof(int)); + + /* Use fake resource values (pointers to array elements for uniqueness) */ + for (int i = 0; i < num_buffers; i++) + resources[i] = PointerGetDatum(&resources[i]); + + generate_access_sequence(access_sequence, num_operations, num_buffers, random_access); + + for (iter = 0; iter < iterations; iter++) + { + 
instr_time start_time, end_time; + int64 elapsed_ns; + + INSTR_TIME_SET_CURRENT(start_time); + + for (int i = 0; i < num_operations + num_buffers; i++) + { + if (i >= num_buffers) + ResourceOwnerForget(CurrentResourceOwner, + pipeline[(i - num_buffers) % num_buffers], + &bench_resowner_desc); + if (i < num_operations) + { + Datum res = resources[access_sequence[i]]; + ResourceOwnerEnlarge(CurrentResourceOwner); + ResourceOwnerRemember(CurrentResourceOwner, res, &bench_resowner_desc); + pipeline[i % num_buffers] = res; + } + } + + INSTR_TIME_SET_CURRENT(end_time); + INSTR_TIME_SUBTRACT(end_time, start_time); + elapsed_ns = INSTR_TIME_GET_NANOSEC(end_time); + + values[0] = Int32GetDatum(iter); + values[1] = Int64GetDatum(elapsed_ns); + values[2] = Float8GetDatum((double) elapsed_ns / num_operations); + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + } + + pfree(access_sequence); + pfree(pipeline); + pfree(resources); + + return (Datum) 0; +} diff --git a/src/test/modules/test_buffer_pin/test_buffer_pin.control b/src/test/modules/test_buffer_pin/test_buffer_pin.control new file mode 100644 index 0000000000..f065941712 --- /dev/null +++ b/src/test/modules/test_buffer_pin/test_buffer_pin.control @@ -0,0 +1,4 @@ +comment = 'test_buffer_pin - buffer pinning benchmarks' +default_version = '1.0' +module_pathname = '$libdir/test_buffer_pin' +relocatable = true -- 2.34.1