From 81af5c4c569265ae4e503e96429d93f3d8f47e65 Mon Sep 17 00:00:00 2001
From: Mike Curtiss <mcurtiss@fb.com>
Date: Fri, 22 Feb 2013 14:28:05 -0800
Subject: [PATCH] Use aligned loads for Range::find_first_of

Summary: Aligned loads are faster on some architectures.  Let's
refactor qfind_first_of so that it uses aligned loads when possible.

Also modify the benchmarks to account for begin/end-of-string logic.

Test Plan: Tests pass.  Updated benchmarks.  Generally seeing a 5-20%
speed-up, depending on the situation.

Reviewed By: philipp@fb.com

FB internal diff: D720369
---
 folly/Range.cpp                   | 134 ++++++++++++++++--------------
 folly/test/RangeFindBenchmark.cpp |  22 ++++-
 2 files changed, 93 insertions(+), 63 deletions(-)

diff --git a/folly/Range.cpp b/folly/Range.cpp
index aca2daa7..cf0ba535 100644
--- a/folly/Range.cpp
+++ b/folly/Range.cpp
@@ -60,12 +60,17 @@ namespace {
 // It's okay if pages are bigger than this (as powers of two), but they should
 // not be smaller.
 constexpr size_t kMinPageSize = 4096;
+static_assert(kMinPageSize >= 16,
+              "kMinPageSize must be at least SSE register size");
 #define PAGE_FOR(addr) \
-  (reinterpret_cast<intptr_t>(addr) / kMinPageSize)
+  (reinterpret_cast<uintptr_t>(addr) / kMinPageSize)
 
-// Rounds up to the next multiple of 16
-#define ROUND_UP_16(val) \
-  ((val + 15) & ~0xF)
+inline size_t nextAlignedIndex(const char* arr) {
+  auto firstPossible = reinterpret_cast<uintptr_t>(arr) + 1;
+  return 1 +                        // add 1 because the index starts at 'arr'
+    ((firstPossible + 15) & ~0xF)   // round up to next multiple of 16
+    - firstPossible;
+}
 
 // build sse4.2-optimized version even if -msse4.2 is not passed to GCC
 size_t qfind_first_byte_of_needles16(const StringPiece& haystack,
@@ -78,43 +83,37 @@ size_t qfind_first_byte_of_needles16(const StringPiece& haystack,
   DCHECK(!haystack.empty());
   DCHECK(!needles.empty());
   DCHECK_LE(needles.size(), 16);
+  // benchmarking shows that memchr beats out SSE for small needle-sets
+  // with large haystacks.
   if ((needles.size() <= 2 && haystack.size() >= 256) ||
-      // we can't load needles into SSE register if it could cross page boundary
-      (PAGE_FOR(needles.end() - 1) != PAGE_FOR(needles.data() + 15))) {
-    // benchmarking shows that memchr beats out SSE for small needle-sets
-    // with large haystacks.
-    // TODO(mcurtiss): could this be because of unaligned SSE loads?
+      // must bail if we can't even SSE-load a single segment of haystack
+      (haystack.size() < 16 &&
+       PAGE_FOR(haystack.end() - 1) != PAGE_FOR(haystack.data() + 15)) ||
+      // can't load needles into SSE register if it could cross page boundary
+      PAGE_FOR(needles.end() - 1) != PAGE_FOR(needles.data() + 15)) {
    return detail::qfind_first_byte_of_memchr(haystack, needles);
   }
-  __v16qi arr2 = __builtin_ia32_loaddqu(needles.data());
-
-  // If true, the last byte we want to load into the SSE register is on the
-  // same page as the last byte of the actual Range.  No risk of segfault.
-  bool canSseLoadLastBlock =
-    (PAGE_FOR(haystack.end() - 1) ==
-     PAGE_FOR(haystack.data() + ROUND_UP_16(haystack.size()) - 1));
-  int64_t lastSafeBlockIdx = canSseLoadLastBlock ?
-    haystack.size() : static_cast<int64_t>(haystack.size()) - 16;
+  auto arr2 = __builtin_ia32_loaddqu(needles.data());
+  // do an unaligned load for first block of haystack
+  auto arr1 = __builtin_ia32_loaddqu(haystack.data());
+  auto index = __builtin_ia32_pcmpestri128(arr2, needles.size(),
+                                           arr1, haystack.size(), 0);
+  if (index < 16) {
+    return index;
+  }
 
-  int64_t i = 0;
-  for (; i < lastSafeBlockIdx; i += 16) {
-    auto arr1 = __builtin_ia32_loaddqu(haystack.data() + i);
+  // Now, we can do aligned loads hereafter...
+  size_t i = nextAlignedIndex(haystack.data());
+  for (; i < haystack.size(); i += 16) {
+    void* ptr1 = __builtin_assume_aligned(haystack.data() + i, 16);
+    auto arr1 = *reinterpret_cast<const __v16qi*>(ptr1);
     auto index = __builtin_ia32_pcmpestri128(arr2, needles.size(),
                                              arr1, haystack.size() - i, 0);
     if (index < 16) {
       return i + index;
     }
   }
-
-  if (!canSseLoadLastBlock) {
-    StringPiece tmp(haystack);
-    tmp.advance(i);
-    auto ret = detail::qfind_first_byte_of_memchr(tmp, needles);
-    if (ret != StringPiece::npos) {
-      return ret + i;
-    }
-  }
   return StringPiece::npos;
 }
 
@@ -162,39 +161,48 @@ size_t qfind_first_byte_of_byteset(const StringPiece& haystack,
   return StringPiece::npos;
 }
 
+template <bool HAYSTACK_ALIGNED>
 inline size_t scanHaystackBlock(const StringPiece& haystack,
                                 const StringPiece& needles,
                                 int64_t idx)
-// inlining is okay because it's only called from other sse4.2 functions
+// inline is okay because it's only called from other sse4.2 functions
   __attribute__ ((__target__("sse4.2")));
 
 // Scans a 16-byte block of haystack (starting at blockStartIdx) to find first
-// needle. If blockStartIdx is near the end of haystack, it may read a few bytes
-// past the end; it is the caller's responsibility to ensure this is safe.
+// needle. If HAYSTACK_ALIGNED, then haystack must be 16-byte aligned.
+// If !HAYSTACK_ALIGNED, then the caller must ensure that it is safe to load
+// the block.
+template <bool HAYSTACK_ALIGNED>
 inline size_t scanHaystackBlock(const StringPiece& haystack,
                                 const StringPiece& needles,
                                 int64_t blockStartIdx) {
-  // small needle sets should be handled by qfind_first_byte_of_needles16()
-  DCHECK_GT(needles.size(), 16);
+  DCHECK_GT(needles.size(), 16); // should be handled by *needles16() method
   DCHECK(blockStartIdx + 16 <= haystack.size() ||
          (PAGE_FOR(haystack.data() + blockStartIdx) ==
          PAGE_FOR(haystack.data() + blockStartIdx + 15)));
-  size_t b = 16;
-  auto arr1 = __builtin_ia32_loaddqu(haystack.data() + blockStartIdx);
-  int64_t j = 0;
-  for (; j < static_cast<int64_t>(needles.size()) - 16; j += 16) {
-    auto arr2 = __builtin_ia32_loaddqu(needles.data() + j);
-    auto index = __builtin_ia32_pcmpestri128(
-      arr2, 16, arr1, haystack.size() - blockStartIdx, 0);
-    b = std::min<size_t>(index, b);
+
+  __v16qi arr1;
+  if (HAYSTACK_ALIGNED) {
+    void* ptr1 = __builtin_assume_aligned(haystack.data() + blockStartIdx, 16);
+    arr1 = *reinterpret_cast<const __v16qi*>(ptr1);
+  } else {
+    arr1 = __builtin_ia32_loaddqu(haystack.data() + blockStartIdx);
   }
-  // Avoid reading any bytes past the end of needles by just reading the last
-  // 16 bytes of needles.  We know this is safe because needles.size() > 16.
-  auto arr2 = __builtin_ia32_loaddqu(needles.end() - 16);
-  auto index = __builtin_ia32_pcmpestri128(
+  // This load is safe because needles.size() >= 16
+  auto arr2 = __builtin_ia32_loaddqu(needles.data());
+  size_t b = __builtin_ia32_pcmpestri128(
     arr2, 16, arr1, haystack.size() - blockStartIdx, 0);
-  b = std::min<size_t>(index, b);
+
+  size_t j = nextAlignedIndex(needles.data());
+  for (; j < needles.size(); j += 16) {
+    void* ptr2 = __builtin_assume_aligned(needles.data() + j, 16);
+    arr2 = *reinterpret_cast<const __v16qi*>(ptr2);
+
+    auto index = __builtin_ia32_pcmpestri128(
+      arr2, needles.size() - j, arr1, haystack.size() - blockStartIdx, 0);
+    b = std::min<size_t>(index, b);
+  }
 
   if (b < 16) {
     return blockStartIdx + b;
@@ -216,23 +224,25 @@ size_t qfind_first_byte_of_sse42(const StringPiece& haystack,
     return qfind_first_byte_of_needles16(haystack, needles);
   }
 
-  int64_t i = 0;
-  for (; i < static_cast<int64_t>(haystack.size()) - 16; i += 16) {
-    auto ret = scanHaystackBlock(haystack, needles, i);
-    if (ret != StringPiece::npos) {
-      return ret;
+  if (haystack.size() < 16 &&
+      PAGE_FOR(haystack.end() - 1) != PAGE_FOR(haystack.data() + 16)) {
+    // We can't safely SSE-load haystack.  Use a different approach.
+    if (haystack.size() <= 2) {
+      return qfind_first_of(haystack, needles, asciiCaseSensitive);
     }
-  };
+    return qfind_first_byte_of_byteset(haystack, needles);
+  }
 
-  if (i == haystack.size() - 16 ||
-      PAGE_FOR(haystack.end() - 1) == PAGE_FOR(haystack.data() + i + 15)) {
-    return scanHaystackBlock(haystack, needles, i);
-  } else {
-    auto ret = qfind_first_byte_of_nosse(StringPiece(haystack.data() + i,
-                                                     haystack.end()),
-                                         needles);
+  auto ret = scanHaystackBlock<false>(haystack, needles, 0);
+  if (ret != StringPiece::npos) {
+    return ret;
+  }
+
+  size_t i = nextAlignedIndex(haystack.data());
+  for (; i < haystack.size(); i += 16) {
+    auto ret = scanHaystackBlock<true>(haystack, needles, i);
     if (ret != StringPiece::npos) {
-      return i + ret;
+      return ret;
     }
   }
diff --git a/folly/test/RangeFindBenchmark.cpp b/folly/test/RangeFindBenchmark.cpp
index 031c67b8..11d555a5 100644
--- a/folly/test/RangeFindBenchmark.cpp
+++ b/folly/test/RangeFindBenchmark.cpp
@@ -40,6 +40,9 @@ using namespace std;
 namespace {
 
 std::string str;
+constexpr int kVstrSize = 16;
+std::vector<std::string> vstr;
+std::vector<StringPiece> vstrp;
 
 void initStr(int len) {
   cout << "string length " << len << ':' << endl;
@@ -47,6 +50,23 @@ void initStr(int len) {
   str.reserve(len + 1);
   str.append(len, 'a');
   str.append(1, 'b');
+
+  // create 16 copies of str, each with a different 16-byte alignment.
+  // Useful because some implementations of find_first_of have different
+  // behaviors based on byte alignment.
+  for (int i = 0; i < kVstrSize; ++i) {
+    string s(i, '$');
+    s += str;
+    if (i % 2) {
+      // some find_first_of implementations have special (page-safe) logic
+      // for handling the end of a string.  Flex that logic only sometimes.
+      s += string(32, '$');
+    }
+    vstr.push_back(s);
+    StringPiece sp(vstr.back());
+    sp.advance(i);
+    vstrp.push_back(sp);
+  }
 }
 
 std::mt19937 rnd;
@@ -118,8 +138,8 @@ inline size_t qfind_first_byte_of_std(const StringPiece& haystack,
 
 template <class Func>
 void findFirstOfRange(StringPiece needles, Func func, size_t n) {
-  StringPiece haystack(str);
   FOR_EACH_RANGE (i, 0, n) {
+    const StringPiece& haystack = vstr[i % kVstrSize];
     doNotOptimizeAway(func(haystack, needles));
     char x = haystack[0];
     doNotOptimizeAway(&x);
-- 
2.34.1
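
For readers who want to experiment with the aligned-load trick outside of
folly, below is a minimal self-contained sketch of the same pattern.  It is
an illustration, not this patch's code: findByteSse is a made-up helper that
matches a single needle byte with _mm_cmpeq_epi8/_mm_movemask_epi8 (plain
SSE2, so no -msse4.2 needed) instead of pcmpestri, and it assumes size >= 16
and that reading the final 16-byte block past 'size' is safe -- a guarantee
the patch instead obtains from its PAGE_FOR checks.

    #include <immintrin.h>
    #include <cstddef>
    #include <cstdint>
    #include <cstdio>
    #include <cstring>

    // Same computation as the patch's nextAlignedIndex(): the smallest
    // index i >= 1 such that arr + i is 16-byte aligned.
    static inline size_t nextAlignedIndex(const char* arr) {
      uintptr_t firstPossible = reinterpret_cast<uintptr_t>(arr) + 1;
      return 1 + ((firstPossible + 15) & ~0xF) - firstPossible;
    }

    // Returns the index of the first 'needle' in data[0, size), or SIZE_MAX
    // if absent.  One unaligned load covers the first block; every later
    // load is 16-byte aligned, mirroring the structure of the patch.
    static size_t findByteSse(const char* data, size_t size, char needle) {
      const __m128i pattern = _mm_set1_epi8(needle);

      // unaligned load for the first block
      __m128i block = _mm_loadu_si128(reinterpret_cast<const __m128i*>(data));
      int mask = _mm_movemask_epi8(_mm_cmpeq_epi8(block, pattern));
      if (mask != 0) {
        return static_cast<size_t>(__builtin_ctz(mask));
      }

      // aligned loads hereafter; blocks overlap the first one slightly
      for (size_t i = nextAlignedIndex(data); i < size; i += 16) {
        block = _mm_load_si128(reinterpret_cast<const __m128i*>(data + i));
        mask = _mm_movemask_epi8(_mm_cmpeq_epi8(block, pattern));
        if (mask != 0) {
          size_t found = i + static_cast<size_t>(__builtin_ctz(mask));
          if (found < size) {
            return found;   // ignore matches in the padding past 'size'
          }
        }
      }
      return SIZE_MAX;
    }

    int main() {
      alignas(16) char buf[64];
      memset(buf, 'a', sizeof(buf));
      buf[37] = 'b';
      // search a deliberately misaligned view to exercise both load paths
      printf("%zu\n", findByteSse(buf + 3, 48, 'b'));  // prints 34
      return 0;
    }

Unlike pcmpestri, the cmpeq/movemask pair has no built-in length handling,
hence the explicit 'found < size' test above; the patch instead passes the
remaining length to pcmpestri, which never reports matches past the end.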