--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "types.h"
+#include "Subject.h"
+#include "Subscription.h"
+
+#include <folly/wangle/Executor.h>
+#include <list>
+#include <memory>
+
+namespace folly { namespace wangle {
+
+template <class T>
+struct Observable {
+ virtual ~Observable() = default;
+
+ /// Subscribe the given Observer to this Observable.
+ // Eventually this will return a Subscription object of some kind, in order
+ // to support cancellation. This is kinda really important. Maybe I should
+ // just do it now, using an dummy Subscription object.
+ virtual Subscription subscribe(ObserverPtr<T> o) {
+ observers_.push_back(o);
+ return Subscription();
+ }
+
+ /// Returns a new Observable that will call back on the given Scheduler.
+ /// The returned Observable must outlive the parent Observable.
+
+ // This and subscribeOn should maybe just be a first-class feature of an
+ // Observable, rather than making new ones whose lifetimes are tied to their
+ // parents. In that case it'd return a reference to this object for
+ // chaining.
+ ObservablePtr<T> observeOn(SchedulerPtr scheduler) {
+ // you're right Hannes, if we have Observable::create we don't need this
+ // helper class.
+ struct ViaSubject : public Observable<T>
+ {
+ ViaSubject(SchedulerPtr scheduler,
+ Observable* obs)
+ : scheduler_(scheduler), observable_(obs)
+ {}
+
+ Subscription subscribe(ObserverPtr<T> o) override {
+ return observable_->subscribe(
+ Observer<T>::create(
+ [=](T val) { scheduler_->add([o, val] { o->onNext(val); }); },
+ [=](Error e) { scheduler_->add([o, e] { o->onError(e); }); },
+ [=]() { scheduler_->add([o] { o->onCompleted(); }); }));
+ }
+
+ protected:
+ SchedulerPtr scheduler_;
+ Observable* observable_;
+ };
+
+ return std::make_shared<ViaSubject>(scheduler, this);
+ }
+
+ /// Returns a new Observable that will subscribe to this parent Observable
+ /// via the given Scheduler. This can be subtle and confusing at first, see
+ /// http://www.introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#SubscribeOnObserveOn
+ std::unique_ptr<Observable> subscribeOn(SchedulerPtr scheduler) {
+ struct Subject_ : public Subject<T> {
+ public:
+ Subject_(SchedulerPtr s, Observable* o) : scheduler_(s), observable_(o) {
+ }
+
+ Subscription subscribe(ObserverPtr<T> o) {
+ scheduler_->add([=] {
+ observable_->subscribe(o);
+ });
+ return Subscription();
+ }
+
+ protected:
+ SchedulerPtr scheduler_;
+ Observable* observable_;
+ };
+
+ return folly::make_unique<Subject_>(scheduler, this);
+ }
+
+ protected:
+ std::list<ObserverPtr<T>> observers_;
+};
+
+}}
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "types.h"
+#include <functional>
+#include <memory>
+#include <stdexcept>
+#include <folly/Memory.h>
+
+namespace folly { namespace wangle {
+
+template <class T> class FunctionObserver;
+
+/// Observer interface. You can subclass it, or you can just use create()
+/// to use std::functions.
+template <class T>
+struct Observer {
+ // These are what it means to be an Observer.
+ virtual void onNext(T) = 0;
+ virtual void onError(Error) = 0;
+ virtual void onCompleted() = 0;
+
+ virtual ~Observer() = default;
+
+ /// Create an Observer with std::function callbacks. Handy to make ad-hoc
+ /// Observers with lambdas.
+ ///
+ /// Templated for maximum perfect forwarding flexibility, but ultimately
+ /// whatever you pass in has to implicitly become a std::function for the
+ /// same signature as onNext(), onError(), and onCompleted() respectively.
+ /// (see the FunctionObserver typedefs)
+ template <class N, class E, class C>
+ static std::unique_ptr<Observer> create(
+ N&& onNextFn, E&& onErrorFn, C&& onCompletedFn)
+ {
+ return folly::make_unique<FunctionObserver<T>>(
+ std::forward<N>(onNextFn),
+ std::forward<E>(onErrorFn),
+ std::forward<C>(onCompletedFn));
+ }
+
+ /// Create an Observer with only onNext and onError callbacks.
+ /// onCompleted will just be a no-op.
+ template <class N, class E>
+ static std::unique_ptr<Observer> create(N&& onNextFn, E&& onErrorFn) {
+ return folly::make_unique<FunctionObserver<T>>(
+ std::forward<N>(onNextFn),
+ std::forward<E>(onErrorFn),
+ nullptr);
+ }
+
+ /// Create an Observer with only an onNext callback.
+ /// onError and onCompleted will just be no-ops.
+ template <class N>
+ static std::unique_ptr<Observer> create(N&& onNextFn) {
+ return folly::make_unique<FunctionObserver<T>>(
+ std::forward<N>(onNextFn),
+ nullptr,
+ nullptr);
+ }
+};
+
+/// An observer that uses std::function callbacks. You don't really want to
+/// make one of these directly - instead use the Observer::create() methods.
+template <class T>
+struct FunctionObserver : public Observer<T> {
+ typedef std::function<void(T)> OnNext;
+ typedef std::function<void(Error)> OnError;
+ typedef std::function<void()> OnCompleted;
+
+ /// We don't need any fancy overloads of this constructor because that's
+ /// what Observer::create() is for.
+ template <class N = OnNext, class E = OnError, class C = OnCompleted>
+ FunctionObserver(N&& n, E&& e, C&& c)
+ : onNext_(std::forward<N>(n)),
+ onError_(std::forward<E>(e)),
+ onCompleted_(std::forward<C>(c))
+ {}
+
+ void onNext(T val) override {
+ if (onNext_) onNext_(val);
+ }
+
+ void onError(Error e) override {
+ if (onError_) onError_(e);
+ }
+
+ void onCompleted() override {
+ if (onCompleted_) onCompleted_();
+ }
+
+ protected:
+ OnNext onNext_;
+ OnError onError_;
+ OnCompleted onCompleted_;
+};
+
+}}
--- /dev/null
+Rx is a pattern for "functional reactive programming" that started at
+Microsoft in C#, and has been reimplemented in various languages, notably
+RxJava for JVM languages.
+
+It is basically the plural of Futures (a la Wangle).
+
+
+ singular | plural
+ +---------------------------------+-----------------------------------
+ sync | Foo getData() | std::vector<Foo> getData()
+ async | wangle::Future<Foo> getData() | wangle::Observable<Foo> getData()
+
+
+For more on Rx, I recommend these resources:
+
+Netflix blog post (RxJava): http://techblog.netflix.com/2013/02/rxjava-netflix-api.html
+Introduction to Rx eBook (C#): http://www.introtorx.com/content/v1.0.10621.0/01_WhyRx.html
+The RxJava wiki: https://github.com/Netflix/RxJava/wiki
+Netflix QCon presentation: http://www.infoq.com/presentations/netflix-functional-rx
+https://rx.codeplex.com/
+
+There are open source C++ implementations, I haven't looked at them. They
+might be the best way to go rather than writing it NIH-style. I mostly did it
+as an exercise, to think through how closely we might want to integrate
+something like this with Wangle, and to get a feel for how it works in C++.
+
+I haven't even tried to support move-only data in this version. I'm on the
+fence about the usage of shared_ptr. Subject is underdeveloped. A whole rich
+set of operations is obviously missing. I haven't decided how to handle
+subscriptions (and therefore cancellation), but I'm pretty sure C#'s
+"Disposable" is thoroughly un-C++ (opposite of RAII). So for now subscribe
+returns nothing at all and you can't cancel anything ever. The whole thing is
+probably riddled with lifetime corner case bugs that will come out like a
+swarm of angry bees as soon as someone tries an infinite sequence, or tries to
+partially observe a long sequence. I'm pretty sure subscribeOn has a bug that
+I haven't tracked down yet.
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include "Observable.h"
+#include "Observer.h"
+
+namespace folly { namespace wangle {
+
+/// Subject interface. A Subject is both an Observable and an Observer. There
+/// is a default implementation of the Observer methods that just forwards the
+/// observed events to the Subject's observers.
+template <class T>
+struct Subject : public Observable<T>, public Observer<T> {
+ void onNext(T val) override {
+ for (auto& o : this->observers_)
+ o->onNext(val);
+ }
+ void onError(Error e) override {
+ for (auto& o : this->observers_)
+ o->onError(e);
+ }
+ void onCompleted() override {
+ for (auto& o : this->observers_)
+ o->onCompleted();
+ }
+};
+
+}}
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+namespace folly { namespace wangle {
+
+// TODO
+struct Subscription {
+};
+
+}}
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/ExceptionWrapper.h>
+
+namespace folly { namespace wangle {
+ typedef folly::exception_wrapper Error;
+ // The wangle::Executor is basically an rx Scheduler (by design). So just
+ // alias it.
+ typedef std::shared_ptr<folly::wangle::Executor> SchedulerPtr;
+
+ template <class T> struct Observable;
+ template <class T> struct Observer;
+ template <class T> struct Subject;
+
+ template <class T> using ObservablePtr = std::shared_ptr<Observable<T>>;
+ template <class T> using ObserverPtr = std::shared_ptr<Observer<T>>;
+ template <class T> using SubjectPtr = std::shared_ptr<Subject<T>>;
+}}