/*
 * Copyright 2014 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_
#define FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_

#include <algorithm>

#include <glog/logging.h>

#include <folly/Likely.h>
25 template <typename VT, typename TT>
26 BucketedTimeSeries<VT, TT>::BucketedTimeSeries(size_t numBuckets,
31 // For tracking all-time data we only use total_, and don't need to bother
34 // Round numBuckets down to duration_.count().
36 // There is no point in having more buckets than our timestamp
37 // granularity: otherwise we would have buckets that could never be used.
38 if (numBuckets > duration_.count()) {
39 numBuckets = duration_.count();
42 buckets_.resize(numBuckets, Bucket());
46 template <typename VT, typename TT>
47 bool BucketedTimeSeries<VT, TT>::addValue(TimeType now, const ValueType& val) {
48 return addValueAggregated(now, val, 1);
51 template <typename VT, typename TT>
52 bool BucketedTimeSeries<VT, TT>::addValue(TimeType now,
55 return addValueAggregated(now, val * times, times);
58 template <typename VT, typename TT>
59 bool BucketedTimeSeries<VT, TT>::addValueAggregated(TimeType now,
63 if (UNLIKELY(empty())) {
66 } else if (now > latestTime_) {
68 } else if (now < firstTime_) {
71 total_.add(sum, nsamples);
76 if (UNLIKELY(empty())) {
77 // First data point we've ever seen
80 bucketIdx = getBucketIdx(now);
81 } else if (now > latestTime_) {
82 // More recent time. Need to update the buckets.
83 bucketIdx = updateBuckets(now);
84 } else if (LIKELY(now == latestTime_)) {
86 bucketIdx = getBucketIdx(now);
88 // An earlier time in the past. We need to check if this time still falls
90 if (now < getEarliestTimeNonEmpty()) {
93 bucketIdx = getBucketIdx(now);
96 total_.add(sum, nsamples);
97 buckets_[bucketIdx].add(sum, nsamples);
101 template <typename VT, typename TT>
102 size_t BucketedTimeSeries<VT, TT>::update(TimeType now) {
104 // This is the first data point.
108 // For all-time data, all we need to do is update latestTime_
110 latestTime_ = std::max(latestTime_, now);
114 // Make sure time doesn't go backwards.
115 // If the time is less than or equal to the latest time we have already seen,
116 // we don't need to do anything.
117 if (now <= latestTime_) {
118 return getBucketIdx(latestTime_);
121 return updateBuckets(now);
124 template <typename VT, typename TT>
125 size_t BucketedTimeSeries<VT, TT>::updateBuckets(TimeType now) {
126 // We could cache nextBucketStart as a member variable, so we don't have to
127 // recompute it each time update() is called with a new timestamp value.
128 // This makes things faster when update() (or addValue()) is called once
129 // per second, but slightly slower when update() is called multiple times a
130 // second. We care more about optimizing the cases where addValue() is being
131 // called frequently. If addValue() is only being called once every few
132 // seconds, it doesn't matter as much if it is fast.
134 // Get info about the bucket that latestTime_ points at
135 size_t currentBucket;
136 TimeType currentBucketStart;
137 TimeType nextBucketStart;
138 getBucketInfo(latestTime_, ¤tBucket,
139 ¤tBucketStart, &nextBucketStart);
141 // Update latestTime_
144 if (now < nextBucketStart) {
145 // We're still in the same bucket.
146 // We're done after updating latestTime_.
147 return currentBucket;
148 } else if (now >= currentBucketStart + duration_) {
149 // It's been a while. We have wrapped, and all of the buckets need to be
151 for (Bucket& bucket : buckets_) {
155 return getBucketIdx(latestTime_);
157 // clear all the buckets between the last time and current time, meaning
158 // buckets in the range [(currentBucket+1), newBucket]. Note that
159 // the bucket (currentBucket+1) is always the oldest bucket we have. Since
160 // our array is circular, loop when we reach the end.
161 size_t newBucket = getBucketIdx(now);
162 size_t idx = currentBucket;
163 while (idx != newBucket) {
165 if (idx >= buckets_.size()) {
168 total_ -= buckets_[idx];
169 buckets_[idx].clear();
175 template <typename VT, typename TT>
176 void BucketedTimeSeries<VT, TT>::clear() {
177 for (Bucket& bucket : buckets_) {
181 // Set firstTime_ larger than latestTime_,
182 // to indicate that the timeseries is empty
183 firstTime_ = TimeType(1);
184 latestTime_ = TimeType(0);
188 template <typename VT, typename TT>
189 TT BucketedTimeSeries<VT, TT>::getEarliestTime() const {
197 // Compute the earliest time we can track
198 TimeType earliestTime = getEarliestTimeNonEmpty();
200 // We're never tracking data before firstTime_
201 earliestTime = std::max(earliestTime, firstTime_);
206 template <typename VT, typename TT>
207 TT BucketedTimeSeries<VT, TT>::getEarliestTimeNonEmpty() const {
208 size_t currentBucket;
209 TimeType currentBucketStart;
210 TimeType nextBucketStart;
211 getBucketInfo(latestTime_, ¤tBucket,
212 ¤tBucketStart, &nextBucketStart);
214 // Subtract 1 duration from the start of the next bucket to find the
215 // earliest possible data point we could be tracking.
216 return nextBucketStart - duration_;
219 template <typename VT, typename TT>
220 TT BucketedTimeSeries<VT, TT>::elapsed() const {
225 // Add 1 since [latestTime_, earliestTime] is an inclusive interval.
226 return latestTime_ - getEarliestTime() + TimeType(1);
229 template <typename VT, typename TT>
230 TT BucketedTimeSeries<VT, TT>::elapsed(TimeType start, TimeType end) const {
234 start = std::max(start, getEarliestTime());
235 end = std::min(end, latestTime_ + TimeType(1));
236 end = std::max(start, end);
240 template <typename VT, typename TT>
241 VT BucketedTimeSeries<VT, TT>::sum(TimeType start, TimeType end) const {
242 ValueType sum = ValueType();
243 forEachBucket(start, end, [&](const Bucket& bucket,
244 TimeType bucketStart,
245 TimeType nextBucketStart) -> bool {
246 sum += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
254 template <typename VT, typename TT>
255 uint64_t BucketedTimeSeries<VT, TT>::count(TimeType start, TimeType end) const {
257 forEachBucket(start, end, [&](const Bucket& bucket,
258 TimeType bucketStart,
259 TimeType nextBucketStart) -> bool {
260 count += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
268 template <typename VT, typename TT>
269 template <typename ReturnType>
270 ReturnType BucketedTimeSeries<VT, TT>::avg(TimeType start, TimeType end) const {
271 ValueType sum = ValueType();
273 forEachBucket(start, end, [&](const Bucket& bucket,
274 TimeType bucketStart,
275 TimeType nextBucketStart) -> bool {
276 sum += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
278 count += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
284 return ReturnType(0);
287 return detail::avgHelper<ReturnType>(sum, count);
/*
 * A note about some of the bucket index calculations below:
 *
 * buckets_.size() may not divide evenly into duration_.  When this happens,
 * some buckets will be wider than others.  We still want to spread the data
 * out as evenly as possible among the buckets (as opposed to just making the
 * last bucket be significantly wider than all of the others).
 *
 * To make the division work out, we pretend that the buckets are each
 * duration_ wide, so that the overall duration becomes
 * buckets.size() * duration_.
 *
 * To transform a real timestamp into the scale used by our buckets,
 * we have to multiply by buckets_.size().  To figure out which bucket it goes
 * into, we then divide by duration_.
 */
307 template <typename VT, typename TT>
308 size_t BucketedTimeSeries<VT, TT>::getBucketIdx(TimeType time) const {
309 // For all-time data we don't use buckets_. Everything is tracked in total_.
310 DCHECK(!isAllTime());
313 return time.count() * buckets_.size() / duration_.count();
317 * Compute the bucket index for the specified time, as well as the earliest
318 * time that falls into this bucket.
320 template <typename VT, typename TT>
321 void BucketedTimeSeries<VT, TT>::getBucketInfo(
322 TimeType time, size_t *bucketIdx,
323 TimeType* bucketStart, TimeType* nextBucketStart) const {
324 typedef typename TimeType::rep TimeInt;
325 DCHECK(!isAllTime());
327 // Keep these two lines together. The compiler should be able to compute
328 // both the division and modulus with a single operation.
329 TimeType timeMod = time % duration_;
330 TimeInt numFullDurations = time / duration_;
332 TimeInt scaledTime = timeMod.count() * buckets_.size();
334 // Keep these two lines together. The compiler should be able to compute
335 // both the division and modulus with a single operation.
336 *bucketIdx = scaledTime / duration_.count();
337 TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
339 TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
340 TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
342 TimeType bucketStartMod((scaledBucketStart + buckets_.size() - 1) /
344 TimeType nextBucketStartMod((scaledNextBucketStart + buckets_.size() - 1) /
347 TimeType durationStart(numFullDurations * duration_.count());
348 *bucketStart = bucketStartMod + durationStart;
349 *nextBucketStart = nextBucketStartMod + durationStart;
352 template <typename VT, typename TT>
353 template <typename Function>
354 void BucketedTimeSeries<VT, TT>::forEachBucket(Function fn) const {
356 fn(total_, firstTime_, latestTime_ + TimeType(1));
360 typedef typename TimeType::rep TimeInt;
362 // Compute durationStart, latestBucketIdx, and scaledNextBucketStart,
363 // the same way as in getBucketInfo().
364 TimeType timeMod = latestTime_ % duration_;
365 TimeInt numFullDurations = latestTime_ / duration_;
366 TimeType durationStart(numFullDurations * duration_.count());
367 TimeInt scaledTime = timeMod.count() * buckets_.size();
368 size_t latestBucketIdx = scaledTime / duration_.count();
369 TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
370 TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
371 TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
373 // Walk through the buckets, starting one past the current bucket.
374 // The next bucket is from the previous cycle, so subtract 1 duration
375 // from durationStart.
376 size_t idx = latestBucketIdx;
377 durationStart -= duration_;
379 TimeType nextBucketStart =
380 TimeType((scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
384 if (idx >= buckets_.size()) {
386 durationStart += duration_;
387 scaledNextBucketStart = duration_.count();
389 scaledNextBucketStart += duration_.count();
392 TimeType bucketStart = nextBucketStart;
393 nextBucketStart = TimeType((scaledNextBucketStart + buckets_.size() - 1) /
394 buckets_.size()) + durationStart;
396 // Should we bother skipping buckets where firstTime_ >= nextBucketStart?
397 // For now we go ahead and invoke the function with these buckets.
398 // sum and count should always be 0 in these buckets.
400 DCHECK_LE(bucketStart.count(), latestTime_.count());
401 bool ret = fn(buckets_[idx], bucketStart, nextBucketStart);
406 if (idx == latestBucketIdx) {
414 * Adjust the input value from the specified bucket to only account
415 * for the desired range.
417 * For example, if the bucket spans time [10, 20), but we only care about the
418 * range [10, 16), this will return 60% of the input value.
420 template<typename VT, typename TT>
421 VT BucketedTimeSeries<VT, TT>::rangeAdjust(
422 TimeType bucketStart, TimeType nextBucketStart,
423 TimeType start, TimeType end, ValueType input) const {
424 // If nextBucketStart is greater than latestTime_, treat nextBucketStart as
425 // if it were latestTime_. This makes us more accurate when someone is
426 // querying for all of the data up to latestTime_. Even though latestTime_
427 // may only be partially through the bucket, we don't want to adjust
428 // downwards in this case, because the bucket really only has data up to
430 if (bucketStart <= latestTime_ && nextBucketStart > latestTime_) {
431 nextBucketStart = latestTime_ + TimeType(1);
434 if (start <= bucketStart && end >= nextBucketStart) {
435 // The bucket is wholly contained in the [start, end) interval
439 TimeType intervalStart = std::max(start, bucketStart);
440 TimeType intervalEnd = std::min(end, nextBucketStart);
441 return input * (intervalEnd - intervalStart) /
442 (nextBucketStart - bucketStart);
445 template <typename VT, typename TT>
446 template <typename Function>
447 void BucketedTimeSeries<VT, TT>::forEachBucket(TimeType start, TimeType end,
449 forEachBucket([&start, &end, &fn] (const Bucket& bucket, TimeType bucketStart,
450 TimeType nextBucketStart) -> bool {
451 if (start >= nextBucketStart) {
454 if (end <= bucketStart) {
457 bool ret = fn(bucket, bucketStart, nextBucketStart);
464 #endif // FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_