From 280e99ae5d9a3becadc9640474d0edf9dc2b670d Mon Sep 17 00:00:00 2001 From: Michael Morton Date: Thu, 27 Aug 2015 15:44:43 -0700 Subject: [PATCH] Adding singular token bucket implementation. Summary: Contains only the central implementation of the TokenBucket, without moving other files to use the new implementation. Reviewed By: @yfeldblum Differential Revision: D2329892 --- folly/TokenBucket.h | 172 +++++++++++++++++++++++++++++++++ folly/test/Makefile.am | 5 + folly/test/TokenBucketTest.cpp | 74 ++++++++++++++ folly/test/TokenBucketTest.h | 28 ++++++ 4 files changed, 279 insertions(+) create mode 100644 folly/TokenBucket.h create mode 100644 folly/test/TokenBucketTest.cpp create mode 100644 folly/test/TokenBucketTest.h diff --git a/folly/TokenBucket.h b/folly/TokenBucket.h new file mode 100644 index 00000000..f5d04597 --- /dev/null +++ b/folly/TokenBucket.h @@ -0,0 +1,172 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include + +namespace folly { + +/** Threadsafe TokenBucket implementation, based on the idea of + * converting tokens into time and maintaining state as a timestamp relative to + * now. The number of tokens available is represented by the delta between now + * and the timestamp, and the 'burst' is represented by the maximum delta. + */ +class TokenBucket { + private: + std::atomic time_; + std::atomic secondsPerToken_; + std::atomic secondsPerBurst_; + + public: + TokenBucket(double rate, double burst, double nowInSeconds) noexcept + : time_(nowInSeconds) { + reset(rate, burst, nowInSeconds); + } + + void reset(double rate, double burst, double nowInSeconds) noexcept { + double tokens = available(nowInSeconds); + + secondsPerToken_.store( + 1.0 / rate - std::numeric_limits::epsilon(), + std::memory_order_relaxed); + + secondsPerBurst_.store( + burst / rate + std::numeric_limits::epsilon(), + std::memory_order_relaxed); + + set_capacity(tokens, nowInSeconds); + } + + void set_capacity(double tokens, double nowInSeconds) noexcept { + const double secondsPerToken = std::atomic_load_explicit( + &secondsPerToken_, std::memory_order_relaxed); + + const double secondsPerBurst = std::atomic_load_explicit( + &secondsPerBurst_, std::memory_order_relaxed); + + double newTime = nowInSeconds - std::min( + tokens * secondsPerToken, secondsPerBurst); + + time_.store(newTime, std::memory_order_relaxed); + } + + // If there are `tokens` avilable at `nowInSeconds`, consume them and + // return true. Otherwise, return false. + // + // This implementation is written in a lock-free manner using a + // compare-and-exchange loop, with branch prediction optimized to minimize + // time spent in the 'success' case which performs a write. + bool consume(double tokens, double nowInSeconds) noexcept { + const double secondsNeeded = tokens * std::atomic_load_explicit( + &secondsPerToken_, std::memory_order_relaxed); + + const double minTime = nowInSeconds - std::atomic_load_explicit( + &secondsPerBurst_, std::memory_order_relaxed); + + double oldTime = + std::atomic_load_explicit(&time_, std::memory_order_relaxed); + double newTime = oldTime; + + // Limit the number of available tokens to 'burst'. We don't need to do + // this inside the loop because if we iterate more than once another + // caller will have performed an update that also covered this + // calculation. Also, tell the compiler to optimize branch prediction to + // minimize time spent between reads and writes in the success case + if (UNLIKELY(minTime > oldTime)) { + newTime = minTime; + } + + while (true) { + newTime += secondsNeeded; + + // Optimize for the write-contention case, to minimize the impact of + // branch misprediction on other threads + if (UNLIKELY(newTime > nowInSeconds)) { + return false; + } + + // Optimize for the write-contention case, to minimize the impact of + // branch misprediction on other threads + if (LIKELY(std::atomic_compare_exchange_weak_explicit( + &time_, &oldTime, newTime, + std::memory_order_relaxed, std::memory_order_relaxed))) { + return true; + } + + newTime = oldTime; + } + + return true; + } + + // Similar to consume, but will always consume some number of tokens. + double consumeOrDrain(double tokens, double nowInSeconds) noexcept { + const double secondsPerToken = std::atomic_load_explicit( + &secondsPerToken_, std::memory_order_relaxed); + + const double secondsNeeded = tokens * secondsPerToken; + const double minTime = nowInSeconds - std::atomic_load_explicit( + &secondsPerBurst_, std::memory_order_relaxed); + + double oldTime = + std::atomic_load_explicit(&time_, std::memory_order_relaxed); + double newTime = oldTime; + + + // Limit the number of available tokens to 'burst'. + // Also, tell the compiler to optimize branch prediction to + // minimize time spent between reads and writes in the success case + if (UNLIKELY(minTime > oldTime)) { + newTime = minTime; + } + + double consumed; + + newTime += secondsNeeded; + + consumed = (newTime - nowInSeconds) / secondsPerToken; + time_.store(newTime, std::memory_order_relaxed); + + return consumed; + } + + double available(double nowInSeconds = defaultClockNow()) const noexcept { + double time = + std::atomic_load_explicit(&time_, std::memory_order_relaxed); + + double deltaTime = std::min( + std::atomic_load_explicit(&secondsPerBurst_, + std::memory_order_relaxed), + nowInSeconds - time); + + return std::max(0.0, deltaTime / std::atomic_load_explicit( + &secondsPerToken_, std::memory_order_relaxed)); + } + + static double defaultClockNow() { + return std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch() + ).count() / 1000000.0; + } +}; + +} diff --git a/folly/test/Makefile.am b/folly/test/Makefile.am index f144a7a1..f2af913b 100644 --- a/folly/test/Makefile.am +++ b/folly/test/Makefile.am @@ -182,6 +182,11 @@ spooky_hash_v2_test_SOURCES = SpookyHashV2Test.cpp spooky_hash_v2_test_LDADD = libgtestmain.la $(top_builddir)/libfolly.la $(top_builddir)/libfollybenchmark.la TESTS += spooky_hash_v2_test +token_bucket_test_SOURCES = TokenBucketTest.cpp +token_bucket_test_LDADD = libgtest.la $(top_builddir)/libfolly.la $(top_builddir)/libfollybenchmark.la +TESTS += token_bucket_test + + futures_test_SOURCES = \ ../futures/test/CollectTest.cpp \ ../futures/test/ContextTest.cpp \ diff --git a/folly/test/TokenBucketTest.cpp b/folly/test/TokenBucketTest.cpp new file mode 100644 index 00000000..1709913e --- /dev/null +++ b/folly/test/TokenBucketTest.cpp @@ -0,0 +1,74 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include + +using namespace folly; + +TEST(TokenBucket, ReverseTime) { + const double rate = 1000; + TokenBucket tokenBucket(rate, rate * 0.01 + 1e-6, 0); + size_t count = 0; + while (tokenBucket.consume(1, 0.1)) { + count += 1; + } + EXPECT_EQ(10, count); + // Going backwards in time has no affect on the toke count (this protects + // against different threads providing out of order timestamps). + double tokensBefore = tokenBucket.available(); + EXPECT_FALSE(tokenBucket.consume(1, 0.09999999)); + EXPECT_EQ(tokensBefore, tokenBucket.available()); +} + +TEST_P(TokenBucketTest, sanity) { + std::pair params = GetParam(); + double rate = params.first; + double consumeSize = params.second; + + const double tenMillisecondBurst = rate * 0.010; + // Select a burst size of 10 milliseconds at the max rate or the consume size + // if 10 ms at rate is too small. + const double burstSize = std::max(consumeSize, tenMillisecondBurst); + TokenBucket tokenBucket(rate, burstSize, 0); + double tokenCounter = 0; + double currentTime = 0; + // Simulate time advancing 10 seconds + for (; currentTime <= 10.0; currentTime += 0.001) { + EXPECT_FALSE(tokenBucket.consume(burstSize + 1, currentTime)); + while (tokenBucket.consume(consumeSize, currentTime)) { + tokenCounter += consumeSize; + } + // Tokens consumed should exceed some lower bound based on rate. + // Note: The token bucket implementation is not precise, so the lower bound + // is somewhat fudged. The upper bound is accurate however. + EXPECT_LE(rate * currentTime * 0.9 - 1, tokenCounter); + // Tokens consumed should not exceed some upper bound based on rate. + EXPECT_GE(rate * currentTime + 1e-6, tokenCounter); + } +} + +static std::vector > rateToConsumeSize = { + {100, 1}, + {1000, 1}, + {10000, 1}, + {10000, 5}, +}; + +INSTANTIATE_TEST_CASE_P(TokenBucket, + TokenBucketTest, + ::testing::ValuesIn(rateToConsumeSize)); diff --git a/folly/test/TokenBucketTest.h b/folly/test/TokenBucketTest.h new file mode 100644 index 00000000..0e7d2a5b --- /dev/null +++ b/folly/test/TokenBucketTest.h @@ -0,0 +1,28 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include + +namespace folly { + +struct TokenBucketTest : + public ::testing::TestWithParam > {}; + +} // folly -- 2.34.1