2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 #include <folly/experimental/observer/detail/ObserverManager.h>
18 #include <folly/MPMCQueue.h>
19 #include <folly/Singleton.h>
22 namespace observer_detail {
// Out-of-class definitions of ObserverManager's FOLLY_TLS static members.
//
// inManagerThread_ is initialised to false; the CurrentQueue worker lambda
// in this file sets it to true for the thread it runs on.
24 FOLLY_TLS bool ObserverManager::inManagerThread_{false};
// Initialised to nullptr. Presumably points at the dependency set being
// recorded on the current thread — confirm against DependencyRecorder in the
// header, which is not visible from here.
25 FOLLY_TLS ObserverManager::DependencyRecorder::Dependencies*
26 ObserverManager::DependencyRecorder::currentDependencies_{nullptr};
// Sizing constants for the two work queues defined in this file.
//
// Number of worker threads started by CurrentQueue's constructor.
29 constexpr size_t kCurrentThreadPoolSize{4};
// Capacity passed to CurrentQueue's MPMCQueue.
30 constexpr size_t kCurrentQueueSize{10 * 1024};
// Capacity passed to NextQueue's MPMCQueue; also the upper bound on how many
// cores NextQueue's worker collects into a single batch.
31 constexpr size_t kNextQueueSize{10 * 1024};
// Task queue for ObserverManager: an MPMCQueue of Function<void()> drained by
// kCurrentThreadPoolSize worker threads.
//
// NOTE(review): this listing is incomplete — the embedded line numbers skip
// (e.g. 39 -> 42, 43 -> 52, 53 -> 61), so the worker loop, the try/catch, the
// destructor header and the access specifiers are not visible. The comments
// below describe only the statements that are visible; anything about the
// elided lines is marked as an assumption.
34 class ObserverManager::CurrentQueue {
// Creates the queue with capacity kCurrentQueueSize and starts the workers.
36 CurrentQueue() : queue_(kCurrentQueueSize) {
37 for (size_t i = 0; i < kCurrentThreadPoolSize; ++i) {
// The lambda captures by reference, so each worker operates on this object's
// queue_.
38 threads_.emplace_back([&]() {
// Flag this thread as a manager thread. add() branches on
// ObserverManager::inManagerThread(), which presumably reads this flag.
39 ObserverManager::inManagerThread_ = true;
// Wait until a task is available.
42 Function<void()> task;
43 queue_.blockingRead(task);
// An exception is logged here; the enclosing try/catch that runs the task is
// presumably in the elided lines.
52 LOG(ERROR) << "Exception while running CurrentQueue task: "
53 << exceptionStr(std::current_exception());
// Teardown (destructor header elided): write one null task per worker thread.
// Presumably a null task is the shutdown sentinel — the check that consumes
// it is not visible in this listing.
61 for (size_t i = 0; i < threads_.size(); ++i) {
62 queue_.blockingWrite(nullptr);
// Loop body elided in this listing; presumably joins each worker.
65 for (auto& thread : threads_) {
// Nothing may remain in the queue at this point.
69 CHECK(queue_.isEmpty());
// Enqueues `task`.
//
// When called on a manager thread the write is non-blocking, and
// std::runtime_error is thrown if that write fails. Otherwise (the `else` is
// presumably in the elided lines) a blocking write is used.
// NOTE(review): presumably the non-blocking path exists so that a worker
// never blocks on a queue that only workers drain — confirm.
72 void add(Function<void()> task) {
73 if (ObserverManager::inManagerThread()) {
74 if (!queue_.write(std::move(task))) {
75 throw std::runtime_error("Too many Observers scheduled for update.");
78 queue_.blockingWrite(std::move(task));
// Pending tasks.
83 MPMCQueue<Function<void()>> queue_;
// Worker threads started by the constructor.
84 std::vector<std::thread> threads_;
// Queue of Core::Ptr serviced by one background thread, which gathers queued
// cores into a batch and passes each one to ObserverManager::scheduleRefresh.
//
// NOTE(review): this listing is incomplete — the embedded line numbers skip
// (e.g. 91 -> 95, 95 -> 101, 113 -> 119), so the outer loop, the null checks,
// the version bump, the destructor header and the thread_ member declaration
// are not visible. Comments describe only the visible statements.
87 class ObserverManager::NextQueue {
// Stores a reference to the owning manager, creates the queue with capacity
// kNextQueueSize and starts the worker thread.
89 explicit NextQueue(ObserverManager& manager)
90 : manager_(manager), queue_(kNextQueueSize) {
// Captures by reference: the lambda uses this object's queue_ and manager_.
91 thread_ = std::thread([&]() {
// Wait until at least one core is queued.
95 queue_.blockingRead(queueCore);
// Start a new batch with the core that was just read.
101 std::vector<Core::Ptr> cores;
102 cores.emplace_back(std::move(queueCore));
// Take the manager's versionMutex_ as a writer while the rest of the batch is
// drained from the queue.
105 SharedMutexReadPriority::WriteHolder wh(manager_.versionMutex_);
107 // We can't pick more tasks from the queue after we bumped the
108 // version, so we have to do this while holding the lock.
// Non-blocking reads: the loop ends when a read fails or the batch already
// holds kNextQueueSize cores.
109 while (cores.size() < kNextQueueSize && queue_.read(queueCore)) {
113 cores.emplace_back(std::move(queueCore));
// Hand every core in the batch to the manager together with
// manager_.version_. The version bump mentioned in the comment above is
// presumably in the elided lines.
119 for (auto& core : cores) {
120 manager_.scheduleRefresh(std::move(core), manager_.version_, true);
// Enqueues `core` for the worker thread using a blocking write.
126 void add(Core::Ptr core) {
127 queue_.blockingWrite(std::move(core));
// Teardown (destructor header elided in this listing).
131 // Empty element signals thread to terminate
132 queue_.blockingWrite(nullptr);
// Owning manager, held by reference; the worker thread dereferences it, so
// the manager has to stay alive while this queue exists.
137 ObserverManager& manager_;
// Cores waiting to be batched by the worker thread.
138 MPMCQueue<Core::Ptr> queue_;
// Constructs the manager by creating its two queues. Both constructors start
// their worker threads, and NextQueue is given a reference to this manager.
142 ObserverManager::ObserverManager() {
143 currentQueue_ = make_unique<CurrentQueue>();
144 nextQueue_ = make_unique<NextQueue>(*this);
// Destroys the queues explicitly rather than relying on member destruction
// order.
147 ObserverManager::~ObserverManager() {
148 // Destroy NextQueue, before the rest of this object, since it expects
149 // ObserverManager to be alive.
// NOTE(review): the statement that destroys nextQueue_ is not visible in this
// listing (embedded line 150 is missing) — confirm that it precedes the reset
// below.
151 currentQueue_.reset();
// Forwards `task` to CurrentQueue::add. As that method shows, this can throw
// std::runtime_error when called on a manager thread and the non-blocking
// write fails.
154 void ObserverManager::scheduleCurrent(Function<void()> task) {
155 currentQueue_->add(std::move(task));
// Forwards `core` to NextQueue::add, which enqueues it with a blocking write
// for the NextQueue worker thread.
158 void ObserverManager::scheduleNext(Core::Ptr core) {
159 nextQueue_->add(std::move(core));
// Holder for the folly::Singleton<ObserverManager> instance and the factory
// function that creates the manager.
162 struct ObserverManager::Singleton {
// Declaration only; the definition follows the struct.
163 static folly::Singleton<ObserverManager> instance;
164 // MSVC 2015 doesn't let us access ObserverManager's constructor if we
165 // try to use a lambda to initialize instance, so we have to create
166 // an actual function instead.
// Returns a newly heap-allocated ObserverManager. Ownership is presumably
// taken by folly::Singleton — confirm against the definition of `instance`.
167 static ObserverManager* createManager() {
168 return new ObserverManager();
// Definition of the static singleton member declared in
// ObserverManager::Singleton.
// NOTE(review): the constructor argument is elided in this listing (embedded
// line 173 is missing); presumably it is createManager — confirm.
172 folly::Singleton<ObserverManager> ObserverManager::Singleton::instance(
// Returns the result of Singleton::instance.try_get() as a shared_ptr.
// NOTE(review): per the folly::Singleton documentation, try_get() can return
// an empty pointer (e.g. during shutdown) — callers presumably need to check
// for null; confirm.
175 std::shared_ptr<ObserverManager> ObserverManager::getInstance() {
176 return Singleton::instance.try_get();