From: Andrii Grynenko Date: Wed, 24 Aug 2016 18:24:33 +0000 (-0700) Subject: Cycle detection X-Git-Tag: v2016.08.29.00~18 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=76745f68de5354b1958282d72bce6c40b4d300ca;p=folly.git Cycle detection Summary: 1. This implements a GraphCycleDetector which can check if newly added edge belongs to a cycle in a directed graph. GraphCycleDetector is used to detect cycles between Observers when creator function is run. 2. This also fixes a bug where new dependencies could be saved even if Observer creator failed. Reviewed By: yfeldblum Differential Revision: D3746743 fbshipit-source-id: 99d10446c56fa4d8f7485f38309e8a282cd21bdf --- diff --git a/folly/Makefile.am b/folly/Makefile.am index a51285f4..fb3b5582 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -107,6 +107,7 @@ nobase_follyinclude_HEADERS = \ experimental/LockFreeRingBuffer.h \ experimental/NestedCommandLineApp.h \ experimental/observer/detail/Core.h \ + experimental/observer/detail/GraphCycleDetector.h \ experimental/observer/detail/ObserverManager.h \ experimental/observer/detail/Observer-pre.h \ experimental/observer/Observable.h \ diff --git a/folly/experimental/observer/detail/Core.cpp b/folly/experimental/observer/detail/Core.cpp index c4e317ec..a0372bff 100644 --- a/folly/experimental/observer/detail/Core.cpp +++ b/folly/experimental/observer/detail/Core.cpp @@ -41,12 +41,15 @@ Core::VersionedData Core::getData() { size_t Core::refresh(size_t version, bool force) { CHECK(ObserverManager::inManagerThread()); + ObserverManager::DependencyRecorder::markRefreshDependency(*this); + SCOPE_EXIT { + ObserverManager::DependencyRecorder::unmarkRefreshDependency(*this); + }; + if (version_ >= version) { return versionLastChange_; } - bool refreshDependents = false; - { std::lock_guard lgRefresh(refreshMutex_); @@ -57,11 +60,21 @@ size_t Core::refresh(size_t version, bool force) { bool needRefresh = force || version_ == 0; + ObserverManager::DependencyRecorder dependencyRecorder(*this); + // This can be run in parallel, but we expect most updates to propagate // bottom to top. dependencies_.withRLock([&](const Dependencies& dependencies) { for (const auto& dependency : dependencies) { - if (dependency->refresh(version) > version_) { + try { + if (dependency->refresh(version) > version_) { + needRefresh = true; + break; + } + } catch (...) { + LOG(ERROR) << "Exception while checking dependencies for updates: " + << exceptionStr(std::current_exception()); + needRefresh = true; break; } @@ -73,8 +86,6 @@ size_t Core::refresh(size_t version, bool force) { return versionLastChange_; } - ObserverManager::DependencyRecorder dependencyRecorder; - try { { VersionedData newData{creator_(), version}; @@ -85,7 +96,6 @@ size_t Core::refresh(size_t version, bool force) { } versionLastChange_ = version; - refreshDependents = true; } catch (...) { LOG(ERROR) << "Exception while refreshing Observer: " << exceptionStr(std::current_exception()); @@ -98,6 +108,10 @@ size_t Core::refresh(size_t version, bool force) { version_ = version; + if (versionLastChange_ != version) { + return versionLastChange_; + } + auto newDependencies = dependencyRecorder.release(); dependencies_.withWLock([&](Dependencies& dependencies) { for (const auto& dependency : newDependencies) { @@ -116,13 +130,11 @@ size_t Core::refresh(size_t version, bool force) { }); } - if (refreshDependents) { - auto dependents = dependents_.copy(); + auto dependents = dependents_.copy(); - for (const auto& dependentWeak : dependents) { - if (auto dependent = dependentWeak.lock()) { - ObserverManager::scheduleRefresh(std::move(dependent), version); - } + for (const auto& dependentWeak : dependents) { + if (auto dependent = dependentWeak.lock()) { + ObserverManager::scheduleRefresh(std::move(dependent), version); } } diff --git a/folly/experimental/observer/detail/GraphCycleDetector.h b/folly/experimental/observer/detail/GraphCycleDetector.h new file mode 100644 index 00000000..5b0945a0 --- /dev/null +++ b/folly/experimental/observer/detail/GraphCycleDetector.h @@ -0,0 +1,80 @@ +/* + * Copyright 2016 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +namespace folly { +namespace observer_detail { + +template +class GraphCycleDetector { + using NodeSet = std::unordered_set; + + public: + /** + * Add new edge. If edge creates a cycle then it's not added and false is + * returned. + */ + bool addEdge(NodeId from, NodeId to) { + // In general case DFS may be expensive here, but in most cases to-node will + // have no edges, so it should be O(1). + NodeSet visitedNodes; + dfs(visitedNodes, to); + if (visitedNodes.count(from)) { + return false; + } + + auto& nodes = edges_[from]; + DCHECK_EQ(0, nodes.count(to)); + nodes.insert(to); + + return true; + } + + void removeEdge(NodeId from, NodeId to) { + auto& nodes = edges_[from]; + DCHECK(nodes.count(to)); + nodes.erase(to); + if (nodes.empty()) { + edges_.erase(from); + } + } + + private: + void dfs(NodeSet& visitedNodes, NodeId node) { + // We don't terminate early if cycle is detected, because this is considered + // an error condition, so not worth optimizing for. + if (visitedNodes.count(node)) { + return; + } + + visitedNodes.insert(node); + + if (!edges_.count(node)) { + return; + } + + for (const auto& to : edges_[node]) { + dfs(visitedNodes, to); + } + } + + std::unordered_map edges_; +}; +} +} diff --git a/folly/experimental/observer/detail/ObserverManager.h b/folly/experimental/observer/detail/ObserverManager.h index 6319bedb..49ceb67c 100644 --- a/folly/experimental/observer/detail/ObserverManager.h +++ b/folly/experimental/observer/detail/ObserverManager.h @@ -16,6 +16,7 @@ #pragma once #include +#include #include namespace folly { @@ -109,9 +110,15 @@ class ObserverManager { class DependencyRecorder { public: - using Dependencies = std::unordered_set; + using DependencySet = std::unordered_set; + struct Dependencies { + explicit Dependencies(const Core& core_) : core(core_) {} - DependencyRecorder() { + DependencySet dependencies; + const Core& core; + }; + + explicit DependencyRecorder(const Core& core) : dependencies_(core) { DCHECK(inManagerThread()); previousDepedencies_ = currentDependencies_; @@ -122,19 +129,47 @@ class ObserverManager { DCHECK(inManagerThread()); DCHECK(currentDependencies_); - currentDependencies_->insert(std::move(dependency)); + currentDependencies_->dependencies.insert(std::move(dependency)); + } + + static void markRefreshDependency(const Core& core) { + if (!currentDependencies_) { + return; + } + + if (auto instance = getInstance()) { + instance->cycleDetector_.withLock([&](CycleDetector& cycleDetector) { + bool hasCycle = + !cycleDetector.addEdge(¤tDependencies_->core, &core); + if (hasCycle) { + throw std::logic_error("Observer cycle detected."); + } + }); + } + } + + static void unmarkRefreshDependency(const Core& core) { + if (!currentDependencies_) { + return; + } + + if (auto instance = getInstance()) { + instance->cycleDetector_.withLock([&](CycleDetector& cycleDetector) { + cycleDetector.removeEdge(¤tDependencies_->core, &core); + }); + } } - Dependencies release() { + DependencySet release() { DCHECK(currentDependencies_ == &dependencies_); std::swap(currentDependencies_, previousDepedencies_); previousDepedencies_ = nullptr; - return std::move(dependencies_); + return std::move(dependencies_.dependencies); } ~DependencyRecorder() { - if (previousDepedencies_) { + if (currentDependencies_ == &dependencies_) { release(); } } @@ -176,6 +211,9 @@ class ObserverManager { */ SharedMutexReadPriority versionMutex_; std::atomic version_{1}; + + using CycleDetector = GraphCycleDetector; + folly::Synchronized cycleDetector_; }; } } diff --git a/folly/experimental/observer/test/ObserverTest.cpp b/folly/experimental/observer/test/ObserverTest.cpp index abfbc368..6ace4203 100644 --- a/folly/experimental/observer/test/ObserverTest.cpp +++ b/folly/experimental/observer/test/ObserverTest.cpp @@ -148,6 +148,64 @@ TEST(Observer, NullValue) { EXPECT_EQ(46, **oddObserver); } +TEST(Observer, Cycle) { + SimpleObservable observable(0); + auto observer = observable.getObserver(); + folly::Optional> observerB; + + auto observerA = makeObserver([observer, &observerB]() { + auto value = **observer; + if (value == 1) { + **observerB; + } + return value; + }); + + observerB = makeObserver([observerA]() { return **observerA; }); + + auto collectObserver = makeObserver([observer, observerA, &observerB]() { + auto value = **observer; + auto valueA = **observerA; + auto valueB = ***observerB; + + if (value == 1) { + if (valueA == 0) { + EXPECT_EQ(0, valueB); + } else { + EXPECT_EQ(1, valueA); + EXPECT_EQ(0, valueB); + } + } else if (value == 2) { + EXPECT_EQ(value, valueA); + EXPECT_TRUE(valueB == 0 || valueB == 2); + } else { + EXPECT_EQ(value, valueA); + EXPECT_EQ(value, valueB); + } + + return value; + }); + + folly::Baton<> baton; + auto waitingObserver = makeObserver([collectObserver, &baton]() { + *collectObserver; + baton.post(); + return folly::Unit(); + }); + + baton.reset(); + EXPECT_EQ(0, **collectObserver); + + for (size_t i = 1; i <= 3; ++i) { + observable.setValue(i); + + EXPECT_TRUE(baton.timed_wait(std::chrono::seconds{1})); + baton.reset(); + + EXPECT_EQ(i, **collectObserver); + } +} + TEST(Observer, Stress) { SimpleObservable observable(0);