From 3d6fc64fce1e416b36a792f540d51306b08238e4 Mon Sep 17 00:00:00 2001 From: Hans Fugal Date: Mon, 27 Oct 2014 08:53:20 -0700 Subject: [PATCH] unrevert "(wangle) express current Core functionality with a state machine"" Summary: Reverts D1633874. Companion to D1636490 which fixes the bug. Test Plan: git reverting code that was git reverted and hasn't changed in the interim Won't be checked in without the companion bugfix diff (D1636490) Reviewed By: davejwatson@fb.com Subscribers: trunkagent, net-systems@, fugalh, exa, njormrod, folly-diffs@ FB internal diff: D1636487 Tasks: 5438209 Blame Revision: D1633874 --- folly/wangle/Future-inl.h | 5 ++ folly/wangle/Future.h | 19 ++++ folly/wangle/Promise-inl.h | 6 ++ folly/wangle/Promise.h | 7 ++ folly/wangle/WangleException.h | 5 ++ folly/wangle/detail/Core.h | 148 ++++++++++++++++++++----------- folly/wangle/test/Interrupts.cpp | 74 ++++++++++++++++ 7 files changed, 210 insertions(+), 54 deletions(-) create mode 100644 folly/wangle/test/Interrupts.cpp diff --git a/folly/wangle/Future-inl.h b/folly/wangle/Future-inl.h index e3119cf5..f462189c 100644 --- a/folly/wangle/Future-inl.h +++ b/folly/wangle/Future-inl.h @@ -206,6 +206,11 @@ bool Future::isReady() const { return core_->ready(); } +template +void Future::raise(std::exception_ptr exception) { + core_->raise(exception); +} + // makeFuture template diff --git a/folly/wangle/Future.h b/folly/wangle/Future.h index bcd4c694..f3d50ce0 100644 --- a/folly/wangle/Future.h +++ b/folly/wangle/Future.h @@ -214,6 +214,25 @@ class Future { return core_->isActive(); } + template + void raise(E&& exception) { + raise(std::make_exception_ptr(std::forward(exception))); + } + + /// Raise an interrupt. If the promise holder has an interrupt + /// handler it will be called and potentially stop asynchronous work from + /// being done. This is advisory only - a promise holder may not set an + /// interrupt handler, or may do anything including ignore. But, if you know + /// your future supports this the most likely result is stopping or + /// preventing the asynchronous operation (if in time), and the promise + /// holder setting an exception on the future. (That may happen + /// asynchronously, of course.) + void raise(std::exception_ptr interrupt); + + void cancel() { + raise(FutureCancellation()); + } + private: typedef detail::Core* corePtr; diff --git a/folly/wangle/Promise-inl.h b/folly/wangle/Promise-inl.h index efb8edfa..68d69641 100644 --- a/folly/wangle/Promise-inl.h +++ b/folly/wangle/Promise-inl.h @@ -88,6 +88,12 @@ void Promise::setException(std::exception_ptr const& e) { core_->setResult(Try(e)); } +template +void Promise::setInterruptHandler( + std::function fn) { + core_->setInterruptHandler(std::move(fn)); +} + template void Promise::fulfilTry(Try&& t) { throwIfFulfilled(); diff --git a/folly/wangle/Promise.h b/folly/wangle/Promise.h index 404bfb54..7442e451 100644 --- a/folly/wangle/Promise.h +++ b/folly/wangle/Promise.h @@ -56,6 +56,13 @@ public: */ template void setException(E const&); + /// Set an interrupt handler to handle interrupts. See the documentation for + /// Future::raise(). Your handler can do whatever it wants, but if you + /// bother to set one then you probably will want to fulfil the promise with + /// an exception (or special value) indicating how the interrupt was + /// handled. + void setInterruptHandler(std::function); + /** Fulfil this Promise (only for Promise) */ void setValue(); diff --git a/folly/wangle/WangleException.h b/folly/wangle/WangleException.h index aec22970..bddf86c7 100644 --- a/folly/wangle/WangleException.h +++ b/folly/wangle/WangleException.h @@ -81,4 +81,9 @@ class UsingUninitializedTry : public WangleException { WangleException("Using unitialized try") { } }; +class FutureCancellation : public WangleException { + public: + FutureCancellation() : WangleException("Future was cancelled") {} +}; + }} diff --git a/folly/wangle/detail/Core.h b/folly/wangle/detail/Core.h index 1cfdcc13..9b9295d8 100644 --- a/folly/wangle/detail/Core.h +++ b/folly/wangle/detail/Core.h @@ -28,6 +28,7 @@ #include #include #include +#include namespace folly { namespace wangle { namespace detail { @@ -36,14 +37,21 @@ namespace folly { namespace wangle { namespace detail { template void empty_callback(Try&&) { } +enum class State { + Waiting, + Interruptible, + Interrupted, + Done, +}; + /** The shared state object for Future and Promise. */ template -class Core { +class Core : protected FSM { public: // This must be heap-constructed. There's probably a way to enforce that in // code but since this is just internal detail code and I don't know how // off-hand, I'm punting. - Core() = default; + Core() : FSM(State::Waiting) {} ~Core() { assert(calledBack_); assert(detached_ == 2); @@ -67,36 +75,46 @@ class Core { template void setCallback(F func) { - { - std::lock_guard lock(mutex_); - + auto setCallback_ = [&]{ if (callback_) { throw std::logic_error("setCallback called twice"); } callback_ = std::move(func); - } - - maybeCallback(); + }; + + FSM_START + case State::Waiting: + case State::Interruptible: + case State::Interrupted: + FSM_UPDATE(state, setCallback_); + break; + + case State::Done: + FSM_UPDATE2(State::Done, + setCallback_, + [&]{ maybeCallback(); }); + break; + FSM_END } void setResult(Try&& t) { - { - std::lock_guard lock(mutex_); - - if (ready()) { + FSM_START + case State::Waiting: + case State::Interruptible: + case State::Interrupted: + FSM_UPDATE2(State::Done, + [&]{ result_ = std::move(t); }, + [&]{ maybeCallback(); }); + break; + + case State::Done: throw std::logic_error("setResult called twice"); - } - - result_ = std::move(t); - assert(ready()); - } - - maybeCallback(); + FSM_END } bool ready() const { - return result_.hasValue(); + return getState() == State::Done; } // Called by a destructing Future @@ -117,54 +135,79 @@ class Core { } void deactivate() { - std::lock_guard lock(mutex_); active_ = false; } void activate() { - { - std::lock_guard lock(mutex_); - active_ = true; + active_ = true; + if (ready()) { + maybeCallback(); } - maybeCallback(); } bool isActive() { return active_; } void setExecutor(Executor* x) { - std::lock_guard lock(mutex_); executor_ = x; } + void raise(std::exception_ptr const& e) { + FSM_START + case State::Interruptible: + FSM_UPDATE2(State::Interrupted, + [&]{ interrupt_ = e; }, + [&]{ interruptHandler_(interrupt_); }); + break; + + case State::Waiting: + case State::Interrupted: + FSM_UPDATE(State::Interrupted, + [&]{ interrupt_ = e; }); + break; + + case State::Done: + FSM_BREAK + FSM_END + } + + void setInterruptHandler(std::function fn) { + FSM_START + case State::Waiting: + case State::Interruptible: + FSM_UPDATE(State::Interruptible, + [&]{ interruptHandler_ = std::move(fn); }); + break; + + case State::Interrupted: + fn(interrupt_); + FSM_BREAK + + case State::Done: + FSM_BREAK + FSM_END + } + private: void maybeCallback() { - std::unique_lock lock(mutex_); - if (!calledBack_ && - result_ && callback_ && isActive()) { - // TODO(5306911) we should probably try/catch here - if (executor_) { - MoveWrapper>> val(std::move(result_)); + assert(ready()); + if (!calledBack_ && isActive() && callback_) { + // TODO(5306911) we should probably try/catch + calledBack_ = true; + Executor* x = executor_; + if (x) { MoveWrapper&&)>> cb(std::move(callback_)); - executor_->add([cb, val]() mutable { (*cb)(std::move(**val)); }); - calledBack_ = true; + MoveWrapper>> val(std::move(result_)); + x->add([cb, val]() mutable { (*cb)(std::move(**val)); }); } else { - calledBack_ = true; - lock.unlock(); callback_(std::move(*result_)); } } } void detachOne() { - bool shouldDelete; - { - std::lock_guard lock(mutex_); - detached_++; - assert(detached_ == 1 || detached_ == 2); - shouldDelete = (detached_ == 2); - } - - if (shouldDelete) { + ++detached_; + assert(detached_ == 1 || detached_ == 2); + if (detached_ == 2) { // we should have already executed the callback with the value assert(calledBack_); delete this; @@ -173,15 +216,12 @@ class Core { folly::Optional> result_; std::function&&)> callback_; - bool calledBack_ = false; - unsigned char detached_ = 0; - bool active_ = true; - Executor* executor_ = nullptr; - - // this lock isn't meant to protect all accesses to members, only the ones - // that need to be threadsafe: the act of setting result_ and callback_, and - // seeing if they are set and whether we should then continue. - folly::MicroSpinLock mutex_ {0}; + std::atomic calledBack_ {false}; + std::atomic detached_ {0}; + std::atomic active_ {true}; + std::atomic executor_ {nullptr}; + std::exception_ptr interrupt_; + std::function interruptHandler_; }; template diff --git a/folly/wangle/test/Interrupts.cpp b/folly/wangle/test/Interrupts.cpp new file mode 100644 index 00000000..41d5bf7d --- /dev/null +++ b/folly/wangle/test/Interrupts.cpp @@ -0,0 +1,74 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include + +using namespace folly::wangle; + +TEST(Interrupts, raise) { + std::runtime_error eggs("eggs"); + Promise p; + p.setInterruptHandler([&](std::exception_ptr e) { + EXPECT_THROW(std::rethrow_exception(e), decltype(eggs)); + }); + p.getFuture().raise(eggs); +} + +TEST(Interrupts, cancel) { + Promise p; + p.setInterruptHandler([&](std::exception_ptr e) { + EXPECT_THROW(std::rethrow_exception(e), FutureCancellation); + }); + p.getFuture().cancel(); +} + +TEST(Interrupts, handleThenInterrupt) { + Promise p; + bool flag = false; + p.setInterruptHandler([&](std::exception_ptr e) { flag = true; }); + p.getFuture().cancel(); + EXPECT_TRUE(flag); +} + +TEST(Interrupts, interruptThenHandle) { + Promise p; + bool flag = false; + p.getFuture().cancel(); + p.setInterruptHandler([&](std::exception_ptr e) { flag = true; }); + EXPECT_TRUE(flag); +} + +TEST(Interrupts, interruptAfterFulfilNoop) { + Promise p; + bool flag = false; + p.setInterruptHandler([&](std::exception_ptr e) { flag = true; }); + p.setValue(); + p.getFuture().cancel(); + EXPECT_FALSE(flag); +} + +TEST(Interrupts, secondInterruptNoop) { + Promise p; + int count = 0; + p.setInterruptHandler([&](std::exception_ptr e) { count++; }); + auto f = p.getFuture(); + f.cancel(); + f.cancel(); + EXPECT_EQ(1, count); +} -- 2.34.1