/*
 * Copyright 2016 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>

#include <folly/Likely.h>
#include <folly/detail/CacheLocality.h>
/**
 * Default clock class used by ParameterizedDynamicTokenBucket and derived
 * classes. User-defined clock classes must be steady (monotonic) and define a
 * static function std::chrono::duration<> timeSinceEpoch().
 */
struct DefaultTokenBucketClock {
  /**
   * Returns the time elapsed since the epoch of std::chrono::steady_clock,
   * expressed in that clock's native duration type.
   */
  static auto timeSinceEpoch() noexcept
      -> decltype(std::chrono::steady_clock::now().time_since_epoch()) {
    return std::chrono::steady_clock::now().time_since_epoch();
  }
};
41 * Thread-safe (atomic) token bucket implementation.
43 * A token bucket (http://en.wikipedia.org/wiki/Token_bucket) models a stream
44 * of events with an average rate and some amount of burstiness. The canonical
45 * example is a packet switched network: the network can accept some number of
46 * bytes per second and the bytes come in finite packets (bursts). A token
47 * bucket stores up to a fixed number of tokens (the burst size). Some number
48 * of tokens are removed when an event occurs. The tokens are replenished at a
51 * This implementation records the last time it was updated. This allows the
52 * token bucket to add tokens "just in time" when tokens are requested.
54 * The "dynamic" base variant allows the token generation rate and maximum
55 * burst size to change with every token consumption.
57 * @tparam ClockT Clock type, must be steady i.e. monotonic.
59 template <typename ClockT = DefaultTokenBucketClock>
60 class ParameterizedDynamicTokenBucket {
65 * @param zeroTime Initial time at which to consider the token bucket
66 * starting to fill. Defaults to 0, so by default token
67 * buckets are "full" after construction.
69 explicit ParameterizedDynamicTokenBucket(double zeroTime = 0) noexcept
70 : zeroTime_(zeroTime) {}
75 * Thread-safe. (Copy constructors of derived classes may not be thread-safe
78 ParameterizedDynamicTokenBucket(
79 const ParameterizedDynamicTokenBucket& other) noexcept
80 : zeroTime_(other.zeroTime_.load()) {}
83 * Copy-assignment operator.
85 * Warning: not thread safe for the object being assigned to (including
86 * self-assignment). Thread-safe for the other object.
88 ParameterizedDynamicTokenBucket& operator=(
89 const ParameterizedDynamicTokenBucket& other) noexcept {
90 zeroTime_ = other.zeroTime_.load();
95 * Re-initialize token bucket.
99 * @param zeroTime Initial time at which to consider the token bucket
100 * starting to fill. Defaults to 0, so by default token
101 * bucket is reset to "full".
103 void reset(double zeroTime = 0) noexcept {
104 zeroTime_ = zeroTime;
108 * Attempts to consume some number of tokens. Tokens are first added to the
109 * bucket based on the time elapsed since the last attempt to consume tokens.
110 * Note: Attempts to consume more tokens than the burst size will always
115 * @param toConsume The number of tokens to consume.
116 * @param rate Number of tokens to generate per second.
117 * @param burstSize Maximum burst size. Must be greater than 0.
118 * @param nowInSeconds Current time in seconds. Should be monotonically
119 * increasing from the nowInSeconds specified in
120 * this token bucket's constructor.
121 * @return True if the rate limit check passed, false otherwise.
127 double nowInSeconds = defaultClockNow()) {
129 assert(burstSize > 0);
131 return this->consumeImpl(
132 rate, burstSize, nowInSeconds, [toConsume](double& tokens) {
133 if (tokens < toConsume) {
142 * Similar to consume, but always consumes some number of tokens. If the
143 * bucket contains enough tokens - consumes toConsume tokens. Otherwise the
148 * @param toConsume The number of tokens to consume.
149 * @param rate Number of tokens to generate per second.
150 * @param burstSize Maximum burst size. Must be greater than 0.
151 * @param nowInSeconds Current time in seconds. Should be monotonically
152 * increasing from the nowInSeconds specified in
153 * this token bucket's constructor.
154 * @return number of tokens that were consumed.
156 double consumeOrDrain(
160 double nowInSeconds = defaultClockNow()) {
162 assert(burstSize > 0);
166 rate, burstSize, nowInSeconds, [&consumed, toConsume](double& tokens) {
167 if (tokens < toConsume) {
171 consumed = toConsume;
180 * Returns the number of tokens currently available.
182 * Thread-safe (but returned value may immediately be outdated).
187 double nowInSeconds = defaultClockNow()) const noexcept {
189 assert(burstSize > 0);
191 return std::min((nowInSeconds - this->zeroTime_) * rate, burstSize);
195 * Returns the current time in seconds since Epoch.
197 static double defaultClockNow() noexcept(noexcept(ClockT::timeSinceEpoch())) {
198 return std::chrono::duration_cast<std::chrono::duration<double>>(
199 ClockT::timeSinceEpoch())
204 template <typename TCallback>
209 const TCallback& callback) {
210 auto zeroTimeOld = zeroTime_.load();
213 auto tokens = std::min((nowInSeconds - zeroTimeOld) * rate, burstSize);
214 if (!callback(tokens)) {
217 zeroTimeNew = nowInSeconds - tokens / rate;
219 UNLIKELY(!zeroTime_.compare_exchange_weak(zeroTimeOld, zeroTimeNew)));
224 FOLLY_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<double> zeroTime_;
228 * Specialization of ParameterizedDynamicTokenBucket with a fixed token
229 * generation rate and a fixed maximum burst size.
231 template <typename ClockT = DefaultTokenBucketClock>
232 class ParameterizedTokenBucket {
234 using Impl = ParameterizedDynamicTokenBucket<ClockT>;
238 * Construct a token bucket with a specific maximum rate and burst size.
240 * @param genRate Number of tokens to generate per second.
241 * @param burstSize Maximum burst size. Must be greater than 0.
242 * @param zeroTime Initial time at which to consider the token bucket
243 * starting to fill. Defaults to 0, so by default token
244 * bucket is "full" after construction.
246 ParameterizedTokenBucket(
249 double zeroTime = 0) noexcept
250 : tokenBucket_(zeroTime), rate_(genRate), burstSize_(burstSize) {
252 assert(burstSize_ > 0);
258 * Warning: not thread safe!
260 ParameterizedTokenBucket(const ParameterizedTokenBucket& other) noexcept =
264 * Copy-assignment operator.
266 * Warning: not thread safe!
268 ParameterizedTokenBucket& operator=(
269 const ParameterizedTokenBucket& other) noexcept = default;
272 * Change rate and burst size.
274 * Warning: not thread safe!
276 * @param genRate Number of tokens to generate per second.
277 * @param burstSize Maximum burst size. Must be greater than 0.
278 * @param nowInSeconds Current time in seconds. Should be monotonically
279 * increasing from the nowInSeconds specified in
280 * this token bucket's constructor.
285 double nowInSeconds = defaultClockNow()) noexcept {
287 assert(burstSize > 0);
288 double availTokens = available(nowInSeconds);
290 burstSize_ = burstSize;
291 setCapacity(availTokens, nowInSeconds);
295 * Change number of tokens in bucket.
297 * Warning: not thread safe!
299 * @param tokens Desired number of tokens in bucket after the call.
300 * @param nowInSeconds Current time in seconds. Should be monotonically
301 * increasing from the nowInSeconds specified in
302 * this token bucket's constructor.
304 void setCapacity(double tokens, double nowInSeconds) noexcept {
305 tokenBucket_.reset(nowInSeconds - tokens / rate_);
309 * Attempts to consume some number of tokens. Tokens are first added to the
310 * bucket based on the time elapsed since the last attempt to consume tokens.
311 * Note: Attempts to consume more tokens than the burst size will always
316 * @param toConsume The number of tokens to consume.
317 * @param nowInSeconds Current time in seconds. Should be monotonically
318 * increasing from the nowInSeconds specified in
319 * this token bucket's constructor.
320 * @return True if the rate limit check passed, false otherwise.
322 bool consume(double toConsume, double nowInSeconds = defaultClockNow()) {
323 return tokenBucket_.consume(toConsume, rate_, burstSize_, nowInSeconds);
327 * Similar to consume, but always consumes some number of tokens. If the
328 * bucket contains enough tokens - consumes toConsume tokens. Otherwise the
333 * @param toConsume The number of tokens to consume.
334 * @param nowInSeconds Current time in seconds. Should be monotonically
335 * increasing from the nowInSeconds specified in
336 * this token bucket's constructor.
337 * @return number of tokens that were consumed.
339 double consumeOrDrain(
341 double nowInSeconds = defaultClockNow()) {
342 return tokenBucket_.consumeOrDrain(
343 toConsume, rate_, burstSize_, nowInSeconds);
347 * Returns the number of tokens currently available.
349 * Thread-safe (but returned value may immediately be outdated).
351 double available(double nowInSeconds = defaultClockNow()) const {
352 return tokenBucket_.available(rate_, burstSize_, nowInSeconds);
356 * Returns the number of tokens generated per second.
358 * Thread-safe (but returned value may immediately be outdated).
360 double rate() const noexcept {
365 * Returns the maximum burst size.
367 * Thread-safe (but returned value may immediately be outdated).
369 double burst() const noexcept {
374 * Returns the current time in seconds since Epoch.
376 static double defaultClockNow() noexcept(noexcept(Impl::defaultClockNow())) {
377 return Impl::defaultClockNow();
386 using TokenBucket = ParameterizedTokenBucket<>;
387 using DynamicTokenBucket = ParameterizedDynamicTokenBucket<>;