#!/bin/bash

# The idea of this script is to enable two_phase after preparing a transaction,
# and then move confirmed_flush backward (to a point prior to two_phase_at),
# resulting in the PREPARE being replayed twice.
# The script achieves this by stopping the walsender at an injection point when needed.


# Ports of the three local clusters used by this reproduction.
readonly port_primary=5433
readonly port_secondary=5434
readonly port_subscriber=5435

echo '=========='
echo '=Clean up='
echo '=========='

# Stop clusters left over from a previous run. The script does not use
# `set -e`, so the failures expected on a first run (no data directory yet)
# are harmless and the script carries on.
for datadir in data_primary data_secondary data_subscriber; do
  pg_ctl stop -D "$datadir"
done

# Remove only what this script creates (data directories and pg_ctl logs).
# A broad `*log` glob would also delete unrelated files in the current
# directory whose names merely end in "log" (e.g. "changelog", "catalog").
rm -rf -- data_primary data_secondary data_subscriber primary.log sec.log sub.log

echo '======================='
echo '=Set up primary server='
echo '======================='

initdb -D data_primary

# Append the primary's settings. Each active parameter is listed only once:
# postgresql.conf honours the last occurrence of a repeated setting, so
# duplicates just hide which value is actually in effect.
#  - wal_level = logical is required for logical replication/decoding.
#  - max_prepared_transactions must be > 0 for PREPARE TRANSACTION below.
#  - The very long wal_sender/wal_receiver timeouts presumably keep
#    connections alive while the walsender is parked on an injection point.
# NOTE(review): shared_buffers = 6GB and the large WAL sizes may not fit on
# small test machines - adjust if the server fails to start.
cat << EOF >> data_primary/postgresql.conf
wal_level = logical
port = $port_primary
#standby_slot_names = 'physical'
#log_replication_commands = 'on'
#max_slot_wal_keep_size = 64kB

max_wal_senders=550
max_replication_slots=550
log_replication_commands = 'on'
checkpoint_timeout = 1d
shared_buffers = 6GB
max_worker_processes = 32
max_parallel_maintenance_workers = 24
max_parallel_workers = 32
synchronous_commit = on
max_wal_size = 24GB
min_wal_size = 15GB
autovacuum = off

wal_sender_timeout = 6000s
wal_receiver_timeout = 6000s

log_min_messages = 'debug1'

max_prepared_transactions = 10
EOF

pg_ctl -D data_primary start -w -l primary.log

# Physical slot consumed by the standby (its primary_slot_name is 'physical').
psql -d postgres -p "$port_primary" -c "SELECT * FROM pg_create_physical_replication_slot('physical');"

echo '========================='
echo '=Set up secondary server='
echo '========================='

psql -d postgres -p "$port_primary" -c "CHECKPOINT;"

pg_basebackup -D data_secondary -p "$port_primary"

# Standby settings: stream from the primary through the 'physical' slot.
cat << EOF >> data_secondary/postgresql.conf
port = $port_secondary
primary_conninfo = 'port=$port_primary application_name=secondary dbname=postgres'
#primary_conninfo = 'port=$port_primary application_name=secondary dbname=postgreis'
primary_slot_name = 'physical'
hot_standby = on
hot_standby_feedback = on
#sync_replication_slots = on
#standby_slot_names = ''
EOF

# An (empty) standby.signal file makes the server start in standby mode.
touch data_secondary/standby.signal

pg_ctl -D data_secondary start -w -l sec.log

# Creating a logical slot on a standby waits until the standby replays a
# running-transactions record from the primary, so start it in the background
# and remember its PID.
psql -d postgres -p "$port_secondary" -c "SELECT 'init' FROM pg_create_logical_replication_slot('stuck', 'pgoutput');" &
slot_pid=$!

# Presumably gives the background psql time to start waiting.
sleep 1

# A CHECKPOINT on the primary logs a standby snapshot, which lets the pending
# slot creation on the standby complete.
psql -d postgres -p "$port_primary" -c "CHECKPOINT;"

# Reap the background psql so it does not outlive this step or interleave its
# output with later commands.
wait "$slot_pid"

echo '==================='
echo '=Set up subscirber='
echo '==================='

initdb -D data_subscriber

# Subscriber settings. max_prepared_transactions must be non-zero so the apply
# worker can PREPARE replicated transactions once two_phase is turned on;
# wal_receiver_status_interval = 1 presumably makes the apply worker report
# its progress to the publisher every second.
cat << EOF >> data_subscriber/postgresql.conf
port = $port_subscriber


checkpoint_timeout = 1h
shared_buffers = '8GB'
wal_buffers = '1GB'
max_connections = '5000'
max_wal_size = 20GB
min_wal_size = 10GB
max_wal_senders = 100
max_replication_slots = 101
autovacuum = off

wal_sender_timeout = 6000s
wal_receiver_timeout = 6000s


wal_receiver_status_interval = 1
max_prepared_transactions = 10
log_min_messages = 'debug1'
EOF

pg_ctl -D data_subscriber -l sub.log start

# Publisher side: a table with one row, published as 'pub'.
psql -d postgres -p "$port_primary" -c "CREATE TABLE tab1(a int); INSERT INTO tab1 VALUES(1); CREATE PUBLICATION pub FOR TABLE tab1;"

# Subscriber side: matching empty table, then a subscription that starts with
# two_phase disabled (it is enabled later to trigger the bug).
psql -d postgres -p "$port_subscriber" -c "CREATE TABLE tab1(a int);"
psql -d postgres -p "$port_subscriber" -c "CREATE SUBSCRIPTION sub CONNECTION 'dbname=postgres port=$port_primary' PUBLICATION pub WITH (slot_name='logicalslot', create_slot=true, copy_data = true, two_phase=false, failover=true, enabled=true)"

# Timing-based pause for the initial table copy/apply worker start-up; this is
# not a real synchronization check.
sleep 2

# Step 1: PREPARE a transaction while two_phase is still disabled.
# With two_phase disabled, the PREPARE is not transmitted to the subscriber;
# the transaction is only sent once COMMIT PREPARED is decoded.
psql -d postgres -p $port_primary -c "INSERT INTO tab1 VALUES(1);"
psql -d postgres -p $port_primary -c "BEGIN; SELECT pg_logical_emit_message(true, 'test', 'test'); PREPARE TRANSACTION 'slotsync_twophase_prepared';"

sleep 1

# By this time (this relies on the sleep above), the walsender is expected to
# have sent a keepalive message to the subscriber, making the subscriber
# acknowledge the LSN. That advances confirmed_flush on the publisher without
# advancing origin_lsn on the subscriber.


# Step 2: disable the subscription, then attach the injection point in the
# walsender *before* enabling two_phase, so that the walsender cannot send
# further keepalive messages that might advance confirmed_flush.
# NOTE(review): 'process-replies' does not appear to be an injection point in
# stock PostgreSQL; this presumably needs a locally patched walsender and a
# build with injection points enabled - confirm before running.
psql -d postgres -p $port_subscriber -c "alter subscription sub disable;"
sleep 1 

psql -d postgres -p $port_primary -c "create extension injection_points;SELECT injection_points_attach('process-replies', 'wait');"

# Step 3: enabling the subscription with two_phase = on restarts replication
# from the old origin_lsn. two_phase_at is set to the (advanced)
# confirmed_flush in StartLogicalReplication --> CreateDecodingContext, and
# the walsender then blocks on the injection point wait in WalSndLoop.
#
# Because the walsender is stopped at the injection point before
# ProcessRepliesIfAny(), it cannot yet set confirmed_flush to origin_lsn, nor
# send any further keepalive messages.
psql -d postgres -p $port_subscriber -c "alter subscription sub set (two_phase =on); alter subscription sub enable ;"

sleep 2 

# Step 4: disable the subscription, release the wait in the walsender, and
# then run COMMIT PREPARED on the publisher, so that the changes are processed
# once the subscription is enabled again.
psql -d postgres -p $port_subscriber -c "alter subscription sub disable;"

# As soon as the walsender's injection point is released here, it reaches
# ProcessRepliesIfAny(), sets confirmed_flush to the origin_lsn sent by the
# previous 'alter subscription sub enable' command, and then exits.
#
# The state is now:
# -- two_phase_at is set to the advanced confirmed_flush;
# -- confirmed_flush is set to the origin_lsn sent by the earlier
#    'subscription enable', so confirmed_flush < two_phase_at.
psql -d postgres -p $port_primary -c "SELECT injection_points_wakeup('process-replies');SELECT injection_points_detach('process-replies');"
sleep 1

psql -d postgres -p $port_primary -c "COMMIT PREPARED 'slotsync_twophase_prepared';"

# Step 5: with this enable, the walsender restarts with
# confirmed_flush < two_phase_at. Since a prepared transaction exists before
# two_phase_at, it replicates that prepared transaction when decoding the
# PREPARE record and again when decoding the COMMIT PREPARED record, which
# results in an error on the subscriber (expected in sub.log; the numbers in
# the GID will presumably differ between runs):
# ERROR: transaction identifier "pg_gid_16387_755" is already in use.
psql -d postgres -p $port_subscriber -c "alter subscription sub enable;"

sleep 1

# The servers are intentionally left running so the logs can be inspected.
exit
