experimental/JSONSchema.h \
experimental/LockFreeRingBuffer.h \
experimental/NestedCommandLineApp.h \
+ experimental/observer/detail/Core.h \
+ experimental/observer/detail/ObserverManager.h \
+ experimental/observer/detail/Observer-pre.h \
+ experimental/observer/Observable.h \
+ experimental/observer/Observable-inl.h \
+ experimental/observer/Observer.h \
+ experimental/observer/Observer-inl.h \
+ experimental/observer/SimpleObservable.h \
+ experimental/observer/SimpleObservable-inl.h \
experimental/ProgramOptions.h \
experimental/ReadMostlySharedPtr.h \
experimental/symbolizer/Elf.h \
experimental/io/FsUtil.cpp \
experimental/JSONSchema.cpp \
experimental/NestedCommandLineApp.cpp \
+ experimental/observer/detail/Core.cpp \
+ experimental/observer/detail/ObserverManager.cpp \
experimental/ProgramOptions.cpp \
experimental/Select64.cpp \
experimental/TestUtil.cpp
--- /dev/null
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+namespace folly {
+namespace observer {
+
+template <typename Observable, typename Traits>
+class ObserverCreator<Observable, Traits>::Context {
+ public:
+ template <typename... Args>
+ Context(Args&&... args) : observable_(std::forward<Args>(args)...) {}
+
+ ~Context() {
+ if (value_.copy()) {
+ Traits::unsubscribe(observable_);
+ }
+ }
+
+ void setCore(observer_detail::Core::WeakPtr coreWeak) {
+ coreWeak_ = std::move(coreWeak);
+ }
+
+ std::shared_ptr<const T> get() {
+ updateRequested_ = false;
+ return value_.copy();
+ }
+
+ void update() {
+ {
+ auto newValue = Traits::get(observable_);
+ DCHECK(newValue);
+ value_.swap(newValue);
+ }
+
+ bool expected = false;
+ if (updateRequested_.compare_exchange_strong(expected, true)) {
+ if (auto core = coreWeak_.lock()) {
+ observer_detail::ObserverManager::scheduleRefreshNewVersion(
+ std::move(core));
+ }
+ }
+ }
+
+ template <typename F>
+ void subscribe(F&& callback) {
+ Traits::subscribe(observable_, std::forward<F>(callback));
+ }
+
+ private:
+ folly::Synchronized<std::shared_ptr<const T>> value_;
+ std::atomic<bool> updateRequested_{false};
+
+ observer_detail::Core::WeakPtr coreWeak_;
+
+ Observable observable_;
+};
+
+template <typename Observable, typename Traits>
+template <typename... Args>
+ObserverCreator<Observable, Traits>::ObserverCreator(Args&&... args)
+ : context_(std::make_shared<Context>(std::forward<Args>(args)...)) {}
+
+template <typename Observable, typename Traits>
+Observer<typename ObserverCreator<Observable, Traits>::T>
+ObserverCreator<Observable, Traits>::getObserver()&& {
+ auto core = observer_detail::Core::create([context = context_]() {
+ return context->get();
+ });
+
+ context_->setCore(core);
+
+ context_->subscribe([contextWeak = std::weak_ptr<Context>(context_)] {
+ if (auto context = contextWeak.lock()) {
+ context->update();
+ }
+ });
+
+ context_->update();
+ context_.reset();
+
+ DCHECK(core->getVersion() > 0);
+
+ return Observer<T>(std::move(core));
+}
+}
+}
--- /dev/null
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/experimental/observer/Observer.h>
+
+namespace folly {
+namespace observer {
+
+template <typename Observable>
+struct ObservableTraits {
+ using element_type =
+ typename std::remove_reference<Observable>::type::element_type;
+
+ static std::shared_ptr<const element_type> get(Observable& observable) {
+ return observable.get();
+ }
+
+ template <typename F>
+ static void subscribe(Observable& observable, F&& callback) {
+ observable.subscribe(std::forward<F>(callback));
+ }
+
+ static void unsubscribe(Observable& observable) {
+ observable.unsubscribe();
+ }
+};
+
+template <typename Observable, typename Traits = ObservableTraits<Observable>>
+class ObserverCreator {
+ public:
+ using T = typename Traits::element_type;
+
+ template <typename... Args>
+ explicit ObserverCreator(Args&&... args);
+
+ Observer<T> getObserver() &&;
+
+ private:
+ class Context;
+
+ std::shared_ptr<Context> context_;
+};
+}
+}
+
+#include <folly/experimental/observer/Observable-inl.h>
--- /dev/null
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/experimental/observer/detail/ObserverManager.h>
+
+namespace folly {
+namespace observer {
+
+template <typename T>
+Snapshot<T> Observer<T>::getSnapshot() const {
+ auto data = core_->getData();
+ return Snapshot<T>(
+ *core_,
+ std::static_pointer_cast<const T>(std::move(data.data)),
+ data.version);
+}
+
+template <typename T>
+Observer<T>::Observer(observer_detail::Core::Ptr core)
+ : core_(std::move(core)) {}
+
+template <typename F>
+Observer<observer_detail::ResultOfUnwrapSharedPtr<F>> makeObserver(
+ F&& creator) {
+ auto core = observer_detail::Core::
+ create([creator = std::forward<F>(creator)]() mutable {
+ return std::static_pointer_cast<void>(creator());
+ });
+
+ observer_detail::ObserverManager::scheduleRefreshNewVersion(core);
+
+ return Observer<observer_detail::ResultOfUnwrapSharedPtr<F>>(core);
+}
+
+template <typename F>
+Observer<observer_detail::ResultOf<F>> makeObserver(F&& creator) {
+ return makeObserver([creator = std::forward<F>(creator)]() mutable {
+ return std::make_shared<observer_detail::ResultOf<F>>(creator());
+ });
+}
+
+template <typename T>
+TLObserver<T>::TLObserver(Observer<T> observer)
+ : observer_(observer),
+ snapshot_([&] { return new Snapshot<T>(observer_.getSnapshot()); }) {}
+
+template <typename T>
+const Snapshot<T>& TLObserver<T>::getSnapshotRef() const {
+ auto& snapshot = *snapshot_;
+ if (observer_.needRefresh(snapshot) ||
+ observer_detail::ObserverManager::inManagerThread()) {
+ snapshot = observer_.getSnapshot();
+ }
+
+ return snapshot;
+}
+}
+}
--- /dev/null
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/ThreadLocal.h>
+#include <folly/experimental/observer/detail/Core.h>
+#include <folly/experimental/observer/detail/Observer-pre.h>
+
+namespace folly {
+namespace observer {
+
+/**
+ * Observer - a library which lets you create objects which track updates of
+ * their dependencies and get re-computed when any of the dependencies changes.
+ *
+ *
+ * Given an Observer, you can get a snapshot of the current version of the
+ * object it holds:
+ *
+ * Observer<int> myObserver = ...;
+ * Snapshot<int> mySnapshot = myObserver.getSnapshot();
+ * or simply
+ * Snapshot<int> mySnapshot = *myObserver;
+ *
+ * Snapshot will hold a view of the object, even if object in the Observer
+ * gets updated.
+ *
+ *
+ * What makes Observer powerful is its ability to track updates to other
+ * Observers. Imagine we have two separate Observers A and B which hold
+ * integers.
+ *
+ * Observer<int> observerA = ...;
+ * Observer<int> observerB = ...;
+ *
+ * To compute a sum of A and B we can create a new Observer which would track
+ * updates to A and B and re-compute the sum only when necessary.
+ *
+ * Observer<int> sumObserver = makeObserver(
+ * [observerA, observerB] {
+ * int a = **observerA;
+ * int b = **observerB;
+ * return a + b;
+ * });
+ *
+ * int sum = **sumObserver;
+ *
+ * Notice that a + b will be only called when either a or b is changed. Getting
+ * a snapshot from sumObserver won't trigger any re-computation.
+ *
+ *
+ * TLObserver is very similar to Observer, but it also keeps a thread-local
+ * cache for the observed object.
+ *
+ * Observer<int> observer = ...;
+ * TLObserver<int> tlObserver(observer);
+ * auto& snapshot = *tlObserver;
+ *
+ *
+ * See ObserverCreator class if you want to wrap any existing subscription API
+ * in an Observer object.
+ */
+template <typename T>
+class Observer;
+
+template <typename T>
+class Snapshot {
+ public:
+ const T& operator*() const {
+ return *get();
+ }
+
+ const T* operator->() const {
+ return get();
+ }
+
+ const T* get() const {
+ return data_.get();
+ }
+
+ /**
+ * Return the version of the observed object.
+ */
+ size_t getVersion() const {
+ return version_;
+ }
+
+ private:
+ friend class Observer<T>;
+
+ Snapshot(
+ const observer_detail::Core& core,
+ std::shared_ptr<const T> data,
+ size_t version)
+ : data_(std::move(data)), version_(version), core_(&core) {
+ DCHECK(data_);
+ }
+
+ std::shared_ptr<const T> data_;
+ size_t version_;
+ const observer_detail::Core* core_;
+};
+
+template <typename T>
+class Observer {
+ public:
+ explicit Observer(observer_detail::Core::Ptr core);
+
+ Snapshot<T> getSnapshot() const;
+ Snapshot<T> operator*() const {
+ return getSnapshot();
+ }
+
+ /**
+ * Check if we have a newer version of the observed object than the snapshot.
+ * Snapshot should have been originally from this Observer.
+ */
+ bool needRefresh(const Snapshot<T>& snapshot) const {
+ DCHECK_EQ(core_.get(), snapshot.core_);
+ return snapshot.getVersion() < core_->getVersionLastChange();
+ }
+
+ private:
+ observer_detail::Core::Ptr core_;
+};
+
+/**
+ * makeObserver(...) creates a new Observer<T> object given a functor to
+ * compute it. The functor can return T or std::shared_ptr<const T>.
+ *
+ * makeObserver(...) blocks until the initial version of Observer is computed.
+ * If creator functor fails (throws or returns a nullptr) during this first
+ * call, the exception is re-thrown by makeObserver(...).
+ *
+ * For all subsequent updates if creator functor fails (throws or returs a
+ * nullptr), the Observer (and all its dependents) is not updated.
+ */
+template <typename F>
+Observer<observer_detail::ResultOf<F>> makeObserver(F&& creator);
+
+template <typename F>
+Observer<observer_detail::ResultOfUnwrapSharedPtr<F>> makeObserver(F&& creator);
+
+template <typename T>
+class TLObserver {
+ public:
+ explicit TLObserver(Observer<T> observer);
+
+ const Snapshot<T>& getSnapshotRef() const;
+ const Snapshot<T>& operator*() const {
+ return getSnapshotRef();
+ }
+
+ private:
+ Observer<T> observer_;
+ folly::ThreadLocal<Snapshot<T>> snapshot_;
+};
+
+/**
+ * Same as makeObserver(...), but creates TLObserver.
+ */
+template <typename T>
+TLObserver<T> makeTLObserver(Observer<T> observer) {
+ return TLObserver<T>(std::move(observer));
+}
+
+template <typename F>
+auto makeTLObserver(F&& creator) {
+ return makeTLObserver(makeObserver(std::forward<F>(creator)));
+}
+
+template <typename T, bool CacheInThreadLocal>
+struct ObserverTraits {};
+
+template <typename T>
+struct ObserverTraits<T, false> {
+ using type = Observer<T>;
+};
+
+template <typename T>
+struct ObserverTraits<T, true> {
+ using type = TLObserver<T>;
+};
+
+template <typename T, bool CacheInThreadLocal>
+using ObserverT = typename ObserverTraits<T, CacheInThreadLocal>::type;
+}
+}
+
+#include <folly/experimental/observer/Observer-inl.h>
--- /dev/null
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/experimental/observer/Observable.h>
+
+namespace folly {
+namespace observer {
+
+template <typename T>
+SimpleObservable<T>::SimpleObservable(T value)
+ : context_(std::make_shared<Context>()) {
+ setValue(std::move(value));
+}
+
+template <typename T>
+SimpleObservable<T>::SimpleObservable(std::shared_ptr<const T> value)
+ : context_(std::make_shared<Context>()) {
+ setValue(std::move(value));
+}
+
+template <typename T>
+void SimpleObservable<T>::setValue(T value) {
+ setValue(std::make_shared<const T>(std::move(value)));
+}
+
+template <typename T>
+void SimpleObservable<T>::setValue(std::shared_ptr<const T> value) {
+ context_->value_.swap(value);
+
+ context_->callback_.withWLock([](folly::Function<void()>& callback) {
+ if (callback) {
+ callback();
+ }
+ });
+}
+
+template <typename T>
+Observer<T> SimpleObservable<T>::getObserver() {
+ struct SimpleObservableWrapper {
+ using element_type = T;
+
+ std::shared_ptr<Context> context;
+
+ std::shared_ptr<const T> get() {
+ return context->value_.copy();
+ }
+
+ void subscribe(folly::Function<void()> callback) {
+ context->callback_.swap(callback);
+ }
+
+ void unsubscribe() {
+ folly::Function<void()> empty;
+ context->callback_.swap(empty);
+ }
+ };
+
+ std::call_once(observerInit_, [&]() {
+ SimpleObservableWrapper wrapper;
+ wrapper.context = context_;
+ ObserverCreator<SimpleObservableWrapper> creator(std::move(wrapper));
+ observer_ = std::move(creator).getObserver();
+ });
+ return *observer_;
+}
+}
+}
--- /dev/null
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/Function.h>
+#include <folly/Synchronized.h>
+#include <folly/experimental/observer/Observer.h>
+
+namespace folly {
+namespace observer {
+
+template <typename T>
+class SimpleObservable {
+ public:
+ explicit SimpleObservable(T value);
+ explicit SimpleObservable(std::shared_ptr<const T> value);
+
+ void setValue(T value);
+ void setValue(std::shared_ptr<const T> value);
+
+ Observer<T> getObserver();
+
+ private:
+ struct Context {
+ folly::Synchronized<std::shared_ptr<const T>> value_;
+ folly::Synchronized<folly::Function<void()>> callback_;
+ };
+ std::shared_ptr<Context> context_;
+
+ std::once_flag observerInit_;
+ folly::Optional<Observer<T>> observer_;
+};
+}
+}
+
+#include <folly/experimental/observer/SimpleObservable-inl.h>
--- /dev/null
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/experimental/observer/detail/Core.h>
+#include <folly/experimental/observer/detail/ObserverManager.h>
+
+namespace folly {
+namespace observer_detail {
+
+Core::VersionedData Core::getData() {
+ if (!ObserverManager::inManagerThread()) {
+ return data_.copy();
+ }
+
+ ObserverManager::DependencyRecorder::markDependency(shared_from_this());
+
+ auto version = ObserverManager::getVersion();
+
+ if (version_ >= version) {
+ return data_.copy();
+ }
+
+ refresh(version);
+
+ DCHECK_GE(version_, version);
+ return data_.copy();
+}
+
+size_t Core::refresh(size_t version, bool force) {
+ CHECK(ObserverManager::inManagerThread());
+
+ if (version_ >= version) {
+ return versionLastChange_;
+ }
+
+ bool refreshDependents = false;
+
+ {
+ std::lock_guard<std::mutex> lgRefresh(refreshMutex_);
+
+ // Recheck in case this code was already refreshed
+ if (version_ >= version) {
+ return versionLastChange_;
+ }
+
+ bool needRefresh = force || version_ == 0;
+
+ // This can be run in parallel, but we expect most updates to propagate
+ // bottom to top.
+ dependencies_.withRLock([&](const Dependencies& dependencies) {
+ for (const auto& dependency : dependencies) {
+ if (dependency->refresh(version) > version_) {
+ needRefresh = true;
+ break;
+ }
+ }
+ });
+
+ if (!needRefresh) {
+ version_ = version;
+ return versionLastChange_;
+ }
+
+ ObserverManager::DependencyRecorder dependencyRecorder;
+
+ try {
+ {
+ VersionedData newData{creator_(), version};
+ if (!newData.data) {
+ throw std::logic_error("Observer creator returned nullptr.");
+ }
+ data_.swap(newData);
+ }
+
+ versionLastChange_ = version;
+ refreshDependents = true;
+ } catch (...) {
+ LOG(ERROR) << "Exception while refreshing Observer: "
+ << exceptionStr(std::current_exception());
+
+ if (version_ == 0) {
+ // Re-throw exception if this is the first time we run creator
+ throw;
+ }
+ }
+
+ version_ = version;
+
+ auto newDependencies = dependencyRecorder.release();
+ dependencies_.withWLock([&](Dependencies& dependencies) {
+ for (const auto& dependency : newDependencies) {
+ if (!dependencies.count(dependency)) {
+ dependency->addDependent(this->shared_from_this());
+ }
+ }
+
+ for (const auto& dependency : dependencies) {
+ if (!newDependencies.count(dependency)) {
+ dependency->removeStaleDependents();
+ }
+ }
+
+ dependencies = std::move(newDependencies);
+ });
+ }
+
+ if (refreshDependents) {
+ auto dependents = dependents_.copy();
+
+ for (const auto& dependentWeak : dependents) {
+ if (auto dependent = dependentWeak.lock()) {
+ ObserverManager::scheduleRefresh(std::move(dependent), version);
+ }
+ }
+ }
+
+ return versionLastChange_;
+}
+
+Core::Core(folly::Function<std::shared_ptr<const void>()> creator)
+ : creator_(std::move(creator)) {}
+
+Core::~Core() {
+ dependencies_.withWLock([](const Dependencies& dependencies) {
+ for (const auto& dependecy : dependencies) {
+ dependecy->removeStaleDependents();
+ }
+ });
+}
+
+Core::Ptr Core::create(folly::Function<std::shared_ptr<const void>()> creator) {
+ auto core = Core::Ptr(new Core(std::move(creator)));
+ return core;
+}
+
+void Core::addDependent(Core::WeakPtr dependent) {
+ dependents_.withWLock([&](Dependents& dependents) {
+ dependents.push_back(std::move(dependent));
+ });
+}
+
+void Core::removeStaleDependents() {
+ // This is inefficient, the assumption is that we won't have many dependents
+ dependents_.withWLock([](Dependents& dependents) {
+ for (size_t i = 0; i < dependents.size(); ++i) {
+ if (dependents[i].expired()) {
+ std::swap(dependents[i], dependents.back());
+ dependents.pop_back();
+ --i;
+ }
+ }
+ });
+}
+}
+}
--- /dev/null
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/Synchronized.h>
+#include <folly/futures/Future.h>
+#include <set>
+
+namespace folly {
+namespace observer_detail {
+
+class ObserverManager;
+
+/**
+ * Core stores the current version of the object held by Observer. It also keeps
+ * all dependencies and dependents of the Observer.
+ */
+class Core : public std::enable_shared_from_this<Core> {
+ public:
+ using Ptr = std::shared_ptr<Core>;
+ using WeakPtr = std::weak_ptr<Core>;
+
+ /**
+ * Blocks until creator is successfully run by ObserverManager
+ */
+ static Ptr create(folly::Function<std::shared_ptr<const void>()> creator);
+
+ /**
+ * View of the observed object and its version
+ */
+ struct VersionedData {
+ VersionedData() {}
+
+ VersionedData(std::shared_ptr<const void> data_, size_t version_)
+ : data(std::move(data_)), version(version_) {}
+
+ std::shared_ptr<const void> data;
+ size_t version{0};
+ };
+
+ /**
+ * Gets current view of the observed object.
+ * This is safe to call from any thread. If this is called from other Observer
+ * functor then that Observer is marked as dependent on current Observer.
+ */
+ VersionedData getData();
+
+ /**
+ * Gets the version of the observed object.
+ */
+ size_t getVersion() const {
+ return version_;
+ }
+
+ /**
+ * Get the last version at which the observed object was actually changed.
+ */
+ size_t getVersionLastChange() {
+ return versionLastChange_;
+ }
+
+ /**
+ * Check if the observed object needs to be re-computed. Returns the version
+ * of last change. If force is true, re-computes the observed object, even if
+ * dependencies didn't change.
+ *
+ * This should be only called from ObserverManager thread.
+ */
+ size_t refresh(size_t version, bool force = false);
+
+ ~Core();
+
+ private:
+ explicit Core(folly::Function<std::shared_ptr<const void>()> creator);
+
+ void addDependent(Core::WeakPtr dependent);
+ void removeStaleDependents();
+
+ using Dependents = std::vector<WeakPtr>;
+ using Dependencies = std::unordered_set<Ptr>;
+
+ folly::Synchronized<Dependents> dependents_;
+ folly::Synchronized<Dependencies> dependencies_;
+
+ std::atomic<size_t> version_{0};
+ std::atomic<size_t> versionLastChange_{0};
+
+ folly::Synchronized<VersionedData> data_;
+
+ folly::Function<std::shared_ptr<const void>()> creator_;
+
+ std::mutex refreshMutex_;
+};
+}
+}
--- /dev/null
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+namespace folly {
+namespace observer_detail {
+
+template <typename T>
+struct NonSharedPtr {
+ using type = typename std::decay<T>::type;
+};
+
+template <typename T>
+struct NonSharedPtr<std::shared_ptr<T>> {};
+
+template <typename T>
+struct UnwrapSharedPtr {};
+
+template <typename T>
+struct UnwrapSharedPtr<std::shared_ptr<T>> {
+ using type = typename std::decay<T>::type;
+};
+
+template <typename F>
+using ResultOf =
+ typename NonSharedPtr<typename std::result_of<F()>::type>::type;
+
+template <typename F>
+using ResultOfUnwrapSharedPtr =
+ typename UnwrapSharedPtr<typename std::result_of<F()>::type>::type;
+}
+}
--- /dev/null
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/experimental/observer/detail/ObserverManager.h>
+
+#include <folly/MPMCQueue.h>
+#include <folly/Singleton.h>
+
+namespace folly {
+namespace observer_detail {
+
+FOLLY_TLS bool ObserverManager::inManagerThread_{false};
+FOLLY_TLS ObserverManager::DependencyRecorder::Dependencies*
+ ObserverManager::DependencyRecorder::currentDependencies_{nullptr};
+
+namespace {
+constexpr size_t kCurrentThreadPoolSize{4};
+constexpr size_t kCurrentQueueSize{10 * 1024};
+constexpr size_t kNextQueueSize{10 * 1024};
+}
+
+class ObserverManager::CurrentQueue {
+ public:
+ CurrentQueue() : queue_(kCurrentQueueSize) {
+ for (size_t i = 0; i < kCurrentThreadPoolSize; ++i) {
+ threads_.emplace_back([&]() {
+ ObserverManager::inManagerThread_ = true;
+
+ while (true) {
+ Function<void()> task;
+ queue_.blockingRead(task);
+
+ if (!task) {
+ return;
+ }
+
+ try {
+ task();
+ } catch (...) {
+ LOG(ERROR) << "Exception while running CurrentQueue task: "
+ << exceptionStr(std::current_exception());
+ }
+ }
+ });
+ }
+ }
+
+ ~CurrentQueue() {
+ for (size_t i = 0; i < threads_.size(); ++i) {
+ queue_.blockingWrite(nullptr);
+ }
+
+ for (auto& thread : threads_) {
+ thread.join();
+ }
+
+ CHECK(queue_.isEmpty());
+ }
+
+ void add(Function<void()> task) {
+ if (ObserverManager::inManagerThread()) {
+ if (!queue_.write(std::move(task))) {
+ throw std::runtime_error("Too many Observers scheduled for update.");
+ }
+ } else {
+ queue_.blockingWrite(std::move(task));
+ }
+ }
+
+ private:
+ MPMCQueue<Function<void()>> queue_;
+ std::vector<std::thread> threads_;
+};
+
+class ObserverManager::NextQueue {
+ public:
+ explicit NextQueue(ObserverManager& manager)
+ : manager_(manager), queue_(kNextQueueSize) {
+ thread_ = std::thread([&]() {
+ Core::Ptr queueCore;
+
+ while (true) {
+ queue_.blockingRead(queueCore);
+
+ if (!queueCore) {
+ return;
+ }
+
+ std::vector<Core::Ptr> cores;
+ cores.emplace_back(std::move(queueCore));
+
+ {
+ SharedMutexReadPriority::WriteHolder wh(manager_.versionMutex_);
+
+ // We can't pick more tasks from the queue after we bumped the
+ // version, so we have to do this while holding the lock.
+ while (cores.size() < kNextQueueSize && queue_.read(queueCore)) {
+ if (!queueCore) {
+ return;
+ }
+ cores.emplace_back(std::move(queueCore));
+ }
+
+ ++manager_.version_;
+ }
+
+ for (auto& core : cores) {
+ manager_.scheduleRefresh(std::move(core), manager_.version_, true);
+ }
+ }
+ });
+ }
+
+ void add(Core::Ptr core) {
+ queue_.blockingWrite(std::move(core));
+ }
+
+ ~NextQueue() {
+ // Emtpy element signals thread to terminate
+ queue_.blockingWrite(nullptr);
+ thread_.join();
+ }
+
+ private:
+ ObserverManager& manager_;
+ MPMCQueue<Core::Ptr> queue_;
+ std::thread thread_;
+};
+
+ObserverManager::ObserverManager() {
+ currentQueue_ = make_unique<CurrentQueue>();
+ nextQueue_ = make_unique<NextQueue>(*this);
+}
+
+ObserverManager::~ObserverManager() {
+ // Destroy NextQueue, before the rest of this object, since it expects
+ // ObserverManager to be alive.
+ nextQueue_.release();
+ currentQueue_.release();
+}
+
+void ObserverManager::scheduleCurrent(Function<void()> task) {
+ currentQueue_->add(std::move(task));
+}
+
+void ObserverManager::scheduleNext(Core::Ptr core) {
+ nextQueue_->add(std::move(core));
+}
+
+struct ObserverManager::Singleton {
+ static folly::Singleton<ObserverManager> instance;
+};
+
+folly::Singleton<ObserverManager> ObserverManager::Singleton::instance([] {
+ return new ObserverManager();
+});
+
+std::shared_ptr<ObserverManager> ObserverManager::getInstance() {
+ return Singleton::instance.try_get();
+}
+}
+}
--- /dev/null
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/experimental/observer/detail/Core.h>
+#include <folly/futures/Future.h>
+
+namespace folly {
+namespace observer_detail {
+
+/**
+ * ObserverManager is a singleton which controls the re-computation of all
+ * Observers. Such re-computation always happens on the thread pool owned by
+ * ObserverManager.
+ *
+ * ObserverManager has global current version. All existing Observers
+ * may have their version be less (yet to be updated) or equal (up to date)
+ * to the global current version.
+ *
+ * ObserverManager::CurrentQueue contains all of the Observers which need to be
+ * updated to the global current version. Those updates are peformed on the
+ * ObserverManager's thread pool, until the queue is empty. If some Observer is
+ * updated, all of its dependents are added to ObserverManager::CurrentQueue
+ * to be updated.
+ *
+ * If some leaf Observer (i.e. created from Observable) is updated, then current
+ * version of the ObserverManager should be bumped. All such updated leaf
+ * Observers are added to the ObserverManager::NextQueue.
+ *
+ * *Only* when ObserverManager::CurrentQueue is empty, the global current
+ * version is bumped and all updates from the ObserverManager::NextQueue are
+ * performed. If leaf Observer gets updated more then once before being picked
+ * from the ObserverManager::NextQueue, then only the last update is processed.
+ */
+class ObserverManager {
+ public:
+ static size_t getVersion() {
+ auto instance = getInstance();
+
+ if (!instance) {
+ return 1;
+ }
+
+ return instance->version_;
+ }
+
+ static bool inManagerThread() {
+ return inManagerThread_;
+ }
+
+ static Future<Unit>
+ scheduleRefresh(Core::Ptr core, size_t minVersion, bool force = false) {
+ if (core->getVersion() >= minVersion) {
+ return makeFuture<Unit>(Unit());
+ }
+
+ auto instance = getInstance();
+
+ if (!instance) {
+ return makeFuture<Unit>(
+ std::logic_error("ObserverManager requested during shutdown"));
+ }
+
+ Promise<Unit> promise;
+ auto future = promise.getFuture();
+
+ SharedMutexReadPriority::ReadHolder rh(instance->versionMutex_);
+
+ instance->scheduleCurrent([
+ core = std::move(core),
+ promise = std::move(promise),
+ instancePtr = instance.get(),
+ rh = std::move(rh),
+ force
+ ]() mutable {
+ promise.setWith([&]() { core->refresh(instancePtr->version_, force); });
+ });
+
+ return future;
+ }
+
+ static void scheduleRefreshNewVersion(Core::Ptr core) {
+ if (core->getVersion() == 0) {
+ scheduleRefresh(std::move(core), 1).get();
+ return;
+ }
+
+ auto instance = getInstance();
+
+ if (!instance) {
+ return;
+ }
+
+ instance->scheduleNext(std::move(core));
+ }
+
+ class DependencyRecorder {
+ public:
+ using Dependencies = std::unordered_set<Core::Ptr>;
+
+ DependencyRecorder() {
+ DCHECK(inManagerThread());
+
+ previousDepedencies_ = currentDependencies_;
+ currentDependencies_ = &dependencies_;
+ }
+
+ static void markDependency(Core::Ptr dependency) {
+ DCHECK(inManagerThread());
+ DCHECK(currentDependencies_);
+
+ currentDependencies_->insert(std::move(dependency));
+ }
+
+ Dependencies release() {
+ DCHECK(currentDependencies_ == &dependencies_);
+ std::swap(currentDependencies_, previousDepedencies_);
+ previousDepedencies_ = nullptr;
+
+ return std::move(dependencies_);
+ }
+
+ ~DependencyRecorder() {
+ if (previousDepedencies_) {
+ release();
+ }
+ }
+
+ private:
+ Dependencies dependencies_;
+ Dependencies* previousDepedencies_;
+
+ static FOLLY_TLS Dependencies* currentDependencies_;
+ };
+
+ ~ObserverManager();
+
+ private:
+ ObserverManager();
+
+ struct Singleton;
+
+ void scheduleCurrent(Function<void()>);
+ void scheduleNext(Core::Ptr);
+
+ class CurrentQueue;
+ class NextQueue;
+
+ std::unique_ptr<CurrentQueue> currentQueue_;
+ std::unique_ptr<NextQueue> nextQueue_;
+
+ static std::shared_ptr<ObserverManager> getInstance();
+ static FOLLY_TLS bool inManagerThread_;
+
+ /**
+ * Version mutex is used to make sure all updates are processed from the
+ * CurrentQueue, before bumping the version and moving to the NextQueue.
+ *
+ * To achieve this every task added to CurrentQueue holds a reader lock.
+ * NextQueue grabs a writer lock before bumping the version, so it can only
+ * happen if CurrentQueue is empty (notice that we use read-priority shared
+ * mutex).
+ */
+ SharedMutexReadPriority versionMutex_;
+ std::atomic<size_t> version_{1};
+};
+}
+}
--- /dev/null
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <gtest/gtest.h>
+
+#include <thread>
+
+#include <folly/Baton.h>
+#include <folly/experimental/observer/SimpleObservable.h>
+
+using namespace folly::observer;
+
+TEST(Observer, Observable) {
+ SimpleObservable<int> observable(42);
+ auto observer = observable.getObserver();
+
+ EXPECT_EQ(42, **observer);
+
+ folly::Baton<> baton;
+ auto waitingObserver = makeObserver([observer, &baton]() {
+ *observer;
+ baton.post();
+ return folly::Unit();
+ });
+ baton.reset();
+
+ observable.setValue(24);
+
+ EXPECT_TRUE(baton.timed_wait(std::chrono::seconds{1}));
+
+ EXPECT_EQ(24, **observer);
+}
+
+TEST(Observer, MakeObserver) {
+ SimpleObservable<int> observable(42);
+
+ auto observer = makeObserver([child = observable.getObserver()]() {
+ return **child + 1;
+ });
+
+ EXPECT_EQ(43, **observer);
+
+ folly::Baton<> baton;
+ auto waitingObserver = makeObserver([observer, &baton]() {
+ *observer;
+ baton.post();
+ return folly::Unit();
+ });
+ baton.reset();
+
+ observable.setValue(24);
+
+ EXPECT_TRUE(baton.timed_wait(std::chrono::seconds{1}));
+
+ EXPECT_EQ(25, **observer);
+}
+
+TEST(Observer, MakeObserverDiamond) {
+ SimpleObservable<int> observable(42);
+
+ auto observer1 = makeObserver([child = observable.getObserver()]() {
+ return **child + 1;
+ });
+
+ auto observer2 = makeObserver([child = observable.getObserver()]() {
+ return std::make_shared<int>(**child + 2);
+ });
+
+ auto observer = makeObserver(
+ [observer1, observer2]() { return (**observer1) * (**observer2); });
+
+ EXPECT_EQ(43 * 44, *observer.getSnapshot());
+
+ folly::Baton<> baton;
+ auto waitingObserver = makeObserver([observer, &baton]() {
+ *observer;
+ baton.post();
+ return folly::Unit();
+ });
+ baton.reset();
+
+ observable.setValue(24);
+
+ EXPECT_TRUE(baton.timed_wait(std::chrono::seconds{1}));
+
+ EXPECT_EQ(25 * 26, **observer);
+}
+
+TEST(Observer, CreateException) {
+ struct ExpectedException {};
+ EXPECT_THROW(
+ auto observer = makeObserver(
+ []() -> std::shared_ptr<int> { throw ExpectedException(); }),
+ ExpectedException);
+
+ EXPECT_THROW(
+ auto observer =
+ makeObserver([]() -> std::shared_ptr<int> { return nullptr; }),
+ std::logic_error);
+}
+
+TEST(Observer, NullValue) {
+ SimpleObservable<int> observable(41);
+ auto oddObserver = makeObserver([innerObserver = observable.getObserver()]() {
+ auto value = **innerObserver;
+
+ if (value % 2 != 0) {
+ return value * 2;
+ }
+
+ throw std::logic_error("I prefer odd numbers");
+ });
+
+ folly::Baton<> baton;
+ auto waitingObserver = makeObserver([oddObserver, &baton]() {
+ *oddObserver;
+ baton.post();
+ return folly::Unit();
+ });
+
+ baton.reset();
+ EXPECT_EQ(82, **oddObserver);
+
+ observable.setValue(2);
+
+ // Waiting observer shouldn't be updated
+ EXPECT_FALSE(baton.timed_wait(std::chrono::seconds{1}));
+ baton.reset();
+
+ EXPECT_EQ(82, **oddObserver);
+
+ observable.setValue(23);
+
+ EXPECT_TRUE(baton.timed_wait(std::chrono::seconds{1}));
+
+ EXPECT_EQ(46, **oddObserver);
+}
+
+TEST(Observer, Stress) {
+ SimpleObservable<int> observable(0);
+
+ folly::Synchronized<std::vector<int>> values;
+
+ auto observer = makeObserver([ child = observable.getObserver(), &values ]() {
+ auto value = **child * 10;
+ values.withWLock(
+ [&](std::vector<int>& values) { values.push_back(value); });
+ return value;
+ });
+
+ EXPECT_EQ(0, **observer);
+ values.withRLock([](const std::vector<int>& values) {
+ EXPECT_EQ(1, values.size());
+ EXPECT_EQ(0, values.back());
+ });
+
+ constexpr size_t numIters = 10000;
+
+ for (size_t i = 1; i <= numIters; ++i) {
+ observable.setValue(i);
+ }
+
+ while (**observer != numIters * 10) {
+ std::this_thread::yield();
+ }
+
+ values.withRLock([numIters = numIters](const std::vector<int>& values) {
+ EXPECT_EQ(numIters * 10, values.back());
+ EXPECT_LT(values.size(), numIters / 2);
+ EXPECT_GT(values.size(), 10);
+
+ for (auto value : values) {
+ EXPECT_EQ(0, value % 10);
+ }
+
+ for (size_t i = 0; i < values.size() - 1; ++i) {
+ EXPECT_LE(values[i], values[i + 1]);
+ }
+ });
+}