2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Likely.h>
28 /** Threadsafe TokenBucket implementation, based on the idea of
29 * converting tokens into time and maintaining state as a timestamp relative to
30 * now. The number of tokens available is represented by the delta between now
31 * and the timestamp, and the 'burst' is represented by the maximum delta.
35 std::atomic<double> time_;
36 std::atomic<double> secondsPerToken_;
37 std::atomic<double> secondsPerBurst_;
40 TokenBucket(double rate, double burst, double nowInSeconds) noexcept
41 : time_(nowInSeconds) {
42 reset(rate, burst, nowInSeconds);
45 void reset(double rate, double burst, double nowInSeconds) noexcept {
46 double tokens = available(nowInSeconds);
48 secondsPerToken_.store(
49 1.0 / rate - std::numeric_limits<double>::epsilon(),
50 std::memory_order_relaxed);
52 secondsPerBurst_.store(
53 burst / rate + std::numeric_limits<double>::epsilon(),
54 std::memory_order_relaxed);
56 set_capacity(tokens, nowInSeconds);
59 void set_capacity(double tokens, double nowInSeconds) noexcept {
60 const double secondsPerToken = std::atomic_load_explicit(
61 &secondsPerToken_, std::memory_order_relaxed);
63 const double secondsPerBurst = std::atomic_load_explicit(
64 &secondsPerBurst_, std::memory_order_relaxed);
66 double newTime = nowInSeconds - std::min(
67 tokens * secondsPerToken, secondsPerBurst);
69 time_.store(newTime, std::memory_order_relaxed);
72 // If there are `tokens` avilable at `nowInSeconds`, consume them and
73 // return true. Otherwise, return false.
75 // This implementation is written in a lock-free manner using a
76 // compare-and-exchange loop, with branch prediction optimized to minimize
77 // time spent in the 'success' case which performs a write.
78 bool consume(double tokens, double nowInSeconds) noexcept {
79 const double secondsNeeded = tokens * std::atomic_load_explicit(
80 &secondsPerToken_, std::memory_order_relaxed);
82 const double minTime = nowInSeconds - std::atomic_load_explicit(
83 &secondsPerBurst_, std::memory_order_relaxed);
86 std::atomic_load_explicit(&time_, std::memory_order_relaxed);
87 double newTime = oldTime;
89 // Limit the number of available tokens to 'burst'. We don't need to do
90 // this inside the loop because if we iterate more than once another
91 // caller will have performed an update that also covered this
92 // calculation. Also, tell the compiler to optimize branch prediction to
93 // minimize time spent between reads and writes in the success case
94 if (UNLIKELY(minTime > oldTime)) {
99 newTime += secondsNeeded;
101 // Optimize for the write-contention case, to minimize the impact of
102 // branch misprediction on other threads
103 if (UNLIKELY(newTime > nowInSeconds)) {
107 // Optimize for the write-contention case, to minimize the impact of
108 // branch misprediction on other threads
109 if (LIKELY(std::atomic_compare_exchange_weak_explicit(
110 &time_, &oldTime, newTime,
111 std::memory_order_relaxed, std::memory_order_relaxed))) {
121 // Similar to consume, but will always consume some number of tokens.
122 double consumeOrDrain(double tokens, double nowInSeconds) noexcept {
123 const double secondsPerToken = std::atomic_load_explicit(
124 &secondsPerToken_, std::memory_order_relaxed);
126 const double secondsNeeded = tokens * secondsPerToken;
127 const double minTime = nowInSeconds - std::atomic_load_explicit(
128 &secondsPerBurst_, std::memory_order_relaxed);
131 std::atomic_load_explicit(&time_, std::memory_order_relaxed);
132 double newTime = oldTime;
135 // Limit the number of available tokens to 'burst'.
136 // Also, tell the compiler to optimize branch prediction to
137 // minimize time spent between reads and writes in the success case
138 if (UNLIKELY(minTime > oldTime)) {
144 newTime += secondsNeeded;
146 consumed = (newTime - nowInSeconds) / secondsPerToken;
147 time_.store(newTime, std::memory_order_relaxed);
152 double available(double nowInSeconds = defaultClockNow()) const noexcept {
154 std::atomic_load_explicit(&time_, std::memory_order_relaxed);
156 double deltaTime = std::min(
157 std::atomic_load_explicit(&secondsPerBurst_,
158 std::memory_order_relaxed),
159 nowInSeconds - time);
161 return std::max(0.0, deltaTime / std::atomic_load_explicit(
162 &secondsPerToken_, std::memory_order_relaxed));
165 static double defaultClockNow() {
166 return std::chrono::duration_cast<std::chrono::microseconds>(
167 std::chrono::steady_clock::now().time_since_epoch()
168 ).count() / 1000000.0;