From f2b4f8cf844dead4658469257b771d3394a46ed0 Mon Sep 17 00:00:00 2001 From: Nathan Bossart Date: Wed, 10 Sep 2025 21:37:20 -0500 Subject: [PATCH v7 1/1] Optimize hex_encode() using SIMD. --- src/backend/utils/adt/encode.c | 56 +++++++++++++++- src/include/port/simd.h | 118 +++++++++++++++++++++++++++++++++ 2 files changed, 172 insertions(+), 2 deletions(-) diff --git a/src/backend/utils/adt/encode.c b/src/backend/utils/adt/encode.c index 4ccaed815d1..0372d0e787a 100644 --- a/src/backend/utils/adt/encode.c +++ b/src/backend/utils/adt/encode.c @@ -16,6 +16,7 @@ #include #include "mb/pg_wchar.h" +#include "port/simd.h" #include "utils/builtins.h" #include "utils/memutils.h" #include "varatt.h" @@ -177,8 +178,8 @@ static const int8 hexlookup[128] = { -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, }; -uint64 -hex_encode(const char *src, size_t len, char *dst) +static inline uint64 +hex_encode_scalar(const char *src, size_t len, char *dst) { const char *end = src + len; @@ -193,6 +194,57 @@ hex_encode(const char *src, size_t len, char *dst) return (uint64) len * 2; } +uint64 +hex_encode(const char *src, size_t len, char *dst) +{ +#ifdef USE_NO_SIMD + return hex_encode_scalar(src, len, dst); +#else + const uint64 tail_idx = len & ~(sizeof(Vector8) - 1); + uint64 i; + + /* + * This works by splitting the high and low nibbles of each byte into + * separate vectors, adding the vectors to a mask that converts the + * nibbles to their equivalent ASCII bytes, and interleaving those bytes + * back together to form the final hex-encoded string. It might be + * possible to squeeze out a little more gain by manually unrolling the + * loop, but for now we don't bother. + */ + for (i = 0; i < tail_idx; i += sizeof(Vector8)) + { + Vector8 srcv; + Vector8 lo; + Vector8 hi; + Vector8 mask; + + vector8_load(&srcv, (const uint8 *) &src[i]); + + lo = vector8_and(srcv, vector8_broadcast(0x0f)); + mask = vector8_gt(lo, vector8_broadcast(0x9)); + mask = vector8_and(mask, vector8_broadcast('a' - '0' - 10)); + mask = vector8_add(mask, vector8_broadcast('0')); + lo = vector8_add(lo, mask); + + hi = vector8_and(srcv, vector8_broadcast(0xf0)); + hi = vector32_shift_right_nibble((Vector32) hi); + mask = vector8_gt(hi, vector8_broadcast(0x9)); + mask = vector8_and(mask, vector8_broadcast('a' - '0' - 10)); + mask = vector8_add(mask, vector8_broadcast('0')); + hi = vector8_add(hi, mask); + + vector8_store((uint8 *) &dst[i * 2], + vector8_interleave_low(hi, lo)); + vector8_store((uint8 *) &dst[i * 2 + sizeof(Vector8)], + vector8_interleave_high(hi, lo)); + } + + (void) hex_encode_scalar(src + i, len - i, dst + i * 2); + + return (uint64) len * 2; +#endif +} + static inline bool get_hex(const char *cp, char *out) { diff --git a/src/include/port/simd.h b/src/include/port/simd.h index 97c5f353022..f1d5353d2b3 100644 --- a/src/include/port/simd.h +++ b/src/include/port/simd.h @@ -70,6 +70,7 @@ static inline void vector32_load(Vector32 *v, const uint32 *s); static inline Vector8 vector8_broadcast(const uint8 c); #ifndef USE_NO_SIMD static inline Vector32 vector32_broadcast(const uint32 c); +static inline void vector8_store(uint8 *s, Vector8 v); #endif /* element-wise comparisons to a scalar */ @@ -86,6 +87,8 @@ static inline uint32 vector8_highbit_mask(const Vector8 v); static inline Vector8 vector8_or(const Vector8 v1, const Vector8 v2); #ifndef USE_NO_SIMD static inline Vector32 vector32_or(const Vector32 v1, const Vector32 v2); +static inline Vector8 vector8_and(const Vector8 v1, const Vector8 v2); +static inline Vector8 vector8_add(const Vector8 v1, const Vector8 v2); static inline Vector8 vector8_ssub(const Vector8 v1, const Vector8 v2); #endif @@ -99,6 +102,14 @@ static inline Vector8 vector8_ssub(const Vector8 v1, const Vector8 v2); static inline Vector8 vector8_eq(const Vector8 v1, const Vector8 v2); static inline Vector8 vector8_min(const Vector8 v1, const Vector8 v2); static inline Vector32 vector32_eq(const Vector32 v1, const Vector32 v2); +static inline Vector8 vector8_gt(const Vector8 v1, const Vector8 v2); +#endif + +/* vector manipulation */ +#ifndef USE_NO_SIMD +static inline Vector8 vector8_interleave_low(const Vector8 v1, const Vector8 v2); +static inline Vector8 vector8_interleave_high(const Vector8 v1, const Vector8 v2); +static inline Vector32 vector32_shift_right_nibble(const Vector32 v1); #endif /* @@ -128,6 +139,21 @@ vector32_load(Vector32 *v, const uint32 *s) } #endif /* ! USE_NO_SIMD */ +/* + * Store a vector into the given memory address. + */ +#ifndef USE_NO_SIMD +static inline void +vector8_store(uint8 *s, Vector8 v) +{ +#ifdef USE_SSE2 + _mm_storeu_si128((Vector8 *) s, v); +#elif defined(USE_NEON) + vst1q_u8(s, v); +#endif +} +#endif /* ! USE_NO_SIMD */ + /* * Create a vector with all elements set to the same value. */ @@ -358,6 +384,36 @@ vector32_or(const Vector32 v1, const Vector32 v2) } #endif /* ! USE_NO_SIMD */ +/* + * Return the bitwise AND of the inputs. + */ +#ifndef USE_NO_SIMD +static inline Vector8 +vector8_and(const Vector8 v1, const Vector8 v2) +{ +#ifdef USE_SSE2 + return _mm_and_si128(v1, v2); +#elif defined(USE_NEON) + return vandq_u8(v1, v2); +#endif +} +#endif /* ! USE_NO_SIMD */ + +/* + * Return the result of adding the respective elements of the input vectors. + */ +#ifndef USE_NO_SIMD +static inline Vector8 +vector8_add(const Vector8 v1, const Vector8 v2) +{ +#ifdef USE_SSE2 + return _mm_add_epi8(v1, v2); +#elif defined(USE_NEON) + return vaddq_u8(v1, v2); +#endif +} +#endif /* ! USE_NO_SIMD */ + /* * Return the result of subtracting the respective elements of the input * vectors using saturation (i.e., if the operation would yield a value less @@ -404,6 +460,23 @@ vector32_eq(const Vector32 v1, const Vector32 v2) } #endif /* ! USE_NO_SIMD */ +/* + * Return a vector with all bits set for each lane of v1 that is greater than + * the corresponding lane of v2. NB: The comparison treats the elements as + * signed. + */ +#ifndef USE_NO_SIMD +static inline Vector8 +vector8_gt(const Vector8 v1, const Vector8 v2) +{ +#ifdef USE_SSE2 + return _mm_cmpgt_epi8(v1, v2); +#elif defined (USE_NEON) + return vcgtq_s8((int8x16_t) v1, (int8x16_t) v2); +#endif +} +#endif /* ! USE_NO_SIMD */ + /* * Given two vectors, return a vector with the minimum element of each. */ @@ -419,4 +492,49 @@ vector8_min(const Vector8 v1, const Vector8 v2) } #endif /* ! USE_NO_SIMD */ +/* + * Interleave elements of low halves of given vectors. + */ +#ifndef USE_NO_SIMD +static inline Vector8 +vector8_interleave_low(const Vector8 v1, const Vector8 v2) +{ +#ifdef USE_SSE2 + return _mm_unpacklo_epi8(v1, v2); +#elif defined(USE_NEON) + return vzip1q_u8(v1, v2); +#endif +} +#endif /* ! USE_NO_SIMD */ + +/* + * Interleave elements of high halves of given vectors. + */ +#ifndef USE_NO_SIMD +static inline Vector8 +vector8_interleave_high(const Vector8 v1, const Vector8 v2) +{ +#ifdef USE_SSE2 + return _mm_unpackhi_epi8(v1, v2); +#elif defined(USE_NEON) + return vzip2q_u8(v1, v2); +#endif +} +#endif /* ! USE_NO_SIMD */ + +/* + * Unsigned shift right of each element in the vector by 4 bits. + */ +#ifndef USE_NO_SIMD +static inline Vector32 +vector32_shift_right_nibble(const Vector32 v1) +{ +#ifdef USE_SSE2 + return _mm_srli_epi32(v1, 4); +#elif defined(USE_NEON) + return vshrq_n_u32(v1, 4); +#endif +} +#endif /* ! USE_NO_SIMD */ + #endif /* SIMD_H */ -- 2.39.5 (Apple Git-154)