From 55ee7359c4aa43ac5f3c2d5edb92717cc5284259 Mon Sep 17 00:00:00 2001 From: James Sedgwick Date: Mon, 17 Nov 2014 15:26:00 -0800 Subject: [PATCH] Indefinite observers and inline observers Summary: provides new APIs observe(ObserverPtr) and observe(Observer*) that are not associated with a subscription. The latter method avoids allocations at the cost of disowning responsiblity for object lifetimes. It could be used for batch-allocating filter chains, for instance. I hacked up some recursive templates that would make that easy to do, but haven't included them. I'd rather see the first use case and then decide whether to let users set them up themselves or to provide an abstraction that makes it easier. Test Plan: unit Reviewed By: davejwatson@fb.com Subscribers: folly-diffs@, fugalh, njormrod FB internal diff: D1595200 Tasks: 5002278 Signature: t1:1595200:1416266578:95c0532af42443fd83e2f94d30790c3c0831ce65 --- folly/experimental/wangle/rx/Observable.h | 172 +++++++++++------- folly/experimental/wangle/rx/Subject.h | 22 +-- folly/experimental/wangle/rx/Subscription.h | 1 - .../wangle/rx/test/RxBenchmark.cpp | 155 ++++++++++++++++ folly/experimental/wangle/rx/test/RxTest.cpp | 48 +++++ folly/experimental/wangle/rx/types.h | 2 +- 6 files changed, 322 insertions(+), 78 deletions(-) create mode 100644 folly/experimental/wangle/rx/test/RxBenchmark.cpp diff --git a/folly/experimental/wangle/rx/Observable.h b/folly/experimental/wangle/rx/Observable.h index c4bc70da..c822bb71 100644 --- a/folly/experimental/wangle/rx/Observable.h +++ b/folly/experimental/wangle/rx/Observable.h @@ -23,13 +23,14 @@ #include #include #include +#include #include #include #include namespace folly { namespace wangle { -template +template class Observable { public: Observable() : nextSubscriptionId_{1} {} @@ -43,27 +44,44 @@ class Observable { } } - typedef typename std::map> ObserverMap; - - // Subscribe the given Observer to this Observable. + // The next three methods subscribe the given Observer to this Observable. // - // If this is called within an Observer callback, the new observer will not + // If these are called within an Observer callback, the new observer will not // get the current update but will get subsequent updates. + // + // subscribe() returns a Subscription object. The observer will continue to + // get updates until the Subscription is destroyed. + // + // observe(ObserverPtr) creates an indefinite subscription + // + // observe(Observer*) also creates an indefinite subscription, but the + // caller is responsible for ensuring that the given Observer outlives this + // Observable. This might be useful in high performance environments where + // allocations must be kept to a minimum. Template parameter InlineObservers + // specifies how many observers can been subscribed inline without any + // allocations (it's just the size of a folly::small_vector). virtual Subscription subscribe(ObserverPtr observer) { - auto subscription = makeSubscription(); - typename ObserverMap::value_type kv{subscription.id_, std::move(observer)}; + return subscribeImpl(observer, false); + } + + virtual void observe(ObserverPtr observer) { + subscribeImpl(observer, true); + } + + virtual void observe(Observer* observer) { if (inCallback_ && *inCallback_) { if (!newObservers_) { - newObservers_.reset(new ObserverMap()); + newObservers_.reset(new ObserverList()); } - newObservers_->insert(std::move(kv)); + newObservers_->push_back(observer); } else { RWSpinLock::WriteHolder{&observersLock_}; - observers_.insert(std::move(kv)); + observers_.push_back(observer); } - return subscription; } + // TODO unobserve(ObserverPtr), unobserve(Observer*) + /// Returns a new Observable that will call back on the given Scheduler. /// The returned Observable must outlive the parent Observable. @@ -122,53 +140,71 @@ class Observable { } protected: - const ObserverMap& getObservers() { - return observers_; - } + // Safely execute an operation on each observer. F must take a single + // Observer* as its argument. + template + void forEachObserver(F f) { + if (UNLIKELY(!inCallback_)) { + inCallback_.reset(new bool{false}); + } + CHECK(!(*inCallback_)); + *inCallback_ = true; - // This guard manages deferred modification of the observers list. - // Subclasses should use this guard if they want to subscribe new observers - // in the course of a callback. New observers won't be added until the guard - // goes out of scope. See Subject for an example. - class ObserversGuard { - public: - explicit ObserversGuard(Observable* o) : o_(o) { - if (UNLIKELY(!o_->inCallback_)) { - o_->inCallback_.reset(new bool{false}); + { + RWSpinLock::ReadHolder(observersLock_); + for (auto o : observers_) { + f(o); + } + + for (auto& kv : subscribers_) { + f(kv.second.get()); } - CHECK(!(*o_->inCallback_)); - *o_->inCallback_ = true; - o_->observersLock_.lock_shared(); } - ~ObserversGuard() { - o_->observersLock_.unlock_shared(); - if (UNLIKELY((o_->newObservers_ && !o_->newObservers_->empty()) || - (o_->oldObservers_ && !o_->oldObservers_->empty()))) { - { - RWSpinLock::WriteHolder(o_->observersLock_); - if (o_->newObservers_) { - for (auto& kv : *(o_->newObservers_)) { - o_->observers_.insert(std::move(kv)); - } - o_->newObservers_->clear(); + if (UNLIKELY((newObservers_ && !newObservers_->empty()) || + (newSubscribers_ && !newSubscribers_->empty()) || + (oldSubscribers_ && !oldSubscribers_->empty()))) { + { + RWSpinLock::WriteHolder(observersLock_); + if (newObservers_) { + for (auto observer : *(newObservers_)) { + observers_.push_back(observer); } - if (o_->oldObservers_) { - for (auto id : *(o_->oldObservers_)) { - o_->observers_.erase(id); - } - o_->oldObservers_->clear(); + newObservers_->clear(); + } + if (newSubscribers_) { + for (auto& kv : *(newSubscribers_)) { + subscribers_.insert(std::move(kv)); } + newSubscribers_->clear(); + } + if (oldSubscribers_) { + for (auto id : *(oldSubscribers_)) { + subscribers_.erase(id); + } + oldSubscribers_->clear(); } } - *o_->inCallback_ = false; } - - private: - Observable* o_; - }; + *inCallback_ = false; + } private: + Subscription subscribeImpl(ObserverPtr observer, bool indefinite) { + auto subscription = makeSubscription(indefinite); + typename SubscriberMap::value_type kv{subscription.id_, std::move(observer)}; + if (inCallback_ && *inCallback_) { + if (!newSubscribers_) { + newSubscribers_.reset(new SubscriberMap()); + } + newSubscribers_->insert(std::move(kv)); + } else { + RWSpinLock::WriteHolder{&observersLock_}; + subscribers_.insert(std::move(kv)); + } + return subscription; + } + class Unsubscriber { public: explicit Unsubscriber(Observable* observable) : observable_(observable) { @@ -200,39 +236,49 @@ class Observable { void unsubscribe(uint64_t id) { if (inCallback_ && *inCallback_) { - if (!oldObservers_) { - oldObservers_.reset(new std::vector()); + if (!oldSubscribers_) { + oldSubscribers_.reset(new std::vector()); } - if (newObservers_) { - auto it = newObservers_->find(id); - if (it != newObservers_->end()) { - newObservers_->erase(it); + if (newSubscribers_) { + auto it = newSubscribers_->find(id); + if (it != newSubscribers_->end()) { + newSubscribers_->erase(it); return; } } - oldObservers_->push_back(id); + oldSubscribers_->push_back(id); } else { RWSpinLock::WriteHolder{&observersLock_}; - observers_.erase(id); + subscribers_.erase(id); } } - Subscription makeSubscription() { - if (!unsubscriber_) { - std::lock_guard guard(unsubscriberLock_); + Subscription makeSubscription(bool indefinite) { + if (indefinite) { + return Subscription(nullptr, nextSubscriptionId_++); + } else { if (!unsubscriber_) { - unsubscriber_ = std::make_shared(this); + std::lock_guard guard(unsubscriberLock_); + if (!unsubscriber_) { + unsubscriber_ = std::make_shared(this); + } } + return Subscription(unsubscriber_, nextSubscriptionId_++); } - return Subscription(unsubscriber_, nextSubscriptionId_++); } std::atomic nextSubscriptionId_; - ObserverMap observers_; RWSpinLock observersLock_; folly::ThreadLocalPtr inCallback_; - folly::ThreadLocalPtr newObservers_; - folly::ThreadLocalPtr> oldObservers_; + + typedef folly::small_vector*, InlineObservers> ObserverList; + ObserverList observers_; + folly::ThreadLocalPtr newObservers_; + + typedef std::map> SubscriberMap; + SubscriberMap subscribers_; + folly::ThreadLocalPtr newSubscribers_; + folly::ThreadLocalPtr> oldSubscribers_; }; }} diff --git a/folly/experimental/wangle/rx/Subject.h b/folly/experimental/wangle/rx/Subject.h index 0cb357fe..6ff04c0e 100644 --- a/folly/experimental/wangle/rx/Subject.h +++ b/folly/experimental/wangle/rx/Subject.h @@ -26,24 +26,20 @@ namespace folly { namespace wangle { /// observed events to the Subject's observers. template struct Subject : public Observable, public Observer { - typedef typename Observable::ObserversGuard ObserversGuard; void onNext(const T& val) override { - ObserversGuard guard(this); - for (auto& kv : Observable::getObservers()) { - kv.second->onNext(val); - } + this->forEachObserver([&](Observer* o){ + o->onNext(val); + }); } void onError(Error e) override { - ObserversGuard guard(this); - for (auto& kv : Observable::getObservers()) { - kv.second->onError(e); - } + this->forEachObserver([&](Observer* o){ + o->onError(e); + }); } void onCompleted() override { - ObserversGuard guard(this); - for (auto& kv : Observable::getObservers()) { - kv.second->onCompleted(); - } + this->forEachObserver([](Observer* o){ + o->onCompleted(); + }); } }; diff --git a/folly/experimental/wangle/rx/Subscription.h b/folly/experimental/wangle/rx/Subscription.h index 0cf667e6..7c058e23 100644 --- a/folly/experimental/wangle/rx/Subscription.h +++ b/folly/experimental/wangle/rx/Subscription.h @@ -49,7 +49,6 @@ class Subscription { Subscription(std::shared_ptr unsubscriber, uint64_t id) : unsubscriber_(std::move(unsubscriber)), id_(id) { - CHECK(unsubscriber_); CHECK(id_ > 0); } diff --git a/folly/experimental/wangle/rx/test/RxBenchmark.cpp b/folly/experimental/wangle/rx/test/RxBenchmark.cpp new file mode 100644 index 00000000..5a14bbc6 --- /dev/null +++ b/folly/experimental/wangle/rx/test/RxBenchmark.cpp @@ -0,0 +1,155 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +using namespace folly::wangle; +using folly::BenchmarkSuspender; + +static std::unique_ptr> makeObserver() { + return Observer::create([&] (int x) {}); +} + +void subscribeImpl(uint iters, int N, bool countUnsubscribe) { + for (int iter = 0; iter < iters; iter++) { + BenchmarkSuspender bs; + Subject subject; + std::vector>> observers; + std::vector> subscriptions; + subscriptions.reserve(N); + for (int i = 0; i < N; i++) { + observers.push_back(makeObserver()); + } + bs.dismiss(); + for (int i = 0; i < N; i++) { + subscriptions.push_back(subject.subscribe(std::move(observers[i]))); + } + if (countUnsubscribe) { + subscriptions.clear(); + } + bs.rehire(); + } +} + +void subscribeAndUnsubscribe(uint iters, int N) { + subscribeImpl(iters, N, true); +} + +void subscribe(uint iters, int N) { + subscribeImpl(iters, N, false); +} + +void observe(uint iters, int N) { + for (int iter = 0; iter < iters; iter++) { + BenchmarkSuspender bs; + Subject subject; + std::vector>> observers; + for (int i = 0; i < N; i++) { + observers.push_back(makeObserver()); + } + bs.dismiss(); + for (int i = 0; i < N; i++) { + subject.observe(std::move(observers[i])); + } + bs.rehire(); + } +} + +void inlineObserve(uint iters, int N) { + for (int iter = 0; iter < iters; iter++) { + BenchmarkSuspender bs; + Subject subject; + std::vector*> observers; + for (int i = 0; i < N; i++) { + observers.push_back(makeObserver().release()); + } + bs.dismiss(); + for (int i = 0; i < N; i++) { + subject.observe(observers[i]); + } + bs.rehire(); + for (int i = 0; i < N; i++) { + delete observers[i]; + } + } +} + +void notifySubscribers(uint iters, int N) { + for (int iter = 0; iter < iters; iter++) { + BenchmarkSuspender bs; + Subject subject; + std::vector>> observers; + std::vector> subscriptions; + subscriptions.reserve(N); + for (int i = 0; i < N; i++) { + observers.push_back(makeObserver()); + } + for (int i = 0; i < N; i++) { + subscriptions.push_back(subject.subscribe(std::move(observers[i]))); + } + bs.dismiss(); + subject.onNext(42); + bs.rehire(); + } +} + +void notifyInlineObservers(uint iters, int N) { + for (int iter = 0; iter < iters; iter++) { + BenchmarkSuspender bs; + Subject subject; + std::vector*> observers; + for (int i = 0; i < N; i++) { + observers.push_back(makeObserver().release()); + } + for (int i = 0; i < N; i++) { + subject.observe(observers[i]); + } + bs.dismiss(); + subject.onNext(42); + bs.rehire(); + } +} + +BENCHMARK_PARAM(subscribeAndUnsubscribe, 1); +BENCHMARK_RELATIVE_PARAM(subscribe, 1); +BENCHMARK_RELATIVE_PARAM(observe, 1); +BENCHMARK_RELATIVE_PARAM(inlineObserve, 1); + +BENCHMARK_DRAW_LINE(); + +BENCHMARK_PARAM(subscribeAndUnsubscribe, 1000); +BENCHMARK_RELATIVE_PARAM(subscribe, 1000); +BENCHMARK_RELATIVE_PARAM(observe, 1000); +BENCHMARK_RELATIVE_PARAM(inlineObserve, 1000); + +BENCHMARK_DRAW_LINE(); + +BENCHMARK_PARAM(notifySubscribers, 1); +BENCHMARK_RELATIVE_PARAM(notifyInlineObservers, 1); + +BENCHMARK_DRAW_LINE(); + +BENCHMARK_PARAM(notifySubscribers, 1000); +BENCHMARK_RELATIVE_PARAM(notifyInlineObservers, 1000); + +int main(int argc, char** argv) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + folly::runBenchmarks(); + return 0; +} diff --git a/folly/experimental/wangle/rx/test/RxTest.cpp b/folly/experimental/wangle/rx/test/RxTest.cpp index ab44657a..8cf2605d 100644 --- a/folly/experimental/wangle/rx/test/RxTest.cpp +++ b/folly/experimental/wangle/rx/test/RxTest.cpp @@ -26,6 +26,24 @@ static std::unique_ptr> incrementer(int& counter) { }); } +TEST(RxTest, Observe) { + Subject subject; + auto count = 0; + subject.observe(incrementer(count)); + subject.onNext(1); + EXPECT_EQ(1, count); +} + +TEST(RxTest, ObserveInline) { + Subject subject; + auto count = 0; + auto o = incrementer(count).release(); + subject.observe(o); + subject.onNext(1); + EXPECT_EQ(1, count); + delete o; +} + TEST(RxTest, Subscription) { Subject subject; auto count = 0; @@ -75,6 +93,36 @@ TEST(RxTest, SubscribeDuringCallback) { EXPECT_EQ(1, innerCount); } +TEST(RxTest, ObserveDuringCallback) { + Subject subject; + int outerCount = 0, innerCount = 0; + subject.observe(Observer::create([&] (int x) { + outerCount++; + subject.observe(incrementer(innerCount)); + })); + subject.onNext(42); + subject.onNext(0xDEADBEEF); + EXPECT_EQ(2, outerCount); + EXPECT_EQ(1, innerCount); +} + +TEST(RxTest, ObserveInlineDuringCallback) { + Subject subject; + int outerCount = 0, innerCount = 0; + auto innerO = incrementer(innerCount).release(); + auto outerO = Observer::create([&] (int x) { + outerCount++; + subject.observe(innerO); + }).release(); + subject.observe(outerO); + subject.onNext(42); + subject.onNext(0xDEADBEEF); + EXPECT_EQ(2, outerCount); + EXPECT_EQ(1, innerCount); + delete innerO; + delete outerO; +} + TEST(RxTest, UnsubscribeDuringCallback) { // A subscriber who was unsubscribed in the course of a callback should get // the current update but not subsequent ones diff --git a/folly/experimental/wangle/rx/types.h b/folly/experimental/wangle/rx/types.h index 317fac14..0f10c1cb 100644 --- a/folly/experimental/wangle/rx/types.h +++ b/folly/experimental/wangle/rx/types.h @@ -25,7 +25,7 @@ namespace folly { namespace wangle { // alias it. typedef std::shared_ptr SchedulerPtr; - template struct Observable; + template struct Observable; template struct Observer; template struct Subject; -- 2.34.1