#include <vector>
#include <folly/Optional.h>
+#include <folly/SmallLocks.h>
#include <folly/wangle/Try.h>
#include <folly/wangle/Promise.h>
#include <folly/wangle/Future.h>
#include <folly/wangle/Executor.h>
+#include <folly/wangle/detail/FSM.h>
+
+#include <folly/io/async/Request.h>
namespace folly { namespace wangle { namespace detail {
template<typename T>
void empty_callback(Try<T>&&) { }
+enum class State {
+ Waiting,
+ Interruptible,
+ Interrupted,
+ Done,
+};
+
/** The shared state object for Future and Promise. */
template<typename T>
-class Core {
+class Core : protected FSM<State> {
public:
// This must be heap-constructed. There's probably a way to enforce that in
// code but since this is just internal detail code and I don't know how
// off-hand, I'm punting.
- Core() = default;
+ Core() : FSM<State>(State::Waiting) {}
~Core() {
assert(calledBack_);
assert(detached_ == 2);
Core& operator=(Core&&) = delete;
Try<T>& getTry() {
- return *value_;
+ if (ready()) {
+ return *result_;
+ } else {
+ throw FutureNotReady();
+ }
}
template <typename F>
void setCallback(F func) {
- {
- std::lock_guard<decltype(mutex_)> lock(mutex_);
-
+ auto setCallback_ = [&]{
if (callback_) {
throw std::logic_error("setCallback called twice");
}
+ context_ = RequestContext::saveContext();
callback_ = std::move(func);
- }
-
- maybeCallback();
- }
-
- void fulfil(Try<T>&& t) {
- {
- std::lock_guard<decltype(mutex_)> lock(mutex_);
-
- if (ready()) {
- throw std::logic_error("fulfil called twice");
- }
-
- value_ = std::move(t);
- assert(ready());
- }
-
- maybeCallback();
- }
-
- void setException(std::exception_ptr const& e) {
- fulfil(Try<T>(e));
+ };
+
+ FSM_START
+ case State::Waiting:
+ case State::Interruptible:
+ case State::Interrupted:
+ FSM_UPDATE(state, setCallback_);
+ break;
+
+ case State::Done:
+ FSM_UPDATE2(State::Done,
+ setCallback_,
+ [&]{ maybeCallback(); });
+ break;
+ FSM_END
}
- template <class E> void setException(E const& e) {
- fulfil(Try<T>(std::make_exception_ptr<E>(e)));
+ void setResult(Try<T>&& t) {
+ FSM_START
+ case State::Waiting:
+ case State::Interruptible:
+ case State::Interrupted:
+ FSM_UPDATE2(State::Done,
+ [&]{ result_ = std::move(t); },
+ [&]{ maybeCallback(); });
+ break;
+
+ case State::Done:
+ throw std::logic_error("setResult called twice");
+ FSM_END
}
bool ready() const {
- return value_.hasValue();
- }
-
- typename std::add_lvalue_reference<T>::type value() {
- if (ready()) {
- return value_->value();
- } else {
- throw FutureNotReady();
- }
+ return getState() == State::Done;
}
// Called by a destructing Future
// Called by a destructing Promise
void detachPromise() {
if (!ready()) {
- setException(BrokenPromise());
+ setResult(Try<T>(std::make_exception_ptr(BrokenPromise())));
}
detachOne();
}
void deactivate() {
- std::lock_guard<decltype(mutex_)> lock(mutex_);
active_ = false;
}
void activate() {
- {
- std::lock_guard<decltype(mutex_)> lock(mutex_);
- active_ = true;
+ active_ = true;
+ if (ready()) {
+ maybeCallback();
}
- maybeCallback();
}
bool isActive() { return active_; }
void setExecutor(Executor* x) {
- std::lock_guard<decltype(mutex_)> lock(mutex_);
executor_ = x;
}
+ void raise(std::exception_ptr const& e) {
+ FSM_START
+ case State::Interruptible:
+ FSM_UPDATE2(State::Interrupted,
+ [&]{ interrupt_ = e; },
+ [&]{ interruptHandler_(interrupt_); });
+ break;
+
+ case State::Waiting:
+ case State::Interrupted:
+ FSM_UPDATE(State::Interrupted,
+ [&]{ interrupt_ = e; });
+ break;
+
+ case State::Done:
+ FSM_BREAK
+ FSM_END
+ }
+
+ void setInterruptHandler(std::function<void(std::exception_ptr const&)> fn) {
+ FSM_START
+ case State::Waiting:
+ case State::Interruptible:
+ FSM_UPDATE(State::Interruptible,
+ [&]{ interruptHandler_ = std::move(fn); });
+ break;
+
+ case State::Interrupted:
+ fn(interrupt_);
+ FSM_BREAK
+
+ case State::Done:
+ FSM_BREAK
+ FSM_END
+ }
+
private:
void maybeCallback() {
- std::unique_lock<decltype(mutex_)> lock(mutex_);
- if (!calledBack_ &&
- value_ && callback_ && isActive()) {
- // TODO(5306911) we should probably try/catch here
- if (executor_) {
- MoveWrapper<folly::Optional<Try<T>>> val(std::move(value_));
+ assert(ready());
+ if (!calledBack_ && isActive() && callback_) {
+ // TODO(5306911) we should probably try/catch
+ calledBack_ = true;
+ Executor* x = executor_;
+
+ RequestContext::setContext(context_);
+ if (x) {
MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
- executor_->add([cb, val]() mutable { (*cb)(std::move(**val)); });
- calledBack_ = true;
+ MoveWrapper<folly::Optional<Try<T>>> val(std::move(result_));
+ x->add([cb, val]() mutable { (*cb)(std::move(**val)); });
} else {
- calledBack_ = true;
- lock.unlock();
- callback_(std::move(*value_));
+ callback_(std::move(*result_));
}
}
}
void detachOne() {
- bool shouldDelete;
- {
- std::lock_guard<decltype(mutex_)> lock(mutex_);
- detached_++;
- assert(detached_ == 1 || detached_ == 2);
- shouldDelete = (detached_ == 2);
- }
-
- if (shouldDelete) {
+ auto d = ++detached_;
+ assert(d >= 1);
+ assert(d <= 2);
+ if (d == 2) {
// we should have already executed the callback with the value
assert(calledBack_);
delete this;
}
}
- folly::Optional<Try<T>> value_;
+ folly::Optional<Try<T>> result_;
std::function<void(Try<T>&&)> callback_;
- bool calledBack_ = false;
- unsigned char detached_ = 0;
- bool active_ = true;
- Executor* executor_ = nullptr;
-
- // this lock isn't meant to protect all accesses to members, only the ones
- // that need to be threadsafe: the act of setting value_ and callback_, and
- // seeing if they are set and whether we should then continue.
- std::mutex mutex_;
+ std::shared_ptr<RequestContext> context_{nullptr};
+ std::atomic<bool> calledBack_ {false};
+ std::atomic<unsigned char> detached_ {0};
+ std::atomic<bool> active_ {true};
+ std::atomic<Executor*> executor_ {nullptr};
+ std::exception_ptr interrupt_;
+ std::function<void(std::exception_ptr const&)> interruptHandler_;
};
template <typename... Ts>