From ccb3d6c072899063bc6e47388ef9a222a19be324 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandbossart@gmail.com>
Date: Mon, 17 Apr 2023 12:29:10 -0700
Subject: [PATCH v3 2/2] speed up several functions for lists with inline
 32-bit data using SIMD

---
 src/backend/nodes/list.c | 192 +++++++++++++++++++++++++++++++--------
 1 file changed, 155 insertions(+), 37 deletions(-)

diff --git a/src/backend/nodes/list.c b/src/backend/nodes/list.c
index 92bc48de17..fa06caf553 100644
--- a/src/backend/nodes/list.c
+++ b/src/backend/nodes/list.c
@@ -57,6 +57,9 @@
 #define IsOidList(l)			((l) == NIL || IsA((l), OidList))
 #define IsXidList(l)			((l) == NIL || IsA((l), XidList))
 
+static inline ListCell *list_member_inline_internal_idx(const List *list, uint32 datum);
+static inline bool list_member_inline_internal(const List *list, uint32 datum);
+
 #ifdef USE_ASSERT_CHECKING
 /*
  * Check that the specified List is valid (so far as we can tell).
@@ -717,18 +720,10 @@ list_member_ptr(const List *list, const void *datum)
 bool
 list_member_int(const List *list, int datum)
 {
-	const ListCell *cell;
-
 	Assert(IsIntegerList(list));
 	check_list_invariants(list);
 
-	foreach(cell, list)
-	{
-		if (lfirst_int(cell) == datum)
-			return true;
-	}
-
-	return false;
+	return list_member_inline_internal(list, datum);
 }
 
 /*
@@ -737,18 +732,10 @@ list_member_int(const List *list, int datum)
 bool
 list_member_oid(const List *list, Oid datum)
 {
-	const ListCell *cell;
-
 	Assert(IsOidList(list));
 	check_list_invariants(list);
 
-	foreach(cell, list)
-	{
-		if (lfirst_oid(cell) == datum)
-			return true;
-	}
-
-	return false;
+	return list_member_inline_internal(list, datum);
 }
 
 /*
@@ -757,18 +744,10 @@ list_member_oid(const List *list, Oid datum)
 bool
 list_member_xid(const List *list, TransactionId datum)
 {
-	const ListCell *cell;
-
 	Assert(IsXidList(list));
 	check_list_invariants(list);
 
-	foreach(cell, list)
-	{
-		if (lfirst_xid(cell) == datum)
-			return true;
-	}
-
-	return false;
+	return list_member_inline_internal(list, datum);
 }
 
 /*
@@ -929,11 +908,9 @@ list_delete_int(List *list, int datum)
 	Assert(IsIntegerList(list));
 	check_list_invariants(list);
 
-	foreach(cell, list)
-	{
-		if (lfirst_int(cell) == datum)
-			return list_delete_cell(list, cell);
-	}
+	cell = list_member_inline_internal_idx(list, datum);
+	if (cell != NULL)
+		return list_delete_cell(list, cell);
 
 	/* Didn't find a match: return the list unmodified */
 	return list;
@@ -948,11 +925,9 @@ list_delete_oid(List *list, Oid datum)
 	Assert(IsOidList(list));
 	check_list_invariants(list);
 
-	foreach(cell, list)
-	{
-		if (lfirst_oid(cell) == datum)
-			return list_delete_cell(list, cell);
-	}
+	cell = list_member_inline_internal_idx(list, datum);
+	if (cell != NULL)
+		return list_delete_cell(list, cell);
 
 	/* Didn't find a match: return the list unmodified */
 	return list;
@@ -1749,3 +1724,146 @@ list_oid_cmp(const ListCell *p1, const ListCell *p2)
 		return 1;
 	return 0;
 }
+
+/*
+ * list_member_inline_helper
+ *
+ * Workhorse for list_member_inline_internal and
+ * list_member_inline_internal_idx.
+ */
+static inline bool
+list_member_inline_helper(const List *list, uint32 datum, uint32 *i)
+{
+#ifdef USE_NO_SIMD
+
+	*i = 0;
+
+#else
+
+	/*
+	 * For better instruction-level parallelism, each loop iteration operates
+	 * on a block of four registers.
+	 */
+	const Vector32 keys = vector32_broadcast(datum);	/* load copies of key */
+	const uint32 nelem_per_vector = sizeof(Vector32) / sizeof(uint32);
+	const uint32 nelem_per_iteration = 4 * nelem_per_vector;
+#ifdef USE_NEON
+	const Vector32 mask = (Vector32) vector64_broadcast(UINT64CONST(0xFFFFFFFF));
+#else
+	const Vector32 mask = vector64_broadcast(UINT64CONST(0xFFFFFFFF));
+#endif
+	const uint32 *elements = (const uint32 *) list->elements;
+
+	/* round down to multiple of elements per iteration */
+	const uint32 tail_idx = (list->length * 2) & ~(nelem_per_iteration - 1);
+
+	/*
+	 * The SIMD optimized portion of this routine is written with the
+	 * expectation that the 32-bit datum we are searching for only takes up
+	 * half of a ListCell.  If that changes, this routine must change, too.
+	 */
+	Assert(sizeof(ListCell) == 8);
+
+	for (*i = 0; *i < tail_idx; *i += nelem_per_iteration)
+	{
+		Vector32	vals1,
+					vals2,
+					vals3,
+					vals4,
+					result1,
+					result2,
+					result3,
+					result4,
+					tmp1,
+					tmp2,
+					result,
+					masked;
+
+		/* load the next block into 4 registers */
+		vector32_load(&vals1, &elements[*i]);
+		vector32_load(&vals2, &elements[*i + nelem_per_vector]);
+		vector32_load(&vals3, &elements[*i + nelem_per_vector * 2]);
+		vector32_load(&vals4, &elements[*i + nelem_per_vector * 3]);
+
+		/* compare each value to the key */
+		result1 = vector32_eq(keys, vals1);
+		result2 = vector32_eq(keys, vals2);
+		result3 = vector32_eq(keys, vals3);
+		result4 = vector32_eq(keys, vals4);
+
+		/* combine the results into a single variable */
+		tmp1 = vector32_or(result1, result2);
+		tmp2 = vector32_or(result3, result4);
+		result = vector32_or(tmp1, tmp2);
+
+		/* filter out matches in space between data */
+		masked = vector32_and(result, mask);
+
+		/* break out and find the exact element if there was a match */
+		if (vector32_is_highbit_set(masked))
+		{
+			*i /= 2;
+			return true;
+		}
+	}
+
+#endif							/* ! USE_NO_SIMD */
+
+	*i /= 2;
+	return false;
+}
+
+/*
+ * list_member_inline_internal
+ *
+ * Optimized linear search routine (using SIMD intrinsics where available) for
+ * lists with inline 32-bit data.
+ */
+static inline bool
+list_member_inline_internal(const List *list, uint32 datum)
+{
+	uint32		i = 0;
+	const ListCell *cell;
+
+	if (list == NIL)
+		return false;
+
+	if (list_member_inline_helper(list, datum, &i))
+		return true;
+
+	/* Process the remaining elements one at a time. */
+	for_each_from(cell, list, i)
+	{
+		if (lfirst_int(cell) == (int) datum)
+			return true;
+	}
+
+	return false;
+}
+
+/*
+ * list_member_inline_internal_idx
+ *
+ * Optimized linear search routine (using SIMD intrinsics where available) for
+ * lists with inline 32-bit data.
+ */
+static inline ListCell *
+list_member_inline_internal_idx(const List *list, uint32 datum)
+{
+	uint32		i = 0;
+	ListCell *cell;
+
+	if (list == NIL)
+		return NULL;
+
+	(void) list_member_inline_helper(list, datum, &i);
+
+	/* Process the remaining elements one at a time. */
+	for_each_from(cell, list, i)
+	{
+		if (lfirst_int(cell) == (int) datum)
+			return cell;
+	}
+
+	return NULL;
+}
-- 
2.25.1

