/// off-hand, I'm punting.
Core() {}
~Core() {
- assert(detached_ == 2);
+ assert(attached_ == 0);
}
// not copyable
}
}
+ template <typename F>
+ class LambdaBufHelper {
+ public:
+ explicit LambdaBufHelper(F&& func) : func_(std::forward<F>(func)) {}
+ void operator()(Try<T>&& t) {
+ SCOPE_EXIT { this->~LambdaBufHelper(); };
+ func_(std::move(t));
+ }
+ private:
+ F func_;
+ };
+
/// Call only from Future thread.
template <typename F>
void setCallback(F func) {
bool transitionToArmed = false;
auto setCallback_ = [&]{
context_ = RequestContext::saveContext();
- callback_ = std::move(func);
+
+ // Move the lambda into the Core if it fits
+ if (sizeof(LambdaBufHelper<F>) <= lambdaBufSize) {
+ auto funcLoc = static_cast<LambdaBufHelper<F>*>((void*)lambdaBuf_);
+ new (funcLoc) LambdaBufHelper<F>(std::forward<F>(func));
+ callback_ = std::ref(*funcLoc);
+ } else {
+ callback_ = std::move(func);
+ }
};
FSM_START(fsm_)
// TODO(6115514) semantic race on reading executor_ and setExecutor()
Executor* x = executor_;
if (x) {
- MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
- MoveWrapper<Try<T>> val(std::move(*result_));
- x->add([cb, val]() mutable { (*cb)(std::move(*val)); });
+ ++attached_; // keep Core alive until executor did its thing
+ x->add([this]() mutable {
+ SCOPE_EXIT { detachOne(); };
+ callback_(std::move(*result_));
+ });
} else {
callback_(std::move(*result_));
}
}
void detachOne() {
- auto d = ++detached_;
- assert(d >= 1);
- assert(d <= 2);
- if (d == 2) {
+ auto a = --attached_;
+ assert(a >= 0);
+ assert(a <= 2);
+ if (a == 0) {
delete this;
}
}
FSM<State> fsm_ {State::Start};
- std::atomic<unsigned char> detached_ {0};
+ std::atomic<unsigned char> attached_ {2};
std::atomic<bool> active_ {true};
folly::MicroSpinLock interruptLock_ {0};
folly::Optional<Try<T>> result_ {};
std::function<void(Try<T>&&)> callback_ {nullptr};
+ static constexpr size_t lambdaBufSize = 8 * sizeof(void*);
+ char lambdaBuf_[lambdaBufSize];
std::shared_ptr<RequestContext> context_ {nullptr};
std::atomic<Executor*> executor_ {nullptr};
std::unique_ptr<exception_wrapper> interrupt_ {};
public:
explicit ThreadExecutor(size_t n = 1024)
- : funcs(n), worker(std::bind(&ThreadExecutor::work, this)) {}
+ : funcs(n) {
+ worker = std::thread(std::bind(&ThreadExecutor::work, this));
+ }
~ThreadExecutor() {
done = true;
TEST(Future, coreSize) {
// If this number goes down, it's fine!
// If it goes up, please seek professional advice ;-)
- EXPECT_EQ(128, sizeof(detail::Core<void>));
+ EXPECT_EQ(192, sizeof(detail::Core<void>));
}
// Future