/*
 * Copyright 2016 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/MPMCQueue.h>
#include <folly/Format.h>
#include <folly/Memory.h>
#include <folly/test/DeterministicSchedule.h>

#include <boost/intrusive_ptr.hpp>
#include <functional>
#include <memory>
#include <thread>
#include <utility>
#include <unistd.h>
#include <sys/time.h>
#include <sys/resource.h>

#include <gtest/gtest.h>

FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);

using namespace folly;
using namespace detail;
using namespace test;

using std::chrono::time_point;
using std::chrono::steady_clock;
using std::chrono::seconds;
using std::chrono::milliseconds;
using std::string;
using std::make_unique;
using std::unique_ptr;
using std::vector;

typedef DeterministicSchedule DSched;

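// DSched::thread and DSched::join wrap std::thread creation and join.  When
// no DeterministicSchedule is active they behave like the plain operations;
// when one is active (the DeterministicAtomic tests below), interleavings
// are driven by the seeded PRNG, so any failure is reproducible.
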
template <template <typename> class Atom>
void run_mt_sequencer_thread(
    int numThreads,
    int numOps,
    uint32_t init,
    TurnSequencer<Atom>& seq,
    Atom<uint32_t>& spinThreshold,
    int& prev,
    int i) {
  for (int op = i; op < numOps; op += numThreads) {
    seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
    EXPECT_EQ(prev, op - 1);
    prev = op;
    seq.completeTurn(init + op);
  }
}

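// waitForTurn admits exactly one thread per turn, so the shared prev counter
// is only ever written by the current turn holder; the EXPECT above thus
// checks that turns are granted in strict increasing order across threads.
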
template <template <typename> class Atom>
void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
  TurnSequencer<Atom> seq(init);
  Atom<uint32_t> spinThreshold(0);

  int prev = -1;
  vector<std::thread> threads(numThreads);
  for (int i = 0; i < numThreads; ++i) {
    threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
          numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
          std::ref(prev), i));
  }
  for (auto& thr : threads) {
    DSched::join(thr);
  }
  EXPECT_EQ(prev, numOps - 1);
}

TEST(MPMCQueue, sequencer) {
  run_mt_sequencer_test<std::atomic>(1, 100, 0);
  run_mt_sequencer_test<std::atomic>(2, 100000, -100);
  run_mt_sequencer_test<std::atomic>(100, 10000, -100);
}

TEST(MPMCQueue, sequencer_emulated_futex) {
  run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
  run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
  run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
}

TEST(MPMCQueue, sequencer_deterministic) {
  DSched sched(DSched::uniform(0));
  run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
  run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
  run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
}

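// A minimal sketch of the TurnSequencer protocol these tests exercise,
// kept as a comment since the real calls appear in run_mt_sequencer_thread:
//
//   TurnSequencer<std::atomic> seq(0);   // turn 0 goes first
//   std::atomic<uint32_t> spin(0);       // adaptive spin cutoff, 0 = learn
//   seq.waitForTurn(0, spin, false);     // returns immediately for turn 0
//   seq.completeTurn(0);                 // lets the waiter for turn 1 run
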
template <typename T>
void runElementTypeTest(T&& src) {
  MPMCQueue<T> cq(10);

  cq.blockingWrite(std::move(src));
  T dest;
  cq.blockingRead(dest);
  EXPECT_TRUE(cq.write(std::move(dest)));
  EXPECT_TRUE(cq.read(dest));
  auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
  EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
  EXPECT_TRUE(cq.read(dest));
  auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
  EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
  EXPECT_TRUE(cq.read(dest));
}

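// RefCounted counts live instances (per thread, which suffices for the
// single-threaded test below) so lots_of_element_types can verify that the
// queue destroys every element it ever held.  The two free functions that
// follow are the reference-count hooks boost::intrusive_ptr finds via ADL.
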
struct RefCounted {
  static __thread int active_instances;

  mutable std::atomic<int> rc;

  RefCounted() : rc(0) {
    ++active_instances;
  }
  ~RefCounted() {
    --active_instances;
  }
};
__thread int RefCounted::active_instances;

void intrusive_ptr_add_ref(RefCounted const* p) {
  p->rc++;
}

void intrusive_ptr_release(RefCounted const* p) {
  if (--(p->rc) == 0) {
    delete p;
  }
}

TEST(MPMCQueue, lots_of_element_types) {
  runElementTypeTest(10);
  runElementTypeTest(string("abc"));
  runElementTypeTest(std::make_pair(10, string("def")));
  runElementTypeTest(vector<string>{{"abc"}});
  runElementTypeTest(std::make_shared<char>('a'));
  runElementTypeTest(folly::make_unique<char>('a'));
  runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
  EXPECT_EQ(RefCounted::active_instances, 0);
}

TEST(MPMCQueue, single_thread_enqdeq) {
  MPMCQueue<int> cq(10);

  for (int pass = 0; pass < 10; ++pass) {
    for (int i = 0; i < 10; ++i) {
      EXPECT_TRUE(cq.write(i));
    }
    EXPECT_FALSE(cq.write(-1));
    EXPECT_FALSE(cq.isEmpty());
    EXPECT_EQ(cq.size(), 10);

    for (int i = 0; i < 5; ++i) {
      int dest = -1;
      EXPECT_TRUE(cq.read(dest));
      EXPECT_EQ(dest, i);
    }
    for (int i = 5; i < 10; ++i) {
      int dest = -1;
      cq.blockingRead(dest);
      EXPECT_EQ(dest, i);
    }
    int dest = -1;
    EXPECT_FALSE(cq.read(dest));

    EXPECT_TRUE(cq.isEmpty());
    EXPECT_EQ(cq.size(), 0);
  }
}

TEST(MPMCQueue, tryenq_capacity_test) {
  for (size_t cap = 1; cap < 100; ++cap) {
    MPMCQueue<int> cq(cap);
    for (size_t i = 0; i < cap; ++i) {
      EXPECT_TRUE(cq.write(i));
    }
    EXPECT_FALSE(cq.write(100));
  }
}

TEST(MPMCQueue, enq_capacity_test) {
  for (auto cap : { 1, 100, 10000 }) {
    MPMCQueue<int> cq(cap);
    for (int i = 0; i < cap; ++i) {
      cq.blockingWrite(i);
    }
    // the queue is now full, so this writer blocks until a slot is freed
    auto thr = std::thread([&]{
      cq.blockingWrite(100);
    });
    int dummy;
    cq.blockingRead(dummy);
    thr.join();
  }
}

template <template <typename> class Atom>
void runTryEnqDeqThread(
    int numThreads,
    int n, // numOps
    MPMCQueue<int, Atom>& cq,
    std::atomic<uint64_t>& sum,
    int t) {
  uint64_t threadSum = 0;
  int src = t;
  // received doesn't reflect any actual values, we just start with
  // t and increment by numThreads to get the rounding of termination
  // correct if numThreads doesn't evenly divide numOps
  int received = t;
  while (src < n || received < n) {
    if (src < n && cq.write(src)) {
      src += numThreads;
    }

    int dst;
    if (received < n && cq.read(dst)) {
      received += numThreads;
      threadSum += dst;
    }
  }
  sum += threadSum;
}

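// Worked example of the rounding comment above: with numThreads == 3 and
// n == 10, thread 0 handles ops 0,3,6,9 and threads 1 and 2 handle three
// ops each, so the per-thread counters terminate after exactly n total
// writes and n total reads even though 3 does not evenly divide 10.
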
template <template <typename> class Atom>
void runTryEnqDeqTest(int numThreads, int numOps) {
  // write and read aren't linearizable, so we don't have
  // hard guarantees on their individual behavior. We can still test
  // correctness in aggregate
  MPMCQueue<int, Atom> cq(numThreads);

  uint64_t n = numOps;
  vector<std::thread> threads(numThreads);
  std::atomic<uint64_t> sum(0);
  for (int t = 0; t < numThreads; ++t) {
    threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
          numThreads, numOps, std::ref(cq), std::ref(sum), t));
  }
  for (auto& t : threads) {
    DSched::join(t);
  }
  EXPECT_TRUE(cq.isEmpty());
  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
}

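// Aggregate correctness: each of the values 0..n-1 is enqueued exactly
// once, so the dequeued total must equal 0 + 1 + ... + (n-1) = n*(n-1)/2.
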
TEST(MPMCQueue, mt_try_enq_deq) {
  int nts[] = { 1, 3, 100 };

  int n = 100000;
  for (int nt : nts) {
    runTryEnqDeqTest<std::atomic>(nt, n);
  }
}

TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
  int nts[] = { 1, 3, 100 };

  int n = 100000;
  for (int nt : nts) {
    runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
  }
}

TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
  int nts[] = { 3, 10 };

  long seed = 0;
  LOG(INFO) << "using seed " << seed;

  int n = 1000;
  for (int nt : nts) {
    {
      DSched sched(DSched::uniform(seed));
      runTryEnqDeqTest<DeterministicAtomic>(nt, n);
    }
    {
      DSched sched(DSched::uniformSubset(seed, 2));
      runTryEnqDeqTest<DeterministicAtomic>(nt, n);
    }
  }
}

uint64_t nowMicro() {
  timeval tv;
  gettimeofday(&tv, 0);
  return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
}

template <typename Q>
struct WriteMethodCaller {
  WriteMethodCaller() {}
  virtual ~WriteMethodCaller() = default;
  virtual bool callWrite(Q& q, int i) = 0;
  virtual string methodName() = 0;
};

template <typename Q>
struct BlockingWriteCaller : public WriteMethodCaller<Q> {
  bool callWrite(Q& q, int i) override {
    q.blockingWrite(i);
    return true;
  }
  string methodName() override { return "blockingWrite"; }
};

template <typename Q>
struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
  bool callWrite(Q& q, int i) override { return q.writeIfNotFull(i); }
  string methodName() override { return "writeIfNotFull"; }
};

template <typename Q>
struct WriteCaller : public WriteMethodCaller<Q> {
  bool callWrite(Q& q, int i) override { return q.write(i); }
  string methodName() override { return "write"; }
};

template <typename Q,
          class Clock = steady_clock,
          class Duration = typename Clock::duration>
struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
  const Duration duration_;
  explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {}
  bool callWrite(Q& q, int i) override {
    auto then = Clock::now() + duration_;
    return q.tryWriteUntil(then, i);
  }
  string methodName() override {
    return folly::sformat(
        "tryWriteUntil({}ms)",
        std::chrono::duration_cast<milliseconds>(duration_).count());
  }
};

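// The WriteMethodCaller hierarchy lets one benchmark loop exercise every
// write API (blocking, failing, and deadline-based) through a single
// virtual call; methodName() exists only to label the logged results.
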
template <typename Q>
string producerConsumerBench(Q&& queue,
                             string qName,
                             int numProducers,
                             int numConsumers,
                             int numOps,
                             WriteMethodCaller<Q>& writer,
                             bool ignoreContents = false) {
  Q q(std::move(queue));

  struct rusage beginUsage;
  getrusage(RUSAGE_SELF, &beginUsage);

  auto beginMicro = nowMicro();

  uint64_t n = numOps;
  std::atomic<uint64_t> sum(0);
  std::atomic<uint64_t> failed(0);

  vector<std::thread> producers(numProducers);
  for (int t = 0; t < numProducers; ++t) {
    producers[t] = DSched::thread([&,t]{
      for (int i = t; i < numOps; i += numProducers) {
        while (!writer.callWrite(q, i)) {
          ++failed;
        }
      }
    });
  }

  vector<std::thread> consumers(numConsumers);
  for (int t = 0; t < numConsumers; ++t) {
    consumers[t] = DSched::thread([&,t]{
      uint64_t localSum = 0;
      for (int i = t; i < numOps; i += numConsumers) {
        int dest = -1;
        q.blockingRead(dest);
        EXPECT_FALSE(dest == -1);
        localSum += dest;
      }
      sum += localSum;
    });
  }

  for (auto& t : producers) {
    DSched::join(t);
  }
  for (auto& t : consumers) {
    DSched::join(t);
  }
  if (!ignoreContents) {
    EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
  }

  auto endMicro = nowMicro();

  struct rusage endUsage;
  getrusage(RUSAGE_SELF, &endUsage);

  uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
  long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
      (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
  uint64_t failures = failed;

  return folly::sformat(
      "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
      "handoff, {} failures",
      qName,
      writer.methodName(),
      numProducers,
      numConsumers,
      nanosPer,
      csw,
      n,
      failures);
}

TEST(MPMCQueue, mt_prod_cons_deterministic) {
  // we use the Bench method, but perf results are meaningless under DSched
  DSched sched(DSched::uniform(0));

  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic>>>>
      callers;
  callers.emplace_back(
      make_unique<BlockingWriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
  callers.emplace_back(
      make_unique<WriteIfNotFullCaller<MPMCQueue<int, DeterministicAtomic>>>());
  callers.emplace_back(
      make_unique<WriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
  callers.emplace_back(
      make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
          milliseconds(1)));
  callers.emplace_back(
      make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
          seconds(2)));
  int n = 1000;

  for (const auto& caller : callers) {
    LOG(INFO)
        << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
                                 "MPMCQueue<int, DeterministicAtomic>(10)",
                                 1, 1, n, *caller);
    LOG(INFO)
        << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
                                 "MPMCQueue<int, DeterministicAtomic>(100)",
                                 10, 10, n, *caller);
    LOG(INFO)
        << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
                                 "MPMCQueue<int, DeterministicAtomic>(10)",
                                 1, 1, n, *caller, /* ignoreContents = */ true);
    LOG(INFO)
        << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
                                 "MPMCQueue<int, DeterministicAtomic>(100)",
                                 10, 10, n, *caller, true);
    LOG(INFO) << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(1),
                                       "MPMCQueue<int, DeterministicAtomic>(1)",
                                       10, 10, n, *caller, true);
  }
}

#define PC_BENCH(q, np, nc, ...) \
    producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)

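// PC_BENCH stringizes the queue expression (#q) to label the report.  Queue
// types whose template arguments contain commas must be wrapped in an extra
// set of parentheses at the call site, as the EmulatedFutexAtomic calls in
// mt_prod_cons_emulated_futex below demonstrate.
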
TEST(MPMCQueue, mt_prod_cons) {
  int n = 100000;
  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int>>>> callers;
  callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int>>>());
  callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int>>>());
  callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int>>>());
  callers.emplace_back(
      make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(milliseconds(1)));
  callers.emplace_back(
      make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(seconds(2)));
  for (const auto& caller : callers) {
    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n, *caller);
    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n, *caller);
    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n, *caller);
    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n, *caller);
    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n, *caller);
    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n, *caller);
    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n, *caller);
    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n, *caller);
    LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n, *caller);
  }
}

TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
  int n = 100000;
  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic>>>>
      callers;
  callers.emplace_back(
      make_unique<BlockingWriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
  callers.emplace_back(
      make_unique<WriteIfNotFullCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
  callers.emplace_back(
      make_unique<WriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
  callers.emplace_back(
      make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
          milliseconds(1)));
  callers.emplace_back(
      make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
          seconds(2)));
  for (const auto& caller : callers) {
    LOG(INFO) << PC_BENCH(
        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 1, n, *caller);
    LOG(INFO) << PC_BENCH(
        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 1, n, *caller);
    LOG(INFO) << PC_BENCH(
        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 10, n, *caller);
    LOG(INFO) << PC_BENCH(
        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 10, n, *caller);
    LOG(INFO) << PC_BENCH(
        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 1, n, *caller);
    LOG(INFO) << PC_BENCH(
        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 1, n, *caller);
    LOG(INFO) << PC_BENCH(
        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 10, n, *caller);
    LOG(INFO) << PC_BENCH(
        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 10, n, *caller);
    LOG(INFO) << PC_BENCH(
        (MPMCQueue<int, EmulatedFutexAtomic>(100000)), 32, 100, n, *caller);
  }
}

template <template <typename> class Atom>
void runNeverFailThread(int numThreads,
                        int n, // numOps
                        MPMCQueue<int, Atom>& cq,
                        std::atomic<uint64_t>& sum,
                        int t) {
  uint64_t threadSum = 0;
  for (int i = t; i < n; i += numThreads) {
    // enq + deq
    EXPECT_TRUE(cq.writeIfNotFull(i));

    int dest = -1;
    EXPECT_TRUE(cq.readIfNotEmpty(dest));
    EXPECT_TRUE(dest >= 0);
    threadSum += dest;
  }
  sum += threadSum;
}

template <template <typename> class Atom>
uint64_t runNeverFailTest(int numThreads, int numOps) {
  // always #enq >= #deq
  MPMCQueue<int, Atom> cq(numThreads);

  uint64_t n = numOps;
  auto beginMicro = nowMicro();

  vector<std::thread> threads(numThreads);
  std::atomic<uint64_t> sum(0);
  for (int t = 0; t < numThreads; ++t) {
    threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>,
          numThreads, numOps, std::ref(cq), std::ref(sum), t));
  }
  for (auto& t : threads) {
    DSched::join(t);
  }
  EXPECT_TRUE(cq.isEmpty());
  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);

  return nowMicro() - beginMicro;
}

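// Why the operations can never fail here: the queue's capacity equals
// numThreads and each thread always enqueues before it dequeues, so at most
// numThreads elements are in flight at once.  writeIfNotFull always finds a
// free slot, and readIfNotEmpty always finds at least the element written
// by the calling thread's own as-yet-unmatched write.
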
TEST(MPMCQueue, mt_never_fail) {
  int nts[] = {1, 3, 100};

  int n = 100000;
  for (int nt : nts) {
    uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n);
    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
              << " threads";
  }
}

TEST(MPMCQueue, mt_never_fail_emulated_futex) {
  int nts[] = {1, 3, 100};

  int n = 100000;
  for (int nt : nts) {
    uint64_t elapsed = runNeverFailTest<EmulatedFutexAtomic>(nt, n);
    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
              << " threads";
  }
}

TEST(MPMCQueue, mt_never_fail_deterministic) {
  int nts[] = {3, 10};

  long seed = 0; // nowMicro() % 10000;
  LOG(INFO) << "using seed " << seed;

  int n = 1000;
  for (int nt : nts) {
    {
      DSched sched(DSched::uniform(seed));
      runNeverFailTest<DeterministicAtomic>(nt, n);
    }
    {
      DSched sched(DSched::uniformSubset(seed, 2));
      runNeverFailTest<DeterministicAtomic>(nt, n);
    }
  }
}

template <class Clock, template <typename> class Atom>
void runNeverFailUntilThread(int numThreads,
                             int n, // numOps
                             MPMCQueue<int, Atom>& cq,
                             std::atomic<uint64_t>& sum,
                             int t) {
  uint64_t threadSum = 0;
  for (int i = t; i < n; i += numThreads) {
    // enq + deq
    auto soon = Clock::now() + std::chrono::seconds(1);
    EXPECT_TRUE(cq.tryWriteUntil(soon, i));

    int dest = -1;
    EXPECT_TRUE(cq.readIfNotEmpty(dest));
    EXPECT_TRUE(dest >= 0);
    threadSum += dest;
  }
  sum += threadSum;
}

template <class Clock, template <typename> class Atom>
uint64_t runNeverFailTest(int numThreads, int numOps) {
  // always #enq >= #deq
  MPMCQueue<int, Atom> cq(numThreads);

  uint64_t n = numOps;
  auto beginMicro = nowMicro();

  vector<std::thread> threads(numThreads);
  std::atomic<uint64_t> sum(0);
  for (int t = 0; t < numThreads; ++t) {
    threads[t] = DSched::thread(std::bind(runNeverFailUntilThread<Clock, Atom>,
          numThreads, numOps, std::ref(cq), std::ref(sum), t));
  }
  for (auto& t : threads) {
    DSched::join(t);
  }
  EXPECT_TRUE(cq.isEmpty());
  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);

  return nowMicro() - beginMicro;
}

TEST(MPMCQueue, mt_never_fail_until_system) {
  int nts[] = {1, 3, 100};

  int n = 100000;
  for (int nt : nts) {
    uint64_t elapsed =
        runNeverFailTest<std::chrono::system_clock, std::atomic>(nt, n);
    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
              << " threads";
  }
}

TEST(MPMCQueue, mt_never_fail_until_steady) {
  int nts[] = {1, 3, 100};

  int n = 100000;
  for (int nt : nts) {
    uint64_t elapsed =
        runNeverFailTest<std::chrono::steady_clock, std::atomic>(nt, n);
    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
              << " threads";
  }
}

enum LifecycleEvent {
  NOTHING = -1,
  DEFAULT_CONSTRUCTOR,
  COPY_CONSTRUCTOR,
  MOVE_CONSTRUCTOR,
  TWO_ARG_CONSTRUCTOR,
  COPY_OPERATOR,
  MOVE_OPERATOR,
  DESTRUCTOR,
  MAX_LIFECYCLE_EVENT
};

static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];

static int lc_outstanding() {
  return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
      lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
      lc_counts[DESTRUCTOR];
}

static void lc_snap() {
  for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
    lc_prev[i] = lc_counts[i];
  }
}

#define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)

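// Usage pattern: lc_snap() records a baseline of the counters, and each
// LIFECYCLE_STEP(e1, e2) call asserts that exactly the named events (and no
// others) occurred since the previous step, then re-snaps.  Passing NOTHING
// asserts that no lifecycle event happened at all.
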
static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
  for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
    int delta = i == what || i == what2 ? 1 : 0;
    EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
        << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
        << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
        << ", from line " << lineno;
  }
  lc_snap();
}

template <typename R>
struct Lifecycle {
  typedef R IsRelocatable;

  bool constructed;

  Lifecycle() noexcept : constructed(true) {
    ++lc_counts[DEFAULT_CONSTRUCTOR];
  }

  explicit Lifecycle(int /* n */, char const* /* s */) noexcept
      : constructed(true) {
    ++lc_counts[TWO_ARG_CONSTRUCTOR];
  }

  Lifecycle(const Lifecycle& /* rhs */) noexcept : constructed(true) {
    ++lc_counts[COPY_CONSTRUCTOR];
  }

  Lifecycle(Lifecycle&& /* rhs */) noexcept : constructed(true) {
    ++lc_counts[MOVE_CONSTRUCTOR];
  }

  Lifecycle& operator=(const Lifecycle& /* rhs */) noexcept {
    ++lc_counts[COPY_OPERATOR];
    return *this;
  }

  Lifecycle& operator=(Lifecycle&& /* rhs */) noexcept {
    ++lc_counts[MOVE_OPERATOR];
    return *this;
  }

  ~Lifecycle() noexcept {
    ++lc_counts[DESTRUCTOR];
    assert(lc_outstanding() >= 0);
    constructed = false;
  }
};

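// IsRelocatable is the trait folly uses to decide whether elements may be
// moved between slots with memcpy instead of the move operations.  That is
// why the relocatable forwarding test below expects a bare DESTRUCTOR on
// read, while the non-relocatable variant expects DESTRUCTOR plus
// MOVE_OPERATOR.
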
template <typename R>
void runPerfectForwardingTest() {
  lc_snap();
  EXPECT_EQ(lc_outstanding(), 0);

  {
    MPMCQueue<Lifecycle<R>> queue(50);
    LIFECYCLE_STEP(NOTHING);

    for (int pass = 0; pass < 10; ++pass) {
      for (int i = 0; i < 10; ++i) {
        queue.blockingWrite();
        LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);

        queue.blockingWrite(1, "one");
        LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);

        {
          Lifecycle<R> src;
          LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
          queue.blockingWrite(std::move(src));
          LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
        }
        LIFECYCLE_STEP(DESTRUCTOR);

        {
          Lifecycle<R> src;
          LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
          queue.blockingWrite(src);
          LIFECYCLE_STEP(COPY_CONSTRUCTOR);
        }
        LIFECYCLE_STEP(DESTRUCTOR);

        EXPECT_TRUE(queue.write());
        LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
      }

      EXPECT_EQ(queue.size(), 50);
      EXPECT_FALSE(queue.write(2, "two"));
      LIFECYCLE_STEP(NOTHING);

      for (int i = 0; i < 50; ++i) {
        {
          Lifecycle<R> node;
          LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);

          queue.blockingRead(node);
          if (R::value) {
            // relocatable, moved via memcpy
            LIFECYCLE_STEP(DESTRUCTOR);
          } else {
            LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
          }
        }
        LIFECYCLE_STEP(DESTRUCTOR);
      }

      EXPECT_EQ(queue.size(), 0);
    }

    // put one element back before destruction
    {
      Lifecycle<R> src(3, "three");
      LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
      queue.write(std::move(src));
      LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
    }
    LIFECYCLE_STEP(DESTRUCTOR); // destroy src
  }
  LIFECYCLE_STEP(DESTRUCTOR); // destroy queue

  EXPECT_EQ(lc_outstanding(), 0);
}

TEST(MPMCQueue, perfect_forwarding) {
  runPerfectForwardingTest<std::false_type>();
}

TEST(MPMCQueue, perfect_forwarding_relocatable) {
  runPerfectForwardingTest<std::true_type>();
}

TEST(MPMCQueue, queue_moving) {
  lc_snap();
  EXPECT_EQ(lc_outstanding(), 0);

  {
    MPMCQueue<Lifecycle<std::false_type>> a(50);
    LIFECYCLE_STEP(NOTHING);

    a.blockingWrite();
    LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);

    // move constructor
    MPMCQueue<Lifecycle<std::false_type>> b = std::move(a);
    LIFECYCLE_STEP(NOTHING);
    EXPECT_EQ(a.capacity(), 0);
    EXPECT_EQ(a.size(), 0);
    EXPECT_EQ(b.capacity(), 50);
    EXPECT_EQ(b.size(), 1);

    b.blockingWrite();
    LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);

    // move operator
    MPMCQueue<Lifecycle<std::false_type>> c;
    LIFECYCLE_STEP(NOTHING);
    c = std::move(b);
    LIFECYCLE_STEP(NOTHING);
    EXPECT_EQ(c.capacity(), 50);
    EXPECT_EQ(c.size(), 2);

    {
      Lifecycle<std::false_type> dst;
      LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
      c.blockingRead(dst);
      LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);

      {
        // swap contents and sizes
        MPMCQueue<Lifecycle<std::false_type>> d(10);
        LIFECYCLE_STEP(NOTHING);

        std::swap(c, d);
        LIFECYCLE_STEP(NOTHING);
        EXPECT_EQ(c.capacity(), 10);
        EXPECT_TRUE(c.isEmpty());
        EXPECT_EQ(d.capacity(), 50);
        EXPECT_EQ(d.size(), 1);

        d.blockingRead(dst);
        LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);

        c.blockingWrite(dst);
        LIFECYCLE_STEP(COPY_CONSTRUCTOR);

        d.blockingWrite(std::move(dst));
        LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
      } // d goes out of scope
      LIFECYCLE_STEP(DESTRUCTOR);
    } // dst goes out of scope
    LIFECYCLE_STEP(DESTRUCTOR);
  } // c goes out of scope
  LIFECYCLE_STEP(DESTRUCTOR);
}

TEST(MPMCQueue, explicit_zero_capacity_fail) {
  ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
}