From: James Sedgwick Date: Tue, 23 Sep 2014 12:48:08 +0000 (-0700) Subject: move rx to folly/experimental X-Git-Tag: v0.22.0~337 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=c049b56426c5b3706715163e340bcd021fb18486;p=folly.git move rx to folly/experimental Summary: As above. I want to use this for the thread pools and it probably belongs in folly long-term anywya (if we stick with it) Test Plan: compiled the one user Reviewed By: hans@fb.com Subscribers: fugalh, mwa, jgehring, fuegen, njormrod FB internal diff: D1560578 --- diff --git a/folly/experimental/wangle/rx/Observable.h b/folly/experimental/wangle/rx/Observable.h new file mode 100644 index 00000000..4769007f --- /dev/null +++ b/folly/experimental/wangle/rx/Observable.h @@ -0,0 +1,103 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "types.h" +#include "Subject.h" +#include "Subscription.h" + +#include +#include +#include + +namespace folly { namespace wangle { + +template +struct Observable { + virtual ~Observable() = default; + + /// Subscribe the given Observer to this Observable. + // Eventually this will return a Subscription object of some kind, in order + // to support cancellation. This is kinda really important. Maybe I should + // just do it now, using an dummy Subscription object. + virtual Subscription subscribe(ObserverPtr o) { + observers_.push_back(o); + return Subscription(); + } + + /// Returns a new Observable that will call back on the given Scheduler. + /// The returned Observable must outlive the parent Observable. + + // This and subscribeOn should maybe just be a first-class feature of an + // Observable, rather than making new ones whose lifetimes are tied to their + // parents. In that case it'd return a reference to this object for + // chaining. + ObservablePtr observeOn(SchedulerPtr scheduler) { + // you're right Hannes, if we have Observable::create we don't need this + // helper class. + struct ViaSubject : public Observable + { + ViaSubject(SchedulerPtr scheduler, + Observable* obs) + : scheduler_(scheduler), observable_(obs) + {} + + Subscription subscribe(ObserverPtr o) override { + return observable_->subscribe( + Observer::create( + [=](T val) { scheduler_->add([o, val] { o->onNext(val); }); }, + [=](Error e) { scheduler_->add([o, e] { o->onError(e); }); }, + [=]() { scheduler_->add([o] { o->onCompleted(); }); })); + } + + protected: + SchedulerPtr scheduler_; + Observable* observable_; + }; + + return std::make_shared(scheduler, this); + } + + /// Returns a new Observable that will subscribe to this parent Observable + /// via the given Scheduler. This can be subtle and confusing at first, see + /// http://www.introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#SubscribeOnObserveOn + std::unique_ptr subscribeOn(SchedulerPtr scheduler) { + struct Subject_ : public Subject { + public: + Subject_(SchedulerPtr s, Observable* o) : scheduler_(s), observable_(o) { + } + + Subscription subscribe(ObserverPtr o) { + scheduler_->add([=] { + observable_->subscribe(o); + }); + return Subscription(); + } + + protected: + SchedulerPtr scheduler_; + Observable* observable_; + }; + + return folly::make_unique(scheduler, this); + } + + protected: + std::list> observers_; +}; + +}} diff --git a/folly/experimental/wangle/rx/Observer.h b/folly/experimental/wangle/rx/Observer.h new file mode 100644 index 00000000..8d4bbb42 --- /dev/null +++ b/folly/experimental/wangle/rx/Observer.h @@ -0,0 +1,113 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "types.h" +#include +#include +#include +#include + +namespace folly { namespace wangle { + +template class FunctionObserver; + +/// Observer interface. You can subclass it, or you can just use create() +/// to use std::functions. +template +struct Observer { + // These are what it means to be an Observer. + virtual void onNext(T) = 0; + virtual void onError(Error) = 0; + virtual void onCompleted() = 0; + + virtual ~Observer() = default; + + /// Create an Observer with std::function callbacks. Handy to make ad-hoc + /// Observers with lambdas. + /// + /// Templated for maximum perfect forwarding flexibility, but ultimately + /// whatever you pass in has to implicitly become a std::function for the + /// same signature as onNext(), onError(), and onCompleted() respectively. + /// (see the FunctionObserver typedefs) + template + static std::unique_ptr create( + N&& onNextFn, E&& onErrorFn, C&& onCompletedFn) + { + return folly::make_unique>( + std::forward(onNextFn), + std::forward(onErrorFn), + std::forward(onCompletedFn)); + } + + /// Create an Observer with only onNext and onError callbacks. + /// onCompleted will just be a no-op. + template + static std::unique_ptr create(N&& onNextFn, E&& onErrorFn) { + return folly::make_unique>( + std::forward(onNextFn), + std::forward(onErrorFn), + nullptr); + } + + /// Create an Observer with only an onNext callback. + /// onError and onCompleted will just be no-ops. + template + static std::unique_ptr create(N&& onNextFn) { + return folly::make_unique>( + std::forward(onNextFn), + nullptr, + nullptr); + } +}; + +/// An observer that uses std::function callbacks. You don't really want to +/// make one of these directly - instead use the Observer::create() methods. +template +struct FunctionObserver : public Observer { + typedef std::function OnNext; + typedef std::function OnError; + typedef std::function OnCompleted; + + /// We don't need any fancy overloads of this constructor because that's + /// what Observer::create() is for. + template + FunctionObserver(N&& n, E&& e, C&& c) + : onNext_(std::forward(n)), + onError_(std::forward(e)), + onCompleted_(std::forward(c)) + {} + + void onNext(T val) override { + if (onNext_) onNext_(val); + } + + void onError(Error e) override { + if (onError_) onError_(e); + } + + void onCompleted() override { + if (onCompleted_) onCompleted_(); + } + + protected: + OnNext onNext_; + OnError onError_; + OnCompleted onCompleted_; +}; + +}} diff --git a/folly/experimental/wangle/rx/README b/folly/experimental/wangle/rx/README new file mode 100644 index 00000000..ee170f35 --- /dev/null +++ b/folly/experimental/wangle/rx/README @@ -0,0 +1,36 @@ +Rx is a pattern for "functional reactive programming" that started at +Microsoft in C#, and has been reimplemented in various languages, notably +RxJava for JVM languages. + +It is basically the plural of Futures (a la Wangle). + + + singular | plural + +---------------------------------+----------------------------------- + sync | Foo getData() | std::vector getData() + async | wangle::Future getData() | wangle::Observable getData() + + +For more on Rx, I recommend these resources: + +Netflix blog post (RxJava): http://techblog.netflix.com/2013/02/rxjava-netflix-api.html +Introduction to Rx eBook (C#): http://www.introtorx.com/content/v1.0.10621.0/01_WhyRx.html +The RxJava wiki: https://github.com/Netflix/RxJava/wiki +Netflix QCon presentation: http://www.infoq.com/presentations/netflix-functional-rx +https://rx.codeplex.com/ + +There are open source C++ implementations, I haven't looked at them. They +might be the best way to go rather than writing it NIH-style. I mostly did it +as an exercise, to think through how closely we might want to integrate +something like this with Wangle, and to get a feel for how it works in C++. + +I haven't even tried to support move-only data in this version. I'm on the +fence about the usage of shared_ptr. Subject is underdeveloped. A whole rich +set of operations is obviously missing. I haven't decided how to handle +subscriptions (and therefore cancellation), but I'm pretty sure C#'s +"Disposable" is thoroughly un-C++ (opposite of RAII). So for now subscribe +returns nothing at all and you can't cancel anything ever. The whole thing is +probably riddled with lifetime corner case bugs that will come out like a +swarm of angry bees as soon as someone tries an infinite sequence, or tries to +partially observe a long sequence. I'm pretty sure subscribeOn has a bug that +I haven't tracked down yet. diff --git a/folly/experimental/wangle/rx/Subject.h b/folly/experimental/wangle/rx/Subject.h new file mode 100644 index 00000000..41b59c4e --- /dev/null +++ b/folly/experimental/wangle/rx/Subject.h @@ -0,0 +1,42 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include "Observable.h" +#include "Observer.h" + +namespace folly { namespace wangle { + +/// Subject interface. A Subject is both an Observable and an Observer. There +/// is a default implementation of the Observer methods that just forwards the +/// observed events to the Subject's observers. +template +struct Subject : public Observable, public Observer { + void onNext(T val) override { + for (auto& o : this->observers_) + o->onNext(val); + } + void onError(Error e) override { + for (auto& o : this->observers_) + o->onError(e); + } + void onCompleted() override { + for (auto& o : this->observers_) + o->onCompleted(); + } +}; + +}} diff --git a/folly/experimental/wangle/rx/Subscription.h b/folly/experimental/wangle/rx/Subscription.h new file mode 100644 index 00000000..16406a23 --- /dev/null +++ b/folly/experimental/wangle/rx/Subscription.h @@ -0,0 +1,25 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace folly { namespace wangle { + +// TODO +struct Subscription { +}; + +}} diff --git a/folly/experimental/wangle/rx/types.h b/folly/experimental/wangle/rx/types.h new file mode 100644 index 00000000..54dd0099 --- /dev/null +++ b/folly/experimental/wangle/rx/types.h @@ -0,0 +1,34 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace folly { namespace wangle { + typedef folly::exception_wrapper Error; + // The wangle::Executor is basically an rx Scheduler (by design). So just + // alias it. + typedef std::shared_ptr SchedulerPtr; + + template struct Observable; + template struct Observer; + template struct Subject; + + template using ObservablePtr = std::shared_ptr>; + template using ObserverPtr = std::shared_ptr>; + template using SubjectPtr = std::shared_ptr>; +}}