2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/MPMCQueue.h>
18 #include <folly/Format.h>
19 #include <folly/Memory.h>
20 #include <folly/portability/SysResource.h>
21 #include <folly/portability/SysTime.h>
22 #include <folly/portability/Unistd.h>
23 #include <folly/test/DeterministicSchedule.h>
25 #include <boost/intrusive_ptr.hpp>
31 #include <gtest/gtest.h>
// Test preamble: mark boost::intrusive_ptr relocatable for fbvector, pull the
// folly / folly::detail and chrono names into scope, and shorten
// DeterministicSchedule to DSched for the deterministic-scheduling tests below.
// NOTE(review): each line carries a stray leading number (line-number residue
// from extraction); strip before compiling.
33 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
35 using namespace folly;
36 using namespace detail;
38 using std::chrono::time_point;
39 using std::chrono::steady_clock;
40 using std::chrono::seconds;
41 using std::chrono::milliseconds;
43 using std::unique_ptr;
46 typedef DeterministicSchedule DSched;
// Worker for the TurnSequencer tests: thread i claims turns init+op for
// op = i, i+numThreads, ..., waiting for its turn (with a futex-spin hint
// every 32nd op) and then completing it.
// NOTE(review): this chunk elides original lines (the parameter list head,
// the decl/update of `prev`, and closing braces) — restore from upstream.
48 template <template<typename> class Atom>
49 void run_mt_sequencer_thread(
53     TurnSequencer<Atom>& seq,
54     Atom<uint32_t>& spinThreshold,
57   for (int op = i; op < numOps; op += numThreads) {
58     seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
59     EXPECT_EQ(prev, op - 1);  // turns must complete strictly in order
61     seq.completeTurn(init + op);
// Spawns numThreads run_mt_sequencer_thread workers over one shared
// TurnSequencer and checks that all numOps turns completed in order.
// NOTE(review): joins, the `prev` tracker, and closing braces are elided
// in this chunk (original-line-number gaps) — diff against upstream.
65 template <template<typename> class Atom>
66 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
67 TurnSequencer<Atom> seq(init);
68 Atom<uint32_t> spinThreshold(0);
71 vector<std::thread> threads(numThreads);
72 for (int i = 0; i < numThreads; ++i) {
73 threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
74 numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
78 for (auto& thr : threads) {
82 EXPECT_EQ(prev, numOps - 1);
// TurnSequencer smoke tests across atomics: plain std::atomic, the emulated
// futex, and DeterministicAtomic under a seeded deterministic scheduler.
// Negative init values exercise turn-counter wraparound.
85 TEST(MPMCQueue, sequencer) {
86 run_mt_sequencer_test<std::atomic>(1, 100, 0);
87 run_mt_sequencer_test<std::atomic>(2, 100000, -100);
88 run_mt_sequencer_test<std::atomic>(100, 10000, -100);
91 TEST(MPMCQueue, sequencer_emulated_futex) {
92 run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
93 run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
94 run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
97 TEST(MPMCQueue, sequencer_deterministic) {
98 DSched sched(DSched::uniform(0));
99 run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
100 run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
101 run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
// Round-trips a single value of type T through every write flavor
// (blockingWrite, write, tryWriteUntil with system and steady deadlines)
// to check the queue compiles and behaves for that element type.
// NOTE(review): the decl of `dest` (original line 108) is elided here.
104 template <bool Dynamic = false, typename T>
105 void runElementTypeTest(T&& src) {
106 MPMCQueue<T, std::atomic, Dynamic> cq(10);
107 cq.blockingWrite(std::forward<T>(src));
109 cq.blockingRead(dest);
110 EXPECT_TRUE(cq.write(std::move(dest)));
111 EXPECT_TRUE(cq.read(dest));
112 auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
113 EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
114 EXPECT_TRUE(cq.read(dest));
115 auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
116 EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
117 EXPECT_TRUE(cq.read(dest));
// Intrusive-refcounted helper: active_instances is a thread-local count of
// live objects (checked == 0 after tests); rc is the per-object refcount read
// by the boost::intrusive_ptr hooks below.
// NOTE(review): the struct header, ctor/dtor bodies, and the add_ref/release
// bodies are elided in this chunk — presumably they increment/decrement
// active_instances and rc; confirm against upstream.
121 static FOLLY_TLS int active_instances;
123 mutable std::atomic<int> rc;
125 RefCounted() : rc(0) {
133 FOLLY_TLS int RefCounted::active_instances;
135 void intrusive_ptr_add_ref(RefCounted const* p) {
139 void intrusive_ptr_release(RefCounted const* p) {
140 if (--(p->rc) == 0) {
// Exercises runElementTypeTest with a spread of element types: trivially
// copyable, string, pair, vector, shared_ptr, unique_ptr (move-only), and
// intrusive_ptr — for both the static and the dynamic queue. The final check
// verifies no RefCounted instance leaked through the queue.
145 TEST(MPMCQueue, lots_of_element_types) {
146 runElementTypeTest(10);
147 runElementTypeTest(string("abc"));
148 runElementTypeTest(std::make_pair(10, string("def")));
149 runElementTypeTest(vector<string>{{"abc"}});
150 runElementTypeTest(std::make_shared<char>('a'));
151 runElementTypeTest(folly::make_unique<char>('a'));
152 runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
153 EXPECT_EQ(RefCounted::active_instances, 0);
156 TEST(MPMCQueue, lots_of_element_types_dynamic) {
157 runElementTypeTest<true>(10);
158 runElementTypeTest<true>(string("abc"));
159 runElementTypeTest<true>(std::make_pair(10, string("def")));
160 runElementTypeTest<true>(vector<string>{{"abc"}});
161 runElementTypeTest<true>(std::make_shared<char>('a'));
162 runElementTypeTest<true>(folly::make_unique<char>('a'));
163 runElementTypeTest<true>(boost::intrusive_ptr<RefCounted>(new RefCounted));
164 EXPECT_EQ(RefCounted::active_instances, 0);
// Single-threaded fill/drain: fill to capacity 10, verify a further write
// fails and size/empty report correctly, drain via read then blockingRead,
// and verify a read on the empty queue fails.
// NOTE(review): `dest` decls and several closing braces are elided here.
167 TEST(MPMCQueue, single_thread_enqdeq) {
168 // Non-dynamic version only.
169 // False positive for dynamic version. Capacity can be temporarily
170 // higher than specified.
171 MPMCQueue<int> cq(10);
173 for (int pass = 0; pass < 10; ++pass) {
174 for (int i = 0; i < 10; ++i) {
175 EXPECT_TRUE(cq.write(i));
177 EXPECT_FALSE(cq.write(-1));
178 EXPECT_FALSE(cq.isEmpty());
179 EXPECT_EQ(cq.size(), 10);
181 for (int i = 0; i < 5; ++i) {
183 EXPECT_TRUE(cq.read(dest));
186 for (int i = 5; i < 10; ++i) {
188 cq.blockingRead(dest);
192 EXPECT_FALSE(cq.read(dest));
195 EXPECT_TRUE(cq.isEmpty());
196 EXPECT_EQ(cq.size(), 0);
// For each capacity 1..99: exactly `cap` non-blocking writes succeed and the
// (cap+1)-th fails — i.e. the static queue enforces its stated capacity.
200 TEST(MPMCQueue, tryenq_capacity_test) {
201 // Non-dynamic version only.
202 // False positive for dynamic version. Capacity can be temporarily
203 // higher than specified.
204 for (size_t cap = 1; cap < 100; ++cap) {
205 MPMCQueue<int> cq(cap);
206 for (size_t i = 0; i < cap; ++i) {
207 EXPECT_TRUE(cq.write(i));
209 EXPECT_FALSE(cq.write(100));
// Fills the queue, then a helper thread does one more blockingWrite which
// must block until this thread frees a slot with blockingRead.
// NOTE(review): the fill body, the unblocking read, the join, and `dummy`'s
// decl are elided in this chunk.
213 TEST(MPMCQueue, enq_capacity_test) {
214 // Non-dynamic version only.
215 // False positive for dynamic version. Capacity can be temporarily
216 // higher than specified.
217 for (auto cap : { 1, 100, 10000 }) {
218 MPMCQueue<int> cq(cap);
219 for (int i = 0; i < cap; ++i) {
224 auto thr = std::thread([&]{
225 cq.blockingWrite(100);
231 cq.blockingRead(dummy);
// Per-thread mixed producer/consumer loop using the non-blocking write/read:
// thread t writes the values t, t+numThreads, ... and counts reads the same
// way, so all threads together transfer exactly n items.
// NOTE(review): decls of src/received/dst and the sum accumulation are
// elided in this chunk.
237 template <template<typename> class Atom, bool Dynamic = false>
238 void runTryEnqDeqThread(
241 MPMCQueue<int, Atom, Dynamic>& cq,
242 std::atomic<uint64_t>& sum,
244 uint64_t threadSum = 0;
246 // received doesn't reflect any actual values, we just start with
247 // t and increment by numThreads to get the rounding of termination
248 // correct if numThreads doesn't evenly divide numOps
250 while (src < n || received < n) {
251 if (src < n && cq.write(src)) {
256 if (received < n && cq.read(dst)) {
257 received += numThreads;
// Drives runTryEnqDeqThread with numThreads workers and checks the aggregate
// invariant: the queue ends empty and the sum of all dequeued values equals
// 0 + 1 + ... + (n-1), even though individual write/read calls may fail.
264 template <template<typename> class Atom, bool Dynamic = false>
265 void runTryEnqDeqTest(int numThreads, int numOps) {
266 // write and read aren't linearizable, so we don't have
267 // hard guarantees on their individual behavior. We can still test
268 // correctness in aggregate
269 MPMCQueue<int,Atom, Dynamic> cq(numThreads);
272 vector<std::thread> threads(numThreads);
273 std::atomic<uint64_t> sum(0);
274 for (int t = 0; t < numThreads; ++t) {
275 threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom, Dynamic>,
276 numThreads, n, std::ref(cq), std::ref(sum), t));
278 for (auto& t : threads) {
281 EXPECT_TRUE(cq.isEmpty());
282 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
// runTryEnqDeqTest across thread counts and atomics: std::atomic,
// EmulatedFutexAtomic (both static and dynamic queue), and DeterministicAtomic
// under uniform and uniformSubset schedules with a logged seed.
// NOTE(review): loop headers over nts[] / n and `seed`'s decl are elided.
285 TEST(MPMCQueue, mt_try_enq_deq) {
286 int nts[] = { 1, 3, 100 };
290 runTryEnqDeqTest<std::atomic>(nt, n);
294 TEST(MPMCQueue, mt_try_enq_deq_dynamic) {
295 int nts[] = { 1, 3, 100 };
299 runTryEnqDeqTest<std::atomic, /* Dynamic = */ true>(nt, n);
303 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
304 int nts[] = { 1, 3, 100 };
308 runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
312 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex_dynamic) {
313 int nts[] = { 1, 3, 100 };
317 runTryEnqDeqTest<EmulatedFutexAtomic, /* Dynamic = */ true>(nt, n);
321 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
322 int nts[] = { 3, 10 };
325 LOG(INFO) << "using seed " << seed;
330 DSched sched(DSched::uniform(seed));
331 runTryEnqDeqTest<DeterministicAtomic>(nt, n);
334 DSched sched(DSched::uniformSubset(seed, 2));
335 runTryEnqDeqTest<DeterministicAtomic>(nt, n);
338 DSched sched(DSched::uniform(seed));
339 runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
342 DSched sched(DSched::uniformSubset(seed, 2));
343 runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
// Wall-clock time in microseconds since the epoch, via gettimeofday.
// NOTE(review): the `struct timeval tv;` decl (original line 349) is elided.
348 uint64_t nowMicro() {
350 gettimeofday(&tv, 0);
351 return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
// Polymorphic wrappers so the benchmark can be run once per write API of Q:
// blockingWrite, writeIfNotFull, write, and tryWriteUntil (parameterized by
// clock and deadline offset). methodName() labels the benchmark output.
354 template <typename Q>
355 struct WriteMethodCaller {
356 WriteMethodCaller() {}
357 virtual ~WriteMethodCaller() = default;
358 virtual bool callWrite(Q& q, int i) = 0;
359 virtual string methodName() = 0;
362 template <typename Q>
363 struct BlockingWriteCaller : public WriteMethodCaller<Q> {
364 bool callWrite(Q& q, int i) override {
368 string methodName() override { return "blockingWrite"; }
371 template <typename Q>
372 struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
373 bool callWrite(Q& q, int i) override { return q.writeIfNotFull(i); }
374 string methodName() override { return "writeIfNotFull"; }
377 template <typename Q>
378 struct WriteCaller : public WriteMethodCaller<Q> {
379 bool callWrite(Q& q, int i) override { return q.write(i); }
380 string methodName() override { return "write"; }
383 template <typename Q,
384 class Clock = steady_clock,
385 class Duration = typename Clock::duration>
386 struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
387 const Duration duration_;
388 explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {}
389 bool callWrite(Q& q, int i) override {
390 auto then = Clock::now() + duration_;
391 return q.tryWriteUntil(then, i);
393 string methodName() override {
394 return folly::sformat(
395 "tryWriteUntil({}ms)",
396 std::chrono::duration_cast<milliseconds>(duration_).count());
// Runs numProducers x numConsumers threads moving numOps ints through the
// queue with the given write method, verifies the value-sum invariant (unless
// ignoreContents), and returns a formatted summary: nanos/handoff, context
// switches (via getrusage), failed write attempts, and allocated capacity.
// NOTE(review): several params (numProducers/numConsumers/numOps), the local
// `q`, loop bodies, joins, and the sformat argument list are elided here.
400 template <typename Q>
401 string producerConsumerBench(Q&& queue,
406 WriteMethodCaller<Q>& writer,
407 bool ignoreContents = false) {
410 struct rusage beginUsage;
411 getrusage(RUSAGE_SELF, &beginUsage);
413 auto beginMicro = nowMicro();
416 std::atomic<uint64_t> sum(0);
417 std::atomic<uint64_t> failed(0);
419 vector<std::thread> producers(numProducers);
420 for (int t = 0; t < numProducers; ++t) {
421 producers[t] = DSched::thread([&,t]{
422 for (int i = t; i < numOps; i += numProducers) {
423 while (!writer.callWrite(q, i)) {
430 vector<std::thread> consumers(numConsumers);
431 for (int t = 0; t < numConsumers; ++t) {
432 consumers[t] = DSched::thread([&,t]{
433 uint64_t localSum = 0;
434 for (int i = t; i < numOps; i += numConsumers) {
436 q.blockingRead(dest);
437 EXPECT_FALSE(dest == -1);
444 for (auto& t : producers) {
447 for (auto& t : consumers) {
450 if (!ignoreContents) {
451 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
454 auto endMicro = nowMicro();
456 struct rusage endUsage;
457 getrusage(RUSAGE_SELF, &endUsage);
459 uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
460 long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
461 (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
462 uint64_t failures = failed;
463 size_t allocated = q.allocatedCapacity();
465 return folly::sformat(
466 "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
467 "handoff, {} failures, {} allocated",
// Runs producerConsumerBench under a deterministic scheduler for every write
// method and several producer/consumer/capacity combinations (the cap values
// and the producer/consumer counts per call are elided in this chunk).
// Perf numbers are meaningless under DSched; the point is schedule coverage.
479 template <bool Dynamic = false>
480 void runMtProdConsDeterministic(long seed) {
481 // we use the Bench method, but perf results are meaningless under DSched
482 DSched sched(DSched::uniform(seed));
484 vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
486 callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
487 DeterministicAtomic, Dynamic>>>());
488 callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
489 DeterministicAtomic, Dynamic>>>());
490 callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
491 DeterministicAtomic, Dynamic>>>());
492 callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
493 DeterministicAtomic, Dynamic>>>(milliseconds(1)));
494 callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
495 DeterministicAtomic, Dynamic>>>(seconds(2)));
498 for (const auto& caller : callers) {
501 producerConsumerBench(
502 MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
503 "MPMCQueue<int, DeterministicAtomic, Dynamic>("
504 + folly::to<std::string>(cap)+")",
511 producerConsumerBench(
512 MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
513 "MPMCQueue<int, DeterministicAtomic, Dynamic>("
514 + folly::to<std::string>(cap)+")",
521 producerConsumerBench(
522 MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
523 "MPMCQueue<int, DeterministicAtomic, Dynamic>("
524 + folly::to<std::string>(cap)+")",
531 producerConsumerBench(
532 MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
533 "MPMCQueue<int, DeterministicAtomic, Dynamic>("
534 + folly::to<std::string>(cap)+")",
541 producerConsumerBench(
542 MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
543 "MPMCQueue<int, DeterministicAtomic, Dynamic>("
544 + folly::to<std::string>(cap)+")",
// Like runMtProdConsDeterministic but always uses the dynamic queue,
// constructed with (cap, minCap, mult) so tests can steer the expansion
// policy. Parameter list (seed, prods, cons, numOps, cap, minCap, mult per
// the caller at mt_prod_cons_deterministic_dynamic_with_arguments) is
// elided in this chunk.
552 void runMtProdConsDeterministicDynamic(
561 // we use the Bench method, but perf results are meaningless under DSched
562 DSched sched(DSched::uniform(seed));
564 vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
566 callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
567 DeterministicAtomic, true>>>());
568 callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
569 DeterministicAtomic, true>>>());
570 callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
571 DeterministicAtomic, true>>>());
572 callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
573 DeterministicAtomic, true>>>(milliseconds(1)));
574 callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
575 DeterministicAtomic, true>>>(seconds(2)));
577 for (const auto& caller : callers) {
579 producerConsumerBench(
580 MPMCQueue<int, DeterministicAtomic, true>(cap, minCap, mult),
581 "MPMCQueue<int, DeterministicAtomic, true>("
582 + folly::to<std::string>(cap) + ", "
583 + folly::to<std::string>(minCap) + ", "
584 + folly::to<std::string>(mult)+")",
// Deterministic producer/consumer runs with a fixed seed of 0, for both
// static and dynamic queue variants.
592 TEST(MPMCQueue, mt_prod_cons_deterministic) {
593 runMtProdConsDeterministic(0);
596 TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic) {
597 runMtProdConsDeterministic<true>(0);
// Overrides `var` from the named environment variable when set, parsing it
// with atoi (so any integral T; non-numeric input silently becomes 0).
600 template <typename T>
601 void setFromEnv(T& var, const char* envvar) {
602 char* str = std::getenv(envvar);
603 if (str) { var = atoi(str); }
// Parameterizable deterministic-dynamic run: defaults can be overridden via
// SEED/PRODS/CONS/NUM_OPS/CAP/MIN_CAP/MULT environment variables.
// NOTE(review): decls of seed/prods/cons/cap/minCap/mult are elided here.
606 TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic_with_arguments) {
610 uint32_t numOps = 1000;
614 setFromEnv(seed, "SEED");
615 setFromEnv(prods, "PRODS");
616 setFromEnv(cons, "CONS");
617 setFromEnv(numOps, "NUM_OPS");
618 setFromEnv(cap, "CAP");
619 setFromEnv(minCap, "MIN_CAP");
620 setFromEnv(mult, "MULT");
621 runMtProdConsDeterministicDynamic(
622 seed, prods, cons, numOps, cap, minCap, mult);
// PC_BENCH stringizes the queue expression so the benchmark line is
// self-describing, then forwards to producerConsumerBench.
// runMtProdCons sweeps every write method over several capacity and
// producer/consumer combinations with real std::atomic (actual perf numbers).
// NOTE(review): `n`'s decl and most producer/consumer-count argument lines
// are elided in this chunk.
625 #define PC_BENCH(q, np, nc, ...) \
626 producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
628 template <bool Dynamic = false>
629 void runMtProdCons() {
631 setFromEnv(n, "NUM_OPS");
632 vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, std::atomic, Dynamic>>>>
634 callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
635 std::atomic, Dynamic>>>());
636 callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
637 std::atomic, Dynamic>>>());
638 callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int, std::atomic,
640 callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
641 std::atomic, Dynamic>>>(milliseconds(1)));
642 callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
643 std::atomic, Dynamic>>>(seconds(2)));
644 for (const auto& caller : callers) {
645 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
647 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
649 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
651 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
653 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
655 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
657 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
659 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
661 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(100000)),
662 32, 100, n, *caller);
666 TEST(MPMCQueue, mt_prod_cons) {
670 TEST(MPMCQueue, mt_prod_cons_dynamic) {
671 runMtProdCons</* Dynamic = */ true>();
// Same benchmark sweep as runMtProdCons but with EmulatedFutexAtomic, which
// exercises the portable futex-emulation wait path instead of native futex.
// NOTE(review): `n`'s decl and some closing braces are elided in this chunk.
674 template <bool Dynamic = false>
675 void runMtProdConsEmulatedFutex() {
677 vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic,
679 callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
680 EmulatedFutexAtomic, Dynamic>>>());
681 callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
682 EmulatedFutexAtomic, Dynamic>>>());
683 callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
684 EmulatedFutexAtomic, Dynamic>>>());
685 callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
686 EmulatedFutexAtomic, Dynamic>>>(milliseconds(1)));
687 callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
688 EmulatedFutexAtomic, Dynamic>>>(seconds(2)));
689 for (const auto& caller : callers) {
690 LOG(INFO) << PC_BENCH(
691 (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 1, n, *caller);
692 LOG(INFO) << PC_BENCH(
693 (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 1, n, *caller);
694 LOG(INFO) << PC_BENCH(
695 (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 10, n, *caller);
696 LOG(INFO) << PC_BENCH(
697 (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 10, n, *caller);
698 LOG(INFO) << PC_BENCH(
699 (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 1, n, *caller);
700 LOG(INFO) << PC_BENCH(
701 (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 10, 1, n, *caller);
702 LOG(INFO) << PC_BENCH(
703 (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 10, n, *caller);
704 LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
705 (10000)), 10, 10, n, *caller);
706 LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
707 (100000)), 32, 100, n, *caller);
711 TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
712 runMtProdConsEmulatedFutex();
715 TEST(MPMCQueue, mt_prod_cons_emulated_futex_dynamic) {
716 runMtProdConsEmulatedFutex</* Dynamic = */ true>();
// Worker for the never-fail tests: every writeIfNotFull / readIfNotEmpty is
// expected to succeed because the queue capacity equals numThreads and each
// thread enqueues before it dequeues (#enq >= #deq at all times).
// NOTE(review): `dest`'s decl, the sum accumulation, and closing braces are
// elided in this chunk.
719 template <template <typename> class Atom, bool Dynamic = false>
720 void runNeverFailThread(int numThreads,
722 MPMCQueue<int, Atom, Dynamic>& cq,
723 std::atomic<uint64_t>& sum,
725 uint64_t threadSum = 0;
726 for (int i = t; i < n; i += numThreads) {
728 EXPECT_TRUE(cq.writeIfNotFull(i));
731 EXPECT_TRUE(cq.readIfNotEmpty(dest));
732 EXPECT_TRUE(dest >= 0);
// Drives runNeverFailThread with numThreads workers over a capacity-
// numThreads queue, checks emptiness and the value-sum invariant, and
// returns elapsed wall-clock microseconds for logging.
738 template <template <typename> class Atom, bool Dynamic = false>
739 uint64_t runNeverFailTest(int numThreads, int numOps) {
740 // always #enq >= #deq
741 MPMCQueue<int, Atom, Dynamic> cq(numThreads);
744 auto beginMicro = nowMicro();
746 vector<std::thread> threads(numThreads);
747 std::atomic<uint64_t> sum(0);
748 for (int t = 0; t < numThreads; ++t) {
749 threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom, Dynamic>,
756 for (auto& t : threads) {
759 EXPECT_TRUE(cq.isEmpty());
760 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
762 return nowMicro() - beginMicro;
// Runs runNeverFailTest for each thread count in nts and logs nanos/op
// (each of the n items is both enqueued and dequeued, hence n * 2).
// NOTE(review): the loop header over nts and the `n` decls in the TESTs are
// elided in this chunk.
765 template <template<typename> class Atom, bool Dynamic = false>
766 void runMtNeverFail(std::vector<int>& nts, int n) {
768 uint64_t elapsed = runNeverFailTest<Atom, Dynamic>(nt, n);
769 LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
774 TEST(MPMCQueue, mt_never_fail) {
775 std::vector<int> nts {1, 3, 100};
777 runMtNeverFail<std::atomic>(nts, n);
780 TEST(MPMCQueue, mt_never_fail_dynamic) {
781 std::vector<int> nts {1, 3, 100};
783 runMtNeverFail<std::atomic, true>(nts, n);
786 TEST(MPMCQueue, mt_never_fail_emulated_futex) {
787 std::vector<int> nts {1, 3, 100};
789 runMtNeverFail<EmulatedFutexAtomic>(nts, n);
792 TEST(MPMCQueue, mt_never_fail_emulated_futex_dynamic) {
793 std::vector<int> nts {1, 3, 100};
795 runMtNeverFail<EmulatedFutexAtomic, true>(nts, n);
// Never-fail runs under the deterministic scheduler, with both a uniform
// schedule and a uniformSubset schedule per thread count. Seed is fixed at 0
// in the TESTs (the commented nowMicro() alternative randomizes it).
798 template<bool Dynamic = false>
799 void runMtNeverFailDeterministic(std::vector<int>& nts, int n, long seed) {
800 LOG(INFO) << "using seed " << seed;
803 DSched sched(DSched::uniform(seed));
804 runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
807 DSched sched(DSched::uniformSubset(seed, 2));
808 runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
813 TEST(MPMCQueue, mt_never_fail_deterministic) {
814 std::vector<int> nts {3, 10};
815 long seed = 0; // nowMicro() % 10000;
817 runMtNeverFailDeterministic(nts, n, seed);
820 TEST(MPMCQueue, mt_never_fail_deterministic_dynamic) {
821 std::vector<int> nts {3, 10};
822 long seed = 0; // nowMicro() % 10000;
824 runMtNeverFailDeterministic<true>(nts, n, seed);
// Variant of runNeverFailThread that enqueues via tryWriteUntil with a
// one-second deadline on the templated Clock; the deadline must never be hit
// because the queue cannot be full when this thread writes.
827 template <class Clock, template <typename> class Atom, bool Dynamic>
828 void runNeverFailUntilThread(int numThreads,
830 MPMCQueue<int, Atom, Dynamic>& cq,
831 std::atomic<uint64_t>& sum,
833 uint64_t threadSum = 0;
834 for (int i = t; i < n; i += numThreads) {
836 auto soon = Clock::now() + std::chrono::seconds(1);
837 EXPECT_TRUE(cq.tryWriteUntil(soon, i));
840 EXPECT_TRUE(cq.readIfNotEmpty(dest));
841 EXPECT_TRUE(dest >= 0);
// Clock-parameterized overload of runNeverFailTest: same structure, but
// threads use runNeverFailUntilThread (deadline-based writes). Returns
// elapsed wall-clock microseconds.
847 template <class Clock, template <typename> class Atom, bool Dynamic = false>
848 uint64_t runNeverFailTest(int numThreads, int numOps) {
849 // always #enq >= #deq
850 MPMCQueue<int, Atom, Dynamic> cq(numThreads);
853 auto beginMicro = nowMicro();
855 vector<std::thread> threads(numThreads);
856 std::atomic<uint64_t> sum(0);
857 for (int t = 0; t < numThreads; ++t) {
858 threads[t] = DSched::thread(std::bind(
859 runNeverFailUntilThread<Clock, Atom, Dynamic>,
866 for (auto& t : threads) {
869 EXPECT_TRUE(cq.isEmpty());
870 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
872 return nowMicro() - beginMicro;
// tryWriteUntil never-fail runs using system_clock deadlines.
// NOTE(review): the loop over nts, `elapsed`'s decl, and the `n` decls in
// the TESTs are elided in this chunk.
875 template <bool Dynamic = false>
876 void runMtNeverFailUntilSystem(std::vector<int>& nts, int n) {
879 runNeverFailTest<std::chrono::system_clock, std::atomic, Dynamic>(nt, n);
880 LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
885 TEST(MPMCQueue, mt_never_fail_until_system) {
886 std::vector<int> nts {1, 3, 100};
888 runMtNeverFailUntilSystem(nts, n);
891 TEST(MPMCQueue, mt_never_fail_until_system_dynamic) {
892 std::vector<int> nts {1, 3, 100};
894 runMtNeverFailUntilSystem<true>(nts, n);
// tryWriteUntil never-fail runs using steady_clock deadlines (mirrors the
// system_clock variant; both deadline clocks are supported by the queue).
897 template <bool Dynamic = false>
898 void runMtNeverFailUntilSteady(std::vector<int>& nts, int n) {
901 runNeverFailTest<std::chrono::steady_clock, std::atomic, Dynamic>(nt, n);
902 LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
907 TEST(MPMCQueue, mt_never_fail_until_steady) {
908 std::vector<int> nts {1, 3, 100};
910 runMtNeverFailUntilSteady(nts, n);
913 TEST(MPMCQueue, mt_never_fail_until_steady_dynamic) {
914 std::vector<int> nts {1, 3, 100};
916 runMtNeverFailUntilSteady<true>(nts, n);
// Thread-local bookkeeping for the perfect-forwarding tests: lc_counts
// tallies each lifecycle event, lc_outstanding() is constructions minus
// destructions, lc_snap() checkpoints, and lc_step() asserts that exactly
// the expected event(s) happened since the last snapshot, tagging failures
// with the caller's line via the LIFECYCLE_STEP macro.
// NOTE(review): the enumerator list (original lines 920-930) and a trailing
// lc_snap() call in lc_step are elided in this chunk.
919 enum LifecycleEvent {
931 static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
932 static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
934 static int lc_outstanding() {
935 return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
936 lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
937 lc_counts[DESTRUCTOR];
940 static void lc_snap() {
941 for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
942 lc_prev[i] = lc_counts[i];
946 #define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)
948 static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
949 for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
950 int delta = i == what || i == what2 ? 1 : 0;
951 EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
952 << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
953 << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
954 << ", from line " << lineno;
// Instrumented element type: every special member bumps the matching
// lc_counts slot so tests can verify exactly which constructor/assignment
// the queue invoked. R (std::true_type/false_type) is exposed as
// IsRelocatable so folly can memcpy-move when R is true_type.
// NOTE(review): the struct header, `constructed` member decl, lc_snap()
// calls, and returns in the assignment operators are elided in this chunk.
959 template <typename R>
961 typedef R IsRelocatable;
965 Lifecycle() noexcept : constructed(true) {
966 ++lc_counts[DEFAULT_CONSTRUCTOR];
969 explicit Lifecycle(int /* n */, char const* /* s */) noexcept
970 : constructed(true) {
971 ++lc_counts[TWO_ARG_CONSTRUCTOR];
974 Lifecycle(const Lifecycle& /* rhs */) noexcept : constructed(true) {
975 ++lc_counts[COPY_CONSTRUCTOR];
978 Lifecycle(Lifecycle&& /* rhs */) noexcept : constructed(true) {
979 ++lc_counts[MOVE_CONSTRUCTOR];
982 Lifecycle& operator=(const Lifecycle& /* rhs */) noexcept {
983 ++lc_counts[COPY_OPERATOR];
987 Lifecycle& operator=(Lifecycle&& /* rhs */) noexcept {
988 ++lc_counts[MOVE_OPERATOR];
992 ~Lifecycle() noexcept {
993 ++lc_counts[DESTRUCTOR];
994 assert(lc_outstanding() >= 0);
// Verifies the queue perfect-forwards write arguments: each write flavor
// (no-arg emplace, two-arg emplace, move, copy) must trigger exactly the
// expected Lifecycle events, and draining must use move-assignment — or,
// for a relocatable R, a raw memcpy with no extra events. Ends by checking
// no Lifecycle object leaked.
// NOTE(review): lc_snap() calls, `src`/`node` decls, and scope braces are
// elided in this chunk — the LIFECYCLE_STEP calls only make sense with the
// snapshots restored from upstream.
1000 template <typename R>
1001 void runPerfectForwardingTest() {
1003 EXPECT_EQ(lc_outstanding(), 0);
1006 // Non-dynamic only. False positive for dynamic.
1007 MPMCQueue<Lifecycle<R>, std::atomic> queue(50);
1008 LIFECYCLE_STEP(NOTHING);
1010 for (int pass = 0; pass < 10; ++pass) {
1011 for (int i = 0; i < 10; ++i) {
1012 queue.blockingWrite();
1013 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1015 queue.blockingWrite(1, "one");
1016 LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
1020 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1021 queue.blockingWrite(std::move(src));
1022 LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
1024 LIFECYCLE_STEP(DESTRUCTOR);
1028 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1029 queue.blockingWrite(src);
1030 LIFECYCLE_STEP(COPY_CONSTRUCTOR);
1032 LIFECYCLE_STEP(DESTRUCTOR);
1034 EXPECT_TRUE(queue.write());
1035 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1038 EXPECT_EQ(queue.size(), 50);
1039 EXPECT_FALSE(queue.write(2, "two"));
1040 LIFECYCLE_STEP(NOTHING);
1042 for (int i = 0; i < 50; ++i) {
1045 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1047 queue.blockingRead(node);
1049 // relocatable, moved via memcpy
1050 LIFECYCLE_STEP(DESTRUCTOR);
1052 LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1055 LIFECYCLE_STEP(DESTRUCTOR);
1058 EXPECT_EQ(queue.size(), 0);
1061 // put one element back before destruction
1063 Lifecycle<R> src(3, "three");
1064 LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
1065 queue.write(std::move(src));
1066 LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
1068 LIFECYCLE_STEP(DESTRUCTOR); // destroy src
1070 LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
1072 EXPECT_EQ(lc_outstanding(), 0);
1075 TEST(MPMCQueue, perfect_forwarding) {
1076 runPerfectForwardingTest<std::false_type>();
1079 TEST(MPMCQueue, perfect_forwarding_relocatable) {
1080 runPerfectForwardingTest<std::true_type>();
// Verifies queue move semantics with Lifecycle elements: move-construction
// and move-assignment transfer capacity and contents without touching the
// elements (LIFECYCLE_STEP(NOTHING)), the moved-from queue reports capacity
// and size 0, and swapped/assigned queues destroy their elements exactly
// once as each scope closes.
// NOTE(review): the move expressions themselves (e.g. `std::move(a)` into b,
// the a/d swap), blockingWrite calls on a/b, lc_snap() calls, and several
// scope braces are elided in this chunk.
1083 template <bool Dynamic = false>
1084 void run_queue_moving() {
1086 EXPECT_EQ(lc_outstanding(), 0);
1089 MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> a(50);
1090 LIFECYCLE_STEP(NOTHING);
1093 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1096 MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> b
1098 LIFECYCLE_STEP(NOTHING);
1099 EXPECT_EQ(a.capacity(), 0);
1100 EXPECT_EQ(a.size(), 0);
1101 EXPECT_EQ(b.capacity(), 50);
1102 EXPECT_EQ(b.size(), 1);
1105 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1108 MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> c;
1109 LIFECYCLE_STEP(NOTHING);
1111 LIFECYCLE_STEP(NOTHING);
1112 EXPECT_EQ(c.capacity(), 50);
1113 EXPECT_EQ(c.size(), 2);
1116 Lifecycle<std::false_type> dst;
1117 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1118 c.blockingRead(dst);
1119 LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1123 MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> d(10);
1124 LIFECYCLE_STEP(NOTHING);
1126 LIFECYCLE_STEP(NOTHING);
1127 EXPECT_EQ(c.capacity(), 10);
1128 EXPECT_TRUE(c.isEmpty());
1129 EXPECT_EQ(d.capacity(), 50);
1130 EXPECT_EQ(d.size(), 1);
1132 d.blockingRead(dst);
1133 LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1135 c.blockingWrite(dst);
1136 LIFECYCLE_STEP(COPY_CONSTRUCTOR);
1138 d.blockingWrite(std::move(dst));
1139 LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
1140 } // d goes out of scope
1141 LIFECYCLE_STEP(DESTRUCTOR);
1142 } // dst goes out of scope
1143 LIFECYCLE_STEP(DESTRUCTOR);
1144 } // c goes out of scope
1145 LIFECYCLE_STEP(DESTRUCTOR);
1148 TEST(MPMCQueue, queue_moving) {
1152 TEST(MPMCQueue, queue_moving_dynamic) {
1153 run_queue_moving<true>();
1156 TEST(MPMCQueue, explicit_zero_capacity_fail) {
1157 ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
1159 using DynamicMPMCQueueInt = MPMCQueue<int, std::atomic, true>;
1160 ASSERT_THROW(DynamicMPMCQueueInt cq(0), std::invalid_argument);