2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <boost/thread/barrier.hpp>
19 #include <folly/Conv.h>
20 #include <folly/futures/Future.h>
21 #include <folly/futures/ManualExecutor.h>
22 #include <folly/portability/GTest.h>
26 using namespace folly;
28 typedef FutureException eggs_t;
29 static eggs_t eggs("eggs");
33 auto fn = [](std::vector<int> input, size_t window_size, size_t expect) {
37 [](int i) { return makeFuture(i); },
40 [](int sum, const Try<int>& b) {
43 EXPECT_EQ(expect, res);
46 SCOPED_TRACE("2 in-flight at a time");
47 std::vector<int> input = {1, 2, 3};
51 SCOPED_TRACE("4 in-flight at a time");
52 std::vector<int> input = {1, 2, 3};
56 SCOPED_TRACE("empty input");
57 std::vector<int> input;
61 // int -> Future<Unit>
62 auto res = reduce(window(std::vector<int>({1, 2, 3}),
63 [](int /* i */) { return makeFuture(); },
66 [](int sum, const Try<Unit>& b) {
67 EXPECT_TRUE(b.hasValue());
73 // string -> return Future<int>
76 std::vector<std::string>{"1", "2", "3"},
77 [](std::string s) { return makeFuture<int>(folly::to<int>(s)); },
80 [](int sum, const Try<int>& b) {
87 TEST(Window, parallel) {
88 std::vector<int> input;
89 std::vector<Promise<int>> ps(10);
90 for (size_t i = 0; i < ps.size(); i++) {
91 input.emplace_back(i);
93 auto f = collect(window(input, [&](int i) {
94 return ps[i].getFuture();
97 std::vector<std::thread> ts;
98 boost::barrier barrier(ps.size() + 1);
99 for (size_t i = 0; i < ps.size(); i++) {
100 ts.emplace_back([&ps, &barrier, i]() {
112 EXPECT_TRUE(f.isReady());
113 for (size_t i = 0; i < ps.size(); i++) {
114 EXPECT_EQ(i, f.value()[i]);
118 TEST(Window, parallelWithError) {
119 std::vector<int> input;
120 std::vector<Promise<int>> ps(10);
121 for (size_t i = 0; i < ps.size(); i++) {
122 input.emplace_back(i);
124 auto f = collect(window(input, [&](int i) {
125 return ps[i].getFuture();
128 std::vector<std::thread> ts;
129 boost::barrier barrier(ps.size() + 1);
130 for (size_t i = 0; i < ps.size(); i++) {
131 ts.emplace_back([&ps, &barrier, i]() {
133 if (i == (ps.size()/2)) {
134 ps[i].setException(eggs);
147 EXPECT_TRUE(f.isReady());
148 EXPECT_THROW(f.value(), eggs_t);
151 TEST(Window, allParallelWithError) {
152 std::vector<int> input;
153 std::vector<Promise<int>> ps(10);
154 for (size_t i = 0; i < ps.size(); i++) {
155 input.emplace_back(i);
157 auto f = collectAll(window(input, [&](int i) {
158 return ps[i].getFuture();
161 std::vector<std::thread> ts;
162 boost::barrier barrier(ps.size() + 1);
163 for (size_t i = 0; i < ps.size(); i++) {
164 ts.emplace_back([&ps, &barrier, i]() {
166 if (i == (ps.size()/2)) {
167 ps[i].setException(eggs);
180 EXPECT_TRUE(f.isReady());
181 for (size_t i = 0; i < ps.size(); i++) {
182 if (i == (ps.size()/2)) {
183 EXPECT_THROW(f.value()[i].value(), eggs_t);
185 EXPECT_TRUE(f.value()[i].hasValue());
186 EXPECT_EQ(i, f.value()[i].value());
191 TEST(WindowExecutor, basic) {
192 ManualExecutor executor;
194 // int -> Future<int>
195 auto fn = [executor_ = &executor](
196 std::vector<int> input, size_t window_size, size_t expect) {
199 executor_, input, [](int i) { return makeFuture(i); }, window_size),
201 [](int sum, const Try<int>& b) { return sum + *b; });
202 executor_->waitFor(res);
203 EXPECT_EQ(expect, res.get());
206 SCOPED_TRACE("2 in-flight at a time");
207 std::vector<int> input = {1, 2, 3};
211 SCOPED_TRACE("4 in-flight at a time");
212 std::vector<int> input = {1, 2, 3};
216 SCOPED_TRACE("empty input");
217 std::vector<int> input;
221 // int -> Future<Unit>
225 std::vector<int>({1, 2, 3}),
226 [](int /* i */) { return makeFuture(); },
229 [](int sum, const Try<Unit>& b) {
230 EXPECT_TRUE(b.hasValue());
233 executor.waitFor(res);
234 EXPECT_EQ(3, res.get());
237 // string -> return Future<int>
241 std::vector<std::string>{"1", "2", "3"},
242 [](std::string s) { return makeFuture<int>(folly::to<int>(s)); },
245 [](int sum, const Try<int>& b) { return sum + *b; });
246 executor.waitFor(res);
247 EXPECT_EQ(6, res.get());
251 TEST(WindowExecutor, parallel) {
252 ManualExecutor executor;
254 std::vector<int> input;
255 std::vector<Promise<int>> ps(10);
256 for (size_t i = 0; i < ps.size(); i++) {
257 input.emplace_back(i);
260 window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
262 std::vector<std::thread> ts;
263 boost::barrier barrier(ps.size() + 1);
264 for (size_t i = 0; i < ps.size(); i++) {
265 ts.emplace_back([&ps, &barrier, i]() {
278 EXPECT_TRUE(f.isReady());
279 for (size_t i = 0; i < ps.size(); i++) {
280 EXPECT_EQ(i, f.value()[i]);
284 TEST(WindowExecutor, parallelWithError) {
285 ManualExecutor executor;
287 std::vector<int> input;
288 std::vector<Promise<int>> ps(10);
289 for (size_t i = 0; i < ps.size(); i++) {
290 input.emplace_back(i);
293 window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
295 std::vector<std::thread> ts;
296 boost::barrier barrier(ps.size() + 1);
297 for (size_t i = 0; i < ps.size(); i++) {
298 ts.emplace_back([&ps, &barrier, i]() {
300 if (i == (ps.size() / 2)) {
301 ps[i].setException(eggs);
315 EXPECT_TRUE(f.isReady());
316 EXPECT_THROW(f.value(), eggs_t);
319 TEST(WindowExecutor, allParallelWithError) {
320 ManualExecutor executor;
322 std::vector<int> input;
323 std::vector<Promise<int>> ps(10);
324 for (size_t i = 0; i < ps.size(); i++) {
325 input.emplace_back(i);
328 window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
330 std::vector<std::thread> ts;
331 boost::barrier barrier(ps.size() + 1);
332 for (size_t i = 0; i < ps.size(); i++) {
333 ts.emplace_back([&ps, &barrier, i]() {
335 if (i == (ps.size() / 2)) {
336 ps[i].setException(eggs);
350 EXPECT_TRUE(f.isReady());
351 for (size_t i = 0; i < ps.size(); i++) {
352 if (i == (ps.size() / 2)) {
353 EXPECT_THROW(f.value()[i].value(), eggs_t);
355 EXPECT_TRUE(f.value()[i].hasValue());
356 EXPECT_EQ(i, f.value()[i].value());