From: Hans Fugal Date: Wed, 1 Jul 2015 17:24:18 +0000 (-0700) Subject: make it really clear that wangle/rx is deprecated X-Git-Tag: v0.49.0~18 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=668192ca7476ebc5a858056ca9598a3fc2eb2240;p=folly.git make it really clear that wangle/rx is deprecated Summary: More people are starting to use Rx. Yay! But they're trying to use `wangle/rx`. So make sure there's no doubt about its deprecated status. Reviewed By: @yfeldblum Differential Revision: D2196273 --- diff --git a/folly/Makefile.am b/folly/Makefile.am index c8e14e61..d85c052d 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -308,11 +308,11 @@ nobase_follyinclude_HEADERS = \ wangle/concurrent/NamedThreadFactory.h \ wangle/concurrent/ThreadFactory.h \ wangle/concurrent/ThreadPoolExecutor.h \ - wangle/rx/Observable.h \ - wangle/rx/Observer.h \ - wangle/rx/Subject.h \ - wangle/rx/Subscription.h \ - wangle/rx/types.h \ + wangle/deprecated/rx/Observable.h \ + wangle/deprecated/rx/Observer.h \ + wangle/deprecated/rx/Subject.h \ + wangle/deprecated/rx/Subscription.h \ + wangle/deprecated/rx/types.h \ wangle/ssl/ClientHelloExtStats.h \ wangle/ssl/DHParam.h \ wangle/ssl/PasswordInFile.h \ diff --git a/folly/wangle/concurrent/ThreadPoolExecutor.h b/folly/wangle/concurrent/ThreadPoolExecutor.h index bb77d39c..70a5375f 100644 --- a/folly/wangle/concurrent/ThreadPoolExecutor.h +++ b/folly/wangle/concurrent/ThreadPoolExecutor.h @@ -18,7 +18,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/folly/wangle/deprecated/rx/Dummy.cpp b/folly/wangle/deprecated/rx/Dummy.cpp new file mode 100644 index 00000000..ec999ca4 --- /dev/null +++ b/folly/wangle/deprecated/rx/Dummy.cpp @@ -0,0 +1,19 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// fbbuild is too dumb to know that .h files in the directory affect +// our project, unless we have a .cpp file in the target, in the same +// directory. diff --git a/folly/wangle/deprecated/rx/Observable.h b/folly/wangle/deprecated/rx/Observable.h new file mode 100644 index 00000000..bae27a0b --- /dev/null +++ b/folly/wangle/deprecated/rx/Observable.h @@ -0,0 +1,285 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include // must come first +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace folly { namespace wangle { + +template +class Observable { + public: + Observable() : nextSubscriptionId_{1} {} + + // TODO perhaps we want to provide this #5283229 + Observable(Observable&& other) = delete; + + virtual ~Observable() { + if (unsubscriber_) { + unsubscriber_->disable(); + } + } + + // The next three methods subscribe the given Observer to this Observable. + // + // If these are called within an Observer callback, the new observer will not + // get the current update but will get subsequent updates. + // + // subscribe() returns a Subscription object. The observer will continue to + // get updates until the Subscription is destroyed. + // + // observe(ObserverPtr) creates an indefinite subscription + // + // observe(Observer*) also creates an indefinite subscription, but the + // caller is responsible for ensuring that the given Observer outlives this + // Observable. This might be useful in high performance environments where + // allocations must be kept to a minimum. Template parameter InlineObservers + // specifies how many observers can been subscribed inline without any + // allocations (it's just the size of a folly::small_vector). + virtual Subscription subscribe(ObserverPtr observer) { + return subscribeImpl(observer, false); + } + + virtual void observe(ObserverPtr observer) { + subscribeImpl(observer, true); + } + + virtual void observe(Observer* observer) { + if (inCallback_ && *inCallback_) { + if (!newObservers_) { + newObservers_.reset(new ObserverList()); + } + newObservers_->push_back(observer); + } else { + RWSpinLock::WriteHolder{&observersLock_}; + observers_.push_back(observer); + } + } + + // TODO unobserve(ObserverPtr), unobserve(Observer*) + + /// Returns a new Observable that will call back on the given Scheduler. + /// The returned Observable must outlive the parent Observable. + + // This and subscribeOn should maybe just be a first-class feature of an + // Observable, rather than making new ones whose lifetimes are tied to their + // parents. In that case it'd return a reference to this object for + // chaining. + ObservablePtr observeOn(SchedulerPtr scheduler) { + // you're right Hannes, if we have Observable::create we don't need this + // helper class. + struct ViaSubject : public Observable + { + ViaSubject(SchedulerPtr sched, + Observable* obs) + : scheduler_(sched), observable_(obs) + {} + + Subscription subscribe(ObserverPtr o) override { + return observable_->subscribe( + Observer::create( + [=](T val) { scheduler_->add([o, val] { o->onNext(val); }); }, + [=](Error e) { scheduler_->add([o, e] { o->onError(e); }); }, + [=]() { scheduler_->add([o] { o->onCompleted(); }); })); + } + + protected: + SchedulerPtr scheduler_; + Observable* observable_; + }; + + return std::make_shared(scheduler, this); + } + + /// Returns a new Observable that will subscribe to this parent Observable + /// via the given Scheduler. This can be subtle and confusing at first, see + /// http://www.introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#SubscribeOnObserveOn + std::unique_ptr subscribeOn(SchedulerPtr scheduler) { + struct Subject_ : public Subject { + public: + Subject_(SchedulerPtr s, Observable* o) : scheduler_(s), observable_(o) { + } + + Subscription subscribe(ObserverPtr o) { + scheduler_->add([=] { + observable_->subscribe(o); + }); + return Subscription(nullptr, 0); // TODO + } + + protected: + SchedulerPtr scheduler_; + Observable* observable_; + }; + + return folly::make_unique(scheduler, this); + } + + protected: + // Safely execute an operation on each observer. F must take a single + // Observer* as its argument. + template + void forEachObserver(F f) { + if (UNLIKELY(!inCallback_)) { + inCallback_.reset(new bool{false}); + } + CHECK(!(*inCallback_)); + *inCallback_ = true; + + { + RWSpinLock::ReadHolder rh(observersLock_); + for (auto o : observers_) { + f(o); + } + + for (auto& kv : subscribers_) { + f(kv.second.get()); + } + } + + if (UNLIKELY((newObservers_ && !newObservers_->empty()) || + (newSubscribers_ && !newSubscribers_->empty()) || + (oldSubscribers_ && !oldSubscribers_->empty()))) { + { + RWSpinLock::WriteHolder wh(observersLock_); + if (newObservers_) { + for (auto observer : *(newObservers_)) { + observers_.push_back(observer); + } + newObservers_->clear(); + } + if (newSubscribers_) { + for (auto& kv : *(newSubscribers_)) { + subscribers_.insert(std::move(kv)); + } + newSubscribers_->clear(); + } + if (oldSubscribers_) { + for (auto id : *(oldSubscribers_)) { + subscribers_.erase(id); + } + oldSubscribers_->clear(); + } + } + } + *inCallback_ = false; + } + + private: + Subscription subscribeImpl(ObserverPtr observer, bool indefinite) { + auto subscription = makeSubscription(indefinite); + typename SubscriberMap::value_type kv{subscription.id_, std::move(observer)}; + if (inCallback_ && *inCallback_) { + if (!newSubscribers_) { + newSubscribers_.reset(new SubscriberMap()); + } + newSubscribers_->insert(std::move(kv)); + } else { + RWSpinLock::WriteHolder{&observersLock_}; + subscribers_.insert(std::move(kv)); + } + return subscription; + } + + class Unsubscriber { + public: + explicit Unsubscriber(Observable* observable) : observable_(observable) { + CHECK(observable_); + } + + void unsubscribe(uint64_t id) { + CHECK(id > 0); + RWSpinLock::ReadHolder guard(lock_); + if (observable_) { + observable_->unsubscribe(id); + } + } + + void disable() { + RWSpinLock::WriteHolder guard(lock_); + observable_ = nullptr; + } + + private: + RWSpinLock lock_; + Observable* observable_; + }; + + std::shared_ptr unsubscriber_{nullptr}; + MicroSpinLock unsubscriberLock_{0}; + + friend class Subscription; + + void unsubscribe(uint64_t id) { + if (inCallback_ && *inCallback_) { + if (!oldSubscribers_) { + oldSubscribers_.reset(new std::vector()); + } + if (newSubscribers_) { + auto it = newSubscribers_->find(id); + if (it != newSubscribers_->end()) { + newSubscribers_->erase(it); + return; + } + } + oldSubscribers_->push_back(id); + } else { + RWSpinLock::WriteHolder{&observersLock_}; + subscribers_.erase(id); + } + } + + Subscription makeSubscription(bool indefinite) { + if (indefinite) { + return Subscription(nullptr, nextSubscriptionId_++); + } else { + if (!unsubscriber_) { + std::lock_guard guard(unsubscriberLock_); + if (!unsubscriber_) { + unsubscriber_ = std::make_shared(this); + } + } + return Subscription(unsubscriber_, nextSubscriptionId_++); + } + } + + std::atomic nextSubscriptionId_; + RWSpinLock observersLock_; + folly::ThreadLocalPtr inCallback_; + + typedef folly::small_vector*, InlineObservers> ObserverList; + ObserverList observers_; + folly::ThreadLocalPtr newObservers_; + + typedef std::map> SubscriberMap; + SubscriberMap subscribers_; + folly::ThreadLocalPtr newSubscribers_; + folly::ThreadLocalPtr> oldSubscribers_; +}; + +}} diff --git a/folly/wangle/deprecated/rx/Observer.h b/folly/wangle/deprecated/rx/Observer.h new file mode 100644 index 00000000..b671a7e4 --- /dev/null +++ b/folly/wangle/deprecated/rx/Observer.h @@ -0,0 +1,113 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include // must come first +#include +#include +#include +#include + +namespace folly { namespace wangle { + +template struct FunctionObserver; + +/// Observer interface. You can subclass it, or you can just use create() +/// to use std::functions. +template +struct Observer { + // These are what it means to be an Observer. + virtual void onNext(const T&) = 0; + virtual void onError(Error) = 0; + virtual void onCompleted() = 0; + + virtual ~Observer() = default; + + /// Create an Observer with std::function callbacks. Handy to make ad-hoc + /// Observers with lambdas. + /// + /// Templated for maximum perfect forwarding flexibility, but ultimately + /// whatever you pass in has to implicitly become a std::function for the + /// same signature as onNext(), onError(), and onCompleted() respectively. + /// (see the FunctionObserver typedefs) + template + static std::unique_ptr create( + N&& onNextFn, E&& onErrorFn, C&& onCompletedFn) + { + return folly::make_unique>( + std::forward(onNextFn), + std::forward(onErrorFn), + std::forward(onCompletedFn)); + } + + /// Create an Observer with only onNext and onError callbacks. + /// onCompleted will just be a no-op. + template + static std::unique_ptr create(N&& onNextFn, E&& onErrorFn) { + return folly::make_unique>( + std::forward(onNextFn), + std::forward(onErrorFn), + nullptr); + } + + /// Create an Observer with only an onNext callback. + /// onError and onCompleted will just be no-ops. + template + static std::unique_ptr create(N&& onNextFn) { + return folly::make_unique>( + std::forward(onNextFn), + nullptr, + nullptr); + } +}; + +/// An observer that uses std::function callbacks. You don't really want to +/// make one of these directly - instead use the Observer::create() methods. +template +struct FunctionObserver : public Observer { + typedef std::function OnNext; + typedef std::function OnError; + typedef std::function OnCompleted; + + /// We don't need any fancy overloads of this constructor because that's + /// what Observer::create() is for. + template + FunctionObserver(N&& n, E&& e, C&& c) + : onNext_(std::forward(n)), + onError_(std::forward(e)), + onCompleted_(std::forward(c)) + {} + + void onNext(const T& val) override { + if (onNext_) onNext_(val); + } + + void onError(Error e) override { + if (onError_) onError_(e); + } + + void onCompleted() override { + if (onCompleted_) onCompleted_(); + } + + protected: + OnNext onNext_; + OnError onError_; + OnCompleted onCompleted_; +}; + +}} diff --git a/folly/wangle/deprecated/rx/README.md b/folly/wangle/deprecated/rx/README.md new file mode 100644 index 00000000..ae9614f9 --- /dev/null +++ b/folly/wangle/deprecated/rx/README.md @@ -0,0 +1,37 @@ +Rx is a pattern for "functional reactive programming" that started at +Microsoft in C#, and has been reimplemented in various languages, notably +RxJava for JVM languages. + +It is basically the plural of Futures (a la Wangle). + +``` + singular | plural + +---------------------------------+----------------------------------- + sync | Foo getData() | std::vector getData() + async | wangle::Future getData() | wangle::Observable getData() +``` + +For more on Rx, I recommend these resources: + +Netflix blog post (RxJava): http://techblog.netflix.com/2013/02/rxjava-netflix-api.html +Introduction to Rx eBook (C#): http://www.introtorx.com/content/v1.0.10621.0/01_WhyRx.html +The RxJava wiki: https://github.com/Netflix/RxJava/wiki +Netflix QCon presentation: http://www.infoq.com/presentations/netflix-functional-rx +https://rx.codeplex.com/ + +I haven't even tried to support move-only data in this version. I'm on the +fence about the usage of shared_ptr. Subject is underdeveloped. A whole rich +set of operations is obviously missing. I haven't decided how to handle +subscriptions (and therefore cancellation), but I'm pretty sure C#'s +"Disposable" is thoroughly un-C++ (opposite of RAII). So for now subscribe +returns nothing at all and you can't cancel anything ever. The whole thing is +probably riddled with lifetime corner case bugs that will come out like a +swarm of angry bees as soon as someone tries an infinite sequence, or tries to +partially observe a long sequence. I'm pretty sure subscribeOn has a bug that +I haven't tracked down yet. + +DEPRECATED: +This was an experimental exploration. There are better, more robust, and (most +importantly) supported C++ implementations, notably +[rxcpp](https://rxcpp.codeplex.com/). Use that instead. You really shouldn't +use this one. It's unsupported and incomplete. Honest. diff --git a/folly/wangle/deprecated/rx/Subject.h b/folly/wangle/deprecated/rx/Subject.h new file mode 100644 index 00000000..08b79012 --- /dev/null +++ b/folly/wangle/deprecated/rx/Subject.h @@ -0,0 +1,47 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include // must come first +#include +#include + +namespace folly { namespace wangle { + +/// Subject interface. A Subject is both an Observable and an Observer. There +/// is a default implementation of the Observer methods that just forwards the +/// observed events to the Subject's observers. +template +struct Subject : public Observable, public Observer { + void onNext(const T& val) override { + this->forEachObserver([&](Observer* o){ + o->onNext(val); + }); + } + void onError(Error e) override { + this->forEachObserver([&](Observer* o){ + o->onError(e); + }); + } + void onCompleted() override { + this->forEachObserver([](Observer* o){ + o->onCompleted(); + }); + } +}; + +}} diff --git a/folly/wangle/deprecated/rx/Subscription.h b/folly/wangle/deprecated/rx/Subscription.h new file mode 100644 index 00000000..de8e2354 --- /dev/null +++ b/folly/wangle/deprecated/rx/Subscription.h @@ -0,0 +1,70 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include // must come first +#include + +namespace folly { namespace wangle { + +template +class Subscription { + public: + Subscription() = default; + + Subscription(const Subscription&) = delete; + + Subscription(Subscription&& other) noexcept { + *this = std::move(other); + } + + Subscription& operator=(Subscription&& other) noexcept { + unsubscribe(); + unsubscriber_ = std::move(other.unsubscriber_); + id_ = other.id_; + other.unsubscriber_ = nullptr; + other.id_ = 0; + return *this; + } + + ~Subscription() { + unsubscribe(); + } + + private: + typedef typename Observable::Unsubscriber Unsubscriber; + + Subscription(std::shared_ptr unsubscriber, uint64_t id) + : unsubscriber_(std::move(unsubscriber)), id_(id) { + CHECK(id_ > 0); + } + + void unsubscribe() { + if (unsubscriber_ && id_ > 0) { + unsubscriber_->unsubscribe(id_); + id_ = 0; + unsubscriber_ = nullptr; + } + } + + std::shared_ptr unsubscriber_; + uint64_t id_{0}; + + friend class Observable; +}; + +}} diff --git a/folly/wangle/deprecated/rx/test/RxBenchmark.cpp b/folly/wangle/deprecated/rx/test/RxBenchmark.cpp new file mode 100644 index 00000000..88e9e1b2 --- /dev/null +++ b/folly/wangle/deprecated/rx/test/RxBenchmark.cpp @@ -0,0 +1,155 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +using namespace folly::wangle; +using folly::BenchmarkSuspender; + +static std::unique_ptr> makeObserver() { + return Observer::create([&] (int x) {}); +} + +void subscribeImpl(uint iters, int N, bool countUnsubscribe) { + for (uint iter = 0; iter < iters; iter++) { + BenchmarkSuspender bs; + Subject subject; + std::vector>> observers; + std::vector> subscriptions; + subscriptions.reserve(N); + for (int i = 0; i < N; i++) { + observers.push_back(makeObserver()); + } + bs.dismiss(); + for (int i = 0; i < N; i++) { + subscriptions.push_back(subject.subscribe(std::move(observers[i]))); + } + if (countUnsubscribe) { + subscriptions.clear(); + } + bs.rehire(); + } +} + +void subscribeAndUnsubscribe(uint iters, int N) { + subscribeImpl(iters, N, true); +} + +void subscribe(uint iters, int N) { + subscribeImpl(iters, N, false); +} + +void observe(uint iters, int N) { + for (uint iter = 0; iter < iters; iter++) { + BenchmarkSuspender bs; + Subject subject; + std::vector>> observers; + for (int i = 0; i < N; i++) { + observers.push_back(makeObserver()); + } + bs.dismiss(); + for (int i = 0; i < N; i++) { + subject.observe(std::move(observers[i])); + } + bs.rehire(); + } +} + +void inlineObserve(uint iters, int N) { + for (uint iter = 0; iter < iters; iter++) { + BenchmarkSuspender bs; + Subject subject; + std::vector*> observers; + for (int i = 0; i < N; i++) { + observers.push_back(makeObserver().release()); + } + bs.dismiss(); + for (int i = 0; i < N; i++) { + subject.observe(observers[i]); + } + bs.rehire(); + for (int i = 0; i < N; i++) { + delete observers[i]; + } + } +} + +void notifySubscribers(uint iters, int N) { + for (uint iter = 0; iter < iters; iter++) { + BenchmarkSuspender bs; + Subject subject; + std::vector>> observers; + std::vector> subscriptions; + subscriptions.reserve(N); + for (int i = 0; i < N; i++) { + observers.push_back(makeObserver()); + } + for (int i = 0; i < N; i++) { + subscriptions.push_back(subject.subscribe(std::move(observers[i]))); + } + bs.dismiss(); + subject.onNext(42); + bs.rehire(); + } +} + +void notifyInlineObservers(uint iters, int N) { + for (uint iter = 0; iter < iters; iter++) { + BenchmarkSuspender bs; + Subject subject; + std::vector*> observers; + for (int i = 0; i < N; i++) { + observers.push_back(makeObserver().release()); + } + for (int i = 0; i < N; i++) { + subject.observe(observers[i]); + } + bs.dismiss(); + subject.onNext(42); + bs.rehire(); + } +} + +BENCHMARK_PARAM(subscribeAndUnsubscribe, 1); +BENCHMARK_RELATIVE_PARAM(subscribe, 1); +BENCHMARK_RELATIVE_PARAM(observe, 1); +BENCHMARK_RELATIVE_PARAM(inlineObserve, 1); + +BENCHMARK_DRAW_LINE(); + +BENCHMARK_PARAM(subscribeAndUnsubscribe, 1000); +BENCHMARK_RELATIVE_PARAM(subscribe, 1000); +BENCHMARK_RELATIVE_PARAM(observe, 1000); +BENCHMARK_RELATIVE_PARAM(inlineObserve, 1000); + +BENCHMARK_DRAW_LINE(); + +BENCHMARK_PARAM(notifySubscribers, 1); +BENCHMARK_RELATIVE_PARAM(notifyInlineObservers, 1); + +BENCHMARK_DRAW_LINE(); + +BENCHMARK_PARAM(notifySubscribers, 1000); +BENCHMARK_RELATIVE_PARAM(notifyInlineObservers, 1000); + +int main(int argc, char** argv) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + folly::runBenchmarks(); + return 0; +} diff --git a/folly/wangle/deprecated/rx/test/RxTest.cpp b/folly/wangle/deprecated/rx/test/RxTest.cpp new file mode 100644 index 00000000..e75ddde7 --- /dev/null +++ b/folly/wangle/deprecated/rx/test/RxTest.cpp @@ -0,0 +1,195 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +using namespace folly::wangle; + +static std::unique_ptr> incrementer(int& counter) { + return Observer::create([&] (int x) { + counter++; + }); +} + +TEST(RxTest, Observe) { + Subject subject; + auto count = 0; + subject.observe(incrementer(count)); + subject.onNext(1); + EXPECT_EQ(1, count); +} + +TEST(RxTest, ObserveInline) { + Subject subject; + auto count = 0; + auto o = incrementer(count).release(); + subject.observe(o); + subject.onNext(1); + EXPECT_EQ(1, count); + delete o; +} + +TEST(RxTest, Subscription) { + Subject subject; + auto count = 0; + { + auto s = subject.subscribe(incrementer(count)); + subject.onNext(1); + } + // The subscription has gone out of scope so no one should get this. + subject.onNext(2); + EXPECT_EQ(1, count); +} + +TEST(RxTest, SubscriptionMove) { + Subject subject; + auto count = 0; + auto s = subject.subscribe(incrementer(count)); + auto s2 = subject.subscribe(incrementer(count)); + s2 = std::move(s); + subject.onNext(1); + Subscription s3(std::move(s2)); + subject.onNext(2); + EXPECT_EQ(2, count); +} + +TEST(RxTest, SubscriptionOutlivesSubject) { + Subscription s; + { + Subject subject; + s = subject.subscribe(Observer::create([](int){})); + } + // Don't explode when s is destroyed +} + +TEST(RxTest, SubscribeDuringCallback) { + // A subscriber who was subscribed in the course of a callback should get + // subsequent updates but not the current update. + Subject subject; + int outerCount = 0, innerCount = 0; + Subscription s1, s2; + s1 = subject.subscribe(Observer::create([&] (int x) { + outerCount++; + s2 = subject.subscribe(incrementer(innerCount)); + })); + subject.onNext(42); + subject.onNext(0xDEADBEEF); + EXPECT_EQ(2, outerCount); + EXPECT_EQ(1, innerCount); +} + +TEST(RxTest, ObserveDuringCallback) { + Subject subject; + int outerCount = 0, innerCount = 0; + subject.observe(Observer::create([&] (int x) { + outerCount++; + subject.observe(incrementer(innerCount)); + })); + subject.onNext(42); + subject.onNext(0xDEADBEEF); + EXPECT_EQ(2, outerCount); + EXPECT_EQ(1, innerCount); +} + +TEST(RxTest, ObserveInlineDuringCallback) { + Subject subject; + int outerCount = 0, innerCount = 0; + auto innerO = incrementer(innerCount).release(); + auto outerO = Observer::create([&] (int x) { + outerCount++; + subject.observe(innerO); + }).release(); + subject.observe(outerO); + subject.onNext(42); + subject.onNext(0xDEADBEEF); + EXPECT_EQ(2, outerCount); + EXPECT_EQ(1, innerCount); + delete innerO; + delete outerO; +} + +TEST(RxTest, UnsubscribeDuringCallback) { + // A subscriber who was unsubscribed in the course of a callback should get + // the current update but not subsequent ones + Subject subject; + int count1 = 0, count2 = 0; + auto s1 = subject.subscribe(incrementer(count1)); + auto s2 = subject.subscribe(Observer::create([&] (int x) { + count2++; + s1.~Subscription(); + })); + subject.onNext(1); + subject.onNext(2); + EXPECT_EQ(1, count1); + EXPECT_EQ(2, count2); +} + +TEST(RxTest, SubscribeUnsubscribeDuringCallback) { + // A subscriber who was subscribed and unsubscribed in the course of a + // callback should not get any updates + Subject subject; + int outerCount = 0, innerCount = 0; + auto s2 = subject.subscribe(Observer::create([&] (int x) { + outerCount++; + auto s2 = subject.subscribe(incrementer(innerCount)); + })); + subject.onNext(1); + subject.onNext(2); + EXPECT_EQ(2, outerCount); + EXPECT_EQ(0, innerCount); +} + +// Move only type +typedef std::unique_ptr MO; +static MO makeMO() { return folly::make_unique(1); } +template +static ObserverPtr makeMOObserver() { + return Observer::create([](const T& mo) { + EXPECT_EQ(1, *mo); + }); +} + +TEST(RxTest, MoveOnlyRvalue) { + Subject subject; + auto s1 = subject.subscribe(makeMOObserver()); + auto s2 = subject.subscribe(makeMOObserver()); + auto mo = makeMO(); + // Can't bind lvalues to rvalue references + // subject.onNext(mo); + subject.onNext(std::move(mo)); + subject.onNext(makeMO()); +} + +// Copy only type +struct CO { + CO() = default; + CO(const CO&) = default; + CO(CO&&) = delete; +}; + +template +static ObserverPtr makeCOObserver() { + return Observer::create([](const T& mo) {}); +} + +TEST(RxTest, CopyOnly) { + Subject subject; + auto s1 = subject.subscribe(makeCOObserver()); + CO co; + subject.onNext(co); +} diff --git a/folly/wangle/deprecated/rx/types.h b/folly/wangle/deprecated/rx/types.h new file mode 100644 index 00000000..3bb540e6 --- /dev/null +++ b/folly/wangle/deprecated/rx/types.h @@ -0,0 +1,35 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +namespace folly { namespace wangle { + typedef folly::exception_wrapper Error; + // The Executor is basically an rx Scheduler (by design). So just + // alias it. + typedef std::shared_ptr SchedulerPtr; + + template class Observable; + template struct Observer; + template struct Subject; + + template using ObservablePtr = std::shared_ptr>; + template using ObserverPtr = std::shared_ptr>; + template using SubjectPtr = std::shared_ptr>; +}} diff --git a/folly/wangle/rx/Dummy.cpp b/folly/wangle/rx/Dummy.cpp deleted file mode 100644 index ec999ca4..00000000 --- a/folly/wangle/rx/Dummy.cpp +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright 2015 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// fbbuild is too dumb to know that .h files in the directory affect -// our project, unless we have a .cpp file in the target, in the same -// directory. diff --git a/folly/wangle/rx/Observable.h b/folly/wangle/rx/Observable.h deleted file mode 100644 index 95b60bfc..00000000 --- a/folly/wangle/rx/Observable.h +++ /dev/null @@ -1,285 +0,0 @@ -/* - * Copyright 2015 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include // must come first -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include - -namespace folly { namespace wangle { - -template -class Observable { - public: - Observable() : nextSubscriptionId_{1} {} - - // TODO perhaps we want to provide this #5283229 - Observable(Observable&& other) = delete; - - virtual ~Observable() { - if (unsubscriber_) { - unsubscriber_->disable(); - } - } - - // The next three methods subscribe the given Observer to this Observable. - // - // If these are called within an Observer callback, the new observer will not - // get the current update but will get subsequent updates. - // - // subscribe() returns a Subscription object. The observer will continue to - // get updates until the Subscription is destroyed. - // - // observe(ObserverPtr) creates an indefinite subscription - // - // observe(Observer*) also creates an indefinite subscription, but the - // caller is responsible for ensuring that the given Observer outlives this - // Observable. This might be useful in high performance environments where - // allocations must be kept to a minimum. Template parameter InlineObservers - // specifies how many observers can been subscribed inline without any - // allocations (it's just the size of a folly::small_vector). - virtual Subscription subscribe(ObserverPtr observer) { - return subscribeImpl(observer, false); - } - - virtual void observe(ObserverPtr observer) { - subscribeImpl(observer, true); - } - - virtual void observe(Observer* observer) { - if (inCallback_ && *inCallback_) { - if (!newObservers_) { - newObservers_.reset(new ObserverList()); - } - newObservers_->push_back(observer); - } else { - RWSpinLock::WriteHolder{&observersLock_}; - observers_.push_back(observer); - } - } - - // TODO unobserve(ObserverPtr), unobserve(Observer*) - - /// Returns a new Observable that will call back on the given Scheduler. - /// The returned Observable must outlive the parent Observable. - - // This and subscribeOn should maybe just be a first-class feature of an - // Observable, rather than making new ones whose lifetimes are tied to their - // parents. In that case it'd return a reference to this object for - // chaining. - ObservablePtr observeOn(SchedulerPtr scheduler) { - // you're right Hannes, if we have Observable::create we don't need this - // helper class. - struct ViaSubject : public Observable - { - ViaSubject(SchedulerPtr sched, - Observable* obs) - : scheduler_(sched), observable_(obs) - {} - - Subscription subscribe(ObserverPtr o) override { - return observable_->subscribe( - Observer::create( - [=](T val) { scheduler_->add([o, val] { o->onNext(val); }); }, - [=](Error e) { scheduler_->add([o, e] { o->onError(e); }); }, - [=]() { scheduler_->add([o] { o->onCompleted(); }); })); - } - - protected: - SchedulerPtr scheduler_; - Observable* observable_; - }; - - return std::make_shared(scheduler, this); - } - - /// Returns a new Observable that will subscribe to this parent Observable - /// via the given Scheduler. This can be subtle and confusing at first, see - /// http://www.introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#SubscribeOnObserveOn - std::unique_ptr subscribeOn(SchedulerPtr scheduler) { - struct Subject_ : public Subject { - public: - Subject_(SchedulerPtr s, Observable* o) : scheduler_(s), observable_(o) { - } - - Subscription subscribe(ObserverPtr o) { - scheduler_->add([=] { - observable_->subscribe(o); - }); - return Subscription(nullptr, 0); // TODO - } - - protected: - SchedulerPtr scheduler_; - Observable* observable_; - }; - - return folly::make_unique(scheduler, this); - } - - protected: - // Safely execute an operation on each observer. F must take a single - // Observer* as its argument. - template - void forEachObserver(F f) { - if (UNLIKELY(!inCallback_)) { - inCallback_.reset(new bool{false}); - } - CHECK(!(*inCallback_)); - *inCallback_ = true; - - { - RWSpinLock::ReadHolder rh(observersLock_); - for (auto o : observers_) { - f(o); - } - - for (auto& kv : subscribers_) { - f(kv.second.get()); - } - } - - if (UNLIKELY((newObservers_ && !newObservers_->empty()) || - (newSubscribers_ && !newSubscribers_->empty()) || - (oldSubscribers_ && !oldSubscribers_->empty()))) { - { - RWSpinLock::WriteHolder wh(observersLock_); - if (newObservers_) { - for (auto observer : *(newObservers_)) { - observers_.push_back(observer); - } - newObservers_->clear(); - } - if (newSubscribers_) { - for (auto& kv : *(newSubscribers_)) { - subscribers_.insert(std::move(kv)); - } - newSubscribers_->clear(); - } - if (oldSubscribers_) { - for (auto id : *(oldSubscribers_)) { - subscribers_.erase(id); - } - oldSubscribers_->clear(); - } - } - } - *inCallback_ = false; - } - - private: - Subscription subscribeImpl(ObserverPtr observer, bool indefinite) { - auto subscription = makeSubscription(indefinite); - typename SubscriberMap::value_type kv{subscription.id_, std::move(observer)}; - if (inCallback_ && *inCallback_) { - if (!newSubscribers_) { - newSubscribers_.reset(new SubscriberMap()); - } - newSubscribers_->insert(std::move(kv)); - } else { - RWSpinLock::WriteHolder{&observersLock_}; - subscribers_.insert(std::move(kv)); - } - return subscription; - } - - class Unsubscriber { - public: - explicit Unsubscriber(Observable* observable) : observable_(observable) { - CHECK(observable_); - } - - void unsubscribe(uint64_t id) { - CHECK(id > 0); - RWSpinLock::ReadHolder guard(lock_); - if (observable_) { - observable_->unsubscribe(id); - } - } - - void disable() { - RWSpinLock::WriteHolder guard(lock_); - observable_ = nullptr; - } - - private: - RWSpinLock lock_; - Observable* observable_; - }; - - std::shared_ptr unsubscriber_{nullptr}; - MicroSpinLock unsubscriberLock_{0}; - - friend class Subscription; - - void unsubscribe(uint64_t id) { - if (inCallback_ && *inCallback_) { - if (!oldSubscribers_) { - oldSubscribers_.reset(new std::vector()); - } - if (newSubscribers_) { - auto it = newSubscribers_->find(id); - if (it != newSubscribers_->end()) { - newSubscribers_->erase(it); - return; - } - } - oldSubscribers_->push_back(id); - } else { - RWSpinLock::WriteHolder{&observersLock_}; - subscribers_.erase(id); - } - } - - Subscription makeSubscription(bool indefinite) { - if (indefinite) { - return Subscription(nullptr, nextSubscriptionId_++); - } else { - if (!unsubscriber_) { - std::lock_guard guard(unsubscriberLock_); - if (!unsubscriber_) { - unsubscriber_ = std::make_shared(this); - } - } - return Subscription(unsubscriber_, nextSubscriptionId_++); - } - } - - std::atomic nextSubscriptionId_; - RWSpinLock observersLock_; - folly::ThreadLocalPtr inCallback_; - - typedef folly::small_vector*, InlineObservers> ObserverList; - ObserverList observers_; - folly::ThreadLocalPtr newObservers_; - - typedef std::map> SubscriberMap; - SubscriberMap subscribers_; - folly::ThreadLocalPtr newSubscribers_; - folly::ThreadLocalPtr> oldSubscribers_; -}; - -}} diff --git a/folly/wangle/rx/Observer.h b/folly/wangle/rx/Observer.h deleted file mode 100644 index 5797a0c1..00000000 --- a/folly/wangle/rx/Observer.h +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright 2015 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include // must come first -#include -#include -#include -#include - -namespace folly { namespace wangle { - -template struct FunctionObserver; - -/// Observer interface. You can subclass it, or you can just use create() -/// to use std::functions. -template -struct Observer { - // These are what it means to be an Observer. - virtual void onNext(const T&) = 0; - virtual void onError(Error) = 0; - virtual void onCompleted() = 0; - - virtual ~Observer() = default; - - /// Create an Observer with std::function callbacks. Handy to make ad-hoc - /// Observers with lambdas. - /// - /// Templated for maximum perfect forwarding flexibility, but ultimately - /// whatever you pass in has to implicitly become a std::function for the - /// same signature as onNext(), onError(), and onCompleted() respectively. - /// (see the FunctionObserver typedefs) - template - static std::unique_ptr create( - N&& onNextFn, E&& onErrorFn, C&& onCompletedFn) - { - return folly::make_unique>( - std::forward(onNextFn), - std::forward(onErrorFn), - std::forward(onCompletedFn)); - } - - /// Create an Observer with only onNext and onError callbacks. - /// onCompleted will just be a no-op. - template - static std::unique_ptr create(N&& onNextFn, E&& onErrorFn) { - return folly::make_unique>( - std::forward(onNextFn), - std::forward(onErrorFn), - nullptr); - } - - /// Create an Observer with only an onNext callback. - /// onError and onCompleted will just be no-ops. - template - static std::unique_ptr create(N&& onNextFn) { - return folly::make_unique>( - std::forward(onNextFn), - nullptr, - nullptr); - } -}; - -/// An observer that uses std::function callbacks. You don't really want to -/// make one of these directly - instead use the Observer::create() methods. -template -struct FunctionObserver : public Observer { - typedef std::function OnNext; - typedef std::function OnError; - typedef std::function OnCompleted; - - /// We don't need any fancy overloads of this constructor because that's - /// what Observer::create() is for. - template - FunctionObserver(N&& n, E&& e, C&& c) - : onNext_(std::forward(n)), - onError_(std::forward(e)), - onCompleted_(std::forward(c)) - {} - - void onNext(const T& val) override { - if (onNext_) onNext_(val); - } - - void onError(Error e) override { - if (onError_) onError_(e); - } - - void onCompleted() override { - if (onCompleted_) onCompleted_(); - } - - protected: - OnNext onNext_; - OnError onError_; - OnCompleted onCompleted_; -}; - -}} diff --git a/folly/wangle/rx/README.md b/folly/wangle/rx/README.md deleted file mode 100644 index 8c31ae00..00000000 --- a/folly/wangle/rx/README.md +++ /dev/null @@ -1,36 +0,0 @@ -Rx is a pattern for "functional reactive programming" that started at -Microsoft in C#, and has been reimplemented in various languages, notably -RxJava for JVM languages. - -It is basically the plural of Futures (a la Wangle). - -``` - singular | plural - +---------------------------------+----------------------------------- - sync | Foo getData() | std::vector getData() - async | wangle::Future getData() | wangle::Observable getData() -``` - -For more on Rx, I recommend these resources: - -- Netflix blog post (RxJava): http://techblog.netflix.com/2013/02/rxjava-netflix-api.html -- Introduction to Rx eBook (C#): http://www.introtorx.com/content/v1.0.10621.0/01_WhyRx.html -- The RxJava wiki: https://github.com/Netflix/RxJava/wiki -- Netflix QCon presentation: http://www.infoq.com/presentations/netflix-functional-rx -- https://rx.codeplex.com/ - -There are open source C++ implementations, I haven't looked at them. They -might be the best way to go rather than writing it NIH-style. I mostly did it -as an exercise, to think through how closely we might want to integrate -something like this with Wangle, and to get a feel for how it works in C++. - -I haven't even tried to support move-only data in this version. I'm on the -fence about the usage of shared_ptr. Subject is underdeveloped. A whole rich -set of operations is obviously missing. I haven't decided how to handle -subscriptions (and therefore cancellation), but I'm pretty sure C#'s -"Disposable" is thoroughly un-C++ (opposite of RAII). So for now subscribe -returns nothing at all and you can't cancel anything ever. The whole thing is -probably riddled with lifetime corner case bugs that will come out like a -swarm of angry bees as soon as someone tries an infinite sequence, or tries to -partially observe a long sequence. I'm pretty sure subscribeOn has a bug that -I haven't tracked down yet. diff --git a/folly/wangle/rx/Subject.h b/folly/wangle/rx/Subject.h deleted file mode 100644 index c806d705..00000000 --- a/folly/wangle/rx/Subject.h +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2015 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include // must come first -#include -#include - -namespace folly { namespace wangle { - -/// Subject interface. A Subject is both an Observable and an Observer. There -/// is a default implementation of the Observer methods that just forwards the -/// observed events to the Subject's observers. -template -struct Subject : public Observable, public Observer { - void onNext(const T& val) override { - this->forEachObserver([&](Observer* o){ - o->onNext(val); - }); - } - void onError(Error e) override { - this->forEachObserver([&](Observer* o){ - o->onError(e); - }); - } - void onCompleted() override { - this->forEachObserver([](Observer* o){ - o->onCompleted(); - }); - } -}; - -}} diff --git a/folly/wangle/rx/Subscription.h b/folly/wangle/rx/Subscription.h deleted file mode 100644 index 09af8c9d..00000000 --- a/folly/wangle/rx/Subscription.h +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright 2015 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include // must come first -#include - -namespace folly { namespace wangle { - -template -class Subscription { - public: - Subscription() = default; - - Subscription(const Subscription&) = delete; - - Subscription(Subscription&& other) noexcept { - *this = std::move(other); - } - - Subscription& operator=(Subscription&& other) noexcept { - unsubscribe(); - unsubscriber_ = std::move(other.unsubscriber_); - id_ = other.id_; - other.unsubscriber_ = nullptr; - other.id_ = 0; - return *this; - } - - ~Subscription() { - unsubscribe(); - } - - private: - typedef typename Observable::Unsubscriber Unsubscriber; - - Subscription(std::shared_ptr unsubscriber, uint64_t id) - : unsubscriber_(std::move(unsubscriber)), id_(id) { - CHECK(id_ > 0); - } - - void unsubscribe() { - if (unsubscriber_ && id_ > 0) { - unsubscriber_->unsubscribe(id_); - id_ = 0; - unsubscriber_ = nullptr; - } - } - - std::shared_ptr unsubscriber_; - uint64_t id_{0}; - - friend class Observable; -}; - -}} diff --git a/folly/wangle/rx/test/RxBenchmark.cpp b/folly/wangle/rx/test/RxBenchmark.cpp deleted file mode 100644 index 4e174942..00000000 --- a/folly/wangle/rx/test/RxBenchmark.cpp +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Copyright 2015 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include - -using namespace folly::wangle; -using folly::BenchmarkSuspender; - -static std::unique_ptr> makeObserver() { - return Observer::create([&] (int x) {}); -} - -void subscribeImpl(uint iters, int N, bool countUnsubscribe) { - for (uint iter = 0; iter < iters; iter++) { - BenchmarkSuspender bs; - Subject subject; - std::vector>> observers; - std::vector> subscriptions; - subscriptions.reserve(N); - for (int i = 0; i < N; i++) { - observers.push_back(makeObserver()); - } - bs.dismiss(); - for (int i = 0; i < N; i++) { - subscriptions.push_back(subject.subscribe(std::move(observers[i]))); - } - if (countUnsubscribe) { - subscriptions.clear(); - } - bs.rehire(); - } -} - -void subscribeAndUnsubscribe(uint iters, int N) { - subscribeImpl(iters, N, true); -} - -void subscribe(uint iters, int N) { - subscribeImpl(iters, N, false); -} - -void observe(uint iters, int N) { - for (uint iter = 0; iter < iters; iter++) { - BenchmarkSuspender bs; - Subject subject; - std::vector>> observers; - for (int i = 0; i < N; i++) { - observers.push_back(makeObserver()); - } - bs.dismiss(); - for (int i = 0; i < N; i++) { - subject.observe(std::move(observers[i])); - } - bs.rehire(); - } -} - -void inlineObserve(uint iters, int N) { - for (uint iter = 0; iter < iters; iter++) { - BenchmarkSuspender bs; - Subject subject; - std::vector*> observers; - for (int i = 0; i < N; i++) { - observers.push_back(makeObserver().release()); - } - bs.dismiss(); - for (int i = 0; i < N; i++) { - subject.observe(observers[i]); - } - bs.rehire(); - for (int i = 0; i < N; i++) { - delete observers[i]; - } - } -} - -void notifySubscribers(uint iters, int N) { - for (uint iter = 0; iter < iters; iter++) { - BenchmarkSuspender bs; - Subject subject; - std::vector>> observers; - std::vector> subscriptions; - subscriptions.reserve(N); - for (int i = 0; i < N; i++) { - observers.push_back(makeObserver()); - } - for (int i = 0; i < N; i++) { - subscriptions.push_back(subject.subscribe(std::move(observers[i]))); - } - bs.dismiss(); - subject.onNext(42); - bs.rehire(); - } -} - -void notifyInlineObservers(uint iters, int N) { - for (uint iter = 0; iter < iters; iter++) { - BenchmarkSuspender bs; - Subject subject; - std::vector*> observers; - for (int i = 0; i < N; i++) { - observers.push_back(makeObserver().release()); - } - for (int i = 0; i < N; i++) { - subject.observe(observers[i]); - } - bs.dismiss(); - subject.onNext(42); - bs.rehire(); - } -} - -BENCHMARK_PARAM(subscribeAndUnsubscribe, 1); -BENCHMARK_RELATIVE_PARAM(subscribe, 1); -BENCHMARK_RELATIVE_PARAM(observe, 1); -BENCHMARK_RELATIVE_PARAM(inlineObserve, 1); - -BENCHMARK_DRAW_LINE(); - -BENCHMARK_PARAM(subscribeAndUnsubscribe, 1000); -BENCHMARK_RELATIVE_PARAM(subscribe, 1000); -BENCHMARK_RELATIVE_PARAM(observe, 1000); -BENCHMARK_RELATIVE_PARAM(inlineObserve, 1000); - -BENCHMARK_DRAW_LINE(); - -BENCHMARK_PARAM(notifySubscribers, 1); -BENCHMARK_RELATIVE_PARAM(notifyInlineObservers, 1); - -BENCHMARK_DRAW_LINE(); - -BENCHMARK_PARAM(notifySubscribers, 1000); -BENCHMARK_RELATIVE_PARAM(notifyInlineObservers, 1000); - -int main(int argc, char** argv) { - gflags::ParseCommandLineFlags(&argc, &argv, true); - folly::runBenchmarks(); - return 0; -} diff --git a/folly/wangle/rx/test/RxTest.cpp b/folly/wangle/rx/test/RxTest.cpp deleted file mode 100644 index 012a8c2f..00000000 --- a/folly/wangle/rx/test/RxTest.cpp +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Copyright 2015 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include - -using namespace folly::wangle; - -static std::unique_ptr> incrementer(int& counter) { - return Observer::create([&] (int x) { - counter++; - }); -} - -TEST(RxTest, Observe) { - Subject subject; - auto count = 0; - subject.observe(incrementer(count)); - subject.onNext(1); - EXPECT_EQ(1, count); -} - -TEST(RxTest, ObserveInline) { - Subject subject; - auto count = 0; - auto o = incrementer(count).release(); - subject.observe(o); - subject.onNext(1); - EXPECT_EQ(1, count); - delete o; -} - -TEST(RxTest, Subscription) { - Subject subject; - auto count = 0; - { - auto s = subject.subscribe(incrementer(count)); - subject.onNext(1); - } - // The subscription has gone out of scope so no one should get this. - subject.onNext(2); - EXPECT_EQ(1, count); -} - -TEST(RxTest, SubscriptionMove) { - Subject subject; - auto count = 0; - auto s = subject.subscribe(incrementer(count)); - auto s2 = subject.subscribe(incrementer(count)); - s2 = std::move(s); - subject.onNext(1); - Subscription s3(std::move(s2)); - subject.onNext(2); - EXPECT_EQ(2, count); -} - -TEST(RxTest, SubscriptionOutlivesSubject) { - Subscription s; - { - Subject subject; - s = subject.subscribe(Observer::create([](int){})); - } - // Don't explode when s is destroyed -} - -TEST(RxTest, SubscribeDuringCallback) { - // A subscriber who was subscribed in the course of a callback should get - // subsequent updates but not the current update. - Subject subject; - int outerCount = 0, innerCount = 0; - Subscription s1, s2; - s1 = subject.subscribe(Observer::create([&] (int x) { - outerCount++; - s2 = subject.subscribe(incrementer(innerCount)); - })); - subject.onNext(42); - subject.onNext(0xDEADBEEF); - EXPECT_EQ(2, outerCount); - EXPECT_EQ(1, innerCount); -} - -TEST(RxTest, ObserveDuringCallback) { - Subject subject; - int outerCount = 0, innerCount = 0; - subject.observe(Observer::create([&] (int x) { - outerCount++; - subject.observe(incrementer(innerCount)); - })); - subject.onNext(42); - subject.onNext(0xDEADBEEF); - EXPECT_EQ(2, outerCount); - EXPECT_EQ(1, innerCount); -} - -TEST(RxTest, ObserveInlineDuringCallback) { - Subject subject; - int outerCount = 0, innerCount = 0; - auto innerO = incrementer(innerCount).release(); - auto outerO = Observer::create([&] (int x) { - outerCount++; - subject.observe(innerO); - }).release(); - subject.observe(outerO); - subject.onNext(42); - subject.onNext(0xDEADBEEF); - EXPECT_EQ(2, outerCount); - EXPECT_EQ(1, innerCount); - delete innerO; - delete outerO; -} - -TEST(RxTest, UnsubscribeDuringCallback) { - // A subscriber who was unsubscribed in the course of a callback should get - // the current update but not subsequent ones - Subject subject; - int count1 = 0, count2 = 0; - auto s1 = subject.subscribe(incrementer(count1)); - auto s2 = subject.subscribe(Observer::create([&] (int x) { - count2++; - s1.~Subscription(); - })); - subject.onNext(1); - subject.onNext(2); - EXPECT_EQ(1, count1); - EXPECT_EQ(2, count2); -} - -TEST(RxTest, SubscribeUnsubscribeDuringCallback) { - // A subscriber who was subscribed and unsubscribed in the course of a - // callback should not get any updates - Subject subject; - int outerCount = 0, innerCount = 0; - auto s2 = subject.subscribe(Observer::create([&] (int x) { - outerCount++; - auto s2 = subject.subscribe(incrementer(innerCount)); - })); - subject.onNext(1); - subject.onNext(2); - EXPECT_EQ(2, outerCount); - EXPECT_EQ(0, innerCount); -} - -// Move only type -typedef std::unique_ptr MO; -static MO makeMO() { return folly::make_unique(1); } -template -static ObserverPtr makeMOObserver() { - return Observer::create([](const T& mo) { - EXPECT_EQ(1, *mo); - }); -} - -TEST(RxTest, MoveOnlyRvalue) { - Subject subject; - auto s1 = subject.subscribe(makeMOObserver()); - auto s2 = subject.subscribe(makeMOObserver()); - auto mo = makeMO(); - // Can't bind lvalues to rvalue references - // subject.onNext(mo); - subject.onNext(std::move(mo)); - subject.onNext(makeMO()); -} - -// Copy only type -struct CO { - CO() = default; - CO(const CO&) = default; - CO(CO&&) = delete; -}; - -template -static ObserverPtr makeCOObserver() { - return Observer::create([](const T& mo) {}); -} - -TEST(RxTest, CopyOnly) { - Subject subject; - auto s1 = subject.subscribe(makeCOObserver()); - CO co; - subject.onNext(co); -} diff --git a/folly/wangle/rx/types.h b/folly/wangle/rx/types.h deleted file mode 100644 index 3bb540e6..00000000 --- a/folly/wangle/rx/types.h +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2015 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include - -namespace folly { namespace wangle { - typedef folly::exception_wrapper Error; - // The Executor is basically an rx Scheduler (by design). So just - // alias it. - typedef std::shared_ptr SchedulerPtr; - - template class Observable; - template struct Observer; - template struct Subject; - - template using ObservablePtr = std::shared_ptr>; - template using ObserverPtr = std::shared_ptr>; - template using SubjectPtr = std::shared_ptr>; -}}