/*
 * Copyright 2016 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <algorithm>

#include <glog/logging.h>

#include <folly/Likely.h>
#include <folly/stats/BucketedTimeSeries.h>
26 template <typename VT, typename CT>
27 BucketedTimeSeries<VT, CT>::BucketedTimeSeries(
30 : firstTime_(Duration(1)), latestTime_(), duration_(maxDuration) {
31 // For tracking all-time data we only use total_, and don't need to bother
34 // Round nBuckets down to duration_.count().
36 // There is no point in having more buckets than our timestamp
37 // granularity: otherwise we would have buckets that could never be used.
38 if (nBuckets > size_t(duration_.count())) {
39 nBuckets = duration_.count();
42 buckets_.resize(nBuckets, Bucket());
46 template <typename VT, typename CT>
47 bool BucketedTimeSeries<VT, CT>::addValue(TimePoint now, const ValueType& val) {
48 return addValueAggregated(now, val, 1);
51 template <typename VT, typename CT>
52 bool BucketedTimeSeries<VT, CT>::addValue(
56 return addValueAggregated(now, val * times, times);
59 template <typename VT, typename CT>
60 bool BucketedTimeSeries<VT, CT>::addValueAggregated(
62 const ValueType& total,
65 if (UNLIKELY(empty())) {
68 } else if (now > latestTime_) {
70 } else if (now < firstTime_) {
73 total_.add(total, nsamples);
78 if (UNLIKELY(empty())) {
79 // First data point we've ever seen
82 bucketIdx = getBucketIdx(now);
83 } else if (now > latestTime_) {
84 // More recent time. Need to update the buckets.
85 bucketIdx = updateBuckets(now);
86 } else if (LIKELY(now == latestTime_)) {
88 bucketIdx = getBucketIdx(now);
90 // An earlier time in the past. We need to check if this time still falls
92 if (now < getEarliestTimeNonEmpty()) {
95 bucketIdx = getBucketIdx(now);
98 total_.add(total, nsamples);
99 buckets_[bucketIdx].add(total, nsamples);
103 template <typename VT, typename CT>
104 size_t BucketedTimeSeries<VT, CT>::update(TimePoint now) {
106 // This is the first data point.
110 // For all-time data, all we need to do is update latestTime_
112 latestTime_ = std::max(latestTime_, now);
116 // Make sure time doesn't go backwards.
117 // If the time is less than or equal to the latest time we have already seen,
118 // we don't need to do anything.
119 if (now <= latestTime_) {
120 return getBucketIdx(latestTime_);
123 return updateBuckets(now);
126 template <typename VT, typename CT>
127 size_t BucketedTimeSeries<VT, CT>::updateBuckets(TimePoint now) {
128 // We could cache nextBucketStart as a member variable, so we don't have to
129 // recompute it each time update() is called with a new timestamp value.
130 // This makes things faster when update() (or addValue()) is called once
131 // per second, but slightly slower when update() is called multiple times a
132 // second. We care more about optimizing the cases where addValue() is being
133 // called frequently. If addValue() is only being called once every few
134 // seconds, it doesn't matter as much if it is fast.
136 // Get info about the bucket that latestTime_ points at
137 size_t currentBucket;
138 TimePoint currentBucketStart;
139 TimePoint nextBucketStart;
140 getBucketInfo(latestTime_, ¤tBucket,
141 ¤tBucketStart, &nextBucketStart);
143 // Update latestTime_
146 if (now < nextBucketStart) {
147 // We're still in the same bucket.
148 // We're done after updating latestTime_.
149 return currentBucket;
150 } else if (now >= currentBucketStart + duration_) {
151 // It's been a while. We have wrapped, and all of the buckets need to be
153 for (Bucket& bucket : buckets_) {
157 return getBucketIdx(latestTime_);
159 // clear all the buckets between the last time and current time, meaning
160 // buckets in the range [(currentBucket+1), newBucket]. Note that
161 // the bucket (currentBucket+1) is always the oldest bucket we have. Since
162 // our array is circular, loop when we reach the end.
163 size_t newBucket = getBucketIdx(now);
164 size_t idx = currentBucket;
165 while (idx != newBucket) {
167 if (idx >= buckets_.size()) {
170 total_ -= buckets_[idx];
171 buckets_[idx].clear();
177 template <typename VT, typename CT>
178 void BucketedTimeSeries<VT, CT>::clear() {
179 for (Bucket& bucket : buckets_) {
183 // Set firstTime_ larger than latestTime_,
184 // to indicate that the timeseries is empty
185 firstTime_ = TimePoint(Duration(1));
186 latestTime_ = TimePoint();
189 template <typename VT, typename CT>
190 typename CT::time_point BucketedTimeSeries<VT, CT>::getEarliestTime() const {
198 // Compute the earliest time we can track
199 TimePoint earliestTime = getEarliestTimeNonEmpty();
201 // We're never tracking data before firstTime_
202 earliestTime = std::max(earliestTime, firstTime_);
207 template <typename VT, typename CT>
208 typename CT::time_point BucketedTimeSeries<VT, CT>::getEarliestTimeNonEmpty()
210 size_t currentBucket;
211 TimePoint currentBucketStart;
212 TimePoint nextBucketStart;
213 getBucketInfo(latestTime_, ¤tBucket,
214 ¤tBucketStart, &nextBucketStart);
216 // Subtract 1 duration from the start of the next bucket to find the
217 // earliest possible data point we could be tracking.
218 return nextBucketStart - duration_;
221 template <typename VT, typename CT>
222 typename CT::duration BucketedTimeSeries<VT, CT>::elapsed() const {
227 // Add 1 since [latestTime_, earliestTime] is an inclusive interval.
228 return latestTime_ - getEarliestTime() + Duration(1);
231 template <typename VT, typename CT>
232 typename CT::duration BucketedTimeSeries<VT, CT>::elapsed(
234 TimePoint end) const {
238 start = std::max(start, getEarliestTime());
239 end = std::min(end, latestTime_ + Duration(1));
240 end = std::max(start, end);
244 template <typename VT, typename CT>
245 VT BucketedTimeSeries<VT, CT>::sum(TimePoint start, TimePoint end) const {
246 ValueType total = ValueType();
250 [&](const Bucket& bucket,
251 TimePoint bucketStart,
252 TimePoint nextBucketStart) -> bool {
253 total += this->rangeAdjust(
254 bucketStart, nextBucketStart, start, end, bucket.sum);
261 template <typename VT, typename CT>
262 uint64_t BucketedTimeSeries<VT, CT>::count(TimePoint start, TimePoint end)
264 uint64_t sample_count = 0;
268 [&](const Bucket& bucket,
269 TimePoint bucketStart,
270 TimePoint nextBucketStart) -> bool {
271 sample_count += this->rangeAdjust(
272 bucketStart, nextBucketStart, start, end, bucket.count);
279 template <typename VT, typename CT>
280 template <typename ReturnType>
281 ReturnType BucketedTimeSeries<VT, CT>::avg(TimePoint start, TimePoint end)
283 ValueType total = ValueType();
284 uint64_t sample_count = 0;
288 [&](const Bucket& bucket,
289 TimePoint bucketStart,
290 TimePoint nextBucketStart) -> bool {
291 total += this->rangeAdjust(
292 bucketStart, nextBucketStart, start, end, bucket.sum);
293 sample_count += this->rangeAdjust(
294 bucketStart, nextBucketStart, start, end, bucket.count);
298 if (sample_count == 0) {
299 return ReturnType(0);
302 return detail::avgHelper<ReturnType>(total, sample_count);
/*
 * A note about some of the bucket index calculations below:
 *
 * buckets_.size() may not divide evenly into duration_.  When this happens,
 * some buckets will be wider than others.  We still want to spread the data
 * out as evenly as possible among the buckets (as opposed to just making the
 * last bucket be significantly wider than all of the others).
 *
 * To make the division work out, we pretend that the buckets are each
 * duration_ wide, so that the overall duration becomes
 * buckets.size() * duration_.
 *
 * To transform a real timestamp into the scale used by our buckets,
 * we have to multiply by buckets_.size().  To figure out which bucket it goes
 * into, we then divide by duration_.
 */
322 template <typename VT, typename CT>
323 size_t BucketedTimeSeries<VT, CT>::getBucketIdx(TimePoint time) const {
324 // For all-time data we don't use buckets_. Everything is tracked in total_.
325 DCHECK(!isAllTime());
327 auto timeIntoCurrentCycle = (time.time_since_epoch() % duration_);
328 return timeIntoCurrentCycle.count() * buckets_.size() / duration_.count();
332 * Compute the bucket index for the specified time, as well as the earliest
333 * time that falls into this bucket.
335 template <typename VT, typename CT>
336 void BucketedTimeSeries<VT, CT>::getBucketInfo(
339 TimePoint* bucketStart,
340 TimePoint* nextBucketStart) const {
341 typedef typename Duration::rep TimeInt;
342 DCHECK(!isAllTime());
344 // Keep these two lines together. The compiler should be able to compute
345 // both the division and modulus with a single operation.
346 Duration timeMod = time.time_since_epoch() % duration_;
347 TimeInt numFullDurations = time.time_since_epoch() / duration_;
349 TimeInt scaledTime = timeMod.count() * buckets_.size();
351 // Keep these two lines together. The compiler should be able to compute
352 // both the division and modulus with a single operation.
353 *bucketIdx = scaledTime / duration_.count();
354 TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
356 TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
357 TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
359 Duration bucketStartMod(
360 (scaledBucketStart + buckets_.size() - 1) / buckets_.size());
361 Duration nextBucketStartMod(
362 (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size());
364 TimePoint durationStart(numFullDurations * duration_);
365 *bucketStart = bucketStartMod + durationStart;
366 *nextBucketStart = nextBucketStartMod + durationStart;
369 template <typename VT, typename CT>
370 template <typename Function>
371 void BucketedTimeSeries<VT, CT>::forEachBucket(Function fn) const {
373 fn(total_, firstTime_, latestTime_ + Duration(1));
377 typedef typename Duration::rep TimeInt;
379 // Compute durationStart, latestBucketIdx, and scaledNextBucketStart,
380 // the same way as in getBucketInfo().
381 Duration timeMod = latestTime_.time_since_epoch() % duration_;
382 TimeInt numFullDurations = latestTime_.time_since_epoch() / duration_;
383 TimePoint durationStart(numFullDurations * duration_);
384 TimeInt scaledTime = timeMod.count() * buckets_.size();
385 size_t latestBucketIdx = scaledTime / duration_.count();
386 TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
387 TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
388 TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
390 // Walk through the buckets, starting one past the current bucket.
391 // The next bucket is from the previous cycle, so subtract 1 duration
392 // from durationStart.
393 size_t idx = latestBucketIdx;
394 durationStart -= duration_;
396 TimePoint nextBucketStart =
398 (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
402 if (idx >= buckets_.size()) {
404 durationStart += duration_;
405 scaledNextBucketStart = duration_.count();
407 scaledNextBucketStart += duration_.count();
410 TimePoint bucketStart = nextBucketStart;
413 (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
416 // Should we bother skipping buckets where firstTime_ >= nextBucketStart?
417 // For now we go ahead and invoke the function with these buckets.
418 // sum and count should always be 0 in these buckets.
421 bucketStart.time_since_epoch().count(),
422 latestTime_.time_since_epoch().count());
423 bool ret = fn(buckets_[idx], bucketStart, nextBucketStart);
428 if (idx == latestBucketIdx) {
436 * Adjust the input value from the specified bucket to only account
437 * for the desired range.
439 * For example, if the bucket spans time [10, 20), but we only care about the
440 * range [10, 16), this will return 60% of the input value.
442 template <typename VT, typename CT>
443 VT BucketedTimeSeries<VT, CT>::rangeAdjust(
444 TimePoint bucketStart,
445 TimePoint nextBucketStart,
448 ValueType input) const {
449 // If nextBucketStart is greater than latestTime_, treat nextBucketStart as
450 // if it were latestTime_. This makes us more accurate when someone is
451 // querying for all of the data up to latestTime_. Even though latestTime_
452 // may only be partially through the bucket, we don't want to adjust
453 // downwards in this case, because the bucket really only has data up to
455 if (bucketStart <= latestTime_ && nextBucketStart > latestTime_) {
456 nextBucketStart = latestTime_ + Duration(1);
459 if (start <= bucketStart && end >= nextBucketStart) {
460 // The bucket is wholly contained in the [start, end) interval
464 TimePoint intervalStart = std::max(start, bucketStart);
465 TimePoint intervalEnd = std::min(end, nextBucketStart);
466 return input * (intervalEnd - intervalStart) /
467 (nextBucketStart - bucketStart);
470 template <typename VT, typename CT>
471 template <typename Function>
472 void BucketedTimeSeries<VT, CT>::forEachBucket(
478 const Bucket& bucket,
479 TimePoint bucketStart,
480 TimePoint nextBucketStart) -> bool {
481 if (start >= nextBucketStart) {
484 if (end <= bucketStart) {
487 bool ret = fn(bucket, bucketStart, nextBucketStart);