2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/MPMCPipeline.h>
22 #include <glog/logging.h>
23 #include <gtest/gtest.h>
25 #include <folly/Conv.h>
27 namespace folly { namespace test {
// Single-element smoke test: checks sizeGuess() around each visible pipeline
// step and that the string written at stage 0 is the value compared at the end.
// NOTE(review): several statements of this test (the initial write, the
// declarations of `val` and `s`, the final read, the closing brace) are not
// visible in this excerpt -- confirm against the full file.
29 TEST(MPMCPipeline, Trivial) {
30 MPMCPipeline<int, std::string> a(2, 2);
// Checked immediately after construction: the size estimate must be 0.
31 EXPECT_EQ(0, a.sizeGuess());
// Presumably one value was enqueued just before this check (the statement is
// not visible here) -- TODO confirm.
33 EXPECT_EQ(1, a.sizeGuess());
// Read at stage 0 into `val`; the returned ticket is handed to the matching
// blockingWriteStage<0> call below.
36 auto ticket = a.blockingReadStage<0>(val);
// The expected size estimate is still 1 after the stage-0 read.
38 EXPECT_EQ(1, a.sizeGuess());
// Complete stage 0 for this ticket with the string output.
40 a.blockingWriteStage<0>(ticket, "hello world");
// ... and still 1 after the stage-0 write.
41 EXPECT_EQ(1, a.sizeGuess());
// `s` is presumably filled by a final read that is not visible here; once it
// holds the stage-0 output the size estimate is expected to be back at 0.
46 EXPECT_EQ("hello world", s);
47 EXPECT_EQ(0, a.sizeGuess());
// Same shape as the Trivial test, but stage 0 is declared as
// MPMCPipelineStage<std::string, 2> and the one ticket obtained from the
// stage-0 read is used for two stage-0 writes.
// NOTE(review): presumably the `2` is the stage's amplification factor (two
// outputs per input) -- confirm in MPMCPipeline.h. The initial write, the
// declarations of `val` and `s`, the final reads and the closing brace are not
// visible in this excerpt.
50 TEST(MPMCPipeline, TrivialAmplification) {
51 MPMCPipeline<int, MPMCPipelineStage<std::string, 2>> a(2, 2);
// Checked immediately after construction: the size estimate must be 0.
52 EXPECT_EQ(0, a.sizeGuess());
// The expected estimate is 2 here, not 1; presumably a single input was
// enqueued just before (statement not visible) and is counted twice -- TODO
// confirm.
54 EXPECT_EQ(2, a.sizeGuess());
// Read at stage 0 into `val`; the ticket is reused by both writes below.
57 auto ticket = a.blockingReadStage<0>(val);
59 EXPECT_EQ(2, a.sizeGuess());
// First stage-0 write for this ticket; the expected estimate stays at 2.
61 a.blockingWriteStage<0>(ticket, "hello world");
62 EXPECT_EQ(2, a.sizeGuess());
// Second stage-0 write with the same ticket; the expected estimate stays at 2.
63 a.blockingWriteStage<0>(ticket, "goodbye");
64 EXPECT_EQ(2, a.sizeGuess());
// The two outputs are expected in the order they were written, and the
// expected size estimate drops by one after each (the reads that fill `s` are
// not visible here).
69 EXPECT_EQ("hello world", s);
70 EXPECT_EQ(1, a.sizeGuess());
73 EXPECT_EQ("goodbye", s);
74 EXPECT_EQ(0, a.sizeGuess());
// End-to-end test: numThreadsPerStage worker threads on each of the two stages
// plus one consumer thread. The final checks require numValues results, each
// equal to "<i> hello world" at index i, i.e. in input order.
// NOTE(review): many statements (loop bodies, declarations of `val` and `tmp`,
// the thread joins, closing braces) are not visible in this excerpt -- confirm
// details against the full file.
77 TEST(MPMCPipeline, MultiThreaded) {
78 constexpr size_t numThreadsPerStage = 6;
79 MPMCPipeline<int, std::string, std::string> a(5, 5, 5);
// Room for two groups of numThreadsPerStage workers plus the single consumer
// thread added further down.
81 std::vector<std::thread> threads;
82 threads.reserve(numThreadsPerStage * 2 + 1);
// Stage-0 workers: read an int, write a string for the same ticket.
83 for (size_t i = 0; i < numThreadsPerStage; ++i) {
84 threads.emplace_back([&a, i] () {
87 auto ticket = a.blockingReadStage<0>(val);
// -1 is treated as the stop marker; it is forwarded to the next stage as an
// empty string.
88 if (val == -1) { // stop
89 // We still need to propagate
90 a.blockingWriteStage<0>(ticket, "");
// Regular value: forward the value converted to a string with " hello"
// appended.
93 a.blockingWriteStage<0>(
94 ticket, folly::to<std::string>(val, " hello"));
// Stage-1 workers: read the stage-0 string, write a string for the same
// ticket.
99 for (size_t i = 0; i < numThreadsPerStage; ++i) {
100 threads.emplace_back([&a, i] () {
103 auto ticket = a.blockingReadStage<1>(val);
// An empty string is the stop marker at this stage; it is forwarded unchanged.
104 if (val.empty()) { // stop
105 // We still need to propagate
106 a.blockingWriteStage<1>(ticket, "");
// Regular value: append " world".
109 a.blockingWriteStage<1>(
110 ticket, folly::to<std::string>(val, " world"));
// Single consumer thread: appends the values it obtains to `results`.
// NOTE(review): the read and the stop condition are not visible here;
// judging by the comment before the sizeGuess() check below, it stops at the
// first empty string -- confirm.
115 std::vector<std::string> results;
116 threads.emplace_back([&a, &results] () {
123 results.push_back(val);
// Producer loop over numValues items (body not visible; presumably writes i
// into the pipeline -- TODO confirm).
127 constexpr size_t numValues = 1000;
128 for (size_t i = 0; i < numValues; ++i) {
// One iteration per stage thread (body not visible; presumably enqueues one
// stop marker per worker -- TODO confirm).
131 for (size_t i = 0; i < numThreadsPerStage; ++i) {
// Visits every thread (body not visible; presumably joins it -- TODO confirm).
135 for (auto& t : threads) {
139 // The consumer thread dequeued the first empty string, there should be
140 // numThreadsPerStage - 1 left.
141 EXPECT_EQ(numThreadsPerStage - 1, a.sizeGuess());
// Each of the numThreadsPerStage - 1 remaining entries must be an empty
// string (the statement that fills `val` is not visible here).
142 for (size_t i = 0; i < numThreadsPerStage - 1; ++i) {
145 EXPECT_TRUE(val.empty());
// A further read is expected to fail, and the size estimate to be 0.
149 EXPECT_FALSE(a.read(tmp));
151 EXPECT_EQ(0, a.sizeGuess());
// Exactly numValues results, and results[i] must be "<i> hello world".
153 EXPECT_EQ(numValues, results.size());
154 for (size_t i = 0; i < results.size(); ++i) {
155 EXPECT_EQ(folly::to<std::string>(i, " hello world"), results[i]);
// Test binary entry point: hands the command line to GoogleTest, then to
// gflags, and returns the result of RUN_ALL_TESTS() to the caller.
// NOTE(review): the closing brace of this function is not visible in this
// excerpt.
161 int main(int argc, char *argv[]) {
162 testing::InitGoogleTest(&argc, argv);
// NOTE(review): per the gflags documentation the third argument (`true`)
// should make gflags remove the flags it parsed from argv -- confirm.
163 gflags::ParseCommandLineFlags(&argc, &argv, true);
164 return RUN_ALL_TESTS();