/*
- * Copyright 2017 Facebook, Inc.
+ * Copyright 2014-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
#pragma once
#include <algorithm>
#include <chrono>
#include <thread>
-#include <folly/Baton.h>
#include <folly/Optional.h>
+#include <folly/executors/InlineExecutor.h>
#include <folly/futures/Timekeeper.h>
#include <folly/futures/detail/Core.h>
+#include <folly/synchronization/Baton.h>
#ifndef FOLLY_FUTURE_USING_FIBER
#if FOLLY_MOBILE || defined(__APPLE__)
return CoreCallbackState<T, _t<std::decay<F>>>(
std::move(p), std::forward<F>(f));
}
-} // namespace detail
-} // namespace futures
-
-template <class T>
-SemiFuture<typename std::decay<T>::type> makeSemiFuture(T&& t) {
- return makeSemiFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
-}
-
-inline SemiFuture<Unit> makeSemiFuture() {
- return makeSemiFuture(Unit{});
-}
-
-// makeSemiFutureWith(SemiFuture<T>()) -> SemiFuture<T>
-template <class F>
-typename std::enable_if<
- isSemiFuture<typename std::result_of<F()>::type>::value,
- typename std::result_of<F()>::type>::type
-makeSemiFutureWith(F&& func) {
- using InnerType =
- typename isSemiFuture<typename std::result_of<F()>::type>::Inner;
- try {
- return std::forward<F>(func)();
- } catch (std::exception& e) {
- return makeSemiFuture<InnerType>(
- exception_wrapper(std::current_exception(), e));
- } catch (...) {
- return makeSemiFuture<InnerType>(
- exception_wrapper(std::current_exception()));
- }
-}
-
-// makeSemiFutureWith(T()) -> SemiFuture<T>
-// makeSemiFutureWith(void()) -> SemiFuture<Unit>
-template <class F>
-typename std::enable_if<
- !(isSemiFuture<typename std::result_of<F()>::type>::value),
- SemiFuture<Unit::LiftT<typename std::result_of<F()>::type>>>::type
-makeSemiFutureWith(F&& func) {
- using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
- return makeSemiFuture<LiftedResult>(
- makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
-}
template <class T>
-SemiFuture<T> makeSemiFuture(std::exception_ptr const& e) {
- return makeSemiFuture(Try<T>(e));
+FutureBase<T>::FutureBase(SemiFuture<T>&& other) noexcept : core_(other.core_) {
+ other.core_ = nullptr;
}
template <class T>
-SemiFuture<T> makeSemiFuture(exception_wrapper ew) {
- return makeSemiFuture(Try<T>(std::move(ew)));
-}
-
-template <class T, class E>
-typename std::
- enable_if<std::is_base_of<std::exception, E>::value, SemiFuture<T>>::type
- makeSemiFuture(E const& e) {
- return makeSemiFuture(Try<T>(make_exception_wrapper<E>(e)));
+FutureBase<T>::FutureBase(Future<T>&& other) noexcept : core_(other.core_) {
+ other.core_ = nullptr;
}
template <class T>
-SemiFuture<T> makeSemiFuture(Try<T>&& t) {
- return SemiFuture<T>(new futures::detail::Core<T>(std::move(t)));
-}
+template <class T2, typename>
+FutureBase<T>::FutureBase(T2&& val)
+ : core_(new futures::detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
template <class T>
-SemiFuture<T> SemiFuture<T>::makeEmpty() {
- return SemiFuture<T>(futures::detail::EmptyConstruct{});
-}
+template <typename T2>
+FutureBase<T>::FutureBase(
+ typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
+ : core_(new futures::detail::Core<T>(Try<T>(T()))) {}
template <class T>
-SemiFuture<T>::SemiFuture(SemiFuture<T>&& other) noexcept : core_(other.core_) {
- other.core_ = nullptr;
+template <
+ class... Args,
+ typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
+ type>
+FutureBase<T>::FutureBase(in_place_t, Args&&... args)
+ : core_(
+ new futures::detail::Core<T>(in_place, std::forward<Args>(args)...)) {
}
template <class T>
-SemiFuture<T>& SemiFuture<T>::operator=(SemiFuture<T>&& other) noexcept {
+template <class FutureType>
+void FutureBase<T>::assign(FutureType& other) noexcept {
std::swap(core_, other.core_);
- return *this;
}
template <class T>
-SemiFuture<T>::SemiFuture(Future<T>&& other) noexcept : core_(other.core_) {
- other.core_ = nullptr;
+FutureBase<T>::~FutureBase() {
+ detach();
}
template <class T>
-SemiFuture<T>& SemiFuture<T>::operator=(Future<T>&& other) noexcept {
- std::swap(core_, other.core_);
- return *this;
+T& FutureBase<T>::value() & {
+ return result().value();
}
template <class T>
-template <class T2, typename>
-SemiFuture<T>::SemiFuture(T2&& val)
- : core_(new futures::detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
-
-template <class T>
-template <typename T2>
-SemiFuture<T>::SemiFuture(
- typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
- : core_(new futures::detail::Core<T>(Try<T>(T()))) {}
+T const& FutureBase<T>::value() const& {
+ return result().value();
+}
template <class T>
-template <
- class... Args,
- typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
- type>
-SemiFuture<T>::SemiFuture(in_place_t, Args&&... args)
- : core_(
- new futures::detail::Core<T>(in_place, std::forward<Args>(args)...)) {
+T&& FutureBase<T>::value() && {
+ return std::move(result().value());
}
template <class T>
-SemiFuture<T>::~SemiFuture() {
- detach();
+T const&& FutureBase<T>::value() const&& {
+ return std::move(result().value());
}
template <class T>
-typename std::add_lvalue_reference<T>::type SemiFuture<T>::value() {
+Try<T>& FutureBase<T>::result() & {
throwIfInvalid();
- return core_->getTry().value();
+ return core_->getTry();
}
template <class T>
-typename std::add_lvalue_reference<const T>::type SemiFuture<T>::value() const {
+Try<T> const& FutureBase<T>::result() const& {
throwIfInvalid();
- return core_->getTry().value();
+ return core_->getTry();
}
template <class T>
-inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) && {
+Try<T>&& FutureBase<T>::result() && {
throwIfInvalid();
- setExecutor(executor, priority);
-
- auto newFuture = Future<T>(core_);
- core_ = nullptr;
- return newFuture;
+ return std::move(core_->getTry());
}
template <class T>
-inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) & {
+Try<T> const&& FutureBase<T>::result() const&& {
throwIfInvalid();
- Promise<T> p;
- auto f = p.getFuture();
- auto func = [p = std::move(p)](Try<T>&& t) mutable {
- p.setTry(std::move(t));
- };
- using R = futures::detail::callableResult<T, decltype(func)>;
- thenImplementation<decltype(func), R>(std::move(func), typename R::Arg());
- return std::move(f).via(executor, priority);
+
+ return std::move(core_->getTry());
}
template <class T>
-bool SemiFuture<T>::isReady() const {
+bool FutureBase<T>::isReady() const {
throwIfInvalid();
return core_->ready();
}
template <class T>
-bool SemiFuture<T>::hasValue() {
- return getTry().hasValue();
+bool FutureBase<T>::hasValue() {
+ return core_->getTry().hasValue();
}
template <class T>
-bool SemiFuture<T>::hasException() {
- return getTry().hasException();
+bool FutureBase<T>::hasException() {
+ return core_->getTry().hasException();
}
template <class T>
-void SemiFuture<T>::detach() {
+void FutureBase<T>::detach() {
if (core_) {
core_->detachFuture();
core_ = nullptr;
}
template <class T>
-Try<T>& SemiFuture<T>::getTry() {
- throwIfInvalid();
-
- return core_->getTry();
-}
-
-template <class T>
-void SemiFuture<T>::throwIfInvalid() const {
- if (!core_)
+void FutureBase<T>::throwIfInvalid() const {
+ if (!core_) {
throwNoState();
+ }
}
template <class T>
-Optional<Try<T>> SemiFuture<T>::poll() {
+Optional<Try<T>> FutureBase<T>::poll() {
Optional<Try<T>> o;
if (core_->ready()) {
o = std::move(core_->getTry());
}
template <class T>
-void SemiFuture<T>::raise(exception_wrapper exception) {
+void FutureBase<T>::raise(exception_wrapper exception) {
core_->raise(std::move(exception));
}
template <class T>
template <class F>
-void SemiFuture<T>::setCallback_(F&& func) {
+void FutureBase<T>::setCallback_(F&& func) {
throwIfInvalid();
core_->setCallback(std::forward<F>(func));
}
template <class T>
-SemiFuture<T>::SemiFuture(futures::detail::EmptyConstruct) noexcept
+FutureBase<T>::FutureBase(futures::detail::EmptyConstruct) noexcept
: core_(nullptr) {}
-template <class T>
-Future<T> Future<T>::makeEmpty() {
- return Future<T>(futures::detail::EmptyConstruct{});
-}
-
-template <class T>
-Future<T>::Future(Future<T>&& other) noexcept
- : SemiFuture<T>(std::move(other)) {}
-
-template <class T>
-Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
- SemiFuture<T>::operator=(SemiFuture<T>{std::move(other)});
- return *this;
-}
-
-template <class T>
-template <
- class T2,
- typename std::enable_if<
- !std::is_same<T, typename std::decay<T2>::type>::value &&
- std::is_constructible<T, T2&&>::value &&
- std::is_convertible<T2&&, T>::value,
- int>::type>
-Future<T>::Future(Future<T2>&& other)
- : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
-
-template <class T>
-template <
- class T2,
- typename std::enable_if<
- !std::is_same<T, typename std::decay<T2>::type>::value &&
- std::is_constructible<T, T2&&>::value &&
- !std::is_convertible<T2&&, T>::value,
- int>::type>
-Future<T>::Future(Future<T2>&& other)
- : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
-
-template <class T>
-template <
- class T2,
- typename std::enable_if<
- !std::is_same<T, typename std::decay<T2>::type>::value &&
- std::is_constructible<T, T2&&>::value,
- int>::type>
-Future<T>& Future<T>::operator=(Future<T2>&& other) {
- return operator=(
- std::move(other).then([](T2&& v) { return T(std::move(v)); }));
-}
-
-// TODO: isSemiFuture
-template <class T>
-template <class T2, typename>
-Future<T>::Future(T2&& val) : SemiFuture<T>(std::forward<T2>(val)) {}
-
-template <class T>
-template <typename T2>
-Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
- : SemiFuture<T>() {}
-
-template <class T>
-template <
- class... Args,
- typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
- type>
-Future<T>::Future(in_place_t, Args&&... args)
- : SemiFuture<T>(in_place, std::forward<Args>(args)...) {}
-
-template <class T>
-Future<T>::~Future() {
-}
-
-// unwrap
-
-template <class T>
-template <class F>
-typename std::enable_if<isFuture<F>::value,
- Future<typename isFuture<T>::Inner>>::type
-Future<T>::unwrap() {
- return then([](Future<typename isFuture<T>::Inner> internal_future) {
- return internal_future;
- });
-}
-
// then
// Variant: returns a value
template <class T>
template <typename F, typename R, bool isTry, typename... Args>
typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
-SemiFuture<T>::thenImplementation(
+FutureBase<T>::thenImplementation(
F&& func,
futures::detail::argResult<isTry, F, Args...>) {
static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
template <class T>
template <typename F, typename R, bool isTry, typename... Args>
typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
-SemiFuture<T>::thenImplementation(
+FutureBase<T>::thenImplementation(
F&& func,
futures::detail::argResult<isTry, F, Args...>) {
static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
return f;
}
+} // namespace detail
+} // namespace futures
+
+template <class T>
+SemiFuture<typename std::decay<T>::type> makeSemiFuture(T&& t) {
+ return makeSemiFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
+}
+
+// makeSemiFutureWith(SemiFuture<T>()) -> SemiFuture<T>
+template <class F>
+typename std::enable_if<
+ isSemiFuture<typename std::result_of<F()>::type>::value,
+ typename std::result_of<F()>::type>::type
+makeSemiFutureWith(F&& func) {
+ using InnerType =
+ typename isSemiFuture<typename std::result_of<F()>::type>::Inner;
+ try {
+ return std::forward<F>(func)();
+ } catch (std::exception& e) {
+ return makeSemiFuture<InnerType>(
+ exception_wrapper(std::current_exception(), e));
+ } catch (...) {
+ return makeSemiFuture<InnerType>(
+ exception_wrapper(std::current_exception()));
+ }
+}
+
+// makeSemiFutureWith(T()) -> SemiFuture<T>
+// makeSemiFutureWith(void()) -> SemiFuture<Unit>
+template <class F>
+typename std::enable_if<
+ !(isSemiFuture<typename std::result_of<F()>::type>::value),
+ SemiFuture<Unit::LiftT<typename std::result_of<F()>::type>>>::type
+makeSemiFutureWith(F&& func) {
+ using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
+ return makeSemiFuture<LiftedResult>(
+ makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
+}
+
+template <class T>
+SemiFuture<T> makeSemiFuture(std::exception_ptr const& e) {
+ return makeSemiFuture(Try<T>(e));
+}
+
+template <class T>
+SemiFuture<T> makeSemiFuture(exception_wrapper ew) {
+ return makeSemiFuture(Try<T>(std::move(ew)));
+}
+
+template <class T, class E>
+typename std::
+ enable_if<std::is_base_of<std::exception, E>::value, SemiFuture<T>>::type
+ makeSemiFuture(E const& e) {
+ return makeSemiFuture(Try<T>(make_exception_wrapper<E>(e)));
+}
+
+template <class T>
+SemiFuture<T> makeSemiFuture(Try<T>&& t) {
+ return SemiFuture<T>(new futures::detail::Core<T>(std::move(t)));
+}
+
+// This must be defined after the constructors to avoid a bug in MSVC
+// https://connect.microsoft.com/VisualStudio/feedback/details/3142777/out-of-line-constructor-definition-after-implicit-reference-causes-incorrect-c2244
+inline SemiFuture<Unit> makeSemiFuture() {
+ return makeSemiFuture(Unit{});
+}
+
+template <class T>
+SemiFuture<T> SemiFuture<T>::makeEmpty() {
+ return SemiFuture<T>(futures::detail::EmptyConstruct{});
+}
+
+template <class T>
+SemiFuture<T>::SemiFuture(SemiFuture<T>&& other) noexcept
+ : futures::detail::FutureBase<T>(std::move(other)) {}
+
+template <class T>
+SemiFuture<T>::SemiFuture(Future<T>&& other) noexcept
+ : futures::detail::FutureBase<T>(std::move(other)) {
+ // SemiFuture should not have an executor on construction
+ if (this->core_) {
+ this->setExecutor(nullptr);
+ }
+}
+
+template <class T>
+SemiFuture<T>& SemiFuture<T>::operator=(SemiFuture<T>&& other) noexcept {
+ this->assign(other);
+ return *this;
+}
+
+template <class T>
+SemiFuture<T>& SemiFuture<T>::operator=(Future<T>&& other) noexcept {
+ this->assign(other);
+ // SemiFuture should not have an executor on construction
+ if (this->core_) {
+ this->setExecutor(nullptr);
+ }
+ return *this;
+}
+
+template <class T>
+void SemiFuture<T>::boost_() {
+ // If a SemiFuture has an executor it should be deferred, so boost it
+ if (auto e = this->getExecutor()) {
+ // We know in a SemiFuture that if we have an executor it should be
+ // DeferredExecutor. Verify this in debug mode.
+ DCHECK(nullptr != dynamic_cast<DeferredExecutor*>(e));
+
+ auto ka = static_cast<DeferredExecutor*>(e)->getKeepAliveToken();
+ static_cast<DeferredExecutor*>(e)->boost();
+ }
+}
+
+template <class T>
+inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) && {
+ throwIfInvalid();
+ if (!executor) {
+ throwNoExecutor();
+ }
+
+ // If current executor is deferred, boost block to ensure that work
+ // progresses and is run on the new executor.
+ auto oldExecutor = this->getExecutor();
+ if (oldExecutor && executor && (executor != oldExecutor)) {
+ // We know in a SemiFuture that if we have an executor it should be
+ // DeferredExecutor. Verify this in debug mode.
+ DCHECK(nullptr != dynamic_cast<DeferredExecutor*>(this->getExecutor()));
+ if (static_cast<DeferredExecutor*>(oldExecutor)) {
+ executor->add([oldExecutorKA = oldExecutor->getKeepAliveToken()]() {
+ static_cast<DeferredExecutor*>(oldExecutorKA.get())->boost();
+ });
+ }
+ }
+
+ this->setExecutor(executor, priority);
+
+ auto newFuture = Future<T>(this->core_);
+ this->core_ = nullptr;
+ return newFuture;
+}
+
+template <class T>
+template <typename F>
+SemiFuture<typename futures::detail::callableResult<T, F>::Return::value_type>
+SemiFuture<T>::defer(F&& func) && {
+ // If we already have a deferred executor, use it, otherwise create one
+ auto defKeepAlive = this->getExecutor()
+ ? this->getExecutor()->getKeepAliveToken()
+ : DeferredExecutor::create();
+ auto e = defKeepAlive.get();
+ // We know in a SemiFuture that if we have an executor it should be
+ // DeferredExecutor (either it was that way before, or we just created it).
+ // Verify this in debug mode.
+ DCHECK(nullptr != dynamic_cast<DeferredExecutor*>(e));
+ // Convert to a folly::future with a deferred executor
+ // Will be low-cost if this is not a new executor as via optimises for that
+ // case
+ auto sf =
+ std::move(*this)
+ .via(e)
+ // Then add the work, with a wrapper function that captures the
+ // keepAlive so the executor is destroyed at the right time.
+ .then(
+ DeferredExecutor::wrap(std::move(defKeepAlive), std::move(func)))
+ // Finally, convert back o a folly::SemiFuture to hide the executor
+ .semi();
+ // Carry deferred executor through chain as constructor from Future will
+ // nullify it
+ sf.setExecutor(e);
+ return sf;
+}
+
+template <class T>
+Future<T> Future<T>::makeEmpty() {
+ return Future<T>(futures::detail::EmptyConstruct{});
+}
+
+template <class T>
+Future<T>::Future(Future<T>&& other) noexcept
+ : futures::detail::FutureBase<T>(std::move(other)) {}
+
+template <class T>
+Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
+ this->assign(other);
+ return *this;
+}
+
+template <class T>
+template <
+ class T2,
+ typename std::enable_if<
+ !std::is_same<T, typename std::decay<T2>::type>::value &&
+ std::is_constructible<T, T2&&>::value &&
+ std::is_convertible<T2&&, T>::value,
+ int>::type>
+Future<T>::Future(Future<T2>&& other)
+ : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
+
+template <class T>
+template <
+ class T2,
+ typename std::enable_if<
+ !std::is_same<T, typename std::decay<T2>::type>::value &&
+ std::is_constructible<T, T2&&>::value &&
+ !std::is_convertible<T2&&, T>::value,
+ int>::type>
+Future<T>::Future(Future<T2>&& other)
+ : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
+
+template <class T>
+template <
+ class T2,
+ typename std::enable_if<
+ !std::is_same<T, typename std::decay<T2>::type>::value &&
+ std::is_constructible<T, T2&&>::value,
+ int>::type>
+Future<T>& Future<T>::operator=(Future<T2>&& other) {
+ return operator=(
+ std::move(other).then([](T2&& v) { return T(std::move(v)); }));
+}
+
+// unwrap
+
+template <class T>
+template <class F>
+typename std::
+ enable_if<isFuture<F>::value, Future<typename isFuture<T>::Inner>>::type
+ Future<T>::unwrap() {
+ return then([](Future<typename isFuture<T>::Inner> internal_future) {
+ return internal_future;
+ });
+}
+
+template <class T>
+inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
+ this->throwIfInvalid();
+
+ this->setExecutor(executor, priority);
+
+ auto newFuture = Future<T>(this->core_);
+ this->core_ = nullptr;
+ return newFuture;
+}
+
+template <class T>
+inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
+ this->throwIfInvalid();
+ Promise<T> p;
+ auto f = p.getFuture();
+ auto func = [p = std::move(p)](Try<T>&& t) mutable {
+ p.setTry(std::move(t));
+ };
+ using R = futures::detail::callableResult<T, decltype(func)>;
+ this->template thenImplementation<decltype(func), R>(
+ std::move(func), typename R::Arg());
+ return std::move(f).via(executor, priority);
+}
template <typename T>
template <typename R, typename Caller, typename... Args>
return f;
}
-template <class T>
-Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
- return waitVia(e).getTry();
-}
-
template <class Func>
auto via(Executor* x, Func&& func)
-> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
return via(x).then(std::forward<Func>(func));
}
-template <class T>
-Future<T>::Future(futures::detail::EmptyConstruct) noexcept
- : SemiFuture<T>(futures::detail::EmptyConstruct{}) {}
-
// makeFuture
template <class T>
Nothing,
std::vector<Optional<T>>>::type;
- explicit CollectContext(size_t n) : result(n) {}
+ explicit CollectContext(size_t n) : result(n) {
+ finalResult.reserve(n);
+ }
~CollectContext() {
if (!threw.exchange(true)) {
// map Optional<T> -> T
- std::vector<T> finalResult;
- finalResult.reserve(result.size());
std::transform(result.begin(), result.end(),
std::back_inserter(finalResult),
[](Optional<T>& o) { return std::move(o.value()); });
}
Promise<Result> p;
InternalResult result;
+ Result finalResult;
std::atomic<bool> threw {false};
};
template <class Collection, class F, class ItT, class Result>
std::vector<Future<Result>>
window(Collection input, F func, size_t n) {
- struct WindowContext {
- WindowContext(Collection&& i, F&& fn)
- : input_(std::move(i)), promises_(input_.size()),
- func_(std::move(fn))
- {}
- std::atomic<size_t> i_ {0};
- Collection input_;
- std::vector<Promise<Result>> promises_;
- F func_;
+ // Use global inline executor singleton
+ auto executor = &InlineExecutor::instance();
+ return window(executor, std::move(input), std::move(func), n);
+}
- static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
- size_t i = ctx->i_++;
- if (i < ctx->input_.size()) {
- // Using setCallback_ directly since we don't need the Future
- ctx->func_(std::move(ctx->input_[i])).setCallback_(
- // ctx is captured by value
- [ctx, i](Try<Result>&& t) {
- ctx->promises_[i].setTry(std::move(t));
+template <class Collection, class F, class ItT, class Result>
+std::vector<Future<Result>>
+window(Executor* executor, Collection input, F func, size_t n) {
+ struct WindowContext {
+ WindowContext(Executor* executor_, Collection&& input_, F&& func_)
+ : executor(executor_),
+ input(std::move(input_)),
+ promises(input.size()),
+ func(std::move(func_)) {}
+ std::atomic<size_t> i{0};
+ Executor* executor;
+ Collection input;
+ std::vector<Promise<Result>> promises;
+ F func;
+
+ static inline void spawn(std::shared_ptr<WindowContext> ctx) {
+ size_t i = ctx->i++;
+ if (i < ctx->input.size()) {
+ auto fut = ctx->func(std::move(ctx->input[i]));
+ fut.setCallback_([ctx = std::move(ctx), i](Try<Result>&& t) mutable {
+ const auto executor_ = ctx->executor;
+ executor_->add([ctx = std::move(ctx), i, t = std::move(t)]() mutable {
+ ctx->promises[i].setTry(std::move(t));
// Chain another future onto this one
spawn(std::move(ctx));
});
+ });
}
}
};
auto max = std::min(n, input.size());
auto ctx = std::make_shared<WindowContext>(
- std::move(input), std::move(func));
+ executor, std::move(input), std::move(func));
+ // Start the first n Futures
for (size_t i = 0; i < max; ++i) {
- // Start the first n Futures
- WindowContext::spawn(ctx);
+ executor->add([ctx]() mutable { WindowContext::spawn(std::move(ctx)); });
}
std::vector<Future<Result>> futures;
- futures.reserve(ctx->promises_.size());
- for (auto& promise : ctx->promises_) {
+ futures.reserve(ctx->promises.size());
+ for (auto& promise : ctx->promises) {
futures.emplace_back(promise.getFuture());
}
}
std::shared_ptr<Timekeeper> tks;
- if (!tk) {
+ if (LIKELY(!tk)) {
tks = folly::detail::getTimekeeperSingleton();
- tk = DCHECK_NOTNULL(tks.get());
+ tk = tks.get();
+ }
+
+ if (UNLIKELY(!tk)) {
+ return makeFuture<T>(NoTimekeeper());
}
auto ctx = std::make_shared<Context>(std::move(e));
template <class T>
Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
return collectAll(*this, futures::sleep(dur, tk))
- .then([](std::tuple<Try<T>, Try<Unit>> tup) {
- Try<T>& t = std::get<0>(tup);
- return makeFuture<T>(std::move(t));
- });
+ .then([](std::tuple<Try<T>, Try<Unit>> tup) {
+ Try<T>& t = std::get<0>(tup);
+ return makeFuture<T>(std::move(t));
+ });
}
namespace futures {
namespace detail {
+template <class T>
+void doBoost(folly::Future<T>& /* usused */) {}
+
+template <class T>
+void doBoost(folly::SemiFuture<T>& f) {
+ f.boost_();
+}
+
template <class FutureType, typename T = typename FutureType::value_type>
void waitImpl(FutureType& f) {
// short-circuit if there's nothing to do
- if (f.isReady()) return;
+ if (f.isReady()) {
+ return;
+ }
FutureBatonType baton;
f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
+ doBoost(f);
baton.wait();
assert(f.isReady());
}
Promise<T> promise;
auto ret = promise.getFuture();
auto baton = std::make_shared<FutureBatonType>();
- f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
+ f.setCallback_([baton, promise = std::move(promise)](Try<T>&& t) mutable {
promise.setTry(std::move(t));
baton->post();
});
+ doBoost(f);
f = std::move(ret);
- if (baton->timed_wait(dur)) {
+ if (baton->try_wait_for(dur)) {
assert(f.isReady());
}
}
// Set callback so to ensure that the via executor has something on it
// so that once the preceding future triggers this callback, drive will
// always have a callback to satisfy it
- if (f.isReady())
+ if (f.isReady()) {
return;
+ }
f = f.via(e).then([](T&& t) { return std::move(t); });
while (!f.isReady()) {
e->drive();
assert(f.isReady());
}
+template <class T>
+void waitViaImpl(SemiFuture<T>& f, DrivableExecutor* e) {
+ // Set callback so to ensure that the via executor has something on it
+ // so that once the preceding future triggers this callback, drive will
+ // always have a callback to satisfy it
+ if (f.isReady()) {
+ return;
+ }
+ f = std::move(f).via(e).then([](T&& t) { return std::move(t); });
+ while (!f.isReady()) {
+ e->drive();
+ }
+ assert(f.isReady());
+}
+
} // namespace detail
} // namespace futures
}
template <class T>
-T SemiFuture<T>::get() {
- return std::move(wait().value());
+SemiFuture<T>& SemiFuture<T>::waitVia(DrivableExecutor* e) & {
+ futures::detail::waitViaImpl(*this, e);
+ return *this;
}
template <class T>
-T SemiFuture<T>::get(Duration dur) {
+SemiFuture<T>&& SemiFuture<T>::waitVia(DrivableExecutor* e) && {
+ futures::detail::waitViaImpl(*this, e);
+ return std::move(*this);
+}
+
+template <class T>
+T SemiFuture<T>::get() && {
+ return std::move(wait()).value();
+}
+
+template <class T>
+T SemiFuture<T>::get(Duration dur) && {
wait(dur);
if (this->isReady()) {
return std::move(this->value());
}
}
+template <class T>
+Try<T> SemiFuture<T>::getTry() && {
+ return std::move(wait()).result();
+}
+
+template <class T>
+Try<T> SemiFuture<T>::getTry(Duration dur) && {
+ wait(dur);
+ if (this->isReady()) {
+ return std::move(this->result());
+ } else {
+ throwTimedOut();
+ }
+}
+
+template <class T>
+T SemiFuture<T>::getVia(DrivableExecutor* e) && {
+ return std::move(waitVia(e)).value();
+}
+
+template <class T>
+Try<T> SemiFuture<T>::getTryVia(DrivableExecutor* e) && {
+ return std::move(waitVia(e)).result();
+}
+
template <class T>
Future<T>& Future<T>::wait() & {
futures::detail::waitImpl(*this);
return std::move(*this);
}
+template <class T>
+T Future<T>::get() {
+ return std::move(wait().value());
+}
+
+template <class T>
+T Future<T>::get(Duration dur) {
+ wait(dur);
+ if (this->isReady()) {
+ return std::move(this->value());
+ } else {
+ throwTimedOut();
+ }
+}
+
+template <class T>
+Try<T>& Future<T>::getTry() {
+ return result();
+}
+
template <class T>
T Future<T>::getVia(DrivableExecutor* e) {
return std::move(waitVia(e).value());
}
+template <class T>
+Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
+ return waitVia(e).getTry();
+}
+
namespace futures {
namespace detail {
template <class T>
}
namespace futures {
- template <class It, class F, class ItT, class Result>
- std::vector<Future<Result>> map(It first, It last, F func) {
- std::vector<Future<Result>> results;
- for (auto it = first; it != last; it++) {
- results.push_back(it->then(func));
- }
- return results;
+template <class It, class F, class ItT, class Result>
+std::vector<Future<Result>> map(It first, It last, F func) {
+ std::vector<Future<Result>> results;
+ for (auto it = first; it != last; it++) {
+ results.push_back(it->then(func));
}
+ return results;
}
+} // namespace futures
// Instantiate the most common Future types to save compile time
extern template class Future<Unit>;