From: Andrii Grynenko Date: Wed, 17 Aug 2016 02:25:54 +0000 (-0700) Subject: folly::Observer X-Git-Tag: v2016.08.22.00~24 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=2ed41baf00ad62d46c118c11f92303542e61e235;p=folly.git folly::Observer Summary: This is a basic implementation of a new Observer API. I mostly see this being useful as a replacement for various configuration subscription APIs (Configerator, SMC etc.) The library provides an Observer primitive. At any time user can take a Snapshot of data in the Observer (which is a view with the most recent version of the data). New Observer can be created by providing a generator functor. Such Observer mays depend on other Observers and its generator functor is re-run automatically every time, when at least one of the dependencies are updated. The implementation may also ignore intermediate updates and will only use the most recent version of other Observers, when re-computing Observer data. Reviewed By: yfeldblum Differential Revision: D3481231 fbshipit-source-id: 96c165f8081cff0141d5814ec2bc88adeb2e1e74 --- diff --git a/folly/Makefile.am b/folly/Makefile.am index 16228fb2..b9d5424a 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -107,6 +107,15 @@ nobase_follyinclude_HEADERS = \ experimental/JSONSchema.h \ experimental/LockFreeRingBuffer.h \ experimental/NestedCommandLineApp.h \ + experimental/observer/detail/Core.h \ + experimental/observer/detail/ObserverManager.h \ + experimental/observer/detail/Observer-pre.h \ + experimental/observer/Observable.h \ + experimental/observer/Observable-inl.h \ + experimental/observer/Observer.h \ + experimental/observer/Observer-inl.h \ + experimental/observer/SimpleObservable.h \ + experimental/observer/SimpleObservable-inl.h \ experimental/ProgramOptions.h \ experimental/ReadMostlySharedPtr.h \ experimental/symbolizer/Elf.h \ @@ -469,6 +478,8 @@ libfolly_la_SOURCES = \ experimental/io/FsUtil.cpp \ experimental/JSONSchema.cpp \ experimental/NestedCommandLineApp.cpp \ + experimental/observer/detail/Core.cpp \ + experimental/observer/detail/ObserverManager.cpp \ experimental/ProgramOptions.cpp \ experimental/Select64.cpp \ experimental/TestUtil.cpp diff --git a/folly/experimental/observer/Observable-inl.h b/folly/experimental/observer/Observable-inl.h new file mode 100644 index 00000000..75061ac2 --- /dev/null +++ b/folly/experimental/observer/Observable-inl.h @@ -0,0 +1,100 @@ +/* + * Copyright 2016 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +namespace folly { +namespace observer { + +template +class ObserverCreator::Context { + public: + template + Context(Args&&... args) : observable_(std::forward(args)...) {} + + ~Context() { + if (value_.copy()) { + Traits::unsubscribe(observable_); + } + } + + void setCore(observer_detail::Core::WeakPtr coreWeak) { + coreWeak_ = std::move(coreWeak); + } + + std::shared_ptr get() { + updateRequested_ = false; + return value_.copy(); + } + + void update() { + { + auto newValue = Traits::get(observable_); + DCHECK(newValue); + value_.swap(newValue); + } + + bool expected = false; + if (updateRequested_.compare_exchange_strong(expected, true)) { + if (auto core = coreWeak_.lock()) { + observer_detail::ObserverManager::scheduleRefreshNewVersion( + std::move(core)); + } + } + } + + template + void subscribe(F&& callback) { + Traits::subscribe(observable_, std::forward(callback)); + } + + private: + folly::Synchronized> value_; + std::atomic updateRequested_{false}; + + observer_detail::Core::WeakPtr coreWeak_; + + Observable observable_; +}; + +template +template +ObserverCreator::ObserverCreator(Args&&... args) + : context_(std::make_shared(std::forward(args)...)) {} + +template +Observer::T> +ObserverCreator::getObserver()&& { + auto core = observer_detail::Core::create([context = context_]() { + return context->get(); + }); + + context_->setCore(core); + + context_->subscribe([contextWeak = std::weak_ptr(context_)] { + if (auto context = contextWeak.lock()) { + context->update(); + } + }); + + context_->update(); + context_.reset(); + + DCHECK(core->getVersion() > 0); + + return Observer(std::move(core)); +} +} +} diff --git a/folly/experimental/observer/Observable.h b/folly/experimental/observer/Observable.h new file mode 100644 index 00000000..f1aaf0ad --- /dev/null +++ b/folly/experimental/observer/Observable.h @@ -0,0 +1,60 @@ +/* + * Copyright 2016 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +namespace folly { +namespace observer { + +template +struct ObservableTraits { + using element_type = + typename std::remove_reference::type::element_type; + + static std::shared_ptr get(Observable& observable) { + return observable.get(); + } + + template + static void subscribe(Observable& observable, F&& callback) { + observable.subscribe(std::forward(callback)); + } + + static void unsubscribe(Observable& observable) { + observable.unsubscribe(); + } +}; + +template > +class ObserverCreator { + public: + using T = typename Traits::element_type; + + template + explicit ObserverCreator(Args&&... args); + + Observer getObserver() &&; + + private: + class Context; + + std::shared_ptr context_; +}; +} +} + +#include diff --git a/folly/experimental/observer/Observer-inl.h b/folly/experimental/observer/Observer-inl.h new file mode 100644 index 00000000..eb1338d7 --- /dev/null +++ b/folly/experimental/observer/Observer-inl.h @@ -0,0 +1,72 @@ +/* + * Copyright 2016 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +namespace folly { +namespace observer { + +template +Snapshot Observer::getSnapshot() const { + auto data = core_->getData(); + return Snapshot( + *core_, + std::static_pointer_cast(std::move(data.data)), + data.version); +} + +template +Observer::Observer(observer_detail::Core::Ptr core) + : core_(std::move(core)) {} + +template +Observer> makeObserver( + F&& creator) { + auto core = observer_detail::Core:: + create([creator = std::forward(creator)]() mutable { + return std::static_pointer_cast(creator()); + }); + + observer_detail::ObserverManager::scheduleRefreshNewVersion(core); + + return Observer>(core); +} + +template +Observer> makeObserver(F&& creator) { + return makeObserver([creator = std::forward(creator)]() mutable { + return std::make_shared>(creator()); + }); +} + +template +TLObserver::TLObserver(Observer observer) + : observer_(observer), + snapshot_([&] { return new Snapshot(observer_.getSnapshot()); }) {} + +template +const Snapshot& TLObserver::getSnapshotRef() const { + auto& snapshot = *snapshot_; + if (observer_.needRefresh(snapshot) || + observer_detail::ObserverManager::inManagerThread()) { + snapshot = observer_.getSnapshot(); + } + + return snapshot; +} +} +} diff --git a/folly/experimental/observer/Observer.h b/folly/experimental/observer/Observer.h new file mode 100644 index 00000000..8037b24d --- /dev/null +++ b/folly/experimental/observer/Observer.h @@ -0,0 +1,203 @@ +/* + * Copyright 2016 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include + +namespace folly { +namespace observer { + +/** + * Observer - a library which lets you create objects which track updates of + * their dependencies and get re-computed when any of the dependencies changes. + * + * + * Given an Observer, you can get a snapshot of the current version of the + * object it holds: + * + * Observer myObserver = ...; + * Snapshot mySnapshot = myObserver.getSnapshot(); + * or simply + * Snapshot mySnapshot = *myObserver; + * + * Snapshot will hold a view of the object, even if object in the Observer + * gets updated. + * + * + * What makes Observer powerful is its ability to track updates to other + * Observers. Imagine we have two separate Observers A and B which hold + * integers. + * + * Observer observerA = ...; + * Observer observerB = ...; + * + * To compute a sum of A and B we can create a new Observer which would track + * updates to A and B and re-compute the sum only when necessary. + * + * Observer sumObserver = makeObserver( + * [observerA, observerB] { + * int a = **observerA; + * int b = **observerB; + * return a + b; + * }); + * + * int sum = **sumObserver; + * + * Notice that a + b will be only called when either a or b is changed. Getting + * a snapshot from sumObserver won't trigger any re-computation. + * + * + * TLObserver is very similar to Observer, but it also keeps a thread-local + * cache for the observed object. + * + * Observer observer = ...; + * TLObserver tlObserver(observer); + * auto& snapshot = *tlObserver; + * + * + * See ObserverCreator class if you want to wrap any existing subscription API + * in an Observer object. + */ +template +class Observer; + +template +class Snapshot { + public: + const T& operator*() const { + return *get(); + } + + const T* operator->() const { + return get(); + } + + const T* get() const { + return data_.get(); + } + + /** + * Return the version of the observed object. + */ + size_t getVersion() const { + return version_; + } + + private: + friend class Observer; + + Snapshot( + const observer_detail::Core& core, + std::shared_ptr data, + size_t version) + : data_(std::move(data)), version_(version), core_(&core) { + DCHECK(data_); + } + + std::shared_ptr data_; + size_t version_; + const observer_detail::Core* core_; +}; + +template +class Observer { + public: + explicit Observer(observer_detail::Core::Ptr core); + + Snapshot getSnapshot() const; + Snapshot operator*() const { + return getSnapshot(); + } + + /** + * Check if we have a newer version of the observed object than the snapshot. + * Snapshot should have been originally from this Observer. + */ + bool needRefresh(const Snapshot& snapshot) const { + DCHECK_EQ(core_.get(), snapshot.core_); + return snapshot.getVersion() < core_->getVersionLastChange(); + } + + private: + observer_detail::Core::Ptr core_; +}; + +/** + * makeObserver(...) creates a new Observer object given a functor to + * compute it. The functor can return T or std::shared_ptr. + * + * makeObserver(...) blocks until the initial version of Observer is computed. + * If creator functor fails (throws or returns a nullptr) during this first + * call, the exception is re-thrown by makeObserver(...). + * + * For all subsequent updates if creator functor fails (throws or returs a + * nullptr), the Observer (and all its dependents) is not updated. + */ +template +Observer> makeObserver(F&& creator); + +template +Observer> makeObserver(F&& creator); + +template +class TLObserver { + public: + explicit TLObserver(Observer observer); + + const Snapshot& getSnapshotRef() const; + const Snapshot& operator*() const { + return getSnapshotRef(); + } + + private: + Observer observer_; + folly::ThreadLocal> snapshot_; +}; + +/** + * Same as makeObserver(...), but creates TLObserver. + */ +template +TLObserver makeTLObserver(Observer observer) { + return TLObserver(std::move(observer)); +} + +template +auto makeTLObserver(F&& creator) { + return makeTLObserver(makeObserver(std::forward(creator))); +} + +template +struct ObserverTraits {}; + +template +struct ObserverTraits { + using type = Observer; +}; + +template +struct ObserverTraits { + using type = TLObserver; +}; + +template +using ObserverT = typename ObserverTraits::type; +} +} + +#include diff --git a/folly/experimental/observer/SimpleObservable-inl.h b/folly/experimental/observer/SimpleObservable-inl.h new file mode 100644 index 00000000..463e85bf --- /dev/null +++ b/folly/experimental/observer/SimpleObservable-inl.h @@ -0,0 +1,81 @@ +/* + * Copyright 2016 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +namespace folly { +namespace observer { + +template +SimpleObservable::SimpleObservable(T value) + : context_(std::make_shared()) { + setValue(std::move(value)); +} + +template +SimpleObservable::SimpleObservable(std::shared_ptr value) + : context_(std::make_shared()) { + setValue(std::move(value)); +} + +template +void SimpleObservable::setValue(T value) { + setValue(std::make_shared(std::move(value))); +} + +template +void SimpleObservable::setValue(std::shared_ptr value) { + context_->value_.swap(value); + + context_->callback_.withWLock([](folly::Function& callback) { + if (callback) { + callback(); + } + }); +} + +template +Observer SimpleObservable::getObserver() { + struct SimpleObservableWrapper { + using element_type = T; + + std::shared_ptr context; + + std::shared_ptr get() { + return context->value_.copy(); + } + + void subscribe(folly::Function callback) { + context->callback_.swap(callback); + } + + void unsubscribe() { + folly::Function empty; + context->callback_.swap(empty); + } + }; + + std::call_once(observerInit_, [&]() { + SimpleObservableWrapper wrapper; + wrapper.context = context_; + ObserverCreator creator(std::move(wrapper)); + observer_ = std::move(creator).getObserver(); + }); + return *observer_; +} +} +} diff --git a/folly/experimental/observer/SimpleObservable.h b/folly/experimental/observer/SimpleObservable.h new file mode 100644 index 00000000..1feb21d2 --- /dev/null +++ b/folly/experimental/observer/SimpleObservable.h @@ -0,0 +1,49 @@ +/* + * Copyright 2016 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include + +namespace folly { +namespace observer { + +template +class SimpleObservable { + public: + explicit SimpleObservable(T value); + explicit SimpleObservable(std::shared_ptr value); + + void setValue(T value); + void setValue(std::shared_ptr value); + + Observer getObserver(); + + private: + struct Context { + folly::Synchronized> value_; + folly::Synchronized> callback_; + }; + std::shared_ptr context_; + + std::once_flag observerInit_; + folly::Optional> observer_; +}; +} +} + +#include diff --git a/folly/experimental/observer/detail/Core.cpp b/folly/experimental/observer/detail/Core.cpp new file mode 100644 index 00000000..c4e317ec --- /dev/null +++ b/folly/experimental/observer/detail/Core.cpp @@ -0,0 +1,167 @@ +/* + * Copyright 2016 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include + +namespace folly { +namespace observer_detail { + +Core::VersionedData Core::getData() { + if (!ObserverManager::inManagerThread()) { + return data_.copy(); + } + + ObserverManager::DependencyRecorder::markDependency(shared_from_this()); + + auto version = ObserverManager::getVersion(); + + if (version_ >= version) { + return data_.copy(); + } + + refresh(version); + + DCHECK_GE(version_, version); + return data_.copy(); +} + +size_t Core::refresh(size_t version, bool force) { + CHECK(ObserverManager::inManagerThread()); + + if (version_ >= version) { + return versionLastChange_; + } + + bool refreshDependents = false; + + { + std::lock_guard lgRefresh(refreshMutex_); + + // Recheck in case this code was already refreshed + if (version_ >= version) { + return versionLastChange_; + } + + bool needRefresh = force || version_ == 0; + + // This can be run in parallel, but we expect most updates to propagate + // bottom to top. + dependencies_.withRLock([&](const Dependencies& dependencies) { + for (const auto& dependency : dependencies) { + if (dependency->refresh(version) > version_) { + needRefresh = true; + break; + } + } + }); + + if (!needRefresh) { + version_ = version; + return versionLastChange_; + } + + ObserverManager::DependencyRecorder dependencyRecorder; + + try { + { + VersionedData newData{creator_(), version}; + if (!newData.data) { + throw std::logic_error("Observer creator returned nullptr."); + } + data_.swap(newData); + } + + versionLastChange_ = version; + refreshDependents = true; + } catch (...) { + LOG(ERROR) << "Exception while refreshing Observer: " + << exceptionStr(std::current_exception()); + + if (version_ == 0) { + // Re-throw exception if this is the first time we run creator + throw; + } + } + + version_ = version; + + auto newDependencies = dependencyRecorder.release(); + dependencies_.withWLock([&](Dependencies& dependencies) { + for (const auto& dependency : newDependencies) { + if (!dependencies.count(dependency)) { + dependency->addDependent(this->shared_from_this()); + } + } + + for (const auto& dependency : dependencies) { + if (!newDependencies.count(dependency)) { + dependency->removeStaleDependents(); + } + } + + dependencies = std::move(newDependencies); + }); + } + + if (refreshDependents) { + auto dependents = dependents_.copy(); + + for (const auto& dependentWeak : dependents) { + if (auto dependent = dependentWeak.lock()) { + ObserverManager::scheduleRefresh(std::move(dependent), version); + } + } + } + + return versionLastChange_; +} + +Core::Core(folly::Function()> creator) + : creator_(std::move(creator)) {} + +Core::~Core() { + dependencies_.withWLock([](const Dependencies& dependencies) { + for (const auto& dependecy : dependencies) { + dependecy->removeStaleDependents(); + } + }); +} + +Core::Ptr Core::create(folly::Function()> creator) { + auto core = Core::Ptr(new Core(std::move(creator))); + return core; +} + +void Core::addDependent(Core::WeakPtr dependent) { + dependents_.withWLock([&](Dependents& dependents) { + dependents.push_back(std::move(dependent)); + }); +} + +void Core::removeStaleDependents() { + // This is inefficient, the assumption is that we won't have many dependents + dependents_.withWLock([](Dependents& dependents) { + for (size_t i = 0; i < dependents.size(); ++i) { + if (dependents[i].expired()) { + std::swap(dependents[i], dependents.back()); + dependents.pop_back(); + --i; + } + } + }); +} +} +} diff --git a/folly/experimental/observer/detail/Core.h b/folly/experimental/observer/detail/Core.h new file mode 100644 index 00000000..2d1dd7f2 --- /dev/null +++ b/folly/experimental/observer/detail/Core.h @@ -0,0 +1,108 @@ +/* + * Copyright 2016 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include + +namespace folly { +namespace observer_detail { + +class ObserverManager; + +/** + * Core stores the current version of the object held by Observer. It also keeps + * all dependencies and dependents of the Observer. + */ +class Core : public std::enable_shared_from_this { + public: + using Ptr = std::shared_ptr; + using WeakPtr = std::weak_ptr; + + /** + * Blocks until creator is successfully run by ObserverManager + */ + static Ptr create(folly::Function()> creator); + + /** + * View of the observed object and its version + */ + struct VersionedData { + VersionedData() {} + + VersionedData(std::shared_ptr data_, size_t version_) + : data(std::move(data_)), version(version_) {} + + std::shared_ptr data; + size_t version{0}; + }; + + /** + * Gets current view of the observed object. + * This is safe to call from any thread. If this is called from other Observer + * functor then that Observer is marked as dependent on current Observer. + */ + VersionedData getData(); + + /** + * Gets the version of the observed object. + */ + size_t getVersion() const { + return version_; + } + + /** + * Get the last version at which the observed object was actually changed. + */ + size_t getVersionLastChange() { + return versionLastChange_; + } + + /** + * Check if the observed object needs to be re-computed. Returns the version + * of last change. If force is true, re-computes the observed object, even if + * dependencies didn't change. + * + * This should be only called from ObserverManager thread. + */ + size_t refresh(size_t version, bool force = false); + + ~Core(); + + private: + explicit Core(folly::Function()> creator); + + void addDependent(Core::WeakPtr dependent); + void removeStaleDependents(); + + using Dependents = std::vector; + using Dependencies = std::unordered_set; + + folly::Synchronized dependents_; + folly::Synchronized dependencies_; + + std::atomic version_{0}; + std::atomic versionLastChange_{0}; + + folly::Synchronized data_; + + folly::Function()> creator_; + + std::mutex refreshMutex_; +}; +} +} diff --git a/folly/experimental/observer/detail/Observer-pre.h b/folly/experimental/observer/detail/Observer-pre.h new file mode 100644 index 00000000..506306a6 --- /dev/null +++ b/folly/experimental/observer/detail/Observer-pre.h @@ -0,0 +1,45 @@ +/* + * Copyright 2016 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +namespace folly { +namespace observer_detail { + +template +struct NonSharedPtr { + using type = typename std::decay::type; +}; + +template +struct NonSharedPtr> {}; + +template +struct UnwrapSharedPtr {}; + +template +struct UnwrapSharedPtr> { + using type = typename std::decay::type; +}; + +template +using ResultOf = + typename NonSharedPtr::type>::type; + +template +using ResultOfUnwrapSharedPtr = + typename UnwrapSharedPtr::type>::type; +} +} diff --git a/folly/experimental/observer/detail/ObserverManager.cpp b/folly/experimental/observer/detail/ObserverManager.cpp new file mode 100644 index 00000000..24acabad --- /dev/null +++ b/folly/experimental/observer/detail/ObserverManager.cpp @@ -0,0 +1,174 @@ +/* + * Copyright 2016 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +#include +#include + +namespace folly { +namespace observer_detail { + +FOLLY_TLS bool ObserverManager::inManagerThread_{false}; +FOLLY_TLS ObserverManager::DependencyRecorder::Dependencies* + ObserverManager::DependencyRecorder::currentDependencies_{nullptr}; + +namespace { +constexpr size_t kCurrentThreadPoolSize{4}; +constexpr size_t kCurrentQueueSize{10 * 1024}; +constexpr size_t kNextQueueSize{10 * 1024}; +} + +class ObserverManager::CurrentQueue { + public: + CurrentQueue() : queue_(kCurrentQueueSize) { + for (size_t i = 0; i < kCurrentThreadPoolSize; ++i) { + threads_.emplace_back([&]() { + ObserverManager::inManagerThread_ = true; + + while (true) { + Function task; + queue_.blockingRead(task); + + if (!task) { + return; + } + + try { + task(); + } catch (...) { + LOG(ERROR) << "Exception while running CurrentQueue task: " + << exceptionStr(std::current_exception()); + } + } + }); + } + } + + ~CurrentQueue() { + for (size_t i = 0; i < threads_.size(); ++i) { + queue_.blockingWrite(nullptr); + } + + for (auto& thread : threads_) { + thread.join(); + } + + CHECK(queue_.isEmpty()); + } + + void add(Function task) { + if (ObserverManager::inManagerThread()) { + if (!queue_.write(std::move(task))) { + throw std::runtime_error("Too many Observers scheduled for update."); + } + } else { + queue_.blockingWrite(std::move(task)); + } + } + + private: + MPMCQueue> queue_; + std::vector threads_; +}; + +class ObserverManager::NextQueue { + public: + explicit NextQueue(ObserverManager& manager) + : manager_(manager), queue_(kNextQueueSize) { + thread_ = std::thread([&]() { + Core::Ptr queueCore; + + while (true) { + queue_.blockingRead(queueCore); + + if (!queueCore) { + return; + } + + std::vector cores; + cores.emplace_back(std::move(queueCore)); + + { + SharedMutexReadPriority::WriteHolder wh(manager_.versionMutex_); + + // We can't pick more tasks from the queue after we bumped the + // version, so we have to do this while holding the lock. + while (cores.size() < kNextQueueSize && queue_.read(queueCore)) { + if (!queueCore) { + return; + } + cores.emplace_back(std::move(queueCore)); + } + + ++manager_.version_; + } + + for (auto& core : cores) { + manager_.scheduleRefresh(std::move(core), manager_.version_, true); + } + } + }); + } + + void add(Core::Ptr core) { + queue_.blockingWrite(std::move(core)); + } + + ~NextQueue() { + // Emtpy element signals thread to terminate + queue_.blockingWrite(nullptr); + thread_.join(); + } + + private: + ObserverManager& manager_; + MPMCQueue queue_; + std::thread thread_; +}; + +ObserverManager::ObserverManager() { + currentQueue_ = make_unique(); + nextQueue_ = make_unique(*this); +} + +ObserverManager::~ObserverManager() { + // Destroy NextQueue, before the rest of this object, since it expects + // ObserverManager to be alive. + nextQueue_.release(); + currentQueue_.release(); +} + +void ObserverManager::scheduleCurrent(Function task) { + currentQueue_->add(std::move(task)); +} + +void ObserverManager::scheduleNext(Core::Ptr core) { + nextQueue_->add(std::move(core)); +} + +struct ObserverManager::Singleton { + static folly::Singleton instance; +}; + +folly::Singleton ObserverManager::Singleton::instance([] { + return new ObserverManager(); +}); + +std::shared_ptr ObserverManager::getInstance() { + return Singleton::instance.try_get(); +} +} +} diff --git a/folly/experimental/observer/detail/ObserverManager.h b/folly/experimental/observer/detail/ObserverManager.h new file mode 100644 index 00000000..6319bedb --- /dev/null +++ b/folly/experimental/observer/detail/ObserverManager.h @@ -0,0 +1,181 @@ +/* + * Copyright 2016 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +namespace folly { +namespace observer_detail { + +/** + * ObserverManager is a singleton which controls the re-computation of all + * Observers. Such re-computation always happens on the thread pool owned by + * ObserverManager. + * + * ObserverManager has global current version. All existing Observers + * may have their version be less (yet to be updated) or equal (up to date) + * to the global current version. + * + * ObserverManager::CurrentQueue contains all of the Observers which need to be + * updated to the global current version. Those updates are peformed on the + * ObserverManager's thread pool, until the queue is empty. If some Observer is + * updated, all of its dependents are added to ObserverManager::CurrentQueue + * to be updated. + * + * If some leaf Observer (i.e. created from Observable) is updated, then current + * version of the ObserverManager should be bumped. All such updated leaf + * Observers are added to the ObserverManager::NextQueue. + * + * *Only* when ObserverManager::CurrentQueue is empty, the global current + * version is bumped and all updates from the ObserverManager::NextQueue are + * performed. If leaf Observer gets updated more then once before being picked + * from the ObserverManager::NextQueue, then only the last update is processed. + */ +class ObserverManager { + public: + static size_t getVersion() { + auto instance = getInstance(); + + if (!instance) { + return 1; + } + + return instance->version_; + } + + static bool inManagerThread() { + return inManagerThread_; + } + + static Future + scheduleRefresh(Core::Ptr core, size_t minVersion, bool force = false) { + if (core->getVersion() >= minVersion) { + return makeFuture(Unit()); + } + + auto instance = getInstance(); + + if (!instance) { + return makeFuture( + std::logic_error("ObserverManager requested during shutdown")); + } + + Promise promise; + auto future = promise.getFuture(); + + SharedMutexReadPriority::ReadHolder rh(instance->versionMutex_); + + instance->scheduleCurrent([ + core = std::move(core), + promise = std::move(promise), + instancePtr = instance.get(), + rh = std::move(rh), + force + ]() mutable { + promise.setWith([&]() { core->refresh(instancePtr->version_, force); }); + }); + + return future; + } + + static void scheduleRefreshNewVersion(Core::Ptr core) { + if (core->getVersion() == 0) { + scheduleRefresh(std::move(core), 1).get(); + return; + } + + auto instance = getInstance(); + + if (!instance) { + return; + } + + instance->scheduleNext(std::move(core)); + } + + class DependencyRecorder { + public: + using Dependencies = std::unordered_set; + + DependencyRecorder() { + DCHECK(inManagerThread()); + + previousDepedencies_ = currentDependencies_; + currentDependencies_ = &dependencies_; + } + + static void markDependency(Core::Ptr dependency) { + DCHECK(inManagerThread()); + DCHECK(currentDependencies_); + + currentDependencies_->insert(std::move(dependency)); + } + + Dependencies release() { + DCHECK(currentDependencies_ == &dependencies_); + std::swap(currentDependencies_, previousDepedencies_); + previousDepedencies_ = nullptr; + + return std::move(dependencies_); + } + + ~DependencyRecorder() { + if (previousDepedencies_) { + release(); + } + } + + private: + Dependencies dependencies_; + Dependencies* previousDepedencies_; + + static FOLLY_TLS Dependencies* currentDependencies_; + }; + + ~ObserverManager(); + + private: + ObserverManager(); + + struct Singleton; + + void scheduleCurrent(Function); + void scheduleNext(Core::Ptr); + + class CurrentQueue; + class NextQueue; + + std::unique_ptr currentQueue_; + std::unique_ptr nextQueue_; + + static std::shared_ptr getInstance(); + static FOLLY_TLS bool inManagerThread_; + + /** + * Version mutex is used to make sure all updates are processed from the + * CurrentQueue, before bumping the version and moving to the NextQueue. + * + * To achieve this every task added to CurrentQueue holds a reader lock. + * NextQueue grabs a writer lock before bumping the version, so it can only + * happen if CurrentQueue is empty (notice that we use read-priority shared + * mutex). + */ + SharedMutexReadPriority versionMutex_; + std::atomic version_{1}; +}; +} +} diff --git a/folly/experimental/observer/test/ObserverTest.cpp b/folly/experimental/observer/test/ObserverTest.cpp new file mode 100644 index 00000000..abfbc368 --- /dev/null +++ b/folly/experimental/observer/test/ObserverTest.cpp @@ -0,0 +1,192 @@ +/* + * Copyright 2016 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +#include + +#include +#include + +using namespace folly::observer; + +TEST(Observer, Observable) { + SimpleObservable observable(42); + auto observer = observable.getObserver(); + + EXPECT_EQ(42, **observer); + + folly::Baton<> baton; + auto waitingObserver = makeObserver([observer, &baton]() { + *observer; + baton.post(); + return folly::Unit(); + }); + baton.reset(); + + observable.setValue(24); + + EXPECT_TRUE(baton.timed_wait(std::chrono::seconds{1})); + + EXPECT_EQ(24, **observer); +} + +TEST(Observer, MakeObserver) { + SimpleObservable observable(42); + + auto observer = makeObserver([child = observable.getObserver()]() { + return **child + 1; + }); + + EXPECT_EQ(43, **observer); + + folly::Baton<> baton; + auto waitingObserver = makeObserver([observer, &baton]() { + *observer; + baton.post(); + return folly::Unit(); + }); + baton.reset(); + + observable.setValue(24); + + EXPECT_TRUE(baton.timed_wait(std::chrono::seconds{1})); + + EXPECT_EQ(25, **observer); +} + +TEST(Observer, MakeObserverDiamond) { + SimpleObservable observable(42); + + auto observer1 = makeObserver([child = observable.getObserver()]() { + return **child + 1; + }); + + auto observer2 = makeObserver([child = observable.getObserver()]() { + return std::make_shared(**child + 2); + }); + + auto observer = makeObserver( + [observer1, observer2]() { return (**observer1) * (**observer2); }); + + EXPECT_EQ(43 * 44, *observer.getSnapshot()); + + folly::Baton<> baton; + auto waitingObserver = makeObserver([observer, &baton]() { + *observer; + baton.post(); + return folly::Unit(); + }); + baton.reset(); + + observable.setValue(24); + + EXPECT_TRUE(baton.timed_wait(std::chrono::seconds{1})); + + EXPECT_EQ(25 * 26, **observer); +} + +TEST(Observer, CreateException) { + struct ExpectedException {}; + EXPECT_THROW( + auto observer = makeObserver( + []() -> std::shared_ptr { throw ExpectedException(); }), + ExpectedException); + + EXPECT_THROW( + auto observer = + makeObserver([]() -> std::shared_ptr { return nullptr; }), + std::logic_error); +} + +TEST(Observer, NullValue) { + SimpleObservable observable(41); + auto oddObserver = makeObserver([innerObserver = observable.getObserver()]() { + auto value = **innerObserver; + + if (value % 2 != 0) { + return value * 2; + } + + throw std::logic_error("I prefer odd numbers"); + }); + + folly::Baton<> baton; + auto waitingObserver = makeObserver([oddObserver, &baton]() { + *oddObserver; + baton.post(); + return folly::Unit(); + }); + + baton.reset(); + EXPECT_EQ(82, **oddObserver); + + observable.setValue(2); + + // Waiting observer shouldn't be updated + EXPECT_FALSE(baton.timed_wait(std::chrono::seconds{1})); + baton.reset(); + + EXPECT_EQ(82, **oddObserver); + + observable.setValue(23); + + EXPECT_TRUE(baton.timed_wait(std::chrono::seconds{1})); + + EXPECT_EQ(46, **oddObserver); +} + +TEST(Observer, Stress) { + SimpleObservable observable(0); + + folly::Synchronized> values; + + auto observer = makeObserver([ child = observable.getObserver(), &values ]() { + auto value = **child * 10; + values.withWLock( + [&](std::vector& values) { values.push_back(value); }); + return value; + }); + + EXPECT_EQ(0, **observer); + values.withRLock([](const std::vector& values) { + EXPECT_EQ(1, values.size()); + EXPECT_EQ(0, values.back()); + }); + + constexpr size_t numIters = 10000; + + for (size_t i = 1; i <= numIters; ++i) { + observable.setValue(i); + } + + while (**observer != numIters * 10) { + std::this_thread::yield(); + } + + values.withRLock([numIters = numIters](const std::vector& values) { + EXPECT_EQ(numIters * 10, values.back()); + EXPECT_LT(values.size(), numIters / 2); + EXPECT_GT(values.size(), 10); + + for (auto value : values) { + EXPECT_EQ(0, value % 10); + } + + for (size_t i = 0; i < values.size() - 1; ++i) { + EXPECT_LE(values[i], values[i + 1]); + } + }); +}