From: Andrii Grynenko Date: Thu, 5 May 2016 20:22:42 +0000 (-0700) Subject: Implement LoopKeepAlive for EventBase X-Git-Tag: 2016.07.26~274 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=2d2aed32cff14f35fd3163276a0475fcd682edf5;p=folly.git Implement LoopKeepAlive for EventBase Summary: LoopKeepAlive can be useful to tell EventBase, that loop() shouldn't return even if there are no events registered, because some tasks will later be added via runInEventBaseThread. While at least one LoopKeepAlive is alive - EventBase::loop() behaves like EventBase::loopForever(). Reviewed By: yfeldblum Differential Revision: D3261706 fb-gh-sync-id: d91424d3d12cae11abd72cffdbd57f136f628dae fbshipit-source-id: d91424d3d12cae11abd72cffdbd57f136f628dae --- diff --git a/folly/io/async/EventBase.cpp b/folly/io/async/EventBase.cpp index 6f7e9a27..5c0b7812 100644 --- a/folly/io/async/EventBase.cpp +++ b/folly/io/async/EventBase.cpp @@ -319,6 +319,7 @@ bool EventBase::loopBody(int flags) { } while (!stop_.load(std::memory_order_acquire)) { + applyLoopKeepAlive(); ++nextLoopCnt_; // Run the before loop callbacks @@ -425,18 +426,34 @@ bool EventBase::loopBody(int flags) { return true; } -void EventBase::loopForever() { - // Update the notification queue event to treat it as a normal (non-internal) - // event. The notification queue event always remains installed, and the main - // loop won't exit with it installed. - fnRunner_->stopConsuming(); - fnRunner_->startConsuming(this, queue_.get()); - - bool ret = loop(); +void EventBase::applyLoopKeepAlive() { + if (loopKeepAliveActive_ && loopKeepAlive_.unique()) { + // Restore the notification queue internal flag + fnRunner_->stopConsuming(); + fnRunner_->startConsumingInternal(this, queue_.get()); + loopKeepAliveActive_ = false; + } else if (!loopKeepAliveActive_ && !loopKeepAlive_.unique()) { + // Update the notification queue event to treat it as a normal + // (non-internal) event. The notification queue event always remains + // installed, and the main loop won't exit with it installed. + fnRunner_->stopConsuming(); + fnRunner_->startConsuming(this, queue_.get()); + loopKeepAliveActive_ = true; + } +} - // Restore the notification queue internal flag - fnRunner_->stopConsuming(); - fnRunner_->startConsumingInternal(this, queue_.get()); +void EventBase::loopForever() { + bool ret; + { + SCOPE_EXIT { + applyLoopKeepAlive(); + loopForeverActive_ = false; + }; + loopForeverActive_ = true; + // Make sure notification queue events are treated as normal events. + auto loopKeepAlive = loopKeepAlive_; + ret = loop(); + } if (!ret) { folly::throwSystemError("error in EventBase::loopForever()"); diff --git a/folly/io/async/EventBase.h b/folly/io/async/EventBase.h index e678781e..5999687d 100644 --- a/folly/io/async/EventBase.h +++ b/folly/io/async/EventBase.h @@ -586,6 +586,22 @@ class EventBase : private boost::noncopyable, loopOnce(); } + using LoopKeepAlive = std::shared_ptr; + + /// Returns you a handle which make loop() behave like loopForever() until + /// destroyed. loop() will return to its original behavior only when all + /// loop keep-alives are released. Loop holder is safe to release only from + /// EventBase thread. + /// + /// May return no op LoopKeepAlive if loopForever() is already running. + LoopKeepAlive loopKeepAlive() { + if (loopForeverActive_) { + return nullptr; + } else { + return loopKeepAlive_; + } + } + private: // TimeoutManager void attachTimeoutManager(AsyncTimeout* obj, @@ -602,6 +618,8 @@ class EventBase : private boost::noncopyable, return isInEventBaseThread(); } + void applyLoopKeepAlive(); + /* * Helper function that tells us whether we have already handled * some event/timeout/callback in this loop iteration. @@ -673,6 +691,9 @@ class EventBase : private boost::noncopyable, // to send function requests to the EventBase thread. std::unique_ptr> queue_; std::unique_ptr fnRunner_; + LoopKeepAlive loopKeepAlive_{std::make_shared(42)}; + bool loopKeepAliveActive_{false}; + std::atomic loopForeverActive_{false}; // limit for latency in microseconds (0 disables) int64_t maxLatency_; diff --git a/folly/io/async/test/EventBaseTest.cpp b/folly/io/async/test/EventBaseTest.cpp index 8a456065..7cb49a4d 100644 --- a/folly/io/async/test/EventBaseTest.cpp +++ b/folly/io/async/test/EventBaseTest.cpp @@ -1728,3 +1728,41 @@ TEST(EventBaseTest, RunCallbacksOnDestruction) { ASSERT_TRUE(ran); } + +TEST(EventBaseTest, LoopKeepAlive) { + EventBase evb; + + bool done = false; + std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ] { + /* sleep override */ std::this_thread::sleep_for( + std::chrono::milliseconds(100)); + evb.runInEventBaseThread([&] { done = true; }); + }); + + evb.loop(); + + ASSERT_TRUE(done); + + t.join(); +} + +TEST(EventBaseTest, LoopKeepAliveInLoop) { + EventBase evb; + + bool done = false; + std::thread t; + + evb.runInEventBaseThread([&] { + t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ] { + /* sleep override */ std::this_thread::sleep_for( + std::chrono::milliseconds(100)); + evb.runInEventBaseThread([&] { done = true; }); + }); + }); + + evb.loop(); + + ASSERT_TRUE(done); + + t.join(); +}