From ad0c254dbdbac5b166c1b4e9e3386ba487842630 Mon Sep 17 00:00:00 2001
From: Hannes Roth <hannesr@fb.com>
Date: Thu, 14 May 2015 11:50:06 -0700
Subject: [PATCH] (Wangle) window

Summary: `window` creates up to `n` Futures at a time and only starts new ones when previous ones complete. A sliding window.

Test Plan: Run all the tests.

Reviewed By: hans@fb.com

Subscribers: bmatheny, henryf, scottstraw, juliafu, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D2015310

Signature: t1:2015310:1431557556:1017006cc9c9c2562ebe2c3dabfc4dbf316ff408
---
 folly/futures/Future-inl.h        | 47 +++++++++++++++++++++++++++++++
 folly/futures/helpers.h           | 15 ++++++++++
 folly/futures/test/FutureTest.cpp | 30 ++++++++++++++++++++
 3 files changed, 92 insertions(+)

diff --git a/folly/futures/Future-inl.h b/folly/futures/Future-inl.h
index 28cb216f..f17eabfa 100644
--- a/folly/futures/Future-inl.h
+++ b/folly/futures/Future-inl.h
@@ -739,6 +739,53 @@ Future<T> reduce(It first, It last, T&& initial, F&& func) {
   return f;
 }
 
+template <class Collection, class F, class ItT, class Result>
+std::vector<Future<Result>>
+window(Collection input, F func, size_t n) {
+  struct WindowContext {
+    WindowContext(Collection&& i, F&& fn)
+        : i_(0), input_(std::move(i)), promises_(input_.size()),
+          func_(std::move(fn))
+      {}
+    std::atomic<size_t> i_;
+    Collection input_;
+    std::vector<Promise<Result>> promises_;
+    F func_;
+
+    static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
+      size_t i = ctx->i_++;
+      if (i < ctx->input_.size()) {
+        // Using setCallback_ directly since we don't need the Future
+        ctx->func_(std::move(ctx->input_[i])).setCallback_(
+          // ctx is captured by value
+          [ctx, i](Try<ItT>&& t) {
+            ctx->promises_[i].setTry(std::move(t));
+            // Chain another future onto this one
+            spawn(std::move(ctx));
+          });
+      }
+    }
+  };
+
+  auto max = std::min(n, input.size());
+
+  auto ctx = std::make_shared<WindowContext>(
+    std::move(input), std::move(func));
+
+  for (size_t i = 0; i < max; ++i) {
+    // Start the first n Futures
+    WindowContext::spawn(ctx);
+  }
+
+  std::vector<Future<Result>> futures;
+  futures.reserve(ctx->promises_.size());
+  for (auto& promise : ctx->promises_) {
+    futures.emplace_back(promise.getFuture());
+  }
+
+  return futures;
+}
+
 template <class T>
 template <class I, class F>
 Future<I> Future<T>::reduce(I&& initial, F&& func) {
diff --git a/folly/futures/helpers.h b/folly/futures/helpers.h
index c7456411..ded42702 100644
--- a/folly/futures/helpers.h
+++ b/folly/futures/helpers.h
@@ -224,6 +224,21 @@ auto collectN(Collection&& c, size_t n)
   return collectN(c.begin(), c.end(), n);
 }
 
+/** window creates up to n Futures using the values
+    in the collection, and then another Future for each Future
+    that completes
+
+    this is basically a sliding window of Futures of size n
+
+    func must return a Future for each value in input
+  */
+template <class Collection, class F,
+          class ItT = typename std::iterator_traits<
+            typename Collection::iterator>::value_type,
+          class Result = typename detail::resultOf<F, ItT&&>::value_type>
+std::vector<Future<Result>>
+window(Collection input, F func, size_t n);
+
 template <typename F, typename T, typename ItT>
 using MaybeTryArg = typename std::conditional<
   detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type;
diff --git a/folly/futures/test/FutureTest.cpp b/folly/futures/test/FutureTest.cpp
index faeb1da2..96b5cd5f 100644
--- a/folly/futures/test/FutureTest.cpp
+++ b/folly/futures/test/FutureTest.cpp
@@ -690,6 +690,36 @@ TEST(Future, unwrap) {
   EXPECT_EQ(7, f.value());
 }
 
+TEST(Future, stream) {
+  auto fn = [](vector<int> input, size_t window_size, size_t expect) {
+    auto res = reduce(
+      window(
+        input,
+        [](int i) { return makeFuture(i); },
+        2),
+      0,
+      [](int sum, const Try<int>& b) {
+        return sum + *b;
+      }).get();
+    EXPECT_EQ(expect, res);
+  };
+  {
+    // streaming 2 at a time
+    vector<int> input = {1, 2, 3};
+    fn(input, 2, 6);
+  }
+  {
+    // streaming 4 at a time
+    vector<int> input = {1, 2, 3};
+    fn(input, 4, 6);
+  }
+  {
+    // empty inpt
+    vector<int> input;
+    fn(input, 1, 0);
+  }
+}
+
 TEST(Future, collectAll) {
   // returns a vector variant
   {
-- 
2.34.1