/*
 * Copyright 2016 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17 #ifndef FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_
18 #define FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_
21 #include <glog/logging.h>
22 #include <folly/Likely.h>
26 template <typename VT, typename TT>
27 BucketedTimeSeries<VT, TT>::BucketedTimeSeries(size_t nBuckets,
31 duration_(maxDuration) {
32 // For tracking all-time data we only use total_, and don't need to bother
35 // Round nBuckets down to duration_.count().
37 // There is no point in having more buckets than our timestamp
38 // granularity: otherwise we would have buckets that could never be used.
39 if (nBuckets > size_t(duration_.count())) {
40 nBuckets = duration_.count();
43 buckets_.resize(nBuckets, Bucket());
47 template <typename VT, typename TT>
48 bool BucketedTimeSeries<VT, TT>::addValue(TimeType now, const ValueType& val) {
49 return addValueAggregated(now, val, 1);
52 template <typename VT, typename TT>
53 bool BucketedTimeSeries<VT, TT>::addValue(TimeType now,
56 return addValueAggregated(now, val * times, times);
59 template <typename VT, typename TT>
60 bool BucketedTimeSeries<VT, TT>::addValueAggregated(TimeType now,
61 const ValueType& total,
64 if (UNLIKELY(empty())) {
67 } else if (now > latestTime_) {
69 } else if (now < firstTime_) {
72 total_.add(total, nsamples);
77 if (UNLIKELY(empty())) {
78 // First data point we've ever seen
81 bucketIdx = getBucketIdx(now);
82 } else if (now > latestTime_) {
83 // More recent time. Need to update the buckets.
84 bucketIdx = updateBuckets(now);
85 } else if (LIKELY(now == latestTime_)) {
87 bucketIdx = getBucketIdx(now);
89 // An earlier time in the past. We need to check if this time still falls
91 if (now < getEarliestTimeNonEmpty()) {
94 bucketIdx = getBucketIdx(now);
97 total_.add(total, nsamples);
98 buckets_[bucketIdx].add(total, nsamples);
102 template <typename VT, typename TT>
103 size_t BucketedTimeSeries<VT, TT>::update(TimeType now) {
105 // This is the first data point.
109 // For all-time data, all we need to do is update latestTime_
111 latestTime_ = std::max(latestTime_, now);
115 // Make sure time doesn't go backwards.
116 // If the time is less than or equal to the latest time we have already seen,
117 // we don't need to do anything.
118 if (now <= latestTime_) {
119 return getBucketIdx(latestTime_);
122 return updateBuckets(now);
125 template <typename VT, typename TT>
126 size_t BucketedTimeSeries<VT, TT>::updateBuckets(TimeType now) {
127 // We could cache nextBucketStart as a member variable, so we don't have to
128 // recompute it each time update() is called with a new timestamp value.
129 // This makes things faster when update() (or addValue()) is called once
130 // per second, but slightly slower when update() is called multiple times a
131 // second. We care more about optimizing the cases where addValue() is being
132 // called frequently. If addValue() is only being called once every few
133 // seconds, it doesn't matter as much if it is fast.
135 // Get info about the bucket that latestTime_ points at
136 size_t currentBucket;
137 TimeType currentBucketStart;
138 TimeType nextBucketStart;
139 getBucketInfo(latestTime_, ¤tBucket,
140 ¤tBucketStart, &nextBucketStart);
142 // Update latestTime_
145 if (now < nextBucketStart) {
146 // We're still in the same bucket.
147 // We're done after updating latestTime_.
148 return currentBucket;
149 } else if (now >= currentBucketStart + duration_) {
150 // It's been a while. We have wrapped, and all of the buckets need to be
152 for (Bucket& bucket : buckets_) {
156 return getBucketIdx(latestTime_);
158 // clear all the buckets between the last time and current time, meaning
159 // buckets in the range [(currentBucket+1), newBucket]. Note that
160 // the bucket (currentBucket+1) is always the oldest bucket we have. Since
161 // our array is circular, loop when we reach the end.
162 size_t newBucket = getBucketIdx(now);
163 size_t idx = currentBucket;
164 while (idx != newBucket) {
166 if (idx >= buckets_.size()) {
169 total_ -= buckets_[idx];
170 buckets_[idx].clear();
176 template <typename VT, typename TT>
177 void BucketedTimeSeries<VT, TT>::clear() {
178 for (Bucket& bucket : buckets_) {
182 // Set firstTime_ larger than latestTime_,
183 // to indicate that the timeseries is empty
184 firstTime_ = TimeType(1);
185 latestTime_ = TimeType(0);
189 template <typename VT, typename TT>
190 TT BucketedTimeSeries<VT, TT>::getEarliestTime() const {
198 // Compute the earliest time we can track
199 TimeType earliestTime = getEarliestTimeNonEmpty();
201 // We're never tracking data before firstTime_
202 earliestTime = std::max(earliestTime, firstTime_);
207 template <typename VT, typename TT>
208 TT BucketedTimeSeries<VT, TT>::getEarliestTimeNonEmpty() const {
209 size_t currentBucket;
210 TimeType currentBucketStart;
211 TimeType nextBucketStart;
212 getBucketInfo(latestTime_, ¤tBucket,
213 ¤tBucketStart, &nextBucketStart);
215 // Subtract 1 duration from the start of the next bucket to find the
216 // earliest possible data point we could be tracking.
217 return nextBucketStart - duration_;
220 template <typename VT, typename TT>
221 TT BucketedTimeSeries<VT, TT>::elapsed() const {
226 // Add 1 since [latestTime_, earliestTime] is an inclusive interval.
227 return latestTime_ - getEarliestTime() + TimeType(1);
230 template <typename VT, typename TT>
231 TT BucketedTimeSeries<VT, TT>::elapsed(TimeType start, TimeType end) const {
235 start = std::max(start, getEarliestTime());
236 end = std::min(end, latestTime_ + TimeType(1));
237 end = std::max(start, end);
241 template <typename VT, typename TT>
242 VT BucketedTimeSeries<VT, TT>::sum(TimeType start, TimeType end) const {
243 ValueType total = ValueType();
244 forEachBucket(start, end, [&](const Bucket& bucket,
245 TimeType bucketStart,
246 TimeType nextBucketStart) -> bool {
247 total += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
255 template <typename VT, typename TT>
256 uint64_t BucketedTimeSeries<VT, TT>::count(TimeType start, TimeType end) const {
257 uint64_t sample_count = 0;
258 forEachBucket(start, end, [&](const Bucket& bucket,
259 TimeType bucketStart,
260 TimeType nextBucketStart) -> bool {
261 sample_count += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
269 template <typename VT, typename TT>
270 template <typename ReturnType>
271 ReturnType BucketedTimeSeries<VT, TT>::avg(TimeType start, TimeType end) const {
272 ValueType total = ValueType();
273 uint64_t sample_count = 0;
274 forEachBucket(start, end, [&](const Bucket& bucket,
275 TimeType bucketStart,
276 TimeType nextBucketStart) -> bool {
277 total += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
279 sample_count += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
284 if (sample_count == 0) {
285 return ReturnType(0);
288 return detail::avgHelper<ReturnType>(total, sample_count);
/*
 * A note about some of the bucket index calculations below:
 *
 * buckets_.size() may not divide evenly into duration_.  When this happens,
 * some buckets will be wider than others.  We still want to spread the data
 * out as evenly as possible among the buckets (as opposed to just making the
 * last bucket be significantly wider than all of the others).
 *
 * To make the division work out, we pretend that the buckets are each
 * duration_ wide, so that the overall duration becomes
 * buckets.size() * duration_.
 *
 * To transform a real timestamp into the scale used by our buckets,
 * we have to multiply by buckets_.size().  To figure out which bucket it goes
 * into, we then divide by duration_.
 */
308 template <typename VT, typename TT>
309 size_t BucketedTimeSeries<VT, TT>::getBucketIdx(TimeType time) const {
310 // For all-time data we don't use buckets_. Everything is tracked in total_.
311 DCHECK(!isAllTime());
314 return time.count() * buckets_.size() / duration_.count();
318 * Compute the bucket index for the specified time, as well as the earliest
319 * time that falls into this bucket.
321 template <typename VT, typename TT>
322 void BucketedTimeSeries<VT, TT>::getBucketInfo(
323 TimeType time, size_t *bucketIdx,
324 TimeType* bucketStart, TimeType* nextBucketStart) const {
325 typedef typename TimeType::rep TimeInt;
326 DCHECK(!isAllTime());
328 // Keep these two lines together. The compiler should be able to compute
329 // both the division and modulus with a single operation.
330 TimeType timeMod = time % duration_;
331 TimeInt numFullDurations = time / duration_;
333 TimeInt scaledTime = timeMod.count() * buckets_.size();
335 // Keep these two lines together. The compiler should be able to compute
336 // both the division and modulus with a single operation.
337 *bucketIdx = scaledTime / duration_.count();
338 TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
340 TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
341 TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
343 TimeType bucketStartMod((scaledBucketStart + buckets_.size() - 1) /
345 TimeType nextBucketStartMod((scaledNextBucketStart + buckets_.size() - 1) /
348 TimeType durationStart(numFullDurations * duration_.count());
349 *bucketStart = bucketStartMod + durationStart;
350 *nextBucketStart = nextBucketStartMod + durationStart;
353 template <typename VT, typename TT>
354 template <typename Function>
355 void BucketedTimeSeries<VT, TT>::forEachBucket(Function fn) const {
357 fn(total_, firstTime_, latestTime_ + TimeType(1));
361 typedef typename TimeType::rep TimeInt;
363 // Compute durationStart, latestBucketIdx, and scaledNextBucketStart,
364 // the same way as in getBucketInfo().
365 TimeType timeMod = latestTime_ % duration_;
366 TimeInt numFullDurations = latestTime_ / duration_;
367 TimeType durationStart(numFullDurations * duration_.count());
368 TimeInt scaledTime = timeMod.count() * buckets_.size();
369 size_t latestBucketIdx = scaledTime / duration_.count();
370 TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
371 TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
372 TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
374 // Walk through the buckets, starting one past the current bucket.
375 // The next bucket is from the previous cycle, so subtract 1 duration
376 // from durationStart.
377 size_t idx = latestBucketIdx;
378 durationStart -= duration_;
380 TimeType nextBucketStart =
381 TimeType((scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
385 if (idx >= buckets_.size()) {
387 durationStart += duration_;
388 scaledNextBucketStart = duration_.count();
390 scaledNextBucketStart += duration_.count();
393 TimeType bucketStart = nextBucketStart;
394 nextBucketStart = TimeType((scaledNextBucketStart + buckets_.size() - 1) /
395 buckets_.size()) + durationStart;
397 // Should we bother skipping buckets where firstTime_ >= nextBucketStart?
398 // For now we go ahead and invoke the function with these buckets.
399 // sum and count should always be 0 in these buckets.
401 DCHECK_LE(bucketStart.count(), latestTime_.count());
402 bool ret = fn(buckets_[idx], bucketStart, nextBucketStart);
407 if (idx == latestBucketIdx) {
415 * Adjust the input value from the specified bucket to only account
416 * for the desired range.
418 * For example, if the bucket spans time [10, 20), but we only care about the
419 * range [10, 16), this will return 60% of the input value.
421 template<typename VT, typename TT>
422 VT BucketedTimeSeries<VT, TT>::rangeAdjust(
423 TimeType bucketStart, TimeType nextBucketStart,
424 TimeType start, TimeType end, ValueType input) const {
425 // If nextBucketStart is greater than latestTime_, treat nextBucketStart as
426 // if it were latestTime_. This makes us more accurate when someone is
427 // querying for all of the data up to latestTime_. Even though latestTime_
428 // may only be partially through the bucket, we don't want to adjust
429 // downwards in this case, because the bucket really only has data up to
431 if (bucketStart <= latestTime_ && nextBucketStart > latestTime_) {
432 nextBucketStart = latestTime_ + TimeType(1);
435 if (start <= bucketStart && end >= nextBucketStart) {
436 // The bucket is wholly contained in the [start, end) interval
440 TimeType intervalStart = std::max(start, bucketStart);
441 TimeType intervalEnd = std::min(end, nextBucketStart);
442 return input * (intervalEnd - intervalStart) /
443 (nextBucketStart - bucketStart);
446 template <typename VT, typename TT>
447 template <typename Function>
448 void BucketedTimeSeries<VT, TT>::forEachBucket(TimeType start, TimeType end,
450 forEachBucket([&start, &end, &fn] (const Bucket& bucket, TimeType bucketStart,
451 TimeType nextBucketStart) -> bool {
452 if (start >= nextBucketStart) {
455 if (end <= bucketStart) {
458 bool ret = fn(bucket, bucketStart, nextBucketStart);
465 #endif // FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_