From a428c5f8e194595cd6a8d8ffca79a9189bc8aa9f Mon Sep 17 00:00:00 2001
From: Andrew Dunstan <andrew@dunslane.net>
Date: Tue, 2 Jun 2026 11:27:38 -0400
Subject: [PATCH v12 1/3] Add PostgreSQL::Test::Session for libpq-based TAP
 test sessions

Introduce a session object for TAP tests that talks to the server
directly through libpq instead of spawning a psql child process for
each background session.  Connecting in-process avoids the overhead and
fragility of driving psql over pipes and gives tests synchronous and
asynchronous query interfaces, notice capture, and pipelined queries.

libpq is reached through an FFI::Platypus wrapper, so no compiled XS
module is required.  The new modules are:

  PostgreSQL::Test::Session  - the libpq session object
  PostgreSQL::PqFFI          - FFI bindings to libpq
  PostgreSQL::PqConstants    - libpq constants
  PostgreSQL::PGTypes        - libpq type OID definitions
  PostgreSQL::FindLib        - locate the libpq shared library

Install the new modules from both the make and meson builds.
---
 src/test/perl/Makefile                   |   10 +
 src/test/perl/PostgreSQL/FindLib.pm      |  164 ++++
 src/test/perl/PostgreSQL/PGTypes.pm      |  356 +++++++
 src/test/perl/PostgreSQL/PqConstants.pm  |  185 ++++
 src/test/perl/PostgreSQL/PqFFI.pm        |  437 +++++++++
 src/test/perl/PostgreSQL/Test/Session.pm | 1075 ++++++++++++++++++++++
 src/test/perl/meson.build                |    5 +
 7 files changed, 2232 insertions(+)
 create mode 100644 src/test/perl/PostgreSQL/FindLib.pm
 create mode 100644 src/test/perl/PostgreSQL/PGTypes.pm
 create mode 100644 src/test/perl/PostgreSQL/PqConstants.pm
 create mode 100644 src/test/perl/PostgreSQL/PqFFI.pm
 create mode 100644 src/test/perl/PostgreSQL/Test/Session.pm

diff --git a/src/test/perl/Makefile b/src/test/perl/Makefile
index fd4fdaf700b..d234cc59608 100644
--- a/src/test/perl/Makefile
+++ b/src/test/perl/Makefile
@@ -25,9 +25,14 @@ install: all installdirs
 	$(INSTALL_DATA) $(srcdir)/PostgreSQL/Test/Kerberos.pm '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/Kerberos.pm'
 	$(INSTALL_DATA) $(srcdir)/PostgreSQL/Test/Cluster.pm '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/Cluster.pm'
 	$(INSTALL_DATA) $(srcdir)/PostgreSQL/Test/BackgroundPsql.pm '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/BackgroundPsql.pm'
+	$(INSTALL_DATA) $(srcdir)/PostgreSQL/Test/Session.pm '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/Session.pm'
 	$(INSTALL_DATA) $(srcdir)/PostgreSQL/Test/AdjustDump.pm '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/AdjustDump.pm'
 	$(INSTALL_DATA) $(srcdir)/PostgreSQL/Test/AdjustUpgrade.pm '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/AdjustUpgrade.pm'
 	$(INSTALL_DATA) $(srcdir)/PostgreSQL/Version.pm '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Version.pm'
+	$(INSTALL_DATA) $(srcdir)/PostgreSQL/FindLib.pm '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/FindLib.pm'
+	$(INSTALL_DATA) $(srcdir)/PostgreSQL/PqFFI.pm '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/PqFFI.pm'
+	$(INSTALL_DATA) $(srcdir)/PostgreSQL/PqConstants.pm '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/PqConstants.pm'
+	$(INSTALL_DATA) $(srcdir)/PostgreSQL/PGTypes.pm '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/PGTypes.pm'
 
 uninstall:
 	rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/Utils.pm'
@@ -36,8 +41,13 @@ uninstall:
 	rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/Kerberos.pm'
 	rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/Cluster.pm'
 	rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/BackgroundPsql.pm'
+	rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/Session.pm'
 	rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/AdjustDump.pm'
 	rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/AdjustUpgrade.pm'
 	rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Version.pm'
+	rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/FindLib.pm'
+	rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/PqFFI.pm'
+	rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/PqConstants.pm'
+	rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/PGTypes.pm'
 
 endif
diff --git a/src/test/perl/PostgreSQL/FindLib.pm b/src/test/perl/PostgreSQL/FindLib.pm
new file mode 100644
index 00000000000..b4290c84c4f
--- /dev/null
+++ b/src/test/perl/PostgreSQL/FindLib.pm
@@ -0,0 +1,164 @@
+
+# Copyright (c) 2021-2026, PostgreSQL Global Development Group
+
+=pod
+
+=head1 NAME
+
+PostgreSQL::FindLib - find shared libraries for PostgreSQL TAP tests
+
+=head1 SYNOPSIS
+
+  use PostgreSQL::FindLib;
+
+  my $libpath = find_lib_or_die(
+      lib     => 'pq',
+      libpath => ['/usr/local/pgsql/lib'],
+  );
+
+=head1 DESCRIPTION
+
+This module provides a simple mechanism to locate shared libraries,
+used as a lightweight replacement for C<FFI::CheckLib>. It searches
+for libraries in specified paths and common system locations.
+
+=head1 EXPORTED FUNCTIONS
+
+=over
+
+=item find_lib_or_die(%args)
+
+Searches for a shared library and returns its full path. Dies if the
+library cannot be found.
+
+Arguments:
+
+=over
+
+=item lib => $name
+
+Required. The library name without prefix or suffix (e.g., C<'pq'> for
+C<libpq.so>).
+
+=item libpath => \@paths
+
+Optional. Array of directories to search first.
+
+=item systempath => \@paths
+
+Optional. If set to an empty array C<[]>, system paths will not be searched.
+
+=back
+
+=back
+
+=cut
+
+package PostgreSQL::FindLib;
+
+use strict;
+use warnings FATAL => qw(all);
+
+use Exporter qw(import);
+use File::Spec;
+use Config;
+
+our @EXPORT = qw(find_lib_or_die);
+
+sub find_lib_or_die
+{
+	my %args = @_;
+
+	my $libname = $args{lib} or die "find_lib_or_die: 'lib' argument required";
+	my $libpath = $args{libpath} // [];
+	my $systempath = $args{systempath};
+
+	my @search_paths = @$libpath;
+
+	# Add system paths unless explicitly disabled
+	unless (defined $systempath && ref($systempath) eq 'ARRAY' && @$systempath == 0)
+	{
+		push @search_paths, _get_system_lib_paths();
+	}
+
+	# Determine library file patterns based on OS
+	my @patterns = _get_lib_patterns($libname);
+
+	for my $dir (@search_paths)
+	{
+		next unless -d $dir;
+
+		for my $pattern (@patterns)
+		{
+			my @matches = glob(File::Spec->catfile($dir, $pattern));
+			for my $match (@matches)
+			{
+				return $match if -f $match && -r $match;
+			}
+		}
+	}
+
+	die "find_lib_or_die: unable to find lib$libname in: " . join(", ", @search_paths);
+}
+
+sub _get_lib_patterns
+{
+	my $libname = shift;
+
+	if ($^O eq 'darwin')
+	{
+		return ("lib$libname.dylib", "lib$libname.*.dylib");
+	}
+	elsif ($^O eq 'MSWin32' || $^O eq 'cygwin')
+	{
+		return ("$libname.dll", "lib$libname.dll");
+	}
+	else
+	{
+		# Linux and other Unix-like systems
+		return ("lib$libname.so", "lib$libname.so.*");
+	}
+}
+
+sub _get_system_lib_paths
+{
+	my @paths;
+
+	# Common system library paths
+	push @paths, '/usr/lib', '/usr/local/lib', '/lib';
+
+	# Add architecture-specific paths on Linux
+	if ($^O eq 'linux')
+	{
+		push @paths, '/usr/lib/x86_64-linux-gnu', '/usr/lib/aarch64-linux-gnu';
+		push @paths, '/usr/lib64', '/lib64';
+	}
+
+	# Add paths from LD_LIBRARY_PATH
+	if ($ENV{LD_LIBRARY_PATH})
+	{
+		push @paths, split(/:/, $ENV{LD_LIBRARY_PATH});
+	}
+
+	# macOS specific
+	if ($^O eq 'darwin')
+	{
+		push @paths, '/opt/homebrew/lib', '/usr/local/opt/libpq/lib';
+		if ($ENV{DYLD_LIBRARY_PATH})
+		{
+			push @paths, split(/:/, $ENV{DYLD_LIBRARY_PATH});
+		}
+	}
+
+	return @paths;
+}
+
+=pod
+
+=head1 SEE ALSO
+
+L<PostgreSQL::PqFFI>
+
+=cut
+
+1;
diff --git a/src/test/perl/PostgreSQL/PGTypes.pm b/src/test/perl/PostgreSQL/PGTypes.pm
new file mode 100644
index 00000000000..2bc007c323a
--- /dev/null
+++ b/src/test/perl/PostgreSQL/PGTypes.pm
@@ -0,0 +1,356 @@
+
+# Copyright (c) 2021-2026, PostgreSQL Global Development Group
+
+=pod
+
+=head1 NAME
+
+PostgreSQL::PGTypes - PostgreSQL backend type OID constants
+
+=head1 SYNOPSIS
+
+  use PostgreSQL::PGTypes;
+
+  if ($type_oid == TEXTOID) { ... }
+
+  if ($type_oid == INT4ARRAYOID) { ... }
+
+=head1 DESCRIPTION
+
+This module provides constants for PostgreSQL backend type OIDs, as defined
+in C<src/include/catalog/pg_type_d.h>. These can be used to identify column
+types in query results via C<PQftype()>.
+
+All constants are exported by default.
+
+=head1 EXPORTED CONSTANTS
+
+=head2 Basic Types
+
+C<BOOLOID>, C<BYTEAOID>, C<CHAROID>, C<NAMEOID>, C<INT8OID>, C<INT2OID>,
+C<INT2VECTOROID>, C<INT4OID>, C<TEXTOID>, C<OIDOID>, C<TIDOID>, C<XIDOID>,
+C<CIDOID>, C<OIDVECTOROID>, C<JSONOID>, C<XMLOID>, C<XID8OID>, C<POINTOID>,
+C<LSEGOID>, C<PATHOID>, C<BOXOID>, C<POLYGONOID>, C<LINEOID>, C<FLOAT4OID>,
+C<FLOAT8OID>, C<UNKNOWNOID>, C<CIRCLEOID>, C<MONEYOID>, C<MACADDROID>,
+C<INETOID>, C<CIDROID>, C<MACADDR8OID>, C<ACLITEMOID>, C<BPCHAROID>,
+C<VARCHAROID>, C<DATEOID>, C<TIMEOID>, C<TIMESTAMPOID>, C<TIMESTAMPTZOID>,
+C<INTERVALOID>, C<TIMETZOID>, C<BITOID>, C<VARBITOID>, C<NUMERICOID>,
+C<REFCURSOROID>, C<UUIDOID>, C<TSVECTOROID>, C<GTSVECTOROID>, C<TSQUERYOID>,
+C<JSONBOID>, C<JSONPATHOID>, C<TXID_SNAPSHOTOID>
+
+=head2 Range Types
+
+C<INT4RANGEOID>, C<NUMRANGEOID>, C<TSRANGEOID>, C<TSTZRANGEOID>,
+C<DATERANGEOID>, C<INT8RANGEOID>
+
+=head2 Multirange Types
+
+C<INT4MULTIRANGEOID>, C<NUMMULTIRANGEOID>, C<TSMULTIRANGEOID>,
+C<TSTZMULTIRANGEOID>, C<DATEMULTIRANGEOID>, C<INT8MULTIRANGEOID>
+
+=head2 Pseudo Types
+
+C<RECORDOID>, C<RECORDARRAYOID>, C<CSTRINGOID>, C<VOIDOID>, C<TRIGGEROID>,
+C<EVENT_TRIGGEROID>
+
+=head2 Array Types
+
+Array type OIDs follow the pattern C<{BASENAME}ARRAYOID>, e.g., C<TEXTARRAYOID>,
+C<INT4ARRAYOID>, C<JSONBARRAYOID>.
+
+=cut
+
+package PostgreSQL::PGTypes;
+
+use strict;
+use warnings FATAL => qw(all);
+
+use Exporter qw(import);
+
+our @EXPORT = qw(
+
+  BOOLOID
+  BYTEAOID
+  CHAROID
+  NAMEOID
+  INT8OID
+  INT2OID
+  INT2VECTOROID
+  INT4OID
+  TEXTOID
+  OIDOID
+  TIDOID
+  XIDOID
+  CIDOID
+  OIDVECTOROID
+  JSONOID
+  XMLOID
+  XID8OID
+  POINTOID
+  LSEGOID
+  PATHOID
+  BOXOID
+  POLYGONOID
+  LINEOID
+  FLOAT4OID
+  FLOAT8OID
+  UNKNOWNOID
+  CIRCLEOID
+  MONEYOID
+  MACADDROID
+  INETOID
+  CIDROID
+  MACADDR8OID
+  ACLITEMOID
+  BPCHAROID
+  VARCHAROID
+  DATEOID
+  TIMEOID
+  TIMESTAMPOID
+  TIMESTAMPTZOID
+  INTERVALOID
+  TIMETZOID
+  BITOID
+  VARBITOID
+  NUMERICOID
+  REFCURSOROID
+  UUIDOID
+  TSVECTOROID
+  GTSVECTOROID
+  TSQUERYOID
+  JSONBOID
+  JSONPATHOID
+  TXID_SNAPSHOTOID
+  INT4RANGEOID
+  NUMRANGEOID
+  TSRANGEOID
+  TSTZRANGEOID
+  DATERANGEOID
+  INT8RANGEOID
+  INT4MULTIRANGEOID
+  NUMMULTIRANGEOID
+  TSMULTIRANGEOID
+  TSTZMULTIRANGEOID
+  DATEMULTIRANGEOID
+  INT8MULTIRANGEOID
+  RECORDOID
+  RECORDARRAYOID
+  CSTRINGOID
+  VOIDOID
+  TRIGGEROID
+  EVENT_TRIGGEROID
+
+  BOOLARRAYOID
+  BYTEAARRAYOID
+  CHARARRAYOID
+  NAMEARRAYOID
+  INT8ARRAYOID
+  INT2ARRAYOID
+  INT2VECTORARRAYOID
+  INT4ARRAYOID
+  TEXTARRAYOID
+  OIDARRAYOID
+  TIDARRAYOID
+  XIDARRAYOID
+  CIDARRAYOID
+  OIDVECTORARRAYOID
+  JSONARRAYOID
+  XMLARRAYOID
+  XID8ARRAYOID
+  POINTARRAYOID
+  LSEGARRAYOID
+  PATHARRAYOID
+  BOXARRAYOID
+  POLYGONARRAYOID
+  LINEARRAYOID
+  FLOAT4ARRAYOID
+  FLOAT8ARRAYOID
+  CIRCLEARRAYOID
+  MONEYARRAYOID
+  MACADDRARRAYOID
+  INETARRAYOID
+  CIDRARRAYOID
+  MACADDR8ARRAYOID
+  ACLITEMARRAYOID
+  BPCHARARRAYOID
+  VARCHARARRAYOID
+  DATEARRAYOID
+  TIMEARRAYOID
+  TIMESTAMPARRAYOID
+  TIMESTAMPTZARRAYOID
+  INTERVALARRAYOID
+  TIMETZARRAYOID
+  BITARRAYOID
+  VARBITARRAYOID
+  NUMERICARRAYOID
+  REFCURSORARRAYOID
+  UUIDARRAYOID
+  TSVECTORARRAYOID
+  GTSVECTORARRAYOID
+  TSQUERYARRAYOID
+  JSONBARRAYOID
+  JSONPATHARRAYOID
+  TXID_SNAPSHOTARRAYOID
+  INT4RANGEARRAYOID
+  NUMRANGEARRAYOID
+  TSRANGEARRAYOID
+  TSTZRANGEARRAYOID
+  DATERANGEARRAYOID
+  INT8RANGEARRAYOID
+  INT4MULTIRANGEARRAYOID
+  NUMMULTIRANGEARRAYOID
+  TSMULTIRANGEARRAYOID
+  TSTZMULTIRANGEARRAYOID
+  DATEMULTIRANGEARRAYOID
+  INT8MULTIRANGEARRAYOID
+  CSTRINGARRAYOID
+
+);
+
+use constant {
+	BOOLOID                 => 16,
+	BYTEAOID                => 17,
+	CHAROID                 => 18,
+	NAMEOID                 => 19,
+	INT8OID                 => 20,
+	INT2OID                 => 21,
+	INT2VECTOROID           => 22,
+	INT4OID                 => 23,
+	TEXTOID                 => 25,
+	OIDOID                  => 26,
+	TIDOID                  => 27,
+	XIDOID                  => 28,
+	CIDOID                  => 29,
+	OIDVECTOROID            => 30,
+	JSONOID                 => 114,
+	XMLOID                  => 142,
+	XID8OID                 => 5069,
+	POINTOID                => 600,
+	LSEGOID                 => 601,
+	PATHOID                 => 602,
+	BOXOID                  => 603,
+	POLYGONOID              => 604,
+	LINEOID                 => 628,
+	FLOAT4OID               => 700,
+	FLOAT8OID               => 701,
+	UNKNOWNOID              => 705,
+	CIRCLEOID               => 718,
+	MONEYOID                => 790,
+	MACADDROID              => 829,
+	INETOID                 => 869,
+	CIDROID                 => 650,
+	MACADDR8OID             => 774,
+	ACLITEMOID              => 1033,
+	BPCHAROID               => 1042,
+	VARCHAROID              => 1043,
+	DATEOID                 => 1082,
+	TIMEOID                 => 1083,
+	TIMESTAMPOID            => 1114,
+	TIMESTAMPTZOID          => 1184,
+	INTERVALOID             => 1186,
+	TIMETZOID               => 1266,
+	BITOID                  => 1560,
+	VARBITOID               => 1562,
+	NUMERICOID              => 1700,
+	REFCURSOROID            => 1790,
+	UUIDOID                 => 2950,
+	TSVECTOROID             => 3614,
+	GTSVECTOROID            => 3642,
+	TSQUERYOID              => 3615,
+	JSONBOID                => 3802,
+	JSONPATHOID             => 4072,
+	TXID_SNAPSHOTOID        => 2970,
+	INT4RANGEOID            => 3904,
+	NUMRANGEOID             => 3906,
+	TSRANGEOID              => 3908,
+	TSTZRANGEOID            => 3910,
+	DATERANGEOID            => 3912,
+	INT8RANGEOID            => 3926,
+	INT4MULTIRANGEOID       => 4451,
+	NUMMULTIRANGEOID        => 4532,
+	TSMULTIRANGEOID         => 4533,
+	TSTZMULTIRANGEOID       => 4534,
+	DATEMULTIRANGEOID       => 4535,
+	INT8MULTIRANGEOID       => 4536,
+	RECORDOID               => 2249,
+	RECORDARRAYOID          => 2287,
+	CSTRINGOID              => 2275,
+	VOIDOID                 => 2278,
+	TRIGGEROID              => 2279,
+	EVENT_TRIGGEROID        => 3838,
+
+	BOOLARRAYOID            => 1000,
+	BYTEAARRAYOID           => 1001,
+	CHARARRAYOID            => 1002,
+	NAMEARRAYOID            => 1003,
+	INT8ARRAYOID            => 1016,
+	INT2ARRAYOID            => 1005,
+	INT2VECTORARRAYOID      => 1006,
+	INT4ARRAYOID            => 1007,
+	TEXTARRAYOID            => 1009,
+	OIDARRAYOID             => 1028,
+	TIDARRAYOID             => 1010,
+	XIDARRAYOID             => 1011,
+	CIDARRAYOID             => 1012,
+	OIDVECTORARRAYOID       => 1013,
+	JSONARRAYOID            => 199,
+	XMLARRAYOID             => 143,
+	XID8ARRAYOID            => 271,
+	POINTARRAYOID           => 1017,
+	LSEGARRAYOID            => 1018,
+	PATHARRAYOID            => 1019,
+	BOXARRAYOID             => 1020,
+	POLYGONARRAYOID         => 1027,
+	LINEARRAYOID            => 629,
+	FLOAT4ARRAYOID          => 1021,
+	FLOAT8ARRAYOID          => 1022,
+	CIRCLEARRAYOID          => 719,
+	MONEYARRAYOID           => 791,
+	MACADDRARRAYOID         => 1040,
+	INETARRAYOID            => 1041,
+	CIDRARRAYOID            => 651,
+	MACADDR8ARRAYOID        => 775,
+	ACLITEMARRAYOID         => 1034,
+	BPCHARARRAYOID          => 1014,
+	VARCHARARRAYOID         => 1015,
+	DATEARRAYOID            => 1182,
+	TIMEARRAYOID            => 1183,
+	TIMESTAMPARRAYOID       => 1115,
+	TIMESTAMPTZARRAYOID     => 1185,
+	INTERVALARRAYOID        => 1187,
+	TIMETZARRAYOID          => 1270,
+	BITARRAYOID             => 1561,
+	VARBITARRAYOID          => 1563,
+	NUMERICARRAYOID         => 1231,
+	REFCURSORARRAYOID       => 2201,
+	UUIDARRAYOID            => 2951,
+	TSVECTORARRAYOID        => 3643,
+	GTSVECTORARRAYOID       => 3644,
+	TSQUERYARRAYOID         => 3645,
+	JSONBARRAYOID           => 3807,
+	JSONPATHARRAYOID        => 4073,
+	TXID_SNAPSHOTARRAYOID   => 2949,
+	INT4RANGEARRAYOID       => 3905,
+	NUMRANGEARRAYOID        => 3907,
+	TSRANGEARRAYOID         => 3909,
+	TSTZRANGEARRAYOID       => 3911,
+	DATERANGEARRAYOID       => 3913,
+	INT8RANGEARRAYOID       => 3927,
+	INT4MULTIRANGEARRAYOID  => 6150,
+	NUMMULTIRANGEARRAYOID   => 6151,
+	TSMULTIRANGEARRAYOID    => 6152,
+	TSTZMULTIRANGEARRAYOID  => 6153,
+	DATEMULTIRANGEARRAYOID  => 6155,
+	INT8MULTIRANGEARRAYOID  => 6157,
+	CSTRINGARRAYOID         => 1263,
+};
+
+=pod
+
+=head1 SEE ALSO
+
+L<PostgreSQL::PqFFI>, L<PostgreSQL::Test::Session>
+
+=cut
+
+1;
diff --git a/src/test/perl/PostgreSQL/PqConstants.pm b/src/test/perl/PostgreSQL/PqConstants.pm
new file mode 100644
index 00000000000..f81913b580a
--- /dev/null
+++ b/src/test/perl/PostgreSQL/PqConstants.pm
@@ -0,0 +1,185 @@
+
+# Copyright (c) 2021-2026, PostgreSQL Global Development Group
+
+=pod
+
+=head1 NAME
+
+PostgreSQL::PqConstants - libpq constants for PostgreSQL TAP tests
+
+=head1 SYNOPSIS
+
+  use PostgreSQL::PqConstants;
+
+  if (PQstatus($conn) == CONNECTION_OK) { ... }
+
+  if (PQresultStatus($result) == PGRES_TUPLES_OK) { ... }
+
+=head1 DESCRIPTION
+
+This module provides libpq constants used by the FFI backend
+(C<PostgreSQL::PqFFI>).  All constants are exported by default.
+
+=head1 EXPORTED CONSTANTS
+
+=head2 Connection Status (ConnStatusType)
+
+C<CONNECTION_OK>, C<CONNECTION_BAD>, C<CONNECTION_STARTED>,
+C<CONNECTION_MADE>, C<CONNECTION_AWAITING_RESPONSE>, C<CONNECTION_AUTH_OK>,
+C<CONNECTION_SETENV>, C<CONNECTION_SSL_STARTUP>, C<CONNECTION_NEEDED>,
+C<CONNECTION_CHECK_WRITABLE>, C<CONNECTION_CONSUME>, C<CONNECTION_GSS_STARTUP>,
+C<CONNECTION_CHECK_TARGET>, C<CONNECTION_CHECK_STANDBY>, C<CONNECTION_ALLOCATED>
+
+=head2 Execution Status (ExecStatusType)
+
+C<PGRES_EMPTY_QUERY>, C<PGRES_COMMAND_OK>, C<PGRES_TUPLES_OK>,
+C<PGRES_COPY_OUT>, C<PGRES_COPY_IN>, C<PGRES_BAD_RESPONSE>,
+C<PGRES_NONFATAL_ERROR>, C<PGRES_FATAL_ERROR>, C<PGRES_COPY_BOTH>,
+C<PGRES_SINGLE_TUPLE>, C<PGRES_PIPELINE_SYNC>, C<PGRES_PIPELINE_ABORTED>,
+C<PGRES_TUPLES_CHUNK>
+
+=head2 Polling Status (PostgresPollingStatusType)
+
+C<PGRES_POLLING_FAILED>, C<PGRES_POLLING_READING>, C<PGRES_POLLING_WRITING>,
+C<PGRES_POLLING_OK>, C<PGRES_POLLING_ACTIVE>
+
+=head2 Ping Status (PGPing)
+
+C<PQPING_OK>, C<PQPING_REJECT>, C<PQPING_NO_RESPONSE>, C<PQPING_NO_ATTEMPT>
+
+=head2 Transaction Status (PGTransactionStatusType)
+
+C<PQTRANS_IDLE>, C<PQTRANS_ACTIVE>, C<PQTRANS_INTRANS>, C<PQTRANS_INERROR>,
+C<PQTRANS_UNKNOWN>
+
+=cut
+
+package PostgreSQL::PqConstants;
+
+use strict;
+use warnings FATAL => qw(all);
+
+use Exporter qw(import);
+
+our @EXPORT = qw(
+
+  CONNECTION_OK
+  CONNECTION_BAD
+  CONNECTION_STARTED
+  CONNECTION_MADE
+  CONNECTION_AWAITING_RESPONSE
+  CONNECTION_AUTH_OK
+  CONNECTION_SETENV
+  CONNECTION_SSL_STARTUP
+  CONNECTION_NEEDED
+  CONNECTION_CHECK_WRITABLE
+  CONNECTION_CONSUME
+  CONNECTION_GSS_STARTUP
+  CONNECTION_CHECK_TARGET
+  CONNECTION_CHECK_STANDBY
+  CONNECTION_ALLOCATED
+
+  PGRES_EMPTY_QUERY
+  PGRES_COMMAND_OK
+  PGRES_TUPLES_OK
+  PGRES_COPY_OUT
+  PGRES_COPY_IN
+  PGRES_BAD_RESPONSE
+  PGRES_NONFATAL_ERROR
+  PGRES_FATAL_ERROR
+  PGRES_COPY_BOTH
+  PGRES_SINGLE_TUPLE
+  PGRES_PIPELINE_SYNC
+  PGRES_PIPELINE_ABORTED
+  PGRES_TUPLES_CHUNK
+
+  PGRES_POLLING_FAILED
+  PGRES_POLLING_READING
+  PGRES_POLLING_WRITING
+  PGRES_POLLING_OK
+  PGRES_POLLING_ACTIVE
+
+  PQPING_OK
+  PQPING_REJECT
+  PQPING_NO_RESPONSE
+  PQPING_NO_ATTEMPT
+
+  PQTRANS_IDLE
+  PQTRANS_ACTIVE
+  PQTRANS_INTRANS
+  PQTRANS_INERROR
+  PQTRANS_UNKNOWN
+
+);
+
+# ConnStatusType:
+use constant {
+	CONNECTION_OK                 => 0,
+	CONNECTION_BAD                => 1,
+	CONNECTION_STARTED            => 2,
+	CONNECTION_MADE               => 3,
+	CONNECTION_AWAITING_RESPONSE  => 4,
+	CONNECTION_AUTH_OK            => 5,
+	CONNECTION_SETENV             => 6,
+	CONNECTION_SSL_STARTUP        => 7,
+	CONNECTION_NEEDED             => 8,
+	CONNECTION_CHECK_WRITABLE     => 9,
+	CONNECTION_CONSUME            => 10,
+	CONNECTION_GSS_STARTUP        => 11,
+	CONNECTION_CHECK_TARGET       => 12,
+	CONNECTION_CHECK_STANDBY      => 13,
+	CONNECTION_ALLOCATED          => 14,
+};
+
+# ExecStatusType:
+use constant {
+	PGRES_EMPTY_QUERY     => 0,
+	PGRES_COMMAND_OK      => 1,
+	PGRES_TUPLES_OK       => 2,
+	PGRES_COPY_OUT        => 3,
+	PGRES_COPY_IN         => 4,
+	PGRES_BAD_RESPONSE    => 5,
+	PGRES_NONFATAL_ERROR  => 6,
+	PGRES_FATAL_ERROR     => 7,
+	PGRES_COPY_BOTH       => 8,
+	PGRES_SINGLE_TUPLE    => 9,
+	PGRES_PIPELINE_SYNC   => 10,
+	PGRES_PIPELINE_ABORTED => 11,
+	PGRES_TUPLES_CHUNK    => 12,
+};
+
+# PostgresPollingStatusType:
+use constant {
+	PGRES_POLLING_FAILED  => 0,
+	PGRES_POLLING_READING => 1,
+	PGRES_POLLING_WRITING => 2,
+	PGRES_POLLING_OK      => 3,
+	PGRES_POLLING_ACTIVE  => 4,
+};
+
+# PGPing:
+use constant {
+	PQPING_OK          => 0,
+	PQPING_REJECT      => 1,
+	PQPING_NO_RESPONSE => 2,
+	PQPING_NO_ATTEMPT  => 3,
+};
+
+# PGTransactionStatusType:
+use constant {
+	PQTRANS_IDLE    => 0,
+	PQTRANS_ACTIVE  => 1,
+	PQTRANS_INTRANS => 2,
+	PQTRANS_INERROR => 3,
+	PQTRANS_UNKNOWN => 4,
+};
+
+=pod
+
+=head1 SEE ALSO
+
+L<PostgreSQL::PqFFI>, L<PostgreSQL::Test::Session>
+
+=cut
+
+1;
diff --git a/src/test/perl/PostgreSQL/PqFFI.pm b/src/test/perl/PostgreSQL/PqFFI.pm
new file mode 100644
index 00000000000..f16470c46d9
--- /dev/null
+++ b/src/test/perl/PostgreSQL/PqFFI.pm
@@ -0,0 +1,437 @@
+
+# Copyright (c) 2021-2026, PostgreSQL Global Development Group
+
+=pod
+
+=head1 NAME
+
+PostgreSQL::PqFFI - FFI wrapper for libpq
+
+=head1 SYNOPSIS
+
+  use PostgreSQL::PqFFI;
+
+  # Initialize the FFI bindings (required before use)
+  PostgreSQL::PqFFI::setup($libdir);
+
+  # Connect to database
+  my $conn = PQconnectdb("dbname=postgres");
+  die PQerrorMessage($conn) unless PQstatus($conn) == CONNECTION_OK;
+
+  # Execute query
+  my $result = PQexec($conn, "SELECT 1");
+  if (PQresultStatus($result) == PGRES_TUPLES_OK) {
+      print PQgetvalue($result, 0, 0), "\n";
+  }
+  PQclear($result);
+
+  PQfinish($conn);
+
+=head1 DESCRIPTION
+
+This module provides Perl bindings to libpq using L<FFI::Platypus>.
+It is the backend used by L<PostgreSQL::Test::Session>.
+
+The module must be initialized by calling C<setup()> before any libpq
+functions can be used.
+
+=head1 FUNCTIONS
+
+=head2 setup($libdir [, $use_system_path])
+
+Initialize the FFI bindings. C<$libdir> specifies where to find libpq.
+If C<$use_system_path> is false, only C<$libdir> is searched.
+
+=head2 libpq Functions
+
+All standard libpq functions are exported. See the PostgreSQL libpq
+documentation for details. Commonly used functions include:
+
+B<Connection:> C<PQconnectdb>, C<PQconnectStart>, C<PQconnectPoll>,
+C<PQfinish>, C<PQstatus>, C<PQerrorMessage>
+
+B<Execution:> C<PQexec>, C<PQexecParams>, C<PQprepare>, C<PQexecPrepared>,
+C<PQsendQuery>, C<PQgetResult>, C<PQclear>
+
+B<Results:> C<PQresultStatus>, C<PQntuples>, C<PQnfields>, C<PQfname>,
+C<PQftype>, C<PQgetvalue>, C<PQgetisnull>
+
+B<Pipeline:> C<PQenterPipelineMode>, C<PQexitPipelineMode>,
+C<PQpipelineSync>, C<PQsendQueryParams>
+
+B<Notifications:> C<PQnotifies>, C<PQnotify_channel>, C<PQnotify_payload>,
+C<PQnotify_be_pid>, C<PQnotify_free>
+
+B<Notice Processing:> C<PQsetNoticeProcessor>, C<create_notice_processor>
+
+=head2 create_notice_processor($callback)
+
+Creates a notice processor closure that can be passed to
+C<PQsetNoticeProcessor>. The callback receives C<($arg, $message)>.
+The caller must keep a reference to the returned closure to prevent
+garbage collection.
+
+=head1 EXPORTED CONSTANTS
+
+This module re-exports all constants from L<PostgreSQL::PqConstants>
+and L<PostgreSQL::PGTypes>.
+
+=cut
+
+package PostgreSQL::PqFFI;
+
+use strict;
+use warnings FATAL => qw(all);
+
+use FFI::Platypus;
+use FFI::Platypus::Record;
+use PostgreSQL::FindLib;
+
+# PGnotify struct for notification support
+# typedef struct pgNotify {
+#     char *relname;    /* notification channel name */
+#     int   be_pid;     /* process ID of notifying server process */
+#     char *extra;      /* notification payload string */
+# } PGnotify;
+package PGnotify {
+	use FFI::Platypus::Record;
+	record_layout_1(
+		'opaque' => 'relname',
+		'sint32' => 'be_pid',
+		'opaque' => 'extra',
+	);
+}
+package PostgreSQL::PqFFI;
+use PostgreSQL::PqConstants;
+use PostgreSQL::PGTypes;
+
+use Exporter qw(import);
+
+our @EXPORT = (
+  @PostgreSQL::PqConstants::EXPORT,
+  @PostgreSQL::PGTypes::EXPORT,
+);
+
+
+
+my @procs = qw(
+
+  PQnotifies
+  PQfreemem
+  PQnotify_channel
+  PQnotify_payload
+  PQnotify_be_pid
+  PQnotify_free
+
+  PQconnectdb
+  PQconnectdbParams
+  PQsetdbLogin
+  PQfinish
+  PQreset
+  PQdb
+  PQuser
+  PQpass
+  PQhost
+  PQhostaddr
+  PQport
+  PQtty
+  PQoptions
+  PQstatus
+  PQtransactionStatus
+  PQparameterStatus
+  PQping
+  PQpingParams
+
+  PQexec
+  PQexecParams
+  PQprepare
+  PQexecPrepared
+
+  PQdescribePrepared
+  PQdescribePortal
+
+  PQclosePrepared
+  PQclosePortal
+  PQclear
+
+  PQsendQuery
+  PQgetResult
+  PQisBusy
+  PQconsumeInput
+
+  PQprotocolVersion
+  PQserverVersion
+  PQerrorMessage
+  PQsocket
+  PQsocketPoll
+  PQgetCurrentTimeUSec
+  PQbackendPID
+  PQconnectionNeedsPassword
+  PQconnectionUsedPassword
+  PQconnectionUsedGSSAPI
+  PQclientEncoding
+  PQsetClientEncoding
+
+  PQresultStatus
+  PQresStatus
+  PQresultErrorMessage
+  PQresultErrorField
+  PQntuples
+  PQnfields
+  PQbinaryTuples
+  PQfname
+  PQfnumber
+  PQftable
+  PQftablecol
+  PQfformat
+  PQftype
+  PQfsize
+  PQfmod
+  PQcmdStatus
+  PQoidValue
+  PQcmdTuples
+  PQgetvalue
+  PQgetlength
+  PQgetisnull
+  PQnparams
+  PQparamtype
+  PQchangePassword
+
+  PQpipelineStatus
+  PQenterPipelineMode
+  PQexitPipelineMode
+  PQpipelineSync
+  PQsendFlushRequest
+  PQsendPipelineSync
+  PQsendQueryParams
+
+  PQsetnonblocking
+  PQisnonblocking
+  PQflush
+
+  PQconnectStart
+  PQconnectStartParams
+  PQconnectPoll
+
+  PQsetNoticeProcessor
+  create_notice_processor
+
+);
+
+push(@EXPORT, @procs);
+
+sub setup
+{
+	my $libdir = shift;
+	my $use_system_path = shift;
+
+	my $ffi = FFI::Platypus->new(api => 1);
+
+	my @system_path;
+	@system_path = (systempath => []) unless $use_system_path;
+
+	$ffi->type('opaque' => 'PGconn');
+	$ffi->type('opaque' => 'PGresult');
+	$ffi->type('uint32' => 'Oid');
+	$ffi->type('int' => 'ExecStatusType');
+
+	# Register the PGnotify record type for struct access
+	$ffi->type('record(PGnotify)' => 'PGnotify_record');
+
+	my $lib = find_lib_or_die(
+		lib => 'pq',
+		libpath => [$libdir],
+	    @system_path,
+	   );
+	$ffi->lib($lib);
+
+	$ffi->attach('PQconnectdb' => ['string'] => 'PGconn');
+	$ffi->attach(
+		'PQconnectdbParams' => [ 'string[]', 'string[]', 'int' ] => 'PGconn');
+	$ffi->attach(
+		'PQsetdbLogin' => [
+			'string', 'string', 'string', 'string',
+			'string', 'string', 'string',
+		] => 'PGconn');
+	$ffi->attach('PQfinish' => ['PGconn'] => 'void');
+	$ffi->attach('PQreset' => ['PGconn'] => 'void');
+	$ffi->attach('PQdb' => ['PGconn'] => 'string');
+	$ffi->attach('PQuser' => ['PGconn'] => 'string');
+	$ffi->attach('PQpass' => ['PGconn'] => 'string');
+	$ffi->attach('PQhost' => ['PGconn'] => 'string');
+	$ffi->attach('PQhostaddr' => ['PGconn'] => 'string');
+	$ffi->attach('PQport' => ['PGconn'] => 'string');
+	$ffi->attach('PQtty' => ['PGconn'] => 'string');
+	$ffi->attach('PQoptions' => ['PGconn'] => 'string');
+	$ffi->attach('PQstatus' => ['PGconn'] => 'int');
+	$ffi->attach('PQtransactionStatus' => ['PGconn'] => 'int');
+	$ffi->attach('PQparameterStatus' => [ 'PGconn', 'string' ] => 'string');
+	$ffi->attach('PQping' => ['string'] => 'int');
+	$ffi->attach(
+		'PQpingParams' => [ 'string[]', 'string[]', 'int' ] => 'int');
+
+	$ffi->attach('PQprotocolVersion' => ['PGconn'] => 'int');
+	$ffi->attach('PQserverVersion' => ['PGconn'] => 'int');
+	$ffi->attach('PQerrorMessage' => ['PGconn'] => 'string');
+	$ffi->attach('PQsocket' => ['PGconn'] => 'int');
+	$ffi->attach('PQsocketPoll' => ['int', 'int', 'int', 'sint64'] => 'int');
+	$ffi->attach('PQgetCurrentTimeUSec' => [] => 'sint64');
+	$ffi->attach('PQbackendPID' => ['PGconn'] => 'int');
+	$ffi->attach('PQconnectionNeedsPassword' => ['PGconn'] => 'int');
+	$ffi->attach('PQconnectionUsedPassword' => ['PGconn'] => 'int');
+	$ffi->attach('PQconnectionUsedGSSAPI' => ['PGconn'] => 'int');
+	$ffi->attach('PQclientEncoding' => ['PGconn'] => 'int');
+	$ffi->attach('PQsetClientEncoding' => [ 'PGconn', 'string' ] => 'int');
+
+	$ffi->attach('PQexec' => [ 'PGconn', 'string' ] => 'PGresult');
+	$ffi->attach(
+		'PQexecParams' => [
+			'PGconn', 'string', 'int', 'int[]',
+			'string[]', 'int[]', 'int[]', 'int'
+		] => 'PGresult');
+	$ffi->attach(
+		'PQprepare' => [ 'PGconn', 'string', 'string', 'int', 'int[]' ] =>
+		  'PGresult');
+	$ffi->attach(
+		'PQexecPrepared' => [ 'PGconn', 'string', 'int',
+			'string[]', 'int[]', 'int[]', 'int' ] => 'PGresult');
+
+	$ffi->attach('PQresultStatus' => ['PGresult'] => 'ExecStatusType');
+	$ffi->attach('PQresStatus' => ['ExecStatusType'] => 'string');
+	$ffi->attach('PQresultErrorMessage' => ['PGresult'] => 'string');
+	$ffi->attach('PQresultErrorField' => [ 'PGresult', 'int' ] => 'string');
+	$ffi->attach('PQntuples' => ['PGresult'] => 'int');
+	$ffi->attach('PQnfields' => ['PGresult'] => 'int');
+	$ffi->attach('PQbinaryTuples' => ['PGresult'] => 'int');
+	$ffi->attach('PQfname' => [ 'PGresult', 'int' ] => 'string');
+	$ffi->attach('PQfnumber' => [ 'PGresult', 'string' ] => 'int');
+	$ffi->attach('PQftable' => [ 'PGresult', 'int' ] => 'Oid');
+	$ffi->attach('PQftablecol' => [ 'PGresult', 'int' ] => 'int');
+	$ffi->attach('PQfformat' => [ 'PGresult', 'int' ] => 'int');
+	$ffi->attach('PQftype' => [ 'PGresult', 'int' ] => 'Oid');
+	$ffi->attach('PQfsize' => [ 'PGresult', 'int' ] => 'int');
+	$ffi->attach('PQfmod' => [ 'PGresult', 'int' ] => 'int');
+	$ffi->attach('PQcmdStatus' => ['PGresult'] => 'string');
+	$ffi->attach('PQoidValue' => ['PGresult'] => 'Oid');
+	$ffi->attach('PQcmdTuples' => ['PGresult'] => 'string');
+	$ffi->attach('PQgetvalue' => [ 'PGresult', 'int', 'int' ] => 'string');
+	$ffi->attach('PQgetlength' => [ 'PGresult', 'int', 'int' ] => 'int');
+	$ffi->attach('PQgetisnull' => [ 'PGresult', 'int', 'int' ] => 'int');
+	$ffi->attach('PQnparams' => ['PGresult'] => 'int');
+	$ffi->attach('PQparamtype' => [ 'PGresult', 'int' ] => 'Oid');
+
+
+	$ffi->attach(
+		'PQdescribePrepared' => [ 'PGconn', 'string' ] => 'PGresult');
+	$ffi->attach('PQdescribePortal' => [ 'PGconn', 'string' ] => 'PGresult');
+
+	$ffi->attach('PQclosePrepared' => [ 'PGconn', 'string' ] => 'PGresult');
+	$ffi->attach('PQclosePortal' => [ 'PGconn', 'string' ] => 'PGresult');
+	$ffi->attach('PQclear' => ['PGresult'] => 'void');
+
+	$ffi->attach('PQconnectStart' => [ 'string' ] => 'PGconn');
+	$ffi->attach(
+		'PQconnectStartParams' => [ 'string[]', 'string[]', 'int' ] => 'PGconn');
+	$ffi->attach('PQconnectPoll' => [ 'PGconn' ] => 'int');
+	$ffi->attach('PQresetStart' => [ 'PGconn' ] => 'int');
+	$ffi->attach('PQresetPoll' => [ 'PGconn' ] => 'int');
+	$ffi->attach('PQsendQuery' => [ 'PGconn',  'string' ] => 'int');
+	$ffi->attach('PQsendQueryParams' => [
+		'PGconn', 'string', 'int', 'Oid*', 'string*',
+		'int*', 'int*', 'int' ] => 'int');
+	$ffi->attach('PQsendPrepare' => [ 'PGconn', 'string', 'string', 'int', 'Oid[]' ] => 'int');
+	$ffi->attach('PQgetResult' => [ 'PGconn' ] => 'PGresult');
+
+	$ffi->attach('PQisBusy' => [ 'PGconn' ] => 'int');
+	$ffi->attach('PQconsumeInput' => [ 'PGconn' ] => 'int');
+	$ffi->attach('PQchangePassword' => [ 'PGconn', 'string', 'string' ] => 'PGresult');
+
+	$ffi->attach('PQpipelineStatus' => [ 'PGconn' ] => 'int');
+	$ffi->attach('PQenterPipelineMode' => [ 'PGconn' ] => 'int');
+	$ffi->attach('PQexitPipelineMode' => [ 'PGconn' ] => 'int');
+	$ffi->attach('PQpipelineSync' => [ 'PGconn' ] => 'int');
+	$ffi->attach('PQsendFlushRequest' => [ 'PGconn' ] => 'int');
+	$ffi->attach('PQsendPipelineSync' => [ 'PGconn' ] => 'int');
+	$ffi->attach('PQsetnonblocking' => [ 'PGconn', 'int' ] => 'int');
+	$ffi->attach('PQisnonblocking' => [ 'PGconn' ] => 'int');
+	$ffi->attach('PQflush' => [ 'PGconn' ] => 'int');
+
+	# Notification support - PQnotifies returns a pointer to PGnotify struct
+	# We return opaque to preserve the original pointer for freeing with PQfreemem
+	$ffi->attach('PQnotifies' => [ 'PGconn' ] => 'opaque');
+	$ffi->attach('PQfreemem' => [ 'opaque' ] => 'void');
+
+	# Notice processor callback support
+	# typedef void (*PQnoticeProcessor)(void *arg, const char *message);
+	$ffi->type('(opaque,string)->void' => 'PQnoticeProcessor');
+	$ffi->attach('PQsetNoticeProcessor' => [ 'PGconn', 'PQnoticeProcessor', 'opaque' ] => 'opaque');
+
+	# Store the $ffi instance for use by helper functions
+	$PostgreSQL::PqFFI::_ffi = $ffi;
+}
+
+# Helper functions to extract values from PGnotify struct
+# The opaque pointer is cast to the record type to access fields
+
+sub _cast_to_pgnotify
+{
+	my $ptr = shift;
+	return undef unless $ptr;
+	return $PostgreSQL::PqFFI::_ffi->cast('opaque', 'record(PGnotify)*', $ptr);
+}
+
+sub PQnotify_channel
+{
+	my $ptr = shift;
+	return undef unless $ptr;
+	my $notify = _cast_to_pgnotify($ptr);
+	my $str_ptr = $notify->relname;
+	return undef unless $str_ptr;
+	return $PostgreSQL::PqFFI::_ffi->cast('opaque', 'string', $str_ptr);
+}
+
+sub PQnotify_payload
+{
+	my $ptr = shift;
+	return undef unless $ptr;
+	my $notify = _cast_to_pgnotify($ptr);
+	my $str_ptr = $notify->extra;
+	return undef unless $str_ptr;
+	return $PostgreSQL::PqFFI::_ffi->cast('opaque', 'string', $str_ptr);
+}
+
+sub PQnotify_be_pid
+{
+	my $ptr = shift;
+	return undef unless $ptr;
+	my $notify = _cast_to_pgnotify($ptr);
+	return $notify->be_pid;
+}
+
+# Free a PGnotify struct using the original pointer
+sub PQnotify_free
+{
+	my $ptr = shift;
+	return unless $ptr;
+	PQfreemem($ptr);
+}
+
+# Create a notice processor closure that can be passed to PQsetNoticeProcessor.
+# The callback will be invoked with (arg, message) where arg is opaque and message is string.
+# Returns the closure - caller must keep a reference to prevent garbage collection.
+sub create_notice_processor
+{
+	my $callback = shift;  # sub { my ($arg, $message) = @_; ... }
+	return $PostgreSQL::PqFFI::_ffi->closure($callback);
+}
+
+=pod
+
+=head1 SEE ALSO
+
+L<PostgreSQL::Test::Session>,
+L<PostgreSQL::PqConstants>, L<PostgreSQL::PGTypes>, L<FFI::Platypus>
+
+=cut
+
+1;
diff --git a/src/test/perl/PostgreSQL/Test/Session.pm b/src/test/perl/PostgreSQL/Test/Session.pm
new file mode 100644
index 00000000000..5a674bd2371
--- /dev/null
+++ b/src/test/perl/PostgreSQL/Test/Session.pm
@@ -0,0 +1,1075 @@
+
+# Copyright (c) 2021-2026, PostgreSQL Global Development Group
+
+=pod
+
+=head1 NAME
+
+PostgreSQL::Test::Session - class for a PostgreSQL libpq session
+
+=head1 SYNOPSIS
+
+  use PostgreSQL::Test::Session;
+
+  use PostgreSQL::Test::Cluster;
+
+  my $node = PostgreSQL::Test::Cluster->new('mynode');
+
+  # create a new session. defult dbname is 'postgres'
+  my $session = PostgreSQL::Test::Session->new(node => $node
+                                               [, dbname => $dbname] );
+
+  # close the session
+  $session->close;
+
+  # reopen the session, after closing it if not closed
+  $session->reconnect;
+
+  # check if the session is ok
+  # my $status = $session->conn_status;
+
+  # run some SQL, not producing tuples
+  my $result = $session->do($sql, ...);
+
+  # run an SQL statement asynchronously
+  my $result = $session->do_async($sql);
+
+  # wait for and async SQL to complete
+  $session->wait_for_completion;
+
+  # set a password for a user
+  my $result = $session->set_password($user, $password);
+
+  # get some data
+  my $result = $session->query($sql);
+
+  # get a single value, default croaks if no value found
+  my $val = $session->query_oneval($sql [, $missing_ok ]);
+
+  #return lines of tuples like "psql -A -t"
+  my @lines = $session->query_tuples($sql, ...);
+
+=head1 DESCRIPTION
+
+C<PostgreSQL::Test::Session> encapsulates a C<libpq> session for use in
+PostgreSQL TAP tests, allowing the test to connect without having to spawn
+C<psql> in a child process.
+
+The session object is automatically closed when the object goes out of scope,
+including at script end.
+
+Several methods return a hashref as a result, which will have the following
+fields:
+
+=over
+
+=item * status
+
+=item * error_message (only if there is an error)
+
+=item * names
+
+=item * types
+
+=item * rows
+
+=item * psqlout
+
+=back
+
+The last 4 will be empty unless the SQL produces tuples.
+
+=cut
+
+package PostgreSQL::Test::Session;
+
+use strict;
+use warnings FATAL => 'all';
+
+use Carp;
+
+my $setup_ok;
+
+BEGIN
+{
+	# Use the FFI libpq wrapper.  This will fail if the FFI libraries are not
+	# available.
+	#
+	# The actual setup is done per session, because we get the libdir from
+	# the node object (in most cases).
+	require PostgreSQL::PqFFI;
+	PostgreSQL::PqFFI->import;
+}
+
+sub _setup
+{
+	return if $setup_ok;
+	my $libdir = shift;
+	PostgreSQL::PqFFI::setup($libdir);
+	$setup_ok = 1;
+}
+
+=pod
+
+=head1 METHODS
+
+=over
+
+=item PostgreSQL::Test::Session->new(node=> $node [, dbname=> $dbname ])
+
+Set up a new session for the node, which must be a C<PostgreSQL::Test::Cluster>
+instance. The default dbame is C<postgres>.
+
+=item PostgreSQL::Test::Session->new(connstr => $connstr [, libdir => $libdir])
+
+Set up a new session for the connection string. If using the FFI libpq wrapper,
+C<$libdir> must point to the directory where the libpq library is installed.
+
+=cut
+
+sub new
+{
+	my $class = shift;
+	my $self = {};
+	bless $self, $class;
+	my %args = @_;
+	my $node = $args{node};
+	my $dbname = $args{dbname} || 'postgres';
+	my $libdir = $args{libdir};
+	my $connstr = $args{connstr};
+	my $wait = $args{wait} // 1;
+	unless ($setup_ok)
+	{
+		unless ($libdir)
+		{
+			croak "bad node" unless $node->isa("PostgreSQL::Test::Cluster");
+			$libdir = $node->config_data($^O eq 'MSWin32' ? '--bindir' : '--libdir');
+		}
+		_setup($libdir);
+	}
+	unless ($connstr)
+	{
+		croak "bad node" unless $node->isa("PostgreSQL::Test::Cluster");
+		$connstr = $node->connstr($dbname);
+	}
+	$self->{connstr} = $connstr;
+	$self->{notices} = [];
+
+	if ($wait)
+	{
+		$self->{conn} = PQconnectdb($connstr);
+		# The destructor will clean up for us even if we fail
+		return undef unless PQstatus($self->{conn}) == CONNECTION_OK;
+		$self->_setup_notice_processor();
+		return $self;
+	}
+	else
+	{
+		$self->{conn} = PQconnectStart($connstr);
+		return PQstatus($self->{conn}) != CONNECTION_BAD ? $self : undef;
+	}
+}
+
+# Set up a notice processor to capture notices/warnings
+sub _setup_notice_processor
+{
+	my $self = shift;
+
+	# Only available with FFI backend
+	return unless defined &create_notice_processor;
+
+	my $notices = $self->{notices};
+
+	# Create closure that captures notices into our array
+	$self->{_notice_closure} = create_notice_processor(sub {
+		my ($arg, $message) = @_;
+		push @$notices, $message;
+	});
+
+	PQsetNoticeProcessor($self->{conn}, $self->{_notice_closure}, undef);
+}
+
+
+# for a connection started with PQconnectStart, wait until it is in CONNECTION_OK state.
+# Uses PQconnectPoll to drive the async connection forward.
+sub wait_connect
+{
+	my $self = shift;
+	my $conn = $self->{conn};
+	my $timeout = $PostgreSQL::Test::Utils::timeout_default;
+	my $start = time;
+	while (1)
+	{
+		my $poll_res = PQconnectPoll($conn);
+		my $status = PQstatus($conn);
+
+		# Connection is complete
+		if ($poll_res == PGRES_POLLING_OK || $status == CONNECTION_OK)
+		{
+			$self->_setup_notice_processor();
+			return;
+		}
+
+		# Connection failed
+		if ($poll_res == PGRES_POLLING_FAILED || $status == CONNECTION_BAD)
+		{
+			die "connection failed: " . PQerrorMessage($conn);
+		}
+
+		die "timed out waiting for connection" if time - $start > $timeout;
+
+		# Wait on socket based on what PQconnectPoll needs
+		my $socket = PQsocket($conn);
+		if ($socket >= 0)
+		{
+			my $forRead = ($poll_res == PGRES_POLLING_READING) ? 1 : 0;
+			my $forWrite = ($poll_res == PGRES_POLLING_WRITING) ? 1 : 0;
+			my $end_time = PQgetCurrentTimeUSec() + 1_000_000;  # 1 second
+			PQsocketPoll($socket, $forRead, $forWrite, $end_time);
+		}
+	}
+}
+
+# Single step of async connection polling - drives the connection state machine
+# forward without blocking. Returns the poll result (PGRES_POLLING_* constant).
+sub poll_connect
+{
+	my $self = shift;
+	my $conn = $self->{conn};
+	return PQconnectPoll($conn);
+}
+
+=pod
+
+=item $session->close()
+
+Close the connection
+
+=cut
+
+sub close
+{
+	my $self = shift;
+	PQfinish($self->{conn});
+	delete $self->{conn};
+}
+
+# Alias for compatibility with BackgroundPsql
+*quit = \&close;
+
+# close the session if the object goes out of scope
+sub DESTROY
+{
+	my $self = shift;
+
+	# During global destruction the FFI::Platypus bindings (and any notice
+	# processor closure registered with libpq) may already have been torn
+	# down in an unpredictable order.  Calling PQfinish() then can invoke a
+	# freed notice callback or a freed FFI thunk, dying with "Can't use an
+	# undefined value as a subroutine reference".  The process is exiting
+	# anyway, so just let the OS reclaim the socket.
+	return if ${^GLOBAL_PHASE} eq 'DESTRUCT';
+
+	$self->close if exists $self->{conn};
+}
+
+=pod
+
+=item $session->reconnect()
+
+Reopen the session using the original connstr. If the session is still open,
+close it before reopening.
+
+=cut
+
+sub reconnect
+{
+	my $self = shift;
+	$self->close if exists $self->{conn};
+	$self->{conn} = PQconnectdb($self->{connstr});
+	my $status = PQstatus($self->{conn});
+	if ($status == CONNECTION_OK)
+	{
+		$self->_setup_notice_processor();
+	}
+	return $status;
+}
+
+=pod
+
+=item $session->reconnect_and_clear()
+
+Reconnect and clear all captured notices. Returns the connection status.
+
+=cut
+
+sub reconnect_and_clear
+{
+	my $self = shift;
+	my $status = $self->reconnect();
+	$self->clear_notices();
+	return $status;
+}
+
+=pod
+
+=item $session->get_notices()
+
+Return an arrayref of all captured notices/warnings since the last clear.
+
+=cut
+
+sub get_notices
+{
+	my $self = shift;
+	return $self->{notices};
+}
+
+=pod
+
+=item $session->get_notices_str()
+
+Return all captured notices/warnings as a single string (joined by empty string).
+This is similar to how BackgroundPsql's {stderr} field works.
+
+=cut
+
+sub get_notices_str
+{
+	my $self = shift;
+	return join('', @{$self->{notices}});
+}
+
+=pod
+
+=item $session->clear_notices()
+
+Clear all captured notices/warnings.
+
+=cut
+
+sub clear_notices
+{
+	my $self = shift;
+	# Clear in place - don't replace the array, as the notice processor
+	# closure has a reference to it
+	@{$self->{notices}} = ();
+}
+
+=pod
+
+=item $session->get_stderr()
+
+Return a stderr-like string containing all notices plus any error message
+from the last query. This mimics BackgroundPsql's {stderr} behavior.
+
+=cut
+
+sub get_stderr
+{
+	my $self = shift;
+	my $stderr = $self->get_notices_str();
+	if (exists $self->{last_error} && defined $self->{last_error})
+	{
+		$stderr .= $self->{last_error};
+	}
+	return $stderr;
+}
+
+=pod
+
+=item $session->clear_stderr()
+
+Clear notices and last error, like setting $psql->{stderr} = ''.
+
+=cut
+
+sub clear_stderr
+{
+	my $self = shift;
+	$self->clear_notices();
+	delete $self->{last_error};
+}
+
+=pod
+
+=item $session->conn_status()
+
+Return the connection status. This will be a libpq status value like
+C<CONNECTION_OK>.
+
+=cut
+
+sub conn_status
+{
+	my $self = shift;
+	return exists $self->{conn} ? PQstatus($self->{conn}) : undef;
+}
+
+=pod
+
+=item $session->backend_pid()
+
+Return the backend process ID for this connection.
+
+=cut
+
+sub backend_pid
+{
+	my $self = shift;
+	return PQbackendPID($self->{conn});
+}
+
+=pod
+
+=item $session->do($sql, ...)
+
+Run one or more SQL statements synchronously (using C<PQexec>). The statements
+should not return any tuples. Returns the status, which will be
+C<PGRES_COMMAND_OK> (i.e. 1) in the case of success.
+
+=cut
+
+sub do
+{
+	my $self = shift;
+	my $conn = $self->{conn};
+	my $status;
+	foreach my $sql (@_)
+	{
+		my $result = PQexec($conn, $sql);
+		$status = PQresultStatus($result);
+		PQclear($result);
+		return $status unless $status == PGRES_COMMAND_OK;
+	}
+	return $status;
+}
+
+=pod
+
+=item $session->do_async($sql)
+
+Run a single statement asynchronously, using C<PQsendQuery>. The return value
+is a boolean indicating success.
+
+=cut
+
+sub do_async
+{
+	my $self = shift;
+	my $conn = $self->{conn};
+	my $sql = shift;
+	my $result = PQsendQuery($conn, $sql);
+	return $result; # 1 or 0
+}
+
+# get the next resultset from some async commands
+# wait if necessary using PQsocketPoll
+# c.f. libpqsrv_get_result
+sub _get_result
+{
+	my $conn = shift;
+	my $socket = PQsocket($conn);
+	while (PQisBusy($conn))
+	{
+		# Wait for the socket to become readable (no timeout = -1)
+		PQsocketPoll($socket, 1, 0, -1);
+		last if PQconsumeInput($conn) == 0;
+	}
+	return PQgetResult($conn);
+}
+
+=pod
+
+=item $session->wait_for_completion()
+
+Wait until all asynchronous SQL has completed
+
+=cut
+
+sub wait_for_completion
+{
+	# wait for all the resultsets and clear them
+	# c.f. libpqsrv_get_result_last
+	my $self = shift;
+	my $conn = $self->{conn};
+	while (my $res = _get_result($conn))
+	{
+		PQclear($res);
+	}
+}
+
+=pod
+
+=item $session->get_async_result()
+
+Wait for and return the result of an async query as a result hash.
+Clears any subsequent results.
+
+=cut
+
+sub get_async_result
+{
+	my $self = shift;
+	my $conn = $self->{conn};
+	my $result = _get_result($conn);
+	return undef unless $result;
+	my $res = _get_result_data($result, $conn);
+	PQclear($result);
+	# Clear any remaining results
+	while (my $r = _get_result($conn))
+	{
+		PQclear($r);
+	}
+	return $res;
+}
+
+=pod
+
+=item $session->wait_for_async_pattern($pattern, $timeout)
+
+Wait for an async query result whose psqlout matches the given regex pattern.
+Returns the matching output string, or dies on timeout/error.
+Default timeout is from $PostgreSQL::Test::Utils::timeout_default.
+
+=cut
+
+sub wait_for_async_pattern
+{
+	my $self = shift;
+	my $pattern = shift;
+	my $timeout = shift // $PostgreSQL::Test::Utils::timeout_default;
+	my $conn = $self->{conn};
+	my $socket = PQsocket($conn);
+	my $start = time;
+
+	while (1)
+	{
+		# Check if result is ready (non-blocking)
+		PQconsumeInput($conn);
+		if (!PQisBusy($conn))
+		{
+			my $result = PQgetResult($conn);
+			if ($result)
+			{
+				my $res = _get_result_data($result, $conn);
+				PQclear($result);
+				# Clear any remaining results
+				while (my $r = PQgetResult($conn))
+				{
+					PQclear($r);
+				}
+				my $output = $res->{psqlout};
+				if ($output =~ $pattern)
+				{
+					return $output;
+				}
+				# If we got a result but it didn't match, return it anyway
+				# (caller may want to check error)
+				return $output;
+			}
+		}
+
+		die "timed out waiting for async result" if time - $start > $timeout;
+
+		# Wait for the socket to become readable, with 1 second timeout
+		# to allow periodic timeout checks
+		my $end_time = PQgetCurrentTimeUSec() + 1_000_000;  # 1 second
+		PQsocketPoll($socket, 1, 0, $end_time);
+	}
+}
+
+=pod
+
+=item $session->try_get_async_result()
+
+Non-blocking check for async query result. Returns result hash if available,
+undef if query is still running.
+
+=cut
+
+sub try_get_async_result
+{
+	my $self = shift;
+	my $conn = $self->{conn};
+
+	PQconsumeInput($conn);
+	return undef if PQisBusy($conn);
+
+	my $result = PQgetResult($conn);
+	return undef unless $result;
+
+	my $res = _get_result_data($result, $conn);
+	PQclear($result);
+	# Clear any remaining results
+	while (my $r = PQgetResult($conn))
+	{
+		PQclear($r);
+	}
+	return $res;
+}
+
+=pod
+
+=item  $session->set_password($user, $password)
+
+Set the user's password by calling C<PQchangePassword>.
+
+Returns a result hash.
+
+=cut
+
+# set password for user
+sub set_password
+{
+	my $self = shift;
+	my $user = shift;
+	my $password = shift;
+	my $conn = $self->{conn};
+	my $result = PQchangePassword($conn, $user, $password);
+	my $ret = _get_result_data($result);
+	PQclear($result);
+	return $ret;
+}
+
+# Common internal routine to process result data.
+# The returned object is dead and will be garbage collected as necessary.
+
+sub _get_result_data
+{
+	my $result = shift;
+	my $conn = shift;
+	my $status = PQresultStatus($result);
+	my $res = {	status => $status, names => [], types => [], rows => [],
+			psqlout => ""};
+	unless ($status == PGRES_TUPLES_OK || $status == PGRES_COMMAND_OK)
+	{
+		$res->{error_message} = PQerrorMessage($conn);
+		return $res;
+	}
+	if ($status == PGRES_COMMAND_OK)
+	{
+		return $res;
+	}
+	my $ntuples = PQntuples($result);
+	my $nfields = PQnfields($result);
+	# assuming here that the strings returned by PQfname and PQgetvalue
+	# are mapped into perl space using setsvpv or similar and thus won't
+	# be affect by us calling PQclear on the result object.
+	foreach my $field (0 .. $nfields-1)
+	{
+		push(@{$res->{names}}, PQfname($result, $field));
+		push(@{$res->{types}}, PQftype($result, $field));
+	}
+	my @textrows;
+	foreach my $nrow (0 .. $ntuples - 1)
+	{
+		my $row = [];
+		foreach my $field ( 0 .. $nfields - 1)
+		{
+			my $val = PQgetvalue($result, $nrow, $field);
+			if (($val // "") eq "")
+			{
+				$val = undef if PQgetisnull($result, $nrow, $field);
+			}
+			push(@$row, $val);
+		}
+		push(@{$res->{rows}}, $row);
+		no warnings qw(uninitialized);
+		push(@textrows, join('|', @$row));
+	}
+	$res->{psqlout} = join("\n",@textrows) if $ntuples;
+	return $res;
+}
+
+
+=pod
+
+=item $session->query($sql)
+
+Runs sql that might return tuples.
+
+Returns a result hash.
+
+=cut
+
+sub query
+{
+	my $self = shift;
+	my $sql = shift;
+	my $conn = $self->{conn};
+
+	# Use PQsendQuery + PQgetResult to handle multi-statement SQL properly.
+	# This collects results from all statements and returns the last one
+	# that had tuples, similar to how psql works.
+	PQsendQuery($conn, $sql) or do {
+		return { status => -1, error_message => PQerrorMessage($conn),
+				 names => [], types => [], rows => [], psqlout => "" };
+	};
+
+	my $final_res;
+	my $last_error;
+	my @all_psqlout;
+
+	while (my $result = _get_result($conn))
+	{
+		my $res = _get_result_data($result, $conn);
+		PQclear($result);
+
+		# Collect output from all statements
+		push @all_psqlout, $res->{psqlout} if $res->{psqlout} ne "";
+
+		# Track errors
+		$last_error = $res->{error_message} if exists $res->{error_message};
+
+		# Keep the last result that had tuples, or the last result overall
+		if ($res->{status} == PGRES_TUPLES_OK || !defined $final_res)
+		{
+			$final_res = $res;
+		}
+	}
+
+	$final_res //= { status => PGRES_COMMAND_OK, names => [], types => [],
+					 rows => [], psqlout => "" };
+
+	# Combine all output (like psql does)
+	$final_res->{psqlout} = join("\n", @all_psqlout) if @all_psqlout;
+
+	# If there was any error, include it in the result for query_safe
+	$final_res->{error_message} = $last_error if defined $last_error;
+
+	# Store error for get_stderr()
+	$self->{last_error} = $last_error;
+
+	# If we're in an aborted transaction state, roll it back to clean up.
+	# This mimics psql's behavior with on_error_stop => 0.
+	if (PQtransactionStatus($conn) == PQTRANS_INERROR)
+	{
+		my $rb = PQexec($conn, "ROLLBACK");
+		PQclear($rb) if $rb;
+	}
+
+	return $final_res;
+}
+
+=pod
+
+=item $session->query_safe($sql)
+
+Runs sql that might return tuples, croaking if there's an error.
+Returns the psqlout string (like psql -At output) on success.
+
+=cut
+
+sub query_safe
+{
+	my $self = shift;
+	my $sql = shift;
+	my $res = $self->query($sql);
+	if (exists $res->{error_message}) {
+		# Debug: show where the error occurred
+		my $short_sql = substr($sql, 0, 100);
+		$short_sql =~ s/\s+/ /g;
+		croak "query_safe failed on [$short_sql...]: $res->{error_message}";
+	}
+	return $res->{psqlout};
+}
+
+=pod
+
+=item $session->query_oneval($sql [, $missing_ok ] )
+
+Run a query that is expected to return no more than one tuple with one value;
+
+If C<$missing_ok> is true, return undef if the query returns no tuple. Otherwise
+croak if there is not exactly one tuple, or of the tuple does not have
+exctly one value.
+
+If none of these apply, return the single value from the query. A NULL value
+will result in undef, so if C<$missing_ok> is true you won't be able to
+distinguish between a null value and a missing tuple.
+
+A non NULL value is returned as the string value obtained from C<PQgetvalue>.
+
+=cut
+
+sub query_oneval
+{
+	my $self = shift;
+	my $sql = shift;
+	my $missing_ok = shift; # default is not ok
+	my $conn = $self->{conn};
+	my $result = PQexec($conn, $sql);
+	my $status = PQresultStatus($result);
+	unless  ($status == PGRES_TUPLES_OK)
+	{
+		PQclear($result) if $result;
+		croak PQerrorMessage($conn);
+	}
+	my $ntuples = PQntuples($result);
+	return undef if ($missing_ok && !$ntuples);
+	my $nfields = PQnfields($result);
+	croak "$ntuples tuples != 1 or $nfields fields != 1"
+	  if $ntuples != 1 || $nfields != 1;
+	my $val = PQgetvalue($result, 0, 0);
+	if ($val eq "")
+	{
+		$val = undef if PQgetisnull($result, 0, 0);
+	}
+	PQclear($result);
+	return $val;
+}
+
+=pod
+
+=item $session->query_tuples($sql, ...)
+
+Run the sql commands and return the output as a single piece of text in the
+same format as C<psql -A -t>.
+
+Fields within tuples are separated by a "|", tuples are spearated by "\n"
+
+=cut
+
+sub query_tuples
+{
+	my $self = shift;
+	# Use pipelined version for 4+ queries where the overhead is worth it
+	return $self->query_tuples_pipelined(@_) if @_ >= 4;
+
+	my @results;
+	foreach my $sql (@_)
+	{
+		my $res = $self->query($sql);
+		croak $res->{error_message}
+		  unless $res->{status} == PGRES_TUPLES_OK;
+		my $rows = $res->{rows};
+		unless (@$rows)
+		{
+			# unfortunately breaks at least one test
+			# push(@results,"-- empty");
+			next;
+		}
+		# join will render undef as an empty string here
+		no warnings qw(uninitialized);
+		my @tuples = map { join('|', @$_); } @$rows;
+		push(@results, join("\n",@tuples));
+	}
+	return join("\n",@results);
+}
+
+=pod
+
+=item $session->query_tuples_pipelined($sql, ...)
+
+Run multiple SQL queries using pipeline mode for efficiency. Returns output
+in the same format as C<query_tuples> but with only one network round-trip
+for all queries.
+
+=cut
+
+sub query_tuples_pipelined
+{
+	my $self = shift;
+	my @queries = @_;
+	my $conn = $self->{conn};
+	my @results;
+
+	# Enter pipeline mode
+	PQenterPipelineMode($conn) or croak "Failed to enter pipeline mode";
+
+	# Send all queries using PQsendQueryParams (PQsendQuery not allowed in pipeline mode)
+	for my $sql (@queries)
+	{
+		PQsendQueryParams($conn, $sql, 0, undef, undef, undef, undef, 0) or do {
+			PQexitPipelineMode($conn);
+			croak "Failed to send query: " . PQerrorMessage($conn);
+		};
+	}
+
+	# Mark end of pipeline
+	PQpipelineSync($conn) or do {
+		PQexitPipelineMode($conn);
+		croak "Failed to sync pipeline";
+	};
+
+	# Collect results for each query
+	for my $i (0 .. $#queries)
+	{
+		my $result = _get_result($conn);
+		if (!$result)
+		{
+			PQexitPipelineMode($conn);
+			croak "No result for query $i";
+		}
+
+		my $status = PQresultStatus($result);
+		if ($status == PGRES_PIPELINE_ABORTED)
+		{
+			PQclear($result);
+			PQexitPipelineMode($conn);
+			croak "Pipeline aborted at query $i";
+		}
+
+		if ($status == PGRES_TUPLES_OK)
+		{
+			my $res = _get_result_data($result, $conn);
+			my $rows = $res->{rows};
+			if (@$rows)
+			{
+				no warnings qw(uninitialized);
+				my @tuples = map { join('|', @$_); } @$rows;
+				push(@results, join("\n", @tuples));
+			}
+		}
+		elsif ($status != PGRES_COMMAND_OK)
+		{
+			my $err = PQerrorMessage($conn);
+			PQclear($result);
+			PQexitPipelineMode($conn);
+			croak "Query $i failed: $err";
+		}
+		PQclear($result);
+
+		# Consume the NULL result that marks end of this query's results
+		while (my $extra = PQgetResult($conn))
+		{
+			PQclear($extra);
+		}
+	}
+
+	# Consume the pipeline sync result
+	my $sync_result = _get_result($conn);
+	if ($sync_result)
+	{
+		my $status = PQresultStatus($sync_result);
+		PQclear($sync_result);
+		if ($status != PGRES_PIPELINE_SYNC)
+		{
+			PQexitPipelineMode($conn);
+			croak "Expected PGRES_PIPELINE_SYNC, got $status";
+		}
+	}
+
+	# Exit pipeline mode
+	PQexitPipelineMode($conn) or croak "Failed to exit pipeline mode";
+
+	return join("\n", @results);
+}
+
+
+sub setnonblocking
+{
+	my $self = shift;
+	my $val = shift;
+	my $res = PQsetnonblocking($self->{conn}, $val);
+	croak "problem setting non-blocking"
+	  if $res;
+	return;
+}
+
+sub isnonblocking
+{
+	my $self = shift;
+	return PQisnonblocking($self->{conn});
+}
+
+sub enterPipelineMode
+{
+	my $self = shift;
+	return PQenterPipelineMode($self->{conn});
+}
+
+sub exitPipelineMode
+{
+	my $self = shift;
+	return PQexitPipelineMode($self->{conn});
+}
+
+sub pipelineStatus
+{
+	my $self = shift;
+	return PQpipelineStatus($self->{conn});
+}
+
+sub pipelineSync
+{
+	my $self = shift;
+	return PQpipelineSync($self->{conn});
+}
+
+
+sub do_pipeline
+{
+	my $self = shift;
+	my $statement = shift;
+	my @args = @_;
+	my $nargs = scalar(@args);
+	my $res = PQsendQueryParams($self->{conn}, $statement, $nargs, undef, \@args, undef , undef, 0);
+	return $res;
+}
+
+=pod
+
+=item $session->get_notification()
+
+Check for a pending notification and return it as a hashref with keys
+C<channel>, C<pid>, and C<payload>. Returns undef if no notification is
+available.
+
+Automatically consumes any pending input before checking for notifications.
+
+=cut
+
+sub get_notification
+{
+	my $self = shift;
+	my $conn = $self->{conn};
+
+	# Consume any pending input
+	PQconsumeInput($conn);
+
+	my $notify = PQnotifies($conn);
+	return undef unless $notify;
+
+	my $result = {
+		channel => PQnotify_channel($notify),
+		pid     => PQnotify_be_pid($notify),
+		payload => PQnotify_payload($notify),
+	};
+
+	PQnotify_free($notify);
+
+	return $result;
+}
+
+=pod
+
+=item $session->get_all_notifications()
+
+Consume input and return all pending notifications as an arrayref of hashrefs.
+Each hashref has keys C<channel>, C<pid>, and C<payload>.
+
+=cut
+
+sub get_all_notifications
+{
+	my $self = shift;
+	my @notifications;
+
+	while (my $notify = $self->get_notification())
+	{
+		push @notifications, $notify;
+	}
+
+	return \@notifications;
+}
+
+=pod
+
+=back
+
+=cut
+
+
+1;
diff --git a/src/test/perl/meson.build b/src/test/perl/meson.build
index 0fd36c9e570..37d8ac2a36a 100644
--- a/src/test/perl/meson.build
+++ b/src/test/perl/meson.build
@@ -4,6 +4,10 @@
 
 install_data(
   'PostgreSQL/Version.pm',
+  'PostgreSQL/FindLib.pm',
+  'PostgreSQL/PqFFI.pm',
+  'PostgreSQL/PqConstants.pm',
+  'PostgreSQL/PGTypes.pm',
   install_dir: dir_pgxs / 'src/test/perl/PostgreSQL')
 
 install_data(
@@ -15,4 +19,5 @@ install_data(
   'PostgreSQL/Test/BackgroundPsql.pm',
   'PostgreSQL/Test/AdjustDump.pm',
   'PostgreSQL/Test/AdjustUpgrade.pm',
+  'PostgreSQL/Test/Session.pm',
   install_dir: dir_pgxs / 'src/test/perl/PostgreSQL/Test')
-- 
2.43.0

