From be07e2a0ad1a0da67a505a5184729b8c7dcd023d Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Tue, 17 Feb 2026 21:13:54 +1300
Subject: [PATCH 2/2] Provide utf16 type.

Science project...
---
 src/backend/utils/adt/pg_locale.c         | 103 +++++
 src/backend/utils/adt/pg_locale_icu.c     |  53 +++
 src/backend/utils/adt/varlena.c           | 468 +++++++++++++++++++-
 src/include/c.h                           |   5 +
 src/include/catalog/pg_amop.dat           |  79 ++++
 src/include/catalog/pg_amproc.dat         |  12 +
 src/include/catalog/pg_opclass.dat        |   2 +
 src/include/catalog/pg_operator.dat       | 160 +++++++
 src/include/catalog/pg_proc.dat           | 130 ++++++
 src/include/catalog/pg_type.dat           |   8 +
 src/include/fmgr.h                        |   3 +
 src/include/mb/pg_wchar.h                 |  34 ++
 src/include/mb/string_iterator.h          | 500 ++++++++++++++++++++++
 src/include/mb/unicode_strings.h          | 254 +++++++++++
 src/include/mb/unicode_types.h            |  61 +++
 src/include/utils/pg_locale.h             |  20 +
 src/test/regress/expected/encoding.out    |  32 ++
 src/test/regress/expected/type_sanity.out |   1 +
 src/test/regress/expected/unicode.out     | 134 ++++++
 src/test/regress/regress.c                |  16 +
 src/test/regress/sql/encoding.sql         |  16 +
 src/test/regress/sql/type_sanity.sql      |   1 +
 src/test/regress/sql/unicode.sql          | 101 +++++
 src/tools/pgindent/typedefs.list          |   4 +
 24 files changed, 2185 insertions(+), 12 deletions(-)
 create mode 100644 src/include/mb/string_iterator.h
 create mode 100644 src/include/mb/unicode_strings.h
 create mode 100644 src/include/mb/unicode_types.h

diff --git a/src/backend/utils/adt/pg_locale.c b/src/backend/utils/adt/pg_locale.c
index 6c5c1019e1e..9231d48ea3f 100644
--- a/src/backend/utils/adt/pg_locale.c
+++ b/src/backend/utils/adt/pg_locale.c
@@ -42,6 +42,7 @@
 #include "common/hashfn.h"
 #include "common/string.h"
 #include "mb/pg_wchar.h"
+#include "mb/unicode_strings.h"
 #include "miscadmin.h"
 #include "utils/builtins.h"
 #include "utils/guc_hooks.h"
@@ -50,6 +51,7 @@
 #include "utils/pg_locale.h"
 #include "utils/pg_locale_c.h"
 #include "utils/relcache.h"
+#include "utils/pg_stack_alloc.h"
 #include "utils/syscache.h"
 
 #ifdef WIN32
@@ -1407,6 +1409,107 @@ pg_strncoll(const char *arg1, ssize_t len1, const char *arg2, ssize_t len2,
 	return locale->collate->strncoll(arg1, len1, arg2, len2, locale);
 }
 
+/*
+ * For providers without UTF-16 support, convert both strings to database
+ * encoding with NUL-terminator.
+ */
+static int
+pg_strncoll_char16_convert(const storage_char16_t *data1, size_t size1,
+						   const storage_char16_t *data2, size_t size2,
+						   pg_locale_t locale)
+{
+	char	   *cstr1;
+	char	   *cstr2;
+	int			result;
+
+	DECLARE_PG_STACK();
+
+	cstr1 = pg_stack_alloc(char16_to_mb_max_size(size1) + 1);
+	char16_to_local_cstr(cstr1, data1, size1);
+
+	cstr2 = pg_stack_alloc(char16_to_mb_max_size(size2) + 1);
+	char16_to_local_cstr(cstr2, data2, size2);
+
+	result = pg_strncoll(cstr1, -1, cstr2, -1, locale);
+
+	pg_stack_free(cstr1);
+	pg_stack_free(cstr2);
+
+	return result;
+}
+
+/*
+ * Collate two UTF-16 strings.
+ */
+int
+pg_strncoll_char16(const storage_char16_t *data1, size_t size1,
+				   const storage_char16_t *data2, size_t size2,
+				   pg_locale_t locale)
+{
+	if (locale->collate->strncoll_char16)
+		return locale->collate->strncoll_char16(data1, size1,
+												data2, size2,
+												locale);
+
+	return pg_strncoll_char16_convert(data1, size1, data2, size2, locale);
+}
+
+/*
+ * For providers without UTF-16 support, convert one string to database
+ * encoding with NUL-terminator.
+ */
+static int
+pg_strncoll_char16_local_convert(const storage_char16_t *data1, size_t size1,
+								 const char *data2, size_t size2,
+								 pg_locale_t locale)
+{
+	char	   *cstr1;
+	int			result;
+
+	DECLARE_PG_STACK();
+
+	cstr1 = pg_stack_alloc(char16_to_mb_max_size(size1) + 1);
+	char16_to_local_cstr(cstr1, data1, size1);
+
+	result = pg_strncoll(cstr1, -1, data2, size2, locale);
+
+	pg_stack_free(cstr1);
+
+	return result;
+}
+
+/*
+ * Compare a UTF-16 string and a database encoding string.  Uses the
+ * provider's mixed-encoding method if any, else converts data1 and collates.
+ */
+int
+pg_strncoll_char16_local(const storage_char16_t *data1, size_t size1,
+						 const char *data2, size_t size2,
+						 pg_locale_t locale)
+{
+	if (locale->collate->strncoll_char16_local)
+		return locale->collate->strncoll_char16_local(data1, size1,
+													  data2, size2, locale);
+
+	return pg_strncoll_char16_local_convert(data1, size1, data2, size2,
+											locale);
+}
+
+/*
+ * Compare a database encoding string and a UTF-16 string.
+ */
+int
+pg_strncoll_local_char16(const char *data1, size_t size1,
+						 const storage_char16_t *data2, size_t size2,
+						 pg_locale_t locale)
+{
+	/* No separate implementation for now: swap arguments, invert result. */
+	int			result = pg_strncoll_char16_local(data2, size2, data1, size1, locale);
+
+	INVERT_COMPARE_RESULT(result);
+	return result;
+}
+
 /*
  * Return true if the collation provider supports pg_strxfrm() and
  * pg_strnxfrm(); otherwise false.
diff --git a/src/backend/utils/adt/pg_locale_icu.c b/src/backend/utils/adt/pg_locale_icu.c
index a4a4e82eb9e..6bf617f8e56 100644
--- a/src/backend/utils/adt/pg_locale_icu.c
+++ b/src/backend/utils/adt/pg_locale_icu.c
@@ -77,6 +77,12 @@ static size_t downcase_ident_icu(char *dst, size_t dstsize, const char *src,
 static int	strncoll_icu(const char *arg1, ssize_t len1,
 						 const char *arg2, ssize_t len2,
 						 pg_locale_t locale);
+static int	strncoll_char16_icu(const storage_char16_t *arg1, size_t len1,
+								const storage_char16_t *arg2, size_t len2,
+								pg_locale_t locale);
+static int	strncoll_char16_utf8_icu(const storage_char16_t *arg1, size_t len1,
+									 const char *arg2, size_t len2,
+									 pg_locale_t locale);
 static size_t strnxfrm_icu(char *dest, size_t destsize,
 						   const char *src, ssize_t srclen,
 						   pg_locale_t locale);
@@ -165,6 +171,8 @@ static const struct collate_methods collate_methods_icu_utf8 = {
 #else
 	.strncoll = strncoll_icu,
 #endif
+	.strncoll_char16 = strncoll_char16_icu,
+	.strncoll_char16_local = strncoll_char16_utf8_icu,
 	.strnxfrm = strnxfrm_icu,
 	.strnxfrm_prefix = strnxfrm_prefix_icu_utf8,
 	.strxfrm_is_safe = true,
@@ -1062,6 +1070,51 @@ strncoll_icu(const char *arg1, ssize_t len1,
 	return result;
 }
 
+static int
+strncoll_char16_icu(const storage_char16_t *data1, size_t size1,
+					const storage_char16_t *data2, size_t size2,
+					pg_locale_t locale)
+{
+	UCharIterator iter1;
+	UCharIterator iter2;
+	UErrorCode	status;
+	int			result;
+
+	UITER_SET_STORAGE_CHAR_T(&iter1, data1, size1);
+	UITER_SET_STORAGE_CHAR_T(&iter2, data2, size2);
+
+	status = U_ZERO_ERROR;
+	result = ucol_strcollIter(locale->icu.ucol, &iter1, &iter2, &status);
+	if (U_FAILURE(status))
+		ereport(ERROR,
+				(errmsg("collation failed: %s", u_errorName(status))));
+
+	return result;
+}
+
+static int
+strncoll_char16_utf8_icu(const storage_char16_t *data1, size_t size1,
+						 const char *data2, size_t size2,
+						 pg_locale_t locale)
+{
+	UCharIterator iter1;
+	UCharIterator iter2;
+	UErrorCode	status;
+	int			result;
+
+	/* Iterators for unaligned big-endian UTF-16 and UTF-8. */
+	UITER_SET_STORAGE_CHAR_T(&iter1, data1, size1);
+	uiter_setUTF8(&iter2, data2, size2);
+
+	status = U_ZERO_ERROR;
+	result = ucol_strcollIter(locale->icu.ucol, &iter1, &iter2, &status);
+	if (U_FAILURE(status))
+		ereport(ERROR,
+				(errmsg("collation failed: %s", u_errorName(status))));
+
+	return result;
+}
+
 /* 'srclen' of -1 means the strings are NUL-terminated */
 static size_t
 strnxfrm_prefix_icu(char *dest, size_t destsize,
diff --git a/src/backend/utils/adt/varlena.c b/src/backend/utils/adt/varlena.c
index c0ff51bd2fc..174aac26c48 100644
--- a/src/backend/utils/adt/varlena.c
+++ b/src/backend/utils/adt/varlena.c
@@ -30,6 +30,7 @@
 #include "funcapi.h"
 #include "lib/hyperloglog.h"
 #include "libpq/pqformat.h"
+#include "mb/unicode_strings.h"
 #include "miscadmin.h"
 #include "nodes/execnodes.h"
 #include "parser/scansup.h"
@@ -40,6 +41,7 @@
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/pg_locale.h"
+#include "utils/pg_stack_alloc.h"
 #include "utils/sortsupport.h"
 #include "utils/tuplestore.h"
 #include "utils/varlena.h"
@@ -4728,7 +4730,7 @@ text_reverse(PG_FUNCTION_ARGS)
 	text	   *str = PG_GETARG_TEXT_PP(0);
 	const char *p = VARDATA_ANY(str);
 	int			len = VARSIZE_ANY_EXHDR(str);
-	const char *endp = p + len;
+	mb_iterator iter = MB_ITERATOR_INIT_LOCAL(p, len);
 	text	   *result;
 	char	   *dst;
 
@@ -4739,22 +4741,16 @@ text_reverse(PG_FUNCTION_ARGS)
 	if (pg_database_encoding_max_length() > 1)
 	{
 		/* multibyte version */
-		while (p < endp)
-		{
-			int			sz;
-
-			sz = pg_mblen_range(p, endp);
-			dst -= sz;
-			memcpy(dst, p, sz);
-			p += sz;
-		}
+		while (mb_iterator_has_more(&iter))
+			dst -= mb_iterator_store_before(&iter, dst);
 	}
 	else
 	{
 		/* single byte version */
-		while (p < endp)
-			*(--dst) = *p++;
+		while (mb_iterator_has_more(&iter))
+			dst -= mb_iterator_store_before__sb(&iter, dst);
 	}
+	Assert(dst == VARDATA_ANY(result));
 
 	PG_RETURN_TEXT_P(result);
 }
@@ -5809,3 +5805,451 @@ invalid_pair:
 			 errmsg("invalid Unicode surrogate pair")));
 	PG_RETURN_NULL();			/* keep compiler quiet */
 }
+
+/*
+ * UTF-16 strings.  These provide more compact storage of some languages that
+ * would otherwise require 3-byte UTF8 sequences for their core character set.
+ * They are never directly exposed to clients without being converted to
+ * database encoding.
+ */
+
+/*
+ * Allocate a utf16 with room for max_utf16_size UTF-16 code units.  From
+ * database encoding, multiply source bytes by MAX_UTF16_CODEPOINTS_PER_MBLEN.
+ */
+static inline utf16 *
+utf16_new(size_t max_utf16_size)
+{
+	size_t		varsize = VARHDRSZ + sizeof(storage_char16_t) * max_utf16_size;
+	utf16	   *result = palloc(varsize);
+
+	SET_VARSIZE(result, varsize);
+	return result;
+}
+
+static inline storage_char16_t *
+utf16_data(utf16 *value)
+{
+	return (storage_char16_t *) VARDATA_ANY(value);
+}
+
+/*
+ * Return number of char16_t characters in a utf16.
+ */
+static inline size_t
+utf16_size(const utf16 *value)
+{
+	return VARSIZE_ANY_EXHDR(value) / sizeof(storage_char16_t);
+}
+
+/*
+ * Convenience macro for initializing char16_iterator directly from a utf16
+ * varlena object.
+ */
+#define CHAR16_ITERATOR_INIT_WITH_UTF16(o) \
+	CHAR16_ITERATOR_INIT(utf16_data(o), utf16_size(o))
+
+/*
+ * Set number of char16_t characters in a utf16, if it differs from the
+ * estimate given to utf16_new().  It can't be set larger.
+ */
+static inline void
+utf16_set_size(utf16 *value, size_t size)
+{
+	size_t		varsize = VARHDRSZ + sizeof(storage_char16_t) * size;
+
+	Assert(varsize <= VARSIZE(value));
+	SET_VARSIZE(value, varsize);
+}
+
+/*
+ * Construct a new utf16 from database encoding.
+ */
+static inline utf16 *
+utf16_new_from_local(const char *src, size_t src_size)
+{
+	utf16	   *result;
+	storage_char16_t *dst;
+	size_t		dst_size;
+
+	result = utf16_new(mb_to_char16_max_size(src_size));
+	dst = utf16_data(result);
+	dst_size = local_to_char16(dst, src, src_size);
+	utf16_set_size(result, dst_size);
+
+	return result;
+}
+
+Datum
+utf16_length(PG_FUNCTION_ARGS)
+{
+	utf16	   *u = PG_GETARG_UTF16_PP(0);
+	char16_iterator iter = CHAR16_ITERATOR_INIT_WITH_UTF16(u);
+	int32		result = 0;
+
+	while (char16_iterator_has_more(&iter))
+	{
+		result++;
+		char16_iterator_advance(&iter);
+	}
+
+	PG_RETURN_INT32(result);
+}
+
+Datum
+utf16_octet_length(PG_FUNCTION_ARGS)
+{
+	Datum		str = PG_GETARG_DATUM(0);
+
+	PG_RETURN_INT32((toast_raw_datum_size(str) - VARHDRSZ));
+}
+
+Datum
+utf16out(PG_FUNCTION_ARGS)
+{
+	utf16	   *value = PG_GETARG_UTF16_PP(0);
+	const storage_char16_t *src = utf16_data(value);
+	size_t		src_size = utf16_size(value);
+	char	   *dst_cstr;
+
+	dst_cstr = palloc(char16_to_mb_max_size(src_size) + 1);
+	char16_to_local_cstr(dst_cstr, src, src_size);
+
+	PG_RETURN_CSTRING(dst_cstr);
+}
+
+Datum
+utf16in(PG_FUNCTION_ARGS)
+{
+	const char *src_cstr = PG_GETARG_CSTRING(0);
+	size_t		src_size = strlen(src_cstr);
+	utf16	   *result;
+
+	result = utf16_new_from_local(src_cstr, src_size);
+
+	PG_RETURN_UTF16_P(result);
+}
+
+Datum
+utf16recv(PG_FUNCTION_ARGS)
+{
+	StringInfo	buf = (StringInfo) PG_GETARG_POINTER(0);
+	char	   *src;
+	int			src_size;
+	utf16	   *result;
+
+	src = pq_getmsgtext(buf, buf->len - buf->cursor, &src_size);
+	result = utf16_new_from_local(src, src_size);
+	pfree(src);
+
+	PG_RETURN_UTF16_P(result);
+}
+
+Datum
+utf16send(PG_FUNCTION_ARGS)
+{
+	utf16	   *value = PG_GETARG_UTF16_PP(0);
+	StringInfoData buf;
+	const storage_char16_t *src = utf16_data(value);
+	size_t		src_size = utf16_size(value);
+	char	   *dst;
+	size_t		dst_size;
+
+	DECLARE_PG_STACK();
+
+	dst = pg_stack_alloc(char16_to_mb_max_size(src_size));
+	dst_size = char16_to_local(dst, src, src_size);
+
+	pq_begintypsend(&buf);
+	pq_sendtext(&buf, dst, dst_size);
+
+	pg_stack_free(dst);
+
+	PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
+}
+
+/* Codepoint order comparison. */
+static int
+char16_cmp(const storage_char16_t *data1, size_t size1,
+		   const storage_char16_t *data2, size_t size2)
+{
+	int			result = char16_cmp1(data1, data2, Min(size1, size2));
+
+	if (result == 0)
+		result = size1 < size2 ? -1 : size1 > size2 ? 1 : 0;
+	return result;
+}
+
+/*
+ * XXX It might be nice to use _Generic for type selection instead of funky
+ * name-pasting macros where you have to state the typenames explicitly, but
+ * utf16 and text are not distinct C types.
+ */
+
+/* Get an fmgr argument of type T. */
+#define GEN_GET_ARG__text								PG_GETARG_TEXT_PP
+#define GEN_GET_ARG__NameData 							PG_GETARG_NAME
+#define GEN_GET_ARG__utf16 								PG_GETARG_UTF16_PP
+#define GEN_GET_ARG(T, n) CppConcat2(GEN_GET_ARG__, T)(n)
+
+/* Get the character type for type T. */
+#define GEN_TYPEOF_CHAR__text							char
+#define GEN_TYPEOF_CHAR__NameData						char
+#define GEN_TYPEOF_CHAR__utf16							storage_char16_t
+#define GEN_TYPEOF_CHAR(T) CppConcat2(GEN_TYPEOF_CHAR__, T)
+
+/* Get a pointer to the basic characters from type T. */
+#define GEN_GET_DATA__text(v) 							VARDATA_ANY(v)
+#define GEN_GET_DATA__NameData(v)						NameStr(*(v))
+#define GEN_GET_DATA__utf16(v) 							utf16_data(v)
+#define GEN_GET_DATA(T, v) CppConcat2(GEN_GET_DATA__, T)(v)
+
+/* Get size in basic characters from T. */
+#define GEN_GET_SIZE__text(v) VARSIZE_ANY_EXHDR(v)
+#define GEN_GET_SIZE__NameData(v) strnlen(NameStr(*(v)), NAMEDATALEN)
+#define GEN_GET_SIZE__utf16(v) utf16_size(v)
+#define GEN_GET_SIZE(T, v) CppConcat2(GEN_GET_SIZE__, T)(v)
+
+/* Name mangling convention for pairs of character types. */
+#define GEN_FNAME__char__char							local
+#define GEN_FNAME__storage_char16_t__storage_char16_t	char16
+#define GEN_FNAME__char__storage_char16_t				local_char16
+#define GEN_FNAME__storage_char16_t__char				char16_local
+
+/* Name mangling convention for pairs of types. */
+#define GEN_FNAME__text__text							text
+#define GEN_FNAME__text__NameData						textname
+#define GEN_FNAME__text__utf16 							textutf16
+#define GEN_FNAME__NameData__text 						nametext
+#define GEN_FNAME__NameData__NameData					name
+#define GEN_FNAME__NameData__utf16						nameutf16
+#define GEN_FNAME__utf16__text							utf16text
+#define GEN_FNAME__utf16__NameData						utf16name
+#define GEN_FNAME__utf16__utf16							utf16
+
+/* Make function name for overload T1, T2 (prefix, suffix style). */
+#define GEN_FNAME_P(prefix, T1, T2)										\
+	CppConcat2(prefix, CppConcat2(GEN_FNAME__, T1##__##T2))
+#define GEN_FNAME_S(T1, T2, suffix)										\
+	CppConcat2(CppConcat2(GEN_FNAME__, T1##__##T2), suffix)
+
+/* Call function overloaded for T1, T2 (prefix, suffix style). */
+#define GEN_CALL_OVERLOAD_P(prefix, T1, T2, ...)						\
+	GEN_FNAME_P(prefix, T1, T2)(__VA_ARGS__)
+#define GEN_CALL_OVERLOAD_S(T1, T2, suffix, ...)						\
+	GEN_FNAME_S(T1, T2, suffix)(__VA_ARGS__)
+
+#define GEN_STRNCOLL(T1, T2)											\
+static int																\
+GEN_FNAME_S(T1, T2, _strncoll) (const T1 *data1, size_t size1,			\
+								const T2 *data2, size_t size2,			\
+								Oid collid)								\
+{																		\
+	pg_locale_t mylocale;												\
+	int			result;													\
+	check_collation_set(collid);										\
+	mylocale = pg_newlocale_from_collation(collid);						\
+	if (mylocale->collate_is_c)											\
+	{																	\
+		/* Codepoint order determines result. */						\
+		result = GEN_CALL_OVERLOAD_S(T1, T2, _cmp,						\
+									 data1, size1, data2, size2);		\
+	}																	\
+	else																\
+	{																	\
+		/* Locale determines results. */								\
+		result = GEN_CALL_OVERLOAD_P(pg_strncoll_, T1, T2,				\
+									 data1, size1, data2, size2,		\
+									 mylocale);							\
+		/* Codepoint order tie-breaker for deterministic locales. */	\
+		if (result == 0 && mylocale->deterministic)						\
+			result = GEN_CALL_OVERLOAD_S(T1, T2, _cmp,					\
+										 data1, size1, data2, size2);	\
+	}																	\
+	return result;														\
+}
+
+/* Dispatch to correct overload of XXX_strncoll(). */
+#define GEN_CMP(T1, T2)													\
+Datum																	\
+GEN_FNAME_S(T1, T2, cmp)(PG_FUNCTION_ARGS)								\
+{																		\
+	T1 *arg1 = GEN_GET_ARG(T1, 0);										\
+	T2 *arg2 = GEN_GET_ARG(T2, 1);										\
+	Oid collid = PG_GET_COLLATION();									\
+	int result = GEN_CALL_OVERLOAD_S(GEN_TYPEOF_CHAR(T1),				\
+									 GEN_TYPEOF_CHAR(T2),				\
+									 _strncoll,							\
+									 GEN_GET_DATA(T1, arg1),			\
+									 GEN_GET_SIZE(T1, arg1),			\
+									 GEN_GET_DATA(T2, arg2),			\
+									 GEN_GET_SIZE(T2, arg2),			\
+									 collid);							\
+	PG_FREE_IF_COPY(arg1, 0);											\
+	PG_FREE_IF_COPY(arg2, 1);											\
+	PG_RETURN_INT32(result);											\
+}
+
+/* General case: dispatch to correct overload of XXX_strncoll(). */
+#define GEN_REL(T1, T2, suffix, op)										\
+Datum																	\
+GEN_FNAME_S(T1, T2, suffix)(PG_FUNCTION_ARGS)							\
+{																		\
+	T1 *arg1 = GEN_GET_ARG(T1, 0);										\
+	T2 *arg2 = GEN_GET_ARG(T2, 1);										\
+	Oid collid = PG_GET_COLLATION();									\
+	int result = GEN_CALL_OVERLOAD_S(GEN_TYPEOF_CHAR(T1),				\
+									 GEN_TYPEOF_CHAR(T2),				\
+									 _strncoll,							\
+									 GEN_GET_DATA(T1, arg1),			\
+									 GEN_GET_SIZE(T1, arg1),			\
+									 GEN_GET_DATA(T2, arg2),			\
+									 GEN_GET_SIZE(T2, arg2),			\
+									 collid);							\
+	PG_FREE_IF_COPY(arg1, 0);											\
+	PG_FREE_IF_COPY(arg2, 1);											\
+	PG_RETURN_BOOL(result op 0);										\
+}
+
+/*
+ * Special case for == and != and deterministic locales: bit level compare,
+ * skipping the locale system.  Otherwise, same as the above.
+ */
+#define GEN_EQ(T1, T2, suffix, op)										\
+Datum																	\
+GEN_FNAME_S(T1, T2, suffix)(PG_FUNCTION_ARGS)							\
+{																		\
+	Oid			collid = PG_GET_COLLATION();							\
+	pg_locale_t mylocale = 0;											\
+	static_assert(sizeof(GEN_TYPEOF_CHAR(T1)) !=						\
+				  sizeof(GEN_TYPEOF_CHAR(T2)),							\
+				  "should use optimized version for same char type");	\
+	check_collation_set(collid);										\
+	mylocale = pg_newlocale_from_collation(collid);						\
+	if (mylocale->deterministic)										\
+	{																	\
+		/* Codepoint order then length determine result. */				\
+		T1 *arg1 = GEN_GET_ARG(T1, 0);									\
+		T2 *arg2 = GEN_GET_ARG(T2, 1);									\
+		int result = GEN_CALL_OVERLOAD_S(GEN_TYPEOF_CHAR(T1),			\
+										 GEN_TYPEOF_CHAR(T2),			\
+										 _cmp,							\
+										 GEN_GET_DATA(T1, arg1),		\
+										 GEN_GET_SIZE(T1, arg1),		\
+										 GEN_GET_DATA(T2, arg2),		\
+										 GEN_GET_SIZE(T2, arg2));		\
+		PG_FREE_IF_COPY(arg1, 0);										\
+		PG_FREE_IF_COPY(arg2, 1);										\
+		PG_RETURN_BOOL(result op 0);									\
+	}																	\
+	else																\
+	{																	\
+		/* Locale determines result. */									\
+		T1 *arg1 = GEN_GET_ARG(T1, 0);									\
+		T2 *arg2 = GEN_GET_ARG(T2, 1);									\
+		int result = GEN_CALL_OVERLOAD_S(GEN_TYPEOF_CHAR(T1),			\
+										 GEN_TYPEOF_CHAR(T2),			\
+										 _strncoll,						\
+										 GEN_GET_DATA(T1, arg1),		\
+										 GEN_GET_SIZE(T1, arg1),		\
+										 GEN_GET_DATA(T2, arg2),		\
+										 GEN_GET_SIZE(T2, arg2),		\
+										 collid);						\
+		PG_FREE_IF_COPY(arg1, 0);										\
+		PG_FREE_IF_COPY(arg2, 1);										\
+		PG_RETURN_BOOL(result op 0);									\
+	}																	\
+}
+
+/*
+ * Special case for == and != when both sides share the same basic char
+ * type.  For deterministic locales, differing raw datum sizes prove the
+ * strings are NOT equal without detoasting; otherwise compare the data.
+ */
+#define GEN_EQ_SAME_CHAR(T1, T2, suffix, op)							\
+Datum																	\
+GEN_FNAME_S(T1, T2, suffix)(PG_FUNCTION_ARGS)							\
+{																		\
+	Oid			collid = PG_GET_COLLATION();							\
+	pg_locale_t mylocale = 0;											\
+	static_assert(sizeof(GEN_TYPEOF_CHAR(T1)) ==						\
+				  sizeof(GEN_TYPEOF_CHAR(T2)),							\
+				  "optimization requires same char");					\
+	check_collation_set(collid);										\
+	mylocale = pg_newlocale_from_collation(collid);						\
+	if (mylocale->deterministic)										\
+	{																	\
+		/* Optimization for same basic character type. */				\
+		if (toast_raw_datum_size(PG_GETARG_DATUM(0)) !=					\
+			toast_raw_datum_size(PG_GETARG_DATUM(1)))					\
+		{																\
+			/* Different size: fast result without detoasting. */		\
+			PG_RETURN_BOOL(false op true);								\
+		}																\
+		else															\
+		{																\
+			/* Same size: codepoint equality determines result. */		\
+			T1 *arg1 = GEN_GET_ARG(T1, 0);								\
+			T2 *arg2 = GEN_GET_ARG(T2, 1);								\
+			int result = GEN_CALL_OVERLOAD_S(GEN_TYPEOF_CHAR(T1),		\
+											 GEN_TYPEOF_CHAR(T2),		\
+											 _cmp1,						\
+											 GEN_GET_DATA(T1, arg1),	\
+											 GEN_GET_DATA(T2, arg2),	\
+											 GEN_GET_SIZE(T1, arg1));	\
+			Assert(GEN_GET_SIZE(T1, arg1) == GEN_GET_SIZE(T2, arg2));	\
+			PG_FREE_IF_COPY(arg1, 0);									\
+			PG_FREE_IF_COPY(arg2, 1);									\
+			PG_RETURN_BOOL(result op 0);								\
+		}																\
+	}																	\
+	else																\
+	{																	\
+		/* Locale determines result. */									\
+		T1 *arg1 = GEN_GET_ARG(T1, 0);									\
+		T2 *arg2 = GEN_GET_ARG(T2, 1);									\
+		int result = GEN_CALL_OVERLOAD_S(GEN_TYPEOF_CHAR(T1),			\
+										 GEN_TYPEOF_CHAR(T2),			\
+										 _strncoll,						\
+										 GEN_GET_DATA(T1, arg1),		\
+										 GEN_GET_SIZE(T1, arg1),		\
+										 GEN_GET_DATA(T2, arg2),		\
+										 GEN_GET_SIZE(T2, arg2),		\
+										 collid);						\
+		PG_FREE_IF_COPY(arg1, 0);										\
+		PG_FREE_IF_COPY(arg2, 1);										\
+		PG_RETURN_BOOL(result op 0);									\
+	}																	\
+}
+
+/* Make the workhorse character-based collation functions. */
+GEN_STRNCOLL(storage_char16_t, storage_char16_t);
+GEN_STRNCOLL(storage_char16_t, char);
+GEN_STRNCOLL(char, storage_char16_t);
+
+/* Make the registered procedures with non-detoasting optimization. */
+#define GEN_RELS_SAME_CHAR(T1, T2)										\
+	GEN_CMP(T1, T2);													\
+	GEN_EQ_SAME_CHAR(T1, T2, eq, ==);									\
+	GEN_EQ_SAME_CHAR(T1, T2, ne, !=);									\
+	GEN_REL(T1, T2, lt, <);												\
+	GEN_REL(T1, T2, le, <=);											\
+	GEN_REL(T1, T2, ge, >=);											\
+	GEN_REL(T1, T2, gt, >);
+
+/* Make the registered procedures without non-detoasting optimization. */
+#define GEN_RELS_DIFF_CHAR(T1, T2)										\
+	GEN_CMP(T1, T2);													\
+	GEN_EQ(T1, T2, eq, ==);												\
+	GEN_EQ(T1, T2, ne, !=);												\
+	GEN_REL(T1, T2, lt, <);												\
+	GEN_REL(T1, T2, le, <=);											\
+	GEN_REL(T1, T2, ge, >=);											\
+	GEN_REL(T1, T2, gt, >);
+
+/* For now, generate only type permutations with utf16 on one side. */
+GEN_RELS_DIFF_CHAR(utf16, text);
+GEN_RELS_DIFF_CHAR(utf16, NameData);
+GEN_RELS_DIFF_CHAR(text, utf16);
+GEN_RELS_DIFF_CHAR(NameData, utf16);
+GEN_RELS_SAME_CHAR(utf16, utf16);
diff --git a/src/include/c.h b/src/include/c.h
index 97ed8c63f5e..07766516bde 100644
--- a/src/include/c.h
+++ b/src/include/c.h
@@ -497,6 +497,8 @@ extern "C++"
  *		Convert the argument to a string, after one round of macro expansion.
  * CppConcat
  *		Concatenate two arguments together, using the C preprocessor.
+ * CppConcat2
+ *		Concatenate two arguments together, after one round of macro expansion.
  *
  * Note: There used to be support here for pre-ANSI C compilers that didn't
  * support # and ##.  Nowadays, these macros are just for clarity and/or
@@ -505,6 +507,7 @@ extern "C++"
 #define CppAsString(identifier) #identifier
 #define CppAsString2(x)			CppAsString(x)
 #define CppConcat(x, y)			x##y
+#define CppConcat2(x, y)		CppConcat(x, y)
 
 /*
  * VA_ARGS_NARGS
@@ -787,6 +790,7 @@ typedef struct varlena
  */
 typedef varlena bytea;
 typedef varlena text;
+typedef varlena utf16;
 typedef varlena BpChar;			/* blank-padded char, ie SQL char(n) */
 typedef varlena VarChar;		/* var-length char, ie SQL varchar(n) */
 
@@ -1508,6 +1512,7 @@ typedef uint32_t char32_t;
 #endif
 #endif
 
+
 /* IWYU pragma: end_exports */
 
 #endif							/* C_H */
diff --git a/src/include/catalog/pg_amop.dat b/src/include/catalog/pg_amop.dat
index 8d5a0004a47..febb5f71f30 100644
--- a/src/include/catalog/pg_amop.dat
+++ b/src/include/catalog/pg_amop.dat
@@ -397,6 +397,85 @@
 { amopfamily => 'btree/text_ops', amoplefttype => 'text',
   amoprighttype => 'name', amopstrategy => '5', amopopr => '>(text,name)',
   amopmethod => 'btree' },
+{ amopfamily => 'btree/text_ops', amoplefttype => 'utf16',
+  amoprighttype => 'utf16', amopstrategy => '1', amopopr => '<(utf16,utf16)',
+  amopmethod => 'btree' },
+{ amopfamily => 'btree/text_ops', amoplefttype => 'utf16',
+  amoprighttype => 'utf16', amopstrategy => '2', amopopr => '<=(utf16,utf16)',
+  amopmethod => 'btree' },
+{ amopfamily => 'btree/text_ops', amoplefttype => 'utf16',
+  amoprighttype => 'utf16', amopstrategy => '3', amopopr => '=(utf16,utf16)',
+  amopmethod => 'btree' },
+{ amopfamily => 'btree/text_ops', amoplefttype => 'utf16',
+  amoprighttype => 'utf16', amopstrategy => '4', amopopr => '>=(utf16,utf16)',
+  amopmethod => 'btree' },
+{ amopfamily => 'btree/text_ops', amoplefttype => 'utf16',
+  amoprighttype => 'utf16', amopstrategy => '5', amopopr => '>(utf16,utf16)',
+  amopmethod => 'btree' },
+{ amopfamily => 'btree/text_ops', amoplefttype => 'utf16',
+  amoprighttype => 'text', amopstrategy => '1', amopopr => '<(utf16,text)',
+  amopmethod => 'btree' },
+{ amopfamily => 'btree/text_ops', amoplefttype => 'utf16',
+  amoprighttype => 'text', amopstrategy => '2', amopopr => '<=(utf16,text)',
+  amopmethod => 'btree' },
+{ amopfamily => 'btree/text_ops', amoplefttype => 'utf16',
+  amoprighttype => 'text', amopstrategy => '3', amopopr => '=(utf16,text)',
+  amopmethod => 'btree' },
+{ amopfamily => 'btree/text_ops', amoplefttype => 'utf16',
+  amoprighttype => 'text', amopstrategy => '4', amopopr => '>=(utf16,text)',
+  amopmethod => 'btree' },
+{ amopfamily => 'btree/text_ops', amoplefttype => 'utf16',
+  amoprighttype => 'text', amopstrategy => '5', amopopr => '>(utf16,text)',
+  amopmethod => 'btree' },
+{ amopfamily => 'btree/text_ops', amoplefttype => 'text',
+  amoprighttype => 'utf16', amopstrategy => '1', amopopr => '<(text,utf16)',
+  amopmethod => 'btree' },
+{ amopfamily => 'btree/text_ops', amoplefttype => 'text',
+  amoprighttype => 'utf16', amopstrategy => '2', amopopr => '<=(text,utf16)',
+  amopmethod => 'btree' },
+{ amopfamily => 'btree/text_ops', amoplefttype => 'text',
+  amoprighttype => 'utf16', amopstrategy => '3', amopopr => '=(text,utf16)',
+  amopmethod => 'btree' },
+{ amopfamily => 'btree/text_ops', amoplefttype => 'text',
+  amoprighttype => 'utf16', amopstrategy => '4', amopopr => '>=(text,utf16)',
+  amopmethod => 'btree' },
+{ amopfamily => 'btree/text_ops', amoplefttype => 'text',
+  amoprighttype => 'utf16', amopstrategy => '5', amopopr => '>(text,utf16)',
+  amopmethod => 'btree' },
+{ amopfamily => 'btree/text_ops', amoplefttype => 'utf16',
+  amoprighttype => 'name', amopstrategy => '1', amopopr => '<(utf16,name)',
+  amopmethod => 'btree' },
+{ amopfamily => 'btree/text_ops', amoplefttype => 'utf16',
+  amoprighttype => 'name', amopstrategy => '2', amopopr => '<=(utf16,name)',
+  amopmethod => 'btree' },
+{ amopfamily => 'btree/text_ops', amoplefttype => 'utf16',
+  amoprighttype => 'name', amopstrategy => '3', amopopr => '=(utf16,name)',
+  amopmethod => 'btree' },
+{ amopfamily => 'btree/text_ops', amoplefttype => 'utf16',
+  amoprighttype => 'name', amopstrategy => '4', amopopr => '>=(utf16,name)',
+  amopmethod => 'btree' },
+{ amopfamily => 'btree/text_ops', amoplefttype => 'utf16',
+  amoprighttype => 'name', amopstrategy => '5', amopopr => '>(utf16,name)',
+  amopmethod => 'btree' },
+{ amopfamily => 'btree/text_ops', amoplefttype => 'name',
+  amoprighttype => 'utf16', amopstrategy => '1', amopopr => '<(name,utf16)',
+  amopmethod => 'btree' },
+{ amopfamily => 'btree/text_ops', amoplefttype => 'name',
+  amoprighttype => 'utf16', amopstrategy => '2', amopopr => '<=(name,utf16)',
+  amopmethod => 'btree' },
+{ amopfamily => 'btree/text_ops', amoplefttype => 'name',
+  amoprighttype => 'utf16', amopstrategy => '3', amopopr => '=(name,utf16)',
+  amopmethod => 'btree' },
+{ amopfamily => 'btree/text_ops', amoplefttype => 'name',
+  amoprighttype => 'utf16', amopstrategy => '4', amopopr => '>=(name,utf16)',
+  amopmethod => 'btree' },
+{ amopfamily => 'btree/text_ops', amoplefttype => 'name',
+  amoprighttype => 'utf16', amopstrategy => '5', amopopr => '>(name,utf16)',
+  amopmethod => 'btree' },
+
+
+
+
 
 # btree bpchar_ops
 
diff --git a/src/include/catalog/pg_amproc.dat b/src/include/catalog/pg_amproc.dat
index 4a1efdbc899..c78891b3f77 100644
--- a/src/include/catalog/pg_amproc.dat
+++ b/src/include/catalog/pg_amproc.dat
@@ -232,6 +232,18 @@
   amprocrighttype => 'text', amprocnum => '2', amproc => 'bttextsortsupport' },
 { amprocfamily => 'btree/text_ops', amproclefttype => 'text',
   amprocrighttype => 'text', amprocnum => '4', amproc => 'btvarstrequalimage' },
+{ amprocfamily => 'btree/text_ops', amproclefttype => 'utf16',
+  amprocrighttype => 'utf16', amprocnum => '1', amproc => 'utf16cmp' },
+{ amprocfamily => 'btree/text_ops', amproclefttype => 'utf16',
+  amprocrighttype => 'utf16', amprocnum => '4', amproc => 'btvarstrequalimage' },
+{ amprocfamily => 'btree/text_ops', amproclefttype => 'utf16',
+  amprocrighttype => 'text', amprocnum => '1', amproc => 'utf16textcmp' },
+{ amprocfamily => 'btree/text_ops', amproclefttype => 'text',
+  amprocrighttype => 'utf16', amprocnum => '1', amproc => 'textutf16cmp' },
+{ amprocfamily => 'btree/text_ops', amproclefttype => 'utf16',
+  amprocrighttype => 'name', amprocnum => '1', amproc => 'utf16namecmp' },
+{ amprocfamily => 'btree/text_ops', amproclefttype => 'name',
+  amprocrighttype => 'utf16', amprocnum => '1', amproc => 'nameutf16cmp' },
 { amprocfamily => 'btree/text_ops', amproclefttype => 'name',
   amprocrighttype => 'name', amprocnum => '1', amproc => 'btnamecmp' },
 { amprocfamily => 'btree/text_ops', amproclefttype => 'name',
diff --git a/src/include/catalog/pg_opclass.dat b/src/include/catalog/pg_opclass.dat
index df170b80840..b3dfabb6956 100644
--- a/src/include/catalog/pg_opclass.dat
+++ b/src/include/catalog/pg_opclass.dat
@@ -127,6 +127,8 @@
   opcintype => 'text' },
 { opcmethod => 'hash', opcname => 'text_ops', opcfamily => 'hash/text_ops',
   opcintype => 'text' },
+{ opcmethod => 'btree', opcname => 'utf16_ops', opcfamily => 'btree/text_ops',
+  opcintype => 'utf16' },
 { opcmethod => 'btree', opcname => 'time_ops', opcfamily => 'btree/time_ops',
   opcintype => 'time' },
 { opcmethod => 'hash', opcname => 'time_ops', opcfamily => 'hash/time_ops',
diff --git a/src/include/catalog/pg_operator.dat b/src/include/catalog/pg_operator.dat
index 1465f13120a..f8f07d74e8b 100644
--- a/src/include/catalog/pg_operator.dat
+++ b/src/include/catalog/pg_operator.dat
@@ -107,6 +107,166 @@
   oprcode => 'starts_with', oprrest => 'prefixsel',
   oprjoin => 'prefixjoinsel' },
 
+# utf16, utf16
+{ oid => '9820', descr => 'equal',
+  oprname => '=', oprcanmerge => 't', oprleft => 'utf16',
+  oprright => 'utf16', oprresult => 'bool', oprcom => '=(utf16,utf16)',
+  oprnegate => '<>(utf16,utf16)', oprcode => 'utf16eq', oprrest => 'eqsel',
+  oprjoin => 'eqjoinsel' },
+{ oid => '9821', descr => 'not equal',
+  oprname => '<>', oprleft => 'utf16',
+  oprright => 'utf16', oprresult => 'bool', oprcom => '<>(utf16,utf16)',
+  oprnegate => '=(utf16,utf16)', oprcode => 'utf16ne', oprrest => 'neqsel',
+  oprjoin => 'neqjoinsel' },
+{ oid => '9825', descr => 'less than',
+  oprname => '<', oprleft => 'utf16',
+  oprright => 'utf16', oprresult => 'bool', oprcom => '>(utf16,utf16)',
+  oprnegate => '>=(utf16,utf16)', oprcode => 'utf16lt', oprrest => 'scalarltsel',
+  oprjoin => 'scalarltjoinsel' },
+{ oid => '9826', descr => 'less than or equal',
+  oprname => '<=', oprleft => 'utf16',
+  oprright => 'utf16', oprresult => 'bool', oprcom => '>=(utf16,utf16)',
+  oprnegate => '>(utf16,utf16)', oprcode => 'utf16le', oprrest => 'scalarlesel',
+  oprjoin => 'scalarlejoinsel' },
+{ oid => '9827', descr => 'greater than',
+  oprname => '>', oprleft => 'utf16',
+  oprright => 'utf16', oprresult => 'bool', oprcom => '<(utf16,utf16)',
+  oprnegate => '<=(utf16,utf16)', oprcode => 'utf16gt', oprrest => 'scalargtsel',
+  oprjoin => 'scalargtjoinsel' },
+{ oid => '9828', descr => 'greater than or equal',
+  oprname => '>=', oprleft => 'utf16',
+  oprright => 'utf16', oprresult => 'bool', oprcom => '<=(utf16,utf16)',
+  oprnegate => '<(utf16,utf16)', oprcode => 'utf16ge', oprrest => 'scalargesel',
+  oprjoin => 'scalargejoinsel' },
+
+# utf16, text
+{ oid => '9860', descr => 'equal',
+  oprname => '=', oprcanmerge => 't', oprleft => 'utf16',
+  oprright => 'text', oprresult => 'bool', oprcom => '=(text,utf16)',
+  oprnegate => '<>(utf16,text)', oprcode => 'utf16texteq', oprrest => 'eqsel',
+  oprjoin => 'eqjoinsel' },
+{ oid => '9861', descr => 'not equal',
+  oprname => '<>', oprleft => 'utf16',
+  oprright => 'text', oprresult => 'bool', oprcom => '<>(text,utf16)',
+  oprnegate => '=(utf16,text)', oprcode => 'utf16textne', oprrest => 'neqsel',
+  oprjoin => 'neqjoinsel' },
+{ oid => '9865', descr => 'less than',
+  oprname => '<', oprleft => 'utf16',
+  oprright => 'text', oprresult => 'bool', oprcom => '>(text,utf16)',
+  oprnegate => '>=(utf16,text)', oprcode => 'utf16textlt', oprrest => 'scalarltsel',
+  oprjoin => 'scalarltjoinsel' },
+{ oid => '9866', descr => 'less than or equal',
+  oprname => '<=', oprleft => 'utf16',
+  oprright => 'text', oprresult => 'bool', oprcom => '>=(text,utf16)',
+  oprnegate => '>(utf16,text)', oprcode => 'utf16textle', oprrest => 'scalarlesel',
+  oprjoin => 'scalarlejoinsel' },
+{ oid => '9867', descr => 'greater than',
+  oprname => '>', oprleft => 'utf16',
+  oprright => 'text', oprresult => 'bool', oprcom => '<(text,utf16)',
+  oprnegate => '<=(utf16,text)', oprcode => 'utf16textgt', oprrest => 'scalargtsel',
+  oprjoin => 'scalargtjoinsel' },
+{ oid => '9868', descr => 'greater than or equal',
+  oprname => '>=', oprleft => 'utf16',
+  oprright => 'text', oprresult => 'bool', oprcom => '<=(text,utf16)',
+  oprnegate => '<(utf16,text)', oprcode => 'utf16textge', oprrest => 'scalargesel',
+  oprjoin => 'scalargejoinsel' },
+
+# text, utf16
+{ oid => '9870', descr => 'equal',
+  oprname => '=', oprcanmerge => 't', oprleft => 'text',
+  oprright => 'utf16', oprresult => 'bool', oprcom => '=(utf16,text)',
+  oprnegate => '<>(text,utf16)', oprcode => 'textutf16eq', oprrest => 'eqsel',
+  oprjoin => 'eqjoinsel' },
+{ oid => '9871', descr => 'not equal',
+  oprname => '<>', oprleft => 'text',
+  oprright => 'utf16', oprresult => 'bool', oprcom => '<>(utf16,text)',
+  oprnegate => '=(text,utf16)', oprcode => 'textutf16ne', oprrest => 'neqsel',
+  oprjoin => 'neqjoinsel' },
+{ oid => '9875', descr => 'less than',
+  oprname => '<', oprleft => 'text',
+  oprright => 'utf16', oprresult => 'bool', oprcom => '>(utf16,text)',
+  oprnegate => '>=(text,utf16)', oprcode => 'textutf16lt', oprrest => 'scalarltsel',
+  oprjoin => 'scalarltjoinsel' },
+{ oid => '9876', descr => 'less than or equal',
+  oprname => '<=', oprleft => 'text',
+  oprright => 'utf16', oprresult => 'bool', oprcom => '>=(utf16,text)',
+  oprnegate => '>(text,utf16)', oprcode => 'textutf16le', oprrest => 'scalarlesel',
+  oprjoin => 'scalarlejoinsel' },
+{ oid => '9877', descr => 'greater than',
+  oprname => '>', oprleft => 'text',
+  oprright => 'utf16', oprresult => 'bool', oprcom => '<(utf16,text)',
+  oprnegate => '<=(text,utf16)', oprcode => 'textutf16gt', oprrest => 'scalargtsel',
+  oprjoin => 'scalargtjoinsel' },
+{ oid => '9878', descr => 'greater than or equal',
+  oprname => '>=', oprleft => 'text',
+  oprright => 'utf16', oprresult => 'bool', oprcom => '<=(utf16,text)',
+  oprnegate => '<(text,utf16)', oprcode => 'textutf16ge', oprrest => 'scalargesel',
+  oprjoin => 'scalargejoinsel' },
+
+# utf16, name
+{ oid => '9460', descr => 'equal',
+  oprname => '=', oprcanmerge => 't', oprleft => 'utf16',
+  oprright => 'name', oprresult => 'bool', oprcom => '=(name,utf16)',
+  oprnegate => '<>(utf16,name)', oprcode => 'utf16nameeq', oprrest => 'eqsel',
+  oprjoin => 'eqjoinsel' },
+{ oid => '9461', descr => 'not equal',
+  oprname => '<>', oprleft => 'utf16',
+  oprright => 'name', oprresult => 'bool', oprcom => '<>(name,utf16)',
+  oprnegate => '=(utf16,name)', oprcode => 'utf16namene', oprrest => 'neqsel',
+  oprjoin => 'neqjoinsel' },
+{ oid => '9465', descr => 'less than',
+  oprname => '<', oprleft => 'utf16',
+  oprright => 'name', oprresult => 'bool', oprcom => '>(name,utf16)',
+  oprnegate => '>=(utf16,name)', oprcode => 'utf16namelt', oprrest => 'scalarltsel',
+  oprjoin => 'scalarltjoinsel' },
+{ oid => '9466', descr => 'less than or equal',
+  oprname => '<=', oprleft => 'utf16',
+  oprright => 'name', oprresult => 'bool', oprcom => '>=(name,utf16)',
+  oprnegate => '>(utf16,name)', oprcode => 'utf16namele', oprrest => 'scalarlesel',
+  oprjoin => 'scalarlejoinsel' },
+{ oid => '9467', descr => 'greater than',
+  oprname => '>', oprleft => 'utf16',
+  oprright => 'name', oprresult => 'bool', oprcom => '<(name,utf16)',
+  oprnegate => '<=(utf16,name)', oprcode => 'utf16namegt', oprrest => 'scalargtsel',
+  oprjoin => 'scalargtjoinsel' },
+{ oid => '9468', descr => 'greater than or equal',
+  oprname => '>=', oprleft => 'utf16',
+  oprright => 'name', oprresult => 'bool', oprcom => '<=(name,utf16)',
+  oprnegate => '<(utf16,name)', oprcode => 'utf16namege', oprrest => 'scalargesel',
+  oprjoin => 'scalargejoinsel' },
+
+# name, utf16
+{ oid => '9470', descr => 'equal',
+  oprname => '=', oprcanmerge => 't', oprleft => 'name',
+  oprright => 'utf16', oprresult => 'bool', oprcom => '=(utf16,name)',
+  oprnegate => '<>(name,utf16)', oprcode => 'nameutf16eq', oprrest => 'eqsel',
+  oprjoin => 'eqjoinsel' },
+{ oid => '9471', descr => 'not equal',
+  oprname => '<>', oprleft => 'name',
+  oprright => 'utf16', oprresult => 'bool', oprcom => '<>(utf16,name)',
+  oprnegate => '=(name,utf16)', oprcode => 'nameutf16ne', oprrest => 'neqsel',
+  oprjoin => 'neqjoinsel' },
+{ oid => '9475', descr => 'less than',
+  oprname => '<', oprleft => 'name',
+  oprright => 'utf16', oprresult => 'bool', oprcom => '>(utf16,name)',
+  oprnegate => '>=(name,utf16)', oprcode => 'nameutf16lt', oprrest => 'scalarltsel',
+  oprjoin => 'scalarltjoinsel' },
+{ oid => '9476', descr => 'less than or equal',
+  oprname => '<=', oprleft => 'name',
+  oprright => 'utf16', oprresult => 'bool', oprcom => '>=(utf16,name)',
+  oprnegate => '>(name,utf16)', oprcode => 'nameutf16le', oprrest => 'scalarlesel',
+  oprjoin => 'scalarlejoinsel' },
+{ oid => '9477', descr => 'greater than',
+  oprname => '>', oprleft => 'name',
+  oprright => 'utf16', oprresult => 'bool', oprcom => '<(utf16,name)',
+  oprnegate => '<=(name,utf16)', oprcode => 'nameutf16gt', oprrest => 'scalargtsel',
+  oprjoin => 'scalargtjoinsel' },
+{ oid => '9478', descr => 'greater than or equal',
+  oprname => '>=', oprleft => 'name',
+  oprright => 'utf16', oprresult => 'bool', oprcom => '<=(utf16,name)',
+  oprnegate => '<(name,utf16)', oprcode => 'nameutf16ge', oprrest => 'scalargesel',
+  oprjoin => 'scalargejoinsel' },
+
 { oid => '254', oid_symbol => 'NameEqualTextOperator', descr => 'equal',
   oprname => '=', oprcanmerge => 't', oprcanhash => 't', oprleft => 'name',
   oprright => 'text', oprresult => 'bool', oprcom => '=(text,name)',
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index fa9ae79082b..97019deca7e 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -97,9 +97,17 @@
 { oid => '46', descr => 'I/O',
   proname => 'textin', prorettype => 'text', proargtypes => 'cstring',
   prosrc => 'textin' },
+{ oid => '8890', descr => 'I/O',
+  proname => 'utf16in', prorettype => 'utf16',
+  proargtypes => 'cstring',
+  prosrc => 'utf16in' },
 { oid => '47', descr => 'I/O',
   proname => 'textout', prorettype => 'cstring', proargtypes => 'text',
   prosrc => 'textout' },
+{ oid => '8891', descr => 'I/O',
+  proname => 'utf16out', prorettype => 'cstring',
+  proargtypes => 'utf16',
+  prosrc => 'utf16out' },
 { oid => '48', descr => 'I/O',
   proname => 'tidin', prorettype => 'tid', proargtypes => 'cstring',
   prosrc => 'tidin' },
@@ -264,6 +272,116 @@
   proname => 'version', provolatile => 's', prorettype => 'text',
   proargtypes => '', prosrc => 'pgsql_version' },
 
+{ oid => '8935', descr => 'less-equal-greater',
+  proname => 'utf16cmp', prorettype => 'int4',
+  proargtypes => 'utf16 utf16', prosrc => 'utf16cmp' },
+{ oid => '8901',
+  proname => 'utf16le', prorettype => 'bool',
+  proargtypes => 'utf16 utf16', prosrc => 'utf16le' },
+{ oid => '8902',
+  proname => 'utf16lt', prorettype => 'bool',
+  proargtypes => 'utf16 utf16', prosrc => 'utf16lt' },
+{ oid => '8907',
+  proname => 'utf16gt', prorettype => 'bool',
+  proargtypes => 'utf16 utf16', prosrc => 'utf16gt' },
+{ oid => '8904',
+  proname => 'utf16ge', prorettype => 'bool',
+  proargtypes => 'utf16 utf16', prosrc => 'utf16ge' },
+{ oid => '8900',
+  proname => 'utf16eq', prorettype => 'bool',
+  proargtypes => 'utf16 utf16', prosrc => 'utf16eq' },
+{ oid => '8905',
+  proname => 'utf16ne', prorettype => 'bool',
+  proargtypes => 'utf16 utf16', prosrc => 'utf16ne' },
+
+{ oid => '8972', descr => 'less-equal-greater',
+  proname => 'textutf16cmp', prorettype => 'int4',
+  proargtypes => 'text utf16', prosrc => 'textutf16cmp' },
+{ oid => '8941',
+  proname => 'textutf16le', prorettype => 'bool',
+  proargtypes => 'text utf16', prosrc => 'textutf16le' },
+{ oid => '8942',
+  proname => 'textutf16lt', prorettype => 'bool',
+  proargtypes => 'text utf16', prosrc => 'textutf16lt' },
+{ oid => '8947',
+  proname => 'textutf16gt', prorettype => 'bool',
+  proargtypes => 'text utf16', prosrc => 'textutf16gt' },
+{ oid => '8944',
+  proname => 'textutf16ge', prorettype => 'bool',
+  proargtypes => 'text utf16', prosrc => 'textutf16ge' },
+{ oid => '8970',
+  proname => 'textutf16eq', prorettype => 'bool',
+  proargtypes => 'text utf16', prosrc => 'textutf16eq' },
+{ oid => '8971',
+  proname => 'textutf16ne', prorettype => 'bool',
+  proargtypes => 'text utf16', prosrc => 'textutf16ne' },
+
+{ oid => '8932', descr => 'less-equal-greater',
+  proname => 'utf16textcmp', prorettype => 'int4',
+  proargtypes => 'utf16 text', prosrc => 'utf16textcmp' },
+{ oid => '8951',
+  proname => 'utf16textle', prorettype => 'bool',
+  proargtypes => 'utf16 text', prosrc => 'utf16textle' },
+{ oid => '8952',
+  proname => 'utf16textlt', prorettype => 'bool',
+  proargtypes => 'utf16 text', prosrc => 'utf16textlt' },
+{ oid => '8957',
+  proname => 'utf16textgt', prorettype => 'bool',
+  proargtypes => 'utf16 text', prosrc => 'utf16textgt' },
+{ oid => '8954',
+  proname => 'utf16textge', prorettype => 'bool',
+  proargtypes => 'utf16 text', prosrc => 'utf16textge' },
+{ oid => '8980',
+  proname => 'utf16texteq', prorettype => 'bool',
+  proargtypes => 'utf16 text', prosrc => 'utf16texteq' },
+{ oid => '8981',
+  proname => 'utf16textne', prorettype => 'bool',
+  proargtypes => 'utf16 text', prosrc => 'utf16textne' },
+
+{ oid => '8672', descr => 'less-equal-greater',
+  proname => 'nameutf16cmp', prorettype => 'int4',
+  proargtypes => 'name utf16', prosrc => 'nameutf16cmp' },
+{ oid => '8641',
+  proname => 'nameutf16le', prorettype => 'bool',
+  proargtypes => 'name utf16', prosrc => 'nameutf16le' },
+{ oid => '8642',
+  proname => 'nameutf16lt', prorettype => 'bool',
+  proargtypes => 'name utf16', prosrc => 'nameutf16lt' },
+{ oid => '8647',
+  proname => 'nameutf16gt', prorettype => 'bool',
+  proargtypes => 'name utf16', prosrc => 'nameutf16gt' },
+{ oid => '8644',
+  proname => 'nameutf16ge', prorettype => 'bool',
+  proargtypes => 'name utf16', prosrc => 'nameutf16ge' },
+{ oid => '8670',
+  proname => 'nameutf16eq', prorettype => 'bool',
+  proargtypes => 'name utf16', prosrc => 'nameutf16eq' },
+{ oid => '8671',
+  proname => 'nameutf16ne', prorettype => 'bool',
+  proargtypes => 'name utf16', prosrc => 'nameutf16ne' },
+
+{ oid => '8472', descr => 'less-equal-greater',
+  proname => 'utf16namecmp', prorettype => 'int4',
+  proargtypes => 'utf16 name', prosrc => 'utf16namecmp' },
+{ oid => '8451',
+  proname => 'utf16namele', prorettype => 'bool',
+  proargtypes => 'utf16 name', prosrc => 'utf16namele' },
+{ oid => '8452',
+  proname => 'utf16namelt', prorettype => 'bool',
+  proargtypes => 'utf16 name', prosrc => 'utf16namelt' },
+{ oid => '8457',
+  proname => 'utf16namegt', prorettype => 'bool',
+  proargtypes => 'utf16 name', prosrc => 'utf16namegt' },
+{ oid => '8454',
+  proname => 'utf16namege', prorettype => 'bool',
+  proargtypes => 'utf16 name', prosrc => 'utf16namege' },
+{ oid => '8470',
+  proname => 'utf16nameeq', prorettype => 'bool',
+  proargtypes => 'utf16 name', prosrc => 'utf16nameeq' },
+{ oid => '8471',
+  proname => 'utf16namene', prorettype => 'bool',
+  proargtypes => 'utf16 name', prosrc => 'utf16namene' },
+
 { oid => '86', descr => 'I/O',
   proname => 'pg_ddl_command_in', prorettype => 'pg_ddl_command',
   proargtypes => 'cstring', prosrc => 'pg_ddl_command_in' },
@@ -2875,6 +2993,9 @@
 { oid => '1317', descr => 'length',
   proname => 'length', prorettype => 'int4', proargtypes => 'text',
   prosrc => 'textlen' },
+{ oid => '8861', descr => 'length',
+  proname => 'length', prorettype => 'int4', proargtypes => 'utf16',
+  prosrc => 'utf16_length' },
 { oid => '1318', descr => 'character length',
   proname => 'length', prorettype => 'int4', proargtypes => 'bpchar',
   prosrc => 'bpcharlen' },
@@ -2992,6 +3113,9 @@
 { oid => '1375', descr => 'octet length',
   proname => 'octet_length', prorettype => 'int4', proargtypes => 'bpchar',
   prosrc => 'bpcharoctetlen' },
+{ oid => '8860', descr => 'octet length',
+  proname => 'octet_length', prorettype => 'int4', proargtypes => 'utf16',
+  prosrc => 'utf16_octet_length' },
 
 { oid => '1377', descr => 'larger of two',
   proname => 'time_larger', prorettype => 'time', proargtypes => 'time time',
@@ -8303,9 +8427,15 @@
 { oid => '2414', descr => 'I/O',
   proname => 'textrecv', provolatile => 's', prorettype => 'text',
   proargtypes => 'internal', prosrc => 'textrecv' },
+{ oid => '8892', descr => 'I/O',
+  proname => 'utf16recv', provolatile => 's', prorettype => 'utf16',
+  proargtypes => 'internal', prosrc => 'utf16recv' },
 { oid => '2415', descr => 'I/O',
   proname => 'textsend', provolatile => 's', prorettype => 'bytea',
   proargtypes => 'text', prosrc => 'textsend' },
+{ oid => '8893', descr => 'I/O',
+  proname => 'utf16send', provolatile => 's', prorettype => 'bytea',
+  proargtypes => 'utf16', prosrc => 'utf16send' },
 { oid => '2416', descr => 'I/O',
   proname => 'unknownrecv', prorettype => 'unknown', proargtypes => 'internal',
   prosrc => 'unknownrecv' },
diff --git a/src/include/catalog/pg_type.dat b/src/include/catalog/pg_type.dat
index a1a753d1797..130dce0adf6 100644
--- a/src/include/catalog/pg_type.dat
+++ b/src/include/catalog/pg_type.dat
@@ -84,6 +84,14 @@
   typispreferred => 't', typinput => 'textin', typoutput => 'textout',
   typreceive => 'textrecv', typsend => 'textsend', typalign => 'i',
   typstorage => 'x', typcollation => 'default' },
+{ oid => '8888', array_type_oid => '8889',
+  descr => 'variable-length UTF-16 string',
+  typname => 'utf16', typlen => '-1', typbyval => 'f', typcategory => 'S',
+  typispreferred => 'f',
+  typinput => 'utf16in', typoutput => 'utf16out',
+  typreceive => 'utf16recv', typsend => 'utf16send',
+  typalign => 'i',
+  typstorage => 'x', typcollation => 'default' },
 { oid => '26', array_type_oid => '1028',
   descr => 'object identifier(oid), maximum 4 billion',
   typname => 'oid', typlen => '4', typbyval => 't', typcategory => 'N',
diff --git a/src/include/fmgr.h b/src/include/fmgr.h
index 10d02bdb79f..75a86af4baa 100644
--- a/src/include/fmgr.h
+++ b/src/include/fmgr.h
@@ -291,6 +291,7 @@ extern varlena *pg_detoast_datum_packed(varlena *datum);
 /* DatumGetFoo macros for varlena types will typically look like this: */
 #define DatumGetByteaPP(X)			((bytea *) PG_DETOAST_DATUM_PACKED(X))
 #define DatumGetTextPP(X)			((text *) PG_DETOAST_DATUM_PACKED(X))
+#define DatumGetUtf16PP(X)			((utf16 *) PG_DETOAST_DATUM_PACKED(X))
 #define DatumGetBpCharPP(X)			((BpChar *) PG_DETOAST_DATUM_PACKED(X))
 #define DatumGetVarCharPP(X)		((VarChar *) PG_DETOAST_DATUM_PACKED(X))
 #define DatumGetHeapTupleHeader(X)	((HeapTupleHeader) PG_DETOAST_DATUM(X))
@@ -308,6 +309,7 @@ extern varlena *pg_detoast_datum_packed(varlena *datum);
 /* GETARG macros for varlena types will typically look like this: */
 #define PG_GETARG_BYTEA_PP(n)		DatumGetByteaPP(PG_GETARG_DATUM(n))
 #define PG_GETARG_TEXT_PP(n)		DatumGetTextPP(PG_GETARG_DATUM(n))
+#define PG_GETARG_UTF16_PP(n)		DatumGetUtf16PP(PG_GETARG_DATUM(n))
 #define PG_GETARG_BPCHAR_PP(n)		DatumGetBpCharPP(PG_GETARG_DATUM(n))
 #define PG_GETARG_VARCHAR_PP(n)		DatumGetVarCharPP(PG_GETARG_DATUM(n))
 #define PG_GETARG_HEAPTUPLEHEADER(n)	DatumGetHeapTupleHeader(PG_GETARG_DATUM(n))
@@ -372,6 +374,7 @@ extern varlena *pg_detoast_datum_packed(varlena *datum);
 /* RETURN macros for other pass-by-ref types will typically look like this: */
 #define PG_RETURN_BYTEA_P(x)   PG_RETURN_POINTER(x)
 #define PG_RETURN_TEXT_P(x)    PG_RETURN_POINTER(x)
+#define PG_RETURN_UTF16_P(x)   PG_RETURN_POINTER(x)
 #define PG_RETURN_BPCHAR_P(x)  PG_RETURN_POINTER(x)
 #define PG_RETURN_VARCHAR_P(x) PG_RETURN_POINTER(x)
 #define PG_RETURN_HEAPTUPLEHEADER(x)  return HeapTupleHeaderGetDatum(x)
diff --git a/src/include/mb/pg_wchar.h b/src/include/mb/pg_wchar.h
index deee2a832c3..f7e8179ad45 100644
--- a/src/include/mb/pg_wchar.h
+++ b/src/include/mb/pg_wchar.h
@@ -367,6 +367,7 @@ typedef uint32 (*utf_local_conversion_func) (uint32 code);
 
 /*
  * Some handy functions for Unicode-specific tests.
+ * XXX Move these to unicode.h?
  */
 static inline bool
 is_valid_unicode_codepoint(char32_t c)
@@ -392,6 +393,21 @@ surrogate_pair_to_codepoint(char16_t first, char16_t second)
 	return ((first & 0x3FF) << 10) + 0x10000 + (second & 0x3FF);
 }
 
+static inline bool
+codepoint_has_surrogate_pair(char32_t c)
+{
+	return c > 0xFFFF;			/* too big for a single 16-bit code unit */
+}
+
+static inline void
+codepoint_to_surrogate_pair(char16_t *first, char16_t *second, char32_t c)
+{
+	Assert(codepoint_has_surrogate_pair(c));
+	c -= 0x10000;				/* offset above the 16-bit range */
+	*first = 0xD800 + (c >> 10);	/* upper bits of the offset */
+	*second = 0xDC00 + (c & 0x3FF); /* lower 10 bits of the offset */
+}
+
 /*
  * Convert a UTF-8 character to a Unicode code point.
  * This is a one-character version of pg_utf2wchar_with_len.
@@ -469,6 +485,24 @@ unicode_utf8len(char32_t c)
 		return 4;
 }
 
+/*
+ * Length in bytes of the UTF-8 sequence introduced by lead byte c.  A byte
+ * that matches none of the four lead-byte patterns is counted as 1.
+ */
+static inline int
+utf8_len_from_lead_byte(unsigned char c)
+{
+	if (c < 0x80)
+		return 1;				/* 0xxxxxxx */
+	if (c >= 0xc0 && c < 0xe0)
+		return 2;				/* 110xxxxx */
+	if (c >= 0xe0 && c < 0xf0)
+		return 3;				/* 1110xxxx */
+	if (c >= 0xf0 && c < 0xf8)
+		return 4;				/* 11110xxx */
+
+	return 1;					/* 10xxxxxx or 11111xxx */
+}
+
 /*
  * The functions in this list are exported by libpq, and we need to be sure
  * that we know which calls are satisfied by libpq and which are satisfied
diff --git a/src/include/mb/string_iterator.h b/src/include/mb/string_iterator.h
new file mode 100644
index 00000000000..1a1bbce879f
--- /dev/null
+++ b/src/include/mb/string_iterator.h
@@ -0,0 +1,500 @@
+/*-------------------------------------------------------------------------
+ *
+ * string_iterator.h
+ *	  Tools for iterating over text strings as char, char16_t or char32_t.
+ *
+ * Support for UTF-16 and UTF-32 is degraded if the database encoding is not
+ * UTF8: only the 8-bit subset (LATIN1) or 7-bit subset of Unicode that can be
+ * cast directly is supported, and out-of-range codepoints raise errors.
+ *
+ * XXX Data provision via callbacks could be investigated as a way to support
+ * incremental or deferred detoasting with centralized infrastructure, to
+ * avoid the need to open-code detoasting optimizations at every site.
+ *
+ * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/mb/string_iterator.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef STRING_ITERATOR_H
+#define STRING_ITERATOR_H
+
+#include "mb/pg_wchar.h"
+#include "mb/unicode_types.h"
+
+/* A device for iterating over multibyte strings. */
+typedef struct mb_iterator
+{
+	const char *p;				/* next unread byte */
+	const char *end;			/* one past the last byte (data + size) */
+	int			encoding;		/* encoding the string is stored in */
+	char16_t	surrogate;		/* second half of a pair not yet returned, or 0 */
+} mb_iterator;
+
+/* An iterator for iterating over UTF-16 strings. */
+typedef struct char16_iterator
+{
+	const storage_char16_t *p;	/* next unread code unit */
+	const storage_char16_t *end;	/* one past the last code unit */
+} char16_iterator;
+
+/* Static initializers for the above; 'size' counts elements of 'data'. */
+#define MB_ITERATOR_INIT(data, size, encoding) \
+	{(data), (data) + (size), (encoding)}
+#define MB_ITERATOR_INIT_LOCAL(data, size) \
+	{(data), (data) + (size), GetDatabaseEncoding()}
+#define CHAR16_ITERATOR_INIT(data, size) {(data), (data) + (size)}
+
+/* Function pointer types for selecting one per-encoding specialization. */
+typedef char32_t (*mb_iterator_next_char32_t_fn) (mb_iterator *);
+typedef size_t (*mb_iterator_store_char16_fn) (mb_iterator *, storage_char16_t *);
+typedef char32_t (*char16_iterator_next_char32_t_fn) (char16_iterator *);
+typedef size_t (*char16_iterator_store_mb_fn) (char16_iterator *, char *, int encoding);
+
+static inline void
+mb_iterator_begin(mb_iterator *iterator, const char *data, size_t size)
+{
+	iterator->p = data;
+	iterator->end = data + size;
+	iterator->surrogate = 0;	/* NOTE(review): ->encoding is left untouched */
+}
+
+static inline bool
+mb_iterator_has_more(mb_iterator *iterator)
+{
+	return iterator->surrogate != 0 || iterator->p < iterator->end;
+}
+
+static inline void
+mb_iterator_check_ascii_range(mb_iterator *iterator,
+							  const char *target_encoding,
+							  unsigned char c)
+{
+	if (unlikely(c > 0x7f))		/* outside the 7-bit range: report it */
+		elog(ERROR,
+			 "no conversion from \"%s\" to \"%s\" is available for the sequence beginning 0x%02x",
+			 pg_encoding_to_char(iterator->encoding),
+			 target_encoding,
+			 c);
+}
+
+/* Copy the next multibyte character to dst; return its length in bytes. */
+static inline size_t
+mb_iterator_store(mb_iterator *iterator, char *dst)
+{
+	size_t		nbytes;
+
+	Assert(mb_iterator_has_more(iterator));
+	nbytes = pg_mblen_range(iterator->p, iterator->end);
+	memcpy(dst, iterator->p, nbytes);
+	iterator->p += nbytes;
+	return nbytes;
+}
+
+static inline size_t
+mb_iterator_store__sb(mb_iterator *iterator, char *dst)
+{
+	Assert(pg_encoding_max_length(iterator->encoding) == 1);
+	Assert(mb_iterator_has_more(iterator));
+	*dst = *iterator->p++;		/* single-byte encoding: copy just one byte */
+	return 1;
+}
+
+/* Copy the next character so that it ends just before dst; return length. */
+static inline size_t
+mb_iterator_store_before(mb_iterator *iterator, char *dst)
+{
+	const char *src = iterator->p;
+	size_t		nbytes;
+
+	Assert(mb_iterator_has_more(iterator));
+	nbytes = pg_mblen_range(src, iterator->end);
+	memcpy(dst - nbytes, src, nbytes);
+	iterator->p = src + nbytes;
+	return nbytes;
+}
+
+static inline size_t
+mb_iterator_store_before__sb(mb_iterator *iterator, char *dst)
+{
+	Assert(pg_encoding_max_length(iterator->encoding) == 1);
+	Assert(mb_iterator_has_more(iterator));
+
+	dst[-1] = *iterator->p++;	/* the byte lands immediately before dst */
+	return 1;
+}
+
+static inline char32_t
+mb_iterator_next_char32_t__ascii(mb_iterator *iterator)
+{
+	unsigned char byte;
+
+	Assert(mb_iterator_has_more(iterator));
+
+	/* Consume one byte; it is its own code point if it passes the check. */
+	byte = (unsigned char) *iterator->p++;
+	mb_iterator_check_ascii_range(iterator, "UTF-32", byte);
+	return byte;
+}
+
+static inline char32_t
+mb_iterator_next_char32_t__latin1(mb_iterator *iterator)
+{
+	Assert(iterator->encoding == PG_LATIN1);
+	Assert(mb_iterator_has_more(iterator));
+
+	/* Consume one byte; read as unsigned, a LATIN1 byte is its code point. */
+	return (unsigned char) *iterator->p++;
+}
+
+/* Decode one UTF-8 character; reports invalid encoding if cut off by 'end'. */
+static inline char32_t
+mb_iterator_next_char32_t__utf8(mb_iterator *iterator)
+{
+	const char *p = iterator->p;
+	size_t		size;
+
+	Assert(iterator->encoding == PG_UTF8);
+	Assert(mb_iterator_has_more(iterator));
+	/* Compare lengths, so that no pointer beyond 'end' is ever formed. */
+	size = utf8_len_from_lead_byte((unsigned char) *p);
+	if (size > (size_t) (iterator->end - p))
+		report_invalid_encoding(iterator->encoding, p, iterator->end - p);
+	iterator->p += size;
+	return utf8_to_unicode((const unsigned char *) p);
+}
+
+/* Return the next character as a UTF-32 code point. */
+static inline char32_t
+mb_iterator_next_char32_t(mb_iterator *iterator)
+{
+	int			encoding = iterator->encoding;
+
+	if (encoding == PG_UTF8)
+		return mb_iterator_next_char32_t__utf8(iterator);
+	if (encoding == PG_LATIN1)
+		return mb_iterator_next_char32_t__latin1(iterator);
+	/* Every other encoding goes through the 7-bit ASCII path. */
+	return mb_iterator_next_char32_t__ascii(iterator);
+}
+
+/* Return the next byte as a UTF-16 code unit; it must be 7-bit ASCII. */
+static inline char16_t
+mb_iterator_next_char16_t__ascii(mb_iterator *iterator)
+{
+	unsigned char c;
+
+	Assert(mb_iterator_has_more(iterator));
+	/* Consume exactly one byte, then hand back that same (checked) byte. */
+	c = *iterator->p++;
+	mb_iterator_check_ascii_range(iterator, "UTF-16", c);
+	return c;
+}
+
+/*
+ * Return the next LATIN1 byte as a UTF-16 code unit.
+ */
+static inline char16_t
+mb_iterator_next_char16_t__latin1(mb_iterator *iterator)
+{
+	Assert(iterator->encoding == PG_LATIN1);
+	Assert(mb_iterator_has_more(iterator));
+	/* LATIN1 (unsigned) can be cast directly to char16_t. */
+	return (unsigned char) *iterator->p++;
+}
+
+static inline char16_t
+mb_iterator_next_char16_t__utf8(mb_iterator *iterator)
+{
+	char16_t	unit;
+	char32_t	cp;
+
+	Assert(iterator->encoding == PG_UTF8);
+	Assert(mb_iterator_has_more(iterator));
+
+	/* A second half left over from the previous call goes out first. */
+	unit = iterator->surrogate;
+	if (unlikely(unit))
+	{
+		iterator->surrogate = 0;
+		return unit;
+	}
+
+	cp = mb_iterator_next_char32_t__utf8(iterator);
+	if (unlikely(codepoint_has_surrogate_pair(cp)))
+	{
+		/* Hand back the first half now, stash the second for next time. */
+		codepoint_to_surrogate_pair(&unit, &iterator->surrogate, cp);
+		return unit;
+	}
+
+	return cp;
+}
+
+/* Return the next UTF-16 code unit, dispatching on the encoding. */
+static inline char16_t
+mb_iterator_next_char16_t(mb_iterator *iterator)
+{
+	int			encoding = iterator->encoding;
+
+	if (encoding == PG_UTF8)
+		return mb_iterator_next_char16_t__utf8(iterator);
+	if (encoding == PG_LATIN1)
+		return mb_iterator_next_char16_t__latin1(iterator);
+	/* Every other encoding goes through the 7-bit ASCII path. */
+	return mb_iterator_next_char16_t__ascii(iterator);
+}
+
+static inline size_t
+mb_iterator_store_char16__ascii(mb_iterator *iterator, storage_char16_t *dst)
+{
+	unsigned char byte;
+
+	Assert(mb_iterator_has_more(iterator));
+
+	/* Consume one byte; once range-checked it is stored as one code unit. */
+	byte = (unsigned char) *iterator->p++;
+	mb_iterator_check_ascii_range(iterator, "UTF-16", byte);
+	char16_store(dst, byte);
+	return 1;
+}
+
+/* Store the next LATIN1 byte at dst as one UTF-16 code unit; returns 1. */
+static inline size_t
+mb_iterator_store_char16__latin1(mb_iterator *iterator, storage_char16_t *dst)
+{
+	unsigned char byte;
+
+	Assert(iterator->encoding == PG_LATIN1);
+	Assert(mb_iterator_has_more(iterator));
+
+	byte = (unsigned char) *iterator->p++;
+	char16_store(dst, byte);	/* unsigned LATIN1 byte is the code point */
+	return 1;
+}
+
+/* Store the next character at dst as UTF-16; returns code units written. */
+static inline size_t
+mb_iterator_store_char16__utf8(mb_iterator *iterator, storage_char16_t *dst)
+{
+	char32_t	cp;
+
+	Assert(iterator->encoding == PG_UTF8);
+	Assert(mb_iterator_has_more(iterator));
+
+	cp = mb_iterator_next_char32_t__utf8(iterator);
+	if (unlikely(codepoint_has_surrogate_pair(cp)))
+	{
+		char16_t	half1;
+		char16_t	half2;
+
+		codepoint_to_surrogate_pair(&half1, &half2, cp);
+		char16_store(&dst[0], half1);
+		char16_store(&dst[1], half2);
+		return 2;
+	}
+	char16_store(dst, cp);		/* fits in a single code unit */
+	return 1;
+}
+
+/* Store the next character at dst as UTF-16; returns code units written. */
+static inline size_t
+mb_iterator_store_char16(mb_iterator *iterator, storage_char16_t *dst)
+{
+	int			encoding = iterator->encoding;
+
+	if (encoding == PG_UTF8)
+		return mb_iterator_store_char16__utf8(iterator, dst);
+	if (encoding == PG_LATIN1)
+		return mb_iterator_store_char16__latin1(iterator, dst);
+	/* Every other encoding goes through the 7-bit ASCII path. */
+	return mb_iterator_store_char16__ascii(iterator, dst);
+}
+
+/* Point the iterator at 'size' UTF-16 code units starting at 'data'. */
+static inline void
+char16_iterator_begin(char16_iterator *iterator,
+					  const storage_char16_t *data, size_t size)
+{
+	iterator->end = data + size;
+	iterator->p = data;
+}
+
+static inline bool
+char16_iterator_has_more(char16_iterator *iterator)
+{
+	return iterator->end > iterator->p; /* unread code units remain */
+}
+
+static inline void
+char16_iterator_report_short_pair(char16_t codepoint1)
+{
+	elog(ERROR, "invalid UTF-16 sequence 0x%04x", codepoint1);	/* one unit */
+}
+
+static inline void
+char16_iterator_report_bad_pair(char16_t codepoint1, char16_t codepoint2)
+{
+	elog(ERROR, "invalid UTF-16 sequence 0x%04x 0x%04x",
+		 codepoint1, codepoint2);	/* both units are shown */
+}
+
+static inline char32_t
+char16_iterator_next_char32_t(char16_iterator *iterator)
+{
+	char32_t	codepoint;
+
+	Assert(char16_iterator_has_more(iterator));
+	codepoint = char16_load(iterator->p++);
+	/* NOTE(review): the second unit is combined without being validated. */
+	if (unlikely(is_utf16_surrogate_first(codepoint)))
+	{
+		if (!char16_iterator_has_more(iterator))
+			char16_iterator_report_short_pair(codepoint);
+		codepoint = surrogate_pair_to_codepoint(codepoint,
+												char16_load(iterator->p++));
+	}
+
+	return codepoint;
+}
+
+static inline char16_t
+char16_iterator_next_char16_t(char16_iterator *iterator)
+{
+	Assert(char16_iterator_has_more(iterator));
+	return char16_load(iterator->p++);	/* one raw unit, pairs not merged */
+}
+
+/* Store one code unit as a single byte; raises ERROR if above max_char. */
+static pg_attribute_always_inline size_t
+char16_iterator_store_mb__sb(char16_iterator *iterator, char *dst,
+							 unsigned char max_char, int encoding)
+{
+	char16_t	unit;
+
+	Assert(char16_iterator_has_more(iterator));
+
+	unit = char16_load(iterator->p++);
+	if (unlikely(unit > max_char))
+		elog(ERROR,
+			 "no conversion from \"UTF-16\" to \"%s\" is available for the codepoint %04x",
+			 pg_encoding_to_char(encoding), unit);
+
+	*dst = unit;
+	return 1;
+}
+
+static inline size_t
+char16_iterator_store_mb__ascii(char16_iterator *iterator, char *dst,
+								int encoding)
+{
+	/* Only code units up to 0x7f are accepted; they are stored unchanged. */
+	return char16_iterator_store_mb__sb(iterator, dst, 0x7f, encoding);
+}
+
+static inline size_t
+char16_iterator_store_mb__latin1(char16_iterator *iterator, char *dst,
+								 int encoding)
+{
+	/* Only code units up to 0xff are accepted; they are stored unchanged. */
+	Assert(encoding == PG_LATIN1);
+	return char16_iterator_store_mb__sb(iterator, dst, 0xff, encoding);
+}
+
+/* Store one character at dst as UTF-8, combining and checking surrogates. */
+static inline size_t
+char16_iterator_store_mb__utf8(char16_iterator *iterator, char *dst,
+							   int encoding)
+{
+	char32_t	codepoint;
+
+	Assert(encoding == PG_UTF8);
+	Assert(char16_iterator_has_more(iterator));
+	codepoint = char16_load(iterator->p++);
+	if (unlikely(is_utf16_surrogate_first(codepoint)))
+	{
+		char16_t	codepoint2;
+
+		if (unlikely(!char16_iterator_has_more(iterator)))
+			char16_iterator_report_short_pair(codepoint);
+		codepoint2 = char16_load(iterator->p++);
+		if (unlikely(!is_utf16_surrogate_second(codepoint2)))
+			char16_iterator_report_bad_pair(codepoint, codepoint2);
+		codepoint = surrogate_pair_to_codepoint(codepoint, codepoint2);
+	}
+	else if (unlikely(is_utf16_surrogate_second(codepoint)))
+		char16_iterator_report_short_pair(codepoint);	/* unpaired */
+
+	unicode_to_utf8(codepoint, (unsigned char *) dst);
+	return unicode_utf8len(codepoint);
+}
+
+/*
+ * The destination must have space for MAX_MB_LEN_PER_CHAR32_CODEPOINT bytes,
+ * because UTF-16 surrogate pairs are combined to UTF-32.
+ *
+ * Call one of the specializations directly to avoid dispatching overhead.
+ */
+static inline size_t
+char16_iterator_store_mb(char16_iterator *iterator, char *dst, int encoding)
+{
+	if (encoding == PG_UTF8)
+		return char16_iterator_store_mb__utf8(iterator, dst, encoding);
+	if (encoding == PG_LATIN1)
+		return char16_iterator_store_mb__latin1(iterator, dst, encoding);
+
+	/*
+	 * Every other encoding goes through the 7-bit ASCII path.
+	 */
+	return char16_iterator_store_mb__ascii(iterator, dst, encoding);
+}
+
+/* Like char16_iterator_store_mb(), but targets the database encoding. */
+static inline size_t
+char16_iterator_store_local(char16_iterator *iterator, char *dst)
+{
+	int			encoding = GetDatabaseEncoding();
+
+	return char16_iterator_store_mb(iterator, dst, encoding);
+}
+
+/*
+ * Step over one character (two code units if it starts a surrogate pair).
+ */
+static inline void
+char16_iterator_advance(char16_iterator *iterator)
+{
+	char16_t	unit;
+
+	Assert(char16_iterator_has_more(iterator));
+	unit = char16_load(iterator->p++);
+	if (unlikely(is_utf16_surrogate_first(unit)))
+	{
+		if (!char16_iterator_has_more(iterator))
+			char16_iterator_report_short_pair(unit);
+		iterator->p += 1;		/* step over the second half unexamined */
+	}
+}
+
+/*
+ * Skip up to n UTF-32 codepoints, stopping early if the string runs out.
+ * Returns how many codepoints were actually skipped.
+ */
+static inline size_t
+char16_iterator_advance_n(char16_iterator *iterator, size_t n)
+{
+	size_t		skipped;
+
+	for (skipped = 0; skipped < n; skipped++)
+	{
+		if (!char16_iterator_has_more(iterator))
+			break;
+		char16_iterator_advance(iterator);
+	}
+
+	return skipped;
+}
+
+#endif							/* STRING_ITERATOR_H */
diff --git a/src/include/mb/unicode_strings.h b/src/include/mb/unicode_strings.h
new file mode 100644
index 00000000000..84f6edbfa83
--- /dev/null
+++ b/src/include/mb/unicode_strings.h
@@ -0,0 +1,254 @@
+/*-------------------------------------------------------------------------
+ *
+ * unicode_strings.h
+ *	  Support functions for converting and comparing Unicode encodings.
+ *
+ * Limited support is available in all database encodings, but only the ASCII
+ * or LATIN1 range that maps directly to Unicode.
+ *
+ * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/mb/unicode_strings.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef UNICODE_STRINGS_H
+#define UNICODE_STRINGS_H
+
+#include "mb/string_iterator.h"
+#include "mb/unicode_types.h"
+
+/*
+ * Worst-case ratios for sizing conversion buffers.  One UTF-16 code unit in
+ * the basic plane needs up to 3 bytes of UTF-8; a surrogate pair needs 4,
+ * i.e. 2 per unit.  A UTF-8 byte yields at most one UTF-16 or UTF-32 unit.
+ */
+#define MAX_CHAR16_CODEPOINTS_PER_UTF8_BYTE 1
+#define MAX_CHAR32_CODEPOINTS_PER_UTF8_BYTE 1
+#define MAX_CHAR16_CODEPOINTS_PER_UTF8_CHAR 2
+#define MAX_UTF8_LEN_PER_CHAR16_CODEPOINT   3
+#define MAX_UTF8_LEN_PER_CHAR32_CODEPOINT   4
+
+/*
+ * Only ASCII and LATIN1 (strict subsets of Unicode by definition) can be
+ * transcoded besides UTF-8, and they need just 1 byte per codepoint, so the
+ * UTF-8 limits cover every database encoding.  The generic names are used
+ * in anticipation of potential transcoding support.
+ */
+#define MAX_MB_LEN_PER_CHAR16_CODEPOINT   MAX_UTF8_LEN_PER_CHAR16_CODEPOINT
+#define MAX_MB_LEN_PER_CHAR32_CODEPOINT   MAX_UTF8_LEN_PER_CHAR32_CODEPOINT
+#define MAX_CHAR16_CODEPOINTS_PER_MB_BYTE MAX_CHAR16_CODEPOINTS_PER_UTF8_BYTE
+#define MAX_CHAR32_CODEPOINTS_PER_MB_BYTE MAX_CHAR32_CODEPOINTS_PER_UTF8_BYTE
+
+/*
+ * Upper bound on the UTF-16 codepoints needed for a database encoding string
+ * of the given size in bytes.  TODO: Also provide an _exact_size() function?
+ */
+static inline size_t
+mb_to_char16_max_size(size_t size)
+{
+	return MAX_CHAR16_CODEPOINTS_PER_MB_BYTE * size;
+}
+
+/*
+ * Upper bound on the database encoding bytes needed for a UTF-16 string of
+ * the given size in codepoints.  TODO: Also provide _exact_size()?
+ */
+static inline size_t
+char16_to_mb_max_size(size_t size)
+{
+	return MAX_MB_LEN_PER_CHAR16_CODEPOINT * size;
+}
+
+static pg_attribute_always_inline size_t
+mb_to_char16__template(storage_char16_t *dst,
+					   const char *src, size_t src_size, int src_encoding,
+					   mb_iterator_store_char16_fn store_char16)
+{
+	mb_iterator reader = MB_ITERATOR_INIT(src, src_size, src_encoding);
+	storage_char16_t *out;
+
+	/* store_char16 returns how far to advance the output position. */
+	for (out = dst; mb_iterator_has_more(&reader);)
+		out += store_char16(&reader, out);
+	return out - dst;
+}
+
+#define GENERATE_MB_TO_CHAR16(encoding) \
+static inline size_t \
+mb_to_char16__##encoding(storage_char16_t *dst, \
+						const char *src, size_t src_size, int src_encoding) \
+{ \
+	return mb_to_char16__template(dst, src, src_size, src_encoding, \
+								 mb_iterator_store_char16__##encoding); \
+}
+GENERATE_MB_TO_CHAR16(utf8);	/* defines mb_to_char16__utf8() */
+GENERATE_MB_TO_CHAR16(latin1);	/* defines mb_to_char16__latin1() */
+GENERATE_MB_TO_CHAR16(ascii);	/* defines mb_to_char16__ascii() */
+
+/* Convert src to UTF-16 in dst, choosing the specialization at run time. */
+static inline size_t
+mb_to_char16(storage_char16_t *dst,
+			 const char *src, size_t src_size, int src_encoding)
+{
+	if (src_encoding == PG_UTF8)
+		return mb_to_char16__utf8(dst, src, src_size, src_encoding);
+	if (src_encoding == PG_LATIN1)
+		return mb_to_char16__latin1(dst, src, src_size, src_encoding);
+
+	/* Any other encoding is handled by the ASCII specialization. */
+	return mb_to_char16__ascii(dst, src, src_size, src_encoding);
+}
+
+/* mb_to_char16() with the database encoding as the source encoding. */
+static inline size_t
+local_to_char16(storage_char16_t *dst, const char *src, size_t src_size)
+{
+	return mb_to_char16(dst, src, src_size, GetDatabaseEncoding());
+}
+
+static pg_attribute_always_inline size_t
+char16_to_mb__template(char *dst, int dst_encoding,
+					   const storage_char16_t *src, size_t src_size,
+					   char16_iterator_store_mb_fn store_mb)
+{
+	char16_iterator reader = CHAR16_ITERATOR_INIT(src, src_size);
+	char	   *out;
+
+	/* store_mb returns how far to advance the output position. */
+	for (out = dst; char16_iterator_has_more(&reader);)
+		out += store_mb(&reader, out, dst_encoding);
+	return out - dst;
+}
+
+/*
+ * Try to inline char16_iterator_store_mb__XXX specializations into
+ * char16_to_mb__XXX specializations.
+ */
+#define GENERATE_CHAR16_TO_MB(encoding) \
+static inline size_t \
+char16_to_mb__##encoding(char *dst, int dst_encoding, \
+						const storage_char16_t *src, size_t src_size) \
+{ \
+	return char16_to_mb__template(dst, dst_encoding, src, src_size, \
+								 char16_iterator_store_mb__##encoding); \
+}
+GENERATE_CHAR16_TO_MB(ascii);
+GENERATE_CHAR16_TO_MB(latin1);
+GENERATE_CHAR16_TO_MB(utf8);
+
+/* Convert UTF-16 src into dst_encoding, choosing the variant at run time. */
+static inline size_t
+char16_to_mb(char *dst, int dst_encoding,
+			 const storage_char16_t *src, size_t src_size)
+{
+	if (dst_encoding == PG_UTF8)
+		return char16_to_mb__utf8(dst, dst_encoding, src, src_size);
+	if (dst_encoding == PG_LATIN1)
+		return char16_to_mb__latin1(dst, dst_encoding, src, src_size);
+
+	/* Any other encoding is handled by the ASCII specialization. */
+	return char16_to_mb__ascii(dst, dst_encoding, src, src_size);
+}
+
+/* char16_to_mb() with the database encoding as the destination. */
+static inline size_t
+char16_to_local(char *dst, const storage_char16_t *src, size_t src_size)
+{
+	return char16_to_mb(dst, GetDatabaseEncoding(), src, src_size);
+}
+
+static inline size_t
+char16_to_local_cstr(char *dst, const storage_char16_t *src, size_t src_size)
+{
+	size_t		nbytes = char16_to_local(dst, src, src_size);
+
+	dst[nbytes] = '\0';			/* terminator goes after the converted bytes */
+	return nbytes;
+}
+
+/*
+ * Compare a UTF-16 string with a multibyte string, codepoint by codepoint,
+ * returning -1, 0 or 1.  next_char32_t decodes the multibyte side.
+ */
+static pg_attribute_always_inline int
+char16_mb_cmp__template(const storage_char16_t *data1, size_t size1,
+						const char *data2, size_t size2, int encoding2,
+						mb_iterator_next_char32_t_fn next_char32_t)
+{
+	char16_iterator lhs = CHAR16_ITERATOR_INIT(data1, size1);
+	mb_iterator rhs = MB_ITERATOR_INIT(data2, size2, encoding2);
+
+	while (char16_iterator_has_more(&lhs) &&
+		   mb_iterator_has_more(&rhs))
+	{
+		char32_t	left = char16_iterator_next_char32_t(&lhs);
+		char32_t	right = next_char32_t(&rhs);
+
+		if (left != right)
+			return (left < right) ? -1 : 1;
+	}
+
+	/* Equal so far: whichever side still has codepoints sorts later. */
+	if (mb_iterator_has_more(&rhs))
+		return -1;
+	return char16_iterator_has_more(&lhs) ? 1 : 0;
+}
+
+/*
+ * Generate char16_mb_cmp__XXX with mb_iterator_next_char32_t__XXX inlined.
+ * The result type is int, since the template returns -1, 0 or 1.
+ */
+#define GENERATE_CHAR16_MB_CMP(encoding) \
+static inline int \
+char16_mb_cmp__##encoding(const storage_char16_t *data1, size_t size1, \
+						 const char *data2, size_t size2, int encoding2) \
+{ \
+	return char16_mb_cmp__template(data1, size1, data2, size2, encoding2, \
+								  mb_iterator_next_char32_t__##encoding); \
+}
+GENERATE_CHAR16_MB_CMP(ascii);
+GENERATE_CHAR16_MB_CMP(latin1);
+GENERATE_CHAR16_MB_CMP(utf8);
+
+/* Compare UTF-16 with a multibyte string, choosing the variant at run time. */
+static inline int
+char16_mb_cmp(const storage_char16_t *data1, size_t size1,
+			  const char *data2, size_t size2, int encoding2)
+{
+	if (encoding2 == PG_UTF8)
+		return char16_mb_cmp__utf8(data1, size1, data2, size2, encoding2);
+	if (encoding2 == PG_LATIN1)
+		return char16_mb_cmp__latin1(data1, size1, data2, size2, encoding2);
+	return char16_mb_cmp__ascii(data1, size1, data2, size2, encoding2);
+}
+
+/* char16_mb_cmp() with the multibyte string as the left operand. */
+static inline int
+mb_char16_cmp(const char *data1, size_t size1, int encoding1,
+			  const storage_char16_t *data2, size_t size2)
+{
+	int			cmp = char16_mb_cmp(data2, size2, data1, size1, encoding1);
+
+	INVERT_COMPARE_RESULT(cmp);	/* the operands were swapped above */
+	return cmp;
+}
+
+/* char16_mb_cmp() against a string in the database encoding. */
+static inline int
+char16_local_cmp(const storage_char16_t *data1, size_t size1,
+				 const char *data2, size_t size2)
+{
+	return char16_mb_cmp(data1, size1, data2, size2, GetDatabaseEncoding());
+}
+
+/* mb_char16_cmp() with the left string in the database encoding. */
+static inline int
+local_char16_cmp(const char *data1, size_t size1,
+				 const storage_char16_t *data2, size_t size2)
+{
+	return mb_char16_cmp(data1, size1, GetDatabaseEncoding(), data2, size2);
+}
+
+#endif							/* UNICODE_STRINGS_H */
diff --git a/src/include/mb/unicode_types.h b/src/include/mb/unicode_types.h
new file mode 100644
index 00000000000..e6e85029266
--- /dev/null
+++ b/src/include/mb/unicode_types.h
@@ -0,0 +1,61 @@
+/*-------------------------------------------------------------------------
+ *
+ * unicode_types.h
+ *	  Types for representing Unicode.
+ *
+ * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/mb/unicode_types.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef UNICODE_TYPES_H
+#define UNICODE_TYPES_H
+
+/*
+ * The type used to represent UTF-16 codepoints in varlena objects.  All
+ * access to storage_char16_t and knowledge of its layout should be contained
+ * in this file.
+ */
+typedef struct storage_char16_t
+{
+	uint8_t		high;			/* most significant byte, stored first */
+	uint8_t		low;			/* least significant byte */
+} storage_char16_t;
+
+static_assert(alignof(storage_char16_t) == 1, "bad alignment for varlena");
+static_assert(sizeof(storage_char16_t) == sizeof(char16_t), "bad size");
+
+/* Assemble a native char16_t from the two stored bytes, high byte first. */
+static inline char16_t
+char16_load(const storage_char16_t *s)
+{
+	return (char16_t) (((unsigned int) s->high << 8) | s->low);
+}
+
+/* Split a native char16_t into the two stored bytes, high and low. */
+static inline void
+char16_store(storage_char16_t *s, char16_t c)
+{
+	s->low = (uint8_t) (c & 0xFF);
+	s->high = (uint8_t) (c >> 8);
+}
+
+/* Code unit order of two equal-size strings, via big-endian memcmp(). */
+static inline int
+char16_cmp1(const storage_char16_t *s1,
+			const storage_char16_t *s2,
+			size_t size)
+{
+	/* NOTE(review): surrogates sort below U+E000..U+FFFF here; intended? */
+	return memcmp(s1, s2, sizeof(storage_char16_t) * size);
+}
+
+/* Point ICU's UCharIterator at 'size' storage_char16_t units, as UTF-16BE. */
+#define UITER_SET_STORAGE_CHAR_T(iterator, s, size) \
+	uiter_setUTF16BE((iterator), \
+					 (const char *) (s), \
+					 (size) * sizeof(storage_char16_t))
+
+#endif							/* UNICODE_TYPES_H */
diff --git a/src/include/utils/pg_locale.h b/src/include/utils/pg_locale.h
index 444350bb803..e468e57e358 100644
--- a/src/include/utils/pg_locale.h
+++ b/src/include/utils/pg_locale.h
@@ -13,6 +13,7 @@
 #define _PG_LOCALE_
 
 #include "mb/pg_wchar.h"
+#include "mb/unicode_types.h"
 
 /* use for libc locale names */
 #define LOCALE_NAME_BUFLEN 128
@@ -67,6 +68,16 @@ struct collate_methods
 							 const char *arg2, ssize_t len2,
 							 pg_locale_t locale);
 
+	/* optional */
+	int			(*strncoll_char16) (const storage_char16_t *arg1, size_t len1,
+									const storage_char16_t *arg2, size_t len2,
+									pg_locale_t locale);
+
+	/* optional */
+	int			(*strncoll_char16_local) (const storage_char16_t *arg1, size_t len1,
+										  const char *arg2, size_t len2,
+										  pg_locale_t locale);
+
 	/* required */
 	size_t		(*strnxfrm) (char *dest, size_t destsize,
 							 const char *src, ssize_t srclen,
@@ -188,6 +199,15 @@ extern size_t pg_downcase_ident(char *dst, size_t dstsize,
 extern int	pg_strcoll(const char *arg1, const char *arg2, pg_locale_t locale);
 extern int	pg_strncoll(const char *arg1, ssize_t len1,
 						const char *arg2, ssize_t len2, pg_locale_t locale);
+extern int	pg_strncoll_char16(const storage_char16_t *data1, size_t size1,
+							   const storage_char16_t *data2, size_t size2,
+							   pg_locale_t locale);
+extern int	pg_strncoll_char16_local(const storage_char16_t *data1, size_t size1,
+									 const char *data2, size_t size2,
+									 pg_locale_t locale);
+extern int	pg_strncoll_local_char16(const char *data1, size_t size1,
+									 const storage_char16_t *data2, size_t size2,
+									 pg_locale_t locale);
 extern bool pg_strxfrm_enabled(pg_locale_t locale);
 extern size_t pg_strxfrm(char *dest, const char *src, size_t destsize,
 						 pg_locale_t locale);
diff --git a/src/test/regress/expected/encoding.out b/src/test/regress/expected/encoding.out
index 2ecd255f182..4222f01c675 100644
--- a/src/test/regress/expected/encoding.out
+++ b/src/test/regress/expected/encoding.out
@@ -8,8 +8,12 @@ SELECT getdatabaseencoding() <> 'UTF8' AS skip_test \gset
 \set regresslib :libdir '/regress' :dlsuffix
 CREATE FUNCTION test_bytea_to_text(bytea) RETURNS text
     AS :'regresslib' LANGUAGE C STRICT;
+CREATE FUNCTION test_bytea_to_utf16(bytea) RETURNS utf16
+    AS :'regresslib' LANGUAGE C STRICT;
 CREATE FUNCTION test_text_to_bytea(text) RETURNS bytea
     AS :'regresslib' LANGUAGE C STRICT;
+CREATE FUNCTION test_utf16_to_bytea(text) RETURNS bytea
+    AS :'regresslib' LANGUAGE C STRICT;
 CREATE FUNCTION test_mblen_func(text, text, text, int) RETURNS int
     AS :'regresslib' LANGUAGE C STRICT;
 CREATE FUNCTION test_text_to_wchars(text, text) RETURNS int[]
@@ -401,6 +405,32 @@ SELECT SUBSTRING(c FROM 3000 FOR 1) FROM toast_4b_utf8;
  🚀
 (1 row)
 
+-- storage format of UTF-16 is big-endian even on little-endian system
+SELECT test_bytea_to_utf16('\x0041') = 'A';
+ ?column? 
+----------
+ t
+(1 row)
+
+-- odd number of bytes (corrupted storage), trailing byte ignored
+SELECT test_bytea_to_utf16('\x004100') = 'A';
+ ?column? 
+----------
+ f
+(1 row)
+
+SELECT test_bytea_to_utf16('\x004100');
+ test_bytea_to_utf16 
+---------------------
+ A
+(1 row)
+
+-- incomplete surrogate pair
+SELECT test_bytea_to_utf16('\xd83d');
+ERROR:  invalid UTF-16 sequence 0xd83d
+-- bad second codepoint in surrogate pair
+SELECT test_bytea_to_utf16('\xd83dbeef');
+ERROR:  invalid UTF-16 sequence 0xd83d 0xbeef
 DROP TABLE encoding_tests;
 DROP TABLE toast_4b_utf8;
 DROP FUNCTION test_encoding;
@@ -409,7 +439,9 @@ DROP FUNCTION test_text_to_wchars;
 DROP FUNCTION test_valid_server_encoding;
 DROP FUNCTION test_mblen_func;
 DROP FUNCTION test_bytea_to_text;
+DROP FUNCTION test_bytea_to_utf16;
 DROP FUNCTION test_text_to_bytea;
+DROP FUNCTION test_utf16_to_bytea;
 -- substring slow path: multi-byte escape char vs. multi-byte pattern char.
 SELECT SUBSTRING('a' SIMILAR U&'\00AC' ESCAPE U&'\00A7');
  substring 
diff --git a/src/test/regress/expected/type_sanity.out b/src/test/regress/expected/type_sanity.out
index 1d21d3eb446..3391c1984a0 100644
--- a/src/test/regress/expected/type_sanity.out
+++ b/src/test/regress/expected/type_sanity.out
@@ -751,6 +751,7 @@ CREATE TABLE tab_core_types AS SELECT
   'abc'::varchar,
   'name'::name,
   'txt'::text,
+  'utf16'::utf16,
   true::bool,
   E'\\xDEADBEEF'::bytea,
   B'10001'::bit,
diff --git a/src/test/regress/expected/unicode.out b/src/test/regress/expected/unicode.out
index 1e06de22649..4b761eec6e9 100644
--- a/src/test/regress/expected/unicode.out
+++ b/src/test/regress/expected/unicode.out
@@ -105,3 +105,137 @@ ORDER BY num;
 
 SELECT is_normalized('abc', 'def');  -- run-time error
 ERROR:  invalid normalization form: def
+-- Interesting thresholds for UTF-8 and UTF-16 encoding
+WITH octet_length_thresholds(t, description) AS (VALUES
+  (U&'\+000001', 'First 1-byte UTF-8 sequence supported by PostgreSQL'),
+  (U&'\+00007F', 'Final 1-byte UTF-8 sequence'),
+  (U&'\+000080', 'First 2-byte UTF-8 sequence'),
+  (U&'\+0007FF', 'Final 2-byte UTF-8 sequence'),
+  (U&'\+000800', 'First 3-byte UTF-8 sequence'),
+  (U&'\+00FFFF', 'Final 3-byte UTF-8 sequence (end of BMP)'),
+  (U&'\+010000', 'First 4-byte UTF-8 sequence, UTF-16 pair'),
+  (U&'\+10FFFF', 'Final valid codepoint'))
+SELECT to_hex(ascii(t)),
+       description,
+       octet_length(t::text) AS utf8_octets,
+       octet_length(t::utf16) AS utf16_octets,
+       length(t::text) AS utf8_length,
+       length(t::utf16) AS utf16_length
+FROM octet_length_thresholds;
+ to_hex |                     description                     | utf8_octets | utf16_octets | utf8_length | utf16_length 
+--------+-----------------------------------------------------+-------------+--------------+-------------+--------------
+ 1      | First 1-byte UTF-8 sequence supported by PostgreSQL |           1 |            2 |           1 |            1
+ 7f     | Final 1-byte UTF-8 sequence                         |           1 |            2 |           1 |            1
+ 80     | First 2-byte UTF-8 sequence                         |           2 |            2 |           1 |            1
+ 7ff    | Final 2-byte UTF-8 sequence                         |           2 |            2 |           1 |            1
+ 800    | First 3-byte UTF-8 sequence                         |           3 |            2 |           1 |            1
+ ffff   | Final 3-byte UTF-8 sequence (end of BMP)            |           3 |            2 |           1 |            1
+ 10000  | First 4-byte UTF-8 sequence, UTF-16 pair            |           4 |            4 |           1 |            1
+ 10ffff | Final valid codepoint                               |           4 |            4 |           1 |            1
+(8 rows)
+
+-- Out of range codepoints
+SELECT U&'\+000000';
+ERROR:  invalid Unicode escape value
+LINE 1: SELECT U&'\+000000';
+                  ^
+SELECT U&'\+110000';
+ERROR:  invalid Unicode escape value
+LINE 1: SELECT U&'\+110000';
+                  ^
+CREATE FUNCTION check_text_op(left_string text,
+                              left_type text,
+                              right_string text,
+                              right_type text,
+                              op text)
+RETURNS boolean
+LANGUAGE plpgsql
+AS
+$$
+DECLARE
+  format text;
+  text_e text;
+  text_i int;
+  text_b boolean;
+  test_e text;
+  test_i int;
+  test_b boolean;
+BEGIN
+  -- all cross-type results against text, text
+  IF op = 'cmp' THEN
+    format := '%s%scmp(''%s''::%s, ''%s''::%s)';
+    text_e := format(format, 'bttext', '', left_string, 'text', right_string, 'text');
+    EXECUTE format('SELECT sign(%s)', text_e) INTO text_i;
+    test_e := format(format, left_type, CASE WHEN left_type = right_type THEN '' ELSE right_type END, left_string, left_type, right_string, right_type);
+    EXECUTE format('SELECT sign(%s)', test_e) INTO test_i;
+    IF test_i <> text_i THEN
+      RAISE NOTICE '% -> %, but % -> %', text_e, text_i, test_e, test_i;
+    END IF;
+  ELSE
+    format := '''%s''::%s %s ''%s''::%s';
+    text_e := format(format, left_string, 'text', op, right_string, 'text');
+    EXECUTE format('SELECT %s', text_e) INTO text_b;
+    test_e := format(format, left_string, left_type, op, right_string, right_type);
+    EXECUTE format('SELECT %s', test_e) INTO test_b;
+    IF test_b <> text_b THEN
+      RAISE NOTICE '% -> %, but % -> %', text_e, text_b, test_e, test_b;
+    END IF;
+  END IF;
+  RETURN true;
+END;
+$$;
+WITH strings (s) AS (VALUES ('a'), ('aa'), ('aaa')),
+     ops (o)     AS (VALUES ('<'), ('<='), ('='), ('<>'), ('>='), ('>'), ('cmp')),
+     types (t)   AS (VALUES ('text'), ('name'), ('utf16'))
+SELECT count(check_text_op(left_string.s,
+                           left_type.t,
+                           right_string.s,
+                           right_type.t,
+                           op.o))
+FROM       strings left_string
+CROSS JOIN strings right_string
+CROSS JOIN types   left_type
+CROSS JOIN types   right_type
+CROSS JOIN ops     op
+WHERE left_type.t = 'utf16' OR right_type.t = 'utf16';
+ count 
+-------
+   315
+(1 row)
+
+WITH examples(language, string) AS
+(VALUES
+  ('English',  'In a hole in the ground there lived a hobbit.'),
+  ('Spanish',  'En un agujero en el suelo, vivía un hobbit.'),
+  ('Russian',  'В норе под землей жил-был хоббит.'),
+  ('Arabic',   'كان يعيش هوبيت في حفرة في الأرض.'),
+  ('Hebrew',   'בתוך חור באדמה חי הוביט.'),
+  ('Greek',    'Σε μια τρύπα στο έδαφος ζούσε ένα χόμπιτ.'),
+  ('Korean',   '땅속 어느 구멍에 한 호빗이 살고 있었다.'),
+  ('Hindi',    'जमीन में बने एक गड्ढे में एक हॉबिट रहता था।'),
+  ('Tamil',    'அந்த நிலத்தில் ஒரு துளையில் ஒரு ஹாபிட் வசித்து வந்தது.'),
+  ('Chinese',  '在地下一个洞里，住着一个霍比特人。'),
+  ('Japanese', '穴のなかに、ひとりのホビットが暮らしていた。'))
+SELECT language,
+       octet_length(string::text) || '→' || octet_length(string::utf16) AS octets,
+       to_char(ROUND(100 *
+                     ((octet_length(string::utf16)::float /
+                      (octet_length(string::text)::float) - 1.0))),
+               'S999%') AS delta,
+       string
+FROM examples;
+ language | octets  | delta |                     string                      
+----------+---------+-------+-------------------------------------------------
+ English  | 45→90   | +100% | In a hole in the ground there lived a hobbit.
+ Spanish  | 44→86   |  +95% | En un agujero en el suelo, vivía un hobbit.
+ Russian  | 59→66   |  +12% | В норе под землей жил-был хоббит.
+ Arabic   | 57→64   |  +12% | كان يعيش هوبيت في حفرة في الأرض.
+ Hebrew   | 43→48   |  +12% | בתוך חור באדמה חי הוביט.
+ Greek    | 74→82   |  +11% | Σε μια τρύπα στο έδαφος ζούσε ένα χόμπιτ.
+ Korean   | 55→46   |  -16% | 땅속 어느 구멍에 한 호빗이 살고 있었다.
+ Hindi    | 111→86  |  -23% | जमीन में बने एक गड्ढे में एक हॉबिट रहता था।
+ Tamil    | 146→108 |  -26% | அந்த நிலத்தில் ஒரு துளையில் ஒரு ஹாபிட் வசித்து வந்தது.
+ Chinese  | 51→34   |  -33% | 在地下一个洞里，住着一个霍比特人。
+ Japanese | 66→44   |  -33% | 穴のなかに、ひとりのホビットが暮らしていた。
+(11 rows)
+
diff --git a/src/test/regress/regress.c b/src/test/regress/regress.c
index 74fe6fdce9d..617c719a804 100644
--- a/src/test/regress/regress.c
+++ b/src/test/regress/regress.c
@@ -1144,6 +1144,22 @@ test_text_to_bytea(PG_FUNCTION_ARGS)
 	PG_RETURN_BYTEA_P(PG_GETARG_TEXT_PP(0));
 }
 
+/* Return the bytea argument relabeled as utf16, with no validation. */
+PG_FUNCTION_INFO_V1(test_bytea_to_utf16);
+Datum
+test_bytea_to_utf16(PG_FUNCTION_ARGS)
+{
+	PG_RETURN_UTF16_P(PG_GETARG_BYTEA_PP(0));
+}
+
+/* The reverse.  NOTE(review): the SQL declares the argument as text. */
+PG_FUNCTION_INFO_V1(test_utf16_to_bytea);
+Datum
+test_utf16_to_bytea(PG_FUNCTION_ARGS)
+{
+	PG_RETURN_BYTEA_P(PG_GETARG_UTF16_PP(0));
+}
+
 /* Corruption tests in C. */
 PG_FUNCTION_INFO_V1(test_mblen_func);
 Datum
diff --git a/src/test/regress/sql/encoding.sql b/src/test/regress/sql/encoding.sql
index 07d7dc8ff18..7a0e386f5fb 100644
--- a/src/test/regress/sql/encoding.sql
+++ b/src/test/regress/sql/encoding.sql
@@ -11,8 +11,12 @@ SELECT getdatabaseencoding() <> 'UTF8' AS skip_test \gset
 
 CREATE FUNCTION test_bytea_to_text(bytea) RETURNS text
     AS :'regresslib' LANGUAGE C STRICT;
+CREATE FUNCTION test_bytea_to_utf16(bytea) RETURNS utf16
+    AS :'regresslib' LANGUAGE C STRICT;
 CREATE FUNCTION test_text_to_bytea(text) RETURNS bytea
     AS :'regresslib' LANGUAGE C STRICT;
+CREATE FUNCTION test_utf16_to_bytea(text) RETURNS bytea
+    AS :'regresslib' LANGUAGE C STRICT;
 CREATE FUNCTION test_mblen_func(text, text, text, int) RETURNS int
     AS :'regresslib' LANGUAGE C STRICT;
 CREATE FUNCTION test_text_to_wchars(text, text) RETURNS int[]
@@ -219,6 +223,16 @@ ALTER TABLE toast_3b_utf8 RENAME TO toast_4b_utf8;
 UPDATE toast_4b_utf8 SET c = repeat(U&'\+01F680', 3000);
 SELECT SUBSTRING(c FROM 3000 FOR 1) FROM toast_4b_utf8;
 
+-- storage format of UTF-16 is big-endian even on little-endian system
+SELECT test_bytea_to_utf16('\x0041') = 'A';
+-- odd number of bytes (corrupted storage), trailing byte ignored
+SELECT test_bytea_to_utf16('\x004100') = 'A';
+SELECT test_bytea_to_utf16('\x004100');
+-- incomplete surrogate pair
+SELECT test_bytea_to_utf16('\xd83d');
+-- bad second codepoint in surrogate pair
+SELECT test_bytea_to_utf16('\xd83dbeef');
+
 DROP TABLE encoding_tests;
 DROP TABLE toast_4b_utf8;
 DROP FUNCTION test_encoding;
@@ -227,7 +241,9 @@ DROP FUNCTION test_text_to_wchars;
 DROP FUNCTION test_valid_server_encoding;
 DROP FUNCTION test_mblen_func;
 DROP FUNCTION test_bytea_to_text;
+DROP FUNCTION test_bytea_to_utf16;
 DROP FUNCTION test_text_to_bytea;
+DROP FUNCTION test_utf16_to_bytea;
 
 
 -- substring slow path: multi-byte escape char vs. multi-byte pattern char.
diff --git a/src/test/regress/sql/type_sanity.sql b/src/test/regress/sql/type_sanity.sql
index 95d5b6e0915..c14f0a3c82f 100644
--- a/src/test/regress/sql/type_sanity.sql
+++ b/src/test/regress/sql/type_sanity.sql
@@ -567,6 +567,7 @@ CREATE TABLE tab_core_types AS SELECT
   'abc'::varchar,
   'name'::name,
   'txt'::text,
+  'utf16'::utf16,
   true::bool,
   E'\\xDEADBEEF'::bytea,
   B'10001'::bit,
diff --git a/src/test/regress/sql/unicode.sql b/src/test/regress/sql/unicode.sql
index e50adb68ed0..0eeb86801b6 100644
--- a/src/test/regress/sql/unicode.sql
+++ b/src/test/regress/sql/unicode.sql
@@ -36,3 +36,104 @@ FROM
 ORDER BY num;
 
 SELECT is_normalized('abc', 'def');  -- run-time error
+
+-- Interesting thresholds for UTF-8 and UTF-16 encoding
+
+WITH octet_length_thresholds(t, description) AS (VALUES
+  (U&'\+000001', 'First 1-byte UTF-8 sequence supported by PostgreSQL'),
+  (U&'\+00007F', 'Final 1-byte UTF-8 sequence'),
+  (U&'\+000080', 'First 2-byte UTF-8 sequence'),
+  (U&'\+0007FF', 'Final 2-byte UTF-8 sequence'),
+  (U&'\+000800', 'First 3-byte UTF-8 sequence'),
+  (U&'\+00FFFF', 'Final 3-byte UTF-8 sequence (end of BMP)'),
+  (U&'\+010000', 'First 4-byte UTF-8 sequence, UTF-16 pair'),
+  (U&'\+10FFFF', 'Final valid codepoint'))
+SELECT to_hex(ascii(t)),
+       description,
+       octet_length(t::text) AS utf8_octets,
+       octet_length(t::utf16) AS utf16_octets,
+       length(t::text) AS utf8_length,
+       length(t::utf16) AS utf16_length
+FROM octet_length_thresholds;
+-- Out of range codepoints
+SELECT U&'\+000000';
+SELECT U&'\+110000';
+
+CREATE FUNCTION check_text_op(left_string text,
+                              left_type text,
+                              right_string text,
+                              right_type text,
+                              op text)
+RETURNS boolean
+LANGUAGE plpgsql
+AS
+$$
+DECLARE
+  format text;
+  text_e text;
+  text_i int;
+  text_b boolean;
+  test_e text;
+  test_i int;
+  test_b boolean;
+BEGIN
+  -- all cross-type results against text, text
+  IF op = 'cmp' THEN
+    format := '%s%scmp(''%s''::%s, ''%s''::%s)';
+    text_e := format(format, 'bttext', '', left_string, 'text', right_string, 'text');
+    EXECUTE format('SELECT sign(%s)', text_e) INTO text_i;
+    test_e := format(format, left_type, CASE WHEN left_type = right_type THEN '' ELSE right_type END, left_string, left_type, right_string, right_type);
+    EXECUTE format('SELECT sign(%s)', test_e) INTO test_i;
+    IF test_i <> text_i THEN
+      RAISE NOTICE '% -> %, but % -> %', text_e, text_i, test_e, test_i;
+    END IF;
+  ELSE
+    format := '''%s''::%s %s ''%s''::%s';
+    text_e := format(format, left_string, 'text', op, right_string, 'text');
+    EXECUTE format('SELECT %s', text_e) INTO text_b;
+    test_e := format(format, left_string, left_type, op, right_string, right_type);
+    EXECUTE format('SELECT %s', test_e) INTO test_b;
+    IF test_b <> text_b THEN
+      RAISE NOTICE '% -> %, but % -> %', text_e, text_b, test_e, test_b;
+    END IF;
+  END IF;
+  RETURN true;
+END;
+$$;
+
+WITH strings (s) AS (VALUES ('a'), ('aa'), ('aaa')),
+     ops (o)     AS (VALUES ('<'), ('<='), ('='), ('<>'), ('>='), ('>'), ('cmp')),
+     types (t)   AS (VALUES ('text'), ('name'), ('utf16'))
+SELECT count(check_text_op(left_string.s,
+                           left_type.t,
+                           right_string.s,
+                           right_type.t,
+                           op.o))
+FROM       strings left_string
+CROSS JOIN strings right_string
+CROSS JOIN types   left_type
+CROSS JOIN types   right_type
+CROSS JOIN ops     op
+WHERE left_type.t = 'utf16' OR right_type.t = 'utf16';
+
+WITH examples(language, string) AS
+(VALUES
+  ('English',  'In a hole in the ground there lived a hobbit.'),
+  ('Spanish',  'En un agujero en el suelo, vivía un hobbit.'),
+  ('Russian',  'В норе под землей жил-был хоббит.'),
+  ('Arabic',   'كان يعيش هوبيت في حفرة في الأرض.'),
+  ('Hebrew',   'בתוך חור באדמה חי הוביט.'),
+  ('Greek',    'Σε μια τρύπα στο έδαφος ζούσε ένα χόμπιτ.'),
+  ('Korean',   '땅속 어느 구멍에 한 호빗이 살고 있었다.'),
+  ('Hindi',    'जमीन में बने एक गड्ढे में एक हॉबिट रहता था।'),
+  ('Tamil',    'அந்த நிலத்தில் ஒரு துளையில் ஒரு ஹாபிட் வசித்து வந்தது.'),
+  ('Chinese',  '在地下一个洞里，住着一个霍比特人。'),
+  ('Japanese', '穴のなかに、ひとりのホビットが暮らしていた。'))
+SELECT language,
+       octet_length(string::text) || '→' || octet_length(string::utf16) AS octets,
+       to_char(ROUND(100 *
+                     ((octet_length(string::utf16)::float /
+                      (octet_length(string::text)::float) - 1.0))),
+               'S999%') AS delta,
+       string
+FROM examples;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 49dfb662abc..8ff10a36ee6 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3649,6 +3649,7 @@ cb_cleanup_dir
 cb_options
 cb_tablespace
 cb_tablespace_mapping
+char16_iterator
 char16_t
 char32_t
 check_agg_arguments_context
@@ -3946,6 +3947,7 @@ mbcharacter_incrementer
 mbdisplaylen_converter
 mblen_converter
 mbstr_verifier
+mb_iterator
 memoize_hash
 memoize_iterator
 metastring
@@ -4313,6 +4315,7 @@ ssize_t
 standard_qp_extra
 stemmer_module
 stmtCacheEntry
+storage_char16_t
 storeInfo
 storeRes_func
 stream_stop_callback
@@ -4388,6 +4391,7 @@ unicodeStyleRowFormat
 unicode_linestyle
 unit_conversion
 unlogged_relation_entry
+utf16
 utf_local_conversion_func
 uuidKEY
 uuid_rc_t
-- 
2.47.3

