From: James Sedgwick Date: Tue, 23 Sep 2014 13:12:18 +0000 (-0700) Subject: thread safety for Observable::observers_ X-Git-Tag: v0.22.0~335 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=8cf3e4f8aae989f00a8a53b72cfd89898b036a36;p=folly.git thread safety for Observable::observers_ Summary: this way we can subscribe to an observable and blast data through it simultaneously from different threads Test Plan: not much... the one client of rx compiles Reviewed By: davejwatson@fb.com Subscribers: fugalh, njormrod, bmatheny FB internal diff: D1560647 --- diff --git a/folly/experimental/wangle/rx/Observable.h b/folly/experimental/wangle/rx/Observable.h index 4769007f..f85d52a2 100644 --- a/folly/experimental/wangle/rx/Observable.h +++ b/folly/experimental/wangle/rx/Observable.h @@ -20,6 +20,8 @@ #include "Subject.h" #include "Subscription.h" +#include +#include #include #include #include @@ -27,15 +29,33 @@ namespace folly { namespace wangle { template -struct Observable { +class Observable { + public: + Observable() = default; + Observable(Observable&& other) noexcept { + RWSpinLock::WriteHolder{&other.observersLock_}; + observers_ = std::move(other.observers_); + } + virtual ~Observable() = default; /// Subscribe the given Observer to this Observable. // Eventually this will return a Subscription object of some kind, in order // to support cancellation. This is kinda really important. Maybe I should // just do it now, using an dummy Subscription object. + // + // If this is called within an Observer callback, the new observer will not + // get the current update but will get subsequent updates. virtual Subscription subscribe(ObserverPtr o) { - observers_.push_back(o); + if (inCallback_ && *inCallback_) { + if (!newObservers_) { + newObservers_.reset(new std::list>()); + } + newObservers_->push_back(o); + } else { + RWSpinLock::WriteHolder{&observersLock_}; + observers_.push_back(o); + } return Subscription(); } @@ -97,7 +117,48 @@ struct Observable { } protected: + const std::list>& getObservers() { + return observers_; + } + + // This guard manages deferred modification of the observers list. + // Subclasses should use this guard if they want to subscribe new observers + // in the course of a callback. New observers won't be added until the guard + // goes out of scope. See Subject for an example. + class ObserversGuard { + public: + explicit ObserversGuard(Observable* o) : o_(o) { + if (UNLIKELY(!o_->inCallback_)) { + o_->inCallback_.reset(new bool{false}); + } + CHECK(!(*o_->inCallback_)); + *o_->inCallback_ = true; + o_->observersLock_.lock_shared(); + } + + ~ObserversGuard() { + o_->observersLock_.unlock_shared(); + if (UNLIKELY(o_->newObservers_ && !o_->newObservers_->empty())) { + { + RWSpinLock::WriteHolder(o_->observersLock_); + for (auto& o : *(o_->newObservers_)) { + o_->observers_.push_back(o); + } + } + o_->newObservers_->clear(); + } + *o_->inCallback_ = false; + } + + private: + Observable* o_; + }; + + private: std::list> observers_; + RWSpinLock observersLock_; + folly::ThreadLocalPtr inCallback_; + folly::ThreadLocalPtr>> newObservers_; }; }} diff --git a/folly/experimental/wangle/rx/Subject.h b/folly/experimental/wangle/rx/Subject.h index 41b59c4e..7d4c7cb8 100644 --- a/folly/experimental/wangle/rx/Subject.h +++ b/folly/experimental/wangle/rx/Subject.h @@ -25,17 +25,24 @@ namespace folly { namespace wangle { /// observed events to the Subject's observers. template struct Subject : public Observable, public Observer { + typedef typename Observable::ObserversGuard ObserversGuard; void onNext(T val) override { - for (auto& o : this->observers_) + ObserversGuard guard(this); + for (auto& o : Observable::getObservers()) { o->onNext(val); + } } void onError(Error e) override { - for (auto& o : this->observers_) + ObserversGuard guard(this); + for (auto& o : Observable::getObservers()) { o->onError(e); + } } void onCompleted() override { - for (auto& o : this->observers_) + ObserversGuard guard(this); + for (auto& o : Observable::getObservers()) { o->onCompleted(); + } } }; diff --git a/folly/experimental/wangle/rx/test/RxTest.cpp b/folly/experimental/wangle/rx/test/RxTest.cpp new file mode 100644 index 00000000..cf4d9dd0 --- /dev/null +++ b/folly/experimental/wangle/rx/test/RxTest.cpp @@ -0,0 +1,39 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +using namespace folly::wangle; + +TEST(RxTest, SubscribeDuringCallback) { + // A subscriber who was subscribed in the course of a callback should get + // subsequent updates but not the current update. + Subject subject; + int outerCount = 0; + int innerCount = 0; + subject.subscribe(Observer::create([&] (int x) { + outerCount++; + subject.subscribe(Observer::create([&] (int y) { + innerCount++; + })); + })); + subject.onNext(42); + subject.onNext(0xDEADBEEF); + EXPECT_EQ(2, outerCount); + EXPECT_EQ(1, innerCount); +}