futures: fix behaviour when executors don't exec callback
authorSven Over <over@fb.com>
Fri, 29 Jul 2016 11:45:35 +0000 (04:45 -0700)
committerFacebook Github Bot 0 <facebook-github-bot-0-bot@fb.com>
Fri, 29 Jul 2016 11:53:24 +0000 (04:53 -0700)
Summary:
When future callbacks are to be executed by an executor (via `via`)
and the executor does not actually execute the callback function
(for whatever reason), then waiting for the final future (the one
returned by `via`) block forever. In case the callback function
that got passed to the executor gets destroyed without being executed,
the future should be set to a folly::BrokenPromise exception instead
of remaining unset forever.

This diff modifies the reference counting in folly::detail::Core
to make sure the reference held by the callback function is
properly removed not only after the callback gets executed, but
also when the callback is destroyed without having been executed.

Reviewed By: yfeldblum

Differential Revision: D3455931

fbshipit-source-id: debb6f3563384a658d1e0149a4aadbbcb268938c

folly/futures/detail/Core.h
folly/futures/test/ExecutorTest.cpp
folly/futures/test/SelfDestructTest.cpp

index a6ac6109a643da5549144b9986e838267a26e4c6..7ef597a096715e0784867bcf1ef132a5477d6036 100644 (file)
@@ -73,7 +73,7 @@ enum class State : uint8_t {
 /// doesn't access a Future or Promise object from more than one thread at a
 /// time there won't be any problems.
 template<typename T>
-class Core {
+class Core final {
   static_assert(!std::is_void<T>::value,
                 "void futures are not supported. Use Unit instead.");
  public:
@@ -300,7 +300,55 @@ class Core {
     interruptHandler_ = std::move(fn);
   }
 
- protected:
+ private:
+  class CountedReference {
+   public:
+    ~CountedReference() {
+      if (core_) {
+        core_->detachOne();
+        core_ = nullptr;
+      }
+    }
+
+    explicit CountedReference(Core* core) noexcept : core_(core) {
+      // do not construct a CountedReference from nullptr!
+      DCHECK(core);
+
+      ++core_->attached_;
+    }
+
+    // CountedReference must be copy-constructable as long as
+    // folly::Executor::add takes a std::function
+    CountedReference(CountedReference const& o) noexcept : core_(o.core_) {
+      if (core_) {
+        ++core_->attached_;
+      }
+    }
+
+    CountedReference& operator=(CountedReference const& o) noexcept {
+      ~CountedReference();
+      new (this) CountedReference(o);
+      return *this;
+    }
+
+    CountedReference(CountedReference&& o) noexcept {
+      std::swap(core_, o.core_);
+    }
+
+    CountedReference& operator=(CountedReference&& o) noexcept {
+      ~CountedReference();
+      new (this) CountedReference(std::move(o));
+      return *this;
+    }
+
+    Core* getCore() const noexcept {
+      return core_;
+    }
+
+   private:
+    Core* core_{nullptr};
+  };
+
   void maybeCallback() {
     FSM_START(fsm_)
       case State::Armed:
@@ -326,35 +374,34 @@ class Core {
       executorLock_.unlock();
     }
 
-    // keep Core alive until callback did its thing
-    ++attached_;
-
     if (x) {
       try {
         if (LIKELY(x->getNumPriorities() == 1)) {
-          x->add([this]() mutable {
-            SCOPE_EXIT { detachOne(); };
-            RequestContextScopeGuard rctx(context_);
-            SCOPE_EXIT { callback_ = {}; };
-            callback_(std::move(*result_));
+          x->add([core_ref = CountedReference(this)]() mutable {
+            auto cr = std::move(core_ref);
+            Core* const core = cr.getCore();
+            RequestContextScopeGuard rctx(core->context_);
+            SCOPE_EXIT { core->callback_ = {}; };
+            core->callback_(std::move(*core->result_));
           });
         } else {
-          x->addWithPriority([this]() mutable {
-            SCOPE_EXIT { detachOne(); };
-            RequestContextScopeGuard rctx(context_);
-            SCOPE_EXIT { callback_ = {}; };
-            callback_(std::move(*result_));
+          x->addWithPriority([core_ref = CountedReference(this)]() mutable {
+            auto cr = std::move(core_ref);
+            Core* const core = cr.getCore();
+            RequestContextScopeGuard rctx(core->context_);
+            SCOPE_EXIT { core->callback_ = {}; };
+            core->callback_(std::move(*core->result_));
           }, priority);
         }
       } catch (...) {
-        --attached_; // Account for extra ++attached_ before try
+        CountedReference core_ref(this);
         RequestContextScopeGuard rctx(context_);
         result_ = Try<T>(exception_wrapper(std::current_exception()));
         SCOPE_EXIT { callback_ = {}; };
         callback_(std::move(*result_));
       }
     } else {
-      SCOPE_EXIT { detachOne(); };
+      CountedReference core_ref(this);
       RequestContextScopeGuard rctx(context_);
       SCOPE_EXIT { callback_ = {}; };
       callback_(std::move(*result_));
@@ -362,10 +409,9 @@ class Core {
   }
 
   void detachOne() {
-    auto a = --attached_;
-    assert(a >= 0);
-    assert(a <= 2);
-    if (a == 0) {
+    auto a = attached_--;
+    assert(a >= 1);
+    if (a == 1) {
       delete this;
     }
   }
index 3e7e72e593f79955cc3389e201c25e86b468d82e..b11a8a6c6dfa624b0442b603d03307839e392ffd 100644 (file)
@@ -210,3 +210,30 @@ TEST(Executor, CrappyExecutor) {
   });
   EXPECT_TRUE(flag);
 }
+
+class DoNothingExecutor : public Executor {
+ public:
+  void add(Func f) override {
+    storedFunc_ = std::move(f);
+  }
+
+ private:
+  Func storedFunc_;
+};
+
+TEST(Executor, DoNothingExecutor) {
+  DoNothingExecutor x;
+
+  // Submit future callback to DoNothingExecutor
+  auto f = folly::via(&x).then([] { return 42; });
+
+  // Callback function is stored in DoNothingExecutor, but not executed.
+  EXPECT_FALSE(f.isReady());
+
+  // Destroy the function stored in DoNothingExecutor. The future callback
+  // will never get executed.
+  x.add({});
+
+  EXPECT_TRUE(f.isReady());
+  EXPECT_THROW(f.get(), folly::BrokenPromise);
+}
index 52842f44ea3611d9c8b77cfb18f3a9559e5a4c0b..68a75d09967bca243904620c71caa694844a01f6 100644 (file)
@@ -17,6 +17,7 @@
 #include <gtest/gtest.h>
 
 #include <folly/futures/Future.h>
+#include <folly/futures/InlineExecutor.h>
 
 using namespace folly;
 
@@ -27,12 +28,51 @@ TEST(SelfDestruct, then) {
     return x + 1;
   });
   p->setValue(123);
-  EXPECT_EQ(future.get(), 124);
+  EXPECT_EQ(124, future.get());
 }
 
 TEST(SelfDestruct, ensure) {
   auto* p = new Promise<int>();
   auto future = p->getFuture().ensure([p] { delete p; });
   p->setValue(123);
-  EXPECT_EQ(future.get(), 123);
+  EXPECT_EQ(123, future.get());
+}
+
+class ThrowingExecutorError : public std::runtime_error {
+ public:
+  using std::runtime_error::runtime_error;
+};
+
+class ThrowingExecutor : public folly::Executor {
+ public:
+  void add(folly::Func) override {
+    throw ThrowingExecutorError("ThrowingExecutor::add");
+  }
+};
+
+TEST(SelfDestruct, throwingExecutor) {
+  ThrowingExecutor executor;
+  auto* p = new Promise<int>();
+  auto future =
+      p->getFuture().via(&executor).onError([p](ThrowingExecutorError const&) {
+        delete p;
+        return 456;
+      });
+  p->setValue(123);
+  EXPECT_EQ(456, future.get());
+}
+
+TEST(SelfDestruct, throwingInlineExecutor) {
+  folly::InlineExecutor executor;
+
+  auto* p = new Promise<int>();
+  auto future = p->getFuture()
+                    .via(&executor)
+                    .then([p]() -> int {
+                      delete p;
+                      throw ThrowingExecutorError("callback throws");
+                    })
+                    .onError([](ThrowingExecutorError const&) { return 456; });
+  p->setValue(123);
+  EXPECT_EQ(456, future.get());
 }