2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/MPMCQueue.h>
18 #include <folly/Format.h>
19 #include <folly/Memory.h>
20 #include <folly/test/DeterministicSchedule.h>
22 #include <boost/intrusive_ptr.hpp>
29 #include <sys/resource.h>
31 #include <gtest/gtest.h>
33 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
35 using namespace folly;
36 using namespace detail;
38 using std::chrono::time_point;
39 using std::chrono::steady_clock;
40 using std::chrono::seconds;
41 using std::chrono::milliseconds;
43 using std::unique_ptr;
46 typedef DeterministicSchedule DSched;
// Worker body for the multi-threaded TurnSequencer test: thread i claims
// turns i, i+numThreads, i+2*numThreads, ... and checks that turns are
// granted in strictly increasing order (prev must equal op - 1).
// NOTE(review): this chunk is missing several source lines here (the tail of
// the parameter list, the update of `prev`, and closing braces).
48 template <template<typename> class Atom>
49 void run_mt_sequencer_thread(
53 TurnSequencer<Atom>& seq,
54 Atom<uint32_t>& spinThreshold,
57 for (int op = i; op < numOps; op += numThreads) {
// third arg presumably lets the sequencer update its adaptive spin cutoff
// every 32nd turn — TODO confirm against TurnSequencer::waitForTurn
58 seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
59 EXPECT_EQ(prev, op - 1);
61 seq.completeTurn(init + op);
// Spawns numThreads workers that cooperatively execute numOps turns on one
// TurnSequencer starting at `init`, then verifies all turns completed
// (prev == numOps - 1).  NOTE(review): the declaration of `prev` and the
// thread-join body are on lines not visible in this chunk.
65 template <template<typename> class Atom>
66 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
67 TurnSequencer<Atom> seq(init);
// spin threshold starts at 0 so the sequencer tunes it adaptively
68 Atom<uint32_t> spinThreshold(0);
71 vector<std::thread> threads(numThreads);
72 for (int i = 0; i < numThreads; ++i) {
// DSched::thread so the same code runs under DeterministicSchedule
73 threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
74 numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
78 for (auto& thr : threads) {
82 EXPECT_EQ(prev, numOps - 1);
// TurnSequencer sanity checks with plain std::atomic, including a negative
// (wrapping) initial turn value.
85 TEST(MPMCQueue, sequencer) {
86 run_mt_sequencer_test<std::atomic>(1, 100, 0);
87 run_mt_sequencer_test<std::atomic>(2, 100000, -100);
88 run_mt_sequencer_test<std::atomic>(100, 10000, -100);
// Same sequencer scenarios, but through the emulated-futex atomic so the
// non-native futex code path gets exercised.
91 TEST(MPMCQueue, sequencer_emulated_futex) {
92 run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
93 run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
94 run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
// Sequencer scenarios under DeterministicSchedule with a fixed seed so
// interleavings are reproducible; note the near-wraparound init (1<<29)-100.
97 TEST(MPMCQueue, sequencer_deterministic) {
98 DSched sched(DSched::uniform(0));
99 run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
100 run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
101 run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
// Round-trips a single value of type T through an MPMCQueue using each of
// the write entry points (blockingWrite, write, tryWriteUntil with both
// system and steady clock deadlines), reading it back after each write.
// NOTE(review): the declaration of `cq` and of `dest` are on lines not
// visible in this chunk.
104 template <typename T>
105 void runElementTypeTest(T&& src) {
107 cq.blockingWrite(std::forward<T>(src));
109 cq.blockingRead(dest);
110 EXPECT_TRUE(cq.write(std::move(dest)));
111 EXPECT_TRUE(cq.read(dest));
// deadlines 1s in the future: the queue has room, so these should succeed
// immediately rather than actually waiting
112 auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
113 EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
114 EXPECT_TRUE(cq.read(dest));
115 auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
116 EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
117 EXPECT_TRUE(cq.read(dest));
// Intrusively ref-counted test type plus the boost::intrusive_ptr hook
// functions it requires.  active_instances is thread-local so each test
// thread tracks its own live-object count.
// NOTE(review): the struct header, destructor, and parts of the hook bodies
// are on lines not visible in this chunk.
121 static __thread int active_instances;
123 mutable std::atomic<int> rc;
125 RefCounted() : rc(0) {
133 __thread int RefCounted::active_instances;
// called by boost::intrusive_ptr on copy/construction
136 void intrusive_ptr_add_ref(RefCounted const* p) {
// called by boost::intrusive_ptr on release; deletes at refcount zero
140 void intrusive_ptr_release(RefCounted const* p) {
141 if (--(p->rc) == 0) {
// Exercises runElementTypeTest across a spread of element types: trivially
// copyable, heap-owning, composite, move-only, and intrusively ref-counted.
// Finally verifies no RefCounted instances leaked.
146 TEST(MPMCQueue, lots_of_element_types) {
147 runElementTypeTest(10);
148 runElementTypeTest(string("abc"));
149 runElementTypeTest(std::make_pair(10, string("def")));
150 runElementTypeTest(vector<string>{{"abc"}});
151 runElementTypeTest(std::make_shared<char>('a'));
// move-only element type: verifies the queue never copies
152 runElementTypeTest(folly::make_unique<char>('a'));
153 runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
154 EXPECT_EQ(RefCounted::active_instances, 0);
// Single-threaded fill/drain: fills the queue to capacity, confirms writes
// fail when full and reads fail when empty, and checks size()/isEmpty()
// at the boundaries.  Repeats for 10 passes to cover ticket wraparound.
157 TEST(MPMCQueue, single_thread_enqdeq) {
158 MPMCQueue<int> cq(10);
160 for (int pass = 0; pass < 10; ++pass) {
161 for (int i = 0; i < 10; ++i) {
162 EXPECT_TRUE(cq.write(i));
// queue is now full: a further non-blocking write must fail
164 EXPECT_FALSE(cq.write(-1));
165 EXPECT_FALSE(cq.isEmpty());
166 EXPECT_EQ(cq.size(), 10);
// drain half via non-blocking read, half via blockingRead
168 for (int i = 0; i < 5; ++i) {
170 EXPECT_TRUE(cq.read(dest));
173 for (int i = 5; i < 10; ++i) {
175 cq.blockingRead(dest);
// queue is empty again: non-blocking read must fail
179 EXPECT_FALSE(cq.read(dest));
182 EXPECT_TRUE(cq.isEmpty());
183 EXPECT_EQ(cq.size(), 0);
// For every capacity from 1 to 99: exactly `cap` non-blocking writes
// succeed and the next one fails.
187 TEST(MPMCQueue, tryenq_capacity_test) {
188 for (size_t cap = 1; cap < 100; ++cap) {
189 MPMCQueue<int> cq(cap);
190 for (size_t i = 0; i < cap; ++i) {
191 EXPECT_TRUE(cq.write(i));
193 EXPECT_FALSE(cq.write(100));
// Fills the queue, then starts a thread doing a blockingWrite that must
// park until this thread frees a slot with blockingRead — verifies the
// blocking-writer wakeup path at several capacities.
// NOTE(review): the fill-loop body, `dummy` declaration, and the join are
// on lines not visible in this chunk.
197 TEST(MPMCQueue, enq_capacity_test) {
198 for (auto cap : { 1, 100, 10000 }) {
199 MPMCQueue<int> cq(cap);
200 for (int i = 0; i < cap; ++i) {
// this write cannot complete until a slot is freed below
205 auto thr = std::thread([&]{
206 cq.blockingWrite(100);
212 cq.blockingRead(dummy);
// Worker for the try-enq/deq aggregate test: thread t strides its values by
// numThreads, using only the non-blocking write/read calls, and accumulates
// what it dequeues into a thread-local sum that is later folded into `sum`.
// NOTE(review): the declarations of `src`, `received`, `dst`, and the final
// sum update are on lines not visible in this chunk.
218 template <template<typename> class Atom>
219 void runTryEnqDeqThread(
222 MPMCQueue<int, Atom>& cq,
223 std::atomic<uint64_t>& sum,
225 uint64_t threadSum = 0;
227 // received doesn't reflect any actual values, we just start with
228 // t and increment by numThreads to get the rounding of termination
229 // correct if numThreads doesn't evenly divide numOps
// keep trying both directions until this thread has sent and received its
// full share; non-blocking calls may spuriously fail under contention
231 while (src < n || received < n) {
232 if (src < n && cq.write(src)) {
237 if (received < n && cq.read(dst)) {
238 received += numThreads;
// Drives runTryEnqDeqThread on numThreads threads sharing one queue sized
// to numThreads, then checks the aggregate: queue drained and the dequeued
// values sum to 0 + 1 + ... + (n-1) = n*(n-1)/2.
// NOTE(review): the definition of `n` and the join body are on lines not
// visible in this chunk.
245 template <template<typename> class Atom>
246 void runTryEnqDeqTest(int numThreads, int numOps) {
247 // write and read aren't linearizable, so we don't have
248 // hard guarantees on their individual behavior. We can still test
249 // correctness in aggregate
250 MPMCQueue<int,Atom> cq(numThreads);
253 vector<std::thread> threads(numThreads);
254 std::atomic<uint64_t> sum(0);
255 for (int t = 0; t < numThreads; ++t) {
256 threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
257 numThreads, n, std::ref(cq), std::ref(sum), t));
259 for (auto& t : threads) {
262 EXPECT_TRUE(cq.isEmpty());
// written as a difference so a mismatch prints a small delta, not two
// huge 64-bit values
263 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
// Aggregate try-enq/deq test with native std::atomic at 1, 3, and 100
// threads.  NOTE(review): the loop over nts and `n` are on lines not
// visible in this chunk.
266 TEST(MPMCQueue, mt_try_enq_deq) {
267 int nts[] = { 1, 3, 100 };
271 runTryEnqDeqTest<std::atomic>(nt, n);
// Same aggregate test through the emulated-futex atomic.
// NOTE(review): the loop over nts and `n` are on lines not visible here.
275 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
276 int nts[] = { 1, 3, 100 };
280 runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
// Deterministic-schedule variant: runs each thread count under both a
// uniform scheduler and a uniformSubset scheduler (which restricts the set
// of runnable threads) with a logged seed for reproducibility.
284 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
285 int nts[] = { 3, 10 };
288 LOG(INFO) << "using seed " << seed;
293 DSched sched(DSched::uniform(seed));
294 runTryEnqDeqTest<DeterministicAtomic>(nt, n);
297 DSched sched(DSched::uniformSubset(seed, 2));
298 runTryEnqDeqTest<DeterministicAtomic>(nt, n);
// Wall-clock time in microseconds since the epoch, used for coarse
// benchmark timing.  NOTE(review): the `struct timeval tv;` declaration is
// on a line not visible in this chunk.
303 uint64_t nowMicro() {
305 gettimeofday(&tv, 0);
306 return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
// Abstract strategy for invoking one of MPMCQueue's write entry points, so
// the producer/consumer benchmark can be run once per write method.
// callWrite returns whether the write succeeded; methodName labels the
// benchmark output.
309 template <typename Q>
310 struct WriteMethodCaller {
311 WriteMethodCaller() {}
312 virtual ~WriteMethodCaller() = default;
313 virtual bool callWrite(Q& q, int i) = 0;
314 virtual string methodName() = 0;
// Strategy: blockingWrite — always succeeds (waits for space).
// NOTE(review): the callWrite body is on lines not visible in this chunk.
317 template <typename Q>
318 struct BlockingWriteCaller : public WriteMethodCaller<Q> {
319 bool callWrite(Q& q, int i) override {
323 string methodName() override { return "blockingWrite"; }
// Strategy: writeIfNotFull — fails rather than waiting when full.
326 template <typename Q>
327 struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
328 bool callWrite(Q& q, int i) override { return q.writeIfNotFull(i); }
329 string methodName() override { return "writeIfNotFull"; }
// Strategy: the non-linearizable best-effort write().
332 template <typename Q>
333 struct WriteCaller : public WriteMethodCaller<Q> {
334 bool callWrite(Q& q, int i) override { return q.write(i); }
335 string methodName() override { return "write"; }
// Strategy: tryWriteUntil with a deadline of now + duration_, parameterized
// on the clock so both system_clock and steady_clock deadlines are covered.
// methodName embeds the timeout so benchmark lines are distinguishable.
338 template <typename Q,
339 class Clock = steady_clock,
340 class Duration = typename Clock::duration>
341 struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
342 const Duration duration_;
343 explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {}
344 bool callWrite(Q& q, int i) override {
// deadline is computed per call, so each attempt gets a fresh window
345 auto then = Clock::now() + duration_;
346 return q.tryWriteUntil(then, i);
348 string methodName() override {
349 return folly::sformat(
350 "tryWriteUntil({}ms)",
351 std::chrono::duration_cast<milliseconds>(duration_).count());
// Core producer/consumer benchmark: spawns numProducers writer threads
// (using the supplied WriteMethodCaller, counting failed attempts) and
// numConsumers reader threads, waits for all handoffs, validates the
// aggregate sum (unless ignoreContents), and returns a formatted summary
// with nanos/handoff and context switches measured via getrusage.
// NOTE(review): several lines are not visible in this chunk — the
// numProducers/numConsumers parameters, the producer failure accounting,
// the consumer sum fold, the joins, and the sformat argument list.
355 template <typename Q>
356 string producerConsumerBench(Q&& queue,
361 WriteMethodCaller<Q>& writer,
362 bool ignoreContents = false) {
// snapshot context-switch counts before the run
365 struct rusage beginUsage;
366 getrusage(RUSAGE_SELF, &beginUsage);
368 auto beginMicro = nowMicro();
371 std::atomic<uint64_t> sum(0);
372 std::atomic<uint64_t> failed(0);
374 vector<std::thread> producers(numProducers);
375 for (int t = 0; t < numProducers; ++t) {
376 producers[t] = DSched::thread([&,t]{
377 for (int i = t; i < numOps; i += numProducers) {
// retry until the chosen write method succeeds, counting failures
378 while (!writer.callWrite(q, i)) {
385 vector<std::thread> consumers(numConsumers);
386 for (int t = 0; t < numConsumers; ++t) {
387 consumers[t] = DSched::thread([&,t]{
388 uint64_t localSum = 0;
389 for (int i = t; i < numOps; i += numConsumers) {
391 q.blockingRead(dest);
392 EXPECT_FALSE(dest == -1);
399 for (auto& t : producers) {
402 for (auto& t : consumers) {
405 if (!ignoreContents) {
// sum of 0..n-1 must survive the round trip
406 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
409 auto endMicro = nowMicro();
411 struct rusage endUsage;
412 getrusage(RUSAGE_SELF, &endUsage);
414 uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
// voluntary + involuntary context switches during the run
415 long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
416 (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
417 uint64_t failures = failed;
419 return folly::sformat(
420 "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
421 "handoff, {} failures",
// Runs the producer/consumer bench under DeterministicSchedule with every
// write strategy, at small and large capacities and several thread mixes.
// The timing numbers are logged but meaningless under DSched; the value of
// the test is the correctness checks inside producerConsumerBench.
432 TEST(MPMCQueue, mt_prod_cons_deterministic) {
433 // we use the Bench method, but perf results are meaningless under DSched
434 DSched sched(DSched::uniform(0));
436 vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic>>>>
438 callers.emplace_back(
439 make_unique<BlockingWriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
440 callers.emplace_back(
441 make_unique<WriteIfNotFullCaller<MPMCQueue<int, DeterministicAtomic>>>());
442 callers.emplace_back(
443 make_unique<WriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
444 callers.emplace_back(
445 make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
447 callers.emplace_back(
448 make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
451 for (const auto& caller : callers) {
453 << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
454 "MPMCQueue<int, DeterministicAtomic>(10)",
460 << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
461 "MPMCQueue<int, DeterministicAtomic>(100)",
467 << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
468 "MPMCQueue<int, DeterministicAtomic>(10)",
474 << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
475 "MPMCQueue<int, DeterministicAtomic>(100)",
// capacity-1 queue: maximum contention on a single slot
480 LOG(INFO) << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(1),
481 "MPMCQueue<int, DeterministicAtomic>(1)",
// Convenience macro: stringizes the queue-construction expression so the
// benchmark output is labeled with the exact queue type and capacity.
489 #define PC_BENCH(q, np, nc, ...) \
490 producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
// Real (non-deterministic) producer/consumer benchmark over std::atomic:
// every write strategy crossed with several capacities and thread mixes.
// NOTE(review): the definition of `n` is on a line not visible here.
492 TEST(MPMCQueue, mt_prod_cons) {
494 vector<unique_ptr<WriteMethodCaller<MPMCQueue<int>>>> callers;
495 callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int>>>());
496 callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int>>>());
497 callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int>>>());
498 callers.emplace_back(
499 make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(milliseconds(1)));
500 callers.emplace_back(
501 make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(seconds(2)));
502 for (const auto& caller : callers) {
503 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n, *caller);
504 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n, *caller);
505 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n, *caller);
506 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n, *caller);
507 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n, *caller);
508 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n, *caller);
509 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n, *caller);
510 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n, *caller);
// heavily oversubscribed case: 132 threads on a large queue
511 LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n, *caller);
// Same benchmark matrix as mt_prod_cons, but with EmulatedFutexAtomic so
// the portable futex-emulation path is measured.  The extra parentheses in
// PC_BENCH calls protect the comma inside the template argument list.
// NOTE(review): the definition of `n` is on a line not visible here.
515 TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
517 vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic>>>>
519 callers.emplace_back(
520 make_unique<BlockingWriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
521 callers.emplace_back(
522 make_unique<WriteIfNotFullCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
523 callers.emplace_back(
524 make_unique<WriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
525 callers.emplace_back(
526 make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
528 callers.emplace_back(
529 make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
531 for (const auto& caller : callers) {
532 LOG(INFO) << PC_BENCH(
533 (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 1, n, *caller);
534 LOG(INFO) << PC_BENCH(
535 (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 1, n, *caller);
536 LOG(INFO) << PC_BENCH(
537 (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 10, n, *caller);
538 LOG(INFO) << PC_BENCH(
539 (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 10, n, *caller);
540 LOG(INFO) << PC_BENCH(
541 (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 1, n, *caller);
542 LOG(INFO) << PC_BENCH(
543 (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 1, n, *caller);
544 LOG(INFO) << PC_BENCH(
545 (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 10, n, *caller);
546 LOG(INFO) << PC_BENCH(
547 (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 10, n, *caller);
548 LOG(INFO) << PC_BENCH(
549 (MPMCQueue<int, EmulatedFutexAtomic>(100000)), 32, 100, n, *caller);
// Worker for the never-fail test: with capacity == numThreads, each thread
// alternates one writeIfNotFull with one readIfNotEmpty, so #enq >= #deq
// always holds and neither call may ever fail.
// NOTE(review): the parameter-list tail, `dest` declaration, and the fold
// of threadSum into `sum` are on lines not visible in this chunk.
553 template <template <typename> class Atom>
554 void runNeverFailThread(int numThreads,
556 MPMCQueue<int, Atom>& cq,
557 std::atomic<uint64_t>& sum,
559 uint64_t threadSum = 0;
560 for (int i = t; i < n; i += numThreads) {
// guaranteed to succeed: at most numThreads items are in flight
562 EXPECT_TRUE(cq.writeIfNotFull(i));
565 EXPECT_TRUE(cq.readIfNotEmpty(dest));
566 EXPECT_TRUE(dest >= 0);
// Drives runNeverFailThread across numThreads threads, validates the
// aggregate sum and an empty queue, and returns elapsed wall-clock micros
// so callers can log a rough throughput figure.
// NOTE(review): the definition of `n`, the bind argument list, and the
// join body are on lines not visible in this chunk.
572 template <template <typename> class Atom>
573 uint64_t runNeverFailTest(int numThreads, int numOps) {
574 // always #enq >= #deq
575 MPMCQueue<int, Atom> cq(numThreads);
578 auto beginMicro = nowMicro();
580 vector<std::thread> threads(numThreads);
581 std::atomic<uint64_t> sum(0);
582 for (int t = 0; t < numThreads; ++t) {
583 threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>,
590 for (auto& t : threads) {
593 EXPECT_TRUE(cq.isEmpty());
594 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
596 return nowMicro() - beginMicro;
// Never-fail scenario with std::atomic; logs nanos per op (each item is
// one write + one read, hence the n * 2 divisor).
// NOTE(review): the loop over nts and `n` are on lines not visible here.
599 TEST(MPMCQueue, mt_never_fail) {
600 int nts[] = {1, 3, 100};
604 uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n);
605 LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
// Never-fail scenario through EmulatedFutexAtomic.
// NOTE(review): the loop over nts and `n` are on lines not visible here.
610 TEST(MPMCQueue, mt_never_fail_emulated_futex) {
611 int nts[] = {1, 3, 100};
615 uint64_t elapsed = runNeverFailTest<EmulatedFutexAtomic>(nt, n);
616 LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
// Never-fail scenario under DeterministicSchedule, using both the uniform
// and uniformSubset schedulers.  The seed is pinned to 0 (the commented
// alternative would randomize it per run) and logged for reproducibility.
621 TEST(MPMCQueue, mt_never_fail_deterministic) {
624 long seed = 0; // nowMicro() % 10000;
625 LOG(INFO) << "using seed " << seed;
630 DSched sched(DSched::uniform(seed));
631 runNeverFailTest<DeterministicAtomic>(nt, n);
634 DSched sched(DSched::uniformSubset(seed, 2));
635 runNeverFailTest<DeterministicAtomic>(nt, n);
// Like runNeverFailThread, but the enqueue uses tryWriteUntil with a 1s
// deadline on the given Clock; since the queue can never actually be full
// (capacity == numThreads), the deadline must never be reached.
// NOTE(review): the parameter-list tail, `dest` declaration, and the fold
// of threadSum into `sum` are on lines not visible in this chunk.
640 template <class Clock, template <typename> class Atom>
641 void runNeverFailUntilThread(int numThreads,
643 MPMCQueue<int, Atom>& cq,
644 std::atomic<uint64_t>& sum,
646 uint64_t threadSum = 0;
647 for (int i = t; i < n; i += numThreads) {
649 auto soon = Clock::now() + std::chrono::seconds(1);
650 EXPECT_TRUE(cq.tryWriteUntil(soon, i));
653 EXPECT_TRUE(cq.readIfNotEmpty(dest));
654 EXPECT_TRUE(dest >= 0);
// Clock-parameterized overload of runNeverFailTest: identical harness, but
// the workers use tryWriteUntil deadlines on the given Clock.  Returns
// elapsed wall-clock micros.
// NOTE(review): the definition of `n`, bind argument tail, and join body
// are on lines not visible in this chunk.
660 template <class Clock, template <typename> class Atom>
661 uint64_t runNeverFailTest(int numThreads, int numOps) {
662 // always #enq >= #deq
663 MPMCQueue<int, Atom> cq(numThreads);
666 auto beginMicro = nowMicro();
668 vector<std::thread> threads(numThreads);
669 std::atomic<uint64_t> sum(0);
670 for (int t = 0; t < numThreads; ++t) {
671 threads[t] = DSched::thread(std::bind(runNeverFailUntilThread<Clock, Atom>,
678 for (auto& t : threads) {
681 EXPECT_TRUE(cq.isEmpty());
682 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
684 return nowMicro() - beginMicro;
// tryWriteUntil never-fail scenario with system_clock deadlines.
// NOTE(review): the loop over nts, `n`, and the assignment to `elapsed`
// are on lines not visible in this chunk.
687 TEST(MPMCQueue, mt_never_fail_until_system) {
688 int nts[] = {1, 3, 100};
693 runNeverFailTest<std::chrono::system_clock, std::atomic>(nt, n);
694 LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
// tryWriteUntil never-fail scenario with steady_clock deadlines.
// NOTE(review): the loop over nts, `n`, and the assignment to `elapsed`
// are on lines not visible in this chunk.
699 TEST(MPMCQueue, mt_never_fail_until_steady) {
700 int nts[] = {1, 3, 100};
705 runNeverFailTest<std::chrono::steady_clock, std::atomic>(nt, n);
706 LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
// Per-thread instrumentation for the perfect-forwarding tests: lc_counts
// tallies each special-member invocation of Lifecycle<R>, lc_prev holds the
// last snapshot, and lc_outstanding is the number of currently live objects
// (all constructions minus destructions).
// NOTE(review): the enumerator list is on lines not visible in this chunk;
// the visible names (DEFAULT_CONSTRUCTOR, COPY_CONSTRUCTOR, ...) come from
// their uses below.
711 enum LifecycleEvent {
723 static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
724 static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
726 static int lc_outstanding() {
727 return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
728 lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
729 lc_counts[DESTRUCTOR];
// Snapshots the current event counters into lc_prev so lc_step can assert
// on the delta since the last snapshot.
732 static void lc_snap() {
733 for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
734 lc_prev[i] = lc_counts[i];
// Asserts that exactly the named event(s) (and nothing else) occurred since
// the last snapshot, then (via the not-visible tail) re-snapshots.  The
// LIFECYCLE_STEP macro injects __LINE__ so a failure reports its call site.
738 #define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)
740 static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
741 for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
// each expected event contributes a delta of exactly 1; all others 0
742 int delta = i == what || i == what2 ? 1 : 0;
743 EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
744 << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
745 << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
746 << ", from line " << lineno;
// Instrumented element type: every special member bumps the matching
// lc_counts slot.  R (std::true_type / std::false_type) is exported as
// IsRelocatable so folly's relocation optimization can be toggled per test.
// All members are noexcept so MPMCQueue may move rather than copy.
// NOTE(review): the struct header, the `constructed` member declaration,
// the assignment-operator return statements, and closing braces are on
// lines not visible in this chunk.
751 template <typename R>
753 typedef R IsRelocatable;
757 Lifecycle() noexcept : constructed(true) {
758 ++lc_counts[DEFAULT_CONSTRUCTOR];
761 explicit Lifecycle(int /* n */, char const* /* s */) noexcept
762 : constructed(true) {
763 ++lc_counts[TWO_ARG_CONSTRUCTOR];
766 Lifecycle(const Lifecycle& /* rhs */) noexcept : constructed(true) {
767 ++lc_counts[COPY_CONSTRUCTOR];
770 Lifecycle(Lifecycle&& /* rhs */) noexcept : constructed(true) {
771 ++lc_counts[MOVE_CONSTRUCTOR];
774 Lifecycle& operator=(const Lifecycle& /* rhs */) noexcept {
775 ++lc_counts[COPY_OPERATOR];
779 Lifecycle& operator=(Lifecycle&& /* rhs */) noexcept {
780 ++lc_counts[MOVE_OPERATOR];
784 ~Lifecycle() noexcept {
785 ++lc_counts[DESTRUCTOR];
// sanity: destructions may never outnumber constructions
786 assert(lc_outstanding() >= 0);
// Verifies that MPMCQueue perfectly forwards constructor arguments through
// blockingWrite/write — each enqueue must cause exactly one construction
// event of the expected kind (default, two-arg, move, or copy), and each
// dequeue the expected destructor/move-operator pattern.  R selects whether
// Lifecycle advertises IsRelocatable, changing the expected dequeue events.
// NOTE(review): numerous lines (loop bodies, `src`/`node` declarations,
// relocatable-vs-not branch, scope closes) are not visible in this chunk.
792 template <typename R>
793 void runPerfectForwardingTest() {
795 EXPECT_EQ(lc_outstanding(), 0);
798 MPMCQueue<Lifecycle<R>> queue(50);
// constructing the queue itself must create no elements
799 LIFECYCLE_STEP(NOTHING);
801 for (int pass = 0; pass < 10; ++pass) {
802 for (int i = 0; i < 10; ++i) {
// zero-arg blockingWrite: element default-constructed in place
803 queue.blockingWrite();
804 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// argument pack (1, "one") forwarded to the two-arg constructor
806 queue.blockingWrite(1, "one");
807 LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
811 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
812 queue.blockingWrite(std::move(src));
813 LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
815 LIFECYCLE_STEP(DESTRUCTOR);
819 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// lvalue source: must be copy-constructed, not moved
820 queue.blockingWrite(src);
821 LIFECYCLE_STEP(COPY_CONSTRUCTOR);
823 LIFECYCLE_STEP(DESTRUCTOR);
825 EXPECT_TRUE(queue.write());
826 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
829 EXPECT_EQ(queue.size(), 50);
// full queue: write must fail and construct nothing at all
830 EXPECT_FALSE(queue.write(2, "two"));
831 LIFECYCLE_STEP(NOTHING);
833 for (int i = 0; i < 50; ++i) {
836 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
838 queue.blockingRead(node);
840 // relocatable, moved via memcpy
841 LIFECYCLE_STEP(DESTRUCTOR);
// non-relocatable path: move-assign into node + destroy the slot
843 LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
846 LIFECYCLE_STEP(DESTRUCTOR);
849 EXPECT_EQ(queue.size(), 0);
852 // put one element back before destruction
854 Lifecycle<R> src(3, "three");
855 LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
856 queue.write(std::move(src));
857 LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
859 LIFECYCLE_STEP(DESTRUCTOR); // destroy src
// the queue destructor must destroy the element left inside it
861 LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
863 EXPECT_EQ(lc_outstanding(), 0);
// Perfect-forwarding test with IsRelocatable == false_type (element-wise
// move on dequeue).
866 TEST(MPMCQueue, perfect_forwarding) {
867 runPerfectForwardingTest<std::false_type>();
// Perfect-forwarding test with IsRelocatable == true_type (memcpy-style
// relocation on dequeue).
870 TEST(MPMCQueue, perfect_forwarding_relocatable) {
871 runPerfectForwardingTest<std::true_type>();
// Verifies MPMCQueue's own move constructor and move assignment: moving a
// queue transfers its capacity and contents without touching the elements
// (LIFECYCLE_STEP(NOTHING)), leaves the source empty with capacity 0, and
// both queues destroy their remaining elements on scope exit.
// NOTE(review): several lines (writes that precede steps, the move
// assignments into c, `dst` reads, scope braces) are not visible in this
// chunk.
874 TEST(MPMCQueue, queue_moving) {
876 EXPECT_EQ(lc_outstanding(), 0);
879 MPMCQueue<Lifecycle<std::false_type>> a(50);
880 LIFECYCLE_STEP(NOTHING);
883 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// move construction: no element is copied or moved
886 MPMCQueue<Lifecycle<std::false_type>> b = std::move(a);
887 LIFECYCLE_STEP(NOTHING);
// moved-from queue is empty with zero capacity
888 EXPECT_EQ(a.capacity(), 0);
889 EXPECT_EQ(a.size(), 0);
890 EXPECT_EQ(b.capacity(), 50);
891 EXPECT_EQ(b.size(), 1);
894 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
897 MPMCQueue<Lifecycle<std::false_type>> c;
898 LIFECYCLE_STEP(NOTHING);
// move assignment into the default-constructed queue
900 LIFECYCLE_STEP(NOTHING);
901 EXPECT_EQ(c.capacity(), 50);
902 EXPECT_EQ(c.size(), 2);
905 Lifecycle<std::false_type> dst;
906 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
908 LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
912 MPMCQueue<Lifecycle<std::false_type>> d(10);
913 LIFECYCLE_STEP(NOTHING);
// swap-like exchange via move assignment: c and d trade states
915 LIFECYCLE_STEP(NOTHING);
916 EXPECT_EQ(c.capacity(), 10);
917 EXPECT_TRUE(c.isEmpty());
918 EXPECT_EQ(d.capacity(), 50);
919 EXPECT_EQ(d.size(), 1);
922 LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
924 c.blockingWrite(dst);
925 LIFECYCLE_STEP(COPY_CONSTRUCTOR);
927 d.blockingWrite(std::move(dst));
928 LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
929 } // d goes out of scope
930 LIFECYCLE_STEP(DESTRUCTOR);
931 } // dst goes out of scope
932 LIFECYCLE_STEP(DESTRUCTOR);
933 } // c goes out of scope
934 LIFECYCLE_STEP(DESTRUCTOR);
// Constructing an MPMCQueue with capacity 0 must throw
// std::invalid_argument rather than silently producing an unusable queue.
937 TEST(MPMCQueue, explicit_zero_capacity_fail) {
938 ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);