Summary:
Modeled very closely after Finagle's interrupts. Compare with https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Promise.scala if you like.
The basic idea is the promise holder can register an interrupt handler, and then interrupts will call that handler. A typical handler would fulfil the promise with an exception (or special value) indicating that it was interrupted (if it was interrupted in time).
Raising an interrupt does not prevent setting a value or callbacks executing or any of that - it is only advisory to the promise holder.
Test Plan: I wrote some unit tests.
Reviewed By: davejwatson@fb.com
Subscribers: folly-diffs@, net-systems@, fugalh, exa, hannesr, njormrod
FB internal diff:
D1620805
Tasks:
4618297
return core_->ready();
}
+template <class T>
+void Future<T>::raise(std::exception_ptr exception) {
+ core_->raise(exception);
+}
+
// makeFuture
template <class T>
return core_->isActive();
}
+ template <class E>
+ void raise(E&& exception) {
+ raise(std::make_exception_ptr(std::forward<E>(exception)));
+ }
+
+ /// Raise an interrupt. If the promise holder has an interrupt
+ /// handler it will be called and potentially stop asynchronous work from
+ /// being done. This is advisory only - a promise holder may not set an
+ /// interrupt handler, or may do anything including ignore. But, if you know
+ /// your future supports this the most likely result is stopping or
+ /// preventing the asynchronous operation (if in time), and the promise
+ /// holder setting an exception on the future. (That may happen
+ /// asynchronously, of course.)
+ void raise(std::exception_ptr interrupt);
+
+ void cancel() {
+ raise(FutureCancellation());
+ }
+
private:
typedef detail::Core<T>* corePtr;
core_->setResult(Try<T>(e));
}
+template <class T>
+void Promise<T>::setInterruptHandler(
+ std::function<void(std::exception_ptr const&)> fn) {
+ core_->setInterruptHandler(std::move(fn));
+}
+
template <class T>
void Promise<T>::fulfilTry(Try<T>&& t) {
throwIfFulfilled();
*/
template <class E> void setException(E const&);
+ /// Set an interrupt handler to handle interrupts. See the documentation for
+ /// Future::raise(). Your handler can do whatever it wants, but if you
+ /// bother to set one then you probably will want to fulfil the promise with
+ /// an exception (or special value) indicating how the interrupt was
+ /// handled.
+ void setInterruptHandler(std::function<void(std::exception_ptr const&)>);
+
/** Fulfil this Promise (only for Promise<void>) */
void setValue();
WangleException("Using unitialized try") { }
};
+class FutureCancellation : public WangleException {
+ public:
+ FutureCancellation() : WangleException("Future was cancelled") {}
+};
+
}}
enum class State {
Waiting,
+ Interruptible,
+ Interrupted,
Done,
};
callback_ = std::move(func);
};
- bool done = false;
- while (!done) {
- switch (getState()) {
+ FSM_START
case State::Waiting:
- done = updateState(State::Waiting, State::Waiting, setCallback_);
+ case State::Interruptible:
+ case State::Interrupted:
+ FSM_UPDATE(state, setCallback_);
break;
case State::Done:
- done = updateState(State::Done, State::Done,
- setCallback_,
- [&]{ maybeCallback(); });
+ FSM_UPDATE2(State::Done,
+ setCallback_,
+ [&]{ maybeCallback(); });
break;
- }
- }
+ FSM_END
}
void setResult(Try<T>&& t) {
- bool done = false;
- while (!done) {
- switch (getState()) {
+ FSM_START
case State::Waiting:
- done = updateState(State::Waiting, State::Done,
+ case State::Interruptible:
+ case State::Interrupted:
+ FSM_UPDATE2(State::Done,
[&]{ result_ = std::move(t); },
[&]{ maybeCallback(); });
break;
case State::Done:
throw std::logic_error("setResult called twice");
- }
- }
+ FSM_END
}
bool ready() const {
executor_ = x;
}
+ void raise(std::exception_ptr const& e) {
+ FSM_START
+ case State::Interruptible:
+ FSM_UPDATE2(State::Interrupted,
+ [&]{ interrupt_ = e; },
+ [&]{ interruptHandler_(interrupt_); });
+ break;
+
+ case State::Waiting:
+ case State::Interrupted:
+ FSM_UPDATE(State::Interrupted,
+ [&]{ interrupt_ = e; });
+ break;
+
+ case State::Done:
+ FSM_BREAK
+ FSM_END
+ }
+
+ void setInterruptHandler(std::function<void(std::exception_ptr const&)> fn) {
+ FSM_START
+ case State::Waiting:
+ case State::Interruptible:
+ FSM_UPDATE(State::Interruptible,
+ [&]{ interruptHandler_ = std::move(fn); });
+ break;
+
+ case State::Interrupted:
+ fn(interrupt_);
+ FSM_BREAK
+
+ case State::Done:
+ FSM_BREAK
+ FSM_END
+ }
+
private:
void maybeCallback() {
assert(ready());
std::atomic<unsigned char> detached_ {0};
std::atomic<bool> active_ {true};
std::atomic<Executor*> executor_ {nullptr};
+ std::exception_ptr interrupt_;
+ std::function<void(std::exception_ptr const&)> interruptHandler_;
};
template <typename... Ts>
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include <folly/wangle/Future.h>
+#include <folly/wangle/Promise.h>
+
+using namespace folly::wangle;
+
+TEST(Interrupts, raise) {
+ std::runtime_error eggs("eggs");
+ Promise<void> p;
+ p.setInterruptHandler([&](std::exception_ptr e) {
+ EXPECT_THROW(std::rethrow_exception(e), decltype(eggs));
+ });
+ p.getFuture().raise(eggs);
+}
+
+TEST(Interrupts, cancel) {
+ Promise<void> p;
+ p.setInterruptHandler([&](std::exception_ptr e) {
+ EXPECT_THROW(std::rethrow_exception(e), FutureCancellation);
+ });
+ p.getFuture().cancel();
+}
+
+TEST(Interrupts, handleThenInterrupt) {
+ Promise<int> p;
+ bool flag = false;
+ p.setInterruptHandler([&](std::exception_ptr e) { flag = true; });
+ p.getFuture().cancel();
+ EXPECT_TRUE(flag);
+}
+
+TEST(Interrupts, interruptThenHandle) {
+ Promise<int> p;
+ bool flag = false;
+ p.getFuture().cancel();
+ p.setInterruptHandler([&](std::exception_ptr e) { flag = true; });
+ EXPECT_TRUE(flag);
+}
+
+TEST(Interrupts, interruptAfterFulfilNoop) {
+ Promise<void> p;
+ bool flag = false;
+ p.setInterruptHandler([&](std::exception_ptr e) { flag = true; });
+ p.setValue();
+ p.getFuture().cancel();
+ EXPECT_FALSE(flag);
+}
+
+TEST(Interrupts, secondInterruptNoop) {
+ Promise<void> p;
+ int count = 0;
+ p.setInterruptHandler([&](std::exception_ptr e) { count++; });
+ auto f = p.getFuture();
+ f.cancel();
+ f.cancel();
+ EXPECT_EQ(1, count);
+}