}
std::unique_lock<std::mutex> l(mutex_);
+ auto it = functionsMap_.find(nameID);
// check if the nameID is unique
- for (const auto& f : functions_) {
- if (f.isValid() && f.name == nameID) {
- throw std::invalid_argument(
- to<std::string>("FunctionScheduler: a function named \"",
- nameID,
- "\" already exists"));
- }
+ if (it != functionsMap_.end() && it->second->isValid()) {
+ throw std::invalid_argument(to<std::string>(
+ "FunctionScheduler: a function named \"", nameID, "\" already exists"));
}
+
if (currentFunction_ && currentFunction_->name == nameID) {
throw std::invalid_argument(to<std::string>(
"FunctionScheduler: a function named \"", nameID, "\" already exists"));
addFunctionToHeap(
l,
- RepeatFunc(
+ std::make_unique<RepeatFunc>(
std::move(cb),
std::move(intervalFunc),
nameID,
StringPiece nameID) {
CHECK_EQ(lock.owns_lock(), true);
if (currentFunction_ && currentFunction_->name == nameID) {
+ functionsMap_.erase(currentFunction_->name);
// This function is currently being run. Clear currentFunction_
// The running thread will see this and won't reschedule the function.
currentFunction_ = nullptr;
bool FunctionScheduler::cancelFunction(StringPiece nameID) {
std::unique_lock<std::mutex> l(mutex_);
-
if (cancelFunctionWithLock(l, nameID)) {
return true;
}
-
- for (auto it = functions_.begin(); it != functions_.end(); ++it) {
- if (it->isValid() && it->name == nameID) {
- cancelFunction(l, it);
- return true;
- }
+ auto it = functionsMap_.find(nameID);
+ if (it != functionsMap_.end() && it->second->isValid()) {
+ cancelFunction(l, it->second);
+ return true;
}
+
return false;
}
return true;
}
- for (auto it = functions_.begin(); it != functions_.end(); ++it) {
- if (it->isValid() && it->name == nameID) {
- cancelFunction(l, it);
+ auto it = functionsMap_.find(nameID);
+ if (it != functionsMap_.end() && it->second->isValid()) {
+ cancelFunction(l, it->second);
return true;
}
- }
return false;
}
void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l,
- FunctionHeap::iterator it) {
+ RepeatFunc* it) {
// This function should only be called with mutex_ already locked.
DCHECK(l.mutex() == &mutex_);
DCHECK(l.owns_lock());
-
- if (running_) {
- // Internally gcc has an __adjust_heap() function to fill in a hole in the
- // heap. Unfortunately it isn't part of the standard API.
- //
- // For now we just leave the RepeatFunc in our heap, but mark it as unused.
- // When its nextTimeInterval comes up, the runner thread will pop it from
- // the heap and simply throw it away.
- it->cancel();
- } else {
- // We're not running, so functions_ doesn't need to be maintained in heap
- // order.
- functions_.erase(it);
- }
+ functionsMap_.erase(it->name);
+ it->cancel();
}
bool FunctionScheduler::cancelAllFunctionsWithLock(
std::unique_lock<std::mutex>& lock) {
CHECK_EQ(lock.owns_lock(), true);
functions_.clear();
+ functionsMap_.clear();
if (currentFunction_) {
cancellingCurrentFunction_ = true;
}
bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) {
std::unique_lock<std::mutex> l(mutex_);
if (currentFunction_ && currentFunction_->name == nameID) {
- RepeatFunc* funcPtrCopy = currentFunction_;
+ // TODO: This moves out of RepeatFunc object while folly:Function can
+ // potentially be executed. This might be unsafe.
+ auto funcPtrCopy = std::make_unique<RepeatFunc>(std::move(*currentFunction_));
// This function is currently being run. Clear currentFunction_
// to avoid rescheduling it, and add the function again to honor the
// startDelay.
currentFunction_ = nullptr;
- addFunctionToHeap(l, std::move(*funcPtrCopy));
+ addFunctionToHeap(l, std::move(funcPtrCopy));
return true;
}
// Since __adjust_heap() isn't a part of the standard API, there's no way to
// fix the heap ordering if we adjust the key (nextRunTime) for the existing
// RepeatFunc. Instead, we just cancel it and add an identical object.
- for (auto it = functions_.begin(); it != functions_.end(); ++it) {
- if (it->isValid() && it->name == nameID) {
- RepeatFunc funcCopy(std::move(*it));
- cancelFunction(l, it);
- addFunctionToHeap(l, std::move(funcCopy));
- return true;
- }
+ auto it = functionsMap_.find(nameID);
+
+ if (it != functionsMap_.end() && it->second->isValid()) {
+ auto funcCopy = std::make_unique<RepeatFunc>(std::move(*(it->second)));
+ it->second->cancel();
+ // This will take care of making sure that functionsMap_[it->first] =
+ // funcCopy.
+ addFunctionToHeap(l, std::move(funcCopy));
+ return true;
}
return false;
}
auto now = steady_clock::now();
// Reset the next run time. for all functions.
// note: this is needed since one can shutdown() and start() again
- for (auto& f : functions_) {
- f.resetNextRunTime(now);
- VLOG(1) << " - func: " << (f.name.empty() ? "(anon)" : f.name.c_str())
- << ", period = " << f.intervalDescr
- << ", delay = " << f.startDelay.count() << "ms";
+ for (const auto& f : functions_) {
+ f->resetNextRunTime(now);
+ VLOG(1) << " - func: " << (f->name.empty() ? "(anon)" : f->name.c_str())
+ << ", period = " << f->intervalDescr
+ << ", delay = " << f->startDelay.count() << "ms";
}
std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
// Check to see if the function was cancelled.
// If so, just remove it and continue around the loop.
- if (!functions_.back().isValid()) {
+ if (!functions_.back()->isValid()) {
functions_.pop_back();
continue;
}
- auto sleepTime = functions_.back().getNextRunTime() - now;
+ auto sleepTime = functions_.back()->getNextRunTime() - now;
if (sleepTime < milliseconds::zero()) {
// We need to run this function now
runOneFunction(lock, now);
// Fully remove it from functions_ now.
// We need to release mutex_ while we invoke this function, and we need to
// maintain the heap property on functions_ while mutex_ is unlocked.
- RepeatFunc func(std::move(functions_.back()));
+ auto func = std::move(functions_.back());
functions_.pop_back();
- if (!func.cb) {
- VLOG(5) << func.name << "function has been canceled while waiting";
+ if (!func->cb) {
+ VLOG(5) << func->name << "function has been canceled while waiting";
return;
}
- currentFunction_ = &func;
-
+ currentFunction_ = func.get();
// Update the function's next run time.
if (steady_) {
// This allows scheduler to catch up
- func.setNextRunTimeSteady();
+ func->setNextRunTimeSteady();
} else {
// Note that we set nextRunTime based on the current time where we started
// the function call, rather than the time when the function finishes.
// This ensures that we call the function once every time interval, as
// opposed to waiting time interval seconds between calls. (These can be
// different if the function takes a significant amount of time to run.)
- func.setNextRunTimeStrict(now);
+ func->setNextRunTimeStrict(now);
}
// Release the lock while we invoke the user's function
// Invoke the function
try {
- VLOG(5) << "Now running " << func.name;
- func.cb();
+ VLOG(5) << "Now running " << func->name;
+ func->cb();
} catch (const std::exception& ex) {
LOG(ERROR) << "Error running the scheduled function <"
- << func.name << ">: " << exceptionStr(ex);
+ << func->name << ">: " << exceptionStr(ex);
}
// Re-acquire the lock
}
if (currentFunction_->runOnce) {
// Don't reschedule if the function only needed to run once.
+ functionsMap_.erase(currentFunction_->name);
currentFunction_ = nullptr;
return;
}
- // Clear currentFunction_
- CHECK_EQ(currentFunction_, &func);
- currentFunction_ = nullptr;
// Re-insert the function into our functions_ heap.
// We only maintain the heap property while running_ is set. (running_ may
// have been cleared while we were invoking the user's function.)
functions_.push_back(std::move(func));
+
+ // Clear currentFunction_
+ currentFunction_ = nullptr;
+
if (running_) {
std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
}
void FunctionScheduler::addFunctionToHeap(
const std::unique_lock<std::mutex>& lock,
- RepeatFunc&& func) {
+ std::unique_ptr<RepeatFunc> func) {
// This function should only be called with mutex_ already locked.
DCHECK(lock.mutex() == &mutex_);
DCHECK(lock.owns_lock());
- functions_.emplace_back(std::move(func));
+ functions_.push_back(std::move(func));
+ functionsMap_[functions_.back()->name] = functions_.back().get();
if (running_) {
- functions_.back().resetNextRunTime(steady_clock::now());
+ functions_.back()->resetNextRunTime(steady_clock::now());
std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
// Signal the running thread to wake up and see if it needs to change
// its current scheduling decision.