#!/usr/bin/env python3
"""
Benchmark script for LISTEN/NOTIFY pattern matching feature.

Tests:
1. Exact channel vs pattern channel overhead - exact_vs_pattern
2. Pattern count scaling (how many patterns a backend listens on) - scaling
3. Pattern complexity (prefix vs infix patterns) - complexity
4. Mixed workload (some exact, some pattern listeners) - non_matching

Test names:
1. exact_vs_pattern
2. scaling
3. complexity
4. non_matching

Usage:
    ./bench_pattern_notify.py [--dsn DSN] [--duration SECONDS] [--warmup SECONDS] [--test TEST_NAME]

Requirements:
    pip install psycopg
"""

import argparse
import select
import statistics
import sys
import time
from dataclasses import dataclass
from typing import Callable

try:
    import psycopg
except ImportError as _exc:
    # Deferred so that --help still works without the driver installed;
    # main() reports the failure and exits before any benchmark runs.
    psycopg = None
    _PSYCOPG_IMPORT_ERROR = _exc
else:
    _PSYCOPG_IMPORT_ERROR = None


@dataclass
class BenchmarkResult:
    # Label shown in the results table.
    name: str
    # Mean receive rate across all runs.
    notifications_per_sec: float
    # Mean number of notifications received per run.
    total_notifications: int
    # Requested length of each measured run, in seconds.
    duration: float
    # Sample standard deviation of the per-run rates (0.0 for a single run).
    stddev: float = 0.0


def wait_for_notify(conn, timeout: float = 1.0) -> int:
    """Wait for one batch of notifications and return how many arrived.

    Args:
        conn: Connection exposing ``notifies(timeout=..., stop_after=...)``.
        timeout: Maximum time to wait, in seconds.

    Returns:
        The number of notifications yielded; 0 if none arrived in time.
    """
    # stop_after=1 ends the generator once at least one notification has been
    # received, while still yielding every notification delivered with it, so
    # none are dropped and the generator is fully consumed.
    return sum(1 for _ in conn.notifies(timeout=timeout, stop_after=1))


def _make_listener_setup(channels) -> Callable:
    """Return a setup function that resets a connection and LISTENs on *channels*."""
    statements = [f'LISTEN "{channel}"' for channel in channels]

    def setup(conn):
        conn.execute("UNLISTEN *")
        for statement in statements:
            conn.execute(statement)

    return setup


def run_benchmark(
    dsn: str,
    name: str,
    setup_listener: Callable,
    channel_name: str,
    duration: float = 5.0,
    warmup: float = 1.0,
    runs: int = 3,
) -> BenchmarkResult:
    """
    Run a benchmark measuring notification throughput.

    Each run opens a fresh listener/notifier connection pair, sends one NOTIFY
    at a time and waits for it to be received.

    Args:
        dsn: PostgreSQL connection string
        name: Benchmark name for reporting
        setup_listener: Function that takes a connection and sets up LISTEN
        channel_name: Channel name to NOTIFY. It is interpolated unquoted, so
            it must be a plain SQL identifier.
        duration: How long to run each measurement (seconds)
        warmup: Warmup period before measuring (seconds)
        runs: Number of runs to average

    Returns:
        A BenchmarkResult with the mean rate, its standard deviation across
        runs, and the mean number of notifications received per run.

    Raises:
        ValueError: If ``runs`` is less than 1 or ``duration`` is not positive.
    """
    if runs < 1:
        raise ValueError("runs must be at least 1")
    if duration <= 0:
        raise ValueError("duration must be positive")

    warmup_sql = f"NOTIFY {channel_name}, 'warmup'"
    bench_sql = f"NOTIFY {channel_name}, 'bench'"

    rates = []
    counts = []

    for _ in range(runs):
        # The context managers close both connections even when setup or a
        # query raises part-way through a run.
        with psycopg.connect(dsn, autocommit=True) as listener_conn:
            with psycopg.connect(dsn, autocommit=True) as notifier_conn:
                setup_listener(listener_conn)

                # Warmup
                warmup_end = time.monotonic() + warmup
                while time.monotonic() < warmup_end:
                    notifier_conn.execute(warmup_sql)
                    wait_for_notify(listener_conn, timeout=0.1)

                # Drain any pending notifications
                listener_conn.execute("SELECT 1")
                time.sleep(0.1)

                # Benchmark
                received = 0
                start_time = time.monotonic()
                end_time = start_time + duration
                while time.monotonic() < end_time:
                    notifier_conn.execute(bench_sql)
                    received += wait_for_notify(listener_conn, timeout=0.1)
                elapsed = time.monotonic() - start_time

        rates.append(received / elapsed)
        counts.append(received)

    return BenchmarkResult(
        name=name,
        notifications_per_sec=statistics.mean(rates),
        total_notifications=int(statistics.mean(counts)),
        duration=duration,
        stddev=statistics.stdev(rates) if len(rates) > 1 else 0.0,
    )


def bench_exact_vs_pattern(dsn: str, duration: float, warmup: float) -> list[BenchmarkResult]:
    """Test 1: Compare exact channel match vs pattern match."""
    cases = [
        ('Exact channel (LISTEN "user_123")', "user_123"),
        ('Pattern channel (LISTEN "user_*")', "user_*"),
    ]
    return [
        run_benchmark(
            dsn=dsn,
            name=label,
            setup_listener=_make_listener_setup([listen_on]),
            channel_name="user_123",
            duration=duration,
            warmup=warmup,
        )
        for label, listen_on in cases
    ]


def bench_pattern_count_scaling(dsn: str, duration: float, warmup: float) -> list[BenchmarkResult]:
    """Test 2: How does performance scale with number of patterns?"""
    results = []

    for num_patterns in [1, 5, 10, 25, 50, 100]:
        # Only the first pattern matches the notified channel; the rest are
        # registered purely to be checked and rejected.
        patterns = ["target_*"] + [f"other{i}_*" for i in range(1, num_patterns)]
        results.append(
            run_benchmark(
                dsn=dsn,
                name=f"{num_patterns} pattern(s)",
                setup_listener=_make_listener_setup(patterns),
                channel_name="target_123",
                duration=duration,
                warmup=warmup,
            )
        )

    return results


def bench_pattern_complexity(dsn: str, duration: float, warmup: float) -> list[BenchmarkResult]:
    """Test 3: Compare different pattern complexities."""
    # (result name, pattern to LISTEN on, channel to NOTIFY)
    cases = [
        ("Prefix pattern (user_*)", "user_*", "user_123_extra_stuff"),
        ("Single char pattern (user_?)", "user_?", "user_X"),
        ("Infix pattern (prefix_*_suffix)", "prefix_*_suffix", "prefix_middle_part_suffix"),
        ("Multi-wildcard (a_*_b_*_c)", "a_*_b_*_c", "a_xxx_b_yyy_c"),
    ]
    return [
        run_benchmark(
            dsn=dsn,
            name=label,
            setup_listener=_make_listener_setup([pattern]),
            channel_name=channel,
            duration=duration,
            warmup=warmup,
        )
        for label, pattern, channel in cases
    ]


def bench_non_matching_patterns(dsn: str, duration: float, warmup: float) -> list[BenchmarkResult]:
    """Test 4: Overhead of checking patterns that don't match."""
    # Baseline: exact match, no patterns
    results = [
        run_benchmark(
            dsn=dsn,
            name="Exact only (baseline)",
            setup_listener=_make_listener_setup(["channel"]),
            channel_name="channel",
            duration=duration,
            warmup=warmup,
        )
    ]

    # Exact match + non-matching patterns
    for num_patterns in [10, 50, 100]:
        channels = ["channel"] + [f"nomatch{i}_*" for i in range(num_patterns)]
        results.append(
            run_benchmark(
                dsn=dsn,
                name=f"Exact + {num_patterns} non-matching patterns",
                setup_listener=_make_listener_setup(channels),
                channel_name="channel",
                duration=duration,
                warmup=warmup,
            )
        )

    return results


def print_results(title: str, results: list[BenchmarkResult]):
    """Print benchmark results in a formatted table.

    The first result is the baseline; every later row shows its rate relative
    to it. Each row also shows the standard deviation across runs.
    """
    # Four columns (45 + 12 + 10 + 12) plus three separating spaces.
    width = 45 + 12 + 10 + 12 + 3

    print(f"\n{'=' * width}")
    print(f" {title}")
    print("=" * width)
    print(f"{'Benchmark':<45} {'Notif/sec':>12} {'StdDev':>10} {'vs first':>12}")
    print("-" * width)

    baseline = results[0].notifications_per_sec if results else 0
    for index, r in enumerate(results):
        # Identify the baseline by position: comparing dataclass values would
        # also mark a later row with identical numbers as the baseline.
        if index == 0:
            delta = "(baseline)"
        else:
            pct = ((r.notifications_per_sec / baseline) - 1) * 100 if baseline else 0
            delta = f"({pct:+.1f}%)"
        print(f"{r.name:<45} {r.notifications_per_sec:>12.0f} {r.stddev:>10.1f} {delta:>12}")
    print()


# --test name -> (table title, benchmark function); "all" runs them in order.
_BENCHMARKS = {
    "exact_vs_pattern": ("Test 1: Exact Channel vs Pattern Channel", bench_exact_vs_pattern),
    "scaling": ("Test 2: Pattern Count Scaling", bench_pattern_count_scaling),
    "complexity": ("Test 3: Pattern Complexity", bench_pattern_complexity),
    "non_matching": ("Test 4: Non-Matching Pattern Overhead", bench_non_matching_patterns),
}


def _build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser; --test accepts only implemented tests."""
    parser = argparse.ArgumentParser(description="Benchmark LISTEN/NOTIFY pattern matching performance")
    parser.add_argument(
        "--dsn",
        default="dbname=postgres",
        help="PostgreSQL connection string (default: dbname=postgres)",
    )
    parser.add_argument(
        "--duration",
        type=float,
        default=5.0,
        help="Duration of each benchmark run in seconds (default: 5)",
    )
    parser.add_argument(
        "--warmup",
        type=float,
        default=1.0,
        help="Warmup period in seconds (default: 1)",
    )
    parser.add_argument(
        "--test",
        choices=["all", *_BENCHMARKS],
        default="all",
        help="Which test to run (default: all)",
    )
    return parser


def main() -> None:
    """Parse arguments, verify the database is reachable, run the selected tests."""
    args = _build_parser().parse_args()

    if psycopg is None:
        print(
            f"Error: psycopg is required (pip install psycopg): {_PSYCOPG_IMPORT_ERROR}",
            file=sys.stderr,
        )
        sys.exit(1)

    print("LISTEN/NOTIFY Pattern Matching Benchmark")
    print(f"DSN: {args.dsn}")
    print(f"Duration: {args.duration}s per run, Warmup: {args.warmup}s")

    # Verify connection
    try:
        psycopg.connect(args.dsn).close()
    except psycopg.Error as e:
        print(f"Error connecting to database: {e}", file=sys.stderr)
        sys.exit(1)

    for test_name, (title, bench) in _BENCHMARKS.items():
        if args.test in ("all", test_name):
            print_results(title, bench(args.dsn, args.duration, args.warmup))


if __name__ == "__main__":
    main()