use strict;
use warnings FATAL => 'all';

use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;

use Test::More;

# Test: This test reproduces an assertion failure in walsender.c that can be
# triggered when a continuation record is overwritten by switching to a new
# timeline on the primary.
#
# The test creates a cascade: Primary -> Upstream-Replica -> Downstream-Replica,
# then leaves the primary with an invalid (incomplete) record at the end of its
# WAL and forces it to switch to a new timeline.
# The Upstream-Replica then tries to send data to the Downstream-Replica and
# hits the assertion.
#
# Based on the 026_overwrite_contrecord.pl test.
#

# Create and start the primary with streaming replication enabled.
# WAL compression and autovacuum are switched off in postgresql.conf.
my $primary = PostgreSQL::Test::Cluster->new('primary');
$primary->init(allows_streaming => 1);
$primary->append_conf('postgresql.conf', <<'EOCONF');

wal_compression = off
autovacuum = off
EOCONF
$primary->start();

# Table whose inserts are used purely to generate WAL.
my $create_filler_sql = 'create table filler (a int, b text)';

# Keep inserting rows until fewer than two blocks' worth of bytes remain in
# the current WAL segment, so that only the start of a largish record can
# still fit there.
my $fill_segment_sql = <<'EOSQL';

DO $$
DECLARE
    wal_segsize int := setting::int FROM pg_settings WHERE name = 'wal_segment_size';
    remain int;
    iters  int := 0;
BEGIN
    LOOP
        INSERT into filler
        select g, repeat(encode(sha256(g::text::bytea), 'hex'), (random() * 15 + 1)::int)
        from generate_series(1, 10) g;

        remain := wal_segsize - (pg_current_wal_insert_lsn() - '0/0') % wal_segsize;
        IF remain < 2 * setting::int from pg_settings where name = 'block_size' THEN
            RAISE log 'exiting after % iterations, % bytes to end of WAL segment', iters, remain;
            EXIT;
        END IF;
        iters := iters + 1;
    END LOOP;
END
$$;
EOSQL

$primary->safe_psql('postgres', $create_filler_sql);
$primary->safe_psql('postgres', $fill_segment_sql);

# Remember the WAL segment and the insert LSN at which the next record will
# start.
my $initfile = $primary->safe_psql('postgres',
	'SELECT pg_walfile_name(pg_current_wal_insert_lsn())');
my $lsn_invalid_record =
  $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn()');

# Report via Test::More instead of printing raw text to STDOUT, so the
# diagnostic stays a well-formed TAP comment.
note "Invalid contrecord lsn: $lsn_invalid_record";

# Emit a transactional logical message with a 617280-byte payload
# (5 bytes x 123456); the check below verifies that the insert position
# moved on to a different WAL segment as a result.
$primary->safe_psql('postgres',
	qq{SELECT pg_logical_emit_message(true, 'assert_in_cascade_walsender', repeat('xyzxz', 123456))}
);
my $endfile = $primary->safe_psql('postgres',
	'SELECT pg_walfile_name(pg_current_wal_insert_lsn())');

# isnt() shows both values on failure, unlike ok() on a boolean expression.
isnt($initfile, $endfile, "$initfile differs from $endfile");

# Stop without a clean shutdown so the WAL files stay exactly as written.
$primary->stop('immediate');

## Remove the WAL segment containing the continuation record to simulate lost WAL
# Use the node's data_dir accessor rather than hard-coding the "pgdata"
# layout, and build the path once so the error message cannot drift from
# the path actually unlinked.
my $lost_segment_path = $primary->data_dir . "/pg_wal/$endfile";
unlink $lost_segment_path
  or die "could not unlink $lost_segment_path: $!";

## Build the cascade: primary -> upstream_replica -> downstream_replica.
my $backup_name = 'backup';
my %streaming_standby = (has_streaming => 1);

# The primary is stopped at this point, so take a cold file-system backup.
$primary->backup_fs_cold($backup_name);

# First hop: a standby initialised from the primary's backup.
my $upstream_replica = PostgreSQL::Test::Cluster->new('upstream_replica');
$upstream_replica->init_from_backup($primary, $backup_name,
	%streaming_standby);
$upstream_replica->start();

# Second hop: a standby initialised from a backup of the upstream replica.
$upstream_replica->backup($backup_name);
my $downstream_replica =
  PostgreSQL::Test::Cluster->new('downstream_replica');
$downstream_replica->init_from_backup($upstream_replica, $backup_name,
	%streaming_standby);
$downstream_replica->start();

## Restart the old primary as a standby and promote it, which forces a
## switch to a new timeline.
$primary->set_standby_mode();
$primary->start();
$primary->promote();

# Fixed pause before inspecting the upstream replica's log.
# NOTE(review): presumably this gives the cascade time to stream across the
# timeline switch; it is timing-dependent -- consider a deterministic wait.
sleep 15;

# The test passes as long as the walsender assertion never shows up in the
# upstream replica's server log.
my $assertion_trap = qr/TRAP:.*failed Assert.*sentPtr <= SendRqstPtr/i;
my $upstream_log = slurp_file($upstream_replica->logfile);
unlike($upstream_log, $assertion_trap,
	"upstream replica walsender did not hit the assertion");

done_testing();
