2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/MPMCQueue.h>
18 #include <folly/Format.h>
19 #include <folly/Memory.h>
20 #include <folly/test/DeterministicSchedule.h>
22 #include <boost/intrusive_ptr.hpp>
29 #include <sys/resource.h>
31 #include <gflags/gflags.h>
32 #include <gtest/gtest.h>
// Tell folly that boost::intrusive_ptr is trivially relocatable so it can be
// used efficiently inside folly containers.
34 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
// NOTE(review): file-scope using-directives are acceptable in a test .cpp,
// but would be a defect in a header.
36 using namespace folly;
37 using namespace detail;
// Short alias for the deterministic-scheduling test harness.
40 typedef DeterministicSchedule DSched;
// Worker for the multi-threaded TurnSequencer test: each thread claims the
// interleaved turns i, i+numThreads, i+2*numThreads, ... and checks via
// `prev` that turns were granted strictly in increasing order.
// NOTE(review): this listing is line-sampled — the numThreads/numOps/init/
// prev/i parameters and the closing braces are not visible here.
42 template <template<typename> class Atom>
43 void run_mt_sequencer_thread(
47 TurnSequencer<Atom>& seq,
48 Atom<int>& spinThreshold,
// The third waitForTurn argument is true on every 32nd op — presumably it
// permits updating spinThreshold periodically; confirm against
// TurnSequencer::waitForTurn.
51 for (int op = i; op < numOps; op += numThreads) {
52 seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
53 EXPECT_EQ(prev, op - 1);
55 seq.completeTurn(init + op);
// Drives run_mt_sequencer_thread: spawns numThreads DSched threads that
// cooperatively execute numOps turns on one TurnSequencer starting at
// `init`, joins them, then verifies the final turn count.
// NOTE(review): sampled listing — the thread-arg tail, join body, and the
// declaration of `prev` are not visible.
59 template <template<typename> class Atom>
60 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
61 TurnSequencer<Atom> seq(init);
62 Atom<int> spinThreshold(0);
65 std::vector<std::thread> threads(numThreads);
66 for (int i = 0; i < numThreads; ++i) {
67 threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
68 numThreads, n, std::ref(seq), std::ref(spinThreshold),
72 for (auto& thr : threads) {
// After all turns complete, the last observed op must be numOps - 1.
76 EXPECT_EQ(prev, numOps - 1);
// TurnSequencer with real std::atomic: single-threaded, two-way, and
// 100-way contention. The -100 init converts to a value near UINT32_MAX,
// presumably exercising wrap-around of the turn counter — confirm against
// TurnSequencer's turn arithmetic.
79 TEST(MPMCQueue, sequencer) {
80 run_mt_sequencer_test<std::atomic>(1, 100, 0);
81 run_mt_sequencer_test<std::atomic>(2, 100000, -100);
82 run_mt_sequencer_test<std::atomic>(100, 10000, -100);
// Same sequencer scenarios under DeterministicSchedule, which controls the
// interleaving reproducibly (fixed seed 0) via DeterministicAtomic.
85 TEST(MPMCQueue, sequencer_deterministic) {
86 DSched sched(DSched::uniform(0));
87 run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
88 run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
89 run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
// Round-trips a value of type T through an MPMCQueue via both the blocking
// (blockingWrite/blockingRead) and non-blocking (write/read) paths,
// checking the queue compiles and works for that element type.
// NOTE(review): sampled listing — the template header, the queue `cq`
// declaration, and `dest` are not visible here.
93 void runElementTypeTest(T&& src) {
95 cq.blockingWrite(std::move(src));
97 cq.blockingRead(dest);
98 EXPECT_TRUE(cq.write(std::move(dest)));
99 EXPECT_TRUE(cq.read(dest));
// Minimal intrusively ref-counted type used to test MPMCQueue with
// boost::intrusive_ptr elements. `rc` is mutable+atomic so const pointees
// can still be counted across threads.
// NOTE(review): sampled listing — the struct header and the bodies of the
// two hook functions (expected: atomic increment / decrement + delete on
// zero) are not visible.
103 mutable std::atomic<int> rc;
105 RefCounted() : rc(0) {}
108 void intrusive_ptr_add_ref(RefCounted const* p) {
112 void intrusive_ptr_release(RefCounted const* p) {
// Instantiates the round-trip test over a spread of element types:
// trivially copyable (int), heap-owning (string, pair, vector),
// shared-ownership, move-only (unique_ptr), and intrusively counted.
118 TEST(MPMCQueue, lots_of_element_types) {
119 runElementTypeTest(10);
120 runElementTypeTest(std::string("abc"));
121 runElementTypeTest(std::make_pair(10, std::string("def")));
122 runElementTypeTest(std::vector<std::string>{ { "abc" } });
123 runElementTypeTest(std::make_shared<char>('a'));
124 runElementTypeTest(folly::make_unique<char>('a'));
125 runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
// Single-threaded smoke test: repeatedly fill a capacity-10 queue, verify
// overflow writes fail, drain half non-blocking and half blocking, and
// verify reads fail once empty.
// NOTE(review): sampled listing — loop-closing braces and `dest`
// declarations are not visible.
128 TEST(MPMCQueue, single_thread_enqdeq) {
129 MPMCQueue<int> cq(10);
131 for (int pass = 0; pass < 10; ++pass) {
// Fill to capacity.
132 for (int i = 0; i < 10; ++i) {
133 EXPECT_TRUE(cq.write(i));
// Queue is full: a further non-blocking write must fail.
135 EXPECT_FALSE(cq.write(-1));
136 EXPECT_FALSE(cq.isEmpty());
137 EXPECT_EQ(cq.size(), 10);
// Drain the first half via the non-blocking path ...
139 for (int i = 0; i < 5; ++i) {
141 EXPECT_TRUE(cq.read(dest));
// ... and the second half via the blocking path (cannot block: non-empty).
144 for (int i = 5; i < 10; ++i) {
146 cq.blockingRead(dest);
// Empty again: non-blocking read must fail.
150 EXPECT_FALSE(cq.read(dest));
153 EXPECT_TRUE(cq.isEmpty());
154 EXPECT_EQ(cq.size(), 0);
// For every capacity 1..99, exactly `cap` non-blocking writes succeed and
// the next one fails.
// NOTE(review): `int i` is compared against `size_t cap` — a
// signed/unsigned mismatch. Harmless for cap < 100, but should be
// `size_t i` (or int cap); cannot be fixed here because the loop-closing
// lines are missing from this sampled listing.
158 TEST(MPMCQueue, tryenq_capacity_test) {
159 for (size_t cap = 1; cap < 100; ++cap) {
160 MPMCQueue<int> cq(cap);
161 for (int i = 0; i < cap; ++i) {
162 EXPECT_TRUE(cq.write(i));
164 EXPECT_FALSE(cq.write(100));
// Fill the queue, then verify that a blockingWrite from another thread
// blocks until a blockingRead frees a slot.
// NOTE(review): sampled listing — the fill-loop body, the check that the
// writer has not completed, and the thread join are not visible.
168 TEST(MPMCQueue, enq_capacity_test) {
169 for (auto cap : { 1, 100, 10000 }) {
170 MPMCQueue<int> cq(cap);
171 for (int i = 0; i < cap; ++i) {
// Queue is full here; this write must block until the read below runs.
176 auto thr = std::thread([&]{
177 cq.blockingWrite(100);
183 cq.blockingRead(dummy);
// Worker for the aggregate try-enq/deq test: thread t writes the values
// t, t+numThreads, ... into the shared queue while simultaneously reading,
// accumulating what it reads into the shared checksum `sum`.
// NOTE(review): sampled listing — the t/numThreads/n parameters, the
// src/dst declarations, and the final sum += threadSum are not visible.
189 template <template<typename> class Atom>
190 void runTryEnqDeqThread(
193 MPMCQueue<int, Atom>& cq,
194 std::atomic<uint64_t>& sum,
196 uint64_t threadSum = 0;
198 // received doesn't reflect any actual values, we just start with
199 // t and increment by numThreads to get the rounding of termination
200 // correct if numThreads doesn't evenly divide numOps
202 while (src < n || received < n) {
// Non-blocking write and read are interleaved; either may fail under
// contention and is simply retried on the next loop iteration.
203 if (src < n && cq.write(src)) {
208 if (received < n && cq.read(dst)) {
209 received += numThreads;
// Aggregate correctness test for non-blocking write/read: all threads
// together must transfer exactly the values 0..n-1, so the checksum of
// everything read must equal n*(n-1)/2 and the queue must end empty.
// NOTE(review): sampled listing — the derivation of `n` from numOps, loop
// closers, and the join body are not visible.
216 template <template<typename> class Atom>
217 void runTryEnqDeqTest(int numThreads, int numOps) {
218 // write and read aren't linearizable, so we don't have
219 // hard guarantees on their individual behavior. We can still test
220 // correctness in aggregate
221 MPMCQueue<int,Atom> cq(numThreads);
224 std::vector<std::thread> threads(numThreads);
225 std::atomic<uint64_t> sum(0);
226 for (int t = 0; t < numThreads; ++t) {
227 threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
228 numThreads, n, std::ref(cq), std::ref(sum), t));
230 for (auto& t : threads) {
233 EXPECT_TRUE(cq.isEmpty());
// Sum of 0..n-1 — written as a difference to keep the comparison unsigned.
234 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
// Aggregate try-enq/deq under real std::atomic for 1, 3, and 100 threads.
// NOTE(review): sampled listing — the loops over `nts` and op counts `n`
// are not visible here.
237 TEST(MPMCQueue, mt_try_enq_deq) {
238 int nts[] = { 1, 3, 100 };
242 runTryEnqDeqTest<std::atomic>(nt, n);
// Aggregate try-enq/deq under DeterministicSchedule, run under both the
// uniform scheduler and a uniformSubset scheduler (at most 2 runnable
// threads at a time), to explore different interleavings.
// NOTE(review): sampled listing — the seed initialization and the loops
// over thread counts are not visible.
246 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
247 int nts[] = { 3, 10 };
250 LOG(INFO) << "using seed " << seed;
255 DSched sched(DSched::uniform(seed));
256 runTryEnqDeqTest<DeterministicAtomic>(nt, n);
259 DSched sched(DSched::uniformSubset(seed, 2));
260 runTryEnqDeqTest<DeterministicAtomic>(nt, n);
// Wall-clock time in microseconds since the epoch.
// NOTE(review): gettimeofday is not monotonic — NTP steps can skew the
// benchmark deltas computed from this; clock_gettime(CLOCK_MONOTONIC)
// would be more robust for timing.
265 uint64_t nowMicro() {
267 gettimeofday(&tv, 0);
268 return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
// Benchmarks a queue with numProducers writer threads and numConsumers
// reader threads moving numOps values, measuring wall time (via nowMicro)
// and context switches (via getrusage), and returns a formatted summary.
// Unless ignoreContents is set, it also verifies the checksum of all
// values transferred. Takes Q&& so both lvalue and freshly-constructed
// queues can be passed.
// NOTE(review): sampled listing — the local queue binding `q`, `n`, the
// producer write call, the localSum accumulation/publish, and the join
// bodies are not visible.
271 template <typename Q>
272 std::string producerConsumerBench(Q&& queue, std::string qName,
273 int numProducers, int numConsumers,
274 int numOps, bool ignoreContents = false) {
// Snapshot resource usage and time before spawning any threads.
277 struct rusage beginUsage;
278 getrusage(RUSAGE_SELF, &beginUsage);
280 auto beginMicro = nowMicro();
283 std::atomic<uint64_t> sum(0);
// Producers: producer t writes t, t+numProducers, ...
285 std::vector<std::thread> producers(numProducers);
286 for (int t = 0; t < numProducers; ++t) {
287 producers[t] = DSched::thread([&,t]{
288 for (int i = t; i < numOps; i += numProducers) {
// Consumers: each reads its share and accumulates a local checksum to
// avoid contending on `sum` per element.
294 std::vector<std::thread> consumers(numConsumers);
295 for (int t = 0; t < numConsumers; ++t) {
296 consumers[t] = DSched::thread([&,t]{
297 uint64_t localSum = 0;
298 for (int i = t; i < numOps; i += numConsumers) {
300 q.blockingRead(dest);
301 EXPECT_FALSE(dest == -1);
308 for (auto& t : producers) {
311 for (auto& t : consumers) {
314 if (!ignoreContents) {
// Values transferred must sum to 0+1+...+(n-1).
315 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
318 auto endMicro = nowMicro();
320 struct rusage endUsage;
321 getrusage(RUSAGE_SELF, &endUsage);
// Per-handoff latency and voluntary+involuntary context-switch delta.
323 uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
324 long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
325 (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
327 return folly::format(
328 "{}, {} producers, {} consumers => {} nanos/handoff, {} csw / {} handoff",
329 qName, numProducers, numConsumers, nanosPer, csw, n).str();
// Producer/consumer correctness under DeterministicSchedule; the timing
// output is ignored since a simulated scheduler makes it meaningless.
// NOTE(review): sampled listing — the producer/consumer-count arguments of
// each call are not visible.
333 TEST(MPMCQueue, mt_prod_cons_deterministic) {
334 // we use the Bench method, but perf results are meaningless under DSched
335 DSched sched(DSched::uniform(0));
337 producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
339 producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
341 producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
343 producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
345 producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(1),
// Convenience macro: stringizes the queue expression (#q) so the benchmark
// output labels each configuration with its construction expression.
349 #define PC_BENCH(q, np, nc, ...) \
350 producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
// Real-threads benchmark sweep over queue capacity (10 / 10000 / 100000)
// and producer/consumer fan-out; results are logged, not asserted on.
// NOTE(review): sampled listing — the declaration of `n` is not visible.
352 TEST(MPMCQueue, mt_prod_cons) {
354 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n);
355 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n);
356 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n);
357 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n);
358 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n);
359 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n);
360 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n);
361 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n);
362 LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n);
// Worker for the writeIfNotFull/readIfNotEmpty test: with capacity ==
// numThreads each thread can always complete its own write before reading,
// so neither call is ever expected to fail.
// NOTE(review): sampled listing — the t/numThreads/n parameters, `dest`
// declaration, and the checksum accumulation/publish are not visible.
365 template <template<typename> class Atom>
366 void runNeverFailThread(
369 MPMCQueue<int, Atom>& cq,
370 std::atomic<uint64_t>& sum,
372 uint64_t threadSum = 0;
373 for (int i = t; i < n; i += numThreads) {
// Both operations must succeed on every iteration — see capacity note.
375 EXPECT_TRUE(cq.writeIfNotFull(i));
378 EXPECT_TRUE(cq.readIfNotEmpty(dest));
379 EXPECT_TRUE(dest >= 0);
// Drives runNeverFailThread with capacity == numThreads (so #enq >= #deq
// always holds per thread), verifies the aggregate checksum, and returns
// the elapsed wall time in microseconds for logging by callers.
// NOTE(review): sampled listing — the derivation of `n`, loop closers, and
// the join body are not visible.
385 template <template<typename> class Atom>
386 uint64_t runNeverFailTest(int numThreads, int numOps) {
387 // always #enq >= #deq
388 MPMCQueue<int,Atom> cq(numThreads);
391 auto beginMicro = nowMicro();
393 std::vector<std::thread> threads(numThreads);
394 std::atomic<uint64_t> sum(0);
395 for (int t = 0; t < numThreads; ++t) {
396 threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>,
397 numThreads, n, std::ref(cq), std::ref(sum), t));
399 for (auto& t : threads) {
402 EXPECT_TRUE(cq.isEmpty());
// Values 0..n-1 must each have been transferred exactly once.
403 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
405 return nowMicro() - beginMicro;
// Never-fail test with real std::atomic at 1/3/100 threads; logs rough
// per-op latency (each op is one write + one read, hence n * 2).
// NOTE(review): sampled listing — the loops over `nts` / `n` and the rest
// of the log line are not visible.
408 TEST(MPMCQueue, mt_never_fail) {
409 int nts[] = { 1, 3, 100 };
413 uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n);
414 LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with "
// Never-fail test under DeterministicSchedule, with both the uniform and
// the uniformSubset (max 2 runnable threads) schedulers. The seed is
// pinned to 0 for reproducibility; the commented expression shows how to
// randomize it when hunting for new interleavings.
419 TEST(MPMCQueue, mt_never_fail_deterministic) {
420 int nts[] = { 3, 10 };
422 long seed = 0; // nowMicro() % 10000;
423 LOG(INFO) << "using seed " << seed;
428 DSched sched(DSched::uniform(seed));
429 runNeverFailTest<DeterministicAtomic>(nt, n);
432 DSched sched(DSched::uniformSubset(seed, 2));
433 runNeverFailTest<DeterministicAtomic>(nt, n);
// Instrumentation for the perfect-forwarding tests: every special-member
// call on Lifecycle<R> bumps a per-event counter. Counters are
// thread-local (FOLLY_TLS) so concurrent tests don't interfere.
// NOTE(review): sampled listing — the enumerator list (DEFAULT_CONSTRUCTOR,
// ..., DESTRUCTOR, NOTHING, MAX_LIFECYCLE_EVENT) is not visible here.
438 enum LifecycleEvent {
450 static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
451 static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
// Number of currently-alive Lifecycle objects: all constructions minus
// destructions.
453 static int lc_outstanding() {
454 return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
455 lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
456 lc_counts[DESTRUCTOR];
// Snapshots the current event counters into lc_prev so a later lc_step can
// assert on the delta since this point.
459 static void lc_snap() {
460 for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
461 lc_prev[i] = lc_counts[i];
// LIFECYCLE_STEP(events...) asserts that exactly the named events (up to
// two; default NOTHING) occurred since the last snapshot, tagging any
// failure with the caller's __LINE__.
465 #define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)
// Checks each counter's delta against the expectation (1 for `what`/`what2`,
// 0 otherwise). NOTE(review): sampled listing — the trailing re-snapshot
// (expected after the loop) is not visible.
467 static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
468 for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
469 int delta = i == what || i == what2 ? 1 : 0;
470 EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
471 << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
472 << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
473 << ", from line " << lineno;
// Test element type whose every special member records itself in
// lc_counts. The template parameter R (std::true_type/false_type) is
// exposed as the folly IsRelocatable trait, letting the same tests cover
// both the memcpy-relocation and the move-based queue paths.
// NOTE(review): sampled listing — the struct header, the `constructed`
// member, and the bodies/returns of the assignment operators are not
// visible.
478 template <typename R>
480 typedef R IsRelocatable;
484 Lifecycle() noexcept : constructed(true) {
485 ++lc_counts[DEFAULT_CONSTRUCTOR];
488 explicit Lifecycle(int n, char const* s) noexcept : constructed(true) {
489 ++lc_counts[TWO_ARG_CONSTRUCTOR];
492 Lifecycle(const Lifecycle& rhs) noexcept : constructed(true) {
493 ++lc_counts[COPY_CONSTRUCTOR];
496 Lifecycle(Lifecycle&& rhs) noexcept : constructed(true) {
497 ++lc_counts[MOVE_CONSTRUCTOR];
500 Lifecycle& operator= (const Lifecycle& rhs) noexcept {
501 ++lc_counts[COPY_OPERATOR];
505 Lifecycle& operator= (Lifecycle&& rhs) noexcept {
506 ++lc_counts[MOVE_OPERATOR];
510 ~Lifecycle() noexcept {
511 ++lc_counts[DESTRUCTOR];
// Sanity check: destructions can never outnumber constructions.
512 assert(lc_outstanding() >= 0);
// Verifies that MPMCQueue's emplace-style writes perfectly forward their
// arguments — i.e. that each enqueue/dequeue triggers exactly the expected
// construction/assignment/destruction events on Lifecycle<R>, for both the
// relocatable (R = true_type, memcpy move) and non-relocatable cases.
// NOTE(review): sampled listing — several scope braces, `src`/`node`
// declarations, and the R-dependent branch around lines 566-569 are not
// visible; comments below describe only the visible steps.
518 template <typename R>
519 void runPerfectForwardingTest() {
521 EXPECT_EQ(lc_outstanding(), 0);
// Constructing the queue itself creates no elements.
524 MPMCQueue<Lifecycle<R>> queue(50);
525 LIFECYCLE_STEP(NOTHING);
527 for (int pass = 0; pass < 10; ++pass) {
528 for (int i = 0; i < 10; ++i) {
// Zero-arg write: element default-constructed in place, no temporary.
529 queue.blockingWrite();
530 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// Two-arg write: forwarded straight to the two-arg constructor.
532 queue.blockingWrite(1, "one");
533 LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
537 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// Rvalue write: exactly one move construction ...
538 queue.blockingWrite(std::move(src));
539 LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
// ... then the moved-from source is destroyed at scope exit.
541 LIFECYCLE_STEP(DESTRUCTOR);
545 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// Lvalue write: exactly one copy construction.
546 queue.blockingWrite(src);
547 LIFECYCLE_STEP(COPY_CONSTRUCTOR);
549 LIFECYCLE_STEP(DESTRUCTOR);
551 EXPECT_TRUE(queue.write());
552 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// Queue is now full; a failed write must construct nothing at all.
555 EXPECT_EQ(queue.size(), 50);
556 EXPECT_FALSE(queue.write(2, "two"));
557 LIFECYCLE_STEP(NOTHING);
559 for (int i = 0; i < 50; ++i) {
562 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
564 queue.blockingRead(node);
566 // relocatable, moved via memcpy
567 LIFECYCLE_STEP(DESTRUCTOR);
// Non-relocatable read path: move-assign into `node`, destroy the slot.
569 LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
572 LIFECYCLE_STEP(DESTRUCTOR);
575 EXPECT_EQ(queue.size(), 0);
578 // put one element back before destruction
580 Lifecycle<R> src(3, "three");
581 LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
582 queue.write(std::move(src));
583 LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
585 LIFECYCLE_STEP(DESTRUCTOR); // destroy src
// Destroying a non-empty queue must destroy its remaining element.
587 LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
589 EXPECT_EQ(lc_outstanding(), 0);
// Perfect-forwarding lifecycle checks with the non-relocatable (move-based)
// element path.
592 TEST(MPMCQueue, perfect_forwarding) {
593 runPerfectForwardingTest<std::false_type>();
// Perfect-forwarding lifecycle checks with IsRelocatable = true, exercising
// the memcpy-relocation element path.
596 TEST(MPMCQueue, perfect_forwarding_relocatable) {
597 runPerfectForwardingTest<std::true_type>();
// Verifies MPMCQueue's own move constructor and move assignment: moving a
// queue transfers capacity and contents without touching any element
// (LIFECYCLE_STEP(NOTHING)), leaves the source empty with capacity 0, and
// elements remain readable/destroyable through the destination.
// NOTE(review): sampled listing — several blockingWrite/blockingRead calls,
// the move-assignments into `c`, and some scope braces are not visible;
// comments describe only the visible steps.
600 TEST(MPMCQueue, queue_moving) {
602 EXPECT_EQ(lc_outstanding(), 0);
605 MPMCQueue<Lifecycle<std::false_type>> a(50);
606 LIFECYCLE_STEP(NOTHING);
609 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// Move construction: b takes over a's storage — no element events.
612 MPMCQueue<Lifecycle<std::false_type>> b = std::move(a);
613 LIFECYCLE_STEP(NOTHING);
// The moved-from queue is empty with zero capacity.
614 EXPECT_EQ(a.capacity(), 0);
615 EXPECT_EQ(a.size(), 0);
616 EXPECT_EQ(b.capacity(), 50);
617 EXPECT_EQ(b.size(), 1);
620 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// Move assignment into a default-constructed queue — again no element
// events, contents and capacity follow the storage.
623 MPMCQueue<Lifecycle<std::false_type>> c;
624 LIFECYCLE_STEP(NOTHING);
626 LIFECYCLE_STEP(NOTHING);
627 EXPECT_EQ(c.capacity(), 50);
628 EXPECT_EQ(c.size(), 2);
631 Lifecycle<std::false_type> dst;
632 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// Reading through the moved-to queue: move-assign + destroy the slot.
634 LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
638 MPMCQueue<Lifecycle<std::false_type>> d(10);
639 LIFECYCLE_STEP(NOTHING);
641 LIFECYCLE_STEP(NOTHING);
642 EXPECT_EQ(c.capacity(), 10);
643 EXPECT_TRUE(c.isEmpty());
644 EXPECT_EQ(d.capacity(), 50);
645 EXPECT_EQ(d.size(), 1);
648 LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
650 c.blockingWrite(dst);
651 LIFECYCLE_STEP(COPY_CONSTRUCTOR);
653 d.blockingWrite(std::move(dst));
654 LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
// Each scope exit destroys exactly the one element that queue/value held.
655 } // d goes out of scope
656 LIFECYCLE_STEP(DESTRUCTOR);
657 } // dst goes out of scope
658 LIFECYCLE_STEP(DESTRUCTOR);
659 } // c goes out of scope
660 LIFECYCLE_STEP(DESTRUCTOR);
// Test entry point: gtest consumes its own flags first, then gflags parses
// the remainder (glog/benchmark flags) before running all tests.
663 int main(int argc, char ** argv) {
664 testing::InitGoogleTest(&argc, argv);
665 gflags::ParseCommandLineFlags(&argc, &argv, true);
666 return RUN_ALL_TESTS();