#!/usr/bin/perl
#
# Test a deadlock between a backend and the startup process when processing
# an XLOG_PRUNE_PAGE WAL record. This test is based on the vanilla
# 031_recovery_conflict.pl test.
#

use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

# Test parameters. The two timeouts are in milliseconds and are interpolated
# into the primary's postgresql.conf further down.
my $deadlock_timeout              = 3_000;
my $log_startup_progress_interval = 1_000;

# Names of the objects the test creates.
my $backup_name = 'my_backup';
my $testdb      = 'testdb';
my $table1      = 'table1';
my $table2      = 'table2';

# Set up nodes: a streaming-capable primary, and a standby initialised from a
# base backup of that primary.
my $node_primary = PostgreSQL::Test::Cluster->new('primary');
$node_primary->init(allows_streaming => 1);

# Extra settings appended to the primary's postgresql.conf. The backup used to
# create the standby is taken afterwards, so the standby presumably starts with
# the same settings.
# NOTE(review): the comment embedded below for max_standby_streaming_delay says
# "not long", but the value written is -1, which PostgreSQL documents as "wait
# forever". The embedded comment looks stale (inherited from
# 031_recovery_conflict.pl?) -- confirm the intent before touching the value.
$node_primary->append_conf('postgresql.conf', qq[

	# for deadlock test
	max_prepared_transactions = 10

	# wait some to test the wait paths as well, but not long for obvious reasons
	max_standby_streaming_delay = -1

	# Some of the recovery conflict logging code only gets exercised after
	# deadlock_timeout. The test doesn't rely on that additional output, but it's
	# nice to get some minimal coverage of that code.
	log_recovery_conflict_waits = on

	log_startup_progress_interval = ${log_startup_progress_interval}
	deadlock_timeout = ${deadlock_timeout}
	autovacuum = off
]);

# Start the primary and take the base backup the standby is built from.
$node_primary->start;
$node_primary->backup($backup_name);

my $node_standby = PostgreSQL::Test::Cluster->new('standby');
$node_standby->init_from_backup($node_primary, $backup_name, has_streaming => 1);
$node_standby->start();

# Remember the current size of the standby's log file. check_conflict_log()
# passes this value to wait_for_log() as the starting position and updates it.
my $log_location = -s $node_standby->logfile;

# Run the test in a dedicated database.
# NOTE(review): the original comment here said the new database is used "to
# trigger database recovery conflict", but nothing in the visible script drops
# the database -- that wording was probably inherited from
# 031_recovery_conflict.pl.
$node_primary->safe_psql('postgres', "CREATE DATABASE $testdb");
# Disabled: table2 is created inside $testdb below, not in 'postgres'.
#$node_primary->safe_psql('postgres', "CREATE TABLE $table2(a int, b int)");

# table1 is the relation that gets pinned and vacuumed; table2 is only used as
# a lock target. The single row (a = 1) is what the standby cursor fetches
# first.
$node_primary->safe_psql($testdb, qq[
	CREATE TABLE $table1(a int, b int);
	CREATE TABLE $table2(a int, b int);
	INSERT INTO $table1 VALUES (1);
]);

# Generate a few dead rows, to later be cleaned up by vacuum. Then acquire a
# lock on another relation in a prepared xact, so it's held continuously by
# the startup process. The standby psql will block acquiring that lock while
# holding a pin that vacuum needs, triggering the deadlock.
$node_primary->safe_psql($testdb, qq[
	BEGIN;
	INSERT INTO $table1(a) SELECT generate_series(1, 100) i;
	ROLLBACK;
]);

# The lock on table2 is parked in the prepared transaction 'lock', so it stays
# held after this psql session ends; it is released near the end of the script
# with ROLLBACK PREPARED.
# NOTE(review): the INSERT and txid_current() come after PREPARE TRANSACTION,
# so they are not part of the prepared transaction. Presumably they are there
# to generate further WAL after the lock is taken -- confirm.
$node_primary->safe_psql($testdb, qq[
	BEGIN;
	LOCK TABLE $table2;
	PREPARE TRANSACTION 'lock';
	INSERT INTO $table1(a) VALUES (170);
	SELECT txid_current();
]);

# Wait until the standby has replayed everything the primary has flushed so
# far, before opening the standby session.
$node_primary->wait_for_catchup($node_standby, 'replay', $node_primary->lsn('flush'));

# Open a long-lived psql session on the standby. on_error_stop is disabled,
# presumably so that psql keeps running after the error this test provokes.
my $psql_standby = $node_standby->background_psql($testdb, on_error_stop => 0);

# Send the whole script and wait for output matching /^1$/m, i.e. the row with
# a = 1 fetched through the cursor. The open cursor is what keeps the buffer
# pinned (see the SQL comments); the trailing SELECT on table2 is expected to
# block on the lock held by the prepared transaction 'lock'.
$psql_standby->query_until(qr/^1$/m, qq[
	BEGIN;
	-- hold pin
	DECLARE test_recovery_conflict_cursor CURSOR FOR SELECT a FROM $table1;
	FETCH FORWARD FROM test_recovery_conflict_cursor;
	-- wait for lock held by prepared transaction
	SELECT * FROM $table2;
]);

# Unconditional pass: reaching this line means query_until() returned.
ok(1, "cursor holding conflicting pin, also waiting for lock, established");

# VACUUM will prune away rows, causing a buffer pin conflict, while standby
# psql is waiting on lock
$node_primary->safe_psql($testdb, qq[VACUUM $table1;]);
# Wait for the standby to replay up to the primary's current flush position.
$node_primary->wait_for_catchup($node_standby, 'replay', $node_primary->lsn('flush'));

# Wait and check that the deadlock detector was triggered and found a deadlock
# in the backend process (not in startup process).
check_conflict_log("User transaction caused buffer deadlock with recovery.");

# Release the lock on table2 by rolling back the prepared transaction created
# earlier on the primary.
$node_primary->safe_psql($testdb, qq[ROLLBACK PREPARED 'lock';]);

# Report the number of recovery conflicts recorded in pg_stat_database for the
# test database on the standby. The value is only logged with note(); the
# exact-count assertion below is disabled.
my $nconflicts = $node_standby->safe_psql($testdb, "SELECT conflicts FROM pg_stat_database WHERE datname = '$testdb'");
note("number of recovery conflicts: $nconflicts");
#is($nconflicts, 1, qq[one recovery conflict shown in pg_stat_database]);

# Close the background psql session before shutting the nodes down.
$psql_standby->quit();

$node_standby->stop();
$node_primary->stop();

# Wait for $message to show up in the standby's server log and record a test
# result for it.
#
# $message is treated as literal text, not as a regular expression. The
# file-level $log_location is passed to wait_for_log() as the starting position
# (presumably a byte offset, since it is initialised from the log file's size)
# and is then replaced by the value wait_for_log() returns. The test passes if
# that new value is greater than the previous one.
sub check_conflict_log
{
	my $message = shift;
	my $old_log_location = $log_location;

	# \Q...\E keeps regex metacharacters in the message (such as the trailing
	# '.' of a log sentence) from being interpreted as pattern syntax.
	$log_location =
	  $node_standby->wait_for_log(qr/\Q$message\E/, $log_location);

	cmp_ok($log_location, '>', $old_log_location,
		"logfile contains terminated connection due to recovery conflict"
	);
}

# Assert that the standby's pg_stat_database_conflicts row for the test
# database reports exactly one conflict in column confl_<$conflict_type>.
# Not called anywhere in the visible part of this script.
sub check_conflict_stat
{
	my ($conflict_type) = @_;

	my $stat_query =
	  qq[SELECT confl_$conflict_type FROM pg_stat_database_conflicts WHERE datname='$testdb';];
	my $count = $node_standby->safe_psql($testdb, $stat_query);

	return is($count, 1, "stats show conflict on standby");
}

# No fixed plan is declared; tell Test::More that all tests have been run.
done_testing();