experimental/LockFreeRingBuffer.h \
experimental/NestedCommandLineApp.h \
experimental/observer/detail/Core.h \
+ experimental/observer/detail/GraphCycleDetector.h \
experimental/observer/detail/ObserverManager.h \
experimental/observer/detail/Observer-pre.h \
experimental/observer/Observable.h \
size_t Core::refresh(size_t version, bool force) {
CHECK(ObserverManager::inManagerThread());
+ ObserverManager::DependencyRecorder::markRefreshDependency(*this);
+ SCOPE_EXIT {
+ ObserverManager::DependencyRecorder::unmarkRefreshDependency(*this);
+ };
+
if (version_ >= version) {
return versionLastChange_;
}
- bool refreshDependents = false;
-
{
std::lock_guard<std::mutex> lgRefresh(refreshMutex_);
bool needRefresh = force || version_ == 0;
+ ObserverManager::DependencyRecorder dependencyRecorder(*this);
+
// This can be run in parallel, but we expect most updates to propagate
// bottom to top.
dependencies_.withRLock([&](const Dependencies& dependencies) {
for (const auto& dependency : dependencies) {
- if (dependency->refresh(version) > version_) {
+ try {
+ if (dependency->refresh(version) > version_) {
+ needRefresh = true;
+ break;
+ }
+ } catch (...) {
+ LOG(ERROR) << "Exception while checking dependencies for updates: "
+ << exceptionStr(std::current_exception());
+
needRefresh = true;
break;
}
return versionLastChange_;
}
- ObserverManager::DependencyRecorder dependencyRecorder;
-
try {
{
VersionedData newData{creator_(), version};
}
versionLastChange_ = version;
- refreshDependents = true;
} catch (...) {
LOG(ERROR) << "Exception while refreshing Observer: "
<< exceptionStr(std::current_exception());
version_ = version;
+ if (versionLastChange_ != version) {
+ return versionLastChange_;
+ }
+
auto newDependencies = dependencyRecorder.release();
dependencies_.withWLock([&](Dependencies& dependencies) {
for (const auto& dependency : newDependencies) {
});
}
- if (refreshDependents) {
- auto dependents = dependents_.copy();
+ auto dependents = dependents_.copy();
- for (const auto& dependentWeak : dependents) {
- if (auto dependent = dependentWeak.lock()) {
- ObserverManager::scheduleRefresh(std::move(dependent), version);
- }
+ for (const auto& dependentWeak : dependents) {
+ if (auto dependent = dependentWeak.lock()) {
+ ObserverManager::scheduleRefresh(std::move(dependent), version);
}
}
--- /dev/null
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <unordered_map>
+#include <unordered_set>
+
+namespace folly {
+namespace observer_detail {
+
+template <typename NodeId>
+class GraphCycleDetector {
+ using NodeSet = std::unordered_set<NodeId>;
+
+ public:
+ /**
+ * Add new edge. If edge creates a cycle then it's not added and false is
+ * returned.
+ */
+ bool addEdge(NodeId from, NodeId to) {
+ // In general case DFS may be expensive here, but in most cases to-node will
+ // have no edges, so it should be O(1).
+ NodeSet visitedNodes;
+ dfs(visitedNodes, to);
+ if (visitedNodes.count(from)) {
+ return false;
+ }
+
+ auto& nodes = edges_[from];
+ DCHECK_EQ(0, nodes.count(to));
+ nodes.insert(to);
+
+ return true;
+ }
+
+ void removeEdge(NodeId from, NodeId to) {
+ auto& nodes = edges_[from];
+ DCHECK(nodes.count(to));
+ nodes.erase(to);
+ if (nodes.empty()) {
+ edges_.erase(from);
+ }
+ }
+
+ private:
+ void dfs(NodeSet& visitedNodes, NodeId node) {
+ // We don't terminate early if cycle is detected, because this is considered
+ // an error condition, so not worth optimizing for.
+ if (visitedNodes.count(node)) {
+ return;
+ }
+
+ visitedNodes.insert(node);
+
+ if (!edges_.count(node)) {
+ return;
+ }
+
+ for (const auto& to : edges_[node]) {
+ dfs(visitedNodes, to);
+ }
+ }
+
+ std::unordered_map<NodeId, NodeSet> edges_;
+};
+}
+}
#pragma once
#include <folly/experimental/observer/detail/Core.h>
+#include <folly/experimental/observer/detail/GraphCycleDetector.h>
#include <folly/futures/Future.h>
namespace folly {
class DependencyRecorder {
public:
- using Dependencies = std::unordered_set<Core::Ptr>;
+ using DependencySet = std::unordered_set<Core::Ptr>;
+ struct Dependencies {
+ explicit Dependencies(const Core& core_) : core(core_) {}
- DependencyRecorder() {
+ DependencySet dependencies;
+ const Core& core;
+ };
+
+ explicit DependencyRecorder(const Core& core) : dependencies_(core) {
DCHECK(inManagerThread());
previousDepedencies_ = currentDependencies_;
DCHECK(inManagerThread());
DCHECK(currentDependencies_);
- currentDependencies_->insert(std::move(dependency));
+ currentDependencies_->dependencies.insert(std::move(dependency));
+ }
+
+ static void markRefreshDependency(const Core& core) {
+ if (!currentDependencies_) {
+ return;
+ }
+
+ if (auto instance = getInstance()) {
+ instance->cycleDetector_.withLock([&](CycleDetector& cycleDetector) {
+ bool hasCycle =
+ !cycleDetector.addEdge(¤tDependencies_->core, &core);
+ if (hasCycle) {
+ throw std::logic_error("Observer cycle detected.");
+ }
+ });
+ }
+ }
+
+ static void unmarkRefreshDependency(const Core& core) {
+ if (!currentDependencies_) {
+ return;
+ }
+
+ if (auto instance = getInstance()) {
+ instance->cycleDetector_.withLock([&](CycleDetector& cycleDetector) {
+ cycleDetector.removeEdge(¤tDependencies_->core, &core);
+ });
+ }
}
- Dependencies release() {
+ DependencySet release() {
DCHECK(currentDependencies_ == &dependencies_);
std::swap(currentDependencies_, previousDepedencies_);
previousDepedencies_ = nullptr;
- return std::move(dependencies_);
+ return std::move(dependencies_.dependencies);
}
~DependencyRecorder() {
- if (previousDepedencies_) {
+ if (currentDependencies_ == &dependencies_) {
release();
}
}
*/
SharedMutexReadPriority versionMutex_;
std::atomic<size_t> version_{1};
+
+ using CycleDetector = GraphCycleDetector<const Core*>;
+ folly::Synchronized<CycleDetector, std::mutex> cycleDetector_;
};
}
}
EXPECT_EQ(46, **oddObserver);
}
+TEST(Observer, Cycle) {
+ SimpleObservable<int> observable(0);
+ auto observer = observable.getObserver();
+ folly::Optional<Observer<int>> observerB;
+
+ auto observerA = makeObserver([observer, &observerB]() {
+ auto value = **observer;
+ if (value == 1) {
+ **observerB;
+ }
+ return value;
+ });
+
+ observerB = makeObserver([observerA]() { return **observerA; });
+
+ auto collectObserver = makeObserver([observer, observerA, &observerB]() {
+ auto value = **observer;
+ auto valueA = **observerA;
+ auto valueB = ***observerB;
+
+ if (value == 1) {
+ if (valueA == 0) {
+ EXPECT_EQ(0, valueB);
+ } else {
+ EXPECT_EQ(1, valueA);
+ EXPECT_EQ(0, valueB);
+ }
+ } else if (value == 2) {
+ EXPECT_EQ(value, valueA);
+ EXPECT_TRUE(valueB == 0 || valueB == 2);
+ } else {
+ EXPECT_EQ(value, valueA);
+ EXPECT_EQ(value, valueB);
+ }
+
+ return value;
+ });
+
+ folly::Baton<> baton;
+ auto waitingObserver = makeObserver([collectObserver, &baton]() {
+ *collectObserver;
+ baton.post();
+ return folly::Unit();
+ });
+
+ baton.reset();
+ EXPECT_EQ(0, **collectObserver);
+
+ for (size_t i = 1; i <= 3; ++i) {
+ observable.setValue(i);
+
+ EXPECT_TRUE(baton.timed_wait(std::chrono::seconds{1}));
+ baton.reset();
+
+ EXPECT_EQ(i, **collectObserver);
+ }
+}
+
TEST(Observer, Stress) {
SimpleObservable<int> observable(0);