From ba097b763eb80ab8c9c78503fcb5be5575342ff8 Mon Sep 17 00:00:00 2001 From: Nathan Bossart Date: Fri, 12 Sep 2025 15:51:55 -0500 Subject: [PATCH v10 1/1] Optimize hex_encode() and hex_decode() using SIMD. --- src/backend/utils/adt/encode.c | 131 ++++++++++++++++++- src/include/port/simd.h | 227 ++++++++++++++++++++++++++++++++- 2 files changed, 348 insertions(+), 10 deletions(-) diff --git a/src/backend/utils/adt/encode.c b/src/backend/utils/adt/encode.c index 9a9c7e8da99..028e0ca6887 100644 --- a/src/backend/utils/adt/encode.c +++ b/src/backend/utils/adt/encode.c @@ -16,6 +16,7 @@ #include #include "mb/pg_wchar.h" +#include "port/simd.h" #include "utils/builtins.h" #include "utils/memutils.h" #include "varatt.h" @@ -177,8 +178,8 @@ static const int8 hexlookup[128] = { -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, }; -uint64 -hex_encode(const char *src, size_t len, char *dst) +static inline uint64 +hex_encode_scalar(const char *src, size_t len, char *dst) { const char *end = src + len; @@ -193,6 +194,57 @@ hex_encode(const char *src, size_t len, char *dst) return (uint64) len * 2; } +uint64 +hex_encode(const char *src, size_t len, char *dst) +{ +#ifdef USE_NO_SIMD + return hex_encode_scalar(src, len, dst); +#else + const uint64 tail_idx = len & ~(sizeof(Vector8) - 1); + uint64 i; + + /* + * This works by splitting the high and low nibbles of each byte into + * separate vectors, adding the vectors to a mask that converts the + * nibbles to their equivalent ASCII bytes, and interleaving those bytes + * back together to form the final hex-encoded string. It might be + * possible to squeeze out a little more gain by manually unrolling the + * loop, but for now we don't bother. + */ + for (i = 0; i < tail_idx; i += sizeof(Vector8)) + { + Vector8 srcv; + Vector8 lo; + Vector8 hi; + Vector8 mask; + + vector8_load(&srcv, (const uint8 *) &src[i]); + + lo = vector8_and(srcv, vector8_broadcast(0x0f)); + mask = vector8_gt(lo, vector8_broadcast(0x9)); + mask = vector8_and(mask, vector8_broadcast('a' - '0' - 10)); + mask = vector8_add(mask, vector8_broadcast('0')); + lo = vector8_add(lo, mask); + + hi = vector8_and(srcv, vector8_broadcast(0xf0)); + hi = vector32_shift_right_nibble(hi); + mask = vector8_gt(hi, vector8_broadcast(0x9)); + mask = vector8_and(mask, vector8_broadcast('a' - '0' - 10)); + mask = vector8_add(mask, vector8_broadcast('0')); + hi = vector8_add(hi, mask); + + vector8_store((uint8 *) &dst[i * 2], + vector8_interleave_low(hi, lo)); + vector8_store((uint8 *) &dst[i * 2 + sizeof(Vector8)], + vector8_interleave_high(hi, lo)); + } + + (void) hex_encode_scalar(src + i, len - i, dst + i * 2); + + return (uint64) len * 2; +#endif +} + static inline bool get_hex(const char *cp, char *out) { @@ -213,8 +265,8 @@ hex_decode(const char *src, size_t len, char *dst) return hex_decode_safe(src, len, dst, NULL); } -uint64 -hex_decode_safe(const char *src, size_t len, char *dst, Node *escontext) +static inline uint64 +hex_decode_safe_scalar(const char *src, size_t len, char *dst, Node *escontext) { const char *s, *srcend; @@ -254,6 +306,77 @@ hex_decode_safe(const char *src, size_t len, char *dst, Node *escontext) return p - dst; } +/* + * This helper converts each byte to its binary-equivalent nibble by + * subtraction and combines them to form the return bytes (separated by zero + * bytes). Returns false if any input bytes are outside the expected ranges of + * ASCII values. Otherwise, returns true. + */ +#ifndef USE_NO_SIMD +static inline bool +hex_decode_simd_helper(const Vector8 src, Vector8 *dst) +{ + Vector8 sub; + Vector8 msk; + + msk = vector8_gt(vector8_broadcast('9' + 1), src); + sub = vector8_and(msk, vector8_broadcast('0')); + + msk = vector8_gt(src, vector8_broadcast('A' - 1)); + msk = vector8_and(msk, vector8_broadcast('A' - 10)); + sub = vector8_add(sub, msk); + + msk = vector8_gt(src, vector8_broadcast('a' - 1)); + msk = vector8_and(msk, vector8_broadcast('a' - 'A')); + sub = vector8_add(sub, msk); + + *dst = vector8_sssub(src, sub); + if (unlikely(vector8_has_ge(*dst, 0x10))) + return false; + + msk = vector8_and(*dst, vector32_broadcast(0xff00ff00)); + msk = vector32_shift_right_byte(msk); + *dst = vector8_and(*dst, vector32_broadcast(0x00ff00ff)); + *dst = vector32_shift_left_nibble(*dst); + *dst = vector8_or(*dst, msk); + return true; +} +#endif /* ! USE_NO_SIMD */ + +uint64 +hex_decode_safe(const char *src, size_t len, char *dst, Node *escontext) +{ +#ifdef USE_NO_SIMD + return hex_decode_safe_scalar(src, len, dst, escontext); +#else + const uint64 tail_idx = len & ~(sizeof(Vector8) * 2 - 1); + uint64 i; + + /* + * We must process 2 vectors at a time since the output will be half the + * length of the input. + */ + for (i = 0; i < tail_idx; i += sizeof(Vector8) * 2) + { + Vector8 srcv; + Vector8 dstv1; + Vector8 dstv2; + + vector8_load(&srcv, (const uint8 *) &src[i]); + if (unlikely(!hex_decode_simd_helper(srcv, &dstv1))) + break; + + vector8_load(&srcv, (const uint8 *) &src[i + sizeof(Vector8)]); + if (unlikely(!hex_decode_simd_helper(srcv, &dstv2))) + break; + + vector8_store((uint8 *) &dst[i / 2], vector8_pack_16(dstv1, dstv2)); + } + + return i / 2 + hex_decode_safe_scalar(src + i, len - i, dst + i / 2, escontext); +#endif +} + static uint64 hex_enc_len(const char *src, size_t srclen) { diff --git a/src/include/port/simd.h b/src/include/port/simd.h index 97c5f353022..8059e37acc7 100644 --- a/src/include/port/simd.h +++ b/src/include/port/simd.h @@ -70,12 +70,16 @@ static inline void vector32_load(Vector32 *v, const uint32 *s); static inline Vector8 vector8_broadcast(const uint8 c); #ifndef USE_NO_SIMD static inline Vector32 vector32_broadcast(const uint32 c); +static inline void vector8_store(uint8 *s, Vector8 v); #endif /* element-wise comparisons to a scalar */ static inline bool vector8_has(const Vector8 v, const uint8 c); static inline bool vector8_has_zero(const Vector8 v); static inline bool vector8_has_le(const Vector8 v, const uint8 c); +#ifndef USE_NO_SIMD +static inline bool vector8_has_ge(const Vector8 v, const uint8 c); +#endif static inline bool vector8_is_highbit_set(const Vector8 v); #ifndef USE_NO_SIMD static inline bool vector32_is_highbit_set(const Vector32 v); @@ -86,7 +90,10 @@ static inline uint32 vector8_highbit_mask(const Vector8 v); static inline Vector8 vector8_or(const Vector8 v1, const Vector8 v2); #ifndef USE_NO_SIMD static inline Vector32 vector32_or(const Vector32 v1, const Vector32 v2); -static inline Vector8 vector8_ssub(const Vector8 v1, const Vector8 v2); +static inline Vector8 vector8_and(const Vector8 v1, const Vector8 v2); +static inline Vector8 vector8_add(const Vector8 v1, const Vector8 v2); +static inline Vector8 vector8_sssub(const Vector8 v1, const Vector8 v2); +static inline Vector8 vector8_ussub(const Vector8 v1, const Vector8 v2); #endif /* @@ -99,6 +106,17 @@ static inline Vector8 vector8_ssub(const Vector8 v1, const Vector8 v2); static inline Vector8 vector8_eq(const Vector8 v1, const Vector8 v2); static inline Vector8 vector8_min(const Vector8 v1, const Vector8 v2); static inline Vector32 vector32_eq(const Vector32 v1, const Vector32 v2); +static inline Vector8 vector8_gt(const Vector8 v1, const Vector8 v2); +#endif + +/* vector manipulation */ +#ifndef USE_NO_SIMD +static inline Vector8 vector8_interleave_low(const Vector8 v1, const Vector8 v2); +static inline Vector8 vector8_interleave_high(const Vector8 v1, const Vector8 v2); +static inline Vector8 vector8_pack_16(const Vector8 v1, const Vector8 v2); +static inline Vector32 vector32_shift_left_nibble(const Vector32 v1); +static inline Vector32 vector32_shift_right_nibble(const Vector32 v1); +static inline Vector32 vector32_shift_right_byte(const Vector32 v1); #endif /* @@ -128,6 +146,21 @@ vector32_load(Vector32 *v, const uint32 *s) } #endif /* ! USE_NO_SIMD */ +/* + * Store a vector into the given memory address. + */ +#ifndef USE_NO_SIMD +static inline void +vector8_store(uint8 *s, Vector8 v) +{ +#ifdef USE_SSE2 + _mm_storeu_si128((Vector8 *) s, v); +#elif defined(USE_NEON) + vst1q_u8(s, v); +#endif +} +#endif /* ! USE_NO_SIMD */ + /* * Create a vector with all elements set to the same value. */ @@ -257,13 +290,32 @@ vector8_has_le(const Vector8 v, const uint8 c) * NUL bytes. This approach is a workaround for the lack of unsigned * comparison instructions on some architectures. */ - result = vector8_has_zero(vector8_ssub(v, vector8_broadcast(c))); + result = vector8_has_zero(vector8_ussub(v, vector8_broadcast(c))); #endif Assert(assert_result == result); return result; } +/* + * Returns true if any elements in the vector are greater than or equal to the + * given scalar. + */ +#ifndef USE_NO_SIMD +static inline bool +vector8_has_ge(const Vector8 v, const uint8 c) +{ +#ifdef USE_SSE2 + Vector8 umax = _mm_max_epu8(v, vector8_broadcast(c)); + Vector8 cmpe = _mm_cmpeq_epi8(umax, v); + + return vector8_is_highbit_set(cmpe); +#elif defined(USE_NEON) + return vmaxvq_u8(v) >= c; +#endif +} +#endif /* ! USE_NO_SIMD */ + /* * Return true if the high bit of any element is set */ @@ -358,15 +410,65 @@ vector32_or(const Vector32 v1, const Vector32 v2) } #endif /* ! USE_NO_SIMD */ +/* + * Return the bitwise AND of the inputs. + */ +#ifndef USE_NO_SIMD +static inline Vector8 +vector8_and(const Vector8 v1, const Vector8 v2) +{ +#ifdef USE_SSE2 + return _mm_and_si128(v1, v2); +#elif defined(USE_NEON) + return vandq_u8(v1, v2); +#endif +} +#endif /* ! USE_NO_SIMD */ + +/* + * Return the result of adding the respective elements of the input vectors. + */ +#ifndef USE_NO_SIMD +static inline Vector8 +vector8_add(const Vector8 v1, const Vector8 v2) +{ +#ifdef USE_SSE2 + return _mm_add_epi8(v1, v2); +#elif defined(USE_NEON) + return vaddq_u8(v1, v2); +#endif +} +#endif /* ! USE_NO_SIMD */ + /* * Return the result of subtracting the respective elements of the input - * vectors using saturation (i.e., if the operation would yield a value less - * than zero, zero is returned instead). For more information on saturation - * arithmetic, see https://en.wikipedia.org/wiki/Saturation_arithmetic + * vectors using signed saturation (i.e., if the operation would yield a value + * less than -128, -128 is returned instead). For more information on + * saturation arithmetic, see + * https://en.wikipedia.org/wiki/Saturation_arithmetic */ #ifndef USE_NO_SIMD static inline Vector8 -vector8_ssub(const Vector8 v1, const Vector8 v2) +vector8_sssub(const Vector8 v1, const Vector8 v2) +{ +#ifdef USE_SSE2 + return _mm_subs_epi8(v1, v2); +#elif defined(USE_NEON) + return vqsubq_s8((int8x16_t) v1, (int8x16_t) v2); +#endif +} +#endif /* ! USE_NO_SIMD */ + +/* + * Return the result of subtracting the respective elements of the input + * vectors using unsigned saturation (i.e., if the operation would yield a + * value less than zero, zero is returned instead). For more information on + * saturation arithmetic, see + * https://en.wikipedia.org/wiki/Saturation_arithmetic + */ +#ifndef USE_NO_SIMD +static inline Vector8 +vector8_ussub(const Vector8 v1, const Vector8 v2) { #ifdef USE_SSE2 return _mm_subs_epu8(v1, v2); @@ -404,6 +506,23 @@ vector32_eq(const Vector32 v1, const Vector32 v2) } #endif /* ! USE_NO_SIMD */ +/* + * Return a vector with all bits set for each lane of v1 that is greater than + * the corresponding lane of v2. NB: The comparison treats the elements as + * signed. + */ +#ifndef USE_NO_SIMD +static inline Vector8 +vector8_gt(const Vector8 v1, const Vector8 v2) +{ +#ifdef USE_SSE2 + return _mm_cmpgt_epi8(v1, v2); +#elif defined(USE_NEON) + return vcgtq_s8((int8x16_t) v1, (int8x16_t) v2); +#endif +} +#endif /* ! USE_NO_SIMD */ + /* * Given two vectors, return a vector with the minimum element of each. */ @@ -419,4 +538,100 @@ vector8_min(const Vector8 v1, const Vector8 v2) } #endif /* ! USE_NO_SIMD */ +/* + * Interleave elements of low halves of given vectors. + */ +#ifndef USE_NO_SIMD +static inline Vector8 +vector8_interleave_low(const Vector8 v1, const Vector8 v2) +{ +#ifdef USE_SSE2 + return _mm_unpacklo_epi8(v1, v2); +#elif defined(USE_NEON) + return vzip1q_u8(v1, v2); +#endif +} +#endif /* ! USE_NO_SIMD */ + +/* + * Interleave elements of high halves of given vectors. + */ +#ifndef USE_NO_SIMD +static inline Vector8 +vector8_interleave_high(const Vector8 v1, const Vector8 v2) +{ +#ifdef USE_SSE2 + return _mm_unpackhi_epi8(v1, v2); +#elif defined(USE_NEON) + return vzip2q_u8(v1, v2); +#endif +} +#endif /* ! USE_NO_SIMD */ + +/* + * Pack 16-bit elements in the given vectors into a single vector of 8-bit + * elements. NB: The upper 8-bits of each 16-bit element must be zeros, else + * this will produce different results on different architectures. + */ +#ifndef USE_NO_SIMD +static inline Vector8 +vector8_pack_16(const Vector8 v1, const Vector8 v2) +{ + Vector32 mask PG_USED_FOR_ASSERTS_ONLY = vector32_broadcast(0xff00ff00); + + Assert(!vector8_has_ge(vector8_and(v1, mask), 1)); + Assert(!vector8_has_ge(vector8_and(v2, mask), 1)); +#ifdef USE_SSE2 + return _mm_packus_epi16(v1, v2); +#elif defined(USE_NEON) + return vuzp1q_u8(v1, v2); +#endif +} +#endif /* ! USE_NO_SIMD */ + +/* + * Unsigned shift left of each element in the vector by 4 bits. + */ +#ifndef USE_NO_SIMD +static inline Vector32 +vector32_shift_left_nibble(const Vector32 v1) +{ +#ifdef USE_SSE2 + return _mm_slli_epi32(v1, 4); +#elif defined(USE_NEON) + return vshlq_n_u32(v1, 4); +#endif +} +#endif /* ! USE_NO_SIMD */ + +/* + * Unsigned shift right of each element in the vector by 4 bits. + */ +#ifndef USE_NO_SIMD +static inline Vector32 +vector32_shift_right_nibble(const Vector32 v1) +{ +#ifdef USE_SSE2 + return _mm_srli_epi32(v1, 4); +#elif defined(USE_NEON) + return vshrq_n_u32(v1, 4); +#endif +} +#endif /* ! USE_NO_SIMD */ + +/* + * Unsigned shift right of each element in the vector by 1 byte. + */ +#ifndef USE_NO_SIMD +static inline Vector32 +vector32_shift_right_byte(const Vector32 v1) +{ +#ifdef USE_SSE2 + return _mm_srli_epi32(v1, 8); +#elif defined(USE_NEON) + return vshrq_n_u32(v1, 8); +#endif +} +#endif /* ! USE_NO_SIMD */ + #endif /* SIMD_H */ -- 2.39.5 (Apple Git-154)