namespace folly { namespace detail {
-// As of GCC 4.8.1, the std::function in libstdc++ optimizes only for pointers
-// to functions, using a helper avoids a call to malloc.
-template<typename T>
-void empty_callback(Try<T>&&) { }
-
+/*
+ OnlyCallback
+ / \
+ Start Armed - Done
+ \ /
+ OnlyResult
+
+This state machine is fairly self-explanatory. The most important bit is
+that the callback is only executed on the transition from Armed to Done,
+and that transition can happen immediately after transitioning from Only*
+to Armed, if it is active (the usual case).
+*/
enum class State {
- Waiting,
- Interruptible,
- Interrupted,
+ Start,
+ OnlyResult,
+ OnlyCallback,
+ Armed,
Done,
};
-/** The shared state object for Future and Promise. */
+/// The shared state object for Future and Promise.
+/// Some methods must only be called by either the Future thread or the
+/// Promise thread. The Future thread is the thread that currently "owns" the
+/// Future and its callback-related operations, and the Promise thread is
+/// likewise the thread that currently "owns" the Promise and its
+/// result-related operations. Also, Futures own interruption, Promises own
+/// interrupt handlers. Unfortunately, there are things that users can do to
+/// break this, and we can't detect that. However if they follow move
+/// semantics religiously wrt threading, they should be ok.
+///
+/// It's worth pointing out that Futures and/or Promises can and usually will
+/// migrate between threads, though this usually happens within the API code.
+/// For example, an async operation will probably make a Promise, grab its
+/// Future, then move the Promise into another thread that will eventually
+/// fulfil it. With executors and via, this gets slightly more complicated at
+/// first blush, but it's the same principle. In general, as long as the user
+/// doesn't access a Future or Promise object from more than one thread at a
+/// time there won't be any problems.
template<typename T>
class Core : protected FSM<State> {
public:
- // This must be heap-constructed. There's probably a way to enforce that in
- // code but since this is just internal detail code and I don't know how
- // off-hand, I'm punting.
- Core() : FSM<State>(State::Waiting) {}
+ /// This must be heap-constructed. There's probably a way to enforce that in
+ /// code but since this is just internal detail code and I don't know how
+ /// off-hand, I'm punting.
+ Core() : FSM<State>(State::Start) {}
~Core() {
- assert(calledBack_);
assert(detached_ == 2);
}
Core(Core&&) noexcept = delete;
Core& operator=(Core&&) = delete;
+ /// May call from any thread
+ bool hasResult() const {
+ switch (getState()) {
+ case State::OnlyResult:
+ case State::Armed:
+ case State::Done:
+ assert(!!result_);
+ return true;
+
+ default:
+ return false;
+ }
+ }
+
+ /// May call from any thread
+ bool ready() const {
+ return hasResult();
+ }
+
+ /// May call from any thread
Try<T>& getTry() {
if (ready()) {
return *result_;
}
}
+ /// Call only from Future thread.
template <typename F>
void setCallback(F func) {
+ bool transitionToArmed = false;
auto setCallback_ = [&]{
- if (callback_) {
- throw std::logic_error("setCallback called twice");
- }
-
context_ = RequestContext::saveContext();
callback_ = std::move(func);
};
FSM_START
- case State::Waiting:
- case State::Interruptible:
- case State::Interrupted:
- FSM_UPDATE(state, setCallback_);
+ case State::Start:
+ FSM_UPDATE(State::OnlyCallback, setCallback_);
break;
- case State::Done:
- FSM_UPDATE2(State::Done,
- setCallback_,
- [&]{ maybeCallback(); });
+ case State::OnlyResult:
+ FSM_UPDATE(State::Armed, setCallback_);
+ transitionToArmed = true;
break;
+
+ case State::OnlyCallback:
+ case State::Armed:
+ case State::Done:
+ throw std::logic_error("setCallback called twice");
FSM_END
+
+ // we could always call this, it is an optimization to only call it when
+ // it might be needed.
+ if (transitionToArmed) {
+ maybeCallback();
+ }
}
+ /// Call only from Promise thread
void setResult(Try<T>&& t) {
+ bool transitionToArmed = false;
+ auto setResult_ = [&]{ result_ = std::move(t); };
FSM_START
- case State::Waiting:
- case State::Interruptible:
- case State::Interrupted:
- FSM_UPDATE2(State::Done,
- [&]{ result_ = std::move(t); },
- [&]{ maybeCallback(); });
+ case State::Start:
+ FSM_UPDATE(State::OnlyResult, setResult_);
+ break;
+
+ case State::OnlyCallback:
+ FSM_UPDATE(State::Armed, setResult_);
+ transitionToArmed = true;
break;
+ case State::OnlyResult:
+ case State::Armed:
case State::Done:
throw std::logic_error("setResult called twice");
FSM_END
- }
- bool ready() const {
- return getState() == State::Done;
+ if (transitionToArmed) {
+ maybeCallback();
+ }
}
- // Called by a destructing Future
+ /// Called by a destructing Future (in the Future thread, by definition)
void detachFuture() {
- if (!callback_) {
- setCallback(empty_callback<T>);
- }
activate();
detachOne();
}
- // Called by a destructing Promise
+ /// Called by a destructing Promise (in the Promise thread, by definition)
void detachPromise() {
- if (!ready()) {
+ // detachPromise() and setResult() should never be called in parallel
+ // so we don't need to protect this.
+ if (!result_) {
setResult(Try<T>(exception_wrapper(BrokenPromise())));
}
detachOne();
}
+ /// May call from any thread
void deactivate() {
active_ = false;
}
+ /// May call from any thread
void activate() {
active_ = true;
- if (ready()) {
- maybeCallback();
- }
+ maybeCallback();
}
+ /// May call from any thread
bool isActive() { return active_; }
+ /// Call only from Future thread
void setExecutor(Executor* x) {
executor_ = x;
}
- void raise(exception_wrapper const& e) {
- FSM_START
- case State::Interruptible:
- FSM_UPDATE2(State::Interrupted,
- [&]{ interrupt_ = folly::make_unique<exception_wrapper>(e); },
- [&]{ interruptHandler_(*interrupt_); });
- break;
-
- case State::Waiting:
- case State::Interrupted:
- FSM_UPDATE(State::Interrupted,
- [&]{ interrupt_ = folly::make_unique<exception_wrapper>(e); });
- break;
-
- case State::Done:
- FSM_BREAK
- FSM_END
+ /// Call only from Future thread
+ void raise(exception_wrapper e) {
+ std::lock_guard<decltype(interruptLock_)> guard(interruptLock_);
+ if (!interrupt_ && !hasResult()) {
+ interrupt_ = std::move(e);
+ if (interruptHandler_) {
+ interruptHandler_(interrupt_);
+ }
+ }
}
+ /// Call only from Promise thread
void setInterruptHandler(std::function<void(exception_wrapper const&)> fn) {
- FSM_START
- case State::Waiting:
- case State::Interruptible:
- FSM_UPDATE(State::Interruptible,
- [&]{ interruptHandler_ = std::move(fn); });
- break;
+ std::lock_guard<decltype(interruptLock_)> guard(interruptLock_);
+ if (!hasResult()) {
+ if (!!interrupt_) {
+ fn(interrupt_);
+ } else {
+ interruptHandler_ = std::move(fn);
+ }
+ }
+ }
- case State::Interrupted:
- fn(*interrupt_);
+ private:
+ void maybeCallback() {
+ FSM_START
+ case State::Armed:
+ if (active_) {
+ FSM_UPDATE2(State::Done, []{}, std::bind(&Core::doCallback, this));
+ }
FSM_BREAK
- case State::Done:
+ default:
FSM_BREAK
FSM_END
}
- private:
- void maybeCallback() {
- assert(ready());
- if (isActive() && callback_) {
- if (!calledBack_.exchange(true)) {
- // TODO(5306911) we should probably try/catch
- Executor* x = executor_;
-
- RequestContext::setContext(context_);
- if (x) {
- MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
- MoveWrapper<folly::Optional<Try<T>>> val(std::move(result_));
- x->add([cb, val]() mutable { (*cb)(std::move(**val)); });
- } else {
- callback_(std::move(*result_));
- }
- }
+ void doCallback() {
+ // TODO(5306911) we should probably try/catch around the callback
+
+ RequestContext::setContext(context_);
+
+ // TODO(6115514) semantic race on reading executor_ and setExecutor()
+ Executor* x = executor_;
+ if (x) {
+ MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
+ MoveWrapper<Try<T>> val(std::move(*result_));
+ x->add([cb, val]() mutable { (*cb)(std::move(*val)); });
+ } else {
+ callback_(std::move(*result_));
}
}
assert(d >= 1);
assert(d <= 2);
if (d == 2) {
- // we should have already executed the callback with the value
- assert(calledBack_);
delete this;
}
}
folly::Optional<Try<T>> result_;
std::function<void(Try<T>&&)> callback_;
std::shared_ptr<RequestContext> context_{nullptr};
- std::atomic<bool> calledBack_ {false};
std::atomic<unsigned char> detached_ {0};
std::atomic<bool> active_ {true};
std::atomic<Executor*> executor_ {nullptr};
- std::unique_ptr<exception_wrapper> interrupt_;
+ exception_wrapper interrupt_;
std::function<void(exception_wrapper const&)> interruptHandler_;
+ folly::MicroSpinLock interruptLock_ {0};
};
template <typename... Ts>