save/restore request context in future
[folly.git] / folly / wangle / detail / Core.h
index f237c6190049b5fb20bbe625ef8e1adb28f87c7f..f83e4f71c3673497f41020f756ab7342234b9aff 100644 (file)
 #include <vector>
 
 #include <folly/Optional.h>
+#include <folly/SmallLocks.h>
 
 #include <folly/wangle/Try.h>
 #include <folly/wangle/Promise.h>
 #include <folly/wangle/Future.h>
 #include <folly/wangle/Executor.h>
+#include <folly/wangle/detail/FSM.h>
+
+#include <folly/io/async/Request.h>
 
 namespace folly { namespace wangle { namespace detail {
 
@@ -35,14 +39,21 @@ namespace folly { namespace wangle { namespace detail {
 template<typename T>
 void empty_callback(Try<T>&&) { }
 
+enum class State {
+  Waiting,
+  Interruptible,
+  Interrupted,
+  Done,
+};
+
 /** The shared state object for Future and Promise. */
 template<typename T>
-class Core {
+class Core : protected FSM<State> {
  public:
   // This must be heap-constructed. There's probably a way to enforce that in
   // code but since this is just internal detail code and I don't know how
   // off-hand, I'm punting.
-  Core() = default;
+  Core() : FSM<State>(State::Waiting) {}
   ~Core() {
     assert(calledBack_);
     assert(detached_ == 2);
@@ -57,57 +68,56 @@ class Core {
   Core& operator=(Core&&) = delete;
 
   Try<T>& getTry() {
-    return *value_;
+    if (ready()) {
+      return *result_;
+    } else {
+      throw FutureNotReady();
+    }
   }
 
   template <typename F>
   void setCallback(F func) {
-    {
-      std::lock_guard<decltype(mutex_)> lock(mutex_);
-
+    auto setCallback_ = [&]{
       if (callback_) {
         throw std::logic_error("setCallback called twice");
       }
 
+      context_ = RequestContext::saveContext();
       callback_ = std::move(func);
-    }
-
-    maybeCallback();
-  }
-
-  void fulfil(Try<T>&& t) {
-    {
-      std::lock_guard<decltype(mutex_)> lock(mutex_);
-
-      if (ready()) {
-        throw std::logic_error("fulfil called twice");
-      }
-
-      value_ = std::move(t);
-      assert(ready());
-    }
-
-    maybeCallback();
-  }
-
-  void setException(std::exception_ptr const& e) {
-    fulfil(Try<T>(e));
+    };
+
+    FSM_START
+      case State::Waiting:
+      case State::Interruptible:
+      case State::Interrupted:
+        FSM_UPDATE(state, setCallback_);
+        break;
+
+      case State::Done:
+        FSM_UPDATE2(State::Done,
+          setCallback_,
+          [&]{ maybeCallback(); });
+        break;
+    FSM_END
   }
 
-  template <class E> void setException(E const& e) {
-    fulfil(Try<T>(std::make_exception_ptr<E>(e)));
+  void setResult(Try<T>&& t) {
+    FSM_START
+      case State::Waiting:
+      case State::Interruptible:
+      case State::Interrupted:
+        FSM_UPDATE2(State::Done,
+          [&]{ result_ = std::move(t); },
+          [&]{ maybeCallback(); });
+        break;
+
+      case State::Done:
+        throw std::logic_error("setResult called twice");
+    FSM_END
   }
 
   bool ready() const {
-    return value_.hasValue();
-  }
-
-  typename std::add_lvalue_reference<T>::type value() {
-    if (ready()) {
-      return value_->value();
-    } else {
-      throw FutureNotReady();
-    }
+    return getState() == State::Done;
   }
 
   // Called by a destructing Future
@@ -122,77 +132,103 @@ class Core {
   // Called by a destructing Promise
   void detachPromise() {
     if (!ready()) {
-      setException(BrokenPromise());
+      setResult(Try<T>(std::make_exception_ptr(BrokenPromise())));
     }
     detachOne();
   }
 
   void deactivate() {
-    std::lock_guard<decltype(mutex_)> lock(mutex_);
     active_ = false;
   }
 
   void activate() {
-    {
-      std::lock_guard<decltype(mutex_)> lock(mutex_);
-      active_ = true;
+    active_ = true;
+    if (ready()) {
+      maybeCallback();
     }
-    maybeCallback();
   }
 
   bool isActive() { return active_; }
 
   void setExecutor(Executor* x) {
-    std::lock_guard<decltype(mutex_)> lock(mutex_);
     executor_ = x;
   }
 
+  void raise(std::exception_ptr const& e) {
+    FSM_START
+      case State::Interruptible:
+        FSM_UPDATE2(State::Interrupted,
+          [&]{ interrupt_ = e; },
+          [&]{ interruptHandler_(interrupt_); });
+        break;
+
+      case State::Waiting:
+      case State::Interrupted:
+        FSM_UPDATE(State::Interrupted,
+          [&]{ interrupt_ = e; });
+        break;
+
+      case State::Done:
+        FSM_BREAK
+    FSM_END
+  }
+
+  void setInterruptHandler(std::function<void(std::exception_ptr const&)> fn) {
+    FSM_START
+      case State::Waiting:
+      case State::Interruptible:
+        FSM_UPDATE(State::Interruptible,
+          [&]{ interruptHandler_ = std::move(fn); });
+        break;
+
+      case State::Interrupted:
+        fn(interrupt_);
+        FSM_BREAK
+
+      case State::Done:
+        FSM_BREAK
+    FSM_END
+  }
+
  private:
   void maybeCallback() {
-    std::unique_lock<decltype(mutex_)> lock(mutex_);
-    if (!calledBack_ &&
-        value_ && callback_ && isActive()) {
-      // TODO(5306911) we should probably try/catch here
-      if (executor_) {
-        MoveWrapper<folly::Optional<Try<T>>> val(std::move(value_));
+    assert(ready());
+    if (!calledBack_ && isActive() && callback_) {
+      // TODO(5306911) we should probably try/catch
+      calledBack_ = true;
+      Executor* x = executor_;
+
+      RequestContext::setContext(context_);
+      if (x) {
         MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
-        executor_->add([cb, val]() mutable { (*cb)(std::move(**val)); });
-        calledBack_ = true;
+        MoveWrapper<folly::Optional<Try<T>>> val(std::move(result_));
+        x->add([cb, val]() mutable { (*cb)(std::move(**val)); });
       } else {
-        calledBack_ = true;
-        lock.unlock();
-        callback_(std::move(*value_));
+        callback_(std::move(*result_));
       }
     }
   }
 
   void detachOne() {
-    bool shouldDelete;
-    {
-      std::lock_guard<decltype(mutex_)> lock(mutex_);
-      detached_++;
-      assert(detached_ == 1 || detached_ == 2);
-      shouldDelete = (detached_ == 2);
-    }
-
-    if (shouldDelete) {
+    auto d = ++detached_;
+    assert(d >= 1);
+    assert(d <= 2);
+    if (d == 2) {
       // we should have already executed the callback with the value
       assert(calledBack_);
       delete this;
     }
   }
 
-  folly::Optional<Try<T>> value_;
+  folly::Optional<Try<T>> result_;
   std::function<void(Try<T>&&)> callback_;
-  bool calledBack_ = false;
-  unsigned char detached_ = 0;
-  bool active_ = true;
-  Executor* executor_ = nullptr;
-
-  // this lock isn't meant to protect all accesses to members, only the ones
-  // that need to be threadsafe: the act of setting value_ and callback_, and
-  // seeing if they are set and whether we should then continue.
-  std::mutex mutex_;
+  std::shared_ptr<RequestContext> context_{nullptr};
+  std::atomic<bool> calledBack_ {false};
+  std::atomic<unsigned char> detached_ {0};
+  std::atomic<bool> active_ {true};
+  std::atomic<Executor*> executor_ {nullptr};
+  std::exception_ptr interrupt_;
+  std::function<void(std::exception_ptr const&)> interruptHandler_;
 };
 
 template <typename... Ts>