Indefinite observers and inline observers
authorJames Sedgwick <jsedgwick@fb.com>
Mon, 17 Nov 2014 23:26:00 +0000 (15:26 -0800)
committerDave Watson <davejwatson@fb.com>
Wed, 19 Nov 2014 20:52:39 +0000 (12:52 -0800)
Summary:
provides new APIs observe(ObserverPtr<T>) and observe(Observer<T>*) that
are not associated with a subscription. The latter method avoids allocations at the cost of disowning responsiblity for object lifetimes.

It could be used for batch-allocating filter chains, for instance. I hacked up some recursive templates that would make that easy to do, but
haven't included them. I'd rather see the first use case and then decide whether to let users set them up themselves or to provide an
abstraction that makes it easier.

Test Plan: unit

Reviewed By: davejwatson@fb.com

Subscribers: folly-diffs@, fugalh, njormrod

FB internal diff: D1595200

Tasks: 5002278

Signature: t1:1595200:1416266578:95c0532af42443fd83e2f94d30790c3c0831ce65

folly/experimental/wangle/rx/Observable.h
folly/experimental/wangle/rx/Subject.h
folly/experimental/wangle/rx/Subscription.h
folly/experimental/wangle/rx/test/RxBenchmark.cpp [new file with mode: 0644]
folly/experimental/wangle/rx/test/RxTest.cpp
folly/experimental/wangle/rx/types.h

index c4bc70da85214acbeb78afc1841e37e96e5fca8f..c822bb710c993ba3e0a39e2473dd3be3c8538c58 100644 (file)
 #include <folly/RWSpinLock.h>
 #include <folly/SmallLocks.h>
 #include <folly/ThreadLocal.h>
+#include <folly/small_vector.h>
 #include <folly/wangle/Executor.h>
 #include <map>
 #include <memory>
 
 namespace folly { namespace wangle {
 
-template <class T>
+template <class T, size_t InlineObservers>
 class Observable {
  public:
   Observable() : nextSubscriptionId_{1} {}
@@ -43,27 +44,44 @@ class Observable {
     }
   }
 
-  typedef typename std::map<uint64_t, ObserverPtr<T>> ObserverMap;
-
-  // Subscribe the given Observer to this Observable.
+  // The next three methods subscribe the given Observer to this Observable.
   //
-  // If this is called within an Observer callback, the new observer will not
+  // If these are called within an Observer callback, the new observer will not
   // get the current update but will get subsequent updates.
+  //
+  // subscribe() returns a Subscription object. The observer will continue to
+  // get updates until the Subscription is destroyed.
+  //
+  // observe(ObserverPtr<T>) creates an indefinite subscription
+  //
+  // observe(Observer<T>*) also creates an indefinite subscription, but the
+  // caller is responsible for ensuring that the given Observer outlives this
+  // Observable. This might be useful in high performance environments where
+  // allocations must be kept to a minimum. Template parameter InlineObservers
+  // specifies how many observers can been subscribed inline without any
+  // allocations (it's just the size of a folly::small_vector).
   virtual Subscription<T> subscribe(ObserverPtr<T> observer) {
-    auto subscription = makeSubscription();
-    typename ObserverMap::value_type kv{subscription.id_, std::move(observer)};
+    return subscribeImpl(observer, false);
+  }
+
+  virtual void observe(ObserverPtr<T> observer) {
+    subscribeImpl(observer, true);
+  }
+
+  virtual void observe(Observer<T>* observer) {
     if (inCallback_ && *inCallback_) {
       if (!newObservers_) {
-        newObservers_.reset(new ObserverMap());
+        newObservers_.reset(new ObserverList());
       }
-      newObservers_->insert(std::move(kv));
+      newObservers_->push_back(observer);
     } else {
       RWSpinLock::WriteHolder{&observersLock_};
-      observers_.insert(std::move(kv));
+      observers_.push_back(observer);
     }
-    return subscription;
   }
 
+  // TODO unobserve(ObserverPtr<T>), unobserve(Observer<T>*)
+
   /// Returns a new Observable that will call back on the given Scheduler.
   /// The returned Observable must outlive the parent Observable.
 
@@ -122,53 +140,71 @@ class Observable {
   }
 
  protected:
-  const ObserverMap& getObservers() {
-    return observers_;
-  }
+  // Safely execute an operation on each observer. F must take a single
+  // Observer<T>* as its argument.
+  template <class F>
+  void forEachObserver(F f) {
+    if (UNLIKELY(!inCallback_)) {
+      inCallback_.reset(new bool{false});
+    }
+    CHECK(!(*inCallback_));
+    *inCallback_ = true;
 
-  // This guard manages deferred modification of the observers list.
-  // Subclasses should use this guard if they want to subscribe new observers
-  // in the course of a callback. New observers won't be added until the guard
-  // goes out of scope. See Subject for an example.
-  class ObserversGuard {
-   public:
-    explicit ObserversGuard(Observable* o) : o_(o) {
-      if (UNLIKELY(!o_->inCallback_)) {
-        o_->inCallback_.reset(new bool{false});
+    {
+      RWSpinLock::ReadHolder(observersLock_);
+      for (auto o : observers_) {
+        f(o);
+      }
+
+      for (auto& kv : subscribers_) {
+        f(kv.second.get());
       }
-      CHECK(!(*o_->inCallback_));
-      *o_->inCallback_ = true;
-      o_->observersLock_.lock_shared();
     }
 
-    ~ObserversGuard() {
-      o_->observersLock_.unlock_shared();
-      if (UNLIKELY((o_->newObservers_ && !o_->newObservers_->empty()) ||
-                   (o_->oldObservers_ && !o_->oldObservers_->empty()))) {
-        {
-          RWSpinLock::WriteHolder(o_->observersLock_);
-          if (o_->newObservers_) {
-            for (auto& kv : *(o_->newObservers_)) {
-              o_->observers_.insert(std::move(kv));
-            }
-            o_->newObservers_->clear();
+    if (UNLIKELY((newObservers_ && !newObservers_->empty()) ||
+                 (newSubscribers_ && !newSubscribers_->empty()) ||
+                 (oldSubscribers_ && !oldSubscribers_->empty()))) {
+      {
+        RWSpinLock::WriteHolder(observersLock_);
+        if (newObservers_) {
+          for (auto observer : *(newObservers_)) {
+            observers_.push_back(observer);
           }
-          if (o_->oldObservers_) {
-            for (auto id : *(o_->oldObservers_)) {
-              o_->observers_.erase(id);
-            }
-            o_->oldObservers_->clear();
+          newObservers_->clear();
+        }
+        if (newSubscribers_) {
+          for (auto& kv : *(newSubscribers_)) {
+            subscribers_.insert(std::move(kv));
           }
+          newSubscribers_->clear();
+        }
+        if (oldSubscribers_) {
+          for (auto id : *(oldSubscribers_)) {
+            subscribers_.erase(id);
+          }
+          oldSubscribers_->clear();
         }
       }
-      *o_->inCallback_ = false;
     }
-
-   private:
-    Observable* o_;
-  };
+    *inCallback_ = false;
+  }
 
  private:
+  Subscription<T> subscribeImpl(ObserverPtr<T> observer, bool indefinite) {
+    auto subscription = makeSubscription(indefinite);
+    typename SubscriberMap::value_type kv{subscription.id_, std::move(observer)};
+    if (inCallback_ && *inCallback_) {
+      if (!newSubscribers_) {
+        newSubscribers_.reset(new SubscriberMap());
+      }
+      newSubscribers_->insert(std::move(kv));
+    } else {
+      RWSpinLock::WriteHolder{&observersLock_};
+      subscribers_.insert(std::move(kv));
+    }
+    return subscription;
+  }
+
   class Unsubscriber {
    public:
     explicit Unsubscriber(Observable* observable) : observable_(observable) {
@@ -200,39 +236,49 @@ class Observable {
 
   void unsubscribe(uint64_t id) {
     if (inCallback_ && *inCallback_) {
-      if (!oldObservers_) {
-        oldObservers_.reset(new std::vector<uint64_t>());
+      if (!oldSubscribers_) {
+        oldSubscribers_.reset(new std::vector<uint64_t>());
       }
-      if (newObservers_) {
-        auto it = newObservers_->find(id);
-        if (it != newObservers_->end()) {
-          newObservers_->erase(it);
+      if (newSubscribers_) {
+        auto it = newSubscribers_->find(id);
+        if (it != newSubscribers_->end()) {
+          newSubscribers_->erase(it);
           return;
         }
       }
-      oldObservers_->push_back(id);
+      oldSubscribers_->push_back(id);
     } else {
       RWSpinLock::WriteHolder{&observersLock_};
-      observers_.erase(id);
+      subscribers_.erase(id);
     }
   }
 
-  Subscription<T> makeSubscription() {
-    if (!unsubscriber_) {
-      std::lock_guard<MicroSpinLock> guard(unsubscriberLock_);
+  Subscription<T> makeSubscription(bool indefinite) {
+    if (indefinite) {
+      return Subscription<T>(nullptr, nextSubscriptionId_++);
+    } else {
       if (!unsubscriber_) {
-        unsubscriber_ = std::make_shared<Unsubscriber>(this);
+        std::lock_guard<MicroSpinLock> guard(unsubscriberLock_);
+        if (!unsubscriber_) {
+          unsubscriber_ = std::make_shared<Unsubscriber>(this);
+        }
       }
+      return Subscription<T>(unsubscriber_, nextSubscriptionId_++);
     }
-    return Subscription<T>(unsubscriber_, nextSubscriptionId_++);
   }
 
   std::atomic<uint64_t> nextSubscriptionId_;
-  ObserverMap observers_;
   RWSpinLock observersLock_;
   folly::ThreadLocalPtr<bool> inCallback_;
-  folly::ThreadLocalPtr<ObserverMap> newObservers_;
-  folly::ThreadLocalPtr<std::vector<uint64_t>> oldObservers_;
+
+  typedef folly::small_vector<Observer<T>*, InlineObservers> ObserverList;
+  ObserverList observers_;
+  folly::ThreadLocalPtr<ObserverList> newObservers_;
+
+  typedef std::map<uint64_t, ObserverPtr<T>> SubscriberMap;
+  SubscriberMap subscribers_;
+  folly::ThreadLocalPtr<SubscriberMap> newSubscribers_;
+  folly::ThreadLocalPtr<std::vector<uint64_t>> oldSubscribers_;
 };
 
 }}
index 0cb357fe9cbdd56df71ed73acea74b45a5dfb63b..6ff04c0e6be72786d3f6879039e9a3f80fdbfd3f 100644 (file)
@@ -26,24 +26,20 @@ namespace folly { namespace wangle {
 /// observed events to the Subject's observers.
 template <class T>
 struct Subject : public Observable<T>, public Observer<T> {
-  typedef typename Observable<T>::ObserversGuard ObserversGuard;
   void onNext(const T& val) override {
-    ObserversGuard guard(this);
-    for (auto& kv : Observable<T>::getObservers()) {
-      kv.second->onNext(val);
-    }
+    this->forEachObserver([&](Observer<T>* o){
+      o->onNext(val);
+    });
   }
   void onError(Error e) override {
-    ObserversGuard guard(this);
-    for (auto& kv : Observable<T>::getObservers()) {
-      kv.second->onError(e);
-    }
+    this->forEachObserver([&](Observer<T>* o){
+      o->onError(e);
+    });
   }
   void onCompleted() override {
-    ObserversGuard guard(this);
-    for (auto& kv : Observable<T>::getObservers()) {
-      kv.second->onCompleted();
-    }
+    this->forEachObserver([](Observer<T>* o){
+      o->onCompleted();
+    });
   }
 };
 
index 0cf667e6445b1ad620d2f854cd5183ebef246e9e..7c058e23de2c946471d0cfa3165368a4eaaf90cc 100644 (file)
@@ -49,7 +49,6 @@ class Subscription {
 
   Subscription(std::shared_ptr<Unsubscriber> unsubscriber, uint64_t id)
     : unsubscriber_(std::move(unsubscriber)), id_(id) {
-    CHECK(unsubscriber_);
     CHECK(id_ > 0);
   }
 
diff --git a/folly/experimental/wangle/rx/test/RxBenchmark.cpp b/folly/experimental/wangle/rx/test/RxBenchmark.cpp
new file mode 100644 (file)
index 0000000..5a14bbc
--- /dev/null
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/Benchmark.h>
+#include <folly/experimental/wangle/rx/Observer.h>
+#include <folly/experimental/wangle/rx/Subject.h>
+#include <gflags/gflags.h>
+
+using namespace folly::wangle;
+using folly::BenchmarkSuspender;
+
+static std::unique_ptr<Observer<int>> makeObserver() {
+  return Observer<int>::create([&] (int x) {});
+}
+
+void subscribeImpl(uint iters, int N, bool countUnsubscribe) {
+  for (int iter = 0; iter < iters; iter++) {
+    BenchmarkSuspender bs;
+    Subject<int> subject;
+    std::vector<std::unique_ptr<Observer<int>>> observers;
+    std::vector<Subscription<int>> subscriptions;
+    subscriptions.reserve(N);
+    for (int i = 0; i < N; i++) {
+      observers.push_back(makeObserver());
+    }
+    bs.dismiss();
+    for (int i = 0; i < N; i++) {
+      subscriptions.push_back(subject.subscribe(std::move(observers[i])));
+    }
+    if (countUnsubscribe) {
+      subscriptions.clear();
+    }
+    bs.rehire();
+  }
+}
+
+void subscribeAndUnsubscribe(uint iters, int N) {
+  subscribeImpl(iters, N, true);
+}
+
+void subscribe(uint iters, int N) {
+  subscribeImpl(iters, N, false);
+}
+
+void observe(uint iters, int N) {
+  for (int iter = 0; iter < iters; iter++) {
+    BenchmarkSuspender bs;
+    Subject<int> subject;
+    std::vector<std::unique_ptr<Observer<int>>> observers;
+    for (int i = 0; i < N; i++) {
+      observers.push_back(makeObserver());
+    }
+    bs.dismiss();
+    for (int i = 0; i < N; i++) {
+      subject.observe(std::move(observers[i]));
+    }
+    bs.rehire();
+  }
+}
+
+void inlineObserve(uint iters, int N) {
+  for (int iter = 0; iter < iters; iter++) {
+    BenchmarkSuspender bs;
+    Subject<int> subject;
+    std::vector<Observer<int>*> observers;
+    for (int i = 0; i < N; i++) {
+      observers.push_back(makeObserver().release());
+    }
+    bs.dismiss();
+    for (int i = 0; i < N; i++) {
+      subject.observe(observers[i]);
+    }
+    bs.rehire();
+    for (int i = 0; i < N; i++) {
+      delete observers[i];
+    }
+  }
+}
+
+void notifySubscribers(uint iters, int N) {
+  for (int iter = 0; iter < iters; iter++) {
+    BenchmarkSuspender bs;
+    Subject<int> subject;
+    std::vector<std::unique_ptr<Observer<int>>> observers;
+    std::vector<Subscription<int>> subscriptions;
+    subscriptions.reserve(N);
+    for (int i = 0; i < N; i++) {
+      observers.push_back(makeObserver());
+    }
+    for (int i = 0; i < N; i++) {
+      subscriptions.push_back(subject.subscribe(std::move(observers[i])));
+    }
+    bs.dismiss();
+    subject.onNext(42);
+    bs.rehire();
+  }
+}
+
+void notifyInlineObservers(uint iters, int N) {
+  for (int iter = 0; iter < iters; iter++) {
+    BenchmarkSuspender bs;
+    Subject<int> subject;
+    std::vector<Observer<int>*> observers;
+    for (int i = 0; i < N; i++) {
+      observers.push_back(makeObserver().release());
+    }
+    for (int i = 0; i < N; i++) {
+      subject.observe(observers[i]);
+    }
+    bs.dismiss();
+    subject.onNext(42);
+    bs.rehire();
+  }
+}
+
+BENCHMARK_PARAM(subscribeAndUnsubscribe, 1);
+BENCHMARK_RELATIVE_PARAM(subscribe, 1);
+BENCHMARK_RELATIVE_PARAM(observe, 1);
+BENCHMARK_RELATIVE_PARAM(inlineObserve, 1);
+
+BENCHMARK_DRAW_LINE();
+
+BENCHMARK_PARAM(subscribeAndUnsubscribe, 1000);
+BENCHMARK_RELATIVE_PARAM(subscribe, 1000);
+BENCHMARK_RELATIVE_PARAM(observe, 1000);
+BENCHMARK_RELATIVE_PARAM(inlineObserve, 1000);
+
+BENCHMARK_DRAW_LINE();
+
+BENCHMARK_PARAM(notifySubscribers, 1);
+BENCHMARK_RELATIVE_PARAM(notifyInlineObservers, 1);
+
+BENCHMARK_DRAW_LINE();
+
+BENCHMARK_PARAM(notifySubscribers, 1000);
+BENCHMARK_RELATIVE_PARAM(notifyInlineObservers, 1000);
+
+int main(int argc, char** argv) {
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+  folly::runBenchmarks();
+  return 0;
+}
index ab44657a2313b9c7cc51df898750d1adbaef0573..8cf2605d03a333995e68692e224162f9bd5e8175 100644 (file)
@@ -26,6 +26,24 @@ static std::unique_ptr<Observer<int>> incrementer(int& counter) {
   });
 }
 
+TEST(RxTest, Observe) {
+  Subject<int> subject;
+  auto count = 0;
+  subject.observe(incrementer(count));
+  subject.onNext(1);
+  EXPECT_EQ(1, count);
+}
+
+TEST(RxTest, ObserveInline) {
+  Subject<int> subject;
+  auto count = 0;
+  auto o = incrementer(count).release();
+  subject.observe(o);
+  subject.onNext(1);
+  EXPECT_EQ(1, count);
+  delete o;
+}
+
 TEST(RxTest, Subscription) {
   Subject<int> subject;
   auto count = 0;
@@ -75,6 +93,36 @@ TEST(RxTest, SubscribeDuringCallback) {
   EXPECT_EQ(1, innerCount);
 }
 
+TEST(RxTest, ObserveDuringCallback) {
+  Subject<int> subject;
+  int outerCount = 0, innerCount = 0;
+  subject.observe(Observer<int>::create([&] (int x) {
+    outerCount++;
+    subject.observe(incrementer(innerCount));
+  }));
+  subject.onNext(42);
+  subject.onNext(0xDEADBEEF);
+  EXPECT_EQ(2, outerCount);
+  EXPECT_EQ(1, innerCount);
+}
+
+TEST(RxTest, ObserveInlineDuringCallback) {
+  Subject<int> subject;
+  int outerCount = 0, innerCount = 0;
+  auto innerO = incrementer(innerCount).release();
+  auto outerO = Observer<int>::create([&] (int x) {
+    outerCount++;
+    subject.observe(innerO);
+  }).release();
+  subject.observe(outerO);
+  subject.onNext(42);
+  subject.onNext(0xDEADBEEF);
+  EXPECT_EQ(2, outerCount);
+  EXPECT_EQ(1, innerCount);
+  delete innerO;
+  delete outerO;
+}
+
 TEST(RxTest, UnsubscribeDuringCallback) {
   // A subscriber who was unsubscribed in the course of a callback should get
   // the current update but not subsequent ones
index 317fac147f709bcd19cb7285a5a345a4f93b50e0..0f10c1cb712496f529e8ba13d4f6d0d782defcbb 100644 (file)
@@ -25,7 +25,7 @@ namespace folly { namespace wangle {
   // alias it.
   typedef std::shared_ptr<folly::wangle::Executor> SchedulerPtr;
 
-  template <class T> struct Observable;
+  template <class T, size_t InlineObservers = 3> struct Observable;
   template <class T> struct Observer;
   template <class T> struct Subject;