explicit FunctionLoopCallback(Cob&& function)
: function_(std::move(function)) {}
- explicit FunctionLoopCallback(const Cob& function)
- : function_(function) {}
+ explicit FunctionLoopCallback(const Cob& function) : function_(function) {}
void runLoopCallback() noexcept override {
function_();
private:
Callback function_;
};
-
}
namespace folly {
-const int kNoFD = -1;
-
/*
* EventBase::FunctionRunner
*/
-class EventBase::FunctionRunner
- : public NotificationQueue<std::pair<void (*)(void*), void*>>::Consumer {
+class EventBase::FunctionRunner : public NotificationQueue<Cob>::Consumer {
public:
- void messageAvailable(std::pair<void (*)(void*), void*>&& msg) override {
+ void messageAvailable(Cob&& msg) override {
// In libevent2, internal events do not break the loop.
// Most users would expect loop(), followed by runInEventBaseThread(),
// stop_ flag as well as runInLoop callbacks, etc.
event_base_loopbreak(getEventBase()->evb_);
- if (msg.first == nullptr && msg.second == nullptr) {
+ if (!msg) {
// terminateLoopSoon() sends a null message just to
// wake up the loop. We can ignore these messages.
return;
}
- // If function is nullptr, just log and move on
- if (!msg.first) {
- LOG(ERROR) << "nullptr callback registered to be run in "
- << "event base thread";
- return;
- }
-
// The function should never throw an exception, because we have no
// way of knowing what sort of error handling to perform.
//
// If it does throw, log a message and abort the program.
try {
- msg.first(msg.second);
+ msg();
} catch (const std::exception& ex) {
LOG(ERROR) << "runInEventBaseThread() function threw a "
<< typeid(ex).name() << " exception: " << ex.what();
std::lock_guard<std::mutex> lock(libevent_mutex_);
event_base_free(evb_);
}
+
+ while (!runAfterDrainCallbacks_.empty()) {
+ LoopCallback* callback = &runAfterDrainCallbacks_.front();
+ runAfterDrainCallbacks_.pop_front();
+ callback->runLoopCallback();
+ }
+
+ {
+ std::lock_guard<std::mutex> lock(localStorageMutex_);
+ for (auto storage : localStorageToDtor_) {
+ storage->onEventBaseDestruction(*this);
+ }
+ }
VLOG(5) << "EventBase(): Destroyed.";
}
// time-measurement variables.
std::chrono::steady_clock::time_point prev;
- int64_t idleStart;
+ int64_t idleStart = 0;
int64_t busy;
int64_t idle;
" avgLoopTime: " << avgLoopTime_.get() <<
" maxLatencyLoopTime: " << maxLatencyLoopTime_.get() <<
" maxLatency_: " << maxLatency_ <<
+ " notificationQueueSize: " << getNotificationQueueSize() <<
" nothingHandledYet(): "<< nothingHandledYet();
// see if our average loop time has exceeded our limit
" (loop) latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
if(nothingHandledYet()) {
latestLoopCnt_ = nextLoopCnt_;
- // set the time
- startWork_ = std::chrono::duration_cast<std::chrono::microseconds>(
- std::chrono::steady_clock::now().time_since_epoch()).count();
+ if (enableTimeMeasurement_) {
+ // set the time
+ startWork_ = std::chrono::duration_cast<std::chrono::microseconds>(
+ std::chrono::steady_clock::now().time_since_epoch()).count();
- VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
- " (loop) startWork_ " << startWork_;
+ VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
+ " (loop) startWork_ " << startWork_;
+ }
return true;
}
return false;
// this likely means the EventBase already has lots of events waiting
// anyway.
try {
- queue_->putMessage(std::make_pair(nullptr, nullptr));
+ queue_->putMessage(nullptr);
} catch (...) {
// We don't care if putMessage() fails. This likely means
// the EventBase already has lots of events waiting anyway.
}
}
+void EventBase::runAfterDrain(Cob&& cob) {
+ auto callback = new FunctionLoopCallback<Cob>(std::move(cob));
+ std::lock_guard<std::mutex> lg(runAfterDrainCallbacksMutex_);
+ callback->cancelLoopCallback();
+ runAfterDrainCallbacks_.push_back(*callback);
+}
+
void EventBase::runOnDestruction(LoopCallback* callback) {
std::lock_guard<std::mutex> lg(onDestructionCallbacksMutex_);
callback->cancelLoopCallback();
runBeforeLoopCallbacks_.push_back(*callback);
}
-bool EventBase::runInEventBaseThread(void (*fn)(void*), void* arg) {
+bool EventBase::runInEventBaseThread(const Cob& fn) {
// Send the message.
// It will be received by the FunctionRunner in the EventBase's thread.
// Short-circuit if we are already in our event base
if (inRunningEventBaseThread()) {
- runInLoop(new RunInLoopCallback(fn, arg));
+ runInLoop(fn);
return true;
}
try {
- queue_->putMessage(std::make_pair(fn, arg));
+ queue_->putMessage(fn);
} catch (const std::exception& ex) {
LOG(ERROR) << "EventBase " << this << ": failed to schedule function "
- << fn << "for EventBase thread: " << ex.what();
- return false;
- }
-
- return true;
-}
-
-bool EventBase::runInEventBaseThread(const Cob& fn) {
- // Short-circuit if we are already in our event base
- if (inRunningEventBaseThread()) {
- runInLoop(fn);
- return true;
- }
-
- Cob* fnCopy;
- // Allocate a copy of the function so we can pass it to the other thread
- // The other thread will delete this copy once the function has been run
- try {
- fnCopy = new Cob(fn);
- } catch (const std::bad_alloc& ex) {
- LOG(ERROR) << "failed to allocate tr::function copy "
- << "for runInEventBaseThread()";
- return false;
- }
-
- if (!runInEventBaseThread(&EventBase::runFunctionPtr, fnCopy)) {
- delete fnCopy;
+ << "for EventBase thread: " << ex.what();
return false;
}
}
void EventBase::runAfterDelay(const Cob& cob,
- int milliseconds,
+ uint32_t milliseconds,
TimeoutManager::InternalEnum in) {
if (!tryRunAfterDelay(cob, milliseconds, in)) {
folly::throwSystemError(
}
bool EventBase::tryRunAfterDelay(const Cob& cob,
- int milliseconds,
+ uint32_t milliseconds,
TimeoutManager::InternalEnum in) {
CobTimeout* timeout = new CobTimeout(this, cob, in);
if (!timeout->scheduleTimeout(milliseconds)) {
void EventBase::initNotificationQueue() {
// Infinite size queue
- queue_.reset(new NotificationQueue<std::pair<void (*)(void*), void*>>());
+ queue_.reset(new NotificationQueue<Cob>());
// We allocate fnRunner_ separately, rather than declaring it directly
// as a member of EventBase solely so that we don't need to include
delete fn;
}
-EventBase::RunInLoopCallback::RunInLoopCallback(void (*fn)(void*), void* arg)
- : fn_(fn)
- , arg_(arg) {}
-
-void EventBase::RunInLoopCallback::runLoopCallback() noexcept {
- fn_(arg_);
- delete this;
-}
-
void EventBase::attachTimeoutManager(AsyncTimeout* obj,
InternalEnum internal) {
event_base_set(getLibeventBase(), ev);
if (internal == AsyncTimeout::InternalEnum::INTERNAL) {
// Set the EVLIST_INTERNAL flag
- ev->ev_flags |= EVLIST_INTERNAL;
+ event_ref_flags(ev) |= EVLIST_INTERNAL;
}
}