fix folly::FunctionScheduler.cancelFunctionAndWait() hanging issue
authorQi Zhou <qizhou@fb.com>
Tue, 20 Jun 2017 18:36:53 +0000 (11:36 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Tue, 20 Jun 2017 18:50:20 +0000 (11:50 -0700)
Summary:
When
- only one function is scheduled in FunctionScheduler; and
- the function is running while cancelFunctionAndWait() is being called;
FunctionScheduler.cancelFunctionAndWait() will hang forever.  The root cause is that the condition in cancelFunctionAndWait() is incorrect:

```
runningCondvar_.wait(l, [currentFunction, this]() {
  return currentFunction != currentFunction_;
});
```

because currentFunction will not be changed if only one function is in the scheduler.

The fix here is to
- clear currentFunction as nullptr.  This also makes the internal behaviors of cancelFunction() and cancelFunctionAndWait() consistent.
- introduces additional variable to indicate the state of cancelling current function.  After running the function, the background thread will detect cancellation of current function and clear the variable.
- cancelFunctionAndWait() condition variable will wait for the variable to be cleared.

Similarly, cancelAllFunctionsAndWait() also suffers from the same issue.

Unit tests are added to reproduce the issue.

Reviewed By: yfeldblum

Differential Revision: D5271664

fbshipit-source-id: acb223304d3eab23129907ce9ff5e57e55f1e909

folly/experimental/FunctionScheduler.cpp
folly/experimental/FunctionScheduler.h
folly/experimental/test/FunctionSchedulerTest.cpp

index 3677e53346a98d3cf865c40f50b985626c7a63dd..2a94501523340d28c82ddc07afbbdee20236cc1e 100644 (file)
@@ -213,13 +213,24 @@ void FunctionScheduler::addFunctionInternal(
           runOnce));
 }
 
-bool FunctionScheduler::cancelFunction(StringPiece nameID) {
-  std::unique_lock<std::mutex> l(mutex_);
-
+bool FunctionScheduler::cancelFunctionWithLock(
+    std::unique_lock<std::mutex>& lock,
+    StringPiece nameID) {
+  CHECK_EQ(lock.owns_lock(), true);
   if (currentFunction_ && currentFunction_->name == nameID) {
     // This function is currently being run. Clear currentFunction_
     // The running thread will see this and won't reschedule the function.
     currentFunction_ = nullptr;
+    cancellingCurrentFunction_ = true;
+    return true;
+  }
+  return false;
+}
+
+bool FunctionScheduler::cancelFunction(StringPiece nameID) {
+  std::unique_lock<std::mutex> l(mutex_);
+
+  if (cancelFunctionWithLock(l, nameID)) {
     return true;
   }
 
@@ -235,11 +246,9 @@ bool FunctionScheduler::cancelFunction(StringPiece nameID) {
 bool FunctionScheduler::cancelFunctionAndWait(StringPiece nameID) {
   std::unique_lock<std::mutex> l(mutex_);
 
-  auto* currentFunction = currentFunction_;
-  if (currentFunction && currentFunction->name == nameID) {
-    runningCondvar_.wait(l, [currentFunction, this]() {
-      return currentFunction != currentFunction_;
-    });
+  if (cancelFunctionWithLock(l, nameID)) {
+    runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; });
+    return true;
   }
 
   for (auto it = functions_.begin(); it != functions_.end(); ++it) {
@@ -272,18 +281,27 @@ void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l,
   }
 }
 
-void FunctionScheduler::cancelAllFunctions() {
-  std::unique_lock<std::mutex> l(mutex_);
+bool FunctionScheduler::cancelAllFunctionsWithLock(
+    std::unique_lock<std::mutex>& lock) {
+  CHECK_EQ(lock.owns_lock(), true);
   functions_.clear();
+  if (currentFunction_) {
+    cancellingCurrentFunction_ = true;
+  }
   currentFunction_ = nullptr;
+  return cancellingCurrentFunction_;
+}
+
+void FunctionScheduler::cancelAllFunctions() {
+  std::unique_lock<std::mutex> l(mutex_);
+  cancelAllFunctionsWithLock(l);
 }
 
 void FunctionScheduler::cancelAllFunctionsAndWait() {
   std::unique_lock<std::mutex> l(mutex_);
-  if (currentFunction_) {
-    runningCondvar_.wait(l, [this]() { return currentFunction_ == nullptr; });
+  if (cancelAllFunctionsWithLock(l)) {
+    runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; });
   }
-  functions_.clear();
 }
 
 bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) {
@@ -441,6 +459,7 @@ void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
   if (!currentFunction_) {
     // The function was cancelled while we were running it.
     // We shouldn't reschedule it;
+    cancellingCurrentFunction_ = false;
     return;
   }
   if (currentFunction_->runOnce) {
index 0c590842a0adb3698164f6604e31b53be1a11c1d..6e5b2198e7946a67d86f17fc996921897d272630 100644 (file)
@@ -264,6 +264,12 @@ class FunctionScheduler {
       std::chrono::milliseconds startDelay,
       bool runOnce);
 
+  // Return true if the current function is being canceled
+  bool cancelAllFunctionsWithLock(std::unique_lock<std::mutex>& lock);
+  bool cancelFunctionWithLock(
+      std::unique_lock<std::mutex>& lock,
+      StringPiece nameID);
+
   std::thread thread_;
 
   // Mutex to protect our member variables.
@@ -285,6 +291,7 @@ class FunctionScheduler {
 
   std::string threadName_;
   bool steady_{false};
+  bool cancellingCurrentFunction_{false};
 };
 
 }
index 01d22d73fdb043fb25c0c69281055a708789353e..5da41d24b5613cd09f3498239de36e49c8f095e4 100644 (file)
@@ -18,6 +18,7 @@
 #include <cassert>
 #include <random>
 
+#include <folly/Baton.h>
 #include <folly/Random.h>
 #include <folly/experimental/FunctionScheduler.h>
 #include <folly/portability/GTest.h>
@@ -567,3 +568,77 @@ TEST(FunctionScheduler, cancelAllFunctionsAndWait) {
   EXPECT_FALSE(fs.cancelFunction("add2")); // add2 has been canceled
   fs.shutdown();
 }
+
+TEST(FunctionScheduler, CancelAndWaitOnRunningFunc) {
+  folly::Baton<> baton;
+  std::thread th([&baton]() {
+    FunctionScheduler fs;
+    fs.addFunction([] { delay(10); }, testInterval(2), "func");
+    fs.start();
+    delay(1);
+    EXPECT_TRUE(fs.cancelFunctionAndWait("func"));
+    baton.post();
+  });
+
+  ASSERT_TRUE(baton.timed_wait(testInterval(15)));
+  th.join();
+}
+
+TEST(FunctionScheduler, CancelAllAndWaitWithRunningFunc) {
+  folly::Baton<> baton;
+  std::thread th([&baton]() {
+    FunctionScheduler fs;
+    fs.addFunction([] { delay(10); }, testInterval(2), "func");
+    fs.start();
+    delay(1);
+    fs.cancelAllFunctionsAndWait();
+    baton.post();
+  });
+
+  ASSERT_TRUE(baton.timed_wait(testInterval(15)));
+  th.join();
+}
+
+TEST(FunctionScheduler, CancelAllAndWaitWithOneRunningAndOneWaiting) {
+  folly::Baton<> baton;
+  std::thread th([&baton]() {
+    std::atomic<int> nExecuted(0);
+    FunctionScheduler fs;
+    fs.addFunction(
+        [&nExecuted] {
+          nExecuted++;
+          delay(10);
+        },
+        testInterval(2),
+        "func0");
+    fs.addFunction(
+        [&nExecuted] {
+          nExecuted++;
+          delay(10);
+        },
+        testInterval(2),
+        "func1",
+        testInterval(5));
+    fs.start();
+    delay(1);
+    fs.cancelAllFunctionsAndWait();
+    EXPECT_EQ(nExecuted, 1);
+    baton.post();
+  });
+
+  ASSERT_TRUE(baton.timed_wait(testInterval(15)));
+  th.join();
+}
+
+TEST(FunctionScheduler, ConcurrentCancelFunctionAndWait) {
+  FunctionScheduler fs;
+  fs.addFunction([] { delay(10); }, testInterval(2), "func");
+
+  fs.start();
+  delay(1);
+  std::thread th1([&fs] { EXPECT_TRUE(fs.cancelFunctionAndWait("func")); });
+  delay(1);
+  std::thread th2([&fs] { EXPECT_FALSE(fs.cancelFunctionAndWait("func")); });
+  th1.join();
+  th2.join();
+}