Summary: `window` creates up to `n` Futures at a time and only starts new ones when previous ones complete. A sliding window.
Test Plan: Run all the tests.
Reviewed By: hans@fb.com
Subscribers: bmatheny, henryf, scottstraw, juliafu, folly-diffs@, jsedgwick, yfeldblum, chalfant
FB internal diff:
D2015310
Signature: t1:
2015310:
1431557556:
1017006cc9c9c2562ebe2c3dabfc4dbf316ff408
return f;
}
+template <class Collection, class F, class ItT, class Result>
+std::vector<Future<Result>>
+window(Collection input, F func, size_t n) {
+ struct WindowContext {
+ WindowContext(Collection&& i, F&& fn)
+ : i_(0), input_(std::move(i)), promises_(input_.size()),
+ func_(std::move(fn))
+ {}
+ std::atomic<size_t> i_;
+ Collection input_;
+ std::vector<Promise<Result>> promises_;
+ F func_;
+
+ static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
+ size_t i = ctx->i_++;
+ if (i < ctx->input_.size()) {
+ // Using setCallback_ directly since we don't need the Future
+ ctx->func_(std::move(ctx->input_[i])).setCallback_(
+ // ctx is captured by value
+ [ctx, i](Try<ItT>&& t) {
+ ctx->promises_[i].setTry(std::move(t));
+ // Chain another future onto this one
+ spawn(std::move(ctx));
+ });
+ }
+ }
+ };
+
+ auto max = std::min(n, input.size());
+
+ auto ctx = std::make_shared<WindowContext>(
+ std::move(input), std::move(func));
+
+ for (size_t i = 0; i < max; ++i) {
+ // Start the first n Futures
+ WindowContext::spawn(ctx);
+ }
+
+ std::vector<Future<Result>> futures;
+ futures.reserve(ctx->promises_.size());
+ for (auto& promise : ctx->promises_) {
+ futures.emplace_back(promise.getFuture());
+ }
+
+ return futures;
+}
+
template <class T>
template <class I, class F>
Future<I> Future<T>::reduce(I&& initial, F&& func) {
return collectN(c.begin(), c.end(), n);
}
+/** window creates up to n Futures using the values
+ in the collection, and then another Future for each Future
+ that completes
+
+ this is basically a sliding window of Futures of size n
+
+ func must return a Future for each value in input
+ */
+template <class Collection, class F,
+ class ItT = typename std::iterator_traits<
+ typename Collection::iterator>::value_type,
+ class Result = typename detail::resultOf<F, ItT&&>::value_type>
+std::vector<Future<Result>>
+window(Collection input, F func, size_t n);
+
template <typename F, typename T, typename ItT>
using MaybeTryArg = typename std::conditional<
detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type;
EXPECT_EQ(7, f.value());
}
+TEST(Future, stream) {
+ auto fn = [](vector<int> input, size_t window_size, size_t expect) {
+ auto res = reduce(
+ window(
+ input,
+ [](int i) { return makeFuture(i); },
+ 2),
+ 0,
+ [](int sum, const Try<int>& b) {
+ return sum + *b;
+ }).get();
+ EXPECT_EQ(expect, res);
+ };
+ {
+ // streaming 2 at a time
+ vector<int> input = {1, 2, 3};
+ fn(input, 2, 6);
+ }
+ {
+ // streaming 4 at a time
+ vector<int> input = {1, 2, 3};
+ fn(input, 4, 6);
+ }
+ {
+ // empty inpt
+ vector<int> input;
+ fn(input, 1, 0);
+ }
+}
+
TEST(Future, collectAll) {
// returns a vector variant
{