// Keep looping until all keep-alive handles are released. Each keep-alive
// handle signals that some external code will still schedule some work on
// this EventBase (so it's not safe to destroy it).
- while (loopKeepAliveCount_ > 0) {
+ while (loopKeepAliveCount() > 0) {
applyLoopKeepAlive();
loopOnce();
}
return true;
}
+ssize_t EventBase::loopKeepAliveCount() {
+ if (loopKeepAliveCountAtomic_.load(std::memory_order_relaxed)) {
+ loopKeepAliveCount_ +=
+ loopKeepAliveCountAtomic_.exchange(0, std::memory_order_relaxed);
+ }
+ DCHECK_GE(loopKeepAliveCount_, 0);
+ return loopKeepAliveCount_;
+}
+
void EventBase::applyLoopKeepAlive() {
- if (loopKeepAliveActive_ && loopKeepAliveCount_ == 0) {
+ if (loopKeepAliveActive_ && loopKeepAliveCount() == 0) {
// Restore the notification queue internal flag
fnRunner_->stopConsuming();
fnRunner_->startConsumingInternal(this, queue_.get());
loopKeepAliveActive_ = false;
- } else if (!loopKeepAliveActive_ && loopKeepAliveCount_ > 0) {
+ } else if (!loopKeepAliveActive_ && loopKeepAliveCount() > 0) {
// Update the notification queue event to treat it as a normal
// (non-internal) event. The notification queue event always remains
// installed, and the main loop won't exit with it installed.
applyLoopKeepAlive();
};
// Make sure notification queue events are treated as normal events.
- auto keepAlive = loopKeepAlive();
+ // We can't use loopKeepAlive() here since LoopKeepAlive token can only be
+ // released inside a loop.
+ ++loopKeepAliveCount_;
+ SCOPE_EXIT {
+ --loopKeepAliveCount_;
+ };
ret = loop();
}
#include <folly/Executor.h>
#include <folly/Function.h>
#include <folly/Portability.h>
+#include <folly/ScopeGuard.h>
#include <folly/experimental/ExecutionObserver.h>
#include <folly/futures/DrivableExecutor.h>
#include <folly/io/async/AsyncTimeout.h>
/// Implements the DrivableExecutor interface
void drive() override {
- auto keepAlive = loopKeepAlive();
+ // We can't use loopKeepAlive() here since LoopKeepAlive token can only be
+ // released inside a loop.
+ ++loopKeepAliveCount_;
+ SCOPE_EXIT {
+ --loopKeepAliveCount_;
+ };
loopOnce();
}
return LoopKeepAlive(this);
}
+ // Thread-safe version of loopKeepAlive()
+ LoopKeepAlive loopKeepAliveAtomic() {
+ if (inRunningEventBaseThread()) {
+ return loopKeepAlive();
+ }
+ loopKeepAliveCountAtomic_.fetch_add(1, std::memory_order_relaxed);
+ return LoopKeepAlive(this);
+ }
+
// TimeoutManager
void attachTimeoutManager(
AsyncTimeout* obj,
private:
void applyLoopKeepAlive();
+ ssize_t loopKeepAliveCount();
+
/*
* Helper function that tells us whether we have already handled
* some event/timeout/callback in this loop iteration.
// to send function requests to the EventBase thread.
std::unique_ptr<NotificationQueue<Func>> queue_;
std::unique_ptr<FunctionRunner> fnRunner_;
- size_t loopKeepAliveCount_{0};
+ ssize_t loopKeepAliveCount_{0};
+ std::atomic<ssize_t> loopKeepAliveCountAtomic_{0};
bool loopKeepAliveActive_{false};
// limit for latency in microseconds (0 disables)
t.join();
}
+TEST(EventBaseTest, LoopKeepAliveAtomic) {
+ auto evb = folly::make_unique<EventBase>();
+
+ constexpr size_t kNumThreads = 100;
+ constexpr size_t kNumTasks = 100;
+
+ std::vector<std::thread> ts;
+ std::vector<std::unique_ptr<Baton<>>> batons;
+ size_t done{0};
+
+ for (size_t i = 0; i < kNumThreads; ++i) {
+ batons.emplace_back(std::make_unique<Baton<>>());
+ }
+
+ for (size_t i = 0; i < kNumThreads; ++i) {
+ ts.emplace_back([ evbPtr = evb.get(), batonPtr = batons[i].get(), &done ] {
+ std::vector<EventBase::LoopKeepAlive> keepAlives;
+ for (size_t i = 0; i < kNumTasks; ++i) {
+ keepAlives.emplace_back(evbPtr->loopKeepAliveAtomic());
+ }
+
+ batonPtr->post();
+
+ /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ for (auto& keepAlive : keepAlives) {
+ evbPtr->runInEventBaseThread(
+ [&done, keepAlive = std::move(keepAlive) ]() { ++done; });
+ }
+ });
+ }
+
+ for (auto& baton : batons) {
+ baton->wait();
+ }
+
+ evb.reset();
+
+ EXPECT_EQ(kNumThreads * kNumTasks, done);
+
+ for (auto& t : ts) {
+ t.join();
+ }
+}
+
TEST(EventBaseTest, DrivableExecutorTest) {
folly::Promise<bool> p;
auto f = p.getFuture();