2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
// Context bundles the wrapped Observable with the state needed to serve it
// through an Observer: the most recently fetched value (value_), a weak
// reference to the observer Core (coreWeak_), and the flag/mutex pair used to
// coordinate refreshes (updateRequested_, updateMutex_). All interaction with
// the observable goes through the Traits policy (get/subscribe/unsubscribe).
21 template <typename Observable, typename Traits>
22 class ObserverCreator<Observable, Traits>::Context {
// Perfect-forwards all constructor arguments to observable_.
24 template <typename... Args>
25 Context(Args&&... args) : observable_(std::forward<Args>(args)...) {
// Detaches from the observable through the Traits policy.
31 Traits::unsubscribe(observable_);
// Stores the weak Core reference that is later handed to
// ObserverManager::scheduleRefreshNewVersion().
35 void setCore(observer_detail::Core::WeakPtr coreWeak) {
36 coreWeak_ = std::move(coreWeak);
// Returns the value as a shared_ptr<const T>. The pending-refresh flag is
// cleared first, so a subsequent update is able to schedule a new refresh.
39 std::shared_ptr<const T> get() {
40 updateRequested_ = false;
45 // This mutex ensures there's no race condition between initial update()
46 // call and update() calls from the subscription callback.
48 // Additionally it helps avoid races between two different subscription
49 // callbacks (getting new value from observable and storing it into value_
51 std::lock_guard<std::mutex> lg(updateMutex_);
// Schedule at most one refresh at a time: only the caller that flips
// updateRequested_ from false to true notifies the ObserverManager; the flag
// stays set until get() clears it.
54 bool expected = false;
55 if (updateRequested_.compare_exchange_strong(expected, true)) {
56 observer_detail::ObserverManager::scheduleRefreshNewVersion(coreWeak_);
// Registers `callback` with the observable through the Traits policy,
// forwarding it unchanged.
61 void subscribe(F&& callback) {
62 Traits::subscribe(observable_, std::forward<F>(callback));
// Fetch the current value from the observable via the Traits policy.
67 auto newValue = Traits::get(observable_);
// Per the message, a null value from the observable is reported as
// std::logic_error.
69 throw std::logic_error("Observable returned nullptr.");
// Exchange the stored value with the freshly fetched one.
71 value_.swap(newValue);
// Most recently fetched value, wrapped in folly::Synchronized.
74 folly::Synchronized<std::shared_ptr<const T>> value_;
// True once a refresh has been scheduled and until get() resets it.
75 std::atomic<bool> updateRequested_{false};
// Weak reference to the observer Core, set via setCore().
77 observer_detail::Core::WeakPtr coreWeak_;
// The wrapped observable; constructed from the forwarded constructor args.
79 Observable observable_;
// Held while refreshing the value; see the comment where it is locked.
81 std::mutex updateMutex_;
// Creates the shared Context, perfect-forwarding every constructor argument
// to Context's constructor. The resulting shared_ptr is kept in context_.
84 template <typename Observable, typename Traits>
85 template <typename... Args>
86 ObserverCreator<Observable, Traits>::ObserverCreator(Args&&... args)
87 : context_(std::make_shared<Context>(std::forward<Args>(args)...)) {}
89 template <typename Observable, typename Traits>
90 Observer<typename ObserverCreator<Observable, Traits>::T>
91 ObserverCreator<Observable, Traits>::getObserver()&& {
92 // This master shared_ptr allows grabbing derived weak_ptrs, pointing to the
93 // same Context object, but using a separate reference count. Master
94 // shared_ptr destructor then blocks until all shared_ptrs obtained from
95 // derived weak_ptrs are released.
// Move-only holder of two shared_ptrs to one Context plus a baton:
// contextMaster_ takes over the shared_ptr passed to the constructor, and a
// second pointer is built from the same raw Context* with a custom deleter
// that posts destroyBaton_.
96 class ContextMasterPointer {
// Takes ownership of `context` as the master pointer.
98 explicit ContextMasterPointer(std::shared_ptr<Context> context)
99 : contextMaster_(std::move(context)),
// Raw pointer + deleter for a separately counted shared_ptr (presumably
// context_ -- TODO confirm). The deleter captures its own copy of the baton
// shared_ptr and posts it when invoked.
101 contextMaster_.get(),
102 [destroyBaton = destroyBaton_](Context*) {
103 destroyBaton->post();
// Blocks on the baton, i.e. until the deleter above has posted it.
105 ~ContextMasterPointer() {
108 destroyBaton_->wait();
// Copying is disabled; moves are defaulted.
111 ContextMasterPointer(const ContextMasterPointer&) = delete;
112 ContextMasterPointer(ContextMasterPointer&&) = default;
113 ContextMasterPointer& operator=(const ContextMasterPointer&) = delete;
114 ContextMasterPointer& operator=(ContextMasterPointer&&) = default;
// Member access goes through the master pointer.
116 Context* operator->() const {
117 return contextMaster_.get();
// Returns a weak_ptr to the Context.
120 std::weak_ptr<Context> get_weak() {
// Baton shared between this object and the custom deleter; posted by the
// deleter, waited on by the destructor.
125 std::shared_ptr<folly::Baton<>> destroyBaton_{
126 std::make_shared<folly::Baton<>>()};
// Master pointer: the shared_ptr handed to the constructor.
127 std::shared_ptr<Context> contextMaster_;
// Second shared_ptr to the same Context.
128 std::shared_ptr<Context> context_;
130 // We want to make sure that Context can only be destroyed when Core is
131 // destroyed. So we have to avoid the situation when subscribe callback is
132 // locking Context shared_ptr and remains the last to release it.
133 // We solve this by having Core hold the master shared_ptr and subscription
134 // callback gets derived weak_ptr.
135 ContextMasterPointer contextMaster(context_);
136 auto contextWeak = contextMaster.get_weak();
137 auto observer = makeObserver([context = std::move(contextMaster)]() {
138 return context->get();
141 context_->setCore(observer.core_);
142 context_->subscribe([contextWeak = std::move(contextWeak)] {
143 if (auto context = contextWeak.lock()) {
148 // Do an extra update in case the observable was updated between observer
149 // creation and registration of the update callback.