#include <stdexcept>
+#include <glog/logging.h>
+
namespace folly {
void Executor::addWithPriority(Func, int8_t /* priority */) {
throw std::runtime_error(
"addWithPriority() is not implemented for this Executor");
}
+
+void Executor::keepAliveRelease() {
+ LOG(FATAL) << "keepAliveRelease() should not be called for folly::Executors "
+ << "which do not implement getKeepAliveToken()";
+}
}
void addPtr(P fn) {
this->add([fn]() mutable { (*fn)(); });
}
+
+ class KeepAlive {
+ public:
+ KeepAlive() {}
+
+ void reset() {
+ executor_.reset();
+ }
+
+ explicit operator bool() const {
+ return executor_ != nullptr;
+ }
+
+ private:
+ friend class Executor;
+ explicit KeepAlive(folly::Executor* executor) : executor_(executor) {}
+
+ struct Deleter {
+ void operator()(folly::Executor* executor) {
+ executor->keepAliveRelease();
+ }
+ };
+ std::unique_ptr<folly::Executor, Deleter> executor_;
+ };
+
+ /// Returns a keep-alive token which guarantees that Executor will keep
+ /// processing tasks until the token is released. keep-alive token can only
+ /// be destroyed from within the task, scheduled to be run on an executor.
+ ///
+ /// If executor does not support keep-alive functionality - dummy token will
+ /// be returned.
+ virtual KeepAlive getKeepAliveToken() {
+ return {};
+ }
+
+ protected:
+ virtual void keepAliveRelease();
+
+ KeepAlive makeKeepAlive() {
+ return KeepAlive{this};
+ }
};
} // folly
template <typename EventBaseT>
inline void EventBaseLoopControllerT<EventBaseT>::runLoop() {
if (!eventBaseKeepAlive_) {
- eventBaseKeepAlive_ = eventBase_->loopKeepAlive();
+ eventBaseKeepAlive_ = eventBase_->getKeepAliveToken();
}
if (loopRunner_) {
loopRunner_->run([&] { fm_->loopUntilNoReadyImpl(); });
bool awaitingScheduling_{false};
EventBaseT* eventBase_{nullptr};
- typename EventBaseT::LoopKeepAlive eventBaseKeepAlive_;
+ Executor::KeepAlive eventBaseKeepAlive_;
ControllerCallback callback_;
DestructionCallback destructionCallback_;
FiberManager* fm_{nullptr};
loopOnce();
}
- struct LoopKeepAliveDeleter {
- void operator()(EventBase* evb) {
- DCHECK(evb->isInEventBaseThread());
- evb->loopKeepAliveCount_--;
- }
- };
- using LoopKeepAlive = std::unique_ptr<EventBase, LoopKeepAliveDeleter>;
-
/// Returns you a handle which make loop() behave like loopForever() until
/// destroyed. loop() will return to its original behavior only when all
/// loop keep-alives are released. Loop holder is safe to release only from
/// EventBase thread.
- ///
- /// May return no op LoopKeepAlive if loopForever() is already running.
- LoopKeepAlive loopKeepAlive() {
- DCHECK(isInEventBaseThread());
- loopKeepAliveCount_++;
- return LoopKeepAlive(this);
- }
-
- // Thread-safe version of loopKeepAlive()
- LoopKeepAlive loopKeepAliveAtomic() {
+ KeepAlive getKeepAliveToken() override {
if (inRunningEventBaseThread()) {
- return loopKeepAlive();
+ loopKeepAliveCount_++;
+ } else {
+ loopKeepAliveCountAtomic_.fetch_add(1, std::memory_order_relaxed);
}
- loopKeepAliveCountAtomic_.fetch_add(1, std::memory_order_relaxed);
- return LoopKeepAlive(this);
+ return makeKeepAlive();
}
// TimeoutManager
return isInEventBaseThread();
}
+ protected:
+ void keepAliveRelease() override {
+ DCHECK(isInEventBaseThread());
+ loopKeepAliveCount_--;
+ }
+
private:
void applyLoopKeepAlive();
namespace folly {
VirtualEventBase::VirtualEventBase(EventBase& evb) : evb_(evb) {
- evbLoopKeepAlive_ = evb_.loopKeepAliveAtomic();
- loopKeepAlive_ = loopKeepAliveAtomic();
+ evbLoopKeepAlive_ = evb_.getKeepAliveToken();
+ loopKeepAlive_ = getKeepAliveToken();
}
VirtualEventBase::~VirtualEventBase() {
*
* Multiple VirtualEventBases can be backed by a single EventBase. Similarly
* to EventBase, VirtualEventBase implements loopKeepAlive() functionality,
- * which allows callbacks holding LoopKeepAlive token to keep EventBase looping
+ * which allows callbacks holding KeepAlive token to keep EventBase looping
* until they are complete.
*
* VirtualEventBase destructor blocks until all its KeepAliveTokens are released
*/
template <typename F>
void runInEventBaseThread(F&& f) {
- // LoopKeepAlive token has to be released in the EventBase thread. If
- // runInEventBaseThread() fails, we can't extract the LoopKeepAlive token
+ // KeepAlive token has to be released in the EventBase thread. If
+ // runInEventBaseThread() fails, we can't extract the KeepAlive token
// from the callback to properly release it.
CHECK(evb_.runInEventBaseThread([
- keepAlive = loopKeepAliveAtomic(),
+ keepAliveToken = getKeepAliveToken(),
f = std::forward<F>(f)
]() mutable { f(); }));
}
runInEventBaseThread(std::move(f));
}
- struct LoopKeepAliveDeleter {
- void operator()(VirtualEventBase* evb) {
- DCHECK(evb->getEventBase().inRunningEventBaseThread());
- if (evb->loopKeepAliveCountAtomic_.load()) {
- evb->loopKeepAliveCount_ += evb->loopKeepAliveCountAtomic_.exchange(0);
- }
- DCHECK(evb->loopKeepAliveCount_ > 0);
- if (--evb->loopKeepAliveCount_ == 0) {
- evb->loopKeepAliveBaton_.post();
- }
- }
- };
- using LoopKeepAlive = std::unique_ptr<VirtualEventBase, LoopKeepAliveDeleter>;
-
/**
* Returns you a handle which prevents VirtualEventBase from being destroyed.
- * LoopKeepAlive handle can be released from EventBase loop only.
- *
- * loopKeepAlive() can be called from EventBase thread only.
+ * KeepAlive handle can be released from EventBase loop only.
*/
- LoopKeepAlive loopKeepAlive() {
- DCHECK(evb_.isInEventBaseThread());
- ++loopKeepAliveCount_;
- return LoopKeepAlive(this);
- }
-
- /**
- * Thread-safe version of loopKeepAlive()
- */
- LoopKeepAlive loopKeepAliveAtomic() {
+ KeepAlive getKeepAliveToken() override {
if (evb_.inRunningEventBaseThread()) {
- return loopKeepAlive();
+ ++loopKeepAliveCount_;
+ } else {
+ ++loopKeepAliveCountAtomic_;
}
- ++loopKeepAliveCountAtomic_;
- return LoopKeepAlive(this);
+ return makeKeepAlive();
}
bool inRunningEventBaseThread() const {
return evb_.inRunningEventBaseThread();
}
+ protected:
+ void keepAliveRelease() override {
+ DCHECK(getEventBase().inRunningEventBaseThread());
+ if (loopKeepAliveCountAtomic_.load()) {
+ loopKeepAliveCount_ += loopKeepAliveCountAtomic_.exchange(0);
+ }
+ DCHECK(loopKeepAliveCount_ > 0);
+ if (--loopKeepAliveCount_ == 0) {
+ loopKeepAliveBaton_.post();
+ }
+ }
+
private:
using LoopCallbackList = EventBase::LoopCallback::List;
ssize_t loopKeepAliveCount_{0};
std::atomic<ssize_t> loopKeepAliveCountAtomic_{0};
folly::Baton<> loopKeepAliveBaton_;
- LoopKeepAlive loopKeepAlive_;
+ KeepAlive loopKeepAlive_;
- EventBase::LoopKeepAlive evbLoopKeepAlive_;
+ KeepAlive evbLoopKeepAlive_;
folly::Synchronized<LoopCallbackList> onDestructionCallbacks_;
};
/*
+ * Copyright 2017-present Facebook, Inc.
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* specific language governing permissions and limitations
* under the License.
*/
-
#include <folly/Memory.h>
#include <folly/ScopeGuard.h>
EventBase evb;
bool done = false;
- std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
+ std::thread t([&, loopKeepAlive = evb.getKeepAliveToken() ]() mutable {
/* sleep override */ std::this_thread::sleep_for(
std::chrono::milliseconds(100));
evb.runInEventBaseThread(
std::thread t;
evb.runInEventBaseThread([&] {
- t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
+ t = std::thread([&, loopKeepAlive = evb.getKeepAliveToken() ]() mutable {
/* sleep override */ std::this_thread::sleep_for(
std::chrono::milliseconds(100));
evb.runInEventBaseThread(
{
auto* ev = evb.get();
- EventBase::LoopKeepAlive keepAlive;
+ Executor::KeepAlive keepAlive;
ev->runInEventBaseThreadAndWait(
- [&ev, &keepAlive] { keepAlive = ev->loopKeepAlive(); });
+ [&ev, &keepAlive] { keepAlive = ev->getKeepAliveToken(); });
ASSERT_FALSE(done) << "Loop finished before we asked it to";
ev->terminateLoopSoon();
/* sleep override */
std::thread t([
&done,
- loopKeepAlive = evb->loopKeepAlive(),
+ loopKeepAlive = evb->getKeepAliveToken(),
evbPtr = evb.get()
]() mutable {
/* sleep override */ std::this_thread::sleep_for(
for (size_t i = 0; i < kNumThreads; ++i) {
ts.emplace_back([ evbPtr = evb.get(), batonPtr = batons[i].get(), &done ] {
- std::vector<EventBase::LoopKeepAlive> keepAlives;
+ std::vector<Executor::KeepAlive> keepAlives;
for (size_t j = 0; j < kNumTasks; ++j) {
- keepAlives.emplace_back(evbPtr->loopKeepAliveAtomic());
+ keepAlives.emplace_back(evbPtr->getKeepAliveToken());
}
batonPtr->post();