--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <atomic>
+#include <limits>
+#include <chrono>
+
+#include <folly/Likely.h>
+
+namespace folly {
+
+/**
+ * Thread-safe token bucket that keeps its state as a timestamp instead of a
+ * token count.
+ *
+ * Tokens are converted into time: `time_` is the instant at which the bucket
+ * was last empty, so the number of tokens available at `now` is
+ * `(now - time_) / secondsPerToken_`, and the 'burst' is the largest delta
+ * (`secondsPerBurst_`) that is ever honoured. Consuming tokens moves `time_`
+ * forward.
+ *
+ * Every timestamp is supplied by the caller as seconds in a double. They are
+ * subtracted from one another, so all callers should use the same time base.
+ *
+ * consume() and consumeOrDrain() update `time_` with a compare-and-exchange
+ * loop. reset() and set_capacity() write several atomics one after another,
+ * so a concurrent reader may observe a mix of old and new values.
+ */
+class TokenBucket {
+ private:
+  // Instant, in seconds, at which the bucket was last empty.
+  std::atomic<double> time_;
+  // Seconds needed to accrue one token (1 / rate, less an epsilon).
+  std::atomic<double> secondsPerToken_;
+  // Seconds needed to accrue a full burst (burst / rate, plus an epsilon).
+  std::atomic<double> secondsPerBurst_;
+
+ public:
+  /**
+   * Creates a bucket that holds no tokens at `nowInSeconds`.
+   *
+   * @param rate          tokens accrued per second; expected to be > 0
+   *                      (it is used as a divisor).
+   * @param burst         maximum number of tokens the bucket can hold.
+   * @param nowInSeconds  current time in seconds.
+   */
+  TokenBucket(double rate, double burst, double nowInSeconds) noexcept
+      // reset() reads all three members through available() before it
+      // writes them, so each one needs a defined value first. These
+      // placeholders make available(nowInSeconds) evaluate to exactly zero.
+      : time_(nowInSeconds), secondsPerToken_(1.0), secondsPerBurst_(0.0) {
+    reset(rate, burst, nowInSeconds);
+  }
+
+  /**
+   * Changes the rate and burst, carrying over the tokens that were available
+   * at `nowInSeconds` (capped to the new burst).
+   *
+   * @param rate          tokens accrued per second; expected to be > 0.
+   * @param burst         maximum number of tokens the bucket can hold.
+   * @param nowInSeconds  current time in seconds.
+   */
+  void reset(double rate, double burst, double nowInSeconds) noexcept {
+    // Measure with the old parameters before they are overwritten.
+    const double tokens = available(nowInSeconds);
+
+    secondsPerToken_.store(
+        1.0 / rate - std::numeric_limits<double>::epsilon(),
+        std::memory_order_relaxed);
+
+    secondsPerBurst_.store(
+        burst / rate + std::numeric_limits<double>::epsilon(),
+        std::memory_order_relaxed);
+
+    set_capacity(tokens, nowInSeconds);
+  }
+
+  /**
+   * Sets the number of tokens available at `nowInSeconds` to `tokens`,
+   * capped at the burst size.
+   */
+  void set_capacity(double tokens, double nowInSeconds) noexcept {
+    const double secondsPerToken =
+        secondsPerToken_.load(std::memory_order_relaxed);
+    const double secondsPerBurst =
+        secondsPerBurst_.load(std::memory_order_relaxed);
+
+    // Back-date the "last empty" instant by the time the tokens represent.
+    const double newTime =
+        nowInSeconds - std::min(tokens * secondsPerToken, secondsPerBurst);
+
+    time_.store(newTime, std::memory_order_relaxed);
+  }
+
+  /**
+   * If there are `tokens` available at `nowInSeconds`, consume them and
+   * return true. Otherwise, leave the bucket untouched and return false.
+   *
+   * This implementation is written in a lock-free manner using a
+   * compare-and-exchange loop, with branch prediction optimized to minimize
+   * time spent in the 'success' case which performs a write.
+   */
+  bool consume(double tokens, double nowInSeconds) noexcept {
+    const double secondsNeeded =
+        tokens * secondsPerToken_.load(std::memory_order_relaxed);
+    const double minTime =
+        nowInSeconds - secondsPerBurst_.load(std::memory_order_relaxed);
+
+    double oldTime = time_.load(std::memory_order_relaxed);
+
+    while (true) {
+      double newTime = oldTime;
+
+      // Limit the number of available tokens to 'burst'. This is redone on
+      // every attempt: a failed compare-exchange (which may be spurious for
+      // the weak form) refreshes oldTime, and the refreshed value can still
+      // be older than minTime.
+      if (UNLIKELY(minTime > newTime)) {
+        newTime = minTime;
+      }
+
+      newTime += secondsNeeded;
+
+      // Not enough tokens: the request would move the "last empty" instant
+      // into the future.
+      if (UNLIKELY(newTime > nowInSeconds)) {
+        return false;
+      }
+
+      // Optimize for the successful write, to minimize the time between the
+      // read and the write. On failure oldTime holds the current value of
+      // time_ and the loop recomputes from it.
+      if (LIKELY(time_.compare_exchange_weak(
+              oldTime,
+              newTime,
+              std::memory_order_relaxed,
+              std::memory_order_relaxed))) {
+        return true;
+      }
+    }
+  }
+
+  /**
+   * Similar to consume(), but never fails: consumes `tokens` if that many
+   * are available at `nowInSeconds`, otherwise consumes whatever is
+   * available and leaves the bucket empty.
+   *
+   * @return the number of tokens actually consumed.
+   */
+  double consumeOrDrain(double tokens, double nowInSeconds) noexcept {
+    const double secondsPerToken =
+        secondsPerToken_.load(std::memory_order_relaxed);
+    const double minTime =
+        nowInSeconds - secondsPerBurst_.load(std::memory_order_relaxed);
+
+    double oldTime = time_.load(std::memory_order_relaxed);
+
+    while (true) {
+      double startTime = oldTime;
+
+      // Limit the number of available tokens to 'burst'.
+      if (UNLIKELY(minTime > startTime)) {
+        startTime = minTime;
+      }
+
+      // Tokens on hand; zero when nowInSeconds is older than the stored
+      // instant (out-of-order timestamp).
+      const double onHand =
+          std::max(0.0, (nowInSeconds - startTime) / secondsPerToken);
+
+      double consumed;
+      double newTime;
+      if (tokens < onHand) {
+        consumed = tokens;
+        newTime = startTime + tokens * secondsPerToken;
+      } else {
+        // Drain: the bucket becomes empty as of nowInSeconds, but the stored
+        // instant is never moved backwards.
+        consumed = onHand;
+        newTime = std::max(startTime, nowInSeconds);
+      }
+
+      // Publish with compare-exchange so that a concurrent consumer's update
+      // is not overwritten; on failure oldTime is refreshed and we retry.
+      if (LIKELY(time_.compare_exchange_weak(
+              oldTime,
+              newTime,
+              std::memory_order_relaxed,
+              std::memory_order_relaxed))) {
+        return consumed;
+      }
+    }
+  }
+
+  /**
+   * Returns the number of tokens available at `nowInSeconds`, between zero
+   * and the burst size. When no time is given, defaultClockNow() is used.
+   */
+  double available(double nowInSeconds = defaultClockNow()) const noexcept {
+    const double time = time_.load(std::memory_order_relaxed);
+
+    const double deltaTime = std::min(
+        secondsPerBurst_.load(std::memory_order_relaxed),
+        nowInSeconds - time);
+
+    return std::max(
+        0.0, deltaTime / secondsPerToken_.load(std::memory_order_relaxed));
+  }
+
+  /**
+   * Returns std::chrono::steady_clock's time since its epoch, in seconds,
+   * with microsecond resolution.
+   */
+  static double defaultClockNow() noexcept {
+    return std::chrono::duration_cast<std::chrono::microseconds>(
+               std::chrono::steady_clock::now().time_since_epoch())
+               .count() /
+        1000000.0;
+  }
+};
+
+}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/test/TokenBucketTest.h>
+
+#include <gtest/gtest.h>
+
+using namespace folly;
+
+// Checks that presenting an older timestamp neither grants tokens nor changes
+// the state of the bucket.
+TEST(TokenBucket, ReverseTime) {
+  const double rate = 1000;
+  const double now = 0.1;
+  // Burst of ten tokens (plus a sliver), bucket created at time zero.
+  TokenBucket tokenBucket(rate, rate * 0.01 + 1e-6, 0);
+  size_t count = 0;
+  while (tokenBucket.consume(1, now)) {
+    count += 1;
+  }
+  EXPECT_EQ(10u, count);
+  // Going backwards in time has no effect on the token count (this protects
+  // against different threads providing out of order timestamps).
+  //
+  // The token count is read against the same simulated clock as the consume
+  // calls above. The default argument of available() reads the real steady
+  // clock, which is unrelated to these timestamps and would make the
+  // before/after comparison insensitive to changes in the bucket.
+  const double tokensBefore = tokenBucket.available(now);
+  EXPECT_FALSE(tokenBucket.consume(1, 0.09999999));
+  EXPECT_EQ(tokensBefore, tokenBucket.available(now));
+}
+
+// Drives a bucket through ten simulated seconds in one-millisecond steps,
+// draining it at every step, and checks the running total of consumed tokens
+// against bounds derived from the configured rate.
+TEST_P(TokenBucketTest, sanity) {
+  const std::pair<double, double> rateAndSize = GetParam();
+  const double rate = rateAndSize.first;
+  const double consumeSize = rateAndSize.second;
+
+  // The burst is ten milliseconds' worth of tokens at `rate`, unless a single
+  // consume request is larger than that, in which case the request size wins.
+  const double burstSize = std::max(consumeSize, rate * 0.010);
+  TokenBucket tokenBucket(rate, burstSize, 0);
+
+  double tokenCounter = 0;
+  double currentTime = 0;
+  // Simulate time advancing 10 seconds.
+  while (currentTime <= 10.0) {
+    // A request larger than the burst must be refused.
+    EXPECT_FALSE(tokenBucket.consume(burstSize + 1, currentTime));
+
+    // Take everything the bucket will give at this instant.
+    while (tokenBucket.consume(consumeSize, currentTime)) {
+      tokenCounter += consumeSize;
+    }
+
+    // Lower bound: the implementation is not precise, so this side is
+    // deliberately loose (90% of the ideal amount, minus one token).
+    EXPECT_LE(rate * currentTime * 0.9 - 1, tokenCounter);
+    // Upper bound: never more than the rate allows, up to a tiny tolerance.
+    EXPECT_GE(rate * currentTime + 1e-6, tokenCounter);
+
+    currentTime += 0.001;
+  }
+}
+
+// Parameters for the value-parameterized tests above: each entry pairs a
+// rate (tokens per second) with the number of tokens requested per consume
+// call.
+static std::vector<std::pair<double, double>> rateToConsumeSize = {
+    std::pair<double, double>(100, 1),
+    std::pair<double, double>(1000, 1),
+    std::pair<double, double>(10000, 1),
+    std::pair<double, double>(10000, 5),
+};
+
+// Instantiates every TokenBucketTest case once per entry of the table.
+INSTANTIATE_TEST_CASE_P(
+    TokenBucket, TokenBucketTest, ::testing::ValuesIn(rateToConsumeSize));