}
while (!stop_.load(std::memory_order_acquire)) {
+ applyLoopKeepAlive();
++nextLoopCnt_;
// Run the before loop callbacks
return true;
}
-void EventBase::loopForever() {
- // Update the notification queue event to treat it as a normal (non-internal)
- // event. The notification queue event always remains installed, and the main
- // loop won't exit with it installed.
- fnRunner_->stopConsuming();
- fnRunner_->startConsuming(this, queue_.get());
-
- bool ret = loop();
+void EventBase::applyLoopKeepAlive() {
+ if (loopKeepAliveActive_ && loopKeepAlive_.unique()) {
+ // Restore the notification queue internal flag
+ fnRunner_->stopConsuming();
+ fnRunner_->startConsumingInternal(this, queue_.get());
+ loopKeepAliveActive_ = false;
+ } else if (!loopKeepAliveActive_ && !loopKeepAlive_.unique()) {
+ // Update the notification queue event to treat it as a normal
+ // (non-internal) event. The notification queue event always remains
+ // installed, and the main loop won't exit with it installed.
+ fnRunner_->stopConsuming();
+ fnRunner_->startConsuming(this, queue_.get());
+ loopKeepAliveActive_ = true;
+ }
+}
- // Restore the notification queue internal flag
- fnRunner_->stopConsuming();
- fnRunner_->startConsumingInternal(this, queue_.get());
+void EventBase::loopForever() {
+ bool ret;
+ {
+ SCOPE_EXIT {
+ applyLoopKeepAlive();
+ loopForeverActive_ = false;
+ };
+ loopForeverActive_ = true;
+ // Make sure notification queue events are treated as normal events.
+ auto loopKeepAlive = loopKeepAlive_;
+ ret = loop();
+ }
if (!ret) {
folly::throwSystemError("error in EventBase::loopForever()");
loopOnce();
}
+ using LoopKeepAlive = std::shared_ptr<void>;
+
+ /// Returns you a handle which make loop() behave like loopForever() until
+ /// destroyed. loop() will return to its original behavior only when all
+ /// loop keep-alives are released. Loop holder is safe to release only from
+ /// EventBase thread.
+ ///
+ /// May return no op LoopKeepAlive if loopForever() is already running.
+ LoopKeepAlive loopKeepAlive() {
+ if (loopForeverActive_) {
+ return nullptr;
+ } else {
+ return loopKeepAlive_;
+ }
+ }
+
private:
// TimeoutManager
void attachTimeoutManager(AsyncTimeout* obj,
return isInEventBaseThread();
}
+ void applyLoopKeepAlive();
+
/*
* Helper function that tells us whether we have already handled
* some event/timeout/callback in this loop iteration.
// to send function requests to the EventBase thread.
std::unique_ptr<NotificationQueue<Func>> queue_;
std::unique_ptr<FunctionRunner> fnRunner_;
+ LoopKeepAlive loopKeepAlive_{std::make_shared<int>(42)};
+ bool loopKeepAliveActive_{false};
+ std::atomic<bool> loopForeverActive_{false};
// limit for latency in microseconds (0 disables)
int64_t maxLatency_;
ASSERT_TRUE(ran);
}
+
+TEST(EventBaseTest, LoopKeepAlive) {
+ EventBase evb;
+
+ bool done = false;
+ std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ] {
+ /* sleep override */ std::this_thread::sleep_for(
+ std::chrono::milliseconds(100));
+ evb.runInEventBaseThread([&] { done = true; });
+ });
+
+ evb.loop();
+
+ ASSERT_TRUE(done);
+
+ t.join();
+}
+
+TEST(EventBaseTest, LoopKeepAliveInLoop) {
+ EventBase evb;
+
+ bool done = false;
+ std::thread t;
+
+ evb.runInEventBaseThread([&] {
+ t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ] {
+ /* sleep override */ std::this_thread::sleep_for(
+ std::chrono::milliseconds(100));
+ evb.runInEventBaseThread([&] { done = true; });
+ });
+ });
+
+ evb.loop();
+
+ ASSERT_TRUE(done);
+
+ t.join();
+}