2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <gtest/gtest.h>
19 #include <boost/thread/barrier.hpp>
21 #include <folly/futures/Future.h>
25 using namespace folly;
27 typedef FutureException eggs_t;
28 static eggs_t eggs("eggs");
32 auto fn = [](std::vector<int> input, size_t window_size, size_t expect) {
36 [](int i) { return makeFuture(i); },
39 [](int sum, const Try<int>& b) {
42 EXPECT_EQ(expect, res);
45 // 2 in-flight at a time
46 std::vector<int> input = {1, 2, 3};
50 // 4 in-flight at a time
51 std::vector<int> input = {1, 2, 3};
56 std::vector<int> input;
60 // int -> Future<void>
63 std::vector<int>({1, 2, 3}),
64 [](int i) { return makeFuture(); },
67 [](int sum, const Try<void>& b) {
68 EXPECT_TRUE(b.hasValue());
74 // string -> return Future<int>
77 std::vector<std::string>{"1", "2", "3"},
78 [](std::string s) { return makeFuture<int>(folly::to<int>(s)); },
81 [](int sum, const Try<int>& b) {
88 TEST(Window, parallel) {
89 std::vector<int> input;
90 std::vector<Promise<int>> ps(10);
91 for (size_t i = 0; i < ps.size(); i++) {
92 input.emplace_back(i);
94 auto f = collect(window(input, [&](int i) {
95 return ps[i].getFuture();
98 std::vector<std::thread> ts;
99 boost::barrier barrier(ps.size() + 1);
100 for (size_t i = 0; i < ps.size(); i++) {
101 ts.emplace_back([&ps, &barrier, i]() {
109 for (size_t i = 0; i < ps.size(); i++) {
113 EXPECT_TRUE(f.isReady());
114 for (size_t i = 0; i < ps.size(); i++) {
115 EXPECT_EQ(i, f.value()[i]);
119 TEST(Window, parallelWithError) {
120 std::vector<int> input;
121 std::vector<Promise<int>> ps(10);
122 for (size_t i = 0; i < ps.size(); i++) {
123 input.emplace_back(i);
125 auto f = collect(window(input, [&](int i) {
126 return ps[i].getFuture();
129 std::vector<std::thread> ts;
130 boost::barrier barrier(ps.size() + 1);
131 for (size_t i = 0; i < ps.size(); i++) {
132 ts.emplace_back([&ps, &barrier, i]() {
134 if (i == (ps.size()/2)) {
135 ps[i].setException(eggs);
144 for (size_t i = 0; i < ps.size(); i++) {
148 EXPECT_TRUE(f.isReady());
149 EXPECT_THROW(f.value(), eggs_t);
152 TEST(Window, allParallelWithError) {
153 std::vector<int> input;
154 std::vector<Promise<int>> ps(10);
155 for (size_t i = 0; i < ps.size(); i++) {
156 input.emplace_back(i);
158 auto f = collectAll(window(input, [&](int i) {
159 return ps[i].getFuture();
162 std::vector<std::thread> ts;
163 boost::barrier barrier(ps.size() + 1);
164 for (size_t i = 0; i < ps.size(); i++) {
165 ts.emplace_back([&ps, &barrier, i]() {
167 if (i == (ps.size()/2)) {
168 ps[i].setException(eggs);
177 for (size_t i = 0; i < ps.size(); i++) {
181 EXPECT_TRUE(f.isReady());
182 for (size_t i = 0; i < ps.size(); i++) {
183 if (i == (ps.size()/2)) {
184 EXPECT_THROW(f.value()[i].value(), eggs_t);
186 EXPECT_TRUE(f.value()[i].hasValue());
187 EXPECT_EQ(i, f.value()[i].value());