2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #ifndef __STDC_FORMAT_MACROS
18 #define __STDC_FORMAT_MACROS
21 #include <folly/io/async/EventBase.h>
23 #include <folly/ThreadName.h>
24 #include <folly/io/async/NotificationQueue.h>
26 #include <boost/static_assert.hpp>
34 using folly::EventBase;
// Loop callback that wraps a callable. Both visible constructors initialise
// the function_ member: one moves the argument in, the other copies it.
// NOTE(review): the template parameter is named Callback, yet the visible
// constructors take Cob -- confirm that this is intended.
36 template <typename Callback>
37 class FunctionLoopCallback : public EventBase::LoopCallback {
// Moves the supplied callable into function_.
39 explicit FunctionLoopCallback(Cob&& function)
40 : function_(std::move(function)) {}
// Stores a copy of the supplied callable in function_.
42 explicit FunctionLoopCallback(const Cob& function)
43 : function_(function) {}
// Declared noexcept, so an exception escaping this function would terminate
// the program. The body is not part of this excerpt.
45 virtual void runLoopCallback() noexcept {
61 * EventBase::FunctionRunner
// Consumer for the EventBase notification queue. Each message is a
// (function pointer, argument) pair; messageAvailable() invokes msg.first
// with msg.second as its argument.
64 class EventBase::FunctionRunner
65 : public NotificationQueue<std::pair<void (*)(void*), void*>>::Consumer {
// Handles a single queued message.
67 void messageAvailable(std::pair<void (*)(void*), void*>&& msg) {
69 // In libevent2, internal events do not break the loop.
70 // Most users would expect loop(), followed by runInEventBaseThread(),
71 // to break the loop and check if it should exit or not.
72 // To have similar behaviour to libevent1.4, tell the loop to break here.
73 // Note that loop() may still continue to loop, but it will also check the
74 // stop_ flag as well as runInLoop callbacks, etc.
75 event_base_loopbreak(getEventBase()->evb_);
// A pair of two null pointers is the wake-up sentinel, not a real callback.
77 if (msg.first == nullptr && msg.second == nullptr) {
78 // terminateLoopSoon() sends a null message just to
79 // wake up the loop. We can ignore these messages.
83 // If function is nullptr, just log and move on
85 LOG(ERROR) << "nullptr callback registered to be run in "
86 << "event base thread";
90 // The function should never throw an exception, because we have no
91 // way of knowing what sort of error handling to perform.
93 // If it does throw, log a message and abort the program.
95 msg.first(msg.second);
96 } catch (const std::exception& ex) {
// typeid(ex).name() names the dynamic type of the caught exception.
97 LOG(ERROR) << "runInEventBaseThread() function threw a "
98 << typeid(ex).name() << " exception: " << ex.what();
101 LOG(ERROR) << "runInEventBaseThread() function threw an exception";
108 * EventBase::CobTimeout methods
// Invoked when the timeout fires. The visible handlers log any exception
// raised by the callback instead of propagating it; the function is noexcept.
111 void EventBase::CobTimeout::timeoutExpired() noexcept {
112 // For now, we just swallow any exceptions that the callback threw.
115 } catch (const std::exception& ex) {
116 LOG(ERROR) << "EventBase::runAfterDelay() callback threw "
117 << typeid(ex).name() << " exception: " << ex.what();
119 LOG(ERROR) << "EventBase::runAfterDelay() callback threw non-exception "
123 // The CobTimeout object was allocated on the heap by runAfterDelay(),
124 // so delete it now that it has fired.
// Default constructor. Creates the underlying libevent base with
// event_init(), throws a system error if that returned nullptr, and then sets
// up the notification queue.
132 EventBase::EventBase()
133 : runOnceCallbacks_(nullptr)
136 , evb_(static_cast<event_base*>(event_init()))
140 , avgLoopTime_(2000000)
141 , maxLatencyLoopTime_(avgLoopTime_)
142 , nextLoopCnt_(-40) // Early wrap-around so bugs will manifest soon
143 , latestLoopCnt_(nextLoopCnt_)
146 , observerSampleCount_(0) {
// event_init() failed: log and raise via folly::throwSystemError().
147 if (UNLIKELY(evb_ == nullptr)) {
148 LOG(ERROR) << "EventBase(): Failed to init event base.";
149 folly::throwSystemError("error in EventBase::EventBase()");
151 VLOG(5) << "EventBase(): Created.";
152 initNotificationQueue();
// The return value is discarded; presumably called for its initialisation
// side effect -- TODO confirm.
153 RequestContext::getStaticContext();
// Constructs an EventBase around an existing libevent event_base.
// Throws std::invalid_argument when evb_ is nullptr after initialisation.
156 // takes ownership of the event_base
157 EventBase::EventBase(event_base* evb)
158 : runOnceCallbacks_(nullptr)
165 , avgLoopTime_(2000000)
166 , maxLatencyLoopTime_(avgLoopTime_)
167 , nextLoopCnt_(-40) // Early wrap-around so bugs will manifest soon
168 , latestLoopCnt_(nextLoopCnt_)
171 , observerSampleCount_(0) {
// Reject a null event base up front rather than failing later in the loop.
172 if (UNLIKELY(evb_ == nullptr)) {
173 LOG(ERROR) << "EventBase(): Pass nullptr as event base.";
174 throw std::invalid_argument("EventBase(): event base cannot be nullptr");
176 initNotificationQueue();
// The return value is discarded; presumably called for its initialisation
// side effect -- TODO confirm.
177 RequestContext::getStaticContext();
// Destructor. In order: runs the registered destruction callbacks, drains the
// pending CobTimeout list, runs remaining loop callbacks, stops the
// notification-queue consumer, and frees the libevent base.
180 EventBase::~EventBase() {
181 // Call all destruction callbacks, before we start cleaning up our state.
182 while (!onDestructionCallbacks_.empty()) {
// Unlink the callback from the list before it is run.
183 LoopCallback* callback = &onDestructionCallbacks_.front();
184 onDestructionCallbacks_.pop_front();
185 callback->runLoopCallback();
188 // Delete any unfired CobTimeout objects, so that we don't leak memory
189 // (Note that we don't fire them. The caller is responsible for cleaning up
190 // its own data structures if it destroys the EventBase with unfired events
192 while (!pendingCobTimeouts_.empty()) {
193 CobTimeout* timeout = &pendingCobTimeouts_.front();
// Passes setContext = false; the boolean result is deliberately ignored.
197 (void) runLoopCallbacks(false);
199 // Stop consumer before deleting NotificationQueue
200 fnRunner_->stopConsuming();
201 event_base_free(evb_);
202 VLOG(5) << "EventBase(): Destroyed.";
// Returns queue_->size(), converted to int by the return type.
205 int EventBase::getNotificationQueueSize() const {
206 return queue_->size();
// Forwards maxAtOnce unchanged to fnRunner_->setMaxReadAtOnce().
209 void EventBase::setMaxReadAtOnce(uint32_t maxAtOnce) {
210 fnRunner_->setMaxReadAtOnce(maxAtOnce);
213 // Set smoothing coefficient for loop load average; input is # of milliseconds
214 // for exp(-1) decay.
// The millisecond value is converted to microseconds and handed to both
// smoothed loop-time trackers; the visible error log reports a non-positive
// argument.
215 void EventBase::setLoadAvgMsec(uint32_t ms) {
// Widen before multiplying: 1000 * ms would otherwise be evaluated in 32-bit
// unsigned arithmetic and wrap for ms > 4294967 before reaching the uint64_t.
216 uint64_t us = 1000 * static_cast<uint64_t>(ms);
218 maxLatencyLoopTime_.setTimeInterval(us);
219 avgLoopTime_.setTimeInterval(us);
221 LOG(ERROR) << "non-positive arg to setLoadAvgMsec()";
// Resets both smoothed loop-time trackers to the same starting value.
225 void EventBase::resetLoadAvg(double value) {
226 avgLoopTime_.reset(value);
227 maxLatencyLoopTime_.reset(value);
// Returns the time elapsed since *prev, truncated to whole milliseconds, and
// advances *prev to the current time.
//
// The clock is sampled exactly once so that the returned delta and the new
// value of *prev refer to the same instant. Sampling it twice would leave the
// time between the two reads out of every subsequent delta.
//
// prev must point to a valid time_point; it is read and then overwritten.
static std::chrono::milliseconds
getTimeDelta(std::chrono::steady_clock::time_point* prev) {
  const auto now = std::chrono::steady_clock::now();
  const auto elapsed = now - *prev;
  *prev = now;

  return std::chrono::duration_cast<std::chrono::milliseconds>(elapsed);
}
// Loops for as long as isRunning() returns false. What happens on each pass
// of the loop is not part of this excerpt.
238 void EventBase::waitUntilRunning() {
239 while (!isRunning()) {
244 // enters the event_base loop -- will only exit when forced to
245 bool EventBase::loop() {
// Runs loopBody() with EVLOOP_ONCE OR-ed into the caller's flags and returns
// its result.
249 bool EventBase::loopOnce(int flags) {
250 return loopBody(flags | EVLOOP_ONCE);
// Core of the event loop. Visible steps: record the looping thread, run one
// libevent iteration, run queued loop callbacks, update the smoothed busy and
// idle time statistics, and finally clear the recorded thread.
// flags: EVLOOP_NONBLOCK and EVLOOP_ONCE are both inspected below.
253 bool EventBase::loopBody(int flags) {
254 VLOG(5) << "EventBase(): Starting loop.";
256 bool ranLoopCallbacks;
257 bool blocking = !(flags & EVLOOP_NONBLOCK);
258 bool once = (flags & EVLOOP_ONCE);
// Record the calling thread as the loop thread.
260 loopThread_.store(pthread_self(), std::memory_order_release);
262 if (!name_.empty()) {
263 setThreadName(name_);
// prev feeds getTimeDelta() for the loop-time log line further down;
// idleStart is a steady_clock reading in microseconds.
266 auto prev = std::chrono::steady_clock::now();
267 int64_t idleStart = std::chrono::duration_cast<std::chrono::microseconds>(
268 std::chrono::steady_clock::now().time_since_epoch()).count();
270 // TODO: Read stop_ atomically with an acquire barrier.
274 // nobody can add loop callbacks from within this thread if
275 // we don't have to handle anything to start with...
// Only allow libevent to block when no loop callbacks are already queued.
276 if (blocking && loopCallbacks_.empty()) {
277 res = event_base_loop(evb_, EVLOOP_ONCE);
279 res = event_base_loop(evb_, EVLOOP_ONCE | EVLOOP_NONBLOCK);
281 ranLoopCallbacks = runLoopCallbacks();
// busy: microseconds from startWork_ to now; idle: microseconds from
// idleStart to startWork_.
283 int64_t busy = std::chrono::duration_cast<std::chrono::microseconds>(
284 std::chrono::steady_clock::now().time_since_epoch()).count() - startWork_;
285 int64_t idle = startWork_ - idleStart;
287 avgLoopTime_.addSample(idle, busy);
288 maxLatencyLoopTime_.addSample(idle, busy);
// Report to the observer once the sample counter reaches its sample rate.
291 if (observerSampleCount_++ == observer_->getSampleRate()) {
292 observerSampleCount_ = 0;
293 observer_->loopSample(busy, idle);
297 VLOG(11) << "EventBase " << this << " did not timeout "
298 " loop time guess: " << busy + idle <<
299 " idle time: " << idle <<
300 " busy time: " << busy <<
301 " avgLoopTime: " << avgLoopTime_.get() <<
302 " maxLatencyLoopTime: " << maxLatencyLoopTime_.get() <<
303 " maxLatency_: " << maxLatency_ <<
304 " nothingHandledYet(): "<< nothingHandledYet();
306 // see if our average loop time has exceeded our limit
307 if ((maxLatency_ > 0) &&
308 (maxLatencyLoopTime_.get() > double(maxLatency_))) {
310 // back off temporarily -- don't keep spamming maxLatencyCob_
311 // if we're only a bit over the limit
312 maxLatencyLoopTime_.dampen(0.9);
315 // Our loop run did real work; reset the idle timer
316 idleStart = std::chrono::duration_cast<std::chrono::microseconds>(
317 std::chrono::steady_clock::now().time_since_epoch()).count();
319 // If the event loop indicates that there were no more events, and
320 // we also didn't have any loop callbacks to run, there is nothing left to
322 if (res != 0 && !ranLoopCallbacks) {
323 // Since Notification Queue is marked 'internal' some events may not have
324 // run. Run them manually if so, and continue looping.
326 if (getNotificationQueueSize() > 0) {
327 fnRunner_->handlerReady(0);
333 VLOG(5) << "EventBase " << this << " loop time: " <<
334 getTimeDelta(&prev).count();
340 // Reset stop_ so loop() can be called again
// Result reporting: res == 1 is logged as "ran out of events", res > 1 as an
// unknown result.
344 LOG(ERROR) << "EventBase: -- error in event loop, res = " << res;
346 } else if (res == 1) {
347 VLOG(5) << "EventBase: ran out of events (exiting loop)!";
348 } else if (res > 1) {
349 LOG(ERROR) << "EventBase: unknown event loop result = " << res;
// Clear the recorded loop thread now that the loop is finished.
353 loopThread_.store(0, std::memory_order_release);
355 VLOG(5) << "EventBase(): Done with loop.";
// Re-registers the notification queue consumer as a normal event for the
// duration of the loop and restores the internal registration afterwards.
// The visible error path raises via folly::throwSystemError().
359 void EventBase::loopForever() {
360 // Update the notification queue event to treat it as a normal (non-internal)
361 // event. The notification queue event always remains installed, and the main
362 // loop won't exit with it installed.
363 fnRunner_->stopConsuming();
364 fnRunner_->startConsuming(this, queue_.get());
368 // Restore the notification queue internal flag
369 fnRunner_->stopConsuming();
370 fnRunner_->startConsumingInternal(this, queue_.get());
373 folly::throwSystemError("error in EventBase::loopForever()");
// When nothing has been handled yet in the current iteration, marks the
// iteration as handled (latestLoopCnt_ = nextLoopCnt_) and records the
// current steady_clock time, in microseconds, in startWork_.
377 bool EventBase::bumpHandlingTime() {
378 VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
379 " (loop) latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
380 if(nothingHandledYet()) {
// After this assignment nothingHandledYet() returns false.
381 latestLoopCnt_ = nextLoopCnt_;
383 startWork_ = std::chrono::duration_cast<std::chrono::microseconds>(
384 std::chrono::steady_clock::now().time_since_epoch()).count();
386 VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
387 " (loop) startWork_ " << startWork_;
// Asks the event loop to stop: breaks out of libevent and posts an empty
// message to the notification queue so that a blocked loop wakes up.
393 void EventBase::terminateLoopSoon() {
394 VLOG(5) << "EventBase(): Received terminateLoopSoon() command.";
396 // Set stop to true, so the event loop will know to exit.
397 // TODO: We should really use an atomic operation here with a release
401 // Call event_base_loopbreak() so that libevent will exit the next time
403 event_base_loopbreak(evb_);
405 // If terminateLoopSoon() is called from another thread,
406 // the EventBase thread might be stuck waiting for events.
407 // In this case, it won't wake up and notice that stop_ is set until it
408 // receives another event. Send an empty frame to the notification queue
409 // so that the event loop will wake up even if there are no other events.
411 // We don't care whether the wake-up message below is delivered. If it fails
412 // this likely means the EventBase already has lots of events waiting
// The (nullptr, nullptr) pair is the wake-up sentinel that
// FunctionRunner::messageAvailable() recognises and ignores.
415 queue_->putMessage(std::make_pair(nullptr, nullptr));
417 // We don't care if putMessage() fails. This likely means
418 // the EventBase already has lots of events waiting anyway.
// Queues an existing LoopCallback. Any earlier scheduling of the callback is
// cancelled first and the current RequestContext is saved on it.
// thisIteration: when true and a batch is currently being run
// (runOnceCallbacks_ is set), the callback joins that batch; the other
// visible destination is loopCallbacks_.
422 void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
423 DCHECK(isInEventBaseThread());
424 callback->cancelLoopCallback();
425 callback->context_ = RequestContext::saveContext();
426 if (runOnceCallbacks_ != nullptr && thisIteration) {
427 runOnceCallbacks_->push_back(*callback);
429 loopCallbacks_.push_back(*callback);
// Queues a copy of cob, wrapped in a heap-allocated FunctionLoopCallback.
// The wrapper is not freed in the visible code; presumably it deletes itself
// after running -- TODO confirm.
// thisIteration: when true and a batch is currently being run, the wrapper
// joins that batch; the other visible destination is loopCallbacks_.
433 void EventBase::runInLoop(const Cob& cob, bool thisIteration) {
434 DCHECK(isInEventBaseThread());
435 auto wrapper = new FunctionLoopCallback<Cob>(cob);
436 wrapper->context_ = RequestContext::saveContext();
437 if (runOnceCallbacks_ != nullptr && thisIteration) {
438 runOnceCallbacks_->push_back(*wrapper);
440 loopCallbacks_.push_back(*wrapper);
// Rvalue overload: moves cob into a heap-allocated FunctionLoopCallback
// instead of copying it. The wrapper is not freed in the visible code;
// presumably it deletes itself after running -- TODO confirm.
// thisIteration: when true and a batch is currently being run, the wrapper
// joins that batch; the other visible destination is loopCallbacks_.
444 void EventBase::runInLoop(Cob&& cob, bool thisIteration) {
445 DCHECK(isInEventBaseThread());
446 auto wrapper = new FunctionLoopCallback<Cob>(std::move(cob));
447 wrapper->context_ = RequestContext::saveContext();
448 if (runOnceCallbacks_ != nullptr && thisIteration) {
449 runOnceCallbacks_->push_back(*wrapper);
451 loopCallbacks_.push_back(*wrapper);
// Registers callback to be run by the destructor. Any pending scheduling of
// the callback is cancelled before it is appended to
// onDestructionCallbacks_.
455 void EventBase::runOnDestruction(LoopCallback* callback) {
456 DCHECK(isInEventBaseThread());
457 callback->cancelLoopCallback();
458 onDestructionCallbacks_.push_back(*callback);
// Schedules fn(arg) to run in the EventBase thread. When already on the
// running EventBase thread the call is queued directly with runInLoop();
// otherwise the pair is posted to the notification queue. A failure to post
// is logged.
461 bool EventBase::runInEventBaseThread(void (*fn)(void*), void* arg) {
463 // It will be received by the FunctionRunner in the EventBase's thread.
465 // We try not to schedule nullptr callbacks
467 LOG(ERROR) << "EventBase " << this
468 << ": Scheduling nullptr callbacks is not allowed";
472 // Short-circuit if we are already in our event base
473 if (inRunningEventBaseThread()) {
474 runInLoop(new RunInLoopCallback(fn, arg));
480 queue_->putMessage(std::make_pair(fn, arg));
481 } catch (const std::exception& ex) {
// NOTE(review): streaming a function pointer selects the bool overload of
// operator<<, so fn is logged as 1 rather than as an address -- confirm
// whether an address was intended.
482 LOG(ERROR) << "EventBase " << this << ": failed to schedule function "
483 << fn << " for EventBase thread: " << ex.what();
// Schedules a copy of fn to run in the EventBase thread. When not already on
// the running EventBase thread, fn is copied to the heap and handed to the
// function-pointer overload together with runFunctionPtr.
490 bool EventBase::runInEventBaseThread(const Cob& fn) {
491 // Short-circuit if we are already in our event base
492 if (inRunningEventBaseThread()) {
498 // Allocate a copy of the function so we can pass it to the other thread
499 // The other thread will delete this copy once the function has been run
501 fnCopy = new Cob(fn);
502 } catch (const std::bad_alloc& ex) {
503 LOG(ERROR) << "failed to allocate std::function copy "
504 << "for runInEventBaseThread()";
// Scheduling can fail; the handling of that case is not part of this excerpt.
508 if (!runInEventBaseThread(&EventBase::runFunctionPtr, fnCopy)) {
// Schedules cob to run after a delay by allocating a CobTimeout on the heap
// and scheduling it. A timeout that reaches the end of the visible code is
// tracked in pendingCobTimeouts_.
516 bool EventBase::runAfterDelay(const Cob& cob,
518 TimeoutManager::InternalEnum in) {
519 CobTimeout* timeout = new CobTimeout(this, cob, in);
// scheduleTimeout() returning false is the failure case.
520 if (!timeout->scheduleTimeout(milliseconds)) {
525 pendingCobTimeouts_.push_back(*timeout);
// Runs the loop callbacks that were queued when this call started. The
// callbacks are first moved to a local list, and runOnceCallbacks_ points at
// that list while they execute so that runInLoop(..., thisIteration = true)
// can append to the batch currently being drained.
// setContext: presumably controls whether each callback's saved
// RequestContext is installed before it runs -- the guarding condition is not
// part of this excerpt.
529 bool EventBase::runLoopCallbacks(bool setContext) {
530 if (!loopCallbacks_.empty()) {
532 // Swap the loopCallbacks_ list with a temporary list on our stack.
533 // This way we will only run callbacks scheduled at the time
534 // runLoopCallbacks() was invoked.
536 // If any of these callbacks in turn call runInLoop() to schedule more
537 // callbacks, those new callbacks won't be run until the next iteration
538 // around the event loop. This prevents runInLoop() callbacks from being
539 // able to start file descriptor and timeout based events.
540 LoopCallbackList currentCallbacks;
541 currentCallbacks.swap(loopCallbacks_);
// Publish the address of the local batch; it is cleared again below before
// the local list goes out of scope.
542 runOnceCallbacks_ = &currentCallbacks;
544 while (!currentCallbacks.empty()) {
// Unlink the callback from the batch before running it.
545 LoopCallback* callback = &currentCallbacks.front();
546 currentCallbacks.pop_front();
548 RequestContext::setContext(callback->context_);
550 callback->runLoopCallback();
553 runOnceCallbacks_ = nullptr;
// Creates the notification queue and its FunctionRunner consumer, then
// registers the consumer as an internal event.
559 void EventBase::initNotificationQueue() {
560 // Infinite size queue
561 queue_.reset(new NotificationQueue<std::pair<void (*)(void*), void*>>());
563 // We allocate fnRunner_ separately, rather than declaring it directly
564 // as a member of EventBase solely so that we don't need to include
565 // NotificationQueue.h from EventBase.h
566 fnRunner_.reset(new FunctionRunner());
568 // Mark this as an internal event, so event_base_loop() will return if
569 // there are no other events besides this one installed.
571 // Most callers don't care about the internal notification queue used by
572 // EventBase. The queue is always installed, so if we did count the queue as
573 // an active event, loop() would never exit with no more events to process.
574 // Users can use loopForever() if they do care about the notification queue.
575 // (This is useful for EventBase threads that do nothing but process
576 // runInEventBaseThread() notifications.)
577 fnRunner_->startConsumingInternal(this, queue_.get());
// Sets the decay coefficient to -1 / timeInterval.
// NOTE(review): a timeInterval of 0 makes this a floating-point division by
// zero; no guard is visible here -- confirm that callers never pass 0.
580 void EventBase::SmoothLoopTime::setTimeInterval(uint64_t timeInterval) {
581 expCoeff_ = -1.0/timeInterval;
582 VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
585 void EventBase::SmoothLoopTime::reset(double value) {
// Folds one (idle, busy) measurement into the exponentially weighted value_.
// Part of each busy interval is carried over in oldBusyLeftover_ and counted
// towards the idle span of the next sample.
589 void EventBase::SmoothLoopTime::addSample(int64_t idle, int64_t busy) {
591 * Position at which the busy sample is considered to be taken.
592 * (Allows to quickly skew our average without editing much code)
594 enum BusySamplePosition {
595 RIGHT = 0, // busy sample placed at the end of the iteration
596 CENTER = 1, // busy sample placed at the middle point of the iteration
597 LEFT = 2, // busy sample placed at the beginning of the iteration
600 VLOG(11) << "idle " << idle << " oldBusyLeftover_ " << oldBusyLeftover_ <<
601 " idle + oldBusyLeftover_ " << idle + oldBusyLeftover_ <<
602 " busy " << busy << " " << __PRETTY_FUNCTION__;
// Add the leftover from the previous sample plus this sample's busy time.
603 idle += oldBusyLeftover_ + busy;
// With CENTER == 1 the new leftover is busy / 2 (integer division).
604 oldBusyLeftover_ = (busy * BusySamplePosition::CENTER) / 2;
605 idle -= oldBusyLeftover_;
// Decay factor for the adjusted idle span, using the coefficient stored by
// setTimeInterval().
607 double coeff = exp(idle * expCoeff_);
609 value_ += (1.0 - coeff) * busy;
// Returns true while nextLoopCnt_ differs from latestLoopCnt_.
// bumpHandlingTime() makes the two equal, after which this returns false.
612 bool EventBase::nothingHandledYet() {
613 VLOG(11) << "latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
614 return (nextLoopCnt_ != latestLoopCnt_);
// Trampoline handed to runInEventBaseThread() for heap-allocated Cob copies.
// The visible handlers log anything the function throws.
618 void EventBase::runFunctionPtr(Cob* fn) {
619 // The function should never throw an exception, because we have no
620 // way of knowing what sort of error handling to perform.
622 // If it does throw, log a message and abort the program.
625 } catch (const std::exception &ex) {
626 LOG(ERROR) << "runInEventBaseThread() std::function threw a "
627 << typeid(ex).name() << " exception: " << ex.what();
630 LOG(ERROR) << "runInEventBaseThread() std::function threw an exception";
634 // The function object was allocated by runInEventBaseThread().
635 // Delete it once it has been run.
639 EventBase::RunInLoopCallback::RunInLoopCallback(void (*fn)(void*), void* arg)
643 void EventBase::RunInLoopCallback::runLoopCallback() noexcept {
// Binds the timeout's libevent event to this EventBase's event_base. The
// event must not already be attached to a base (asserted below).
// internal: when INTERNAL, the event is additionally flagged
// EVLIST_INTERNAL.
648 void EventBase::attachTimeoutManager(AsyncTimeout* obj,
649 InternalEnum internal) {
651 struct event* ev = obj->getEvent();
652 assert(ev->ev_base == nullptr);
654 event_base_set(getLibeventBase(), ev);
655 if (internal == AsyncTimeout::InternalEnum::INTERNAL) {
656 // Set the EVLIST_INTERNAL flag
657 ev->ev_flags |= EVLIST_INTERNAL;
// Detaches the timeout's libevent event from its event_base by clearing
// ev_base, the same field attachTimeoutManager() asserts to be null.
661 void EventBase::detachTimeoutManager(AsyncTimeout* obj) {
663 struct event* ev = obj->getEvent();
664 ev->ev_base = nullptr;
// Arms the timeout's libevent event to fire after the given number of
// milliseconds. Must be called from the EventBase thread (asserted). A
// negative result from event_add() is logged together with strerror(errno).
667 bool EventBase::scheduleTimeout(AsyncTimeout* obj,
668 std::chrono::milliseconds timeout) {
669 assert(isInEventBaseThread());
670 // Set up the timeval and add the event
// Split the millisecond count into whole seconds and leftover microseconds.
672 tv.tv_sec = timeout.count() / 1000LL;
673 tv.tv_usec = (timeout.count() % 1000LL) * 1000LL;
675 struct event* ev = obj->getEvent();
676 if (event_add(ev, &tv) < 0) {
677 LOG(ERROR) << "EventBase: failed to schedule timeout: " << strerror(errno);
// Cancels a scheduled timeout. Must be called from the EventBase thread
// (asserted). Only an event that is currently registered is acted upon; the
// action itself is not part of this excerpt.
684 void EventBase::cancelTimeout(AsyncTimeout* obj) {
685 assert(isInEventBaseThread());
686 struct event* ev = obj->getEvent();
687 if (EventUtil::isEventRegistered(ev)) {
// Sets the EventBase name. Must be called from the EventBase thread
// (asserted). The visible call passes the recorded loop thread to
// setThreadName(); the remaining arguments are not part of this excerpt.
692 void EventBase::setName(const std::string& name) {
693 assert(isInEventBaseThread());
697 setThreadName(loopThread_.load(std::memory_order_relaxed),
702 const std::string& EventBase::getName() {
703 assert(isInEventBaseThread());