From: Hans Fugal Date: Tue, 21 Oct 2014 17:24:10 +0000 (-0700) Subject: (wangle) Interrupts (and therefore, cancellation) X-Git-Tag: v0.22.0~253 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=831405dc6952742e29fb030d870608fab8830f1a;p=folly.git (wangle) Interrupts (and therefore, cancellation) Summary: Modeled very closely after Finagle's interrupts. Compare with https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Promise.scala if you like. The basic idea is the promise holder can register an interrupt handler, and then interrupts will call that handler. A typical handler would fulfil the promise with an exception (or special value) indicating that it was interrupted (if it was interrupted in time). Raising an interrupt does not prevent setting a value or callbacks executing or any of that - it is only advisory to the promise holder. Test Plan: I wrote some unit tests. Reviewed By: davejwatson@fb.com Subscribers: folly-diffs@, net-systems@, fugalh, exa, hannesr, njormrod FB internal diff: D1620805 Tasks: 4618297 --- diff --git a/folly/wangle/Future-inl.h b/folly/wangle/Future-inl.h index 9db9ff5f..5615c7d5 100644 --- a/folly/wangle/Future-inl.h +++ b/folly/wangle/Future-inl.h @@ -206,6 +206,11 @@ bool Future::isReady() const { return core_->ready(); } +template +void Future::raise(std::exception_ptr exception) { + core_->raise(exception); +} + // makeFuture template diff --git a/folly/wangle/Future.h b/folly/wangle/Future.h index 3f46034b..4f0348ce 100644 --- a/folly/wangle/Future.h +++ b/folly/wangle/Future.h @@ -208,6 +208,25 @@ class Future { return core_->isActive(); } + template + void raise(E&& exception) { + raise(std::make_exception_ptr(std::forward(exception))); + } + + /// Raise an interrupt. If the promise holder has an interrupt + /// handler it will be called and potentially stop asynchronous work from + /// being done. This is advisory only - a promise holder may not set an + /// interrupt handler, or may do anything including ignore. But, if you know + /// your future supports this the most likely result is stopping or + /// preventing the asynchronous operation (if in time), and the promise + /// holder setting an exception on the future. (That may happen + /// asynchronously, of course.) + void raise(std::exception_ptr interrupt); + + void cancel() { + raise(FutureCancellation()); + } + private: typedef detail::Core* corePtr; diff --git a/folly/wangle/Promise-inl.h b/folly/wangle/Promise-inl.h index efb8edfa..68d69641 100644 --- a/folly/wangle/Promise-inl.h +++ b/folly/wangle/Promise-inl.h @@ -88,6 +88,12 @@ void Promise::setException(std::exception_ptr const& e) { core_->setResult(Try(e)); } +template +void Promise::setInterruptHandler( + std::function fn) { + core_->setInterruptHandler(std::move(fn)); +} + template void Promise::fulfilTry(Try&& t) { throwIfFulfilled(); diff --git a/folly/wangle/Promise.h b/folly/wangle/Promise.h index a4eb94b1..61bc0699 100644 --- a/folly/wangle/Promise.h +++ b/folly/wangle/Promise.h @@ -57,6 +57,13 @@ public: */ template void setException(E const&); + /// Set an interrupt handler to handle interrupts. See the documentation for + /// Future::raise(). Your handler can do whatever it wants, but if you + /// bother to set one then you probably will want to fulfil the promise with + /// an exception (or special value) indicating how the interrupt was + /// handled. + void setInterruptHandler(std::function); + /** Fulfil this Promise (only for Promise) */ void setValue(); diff --git a/folly/wangle/WangleException.h b/folly/wangle/WangleException.h index aec22970..bddf86c7 100644 --- a/folly/wangle/WangleException.h +++ b/folly/wangle/WangleException.h @@ -81,4 +81,9 @@ class UsingUninitializedTry : public WangleException { WangleException("Using unitialized try") { } }; +class FutureCancellation : public WangleException { + public: + FutureCancellation() : WangleException("Future was cancelled") {} +}; + }} diff --git a/folly/wangle/detail/Core.h b/folly/wangle/detail/Core.h index ee60e65c..d45b985d 100644 --- a/folly/wangle/detail/Core.h +++ b/folly/wangle/detail/Core.h @@ -39,6 +39,8 @@ void empty_callback(Try&&) { } enum class State { Waiting, + Interruptible, + Interrupted, Done, }; @@ -81,36 +83,34 @@ class Core : protected FSM { callback_ = std::move(func); }; - bool done = false; - while (!done) { - switch (getState()) { + FSM_START case State::Waiting: - done = updateState(State::Waiting, State::Waiting, setCallback_); + case State::Interruptible: + case State::Interrupted: + FSM_UPDATE(state, setCallback_); break; case State::Done: - done = updateState(State::Done, State::Done, - setCallback_, - [&]{ maybeCallback(); }); + FSM_UPDATE2(State::Done, + setCallback_, + [&]{ maybeCallback(); }); break; - } - } + FSM_END } void setResult(Try&& t) { - bool done = false; - while (!done) { - switch (getState()) { + FSM_START case State::Waiting: - done = updateState(State::Waiting, State::Done, + case State::Interruptible: + case State::Interrupted: + FSM_UPDATE2(State::Done, [&]{ result_ = std::move(t); }, [&]{ maybeCallback(); }); break; case State::Done: throw std::logic_error("setResult called twice"); - } - } + FSM_END } bool ready() const { @@ -151,6 +151,42 @@ class Core : protected FSM { executor_ = x; } + void raise(std::exception_ptr const& e) { + FSM_START + case State::Interruptible: + FSM_UPDATE2(State::Interrupted, + [&]{ interrupt_ = e; }, + [&]{ interruptHandler_(interrupt_); }); + break; + + case State::Waiting: + case State::Interrupted: + FSM_UPDATE(State::Interrupted, + [&]{ interrupt_ = e; }); + break; + + case State::Done: + FSM_BREAK + FSM_END + } + + void setInterruptHandler(std::function fn) { + FSM_START + case State::Waiting: + case State::Interruptible: + FSM_UPDATE(State::Interruptible, + [&]{ interruptHandler_ = std::move(fn); }); + break; + + case State::Interrupted: + fn(interrupt_); + FSM_BREAK + + case State::Done: + FSM_BREAK + FSM_END + } + private: void maybeCallback() { assert(ready()); @@ -183,6 +219,8 @@ class Core : protected FSM { std::atomic detached_ {0}; std::atomic active_ {true}; std::atomic executor_ {nullptr}; + std::exception_ptr interrupt_; + std::function interruptHandler_; }; template diff --git a/folly/wangle/test/Interrupts.cpp b/folly/wangle/test/Interrupts.cpp new file mode 100644 index 00000000..41d5bf7d --- /dev/null +++ b/folly/wangle/test/Interrupts.cpp @@ -0,0 +1,74 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include + +using namespace folly::wangle; + +TEST(Interrupts, raise) { + std::runtime_error eggs("eggs"); + Promise p; + p.setInterruptHandler([&](std::exception_ptr e) { + EXPECT_THROW(std::rethrow_exception(e), decltype(eggs)); + }); + p.getFuture().raise(eggs); +} + +TEST(Interrupts, cancel) { + Promise p; + p.setInterruptHandler([&](std::exception_ptr e) { + EXPECT_THROW(std::rethrow_exception(e), FutureCancellation); + }); + p.getFuture().cancel(); +} + +TEST(Interrupts, handleThenInterrupt) { + Promise p; + bool flag = false; + p.setInterruptHandler([&](std::exception_ptr e) { flag = true; }); + p.getFuture().cancel(); + EXPECT_TRUE(flag); +} + +TEST(Interrupts, interruptThenHandle) { + Promise p; + bool flag = false; + p.getFuture().cancel(); + p.setInterruptHandler([&](std::exception_ptr e) { flag = true; }); + EXPECT_TRUE(flag); +} + +TEST(Interrupts, interruptAfterFulfilNoop) { + Promise p; + bool flag = false; + p.setInterruptHandler([&](std::exception_ptr e) { flag = true; }); + p.setValue(); + p.getFuture().cancel(); + EXPECT_FALSE(flag); +} + +TEST(Interrupts, secondInterruptNoop) { + Promise p; + int count = 0; + p.setInterruptHandler([&](std::exception_ptr e) { count++; }); + auto f = p.getFuture(); + f.cancel(); + f.cancel(); + EXPECT_EQ(1, count); +}