2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/MPMCQueue.h>
18 #include <folly/Format.h>
19 #include <folly/Memory.h>
20 #include <folly/portability/SysResource.h>
21 #include <folly/portability/SysTime.h>
22 #include <folly/portability/Unistd.h>
23 #include <folly/test/DeterministicSchedule.h>
25 #include <boost/intrusive_ptr.hpp>
31 #include <gtest/gtest.h>
33 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
35 using namespace folly;
36 using namespace detail;
38 using std::chrono::time_point;
39 using std::chrono::steady_clock;
40 using std::chrono::seconds;
41 using std::chrono::milliseconds;
43 using std::unique_ptr;
46 typedef DeterministicSchedule DSched;
// Worker for the multithreaded TurnSequencer test: each thread claims the
// turns congruent to its index (mod numThreads) and checks strict ordering.
// NOTE(review): listing has gaps — the leading value parameters (numThreads,
// numOps, init, thread index), the update of `prev`, and the closing braces
// are not visible here; confirm against the original source.
48 template <template<typename> class Atom>
49 void run_mt_sequencer_thread(
53 TurnSequencer<Atom>& seq,
54 Atom<uint32_t>& spinThreshold,
// Every 32nd op passes true as the third argument — presumably letting
// waitForTurn update spinThreshold adaptively; TODO confirm vs TurnSequencer.
57 for (int op = i; op < numOps; op += numThreads) {
58 seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
59 EXPECT_EQ(prev, op - 1);
61 seq.completeTurn(init + op);
// Spawns numThreads workers that cooperatively take numOps turns on a shared
// TurnSequencer starting at `init`, then verifies all turns were consumed.
// NOTE(review): gaps in the listing — the trailing std::bind arguments, the
// join call inside the loop, and the declaration of `prev` are not visible.
65 template <template<typename> class Atom>
66 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
67 TurnSequencer<Atom> seq(init);
68 Atom<uint32_t> spinThreshold(0);
71 vector<std::thread> threads(numThreads);
72 for (int i = 0; i < numThreads; ++i) {
// DSched::thread so the same code runs under DeterministicSchedule.
73 threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
74 numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
78 for (auto& thr : threads) {
// All numOps turns must have completed in order.
82 EXPECT_EQ(prev, numOps - 1);
// TurnSequencer smoke tests with three atomic flavors: plain std::atomic,
// EmulatedFutexAtomic, and DeterministicAtomic under DSched.
// Negative `init` values exercise wrap-around of the uint32_t turn counter.
// NOTE(review): closing braces between TESTs are missing from this listing.
85 TEST(MPMCQueue, sequencer) {
86 run_mt_sequencer_test<std::atomic>(1, 100, 0);
87 run_mt_sequencer_test<std::atomic>(2, 100000, -100);
88 run_mt_sequencer_test<std::atomic>(100, 10000, -100);
91 TEST(MPMCQueue, sequencer_emulated_futex) {
92 run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
93 run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
94 run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
97 TEST(MPMCQueue, sequencer_deterministic) {
// Fixed seed 0 keeps the deterministic schedule reproducible.
98 DSched sched(DSched::uniform(0));
99 run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
100 run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
101 run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
// Round-trips a single element of type T through every write API
// (blockingWrite, write, tryWriteUntil with both system and steady clock
// deadlines) to check the queue compiles and works for that element type.
// NOTE(review): the declarations of `cq` and `dest` (original lines 106/108)
// are missing from this listing.
104 template <typename T>
105 void runElementTypeTest(T&& src) {
107 cq.blockingWrite(std::forward<T>(src));
109 cq.blockingRead(dest);
110 EXPECT_TRUE(cq.write(std::move(dest)));
111 EXPECT_TRUE(cq.read(dest));
// tryWriteUntil with a system_clock deadline one second out.
112 auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
113 EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
114 EXPECT_TRUE(cq.read(dest));
// Same again with a steady_clock deadline.
115 auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
116 EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
117 EXPECT_TRUE(cq.read(dest));
// Intrusive-refcounted test type plus the boost::intrusive_ptr hook
// functions. active_instances is thread-local so concurrent tests don't
// interfere when counting live objects.
// NOTE(review): the struct declaration line, destructor, the add_ref body,
// and the delete in intrusive_ptr_release are missing from this listing.
121 static FOLLY_TLS int active_instances;
123 mutable std::atomic<int> rc;
125 RefCounted() : rc(0) {
133 FOLLY_TLS int RefCounted::active_instances;
135 void intrusive_ptr_add_ref(RefCounted const* p) {
139 void intrusive_ptr_release(RefCounted const* p) {
// Last reference dropped — presumably deletes p (line not visible).
140 if (--(p->rc) == 0) {
// Instantiates the element-type round-trip test for a spread of value
// categories: trivially copyable, movable-only, and intrusive refcounted.
// Final check: no RefCounted instances leaked by the queue machinery.
145 TEST(MPMCQueue, lots_of_element_types) {
146 runElementTypeTest(10);
147 runElementTypeTest(string("abc"));
148 runElementTypeTest(std::make_pair(10, string("def")));
149 runElementTypeTest(vector<string>{{"abc"}});
150 runElementTypeTest(std::make_shared<char>('a'));
// unique_ptr: move-only type, verifies no copies are required.
151 runElementTypeTest(folly::make_unique<char>('a'));
152 runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
153 EXPECT_EQ(RefCounted::active_instances, 0);
// Single-threaded fill/drain cycles on a capacity-10 queue: fills to
// capacity, confirms further writes fail, drains half with read() and half
// with blockingRead(), and confirms the queue reports empty between passes.
// NOTE(review): `dest` declarations and value checks inside the read loops
// are missing from this listing.
156 TEST(MPMCQueue, single_thread_enqdeq) {
157 MPMCQueue<int> cq(10);
159 for (int pass = 0; pass < 10; ++pass) {
160 for (int i = 0; i < 10; ++i) {
161 EXPECT_TRUE(cq.write(i));
// Queue is full: non-blocking write must fail without blocking.
163 EXPECT_FALSE(cq.write(-1));
164 EXPECT_FALSE(cq.isEmpty());
165 EXPECT_EQ(cq.size(), 10);
167 for (int i = 0; i < 5; ++i) {
169 EXPECT_TRUE(cq.read(dest));
172 for (int i = 5; i < 10; ++i) {
174 cq.blockingRead(dest);
// Empty again: non-blocking read must fail.
178 EXPECT_FALSE(cq.read(dest));
181 EXPECT_TRUE(cq.isEmpty());
182 EXPECT_EQ(cq.size(), 0);
// For every capacity 1..99: exactly `cap` non-blocking writes succeed and
// the next one fails — the queue honors its declared capacity precisely.
186 TEST(MPMCQueue, tryenq_capacity_test) {
187 for (size_t cap = 1; cap < 100; ++cap) {
188 MPMCQueue<int> cq(cap);
189 for (size_t i = 0; i < cap; ++i) {
190 EXPECT_TRUE(cq.write(i));
// One past capacity must be rejected.
192 EXPECT_FALSE(cq.write(100));
// Fills a queue to capacity, then checks that a blockingWrite on the full
// queue blocks until a reader frees a slot from another thread.
// NOTE(review): several lines are missing here (the fill body, the blocked
// flag bookkeeping around the spawned thread, and the join); the visible
// fragment only shows the blocking writer and the unblocking read.
196 TEST(MPMCQueue, enq_capacity_test) {
197 for (auto cap : { 1, 100, 10000 }) {
198 MPMCQueue<int> cq(cap);
199 for (int i = 0; i < cap; ++i) {
// Writer thread will block here: queue is at capacity.
204 auto thr = std::thread([&]{
205 cq.blockingWrite(100);
// Reading one element releases the blocked writer.
211 cq.blockingRead(dummy);
// Worker for the aggregate try-enq/deq test: thread t writes the values
// t, t+numThreads, ... up to n using non-blocking write(), and concurrently
// drains with non-blocking read(), accumulating read values into `sum`.
// NOTE(review): gaps — leading parameters (numThreads, n, thread id t), the
// `src`/`received`/`dst` declarations, and the final sum += are not visible.
217 template <template<typename> class Atom>
218 void runTryEnqDeqThread(
221 MPMCQueue<int, Atom>& cq,
222 std::atomic<uint64_t>& sum,
224 uint64_t threadSum = 0;
226 // received doesn't reflect any actual values, we just start with
227 // t and increment by numThreads to get the rounding of termination
228 // correct if numThreads doesn't evenly divide numOps
// Loop until this thread has both produced and consumed its share.
230 while (src < n || received < n) {
231 if (src < n && cq.write(src)) {
236 if (received < n && cq.read(dst)) {
237 received += numThreads;
// Aggregate correctness test for non-linearizable write()/read(): each of
// numThreads workers both produces and consumes; afterwards the queue must
// be empty and the summed values must equal 0+1+...+(n-1).
// NOTE(review): the derivation of `n` from numOps and the join body are
// missing from this listing.
244 template <template<typename> class Atom>
245 void runTryEnqDeqTest(int numThreads, int numOps) {
246 // write and read aren't linearizable, so we don't have
247 // hard guarantees on their individual behavior. We can still test
248 // correctness in aggregate
249 MPMCQueue<int,Atom> cq(numThreads);
252 vector<std::thread> threads(numThreads);
253 std::atomic<uint64_t> sum(0);
254 for (int t = 0; t < numThreads; ++t) {
255 threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
256 numThreads, n, std::ref(cq), std::ref(sum), t));
258 for (auto& t : threads) {
261 EXPECT_TRUE(cq.isEmpty());
// Gauss sum check: every value 0..n-1 was handed off exactly once.
262 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
// mt_try_enq_deq variants: std::atomic, EmulatedFutexAtomic, and a
// deterministic run under two DSched policies (uniform and uniformSubset).
// NOTE(review): the inner loops choosing `n` per thread count, the seed
// setup, and closing braces are missing from this listing.
265 TEST(MPMCQueue, mt_try_enq_deq) {
266 int nts[] = { 1, 3, 100 };
270 runTryEnqDeqTest<std::atomic>(nt, n);
274 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
275 int nts[] = { 1, 3, 100 };
279 runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
283 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
284 int nts[] = { 3, 10 };
287 LOG(INFO) << "using seed " << seed;
// Scoped DSched instances: each schedule policy gets its own run.
292 DSched sched(DSched::uniform(seed));
293 runTryEnqDeqTest<DeterministicAtomic>(nt, n);
296 DSched sched(DSched::uniformSubset(seed, 2));
297 runTryEnqDeqTest<DeterministicAtomic>(nt, n);
// Wall-clock time in microseconds since the epoch, via gettimeofday.
// Used only for coarse benchmark timing, not correctness.
302 uint64_t nowMicro() {
304 gettimeofday(&tv, 0);
305 return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
// Polymorphic adapters so the producer/consumer benchmark can be run once
// per write API: blockingWrite, writeIfNotFull, write, and tryWriteUntil
// (parameterized by clock and timeout). methodName() labels benchmark output.
// NOTE(review): closing braces and the blockingWrite body line are missing
// from this listing.
308 template <typename Q>
309 struct WriteMethodCaller {
310 WriteMethodCaller() {}
311 virtual ~WriteMethodCaller() = default;
312 virtual bool callWrite(Q& q, int i) = 0;
313 virtual string methodName() = 0;
316 template <typename Q>
317 struct BlockingWriteCaller : public WriteMethodCaller<Q> {
// blockingWrite never fails; body (and its `return true`) not visible here.
318 bool callWrite(Q& q, int i) override {
322 string methodName() override { return "blockingWrite"; }
325 template <typename Q>
326 struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
327 bool callWrite(Q& q, int i) override { return q.writeIfNotFull(i); }
328 string methodName() override { return "writeIfNotFull"; }
331 template <typename Q>
332 struct WriteCaller : public WriteMethodCaller<Q> {
333 bool callWrite(Q& q, int i) override { return q.write(i); }
334 string methodName() override { return "write"; }
337 template <typename Q,
338 class Clock = steady_clock,
339 class Duration = typename Clock::duration>
340 struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
341 const Duration duration_;
342 explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {}
// Deadline is recomputed per call so each attempt gets the full timeout.
343 bool callWrite(Q& q, int i) override {
344 auto then = Clock::now() + duration_;
345 return q.tryWriteUntil(then, i);
347 string methodName() override {
348 return folly::sformat(
349 "tryWriteUntil({}ms)",
350 std::chrono::duration_cast<milliseconds>(duration_).count());
// Core producer/consumer benchmark: numProducers threads push 0..numOps-1
// through `writer`, numConsumers threads drain and sum; verifies the Gauss
// sum (unless ignoreContents) and returns a formatted stats string with
// nanos/handoff, context switches (from getrusage), and failed write
// attempts.
// NOTE(review): gaps — the qName/numProducers/numConsumers/n parameters,
// the retry bookkeeping in the producer loop, the localSum accumulation,
// the join bodies, and the sformat argument list are not visible here.
354 template <typename Q>
355 string producerConsumerBench(Q&& queue,
360 WriteMethodCaller<Q>& writer,
361 bool ignoreContents = false) {
// Snapshot rusage before the run to measure context switches attributable
// to the benchmark itself.
364 struct rusage beginUsage;
365 getrusage(RUSAGE_SELF, &beginUsage);
367 auto beginMicro = nowMicro();
370 std::atomic<uint64_t> sum(0);
371 std::atomic<uint64_t> failed(0);
373 vector<std::thread> producers(numProducers);
374 for (int t = 0; t < numProducers; ++t) {
375 producers[t] = DSched::thread([&,t]{
376 for (int i = t; i < numOps; i += numProducers) {
// Spin until the chosen write method succeeds; failures are tallied
// (bookkeeping lines not visible in this listing).
377 while (!writer.callWrite(q, i)) {
384 vector<std::thread> consumers(numConsumers);
385 for (int t = 0; t < numConsumers; ++t) {
386 consumers[t] = DSched::thread([&,t]{
387 uint64_t localSum = 0;
388 for (int i = t; i < numOps; i += numConsumers) {
390 q.blockingRead(dest);
391 EXPECT_FALSE(dest == -1);
398 for (auto& t : producers) {
401 for (auto& t : consumers) {
404 if (!ignoreContents) {
// Every produced value consumed exactly once.
405 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
408 auto endMicro = nowMicro();
410 struct rusage endUsage;
411 getrusage(RUSAGE_SELF, &endUsage);
413 uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
// Voluntary + involuntary context switches during the run.
414 long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
415 (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
416 uint64_t failures = failed;
418 return folly::sformat(
419 "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
420 "handoff, {} failures",
// Runs the producer/consumer bench under DeterministicSchedule for each
// write-method caller and several capacity / thread-count combinations;
// serves as a correctness stress test (the timings are meaningless here).
// NOTE(review): gaps — the `n` constant, `callers` declaration, the
// TryWriteUntilCaller timeout arguments, and the trailing bench arguments
// (producer/consumer counts, n, *caller) are not visible in this listing.
431 TEST(MPMCQueue, mt_prod_cons_deterministic) {
432 // we use the Bench method, but perf results are meaningless under DSched
433 DSched sched(DSched::uniform(0));
435 vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic>>>>
437 callers.emplace_back(
438 make_unique<BlockingWriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
439 callers.emplace_back(
440 make_unique<WriteIfNotFullCaller<MPMCQueue<int, DeterministicAtomic>>>());
441 callers.emplace_back(
442 make_unique<WriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
443 callers.emplace_back(
444 make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
446 callers.emplace_back(
447 make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
// Each caller is benched against small (10/100) and size-1 queues.
450 for (const auto& caller : callers) {
452 << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
453 "MPMCQueue<int, DeterministicAtomic>(10)",
459 << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
460 "MPMCQueue<int, DeterministicAtomic>(100)",
466 << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
467 "MPMCQueue<int, DeterministicAtomic>(10)",
473 << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
474 "MPMCQueue<int, DeterministicAtomic>(100)",
479 LOG(INFO) << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(1),
480 "MPMCQueue<int, DeterministicAtomic>(1)",
// Convenience wrapper: stringizes the queue-constructor expression so the
// benchmark output is labeled with the exact queue configuration.
488 #define PC_BENCH(q, np, nc, ...) \
489 producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
// Real (non-deterministic) producer/consumer benchmark over std::atomic
// queues: every write method x capacities {10, 10000, 100000} x several
// producer/consumer fan-in/fan-out shapes.
// NOTE(review): the definition of `n` (op count) is missing from this
// listing.
491 TEST(MPMCQueue, mt_prod_cons) {
493 vector<unique_ptr<WriteMethodCaller<MPMCQueue<int>>>> callers;
494 callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int>>>());
495 callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int>>>());
496 callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int>>>());
497 callers.emplace_back(
498 make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(milliseconds(1)));
499 callers.emplace_back(
500 make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(seconds(2)));
501 for (const auto& caller : callers) {
502 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n, *caller);
503 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n, *caller);
504 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n, *caller);
505 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n, *caller);
506 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n, *caller);
507 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n, *caller);
508 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n, *caller);
509 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n, *caller);
510 LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n, *caller);
// Same benchmark matrix as mt_prod_cons but with EmulatedFutexAtomic
// (exercises the futex-emulation waiting path). The queue constructor is
// parenthesized inside PC_BENCH because its template arguments contain a
// comma that would otherwise split the macro arguments.
// NOTE(review): `n` and the TryWriteUntilCaller timeout arguments are
// missing from this listing.
514 TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
516 vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic>>>>
518 callers.emplace_back(
519 make_unique<BlockingWriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
520 callers.emplace_back(
521 make_unique<WriteIfNotFullCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
522 callers.emplace_back(
523 make_unique<WriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
524 callers.emplace_back(
525 make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
527 callers.emplace_back(
528 make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
530 for (const auto& caller : callers) {
531 LOG(INFO) << PC_BENCH(
532 (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 1, n, *caller);
533 LOG(INFO) << PC_BENCH(
534 (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 1, n, *caller);
535 LOG(INFO) << PC_BENCH(
536 (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 10, n, *caller);
537 LOG(INFO) << PC_BENCH(
538 (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 10, n, *caller);
539 LOG(INFO) << PC_BENCH(
540 (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 1, n, *caller);
541 LOG(INFO) << PC_BENCH(
542 (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 1, n, *caller);
543 LOG(INFO) << PC_BENCH(
544 (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 10, n, *caller);
545 LOG(INFO) << PC_BENCH(
546 (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 10, n, *caller);
547 LOG(INFO) << PC_BENCH(
548 (MPMCQueue<int, EmulatedFutexAtomic>(100000)), 32, 100, n, *caller);
// Worker for the never-fail test: with capacity == numThreads, each thread
// writes then reads, so writeIfNotFull/readIfNotEmpty must always succeed.
// NOTE(review): leading parameters (n, thread id t), the `dest` declaration,
// and the threadSum accumulation into `sum` are missing from this listing.
552 template <template <typename> class Atom>
553 void runNeverFailThread(int numThreads,
555 MPMCQueue<int, Atom>& cq,
556 std::atomic<uint64_t>& sum,
558 uint64_t threadSum = 0;
559 for (int i = t; i < n; i += numThreads) {
// Invariant: #enq >= #deq and capacity == numThreads, so neither call
// should ever fail.
561 EXPECT_TRUE(cq.writeIfNotFull(i));
564 EXPECT_TRUE(cq.readIfNotEmpty(dest));
565 EXPECT_TRUE(dest >= 0);
// Drives runNeverFailThread across numThreads workers, verifies the queue
// drains completely and the Gauss sum matches, and returns elapsed micros
// so callers can log nanos/op.
// NOTE(review): `n`'s derivation, the remaining std::bind arguments, and
// the join body are missing from this listing.
572 template <template <typename> class Atom>
573 uint64_t runNeverFailTest(int numThreads, int numOps) {
574 // always #enq >= #deq
575 MPMCQueue<int, Atom> cq(numThreads);
577 auto beginMicro = nowMicro();
579 vector<std::thread> threads(numThreads);
580 std::atomic<uint64_t> sum(0);
581 for (int t = 0; t < numThreads; ++t) {
582 threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>,
589 for (auto& t : threads) {
592 EXPECT_TRUE(cq.isEmpty());
593 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
595 return nowMicro() - beginMicro;
// never-fail test variants: std::atomic, EmulatedFutexAtomic, and a
// deterministic run under two DSched policies. The n*2 divisor in the
// nanos/op log accounts for one write plus one read per value.
// NOTE(review): per-thread-count loops choosing `n`, closing braces, and
// (for the deterministic case) `nt`/`n` setup are missing from this listing.
598 TEST(MPMCQueue, mt_never_fail) {
599 int nts[] = {1, 3, 100};
603 uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n);
604 LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
609 TEST(MPMCQueue, mt_never_fail_emulated_futex) {
610 int nts[] = {1, 3, 100};
614 uint64_t elapsed = runNeverFailTest<EmulatedFutexAtomic>(nt, n);
615 LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
620 TEST(MPMCQueue, mt_never_fail_deterministic) {
// Seed fixed at 0 for reproducibility; the commented expression shows how
// to randomize it when hunting for interleaving bugs.
623 long seed = 0; // nowMicro() % 10000;
624 LOG(INFO) << "using seed " << seed;
629 DSched sched(DSched::uniform(seed));
630 runNeverFailTest<DeterministicAtomic>(nt, n);
633 DSched sched(DSched::uniformSubset(seed, 2));
634 runNeverFailTest<DeterministicAtomic>(nt, n);
// Like runNeverFailThread, but writes via tryWriteUntil with a one-second
// deadline on the given Clock; with capacity == numThreads the deadline
// should never be reached.
// NOTE(review): remaining parameters, `dest` declaration, and the threadSum
// accumulation are missing from this listing.
639 template <class Clock, template <typename> class Atom>
640 void runNeverFailUntilThread(int numThreads,
642 MPMCQueue<int, Atom>& cq,
643 std::atomic<uint64_t>& sum,
645 uint64_t threadSum = 0;
646 for (int i = t; i < n; i += numThreads) {
// Fresh deadline per element: each write gets the full one-second budget.
648 auto soon = Clock::now() + std::chrono::seconds(1);
649 EXPECT_TRUE(cq.tryWriteUntil(soon, i));
652 EXPECT_TRUE(cq.readIfNotEmpty(dest));
653 EXPECT_TRUE(dest >= 0);
// Clock-parameterized overload of runNeverFailTest: same drain/Gauss-sum
// verification, but the workers use tryWriteUntil on the supplied Clock.
// NOTE(review): `n`'s derivation, the remaining std::bind arguments, and
// the join body are missing from this listing.
659 template <class Clock, template <typename> class Atom>
660 uint64_t runNeverFailTest(int numThreads, int numOps) {
661 // always #enq >= #deq
662 MPMCQueue<int, Atom> cq(numThreads);
665 auto beginMicro = nowMicro();
667 vector<std::thread> threads(numThreads);
668 std::atomic<uint64_t> sum(0);
669 for (int t = 0; t < numThreads; ++t) {
670 threads[t] = DSched::thread(std::bind(runNeverFailUntilThread<Clock, Atom>,
677 for (auto& t : threads) {
680 EXPECT_TRUE(cq.isEmpty());
681 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
683 return nowMicro() - beginMicro;
// tryWriteUntil never-fail tests with both standard clocks; verifies the
// deadline-based write path works for system_clock and steady_clock alike.
// NOTE(review): the per-thread-count loops, `elapsed` assignment, and
// closing braces are missing from this listing.
686 TEST(MPMCQueue, mt_never_fail_until_system) {
687 int nts[] = {1, 3, 100};
692 runNeverFailTest<std::chrono::system_clock, std::atomic>(nt, n);
693 LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
698 TEST(MPMCQueue, mt_never_fail_until_steady) {
699 int nts[] = {1, 3, 100};
704 runNeverFailTest<std::chrono::steady_clock, std::atomic>(nt, n);
705 LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
// Lifecycle-event bookkeeping for the perfect-forwarding tests. Per-thread
// (FOLLY_TLS) counters record how many of each constructor/assignment/
// destructor ran; lc_snap() snapshots them and LIFECYCLE_STEP asserts that
// exactly the expected events (and no others) occurred since the snapshot.
// NOTE(review): the enum's member list (NOTHING, DEFAULT_CONSTRUCTOR, ...)
// and the trailing lc_snap() call inside lc_step are missing from this
// listing.
710 enum LifecycleEvent {
722 static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
723 static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
// Live object count: all constructions minus destructions.
725 static int lc_outstanding() {
726 return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
727 lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
728 lc_counts[DESTRUCTOR];
731 static void lc_snap() {
732 for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
733 lc_prev[i] = lc_counts[i];
// __LINE__ is forwarded so assertion failures point at the call site.
737 #define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)
739 static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
740 for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
// Each expected event must have fired exactly once; all others zero times.
741 int delta = i == what || i == what2 ? 1 : 0;
742 EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
743 << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
744 << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
745 << ", from line " << lineno;
// Instrumented element type: every special member bumps the matching
// lc_counts slot. The R parameter (std::true_type/false_type) is exported
// as IsRelocatable so folly can pick the memcpy-relocation path in one
// test variant and the move-based path in the other.
// NOTE(review): the struct declaration line, the `constructed` member
// declaration, the assignment-operator return statements, and closing
// braces are missing from this listing.
750 template <typename R>
752 typedef R IsRelocatable;
756 Lifecycle() noexcept : constructed(true) {
757 ++lc_counts[DEFAULT_CONSTRUCTOR];
760 explicit Lifecycle(int /* n */, char const* /* s */) noexcept
761 : constructed(true) {
762 ++lc_counts[TWO_ARG_CONSTRUCTOR];
765 Lifecycle(const Lifecycle& /* rhs */) noexcept : constructed(true) {
766 ++lc_counts[COPY_CONSTRUCTOR];
769 Lifecycle(Lifecycle&& /* rhs */) noexcept : constructed(true) {
770 ++lc_counts[MOVE_CONSTRUCTOR];
773 Lifecycle& operator=(const Lifecycle& /* rhs */) noexcept {
774 ++lc_counts[COPY_OPERATOR];
778 Lifecycle& operator=(Lifecycle&& /* rhs */) noexcept {
779 ++lc_counts[MOVE_OPERATOR];
783 ~Lifecycle() noexcept {
784 ++lc_counts[DESTRUCTOR];
// Sanity: never destroy more objects than were constructed.
785 assert(lc_outstanding() >= 0);
// Verifies MPMCQueue's emplace-style perfect forwarding: each write API
// must trigger exactly one construction event of the expected kind (no
// stray copies/moves), checked step-by-step with LIFECYCLE_STEP. Template
// parameter R toggles Lifecycle's IsRelocatable trait so both the memcpy
// and move-assign dequeue paths are exercised.
// NOTE(review): lc_snap()/reset calls, some `src`/`node` declarations, the
// relocatable-vs-move branch condition around lines 838-845, and several
// closing braces are missing from this listing.
791 template <typename R>
792 void runPerfectForwardingTest() {
794 EXPECT_EQ(lc_outstanding(), 0);
797 MPMCQueue<Lifecycle<R>> queue(50);
// Constructing the queue itself must construct no elements.
798 LIFECYCLE_STEP(NOTHING);
800 for (int pass = 0; pass < 10; ++pass) {
801 for (int i = 0; i < 10; ++i) {
// No-arg blockingWrite: in-place default construction, nothing else.
802 queue.blockingWrite();
803 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// Forwarded (int, char const*) args: in-place two-arg construction.
805 queue.blockingWrite(1, "one");
806 LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
810 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// rvalue write: exactly one move construction into the queue slot.
811 queue.blockingWrite(std::move(src));
812 LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
814 LIFECYCLE_STEP(DESTRUCTOR);
818 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// lvalue write: exactly one copy construction.
819 queue.blockingWrite(src);
820 LIFECYCLE_STEP(COPY_CONSTRUCTOR);
822 LIFECYCLE_STEP(DESTRUCTOR);
824 EXPECT_TRUE(queue.write());
825 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
828 EXPECT_EQ(queue.size(), 50);
// Full queue: failed write must construct nothing at all.
829 EXPECT_FALSE(queue.write(2, "two"));
830 LIFECYCLE_STEP(NOTHING);
832 for (int i = 0; i < 50; ++i) {
835 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
837 queue.blockingRead(node);
839 // relocatable, moved via memcpy
840 LIFECYCLE_STEP(DESTRUCTOR);
// Non-relocatable path: move-assign into node, then destroy the slot.
842 LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
845 LIFECYCLE_STEP(DESTRUCTOR);
848 EXPECT_EQ(queue.size(), 0);
851 // put one element back before destruction
853 Lifecycle<R> src(3, "three");
854 LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
855 queue.write(std::move(src));
856 LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
858 LIFECYCLE_STEP(DESTRUCTOR); // destroy src
// The queue's destructor must destroy the one remaining element.
860 LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
862 EXPECT_EQ(lc_outstanding(), 0);
// Runs the forwarding test with IsRelocatable = false_type (move path)
// and true_type (memcpy relocation path).
// NOTE(review): closing braces are missing from this listing.
865 TEST(MPMCQueue, perfect_forwarding) {
866 runPerfectForwardingTest<std::false_type>();
869 TEST(MPMCQueue, perfect_forwarding_relocatable) {
870 runPerfectForwardingTest<std::true_type>();
// Verifies MPMCQueue's move constructor and move assignment: moving the
// queue must transfer capacity and contents without touching any element
// (LIFECYCLE_STEP(NOTHING)), leaving the source with capacity/size 0, and
// destruction must destroy exactly the elements still enqueued.
// NOTE(review): lc_snap() calls, the writes that populate a/b/c, the move
// assignments into c/d (around lines 898-899 and 913-914), the reads into
// `dst`, and some braces are missing from this listing.
873 TEST(MPMCQueue, queue_moving) {
875 EXPECT_EQ(lc_outstanding(), 0);
878 MPMCQueue<Lifecycle<std::false_type>> a(50);
879 LIFECYCLE_STEP(NOTHING);
882 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// Move construction: no element events; a is left empty with capacity 0.
885 MPMCQueue<Lifecycle<std::false_type>> b = std::move(a);
886 LIFECYCLE_STEP(NOTHING);
887 EXPECT_EQ(a.capacity(), 0);
888 EXPECT_EQ(a.size(), 0);
889 EXPECT_EQ(b.capacity(), 50);
890 EXPECT_EQ(b.size(), 1);
893 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// Move assignment into a default-constructed queue: also element-free.
896 MPMCQueue<Lifecycle<std::false_type>> c;
897 LIFECYCLE_STEP(NOTHING);
899 LIFECYCLE_STEP(NOTHING);
900 EXPECT_EQ(c.capacity(), 50);
901 EXPECT_EQ(c.size(), 2);
904 Lifecycle<std::false_type> dst;
905 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
907 LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
// Swap-like move assignment between two live queues (c <-> d).
911 MPMCQueue<Lifecycle<std::false_type>> d(10);
912 LIFECYCLE_STEP(NOTHING);
914 LIFECYCLE_STEP(NOTHING);
915 EXPECT_EQ(c.capacity(), 10);
916 EXPECT_TRUE(c.isEmpty());
917 EXPECT_EQ(d.capacity(), 50);
918 EXPECT_EQ(d.size(), 1);
921 LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
923 c.blockingWrite(dst);
924 LIFECYCLE_STEP(COPY_CONSTRUCTOR);
926 d.blockingWrite(std::move(dst));
927 LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
928 } // d goes out of scope
929 LIFECYCLE_STEP(DESTRUCTOR);
930 } // dst goes out of scope
931 LIFECYCLE_STEP(DESTRUCTOR);
932 } // c goes out of scope
933 LIFECYCLE_STEP(DESTRUCTOR);
// A zero-capacity queue is invalid: the constructor must throw
// std::invalid_argument rather than silently producing an unusable queue.
936 TEST(MPMCQueue, explicit_zero_capacity_fail) {
937 ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);