/*
- * Copyright 2013 Facebook, Inc.
+ * Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
typedef DeterministicSchedule DSched;
+template <template<typename> class Atom>
+void run_mt_sequencer_thread(
+    int numThreads,
+    int numOps,
+    uint32_t init,
+    TurnSequencer<Atom>& seq,
+    Atom<uint32_t>& spinThreshold,
+    int& prev,
+    int i) {
+  for (int op = i; op < numOps; op += numThreads) {
+    seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
+    EXPECT_EQ(prev, op - 1);
+    prev = op;
+    seq.completeTurn(init + op);
+  }
+}
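+
+// Illustrative sketch, not part of the test: TurnSequencer hands out
+// turns in strictly increasing order.  waitForTurn(turn, spinCutoff,
+// updateSpinCutoff) blocks until `turn` is reached, and completeTurn(turn)
+// releases the waiter for turn + 1, e.g.
+//
+//   TurnSequencer<std::atomic> seq(0);
+//   std::atomic<uint32_t> spinCutoff(0);
+//   seq.waitForTurn(0, spinCutoff, false);  // returns at once: it is turn 0
+//   seq.completeTurn(0);                    // wakes the waiter for turn 1
+//
+// which is why the EXPECT_EQ(prev, op - 1) above must see ops in order.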
template <template<typename> class Atom>
void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
TurnSequencer<Atom> seq(init);
- Atom<int> spinThreshold(0);
+ Atom<uint32_t> spinThreshold(0);
int prev = -1;
std::vector<std::thread> threads(numThreads);
for (int i = 0; i < numThreads; ++i) {
- threads[i] = DSched::thread([&, i]{
- for (int op = i; op < numOps; op += numThreads) {
- seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
- EXPECT_EQ(prev, op - 1);
- prev = op;
- seq.completeTurn(init + op);
- }
- });
+    threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
+        numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
+        std::ref(prev), i));
}
  for (auto& thr : threads) {
    DSched::join(thr);
  }
  EXPECT_EQ(prev, numOps - 1);
}

TEST(MPMCQueue, sequencer) {
  run_mt_sequencer_test<std::atomic>(1, 100, 0);
  run_mt_sequencer_test<std::atomic>(2, 100000, -100);
  run_mt_sequencer_test<std::atomic>(100, 10000, -100);
}
+TEST(MPMCQueue, sequencer_emulated_futex) {
+ run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
+ run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
+ run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
+}
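+
+// EmulatedFutexAtomic (folly/detail/Futex.h) behaves like std::atomic but
+// routes futex wait/wake through folly's in-process emulation instead of
+// the Linux futex() syscall, so the blocking paths above can be exercised
+// on any platform.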
+
TEST(MPMCQueue, sequencer_deterministic) {
DSched sched(DSched::uniform(0));
run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
}
struct RefCounted {
+  static FOLLY_TLS int active_instances;
+
  mutable std::atomic<int> rc;
-  RefCounted() : rc(0) {}
+  RefCounted() : rc(0) {
+    ++active_instances;
+  }
+
+  ~RefCounted() {
+    --active_instances;
+  }
};
+FOLLY_TLS int RefCounted::active_instances;
+
void intrusive_ptr_add_ref(RefCounted const* p) {
p->rc++;
}
void intrusive_ptr_release(RefCounted const* p) {
- if (--(p->rc)) {
+ if (--(p->rc) == 0) {
delete p;
}
}
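
// boost::intrusive_ptr finds intrusive_ptr_add_ref / intrusive_ptr_release
// via argument-dependent lookup.  The `== 0` fix above matters: the old
// predicate deleted the object while references were still outstanding and
// leaked it once the count actually hit zero.  Roughly:
//
//   boost::intrusive_ptr<RefCounted> a(new RefCounted);  // rc == 1
//   auto b = a;                                          // rc == 2
//   // destroying b drops rc to 1; destroying a drops it to 0 and deletes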
runElementTypeTest(std::make_shared<char>('a'));
runElementTypeTest(folly::make_unique<char>('a'));
runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
+ EXPECT_EQ(RefCounted::active_instances, 0);
}
TEST(MPMCQueue, single_thread_enqdeq) {
TEST(MPMCQueue, tryenq_capacity_test) {
for (size_t cap = 1; cap < 100; ++cap) {
MPMCQueue<int> cq(cap);
- for (int i = 0; i < cap; ++i) {
+    for (size_t i = 0; i < cap; ++i) {
EXPECT_TRUE(cq.write(i));
}
EXPECT_FALSE(cq.write(100));
}
}
+template <template<typename> class Atom>
+void runTryEnqDeqThread(
+    int numThreads,
+    int n, /*numOps*/
+    MPMCQueue<int, Atom>& cq,
+    std::atomic<uint64_t>& sum,
+    int t) {
+  uint64_t threadSum = 0;
+  int src = t;
+  // received doesn't hold any actual queue values; we just start at
+  // t and increment by numThreads so the termination condition rounds
+  // correctly when numThreads doesn't evenly divide numOps
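+  // (worked example: with numThreads == 3 and n == 10, thread 1's
+  // received takes the values 1, 4, 7, 10, i.e. exactly 3 reads, and
+  // the three threads together perform 4 + 3 + 3 == 10 == n reads)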
+  int received = t;
+  while (src < n || received < n) {
+    if (src < n && cq.write(src)) {
+      src += numThreads;
+    }
+
+    int dst;
+    if (received < n && cq.read(dst)) {
+      received += numThreads;
+      threadSum += dst;
+    }
+  }
+  sum += threadSum;
+}
+
template <template<typename> class Atom>
void runTryEnqDeqTest(int numThreads, int n) {
  // write and read aren't linearizable, so we don't have
  // strong guarantees to check on individual operations; we can still
  // verify in aggregate that every enqueued value is dequeued exactly once
  MPMCQueue<int, Atom> cq(numThreads);

  std::vector<std::thread> threads(numThreads);
std::atomic<uint64_t> sum(0);
for (int t = 0; t < numThreads; ++t) {
- threads[t] = DSched::thread([&,t]{
- uint64_t threadSum = 0;
- int src = t;
- // received doesn't reflect any actual values, we just start with
- // t and increment by numThreads to get the rounding of termination
- // correct if numThreads doesn't evenly divide numOps
- int received = t;
- while (src < n || received < n) {
- if (src < n && cq.write(src)) {
- src += numThreads;
- }
-
- int dst;
- if (received < n && cq.read(dst)) {
- received += numThreads;
- threadSum += dst;
- }
- }
- sum += threadSum;
- });
+    threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
+        numThreads, n, std::ref(cq), std::ref(sum), t));
}
  for (auto& t : threads) {
    DSched::join(t);
  }
  EXPECT_EQ(uint64_t(n) * (n - 1) / 2, sum);
}
+TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
+ int nts[] = { 1, 3, 100 };
+
+ int n = 100000;
+ for (int nt : nts) {
+ runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
+ }
+}
+
TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
  int nts[] = { 3, 10 };

  int n = 1000;
  DSched sched(DSched::uniform(0));
  for (int nt : nts) {
    runTryEnqDeqTest<DeterministicAtomic>(nt, n);
  }
}

TEST(MPMCQueue, mt_prod_cons_deterministic) {
  DSched sched(DSched::uniform(0));
  producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
      "", 10, 10, 1000);
}
-#define PC_BENCH(q, np, nc, nops...) \
- producerConsumerBench(q, #q, (np), (nc), nops)
+#define PC_BENCH(q, np, nc, ...) \
+ producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
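+// (standard C++11 __VA_ARGS__ instead of the GNU-only "nops..." named
+// variadic extension; e.g. PC_BENCH(q, 1, 1, n) still expands to
+// producerConsumerBench(q, "q", (1), (1), n))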
TEST(MPMCQueue, mt_prod_cons) {
int n = 100000;
LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n);
}
+TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
+ int n = 100000;
+ LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 1, 1, n);
+ LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 10, 1, n);
+ LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 1, 10, n);
+ LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 10, 10, n);
+ LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 1, 1, n);
+ LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 10, 1, n);
+ LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 1, 10, n);
+ LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 10, 10, n);
+ LOG(INFO)
+ << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(100000)), 32, 100, n);
+}
+
+template <template<typename> class Atom>
+void runNeverFailThread(
+    int numThreads,
+    int n, /*numOps*/
+    MPMCQueue<int, Atom>& cq,
+    std::atomic<uint64_t>& sum,
+    int t) {
+  uint64_t threadSum = 0;
+  for (int i = t; i < n; i += numThreads) {
+    // enq + deq
+    EXPECT_TRUE(cq.writeIfNotFull(i));
+
+    int dest = -1;
+    EXPECT_TRUE(cq.readIfNotEmpty(dest));
+    EXPECT_TRUE(dest >= 0);
+    threadSum += dest;
+  }
+  sum += threadSum;
+}
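+
+// (each thread above enqueues before it dequeues, so at most numThreads
+// elements are outstanding at any instant; since runNeverFailTest sizes
+// the queue at numThreads, writeIfNotFull/readIfNotEmpty should never
+// fail, which is exactly what the EXPECT_TRUEs assert)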
+
template <template<typename> class Atom>
uint64_t runNeverFailTest(int numThreads, int n) {
  // always #enq >= #deq
  MPMCQueue<int, Atom> cq(numThreads);

  uint64_t beginMicro = nowMicro();

  std::vector<std::thread> threads(numThreads);
  std::atomic<uint64_t> sum(0);
for (int t = 0; t < numThreads; ++t) {
- threads[t] = DSched::thread([&,t]{
- uint64_t threadSum = 0;
- for (int i = t; i < n; i += numThreads) {
- // enq + deq
- EXPECT_TRUE(cq.writeIfNotFull(i));
-
- int dest = -1;
- EXPECT_TRUE(cq.readIfNotEmpty(dest));
- EXPECT_TRUE(dest >= 0);
- threadSum += dest;
- }
- sum += threadSum;
- });
+    threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>,
+        numThreads, n, std::ref(cq), std::ref(sum), t));
}
  for (auto& t : threads) {
    DSched::join(t);
  }
  EXPECT_TRUE(cq.isEmpty());
  EXPECT_EQ(uint64_t(n) * (n - 1) / 2, sum);

  return nowMicro() - beginMicro;
}
+TEST(MPMCQueue, mt_never_fail_emulated_futex) {
+ int nts[] = { 1, 3, 100 };
+
+ int n = 100000;
+ for (int nt : nts) {
+ uint64_t elapsed = runNeverFailTest<EmulatedFutexAtomic>(nt, n);
+ LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with "
+ << nt << " threads";
+ }
+}
+
TEST(MPMCQueue, mt_never_fail_deterministic) {
int nts[] = { 3, 10 };
MAX_LIFECYCLE_EVENT
};
-static __thread int lc_counts[MAX_LIFECYCLE_EVENT];
-static __thread int lc_prev[MAX_LIFECYCLE_EVENT];
+static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
+static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
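+// (FOLLY_TLS expands to the compiler's native thread-local keyword, e.g.
+// __thread on gcc/clang and __declspec(thread) on MSVC)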
static int lc_outstanding() {
  return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
      lc_counts[MOVE_CONSTRUCTOR] - lc_counts[DESTRUCTOR];
}
}
-#define LIFECYCLE_STEP(args...) lc_step(__LINE__, args)
+#define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)
static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
int main(int argc, char ** argv) {
testing::InitGoogleTest(&argc, argv);
- google::ParseCommandLineFlags(&argc, &argv, true);
+ gflags::ParseCommandLineFlags(&argc, &argv, true);
return RUN_ALL_TESTS();
}
-