2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
// Context bundles the wrapped Observable with the most recently fetched value,
// the weak Core handle, and the flag/mutex used when requesting a refresh from
// ObserverManager.
// NOTE(review): this excerpt appears to be missing lines (access specifiers,
// some function headers, conditions and closing braces). The comments below
// describe only the statements that are actually visible.
21 template <typename Observable, typename Traits>
22 class ObserverCreator<Observable, Traits>::Context {
// Constructs observable_ in place by perfectly forwarding all arguments.
// NOTE(review): the constructor body is not visible in this excerpt.
24 template <typename... Args>
25 Context(Args&&... args) : observable_(std::forward<Args>(args)...) {
// Unsubscribes from the observable through Traits.
// NOTE(review): the enclosing function header is not visible; presumably this
// is the destructor - confirm against the full file.
31 Traits::unsubscribe(observable_);
// Stores the weak Core handle; it is the argument later handed to
// ObserverManager::scheduleRefreshNewVersion().
35 void setCore(observer_detail::Core::WeakPtr coreWeak) {
36 coreWeak_ = std::move(coreWeak);
// Clears the pending-update flag, so the next successful compare-exchange
// below can schedule a refresh again.
// NOTE(review): the return statement is not visible; presumably it returns the
// value held in value_ - confirm.
39 std::shared_ptr<const T> get() {
40 updateRequested_ = false;
// NOTE(review): the header of the function containing the following lines is
// not visible; the original comment refers to it as update().
45 // This mutex ensures there's no race condition between initial update()
46 // call and update() calls from the subscription callback.
48 // Additionally it helps avoid races between two different subscription
49 // callbacks (getting new value from observable and storing it into value_
51 std::lock_guard<std::mutex> lg(updateMutex_);
// NOTE(review): the condition guarding this early-out is not visible here.
53 // Value didn't change, so we can skip the version update.
// Only the caller that flips updateRequested_ from false to true schedules a
// refresh; if the flag is already set the compare-exchange fails and nothing
// is scheduled.
57 bool expected = false;
58 if (updateRequested_.compare_exchange_strong(expected, true)) {
59 observer_detail::ObserverManager::scheduleRefreshNewVersion(coreWeak_);
// Registers `callback` with the observable through Traits::subscribe,
// forwarding it unchanged.
// NOTE(review): the template header introducing F is not visible here.
64 void subscribe(F&& callback) {
65 Traits::subscribe(observable_, std::forward<F>(callback));
// Fetches the current value from the observable through Traits::get and swaps
// it into value_.
// NOTE(review): the header of this function and the condition guarding the
// throw are not visible in this excerpt.
70 auto newValue = Traits::get(observable_);
// Keep the raw pointer now: after the swap below newValue no longer refers to
// the freshly fetched object.
71 auto newValuePtr = newValue.get();
73 throw std::logic_error("Observable returned nullptr.");
75 value_.swap(newValue);
// After the swap newValue presumably holds the previously stored pointer, so
// this reports whether the fetched pointer differs from the old one.
76 return newValuePtr != newValue.get();
// Most recently stored value, wrapped in folly::Synchronized.
79 folly::Synchronized<std::shared_ptr<const T>> value_;
// Set by the compare-exchange before a refresh is scheduled; cleared in get().
80 std::atomic<bool> updateRequested_{false};
// Weak handle to the Core, assigned in setCore().
82 observer_detail::Core::WeakPtr coreWeak_;
// The wrapped observable, constructed from the constructor arguments.
84 Observable observable_;
// Taken by the lock_guard on the update path above.
86 std::mutex updateMutex_;
// Creates the shared Context, perfectly forwarding every constructor argument
// to Context's constructor (which forwards them on to the Observable member).
89 template <typename Observable, typename Traits>
90 template <typename... Args>
91 ObserverCreator<Observable, Traits>::ObserverCreator(Args&&... args)
92 : context_(std::make_shared<Context>(std::forward<Args>(args)...)) {}
// Builds an Observer backed by this creator's Context. The function is
// &&-qualified, so it can only be invoked on an rvalue ObserverCreator.
// NOTE(review): this excerpt appears to be missing lines (closing braces, some
// statements and the final return); comments describe only what is visible.
94 template <typename Observable, typename Traits>
95 Observer<typename ObserverCreator<Observable, Traits>::T>
96 ObserverCreator<Observable, Traits>::getObserver()&& {
97 // This master shared_ptr allows grabbing derived weak_ptrs, pointing to the
98 // same Context object, but using a separate reference count. Master
99 // shared_ptr destructor then blocks until all shared_ptrs obtained from
100 // derived weak_ptrs are released.
101 class ContextMasterPointer {
// Takes ownership of the Context via contextMaster_.
103 explicit ContextMasterPointer(std::shared_ptr<Context> context)
104 : contextMaster_(std::move(context)),
// NOTE(review): the line naming the member initialized with the arguments
// below is not visible; presumably it is context_ - confirm.
// The arguments are the raw Context pointer plus a custom deleter. The
// deleter's visible body posts the baton; no delete of the Context is visible
// in it.
106 contextMaster_.get(),
107 [destroyBaton = destroyBaton_](Context*) {
108 destroyBaton->post();
// Blocks on the baton, i.e. until the deleter above has posted it.
// NOTE(review): the statements preceding the wait are not visible here.
110 ~ContextMasterPointer() {
113 destroyBaton_->wait();
// Move-only type: copy operations are deleted, move operations defaulted.
116 ContextMasterPointer(const ContextMasterPointer&) = delete;
117 ContextMasterPointer(ContextMasterPointer&&) = default;
118 ContextMasterPointer& operator=(const ContextMasterPointer&) = delete;
119 ContextMasterPointer& operator=(ContextMasterPointer&&) = default;
// Gives direct access to the Context owned by contextMaster_.
121 Context* operator->() const {
122 return contextMaster_.get();
// Returns a weak_ptr to the Context.
// NOTE(review): the body is not visible; presumably the weak_ptr is derived
// from context_ (the separately counted pointer) - confirm.
125 std::weak_ptr<Context> get_weak() {
// destroyBaton_ is declared before the two pointers so that it is already
// initialized when the constructor's initializer list copies it into the
// deleter lambda.
130 std::shared_ptr<folly::Baton<>> destroyBaton_{
131 std::make_shared<folly::Baton<>>()};
132 std::shared_ptr<Context> contextMaster_;
133 std::shared_ptr<Context> context_;
135 // We want to make sure that Context can only be destroyed when Core is
136 // destroyed. So we have to avoid the situation when subscribe callback is
137 // locking Context shared_ptr and remains the last to release it.
138 // We solve this by having Core hold the master shared_ptr and subscription
139 // callback gets derived weak_ptr.
140 ContextMasterPointer contextMaster(context_);
// Grab the weak_ptr before contextMaster is moved into the lambda below.
141 auto contextWeak = contextMaster.get_weak();
// The master pointer is moved into the callable handed to makeObserver; that
// callable returns the Context's current value via get().
142 auto observer = makeObserver([context = std::move(contextMaster)]() {
143 return context->get();
// Hand the observer's core to the Context (stored by setCore()).
146 context_->setCore(observer.core_);
// The subscription callback holds only the weak_ptr and locks it on each
// invocation.
// NOTE(review): the body executed when the lock succeeds is not visible here.
147 context_->subscribe([contextWeak = std::move(contextWeak)] {
148 if (auto context = contextWeak.lock()) {
153 // Do an extra update in case observable was updated between observer creation
154 // and setting updates callback.
160 } // namespace observer