# This script runs different test cases through psql to find the maximum number
# of lines that still trigger psql to use the pager.
#
# Termios is used to set the terminal size, so standard output must be a
# terminal. termios.tcgetwinsize()/tcsetwinsize() require Python 3.11 or later.
#
# Use environment variable PATH to select the psql binary to test. Use libpq
# environment variables to select a database for testing in which this script
# can run the test setup.

import argparse
import os
import os.path
import shlex
import subprocess
import sys
import tempfile
import time
import termios
import typing


# SQL/psql script that creates the objects the test commands operate on.
# This is a raw string: the backslash sequences are interpreted by psql and
# the SQL parser, not by Python.
SETUP_SQL = r'''
\set ON_ERROR_STOP on

BEGIN;

CREATE OR REPLACE FUNCTION generate_lines(n int)
RETURNS TABLE (lines text)
LANGUAGE sql
AS $$ SELECT string_agg(s::text, e'\n') FROM generate_series(1, n) s $$;

-- This creates a view name and column name with 24 line breaks when truncated
-- to the default NAMEDATALEN (63 = 9*2 + 15*3)
SELECT
    format('CREATE OR REPLACE VIEW %I (c) AS SELECT null;'
           'CREATE OR REPLACE VIEW nl_column (%1$I) AS SELECT null;',
           string_agg(s::text, e'\n'))
FROM generate_series(1, current_setting('max_identifier_length')::int) s \gexec

COMMIT;
'''


def prepare_schema():
    """Create the test function and views in the target database.

    Raises subprocess.CalledProcessError if psql exits with a non-zero
    status.
    """
    subprocess.run(['psql', '-X'], text=True, input=SETUP_SQL, check=True)


def build_testcases():
    """Return one Testcase per command and flag combination.

    Each command is repeated with every combination of the flags that affect
    the number of output lines. The order is: commands in list order, and for
    each command the flag bit masks 0..7 (bit 0 = unaligned, bit 1 = tuples
    only, bit 2 = expanded).
    """
    return [
        Testcase(
            cmd=cmd,
            unaligned=bool(flags & 1),
            tuples_only=bool(flags & (1 << 1)),
            expanded=bool(flags & (1 << 2)),
        )
        for cmd in commands
        for flags in range(1 << 3)
    ]


def format_result(tc, lines):
    """Return the single result line printed for testcase `tc`.

    `lines` is the value returned by find_max_paged_lines() for the testcase.
    """
    flags = ''.join((
        'A' if tc.unaligned else '-',
        't' if tc.tuples_only else '-',
        'x' if tc.expanded else '-',
    ))

    # Make sure we get one output line per testcase
    cmd = tc.cmd.replace('\n', r'\n')

    return f"{lines:2} {flags} {cmd}"


def run_tests(outfile):
    """Run all test cases and write one result line per test case.

    `outfile` is a writable text file object. The terminal size of standard
    output is changed while the test cases run and restored afterwards, even
    if a test case raises.
    """
    prepare_schema()

    testcases = build_testcases()

    # Remember current term size
    term_fd = sys.stdout.fileno()
    save_term_size = termios.tcgetwinsize(term_fd)

    try:
        max_paged_lines = [
            find_max_paged_lines(tc.psql_args()) for tc in testcases
        ]
    finally:
        # Restore term size, also when a test case failed with an exception
        termios.tcsetwinsize(term_fd, save_term_size)

    # Print the testcase results
    for tc, lines in zip(testcases, max_paged_lines):
        print(format_result(tc, lines), file=outfile)


def run_psql_with_pager(args):
    """Run psql with `args` and report whether it invoked the pager.

    Returns True if the pager was started, False if not, and None if psql
    exited with a non-zero status.
    """
    with tempfile.NamedTemporaryFile() as tmp:
        # Backdate the marker file so that a touch by the pager always yields
        # a strictly newer mtime, regardless of the file system's timestamp
        # granularity.
        os.utime(tmp.name, ns=(0, 0))
        mtime_before = os.stat(tmp.name).st_mtime_ns

        # The pager command is run through the shell, so quote the path.
        pager = f'touch {shlex.quote(tmp.name)}'

        env = {
            # Inherit environment variables (especially PATH and libpq-specific
            # ones).
            **os.environ,

            # Set the pager so that we can tell from the temp file's mtime that
            # the pager was triggered. psql prefers PSQL_PAGER over PAGER, so
            # override both in case the caller's environment has PSQL_PAGER
            # set.
            'PSQL_PAGER': pager,
            'PAGER': pager,
        }

        proc = subprocess.run(['psql', '-X', *args], env=env)
        if proc.returncode:
            return None

        mtime_after = os.stat(tmp.name).st_mtime_ns
        return mtime_after > mtime_before


def find_max_paged_lines(psql_args):
    """Return the largest terminal height at which psql still uses the pager.

    A binary search over terminal heights 1..100 is performed by resizing the
    terminal of standard output and running psql with `psql_args`. Returns -1
    as soon as a psql run fails. Returns 0 if the pager is not used at any
    tested height.
    """
    min_lines = 1
    max_lines = 100
    cols = 100  # sufficient for our test cases

    while min_lines <= max_lines:
        lines = min_lines + (max_lines - min_lines) // 2

        termios.tcsetwinsize(sys.stdout.fileno(), (lines, cols))
        pager_used = run_psql_with_pager(psql_args)

        if pager_used is None:
            return -1

        if pager_used:
            # Pager still triggered: the answer is at least `lines`
            min_lines = lines + 1
        else:
            max_lines = lines - 1

    return max_lines


class Testcase(typing.NamedTuple):
    """A psql command together with the output format flags to test it with."""

    cmd: str           # SQL or meta command passed to psql via -c
    unaligned: bool    # pass -A
    tuples_only: bool  # pass -t
    expanded: bool     # pass -x

    def psql_args(self):
        """Return the psql command line arguments for this test case."""
        args = ['-c', self.cmd]
        if self.unaligned:
            args.append('-A')
        if self.tuples_only:
            args.append('-t')
        if self.expanded:
            args.append('-x')
        return args


# Name of the view with line breaks in its identifier, as created by the setup
# script when the default NAMEDATALEN is in effect: the numbers 1 to 24, each
# followed by a line break.
_NL_VIEW_NAME = ''.join(f'{i}\n' for i in range(1, 25))

# The SQL and meta commands we will be testing
commands = [
    'SELECT * FROM generate_lines(25)',
    'SELECT * FROM nl_column',
    '\\d nl_column',
    '\\d+ nl_column',
    f'\\d "{_NL_VIEW_NAME}"',
    f'\\d+ "{_NL_VIEW_NAME}"',
]


def main():
    """Run the tests and compare the results with the expected output.

    The results are written next to the expect file with extension ".tmp" and
    the unified diff against the expect file with extension ".diff". Exits
    with status 1 if diff reports a non-zero status.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('expectfile', metavar='FILE',
                        help="file with expected test results")
    args = parser.parse_args()

    basename, _ = os.path.splitext(args.expectfile)
    outfile = basename + '.tmp'
    difffile = basename + '.diff'

    with open(outfile, 'w') as fp:
        run_tests(fp)

    # Diff against an empty file if there is no expect file yet
    if os.path.isfile(args.expectfile):
        srcfile = args.expectfile
    else:
        srcfile = os.devnull

    with open(difffile, 'w') as fp:
        proc = subprocess.run(['diff', '-u', srcfile, outfile], stdout=fp)

    if proc.returncode:
        print()
        print("Test output does not match the expected output.")
        print(f'The differences can be viewed in file "{difffile}".')
        sys.exit(1)


if __name__ == '__main__':
    main()