#!/usr/bin/env bash

set -euo pipefail

# Settings taken from the command line; the values below are the defaults.
# --pg-bin: directory containing the PostgreSQL executables (required).
pg_bin=""
# --workdir: directory for data directories, logs, sockets and scratch files;
# a mktemp directory is created when this is left empty.
workdir=""
# --port-base: port of the primary; the standby uses port_base + 1.
port_base=55432
# --slots: number of logical replication slots created on the primary.
slots=100
# --samples: maximum number of memory samples (CSV rows) to take.
samples=20
# --interval: argument passed to sleep between samples.
interval=30
# --dump-timeout: seconds to wait for each memory dump to appear in the
# standby log.
dump_timeout=45
# --keep: 1 leaves the workdir in place on exit, 0 lets cleanup remove it.
keep=0

# Derived from workdir and port_base once the options have been parsed.
primary_port=""
standby_port=""
primary_dir=""
standby_dir=""
primary_log=""
standby_log=""
# PID of the background psql that runs pg_sync_replication_slots(); empty
# until that psql has been started.
sync_psql_pid=""

usage()
{
	# Print the help text on stdout; each printf argument is one output line.
	printf '%s\n' \
		'Usage:' \
		"  $0 --pg-bin DIR [--workdir DIR] [--slots N] [--samples N]" \
		'     [--interval SEC] [--dump-timeout SEC] [--port-base PORT] [--keep]' \
		'' \
		'Creates a primary and standby, creates failover logical slots on the primary,' \
		'forces pg_sync_replication_slots() on the standby to wait/retry, then samples' \
		'the waiting backend with pg_log_backend_memory_contexts().' \
		'' \
		'Output is CSV: timestamp,backend_pid,total_bytes,delta_bytes'
}

die()
{
	# Report all arguments as one error line on stderr, then abort the
	# script with exit status 1.
	printf 'error: %s\n' "$*" >&2
	exit 1
}

run()
{
	# Trace the command line on stderr, then execute it unchanged; the
	# function's status is the status of that command.
	printf '+ %s\n' "$*" >&2
	"$@"
}

psql_primary()
{
	# Run psql against the primary through the Unix socket in $workdir.
	# Extra arguments are appended after the connection string.
	local conninfo="host=$workdir port=$primary_port dbname=postgres"
	local -a cmd=("$pg_bin/psql" -X -v ON_ERROR_STOP=1 "$conninfo")

	"${cmd[@]}" "$@"
}

psql_standby()
{
	# Run psql against the standby through the Unix socket in $workdir.
	# Extra arguments are appended after the connection string.
	local conninfo="host=$workdir port=$standby_port dbname=postgres"
	local -a cmd=("$pg_bin/psql" -X -v ON_ERROR_STOP=1 "$conninfo")

	"${cmd[@]}" "$@"
}

log_size()
{
	# Print the current size of the standby log in bytes, with no
	# surrounding whitespace and no trailing newline.
	local byte_count

	byte_count=$(wc -c < "$standby_log") || return
	printf '%s' "${byte_count//[[:space:]]/}"
}

extract_grand_total_since()
{
	# $1 is a byte offset into the standby log.  Scan everything after it
	# for "Grand total:" lines and print the digits of the word following
	# "total:" from the last such line; print nothing when none is found.
	local start_byte=$(($1 + 1))

	tail -c "+$start_byte" "$standby_log" 2>/dev/null |
		awk '
			!/Grand total:/ { next }
			{
				for (col = 1; col <= NF; col++)
				{
					if ($col != "total:")
						continue;

					# Strip everything but digits from the next field.
					gsub(/[^0-9]/, "", $(col + 1));
					latest = $(col + 1);
				}
			}
			END {
				if (latest != "")
					print latest;
			}
		'
}

wait_sql()
{
	# Poll a node once per second until the query's output contains a line
	# that is exactly "t".
	#   $1  node: "primary" queries the primary, anything else the standby
	#   $2  SQL text to run
	#   $3  maximum number of attempts (default 60)
	# Returns 0 as soon as the query yields "t", 1 when attempts run out.
	local node=$1
	local query=$2
	local max_tries=${3:-60}
	local runner=psql_standby
	local attempt

	if [ "$node" = primary ]; then
		runner=psql_primary
	fi

	for ((attempt = 0; attempt < max_tries; attempt++)); do
		# Connection errors are expected while a server is starting,
		# hence stderr is discarded.
		if "$runner" -Atqc "$query" 2>/dev/null | grep -qx t; then
			return 0
		fi
		sleep 1
	done

	return 1
}

wait_for_log()
{
	# Poll the standby log once per second for a grep pattern.
	#   $1  pattern handed to grep
	#   $2  byte offset; only log content after it is searched
	#   $3  maximum number of polls (default 60)
	# Returns 0 when the pattern shows up, 1 on timeout or once the
	# background psql recorded in sync_psql_pid is no longer running.
	local needle=$1
	local start_byte=$(($2 + 1))
	local limit=${3:-60}
	local tick

	for ((tick = 0; tick < limit; tick++)); do
		# NOTE(review): grep output is discarded instead of using -q;
		# presumably so grep reads all of tail's output and the
		# pipeline cannot fail under pipefail -- keep this form.
		if tail -c "+$start_byte" "$standby_log" 2>/dev/null |
			grep "$needle" >/dev/null; then
			return 0
		fi

		# Stop polling once the slotsync psql has gone away.
		if [ -n "$sync_psql_pid" ]; then
			kill -0 "$sync_psql_pid" 2>/dev/null || break
		fi

		sleep 1
	done

	return 1
}

wait_for_file()
{
	# Poll once per second until a file exists and is non-empty.
	#   $1  path to watch
	#   $2  maximum number of polls (default 60)
	# Returns 0 when the file has content, 1 when the polls are used up.
	local path=$1
	local limit=${2:-60}
	local tick=0

	while ((tick < limit)); do
		if [ -s "$path" ]; then
			return 0
		fi
		sleep 1
		tick=$((tick + 1))
	done

	return 1
}

stop_cluster()
{
	# Best-effort immediate shutdown of the server whose data directory
	# is $1.  Does nothing for an empty or missing directory and always
	# returns 0, so it is safe to call from cleanup paths.
	local datadir=$1

	[ -n "$datadir" ] || return 0
	[ -d "$datadir" ] || return 0

	# A failing pg_ctl (e.g. server not running) is deliberately ignored.
	"$pg_bin/pg_ctl" -D "$datadir" -m immediate stop >/dev/null 2>&1 || true
}

prepare_workdir()
{
	# Create the workdir if needed and clear out whatever an earlier run
	# of this harness left in it (for example after --keep).  Only files
	# and directories this harness creates are removed; anything else in
	# the directory is left untouched.
	local node_dir
	local -a stale_files=(
		"$primary_log"
		"$standby_log"
		"$workdir/create_slots.sql"
		"$workdir/run_slotsync.sql"
		"$workdir/slotsync.out"
		"$workdir/slotsync.err"
		"$workdir/slotsync_backend.pid"
	)

	mkdir -p "$workdir"

	# Stop leftover servers (standby first) before deleting their data.
	for node_dir in "$standby_dir" "$primary_dir"; do
		stop_cluster "$node_dir"
	done

	rm -rf "$primary_dir" "$standby_dir"
	rm -f "${stale_files[@]}"
}

cleanup()
{
	# EXIT handler: stop the background psql and both servers, then tidy
	# up the workdir.
	#
	# With --keep (keep != 0) the workdir is left as is.  Otherwise only
	# the files and directories this harness creates are deleted, and the
	# workdir itself is removed only if that leaves it empty.  A blanket
	# "rm -rf $workdir" would destroy unrelated files when --workdir
	# points at a directory the user also uses for other things.
	local path
	local port

	if [ -n "$sync_psql_pid" ] && kill -0 "$sync_psql_pid" 2>/dev/null; then
		kill "$sync_psql_pid" 2>/dev/null || true
		wait "$sync_psql_pid" 2>/dev/null || true
	fi

	stop_cluster "$standby_dir"
	stop_cluster "$primary_dir"

	if [ "$keep" -ne 0 ]; then
		echo "kept workdir: $workdir" >&2
		return 0
	fi

	if [ -z "$workdir" ] || [ ! -d "$workdir" ]; then
		return 0
	fi

	# Data directories created by the harness.
	for path in "$primary_dir" "$standby_dir"; do
		if [ -n "$path" ]; then
			rm -rf -- "$path" || true
		fi
	done

	# Server logs.
	for path in "$primary_log" "$standby_log"; do
		if [ -n "$path" ]; then
			rm -f -- "$path" || true
		fi
	done

	# Scratch files written into the workdir.
	rm -f -- \
		"$workdir/create_slots.sql" \
		"$workdir/run_slotsync.sql" \
		"$workdir/slotsync.out" \
		"$workdir/slotsync.err" \
		"$workdir/slotsync_backend.pid" || true

	# Socket files a server may leave behind if it could not be stopped
	# cleanly.
	for port in "$primary_port" "$standby_port"; do
		if [ -n "$port" ]; then
			rm -f -- "$workdir/.s.PGSQL.$port" \
				"$workdir/.s.PGSQL.$port.lock" || true
		fi
	done

	# rmdir refuses to delete a non-empty directory, which is exactly the
	# protection wanted for files that are not ours.
	if ! rmdir -- "$workdir" 2>/dev/null; then
		echo "workdir not removed, it contains files not created by this harness: $workdir" >&2
	fi

	return 0
}

# Abort with a readable message unless option $1 is followed by a non-empty
# value in $2.  Call as: require_value "$@"
require_value()
{
	if [ "$#" -lt 2 ] || [ -z "$2" ]; then
		die "option $1 requires a non-empty argument"
	fi
}

# Parse the command line into the global settings (pg_bin, workdir, slots,
# samples, interval, dump_timeout, port_base, keep).  Prints the usage text
# and exits 0 for -h/--help; dies on unknown options or missing values.
parse_args()
{
	while [ "$#" -gt 0 ]; do
		case "$1" in
			--pg-bin)
				require_value "$@"
				pg_bin=$2
				shift 2
				;;
			--workdir)
				require_value "$@"
				workdir=$2
				shift 2
				;;
			--slots)
				require_value "$@"
				slots=$2
				shift 2
				;;
			--samples)
				require_value "$@"
				samples=$2
				shift 2
				;;
			--interval)
				require_value "$@"
				interval=$2
				shift 2
				;;
			--dump-timeout)
				require_value "$@"
				dump_timeout=$2
				shift 2
				;;
			--port-base)
				require_value "$@"
				port_base=$2
				shift 2
				;;
			--keep)
				keep=1
				shift
				;;
			-h|--help)
				usage
				exit 0
				;;
			*)
				die "unknown option: $1"
				;;
		esac
	done
}

parse_args "$@"

# Succeed when $1 is a non-empty string consisting solely of decimal digits.
is_uint()
{
	case "$1" in
		'' | *[!0-9]*)
			return 1
			;;
	esac
	return 0
}

[ -n "$pg_bin" ] || die "--pg-bin is required"
[ -x "$pg_bin/postgres" ] || die "postgres not found in --pg-bin: $pg_bin"

# The numeric options end up in shell arithmetic, seq and generated config
# files, so reject anything that is not a plain decimal integer up front
# instead of failing later in a confusing way (or silently sampling nothing).
if ! is_uint "$slots" || [ "$((10#$slots))" -le 0 ]; then
	die "--slots must be positive"
fi
is_uint "$samples" || die "--samples must be a non-negative integer"
is_uint "$dump_timeout" || die "--dump-timeout must be a non-negative integer"
is_uint "$port_base" || die "--port-base must be an integer between 1 and 65534"

# Force base 10 so a leading zero is not interpreted as octal later on.
slots=$((10#$slots))
port_base=$((10#$port_base))

# The standby uses port_base + 1, which must still be a valid port.
if [ "$port_base" -lt 1 ] || [ "$port_base" -gt 65534 ]; then
	die "--port-base must be an integer between 1 and 65534"
fi

if [ -z "$workdir" ]; then
	workdir=$(mktemp -d "${TMPDIR:-/tmp}/slotsync-memory.XXXXXX")
else
	mkdir -p -- "$workdir"
fi

# The workdir doubles as the Unix-socket directory.  libpq only treats
# "host" as a socket directory when it is an absolute path, so a relative
# --workdir has to be resolved before it is used in connection strings.
workdir=$(CDPATH='' cd -- "$workdir" && pwd) ||
	die "cannot resolve workdir to an absolute path"

# The path is embedded unquoted in conninfo strings and single-quoted in
# postgresql.conf; these characters would corrupt either of them.
case "$workdir" in
	*[[:space:]]* | *"'"* | *'\'*)
		die "workdir must not contain whitespace, quotes or backslashes: $workdir"
		;;
esac

primary_port=$port_base
standby_port=$((port_base + 1))
primary_dir="$workdir/primary"
standby_dir="$workdir/standby"
primary_log="$workdir/primary.log"
standby_log="$workdir/standby.log"

# From here on every exit path stops the servers and tidies the workdir.
trap cleanup EXIT
prepare_workdir

echo "workdir=$workdir" >&2
echo "primary_port=$primary_port standby_port=$standby_port slots=$slots" >&2

# --- Primary -------------------------------------------------------------
# Create the primary's data directory with "trust" authentication.
run "$pg_bin/initdb" -D "$primary_dir" -A trust >/dev/null

# Append the harness settings: no TCP listeners, Unix socket in the workdir,
# logical WAL level, and room for all test slots plus ten spare ones.
cat >> "$primary_dir/postgresql.conf" <<EOF
listen_addresses = ''
unix_socket_directories = '$workdir'
port = $primary_port
wal_level = logical
max_wal_senders = 20
max_replication_slots = $((slots + 10))
max_connections = 100
log_min_messages = debug1
EOF

# Allow local (Unix-socket) regular and replication connections without a
# password.
cat >> "$primary_dir/pg_hba.conf" <<EOF
local all all trust
local replication all trust
EOF

# Start the primary; its server log goes to $primary_log.
run "$pg_bin/pg_ctl" -D "$primary_dir" -l "$primary_log" -w start >/dev/null

# Make sure the primary actually answers queries before going on.
wait_sql primary "SELECT true" 30 || die "primary did not start"

# --- Standby -------------------------------------------------------------
# Physical slot the standby streams from (referenced below through
# primary_slot_name).
psql_primary -qAtc "SELECT pg_create_physical_replication_slot('standby_slot');" >/dev/null

# Clone the primary over its Unix socket, streaming WAL during the backup.
run "$pg_bin/pg_basebackup" -D "$standby_dir" -h "$workdir" -p "$primary_port" \
	-d "dbname=postgres" -X stream >/dev/null

# Append the standby settings: its own port, hot standby with feedback, and
# a primary_conninfo that names a database (dbname=postgres).
cat >> "$standby_dir/postgresql.conf" <<EOF
listen_addresses = ''
unix_socket_directories = '$workdir'
port = $standby_port
hot_standby = on
hot_standby_feedback = on
primary_conninfo = 'host=$workdir port=$primary_port dbname=postgres application_name=standby'
primary_slot_name = 'standby_slot'
max_replication_slots = $((slots + 10))
max_connections = 100
log_min_messages = debug1
EOF

# standby.signal is the marker file that makes the server start as a standby.
touch "$standby_dir/standby.signal"

# Start the standby; its server log goes to $standby_log, which the sampling
# code later scans.
run "$pg_bin/pg_ctl" -D "$standby_dir" -l "$standby_log" -w start >/dev/null

# Confirm the standby is up and really running in recovery.
wait_sql standby "SELECT pg_is_in_recovery()" 30 || die "standby did not enter recovery"

# Write one slot-creation statement per requested slot (memslot_1 ..
# memslot_N, created with the last argument, failover, set to true) into a
# script and run it on the primary.
slot_sql="$workdir/create_slots.sql"
for ((slot_no = 1; slot_no <= slots; slot_no++)); do
	printf "SELECT pg_create_logical_replication_slot('memslot_%s', 'pgoutput', false, false, true);\n" "$slot_no"
done > "$slot_sql"
psql_primary -q -f "$slot_sql" >/dev/null

# Generate some WAL on the primary after the slots exist, then checkpoint
# and switch to a new WAL segment.
workload_sql="CREATE TABLE slotsync_memory_wait(a int);
INSERT INTO slotsync_memory_wait SELECT generate_series(1, 10000);
CHECKPOINT;
SELECT pg_switch_wal();"
psql_primary -q >/dev/null <<<"$workload_sql"

# Wait until the standby has replayed everything flushed so far.
target_lsn=$(psql_primary -Atqc "SELECT pg_current_wal_flush_lsn();")
if ! wait_sql standby "SELECT pg_last_wal_replay_lsn() >= '$target_lsn'::pg_lsn" 60; then
	die "standby did not catch up to $target_lsn"
fi

# Advance the standby restart point so the local slot reservation is ahead of
# the remote slot positions, forcing pg_sync_replication_slots() to retry.
# A failing CHECKPOINT is tolerated on purpose.
psql_standby -qAtc "CHECKPOINT;" >/dev/null || true

pid_file="$workdir/slotsync_backend.pid"
sync_sql="$workdir/run_slotsync.sql"

# psql script: write the backend PID (bare, unaligned) to $pid_file, then
# call pg_sync_replication_slots() in that same session.
printf '%s\n' \
	'\pset tuples_only on' \
	'\pset format unaligned' \
	"\\o $pid_file" \
	'SELECT pg_backend_pid();' \
	'\o' \
	'SELECT pg_sync_replication_slots();' > "$sync_sql"

# Remember where the standby log ends now, so only messages produced by the
# sync call are searched below.
log_offset=$(log_size)

# psql is launched directly (not through the psql_standby function) so that
# $! is the PID of psql itself.
sync_cmd=(
	"$pg_bin/psql" -X -v ON_ERROR_STOP=1
	"host=$workdir port=$standby_port dbname=postgres"
	-f "$sync_sql"
)
"${sync_cmd[@]}" > "$workdir/slotsync.out" 2> "$workdir/slotsync.err" &
sync_psql_pid=$!

if ! wait_for_file "$pid_file" 30; then
	die "timed out waiting for slotsync backend pid"
fi
backend_pid=$(tr -d '[:space:]' < "$pid_file")

# Sampling is only meaningful while the sync call is stuck retrying.
if ! wait_for_log "could not synchronize replication slot" "$log_offset" 60; then
	die "pg_sync_replication_slots() did not enter the expected retry path"
fi

# CSV header; one row per sample follows on stdout.
echo "timestamp,backend_pid,total_bytes,delta_bytes"

previous_total=""
# Only log output written after this byte offset belongs to the next dump.
offset=$(log_size)
first_sample=1

for _ in $(seq 1 "$samples"); do
	# Pause between samples only.  Sleeping before every sample except the
	# first (instead of after each one) keeps the spacing between samples
	# but avoids a useless final wait of $interval seconds once the last
	# row has been printed.
	if [ "$first_sample" -eq 0 ]; then
		sleep "$interval"
	fi
	first_sample=0

	timestamp=$(date -u +"%Y-%m-%dT%H:%M:%SZ")

	# The sync call has finished or failed: emit an empty row and stop.
	if ! kill -0 "$sync_psql_pid" 2>/dev/null; then
		echo "$timestamp,$backend_pid,,"
		break
	fi

	# Ask the waiting backend to dump its memory contexts into the server
	# log.  Errors are folded into an empty result and handled just below.
	ok=$(psql_standby -Atqc "SELECT pg_log_backend_memory_contexts($backend_pid);" 2>/dev/null || true)

	if [ "$ok" != "t" ]; then
		echo "warning: pg_log_backend_memory_contexts($backend_pid) returned '$ok'" >&2
		echo "$timestamp,$backend_pid,,"
		break
	fi

	# The dump is written asynchronously; poll the log once per second
	# until its "Grand total" line appears or the timeout is reached.
	total=""
	for _ in $(seq 1 "$dump_timeout"); do
		total=$(extract_grand_total_since "$offset")
		[ -n "$total" ] && break
		sleep 1
	done

	if [ -z "$total" ]; then
		echo "warning: no memory dump observed within ${dump_timeout}s for pid $backend_pid" >&2
	fi

	# Move past everything logged so far so the next sample cannot pick
	# up this dump again.
	offset=$(log_size)

	# A delta needs both this total and an earlier one.
	if [ -n "$total" ] && [ -n "$previous_total" ]; then
		delta=$((total - previous_total))
	else
		delta=""
	fi

	echo "$timestamp,$backend_pid,$total,$delta"

	if [ -n "$total" ]; then
		previous_total=$total
	fi
done
