});
}
+template <class It, class T, class F, class ItT, class Arg>
+Future<T> unorderedReduce(It first, It last, T initial, F func) {
+ if (first == last) {
+ return makeFuture(std::move(initial));
+ }
+
+ typedef isTry<Arg> IsTry;
+
+ struct UnorderedReduceContext {
+ UnorderedReduceContext(T&& memo, F&& fn, size_t n)
+ : lock_(), memo_(makeFuture<T>(std::move(memo))),
+ func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
+ {};
+ folly::MicroSpinLock lock_; // protects memo_ and numThens_
+ Future<T> memo_;
+ F func_;
+ size_t numThens_; // how many Futures completed and called .then()
+ size_t numFutures_; // how many Futures in total
+ Promise<T> promise_;
+ };
+
+ auto ctx = std::make_shared<UnorderedReduceContext>(
+ std::move(initial), std::move(func), std::distance(first, last));
+
+ mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
+ folly::MoveWrapper<Try<ItT>> mt(std::move(t));
+ // Futures can be completed in any order, simultaneously.
+ // To make this non-blocking, we create a new Future chain in
+ // the order of completion to reduce the values.
+ // The spinlock just protects chaining a new Future, not actually
+ // executing the reduce, which should be really fast.
+ folly::MSLGuard lock(ctx->lock_);
+ ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
+ // Either return a ItT&& or a Try<ItT>&& depending
+ // on the type of the argument of func.
+ return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
+ });
+ if (++ctx->numThens_ == ctx->numFutures_) {
+ // After reducing the value of the last Future, fulfill the Promise
+ ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
+ ctx->promise_.setValue(std::move(t2));
+ });
+ }
+ });
+
+ return ctx->promise_.getFuture();
+}
+
template <class T>
Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
return within(dur, TimedOut(), tk);
The type of the final result is a Future of the type of the initial value.
Func can either return a T, or a Future<T>
+
+ func is called in order of the input, see unorderedReduce if that is not
+ a requirement
*/
template <class It, class T, class F>
Future<T> reduce(It first, It last, T&& initial, F&& func);
std::forward<F>(func));
}
+/** like reduce, but calls func on finished futures as they complete
+ does NOT keep the order of the input
+ */
+template <class It, class T, class F,
+ class ItT = typename std::iterator_traits<It>::value_type::value_type,
+ class Arg = MaybeTryArg<F, T, ItT>>
+Future<T> unorderedReduce(It first, It last, T initial, F func);
+
+/// Sugar for the most common case
+template <class Collection, class T, class F>
+auto unorderedReduce(Collection&& c, T&& initial, F&& func)
+ -> decltype(unorderedReduce(c.begin(), c.end(), std::forward<T>(initial),
+ std::forward<F>(func))) {
+ return unorderedReduce(
+ c.begin(),
+ c.end(),
+ std::forward<T>(initial),
+ std::forward<F>(func));
+}
+
} // namespace folly
}
}
+TEST(Reduce, Streaming) {
+ {
+ std::vector<Future<int>> fs;
+ fs.push_back(makeFuture(1));
+ fs.push_back(makeFuture(2));
+ fs.push_back(makeFuture(3));
+
+ Future<double> f = unorderedReduce(fs.begin(), fs.end(), 0.0,
+ [](double a, int&& b){
+ return double(b);
+ });
+ EXPECT_EQ(3.0, f.get());
+ }
+ {
+ Promise<int> p1;
+ Promise<int> p2;
+ Promise<int> p3;
+
+ std::vector<Future<int>> fs;
+ fs.push_back(p1.getFuture());
+ fs.push_back(p2.getFuture());
+ fs.push_back(p3.getFuture());
+
+ Future<double> f = unorderedReduce(fs.begin(), fs.end(), 0.0,
+ [](double a, int&& b){
+ return double(b);
+ });
+ p3.setValue(3);
+ p2.setValue(2);
+ p1.setValue(1);
+ EXPECT_EQ(1.0, f.get());
+ }
+}
+
+TEST(Reduce, StreamingException) {
+ Promise<int> p1;
+ Promise<int> p2;
+ Promise<int> p3;
+
+ std::vector<Future<int>> fs;
+ fs.push_back(p1.getFuture());
+ fs.push_back(p2.getFuture());
+ fs.push_back(p3.getFuture());
+
+ Future<double> f = unorderedReduce(fs.begin(), fs.end(), 0.0,
+ [](double a, int&& b){
+ return b + 0.0;
+ });
+ p3.setValue(3);
+ p2.setException(exception_wrapper(std::runtime_error("blah")));
+ p1.setValue(1);
+ EXPECT_THROW(f.get(), std::runtime_error);
+}
+
TEST(Map, Basic) {
Promise<int> p1;
Promise<int> p2;