#!/bin/bash

set -e -u

ulimit -c unlimited

# dump shmem in cores
echo 15 > /proc/self/coredump_filter

DATADIR=96slottest
PGPORT_MASTER=5144
PGPORT_REPLICA=$(( $PGPORT_MASTER + 1 ))
export PGUSER=postgres
export PGDATABASE=postgres

export PATH=$HOME/pg/96/bin:$PATH

if [ -e $DATADIR ]; then
    pg_ctl -D $DATADIR -w stop -m immediate || true
    rm -rf $DATADIR ${DATADIR}.log
fi

if [ -e ${DATADIR}-replica ]; then
    pg_ctl -D ${DATADIR}-replica -w stop -m immediate || true
    rm -rf ${DATADIR}-replica ${DATADIR}-replica.log
fi

rm -rf xlogs

postmaster_opts='-c max_replication_slots=12 -c wal_level=logical -c max_wal_senders=10 -c track_commit_timestamp=on -c wal_keep_segments=100 -c log_min_messages=debug2 -c log_error_verbosity=verbose'

echo "Initdb'ing master"
initdb -D $DATADIR -A trust -N -U postgres > ${DATADIR}-initdb.log

cat > $DATADIR/pg_hba.conf <<'__END__'
# TYPE  DATABASE        USER            ADDRESS                 METHOD
local   all             all                                     trust
host    all             all             127.0.0.1/32            trust
host    all             all             ::1/128                 trust
local   replication     postgres                                trust
host    replication     postgres        127.0.0.1/32            trust
host    replication     postgres        ::1/128                 trust
__END__

echo "Starting master"
PGPORT=$PGPORT_MASTER pg_ctl -l ${DATADIR}.log -D $DATADIR -w start -o "$postmaster_opts"

# A function to wait for replica catchup
PGPORT=$PGPORT_MASTER psql <<'__END__'
CREATE OR REPLACE FUNCTION public.pg_xlog_wait_remote_apply(i_pos pg_lsn, i_pid integer) RETURNS VOID
AS $FUNC$
BEGIN
    WHILE EXISTS(SELECT true FROM pg_stat_get_wal_senders() s WHERE s.flush_location < i_pos AND (i_pid = 0 OR s.pid = i_pid)) LOOP
                PERFORM pg_sleep(0.01);
        END LOOP;
END;$FUNC$
LANGUAGE plpgsql;
__END__

wait_for_catchup()
{
  PGPORT=$PGPORT_MASTER psql -c "SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), 0);"
}

print_slots_replica()
{
  echo "Replica slots:"
  PGPORT=$PGPORT_REPLICA psql -c "SELECT * FROM pg_replication_slots"
}

print_slots_master()
{
  echo "Master slots:"
  PGPORT=$PGPORT_MASTER psql -c "SELECT * FROM pg_replication_slots"
}

print_slots()
{
    print_slots_master
    print_slots_replica
}

PGPID=$(pg_ctl -D ${DATADIR} status | awk '/(PID:.*)/ { found = match($0, "\\(PID:\\s([0-9]*)\\)", arr); if (found) { print arr[1]; } }')
echo "Master postmaster PID is ${PGPID}"

echo "Creating before_basebackup slots"
PGPORT=$PGPORT_MASTER psql -c "SELECT * FROM pg_create_logical_replication_slot('before_basebackup', 'test_decoding', true);"
PGPORT=$PGPORT_MASTER psql -c "SELECT * FROM pg_create_logical_replication_slot('before_basebackup_nf', 'test_decoding');"
PGPORT=$PGPORT_MASTER psql -c "SELECT * FROM pg_create_physical_replication_slot('before_basebackup_ph', true, true);"
PGPORT=$PGPORT_MASTER psql -c "SELECT * FROM pg_create_physical_replication_slot('before_basebackup_ph_nf');"
PGPORT=$PGPORT_MASTER psql -c "SELECT * FROM pg_create_logical_replication_slot('drop_test', 'test_decoding', true);"

print_slots_master

# Crash the master and restart it. The main point here is to ensure we don't
# delete non-failover slots during normal crash recovery.
PGPORT=$PGPORT_MASTER pg_ctl -D $DATADIR -w stop -m immediate
PGPORT=$PGPORT_MASTER pg_ctl -l ${DATADIR}.log -D $DATADIR -w start -o "$postmaster_opts"

sleep 1
print_slots_master

echo "Making basebackup"
PGPORT=$PGPORT_MASTER pg_basebackup -D ${DATADIR}-replica -X stream -R

echo "replica xlogs:"
ls ${DATADIR}-replica/pg_xlog

echo "Starting the replica"
PGPORT=$PGPORT_REPLICA pg_ctl -l ${DATADIR}-replica.log -D ${DATADIR}-replica -w start -o "$postmaster_opts -c hot_standby=on"

PGPID=$(pg_ctl -D "${DATADIR}-replica" status | awk '/(PID:.*)/ { found = match($0, "\\(PID:\\s([0-9]*)\\)", arr); if (found) { print arr[1]; } }')
echo "Replica postmaster PID is ${PGPID}"

#echo "---- attach now -----"
#sleep 20
#echo "---------------------"

PGPORT=$PGPORT_MASTER psql -c "SELECT * FROM pg_create_logical_replication_slot('after_basebackup', 'test_decoding', true);"
PGPORT=$PGPORT_MASTER psql -c "SELECT * FROM pg_create_logical_replication_slot('after_basebackup_nf', 'test_decoding');"
PGPORT=$PGPORT_MASTER psql -c "SELECT * FROM pg_create_physical_replication_slot('after_basebackup_ph', true, true);"
PGPORT=$PGPORT_MASTER psql -c "SELECT * FROM pg_create_physical_replication_slot('after_basebackup_ph_nf');"

echo "Slots after creation of before and after slots:"
print_slots

# expect this to fail
set +e
echo "Attempting creation of non-failover logical slot on replica (WILLFAIL)"
PGPORT=$PGPORT_REPLICA psql -c "SELECT * FROM pg_create_logical_replication_slot('on_replica', 'test_decoding');"
echo "Attempting creation of failover logical slot on replica (WILLFAIL)"
PGPORT=$PGPORT_REPLICA psql -c "SELECT * FROM pg_create_logical_replication_slot('on_replica', 'test_decoding', true);"
echo "Attempting creation of failover physical slot on replica (WILLFAIL)"
PGPORT=$PGPORT_REPLICA psql -c "SELECT * FROM pg_create_physical_replication_slot('on_replica', true, true);"
set -e

# this must succeed. Using pg_receivexlog just for a different interface; it'd
# be fine to use SQL instead, the two are equivalent.
echo "Creating non-failover physical slot on replica"
PGPORT=$PGPORT_REPLICA pg_receivexlog -S "phys_conflict_test" --create-slot

wait_for_catchup

mkdir -p xlogs

echo "Attempt replay from physical replica failover slot (WILLFAIL)"
set +e
PGPORT=$PGPORT_REPLICA pg_receivexlog -S "after_basebackup_ph" -D xlogs --no-loop
set -e

echo "Start replay from physical replica non-replay slot"
PGPORT=$PGPORT_REPLICA pg_receivexlog -S "phys_conflict_test" -D xlogs --no-loop &

print_slots_replica

# Now make a failover slot on the master with the same name. Rather than causing fireworks
# this should terminate our pg_receivexlog and clobber the slot.
PGPORT=$PGPORT_MASTER psql -c "SELECT * FROM pg_create_physical_replication_slot('phys_conflict_test', true, true);"

# pg_recievexlog should terminate
wait

# drop the slot from the replica
PGPORT=$PGPORT_MASTER pg_receivexlog -S "phys_conflict_test" --drop-slot

# Test dropping of a failover slot on the master
PGPORT=$PGPORT_MASTER pg_receivexlog -S "drop_test" --drop-slot

wait_for_catchup
echo "Slots:"
print_slots

# do some writes
PGPORT=$PGPORT_MASTER psql -c "CREATE TABLE test_tab(blah text, msg text); INSERT INTO test_tab(blah, msg) SELECT x::text, 'onmaster-beforeread' FROM generate_series(0,1) x;"

wait_for_catchup
echo "Slots:"
print_slots

# Attempt to read from replica's slots. This must fail, but not with an
# error indicating that the slots are missing.
set +e
echo "REPLICA: Trying to replay logical slot before_basebackup from replica (WILLFAIL)"
PGPORT=$PGPORT_REPLICA psql -c "SELECT * FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL);"
echo "REPLICA: Trying to replay logical slot after_basebackup from replica (WILLFAIL)"
PGPORT=$PGPORT_REPLICA psql -c "SELECT * FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL);"
set -e

# Read from the master. These must succeed.
echo "MASTER: Trying to replay logical slot before_basebackup from master (expect rows 0,1)"
PGPORT=$PGPORT_MASTER psql -c "SELECT * FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL);"
echo "MASTER: Trying to replay logical slot after_basebackup from master (expect rows 0,1)"
PGPORT=$PGPORT_MASTER psql -c "SELECT * FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL);"

# do some more writes
#
# Because we replayed the slot up to the first rows the failover slot replay should only see these
# rows not the first ones.
#
PGPORT=$PGPORT_MASTER psql -c "INSERT INTO test_tab(blah, msg) SELECT x::text, 'onmaster-afterread' FROM generate_series(2,3) x;"

wait_for_catchup
print_slots

# kill the master and promote the replica
echo "Killing master"
PGPORT=$PGPORT_MASTER pg_ctl -D $DATADIR -w stop -m fast
echo "Promoting replica"
PGPORT=$PGPORT_REPLICA pg_ctl -D ${DATADIR}-replica -w promote

sleep 1

if ! PGPORT=$PGPORT_REPLICA pg_isready -t 10 ; then
    echo "Unable to connect to promoted replica"
    exit 1
fi

while test "$(PGPORT=$PGPORT_REPLICA psql -qAtc 'SELECT pg_is_in_recovery()')" = 't'
do
    echo "Waiting for promotion to finish..."
    sleep 1
done

sleep 1
echo "Replica up"

print_slots_replica

# do some more writes on the promoted replica / new master
# These should be visible to both slots, but possibly only after a second call
# (since we don't follow the timeline switch within one call)
PGPORT=$PGPORT_REPLICA psql -c "INSERT INTO test_tab(blah, msg) SELECT x::text, 'onreplica' FROM generate_series(4,5) x;"

echo "Final LSN is $(PGPORT=$PGPORT_REPLICA psql -qAt -c "SELECT pg_current_xlog_insert_location();")"

# read from the slots again. They should still exist and be readable.
echo "REPLICA: Slot changes for after_basebackup from promoted replica (expect rows 2+)"
PGPORT=$PGPORT_REPLICA psql -c "SELECT * FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL);"
PGPORT=$PGPORT_REPLICA psql -c "SELECT * FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL);"
echo "REPLICA: Slot changes for before_basebackup from promoted replica (expect rows 2+)"
PGPORT=$PGPORT_REPLICA psql -c "SELECT * FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL);"
PGPORT=$PGPORT_REPLICA psql -c "SELECT * FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL);"

print_slots_replica

echo "--- restarting replica ---"

echo "(stopping)"
PGPORT=$PGPORT_REPLICA pg_ctl -D ${DATADIR}-replica -w stop -m fast
echo "(starting)"
PGPORT=$PGPORT_REPLICA pg_ctl -l ${DATADIR}-replica.log -D ${DATADIR}-replica -w start -o "$postmaster_opts"

sleep 1
print_slots_replica

# Should work and only replay whatever was left, if anything
set +e
echo "REPLICA: Slot changes for after_basebackup (expect remaining rows)"
PGPORT=$PGPORT_REPLICA psql -c "SELECT * FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL);"
PGPORT=$PGPORT_REPLICA psql -c "SELECT * FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL);"
echo "REPLICA: Slot changes for before_basebackup (expect remaining rows)"
PGPORT=$PGPORT_REPLICA psql -c "SELECT * FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL);"
PGPORT=$PGPORT_REPLICA psql -c "SELECT * FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL);"
set -e
