#include <folly/RWSpinLock.h>
#include <folly/SmallLocks.h>
#include <folly/ThreadLocal.h>
+#include <folly/small_vector.h>
#include <folly/wangle/Executor.h>
#include <map>
#include <memory>
namespace folly { namespace wangle {
-template <class T>
+template <class T, size_t InlineObservers>
class Observable {
public:
Observable() : nextSubscriptionId_{1} {}
}
}
- typedef typename std::map<uint64_t, ObserverPtr<T>> ObserverMap;
-
- // Subscribe the given Observer to this Observable.
+ // The next three methods subscribe the given Observer to this Observable.
//
- // If this is called within an Observer callback, the new observer will not
+ // If these are called within an Observer callback, the new observer will not
// get the current update but will get subsequent updates.
+ //
+ // subscribe() returns a Subscription object. The observer will continue to
+ // get updates until the Subscription is destroyed.
+ //
+ // observe(ObserverPtr<T>) creates an indefinite subscription
+ //
+ // observe(Observer<T>*) also creates an indefinite subscription, but the
+ // caller is responsible for ensuring that the given Observer outlives this
+ // Observable. This might be useful in high performance environments where
+ // allocations must be kept to a minimum. Template parameter InlineObservers
+ // specifies how many observers can been subscribed inline without any
+ // allocations (it's just the size of a folly::small_vector).
virtual Subscription<T> subscribe(ObserverPtr<T> observer) {
- auto subscription = makeSubscription();
- typename ObserverMap::value_type kv{subscription.id_, std::move(observer)};
+ return subscribeImpl(observer, false);
+ }
+
+ virtual void observe(ObserverPtr<T> observer) {
+ subscribeImpl(observer, true);
+ }
+
+ virtual void observe(Observer<T>* observer) {
if (inCallback_ && *inCallback_) {
if (!newObservers_) {
- newObservers_.reset(new ObserverMap());
+ newObservers_.reset(new ObserverList());
}
- newObservers_->insert(std::move(kv));
+ newObservers_->push_back(observer);
} else {
RWSpinLock::WriteHolder{&observersLock_};
- observers_.insert(std::move(kv));
+ observers_.push_back(observer);
}
- return subscription;
}
+ // TODO unobserve(ObserverPtr<T>), unobserve(Observer<T>*)
+
/// Returns a new Observable that will call back on the given Scheduler.
/// The returned Observable must outlive the parent Observable.
}
protected:
- const ObserverMap& getObservers() {
- return observers_;
- }
+ // Safely execute an operation on each observer. F must take a single
+ // Observer<T>* as its argument.
+ template <class F>
+ void forEachObserver(F f) {
+ if (UNLIKELY(!inCallback_)) {
+ inCallback_.reset(new bool{false});
+ }
+ CHECK(!(*inCallback_));
+ *inCallback_ = true;
- // This guard manages deferred modification of the observers list.
- // Subclasses should use this guard if they want to subscribe new observers
- // in the course of a callback. New observers won't be added until the guard
- // goes out of scope. See Subject for an example.
- class ObserversGuard {
- public:
- explicit ObserversGuard(Observable* o) : o_(o) {
- if (UNLIKELY(!o_->inCallback_)) {
- o_->inCallback_.reset(new bool{false});
+ {
+ RWSpinLock::ReadHolder(observersLock_);
+ for (auto o : observers_) {
+ f(o);
+ }
+
+ for (auto& kv : subscribers_) {
+ f(kv.second.get());
}
- CHECK(!(*o_->inCallback_));
- *o_->inCallback_ = true;
- o_->observersLock_.lock_shared();
}
- ~ObserversGuard() {
- o_->observersLock_.unlock_shared();
- if (UNLIKELY((o_->newObservers_ && !o_->newObservers_->empty()) ||
- (o_->oldObservers_ && !o_->oldObservers_->empty()))) {
- {
- RWSpinLock::WriteHolder(o_->observersLock_);
- if (o_->newObservers_) {
- for (auto& kv : *(o_->newObservers_)) {
- o_->observers_.insert(std::move(kv));
- }
- o_->newObservers_->clear();
+ if (UNLIKELY((newObservers_ && !newObservers_->empty()) ||
+ (newSubscribers_ && !newSubscribers_->empty()) ||
+ (oldSubscribers_ && !oldSubscribers_->empty()))) {
+ {
+ RWSpinLock::WriteHolder(observersLock_);
+ if (newObservers_) {
+ for (auto observer : *(newObservers_)) {
+ observers_.push_back(observer);
}
- if (o_->oldObservers_) {
- for (auto id : *(o_->oldObservers_)) {
- o_->observers_.erase(id);
- }
- o_->oldObservers_->clear();
+ newObservers_->clear();
+ }
+ if (newSubscribers_) {
+ for (auto& kv : *(newSubscribers_)) {
+ subscribers_.insert(std::move(kv));
}
+ newSubscribers_->clear();
+ }
+ if (oldSubscribers_) {
+ for (auto id : *(oldSubscribers_)) {
+ subscribers_.erase(id);
+ }
+ oldSubscribers_->clear();
}
}
- *o_->inCallback_ = false;
}
-
- private:
- Observable* o_;
- };
+ *inCallback_ = false;
+ }
private:
+ Subscription<T> subscribeImpl(ObserverPtr<T> observer, bool indefinite) {
+ auto subscription = makeSubscription(indefinite);
+ typename SubscriberMap::value_type kv{subscription.id_, std::move(observer)};
+ if (inCallback_ && *inCallback_) {
+ if (!newSubscribers_) {
+ newSubscribers_.reset(new SubscriberMap());
+ }
+ newSubscribers_->insert(std::move(kv));
+ } else {
+ RWSpinLock::WriteHolder{&observersLock_};
+ subscribers_.insert(std::move(kv));
+ }
+ return subscription;
+ }
+
class Unsubscriber {
public:
explicit Unsubscriber(Observable* observable) : observable_(observable) {
void unsubscribe(uint64_t id) {
if (inCallback_ && *inCallback_) {
- if (!oldObservers_) {
- oldObservers_.reset(new std::vector<uint64_t>());
+ if (!oldSubscribers_) {
+ oldSubscribers_.reset(new std::vector<uint64_t>());
}
- if (newObservers_) {
- auto it = newObservers_->find(id);
- if (it != newObservers_->end()) {
- newObservers_->erase(it);
+ if (newSubscribers_) {
+ auto it = newSubscribers_->find(id);
+ if (it != newSubscribers_->end()) {
+ newSubscribers_->erase(it);
return;
}
}
- oldObservers_->push_back(id);
+ oldSubscribers_->push_back(id);
} else {
RWSpinLock::WriteHolder{&observersLock_};
- observers_.erase(id);
+ subscribers_.erase(id);
}
}
- Subscription<T> makeSubscription() {
- if (!unsubscriber_) {
- std::lock_guard<MicroSpinLock> guard(unsubscriberLock_);
+ Subscription<T> makeSubscription(bool indefinite) {
+ if (indefinite) {
+ return Subscription<T>(nullptr, nextSubscriptionId_++);
+ } else {
if (!unsubscriber_) {
- unsubscriber_ = std::make_shared<Unsubscriber>(this);
+ std::lock_guard<MicroSpinLock> guard(unsubscriberLock_);
+ if (!unsubscriber_) {
+ unsubscriber_ = std::make_shared<Unsubscriber>(this);
+ }
}
+ return Subscription<T>(unsubscriber_, nextSubscriptionId_++);
}
- return Subscription<T>(unsubscriber_, nextSubscriptionId_++);
}
std::atomic<uint64_t> nextSubscriptionId_;
- ObserverMap observers_;
RWSpinLock observersLock_;
folly::ThreadLocalPtr<bool> inCallback_;
- folly::ThreadLocalPtr<ObserverMap> newObservers_;
- folly::ThreadLocalPtr<std::vector<uint64_t>> oldObservers_;
+
+ typedef folly::small_vector<Observer<T>*, InlineObservers> ObserverList;
+ ObserverList observers_;
+ folly::ThreadLocalPtr<ObserverList> newObservers_;
+
+ typedef std::map<uint64_t, ObserverPtr<T>> SubscriberMap;
+ SubscriberMap subscribers_;
+ folly::ThreadLocalPtr<SubscriberMap> newSubscribers_;
+ folly::ThreadLocalPtr<std::vector<uint64_t>> oldSubscribers_;
};
}}
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/Benchmark.h>
+#include <folly/experimental/wangle/rx/Observer.h>
+#include <folly/experimental/wangle/rx/Subject.h>
+#include <gflags/gflags.h>
+
+using namespace folly::wangle;
+using folly::BenchmarkSuspender;
+
+static std::unique_ptr<Observer<int>> makeObserver() {
+ return Observer<int>::create([&] (int x) {});
+}
+
+void subscribeImpl(uint iters, int N, bool countUnsubscribe) {
+ for (int iter = 0; iter < iters; iter++) {
+ BenchmarkSuspender bs;
+ Subject<int> subject;
+ std::vector<std::unique_ptr<Observer<int>>> observers;
+ std::vector<Subscription<int>> subscriptions;
+ subscriptions.reserve(N);
+ for (int i = 0; i < N; i++) {
+ observers.push_back(makeObserver());
+ }
+ bs.dismiss();
+ for (int i = 0; i < N; i++) {
+ subscriptions.push_back(subject.subscribe(std::move(observers[i])));
+ }
+ if (countUnsubscribe) {
+ subscriptions.clear();
+ }
+ bs.rehire();
+ }
+}
+
+void subscribeAndUnsubscribe(uint iters, int N) {
+ subscribeImpl(iters, N, true);
+}
+
+void subscribe(uint iters, int N) {
+ subscribeImpl(iters, N, false);
+}
+
+void observe(uint iters, int N) {
+ for (int iter = 0; iter < iters; iter++) {
+ BenchmarkSuspender bs;
+ Subject<int> subject;
+ std::vector<std::unique_ptr<Observer<int>>> observers;
+ for (int i = 0; i < N; i++) {
+ observers.push_back(makeObserver());
+ }
+ bs.dismiss();
+ for (int i = 0; i < N; i++) {
+ subject.observe(std::move(observers[i]));
+ }
+ bs.rehire();
+ }
+}
+
+void inlineObserve(uint iters, int N) {
+ for (int iter = 0; iter < iters; iter++) {
+ BenchmarkSuspender bs;
+ Subject<int> subject;
+ std::vector<Observer<int>*> observers;
+ for (int i = 0; i < N; i++) {
+ observers.push_back(makeObserver().release());
+ }
+ bs.dismiss();
+ for (int i = 0; i < N; i++) {
+ subject.observe(observers[i]);
+ }
+ bs.rehire();
+ for (int i = 0; i < N; i++) {
+ delete observers[i];
+ }
+ }
+}
+
+void notifySubscribers(uint iters, int N) {
+ for (int iter = 0; iter < iters; iter++) {
+ BenchmarkSuspender bs;
+ Subject<int> subject;
+ std::vector<std::unique_ptr<Observer<int>>> observers;
+ std::vector<Subscription<int>> subscriptions;
+ subscriptions.reserve(N);
+ for (int i = 0; i < N; i++) {
+ observers.push_back(makeObserver());
+ }
+ for (int i = 0; i < N; i++) {
+ subscriptions.push_back(subject.subscribe(std::move(observers[i])));
+ }
+ bs.dismiss();
+ subject.onNext(42);
+ bs.rehire();
+ }
+}
+
+void notifyInlineObservers(uint iters, int N) {
+ for (int iter = 0; iter < iters; iter++) {
+ BenchmarkSuspender bs;
+ Subject<int> subject;
+ std::vector<Observer<int>*> observers;
+ for (int i = 0; i < N; i++) {
+ observers.push_back(makeObserver().release());
+ }
+ for (int i = 0; i < N; i++) {
+ subject.observe(observers[i]);
+ }
+ bs.dismiss();
+ subject.onNext(42);
+ bs.rehire();
+ }
+}
+
+BENCHMARK_PARAM(subscribeAndUnsubscribe, 1);
+BENCHMARK_RELATIVE_PARAM(subscribe, 1);
+BENCHMARK_RELATIVE_PARAM(observe, 1);
+BENCHMARK_RELATIVE_PARAM(inlineObserve, 1);
+
+BENCHMARK_DRAW_LINE();
+
+BENCHMARK_PARAM(subscribeAndUnsubscribe, 1000);
+BENCHMARK_RELATIVE_PARAM(subscribe, 1000);
+BENCHMARK_RELATIVE_PARAM(observe, 1000);
+BENCHMARK_RELATIVE_PARAM(inlineObserve, 1000);
+
+BENCHMARK_DRAW_LINE();
+
+BENCHMARK_PARAM(notifySubscribers, 1);
+BENCHMARK_RELATIVE_PARAM(notifyInlineObservers, 1);
+
+BENCHMARK_DRAW_LINE();
+
+BENCHMARK_PARAM(notifySubscribers, 1000);
+BENCHMARK_RELATIVE_PARAM(notifyInlineObservers, 1000);
+
+int main(int argc, char** argv) {
+ gflags::ParseCommandLineFlags(&argc, &argv, true);
+ folly::runBenchmarks();
+ return 0;
+}
});
}
+TEST(RxTest, Observe) {
+ Subject<int> subject;
+ auto count = 0;
+ subject.observe(incrementer(count));
+ subject.onNext(1);
+ EXPECT_EQ(1, count);
+}
+
+TEST(RxTest, ObserveInline) {
+ Subject<int> subject;
+ auto count = 0;
+ auto o = incrementer(count).release();
+ subject.observe(o);
+ subject.onNext(1);
+ EXPECT_EQ(1, count);
+ delete o;
+}
+
TEST(RxTest, Subscription) {
Subject<int> subject;
auto count = 0;
EXPECT_EQ(1, innerCount);
}
+TEST(RxTest, ObserveDuringCallback) {
+ Subject<int> subject;
+ int outerCount = 0, innerCount = 0;
+ subject.observe(Observer<int>::create([&] (int x) {
+ outerCount++;
+ subject.observe(incrementer(innerCount));
+ }));
+ subject.onNext(42);
+ subject.onNext(0xDEADBEEF);
+ EXPECT_EQ(2, outerCount);
+ EXPECT_EQ(1, innerCount);
+}
+
+TEST(RxTest, ObserveInlineDuringCallback) {
+ Subject<int> subject;
+ int outerCount = 0, innerCount = 0;
+ auto innerO = incrementer(innerCount).release();
+ auto outerO = Observer<int>::create([&] (int x) {
+ outerCount++;
+ subject.observe(innerO);
+ }).release();
+ subject.observe(outerO);
+ subject.onNext(42);
+ subject.onNext(0xDEADBEEF);
+ EXPECT_EQ(2, outerCount);
+ EXPECT_EQ(1, innerCount);
+ delete innerO;
+ delete outerO;
+}
+
TEST(RxTest, UnsubscribeDuringCallback) {
// A subscriber who was unsubscribed in the course of a callback should get
// the current update but not subsequent ones