/*
 * Copyright 2017 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>

#include <folly/Likely.h>
#include <folly/concurrency/CacheLocality.h>
/**
 * Default clock class used by ParameterizedDynamicTokenBucket and derived
 * classes. User-defined clock classes must be steady (monotonic) and define a
 * static function std::chrono::duration<> timeSinceEpoch().
 */
struct DefaultTokenBucketClock {
  /**
   * Returns the time elapsed since the epoch of std::chrono::steady_clock.
   *
   * @return The steady clock's current time_since_epoch(), in that clock's
   *         native duration type.
   */
  static auto timeSinceEpoch() noexcept
      -> decltype(std::chrono::steady_clock::now().time_since_epoch()) {
    return std::chrono::steady_clock::now().time_since_epoch();
  }
};
41 * Thread-safe (atomic) token bucket implementation.
43 * A token bucket (http://en.wikipedia.org/wiki/Token_bucket) models a stream
44 * of events with an average rate and some amount of burstiness. The canonical
45 * example is a packet switched network: the network can accept some number of
46 * bytes per second and the bytes come in finite packets (bursts). A token
47 * bucket stores up to a fixed number of tokens (the burst size). Some number
48 * of tokens are removed when an event occurs. The tokens are replenished at a
51 * This implementation records the last time it was updated. This allows the
52 * token bucket to add tokens "just in time" when tokens are requested.
54 * The "dynamic" base variant allows the token generation rate and maximum
55 * burst size to change with every token consumption.
57 * @tparam ClockT Clock type, must be steady i.e. monotonic.
59 template <typename ClockT = DefaultTokenBucketClock>
60 class ParameterizedDynamicTokenBucket {
65 * @param zeroTime Initial time at which to consider the token bucket
66 * starting to fill. Defaults to 0, so by default token
67 * buckets are "full" after construction.
69 explicit ParameterizedDynamicTokenBucket(double zeroTime = 0) noexcept
70 : zeroTime_(zeroTime) {}
75 * Thread-safe. (Copy constructors of derived classes may not be thread-safe
78 ParameterizedDynamicTokenBucket(
79 const ParameterizedDynamicTokenBucket& other) noexcept
80 : zeroTime_(other.zeroTime_.load()) {}
83 * Copy-assignment operator.
85 * Warning: not thread safe for the object being assigned to (including
86 * self-assignment). Thread-safe for the other object.
88 ParameterizedDynamicTokenBucket& operator=(
89 const ParameterizedDynamicTokenBucket& other) noexcept {
90 zeroTime_ = other.zeroTime_.load();
95 * Re-initialize token bucket.
99 * @param zeroTime Initial time at which to consider the token bucket
100 * starting to fill. Defaults to 0, so by default token
101 * bucket is reset to "full".
103 void reset(double zeroTime = 0) noexcept {
104 zeroTime_ = zeroTime;
108 * Returns the current time in seconds since Epoch.
110 static double defaultClockNow() noexcept(noexcept(ClockT::timeSinceEpoch())) {
111 return std::chrono::duration_cast<std::chrono::duration<double>>(
112 ClockT::timeSinceEpoch())
117 * Attempts to consume some number of tokens. Tokens are first added to the
118 * bucket based on the time elapsed since the last attempt to consume tokens.
119 * Note: Attempts to consume more tokens than the burst size will always
124 * @param toConsume The number of tokens to consume.
125 * @param rate Number of tokens to generate per second.
126 * @param burstSize Maximum burst size. Must be greater than 0.
127 * @param nowInSeconds Current time in seconds. Should be monotonically
128 * increasing from the nowInSeconds specified in
129 * this token bucket's constructor.
130 * @return True if the rate limit check passed, false otherwise.
136 double nowInSeconds = defaultClockNow()) {
138 assert(burstSize > 0);
140 return this->consumeImpl(
141 rate, burstSize, nowInSeconds, [toConsume](double& tokens) {
142 if (tokens < toConsume) {
151 * Similar to consume, but always consumes some number of tokens. If the
152 * bucket contains enough tokens - consumes toConsume tokens. Otherwise the
157 * @param toConsume The number of tokens to consume.
158 * @param rate Number of tokens to generate per second.
159 * @param burstSize Maximum burst size. Must be greater than 0.
160 * @param nowInSeconds Current time in seconds. Should be monotonically
161 * increasing from the nowInSeconds specified in
162 * this token bucket's constructor.
163 * @return number of tokens that were consumed.
165 double consumeOrDrain(
169 double nowInSeconds = defaultClockNow()) {
171 assert(burstSize > 0);
175 rate, burstSize, nowInSeconds, [&consumed, toConsume](double& tokens) {
176 if (tokens < toConsume) {
180 consumed = toConsume;
189 * Returns the number of tokens currently available.
191 * Thread-safe (but returned value may immediately be outdated).
196 double nowInSeconds = defaultClockNow()) const noexcept {
198 assert(burstSize > 0);
200 return std::min((nowInSeconds - this->zeroTime_) * rate, burstSize);
204 template <typename TCallback>
209 const TCallback& callback) {
210 auto zeroTimeOld = zeroTime_.load();
213 auto tokens = std::min((nowInSeconds - zeroTimeOld) * rate, burstSize);
214 if (!callback(tokens)) {
217 zeroTimeNew = nowInSeconds - tokens / rate;
219 UNLIKELY(!zeroTime_.compare_exchange_weak(zeroTimeOld, zeroTimeNew)));
224 alignas(hardware_destructive_interference_size) std::atomic<double> zeroTime_;
228 * Specialization of ParameterizedDynamicTokenBucket with a fixed token
229 * generation rate and a fixed maximum burst size.
231 template <typename ClockT = DefaultTokenBucketClock>
232 class ParameterizedTokenBucket {
234 using Impl = ParameterizedDynamicTokenBucket<ClockT>;
238 * Construct a token bucket with a specific maximum rate and burst size.
240 * @param genRate Number of tokens to generate per second.
241 * @param burstSize Maximum burst size. Must be greater than 0.
242 * @param zeroTime Initial time at which to consider the token bucket
243 * starting to fill. Defaults to 0, so by default token
244 * bucket is "full" after construction.
246 ParameterizedTokenBucket(
249 double zeroTime = 0) noexcept
250 : tokenBucket_(zeroTime), rate_(genRate), burstSize_(burstSize) {
252 assert(burstSize_ > 0);
258 * Warning: not thread safe!
260 ParameterizedTokenBucket(const ParameterizedTokenBucket& other) noexcept =
264 * Copy-assignment operator.
266 * Warning: not thread safe!
268 ParameterizedTokenBucket& operator=(
269 const ParameterizedTokenBucket& other) noexcept = default;
272 * Returns the current time in seconds since Epoch.
274 static double defaultClockNow() noexcept(noexcept(Impl::defaultClockNow())) {
275 return Impl::defaultClockNow();
279 * Change rate and burst size.
281 * Warning: not thread safe!
283 * @param genRate Number of tokens to generate per second.
284 * @param burstSize Maximum burst size. Must be greater than 0.
285 * @param nowInSeconds Current time in seconds. Should be monotonically
286 * increasing from the nowInSeconds specified in
287 * this token bucket's constructor.
292 double nowInSeconds = defaultClockNow()) noexcept {
294 assert(burstSize > 0);
295 double availTokens = available(nowInSeconds);
297 burstSize_ = burstSize;
298 setCapacity(availTokens, nowInSeconds);
302 * Change number of tokens in bucket.
304 * Warning: not thread safe!
306 * @param tokens Desired number of tokens in bucket after the call.
307 * @param nowInSeconds Current time in seconds. Should be monotonically
308 * increasing from the nowInSeconds specified in
309 * this token bucket's constructor.
311 void setCapacity(double tokens, double nowInSeconds) noexcept {
312 tokenBucket_.reset(nowInSeconds - tokens / rate_);
316 * Attempts to consume some number of tokens. Tokens are first added to the
317 * bucket based on the time elapsed since the last attempt to consume tokens.
318 * Note: Attempts to consume more tokens than the burst size will always
323 * @param toConsume The number of tokens to consume.
324 * @param nowInSeconds Current time in seconds. Should be monotonically
325 * increasing from the nowInSeconds specified in
326 * this token bucket's constructor.
327 * @return True if the rate limit check passed, false otherwise.
329 bool consume(double toConsume, double nowInSeconds = defaultClockNow()) {
330 return tokenBucket_.consume(toConsume, rate_, burstSize_, nowInSeconds);
334 * Similar to consume, but always consumes some number of tokens. If the
335 * bucket contains enough tokens - consumes toConsume tokens. Otherwise the
340 * @param toConsume The number of tokens to consume.
341 * @param nowInSeconds Current time in seconds. Should be monotonically
342 * increasing from the nowInSeconds specified in
343 * this token bucket's constructor.
344 * @return number of tokens that were consumed.
346 double consumeOrDrain(
348 double nowInSeconds = defaultClockNow()) {
349 return tokenBucket_.consumeOrDrain(
350 toConsume, rate_, burstSize_, nowInSeconds);
354 * Returns the number of tokens currently available.
356 * Thread-safe (but returned value may immediately be outdated).
358 double available(double nowInSeconds = defaultClockNow()) const {
359 return tokenBucket_.available(rate_, burstSize_, nowInSeconds);
363 * Returns the number of tokens generated per second.
365 * Thread-safe (but returned value may immediately be outdated).
367 double rate() const noexcept {
372 * Returns the maximum burst size.
374 * Thread-safe (but returned value may immediately be outdated).
376 double burst() const noexcept {
386 using TokenBucket = ParameterizedTokenBucket<>;
387 using DynamicTokenBucket = ParameterizedDynamicTokenBucket<>;