/*
 * Copyright 2014 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <folly/MPMCQueue.h>

#include <folly/Format.h>
#include <folly/Memory.h>
#include <folly/test/DeterministicSchedule.h>

#include <boost/intrusive_ptr.hpp>

#include <atomic>
#include <functional>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include <sys/resource.h>
#include <sys/time.h>
#include <unistd.h>

#include <gflags/gflags.h>
#include <gtest/gtest.h>
34 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
36 using namespace folly;
37 using namespace detail;
40 typedef DeterministicSchedule DSched;
42 template <template<typename> class Atom>
43 void run_mt_sequencer_thread(
47 TurnSequencer<Atom>& seq,
48 Atom<int>& spinThreshold,
51 for (int op = i; op < numOps; op += numThreads) {
52 seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
53 EXPECT_EQ(prev, op - 1);
55 seq.completeTurn(init + op);
59 template <template<typename> class Atom>
60 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
61 TurnSequencer<Atom> seq(init);
62 Atom<int> spinThreshold(0);
65 std::vector<std::thread> threads(numThreads);
66 for (int i = 0; i < numThreads; ++i) {
67 threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
68 numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
72 for (auto& thr : threads) {
76 EXPECT_EQ(prev, numOps - 1);
79 TEST(MPMCQueue, sequencer) {
80 run_mt_sequencer_test<std::atomic>(1, 100, 0);
81 run_mt_sequencer_test<std::atomic>(2, 100000, -100);
82 run_mt_sequencer_test<std::atomic>(100, 10000, -100);
85 TEST(MPMCQueue, sequencer_deterministic) {
86 DSched sched(DSched::uniform(0));
87 run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
88 run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
89 run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
93 void runElementTypeTest(T&& src) {
95 cq.blockingWrite(std::move(src));
97 cq.blockingRead(dest);
98 EXPECT_TRUE(cq.write(std::move(dest)));
99 EXPECT_TRUE(cq.read(dest));
// Intrusively ref-counted test element. `active_instances` is thread-local
// so each test thread can independently assert that every instance it
// created has been destroyed.
struct RefCounted {
  static __thread int active_instances;

  // mutable so boost::intrusive_ptr<const RefCounted> can adjust the count
  mutable std::atomic<int> rc;

  RefCounted() : rc(0) {
    ++active_instances;
  }

  ~RefCounted() {
    --active_instances;
  }
};
__thread int RefCounted::active_instances;

// boost::intrusive_ptr hooks (found via ADL).
void intrusive_ptr_add_ref(RefCounted const* p) {
  p->rc++;
}

void intrusive_ptr_release(RefCounted const* p) {
  if (--(p->rc) == 0) {
    delete p;
  }
}
128 TEST(MPMCQueue, lots_of_element_types) {
129 runElementTypeTest(10);
130 runElementTypeTest(std::string("abc"));
131 runElementTypeTest(std::make_pair(10, std::string("def")));
132 runElementTypeTest(std::vector<std::string>{ { "abc" } });
133 runElementTypeTest(std::make_shared<char>('a'));
134 runElementTypeTest(folly::make_unique<char>('a'));
135 runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
136 EXPECT_EQ(RefCounted::active_instances, 0);
139 TEST(MPMCQueue, single_thread_enqdeq) {
140 MPMCQueue<int> cq(10);
142 for (int pass = 0; pass < 10; ++pass) {
143 for (int i = 0; i < 10; ++i) {
144 EXPECT_TRUE(cq.write(i));
146 EXPECT_FALSE(cq.write(-1));
147 EXPECT_FALSE(cq.isEmpty());
148 EXPECT_EQ(cq.size(), 10);
150 for (int i = 0; i < 5; ++i) {
152 EXPECT_TRUE(cq.read(dest));
155 for (int i = 5; i < 10; ++i) {
157 cq.blockingRead(dest);
161 EXPECT_FALSE(cq.read(dest));
164 EXPECT_TRUE(cq.isEmpty());
165 EXPECT_EQ(cq.size(), 0);
169 TEST(MPMCQueue, tryenq_capacity_test) {
170 for (size_t cap = 1; cap < 100; ++cap) {
171 MPMCQueue<int> cq(cap);
172 for (int i = 0; i < cap; ++i) {
173 EXPECT_TRUE(cq.write(i));
175 EXPECT_FALSE(cq.write(100));
179 TEST(MPMCQueue, enq_capacity_test) {
180 for (auto cap : { 1, 100, 10000 }) {
181 MPMCQueue<int> cq(cap);
182 for (int i = 0; i < cap; ++i) {
187 auto thr = std::thread([&]{
188 cq.blockingWrite(100);
194 cq.blockingRead(dummy);
200 template <template<typename> class Atom>
201 void runTryEnqDeqThread(
204 MPMCQueue<int, Atom>& cq,
205 std::atomic<uint64_t>& sum,
207 uint64_t threadSum = 0;
209 // received doesn't reflect any actual values, we just start with
210 // t and increment by numThreads to get the rounding of termination
211 // correct if numThreads doesn't evenly divide numOps
213 while (src < n || received < n) {
214 if (src < n && cq.write(src)) {
219 if (received < n && cq.read(dst)) {
220 received += numThreads;
227 template <template<typename> class Atom>
228 void runTryEnqDeqTest(int numThreads, int numOps) {
229 // write and read aren't linearizable, so we don't have
230 // hard guarantees on their individual behavior. We can still test
231 // correctness in aggregate
232 MPMCQueue<int,Atom> cq(numThreads);
235 std::vector<std::thread> threads(numThreads);
236 std::atomic<uint64_t> sum(0);
237 for (int t = 0; t < numThreads; ++t) {
238 threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
239 numThreads, n, std::ref(cq), std::ref(sum), t));
241 for (auto& t : threads) {
244 EXPECT_TRUE(cq.isEmpty());
245 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
248 TEST(MPMCQueue, mt_try_enq_deq) {
249 int nts[] = { 1, 3, 100 };
253 runTryEnqDeqTest<std::atomic>(nt, n);
257 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
258 int nts[] = { 3, 10 };
261 LOG(INFO) << "using seed " << seed;
266 DSched sched(DSched::uniform(seed));
267 runTryEnqDeqTest<DeterministicAtomic>(nt, n);
270 DSched sched(DSched::uniformSubset(seed, 2));
271 runTryEnqDeqTest<DeterministicAtomic>(nt, n);
// Current wall-clock time in microseconds since the epoch (gettimeofday).
// Used only for coarse benchmark timing below.
uint64_t nowMicro() {
  struct timeval tv;
  gettimeofday(&tv, 0);
  return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
}
282 template <typename Q>
283 std::string producerConsumerBench(Q&& queue, std::string qName,
284 int numProducers, int numConsumers,
285 int numOps, bool ignoreContents = false) {
288 struct rusage beginUsage;
289 getrusage(RUSAGE_SELF, &beginUsage);
291 auto beginMicro = nowMicro();
294 std::atomic<uint64_t> sum(0);
296 std::vector<std::thread> producers(numProducers);
297 for (int t = 0; t < numProducers; ++t) {
298 producers[t] = DSched::thread([&,t]{
299 for (int i = t; i < numOps; i += numProducers) {
305 std::vector<std::thread> consumers(numConsumers);
306 for (int t = 0; t < numConsumers; ++t) {
307 consumers[t] = DSched::thread([&,t]{
308 uint64_t localSum = 0;
309 for (int i = t; i < numOps; i += numConsumers) {
311 q.blockingRead(dest);
312 EXPECT_FALSE(dest == -1);
319 for (auto& t : producers) {
322 for (auto& t : consumers) {
325 if (!ignoreContents) {
326 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
329 auto endMicro = nowMicro();
331 struct rusage endUsage;
332 getrusage(RUSAGE_SELF, &endUsage);
334 uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
335 long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
336 (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
338 return folly::format(
339 "{}, {} producers, {} consumers => {} nanos/handoff, {} csw / {} handoff",
340 qName, numProducers, numConsumers, nanosPer, csw, n).str();
344 TEST(MPMCQueue, mt_prod_cons_deterministic) {
345 // we use the Bench method, but perf results are meaningless under DSched
346 DSched sched(DSched::uniform(0));
348 producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
350 producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
352 producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
354 producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
356 producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(1),
360 #define PC_BENCH(q, np, nc, ...) \
361 producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
363 TEST(MPMCQueue, mt_prod_cons) {
365 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n);
366 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n);
367 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n);
368 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n);
369 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n);
370 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n);
371 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n);
372 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n);
373 LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n);
376 template <template<typename> class Atom>
377 void runNeverFailThread(
380 MPMCQueue<int, Atom>& cq,
381 std::atomic<uint64_t>& sum,
383 uint64_t threadSum = 0;
384 for (int i = t; i < n; i += numThreads) {
386 EXPECT_TRUE(cq.writeIfNotFull(i));
389 EXPECT_TRUE(cq.readIfNotEmpty(dest));
390 EXPECT_TRUE(dest >= 0);
396 template <template<typename> class Atom>
397 uint64_t runNeverFailTest(int numThreads, int numOps) {
398 // always #enq >= #deq
399 MPMCQueue<int,Atom> cq(numThreads);
402 auto beginMicro = nowMicro();
404 std::vector<std::thread> threads(numThreads);
405 std::atomic<uint64_t> sum(0);
406 for (int t = 0; t < numThreads; ++t) {
407 threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>,
408 numThreads, n, std::ref(cq), std::ref(sum), t));
410 for (auto& t : threads) {
413 EXPECT_TRUE(cq.isEmpty());
414 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
416 return nowMicro() - beginMicro;
419 TEST(MPMCQueue, mt_never_fail) {
420 int nts[] = { 1, 3, 100 };
424 uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n);
425 LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with "
430 TEST(MPMCQueue, mt_never_fail_deterministic) {
431 int nts[] = { 3, 10 };
433 long seed = 0; // nowMicro() % 10000;
434 LOG(INFO) << "using seed " << seed;
439 DSched sched(DSched::uniform(seed));
440 runNeverFailTest<DeterministicAtomic>(nt, n);
443 DSched sched(DSched::uniformSubset(seed, 2));
444 runNeverFailTest<DeterministicAtomic>(nt, n);
449 enum LifecycleEvent {
461 static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
462 static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
464 static int lc_outstanding() {
465 return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
466 lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
467 lc_counts[DESTRUCTOR];
470 static void lc_snap() {
471 for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
472 lc_prev[i] = lc_counts[i];
476 #define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)
478 static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
479 for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
480 int delta = i == what || i == what2 ? 1 : 0;
481 EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
482 << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
483 << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
484 << ", from line " << lineno;
489 template <typename R>
491 typedef R IsRelocatable;
495 Lifecycle() noexcept : constructed(true) {
496 ++lc_counts[DEFAULT_CONSTRUCTOR];
499 explicit Lifecycle(int n, char const* s) noexcept : constructed(true) {
500 ++lc_counts[TWO_ARG_CONSTRUCTOR];
503 Lifecycle(const Lifecycle& rhs) noexcept : constructed(true) {
504 ++lc_counts[COPY_CONSTRUCTOR];
507 Lifecycle(Lifecycle&& rhs) noexcept : constructed(true) {
508 ++lc_counts[MOVE_CONSTRUCTOR];
511 Lifecycle& operator= (const Lifecycle& rhs) noexcept {
512 ++lc_counts[COPY_OPERATOR];
516 Lifecycle& operator= (Lifecycle&& rhs) noexcept {
517 ++lc_counts[MOVE_OPERATOR];
521 ~Lifecycle() noexcept {
522 ++lc_counts[DESTRUCTOR];
523 assert(lc_outstanding() >= 0);
529 template <typename R>
530 void runPerfectForwardingTest() {
532 EXPECT_EQ(lc_outstanding(), 0);
535 MPMCQueue<Lifecycle<R>> queue(50);
536 LIFECYCLE_STEP(NOTHING);
538 for (int pass = 0; pass < 10; ++pass) {
539 for (int i = 0; i < 10; ++i) {
540 queue.blockingWrite();
541 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
543 queue.blockingWrite(1, "one");
544 LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
548 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
549 queue.blockingWrite(std::move(src));
550 LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
552 LIFECYCLE_STEP(DESTRUCTOR);
556 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
557 queue.blockingWrite(src);
558 LIFECYCLE_STEP(COPY_CONSTRUCTOR);
560 LIFECYCLE_STEP(DESTRUCTOR);
562 EXPECT_TRUE(queue.write());
563 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
566 EXPECT_EQ(queue.size(), 50);
567 EXPECT_FALSE(queue.write(2, "two"));
568 LIFECYCLE_STEP(NOTHING);
570 for (int i = 0; i < 50; ++i) {
573 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
575 queue.blockingRead(node);
577 // relocatable, moved via memcpy
578 LIFECYCLE_STEP(DESTRUCTOR);
580 LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
583 LIFECYCLE_STEP(DESTRUCTOR);
586 EXPECT_EQ(queue.size(), 0);
589 // put one element back before destruction
591 Lifecycle<R> src(3, "three");
592 LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
593 queue.write(std::move(src));
594 LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
596 LIFECYCLE_STEP(DESTRUCTOR); // destroy src
598 LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
600 EXPECT_EQ(lc_outstanding(), 0);
603 TEST(MPMCQueue, perfect_forwarding) {
604 runPerfectForwardingTest<std::false_type>();
607 TEST(MPMCQueue, perfect_forwarding_relocatable) {
608 runPerfectForwardingTest<std::true_type>();
611 TEST(MPMCQueue, queue_moving) {
613 EXPECT_EQ(lc_outstanding(), 0);
616 MPMCQueue<Lifecycle<std::false_type>> a(50);
617 LIFECYCLE_STEP(NOTHING);
620 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
623 MPMCQueue<Lifecycle<std::false_type>> b = std::move(a);
624 LIFECYCLE_STEP(NOTHING);
625 EXPECT_EQ(a.capacity(), 0);
626 EXPECT_EQ(a.size(), 0);
627 EXPECT_EQ(b.capacity(), 50);
628 EXPECT_EQ(b.size(), 1);
631 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
634 MPMCQueue<Lifecycle<std::false_type>> c;
635 LIFECYCLE_STEP(NOTHING);
637 LIFECYCLE_STEP(NOTHING);
638 EXPECT_EQ(c.capacity(), 50);
639 EXPECT_EQ(c.size(), 2);
642 Lifecycle<std::false_type> dst;
643 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
645 LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
649 MPMCQueue<Lifecycle<std::false_type>> d(10);
650 LIFECYCLE_STEP(NOTHING);
652 LIFECYCLE_STEP(NOTHING);
653 EXPECT_EQ(c.capacity(), 10);
654 EXPECT_TRUE(c.isEmpty());
655 EXPECT_EQ(d.capacity(), 50);
656 EXPECT_EQ(d.size(), 1);
659 LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
661 c.blockingWrite(dst);
662 LIFECYCLE_STEP(COPY_CONSTRUCTOR);
664 d.blockingWrite(std::move(dst));
665 LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
666 } // d goes out of scope
667 LIFECYCLE_STEP(DESTRUCTOR);
668 } // dst goes out of scope
669 LIFECYCLE_STEP(DESTRUCTOR);
670 } // c goes out of scope
671 LIFECYCLE_STEP(DESTRUCTOR);
674 int main(int argc, char ** argv) {
675 testing::InitGoogleTest(&argc, argv);
676 gflags::ParseCommandLineFlags(&argc, &argv, true);
677 return RUN_ALL_TESTS();