Consistently have the namespace closing comment
[folly.git] / folly / experimental / observer / detail / ObserverManager.cpp
index 7654dff5e93399f665ada289af2a0de622caef1b..754e0c520ee118d0bae5ccce75471359d1b82d1e 100644 (file)
@@ -20,8 +20,8 @@
 #include <folly/MPMCQueue.h>
 #include <folly/Range.h>
 #include <folly/Singleton.h>
-#include <folly/ThreadName.h>
 #include <folly/portability/GFlags.h>
+#include <folly/system/ThreadName.h>
 
 namespace folly {
 namespace observer_detail {
@@ -40,7 +40,7 @@ static constexpr StringPiece kObserverManagerThreadNamePrefix{"ObserverMngr"};
 namespace {
 constexpr size_t kCurrentQueueSize{10 * 1024};
 constexpr size_t kNextQueueSize{10 * 1024};
-}
+} // namespace
 
 class ObserverManager::CurrentQueue {
  public:
@@ -106,28 +106,35 @@ class ObserverManager::NextQueue {
   explicit NextQueue(ObserverManager& manager)
       : manager_(manager), queue_(kNextQueueSize) {
     thread_ = std::thread([&]() {
-      Core::Ptr queueCore;
+      Core::WeakPtr queueCoreWeak;
 
       while (true) {
-        queue_.blockingRead(queueCore);
-
-        if (!queueCore) {
+        queue_.blockingRead(queueCoreWeak);
+        if (stop_) {
           return;
         }
 
         std::vector<Core::Ptr> cores;
-        cores.emplace_back(std::move(queueCore));
+        {
+          auto queueCore = queueCoreWeak.lock();
+          if (!queueCore) {
+            continue;
+          }
+          cores.emplace_back(std::move(queueCore));
+        }
 
         {
           SharedMutexReadPriority::WriteHolder wh(manager_.versionMutex_);
 
           // We can't pick more tasks from the queue after we bumped the
           // version, so we have to do this while holding the lock.
-          while (cores.size() < kNextQueueSize && queue_.read(queueCore)) {
-            if (!queueCore) {
+          while (cores.size() < kNextQueueSize && queue_.read(queueCoreWeak)) {
+            if (stop_) {
               return;
             }
-            cores.emplace_back(std::move(queueCore));
+            if (auto queueCore = queueCoreWeak.lock()) {
+              cores.emplace_back(std::move(queueCore));
+            }
           }
 
           ++manager_.version_;
@@ -140,25 +147,27 @@ class ObserverManager::NextQueue {
     });
   }
 
-  void add(Core::Ptr core) {
+  void add(Core::WeakPtr core) {
     queue_.blockingWrite(std::move(core));
   }
 
   ~NextQueue() {
-    // Emtpy element signals thread to terminate
-    queue_.blockingWrite(nullptr);
+    stop_ = true;
+    // Write to the queue to notify the thread.
+    queue_.blockingWrite(Core::WeakPtr());
     thread_.join();
   }
 
  private:
   ObserverManager& manager_;
-  MPMCQueue<Core::Ptr> queue_;
+  MPMCQueue<Core::WeakPtr> queue_;
   std::thread thread_;
+  std::atomic<bool> stop_{false};
 };
 
 ObserverManager::ObserverManager() {
-  currentQueue_ = make_unique<CurrentQueue>();
-  nextQueue_ = make_unique<NextQueue>(*this);
+  currentQueue_ = std::make_unique<CurrentQueue>();
+  nextQueue_ = std::make_unique<NextQueue>(*this);
 }
 
 ObserverManager::~ObserverManager() {
@@ -172,7 +181,7 @@ void ObserverManager::scheduleCurrent(Function<void()> task) {
   currentQueue_->add(std::move(task));
 }
 
-void ObserverManager::scheduleNext(Core::Ptr core) {
+void ObserverManager::scheduleNext(Core::WeakPtr core) {
   nextQueue_->add(std::move(core));
 }
 
@@ -192,5 +201,5 @@ folly::Singleton<ObserverManager> ObserverManager::Singleton::instance(
 std::shared_ptr<ObserverManager> ObserverManager::getInstance() {
   return Singleton::instance.try_get();
 }
-}
-}
+} // namespace observer_detail
+} // namespace folly