make it really clear that wangle/rx is deprecated
authorHans Fugal <fugalh@fb.com>
Wed, 1 Jul 2015 17:24:18 +0000 (10:24 -0700)
committerSara Golemon <sgolemon@fb.com>
Wed, 1 Jul 2015 23:24:52 +0000 (16:24 -0700)
Summary: More people are starting to use Rx. Yay! But they're trying to use `wangle/rx`. So make sure there's no doubt about its deprecated status.

Reviewed By: @yfeldblum

Differential Revision: D2196273

20 files changed:
folly/Makefile.am
folly/wangle/concurrent/ThreadPoolExecutor.h
folly/wangle/deprecated/rx/Dummy.cpp [new file with mode: 0644]
folly/wangle/deprecated/rx/Observable.h [new file with mode: 0644]
folly/wangle/deprecated/rx/Observer.h [new file with mode: 0644]
folly/wangle/deprecated/rx/README.md [new file with mode: 0644]
folly/wangle/deprecated/rx/Subject.h [new file with mode: 0644]
folly/wangle/deprecated/rx/Subscription.h [new file with mode: 0644]
folly/wangle/deprecated/rx/test/RxBenchmark.cpp [new file with mode: 0644]
folly/wangle/deprecated/rx/test/RxTest.cpp [new file with mode: 0644]
folly/wangle/deprecated/rx/types.h [new file with mode: 0644]
folly/wangle/rx/Dummy.cpp [deleted file]
folly/wangle/rx/Observable.h [deleted file]
folly/wangle/rx/Observer.h [deleted file]
folly/wangle/rx/README.md [deleted file]
folly/wangle/rx/Subject.h [deleted file]
folly/wangle/rx/Subscription.h [deleted file]
folly/wangle/rx/test/RxBenchmark.cpp [deleted file]
folly/wangle/rx/test/RxTest.cpp [deleted file]
folly/wangle/rx/types.h [deleted file]

index c8e14e614a9d0194ddc767a0d7c433bdb8907a50..d85c052d8ceaea1a322b40c1d3f57758bf841f24 100644 (file)
@@ -308,11 +308,11 @@ nobase_follyinclude_HEADERS = \
        wangle/concurrent/NamedThreadFactory.h \
        wangle/concurrent/ThreadFactory.h \
        wangle/concurrent/ThreadPoolExecutor.h \
-       wangle/rx/Observable.h \
-       wangle/rx/Observer.h \
-       wangle/rx/Subject.h \
-       wangle/rx/Subscription.h \
-       wangle/rx/types.h \
+       wangle/deprecated/rx/Observable.h \
+       wangle/deprecated/rx/Observer.h \
+       wangle/deprecated/rx/Subject.h \
+       wangle/deprecated/rx/Subscription.h \
+       wangle/deprecated/rx/types.h \
        wangle/ssl/ClientHelloExtStats.h \
        wangle/ssl/DHParam.h \
        wangle/ssl/PasswordInFile.h \
index bb77d39c3cddde0174a20f40b392234a5f6ebcac..70a5375f63ede1aa3b2b1d061b8e724bc31f9b41 100644 (file)
@@ -18,7 +18,7 @@
 #include <folly/Executor.h>
 #include <folly/wangle/concurrent/LifoSemMPMCQueue.h>
 #include <folly/wangle/concurrent/NamedThreadFactory.h>
-#include <folly/wangle/rx/Observable.h>
+#include <folly/wangle/deprecated/rx/Observable.h>
 #include <folly/Baton.h>
 #include <folly/Memory.h>
 #include <folly/RWSpinLock.h>
diff --git a/folly/wangle/deprecated/rx/Dummy.cpp b/folly/wangle/deprecated/rx/Dummy.cpp
new file mode 100644 (file)
index 0000000..ec999ca
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// fbbuild is too dumb to know that .h files in the directory affect
+// our project, unless we have a .cpp file in the target, in the same
+// directory.
diff --git a/folly/wangle/deprecated/rx/Observable.h b/folly/wangle/deprecated/rx/Observable.h
new file mode 100644 (file)
index 0000000..bae27a0
--- /dev/null
@@ -0,0 +1,285 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/wangle/deprecated/rx/types.h> // must come first
+#include <folly/wangle/deprecated/rx/Subject.h>
+#include <folly/wangle/deprecated/rx/Subscription.h>
+
+#include <folly/RWSpinLock.h>
+#include <folly/SmallLocks.h>
+#include <folly/ThreadLocal.h>
+#include <folly/small_vector.h>
+#include <folly/Executor.h>
+#include <folly/Memory.h>
+#include <map>
+#include <memory>
+
+namespace folly { namespace wangle {
+
+template <class T, size_t InlineObservers>
+class Observable {
+ public:
+  Observable() : nextSubscriptionId_{1} {}
+
+  // TODO perhaps we want to provide this #5283229
+  Observable(Observable&& other) = delete;
+
+  virtual ~Observable() {
+    if (unsubscriber_) {
+      unsubscriber_->disable();
+    }
+  }
+
+  // The next three methods subscribe the given Observer to this Observable.
+  //
+  // If these are called within an Observer callback, the new observer will not
+  // get the current update but will get subsequent updates.
+  //
+  // subscribe() returns a Subscription object. The observer will continue to
+  // get updates until the Subscription is destroyed.
+  //
+  // observe(ObserverPtr<T>) creates an indefinite subscription
+  //
+  // observe(Observer<T>*) also creates an indefinite subscription, but the
+  // caller is responsible for ensuring that the given Observer outlives this
+  // Observable. This might be useful in high performance environments where
+  // allocations must be kept to a minimum. Template parameter InlineObservers
+  // specifies how many observers can been subscribed inline without any
+  // allocations (it's just the size of a folly::small_vector).
+  virtual Subscription<T> subscribe(ObserverPtr<T> observer) {
+    return subscribeImpl(observer, false);
+  }
+
+  virtual void observe(ObserverPtr<T> observer) {
+    subscribeImpl(observer, true);
+  }
+
+  virtual void observe(Observer<T>* observer) {
+    if (inCallback_ && *inCallback_) {
+      if (!newObservers_) {
+        newObservers_.reset(new ObserverList());
+      }
+      newObservers_->push_back(observer);
+    } else {
+      RWSpinLock::WriteHolder{&observersLock_};
+      observers_.push_back(observer);
+    }
+  }
+
+  // TODO unobserve(ObserverPtr<T>), unobserve(Observer<T>*)
+
+  /// Returns a new Observable that will call back on the given Scheduler.
+  /// The returned Observable must outlive the parent Observable.
+
+  // This and subscribeOn should maybe just be a first-class feature of an
+  // Observable, rather than making new ones whose lifetimes are tied to their
+  // parents. In that case it'd return a reference to this object for
+  // chaining.
+  ObservablePtr<T> observeOn(SchedulerPtr scheduler) {
+    // you're right Hannes, if we have Observable::create we don't need this
+    // helper class.
+    struct ViaSubject : public Observable<T>
+    {
+      ViaSubject(SchedulerPtr sched,
+                 Observable* obs)
+        : scheduler_(sched), observable_(obs)
+      {}
+
+      Subscription<T> subscribe(ObserverPtr<T> o) override {
+        return observable_->subscribe(
+          Observer<T>::create(
+            [=](T val) { scheduler_->add([o, val] { o->onNext(val); }); },
+            [=](Error e) { scheduler_->add([o, e] { o->onError(e); }); },
+            [=]() { scheduler_->add([o] { o->onCompleted(); }); }));
+      }
+
+     protected:
+      SchedulerPtr scheduler_;
+      Observable* observable_;
+    };
+
+    return std::make_shared<ViaSubject>(scheduler, this);
+  }
+
+  /// Returns a new Observable that will subscribe to this parent Observable
+  /// via the given Scheduler. This can be subtle and confusing at first, see
+  /// http://www.introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#SubscribeOnObserveOn
+  std::unique_ptr<Observable> subscribeOn(SchedulerPtr scheduler) {
+    struct Subject_ : public Subject<T> {
+     public:
+      Subject_(SchedulerPtr s, Observable* o) : scheduler_(s), observable_(o) {
+      }
+
+      Subscription<T> subscribe(ObserverPtr<T> o) {
+        scheduler_->add([=] {
+          observable_->subscribe(o);
+        });
+        return Subscription<T>(nullptr, 0); // TODO
+      }
+
+     protected:
+      SchedulerPtr scheduler_;
+      Observable* observable_;
+    };
+
+    return folly::make_unique<Subject_>(scheduler, this);
+  }
+
+ protected:
+  // Safely execute an operation on each observer. F must take a single
+  // Observer<T>* as its argument.
+  template <class F>
+  void forEachObserver(F f) {
+    if (UNLIKELY(!inCallback_)) {
+      inCallback_.reset(new bool{false});
+    }
+    CHECK(!(*inCallback_));
+    *inCallback_ = true;
+
+    {
+      RWSpinLock::ReadHolder rh(observersLock_);
+      for (auto o : observers_) {
+        f(o);
+      }
+
+      for (auto& kv : subscribers_) {
+        f(kv.second.get());
+      }
+    }
+
+    if (UNLIKELY((newObservers_ && !newObservers_->empty()) ||
+                 (newSubscribers_ && !newSubscribers_->empty()) ||
+                 (oldSubscribers_ && !oldSubscribers_->empty()))) {
+      {
+        RWSpinLock::WriteHolder wh(observersLock_);
+        if (newObservers_) {
+          for (auto observer : *(newObservers_)) {
+            observers_.push_back(observer);
+          }
+          newObservers_->clear();
+        }
+        if (newSubscribers_) {
+          for (auto& kv : *(newSubscribers_)) {
+            subscribers_.insert(std::move(kv));
+          }
+          newSubscribers_->clear();
+        }
+        if (oldSubscribers_) {
+          for (auto id : *(oldSubscribers_)) {
+            subscribers_.erase(id);
+          }
+          oldSubscribers_->clear();
+        }
+      }
+    }
+    *inCallback_ = false;
+  }
+
+ private:
+  Subscription<T> subscribeImpl(ObserverPtr<T> observer, bool indefinite) {
+    auto subscription = makeSubscription(indefinite);
+    typename SubscriberMap::value_type kv{subscription.id_, std::move(observer)};
+    if (inCallback_ && *inCallback_) {
+      if (!newSubscribers_) {
+        newSubscribers_.reset(new SubscriberMap());
+      }
+      newSubscribers_->insert(std::move(kv));
+    } else {
+      RWSpinLock::WriteHolder{&observersLock_};
+      subscribers_.insert(std::move(kv));
+    }
+    return subscription;
+  }
+
+  class Unsubscriber {
+   public:
+    explicit Unsubscriber(Observable* observable) : observable_(observable) {
+      CHECK(observable_);
+    }
+
+    void unsubscribe(uint64_t id) {
+      CHECK(id > 0);
+      RWSpinLock::ReadHolder guard(lock_);
+      if (observable_) {
+        observable_->unsubscribe(id);
+      }
+    }
+
+    void disable() {
+      RWSpinLock::WriteHolder guard(lock_);
+      observable_ = nullptr;
+    }
+
+   private:
+    RWSpinLock lock_;
+    Observable* observable_;
+  };
+
+  std::shared_ptr<Unsubscriber> unsubscriber_{nullptr};
+  MicroSpinLock unsubscriberLock_{0};
+
+  friend class Subscription<T>;
+
+  void unsubscribe(uint64_t id) {
+    if (inCallback_ && *inCallback_) {
+      if (!oldSubscribers_) {
+        oldSubscribers_.reset(new std::vector<uint64_t>());
+      }
+      if (newSubscribers_) {
+        auto it = newSubscribers_->find(id);
+        if (it != newSubscribers_->end()) {
+          newSubscribers_->erase(it);
+          return;
+        }
+      }
+      oldSubscribers_->push_back(id);
+    } else {
+      RWSpinLock::WriteHolder{&observersLock_};
+      subscribers_.erase(id);
+    }
+  }
+
+  Subscription<T> makeSubscription(bool indefinite) {
+    if (indefinite) {
+      return Subscription<T>(nullptr, nextSubscriptionId_++);
+    } else {
+      if (!unsubscriber_) {
+        std::lock_guard<MicroSpinLock> guard(unsubscriberLock_);
+        if (!unsubscriber_) {
+          unsubscriber_ = std::make_shared<Unsubscriber>(this);
+        }
+      }
+      return Subscription<T>(unsubscriber_, nextSubscriptionId_++);
+    }
+  }
+
+  std::atomic<uint64_t> nextSubscriptionId_;
+  RWSpinLock observersLock_;
+  folly::ThreadLocalPtr<bool> inCallback_;
+
+  typedef folly::small_vector<Observer<T>*, InlineObservers> ObserverList;
+  ObserverList observers_;
+  folly::ThreadLocalPtr<ObserverList> newObservers_;
+
+  typedef std::map<uint64_t, ObserverPtr<T>> SubscriberMap;
+  SubscriberMap subscribers_;
+  folly::ThreadLocalPtr<SubscriberMap> newSubscribers_;
+  folly::ThreadLocalPtr<std::vector<uint64_t>> oldSubscribers_;
+};
+
+}}
diff --git a/folly/wangle/deprecated/rx/Observer.h b/folly/wangle/deprecated/rx/Observer.h
new file mode 100644 (file)
index 0000000..b671a7e
--- /dev/null
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/wangle/deprecated/rx/types.h> // must come first
+#include <functional>
+#include <memory>
+#include <stdexcept>
+#include <folly/Memory.h>
+
+namespace folly { namespace wangle {
+
+template <class T> struct FunctionObserver;
+
+/// Observer interface. You can subclass it, or you can just use create()
+/// to use std::functions.
+template <class T>
+struct Observer {
+  // These are what it means to be an Observer.
+  virtual void onNext(const T&) = 0;
+  virtual void onError(Error) = 0;
+  virtual void onCompleted() = 0;
+
+  virtual ~Observer() = default;
+
+  /// Create an Observer with std::function callbacks. Handy to make ad-hoc
+  /// Observers with lambdas.
+  ///
+  /// Templated for maximum perfect forwarding flexibility, but ultimately
+  /// whatever you pass in has to implicitly become a std::function for the
+  /// same signature as onNext(), onError(), and onCompleted() respectively.
+  /// (see the FunctionObserver typedefs)
+  template <class N, class E, class C>
+  static std::unique_ptr<Observer> create(
+    N&& onNextFn, E&& onErrorFn, C&& onCompletedFn)
+  {
+    return folly::make_unique<FunctionObserver<T>>(
+      std::forward<N>(onNextFn),
+      std::forward<E>(onErrorFn),
+      std::forward<C>(onCompletedFn));
+  }
+
+  /// Create an Observer with only onNext and onError callbacks.
+  /// onCompleted will just be a no-op.
+  template <class N, class E>
+  static std::unique_ptr<Observer> create(N&& onNextFn, E&& onErrorFn) {
+    return folly::make_unique<FunctionObserver<T>>(
+      std::forward<N>(onNextFn),
+      std::forward<E>(onErrorFn),
+      nullptr);
+  }
+
+  /// Create an Observer with only an onNext callback.
+  /// onError and onCompleted will just be no-ops.
+  template <class N>
+  static std::unique_ptr<Observer> create(N&& onNextFn) {
+    return folly::make_unique<FunctionObserver<T>>(
+      std::forward<N>(onNextFn),
+      nullptr,
+      nullptr);
+  }
+};
+
+/// An observer that uses std::function callbacks. You don't really want to
+/// make one of these directly - instead use the Observer::create() methods.
+template <class T>
+struct FunctionObserver : public Observer<T> {
+  typedef std::function<void(const T&)> OnNext;
+  typedef std::function<void(Error)> OnError;
+  typedef std::function<void()> OnCompleted;
+
+  /// We don't need any fancy overloads of this constructor because that's
+  /// what Observer::create() is for.
+  template <class N = OnNext, class E = OnError, class C = OnCompleted>
+  FunctionObserver(N&& n, E&& e, C&& c)
+    : onNext_(std::forward<N>(n)),
+      onError_(std::forward<E>(e)),
+      onCompleted_(std::forward<C>(c))
+  {}
+
+  void onNext(const T& val) override {
+    if (onNext_) onNext_(val);
+  }
+
+  void onError(Error e) override {
+    if (onError_) onError_(e);
+  }
+
+  void onCompleted() override {
+    if (onCompleted_) onCompleted_();
+  }
+
+ protected:
+  OnNext onNext_;
+  OnError onError_;
+  OnCompleted onCompleted_;
+};
+
+}}
diff --git a/folly/wangle/deprecated/rx/README.md b/folly/wangle/deprecated/rx/README.md
new file mode 100644 (file)
index 0000000..ae9614f
--- /dev/null
@@ -0,0 +1,37 @@
+Rx is a pattern for "functional reactive programming" that started at
+Microsoft in C#, and has been reimplemented in various languages, notably
+RxJava for JVM languages.
+
+It is basically the plural of Futures (a la Wangle).
+
+```
+                    singular              |            plural
+        +---------------------------------+-----------------------------------
+  sync  |  Foo getData()                  |  std::vector<Foo> getData()
+  async |  wangle::Future<Foo> getData()  |  wangle::Observable<Foo> getData()
+```
+
+For more on Rx, I recommend these resources:
+
+Netflix blog post (RxJava): http://techblog.netflix.com/2013/02/rxjava-netflix-api.html
+Introduction to Rx eBook (C#): http://www.introtorx.com/content/v1.0.10621.0/01_WhyRx.html
+The RxJava wiki: https://github.com/Netflix/RxJava/wiki
+Netflix QCon presentation: http://www.infoq.com/presentations/netflix-functional-rx
+https://rx.codeplex.com/
+
+I haven't even tried to support move-only data in this version. I'm on the
+fence about the usage of shared_ptr. Subject is underdeveloped. A whole rich
+set of operations is obviously missing. I haven't decided how to handle
+subscriptions (and therefore cancellation), but I'm pretty sure C#'s
+"Disposable" is thoroughly un-C++ (opposite of RAII). So for now subscribe
+returns nothing at all and you can't cancel anything ever. The whole thing is
+probably riddled with lifetime corner case bugs that will come out like a
+swarm of angry bees as soon as someone tries an infinite sequence, or tries to
+partially observe a long sequence. I'm pretty sure subscribeOn has a bug that
+I haven't tracked down yet.
+
+DEPRECATED:
+This was an experimental exploration. There are better, more robust, and (most
+importantly) supported C++ implementations, notably
+[rxcpp](https://rxcpp.codeplex.com/). Use that instead. You really shouldn't
+use this one. It's unsupported and incomplete. Honest.
diff --git a/folly/wangle/deprecated/rx/Subject.h b/folly/wangle/deprecated/rx/Subject.h
new file mode 100644 (file)
index 0000000..08b7901
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/wangle/deprecated/rx/types.h> // must come first
+#include <folly/wangle/deprecated/rx/Observable.h>
+#include <folly/wangle/deprecated/rx/Observer.h>
+
+namespace folly { namespace wangle {
+
+/// Subject interface. A Subject is both an Observable and an Observer. There
+/// is a default implementation of the Observer methods that just forwards the
+/// observed events to the Subject's observers.
+template <class T>
+struct Subject : public Observable<T>, public Observer<T> {
+  void onNext(const T& val) override {
+    this->forEachObserver([&](Observer<T>* o){
+      o->onNext(val);
+    });
+  }
+  void onError(Error e) override {
+    this->forEachObserver([&](Observer<T>* o){
+      o->onError(e);
+    });
+  }
+  void onCompleted() override {
+    this->forEachObserver([](Observer<T>* o){
+      o->onCompleted();
+    });
+  }
+};
+
+}}
diff --git a/folly/wangle/deprecated/rx/Subscription.h b/folly/wangle/deprecated/rx/Subscription.h
new file mode 100644 (file)
index 0000000..de8e235
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/wangle/deprecated/rx/types.h> // must come first
+#include <folly/wangle/deprecated/rx/Observable.h>
+
+namespace folly { namespace wangle {
+
+template <class T>
+class Subscription {
+ public:
+  Subscription() = default;
+
+  Subscription(const Subscription&) = delete;
+
+  Subscription(Subscription&& other) noexcept {
+    *this = std::move(other);
+  }
+
+  Subscription& operator=(Subscription&& other) noexcept {
+    unsubscribe();
+    unsubscriber_ = std::move(other.unsubscriber_);
+    id_ = other.id_;
+    other.unsubscriber_ = nullptr;
+    other.id_ = 0;
+    return *this;
+  }
+
+  ~Subscription() {
+    unsubscribe();
+  }
+
+ private:
+  typedef typename Observable<T>::Unsubscriber Unsubscriber;
+
+  Subscription(std::shared_ptr<Unsubscriber> unsubscriber, uint64_t id)
+    : unsubscriber_(std::move(unsubscriber)), id_(id) {
+    CHECK(id_ > 0);
+  }
+
+  void unsubscribe() {
+    if (unsubscriber_ && id_ > 0) {
+      unsubscriber_->unsubscribe(id_);
+      id_ = 0;
+      unsubscriber_ = nullptr;
+    }
+  }
+
+  std::shared_ptr<Unsubscriber> unsubscriber_;
+  uint64_t id_{0};
+
+  friend class Observable<T>;
+};
+
+}}
diff --git a/folly/wangle/deprecated/rx/test/RxBenchmark.cpp b/folly/wangle/deprecated/rx/test/RxBenchmark.cpp
new file mode 100644 (file)
index 0000000..88e9e1b
--- /dev/null
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/Benchmark.h>
+#include <folly/wangle/deprecated/rx/Observer.h>
+#include <folly/wangle/deprecated/rx/Subject.h>
+#include <gflags/gflags.h>
+
+using namespace folly::wangle;
+using folly::BenchmarkSuspender;
+
+static std::unique_ptr<Observer<int>> makeObserver() {
+  return Observer<int>::create([&] (int x) {});
+}
+
+void subscribeImpl(uint iters, int N, bool countUnsubscribe) {
+  for (uint iter = 0; iter < iters; iter++) {
+    BenchmarkSuspender bs;
+    Subject<int> subject;
+    std::vector<std::unique_ptr<Observer<int>>> observers;
+    std::vector<Subscription<int>> subscriptions;
+    subscriptions.reserve(N);
+    for (int i = 0; i < N; i++) {
+      observers.push_back(makeObserver());
+    }
+    bs.dismiss();
+    for (int i = 0; i < N; i++) {
+      subscriptions.push_back(subject.subscribe(std::move(observers[i])));
+    }
+    if (countUnsubscribe) {
+      subscriptions.clear();
+    }
+    bs.rehire();
+  }
+}
+
+void subscribeAndUnsubscribe(uint iters, int N) {
+  subscribeImpl(iters, N, true);
+}
+
+void subscribe(uint iters, int N) {
+  subscribeImpl(iters, N, false);
+}
+
+void observe(uint iters, int N) {
+  for (uint iter = 0; iter < iters; iter++) {
+    BenchmarkSuspender bs;
+    Subject<int> subject;
+    std::vector<std::unique_ptr<Observer<int>>> observers;
+    for (int i = 0; i < N; i++) {
+      observers.push_back(makeObserver());
+    }
+    bs.dismiss();
+    for (int i = 0; i < N; i++) {
+      subject.observe(std::move(observers[i]));
+    }
+    bs.rehire();
+  }
+}
+
+void inlineObserve(uint iters, int N) {
+  for (uint iter = 0; iter < iters; iter++) {
+    BenchmarkSuspender bs;
+    Subject<int> subject;
+    std::vector<Observer<int>*> observers;
+    for (int i = 0; i < N; i++) {
+      observers.push_back(makeObserver().release());
+    }
+    bs.dismiss();
+    for (int i = 0; i < N; i++) {
+      subject.observe(observers[i]);
+    }
+    bs.rehire();
+    for (int i = 0; i < N; i++) {
+      delete observers[i];
+    }
+  }
+}
+
+void notifySubscribers(uint iters, int N) {
+  for (uint iter = 0; iter < iters; iter++) {
+    BenchmarkSuspender bs;
+    Subject<int> subject;
+    std::vector<std::unique_ptr<Observer<int>>> observers;
+    std::vector<Subscription<int>> subscriptions;
+    subscriptions.reserve(N);
+    for (int i = 0; i < N; i++) {
+      observers.push_back(makeObserver());
+    }
+    for (int i = 0; i < N; i++) {
+      subscriptions.push_back(subject.subscribe(std::move(observers[i])));
+    }
+    bs.dismiss();
+    subject.onNext(42);
+    bs.rehire();
+  }
+}
+
+void notifyInlineObservers(uint iters, int N) {
+  for (uint iter = 0; iter < iters; iter++) {
+    BenchmarkSuspender bs;
+    Subject<int> subject;
+    std::vector<Observer<int>*> observers;
+    for (int i = 0; i < N; i++) {
+      observers.push_back(makeObserver().release());
+    }
+    for (int i = 0; i < N; i++) {
+      subject.observe(observers[i]);
+    }
+    bs.dismiss();
+    subject.onNext(42);
+    bs.rehire();
+  }
+}
+
+BENCHMARK_PARAM(subscribeAndUnsubscribe, 1);
+BENCHMARK_RELATIVE_PARAM(subscribe, 1);
+BENCHMARK_RELATIVE_PARAM(observe, 1);
+BENCHMARK_RELATIVE_PARAM(inlineObserve, 1);
+
+BENCHMARK_DRAW_LINE();
+
+BENCHMARK_PARAM(subscribeAndUnsubscribe, 1000);
+BENCHMARK_RELATIVE_PARAM(subscribe, 1000);
+BENCHMARK_RELATIVE_PARAM(observe, 1000);
+BENCHMARK_RELATIVE_PARAM(inlineObserve, 1000);
+
+BENCHMARK_DRAW_LINE();
+
+BENCHMARK_PARAM(notifySubscribers, 1);
+BENCHMARK_RELATIVE_PARAM(notifyInlineObservers, 1);
+
+BENCHMARK_DRAW_LINE();
+
+BENCHMARK_PARAM(notifySubscribers, 1000);
+BENCHMARK_RELATIVE_PARAM(notifyInlineObservers, 1000);
+
+int main(int argc, char** argv) {
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+  folly::runBenchmarks();
+  return 0;
+}
diff --git a/folly/wangle/deprecated/rx/test/RxTest.cpp b/folly/wangle/deprecated/rx/test/RxTest.cpp
new file mode 100644 (file)
index 0000000..e75ddde
--- /dev/null
@@ -0,0 +1,195 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/wangle/deprecated/rx/Observer.h>
+#include <folly/wangle/deprecated/rx/Subject.h>
+#include <gtest/gtest.h>
+
+using namespace folly::wangle;
+
+static std::unique_ptr<Observer<int>> incrementer(int& counter) {
+  return Observer<int>::create([&] (int x) {
+    counter++;
+  });
+}
+
+TEST(RxTest, Observe) {
+  Subject<int> subject;
+  auto count = 0;
+  subject.observe(incrementer(count));
+  subject.onNext(1);
+  EXPECT_EQ(1, count);
+}
+
+TEST(RxTest, ObserveInline) {
+  Subject<int> subject;
+  auto count = 0;
+  auto o = incrementer(count).release();
+  subject.observe(o);
+  subject.onNext(1);
+  EXPECT_EQ(1, count);
+  delete o;
+}
+
+TEST(RxTest, Subscription) {
+  Subject<int> subject;
+  auto count = 0;
+  {
+    auto s = subject.subscribe(incrementer(count));
+    subject.onNext(1);
+  }
+  // The subscription has gone out of scope so no one should get this.
+  subject.onNext(2);
+  EXPECT_EQ(1, count);
+}
+
+TEST(RxTest, SubscriptionMove) {
+  Subject<int> subject;
+  auto count = 0;
+  auto s = subject.subscribe(incrementer(count));
+  auto s2 = subject.subscribe(incrementer(count));
+  s2 = std::move(s);
+  subject.onNext(1);
+  Subscription<int> s3(std::move(s2));
+  subject.onNext(2);
+  EXPECT_EQ(2, count);
+}
+
+TEST(RxTest, SubscriptionOutlivesSubject) {
+  Subscription<int> s;
+  {
+    Subject<int> subject;
+    s = subject.subscribe(Observer<int>::create([](int){}));
+  }
+  // Don't explode when s is destroyed
+}
+
+TEST(RxTest, SubscribeDuringCallback) {
+  // A subscriber who was subscribed in the course of a callback should get
+  // subsequent updates but not the current update.
+  Subject<int> subject;
+  int outerCount = 0, innerCount = 0;
+  Subscription<int> s1, s2;
+  s1 = subject.subscribe(Observer<int>::create([&] (int x) {
+    outerCount++;
+    s2 = subject.subscribe(incrementer(innerCount));
+  }));
+  subject.onNext(42);
+  subject.onNext(0xDEADBEEF);
+  EXPECT_EQ(2, outerCount);
+  EXPECT_EQ(1, innerCount);
+}
+
+TEST(RxTest, ObserveDuringCallback) {
+  Subject<int> subject;
+  int outerCount = 0, innerCount = 0;
+  subject.observe(Observer<int>::create([&] (int x) {
+    outerCount++;
+    subject.observe(incrementer(innerCount));
+  }));
+  subject.onNext(42);
+  subject.onNext(0xDEADBEEF);
+  EXPECT_EQ(2, outerCount);
+  EXPECT_EQ(1, innerCount);
+}
+
+TEST(RxTest, ObserveInlineDuringCallback) {
+  Subject<int> subject;
+  int outerCount = 0, innerCount = 0;
+  auto innerO = incrementer(innerCount).release();
+  auto outerO = Observer<int>::create([&] (int x) {
+    outerCount++;
+    subject.observe(innerO);
+  }).release();
+  subject.observe(outerO);
+  subject.onNext(42);
+  subject.onNext(0xDEADBEEF);
+  EXPECT_EQ(2, outerCount);
+  EXPECT_EQ(1, innerCount);
+  delete innerO;
+  delete outerO;
+}
+
+TEST(RxTest, UnsubscribeDuringCallback) {
+  // A subscriber who was unsubscribed in the course of a callback should get
+  // the current update but not subsequent ones
+  Subject<int> subject;
+  int count1 = 0, count2 = 0;
+  auto s1 = subject.subscribe(incrementer(count1));
+  auto s2 = subject.subscribe(Observer<int>::create([&] (int x) {
+    count2++;
+    s1.~Subscription();
+  }));
+  subject.onNext(1);
+  subject.onNext(2);
+  EXPECT_EQ(1, count1);
+  EXPECT_EQ(2, count2);
+}
+
+TEST(RxTest, SubscribeUnsubscribeDuringCallback) {
+  // A subscriber who was subscribed and unsubscribed in the course of a
+  // callback should not get any updates
+  Subject<int> subject;
+  int outerCount = 0, innerCount = 0;
+  auto s2 = subject.subscribe(Observer<int>::create([&] (int x) {
+    outerCount++;
+    auto s2 = subject.subscribe(incrementer(innerCount));
+  }));
+  subject.onNext(1);
+  subject.onNext(2);
+  EXPECT_EQ(2, outerCount);
+  EXPECT_EQ(0, innerCount);
+}
+
+// Move only type
+typedef std::unique_ptr<int> MO;
+static MO makeMO() { return folly::make_unique<int>(1); }
+template <typename T>
+static ObserverPtr<T> makeMOObserver() {
+  return Observer<T>::create([](const T& mo) {
+    EXPECT_EQ(1, *mo);
+  });
+}
+
+TEST(RxTest, MoveOnlyRvalue) {
+  Subject<MO> subject;
+  auto s1 = subject.subscribe(makeMOObserver<MO>());
+  auto s2 = subject.subscribe(makeMOObserver<MO>());
+  auto mo = makeMO();
+  // Can't bind lvalues to rvalue references
+  // subject.onNext(mo);
+  subject.onNext(std::move(mo));
+  subject.onNext(makeMO());
+}
+
+// Copy only type
+struct CO {
+  CO() = default;
+  CO(const CO&) = default;
+  CO(CO&&) = delete;
+};
+
+template <typename T>
+static ObserverPtr<T> makeCOObserver() {
+  return Observer<T>::create([](const T& mo) {});
+}
+
+TEST(RxTest, CopyOnly) {
+  Subject<CO> subject;
+  auto s1 = subject.subscribe(makeCOObserver<CO>());
+  CO co;
+  subject.onNext(co);
+}
diff --git a/folly/wangle/deprecated/rx/types.h b/folly/wangle/deprecated/rx/types.h
new file mode 100644 (file)
index 0000000..3bb540e
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/ExceptionWrapper.h>
+#include <folly/Executor.h>
+
+namespace folly { namespace wangle {
+  typedef folly::exception_wrapper Error;
+  // The Executor is basically an rx Scheduler (by design). So just
+  // alias it.
+  typedef std::shared_ptr<folly::Executor> SchedulerPtr;
+
+  template <class T, size_t InlineObservers = 3> class Observable;
+  template <class T> struct Observer;
+  template <class T> struct Subject;
+
+  template <class T> using ObservablePtr = std::shared_ptr<Observable<T>>;
+  template <class T> using ObserverPtr = std::shared_ptr<Observer<T>>;
+  template <class T> using SubjectPtr = std::shared_ptr<Subject<T>>;
+}}
diff --git a/folly/wangle/rx/Dummy.cpp b/folly/wangle/rx/Dummy.cpp
deleted file mode 100644 (file)
index ec999ca..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Copyright 2015 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// fbbuild is too dumb to know that .h files in the directory affect
-// our project, unless we have a .cpp file in the target, in the same
-// directory.
diff --git a/folly/wangle/rx/Observable.h b/folly/wangle/rx/Observable.h
deleted file mode 100644 (file)
index 95b60bf..0000000
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Copyright 2015 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/wangle/rx/types.h> // must come first
-#include <folly/wangle/rx/Subject.h>
-#include <folly/wangle/rx/Subscription.h>
-
-#include <folly/RWSpinLock.h>
-#include <folly/SmallLocks.h>
-#include <folly/ThreadLocal.h>
-#include <folly/small_vector.h>
-#include <folly/Executor.h>
-#include <folly/Memory.h>
-#include <map>
-#include <memory>
-
-namespace folly { namespace wangle {
-
-template <class T, size_t InlineObservers>
-class Observable {
- public:
-  Observable() : nextSubscriptionId_{1} {}
-
-  // TODO perhaps we want to provide this #5283229
-  Observable(Observable&& other) = delete;
-
-  virtual ~Observable() {
-    if (unsubscriber_) {
-      unsubscriber_->disable();
-    }
-  }
-
-  // The next three methods subscribe the given Observer to this Observable.
-  //
-  // If these are called within an Observer callback, the new observer will not
-  // get the current update but will get subsequent updates.
-  //
-  // subscribe() returns a Subscription object. The observer will continue to
-  // get updates until the Subscription is destroyed.
-  //
-  // observe(ObserverPtr<T>) creates an indefinite subscription
-  //
-  // observe(Observer<T>*) also creates an indefinite subscription, but the
-  // caller is responsible for ensuring that the given Observer outlives this
-  // Observable. This might be useful in high performance environments where
-  // allocations must be kept to a minimum. Template parameter InlineObservers
-  // specifies how many observers can been subscribed inline without any
-  // allocations (it's just the size of a folly::small_vector).
-  virtual Subscription<T> subscribe(ObserverPtr<T> observer) {
-    return subscribeImpl(observer, false);
-  }
-
-  virtual void observe(ObserverPtr<T> observer) {
-    subscribeImpl(observer, true);
-  }
-
-  virtual void observe(Observer<T>* observer) {
-    if (inCallback_ && *inCallback_) {
-      if (!newObservers_) {
-        newObservers_.reset(new ObserverList());
-      }
-      newObservers_->push_back(observer);
-    } else {
-      RWSpinLock::WriteHolder{&observersLock_};
-      observers_.push_back(observer);
-    }
-  }
-
-  // TODO unobserve(ObserverPtr<T>), unobserve(Observer<T>*)
-
-  /// Returns a new Observable that will call back on the given Scheduler.
-  /// The returned Observable must outlive the parent Observable.
-
-  // This and subscribeOn should maybe just be a first-class feature of an
-  // Observable, rather than making new ones whose lifetimes are tied to their
-  // parents. In that case it'd return a reference to this object for
-  // chaining.
-  ObservablePtr<T> observeOn(SchedulerPtr scheduler) {
-    // you're right Hannes, if we have Observable::create we don't need this
-    // helper class.
-    struct ViaSubject : public Observable<T>
-    {
-      ViaSubject(SchedulerPtr sched,
-                 Observable* obs)
-        : scheduler_(sched), observable_(obs)
-      {}
-
-      Subscription<T> subscribe(ObserverPtr<T> o) override {
-        return observable_->subscribe(
-          Observer<T>::create(
-            [=](T val) { scheduler_->add([o, val] { o->onNext(val); }); },
-            [=](Error e) { scheduler_->add([o, e] { o->onError(e); }); },
-            [=]() { scheduler_->add([o] { o->onCompleted(); }); }));
-      }
-
-     protected:
-      SchedulerPtr scheduler_;
-      Observable* observable_;
-    };
-
-    return std::make_shared<ViaSubject>(scheduler, this);
-  }
-
-  /// Returns a new Observable that will subscribe to this parent Observable
-  /// via the given Scheduler. This can be subtle and confusing at first, see
-  /// http://www.introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#SubscribeOnObserveOn
-  std::unique_ptr<Observable> subscribeOn(SchedulerPtr scheduler) {
-    struct Subject_ : public Subject<T> {
-     public:
-      Subject_(SchedulerPtr s, Observable* o) : scheduler_(s), observable_(o) {
-      }
-
-      Subscription<T> subscribe(ObserverPtr<T> o) {
-        scheduler_->add([=] {
-          observable_->subscribe(o);
-        });
-        return Subscription<T>(nullptr, 0); // TODO
-      }
-
-     protected:
-      SchedulerPtr scheduler_;
-      Observable* observable_;
-    };
-
-    return folly::make_unique<Subject_>(scheduler, this);
-  }
-
- protected:
-  // Safely execute an operation on each observer. F must take a single
-  // Observer<T>* as its argument.
-  template <class F>
-  void forEachObserver(F f) {
-    if (UNLIKELY(!inCallback_)) {
-      inCallback_.reset(new bool{false});
-    }
-    CHECK(!(*inCallback_));
-    *inCallback_ = true;
-
-    {
-      RWSpinLock::ReadHolder rh(observersLock_);
-      for (auto o : observers_) {
-        f(o);
-      }
-
-      for (auto& kv : subscribers_) {
-        f(kv.second.get());
-      }
-    }
-
-    if (UNLIKELY((newObservers_ && !newObservers_->empty()) ||
-                 (newSubscribers_ && !newSubscribers_->empty()) ||
-                 (oldSubscribers_ && !oldSubscribers_->empty()))) {
-      {
-        RWSpinLock::WriteHolder wh(observersLock_);
-        if (newObservers_) {
-          for (auto observer : *(newObservers_)) {
-            observers_.push_back(observer);
-          }
-          newObservers_->clear();
-        }
-        if (newSubscribers_) {
-          for (auto& kv : *(newSubscribers_)) {
-            subscribers_.insert(std::move(kv));
-          }
-          newSubscribers_->clear();
-        }
-        if (oldSubscribers_) {
-          for (auto id : *(oldSubscribers_)) {
-            subscribers_.erase(id);
-          }
-          oldSubscribers_->clear();
-        }
-      }
-    }
-    *inCallback_ = false;
-  }
-
- private:
-  Subscription<T> subscribeImpl(ObserverPtr<T> observer, bool indefinite) {
-    auto subscription = makeSubscription(indefinite);
-    typename SubscriberMap::value_type kv{subscription.id_, std::move(observer)};
-    if (inCallback_ && *inCallback_) {
-      if (!newSubscribers_) {
-        newSubscribers_.reset(new SubscriberMap());
-      }
-      newSubscribers_->insert(std::move(kv));
-    } else {
-      RWSpinLock::WriteHolder{&observersLock_};
-      subscribers_.insert(std::move(kv));
-    }
-    return subscription;
-  }
-
-  class Unsubscriber {
-   public:
-    explicit Unsubscriber(Observable* observable) : observable_(observable) {
-      CHECK(observable_);
-    }
-
-    void unsubscribe(uint64_t id) {
-      CHECK(id > 0);
-      RWSpinLock::ReadHolder guard(lock_);
-      if (observable_) {
-        observable_->unsubscribe(id);
-      }
-    }
-
-    void disable() {
-      RWSpinLock::WriteHolder guard(lock_);
-      observable_ = nullptr;
-    }
-
-   private:
-    RWSpinLock lock_;
-    Observable* observable_;
-  };
-
-  std::shared_ptr<Unsubscriber> unsubscriber_{nullptr};
-  MicroSpinLock unsubscriberLock_{0};
-
-  friend class Subscription<T>;
-
-  void unsubscribe(uint64_t id) {
-    if (inCallback_ && *inCallback_) {
-      if (!oldSubscribers_) {
-        oldSubscribers_.reset(new std::vector<uint64_t>());
-      }
-      if (newSubscribers_) {
-        auto it = newSubscribers_->find(id);
-        if (it != newSubscribers_->end()) {
-          newSubscribers_->erase(it);
-          return;
-        }
-      }
-      oldSubscribers_->push_back(id);
-    } else {
-      RWSpinLock::WriteHolder{&observersLock_};
-      subscribers_.erase(id);
-    }
-  }
-
-  Subscription<T> makeSubscription(bool indefinite) {
-    if (indefinite) {
-      return Subscription<T>(nullptr, nextSubscriptionId_++);
-    } else {
-      if (!unsubscriber_) {
-        std::lock_guard<MicroSpinLock> guard(unsubscriberLock_);
-        if (!unsubscriber_) {
-          unsubscriber_ = std::make_shared<Unsubscriber>(this);
-        }
-      }
-      return Subscription<T>(unsubscriber_, nextSubscriptionId_++);
-    }
-  }
-
-  std::atomic<uint64_t> nextSubscriptionId_;
-  RWSpinLock observersLock_;
-  folly::ThreadLocalPtr<bool> inCallback_;
-
-  typedef folly::small_vector<Observer<T>*, InlineObservers> ObserverList;
-  ObserverList observers_;
-  folly::ThreadLocalPtr<ObserverList> newObservers_;
-
-  typedef std::map<uint64_t, ObserverPtr<T>> SubscriberMap;
-  SubscriberMap subscribers_;
-  folly::ThreadLocalPtr<SubscriberMap> newSubscribers_;
-  folly::ThreadLocalPtr<std::vector<uint64_t>> oldSubscribers_;
-};
-
-}}
diff --git a/folly/wangle/rx/Observer.h b/folly/wangle/rx/Observer.h
deleted file mode 100644 (file)
index 5797a0c..0000000
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Copyright 2015 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/wangle/rx/types.h> // must come first
-#include <functional>
-#include <memory>
-#include <stdexcept>
-#include <folly/Memory.h>
-
-namespace folly { namespace wangle {
-
-template <class T> struct FunctionObserver;
-
-/// Observer interface. You can subclass it, or you can just use create()
-/// to use std::functions.
-template <class T>
-struct Observer {
-  // These are what it means to be an Observer.
-  virtual void onNext(const T&) = 0;
-  virtual void onError(Error) = 0;
-  virtual void onCompleted() = 0;
-
-  virtual ~Observer() = default;
-
-  /// Create an Observer with std::function callbacks. Handy to make ad-hoc
-  /// Observers with lambdas.
-  ///
-  /// Templated for maximum perfect forwarding flexibility, but ultimately
-  /// whatever you pass in has to implicitly become a std::function for the
-  /// same signature as onNext(), onError(), and onCompleted() respectively.
-  /// (see the FunctionObserver typedefs)
-  template <class N, class E, class C>
-  static std::unique_ptr<Observer> create(
-    N&& onNextFn, E&& onErrorFn, C&& onCompletedFn)
-  {
-    return folly::make_unique<FunctionObserver<T>>(
-      std::forward<N>(onNextFn),
-      std::forward<E>(onErrorFn),
-      std::forward<C>(onCompletedFn));
-  }
-
-  /// Create an Observer with only onNext and onError callbacks.
-  /// onCompleted will just be a no-op.
-  template <class N, class E>
-  static std::unique_ptr<Observer> create(N&& onNextFn, E&& onErrorFn) {
-    return folly::make_unique<FunctionObserver<T>>(
-      std::forward<N>(onNextFn),
-      std::forward<E>(onErrorFn),
-      nullptr);
-  }
-
-  /// Create an Observer with only an onNext callback.
-  /// onError and onCompleted will just be no-ops.
-  template <class N>
-  static std::unique_ptr<Observer> create(N&& onNextFn) {
-    return folly::make_unique<FunctionObserver<T>>(
-      std::forward<N>(onNextFn),
-      nullptr,
-      nullptr);
-  }
-};
-
-/// An observer that uses std::function callbacks. You don't really want to
-/// make one of these directly - instead use the Observer::create() methods.
-template <class T>
-struct FunctionObserver : public Observer<T> {
-  typedef std::function<void(const T&)> OnNext;
-  typedef std::function<void(Error)> OnError;
-  typedef std::function<void()> OnCompleted;
-
-  /// We don't need any fancy overloads of this constructor because that's
-  /// what Observer::create() is for.
-  template <class N = OnNext, class E = OnError, class C = OnCompleted>
-  FunctionObserver(N&& n, E&& e, C&& c)
-    : onNext_(std::forward<N>(n)),
-      onError_(std::forward<E>(e)),
-      onCompleted_(std::forward<C>(c))
-  {}
-
-  void onNext(const T& val) override {
-    if (onNext_) onNext_(val);
-  }
-
-  void onError(Error e) override {
-    if (onError_) onError_(e);
-  }
-
-  void onCompleted() override {
-    if (onCompleted_) onCompleted_();
-  }
-
- protected:
-  OnNext onNext_;
-  OnError onError_;
-  OnCompleted onCompleted_;
-};
-
-}}
diff --git a/folly/wangle/rx/README.md b/folly/wangle/rx/README.md
deleted file mode 100644 (file)
index 8c31ae0..0000000
+++ /dev/null
@@ -1,36 +0,0 @@
-Rx is a pattern for "functional reactive programming" that started at
-Microsoft in C#, and has been reimplemented in various languages, notably
-RxJava for JVM languages.
-
-It is basically the plural of Futures (a la Wangle).
-
-```
-                    singular              |            plural
-        +---------------------------------+-----------------------------------
-  sync  |  Foo getData()                  |  std::vector<Foo> getData()
-  async |  wangle::Future<Foo> getData()  |  wangle::Observable<Foo> getData()
-```
-
-For more on Rx, I recommend these resources:
-
-- Netflix blog post (RxJava): http://techblog.netflix.com/2013/02/rxjava-netflix-api.html
-- Introduction to Rx eBook (C#): http://www.introtorx.com/content/v1.0.10621.0/01_WhyRx.html
-- The RxJava wiki: https://github.com/Netflix/RxJava/wiki
-- Netflix QCon presentation: http://www.infoq.com/presentations/netflix-functional-rx
-- https://rx.codeplex.com/
-
-There are open source C++ implementations, I haven't looked at them. They
-might be the best way to go rather than writing it NIH-style. I mostly did it
-as an exercise, to think through how closely we might want to integrate
-something like this with Wangle, and to get a feel for how it works in C++.
-
-I haven't even tried to support move-only data in this version. I'm on the
-fence about the usage of shared_ptr. Subject is underdeveloped. A whole rich
-set of operations is obviously missing. I haven't decided how to handle
-subscriptions (and therefore cancellation), but I'm pretty sure C#'s
-"Disposable" is thoroughly un-C++ (opposite of RAII). So for now subscribe
-returns nothing at all and you can't cancel anything ever. The whole thing is
-probably riddled with lifetime corner case bugs that will come out like a
-swarm of angry bees as soon as someone tries an infinite sequence, or tries to
-partially observe a long sequence. I'm pretty sure subscribeOn has a bug that
-I haven't tracked down yet.
diff --git a/folly/wangle/rx/Subject.h b/folly/wangle/rx/Subject.h
deleted file mode 100644 (file)
index c806d70..0000000
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright 2015 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/wangle/rx/types.h> // must come first
-#include <folly/wangle/rx/Observable.h>
-#include <folly/wangle/rx/Observer.h>
-
-namespace folly { namespace wangle {
-
-/// Subject interface. A Subject is both an Observable and an Observer. There
-/// is a default implementation of the Observer methods that just forwards the
-/// observed events to the Subject's observers.
-template <class T>
-struct Subject : public Observable<T>, public Observer<T> {
-  void onNext(const T& val) override {
-    this->forEachObserver([&](Observer<T>* o){
-      o->onNext(val);
-    });
-  }
-  void onError(Error e) override {
-    this->forEachObserver([&](Observer<T>* o){
-      o->onError(e);
-    });
-  }
-  void onCompleted() override {
-    this->forEachObserver([](Observer<T>* o){
-      o->onCompleted();
-    });
-  }
-};
-
-}}
diff --git a/folly/wangle/rx/Subscription.h b/folly/wangle/rx/Subscription.h
deleted file mode 100644 (file)
index 09af8c9..0000000
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright 2015 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/wangle/rx/types.h> // must come first
-#include <folly/wangle/rx/Observable.h>
-
-namespace folly { namespace wangle {
-
-template <class T>
-class Subscription {
- public:
-  Subscription() = default;
-
-  Subscription(const Subscription&) = delete;
-
-  Subscription(Subscription&& other) noexcept {
-    *this = std::move(other);
-  }
-
-  Subscription& operator=(Subscription&& other) noexcept {
-    unsubscribe();
-    unsubscriber_ = std::move(other.unsubscriber_);
-    id_ = other.id_;
-    other.unsubscriber_ = nullptr;
-    other.id_ = 0;
-    return *this;
-  }
-
-  ~Subscription() {
-    unsubscribe();
-  }
-
- private:
-  typedef typename Observable<T>::Unsubscriber Unsubscriber;
-
-  Subscription(std::shared_ptr<Unsubscriber> unsubscriber, uint64_t id)
-    : unsubscriber_(std::move(unsubscriber)), id_(id) {
-    CHECK(id_ > 0);
-  }
-
-  void unsubscribe() {
-    if (unsubscriber_ && id_ > 0) {
-      unsubscriber_->unsubscribe(id_);
-      id_ = 0;
-      unsubscriber_ = nullptr;
-    }
-  }
-
-  std::shared_ptr<Unsubscriber> unsubscriber_;
-  uint64_t id_{0};
-
-  friend class Observable<T>;
-};
-
-}}
diff --git a/folly/wangle/rx/test/RxBenchmark.cpp b/folly/wangle/rx/test/RxBenchmark.cpp
deleted file mode 100644 (file)
index 4e17494..0000000
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Copyright 2015 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <folly/Benchmark.h>
-#include <folly/wangle/rx/Observer.h>
-#include <folly/wangle/rx/Subject.h>
-#include <gflags/gflags.h>
-
-using namespace folly::wangle;
-using folly::BenchmarkSuspender;
-
-static std::unique_ptr<Observer<int>> makeObserver() {
-  return Observer<int>::create([&] (int x) {});
-}
-
-void subscribeImpl(uint iters, int N, bool countUnsubscribe) {
-  for (uint iter = 0; iter < iters; iter++) {
-    BenchmarkSuspender bs;
-    Subject<int> subject;
-    std::vector<std::unique_ptr<Observer<int>>> observers;
-    std::vector<Subscription<int>> subscriptions;
-    subscriptions.reserve(N);
-    for (int i = 0; i < N; i++) {
-      observers.push_back(makeObserver());
-    }
-    bs.dismiss();
-    for (int i = 0; i < N; i++) {
-      subscriptions.push_back(subject.subscribe(std::move(observers[i])));
-    }
-    if (countUnsubscribe) {
-      subscriptions.clear();
-    }
-    bs.rehire();
-  }
-}
-
-void subscribeAndUnsubscribe(uint iters, int N) {
-  subscribeImpl(iters, N, true);
-}
-
-void subscribe(uint iters, int N) {
-  subscribeImpl(iters, N, false);
-}
-
-void observe(uint iters, int N) {
-  for (uint iter = 0; iter < iters; iter++) {
-    BenchmarkSuspender bs;
-    Subject<int> subject;
-    std::vector<std::unique_ptr<Observer<int>>> observers;
-    for (int i = 0; i < N; i++) {
-      observers.push_back(makeObserver());
-    }
-    bs.dismiss();
-    for (int i = 0; i < N; i++) {
-      subject.observe(std::move(observers[i]));
-    }
-    bs.rehire();
-  }
-}
-
-void inlineObserve(uint iters, int N) {
-  for (uint iter = 0; iter < iters; iter++) {
-    BenchmarkSuspender bs;
-    Subject<int> subject;
-    std::vector<Observer<int>*> observers;
-    for (int i = 0; i < N; i++) {
-      observers.push_back(makeObserver().release());
-    }
-    bs.dismiss();
-    for (int i = 0; i < N; i++) {
-      subject.observe(observers[i]);
-    }
-    bs.rehire();
-    for (int i = 0; i < N; i++) {
-      delete observers[i];
-    }
-  }
-}
-
-void notifySubscribers(uint iters, int N) {
-  for (uint iter = 0; iter < iters; iter++) {
-    BenchmarkSuspender bs;
-    Subject<int> subject;
-    std::vector<std::unique_ptr<Observer<int>>> observers;
-    std::vector<Subscription<int>> subscriptions;
-    subscriptions.reserve(N);
-    for (int i = 0; i < N; i++) {
-      observers.push_back(makeObserver());
-    }
-    for (int i = 0; i < N; i++) {
-      subscriptions.push_back(subject.subscribe(std::move(observers[i])));
-    }
-    bs.dismiss();
-    subject.onNext(42);
-    bs.rehire();
-  }
-}
-
-void notifyInlineObservers(uint iters, int N) {
-  for (uint iter = 0; iter < iters; iter++) {
-    BenchmarkSuspender bs;
-    Subject<int> subject;
-    std::vector<Observer<int>*> observers;
-    for (int i = 0; i < N; i++) {
-      observers.push_back(makeObserver().release());
-    }
-    for (int i = 0; i < N; i++) {
-      subject.observe(observers[i]);
-    }
-    bs.dismiss();
-    subject.onNext(42);
-    bs.rehire();
-  }
-}
-
-BENCHMARK_PARAM(subscribeAndUnsubscribe, 1);
-BENCHMARK_RELATIVE_PARAM(subscribe, 1);
-BENCHMARK_RELATIVE_PARAM(observe, 1);
-BENCHMARK_RELATIVE_PARAM(inlineObserve, 1);
-
-BENCHMARK_DRAW_LINE();
-
-BENCHMARK_PARAM(subscribeAndUnsubscribe, 1000);
-BENCHMARK_RELATIVE_PARAM(subscribe, 1000);
-BENCHMARK_RELATIVE_PARAM(observe, 1000);
-BENCHMARK_RELATIVE_PARAM(inlineObserve, 1000);
-
-BENCHMARK_DRAW_LINE();
-
-BENCHMARK_PARAM(notifySubscribers, 1);
-BENCHMARK_RELATIVE_PARAM(notifyInlineObservers, 1);
-
-BENCHMARK_DRAW_LINE();
-
-BENCHMARK_PARAM(notifySubscribers, 1000);
-BENCHMARK_RELATIVE_PARAM(notifyInlineObservers, 1000);
-
-int main(int argc, char** argv) {
-  gflags::ParseCommandLineFlags(&argc, &argv, true);
-  folly::runBenchmarks();
-  return 0;
-}
diff --git a/folly/wangle/rx/test/RxTest.cpp b/folly/wangle/rx/test/RxTest.cpp
deleted file mode 100644 (file)
index 012a8c2..0000000
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Copyright 2015 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <folly/wangle/rx/Observer.h>
-#include <folly/wangle/rx/Subject.h>
-#include <gtest/gtest.h>
-
-using namespace folly::wangle;
-
-static std::unique_ptr<Observer<int>> incrementer(int& counter) {
-  return Observer<int>::create([&] (int x) {
-    counter++;
-  });
-}
-
-TEST(RxTest, Observe) {
-  Subject<int> subject;
-  auto count = 0;
-  subject.observe(incrementer(count));
-  subject.onNext(1);
-  EXPECT_EQ(1, count);
-}
-
-TEST(RxTest, ObserveInline) {
-  Subject<int> subject;
-  auto count = 0;
-  auto o = incrementer(count).release();
-  subject.observe(o);
-  subject.onNext(1);
-  EXPECT_EQ(1, count);
-  delete o;
-}
-
-TEST(RxTest, Subscription) {
-  Subject<int> subject;
-  auto count = 0;
-  {
-    auto s = subject.subscribe(incrementer(count));
-    subject.onNext(1);
-  }
-  // The subscription has gone out of scope so no one should get this.
-  subject.onNext(2);
-  EXPECT_EQ(1, count);
-}
-
-TEST(RxTest, SubscriptionMove) {
-  Subject<int> subject;
-  auto count = 0;
-  auto s = subject.subscribe(incrementer(count));
-  auto s2 = subject.subscribe(incrementer(count));
-  s2 = std::move(s);
-  subject.onNext(1);
-  Subscription<int> s3(std::move(s2));
-  subject.onNext(2);
-  EXPECT_EQ(2, count);
-}
-
-TEST(RxTest, SubscriptionOutlivesSubject) {
-  Subscription<int> s;
-  {
-    Subject<int> subject;
-    s = subject.subscribe(Observer<int>::create([](int){}));
-  }
-  // Don't explode when s is destroyed
-}
-
-TEST(RxTest, SubscribeDuringCallback) {
-  // A subscriber who was subscribed in the course of a callback should get
-  // subsequent updates but not the current update.
-  Subject<int> subject;
-  int outerCount = 0, innerCount = 0;
-  Subscription<int> s1, s2;
-  s1 = subject.subscribe(Observer<int>::create([&] (int x) {
-    outerCount++;
-    s2 = subject.subscribe(incrementer(innerCount));
-  }));
-  subject.onNext(42);
-  subject.onNext(0xDEADBEEF);
-  EXPECT_EQ(2, outerCount);
-  EXPECT_EQ(1, innerCount);
-}
-
-TEST(RxTest, ObserveDuringCallback) {
-  Subject<int> subject;
-  int outerCount = 0, innerCount = 0;
-  subject.observe(Observer<int>::create([&] (int x) {
-    outerCount++;
-    subject.observe(incrementer(innerCount));
-  }));
-  subject.onNext(42);
-  subject.onNext(0xDEADBEEF);
-  EXPECT_EQ(2, outerCount);
-  EXPECT_EQ(1, innerCount);
-}
-
-TEST(RxTest, ObserveInlineDuringCallback) {
-  Subject<int> subject;
-  int outerCount = 0, innerCount = 0;
-  auto innerO = incrementer(innerCount).release();
-  auto outerO = Observer<int>::create([&] (int x) {
-    outerCount++;
-    subject.observe(innerO);
-  }).release();
-  subject.observe(outerO);
-  subject.onNext(42);
-  subject.onNext(0xDEADBEEF);
-  EXPECT_EQ(2, outerCount);
-  EXPECT_EQ(1, innerCount);
-  delete innerO;
-  delete outerO;
-}
-
-TEST(RxTest, UnsubscribeDuringCallback) {
-  // A subscriber who was unsubscribed in the course of a callback should get
-  // the current update but not subsequent ones
-  Subject<int> subject;
-  int count1 = 0, count2 = 0;
-  auto s1 = subject.subscribe(incrementer(count1));
-  auto s2 = subject.subscribe(Observer<int>::create([&] (int x) {
-    count2++;
-    s1.~Subscription();
-  }));
-  subject.onNext(1);
-  subject.onNext(2);
-  EXPECT_EQ(1, count1);
-  EXPECT_EQ(2, count2);
-}
-
-TEST(RxTest, SubscribeUnsubscribeDuringCallback) {
-  // A subscriber who was subscribed and unsubscribed in the course of a
-  // callback should not get any updates
-  Subject<int> subject;
-  int outerCount = 0, innerCount = 0;
-  auto s2 = subject.subscribe(Observer<int>::create([&] (int x) {
-    outerCount++;
-    auto s2 = subject.subscribe(incrementer(innerCount));
-  }));
-  subject.onNext(1);
-  subject.onNext(2);
-  EXPECT_EQ(2, outerCount);
-  EXPECT_EQ(0, innerCount);
-}
-
-// Move only type
-typedef std::unique_ptr<int> MO;
-static MO makeMO() { return folly::make_unique<int>(1); }
-template <typename T>
-static ObserverPtr<T> makeMOObserver() {
-  return Observer<T>::create([](const T& mo) {
-    EXPECT_EQ(1, *mo);
-  });
-}
-
-TEST(RxTest, MoveOnlyRvalue) {
-  Subject<MO> subject;
-  auto s1 = subject.subscribe(makeMOObserver<MO>());
-  auto s2 = subject.subscribe(makeMOObserver<MO>());
-  auto mo = makeMO();
-  // Can't bind lvalues to rvalue references
-  // subject.onNext(mo);
-  subject.onNext(std::move(mo));
-  subject.onNext(makeMO());
-}
-
-// Copy only type
-struct CO {
-  CO() = default;
-  CO(const CO&) = default;
-  CO(CO&&) = delete;
-};
-
-template <typename T>
-static ObserverPtr<T> makeCOObserver() {
-  return Observer<T>::create([](const T& mo) {});
-}
-
-TEST(RxTest, CopyOnly) {
-  Subject<CO> subject;
-  auto s1 = subject.subscribe(makeCOObserver<CO>());
-  CO co;
-  subject.onNext(co);
-}
diff --git a/folly/wangle/rx/types.h b/folly/wangle/rx/types.h
deleted file mode 100644 (file)
index 3bb540e..0000000
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2015 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/ExceptionWrapper.h>
-#include <folly/Executor.h>
-
-namespace folly { namespace wangle {
-  typedef folly::exception_wrapper Error;
-  // The Executor is basically an rx Scheduler (by design). So just
-  // alias it.
-  typedef std::shared_ptr<folly::Executor> SchedulerPtr;
-
-  template <class T, size_t InlineObservers = 3> class Observable;
-  template <class T> struct Observer;
-  template <class T> struct Subject;
-
-  template <class T> using ObservablePtr = std::shared_ptr<Observable<T>>;
-  template <class T> using ObserverPtr = std::shared_ptr<Observer<T>>;
-  template <class T> using SubjectPtr = std::shared_ptr<Subject<T>>;
-}}