From: Patryk Zaryjewski Date: Mon, 28 Aug 2017 06:35:38 +0000 (-0700) Subject: Make cancelling and rescheduling of functions O(1) X-Git-Tag: v2017.08.28.00 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=979ea5af2431fdb772855e3086be9398b0bbb692;p=folly.git Make cancelling and rescheduling of functions O(1) Summary: Currently FunctionScheduler calls that cancel/restart timer for a function of particular id are O(n). By introducing hashmap that translate id to pointer of particular RepeatFunc, we make it O(1). Reviewed By: simpkins Differential Revision: D5668557 fbshipit-source-id: e5e8bf9bd75b6d5d42f0bfa398d476703e5801fa --- diff --git a/folly/experimental/FunctionScheduler.cpp b/folly/experimental/FunctionScheduler.cpp index 63a2ad7c..cee5a4c4 100644 --- a/folly/experimental/FunctionScheduler.cpp +++ b/folly/experimental/FunctionScheduler.cpp @@ -188,15 +188,13 @@ void FunctionScheduler::addFunctionInternal( } std::unique_lock l(mutex_); + auto it = functionsMap_.find(nameID); // check if the nameID is unique - for (const auto& f : functions_) { - if (f.isValid() && f.name == nameID) { - throw std::invalid_argument( - to("FunctionScheduler: a function named \"", - nameID, - "\" already exists")); - } + if (it != functionsMap_.end() && it->second->isValid()) { + throw std::invalid_argument(to( + "FunctionScheduler: a function named \"", nameID, "\" already exists")); } + if (currentFunction_ && currentFunction_->name == nameID) { throw std::invalid_argument(to( "FunctionScheduler: a function named \"", nameID, "\" already exists")); @@ -204,7 +202,7 @@ void FunctionScheduler::addFunctionInternal( addFunctionToHeap( l, - RepeatFunc( + std::make_unique( std::move(cb), std::move(intervalFunc), nameID, @@ -218,6 +216,7 @@ bool FunctionScheduler::cancelFunctionWithLock( StringPiece nameID) { CHECK_EQ(lock.owns_lock(), true); if (currentFunction_ && currentFunction_->name == nameID) { + functionsMap_.erase(currentFunction_->name); // This function is currently being run. Clear currentFunction_ // The running thread will see this and won't reschedule the function. currentFunction_ = nullptr; @@ -229,17 +228,15 @@ bool FunctionScheduler::cancelFunctionWithLock( bool FunctionScheduler::cancelFunction(StringPiece nameID) { std::unique_lock l(mutex_); - if (cancelFunctionWithLock(l, nameID)) { return true; } - - for (auto it = functions_.begin(); it != functions_.end(); ++it) { - if (it->isValid() && it->name == nameID) { - cancelFunction(l, it); - return true; - } + auto it = functionsMap_.find(nameID); + if (it != functionsMap_.end() && it->second->isValid()) { + cancelFunction(l, it->second); + return true; } + return false; } @@ -251,40 +248,28 @@ bool FunctionScheduler::cancelFunctionAndWait(StringPiece nameID) { return true; } - for (auto it = functions_.begin(); it != functions_.end(); ++it) { - if (it->isValid() && it->name == nameID) { - cancelFunction(l, it); + auto it = functionsMap_.find(nameID); + if (it != functionsMap_.end() && it->second->isValid()) { + cancelFunction(l, it->second); return true; } - } return false; } void FunctionScheduler::cancelFunction(const std::unique_lock& l, - FunctionHeap::iterator it) { + RepeatFunc* it) { // This function should only be called with mutex_ already locked. DCHECK(l.mutex() == &mutex_); DCHECK(l.owns_lock()); - - if (running_) { - // Internally gcc has an __adjust_heap() function to fill in a hole in the - // heap. Unfortunately it isn't part of the standard API. - // - // For now we just leave the RepeatFunc in our heap, but mark it as unused. - // When its nextTimeInterval comes up, the runner thread will pop it from - // the heap and simply throw it away. - it->cancel(); - } else { - // We're not running, so functions_ doesn't need to be maintained in heap - // order. - functions_.erase(it); - } + functionsMap_.erase(it->name); + it->cancel(); } bool FunctionScheduler::cancelAllFunctionsWithLock( std::unique_lock& lock) { CHECK_EQ(lock.owns_lock(), true); functions_.clear(); + functionsMap_.clear(); if (currentFunction_) { cancellingCurrentFunction_ = true; } @@ -307,25 +292,29 @@ void FunctionScheduler::cancelAllFunctionsAndWait() { bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) { std::unique_lock l(mutex_); if (currentFunction_ && currentFunction_->name == nameID) { - RepeatFunc* funcPtrCopy = currentFunction_; + // TODO: This moves out of RepeatFunc object while folly:Function can + // potentially be executed. This might be unsafe. + auto funcPtrCopy = std::make_unique(std::move(*currentFunction_)); // This function is currently being run. Clear currentFunction_ // to avoid rescheduling it, and add the function again to honor the // startDelay. currentFunction_ = nullptr; - addFunctionToHeap(l, std::move(*funcPtrCopy)); + addFunctionToHeap(l, std::move(funcPtrCopy)); return true; } // Since __adjust_heap() isn't a part of the standard API, there's no way to // fix the heap ordering if we adjust the key (nextRunTime) for the existing // RepeatFunc. Instead, we just cancel it and add an identical object. - for (auto it = functions_.begin(); it != functions_.end(); ++it) { - if (it->isValid() && it->name == nameID) { - RepeatFunc funcCopy(std::move(*it)); - cancelFunction(l, it); - addFunctionToHeap(l, std::move(funcCopy)); - return true; - } + auto it = functionsMap_.find(nameID); + + if (it != functionsMap_.end() && it->second->isValid()) { + auto funcCopy = std::make_unique(std::move(*(it->second))); + it->second->cancel(); + // This will take care of making sure that functionsMap_[it->first] = + // funcCopy. + addFunctionToHeap(l, std::move(funcCopy)); + return true; } return false; } @@ -341,11 +330,11 @@ bool FunctionScheduler::start() { auto now = steady_clock::now(); // Reset the next run time. for all functions. // note: this is needed since one can shutdown() and start() again - for (auto& f : functions_) { - f.resetNextRunTime(now); - VLOG(1) << " - func: " << (f.name.empty() ? "(anon)" : f.name.c_str()) - << ", period = " << f.intervalDescr - << ", delay = " << f.startDelay.count() << "ms"; + for (const auto& f : functions_) { + f->resetNextRunTime(now); + VLOG(1) << " - func: " << (f->name.empty() ? "(anon)" : f->name.c_str()) + << ", period = " << f->intervalDescr + << ", delay = " << f->startDelay.count() << "ms"; } std::make_heap(functions_.begin(), functions_.end(), fnCmp_); @@ -391,12 +380,12 @@ void FunctionScheduler::run() { // Check to see if the function was cancelled. // If so, just remove it and continue around the loop. - if (!functions_.back().isValid()) { + if (!functions_.back()->isValid()) { functions_.pop_back(); continue; } - auto sleepTime = functions_.back().getNextRunTime() - now; + auto sleepTime = functions_.back()->getNextRunTime() - now; if (sleepTime < milliseconds::zero()) { // We need to run this function now runOneFunction(lock, now); @@ -420,25 +409,24 @@ void FunctionScheduler::runOneFunction(std::unique_lock& lock, // Fully remove it from functions_ now. // We need to release mutex_ while we invoke this function, and we need to // maintain the heap property on functions_ while mutex_ is unlocked. - RepeatFunc func(std::move(functions_.back())); + auto func = std::move(functions_.back()); functions_.pop_back(); - if (!func.cb) { - VLOG(5) << func.name << "function has been canceled while waiting"; + if (!func->cb) { + VLOG(5) << func->name << "function has been canceled while waiting"; return; } - currentFunction_ = &func; - + currentFunction_ = func.get(); // Update the function's next run time. if (steady_) { // This allows scheduler to catch up - func.setNextRunTimeSteady(); + func->setNextRunTimeSteady(); } else { // Note that we set nextRunTime based on the current time where we started // the function call, rather than the time when the function finishes. // This ensures that we call the function once every time interval, as // opposed to waiting time interval seconds between calls. (These can be // different if the function takes a significant amount of time to run.) - func.setNextRunTimeStrict(now); + func->setNextRunTimeStrict(now); } // Release the lock while we invoke the user's function @@ -446,11 +434,11 @@ void FunctionScheduler::runOneFunction(std::unique_lock& lock, // Invoke the function try { - VLOG(5) << "Now running " << func.name; - func.cb(); + VLOG(5) << "Now running " << func->name; + func->cb(); } catch (const std::exception& ex) { LOG(ERROR) << "Error running the scheduled function <" - << func.name << ">: " << exceptionStr(ex); + << func->name << ">: " << exceptionStr(ex); } // Re-acquire the lock @@ -464,17 +452,19 @@ void FunctionScheduler::runOneFunction(std::unique_lock& lock, } if (currentFunction_->runOnce) { // Don't reschedule if the function only needed to run once. + functionsMap_.erase(currentFunction_->name); currentFunction_ = nullptr; return; } - // Clear currentFunction_ - CHECK_EQ(currentFunction_, &func); - currentFunction_ = nullptr; // Re-insert the function into our functions_ heap. // We only maintain the heap property while running_ is set. (running_ may // have been cleared while we were invoking the user's function.) functions_.push_back(std::move(func)); + + // Clear currentFunction_ + currentFunction_ = nullptr; + if (running_) { std::push_heap(functions_.begin(), functions_.end(), fnCmp_); } @@ -482,14 +472,15 @@ void FunctionScheduler::runOneFunction(std::unique_lock& lock, void FunctionScheduler::addFunctionToHeap( const std::unique_lock& lock, - RepeatFunc&& func) { + std::unique_ptr func) { // This function should only be called with mutex_ already locked. DCHECK(lock.mutex() == &mutex_); DCHECK(lock.owns_lock()); - functions_.emplace_back(std::move(func)); + functions_.push_back(std::move(func)); + functionsMap_[functions_.back()->name] = functions_.back().get(); if (running_) { - functions_.back().resetNextRunTime(steady_clock::now()); + functions_.back()->resetNextRunTime(steady_clock::now()); std::push_heap(functions_.begin(), functions_.end(), fnCmp_); // Signal the running thread to wake up and see if it needs to change // its current scheduling decision. diff --git a/folly/experimental/FunctionScheduler.h b/folly/experimental/FunctionScheduler.h index 6e5b2198..fbc73993 100644 --- a/folly/experimental/FunctionScheduler.h +++ b/folly/experimental/FunctionScheduler.h @@ -18,11 +18,13 @@ #include #include +#include #include #include #include #include #include +#include namespace folly { @@ -241,20 +243,21 @@ class FunctionScheduler { }; struct RunTimeOrder { - bool operator()(const RepeatFunc& f1, const RepeatFunc& f2) const { - return f1.getNextRunTime() > f2.getNextRunTime(); + bool operator()(const std::unique_ptr& f1, const std::unique_ptr& f2) const { + return f1->getNextRunTime() > f2->getNextRunTime(); } }; - typedef std::vector FunctionHeap; + typedef std::vector> FunctionHeap; + typedef std::unordered_map FunctionMap; void run(); void runOneFunction(std::unique_lock& lock, std::chrono::steady_clock::time_point now); void cancelFunction(const std::unique_lock& lock, - FunctionHeap::iterator it); + RepeatFunc* it); void addFunctionToHeap(const std::unique_lock& lock, - RepeatFunc&& func); + std::unique_ptr func); void addFunctionInternal( Function&& cb, @@ -279,6 +282,7 @@ class FunctionScheduler { // The functions to run. // This is a heap, ordered by next run time. FunctionHeap functions_; + FunctionMap functionsMap_; RunTimeOrder fnCmp_; // The function currently being invoked by the running thread.