From 155535f92df1d9cdf97739465b26ba970ed97063 Mon Sep 17 00:00:00 2001
From: Nathan Bossart
Date: Fri, 12 Sep 2025 15:51:55 -0500
Subject: [PATCH v9 1/1] Optimize hex_encode() and hex_decode() using SIMD.

---
 src/backend/utils/adt/encode.c | 142 ++++++++++++++++++++++-
 src/include/port/simd.h        | 201 +++++++++++++++++++++++++++++++++
 2 files changed, 339 insertions(+), 4 deletions(-)

diff --git a/src/backend/utils/adt/encode.c b/src/backend/utils/adt/encode.c
index 4ccaed815d1..13883c27680 100644
--- a/src/backend/utils/adt/encode.c
+++ b/src/backend/utils/adt/encode.c
@@ -16,6 +16,7 @@
 #include <ctype.h>
 
 #include "mb/pg_wchar.h"
+#include "port/simd.h"
 #include "utils/builtins.h"
 #include "utils/memutils.h"
 #include "varatt.h"
@@ -177,8 +178,8 @@ static const int8 hexlookup[128] = {
     -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
 };
 
-uint64
-hex_encode(const char *src, size_t len, char *dst)
+static inline uint64
+hex_encode_scalar(const char *src, size_t len, char *dst)
 {
     const char *end = src + len;
 
@@ -193,6 +194,57 @@
     return (uint64) len * 2;
 }
 
+uint64
+hex_encode(const char *src, size_t len, char *dst)
+{
+#ifdef USE_NO_SIMD
+    return hex_encode_scalar(src, len, dst);
+#else
+    const uint64 tail_idx = len & ~(sizeof(Vector8) - 1);
+    uint64      i;
+
+    /*
+     * This works by splitting the high and low nibbles of each byte into
+     * separate vectors, adding the vectors to a mask that converts the
+     * nibbles to their equivalent ASCII bytes, and interleaving those bytes
+     * back together to form the final hex-encoded string.  It might be
+     * possible to squeeze out a little more gain by manually unrolling the
+     * loop, but for now we don't bother.
+     */
+    for (i = 0; i < tail_idx; i += sizeof(Vector8))
+    {
+        Vector8     srcv;
+        Vector8     lo;
+        Vector8     hi;
+        Vector8     mask;
+
+        vector8_load(&srcv, (const uint8 *) &src[i]);
+
+        lo = vector8_and(srcv, vector8_broadcast(0x0f));
+        mask = vector8_gt(lo, vector8_broadcast(0x9));
+        mask = vector8_and(mask, vector8_broadcast('a' - '0' - 10));
+        mask = vector8_add(mask, vector8_broadcast('0'));
+        lo = vector8_add(lo, mask);
+
+        hi = vector8_and(srcv, vector8_broadcast(0xf0));
+        hi = vector32_shift_right_nibble(hi);
+        mask = vector8_gt(hi, vector8_broadcast(0x9));
+        mask = vector8_and(mask, vector8_broadcast('a' - '0' - 10));
+        mask = vector8_add(mask, vector8_broadcast('0'));
+        hi = vector8_add(hi, mask);
+
+        vector8_store((uint8 *) &dst[i * 2],
+                      vector8_interleave_low(hi, lo));
+        vector8_store((uint8 *) &dst[i * 2 + sizeof(Vector8)],
+                      vector8_interleave_high(hi, lo));
+    }
+
+    (void) hex_encode_scalar(src + i, len - i, dst + i * 2);
+
+    return (uint64) len * 2;
+#endif
+}
+
 static inline bool
 get_hex(const char *cp, char *out)
 {
@@ -213,8 +265,8 @@ hex_decode(const char *src, size_t len, char *dst)
     return hex_decode_safe(src, len, dst, NULL);
 }
 
-uint64
-hex_decode_safe(const char *src, size_t len, char *dst, Node *escontext)
+static inline uint64
+hex_decode_safe_scalar(const char *src, size_t len, char *dst, Node *escontext)
 {
     const char *s,
                *srcend;
@@ -254,6 +306,88 @@
     return p - dst;
 }
 
+/*
+ * This helper converts each byte to its binary-equivalent nibble by
+ * subtraction.  Along the way, it verifies the input is within the expected
+ * range of ASCII values and returns false if not.  Finally, it combines the
+ * generated nibbles to form the return bytes, which will be separated by zero
+ * bytes in the destination vector.  If everything goes as planned, returns
+ * true.
+ */
+#ifndef USE_NO_SIMD
+static inline bool
+hex_decode_simd_helper(const Vector8 src, Vector8 *dst)
+{
+    Vector8     msk;
+    Vector8     sub;
+
+    msk = vector8_lt(src, vector8_broadcast('0'));
+    if (unlikely(vector8_is_highbit_set(msk)))
+        return false;
+
+    msk = vector8_lt(src, vector8_broadcast('A'));
+    sub = vector8_and(msk, vector8_broadcast('0'));
+    *dst = vector8_ssub(src, sub);
+
+    msk = vector8_and(*dst, msk);
+    msk = vector8_gt(msk, vector8_broadcast(0x9));
+    if (unlikely(vector8_is_highbit_set(msk)))
+        return false;
+
+    msk = vector8_gt(src, vector8_broadcast('a' - 1));
+    sub = vector8_and(msk, vector8_broadcast('a' - 10));
+    msk = vector8_xor(msk, vector8_gt(src, vector8_broadcast('A' - 1)));
+    msk = vector8_and(msk, vector8_broadcast('A' - 10));
+    sub = vector8_or(sub, msk);
+    *dst = vector8_ssub(*dst, sub);
+
+    msk = vector8_gt(*dst, vector8_broadcast(0xf));
+    if (unlikely(vector8_is_highbit_set(msk)))
+        return false;
+
+    msk = vector8_and(*dst, vector32_broadcast(0xff00ff00));
+    msk = vector32_shift_right_byte(msk);
+    *dst = vector8_and(*dst, vector32_broadcast(0x00ff00ff));
+    *dst = vector32_shift_left_nibble(*dst);
+    *dst = vector8_or(*dst, msk);
+    return true;
+}
+#endif /* ! USE_NO_SIMD */
+
+uint64
+hex_decode_safe(const char *src, size_t len, char *dst, Node *escontext)
+{
+#ifdef USE_NO_SIMD
+    return hex_decode_safe_scalar(src, len, dst, escontext);
+#else
+    const uint64 tail_idx = len & ~(sizeof(Vector8) * 2 - 1);
+    uint64      i;
+
+    /*
+     * We must process 2 vectors at a time since the output will be half the
+     * length of the input.
+     */
+    for (i = 0; i < tail_idx; i += sizeof(Vector8) * 2)
+    {
+        Vector8     srcv;
+        Vector8     dstv1;
+        Vector8     dstv2;
+
+        vector8_load(&srcv, (const uint8 *) &src[i]);
+        if (unlikely(!hex_decode_simd_helper(srcv, &dstv1)))
+            break;
+
+        vector8_load(&srcv, (const uint8 *) &src[i + sizeof(Vector8)]);
+        if (unlikely(!hex_decode_simd_helper(srcv, &dstv2)))
+            break;
+
+        vector8_store((uint8 *) &dst[i / 2], vector8_pack_16(dstv1, dstv2));
+    }
+
+    return i / 2 + hex_decode_safe_scalar(src + i, len - i, dst + i / 2, escontext);
+#endif
+}
+
 static uint64
 hex_enc_len(const char *src, size_t srclen)
 {
diff --git a/src/include/port/simd.h b/src/include/port/simd.h
index 97c5f353022..0a9805e8ef1 100644
--- a/src/include/port/simd.h
+++ b/src/include/port/simd.h
@@ -70,6 +70,7 @@ static inline void vector32_load(Vector32 *v, const uint32 *s);
 static inline Vector8 vector8_broadcast(const uint8 c);
 #ifndef USE_NO_SIMD
 static inline Vector32 vector32_broadcast(const uint32 c);
+static inline void vector8_store(uint8 *s, Vector8 v);
 #endif
 
 /* element-wise comparisons to a scalar */
@@ -86,6 +87,9 @@ static inline uint32 vector8_highbit_mask(const Vector8 v);
 static inline Vector8 vector8_or(const Vector8 v1, const Vector8 v2);
 #ifndef USE_NO_SIMD
 static inline Vector32 vector32_or(const Vector32 v1, const Vector32 v2);
+static inline Vector8 vector8_xor(const Vector8 v1, const Vector8 v2);
+static inline Vector8 vector8_and(const Vector8 v1, const Vector8 v2);
+static inline Vector8 vector8_add(const Vector8 v1, const Vector8 v2);
 static inline Vector8 vector8_ssub(const Vector8 v1, const Vector8 v2);
 #endif
 
@@ -99,6 +103,18 @@ static inline Vector8 vector8_ssub(const Vector8 v1, const Vector8 v2);
 static inline Vector8 vector8_eq(const Vector8 v1, const Vector8 v2);
 static inline Vector8 vector8_min(const Vector8 v1, const Vector8 v2);
 static inline Vector32 vector32_eq(const Vector32 v1, const Vector32 v2);
+static inline Vector8 vector8_lt(const Vector8 v1, const Vector8 v2);
+static inline Vector8 vector8_gt(const Vector8 v1, const Vector8 v2);
+#endif
+
+/* vector manipulation */
+#ifndef USE_NO_SIMD
+static inline Vector8 vector8_interleave_low(const Vector8 v1, const Vector8 v2);
+static inline Vector8 vector8_interleave_high(const Vector8 v1, const Vector8 v2);
+static inline Vector8 vector8_pack_16(const Vector8 v1, const Vector8 v2);
+static inline Vector32 vector32_shift_left_nibble(const Vector32 v1);
+static inline Vector32 vector32_shift_right_nibble(const Vector32 v1);
+static inline Vector32 vector32_shift_right_byte(const Vector32 v1);
 #endif
 
 /*
@@ -128,6 +144,21 @@ vector32_load(Vector32 *v, const uint32 *s)
 }
 #endif /* ! USE_NO_SIMD */
 
+/*
+ * Store a vector into the given memory address.
+ */
+#ifndef USE_NO_SIMD
+static inline void
+vector8_store(uint8 *s, Vector8 v)
+{
+#ifdef USE_SSE2
+    _mm_storeu_si128((Vector8 *) s, v);
+#elif defined(USE_NEON)
+    vst1q_u8(s, v);
+#endif
+}
+#endif /* ! USE_NO_SIMD */
+
 /*
  * Create a vector with all elements set to the same value.
  */
@@ -358,6 +389,51 @@ vector32_or(const Vector32 v1, const Vector32 v2)
 }
 #endif /* ! USE_NO_SIMD */
 
+/*
+ * Return the bitwise XOR of the inputs.
+ */
+#ifndef USE_NO_SIMD
+static inline Vector8
+vector8_xor(const Vector8 v1, const Vector8 v2)
+{
+#ifdef USE_SSE2
+    return _mm_xor_si128(v1, v2);
+#elif defined(USE_NEON)
+    return veorq_u8(v1, v2);
+#endif
+}
+#endif /* ! USE_NO_SIMD */
+
+/*
+ * Return the bitwise AND of the inputs.
+ */
+#ifndef USE_NO_SIMD
+static inline Vector8
+vector8_and(const Vector8 v1, const Vector8 v2)
+{
+#ifdef USE_SSE2
+    return _mm_and_si128(v1, v2);
+#elif defined(USE_NEON)
+    return vandq_u8(v1, v2);
+#endif
+}
+#endif /* ! USE_NO_SIMD */
+
+/*
+ * Return the result of adding the respective elements of the input vectors.
+ */
+#ifndef USE_NO_SIMD
+static inline Vector8
+vector8_add(const Vector8 v1, const Vector8 v2)
+{
+#ifdef USE_SSE2
+    return _mm_add_epi8(v1, v2);
+#elif defined(USE_NEON)
+    return vaddq_u8(v1, v2);
+#endif
+}
+#endif /* ! USE_NO_SIMD */
+
 /*
  * Return the result of subtracting the respective elements of the input
  * vectors using saturation (i.e., if the operation would yield a value less
@@ -404,6 +480,39 @@ vector32_eq(const Vector32 v1, const Vector32 v2)
 }
 #endif /* ! USE_NO_SIMD */
 
+/*
+ * Return a vector with all bits set for each lane of v1 that is less than the
+ * corresponding lane of v2.  NB: The comparison treats the elements as signed.
+ */
+#ifndef USE_NO_SIMD
+static inline Vector8
+vector8_lt(const Vector8 v1, const Vector8 v2)
+{
+#ifdef USE_SSE2
+    return _mm_cmplt_epi8(v1, v2);
+#elif defined(USE_NEON)
+    return vcltq_s8((int8x16_t) v1, (int8x16_t) v2);
+#endif
+}
+#endif /* ! USE_NO_SIMD */
+
+/*
+ * Return a vector with all bits set for each lane of v1 that is greater than
+ * the corresponding lane of v2.  NB: The comparison treats the elements as
+ * signed.
+ */
+#ifndef USE_NO_SIMD
+static inline Vector8
+vector8_gt(const Vector8 v1, const Vector8 v2)
+{
+#ifdef USE_SSE2
+    return _mm_cmpgt_epi8(v1, v2);
+#elif defined(USE_NEON)
+    return vcgtq_s8((int8x16_t) v1, (int8x16_t) v2);
+#endif
+}
+#endif /* ! USE_NO_SIMD */
+
 /*
  * Given two vectors, return a vector with the minimum element of each.
  */
@@ -419,4 +528,96 @@ vector8_min(const Vector8 v1, const Vector8 v2)
 }
 #endif /* ! USE_NO_SIMD */
 
+/*
+ * Interleave elements of low halves of given vectors.
+ */
+#ifndef USE_NO_SIMD
+static inline Vector8
+vector8_interleave_low(const Vector8 v1, const Vector8 v2)
+{
+#ifdef USE_SSE2
+    return _mm_unpacklo_epi8(v1, v2);
+#elif defined(USE_NEON)
+    return vzip1q_u8(v1, v2);
+#endif
+}
+#endif /* ! USE_NO_SIMD */
+
+/*
+ * Interleave elements of high halves of given vectors.
+ */
+#ifndef USE_NO_SIMD
+static inline Vector8
+vector8_interleave_high(const Vector8 v1, const Vector8 v2)
+{
+#ifdef USE_SSE2
+    return _mm_unpackhi_epi8(v1, v2);
+#elif defined(USE_NEON)
+    return vzip2q_u8(v1, v2);
+#endif
+}
+#endif /* ! USE_NO_SIMD */
+
+/*
+ * Pack 16-bit elements in the given vectors into a single vector of 8-bit
+ * elements.  NB: The upper 8-bits of each 16-bit element must be zeros, else
+ * this will produce different results on different architectures.
+ */
+#ifndef USE_NO_SIMD
+static inline Vector8
+vector8_pack_16(const Vector8 v1, const Vector8 v2)
+{
+#ifdef USE_SSE2
+    return _mm_packus_epi16(v1, v2);
+#elif defined(USE_NEON)
+    return vuzp1q_u8(v1, v2);
+#endif
+}
+#endif /* ! USE_NO_SIMD */
+
+/*
+ * Unsigned shift left of each element in the vector by 4 bits.
+ */
+#ifndef USE_NO_SIMD
+static inline Vector32
+vector32_shift_left_nibble(const Vector32 v1)
+{
+#ifdef USE_SSE2
+    return _mm_slli_epi32(v1, 4);
+#elif defined(USE_NEON)
+    return vshlq_n_u32(v1, 4);
+#endif
+}
+#endif /* ! USE_NO_SIMD */
+
+/*
+ * Unsigned shift right of each element in the vector by 4 bits.
+ */
+#ifndef USE_NO_SIMD
+static inline Vector32
+vector32_shift_right_nibble(const Vector32 v1)
+{
+#ifdef USE_SSE2
+    return _mm_srli_epi32(v1, 4);
+#elif defined(USE_NEON)
+    return vshrq_n_u32(v1, 4);
+#endif
+}
+#endif /* ! USE_NO_SIMD */
+
+/*
+ * Unsigned shift right of each element in the vector by 1 byte.
+ */
+#ifndef USE_NO_SIMD
+static inline Vector32
+vector32_shift_right_byte(const Vector32 v1)
+{
+#ifdef USE_SSE2
+    return _mm_srli_epi32(v1, 8);
+#elif defined(USE_NEON)
+    return vshrq_n_u32(v1, 8);
+#endif
+}
+#endif /* ! USE_NO_SIMD */
+
 #endif /* SIMD_H */
-- 
2.39.5 (Apple Git-154)
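
The following is not part of the patch: it is a minimal, standalone sketch of the nibble-splitting
encoding idea used by the new hex_encode() above, written against raw SSE2 intrinsics rather than
the Vector8 wrappers. The helper name hex_encode_sse2_sketch and the fixed 16-byte input are
illustrative assumptions; the sketch assumes the input length is a multiple of 16 and handles no
remainder.

    #include <emmintrin.h>
    #include <stddef.h>
    #include <stdio.h>

    /* Encode len bytes (len must be a multiple of 16) as lowercase hex. */
    static void
    hex_encode_sse2_sketch(const unsigned char *src, size_t len, char *dst)
    {
        for (size_t i = 0; i < len; i += 16)
        {
            __m128i bytes = _mm_loadu_si128((const __m128i *) (src + i));

            /* split each byte into its low and high nibble */
            __m128i lo = _mm_and_si128(bytes, _mm_set1_epi8(0x0f));
            __m128i hi = _mm_and_si128(_mm_srli_epi32(bytes, 4), _mm_set1_epi8(0x0f));

            /* nibble -> ASCII: add '0', plus an extra 'a' - '0' - 10 where the nibble is > 9 */
            __m128i lo_adj = _mm_and_si128(_mm_cmpgt_epi8(lo, _mm_set1_epi8(9)),
                                           _mm_set1_epi8('a' - '0' - 10));
            __m128i hi_adj = _mm_and_si128(_mm_cmpgt_epi8(hi, _mm_set1_epi8(9)),
                                           _mm_set1_epi8('a' - '0' - 10));

            lo = _mm_add_epi8(_mm_add_epi8(lo, _mm_set1_epi8('0')), lo_adj);
            hi = _mm_add_epi8(_mm_add_epi8(hi, _mm_set1_epi8('0')), hi_adj);

            /* interleave the digits so each input byte yields "high digit, low digit" */
            _mm_storeu_si128((__m128i *) (dst + i * 2), _mm_unpacklo_epi8(hi, lo));
            _mm_storeu_si128((__m128i *) (dst + i * 2 + 16), _mm_unpackhi_epi8(hi, lo));
        }
    }

    int
    main(void)
    {
        const unsigned char in[] = "0123456789abcdef";
        char out[33] = {0};

        hex_encode_sse2_sketch(in, 16, out);
        puts(out);
        return 0;
    }

Built with any x86-64 toolchain (SSE2 is baseline there), this prints
30313233343536373839616263646566, i.e. the hex encoding of "0123456789abcdef".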
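
Similarly, the mask-and-shift step at the end of hex_decode_simd_helper() can be hard to follow in
vector form. The scalar sketch below (plain C, no intrinsics; the name combine_nibbles is an
illustrative assumption) performs the same combination on a single little-endian 32-bit word that
holds four already-decoded nibbles, one per byte.

    #include <stdint.h>
    #include <stdio.h>

    /*
     * Mirror the final step of hex_decode_simd_helper(): each byte of w holds one
     * decoded nibble (high digit in byte 0, low digit in byte 1, and so on).  Move
     * the low digits down a byte, shift the high digits up a nibble, and OR them,
     * so bytes 0 and 2 end up holding the output bytes, separated by zero bytes.
     */
    static uint32_t
    combine_nibbles(uint32_t w)
    {
        uint32_t lo = (w & 0xff00ff00) >> 8;
        uint32_t hi = (w & 0x00ff00ff) << 4;

        return hi | lo;
    }

    int
    main(void)
    {
        /* little-endian bytes 0x0b 0x0e 0x0e 0x0f, i.e. the digits of "beef" */
        uint32_t w = 0x0f0e0e0b;
        uint32_t r = combine_nibbles(w);

        printf("%02x %02x\n", (unsigned) (r & 0xff), (unsigned) ((r >> 16) & 0xff));
        return 0;
    }

This prints "be ef"; in the patch, vector32_shift_right_byte(), vector32_shift_left_nibble(), and
vector8_or() apply the same transformation to four such words per vector.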