#include <folly/Baton.h>
#include <folly/Optional.h>
+#include <folly/futures/InlineExecutor.h>
#include <folly/futures/Timekeeper.h>
#include <folly/futures/detail/Core.h>
template <class Collection, class F, class ItT, class Result>
std::vector<Future<Result>>
window(Collection input, F func, size_t n) {
- struct WindowContext {
- WindowContext(Collection&& i, F&& fn)
- : input_(std::move(i)), promises_(input_.size()),
- func_(std::move(fn))
- {}
- std::atomic<size_t> i_ {0};
- Collection input_;
- std::vector<Promise<Result>> promises_;
- F func_;
+ // Use global inline executor singleton
+ auto executor = &InlineExecutor::instance();
+ return window(executor, std::move(input), std::move(func), n);
+}
- static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
- size_t i = ctx->i_++;
- if (i < ctx->input_.size()) {
- // Using setCallback_ directly since we don't need the Future
- ctx->func_(std::move(ctx->input_[i])).setCallback_(
- // ctx is captured by value
- [ctx, i](Try<Result>&& t) {
- ctx->promises_[i].setTry(std::move(t));
+template <class Collection, class F, class ItT, class Result>
+std::vector<Future<Result>>
+window(Executor* executor, Collection input, F func, size_t n) {
+ struct WindowContext {
+ WindowContext(Executor* executor_, Collection&& input_, F&& func_)
+ : executor(executor_),
+ input(std::move(input_)),
+ promises(input.size()),
+ func(std::move(func_)) {}
+ std::atomic<size_t> i{0};
+ Executor* executor;
+ Collection input;
+ std::vector<Promise<Result>> promises;
+ F func;
+
+ static inline void spawn(std::shared_ptr<WindowContext> ctx) {
+ size_t i = ctx->i++;
+ if (i < ctx->input.size()) {
+ auto fut = ctx->func(std::move(ctx->input[i]));
+ fut.setCallback_([ctx = std::move(ctx), i](Try<Result>&& t) mutable {
+ const auto executor_ = ctx->executor;
+ executor_->add([ctx = std::move(ctx), i, t = std::move(t)]() mutable {
+ ctx->promises[i].setTry(std::move(t));
// Chain another future onto this one
spawn(std::move(ctx));
});
+ });
}
}
};
auto max = std::min(n, input.size());
auto ctx = std::make_shared<WindowContext>(
- std::move(input), std::move(func));
+ executor, std::move(input), std::move(func));
+ // Start the first n Futures
for (size_t i = 0; i < max; ++i) {
- // Start the first n Futures
- WindowContext::spawn(ctx);
+ executor->add([ctx]() mutable { WindowContext::spawn(std::move(ctx)); });
}
std::vector<Future<Result>> futures;
- futures.reserve(ctx->promises_.size());
- for (auto& promise : ctx->promises_) {
+ futures.reserve(ctx->promises.size());
+ for (auto& promise : ctx->promises) {
futures.emplace_back(promise.getFuture());
}
#include <folly/Conv.h>
#include <folly/futures/Future.h>
+#include <folly/futures/ManualExecutor.h>
#include <folly/portability/GTest.h>
#include <vector>
EXPECT_EQ(expect, res);
};
{
- // 2 in-flight at a time
+ SCOPED_TRACE("2 in-flight at a time");
std::vector<int> input = {1, 2, 3};
fn(input, 2, 6);
}
{
- // 4 in-flight at a time
+ SCOPED_TRACE("4 in-flight at a time");
std::vector<int> input = {1, 2, 3};
fn(input, 4, 6);
}
{
- // empty input
+ SCOPED_TRACE("empty input");
std::vector<int> input;
fn(input, 1, 0);
}
barrier.wait();
- for (size_t i = 0; i < ps.size(); i++) {
- ts[i].join();
+ for (auto& t : ts) {
+ t.join();
}
EXPECT_TRUE(f.isReady());
barrier.wait();
- for (size_t i = 0; i < ps.size(); i++) {
- ts[i].join();
+ for (auto& t : ts) {
+ t.join();
}
EXPECT_TRUE(f.isReady());
barrier.wait();
- for (size_t i = 0; i < ps.size(); i++) {
- ts[i].join();
+ for (auto& t : ts) {
+ t.join();
}
EXPECT_TRUE(f.isReady());
}
}
}
+
+TEST(WindowExecutor, basic) {
+ ManualExecutor executor;
+
+ // int -> Future<int>
+ auto fn = [executor_ = &executor](
+ std::vector<int> input, size_t window_size, size_t expect) {
+ auto res = reduce(
+ window(
+ executor_, input, [](int i) { return makeFuture(i); }, window_size),
+ 0,
+ [](int sum, const Try<int>& b) { return sum + *b; });
+ executor_->waitFor(res);
+ EXPECT_EQ(expect, res.get());
+ };
+ {
+ SCOPED_TRACE("2 in-flight at a time");
+ std::vector<int> input = {1, 2, 3};
+ fn(input, 2, 6);
+ }
+ {
+ SCOPED_TRACE("4 in-flight at a time");
+ std::vector<int> input = {1, 2, 3};
+ fn(input, 4, 6);
+ }
+ {
+ SCOPED_TRACE("empty input");
+ std::vector<int> input;
+ fn(input, 1, 0);
+ }
+ {
+ // int -> Future<Unit>
+ auto res = reduce(
+ window(
+ &executor,
+ std::vector<int>({1, 2, 3}),
+ [](int /* i */) { return makeFuture(); },
+ 2),
+ 0,
+ [](int sum, const Try<Unit>& b) {
+ EXPECT_TRUE(b.hasValue());
+ return sum + 1;
+ });
+ executor.waitFor(res);
+ EXPECT_EQ(3, res.get());
+ }
+ {
+ // string -> return Future<int>
+ auto res = reduce(
+ window(
+ &executor,
+ std::vector<std::string>{"1", "2", "3"},
+ [](std::string s) { return makeFuture<int>(folly::to<int>(s)); },
+ 2),
+ 0,
+ [](int sum, const Try<int>& b) { return sum + *b; });
+ executor.waitFor(res);
+ EXPECT_EQ(6, res.get());
+ }
+}
+
+TEST(WindowExecutor, parallel) {
+ ManualExecutor executor;
+
+ std::vector<int> input;
+ std::vector<Promise<int>> ps(10);
+ for (size_t i = 0; i < ps.size(); i++) {
+ input.emplace_back(i);
+ }
+ auto f = collect(
+ window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
+
+ std::vector<std::thread> ts;
+ boost::barrier barrier(ps.size() + 1);
+ for (size_t i = 0; i < ps.size(); i++) {
+ ts.emplace_back([&ps, &barrier, i]() {
+ barrier.wait();
+ ps[i].setValue(i);
+ });
+ }
+
+ barrier.wait();
+
+ for (auto& t : ts) {
+ t.join();
+ }
+
+ executor.waitFor(f);
+ EXPECT_TRUE(f.isReady());
+ for (size_t i = 0; i < ps.size(); i++) {
+ EXPECT_EQ(i, f.value()[i]);
+ }
+}
+
+TEST(WindowExecutor, parallelWithError) {
+ ManualExecutor executor;
+
+ std::vector<int> input;
+ std::vector<Promise<int>> ps(10);
+ for (size_t i = 0; i < ps.size(); i++) {
+ input.emplace_back(i);
+ }
+ auto f = collect(
+ window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
+
+ std::vector<std::thread> ts;
+ boost::barrier barrier(ps.size() + 1);
+ for (size_t i = 0; i < ps.size(); i++) {
+ ts.emplace_back([&ps, &barrier, i]() {
+ barrier.wait();
+ if (i == (ps.size() / 2)) {
+ ps[i].setException(eggs);
+ } else {
+ ps[i].setValue(i);
+ }
+ });
+ }
+
+ barrier.wait();
+
+ for (auto& t : ts) {
+ t.join();
+ }
+
+ executor.waitFor(f);
+ EXPECT_TRUE(f.isReady());
+ EXPECT_THROW(f.value(), eggs_t);
+}
+
+TEST(WindowExecutor, allParallelWithError) {
+ ManualExecutor executor;
+
+ std::vector<int> input;
+ std::vector<Promise<int>> ps(10);
+ for (size_t i = 0; i < ps.size(); i++) {
+ input.emplace_back(i);
+ }
+ auto f = collectAll(
+ window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
+
+ std::vector<std::thread> ts;
+ boost::barrier barrier(ps.size() + 1);
+ for (size_t i = 0; i < ps.size(); i++) {
+ ts.emplace_back([&ps, &barrier, i]() {
+ barrier.wait();
+ if (i == (ps.size() / 2)) {
+ ps[i].setException(eggs);
+ } else {
+ ps[i].setValue(i);
+ }
+ });
+ }
+
+ barrier.wait();
+
+ for (auto& t : ts) {
+ t.join();
+ }
+
+ executor.waitFor(f);
+ EXPECT_TRUE(f.isReady());
+ for (size_t i = 0; i < ps.size(); i++) {
+ if (i == (ps.size() / 2)) {
+ EXPECT_THROW(f.value()[i].value(), eggs_t);
+ } else {
+ EXPECT_TRUE(f.value()[i].hasValue());
+ EXPECT_EQ(i, f.value()[i].value());
+ }
+ }
+}