* 1000000 for initialSize, and the iteration count for n.
*/
#define BENCHMARK_PARAM(name, param) \
+ BENCHMARK_NAMED_PARAM(name, param, param)
+
+/*
+ * Like BENCHMARK_PARAM(), but allows a custom name to be specified for each
+ * parameter, rather than using the parameter value.
+ *
+ * Useful when the parameter value is not a valid token for string pasting,
+ * or when you want to specify multiple parameter arguments.
+ *
+ * For example:
+ *
+ * void addValue(unsigned int n, int64_t bucketSize, int64_t min, int64_t max) {
+ * Histogram<int64_t> hist(bucketSize, min, max);
+ * int64_t num = min;
+ * FOR_EACH_RANGE (i, 0, n) {
+ * hist.addValue(num);
+ * ++num;
+ * if (num > max) { num = min; }
+ * }
+ * }
+ *
+ * BENCHMARK_NAMED_PARAM(addValue, 0_to_100, 1, 0, 100)
+ * BENCHMARK_NAMED_PARAM(addValue, 0_to_1000, 10, 0, 1000)
+ * BENCHMARK_NAMED_PARAM(addValue, 5k_to_20k, 250, 5000, 20000)
+ */
+#define BENCHMARK_NAMED_PARAM(name, param_name, ...) \
BENCHMARK_IMPL( \
- FB_CONCATENATE(name, FB_CONCATENATE(_, param)), \
- FB_STRINGIZE(name) "(" FB_STRINGIZE(param) ")", \
+ FB_CONCATENATE(name, FB_CONCATENATE(_, param_name)), \
+ FB_STRINGIZE(name) "(" FB_STRINGIZE(param_name) ")", \
unsigned, \
iters) { \
- name(iters, param); \
+ name(iters, ## __VA_ARGS__); \
}
/**
* A combination of BENCHMARK_RELATIVE and BENCHMARK_PARAM.
*/
#define BENCHMARK_RELATIVE_PARAM(name, param) \
+ BENCHMARK_RELATIVE_NAMED_PARAM(name, param, param)
+
+/**
+ * A combination of BENCHMARK_RELATIVE and BENCHMARK_NAMED_PARAM.
+ */
+#define BENCHMARK_RELATIVE_NAMED_PARAM(name, param_name, ...) \
BENCHMARK_IMPL( \
- FB_CONCATENATE(name, FB_CONCATENATE(_, param)), \
- "%" FB_STRINGIZE(name) "(" FB_STRINGIZE(param) ")", \
+ FB_CONCATENATE(name, FB_CONCATENATE(_, param_name)), \
+ "%" FB_STRINGIZE(name) "(" FB_STRINGIZE(param_name) ")", \
unsigned, \
iters) { \
- name(iters, param); \
+ name(iters, ## __VA_ARGS__); \
}
/**
#define FOLLY_HISTOGRAM_H_
#include <cstddef>
-#include <cstdint>
#include <limits>
#include <string>
#include <vector>
#include <stdexcept>
+#include "folly/detail/Stats.h"
+
namespace folly {
namespace detail {
class Histogram {
public:
typedef T ValueType;
-
- struct Bucket {
- Bucket()
- : sum(0),
- count(0) {}
-
- void clear() {
- sum = 0;
- count = 0;
- }
-
- Bucket& merge(const Bucket &bucket) {
- if (this != &bucket) {
- sum += bucket.sum;
- count += bucket.count;
- }
- return *this;
- }
-
- Bucket& operator=(const Bucket& bucket) {
- if (this != &bucket) {
- sum = bucket.sum;
- count = bucket.count;
- }
- return *this;
- }
-
- ValueType sum;
- uint64_t count;
- };
+ typedef detail::Bucket<T> Bucket;
Histogram(ValueType bucketSize, ValueType min, ValueType max)
: buckets_(bucketSize, min, max, Bucket()) {}
}
for (int i = 0; i < buckets_.getNumBuckets(); i++) {
- buckets_.getByIndex(i).merge(hist.buckets_.getByIndex(i));
+ buckets_.getByIndex(i) += hist.buckets_.getByIndex(i);
}
}
--- /dev/null
+/*
+ * Copyright 2012 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_DETAIL_STATS_H_
+#define FOLLY_DETAIL_STATS_H_
+
+#include <cstdint>
+
+namespace folly { namespace detail {
+
+template<typename T>
+struct Bucket {
+ public:
+ typedef T ValueType;
+
+ Bucket()
+ : sum(ValueType()),
+ count(0) {}
+
+ void clear() {
+ sum = ValueType();
+ count = 0;
+ }
+
+ void add(const ValueType& s, uint64_t c) {
+ // TODO: It would be nice to handle overflow here.
+ sum += s;
+ count += c;
+ }
+
+ Bucket& operator+=(const Bucket& o) {
+ add(o.sum, o.count);
+ return *this;
+ }
+
+ Bucket& operator-=(const Bucket& o) {
+ // TODO: It would be nice to handle overflow here.
+ sum -= o.sum;
+ count -= o.count;
+ return *this;
+ }
+
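+  // Return sum / count. The ReturnType parameter selects the kind of
+  // division performed: avg<double>() computes a floating-point mean,
+  // while an integer ReturnType truncates.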
+ template <typename ReturnType>
+ ReturnType avg() const {
+ return (count ?
+ static_cast<ReturnType>(sum) / count :
+ ReturnType(0));
+ }
+
+ ValueType sum;
+ uint64_t count;
+};
+
+}} // folly::detail
+
+#endif // FOLLY_DETAIL_STATS_H_
--- /dev/null
+/*
+ * Copyright 2012 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_
+#define FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_
+
+#include <glog/logging.h>
+
+namespace folly {
+
+template <typename VT, typename TT>
+BucketedTimeSeries<VT, TT>::BucketedTimeSeries(size_t numBuckets,
+ TimeType duration)
+ : firstTime_(1),
+ latestTime_(0),
+ duration_(duration) {
+ // For tracking all-time data we only use total_, and don't need to bother
+ // with buckets_
+ if (!isAllTime()) {
+ // Round numBuckets down to duration_.count().
+ //
+ // There is no point in having more buckets than our timestamp
+ // granularity: otherwise we would have buckets that could never be used.
+ if (numBuckets > duration_.count()) {
+ numBuckets = duration_.count();
+ }
+
+ buckets_.resize(numBuckets, Bucket());
+ }
+}
+
+template <typename VT, typename TT>
+void BucketedTimeSeries<VT, TT>::addValue(TimeType now, const ValueType& val) {
+ addValueAggregated(now, val, 1);
+}
+
+template <typename VT, typename TT>
+void BucketedTimeSeries<VT, TT>::addValue(TimeType now,
+ const ValueType& val,
+ int64_t times) {
+ addValueAggregated(now, val * times, times);
+}
+
+template <typename VT, typename TT>
+void BucketedTimeSeries<VT, TT>::addValueAggregated(TimeType now,
+ const ValueType& sum,
+ int64_t nsamples) {
+ // Make sure time doesn't go backwards
+ now = std::max(now, latestTime_);
+
+ if (isAllTime()) {
+ if (empty()) {
+ firstTime_ = now;
+ }
+ latestTime_ = now;
+ total_.add(sum, nsamples);
+ return;
+ }
+
+ // Update the buckets
+ size_t curBucket = update(now);
+ buckets_[curBucket].add(sum, nsamples);
+
+ // Update the aggregate sum/count
+ total_.add(sum, nsamples);
+}
+
+template <typename VT, typename TT>
+size_t BucketedTimeSeries<VT, TT>::update(TimeType now) {
+ if (empty()) {
+ // This is the first data point.
+ firstTime_ = now;
+ }
+
+ // For all-time data, all we need to do is update latestTime_
+ if (isAllTime()) {
+ latestTime_ = std::max(latestTime_, now);
+ return 0;
+ }
+
+ // Make sure time doesn't go backwards.
+ // If the time is less than or equal to the latest time we have already seen,
+ // we don't need to do anything.
+ if (now <= latestTime_) {
+ return getBucketIdx(latestTime_);
+ }
+
+ // We could cache nextBucketStart as a member variable, so we don't have to
+ // recompute it each time update() is called with a new timestamp value.
+ // This makes things faster when update() (or addValue()) is called once
+ // per second, but slightly slower when update() is called multiple times a
+ // second. We care more about optimizing the cases where addValue() is being
+ // called frequently. If addValue() is only being called once every few
+ // seconds, it doesn't matter as much if it is fast.
+
+ // Get info about the bucket that latestTime_ points at
+ size_t currentBucket;
+ TimeType currentBucketStart;
+ TimeType nextBucketStart;
+  getBucketInfo(latestTime_, &currentBucket,
+                &currentBucketStart, &nextBucketStart);
+
+ // Update latestTime_
+ latestTime_ = now;
+
+ if (now < nextBucketStart) {
+ // We're still in the same bucket.
+ // We're done after updating latestTime_.
+ return currentBucket;
+ } else if (now >= currentBucketStart + duration_) {
+ // It's been a while. We have wrapped, and all of the buckets need to be
+ // cleared.
+ for (Bucket& bucket : buckets_) {
+ bucket.clear();
+ }
+ total_.clear();
+ return getBucketIdx(latestTime_);
+ } else {
+    // Clear all the buckets between the last time and the current time, meaning
+ // buckets in the range [(currentBucket+1), newBucket]. Note that
+ // the bucket (currentBucket+1) is always the oldest bucket we have. Since
+ // our array is circular, loop when we reach the end.
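+    // (Illustrative: in a 100-second, 10-bucket series where latestTime_
+    // is 95, update(151) walks forward from bucket 9 through buckets 0..5,
+    // clearing six buckets and leaving only the data added during
+    // [60, 100).)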
+ size_t newBucket = getBucketIdx(now);
+ size_t idx = currentBucket;
+ while (idx != newBucket) {
+ ++idx;
+ if (idx >= buckets_.size()) {
+ idx = 0;
+ }
+ total_ -= buckets_[idx];
+ buckets_[idx].clear();
+ }
+ return newBucket;
+ }
+}
+
+template <typename VT, typename TT>
+void BucketedTimeSeries<VT, TT>::clear() {
+ for (Bucket& bucket : buckets_) {
+ bucket.clear();
+ }
+ total_.clear();
+ // Set firstTime_ larger than latestTime_,
+ // to indicate that the timeseries is empty
+ firstTime_ = TimeType(1);
+ latestTime_ = TimeType(0);
+}
+
+
+template <typename VT, typename TT>
+TT BucketedTimeSeries<VT, TT>::elapsed() const {
+ if (empty()) {
+ return TimeType(0);
+ }
+
+ if (isAllTime()) {
+ return latestTime_ - firstTime_ + TimeType(1);
+ }
+
+ size_t currentBucket;
+ TimeType currentBucketStart;
+ TimeType nextBucketStart;
+  getBucketInfo(latestTime_, &currentBucket,
+                &currentBucketStart, &nextBucketStart);
+
+ // Subtract 1 duration from the start of the next bucket to find the
+ // earliest possible data point we could be tracking.
+ TimeType earliestTime = nextBucketStart - duration_;
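+  // (Illustrative: in a 600-second, 60-bucket series with
+  // latestTime_ = 610, the current bucket spans [610, 620), so
+  // earliestTime starts as 620 - 600 = 20 and elapsed() returns
+  // 610 - 20 + 1 = 591.)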
+
+ // We're never tracking data before firstTime_
+ earliestTime = std::max(earliestTime, firstTime_);
+
+ return latestTime_ - earliestTime + TimeType(1);
+}
+
+template <typename VT, typename TT>
+VT BucketedTimeSeries<VT, TT>::sum(TimeType start, TimeType end) const {
+ ValueType sum = ValueType();
+ forEachBucket(start, end, [&](const Bucket& bucket,
+ TimeType bucketStart,
+ TimeType nextBucketStart) {
+ sum += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
+ bucket.sum);
+ return true;
+ });
+
+ return sum;
+}
+
+template <typename VT, typename TT>
+uint64_t BucketedTimeSeries<VT, TT>::count(TimeType start, TimeType end) const {
+ uint64_t count = 0;
+ forEachBucket(start, end, [&](const Bucket& bucket,
+ TimeType bucketStart,
+ TimeType nextBucketStart) {
+ count += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
+ bucket.count);
+ return true;
+ });
+
+ return count;
+}
+
+template <typename VT, typename TT>
+template <typename ReturnType>
+ReturnType BucketedTimeSeries<VT, TT>::avg(TimeType start, TimeType end) const {
+ ValueType sum = ValueType();
+ uint64_t count = 0;
+ forEachBucket(start, end, [&](const Bucket& bucket,
+ TimeType bucketStart,
+ TimeType nextBucketStart) {
+ sum += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
+ bucket.sum);
+ count += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
+ bucket.count);
+ return true;
+ });
+
+ if (count == 0) {
+ return ReturnType(0);
+ }
+
+ return static_cast<ReturnType>(sum) / count;
+}
+
+/*
+ * A note about some of the bucket index calculations below:
+ *
+ * buckets_.size() may not divide evenly into duration_. When this happens,
+ * some buckets will be wider than others. We still want to spread the data
+ * out as evenly as possible among the buckets (as opposed to just making the
+ * last bucket be significantly wider than all of the others).
+ *
+ * To make the division work out, we pretend that the buckets are each
+ * duration_ wide, so that the overall duration becomes
+ * buckets.size() * duration_.
+ *
+ * To transform a real timestamp into the scale used by our buckets,
+ * we have to multiply by buckets_.size(). To figure out which bucket it goes
+ * into, we then divide by duration_.
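+ *
+ * A worked example (illustrative, using the 71-second, 5-bucket
+ * configuration from the update71x5 unit test): time 24s scales to
+ * 24 * 5 = 120, and 120 / 71 = 1, so 24s falls into bucket 1.
+ * Bucket k then begins at ceil(71 * k / 5), giving start times
+ * 0, 15, 29, 43, and 57; the first bucket is 15 seconds wide and
+ * the rest are 14.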
+ */
+
+template <typename VT, typename TT>
+size_t BucketedTimeSeries<VT, TT>::getBucketIdx(TimeType time) const {
+ // For all-time data we don't use buckets_. Everything is tracked in total_.
+ DCHECK(!isAllTime());
+
+ time %= duration_;
+ return time.count() * buckets_.size() / duration_.count();
+}
+
+/*
+ * Compute the bucket index for the specified time, as well as the earliest
+ * time that falls into this bucket.
+ */
+template <typename VT, typename TT>
+void BucketedTimeSeries<VT, TT>::getBucketInfo(
+ TimeType time, size_t *bucketIdx,
+ TimeType* bucketStart, TimeType* nextBucketStart) const {
+ typedef typename TimeType::rep TimeInt;
+ DCHECK(!isAllTime());
+
+ // Keep these two lines together. The compiler should be able to compute
+ // both the division and modulus with a single operation.
+ TimeType timeMod = time % duration_;
+ TimeInt numFullDurations = time / duration_;
+
+ TimeInt scaledTime = timeMod.count() * buckets_.size();
+
+ // Keep these two lines together. The compiler should be able to compute
+ // both the division and modulus with a single operation.
+ *bucketIdx = scaledTime / duration_.count();
+ TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
+
+ TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
+ TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
+
+ TimeType bucketStartMod((scaledBucketStart + buckets_.size() - 1) /
+ buckets_.size());
+ TimeType nextBucketStartMod((scaledNextBucketStart + buckets_.size() - 1) /
+ buckets_.size());
+
+ TimeType durationStart(numFullDurations * duration_.count());
+ *bucketStart = bucketStartMod + durationStart;
+ *nextBucketStart = nextBucketStartMod + durationStart;
+}
+
+template <typename VT, typename TT>
+template <typename Function>
+void BucketedTimeSeries<VT, TT>::forEachBucket(Function fn) const {
+ if (isAllTime()) {
+ fn(total_, firstTime_, latestTime_ + TimeType(1));
+ return;
+ }
+
+ typedef typename TimeType::rep TimeInt;
+
+ // Compute durationStart, latestBucketIdx, and scaledNextBucketStart,
+ // the same way as in getBucketInfo().
+ TimeType timeMod = latestTime_ % duration_;
+ TimeInt numFullDurations = latestTime_ / duration_;
+ TimeType durationStart(numFullDurations * duration_.count());
+ TimeInt scaledTime = timeMod.count() * buckets_.size();
+ size_t latestBucketIdx = scaledTime / duration_.count();
+ TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
+ TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
+ TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
+
+ // Walk through the buckets, starting one past the current bucket.
+ // The next bucket is from the previous cycle, so subtract 1 duration
+ // from durationStart.
+ size_t idx = latestBucketIdx;
+ durationStart -= duration_;
+
+ TimeType nextBucketStart =
+ TimeType((scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
+ durationStart;
+ while (true) {
+ ++idx;
+ if (idx >= buckets_.size()) {
+ idx = 0;
+ durationStart += duration_;
+ scaledNextBucketStart = duration_.count();
+ } else {
+ scaledNextBucketStart += duration_.count();
+ }
+
+ TimeType bucketStart = nextBucketStart;
+ nextBucketStart = TimeType((scaledNextBucketStart + buckets_.size() - 1) /
+ buckets_.size()) + durationStart;
+
+ // Should we bother skipping buckets where firstTime_ >= nextBucketStart?
+ // For now we go ahead and invoke the function with these buckets.
+ // sum and count should always be 0 in these buckets.
+
+ DCHECK_LE(bucketStart.count(), latestTime_.count());
+ bool ret = fn(buckets_[idx], bucketStart, nextBucketStart);
+ if (!ret) {
+ break;
+ }
+
+ if (idx == latestBucketIdx) {
+ // all done
+ break;
+ }
+ }
+}
+
+/*
+ * Adjust the input value from the specified bucket to only account
+ * for the desired range.
+ *
+ * For example, if the bucket spans time [10, 20), but we only care about the
+ * range [10, 16), this will return 60% of the input value.
+ */
+template<typename VT, typename TT>
+VT BucketedTimeSeries<VT, TT>::rangeAdjust(
+ TimeType bucketStart, TimeType nextBucketStart,
+ TimeType start, TimeType end, ValueType input) const {
+ // If nextBucketStart is greater than latestTime_, treat nextBucketStart as
+ // if it were latestTime_. This makes us more accurate when someone is
+ // querying for all of the data up to latestTime_. Even though latestTime_
+ // may only be partially through the bucket, we don't want to adjust
+ // downwards in this case, because the bucket really only has data up to
+ // latestTime_.
+ if (bucketStart <= latestTime_ && nextBucketStart > latestTime_) {
+ nextBucketStart = latestTime_ + TimeType(1);
+ }
+
+ if (start <= bucketStart && end >= nextBucketStart) {
+ // The bucket is wholly contained in the [start, end) interval
+ return input;
+ }
+
+ TimeType intervalStart = std::max(start, bucketStart);
+ TimeType intervalEnd = std::min(end, nextBucketStart);
+ return input * (intervalEnd - intervalStart) /
+ (nextBucketStart - bucketStart);
+}
+
+template <typename VT, typename TT>
+template <typename Function>
+void BucketedTimeSeries<VT, TT>::forEachBucket(TimeType start, TimeType end,
+ Function fn) const {
+ forEachBucket([&start, &end, &fn] (const Bucket& bucket, TimeType bucketStart,
+ TimeType nextBucketStart) {
+ if (start >= nextBucketStart) {
+ return true;
+ }
+ if (end <= bucketStart) {
+ return false;
+ }
+ bool ret = fn(bucket, bucketStart, nextBucketStart);
+ return ret;
+ });
+}
+
+} // folly
+
+#endif // FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_
--- /dev/null
+/*
+ * Copyright 2012 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_STATS_BUCKETEDTIMESERIES_H_
+#define FOLLY_STATS_BUCKETEDTIMESERIES_H_
+
+#include <chrono>
+#include <vector>
+
+#include "folly/detail/Stats.h"
+
+namespace folly {
+
+/*
+ * This class represents a bucketed time series which keeps track of values
+ * added in the recent past, and merges these values together into a fixed
+ * number of buckets to keep a lid on memory use if the number of values
+ * added is very large.
+ *
+ * For example, a BucketedTimeSeries() with duration == 60s and 10 buckets
+ * will keep track of 10 6-second buckets, and discard all data added more
+ * than 1 minute ago. As time ticks by, a 6-second bucket at a time will
+ * be discarded and new data will go into the newly opened bucket. Internally,
+ * it uses a circular array of buckets that it reuses as time advances.
+ *
+ * The class assumes that time advances forward -- you can't retroactively add
+ * values for events in the past -- the 'now' argument is provided for better
+ * efficiency and ease of unittesting.
+ *
+ *
+ */
+template <typename VT, typename TT=std::chrono::seconds>
+class BucketedTimeSeries {
+ public:
+ typedef VT ValueType;
+ typedef TT TimeType;
+ typedef detail::Bucket<ValueType> Bucket;
+
+ /*
+ * Create a new BucketedTimeSeries.
+ *
+ * This creates a new BucketedTimeSeries with the specified number of
+ * buckets, storing data for the specified amount of time.
+ *
+ * If the duration is 0, the BucketedTimeSeries will track data forever,
+ * and does not need the rolling buckets. The numBuckets parameter is
+ * ignored when duration is 0.
+ */
+ BucketedTimeSeries(size_t numBuckets, TimeType duration);
+
+ /*
+ * Adds the value 'val' at time 'now'
+ *
+ * This function expects time to always move forwards: it cannot be used to
+ * add historical data points that have occurred in the past. If now is
+ * older than a timestamp that has already been passed to
+ * addValue() or update(), now will be ignored and the latest timestamp will
+ * be used.
+ */
+ void addValue(TimeType now, const ValueType& val);
+
+ /*
+ * Adds the value 'val' the given number of 'times' at time 'now'
+ */
+ void addValue(TimeType now, const ValueType& val, int64_t times);
+
+ /*
+ * Adds the value 'sum' as the sum of 'nsamples' samples
+ */
+ void addValueAggregated(TimeType now, const ValueType& sum, int64_t nsamples);
+
+ /*
+ * Updates the container to the specified time, doing all the necessary
+ * work to rotate the buckets and remove any stale data points.
+ *
+ * The addValue() methods automatically call update() when adding new data
+ * points. However, when reading data from the timeseries, you should make
+ * sure to manually call update() before accessing the data. Otherwise you
+ * may be reading stale data if update() has not been called recently.
+ *
+ * Returns the current bucket index after the update.
+ */
+ size_t update(TimeType now);
+
+ /*
+ * Reset the timeseries to an empty state,
+ * as if no data points have ever been added to it.
+ */
+ void clear();
+
+ /*
+ * Get the latest time that has ever been passed to update() or addValue().
+ */
+ TimeType getLatestTime() const {
+ return latestTime_;
+ }
+
+ /*
+ * Return the number of buckets.
+ */
+ size_t numBuckets() const {
+ return buckets_.size();
+ }
+
+ /*
+ * Return the maximum duration of data that can be tracked by this
+ * BucketedTimeSeries.
+ */
+ TimeType duration() const {
+ return duration_;
+ }
+
+ /*
+ * Returns true if this BucketedTimeSeries stores data for all-time, without
+ * ever rolling over into new buckets.
+ */
+ bool isAllTime() const {
+ return (duration_ == TimeType(0));
+ }
+
+ /*
+ * Returns true if no calls to update() have been made since the last call to
+ * clear().
+ */
+ bool empty() const {
+ // We set firstTime_ greater than latestTime_ in the constructor and in
+ // clear, so we use this to distinguish if the timeseries is empty.
+ //
+ // Once a data point has been added, latestTime_ will always be greater
+ // than or equal to firstTime_.
+ return firstTime_ > latestTime_;
+ }
+
+ /*
+ * Get the amount of time tracked by this timeseries.
+ *
+ * For an all-time timeseries, this returns the length of time since the
+ * first data point was added to the time series.
+ *
+ * Otherwise, this never returns a value greater than the overall timeseries
+ * duration. If the first data point was recorded less than a full duration
+ * ago, the time since the first data point is returned. If a full duration
+ * has elapsed, and we have already thrown away some data, the time since the
+ * oldest bucket is returned.
+ *
+ * For example, say we are tracking 600 seconds worth of data, in 60 buckets.
+ * - If less than 600 seconds have elapsed since the first data point,
+ * elapsed() returns the total elapsed time so far.
+ * - If more than 600 seconds have elapsed, we have already thrown away some
+ * data. However, we throw away a full bucket (10 seconds worth) at once,
+ * so at any point in time we have from 590 to 600 seconds worth of data.
+ * elapsed() will therefore return a value between 590 and 600.
+ *
+ * Note that you generally should call update() before calling elapsed(), to
+ * make sure you are not reading stale data.
+ */
+ TimeType elapsed() const;
+
+ /*
+ * Return the sum of all the data points currently tracked by this
+ * BucketedTimeSeries.
+ *
+ * Note that you generally should call update() before calling sum(), to
+ * make sure you are not reading stale data.
+ */
+ const ValueType& sum() const {
+ return total_.sum;
+ }
+
+ /*
+ * Return the number of data points currently tracked by this
+ * BucketedTimeSeries.
+ *
+ * Note that you generally should call update() before calling count(), to
+ * make sure you are not reading stale data.
+ */
+ uint64_t count() const {
+ return total_.count;
+ }
+
+ /*
+ * Return the average value (sum / count).
+ *
+ * The return type may be specified to control whether floating-point or
+ * integer division should be performed.
+ *
+ * Note that you generally should call update() before calling avg(), to
+ * make sure you are not reading stale data.
+ */
+ template <typename ReturnType=double>
+ ReturnType avg() const {
+ return total_.avg<ReturnType>();
+ }
+
+ /*
+ * Return the sum divided by the elapsed time.
+ *
+ * Note that you generally should call update() before calling rate(), to
+ * make sure you are not reading stale data.
+ */
+ template <typename ReturnType=double, typename Interval=TimeType>
+ ReturnType rate() const {
+ return rateHelper<ReturnType, Interval>(total_.sum, elapsed());
+ }
+
+ /*
+ * Return the count divided by the elapsed time.
+ *
+ * The Interval template parameter causes the elapsed time to be converted to
+ * the Interval type before using it. For example, if Interval is
+ * std::chrono::seconds, the return value will be the count per second.
+ * If Interval is std::chrono::hours, the return value will be the count per
+ * hour.
+ *
+ * Note that you generally should call update() before calling countRate(),
+ * to make sure you are not reading stale data.
+ */
+ template <typename ReturnType=double, typename Interval=TimeType>
+ ReturnType countRate() const {
+ return rateHelper<ReturnType, Interval>(total_.count, elapsed());
+ }
+
+ /*
+ * Estimate the sum of the data points that occurred in the specified time
+ * period.
+ *
+ * The range queried is [start, end).
+ * That is, start is inclusive, and end is exclusive.
+ *
+ * Note that data outside of the timeseries duration will no longer be
+ * available for use in the estimation. Specifying a start time earlier than
+ * (getLatestTime() - elapsed()) will not have much effect, since only data
+ * points after that point in time will be counted.
+ *
+ * Note that the value returned is an estimate, and may not be precise.
+ */
+ ValueType sum(TimeType start, TimeType end) const;
+
+ /*
+ * Estimate the number of data points that occurred in the specified time
+ * period.
+ *
+ * The same caveats documented in the sum(TimeType start, TimeType end)
+ * comments apply here as well.
+ */
+ uint64_t count(TimeType start, TimeType end) const;
+
+ /*
+ * Estimate the average value during the specified time period.
+ *
+ * The same caveats documented in the sum(TimeType start, TimeType end)
+ * comments apply here as well.
+ */
+ template <typename ReturnType=double>
+ ReturnType avg(TimeType start, TimeType end) const;
+
+ /*
+ * Estimate the rate during the specified time period.
+ *
+ * The same caveats documented in the sum(TimeType start, TimeType end)
+ * comments apply here as well.
+ */
+ template <typename ReturnType=double, typename Interval=TimeType>
+ ReturnType rate(TimeType start, TimeType end) const {
+ ValueType intervalSum = sum(start, end);
+ return rateHelper<ReturnType, Interval>(intervalSum, end - start);
+ }
+
+ /*
+ * Estimate the rate of data points being added during the specified time
+ * period.
+ *
+ * The same caveats documented in the sum(TimeType start, TimeType end)
+ * comments apply here as well.
+ */
+ template <typename ReturnType=double, typename Interval=TimeType>
+ ReturnType countRate(TimeType start, TimeType end) const {
+ uint64_t intervalCount = count(start, end);
+ return rateHelper<ReturnType, Interval>(intervalCount, end - start);
+ }
+
+ /*
+ * Invoke a function for each bucket.
+ *
+ * The function will take as arguments the Bucket object,
+ * the bucket start time, and the start time of the subsequent bucket.
+ *
+ * It should return true to continue iterating through the buckets, and false
+ * to break out of the loop and stop, without calling the function on any
+ * more buckets.
+ *
+ * bool function(const Bucket& bucket, TimeType bucketStart,
+ * TimeType nextBucketStart)
+ */
+ template <typename Function>
+ void forEachBucket(Function fn) const;
+
+ /*
+ * Get the index for the bucket containing the specified time.
+ *
+ * Note that the index is only valid if this time actually falls within one
+ * of the current buckets. If you pass in a value more recent than
+ * getLatestTime() or older than (getLatestTime() - elapsed()), the index
+ * returned will not be valid.
+ *
+ * This method may not be called for all-time data.
+ */
+ size_t getBucketIdx(TimeType time) const;
+
+ /*
+ * Get the bucket at the specified index.
+ *
+ * This method may not be called for all-time data.
+ */
+ const Bucket& getBucketByIndex(size_t idx) const {
+ return buckets_[idx];
+ }
+
+ /*
+ * Compute the bucket index that the specified time falls into,
+ * as well as the bucket start time and the next bucket's start time.
+ *
+ * This method may not be called for all-time data.
+ */
+ void getBucketInfo(TimeType time, size_t* bucketIdx,
+ TimeType* bucketStart, TimeType* nextBucketStart) const;
+
+ private:
+ template <typename ReturnType=double, typename Interval=TimeType>
+ ReturnType rateHelper(ReturnType numerator, TimeType elapsed) const {
+ if (elapsed == TimeType(0)) {
+ return 0;
+ }
+
+ // Use std::chrono::duration_cast to convert between the native
+ // duration and the desired interval. However, convert the rates,
+ // rather than just converting the elapsed duration. Converting the
+ // elapsed time first may collapse it down to 0 if the elapsed interval
+ // is less than the desired interval, which will incorrectly result in
+ // an infinite rate.
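+    //
+    // For example (illustrative): with TimeType = seconds and
+    // Interval = std::chrono::minutes, 30 events over 45 seconds is a
+    // native rate of 0.667 per second, which converts to 40 per minute;
+    // casting the 45s elapsed time to minutes first would truncate it
+    // to 0.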
+ typedef std::chrono::duration<
+ ReturnType, std::ratio<TimeType::period::den,
+ TimeType::period::num>> NativeRate;
+ typedef std::chrono::duration<
+ ReturnType, std::ratio<Interval::period::den,
+ Interval::period::num>> DesiredRate;
+
+ NativeRate native(numerator / elapsed.count());
+ DesiredRate desired = std::chrono::duration_cast<DesiredRate>(native);
+ return desired.count();
+ }
+
+ ValueType rangeAdjust(TimeType bucketStart, TimeType nextBucketStart,
+ TimeType start, TimeType end,
+ ValueType input) const;
+
+ template <typename Function>
+ void forEachBucket(TimeType start, TimeType end, Function fn) const;
+
+ TimeType firstTime_; // time of first update() since clear()/constructor
+ TimeType latestTime_; // time of last update()
+ TimeType duration_; // total duration ("window length") of the time series
+
+ Bucket total_; // sum and count of everything in time series
+ std::vector<Bucket> buckets_; // actual buckets of values
+};
+
+} // folly
+
+#endif // FOLLY_STATS_BUCKETEDTIMESERIES_H_
--- /dev/null
+/*
+ * Copyright 2012 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "folly/stats/BucketedTimeSeries.h"
+#include "folly/stats/BucketedTimeSeries-defs.h"
+
+#include <glog/logging.h>
+
+#include "folly/Benchmark.h"
+
+using std::chrono::seconds;
+using folly::BenchmarkSuspender;
+using folly::BucketedTimeSeries;
+
+void addValue(unsigned int iters,
+ seconds duration, size_t numBuckets,
+ size_t callsPerSecond) {
+ BenchmarkSuspender suspend;
+ BucketedTimeSeries<int64_t> ts(numBuckets, duration);
+ suspend.dismiss();
+
+ seconds currentTime(1342000000);
+ size_t timeCounter = 0;
+ for (unsigned int n = 0; n < iters; ++n, ++timeCounter) {
+ if (timeCounter >= callsPerSecond) {
+ timeCounter = 0;
+ ++currentTime;
+ }
+ ts.addValue(currentTime, n);
+ }
+}
+
+BENCHMARK_NAMED_PARAM(addValue, AllTime_1perSec, seconds(0), 60, 1);
+BENCHMARK_NAMED_PARAM(addValue, 3600x60_1perSec, seconds(3600), 60, 1);
+BENCHMARK_NAMED_PARAM(addValue, 600x60_1perSec, seconds(600), 60, 1);
+BENCHMARK_NAMED_PARAM(addValue, 60x60_1perSec, seconds(60), 60, 1);
+BENCHMARK_NAMED_PARAM(addValue, 100x10_1perSec, seconds(100), 10, 1);
+BENCHMARK_NAMED_PARAM(addValue, 71x5_1perSec, seconds(71), 5, 1);
+BENCHMARK_NAMED_PARAM(addValue, 1x1_1perSec, seconds(1), 1, 1);
+
+BENCHMARK_DRAW_LINE()
+
+BENCHMARK_NAMED_PARAM(addValue, AllTime_10perSec, seconds(0), 60, 10);
+BENCHMARK_NAMED_PARAM(addValue, 3600x60_10perSec, seconds(3600), 60, 10);
+BENCHMARK_NAMED_PARAM(addValue, 600x60_10perSec, seconds(600), 60, 10);
+BENCHMARK_NAMED_PARAM(addValue, 60x60_10perSec, seconds(60), 60, 10);
+BENCHMARK_NAMED_PARAM(addValue, 100x10_10perSec, seconds(100), 10, 10);
+BENCHMARK_NAMED_PARAM(addValue, 71x5_10perSec, seconds(71), 5, 10);
+BENCHMARK_NAMED_PARAM(addValue, 1x1_10perSec, seconds(1), 1, 10);
+
+BENCHMARK_DRAW_LINE()
+
+BENCHMARK_NAMED_PARAM(addValue, AllTime_100perSec, seconds(0), 60, 100);
+BENCHMARK_NAMED_PARAM(addValue, 3600x60_100perSec, seconds(3600), 60, 100);
+BENCHMARK_NAMED_PARAM(addValue, 600x60_100perSec, seconds(600), 60, 100);
+BENCHMARK_NAMED_PARAM(addValue, 60x60_100perSec, seconds(60), 60, 100);
+BENCHMARK_NAMED_PARAM(addValue, 100x10_100perSec, seconds(100), 10, 100);
+BENCHMARK_NAMED_PARAM(addValue, 71x5_100perSec, seconds(71), 5, 100);
+BENCHMARK_NAMED_PARAM(addValue, 1x1_100perSec, seconds(1), 1, 100);
+
+int main(int argc, char *argv[]) {
+ google::ParseCommandLineFlags(&argc, &argv, true);
+ folly::runBenchmarks();
+ return 0;
+}
--- /dev/null
+/*
+ * Copyright 2012 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "folly/stats/BucketedTimeSeries.h"
+#include "folly/stats/BucketedTimeSeries-defs.h"
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+using std::chrono::seconds;
+using std::string;
+using std::vector;
+using folly::BucketedTimeSeries;
+
+struct TestData {
+ size_t duration;
+ size_t numBuckets;
+ vector<ssize_t> bucketStarts;
+};
+vector<TestData> testData = {
+ // 71 seconds x 4 buckets
+ { 71, 4, {0, 18, 36, 54}},
+ // 100 seconds x 10 buckets
+ { 100, 10, {0, 10, 20, 30, 40, 50, 60, 70, 80, 90}},
+ // 10 seconds x 10 buckets
+ { 10, 10, {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}},
+ // 10 seconds x 1 buckets
+ { 10, 1, {0}},
+ // 1 second x 1 buckets
+ { 1, 1, {0}},
+};
+
+TEST(BucketedTimeSeries, getBucketInfo) {
+ for (const auto& data : testData) {
+ BucketedTimeSeries<int64_t> ts(data.numBuckets, seconds(data.duration));
+
+ for (uint32_t n = 0; n < 10000; n += 1234) {
+ seconds offset(n * data.duration);
+
+ for (uint32_t idx = 0; idx < data.numBuckets; ++idx) {
+ seconds bucketStart(data.bucketStarts[idx]);
+ seconds nextBucketStart;
+ if (idx + 1 < data.numBuckets) {
+ nextBucketStart = seconds(data.bucketStarts[idx + 1]);
+ } else {
+ nextBucketStart = seconds(data.duration);
+ }
+
+ seconds expectedStart = offset + bucketStart;
+ seconds expectedNextStart = offset + nextBucketStart;
+ seconds midpoint = (expectedStart + expectedNextStart) / 2;
+
+ vector<std::pair<string, seconds>> timePoints = {
+ {"expectedStart", expectedStart},
+ {"midpoint", midpoint},
+ {"expectedEnd", expectedNextStart - seconds(1)},
+ };
+
+ for (const auto& point : timePoints) {
+ // Check that getBucketIdx() returns the expected index
+ EXPECT_EQ(idx, ts.getBucketIdx(point.second)) <<
+ data.duration << "x" << data.numBuckets << ": " <<
+ point.first << "=" << point.second.count();
+
+ // Check the data returned by getBucketInfo()
+ size_t returnedIdx;
+ seconds returnedStart;
+ seconds returnedNextStart;
+        ts.getBucketInfo(point.second, &returnedIdx,
+ &returnedStart, &returnedNextStart);
+ EXPECT_EQ(idx, returnedIdx) <<
+ data.duration << "x" << data.numBuckets << ": " <<
+ point.first << "=" << point.second.count();
+ EXPECT_EQ(expectedStart.count(), returnedStart.count()) <<
+ data.duration << "x" << data.numBuckets << ": " <<
+ point.first << "=" << point.second.count();
+ EXPECT_EQ(expectedNextStart.count(), returnedNextStart.count()) <<
+ data.duration << "x" << data.numBuckets << ": " <<
+ point.first << "=" << point.second.count();
+ }
+ }
+ }
+ }
+}
+
+void testUpdate100x10(size_t offset) {
+ // This test code only works when offset is a multiple of the bucket width
+ CHECK_EQ(0, offset % 10);
+
+ // Create a 100 second timeseries, with 10 buckets
+ BucketedTimeSeries<int64_t> ts(10, seconds(100));
+
+ auto setup = [&] {
+ ts.clear();
+ // Add 1 value to each bucket
+ for (int n = 5; n <= 95; n += 10) {
+ ts.addValue(seconds(n + offset), 6);
+ }
+
+ EXPECT_EQ(10, ts.count());
+ EXPECT_EQ(60, ts.sum());
+ EXPECT_EQ(6, ts.avg());
+ };
+
+ // Update 2 buckets forwards. This should throw away 2 data points.
+ setup();
+ ts.update(seconds(110 + offset));
+ EXPECT_EQ(8, ts.count());
+ EXPECT_EQ(48, ts.sum());
+ EXPECT_EQ(6, ts.avg());
+
+ // The last time we added was 95.
+  // Try updating to 151. This wraps more than halfway around the circular
+  // buffer, and should clear everything but the last 4 buckets.
+ setup();
+ ts.update(seconds(151 + offset));
+ EXPECT_EQ(4, ts.count());
+  EXPECT_EQ(24, ts.sum());
+ EXPECT_EQ(6, ts.avg());
+
+ // The last time we added was 95.
+ // Try updating to 193: This is nearly one full loop around,
+ // back to the same bucket. update() needs to clear everything
+ setup();
+ ts.update(seconds(193 + offset));
+ EXPECT_EQ(0, ts.count());
+ EXPECT_EQ(0, ts.sum());
+ EXPECT_EQ(0, ts.avg());
+
+ // The last time we added was 95.
+ // Try updating to 197: This is slightly over one full loop around,
+ // back to the same bucket. update() needs to clear everything
+ setup();
+ ts.update(seconds(197 + offset));
+ EXPECT_EQ(0, ts.count());
+ EXPECT_EQ(0, ts.sum());
+ EXPECT_EQ(0, ts.avg());
+
+ // The last time we added was 95.
+ // Try updating to 230: This is well over one full loop around,
+ // and everything should be cleared.
+ setup();
+ ts.update(seconds(230 + offset));
+ EXPECT_EQ(0, ts.count());
+ EXPECT_EQ(0, ts.sum());
+ EXPECT_EQ(0, ts.avg());
+}
+
+TEST(BucketedTimeSeries, update100x10) {
+ // Run testUpdate100x10() multiple times, with various offsets.
+ // This makes sure the update code works regardless of which bucket it starts
+ // at in the modulo arithmetic.
+ testUpdate100x10(0);
+ testUpdate100x10(50);
+ testUpdate100x10(370);
+ testUpdate100x10(1937090);
+}
+
+TEST(BucketedTimeSeries, update71x5) {
+ // Create a 71 second timeseries, with 5 buckets
+ // This tests when the number of buckets does not divide evenly into the
+ // duration.
+ BucketedTimeSeries<int64_t> ts(5, seconds(71));
+
+ auto setup = [&] {
+ ts.clear();
+ // Add 1 value to each bucket
+ ts.addValue(seconds(11), 6);
+ ts.addValue(seconds(24), 6);
+ ts.addValue(seconds(42), 6);
+ ts.addValue(seconds(43), 6);
+ ts.addValue(seconds(66), 6);
+
+ EXPECT_EQ(5, ts.count());
+ EXPECT_EQ(30, ts.sum());
+ EXPECT_EQ(6, ts.avg());
+ };
+
+ // Update 2 buckets forwards. This should throw away 2 data points.
+ setup();
+ ts.update(seconds(99));
+ EXPECT_EQ(3, ts.count());
+ EXPECT_EQ(18, ts.sum());
+ EXPECT_EQ(6, ts.avg());
+
+ // Update 3 buckets forwards. This should throw away 3 data points.
+ setup();
+ ts.update(seconds(100));
+ EXPECT_EQ(2, ts.count());
+ EXPECT_EQ(12, ts.sum());
+ EXPECT_EQ(6, ts.avg());
+
+ // Update 4 buckets forwards, just under the wrap limit.
+ // This should throw everything but the last bucket away.
+ setup();
+ ts.update(seconds(127));
+ EXPECT_EQ(1, ts.count());
+ EXPECT_EQ(6, ts.sum());
+ EXPECT_EQ(6, ts.avg());
+
+ // Update 5 buckets forwards, exactly at the wrap limit.
+ // This should throw everything away.
+ setup();
+ ts.update(seconds(128));
+ EXPECT_EQ(0, ts.count());
+ EXPECT_EQ(0, ts.sum());
+ EXPECT_EQ(0, ts.avg());
+
+ // Update very far forwards, wrapping multiple times.
+ // This should throw everything away.
+ setup();
+ ts.update(seconds(1234));
+ EXPECT_EQ(0, ts.count());
+ EXPECT_EQ(0, ts.sum());
+ EXPECT_EQ(0, ts.avg());
+}
+
+TEST(BucketedTimeSeries, elapsed) {
+ BucketedTimeSeries<int64_t> ts(60, seconds(600));
+
+ // elapsed() is 0 when no data points have been added
+ EXPECT_EQ(0, ts.elapsed().count());
+
+ // With exactly 1 data point, elapsed() should report 1 second of data
+ seconds start(239218);
+ ts.addValue(start + seconds(0), 200);
+ EXPECT_EQ(1, ts.elapsed().count());
+ // Adding a data point 10 seconds later should result in an elapsed time of
+ // 11 seconds (the time range is [0, 10], inclusive).
+ ts.addValue(start + seconds(10), 200);
+ EXPECT_EQ(11, ts.elapsed().count());
+
+ // elapsed() returns to 0 after clear()
+ ts.clear();
+ EXPECT_EQ(0, ts.elapsed().count());
+
+ // Restart, with the starting point on an easier number to work with
+ ts.addValue(seconds(10), 200);
+ EXPECT_EQ(1, ts.elapsed().count());
+ ts.addValue(seconds(580), 200);
+ EXPECT_EQ(571, ts.elapsed().count());
+ ts.addValue(seconds(590), 200);
+ EXPECT_EQ(581, ts.elapsed().count());
+ ts.addValue(seconds(598), 200);
+ EXPECT_EQ(589, ts.elapsed().count());
+ ts.addValue(seconds(599), 200);
+ EXPECT_EQ(590, ts.elapsed().count());
+ ts.addValue(seconds(600), 200);
+ EXPECT_EQ(591, ts.elapsed().count());
+ ts.addValue(seconds(608), 200);
+ EXPECT_EQ(599, ts.elapsed().count());
+ ts.addValue(seconds(609), 200);
+ EXPECT_EQ(600, ts.elapsed().count());
+ // Once we reach 600 seconds worth of data, when we move on to the next
+ // second a full bucket will get thrown out. Now we drop back down to 591
+ // seconds worth of data
+ ts.addValue(seconds(610), 200);
+ EXPECT_EQ(591, ts.elapsed().count());
+ ts.addValue(seconds(618), 200);
+ EXPECT_EQ(599, ts.elapsed().count());
+ ts.addValue(seconds(619), 200);
+ EXPECT_EQ(600, ts.elapsed().count());
+ ts.addValue(seconds(620), 200);
+ EXPECT_EQ(591, ts.elapsed().count());
+ ts.addValue(seconds(123419), 200);
+ EXPECT_EQ(600, ts.elapsed().count());
+ ts.addValue(seconds(123420), 200);
+ EXPECT_EQ(591, ts.elapsed().count());
+ ts.addValue(seconds(123425), 200);
+ EXPECT_EQ(596, ts.elapsed().count());
+
+ // Time never moves backwards.
+ // Calling update with an old timestamp will just be ignored.
+ ts.update(seconds(29));
+ EXPECT_EQ(596, ts.elapsed().count());
+}
+
+TEST(BucketedTimeSeries, rate) {
+ BucketedTimeSeries<int64_t> ts(60, seconds(600));
+
+  // Add 3 values every 2 seconds, until the buckets fill up
+ for (size_t n = 0; n < 600; n += 2) {
+ ts.addValue(seconds(n), 200, 3);
+ }
+
+ EXPECT_EQ(900, ts.count());
+ EXPECT_EQ(180000, ts.sum());
+ EXPECT_EQ(200, ts.avg());
+
+ // Really we only entered 599 seconds worth of data: [0, 598] (inclusive)
+ EXPECT_EQ(599, ts.elapsed().count());
+ EXPECT_NEAR(300.5, ts.rate(), 0.005);
+ EXPECT_NEAR(1.5, ts.countRate(), 0.005);
+
+ // If we add 1 more second, now we will have 600 seconds worth of data
+ ts.update(seconds(599));
+ EXPECT_EQ(600, ts.elapsed().count());
+ EXPECT_NEAR(300, ts.rate(), 0.005);
+ EXPECT_EQ(300, ts.rate<int>());
+ EXPECT_NEAR(1.5, ts.countRate(), 0.005);
+
+ // However, 1 more second after that and we will have filled up all the
+ // buckets, and have to drop one.
+ ts.update(seconds(600));
+ EXPECT_EQ(591, ts.elapsed().count());
+ EXPECT_NEAR(299.5, ts.rate(), 0.01);
+ EXPECT_EQ(299, ts.rate<int>());
+ EXPECT_NEAR(1.5, ts.countRate(), 0.005);
+}
+
+TEST(BucketedTimeSeries, forEachBucket) {
+ typedef BucketedTimeSeries<int64_t>::Bucket Bucket;
+ struct BucketInfo {
+ BucketInfo(const Bucket* b, seconds s, seconds ns)
+ : bucket(b), start(s), nextStart(ns) {}
+
+ const Bucket* bucket;
+ seconds start;
+ seconds nextStart;
+ };
+
+ for (const auto& data : testData) {
+ BucketedTimeSeries<int64_t> ts(data.numBuckets, seconds(data.duration));
+
+ vector<BucketInfo> info;
+ auto fn = [&](const Bucket& bucket, seconds bucketStart,
+ seconds bucketEnd) {
+ info.emplace_back(&bucket, bucketStart, bucketEnd);
+ return true;
+ };
+
+ // If we haven't yet added any data, the current bucket will start at 0,
+    // and all previous buckets will have negative start times.
+ ts.forEachBucket(fn);
+
+ CHECK_EQ(data.numBuckets, info.size());
+
+ // Check the data passed in to the function
+ size_t infoIdx = 0;
+ size_t bucketIdx = 1;
+ ssize_t offset = -data.duration;
+ for (size_t n = 0; n < data.numBuckets; ++n) {
+ if (bucketIdx >= data.numBuckets) {
+ bucketIdx = 0;
+ offset += data.duration;
+ }
+
+ EXPECT_EQ(data.bucketStarts[bucketIdx] + offset,
+ info[infoIdx].start.count()) <<
+ data.duration << "x" << data.numBuckets << ": bucketIdx=" <<
+ bucketIdx << ", infoIdx=" << infoIdx;
+
+ size_t nextBucketIdx = bucketIdx + 1;
+ ssize_t nextOffset = offset;
+ if (nextBucketIdx >= data.numBuckets) {
+ nextBucketIdx = 0;
+ nextOffset += data.duration;
+ }
+ EXPECT_EQ(data.bucketStarts[nextBucketIdx] + nextOffset,
+ info[infoIdx].nextStart.count()) <<
+ data.duration << "x" << data.numBuckets << ": bucketIdx=" <<
+ bucketIdx << ", infoIdx=" << infoIdx;
+
+ EXPECT_EQ(&ts.getBucketByIndex(bucketIdx), info[infoIdx].bucket);
+
+ ++bucketIdx;
+ ++infoIdx;
+ }
+ }
+}
+
+TEST(BucketedTimeSeries, queryByIntervalSimple) {
+ BucketedTimeSeries<int> a(3, seconds(12));
+ for (int i = 0; i < 8; i++) {
+ a.addValue(seconds(i), 1);
+ }
+ // We added 1 at each second from 0..7
+ // Query from the time period 0..2.
+ // This is entirely in the first bucket, which has a sum of 4.
+ // The code knows only part of the bucket is covered, and correctly
+  // estimates the desired sum as 2.
+ EXPECT_EQ(2, a.sum(seconds(0), seconds(2)));
+}
+
+TEST(BucketedTimeSeries, queryByInterval) {
+ // Set up a BucketedTimeSeries tracking 6 seconds in 3 buckets
+ const int kNumBuckets = 3;
+ const int kDuration = 6;
+ BucketedTimeSeries<double> b(kNumBuckets, seconds(kDuration));
+
+ for (unsigned int i = 0; i < kDuration; ++i) {
+ // add value 'i' at time 'i'
+ b.addValue(seconds(i), i);
+ }
+
+ // Current bucket state:
+ // 0: time=[0, 2): values=(0, 1), sum=1, count=2
+  // 1: time=[2, 4): values=(2, 3), sum=5, count=2
+ // 2: time=[4, 6): values=(4, 5), sum=9, count=2
+ double expectedSums1[kDuration + 1][kDuration + 1] = {
+ {0, 4.5, 9, 11.5, 14, 14.5, 15},
+ {0, 4.5, 7, 9.5, 10, 10.5, -1},
+ {0, 2.5, 5, 5.5, 6, -1, -1},
+ {0, 2.5, 3, 3.5, -1, -1, -1},
+ {0, 0.5, 1, -1, -1, -1, -1},
+ {0, 0.5, -1, -1, -1, -1, -1},
+ {0, -1, -1, -1, -1, -1, -1}
+ };
+ int expectedCounts1[kDuration + 1][kDuration + 1] = {
+ {0, 1, 2, 3, 4, 5, 6},
+ {0, 1, 2, 3, 4, 5, -1},
+ {0, 1, 2, 3, 4, -1, -1},
+ {0, 1, 2, 3, -1, -1, -1},
+ {0, 1, 2, -1, -1, -1, -1},
+ {0, 1, -1, -1, -1, -1, -1},
+ {0, -1, -1, -1, -1, -1, -1}
+ };
+
+ seconds currentTime = b.getLatestTime() + seconds(1);
+ for (int i = 0; i <= kDuration + 1; i++) {
+ for (int j = 0; j <= kDuration - i; j++) {
+ seconds start = currentTime - seconds(i + j);
+ seconds end = currentTime - seconds(i);
+ double expectedSum = expectedSums1[i][j];
+ EXPECT_EQ(expectedSum, b.sum(start, end)) <<
+ "i=" << i << ", j=" << j <<
+ ", interval=[" << start.count() << ", " << end.count() << ")";
+
+ uint64_t expectedCount = expectedCounts1[i][j];
+ EXPECT_EQ(expectedCount, b.count(start, end)) <<
+ "i=" << i << ", j=" << j <<
+ ", interval=[" << start.count() << ", " << end.count() << ")";
+
+ double expectedAvg = expectedCount ? expectedSum / expectedCount : 0;
+ EXPECT_EQ(expectedAvg, b.avg(start, end)) <<
+ "i=" << i << ", j=" << j <<
+ ", interval=[" << start.count() << ", " << end.count() << ")";
+
+ double expectedRate = j ? expectedSum / j : 0;
+ EXPECT_EQ(expectedRate, b.rate(start, end)) <<
+ "i=" << i << ", j=" << j <<
+ ", interval=[" << start.count() << ", " << end.count() << ")";
+ }
+ }
+
+ // Add 3 more values.
+ // This will overwrite 1 full bucket, and put us halfway through the next.
+ for (unsigned int i = kDuration; i < kDuration + 3; ++i) {
+ b.addValue(seconds(i), i);
+ }
+
+ // Current bucket state:
+ // 0: time=[6, 8): values=(6, 7), sum=13, count=2
+ // 1: time=[8, 10): values=(8), sum=8, count=1
+ // 2: time=[4, 6): values=(4, 5), sum=9, count=2
+ double expectedSums2[kDuration + 1][kDuration + 1] = {
+ {0, 8, 14.5, 21, 25.5, 30, 30},
+ {0, 6.5, 13, 17.5, 22, 22, -1},
+ {0, 6.5, 11, 15.5, 15.5, -1, -1},
+ {0, 4.5, 9, 9, -1, -1, -1},
+ {0, 4.5, 4.5, -1, -1, -1, -1},
+ {0, 0, -1, -1, -1, -1, -1},
+ {0, -1, -1, -1, -1, -1, -1}
+ };
+ int expectedCounts2[kDuration + 1][kDuration + 1] = {
+ {0, 1, 2, 3, 4, 5, 5},
+ {0, 1, 2, 3, 4, 4, -1},
+ {0, 1, 2, 3, 3, -1, -1},
+ {0, 1, 2, 2, -1, -1, -1},
+ {0, 1, 1, -1, -1, -1, -1},
+ {0, 0, -1, -1, -1, -1, -1},
+ {0, -1, -1, -1, -1, -1, -1}
+ };
+
+ currentTime = b.getLatestTime() + seconds(1);
+ for (int i = 0; i <= kDuration + 1; i++) {
+ for (int j = 0; j <= kDuration - i; j++) {
+ seconds start = currentTime - seconds(i + j);
+ seconds end = currentTime - seconds(i);
+ double expectedSum = expectedSums2[i][j];
+ EXPECT_EQ(expectedSum, b.sum(start, end)) <<
+ "i=" << i << ", j=" << j <<
+ ", interval=[" << start.count() << ", " << end.count() << ")";
+
+ uint64_t expectedCount = expectedCounts2[i][j];
+ EXPECT_EQ(expectedCount, b.count(start, end)) <<
+ "i=" << i << ", j=" << j <<
+ ", interval=[" << start.count() << ", " << end.count() << ")";
+
+ double expectedAvg = expectedCount ? expectedSum / expectedCount : 0;
+ EXPECT_EQ(expectedAvg, b.avg(start, end)) <<
+ "i=" << i << ", j=" << j <<
+ ", interval=[" << start.count() << ", " << end.count() << ")";
+
+ double expectedRate = j ? expectedSum / j : 0;
+ EXPECT_EQ(expectedRate, b.rate(start, end)) <<
+ "i=" << i << ", j=" << j <<
+ ", interval=[" << start.count() << ", " << end.count() << ")";
+ }
+ }
+}