#include <folly/wangle/Promise.h>
#include <folly/wangle/Future.h>
#include <folly/wangle/Executor.h>
+#include <folly/wangle/detail/FSM.h>
namespace folly { namespace wangle { namespace detail {
template<typename T>
void empty_callback(Try<T>&&) { }
+enum class State {
+ Waiting,
+ Interruptible,
+ Interrupted,
+ Done,
+};
+
/** The shared state object for Future and Promise. */
template<typename T>
-class Core {
+class Core : protected FSM<State> {
public:
// This must be heap-constructed. There's probably a way to enforce that in
// code but since this is just internal detail code and I don't know how
// off-hand, I'm punting.
- Core() = default;
+ Core() : FSM<State>(State::Waiting) {}
~Core() {
assert(calledBack_);
assert(detached_ == 2);
template <typename F>
void setCallback(F func) {
- {
- std::lock_guard<decltype(mutex_)> lock(mutex_);
-
+ auto setCallback_ = [&]{
if (callback_) {
throw std::logic_error("setCallback called twice");
}
callback_ = std::move(func);
- }
-
- maybeCallback();
+ };
+
+ FSM_START
+ case State::Waiting:
+ case State::Interruptible:
+ case State::Interrupted:
+ FSM_UPDATE(state, setCallback_);
+ break;
+
+ case State::Done:
+ FSM_UPDATE2(State::Done,
+ setCallback_,
+ [&]{ maybeCallback(); });
+ break;
+ FSM_END
}
void setResult(Try<T>&& t) {
- {
- std::lock_guard<decltype(mutex_)> lock(mutex_);
-
- if (ready()) {
+ FSM_START
+ case State::Waiting:
+ case State::Interruptible:
+ case State::Interrupted:
+ FSM_UPDATE2(State::Done,
+ [&]{ result_ = std::move(t); },
+ [&]{ maybeCallback(); });
+ break;
+
+ case State::Done:
throw std::logic_error("setResult called twice");
- }
-
- result_ = std::move(t);
- assert(ready());
- }
-
- maybeCallback();
+ FSM_END
}
bool ready() const {
- return result_.hasValue();
+ return getState() == State::Done;
}
// Called by a destructing Future
}
void deactivate() {
- std::lock_guard<decltype(mutex_)> lock(mutex_);
active_ = false;
}
void activate() {
- {
- std::lock_guard<decltype(mutex_)> lock(mutex_);
- active_ = true;
+ active_ = true;
+ if (ready()) {
+ maybeCallback();
}
- maybeCallback();
}
bool isActive() { return active_; }
void setExecutor(Executor* x) {
- std::lock_guard<decltype(mutex_)> lock(mutex_);
executor_ = x;
}
+ void raise(std::exception_ptr const& e) {
+ FSM_START
+ case State::Interruptible:
+ FSM_UPDATE2(State::Interrupted,
+ [&]{ interrupt_ = e; },
+ [&]{ interruptHandler_(interrupt_); });
+ break;
+
+ case State::Waiting:
+ case State::Interrupted:
+ FSM_UPDATE(State::Interrupted,
+ [&]{ interrupt_ = e; });
+ break;
+
+ case State::Done:
+ FSM_BREAK
+ FSM_END
+ }
+
+ void setInterruptHandler(std::function<void(std::exception_ptr const&)> fn) {
+ FSM_START
+ case State::Waiting:
+ case State::Interruptible:
+ FSM_UPDATE(State::Interruptible,
+ [&]{ interruptHandler_ = std::move(fn); });
+ break;
+
+ case State::Interrupted:
+ fn(interrupt_);
+ FSM_BREAK
+
+ case State::Done:
+ FSM_BREAK
+ FSM_END
+ }
+
private:
void maybeCallback() {
- std::unique_lock<decltype(mutex_)> lock(mutex_);
- if (!calledBack_ &&
- result_ && callback_ && isActive()) {
- // TODO(5306911) we should probably try/catch here
- if (executor_) {
- MoveWrapper<folly::Optional<Try<T>>> val(std::move(result_));
+ assert(ready());
+ if (!calledBack_ && isActive() && callback_) {
+ // TODO(5306911) we should probably try/catch
+ calledBack_ = true;
+ Executor* x = executor_;
+ if (x) {
MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
- executor_->add([cb, val]() mutable { (*cb)(std::move(**val)); });
- calledBack_ = true;
+ MoveWrapper<folly::Optional<Try<T>>> val(std::move(result_));
+ x->add([cb, val]() mutable { (*cb)(std::move(**val)); });
} else {
- calledBack_ = true;
- lock.unlock();
callback_(std::move(*result_));
}
}
}
void detachOne() {
- bool shouldDelete;
- {
- std::lock_guard<decltype(mutex_)> lock(mutex_);
- detached_++;
- assert(detached_ == 1 || detached_ == 2);
- shouldDelete = (detached_ == 2);
- }
-
- if (shouldDelete) {
+ ++detached_;
+ assert(detached_ == 1 || detached_ == 2);
+ if (detached_ == 2) {
// we should have already executed the callback with the value
assert(calledBack_);
delete this;
folly::Optional<Try<T>> result_;
std::function<void(Try<T>&&)> callback_;
- bool calledBack_ = false;
- unsigned char detached_ = 0;
- bool active_ = true;
- Executor* executor_ = nullptr;
-
- // this lock isn't meant to protect all accesses to members, only the ones
- // that need to be threadsafe: the act of setting result_ and callback_, and
- // seeing if they are set and whether we should then continue.
- folly::MicroSpinLock mutex_ {0};
+ std::atomic<bool> calledBack_ {false};
+ std::atomic<unsigned char> detached_ {0};
+ std::atomic<bool> active_ {true};
+ std::atomic<Executor*> executor_ {nullptr};
+ std::exception_ptr interrupt_;
+ std::function<void(std::exception_ptr const&)> interruptHandler_;
};
template <typename... Ts>
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include <folly/wangle/Future.h>
+#include <folly/wangle/Promise.h>
+
+using namespace folly::wangle;
+
+TEST(Interrupts, raise) {
+ std::runtime_error eggs("eggs");
+ Promise<void> p;
+ p.setInterruptHandler([&](std::exception_ptr e) {
+ EXPECT_THROW(std::rethrow_exception(e), decltype(eggs));
+ });
+ p.getFuture().raise(eggs);
+}
+
+TEST(Interrupts, cancel) {
+ Promise<void> p;
+ p.setInterruptHandler([&](std::exception_ptr e) {
+ EXPECT_THROW(std::rethrow_exception(e), FutureCancellation);
+ });
+ p.getFuture().cancel();
+}
+
+TEST(Interrupts, handleThenInterrupt) {
+ Promise<int> p;
+ bool flag = false;
+ p.setInterruptHandler([&](std::exception_ptr e) { flag = true; });
+ p.getFuture().cancel();
+ EXPECT_TRUE(flag);
+}
+
+TEST(Interrupts, interruptThenHandle) {
+ Promise<int> p;
+ bool flag = false;
+ p.getFuture().cancel();
+ p.setInterruptHandler([&](std::exception_ptr e) { flag = true; });
+ EXPECT_TRUE(flag);
+}
+
+TEST(Interrupts, interruptAfterFulfilNoop) {
+ Promise<void> p;
+ bool flag = false;
+ p.setInterruptHandler([&](std::exception_ptr e) { flag = true; });
+ p.setValue();
+ p.getFuture().cancel();
+ EXPECT_FALSE(flag);
+}
+
+TEST(Interrupts, secondInterruptNoop) {
+ Promise<void> p;
+ int count = 0;
+ p.setInterruptHandler([&](std::exception_ptr e) { count++; });
+ auto f = p.getFuture();
+ f.cancel();
+ f.cancel();
+ EXPECT_EQ(1, count);
+}