Rework the Future::Core state machine
authorHans Fugal <fugalh@fb.com>
Fri, 30 Jan 2015 23:23:24 +0000 (15:23 -0800)
committerwoo <woo@fb.com>
Mon, 2 Feb 2015 21:14:50 +0000 (13:14 -0800)
Summary:
There was a race reading `callback_` in `maybeCallback` and setting `callback_` in `setCallback`. This diff reworks the state machine to make this unpossible. To avoid the explosion of states due to the cross-product of has-interrupt-handler/has-been-interrupted/etc. I introduce a separate lock for setting interrupt handler and interruption, since this is primarily orthogonal. Other attributes (active, for example) are still atomic variables, and while somewhat tied into the state machine logically (e.g. transitioning from Armed to Done only happens when active) they are mostly independent, keeping the state machine simple (and probably faster).

I think it may even be possible to do some things cheaper. In some states, we may not need to protect the writing of `callback_` and `result_`. But we'd need to enforce some ordering so I'm not going to try to tackle that. But that could be some speedup if we can do it cheaply.

Test Plan:
Builds and all existing tests pass.

Reviewed By: rockyliu4@fb.com

Subscribers: yfeldblum, stepan, trunkagent, exa, folly-diffs@, jsedgwick

FB internal diff: D1807854

Tasks: 6087856

Signature: t1:1807854:1422656713:25b62706cd7952b2dde06dab08074f8030db456b

folly/futures/detail/Core.h

index 0c39212d6647278e2829da8521aed025ff4d9127..c2bc1deba7d17bfe82086bb20a60ccfdb712ee4c 100644 (file)
 
 namespace folly { namespace detail {
 
-// As of GCC 4.8.1, the std::function in libstdc++ optimizes only for pointers
-// to functions, using a helper avoids a call to malloc.
-template<typename T>
-void empty_callback(Try<T>&&) { }
-
+/*
+        OnlyCallback
+       /            \
+  Start              Armed - Done
+       \            /
+         OnlyResult
+
+This state machine is fairly self-explanatory. The most important bit is
+that the callback is only executed on the transition from Armed to Done,
+and that transition can happen immediately after transitioning from Only*
+to Armed, if it is active (the usual case).
+*/
 enum class State {
-  Waiting,
-  Interruptible,
-  Interrupted,
+  Start,
+  OnlyResult,
+  OnlyCallback,
+  Armed,
   Done,
 };
 
-/** The shared state object for Future and Promise. */
+/// The shared state object for Future and Promise.
+/// Some methods must only be called by either the Future thread or the
+/// Promise thread. The Future thread is the thread that currently "owns" the
+/// Future and its callback-related operations, and the Promise thread is
+/// likewise the thread that currently "owns" the Promise and its
+/// result-related operations. Also, Futures own interruption, Promises own
+/// interrupt handlers. Unfortunately, there are things that users can do to
+/// break this, and we can't detect that. However if they follow move
+/// semantics religiously wrt threading, they should be ok.
+///
+/// It's worth pointing out that Futures and/or Promises can and usually will
+/// migrate between threads, though this usually happens within the API code.
+/// For example, an async operation will probably make a Promise, grab its
+/// Future, then move the Promise into another thread that will eventually
+/// fulfil it. With executors and via, this gets slightly more complicated at
+/// first blush, but it's the same principle. In general, as long as the user
+/// doesn't access a Future or Promise object from more than one thread at a
+/// time there won't be any problems.
 template<typename T>
 class Core : protected FSM<State> {
  public:
-  // This must be heap-constructed. There's probably a way to enforce that in
-  // code but since this is just internal detail code and I don't know how
-  // off-hand, I'm punting.
-  Core() : FSM<State>(State::Waiting) {}
+  /// This must be heap-constructed. There's probably a way to enforce that in
+  /// code but since this is just internal detail code and I don't know how
+  /// off-hand, I'm punting.
+  Core() : FSM<State>(State::Start) {}
   ~Core() {
-    assert(calledBack_);
     assert(detached_ == 2);
   }
 
@@ -67,6 +91,26 @@ class Core : protected FSM<State> {
   Core(Core&&) noexcept = delete;
   Core& operator=(Core&&) = delete;
 
+  /// May call from any thread
+  bool hasResult() const {
+    switch (getState()) {
+      case State::OnlyResult:
+      case State::Armed:
+      case State::Done:
+        assert(!!result_);
+        return true;
+
+      default:
+        return false;
+    }
+  }
+
+  /// May call from any thread
+  bool ready() const {
+    return hasResult();
+  }
+
+  /// May call from any thread
   Try<T>& getTry() {
     if (ready()) {
       return *result_;
@@ -75,138 +119,148 @@ class Core : protected FSM<State> {
     }
   }
 
+  /// Call only from Future thread.
   template <typename F>
   void setCallback(F func) {
+    bool transitionToArmed = false;
     auto setCallback_ = [&]{
-      if (callback_) {
-        throw std::logic_error("setCallback called twice");
-      }
-
       context_ = RequestContext::saveContext();
       callback_ = std::move(func);
     };
 
     FSM_START
-      case State::Waiting:
-      case State::Interruptible:
-      case State::Interrupted:
-        FSM_UPDATE(state, setCallback_);
+      case State::Start:
+        FSM_UPDATE(State::OnlyCallback, setCallback_);
         break;
 
-      case State::Done:
-        FSM_UPDATE2(State::Done,
-          setCallback_,
-          [&]{ maybeCallback(); });
+      case State::OnlyResult:
+        FSM_UPDATE(State::Armed, setCallback_);
+        transitionToArmed = true;
         break;
+
+      case State::OnlyCallback:
+      case State::Armed:
+      case State::Done:
+        throw std::logic_error("setCallback called twice");
     FSM_END
+
+    // we could always call this, it is an optimization to only call it when
+    // it might be needed.
+    if (transitionToArmed) {
+      maybeCallback();
+    }
   }
 
+  /// Call only from Promise thread
   void setResult(Try<T>&& t) {
+    bool transitionToArmed = false;
+    auto setResult_ = [&]{ result_ = std::move(t); };
     FSM_START
-      case State::Waiting:
-      case State::Interruptible:
-      case State::Interrupted:
-        FSM_UPDATE2(State::Done,
-          [&]{ result_ = std::move(t); },
-          [&]{ maybeCallback(); });
+      case State::Start:
+        FSM_UPDATE(State::OnlyResult, setResult_);
+        break;
+
+      case State::OnlyCallback:
+        FSM_UPDATE(State::Armed, setResult_);
+        transitionToArmed = true;
         break;
 
+      case State::OnlyResult:
+      case State::Armed:
       case State::Done:
         throw std::logic_error("setResult called twice");
     FSM_END
-  }
 
-  bool ready() const {
-    return getState() == State::Done;
+    if (transitionToArmed) {
+      maybeCallback();
+    }
   }
 
-  // Called by a destructing Future
+  /// Called by a destructing Future (in the Future thread, by definition)
   void detachFuture() {
-    if (!callback_) {
-      setCallback(empty_callback<T>);
-    }
     activate();
     detachOne();
   }
 
-  // Called by a destructing Promise
+  /// Called by a destructing Promise (in the Promise thread, by definition)
   void detachPromise() {
-    if (!ready()) {
+    // detachPromise() and setResult() should never be called in parallel
+    // so we don't need to protect this.
+    if (!result_) {
       setResult(Try<T>(exception_wrapper(BrokenPromise())));
     }
     detachOne();
   }
 
+  /// May call from any thread
   void deactivate() {
     active_ = false;
   }
 
+  /// May call from any thread
   void activate() {
     active_ = true;
-    if (ready()) {
-      maybeCallback();
-    }
+    maybeCallback();
   }
 
+  /// May call from any thread
   bool isActive() { return active_; }
 
+  /// Call only from Future thread
   void setExecutor(Executor* x) {
     executor_ = x;
   }
 
-  void raise(exception_wrapper const& e) {
-    FSM_START
-      case State::Interruptible:
-        FSM_UPDATE2(State::Interrupted,
-          [&]{ interrupt_ = folly::make_unique<exception_wrapper>(e); },
-          [&]{ interruptHandler_(*interrupt_); });
-        break;
-
-      case State::Waiting:
-      case State::Interrupted:
-        FSM_UPDATE(State::Interrupted,
-          [&]{ interrupt_ = folly::make_unique<exception_wrapper>(e); });
-        break;
-
-      case State::Done:
-        FSM_BREAK
-    FSM_END
+  /// Call only from Future thread
+  void raise(exception_wrapper e) {
+    std::lock_guard<decltype(interruptLock_)> guard(interruptLock_);
+    if (!interrupt_ && !hasResult()) {
+      interrupt_ = std::move(e);
+      if (interruptHandler_) {
+        interruptHandler_(interrupt_);
+      }
+    }
   }
 
+  /// Call only from Promise thread
   void setInterruptHandler(std::function<void(exception_wrapper const&)> fn) {
-    FSM_START
-      case State::Waiting:
-      case State::Interruptible:
-        FSM_UPDATE(State::Interruptible,
-          [&]{ interruptHandler_ = std::move(fn); });
-        break;
+    std::lock_guard<decltype(interruptLock_)> guard(interruptLock_);
+    if (!hasResult()) {
+      if (!!interrupt_) {
+        fn(interrupt_);
+      } else {
+        interruptHandler_ = std::move(fn);
+      }
+    }
+  }
 
-      case State::Interrupted:
-        fn(*interrupt_);
+ private:
+  void maybeCallback() {
+    FSM_START
+      case State::Armed:
+        if (active_) {
+          FSM_UPDATE2(State::Done, []{}, std::bind(&Core::doCallback, this));
+        }
         FSM_BREAK
 
-      case State::Done:
+      default:
         FSM_BREAK
     FSM_END
   }
 
- private:
-  void maybeCallback() {
-    assert(ready());
-    if (isActive() && callback_) {
-      if (!calledBack_.exchange(true)) {
-        // TODO(5306911) we should probably try/catch
-        Executor* x = executor_;
-
-        RequestContext::setContext(context_);
-        if (x) {
-          MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
-          MoveWrapper<folly::Optional<Try<T>>> val(std::move(result_));
-          x->add([cb, val]() mutable { (*cb)(std::move(**val)); });
-        } else {
-          callback_(std::move(*result_));
-        }
-      }
+  void doCallback() {
+    // TODO(5306911) we should probably try/catch around the callback
+
+    RequestContext::setContext(context_);
+
+    // TODO(6115514) semantic race on reading executor_ and setExecutor()
+    Executor* x = executor_;
+    if (x) {
+      MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
+      MoveWrapper<Try<T>> val(std::move(*result_));
+      x->add([cb, val]() mutable { (*cb)(std::move(*val)); });
+    } else {
+      callback_(std::move(*result_));
     }
   }
 
@@ -215,8 +269,6 @@ class Core : protected FSM<State> {
     assert(d >= 1);
     assert(d <= 2);
     if (d == 2) {
-      // we should have already executed the callback with the value
-      assert(calledBack_);
       delete this;
     }
   }
@@ -224,12 +276,12 @@ class Core : protected FSM<State> {
   folly::Optional<Try<T>> result_;
   std::function<void(Try<T>&&)> callback_;
   std::shared_ptr<RequestContext> context_{nullptr};
-  std::atomic<bool> calledBack_ {false};
   std::atomic<unsigned char> detached_ {0};
   std::atomic<bool> active_ {true};
   std::atomic<Executor*> executor_ {nullptr};
-  std::unique_ptr<exception_wrapper> interrupt_;
+  exception_wrapper interrupt_;
   std::function<void(exception_wrapper const&)> interruptHandler_;
+  folly::MicroSpinLock interruptLock_ {0};
 };
 
 template <typename... Ts>