#!/usr/bin/env perl
#
# fuzz_cmin.pl — Fuzzer for tuplecid cmin mismatch in logical decoding
#
# Demonstrates a bug in ReorderBufferBuildTupleCidHash where catalog
# TID reuse after savepoint rollback causes an Assert failure (cassert
# builds) or silent catalog visibility corruption (production builds).
#
# WHAT IT DOES
#
#   1. Initializes a fresh PostgreSQL cluster with wal_level=logical.
#   2. Creates a database, a logical replication slot (test_decoding),
#      and 30 seed tables to populate catalog pages.
#   3. Runs up to N iterations.  Each iteration:
#      a. Generates a random transaction with CREATE TABLE, DROP TABLE,
#         SAVEPOINT, ROLLBACK TO SAVEPOINT, and ALTER TABLE operations.
#      b. Commits the transaction.
#      c. Calls pg_logical_slot_get_changes() to trigger logical decoding.
#   4. If the server crashes (Assert or PANIC), reports the iteration
#      number and the path of that iteration's SQL file.  (Every
#      iteration's SQL is written to fuzz_cases/ before it is run.)
#
# WHAT TO EXPECT
#
#   On an UNPATCHED --enable-cassert build, the fuzzer typically triggers
#   within 300-1000 iterations (15-60 seconds) with output like:
#
#     !!! CMIN MISMATCH on iteration 389 !!!
#     Reproducer: fuzz_cases/case_389.sql
#     Server log extract:
#     TRAP: failed Assert("ent->cmin == change->data.tuplecid.cmin"), ...
#
#   On a PATCHED build, it runs to completion with no crashes.
#
#   On a production (non-assert) build, the bug is silent — no crash,
#   but the logical decoder may produce incorrect output.
#
# REQUIREMENTS
#
#   - A PostgreSQL installation (--enable-cassert recommended).
#     Set PGINSTALL to the installation prefix, e.g.:
#       export PGINSTALL=/usr/local/pgsql
#     The directory must contain bin/initdb, bin/pg_ctl, bin/psql.
#   - The test_decoding contrib module must be installed.
#   - No other PostgreSQL instance using port 54329 (or set PGPORT).
#
# USAGE
#
#   perl fuzz_cmin.pl [max_iterations]
#
#   Default is 100000 iterations.  The fuzzer stops early on crash.
#   Data directory and test cases are written to fuzz_data/ and
#   fuzz_cases/ in the current directory.
#
# ENVIRONMENT VARIABLES
#
#   PGINSTALL   PostgreSQL installation prefix (default: /usr/local/pgsql)
#   PGPORT      Port number (default: 54329)
#

use strict;
use warnings;
use File::Path qw(rmtree);
use POSIX qw(WIFEXITED WEXITSTATUS);

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

my $max_iterations = $ARGV[0] || 100_000;
my $pg_install     = $ENV{PGINSTALL} || '/usr/local/pgsql';
my $port           = $ENV{PGPORT}    || 54329;
my $base_dir       = 'fuzz_data';
my $cases_dir      = 'fuzz_cases';
my $dbname         = 'fuzzdb';
my $slot_name      = 'fuzz_slot';

my $pg_ctl  = "$pg_install/bin/pg_ctl";
my $initdb  = "$pg_install/bin/initdb";
my $psql    = "$pg_install/bin/psql";

die "PGINSTALL=$pg_install does not contain bin/initdb\n"
    unless -x $initdb;

# Tunable ranges for test case generation
my $min_ops_per_txn     = 10;
my $max_ops_per_txn     = 80;
my $min_cols_per_table  = 5;
my $max_cols_per_table  = 25;
my $max_live_tables     = 30;
my $savepoint_prob      = 0.15;
my $rollback_prob       = 0.40;
my $alter_add_prob      = 0.10;
my $alter_drop_prob     = 0.05;

my @col_types = (
    ('integer')  x 5,
    ('bigint')   x 3,
    ('text')     x 4,
    ('boolean')  x 3,
    ('real')     x 2,
    'double precision',
    'numeric',
    'date',
    'timestamp',
    'bytea',
    'varchar(100)',
    'char(10)',
    'smallint',
    'oid',
    'name',
);

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

sub pick { return $_[int(rand(@_))] }

sub run_psql {
    my ($db, $sql, %opts) = @_;
    my $timeout = $opts{timeout} || 30;

    my $tmpfile = "/tmp/fuzz_cmin_$$.sql";
    open(my $fh, '>', $tmpfile) or die "Cannot write $tmpfile: $!\n";
    print $fh $sql;
    close($fh);

    my $cmd = "$psql -X -h /tmp -p $port -d $db -f $tmpfile 2>&1";
    my $output;
    eval {
        local $SIG{ALRM} = sub { die "timeout\n" };
        alarm($timeout);
        $output = `$cmd`;
        alarm(0);
    };
    my $exit_status = $?;
    alarm(0);

    unlink($tmpfile);

    if ($@ && $@ eq "timeout\n") {
        return (undef, "TIMEOUT after ${timeout}s");
    }

    my $ok = WIFEXITED($exit_status) && WEXITSTATUS($exit_status) == 0;
    return ($ok, $output);
}

sub server_alive {
    my ($ok) = run_psql('postgres', "SELECT 1;", timeout => 10);
    return $ok;
}

sub start_server {
    print "Starting server on port $port...\n";
    system("$pg_ctl -D $base_dir -l $base_dir/logfile -w start "
         . "-o '-p $port -k /tmp' >/dev/null 2>&1");
    die "Failed to start server\n" unless server_alive();
    print "Server running.\n";
}

sub stop_server {
    system("$pg_ctl -D $base_dir -m fast stop >/dev/null 2>&1");
}

sub check_log_for_panic {
    open(my $fh, '<', "$base_dir/logfile") or return "";
    my @matches;
    while (<$fh>) {
        push @matches, $_ if /tuplecid cmin mismatch|matching entry #|PANIC|TRAP/;
    }
    close($fh);
    return join("", @matches);
}

# ---------------------------------------------------------------------------
# Setup
# ---------------------------------------------------------------------------

print "=== cmin mismatch fuzzer ===\n";
print "PG install: $pg_install\n";
print "Data dir:   $base_dir\n";
print "Port:       $port\n\n";

# Clean up any previous run
stop_server();
if (-d $base_dir) {
    rmtree($base_dir) or die "Cannot remove $base_dir: $!\n";
}
mkdir($cases_dir) unless -d $cases_dir;

# Initialize cluster
print "Initializing cluster...\n";
system("$initdb -D $base_dir --no-locale -A trust >/dev/null 2>&1") == 0
    or die "initdb failed\n";

# Configure for logical decoding
open(my $conf, '>>', "$base_dir/postgresql.conf") or die $!;
print $conf <<'CONF';
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4
shared_buffers = '64MB'
log_min_messages = warning
log_line_prefix = '%m [%p] '
CONF
close($conf);

start_server();

# Create database and logical replication slot
print "Creating database and replication slot...\n";
run_psql('postgres', "CREATE DATABASE $dbname;");
run_psql($dbname,
    "SELECT pg_create_logical_replication_slot('$slot_name', 'test_decoding');");

# Pre-seed: create tables to populate catalog pages so that subsequent
# CREATE/DROP cycles within a transaction can trigger page pruning and
# TID reuse on catalog pages.
print "Pre-seeding catalog pages (30 tables)...\n";
for my $i (1..30) {
    my @cols = map { "c$_ integer" } 1..20;
    run_psql($dbname, "CREATE TABLE seed_$i (" . join(', ', @cols) . ");");
}
# Consume the seeding WAL
run_psql($dbname,
    "SELECT count(*) FROM pg_logical_slot_get_changes('$slot_name', NULL, NULL);");

print "Setup complete. Starting fuzzer.\n\n";

# ---------------------------------------------------------------------------
# Test case generator
# ---------------------------------------------------------------------------

sub generate_test_case {
    my $sql = "";
    my @live_tables;
    my @savepoints;
    my %sp_tables;
    my $table_counter = 0;
    my $sp_counter = 0;
    my $col_counter = 0;

    my $num_ops = $min_ops_per_txn +
                  int(rand($max_ops_per_txn - $min_ops_per_txn));

    $sql .= "BEGIN;\n";

    for my $op_num (1..$num_ops) {
        # Maybe create a savepoint
        if (rand() < $savepoint_prob && @savepoints < 3) {
            $sp_counter++;
            my $sp = "sp_$sp_counter";
            push @savepoints, $sp;
            $sp_tables{$sp} = [];
            $sql .= "SAVEPOINT $sp;\n";
            next;
        }

        # Maybe rollback to savepoint
        if (rand() < $rollback_prob && @savepoints > 0) {
            my $sp = $savepoints[-1];
            for my $t (@{$sp_tables{$sp}}) {
                @live_tables = grep { $_ ne $t } @live_tables;
            }
            $sql .= "ROLLBACK TO SAVEPOINT $sp;\n";
            $sp_tables{$sp} = [];
            next;
        }

        # Maybe ALTER TABLE ADD COLUMN
        if (rand() < $alter_add_prob && @live_tables > 0) {
            my $tbl = pick(@live_tables);
            $col_counter++;
            my $typ = pick(@col_types);
            $sql .= "ALTER TABLE $tbl ADD COLUMN extra_$col_counter $typ;\n";
            next;
        }

        # Maybe ALTER TABLE DROP COLUMN
        if (rand() < $alter_drop_prob && @live_tables > 0) {
            my $tbl = pick(@live_tables);
            if ($col_counter > 0) {
                my $c = 1 + int(rand($col_counter));
                $sql .= "DO \$\$ BEGIN "
                       . "ALTER TABLE $tbl DROP COLUMN IF EXISTS extra_$c; "
                       . "EXCEPTION WHEN OTHERS THEN NULL; "
                       . "END \$\$;\n";
            }
            next;
        }

        # Main choice: CREATE or DROP
        my $do_create = (@live_tables < 3)                ? 1 :
                        (@live_tables >= $max_live_tables) ? 0 :
                        (rand() < 0.55)                   ? 1 : 0;

        if ($do_create) {
            $table_counter++;
            my $tbl = "fuzz_t$table_counter";
            my $ncols = $min_cols_per_table +
                        int(rand($max_cols_per_table - $min_cols_per_table));
            my @cols;
            for my $c (1..$ncols) {
                push @cols, "c$c " . pick(@col_types);
            }
            $sql .= "CREATE TABLE $tbl (" . join(', ', @cols) . ");\n";
            push @live_tables, $tbl;
            if (@savepoints) {
                push @{$sp_tables{$savepoints[-1]}}, $tbl;
            }
        }
        else {
            my $idx = int(rand(@live_tables));
            my $tbl = splice(@live_tables, $idx, 1);
            $sql .= "DROP TABLE $tbl;\n";
        }
    }

    $sql .= "COMMIT;\n";
    return $sql;
}

# ---------------------------------------------------------------------------
# Main fuzzer loop
# ---------------------------------------------------------------------------

my $start_time = time();

for my $iter (1..$max_iterations) {
    my $test_sql = generate_test_case();

    # Save to file before executing (in case server crashes hard)
    my $case_file = "$cases_dir/case_$iter.sql";
    open(my $fh, '>', $case_file) or die "Cannot write $case_file: $!\n";
    print $fh $test_sql;
    close($fh);

    # Execute the transaction
    my ($txn_ok, $txn_out) = run_psql($dbname, $test_sql, timeout => 60);

    if (!$txn_ok) {
        if (!server_alive()) {
            my $panic = check_log_for_panic();
            print "\n!!! SERVER CRASHED on iteration $iter !!!\n";
            print "Reproducer: $case_file\n";
            print "Server log extract:\n$panic\n" if $panic;
            last;
        }
        # Server alive, txn just failed — consume WAL and continue
        run_psql($dbname,
            "SELECT count(*) FROM pg_logical_slot_get_changes("
            . "'$slot_name', NULL, NULL);", timeout => 30);
        next;
    }

    # Transaction committed — trigger logical decoding
    my ($dec_ok, $dec_out) = run_psql($dbname,
        "SELECT count(*) FROM pg_logical_slot_get_changes("
        . "'$slot_name', NULL, NULL);", timeout => 120);

    if (!$dec_ok) {
        sleep(2);
        my $panic = check_log_for_panic();

        if ($panic =~ /tuplecid cmin mismatch|TRAP.*Assert/) {
            print "\n!!! CMIN MISMATCH on iteration $iter !!!\n";
            print "Reproducer: $case_file\n";
            print "Server log extract:\n$panic\n";
            last;
        }

        if (!server_alive()) {
            print "\n!!! SERVER CRASHED during decode, iteration $iter !!!\n";
            print "Reproducer: $case_file\n";
            print "Server log extract:\n$panic\n" if $panic;
            last;
        }
    }

    # Periodic cleanup: drop fuzz_ tables to prevent unbounded catalog growth
    if ($iter % 50 == 0) {
        run_psql($dbname, <<'CLEANUP');
DO $$ DECLARE r record;
BEGIN
    FOR r IN SELECT schemaname || '.' || tablename AS t
             FROM pg_tables WHERE tablename LIKE 'fuzz_%'
    LOOP
        EXECUTE 'DROP TABLE IF EXISTS ' || r.t || ' CASCADE';
    END LOOP;
END $$;
CLEANUP
        run_psql($dbname,
            "SELECT count(*) FROM pg_logical_slot_get_changes("
            . "'$slot_name', NULL, NULL);", timeout => 30);
    }

    # Progress
    if ($iter % 100 == 0) {
        my $elapsed = time() - $start_time;
        printf "  [%d iterations, %ds elapsed, %.1f iter/s]\n",
               $iter, $elapsed, $iter / ($elapsed || 1);
    }
}

my $elapsed = time() - $start_time;
print "\nFuzzer finished after $elapsed seconds.\n";

# Cleanup
stop_server();
