EventBase keepAlive counter is not atomic
authorJoseph Griego <jgriego@fb.com>
Tue, 14 Jun 2016 00:49:38 +0000 (17:49 -0700)
committerFacebook Github Bot 9 <facebook-github-bot-9-bot@fb.com>
Tue, 14 Jun 2016 00:53:28 +0000 (17:53 -0700)
Summary: Since loopKeepAlive() is always used from the EventBase thread, there's no need for the overhead of an shared_ptr (and therefore, an atomic ref counter); we can get away without thread safety. This also allows us to discard the (sometimes incorrect) optimization of not returning a handle when it appears the loop will continue running anyways

Reviewed By: andriigrynenko

Differential Revision: D3375503

fbshipit-source-id: 474e4fcf992bdc4fcca9370d3c57bdcc4e042386

folly/io/async/EventBase.cpp
folly/io/async/EventBase.h
folly/io/async/test/EventBaseTest.cpp

index 9d5949e0a0718746d83eb3253d70027eb60760c6..db2efcef3209fae8ebf8489e0837e3b0d0ac7234 100644 (file)
@@ -198,7 +198,7 @@ EventBase::~EventBase() {
   // Keep looping until all keep-alive handles are released. Each keep-alive
   // handle signals that some external code will still schedule some work on
   // this EventBase (so it's not safe to destroy it).
-  while (!loopKeepAlive_.unique()) {
+  while (loopKeepAliveCount_ > 0) {
     applyLoopKeepAlive();
     loopOnce();
   }
@@ -448,12 +448,12 @@ bool EventBase::loopBody(int flags) {
 }
 
 void EventBase::applyLoopKeepAlive() {
-  if (loopKeepAliveActive_ && loopKeepAlive_.unique()) {
+  if (loopKeepAliveActive_ && loopKeepAliveCount_ == 0) {
     // Restore the notification queue internal flag
     fnRunner_->stopConsuming();
     fnRunner_->startConsumingInternal(this, queue_.get());
     loopKeepAliveActive_ = false;
-  } else if (!loopKeepAliveActive_ && !loopKeepAlive_.unique()) {
+  } else if (!loopKeepAliveActive_ && loopKeepAliveCount_ > 0) {
     // Update the notification queue event to treat it as a normal
     // (non-internal) event.  The notification queue event always remains
     // installed, and the main loop won't exit with it installed.
@@ -468,11 +468,9 @@ void EventBase::loopForever() {
   {
     SCOPE_EXIT {
       applyLoopKeepAlive();
-      loopForeverActive_ = false;
     };
-    loopForeverActive_ = true;
     // Make sure notification queue events are treated as normal events.
-    auto loopKeepAlive = loopKeepAlive_;
+    auto keepAlive = loopKeepAlive();
     ret = loop();
   }
 
index 3fcf60e3a63295af0e14e4aca561e23f137cafb2..89e89fd1b1765bf0d2b77bf98641a47d3dd46941 100644 (file)
@@ -587,7 +587,13 @@ class EventBase : private boost::noncopyable,
     loopOnce();
   }
 
-  using LoopKeepAlive = std::shared_ptr<void>;
+  struct LoopKeepAliveDeleter {
+    void operator()(EventBase* evb) {
+      DCHECK(evb->isInEventBaseThread());
+      evb->loopKeepAliveCount_--;
+    }
+  };
+  using LoopKeepAlive = std::unique_ptr<EventBase, LoopKeepAliveDeleter>;
 
   /// Returns you a handle which make loop() behave like loopForever() until
   /// destroyed. loop() will return to its original behavior only when all
@@ -596,11 +602,9 @@ class EventBase : private boost::noncopyable,
   ///
   /// May return no op LoopKeepAlive if loopForever() is already running.
   LoopKeepAlive loopKeepAlive() {
-    if (loopForeverActive_) {
-      return nullptr;
-    } else {
-      return loopKeepAlive_;
-    }
+    DCHECK(isInEventBaseThread());
+    loopKeepAliveCount_++;
+    return LoopKeepAlive(this);
   }
 
  private:
@@ -692,9 +696,8 @@ class EventBase : private boost::noncopyable,
   // to send function requests to the EventBase thread.
   std::unique_ptr<NotificationQueue<Func>> queue_;
   std::unique_ptr<FunctionRunner> fnRunner_;
-  LoopKeepAlive loopKeepAlive_{std::make_shared<int>(42)};
+  size_t loopKeepAliveCount_{0};
   bool loopKeepAliveActive_{false};
-  std::atomic<bool> loopForeverActive_{false};
 
   // limit for latency in microseconds (0 disables)
   int64_t maxLatency_;
index bc8988abb1ff5787c7251d1168f0bbceb53fe5ce..d5998bf103775ba879073e0137021a612b5291d4 100644 (file)
@@ -1733,7 +1733,7 @@ TEST(EventBaseTest, LoopKeepAlive) {
   EventBase evb;
 
   bool done = false;
-  std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ] {
+  std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
     /* sleep override */ std::this_thread::sleep_for(
         std::chrono::milliseconds(100));
     evb.runInEventBaseThread(
@@ -1754,7 +1754,7 @@ TEST(EventBaseTest, LoopKeepAliveInLoop) {
   std::thread t;
 
   evb.runInEventBaseThread([&] {
-    t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ] {
+    t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
       /* sleep override */ std::this_thread::sleep_for(
           std::chrono::milliseconds(100));
       evb.runInEventBaseThread(
@@ -1769,20 +1769,49 @@ TEST(EventBaseTest, LoopKeepAliveInLoop) {
   t.join();
 }
 
+TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
+  std::unique_ptr<EventBase> evb = folly::make_unique<EventBase>();
+
+  bool done = false;
+
+  std::thread evThread([&] {
+    evb->loopForever();
+    evb.reset();
+    done = true;
+  });
+
+  {
+    auto* ev = evb.get();
+    EventBase::LoopKeepAlive keepAlive;
+    ev->runInEventBaseThreadAndWait(
+        [&ev, &keepAlive] { keepAlive = ev->loopKeepAlive(); });
+    ASSERT_FALSE(done) << "Loop finished before we asked it to";
+    ev->terminateLoopSoon();
+    /* sleep override */
+    std::this_thread::sleep_for(std::chrono::milliseconds(30));
+    ASSERT_FALSE(done) << "Loop terminated early";
+    ev->runInEventBaseThread([&ev, keepAlive = std::move(keepAlive) ]{});
+  }
+
+  evThread.join();
+  ASSERT_TRUE(done);
+}
+
 TEST(EventBaseTest, LoopKeepAliveShutdown) {
   auto evb = folly::make_unique<EventBase>();
 
   bool done = false;
 
-  std::thread t(
-      [&done, loopKeepAlive = evb->loopKeepAlive(), evbPtr = evb.get() ] {
-        /* sleep override */ std::this_thread::sleep_for(
-            std::chrono::milliseconds(100));
-        evbPtr->runInEventBaseThread(
-            [&done, loopKeepAlive = std::move(loopKeepAlive) ] {
-              done = true;
-            });
-      });
+  std::thread t([
+    &done,
+    loopKeepAlive = evb->loopKeepAlive(),
+    evbPtr = evb.get()
+  ]() mutable {
+    /* sleep override */ std::this_thread::sleep_for(
+        std::chrono::milliseconds(100));
+    evbPtr->runInEventBaseThread(
+        [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
+  });
 
   evb.reset();