#include <folly/wangle/Promise.h>
#include <folly/wangle/Future.h>
#include <folly/wangle/Executor.h>
-#include <folly/wangle/detail/FSM.h>
namespace folly { namespace wangle { namespace detail {
template<typename T>
void empty_callback(Try<T>&&) { }
-enum class State {
- Waiting,
- Interruptible,
- Interrupted,
- Done,
-};
-
/** The shared state object for Future and Promise. */
template<typename T>
-class Core : protected FSM<State> {
+class Core {
public:
// This must be heap-constructed. There's probably a way to enforce that in
// code but since this is just internal detail code and I don't know how
// off-hand, I'm punting.
- Core() : FSM<State>(State::Waiting) {}
+ Core() = default;
~Core() {
assert(calledBack_);
assert(detached_ == 2);
template <typename F>
void setCallback(F func) {
- auto setCallback_ = [&]{
+ {
+ std::lock_guard<decltype(mutex_)> lock(mutex_);
+
if (callback_) {
throw std::logic_error("setCallback called twice");
}
callback_ = std::move(func);
- };
-
- FSM_START
- case State::Waiting:
- case State::Interruptible:
- case State::Interrupted:
- FSM_UPDATE(state, setCallback_);
- break;
-
- case State::Done:
- FSM_UPDATE2(State::Done,
- setCallback_,
- [&]{ maybeCallback(); });
- break;
- FSM_END
+ }
+
+ maybeCallback();
}
void setResult(Try<T>&& t) {
- FSM_START
- case State::Waiting:
- case State::Interruptible:
- case State::Interrupted:
- FSM_UPDATE2(State::Done,
- [&]{ result_ = std::move(t); },
- [&]{ maybeCallback(); });
- break;
-
- case State::Done:
+ {
+ std::lock_guard<decltype(mutex_)> lock(mutex_);
+
+ if (ready()) {
throw std::logic_error("setResult called twice");
- FSM_END
+ }
+
+ result_ = std::move(t);
+ assert(ready());
+ }
+
+ maybeCallback();
}
bool ready() const {
- return getState() == State::Done;
+ return result_.hasValue();
}
// Called by a destructing Future
}
void deactivate() {
+ std::lock_guard<decltype(mutex_)> lock(mutex_);
active_ = false;
}
void activate() {
- active_ = true;
- if (ready()) {
- maybeCallback();
+ {
+ std::lock_guard<decltype(mutex_)> lock(mutex_);
+ active_ = true;
}
+ maybeCallback();
}
bool isActive() { return active_; }
void setExecutor(Executor* x) {
+ std::lock_guard<decltype(mutex_)> lock(mutex_);
executor_ = x;
}
- void raise(std::exception_ptr const& e) {
- FSM_START
- case State::Interruptible:
- FSM_UPDATE2(State::Interrupted,
- [&]{ interrupt_ = e; },
- [&]{ interruptHandler_(interrupt_); });
- break;
-
- case State::Waiting:
- case State::Interrupted:
- FSM_UPDATE(State::Interrupted,
- [&]{ interrupt_ = e; });
- break;
-
- case State::Done:
- FSM_BREAK
- FSM_END
- }
-
- void setInterruptHandler(std::function<void(std::exception_ptr const&)> fn) {
- FSM_START
- case State::Waiting:
- case State::Interruptible:
- FSM_UPDATE(State::Interruptible,
- [&]{ interruptHandler_ = std::move(fn); });
- break;
-
- case State::Interrupted:
- fn(interrupt_);
- FSM_BREAK
-
- case State::Done:
- FSM_BREAK
- FSM_END
- }
-
private:
void maybeCallback() {
- assert(ready());
- if (!calledBack_ && isActive() && callback_) {
- // TODO(5306911) we should probably try/catch
- calledBack_ = true;
- Executor* x = executor_;
- if (x) {
- MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
+ std::unique_lock<decltype(mutex_)> lock(mutex_);
+ if (!calledBack_ &&
+ result_ && callback_ && isActive()) {
+ // TODO(5306911) we should probably try/catch here
+ if (executor_) {
MoveWrapper<folly::Optional<Try<T>>> val(std::move(result_));
- x->add([cb, val]() mutable { (*cb)(std::move(**val)); });
+ MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
+ executor_->add([cb, val]() mutable { (*cb)(std::move(**val)); });
+ calledBack_ = true;
} else {
+ calledBack_ = true;
+ lock.unlock();
callback_(std::move(*result_));
}
}
}
void detachOne() {
- ++detached_;
- assert(detached_ == 1 || detached_ == 2);
- if (detached_ == 2) {
+ bool shouldDelete;
+ {
+ std::lock_guard<decltype(mutex_)> lock(mutex_);
+ detached_++;
+ assert(detached_ == 1 || detached_ == 2);
+ shouldDelete = (detached_ == 2);
+ }
+
+ if (shouldDelete) {
// we should have already executed the callback with the value
assert(calledBack_);
delete this;
folly::Optional<Try<T>> result_;
std::function<void(Try<T>&&)> callback_;
- std::atomic<bool> calledBack_ {false};
- std::atomic<unsigned char> detached_ {0};
- std::atomic<bool> active_ {true};
- std::atomic<Executor*> executor_ {nullptr};
- std::exception_ptr interrupt_;
- std::function<void(std::exception_ptr const&)> interruptHandler_;
+ bool calledBack_ = false;
+ unsigned char detached_ = 0;
+ bool active_ = true;
+ Executor* executor_ = nullptr;
+
+ // this lock isn't meant to protect all accesses to members, only the ones
+ // that need to be threadsafe: the act of setting result_ and callback_, and
+ // seeing if they are set and whether we should then continue.
+ folly::MicroSpinLock mutex_ {0};
};
template <typename... Ts>
+++ /dev/null
-/*
- * Copyright 2014 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <gtest/gtest.h>
-
-#include <folly/wangle/Future.h>
-#include <folly/wangle/Promise.h>
-
-using namespace folly::wangle;
-
-TEST(Interrupts, raise) {
- std::runtime_error eggs("eggs");
- Promise<void> p;
- p.setInterruptHandler([&](std::exception_ptr e) {
- EXPECT_THROW(std::rethrow_exception(e), decltype(eggs));
- });
- p.getFuture().raise(eggs);
-}
-
-TEST(Interrupts, cancel) {
- Promise<void> p;
- p.setInterruptHandler([&](std::exception_ptr e) {
- EXPECT_THROW(std::rethrow_exception(e), FutureCancellation);
- });
- p.getFuture().cancel();
-}
-
-TEST(Interrupts, handleThenInterrupt) {
- Promise<int> p;
- bool flag = false;
- p.setInterruptHandler([&](std::exception_ptr e) { flag = true; });
- p.getFuture().cancel();
- EXPECT_TRUE(flag);
-}
-
-TEST(Interrupts, interruptThenHandle) {
- Promise<int> p;
- bool flag = false;
- p.getFuture().cancel();
- p.setInterruptHandler([&](std::exception_ptr e) { flag = true; });
- EXPECT_TRUE(flag);
-}
-
-TEST(Interrupts, interruptAfterFulfilNoop) {
- Promise<void> p;
- bool flag = false;
- p.setInterruptHandler([&](std::exception_ptr e) { flag = true; });
- p.setValue();
- p.getFuture().cancel();
- EXPECT_FALSE(flag);
-}
-
-TEST(Interrupts, secondInterruptNoop) {
- Promise<void> p;
- int count = 0;
- p.setInterruptHandler([&](std::exception_ptr e) { count++; });
- auto f = p.getFuture();
- f.cancel();
- f.cancel();
- EXPECT_EQ(1, count);
-}