*/
#include <folly/experimental/FunctionScheduler.h>
-#include <folly/ThreadName.h>
+
+#include <random>
+
#include <folly/Conv.h>
+#include <folly/Random.h>
#include <folly/String.h>
+#include <folly/ThreadName.h>
-using namespace std;
using std::chrono::milliseconds;
using std::chrono::steady_clock;
namespace folly {
-FunctionScheduler::FunctionScheduler() {
-}
+namespace {
+
+struct ConstIntervalFunctor {
+ const milliseconds constInterval;
+
+ explicit ConstIntervalFunctor(milliseconds interval)
+ : constInterval(interval) {
+ if (interval < milliseconds::zero()) {
+ throw std::invalid_argument(
+ "FunctionScheduler: "
+ "time interval must be non-negative");
+ }
+ }
+
+ milliseconds operator()() const { return constInterval; }
+};
+
+struct PoissonDistributionFunctor {
+ std::default_random_engine generator;
+ std::poisson_distribution<int> poissonRandom;
+
+ explicit PoissonDistributionFunctor(double meanPoissonMs)
+ : poissonRandom(meanPoissonMs) {
+ if (meanPoissonMs < 0.0) {
+ throw std::invalid_argument(
+ "FunctionScheduler: "
+ "Poisson mean interval must be non-negative");
+ }
+ }
+
+ milliseconds operator()() { return milliseconds(poissonRandom(generator)); }
+};
+
+struct UniformDistributionFunctor {
+ std::default_random_engine generator;
+ std::uniform_int_distribution<> dist;
+
+ UniformDistributionFunctor(milliseconds minInterval, milliseconds maxInterval)
+ : generator(Random::rand32()),
+ dist(minInterval.count(), maxInterval.count()) {
+ if (minInterval > maxInterval) {
+ throw std::invalid_argument(
+ "FunctionScheduler: "
+ "min time interval must be less or equal than max interval");
+ }
+ if (minInterval < milliseconds::zero()) {
+ throw std::invalid_argument(
+ "FunctionScheduler: "
+ "time interval must be non-negative");
+ }
+ }
+
+ milliseconds operator()() { return milliseconds(dist(generator)); }
+};
+
+} // anonymous namespace
+
+FunctionScheduler::FunctionScheduler() {}
FunctionScheduler::~FunctionScheduler() {
// make sure to stop the thread (if running)
milliseconds interval,
StringPiece nameID,
milliseconds startDelay) {
- LatencyDistribution latencyDistr(false, 0.0);
- addFunction(cb, interval, latencyDistr, nameID, startDelay);
+ addFunctionGenericDistribution(
+ cb,
+ IntervalDistributionFunc(ConstIntervalFunctor(interval)),
+ nameID.str(),
+ to<std::string>(interval.count(), "ms"),
+ startDelay);
}
void FunctionScheduler::addFunction(const std::function<void()>& cb,
const LatencyDistribution& latencyDistr,
StringPiece nameID,
milliseconds startDelay) {
- if (interval < milliseconds::zero()) {
- throw std::invalid_argument("FunctionScheduler: "
- "time interval must be non-negative");
+ if (latencyDistr.isPoisson) {
+ addFunctionGenericDistribution(
+ cb,
+ IntervalDistributionFunc(
+ PoissonDistributionFunctor(latencyDistr.poissonMean)),
+ nameID.str(),
+ to<std::string>(latencyDistr.poissonMean, "ms (Poisson mean)"),
+ startDelay);
+ } else {
+ addFunction(cb, interval, nameID, startDelay);
+ }
+}
+
+void FunctionScheduler::addFunctionUniformDistribution(
+ const std::function<void()>& cb,
+ milliseconds minInterval,
+ milliseconds maxInterval,
+ StringPiece nameID,
+ milliseconds startDelay) {
+ addFunctionGenericDistribution(
+ cb,
+ IntervalDistributionFunc(
+ UniformDistributionFunctor(minInterval, maxInterval)),
+ nameID.str(),
+ to<std::string>(
+ "[", minInterval.count(), " , ", maxInterval.count(), "] ms"),
+ startDelay);
+}
+
+void FunctionScheduler::addFunctionGenericDistribution(
+ const std::function<void()>& cb,
+ const IntervalDistributionFunc& intervalFunc,
+ const std::string& nameID,
+ const std::string& intervalDescr,
+ milliseconds startDelay) {
+ if (!cb) {
+ throw std::invalid_argument(
+ "FunctionScheduler: Scheduled function must be set");
+ }
+ if (!intervalFunc) {
+ throw std::invalid_argument(
+ "FunctionScheduler: interval distribution function must be set");
}
if (startDelay < milliseconds::zero()) {
- throw std::invalid_argument("FunctionScheduler: "
- "start delay must be non-negative");
+ throw std::invalid_argument(
+ "FunctionScheduler: start delay must be non-negative");
}
std::lock_guard<std::mutex> l(mutex_);
// check if the nameID is unique
for (const auto& f : functions_) {
if (f.isValid() && f.name == nameID) {
- throw std::invalid_argument(to<string>(
- "FunctionScheduler: a function named \"", nameID,
- "\" already exists"));
+ throw std::invalid_argument(
+ to<std::string>("FunctionScheduler: a function named \"",
+ nameID,
+ "\" already exists"));
}
}
if (currentFunction_ && currentFunction_->name == nameID) {
- throw std::invalid_argument(to<string>(
- "FunctionScheduler: a function named \"", nameID,
- "\" already exists"));
+ throw std::invalid_argument(to<std::string>(
+ "FunctionScheduler: a function named \"", nameID, "\" already exists"));
}
- functions_.emplace_back(cb, interval, nameID.str(), startDelay,
- latencyDistr.isPoisson, latencyDistr.poissonMean);
+ functions_.emplace_back(cb, intervalFunc, nameID, intervalDescr, startDelay);
if (running_) {
- functions_.back().setNextRunTime(steady_clock::now() + startDelay);
+ functions_.back().resetNextRunTime(steady_clock::now());
std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
// Signal the running thread to wake up and see if it needs to change it's
// current scheduling decision.
// Reset the next run time. for all functions.
// note: this is needed since one can shutdown() and start() again
for (auto& f : functions_) {
- f.setNextRunTime(now + f.startDelay);
- VLOG(1) << " - func: "
- << (f.name.empty() ? "(anon)" : f.name.c_str())
- << ", period = " << f.timeInterval.count()
- << "ms, delay = " << f.startDelay.count() << "ms";
+ f.resetNextRunTime(now);
+ VLOG(1) << " - func: " << (f.name.empty() ? "(anon)" : f.name.c_str())
+ << ", period = " << f.intervalDescr
+ << ", delay = " << f.startDelay.count() << "ms";
}
std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
// maintain the heap property on functions_ while mutex_ is unlocked.
RepeatFunc func(std::move(functions_.back()));
functions_.pop_back();
+ if (!func.cb) {
+ VLOG(5) << func.name << "function has been canceled while waiting";
+ return;
+ }
currentFunction_ = &func;
- // Update the function's run time, and re-insert it into the heap.
+ // Update the function's next run time.
if (steady_) {
// This allows scheduler to catch up
- func.lastRunTime += func.timeInterval;
+ func.setNextRunTimeSteady();
} else {
- // Note that we adjust lastRunTime to the current time where we started the
- // function call, rather than the time when the function finishes.
+ // Note that we set nextRunTime based on the current time where we started
+ // the function call, rather than the time when the function finishes.
// This ensures that we call the function once every time interval, as
// opposed to waiting time interval seconds between calls. (These can be
// different if the function takes a significant amount of time to run.)
- func.lastRunTime = now;
+ func.setNextRunTimeStrict(now);
}
// Release the lock while we invoke the user's function
// Re-insert the function into our functions_ heap.
// We only maintain the heap property while running_ is set. (running_ may
// have been cleared while we were invoking the user's function.)
- if (func.isPoissonDistr) {
- func.setTimeIntervalPoissonDistr();
- }
functions_.push_back(std::move(func));
if (running_) {
std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
#include <mutex>
#include <thread>
#include <vector>
-#include <random>
namespace folly {
* Add a new function to the FunctionScheduler with a specified
* LatencyDistribution
*/
- void addFunction(const std::function<void()>& cb,
- std::chrono::milliseconds interval,
- const LatencyDistribution& latencyDistr,
- StringPiece nameID = StringPiece(),
- std::chrono::milliseconds startDelay =
- std::chrono::milliseconds(0));
+ void addFunction(
+ const std::function<void()>& cb,
+ std::chrono::milliseconds interval,
+ const LatencyDistribution& latencyDistr,
+ StringPiece nameID = StringPiece(),
+ std::chrono::milliseconds startDelay = std::chrono::milliseconds(0));
+
+ /**
+ * Add a new function to the FunctionScheduler with the time
+ * interval being distributed uniformly within the given interval
+ * [minInterval, maxInterval].
+ */
+ void addFunctionUniformDistribution(const std::function<void()>& cb,
+ std::chrono::milliseconds minInterval,
+ std::chrono::milliseconds maxInterval,
+ StringPiece nameID,
+ std::chrono::milliseconds startDelay);
+
+ /**
+ * A type alias for function that is called to determine the time
+ * interval for the next scheduled run.
+ */
+ using IntervalDistributionFunc = std::function<std::chrono::milliseconds()>;
+
+ /**
+ * Add a new function to the FunctionScheduler. The scheduling interval
+ * is determined by the interval distribution functor, which is called
+ * every time the next function execution is scheduled. This allows
+ * for supporting custom interval distribution algorithms in addition
+ * to built in constant interval; and Poisson and jitter distributions
+ * (@see FunctionScheduler::addFunction and
+ * @see FunctionScheduler::addFunctionJitterInterval).
+ */
+ void addFunctionGenericDistribution(
+ const std::function<void()>& cb,
+ const IntervalDistributionFunc& intervalFunc,
+ const std::string& nameID,
+ const std::string& intervalDescr,
+ std::chrono::milliseconds startDelay);
/**
* Cancels the function with the specified name, so it will no longer be run.
*/
void setThreadName(StringPiece threadName);
-
private:
struct RepeatFunc {
std::function<void()> cb;
- std::chrono::milliseconds timeInterval;
- std::chrono::steady_clock::time_point lastRunTime;
+ IntervalDistributionFunc intervalFunc;
+ std::chrono::steady_clock::time_point nextRunTime;
std::string name;
std::chrono::milliseconds startDelay;
- bool isPoissonDistr;
- std::default_random_engine generator;
- std::poisson_distribution<int> poisson_random;
+ std::string intervalDescr;
RepeatFunc(const std::function<void()>& cback,
- std::chrono::milliseconds interval,
+ const IntervalDistributionFunc& intervalFn,
const std::string& nameID,
- std::chrono::milliseconds delay,
- bool poisson = false,
- double meanPoisson = 1.0)
- : cb(cback),
- timeInterval(interval),
- lastRunTime(),
- name(nameID),
- startDelay(delay),
- isPoissonDistr(poisson),
- poisson_random(meanPoisson) {
- }
+ const std::string& intervalDistDescription,
+ std::chrono::milliseconds delay)
+ : cb(cback),
+ intervalFunc(intervalFn),
+ nextRunTime(),
+ name(nameID),
+ startDelay(delay),
+ intervalDescr(intervalDistDescription) {}
std::chrono::steady_clock::time_point getNextRunTime() const {
- return lastRunTime + timeInterval;
+ return nextRunTime;
}
- void setNextRunTime(std::chrono::steady_clock::time_point time) {
- lastRunTime = time - timeInterval;
+ void setNextRunTimeStrict(std::chrono::steady_clock::time_point curTime) {
+ nextRunTime = curTime + intervalFunc();
}
- void setTimeIntervalPoissonDistr() {
- if (isPoissonDistr) {
- timeInterval = std::chrono::milliseconds(poisson_random(generator));
- }
+ void setNextRunTimeSteady() { nextRunTime += intervalFunc(); }
+ void resetNextRunTime(std::chrono::steady_clock::time_point curTime) {
+ nextRunTime = curTime + startDelay;
}
void cancel() {
// Simply reset cb to an empty function.
cb = std::function<void()>();
}
- bool isValid() const {
- return bool(cb);
- }
+ bool isValid() const { return bool(cb); }
};
+
struct RunTimeOrder {
bool operator()(const RepeatFunc& f1, const RepeatFunc& f2) const {
return f1.getNextRunTime() > f2.getNextRunTime();
}
};
+
typedef std::vector<RepeatFunc> FunctionHeap;
void run();
void runOneFunction(std::unique_lock<std::mutex>& lock,
std::chrono::steady_clock::time_point now);
- void cancelFunction(const std::unique_lock<std::mutex> &lock,
+ void cancelFunction(const std::unique_lock<std::mutex>& lock,
FunctionHeap::iterator it);
std::thread thread_;
*/
#include <folly/experimental/FunctionScheduler.h>
+#include <algorithm>
#include <atomic>
+#include <cassert>
+#include <random>
+#include <folly/Random.h>
#include <gtest/gtest.h>
using namespace folly;
* to run.
*/
static const auto timeFactor = std::chrono::milliseconds(100);
-std::chrono::milliseconds testInterval(int n) {
- return n * timeFactor;
+std::chrono::milliseconds testInterval(int n) { return n * timeFactor; }
+int getTicksWithinRange(int n, int min, int max) {
+ assert(min <= max);
+ n = std::max(min, n);
+ n = std::min(max, n);
+ return n;
}
void delay(int n) {
std::chrono::microseconds usec(n * timeFactor);
// enough to catch back up to schedule
EXPECT_NEAR(100, ticks.load(), 10);
}
+
+TEST(FunctionScheduler, UniformDistribution) {
+ int total = 0;
+ const int kTicks = 2;
+ std::chrono::milliseconds minInterval =
+ testInterval(kTicks) - (timeFactor / 5);
+ std::chrono::milliseconds maxInterval =
+ testInterval(kTicks) + (timeFactor / 5);
+ FunctionScheduler fs;
+ fs.addFunctionUniformDistribution([&] { total += 2; },
+ minInterval,
+ maxInterval,
+ "UniformDistribution",
+ std::chrono::milliseconds(0));
+ fs.start();
+ delay(1);
+ EXPECT_EQ(2, total);
+ delay(kTicks);
+ EXPECT_EQ(4, total);
+ delay(kTicks);
+ EXPECT_EQ(6, total);
+ fs.shutdown();
+ delay(2);
+ EXPECT_EQ(6, total);
+}
+
+TEST(FunctionScheduler, ExponentialBackoff) {
+ int total = 0;
+ int expectedInterval = 0;
+ int nextInterval = 2;
+ FunctionScheduler fs;
+ fs.addFunctionGenericDistribution(
+ [&] { total += 2; },
+ [&expectedInterval, nextInterval]() mutable {
+ expectedInterval = nextInterval;
+ nextInterval *= nextInterval;
+ return testInterval(expectedInterval);
+ },
+ "ExponentialBackoff",
+ "2^n * 100ms",
+ std::chrono::milliseconds(0));
+ fs.start();
+ delay(1);
+ EXPECT_EQ(2, total);
+ delay(expectedInterval);
+ EXPECT_EQ(4, total);
+ delay(expectedInterval);
+ EXPECT_EQ(6, total);
+ fs.shutdown();
+ delay(2);
+ EXPECT_EQ(6, total);
+}
+
+TEST(FunctionScheduler, GammaIntervalDistribution) {
+ int total = 0;
+ int expectedInterval = 0;
+ FunctionScheduler fs;
+ std::default_random_engine generator(folly::Random::rand32());
+ // The alpha and beta arguments are selected, somewhat randomly, to be 2.0.
+ // These values do not matter much in this test, as we are not testing the
+ // std::gamma_distribution itself...
+ std::gamma_distribution<double> gamma(2.0, 2.0);
+ fs.addFunctionGenericDistribution(
+ [&] { total += 2; },
+ [&expectedInterval, generator, gamma]() mutable {
+ expectedInterval =
+ getTicksWithinRange(static_cast<int>(gamma(generator)), 2, 10);
+ return testInterval(expectedInterval);
+ },
+ "GammaDistribution",
+ "gamma(2.0,2.0)*100ms",
+ std::chrono::milliseconds(0));
+ fs.start();
+ delay(1);
+ EXPECT_EQ(2, total);
+ delay(expectedInterval);
+ EXPECT_EQ(4, total);
+ delay(expectedInterval);
+ EXPECT_EQ(6, total);
+ fs.shutdown();
+ delay(2);
+ EXPECT_EQ(6, total);
+}