// collectAll (variadic)
template <typename... Fs>
-typename detail::VariadicContext<
+typename detail::CollectAllVariadicContext<
typename std::decay<Fs>::type::value_type...>::type
collectAll(Fs&&... fs) {
- auto ctx = std::make_shared<detail::VariadicContext<
+ auto ctx = std::make_shared<detail::CollectAllVariadicContext<
typename std::decay<Fs>::type::value_type...>>();
- detail::collectAllVariadicHelper(ctx,
- std::forward<typename std::decay<Fs>::type>(fs)...);
+ detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
+ ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
return ctx->p.getFuture();
}
return ctx->p.getFuture();
}
+// collect (iterator)
+
namespace detail {
template <typename T>
return ctx->p.getFuture();
}
+// collect (variadic)
+
+template <typename... Fs>
+typename detail::CollectVariadicContext<
+ typename std::decay<Fs>::type::value_type...>::type
+collect(Fs&&... fs) {
+ auto ctx = std::make_shared<detail::CollectVariadicContext<
+ typename std::decay<Fs>::type::value_type...>>();
+ detail::collectVariadicHelper<detail::CollectVariadicContext>(
+ ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
+ return ctx->p.getFuture();
+}
+
+// collectAny (iterator)
+
template <class InputIterator>
Future<
std::pair<size_t,
return ctx->p.getFuture();
}
+// collectN (iterator)
+
template <class InputIterator>
Future<std::vector<std::pair<size_t, Try<typename
std::iterator_traits<InputIterator>::value_type::value_type>>>>
return ctx->p.getFuture();
}
+// reduce (iterator)
+
template <class It, class T, class F>
Future<T> reduce(It first, It last, T&& initial, F&& func) {
if (first == last) {
return f;
}
+// window (collection)
+
template <class Collection, class F, class ItT, class Result>
std::vector<Future<Result>>
window(Collection input, F func, size_t n) {
return futures;
}
+// reduce
+
template <class T>
template <class I, class F>
Future<I> Future<T>::reduce(I&& initial, F&& func) {
});
}
+// unorderedReduce (iterator)
+
template <class It, class T, class F, class ItT, class Arg>
Future<T> unorderedReduce(It first, It last, T initial, F func) {
if (first == last) {
return ctx->promise_.getFuture();
}
+// within
+
template <class T>
Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
return within(dur, TimedOut(), tk);
return ctx->promise.getFuture().via(getExecutor());
}
+// delayed
+
template <class T>
Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
return collectAll(*this, futures::sleep(dur, tk))
namespace detail {
template <class> class Core;
-template <class...> struct VariadicContext;
+template <class...> struct CollectAllVariadicContext;
+template <class...> struct CollectVariadicContext;
template <class> struct CollectContext;
template<typename F, typename... Args>
};
template <typename... Ts>
-struct VariadicContext {
- VariadicContext() {}
- ~VariadicContext() {
+struct CollectAllVariadicContext {
+ CollectAllVariadicContext() {}
+ template <typename T, size_t I>
+ inline void setPartialResult(Try<T>& t) {
+ std::get<I>(results) = std::move(t);
+ }
+ ~CollectAllVariadicContext() {
p.setValue(std::move(results));
}
- Promise<std::tuple<Try<Ts>... >> p;
- std::tuple<Try<Ts>... > results;
+ Promise<std::tuple<Try<Ts>...>> p;
+ std::tuple<Try<Ts>...> results;
typedef Future<std::tuple<Try<Ts>...>> type;
};
-template <typename... Ts, typename THead, typename... Fs>
-typename std::enable_if<sizeof...(Fs) == 0, void>::type
-collectAllVariadicHelper(std::shared_ptr<VariadicContext<Ts...>> ctx,
- THead&& head, Fs&&... tail) {
- head.setCallback_([ctx](Try<typename THead::value_type>&& t) {
- std::get<sizeof...(Ts) - sizeof...(Fs) - 1>(ctx->results) = std::move(t);
- });
+template <typename... Ts>
+struct CollectVariadicContext {
+ CollectVariadicContext() {}
+ template <typename T, size_t I>
+ inline void setPartialResult(Try<T>& t) {
+ if (t.hasException()) {
+ if (!threw.exchange(true)) {
+ p.setException(std::move(t.exception()));
+ }
+ } else if (!threw) {
+ std::get<I>(results) = std::move(t.value());
+ }
+ }
+ ~CollectVariadicContext() {
+ if (!threw.exchange(true)) {
+ p.setValue(std::move(results));
+ }
+ }
+ Promise<std::tuple<Ts...>> p;
+ std::tuple<Ts...> results;
+ std::atomic<bool> threw;
+ typedef Future<std::tuple<Ts...>> type;
+};
+
+template <template <typename ...> class T, typename... Ts>
+void collectVariadicHelper(const std::shared_ptr<T<Ts...>>& ctx) {
+ // base case
}
-template <typename... Ts, typename THead, typename... Fs>
-typename std::enable_if<sizeof...(Fs) != 0, void>::type
-collectAllVariadicHelper(std::shared_ptr<VariadicContext<Ts...>> ctx,
- THead&& head, Fs&&... tail) {
+template <template <typename ...> class T, typename... Ts,
+ typename THead, typename... TTail>
+void collectVariadicHelper(const std::shared_ptr<T<Ts...>>& ctx,
+ THead&& head, TTail&&... tail) {
head.setCallback_([ctx](Try<typename THead::value_type>&& t) {
- std::get<sizeof...(Ts) - sizeof...(Fs) - 1>(ctx->results) = std::move(t);
+ ctx->template setPartialResult<typename THead::value_type,
+ sizeof...(Ts) - sizeof...(TTail) - 1>(t);
});
// template tail-recursion
- collectAllVariadicHelper(ctx, std::forward<Fs>(tail)...);
+ collectVariadicHelper(ctx, std::forward<TTail>(tail)...);
}
}} // folly::detail
/// is a Future<std::tuple<Try<T1>, Try<T2>, ...>>.
/// The Futures are moved in, so your copies are invalid.
template <typename... Fs>
-typename detail::VariadicContext<
+typename detail::CollectAllVariadicContext<
typename std::decay<Fs>::type::value_type...>::type
collectAll(Fs&&... fs);
return collect(c.begin(), c.end());
}
+/// Like collectAll, but will short circuit on the first exception. Thus, the
+/// type of the returned Future is std::tuple<T1, T2, ...> instead of
+/// std::tuple<Try<T1>, Try<T2>, ...>
+template <typename... Fs>
+typename detail::CollectVariadicContext<
+ typename std::decay<Fs>::type::value_type...>::type
+collect(Fs&&... fs);
+
/** The result is a pair of the index of the first Future to complete and
the Try. If multiple Futures complete at the same time (or are already
complete when passed in), the "winner" is chosen non-deterministically.
#include <gtest/gtest.h>
+#include <boost/thread/barrier.hpp>
+
#include <folly/futures/Future.h>
#include <folly/Random.h>
#include <folly/small_vector.h>
}
}
+TEST(Collect, parallel) {
+ std::vector<Promise<int>> ps(10);
+ std::vector<Future<int>> fs;
+ for (size_t i = 0; i < ps.size(); i++) {
+ fs.emplace_back(ps[i].getFuture());
+ }
+ auto f = collect(fs);
+
+ std::vector<std::thread> ts;
+ boost::barrier barrier(ps.size() + 1);
+ for (size_t i = 0; i < ps.size(); i++) {
+ ts.emplace_back([&ps, &barrier, i]() {
+ barrier.wait();
+ ps[i].setValue(i);
+ });
+ }
+
+ barrier.wait();
+
+ for (size_t i = 0; i < ps.size(); i++) {
+ ts[i].join();
+ }
+
+ EXPECT_TRUE(f.isReady());
+ for (size_t i = 0; i < ps.size(); i++) {
+ EXPECT_EQ(i, f.value()[i]);
+ }
+}
+
+TEST(Collect, parallelWithError) {
+ std::vector<Promise<int>> ps(10);
+ std::vector<Future<int>> fs;
+ for (size_t i = 0; i < ps.size(); i++) {
+ fs.emplace_back(ps[i].getFuture());
+ }
+ auto f = collect(fs);
+
+ std::vector<std::thread> ts;
+ boost::barrier barrier(ps.size() + 1);
+ for (size_t i = 0; i < ps.size(); i++) {
+ ts.emplace_back([&ps, &barrier, i]() {
+ barrier.wait();
+ if (i == (ps.size()/2)) {
+ ps[i].setException(eggs);
+ } else {
+ ps[i].setValue(i);
+ }
+ });
+ }
+
+ barrier.wait();
+
+ for (size_t i = 0; i < ps.size(); i++) {
+ ts[i].join();
+ }
+
+ EXPECT_TRUE(f.isReady());
+ EXPECT_THROW(f.value(), eggs_t);
+}
+
+TEST(Collect, allParallel) {
+ std::vector<Promise<int>> ps(10);
+ std::vector<Future<int>> fs;
+ for (size_t i = 0; i < ps.size(); i++) {
+ fs.emplace_back(ps[i].getFuture());
+ }
+ auto f = collectAll(fs);
+
+ std::vector<std::thread> ts;
+ boost::barrier barrier(ps.size() + 1);
+ for (size_t i = 0; i < ps.size(); i++) {
+ ts.emplace_back([&ps, &barrier, i]() {
+ barrier.wait();
+ ps[i].setValue(i);
+ });
+ }
+
+ barrier.wait();
+
+ for (size_t i = 0; i < ps.size(); i++) {
+ ts[i].join();
+ }
+
+ EXPECT_TRUE(f.isReady());
+ for (size_t i = 0; i < ps.size(); i++) {
+ EXPECT_TRUE(f.value()[i].hasValue());
+ EXPECT_EQ(i, f.value()[i].value());
+ }
+}
+
+TEST(Collect, allParallelWithError) {
+ std::vector<Promise<int>> ps(10);
+ std::vector<Future<int>> fs;
+ for (size_t i = 0; i < ps.size(); i++) {
+ fs.emplace_back(ps[i].getFuture());
+ }
+ auto f = collectAll(fs);
+
+ std::vector<std::thread> ts;
+ boost::barrier barrier(ps.size() + 1);
+ for (size_t i = 0; i < ps.size(); i++) {
+ ts.emplace_back([&ps, &barrier, i]() {
+ barrier.wait();
+ if (i == (ps.size()/2)) {
+ ps[i].setException(eggs);
+ } else {
+ ps[i].setValue(i);
+ }
+ });
+ }
+
+ barrier.wait();
+
+ for (size_t i = 0; i < ps.size(); i++) {
+ ts[i].join();
+ }
+
+ EXPECT_TRUE(f.isReady());
+ for (size_t i = 0; i < ps.size(); i++) {
+ if (i == (ps.size()/2)) {
+ EXPECT_THROW(f.value()[i].value(), eggs_t);
+ } else {
+ EXPECT_TRUE(f.value()[i].hasValue());
+ EXPECT_EQ(i, f.value()[i].value());
+ }
+ }
+}
+
TEST(Collect, collectN) {
std::vector<Promise<void>> promises(10);
std::vector<Future<void>> futures;
EXPECT_TRUE(flag);
}
+TEST(Collect, collectAllVariadicWithException) {
+ Promise<bool> pb;
+ Promise<int> pi;
+ Future<bool> fb = pb.getFuture();
+ Future<int> fi = pi.getFuture();
+ bool flag = false;
+ collectAll(std::move(fb), std::move(fi))
+ .then([&](std::tuple<Try<bool>, Try<int>> tup) {
+ flag = true;
+ EXPECT_TRUE(std::get<0>(tup).hasValue());
+ EXPECT_EQ(std::get<0>(tup).value(), true);
+ EXPECT_TRUE(std::get<1>(tup).hasException());
+ EXPECT_THROW(std::get<1>(tup).value(), eggs_t);
+ });
+ pb.setValue(true);
+ EXPECT_FALSE(flag);
+ pi.setException(eggs);
+ EXPECT_TRUE(flag);
+}
+
+TEST(Collect, collectVariadic) {
+ Promise<bool> pb;
+ Promise<int> pi;
+ Future<bool> fb = pb.getFuture();
+ Future<int> fi = pi.getFuture();
+ bool flag = false;
+ collect(std::move(fb), std::move(fi))
+ .then([&](std::tuple<bool, int> tup) {
+ flag = true;
+ EXPECT_EQ(std::get<0>(tup), true);
+ EXPECT_EQ(std::get<1>(tup), 42);
+ });
+ pb.setValue(true);
+ EXPECT_FALSE(flag);
+ pi.setValue(42);
+ EXPECT_TRUE(flag);
+}
+
+TEST(Collect, collectVariadicWithException) {
+ Promise<bool> pb;
+ Promise<int> pi;
+ Future<bool> fb = pb.getFuture();
+ Future<int> fi = pi.getFuture();
+ bool flag = false;
+ auto f = collect(std::move(fb), std::move(fi));
+ pb.setValue(true);
+ EXPECT_FALSE(f.isReady());
+ pi.setException(eggs);
+ EXPECT_TRUE(f.isReady());
+ EXPECT_TRUE(f.getTry().hasException());
+ EXPECT_THROW(f.get(), eggs_t);
+}
+
TEST(Collect, collectAllNone) {
std::vector<Future<int>> fs;
auto f = collectAll(fs);
EXPECT_EQ(f.value(), "start;static;class-static;class");
}
+TEST(Future, thenStdFunction) {
+ {
+ std::function<int()> fn = [](){ return 42; };
+ auto f = makeFuture().then(std::move(fn));
+ EXPECT_EQ(f.value(), 42);
+ }
+ {
+ std::function<int(int)> fn = [](int i){ return i + 23; };
+ auto f = makeFuture(19).then(std::move(fn));
+ EXPECT_EQ(f.value(), 42);
+ }
+ {
+ std::function<int(Try<int>&)> fn = [](Try<int>& t){ return t.value() + 2; };
+ auto f = makeFuture(1).then(std::move(fn));
+ EXPECT_EQ(f.value(), 3);
+ }
+ {
+ bool flag = false;
+ std::function<void()> fn = [&flag](){ flag = true; };
+ auto f = makeFuture().then(std::move(fn));
+ EXPECT_TRUE(f.isReady());
+ EXPECT_TRUE(flag);
+ }
+}
+
TEST(Future, thenBind) {
auto l = []() {
return makeFuture("bind");
#include <gtest/gtest.h>
+#include <boost/thread/barrier.hpp>
+
#include <folly/futures/Future.h>
#include <vector>
using namespace folly;
+typedef FutureException eggs_t;
+static eggs_t eggs("eggs");
+
TEST(Window, basic) {
// int -> Future<int>
auto fn = [](std::vector<int> input, size_t window_size, size_t expect) {
EXPECT_EQ(6, res);
}
}
+
+TEST(Window, parallel) {
+ std::vector<int> input;
+ std::vector<Promise<int>> ps(10);
+ for (size_t i = 0; i < ps.size(); i++) {
+ input.emplace_back(i);
+ }
+ auto f = collect(window(input, [&](int i) {
+ return ps[i].getFuture();
+ }, 3));
+
+ std::vector<std::thread> ts;
+ boost::barrier barrier(ps.size() + 1);
+ for (size_t i = 0; i < ps.size(); i++) {
+ ts.emplace_back([&ps, &barrier, i]() {
+ barrier.wait();
+ ps[i].setValue(i);
+ });
+ }
+
+ barrier.wait();
+
+ for (size_t i = 0; i < ps.size(); i++) {
+ ts[i].join();
+ }
+
+ EXPECT_TRUE(f.isReady());
+ for (size_t i = 0; i < ps.size(); i++) {
+ EXPECT_EQ(i, f.value()[i]);
+ }
+}
+
+TEST(Window, parallelWithError) {
+ std::vector<int> input;
+ std::vector<Promise<int>> ps(10);
+ for (size_t i = 0; i < ps.size(); i++) {
+ input.emplace_back(i);
+ }
+ auto f = collect(window(input, [&](int i) {
+ return ps[i].getFuture();
+ }, 3));
+
+ std::vector<std::thread> ts;
+ boost::barrier barrier(ps.size() + 1);
+ for (size_t i = 0; i < ps.size(); i++) {
+ ts.emplace_back([&ps, &barrier, i]() {
+ barrier.wait();
+ if (i == (ps.size()/2)) {
+ ps[i].setException(eggs);
+ } else {
+ ps[i].setValue(i);
+ }
+ });
+ }
+
+ barrier.wait();
+
+ for (size_t i = 0; i < ps.size(); i++) {
+ ts[i].join();
+ }
+
+ EXPECT_TRUE(f.isReady());
+ EXPECT_THROW(f.value(), eggs_t);
+}
+
+TEST(Window, allParallelWithError) {
+ std::vector<int> input;
+ std::vector<Promise<int>> ps(10);
+ for (size_t i = 0; i < ps.size(); i++) {
+ input.emplace_back(i);
+ }
+ auto f = collectAll(window(input, [&](int i) {
+ return ps[i].getFuture();
+ }, 3));
+
+ std::vector<std::thread> ts;
+ boost::barrier barrier(ps.size() + 1);
+ for (size_t i = 0; i < ps.size(); i++) {
+ ts.emplace_back([&ps, &barrier, i]() {
+ barrier.wait();
+ if (i == (ps.size()/2)) {
+ ps[i].setException(eggs);
+ } else {
+ ps[i].setValue(i);
+ }
+ });
+ }
+
+ barrier.wait();
+
+ for (size_t i = 0; i < ps.size(); i++) {
+ ts[i].join();
+ }
+
+ EXPECT_TRUE(f.isReady());
+ for (size_t i = 0; i < ps.size(); i++) {
+ if (i == (ps.size()/2)) {
+ EXPECT_THROW(f.value()[i].value(), eggs_t);
+ } else {
+ EXPECT_TRUE(f.value()[i].hasValue());
+ EXPECT_EQ(i, f.value()[i].value());
+ }
+ }
+}