2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/ThreadCachedInt.h>
20 #include <condition_variable>
23 #include <glog/logging.h>
25 #include <folly/Benchmark.h>
26 #include <folly/Hash.h>
27 #include <folly/ThreadId.h>
28 #include <folly/portability/GFlags.h>
29 #include <folly/portability/GTest.h>
31 using namespace folly;
33 using std::unique_ptr;
36 using Counter = ThreadCachedInt<int64_t>;
38 class ThreadCachedIntTest : public testing::Test {
40 uint32_t GetDeadThreadsTotal(const Counter& counter) {
41 return counter.readFast();
45 // Multithreaded tests. Creates a specified number of threads each of
46 // which iterates a different amount and dies.
49 // Set cacheSize to be large so cached data moves to target_ only when threads die.
51 Counter g_counter_for_mt_slow(0, UINT32_MAX);
52 Counter g_counter_for_mt_fast(0, UINT32_MAX);
54 // Used to sync between threads. The value of this variable is the
55 // maximum iteration index up to which Runner() is allowed to go.
56 uint32_t g_sync_for_mt(0);
57 std::condition_variable cv;
60 // Performs the specified number of iterations. Within each
61 // iteration, it increments counter 10 times. At the beginning of
62 // each iteration it checks g_sync_for_mt to see if it can proceed,
63 // otherwise goes into a loop sleeping and rechecking.
64 void Runner(Counter* counter, uint32_t iterations) {
65 for (uint32_t i = 0; i < iterations; ++i) {
66 std::unique_lock<std::mutex> lk(cv_m);
67 cv.wait(lk, [i] { return i < g_sync_for_mt; });
68 for (uint32_t j = 0; j < 10; ++j) {
69 counter->increment(1);
75 // Slow test with fewer threads where there are more busy waits and
76 // many calls to readFull(). This attempts to test as many of the
77 // code paths in Counter as possible to ensure that counter values are
78 // properly passed from thread local state, both at calls to
79 // readFull() and at thread death.
80 TEST_F(ThreadCachedIntTest, MultithreadedSlow) {
81 static constexpr uint32_t kNumThreads = 20;
83 vector<unique_ptr<std::thread>> threads(kNumThreads);
84 // Creates kNumThreads threads. Each thread performs a different
85 // number of iterations in Runner() - threads[0] performs 1
86 // iteration, threads[1] performs 2 iterations, threads[2] performs
87 // 3 iterations, and so on.
88 for (uint32_t i = 0; i < kNumThreads; ++i) {
89 threads[i].reset(new std::thread(Runner, &g_counter_for_mt_slow, i + 1));
91 // Variable to grab current counter value.
92 int32_t counter_value;
93 // The expected value of the counter.
95 // The expected value of GetDeadThreadsTotal().
96 int32_t dead_total = 0;
97 // Each iteration of the following loop allows one additional
98 // iteration of the threads. Given that the threads perform a
99 // different number of iterations from 1 through kNumThreads, one
100 // thread will complete in each of the iterations of the loop below.
101 for (uint32_t i = 0; i < kNumThreads; ++i) {
102 // Allow up to iteration i on all threads.
104 std::lock_guard<std::mutex> lk(cv_m);
105 g_sync_for_mt = i + 1;
108 total += (kNumThreads - i) * 10;
109 // Loop until the counter reaches its expected value.
111 counter_value = g_counter_for_mt_slow.readFull();
112 } while (counter_value < total);
113 // All threads have done what they can until iteration i, now make
114 // sure they don't go further by checking 10 more times in the loop below.
116 for (uint32_t j = 0; j < 10; ++j) {
117 counter_value = g_counter_for_mt_slow.readFull();
118 EXPECT_EQ(total, counter_value);
120 dead_total += (i + 1) * 10;
121 EXPECT_GE(dead_total, GetDeadThreadsTotal(g_counter_for_mt_slow));
123 // All threads are done.
124 for (uint32_t i = 0; i < kNumThreads; ++i) {
127 counter_value = g_counter_for_mt_slow.readFull();
128 EXPECT_EQ(total, counter_value);
129 EXPECT_EQ(total, dead_total);
130 EXPECT_EQ(dead_total, GetDeadThreadsTotal(g_counter_for_mt_slow));
133 // Fast test with lots of threads and only one call to readFull()
135 TEST_F(ThreadCachedIntTest, MultithreadedFast) {
136 static constexpr uint32_t kNumThreads = 1000;
138 vector<unique_ptr<std::thread>> threads(kNumThreads);
139 // Creates kNumThreads threads. Each thread performs a different
140 // number of iterations in Runner() - threads[0] performs 1
141 // iteration, threads[1] performs 2 iterations, threads[2] performs
142 // 3 iterations, and so on.
143 for (uint32_t i = 0; i < kNumThreads; ++i) {
144 threads[i].reset(new std::thread(Runner, &g_counter_for_mt_fast, i + 1));
146 // Let the threads run to completion.
148 std::lock_guard<std::mutex> lk(cv_m);
149 g_sync_for_mt = kNumThreads;
152 // The expected value of the counter.
154 for (uint32_t i = 0; i < kNumThreads; ++i) {
155 total += (kNumThreads - i) * 10;
157 // Wait for all threads to complete.
158 for (uint32_t i = 0; i < kNumThreads; ++i) {
161 int32_t counter_value = g_counter_for_mt_fast.readFull();
162 EXPECT_EQ(total, counter_value);
163 EXPECT_EQ(total, GetDeadThreadsTotal(g_counter_for_mt_fast));
166 TEST(ThreadCachedInt, SingleThreadedNotCached) {
167 ThreadCachedInt<int64_t> val(0, 0);
168 EXPECT_EQ(0, val.readFast());
170 EXPECT_EQ(1, val.readFast());
171 for (int i = 0; i < 41; ++i) {
174 EXPECT_EQ(42, val.readFast());
176 EXPECT_EQ(41, val.readFast());
179 // Note: This is somewhat fragile to the implementation. If this causes
180 // problems, feel free to remove it.
181 TEST(ThreadCachedInt, SingleThreadedCached) {
182 ThreadCachedInt<int64_t> val(0, 10);
183 EXPECT_EQ(0, val.readFast());
185 EXPECT_EQ(0, val.readFast());
186 for (int i = 0; i < 7; ++i) {
189 EXPECT_EQ(0, val.readFast());
190 EXPECT_EQ(0, val.readFastAndReset());
191 EXPECT_EQ(8, val.readFull());
192 EXPECT_EQ(8, val.readFullAndReset());
193 EXPECT_EQ(0, val.readFull());
194 EXPECT_EQ(0, val.readFast());
197 ThreadCachedInt<int32_t> globalInt32(0, 11);
198 ThreadCachedInt<int64_t> globalInt64(0, 11);
199 int kNumInserts = 100000;
200 DEFINE_int32(numThreads, 8, "Number simultaneous threads for benchmarks.");
201 #define CREATE_INC_FUNC(size) \
202 void incFunc ## size () { \
203 const int num = kNumInserts / FLAGS_numThreads; \
204 for (int i = 0; i < num; ++i) { \
205 ++globalInt ## size ; \
211 // Confirms counts are accurate with competing threads
212 TEST(ThreadCachedInt, MultiThreadedCached) {
213 kNumInserts = 100000;
214 CHECK_EQ(0, kNumInserts % FLAGS_numThreads) <<
215 "FLAGS_numThreads must evenly divide kNumInserts (" << kNumInserts << ").";
216 const int numPerThread = kNumInserts / FLAGS_numThreads;
217 ThreadCachedInt<int64_t> TCInt64(0, numPerThread - 2);
219 std::atomic<bool> run(true);
220 std::atomic<int> threadsDone(0);
221 std::vector<std::thread> threads;
222 for (int i = 0; i < FLAGS_numThreads; ++i) {
223 threads.push_back(std::thread([&] {
224 FOR_EACH_RANGE(k, 0, numPerThread) {
227 std::atomic_fetch_add(&threadsDone, 1);
228 while (run.load()) { usleep(100); }
232 // We create and increment another ThreadCachedInt here to make sure it
233 // doesn't interact with the other instances
234 ThreadCachedInt<int64_t> otherTCInt64(0, 10);
235 otherTCInt64.set(33);
238 while (threadsDone.load() < FLAGS_numThreads) { usleep(100); }
242 // Threads are done incrementing, but caches have not been flushed yet, so
243 // we have to readFull.
244 EXPECT_NE(kNumInserts, TCInt64.readFast());
245 EXPECT_EQ(kNumInserts, TCInt64.readFull());
248 for (auto& t : threads) {
252 } // Caches are flushed when threads finish
253 EXPECT_EQ(kNumInserts, TCInt64.readFast());
256 #define MAKE_MT_CACHE_SIZE_BM(size) \
257 void BM_mt_cache_size ## size (int iters, int cacheSize) { \
258 kNumInserts = iters; \
259 globalInt ## size.set(0); \
260 globalInt ## size.setCacheSize(cacheSize); \
261 std::vector<std::thread> threads; \
262 for (int i = 0; i < FLAGS_numThreads; ++i) { \
263 threads.push_back(std::thread(incFunc ## size)); \
265 for (auto& t : threads) { \
269 MAKE_MT_CACHE_SIZE_BM(64);
270 MAKE_MT_CACHE_SIZE_BM(32);
272 #define REG_BASELINE(name, inc_stmt) \
273 BENCHMARK(FB_CONCATENATE(BM_mt_baseline_, name), iters) { \
274 const int iterPerThread = iters / FLAGS_numThreads; \
275 std::vector<std::thread> threads; \
276 for (int i = 0; i < FLAGS_numThreads; ++i) { \
277 threads.push_back(std::thread([&]() { \
278 for (int j = 0; j < iterPerThread; ++j) { \
283 for (auto& t : threads) { \
288 ThreadLocal<int64_t> globalTL64Baseline;
289 ThreadLocal<int32_t> globalTL32Baseline;
290 std::atomic<int64_t> globalInt64Baseline(0);
291 std::atomic<int32_t> globalInt32Baseline(0);
292 FOLLY_TLS int64_t global__thread64;
293 FOLLY_TLS int32_t global__thread32;
295 // Alternate lock-free implementation. Achieves about the same performance,
296 // but uses about 20x more memory than ThreadCachedInt with 24 threads.
297 struct ShardedAtomicInt {
298 static const int64_t kBuckets_ = 2048;
299 std::atomic<int64_t> ints_[kBuckets_];
301 inline void inc(int64_t val = 1) {
302 int buck = hash::twang_mix64(folly::getCurrentThreadID()) & (kBuckets_ - 1);
303 std::atomic_fetch_add(&ints_[buck], val);
306 // read the first few and extrapolate
309 static const int numToRead = 8;
310 FOR_EACH_RANGE(i, 0, numToRead) {
311 ret += ints_[i].load(std::memory_order_relaxed);
313 return ret * (kBuckets_ / numToRead);
316 // readFull is lock-free, but has to do thousands of loads...
319 for (auto& i : ints_) {
320 // Fun fact - using memory_order_consume below reduces perf 30-40% in high
321 // contention benchmarks.
322 ret += i.load(std::memory_order_relaxed);
327 ShardedAtomicInt shd_int64;
329 REG_BASELINE(_thread64, global__thread64 += 1);
330 REG_BASELINE(_thread32, global__thread32 += 1);
331 REG_BASELINE(ThreadLocal64, *globalTL64Baseline += 1);
332 REG_BASELINE(ThreadLocal32, *globalTL32Baseline += 1);
333 REG_BASELINE(atomic_inc64,
334 std::atomic_fetch_add(&globalInt64Baseline, int64_t(1)));
335 REG_BASELINE(atomic_inc32,
336 std::atomic_fetch_add(&globalInt32Baseline, int32_t(1)));
337 REG_BASELINE(ShardedAtm64, shd_int64.inc());
339 BENCHMARK_PARAM(BM_mt_cache_size64, 0);
340 BENCHMARK_PARAM(BM_mt_cache_size64, 10);
341 BENCHMARK_PARAM(BM_mt_cache_size64, 100);
342 BENCHMARK_PARAM(BM_mt_cache_size64, 1000);
343 BENCHMARK_PARAM(BM_mt_cache_size32, 0);
344 BENCHMARK_PARAM(BM_mt_cache_size32, 10);
345 BENCHMARK_PARAM(BM_mt_cache_size32, 100);
346 BENCHMARK_PARAM(BM_mt_cache_size32, 1000);
347 BENCHMARK_DRAW_LINE();
350 BENCHMARK(Atomic_readFull) {
351 doNotOptimizeAway(globalInt64Baseline.load(std::memory_order_relaxed));
353 BENCHMARK(ThrCache_readFull) {
354 doNotOptimizeAway(globalInt64.readFull());
356 BENCHMARK(Sharded_readFull) {
357 doNotOptimizeAway(shd_int64.readFull());
359 BENCHMARK(ThrCache_readFast) {
360 doNotOptimizeAway(globalInt64.readFast());
362 BENCHMARK(Sharded_readFast) {
363 doNotOptimizeAway(shd_int64.readFast());
365 BENCHMARK_DRAW_LINE();
368 REG_BASELINE(Atomic_readFull,
369 doNotOptimizeAway(globalInt64Baseline.load(std::memory_order_relaxed)));
370 REG_BASELINE(ThrCache_readFull, doNotOptimizeAway(globalInt64.readFull()));
371 REG_BASELINE(Sharded_readFull, doNotOptimizeAway(shd_int64.readFull()));
372 REG_BASELINE(ThrCache_readFast, doNotOptimizeAway(globalInt64.readFast()));
373 REG_BASELINE(Sharded_readFast, doNotOptimizeAway(shd_int64.readFast()));
374 BENCHMARK_DRAW_LINE();
376 int main(int argc, char** argv) {
377 testing::InitGoogleTest(&argc, argv);
378 gflags::ParseCommandLineFlags(&argc, &argv, true);
379 gflags::SetCommandLineOptionWithMode(
380 "bm_min_usec", "10000", gflags::SET_FLAG_IF_DEFAULT
382 if (FLAGS_benchmark) {
383 folly::runBenchmarks();
385 return RUN_ALL_TESTS();
389 Ran with 20 threads on dual 12-core Xeon(R) X5650 @ 2.67GHz with 12-MB caches
391 Benchmark Iters Total t t/iter iter/sec
392 ------------------------------------------------------------------------------
393 + 103% BM_mt_baseline__thread64 10000000 13.54 ms 1.354 ns 704.4 M
394 * BM_mt_baseline__thread32 10000000 6.651 ms 665.1 ps 1.4 G
395 +50.3% BM_mt_baseline_ThreadLocal64 10000000 9.994 ms 999.4 ps 954.2 M
396 +49.9% BM_mt_baseline_ThreadLocal32 10000000 9.972 ms 997.2 ps 956.4 M
397 +2650% BM_mt_baseline_atomic_inc64 10000000 182.9 ms 18.29 ns 52.13 M
398 +2665% BM_mt_baseline_atomic_inc32 10000000 183.9 ms 18.39 ns 51.85 M
399 +75.3% BM_mt_baseline_ShardedAtm64 10000000 11.66 ms 1.166 ns 817.8 M
400 +6670% BM_mt_cache_size64/0 10000000 450.3 ms 45.03 ns 21.18 M
401 +1644% BM_mt_cache_size64/10 10000000 116 ms 11.6 ns 82.2 M
402 + 381% BM_mt_cache_size64/100 10000000 32.04 ms 3.204 ns 297.7 M
403 + 129% BM_mt_cache_size64/1000 10000000 15.24 ms 1.524 ns 625.8 M
404 +6052% BM_mt_cache_size32/0 10000000 409.2 ms 40.92 ns 23.31 M
405 +1304% BM_mt_cache_size32/10 10000000 93.39 ms 9.339 ns 102.1 M
406 + 298% BM_mt_cache_size32/100 10000000 26.52 ms 2.651 ns 359.7 M
407 +68.1% BM_mt_cache_size32/1000 10000000 11.18 ms 1.118 ns 852.9 M
408 ------------------------------------------------------------------------------
409 +10.4% Atomic_readFull 10000000 36.05 ms 3.605 ns 264.5 M
410 + 619% ThrCache_readFull 10000000 235.1 ms 23.51 ns 40.57 M
411 SLOW Sharded_readFull 1981093 2 s 1.01 us 967.3 k
412 * ThrCache_readFast 10000000 32.65 ms 3.265 ns 292.1 M
413 +10.0% Sharded_readFast 10000000 35.92 ms 3.592 ns 265.5 M
414 ------------------------------------------------------------------------------
415 +4.54% BM_mt_baseline_Atomic_readFull 10000000 8.672 ms 867.2 ps 1.074 G
416 SLOW BM_mt_baseline_ThrCache_readFull 10000000 996.9 ms 99.69 ns 9.567 M
417 SLOW BM_mt_baseline_Sharded_readFull 10000000 891.5 ms 89.15 ns 10.7 M
418 * BM_mt_baseline_ThrCache_readFast 10000000 8.295 ms 829.5 ps 1.123 G
419 +12.7% BM_mt_baseline_Sharded_readFast 10000000 9.348 ms 934.8 ps 1020 M
420 ------------------------------------------------------------------------------