#include <algorithm>
#include <atomic>
-#include <limits>
#include <chrono>
#include <folly/Likely.h>
+#include <folly/detail/CacheLocality.h>
namespace folly {
-/** Threadsafe TokenBucket implementation, based on the idea of
- * converting tokens into time and maintaining state as a timestamp relative to
- * now. The number of tokens available is represented by the delta between now
- * and the timestamp, and the 'burst' is represented by the maximum delta.
+/**
+ * Default clock class used by ParameterizedDynamicTokenBucket and derived
+ * classes. User-defined clock classes must be steady (monotonic) and define a
+ * static function std::chrono::duration<> timeSinceEpoch().
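+ *
+ * A clock for tests might look like the following sketch (MockClock and
+ * currentTimeSec are hypothetical names, not part of folly):
+ *
+ *   struct MockClock {
+ *     static std::chrono::duration<double> timeSinceEpoch() noexcept {
+ *       return std::chrono::duration<double>(currentTimeSec);
+ *     }
+ *     static double currentTimeSec; // defined and advanced by the test
+ *   };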
*/
-class TokenBucket {
- private:
- std::atomic<double> time_;
- std::atomic<double> secondsPerToken_;
- std::atomic<double> secondsPerBurst_;
+struct DefaultTokenBucketClock {
+ static auto timeSinceEpoch() noexcept
+ -> decltype(std::chrono::steady_clock::now().time_since_epoch()) {
+ return std::chrono::steady_clock::now().time_since_epoch();
+ }
+};
+/**
+ * Thread-safe (atomic) token bucket implementation.
+ *
+ * A token bucket (http://en.wikipedia.org/wiki/Token_bucket) models a stream
+ * of events with an average rate and some amount of burstiness. The canonical
+ * example is a packet switched network: the network can accept some number of
+ * bytes per second and the bytes come in finite packets (bursts). A token
+ * bucket stores up to a fixed number of tokens (the burst size). Some number
+ * of tokens are removed when an event occurs. The tokens are replenished at a
+ * fixed rate.
+ *
+ * Rather than storing a token count directly, this implementation keeps a
+ * single timestamp: the time at which the bucket is considered to have
+ * started filling (zeroTime_). The number of available tokens is derived
+ * from the time elapsed since that timestamp, so tokens are added "just in
+ * time" when they are requested.
+ *
+ * The "dynamic" base variant allows the token generation rate and maximum
+ * burst size to change with every token consumption.
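+ *
+ * Usage sketch (illustrative values; assumes the DynamicTokenBucket alias
+ * declared at the end of this header and the default clock):
+ *
+ *   folly::DynamicTokenBucket bucket;
+ *   // Allow an average of 100 tokens/sec with bursts of up to 10 tokens.
+ *   if (bucket.consume(1.0, 100.0, 10.0)) {
+ *     // admit the event
+ *   }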
+ *
+ * @tparam ClockT Clock type; must be steady, i.e. monotonic.
+ */
+template <typename ClockT = DefaultTokenBucketClock>
+class ParameterizedDynamicTokenBucket {
public:
- TokenBucket(double rate, double burst, double nowInSeconds) noexcept
- : time_(nowInSeconds) {
- reset(rate, burst, nowInSeconds);
+ /**
+ * Constructor.
+ *
+ * @param zeroTime Initial time at which to consider the token bucket
+ * starting to fill. Defaults to 0, so by default token
+ * buckets are "full" after construction.
+ */
+ explicit ParameterizedDynamicTokenBucket(double zeroTime = 0) noexcept
+ : zeroTime_(zeroTime) {}
+
+ /**
+ * Copy constructor.
+ *
+   * Thread-safe. (Copy constructors of derived classes may not be
+   * thread-safe, however.)
+ */
+ ParameterizedDynamicTokenBucket(
+ const ParameterizedDynamicTokenBucket& other) noexcept
+ : zeroTime_(other.zeroTime_.load()) {}
+
+ /**
+ * Copy-assignment operator.
+ *
+ * Warning: not thread safe for the object being assigned to (including
+ * self-assignment). Thread-safe for the other object.
+ */
+ ParameterizedDynamicTokenBucket& operator=(
+ const ParameterizedDynamicTokenBucket& other) noexcept {
+ zeroTime_ = other.zeroTime_.load();
+ return *this;
}
- void reset(double rate, double burst, double nowInSeconds) noexcept {
- double tokens = available(nowInSeconds);
-
- secondsPerToken_.store(
- 1.0 / rate - std::numeric_limits<double>::epsilon(),
- std::memory_order_relaxed);
-
- secondsPerBurst_.store(
- burst / rate + std::numeric_limits<double>::epsilon(),
- std::memory_order_relaxed);
-
- set_capacity(tokens, nowInSeconds);
+ /**
+ * Re-initialize token bucket.
+ *
+ * Thread-safe.
+ *
+ * @param zeroTime Initial time at which to consider the token bucket
+ * starting to fill. Defaults to 0, so by default token
+ * bucket is reset to "full".
+ */
+ void reset(double zeroTime = 0) noexcept {
+ zeroTime_ = zeroTime;
}
- void set_capacity(double tokens, double nowInSeconds) noexcept {
- const double secondsPerToken = std::atomic_load_explicit(
- &secondsPerToken_, std::memory_order_relaxed);
-
- const double secondsPerBurst = std::atomic_load_explicit(
- &secondsPerBurst_, std::memory_order_relaxed);
-
- double newTime = nowInSeconds - std::min(
- tokens * secondsPerToken, secondsPerBurst);
-
- time_.store(newTime, std::memory_order_relaxed);
+ /**
+ * Attempts to consume some number of tokens. Tokens are first added to the
+ * bucket based on the time elapsed since the last attempt to consume tokens.
+ * Note: Attempts to consume more tokens than the burst size will always
+ * fail.
+ *
+ * Thread-safe.
+ *
+ * @param toConsume The number of tokens to consume.
+ * @param rate Number of tokens to generate per second.
+ * @param burstSize Maximum burst size. Must be greater than 0.
+   * @param nowInSeconds Current time in seconds. Should be monotonically
+   *                     increasing relative to the zeroTime specified in
+   *                     this token bucket's constructor.
+ * @return True if the rate limit check passed, false otherwise.
+ */
+ bool consume(
+ double toConsume,
+ double rate,
+ double burstSize,
+ double nowInSeconds = defaultClockNow()) {
+ assert(rate > 0);
+ assert(burstSize > 0);
+
+ return this->consumeImpl(
+ rate, burstSize, nowInSeconds, [toConsume](double& tokens) {
+ if (tokens < toConsume) {
+ return false;
+ }
+ tokens -= toConsume;
+ return true;
+ });
}
- // If there are `tokens` avilable at `nowInSeconds`, consume them and
- // return true. Otherwise, return false.
- //
- // This implementation is written in a lock-free manner using a
- // compare-and-exchange loop, with branch prediction optimized to minimize
- // time spent in the 'success' case which performs a write.
- bool consume(double tokens, double nowInSeconds) noexcept {
- const double secondsNeeded = tokens * std::atomic_load_explicit(
- &secondsPerToken_, std::memory_order_relaxed);
-
- const double minTime = nowInSeconds - std::atomic_load_explicit(
- &secondsPerBurst_, std::memory_order_relaxed);
+ /**
+   * Similar to consume, but always consumes some number of tokens. If the
+   * bucket contains enough tokens, consumes toConsume tokens; otherwise the
+   * bucket is drained.
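+   * For example, if only 3 tokens are available and toConsume is 5, the call
+   * consumes those 3 tokens and returns 3.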
+ *
+ * Thread-safe.
+ *
+ * @param toConsume The number of tokens to consume.
+ * @param rate Number of tokens to generate per second.
+ * @param burstSize Maximum burst size. Must be greater than 0.
+   * @param nowInSeconds Current time in seconds. Should be monotonically
+   *                     increasing relative to the zeroTime specified in
+   *                     this token bucket's constructor.
+ * @return number of tokens that were consumed.
+ */
+ double consumeOrDrain(
+ double toConsume,
+ double rate,
+ double burstSize,
+ double nowInSeconds = defaultClockNow()) {
+ assert(rate > 0);
+ assert(burstSize > 0);
- double oldTime =
- std::atomic_load_explicit(&time_, std::memory_order_relaxed);
- double newTime = oldTime;
+ double consumed;
+ this->consumeImpl(
+ rate, burstSize, nowInSeconds, [&consumed, toConsume](double& tokens) {
+ if (tokens < toConsume) {
+ consumed = tokens;
+ tokens = 0.0;
+ } else {
+ consumed = toConsume;
+ tokens -= toConsume;
+ }
+ return true;
+ });
+ return consumed;
+ }
- // Limit the number of available tokens to 'burst'. We don't need to do
- // this inside the loop because if we iterate more than once another
- // caller will have performed an update that also covered this
- // calculation. Also, tell the compiler to optimize branch prediction to
- // minimize time spent between reads and writes in the success case
- if (UNLIKELY(minTime > oldTime)) {
- newTime = minTime;
- }
+ /**
+ * Returns the number of tokens currently available.
+ *
+ * Thread-safe (but returned value may immediately be outdated).
+ */
+ double available(
+ double rate,
+ double burstSize,
+ double nowInSeconds = defaultClockNow()) const noexcept {
+ assert(rate > 0);
+ assert(burstSize > 0);
+
+ return std::min((nowInSeconds - this->zeroTime_) * rate, burstSize);
+ }
- while (true) {
- newTime += secondsNeeded;
+ /**
+ * Returns the current time in seconds since Epoch.
+ */
+ static double defaultClockNow() noexcept(noexcept(ClockT::timeSinceEpoch())) {
+ return std::chrono::duration_cast<std::chrono::duration<double>>(
+ ClockT::timeSinceEpoch())
+ .count();
+ }
- // Optimize for the write-contention case, to minimize the impact of
- // branch misprediction on other threads
- if (UNLIKELY(newTime > nowInSeconds)) {
+ private:
+ template <typename TCallback>
+ bool consumeImpl(
+ double rate,
+ double burstSize,
+ double nowInSeconds,
+ const TCallback& callback) {
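+    // Lock-free update shared by consume() and consumeOrDrain(): recompute
+    // the tokens available from the time elapsed since zeroTime_, let the
+    // callback consume some of them, then publish the new zeroTime_ with a
+    // compare-and-exchange loop, retrying if another thread updated it first.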
+ auto zeroTimeOld = zeroTime_.load();
+ double zeroTimeNew;
+ do {
+ auto tokens = std::min((nowInSeconds - zeroTimeOld) * rate, burstSize);
+ if (!callback(tokens)) {
return false;
}
-
- // Optimize for the write-contention case, to minimize the impact of
- // branch misprediction on other threads
- if (LIKELY(std::atomic_compare_exchange_weak_explicit(
- &time_, &oldTime, newTime,
- std::memory_order_relaxed, std::memory_order_relaxed))) {
- return true;
- }
-
- newTime = oldTime;
- }
+ zeroTimeNew = nowInSeconds - tokens / rate;
+ } while (
+ UNLIKELY(!zeroTime_.compare_exchange_weak(zeroTimeOld, zeroTimeNew)));
return true;
}
- // Similar to consume, but will always consume some number of tokens.
- double consumeOrDrain(double tokens, double nowInSeconds) noexcept {
- const double secondsPerToken = std::atomic_load_explicit(
- &secondsPerToken_, std::memory_order_relaxed);
-
- const double secondsNeeded = tokens * secondsPerToken;
- const double minTime = nowInSeconds - std::atomic_load_explicit(
- &secondsPerBurst_, std::memory_order_relaxed);
-
- double oldTime =
- std::atomic_load_explicit(&time_, std::memory_order_relaxed);
- double newTime = oldTime;
+ std::atomic<double> zeroTime_ FOLLY_ALIGN_TO_AVOID_FALSE_SHARING;
+};
+/**
+ * Specialization of ParameterizedDynamicTokenBucket with a fixed token
+ * generation rate and a fixed maximum burst size.
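+ *
+ * Usage sketch (illustrative values; assumes the TokenBucket alias declared
+ * at the end of this header):
+ *
+ *   folly::TokenBucket limiter(1000.0, 100.0); // 1000 tokens/sec, burst 100
+ *   if (limiter.consume(1.0)) {
+ *     // request admitted
+ *   }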
+ */
+template <typename ClockT = DefaultTokenBucketClock>
+class ParameterizedTokenBucket {
+ private:
+ using Impl = ParameterizedDynamicTokenBucket<ClockT>;
- // Limit the number of available tokens to 'burst'.
- // Also, tell the compiler to optimize branch prediction to
- // minimize time spent between reads and writes in the success case
- if (UNLIKELY(minTime > oldTime)) {
- newTime = minTime;
- }
+ public:
+ /**
+ * Construct a token bucket with a specific maximum rate and burst size.
+ *
+ * @param genRate Number of tokens to generate per second.
+ * @param burstSize Maximum burst size. Must be greater than 0.
+ * @param zeroTime Initial time at which to consider the token bucket
+ * starting to fill. Defaults to 0, so by default token
+ * bucket is "full" after construction.
+ */
+ ParameterizedTokenBucket(
+ double genRate,
+ double burstSize,
+ double zeroTime = 0) noexcept
+ : tokenBucket_(zeroTime), rate_(genRate), burstSize_(burstSize) {
+ assert(rate_ > 0);
+ assert(burstSize_ > 0);
+ }
- double consumed;
+ /**
+ * Copy constructor.
+ *
+ * Warning: not thread safe!
+ */
+ ParameterizedTokenBucket(const ParameterizedTokenBucket& other) noexcept =
+ default;
+
+ /**
+ * Copy-assignment operator.
+ *
+ * Warning: not thread safe!
+ */
+ ParameterizedTokenBucket& operator=(
+ const ParameterizedTokenBucket& other) noexcept = default;
+
+ /**
+ * Change rate and burst size.
+ *
+ * Warning: not thread safe!
+ *
+ * @param genRate Number of tokens to generate per second.
+ * @param burstSize Maximum burst size. Must be greater than 0.
+   * @param nowInSeconds Current time in seconds. Should be monotonically
+   *                     increasing relative to the zeroTime specified in
+   *                     this token bucket's constructor.
+ */
+ void reset(
+ double genRate,
+ double burstSize,
+ double nowInSeconds = defaultClockNow()) noexcept {
+ assert(genRate > 0);
+ assert(burstSize > 0);
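+    // Preserve the tokens currently available across the rate/burst change:
+    // capture them at the old rate, then reinstall them via setCapacity()
+    // after switching to the new rate and burst size.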
+ double availTokens = available(nowInSeconds);
+ rate_ = genRate;
+ burstSize_ = burstSize;
+ setCapacity(availTokens, nowInSeconds);
+ }
- newTime += secondsNeeded;
+ /**
+ * Change number of tokens in bucket.
+ *
+ * Warning: not thread safe!
+ *
+ * @param tokens Desired number of tokens in bucket after the call.
+   * @param nowInSeconds Current time in seconds. Should be monotonically
+   *                     increasing relative to the zeroTime specified in
+   *                     this token bucket's constructor.
+ */
+ void setCapacity(double tokens, double nowInSeconds) noexcept {
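+    // available = (now - zeroTime) * rate (capped at the burst size), so
+    // setting zeroTime = now - tokens / rate leaves `tokens` in the bucket.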
+ tokenBucket_.reset(nowInSeconds - tokens / rate_);
+ }
- consumed = (newTime - nowInSeconds) / secondsPerToken;
- time_.store(newTime, std::memory_order_relaxed);
+ /**
+ * Attempts to consume some number of tokens. Tokens are first added to the
+ * bucket based on the time elapsed since the last attempt to consume tokens.
+ * Note: Attempts to consume more tokens than the burst size will always
+ * fail.
+ *
+ * Thread-safe.
+ *
+ * @param toConsume The number of tokens to consume.
+   * @param nowInSeconds Current time in seconds. Should be monotonically
+   *                     increasing relative to the zeroTime specified in
+   *                     this token bucket's constructor.
+ * @return True if the rate limit check passed, false otherwise.
+ */
+ bool consume(double toConsume, double nowInSeconds = defaultClockNow()) {
+ return tokenBucket_.consume(toConsume, rate_, burstSize_, nowInSeconds);
+ }
- return consumed;
+ /**
+   * Similar to consume, but always consumes some number of tokens. If the
+   * bucket contains enough tokens, consumes toConsume tokens; otherwise the
+   * bucket is drained.
+ *
+ * Thread-safe.
+ *
+ * @param toConsume The number of tokens to consume.
+   * @param nowInSeconds Current time in seconds. Should be monotonically
+   *                     increasing relative to the zeroTime specified in
+   *                     this token bucket's constructor.
+ * @return number of tokens that were consumed.
+ */
+ double consumeOrDrain(
+ double toConsume,
+ double nowInSeconds = defaultClockNow()) {
+ return tokenBucket_.consumeOrDrain(
+ toConsume, rate_, burstSize_, nowInSeconds);
}
- double available(double nowInSeconds = defaultClockNow()) const noexcept {
- double time =
- std::atomic_load_explicit(&time_, std::memory_order_relaxed);
+ /**
+ * Returns the number of tokens currently available.
+ *
+ * Thread-safe (but returned value may immediately be outdated).
+ */
+ double available(double nowInSeconds = defaultClockNow()) const {
+ return tokenBucket_.available(rate_, burstSize_, nowInSeconds);
+ }
- double deltaTime = std::min(
- std::atomic_load_explicit(&secondsPerBurst_,
- std::memory_order_relaxed),
- nowInSeconds - time);
+ /**
+ * Returns the number of tokens generated per second.
+ *
+ * Thread-safe (but returned value may immediately be outdated).
+ */
+ double rate() const noexcept {
+ return rate_;
+ }
- return std::max(0.0, deltaTime / std::atomic_load_explicit(
- &secondsPerToken_, std::memory_order_relaxed));
+ /**
+ * Returns the maximum burst size.
+ *
+ * Thread-safe (but returned value may immediately be outdated).
+ */
+ double burst() const noexcept {
+ return burstSize_;
}
- static double defaultClockNow() {
- return std::chrono::duration_cast<std::chrono::microseconds>(
- std::chrono::steady_clock::now().time_since_epoch()
- ).count() / 1000000.0;
+ /**
+ * Returns the current time in seconds since Epoch.
+ */
+ static double defaultClockNow() noexcept(noexcept(Impl::defaultClockNow())) {
+ return Impl::defaultClockNow();
}
+
+ private:
+ Impl tokenBucket_;
+ double rate_;
+ double burstSize_;
};
+using TokenBucket = ParameterizedTokenBucket<>;
+using DynamicTokenBucket = ParameterizedDynamicTokenBucket<>;
}
INSTANTIATE_TEST_CASE_P(TokenBucket,
TokenBucketTest,
::testing::ValuesIn(rateToConsumeSize));
+
+void doTokenBucketTest(double maxQps, double consumeSize) {
+ const double tenMillisecondBurst = maxQps * 0.010;
+ // Select a burst size of 10 milliseconds at the max rate or the consume size
+ // if 10 ms at maxQps is too small.
+ const double burstSize = std::max(consumeSize, tenMillisecondBurst);
+ TokenBucket tokenBucket(maxQps, burstSize, 0);
+ double tokenCounter = 0;
+ double currentTime = 0;
+ // Simulate time advancing 10 seconds
+ for (; currentTime <= 10.0; currentTime += 0.001) {
+ EXPECT_FALSE(tokenBucket.consume(burstSize + 1, currentTime));
+ while (tokenBucket.consume(consumeSize, currentTime)) {
+ tokenCounter += consumeSize;
+ }
+ // Tokens consumed should exceed some lower bound based on maxQps.
+ // Note: The token bucket implementation is not precise, so the lower bound
+    // is somewhat fudged. The upper bound is accurate, however.
+ EXPECT_LE(maxQps * currentTime * 0.9 - 1, tokenCounter);
+ // Tokens consumed should not exceed some upper bound based on maxQps.
+ EXPECT_GE(maxQps * currentTime + 1e-6, tokenCounter);
+ }
+}
+
+TEST(TokenBucket, sanity) {
+ doTokenBucketTest(100, 1);
+ doTokenBucketTest(1000, 1);
+ doTokenBucketTest(10000, 1);
+ // Consume more than one at a time.
+ doTokenBucketTest(10000, 5);
+}
+
+TEST(TokenBucket, ReverseTime2) {
+ const double rate = 1000;
+ TokenBucket tokenBucket(rate, rate * 0.01 + 1e-6);
+ size_t count = 0;
+ while (tokenBucket.consume(1, 0.1)) {
+ count += 1;
+ }
+ EXPECT_EQ(10, count);
+  // Going backwards in time has no effect on the token count (this protects
+  // against different threads providing out-of-order timestamps).
+ double tokensBefore = tokenBucket.available();
+ EXPECT_FALSE(tokenBucket.consume(1, 0.09999999));
+ EXPECT_EQ(tokensBefore, tokenBucket.available());
+}
+
+TEST(TokenBucket, drainOnFail) {
+ DynamicTokenBucket tokenBucket;
+
+ // Almost empty the bucket
+ EXPECT_TRUE(tokenBucket.consume(9, 10, 10, 1));
+
+ // Request more tokens than available
+ EXPECT_FALSE(tokenBucket.consume(5, 10, 10, 1));
+ EXPECT_DOUBLE_EQ(1.0, tokenBucket.available(10, 10, 1));
+
+ // Again request more tokens than available, but ask to drain
+ EXPECT_DOUBLE_EQ(1.0, tokenBucket.consumeOrDrain(5, 10, 10, 1));
+ EXPECT_DOUBLE_EQ(0.0, tokenBucket.consumeOrDrain(1, 10, 10, 1));
+}