use strict;
use warnings;
use DBI;
use IO::Pipe;
use Storable;
use Data::Dumper;
use List::Util qw(shuffle);
use Digest::MD5 qw(md5_hex);

## This is a stress tester for PostgreSQL crash recovery.

## It spawns a number of processes which all connect to the database
## and madly update a table until either the server crashes, or
## for a million updates (per process).

## Upon crash, each Perl process reports up to the parent how many times each value was updated
## plus which update was 'in flight' at the time of the crash. (Since we received neither an
## error nor a confirmation, the proper status of this in flight update is unknowable)

## The parent consolidates this info, waits for the database to recover, and verifies
## that the state of the database matches what we know it ought to be.

## first arg is number of processes (default 8), 2nd is number of updates per process
## (default 1_000_000), 3rd argument causes aborts when a certain discrepancy is seen

## Arranging for the server to crash is the obligation of the outer driving script (do.sh)
## and the accompanying instrumentation patch.

## I invoke this in an outer driving script and let both the Perl messages and the
## postmaster logs spool together into one log file.  That way it is easier to correlate
## server events with the client/Perl events chronologically.

## This generates a lot of logging info.  The tension here is that if you generate too much
## info, it is hard to find anomalies in the log file.  But if you generate too little info,
## then once you do find anomalies you can't figure out the cause.  So I error on the side
## of logging too much, and use command lines (memorialized below) to pull out the most
## interesting things.

## But with really high logging, the lines in the log file start
## getting garbled up, so back off a bit.  The commented out warn and elog things in this file
## and the patch file show places where I previously needed logging for debugging specific things,
## but decided I don't need it all of the time.  Leave the commented code as landmark for the future.

## look for odd messages in log file that originate from Perl
#fgrep ' line ' do.out |sort|uniq -c|sort -n|fgrep -v 'in flight'

## look at rate of incrementing over time, for Excel or SpotFire.
#grep -P '2014-05|^sum ' do.out |grep -P '^sum' -B1|perl -ne 'my @x=split/PDT/; print $x[0] and next if @x>1; print if /sum/' > ~/jj.txt

## check consistency between child and parent table: (the 10 here matches the 10 in 'if ($count->[0][0] % 10 == 0)'
## psql -c 'select abs(id), sum(count) from foo_parent where count>0 group by abs(id) except select abs(p_id), sum(floor(count::float/10)) from foo group by abs(p_id)'

my $SIZE=10_000;

## centralize connections to one place, in case we want to point to a remote server or use a password
sub dbconnect {
  my $dbh = DBI->connect("dbi:Pg:", "", "", {AutoCommit => 1, RaiseError=>1, PrintError=>0});
  return $dbh;
};

my %count;

while (1) {
  %count=();
  eval {
    my $dbh = dbconnect();
    eval { ## on multiple times through, the table already exists, just let it fail
         ## But if the table exists, don't pollute the log with errors
         ($dbh->selectrow_array("select count(*) from pg_tables where tablename='foo';"))[0] == 1 and return;
         $dbh->do(<<'END');
create table foo(index int, count int);
create unique index foobarindex on foo(index);
END
    };
    ## get rid of garbage tuples if they were present during the crash
    $dbh->do("delete from foo where index < 0");
    my $dat = $dbh->selectall_arrayref("select index, count from foo where index>=0");
    if (@$dat == $SIZE) {
         $count{$_->[0]}=$_->[1] foreach @$dat;
    } else {
      warn "table not correct size, ", scalar @$dat unless @$dat==0;
      $dbh->do("truncate foo");
      %count=();
      my $sth=$dbh->prepare("insert into foo (index, count) values (?,0)");
      $dbh->begin_work();
      $sth->execute($_) foreach 1..$SIZE;
      $dbh->commit();
    };
    ## even the pause every 100 rounds to let autovac do its things is not enough
    ## because the autovac itself generates enough IO to trigger crashes so that it never completes,
    ## lead to wrap around shut down.  This should keep the vaccum load low enough to complete, at least some times
    ## $dbh->do("vacuum foo") if rand()<0.1;
  };
  last unless $@;
  warn "Failed with $@, trying again";
  sleep 1;
};
warn "init done";

## Fork off a given number of child presses, opening pipes for them to 
## communicate back to the parent.  Communication is a one-time shot,
## at the end of their lifetimes.
my @child_pipe;
my $pipe_up;
foreach (1.. ((@ARGV and $ARGV[0]>0) ? $ARGV[0] : 8)) {
    my $pipe = new IO::Pipe;
    defined (my $fork = fork) or die "fork failed: $!";
    if ($fork) {
      push @child_pipe, {pipe => $pipe->reader(), pid => $fork};
    } else {
      $pipe_up=$pipe->writer();
      @child_pipe=();
      last;
    };
};

#warn "fork done";

if (@child_pipe) {
  #warn "in harvest";
  my %state;
  ### harvest children data, which consists of the in-flight item, plus a hash with the counts of all confirmed-committed items
  local $/;
  foreach my $handle ( @child_pipe ) {
    my $data=Storable::fd_retrieve($handle->{pipe});
    if (defined $data->[1] and $data->[1] !~ /^(begin)/) {
      $state{$data->[0]}=$data->[1] if defined $data->[0];
    };
    while (my ($k,$v)=each %{$data->[2]}) {
       $count{$k}+=$v;
    };
    close $handle->{pipe} or die "$$ closing child failed with bang $!, and question $?";
    my $pid =waitpid $handle->{pid}, 0 ;
    die "$$: my child $pid exited with non-zero status $?" if $?;
  };
  #warn "harvest done";
  my ($dat,$dat2);
  foreach (1..300) {
       sleep 1;
       ## used to do just the connect in the eval loop,
       ## but sometimes the database crashed again during the
       ## query, so do it all in the eval-loop
       eval {
         warn "summary attempt $_" if $_>1;
         my $dbh = dbconnect();
         ## detect wrap around shutdown (actually not shutdown, but read-onlyness) and bail out
         ## need to detect before $dat is set, or else it won't trigger a Perl fatal error.
         $dbh->do("create temporary table aldjf (x serial)");
         ## commit prepared.  Should we commit all prepared, or only the ones our children
         ## told us about?  Finding one we were told about should be a fatal error.  But just make it 
         ## a warning, because the eval will trap an error anyway.
         foreach (@{$dbh->selectcol_arrayref('select gid from pg_prepared_xacts')}) {
           my ($id)= /^(\d+)_/ or die $_;
           #warn "dealing with gid $_";
           $state{$_} or warn "Transaction $_ ($state{$_}) was prepared in the database, but was not expected to be";
           if (rand()<0.5) {
             $dbh->do('commit prepared ?',undef,$_);
             delete $state{$_};  
             $count{$id}+=1;
           } else { # make sure this 'rollback prepared' after crash aspect is tested as well
             ## If we re-crash after sending the rollback, but before betting a response, then if it is no longer
             ## listed in pg_prepared_xacts when we come back up, it must have actually rolled back.
             ## need to flag it as such.
             $state{$_}="rolling back prepared $_";
             $dbh->do('rollback prepared ?',undef,$_);
             delete $state{$_};  
           };
           ##Code below expects it to be missing, not just 0.
         };
         foreach (keys %state) {
           my ($id) = $state{$_} =~ /prepared (\d+)_/;
           unless ($id) {  ## 
             ## things not prepared are not really in flight. 
             warn $state{$_} unless $state{$_} =~ /^(begin |preparing )/;
             next;
           };
           ## If it was prepared, but not in pg_prepared_xacts, then it must have committed and child just didn't get the word.
           ## Unless we were trying to roll it back, then it must have succeeded at the rollback.
           $count{$id}+=1 unless $state{$_} =~ /rolling back prepared \d+/;
           delete $state{$_}
         };
           
         $dat = $dbh->selectall_arrayref("select index, count from foo");
         warn "sum is ", $dbh->selectrow_array("select sum(count) from foo"), "\n";
         warn "count is ", $dbh->selectrow_array("select count(*) from foo"), "\n";
         # Try to force it to walk the index to get to each row, so corrupt indexes are detected
         # If they are not detected here, they will be detected later when the wrong number of
         # rows gets updated.
         # (Without the "where index is not null", it won't use an index scan no matter what)
         $dat2 = $dbh->selectall_arrayref("set enable_seqscan=off; select index, count from foo where index is not null and index >=0 ");
       };
       last unless $@;
       $@ =~ s/\n/\\n /g if defined $@;
       warn $@;
  };
  die "Database didn't recover even after 5 minutes, giving up" unless $dat2;
  ## don't do sorts in SQL because it might change the execution plan
  @$dat=sort {$a->[0]<=>$b->[0]} @$dat;
  @$dat2=sort {$a->[0]<=>$b->[0]} @$dat2;
  foreach (@$dat) {
    $_->[0] == $dat2->[0][0] and $_->[1] == $dat2->[0][1] or die "seq scan doesn't match index scan"; shift @$dat2;
    no warnings 'uninitialized';
    #warn "For $_->[0], $_->[1] != $count{$_->[0]}", exists $in_flight{$_->[0]}? " in flight":"", " state ", $state{$_->[0]}  if $_->[1] != $count{$_->[0]};
    if ($_->[1] != $count{$_->[0]} and defined $ARGV[2]) {
       #bring down the system now, before autovac destroys the evidence
       die;
    };
    delete $count{$_->[0]};
  };
  warn "Left over in %count: @{[%count]}" if %count;
  die if %count and defined $ARGV[2];
  warn "normal exit at ", time();
  exit;
};


my %h; # how many time has each item been incremented
my $i; # in flight item which is not reported to have been committed
my $id;
my $state; #  state of the transaction

eval {
  ## do the dbconnect in the eval, in case we crash when some children are not yet
  ## up.  The children that fail to connect in the first place still
  ## need to send the empty data to nstore_fd, or else fd_retieve fatals out.
  my $dbh = dbconnect();
  # $dbh->do("SET SESSION synchronous_commit = false");
  my $sth=$dbh->prepare("update foo set count=count+1 where index=?");
  foreach (1..($ARGV[1]//1e6)) {
    $i=1+int rand($SIZE);
    $id="${i}_${$}_$_";
    $dbh->begin_work();
    $state="begin $id";
    my $c = $sth->execute($i);
    $c == 1 or die "update did not update 1 row: key $i updated $c";
    $state="preparing $id";
    $dbh->do("prepare transaction ?",undef,$id);
    if (rand() < 0.0001) { 0 and warn "sleeping on $id"; select undef,undef,undef,10 };
    if (rand() < 0.1) {
      $state="rolling back prepared $id";
      $dbh->do("rollback prepared ?", undef,$id);
    } else {
      $state="prepared $id";
      $dbh->do("commit prepared ?", undef,$id);
    };
    $h{$i}++ if $state =~ /^prepared/; 
    undef $state;
    undef $i;
    undef $id;
  };
  $@ =~ s/\n/\\n /g if defined $@;
  warn "child exit ", $dbh->state(), " $@" if length $@;
};
$@ =~ s/\n/\\n /g if defined $@;
die "child abnormal exit $@" if length $@ and $@ =~ / 0E0/;
warn "child abnormal exit $@" if length $@;

Storable::nstore_fd([$id,$state,\%h],$pipe_up);
close $pipe_up or die "$! $?";
