From: Sven Over Date: Fri, 29 Jul 2016 11:45:35 +0000 (-0700) Subject: futures: fix behaviour when executors don't exec callback X-Git-Tag: v2016.07.29.00~2 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=9095cd537c3dc1acc1ee9577ae704308d780e2e4;p=folly.git futures: fix behaviour when executors don't exec callback Summary: When future callbacks are to be executed by an executor (via `via`) and the executor does not actually execute the callback function (for whatever reason), then waiting for the final future (the one returned by `via`) block forever. In case the callback function that got passed to the executor gets destroyed without being executed, the future should be set to a folly::BrokenPromise exception instead of remaining unset forever. This diff modifies the reference counting in folly::detail::Core to make sure the reference held by the callback function is properly removed not only after the callback gets executed, but also when the callback is destroyed without having been executed. Reviewed By: yfeldblum Differential Revision: D3455931 fbshipit-source-id: debb6f3563384a658d1e0149a4aadbbcb268938c --- diff --git a/folly/futures/detail/Core.h b/folly/futures/detail/Core.h index a6ac6109..7ef597a0 100644 --- a/folly/futures/detail/Core.h +++ b/folly/futures/detail/Core.h @@ -73,7 +73,7 @@ enum class State : uint8_t { /// doesn't access a Future or Promise object from more than one thread at a /// time there won't be any problems. template -class Core { +class Core final { static_assert(!std::is_void::value, "void futures are not supported. Use Unit instead."); public: @@ -300,7 +300,55 @@ class Core { interruptHandler_ = std::move(fn); } - protected: + private: + class CountedReference { + public: + ~CountedReference() { + if (core_) { + core_->detachOne(); + core_ = nullptr; + } + } + + explicit CountedReference(Core* core) noexcept : core_(core) { + // do not construct a CountedReference from nullptr! + DCHECK(core); + + ++core_->attached_; + } + + // CountedReference must be copy-constructable as long as + // folly::Executor::add takes a std::function + CountedReference(CountedReference const& o) noexcept : core_(o.core_) { + if (core_) { + ++core_->attached_; + } + } + + CountedReference& operator=(CountedReference const& o) noexcept { + ~CountedReference(); + new (this) CountedReference(o); + return *this; + } + + CountedReference(CountedReference&& o) noexcept { + std::swap(core_, o.core_); + } + + CountedReference& operator=(CountedReference&& o) noexcept { + ~CountedReference(); + new (this) CountedReference(std::move(o)); + return *this; + } + + Core* getCore() const noexcept { + return core_; + } + + private: + Core* core_{nullptr}; + }; + void maybeCallback() { FSM_START(fsm_) case State::Armed: @@ -326,35 +374,34 @@ class Core { executorLock_.unlock(); } - // keep Core alive until callback did its thing - ++attached_; - if (x) { try { if (LIKELY(x->getNumPriorities() == 1)) { - x->add([this]() mutable { - SCOPE_EXIT { detachOne(); }; - RequestContextScopeGuard rctx(context_); - SCOPE_EXIT { callback_ = {}; }; - callback_(std::move(*result_)); + x->add([core_ref = CountedReference(this)]() mutable { + auto cr = std::move(core_ref); + Core* const core = cr.getCore(); + RequestContextScopeGuard rctx(core->context_); + SCOPE_EXIT { core->callback_ = {}; }; + core->callback_(std::move(*core->result_)); }); } else { - x->addWithPriority([this]() mutable { - SCOPE_EXIT { detachOne(); }; - RequestContextScopeGuard rctx(context_); - SCOPE_EXIT { callback_ = {}; }; - callback_(std::move(*result_)); + x->addWithPriority([core_ref = CountedReference(this)]() mutable { + auto cr = std::move(core_ref); + Core* const core = cr.getCore(); + RequestContextScopeGuard rctx(core->context_); + SCOPE_EXIT { core->callback_ = {}; }; + core->callback_(std::move(*core->result_)); }, priority); } } catch (...) { - --attached_; // Account for extra ++attached_ before try + CountedReference core_ref(this); RequestContextScopeGuard rctx(context_); result_ = Try(exception_wrapper(std::current_exception())); SCOPE_EXIT { callback_ = {}; }; callback_(std::move(*result_)); } } else { - SCOPE_EXIT { detachOne(); }; + CountedReference core_ref(this); RequestContextScopeGuard rctx(context_); SCOPE_EXIT { callback_ = {}; }; callback_(std::move(*result_)); @@ -362,10 +409,9 @@ class Core { } void detachOne() { - auto a = --attached_; - assert(a >= 0); - assert(a <= 2); - if (a == 0) { + auto a = attached_--; + assert(a >= 1); + if (a == 1) { delete this; } } diff --git a/folly/futures/test/ExecutorTest.cpp b/folly/futures/test/ExecutorTest.cpp index 3e7e72e5..b11a8a6c 100644 --- a/folly/futures/test/ExecutorTest.cpp +++ b/folly/futures/test/ExecutorTest.cpp @@ -210,3 +210,30 @@ TEST(Executor, CrappyExecutor) { }); EXPECT_TRUE(flag); } + +class DoNothingExecutor : public Executor { + public: + void add(Func f) override { + storedFunc_ = std::move(f); + } + + private: + Func storedFunc_; +}; + +TEST(Executor, DoNothingExecutor) { + DoNothingExecutor x; + + // Submit future callback to DoNothingExecutor + auto f = folly::via(&x).then([] { return 42; }); + + // Callback function is stored in DoNothingExecutor, but not executed. + EXPECT_FALSE(f.isReady()); + + // Destroy the function stored in DoNothingExecutor. The future callback + // will never get executed. + x.add({}); + + EXPECT_TRUE(f.isReady()); + EXPECT_THROW(f.get(), folly::BrokenPromise); +} diff --git a/folly/futures/test/SelfDestructTest.cpp b/folly/futures/test/SelfDestructTest.cpp index 52842f44..68a75d09 100644 --- a/folly/futures/test/SelfDestructTest.cpp +++ b/folly/futures/test/SelfDestructTest.cpp @@ -17,6 +17,7 @@ #include #include +#include using namespace folly; @@ -27,12 +28,51 @@ TEST(SelfDestruct, then) { return x + 1; }); p->setValue(123); - EXPECT_EQ(future.get(), 124); + EXPECT_EQ(124, future.get()); } TEST(SelfDestruct, ensure) { auto* p = new Promise(); auto future = p->getFuture().ensure([p] { delete p; }); p->setValue(123); - EXPECT_EQ(future.get(), 123); + EXPECT_EQ(123, future.get()); +} + +class ThrowingExecutorError : public std::runtime_error { + public: + using std::runtime_error::runtime_error; +}; + +class ThrowingExecutor : public folly::Executor { + public: + void add(folly::Func) override { + throw ThrowingExecutorError("ThrowingExecutor::add"); + } +}; + +TEST(SelfDestruct, throwingExecutor) { + ThrowingExecutor executor; + auto* p = new Promise(); + auto future = + p->getFuture().via(&executor).onError([p](ThrowingExecutorError const&) { + delete p; + return 456; + }); + p->setValue(123); + EXPECT_EQ(456, future.get()); +} + +TEST(SelfDestruct, throwingInlineExecutor) { + folly::InlineExecutor executor; + + auto* p = new Promise(); + auto future = p->getFuture() + .via(&executor) + .then([p]() -> int { + delete p; + throw ThrowingExecutorError("callback throws"); + }) + .onError([](ThrowingExecutorError const&) { return 456; }); + p->setValue(123); + EXPECT_EQ(456, future.get()); }