# Copyright (c) 2025, PostgreSQL Global Development Group

# TAP test for logical replication and rb_total_size
use strict;
use warnings FATAL => 'all';
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

# Empirically determined size of WAL per row in the test table.  Every
# expected rb_total_size value checked by this script is a multiple of it.
my $wal_size_per_row = 233;

# Set up publisher node
my $publisher = PostgreSQL::Test::Cluster->new('publisher');
$publisher->init(allows_streaming => 'logical');
$publisher->append_conf('postgresql.conf', 'wal_level = logical');
$publisher->append_conf('postgresql.conf', 'max_replication_slots = 4');
$publisher->append_conf('postgresql.conf', 'max_wal_senders = 4');
# Small decoding memory limit, used by the concurrent-transactions case
$publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB');
# Needed for the PREPARE TRANSACTION cases
$publisher->append_conf('postgresql.conf', 'max_prepared_transactions = 100');
$publisher->start;

# Set up subscriber node
my $subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$subscriber->init(allows_streaming => 'logical');
$subscriber->append_conf('postgresql.conf', 'wal_level = logical');
# The subscriber applies prepared transactions, so it needs this as well
$subscriber->append_conf('postgresql.conf', 'max_prepared_transactions = 100');
$subscriber->start;

# Create test table and publication on publisher
$publisher->safe_psql('postgres',
    'CREATE TABLE tab_rep (a int primary key, b char(100));');
$publisher->safe_psql('postgres', 'CREATE PUBLICATION pub FOR TABLE tab_rep;');

# Create a dummy table for non-replicated transactions
$publisher->safe_psql('postgres', 'CREATE TABLE dummy_tab (id int);');

# Create test table and subscription on subscriber
$subscriber->safe_psql('postgres',
    'CREATE TABLE tab_rep (a int primary key, b char(100));');
my $publisher_connstr = $publisher->connstr . ' dbname=postgres';
$subscriber->safe_psql('postgres',
    "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (two_phase = true);");
$subscriber->wait_for_subscription_sync($publisher, 'sub');

# two_phase is requested above, but it is only switched on after the initial
# table synchronization has finished.  Wait until it is enabled, otherwise the
# prepared-transaction checks could run before PREPARE TRANSACTION is decoded
# at prepare time.
$subscriber->poll_query_until('postgres',
    "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');"
) or die "Timed out while waiting for subscriber to enable twophase";

# Case: Committed transaction
# Keep a transaction open in a background session so that its changes are
# still held in the reorder buffer while rb_total_size is read.
my $bpgsql = $publisher->background_psql('postgres', on_error_stop => 1);
$bpgsql->query_safe('BEGIN;');
$bpgsql->query_safe("INSERT INTO tab_rep VALUES (1, 'foo'), (2, 'bar');");
# Dummy commit to force stats update
$publisher->safe_psql('postgres', 'INSERT INTO dummy_tab VALUES (1);');
# The walsender decodes WAL asynchronously.  Wait until it has processed
# everything written so far (including the dummy commit) so that the value
# read below is not stale.
$publisher->wait_for_catchup('sub');
is( $publisher->safe_psql(
        'postgres', "SELECT rb_total_size FROM pg_stat_replication;"),
    2 * $wal_size_per_row,
    'expected rb_size before commit');
$bpgsql->query_safe('COMMIT;');
$bpgsql->quit;
$publisher->wait_for_catchup('sub');
# Once the commit has been decoded and sent, nothing should remain buffered
is( $publisher->safe_psql(
        'postgres', "SELECT rb_total_size FROM pg_stat_replication;"),
    0,
    'expected rb_size after commit');

# Case: Aborted transaction
# Same shape as the committed case, but the open transaction is rolled back.
$bpgsql = $publisher->background_psql('postgres', on_error_stop => 1);
$bpgsql->query_safe('BEGIN;');
$bpgsql->query_safe("INSERT INTO tab_rep VALUES (3, 'foo'), (4, 'bar');");
# Dummy commit to force stats update
$publisher->safe_psql('postgres', 'INSERT INTO dummy_tab VALUES (1);');
# The walsender decodes WAL asynchronously.  Wait until it has processed
# everything written so far (including the dummy commit) so that the value
# read below is not stale.
$publisher->wait_for_catchup('sub');
is( $publisher->safe_psql(
        'postgres', "SELECT rb_total_size FROM pg_stat_replication;"),
    2 * $wal_size_per_row,
    'expected rb_size before abort');
$bpgsql->query_safe('ABORT;');
$bpgsql->quit;
$publisher->wait_for_catchup('sub');
# Once the abort has been decoded, nothing should remain buffered
is( $publisher->safe_psql(
        'postgres', "SELECT rb_total_size FROM pg_stat_replication;"),
    0,
    'expected rb_size after abort');

# Case: Prepared transaction (commit and abort)

# Prepare a transaction
$bpgsql = $publisher->background_psql('postgres', on_error_stop => 1);
$bpgsql->query_safe('BEGIN;');
$bpgsql->query_safe(
    "INSERT INTO tab_rep VALUES (10001, 'prepared'), (10002, 'prepared');");
# Dummy commit to force stats update
$publisher->safe_psql('postgres', 'INSERT INTO dummy_tab VALUES (1);');
# The walsender decodes WAL asynchronously.  Wait until it has processed
# everything written so far (including the dummy commit) so that the value
# read below is not stale.
$publisher->wait_for_catchup('sub');
is( $publisher->safe_psql(
        'postgres', "SELECT rb_total_size FROM pg_stat_replication;"),
    2 * $wal_size_per_row,
    'expected rb_size before prepare');
$bpgsql->query_safe("PREPARE TRANSACTION 'ptx1';");
$bpgsql->quit;
$publisher->wait_for_catchup('sub');
# After prepare, the changes should be removed from the reorder buffer
is( $publisher->safe_psql(
        'postgres', "SELECT rb_total_size FROM pg_stat_replication;"),
    0,
    'expected rb_size after prepare');
# Commit the prepared transaction
$publisher->safe_psql('postgres', "COMMIT PREPARED 'ptx1';");
$publisher->wait_for_catchup('sub');
is( $publisher->safe_psql(
        'postgres', "SELECT rb_total_size FROM pg_stat_replication;"),
    0,
    'expected rb_size after commit prepared');

# Prepare and abort a transaction
$bpgsql = $publisher->background_psql('postgres', on_error_stop => 1);
$bpgsql->query_safe('BEGIN;');
$bpgsql->query_safe(
    "INSERT INTO tab_rep VALUES (10003, 'prepared'), (10004, 'prepared');");
# Dummy commit to force stats update
$publisher->safe_psql('postgres', 'INSERT INTO dummy_tab VALUES (1);');
# As above, make sure the walsender has caught up before reading the value
$publisher->wait_for_catchup('sub');
is( $publisher->safe_psql(
        'postgres', "SELECT rb_total_size FROM pg_stat_replication;"),
    2 * $wal_size_per_row,
    'expected rb_size before prepare (abort case)');
$bpgsql->query_safe("PREPARE TRANSACTION 'ptx2';");
$bpgsql->quit;
$publisher->wait_for_catchup('sub');
# After prepare, the changes should be removed from the reorder buffer
is( $publisher->safe_psql(
        'postgres', "SELECT rb_total_size FROM pg_stat_replication;"),
    0,
    'expected rb_size after prepare (abort case)');
# Roll back the prepared transaction
$publisher->safe_psql('postgres', "ROLLBACK PREPARED 'ptx2';");
$publisher->wait_for_catchup('sub');
is( $publisher->safe_psql(
        'postgres', "SELECT rb_total_size FROM pg_stat_replication;"),
    0,
    'expected rb_size after abort prepared');

# Case: Multiple concurrent transactions to fill the reorder buffer and trigger streaming
my @bpg_conns;
my $num_txns = 5;
my $rows_per_txn = 200; # Adjust so total rows triggers streaming

# NOTE(review): with logical_decoding_work_mem = 64kB and roughly
# $rows_per_txn * $wal_size_per_row bytes per transaction, the memory limit is
# exceeded from the second open transaction on.  The expectations below assume
# rb_total_size keeps growing regardless; confirm that this matches the
# intended semantics once changes are streamed or spilled.

# Open the transactions one by one and check the size after each of them.
for my $i (0 .. $num_txns - 1) {
    # Keys start at 5 so that they do not collide with the rows inserted by
    # the earlier cases; every transaction gets its own disjoint key range.
    my $first_key = 5 + $i * $rows_per_txn;
    my $last_offset = $rows_per_txn - 1;

    $bpg_conns[$i] = $publisher->background_psql('postgres', on_error_stop => 1);
    $bpg_conns[$i]->query_safe('BEGIN;');
    $bpg_conns[$i]->query_safe(
        "INSERT INTO tab_rep SELECT $first_key+g, 'streamed' FROM generate_series(0, $last_offset) g;");
    # Dummy commit to force stats update
    $publisher->safe_psql('postgres', 'INSERT INTO dummy_tab VALUES (1);');
    # The walsender decodes WAL asynchronously.  Wait until it has processed
    # everything written so far (including the dummy commit) so that the
    # value read below is not stale.
    $publisher->wait_for_catchup('sub');
    is( $publisher->safe_psql(
            'postgres', "SELECT rb_total_size FROM pg_stat_replication;"),
        ($i + 1) * $rows_per_txn * $wal_size_per_row,
        'expected rb_size before finish ' . $i);
}

# Commit some transactions, abort others, and check rb_total_size after each
my $committed_txns = 0;
for my $i (0 .. $num_txns - 1) {
    if ($i % 2 == 0) {
        $bpg_conns[$i]->query_safe('COMMIT;');
        $committed_txns++;
    } else {
        $bpg_conns[$i]->query_safe('ABORT;');
    }
    $bpg_conns[$i]->quit;
    $publisher->wait_for_catchup('sub');
    is( $publisher->safe_psql(
            'postgres', "SELECT rb_total_size FROM pg_stat_replication;"),
        ($num_txns - $i - 1) * $rows_per_txn * $wal_size_per_row,
        'expected rb_size after finish ' . $i);
}

# Rows expected on the subscriber: 2 from the committed-transaction case
# (keys 1 and 2), 2 from the COMMIT PREPARED case (keys 10001 and 10002), plus
# all rows of the transactions committed in the loop above.  The aborted and
# rolled-back transactions contribute nothing.
my $expected_rows = 2 + 2 + $committed_txns * $rows_per_txn;
is( $subscriber->safe_psql('postgres', 'SELECT count(*) FROM tab_rep;'),
    $expected_rows,
    'all data replicated after mixed streaming commits and aborts');
is( $publisher->safe_psql(
        'postgres', "SELECT rb_total_size FROM pg_stat_replication;"),
    0,
    'expected rb_size after finishing all streaming transactions');

# TODO: prepared transactions (aborted, committed, streamed, spilled), spilled transactions (aborted, committed), subtransactions (aborted, committed, streamed, spilled)
done_testing();
