From: Hans Fugal Date: Tue, 30 Sep 2014 22:52:52 +0000 (-0700) Subject: (Wangle) Make via behave more like gate X-Git-Tag: v0.22.0~310 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=684bae3b8250b2a7e4f4ce920d3b12476d581526;p=folly.git (Wangle) Make via behave more like gate Summary: Could the problem be that via continues the existing chain of futures, whereas we actually want to start a new chain? Is there any particular reason this wasn't implemented like this originally? Test Plan: Ran all the unit tests. I hope to try to reproduce the thread issue and see if this improves things. Reviewed By: davejwatson@fb.com Subscribers: trunkagent, net-systems@, exa, njormrod, fugalh FB internal diff: D1500225 Tasks: 4920689 --- diff --git a/folly/wangle/Future-inl.h b/folly/wangle/Future-inl.h index 9a0fe997..f404067b 100644 --- a/folly/wangle/Future-inl.h +++ b/folly/wangle/Future-inl.h @@ -193,14 +193,41 @@ template template inline Future Future::via(Executor* executor) { throwIfInvalid(); - auto f = then([=](Try&& t) { - MoveWrapper> promise; + MoveWrapper> promise; + + auto f = promise->getFuture(); + // We are obligated to return a cold future. + f.deactivate(); + // But we also need to make this one cold for via to at least work some of + // the time. (see below) + deactivate(); + + then([=](Try&& t) mutable { MoveWrapper> tw(std::move(t)); - auto f2 = promise->getFuture(); + // There is a race here. + // When the promise is fulfilled, and the future is still inactive, when + // the future is activated (e.g. by destruction) the callback will happen + // in that context, not in the intended context (which has already left + // the building). + // + // Currently, this will work fine because all the temporaries are + // destructed in an order that is compatible with this implementation: + // + // makeFuture().via(x).then(a).then(b); + // + // However, this will not work reliably: + // + // auto f2 = makeFuture().via(x); + // f2.then(a).then(b); + // + // Because the first temporary is destructed on the first line, and the + // executor is fed. But by the time f2 is destructed, the executor + // may have already fulfilled the promise on the other thread. + // + // TODO(#4920689) fix it executor->add([=]() mutable { promise->fulfilTry(std::move(*tw)); }); - return f2; }); - f.deactivate(); + return f; } diff --git a/folly/wangle/Future.h b/folly/wangle/Future.h index e9ee2fb2..df30efc5 100644 --- a/folly/wangle/Future.h +++ b/folly/wangle/Future.h @@ -204,6 +204,9 @@ class Future { void deactivate() { state_->deactivate(); } + bool isActive() { + return state_->isActive(); + } private: typedef detail::State* statePtr; diff --git a/folly/wangle/detail/State.h b/folly/wangle/detail/State.h index 491c38fa..0f7f4bf1 100644 --- a/folly/wangle/detail/State.h +++ b/folly/wangle/detail/State.h @@ -139,11 +139,13 @@ class State { maybeCallback(); } + bool isActive() { return active_; } + private: void maybeCallback() { std::lock_guard lock(mutex_); if (!calledBack_ && - value_ && callback_ && active_) { + value_ && callback_ && isActive()) { // TODO we should probably try/catch here callback_(std::move(*value_)); calledBack_ = true; diff --git a/folly/wangle/test/FutureTest.cpp b/folly/wangle/test/FutureTest.cpp index 1f0fad3c..0f045af3 100644 --- a/folly/wangle/test/FutureTest.cpp +++ b/folly/wangle/test/FutureTest.cpp @@ -764,7 +764,7 @@ TEST(Future, activateOnDestruct) { EXPECT_EQ(1, count); } -TEST(Future, viaIsCold) { +TEST(Future, viaActsCold) { ManualExecutor x; size_t count = 0; @@ -779,6 +779,63 @@ TEST(Future, viaIsCold) { EXPECT_EQ(1, count); } +TEST(Future, viaIsCold) { + ManualExecutor x; + EXPECT_FALSE(makeFuture().via(&x).isActive()); +} + +TEST(Future, viaRaces) { + ManualExecutor x; + Promise p; + auto tid = std::this_thread::get_id(); + bool done = false; + + std::thread t1([&] { + p.getFuture() + .via(&x) + .then([&](Try&&) { EXPECT_EQ(tid, std::this_thread::get_id()); }) + .then([&](Try&&) { EXPECT_EQ(tid, std::this_thread::get_id()); }) + .then([&](Try&&) { done = true; }); + }); + + std::thread t2([&] { + p.setValue(); + }); + + while (!done) x.run(); + t1.join(); + t2.join(); +} + +// TODO(#4920689) +TEST(Future, DISABLED_viaRaces_2stage) { + ManualExecutor x; + Promise p; + auto tid = std::this_thread::get_id(); + bool done = false; + + std::thread t1([&] { + auto f2 = p.getFuture().via(&x); + f2.then([&](Try&&) { EXPECT_EQ(tid, std::this_thread::get_id()); }) + .then([&](Try&&) { EXPECT_EQ(tid, std::this_thread::get_id()); }) + .then([&](Try&&) { done = true; }); + + // the bug was in the promise being fulfilled before f2 is reactivated. we + // could sleep, but yielding should cause this to fail with reasonable + // probability + std::this_thread::yield(); + f2.activate(); + }); + + std::thread t2([&] { + p.setValue(); + }); + + while (!done) x.run(); + t1.join(); + t2.join(); +} + TEST(Future, getFuture_after_setValue) { Promise p; p.setValue(42);