From 173044e4ea34f4bd3fe927359f73c5649a02436f Mon Sep 17 00:00:00 2001 From: Hans Fugal Date: Fri, 30 Jan 2015 15:23:24 -0800 Subject: [PATCH] Rework the Future::Core state machine Summary: There was a race reading `callback_` in `maybeCallback` and setting `callback_` in `setCallback`. This diff reworks the state machine to make this unpossible. To avoid the explosion of states due to the cross-product of has-interrupt-handler/has-been-interrupted/etc. I introduce a separate lock for setting interrupt handler and interruption, since this is primarily orthogonal. Other attributes (active, for example) are still atomic variables, and while somewhat tied into the state machine logically (e.g. transitioning from Armed to Done only happens when active) they are mostly independent, keeping the state machine simple (and probably faster). I think it may even be possible to do some things cheaper. In some states, we may not need to protect the writing of `callback_` and `result_`. But we'd need to enforce some ordering so I'm not going to try to tackle that. But that could be some speedup if we can do it cheaply. Test Plan: Builds and all existing tests pass. Reviewed By: rockyliu4@fb.com Subscribers: yfeldblum, stepan, trunkagent, exa, folly-diffs@, jsedgwick FB internal diff: D1807854 Tasks: 6087856 Signature: t1:1807854:1422656713:25b62706cd7952b2dde06dab08074f8030db456b --- folly/futures/detail/Core.h | 234 ++++++++++++++++++++++-------------- 1 file changed, 143 insertions(+), 91 deletions(-) diff --git a/folly/futures/detail/Core.h b/folly/futures/detail/Core.h index 0c39212d..c2bc1deb 100644 --- a/folly/futures/detail/Core.h +++ b/folly/futures/detail/Core.h @@ -34,28 +34,52 @@ namespace folly { namespace detail { -// As of GCC 4.8.1, the std::function in libstdc++ optimizes only for pointers -// to functions, using a helper avoids a call to malloc. -template -void empty_callback(Try&&) { } - +/* + OnlyCallback + / \ + Start Armed - Done + \ / + OnlyResult + +This state machine is fairly self-explanatory. The most important bit is +that the callback is only executed on the transition from Armed to Done, +and that transition can happen immediately after transitioning from Only* +to Armed, if it is active (the usual case). +*/ enum class State { - Waiting, - Interruptible, - Interrupted, + Start, + OnlyResult, + OnlyCallback, + Armed, Done, }; -/** The shared state object for Future and Promise. */ +/// The shared state object for Future and Promise. +/// Some methods must only be called by either the Future thread or the +/// Promise thread. The Future thread is the thread that currently "owns" the +/// Future and its callback-related operations, and the Promise thread is +/// likewise the thread that currently "owns" the Promise and its +/// result-related operations. Also, Futures own interruption, Promises own +/// interrupt handlers. Unfortunately, there are things that users can do to +/// break this, and we can't detect that. However if they follow move +/// semantics religiously wrt threading, they should be ok. +/// +/// It's worth pointing out that Futures and/or Promises can and usually will +/// migrate between threads, though this usually happens within the API code. +/// For example, an async operation will probably make a Promise, grab its +/// Future, then move the Promise into another thread that will eventually +/// fulfil it. With executors and via, this gets slightly more complicated at +/// first blush, but it's the same principle. In general, as long as the user +/// doesn't access a Future or Promise object from more than one thread at a +/// time there won't be any problems. template class Core : protected FSM { public: - // This must be heap-constructed. There's probably a way to enforce that in - // code but since this is just internal detail code and I don't know how - // off-hand, I'm punting. - Core() : FSM(State::Waiting) {} + /// This must be heap-constructed. There's probably a way to enforce that in + /// code but since this is just internal detail code and I don't know how + /// off-hand, I'm punting. + Core() : FSM(State::Start) {} ~Core() { - assert(calledBack_); assert(detached_ == 2); } @@ -67,6 +91,26 @@ class Core : protected FSM { Core(Core&&) noexcept = delete; Core& operator=(Core&&) = delete; + /// May call from any thread + bool hasResult() const { + switch (getState()) { + case State::OnlyResult: + case State::Armed: + case State::Done: + assert(!!result_); + return true; + + default: + return false; + } + } + + /// May call from any thread + bool ready() const { + return hasResult(); + } + + /// May call from any thread Try& getTry() { if (ready()) { return *result_; @@ -75,138 +119,148 @@ class Core : protected FSM { } } + /// Call only from Future thread. template void setCallback(F func) { + bool transitionToArmed = false; auto setCallback_ = [&]{ - if (callback_) { - throw std::logic_error("setCallback called twice"); - } - context_ = RequestContext::saveContext(); callback_ = std::move(func); }; FSM_START - case State::Waiting: - case State::Interruptible: - case State::Interrupted: - FSM_UPDATE(state, setCallback_); + case State::Start: + FSM_UPDATE(State::OnlyCallback, setCallback_); break; - case State::Done: - FSM_UPDATE2(State::Done, - setCallback_, - [&]{ maybeCallback(); }); + case State::OnlyResult: + FSM_UPDATE(State::Armed, setCallback_); + transitionToArmed = true; break; + + case State::OnlyCallback: + case State::Armed: + case State::Done: + throw std::logic_error("setCallback called twice"); FSM_END + + // we could always call this, it is an optimization to only call it when + // it might be needed. + if (transitionToArmed) { + maybeCallback(); + } } + /// Call only from Promise thread void setResult(Try&& t) { + bool transitionToArmed = false; + auto setResult_ = [&]{ result_ = std::move(t); }; FSM_START - case State::Waiting: - case State::Interruptible: - case State::Interrupted: - FSM_UPDATE2(State::Done, - [&]{ result_ = std::move(t); }, - [&]{ maybeCallback(); }); + case State::Start: + FSM_UPDATE(State::OnlyResult, setResult_); + break; + + case State::OnlyCallback: + FSM_UPDATE(State::Armed, setResult_); + transitionToArmed = true; break; + case State::OnlyResult: + case State::Armed: case State::Done: throw std::logic_error("setResult called twice"); FSM_END - } - bool ready() const { - return getState() == State::Done; + if (transitionToArmed) { + maybeCallback(); + } } - // Called by a destructing Future + /// Called by a destructing Future (in the Future thread, by definition) void detachFuture() { - if (!callback_) { - setCallback(empty_callback); - } activate(); detachOne(); } - // Called by a destructing Promise + /// Called by a destructing Promise (in the Promise thread, by definition) void detachPromise() { - if (!ready()) { + // detachPromise() and setResult() should never be called in parallel + // so we don't need to protect this. + if (!result_) { setResult(Try(exception_wrapper(BrokenPromise()))); } detachOne(); } + /// May call from any thread void deactivate() { active_ = false; } + /// May call from any thread void activate() { active_ = true; - if (ready()) { - maybeCallback(); - } + maybeCallback(); } + /// May call from any thread bool isActive() { return active_; } + /// Call only from Future thread void setExecutor(Executor* x) { executor_ = x; } - void raise(exception_wrapper const& e) { - FSM_START - case State::Interruptible: - FSM_UPDATE2(State::Interrupted, - [&]{ interrupt_ = folly::make_unique(e); }, - [&]{ interruptHandler_(*interrupt_); }); - break; - - case State::Waiting: - case State::Interrupted: - FSM_UPDATE(State::Interrupted, - [&]{ interrupt_ = folly::make_unique(e); }); - break; - - case State::Done: - FSM_BREAK - FSM_END + /// Call only from Future thread + void raise(exception_wrapper e) { + std::lock_guard guard(interruptLock_); + if (!interrupt_ && !hasResult()) { + interrupt_ = std::move(e); + if (interruptHandler_) { + interruptHandler_(interrupt_); + } + } } + /// Call only from Promise thread void setInterruptHandler(std::function fn) { - FSM_START - case State::Waiting: - case State::Interruptible: - FSM_UPDATE(State::Interruptible, - [&]{ interruptHandler_ = std::move(fn); }); - break; + std::lock_guard guard(interruptLock_); + if (!hasResult()) { + if (!!interrupt_) { + fn(interrupt_); + } else { + interruptHandler_ = std::move(fn); + } + } + } - case State::Interrupted: - fn(*interrupt_); + private: + void maybeCallback() { + FSM_START + case State::Armed: + if (active_) { + FSM_UPDATE2(State::Done, []{}, std::bind(&Core::doCallback, this)); + } FSM_BREAK - case State::Done: + default: FSM_BREAK FSM_END } - private: - void maybeCallback() { - assert(ready()); - if (isActive() && callback_) { - if (!calledBack_.exchange(true)) { - // TODO(5306911) we should probably try/catch - Executor* x = executor_; - - RequestContext::setContext(context_); - if (x) { - MoveWrapper&&)>> cb(std::move(callback_)); - MoveWrapper>> val(std::move(result_)); - x->add([cb, val]() mutable { (*cb)(std::move(**val)); }); - } else { - callback_(std::move(*result_)); - } - } + void doCallback() { + // TODO(5306911) we should probably try/catch around the callback + + RequestContext::setContext(context_); + + // TODO(6115514) semantic race on reading executor_ and setExecutor() + Executor* x = executor_; + if (x) { + MoveWrapper&&)>> cb(std::move(callback_)); + MoveWrapper> val(std::move(*result_)); + x->add([cb, val]() mutable { (*cb)(std::move(*val)); }); + } else { + callback_(std::move(*result_)); } } @@ -215,8 +269,6 @@ class Core : protected FSM { assert(d >= 1); assert(d <= 2); if (d == 2) { - // we should have already executed the callback with the value - assert(calledBack_); delete this; } } @@ -224,12 +276,12 @@ class Core : protected FSM { folly::Optional> result_; std::function&&)> callback_; std::shared_ptr context_{nullptr}; - std::atomic calledBack_ {false}; std::atomic detached_ {0}; std::atomic active_ {true}; std::atomic executor_ {nullptr}; - std::unique_ptr interrupt_; + exception_wrapper interrupt_; std::function interruptHandler_; + folly::MicroSpinLock interruptLock_ {0}; }; template -- 2.34.1