From: Hannes Roth <hannesr@fb.com> Date: Mon, 8 Jun 2015 20:07:01 +0000 (-0700) Subject: (Wangle) variadic collect X-Git-Tag: v0.45.0~6 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=db57efd05b6605784e1a8ed88bb20609352fe927;p=folly.git (Wangle) variadic collect Summary: For D2099047 (matthieu) and also for symmetry. Can re-use most of the code, also refactored it a bit (using an empty base case). Test Plan: Run all the tests. Will add some more before committing. Reviewed By: jsedgwick@fb.com Subscribers: folly-diffs@, jsedgwick, yfeldblum, chalfant, matthieu FB internal diff: D2131515 Signature: t1:2131515:1433776852:544166fbfdfabf6760fd78f87821290e17e6e4a3 --- diff --git a/folly/futures/Future-inl.h b/folly/futures/Future-inl.h index a63b2a80..4b2c7477 100644 --- a/folly/futures/Future-inl.h +++ b/folly/futures/Future-inl.h @@ -545,13 +545,13 @@ void mapSetCallback(InputIterator first, InputIterator last, F func) { // collectAll (variadic) template <typename... Fs> -typename detail::VariadicContext< +typename detail::CollectAllVariadicContext< typename std::decay<Fs>::type::value_type...>::type collectAll(Fs&&... fs) { - auto ctx = std::make_shared<detail::VariadicContext< + auto ctx = std::make_shared<detail::CollectAllVariadicContext< typename std::decay<Fs>::type::value_type...>>(); - detail::collectAllVariadicHelper(ctx, - std::forward<typename std::decay<Fs>::type>(fs)...); + detail::collectVariadicHelper<detail::CollectAllVariadicContext>( + ctx, std::forward<typename std::decay<Fs>::type>(fs)...); return ctx->p.getFuture(); } @@ -581,6 +581,8 @@ collectAll(InputIterator first, InputIterator last) { return ctx->p.getFuture(); } +// collect (iterator) + namespace detail { template <typename T> @@ -648,6 +650,21 @@ collect(InputIterator first, InputIterator last) { return ctx->p.getFuture(); } +// collect (variadic) + +template <typename... Fs> +typename detail::CollectVariadicContext< + typename std::decay<Fs>::type::value_type...>::type +collect(Fs&&... fs) { + auto ctx = std::make_shared<detail::CollectVariadicContext< + typename std::decay<Fs>::type::value_type...>>(); + detail::collectVariadicHelper<detail::CollectVariadicContext>( + ctx, std::forward<typename std::decay<Fs>::type>(fs)...); + return ctx->p.getFuture(); +} + +// collectAny (iterator) + template <class InputIterator> Future< std::pair<size_t, @@ -673,6 +690,8 @@ collectAny(InputIterator first, InputIterator last) { return ctx->p.getFuture(); } +// collectN (iterator) + template <class InputIterator> Future<std::vector<std::pair<size_t, Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>> @@ -709,6 +728,8 @@ collectN(InputIterator first, InputIterator last, size_t n) { return ctx->p.getFuture(); } +// reduce (iterator) + template <class It, class T, class F> Future<T> reduce(It first, It last, T&& initial, F&& func) { if (first == last) { @@ -740,6 +761,8 @@ Future<T> reduce(It first, It last, T&& initial, F&& func) { return f; } +// window (collection) + template <class Collection, class F, class ItT, class Result> std::vector<Future<Result>> window(Collection input, F func, size_t n) { @@ -787,6 +810,8 @@ window(Collection input, F func, size_t n) { return futures; } +// reduce + template <class T> template <class I, class F> Future<I> Future<T>::reduce(I&& initial, F&& func) { @@ -801,6 +826,8 @@ Future<I> Future<T>::reduce(I&& initial, F&& func) { }); } +// unorderedReduce (iterator) + template <class It, class T, class F, class ItT, class Arg> Future<T> unorderedReduce(It first, It last, T initial, F func) { if (first == last) { @@ -849,6 +876,8 @@ Future<T> unorderedReduce(It first, It last, T initial, F func) { return ctx->promise_.getFuture(); } +// within + template <class T> Future<T> Future<T>::within(Duration dur, Timekeeper* tk) { return within(dur, TimedOut(), tk); @@ -890,6 +919,8 @@ Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) { return ctx->promise.getFuture().via(getExecutor()); } +// delayed + template <class T> Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) { return collectAll(*this, futures::sleep(dur, tk)) diff --git a/folly/futures/Future-pre.h b/folly/futures/Future-pre.h index b7ee7b5c..f76c24ab 100644 --- a/folly/futures/Future-pre.h +++ b/folly/futures/Future-pre.h @@ -41,7 +41,8 @@ struct isTry<Try<T>> : std::true_type {}; namespace detail { template <class> class Core; -template <class...> struct VariadicContext; +template <class...> struct CollectAllVariadicContext; +template <class...> struct CollectVariadicContext; template <class> struct CollectContext; template<typename F, typename... Args> diff --git a/folly/futures/detail/Core.h b/folly/futures/detail/Core.h index 34d0a725..de0173e7 100644 --- a/folly/futures/detail/Core.h +++ b/folly/futures/detail/Core.h @@ -341,34 +341,59 @@ class Core { }; template <typename... Ts> -struct VariadicContext { - VariadicContext() {} - ~VariadicContext() { +struct CollectAllVariadicContext { + CollectAllVariadicContext() {} + template <typename T, size_t I> + inline void setPartialResult(Try<T>& t) { + std::get<I>(results) = std::move(t); + } + ~CollectAllVariadicContext() { p.setValue(std::move(results)); } - Promise<std::tuple<Try<Ts>... >> p; - std::tuple<Try<Ts>... > results; + Promise<std::tuple<Try<Ts>...>> p; + std::tuple<Try<Ts>...> results; typedef Future<std::tuple<Try<Ts>...>> type; }; -template <typename... Ts, typename THead, typename... Fs> -typename std::enable_if<sizeof...(Fs) == 0, void>::type -collectAllVariadicHelper(std::shared_ptr<VariadicContext<Ts...>> ctx, - THead&& head, Fs&&... tail) { - head.setCallback_([ctx](Try<typename THead::value_type>&& t) { - std::get<sizeof...(Ts) - sizeof...(Fs) - 1>(ctx->results) = std::move(t); - }); +template <typename... Ts> +struct CollectVariadicContext { + CollectVariadicContext() {} + template <typename T, size_t I> + inline void setPartialResult(Try<T>& t) { + if (t.hasException()) { + if (!threw.exchange(true)) { + p.setException(std::move(t.exception())); + } + } else if (!threw) { + std::get<I>(results) = std::move(t.value()); + } + } + ~CollectVariadicContext() { + if (!threw.exchange(true)) { + p.setValue(std::move(results)); + } + } + Promise<std::tuple<Ts...>> p; + std::tuple<Ts...> results; + std::atomic<bool> threw; + typedef Future<std::tuple<Ts...>> type; +}; + +template <template <typename ...> class T, typename... Ts> +void collectVariadicHelper(const std::shared_ptr<T<Ts...>>& ctx) { + // base case } -template <typename... Ts, typename THead, typename... Fs> -typename std::enable_if<sizeof...(Fs) != 0, void>::type -collectAllVariadicHelper(std::shared_ptr<VariadicContext<Ts...>> ctx, - THead&& head, Fs&&... tail) { +template <template <typename ...> class T, typename... Ts, + typename THead, typename... TTail> +void collectVariadicHelper(const std::shared_ptr<T<Ts...>>& ctx, + THead&& head, TTail&&... tail) { head.setCallback_([ctx](Try<typename THead::value_type>&& t) { - std::get<sizeof...(Ts) - sizeof...(Fs) - 1>(ctx->results) = std::move(t); + ctx->template setPartialResult<typename THead::value_type, + sizeof...(Ts) - sizeof...(TTail) - 1>(t); }); // template tail-recursion - collectAllVariadicHelper(ctx, std::forward<Fs>(tail)...); + collectVariadicHelper(ctx, std::forward<TTail>(tail)...); } }} // folly::detail diff --git a/folly/futures/helpers.h b/folly/futures/helpers.h index 010a5a78..5ae28f1f 100644 --- a/folly/futures/helpers.h +++ b/folly/futures/helpers.h @@ -155,7 +155,7 @@ auto collectAll(Collection&& c) -> decltype(collectAll(c.begin(), c.end())) { /// is a Future<std::tuple<Try<T1>, Try<T2>, ...>>. /// The Futures are moved in, so your copies are invalid. template <typename... Fs> -typename detail::VariadicContext< +typename detail::CollectAllVariadicContext< typename std::decay<Fs>::type::value_type...>::type collectAll(Fs&&... fs); @@ -174,6 +174,14 @@ auto collect(Collection&& c) -> decltype(collect(c.begin(), c.end())) { return collect(c.begin(), c.end()); } +/// Like collectAll, but will short circuit on the first exception. Thus, the +/// type of the returned Future is std::tuple<T1, T2, ...> instead of +/// std::tuple<Try<T1>, Try<T2>, ...> +template <typename... Fs> +typename detail::CollectVariadicContext< + typename std::decay<Fs>::type::value_type...>::type +collect(Fs&&... fs); + /** The result is a pair of the index of the first Future to complete and the Try. If multiple Futures complete at the same time (or are already complete when passed in), the "winner" is chosen non-deterministically. diff --git a/folly/futures/test/CollectTest.cpp b/folly/futures/test/CollectTest.cpp index ea4b3bf8..f9746d18 100644 --- a/folly/futures/test/CollectTest.cpp +++ b/folly/futures/test/CollectTest.cpp @@ -16,6 +16,8 @@ #include <gtest/gtest.h> +#include <boost/thread/barrier.hpp> + #include <folly/futures/Future.h> #include <folly/Random.h> #include <folly/small_vector.h> @@ -353,6 +355,134 @@ TEST(Collect, alreadyCompleted) { } } +TEST(Collect, parallel) { + std::vector<Promise<int>> ps(10); + std::vector<Future<int>> fs; + for (size_t i = 0; i < ps.size(); i++) { + fs.emplace_back(ps[i].getFuture()); + } + auto f = collect(fs); + + std::vector<std::thread> ts; + boost::barrier barrier(ps.size() + 1); + for (size_t i = 0; i < ps.size(); i++) { + ts.emplace_back([&ps, &barrier, i]() { + barrier.wait(); + ps[i].setValue(i); + }); + } + + barrier.wait(); + + for (size_t i = 0; i < ps.size(); i++) { + ts[i].join(); + } + + EXPECT_TRUE(f.isReady()); + for (size_t i = 0; i < ps.size(); i++) { + EXPECT_EQ(i, f.value()[i]); + } +} + +TEST(Collect, parallelWithError) { + std::vector<Promise<int>> ps(10); + std::vector<Future<int>> fs; + for (size_t i = 0; i < ps.size(); i++) { + fs.emplace_back(ps[i].getFuture()); + } + auto f = collect(fs); + + std::vector<std::thread> ts; + boost::barrier barrier(ps.size() + 1); + for (size_t i = 0; i < ps.size(); i++) { + ts.emplace_back([&ps, &barrier, i]() { + barrier.wait(); + if (i == (ps.size()/2)) { + ps[i].setException(eggs); + } else { + ps[i].setValue(i); + } + }); + } + + barrier.wait(); + + for (size_t i = 0; i < ps.size(); i++) { + ts[i].join(); + } + + EXPECT_TRUE(f.isReady()); + EXPECT_THROW(f.value(), eggs_t); +} + +TEST(Collect, allParallel) { + std::vector<Promise<int>> ps(10); + std::vector<Future<int>> fs; + for (size_t i = 0; i < ps.size(); i++) { + fs.emplace_back(ps[i].getFuture()); + } + auto f = collectAll(fs); + + std::vector<std::thread> ts; + boost::barrier barrier(ps.size() + 1); + for (size_t i = 0; i < ps.size(); i++) { + ts.emplace_back([&ps, &barrier, i]() { + barrier.wait(); + ps[i].setValue(i); + }); + } + + barrier.wait(); + + for (size_t i = 0; i < ps.size(); i++) { + ts[i].join(); + } + + EXPECT_TRUE(f.isReady()); + for (size_t i = 0; i < ps.size(); i++) { + EXPECT_TRUE(f.value()[i].hasValue()); + EXPECT_EQ(i, f.value()[i].value()); + } +} + +TEST(Collect, allParallelWithError) { + std::vector<Promise<int>> ps(10); + std::vector<Future<int>> fs; + for (size_t i = 0; i < ps.size(); i++) { + fs.emplace_back(ps[i].getFuture()); + } + auto f = collectAll(fs); + + std::vector<std::thread> ts; + boost::barrier barrier(ps.size() + 1); + for (size_t i = 0; i < ps.size(); i++) { + ts.emplace_back([&ps, &barrier, i]() { + barrier.wait(); + if (i == (ps.size()/2)) { + ps[i].setException(eggs); + } else { + ps[i].setValue(i); + } + }); + } + + barrier.wait(); + + for (size_t i = 0; i < ps.size(); i++) { + ts[i].join(); + } + + EXPECT_TRUE(f.isReady()); + for (size_t i = 0; i < ps.size(); i++) { + if (i == (ps.size()/2)) { + EXPECT_THROW(f.value()[i].value(), eggs_t); + } else { + EXPECT_TRUE(f.value()[i].hasValue()); + EXPECT_EQ(i, f.value()[i].value()); + } + } +} + TEST(Collect, collectN) { std::vector<Promise<void>> promises(10); std::vector<Future<void>> futures; @@ -443,6 +573,59 @@ TEST(Collect, collectAllVariadicReferences) { EXPECT_TRUE(flag); } +TEST(Collect, collectAllVariadicWithException) { + Promise<bool> pb; + Promise<int> pi; + Future<bool> fb = pb.getFuture(); + Future<int> fi = pi.getFuture(); + bool flag = false; + collectAll(std::move(fb), std::move(fi)) + .then([&](std::tuple<Try<bool>, Try<int>> tup) { + flag = true; + EXPECT_TRUE(std::get<0>(tup).hasValue()); + EXPECT_EQ(std::get<0>(tup).value(), true); + EXPECT_TRUE(std::get<1>(tup).hasException()); + EXPECT_THROW(std::get<1>(tup).value(), eggs_t); + }); + pb.setValue(true); + EXPECT_FALSE(flag); + pi.setException(eggs); + EXPECT_TRUE(flag); +} + +TEST(Collect, collectVariadic) { + Promise<bool> pb; + Promise<int> pi; + Future<bool> fb = pb.getFuture(); + Future<int> fi = pi.getFuture(); + bool flag = false; + collect(std::move(fb), std::move(fi)) + .then([&](std::tuple<bool, int> tup) { + flag = true; + EXPECT_EQ(std::get<0>(tup), true); + EXPECT_EQ(std::get<1>(tup), 42); + }); + pb.setValue(true); + EXPECT_FALSE(flag); + pi.setValue(42); + EXPECT_TRUE(flag); +} + +TEST(Collect, collectVariadicWithException) { + Promise<bool> pb; + Promise<int> pi; + Future<bool> fb = pb.getFuture(); + Future<int> fi = pi.getFuture(); + bool flag = false; + auto f = collect(std::move(fb), std::move(fi)); + pb.setValue(true); + EXPECT_FALSE(f.isReady()); + pi.setException(eggs); + EXPECT_TRUE(f.isReady()); + EXPECT_TRUE(f.getTry().hasException()); + EXPECT_THROW(f.get(), eggs_t); +} + TEST(Collect, collectAllNone) { std::vector<Future<int>> fs; auto f = collectAll(fs); diff --git a/folly/futures/test/FutureTest.cpp b/folly/futures/test/FutureTest.cpp index 5d171bb9..ec5cb5a1 100644 --- a/folly/futures/test/FutureTest.cpp +++ b/folly/futures/test/FutureTest.cpp @@ -413,6 +413,31 @@ TEST(Future, thenFunctionFuture) { EXPECT_EQ(f.value(), "start;static;class-static;class"); } +TEST(Future, thenStdFunction) { + { + std::function<int()> fn = [](){ return 42; }; + auto f = makeFuture().then(std::move(fn)); + EXPECT_EQ(f.value(), 42); + } + { + std::function<int(int)> fn = [](int i){ return i + 23; }; + auto f = makeFuture(19).then(std::move(fn)); + EXPECT_EQ(f.value(), 42); + } + { + std::function<int(Try<int>&)> fn = [](Try<int>& t){ return t.value() + 2; }; + auto f = makeFuture(1).then(std::move(fn)); + EXPECT_EQ(f.value(), 3); + } + { + bool flag = false; + std::function<void()> fn = [&flag](){ flag = true; }; + auto f = makeFuture().then(std::move(fn)); + EXPECT_TRUE(f.isReady()); + EXPECT_TRUE(flag); + } +} + TEST(Future, thenBind) { auto l = []() { return makeFuture("bind"); diff --git a/folly/futures/test/WindowTest.cpp b/folly/futures/test/WindowTest.cpp index cc480fbe..9dbdb156 100644 --- a/folly/futures/test/WindowTest.cpp +++ b/folly/futures/test/WindowTest.cpp @@ -16,12 +16,17 @@ #include <gtest/gtest.h> +#include <boost/thread/barrier.hpp> + #include <folly/futures/Future.h> #include <vector> using namespace folly; +typedef FutureException eggs_t; +static eggs_t eggs("eggs"); + TEST(Window, basic) { // int -> Future<int> auto fn = [](std::vector<int> input, size_t window_size, size_t expect) { @@ -79,3 +84,107 @@ TEST(Window, basic) { EXPECT_EQ(6, res); } } + +TEST(Window, parallel) { + std::vector<int> input; + std::vector<Promise<int>> ps(10); + for (size_t i = 0; i < ps.size(); i++) { + input.emplace_back(i); + } + auto f = collect(window(input, [&](int i) { + return ps[i].getFuture(); + }, 3)); + + std::vector<std::thread> ts; + boost::barrier barrier(ps.size() + 1); + for (size_t i = 0; i < ps.size(); i++) { + ts.emplace_back([&ps, &barrier, i]() { + barrier.wait(); + ps[i].setValue(i); + }); + } + + barrier.wait(); + + for (size_t i = 0; i < ps.size(); i++) { + ts[i].join(); + } + + EXPECT_TRUE(f.isReady()); + for (size_t i = 0; i < ps.size(); i++) { + EXPECT_EQ(i, f.value()[i]); + } +} + +TEST(Window, parallelWithError) { + std::vector<int> input; + std::vector<Promise<int>> ps(10); + for (size_t i = 0; i < ps.size(); i++) { + input.emplace_back(i); + } + auto f = collect(window(input, [&](int i) { + return ps[i].getFuture(); + }, 3)); + + std::vector<std::thread> ts; + boost::barrier barrier(ps.size() + 1); + for (size_t i = 0; i < ps.size(); i++) { + ts.emplace_back([&ps, &barrier, i]() { + barrier.wait(); + if (i == (ps.size()/2)) { + ps[i].setException(eggs); + } else { + ps[i].setValue(i); + } + }); + } + + barrier.wait(); + + for (size_t i = 0; i < ps.size(); i++) { + ts[i].join(); + } + + EXPECT_TRUE(f.isReady()); + EXPECT_THROW(f.value(), eggs_t); +} + +TEST(Window, allParallelWithError) { + std::vector<int> input; + std::vector<Promise<int>> ps(10); + for (size_t i = 0; i < ps.size(); i++) { + input.emplace_back(i); + } + auto f = collectAll(window(input, [&](int i) { + return ps[i].getFuture(); + }, 3)); + + std::vector<std::thread> ts; + boost::barrier barrier(ps.size() + 1); + for (size_t i = 0; i < ps.size(); i++) { + ts.emplace_back([&ps, &barrier, i]() { + barrier.wait(); + if (i == (ps.size()/2)) { + ps[i].setException(eggs); + } else { + ps[i].setValue(i); + } + }); + } + + barrier.wait(); + + for (size_t i = 0; i < ps.size(); i++) { + ts[i].join(); + } + + EXPECT_TRUE(f.isReady()); + for (size_t i = 0; i < ps.size(); i++) { + if (i == (ps.size()/2)) { + EXPECT_THROW(f.value()[i].value(), eggs_t); + } else { + EXPECT_TRUE(f.value()[i].hasValue()); + EXPECT_EQ(i, f.value()[i].value()); + } + } +}