From a637b65fc53b208857e0d3d17141d8ed3609036f Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Mon, 14 Sep 2020 16:08:25 -0700
Subject: [PATCH v2] WIP: fix and test snapshot behaviour on standby.

Reported-By: Ian Barwick <ian.barwick@2ndquadrant.com>
Author: Andres Freund <andres@anarazel.de>
Author: Ian Barwick <ian.barwick@2ndquadrant.com>
Discussion: https://postgr.es/m/61291ffe-d611-f889-68b5-c298da9fb18f@2ndquadrant.com
---
 src/backend/storage/ipc/procarray.c       |   3 +
 src/test/recovery/t/021_row_visibility.pl | 227 ++++++++++++++++++++++
 2 files changed, 230 insertions(+)
 create mode 100644 src/test/recovery/t/021_row_visibility.pl

diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 802b119c490..fffa5f7a93e 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -4280,6 +4280,9 @@ ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
 	/* As in ProcArrayEndTransaction, advance latestCompletedXid */
 	MaintainLatestCompletedXidRecovery(max_xid);
 
+	/* ... and xactCompletionCount */
+	ShmemVariableCache->xactCompletionCount++;
+
 	LWLockRelease(ProcArrayLock);
 }
 
diff --git a/src/test/recovery/t/021_row_visibility.pl b/src/test/recovery/t/021_row_visibility.pl
new file mode 100644
index 00000000000..08713fa2686
--- /dev/null
+++ b/src/test/recovery/t/021_row_visibility.pl
@@ -0,0 +1,227 @@
+# Checks that a standby session can see all expected rows
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 10;
+
+# Initialize primary node
+my $node_primary = get_new_node('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->append_conf('postgresql.conf', 'max_prepared_transactions=10');
+$node_primary->start;
+
+# Initialize with empty test table
+$node_primary->safe_psql('postgres',
+	'CREATE TABLE public.test_visibility (data text not null)');
+
+# Take backup
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create streaming standby from backup
+my $node_standby = get_new_node('standby');
+$node_standby->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', 'max_prepared_transactions=10');
+$node_standby->start;
+
+# To avoid hanging while expecting some specific input from a psql
+# instance being driven by us, add a timeout high enough that it
+# should never trigger even on very slow machines, unless something
+# is really wrong.
+my $psql_timeout = IPC::Run::timer(5);
+
+
+# One psql to primary for all queries. That allows to check
+# uncommitted changes being replicated and such.
+my ($psql_primary_stdin, $psql_primary_stdout, $psql_primary_stderr) = ('', '', '');
+my $psql_primary = IPC::Run::start(
+	[
+		'psql', '-X', '-qAe', '-f', '-', '-d',
+		$node_primary->connstr('postgres')
+	],
+	'<',
+	\$psql_primary_stdin,
+	'>',
+	\$psql_primary_stdout,
+	'2>',
+	\$psql_primary_stderr,
+	$psql_timeout);
+
+# One psql to standby for all queries. That allows to reuse the same
+# session for multiple queries, which is important to detect some
+# types of errors.
+my ($psql_standby_stdin, $psql_standby_stdout, $psql_standby_stderr) = ('', '', '');
+my $psql_standby = IPC::Run::start(
+	[
+		'psql', '-X', '-qAe', '-f', '-', '-d',
+		$node_standby->connstr('postgres')
+	],
+	'<',
+	\$psql_standby_stdin,
+	'>',
+	\$psql_standby_stdout,
+	'2>',
+	\$psql_standby_stderr,
+	$psql_timeout);
+
+#
+# 1. Check initial data is the same
+#
+$psql_standby_stdin .= q[
+SELECT * FROM test_visibility ORDER BY data;
+  ];
+ok(pump_until($psql_standby, \$psql_standby_stdout, qr/0 rows/m),
+   'data not visible');
+$psql_standby_stdout = '';
+$psql_standby_stderr = '';
+
+
+#
+# 2. Check if an INSERT is replayed and visible
+#
+$node_primary->psql('postgres', "INSERT INTO test_visibility VALUES ('first insert')");
+$node_primary->wait_for_catchup($node_standby, 'replay',
+	$node_primary->lsn('insert'));
+
+$psql_standby_stdin .= q[
+SELECT * FROM test_visibility ORDER BY data;
+  ];
+ok(pump_until($psql_standby, \$psql_standby_stdout, qr/first insert/m),
+   'insert visible');
+$psql_standby_stdout = '';
+$psql_standby_stderr = '';
+
+
+#
+# 3. Verify that uncommitted changes aren't visible.
+#
+$psql_primary_stdin .= q[
+BEGIN;
+UPDATE test_visibility SET data = 'first update' RETURNING data;
+  ];
+ok(pump_until($psql_primary, \$psql_primary_stdout, qr/first update/m),
+   'UPDATE');
+
+# ensure WAL flush
+$node_primary->psql('postgres', "SELECT txid_current();");
+
+$node_primary->wait_for_catchup($node_standby, 'replay',
+	$node_primary->lsn('insert'));
+
+$psql_standby_stdin .= q[
+SELECT * FROM test_visibility ORDER BY data;
+  ];
+ok(pump_until($psql_standby, \$psql_standby_stdout, qr/first insert/m),
+   'uncommitted update invisible');
+$psql_standby_stdout = '';
+$psql_standby_stderr = '';
+
+#
+# 4. That a commit turns 3. visible
+#
+$psql_primary_stdin .= q[
+COMMIT;
+  ];
+ok(pump_until($psql_primary, \$psql_primary_stdout, qr/first update/m),
+   'COMMIT');
+$node_primary->wait_for_catchup($node_standby, 'replay',
+	$node_primary->lsn('insert'));
+
+$psql_standby_stdin .= q[
+SELECT * FROM test_visibility ORDER BY data;
+  ];
+ok(pump_until($psql_standby, \$psql_standby_stdout, qr/first update/m),
+   'committed update visible');
+$psql_standby_stdout = '';
+$psql_standby_stderr = '';
+
+#
+# 5. Check that changes in prepared xacts is invisible
+#
+$psql_primary_stdin .= q[
+DELETE from test_visibility;
+BEGIN;
+INSERT INTO test_visibility VALUES('inserted in prepared will_commit');
+PREPARE TRANSACTION 'will_commit';
+  ];
+ok(pump_until($psql_primary, \$psql_primary_stdout, qr/first update/m),
+   'prepared will_commit');
+$psql_standby_stdout = '';
+$psql_standby_stderr = '';
+
+$psql_primary_stdin .= q[
+BEGIN;
+INSERT INTO test_visibility VALUES('inserted in prepared will_abort');
+PREPARE TRANSACTION 'will_abort';
+  ];
+ok(pump_until($psql_primary, \$psql_primary_stdout, qr/PREPARE/m),
+   'prepared will_abort');
+$psql_standby_stdout = '';
+$psql_standby_stderr = '';
+
+# ensure WAL flush
+$node_primary->psql('postgres', "SELECT txid_current();");
+
+$node_primary->wait_for_catchup($node_standby, 'replay',
+	$node_primary->lsn('insert'));
+
+$psql_standby_stdin .= q[
+SELECT * FROM test_visibility ORDER BY data;
+  ];
+ok(pump_until($psql_standby, \$psql_standby_stdout, qr/0 rows/m),
+   'uncommitted prepared invisible');
+$psql_standby_stdout = '';
+$psql_standby_stderr = '';
+
+# For some variation, finish prepared xacts via separate connections
+$node_primary->safe_psql('postgres',
+	"COMMIT PREPARED 'will_commit';");
+$node_primary->safe_psql('postgres',
+	"ROLLBACK PREPARED 'will_abort';");
+$node_primary->wait_for_catchup($node_standby, 'replay',
+	$node_primary->lsn('insert'));
+
+$psql_standby_stdin .= q[
+SELECT * FROM test_visibility ORDER BY data;
+  ];
+ok(pump_until($psql_standby, \$psql_standby_stdout, qr/will_commit.*\n.*1 row/m),
+   'finished prepared visible');
+$psql_standby_stdout = '';
+$psql_standby_stderr = '';
+
+$node_primary->stop;
+$node_standby->stop;
+
+
+# Pump until string is matched, or timeout occurs
+sub pump_until
+{
+	my ($proc, $stream, $untl) = @_;
+	$proc->pump_nb();
+	while (1)
+	{
+		last if $$stream =~ /$untl/;
+		if ($psql_timeout->is_expired)
+		{
+			diag("aborting wait: program timed out");
+			diag("stream contents: >>", $$stream, "<<");
+			diag("pattern searched for: ", $untl);
+
+			return 0;
+		}
+		if (not $proc->pumpable())
+		{
+			diag("aborting wait: program died");
+			diag("stream contents: >>", $$stream, "<<");
+			diag("pattern searched for: ", $untl);
+
+			return 0;
+		}
+		$proc->pump();
+	}
+	return 1;
+
+}
-- 
2.25.0.114.g5b0ca878e0

