From 6e7e5a64d39ea0f2ed11660f46c4ba4d926ff419 Mon Sep 17 00:00:00 2001 From: Lee Howes Date: Tue, 31 Oct 2017 14:20:08 -0700 Subject: [PATCH] Adding DeferredExecutor to support deferred execution of tasks on a future returned from an interface. Summary: This adds a DeferredExecutor type that is boostable, which means that it follows the expectation we expect for C++20 that .then and get will trigger boost-blocking behaviour and ensure work makes progress. Unlike discussions for C++ this adds boost blocking to folly only in the specific case of deferring work to run on the caller's executor, to avoid the necessity to pass an executor into a library purely to ensure that finalisation work and future completion occor on a well-defined exewcutor. Reviewed By: yfeldblum Differential Revision: D5828743 fbshipit-source-id: 9a4b69d7deaa33c3cecd6546651b99cc99f0c286 --- folly/Executor.h | 4 + folly/futures/Future-inl.h | 101 ++++++++++++++++++++++--- folly/futures/Future-pre.h | 100 +++++++++++++++++++++++++ folly/futures/Future.cpp | 3 +- folly/futures/Future.h | 102 +++++++++++++++++++------- folly/futures/test/SemiFutureTest.cpp | 83 +++++++++++++++++++++ 6 files changed, 355 insertions(+), 38 deletions(-) diff --git a/folly/Executor.h b/folly/Executor.h index 68866210..a2ac0d63 100644 --- a/folly/Executor.h +++ b/folly/Executor.h @@ -59,6 +59,10 @@ class Executor { return executor_ != nullptr; } + Executor* get() const { + return executor_.get(); + } + private: friend class Executor; explicit KeepAlive(folly::Executor* executor) : executor_(executor) {} diff --git a/folly/futures/Future-inl.h b/folly/futures/Future-inl.h index 17f47231..1baee146 100644 --- a/folly/futures/Future-inl.h +++ b/folly/futures/Future-inl.h @@ -200,17 +200,6 @@ T const&& FutureBase::value() const&& { return std::move(core_->getTry().value()); } -template -inline Future FutureBase::via(Executor* executor, int8_t priority) && { - throwIfInvalid(); - - setExecutor(executor, priority); - - auto newFuture = Future(core_); - core_ = nullptr; - return newFuture; -} - template bool FutureBase::isReady() const { throwIfInvalid(); @@ -478,6 +467,75 @@ SemiFuture& SemiFuture::operator=(Future&& other) noexcept { return *this; } +template +void SemiFuture::boost_() { + // If a SemiFuture has an executor it should be deferred, so boost it + if (auto e = this->getExecutor()) { + // We know in a SemiFuture that if we have an executor it should be + // DeferredExecutor. Verify this in debug mode. + DCHECK(dynamic_cast(e)); + + auto ka = static_cast(e)->getKeepAliveToken(); + static_cast(e)->boost(); + } +} + +template +inline Future SemiFuture::via(Executor* executor, int8_t priority) && { + throwIfInvalid(); + + // If current executor is deferred, boost block to ensure that work + // progresses and is run on the new executor. + auto oldExecutor = this->getExecutor(); + if (oldExecutor && executor && (executor != oldExecutor)) { + // We know in a SemiFuture that if we have an executor it should be + // DeferredExecutor. Verify this in debug mode. + DCHECK(dynamic_cast(this->getExecutor())); + if (static_cast(oldExecutor)) { + executor->add([oldExecutorKA = oldExecutor->getKeepAliveToken()]() { + static_cast(oldExecutorKA.get())->boost(); + }); + } + } + + this->setExecutor(executor, priority); + + auto newFuture = Future(this->core_); + this->core_ = nullptr; + return newFuture; +} + +template +template +SemiFuture::Return::value_type> +SemiFuture::defer(F&& func) && { + // If we already have a deferred executor, use it, otherwise create one + auto defKeepAlive = this->getExecutor() + ? this->getExecutor()->getKeepAliveToken() + : DeferredExecutor::create(); + auto e = defKeepAlive.get(); + // We know in a SemiFuture that if we have an executor it should be + // DeferredExecutor (either it was that way before, or we just created it). + // Verify this in debug mode. + DCHECK(dynamic_cast(e)); + // Convert to a folly::future with a deferred executor + // Will be low-cost if this is not a new executor as via optimises for that + // case + auto sf = + std::move(*this) + .via(defKeepAlive.get()) + // Then add the work, with a wrapper function that captures the + // keepAlive so the executor is destroyed at the right time. + .then( + DeferredExecutor::wrap(std::move(defKeepAlive), std::move(func))) + // Finally, convert back o a folly::SemiFuture to hide the executor + .semi(); + // Carry deferred executor through chain as constructor from Future will + // nullify it + sf.setExecutor(e); + return sf; +} + template Future Future::makeEmpty() { return Future(futures::detail::EmptyConstruct{}); @@ -539,6 +597,17 @@ typename std:: }); } +template +inline Future Future::via(Executor* executor, int8_t priority) && { + this->throwIfInvalid(); + + this->setExecutor(executor, priority); + + auto newFuture = Future(this->core_); + this->core_ = nullptr; + return newFuture; +} + template inline Future Future::via(Executor* executor, int8_t priority) & { this->throwIfInvalid(); @@ -1269,6 +1338,14 @@ Future Future::delayed(Duration dur, Timekeeper* tk) { namespace futures { namespace detail { +template +void doBoost(folly::Future& /* usused */) {} + +template +void doBoost(folly::SemiFuture& f) { + f.boost_(); +} + template void waitImpl(FutureType& f) { // short-circuit if there's nothing to do @@ -1278,6 +1355,7 @@ void waitImpl(FutureType& f) { FutureBatonType baton; f.setCallback_([&](const Try& /* t */) { baton.post(); }); + doBoost(f); baton.wait(); assert(f.isReady()); } @@ -1296,6 +1374,7 @@ void waitImpl(FutureType& f, Duration dur) { promise.setTry(std::move(t)); baton->post(); }); + doBoost(f); f = std::move(ret); if (baton->timed_wait(dur)) { assert(f.isReady()); diff --git a/folly/futures/Future-pre.h b/folly/futures/Future-pre.h index 86514b07..1fd0ebe7 100644 --- a/folly/futures/Future-pre.h +++ b/folly/futures/Future-pre.h @@ -153,6 +153,106 @@ struct Extract { typedef typename ArgType::FirstArg FirstArg; }; +/** + * Defer work until executor is actively boosted. + */ +class DeferredExecutor final : public Executor { + public: + template + struct DeferredWorkWrapper; + + /** + * Work wrapper class to capture the keepalive and forward the argument + * list to the captured function. + */ + template + struct DeferredWorkWrapper { + R operator()(Args... args) { + return func(std::forward(args)...); + } + + Executor::KeepAlive a; + F func; + }; + + /** + * Construction is private to ensure that creation and deletion are + * symmetric + */ + static KeepAlive create() { + std::unique_ptr devb{ + new futures::detail::DeferredExecutor{}}; + auto keepAlive = devb->getKeepAliveToken(); + devb.release(); + return keepAlive; + } + + /// Enqueue a function to executed by this executor. This is not thread-safe. + void add(Func func) override { + // If we already have a function, wrap and chain. Otherwise assign. + if (func_) { + func_ = [oldFunc = std::move(func_), func = std::move(func)]() mutable { + oldFunc(); + func(); + }; + } else { + func_ = std::move(func); + } + } + + // Boost is like drive for certain types of deferred work + // Unlike drive it is safe to run on another executor because it + // will only be implemented on deferred-safe executors + void boost() { + // Ensure that the DeferredExecutor outlives its run operation + ++keepAliveCount_; + SCOPE_EXIT { + releaseAndTryFree(); + }; + + // Drain the executor + while (auto func = std::move(func_)) { + func(); + } + } + + KeepAlive getKeepAliveToken() override { + keepAliveAcquire(); + return makeKeepAlive(); + } + + ~DeferredExecutor() = default; + + template + static auto wrap(Executor::KeepAlive keepAlive, F&& func) + -> DeferredWorkWrapper { + return DeferredExecutor::DeferredWorkWrapper{ + std::move(keepAlive), std::forward(func)}; + } + + protected: + void keepAliveAcquire() override { + ++keepAliveCount_; + } + + void keepAliveRelease() override { + releaseAndTryFree(); + } + + void releaseAndTryFree() { + --keepAliveCount_; + if (keepAliveCount_ == 0) { + delete this; + } + } + + private: + Func func_; + ssize_t keepAliveCount_{0}; + + DeferredExecutor() = default; +}; + } // namespace detail } // namespace futures diff --git a/folly/futures/Future.cpp b/folly/futures/Future.cpp index c4f19a76..49e4c02c 100644 --- a/folly/futures/Future.cpp +++ b/folly/futures/Future.cpp @@ -35,7 +35,8 @@ template class Future; template class Future; } // namespace folly -namespace folly { namespace futures { +namespace folly { +namespace futures { Future sleep(Duration dur, Timekeeper* tk) { std::shared_ptr tks; diff --git a/folly/futures/Future.h b/folly/futures/Future.h index 39957470..5cd5436d 100644 --- a/folly/futures/Future.h +++ b/folly/futures/Future.h @@ -25,6 +25,7 @@ #include #include +#include #include #include #include @@ -95,30 +96,6 @@ class FutureBase { T&& value() &&; T const&& value() const&&; - /// Returns an inactive Future which will call back on the other side of - /// executor (when it is activated). - /// - /// NB remember that Futures activate when they destruct. This is good, - /// it means that this will work: - /// - /// f.via(e).then(a).then(b); - /// - /// a and b will execute in the same context (the far side of e), because - /// the Future (temporary variable) created by via(e) does not call back - /// until it destructs, which is after then(a) and then(b) have been wired - /// up. - /// - /// But this is still racy: - /// - /// f = f.via(e).then(a); - /// f.then(b); - // The ref-qualifier allows for `this` to be moved out so we - // don't get access-after-free situations in chaining. - // https://akrzemi1.wordpress.com/2014/06/02/ref-qualifiers/ - inline Future via( - Executor* executor, - int8_t priority = Executor::MID_PRI) &&; - /** True when the result (or exception) is ready. */ bool isReady() const; @@ -218,6 +195,7 @@ template class SemiFuture : private futures::detail::FutureBase { private: using Base = futures::detail::FutureBase; + using DeferredExecutor = futures::detail::DeferredExecutor; public: static SemiFuture makeEmpty(); // equivalent to moved-from @@ -262,7 +240,6 @@ class SemiFuture : private futures::detail::FutureBase { using Base::raise; using Base::setCallback_; using Base::value; - using Base::via; SemiFuture& operator=(SemiFuture const&) = delete; SemiFuture& operator=(SemiFuture&&) noexcept; @@ -290,11 +267,57 @@ class SemiFuture : private futures::detail::FutureBase { /// Overload of wait(Duration) for rvalue Futures SemiFuture&& wait(Duration) &&; + /// Returns an inactive Future which will call back on the other side of + /// executor (when it is activated). + /// + /// NB remember that Futures activate when they destruct. This is good, + /// it means that this will work: + /// + /// f.via(e).then(a).then(b); + /// + /// a and b will execute in the same context (the far side of e), because + /// the Future (temporary variable) created by via(e) does not call back + /// until it destructs, which is after then(a) and then(b) have been wired + /// up. + /// + /// But this is still racy: + /// + /// f = f.via(e).then(a); + /// f.then(b); + // The ref-qualifier allows for `this` to be moved out so we + // don't get access-after-free situations in chaining. + // https://akrzemi1.wordpress.com/2014/06/02/ref-qualifiers/ + inline Future via( + Executor* executor, + int8_t priority = Executor::MID_PRI) &&; + + /** + * Defer work to run on the consumer of the future. + * This work will be run eithe ron an executor that the caller sets on the + * SemiFuture, or inline with the call to .get(). + * NB: This is a custom method because boost-blocking executors is a + * special-case for work deferral in folly. With more general boost-blocking + * support all executors would boost block and we would simply use some form + * of driveable executor here. + */ + template + SemiFuture::Return::value_type> + defer(F&& func) &&; + + // Public as for setCallback_ + // Ensure that a boostable executor performs work to chain deferred work + // cleanly + void boost_(); + private: template friend class futures::detail::FutureBase; + template + friend class SemiFuture; using typename Base::corePtr; + using Base::setExecutor; + using Base::throwIfInvalid; template friend SemiFuture makeSemiFuture(Try&&); @@ -374,7 +397,6 @@ class Future : private futures::detail::FutureBase { using Base::raise; using Base::setCallback_; using Base::value; - using Base::via; static Future makeEmpty(); // equivalent to moved-from @@ -401,6 +423,30 @@ class Future : private futures::detail::FutureBase { enable_if::value, Future::Inner>>::type unwrap(); + /// Returns an inactive Future which will call back on the other side of + /// executor (when it is activated). + /// + /// NB remember that Futures activate when they destruct. This is good, + /// it means that this will work: + /// + /// f.via(e).then(a).then(b); + /// + /// a and b will execute in the same context (the far side of e), because + /// the Future (temporary variable) created by via(e) does not call back + /// until it destructs, which is after then(a) and then(b) have been wired + /// up. + /// + /// But this is still racy: + /// + /// f = f.via(e).then(a); + /// f.then(b); + // The ref-qualifier allows for `this` to be moved out so we + // don't get access-after-free situations in chaining. + // https://akrzemi1.wordpress.com/2014/06/02/ref-qualifiers/ + inline Future via( + Executor* executor, + int8_t priority = Executor::MID_PRI) &&; + /// This variant creates a new future, where the ref-qualifier && version /// moves `this` out. This one is less efficient but avoids confusing users /// when "return f.via(x);" fails. @@ -693,7 +739,11 @@ class Future : private futures::detail::FutureBase { friend class futures::detail::FutureBase; template friend class Future; + template + friend class SemiFuture; + using Base::setExecutor; + using Base::throwIfInvalid; using typename Base::corePtr; explicit Future(corePtr obj) : Base(obj) {} diff --git a/folly/futures/test/SemiFutureTest.cpp b/folly/futures/test/SemiFutureTest.cpp index 29afb08f..95314832 100644 --- a/folly/futures/test/SemiFutureTest.cpp +++ b/folly/futures/test/SemiFutureTest.cpp @@ -216,3 +216,86 @@ TEST(SemiFuture, MakeFutureFromSemiFutureLValue) { ASSERT_EQ(future.value(), 42); ASSERT_EQ(result, 42); } + +TEST(SemiFuture, SimpleDefer) { + std::atomic innerResult{0}; + Promise p; + auto f = p.getFuture(); + auto sf = std::move(f).semi().defer([&]() { innerResult = 17; }); + p.setValue(); + // Run "F" here inline in the calling thread + std::move(sf).get(); + ASSERT_EQ(innerResult, 17); +} + +TEST(SemiFuture, DeferWithVia) { + std::atomic innerResult{0}; + EventBase e2; + Promise p; + auto f = p.getFuture(); + auto sf = std::move(f).semi().defer([&]() { innerResult = 17; }); + // Run "F" here inline in the calling thread + auto tf = std::move(sf).via(&e2); + p.setValue(); + tf.getVia(&e2); + ASSERT_EQ(innerResult, 17); +} + +TEST(SemiFuture, ChainingDefertoThen) { + std::atomic innerResult{0}; + std::atomic result{0}; + EventBase e2; + Promise p; + auto f = p.getFuture(); + auto sf = std::move(f).semi().defer([&]() { innerResult = 17; }); + // Run "F" here inline in a task running on the eventbase + auto tf = std::move(sf).via(&e2).then([&]() { result = 42; }); + p.setValue(); + tf.getVia(&e2); + ASSERT_EQ(innerResult, 17); + ASSERT_EQ(result, 42); +} + +TEST(SemiFuture, SimpleDeferWithValue) { + std::atomic innerResult{0}; + Promise p; + auto f = p.getFuture(); + auto sf = std::move(f).semi().defer([&](int a) { innerResult = a; }); + p.setValue(7); + // Run "F" here inline in the calling thread + std::move(sf).get(); + ASSERT_EQ(innerResult, 7); +} + +TEST(SemiFuture, ChainingDefertoThenWithValue) { + std::atomic innerResult{0}; + std::atomic result{0}; + EventBase e2; + Promise p; + auto f = p.getFuture(); + auto sf = std::move(f).semi().defer([&](int a) { + innerResult = a; + return a; + }); + // Run "F" here inline in a task running on the eventbase + auto tf = std::move(sf).via(&e2).then([&](int a) { result = a; }); + p.setValue(7); + tf.getVia(&e2); + ASSERT_EQ(innerResult, 7); + ASSERT_EQ(result, 7); +} + +TEST(SemiFuture, MakeSemiFutureFromFutureWithTry) { + Promise p; + auto f = p.getFuture(); + auto sf = std::move(f).semi().defer([&](Try t) { + if (auto err = t.tryGetExceptionObject()) { + return Try(err->what()); + } + return Try( + make_exception_wrapper("Exception")); + }); + p.setException(make_exception_wrapper("Try")); + auto tryResult = std::move(sf).get(); + ASSERT_EQ(tryResult.value(), "Try"); +} -- 2.34.1