From e872b4400ebf7d68f9b0fca1b3b851556d22513a Mon Sep 17 00:00:00 2001 From: Hannes Roth Date: Thu, 14 May 2015 15:36:27 -0700 Subject: [PATCH] (Wangle) unorderedReduce Summary: Use this if you don't need the order of the input, e.g. summing up values. This constructs a separate Future chain to do the reducing, because we don't want to add locking while reducing. The only lock necessary is when adding a new Future to the chain, which should be really quick. Test Plan: Run all the tests. Reviewed By: hans@fb.com Subscribers: folly-diffs@, jsedgwick, yfeldblum, chalfant FB internal diff: D2015326 Tasks: 6025252 Signature: t1:2015326:1431557191:9ea2edccb0162dedf067b5b3300de2fe72a1a4c9 --- folly/futures/Future-inl.h | 48 +++++++++++++++++++++++++++ folly/futures/helpers.h | 23 +++++++++++++ folly/futures/test/FutureTest.cpp | 54 +++++++++++++++++++++++++++++++ 3 files changed, 125 insertions(+) diff --git a/folly/futures/Future-inl.h b/folly/futures/Future-inl.h index aecfa578..98072524 100644 --- a/folly/futures/Future-inl.h +++ b/folly/futures/Future-inl.h @@ -800,6 +800,54 @@ Future Future::reduce(I&& initial, F&& func) { }); } +template +Future unorderedReduce(It first, It last, T initial, F func) { + if (first == last) { + return makeFuture(std::move(initial)); + } + + typedef isTry IsTry; + + struct UnorderedReduceContext { + UnorderedReduceContext(T&& memo, F&& fn, size_t n) + : lock_(), memo_(makeFuture(std::move(memo))), + func_(std::move(fn)), numThens_(0), numFutures_(n), promise_() + {}; + folly::MicroSpinLock lock_; // protects memo_ and numThens_ + Future memo_; + F func_; + size_t numThens_; // how many Futures completed and called .then() + size_t numFutures_; // how many Futures in total + Promise promise_; + }; + + auto ctx = std::make_shared( + std::move(initial), std::move(func), std::distance(first, last)); + + mapSetCallback(first, last, [ctx](size_t i, Try&& t) { + folly::MoveWrapper> mt(std::move(t)); + // Futures can be completed in any order, simultaneously. + // To make this non-blocking, we create a new Future chain in + // the order of completion to reduce the values. + // The spinlock just protects chaining a new Future, not actually + // executing the reduce, which should be really fast. + folly::MSLGuard lock(ctx->lock_); + ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable { + // Either return a ItT&& or a Try&& depending + // on the type of the argument of func. + return ctx->func_(std::move(v), mt->template get()); + }); + if (++ctx->numThens_ == ctx->numFutures_) { + // After reducing the value of the last Future, fulfill the Promise + ctx->memo_.setCallback_([ctx](Try&& t2) { + ctx->promise_.setValue(std::move(t2)); + }); + } + }); + + return ctx->promise_.getFuture(); +} + template Future Future::within(Duration dur, Timekeeper* tk) { return within(dur, TimedOut(), tk); diff --git a/folly/futures/helpers.h b/folly/futures/helpers.h index 775989e8..010a5a78 100644 --- a/folly/futures/helpers.h +++ b/folly/futures/helpers.h @@ -239,6 +239,9 @@ using isFutureResult = isFuture::type>; The type of the final result is a Future of the type of the initial value. Func can either return a T, or a Future + + func is called in order of the input, see unorderedReduce if that is not + a requirement */ template Future reduce(It first, It last, T&& initial, F&& func); @@ -255,4 +258,24 @@ auto reduce(Collection&& c, T&& initial, F&& func) std::forward(func)); } +/** like reduce, but calls func on finished futures as they complete + does NOT keep the order of the input + */ +template ::value_type::value_type, + class Arg = MaybeTryArg> +Future unorderedReduce(It first, It last, T initial, F func); + +/// Sugar for the most common case +template +auto unorderedReduce(Collection&& c, T&& initial, F&& func) + -> decltype(unorderedReduce(c.begin(), c.end(), std::forward(initial), + std::forward(func))) { + return unorderedReduce( + c.begin(), + c.end(), + std::forward(initial), + std::forward(func)); +} + } // namespace folly diff --git a/folly/futures/test/FutureTest.cpp b/folly/futures/test/FutureTest.cpp index 96b5cd5f..bff61b0c 100644 --- a/folly/futures/test/FutureTest.cpp +++ b/folly/futures/test/FutureTest.cpp @@ -1807,6 +1807,60 @@ TEST(Reduce, Chain) { } } +TEST(Reduce, Streaming) { + { + std::vector> fs; + fs.push_back(makeFuture(1)); + fs.push_back(makeFuture(2)); + fs.push_back(makeFuture(3)); + + Future f = unorderedReduce(fs.begin(), fs.end(), 0.0, + [](double a, int&& b){ + return double(b); + }); + EXPECT_EQ(3.0, f.get()); + } + { + Promise p1; + Promise p2; + Promise p3; + + std::vector> fs; + fs.push_back(p1.getFuture()); + fs.push_back(p2.getFuture()); + fs.push_back(p3.getFuture()); + + Future f = unorderedReduce(fs.begin(), fs.end(), 0.0, + [](double a, int&& b){ + return double(b); + }); + p3.setValue(3); + p2.setValue(2); + p1.setValue(1); + EXPECT_EQ(1.0, f.get()); + } +} + +TEST(Reduce, StreamingException) { + Promise p1; + Promise p2; + Promise p3; + + std::vector> fs; + fs.push_back(p1.getFuture()); + fs.push_back(p2.getFuture()); + fs.push_back(p3.getFuture()); + + Future f = unorderedReduce(fs.begin(), fs.end(), 0.0, + [](double a, int&& b){ + return b + 0.0; + }); + p3.setValue(3); + p2.setException(exception_wrapper(std::runtime_error("blah"))); + p1.setValue(1); + EXPECT_THROW(f.get(), std::runtime_error); +} + TEST(Map, Basic) { Promise p1; Promise p2; -- 2.34.1