From ad0c254dbdbac5b166c1b4e9e3386ba487842630 Mon Sep 17 00:00:00 2001 From: Hannes Roth Date: Thu, 14 May 2015 11:50:06 -0700 Subject: [PATCH] (Wangle) window Summary: `window` creates up to `n` Futures at a time and only starts new ones when previous ones complete. A sliding window. Test Plan: Run all the tests. Reviewed By: hans@fb.com Subscribers: bmatheny, henryf, scottstraw, juliafu, folly-diffs@, jsedgwick, yfeldblum, chalfant FB internal diff: D2015310 Signature: t1:2015310:1431557556:1017006cc9c9c2562ebe2c3dabfc4dbf316ff408 --- folly/futures/Future-inl.h | 47 +++++++++++++++++++++++++++++++ folly/futures/helpers.h | 15 ++++++++++ folly/futures/test/FutureTest.cpp | 30 ++++++++++++++++++++ 3 files changed, 92 insertions(+) diff --git a/folly/futures/Future-inl.h b/folly/futures/Future-inl.h index 28cb216f..f17eabfa 100644 --- a/folly/futures/Future-inl.h +++ b/folly/futures/Future-inl.h @@ -739,6 +739,53 @@ Future reduce(It first, It last, T&& initial, F&& func) { return f; } +template +std::vector> +window(Collection input, F func, size_t n) { + struct WindowContext { + WindowContext(Collection&& i, F&& fn) + : i_(0), input_(std::move(i)), promises_(input_.size()), + func_(std::move(fn)) + {} + std::atomic i_; + Collection input_; + std::vector> promises_; + F func_; + + static inline void spawn(const std::shared_ptr& ctx) { + size_t i = ctx->i_++; + if (i < ctx->input_.size()) { + // Using setCallback_ directly since we don't need the Future + ctx->func_(std::move(ctx->input_[i])).setCallback_( + // ctx is captured by value + [ctx, i](Try&& t) { + ctx->promises_[i].setTry(std::move(t)); + // Chain another future onto this one + spawn(std::move(ctx)); + }); + } + } + }; + + auto max = std::min(n, input.size()); + + auto ctx = std::make_shared( + std::move(input), std::move(func)); + + for (size_t i = 0; i < max; ++i) { + // Start the first n Futures + WindowContext::spawn(ctx); + } + + std::vector> futures; + futures.reserve(ctx->promises_.size()); + for (auto& promise : ctx->promises_) { + futures.emplace_back(promise.getFuture()); + } + + return futures; +} + template template Future Future::reduce(I&& initial, F&& func) { diff --git a/folly/futures/helpers.h b/folly/futures/helpers.h index c7456411..ded42702 100644 --- a/folly/futures/helpers.h +++ b/folly/futures/helpers.h @@ -224,6 +224,21 @@ auto collectN(Collection&& c, size_t n) return collectN(c.begin(), c.end(), n); } +/** window creates up to n Futures using the values + in the collection, and then another Future for each Future + that completes + + this is basically a sliding window of Futures of size n + + func must return a Future for each value in input + */ +template ::value_type, + class Result = typename detail::resultOf::value_type> +std::vector> +window(Collection input, F func, size_t n); + template using MaybeTryArg = typename std::conditional< detail::callableWith&&>::value, Try, ItT>::type; diff --git a/folly/futures/test/FutureTest.cpp b/folly/futures/test/FutureTest.cpp index faeb1da2..96b5cd5f 100644 --- a/folly/futures/test/FutureTest.cpp +++ b/folly/futures/test/FutureTest.cpp @@ -690,6 +690,36 @@ TEST(Future, unwrap) { EXPECT_EQ(7, f.value()); } +TEST(Future, stream) { + auto fn = [](vector input, size_t window_size, size_t expect) { + auto res = reduce( + window( + input, + [](int i) { return makeFuture(i); }, + 2), + 0, + [](int sum, const Try& b) { + return sum + *b; + }).get(); + EXPECT_EQ(expect, res); + }; + { + // streaming 2 at a time + vector input = {1, 2, 3}; + fn(input, 2, 6); + } + { + // streaming 4 at a time + vector input = {1, 2, 3}; + fn(input, 4, 6); + } + { + // empty inpt + vector input; + fn(input, 1, 0); + } +} + TEST(Future, collectAll) { // returns a vector variant { -- 2.34.1