Thread-safe version of loopKeepAlive()
authorAndrii Grynenko <andrii@fb.com>
Tue, 15 Nov 2016 22:52:48 +0000 (14:52 -0800)
committerFacebook Github Bot <facebook-github-bot-bot@fb.com>
Tue, 15 Nov 2016 22:53:28 +0000 (14:53 -0800)
Reviewed By: yfeldblum

Differential Revision: D4152380

fbshipit-source-id: 8b3c6dc4b14b9138bb5012e05f50496e51c0fa4b

folly/io/async/EventBase.cpp
folly/io/async/EventBase.h
folly/io/async/test/EventBaseTest.cpp

index 2474a0e8cdd91840dba2c3a2aa93a9450bead423..4b3bfac29ab200d4b4bd6c7922ebaee8845d20f7 100644 (file)
@@ -176,7 +176,7 @@ EventBase::~EventBase() {
   // Keep looping until all keep-alive handles are released. Each keep-alive
   // handle signals that some external code will still schedule some work on
   // this EventBase (so it's not safe to destroy it).
-  while (loopKeepAliveCount_ > 0) {
+  while (loopKeepAliveCount() > 0) {
     applyLoopKeepAlive();
     loopOnce();
   }
@@ -412,13 +412,22 @@ bool EventBase::loopBody(int flags) {
   return true;
 }
 
+ssize_t EventBase::loopKeepAliveCount() {
+  if (loopKeepAliveCountAtomic_.load(std::memory_order_relaxed)) {
+    loopKeepAliveCount_ +=
+        loopKeepAliveCountAtomic_.exchange(0, std::memory_order_relaxed);
+  }
+  DCHECK_GE(loopKeepAliveCount_, 0);
+  return loopKeepAliveCount_;
+}
+
 void EventBase::applyLoopKeepAlive() {
-  if (loopKeepAliveActive_ && loopKeepAliveCount_ == 0) {
+  if (loopKeepAliveActive_ && loopKeepAliveCount() == 0) {
     // Restore the notification queue internal flag
     fnRunner_->stopConsuming();
     fnRunner_->startConsumingInternal(this, queue_.get());
     loopKeepAliveActive_ = false;
-  } else if (!loopKeepAliveActive_ && loopKeepAliveCount_ > 0) {
+  } else if (!loopKeepAliveActive_ && loopKeepAliveCount() > 0) {
     // Update the notification queue event to treat it as a normal
     // (non-internal) event.  The notification queue event always remains
     // installed, and the main loop won't exit with it installed.
@@ -435,7 +444,12 @@ void EventBase::loopForever() {
       applyLoopKeepAlive();
     };
     // Make sure notification queue events are treated as normal events.
-    auto keepAlive = loopKeepAlive();
+    // We can't use loopKeepAlive() here since LoopKeepAlive token can only be
+    // released inside a loop.
+    ++loopKeepAliveCount_;
+    SCOPE_EXIT {
+      --loopKeepAliveCount_;
+    };
     ret = loop();
   }
 
index a8d4926f26ac0c05e16c8d6f7b78fe0c05741532..4e5bf78a0e5e0cd795ad4a705d0fa8d26c27ee77 100644 (file)
@@ -37,6 +37,7 @@
 #include <folly/Executor.h>
 #include <folly/Function.h>
 #include <folly/Portability.h>
+#include <folly/ScopeGuard.h>
 #include <folly/experimental/ExecutionObserver.h>
 #include <folly/futures/DrivableExecutor.h>
 #include <folly/io/async/AsyncTimeout.h>
@@ -555,7 +556,12 @@ class EventBase : private boost::noncopyable,
 
   /// Implements the DrivableExecutor interface
   void drive() override {
-    auto keepAlive = loopKeepAlive();
+    // We can't use loopKeepAlive() here since LoopKeepAlive token can only be
+    // released inside a loop.
+    ++loopKeepAliveCount_;
+    SCOPE_EXIT {
+      --loopKeepAliveCount_;
+    };
     loopOnce();
   }
 
@@ -579,6 +585,15 @@ class EventBase : private boost::noncopyable,
     return LoopKeepAlive(this);
   }
 
+  // Thread-safe version of loopKeepAlive()
+  LoopKeepAlive loopKeepAliveAtomic() {
+    if (inRunningEventBaseThread()) {
+      return loopKeepAlive();
+    }
+    loopKeepAliveCountAtomic_.fetch_add(1, std::memory_order_relaxed);
+    return LoopKeepAlive(this);
+  }
+
   // TimeoutManager
   void attachTimeoutManager(
       AsyncTimeout* obj,
@@ -598,6 +613,8 @@ class EventBase : private boost::noncopyable,
  private:
   void applyLoopKeepAlive();
 
+  ssize_t loopKeepAliveCount();
+
   /*
    * Helper function that tells us whether we have already handled
    * some event/timeout/callback in this loop iteration.
@@ -645,7 +662,8 @@ class EventBase : private boost::noncopyable,
   // to send function requests to the EventBase thread.
   std::unique_ptr<NotificationQueue<Func>> queue_;
   std::unique_ptr<FunctionRunner> fnRunner_;
-  size_t loopKeepAliveCount_{0};
+  ssize_t loopKeepAliveCount_{0};
+  std::atomic<ssize_t> loopKeepAliveCountAtomic_{0};
   bool loopKeepAliveActive_{false};
 
   // limit for latency in microseconds (0 disables)
index 06deeebd18af485f5717baf9c16d59e89be6bea4..524cff7bb6390c6f2557c05bb952417e7560816b 100644 (file)
@@ -1826,6 +1826,51 @@ TEST(EventBaseTest, LoopKeepAliveShutdown) {
   t.join();
 }
 
+TEST(EventBaseTest, LoopKeepAliveAtomic) {
+  auto evb = folly::make_unique<EventBase>();
+
+  constexpr size_t kNumThreads = 100;
+  constexpr size_t kNumTasks = 100;
+
+  std::vector<std::thread> ts;
+  std::vector<std::unique_ptr<Baton<>>> batons;
+  size_t done{0};
+
+  for (size_t i = 0; i < kNumThreads; ++i) {
+    batons.emplace_back(std::make_unique<Baton<>>());
+  }
+
+  for (size_t i = 0; i < kNumThreads; ++i) {
+    ts.emplace_back([ evbPtr = evb.get(), batonPtr = batons[i].get(), &done ] {
+      std::vector<EventBase::LoopKeepAlive> keepAlives;
+      for (size_t i = 0; i < kNumTasks; ++i) {
+        keepAlives.emplace_back(evbPtr->loopKeepAliveAtomic());
+      }
+
+      batonPtr->post();
+
+      /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+      for (auto& keepAlive : keepAlives) {
+        evbPtr->runInEventBaseThread(
+            [&done, keepAlive = std::move(keepAlive) ]() { ++done; });
+      }
+    });
+  }
+
+  for (auto& baton : batons) {
+    baton->wait();
+  }
+
+  evb.reset();
+
+  EXPECT_EQ(kNumThreads * kNumTasks, done);
+
+  for (auto& t : ts) {
+    t.join();
+  }
+}
+
 TEST(EventBaseTest, DrivableExecutorTest) {
   folly::Promise<bool> p;
   auto f = p.getFuture();