From: James Sedgwick Date: Wed, 28 Jan 2015 16:05:30 +0000 (-0800) Subject: Another stab at waitWithSemaphore -> Future::wait() X-Git-Tag: v0.23.0~17 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=c4a221ce754b643ce89ce853f1bb3edbc603a735;p=folly.git Another stab at waitWithSemaphore -> Future::wait() Summary: See D1785572. Check out the new Future test and the commented portion of wait(Duration) for the fix The test only fails a few times out of a hundred before the fix, but hasn't failed yet after Test Plan: futures unit, wait for contbuild Reviewed By: hans@fb.com Subscribers: trunkagent, rushix, fbcode-common-diffs@, hero-diffs@, cold-storage-diffs@, adamsyta, zhuohuang, darshan, micha, folly-diffs@, lins, tingy, hannesr, jsedgwick FB internal diff: D1803526 Tasks: 5940008, 6059995 Signature: t1:1803526:1422309486:3613c59a708ecac312d241723828763feb2a57aa --- diff --git a/folly/futures/Future-inl.h b/folly/futures/Future-inl.h index e95b4a1f..dd0f196d 100644 --- a/folly/futures/Future-inl.h +++ b/folly/futures/Future-inl.h @@ -510,65 +510,6 @@ whenN(InputIterator first, InputIterator last, size_t n) { return ctx->p.getFuture(); } -template -Future -waitWithSemaphore(Future&& f) { - Baton<> baton; - auto done = f.then([&](Try &&t) { - baton.post(); - return std::move(t.value()); - }); - baton.wait(); - while (!done.isReady()) { - // There's a race here between the return here and the actual finishing of - // the future. f is completed, but the setup may not have finished on done - // after the baton has posted. - std::this_thread::yield(); - } - return done; -} - -template<> -inline Future waitWithSemaphore(Future&& f) { - Baton<> baton; - auto done = f.then([&](Try &&t) { - baton.post(); - t.value(); - }); - baton.wait(); - while (!done.isReady()) { - // There's a race here between the return here and the actual finishing of - // the future. f is completed, but the setup may not have finished on done - // after the baton has posted. - std::this_thread::yield(); - } - return done; -} - -template -Future -waitWithSemaphore(Future&& f, Dur timeout) { - auto baton = std::make_shared>(); - auto done = f.then([baton](Try &&t) { - baton->post(); - return std::move(t.value()); - }); - baton->timed_wait(std::chrono::system_clock::now() + timeout); - return done; -} - -template -Future -waitWithSemaphore(Future&& f, Dur timeout) { - auto baton = std::make_shared>(); - auto done = f.then([baton](Try &&t) { - baton->post(); - t.value(); - }); - baton->timed_wait(std::chrono::system_clock::now() + timeout); - return done; -} - namespace { template void getWaitHelper(Future* f) { @@ -741,7 +682,14 @@ Future Future::wait(Duration dur) { baton->post(); return makeFuture(std::move(t)); }); - baton->timed_wait(std::chrono::system_clock::now() + dur); + // Let's preserve the invariant that if we did not timeout (timed_wait returns + // true), then the returned Future is complete when it is returned to the + // caller. We need to wait out the race for that Future to complete. + if (baton->timed_wait(std::chrono::system_clock::now() + dur)) { + while (!done.isReady()) { + std::this_thread::yield(); + } + } return done; } diff --git a/folly/futures/Future.h b/folly/futures/Future.h index 0f406d76..fe570fad 100644 --- a/folly/futures/Future.h +++ b/folly/futures/Future.h @@ -545,24 +545,6 @@ Future::value_type::value_type>>>> whenN(InputIterator first, InputIterator last, size_t n); -/** Wait for the given future to complete on a semaphore. Returns a completed - * future containing the result. - * - * NB if the promise for the future would be fulfilled in the same thread that - * you call this, it will deadlock. - */ -template -Future waitWithSemaphore(Future&& f); - -/** Wait for up to `timeout` for the given future to complete. Returns a future - * which may or may not be completed depending whether the given future - * completed in time - * - * Note: each call to this starts a (short-lived) thread and allocates memory. - */ -template -Future waitWithSemaphore(Future&& f, Dur timeout); - } // folly #include diff --git a/folly/futures/test/FutureTest.cpp b/folly/futures/test/FutureTest.cpp index f4cd5286..8eec4e31 100644 --- a/folly/futures/test/FutureTest.cpp +++ b/folly/futures/test/FutureTest.cpp @@ -912,31 +912,31 @@ TEST(Future, throwIfFailed) { }); } -TEST(Future, waitWithSemaphoreImmediate) { - waitWithSemaphore(makeFuture()); - auto done = waitWithSemaphore(makeFuture(42)).value(); +TEST(Future, waitImmediate) { + makeFuture().wait(); + auto done = makeFuture(42).wait().value(); EXPECT_EQ(42, done); vector v{1,2,3}; - auto done_v = waitWithSemaphore(makeFuture(v)).value(); + auto done_v = makeFuture(v).wait().value(); EXPECT_EQ(v.size(), done_v.size()); EXPECT_EQ(v, done_v); vector> v_f; v_f.push_back(makeFuture()); v_f.push_back(makeFuture()); - auto done_v_f = waitWithSemaphore(whenAll(v_f.begin(), v_f.end())).value(); + auto done_v_f = whenAll(v_f.begin(), v_f.end()).wait().value(); EXPECT_EQ(2, done_v_f.size()); vector> v_fb; v_fb.push_back(makeFuture(true)); v_fb.push_back(makeFuture(false)); auto fut = whenAll(v_fb.begin(), v_fb.end()); - auto done_v_fb = std::move(waitWithSemaphore(std::move(fut)).value()); + auto done_v_fb = std::move(fut.wait().value()); EXPECT_EQ(2, done_v_fb.size()); } -TEST(Future, waitWithSemaphore) { +TEST(Future, wait) { Promise p; Future f = p.getFuture(); std::atomic flag{false}; @@ -949,7 +949,7 @@ TEST(Future, waitWithSemaphore) { return t.value(); }); flag = true; - result.store(waitWithSemaphore(std::move(n)).value()); + result.store(n.wait().value()); }, std::move(f) ); @@ -963,12 +963,11 @@ TEST(Future, waitWithSemaphore) { EXPECT_EQ(result.load(), 42); } -TEST(Future, waitWithSemaphoreForTime) { +TEST(Future, waitWithDuration) { { Promise p; Future f = p.getFuture(); - auto t = waitWithSemaphore(std::move(f), - std::chrono::microseconds(1)); + auto t = f.wait(std::chrono::milliseconds(1)); EXPECT_FALSE(t.isReady()); p.setValue(1); EXPECT_TRUE(t.isReady()); @@ -977,8 +976,7 @@ TEST(Future, waitWithSemaphoreForTime) { Promise p; Future f = p.getFuture(); p.setValue(1); - auto t = waitWithSemaphore(std::move(f), - std::chrono::milliseconds(1)); + auto t = f.wait(std::chrono::milliseconds(1)); EXPECT_TRUE(t.isReady()); } { @@ -986,8 +984,7 @@ TEST(Future, waitWithSemaphoreForTime) { v_fb.push_back(makeFuture(true)); v_fb.push_back(makeFuture(false)); auto f = whenAll(v_fb.begin(), v_fb.end()); - auto t = waitWithSemaphore(std::move(f), - std::chrono::milliseconds(1)); + auto t = f.wait(std::chrono::milliseconds(1)); EXPECT_TRUE(t.isReady()); EXPECT_EQ(2, t.value().size()); } @@ -998,8 +995,7 @@ TEST(Future, waitWithSemaphoreForTime) { v_fb.push_back(p1.getFuture()); v_fb.push_back(p2.getFuture()); auto f = whenAll(v_fb.begin(), v_fb.end()); - auto t = waitWithSemaphore(std::move(f), - std::chrono::milliseconds(1)); + auto t = f.wait(std::chrono::milliseconds(1)); EXPECT_FALSE(t.isReady()); p1.setValue(true); EXPECT_FALSE(t.isReady()); @@ -1007,10 +1003,36 @@ TEST(Future, waitWithSemaphoreForTime) { EXPECT_TRUE(t.isReady()); } { - auto t = waitWithSemaphore(makeFuture(), - std::chrono::milliseconds(1)); + auto t = makeFuture().wait(std::chrono::milliseconds(1)); EXPECT_TRUE(t.isReady()); } + + { + Promise p; + auto start = std::chrono::steady_clock::now(); + auto f = p.getFuture().wait(std::chrono::milliseconds(100)); + auto elapsed = std::chrono::steady_clock::now() - start; + EXPECT_GE(elapsed, std::chrono::milliseconds(100)); + EXPECT_FALSE(f.isReady()); + p.setValue(); + EXPECT_TRUE(f.isReady()); + } + + { + // Try to trigger the race where the resultant Future is not yet complete + // even if we didn't hit the timeout, and make sure we deal with it properly + Promise p; + folly::Baton<> b; + auto t = std::thread([&]{ + b.post(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + p.setValue(); + }); + b.wait(); + auto f = p.getFuture().wait(std::chrono::seconds(3600)); + EXPECT_TRUE(f.isReady()); + t.join(); + } } class DummyDrivableExecutor : public DrivableExecutor { @@ -1263,7 +1285,7 @@ TEST(Future, t5506504) { return whenAll(futures.begin(), futures.end()); }; - waitWithSemaphore(fn()); + fn().wait(); } // Test of handling of a circular dependency. It's never recommended