From ccb3d6c072899063bc6e47388ef9a222a19be324 Mon Sep 17 00:00:00 2001
From: Nathan Bossart
Date: Mon, 17 Apr 2023 12:29:10 -0700
Subject: [PATCH v3 2/2] speed up several functions for lists with inline 32-bit data using SIMD

---
 src/backend/nodes/list.c | 203 +++++++++++++++++++++++++++++++--------
 1 file changed, 166 insertions(+), 37 deletions(-)

diff --git a/src/backend/nodes/list.c b/src/backend/nodes/list.c
index 92bc48de17..fa06caf553 100644
--- a/src/backend/nodes/list.c
+++ b/src/backend/nodes/list.c
@@ -57,6 +57,9 @@
 #define IsOidList(l)			((l) == NIL || IsA((l), OidList))
 #define IsXidList(l)			((l) == NIL || IsA((l), XidList))
 
+static inline ListCell *list_member_inline_internal_idx(const List *list, uint32 datum);
+static inline bool list_member_inline_internal(const List *list, uint32 datum);
+
 #ifdef USE_ASSERT_CHECKING
 /*
  * Check that the specified List is valid (so far as we can tell).
@@ -717,18 +720,10 @@ list_member_ptr(const List *list, const void *datum)
 bool
 list_member_int(const List *list, int datum)
 {
-	const ListCell *cell;
-
 	Assert(IsIntegerList(list));
 	check_list_invariants(list);
 
-	foreach(cell, list)
-	{
-		if (lfirst_int(cell) == datum)
-			return true;
-	}
-
-	return false;
+	return list_member_inline_internal(list, datum);
 }
 
 /*
@@ -737,18 +732,10 @@ list_member_int(const List *list, int datum)
 bool
 list_member_oid(const List *list, Oid datum)
 {
-	const ListCell *cell;
-
 	Assert(IsOidList(list));
 	check_list_invariants(list);
 
-	foreach(cell, list)
-	{
-		if (lfirst_oid(cell) == datum)
-			return true;
-	}
-
-	return false;
+	return list_member_inline_internal(list, datum);
 }
 
 /*
@@ -757,18 +744,10 @@ list_member_oid(const List *list, Oid datum)
 bool
 list_member_xid(const List *list, TransactionId datum)
 {
-	const ListCell *cell;
-
 	Assert(IsXidList(list));
 	check_list_invariants(list);
 
-	foreach(cell, list)
-	{
-		if (lfirst_xid(cell) == datum)
-			return true;
-	}
-
-	return false;
+	return list_member_inline_internal(list, datum);
 }
 
 /*
@@ -929,11 +908,9 @@ list_delete_int(List *list, int datum)
 	Assert(IsIntegerList(list));
 	check_list_invariants(list);
 
-	foreach(cell, list)
-	{
-		if (lfirst_int(cell) == datum)
-			return list_delete_cell(list, cell);
-	}
+	cell = list_member_inline_internal_idx(list, datum);
+	if (cell != NULL)
+		return list_delete_cell(list, cell);
 
 	/* Didn't find a match: return the list unmodified */
 	return list;
@@ -948,11 +925,9 @@ list_delete_oid(List *list, Oid datum)
 	Assert(IsOidList(list));
 	check_list_invariants(list);
 
-	foreach(cell, list)
-	{
-		if (lfirst_oid(cell) == datum)
-			return list_delete_cell(list, cell);
-	}
+	cell = list_member_inline_internal_idx(list, datum);
+	if (cell != NULL)
+		return list_delete_cell(list, cell);
 
 	/* Didn't find a match: return the list unmodified */
 	return list;
@@ -1749,3 +1724,157 @@ list_oid_cmp(const ListCell *p1, const ListCell *p2)
 		return 1;
 	return 0;
 }
+
+/*
+ * list_member_inline_helper
+ *
+ * Workhorse for list_member_inline_internal and
+ * list_member_inline_internal_idx.
+ *
+ * Scans as much of the list as possible in SIMD-sized blocks.  On return, *i
+ * is the cell index at which the caller must continue its one-at-a-time
+ * search.  Returns true if some cell in the block starting at *i holds
+ * 'datum'; returns false if no match was found in the cells before *i.
+ */
+static inline bool
+list_member_inline_helper(const List *list, uint32 datum, uint32 *i)
+{
+#ifdef USE_NO_SIMD
+
+	*i = 0;
+
+#else
+
+	/*
+	 * For better instruction-level parallelism, each loop iteration operates
+	 * on a block of four registers.
+	 */
+	const Vector32 keys = vector32_broadcast(datum);	/* load copies of key */
+	const uint32 nelem_per_vector = sizeof(Vector32) / sizeof(uint32);
+	const uint32 nelem_per_iteration = 4 * nelem_per_vector;
+#ifdef USE_NEON
+	const Vector32 mask = (Vector32) vector64_broadcast(UINT64CONST(0xFFFFFFFF));
+#else
+	const Vector32 mask = vector64_broadcast(UINT64CONST(0xFFFFFFFF));
+#endif
+	const uint32 *elements = (const uint32 *) list->elements;
+
+	/* round down to multiple of elements per iteration */
+	const uint32 tail_idx = (list->length * 2) & ~(nelem_per_iteration - 1);
+
+	/*
+	 * The SIMD optimized portion of this routine is written with the
+	 * expectation that the 32-bit datum we are searching for only takes up
+	 * half of a ListCell.  If that changes, this routine must change, too.
+	 * Enforce this at compile time so that a violation cannot slip through
+	 * in builds without assertions.
+	 *
+	 * NOTE(review): the mask keeps the low-order 32 bits of each 64-bit lane,
+	 * which presumes the datum is stored there (little-endian layout).
+	 * Confirm before enabling this path on a big-endian SIMD target.
+	 */
+	StaticAssertStmt(sizeof(ListCell) == 8, "ListCell must be 8 bytes");
+
+	for (*i = 0; *i < tail_idx; *i += nelem_per_iteration)
+	{
+		Vector32	vals1,
+					vals2,
+					vals3,
+					vals4,
+					result1,
+					result2,
+					result3,
+					result4,
+					tmp1,
+					tmp2,
+					result,
+					masked;
+
+		/* load the next block into 4 registers */
+		vector32_load(&vals1, &elements[*i]);
+		vector32_load(&vals2, &elements[*i + nelem_per_vector]);
+		vector32_load(&vals3, &elements[*i + nelem_per_vector * 2]);
+		vector32_load(&vals4, &elements[*i + nelem_per_vector * 3]);
+
+		/* compare each value to the key */
+		result1 = vector32_eq(keys, vals1);
+		result2 = vector32_eq(keys, vals2);
+		result3 = vector32_eq(keys, vals3);
+		result4 = vector32_eq(keys, vals4);
+
+		/* combine the results into a single variable */
+		tmp1 = vector32_or(result1, result2);
+		tmp2 = vector32_or(result3, result4);
+		result = vector32_or(tmp1, tmp2);
+
+		/* filter out matches in space between data */
+		masked = vector32_and(result, mask);
+
+		/* break out and find the exact element if there was a match */
+		if (vector32_is_highbit_set(masked))
+		{
+			*i /= 2;
+			return true;
+		}
+	}
+
+#endif							/* ! USE_NO_SIMD */
+
+	*i /= 2;
+	return false;
+}
+
+/*
+ * list_member_inline_internal
+ *
+ * Optimized linear search routine (using SIMD intrinsics where available) for
+ * lists with inline 32-bit data.
+ */
+static inline bool
+list_member_inline_internal(const List *list, uint32 datum)
+{
+	uint32		i = 0;
+	const ListCell *cell;
+
+	if (list == NIL)
+		return false;
+
+	if (list_member_inline_helper(list, datum, &i))
+		return true;
+
+	/* Process the remaining elements one at a time. */
+	for_each_from(cell, list, i)
+	{
+		if (lfirst_int(cell) == (int) datum)
+			return true;
+	}
+
+	return false;
+}
+
+/*
+ * list_member_inline_internal_idx
+ *
+ * Like list_member_inline_internal, but returns a pointer to the first cell
+ * whose value equals 'datum', or NULL if there is no such cell.
+ */
+static inline ListCell *
+list_member_inline_internal_idx(const List *list, uint32 datum)
+{
+	uint32		i = 0;
+	ListCell   *cell;
+
+	if (list == NIL)
+		return NULL;
+
+	(void) list_member_inline_helper(list, datum, &i);
+
+	/* Process the remaining elements one at a time. */
+	for_each_from(cell, list, i)
+	{
+		if (lfirst_int(cell) == (int) datum)
+			return cell;
+	}
+
+	return NULL;
+}
-- 
2.25.1