From: Walker Mills Date: Thu, 19 Oct 2017 07:57:03 +0000 (-0700) Subject: Add window overload that takes an executor to prevent stack overflow X-Git-Tag: v2017.10.23.00~17 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=ebded22a9dae565f3dc6aa4bf7593f52ec189cd0;p=folly.git Add window overload that takes an executor to prevent stack overflow Summary: AIUI, if there is no executor available, then callbacks are executed inline. `folly::window` uses a recursive helper function (`spawn`) to handle chaining callbacks. So if `window` is used on a large enough collection of `Future`s without executors (e.g., created by `makeFuture`, or have otherwise already completed), you get a stack overflow. A minimal repro looks like: ``` int main(int argc, char** argv) { std::vector v(100000); for(int i=0; i < v.size(); i++) { v[i] = i; } std::vector> f = folly::window( std::move(v), [](int /* unused */) { return folly::makeFuture(); }, 1); folly::collectAll(f).get(); } ``` This diff resolves the issue by adding an overload of `folly::window` which takes an executor as its first parameter. The executor-less `window` overload calls through to the new function using an `InlineExecutor` as the default executor. Reviewed By: yfeldblum Differential Revision: D6038733 fbshipit-source-id: 5dcab575592650efa2e106f12632ec06817a0009 --- diff --git a/folly/futures/Future-inl.h b/folly/futures/Future-inl.h index c6ea59a8..258c9f83 100644 --- a/folly/futures/Future-inl.h +++ b/folly/futures/Future-inl.h @@ -23,6 +23,7 @@ #include #include +#include #include #include @@ -1067,27 +1068,38 @@ Future reduce(It first, It last, T&& initial, F&& func) { template std::vector> window(Collection input, F func, size_t n) { - struct WindowContext { - WindowContext(Collection&& i, F&& fn) - : input_(std::move(i)), promises_(input_.size()), - func_(std::move(fn)) - {} - std::atomic i_ {0}; - Collection input_; - std::vector> promises_; - F func_; + // Use global inline executor singleton + auto executor = &InlineExecutor::instance(); + return window(executor, std::move(input), std::move(func), n); +} - static inline void spawn(const std::shared_ptr& ctx) { - size_t i = ctx->i_++; - if (i < ctx->input_.size()) { - // Using setCallback_ directly since we don't need the Future - ctx->func_(std::move(ctx->input_[i])).setCallback_( - // ctx is captured by value - [ctx, i](Try&& t) { - ctx->promises_[i].setTry(std::move(t)); +template +std::vector> +window(Executor* executor, Collection input, F func, size_t n) { + struct WindowContext { + WindowContext(Executor* executor_, Collection&& input_, F&& func_) + : executor(executor_), + input(std::move(input_)), + promises(input.size()), + func(std::move(func_)) {} + std::atomic i{0}; + Executor* executor; + Collection input; + std::vector> promises; + F func; + + static inline void spawn(std::shared_ptr ctx) { + size_t i = ctx->i++; + if (i < ctx->input.size()) { + auto fut = ctx->func(std::move(ctx->input[i])); + fut.setCallback_([ctx = std::move(ctx), i](Try&& t) mutable { + const auto executor_ = ctx->executor; + executor_->add([ctx = std::move(ctx), i, t = std::move(t)]() mutable { + ctx->promises[i].setTry(std::move(t)); // Chain another future onto this one spawn(std::move(ctx)); }); + }); } } }; @@ -1095,16 +1107,16 @@ window(Collection input, F func, size_t n) { auto max = std::min(n, input.size()); auto ctx = std::make_shared( - std::move(input), std::move(func)); + executor, std::move(input), std::move(func)); + // Start the first n Futures for (size_t i = 0; i < max; ++i) { - // Start the first n Futures - WindowContext::spawn(ctx); + executor->add([ctx]() mutable { WindowContext::spawn(std::move(ctx)); }); } std::vector> futures; - futures.reserve(ctx->promises_.size()); - for (auto& promise : ctx->promises_) { + futures.reserve(ctx->promises.size()); + for (auto& promise : ctx->promises) { futures.emplace_back(promise.getFuture()); } diff --git a/folly/futures/helpers.h b/folly/futures/helpers.h index ff132af0..a5198e71 100644 --- a/folly/futures/helpers.h +++ b/folly/futures/helpers.h @@ -328,6 +328,15 @@ template < class Result = typename futures::detail::resultOf::value_type> std::vector> window(Collection input, F func, size_t n); +template < + class Collection, + class F, + class ItT = typename std::iterator_traits< + typename Collection::iterator>::value_type, + class Result = typename futures::detail::resultOf::value_type> +std::vector> +window(Executor* executor, Collection input, F func, size_t n); + template using MaybeTryArg = typename std::conditional< futures::detail::callableWith&&>::value, diff --git a/folly/futures/test/WindowTest.cpp b/folly/futures/test/WindowTest.cpp index c2105f0a..098fc7a8 100644 --- a/folly/futures/test/WindowTest.cpp +++ b/folly/futures/test/WindowTest.cpp @@ -18,6 +18,7 @@ #include #include +#include #include #include @@ -42,17 +43,17 @@ TEST(Window, basic) { EXPECT_EQ(expect, res); }; { - // 2 in-flight at a time + SCOPED_TRACE("2 in-flight at a time"); std::vector input = {1, 2, 3}; fn(input, 2, 6); } { - // 4 in-flight at a time + SCOPED_TRACE("4 in-flight at a time"); std::vector input = {1, 2, 3}; fn(input, 4, 6); } { - // empty input + SCOPED_TRACE("empty input"); std::vector input; fn(input, 1, 0); } @@ -104,8 +105,8 @@ TEST(Window, parallel) { barrier.wait(); - for (size_t i = 0; i < ps.size(); i++) { - ts[i].join(); + for (auto& t : ts) { + t.join(); } EXPECT_TRUE(f.isReady()); @@ -139,8 +140,8 @@ TEST(Window, parallelWithError) { barrier.wait(); - for (size_t i = 0; i < ps.size(); i++) { - ts[i].join(); + for (auto& t : ts) { + t.join(); } EXPECT_TRUE(f.isReady()); @@ -172,8 +173,8 @@ TEST(Window, allParallelWithError) { barrier.wait(); - for (size_t i = 0; i < ps.size(); i++) { - ts[i].join(); + for (auto& t : ts) { + t.join(); } EXPECT_TRUE(f.isReady()); @@ -186,3 +187,173 @@ TEST(Window, allParallelWithError) { } } } + +TEST(WindowExecutor, basic) { + ManualExecutor executor; + + // int -> Future + auto fn = [executor_ = &executor]( + std::vector input, size_t window_size, size_t expect) { + auto res = reduce( + window( + executor_, input, [](int i) { return makeFuture(i); }, window_size), + 0, + [](int sum, const Try& b) { return sum + *b; }); + executor_->waitFor(res); + EXPECT_EQ(expect, res.get()); + }; + { + SCOPED_TRACE("2 in-flight at a time"); + std::vector input = {1, 2, 3}; + fn(input, 2, 6); + } + { + SCOPED_TRACE("4 in-flight at a time"); + std::vector input = {1, 2, 3}; + fn(input, 4, 6); + } + { + SCOPED_TRACE("empty input"); + std::vector input; + fn(input, 1, 0); + } + { + // int -> Future + auto res = reduce( + window( + &executor, + std::vector({1, 2, 3}), + [](int /* i */) { return makeFuture(); }, + 2), + 0, + [](int sum, const Try& b) { + EXPECT_TRUE(b.hasValue()); + return sum + 1; + }); + executor.waitFor(res); + EXPECT_EQ(3, res.get()); + } + { + // string -> return Future + auto res = reduce( + window( + &executor, + std::vector{"1", "2", "3"}, + [](std::string s) { return makeFuture(folly::to(s)); }, + 2), + 0, + [](int sum, const Try& b) { return sum + *b; }); + executor.waitFor(res); + EXPECT_EQ(6, res.get()); + } +} + +TEST(WindowExecutor, parallel) { + ManualExecutor executor; + + std::vector input; + std::vector> ps(10); + for (size_t i = 0; i < ps.size(); i++) { + input.emplace_back(i); + } + auto f = collect( + window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3)); + + std::vector ts; + boost::barrier barrier(ps.size() + 1); + for (size_t i = 0; i < ps.size(); i++) { + ts.emplace_back([&ps, &barrier, i]() { + barrier.wait(); + ps[i].setValue(i); + }); + } + + barrier.wait(); + + for (auto& t : ts) { + t.join(); + } + + executor.waitFor(f); + EXPECT_TRUE(f.isReady()); + for (size_t i = 0; i < ps.size(); i++) { + EXPECT_EQ(i, f.value()[i]); + } +} + +TEST(WindowExecutor, parallelWithError) { + ManualExecutor executor; + + std::vector input; + std::vector> ps(10); + for (size_t i = 0; i < ps.size(); i++) { + input.emplace_back(i); + } + auto f = collect( + window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3)); + + std::vector ts; + boost::barrier barrier(ps.size() + 1); + for (size_t i = 0; i < ps.size(); i++) { + ts.emplace_back([&ps, &barrier, i]() { + barrier.wait(); + if (i == (ps.size() / 2)) { + ps[i].setException(eggs); + } else { + ps[i].setValue(i); + } + }); + } + + barrier.wait(); + + for (auto& t : ts) { + t.join(); + } + + executor.waitFor(f); + EXPECT_TRUE(f.isReady()); + EXPECT_THROW(f.value(), eggs_t); +} + +TEST(WindowExecutor, allParallelWithError) { + ManualExecutor executor; + + std::vector input; + std::vector> ps(10); + for (size_t i = 0; i < ps.size(); i++) { + input.emplace_back(i); + } + auto f = collectAll( + window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3)); + + std::vector ts; + boost::barrier barrier(ps.size() + 1); + for (size_t i = 0; i < ps.size(); i++) { + ts.emplace_back([&ps, &barrier, i]() { + barrier.wait(); + if (i == (ps.size() / 2)) { + ps[i].setException(eggs); + } else { + ps[i].setValue(i); + } + }); + } + + barrier.wait(); + + for (auto& t : ts) { + t.join(); + } + + executor.waitFor(f); + EXPECT_TRUE(f.isReady()); + for (size_t i = 0; i < ps.size(); i++) { + if (i == (ps.size() / 2)) { + EXPECT_THROW(f.value()[i].value(), eggs_t); + } else { + EXPECT_TRUE(f.value()[i].hasValue()); + EXPECT_EQ(i, f.value()[i].value()); + } + } +}