From: Qi Zhou Date: Tue, 20 Jun 2017 18:36:53 +0000 (-0700) Subject: fix folly::FunctionScheduler.cancelFunctionAndWait() hanging issue X-Git-Tag: v2017.06.26.00~36 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=b3ef1edce60571289bd205b71a7eacaedf0e6598;p=folly.git fix folly::FunctionScheduler.cancelFunctionAndWait() hanging issue Summary: When - only one function is scheduled in FunctionScheduler; and - the function is running while cancelFunctionAndWait() is being called; FunctionScheduler.cancelFunctionAndWait() will hang forever. The root cause is that the condition in cancelFunctionAndWait() is incorrect: ``` runningCondvar_.wait(l, [currentFunction, this]() { return currentFunction != currentFunction_; }); ``` because currentFunction will not be changed if only one function is in the scheduler. The fix here is to - clear currentFunction as nullptr. This also makes the internal behaviors of cancelFunction() and cancelFunctionAndWait() consistent. - introduces additional variable to indicate the state of cancelling current function. After running the function, the background thread will detect cancellation of current function and clear the variable. - cancelFunctionAndWait() condition variable will wait for the variable to be cleared. Similarly, cancelAllFunctionsAndWait() also suffers from the same issue. Unit tests are added to reproduce the issue. Reviewed By: yfeldblum Differential Revision: D5271664 fbshipit-source-id: acb223304d3eab23129907ce9ff5e57e55f1e909 --- diff --git a/folly/experimental/FunctionScheduler.cpp b/folly/experimental/FunctionScheduler.cpp index 3677e533..2a945015 100644 --- a/folly/experimental/FunctionScheduler.cpp +++ b/folly/experimental/FunctionScheduler.cpp @@ -213,13 +213,24 @@ void FunctionScheduler::addFunctionInternal( runOnce)); } -bool FunctionScheduler::cancelFunction(StringPiece nameID) { - std::unique_lock l(mutex_); - +bool FunctionScheduler::cancelFunctionWithLock( + std::unique_lock& lock, + StringPiece nameID) { + CHECK_EQ(lock.owns_lock(), true); if (currentFunction_ && currentFunction_->name == nameID) { // This function is currently being run. Clear currentFunction_ // The running thread will see this and won't reschedule the function. currentFunction_ = nullptr; + cancellingCurrentFunction_ = true; + return true; + } + return false; +} + +bool FunctionScheduler::cancelFunction(StringPiece nameID) { + std::unique_lock l(mutex_); + + if (cancelFunctionWithLock(l, nameID)) { return true; } @@ -235,11 +246,9 @@ bool FunctionScheduler::cancelFunction(StringPiece nameID) { bool FunctionScheduler::cancelFunctionAndWait(StringPiece nameID) { std::unique_lock l(mutex_); - auto* currentFunction = currentFunction_; - if (currentFunction && currentFunction->name == nameID) { - runningCondvar_.wait(l, [currentFunction, this]() { - return currentFunction != currentFunction_; - }); + if (cancelFunctionWithLock(l, nameID)) { + runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; }); + return true; } for (auto it = functions_.begin(); it != functions_.end(); ++it) { @@ -272,18 +281,27 @@ void FunctionScheduler::cancelFunction(const std::unique_lock& l, } } -void FunctionScheduler::cancelAllFunctions() { - std::unique_lock l(mutex_); +bool FunctionScheduler::cancelAllFunctionsWithLock( + std::unique_lock& lock) { + CHECK_EQ(lock.owns_lock(), true); functions_.clear(); + if (currentFunction_) { + cancellingCurrentFunction_ = true; + } currentFunction_ = nullptr; + return cancellingCurrentFunction_; +} + +void FunctionScheduler::cancelAllFunctions() { + std::unique_lock l(mutex_); + cancelAllFunctionsWithLock(l); } void FunctionScheduler::cancelAllFunctionsAndWait() { std::unique_lock l(mutex_); - if (currentFunction_) { - runningCondvar_.wait(l, [this]() { return currentFunction_ == nullptr; }); + if (cancelAllFunctionsWithLock(l)) { + runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; }); } - functions_.clear(); } bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) { @@ -441,6 +459,7 @@ void FunctionScheduler::runOneFunction(std::unique_lock& lock, if (!currentFunction_) { // The function was cancelled while we were running it. // We shouldn't reschedule it; + cancellingCurrentFunction_ = false; return; } if (currentFunction_->runOnce) { diff --git a/folly/experimental/FunctionScheduler.h b/folly/experimental/FunctionScheduler.h index 0c590842..6e5b2198 100644 --- a/folly/experimental/FunctionScheduler.h +++ b/folly/experimental/FunctionScheduler.h @@ -264,6 +264,12 @@ class FunctionScheduler { std::chrono::milliseconds startDelay, bool runOnce); + // Return true if the current function is being canceled + bool cancelAllFunctionsWithLock(std::unique_lock& lock); + bool cancelFunctionWithLock( + std::unique_lock& lock, + StringPiece nameID); + std::thread thread_; // Mutex to protect our member variables. @@ -285,6 +291,7 @@ class FunctionScheduler { std::string threadName_; bool steady_{false}; + bool cancellingCurrentFunction_{false}; }; } diff --git a/folly/experimental/test/FunctionSchedulerTest.cpp b/folly/experimental/test/FunctionSchedulerTest.cpp index 01d22d73..5da41d24 100644 --- a/folly/experimental/test/FunctionSchedulerTest.cpp +++ b/folly/experimental/test/FunctionSchedulerTest.cpp @@ -18,6 +18,7 @@ #include #include +#include #include #include #include @@ -567,3 +568,77 @@ TEST(FunctionScheduler, cancelAllFunctionsAndWait) { EXPECT_FALSE(fs.cancelFunction("add2")); // add2 has been canceled fs.shutdown(); } + +TEST(FunctionScheduler, CancelAndWaitOnRunningFunc) { + folly::Baton<> baton; + std::thread th([&baton]() { + FunctionScheduler fs; + fs.addFunction([] { delay(10); }, testInterval(2), "func"); + fs.start(); + delay(1); + EXPECT_TRUE(fs.cancelFunctionAndWait("func")); + baton.post(); + }); + + ASSERT_TRUE(baton.timed_wait(testInterval(15))); + th.join(); +} + +TEST(FunctionScheduler, CancelAllAndWaitWithRunningFunc) { + folly::Baton<> baton; + std::thread th([&baton]() { + FunctionScheduler fs; + fs.addFunction([] { delay(10); }, testInterval(2), "func"); + fs.start(); + delay(1); + fs.cancelAllFunctionsAndWait(); + baton.post(); + }); + + ASSERT_TRUE(baton.timed_wait(testInterval(15))); + th.join(); +} + +TEST(FunctionScheduler, CancelAllAndWaitWithOneRunningAndOneWaiting) { + folly::Baton<> baton; + std::thread th([&baton]() { + std::atomic nExecuted(0); + FunctionScheduler fs; + fs.addFunction( + [&nExecuted] { + nExecuted++; + delay(10); + }, + testInterval(2), + "func0"); + fs.addFunction( + [&nExecuted] { + nExecuted++; + delay(10); + }, + testInterval(2), + "func1", + testInterval(5)); + fs.start(); + delay(1); + fs.cancelAllFunctionsAndWait(); + EXPECT_EQ(nExecuted, 1); + baton.post(); + }); + + ASSERT_TRUE(baton.timed_wait(testInterval(15))); + th.join(); +} + +TEST(FunctionScheduler, ConcurrentCancelFunctionAndWait) { + FunctionScheduler fs; + fs.addFunction([] { delay(10); }, testInterval(2), "func"); + + fs.start(); + delay(1); + std::thread th1([&fs] { EXPECT_TRUE(fs.cancelFunctionAndWait("func")); }); + delay(1); + std::thread th2([&fs] { EXPECT_FALSE(fs.cancelFunctionAndWait("func")); }); + th1.join(); + th2.join(); +}