folly: fix clang build with -Wunused-const-variable
[folly.git] / folly / io / async / EventBase.cpp
index 9280f001513551a65a0929584129675b7a22c710..51a320a1c045fe303ab9826045ae3683dade4005 100644 (file)
@@ -41,8 +41,7 @@ class FunctionLoopCallback : public EventBase::LoopCallback {
   explicit FunctionLoopCallback(Cob&& function)
       : function_(std::move(function)) {}
 
-  explicit FunctionLoopCallback(const Cob& function)
-      : function_(function) {}
+  explicit FunctionLoopCallback(const Cob& function) : function_(function) {}
 
   void runLoopCallback() noexcept override {
     function_();
@@ -52,21 +51,17 @@ class FunctionLoopCallback : public EventBase::LoopCallback {
  private:
   Callback function_;
 };
-
 }
 
 namespace folly {
 
-const int kNoFD = -1;
-
 /*
  * EventBase::FunctionRunner
  */
 
-class EventBase::FunctionRunner
-    : public NotificationQueue<std::pair<void (*)(void*), void*>>::Consumer {
+class EventBase::FunctionRunner : public NotificationQueue<Cob>::Consumer {
  public:
-  void messageAvailable(std::pair<void (*)(void*), void*>&& msg) override {
+  void messageAvailable(Cob&& msg) override {
 
     // In libevent2, internal events do not break the loop.
     // Most users would expect loop(), followed by runInEventBaseThread(),
@@ -76,25 +71,18 @@ class EventBase::FunctionRunner
     // stop_ flag as well as runInLoop callbacks, etc.
     event_base_loopbreak(getEventBase()->evb_);
 
-    if (msg.first == nullptr && msg.second == nullptr) {
+    if (!msg) {
       // terminateLoopSoon() sends a null message just to
       // wake up the loop.  We can ignore these messages.
       return;
     }
 
-    // If function is nullptr, just log and move on
-    if (!msg.first) {
-      LOG(ERROR) << "nullptr callback registered to be run in "
-                 << "event base thread";
-      return;
-    }
-
     // The function should never throw an exception, because we have no
     // way of knowing what sort of error handling to perform.
     //
     // If it does throw, log a message and abort the program.
     try {
-      msg.first(msg.second);
+      msg();
     } catch (const std::exception& ex) {
       LOG(ERROR) << "runInEventBaseThread() function threw a "
                  << typeid(ex).name() << " exception: " << ex.what();
@@ -237,6 +225,19 @@ EventBase::~EventBase() {
     std::lock_guard<std::mutex> lock(libevent_mutex_);
     event_base_free(evb_);
   }
+
+  while (!runAfterDrainCallbacks_.empty()) {
+    LoopCallback* callback = &runAfterDrainCallbacks_.front();
+    runAfterDrainCallbacks_.pop_front();
+    callback->runLoopCallback();
+  }
+
+  {
+    std::lock_guard<std::mutex> lock(localStorageMutex_);
+    for (auto storage : localStorageToDtor_) {
+      storage->onEventBaseDestruction(*this);
+    }
+  }
   VLOG(5) << "EventBase(): Destroyed.";
 }
 
@@ -299,7 +300,7 @@ bool EventBase::loopBody(int flags) {
 
   // time-measurement variables.
   std::chrono::steady_clock::time_point prev;
-  int64_t idleStart;
+  int64_t idleStart = 0;
   int64_t busy;
   int64_t idle;
 
@@ -361,6 +362,7 @@ bool EventBase::loopBody(int flags) {
         " avgLoopTime: "        << avgLoopTime_.get() <<
         " maxLatencyLoopTime: " << maxLatencyLoopTime_.get() <<
         " maxLatency_: "        << maxLatency_ <<
+        " notificationQueueSize: " << getNotificationQueueSize() <<
         " nothingHandledYet(): "<< nothingHandledYet();
 
       // see if our average loop time has exceeded our limit
@@ -446,12 +448,14 @@ bool EventBase::bumpHandlingTime() {
     " (loop) latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
   if(nothingHandledYet()) {
     latestLoopCnt_ = nextLoopCnt_;
-    // set the time
-    startWork_ = std::chrono::duration_cast<std::chrono::microseconds>(
-      std::chrono::steady_clock::now().time_since_epoch()).count();
+    if (enableTimeMeasurement_) {
+      // set the time
+      startWork_ = std::chrono::duration_cast<std::chrono::microseconds>(
+        std::chrono::steady_clock::now().time_since_epoch()).count();
 
-    VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
-      " (loop) startWork_ " << startWork_;
+      VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
+        " (loop) startWork_ " << startWork_;
+    }
     return true;
   }
   return false;
@@ -479,7 +483,7 @@ void EventBase::terminateLoopSoon() {
   // this likely means the EventBase already has lots of events waiting
   // anyway.
   try {
-    queue_->putMessage(std::make_pair(nullptr, nullptr));
+    queue_->putMessage(nullptr);
   } catch (...) {
     // We don't care if putMessage() fails.  This likely means
     // the EventBase already has lots of events waiting anyway.
@@ -519,6 +523,13 @@ void EventBase::runInLoop(Cob&& cob, bool thisIteration) {
   }
 }
 
+void EventBase::runAfterDrain(Cob&& cob) {
+  auto callback = new FunctionLoopCallback<Cob>(std::move(cob));
+  std::lock_guard<std::mutex> lg(runAfterDrainCallbacksMutex_);
+  callback->cancelLoopCallback();
+  runAfterDrainCallbacks_.push_back(*callback);
+}
+
 void EventBase::runOnDestruction(LoopCallback* callback) {
   std::lock_guard<std::mutex> lg(onDestructionCallbacksMutex_);
   callback->cancelLoopCallback();
@@ -531,7 +542,7 @@ void EventBase::runBeforeLoop(LoopCallback* callback) {
   runBeforeLoopCallbacks_.push_back(*callback);
 }
 
-bool EventBase::runInEventBaseThread(void (*fn)(void*), void* arg) {
+bool EventBase::runInEventBaseThread(const Cob& fn) {
   // Send the message.
   // It will be received by the FunctionRunner in the EventBase's thread.
 
@@ -544,42 +555,16 @@ bool EventBase::runInEventBaseThread(void (*fn)(void*), void* arg) {
 
   // Short-circuit if we are already in our event base
   if (inRunningEventBaseThread()) {
-    runInLoop(new RunInLoopCallback(fn, arg));
+    runInLoop(fn);
     return true;
 
   }
 
   try {
-    queue_->putMessage(std::make_pair(fn, arg));
+    queue_->putMessage(fn);
   } catch (const std::exception& ex) {
     LOG(ERROR) << "EventBase " << this << ": failed to schedule function "
-               << fn << "for EventBase thread: " << ex.what();
-    return false;
-  }
-
-  return true;
-}
-
-bool EventBase::runInEventBaseThread(const Cob& fn) {
-  // Short-circuit if we are already in our event base
-  if (inRunningEventBaseThread()) {
-    runInLoop(fn);
-    return true;
-  }
-
-  Cob* fnCopy;
-  // Allocate a copy of the function so we can pass it to the other thread
-  // The other thread will delete this copy once the function has been run
-  try {
-    fnCopy = new Cob(fn);
-  } catch (const std::bad_alloc& ex) {
-    LOG(ERROR) << "failed to allocate tr::function copy "
-               << "for runInEventBaseThread()";
-    return false;
-  }
-
-  if (!runInEventBaseThread(&EventBase::runFunctionPtr, fnCopy)) {
-    delete fnCopy;
+               << "for EventBase thread: " << ex.what();
     return false;
   }
 
@@ -623,7 +608,7 @@ bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(const Cob& fn) {
 }
 
 void EventBase::runAfterDelay(const Cob& cob,
-                              int milliseconds,
+                              uint32_t milliseconds,
                               TimeoutManager::InternalEnum in) {
   if (!tryRunAfterDelay(cob, milliseconds, in)) {
     folly::throwSystemError(
@@ -632,7 +617,7 @@ void EventBase::runAfterDelay(const Cob& cob,
 }
 
 bool EventBase::tryRunAfterDelay(const Cob& cob,
-                                 int milliseconds,
+                                 uint32_t milliseconds,
                                  TimeoutManager::InternalEnum in) {
   CobTimeout* timeout = new CobTimeout(this, cob, in);
   if (!timeout->scheduleTimeout(milliseconds)) {
@@ -675,7 +660,7 @@ bool EventBase::runLoopCallbacks(bool setContext) {
 
 void EventBase::initNotificationQueue() {
   // Infinite size queue
-  queue_.reset(new NotificationQueue<std::pair<void (*)(void*), void*>>());
+  queue_.reset(new NotificationQueue<Cob>());
 
   // We allocate fnRunner_ separately, rather than declaring it directly
   // as a member of EventBase solely so that we don't need to include
@@ -755,15 +740,6 @@ void EventBase::runFunctionPtr(Cob* fn) {
   delete fn;
 }
 
-EventBase::RunInLoopCallback::RunInLoopCallback(void (*fn)(void*), void* arg)
-    : fn_(fn)
-    , arg_(arg) {}
-
-void EventBase::RunInLoopCallback::runLoopCallback() noexcept {
-  fn_(arg_);
-  delete this;
-}
-
 void EventBase::attachTimeoutManager(AsyncTimeout* obj,
                                       InternalEnum internal) {
 
@@ -773,7 +749,7 @@ void EventBase::attachTimeoutManager(AsyncTimeout* obj,
   event_base_set(getLibeventBase(), ev);
   if (internal == AsyncTimeout::InternalEnum::INTERNAL) {
     // Set the EVLIST_INTERNAL flag
-    ev->ev_flags |= EVLIST_INTERNAL;
+    event_ref_flags(ev) |= EVLIST_INTERNAL;
   }
 }