2 * Copyright 2012 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #ifndef FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_
18 #define FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_
20 #include <glog/logging.h>
// Constructor: clamps the requested bucket count and allocates buckets_.
//
// The visible logic caps numBuckets at duration_.count() and then resizes
// buckets_ to that many default-constructed Bucket objects.
// NOTE(review): numBuckets is size_t while duration_.count() yields
// TimeType::rep; if rep is a signed type this is a mixed-sign comparison --
// confirm duration_ can never be negative here.
24 template <typename VT, typename TT>
25 BucketedTimeSeries<VT, TT>::BucketedTimeSeries(size_t numBuckets,
30 // For tracking all-time data we only use total_, and don't need to bother
33 // Round numBuckets down to duration_.count().
35 // There is no point in having more buckets than our timestamp
36 // granularity: otherwise we would have buckets that could never be used.
37 if (numBuckets > duration_.count()) {
38 numBuckets = duration_.count();
// Every bucket starts out as a default-constructed Bucket.
41 buckets_.resize(numBuckets, Bucket());
// Records a single sample of value `val` at time `now`.
// Delegates to addValueAggregated() with `val` as the sum and a sample count
// of 1.
45 template <typename VT, typename TT>
46 void BucketedTimeSeries<VT, TT>::addValue(TimeType now, const ValueType& val) {
47 addValueAggregated(now, val, 1);
// Records `times` samples at time `now`, each with value `val`.
// The batch is forwarded as a single aggregate: its sum is val * times and its
// sample count is times.
50 template <typename VT, typename TT>
51 void BucketedTimeSeries<VT, TT>::addValue(TimeType now,
54 addValueAggregated(now, val * times, times);
// Adds a pre-aggregated batch of samples: `sum` is added together with a
// sample count of `nsamples`, both to the bucket selected for `now` and to the
// running aggregate total_.
57 template <typename VT, typename TT>
58 void BucketedTimeSeries<VT, TT>::addValueAggregated(TimeType now,
61 // Make sure time doesn't go backwards
// A timestamp older than latestTime_ is clamped up to latestTime_, so late
// data is credited to the most recent time seen.
62 now = std::max(now, latestTime_);
// NOTE(review): total_.add() appears twice in this function. The lines between
// the two calls are not visible in this view; presumably this first call sits
// on a separate early-return path -- confirm that no sample is counted twice.
69 total_.add(sum, nsamples);
// update() advances the series to `now` and returns the index of the bucket
// that should receive the data.
74 size_t curBucket = update(now);
75 buckets_[curBucket].add(sum, nsamples);
77 // Update the aggregate sum/count
78 total_.add(sum, nsamples);
// Advances the series to time `now` and returns the index of the bucket that
// data for that time belongs in.
//
// Behavior visible in this function:
//  - `now` at or before latestTime_ changes nothing; the index of the bucket
//    holding latestTime_ is returned.
//  - `now` still inside the bucket that holds latestTime_ returns that bucket.
//  - `now` at least one full duration_ past the start of that bucket means
//    every bucket is stale, so all of buckets_ is walked.
//  - otherwise only the buckets after currentBucket, up to and including the
//    bucket for `now`, are expired: each one is subtracted from total_ and
//    then cleared.
81 template <typename VT, typename TT>
82 size_t BucketedTimeSeries<VT, TT>::update(TimeType now) {
84 // This is the first data point.
88 // For all-time data, all we need to do is update latestTime_
90 latestTime_ = std::max(latestTime_, now);
94 // Make sure time doesn't go backwards.
95 // If the time is less than or equal to the latest time we have already seen,
96 // we don't need to do anything.
97 if (now <= latestTime_) {
98 return getBucketIdx(latestTime_);
101 // We could cache nextBucketStart as a member variable, so we don't have to
102 // recompute it each time update() is called with a new timestamp value.
103 // This makes things faster when update() (or addValue()) is called once
104 // per second, but slightly slower when update() is called multiple times a
105 // second. We care more about optimizing the cases where addValue() is being
106 // called frequently. If addValue() is only being called once every few
107 // seconds, it doesn't matter as much if it is fast.
109 // Get info about the bucket that latestTime_ points at
110 size_t currentBucket;
111 TimeType currentBucketStart;
112 TimeType nextBucketStart;
// The three locals above are filled in through the pointers passed here.
113 getBucketInfo(latestTime_, &currentBucket,
114 &currentBucketStart, &nextBucketStart);
116 // Update latestTime_
119 if (now < nextBucketStart) {
120 // We're still in the same bucket.
121 // We're done after updating latestTime_.
122 return currentBucket;
123 } else if (now >= currentBucketStart + duration_) {
124 // It's been a while. We have wrapped, and all of the buckets need to be
126 for (Bucket& bucket : buckets_) {
130 return getBucketIdx(latestTime_);
132 // clear all the buckets between the last time and current time, meaning
133 // buckets in the range [(currentBucket+1), newBucket]. Note that
134 // the bucket (currentBucket+1) is always the oldest bucket we have. Since
135 // our array is circular, loop when we reach the end.
136 size_t newBucket = getBucketIdx(now);
137 size_t idx = currentBucket;
138 while (idx != newBucket) {
// Running off the end of buckets_ is detected here because the array is
// treated as circular.
140 if (idx >= buckets_.size()) {
// Remove the expired bucket's contribution from the aggregate before
// wiping the bucket itself.
143 total_ -= buckets_[idx];
144 buckets_[idx].clear();
// Resets the series to its empty state.
// Walks every bucket in buckets_ (the loop body is not visible in this view)
// and then restores the "empty" sentinel, where firstTime_ is greater than
// latestTime_.
150 template <typename VT, typename TT>
151 void BucketedTimeSeries<VT, TT>::clear() {
152 for (Bucket& bucket : buckets_) {
156 // Set firstTime_ larger than latestTime_,
157 // to indicate that the timeseries is empty
158 firstTime_ = TimeType(1);
159 latestTime_ = TimeType(0);
// Returns the length of time currently covered by the tracked data.
//
// Both visible return statements add TimeType(1), so the time unit at
// latestTime_ itself is counted as part of the covered range.
163 template <typename VT, typename TT>
164 TT BucketedTimeSeries<VT, TT>::elapsed() const {
// Span from the first data point to the latest one, inclusive.
// NOTE(review): the condition guarding this return is not visible in this
// view; presumably it is the all-time case -- confirm.
170 return latestTime_ - firstTime_ + TimeType(1);
173 size_t currentBucket;
174 TimeType currentBucketStart;
175 TimeType nextBucketStart;
// Look up the bucket holding latestTime_; the three locals above are filled in
// through the pointers passed here.
176 getBucketInfo(latestTime_, &currentBucket,
177 &currentBucketStart, &nextBucketStart);
179 // Subtract 1 duration from the start of the next bucket to find the
180 // earliest possible data point we could be tracking.
181 TimeType earliestTime = nextBucketStart - duration_;
183 // We're never tracking data before firstTime_
184 earliestTime = std::max(earliestTime, firstTime_);
186 return latestTime_ - earliestTime + TimeType(1);
// Estimates the sum of the values recorded in the half-open range
// [start, end).
// Each bucket visited by forEachBucket(start, end, ...) contributes the amount
// returned by rangeAdjust(), which scales a bucket's value when the bucket
// only partly overlaps the requested range.
189 template <typename VT, typename TT>
190 VT BucketedTimeSeries<VT, TT>::sum(TimeType start, TimeType end) const {
// Accumulator, starting from a value-initialized ValueType.
191 ValueType sum = ValueType();
192 forEachBucket(start, end, [&](const Bucket& bucket,
193 TimeType bucketStart,
194 TimeType nextBucketStart) -> bool {
// NOTE(review): the final argument to rangeAdjust() is on a line that is not
// visible in this view; presumably it is this bucket's sum -- confirm.
195 sum += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
// Estimates the number of samples recorded in the half-open range
// [start, end).
// Each bucket visited by forEachBucket(start, end, ...) contributes the amount
// returned by rangeAdjust(), which scales a bucket's value when the bucket
// only partly overlaps the requested range.
203 template <typename VT, typename TT>
204 uint64_t BucketedTimeSeries<VT, TT>::count(TimeType start, TimeType end) const {
206 forEachBucket(start, end, [&](const Bucket& bucket,
207 TimeType bucketStart,
208 TimeType nextBucketStart) -> bool {
// NOTE(review): the final argument to rangeAdjust() is on a line that is not
// visible in this view; presumably it is this bucket's sample count --
// confirm.
209 count += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
// Estimates the average value over the half-open range [start, end),
// returned as ReturnType.
// A range-adjusted sum and a range-adjusted count are accumulated in the same
// pass over the buckets, and the result is produced by detail::avgHelper().
217 template <typename VT, typename TT>
218 template <typename ReturnType>
219 ReturnType BucketedTimeSeries<VT, TT>::avg(TimeType start, TimeType end) const {
220 ValueType sum = ValueType();
222 forEachBucket(start, end, [&](const Bucket& bucket,
223 TimeType bucketStart,
224 TimeType nextBucketStart) -> bool {
225 sum += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
227 count += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
// NOTE(review): the condition guarding this return is not visible in this
// view; presumably it covers the no-samples case so that the division is
// skipped -- confirm.
233 return ReturnType(0);
236 return detail::avgHelper<ReturnType>(sum, count);
240 * A note about some of the bucket index calculations below:
242 * buckets_.size() may not divide evenly into duration_. When this happens,
243 * some buckets will be wider than others. We still want to spread the data
244 * out as evenly as possible among the buckets (as opposed to just making the
245 * last bucket be significantly wider than all of the others).
247 * To make the division work out, we pretend that the buckets are each
248 * duration_ wide, so that the overall duration becomes
249 * buckets_.size() * duration_.
251 * To transform a real timestamp into the scale used by our buckets,
252 * we have to multiply by buckets_.size(). To figure out which bucket it goes
253 * into, we then divide by duration_.
// Returns the index of the bucket that `time` falls into.
// The timestamp is multiplied by the number of buckets and then divided by
// duration_.count(). Must not be called for all-time series (enforced by the
// DCHECK below).
256 template <typename VT, typename TT>
257 size_t BucketedTimeSeries<VT, TT>::getBucketIdx(TimeType time) const {
258 // For all-time data we don't use buckets_. Everything is tracked in total_.
259 DCHECK(!isAllTime());
// NOTE(review): the result is only below buckets_.size() when `time` is
// already within [0, duration_). A step that reduces `time` into that range is
// not visible in this view -- confirm it exists before this line.
262 return time.count() * buckets_.size() / duration_.count();
266 * Compute the bucket index for the specified time, as well as the earliest
267 * time that falls into this bucket.
// Outputs, for the bucket containing `time`:
//   *bucketIdx       - the index of that bucket
//   *bucketStart     - the time at which that bucket begins
//   *nextBucketStart - the time at which the following bucket begins
// All three results are written through the supplied pointers. Must not be
// called for all-time series (enforced by the DCHECK below).
269 template <typename VT, typename TT>
270 void BucketedTimeSeries<VT, TT>::getBucketInfo(
271 TimeType time, size_t *bucketIdx,
272 TimeType* bucketStart, TimeType* nextBucketStart) const {
273 typedef typename TimeType::rep TimeInt;
274 DCHECK(!isAllTime());
// Split `time` into the number of whole durations that have passed and the
// offset into the current duration.
276 // Keep these two lines together. The compiler should be able to compute
277 // both the division and modulus with a single operation.
278 TimeType timeMod = time % duration_;
279 TimeInt numFullDurations = time / duration_;
// Work in a scaled domain: the offset is multiplied by the bucket count, so
// that every bucket is exactly duration_.count() scaled units wide.
281 TimeInt scaledTime = timeMod.count() * buckets_.size();
283 // Keep these two lines together. The compiler should be able to compute
284 // both the division and modulus with a single operation.
285 *bucketIdx = scaledTime / duration_.count();
286 TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
// Scaled boundaries of this bucket: the start is scaledTime rounded down to a
// multiple of duration_.count(), and the next bucket begins one scaled bucket
// width later.
288 TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
289 TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
// Convert the scaled boundaries back into offsets within the duration.
// NOTE(review): the divisor lines are not visible in this view; presumably the
// divisor is buckets_.size(), which together with the "+ buckets_.size() - 1"
// term would make this a round-up division -- confirm.
291 TimeType bucketStartMod((scaledBucketStart + buckets_.size() - 1) /
293 TimeType nextBucketStartMod((scaledNextBucketStart + buckets_.size() - 1) /
// Add back the whole durations that were split off above to get absolute
// times.
296 TimeType durationStart(numFullDurations * duration_.count());
297 *bucketStart = bucketStartMod + durationStart;
298 *nextBucketStart = nextBucketStartMod + durationStart;
// Invokes fn(bucket, bucketStart, nextBucketStart) for the buckets of the
// series. The walk begins one past the bucket that holds latestTime_ and
// proceeds around the circular bucket array; the visible termination check
// compares the current index with the latest bucket's index.
// fn's result is stored in a bool; how that result is used is not visible in
// this view.
301 template <typename VT, typename TT>
302 template <typename Function>
303 void BucketedTimeSeries<VT, TT>::forEachBucket(Function fn) const {
// A single call with total_, covering [firstTime_, latestTime_ + 1).
// NOTE(review): the condition guarding this call is not visible in this view;
// presumably it is the all-time case, where buckets_ is unused -- confirm.
305 fn(total_, firstTime_, latestTime_ + TimeType(1));
309 typedef typename TimeType::rep TimeInt;
311 // Compute durationStart, latestBucketIdx, and scaledNextBucketStart,
312 // the same way as in getBucketInfo().
313 TimeType timeMod = latestTime_ % duration_;
314 TimeInt numFullDurations = latestTime_ / duration_;
315 TimeType durationStart(numFullDurations * duration_.count());
316 TimeInt scaledTime = timeMod.count() * buckets_.size();
317 size_t latestBucketIdx = scaledTime / duration_.count();
318 TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
319 TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
320 TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
322 // Walk through the buckets, starting one past the current bucket.
323 // The next bucket is from the previous cycle, so subtract 1 duration
324 // from durationStart.
325 size_t idx = latestBucketIdx;
326 durationStart -= duration_;
// Start time of the first bucket to visit: the scaled boundary is divided by
// the bucket count, rounding up, to convert it back to a time offset.
328 TimeType nextBucketStart =
329 TimeType((scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
// Past the end of the array: move forward to the next duration cycle and reset
// the scaled boundary to the end of the first bucket.
333 if (idx >= buckets_.size()) {
335 durationStart += duration_;
336 scaledNextBucketStart = duration_.count();
// Otherwise advance the scaled boundary by one bucket width.
338 scaledNextBucketStart += duration_.count();
// The previous bucket's end is this bucket's start.
341 TimeType bucketStart = nextBucketStart;
342 nextBucketStart = TimeType((scaledNextBucketStart + buckets_.size() - 1) /
343 buckets_.size()) + durationStart;
345 // Should we bother skipping buckets where firstTime_ >= nextBucketStart?
346 // For now we go ahead and invoke the function with these buckets.
347 // sum and count should always be 0 in these buckets.
349 DCHECK_LE(bucketStart.count(), latestTime_.count());
350 bool ret = fn(buckets_[idx], bucketStart, nextBucketStart);
// The walk has come back around to the bucket that holds latestTime_.
355 if (idx == latestBucketIdx) {
363 * Adjust the input value from the specified bucket to only account
364 * for the desired range.
366 * For example, if the bucket spans time [10, 20), but we only care about the
367 * range [10, 16), this will return 60% of the input value.
// Scales `input` (a value taken from the bucket spanning
// [bucketStart, nextBucketStart)) down to the share that falls inside the
// query range [start, end).
// The share is proportional to how much of the bucket's time span overlaps the
// query range.
369 template<typename VT, typename TT>
370 VT BucketedTimeSeries<VT, TT>::rangeAdjust(
371 TimeType bucketStart, TimeType nextBucketStart,
372 TimeType start, TimeType end, ValueType input) const {
373 // If nextBucketStart is greater than latestTime_, treat nextBucketStart as
374 // if it were latestTime_. This makes us more accurate when someone is
375 // querying for all of the data up to latestTime_. Even though latestTime_
376 // may only be partially through the bucket, we don't want to adjust
377 // downwards in this case, because the bucket really only has data up to
// nextBucketStart is an exclusive bound, so the bucket containing latestTime_
// is truncated to end at latestTime_ + 1.
379 if (bucketStart <= latestTime_ && nextBucketStart > latestTime_) {
380 nextBucketStart = latestTime_ + TimeType(1);
383 if (start <= bucketStart && end >= nextBucketStart) {
384 // The bucket is wholly contained in the [start, end) interval
// Partial overlap: clip the query range to the bucket, then scale `input` by
// the overlap length divided by the bucket length.
388 TimeType intervalStart = std::max(start, bucketStart);
389 TimeType intervalEnd = std::min(end, nextBucketStart);
390 return input * (intervalEnd - intervalStart) /
391 (nextBucketStart - bucketStart);
// Range-restricted bucket walk for the half-open range [start, end).
// Wraps `fn` in a filtering lambda and hands that lambda to the single-argument
// forEachBucket(). The lambda compares each bucket's bounds against the range
// before forwarding the bucket to `fn`.
394 template <typename VT, typename TT>
395 template <typename Function>
396 void BucketedTimeSeries<VT, TT>::forEachBucket(TimeType start, TimeType end,
398 forEachBucket([&start, &end, &fn] (const Bucket& bucket, TimeType bucketStart,
399 TimeType nextBucketStart) -> bool {
// The bucket ends at or before the start of the range.
// NOTE(review): the action taken on this branch is not visible in this view.
400 if (start >= nextBucketStart) {
// The bucket begins at or after the end of the range.
// NOTE(review): the action taken on this branch is not visible in this view.
403 if (end <= bucketStart) {
// The bucket overlaps the range, so pass it to the caller's function.
406 bool ret = fn(bucket, bucketStart, nextBucketStart);
413 #endif // FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_