2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/wangle/rx/types.h> // must come first
20 #include <folly/wangle/rx/Subject.h>
21 #include <folly/wangle/rx/Subscription.h>
23 #include <folly/RWSpinLock.h>
24 #include <folly/SmallLocks.h>
25 #include <folly/ThreadLocal.h>
26 #include <folly/small_vector.h>
27 #include <folly/Executor.h>
31 namespace folly { namespace wangle {
33 template <class T, size_t InlineObservers>
36 Observable() : nextSubscriptionId_{1} {}
38 // TODO perhaps we want to provide this #5283229
39 Observable(Observable&& other) = delete;
41 virtual ~Observable() {
43 unsubscriber_->disable();
47 // The next three methods subscribe the given Observer to this Observable.
49 // If these are called within an Observer callback, the new observer will not
50 // get the current update but will get subsequent updates.
52 // subscribe() returns a Subscription object. The observer will continue to
53 // get updates until the Subscription is destroyed.
55 // observe(ObserverPtr<T>) creates an indefinite subscription
57 // observe(Observer<T>*) also creates an indefinite subscription, but the
58 // caller is responsible for ensuring that the given Observer outlives this
59 // Observable. This might be useful in high performance environments where
60 // allocations must be kept to a minimum. Template parameter InlineObservers
61 // specifies how many observers can been subscribed inline without any
62 // allocations (it's just the size of a folly::small_vector).
63 virtual Subscription<T> subscribe(ObserverPtr<T> observer) {
64 return subscribeImpl(observer, false);
67 virtual void observe(ObserverPtr<T> observer) {
68 subscribeImpl(observer, true);
71 virtual void observe(Observer<T>* observer) {
72 if (inCallback_ && *inCallback_) {
74 newObservers_.reset(new ObserverList());
76 newObservers_->push_back(observer);
78 RWSpinLock::WriteHolder{&observersLock_};
79 observers_.push_back(observer);
83 // TODO unobserve(ObserverPtr<T>), unobserve(Observer<T>*)
85 /// Returns a new Observable that will call back on the given Scheduler.
86 /// The returned Observable must outlive the parent Observable.
88 // This and subscribeOn should maybe just be a first-class feature of an
89 // Observable, rather than making new ones whose lifetimes are tied to their
90 // parents. In that case it'd return a reference to this object for
92 ObservablePtr<T> observeOn(SchedulerPtr scheduler) {
93 // you're right Hannes, if we have Observable::create we don't need this
95 struct ViaSubject : public Observable<T>
97 ViaSubject(SchedulerPtr sched,
99 : scheduler_(sched), observable_(obs)
102 Subscription<T> subscribe(ObserverPtr<T> o) override {
103 return observable_->subscribe(
105 [=](T val) { scheduler_->add([o, val] { o->onNext(val); }); },
106 [=](Error e) { scheduler_->add([o, e] { o->onError(e); }); },
107 [=]() { scheduler_->add([o] { o->onCompleted(); }); }));
111 SchedulerPtr scheduler_;
112 Observable* observable_;
115 return std::make_shared<ViaSubject>(scheduler, this);
118 /// Returns a new Observable that will subscribe to this parent Observable
119 /// via the given Scheduler. This can be subtle and confusing at first, see
120 /// http://www.introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#SubscribeOnObserveOn
121 std::unique_ptr<Observable> subscribeOn(SchedulerPtr scheduler) {
122 struct Subject_ : public Subject<T> {
124 Subject_(SchedulerPtr s, Observable* o) : scheduler_(s), observable_(o) {
127 Subscription<T> subscribe(ObserverPtr<T> o) {
128 scheduler_->add([=] {
129 observable_->subscribe(o);
131 return Subscription<T>(nullptr, 0); // TODO
135 SchedulerPtr scheduler_;
136 Observable* observable_;
139 return folly::make_unique<Subject_>(scheduler, this);
143 // Safely execute an operation on each observer. F must take a single
144 // Observer<T>* as its argument.
146 void forEachObserver(F f) {
147 if (UNLIKELY(!inCallback_)) {
148 inCallback_.reset(new bool{false});
150 CHECK(!(*inCallback_));
154 RWSpinLock::ReadHolder rh(observersLock_);
155 for (auto o : observers_) {
159 for (auto& kv : subscribers_) {
164 if (UNLIKELY((newObservers_ && !newObservers_->empty()) ||
165 (newSubscribers_ && !newSubscribers_->empty()) ||
166 (oldSubscribers_ && !oldSubscribers_->empty()))) {
168 RWSpinLock::WriteHolder wh(observersLock_);
170 for (auto observer : *(newObservers_)) {
171 observers_.push_back(observer);
173 newObservers_->clear();
175 if (newSubscribers_) {
176 for (auto& kv : *(newSubscribers_)) {
177 subscribers_.insert(std::move(kv));
179 newSubscribers_->clear();
181 if (oldSubscribers_) {
182 for (auto id : *(oldSubscribers_)) {
183 subscribers_.erase(id);
185 oldSubscribers_->clear();
189 *inCallback_ = false;
193 Subscription<T> subscribeImpl(ObserverPtr<T> observer, bool indefinite) {
194 auto subscription = makeSubscription(indefinite);
195 typename SubscriberMap::value_type kv{subscription.id_, std::move(observer)};
196 if (inCallback_ && *inCallback_) {
197 if (!newSubscribers_) {
198 newSubscribers_.reset(new SubscriberMap());
200 newSubscribers_->insert(std::move(kv));
202 RWSpinLock::WriteHolder{&observersLock_};
203 subscribers_.insert(std::move(kv));
210 explicit Unsubscriber(Observable* observable) : observable_(observable) {
214 void unsubscribe(uint64_t id) {
216 RWSpinLock::ReadHolder guard(lock_);
218 observable_->unsubscribe(id);
223 RWSpinLock::WriteHolder guard(lock_);
224 observable_ = nullptr;
229 Observable* observable_;
232 std::shared_ptr<Unsubscriber> unsubscriber_{nullptr};
233 MicroSpinLock unsubscriberLock_{0};
235 friend class Subscription<T>;
237 void unsubscribe(uint64_t id) {
238 if (inCallback_ && *inCallback_) {
239 if (!oldSubscribers_) {
240 oldSubscribers_.reset(new std::vector<uint64_t>());
242 if (newSubscribers_) {
243 auto it = newSubscribers_->find(id);
244 if (it != newSubscribers_->end()) {
245 newSubscribers_->erase(it);
249 oldSubscribers_->push_back(id);
251 RWSpinLock::WriteHolder{&observersLock_};
252 subscribers_.erase(id);
256 Subscription<T> makeSubscription(bool indefinite) {
258 return Subscription<T>(nullptr, nextSubscriptionId_++);
260 if (!unsubscriber_) {
261 std::lock_guard<MicroSpinLock> guard(unsubscriberLock_);
262 if (!unsubscriber_) {
263 unsubscriber_ = std::make_shared<Unsubscriber>(this);
266 return Subscription<T>(unsubscriber_, nextSubscriptionId_++);
270 std::atomic<uint64_t> nextSubscriptionId_;
271 RWSpinLock observersLock_;
272 folly::ThreadLocalPtr<bool> inCallback_;
274 typedef folly::small_vector<Observer<T>*, InlineObservers> ObserverList;
275 ObserverList observers_;
276 folly::ThreadLocalPtr<ObserverList> newObservers_;
278 typedef std::map<uint64_t, ObserverPtr<T>> SubscriberMap;
279 SubscriberMap subscribers_;
280 folly::ThreadLocalPtr<SubscriberMap> newSubscribers_;
281 folly::ThreadLocalPtr<std::vector<uint64_t>> oldSubscribers_;