using namespace folly;
using namespace detail;
using namespace test;
+using std::chrono::time_point;
+using std::chrono::steady_clock;
+using std::chrono::seconds;
+using std::chrono::milliseconds;
+using std::string;
+using std::make_unique;
+using std::unique_ptr;
+using std::vector;
typedef DeterministicSchedule DSched;
Atom<uint32_t> spinThreshold(0);
int prev = -1;
- std::vector<std::thread> threads(numThreads);
+ vector<std::thread> threads(numThreads);
for (int i = 0; i < numThreads; ++i) {
threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
cq.blockingRead(dest);
EXPECT_TRUE(cq.write(std::move(dest)));
EXPECT_TRUE(cq.read(dest));
+ auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
+ EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
+ EXPECT_TRUE(cq.read(dest));
+ auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
+ EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
+ EXPECT_TRUE(cq.read(dest));
}
struct RefCounted {
TEST(MPMCQueue, lots_of_element_types) {
runElementTypeTest(10);
- runElementTypeTest(std::string("abc"));
- runElementTypeTest(std::make_pair(10, std::string("def")));
- runElementTypeTest(std::vector<std::string>{ { "abc" } });
+ runElementTypeTest(string("abc"));
+ runElementTypeTest(std::make_pair(10, string("def")));
+ runElementTypeTest(vector<string>{{"abc"}});
runElementTypeTest(std::make_shared<char>('a'));
runElementTypeTest(folly::make_unique<char>('a'));
runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
MPMCQueue<int,Atom> cq(numThreads);
uint64_t n = numOps;
- std::vector<std::thread> threads(numThreads);
+ vector<std::thread> threads(numThreads);
std::atomic<uint64_t> sum(0);
for (int t = 0; t < numThreads; ++t) {
threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
}
template <typename Q>
// Abstract strategy for invoking one of the queue's write methods.
// The benchmarks hold a polymorphic collection of these so the same
// driver loop can measure blockingWrite, write, writeIfNotFull and
// tryWriteUntil.
struct WriteMethodCaller {
  WriteMethodCaller() = default;  // trivial ctor; `= default` over empty body
  virtual ~WriteMethodCaller() = default;
  /// Attempts to enqueue i into q; returns true iff the element was written.
  virtual bool callWrite(Q& q, int i) = 0;
  /// Human-readable name of the write method, used in benchmark log lines.
  virtual string methodName() = 0;
};
+
+template <typename Q>
+struct BlockingWriteCaller : public WriteMethodCaller<Q> {
+ bool callWrite(Q& q, int i) override {
+ q.blockingWrite(i);
+ return true;
+ }
+ string methodName() override { return "blockingWrite"; }
+};
+
+template <typename Q>
+struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
+ bool callWrite(Q& q, int i) override { return q.writeIfNotFull(i); }
+ string methodName() override { return "writeIfNotFull"; }
+};
+
+template <typename Q>
+struct WriteCaller : public WriteMethodCaller<Q> {
+ bool callWrite(Q& q, int i) override { return q.write(i); }
+ string methodName() override { return "write"; }
+};
+
+template <typename Q,
+ class Clock = steady_clock,
+ class Duration = typename Clock::duration>
+struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
+ const Duration duration_;
+ explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {}
+ bool callWrite(Q& q, int i) override {
+ auto then = Clock::now() + duration_;
+ return q.tryWriteUntil(then, i);
+ }
+ string methodName() override {
+ return folly::sformat(
+ "tryWriteUntil({}ms)",
+ std::chrono::duration_cast<milliseconds>(duration_).count());
+ }
+};
+
+template <typename Q>
+string producerConsumerBench(Q&& queue,
+ string qName,
+ int numProducers,
+ int numConsumers,
+ int numOps,
+ WriteMethodCaller<Q>& writer,
+ bool ignoreContents = false) {
Q& q = queue;
struct rusage beginUsage;
uint64_t n = numOps;
std::atomic<uint64_t> sum(0);
+ std::atomic<uint64_t> failed(0);
- std::vector<std::thread> producers(numProducers);
+ vector<std::thread> producers(numProducers);
for (int t = 0; t < numProducers; ++t) {
producers[t] = DSched::thread([&,t]{
for (int i = t; i < numOps; i += numProducers) {
- q.blockingWrite(i);
+ while (!writer.callWrite(q, i)) {
+ ++failed;
+ }
}
});
}
- std::vector<std::thread> consumers(numConsumers);
+ vector<std::thread> consumers(numConsumers);
for (int t = 0; t < numConsumers; ++t) {
consumers[t] = DSched::thread([&,t]{
uint64_t localSum = 0;
uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
(beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
+ uint64_t failures = failed;
- return folly::format(
- "{}, {} producers, {} consumers => {} nanos/handoff, {} csw / {} handoff",
- qName, numProducers, numConsumers, nanosPer, csw, n).str();
+ return folly::sformat(
+ "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
+ "handoff, {} failures",
+ qName,
+ numProducers,
+ writer.methodName(),
+ numConsumers,
+ nanosPer,
+ csw,
+ n,
+ failures);
}
-
TEST(MPMCQueue, mt_prod_cons_deterministic) {
  // we use the Bench method, but perf results are meaningless under DSched
  DSched sched(DSched::uniform(0));

  using QT = MPMCQueue<int, DeterministicAtomic>;

  // One caller per write method so every API is exercised under DSched.
  vector<unique_ptr<WriteMethodCaller<QT>>> callers;
  callers.emplace_back(make_unique<BlockingWriteCaller<QT>>());
  callers.emplace_back(make_unique<WriteIfNotFullCaller<QT>>());
  callers.emplace_back(make_unique<WriteCaller<QT>>());
  callers.emplace_back(make_unique<TryWriteUntilCaller<QT>>(milliseconds(1)));
  callers.emplace_back(make_unique<TryWriteUntilCaller<QT>>(seconds(2)));

  for (const auto& caller : callers) {
    // NOTE: the (10, 1x1) and (100, 10x10) configurations are run twice,
    // matching the original sequence of benchmark invocations.
    LOG(INFO) << producerConsumerBench(
        QT(10), "MPMCQueue<int, DeterministicAtomic>(10)", 1, 1, 1000, *caller);
    LOG(INFO) << producerConsumerBench(
        QT(100),
        "MPMCQueue<int, DeterministicAtomic>(100)",
        10,
        10,
        1000,
        *caller);
    LOG(INFO) << producerConsumerBench(
        QT(10), "MPMCQueue<int, DeterministicAtomic>(10)", 1, 1, 1000, *caller);
    LOG(INFO) << producerConsumerBench(
        QT(100),
        "MPMCQueue<int, DeterministicAtomic>(100)",
        10,
        10,
        1000,
        *caller);
    LOG(INFO) << producerConsumerBench(
        QT(1), "MPMCQueue<int, DeterministicAtomic>(1)", 10, 10, 1000, *caller);
  }
}
#define PC_BENCH(q, np, nc, ...) \
TEST(MPMCQueue, mt_prod_cons) {
  int n = 100000;

  // Build one caller per write method; the PC_BENCH queue expressions are
  // kept verbatim because the macro may stringize its first argument.
  using QT = MPMCQueue<int>;
  vector<unique_ptr<WriteMethodCaller<QT>>> callers;
  callers.push_back(make_unique<BlockingWriteCaller<QT>>());
  callers.push_back(make_unique<WriteIfNotFullCaller<QT>>());
  callers.push_back(make_unique<WriteCaller<QT>>());
  callers.push_back(make_unique<TryWriteUntilCaller<QT>>(milliseconds(1)));
  callers.push_back(make_unique<TryWriteUntilCaller<QT>>(seconds(2)));

  for (const auto& caller : callers) {
    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n, *caller);
    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n, *caller);
    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n, *caller);
    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n, *caller);
    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n, *caller);
    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n, *caller);
    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n, *caller);
    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n, *caller);
    LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n, *caller);
  }
}
TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
  int n = 100000;

  // Same matrix as mt_prod_cons, but on emulated-futex atomics. The
  // PC_BENCH queue expressions are kept verbatim because the macro may
  // stringize its first argument.
  using EFQ = MPMCQueue<int, EmulatedFutexAtomic>;
  vector<unique_ptr<WriteMethodCaller<EFQ>>> callers;
  callers.push_back(make_unique<BlockingWriteCaller<EFQ>>());
  callers.push_back(make_unique<WriteIfNotFullCaller<EFQ>>());
  callers.push_back(make_unique<WriteCaller<EFQ>>());
  callers.push_back(make_unique<TryWriteUntilCaller<EFQ>>(milliseconds(1)));
  callers.push_back(make_unique<TryWriteUntilCaller<EFQ>>(seconds(2)));

  for (const auto& caller : callers) {
    LOG(INFO) << PC_BENCH(
        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 1, n, *caller);
    LOG(INFO) << PC_BENCH(
        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 1, n, *caller);
    LOG(INFO) << PC_BENCH(
        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 10, n, *caller);
    LOG(INFO) << PC_BENCH(
        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 10, n, *caller);
    LOG(INFO) << PC_BENCH(
        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 1, n, *caller);
    LOG(INFO) << PC_BENCH(
        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 1, n, *caller);
    LOG(INFO) << PC_BENCH(
        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 10, n, *caller);
    LOG(INFO) << PC_BENCH(
        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 10, n, *caller);
    LOG(INFO) << PC_BENCH(
        (MPMCQueue<int, EmulatedFutexAtomic>(100000)), 32, 100, n, *caller);
  }
}
+
+template <template <typename> class Atom>
+void runNeverFailThread(int numThreads,
+ int n, /*numOps*/
+ MPMCQueue<int, Atom>& cq,
+ std::atomic<uint64_t>& sum,
+ int t) {
uint64_t threadSum = 0;
for (int i = t; i < n; i += numThreads) {
// enq + deq
sum += threadSum;
}
-template <template<typename> class Atom>
+template <template <typename> class Atom>
uint64_t runNeverFailTest(int numThreads, int numOps) {
// always #enq >= #deq
- MPMCQueue<int,Atom> cq(numThreads);
+ MPMCQueue<int, Atom> cq(numThreads);
uint64_t n = numOps;
auto beginMicro = nowMicro();
- std::vector<std::thread> threads(numThreads);
+ vector<std::thread> threads(numThreads);
std::atomic<uint64_t> sum(0);
for (int t = 0; t < numThreads; ++t) {
threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>,
- numThreads, n, std::ref(cq), std::ref(sum), t));
+ numThreads,
+ n,
+ std::ref(cq),
+ std::ref(sum),
+ t));
}
for (auto& t : threads) {
DSched::join(t);
}
TEST(MPMCQueue, mt_never_fail) {
  // Run the never-fail enq/deq workload at several thread counts and log
  // the per-operation latency.
  int n = 100000;
  for (int nt : {1, 3, 100}) {
    uint64_t micros = runNeverFailTest<std::atomic>(nt, n);
    LOG(INFO) << (micros * 1000.0) / (n * 2) << " nanos per op with " << nt
              << " threads";
  }
}
TEST(MPMCQueue, mt_never_fail_emulated_futex) {
  // Same workload as mt_never_fail, using the emulated-futex atomics.
  int n = 100000;
  for (int nt : {1, 3, 100}) {
    uint64_t micros = runNeverFailTest<EmulatedFutexAtomic>(nt, n);
    LOG(INFO) << (micros * 1000.0) / (n * 2) << " nanos per op with " << nt
              << " threads";
  }
}
TEST(MPMCQueue, mt_never_fail_deterministic) {
- int nts[] = { 3, 10 };
+ int nts[] = {3, 10};
long seed = 0; // nowMicro() % 10000;
LOG(INFO) << "using seed " << seed;
}
}
+template <class Clock, template <typename> class Atom>
+void runNeverFailUntilThread(int numThreads,
+ int n, /*numOps*/
+ MPMCQueue<int, Atom>& cq,
+ std::atomic<uint64_t>& sum,
+ int t) {
+ uint64_t threadSum = 0;
+ for (int i = t; i < n; i += numThreads) {
+ // enq + deq
+ auto soon = Clock::now() + std::chrono::seconds(1);
+ EXPECT_TRUE(cq.tryWriteUntil(soon, i));
+
+ int dest = -1;
+ EXPECT_TRUE(cq.readIfNotEmpty(dest));
+ EXPECT_TRUE(dest >= 0);
+ threadSum += dest;
+ }
+ sum += threadSum;
+}
+
+template <class Clock, template <typename> class Atom>
+uint64_t runNeverFailTest(int numThreads, int numOps) {
+ // always #enq >= #deq
+ MPMCQueue<int, Atom> cq(numThreads);
+
+ uint64_t n = numOps;
+ auto beginMicro = nowMicro();
+
+ vector<std::thread> threads(numThreads);
+ std::atomic<uint64_t> sum(0);
+ for (int t = 0; t < numThreads; ++t) {
+ threads[t] = DSched::thread(std::bind(runNeverFailUntilThread<Clock, Atom>,
+ numThreads,
+ n,
+ std::ref(cq),
+ std::ref(sum),
+ t));
+ }
+ for (auto& t : threads) {
+ DSched::join(t);
+ }
+ EXPECT_TRUE(cq.isEmpty());
+ EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
+
+ return nowMicro() - beginMicro;
+}
+
+TEST(MPMCQueue, mt_never_fail_until_system) {
+ int nts[] = {1, 3, 100};
+
+ int n = 100000;
+ for (int nt : nts) {
+ uint64_t elapsed =
+ runNeverFailTest<std::chrono::system_clock, std::atomic>(nt, n);
+ LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
+ << " threads";
+ }
+}
+
+TEST(MPMCQueue, mt_never_fail_until_steady) {
+ int nts[] = {1, 3, 100};
+
+ int n = 100000;
+ for (int nt : nts) {
+ uint64_t elapsed =
+ runNeverFailTest<std::chrono::steady_clock, std::atomic>(nt, n);
+ LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
+ << " threads";
+ }
+}
+
enum LifecycleEvent {
NOTHING = -1,
DEFAULT_CONSTRUCTOR,