#!/usr/bin/env perl
#!/opt/perl-5.26/bin/perl
#!/home/aardvark/perl-5.27/bin/perl
use strict; use warnings;
use DBI; use Digest::MD5; use Tie::Comma; use Getopt::Long;
use IPC::Run qw/run start pump finish timeout/;
use Time::HiRes qw/tv_interval gettimeofday/;
use File::Temp qw/tempfile tempdir/;
use POSIX qw/strftime/;
use constant { BASE_PORT => 6515, };
our $USER         = 'aardvark';
our %pg_data_dir  = ();
our %pg_xlog_dir  = ();
our $ROOT_DIR     = "/home/$USER";
our $ROOT_TMPDIR  = $ROOT_DIR . "/tmp/cascade";
our $PG_STUFF_DIR = $ROOT_DIR . "/pg_stuff";
our $ASSERTIONS   = 1;
our $PGVERSION    = 
      "HEAD"
    # "REL_10_STABLE"
;
our $BIN_DIR      = $ASSERTIONS == 1 ? "bin" : "bin.fast" ;
our $POSTGRES     = $PG_STUFF_DIR .  "/pg_installations/pgsql.$PGVERSION/$BIN_DIR/postgres";
our $INITDB       = $PG_STUFF_DIR .  "/pg_installations/pgsql.$PGVERSION/$BIN_DIR/initdb";
our $PG_CTL       = $PG_STUFF_DIR .  "/pg_installations/pgsql.$PGVERSION/$BIN_DIR/pg_ctl";
our $DEVEL_FILE   = $PG_STUFF_DIR . '/.11devel'; # password  (same for 10, 11)
our $num_ok  = 0;
our $num_NOK = 0;
our @prj = (); our @port = (); our @dbh  = ();
$| = 1;
main();
exit;
sub settings {
  $BIN_DIR      = $ASSERTIONS == 1 ? "bin" : "bin.fast" ;
  $POSTGRES     = $PG_STUFF_DIR .  "/pg_installations/pgsql.$PGVERSION/$BIN_DIR/postgres";
  $INITDB       = $PG_STUFF_DIR .  "/pg_installations/pgsql.$PGVERSION/$BIN_DIR/initdb";
  $PG_CTL       = $PG_STUFF_DIR .  "/pg_installations/pgsql.$PGVERSION/$BIN_DIR/pg_ctl";
  $DEVEL_FILE   = $PG_STUFF_DIR . '/.11devel'; # password  (same for 10, 11)
}
sub main {
  print sprintf('-- perl ' . "%vd\n", $^V);
  my $repeats  = 1;
  my $scale    = 1;
  my $clients  = 8;
  my $duration = 1;
  my $num_proj = 3;
  my $waiting  = 5;
  my $threads  = 8;
  my $verbose  = 0;
  my $fast     = $ASSERTIONS == 0 ? 1 : 0;
  my $help     = undef;
  GetOptions(  "repeats=i"   => \$repeats
             , "scale=i"     => \$scale
             , "clients=i"   => \$clients
             , "threads=i"   => \$threads
             , "duration=i"  => \$duration
             , "instances=i" => \$num_proj
             , "waiting=i"   => \$waiting
             , "verbose"     => \$verbose
             , "fast"        => \$fast
             , "help"        => \$help
  ) or die("Error in command line arguments\n");
  if ($help) {
    usage();
    exit(0);
  }

  if ($fast) { $ASSERTIONS = 0 }
  else       { $ASSERTIONS = 1 }

  settings();

  print "-- ", construct_cmdline($num_proj, $scale, $clients, $threads, $duration, $repeats, $waiting, $verbose) , "\n";;

  for (1..$num_proj) { add_instance(); }
  print "\n";
  test_pgbench_derail($scale, $clients, $threads, $duration, $repeats, $waiting, $verbose);

  if (1) {
    print "-- stopping instances ";
    stop_instances();
    print "\n";
  }

  print "-- done.\n";

}
sub construct_cmdline {
  my ($num_proj, $scale, $clients, $threads, $duration, $repeats, $waiting, $verbose) = @_;
  return "./cascade.pl --instances=$num_proj --scale=$scale --clients=$clients --threads=$threads --duration=$duration --repeats=$repeats --waiting=$waiting " . ($verbose ? "--verbose" : "");
}
sub stop_instances {
  for my $n (0 .. scalar(@prj)-1) {
    print ".";
    stop_instance($prj[$n], $port[$n]);
  }
}
sub readdir_for_port {
  my ($port) = @_;
  my $dir = $ROOT_TMPDIR;
  opendir(my $dh, $dir) || die "Can't open $dir: $!";
  my ($in, $out, $err);
  my ($dir_found) = undef;
  #print "\n";
  while (readdir $dh) {
    next if ($_ =~ m/^[.]+$/);
    my $conf = "$dir/$_/data/postgresql.conf";
    #print "$port: conf = $conf\n";
    if ( -e $conf )
    {
      #print "-- found: $conf   now grepping\n";
      #print "-- grep -l $port $conf\n";
      run [ 'grep', '-l', $port, $conf ], \$in, \$out, \$err;
      chomp $out;
      #print "out   [", $out, "]\n";
      if ($out) {
         #print "for $port: grep out:  $port found in:  ", $out, "\n";
      }
      else {
         #print "for $port: grep out:  $port NOT found\n";
         next;
      }
      $dir_found = $out;
      if ($dir_found =~ m,^(.*)/data/postgresql.conf,) {
         $dir_found = $1;
      }
      last;
    }
    #else {
    #    print "no such file:  $conf\n";
    #}
  }
  closedir $dh;
  return $dir_found;
}
sub add_instance {
  my $n = scalar(@prj) ;
  my $project = "proj" . sprintf("%02d",$n);
  my $port = BASE_PORT + $n; # BASE_PORT is 6515
  if    ($n == 0) { print "-- "}
  elsif ($n == 1) { print "-- "}
  else            { print " " }
# print "$project";
  print sprintf("%02d/%d",$n, $port);
  push(@prj, $project);
  push(@port, $port);
  my $good_dir = readdir_for_port($port);
  my $INIT = 0;
  if (defined $good_dir &&  -d $good_dir)
  {
    #print "-- using previous directory  (to wit: $good_dir  )\n";
    $pg_data_dir{ $port } = $good_dir . "/data";
    $pg_xlog_dir{ $port } = $good_dir . "/wal";
  }
  else
  {
    #print "-- creating new directory\n";
    init_instance($project, $port);
    $INIT = 1;
  }
  start_instance($project, $port);
  my $dbh = connectdb($port);
  if ($INIT eq 1) {
    my $rc = $dbh->do( "
      create extension adminpack;
      create extension amcheck;
      create extension bloom;
      create extension btree_gin;
      create extension btree_gist;
      create extension citext;
      create extension cube;
      create extension file_fdw;
      create extension hstore;
      create extension intarray;
      create extension pg_stat_statements;
      create extension pgstattuple;
      create extension pg_trgm;
      create extension pgcrypto;
      create extension postgres_fdw;
    ");
    if ($rc ne "0E0") {
      print "installing modules returned [$rc]\n";
      sleep 3;
    }
  }
  push(@dbh, $dbh);
  if ( $n == 0 ) {
    print " ";
    show_instance($dbh);
  }
}
sub show_instance {
  my ($dbh) = @_;
  print $dbh->selectrow_arrayref( "select current_setting('port') || ' ' || current_setting('data_directory') || '  ' || version()
    || ', debug ' || current_setting('debug_assertions') " )->[0], "\n";
}
sub show_instances {
  for my $dbh (@dbh) {
    show_instance($dbh); # print $dbh->selectrow_arrayref( "select current_setting('port') || ' ' || current_setting('data_directory') || '  ' || version()" )->[0], "\n";
  }
  if ( 0 ) {
    for my $dbh (@dbh) {
      print
        $dbh->selectrow_arrayref( "select current_setting('port')")->[0],
        "  debug_assertions [", $dbh->selectrow_arrayref( "select current_setting('debug_assertions') " )->[0], "]\n";
    }
  }
}
sub test_pgbench_derail {

  my ($scale, $clients, $threads, $duration, $total, $waiting, $verbose) = @_;

  my @durations = ();
  my $dur_total = 0;
  print "-- test: test_pgbench_derail (", $total, "x)\n";
  for my $n (1..$total) {
    print "-- start\n";
    my $t0 = [gettimeofday];
    pgbench_derail2( $scale, $clients, $threads, $duration, $total, $waiting, $verbose);
    my $d0  = tv_interval($t0,[gettimeofday]);
    $dur_total += $d0;
    push(@durations,$d0);
    my $avg =
    print "-- end   ", sprintf("%6.3f", $d0), "    ok  $comma{$num_ok} (of $comma{$total})   avg "
                     , sprintf("%6.3f", $dur_total / $n), "\n";
    if ($num_NOK > 0) {
      print "                      NOK  $num_NOK\n";
    }
  }
  if ($total > 1) {
    printf "--                 ok %4d\n", $num_ok;
    printf "--                NOK %4d\n", $num_NOK;
  }
}
sub pgbench_drop_tables{
  my ($dbh) = @_;

  $dbh->do( "drop table if exists pgbench_accounts;
             drop table if exists pgbench_branches;
             drop table if exists pgbench_tellers;
             drop table if exists pgbench_history; " );
}
sub pgbench_init_cmds {
  my ($port, $dbname, $scale) = @_;
  my ($in, $out, $err);
  my @cmd = ();
  push(@cmd, "pgbench");
  push(@cmd, "--port=$port");
  push(@cmd, "--quiet");
  push(@cmd, "--initialize");
  push(@cmd, "--scale=$scale");
  push(@cmd, $dbname );
  (\@cmd);
}
sub pgbench_initialise {
  my ($dbh,$rcmd) = @_;
  my ($in,$out,$err);
  run $rcmd, \$in, \$out, \$err;
  #print( ($in  ? $in  : '')
  #     , ($out ? $out : '')
  #    #, ($err ? $err : '')
  #) ;
  $dbh->do("alter table pgbench_history add column hid serial primary key;");
}
sub pgbench_tables_dump_restore {
  my ($port1, $dbname1, $port2, $dbname2) = @_;
  my ($in, $out, $err) = (undef,undef,undef);
  run  [    'pg_dump'
          , '-Fc'
          , "-p$port1"
          , '--exclude-table-data=pgbench_history'
          , '--exclude-table-data=pgbench_accounts'
          , '--exclude-table-data=pgbench_branches'
          , '--exclude-table-data=pgbench_tellers'
          , '-tpgbench_history'
          , '-tpgbench_accounts'
          , '-tpgbench_branches'
          , '-tpgbench_tellers'
          , $dbname1
        ]
  , '|'
  ,     [   'pg_restore'
          , '-1'
          , '-p', $port2
          , '-d', $dbname2
        ]
  , \$in, \$out, \$err ;
  chomp $in;
  chomp $out;
  chomp $err;
  print $in , $out , $err ;
}
sub pgbench_run {

  my ($protocol,$clients,$threads,$duration,$pseconds,$port,$dbname, $scale) = @_;

  my ($in, $out, $err) = (undef, undef, undef);
  my $msg1 = "-- pgbench -M $protocol -c $clients -j $threads -T $duration -P $pseconds -n $dbname -- scale $scale";
  print $msg1;
  my $t0 = [gettimeofday];
  run ["pgbench", "-M$protocol", "-c$clients", "-j$threads", "-T$duration", "-P$pseconds", "-n", "-p$port", $dbname ]
       , \$in, \$out, \$err;
  chomp $out;
  my  @rows_processed = ($out =~ /actually processed: (\d+)/);
  my (@tps) = $out =~ /\ntps = (\d+)[.]\d+ .including/g;
  my $msg2 = "  change " . $rows_processed[0] . " rows, " . $tps[0] . " tps" ;
  print $msg2;
  print " "x(line_length()  - length($msg1 . $msg2)), sprintf("%7.3f\n", tv_interval($t0, [gettimeofday]) );
}
sub line_length { (5 * md5_width()) + 64 }
sub pgbench_change_subscriber_struct {
  my ($dbh,$port, $dbname) = @_;
  for my $n (1..5) {
    my $rc1 = $dbh->do("alter table pgbench_accounts add column dummy$n text default 'hah!'; ");
#   my $rc2 = $dbh->do("alter table pgbench_accounts add column id_$n serial ; ");
    #print $rc1, " (added dummy$n column to pgbench_accounts)\n";
  }
}
sub pgbench_derail2 {

#   pgbench_derail2( $scale, $clients, $threads, $duration, $total, $waiting );
  my               ($scale, $clients, $threads, $duration, $total, $waiting, $verbose) = @_;

  my $rc;
  my $dbname = "postgres";
  my $pseconds = int($duration / 5); if ($pseconds < 1) { $pseconds = 1; }
  my $wait     = defined $waiting ? $waiting :
                 $scale <=   5 ?  5 :
                 $scale <=  50 ? 20 :
                 $scale <= 100 ? 30 : 60 ;
  for my $n (0 .. scalar(@dbh)-1) { drop_subscriptions($dbh[$n]); }
  for my $n (0 .. scalar(@dbh)-1) { drop_publications ($dbh[$n]); }
  for (@dbh) { pgbench_drop_tables($_); }
  my ($rcmd) = pgbench_init_cmds($port[0], $dbname, $scale);
  my $init_msg = "-- " . join(" ", @$rcmd);
  print $init_msg;
  my $t0 = [gettimeofday];
  pgbench_initialise($dbh[0], $rcmd);
  print " "x(line_length() - length($init_msg)), sprintf("%7.3f\n", tv_interval($t0, [gettimeofday])  );
  my $pg_size_pretty = $dbh[0]->selectrow_arrayref("select
      pg_size_pretty( pg_relation_size('pgbench_accounts')\n"
                . " + pg_relation_size('pgbench_branches')\n"
                . " + pg_relation_size('pgbench_tellers' )\n"
                . " + pg_relation_size('pgbench_history' )\n )")->[0] ;
  print "-- size ", $pg_size_pretty, "\n";

  for my $n (1 .. scalar(@dbh)-1) {
      pgbench_tables_dump_restore($port[0], $dbname, $port[$n], $dbname);
  }

# pgbench_change_subscriber_struct($db2,$port2, $dbname2);
#
  create_pubsub_cascade($dbname);

  my $protocol = "prepared";
  pgbench_run($protocol,$clients,$threads,$duration,$pseconds,$port[0],$dbname,$scale);

  my $display_level = 0;
  if ( $verbose ) {
     $display_level = 1;
  }

  my ($rdigest1, $rdigest2, $rdigest3, $rline1, $rline2, $rline3) = (\'1' , \'2', \'3', \'', \'', \'');
  my %seen = ();
  my ($rdigest0, $rline0) = get_md5_digest_pgbench($dbh[0], $port[0], $dbname);
  print "\n", $$rline0, "\n";
  if ( $display_level  == 1) { print "\n"; }
  my $count = 0;
  my $limit = 1;
# my $t0 = [gettimeofday];
  while (1) {
    $count++;
    if (1) {
      my %tops = ();
      for my $n (0, scalar(@dbh)-1) {
        my $t0 = [gettimeofday];
        my ($rdigest_top, $rline_top) = get_md5_digest_pgbench_top($dbh[$n], $port[$n], $dbname, $limit);
        if ( $display_level  == 1 ) {
          print "", $$rline_top, "   ", $$rdigest_top eq $$rdigest0 ? "          " : "          ";
          printf "  %7.3f", tv_interval($t0, [gettimeofday]);
        }
        if (! exists $tops{ $$rdigest_top }) {
          $tops{ $$rdigest_top } = 1;
        }
        my $num_keys = keys %tops;
        if ($num_keys > 1) {
          if ( $display_level == 1) { print "\n"; }
          last;
        }
        if ( $display_level == 1) {
          print "\n";
        }
        else {
          print "x";
          if    ($count % 50 == 0) { print "\n"; }
          elsif ($count % 10 == 0) { print " ";  }
        }
      }
      my $num_keys = keys %tops;
      if ($num_keys > 1) {
        if ( $display_level == 1) { print "      ${wait}s..."; }
        sleep $wait;
        if ( $display_level == 1) { print " run\n"; }
        next;
      }
    }
    if ($display_level == 1) { print "\n"; }
    else                     { print "\nfull: "; }

    my (@digest,@line)=((),());
    for my $n (0 .. scalar(@dbh)-1) {
      my $t0 = [gettimeofday];
      my ($rdigest, $rline) = get_md5_digest_pgbench($dbh[$n], $port[$n], $dbname);
      if ($display_level == 1) {
        print $$rline, "   ", $$rdigest eq $$rdigest0 ? "replica ok" : "       NOK";
        printf "  %7.3f\n", tv_interval($t0, [gettimeofday]);
      }
      else {
        print $$rdigest eq $$rdigest0 ? "o" : "n";
      }
      push(@digest, $$rdigest);
      push(@line  , $$rline);
      if (not exists $seen{ $$rdigest } ) {
        $seen{ $$rdigest } = 1;
      }
    }
    print "\n";
    my $NOK = 0;
    for my $dig (@digest) {
       if ($dig ne $$rdigest0) {
         $NOK = 1;
       }
    }
    if ($NOK == 0) {
       $num_ok ++;
       print $line[ $#line ], "\n";
       print "-- All is well.\n";
       last;
    }
    else {
       print "-- Not good.\n";
    }
    sleep $wait;
  }
  for my $n (0 .. scalar(@dbh)-1) { drop_subscriptions($dbh[$n]); }
  for my $n (0 .. scalar(@dbh)-1) { drop_publications ($dbh[$n]); }
}
sub drop_subscriptions {
  my ($dbh) = @_;
  while ( (my $count = $dbh->selectrow_arrayref( "select count(*) from pg_subscription limit 1")->[0]) > 0 ) {
    my $subname = $dbh->selectrow_arrayref( "select subname from pg_subscription limit 1")->[0];
    #print "-- drop subscription if exists $subname\n";
    $dbh->do("drop subscription if exists $subname" );
  }
}
sub drop_publications {
  my ($dbh) = @_;
  while ( (my $count = $dbh->selectrow_arrayref( "select count(*) from pg_publication limit 1")->[0]) > 0 ) {
    my $pubname = $dbh->selectrow_arrayref( "select pubname from pg_publication limit 1")->[0];
    #print "-- drop publication  if exists $pubname\n";
    $dbh->do("drop publication  if exists $pubname" );
  }
}
sub create_pubsub {
  my ($dbname) = @_;
  my $pubport = $port[0];
  for my $n (0 .. (scalar(@dbh)-1)) {
    my $pubname = "pub1";
    my $subname = "sub1";
    my $dbh = $dbh[$n];
    my $subport = $port[$n];
    if ($n == 0) { create_publication(  $dbh, $pubname ); }
    else         { create_subscription( $dbh, $dbname, $subname, $subport, $pubname, $pubport ); }
  }
}
sub create_pubsub_cascade {
  my ($dbname) = @_;
  for my $n (0 .. (scalar(@dbh)-1)) {
    my $dbh = $dbh[$n];
    if    ($n == 0)  {
      my $pubport = $port[$n];
      my $pubname = "pub${n}_${pubport}";
      create_publication( $dbh, $pubname );
    }
    elsif ($n == (scalar(@dbh)-1)) {
      my $subport    = $port[$n];
      my $subname    = "sub${n}_${subport}";
      my $prepubport = $port[$n - 1];
      my $prepubname = "pub" . ($n - 1) . "_${prepubport}";
      create_subscription( $dbh, $dbname, $subname, $subport, $prepubname, $prepubport );
    }
    else {
      my $subport    = $port[$n];
      my $subname    = "sub${n}_${subport}";
      my $prepubport = $port[$n - 1];
      my $prepubname = "pub" . ($n - 1) . "_${prepubport}";
      create_subscription( $dbh, $dbname, $subname, $subport, $prepubname, $prepubport );
      my $pubport = $port[$n];
      my $pubname = "pub${n}_${pubport}";
      create_publication(  $dbh, $pubname );
    }
  }
}
sub get_md5_digest_pgbench_top {
  my ($dbh,$port,$dbname,$limit) = @_;
  my $md5_a_top = get_md5_limit($port, $dbname, "pgbench_accounts", "aid", ["aid","bid","abalance","filler"]                 , $limit );
  my $md5_b_top = get_md5_limit($port, $dbname, "pgbench_branches", "bid", ["bid","bbalance","filler"]                       , $limit );
  my $md5_t_top = get_md5_limit($port, $dbname, "pgbench_tellers" , "tid", ["tid","bid","tbalance","filler"]                 , $limit );
  my $md5_h_top = get_md5_limit($port, $dbname, "pgbench_history" , "hid", ["hid","bid","aid","delta","mtime","filler","hid"], $limit );
  my $ctx = Digest::MD5->new;
  $ctx->add("$md5_a_top $md5_b_top $md5_t_top $md5_h_top");
  my $digest = $ctx->hexdigest;
  my $line = sprintf("$port %11s %7s %7s %7s   %-s %-s %-s %-s   %s"
       , '', '', '', ''
       , $md5_a_top, $md5_b_top, $md5_t_top, $md5_h_top,  substr($digest,0,md5_width()) );
  (\$digest, \$line);
}
sub get_md5_digest_pgbench {
  my ($dbh,$port,$dbname) = @_;
  my $cnt_a = $dbh->selectrow_arrayref("select count(*) from pgbench_accounts")->[0];
  my $cnt_b = $dbh->selectrow_arrayref("select count(*) from pgbench_branches")->[0];
  my $cnt_t = $dbh->selectrow_arrayref("select count(*) from pgbench_tellers" )->[0];
  my $cnt_h = $dbh->selectrow_arrayref("select count(*) from pgbench_history" )->[0];
  my $md5_a = get_md5($port, $dbname, "pgbench_accounts", "aid", ["aid","bid","abalance","filler"]);
  my $md5_b = get_md5($port, $dbname, "pgbench_branches", "bid", ["bid","bbalance","filler"]);
  my $md5_t = get_md5($port, $dbname, "pgbench_tellers" , "tid", ["tid","bid","tbalance","filler"] );
  my $md5_h = get_md5($port, $dbname, "pgbench_history" , "hid", ["hid","bid","aid","delta","mtime","filler","hid"]);
  my $ctx = Digest::MD5->new;
  $ctx->add("$md5_a $md5_b $md5_t $md5_h");
  my $digest = $ctx->hexdigest;
  my $line = sprintf("$port %11s %7s %7s %7s   %-s %-s %-s %-s   %s"
       , $comma{ $cnt_a }, $comma{ $cnt_b }, $comma{ $cnt_t }, $comma{ $cnt_h }
       , $md5_a, $md5_b, $md5_t, $md5_h,  substr($digest,0,md5_width()) );
  (\$digest, \$line);
}
sub md5_width { 7 }
sub get_md5 {
  my ($port, $dbname, $table, $key, $rcols) = @_;
  my ($out, $err);
# run        [ 'echo', "select " . join(",", @$rcols) . " from $table order by $key" ]
#     , '|', [ 'psql', '-qtAXp', $port, '-d', $dbname ]
#     , '|', [ 'md5sum' ]
#     , '|', [ 'cut', '-b', '1-'.md5_width() ], \$out, \$err ;
  my $outf = 'cascade.' . $port . '.' . $table . '.md5' ;
  run        [ 'echo', "select " . join(",", @$rcols) . " from $table order by $key" ]
      , '|', [ 'psql', '-qtAXp', $port, '-d', $dbname ]
      , '>', $outf;
  run        [ 'md5sum', $outf ], 
      , '|', [ 'cut', '-b', '1-'.md5_width() ], \$out, \$err ;
  chomp $out;
  $out; # md5
}
sub get_md5_limit {
  my ($port, $dbname, $table, $key, $rcols, $limit) = @_;
  my ($out, $err);
  run        [ 'echo', "select " . join(",", @$rcols) . " from $table order by $key /*desc*/ limit $limit" ]
      , '|', [ 'psql', '-qtAXp', $port, '-d', $dbname ]
      , '|', [ 'md5sum' ]
      , '|', [ 'cut', '-b', '1-'.md5_width() ], \$out, \$err ;
  chomp $out;
  $out; # md5
}
sub connectdb {
    my ($port) = @_;
    my $dbh;
    my ($tries,$max_tries) = (0, 5);
    while (( ! defined $dbh) && $tries <= $max_tries) {
      $tries++;
      eval {
        $dbh = DBI->connect( "dbi:Pg:port=$port;db=postgres;", undef, undef );
      };
      if ($@) {
        print "error while connecting (try $tries) to db postgres on port $port - $!\n";
        sleep 2;
        next;
      }
    }
    if (! defined $dbh) {
        die "error: \$dbh is invalid\n";
    }
    $ENV{ PGAPPNAME } = "cascade.pl:" . $port;
    $dbh;
}
sub init_instance            { my($project, $port) = @_; action_instance( "init"        , $project, $port); }
sub start_instance           { my($project, $port) = @_; action_instance( "start"       , $project, $port); }
sub start_instance_as_master { my($project, $port) = @_; action_instance( "start master", $project, $port); }
sub start_instance_as_slave  { my($project, $port) = @_; action_instance( "start slave" , $project, $port); }
sub stop_instance            { my($project, $port) = @_; action_instance( "stop"        , $project, $port); }
sub action_instance {
  my ($action, $project, $port) = @_;
  my ($data_dir, $xlog_dir) = get_PGDATA($port);
  my $pg_stuff_dir   = $PG_STUFF_DIR ;
  my $logfile        = $data_dir . "/../logfile.$port"; # 
  my $pg_version     = $PGVERSION;
  my $bin_dir        = $BIN_DIR  ;
  my $postgres       = $POSTGRES ;
  my $initdb         = $INITDB   ;
  my $pg_ctl         = $PG_CTL   ;
  my $pwd_encr       = 'scram-sha-256';  # 'md5';
  my $devel_file     = $DEVEL_FILE;
  my $data_checksums = undef;  # '--data-checksums';

  my @startdb = ();
  push(@startdb, $pg_ctl);
  push(@startdb, "--pgdata=$data_dir");
  push(@startdb, "--log=$logfile");
  push(@startdb, "--wait");
  push(@startdb, "start");

  if ($action eq "init") {
    my @initdb = ();
    push(@initdb, $initdb);
    push(@initdb, "--pgdata=$data_dir");
    push(@initdb, "--encoding=UTF8");
    push(@initdb, "--auth=$pwd_encr");
    push(@initdb, "--pwfile=$devel_file");
    push(@initdb, "--waldir=$xlog_dir");
    if ($data_checksums) { push(@initdb, "$data_checksums"); }
    my ($in, $out, $err);
    run \@initdb, \$in, \$out, \$err;
    if ($in ) { print ">", $in , "< (in)\n";  }
  # if ($out) { print ">", $out, "< (out)\n"; }
    if ($err) { print ">", $err, "< (err)\n"; }
    my $conf = $data_dir . "/postgresql.conf";
    open(my $fh, '>>', $conf) or die "error - could not open file '$conf' $!";
    print $fh "port = $port\n";
    print $fh "client_min_messages = warning\n";
    close $fh;
  }
  elsif ($action eq "start") {
    my @start = ();
    push(@start, $postgres);
    push(@start, "-D"); push(@start, "$data_dir");
    push(@start, "-p"); push(@start, "$port");
    my @options = @{ get_options($port) };
    for my $o (@options) { push(@start, substr($o,0,1) eq '-' ? '' : '--' . $o); }
    my ($in, $out, $err) = (undef,undef,undef);
    start \@start, \$in, \$out, \$err;
    if ($in ) { print ">", $in , "< (in)\n";  }
    if ($out) { print "" , $out, " (out)\n"; }
    if ($err) { print ">", $err, "< (err)\n"; }

    #  run pg_isready till the database is available:
    while ( 1 ) {
      run ['pg_isready', "--timeout=10", "-p$port", "-dpostgres" ], \$in, \$out, \$err;
      chomp $out;
      if ($in ) { print ">", $in , "< (in)\n";  }
     #if ($out) { print "" , $out, " (out)\n"; }
      if ($err) { print ">", $err, "< (err)\n"; }
      last if (index($out, 'accepting connections' ) >= 0);
      sleep 1;
    }
    my $dbh = connectdb($port);
    drop_subscriptions($dbh);
    drop_publications($dbh);
    $dbh->disconnect;
  }
  elsif ($action eq "stop") {
    my @stopdb = ();
    push(@stopdb, $pg_ctl);
    push(@stopdb, "--pgdata=$data_dir");
    push(@stopdb, "--wait");
    push(@stopdb, "stop");
    my ($in, $out, $err) = (undef,undef,undef);
    run \@stopdb, \$in, \$out, \$err;
    if ($in ) { print ">", $in , "< (in)\n";  }
   #if ($out) { print "", $out, "\n"; }
    if ($err) { print ">", $err, "< (err)\n"; }
  }
  else {
    print "-- invalid action [$action]\n";
  }
}
sub create_publication {
  my ($dbh,$pubname) = @_;
  my $rc = $dbh->do("create publication $pubname for all tables;");
}
sub create_subscription {
 my ($dbh, $dbname, $subname, $subport, $pubname, $pubport) = @_;
 my $appname = "casc:" . $subport . "<" . $pubport ;
 my $sql = "create subscription $subname
            connection 'port=" . $pubport . " dbname=${dbname} application_name=${appname}'
            publication $pubname with(enabled=false, slot_name=${subname}_${subport});" ;
 my $rc = $dbh->do( $sql );
 $rc = $dbh->do("alter subscription $subname enable;");
}
sub usage {
  print '
  cascade.pl usage:
  GetOptions(  "repeats=i"   => \$repeats  # 1
             , "scale=i"     => \$scale    # 1
             , "clients=i"   => \$clients  # 8
             , "threads=i"   => \$threads  # 8
             , "duration=i"  => \$duration # 1
             , "instances=i" => \$num_proj # 6
             , "waiting=i"   => \$waiting  # 5
             , "help"        => \$help
  )

 example:

 ./cascade.pl --instances=6 --scale=1 --clients=64 --duration=1 --repeats=1
'
}
sub get_PGDATA{
  my ($port) = @_;
  if (! exists $pg_data_dir{$port} )
  {
    my $dir  = tempdir( CLEANUP => 0, DIR => "${ROOT_TMPDIR}", TEMPLATE => $port . 'XXXXXX'  );
  # my @name = $dir =~ ,([a-zA-Z0-9_]+)$,g;
    $pg_data_dir{ $port } = $dir . "/data";
    $pg_xlog_dir{ $port } = $dir . "/wal";
  # $pg_data_dir{ $name } = $dir . "/data";
  # $pg_xlog_dir{ $name } = $dir . "/wal";
  }
  ($pg_data_dir{$port}, $pg_xlog_dir{$port});
}
sub get_master_options { my ($port) = @_; get_options($port); }
sub get_slave_options  { my ($port) = @_; get_options($port); }
sub get_options {
  my ($port) = @_;
  my ($data_dir, $xlog_dir) = get_PGDATA($port);
  my $server_dir                        = $data_dir . "/.." ;
# my $max_wal_senders                   = 40;  # publication side
# my $max_replication_slots             = 40;  # publication side and subscription side
# my $max_worker_processes              = 42;  # subscription side
# my $max_logical_replication_workers   = 40;  # subscription side
# my $max_sync_workers_per_subscription =  6;  # subscription side
  my @o = ();
  push(@o, "wal_level=logical");
# push(@o, "max_replication_slots=$max_replication_slots");
# push(@o, "max_worker_processes=$max_worker_processes");
# push(@o, "max_logical_replication_workers=$max_logical_replication_workers");
# push(@o, "max_wal_senders=$max_wal_senders");
# push(@o, "max_sync_workers_per_subscription=$max_sync_workers_per_subscription");
  push(@o, "logging_collector=on");
  push(@o, "log_directory=$server_dir");
  push(@o, "log_filename=logfile.${port}");
  push(@o, "log_replication_commands=on");
# push(@o, "max_connections=320");
  push(@o, "autovacuum=off");
  \@o;
}
