Summary:
Instead of returning a deactivated future, have `via` just set the executor. Propagate the executor from `then`. This fixes the `via().get()` problem, and has semantics similar to before for `via().then().then()`.
However, the semantics are now slightly different - each `then` goes back through the executor. This adds some overhead and tweaks the semantics (e.g. if the executor is a threadpool it might execute subsequent `then`s in another thread). However, with `futures::chain` recently introduced, and any other convenience methods that you can dream up and make a case for, we can reasonably get the old once-through-the-executor behavior when performance or other concerns demand it. e.g. `via().then(futures::chain(a, b, c))`.
Test Plan: unit tests
Reviewed By: hannesr@fb.com
Subscribers: zeus-diffs@, mmandal, steveo, rituraj, trunkagent, exa, folly-diffs@, yfeldblum, jsedgwick, davejwatson
FB internal diff:
D1839691
Tasks:
6048744
Signature: t1:
1839691:
1424397180:
ca0b0ea7b3867769ab8abd254a510059df67011e
// grab the Future now before we lose our handle on the Promise
auto f = p->getFuture();
+ if (getExecutor()) {
+ f.setExecutor(getExecutor());
+ }
/* This is a bit tricky.
// grab the Future now before we lose our handle on the Promise
auto f = p->getFuture();
+ if (getExecutor()) {
+ f.setExecutor(getExecutor());
+ }
setCallback_(
[p, funcm](Try<T>&& t) mutable {
inline Future<T> Future<T>::via(Executor* executor) && {
throwIfInvalid();
- this->deactivate();
- core_->setExecutor(executor);
+ setExecutor(executor);
return std::move(*this);
}
/// Overload of waitVia() for rvalue Futures
Future<T>&& waitVia(DrivableExecutor* e) &&;
- private:
+ protected:
typedef detail::Core<T>* corePtr;
// shared core state object
void throwIfInvalid() const;
friend class Promise<T>;
+ template <class> friend class Future;
// Variant: returns a value
// e.g. f.then([](Try<T> t){ return t.value(); });
template <typename F, typename R, bool isTry, typename... Args>
typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
thenImplementation(F func, detail::argResult<isTry, F, Args...>);
+
+ Executor* getExecutor() { return core_->getExecutor(); }
+ void setExecutor(Executor* x) { core_->setExecutor(x); }
};
/**
/// Called by a destructing Future (in the Future thread, by definition)
void detachFuture() {
- activate();
+ activateNoDeprecatedWarning();
detachOne();
}
}
/// May call from any thread
- void deactivate() {
+ void deactivate() DEPRECATED {
active_ = false;
}
/// May call from any thread
- void activate() {
- active_ = true;
- maybeCallback();
+ void activate() DEPRECATED {
+ activateNoDeprecatedWarning();
}
/// May call from any thread
executor_ = x;
}
+ Executor* getExecutor() {
+ return executor_;
+ }
+
/// Call only from Future thread
void raise(exception_wrapper e) {
std::lock_guard<decltype(interruptLock_)> guard(interruptLock_);
}
}
- private:
+ protected:
+ void activateNoDeprecatedWarning() {
+ active_ = true;
+ maybeCallback();
+ }
+
void maybeCallback() {
FSM_START(fsm_)
case State::Armed:
{
// try rvalue as well
ManualExecutor x;
- auto f = via(&x).activate().then().waitVia(&x);
+ auto f = via(&x).then().waitVia(&x);
EXPECT_TRUE(f.isReady());
}
}
}
-TEST(Future, callbackAfterActivate) {
- Promise<void> p;
- auto f = p.getFuture();
- f.deactivate();
-
- size_t count = 0;
- f.then([&](Try<void>&&) { count++; });
-
- p.setValue();
- EXPECT_EQ(0, count);
-
- f.activate();
- EXPECT_EQ(1, count);
-}
-
-TEST(Future, activateOnDestruct) {
- auto f = std::make_shared<Future<void>>(makeFuture());
- f->deactivate();
-
- size_t count = 0;
- f->then([&](Try<void>&&) { count++; });
- EXPECT_EQ(0, count);
-
- f.reset();
- EXPECT_EQ(1, count);
-}
-
-TEST(Future, viaActsCold) {
- ManualExecutor x;
- size_t count = 0;
-
- auto fv = via(&x);
- fv.then([&](Try<void>&&) { count++; });
-
- EXPECT_EQ(0, count);
-
- fv.activate();
-
- EXPECT_EQ(1, x.run());
- EXPECT_EQ(1, count);
-}
-
-TEST(Future, viaIsCold) {
- ManualExecutor x;
- EXPECT_FALSE(via(&x).isActive());
-}
-
TEST(Future, viaRaces) {
ManualExecutor x;
Promise<void> p;
t2.join();
}
-// TODO(#4920689)
-TEST(Future, viaRaces_2stage) {
- ManualExecutor x;
- Promise<void> p;
- auto tid = std::this_thread::get_id();
- bool done = false;
-
- std::thread t1([&] {
- auto f2 = p.getFuture().via(&x);
- f2.then([&](Try<void>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
- .then([&](Try<void>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
- .then([&](Try<void>&&) { done = true; });
-
- // the bug was in the promise being fulfilled before f2 is reactivated. we
- // could sleep, but yielding should cause this to fail with reasonable
- // probability
- std::this_thread::yield();
- f2.activate();
- });
-
- std::thread t2([&] {
- p.setValue();
- });
-
- while (!done) x.run();
- t1.join();
- t2.join();
-}
-
TEST(Future, getFuture_after_setValue) {
Promise<int> p;
p.setValue(42);
done(false)
{
t = std::thread([=] {
- ManualWaiter eastWaiter(eastExecutor);
- while (!done)
- eastWaiter.drive();
+ ManualWaiter eastWaiter(eastExecutor);
+ while (!done)
+ eastWaiter.drive();
});
}
auto future = makeFuture(1)
.then([](Try<int>&& t) {
return makeFuture(t.value() == 1);
- })
- ;
+ });
EXPECT_TRUE(future.value());
}
EXPECT_EQ(f.value(), "start;static;class-static;class");
}
-TEST_F(ViaFixture, deactivateChain) {
- bool flag = false;
- auto f = makeFuture().deactivate();
- EXPECT_FALSE(f.isActive());
- auto f2 = f.then([&](Try<void>){ flag = true; });
- EXPECT_FALSE(flag);
-}
-
-TEST_F(ViaFixture, deactivateActivateChain) {
- bool flag = false;
- // you can do this all day long with temporaries.
- auto f1 = makeFuture().deactivate().activate().deactivate();
- // Chaining on activate/deactivate requires an rvalue, so you have to move
- // one of these two ways (if you're not using a temporary).
- auto f2 = std::move(f1).activate();
- f2.deactivate();
- auto f3 = std::move(f2.activate());
- f3.then([&](Try<void>){ flag = true; });
- EXPECT_TRUE(flag);
-}
-
TEST_F(ViaFixture, thread_hops) {
auto westThreadId = std::this_thread::get_id();
auto f = via(eastExecutor.get()).then([=](Try<void>&& t) {
TEST_F(ViaFixture, chain_vias) {
auto westThreadId = std::this_thread::get_id();
- auto f = via(eastExecutor.get()).then([=](Try<void>&& t) {
+ auto f = via(eastExecutor.get()).then([=]() {
EXPECT_NE(std::this_thread::get_id(), westThreadId);
- return makeFuture<int>(1);
- }).then([=](Try<int>&& t) {
- int val = t.value();
- return makeFuture(std::move(val)).via(westExecutor.get())
- .then([=](Try<int>&& t) mutable {
+ return 1;
+ }).then([=](int val) {
+ return makeFuture(val).via(westExecutor.get())
+ .then([=](int val) mutable {
EXPECT_EQ(std::this_thread::get_id(), westThreadId);
- return t.value();
+ return val + 1;
});
- }).then([=](Try<int>&& t) {
+ }).then([=](int val) {
+ // even though ultimately the future that triggers this one executed in
+ // the west thread, this then() inherited the executor from its
+ // predecessor, ie the eastExecutor.
+ EXPECT_NE(std::this_thread::get_id(), westThreadId);
+ return val + 1;
+ }).via(westExecutor.get()).then([=](int val) {
+ // go back to west, so we can wait on it
EXPECT_EQ(std::this_thread::get_id(), westThreadId);
- return t.value();
+ return val + 1;
});
- EXPECT_EQ(f.getVia(waiter.get()), 1);
+ EXPECT_EQ(f.getVia(waiter.get()), 4);
}
TEST_F(ViaFixture, bareViaAssignment) {