2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 #include <folly/experimental/observer/detail/ObserverManager.h>
18 #include <folly/ExceptionString.h>
19 #include <folly/Format.h>
20 #include <folly/MPMCQueue.h>
21 #include <folly/Range.h>
22 #include <folly/Singleton.h>
23 #include <folly/portability/GFlags.h>
24 #include <folly/system/ThreadName.h>
27 namespace observer_detail {
29 FOLLY_TLS bool ObserverManager::inManagerThread_{false};
30 FOLLY_TLS ObserverManager::DependencyRecorder::Dependencies*
31 ObserverManager::DependencyRecorder::currentDependencies_{nullptr};
34 observer_manager_pool_size,
36 "How many internal threads ObserverManager should use");
38 static constexpr StringPiece kObserverManagerThreadNamePrefix{"ObserverMngr"};
41 constexpr size_t kCurrentQueueSize{10 * 1024};
42 constexpr size_t kNextQueueSize{10 * 1024};
45 class ObserverManager::CurrentQueue {
47 CurrentQueue() : queue_(kCurrentQueueSize) {
48 if (FLAGS_observer_manager_pool_size < 1) {
49 LOG(ERROR) << "--observer_manager_pool_size should be >= 1";
50 FLAGS_observer_manager_pool_size = 1;
52 for (int32_t i = 0; i < FLAGS_observer_manager_pool_size; ++i) {
53 threads_.emplace_back([this, i]() {
55 folly::sformat("{}{}", kObserverManagerThreadNamePrefix, i));
56 ObserverManager::inManagerThread_ = true;
59 Function<void()> task;
60 queue_.blockingRead(task);
69 LOG(ERROR) << "Exception while running CurrentQueue task: "
70 << exceptionStr(std::current_exception());
78 for (size_t i = 0; i < threads_.size(); ++i) {
79 queue_.blockingWrite(nullptr);
82 for (auto& thread : threads_) {
86 CHECK(queue_.isEmpty());
89 void add(Function<void()> task) {
90 if (ObserverManager::inManagerThread()) {
91 if (!queue_.write(std::move(task))) {
92 throw std::runtime_error("Too many Observers scheduled for update.");
95 queue_.blockingWrite(std::move(task));
100 MPMCQueue<Function<void()>> queue_;
101 std::vector<std::thread> threads_;
104 class ObserverManager::NextQueue {
106 explicit NextQueue(ObserverManager& manager)
107 : manager_(manager), queue_(kNextQueueSize) {
108 thread_ = std::thread([&]() {
109 Core::WeakPtr queueCoreWeak;
112 queue_.blockingRead(queueCoreWeak);
117 std::vector<Core::Ptr> cores;
119 auto queueCore = queueCoreWeak.lock();
123 cores.emplace_back(std::move(queueCore));
127 SharedMutexReadPriority::WriteHolder wh(manager_.versionMutex_);
129 // We can't pick more tasks from the queue after we bumped the
130 // version, so we have to do this while holding the lock.
131 while (cores.size() < kNextQueueSize && queue_.read(queueCoreWeak)) {
135 if (auto queueCore = queueCoreWeak.lock()) {
136 cores.emplace_back(std::move(queueCore));
143 for (auto& core : cores) {
144 manager_.scheduleRefresh(std::move(core), manager_.version_, true);
150 void add(Core::WeakPtr core) {
151 queue_.blockingWrite(std::move(core));
156 // Write to the queue to notify the thread.
157 queue_.blockingWrite(Core::WeakPtr());
162 ObserverManager& manager_;
163 MPMCQueue<Core::WeakPtr> queue_;
165 std::atomic<bool> stop_{false};
168 ObserverManager::ObserverManager() {
169 currentQueue_ = std::make_unique<CurrentQueue>();
170 nextQueue_ = std::make_unique<NextQueue>(*this);
173 ObserverManager::~ObserverManager() {
174 // Destroy NextQueue, before the rest of this object, since it expects
175 // ObserverManager to be alive.
177 currentQueue_.reset();
180 void ObserverManager::scheduleCurrent(Function<void()> task) {
181 currentQueue_->add(std::move(task));
184 void ObserverManager::scheduleNext(Core::WeakPtr core) {
185 nextQueue_->add(std::move(core));
188 struct ObserverManager::Singleton {
189 static folly::Singleton<ObserverManager> instance;
190 // MSVC 2015 doesn't let us access ObserverManager's constructor if we
191 // try to use a lambda to initialize instance, so we have to create
192 // an actual function instead.
193 static ObserverManager* createManager() {
194 return new ObserverManager();
198 folly::Singleton<ObserverManager> ObserverManager::Singleton::instance(
201 std::shared_ptr<ObserverManager> ObserverManager::getInstance() {
202 return Singleton::instance.try_get();