Summary:
Could the problem be that via continues the existing chain of futures,
whereas we actually want to start a new chain?
Is there any particular reason this wasn't implemented like this originally?
Test Plan:
Ran all the unit tests. I hope to try to reproduce the thread issue and
see if this improves things.
Reviewed By: davejwatson@fb.com
Subscribers: trunkagent, net-systems@, exa, njormrod, fugalh
FB internal diff:
D1500225
Tasks:
4920689
template <typename Executor>
inline Future<T> Future<T>::via(Executor* executor) {
throwIfInvalid();
- auto f = then([=](Try<T>&& t) {
- MoveWrapper<Promise<T>> promise;
+ MoveWrapper<Promise<T>> promise;
+
+ auto f = promise->getFuture();
+ // We are obligated to return a cold future.
+ f.deactivate();
+ // But we also need to make this one cold for via to at least work some of
+ // the time. (see below)
+ deactivate();
+
+ then([=](Try<T>&& t) mutable {
MoveWrapper<Try<T>> tw(std::move(t));
- auto f2 = promise->getFuture();
+ // There is a race here.
+ // When the promise is fulfilled, and the future is still inactive, when
+ // the future is activated (e.g. by destruction) the callback will happen
+ // in that context, not in the intended context (which has already left
+ // the building).
+ //
+ // Currently, this will work fine because all the temporaries are
+ // destructed in an order that is compatible with this implementation:
+ //
+ // makeFuture().via(x).then(a).then(b);
+ //
+ // However, this will not work reliably:
+ //
+ // auto f2 = makeFuture().via(x);
+ // f2.then(a).then(b);
+ //
+ // Because the first temporary is destructed on the first line, and the
+ // executor is fed. But by the time f2 is destructed, the executor
+ // may have already fulfilled the promise on the other thread.
+ //
+ // TODO(#4920689) fix it
executor->add([=]() mutable { promise->fulfilTry(std::move(*tw)); });
- return f2;
});
- f.deactivate();
+
return f;
}
void deactivate() {
state_->deactivate();
}
+ bool isActive() {
+ return state_->isActive();
+ }
private:
typedef detail::State<T>* statePtr;
maybeCallback();
}
+ bool isActive() { return active_; }
+
private:
void maybeCallback() {
std::lock_guard<decltype(mutex_)> lock(mutex_);
if (!calledBack_ &&
- value_ && callback_ && active_) {
+ value_ && callback_ && isActive()) {
// TODO we should probably try/catch here
callback_(std::move(*value_));
calledBack_ = true;
EXPECT_EQ(1, count);
}
-TEST(Future, viaIsCold) {
+TEST(Future, viaActsCold) {
ManualExecutor x;
size_t count = 0;
EXPECT_EQ(1, count);
}
+TEST(Future, viaIsCold) {
+ ManualExecutor x;
+ EXPECT_FALSE(makeFuture().via(&x).isActive());
+}
+
+TEST(Future, viaRaces) {
+ ManualExecutor x;
+ Promise<void> p;
+ auto tid = std::this_thread::get_id();
+ bool done = false;
+
+ std::thread t1([&] {
+ p.getFuture()
+ .via(&x)
+ .then([&](Try<void>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
+ .then([&](Try<void>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
+ .then([&](Try<void>&&) { done = true; });
+ });
+
+ std::thread t2([&] {
+ p.setValue();
+ });
+
+ while (!done) x.run();
+ t1.join();
+ t2.join();
+}
+
+// TODO(#4920689)
+TEST(Future, DISABLED_viaRaces_2stage) {
+ ManualExecutor x;
+ Promise<void> p;
+ auto tid = std::this_thread::get_id();
+ bool done = false;
+
+ std::thread t1([&] {
+ auto f2 = p.getFuture().via(&x);
+ f2.then([&](Try<void>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
+ .then([&](Try<void>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
+ .then([&](Try<void>&&) { done = true; });
+
+ // the bug was in the promise being fulfilled before f2 is reactivated. we
+ // could sleep, but yielding should cause this to fail with reasonable
+ // probability
+ std::this_thread::yield();
+ f2.activate();
+ });
+
+ std::thread t2([&] {
+ p.setValue();
+ });
+
+ while (!done) x.run();
+ t1.join();
+ t2.join();
+}
+
TEST(Future, getFuture_after_setValue) {
Promise<int> p;
p.setValue(42);