2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/Format.h>
18 #include <folly/MPMCQueue.h>
19 #include <folly/Memory.h>
20 #include <folly/portability/GTest.h>
21 #include <folly/portability/SysResource.h>
22 #include <folly/portability/SysTime.h>
23 #include <folly/portability/Unistd.h>
24 #include <folly/stop_watch.h>
25 #include <folly/test/DeterministicSchedule.h>
27 #include <boost/intrusive_ptr.hpp>
28 #include <boost/thread/barrier.hpp>
// NOTE(review): this listing appears to be a damaged extraction of a folly
// MPMCQueue test file -- every line carries a stray embedded line number and
// many intervening lines are missing (the embedded numbering jumps). Confirm
// all blocks below against the upstream source before building.
// Aliases and compatibility declarations shared by all tests in this file.
34 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
36 using namespace folly;
37 using namespace detail;
39 using std::chrono::time_point;
40 using std::chrono::steady_clock;
41 using std::chrono::seconds;
42 using std::chrono::milliseconds;
44 using std::unique_ptr;
// Shorthand used throughout: DSched::thread/join wrap std::thread so the
// DeterministicSchedule tests can control interleavings.
47 typedef DeterministicSchedule DSched;
// Worker for the TurnSequencer tests: each thread claims turns
// init+i, init+i+numThreads, ... and checks that turns complete strictly
// in order (EXPECT_EQ(prev, op - 1)).
// NOTE(review): several lines are missing from this extraction -- the
// parameter list (i, numThreads, numOps, init, prev) and the closing
// braces are not visible. TODO confirm against upstream.
49 template <template<typename> class Atom>
50 void run_mt_sequencer_thread(
54     TurnSequencer<Atom>& seq,
55     Atom<uint32_t>& spinThreshold,
// (op % 32) == 0 periodically exercises the futex-wait path instead of
// pure spinning.
58   for (int op = i; op < numOps; op += numThreads) {
59     seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
60     EXPECT_EQ(prev, op - 1);
62     seq.completeTurn(init + op);
// Harness: spawns numThreads workers over one shared TurnSequencer and
// verifies that after joining, the last completed turn is numOps - 1.
// NOTE(review): the declaration of `prev`, the thread-index argument to
// std::bind, and the join body are on lines dropped by this extraction.
66 template <template<typename> class Atom>
67 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
68   TurnSequencer<Atom> seq(init);
69   Atom<uint32_t> spinThreshold(0);
72   vector<std::thread> threads(numThreads);
73   for (int i = 0; i < numThreads; ++i) {
74     threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
75         numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
79   for (auto& thr : threads) {
83   EXPECT_EQ(prev, numOps - 1);
// TurnSequencer correctness under three atomics backends: plain
// std::atomic, EmulatedFutexAtomic, and DeterministicAtomic (the last
// under a fixed-seed deterministic schedule). Negative/offset `init`
// values exercise wrap-around of the 32-bit turn counter.
86 TEST(MPMCQueue, sequencer) {
87   run_mt_sequencer_test<std::atomic>(1, 100, 0);
88   run_mt_sequencer_test<std::atomic>(2, 100000, -100);
89   run_mt_sequencer_test<std::atomic>(100, 10000, -100);
92 TEST(MPMCQueue, sequencer_emulated_futex) {
93   run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
94   run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
95   run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
98 TEST(MPMCQueue, sequencer_deterministic) {
99   DSched sched(DSched::uniform(0));
100   run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
101   run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
102   run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
// Round-trips a single value of type T through an MPMCQueue via every
// write API: blockingWrite, write, and tryWriteUntil with both
// system_clock and steady_clock deadlines. Used below with many element
// types (ints, strings, smart pointers, ...).
// NOTE(review): the declaration of `dest` (between blockingWrite and
// blockingRead) is missing from this extraction.
105 template <bool Dynamic = false, typename T>
106 void runElementTypeTest(T&& src) {
107   MPMCQueue<T, std::atomic, Dynamic> cq(10);
108   cq.blockingWrite(std::forward<T>(src));
110   cq.blockingRead(dest);
111   EXPECT_TRUE(cq.write(std::move(dest)));
112   EXPECT_TRUE(cq.read(dest));
113   auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
114   EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
115   EXPECT_TRUE(cq.read(dest));
116   auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
117   EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
118   EXPECT_TRUE(cq.read(dest));
// Intrusively ref-counted element type; active_instances is thread-local
// so tests can assert no instances leak after a queue drains.
// NOTE(review): the struct's opening line, its destructor, and the bodies
// of the intrusive_ptr hooks are on lines dropped by this extraction --
// confirm against upstream.
122   static FOLLY_TLS int active_instances;
124   mutable std::atomic<int> rc;
126   RefCounted() : rc(0) {
134 FOLLY_TLS int RefCounted::active_instances;
136 void intrusive_ptr_add_ref(RefCounted const* p) {
140 void intrusive_ptr_release(RefCounted const* p) {
// Deleting on the 0 transition relies on rc being mutable so release can
// be called through a const pointer.
141   if (--(p->rc) == 0) {
// Element-type coverage for both the fixed-capacity and dynamic queue:
// trivially copyable, movable-only (unique_ptr), refcounted, and
// container types. The final EXPECT checks the RefCounted instances were
// all destroyed, i.e. the queue did not leak elements.
146 TEST(MPMCQueue, lots_of_element_types) {
147   runElementTypeTest(10);
148   runElementTypeTest(string("abc"));
149   runElementTypeTest(std::make_pair(10, string("def")));
150   runElementTypeTest(vector<string>{{"abc"}});
151   runElementTypeTest(std::make_shared<char>('a'));
152   runElementTypeTest(folly::make_unique<char>('a'));
153   runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
154   EXPECT_EQ(RefCounted::active_instances, 0);
157 TEST(MPMCQueue, lots_of_element_types_dynamic) {
158   runElementTypeTest<true>(10);
159   runElementTypeTest<true>(string("abc"));
160   runElementTypeTest<true>(std::make_pair(10, string("def")));
161   runElementTypeTest<true>(vector<string>{{"abc"}});
162   runElementTypeTest<true>(std::make_shared<char>('a'));
163   runElementTypeTest<true>(folly::make_unique<char>('a'));
164   runElementTypeTest<true>(boost::intrusive_ptr<RefCounted>(new RefCounted));
165   EXPECT_EQ(RefCounted::active_instances, 0);
// Single-threaded fill/drain: writes to capacity, confirms further writes
// fail and size/isEmpty report correctly, then drains via read and
// blockingRead. Dynamic queues are excluded because their capacity can
// transiently exceed the configured bound (per the comment below).
168 TEST(MPMCQueue, single_thread_enqdeq) {
169   // Non-dynamic version only.
170   // False positive for dynamic version. Capacity can be temporarily
171   // higher than specified.
172   MPMCQueue<int> cq(10);
174   for (int pass = 0; pass < 10; ++pass) {
175     for (int i = 0; i < 10; ++i) {
176       EXPECT_TRUE(cq.write(i));
178     EXPECT_FALSE(cq.write(-1));
179     EXPECT_FALSE(cq.isEmpty());
180     EXPECT_EQ(cq.size(), 10);
182     for (int i = 0; i < 5; ++i) {
184       EXPECT_TRUE(cq.read(dest));
187     for (int i = 5; i < 10; ++i) {
189       cq.blockingRead(dest);
193     EXPECT_FALSE(cq.read(dest));
196     EXPECT_TRUE(cq.isEmpty());
197     EXPECT_EQ(cq.size(), 0);
// Capacity enforcement for the non-dynamic queue: write() fails once cap
// items are enqueued (for cap = 1..99), and blockingWrite on a full queue
// blocks until a reader drains an element.
// NOTE(review): several lines (inner write loop body, thread join,
// closing braces) are missing from this extraction.
201 TEST(MPMCQueue, tryenq_capacity_test) {
202   // Non-dynamic version only.
203   // False positive for dynamic version. Capacity can be temporarily
204   // higher than specified.
205   for (size_t cap = 1; cap < 100; ++cap) {
206     MPMCQueue<int> cq(cap);
207     for (size_t i = 0; i < cap; ++i) {
208       EXPECT_TRUE(cq.write(i));
210     EXPECT_FALSE(cq.write(100));
214 TEST(MPMCQueue, enq_capacity_test) {
215   // Non-dynamic version only.
216   // False positive for dynamic version. Capacity can be temporarily
217   // higher than specified.
218   for (auto cap : { 1, 100, 10000 }) {
219     MPMCQueue<int> cq(cap);
220     for (int i = 0; i < cap; ++i) {
// Writer blocks on the full queue until the read below makes room.
225     auto thr = std::thread([&]{
226       cq.blockingWrite(100);
232     cq.blockingRead(dummy);
// Worker for the non-blocking write()/read() stress test: thread t
// produces values t, t+numThreads, ... and consumes until `received`
// reaches n, accumulating what it reads into the shared sum.
// NOTE(review): declarations of t, n, src, received, dst and the
// closing braces are on dropped lines -- confirm against upstream.
238 template <template<typename> class Atom, bool Dynamic = false>
239 void runTryEnqDeqThread(
242     MPMCQueue<int, Atom, Dynamic>& cq,
243     std::atomic<uint64_t>& sum,
245   uint64_t threadSum = 0;
247   // received doesn't reflect any actual values, we just start with
248   // t and increment by numThreads to get the rounding of termination
249   // correct if numThreads doesn't evenly divide numOps
251   while (src < n || received < n) {
252     if (src < n && cq.write(src)) {
257     if (received < n && cq.read(dst)) {
258       received += numThreads;
// Aggregate-correctness check for write()/read(): since those calls are
// not individually linearizable, the test only asserts that after all
// threads join the queue is empty and the values read sum to
// 0 + 1 + ... + n-1 (the arithmetic-series identity on the last line).
265 template <template<typename> class Atom, bool Dynamic = false>
266 void runTryEnqDeqTest(int numThreads, int numOps) {
267   // write and read aren't linearizable, so we don't have
268   // hard guarantees on their individual behavior. We can still test
269   // correctness in aggregate
270   MPMCQueue<int,Atom, Dynamic> cq(numThreads);
273   vector<std::thread> threads(numThreads);
274   std::atomic<uint64_t> sum(0);
275   for (int t = 0; t < numThreads; ++t) {
276     threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom, Dynamic>,
277         numThreads, n, std::ref(cq), std::ref(sum), t));
279   for (auto& t : threads) {
282   EXPECT_TRUE(cq.isEmpty());
283   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
// runTryEnqDeqTest over each atomics backend and both queue variants; the
// deterministic variants run under both uniform and uniformSubset
// schedules to explore different interleavings.
// NOTE(review): the loops iterating over nts/seed setup are on dropped
// lines in each TEST body.
286 TEST(MPMCQueue, mt_try_enq_deq) {
287   int nts[] = { 1, 3, 100 };
291     runTryEnqDeqTest<std::atomic>(nt, n);
295 TEST(MPMCQueue, mt_try_enq_deq_dynamic) {
296   int nts[] = { 1, 3, 100 };
300     runTryEnqDeqTest<std::atomic, /* Dynamic = */ true>(nt, n);
304 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
305   int nts[] = { 1, 3, 100 };
309     runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
313 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex_dynamic) {
314   int nts[] = { 1, 3, 100 };
318     runTryEnqDeqTest<EmulatedFutexAtomic, /* Dynamic = */ true>(nt, n);
322 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
323   int nts[] = { 3, 10 };
326   LOG(INFO) << "using seed " << seed;
331       DSched sched(DSched::uniform(seed));
332       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
335       DSched sched(DSched::uniformSubset(seed, 2));
336       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
339       DSched sched(DSched::uniform(seed));
340       runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
343       DSched sched(DSched::uniformSubset(seed, 2));
344       runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
// Wall-clock helper for the benchmark-style tests below: microseconds
// since the epoch via gettimeofday.
349 uint64_t nowMicro() {
351   gettimeofday(&tv, 0);
352   return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
// Polymorphic adapter so producerConsumerBench can be driven with any of
// the queue's write APIs (blockingWrite, write, writeIfNotFull,
// tryWriteUntil); methodName() labels benchmark output.
355 template <typename Q>
356 struct WriteMethodCaller {
357   WriteMethodCaller() {}
358   virtual ~WriteMethodCaller() = default;
359   virtual bool callWrite(Q& q, int i) = 0;
360   virtual string methodName() = 0;
// Concrete WriteMethodCaller implementations, one per write API.
// BlockingWriteCaller always succeeds (blocks until space is available).
363 template <typename Q>
364 struct BlockingWriteCaller : public WriteMethodCaller<Q> {
365   bool callWrite(Q& q, int i) override {
369   string methodName() override { return "blockingWrite"; }
372 template <typename Q>
373 struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
374   bool callWrite(Q& q, int i) override { return q.writeIfNotFull(i); }
375   string methodName() override { return "writeIfNotFull"; }
378 template <typename Q>
379 struct WriteCaller : public WriteMethodCaller<Q> {
380   bool callWrite(Q& q, int i) override { return q.write(i); }
381   string methodName() override { return "write"; }
// tryWriteUntil with a per-call deadline of Clock::now() + duration_;
// methodName embeds the timeout so benchmark lines are distinguishable.
384 template <typename Q,
385           class Clock = steady_clock,
386           class Duration = typename Clock::duration>
387 struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
388   const Duration duration_;
389   explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {}
390   bool callWrite(Q& q, int i) override {
391     auto then = Clock::now() + duration_;
392     return q.tryWriteUntil(then, i);
394   string methodName() override {
395     return folly::sformat(
396         "tryWriteUntil({}ms)",
397         std::chrono::duration_cast<milliseconds>(duration_).count());
// Core benchmark/stress harness: numProducers threads write 0..numOps-1
// through the supplied WriteMethodCaller while numConsumers threads
// blockingRead and accumulate a checksum; verifies the arithmetic-series
// sum (unless ignoreContents) and reports nanos/handoff, context
// switches (via getrusage), failed write attempts, and allocated
// capacity as a formatted string.
// NOTE(review): several lines (the queue alias `q`, the `n` local,
// retry accounting, join bodies, and the sformat argument list) are
// missing from this extraction.
401 template <typename Q>
402 string producerConsumerBench(Q&& queue,
407                              WriteMethodCaller<Q>& writer,
408                              bool ignoreContents = false) {
411   struct rusage beginUsage;
412   getrusage(RUSAGE_SELF, &beginUsage);
414   auto beginMicro = nowMicro();
417   std::atomic<uint64_t> sum(0);
418   std::atomic<uint64_t> failed(0);
420   vector<std::thread> producers(numProducers);
421   for (int t = 0; t < numProducers; ++t) {
422     producers[t] = DSched::thread([&,t]{
423       for (int i = t; i < numOps; i += numProducers) {
// Spin on the chosen write method until it succeeds (failures counted).
424         while (!writer.callWrite(q, i)) {
431   vector<std::thread> consumers(numConsumers);
432   for (int t = 0; t < numConsumers; ++t) {
433     consumers[t] = DSched::thread([&,t]{
434       uint64_t localSum = 0;
435       for (int i = t; i < numOps; i += numConsumers) {
437         q.blockingRead(dest);
438         EXPECT_FALSE(dest == -1);
445   for (auto& t : producers) {
448   for (auto& t : consumers) {
451   if (!ignoreContents) {
452     EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
455   auto endMicro = nowMicro();
457   struct rusage endUsage;
458   getrusage(RUSAGE_SELF, &endUsage);
460   uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
// Voluntary + involuntary context switches over the benchmark window.
461   long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
462       (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
463   uint64_t failures = failed;
464   size_t allocated = q.allocatedCapacity();
466   return folly::sformat(
467       "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
468       "handoff, {} failures, {} allocated",
// Runs producerConsumerBench under a deterministic schedule with every
// write-method caller and several producer/consumer/capacity combos; the
// perf numbers are meaningless under DSched (as noted below) -- the point
// is deterministic correctness coverage.
// NOTE(review): the per-call producer/consumer/op arguments following
// each producerConsumerBench invocation are on dropped lines.
480 template <bool Dynamic = false>
481 void runMtProdConsDeterministic(long seed) {
482   // we use the Bench method, but perf results are meaningless under DSched
483   DSched sched(DSched::uniform(seed));
485   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
487   callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
488                        DeterministicAtomic, Dynamic>>>());
489   callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
490                        DeterministicAtomic, Dynamic>>>());
491   callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
492                        DeterministicAtomic, Dynamic>>>());
493   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
494                        DeterministicAtomic, Dynamic>>>(milliseconds(1)));
495   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
496                        DeterministicAtomic, Dynamic>>>(seconds(2)));
499   for (const auto& caller : callers) {
502         producerConsumerBench(
503             MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
504             "MPMCQueue<int, DeterministicAtomic, Dynamic>("
505               + folly::to<std::string>(cap)+")",
512         producerConsumerBench(
513             MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
514             "MPMCQueue<int, DeterministicAtomic, Dynamic>("
515               + folly::to<std::string>(cap)+")",
522         producerConsumerBench(
523             MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
524             "MPMCQueue<int, DeterministicAtomic, Dynamic>("
525               + folly::to<std::string>(cap)+")",
532         producerConsumerBench(
533             MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
534             "MPMCQueue<int, DeterministicAtomic, Dynamic>("
535               + folly::to<std::string>(cap)+")",
542         producerConsumerBench(
543             MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
544             "MPMCQueue<int, DeterministicAtomic, Dynamic>("
545               + folly::to<std::string>(cap)+")",
// Like runMtProdConsDeterministic but always uses the dynamic queue and
// forwards explicit (cap, minCap, mult) construction arguments so the
// env-driven test below can exercise specific dynamic-growth settings.
// NOTE(review): the parameter list (seed, producers, consumers, ops,
// cap, minCap, mult) is on dropped lines.
553 void runMtProdConsDeterministicDynamic(
562   // we use the Bench method, but perf results are meaningless under DSched
563   DSched sched(DSched::uniform(seed));
565   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
567   callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
568                        DeterministicAtomic, true>>>());
569   callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
570                        DeterministicAtomic, true>>>());
571   callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
572                        DeterministicAtomic, true>>>());
573   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
574                        DeterministicAtomic, true>>>(milliseconds(1)));
575   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
576                        DeterministicAtomic, true>>>(seconds(2)));
578   for (const auto& caller : callers) {
580       producerConsumerBench(
581           MPMCQueue<int, DeterministicAtomic, true>(cap, minCap, mult),
582           "MPMCQueue<int, DeterministicAtomic, true>("
583             + folly::to<std::string>(cap) + ", "
584             + folly::to<std::string>(minCap) + ", "
585             + folly::to<std::string>(mult)+")",
// Entry-point TESTs for the deterministic prod/cons runs above.
593 TEST(MPMCQueue, mt_prod_cons_deterministic) {
594   runMtProdConsDeterministic(0);
597 TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic) {
598   runMtProdConsDeterministic<true>(0);
// Overrides `var` from the named environment variable when it is set
// (parsed with atoi, so non-numeric values silently become 0).
601 template <typename T>
602 void setFromEnv(T& var, const char* envvar) {
603   char* str = std::getenv(envvar);
604   if (str) { var = atoi(str); }
// Env-parameterized deterministic dynamic-queue run: SEED, PRODS, CONS,
// NUM_OPS, CAP, MIN_CAP, MULT override the defaults.
// NOTE(review): the default declarations for seed/prods/cons/cap/
// minCap/mult are on dropped lines.
607 TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic_with_arguments) {
611   uint32_t numOps = 1000;
615   setFromEnv(seed, "SEED");
616   setFromEnv(prods, "PRODS");
617   setFromEnv(cons, "CONS");
618   setFromEnv(numOps, "NUM_OPS");
619   setFromEnv(cap, "CAP");
620   setFromEnv(minCap, "MIN_CAP");
621   setFromEnv(mult, "MULT");
622   runMtProdConsDeterministicDynamic(
623       seed, prods, cons, numOps, cap, minCap, mult);
// PC_BENCH stringizes the queue expression so benchmark output is
// self-describing; extra args are producers, consumers, ops, caller.
626 #define PC_BENCH(q, np, nc, ...) \
627   producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
// Real (non-deterministic) std::atomic benchmark sweep across queue
// capacities and producer/consumer mixes, for every write method.
// NOTE(review): the default `n`, some emplace_back template arguments,
// and the producer/consumer counts on several PC_BENCH lines are on
// dropped lines.
629 template <bool Dynamic = false>
630 void runMtProdCons() {
632   setFromEnv(n, "NUM_OPS");
633   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, std::atomic, Dynamic>>>>
635   callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
636                        std::atomic, Dynamic>>>());
637   callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
638                        std::atomic, Dynamic>>>());
639   callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int, std::atomic,
641   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
642                        std::atomic, Dynamic>>>(milliseconds(1)));
643   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
644                        std::atomic, Dynamic>>>(seconds(2)));
645   for (const auto& caller : callers) {
646     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
648     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
650     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
652     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
654     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
656     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
658     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
660     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
662     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(100000)),
663                           32, 100, n, *caller);
// Entry-point TESTs for the std::atomic benchmark sweep.
667 TEST(MPMCQueue, mt_prod_cons) {
671 TEST(MPMCQueue, mt_prod_cons_dynamic) {
672   runMtProdCons</* Dynamic = */ true>();
// Same benchmark sweep as runMtProdCons but over EmulatedFutexAtomic,
// which exercises the userspace futex emulation instead of the native
// futex path.
// NOTE(review): the `n` declaration is on a dropped line.
675 template <bool Dynamic = false>
676 void runMtProdConsEmulatedFutex() {
678   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic,
680   callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
681                        EmulatedFutexAtomic, Dynamic>>>());
682   callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
683                        EmulatedFutexAtomic, Dynamic>>>());
684   callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
685                        EmulatedFutexAtomic, Dynamic>>>());
686   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
687                        EmulatedFutexAtomic, Dynamic>>>(milliseconds(1)));
688   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
689                        EmulatedFutexAtomic, Dynamic>>>(seconds(2)));
690   for (const auto& caller : callers) {
691     LOG(INFO) << PC_BENCH(
692       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 1, n, *caller);
693     LOG(INFO) << PC_BENCH(
694       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 1, n, *caller);
695     LOG(INFO) << PC_BENCH(
696       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 10, n, *caller);
697     LOG(INFO) << PC_BENCH(
698       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 10, n, *caller);
699     LOG(INFO) << PC_BENCH(
700       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 1, n, *caller);
701     LOG(INFO) << PC_BENCH(
702       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 10, 1, n, *caller);
703     LOG(INFO) << PC_BENCH(
704       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 10, n, *caller);
705     LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
706                            (10000)), 10, 10, n, *caller);
707     LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
708                            (100000)), 32, 100, n, *caller);
// Entry-point TESTs for the emulated-futex sweep.
712 TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
713   runMtProdConsEmulatedFutex();
716 TEST(MPMCQueue, mt_prod_cons_emulated_futex_dynamic) {
717   runMtProdConsEmulatedFutex</* Dynamic = */ true>();
// "Never fail" invariant: with capacity == numThreads and each thread
// writing before it reads, writeIfNotFull/readIfNotEmpty must always
// succeed. Worker asserts both calls succeed and sums what it reads.
// NOTE(review): the n/t parameters, `dest` declaration, threadSum
// accumulation, and closing braces are on dropped lines.
720 template <template <typename> class Atom, bool Dynamic = false>
721 void runNeverFailThread(int numThreads,
723                         MPMCQueue<int, Atom, Dynamic>& cq,
724                         std::atomic<uint64_t>& sum,
726   uint64_t threadSum = 0;
727   for (int i = t; i < n; i += numThreads) {
729     EXPECT_TRUE(cq.writeIfNotFull(i));
732     EXPECT_TRUE(cq.readIfNotEmpty(dest));
733     EXPECT_TRUE(dest >= 0);
// Harness: spawns the workers, then verifies emptiness and the
// arithmetic-series checksum; returns elapsed wall time in micros so
// callers can log a rough nanos-per-op figure.
739 template <template <typename> class Atom, bool Dynamic = false>
740 uint64_t runNeverFailTest(int numThreads, int numOps) {
741   // always #enq >= #deq
742   MPMCQueue<int, Atom, Dynamic> cq(numThreads);
745   auto beginMicro = nowMicro();
747   vector<std::thread> threads(numThreads);
748   std::atomic<uint64_t> sum(0);
749   for (int t = 0; t < numThreads; ++t) {
750     threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom, Dynamic>,
757   for (auto& t : threads) {
760   EXPECT_TRUE(cq.isEmpty());
761   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
763   return nowMicro() - beginMicro;
// Driver that runs runNeverFailTest for each thread count in nts and
// logs the per-op latency.
766 template <template<typename> class Atom, bool Dynamic = false>
767 void runMtNeverFail(std::vector<int>& nts, int n) {
769     uint64_t elapsed = runNeverFailTest<Atom, Dynamic>(nt, n);
770     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
// Never-fail coverage across atomics backends and both queue variants.
// NOTE(review): each TEST body's `n` declaration is on a dropped line.
775 TEST(MPMCQueue, mt_never_fail) {
776   std::vector<int> nts {1, 3, 100};
778   runMtNeverFail<std::atomic>(nts, n);
781 TEST(MPMCQueue, mt_never_fail_dynamic) {
782   std::vector<int> nts {1, 3, 100};
784   runMtNeverFail<std::atomic, true>(nts, n);
787 TEST(MPMCQueue, mt_never_fail_emulated_futex) {
788   std::vector<int> nts {1, 3, 100};
790   runMtNeverFail<EmulatedFutexAtomic>(nts, n);
793 TEST(MPMCQueue, mt_never_fail_emulated_futex_dynamic) {
794   std::vector<int> nts {1, 3, 100};
796   runMtNeverFail<EmulatedFutexAtomic, true>(nts, n);
// Deterministic-schedule never-fail runs under both uniform and
// uniformSubset schedules.
799 template<bool Dynamic = false>
800 void runMtNeverFailDeterministic(std::vector<int>& nts, int n, long seed) {
801   LOG(INFO) << "using seed " << seed;
804       DSched sched(DSched::uniform(seed));
805       runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
808       DSched sched(DSched::uniformSubset(seed, 2));
809       runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
814 TEST(MPMCQueue, mt_never_fail_deterministic) {
815   std::vector<int> nts {3, 10};
816   long seed = 0; // nowMicro() % 10000;
818   runMtNeverFailDeterministic(nts, n, seed);
821 TEST(MPMCQueue, mt_never_fail_deterministic_dynamic) {
822   std::vector<int> nts {3, 10};
823   long seed = 0; // nowMicro() % 10000;
825   runMtNeverFailDeterministic<true>(nts, n, seed);
// tryWriteUntil variant of the never-fail worker: with a 1-second
// deadline and #enq >= #deq, tryWriteUntil must always succeed; the
// Clock template parameter lets tests cover both system and steady
// clocks.
// NOTE(review): n/t parameters, `dest`, and closing braces are on
// dropped lines.
828 template <class Clock, template <typename> class Atom, bool Dynamic>
829 void runNeverFailUntilThread(int numThreads,
831                              MPMCQueue<int, Atom, Dynamic>& cq,
832                              std::atomic<uint64_t>& sum,
834   uint64_t threadSum = 0;
835   for (int i = t; i < n; i += numThreads) {
837     auto soon = Clock::now() + std::chrono::seconds(1);
838     EXPECT_TRUE(cq.tryWriteUntil(soon, i));
841     EXPECT_TRUE(cq.readIfNotEmpty(dest));
842     EXPECT_TRUE(dest >= 0);
// Clock-templated overload of runNeverFailTest using the until-thread
// worker; same emptiness + checksum verification, returns elapsed micros.
848 template <class Clock, template <typename> class Atom, bool Dynamic = false>
849 uint64_t runNeverFailTest(int numThreads, int numOps) {
850   // always #enq >= #deq
851   MPMCQueue<int, Atom, Dynamic> cq(numThreads);
854   auto beginMicro = nowMicro();
856   vector<std::thread> threads(numThreads);
857   std::atomic<uint64_t> sum(0);
858   for (int t = 0; t < numThreads; ++t) {
859     threads[t] = DSched::thread(std::bind(
860                                   runNeverFailUntilThread<Clock, Atom, Dynamic>,
867   for (auto& t : threads) {
870   EXPECT_TRUE(cq.isEmpty());
871   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
873   return nowMicro() - beginMicro;
// Drivers + TESTs covering tryWriteUntil deadlines on system_clock and
// steady_clock, for fixed and dynamic queues.
// NOTE(review): the per-loop `elapsed` declarations and each TEST's `n`
// are on dropped lines.
876 template <bool Dynamic = false>
877 void runMtNeverFailUntilSystem(std::vector<int>& nts, int n) {
880         runNeverFailTest<std::chrono::system_clock, std::atomic, Dynamic>(nt, n);
881     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
886 TEST(MPMCQueue, mt_never_fail_until_system) {
887   std::vector<int> nts {1, 3, 100};
889   runMtNeverFailUntilSystem(nts, n);
892 TEST(MPMCQueue, mt_never_fail_until_system_dynamic) {
893   std::vector<int> nts {1, 3, 100};
895   runMtNeverFailUntilSystem<true>(nts, n);
898 template <bool Dynamic = false>
899 void runMtNeverFailUntilSteady(std::vector<int>& nts, int n) {
902         runNeverFailTest<std::chrono::steady_clock, std::atomic, Dynamic>(nt, n);
903     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
908 TEST(MPMCQueue, mt_never_fail_until_steady) {
909   std::vector<int> nts {1, 3, 100};
911   runMtNeverFailUntilSteady(nts, n);
914 TEST(MPMCQueue, mt_never_fail_until_steady_dynamic) {
915   std::vector<int> nts {1, 3, 100};
917   runMtNeverFailUntilSteady<true>(nts, n);
// Lifecycle-event instrumentation: thread-local per-event counters plus
// a snapshot (lc_snap) and a delta-assertion (lc_step, via the
// LIFECYCLE_STEP macro which records __LINE__ for diagnostics). Used by
// the perfect-forwarding and queue-moving tests to verify exactly which
// constructors/assignments/destructors the queue invokes.
// NOTE(review): the enumerator list and parts of the Lifecycle struct
// (the `constructed` member, copy/move-assign bodies/returns, dtor
// checks) are on lines dropped by this extraction.
920 enum LifecycleEvent {
932 static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
933 static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
// Net live objects: all construction events minus destructions.
935 static int lc_outstanding() {
936   return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
937       lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
938       lc_counts[DESTRUCTOR];
941 static void lc_snap() {
942   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
943     lc_prev[i] = lc_counts[i];
947 #define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)
// Asserts that since the last lc_snap exactly the events `what` and
// `what2` occurred once each (NOTHING == no event expected).
949 static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
950   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
951     int delta = i == what || i == what2 ? 1 : 0;
952     EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
953         << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
954         << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
955         << ", from line " << lineno;
// Element type that records every special-member invocation; the R
// template parameter is surfaced as IsRelocatable so the queue takes the
// memcpy-relocation path when R = std::true_type.
960 template <typename R>
962   typedef R IsRelocatable;
966   Lifecycle() noexcept : constructed(true) {
967     ++lc_counts[DEFAULT_CONSTRUCTOR];
970   explicit Lifecycle(int /* n */, char const* /* s */) noexcept
971       : constructed(true) {
972     ++lc_counts[TWO_ARG_CONSTRUCTOR];
975   Lifecycle(const Lifecycle& /* rhs */) noexcept : constructed(true) {
976     ++lc_counts[COPY_CONSTRUCTOR];
979   Lifecycle(Lifecycle&& /* rhs */) noexcept : constructed(true) {
980     ++lc_counts[MOVE_CONSTRUCTOR];
983   Lifecycle& operator=(const Lifecycle& /* rhs */) noexcept {
984     ++lc_counts[COPY_OPERATOR];
988   Lifecycle& operator=(Lifecycle&& /* rhs */) noexcept {
989     ++lc_counts[MOVE_OPERATOR];
993   ~Lifecycle() noexcept {
994     ++lc_counts[DESTRUCTOR];
995     assert(lc_outstanding() >= 0);
// Verifies MPMCQueue forwards arguments with exactly the expected
// construction/destruction events: in-place default and two-arg
// construction, move vs copy on write, and (when R = std::true_type)
// memcpy relocation on read instead of move-assign + destroy. Also
// checks a full queue rejects write(2, "two") without constructing.
// NOTE(review): lc_snap() calls, scope braces, and some local
// declarations (src, node) are on dropped lines.
1001 template <typename R>
1002 void runPerfectForwardingTest() {
1004   EXPECT_EQ(lc_outstanding(), 0);
1007     // Non-dynamic only. False positive for dynamic.
1008     MPMCQueue<Lifecycle<R>, std::atomic> queue(50);
1009     LIFECYCLE_STEP(NOTHING);
1011     for (int pass = 0; pass < 10; ++pass) {
1012       for (int i = 0; i < 10; ++i) {
1013         queue.blockingWrite();
1014         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1016         queue.blockingWrite(1, "one");
1017         LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
1021           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1022           queue.blockingWrite(std::move(src));
1023           LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
1025         LIFECYCLE_STEP(DESTRUCTOR);
1029           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1030           queue.blockingWrite(src);
1031           LIFECYCLE_STEP(COPY_CONSTRUCTOR);
1033         LIFECYCLE_STEP(DESTRUCTOR);
1035         EXPECT_TRUE(queue.write());
1036         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1039       EXPECT_EQ(queue.size(), 50);
1040       EXPECT_FALSE(queue.write(2, "two"));
1041       LIFECYCLE_STEP(NOTHING);
1043       for (int i = 0; i < 50; ++i) {
1046           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1048           queue.blockingRead(node);
1050             // relocatable, moved via memcpy
1051             LIFECYCLE_STEP(DESTRUCTOR);
1053             LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1056           LIFECYCLE_STEP(DESTRUCTOR);
1059       EXPECT_EQ(queue.size(), 0);
1062     // put one element back before destruction
1064       Lifecycle<R> src(3, "three");
1065       LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
1066       queue.write(std::move(src));
1067       LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
1069     LIFECYCLE_STEP(DESTRUCTOR); // destroy src
1071   LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
1073   EXPECT_EQ(lc_outstanding(), 0);
// Run the forwarding checks with and without the relocatable fast path.
1076 TEST(MPMCQueue, perfect_forwarding) {
1077   runPerfectForwardingTest<std::false_type>();
1080 TEST(MPMCQueue, perfect_forwarding_relocatable) {
1081   runPerfectForwardingTest<std::true_type>();
// Exercises the queue's own move constructor and move assignment:
// elements must NOT be copied or moved when the queue itself moves
// (LIFECYCLE_STEP(NOTHING) after each queue move), the moved-from queue
// reports zero capacity/size, and elements survive intact. Destructor
// steps at the end confirm exactly one element remained in each scope.
// NOTE(review): lc_snap() calls, the actual move expressions (e.g.
// b = std::move(a)), and some scope braces are on dropped lines.
1084 template <bool Dynamic = false>
1085 void run_queue_moving() {
1087   EXPECT_EQ(lc_outstanding(), 0);
1090     MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> a(50);
1091     LIFECYCLE_STEP(NOTHING);
1094     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1097     MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> b
1099     LIFECYCLE_STEP(NOTHING);
1100     EXPECT_EQ(a.capacity(), 0);
1101     EXPECT_EQ(a.size(), 0);
1102     EXPECT_EQ(b.capacity(), 50);
1103     EXPECT_EQ(b.size(), 1);
1106     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1109     MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> c;
1110     LIFECYCLE_STEP(NOTHING);
1112     LIFECYCLE_STEP(NOTHING);
1113     EXPECT_EQ(c.capacity(), 50);
1114     EXPECT_EQ(c.size(), 2);
1117     Lifecycle<std::false_type> dst;
1118     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1119     c.blockingRead(dst);
1120     LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1124       MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> d(10);
1125       LIFECYCLE_STEP(NOTHING);
1127       LIFECYCLE_STEP(NOTHING);
1128       EXPECT_EQ(c.capacity(), 10);
1129       EXPECT_TRUE(c.isEmpty());
1130       EXPECT_EQ(d.capacity(), 50);
1131       EXPECT_EQ(d.size(), 1);
1133       d.blockingRead(dst);
1134       LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1136       c.blockingWrite(dst);
1137       LIFECYCLE_STEP(COPY_CONSTRUCTOR);
1139       d.blockingWrite(std::move(dst));
1140       LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
1141     } // d goes out of scope
1142     LIFECYCLE_STEP(DESTRUCTOR);
1143   } // dst goes out of scope
1144   LIFECYCLE_STEP(DESTRUCTOR);
1145 } // c goes out of scope
1146   LIFECYCLE_STEP(DESTRUCTOR);
// Queue-move checks for both the fixed and dynamic implementations.
1149 TEST(MPMCQueue, queue_moving) {
1153 TEST(MPMCQueue, queue_moving_dynamic) {
1154   run_queue_moving<true>();
// Constructing either queue variant with capacity 0 must throw.
1157 TEST(MPMCQueue, explicit_zero_capacity_fail) {
1158   ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
1160   using DynamicMPMCQueueInt = MPMCQueue<int, std::atomic, true>;
1161   ASSERT_THROW(DynamicMPMCQueueInt cq(0), std::invalid_argument);
// tryReadUntil contract: two readers race on a capacity-1 queue with a
// 100ms deadline synchronized via a boost::barrier; exactly one reader
// gets the single value 42 and the other times out (returns false), and
// the loser must not return before the deadline elapses (the stop_watch
// check at the end).
// NOTE(review): the stop_watch/vals/rets declarations, barrier waits,
// thread joins, and closing braces are on dropped lines.
1164 template <bool Dynamic>
1165 void testTryReadUntil() {
1166   MPMCQueue<int, std::atomic, Dynamic> q{1};
1168   const auto wait = std::chrono::milliseconds(100);
1172   std::vector<std::thread> threads;
1173   boost::barrier b{3};
1174   for (int i = 0; i < 2; i++) {
1175     threads.emplace_back([&, i] {
1177       rets[i] = q.tryReadUntil(watch.getCheckpoint() + wait, vals[i]);
1182   EXPECT_TRUE(q.write(42));
1184   for (int i = 0; i < 2; i++) {
1188   for (int i = 0; i < 2; i++) {
1189     int other = (i + 1) % 2;
1191       EXPECT_EQ(42, vals[i]);
1192       EXPECT_FALSE(rets[other]);
1196   EXPECT_TRUE(watch.elapsed(wait));
// Mirror of testTryReadUntil for writers: the queue starts full, two
// writers race tryWriteUntil with a 100ms deadline, exactly one succeeds
// once a slot opens and the other times out after the full deadline.
1199 template <bool Dynamic>
1200 void testTryWriteUntil() {
1201   MPMCQueue<int, std::atomic, Dynamic> q{1};
1202   EXPECT_TRUE(q.write(42));
1204   const auto wait = std::chrono::milliseconds(100);
1207   std::vector<std::thread> threads;
1208   boost::barrier b{3};
1209   for (int i = 0; i < 2; i++) {
1210     threads.emplace_back([&, i] {
1212       rets[i] = q.tryWriteUntil(watch.getCheckpoint() + wait, i);
1218   EXPECT_TRUE(q.read(x));
1221   for (int i = 0; i < 2; i++) {
1224   EXPECT_TRUE(q.read(x));
1226   for (int i = 0; i < 2; i++) {
1227     int other = (i + 1) % 2;
1230       EXPECT_FALSE(rets[other]);
1234   EXPECT_TRUE(watch.elapsed(wait));
// Entry-point TESTs for the deadline-based read/write checks.
1237 TEST(MPMCQueue, try_read_until) {
1238   testTryReadUntil<false>();
1241 TEST(MPMCQueue, try_read_until_dynamic) {
1242   testTryReadUntil<true>();
1245 TEST(MPMCQueue, try_write_until) {
1246   testTryWriteUntil<false>();
1249 TEST(MPMCQueue, try_write_until_dynamic) {
1250   testTryWriteUntil<true>();