wangle/concurrent/NamedThreadFactory.h \
wangle/concurrent/ThreadFactory.h \
wangle/concurrent/ThreadPoolExecutor.h \
- wangle/rx/Observable.h \
- wangle/rx/Observer.h \
- wangle/rx/Subject.h \
- wangle/rx/Subscription.h \
- wangle/rx/types.h \
+ wangle/deprecated/rx/Observable.h \
+ wangle/deprecated/rx/Observer.h \
+ wangle/deprecated/rx/Subject.h \
+ wangle/deprecated/rx/Subscription.h \
+ wangle/deprecated/rx/types.h \
wangle/ssl/ClientHelloExtStats.h \
wangle/ssl/DHParam.h \
wangle/ssl/PasswordInFile.h \
#include <folly/Executor.h>
#include <folly/wangle/concurrent/LifoSemMPMCQueue.h>
#include <folly/wangle/concurrent/NamedThreadFactory.h>
-#include <folly/wangle/rx/Observable.h>
+#include <folly/wangle/deprecated/rx/Observable.h>
#include <folly/Baton.h>
#include <folly/Memory.h>
#include <folly/RWSpinLock.h>
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// fbbuild is too dumb to know that .h files in the directory affect
+// our project, unless we have a .cpp file in the target, in the same
+// directory.
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/wangle/deprecated/rx/types.h> // must come first
+#include <folly/wangle/deprecated/rx/Subject.h>
+#include <folly/wangle/deprecated/rx/Subscription.h>
+
+#include <folly/RWSpinLock.h>
+#include <folly/SmallLocks.h>
+#include <folly/ThreadLocal.h>
+#include <folly/small_vector.h>
+#include <folly/Executor.h>
+#include <folly/Memory.h>
+#include <map>
+#include <memory>
+
+namespace folly { namespace wangle {
+
+template <class T, size_t InlineObservers>
+class Observable {
+ public:
+ Observable() : nextSubscriptionId_{1} {}
+
+ // TODO perhaps we want to provide this #5283229
+ Observable(Observable&& other) = delete;
+
+ virtual ~Observable() {
+ if (unsubscriber_) {
+ unsubscriber_->disable();
+ }
+ }
+
+ // The next three methods subscribe the given Observer to this Observable.
+ //
+ // If these are called within an Observer callback, the new observer will not
+ // get the current update but will get subsequent updates.
+ //
+ // subscribe() returns a Subscription object. The observer will continue to
+ // get updates until the Subscription is destroyed.
+ //
+ // observe(ObserverPtr<T>) creates an indefinite subscription
+ //
+ // observe(Observer<T>*) also creates an indefinite subscription, but the
+ // caller is responsible for ensuring that the given Observer outlives this
+ // Observable. This might be useful in high performance environments where
+ // allocations must be kept to a minimum. Template parameter InlineObservers
+ // specifies how many observers can been subscribed inline without any
+ // allocations (it's just the size of a folly::small_vector).
+ virtual Subscription<T> subscribe(ObserverPtr<T> observer) {
+ return subscribeImpl(observer, false);
+ }
+
+ virtual void observe(ObserverPtr<T> observer) {
+ subscribeImpl(observer, true);
+ }
+
+ virtual void observe(Observer<T>* observer) {
+ if (inCallback_ && *inCallback_) {
+ if (!newObservers_) {
+ newObservers_.reset(new ObserverList());
+ }
+ newObservers_->push_back(observer);
+ } else {
+ RWSpinLock::WriteHolder{&observersLock_};
+ observers_.push_back(observer);
+ }
+ }
+
+ // TODO unobserve(ObserverPtr<T>), unobserve(Observer<T>*)
+
+ /// Returns a new Observable that will call back on the given Scheduler.
+ /// The returned Observable must outlive the parent Observable.
+
+ // This and subscribeOn should maybe just be a first-class feature of an
+ // Observable, rather than making new ones whose lifetimes are tied to their
+ // parents. In that case it'd return a reference to this object for
+ // chaining.
+ ObservablePtr<T> observeOn(SchedulerPtr scheduler) {
+ // you're right Hannes, if we have Observable::create we don't need this
+ // helper class.
+ struct ViaSubject : public Observable<T>
+ {
+ ViaSubject(SchedulerPtr sched,
+ Observable* obs)
+ : scheduler_(sched), observable_(obs)
+ {}
+
+ Subscription<T> subscribe(ObserverPtr<T> o) override {
+ return observable_->subscribe(
+ Observer<T>::create(
+ [=](T val) { scheduler_->add([o, val] { o->onNext(val); }); },
+ [=](Error e) { scheduler_->add([o, e] { o->onError(e); }); },
+ [=]() { scheduler_->add([o] { o->onCompleted(); }); }));
+ }
+
+ protected:
+ SchedulerPtr scheduler_;
+ Observable* observable_;
+ };
+
+ return std::make_shared<ViaSubject>(scheduler, this);
+ }
+
+ /// Returns a new Observable that will subscribe to this parent Observable
+ /// via the given Scheduler. This can be subtle and confusing at first, see
+ /// http://www.introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#SubscribeOnObserveOn
+ std::unique_ptr<Observable> subscribeOn(SchedulerPtr scheduler) {
+ struct Subject_ : public Subject<T> {
+ public:
+ Subject_(SchedulerPtr s, Observable* o) : scheduler_(s), observable_(o) {
+ }
+
+ Subscription<T> subscribe(ObserverPtr<T> o) {
+ scheduler_->add([=] {
+ observable_->subscribe(o);
+ });
+ return Subscription<T>(nullptr, 0); // TODO
+ }
+
+ protected:
+ SchedulerPtr scheduler_;
+ Observable* observable_;
+ };
+
+ return folly::make_unique<Subject_>(scheduler, this);
+ }
+
+ protected:
+ // Safely execute an operation on each observer. F must take a single
+ // Observer<T>* as its argument.
+ template <class F>
+ void forEachObserver(F f) {
+ if (UNLIKELY(!inCallback_)) {
+ inCallback_.reset(new bool{false});
+ }
+ CHECK(!(*inCallback_));
+ *inCallback_ = true;
+
+ {
+ RWSpinLock::ReadHolder rh(observersLock_);
+ for (auto o : observers_) {
+ f(o);
+ }
+
+ for (auto& kv : subscribers_) {
+ f(kv.second.get());
+ }
+ }
+
+ if (UNLIKELY((newObservers_ && !newObservers_->empty()) ||
+ (newSubscribers_ && !newSubscribers_->empty()) ||
+ (oldSubscribers_ && !oldSubscribers_->empty()))) {
+ {
+ RWSpinLock::WriteHolder wh(observersLock_);
+ if (newObservers_) {
+ for (auto observer : *(newObservers_)) {
+ observers_.push_back(observer);
+ }
+ newObservers_->clear();
+ }
+ if (newSubscribers_) {
+ for (auto& kv : *(newSubscribers_)) {
+ subscribers_.insert(std::move(kv));
+ }
+ newSubscribers_->clear();
+ }
+ if (oldSubscribers_) {
+ for (auto id : *(oldSubscribers_)) {
+ subscribers_.erase(id);
+ }
+ oldSubscribers_->clear();
+ }
+ }
+ }
+ *inCallback_ = false;
+ }
+
+ private:
+ Subscription<T> subscribeImpl(ObserverPtr<T> observer, bool indefinite) {
+ auto subscription = makeSubscription(indefinite);
+ typename SubscriberMap::value_type kv{subscription.id_, std::move(observer)};
+ if (inCallback_ && *inCallback_) {
+ if (!newSubscribers_) {
+ newSubscribers_.reset(new SubscriberMap());
+ }
+ newSubscribers_->insert(std::move(kv));
+ } else {
+ RWSpinLock::WriteHolder{&observersLock_};
+ subscribers_.insert(std::move(kv));
+ }
+ return subscription;
+ }
+
+ class Unsubscriber {
+ public:
+ explicit Unsubscriber(Observable* observable) : observable_(observable) {
+ CHECK(observable_);
+ }
+
+ void unsubscribe(uint64_t id) {
+ CHECK(id > 0);
+ RWSpinLock::ReadHolder guard(lock_);
+ if (observable_) {
+ observable_->unsubscribe(id);
+ }
+ }
+
+ void disable() {
+ RWSpinLock::WriteHolder guard(lock_);
+ observable_ = nullptr;
+ }
+
+ private:
+ RWSpinLock lock_;
+ Observable* observable_;
+ };
+
+ std::shared_ptr<Unsubscriber> unsubscriber_{nullptr};
+ MicroSpinLock unsubscriberLock_{0};
+
+ friend class Subscription<T>;
+
+ void unsubscribe(uint64_t id) {
+ if (inCallback_ && *inCallback_) {
+ if (!oldSubscribers_) {
+ oldSubscribers_.reset(new std::vector<uint64_t>());
+ }
+ if (newSubscribers_) {
+ auto it = newSubscribers_->find(id);
+ if (it != newSubscribers_->end()) {
+ newSubscribers_->erase(it);
+ return;
+ }
+ }
+ oldSubscribers_->push_back(id);
+ } else {
+ RWSpinLock::WriteHolder{&observersLock_};
+ subscribers_.erase(id);
+ }
+ }
+
+ Subscription<T> makeSubscription(bool indefinite) {
+ if (indefinite) {
+ return Subscription<T>(nullptr, nextSubscriptionId_++);
+ } else {
+ if (!unsubscriber_) {
+ std::lock_guard<MicroSpinLock> guard(unsubscriberLock_);
+ if (!unsubscriber_) {
+ unsubscriber_ = std::make_shared<Unsubscriber>(this);
+ }
+ }
+ return Subscription<T>(unsubscriber_, nextSubscriptionId_++);
+ }
+ }
+
+ std::atomic<uint64_t> nextSubscriptionId_;
+ RWSpinLock observersLock_;
+ folly::ThreadLocalPtr<bool> inCallback_;
+
+ typedef folly::small_vector<Observer<T>*, InlineObservers> ObserverList;
+ ObserverList observers_;
+ folly::ThreadLocalPtr<ObserverList> newObservers_;
+
+ typedef std::map<uint64_t, ObserverPtr<T>> SubscriberMap;
+ SubscriberMap subscribers_;
+ folly::ThreadLocalPtr<SubscriberMap> newSubscribers_;
+ folly::ThreadLocalPtr<std::vector<uint64_t>> oldSubscribers_;
+};
+
+}}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/wangle/deprecated/rx/types.h> // must come first
+#include <functional>
+#include <memory>
+#include <stdexcept>
+#include <folly/Memory.h>
+
+namespace folly { namespace wangle {
+
+template <class T> struct FunctionObserver;
+
+/// Observer interface. You can subclass it, or you can just use create()
+/// to use std::functions.
+template <class T>
+struct Observer {
+ // These are what it means to be an Observer.
+ virtual void onNext(const T&) = 0;
+ virtual void onError(Error) = 0;
+ virtual void onCompleted() = 0;
+
+ virtual ~Observer() = default;
+
+ /// Create an Observer with std::function callbacks. Handy to make ad-hoc
+ /// Observers with lambdas.
+ ///
+ /// Templated for maximum perfect forwarding flexibility, but ultimately
+ /// whatever you pass in has to implicitly become a std::function for the
+ /// same signature as onNext(), onError(), and onCompleted() respectively.
+ /// (see the FunctionObserver typedefs)
+ template <class N, class E, class C>
+ static std::unique_ptr<Observer> create(
+ N&& onNextFn, E&& onErrorFn, C&& onCompletedFn)
+ {
+ return folly::make_unique<FunctionObserver<T>>(
+ std::forward<N>(onNextFn),
+ std::forward<E>(onErrorFn),
+ std::forward<C>(onCompletedFn));
+ }
+
+ /// Create an Observer with only onNext and onError callbacks.
+ /// onCompleted will just be a no-op.
+ template <class N, class E>
+ static std::unique_ptr<Observer> create(N&& onNextFn, E&& onErrorFn) {
+ return folly::make_unique<FunctionObserver<T>>(
+ std::forward<N>(onNextFn),
+ std::forward<E>(onErrorFn),
+ nullptr);
+ }
+
+ /// Create an Observer with only an onNext callback.
+ /// onError and onCompleted will just be no-ops.
+ template <class N>
+ static std::unique_ptr<Observer> create(N&& onNextFn) {
+ return folly::make_unique<FunctionObserver<T>>(
+ std::forward<N>(onNextFn),
+ nullptr,
+ nullptr);
+ }
+};
+
+/// An observer that uses std::function callbacks. You don't really want to
+/// make one of these directly - instead use the Observer::create() methods.
+template <class T>
+struct FunctionObserver : public Observer<T> {
+ typedef std::function<void(const T&)> OnNext;
+ typedef std::function<void(Error)> OnError;
+ typedef std::function<void()> OnCompleted;
+
+ /// We don't need any fancy overloads of this constructor because that's
+ /// what Observer::create() is for.
+ template <class N = OnNext, class E = OnError, class C = OnCompleted>
+ FunctionObserver(N&& n, E&& e, C&& c)
+ : onNext_(std::forward<N>(n)),
+ onError_(std::forward<E>(e)),
+ onCompleted_(std::forward<C>(c))
+ {}
+
+ void onNext(const T& val) override {
+ if (onNext_) onNext_(val);
+ }
+
+ void onError(Error e) override {
+ if (onError_) onError_(e);
+ }
+
+ void onCompleted() override {
+ if (onCompleted_) onCompleted_();
+ }
+
+ protected:
+ OnNext onNext_;
+ OnError onError_;
+ OnCompleted onCompleted_;
+};
+
+}}
--- /dev/null
+Rx is a pattern for "functional reactive programming" that started at
+Microsoft in C#, and has been reimplemented in various languages, notably
+RxJava for JVM languages.
+
+It is basically the plural of Futures (a la Wangle).
+
+```
+ singular | plural
+ +---------------------------------+-----------------------------------
+ sync | Foo getData() | std::vector<Foo> getData()
+ async | wangle::Future<Foo> getData() | wangle::Observable<Foo> getData()
+```
+
+For more on Rx, I recommend these resources:
+
+Netflix blog post (RxJava): http://techblog.netflix.com/2013/02/rxjava-netflix-api.html
+Introduction to Rx eBook (C#): http://www.introtorx.com/content/v1.0.10621.0/01_WhyRx.html
+The RxJava wiki: https://github.com/Netflix/RxJava/wiki
+Netflix QCon presentation: http://www.infoq.com/presentations/netflix-functional-rx
+https://rx.codeplex.com/
+
+I haven't even tried to support move-only data in this version. I'm on the
+fence about the usage of shared_ptr. Subject is underdeveloped. A whole rich
+set of operations is obviously missing. I haven't decided how to handle
+subscriptions (and therefore cancellation), but I'm pretty sure C#'s
+"Disposable" is thoroughly un-C++ (opposite of RAII). So for now subscribe
+returns nothing at all and you can't cancel anything ever. The whole thing is
+probably riddled with lifetime corner case bugs that will come out like a
+swarm of angry bees as soon as someone tries an infinite sequence, or tries to
+partially observe a long sequence. I'm pretty sure subscribeOn has a bug that
+I haven't tracked down yet.
+
+DEPRECATED:
+This was an experimental exploration. There are better, more robust, and (most
+importantly) supported C++ implementations, notably
+[rxcpp](https://rxcpp.codeplex.com/). Use that instead. You really shouldn't
+use this one. It's unsupported and incomplete. Honest.
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/wangle/deprecated/rx/types.h> // must come first
+#include <folly/wangle/deprecated/rx/Observable.h>
+#include <folly/wangle/deprecated/rx/Observer.h>
+
+namespace folly { namespace wangle {
+
+/// Subject interface. A Subject is both an Observable and an Observer. There
+/// is a default implementation of the Observer methods that just forwards the
+/// observed events to the Subject's observers.
+template <class T>
+struct Subject : public Observable<T>, public Observer<T> {
+ void onNext(const T& val) override {
+ this->forEachObserver([&](Observer<T>* o){
+ o->onNext(val);
+ });
+ }
+ void onError(Error e) override {
+ this->forEachObserver([&](Observer<T>* o){
+ o->onError(e);
+ });
+ }
+ void onCompleted() override {
+ this->forEachObserver([](Observer<T>* o){
+ o->onCompleted();
+ });
+ }
+};
+
+}}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/wangle/deprecated/rx/types.h> // must come first
+#include <folly/wangle/deprecated/rx/Observable.h>
+
+namespace folly { namespace wangle {
+
+template <class T>
+class Subscription {
+ public:
+ Subscription() = default;
+
+ Subscription(const Subscription&) = delete;
+
+ Subscription(Subscription&& other) noexcept {
+ *this = std::move(other);
+ }
+
+ Subscription& operator=(Subscription&& other) noexcept {
+ unsubscribe();
+ unsubscriber_ = std::move(other.unsubscriber_);
+ id_ = other.id_;
+ other.unsubscriber_ = nullptr;
+ other.id_ = 0;
+ return *this;
+ }
+
+ ~Subscription() {
+ unsubscribe();
+ }
+
+ private:
+ typedef typename Observable<T>::Unsubscriber Unsubscriber;
+
+ Subscription(std::shared_ptr<Unsubscriber> unsubscriber, uint64_t id)
+ : unsubscriber_(std::move(unsubscriber)), id_(id) {
+ CHECK(id_ > 0);
+ }
+
+ void unsubscribe() {
+ if (unsubscriber_ && id_ > 0) {
+ unsubscriber_->unsubscribe(id_);
+ id_ = 0;
+ unsubscriber_ = nullptr;
+ }
+ }
+
+ std::shared_ptr<Unsubscriber> unsubscriber_;
+ uint64_t id_{0};
+
+ friend class Observable<T>;
+};
+
+}}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/Benchmark.h>
+#include <folly/wangle/deprecated/rx/Observer.h>
+#include <folly/wangle/deprecated/rx/Subject.h>
+#include <gflags/gflags.h>
+
+using namespace folly::wangle;
+using folly::BenchmarkSuspender;
+
+static std::unique_ptr<Observer<int>> makeObserver() {
+ return Observer<int>::create([&] (int x) {});
+}
+
+void subscribeImpl(uint iters, int N, bool countUnsubscribe) {
+ for (uint iter = 0; iter < iters; iter++) {
+ BenchmarkSuspender bs;
+ Subject<int> subject;
+ std::vector<std::unique_ptr<Observer<int>>> observers;
+ std::vector<Subscription<int>> subscriptions;
+ subscriptions.reserve(N);
+ for (int i = 0; i < N; i++) {
+ observers.push_back(makeObserver());
+ }
+ bs.dismiss();
+ for (int i = 0; i < N; i++) {
+ subscriptions.push_back(subject.subscribe(std::move(observers[i])));
+ }
+ if (countUnsubscribe) {
+ subscriptions.clear();
+ }
+ bs.rehire();
+ }
+}
+
+void subscribeAndUnsubscribe(uint iters, int N) {
+ subscribeImpl(iters, N, true);
+}
+
+void subscribe(uint iters, int N) {
+ subscribeImpl(iters, N, false);
+}
+
+void observe(uint iters, int N) {
+ for (uint iter = 0; iter < iters; iter++) {
+ BenchmarkSuspender bs;
+ Subject<int> subject;
+ std::vector<std::unique_ptr<Observer<int>>> observers;
+ for (int i = 0; i < N; i++) {
+ observers.push_back(makeObserver());
+ }
+ bs.dismiss();
+ for (int i = 0; i < N; i++) {
+ subject.observe(std::move(observers[i]));
+ }
+ bs.rehire();
+ }
+}
+
+void inlineObserve(uint iters, int N) {
+ for (uint iter = 0; iter < iters; iter++) {
+ BenchmarkSuspender bs;
+ Subject<int> subject;
+ std::vector<Observer<int>*> observers;
+ for (int i = 0; i < N; i++) {
+ observers.push_back(makeObserver().release());
+ }
+ bs.dismiss();
+ for (int i = 0; i < N; i++) {
+ subject.observe(observers[i]);
+ }
+ bs.rehire();
+ for (int i = 0; i < N; i++) {
+ delete observers[i];
+ }
+ }
+}
+
+void notifySubscribers(uint iters, int N) {
+ for (uint iter = 0; iter < iters; iter++) {
+ BenchmarkSuspender bs;
+ Subject<int> subject;
+ std::vector<std::unique_ptr<Observer<int>>> observers;
+ std::vector<Subscription<int>> subscriptions;
+ subscriptions.reserve(N);
+ for (int i = 0; i < N; i++) {
+ observers.push_back(makeObserver());
+ }
+ for (int i = 0; i < N; i++) {
+ subscriptions.push_back(subject.subscribe(std::move(observers[i])));
+ }
+ bs.dismiss();
+ subject.onNext(42);
+ bs.rehire();
+ }
+}
+
+void notifyInlineObservers(uint iters, int N) {
+ for (uint iter = 0; iter < iters; iter++) {
+ BenchmarkSuspender bs;
+ Subject<int> subject;
+ std::vector<Observer<int>*> observers;
+ for (int i = 0; i < N; i++) {
+ observers.push_back(makeObserver().release());
+ }
+ for (int i = 0; i < N; i++) {
+ subject.observe(observers[i]);
+ }
+ bs.dismiss();
+ subject.onNext(42);
+ bs.rehire();
+ }
+}
+
+BENCHMARK_PARAM(subscribeAndUnsubscribe, 1);
+BENCHMARK_RELATIVE_PARAM(subscribe, 1);
+BENCHMARK_RELATIVE_PARAM(observe, 1);
+BENCHMARK_RELATIVE_PARAM(inlineObserve, 1);
+
+BENCHMARK_DRAW_LINE();
+
+BENCHMARK_PARAM(subscribeAndUnsubscribe, 1000);
+BENCHMARK_RELATIVE_PARAM(subscribe, 1000);
+BENCHMARK_RELATIVE_PARAM(observe, 1000);
+BENCHMARK_RELATIVE_PARAM(inlineObserve, 1000);
+
+BENCHMARK_DRAW_LINE();
+
+BENCHMARK_PARAM(notifySubscribers, 1);
+BENCHMARK_RELATIVE_PARAM(notifyInlineObservers, 1);
+
+BENCHMARK_DRAW_LINE();
+
+BENCHMARK_PARAM(notifySubscribers, 1000);
+BENCHMARK_RELATIVE_PARAM(notifyInlineObservers, 1000);
+
+int main(int argc, char** argv) {
+ gflags::ParseCommandLineFlags(&argc, &argv, true);
+ folly::runBenchmarks();
+ return 0;
+}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/wangle/deprecated/rx/Observer.h>
+#include <folly/wangle/deprecated/rx/Subject.h>
+#include <gtest/gtest.h>
+
+using namespace folly::wangle;
+
+static std::unique_ptr<Observer<int>> incrementer(int& counter) {
+ return Observer<int>::create([&] (int x) {
+ counter++;
+ });
+}
+
+TEST(RxTest, Observe) {
+ Subject<int> subject;
+ auto count = 0;
+ subject.observe(incrementer(count));
+ subject.onNext(1);
+ EXPECT_EQ(1, count);
+}
+
+TEST(RxTest, ObserveInline) {
+ Subject<int> subject;
+ auto count = 0;
+ auto o = incrementer(count).release();
+ subject.observe(o);
+ subject.onNext(1);
+ EXPECT_EQ(1, count);
+ delete o;
+}
+
+TEST(RxTest, Subscription) {
+ Subject<int> subject;
+ auto count = 0;
+ {
+ auto s = subject.subscribe(incrementer(count));
+ subject.onNext(1);
+ }
+ // The subscription has gone out of scope so no one should get this.
+ subject.onNext(2);
+ EXPECT_EQ(1, count);
+}
+
+TEST(RxTest, SubscriptionMove) {
+ Subject<int> subject;
+ auto count = 0;
+ auto s = subject.subscribe(incrementer(count));
+ auto s2 = subject.subscribe(incrementer(count));
+ s2 = std::move(s);
+ subject.onNext(1);
+ Subscription<int> s3(std::move(s2));
+ subject.onNext(2);
+ EXPECT_EQ(2, count);
+}
+
+TEST(RxTest, SubscriptionOutlivesSubject) {
+ Subscription<int> s;
+ {
+ Subject<int> subject;
+ s = subject.subscribe(Observer<int>::create([](int){}));
+ }
+ // Don't explode when s is destroyed
+}
+
+TEST(RxTest, SubscribeDuringCallback) {
+ // A subscriber who was subscribed in the course of a callback should get
+ // subsequent updates but not the current update.
+ Subject<int> subject;
+ int outerCount = 0, innerCount = 0;
+ Subscription<int> s1, s2;
+ s1 = subject.subscribe(Observer<int>::create([&] (int x) {
+ outerCount++;
+ s2 = subject.subscribe(incrementer(innerCount));
+ }));
+ subject.onNext(42);
+ subject.onNext(0xDEADBEEF);
+ EXPECT_EQ(2, outerCount);
+ EXPECT_EQ(1, innerCount);
+}
+
+TEST(RxTest, ObserveDuringCallback) {
+ Subject<int> subject;
+ int outerCount = 0, innerCount = 0;
+ subject.observe(Observer<int>::create([&] (int x) {
+ outerCount++;
+ subject.observe(incrementer(innerCount));
+ }));
+ subject.onNext(42);
+ subject.onNext(0xDEADBEEF);
+ EXPECT_EQ(2, outerCount);
+ EXPECT_EQ(1, innerCount);
+}
+
+TEST(RxTest, ObserveInlineDuringCallback) {
+ Subject<int> subject;
+ int outerCount = 0, innerCount = 0;
+ auto innerO = incrementer(innerCount).release();
+ auto outerO = Observer<int>::create([&] (int x) {
+ outerCount++;
+ subject.observe(innerO);
+ }).release();
+ subject.observe(outerO);
+ subject.onNext(42);
+ subject.onNext(0xDEADBEEF);
+ EXPECT_EQ(2, outerCount);
+ EXPECT_EQ(1, innerCount);
+ delete innerO;
+ delete outerO;
+}
+
+TEST(RxTest, UnsubscribeDuringCallback) {
+ // A subscriber who was unsubscribed in the course of a callback should get
+ // the current update but not subsequent ones
+ Subject<int> subject;
+ int count1 = 0, count2 = 0;
+ auto s1 = subject.subscribe(incrementer(count1));
+ auto s2 = subject.subscribe(Observer<int>::create([&] (int x) {
+ count2++;
+ s1.~Subscription();
+ }));
+ subject.onNext(1);
+ subject.onNext(2);
+ EXPECT_EQ(1, count1);
+ EXPECT_EQ(2, count2);
+}
+
+TEST(RxTest, SubscribeUnsubscribeDuringCallback) {
+ // A subscriber who was subscribed and unsubscribed in the course of a
+ // callback should not get any updates
+ Subject<int> subject;
+ int outerCount = 0, innerCount = 0;
+ auto s2 = subject.subscribe(Observer<int>::create([&] (int x) {
+ outerCount++;
+ auto s2 = subject.subscribe(incrementer(innerCount));
+ }));
+ subject.onNext(1);
+ subject.onNext(2);
+ EXPECT_EQ(2, outerCount);
+ EXPECT_EQ(0, innerCount);
+}
+
+// Move only type
+typedef std::unique_ptr<int> MO;
+static MO makeMO() { return folly::make_unique<int>(1); }
+template <typename T>
+static ObserverPtr<T> makeMOObserver() {
+ return Observer<T>::create([](const T& mo) {
+ EXPECT_EQ(1, *mo);
+ });
+}
+
+TEST(RxTest, MoveOnlyRvalue) {
+ Subject<MO> subject;
+ auto s1 = subject.subscribe(makeMOObserver<MO>());
+ auto s2 = subject.subscribe(makeMOObserver<MO>());
+ auto mo = makeMO();
+ // Can't bind lvalues to rvalue references
+ // subject.onNext(mo);
+ subject.onNext(std::move(mo));
+ subject.onNext(makeMO());
+}
+
+// Copy only type
+struct CO {
+ CO() = default;
+ CO(const CO&) = default;
+ CO(CO&&) = delete;
+};
+
+template <typename T>
+static ObserverPtr<T> makeCOObserver() {
+ return Observer<T>::create([](const T& mo) {});
+}
+
+TEST(RxTest, CopyOnly) {
+ Subject<CO> subject;
+ auto s1 = subject.subscribe(makeCOObserver<CO>());
+ CO co;
+ subject.onNext(co);
+}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/ExceptionWrapper.h>
+#include <folly/Executor.h>
+
+namespace folly { namespace wangle {
+ typedef folly::exception_wrapper Error;
+ // The Executor is basically an rx Scheduler (by design). So just
+ // alias it.
+ typedef std::shared_ptr<folly::Executor> SchedulerPtr;
+
+ template <class T, size_t InlineObservers = 3> class Observable;
+ template <class T> struct Observer;
+ template <class T> struct Subject;
+
+ template <class T> using ObservablePtr = std::shared_ptr<Observable<T>>;
+ template <class T> using ObserverPtr = std::shared_ptr<Observer<T>>;
+ template <class T> using SubjectPtr = std::shared_ptr<Subject<T>>;
+}}
+++ /dev/null
-/*
- * Copyright 2015 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// fbbuild is too dumb to know that .h files in the directory affect
-// our project, unless we have a .cpp file in the target, in the same
-// directory.
+++ /dev/null
-/*
- * Copyright 2015 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/wangle/rx/types.h> // must come first
-#include <folly/wangle/rx/Subject.h>
-#include <folly/wangle/rx/Subscription.h>
-
-#include <folly/RWSpinLock.h>
-#include <folly/SmallLocks.h>
-#include <folly/ThreadLocal.h>
-#include <folly/small_vector.h>
-#include <folly/Executor.h>
-#include <folly/Memory.h>
-#include <map>
-#include <memory>
-
-namespace folly { namespace wangle {
-
-template <class T, size_t InlineObservers>
-class Observable {
- public:
- Observable() : nextSubscriptionId_{1} {}
-
- // TODO perhaps we want to provide this #5283229
- Observable(Observable&& other) = delete;
-
- virtual ~Observable() {
- if (unsubscriber_) {
- unsubscriber_->disable();
- }
- }
-
- // The next three methods subscribe the given Observer to this Observable.
- //
- // If these are called within an Observer callback, the new observer will not
- // get the current update but will get subsequent updates.
- //
- // subscribe() returns a Subscription object. The observer will continue to
- // get updates until the Subscription is destroyed.
- //
- // observe(ObserverPtr<T>) creates an indefinite subscription
- //
- // observe(Observer<T>*) also creates an indefinite subscription, but the
- // caller is responsible for ensuring that the given Observer outlives this
- // Observable. This might be useful in high performance environments where
- // allocations must be kept to a minimum. Template parameter InlineObservers
- // specifies how many observers can been subscribed inline without any
- // allocations (it's just the size of a folly::small_vector).
- virtual Subscription<T> subscribe(ObserverPtr<T> observer) {
- return subscribeImpl(observer, false);
- }
-
- virtual void observe(ObserverPtr<T> observer) {
- subscribeImpl(observer, true);
- }
-
- virtual void observe(Observer<T>* observer) {
- if (inCallback_ && *inCallback_) {
- if (!newObservers_) {
- newObservers_.reset(new ObserverList());
- }
- newObservers_->push_back(observer);
- } else {
- RWSpinLock::WriteHolder{&observersLock_};
- observers_.push_back(observer);
- }
- }
-
- // TODO unobserve(ObserverPtr<T>), unobserve(Observer<T>*)
-
- /// Returns a new Observable that will call back on the given Scheduler.
- /// The returned Observable must outlive the parent Observable.
-
- // This and subscribeOn should maybe just be a first-class feature of an
- // Observable, rather than making new ones whose lifetimes are tied to their
- // parents. In that case it'd return a reference to this object for
- // chaining.
- ObservablePtr<T> observeOn(SchedulerPtr scheduler) {
- // you're right Hannes, if we have Observable::create we don't need this
- // helper class.
- struct ViaSubject : public Observable<T>
- {
- ViaSubject(SchedulerPtr sched,
- Observable* obs)
- : scheduler_(sched), observable_(obs)
- {}
-
- Subscription<T> subscribe(ObserverPtr<T> o) override {
- return observable_->subscribe(
- Observer<T>::create(
- [=](T val) { scheduler_->add([o, val] { o->onNext(val); }); },
- [=](Error e) { scheduler_->add([o, e] { o->onError(e); }); },
- [=]() { scheduler_->add([o] { o->onCompleted(); }); }));
- }
-
- protected:
- SchedulerPtr scheduler_;
- Observable* observable_;
- };
-
- return std::make_shared<ViaSubject>(scheduler, this);
- }
-
- /// Returns a new Observable that will subscribe to this parent Observable
- /// via the given Scheduler. This can be subtle and confusing at first, see
- /// http://www.introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#SubscribeOnObserveOn
- std::unique_ptr<Observable> subscribeOn(SchedulerPtr scheduler) {
- struct Subject_ : public Subject<T> {
- public:
- Subject_(SchedulerPtr s, Observable* o) : scheduler_(s), observable_(o) {
- }
-
- Subscription<T> subscribe(ObserverPtr<T> o) {
- scheduler_->add([=] {
- observable_->subscribe(o);
- });
- return Subscription<T>(nullptr, 0); // TODO
- }
-
- protected:
- SchedulerPtr scheduler_;
- Observable* observable_;
- };
-
- return folly::make_unique<Subject_>(scheduler, this);
- }
-
- protected:
- // Safely execute an operation on each observer. F must take a single
- // Observer<T>* as its argument.
- template <class F>
- void forEachObserver(F f) {
- if (UNLIKELY(!inCallback_)) {
- inCallback_.reset(new bool{false});
- }
- CHECK(!(*inCallback_));
- *inCallback_ = true;
-
- {
- RWSpinLock::ReadHolder rh(observersLock_);
- for (auto o : observers_) {
- f(o);
- }
-
- for (auto& kv : subscribers_) {
- f(kv.second.get());
- }
- }
-
- if (UNLIKELY((newObservers_ && !newObservers_->empty()) ||
- (newSubscribers_ && !newSubscribers_->empty()) ||
- (oldSubscribers_ && !oldSubscribers_->empty()))) {
- {
- RWSpinLock::WriteHolder wh(observersLock_);
- if (newObservers_) {
- for (auto observer : *(newObservers_)) {
- observers_.push_back(observer);
- }
- newObservers_->clear();
- }
- if (newSubscribers_) {
- for (auto& kv : *(newSubscribers_)) {
- subscribers_.insert(std::move(kv));
- }
- newSubscribers_->clear();
- }
- if (oldSubscribers_) {
- for (auto id : *(oldSubscribers_)) {
- subscribers_.erase(id);
- }
- oldSubscribers_->clear();
- }
- }
- }
- *inCallback_ = false;
- }
-
- private:
- Subscription<T> subscribeImpl(ObserverPtr<T> observer, bool indefinite) {
- auto subscription = makeSubscription(indefinite);
- typename SubscriberMap::value_type kv{subscription.id_, std::move(observer)};
- if (inCallback_ && *inCallback_) {
- if (!newSubscribers_) {
- newSubscribers_.reset(new SubscriberMap());
- }
- newSubscribers_->insert(std::move(kv));
- } else {
- RWSpinLock::WriteHolder{&observersLock_};
- subscribers_.insert(std::move(kv));
- }
- return subscription;
- }
-
- class Unsubscriber {
- public:
- explicit Unsubscriber(Observable* observable) : observable_(observable) {
- CHECK(observable_);
- }
-
- void unsubscribe(uint64_t id) {
- CHECK(id > 0);
- RWSpinLock::ReadHolder guard(lock_);
- if (observable_) {
- observable_->unsubscribe(id);
- }
- }
-
- void disable() {
- RWSpinLock::WriteHolder guard(lock_);
- observable_ = nullptr;
- }
-
- private:
- RWSpinLock lock_;
- Observable* observable_;
- };
-
- std::shared_ptr<Unsubscriber> unsubscriber_{nullptr};
- MicroSpinLock unsubscriberLock_{0};
-
- friend class Subscription<T>;
-
- void unsubscribe(uint64_t id) {
- if (inCallback_ && *inCallback_) {
- if (!oldSubscribers_) {
- oldSubscribers_.reset(new std::vector<uint64_t>());
- }
- if (newSubscribers_) {
- auto it = newSubscribers_->find(id);
- if (it != newSubscribers_->end()) {
- newSubscribers_->erase(it);
- return;
- }
- }
- oldSubscribers_->push_back(id);
- } else {
- RWSpinLock::WriteHolder{&observersLock_};
- subscribers_.erase(id);
- }
- }
-
- Subscription<T> makeSubscription(bool indefinite) {
- if (indefinite) {
- return Subscription<T>(nullptr, nextSubscriptionId_++);
- } else {
- if (!unsubscriber_) {
- std::lock_guard<MicroSpinLock> guard(unsubscriberLock_);
- if (!unsubscriber_) {
- unsubscriber_ = std::make_shared<Unsubscriber>(this);
- }
- }
- return Subscription<T>(unsubscriber_, nextSubscriptionId_++);
- }
- }
-
- std::atomic<uint64_t> nextSubscriptionId_;
- RWSpinLock observersLock_;
- folly::ThreadLocalPtr<bool> inCallback_;
-
- typedef folly::small_vector<Observer<T>*, InlineObservers> ObserverList;
- ObserverList observers_;
- folly::ThreadLocalPtr<ObserverList> newObservers_;
-
- typedef std::map<uint64_t, ObserverPtr<T>> SubscriberMap;
- SubscriberMap subscribers_;
- folly::ThreadLocalPtr<SubscriberMap> newSubscribers_;
- folly::ThreadLocalPtr<std::vector<uint64_t>> oldSubscribers_;
-};
-
-}}
+++ /dev/null
-/*
- * Copyright 2015 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/wangle/rx/types.h> // must come first
-#include <functional>
-#include <memory>
-#include <stdexcept>
-#include <folly/Memory.h>
-
-namespace folly { namespace wangle {
-
-template <class T> struct FunctionObserver;
-
-/// Observer interface. You can subclass it, or you can just use create()
-/// to use std::functions.
-template <class T>
-struct Observer {
- // These are what it means to be an Observer.
- virtual void onNext(const T&) = 0;
- virtual void onError(Error) = 0;
- virtual void onCompleted() = 0;
-
- virtual ~Observer() = default;
-
- /// Create an Observer with std::function callbacks. Handy to make ad-hoc
- /// Observers with lambdas.
- ///
- /// Templated for maximum perfect forwarding flexibility, but ultimately
- /// whatever you pass in has to implicitly become a std::function for the
- /// same signature as onNext(), onError(), and onCompleted() respectively.
- /// (see the FunctionObserver typedefs)
- template <class N, class E, class C>
- static std::unique_ptr<Observer> create(
- N&& onNextFn, E&& onErrorFn, C&& onCompletedFn)
- {
- return folly::make_unique<FunctionObserver<T>>(
- std::forward<N>(onNextFn),
- std::forward<E>(onErrorFn),
- std::forward<C>(onCompletedFn));
- }
-
- /// Create an Observer with only onNext and onError callbacks.
- /// onCompleted will just be a no-op.
- template <class N, class E>
- static std::unique_ptr<Observer> create(N&& onNextFn, E&& onErrorFn) {
- return folly::make_unique<FunctionObserver<T>>(
- std::forward<N>(onNextFn),
- std::forward<E>(onErrorFn),
- nullptr);
- }
-
- /// Create an Observer with only an onNext callback.
- /// onError and onCompleted will just be no-ops.
- template <class N>
- static std::unique_ptr<Observer> create(N&& onNextFn) {
- return folly::make_unique<FunctionObserver<T>>(
- std::forward<N>(onNextFn),
- nullptr,
- nullptr);
- }
-};
-
-/// An observer that uses std::function callbacks. You don't really want to
-/// make one of these directly - instead use the Observer::create() methods.
-template <class T>
-struct FunctionObserver : public Observer<T> {
- typedef std::function<void(const T&)> OnNext;
- typedef std::function<void(Error)> OnError;
- typedef std::function<void()> OnCompleted;
-
- /// We don't need any fancy overloads of this constructor because that's
- /// what Observer::create() is for.
- template <class N = OnNext, class E = OnError, class C = OnCompleted>
- FunctionObserver(N&& n, E&& e, C&& c)
- : onNext_(std::forward<N>(n)),
- onError_(std::forward<E>(e)),
- onCompleted_(std::forward<C>(c))
- {}
-
- void onNext(const T& val) override {
- if (onNext_) onNext_(val);
- }
-
- void onError(Error e) override {
- if (onError_) onError_(e);
- }
-
- void onCompleted() override {
- if (onCompleted_) onCompleted_();
- }
-
- protected:
- OnNext onNext_;
- OnError onError_;
- OnCompleted onCompleted_;
-};
-
-}}
+++ /dev/null
-Rx is a pattern for "functional reactive programming" that started at
-Microsoft in C#, and has been reimplemented in various languages, notably
-RxJava for JVM languages.
-
-It is basically the plural of Futures (a la Wangle).
-
-```
- singular | plural
- +---------------------------------+-----------------------------------
- sync | Foo getData() | std::vector<Foo> getData()
- async | wangle::Future<Foo> getData() | wangle::Observable<Foo> getData()
-```
-
-For more on Rx, I recommend these resources:
-
-- Netflix blog post (RxJava): http://techblog.netflix.com/2013/02/rxjava-netflix-api.html
-- Introduction to Rx eBook (C#): http://www.introtorx.com/content/v1.0.10621.0/01_WhyRx.html
-- The RxJava wiki: https://github.com/Netflix/RxJava/wiki
-- Netflix QCon presentation: http://www.infoq.com/presentations/netflix-functional-rx
-- https://rx.codeplex.com/
-
-There are open source C++ implementations, I haven't looked at them. They
-might be the best way to go rather than writing it NIH-style. I mostly did it
-as an exercise, to think through how closely we might want to integrate
-something like this with Wangle, and to get a feel for how it works in C++.
-
-I haven't even tried to support move-only data in this version. I'm on the
-fence about the usage of shared_ptr. Subject is underdeveloped. A whole rich
-set of operations is obviously missing. I haven't decided how to handle
-subscriptions (and therefore cancellation), but I'm pretty sure C#'s
-"Disposable" is thoroughly un-C++ (opposite of RAII). So for now subscribe
-returns nothing at all and you can't cancel anything ever. The whole thing is
-probably riddled with lifetime corner case bugs that will come out like a
-swarm of angry bees as soon as someone tries an infinite sequence, or tries to
-partially observe a long sequence. I'm pretty sure subscribeOn has a bug that
-I haven't tracked down yet.
+++ /dev/null
-/*
- * Copyright 2015 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/wangle/rx/types.h> // must come first
-#include <folly/wangle/rx/Observable.h>
-#include <folly/wangle/rx/Observer.h>
-
-namespace folly { namespace wangle {
-
-/// Subject interface. A Subject is both an Observable and an Observer. There
-/// is a default implementation of the Observer methods that just forwards the
-/// observed events to the Subject's observers.
-template <class T>
-struct Subject : public Observable<T>, public Observer<T> {
- void onNext(const T& val) override {
- this->forEachObserver([&](Observer<T>* o){
- o->onNext(val);
- });
- }
- void onError(Error e) override {
- this->forEachObserver([&](Observer<T>* o){
- o->onError(e);
- });
- }
- void onCompleted() override {
- this->forEachObserver([](Observer<T>* o){
- o->onCompleted();
- });
- }
-};
-
-}}
+++ /dev/null
-/*
- * Copyright 2015 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/wangle/rx/types.h> // must come first
-#include <folly/wangle/rx/Observable.h>
-
-namespace folly { namespace wangle {
-
-template <class T>
-class Subscription {
- public:
- Subscription() = default;
-
- Subscription(const Subscription&) = delete;
-
- Subscription(Subscription&& other) noexcept {
- *this = std::move(other);
- }
-
- Subscription& operator=(Subscription&& other) noexcept {
- unsubscribe();
- unsubscriber_ = std::move(other.unsubscriber_);
- id_ = other.id_;
- other.unsubscriber_ = nullptr;
- other.id_ = 0;
- return *this;
- }
-
- ~Subscription() {
- unsubscribe();
- }
-
- private:
- typedef typename Observable<T>::Unsubscriber Unsubscriber;
-
- Subscription(std::shared_ptr<Unsubscriber> unsubscriber, uint64_t id)
- : unsubscriber_(std::move(unsubscriber)), id_(id) {
- CHECK(id_ > 0);
- }
-
- void unsubscribe() {
- if (unsubscriber_ && id_ > 0) {
- unsubscriber_->unsubscribe(id_);
- id_ = 0;
- unsubscriber_ = nullptr;
- }
- }
-
- std::shared_ptr<Unsubscriber> unsubscriber_;
- uint64_t id_{0};
-
- friend class Observable<T>;
-};
-
-}}
+++ /dev/null
-/*
- * Copyright 2015 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <folly/Benchmark.h>
-#include <folly/wangle/rx/Observer.h>
-#include <folly/wangle/rx/Subject.h>
-#include <gflags/gflags.h>
-
-using namespace folly::wangle;
-using folly::BenchmarkSuspender;
-
-static std::unique_ptr<Observer<int>> makeObserver() {
- return Observer<int>::create([&] (int x) {});
-}
-
-void subscribeImpl(uint iters, int N, bool countUnsubscribe) {
- for (uint iter = 0; iter < iters; iter++) {
- BenchmarkSuspender bs;
- Subject<int> subject;
- std::vector<std::unique_ptr<Observer<int>>> observers;
- std::vector<Subscription<int>> subscriptions;
- subscriptions.reserve(N);
- for (int i = 0; i < N; i++) {
- observers.push_back(makeObserver());
- }
- bs.dismiss();
- for (int i = 0; i < N; i++) {
- subscriptions.push_back(subject.subscribe(std::move(observers[i])));
- }
- if (countUnsubscribe) {
- subscriptions.clear();
- }
- bs.rehire();
- }
-}
-
-void subscribeAndUnsubscribe(uint iters, int N) {
- subscribeImpl(iters, N, true);
-}
-
-void subscribe(uint iters, int N) {
- subscribeImpl(iters, N, false);
-}
-
-void observe(uint iters, int N) {
- for (uint iter = 0; iter < iters; iter++) {
- BenchmarkSuspender bs;
- Subject<int> subject;
- std::vector<std::unique_ptr<Observer<int>>> observers;
- for (int i = 0; i < N; i++) {
- observers.push_back(makeObserver());
- }
- bs.dismiss();
- for (int i = 0; i < N; i++) {
- subject.observe(std::move(observers[i]));
- }
- bs.rehire();
- }
-}
-
-void inlineObserve(uint iters, int N) {
- for (uint iter = 0; iter < iters; iter++) {
- BenchmarkSuspender bs;
- Subject<int> subject;
- std::vector<Observer<int>*> observers;
- for (int i = 0; i < N; i++) {
- observers.push_back(makeObserver().release());
- }
- bs.dismiss();
- for (int i = 0; i < N; i++) {
- subject.observe(observers[i]);
- }
- bs.rehire();
- for (int i = 0; i < N; i++) {
- delete observers[i];
- }
- }
-}
-
-void notifySubscribers(uint iters, int N) {
- for (uint iter = 0; iter < iters; iter++) {
- BenchmarkSuspender bs;
- Subject<int> subject;
- std::vector<std::unique_ptr<Observer<int>>> observers;
- std::vector<Subscription<int>> subscriptions;
- subscriptions.reserve(N);
- for (int i = 0; i < N; i++) {
- observers.push_back(makeObserver());
- }
- for (int i = 0; i < N; i++) {
- subscriptions.push_back(subject.subscribe(std::move(observers[i])));
- }
- bs.dismiss();
- subject.onNext(42);
- bs.rehire();
- }
-}
-
-void notifyInlineObservers(uint iters, int N) {
- for (uint iter = 0; iter < iters; iter++) {
- BenchmarkSuspender bs;
- Subject<int> subject;
- std::vector<Observer<int>*> observers;
- for (int i = 0; i < N; i++) {
- observers.push_back(makeObserver().release());
- }
- for (int i = 0; i < N; i++) {
- subject.observe(observers[i]);
- }
- bs.dismiss();
- subject.onNext(42);
- bs.rehire();
- }
-}
-
-BENCHMARK_PARAM(subscribeAndUnsubscribe, 1);
-BENCHMARK_RELATIVE_PARAM(subscribe, 1);
-BENCHMARK_RELATIVE_PARAM(observe, 1);
-BENCHMARK_RELATIVE_PARAM(inlineObserve, 1);
-
-BENCHMARK_DRAW_LINE();
-
-BENCHMARK_PARAM(subscribeAndUnsubscribe, 1000);
-BENCHMARK_RELATIVE_PARAM(subscribe, 1000);
-BENCHMARK_RELATIVE_PARAM(observe, 1000);
-BENCHMARK_RELATIVE_PARAM(inlineObserve, 1000);
-
-BENCHMARK_DRAW_LINE();
-
-BENCHMARK_PARAM(notifySubscribers, 1);
-BENCHMARK_RELATIVE_PARAM(notifyInlineObservers, 1);
-
-BENCHMARK_DRAW_LINE();
-
-BENCHMARK_PARAM(notifySubscribers, 1000);
-BENCHMARK_RELATIVE_PARAM(notifyInlineObservers, 1000);
-
-int main(int argc, char** argv) {
- gflags::ParseCommandLineFlags(&argc, &argv, true);
- folly::runBenchmarks();
- return 0;
-}
+++ /dev/null
-/*
- * Copyright 2015 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <folly/wangle/rx/Observer.h>
-#include <folly/wangle/rx/Subject.h>
-#include <gtest/gtest.h>
-
-using namespace folly::wangle;
-
-static std::unique_ptr<Observer<int>> incrementer(int& counter) {
- return Observer<int>::create([&] (int x) {
- counter++;
- });
-}
-
-TEST(RxTest, Observe) {
- Subject<int> subject;
- auto count = 0;
- subject.observe(incrementer(count));
- subject.onNext(1);
- EXPECT_EQ(1, count);
-}
-
-TEST(RxTest, ObserveInline) {
- Subject<int> subject;
- auto count = 0;
- auto o = incrementer(count).release();
- subject.observe(o);
- subject.onNext(1);
- EXPECT_EQ(1, count);
- delete o;
-}
-
-TEST(RxTest, Subscription) {
- Subject<int> subject;
- auto count = 0;
- {
- auto s = subject.subscribe(incrementer(count));
- subject.onNext(1);
- }
- // The subscription has gone out of scope so no one should get this.
- subject.onNext(2);
- EXPECT_EQ(1, count);
-}
-
-TEST(RxTest, SubscriptionMove) {
- Subject<int> subject;
- auto count = 0;
- auto s = subject.subscribe(incrementer(count));
- auto s2 = subject.subscribe(incrementer(count));
- s2 = std::move(s);
- subject.onNext(1);
- Subscription<int> s3(std::move(s2));
- subject.onNext(2);
- EXPECT_EQ(2, count);
-}
-
-TEST(RxTest, SubscriptionOutlivesSubject) {
- Subscription<int> s;
- {
- Subject<int> subject;
- s = subject.subscribe(Observer<int>::create([](int){}));
- }
- // Don't explode when s is destroyed
-}
-
-TEST(RxTest, SubscribeDuringCallback) {
- // A subscriber who was subscribed in the course of a callback should get
- // subsequent updates but not the current update.
- Subject<int> subject;
- int outerCount = 0, innerCount = 0;
- Subscription<int> s1, s2;
- s1 = subject.subscribe(Observer<int>::create([&] (int x) {
- outerCount++;
- s2 = subject.subscribe(incrementer(innerCount));
- }));
- subject.onNext(42);
- subject.onNext(0xDEADBEEF);
- EXPECT_EQ(2, outerCount);
- EXPECT_EQ(1, innerCount);
-}
-
-TEST(RxTest, ObserveDuringCallback) {
- Subject<int> subject;
- int outerCount = 0, innerCount = 0;
- subject.observe(Observer<int>::create([&] (int x) {
- outerCount++;
- subject.observe(incrementer(innerCount));
- }));
- subject.onNext(42);
- subject.onNext(0xDEADBEEF);
- EXPECT_EQ(2, outerCount);
- EXPECT_EQ(1, innerCount);
-}
-
-TEST(RxTest, ObserveInlineDuringCallback) {
- Subject<int> subject;
- int outerCount = 0, innerCount = 0;
- auto innerO = incrementer(innerCount).release();
- auto outerO = Observer<int>::create([&] (int x) {
- outerCount++;
- subject.observe(innerO);
- }).release();
- subject.observe(outerO);
- subject.onNext(42);
- subject.onNext(0xDEADBEEF);
- EXPECT_EQ(2, outerCount);
- EXPECT_EQ(1, innerCount);
- delete innerO;
- delete outerO;
-}
-
-TEST(RxTest, UnsubscribeDuringCallback) {
- // A subscriber who was unsubscribed in the course of a callback should get
- // the current update but not subsequent ones
- Subject<int> subject;
- int count1 = 0, count2 = 0;
- auto s1 = subject.subscribe(incrementer(count1));
- auto s2 = subject.subscribe(Observer<int>::create([&] (int x) {
- count2++;
- s1.~Subscription();
- }));
- subject.onNext(1);
- subject.onNext(2);
- EXPECT_EQ(1, count1);
- EXPECT_EQ(2, count2);
-}
-
-TEST(RxTest, SubscribeUnsubscribeDuringCallback) {
- // A subscriber who was subscribed and unsubscribed in the course of a
- // callback should not get any updates
- Subject<int> subject;
- int outerCount = 0, innerCount = 0;
- auto s2 = subject.subscribe(Observer<int>::create([&] (int x) {
- outerCount++;
- auto s2 = subject.subscribe(incrementer(innerCount));
- }));
- subject.onNext(1);
- subject.onNext(2);
- EXPECT_EQ(2, outerCount);
- EXPECT_EQ(0, innerCount);
-}
-
-// Move only type
-typedef std::unique_ptr<int> MO;
-static MO makeMO() { return folly::make_unique<int>(1); }
-template <typename T>
-static ObserverPtr<T> makeMOObserver() {
- return Observer<T>::create([](const T& mo) {
- EXPECT_EQ(1, *mo);
- });
-}
-
-TEST(RxTest, MoveOnlyRvalue) {
- Subject<MO> subject;
- auto s1 = subject.subscribe(makeMOObserver<MO>());
- auto s2 = subject.subscribe(makeMOObserver<MO>());
- auto mo = makeMO();
- // Can't bind lvalues to rvalue references
- // subject.onNext(mo);
- subject.onNext(std::move(mo));
- subject.onNext(makeMO());
-}
-
-// Copy only type
-struct CO {
- CO() = default;
- CO(const CO&) = default;
- CO(CO&&) = delete;
-};
-
-template <typename T>
-static ObserverPtr<T> makeCOObserver() {
- return Observer<T>::create([](const T& mo) {});
-}
-
-TEST(RxTest, CopyOnly) {
- Subject<CO> subject;
- auto s1 = subject.subscribe(makeCOObserver<CO>());
- CO co;
- subject.onNext(co);
-}
+++ /dev/null
-/*
- * Copyright 2015 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/ExceptionWrapper.h>
-#include <folly/Executor.h>
-
-namespace folly { namespace wangle {
- typedef folly::exception_wrapper Error;
- // The Executor is basically an rx Scheduler (by design). So just
- // alias it.
- typedef std::shared_ptr<folly::Executor> SchedulerPtr;
-
- template <class T, size_t InlineObservers = 3> class Observable;
- template <class T> struct Observer;
- template <class T> struct Subject;
-
- template <class T> using ObservablePtr = std::shared_ptr<Observable<T>>;
- template <class T> using ObserverPtr = std::shared_ptr<Observer<T>>;
- template <class T> using SubjectPtr = std::shared_ptr<Subject<T>>;
-}}