Make cancelling and rescheduling of functions O(1) v2017.08.28.00
authorPatryk Zaryjewski <patzar@fb.com>
Mon, 28 Aug 2017 06:35:38 +0000 (23:35 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Mon, 28 Aug 2017 07:00:42 +0000 (00:00 -0700)
Summary: Currently FunctionScheduler calls that cancel/restart timer for a function of particular id are O(n). By introducing hashmap that translate id to pointer of particular RepeatFunc, we make it O(1).

Reviewed By: simpkins

Differential Revision: D5668557

fbshipit-source-id: e5e8bf9bd75b6d5d42f0bfa398d476703e5801fa

folly/experimental/FunctionScheduler.cpp
folly/experimental/FunctionScheduler.h

index 63a2ad7c0a67706a30ae20122c9a6fb0d80d1f6b..cee5a4c4b4511f533dc3546383814ab4cb2e9bd3 100644 (file)
@@ -188,15 +188,13 @@ void FunctionScheduler::addFunctionInternal(
   }
 
   std::unique_lock<std::mutex> l(mutex_);
+  auto it = functionsMap_.find(nameID);
   // check if the nameID is unique
-  for (const auto& f : functions_) {
-    if (f.isValid() && f.name == nameID) {
-      throw std::invalid_argument(
-          to<std::string>("FunctionScheduler: a function named \"",
-                          nameID,
-                          "\" already exists"));
-    }
+  if (it != functionsMap_.end() && it->second->isValid()) {
+    throw std::invalid_argument(to<std::string>(
+        "FunctionScheduler: a function named \"", nameID, "\" already exists"));
   }
+
   if (currentFunction_ && currentFunction_->name == nameID) {
     throw std::invalid_argument(to<std::string>(
         "FunctionScheduler: a function named \"", nameID, "\" already exists"));
@@ -204,7 +202,7 @@ void FunctionScheduler::addFunctionInternal(
 
   addFunctionToHeap(
       l,
-      RepeatFunc(
+      std::make_unique<RepeatFunc>(
           std::move(cb),
           std::move(intervalFunc),
           nameID,
@@ -218,6 +216,7 @@ bool FunctionScheduler::cancelFunctionWithLock(
     StringPiece nameID) {
   CHECK_EQ(lock.owns_lock(), true);
   if (currentFunction_ && currentFunction_->name == nameID) {
+    functionsMap_.erase(currentFunction_->name);
     // This function is currently being run. Clear currentFunction_
     // The running thread will see this and won't reschedule the function.
     currentFunction_ = nullptr;
@@ -229,17 +228,15 @@ bool FunctionScheduler::cancelFunctionWithLock(
 
 bool FunctionScheduler::cancelFunction(StringPiece nameID) {
   std::unique_lock<std::mutex> l(mutex_);
-
   if (cancelFunctionWithLock(l, nameID)) {
     return true;
   }
-
-  for (auto it = functions_.begin(); it != functions_.end(); ++it) {
-    if (it->isValid() && it->name == nameID) {
-      cancelFunction(l, it);
-      return true;
-    }
+  auto it = functionsMap_.find(nameID);
+  if (it != functionsMap_.end() && it->second->isValid()) {
+    cancelFunction(l, it->second);
+    return true;
   }
+
   return false;
 }
 
@@ -251,40 +248,28 @@ bool FunctionScheduler::cancelFunctionAndWait(StringPiece nameID) {
     return true;
   }
 
-  for (auto it = functions_.begin(); it != functions_.end(); ++it) {
-    if (it->isValid() && it->name == nameID) {
-      cancelFunction(l, it);
+  auto it = functionsMap_.find(nameID);
+    if (it != functionsMap_.end() && it->second->isValid()) {
+      cancelFunction(l, it->second);
       return true;
     }
-  }
   return false;
 }
 
 void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l,
-                                       FunctionHeap::iterator it) {
+                                      RepeatFunc* it) {
   // This function should only be called with mutex_ already locked.
   DCHECK(l.mutex() == &mutex_);
   DCHECK(l.owns_lock());
-
-  if (running_) {
-    // Internally gcc has an __adjust_heap() function to fill in a hole in the
-    // heap.  Unfortunately it isn't part of the standard API.
-    //
-    // For now we just leave the RepeatFunc in our heap, but mark it as unused.
-    // When its nextTimeInterval comes up, the runner thread will pop it from
-    // the heap and simply throw it away.
-    it->cancel();
-  } else {
-    // We're not running, so functions_ doesn't need to be maintained in heap
-    // order.
-    functions_.erase(it);
-  }
+  functionsMap_.erase(it->name);
+  it->cancel();
 }
 
 bool FunctionScheduler::cancelAllFunctionsWithLock(
     std::unique_lock<std::mutex>& lock) {
   CHECK_EQ(lock.owns_lock(), true);
   functions_.clear();
+  functionsMap_.clear();
   if (currentFunction_) {
     cancellingCurrentFunction_ = true;
   }
@@ -307,25 +292,29 @@ void FunctionScheduler::cancelAllFunctionsAndWait() {
 bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) {
   std::unique_lock<std::mutex> l(mutex_);
   if (currentFunction_ && currentFunction_->name == nameID) {
-    RepeatFunc* funcPtrCopy = currentFunction_;
+    // TODO: This moves out of RepeatFunc object while folly:Function can
+    // potentially be executed. This might be unsafe.
+    auto funcPtrCopy = std::make_unique<RepeatFunc>(std::move(*currentFunction_));
     // This function is currently being run. Clear currentFunction_
     // to avoid rescheduling it, and add the function again to honor the
     // startDelay.
     currentFunction_ = nullptr;
-    addFunctionToHeap(l, std::move(*funcPtrCopy));
+    addFunctionToHeap(l, std::move(funcPtrCopy));
     return true;
   }
 
   // Since __adjust_heap() isn't a part of the standard API, there's no way to
   // fix the heap ordering if we adjust the key (nextRunTime) for the existing
   // RepeatFunc. Instead, we just cancel it and add an identical object.
-  for (auto it = functions_.begin(); it != functions_.end(); ++it) {
-    if (it->isValid() && it->name == nameID) {
-      RepeatFunc funcCopy(std::move(*it));
-      cancelFunction(l, it);
-      addFunctionToHeap(l, std::move(funcCopy));
-      return true;
-    }
+  auto it = functionsMap_.find(nameID);
+
+  if (it != functionsMap_.end() && it->second->isValid()) {
+    auto funcCopy = std::make_unique<RepeatFunc>(std::move(*(it->second)));
+    it->second->cancel();
+    // This will take care of making sure that functionsMap_[it->first] =
+    // funcCopy.
+    addFunctionToHeap(l, std::move(funcCopy));
+    return true;
   }
   return false;
 }
@@ -341,11 +330,11 @@ bool FunctionScheduler::start() {
   auto now = steady_clock::now();
   // Reset the next run time. for all functions.
   // note: this is needed since one can shutdown() and start() again
-  for (auto& f : functions_) {
-    f.resetNextRunTime(now);
-    VLOG(1) << "   - func: " << (f.name.empty() ? "(anon)" : f.name.c_str())
-            << ", period = " << f.intervalDescr
-            << ", delay = " << f.startDelay.count() << "ms";
+  for (const auto& f : functions_) {
+    f->resetNextRunTime(now);
+    VLOG(1) << "   - func: " << (f->name.empty() ? "(anon)" : f->name.c_str())
+            << ", period = " << f->intervalDescr
+            << ", delay = " << f->startDelay.count() << "ms";
   }
   std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
 
@@ -391,12 +380,12 @@ void FunctionScheduler::run() {
 
     // Check to see if the function was cancelled.
     // If so, just remove it and continue around the loop.
-    if (!functions_.back().isValid()) {
+    if (!functions_.back()->isValid()) {
       functions_.pop_back();
       continue;
     }
 
-    auto sleepTime = functions_.back().getNextRunTime() - now;
+    auto sleepTime = functions_.back()->getNextRunTime() - now;
     if (sleepTime < milliseconds::zero()) {
       // We need to run this function now
       runOneFunction(lock, now);
@@ -420,25 +409,24 @@ void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
   // Fully remove it from functions_ now.
   // We need to release mutex_ while we invoke this function, and we need to
   // maintain the heap property on functions_ while mutex_ is unlocked.
-  RepeatFunc func(std::move(functions_.back()));
+  auto func = std::move(functions_.back());
   functions_.pop_back();
-  if (!func.cb) {
-    VLOG(5) << func.name << "function has been canceled while waiting";
+  if (!func->cb) {
+    VLOG(5) << func->name << "function has been canceled while waiting";
     return;
   }
-  currentFunction_ = &func;
-
+  currentFunction_ = func.get();
   // Update the function's next run time.
   if (steady_) {
     // This allows scheduler to catch up
-    func.setNextRunTimeSteady();
+    func->setNextRunTimeSteady();
   } else {
     // Note that we set nextRunTime based on the current time where we started
     // the function call, rather than the time when the function finishes.
     // This ensures that we call the function once every time interval, as
     // opposed to waiting time interval seconds between calls.  (These can be
     // different if the function takes a significant amount of time to run.)
-    func.setNextRunTimeStrict(now);
+    func->setNextRunTimeStrict(now);
   }
 
   // Release the lock while we invoke the user's function
@@ -446,11 +434,11 @@ void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
 
   // Invoke the function
   try {
-    VLOG(5) << "Now running " << func.name;
-    func.cb();
+    VLOG(5) << "Now running " << func->name;
+    func->cb();
   } catch (const std::exception& ex) {
     LOG(ERROR) << "Error running the scheduled function <"
-      << func.name << ">: " << exceptionStr(ex);
+      << func->name << ">: " << exceptionStr(ex);
   }
 
   // Re-acquire the lock
@@ -464,17 +452,19 @@ void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
   }
   if (currentFunction_->runOnce) {
     // Don't reschedule if the function only needed to run once.
+    functionsMap_.erase(currentFunction_->name);
     currentFunction_ = nullptr;
     return;
   }
-  // Clear currentFunction_
-  CHECK_EQ(currentFunction_, &func);
-  currentFunction_ = nullptr;
 
   // Re-insert the function into our functions_ heap.
   // We only maintain the heap property while running_ is set.  (running_ may
   // have been cleared while we were invoking the user's function.)
   functions_.push_back(std::move(func));
+
+  // Clear currentFunction_
+  currentFunction_ = nullptr;
+
   if (running_) {
     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
   }
@@ -482,14 +472,15 @@ void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
 
 void FunctionScheduler::addFunctionToHeap(
     const std::unique_lock<std::mutex>& lock,
-    RepeatFunc&& func) {
+    std::unique_ptr<RepeatFunc> func) {
   // This function should only be called with mutex_ already locked.
   DCHECK(lock.mutex() == &mutex_);
   DCHECK(lock.owns_lock());
 
-  functions_.emplace_back(std::move(func));
+  functions_.push_back(std::move(func));
+  functionsMap_[functions_.back()->name] = functions_.back().get();
   if (running_) {
-    functions_.back().resetNextRunTime(steady_clock::now());
+    functions_.back()->resetNextRunTime(steady_clock::now());
     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
     // Signal the running thread to wake up and see if it needs to change
     // its current scheduling decision.
index 6e5b2198e7946a67d86f17fc996921897d272630..fbc739936550ce9eb1ef6731e31b8a8d8954edc6 100644 (file)
 
 #include <folly/Function.h>
 #include <folly/Range.h>
+#include <folly/Hash.h>
 #include <chrono>
 #include <condition_variable>
 #include <mutex>
 #include <thread>
 #include <vector>
+#include <unordered_map>
 
 namespace folly {
 
@@ -241,20 +243,21 @@ class FunctionScheduler {
   };
 
   struct RunTimeOrder {
-    bool operator()(const RepeatFunc& f1, const RepeatFunc& f2) const {
-      return f1.getNextRunTime() > f2.getNextRunTime();
+    bool operator()(const std::unique_ptr<RepeatFunc>& f1, const std::unique_ptr<RepeatFunc>& f2) const {
+      return f1->getNextRunTime() > f2->getNextRunTime();
     }
   };
 
-  typedef std::vector<RepeatFunc> FunctionHeap;
+  typedef std::vector<std::unique_ptr<RepeatFunc>> FunctionHeap;
+  typedef std::unordered_map<StringPiece, RepeatFunc*, Hash> FunctionMap;
 
   void run();
   void runOneFunction(std::unique_lock<std::mutex>& lock,
                       std::chrono::steady_clock::time_point now);
   void cancelFunction(const std::unique_lock<std::mutex>& lock,
-                      FunctionHeap::iterator it);
+                      RepeatFunc* it);
   void addFunctionToHeap(const std::unique_lock<std::mutex>& lock,
-                         RepeatFunc&& func);
+                         std::unique_ptr<RepeatFunc> func);
 
   void addFunctionInternal(
       Function<void()>&& cb,
@@ -279,6 +282,7 @@ class FunctionScheduler {
   // The functions to run.
   // This is a heap, ordered by next run time.
   FunctionHeap functions_;
+  FunctionMap functionsMap_;
   RunTimeOrder fnCmp_;
 
   // The function currently being invoked by the running thread.