template <typename... Fs>
typename detail::VariadicContext<
typename std::decay<Fs>::type::value_type...>::type
-whenAll(Fs&&... fs)
-{
+whenAll(Fs&&... fs) {
auto ctx =
new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
ctx->total = sizeof...(fs);
Future<
std::vector<
Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
-whenAll(InputIterator first, InputIterator last)
-{
+whenAll(InputIterator first, InputIterator last) {
typedef
typename std::iterator_traits<InputIterator>::value_type::value_type T;
for (size_t i = 0; first != last; ++first, ++i) {
assert(i < n);
auto& f = *first;
- f.setCallback_([ctx, i, n](Try<T>&& t) {
- ctx->results[i] = std::move(t);
- if (++ctx->count == n) {
- ctx->p.setValue(std::move(ctx->results));
- delete ctx;
+ f.setCallback_([ctx, i, n](Try<T> t) {
+ ctx->results[i] = std::move(t);
+ if (++ctx->count == n) {
+ ctx->p.setValue(std::move(ctx->results));
+ delete ctx;
+ }
+ });
+ }
+
+ return f_saved;
+}
+
+namespace detail {
+
+template <typename T>
+struct CollectContext {
+ explicit CollectContext(int n) : count(0), threw(false) {
+ results.resize(n);
+ }
+ Promise<std::vector<T>> p;
+ std::vector<T> results;
+ std::atomic<size_t> count;
+ std::atomic_bool threw;
+
+ typedef std::vector<T> result_type;
+
+ static inline Future<std::vector<T>> makeEmptyFuture() {
+ return makeFuture(std::vector<T>());
+ }
+
+ inline void setValue() {
+ p.setValue(std::move(results));
+ }
+
+ inline void addResult(int i, Try<T>& t) {
+ results[i] = std::move(t.value());
+ }
+};
+
+template <>
+struct CollectContext<void> {
+ explicit CollectContext(int n) : count(0), threw(false) {}
+ Promise<void> p;
+ std::atomic<size_t> count;
+ std::atomic_bool threw;
+
+ typedef void result_type;
+
+ static inline Future<void> makeEmptyFuture() {
+ return makeFuture();
+ }
+
+ inline void setValue() {
+ p.setValue();
+ }
+
+ inline void addResult(int i, Try<void>& t) {
+ // do nothing
+ }
+};
+
+} // detail
+
+template <class InputIterator>
+Future<typename detail::CollectContext<
+ typename std::iterator_traits<InputIterator>::value_type::value_type
+>::result_type>
+collect(InputIterator first, InputIterator last) {
+ typedef
+ typename std::iterator_traits<InputIterator>::value_type::value_type T;
+
+ if (first >= last) {
+ return detail::CollectContext<T>::makeEmptyFuture();
+ }
+
+ size_t n = std::distance(first, last);
+ auto ctx = new detail::CollectContext<T>(n);
+ auto f_saved = ctx->p.getFuture();
+
+ for (size_t i = 0; first != last; ++first, ++i) {
+ assert(i < n);
+ auto& f = *first;
+ f.setCallback_([ctx, i, n](Try<T> t) {
+ auto c = ++ctx->count;
+
+ if (t.hasException()) {
+ if (!ctx->threw.exchange(true)) {
+ ctx->p.setException(std::move(t.exception()));
}
- });
+ } else if (!ctx->threw) {
+ ctx->addResult(i, t);
+ if (c == n) {
+ ctx->setValue();
+ }
+ }
+
+ if (c == n) {
+ delete ctx;
+ }
+ });
}
return f_saved;
template <class> struct Core;
template <class...> struct VariadicContext;
+template <class> struct CollectContext;
template<typename F, typename... Args>
using resultOf = decltype(std::declval<F>()(std::declval<Args>()...));
typename std::decay<Fs>::type::value_type...>::type
whenAll(Fs&&... fs);
+/// Like whenAll, but will short circuit on the first exception. Thus, the
+/// type of the returned Future is std::vector<T> instead of
+/// std::vector<Try<T>>
+template <class InputIterator>
+Future<typename detail::CollectContext<
+ typename std::iterator_traits<InputIterator>::value_type::value_type
+>::result_type>
+collect(InputIterator first, InputIterator last);
+
/** The result is a pair of the index of the first Future to complete and
the Try. If multiple Futures complete at the same time (or are already
complete when passed in), the "winner" is chosen non-deterministically.
}
}
+TEST(Future, collect) {
+ // success case
+ {
+ vector<Promise<int>> promises(10);
+ vector<Future<int>> futures;
+
+ for (auto& p : promises)
+ futures.push_back(p.getFuture());
+
+ auto allf = collect(futures.begin(), futures.end());
+
+ random_shuffle(promises.begin(), promises.end());
+ for (auto& p : promises) {
+ EXPECT_FALSE(allf.isReady());
+ p.setValue(42);
+ }
+
+ EXPECT_TRUE(allf.isReady());
+ for (auto i : allf.value()) {
+ EXPECT_EQ(42, i);
+ }
+ }
+
+ // failure case
+ {
+ vector<Promise<int>> promises(10);
+ vector<Future<int>> futures;
+
+ for (auto& p : promises)
+ futures.push_back(p.getFuture());
+
+ auto allf = collect(futures.begin(), futures.end());
+
+ random_shuffle(promises.begin(), promises.end());
+ for (int i = 0; i < 10; i++) {
+ if (i < 5) {
+ // everthing goes well so far...
+ EXPECT_FALSE(allf.isReady());
+ promises[i].setValue(42);
+ } else if (i == 5) {
+ // short circuit with an exception
+ EXPECT_FALSE(allf.isReady());
+ promises[i].setException(eggs);
+ EXPECT_TRUE(allf.isReady());
+ } else if (i < 8) {
+ // don't blow up on further values
+ EXPECT_TRUE(allf.isReady());
+ promises[i].setValue(42);
+ } else {
+ // don't blow up on further exceptions
+ EXPECT_TRUE(allf.isReady());
+ promises[i].setException(eggs);
+ }
+ }
+
+ EXPECT_THROW(allf.value(), eggs_t);
+ }
+
+ // void futures success case
+ {
+ vector<Promise<void>> promises(10);
+ vector<Future<void>> futures;
+
+ for (auto& p : promises)
+ futures.push_back(p.getFuture());
+
+ auto allf = collect(futures.begin(), futures.end());
+
+ random_shuffle(promises.begin(), promises.end());
+ for (auto& p : promises) {
+ EXPECT_FALSE(allf.isReady());
+ p.setValue();
+ }
+
+ EXPECT_TRUE(allf.isReady());
+ }
+
+ // void futures failure case
+ {
+ vector<Promise<void>> promises(10);
+ vector<Future<void>> futures;
+
+ for (auto& p : promises)
+ futures.push_back(p.getFuture());
+
+ auto allf = collect(futures.begin(), futures.end());
+
+ random_shuffle(promises.begin(), promises.end());
+ for (int i = 0; i < 10; i++) {
+ if (i < 5) {
+ // everthing goes well so far...
+ EXPECT_FALSE(allf.isReady());
+ promises[i].setValue();
+ } else if (i == 5) {
+ // short circuit with an exception
+ EXPECT_FALSE(allf.isReady());
+ promises[i].setException(eggs);
+ EXPECT_TRUE(allf.isReady());
+ } else if (i < 8) {
+ // don't blow up on further values
+ EXPECT_TRUE(allf.isReady());
+ promises[i].setValue();
+ } else {
+ // don't blow up on further exceptions
+ EXPECT_TRUE(allf.isReady());
+ promises[i].setException(eggs);
+ }
+ }
+
+ EXPECT_THROW(allf.value(), eggs_t);
+ }
+}
TEST(Future, whenAny) {
{