2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/experimental/FunctionScheduler.h>
21 #include <folly/Conv.h>
22 #include <folly/Random.h>
23 #include <folly/String.h>
24 #include <folly/ThreadName.h>
26 using std::chrono::milliseconds;
27 using std::chrono::steady_clock;
33 struct ConstIntervalFunctor {
34 const milliseconds constInterval;
36 explicit ConstIntervalFunctor(milliseconds interval)
37 : constInterval(interval) {
38 if (interval < milliseconds::zero()) {
39 throw std::invalid_argument(
41 "time interval must be non-negative");
45 milliseconds operator()() const { return constInterval; }
48 struct PoissonDistributionFunctor {
49 std::default_random_engine generator;
50 std::poisson_distribution<int> poissonRandom;
52 explicit PoissonDistributionFunctor(double meanPoissonMs)
53 : poissonRandom(meanPoissonMs) {
54 if (meanPoissonMs < 0.0) {
55 throw std::invalid_argument(
57 "Poisson mean interval must be non-negative");
61 milliseconds operator()() { return milliseconds(poissonRandom(generator)); }
64 struct UniformDistributionFunctor {
65 std::default_random_engine generator;
66 std::uniform_int_distribution<> dist;
68 UniformDistributionFunctor(milliseconds minInterval, milliseconds maxInterval)
69 : generator(Random::rand32()),
70 dist(minInterval.count(), maxInterval.count()) {
71 if (minInterval > maxInterval) {
72 throw std::invalid_argument(
74 "min time interval must be less or equal than max interval");
76 if (minInterval < milliseconds::zero()) {
77 throw std::invalid_argument(
79 "time interval must be non-negative");
83 milliseconds operator()() { return milliseconds(dist(generator)); }
86 } // anonymous namespace
88 FunctionScheduler::FunctionScheduler() {}
90 FunctionScheduler::~FunctionScheduler() {
91 // make sure to stop the thread (if running)
95 void FunctionScheduler::addFunction(const std::function<void()>& cb,
96 milliseconds interval,
98 milliseconds startDelay) {
99 addFunctionGenericDistribution(
101 IntervalDistributionFunc(ConstIntervalFunctor(interval)),
103 to<std::string>(interval.count(), "ms"),
107 void FunctionScheduler::addFunction(const std::function<void()>& cb,
108 milliseconds interval,
109 const LatencyDistribution& latencyDistr,
111 milliseconds startDelay) {
112 if (latencyDistr.isPoisson) {
113 addFunctionGenericDistribution(
115 IntervalDistributionFunc(
116 PoissonDistributionFunctor(latencyDistr.poissonMean)),
118 to<std::string>(latencyDistr.poissonMean, "ms (Poisson mean)"),
121 addFunction(cb, interval, nameID, startDelay);
125 void FunctionScheduler::addFunctionUniformDistribution(
126 const std::function<void()>& cb,
127 milliseconds minInterval,
128 milliseconds maxInterval,
130 milliseconds startDelay) {
131 addFunctionGenericDistribution(
133 IntervalDistributionFunc(
134 UniformDistributionFunctor(minInterval, maxInterval)),
137 "[", minInterval.count(), " , ", maxInterval.count(), "] ms"),
141 void FunctionScheduler::addFunctionGenericDistribution(
142 const std::function<void()>& cb,
143 const IntervalDistributionFunc& intervalFunc,
144 const std::string& nameID,
145 const std::string& intervalDescr,
146 milliseconds startDelay) {
148 throw std::invalid_argument(
149 "FunctionScheduler: Scheduled function must be set");
152 throw std::invalid_argument(
153 "FunctionScheduler: interval distribution function must be set");
155 if (startDelay < milliseconds::zero()) {
156 throw std::invalid_argument(
157 "FunctionScheduler: start delay must be non-negative");
160 std::unique_lock<std::mutex> l(mutex_);
161 // check if the nameID is unique
162 for (const auto& f : functions_) {
163 if (f.isValid() && f.name == nameID) {
164 throw std::invalid_argument(
165 to<std::string>("FunctionScheduler: a function named \"",
167 "\" already exists"));
170 if (currentFunction_ && currentFunction_->name == nameID) {
171 throw std::invalid_argument(to<std::string>(
172 "FunctionScheduler: a function named \"", nameID, "\" already exists"));
176 l, RepeatFunc(cb, intervalFunc, nameID, intervalDescr, startDelay));
179 bool FunctionScheduler::cancelFunction(StringPiece nameID) {
180 std::unique_lock<std::mutex> l(mutex_);
182 if (currentFunction_ && currentFunction_->name == nameID) {
183 // This function is currently being run. Clear currentFunction_
184 // The running thread will see this and won't reschedule the function.
185 currentFunction_ = nullptr;
189 for (auto it = functions_.begin(); it != functions_.end(); ++it) {
190 if (it->isValid() && it->name == nameID) {
191 cancelFunction(l, it);
198 void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l,
199 FunctionHeap::iterator it) {
200 // This function should only be called with mutex_ already locked.
201 DCHECK(l.mutex() == &mutex_);
202 DCHECK(l.owns_lock());
205 // Internally gcc has an __adjust_heap() function to fill in a hole in the
206 // heap. Unfortunately it isn't part of the standard API.
208 // For now we just leave the RepeatFunc in our heap, but mark it as unused.
209 // When its nextTimeInterval comes up, the runner thread will pop it from
210 // the heap and simply throw it away.
213 // We're not running, so functions_ doesn't need to be maintained in heap
215 functions_.erase(it);
219 void FunctionScheduler::cancelAllFunctions() {
220 std::unique_lock<std::mutex> l(mutex_);
224 bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) {
225 std::unique_lock<std::mutex> l(mutex_);
226 if (currentFunction_ && currentFunction_->name == nameID) {
227 RepeatFunc* funcPtrCopy = currentFunction_;
228 // This function is currently being run. Clear currentFunction_
229 // to avoid rescheduling it, and add the function again to honor the
231 currentFunction_ = nullptr;
232 addFunctionToHeap(l, std::move(*funcPtrCopy));
236 // Since __adjust_heap() isn't a part of the standard API, there's no way to
237 // fix the heap ordering if we adjust the key (nextRunTime) for the existing
238 // RepeatFunc. Instead, we just cancel it and add an identical object.
239 for (auto it = functions_.begin(); it != functions_.end(); ++it) {
240 if (it->isValid() && it->name == nameID) {
241 RepeatFunc funcCopy(std::move(*it));
242 cancelFunction(l, it);
243 addFunctionToHeap(l, std::move(funcCopy));
250 bool FunctionScheduler::start() {
251 std::unique_lock<std::mutex> l(mutex_);
258 VLOG(1) << "Starting FunctionScheduler with " << functions_.size()
260 auto now = steady_clock::now();
261 // Reset the next run time. for all functions.
262 // note: this is needed since one can shutdown() and start() again
263 for (auto& f : functions_) {
264 f.resetNextRunTime(now);
265 VLOG(1) << " - func: " << (f.name.empty() ? "(anon)" : f.name.c_str())
266 << ", period = " << f.intervalDescr
267 << ", delay = " << f.startDelay.count() << "ms";
269 std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
271 thread_ = std::thread([&] { this->run(); });
275 void FunctionScheduler::shutdown() {
277 std::lock_guard<std::mutex> g(mutex_);
283 runningCondvar_.notify_one();
288 void FunctionScheduler::run() {
289 std::unique_lock<std::mutex> lock(mutex_);
291 if (!threadName_.empty()) {
292 folly::setThreadName(threadName_);
296 // If we have nothing to run, wait until a function is added or until we
298 if (functions_.empty()) {
299 runningCondvar_.wait(lock);
303 auto now = steady_clock::now();
305 // Move the next function to run to the end of functions_
306 std::pop_heap(functions_.begin(), functions_.end(), fnCmp_);
308 // Check to see if the function was cancelled.
309 // If so, just remove it and continue around the loop.
310 if (!functions_.back().isValid()) {
311 functions_.pop_back();
315 auto sleepTime = functions_.back().getNextRunTime() - now;
316 if (sleepTime < milliseconds::zero()) {
317 // We need to run this function now
318 runOneFunction(lock, now);
320 // Re-add the function to the heap, and wait until we actually
322 std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
323 runningCondvar_.wait_for(lock, sleepTime);
328 void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
329 steady_clock::time_point now) {
330 DCHECK(lock.mutex() == &mutex_);
331 DCHECK(lock.owns_lock());
333 // The function to run will be at the end of functions_ already.
335 // Fully remove it from functions_ now.
336 // We need to release mutex_ while we invoke this function, and we need to
337 // maintain the heap property on functions_ while mutex_ is unlocked.
338 RepeatFunc func(std::move(functions_.back()));
339 functions_.pop_back();
341 VLOG(5) << func.name << "function has been canceled while waiting";
344 currentFunction_ = &func;
346 // Update the function's next run time.
348 // This allows scheduler to catch up
349 func.setNextRunTimeSteady();
351 // Note that we set nextRunTime based on the current time where we started
352 // the function call, rather than the time when the function finishes.
353 // This ensures that we call the function once every time interval, as
354 // opposed to waiting time interval seconds between calls. (These can be
355 // different if the function takes a significant amount of time to run.)
356 func.setNextRunTimeStrict(now);
359 // Release the lock while we invoke the user's function
362 // Invoke the function
364 VLOG(5) << "Now running " << func.name;
366 } catch (const std::exception& ex) {
367 LOG(ERROR) << "Error running the scheduled function <"
368 << func.name << ">: " << exceptionStr(ex);
371 // Re-acquire the lock
374 if (!currentFunction_) {
375 // The function was cancelled while we were running it.
376 // We shouldn't reschedule it;
379 // Clear currentFunction_
380 CHECK_EQ(currentFunction_, &func);
381 currentFunction_ = nullptr;
383 // Re-insert the function into our functions_ heap.
384 // We only maintain the heap property while running_ is set. (running_ may
385 // have been cleared while we were invoking the user's function.)
386 functions_.push_back(std::move(func));
388 std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
392 void FunctionScheduler::addFunctionToHeap(
393 const std::unique_lock<std::mutex>& lock,
395 // This function should only be called with mutex_ already locked.
396 DCHECK(lock.mutex() == &mutex_);
397 DCHECK(lock.owns_lock());
399 functions_.emplace_back(std::move(func));
401 functions_.back().resetNextRunTime(steady_clock::now());
402 std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
403 // Signal the running thread to wake up and see if it needs to change
404 // its current scheduling decision.
405 runningCondvar_.notify_one();
409 void FunctionScheduler::setThreadName(StringPiece threadName) {
410 std::unique_lock<std::mutex> l(mutex_);
411 threadName_ = threadName.str();