From: Eugene Pekurovsky Date: Wed, 9 Sep 2015 19:33:59 +0000 (-0700) Subject: folly::FunctionScheduler: Adding support for uniform interval distribution X-Git-Tag: deprecate-dynamic-initializer~430 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=3e6ccd5c48456a86f19e1f3022545b3a2b52786e;p=folly.git folly::FunctionScheduler: Adding support for uniform interval distribution Summary: 1) Added uniform interval distribution functionality. 2) Added a generic API for custom interval distribution algorithms. 3) Fixed an issue with removing a canceled function. 4) Did some code cleanup along the way. Reviewed By: @​kaanb Differential Revision: D2339911 --- diff --git a/folly/experimental/FunctionScheduler.cpp b/folly/experimental/FunctionScheduler.cpp index e9ea7543..69af28ab 100644 --- a/folly/experimental/FunctionScheduler.cpp +++ b/folly/experimental/FunctionScheduler.cpp @@ -15,18 +15,77 @@ */ #include -#include + +#include + #include +#include #include +#include -using namespace std; using std::chrono::milliseconds; using std::chrono::steady_clock; namespace folly { -FunctionScheduler::FunctionScheduler() { -} +namespace { + +struct ConstIntervalFunctor { + const milliseconds constInterval; + + explicit ConstIntervalFunctor(milliseconds interval) + : constInterval(interval) { + if (interval < milliseconds::zero()) { + throw std::invalid_argument( + "FunctionScheduler: " + "time interval must be non-negative"); + } + } + + milliseconds operator()() const { return constInterval; } +}; + +struct PoissonDistributionFunctor { + std::default_random_engine generator; + std::poisson_distribution poissonRandom; + + explicit PoissonDistributionFunctor(double meanPoissonMs) + : poissonRandom(meanPoissonMs) { + if (meanPoissonMs < 0.0) { + throw std::invalid_argument( + "FunctionScheduler: " + "Poisson mean interval must be non-negative"); + } + } + + milliseconds operator()() { return milliseconds(poissonRandom(generator)); } +}; + +struct UniformDistributionFunctor { + std::default_random_engine generator; + std::uniform_int_distribution<> dist; + + UniformDistributionFunctor(milliseconds minInterval, milliseconds maxInterval) + : generator(Random::rand32()), + dist(minInterval.count(), maxInterval.count()) { + if (minInterval > maxInterval) { + throw std::invalid_argument( + "FunctionScheduler: " + "min time interval must be less or equal than max interval"); + } + if (minInterval < milliseconds::zero()) { + throw std::invalid_argument( + "FunctionScheduler: " + "time interval must be non-negative"); + } + } + + milliseconds operator()() { return milliseconds(dist(generator)); } +}; + +} // anonymous namespace + +FunctionScheduler::FunctionScheduler() {} FunctionScheduler::~FunctionScheduler() { // make sure to stop the thread (if running) @@ -37,8 +96,12 @@ void FunctionScheduler::addFunction(const std::function& cb, milliseconds interval, StringPiece nameID, milliseconds startDelay) { - LatencyDistribution latencyDistr(false, 0.0); - addFunction(cb, interval, latencyDistr, nameID, startDelay); + addFunctionGenericDistribution( + cb, + IntervalDistributionFunc(ConstIntervalFunctor(interval)), + nameID.str(), + to(interval.count(), "ms"), + startDelay); } void FunctionScheduler::addFunction(const std::function& cb, @@ -46,34 +109,72 @@ void FunctionScheduler::addFunction(const std::function& cb, const LatencyDistribution& latencyDistr, StringPiece nameID, milliseconds startDelay) { - if (interval < milliseconds::zero()) { - throw std::invalid_argument("FunctionScheduler: " - "time interval must be non-negative"); + if (latencyDistr.isPoisson) { + addFunctionGenericDistribution( + cb, + IntervalDistributionFunc( + PoissonDistributionFunctor(latencyDistr.poissonMean)), + nameID.str(), + to(latencyDistr.poissonMean, "ms (Poisson mean)"), + startDelay); + } else { + addFunction(cb, interval, nameID, startDelay); + } +} + +void FunctionScheduler::addFunctionUniformDistribution( + const std::function& cb, + milliseconds minInterval, + milliseconds maxInterval, + StringPiece nameID, + milliseconds startDelay) { + addFunctionGenericDistribution( + cb, + IntervalDistributionFunc( + UniformDistributionFunctor(minInterval, maxInterval)), + nameID.str(), + to( + "[", minInterval.count(), " , ", maxInterval.count(), "] ms"), + startDelay); +} + +void FunctionScheduler::addFunctionGenericDistribution( + const std::function& cb, + const IntervalDistributionFunc& intervalFunc, + const std::string& nameID, + const std::string& intervalDescr, + milliseconds startDelay) { + if (!cb) { + throw std::invalid_argument( + "FunctionScheduler: Scheduled function must be set"); + } + if (!intervalFunc) { + throw std::invalid_argument( + "FunctionScheduler: interval distribution function must be set"); } if (startDelay < milliseconds::zero()) { - throw std::invalid_argument("FunctionScheduler: " - "start delay must be non-negative"); + throw std::invalid_argument( + "FunctionScheduler: start delay must be non-negative"); } std::lock_guard l(mutex_); // check if the nameID is unique for (const auto& f : functions_) { if (f.isValid() && f.name == nameID) { - throw std::invalid_argument(to( - "FunctionScheduler: a function named \"", nameID, - "\" already exists")); + throw std::invalid_argument( + to("FunctionScheduler: a function named \"", + nameID, + "\" already exists")); } } if (currentFunction_ && currentFunction_->name == nameID) { - throw std::invalid_argument(to( - "FunctionScheduler: a function named \"", nameID, - "\" already exists")); + throw std::invalid_argument(to( + "FunctionScheduler: a function named \"", nameID, "\" already exists")); } - functions_.emplace_back(cb, interval, nameID.str(), startDelay, - latencyDistr.isPoisson, latencyDistr.poissonMean); + functions_.emplace_back(cb, intervalFunc, nameID, intervalDescr, startDelay); if (running_) { - functions_.back().setNextRunTime(steady_clock::now() + startDelay); + functions_.back().resetNextRunTime(steady_clock::now()); std::push_heap(functions_.begin(), functions_.end(), fnCmp_); // Signal the running thread to wake up and see if it needs to change it's // current scheduling decision. @@ -140,11 +241,10 @@ bool FunctionScheduler::start() { // Reset the next run time. for all functions. // note: this is needed since one can shutdown() and start() again for (auto& f : functions_) { - f.setNextRunTime(now + f.startDelay); - VLOG(1) << " - func: " - << (f.name.empty() ? "(anon)" : f.name.c_str()) - << ", period = " << f.timeInterval.count() - << "ms, delay = " << f.startDelay.count() << "ms"; + f.resetNextRunTime(now); + VLOG(1) << " - func: " << (f.name.empty() ? "(anon)" : f.name.c_str()) + << ", period = " << f.intervalDescr + << ", delay = " << f.startDelay.count() << "ms"; } std::make_heap(functions_.begin(), functions_.end(), fnCmp_); @@ -217,19 +317,23 @@ void FunctionScheduler::runOneFunction(std::unique_lock& lock, // maintain the heap property on functions_ while mutex_ is unlocked. RepeatFunc func(std::move(functions_.back())); functions_.pop_back(); + if (!func.cb) { + VLOG(5) << func.name << "function has been canceled while waiting"; + return; + } currentFunction_ = &func; - // Update the function's run time, and re-insert it into the heap. + // Update the function's next run time. if (steady_) { // This allows scheduler to catch up - func.lastRunTime += func.timeInterval; + func.setNextRunTimeSteady(); } else { - // Note that we adjust lastRunTime to the current time where we started the - // function call, rather than the time when the function finishes. + // Note that we set nextRunTime based on the current time where we started + // the function call, rather than the time when the function finishes. // This ensures that we call the function once every time interval, as // opposed to waiting time interval seconds between calls. (These can be // different if the function takes a significant amount of time to run.) - func.lastRunTime = now; + func.setNextRunTimeStrict(now); } // Release the lock while we invoke the user's function @@ -259,9 +363,6 @@ void FunctionScheduler::runOneFunction(std::unique_lock& lock, // Re-insert the function into our functions_ heap. // We only maintain the heap property while running_ is set. (running_ may // have been cleared while we were invoking the user's function.) - if (func.isPoissonDistr) { - func.setTimeIntervalPoissonDistr(); - } functions_.push_back(std::move(func)); if (running_) { std::push_heap(functions_.begin(), functions_.end(), fnCmp_); diff --git a/folly/experimental/FunctionScheduler.h b/folly/experimental/FunctionScheduler.h index 1e8802fe..5ccbbcd5 100644 --- a/folly/experimental/FunctionScheduler.h +++ b/folly/experimental/FunctionScheduler.h @@ -23,7 +23,6 @@ #include #include #include -#include namespace folly { @@ -104,12 +103,45 @@ class FunctionScheduler { * Add a new function to the FunctionScheduler with a specified * LatencyDistribution */ - void addFunction(const std::function& cb, - std::chrono::milliseconds interval, - const LatencyDistribution& latencyDistr, - StringPiece nameID = StringPiece(), - std::chrono::milliseconds startDelay = - std::chrono::milliseconds(0)); + void addFunction( + const std::function& cb, + std::chrono::milliseconds interval, + const LatencyDistribution& latencyDistr, + StringPiece nameID = StringPiece(), + std::chrono::milliseconds startDelay = std::chrono::milliseconds(0)); + + /** + * Add a new function to the FunctionScheduler with the time + * interval being distributed uniformly within the given interval + * [minInterval, maxInterval]. + */ + void addFunctionUniformDistribution(const std::function& cb, + std::chrono::milliseconds minInterval, + std::chrono::milliseconds maxInterval, + StringPiece nameID, + std::chrono::milliseconds startDelay); + + /** + * A type alias for function that is called to determine the time + * interval for the next scheduled run. + */ + using IntervalDistributionFunc = std::function; + + /** + * Add a new function to the FunctionScheduler. The scheduling interval + * is determined by the interval distribution functor, which is called + * every time the next function execution is scheduled. This allows + * for supporting custom interval distribution algorithms in addition + * to built in constant interval; and Poisson and jitter distributions + * (@see FunctionScheduler::addFunction and + * @see FunctionScheduler::addFunctionJitterInterval). + */ + void addFunctionGenericDistribution( + const std::function& cb, + const IntervalDistributionFunc& intervalFunc, + const std::string& nameID, + const std::string& intervalDescr, + std::chrono::milliseconds startDelay); /** * Cancels the function with the specified name, so it will no longer be run. @@ -142,63 +174,56 @@ class FunctionScheduler { */ void setThreadName(StringPiece threadName); - private: struct RepeatFunc { std::function cb; - std::chrono::milliseconds timeInterval; - std::chrono::steady_clock::time_point lastRunTime; + IntervalDistributionFunc intervalFunc; + std::chrono::steady_clock::time_point nextRunTime; std::string name; std::chrono::milliseconds startDelay; - bool isPoissonDistr; - std::default_random_engine generator; - std::poisson_distribution poisson_random; + std::string intervalDescr; RepeatFunc(const std::function& cback, - std::chrono::milliseconds interval, + const IntervalDistributionFunc& intervalFn, const std::string& nameID, - std::chrono::milliseconds delay, - bool poisson = false, - double meanPoisson = 1.0) - : cb(cback), - timeInterval(interval), - lastRunTime(), - name(nameID), - startDelay(delay), - isPoissonDistr(poisson), - poisson_random(meanPoisson) { - } + const std::string& intervalDistDescription, + std::chrono::milliseconds delay) + : cb(cback), + intervalFunc(intervalFn), + nextRunTime(), + name(nameID), + startDelay(delay), + intervalDescr(intervalDistDescription) {} std::chrono::steady_clock::time_point getNextRunTime() const { - return lastRunTime + timeInterval; + return nextRunTime; } - void setNextRunTime(std::chrono::steady_clock::time_point time) { - lastRunTime = time - timeInterval; + void setNextRunTimeStrict(std::chrono::steady_clock::time_point curTime) { + nextRunTime = curTime + intervalFunc(); } - void setTimeIntervalPoissonDistr() { - if (isPoissonDistr) { - timeInterval = std::chrono::milliseconds(poisson_random(generator)); - } + void setNextRunTimeSteady() { nextRunTime += intervalFunc(); } + void resetNextRunTime(std::chrono::steady_clock::time_point curTime) { + nextRunTime = curTime + startDelay; } void cancel() { // Simply reset cb to an empty function. cb = std::function(); } - bool isValid() const { - return bool(cb); - } + bool isValid() const { return bool(cb); } }; + struct RunTimeOrder { bool operator()(const RepeatFunc& f1, const RepeatFunc& f2) const { return f1.getNextRunTime() > f2.getNextRunTime(); } }; + typedef std::vector FunctionHeap; void run(); void runOneFunction(std::unique_lock& lock, std::chrono::steady_clock::time_point now); - void cancelFunction(const std::unique_lock &lock, + void cancelFunction(const std::unique_lock& lock, FunctionHeap::iterator it); std::thread thread_; diff --git a/folly/experimental/test/FunctionSchedulerTest.cpp b/folly/experimental/test/FunctionSchedulerTest.cpp index ebe0caf6..466b0941 100644 --- a/folly/experimental/test/FunctionSchedulerTest.cpp +++ b/folly/experimental/test/FunctionSchedulerTest.cpp @@ -15,7 +15,11 @@ */ #include +#include #include +#include +#include +#include #include using namespace folly; @@ -31,8 +35,12 @@ namespace { * to run. */ static const auto timeFactor = std::chrono::milliseconds(100); -std::chrono::milliseconds testInterval(int n) { - return n * timeFactor; +std::chrono::milliseconds testInterval(int n) { return n * timeFactor; } +int getTicksWithinRange(int n, int min, int max) { + assert(min <= max); + n = std::max(min, n); + n = std::min(max, n); + return n; } void delay(int n) { std::chrono::microseconds usec(n * timeFactor); @@ -321,3 +329,86 @@ TEST(FunctionScheduler, SteadyCatchup) { // enough to catch back up to schedule EXPECT_NEAR(100, ticks.load(), 10); } + +TEST(FunctionScheduler, UniformDistribution) { + int total = 0; + const int kTicks = 2; + std::chrono::milliseconds minInterval = + testInterval(kTicks) - (timeFactor / 5); + std::chrono::milliseconds maxInterval = + testInterval(kTicks) + (timeFactor / 5); + FunctionScheduler fs; + fs.addFunctionUniformDistribution([&] { total += 2; }, + minInterval, + maxInterval, + "UniformDistribution", + std::chrono::milliseconds(0)); + fs.start(); + delay(1); + EXPECT_EQ(2, total); + delay(kTicks); + EXPECT_EQ(4, total); + delay(kTicks); + EXPECT_EQ(6, total); + fs.shutdown(); + delay(2); + EXPECT_EQ(6, total); +} + +TEST(FunctionScheduler, ExponentialBackoff) { + int total = 0; + int expectedInterval = 0; + int nextInterval = 2; + FunctionScheduler fs; + fs.addFunctionGenericDistribution( + [&] { total += 2; }, + [&expectedInterval, nextInterval]() mutable { + expectedInterval = nextInterval; + nextInterval *= nextInterval; + return testInterval(expectedInterval); + }, + "ExponentialBackoff", + "2^n * 100ms", + std::chrono::milliseconds(0)); + fs.start(); + delay(1); + EXPECT_EQ(2, total); + delay(expectedInterval); + EXPECT_EQ(4, total); + delay(expectedInterval); + EXPECT_EQ(6, total); + fs.shutdown(); + delay(2); + EXPECT_EQ(6, total); +} + +TEST(FunctionScheduler, GammaIntervalDistribution) { + int total = 0; + int expectedInterval = 0; + FunctionScheduler fs; + std::default_random_engine generator(folly::Random::rand32()); + // The alpha and beta arguments are selected, somewhat randomly, to be 2.0. + // These values do not matter much in this test, as we are not testing the + // std::gamma_distribution itself... + std::gamma_distribution gamma(2.0, 2.0); + fs.addFunctionGenericDistribution( + [&] { total += 2; }, + [&expectedInterval, generator, gamma]() mutable { + expectedInterval = + getTicksWithinRange(static_cast(gamma(generator)), 2, 10); + return testInterval(expectedInterval); + }, + "GammaDistribution", + "gamma(2.0,2.0)*100ms", + std::chrono::milliseconds(0)); + fs.start(); + delay(1); + EXPECT_EQ(2, total); + delay(expectedInterval); + EXPECT_EQ(4, total); + delay(expectedInterval); + EXPECT_EQ(6, total); + fs.shutdown(); + delay(2); + EXPECT_EQ(6, total); +}