Add BucketedTimeSeries to folly
authorAdam Simpkins <simpkins@fb.com>
Sun, 22 Jul 2012 05:40:46 +0000 (22:40 -0700)
committerTudor Bosman <tudorb@fb.com>
Sun, 26 Aug 2012 18:13:47 +0000 (11:13 -0700)
Summary:
Add the BucketedTimeSeries class to folly.  This tracks time series
counter data, using a circular buffer of buckets to expire old data
points as time progresses.

It supports querying the overall sum, count, average, and rate for the
duration of the time series, as well as estimating these values for
portions of the overall duration.

Test Plan: Unit tests included.

Reviewed By: andrei.alexandrescu@fb.com

FB internal diff: D527040

folly/Benchmark.h
folly/Histogram.h
folly/detail/Stats.h [new file with mode: 0644]
folly/stats/BucketedTimeSeries-defs.h [new file with mode: 0644]
folly/stats/BucketedTimeSeries.h [new file with mode: 0644]
folly/test/TimeseriesBenchmark.cpp [new file with mode: 0644]
folly/test/TimeseriesTest.cpp [new file with mode: 0644]

index d5bd4cca9e67b8d1b65c390fbb4cfeafd797976c..444ad81d216a87325911d1d66121f2c5ce7c17d9 100644 (file)
@@ -295,12 +295,38 @@ void doNotOptimizeAway(T&& datum) {
  * 1000000 for initialSize, and the iteration count for n.
  */
 #define BENCHMARK_PARAM(name, param)                                    \
+  BENCHMARK_NAMED_PARAM(name, param, param)
+
+/*
+ * Like BENCHMARK_PARAM(), but allows a custom name to be specified for each
+ * parameter, rather than using the parameter value.
+ *
+ * Useful when the parameter value is not a valid token for string pasting,
+ * of when you want to specify multiple parameter arguments.
+ *
+ * For example:
+ *
+ * void addValue(uint n, int64_t bucketSize, int64_t min, int64_t max) {
+ *   Histogram<int64_t> hist(bucketSize, min, max);
+ *   int64_t num = min;
+ *   FOR_EACH_RANGE (i, 0, n) {
+ *     hist.addValue(num);
+ *     ++num;
+ *     if (num > max) { num = min; }
+ *   }
+ * }
+ *
+ * BENCHMARK_NAMED_PARAM(addValue, 0_to_100, 1, 0, 100)
+ * BENCHMARK_NAMED_PARAM(addValue, 0_to_1000, 10, 0, 1000)
+ * BENCHMARK_NAMED_PARAM(addValue, 5k_to_20k, 250, 5000, 20000)
+ */
+#define BENCHMARK_NAMED_PARAM(name, param_name, ...)                    \
   BENCHMARK_IMPL(                                                       \
-      FB_CONCATENATE(name, FB_CONCATENATE(_, param)),                   \
-      FB_STRINGIZE(name) "(" FB_STRINGIZE(param) ")",                   \
+      FB_CONCATENATE(name, FB_CONCATENATE(_, param_name)),              \
+      FB_STRINGIZE(name) "(" FB_STRINGIZE(param_name) ")",              \
       unsigned,                                                         \
       iters) {                                                          \
-    name(iters, param);                                                 \
+    name(iters, ## __VA_ARGS__);                                        \
   }
 
 /**
@@ -338,12 +364,18 @@ void doNotOptimizeAway(T&& datum) {
  * A combination of BENCHMARK_RELATIVE and BENCHMARK_PARAM.
  */
 #define BENCHMARK_RELATIVE_PARAM(name, param)                           \
+  BENCHMARK_RELATIVE_NAMED_PARAM(name, param, param)
+
+/**
+ * A combination of BENCHMARK_RELATIVE and BENCHMARK_NAMED_PARAM.
+ */
+#define BENCHMARK_RELATIVE_NAMED_PARAM(name, param_name, ...)           \
   BENCHMARK_IMPL(                                                       \
-      FB_CONCATENATE(name, FB_CONCATENATE(_, param)),                   \
-      "%" FB_STRINGIZE(name) "(" FB_STRINGIZE(param) ")",               \
+      FB_CONCATENATE(name, FB_CONCATENATE(_, param_name)),              \
+      "%" FB_STRINGIZE(name) "(" FB_STRINGIZE(param_name) ")",          \
       unsigned,                                                         \
       iters) {                                                          \
-    name(iters, param);                                                 \
+    name(iters, ## __VA_ARGS__);                                        \
   }
 
 /**
index f25fcbb0b00d3b186c83b2efacd774e376315c00..302739944487baf43c288e4200ccae1dd879e6c2 100644 (file)
 #define FOLLY_HISTOGRAM_H_
 
 #include <cstddef>
-#include <cstdint>
 #include <limits>
 #include <string>
 #include <vector>
 #include <stdexcept>
 
+#include "folly/detail/Stats.h"
+
 namespace folly {
 
 namespace detail {
@@ -223,36 +224,7 @@ template <typename T>
 class Histogram {
  public:
   typedef T ValueType;
-
-  struct Bucket {
-    Bucket()
-      : sum(0),
-        count(0) {}
-
-    void clear() {
-      sum = 0;
-      count = 0;
-    }
-
-    Bucket& merge(const Bucket &bucket) {
-      if (this != &bucket) {
-        sum += bucket.sum;
-        count += bucket.count;
-      }
-      return *this;
-    }
-
-    Bucket& operator=(const Bucket& bucket) {
-      if (this != &bucket) {
-        sum = bucket.sum;
-        count = bucket.count;
-      }
-      return *this;
-    }
-
-    ValueType sum;
-    uint64_t count;
-  };
+  typedef detail::Bucket<T> Bucket;
 
   Histogram(ValueType bucketSize, ValueType min, ValueType max)
     : buckets_(bucketSize, min, max, Bucket()) {}
@@ -298,7 +270,7 @@ class Histogram {
     }
 
     for (int i = 0; i < buckets_.getNumBuckets(); i++) {
-      buckets_.getByIndex(i).merge(hist.buckets_.getByIndex(i));
+      buckets_.getByIndex(i) += hist.buckets_.getByIndex(i);
     }
   }
 
diff --git a/folly/detail/Stats.h b/folly/detail/Stats.h
new file mode 100644 (file)
index 0000000..1b74aa4
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2012 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_DETAIL_STATS_H_
+#define FOLLY_DETAIL_STATS_H_
+
+#include <cstdint>
+
+namespace folly { namespace detail {
+
+template<typename T>
+struct Bucket {
+ public:
+  typedef T ValueType;
+
+  Bucket()
+    : sum(ValueType()),
+      count(0) {}
+
+  void clear() {
+    sum = ValueType();
+    count = 0;
+  }
+
+  void add(const ValueType& s, uint64_t c) {
+    // TODO: It would be nice to handle overflow here.
+    sum += s;
+    count += c;
+  }
+
+  Bucket& operator+=(const Bucket& o) {
+    add(o.sum, o.count);
+    return *this;
+  }
+
+  Bucket& operator-=(const Bucket& o) {
+    // TODO: It would be nice to handle overflow here.
+    sum -= o.sum;
+    count -= o.count;
+    return *this;
+  }
+
+  template <typename ReturnType>
+  ReturnType avg() const {
+    return (count ?
+            static_cast<ReturnType>(sum) / count :
+            ReturnType(0));
+  }
+
+  ValueType sum;
+  uint64_t count;
+};
+
+}} // folly::detail
+
+#endif // FOLLY_DETAIL_STATS_H_
diff --git a/folly/stats/BucketedTimeSeries-defs.h b/folly/stats/BucketedTimeSeries-defs.h
new file mode 100644 (file)
index 0000000..e65b197
--- /dev/null
@@ -0,0 +1,413 @@
+/*
+ * Copyright 2012 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_
+#define FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_
+
+#include <glog/logging.h>
+
+namespace folly {
+
+template <typename VT, typename TT>
+BucketedTimeSeries<VT, TT>::BucketedTimeSeries(size_t numBuckets,
+                                               TimeType duration)
+  : firstTime_(1),
+    latestTime_(0),
+    duration_(duration) {
+  // For tracking all-time data we only use total_, and don't need to bother
+  // with buckets_
+  if (!isAllTime()) {
+    // Round numBuckets down to duration_.count().
+    //
+    // There is no point in having more buckets than our timestamp
+    // granularity: otherwise we would have buckets that could never be used.
+    if (numBuckets > duration_.count()) {
+      numBuckets = duration_.count();
+    }
+
+    buckets_.resize(numBuckets, Bucket());
+  }
+}
+
+template <typename VT, typename TT>
+void BucketedTimeSeries<VT, TT>::addValue(TimeType now, const ValueType& val) {
+  addValueAggregated(now, val, 1);
+}
+
+template <typename VT, typename TT>
+void BucketedTimeSeries<VT, TT>::addValue(TimeType now,
+                                          const ValueType& val,
+                                          int64_t times) {
+  addValueAggregated(now, val * times, times);
+}
+
+template <typename VT, typename TT>
+void BucketedTimeSeries<VT, TT>::addValueAggregated(TimeType now,
+                                                    const ValueType& sum,
+                                                    int64_t nsamples) {
+  // Make sure time doesn't go backwards
+  now = std::max(now, latestTime_);
+
+  if (isAllTime()) {
+    if (empty()) {
+      firstTime_ = now;
+    }
+    latestTime_ = now;
+    total_.add(sum, nsamples);
+    return;
+  }
+
+  // Update the buckets
+  size_t curBucket = update(now);
+  buckets_[curBucket].add(sum, nsamples);
+
+  // Update the aggregate sum/count
+  total_.add(sum, nsamples);
+}
+
+template <typename VT, typename TT>
+size_t BucketedTimeSeries<VT, TT>::update(TimeType now) {
+  if (empty()) {
+    // This is the first data point.
+    firstTime_ = now;
+  }
+
+  // For all-time data, all we need to do is update latestTime_
+  if (isAllTime()) {
+    latestTime_ = std::max(latestTime_, now);
+    return 0;
+  }
+
+  // Make sure time doesn't go backwards.
+  // If the time is less than or equal to the latest time we have already seen,
+  // we don't need to do anything.
+  if (now <= latestTime_) {
+    return getBucketIdx(latestTime_);
+  }
+
+  // We could cache nextBucketStart as a member variable, so we don't have to
+  // recompute it each time update() is called with a new timestamp value.
+  // This makes things faster when update() (or addValue()) is called once
+  // per second, but slightly slower when update() is called multiple times a
+  // second.  We care more about optimizing the cases where addValue() is being
+  // called frequently.  If addValue() is only being called once every few
+  // seconds, it doesn't matter as much if it is fast.
+
+  // Get info about the bucket that latestTime_ points at
+  size_t currentBucket;
+  TimeType currentBucketStart;
+  TimeType nextBucketStart;
+  getBucketInfo(latestTime_, &currentBucket,
+                &currentBucketStart, &nextBucketStart);
+
+  // Update latestTime_
+  latestTime_ = now;
+
+  if (now < nextBucketStart) {
+    // We're still in the same bucket.
+    // We're done after updating latestTime_.
+    return currentBucket;
+  } else if (now >= currentBucketStart + duration_) {
+    // It's been a while.  We have wrapped, and all of the buckets need to be
+    // cleared.
+    for (Bucket& bucket : buckets_) {
+      bucket.clear();
+    }
+    total_.clear();
+    return getBucketIdx(latestTime_);
+  } else {
+    // clear all the buckets between the last time and current time, meaning
+    // buckets in the range [(currentBucket+1), newBucket]. Note that
+    // the bucket (currentBucket+1) is always the oldest bucket we have. Since
+    // our array is circular, loop when we reach the end.
+    size_t newBucket = getBucketIdx(now);
+    size_t idx = currentBucket;
+    while (idx != newBucket) {
+      ++idx;
+      if (idx >= buckets_.size()) {
+        idx = 0;
+      }
+      total_ -= buckets_[idx];
+      buckets_[idx].clear();
+    }
+    return newBucket;
+  }
+}
+
+template <typename VT, typename TT>
+void BucketedTimeSeries<VT, TT>::clear() {
+  for (Bucket& bucket : buckets_) {
+    bucket.clear();
+  }
+  total_.clear();
+  // Set firstTime_ larger than latestTime_,
+  // to indicate that the timeseries is empty
+  firstTime_ = TimeType(1);
+  latestTime_ = TimeType(0);
+}
+
+
+template <typename VT, typename TT>
+TT BucketedTimeSeries<VT, TT>::elapsed() const {
+  if (empty()) {
+    return TimeType(0);
+  }
+
+  if (isAllTime()) {
+    return latestTime_ - firstTime_ + TimeType(1);
+  }
+
+  size_t currentBucket;
+  TimeType currentBucketStart;
+  TimeType nextBucketStart;
+  getBucketInfo(latestTime_, &currentBucket,
+                &currentBucketStart, &nextBucketStart);
+
+  // Subtract 1 duration from the start of the next bucket to find the
+  // earliest possible data point we could be tracking.
+  TimeType earliestTime = nextBucketStart - duration_;
+
+  // We're never tracking data before firstTime_
+  earliestTime = std::max(earliestTime, firstTime_);
+
+  return latestTime_ - earliestTime + TimeType(1);
+}
+
+template <typename VT, typename TT>
+VT BucketedTimeSeries<VT, TT>::sum(TimeType start, TimeType end) const {
+  ValueType sum = ValueType();
+  forEachBucket(start, end, [&](const Bucket& bucket,
+                                TimeType bucketStart,
+                                TimeType nextBucketStart) {
+    sum += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
+                             bucket.sum);
+    return true;
+  });
+
+  return sum;
+}
+
+template <typename VT, typename TT>
+uint64_t BucketedTimeSeries<VT, TT>::count(TimeType start, TimeType end) const {
+  uint64_t count = 0;
+  forEachBucket(start, end, [&](const Bucket& bucket,
+                                TimeType bucketStart,
+                                TimeType nextBucketStart) {
+    count += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
+                               bucket.count);
+    return true;
+  });
+
+  return count;
+}
+
+template <typename VT, typename TT>
+template <typename ReturnType>
+ReturnType BucketedTimeSeries<VT, TT>::avg(TimeType start, TimeType end) const {
+  ValueType sum = ValueType();
+  uint64_t count = 0;
+  forEachBucket(start, end, [&](const Bucket& bucket,
+                                TimeType bucketStart,
+                                TimeType nextBucketStart) {
+    sum += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
+                             bucket.sum);
+    count += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
+                               bucket.count);
+    return true;
+  });
+
+  if (count == 0) {
+    return ReturnType(0);
+  }
+
+  return static_cast<ReturnType>(sum) / count;
+}
+
+/*
+ * A note about some of the bucket index calculations below:
+ *
+ * buckets_.size() may not divide evenly into duration_.  When this happens,
+ * some buckets will be wider than others.  We still want to spread the data
+ * out as evenly as possible among the buckets (as opposed to just making the
+ * last bucket be significantly wider than all of the others).
+ *
+ * To make the division work out, we pretend that the buckets are each
+ * duration_ wide, so that the overall duration becomes
+ * buckets.size() * duration_.
+ *
+ * To transform a real timestamp into the scale used by our buckets,
+ * we have to multiply by buckets_.size().  To figure out which bucket it goes
+ * into, we then divide by duration_.
+ */
+
+template <typename VT, typename TT>
+size_t BucketedTimeSeries<VT, TT>::getBucketIdx(TimeType time) const {
+  // For all-time data we don't use buckets_.  Everything is tracked in total_.
+  DCHECK(!isAllTime());
+
+  time %= duration_;
+  return time.count() * buckets_.size() / duration_.count();
+}
+
+/*
+ * Compute the bucket index for the specified time, as well as the earliest
+ * time that falls into this bucket.
+ */
+template <typename VT, typename TT>
+void BucketedTimeSeries<VT, TT>::getBucketInfo(
+    TimeType time, size_t *bucketIdx,
+    TimeType* bucketStart, TimeType* nextBucketStart) const {
+  typedef typename TimeType::rep TimeInt;
+  DCHECK(!isAllTime());
+
+  // Keep these two lines together.  The compiler should be able to compute
+  // both the division and modulus with a single operation.
+  TimeType timeMod = time % duration_;
+  TimeInt numFullDurations = time / duration_;
+
+  TimeInt scaledTime = timeMod.count() * buckets_.size();
+
+  // Keep these two lines together.  The compiler should be able to compute
+  // both the division and modulus with a single operation.
+  *bucketIdx = scaledTime / duration_.count();
+  TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
+
+  TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
+  TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
+
+  TimeType bucketStartMod((scaledBucketStart + buckets_.size() - 1) /
+                          buckets_.size());
+  TimeType nextBucketStartMod((scaledNextBucketStart + buckets_.size() - 1) /
+                              buckets_.size());
+
+  TimeType durationStart(numFullDurations * duration_.count());
+  *bucketStart = bucketStartMod + durationStart;
+  *nextBucketStart = nextBucketStartMod + durationStart;
+}
+
+template <typename VT, typename TT>
+template <typename Function>
+void BucketedTimeSeries<VT, TT>::forEachBucket(Function fn) const {
+  if (isAllTime()) {
+    fn(total_, firstTime_, latestTime_ + TimeType(1));
+    return;
+  }
+
+  typedef typename TimeType::rep TimeInt;
+
+  // Compute durationStart, latestBucketIdx, and scaledNextBucketStart,
+  // the same way as in getBucketInfo().
+  TimeType timeMod = latestTime_ % duration_;
+  TimeInt numFullDurations = latestTime_ / duration_;
+  TimeType durationStart(numFullDurations * duration_.count());
+  TimeInt scaledTime = timeMod.count() * buckets_.size();
+  size_t latestBucketIdx = scaledTime / duration_.count();
+  TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
+  TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
+  TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
+
+  // Walk through the buckets, starting one past the current bucket.
+  // The next bucket is from the previous cycle, so subtract 1 duration
+  // from durationStart.
+  size_t idx = latestBucketIdx;
+  durationStart -= duration_;
+
+  TimeType nextBucketStart =
+    TimeType((scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
+    durationStart;
+  while (true) {
+    ++idx;
+    if (idx >= buckets_.size()) {
+      idx = 0;
+      durationStart += duration_;
+      scaledNextBucketStart = duration_.count();
+    } else {
+      scaledNextBucketStart += duration_.count();
+    }
+
+    TimeType bucketStart = nextBucketStart;
+    nextBucketStart = TimeType((scaledNextBucketStart + buckets_.size() - 1) /
+                               buckets_.size()) + durationStart;
+
+    // Should we bother skipping buckets where firstTime_ >= nextBucketStart?
+    // For now we go ahead and invoke the function with these buckets.
+    // sum and count should always be 0 in these buckets.
+
+    DCHECK_LE(bucketStart.count(), latestTime_.count());
+    bool ret = fn(buckets_[idx], bucketStart, nextBucketStart);
+    if (!ret) {
+      break;
+    }
+
+    if (idx == latestBucketIdx) {
+      // all done
+      break;
+    }
+  }
+}
+
+/*
+ * Adjust the input value from the specified bucket to only account
+ * for the desired range.
+ *
+ * For example, if the bucket spans time [10, 20), but we only care about the
+ * range [10, 16), this will return 60% of the input value.
+ */
+template<typename VT, typename TT>
+VT BucketedTimeSeries<VT, TT>::rangeAdjust(
+    TimeType bucketStart, TimeType nextBucketStart,
+    TimeType start, TimeType end, ValueType input) const {
+  // If nextBucketStart is greater than latestTime_, treat nextBucketStart as
+  // if it were latestTime_.  This makes us more accurate when someone is
+  // querying for all of the data up to latestTime_.  Even though latestTime_
+  // may only be partially through the bucket, we don't want to adjust
+  // downwards in this case, because the bucket really only has data up to
+  // latestTime_.
+  if (bucketStart <= latestTime_ && nextBucketStart > latestTime_) {
+    nextBucketStart = latestTime_ + TimeType(1);
+  }
+
+  if (start <= bucketStart && end >= nextBucketStart) {
+    // The bucket is wholly contained in the [start, end) interval
+    return input;
+  }
+
+  TimeType intervalStart = std::max(start, bucketStart);
+  TimeType intervalEnd = std::min(end, nextBucketStart);
+  return input * (intervalEnd - intervalStart) /
+    (nextBucketStart - bucketStart);
+}
+
+template <typename VT, typename TT>
+template <typename Function>
+void BucketedTimeSeries<VT, TT>::forEachBucket(TimeType start, TimeType end,
+                                               Function fn) const {
+  forEachBucket([&start, &end, &fn] (const Bucket& bucket, TimeType bucketStart,
+                                     TimeType nextBucketStart) {
+    if (start >= nextBucketStart) {
+      return true;
+    }
+    if (end <= bucketStart) {
+      return false;
+    }
+    bool ret = fn(bucket, bucketStart, nextBucketStart);
+    return ret;
+  });
+}
+
+} // folly
+
+#endif // FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_
diff --git a/folly/stats/BucketedTimeSeries.h b/folly/stats/BucketedTimeSeries.h
new file mode 100644 (file)
index 0000000..f343c48
--- /dev/null
@@ -0,0 +1,384 @@
+/*
+ * Copyright 2012 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_STATS_BUCKETEDTIMESERIES_H_
+#define FOLLY_STATS_BUCKETEDTIMESERIES_H_
+
+#include <chrono>
+#include <vector>
+
+#include "folly/detail/Stats.h"
+
+namespace folly {
+
+/*
+ * This class represents a bucketed time series which keeps track of values
+ * added in the recent past, and merges these values together into a fixed
+ * number of buckets to keep a lid on memory use if the number of values
+ * added is very large.
+ *
+ * For example, a BucketedTimeSeries() with duration == 60s and 10 buckets
+ * will keep track of 10 6-second buckets, and discard all data added more
+ * than 1 minute ago.  As time ticks by, a 6-second bucket at a time will
+ * be discarded and new data will go into the newly opened bucket.  Internally,
+ * it uses a circular array of buckets that it reuses as time advances.
+ *
+ * The class assumes that time advances forward --  you can't retroactively add
+ * values for events in the past -- the 'now' argument is provided for better
+ * efficiency and ease of unittesting.
+ *
+ *
+ * This class is not thread-safe -- use your own synchronization!
+ */
+template <typename VT, typename TT=std::chrono::seconds>
+class BucketedTimeSeries {
+ public:
+  typedef VT ValueType;
+  typedef TT TimeType;
+  typedef detail::Bucket<ValueType> Bucket;
+
+  /*
+   * Create a new BucketedTimeSeries.
+   *
+   * This creates a new BucketedTimeSeries with the specified number of
+   * buckets, storing data for the specified amount of time.
+   *
+   * If the duration is 0, the BucketedTimeSeries will track data forever,
+   * and does not need the rolling buckets.  The numBuckets parameter is
+   * ignored when duration is 0.
+   */
+  BucketedTimeSeries(size_t numBuckets, TimeType duration);
+
+  /*
+   * Adds the value 'val' at time 'now'
+   *
+   * This function expects time to always move forwards: it cannot be used to
+   * add historical data points that have occurred in the past.  If now is
+   * older than the another timestamp that has already been passed to
+   * addValue() or update(), now will be ignored and the latest timestamp will
+   * be used.
+   */
+  void addValue(TimeType now, const ValueType& val);
+
+  /*
+   * Adds the value 'val' the given number of 'times' at time 'now'
+   */
+  void addValue(TimeType now, const ValueType& val, int64_t times);
+
+  /*
+   * Adds the value 'sum' as the sum of 'nsamples' samples
+   */
+  void addValueAggregated(TimeType now, const ValueType& sum, int64_t nsamples);
+
+  /*
+   * Updates the container to the specified time, doing all the necessary
+   * work to rotate the buckets and remove any stale data points.
+   *
+   * The addValue() methods automatically call update() when adding new data
+   * points.  However, when reading data from the timeseries, you should make
+   * sure to manually call update() before accessing the data.  Otherwise you
+   * may be reading stale data if update() has not been called recently.
+   *
+   * Returns the current bucket index after the update.
+   */
+  size_t update(TimeType now);
+
+  /*
+   * Reset the timeseries to an empty state,
+   * as if no data points have ever been added to it.
+   */
+  void clear();
+
+  /*
+   * Get the latest time that has ever been passed to update() or addValue().
+   */
+  TimeType getLatestTime() const {
+    return latestTime_;
+  }
+
+  /*
+   * Return the number of buckets.
+   */
+  size_t numBuckets() const {
+    return buckets_.size();
+  }
+
+  /*
+   * Return the maximum duration of data that can be tracked by this
+   * BucketedTimeSeries.
+   */
+  TimeType duration() const {
+    return duration_;
+  }
+
+  /*
+   * Returns true if this BucketedTimeSeries stores data for all-time, without
+   * ever rolling over into new buckets.
+   */
+  bool isAllTime() const {
+    return (duration_ == TimeType(0));
+  }
+
+  /*
+   * Returns true if no calls to update() have been made since the last call to
+   * clear().
+   */
+  bool empty() const {
+    // We set firstTime_ greater than latestTime_ in the constructor and in
+    // clear, so we use this to distinguish if the timeseries is empty.
+    //
+    // Once a data point has been added, latestTime_ will always be greater
+    // than or equal to firstTime_.
+    return firstTime_ > latestTime_;
+  }
+
+  /*
+   * Get the amount of time tracked by this timeseries.
+   *
+   * For an all-time timeseries, this returns the length of time since the
+   * first data point was added to the time series.
+   *
+   * Otherwise, this never returns a value greater than the overall timeseries
+   * duration.  If the first data point was recorded less than a full duration
+   * ago, the time since the first data point is returned.  If a full duration
+   * has elapsed, and we have already thrown away some data, the time since the
+   * oldest bucket is returned.
+   *
+   * For example, say we are tracking 600 seconds worth of data, in 60 buckets.
+   * - If less than 600 seconds have elapsed since the first data point,
+   *   elapsed() returns the total elapsed time so far.
+   * - If more than 600 seconds have elapsed, we have already thrown away some
+   *   data.  However, we throw away a full bucket (10 seconds worth) at once,
+   *   so at any point in time we have from 590 to 600 seconds worth of data.
+   *   elapsed() will therefore return a value between 590 and 600.
+   *
+   * Note that you generally should call update() before calling elapsed(), to
+   * make sure you are not reading stale data.
+   */
+  TimeType elapsed() const;
+
+  /*
+   * Return the sum of all the data points currently tracked by this
+   * BucketedTimeSeries.
+   *
+   * Note that you generally should call update() before calling sum(), to
+   * make sure you are not reading stale data.
+   */
+  const ValueType& sum() const {
+    return total_.sum;
+  }
+
+  /*
+   * Return the number of data points currently tracked by this
+   * BucketedTimeSeries.
+   *
+   * Note that you generally should call update() before calling count(), to
+   * make sure you are not reading stale data.
+   */
+  uint64_t count() const {
+    return total_.count;
+  }
+
+  /*
+   * Return the average value (sum / count).
+   *
+   * The return type may be specified to control whether floating-point or
+   * integer division should be performed.
+   *
+   * Note that you generally should call update() before calling avg(), to
+   * make sure you are not reading stale data.
+   */
+  template <typename ReturnType=double>
+  ReturnType avg() const {
+    return total_.avg<ReturnType>();
+  }
+
+  /*
+   * Return the sum divided by the elapsed time.
+   *
+   * Note that you generally should call update() before calling rate(), to
+   * make sure you are not reading stale data.
+   */
+  template <typename ReturnType=double, typename Interval=TimeType>
+  ReturnType rate() const {
+    return rateHelper<ReturnType, Interval>(total_.sum, elapsed());
+  }
+
+  /*
+   * Return the count divided by the elapsed time.
+   *
+   * The Interval template parameter causes the elapsed time to be converted to
+   * the Interval type before using it.  For example, if Interval is
+   * std::chrono::seconds, the return value will be the count per second.
+   * If Interval is std::chrono::hours, the return value will be the count per
+   * hour.
+   *
+   * Note that you generally should call update() before calling countRate(),
+   * to make sure you are not reading stale data.
+   */
+  template <typename ReturnType=double, typename Interval=TimeType>
+  ReturnType countRate() const {
+    return rateHelper<ReturnType, Interval>(total_.count, elapsed());
+  }
+
+  /*
+   * Estimate the sum of the data points that occurred in the specified time
+   * period.
+   *
+   * The range queried is [start, end).
+   * That is, start is inclusive, and end is exclusive.
+   *
+   * Note that data outside of the timeseries duration will no longer be
+   * available for use in the estimation.  Specifying a start time earlier than
+   * (getLatestTime() - elapsed()) will not have much effect, since only data
+   * points after that point in time will be counted.
+   *
+   * Note that the value returned is an estimate, and may not be precise.
+   */
+  ValueType sum(TimeType start, TimeType end) const;
+
+  /*
+   * Estimate the number of data points that occurred in the specified time
+   * period.
+   *
+   * The same caveats documented in the sum(TimeType start, TimeType end)
+   * comments apply here as well.
+   */
+  uint64_t count(TimeType start, TimeType end) const;
+
+  /*
+   * Estimate the average value during the specified time period.
+   *
+   * The same caveats documented in the sum(TimeType start, TimeType end)
+   * comments apply here as well.
+   */
+  template <typename ReturnType=double>
+  ReturnType avg(TimeType start, TimeType end) const;
+
+  /*
+   * Estimate the rate during the specified time period.
+   *
+   * The same caveats documented in the sum(TimeType start, TimeType end)
+   * comments apply here as well.
+   */
+  template <typename ReturnType=double, typename Interval=TimeType>
+  ReturnType rate(TimeType start, TimeType end) const {
+    ValueType intervalSum = sum(start, end);
+    return rateHelper<ReturnType, Interval>(intervalSum, end - start);
+  }
+
+  /*
+   * Estimate the rate of data points being added during the specified time
+   * period.
+   *
+   * The same caveats documented in the sum(TimeType start, TimeType end)
+   * comments apply here as well.
+   */
+  template <typename ReturnType=double, typename Interval=TimeType>
+  ReturnType countRate(TimeType start, TimeType end) const {
+    uint64_t intervalCount = count(start, end);
+    return rateHelper<ReturnType, Interval>(intervalCount, end - start);
+  }
+
+  /*
+   * Invoke a function for each bucket.
+   *
+   * The function will take as arguments the bucket index,
+   * the bucket start time, and the start time of the subsequent bucket.
+   *
+   * It should return true to continue iterating through the buckets, and false
+   * to break out of the loop and stop, without calling the function on any
+   * more buckets.
+   *
+   * bool function(const Bucket& bucket, TimeType bucketStart,
+   *               TimeType nextBucketStart)
+   */
+  template <typename Function>
+  void forEachBucket(Function fn) const;
+
+  /*
+   * Get the index for the bucket containing the specified time.
+   *
+   * Note that the index is only valid if this time actually falls within one
+   * of the current buckets.  If you pass in a value more recent than
+   * getLatestTime() or older than (getLatestTime() - elapsed()), the index
+   * returned will not be valid.
+   *
+   * This method may not be called for all-time data.
+   */
+  size_t getBucketIdx(TimeType time) const;
+
+  /*
+   * Get the bucket at the specified index.
+   *
+   * This method may not be called for all-time data.
+   */
+  const Bucket& getBucketByIndex(size_t idx) const {
+    return buckets_[idx];
+  }
+
+  /*
+   * Compute the bucket index that the specified time falls into,
+   * as well as the bucket start time and the next bucket's start time.
+   *
+   * This method may not be called for all-time data.
+   */
+  void getBucketInfo(TimeType time, size_t* bucketIdx,
+                     TimeType* bucketStart, TimeType* nextBucketStart) const;
+
+ private:
+  template <typename ReturnType=double, typename Interval=TimeType>
+  ReturnType rateHelper(ReturnType numerator, TimeType elapsed) const {
+    if (elapsed == TimeType(0)) {
+      return 0;
+    }
+
+    // Use std::chrono::duration_cast to convert between the native
+    // duration and the desired interval.  However, convert the rates,
+    // rather than just converting the elapsed duration.  Converting the
+    // elapsed time first may collapse it down to 0 if the elapsed interval
+    // is less than the desired interval, which will incorrectly result in
+    // an infinite rate.
+    typedef std::chrono::duration<
+        ReturnType, std::ratio<TimeType::period::den,
+                               TimeType::period::num>> NativeRate;
+    typedef std::chrono::duration<
+        ReturnType, std::ratio<Interval::period::den,
+                               Interval::period::num>> DesiredRate;
+
+    NativeRate native(numerator / elapsed.count());
+    DesiredRate desired = std::chrono::duration_cast<DesiredRate>(native);
+    return desired.count();
+  }
+
+  ValueType rangeAdjust(TimeType bucketStart, TimeType nextBucketStart,
+                        TimeType start, TimeType end,
+                        ValueType input) const;
+
+  template <typename Function>
+  void forEachBucket(TimeType start, TimeType end, Function fn) const;
+
+  TimeType firstTime_;   // time of first update() since clear()/constructor
+  TimeType latestTime_;  // time of last update()
+  TimeType duration_;    // total duration ("window length") of the time series
+
+  Bucket total_;                 // sum and count of everything in time series
+  std::vector<Bucket> buckets_;  // actual buckets of values
+};
+
+} // folly
+
+#endif // FOLLY_STATS_BUCKETEDTIMESERIES_H_
diff --git a/folly/test/TimeseriesBenchmark.cpp b/folly/test/TimeseriesBenchmark.cpp
new file mode 100644 (file)
index 0000000..2ee7935
--- /dev/null
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2012 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "folly/stats/BucketedTimeSeries.h"
+#include "folly/stats/BucketedTimeSeries-defs.h"
+
+#include <glog/logging.h>
+
+#include "folly/Benchmark.h"
+
+using std::chrono::seconds;
+using folly::BenchmarkSuspender;
+using folly::BucketedTimeSeries;
+
+void addValue(unsigned int iters,
+              seconds duration, size_t numBuckets,
+              size_t callsPerSecond) {
+  BenchmarkSuspender suspend;
+  BucketedTimeSeries<int64_t> ts(numBuckets, duration);
+  suspend.dismiss();
+
+  seconds currentTime(1342000000);
+  size_t timeCounter = 0;
+  for (unsigned int n = 0; n < iters; ++n, ++timeCounter) {
+    if (timeCounter >= callsPerSecond) {
+      timeCounter = 0;
+      ++currentTime;
+    }
+    ts.addValue(currentTime, n);
+  }
+}
+
+BENCHMARK_NAMED_PARAM(addValue, AllTime_1perSec, seconds(0), 60, 1);
+BENCHMARK_NAMED_PARAM(addValue, 3600x60_1perSec, seconds(3600), 60, 1);
+BENCHMARK_NAMED_PARAM(addValue, 600x60_1perSec, seconds(600), 60, 1);
+BENCHMARK_NAMED_PARAM(addValue, 60x60_1perSec, seconds(60), 60, 1);
+BENCHMARK_NAMED_PARAM(addValue, 100x10_1perSec, seconds(100), 10, 1);
+BENCHMARK_NAMED_PARAM(addValue, 71x5_1perSec, seconds(71), 5, 1);
+BENCHMARK_NAMED_PARAM(addValue, 1x1_1perSec, seconds(1), 1, 1);
+
+BENCHMARK_DRAW_LINE()
+
+BENCHMARK_NAMED_PARAM(addValue, AllTime_10perSec, seconds(0), 60, 10);
+BENCHMARK_NAMED_PARAM(addValue, 3600x60_10perSec, seconds(3600), 60, 10);
+BENCHMARK_NAMED_PARAM(addValue, 600x60_10perSec, seconds(600), 60, 10);
+BENCHMARK_NAMED_PARAM(addValue, 60x60_10perSec, seconds(60), 60, 10);
+BENCHMARK_NAMED_PARAM(addValue, 100x10_10perSec, seconds(100), 10, 10);
+BENCHMARK_NAMED_PARAM(addValue, 71x5_10perSec, seconds(71), 5, 10);
+BENCHMARK_NAMED_PARAM(addValue, 1x1_10perSec, seconds(1), 1, 10);
+
+BENCHMARK_DRAW_LINE()
+
+BENCHMARK_NAMED_PARAM(addValue, AllTime_100perSec, seconds(0), 60, 100);
+BENCHMARK_NAMED_PARAM(addValue, 3600x60_100perSec, seconds(3600), 60, 100);
+BENCHMARK_NAMED_PARAM(addValue, 600x60_100perSec, seconds(600), 60, 100);
+BENCHMARK_NAMED_PARAM(addValue, 60x60_100perSec, seconds(60), 60, 100);
+BENCHMARK_NAMED_PARAM(addValue, 100x10_100perSec, seconds(100), 10, 100);
+BENCHMARK_NAMED_PARAM(addValue, 71x5_100perSec, seconds(71), 5, 100);
+BENCHMARK_NAMED_PARAM(addValue, 1x1_100perSec, seconds(1), 1, 100);
+
+int main(int argc, char *argv[]) {
+  google::ParseCommandLineFlags(&argc, &argv, true);
+  folly::runBenchmarks();
+  return 0;
+}
diff --git a/folly/test/TimeseriesTest.cpp b/folly/test/TimeseriesTest.cpp
new file mode 100644 (file)
index 0000000..84b64b1
--- /dev/null
@@ -0,0 +1,512 @@
+/*
+ * Copyright 2012 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "folly/stats/BucketedTimeSeries.h"
+#include "folly/stats/BucketedTimeSeries-defs.h"
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+using std::chrono::seconds;
+using std::string;
+using std::vector;
+using folly::BucketedTimeSeries;
+
+struct TestData {
+  size_t duration;
+  size_t numBuckets;
+  vector<ssize_t> bucketStarts;
+};
+vector<TestData> testData = {
+  // 71 seconds x 4 buckets
+  { 71, 4, {0, 18, 36, 54}},
+  // 100 seconds x 10 buckets
+  { 100, 10, {0, 10, 20, 30, 40, 50, 60, 70, 80, 90}},
+  // 10 seconds x 10 buckets
+  { 10, 10, {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}},
+  // 10 seconds x 1 buckets
+  { 10, 1, {0}},
+  // 1 second x 1 buckets
+  { 1, 1, {0}},
+};
+
+TEST(BucketedTimeSeries, getBucketInfo) {
+  for (const auto& data : testData) {
+    BucketedTimeSeries<int64_t> ts(data.numBuckets, seconds(data.duration));
+
+    for (uint32_t n = 0; n < 10000; n += 1234) {
+      seconds offset(n * data.duration);
+
+      for (uint32_t idx = 0; idx < data.numBuckets; ++idx) {
+        seconds bucketStart(data.bucketStarts[idx]);
+        seconds nextBucketStart;
+        if (idx + 1 < data.numBuckets) {
+            nextBucketStart = seconds(data.bucketStarts[idx + 1]);
+        } else {
+            nextBucketStart = seconds(data.duration);
+        }
+
+        seconds expectedStart = offset + bucketStart;
+        seconds expectedNextStart = offset + nextBucketStart;
+        seconds midpoint = (expectedStart + expectedNextStart) / 2;
+
+        vector<std::pair<string, seconds>> timePoints = {
+          {"expectedStart", expectedStart},
+          {"midpoint", midpoint},
+          {"expectedEnd", expectedNextStart - seconds(1)},
+        };
+
+        for (const auto& point : timePoints) {
+          // Check that getBucketIdx() returns the expected index
+          EXPECT_EQ(idx, ts.getBucketIdx(point.second)) <<
+            data.duration << "x" << data.numBuckets << ": " <<
+            point.first << "=" << point.second.count();
+
+          // Check the data returned by getBucketInfo()
+          size_t returnedIdx;
+          seconds returnedStart;
+          seconds returnedNextStart;
+          ts.getBucketInfo(expectedStart, &returnedIdx,
+                           &returnedStart, &returnedNextStart);
+          EXPECT_EQ(idx, returnedIdx) <<
+            data.duration << "x" << data.numBuckets << ": " <<
+            point.first << "=" << point.second.count();
+          EXPECT_EQ(expectedStart.count(), returnedStart.count()) <<
+            data.duration << "x" << data.numBuckets << ": " <<
+            point.first << "=" << point.second.count();
+          EXPECT_EQ(expectedNextStart.count(), returnedNextStart.count()) <<
+            data.duration << "x" << data.numBuckets << ": " <<
+            point.first << "=" << point.second.count();
+        }
+      }
+    }
+  }
+}
+
+void testUpdate100x10(size_t offset) {
+  // This test code only works when offset is a multiple of the bucket width
+  CHECK_EQ(0, offset % 10);
+
+  // Create a 100 second timeseries, with 10 buckets
+  BucketedTimeSeries<int64_t> ts(10, seconds(100));
+
+  auto setup = [&] {
+    ts.clear();
+    // Add 1 value to each bucket
+    for (int n = 5; n <= 95; n += 10) {
+      ts.addValue(seconds(n + offset), 6);
+    }
+
+    EXPECT_EQ(10, ts.count());
+    EXPECT_EQ(60, ts.sum());
+    EXPECT_EQ(6, ts.avg());
+  };
+
+  // Update 2 buckets forwards.  This should throw away 2 data points.
+  setup();
+  ts.update(seconds(110 + offset));
+  EXPECT_EQ(8, ts.count());
+  EXPECT_EQ(48, ts.sum());
+  EXPECT_EQ(6, ts.avg());
+
+  // The last time we added was 95.
+  // Try updating to 189.  This should clear everything but the last bucket.
+  setup();
+  ts.update(seconds(151 + offset));
+  EXPECT_EQ(4, ts.count());
+  //EXPECT_EQ(6, ts.sum());
+  EXPECT_EQ(6, ts.avg());
+
+  // The last time we added was 95.
+  // Try updating to 193: This is nearly one full loop around,
+  // back to the same bucket.  update() needs to clear everything
+  setup();
+  ts.update(seconds(193 + offset));
+  EXPECT_EQ(0, ts.count());
+  EXPECT_EQ(0, ts.sum());
+  EXPECT_EQ(0, ts.avg());
+
+  // The last time we added was 95.
+  // Try updating to 197: This is slightly over one full loop around,
+  // back to the same bucket.  update() needs to clear everything
+  setup();
+  ts.update(seconds(197 + offset));
+  EXPECT_EQ(0, ts.count());
+  EXPECT_EQ(0, ts.sum());
+  EXPECT_EQ(0, ts.avg());
+
+  // The last time we added was 95.
+  // Try updating to 230: This is well over one full loop around,
+  // and everything should be cleared.
+  setup();
+  ts.update(seconds(230 + offset));
+  EXPECT_EQ(0, ts.count());
+  EXPECT_EQ(0, ts.sum());
+  EXPECT_EQ(0, ts.avg());
+}
+
+TEST(BucketedTimeSeries, update100x10) {
+  // Run testUpdate100x10() multiple times, with various offsets.
+  // This makes sure the update code works regardless of which bucket it starts
+  // at in the modulo arithmetic.
+  testUpdate100x10(0);
+  testUpdate100x10(50);
+  testUpdate100x10(370);
+  testUpdate100x10(1937090);
+}
+
+TEST(BucketedTimeSeries, update71x5) {
+  // Create a 71 second timeseries, with 5 buckets
+  // This tests when the number of buckets does not divide evenly into the
+  // duration.
+  BucketedTimeSeries<int64_t> ts(5, seconds(71));
+
+  auto setup = [&] {
+    ts.clear();
+    // Add 1 value to each bucket
+    ts.addValue(seconds(11), 6);
+    ts.addValue(seconds(24), 6);
+    ts.addValue(seconds(42), 6);
+    ts.addValue(seconds(43), 6);
+    ts.addValue(seconds(66), 6);
+
+    EXPECT_EQ(5, ts.count());
+    EXPECT_EQ(30, ts.sum());
+    EXPECT_EQ(6, ts.avg());
+  };
+
+  // Update 2 buckets forwards.  This should throw away 2 data points.
+  setup();
+  ts.update(seconds(99));
+  EXPECT_EQ(3, ts.count());
+  EXPECT_EQ(18, ts.sum());
+  EXPECT_EQ(6, ts.avg());
+
+  // Update 3 buckets forwards.  This should throw away 3 data points.
+  setup();
+  ts.update(seconds(100));
+  EXPECT_EQ(2, ts.count());
+  EXPECT_EQ(12, ts.sum());
+  EXPECT_EQ(6, ts.avg());
+
+  // Update 4 buckets forwards, just under the wrap limit.
+  // This should throw everything but the last bucket away.
+  setup();
+  ts.update(seconds(127));
+  EXPECT_EQ(1, ts.count());
+  EXPECT_EQ(6, ts.sum());
+  EXPECT_EQ(6, ts.avg());
+
+  // Update 5 buckets forwards, exactly at the wrap limit.
+  // This should throw everything away.
+  setup();
+  ts.update(seconds(128));
+  EXPECT_EQ(0, ts.count());
+  EXPECT_EQ(0, ts.sum());
+  EXPECT_EQ(0, ts.avg());
+
+  // Update very far forwards, wrapping multiple times.
+  // This should throw everything away.
+  setup();
+  ts.update(seconds(1234));
+  EXPECT_EQ(0, ts.count());
+  EXPECT_EQ(0, ts.sum());
+  EXPECT_EQ(0, ts.avg());
+}
+
+TEST(BucketedTimeSeries, elapsed) {
+  BucketedTimeSeries<int64_t> ts(60, seconds(600));
+
+  // elapsed() is 0 when no data points have been added
+  EXPECT_EQ(0, ts.elapsed().count());
+
+  // With exactly 1 data point, elapsed() should report 1 second of data
+  seconds start(239218);
+  ts.addValue(start + seconds(0), 200);
+  EXPECT_EQ(1, ts.elapsed().count());
+  // Adding a data point 10 seconds later should result in an elapsed time of
+  // 11 seconds (the time range is [0, 10], inclusive).
+  ts.addValue(start + seconds(10), 200);
+  EXPECT_EQ(11, ts.elapsed().count());
+
+  // elapsed() returns to 0 after clear()
+  ts.clear();
+  EXPECT_EQ(0, ts.elapsed().count());
+
+  // Restart, with the starting point on an easier number to work with
+  ts.addValue(seconds(10), 200);
+  EXPECT_EQ(1, ts.elapsed().count());
+  ts.addValue(seconds(580), 200);
+  EXPECT_EQ(571, ts.elapsed().count());
+  ts.addValue(seconds(590), 200);
+  EXPECT_EQ(581, ts.elapsed().count());
+  ts.addValue(seconds(598), 200);
+  EXPECT_EQ(589, ts.elapsed().count());
+  ts.addValue(seconds(599), 200);
+  EXPECT_EQ(590, ts.elapsed().count());
+  ts.addValue(seconds(600), 200);
+  EXPECT_EQ(591, ts.elapsed().count());
+  ts.addValue(seconds(608), 200);
+  EXPECT_EQ(599, ts.elapsed().count());
+  ts.addValue(seconds(609), 200);
+  EXPECT_EQ(600, ts.elapsed().count());
+  // Once we reach 600 seconds worth of data, when we move on to the next
+  // second a full bucket will get thrown out.  Now we drop back down to 591
+  // seconds worth of data
+  ts.addValue(seconds(610), 200);
+  EXPECT_EQ(591, ts.elapsed().count());
+  ts.addValue(seconds(618), 200);
+  EXPECT_EQ(599, ts.elapsed().count());
+  ts.addValue(seconds(619), 200);
+  EXPECT_EQ(600, ts.elapsed().count());
+  ts.addValue(seconds(620), 200);
+  EXPECT_EQ(591, ts.elapsed().count());
+  ts.addValue(seconds(123419), 200);
+  EXPECT_EQ(600, ts.elapsed().count());
+  ts.addValue(seconds(123420), 200);
+  EXPECT_EQ(591, ts.elapsed().count());
+  ts.addValue(seconds(123425), 200);
+  EXPECT_EQ(596, ts.elapsed().count());
+
+  // Time never moves backwards.
+  // Calling update with an old timestamp will just be ignored.
+  ts.update(seconds(29));
+  EXPECT_EQ(596, ts.elapsed().count());
+}
+
+TEST(BucketedTimeSeries, rate) {
+  BucketedTimeSeries<int64_t> ts(60, seconds(600));
+
+  // Add 3 values every 2 seconds, until fill up the buckets
+  for (size_t n = 0; n < 600; n += 2) {
+    ts.addValue(seconds(n), 200, 3);
+  }
+
+  EXPECT_EQ(900, ts.count());
+  EXPECT_EQ(180000, ts.sum());
+  EXPECT_EQ(200, ts.avg());
+
+  // Really we only entered 599 seconds worth of data: [0, 598] (inclusive)
+  EXPECT_EQ(599, ts.elapsed().count());
+  EXPECT_NEAR(300.5, ts.rate(), 0.005);
+  EXPECT_NEAR(1.5, ts.countRate(), 0.005);
+
+  // If we add 1 more second, now we will have 600 seconds worth of data
+  ts.update(seconds(599));
+  EXPECT_EQ(600, ts.elapsed().count());
+  EXPECT_NEAR(300, ts.rate(), 0.005);
+  EXPECT_EQ(300, ts.rate<int>());
+  EXPECT_NEAR(1.5, ts.countRate(), 0.005);
+
+  // However, 1 more second after that and we will have filled up all the
+  // buckets, and have to drop one.
+  ts.update(seconds(600));
+  EXPECT_EQ(591, ts.elapsed().count());
+  EXPECT_NEAR(299.5, ts.rate(), 0.01);
+  EXPECT_EQ(299, ts.rate<int>());
+  EXPECT_NEAR(1.5, ts.countRate(), 0.005);
+}
+
+TEST(BucketedTimeSeries, forEachBucket) {
+  typedef BucketedTimeSeries<int64_t>::Bucket Bucket;
+  struct BucketInfo {
+    BucketInfo(const Bucket* b, seconds s, seconds ns)
+      : bucket(b), start(s), nextStart(ns) {}
+
+    const Bucket* bucket;
+    seconds start;
+    seconds nextStart;
+  };
+
+  for (const auto& data : testData) {
+    BucketedTimeSeries<int64_t> ts(data.numBuckets, seconds(data.duration));
+
+    vector<BucketInfo> info;
+    auto fn = [&](const Bucket& bucket, seconds bucketStart,
+                  seconds bucketEnd) {
+      info.emplace_back(&bucket, bucketStart, bucketEnd);
+      return true;
+    };
+
+    // If we haven't yet added any data, the current bucket will start at 0,
+    // and all data previous buckets will have negative times.
+    ts.forEachBucket(fn);
+
+    CHECK_EQ(data.numBuckets, info.size());
+
+    // Check the data passed in to the function
+    size_t infoIdx = 0;
+    size_t bucketIdx = 1;
+    ssize_t offset = -data.duration;
+    for (size_t n = 0; n < data.numBuckets; ++n) {
+      if (bucketIdx >= data.numBuckets) {
+        bucketIdx = 0;
+        offset += data.duration;
+      }
+
+      EXPECT_EQ(data.bucketStarts[bucketIdx] + offset,
+                info[infoIdx].start.count()) <<
+        data.duration << "x" << data.numBuckets << ": bucketIdx=" <<
+        bucketIdx << ", infoIdx=" << infoIdx;
+
+      size_t nextBucketIdx = bucketIdx + 1;
+      ssize_t nextOffset = offset;
+      if (nextBucketIdx >= data.numBuckets) {
+        nextBucketIdx = 0;
+        nextOffset += data.duration;
+      }
+      EXPECT_EQ(data.bucketStarts[nextBucketIdx] + nextOffset,
+                info[infoIdx].nextStart.count()) <<
+        data.duration << "x" << data.numBuckets << ": bucketIdx=" <<
+        bucketIdx << ", infoIdx=" << infoIdx;
+
+      EXPECT_EQ(&ts.getBucketByIndex(bucketIdx), info[infoIdx].bucket);
+
+      ++bucketIdx;
+      ++infoIdx;
+    }
+  }
+}
+
+TEST(BucketedTimeSeries, queryByIntervalSimple) {
+  BucketedTimeSeries<int> a(3, seconds(12));
+  for (int i = 0; i < 8; i++) {
+    a.addValue(seconds(i), 1);
+  }
+  // We added 1 at each second from 0..7
+  // Query from the time period 0..2.
+  // This is entirely in the first bucket, which has a sum of 4.
+  // The code knows only part of the bucket is covered, and correctly
+  // estimates the desired sum as 3.
+  EXPECT_EQ(2, a.sum(seconds(0), seconds(2)));
+}
+
+TEST(BucketedTimeSeries, queryByInterval) {
+  // Set up a BucketedTimeSeries tracking 6 seconds in 3 buckets
+  const int kNumBuckets = 3;
+  const int kDuration = 6;
+  BucketedTimeSeries<double> b(kNumBuckets, seconds(kDuration));
+
+  for (unsigned int i = 0; i < kDuration; ++i) {
+    // add value 'i' at time 'i'
+    b.addValue(seconds(i), i);
+  }
+
+  // Current bucket state:
+  // 0: time=[0, 2): values=(0, 1), sum=1, count=2
+  // 1: time=[2, 4): values=(2, 3), sum=5, count=1
+  // 2: time=[4, 6): values=(4, 5), sum=9, count=2
+  double expectedSums1[kDuration + 1][kDuration + 1] = {
+    {0,  4.5,   9, 11.5,  14, 14.5,  15},
+    {0,  4.5,   7,  9.5,  10, 10.5,  -1},
+    {0,  2.5,   5,  5.5,   6,   -1,  -1},
+    {0,  2.5,   3,  3.5,  -1,   -1,  -1},
+    {0,  0.5,   1,   -1,  -1,   -1,  -1},
+    {0,  0.5,  -1,   -1,  -1,   -1,  -1},
+    {0,   -1,  -1,   -1,  -1,   -1,  -1}
+  };
+  int expectedCounts1[kDuration + 1][kDuration + 1] = {
+    {0,  1,  2,  3,  4,  5,  6},
+    {0,  1,  2,  3,  4,  5, -1},
+    {0,  1,  2,  3,  4, -1, -1},
+    {0,  1,  2,  3, -1, -1, -1},
+    {0,  1,  2, -1, -1, -1, -1},
+    {0,  1, -1, -1, -1, -1, -1},
+    {0, -1, -1, -1, -1, -1, -1}
+  };
+
+  seconds currentTime = b.getLatestTime() + seconds(1);
+  for (int i = 0; i <= kDuration + 1; i++) {
+    for (int j = 0; j <= kDuration - i; j++) {
+      seconds start = currentTime - seconds(i + j);
+      seconds end = currentTime - seconds(i);
+      double expectedSum = expectedSums1[i][j];
+      EXPECT_EQ(expectedSum, b.sum(start, end)) <<
+        "i=" << i << ", j=" << j <<
+        ", interval=[" << start.count() << ", " << end.count() << ")";
+
+      uint64_t expectedCount = expectedCounts1[i][j];
+      EXPECT_EQ(expectedCount, b.count(start, end)) <<
+        "i=" << i << ", j=" << j <<
+        ", interval=[" << start.count() << ", " << end.count() << ")";
+
+      double expectedAvg = expectedCount ? expectedSum / expectedCount : 0;
+      EXPECT_EQ(expectedAvg, b.avg(start, end)) <<
+        "i=" << i << ", j=" << j <<
+        ", interval=[" << start.count() << ", " << end.count() << ")";
+
+      double expectedRate = j ? expectedSum / j : 0;
+      EXPECT_EQ(expectedRate, b.rate(start, end)) <<
+        "i=" << i << ", j=" << j <<
+        ", interval=[" << start.count() << ", " << end.count() << ")";
+    }
+  }
+
+  // Add 3 more values.
+  // This will overwrite 1 full bucket, and put us halfway through the next.
+  for (unsigned int i = kDuration; i < kDuration + 3; ++i) {
+    b.addValue(seconds(i), i);
+  }
+
+  // Current bucket state:
+  // 0: time=[6,  8): values=(6, 7), sum=13, count=2
+  // 1: time=[8, 10): values=(8),    sum=8, count=1
+  // 2: time=[4,  6): values=(4, 5), sum=9, count=2
+  double expectedSums2[kDuration + 1][kDuration + 1] = {
+    {0,    8, 14.5,   21, 25.5,  30,  30},
+    {0,  6.5,   13, 17.5,   22,  22,  -1},
+    {0,  6.5,   11, 15.5, 15.5,  -1,  -1},
+    {0,  4.5,    9,    9,   -1,  -1,  -1},
+    {0,  4.5,  4.5,   -1,   -1,  -1,  -1},
+    {0,    0,   -1,   -1,   -1,  -1,  -1},
+    {0,   -1,   -1,   -1,   -1,  -1,  -1}
+  };
+  int expectedCounts2[kDuration + 1][kDuration + 1] = {
+    {0,  1,  2,  3,  4,  5,  5},
+    {0,  1,  2,  3,  4,  4, -1},
+    {0,  1,  2,  3,  3, -1, -1},
+    {0,  1,  2,  2, -1, -1, -1},
+    {0,  1,  1, -1, -1, -1, -1},
+    {0,  0, -1, -1, -1, -1, -1},
+    {0, -1, -1, -1, -1, -1, -1}
+  };
+
+  currentTime = b.getLatestTime() + seconds(1);
+  for (int i = 0; i <= kDuration + 1; i++) {
+    for (int j = 0; j <= kDuration - i; j++) {
+      seconds start = currentTime - seconds(i + j);
+      seconds end = currentTime - seconds(i);
+      double expectedSum = expectedSums2[i][j];
+      EXPECT_EQ(expectedSum, b.sum(start, end)) <<
+        "i=" << i << ", j=" << j <<
+        ", interval=[" << start.count() << ", " << end.count() << ")";
+
+      uint64_t expectedCount = expectedCounts2[i][j];
+      EXPECT_EQ(expectedCount, b.count(start, end)) <<
+        "i=" << i << ", j=" << j <<
+        ", interval=[" << start.count() << ", " << end.count() << ")";
+
+      double expectedAvg = expectedCount ? expectedSum / expectedCount : 0;
+      EXPECT_EQ(expectedAvg, b.avg(start, end)) <<
+        "i=" << i << ", j=" << j <<
+        ", interval=[" << start.count() << ", " << end.count() << ")";
+
+      double expectedRate = j ? expectedSum / j : 0;
+      EXPECT_EQ(expectedRate, b.rate(start, end)) <<
+        "i=" << i << ", j=" << j <<
+        ", interval=[" << start.count() << ", " << end.count() << ")";
+    }
+  }
+}