runOnce));
}
-bool FunctionScheduler::cancelFunction(StringPiece nameID) {
- std::unique_lock<std::mutex> l(mutex_);
-
+bool FunctionScheduler::cancelFunctionWithLock(
+ std::unique_lock<std::mutex>& lock,
+ StringPiece nameID) {
+ CHECK_EQ(lock.owns_lock(), true);
if (currentFunction_ && currentFunction_->name == nameID) {
// This function is currently being run. Clear currentFunction_
// The running thread will see this and won't reschedule the function.
currentFunction_ = nullptr;
+ cancellingCurrentFunction_ = true;
+ return true;
+ }
+ return false;
+}
+
+bool FunctionScheduler::cancelFunction(StringPiece nameID) {
+ std::unique_lock<std::mutex> l(mutex_);
+
+ if (cancelFunctionWithLock(l, nameID)) {
return true;
}
bool FunctionScheduler::cancelFunctionAndWait(StringPiece nameID) {
std::unique_lock<std::mutex> l(mutex_);
- auto* currentFunction = currentFunction_;
- if (currentFunction && currentFunction->name == nameID) {
- runningCondvar_.wait(l, [currentFunction, this]() {
- return currentFunction != currentFunction_;
- });
+ if (cancelFunctionWithLock(l, nameID)) {
+ runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; });
+ return true;
}
for (auto it = functions_.begin(); it != functions_.end(); ++it) {
}
}
-void FunctionScheduler::cancelAllFunctions() {
- std::unique_lock<std::mutex> l(mutex_);
+bool FunctionScheduler::cancelAllFunctionsWithLock(
+ std::unique_lock<std::mutex>& lock) {
+ CHECK_EQ(lock.owns_lock(), true);
functions_.clear();
+ if (currentFunction_) {
+ cancellingCurrentFunction_ = true;
+ }
currentFunction_ = nullptr;
+ return cancellingCurrentFunction_;
+}
+
+void FunctionScheduler::cancelAllFunctions() {
+ std::unique_lock<std::mutex> l(mutex_);
+ cancelAllFunctionsWithLock(l);
}
void FunctionScheduler::cancelAllFunctionsAndWait() {
std::unique_lock<std::mutex> l(mutex_);
- if (currentFunction_) {
- runningCondvar_.wait(l, [this]() { return currentFunction_ == nullptr; });
+ if (cancelAllFunctionsWithLock(l)) {
+ runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; });
}
- functions_.clear();
}
bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) {
if (!currentFunction_) {
// The function was cancelled while we were running it.
// We shouldn't reschedule it;
+ cancellingCurrentFunction_ = false;
return;
}
if (currentFunction_->runOnce) {
#include <cassert>
#include <random>
+#include <folly/Baton.h>
#include <folly/Random.h>
#include <folly/experimental/FunctionScheduler.h>
#include <folly/portability/GTest.h>
EXPECT_FALSE(fs.cancelFunction("add2")); // add2 has been canceled
fs.shutdown();
}
+
+TEST(FunctionScheduler, CancelAndWaitOnRunningFunc) {
+ folly::Baton<> baton;
+ std::thread th([&baton]() {
+ FunctionScheduler fs;
+ fs.addFunction([] { delay(10); }, testInterval(2), "func");
+ fs.start();
+ delay(1);
+ EXPECT_TRUE(fs.cancelFunctionAndWait("func"));
+ baton.post();
+ });
+
+ ASSERT_TRUE(baton.timed_wait(testInterval(15)));
+ th.join();
+}
+
+TEST(FunctionScheduler, CancelAllAndWaitWithRunningFunc) {
+ folly::Baton<> baton;
+ std::thread th([&baton]() {
+ FunctionScheduler fs;
+ fs.addFunction([] { delay(10); }, testInterval(2), "func");
+ fs.start();
+ delay(1);
+ fs.cancelAllFunctionsAndWait();
+ baton.post();
+ });
+
+ ASSERT_TRUE(baton.timed_wait(testInterval(15)));
+ th.join();
+}
+
+TEST(FunctionScheduler, CancelAllAndWaitWithOneRunningAndOneWaiting) {
+ folly::Baton<> baton;
+ std::thread th([&baton]() {
+ std::atomic<int> nExecuted(0);
+ FunctionScheduler fs;
+ fs.addFunction(
+ [&nExecuted] {
+ nExecuted++;
+ delay(10);
+ },
+ testInterval(2),
+ "func0");
+ fs.addFunction(
+ [&nExecuted] {
+ nExecuted++;
+ delay(10);
+ },
+ testInterval(2),
+ "func1",
+ testInterval(5));
+ fs.start();
+ delay(1);
+ fs.cancelAllFunctionsAndWait();
+ EXPECT_EQ(nExecuted, 1);
+ baton.post();
+ });
+
+ ASSERT_TRUE(baton.timed_wait(testInterval(15)));
+ th.join();
+}
+
+TEST(FunctionScheduler, ConcurrentCancelFunctionAndWait) {
+ FunctionScheduler fs;
+ fs.addFunction([] { delay(10); }, testInterval(2), "func");
+
+ fs.start();
+ delay(1);
+ std::thread th1([&fs] { EXPECT_TRUE(fs.cancelFunctionAndWait("func")); });
+ delay(1);
+ std::thread th2([&fs] { EXPECT_FALSE(fs.cancelFunctionAndWait("func")); });
+ th1.join();
+ th2.join();
+}