// Keep looping until all keep-alive handles are released. Each keep-alive
// handle signals that some external code will still schedule some work on
// this EventBase (so it's not safe to destroy it).
- while (!loopKeepAlive_.unique()) {
+ while (loopKeepAliveCount_ > 0) {
applyLoopKeepAlive();
loopOnce();
}
}
void EventBase::applyLoopKeepAlive() {
- if (loopKeepAliveActive_ && loopKeepAlive_.unique()) {
+ if (loopKeepAliveActive_ && loopKeepAliveCount_ == 0) {
// Restore the notification queue internal flag
fnRunner_->stopConsuming();
fnRunner_->startConsumingInternal(this, queue_.get());
loopKeepAliveActive_ = false;
- } else if (!loopKeepAliveActive_ && !loopKeepAlive_.unique()) {
+ } else if (!loopKeepAliveActive_ && loopKeepAliveCount_ > 0) {
// Update the notification queue event to treat it as a normal
// (non-internal) event. The notification queue event always remains
// installed, and the main loop won't exit with it installed.
{
SCOPE_EXIT {
applyLoopKeepAlive();
- loopForeverActive_ = false;
};
- loopForeverActive_ = true;
// Make sure notification queue events are treated as normal events.
- auto loopKeepAlive = loopKeepAlive_;
+ auto keepAlive = loopKeepAlive();
ret = loop();
}
loopOnce();
}
- using LoopKeepAlive = std::shared_ptr<void>;
+ struct LoopKeepAliveDeleter {
+ void operator()(EventBase* evb) {
+ DCHECK(evb->isInEventBaseThread());
+ evb->loopKeepAliveCount_--;
+ }
+ };
+ using LoopKeepAlive = std::unique_ptr<EventBase, LoopKeepAliveDeleter>;
/// Returns you a handle which make loop() behave like loopForever() until
/// destroyed. loop() will return to its original behavior only when all
///
/// May return no op LoopKeepAlive if loopForever() is already running.
LoopKeepAlive loopKeepAlive() {
- if (loopForeverActive_) {
- return nullptr;
- } else {
- return loopKeepAlive_;
- }
+ DCHECK(isInEventBaseThread());
+ loopKeepAliveCount_++;
+ return LoopKeepAlive(this);
}
private:
// to send function requests to the EventBase thread.
std::unique_ptr<NotificationQueue<Func>> queue_;
std::unique_ptr<FunctionRunner> fnRunner_;
- LoopKeepAlive loopKeepAlive_{std::make_shared<int>(42)};
+ size_t loopKeepAliveCount_{0};
bool loopKeepAliveActive_{false};
- std::atomic<bool> loopForeverActive_{false};
// limit for latency in microseconds (0 disables)
int64_t maxLatency_;
EventBase evb;
bool done = false;
- std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ] {
+ std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
/* sleep override */ std::this_thread::sleep_for(
std::chrono::milliseconds(100));
evb.runInEventBaseThread(
std::thread t;
evb.runInEventBaseThread([&] {
- t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ] {
+ t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
/* sleep override */ std::this_thread::sleep_for(
std::chrono::milliseconds(100));
evb.runInEventBaseThread(
t.join();
}
+TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
+ std::unique_ptr<EventBase> evb = folly::make_unique<EventBase>();
+
+ bool done = false;
+
+ std::thread evThread([&] {
+ evb->loopForever();
+ evb.reset();
+ done = true;
+ });
+
+ {
+ auto* ev = evb.get();
+ EventBase::LoopKeepAlive keepAlive;
+ ev->runInEventBaseThreadAndWait(
+ [&ev, &keepAlive] { keepAlive = ev->loopKeepAlive(); });
+ ASSERT_FALSE(done) << "Loop finished before we asked it to";
+ ev->terminateLoopSoon();
+ /* sleep override */
+ std::this_thread::sleep_for(std::chrono::milliseconds(30));
+ ASSERT_FALSE(done) << "Loop terminated early";
+ ev->runInEventBaseThread([&ev, keepAlive = std::move(keepAlive) ]{});
+ }
+
+ evThread.join();
+ ASSERT_TRUE(done);
+}
+
TEST(EventBaseTest, LoopKeepAliveShutdown) {
auto evb = folly::make_unique<EventBase>();
bool done = false;
- std::thread t(
- [&done, loopKeepAlive = evb->loopKeepAlive(), evbPtr = evb.get() ] {
- /* sleep override */ std::this_thread::sleep_for(
- std::chrono::milliseconds(100));
- evbPtr->runInEventBaseThread(
- [&done, loopKeepAlive = std::move(loopKeepAlive) ] {
- done = true;
- });
- });
+ std::thread t([
+ &done,
+ loopKeepAlive = evb->loopKeepAlive(),
+ evbPtr = evb.get()
+ ]() mutable {
+ /* sleep override */ std::this_thread::sleep_for(
+ std::chrono::milliseconds(100));
+ evbPtr->runInEventBaseThread(
+ [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
+ });
evb.reset();