#pragma once
-#include "types.h"
-#include "Subject.h"
-#include "Subscription.h"
+#include <folly/experimental/wangle/rx/Subject.h>
+#include <folly/experimental/wangle/rx/Subscription.h>
+#include <folly/experimental/wangle/rx/types.h>
#include <folly/RWSpinLock.h>
+#include <folly/SmallLocks.h>
#include <folly/ThreadLocal.h>
#include <folly/wangle/Executor.h>
-#include <list>
+#include <map>
#include <memory>
namespace folly { namespace wangle {
template <class T>
class Observable {
public:
- Observable() = default;
- Observable(Observable&& other) noexcept {
- RWSpinLock::WriteHolder{&other.observersLock_};
- observers_ = std::move(other.observers_);
+ Observable() : nextSubscriptionId_{1} {}
+
+ // TODO perhaps we want to provide this #5283229
+ Observable(Observable&& other) = delete;
+
+ virtual ~Observable() {
+ if (unsubscriber_) {
+ unsubscriber_->disable();
+ }
}
- virtual ~Observable() = default;
+ typedef typename std::map<uint64_t, ObserverPtr<T>> ObserverMap;
- /// Subscribe the given Observer to this Observable.
- // Eventually this will return a Subscription object of some kind, in order
- // to support cancellation. This is kinda really important. Maybe I should
- // just do it now, using an dummy Subscription object.
+ // Subscribe the given Observer to this Observable.
//
// If this is called within an Observer callback, the new observer will not
// get the current update but will get subsequent updates.
- virtual Subscription subscribe(ObserverPtr<T> o) {
+ virtual Subscription<T> subscribe(ObserverPtr<T> observer) {
+ auto subscription = makeSubscription();
+ typename ObserverMap::value_type kv{subscription.id_, std::move(observer)};
if (inCallback_ && *inCallback_) {
if (!newObservers_) {
- newObservers_.reset(new std::list<ObserverPtr<T>>());
+ newObservers_.reset(new ObserverMap());
}
- newObservers_->push_back(o);
+ newObservers_->insert(std::move(kv));
} else {
RWSpinLock::WriteHolder{&observersLock_};
- observers_.push_back(o);
+ observers_.insert(std::move(kv));
}
- return Subscription();
+ return subscription;
}
/// Returns a new Observable that will call back on the given Scheduler.
: scheduler_(scheduler), observable_(obs)
{}
- Subscription subscribe(ObserverPtr<T> o) override {
+ Subscription<T> subscribe(ObserverPtr<T> o) override {
return observable_->subscribe(
Observer<T>::create(
[=](T val) { scheduler_->add([o, val] { o->onNext(val); }); },
Subject_(SchedulerPtr s, Observable* o) : scheduler_(s), observable_(o) {
}
- Subscription subscribe(ObserverPtr<T> o) {
+ Subscription<T> subscribe(ObserverPtr<T> o) {
scheduler_->add([=] {
observable_->subscribe(o);
});
- return Subscription();
+ return Subscription<T>(nullptr, 0); // TODO
}
protected:
}
protected:
- const std::list<ObserverPtr<T>>& getObservers() {
+ const ObserverMap& getObservers() {
return observers_;
}
~ObserversGuard() {
o_->observersLock_.unlock_shared();
- if (UNLIKELY(o_->newObservers_ && !o_->newObservers_->empty())) {
+ if (UNLIKELY((o_->newObservers_ && !o_->newObservers_->empty()) ||
+ (o_->oldObservers_ && !o_->oldObservers_->empty()))) {
{
RWSpinLock::WriteHolder(o_->observersLock_);
- for (auto& o : *(o_->newObservers_)) {
- o_->observers_.push_back(o);
+ if (o_->newObservers_) {
+ for (auto& kv : *(o_->newObservers_)) {
+ o_->observers_.insert(std::move(kv));
+ }
+ o_->newObservers_->clear();
+ }
+ if (o_->oldObservers_) {
+ for (auto id : *(o_->oldObservers_)) {
+ o_->observers_.erase(id);
+ }
+ o_->oldObservers_->clear();
}
}
- o_->newObservers_->clear();
}
*o_->inCallback_ = false;
}
};
private:
- std::list<ObserverPtr<T>> observers_;
+ class Unsubscriber {
+ public:
+ explicit Unsubscriber(Observable* observable) : observable_(observable) {
+ CHECK(observable_);
+ }
+
+ void unsubscribe(uint64_t id) {
+ CHECK(id > 0);
+ RWSpinLock::ReadHolder guard(lock_);
+ if (observable_) {
+ observable_->unsubscribe(id);
+ }
+ }
+
+ void disable() {
+ RWSpinLock::WriteHolder guard(lock_);
+ observable_ = nullptr;
+ }
+
+ private:
+ RWSpinLock lock_;
+ Observable* observable_;
+ };
+
+ std::shared_ptr<Unsubscriber> unsubscriber_{nullptr};
+ MicroSpinLock unsubscriberLock_{0};
+
+ friend class Subscription<T>;
+
+ void unsubscribe(uint64_t id) {
+ if (inCallback_ && *inCallback_) {
+ if (!oldObservers_) {
+ oldObservers_.reset(new std::vector<uint64_t>());
+ }
+ if (newObservers_) {
+ auto it = newObservers_->find(id);
+ if (it != newObservers_->end()) {
+ newObservers_->erase(it);
+ return;
+ }
+ }
+ oldObservers_->push_back(id);
+ } else {
+ RWSpinLock::WriteHolder{&observersLock_};
+ observers_.erase(id);
+ }
+ }
+
+ Subscription<T> makeSubscription() {
+ if (!unsubscriber_) {
+ std::lock_guard<MicroSpinLock> guard(unsubscriberLock_);
+ if (!unsubscriber_) {
+ unsubscriber_ = std::make_shared<Unsubscriber>(this);
+ }
+ }
+ return Subscription<T>(unsubscriber_, nextSubscriptionId_++);
+ }
+
+ std::atomic<uint64_t> nextSubscriptionId_;
+ ObserverMap observers_;
RWSpinLock observersLock_;
folly::ThreadLocalPtr<bool> inCallback_;
- folly::ThreadLocalPtr<std::list<ObserverPtr<T>>> newObservers_;
+ folly::ThreadLocalPtr<ObserverMap> newObservers_;
+ folly::ThreadLocalPtr<std::vector<uint64_t>> oldObservers_;
};
}}
using namespace folly::wangle;
+static std::unique_ptr<Observer<int>> incrementer(int& counter) {
+ return Observer<int>::create([&] (int x) {
+ counter++;
+ });
+}
+
+TEST(RxTest, Subscription) {
+ Subject<int> subject;
+ auto count = 0;
+ {
+ auto s = subject.subscribe(incrementer(count));
+ subject.onNext(1);
+ }
+ // The subscription has gone out of scope so no one should get this.
+ subject.onNext(2);
+ EXPECT_EQ(1, count);
+}
+
+TEST(RxTest, SubscriptionMove) {
+ Subject<int> subject;
+ auto count = 0;
+ auto s = subject.subscribe(incrementer(count));
+ auto s2 = subject.subscribe(incrementer(count));
+ s2 = std::move(s);
+ subject.onNext(1);
+ Subscription<int> s3(std::move(s2));
+ subject.onNext(2);
+ EXPECT_EQ(2, count);
+}
+
+TEST(RxTest, SubscriptionOutlivesSubject) {
+ Subscription<int> s;
+ {
+ Subject<int> subject;
+ s = subject.subscribe(Observer<int>::create([](int){}));
+ }
+ // Don't explode when s is destroyed
+}
+
TEST(RxTest, SubscribeDuringCallback) {
// A subscriber who was subscribed in the course of a callback should get
// subsequent updates but not the current update.
Subject<int> subject;
- int outerCount = 0;
- int innerCount = 0;
- subject.subscribe(Observer<int>::create([&] (int x) {
+ int outerCount = 0, innerCount = 0;
+ Subscription<int> s1, s2;
+ s1 = subject.subscribe(Observer<int>::create([&] (int x) {
outerCount++;
- subject.subscribe(Observer<int>::create([&] (int y) {
- innerCount++;
- }));
+ s2 = subject.subscribe(incrementer(innerCount));
}));
subject.onNext(42);
subject.onNext(0xDEADBEEF);
EXPECT_EQ(2, outerCount);
EXPECT_EQ(1, innerCount);
}
+
+TEST(RxTest, UnsubscribeDuringCallback) {
+ // A subscriber who was unsubscribed in the course of a callback should get
+ // the current update but not subsequent ones
+ Subject<int> subject;
+ int count1 = 0, count2 = 0;
+ auto s1 = subject.subscribe(incrementer(count1));
+ auto s2 = subject.subscribe(Observer<int>::create([&] (int x) {
+ count2++;
+ s1.~Subscription();
+ }));
+ subject.onNext(1);
+ subject.onNext(2);
+ EXPECT_EQ(1, count1);
+ EXPECT_EQ(2, count2);
+}
+
+TEST(RxTest, SubscribeUnsubscribeDuringCallback) {
+ // A subscriber who was subscribed and unsubscribed in the course of a
+ // callback should not get any updates
+ Subject<int> subject;
+ int outerCount = 0, innerCount = 0;
+ auto s2 = subject.subscribe(Observer<int>::create([&] (int x) {
+ outerCount++;
+ auto s2 = subject.subscribe(incrementer(innerCount));
+ }));
+ subject.onNext(1);
+ subject.onNext(2);
+ EXPECT_EQ(2, outerCount);
+ EXPECT_EQ(0, innerCount);
+}