2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/experimental/observer/detail/Core.h>
19 #include <folly/ExceptionString.h>
20 #include <folly/experimental/observer/detail/ObserverManager.h>
23 namespace observer_detail {
// Returns this Core's VersionedData.
// The visible code distinguishes callers on an ObserverManager thread from
// all other callers, hands this Core to DependencyRecorder::markDependency,
// and compares version_ against the version reported by ObserverManager.
// NOTE(review): the return statements and whatever runs between the version
// comparison and the DCHECK are not visible in this excerpt -- confirm
// against the full file.
25 Core::VersionedData Core::getData() {
// Branch taken by callers that are NOT on an ObserverManager thread.
26 if (!ObserverManager::inManagerThread()) {
// On a manager thread: pass a shared_ptr to this Core to the dependency
// recorder.
30 ObserverManager::DependencyRecorder::markDependency(shared_from_this());
// Version currently reported by ObserverManager; compared with version_ below.
32 auto version = ObserverManager::getVersion();
// version_ has already reached (or passed) the manager's version.
34 if (version_ >= version) {
// Sanity check that version_ is at least the manager's version at this point.
40 DCHECK_GE(version_, version);
// Refreshes this Core for the requested `version` and returns
// versionLastChange_.
//
// version: the version this Core is being brought up to.
// force:   when true, needRefresh starts out true regardless of what the
//          dependencies report.
//
// Must be called on an ObserverManager thread (enforced by the CHECK below).
// NOTE(review): this excerpt omits a number of lines (closing braces, the
// try/catch scaffolding around the logging statements, some assignments).
// The comments below describe only what the visible lines do.
44 size_t Core::refresh(size_t version, bool force) {
// Fatal check: refresh is only valid on a manager thread.
45 CHECK(ObserverManager::inManagerThread());
// Paired mark/unmark calls on the DependencyRecorder for this Core.
// NOTE(review): the construct that sequences the unmark call (presumably a
// scope guard) is not visible here -- confirm.
47 ObserverManager::DependencyRecorder::markRefreshDependency(*this);
49 ObserverManager::DependencyRecorder::unmarkRefreshDependency(*this);
// Early exit taken before refreshMutex_ is acquired: this Core is already at
// or past the requested version.
52 if (version_ >= version) {
53 return versionLastChange_;
// Hold refreshMutex_ for the work that follows.
57 std::lock_guard<std::mutex> lgRefresh(refreshMutex_);
59 // Recheck in case this code was already refreshed
60 if (version_ >= version) {
61 return versionLastChange_;
// Start with needRefresh set when the caller forces it or when version_ is 0.
// NOTE(review): version_ == 0 presumably means "never refreshed" -- confirm.
64 bool needRefresh = force || version_ == 0;
// Recorder bound to this Core; its release() result below becomes the new
// dependency set.
66 ObserverManager::DependencyRecorder dependencyRecorder(*this);
68 // This can be run in parallel, but we expect most updates to propagate
// Under the read lock, refresh every current dependency to `version`.
// refresh() returns the dependency's versionLastChange_, so a result greater
// than this Core's version_ means the dependency changed after this Core was
// last brought up to date.
// NOTE(review): what happens when the comparison is true is not visible here.
70 dependencies_.withRLock([&](const Dependencies& dependencies) {
71 for (const auto& dependency : dependencies) {
73 if (dependency->refresh(version) > version_) {
// Logs the in-flight exception; the enclosing handler is not visible here.
78 LOG(ERROR) << "Exception while checking dependencies for updates: "
79 << exceptionStr(std::current_exception());
// NOTE(review): the condition guarding this return is not visible here.
89 return versionLastChange_;
// Invoke creator_ and pair its result with the requested version.
94 VersionedData newData{creator_(), version};
// NOTE(review): the guard for this throw is not visible; per the message it
// is raised when creator_ returned nullptr.
96 throw std::logic_error("Observer creator returned nullptr.");
// Record `version` as the version at which this Core's data last changed.
101 versionLastChange_ = version;
// Logs the in-flight exception; the enclosing handler is not visible here.
103 LOG(ERROR) << "Exception while refreshing Observer: "
104 << exceptionStr(std::current_exception());
107 // Re-throw exception if this is the first time we run creator
// versionLastChange_ was not advanced to `version`: return before the
// dependency set is updated and before dependents are scheduled.
114 if (versionLastChange_ != version) {
115 return versionLastChange_;
// Take the set of dependencies collected by the recorder.
118 auto newDependencies = dependencyRecorder.release();
// Under the write lock, reconcile the old and new dependency sets.
119 dependencies_.withWLock([&](Dependencies& dependencies) {
// Newly recorded dependencies: register this Core as their dependent.
120 for (const auto& dependency : newDependencies) {
121 if (!dependencies.count(dependency)) {
122 dependency->addDependent(this->shared_from_this());
// Dependencies that are no longer recorded: ask them to prune their dependents.
// NOTE(review): removeStaleDependents(), as visible in this file, only tests
// expired() on each dependent -- confirm that this is sufficient here.
126 for (const auto& dependency : dependencies) {
127 if (!newDependencies.count(dependency)) {
128 dependency->removeStaleDependents();
// Replace the stored dependency set with the newly recorded one.
132 dependencies = std::move(newDependencies);
// Iterate over a copy of dependents_ rather than the member itself.
136 auto dependents = dependents_.copy();
// Schedule a refresh at `version` for every dependent whose weak pointer can
// still be locked.
138 for (const auto& dependentWeak : dependents) {
139 if (auto dependent = dependentWeak.lock()) {
140 ObserverManager::scheduleRefresh(std::move(dependent), version);
144 return versionLastChange_;
// Constructs a Core that takes ownership of `creator` by moving it into
// creator_. creator_ is the callable that refresh() invokes to produce new
// data.
147 Core::Core(folly::Function<std::shared_ptr<const void>()> creator)
148 : creator_(std::move(creator)) {}
// NOTE(review): the header of the enclosing definition is not visible in this
// excerpt; presumably this is the body of Core's destructor -- confirm.
// Takes the write lock on dependencies_ (the callback itself only receives a
// const reference) and asks every dependency to prune its dependents.
151 dependencies_.withWLock([](const Dependencies& dependencies) {
152 for (const auto& dependecy : dependencies) {
153 dependecy->removeStaleDependents();
// Factory for Core: allocates a new Core that owns `creator` and wraps it in
// a Core::Ptr.
// NOTE(review): the remainder of this function (including its return) is not
// visible in this excerpt.
158 Core::Ptr Core::create(folly::Function<std::shared_ptr<const void>()> creator) {
159 auto core = Core::Ptr(new Core(std::move(creator)));
// Appends `dependent` (a weak pointer to another Core) to dependents_ while
// holding the write lock. No check for an existing entry is visible.
163 void Core::addDependent(Core::WeakPtr dependent) {
164 dependents_.withWLock([&](Dependents& dependents) {
165 dependents.push_back(std::move(dependent));
// Drops expired weak pointers from dependents_ while holding the write lock.
169 void Core::removeStaleDependents() {
170 // This is inefficient, the assumption is that we won't have many dependents
171 dependents_.withWLock([](Dependents& dependents) {
172 for (size_t i = 0; i < dependents.size(); ++i) {
173 if (dependents[i].expired()) {
// Unordered erase: move the last element into slot i, then shrink by one.
// NOTE(review): the rest of the loop body is not visible in this excerpt;
// confirm that the element swapped into slot i is itself re-checked.
174 std::swap(dependents[i], dependents.back());
175 dependents.pop_back();