From ebd0dc2218a744c0c94180472dc92a4ab901999a Mon Sep 17 00:00:00 2001
From: Jelte Fennema-Nio <postgres@jeltef.nl>
Date: Tue, 16 Dec 2025 09:25:48 +0100
Subject: [PATCH v9 2/5] Add pytest infrastructure to interact with PostgreSQL
 servers

This adds functionality to the pytest infrastructure that allows tests
to do common things with PostgreSQL servers like:
- creating
- starting
- stopping
- connecting
- running queries
- handling errors

The goal of this infrastructure is to be so easy to use that tests
contain only the logic for the behaviour under test, as opposed to a
bunch of boilerplate. Examples of this are:

Types get converted to their Python counterparts automatically. Errors
become actual Python exceptions. Results of queries that only return a
single row or cell are unpacked automatically, so you don't have to do
rows[0][0] if the query only returns a single cell.

The only new tests that are part of this commit are tests that cover
this testing infrastructure itself. It's debatable whether such tests
are useful long term, because any infrastructure that's unused by actual
tests should probably not exist. For now it seems good to test this
basic functionality though, both to make sure we don't break it before
committing actual tests that use it, and also as an example for people
writing new tests.
---
 doc/src/sgml/regress.sgml                 |  66 ++-
 pyproject.toml                            |   3 +
 src/test/pytest/README                    | 154 ++++++-
 src/test/pytest/libpq/__init__.py         |  35 ++
 src/test/pytest/libpq/_core.py            | 488 ++++++++++++++++++++++
 src/test/pytest/libpq/errors.py           |  62 +++
 src/test/pytest/meson.build               |   4 +
 src/test/pytest/pypg/__init__.py          |  10 +
 src/test/pytest/pypg/_env.py              |  72 ++++
 src/test/pytest/pypg/fixtures.py          | 335 +++++++++++++++
 src/test/pytest/pypg/server.py            | 470 +++++++++++++++++++++
 src/test/pytest/pypg/util.py              |  42 ++
 src/test/pytest/pyt/conftest.py           |   1 +
 src/test/pytest/pyt/test_errors.py        |  34 ++
 src/test/pytest/pyt/test_libpq.py         | 172 ++++++++
 src/test/pytest/pyt/test_multi_server.py  |  46 ++
 src/test/pytest/pyt/test_query_helpers.py | 347 +++++++++++++++
 17 files changed, 2339 insertions(+), 2 deletions(-)
 create mode 100644 src/test/pytest/libpq/__init__.py
 create mode 100644 src/test/pytest/libpq/_core.py
 create mode 100644 src/test/pytest/libpq/errors.py
 create mode 100644 src/test/pytest/pypg/__init__.py
 create mode 100644 src/test/pytest/pypg/_env.py
 create mode 100644 src/test/pytest/pypg/fixtures.py
 create mode 100644 src/test/pytest/pypg/server.py
 create mode 100644 src/test/pytest/pypg/util.py
 create mode 100644 src/test/pytest/pyt/conftest.py
 create mode 100644 src/test/pytest/pyt/test_errors.py
 create mode 100644 src/test/pytest/pyt/test_libpq.py
 create mode 100644 src/test/pytest/pyt/test_multi_server.py
 create mode 100644 src/test/pytest/pyt/test_query_helpers.py

diff --git a/doc/src/sgml/regress.sgml b/doc/src/sgml/regress.sgml
index d80dd46c5fd..2d85edacec7 100644
--- a/doc/src/sgml/regress.sgml
+++ b/doc/src/sgml/regress.sgml
@@ -840,7 +840,7 @@ float4:out:.*-.*-cygwin.*=float4-misrounded-input.out
   </sect1>
 
   <sect1 id="regress-tap">
-   <title>TAP Tests</title>
+   <title>Perl TAP Tests</title>
 
    <para>
     Various tests, particularly the client program tests
@@ -929,6 +929,70 @@ PG_TEST_NOCLEAN=1 make -C src/bin/pg_dump check
 
   </sect1>
 
+  <sect1 id="regress-pytest">
+   <title>Pytest Tests</title>
+
+   <para>
+    Tests in <filename>pyt</filename> directories use the Python
+    <application>pytest</application> framework. These tests provide a
+    convenient way to test libpq client functionality and scenarios requiring
+    multiple PostgreSQL server instances.
+   </para>
+
+   <para>
+    The pytest tests require <productname>PostgreSQL</productname> to be
+    configured with the option <option>--enable-pytest</option> (or
+    <option>-Dpytest=enabled</option> for Meson builds). You also need
+    <application>pytest</application> installed. You can either install it
+    system-wide, or create a virtual environment in the source directory:
+<programlisting>
+python -m venv .venv
+source .venv/bin/activate
+pip install .
+</programlisting>
+    Alternatively, if you have <application>uv</application> installed:
+<programlisting>
+uv sync
+source .venv/bin/activate
+</programlisting>
+    Remember to activate the virtual environment before running
+    <command>configure</command> or <command>meson setup</command>.
+   </para>
+
+   <para>
+    With Meson builds, you can run the pytest tests using:
+<programlisting>
+meson test --suite pytest
+</programlisting>
+    With autoconf-based builds, you can run them from the
+    <filename>src/test/pytest</filename> directory using:
+<programlisting>
+make check
+</programlisting>
+   </para>
+
+   <para>
+    You can also run specific test files directly using pytest:
+<programlisting>
+pytest src/test/pytest/pyt/test_libpq.py
+pytest -k "test_connstr"
+</programlisting>
+   </para>
+
+   <para>
+    Many operations in the test suites use a 180-second timeout, which on slow
+    hosts may lead to load-induced timeouts.  Setting the environment variable
+    <varname>PG_TEST_TIMEOUT_DEFAULT</varname> to a higher number will change
+    the default to avoid this.
+   </para>
+
+   <para>
+    For more information on writing pytest tests, see the
+    <filename>src/test/pytest/README</filename> file.
+   </para>
+
+  </sect1>
+
   <sect1 id="regress-coverage">
    <title>Test Coverage Examination</title>
 
diff --git a/pyproject.toml b/pyproject.toml
index 60abb4d0655..4628d2274e0 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -19,3 +19,6 @@ minversion = "7.0"
 
 # Common test code can be found here.
 pythonpath = ["src/test/pytest"]
+
+# Load the shared fixtures plugin
+addopts = ["-p", "pypg.fixtures"]
diff --git a/src/test/pytest/README b/src/test/pytest/README
index 1333ed77b7e..bb75e56a25d 100644
--- a/src/test/pytest/README
+++ b/src/test/pytest/README
@@ -1 +1,153 @@
-TODO
+src/test/pytest/README
+
+Pytest-based tests
+==================
+
+This directory contains infrastructure for Python-based tests using pytest,
+along with some core tests for the pytest infrastructure itself. The framework
+provides fixtures for managing PostgreSQL server instances and connecting to
+them via libpq.
+
+
+Running the tests
+=================
+
+NOTE: You must have given the --enable-pytest argument to configure (or
+-Dpytest=enabled for Meson builds). You also need to have pytest installed.
+
+If you don't have pytest installed system-wide, you can create a virtual
+environment:
+
+    python3 -m venv .venv
+    source .venv/bin/activate    # On Windows: .venv\Scripts\activate
+    pip install .                # Installs pytest and other dependencies
+
+Or using uv (https://docs.astral.sh/uv/):
+
+    uv sync
+    source .venv/bin/activate    # On Windows: .venv\Scripts\activate
+
+Remember to activate the virtual environment before running configure/meson
+setup.
+
+With Meson builds, you can run:
+    meson test --suite pytest
+
+With autoconf based builds, you can run:
+    make check
+or
+    make installcheck
+
+You can run specific test files and/or use pytest's -k option to select tests:
+    pytest src/test/pytest/pyt/test_libpq.py
+    pytest -k "test_connstr"
+
+
+Directory structure
+===================
+
+pypg/
+    Python library providing common functions and pytest fixtures that can be
+    used in tests.
+
+libpq/
+    A simple but user-friendly Python wrapper around libpq
+
+pyt/
+    Tests for the pytest infrastructure itself
+
+pgtap.py
+    A pytest plugin to output results in TAP format
+
+
+Writing tests
+=============
+
+Tests use pytest fixtures to manage server instances and connections. The
+most commonly used fixtures are:
+
+pg
+    A PostgresServer instance configured for the current test. Use this for
+    creating test users/databases or modifying server configuration. Changes
+    are automatically rolled back after the test.
+
+conn
+    A connected PGconn instance to the test server. Automatically cleaned up
+    after the test.
+
+connect
+    A function to create additional connections with custom options.
+
+create_pg
+    A factory function to create additional PostgreSQL servers within a test.
+    Servers are automatically cleaned up at the end of the test. Useful for
+    testing scenarios that require multiple independent servers.
+
+create_pg_module
+    Like create_pg, but servers persist for the entire test module. Use this
+    when multiple tests in a module can share the same servers, which is
+    faster than creating new servers for each test.
+
+
+Example test:
+
+    def test_simple_query(conn):
+        result = conn.sql("SELECT 1 + 1")
+        assert result == 2
+
+    def test_with_user(pg):
+        users = pg.create_users("test")
+        with pg.reloading() as s:
+            s.hba.prepend(["local", "all", users["test"], "trust"])
+
+        conn = pg.connect(user=users["test"])
+        assert conn.sql("SELECT current_user") == users["test"]
+
+    def test_multiple_servers(create_pg):
+        node1 = create_pg("primary")
+        node2 = create_pg("secondary")
+
+        conn1 = node1.connect()
+        conn2 = node2.connect()
+
+        # Each server is independent
+        assert node1.port != node2.port
+
+
+Server configuration
+====================
+
+Tests can temporarily modify server configuration using context managers:
+
+    with pg.reloading() as s:
+        s.conf.set(log_connections="on")
+        s.hba.prepend("local all all trust")
+        # Server is reloaded here
+    # After the test finishes, the original configuration is restored and
+    # the server is reloaded again
+
+Use pg.restarting() instead if the configuration change requires a restart.
+
+
+Timeouts
+========
+
+Tests inherit the PG_TEST_TIMEOUT_DEFAULT environment variable (defaulting
+to 180 seconds). The remaining_timeout fixture provides a function that
+returns how much time remains for the current test.
+
+
+Environment variables
+=====================
+
+PG_TEST_TIMEOUT_DEFAULT
+    Per-test timeout in seconds (default: 180)
+
+PG_CONFIG
+    Path to pg_config (default: uses PATH)
+
+TESTDATADIR
+    Directory for test data (default: pytest temp directory)
+
+PG_TEST_EXTRA
+    Space-separated list of optional test categories to run (e.g., "ssl")
diff --git a/src/test/pytest/libpq/__init__.py b/src/test/pytest/libpq/__init__.py
new file mode 100644
index 00000000000..6a71ebbe43f
--- /dev/null
+++ b/src/test/pytest/libpq/__init__.py
@@ -0,0 +1,35 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+"""
+libpq testing utilities - ctypes bindings and helpers for PostgreSQL's libpq library.
+
+This module provides Python wrappers around libpq for use in pytest tests.
+"""
+
+from . import errors
+from .errors import LibpqError
+from ._core import (
+    ConnectionStatus,
+    DiagField,
+    ExecStatus,
+    PGconn,
+    PGresult,
+    connect,
+    connstr,
+    load_libpq_handle,
+    register_type_info,
+)
+
+__all__ = [
+    "errors",
+    "LibpqError",
+    "ConnectionStatus",
+    "DiagField",
+    "ExecStatus",
+    "PGconn",
+    "PGresult",
+    "connect",
+    "connstr",
+    "load_libpq_handle",
+    "register_type_info",
+]
diff --git a/src/test/pytest/libpq/_core.py b/src/test/pytest/libpq/_core.py
new file mode 100644
index 00000000000..1c059b9b446
--- /dev/null
+++ b/src/test/pytest/libpq/_core.py
@@ -0,0 +1,488 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+"""
+Core libpq functionality - ctypes bindings and connection handling.
+"""
+
+import contextlib
+import ctypes
+import datetime
+import decimal
+import enum
+import json
+import platform
+import os
+import uuid
+from typing import Any, Callable, Dict, Optional
+
+from .errors import LibpqError
+
+
+# PG_DIAG field identifiers from postgres_ext.h
+class DiagField(enum.IntEnum):
+    SEVERITY = ord("S")
+    SEVERITY_NONLOCALIZED = ord("V")
+    SQLSTATE = ord("C")
+    MESSAGE_PRIMARY = ord("M")
+    MESSAGE_DETAIL = ord("D")
+    MESSAGE_HINT = ord("H")
+    STATEMENT_POSITION = ord("P")
+    INTERNAL_POSITION = ord("p")
+    INTERNAL_QUERY = ord("q")
+    CONTEXT = ord("W")
+    SCHEMA_NAME = ord("s")
+    TABLE_NAME = ord("t")
+    COLUMN_NAME = ord("c")
+    DATATYPE_NAME = ord("d")
+    CONSTRAINT_NAME = ord("n")
+    SOURCE_FILE = ord("F")
+    SOURCE_LINE = ord("L")
+    SOURCE_FUNCTION = ord("R")
+
+
+class ConnectionStatus(enum.IntEnum):
+    """PostgreSQL connection status codes from libpq."""
+
+    CONNECTION_OK = 0
+    CONNECTION_BAD = 1
+
+
+class ExecStatus(enum.IntEnum):
+    """PostgreSQL result status codes from PQresultStatus."""
+
+    PGRES_EMPTY_QUERY = 0
+    PGRES_COMMAND_OK = 1
+    PGRES_TUPLES_OK = 2
+    PGRES_COPY_OUT = 3
+    PGRES_COPY_IN = 4
+    PGRES_BAD_RESPONSE = 5
+    PGRES_NONFATAL_ERROR = 6
+    PGRES_FATAL_ERROR = 7
+    PGRES_COPY_BOTH = 8
+    PGRES_SINGLE_TUPLE = 9
+    PGRES_PIPELINE_SYNC = 10
+    PGRES_PIPELINE_ABORTED = 11
+
+
+class _PGconn(ctypes.Structure):
+    pass
+
+
+class _PGresult(ctypes.Structure):
+    pass
+
+
+_PGconn_p = ctypes.POINTER(_PGconn)
+_PGresult_p = ctypes.POINTER(_PGresult)
+
+
+def load_libpq_handle(libdir, bindir):
+    """
+    Loads a ctypes handle for libpq. Some common function prototypes are
+    initialized for general use.
+    """
+    system = platform.system()
+
+    if system in ("Linux", "FreeBSD", "NetBSD", "OpenBSD"):
+        name = "libpq.so.5"
+    elif system == "Darwin":
+        name = "libpq.5.dylib"
+    elif system == "Windows":
+        name = "libpq.dll"
+    else:
+        assert False, f"the libpq fixture must be updated for {system}"
+
+    if system == "Windows":
+        # On Windows, libpq.dll is confusingly in bindir, not libdir. And we
+        # need to add this directory to the search path.
+        libpq_path = os.path.join(bindir, name)
+        lib = ctypes.CDLL(libpq_path)
+    else:
+        libpq_path = os.path.join(libdir, name)
+        lib = ctypes.CDLL(libpq_path)
+
+    #
+    # Function Prototypes
+    #
+
+    lib.PQconnectdb.restype = _PGconn_p
+    lib.PQconnectdb.argtypes = [ctypes.c_char_p]
+
+    lib.PQstatus.restype = ctypes.c_int
+    lib.PQstatus.argtypes = [_PGconn_p]
+
+    lib.PQexec.restype = _PGresult_p
+    lib.PQexec.argtypes = [_PGconn_p, ctypes.c_char_p]
+
+    lib.PQresultStatus.restype = ctypes.c_int
+    lib.PQresultStatus.argtypes = [_PGresult_p]
+
+    lib.PQclear.restype = None
+    lib.PQclear.argtypes = [_PGresult_p]
+
+    lib.PQerrorMessage.restype = ctypes.c_char_p
+    lib.PQerrorMessage.argtypes = [_PGconn_p]
+
+    lib.PQfinish.restype = None
+    lib.PQfinish.argtypes = [_PGconn_p]
+
+    lib.PQresultErrorMessage.restype = ctypes.c_char_p
+    lib.PQresultErrorMessage.argtypes = [_PGresult_p]
+
+    lib.PQntuples.restype = ctypes.c_int
+    lib.PQntuples.argtypes = [_PGresult_p]
+
+    lib.PQnfields.restype = ctypes.c_int
+    lib.PQnfields.argtypes = [_PGresult_p]
+
+    lib.PQgetvalue.restype = ctypes.c_char_p
+    lib.PQgetvalue.argtypes = [_PGresult_p, ctypes.c_int, ctypes.c_int]
+
+    lib.PQgetisnull.restype = ctypes.c_int
+    lib.PQgetisnull.argtypes = [_PGresult_p, ctypes.c_int, ctypes.c_int]
+
+    lib.PQftype.restype = ctypes.c_uint
+    lib.PQftype.argtypes = [_PGresult_p, ctypes.c_int]
+
+    lib.PQresultErrorField.restype = ctypes.c_char_p
+    lib.PQresultErrorField.argtypes = [_PGresult_p, ctypes.c_int]
+
+    return lib
+
+
+# PostgreSQL type OIDs and conversion system
+# Type registry - maps OID to converter function
+_type_converters: Dict[int, Callable[[str], Any]] = {}
+_array_to_elem_map: Dict[int, int] = {}
+
+
+def register_type_info(
+    name: str, oid: int, array_oid: int, converter: Callable[[str], Any]
+):
+    """
+    Register a PostgreSQL type with its OID, array OID, and conversion function.
+
+    Usage:
+        register_type_info("bool", 16, 1000, lambda v: v == "t")
+    """
+    _type_converters[oid] = converter
+    if array_oid is not None:
+        _array_to_elem_map[array_oid] = oid
+
+
+def _parse_array(value: str, elem_oid: int):
+    """Parse PostgreSQL array syntax into nested Python lists."""
+    stack: list[list] = []
+    current_element: list[str] = []
+    in_quotes = False
+    was_quoted = False
+    pos = 0
+
+    while pos < len(value):
+        char = value[pos]
+
+        if in_quotes:
+            if char == "\\":
+                next_char = value[pos + 1]
+                if next_char not in '"\\':
+                    raise NotImplementedError('Only \\" and \\\\ escapes are supported')
+                current_element.append(next_char)
+                pos += 2
+                continue
+            elif char == '"':
+                in_quotes = False
+            else:
+                current_element.append(char)
+        elif char == '"':
+            in_quotes = True
+            was_quoted = True
+        elif char == "{":
+            stack.append([])
+        elif char in ",}":
+            if current_element or was_quoted:
+                elem = "".join(current_element)
+                if not was_quoted and elem == "NULL":
+                    stack[-1].append(None)
+                else:
+                    stack[-1].append(_convert_pg_value(elem, elem_oid))
+                current_element = []
+                was_quoted = False
+            if char == "}":
+                completed = stack.pop()
+                if not stack:
+                    return completed
+                stack[-1].append(completed)
+        elif char != " ":
+            current_element.append(char)
+        pos += 1
+
+    raise ValueError(f"Malformed array literal: {value}")
+
+
+# Register standard PostgreSQL types that we'll likely encounter in tests
+register_type_info("bool", 16, 1000, lambda v: v == "t")
+register_type_info("int2", 21, 1005, int)
+register_type_info("int4", 23, 1007, int)
+register_type_info("int8", 20, 1016, int)
+register_type_info("float4", 700, 1021, float)
+register_type_info("float8", 701, 1022, float)
+register_type_info("numeric", 1700, 1231, decimal.Decimal)
+register_type_info("text", 25, 1009, str)
+register_type_info("varchar", 1043, 1015, str)
+register_type_info("date", 1082, 1182, datetime.date.fromisoformat)
+register_type_info("time", 1083, 1183, datetime.time.fromisoformat)
+register_type_info("timestamp", 1114, 1115, datetime.datetime.fromisoformat)
+register_type_info("timestamptz", 1184, 1185, datetime.datetime.fromisoformat)
+register_type_info("uuid", 2950, 2951, uuid.UUID)
+register_type_info("json", 114, 199, json.loads)
+register_type_info("jsonb", 3802, 3807, json.loads)
+
+
+def _convert_pg_value(value: str, type_oid: int) -> Any:
+    """
+    Convert PostgreSQL string value to appropriate Python type based on OID.
+    Uses the registered type converters from register_type_info().
+    """
+    # Check if it's an array type
+    if type_oid in _array_to_elem_map:
+        elem_oid = _array_to_elem_map[type_oid]
+        return _parse_array(value, elem_oid)
+
+    # Use registered converter if available
+    converter = _type_converters.get(type_oid)
+    if converter:
+        return converter(value)
+
+    # Unknown types - return as string
+    return value
+
+
+def simplify_query_results(results) -> Any:
+    """
+    Simplify the results of a query so that the caller doesn't have to unpack
+    lists and tuples of length 1.
+    """
+    if len(results) == 1:
+        row = results[0]
+        if len(row) == 1:
+            # If there's only a single cell, just return the value
+            return row[0]
+        # If there's only a single row, just return that row
+        return row
+
+    if len(results) != 0 and len(results[0]) == 1:
+        # If there's only a single column, return an array of values
+        return [row[0] for row in results]
+
+    # if there are multiple rows and columns, return the results as is
+    return results
+
+
+class PGresult(contextlib.AbstractContextManager):
+    """Wraps a raw _PGresult_p with a more friendly interface."""
+
+    def __init__(self, lib: ctypes.CDLL, res: _PGresult_p):
+        self._lib = lib
+        self._res = res
+
+    def __exit__(self, *exc):
+        self._lib.PQclear(self._res)
+        self._res = None
+
+    def status(self) -> ExecStatus:
+        return ExecStatus(self._lib.PQresultStatus(self._res))
+
+    def error_message(self):
+        """Returns the error message associated with this result."""
+        msg = self._lib.PQresultErrorMessage(self._res)
+        return msg.decode() if msg else ""
+
+    def _get_error_field(self, field: DiagField) -> Optional[str]:
+        """Get an error field from the result using PQresultErrorField."""
+        val = self._lib.PQresultErrorField(self._res, int(field))
+        return val.decode() if val else None
+
+    def raise_error(self) -> None:
+        """
+        Raises LibpqError with diagnostic information from the result.
+        """
+        if not self._res:
+            raise LibpqError("query failed: out of memory or connection lost")
+
+        sqlstate = self._get_error_field(DiagField.SQLSTATE)
+        primary = self._get_error_field(DiagField.MESSAGE_PRIMARY)
+        detail = self._get_error_field(DiagField.MESSAGE_DETAIL)
+        hint = self._get_error_field(DiagField.MESSAGE_HINT)
+        severity = self._get_error_field(DiagField.SEVERITY)
+        schema_name = self._get_error_field(DiagField.SCHEMA_NAME)
+        table_name = self._get_error_field(DiagField.TABLE_NAME)
+        column_name = self._get_error_field(DiagField.COLUMN_NAME)
+        datatype_name = self._get_error_field(DiagField.DATATYPE_NAME)
+        constraint_name = self._get_error_field(DiagField.CONSTRAINT_NAME)
+        context = self._get_error_field(DiagField.CONTEXT)
+
+        position_str = self._get_error_field(DiagField.STATEMENT_POSITION)
+        position = int(position_str) if position_str else None
+
+        raise LibpqError(
+            primary or self.error_message(),
+            sqlstate=sqlstate,
+            severity=severity,
+            primary=primary,
+            detail=detail,
+            hint=hint,
+            schema_name=schema_name,
+            table_name=table_name,
+            column_name=column_name,
+            datatype_name=datatype_name,
+            constraint_name=constraint_name,
+            position=position,
+            context=context,
+        )
+
+    def fetch_all(self):
+        """
+        Fetch all rows and convert to Python types.
+        Returns a list of tuples, with values converted based on their PostgreSQL type.
+        """
+        nrows = self._lib.PQntuples(self._res)
+        ncols = self._lib.PQnfields(self._res)
+
+        # Get type OIDs for each column
+        type_oids = [self._lib.PQftype(self._res, col) for col in range(ncols)]
+
+        results = []
+        for row in range(nrows):
+            row_data = []
+            for col in range(ncols):
+                if self._lib.PQgetisnull(self._res, row, col):
+                    row_data.append(None)
+                else:
+                    value = self._lib.PQgetvalue(self._res, row, col).decode()
+                    row_data.append(_convert_pg_value(value, type_oids[col]))
+            results.append(tuple(row_data))
+
+        return results
+
+
+class PGconn(contextlib.AbstractContextManager):
+    """
+    Wraps a raw _PGconn_p with a more friendly interface. This is just a
+    stub; it's expected to grow.
+    """
+
+    def __init__(
+        self,
+        lib: ctypes.CDLL,
+        handle: _PGconn_p,
+        stack: contextlib.ExitStack,
+    ):
+        self._lib = lib
+        self._handle = handle
+        self._stack = stack
+
+    def __exit__(self, *exc):
+        self._lib.PQfinish(self._handle)
+        self._handle = None
+
+    def exec(self, query: str):
+        """
+        Executes a query via PQexec() and returns a PGresult.
+        """
+        res = self._lib.PQexec(self._handle, query.encode())
+        return self._stack.enter_context(PGresult(self._lib, res))
+
+    def sql(self, query: str):
+        """
+        Executes a query and raises an exception if it fails.
+        Returns the query results with automatic type conversion and simplification.
+        For commands that don't return data (INSERT, UPDATE, etc.), returns None.
+
+        Examples:
+        - SELECT 1 -> 1
+        - SELECT 1, 2 -> (1, 2)
+        - SELECT * FROM generate_series(1, 3) -> [1, 2, 3]
+        - SELECT * FROM (VALUES (1, 'a'), (2, 'b')) t -> [(1, 'a'), (2, 'b')]
+        - CREATE TABLE ... -> None
+        - INSERT INTO ... -> None
+        """
+        res = self.exec(query)
+        status = res.status()
+
+        if status == ExecStatus.PGRES_FATAL_ERROR:
+            res.raise_error()
+        elif status == ExecStatus.PGRES_COMMAND_OK:
+            return None
+        elif status == ExecStatus.PGRES_TUPLES_OK:
+            results = res.fetch_all()
+            return simplify_query_results(results)
+        else:
+            res.raise_error()
+
+
+def connstr(opts: Dict[str, Any]) -> str:
+    """
+    Flattens the provided options into a libpq connection string. Values
+    are converted to str and quoted/escaped as necessary.
+    """
+    settings = []
+
+    for k, v in opts.items():
+        v = str(v)
+        if not v:
+            v = "''"
+        else:
+            v = v.replace("\\", "\\\\")
+            v = v.replace("'", "\\'")
+
+            if " " in v:
+                v = f"'{v}'"
+
+        settings.append(f"{k}={v}")
+
+    return " ".join(settings)
+
+
+def connect(
+    libpq_handle: ctypes.CDLL,
+    stack: contextlib.ExitStack,
+    remaining_timeout_fn: Callable[[], float],
+    **opts,
+) -> PGconn:
+    """
+    Connects to a server, using the given connection options, and
+    returns a PGconn object wrapping the connection handle. A
+    failure will raise LibpqError.
+
+    Connections honor PG_TEST_TIMEOUT_DEFAULT unless connect_timeout is
+    explicitly overridden in opts.
+
+    Args:
+        libpq_handle: ctypes.CDLL handle to libpq library
+        stack: ExitStack for managing connection cleanup
+        remaining_timeout_fn: Function that returns remaining timeout in seconds
+        **opts: Connection options (host, port, dbname, etc.)
+
+    Returns:
+        PGconn: Connected database connection
+
+    Raises:
+        LibpqError: If connection fails
+    """
+
+    if "connect_timeout" not in opts:
+        t = int(remaining_timeout_fn())
+        opts["connect_timeout"] = max(t, 1)
+
+    conn_p = libpq_handle.PQconnectdb(connstr(opts).encode())
+
+    # Check connection status before adding to stack
+    if libpq_handle.PQstatus(conn_p) != ConnectionStatus.CONNECTION_OK:
+        error_msg = libpq_handle.PQerrorMessage(conn_p).decode()
+        # Manually close the failed connection
+        libpq_handle.PQfinish(conn_p)
+        raise LibpqError(error_msg)
+
+    # Connection succeeded - add to stack for cleanup
+    conn = stack.enter_context(PGconn(libpq_handle, conn_p, stack=stack))
+    return conn
diff --git a/src/test/pytest/libpq/errors.py b/src/test/pytest/libpq/errors.py
new file mode 100644
index 00000000000..c665b663e22
--- /dev/null
+++ b/src/test/pytest/libpq/errors.py
@@ -0,0 +1,62 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+"""
+Exception classes for libpq errors.
+"""
+
+from typing import Optional
+
+
+class LibpqError(RuntimeError):
+    """Exception for libpq errors with PostgreSQL diagnostic fields."""
+
+    sqlstate: Optional[str]
+    severity: Optional[str]
+    primary: Optional[str]
+    detail: Optional[str]
+    hint: Optional[str]
+    schema_name: Optional[str]
+    table_name: Optional[str]
+    column_name: Optional[str]
+    datatype_name: Optional[str]
+    constraint_name: Optional[str]
+    position: Optional[int]
+    context: Optional[str]
+
+    def __init__(
+        self,
+        message: str,
+        *,
+        sqlstate: Optional[str] = None,
+        severity: Optional[str] = None,
+        primary: Optional[str] = None,
+        detail: Optional[str] = None,
+        hint: Optional[str] = None,
+        schema_name: Optional[str] = None,
+        table_name: Optional[str] = None,
+        column_name: Optional[str] = None,
+        datatype_name: Optional[str] = None,
+        constraint_name: Optional[str] = None,
+        position: Optional[int] = None,
+        context: Optional[str] = None,
+    ):
+        super().__init__(message)
+        self.sqlstate = sqlstate
+        self.severity = severity
+        self.primary = primary
+        self.detail = detail
+        self.hint = hint
+        self.schema_name = schema_name
+        self.table_name = table_name
+        self.column_name = column_name
+        self.datatype_name = datatype_name
+        self.constraint_name = constraint_name
+        self.position = position
+        self.context = context
+
+    @property
+    def sqlstate_class(self) -> Optional[str]:
+        """Returns the 2-character SQLSTATE class."""
+        if self.sqlstate and len(self.sqlstate) >= 2:
+            return self.sqlstate[:2]
+        return None
diff --git a/src/test/pytest/meson.build b/src/test/pytest/meson.build
index b1f6061b307..b86be901e7c 100644
--- a/src/test/pytest/meson.build
+++ b/src/test/pytest/meson.build
@@ -10,6 +10,10 @@ tests += {
   'bd': meson.current_build_dir(),
   'pytest': {
     'tests': [
+      'pyt/test_errors.py',
+      'pyt/test_libpq.py',
+      'pyt/test_multi_server.py',
+      'pyt/test_query_helpers.py',
     ],
   },
 }
diff --git a/src/test/pytest/pypg/__init__.py b/src/test/pytest/pypg/__init__.py
new file mode 100644
index 00000000000..4ee91289f70
--- /dev/null
+++ b/src/test/pytest/pypg/__init__.py
@@ -0,0 +1,10 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+from ._env import require_test_extras, skip_unless_test_extras
+from .server import PostgresServer
+
+__all__ = [
+    "require_test_extras",
+    "skip_unless_test_extras",
+    "PostgresServer",
+]
diff --git a/src/test/pytest/pypg/_env.py b/src/test/pytest/pypg/_env.py
new file mode 100644
index 00000000000..c4087be3212
--- /dev/null
+++ b/src/test/pytest/pypg/_env.py
@@ -0,0 +1,72 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+import logging
+import os
+
+import pytest
+
+logger = logging.getLogger(__name__)
+
+
+def _test_extra_skip_reason(*keys: str) -> str:
+    return "requires {} to be set in PG_TEST_EXTRA".format(", ".join(keys))
+
+
+def _has_test_extra(key: str) -> bool:
+    """
+    Returns True if the PG_TEST_EXTRA environment variable contains the given
+    key.
+    """
+    extra = os.getenv("PG_TEST_EXTRA", "")
+    return key in extra.split()
+
+
+def require_test_extras(*keys: str):
+    """
+    A convenience annotation which will skip tests unless all of the required
+    keys are present in PG_TEST_EXTRA.
+
+    To skip a particular test function or class:
+
+        @pypg.require_test_extras("ldap")
+        def test_some_ldap_feature():
+            ...
+
+    To skip an entire module:
+
+        pytestmark = pypg.require_test_extras("ssl", "kerberos")
+    """
+    return pytest.mark.skipif(
+        not all([_has_test_extra(k) for k in keys]),
+        reason=_test_extra_skip_reason(*keys),
+    )
+
+
+def skip_unless_test_extras(*keys: str):
+    """
+    Skip the current test/fixture if any of the required keys are not present
+    in PG_TEST_EXTRA. Use this inside fixtures where decorators can't be used.
+
+        @pytest.fixture
+        def my_fixture():
+            skip_unless_test_extras("ldap")
+            ...
+    """
+    if not all([_has_test_extra(k) for k in keys]):
+        pytest.skip(_test_extra_skip_reason(*keys))
+
+
+def test_timeout_default() -> int:
+    """
+    Returns the value of the PG_TEST_TIMEOUT_DEFAULT environment variable, in
+    seconds, or 180 if one was not provided.
+    """
+    default = os.getenv("PG_TEST_TIMEOUT_DEFAULT", "")
+    if not default:
+        return 180
+
+    try:
+        return int(default)
+    except ValueError as v:
+        logger.warning("PG_TEST_TIMEOUT_DEFAULT could not be parsed: " + str(v))
+        return 180
diff --git a/src/test/pytest/pypg/fixtures.py b/src/test/pytest/pypg/fixtures.py
new file mode 100644
index 00000000000..8c0cb60daa5
--- /dev/null
+++ b/src/test/pytest/pypg/fixtures.py
@@ -0,0 +1,335 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+import os
+import contextlib
+import pathlib
+import time
+from typing import List
+
+import pytest
+
+from ._env import test_timeout_default
+from .util import capture
+from .server import PostgresServer
+
+from libpq import load_libpq_handle, connect as libpq_connect
+
+
+# Stash key for tracking servers for log reporting.
+_servers_key = pytest.StashKey[List[PostgresServer]]()
+
+
def _record_server_for_log_reporting(request, server):
    """Record a server in the test item's stash for log reporting on failure."""
    stash = request.node.stash
    if _servers_key not in stash:
        stash[_servers_key] = []
    stash[_servers_key].append(server)
+
+
@pytest.fixture
def remaining_timeout():
    """
    This fixture provides a function that returns how much of the
    PG_TEST_TIMEOUT_DEFAULT remains for the current test, in fractional seconds.
    This value is never less than zero.

    This fixture is per-test, so the deadline is also reset on a per-test basis.
    """
    # The deadline is fixed at fixture setup time, i.e. once per test.
    deadline = time.monotonic() + test_timeout_default()

    def _remaining():
        return max(deadline - time.monotonic(), 0)

    return _remaining
+
+
@pytest.fixture(scope="module")
def remaining_timeout_module():
    """
    Same as remaining_timeout, but the deadline is set once per module.

    This fixture is per-module, which means it's generally only really useful
    for configuring timeouts of operations that happen in the setup phase of
    other module fixtures. If you use it in a test it would mean that each
    subsequent test in the module gets a reduced timeout.
    """
    # The deadline is fixed when the first test in the module sets up.
    deadline = time.monotonic() + test_timeout_default()

    def _remaining():
        return max(deadline - time.monotonic(), 0)

    return _remaining
+
+
@pytest.fixture(scope="session")
def libpq_handle(libdir, bindir):
    """
    Loads a ctypes handle for libpq. Some common function prototypes are
    initialized for general use.
    """
    try:
        return load_libpq_handle(libdir, bindir)
    except OSError as e:
        if "wrong ELF class" in str(e):
            # This happens in CI when trying to load a 32-bit libpq library
            # with a 64-bit Python
            pytest.skip("libpq architecture does not match Python interpreter")
        raise
+
+
@pytest.fixture
def connect(libpq_handle, remaining_timeout):
    """
    Returns a function to connect to PostgreSQL via libpq.

    The returned function accepts connection options as keyword arguments
    (host, port, dbname, etc.) and returns a PGconn object. Connections
    are automatically cleaned up at the end of the test.

    Example:
        conn = connect(host='localhost', port=5432, dbname='postgres')
        result = conn.sql("SELECT 1")
    """
    # Every connection made through _connect is registered with this
    # ExitStack, so all of them are closed when the fixture is torn down.
    with contextlib.ExitStack() as stack:

        def _connect(**opts):
            return libpq_connect(libpq_handle, stack, remaining_timeout, **opts)

        yield _connect
+
+
@pytest.fixture(scope="session")
def pg_config():
    """
    Path to the pg_config executable: the PG_CONFIG environment variable if
    set, otherwise 'pg_config' found via PATH.
    """
    return os.getenv("PG_CONFIG", "pg_config")
+
+
@pytest.fixture(scope="session")
def bindir(pg_config):
    """
    Returns the PostgreSQL bin directory using pg_config --bindir.
    """
    # capture() strips the trailing newline from the command output.
    return pathlib.Path(capture(pg_config, "--bindir"))
+
+
@pytest.fixture(scope="session")
def libdir(pg_config):
    """
    Returns the PostgreSQL lib directory using pg_config --libdir.
    """
    # capture() strips the trailing newline from the command output.
    return pathlib.Path(capture(pg_config, "--libdir"))
+
+
@pytest.fixture(scope="session")
def tmp_check(tmp_path_factory) -> pathlib.Path:
    """
    Returns the tmp_check directory that should be used for the tests. If
    TESTDATADIR is provided, that will be used; otherwise a new temporary
    directory is created in the pytest temp root.
    """
    override = os.getenv("TESTDATADIR")
    if override:
        return pathlib.Path(override)

    return tmp_path_factory.mktemp("tmp_check")
+
+
@pytest.fixture(scope="session")
def datadir(tmp_check):
    """
    Returns the data directory to use for the pg fixture.
    """

    # Lives inside the per-session tmp_check directory.
    return tmp_check / "pgdata"
+
+
@pytest.fixture(scope="session")
def sockdir(tmp_path_factory):
    """
    Returns the directory name to use as the server's unix_socket_directories
    setting. Local client connections use this as the PGHOST.

    At the moment, this is always put under the pytest temp root.
    """
    # NOTE(review): pytest temp paths can get long; this assumes they stay
    # under the platform's UNIX socket path limit — confirm on all CI hosts.
    return tmp_path_factory.mktemp("sockfiles")
+
+
@pytest.fixture(scope="session")
def pg_server_global(bindir, datadir, sockdir, libpq_handle):
    """
    Starts a running Postgres server listening on localhost. The HBA initially
    allows only local UNIX connections from the same user.

    Returns a PostgresServer instance with methods for server management, configuration,
    and creating test databases/users.
    """
    server = PostgresServer("default", bindir, datadir, sockdir, libpq_handle)

    yield server

    # Cleanup any test resources
    server.cleanup()

    # Stop the server (stop() ignores failures if it already exited)
    server.stop()
+
+
@pytest.fixture(scope="module")
def pg_server_module(pg_server_global):
    """
    Module-scoped server context. Which can be useful so that certain settings
    can be overridden at the module level through autouse fixtures. An example
    of this is in the SSL tests.
    """
    # subcontext() ensures that cleanups registered during the module are
    # unwound when the module's tests are done.
    with pg_server_global.subcontext() as s:
        yield s
+
+
@pytest.fixture
def pg(request, pg_server_module, remaining_timeout):
    """
    Per-test server context. Use this fixture to make changes to the server
    which will be rolled back at the end of the test (e.g., creating test
    users/databases).

    Also captures the PostgreSQL log position at test start so that any new
    log entries can be included in the test report on failure.
    """
    # start_new_test() sets the timeout, resets the log position, and opens a
    # fresh cleanup subcontext for this test.
    with pg_server_module.start_new_test(remaining_timeout) as s:
        _record_server_for_log_reporting(request, s)
        yield s
+
+
@pytest.fixture
def conn(pg):
    """
    Returns a connected PGconn instance to the test PostgreSQL server.
    The connection is automatically cleaned up at the end of the test.

    Example:
        def test_something(conn):
            result = conn.sql("SELECT 1")
            assert result == 1
    """
    # The connection is registered on pg's per-test cleanup stack, so it is
    # closed during the pg fixture's teardown.
    return pg.connect()
+
+
@pytest.fixture
def create_pg(request, bindir, sockdir, libpq_handle, tmp_check, remaining_timeout):
    """
    Factory fixture to create additional PostgreSQL servers (per-test scope).

    Returns a function that creates new PostgreSQL server instances.
    Servers are automatically cleaned up at the end of the test.

    Example:
        def test_multiple_servers(create_pg):
            node1 = create_pg()
            node2 = create_pg()
            node3 = create_pg()
    """
    servers = []

    def _create(name=None, **kwargs):
        # Generate names pg1, pg2, ... when the caller doesn't pick one.
        if name is None:
            count = len(servers) + 1
            name = f"pg{count}"

        # Each server gets its own data directory under tmp_check.
        datadir = tmp_check / f"pgdata_{name}"
        server = PostgresServer(name, bindir, datadir, sockdir, libpq_handle, **kwargs)
        server.set_timeout(remaining_timeout)
        _record_server_for_log_reporting(request, server)
        servers.append(server)
        return server

    yield _create

    # Teardown: run each server's registered cleanups, then stop it.
    for server in servers:
        server.cleanup()
        server.stop()
+
+
@pytest.fixture(scope="module")
def _module_scoped_servers():
    """Module-scoped list to track servers created by create_pg_module."""
    return []
+
+
@pytest.fixture(scope="module")
def create_pg_module(
    bindir,
    sockdir,
    libpq_handle,
    tmp_check,
    remaining_timeout_module,
    _module_scoped_servers,
):
    """
    Factory fixture to create additional PostgreSQL servers (module scope).

    Like create_pg, but servers persist for the entire test module.
    Use this when multiple tests in a module can share the same servers.

    The timeout is automatically set on all servers at the start of each test
    via the _set_module_server_timeouts autouse fixture.

    Example:
        @pytest.fixture(scope="module")
        def shared_nodes(create_pg_module):
            return [create_pg_module() for _ in range(3)]
    """

    def _create(name=None, **kwargs):
        # Generate names pg1, pg2, ... when the caller doesn't pick one.
        if name is None:
            count = len(_module_scoped_servers) + 1
            name = f"pg{count}"
        datadir = tmp_check / f"pgdata_{name}"
        server = PostgresServer(name, bindir, datadir, sockdir, libpq_handle, **kwargs)
        server.set_timeout(remaining_timeout_module)
        _module_scoped_servers.append(server)
        return server

    yield _create

    # Teardown at end of module: run cleanups, then stop each server.
    for server in _module_scoped_servers:
        server.cleanup()
        server.stop()
+
+
@pytest.fixture(autouse=True)
def _set_module_server_timeouts(request, _module_scoped_servers, remaining_timeout):
    """Autouse fixture that sets timeout, enters subcontext, and records log positions for module-scoped servers."""
    # start_new_test() handles all three: per-test timeout, log position
    # reset, and a cleanup subcontext that unwinds after the test.
    with contextlib.ExitStack() as stack:
        for server in _module_scoped_servers:
            stack.enter_context(server.start_new_test(remaining_timeout))
            _record_server_for_log_reporting(request, server)
        yield
+
+
@pytest.hookimpl(hookwrapper=True, trylast=True)
def pytest_runtest_makereport(item, call):
    """
    Adds PostgreSQL server logs to the test report sections.
    """
    outcome = yield
    report = outcome.get_result()

    # Only attach logs for the main test phase, not setup/teardown.
    if report.when != "call":
        return

    if _servers_key not in item.stash:
        return

    servers = item.stash[_servers_key]
    # Drop the stash entry so the same servers are not reported again.
    del item.stash[_servers_key]

    # Only label the sections with server names if there is more than one
    # server involved.
    include_name = len(servers) > 1

    for server in servers:
        content = server.log_content()
        if content.strip():
            section_title = "Postgres log"
            if include_name:
                section_title += f" ({server.name})"
            report.sections.append((section_title, content))
diff --git a/src/test/pytest/pypg/server.py b/src/test/pytest/pypg/server.py
new file mode 100644
index 00000000000..9242ab25007
--- /dev/null
+++ b/src/test/pytest/pypg/server.py
@@ -0,0 +1,470 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+import contextlib
+import os
+import pathlib
+import platform
+import re
+import shutil
+import socket
+import subprocess
+import tempfile
+from collections import namedtuple
+from typing import Callable, Optional
+
+from .util import run
+from libpq import PGconn, connect as libpq_connect
+
+
class FileBackup(contextlib.AbstractContextManager):
    """
    A context manager which backs up a file's contents, restoring them on exit.
    """

    def __init__(self, file: pathlib.Path):
        super().__init__()

        self._file = file

    def __enter__(self):
        # Create an empty, uniquely named sibling file to hold the backup,
        # then copy the original's contents into it.
        handle = tempfile.NamedTemporaryFile(
            prefix=self._file.name, dir=self._file.parent, delete=False
        )
        handle.close()
        self._backup = pathlib.Path(handle.name)

        shutil.copyfile(self._file, self._backup)

        return self

    def __exit__(self, *exc):
        # Swap the backup and the original file, so that the modified contents
        # can still be inspected in case of failure.
        scratch = self._backup.with_name(self._backup.name + ".tmp")

        shutil.copyfile(self._file, scratch)
        shutil.copyfile(self._backup, self._file)
        shutil.move(scratch, self._backup)
+
+
class HBA(FileBackup):
    """
    Backs up a server's HBA configuration and provides means for temporarily
    editing it.
    """

    def __init__(self, datadir: pathlib.Path):
        super().__init__(datadir / "pg_hba.conf")

    def prepend(self, *lines):
        """
        Temporarily prepends lines to the server's pg_hba.conf.

        As sugar for aligning HBA columns in the tests, each line can be either
        a string or a list of strings. List elements will be joined by single
        spaces before they are written to file.
        """
        original = self._file.read_text()

        with open(self._file, "w") as f:
            for line in lines:
                if isinstance(line, list):
                    line = " ".join(line)
                print(line, file=f)

            f.write(original)
+
+
class Config(FileBackup):
    """
    Backs up a server's postgresql.conf and provides means for temporarily
    editing it.
    """

    def __init__(self, datadir: pathlib.Path):
        super().__init__(datadir / "postgresql.conf")

    def set(self, **gucs):
        """
        Temporarily appends GUC settings to the server's postgresql.conf.
        """

        with open(self._file, "a") as f:
            # Separate the appended settings from existing content.
            f.write("\n")

            for name, value in gucs.items():
                # TODO: proper quoting
                escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
                f.write("{} = '{}'\n".format(name, escaped))
+
+
# Pair of configuration-file backups created by
# PostgresServer._backup_configuration(): .conf wraps postgresql.conf and
# .hba wraps pg_hba.conf.
Backup = namedtuple("Backup", "conf, hba")
+
+
class PostgresServer:
    """
    Represents a running PostgreSQL server instance with management utilities.
    Provides methods for configuration, user/database creation, and server control.
    """

    def __init__(
        self,
        name,
        bindir,
        datadir,
        sockdir,
        libpq_handle,
        *,
        hostaddr: Optional[str] = None,
        port: Optional[int] = None,
    ):
        """
        Initialize and start a PostgreSQL server instance.

        Args:
            name: The name of this server instance (for logging purposes)
            bindir: Path to PostgreSQL bin directory
            datadir: Path to data directory for this server
            sockdir: Path to directory for Unix sockets
            libpq_handle: ctypes handle to libpq
            hostaddr: If provided, use this specific address (e.g., "127.0.0.2")
            port: If provided, use this port instead of finding a free one,
                is currently only allowed if hostaddr is also provided
        """

        if hostaddr is None and port is not None:
            raise NotImplementedError("port was provided without hostaddr")

        self.name = name
        self.datadir = datadir
        self.sockdir = sockdir
        self.libpq_handle = libpq_handle
        self._remaining_timeout_fn: Optional[Callable[[], float]] = None
        self._bindir = bindir
        self._pg_ctl = bindir / "pg_ctl"
        self.log = datadir / "postgresql.log"
        self._log_start_pos = 0

        # Determine whether to use Unix sockets
        use_unix_sockets = platform.system() != "Windows" and hostaddr is None

        # Use INITDB_TEMPLATE if available (much faster than running initdb)
        initdb_template = os.environ.get("INITDB_TEMPLATE")
        if initdb_template and os.path.isdir(initdb_template):
            shutil.copytree(initdb_template, datadir)
        else:
            if platform.system() == "Windows":
                auth_method = "trust"
            else:
                auth_method = "peer"
            run(
                bindir / "initdb",
                "--no-sync",
                "--auth",
                auth_method,
                "--pgdata",
                self.datadir,
            )

        # Figure out a port to listen on. Attempt to reserve both IPv4 and IPv6
        # addresses in one go.
        #
        # Note: socket.has_dualstack_ipv6/create_server are only in Python 3.8+.
        if hostaddr is not None:
            # Explicit address provided
            addrs: list[str] = [hostaddr]
            temp_sock = socket.socket()
            if port is None:
                temp_sock.bind((hostaddr, 0))
                _, port = temp_sock.getsockname()

        elif hasattr(socket, "has_dualstack_ipv6") and socket.has_dualstack_ipv6():
            addr = ("::1", 0)
            temp_sock = socket.create_server(
                addr, family=socket.AF_INET6, dualstack_ipv6=True
            )

            hostaddr, port, _, _ = temp_sock.getsockname()
            assert hostaddr is not None
            addrs = [hostaddr, "127.0.0.1"]

        else:
            addr = ("127.0.0.1", 0)

            temp_sock = socket.socket()
            temp_sock.bind(addr)

            hostaddr, port = temp_sock.getsockname()
            assert hostaddr is not None
            addrs = [hostaddr]

        # Store the computed values
        self.hostaddr = hostaddr
        self.port = port
        # Including the host to use for connections - either the socket
        # directory or TCP address
        if use_unix_sockets:
            self.host = str(sockdir)
        else:
            self.host = hostaddr

        with open(os.path.join(datadir, "postgresql.conf"), "a") as f:
            print(file=f)
            if use_unix_sockets:
                print(
                    "unix_socket_directories = '{}'".format(sockdir.as_posix()),
                    file=f,
                )
            else:
                # Disable Unix sockets when using TCP to avoid lock conflicts
                print("unix_socket_directories = ''", file=f)
            print("listen_addresses = '{}'".format(",".join(addrs)), file=f)
            print("port =", port, file=f)
            print("log_connections = all", file=f)
            print("fsync = off", file=f)
            print("datestyle = 'ISO'", file=f)
            print("timezone = 'UTC'", file=f)

        # Between closing of the socket, s, and server start, we're racing
        # against anything that wants to open up ephemeral ports, so try not to
        # put any new work here.

        temp_sock.close()
        self.pg_ctl("start")

        # Read the PID file to get the postmaster PID; the PID is on the first
        # line of postmaster.pid.
        with open(os.path.join(datadir, "postmaster.pid")) as f:
            self.pid = int(f.readline().strip())

        # ExitStack for cleanup callbacks
        self._cleanup_stack = contextlib.ExitStack()

    def current_log_position(self):
        """Get the current end position of the log file."""
        if self.log.exists():
            return self.log.stat().st_size
        return 0

    def reset_log_position(self):
        """Mark current log position as start for log_content()."""
        self._log_start_pos = self.current_log_position()

    @contextlib.contextmanager
    def start_new_test(self, remaining_timeout):
        """
        Prepare server for a new test.

        Sets timeout, resets log position, and enters a cleanup subcontext.
        """
        self.set_timeout(remaining_timeout)
        self.reset_log_position()
        with self.subcontext():
            yield self

    def psql(self, *args):
        """Run psql with the given arguments."""
        # Build the executable path with Path composition, consistent with
        # how self._pg_ctl is constructed in __init__.
        self._run(self._bindir / "psql", "-w", *args)

    def sql(self, query):
        """Execute a SQL query via libpq. Returns simplified results."""
        with self.connect() as conn:
            return conn.sql(query)

    def pg_ctl(self, *args):
        """Run pg_ctl with the given arguments."""
        self._run(self._pg_ctl, "--pgdata", self.datadir, "--log", self.log, *args)

    def _run(self, cmd, *args, addenv: Optional[dict] = None):
        """Run a command with PG* environment variables set."""
        subenv = dict(os.environ)
        subenv.update(
            {
                "PGHOST": str(self.host),
                "PGPORT": str(self.port),
                "PGDATABASE": "postgres",
                "PGDATA": str(self.datadir),
            }
        )
        if addenv:
            subenv.update(addenv)
        run(cmd, *args, env=subenv)

    def create_users(self, *userkeys: str):
        """Create test users and register them for cleanup."""
        usermap = {}
        for u in userkeys:
            name = u + "user"
            usermap[u] = name
            self.psql("-c", "CREATE USER " + name)
            self._cleanup_stack.callback(self.psql, "-c", "DROP USER " + name)
        return usermap

    def create_dbs(self, *dbkeys: str):
        """Create test databases and register them for cleanup."""
        dbmap = {}
        for d in dbkeys:
            name = d + "db"
            dbmap[d] = name
            self.psql("-c", "CREATE DATABASE " + name)
            self._cleanup_stack.callback(self.psql, "-c", "DROP DATABASE " + name)
        return dbmap

    @contextlib.contextmanager
    def reloading(self):
        """
        Provides a context manager for making configuration changes.

        If the context suite finishes successfully, the configuration will
        be reloaded via pg_ctl. On teardown, the configuration changes will
        be unwound, and the server will be signaled to reload again.

        The context target contains the following attributes which can be
        used to configure the server:
        - .conf: modifies postgresql.conf
        - .hba: modifies pg_hba.conf

        For example:

            with pg_server_session.reloading() as s:
                s.conf.set(log_connections="on")
                s.hba.prepend("local all all trust")
        """
        # Push a reload onto the stack before making any other
        # unwindable changes. That way the order of operations will be
        #
        #  # test
        #   - config change 1
        #   - config change 2
        #   - reload
        #  # teardown
        #   - undo config change 2
        #   - undo config change 1
        #   - reload
        #
        self._cleanup_stack.callback(self.pg_ctl, "reload")
        yield self._backup_configuration()

        # Now actually reload
        self.pg_ctl("reload")

    @contextlib.contextmanager
    def restarting(self):
        """Like .reloading(), but with a full server restart."""
        self._cleanup_stack.callback(self.pg_ctl, "restart")
        yield self._backup_configuration()
        self.pg_ctl("restart")

    def _backup_configuration(self):
        # Wrap the existing HBA and configuration with FileBackups.
        return Backup(
            hba=self._cleanup_stack.enter_context(HBA(self.datadir)),
            conf=self._cleanup_stack.enter_context(Config(self.datadir)),
        )

    @contextlib.contextmanager
    def subcontext(self):
        """
        Create a new cleanup context for per-test isolation.

        Temporarily replaces the cleanup stack so that any cleanup callbacks
        registered within this context will be cleaned up when the context exits.
        """
        old_stack = self._cleanup_stack
        self._cleanup_stack = contextlib.ExitStack()
        try:
            self._cleanup_stack.__enter__()
            yield self
        finally:
            self._cleanup_stack.__exit__(None, None, None)
            self._cleanup_stack = old_stack

    def stop(self, mode="fast"):
        """
        Stop the PostgreSQL server instance.

        Ignores failures if the server is already stopped.
        """
        try:
            self.pg_ctl("stop", "--mode", mode)
        except subprocess.CalledProcessError:
            # Server may have already been stopped
            pass

    def log_content(self) -> str:
        """Return log content from the current context's start position."""
        with open(self.log) as f:
            f.seek(self._log_start_pos)
            return f.read()

    @contextlib.contextmanager
    def log_contains(self, pattern, times=None):
        """
        Context manager that checks if the log matches pattern during the block.

        Args:
            pattern: The regex pattern to search for.
            times: If None, any number of matches is accepted.
                   If a number, exactly that many matches are required.
        """
        start_pos = self.current_log_position()
        yield
        with open(self.log) as f:
            f.seek(start_pos)
            content = f.read()
        if times is None:
            assert re.search(pattern, content), f"Pattern {pattern!r} not found in log"
        else:
            match_count = len(re.findall(pattern, content))
            assert match_count == times, (
                f"Expected {times} matches of {pattern!r}, found {match_count}"
            )

    def cleanup(self):
        """Run all registered cleanup callbacks."""
        self._cleanup_stack.close()

    def set_timeout(self, remaining_timeout_fn: Callable[[], float]) -> None:
        """
        Set the timeout function for connections.
        This is typically called by pg fixture for each test.
        """
        self._remaining_timeout_fn = remaining_timeout_fn

    def connect(self, **opts) -> PGconn:
        """
        Creates a connection to this PostgreSQL server instance.

        Args:
            **opts: Additional connection options (can override defaults)

        Returns:
            PGconn: Connected database connection

        Example:
            conn = pg.connect()
            conn = pg.connect(dbname='mydb')
        """
        if self._remaining_timeout_fn is None:
            raise RuntimeError(
                "Timeout function not set. Use set_timeout() or pg fixture."
            )

        defaults = {
            "host": self.host,
            "port": self.port,
            "dbname": "postgres",
        }
        defaults.update(opts)

        return libpq_connect(
            self.libpq_handle,
            self._cleanup_stack,
            self._remaining_timeout_fn,
            **defaults,
        )
diff --git a/src/test/pytest/pypg/util.py b/src/test/pytest/pypg/util.py
new file mode 100644
index 00000000000..b2a1e627e4b
--- /dev/null
+++ b/src/test/pytest/pypg/util.py
@@ -0,0 +1,42 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+import shlex
+import subprocess
+import sys
+
+
def eprint(*args, **kwargs):
    """Like print(), but writes to stderr."""
    print(*args, **kwargs, file=sys.stderr)
+
+
def run(*command, check=True, shell=None, silent=False, **kwargs):
    """
    Runs the given command with subprocess.run, echoing it to stderr first.

    Args:
        command: the command and its arguments. A single string argument is
            executed through the shell (unless overridden via shell=);
            otherwise each argument is converted to str and executed directly.
        check: raise CalledProcessError on a nonzero exit code (default True)
        shell: force shell or no-shell execution; None selects automatically
        silent: don't echo the command, and discard its stdout
        **kwargs: passed through to subprocess.run

    Returns:
        The subprocess.CompletedProcess for the command.
    """

    if shell is None:
        shell = len(command) == 1 and isinstance(command[0], str)

    if shell:
        command = command[0]
    else:
        command = list(map(str, command))

    if not silent:
        if shell:
            eprint(f"+ {command}")
        else:
            # shlex.join produces a copy-pasteable rendering of the argument
            # list. (This file already requires Python 3.9+ — see the use of
            # str.removesuffix below — so shlex.join, added in 3.8, is fine.)
            eprint(f"+ {shlex.join(command)}")

    if silent:
        kwargs.setdefault("stdout", subprocess.DEVNULL)

    return subprocess.run(command, check=check, shell=shell, **kwargs)
+
+
def capture(command, *args, stdout=subprocess.PIPE, encoding="utf-8", **kwargs):
    """Run a command and return its stdout with a single trailing newline removed."""
    proc = run(command, *args, stdout=stdout, encoding=encoding, **kwargs)
    return proc.stdout.removesuffix("\n")
diff --git a/src/test/pytest/pyt/conftest.py b/src/test/pytest/pyt/conftest.py
new file mode 100644
index 00000000000..dd73917c68c
--- /dev/null
+++ b/src/test/pytest/pyt/conftest.py
@@ -0,0 +1 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
diff --git a/src/test/pytest/pyt/test_errors.py b/src/test/pytest/pyt/test_errors.py
new file mode 100644
index 00000000000..771fe8f76e3
--- /dev/null
+++ b/src/test/pytest/pyt/test_errors.py
@@ -0,0 +1,34 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+"""
+Tests for libpq error types and SQLSTATE-based exception mapping.
+"""
+
+import pytest
+from libpq import LibpqError
+
+
def test_syntax_error(conn):
    """Invalid SQL syntax raises LibpqError with correct SQLSTATE."""
    with pytest.raises(LibpqError) as exc_info:
        conn.sql("SELEC 1")

    err = exc_info.value
    # 42601 is syntax_error; the SQLSTATE class is its first two characters.
    assert err.sqlstate == "42601"
    assert err.sqlstate_class == "42"
    assert "syntax" in str(err).lower()
+
+
def test_unique_violation(conn):
    """Unique violation includes all error fields."""
    conn.sql("CREATE TEMP TABLE test_uv (id int CONSTRAINT test_uv_pk PRIMARY KEY)")
    conn.sql("INSERT INTO test_uv VALUES (1)")

    # Inserting the same key again must fail with unique_violation.
    with pytest.raises(LibpqError) as exc_info:
        conn.sql("INSERT INTO test_uv VALUES (1)")

    err = exc_info.value
    # 23505 is unique_violation; the error should also carry the table,
    # constraint, and detail fields.
    assert err.sqlstate == "23505"
    assert err.table_name == "test_uv"
    assert err.constraint_name == "test_uv_pk"
    assert err.detail == "Key (id)=(1) already exists."
diff --git a/src/test/pytest/pyt/test_libpq.py b/src/test/pytest/pyt/test_libpq.py
new file mode 100644
index 00000000000..4fcf4056f41
--- /dev/null
+++ b/src/test/pytest/pyt/test_libpq.py
@@ -0,0 +1,172 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+import contextlib
+import os
+import socket
+import struct
+import threading
+from typing import Callable
+
+import pytest
+
+from libpq import connstr, LibpqError
+
+
+@pytest.mark.parametrize(
+    "opts, expected",
+    [
+        # No options produce an empty connection string.
+        (dict(), ""),
+        (dict(port=5432), "port=5432"),
+        # Multiple options are space-separated.
+        (dict(port=5432, dbname="postgres"), "port=5432 dbname=postgres"),
+        # Empty values and values containing spaces must be quoted.
+        (dict(host=""), "host=''"),
+        (dict(host=" "), r"host=' '"),
+        # Single quotes are backslash-escaped; quoting itself is only added
+        # when the value is empty or contains spaces.
+        (dict(keyword="'"), r"keyword=\'"),
+        (dict(keyword=" \\' "), r"keyword=' \\\' '"),
+    ],
+)
+def test_connstr(opts, expected):
+    """Tests the escape behavior for connstr()."""
+    assert connstr(opts) == expected
+
+
+def test_must_connect_errors(connect):
+    """Tests that connect() raises LibpqError.
+
+    An unknown connection keyword is rejected by libpq itself, so this test
+    presumably needs no reachable server -- the error comes from option
+    parsing, not from a connection attempt.
+    """
+    with pytest.raises(LibpqError, match="invalid connection option"):
+        connect(some_unknown_keyword="whatever")
+
+
+@pytest.fixture
+def local_server(tmp_path, remaining_timeout):
+    """
+    Opens up a local UNIX socket for mocking a Postgres server on a background
+    thread. See the _Server API for usage.
+
+    This fixture requires AF_UNIX support; dependent tests will be skipped on
+    platforms that don't provide it.
+    """
+
+    try:
+        from socket import AF_UNIX
+    except ImportError:
+        pytest.skip("AF_UNIX not supported on this platform")
+
+    class _Server(contextlib.ExitStack):
+        """
+        Implementation class for local_server. See .background() for the primary
+        entry point for tests. Postgres clients may connect to this server via
+        local_server.host/local_server.port.
+
+        _Server derives from contextlib.ExitStack to provide easy cleanup of
+        associated resources; see the documentation for that class for a full
+        explanation.
+        """
+
+        def __init__(self):
+            super().__init__()
+
+            self.host = tmp_path
+            self.port = 5432
+
+            self._thread = None
+            self._thread_exc = None
+            self._listener = self.enter_context(
+                socket.socket(AF_UNIX, socket.SOCK_STREAM),
+            )
+
+        def bind_and_listen(self):
+            """
+            Does the actual work of binding the UNIX socket using the Postgres
+            server conventions and listening for connections.
+
+            The listen backlog is currently hardcoded to one.
+            """
+            sockfile = self.host / ".s.PGSQL.{}".format(self.port)
+
+            # Lock down the permissions on the new socket.
+            prev_mask = os.umask(0o077)
+
+            # Bind (creating the socket file), and immediately register it for
+            # deletion from disk when the stack is cleaned up.
+            self._listener.bind(bytes(sockfile))
+            self.callback(os.unlink, sockfile)
+
+            os.umask(prev_mask)
+
+            self._listener.listen(1)
+
+        def background(self, fn: Callable[[socket.socket], None]) -> None:
+            """
+            Accepts a client connection on a background thread and passes it to
+            the provided callback. Any exceptions raised from the callback will
+            be re-raised on the main thread during fixture teardown.
+
+            Blocking operations on the connected socket default to using the
+            remaining_timeout(), though this can be changed by the test via the
+            socket's .settimeout().
+            """
+
+            def _bg():
+                try:
+                    self._listener.settimeout(remaining_timeout())
+                    sock, _ = self._listener.accept()
+
+                    with sock:
+                        sock.settimeout(remaining_timeout())
+                        fn(sock)
+
+                except Exception as e:
+                    # Save the exception for re-raising on the main thread.
+                    self._thread_exc = e
+
+            # TODO: rather than using callback(), consider explicitly signaling
+            # the fn() implementation to stop early if we get an exception.
+            # Otherwise we'll hang until the end of the timeout.
+            self._thread = threading.Thread(target=_bg)
+            self.callback(self._join)
+
+            self._thread.start()
+
+        def _join(self):
+            """
+            Waits for the background thread to finish and raises any thrown
+            exception. This is called during fixture teardown.
+            """
+            # Give a little bit of wiggle room on the join timeout, since we're
+            # racing against the test's own use of remaining_timeout(). (It's
+            # preferable to let tests report timeouts; the stack traces will
+            # help with debugging.)
+            self._thread.join(remaining_timeout() + 1)
+            if self._thread.is_alive():
+                raise TimeoutError("background thread is still running after timeout")
+
+            if self._thread_exc is not None:
+                raise self._thread_exc
+
+    with _Server() as s:
+        s.bind_and_listen()
+        yield s
+
+
+def test_connection_is_finished_on_error(connect, local_server):
+    """Tests that PQfinish() gets called at the end of testing."""
+    expected_error = "something is wrong"
+
+    def serve_error(s: socket.socket) -> None:
+        pktlen = struct.unpack("!I", s.recv(4))[0]
+
+        # Quick check for the startup packet version.
+        version = struct.unpack("!HH", s.recv(4))
+        assert version == (3, 0)
+
+        # Discard the remainder of the startup packet and send a v2 error.
+        s.recv(pktlen - 8)
+        s.send(b"E" + expected_error.encode() + b"\0")
+
+        # And now the socket should be closed.
+        assert not s.recv(1), "client sent unexpected data"
+
+    local_server.background(serve_error)
+
+    with pytest.raises(LibpqError, match=expected_error):
+        # Exiting this context should result in PQfinish().
+        connect(host=local_server.host, port=local_server.port)
diff --git a/src/test/pytest/pyt/test_multi_server.py b/src/test/pytest/pyt/test_multi_server.py
new file mode 100644
index 00000000000..8ee045b0cc8
--- /dev/null
+++ b/src/test/pytest/pyt/test_multi_server.py
@@ -0,0 +1,46 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+"""
+Tests demonstrating multi-server functionality using create_pg fixture.
+
+These tests verify that the pytest infrastructure correctly handles
+multiple PostgreSQL server instances within a single test, and that
+module-scoped servers persist across tests.
+"""
+
+import pytest
+
+
+def test_multiple_servers_basic(create_pg):
+    """Test that we can create and connect to multiple servers."""
+    node1 = create_pg("primary")
+    node2 = create_pg("secondary")
+
+    conn1 = node1.connect()
+    conn2 = node2.connect()
+
+    # Each server should have its own data directory
+    datadir1 = conn1.sql("SHOW data_directory")
+    datadir2 = conn2.sql("SHOW data_directory")
+    assert datadir1 != datadir2
+
+    # Each server should be listening on a different port
+    assert node1.port != node2.port
+
+
+@pytest.fixture(scope="module")
+def shared_server(create_pg_module):
+    """A server shared across all tests in this module."""
+    server = create_pg_module("shared")
+    server.sql("CREATE TABLE module_state (value int DEFAULT 0)")
+    return server
+
+
+def test_module_server_create_row(shared_server):
+    """First test: create a row in the shared server."""
+    # NOTE(review): this test and test_module_server_see_row depend on
+    # pytest's default definition-order execution; reordering them or
+    # running the second one in isolation would break the assertion there.
+    shared_server.connect().sql("INSERT INTO module_state VALUES (42)")
+
+
+def test_module_server_see_row(shared_server):
+    """Second test: verify we see the row from the previous test."""
+    assert shared_server.connect().sql("SELECT value FROM module_state") == 42
diff --git a/src/test/pytest/pyt/test_query_helpers.py b/src/test/pytest/pyt/test_query_helpers.py
new file mode 100644
index 00000000000..abcd9084214
--- /dev/null
+++ b/src/test/pytest/pyt/test_query_helpers.py
@@ -0,0 +1,347 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+"""
+Tests for query helper functions with type conversion and result simplification.
+"""
+
+import uuid
+
+import pytest
+
+
+def test_single_cell_int(conn):
+    """Single cell integer query returns just the value."""
+    result = conn.sql("SELECT 1")
+    assert result == 1
+    assert isinstance(result, int)
+
+
+def test_single_cell_string(conn):
+    """Single cell string query returns just the value."""
+    result = conn.sql("SELECT 'hello'")
+    assert result == "hello"
+    assert isinstance(result, str)
+
+
+def test_single_cell_bool(conn):
+    """Single cell boolean query returns just the value."""
+
+    result = conn.sql("SELECT true")
+    assert result is True
+    assert isinstance(result, bool)
+
+    result = conn.sql("SELECT false")
+    assert result is False
+
+
+def test_single_cell_float(conn):
+    """Single cell float query returns just the value."""
+
+    result = conn.sql("SELECT 3.14::float4")
+    assert isinstance(result, float)
+    assert abs(result - 3.14) < 0.01
+
+
+def test_single_cell_null(conn):
+    """Single cell NULL query returns None."""
+
+    result = conn.sql("SELECT NULL")
+    assert result is None
+
+
+def test_single_row_multiple_columns(conn):
+    """Single row with multiple columns returns a tuple."""
+
+    result = conn.sql("SELECT 1, 'hello', true")
+    assert result == (1, "hello", True)
+    assert isinstance(result, tuple)
+
+
+def test_single_column_multiple_rows(conn):
+    """Single column with multiple rows returns a list of values."""
+
+    result = conn.sql("SELECT * FROM generate_series(1, 3)")
+    assert result == [1, 2, 3]
+    assert isinstance(result, list)
+
+
+def test_multiple_rows_and_columns(conn):
+    """Multiple rows and columns returns list of tuples."""
+
+    result = conn.sql("SELECT * FROM (VALUES (1, 'a'), (2, 'b'), (3, 'c')) AS t")
+    assert result == [(1, "a"), (2, "b"), (3, "c")]
+    assert isinstance(result, list)
+    assert all(isinstance(row, tuple) for row in result)
+
+
+def test_empty_result(conn):
+    """Empty result set returns empty list."""
+
+    result = conn.sql("SELECT 1 WHERE false")
+    assert result == []
+
+
+def test_query_error_handling(conn):
+    """Query errors raise RuntimeError with actual error message.
+
+    NOTE(review): this catches RuntimeError rather than LibpqError;
+    presumably LibpqError derives from RuntimeError -- confirm against
+    libpq/errors.py.
+    """
+
+    with pytest.raises(RuntimeError) as exc_info:
+        conn.sql("SELECT * FROM nonexistent_table")
+
+    error_msg = str(exc_info.value)
+    # Accept either phrasing to stay robust against message wording.
+    assert "nonexistent_table" in error_msg or "does not exist" in error_msg
+
+
+def test_division_by_zero_error(conn):
+    """Division by zero raises RuntimeError."""
+
+    with pytest.raises(RuntimeError) as exc_info:
+        conn.sql("SELECT 1/0")
+
+    error_msg = str(exc_info.value)
+    # Compare case-insensitively against the server's message text.
+    assert "division by zero" in error_msg.lower()
+
+
+def test_simple_exec_create_table(conn):
+    """sql for CREATE TABLE returns None."""
+
+    result = conn.sql("CREATE TEMP TABLE test_table (id int, name text)")
+    assert result is None
+
+    # Verify table was created
+    count = conn.sql("SELECT COUNT(*) FROM test_table")
+    assert count == 0
+
+
+def test_simple_exec_insert(conn):
+    """sql for INSERT returns None."""
+
+    conn.sql("CREATE TEMP TABLE test_table (id int, name text)")
+    result = conn.sql("INSERT INTO test_table VALUES (1, 'Alice'), (2, 'Bob')")
+    assert result is None
+
+    # Verify data was inserted
+    count = conn.sql("SELECT COUNT(*) FROM test_table")
+    assert count == 2
+
+
+def test_type_conversion_mixed(conn):
+    """Test mixed type conversion in a single row."""
+
+    result = conn.sql("SELECT 42::int4, 123::int8, 3.14::float8, 'text', true, NULL")
+    assert result == (42, 123, 3.14, "text", True, None)
+    assert isinstance(result[0], int)
+    assert isinstance(result[1], int)
+    assert isinstance(result[2], float)
+    assert isinstance(result[3], str)
+    assert isinstance(result[4], bool)
+    assert result[5] is None
+
+
+def test_multiple_queries_same_connection(conn):
+    """Test running multiple queries on the same connection."""
+
+    result1 = conn.sql("SELECT 1")
+    assert result1 == 1
+
+    result2 = conn.sql("SELECT 'hello', 'world'")
+    assert result2 == ("hello", "world")
+
+    result3 = conn.sql("SELECT * FROM generate_series(1, 5)")
+    assert result3 == [1, 2, 3, 4, 5]
+
+
+def test_date_type(conn):
+    """Test date type conversion."""
+    import datetime
+
+    result = conn.sql("SELECT '2025-10-20'::date")
+    assert result == datetime.date(2025, 10, 20)
+    assert isinstance(result, datetime.date)
+
+
+def test_timestamp_type(conn):
+    """Test timestamp type conversion."""
+    import datetime
+
+    result = conn.sql("SELECT '2025-10-20 15:30:45'::timestamp")
+    assert result == datetime.datetime(2025, 10, 20, 15, 30, 45)
+    assert isinstance(result, datetime.datetime)
+
+
+def test_time_type(conn):
+    """Test time type conversion."""
+    import datetime
+
+    result = conn.sql("SELECT '15:30:45'::time")
+    assert result == datetime.time(15, 30, 45)
+    assert isinstance(result, datetime.time)
+
+
+def test_numeric_type(conn):
+    """Test numeric/decimal type conversion."""
+    import decimal
+
+    result = conn.sql("SELECT 123.456::numeric")
+    assert result == decimal.Decimal("123.456")
+    assert isinstance(result, decimal.Decimal)
+
+
+def test_int_array(conn):
+    """Test integer array type conversion."""
+
+    result = conn.sql("SELECT ARRAY[1, 2, 3, 4, 5]")
+    assert result == [1, 2, 3, 4, 5]
+    assert isinstance(result, list)
+    assert all(isinstance(x, int) for x in result)
+
+
+def test_text_array(conn):
+    """Test text array type conversion."""
+
+    result = conn.sql("SELECT ARRAY['hello', 'world', 'test']")
+    assert result == ["hello", "world", "test"]
+    assert isinstance(result, list)
+    assert all(isinstance(x, str) for x in result)
+
+
+def test_bool_array(conn):
+    """Test boolean array type conversion."""
+
+    result = conn.sql("SELECT ARRAY[true, false, true]")
+    assert result == [True, False, True]
+    assert isinstance(result, list)
+    assert all(isinstance(x, bool) for x in result)
+
+
+def test_empty_array(conn):
+    """Test empty array type conversion."""
+
+    result = conn.sql("SELECT ARRAY[]::int[]")
+    assert result == []
+    assert isinstance(result, list)
+
+
+def test_json_type(conn):
+    """Test JSON type (parsed to dict)."""
+
+    result = conn.sql('SELECT \'{"key": "value"}\'::json')
+    assert isinstance(result, dict)
+    assert result == {"key": "value"}
+
+
+def test_jsonb_type(conn):
+    """Test JSONB type (parsed to dict)."""
+
+    result = conn.sql('SELECT \'{"name": "test", "count": 42}\'::jsonb')
+    assert isinstance(result, dict)
+    assert result == {"name": "test", "count": 42}
+
+
+def test_json_array(conn):
+    """Test JSON array type."""
+
+    result = conn.sql("SELECT '[1, 2, 3, 4, 5]'::json")
+    assert isinstance(result, list)
+    assert result == [1, 2, 3, 4, 5]
+
+
+def test_json_nested(conn):
+    """Test nested JSON object."""
+
+    result = conn.sql(
+        'SELECT \'{"user": {"id": 1, "name": "Alice"}, "active": true}\'::json'
+    )
+    assert isinstance(result, dict)
+    assert result == {"user": {"id": 1, "name": "Alice"}, "active": True}
+
+
+def test_mixed_types_with_arrays(conn):
+    """Test mixed types including arrays in a single row."""
+
+    result = conn.sql("SELECT 42, 'text', ARRAY[1, 2, 3], true")
+    assert result == (42, "text", [1, 2, 3], True)
+    assert isinstance(result[0], int)
+    assert isinstance(result[1], str)
+    assert isinstance(result[2], list)
+    assert isinstance(result[3], bool)
+
+
+def test_uuid_type(conn):
+    """Test UUID type conversion."""
+    test_uuid = "550e8400-e29b-41d4-a716-446655440000"
+    result = conn.sql(f"SELECT '{test_uuid}'::uuid")
+    assert result == uuid.UUID(test_uuid)
+    assert isinstance(result, uuid.UUID)
+
+
+def test_uuid_generation(conn):
+    """Test generated UUID type conversion.
+
+    NOTE(review): assumes the server provides uuidv4(), a recently added
+    built-in -- confirm the minimum server version these tests target.
+    """
+    result = conn.sql("SELECT uuidv4()")
+    assert isinstance(result, uuid.UUID)
+    # Check it's a valid UUID by ensuring it can be converted to string
+    assert len(str(result)) == 36  # UUID string format length
+
+
+def test_text_array_with_commas(conn):
+    """Test text array with elements containing commas."""
+
+    # Commas inside a quoted element must not split it.
+    result = conn.sql("SELECT ARRAY['A,B', 'C', ' D ']")
+    assert result == ["A,B", "C", " D "]
+
+
+def test_text_array_with_quotes(conn):
+    """Test text array with elements containing quotes."""
+
+    result = conn.sql(r"SELECT ARRAY[E'a\"b', 'c']")
+    assert result == ['a"b', "c"]
+
+
+def test_text_array_with_backslash(conn):
+    """Test text array with elements containing backslashes."""
+
+    result = conn.sql(r"SELECT ARRAY[E'a\\b', 'c']")
+    assert result == ["a\\b", "c"]
+
+
+def test_json_array_type(conn):
+    """Test array of JSON values with embedded quotes and commas."""
+
+    result = conn.sql("""SELECT ARRAY['{"abc": 123, "xyz": 456}'::json]""")
+    assert result == [{"abc": 123, "xyz": 456}]
+
+
+def test_json_array_multiple(conn):
+    """Test array of multiple JSON objects."""
+
+    result = conn.sql(
+        """SELECT ARRAY['{"a": 1}'::json, '{"b": 2}'::json, '["x", "y"]'::json]"""
+    )
+    assert result == [{"a": 1}, {"b": 2}, ["x", "y"]]
+
+
+def test_2d_int_array(conn):
+    """Test 2D integer array."""
+
+    result = conn.sql("SELECT ARRAY[[1,2],[3,4]]")
+    assert result == [[1, 2], [3, 4]]
+
+
+def test_2d_text_array(conn):
+    """Test 2D text array."""
+
+    result = conn.sql("SELECT ARRAY[['a','b'],['c','d,e']]")
+    assert result == [["a", "b"], ["c", "d,e"]]
+
+
+def test_3d_int_array(conn):
+    """Test 3D integer array."""
+
+    result = conn.sql("SELECT ARRAY[[[1,2],[3,4]],[[5,6],[7,8]]]")
+    assert result == [[[1, 2], [3, 4]], [[5, 6], [7, 8]]]
+
+
+def test_array_with_null(conn):
+    """Test array with NULL elements."""
+
+    result = conn.sql("SELECT ARRAY[1, NULL, 3]")
+    assert result == [1, None, 3]
-- 
2.52.0

