2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 #include <folly/experimental/observer/detail/Core.h>
17 #include <folly/experimental/observer/detail/ObserverManager.h>
20 namespace observer_detail {
// Accessor for this Core's VersionedData. The path taken depends on whether
// the caller is running on an ObserverManager thread.
// NOTE(review): the return statements of this function are not visible in
// this excerpt.
22 Core::VersionedData Core::getData() {
// Branch taken by callers that are NOT on a manager thread.
23 if (!ObserverManager::inManagerThread()) {
// Report this Core (as a shared_ptr to itself) to
// DependencyRecorder::markDependency.
27 ObserverManager::DependencyRecorder::markDependency(shared_from_this());
// Version reported by ObserverManager, compared against version_ below.
29 auto version = ObserverManager::getVersion();
// Branch taken when this Core is already at or beyond that version.
31 if (version_ >= version) {
// Assert (via DCHECK) that version_ is at least `version` at this point.
37 DCHECK_GE(version_, version);
// Brings this Core up to `version` and returns versionLastChange_ (every
// visible return path yields that member).
//
// version: the version to refresh to; also passed on to dependencies and to
//          dependents that get scheduled.
// force:   seeds needRefresh (together with version_ == 0).
//
// Must be called on an ObserverManager thread; this is enforced with CHECK.
// May throw std::logic_error (see the creator_ result check below).
// NOTE(review): parts of this function are not visible in this excerpt.
41 size_t Core::refresh(size_t version, bool force) {
42 CHECK(ObserverManager::inManagerThread());
// Early exit: already at or beyond the requested version.
44 if (version_ >= version) {
45 return versionLastChange_;
// Set to true once versionLastChange_ is updated; gates the scheduling of
// dependents near the end of the function.
48 bool refreshDependents = false;
// Acquire refreshMutex_ for the refresh work that follows.
51 std::lock_guard<std::mutex> lgRefresh(refreshMutex_);
53 // Recheck in case this Core was already refreshed
54 if (version_ >= version) {
55 return versionLastChange_;
// A refresh is needed when forced, or when version_ is still 0.
58 bool needRefresh = force || version_ == 0;
60 // This can be run in parallel, but we expect most updates to propagate
// With a read lock on dependencies_: refresh each dependency to `version` and
// compare the value it returns against this Core's version_.
62 dependencies_.withRLock([&](const Dependencies& dependencies) {
63 for (const auto& dependency : dependencies) {
64 if (dependency->refresh(version) > version_) {
73 return versionLastChange_;
// Recorder whose release() (below) yields the new dependency set.
76 ObserverManager::DependencyRecorder dependencyRecorder;
// Invoke creator_ and pair its result with `version`.
80 VersionedData newData{creator_(), version};
// Error raised for a nullptr creator result (per the message text).
82 throw std::logic_error("Observer creator returned nullptr.");
// Record `version` as the last version at which this Core changed, and
// request that dependents be refreshed.
87 versionLastChange_ = version;
88 refreshDependents = true;
// Log the exception currently being handled.
90 LOG(ERROR) << "Exception while refreshing Observer: "
91 << exceptionStr(std::current_exception());
94 // Re-throw exception if this is the first time we run creator
// Replace the stored dependency set with the one just recorded, under a
// write lock on dependencies_.
101 auto newDependencies = dependencyRecorder.release();
102 dependencies_.withWLock([&](Dependencies& dependencies) {
// Dependencies that are new in this run: register this Core as a dependent.
103 for (const auto& dependency : newDependencies) {
104 if (!dependencies.count(dependency)) {
105 dependency->addDependent(this->shared_from_this());
// Dependencies no longer used: ask them to prune their dependents list.
// NOTE(review): removeStaleDependents() only drops expired weak pointers, and
// this Core is alive here — confirm how its entry leaves the dropped
// dependency's list.
109 for (const auto& dependency : dependencies) {
110 if (!newDependencies.count(dependency)) {
111 dependency->removeStaleDependents();
115 dependencies = std::move(newDependencies);
// Take a copy of dependents_, then schedule a refresh at `version` for every
// dependent whose weak pointer can still be locked.
119 if (refreshDependents) {
120 auto dependents = dependents_.copy();
122 for (const auto& dependentWeak : dependents) {
123 if (auto dependent = dependentWeak.lock()) {
124 ObserverManager::scheduleRefresh(std::move(dependent), version);
129 return versionLastChange_;
// Constructs a Core from `creator`, a callable returning
// std::shared_ptr<const void>; the callable is moved into creator_.
132 Core::Core(folly::Function<std::shared_ptr<const void>()> creator)
133 : creator_(std::move(creator)) {}
// NOTE(review): the header of the enclosing function is not visible in this
// excerpt — presumably the destructor; confirm.
// Under the lock taken by dependencies_.withWLock, ask every dependency to
// prune its dependents list via removeStaleDependents().
136 dependencies_.withWLock([](const Dependencies& dependencies) {
137 for (const auto& dependecy : dependencies) {
138 dependecy->removeStaleDependents();
// Factory: allocates a Core with `new`, moving `creator` into it, and wraps
// the raw pointer in a Core::Ptr.
// NOTE(review): the return statement is not visible in this excerpt.
143 Core::Ptr Core::create(folly::Function<std::shared_ptr<const void>()> creator) {
144 auto core = Core::Ptr(new Core(std::move(creator)));
// Appends `dependent` (a weak pointer to another Core) to dependents_ while
// holding its write lock. The pointer is moved into the container.
148 void Core::addDependent(Core::WeakPtr dependent) {
149 dependents_.withWLock([&](Dependents& dependents) {
150 dependents.push_back(std::move(dependent));
// Removes expired weak pointers from dependents_ while holding its write
// lock. Element order is not preserved: an expired entry is swapped with the
// last element and then popped.
154 void Core::removeStaleDependents() {
155 // This is inefficient, the assumption is that we won't have many dependents
156 dependents_.withWLock([](Dependents& dependents) {
157 for (size_t i = 0; i < dependents.size(); ++i) {
158 if (dependents[i].expired()) {
// Swap-and-pop removal of the expired entry.
// NOTE(review): after the swap, index i holds the former last element;
// confirm that it is re-examined (the rest of the loop body is not visible in
// this excerpt).
159 std::swap(dependents[i], dependents.back());
160 dependents.pop_back();