2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/MPMCQueue.h>
18 #include <folly/Format.h>
19 #include <folly/Memory.h>
20 #include <folly/portability/GTest.h>
21 #include <folly/portability/SysResource.h>
22 #include <folly/portability/SysTime.h>
23 #include <folly/portability/Unistd.h>
24 #include <folly/test/DeterministicSchedule.h>
26 #include <boost/intrusive_ptr.hpp>
32 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
34 using namespace folly;
35 using namespace detail;
37 using std::chrono::time_point;
38 using std::chrono::steady_clock;
39 using std::chrono::seconds;
40 using std::chrono::milliseconds;
42 using std::unique_ptr;
45 typedef DeterministicSchedule DSched;
// Worker for the TurnSequencer stress test: thread i claims turns
// i, i+numThreads, i+2*numThreads, ..., waiting for each turn and then
// completing it.  NOTE(review): `prev` and the leading int parameters are
// not visible in this listing — presumably (numThreads, numOps, init, i)
// plus a per-thread "last seen turn" tracker; confirm against the full file.
47 template <template<typename> class Atom>
48 void run_mt_sequencer_thread(
52 TurnSequencer<Atom>& seq,
53 Atom<uint32_t>& spinThreshold,
// Every 32nd op passes true as the third argument to waitForTurn,
// exercising the sequencer's spin-threshold update path.
56 for (int op = i; op < numOps; op += numThreads) {
57 seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
58 EXPECT_EQ(prev, op - 1);
60 seq.completeTurn(init + op);
// Drives run_mt_sequencer_thread: builds one TurnSequencer starting at
// `init`, launches numThreads workers via DeterministicSchedule::thread,
// joins them, and checks that exactly numOps turns were completed.
64 template <template<typename> class Atom>
65 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
66 TurnSequencer<Atom> seq(init);
// Spin threshold starts at 0 so the adaptive spin logic is exercised
// from a cold start.
67 Atom<uint32_t> spinThreshold(0);
70 vector<std::thread> threads(numThreads);
71 for (int i = 0; i < numThreads; ++i) {
72 threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
73 numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
77 for (auto& thr : threads) {
// NOTE(review): `prev` is not declared in the visible lines — presumably a
// shared "last completed turn" counter; confirm against the full file.
81 EXPECT_EQ(prev, numOps - 1);
// TurnSequencer correctness under three atomics: plain std::atomic,
// EmulatedFutexAtomic (futex emulation path), and DeterministicAtomic
// under a seeded DeterministicSchedule for reproducible interleavings.
// Negative init values deliberately exercise wrap-around of the uint32_t
// turn counter.
84 TEST(MPMCQueue, sequencer) {
85 run_mt_sequencer_test<std::atomic>(1, 100, 0);
86 run_mt_sequencer_test<std::atomic>(2, 100000, -100);
87 run_mt_sequencer_test<std::atomic>(100, 10000, -100);
90 TEST(MPMCQueue, sequencer_emulated_futex) {
91 run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
92 run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
93 run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
96 TEST(MPMCQueue, sequencer_deterministic) {
97 DSched sched(DSched::uniform(0));
98 run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
// (1 << 29) - 100 starts near the turn counter's high range to hit
// wrap-related edge cases deterministically.
99 run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
100 run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
// Smoke-tests a single element type T through every write entry point:
// blockingWrite, write, and tryWriteUntil with both system_clock and
// steady_clock deadlines, reading the element back after each write.
// Dynamic selects the dynamically-sized MPMCQueue variant.
103 template <bool Dynamic = false, typename T>
104 void runElementTypeTest(T&& src) {
105 MPMCQueue<T, std::atomic, Dynamic> cq(10);
106 cq.blockingWrite(std::forward<T>(src));
// NOTE(review): `dest` is declared on a line missing from this listing.
108 cq.blockingRead(dest);
109 EXPECT_TRUE(cq.write(std::move(dest)));
110 EXPECT_TRUE(cq.read(dest));
// Deadlines a full second out: the queue has room, so these writes must
// succeed immediately rather than actually waiting.
111 auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
112 EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
113 EXPECT_TRUE(cq.read(dest));
114 auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
115 EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
116 EXPECT_TRUE(cq.read(dest));
// Intrusively ref-counted test element (used with boost::intrusive_ptr).
// active_instances is thread-local so per-test leak checks don't race.
// NOTE(review): the struct's opening line and parts of the add_ref/release
// bodies are missing from this listing.
120 static FOLLY_TLS int active_instances;
// Mutable so that add_ref/release can take RefCounted const*.
122 mutable std::atomic<int> rc;
124 RefCounted() : rc(0) {
132 FOLLY_TLS int RefCounted::active_instances;
// boost::intrusive_ptr hooks: bump / drop the embedded count, deleting
// the object when the count reaches zero.
134 void intrusive_ptr_add_ref(RefCounted const* p) {
138 void intrusive_ptr_release(RefCounted const* p) {
139 if (--(p->rc) == 0) {
// Element-type coverage: trivially copyable (int), heap-backed (string),
// composite (pair, vector), and move-only / ref-counted smart pointers.
// The final EXPECT verifies no RefCounted instances leaked through the
// queue.  The _dynamic variant repeats the same matrix with the
// dynamically-sized queue.
144 TEST(MPMCQueue, lots_of_element_types) {
145 runElementTypeTest(10);
146 runElementTypeTest(string("abc"));
147 runElementTypeTest(std::make_pair(10, string("def")));
148 runElementTypeTest(vector<string>{{"abc"}});
149 runElementTypeTest(std::make_shared<char>('a'));
150 runElementTypeTest(folly::make_unique<char>('a'));
151 runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
152 EXPECT_EQ(RefCounted::active_instances, 0);
155 TEST(MPMCQueue, lots_of_element_types_dynamic) {
156 runElementTypeTest<true>(10);
157 runElementTypeTest<true>(string("abc"));
158 runElementTypeTest<true>(std::make_pair(10, string("def")));
159 runElementTypeTest<true>(vector<string>{{"abc"}});
160 runElementTypeTest<true>(std::make_shared<char>('a'));
161 runElementTypeTest<true>(folly::make_unique<char>('a'));
162 runElementTypeTest<true>(boost::intrusive_ptr<RefCounted>(new RefCounted));
163 EXPECT_EQ(RefCounted::active_instances, 0);
// Single-threaded fill/drain cycles: fill to capacity, confirm writes fail
// when full, drain half with read() and half with blockingRead(), confirm
// reads fail when empty.  Repeated for 10 passes to cycle the ring indices.
166 TEST(MPMCQueue, single_thread_enqdeq) {
167 // Non-dynamic version only.
168 // False positive for dynamic version. Capacity can be temporarily
169 // higher than specified.
170 MPMCQueue<int> cq(10);
172 for (int pass = 0; pass < 10; ++pass) {
173 for (int i = 0; i < 10; ++i) {
174 EXPECT_TRUE(cq.write(i));
// Queue is now at capacity 10: the next write must be rejected.
176 EXPECT_FALSE(cq.write(-1));
177 EXPECT_FALSE(cq.isEmpty());
178 EXPECT_EQ(cq.size(), 10);
180 for (int i = 0; i < 5; ++i) {
182 EXPECT_TRUE(cq.read(dest));
185 for (int i = 5; i < 10; ++i) {
187 cq.blockingRead(dest);
// Queue drained: non-blocking read must fail and size must be zero.
191 EXPECT_FALSE(cq.read(dest));
194 EXPECT_TRUE(cq.isEmpty());
195 EXPECT_EQ(cq.size(), 0);
// For every capacity 1..99: exactly `cap` writes succeed and the next
// write is rejected, i.e. the non-dynamic queue never over-admits.
199 TEST(MPMCQueue, tryenq_capacity_test) {
200 // Non-dynamic version only.
201 // False positive for dynamic version. Capacity can be temporarily
202 // higher than specified.
203 for (size_t cap = 1; cap < 100; ++cap) {
204 MPMCQueue<int> cq(cap);
205 for (size_t i = 0; i < cap; ++i) {
206 EXPECT_TRUE(cq.write(i));
208 EXPECT_FALSE(cq.write(100));
// blockingWrite on a full queue must block until a reader makes room:
// fill the queue, start a thread that blockingWrites one more element,
// then read to unblock it.  NOTE(review): the fill-loop body and the
// join/cleanup lines are missing from this listing.
212 TEST(MPMCQueue, enq_capacity_test) {
213 // Non-dynamic version only.
214 // False positive for dynamic version. Capacity can be temporarily
215 // higher than specified.
216 for (auto cap : { 1, 100, 10000 }) {
217 MPMCQueue<int> cq(cap);
218 for (int i = 0; i < cap; ++i) {
// This write cannot complete until the main thread reads below.
223 auto thr = std::thread([&]{
224 cq.blockingWrite(100);
230 cq.blockingRead(dummy);
// Worker mixing non-blocking write() and read() until this thread has
// sent and received its share of n ops.  Values written are the thread's
// own sequence; the shared `sum` (accumulated from threadSum, on a line
// missing here) lets the parent verify aggregate delivery.
236 template <template<typename> class Atom, bool Dynamic = false>
237 void runTryEnqDeqThread(
240 MPMCQueue<int, Atom, Dynamic>& cq,
241 std::atomic<uint64_t>& sum,
243 uint64_t threadSum = 0;
245 // received doesn't reflect any actual values, we just start with
246 // t and increment by numThreads to get the rounding of termination
247 // correct if numThreads doesn't evenly divide numOps
// NOTE(review): src/received initialization and the dst declaration are
// on lines missing from this listing.
249 while (src < n || received < n) {
250 if (src < n && cq.write(src)) {
255 if (received < n && cq.read(dst)) {
256 received += numThreads;
// Aggregate-correctness harness for write()/read(): numThreads workers
// each enqueue and dequeue; afterwards the queue must be empty and the
// sum of all dequeued values must equal 0+1+...+(n-1), even though the
// individual non-linearizable ops give no per-op guarantee.
263 template <template<typename> class Atom, bool Dynamic = false>
264 void runTryEnqDeqTest(int numThreads, int numOps) {
265 // write and read aren't linearizable, so we don't have
266 // hard guarantees on their individual behavior. We can still test
267 // correctness in aggregate
// Capacity == numThreads keeps the queue small enough to force
// contention on both full and empty transitions.
268 MPMCQueue<int,Atom, Dynamic> cq(numThreads);
271 vector<std::thread> threads(numThreads);
272 std::atomic<uint64_t> sum(0);
273 for (int t = 0; t < numThreads; ++t) {
274 threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom, Dynamic>,
275 numThreads, n, std::ref(cq), std::ref(sum), t));
277 for (auto& t : threads) {
280 EXPECT_TRUE(cq.isEmpty());
// Gauss sum check: every value 0..n-1 was delivered exactly once.
281 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
// runTryEnqDeqTest across the atomic-type / Dynamic matrix, plus a
// deterministic variant that replays each configuration under both a
// uniform random schedule and a uniformSubset schedule for the same seed.
284 TEST(MPMCQueue, mt_try_enq_deq) {
285 int nts[] = { 1, 3, 100 };
289 runTryEnqDeqTest<std::atomic>(nt, n);
293 TEST(MPMCQueue, mt_try_enq_deq_dynamic) {
294 int nts[] = { 1, 3, 100 };
298 runTryEnqDeqTest<std::atomic, /* Dynamic = */ true>(nt, n);
302 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
303 int nts[] = { 1, 3, 100 };
307 runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
311 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex_dynamic) {
312 int nts[] = { 1, 3, 100 };
316 runTryEnqDeqTest<EmulatedFutexAtomic, /* Dynamic = */ true>(nt, n);
320 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
321 int nts[] = { 3, 10 };
// Seed is logged so a failing deterministic run can be reproduced.
324 LOG(INFO) << "using seed " << seed;
329 DSched sched(DSched::uniform(seed));
330 runTryEnqDeqTest<DeterministicAtomic>(nt, n);
333 DSched sched(DSched::uniformSubset(seed, 2));
334 runTryEnqDeqTest<DeterministicAtomic>(nt, n);
337 DSched sched(DSched::uniform(seed));
338 runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
341 DSched sched(DSched::uniformSubset(seed, 2));
342 runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
// Wall-clock time in microseconds since the epoch, via gettimeofday
// (benchmark timing only — not monotonic, so subject to clock steps).
347 uint64_t nowMicro() {
349 gettimeofday(&tv, 0);
350 return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
// Strategy hierarchy letting the benchmark loop exercise each MPMCQueue
// write entry point through a uniform interface: callWrite(q, i) performs
// one write of i, methodName() labels the result line.
353 template <typename Q>
354 struct WriteMethodCaller {
355 WriteMethodCaller() {}
356 virtual ~WriteMethodCaller() = default;
357 virtual bool callWrite(Q& q, int i) = 0;
358 virtual string methodName() = 0;
// blockingWrite never fails; callWrite always reports success
// (return statement is on a line missing from this listing).
361 template <typename Q>
362 struct BlockingWriteCaller : public WriteMethodCaller<Q> {
363 bool callWrite(Q& q, int i) override {
367 string methodName() override { return "blockingWrite"; }
370 template <typename Q>
371 struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
372 bool callWrite(Q& q, int i) override { return q.writeIfNotFull(i); }
373 string methodName() override { return "writeIfNotFull"; }
376 template <typename Q>
377 struct WriteCaller : public WriteMethodCaller<Q> {
378 bool callWrite(Q& q, int i) override { return q.write(i); }
379 string methodName() override { return "write"; }
// tryWriteUntil with a per-call deadline of Clock::now() + duration_;
// the Clock template parameter selects steady vs system time.
382 template <typename Q,
383 class Clock = steady_clock,
384 class Duration = typename Clock::duration>
385 struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
386 const Duration duration_;
387 explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {}
388 bool callWrite(Q& q, int i) override {
389 auto then = Clock::now() + duration_;
390 return q.tryWriteUntil(then, i);
392 string methodName() override {
393 return folly::sformat(
394 "tryWriteUntil({}ms)",
395 std::chrono::duration_cast<milliseconds>(duration_).count());
// Core producer/consumer benchmark: numProducers writers push 0..numOps-1
// through `writer`, numConsumers readers drain and sum; verifies the Gauss
// sum (unless ignoreContents) and returns a formatted result string with
// nanos/handoff, context switches (rusage), write failures, and allocated
// capacity.  NOTE(review): several parameter and body lines are missing
// from this listing (q declaration, retry accounting, join bodies).
399 template <typename Q>
400 string producerConsumerBench(Q&& queue,
405 WriteMethodCaller<Q>& writer,
406 bool ignoreContents = false) {
// Snapshot rusage + wall clock before the run so deltas isolate the
// benchmark's own cost.
409 struct rusage beginUsage;
410 getrusage(RUSAGE_SELF, &beginUsage);
412 auto beginMicro = nowMicro();
415 std::atomic<uint64_t> sum(0);
416 std::atomic<uint64_t> failed(0);
418 vector<std::thread> producers(numProducers);
419 for (int t = 0; t < numProducers; ++t) {
420 producers[t] = DSched::thread([&,t]{
// Producer t writes the strided subsequence t, t+numProducers, ...
421 for (int i = t; i < numOps; i += numProducers) {
422 while (!writer.callWrite(q, i)) {
429 vector<std::thread> consumers(numConsumers);
430 for (int t = 0; t < numConsumers; ++t) {
431 consumers[t] = DSched::thread([&,t]{
432 uint64_t localSum = 0;
433 for (int i = t; i < numOps; i += numConsumers) {
435 q.blockingRead(dest);
436 EXPECT_FALSE(dest == -1);
443 for (auto& t : producers) {
446 for (auto& t : consumers) {
449 if (!ignoreContents) {
// Every value 0..n-1 delivered exactly once.
450 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
453 auto endMicro = nowMicro();
455 struct rusage endUsage;
456 getrusage(RUSAGE_SELF, &endUsage);
458 uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
// Voluntary + involuntary context switches attributable to the run.
459 long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
460 (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
461 uint64_t failures = failed;
462 size_t allocated = q.allocatedCapacity();
464 return folly::sformat(
465 "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
466 "handoff, {} failures, {} allocated",
// Runs producerConsumerBench under a DeterministicSchedule for every
// write-method caller and several capacity / producer / consumer shapes
// (the numeric arguments are on lines missing from this listing).
// Correctness, not timing, is the point: perf numbers under DSched are
// meaningless, as noted below.
478 template <bool Dynamic = false>
479 void runMtProdConsDeterministic(long seed) {
480 // we use the Bench method, but perf results are meaningless under DSched
481 DSched sched(DSched::uniform(seed));
// One caller per write entry point: blockingWrite, writeIfNotFull,
// write, and tryWriteUntil with short and long deadlines.
483 vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
485 callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
486 DeterministicAtomic, Dynamic>>>());
487 callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
488 DeterministicAtomic, Dynamic>>>());
489 callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
490 DeterministicAtomic, Dynamic>>>());
491 callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
492 DeterministicAtomic, Dynamic>>>(milliseconds(1)));
493 callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
494 DeterministicAtomic, Dynamic>>>(seconds(2)));
497 for (const auto& caller : callers) {
500 producerConsumerBench(
501 MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
502 "MPMCQueue<int, DeterministicAtomic, Dynamic>("
503 + folly::to<std::string>(cap)+")",
510 producerConsumerBench(
511 MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
512 "MPMCQueue<int, DeterministicAtomic, Dynamic>("
513 + folly::to<std::string>(cap)+")",
520 producerConsumerBench(
521 MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
522 "MPMCQueue<int, DeterministicAtomic, Dynamic>("
523 + folly::to<std::string>(cap)+")",
530 producerConsumerBench(
531 MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
532 "MPMCQueue<int, DeterministicAtomic, Dynamic>("
533 + folly::to<std::string>(cap)+")",
540 producerConsumerBench(
541 MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
542 "MPMCQueue<int, DeterministicAtomic, Dynamic>("
543 + folly::to<std::string>(cap)+")",
// Like runMtProdConsDeterministic but always uses the dynamic queue and
// forwards explicit (cap, minCap, mult) sizing arguments to its
// three-argument constructor.  The parameter list itself is on lines
// missing from this listing; see the call site in the
// ..._dynamic_with_arguments test below for the expected order.
551 void runMtProdConsDeterministicDynamic(
560 // we use the Bench method, but perf results are meaningless under DSched
561 DSched sched(DSched::uniform(seed))
563 vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
565 callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
566 DeterministicAtomic, true>>>());
567 callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
568 DeterministicAtomic, true>>>());
569 callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
570 DeterministicAtomic, true>>>());
571 callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
572 DeterministicAtomic, true>>>(milliseconds(1)));
573 callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
574 DeterministicAtomic, true>>>(seconds(2)));
576 for (const auto& caller : callers) {
578 producerConsumerBench(
579 MPMCQueue<int, DeterministicAtomic, true>(cap, minCap, mult),
580 "MPMCQueue<int, DeterministicAtomic, true>("
581 + folly::to<std::string>(cap) + ", "
582 + folly::to<std::string>(minCap) + ", "
583 + folly::to<std::string>(mult)+")",
// Deterministic producer/consumer runs with a fixed seed of 0 so results
// are reproducible, for both the static and dynamic queue variants.
591 TEST(MPMCQueue, mt_prod_cons_deterministic) {
592 runMtProdConsDeterministic(0);
595 TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic) {
596 runMtProdConsDeterministic<true>(0);
// Overrides `var` from the named environment variable when it is set;
// parses with atoi, so a non-numeric value silently becomes 0.
599 template <typename T>
600 void setFromEnv(T& var, const char* envvar) {
601 char* str = std::getenv(envvar);
602 if (str) { var = atoi(str); }
// Environment-tunable deterministic dynamic-queue run: SEED, PRODS, CONS,
// NUM_OPS, CAP, MIN_CAP, MULT override the built-in defaults (the other
// default initializations are on lines missing from this listing), then
// everything is forwarded to runMtProdConsDeterministicDynamic.
605 TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic_with_arguments) {
609 uint32_t numOps = 1000;
613 setFromEnv(seed, "SEED");
614 setFromEnv(prods, "PRODS");
615 setFromEnv(cons, "CONS");
616 setFromEnv(numOps, "NUM_OPS");
617 setFromEnv(cap, "CAP");
618 setFromEnv(minCap, "MIN_CAP");
619 setFromEnv(mult, "MULT");
620 runMtProdConsDeterministicDynamic(
621 seed, prods, cons, numOps, cap, minCap, mult);
// Benchmark shorthand: stringizes the queue expression (#q) so the result
// line is labeled with the exact constructor call being measured.
624 #define PC_BENCH(q, np, nc, ...) \
625 producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
// Real-time (non-deterministic) producer/consumer benchmark over
// std::atomic queues: every write-method caller crossed with capacities
// 10 / 10000 / 100000 and several producer:consumer ratios (the np/nc
// arguments for most PC_BENCH lines are on lines missing from this
// listing; the last shows the 32x100 shape).  NUM_OPS overrides n.
627 template <bool Dynamic = false>
628 void runMtProdCons() {
630 setFromEnv(n, "NUM_OPS");
631 vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, std::atomic, Dynamic>>>>
633 callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
634 std::atomic, Dynamic>>>());
635 callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
636 std::atomic, Dynamic>>>());
637 callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int, std::atomic,
639 callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
640 std::atomic, Dynamic>>>(milliseconds(1)));
641 callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
642 std::atomic, Dynamic>>>(seconds(2)));
643 for (const auto& caller : callers) {
644 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
646 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
648 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
650 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
652 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
654 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
656 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
658 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
660 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(100000)),
661 32, 100, n, *caller);
// Benchmark entry points for the static and dynamic std::atomic queues.
665 TEST(MPMCQueue, mt_prod_cons) {
669 TEST(MPMCQueue, mt_prod_cons_dynamic) {
670 runMtProdCons</* Dynamic = */ true>();
// Same benchmark matrix as runMtProdCons but over EmulatedFutexAtomic,
// exercising the futex-emulation blocking path instead of native futexes.
// NOTE(review): the declaration/initialization of `n` is on a line missing
// from this listing.
673 template <bool Dynamic = false>
674 void runMtProdConsEmulatedFutex() {
676 vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic,
678 callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
679 EmulatedFutexAtomic, Dynamic>>>());
680 callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
681 EmulatedFutexAtomic, Dynamic>>>());
682 callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
683 EmulatedFutexAtomic, Dynamic>>>());
684 callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
685 EmulatedFutexAtomic, Dynamic>>>(milliseconds(1)));
686 callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
687 EmulatedFutexAtomic, Dynamic>>>(seconds(2)));
// Capacities 10 / 10000 / 100000 crossed with 1:1, 10:1, 1:10, 10:10,
// and 32:100 producer:consumer shapes.
688 for (const auto& caller : callers) {
689 LOG(INFO) << PC_BENCH(
690 (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 1, n, *caller);
691 LOG(INFO) << PC_BENCH(
692 (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 1, n, *caller);
693 LOG(INFO) << PC_BENCH(
694 (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 10, n, *caller);
695 LOG(INFO) << PC_BENCH(
696 (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 10, n, *caller);
697 LOG(INFO) << PC_BENCH(
698 (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 1, n, *caller);
699 LOG(INFO) << PC_BENCH(
700 (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 10, 1, n, *caller);
701 LOG(INFO) << PC_BENCH(
702 (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 10, n, *caller);
703 LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
704 (10000)), 10, 10, n, *caller);
705 LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
706 (100000)), 32, 100, n, *caller);
// Emulated-futex benchmark entry points, static and dynamic variants.
710 TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
711 runMtProdConsEmulatedFutex();
714 TEST(MPMCQueue, mt_prod_cons_emulated_futex_dynamic) {
715 runMtProdConsEmulatedFutex</* Dynamic = */ true>();
// Worker for the never-fail invariant: because the parent sizes the queue
// at numThreads and each thread writes before it reads, writeIfNotFull
// and readIfNotEmpty must always succeed — any false return is a test
// failure, not a retry.
718 template <template <typename> class Atom, bool Dynamic = false>
719 void runNeverFailThread(int numThreads,
721 MPMCQueue<int, Atom, Dynamic>& cq,
722 std::atomic<uint64_t>& sum,
724 uint64_t threadSum = 0;
725 for (int i = t; i < n; i += numThreads) {
727 EXPECT_TRUE(cq.writeIfNotFull(i));
// NOTE(review): `dest` declaration and the threadSum accumulation /
// publication to `sum` are on lines missing from this listing.
730 EXPECT_TRUE(cq.readIfNotEmpty(dest));
731 EXPECT_TRUE(dest >= 0);
// Launches numThreads runNeverFailThread workers over a queue of capacity
// numThreads (so #enq >= #deq can never fail), verifies the Gauss-sum
// delivery invariant, and returns elapsed wall time in microseconds for
// the caller's nanos-per-op log line.
737 template <template <typename> class Atom, bool Dynamic = false>
738 uint64_t runNeverFailTest(int numThreads, int numOps) {
739 // always #enq >= #deq
740 MPMCQueue<int, Atom, Dynamic> cq(numThreads);
743 auto beginMicro = nowMicro();
745 vector<std::thread> threads(numThreads);
746 std::atomic<uint64_t> sum(0);
747 for (int t = 0; t < numThreads; ++t) {
748 threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom, Dynamic>,
755 for (auto& t : threads) {
758 EXPECT_TRUE(cq.isEmpty());
// Every value 0..n-1 was enqueued and dequeued exactly once.
759 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
761 return nowMicro() - beginMicro;
// Runs runNeverFailTest for each thread count in nts and logs the
// per-operation latency (n*2 ops total: one write + one read per value).
764 template <template<typename> class Atom, bool Dynamic = false>
765 void runMtNeverFail(std::vector<int>& nts, int n) {
767 uint64_t elapsed = runNeverFailTest<Atom, Dynamic>(nt, n);
768 LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
// never-fail invariant across the atomic-type / Dynamic matrix with
// 1, 3, and 100 threads.  NOTE(review): `n` is initialized on lines
// missing from this listing.
773 TEST(MPMCQueue, mt_never_fail) {
774 std::vector<int> nts {1, 3, 100};
776 runMtNeverFail<std::atomic>(nts, n);
779 TEST(MPMCQueue, mt_never_fail_dynamic) {
780 std::vector<int> nts {1, 3, 100};
782 runMtNeverFail<std::atomic, true>(nts, n);
785 TEST(MPMCQueue, mt_never_fail_emulated_futex) {
786 std::vector<int> nts {1, 3, 100};
788 runMtNeverFail<EmulatedFutexAtomic>(nts, n);
791 TEST(MPMCQueue, mt_never_fail_emulated_futex_dynamic) {
792 std::vector<int> nts {1, 3, 100};
794 runMtNeverFail<EmulatedFutexAtomic, true>(nts, n);
// Deterministic replay of the never-fail test: for each thread count,
// runs once under a uniform schedule and once under uniformSubset with
// the same logged seed, so failures are reproducible.
797 template<bool Dynamic = false>
798 void runMtNeverFailDeterministic(std::vector<int>& nts, int n, long seed) {
799 LOG(INFO) << "using seed " << seed;
802 DSched sched(DSched::uniform(seed));
803 runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
806 DSched sched(DSched::uniformSubset(seed, 2));
807 runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
// Deterministic never-fail runs; seed fixed at 0 (the commented-out
// nowMicro() % 10000 would randomize it for soak testing).
812 TEST(MPMCQueue, mt_never_fail_deterministic) {
813 std::vector<int> nts {3, 10};
814 long seed = 0; // nowMicro() % 10000;
816 runMtNeverFailDeterministic(nts, n, seed);
819 TEST(MPMCQueue, mt_never_fail_deterministic_dynamic) {
820 std::vector<int> nts {3, 10};
821 long seed = 0; // nowMicro() % 10000;
823 runMtNeverFailDeterministic<true>(nts, n, seed);
// Deadline-based variant of runNeverFailThread: writes use tryWriteUntil
// with a 1-second deadline on the templated Clock; since the queue can
// never be full for this workload, the deadline must never expire.
826 template <class Clock, template <typename> class Atom, bool Dynamic>
827 void runNeverFailUntilThread(int numThreads,
829 MPMCQueue<int, Atom, Dynamic>& cq,
830 std::atomic<uint64_t>& sum,
832 uint64_t threadSum = 0;
833 for (int i = t; i < n; i += numThreads) {
835 auto soon = Clock::now() + std::chrono::seconds(1);
836 EXPECT_TRUE(cq.tryWriteUntil(soon, i));
// NOTE(review): `dest` declaration and threadSum publication are on
// lines missing from this listing.
839 EXPECT_TRUE(cq.readIfNotEmpty(dest));
840 EXPECT_TRUE(dest >= 0);
// Clock-parameterized overload of runNeverFailTest: identical harness but
// the workers use tryWriteUntil on `Clock` instead of writeIfNotFull.
// Returns elapsed wall time in microseconds.
846 template <class Clock, template <typename> class Atom, bool Dynamic = false>
847 uint64_t runNeverFailTest(int numThreads, int numOps) {
848 // always #enq >= #deq
849 MPMCQueue<int, Atom, Dynamic> cq(numThreads);
852 auto beginMicro = nowMicro();
854 vector<std::thread> threads(numThreads);
855 std::atomic<uint64_t> sum(0);
856 for (int t = 0; t < numThreads; ++t) {
857 threads[t] = DSched::thread(std::bind(
858 runNeverFailUntilThread<Clock, Atom, Dynamic>,
865 for (auto& t : threads) {
868 EXPECT_TRUE(cq.isEmpty());
// Gauss-sum delivery check, as in the non-deadline variant.
869 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
871 return nowMicro() - beginMicro;
// tryWriteUntil never-fail runs against system_clock deadlines (the
// non-monotonic clock path), with per-op latency logging.
874 template <bool Dynamic = false>
875 void runMtNeverFailUntilSystem(std::vector<int>& nts, int n) {
878 runNeverFailTest<std::chrono::system_clock, std::atomic, Dynamic>(nt, n);
879 LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
884 TEST(MPMCQueue, mt_never_fail_until_system) {
885 std::vector<int> nts {1, 3, 100};
887 runMtNeverFailUntilSystem(nts, n);
890 TEST(MPMCQueue, mt_never_fail_until_system_dynamic) {
891 std::vector<int> nts {1, 3, 100};
893 runMtNeverFailUntilSystem<true>(nts, n);
// Same as runMtNeverFailUntilSystem but with steady_clock deadlines
// (the monotonic clock path of tryWriteUntil).
896 template <bool Dynamic = false>
897 void runMtNeverFailUntilSteady(std::vector<int>& nts, int n) {
900 runNeverFailTest<std::chrono::steady_clock, std::atomic, Dynamic>(nt, n);
901 LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
906 TEST(MPMCQueue, mt_never_fail_until_steady) {
907 std::vector<int> nts {1, 3, 100};
909 runMtNeverFailUntilSteady(nts, n);
912 TEST(MPMCQueue, mt_never_fail_until_steady_dynamic) {
913 std::vector<int> nts {1, 3, 100};
915 runMtNeverFailUntilSteady<true>(nts, n);
// Lifecycle-event bookkeeping for the perfect-forwarding tests: lc_counts
// tallies constructor/assignment/destructor calls per event kind (the
// enumerator list is on lines missing from this listing), lc_prev holds
// the snapshot taken by lc_snap, and LIFECYCLE_STEP asserts exactly which
// events occurred since that snapshot.  Thread-local so tests don't race.
918 enum LifecycleEvent {
930 static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
931 static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
// Net live objects: all constructions minus destructions.
933 static int lc_outstanding() {
934 return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
935 lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
936 lc_counts[DESTRUCTOR];
939 static void lc_snap() {
940 for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
941 lc_prev[i] = lc_counts[i];
// __LINE__ is baked into the failure message so a mismatch points at the
// exact LIFECYCLE_STEP call site.
945 #define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)
947 static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
948 for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
// Expect +1 only for the (up to two) named events, 0 for all others.
949 int delta = i == what || i == what2 ? 1 : 0;
950 EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
951 << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
952 << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
953 << ", from line " << lineno;
// Instrumented element type: every special member bumps the matching
// lc_counts slot.  R (std::true_type / std::false_type) is exported as
// IsRelocatable so the queue's memcpy-relocation path can be tested both
// ways.  NOTE(review): the `constructed` flag declaration and parts of
// the assignment-operator bodies are on lines missing from this listing.
958 template <typename R>
960 typedef R IsRelocatable;
964 Lifecycle() noexcept : constructed(true) {
965 ++lc_counts[DEFAULT_CONSTRUCTOR];
968 explicit Lifecycle(int /* n */, char const* /* s */) noexcept
969 : constructed(true) {
970 ++lc_counts[TWO_ARG_CONSTRUCTOR];
973 Lifecycle(const Lifecycle& /* rhs */) noexcept : constructed(true) {
974 ++lc_counts[COPY_CONSTRUCTOR];
977 Lifecycle(Lifecycle&& /* rhs */) noexcept : constructed(true) {
978 ++lc_counts[MOVE_CONSTRUCTOR];
981 Lifecycle& operator=(const Lifecycle& /* rhs */) noexcept {
982 ++lc_counts[COPY_OPERATOR];
986 Lifecycle& operator=(Lifecycle&& /* rhs */) noexcept {
987 ++lc_counts[MOVE_OPERATOR];
991 ~Lifecycle() noexcept {
992 ++lc_counts[DESTRUCTOR];
// Destructor count can never exceed total constructions.
993 assert(lc_outstanding() >= 0);
// Verifies the queue's emplace-style perfect forwarding: each write form
// (0-arg, 2-arg, move, copy) must trigger exactly one corresponding
// Lifecycle event, and reads must trigger either a bare DESTRUCTOR (when
// R makes the type relocatable, i.e. moved via memcpy) or
// DESTRUCTOR + MOVE_OPERATOR otherwise.  Ends with lc_outstanding()==0,
// proving no element was leaked or double-destroyed.
999 template <typename R>
1000 void runPerfectForwardingTest() {
1002 EXPECT_EQ(lc_outstanding(), 0);
1005 // Non-dynamic only. False positive for dynamic.
1006 MPMCQueue<Lifecycle<R>, std::atomic> queue(50);
// Constructing the queue itself must create no elements.
1007 LIFECYCLE_STEP(NOTHING);
1009 for (int pass = 0; pass < 10; ++pass) {
1010 for (int i = 0; i < 10; ++i) {
// 0-arg blockingWrite constructs the element in place.
1011 queue.blockingWrite();
1012 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// 2-arg form forwards (1, "one") to the two-arg constructor.
1014 queue.blockingWrite(1, "one");
1015 LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
1019 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1020 queue.blockingWrite(std::move(src));
1021 LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
1023 LIFECYCLE_STEP(DESTRUCTOR);
1027 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1028 queue.blockingWrite(src);
1029 LIFECYCLE_STEP(COPY_CONSTRUCTOR);
1031 LIFECYCLE_STEP(DESTRUCTOR);
1033 EXPECT_TRUE(queue.write());
1034 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// Queue full at 50: a rejected write must construct nothing at all.
1037 EXPECT_EQ(queue.size(), 50);
1038 EXPECT_FALSE(queue.write(2, "two"));
1039 LIFECYCLE_STEP(NOTHING);
1041 for (int i = 0; i < 50; ++i) {
1044 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1046 queue.blockingRead(node);
1048 // relocatable, moved via memcpy
1049 LIFECYCLE_STEP(DESTRUCTOR);
1051 LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1054 LIFECYCLE_STEP(DESTRUCTOR);
1057 EXPECT_EQ(queue.size(), 0);
1060 // put one element back before destruction
1062 Lifecycle<R> src(3, "three");
1063 LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
1064 queue.write(std::move(src));
1065 LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
1067 LIFECYCLE_STEP(DESTRUCTOR); // destroy src
// The queue's destructor must destroy its one remaining element.
1069 LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
1071 EXPECT_EQ(lc_outstanding(), 0);
1074 TEST(MPMCQueue, perfect_forwarding) {
1075 runPerfectForwardingTest<std::false_type>();
1078 TEST(MPMCQueue, perfect_forwarding_relocatable) {
1079 runPerfectForwardingTest<std::true_type>();
// Verifies queue move semantics: move-construction and move-assignment
// must transfer capacity and contents without touching any element
// (LIFECYCLE_STEP(NOTHING)), leave the source empty with capacity 0 (or
// swapped capacity for move-assign), and destroy exactly the remaining
// elements when each queue goes out of scope.
1082 template <bool Dynamic = false>
1083 void run_queue_moving() {
1085 EXPECT_EQ(lc_outstanding(), 0);
1088 MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> a(50);
1089 LIFECYCLE_STEP(NOTHING);
1092 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// Move construction: b takes a's storage; no element is moved or copied.
1095 MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> b
1097 LIFECYCLE_STEP(NOTHING);
1098 EXPECT_EQ(a.capacity(), 0);
1099 EXPECT_EQ(a.size(), 0);
1100 EXPECT_EQ(b.capacity(), 50);
1101 EXPECT_EQ(b.size(), 1);
1104 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// Move assignment into a default-constructed queue, same no-element rule.
1107 MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> c;
1108 LIFECYCLE_STEP(NOTHING);
1110 LIFECYCLE_STEP(NOTHING);
1111 EXPECT_EQ(c.capacity(), 50);
1112 EXPECT_EQ(c.size(), 2);
1115 Lifecycle<std::false_type> dst;
1116 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1117 c.blockingRead(dst);
1118 LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
// Swap-like move-assign between two live queues: capacities trade places.
1122 MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> d(10);
1123 LIFECYCLE_STEP(NOTHING);
1125 LIFECYCLE_STEP(NOTHING);
1126 EXPECT_EQ(c.capacity(), 10);
1127 EXPECT_TRUE(c.isEmpty());
1128 EXPECT_EQ(d.capacity(), 50);
1129 EXPECT_EQ(d.size(), 1);
1131 d.blockingRead(dst);
1132 LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1134 c.blockingWrite(dst);
1135 LIFECYCLE_STEP(COPY_CONSTRUCTOR);
1137 d.blockingWrite(std::move(dst));
1138 LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
1139 } // d goes out of scope
1140 LIFECYCLE_STEP(DESTRUCTOR);
1141 } // dst goes out of scope
1142 LIFECYCLE_STEP(DESTRUCTOR);
1143 } // c goes out of scope
1144 LIFECYCLE_STEP(DESTRUCTOR);
1147 TEST(MPMCQueue, queue_moving) {
1151 TEST(MPMCQueue, queue_moving_dynamic) {
1152 run_queue_moving<true>();
// A capacity of 0 is invalid: both the static and dynamic queue
// constructors must throw std::invalid_argument.
1155 TEST(MPMCQueue, explicit_zero_capacity_fail) {
1156 ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
1158 using DynamicMPMCQueueInt = MPMCQueue<int, std::atomic, true>;
1159 ASSERT_THROW(DynamicMPMCQueueInt cq(0), std::invalid_argument);