2 * Copyright 2016-present Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include <folly/experimental/observer/detail/Core.h>
19 #include <folly/experimental/observer/detail/GraphCycleDetector.h>
20 #include <folly/futures/Future.h>
23 namespace observer_detail {
26 * ObserverManager is a singleton which controls the re-computation of all
27 * Observers. Such re-computation always happens on the thread pool owned by ObserverManager.
30 * ObserverManager has global current version. All existing Observers
31 * may have their version be less (yet to be updated) or equal (up to date)
32 * to the global current version.
34 * ObserverManager::CurrentQueue contains all of the Observers which need to be
35 * updated to the global current version. Those updates are performed on the
36 * ObserverManager's thread pool, until the queue is empty. If some Observer is
37 * updated, all of its dependents are added to ObserverManager::CurrentQueue.
40 * If some leaf Observer (i.e. created from Observable) is updated, then current
41 * version of the ObserverManager should be bumped. All such updated leaf
42 * Observers are added to the ObserverManager::NextQueue.
44 * *Only* when ObserverManager::CurrentQueue is empty, the global current
45 * version is bumped and all updates from the ObserverManager::NextQueue are
46 * performed. If a leaf Observer gets updated more than once before being picked
47 * from the ObserverManager::NextQueue, then only the last update is processed.
49 class ObserverManager {
// Returns the version_ counter of the ObserverManager obtained from
// getInstance().
// NOTE(review): this listing jumps from original line 52 to line 58, so
// whatever sits between fetching the instance and dereferencing it (for
// example a null check) is not visible here - confirm against the full file.
51 static size_t getVersion() {
52 auto instance = getInstance();
58 return instance->version_;
// Reports the value of the inManagerThread_ flag, which this class declares
// as a static FOLLY_TLS bool.
61 static bool inManagerThread() {
62 return inManagerThread_;
// Schedules a refresh of `core` on the manager and returns a future that
// tracks it.
//
//   core        Core to bring up to date.
//   minVersion  If core->getVersion() is already >= minVersion, no work is
//               queued and a future made from Unit() is returned right away.
//   force       Passed through unchanged to core->refresh().
//
// On the scheduling path a Promise<Unit>/future pair is created, a read
// holder on instance->versionMutex_ is constructed, and a task is handed to
// instance->scheduleCurrent(). That task completes the promise through
// promise.setWith(), whose callback calls
// core->refresh(instancePtr->version_, force).
//
// NOTE(review): this listing skips several original lines (the return type
// on line 65, the condition in front of the "requested during shutdown"
// future, the tail of the lambda capture list and the final return
// statement). Confirm those parts against the full file before relying on
// this description.
66 scheduleRefresh(Core::Ptr core, size_t minVersion, bool force = false) {
67 if (core->getVersion() >= minVersion) {
68 return makeFuture<Unit>(Unit());
71 auto instance = getInstance();
// Error future for the case where the manager cannot be used; the guarding
// condition (original lines 72-73) is not visible in this listing.
74 return makeFuture<Unit>(
75 std::logic_error("ObserverManager requested during shutdown"));
78 Promise<Unit> promise;
79 auto future = promise.getFuture();
// Take versionMutex_ as a reader before queueing the task.
81 SharedMutexReadPriority::ReadHolder rh(instance->versionMutex_);
83 instance->scheduleCurrent([
84 core = std::move(core),
85 promise = std::move(promise),
// Raw pointer to the manager, used inside the task to read version_.
86 instancePtr = instance.get(),
90 promise.setWith([&]() { core->refresh(instancePtr->version_, force); });
// Hands `coreWeak` to the manager instance's scheduleNext().
// NOTE(review): original lines 98-102 are missing from this listing, so any
// handling of an unavailable instance is not visible here - confirm against
// the full file.
96 static void scheduleRefreshNewVersion(Core::WeakPtr coreWeak) {
97 auto instance = getInstance();
103 instance->scheduleNext(std::move(coreWeak));
// Performs the first refresh of a freshly created Core: debug-checks that
// its version is still 0, then calls scheduleRefresh() with minVersion 1 and
// waits on the returned future via get().
106 static void initCore(Core::Ptr core) {
107 DCHECK(core->getVersion() == 0);
108 scheduleRefresh(std::move(core), 1).get();
// DependencyRecorder collects the Cores passed to markDependency() into a
// per-recorder Dependencies record while it is the active recorder.
111 class DependencyRecorder {
// Set of Core::Ptr values gathered by one recorder.
113 using DependencySet = std::unordered_set<Core::Ptr>;
// Pairs the Core a recorder was created for with the dependencies marked so
// far.
114 struct Dependencies {
115 explicit Dependencies(const Core& core_) : core(core_) {}
// NOTE(review): the declaration of the `core` member (original line 116) is
// missing from this listing.
117 DependencySet dependencies;
// Starts recording for `core`: saves the currently active target in
// previousDepedencies_ and makes this recorder's dependencies_ the active
// one. Debug-checks that the caller is on a manager thread.
121 explicit DependencyRecorder(const Core& core) : dependencies_(core) {
122 DCHECK(inManagerThread());
124 previousDepedencies_ = currentDependencies_;
125 currentDependencies_ = &dependencies_;
// Adds `dependency` to the dependency set of the currently active recorder.
// Debug-checks that the caller is on a manager thread and that a recorder is
// active (currentDependencies_ is non-null).
128 static void markDependency(Core::Ptr dependency) {
129 DCHECK(inManagerThread());
130 DCHECK(currentDependencies_);
132 currentDependencies_->dependencies.insert(std::move(dependency));
// Registers an edge from the Core of the currently active recorder to `core`
// in the manager's cycle detector, under the detector's lock (withLock).
// The visible code contains a throw of
// std::logic_error("Observer cycle detected.").
//
// Fix: the first addEdge() argument had been corrupted by an HTML-entity
// mis-decoding ("&curren" turned into a currency sign); it is restored to the
// address of currentDependencies_->core.
//
// NOTE(review): this listing omits original lines 137-139, 142 and 144, i.e.
// the body of the null-recorder branch, the variable receiving the negated
// addEdge() result, and the condition in front of the throw. Presumably the
// negated result signals a cycle and triggers the throw - confirm against the
// full file.
135 static void markRefreshDependency(const Core& core) {
136 if (!currentDependencies_) {
140 if (auto instance = getInstance()) {
141 instance->cycleDetector_.withLock([&](CycleDetector& cycleDetector) {
143 !cycleDetector.addEdge(&currentDependencies_->core, &core);
145 throw std::logic_error("Observer cycle detected.");
// Removes the edge from the Core of the currently active recorder to `core`
// from the manager's cycle detector, under the detector's lock (withLock).
//
// Fix: the first removeEdge() argument had been corrupted by an HTML-entity
// mis-decoding ("&curren" turned into a currency sign); it is restored to the
// address of currentDependencies_->core.
//
// NOTE(review): original lines 153-155 (the body of the null-recorder
// branch) are missing from this listing - confirm against the full file.
151 static void unmarkRefreshDependency(const Core& core) {
152 if (!currentDependencies_) {
156 if (auto instance = getInstance()) {
157 instance->cycleDetector_.withLock([&](CycleDetector& cycleDetector) {
158 cycleDetector.removeEdge(&currentDependencies_->core, &core);
// Stops recording and returns the collected dependency set by move.
// Debug-checks that this recorder is the active one.
163 DependencySet release() {
164 DCHECK(currentDependencies_ == &dependencies_);
// After the swap currentDependencies_ holds the target that was active
// before this recorder was constructed; the saved pointer is then cleared.
165 std::swap(currentDependencies_, previousDepedencies_);
166 previousDepedencies_ = nullptr;
168 return std::move(dependencies_.dependencies);
// Destructor: tests whether this recorder is still the active one.
// NOTE(review): the body of that branch (original lines 173-177) is missing
// from this listing; presumably it deactivates the recorder - confirm against
// the full file.
171 ~DependencyRecorder() {
172 if (currentDependencies_ == &dependencies_) {
// Record owned by this recorder: the Core it was constructed with plus the
// dependencies marked while it was active.
178 Dependencies dependencies_;
// Target that was active when this recorder was constructed; release() swaps
// it back into currentDependencies_ and then nulls this pointer.
179 Dependencies* previousDepedencies_;
// Currently active recording target (declared FOLLY_TLS); the static helpers
// treat a null value as "no recorder active".
181 static FOLLY_TLS Dependencies* currentDependencies_;
// Accepts a task; scheduleRefresh() passes its refresh task here.
// NOTE(review): definition not visible in this listing - presumably it
// enqueues onto currentQueue_; confirm.
191 void scheduleCurrent(Function<void()>);
// Accepts a weak Core reference; scheduleRefreshNewVersion() forwards to it.
// NOTE(review): definition not visible in this listing - presumably it
// enqueues onto nextQueue_; confirm.
192 void scheduleNext(Core::WeakPtr);
// Owned queue objects. CurrentQueue and NextQueue are declared on lines that
// are not part of this listing.
197 std::unique_ptr<CurrentQueue> currentQueue_;
198 std::unique_ptr<NextQueue> nextQueue_;
// Returns a shared_ptr to the manager; callers in this class test or
// dereference the result.
200 static std::shared_ptr<ObserverManager> getInstance();
// Flag returned by inManagerThread(); declared FOLLY_TLS.
201 static FOLLY_TLS bool inManagerThread_;
204 * Version mutex is used to make sure all updates are processed from the
205 * CurrentQueue, before bumping the version and moving to the NextQueue.
207 * To achieve this every task added to CurrentQueue holds a reader lock.
208 * NextQueue grabs a writer lock before bumping the version, so it can only
209 * happen if CurrentQueue is empty (notice that we use a read-priority shared mutex).
// Shared mutex protecting version changes; scheduleRefresh() constructs a
// ReadHolder on it before queueing a task.
212 SharedMutexReadPriority versionMutex_;
// Global current version, initialised to 1.
213 std::atomic<size_t> version_{1};
// Cycle detector keyed by const Core pointers, wrapped in folly::Synchronized
// with a std::mutex and accessed through withLock().
215 using CycleDetector = GraphCycleDetector<const Core*>;
216 folly::Synchronized<CycleDetector, std::mutex> cycleDetector_;
218 } // namespace observer_detail