wheel_ = wheel;
- // Only update the now_ time if we're not in a timeout expired callback
- if (wheel_->count_ == 0 && !wheel_->processingCallbacksGuard_) {
- wheel_->now_ = getCurTime();
- }
-
- expiration_ = wheel_->now_ + timeout;
+ expiration_ = getCurTime() + timeout;
}
void HHWheelTimer::Callback::cancelTimeoutImpl() {
: AsyncTimeout(timeoutMananger, internal),
interval_(intervalMS),
defaultTimeout_(defaultTimeoutMS),
- nextTick_(1),
+ lastTick_(1),
+ expireTick_(1),
count_(0),
+ startTime_(getCurTime()),
processingCallbacksGuard_(nullptr) {}
HHWheelTimer::~HHWheelTimer() {
void HHWheelTimer::scheduleTimeoutImpl(Callback* callback,
std::chrono::milliseconds timeout) {
- int64_t due = timeToWheelTicks(timeout) + nextTick_;
- int64_t diff = due - nextTick_;
+ auto nextTick = calcNextTick();
+ int64_t due = timeToWheelTicks(timeout) + nextTick;
+ int64_t diff = due - nextTick;
CallbackList* list;
if (diff < 0) {
- list = &buckets_[0][nextTick_ & WHEEL_MASK];
+ list = &buckets_[0][nextTick & WHEEL_MASK];
} else if (diff < WHEEL_SIZE) {
list = &buckets_[0][due & WHEEL_MASK];
} else if (diff < 1 << (2 * WHEEL_BITS)) {
/* in largest slot */
if (diff > LARGEST_SLOT) {
diff = LARGEST_SLOT;
- due = diff + nextTick_;
+ due = diff + nextTick;
}
list = &buckets_[3][(due >> 3 * WHEEL_BITS) & WHEEL_MASK];
}
callback->context_ = RequestContext::saveContext();
- if (count_ == 0 && !processingCallbacksGuard_) {
- this->AsyncTimeout::scheduleTimeout(interval_.count());
- }
+ uint64_t prev = count_;
+ count_++;
callback->setScheduled(this, timeout);
scheduleTimeoutImpl(callback, timeout);
- count_++;
+
+ /* If we're calling callbacks, timer will be reset after all
+ * callbacks are called.
+ */
+ if (!processingCallbacksGuard_) {
+ scheduleNextTimeout();
+ }
}
void HHWheelTimer::scheduleTimeout(Callback* callback) {
while (!cbs.empty()) {
auto* cb = &cbs.front();
cbs.pop_front();
- scheduleTimeoutImpl(cb, cb->getTimeRemaining(now_));
+ scheduleTimeoutImpl(cb, cb->getTimeRemaining(getCurTime()));
}
// If tick is zero, timeoutExpired will cascade the next bucket.
}
void HHWheelTimer::timeoutExpired() noexcept {
+ auto nextTick = calcNextTick();
+
// If the last smart pointer for "this" is reset inside the callback's
// timeoutExpired(), then the guard will detect that it is time to bail from
// this method.
// timeoutExpired() can only be invoked directly from the event base loop.
// It should never be invoked recursively.
//
- milliseconds catchup = std::chrono::duration_cast<milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch());
- while (now_ < catchup) {
- now_ += interval_;
+ lastTick_ = expireTick_;
+ while (lastTick_ < nextTick) {
+ int idx = lastTick_ & WHEEL_MASK;
- int idx = nextTick_ & WHEEL_MASK;
- if (0 == idx) {
+ if (idx == 0) {
// Cascade timers
- if (cascadeTimers(1, (nextTick_ >> WHEEL_BITS) & WHEEL_MASK) &&
- cascadeTimers(2, (nextTick_ >> (2 * WHEEL_BITS)) & WHEEL_MASK)) {
- cascadeTimers(3, (nextTick_ >> (3 * WHEEL_BITS)) & WHEEL_MASK);
+ if (cascadeTimers(1, (lastTick_ >> WHEEL_BITS) & WHEEL_MASK) &&
+ cascadeTimers(2, (lastTick_ >> (2 * WHEEL_BITS)) & WHEEL_MASK)) {
+ cascadeTimers(3, (lastTick_ >> (3 * WHEEL_BITS)) & WHEEL_MASK);
}
}
- nextTick_++;
+ lastTick_++;
CallbackList* cbs = &buckets_[0][idx];
while (!cbs->empty()) {
auto* cb = &cbs->front();
return;
}
}
- if (count_ > 0) {
- this->AsyncTimeout::scheduleTimeout(interval_.count());
- }
+ scheduleNextTimeout();
}
size_t HHWheelTimer::cancelAll() {
return count;
}
+// Arm the underlying AsyncTimeout for the next tick at which any pending
+// timeout can fire, rather than waking at a constant interval. Cancels the
+// timer entirely when no callbacks remain.
+void HHWheelTimer::scheduleNextTimeout() {
+ auto nextTick = calcNextTick();
+ long tick = 1;
+ // Scan the rest of the innermost wheel for the first non-empty bucket.
+ // At a wheel boundary (nextTick & WHEEL_MASK == 0) we must wake on the
+ // very next tick anyway so the outer wheels get cascaded.
+ if (nextTick & WHEEL_MASK) {
+ for (tick = nextTick & WHEEL_MASK; tick < WHEEL_SIZE; tick++) {
+ if (!buckets_[0][tick].empty()) {
+ break;
+ }
+ }
+ // Convert the absolute slot index back into a tick delta from now.
+ tick -= (nextTick - 1) & WHEEL_MASK;
+ }
+
+ if (count_ > 0) {
+ // Only (re)arm if nothing is scheduled yet, or if the newly computed
+ // wakeup tick is earlier than the one currently scheduled.
+ if (!this->AsyncTimeout::isScheduled() ||
+ (expireTick_ > tick + nextTick - 1)) {
+ this->AsyncTimeout::scheduleTimeout(interval_ * tick);
+ expireTick_ = tick + nextTick - 1;
+ }
+ } else {
+ // No pending timeouts: stop the wheel until something is scheduled.
+ this->AsyncTimeout::cancelTimeout();
+ }
+}
+
+// Compute the wheel tick corresponding to "now", measured as whole
+// intervals elapsed since startTime_.
+int64_t HHWheelTimer::calcNextTick() {
+ auto intervals =
+ (getCurTime().count() - startTime_.count()) / interval_.count();
+ // Slow eventbases will have skew between the actual time and the
+ // callback time. To avoid racing the next scheduleNextTimeout()
+ // call, always schedule new timeouts against the actual
+ // timeoutExpired() time.
+ if (!processingCallbacksGuard_) {
+ return intervals;
+ } else {
+ // While expiry callbacks are running, pin "now" to the tick whose
+ // expiration is currently being processed (see timeoutExpired()).
+ return lastTick_;
+ }
+}
+
} // folly
/**
* Hashed Hierarchical Wheel Timer
*
- * Comparison:
- * AsyncTimeout - a single timeout.
- * HHWheelTimer - a set of efficient timeouts with different interval,
- * but timeouts are not exact.
- *
- * All of the above are O(1) in insertion, tick update and cancel
-
- * This implementation ticks once every 10ms.
* We model timers as the number of ticks until the next
* due event. We allow 32-bits of space to track this
* due interval, and break that into 4 regions of 8 bits.
* into a different bucket.
*
* This technique results in a very cheap mechanism for
- * maintaining time and timers, provided that we can maintain
- * a consistent rate of ticks.
+ * maintaining time and timers.
+ *
+ * Unlike the original timer wheel paper, this implementation does
+ * *not* tick constantly, and instead calculates the exact next wakeup
+ * time.
*/
class HHWheelTimer : private folly::AsyncTimeout,
public folly::DelayedDestruction {
}
bool cascadeTimers(int bucket, int tick);
- int64_t nextTick_;
+ int64_t lastTick_;
+ int64_t expireTick_;
uint64_t count_;
- std::chrono::milliseconds now_;
+ std::chrono::milliseconds startTime_;
+
+ int64_t calcNextTick();
+
+ void scheduleNextTimeout();
bool* processingCallbacksGuard_;
CallbackList timeouts; // Timeouts queued to run
+
+ // Monotonic "now" in milliseconds since the steady_clock epoch; using
+ // steady_clock keeps wheel arithmetic immune to wall-clock adjustments.
+ std::chrono::milliseconds getCurTime() {
+ return std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch());
+ }
};
} // folly
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+#include <folly/Random.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/HHWheelTimer.h>
#include <folly/io/async/test/UndelayedDestruction.h>
T_CHECK_TIMEOUT(
start, t2.timestamps[0], milliseconds(300), milliseconds(256));
}
+
+// Stress test for the lazily-ticking wheel: schedules 10k randomized
+// timeouts on a 1ms wheel. Roughly two thirds are cancelled via
+// runAfterDelay before their deadline (and must never fire); the rest
+// reschedule themselves once from inside their own expiry callback.
+// Passes when every non-cancelled timeout ran exactly as counted.
+TEST_F(HHWheelTimerTest, Stress) {
+ StackWheelTimer t(&eventBase, milliseconds(1));
+
+ long timeoutcount = 10000;
+ TestTimeout timeouts[10000];
+ long runtimeouts = 0;
+ for (long i = 0; i < timeoutcount; i++) {
+ long newtimeout = Random::rand32(1, 10000);
+ // ~2/3 of the time (rand32(3) != 0): schedule then cancel early.
+ if (Random::rand32(3)) {
+ // NOTE: hhwheel timer runs before eventbase runAfterDelay,
+ // so runAfterDelay cancelTimeout() must run at least one timerwheel
+ // before scheduleTimeout, to ensure it runs first.
+ newtimeout += 256;
+ t.scheduleTimeout(&timeouts[i], std::chrono::milliseconds(newtimeout));
+ eventBase.runAfterDelay(
+ [&, i]() {
+ timeouts[i].fn = nullptr;
+ timeouts[i].cancelTimeout();
+ runtimeouts++;
+ LOG(INFO) << "Ran " << runtimeouts << " timeouts, cancelled";
+ },
+ newtimeout - 256);
+ // If this fires, the cancellation above failed.
+ timeouts[i].fn = [&, i, newtimeout]() {
+ LOG(INFO) << "FAIL:timer " << i << " still fired in " << newtimeout;
+ EXPECT_FALSE(true);
+ };
+ } else {
+ // Remaining ~1/3: let the timeout fire, then reschedule itself once
+ // from within its own callback (exercises in-callback scheduling).
+ t.scheduleTimeout(&timeouts[i], std::chrono::milliseconds(newtimeout));
+ timeouts[i].fn = [&, i]() {
+ timeoutcount++;
+ long newtimeout = Random::rand32(1, 10000);
+ t.scheduleTimeout(&timeouts[i], std::chrono::milliseconds(newtimeout));
+ runtimeouts++;
+ /* sleep override */ usleep(1000);
+ LOG(INFO) << "Ran " << runtimeouts << " timeouts of " << timeoutcount;
+ timeouts[i].fn = [&, i]() {
+ runtimeouts++;
+ LOG(INFO) << "Ran " << runtimeouts << " timeouts of " << timeoutcount;
+ };
+ };
+ }
+ }
+
+ LOG(INFO) << "RUNNING TEST";
+ eventBase.loop();
+
+ EXPECT_EQ(runtimeouts, timeoutcount);
+}