Summary:
Previously BucketedTimeSeries()::addValue() documented that it required
time to move forwards. If it was ever called with a timestamp older
than the most recent one it had seen, it would just use latestTime_ as
the time, and add the value to the most recent bucket.
This changes addValue() so that it always uses the timestamp passed in
by the caller. If this time value refers to an old bucket that is still
being tracked, the data point will be added to that bucket. If the time
value is older than the oldest currently tracked bucket, the data point
will be ignored, and false will be returned.
I did consider leaving the current addValue() behavior as-is, and
requiring a separate addHistoricalValue() for when users intentionally
want to try adding old data points. However, it seems nicer to build
this into the existing addValue() function. The old behavior of just
replacing the supplied time value seems potentially surprising to users.
This does change the behavior of addValue(), and therefore could affect
the behavior of some programs. However, up until now no-one should have
been calling addValue() with old time values, as it wouldn't have done
what they want anyway. I did a brief search through our code base, and
all call sites I saw always called addValue() with the current time.
(Most of the callers use wall clock time, so this change might affect
program behavior if the system time changes after the program starts.
We should ideally change our programs to use monotonic clocks instead.)
Test Plan:
Included a new unit test.
Also compared the timeseries_benchmark results before and after this
change. Overall this new logic seems to be faster. For the "all time"
case, the new code is over 2x as fast. For the normal, non-all-time
case the new code is around 5% faster.
Reviewed By: hans@fb.com
Subscribers: doug, folly@lists, net-systems@, exa
FB internal diff:
D1338466
#define FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_
#include <glog/logging.h>
+#include "folly/Likely.h"
namespace folly {
}
template <typename VT, typename TT>
-void BucketedTimeSeries<VT, TT>::addValue(TimeType now, const ValueType& val) {
- addValueAggregated(now, val, 1);
+bool BucketedTimeSeries<VT, TT>::addValue(TimeType now, const ValueType& val) {
+ return addValueAggregated(now, val, 1);
}
template <typename VT, typename TT>
-void BucketedTimeSeries<VT, TT>::addValue(TimeType now,
+bool BucketedTimeSeries<VT, TT>::addValue(TimeType now,
const ValueType& val,
int64_t times) {
- addValueAggregated(now, val * times, times);
+ return addValueAggregated(now, val * times, times);
}
template <typename VT, typename TT>
-void BucketedTimeSeries<VT, TT>::addValueAggregated(TimeType now,
+bool BucketedTimeSeries<VT, TT>::addValueAggregated(TimeType now,
const ValueType& sum,
int64_t nsamples) {
- // Make sure time doesn't go backwards
- now = std::max(now, latestTime_);
-
if (isAllTime()) {
- if (empty()) {
+ if (UNLIKELY(empty())) {
+ firstTime_ = now;
+ latestTime_ = now;
+ } else if (now > latestTime_) {
+ latestTime_ = now;
+ } else if (now < firstTime_) {
firstTime_ = now;
}
- latestTime_ = now;
total_.add(sum, nsamples);
- return;
+ return true;
}
- // Update the buckets
- size_t curBucket = update(now);
- buckets_[curBucket].add(sum, nsamples);
+ size_t bucketIdx;
+ if (UNLIKELY(empty())) {
+ // First data point we've ever seen
+ firstTime_ = now;
+ latestTime_ = now;
+ bucketIdx = getBucketIdx(now);
+ } else if (now > latestTime_) {
+ // More recent time. Need to update the buckets.
+ bucketIdx = updateBuckets(now);
+ } else if (LIKELY(now == latestTime_)) {
+ // Current time.
+ bucketIdx = getBucketIdx(now);
+ } else {
+ // An earlier time in the past. We need to check if this time still falls
+ // within our window.
+ if (now < getEarliestTimeNonEmpty()) {
+ return false;
+ }
+ bucketIdx = getBucketIdx(now);
+ }
- // Update the aggregate sum/count
total_.add(sum, nsamples);
+ buckets_[bucketIdx].add(sum, nsamples);
+ return true;
}
template <typename VT, typename TT>
return getBucketIdx(latestTime_);
}
+ return updateBuckets(now);
+}
+
+template <typename VT, typename TT>
+size_t BucketedTimeSeries<VT, TT>::updateBuckets(TimeType now) {
// We could cache nextBucketStart as a member variable, so we don't have to
// recompute it each time update() is called with a new timestamp value.
// This makes things faster when update() (or addValue()) is called once
return firstTime_;
}
+ // Compute the earliest time we can track
+ TimeType earliestTime = getEarliestTimeNonEmpty();
+
+ // We're never tracking data before firstTime_
+ earliestTime = std::max(earliestTime, firstTime_);
+
+ return earliestTime;
+}
+
+template <typename VT, typename TT>
+TT BucketedTimeSeries<VT, TT>::getEarliestTimeNonEmpty() const {
size_t currentBucket;
TimeType currentBucketStart;
TimeType nextBucketStart;
// Subtract 1 duration from the start of the next bucket to find the
// earliest possible data point we could be tracking.
- TimeType earliestTime = nextBucketStart - duration_;
-
- // We're never tracking data before firstTime_
- earliestTime = std::max(earliestTime, firstTime_);
-
- return earliestTime;
+ return nextBucketStart - duration_;
}
template <typename VT, typename TT>
* be discarded and new data will go into the newly opened bucket. Internally,
* it uses a circular array of buckets that it reuses as time advances.
*
- * The class assumes that time advances forward -- you can't retroactively add
- * values for events in the past -- the 'now' argument is provided for better
- * efficiency and ease of unittesting.
- *
+ * This class assumes that time advances forwards. The window of time tracked
+ * by the timeseries will advance forwards whenever a more recent timestamp is
+ * passed to addValue(). While it is possible to pass old time values to
+ * addValue(), this will never move the time window backwards. If the old time
+ * value falls outside the tracked window of time, the data point will be
+ * ignored.
*
* This class is not thread-safe -- use your own synchronization!
*/
/*
* Adds the value 'val' at time 'now'
*
- * This function expects time to always move forwards: it cannot be used to
- * add historical data points that have occurred in the past. If now is
- * older than the another timestamp that has already been passed to
- * addValue() or update(), now will be ignored and the latest timestamp will
- * be used.
+ * This function expects time to generally move forwards. The window of time
+ * tracked by this time series will move forwards with time. If 'now' is
+ * more recent than any time previously seen, addValue() will automatically
+ * call update(now) to advance the time window tracked by this data
+ * structure.
+ *
+ * Values in the recent past may be added to the data structure by passing in
+ * a slightly older value of 'now', as long as this time point still falls
+ * within the tracked duration. If 'now' is older than the tracked duration
+ * of time, the data point value will be ignored, and addValue() will return
+ * false without doing anything else.
+ *
+ * Returns true on success, or false if now was older than the tracked time
+ * window.
*/
- void addValue(TimeType now, const ValueType& val);
+ bool addValue(TimeType now, const ValueType& val);
/*
* Adds the value 'val' the given number of 'times' at time 'now'
*/
- void addValue(TimeType now, const ValueType& val, int64_t times);
+ bool addValue(TimeType now, const ValueType& val, int64_t times);
/*
* Adds the value 'sum' as the sum of 'nsamples' samples
*/
- void addValueAggregated(TimeType now, const ValueType& sum, int64_t nsamples);
+ bool addValueAggregated(TimeType now, const ValueType& sum, int64_t nsamples);
/*
* Updates the container to the specified time, doing all the necessary
elapsed);
}
+ TimeType getEarliestTimeNonEmpty() const;
+ size_t updateBuckets(TimeType now);
+
ValueType rangeAdjust(TimeType bucketStart, TimeType nextBucketStart,
TimeType start, TimeType end,
ValueType input) const;
EXPECT_EQ(1.0, b.countRate(seconds(0), kDuration * 10));
}
+TEST(BucketedTimeSeries, addHistorical) {
+ const int kNumBuckets = 5;
+ const seconds kDuration(10);
+ BucketedTimeSeries<double> b(kNumBuckets, kDuration);
+
+ // Initially fill with a constant rate of data
+ for (seconds i = seconds(0); i < seconds(10); ++i) {
+ b.addValue(i, 10.0);
+ }
+
+ EXPECT_EQ(10.0, b.rate());
+ EXPECT_EQ(10.0, b.avg());
+ EXPECT_EQ(10, b.count());
+
+ // Add some more data points to the middle bucket
+ b.addValue(seconds(4), 40.0);
+ b.addValue(seconds(5), 40.0);
+ EXPECT_EQ(15.0, b.avg());
+ EXPECT_EQ(18.0, b.rate());
+ EXPECT_EQ(12, b.count());
+
+ // Now start adding more current data points, until we are about to roll over
+ // the bucket where we added the extra historical data.
+ for (seconds i = seconds(10); i < seconds(14); ++i) {
+ b.addValue(i, 10.0);
+ }
+ EXPECT_EQ(15.0, b.avg());
+ EXPECT_EQ(18.0, b.rate());
+ EXPECT_EQ(12, b.count());
+
+ // Now roll over the middle bucket
+ b.addValue(seconds(14), 10.0);
+ b.addValue(seconds(15), 10.0);
+ EXPECT_EQ(10.0, b.avg());
+ EXPECT_EQ(10.0, b.rate());
+ EXPECT_EQ(10, b.count());
+
+ // Add more historical values past the bucket window.
+ // These should be ignored.
+ EXPECT_FALSE(b.addValue(seconds(4), 40.0));
+ EXPECT_FALSE(b.addValue(seconds(5), 40.0));
+ EXPECT_EQ(10.0, b.avg());
+ EXPECT_EQ(10.0, b.rate());
+ EXPECT_EQ(10, b.count());
+}
+
namespace IntMHTS {
enum Levels {
MINUTE,