#include "Subject.h"
#include "Subscription.h"
+#include <folly/RWSpinLock.h>
+#include <folly/ThreadLocal.h>
#include <folly/wangle/Executor.h>
#include <list>
#include <memory>
namespace folly { namespace wangle {
template <class T>
-struct Observable {
+class Observable {
+ public:
+ Observable() = default;
+ Observable(Observable&& other) noexcept {
+ RWSpinLock::WriteHolder{&other.observersLock_};
+ observers_ = std::move(other.observers_);
+ }
+
virtual ~Observable() = default;
/// Subscribe the given Observer to this Observable.
// Eventually this will return a Subscription object of some kind, in order
// to support cancellation. This is kinda really important. Maybe I should
// just do it now, using an dummy Subscription object.
+ //
+ // If this is called within an Observer callback, the new observer will not
+ // get the current update but will get subsequent updates.
virtual Subscription subscribe(ObserverPtr<T> o) {
- observers_.push_back(o);
+ if (inCallback_ && *inCallback_) {
+ if (!newObservers_) {
+ newObservers_.reset(new std::list<ObserverPtr<T>>());
+ }
+ newObservers_->push_back(o);
+ } else {
+ RWSpinLock::WriteHolder{&observersLock_};
+ observers_.push_back(o);
+ }
return Subscription();
}
}
protected:
+ const std::list<ObserverPtr<T>>& getObservers() {
+ return observers_;
+ }
+
+ // This guard manages deferred modification of the observers list.
+ // Subclasses should use this guard if they want to subscribe new observers
+ // in the course of a callback. New observers won't be added until the guard
+ // goes out of scope. See Subject for an example.
+ class ObserversGuard {
+ public:
+ explicit ObserversGuard(Observable* o) : o_(o) {
+ if (UNLIKELY(!o_->inCallback_)) {
+ o_->inCallback_.reset(new bool{false});
+ }
+ CHECK(!(*o_->inCallback_));
+ *o_->inCallback_ = true;
+ o_->observersLock_.lock_shared();
+ }
+
+ ~ObserversGuard() {
+ o_->observersLock_.unlock_shared();
+ if (UNLIKELY(o_->newObservers_ && !o_->newObservers_->empty())) {
+ {
+ RWSpinLock::WriteHolder(o_->observersLock_);
+ for (auto& o : *(o_->newObservers_)) {
+ o_->observers_.push_back(o);
+ }
+ }
+ o_->newObservers_->clear();
+ }
+ *o_->inCallback_ = false;
+ }
+
+ private:
+ Observable* o_;
+ };
+
+ private:
std::list<ObserverPtr<T>> observers_;
+ RWSpinLock observersLock_;
+ folly::ThreadLocalPtr<bool> inCallback_;
+ folly::ThreadLocalPtr<std::list<ObserverPtr<T>>> newObservers_;
};
}}
/// observed events to the Subject's observers.
template <class T>
struct Subject : public Observable<T>, public Observer<T> {
+ typedef typename Observable<T>::ObserversGuard ObserversGuard;
void onNext(T val) override {
- for (auto& o : this->observers_)
+ ObserversGuard guard(this);
+ for (auto& o : Observable<T>::getObservers()) {
o->onNext(val);
+ }
}
void onError(Error e) override {
- for (auto& o : this->observers_)
+ ObserversGuard guard(this);
+ for (auto& o : Observable<T>::getObservers()) {
o->onError(e);
+ }
}
void onCompleted() override {
- for (auto& o : this->observers_)
+ ObserversGuard guard(this);
+ for (auto& o : Observable<T>::getObservers()) {
o->onCompleted();
+ }
}
};
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/experimental/wangle/rx/Observer.h>
+#include <folly/experimental/wangle/rx/Subject.h>
+#include <gtest/gtest.h>
+
+using namespace folly::wangle;
+
+TEST(RxTest, SubscribeDuringCallback) {
+ // A subscriber who was subscribed in the course of a callback should get
+ // subsequent updates but not the current update.
+ Subject<int> subject;
+ int outerCount = 0;
+ int innerCount = 0;
+ subject.subscribe(Observer<int>::create([&] (int x) {
+ outerCount++;
+ subject.subscribe(Observer<int>::create([&] (int y) {
+ innerCount++;
+ }));
+ }));
+ subject.onNext(42);
+ subject.onNext(0xDEADBEEF);
+ EXPECT_EQ(2, outerCount);
+ EXPECT_EQ(1, innerCount);
+}