From: Adam Simpkins Date: Wed, 14 May 2014 20:40:33 +0000 (-0700) Subject: make BucketedTimeSeries::addValue() honor old timestamps X-Git-Tag: v0.22.0~531 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=253dd087cc83e0b921b73d63477a238fb707f28b;p=folly.git make BucketedTimeSeries::addValue() honor old timestamps Summary: Previously BucketedTimeSeries()::addValue() documented that it required time to move forwards. If it was ever called with a timestamp older than the most recent one it had seen, it would just use latestTime_ as the time, and add the value to the most recent bucket. This changes addValue() so that it always uses the timestamp passed in by the caller. If this time value refers to an old bucket that is still being tracked, the data point will be added to that bucket. If the time value is older than the oldest currently tracked bucket, the data point will be ignored, and false will be returned. I did consider leaving the current addValue() behavior as-is, and requiring a separate addHistoricalValue() for when users intentionally want to try adding old data points. However, it seems nicer to build this into the existing addValue() function. The old behavior of just replacing the supplied time value seems potentially surprising to users. This does change the behavior of addValue(), and therefore could affect the behavior of some programs. However, up until now no-one should have been calling addValue() with old time values, as it wouldn't have done what they want anyway. I did a brief search through our code base, and all call sites I saw always called addValue() with the current time. (Most of the callers use wall clock time, so this change might affect program behavior if the system time changes after the program starts. We should ideally change our programs to use monotonic clocks instead.) Test Plan: Included a new unit test. Also compared the timeseries_benchmark results before and after this change. Overall this new logic seems to be faster. For the "all time" case, the new code is over 2x as fast. For the normal, non-all-time case the new code is around 5% faster. Reviewed By: hans@fb.com Subscribers: doug, folly@lists, net-systems@, exa FB internal diff: D1338466 --- diff --git a/folly/stats/BucketedTimeSeries-defs.h b/folly/stats/BucketedTimeSeries-defs.h index 3474258d..d37539a7 100644 --- a/folly/stats/BucketedTimeSeries-defs.h +++ b/folly/stats/BucketedTimeSeries-defs.h @@ -18,6 +18,7 @@ #define FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_ #include +#include "folly/Likely.h" namespace folly { @@ -43,39 +44,58 @@ BucketedTimeSeries::BucketedTimeSeries(size_t numBuckets, } template -void BucketedTimeSeries::addValue(TimeType now, const ValueType& val) { - addValueAggregated(now, val, 1); +bool BucketedTimeSeries::addValue(TimeType now, const ValueType& val) { + return addValueAggregated(now, val, 1); } template -void BucketedTimeSeries::addValue(TimeType now, +bool BucketedTimeSeries::addValue(TimeType now, const ValueType& val, int64_t times) { - addValueAggregated(now, val * times, times); + return addValueAggregated(now, val * times, times); } template -void BucketedTimeSeries::addValueAggregated(TimeType now, +bool BucketedTimeSeries::addValueAggregated(TimeType now, const ValueType& sum, int64_t nsamples) { - // Make sure time doesn't go backwards - now = std::max(now, latestTime_); - if (isAllTime()) { - if (empty()) { + if (UNLIKELY(empty())) { + firstTime_ = now; + latestTime_ = now; + } else if (now > latestTime_) { + latestTime_ = now; + } else if (now < firstTime_) { firstTime_ = now; } - latestTime_ = now; total_.add(sum, nsamples); - return; + return true; } - // Update the buckets - size_t curBucket = update(now); - buckets_[curBucket].add(sum, nsamples); + size_t bucketIdx; + if (UNLIKELY(empty())) { + // First data point we've ever seen + firstTime_ = now; + latestTime_ = now; + bucketIdx = getBucketIdx(now); + } else if (now > latestTime_) { + // More recent time. Need to update the buckets. + bucketIdx = updateBuckets(now); + } else if (LIKELY(now == latestTime_)) { + // Current time. + bucketIdx = getBucketIdx(now); + } else { + // An earlier time in the past. We need to check if this time still falls + // within our window. + if (now < getEarliestTimeNonEmpty()) { + return false; + } + bucketIdx = getBucketIdx(now); + } - // Update the aggregate sum/count total_.add(sum, nsamples); + buckets_[bucketIdx].add(sum, nsamples); + return true; } template @@ -98,6 +118,11 @@ size_t BucketedTimeSeries::update(TimeType now) { return getBucketIdx(latestTime_); } + return updateBuckets(now); +} + +template +size_t BucketedTimeSeries::updateBuckets(TimeType now) { // We could cache nextBucketStart as a member variable, so we don't have to // recompute it each time update() is called with a new timestamp value. // This makes things faster when update() (or addValue()) is called once @@ -169,6 +194,17 @@ TT BucketedTimeSeries::getEarliestTime() const { return firstTime_; } + // Compute the earliest time we can track + TimeType earliestTime = getEarliestTimeNonEmpty(); + + // We're never tracking data before firstTime_ + earliestTime = std::max(earliestTime, firstTime_); + + return earliestTime; +} + +template +TT BucketedTimeSeries::getEarliestTimeNonEmpty() const { size_t currentBucket; TimeType currentBucketStart; TimeType nextBucketStart; @@ -177,12 +213,7 @@ TT BucketedTimeSeries::getEarliestTime() const { // Subtract 1 duration from the start of the next bucket to find the // earliest possible data point we could be tracking. - TimeType earliestTime = nextBucketStart - duration_; - - // We're never tracking data before firstTime_ - earliestTime = std::max(earliestTime, firstTime_); - - return earliestTime; + return nextBucketStart - duration_; } template diff --git a/folly/stats/BucketedTimeSeries.h b/folly/stats/BucketedTimeSeries.h index c6c4ad85..8336c7d9 100644 --- a/folly/stats/BucketedTimeSeries.h +++ b/folly/stats/BucketedTimeSeries.h @@ -36,10 +36,12 @@ namespace folly { * be discarded and new data will go into the newly opened bucket. Internally, * it uses a circular array of buckets that it reuses as time advances. * - * The class assumes that time advances forward -- you can't retroactively add - * values for events in the past -- the 'now' argument is provided for better - * efficiency and ease of unittesting. - * + * This class assumes that time advances forwards. The window of time tracked + * by the timeseries will advance forwards whenever a more recent timestamp is + * passed to addValue(). While it is possible to pass old time values to + * addValue(), this will never move the time window backwards. If the old time + * value falls outside the tracked window of time, the data point will be + * ignored. * * This class is not thread-safe -- use your own synchronization! */ @@ -65,23 +67,32 @@ class BucketedTimeSeries { /* * Adds the value 'val' at time 'now' * - * This function expects time to always move forwards: it cannot be used to - * add historical data points that have occurred in the past. If now is - * older than the another timestamp that has already been passed to - * addValue() or update(), now will be ignored and the latest timestamp will - * be used. + * This function expects time to generally move forwards. The window of time + * tracked by this time series will move forwards with time. If 'now' is + * more recent than any time previously seen, addValue() will automatically + * call update(now) to advance the time window tracked by this data + * structure. + * + * Values in the recent past may be added to the data structure by passing in + * a slightly older value of 'now', as long as this time point still falls + * within the tracked duration. If 'now' is older than the tracked duration + * of time, the data point value will be ignored, and addValue() will return + * false without doing anything else. + * + * Returns true on success, or false if now was older than the tracked time + * window. */ - void addValue(TimeType now, const ValueType& val); + bool addValue(TimeType now, const ValueType& val); /* * Adds the value 'val' the given number of 'times' at time 'now' */ - void addValue(TimeType now, const ValueType& val, int64_t times); + bool addValue(TimeType now, const ValueType& val, int64_t times); /* * Adds the value 'sum' as the sum of 'nsamples' samples */ - void addValueAggregated(TimeType now, const ValueType& sum, int64_t nsamples); + bool addValueAggregated(TimeType now, const ValueType& sum, int64_t nsamples); /* * Updates the container to the specified time, doing all the necessary @@ -376,6 +387,9 @@ class BucketedTimeSeries { elapsed); } + TimeType getEarliestTimeNonEmpty() const; + size_t updateBuckets(TimeType now); + ValueType rangeAdjust(TimeType bucketStart, TimeType nextBucketStart, TimeType start, TimeType end, ValueType input) const; diff --git a/folly/test/TimeseriesTest.cpp b/folly/test/TimeseriesTest.cpp index 638330d0..0d97f275 100644 --- a/folly/test/TimeseriesTest.cpp +++ b/folly/test/TimeseriesTest.cpp @@ -714,6 +714,52 @@ TEST(BucketedTimeSeries, rateByInterval) { EXPECT_EQ(1.0, b.countRate(seconds(0), kDuration * 10)); } +TEST(BucketedTimeSeries, addHistorical) { + const int kNumBuckets = 5; + const seconds kDuration(10); + BucketedTimeSeries b(kNumBuckets, kDuration); + + // Initially fill with a constant rate of data + for (seconds i = seconds(0); i < seconds(10); ++i) { + b.addValue(i, 10.0); + } + + EXPECT_EQ(10.0, b.rate()); + EXPECT_EQ(10.0, b.avg()); + EXPECT_EQ(10, b.count()); + + // Add some more data points to the middle bucket + b.addValue(seconds(4), 40.0); + b.addValue(seconds(5), 40.0); + EXPECT_EQ(15.0, b.avg()); + EXPECT_EQ(18.0, b.rate()); + EXPECT_EQ(12, b.count()); + + // Now start adding more current data points, until we are about to roll over + // the bucket where we added the extra historical data. + for (seconds i = seconds(10); i < seconds(14); ++i) { + b.addValue(i, 10.0); + } + EXPECT_EQ(15.0, b.avg()); + EXPECT_EQ(18.0, b.rate()); + EXPECT_EQ(12, b.count()); + + // Now roll over the middle bucket + b.addValue(seconds(14), 10.0); + b.addValue(seconds(15), 10.0); + EXPECT_EQ(10.0, b.avg()); + EXPECT_EQ(10.0, b.rate()); + EXPECT_EQ(10, b.count()); + + // Add more historical values past the bucket window. + // These should be ignored. + EXPECT_FALSE(b.addValue(seconds(4), 40.0)); + EXPECT_FALSE(b.addValue(seconds(5), 40.0)); + EXPECT_EQ(10.0, b.avg()); + EXPECT_EQ(10.0, b.rate()); + EXPECT_EQ(10, b.count()); +} + namespace IntMHTS { enum Levels { MINUTE,