src/backend/access/heap/visibilitymap.c | 13 +- src/backend/nodes/bitmapset.c | 23 +-- src/backend/utils/adt/tsgistidx.c | 14 +- src/include/port/pg_bitutils.h | 12 +- src/port/pg_bitutils.c | 240 ++++++++++++++++++++++++++------ 5 files changed, 214 insertions(+), 88 deletions(-) diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c index e198df65d8..d910eb2875 100644 --- a/src/backend/access/heap/visibilitymap.c +++ b/src/backend/access/heap/visibilitymap.c @@ -388,7 +388,6 @@ visibilitymap_count(Relation rel, BlockNumber *all_visible, BlockNumber *all_fro { Buffer mapBuffer; uint64 *map; - int i; /* * Read till we fall off the end of the map. We assume that any extra @@ -409,17 +408,11 @@ visibilitymap_count(Relation rel, BlockNumber *all_visible, BlockNumber *all_fro StaticAssertStmt(MAPSIZE % sizeof(uint64) == 0, "unsupported MAPSIZE"); if (all_frozen == NULL) - { - for (i = 0; i < MAPSIZE / sizeof(uint64); i++) - nvisible += pg_popcount64(map[i] & VISIBLE_MASK64); - } + nvisible = pg_popcount_mask64(map, MAPSIZE, VISIBLE_MASK64); else { - for (i = 0; i < MAPSIZE / sizeof(uint64); i++) - { - nvisible += pg_popcount64(map[i] & VISIBLE_MASK64); - nfrozen += pg_popcount64(map[i] & FROZEN_MASK64); - } + nvisible = pg_popcount_mask64(map, MAPSIZE, VISIBLE_MASK64); + nfrozen = pg_popcount_mask64(map, MAPSIZE, FROZEN_MASK64); } ReleaseBuffer(mapBuffer); diff --git a/src/backend/nodes/bitmapset.c b/src/backend/nodes/bitmapset.c index 649478b0d4..5368c72255 100644 --- a/src/backend/nodes/bitmapset.c +++ b/src/backend/nodes/bitmapset.c @@ -452,7 +452,6 @@ bms_is_member(int x, const Bitmapset *a) int bms_member_index(Bitmapset *a, int x) { - int i; int bitnum; int wordnum; int result = 0; @@ -466,14 +465,8 @@ bms_member_index(Bitmapset *a, int x) bitnum = BITNUM(x); /* count bits in preceding words */ - for (i = 0; i < wordnum; i++) - { - bitmapword w = a->words[i]; - - /* No need to count the bits in a zero word */ - if (w != 0) - result += bmw_popcount(w); - } + result = pg_popcount((const char *) a->words, + wordnum * BITS_PER_BITMAPWORD / BITS_PER_BYTE); /* * Now add bits of the last word, but only those before the item. We can @@ -645,22 +638,14 @@ bms_get_singleton_member(const Bitmapset *a, int *member) int bms_num_members(const Bitmapset *a) { - int result = 0; int nwords; - int wordnum; if (a == NULL) return 0; nwords = a->nwords; - for (wordnum = 0; wordnum < nwords; wordnum++) - { - bitmapword w = a->words[wordnum]; - /* No need to count the bits in a zero word */ - if (w != 0) - result += bmw_popcount(w); - } - return result; + return pg_popcount((const char *) a->words, + nwords * BITS_PER_BITMAPWORD / BITS_PER_BYTE); } /* diff --git a/src/backend/utils/adt/tsgistidx.c b/src/backend/utils/adt/tsgistidx.c index c09eefdda2..0602bb0b7f 100644 --- a/src/backend/utils/adt/tsgistidx.c +++ b/src/backend/utils/adt/tsgistidx.c @@ -489,17 +489,9 @@ sizebitvec(BITVECP sign, int siglen) static int hemdistsign(BITVECP a, BITVECP b, int siglen) { - int i, - diff, - dist = 0; - - LOOPBYTE(siglen) - { - diff = (unsigned char) (a[i] ^ b[i]); - /* Using the popcount functions here isn't likely to win */ - dist += pg_number_of_ones[diff]; - } - return dist; + return pg_popcount_xor((const unsigned char *) a, + (const unsigned char *) b, + siglen); } static int diff --git a/src/include/port/pg_bitutils.h b/src/include/port/pg_bitutils.h index f9b77ec278..73c923c2aa 100644 --- a/src/include/port/pg_bitutils.h +++ b/src/include/port/pg_bitutils.h @@ -208,11 +208,17 @@ pg_ceil_log2_64(uint64 num) } /* Count the number of one-bits in a uint32 or uint64 */ -extern int (*pg_popcount32) (uint32 word); -extern int (*pg_popcount64) (uint64 word); +extern int pg_popcount32 (uint32 word); +extern int pg_popcount64 (uint64 word); /* Count the number of one-bits in a byte array */ -extern uint64 pg_popcount(const char *buf, int bytes); +extern uint64 (*pg_popcount) (const char *buf, int bytes); + +/* Count the number of one-bits in the result of xor operation */ +extern uint64 (*pg_popcount_xor) (const unsigned char *a, const unsigned char *b, int bytes); + +/* Count the number of one-bits in an array of uint64, with the mask applied */ +extern uint64 pg_popcount_mask64(const uint64 *buf, int bytes, uint64 mask); /* * Rotate the bits of "word" to the right by n bits. diff --git a/src/port/pg_bitutils.c b/src/port/pg_bitutils.c index 2252021854..dccb6785a1 100644 --- a/src/port/pg_bitutils.c +++ b/src/port/pg_bitutils.c @@ -116,21 +116,21 @@ const uint8 pg_number_of_ones[256] = { #endif #endif -static int pg_popcount32_slow(uint32 word); -static int pg_popcount64_slow(uint64 word); +static uint64 pg_popcount_slow(const char *buf, int bytes); +static uint64 pg_popcount_xor_slow(const unsigned char *a, const unsigned char *b, int bytes); #ifdef USE_POPCNT_ASM static bool pg_popcount_available(void); -static int pg_popcount32_choose(uint32 word); -static int pg_popcount64_choose(uint64 word); -static int pg_popcount32_asm(uint32 word); -static int pg_popcount64_asm(uint64 word); +static uint64 pg_popcount_choose(const char *buf, int bytes); +static uint64 pg_popcount_asm(const char *buf, int bytes); +static uint64 pg_popcount_xor_choose(const unsigned char *a, const unsigned char *b, int bytes); +static uint64 pg_popcount_xor_asm(const unsigned char *a, const unsigned char *b, int bytes); -int (*pg_popcount32) (uint32 word) = pg_popcount32_choose; -int (*pg_popcount64) (uint64 word) = pg_popcount64_choose; +uint64 (*pg_popcount) (const char *buf, int bytes) = pg_popcount_choose; +uint64 (*pg_popcount_xor) (const unsigned char *a, const unsigned char *b, int bytes) = pg_popcount_xor_choose; #else -int (*pg_popcount32) (uint32 word) = pg_popcount32_slow; -int (*pg_popcount64) (uint64 word) = pg_popcount64_slow; +uint64 (*pg_popcount) (const char *buf, int bytes) = pg_popcount_slow; +uint64 (*pg_popcount_xor) (const unsigned char *a, const unsigned char *b, int bytes) = pg_popcount_xor_slow; #endif /* USE_POPCNT_ASM */ #ifdef USE_POPCNT_ASM @@ -155,50 +155,38 @@ pg_popcount_available(void) } /* - * These functions get called on the first call to pg_popcount32 etc. + * These functions get called on the first call to pg_popcount etc. * They detect whether we can use the asm implementations, and replace * the function pointers so that subsequent calls are routed directly to * the chosen implementation. */ -static int -pg_popcount32_choose(uint32 word) +static uint64 +pg_popcount_choose(const char *buf, int bytes) { if (pg_popcount_available()) - { - pg_popcount32 = pg_popcount32_asm; - pg_popcount64 = pg_popcount64_asm; - } + pg_popcount = pg_popcount_asm; else - { - pg_popcount32 = pg_popcount32_slow; - pg_popcount64 = pg_popcount64_slow; - } + pg_popcount = pg_popcount_slow; - return pg_popcount32(word); + return pg_popcount(buf, bytes); } -static int -pg_popcount64_choose(uint64 word) +static uint64 +pg_popcount_xor_choose(const unsigned char *a, const unsigned char *b, int bytes) { if (pg_popcount_available()) - { - pg_popcount32 = pg_popcount32_asm; - pg_popcount64 = pg_popcount64_asm; - } + pg_popcount_xor = pg_popcount_xor_asm; else - { - pg_popcount32 = pg_popcount32_slow; - pg_popcount64 = pg_popcount64_slow; - } + pg_popcount_xor = pg_popcount_xor_slow; - return pg_popcount64(word); + return pg_popcount_xor(a, b, bytes); } /* * pg_popcount32_asm * Return the number of 1 bits set in word */ -static int +static inline int pg_popcount32_asm(uint32 word) { uint32 res; @@ -211,7 +199,7 @@ __asm__ __volatile__(" popcntl %1,%0\n":"=q"(res):"rm"(word):"cc"); * pg_popcount64_asm * Return the number of 1 bits set in word */ -static int +static inline int pg_popcount64_asm(uint64 word) { uint64 res; @@ -222,13 +210,12 @@ __asm__ __volatile__(" popcntq %1,%0\n":"=q"(res):"rm"(word):"cc"); #endif /* USE_POPCNT_ASM */ - /* - * pg_popcount32_slow + * pg_popcount32 * Return the number of 1 bits set in word */ -static int -pg_popcount32_slow(uint32 word) +int +pg_popcount32(uint32 word) { #ifdef HAVE__BUILTIN_POPCOUNT return __builtin_popcount(word); @@ -246,11 +233,11 @@ pg_popcount32_slow(uint32 word) } /* - * pg_popcount64_slow + * pg_popcount64 * Return the number of 1 bits set in word */ -static int -pg_popcount64_slow(uint64 word) +int +pg_popcount64(uint64 word) { #ifdef HAVE__BUILTIN_POPCOUNT #if defined(HAVE_LONG_INT_64) @@ -273,13 +260,12 @@ pg_popcount64_slow(uint64 word) #endif /* HAVE__BUILTIN_POPCOUNT */ } - /* - * pg_popcount + * pg_popcount_slow * Returns the number of 1-bits in buf */ uint64 -pg_popcount(const char *buf, int bytes) +pg_popcount_slow(const char *buf, int bytes) { uint64 popcnt = 0; @@ -319,3 +305,167 @@ pg_popcount(const char *buf, int bytes) return popcnt; } + +/* + * pg_xorcount_slow + * Count the number of 1-bits in the result of xor operation. + */ +uint64 +pg_popcount_xor_slow(const unsigned char *a, const unsigned char *b, int bytes) +{ + uint64 popcnt = 0; + +#if SIZEOF_VOID_P >= 8 + + /* + * We can process 64-bit chunks only if both are aligned. + */ + if (PointerIsAligned(a, uint64) && PointerIsAligned(b, uint64)) + { + const uint64 *a_words = (const uint64 *) a; + const uint64 *b_words = (const uint64 *) b; + + while (bytes >= 8) + { + popcnt += pg_popcount64(*a_words++ ^ *b_words++); + + bytes -= 8; + } + + a = (const unsigned char *) a_words; + b = (const unsigned char *) b_words; + } +#endif + // TODO: 32-bit + + /* Process any remaining bytes */ + while (bytes--) + popcnt += pg_number_of_ones[*a++ ^ *b++]; + + return popcnt; +} + +#ifdef USE_POPCNT_ASM + +/* + * pg_popcount_asm + * Returns the number of 1-bits in buf using POPCNT + */ +uint64 +pg_popcount_asm(const char *buf, int bytes) +{ + uint64 popcnt = 0; + +#if SIZEOF_VOID_P >= 8 + /* Process in 64-bit chunks if the buffer is aligned. */ + if (buf == (const char *) TYPEALIGN(8, buf)) + { + const uint64 *words = (const uint64 *) buf; + + /* + * Manually unroll the loop, since the compiler won't do it for us. + */ + while (bytes >= 4 * 8) + { + popcnt += pg_popcount64_asm(words[0]); + popcnt += pg_popcount64_asm(words[1]); + popcnt += pg_popcount64_asm(words[2]); + popcnt += pg_popcount64_asm(words[3]); + + bytes -= 4 * 8; + words += 4; + } + + while (bytes >= 8) + { + popcnt += pg_popcount64_asm(*words++); + bytes -= 8; + } + + buf = (const char *) words; + } +#else + /* Process in 32-bit chunks if the buffer is aligned. */ + if (buf == (const char *) TYPEALIGN(4, buf)) + { + const uint32 *words = (const uint32 *) buf; + + while (bytes >= 4) + { + popcnt += pg_popcount32_asm(*words++); + bytes -= 4; + } + + buf = (const char *) words; + } +#endif + + /* Process any remaining bytes */ + while (bytes--) + popcnt += pg_number_of_ones[(unsigned char) *buf++]; + + return popcnt; +} + +/* + * pg_xorcount_slow + * Count the number of 1-bits in the result of xor operation. + */ +uint64 +pg_popcount_xor_asm(const unsigned char *a, const unsigned char *b, int bytes) +{ + uint64 popcnt = 0; + +#if SIZEOF_VOID_P >= 8 + + /* + * We can process 64-bit chunks only if both are aligned. + */ + if (PointerIsAligned(a, uint64) && PointerIsAligned(b, uint64)) + { + const uint64 *a_words = (const uint64 *) a; + const uint64 *b_words = (const uint64 *) b; + + while (bytes >= 8) + { + popcnt += pg_popcount64_asm(*a_words++ ^ *b_words++); + + bytes -= 8; + } + + a = (const unsigned char *) a_words; + b = (const unsigned char *) b_words; + } +#endif + // TODO: 32-bit + + /* Process any remaining bytes */ + while (bytes--) + popcnt += pg_number_of_ones[*a++ ^ *b++]; + + return popcnt; +} + +#endif /* USE_POPCNT_ASM */ + +/* + * pg_popcount_mask64 + * Returns the number of 1-bits in buf after applying the mask + * + * Note: We currently only need a 64-bit version for the visibility map. + */ +uint64 +pg_popcount_mask64(const uint64 *buf, int bytes, uint64 mask) +{ + uint64 popcnt = 0; + + Assert(bytes % sizeof(uint64) == 0); + + while (bytes >= 8) + { + popcnt += pg_popcount64(*buf++ & mask); + bytes -= 8; + } + + return popcnt; +}