From 07802d906f2f110e22a337b98673ad1c786cdc52 Mon Sep 17 00:00:00 2001 From: Philipp Unterbrunner Date: Mon, 29 Aug 2016 06:27:23 -0700 Subject: [PATCH] Generalized and polished folly::TokenBucket Summary: Added support for user-defined clock classes, improved comments, and removed part of the std::atomics use that had no effect on thread-safety. Reviewed By: yfeldblum Differential Revision: D3708378 fbshipit-source-id: 1a933c3707c12311584a3b33afd773ee91577167 --- folly/TokenBucket.h | 438 ++++++++++++++++++++++++--------- folly/test/TokenBucketTest.cpp | 61 +++++ 2 files changed, 388 insertions(+), 111 deletions(-) diff --git a/folly/TokenBucket.h b/folly/TokenBucket.h index 363bb7c4..4cd32561 100644 --- a/folly/TokenBucket.h +++ b/folly/TokenBucket.h @@ -18,155 +18,371 @@ #include #include -#include #include #include +#include namespace folly { -/** Threadsafe TokenBucket implementation, based on the idea of - * converting tokens into time and maintaining state as a timestamp relative to - * now. The number of tokens available is represented by the delta between now - * and the timestamp, and the 'burst' is represented by the maximum delta. +/** + * Default clock class used by ParameterizedDynamicTokenBucket and derived + * classes. User-defined clock classes must be steady (monotonic) and define a + * static function std::chrono::duration<> timeSinceEpoch(). */ -class TokenBucket { - private: - std::atomic time_; - std::atomic secondsPerToken_; - std::atomic secondsPerBurst_; +struct DefaultTokenBucketClock { + static auto timeSinceEpoch() noexcept + -> decltype(std::chrono::steady_clock::now().time_since_epoch()) { + return std::chrono::steady_clock::now().time_since_epoch(); + } +}; +/** + * Thread-safe (atomic) token bucket implementation. + * + * A token bucket (http://en.wikipedia.org/wiki/Token_bucket) models a stream + * of events with an average rate and some amount of burstiness. 
The canonical + * example is a packet switched network: the network can accept some number of + * bytes per second and the bytes come in finite packets (bursts). A token + * bucket stores up to a fixed number of tokens (the burst size). Some number + * of tokens are removed when an event occurs. The tokens are replenished at a + * fixed rate. + * + * This implementation records the last time it was updated. This allows the + * token bucket to add tokens "just in time" when tokens are requested. + * + * The "dynamic" base variant allows the token generation rate and maximum + * burst size to change with every token consumption. + * + * @tparam ClockT Clock type, must be steady i.e. monotonic. + */ +template <typename ClockT = DefaultTokenBucketClock> +class ParameterizedDynamicTokenBucket { public: - TokenBucket(double rate, double burst, double nowInSeconds) noexcept - : time_(nowInSeconds) { - reset(rate, burst, nowInSeconds); + /** + * Constructor. + * + * @param zeroTime Initial time at which to consider the token bucket + * starting to fill. Defaults to 0, so by default token + * buckets are "full" after construction. + */ + explicit ParameterizedDynamicTokenBucket(double zeroTime = 0) noexcept + : zeroTime_(zeroTime) {} + + /** + * Copy constructor. + * + * Thread-safe. (Copy constructors of derived classes may not be thread-safe + * however.) + */ + ParameterizedDynamicTokenBucket( + const ParameterizedDynamicTokenBucket& other) noexcept + : zeroTime_(other.zeroTime_.load()) {} + + /** + * Copy-assignment operator. + * + * Warning: not thread safe for the object being assigned to (including + * self-assignment). Thread-safe for the other object.
+ */ + ParameterizedDynamicTokenBucket& operator=( + const ParameterizedDynamicTokenBucket& other) noexcept { + zeroTime_ = other.zeroTime_.load(); + return *this; } - void reset(double rate, double burst, double nowInSeconds) noexcept { - double tokens = available(nowInSeconds); - - secondsPerToken_.store( - 1.0 / rate - std::numeric_limits::epsilon(), - std::memory_order_relaxed); - - secondsPerBurst_.store( - burst / rate + std::numeric_limits::epsilon(), - std::memory_order_relaxed); - - set_capacity(tokens, nowInSeconds); + /** + * Re-initialize token bucket. + * + * Thread-safe. + * + * @param zeroTime Initial time at which to consider the token bucket + * starting to fill. Defaults to 0, so by default token + * bucket is reset to "full". + */ + void reset(double zeroTime = 0) noexcept { + zeroTime_ = zeroTime; } - void set_capacity(double tokens, double nowInSeconds) noexcept { - const double secondsPerToken = std::atomic_load_explicit( - &secondsPerToken_, std::memory_order_relaxed); - - const double secondsPerBurst = std::atomic_load_explicit( - &secondsPerBurst_, std::memory_order_relaxed); - - double newTime = nowInSeconds - std::min( - tokens * secondsPerToken, secondsPerBurst); - - time_.store(newTime, std::memory_order_relaxed); + /** + * Attempts to consume some number of tokens. Tokens are first added to the + * bucket based on the time elapsed since the last attempt to consume tokens. + * Note: Attempts to consume more tokens than the burst size will always + * fail. + * + * Thread-safe. + * + * @param toConsume The number of tokens to consume. + * @param rate Number of tokens to generate per second. + * @param burstSize Maximum burst size. Must be greater than 0. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + * @return True if the rate limit check passed, false otherwise. 
+ */ + bool consume( + double toConsume, + double rate, + double burstSize, + double nowInSeconds = defaultClockNow()) { + assert(rate > 0); + assert(burstSize > 0); + + return this->consumeImpl( + rate, burstSize, nowInSeconds, [toConsume](double& tokens) { + if (tokens < toConsume) { + return false; + } + tokens -= toConsume; + return true; + }); } - // If there are `tokens` avilable at `nowInSeconds`, consume them and - // return true. Otherwise, return false. - // - // This implementation is written in a lock-free manner using a - // compare-and-exchange loop, with branch prediction optimized to minimize - // time spent in the 'success' case which performs a write. - bool consume(double tokens, double nowInSeconds) noexcept { - const double secondsNeeded = tokens * std::atomic_load_explicit( - &secondsPerToken_, std::memory_order_relaxed); - - const double minTime = nowInSeconds - std::atomic_load_explicit( - &secondsPerBurst_, std::memory_order_relaxed); + /** + * Similar to consume, but always consumes some number of tokens. If the + * bucket contains enough tokens - consumes toConsume tokens. Otherwise the + * bucket is drained. + * + * Thread-safe. + * + * @param toConsume The number of tokens to consume. + * @param rate Number of tokens to generate per second. + * @param burstSize Maximum burst size. Must be greater than 0. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + * @return number of tokens that were consumed. 
+ */ + double consumeOrDrain( + double toConsume, + double rate, + double burstSize, + double nowInSeconds = defaultClockNow()) { + assert(rate > 0); + assert(burstSize > 0); - double oldTime = - std::atomic_load_explicit(&time_, std::memory_order_relaxed); - double newTime = oldTime; + double consumed; + this->consumeImpl( + rate, burstSize, nowInSeconds, [&consumed, toConsume](double& tokens) { + if (tokens < toConsume) { + consumed = tokens; + tokens = 0.0; + } else { + consumed = toConsume; + tokens -= toConsume; + } + return true; + }); + return consumed; + } - // Limit the number of available tokens to 'burst'. We don't need to do - // this inside the loop because if we iterate more than once another - // caller will have performed an update that also covered this - // calculation. Also, tell the compiler to optimize branch prediction to - // minimize time spent between reads and writes in the success case - if (UNLIKELY(minTime > oldTime)) { - newTime = minTime; - } + /** + * Returns the number of tokens currently available. + * + * Thread-safe (but returned value may immediately be outdated). + */ + double available( + double rate, + double burstSize, + double nowInSeconds = defaultClockNow()) const noexcept { + assert(rate > 0); + assert(burstSize > 0); + + return std::min((nowInSeconds - this->zeroTime_) * rate, burstSize); + } - while (true) { - newTime += secondsNeeded; + /** + * Returns the current time in seconds since Epoch. 
+ */ + static double defaultClockNow() noexcept(noexcept(ClockT::timeSinceEpoch())) { + return std::chrono::duration_cast<std::chrono::duration<double>>( + ClockT::timeSinceEpoch()) + .count(); + } - // Optimize for the write-contention case, to minimize the impact of - // branch misprediction on other threads - if (UNLIKELY(newTime > nowInSeconds)) { + private: + template <typename TCallback> + bool consumeImpl( + double rate, + double burstSize, + double nowInSeconds, + const TCallback& callback) { + auto zeroTimeOld = zeroTime_.load(); + double zeroTimeNew; + do { + auto tokens = std::min((nowInSeconds - zeroTimeOld) * rate, burstSize); + if (!callback(tokens)) { return false; } - - // Optimize for the write-contention case, to minimize the impact of - // branch misprediction on other threads - if (LIKELY(std::atomic_compare_exchange_weak_explicit( - &time_, &oldTime, newTime, - std::memory_order_relaxed, std::memory_order_relaxed))) { - return true; - } - - newTime = oldTime; - } + zeroTimeNew = nowInSeconds - tokens / rate; + } while ( + UNLIKELY(!zeroTime_.compare_exchange_weak(zeroTimeOld, zeroTimeNew))); return true; } - // Similar to consume, but will always consume some number of tokens. - double consumeOrDrain(double tokens, double nowInSeconds) noexcept { - const double secondsPerToken = std::atomic_load_explicit( - &secondsPerToken_, std::memory_order_relaxed); - - const double secondsNeeded = tokens * secondsPerToken; - const double minTime = nowInSeconds - std::atomic_load_explicit( - &secondsPerBurst_, std::memory_order_relaxed); - - double oldTime = - std::atomic_load_explicit(&time_, std::memory_order_relaxed); - double newTime = oldTime; + std::atomic<double> zeroTime_ FOLLY_ALIGN_TO_AVOID_FALSE_SHARING; +}; +/** + * Specialization of ParameterizedDynamicTokenBucket with a fixed token + * generation rate and a fixed maximum burst size. + */ +template <typename ClockT = DefaultTokenBucketClock> +class ParameterizedTokenBucket { + private: + using Impl = ParameterizedDynamicTokenBucket<ClockT>; - // Limit the number of available tokens to 'burst'.
- // Also, tell the compiler to optimize branch prediction to - // minimize time spent between reads and writes in the success case - if (UNLIKELY(minTime > oldTime)) { - newTime = minTime; - } + public: + /** + * Construct a token bucket with a specific maximum rate and burst size. + * + * @param genRate Number of tokens to generate per second. + * @param burstSize Maximum burst size. Must be greater than 0. + * @param zeroTime Initial time at which to consider the token bucket + * starting to fill. Defaults to 0, so by default token + * bucket is "full" after construction. + */ + ParameterizedTokenBucket( + double genRate, + double burstSize, + double zeroTime = 0) noexcept + : tokenBucket_(zeroTime), rate_(genRate), burstSize_(burstSize) { + assert(rate_ > 0); + assert(burstSize_ > 0); + } - double consumed; + /** + * Copy constructor. + * + * Warning: not thread safe! + */ + ParameterizedTokenBucket(const ParameterizedTokenBucket& other) noexcept = + default; + + /** + * Copy-assignment operator. + * + * Warning: not thread safe! + */ + ParameterizedTokenBucket& operator=( + const ParameterizedTokenBucket& other) noexcept = default; + + /** + * Change rate and burst size. + * + * Warning: not thread safe! + * + * @param genRate Number of tokens to generate per second. + * @param burstSize Maximum burst size. Must be greater than 0. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + */ + void reset( + double genRate, + double burstSize, + double nowInSeconds = defaultClockNow()) noexcept { + assert(genRate > 0); + assert(burstSize > 0); + double availTokens = available(nowInSeconds); + rate_ = genRate; + burstSize_ = burstSize; + setCapacity(availTokens, nowInSeconds); + } - newTime += secondsNeeded; + /** + * Change number of tokens in bucket. + * + * Warning: not thread safe! + * + * @param tokens Desired number of tokens in bucket after the call. 
+ * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + */ + void setCapacity(double tokens, double nowInSeconds) noexcept { + tokenBucket_.reset(nowInSeconds - tokens / rate_); + } - consumed = (newTime - nowInSeconds) / secondsPerToken; - time_.store(newTime, std::memory_order_relaxed); + /** + * Attempts to consume some number of tokens. Tokens are first added to the + * bucket based on the time elapsed since the last attempt to consume tokens. + * Note: Attempts to consume more tokens than the burst size will always + * fail. + * + * Thread-safe. + * + * @param toConsume The number of tokens to consume. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + * @return True if the rate limit check passed, false otherwise. + */ + bool consume(double toConsume, double nowInSeconds = defaultClockNow()) { + return tokenBucket_.consume(toConsume, rate_, burstSize_, nowInSeconds); + } - return consumed; + /** + * Similar to consume, but always consumes some number of tokens. If the + * bucket contains enough tokens - consumes toConsume tokens. Otherwise the + * bucket is drained. + * + * Thread-safe. + * + * @param toConsume The number of tokens to consume. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + * @return number of tokens that were consumed. + */ + double consumeOrDrain( + double toConsume, + double nowInSeconds = defaultClockNow()) { + return tokenBucket_.consumeOrDrain( + toConsume, rate_, burstSize_, nowInSeconds); } - double available(double nowInSeconds = defaultClockNow()) const noexcept { - double time = - std::atomic_load_explicit(&time_, std::memory_order_relaxed); + /** + * Returns the number of tokens currently available. 
+ * + * Thread-safe (but returned value may immediately be outdated). + */ + double available(double nowInSeconds = defaultClockNow()) const { + return tokenBucket_.available(rate_, burstSize_, nowInSeconds); + } - double deltaTime = std::min( - std::atomic_load_explicit(&secondsPerBurst_, - std::memory_order_relaxed), - nowInSeconds - time); + /** + * Returns the number of tokens generated per second. + * + * Thread-safe (but returned value may immediately be outdated). + */ + double rate() const noexcept { + return rate_; + } - return std::max(0.0, deltaTime / std::atomic_load_explicit( - &secondsPerToken_, std::memory_order_relaxed)); + /** + * Returns the maximum burst size. + * + * Thread-safe (but returned value may immediately be outdated). + */ + double burst() const noexcept { + return burstSize_; } - static double defaultClockNow() { - return std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch() - ).count() / 1000000.0; + /** + * Returns the current time in seconds since Epoch. + */ + static double defaultClockNow() noexcept(noexcept(Impl::defaultClockNow())) { + return Impl::defaultClockNow(); } + + private: + Impl tokenBucket_; + double rate_; + double burstSize_; }; +using TokenBucket = ParameterizedTokenBucket<>; +using DynamicTokenBucket = ParameterizedDynamicTokenBucket<>; } diff --git a/folly/test/TokenBucketTest.cpp b/folly/test/TokenBucketTest.cpp index 6b699c80..7f0f4953 100644 --- a/folly/test/TokenBucketTest.cpp +++ b/folly/test/TokenBucketTest.cpp @@ -72,3 +72,64 @@ static std::vector > rateToConsumeSize = { INSTANTIATE_TEST_CASE_P(TokenBucket, TokenBucketTest, ::testing::ValuesIn(rateToConsumeSize)); + +void doTokenBucketTest(double maxQps, double consumeSize) { + const double tenMillisecondBurst = maxQps * 0.010; + // Select a burst size of 10 milliseconds at the max rate or the consume size + // if 10 ms at maxQps is too small. 
+ const double burstSize = std::max(consumeSize, tenMillisecondBurst); + TokenBucket tokenBucket(maxQps, burstSize, 0); + double tokenCounter = 0; + double currentTime = 0; + // Simulate time advancing 10 seconds + for (; currentTime <= 10.0; currentTime += 0.001) { + EXPECT_FALSE(tokenBucket.consume(burstSize + 1, currentTime)); + while (tokenBucket.consume(consumeSize, currentTime)) { + tokenCounter += consumeSize; + } + // Tokens consumed should exceed some lower bound based on maxQps. + // Note: The token bucket implementation is not precise, so the lower bound + // is somewhat fudged. The upper bound is accurate however. + EXPECT_LE(maxQps * currentTime * 0.9 - 1, tokenCounter); + // Tokens consumed should not exceed some upper bound based on maxQps. + EXPECT_GE(maxQps * currentTime + 1e-6, tokenCounter); + } +} + +TEST(TokenBucket, sanity) { + doTokenBucketTest(100, 1); + doTokenBucketTest(1000, 1); + doTokenBucketTest(10000, 1); + // Consume more than one at a time. + doTokenBucketTest(10000, 5); +} + +TEST(TokenBucket, ReverseTime2) { + const double rate = 1000; + TokenBucket tokenBucket(rate, rate * 0.01 + 1e-6); + size_t count = 0; + while (tokenBucket.consume(1, 0.1)) { + count += 1; + } + EXPECT_EQ(10, count); + // Going backwards in time has no effect on the token count (this protects + // against different threads providing out-of-order timestamps).
+ double tokensBefore = tokenBucket.available(); + EXPECT_FALSE(tokenBucket.consume(1, 0.09999999)); + EXPECT_EQ(tokensBefore, tokenBucket.available()); +} + +TEST(TokenBucket, drainOnFail) { + DynamicTokenBucket tokenBucket; + + // Almost empty the bucket + EXPECT_TRUE(tokenBucket.consume(9, 10, 10, 1)); + + // Request more tokens than available + EXPECT_FALSE(tokenBucket.consume(5, 10, 10, 1)); + EXPECT_DOUBLE_EQ(1.0, tokenBucket.available(10, 10, 1)); + + // Again request more tokens than available, but ask to drain + EXPECT_DOUBLE_EQ(1.0, tokenBucket.consumeOrDrain(5, 10, 10, 1)); + EXPECT_DOUBLE_EQ(0.0, tokenBucket.consumeOrDrain(1, 10, 10, 1)); +} -- 2.34.1