return executor_ != nullptr;
}
+ Executor* get() const {
+ return executor_.get();
+ }
+
private:
friend class Executor;
explicit KeepAlive(folly::Executor* executor) : executor_(executor) {}
return std::move(core_->getTry().value());
}
-template <class T>
-inline Future<T> FutureBase<T>::via(Executor* executor, int8_t priority) && {
- throwIfInvalid();
-
- setExecutor(executor, priority);
-
- auto newFuture = Future<T>(core_);
- core_ = nullptr;
- return newFuture;
-}
-
template <class T>
bool FutureBase<T>::isReady() const {
throwIfInvalid();
return *this;
}
+template <class T>
+void SemiFuture<T>::boost_() {
+ // If a SemiFuture has an executor it should be deferred, so boost it
+ if (auto e = this->getExecutor()) {
+ // We know in a SemiFuture that if we have an executor it should be
+ // DeferredExecutor. Verify this in debug mode.
+ DCHECK(dynamic_cast<DeferredExecutor*>(e));
+
+ auto ka = static_cast<DeferredExecutor*>(e)->getKeepAliveToken();
+ static_cast<DeferredExecutor*>(e)->boost();
+ }
+}
+
+template <class T>
+inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) && {
+ throwIfInvalid();
+
+ // If current executor is deferred, boost block to ensure that work
+ // progresses and is run on the new executor.
+ auto oldExecutor = this->getExecutor();
+ if (oldExecutor && executor && (executor != oldExecutor)) {
+ // We know in a SemiFuture that if we have an executor it should be
+ // DeferredExecutor. Verify this in debug mode.
+ DCHECK(dynamic_cast<DeferredExecutor*>(this->getExecutor()));
+ if (static_cast<DeferredExecutor*>(oldExecutor)) {
+ executor->add([oldExecutorKA = oldExecutor->getKeepAliveToken()]() {
+ static_cast<DeferredExecutor*>(oldExecutorKA.get())->boost();
+ });
+ }
+ }
+
+ this->setExecutor(executor, priority);
+
+ auto newFuture = Future<T>(this->core_);
+ this->core_ = nullptr;
+ return newFuture;
+}
+
+template <class T>
+template <typename F>
+SemiFuture<typename futures::detail::callableResult<T, F>::Return::value_type>
+SemiFuture<T>::defer(F&& func) && {
+ // If we already have a deferred executor, use it, otherwise create one
+ auto defKeepAlive = this->getExecutor()
+ ? this->getExecutor()->getKeepAliveToken()
+ : DeferredExecutor::create();
+ auto e = defKeepAlive.get();
+ // We know in a SemiFuture that if we have an executor it should be
+ // DeferredExecutor (either it was that way before, or we just created it).
+ // Verify this in debug mode.
+ DCHECK(dynamic_cast<DeferredExecutor*>(e));
+ // Convert to a folly::future with a deferred executor
+ // Will be low-cost if this is not a new executor as via optimises for that
+ // case
+ auto sf =
+ std::move(*this)
+ .via(defKeepAlive.get())
+ // Then add the work, with a wrapper function that captures the
+ // keepAlive so the executor is destroyed at the right time.
+ .then(
+ DeferredExecutor::wrap(std::move(defKeepAlive), std::move(func)))
+ // Finally, convert back o a folly::SemiFuture to hide the executor
+ .semi();
+ // Carry deferred executor through chain as constructor from Future will
+ // nullify it
+ sf.setExecutor(e);
+ return sf;
+}
+
template <class T>
Future<T> Future<T>::makeEmpty() {
return Future<T>(futures::detail::EmptyConstruct{});
});
}
+template <class T>
+inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
+ this->throwIfInvalid();
+
+ this->setExecutor(executor, priority);
+
+ auto newFuture = Future<T>(this->core_);
+ this->core_ = nullptr;
+ return newFuture;
+}
+
template <class T>
inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
this->throwIfInvalid();
namespace futures {
namespace detail {
+template <class T>
+void doBoost(folly::Future<T>& /* usused */) {}
+
+template <class T>
+void doBoost(folly::SemiFuture<T>& f) {
+ f.boost_();
+}
+
template <class FutureType, typename T = typename FutureType::value_type>
void waitImpl(FutureType& f) {
// short-circuit if there's nothing to do
FutureBatonType baton;
f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
+ doBoost(f);
baton.wait();
assert(f.isReady());
}
promise.setTry(std::move(t));
baton->post();
});
+ doBoost(f);
f = std::move(ret);
if (baton->timed_wait(dur)) {
assert(f.isReady());
typedef typename ArgType<Args...>::FirstArg FirstArg;
};
+/**
+ * Defer work until executor is actively boosted.
+ */
+class DeferredExecutor final : public Executor {
+ public:
+ template <typename Class, typename F>
+ struct DeferredWorkWrapper;
+
+ /**
+ * Work wrapper class to capture the keepalive and forward the argument
+ * list to the captured function.
+ */
+ template <typename F, typename R, typename... Args>
+ struct DeferredWorkWrapper<F, R (F::*)(Args...) const> {
+ R operator()(Args... args) {
+ return func(std::forward<Args>(args)...);
+ }
+
+ Executor::KeepAlive a;
+ F func;
+ };
+
+ /**
+ * Construction is private to ensure that creation and deletion are
+ * symmetric
+ */
+ static KeepAlive create() {
+ std::unique_ptr<futures::detail::DeferredExecutor> devb{
+ new futures::detail::DeferredExecutor{}};
+ auto keepAlive = devb->getKeepAliveToken();
+ devb.release();
+ return keepAlive;
+ }
+
+ /// Enqueue a function to executed by this executor. This is not thread-safe.
+ void add(Func func) override {
+ // If we already have a function, wrap and chain. Otherwise assign.
+ if (func_) {
+ func_ = [oldFunc = std::move(func_), func = std::move(func)]() mutable {
+ oldFunc();
+ func();
+ };
+ } else {
+ func_ = std::move(func);
+ }
+ }
+
+ // Boost is like drive for certain types of deferred work
+ // Unlike drive it is safe to run on another executor because it
+ // will only be implemented on deferred-safe executors
+ void boost() {
+ // Ensure that the DeferredExecutor outlives its run operation
+ ++keepAliveCount_;
+ SCOPE_EXIT {
+ releaseAndTryFree();
+ };
+
+ // Drain the executor
+ while (auto func = std::move(func_)) {
+ func();
+ }
+ }
+
+ KeepAlive getKeepAliveToken() override {
+ keepAliveAcquire();
+ return makeKeepAlive();
+ }
+
+ ~DeferredExecutor() = default;
+
+ template <class F>
+ static auto wrap(Executor::KeepAlive keepAlive, F&& func)
+ -> DeferredWorkWrapper<F, decltype(&F::operator())> {
+ return DeferredExecutor::DeferredWorkWrapper<F, decltype(&F::operator())>{
+ std::move(keepAlive), std::forward<F>(func)};
+ }
+
+ protected:
+ void keepAliveAcquire() override {
+ ++keepAliveCount_;
+ }
+
+ void keepAliveRelease() override {
+ releaseAndTryFree();
+ }
+
+ void releaseAndTryFree() {
+ --keepAliveCount_;
+ if (keepAliveCount_ == 0) {
+ delete this;
+ }
+ }
+
+ private:
+ Func func_;
+ ssize_t keepAliveCount_{0};
+
+ DeferredExecutor() = default;
+};
+
} // namespace detail
} // namespace futures
template class Future<double>;
} // namespace folly
-namespace folly { namespace futures {
+namespace folly {
+namespace futures {
Future<Unit> sleep(Duration dur, Timekeeper* tk) {
std::shared_ptr<Timekeeper> tks;
#include <folly/Optional.h>
#include <folly/Portability.h>
+#include <folly/ScopeGuard.h>
#include <folly/Try.h>
#include <folly/Utility.h>
#include <folly/executors/DrivableExecutor.h>
T&& value() &&;
T const&& value() const&&;
- /// Returns an inactive Future which will call back on the other side of
- /// executor (when it is activated).
- ///
- /// NB remember that Futures activate when they destruct. This is good,
- /// it means that this will work:
- ///
- /// f.via(e).then(a).then(b);
- ///
- /// a and b will execute in the same context (the far side of e), because
- /// the Future (temporary variable) created by via(e) does not call back
- /// until it destructs, which is after then(a) and then(b) have been wired
- /// up.
- ///
- /// But this is still racy:
- ///
- /// f = f.via(e).then(a);
- /// f.then(b);
- // The ref-qualifier allows for `this` to be moved out so we
- // don't get access-after-free situations in chaining.
- // https://akrzemi1.wordpress.com/2014/06/02/ref-qualifiers/
- inline Future<T> via(
- Executor* executor,
- int8_t priority = Executor::MID_PRI) &&;
-
/** True when the result (or exception) is ready. */
bool isReady() const;
class SemiFuture : private futures::detail::FutureBase<T> {
private:
using Base = futures::detail::FutureBase<T>;
+ using DeferredExecutor = futures::detail::DeferredExecutor;
public:
static SemiFuture<T> makeEmpty(); // equivalent to moved-from
using Base::raise;
using Base::setCallback_;
using Base::value;
- using Base::via;
SemiFuture& operator=(SemiFuture const&) = delete;
SemiFuture& operator=(SemiFuture&&) noexcept;
/// Overload of wait(Duration) for rvalue Futures
SemiFuture<T>&& wait(Duration) &&;
+ /// Returns an inactive Future which will call back on the other side of
+ /// executor (when it is activated).
+ ///
+ /// NB remember that Futures activate when they destruct. This is good,
+ /// it means that this will work:
+ ///
+ /// f.via(e).then(a).then(b);
+ ///
+ /// a and b will execute in the same context (the far side of e), because
+ /// the Future (temporary variable) created by via(e) does not call back
+ /// until it destructs, which is after then(a) and then(b) have been wired
+ /// up.
+ ///
+ /// But this is still racy:
+ ///
+ /// f = f.via(e).then(a);
+ /// f.then(b);
+ // The ref-qualifier allows for `this` to be moved out so we
+ // don't get access-after-free situations in chaining.
+ // https://akrzemi1.wordpress.com/2014/06/02/ref-qualifiers/
+ inline Future<T> via(
+ Executor* executor,
+ int8_t priority = Executor::MID_PRI) &&;
+
+ /**
+ * Defer work to run on the consumer of the future.
+ * This work will be run eithe ron an executor that the caller sets on the
+ * SemiFuture, or inline with the call to .get().
+ * NB: This is a custom method because boost-blocking executors is a
+ * special-case for work deferral in folly. With more general boost-blocking
+ * support all executors would boost block and we would simply use some form
+ * of driveable executor here.
+ */
+ template <typename F>
+ SemiFuture<typename futures::detail::callableResult<T, F>::Return::value_type>
+ defer(F&& func) &&;
+
+ // Public as for setCallback_
+ // Ensure that a boostable executor performs work to chain deferred work
+ // cleanly
+ void boost_();
+
private:
template <class>
friend class futures::detail::FutureBase;
+ template <class>
+ friend class SemiFuture;
using typename Base::corePtr;
+ using Base::setExecutor;
+ using Base::throwIfInvalid;
template <class T2>
friend SemiFuture<T2> makeSemiFuture(Try<T2>&&);
using Base::raise;
using Base::setCallback_;
using Base::value;
- using Base::via;
static Future<T> makeEmpty(); // equivalent to moved-from
enable_if<isFuture<F>::value, Future<typename isFuture<T>::Inner>>::type
unwrap();
+ /// Returns an inactive Future which will call back on the other side of
+ /// executor (when it is activated).
+ ///
+ /// NB remember that Futures activate when they destruct. This is good,
+ /// it means that this will work:
+ ///
+ /// f.via(e).then(a).then(b);
+ ///
+ /// a and b will execute in the same context (the far side of e), because
+ /// the Future (temporary variable) created by via(e) does not call back
+ /// until it destructs, which is after then(a) and then(b) have been wired
+ /// up.
+ ///
+ /// But this is still racy:
+ ///
+ /// f = f.via(e).then(a);
+ /// f.then(b);
+ // The ref-qualifier allows for `this` to be moved out so we
+ // don't get access-after-free situations in chaining.
+ // https://akrzemi1.wordpress.com/2014/06/02/ref-qualifiers/
+ inline Future<T> via(
+ Executor* executor,
+ int8_t priority = Executor::MID_PRI) &&;
+
/// This variant creates a new future, where the ref-qualifier && version
/// moves `this` out. This one is less efficient but avoids confusing users
/// when "return f.via(x);" fails.
friend class futures::detail::FutureBase;
template <class>
friend class Future;
+ template <class>
+ friend class SemiFuture;
+ using Base::setExecutor;
+ using Base::throwIfInvalid;
using typename Base::corePtr;
explicit Future(corePtr obj) : Base(obj) {}
ASSERT_EQ(future.value(), 42);
ASSERT_EQ(result, 42);
}
+
+TEST(SemiFuture, SimpleDefer) {
+ std::atomic<int> innerResult{0};
+ Promise<folly::Unit> p;
+ auto f = p.getFuture();
+ auto sf = std::move(f).semi().defer([&]() { innerResult = 17; });
+ p.setValue();
+ // Run "F" here inline in the calling thread
+ std::move(sf).get();
+ ASSERT_EQ(innerResult, 17);
+}
+
+TEST(SemiFuture, DeferWithVia) {
+ std::atomic<int> innerResult{0};
+ EventBase e2;
+ Promise<folly::Unit> p;
+ auto f = p.getFuture();
+ auto sf = std::move(f).semi().defer([&]() { innerResult = 17; });
+ // Run "F" here inline in the calling thread
+ auto tf = std::move(sf).via(&e2);
+ p.setValue();
+ tf.getVia(&e2);
+ ASSERT_EQ(innerResult, 17);
+}
+
+TEST(SemiFuture, ChainingDefertoThen) {
+ std::atomic<int> innerResult{0};
+ std::atomic<int> result{0};
+ EventBase e2;
+ Promise<folly::Unit> p;
+ auto f = p.getFuture();
+ auto sf = std::move(f).semi().defer([&]() { innerResult = 17; });
+ // Run "F" here inline in a task running on the eventbase
+ auto tf = std::move(sf).via(&e2).then([&]() { result = 42; });
+ p.setValue();
+ tf.getVia(&e2);
+ ASSERT_EQ(innerResult, 17);
+ ASSERT_EQ(result, 42);
+}
+
+TEST(SemiFuture, SimpleDeferWithValue) {
+ std::atomic<int> innerResult{0};
+ Promise<int> p;
+ auto f = p.getFuture();
+ auto sf = std::move(f).semi().defer([&](int a) { innerResult = a; });
+ p.setValue(7);
+ // Run "F" here inline in the calling thread
+ std::move(sf).get();
+ ASSERT_EQ(innerResult, 7);
+}
+
+TEST(SemiFuture, ChainingDefertoThenWithValue) {
+ std::atomic<int> innerResult{0};
+ std::atomic<int> result{0};
+ EventBase e2;
+ Promise<int> p;
+ auto f = p.getFuture();
+ auto sf = std::move(f).semi().defer([&](int a) {
+ innerResult = a;
+ return a;
+ });
+ // Run "F" here inline in a task running on the eventbase
+ auto tf = std::move(sf).via(&e2).then([&](int a) { result = a; });
+ p.setValue(7);
+ tf.getVia(&e2);
+ ASSERT_EQ(innerResult, 7);
+ ASSERT_EQ(result, 7);
+}
+
+TEST(SemiFuture, MakeSemiFutureFromFutureWithTry) {
+ Promise<int> p;
+ auto f = p.getFuture();
+ auto sf = std::move(f).semi().defer([&](Try<int> t) {
+ if (auto err = t.tryGetExceptionObject<std::logic_error>()) {
+ return Try<std::string>(err->what());
+ }
+ return Try<std::string>(
+ make_exception_wrapper<std::logic_error>("Exception"));
+ });
+ p.setException(make_exception_wrapper<std::logic_error>("Try"));
+ auto tryResult = std::move(sf).get();
+ ASSERT_EQ(tryResult.value(), "Try");
+}