std::chrono::milliseconds timeout,
const SSLContext::SSLVerifyPeerEnum& verifyPeer) {
DestructorGuard dg(this);
- assert(eventBase_->isInEventBaseThread());
+ eventBase_->dcheckIsInEventBaseThread();
verifyPeer_ = verifyPeer;
// Make sure we're in the uninitialized state
std::chrono::milliseconds timeout,
const SSLContext::SSLVerifyPeerEnum& verifyPeer) {
DestructorGuard dg(this);
- assert(eventBase_->isInEventBaseThread());
+ eventBase_->dcheckIsInEventBaseThread();
// Cache local and remote socket addresses to keep them available
// after socket file descriptor is closed.
VLOG(10) << "AsyncServerSocket::stopAccepting " << this <<
handler.socket_;
}
- assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+ if (eventBase_) {
+ eventBase_->dcheckIsInEventBaseThread();
+ }
// When destroy is called, unregister and close the socket immediately.
accepting_ = false;
void AsyncServerSocket::attachEventBase(EventBase *eventBase) {
assert(eventBase_ == nullptr);
- assert(eventBase->isInEventBaseThread());
+ eventBase->dcheckIsInEventBaseThread();
eventBase_ = eventBase;
for (auto& handler : sockets_) {
void AsyncServerSocket::detachEventBase() {
assert(eventBase_ != nullptr);
- assert(eventBase_->isInEventBaseThread());
+ eventBase_->dcheckIsInEventBaseThread();
assert(!accepting_);
eventBase_ = nullptr;
}
void AsyncServerSocket::useExistingSockets(const std::vector<int>& fds) {
- assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+ if (eventBase_) {
+ eventBase_->dcheckIsInEventBaseThread();
+ }
if (sockets_.size() > 0) {
throw std::invalid_argument(
}
void AsyncServerSocket::bind(const SocketAddress& address) {
- assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+ if (eventBase_) {
+ eventBase_->dcheckIsInEventBaseThread();
+ }
// useExistingSocket() may have been called to initialize socket_ already.
// However, in the normal case we need to create a new socket now.
}
void AsyncServerSocket::listen(int backlog) {
- assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+ if (eventBase_) {
+ eventBase_->dcheckIsInEventBaseThread();
+ }
// Start listening
for (auto& handler : sockets_) {
void AsyncServerSocket::addAcceptCallback(AcceptCallback *callback,
EventBase *eventBase,
uint32_t maxAtOnce) {
- assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+ if (eventBase_) {
+ eventBase_->dcheckIsInEventBaseThread();
+ }
// If this is the first accept callback and we are supposed to be accepting,
// start accepting once the callback is installed.
void AsyncServerSocket::removeAcceptCallback(AcceptCallback *callback,
EventBase *eventBase) {
- assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+ if (eventBase_) {
+ eventBase_->dcheckIsInEventBaseThread();
+ }
// Find the matching AcceptCallback.
// We just do a simple linear search; we don't expect removeAcceptCallback()
}
void AsyncServerSocket::startAccepting() {
- assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+ if (eventBase_) {
+ eventBase_->dcheckIsInEventBaseThread();
+ }
accepting_ = true;
if (callbacks_.empty()) {
}
void AsyncServerSocket::pauseAccepting() {
- assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+ if (eventBase_) {
+ eventBase_->dcheckIsInEventBaseThread();
+ }
accepting_ = false;
for (auto& handler : sockets_) {
handler. unregisterHandler();
// the backoff timeout.
assert(accepting_);
// We can't be detached from the EventBase without being paused
- assert(eventBase_ != nullptr && eventBase_->isInEventBaseThread());
+ assert(eventBase_ != nullptr);
+ eventBase_->dcheckIsInEventBaseThread();
// If all of the callbacks were removed, we shouldn't re-enable accepts
if (callbacks_.empty()) {
* socket's primary EventBase.
*/
int64_t getNumPendingMessagesInQueue() const {
- assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+ if (eventBase_) {
+ eventBase_->dcheckIsInEventBaseThread();
+ }
int64_t numMsgs = 0;
for (const auto& callback : callbacks_) {
numMsgs += callback.consumer->getQueue()->size();
// init() method, since constructor forwarding isn't supported in most
// compilers yet.
void AsyncSocket::init() {
- assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+ if (eventBase_) {
+ eventBase_->dcheckIsInEventBaseThread();
+ }
shutdownFlags_ = 0;
state_ = StateEnum::UNINIT;
eventFlags_ = EventHandler::NONE;
const OptionMap &options,
const folly::SocketAddress& bindAddr) noexcept {
DestructorGuard dg(this);
- assert(eventBase_->isInEventBaseThread());
+ eventBase_->dcheckIsInEventBaseThread();
addr_ = address;
void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
sendTimeout_ = milliseconds;
- assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+ if (eventBase_) {
+ eventBase_->dcheckIsInEventBaseThread();
+ }
// If we are currently pending on write requests, immediately update
// writeTimeout_ with the new value.
}
DestructorGuard dg(this);
- assert(eventBase_->isInEventBaseThread());
+ eventBase_->dcheckIsInEventBaseThread();
if (callback == nullptr) {
// We should be able to reset the callback regardless of the
}
DestructorGuard dg(this);
- assert(eventBase_->isInEventBaseThread());
+ eventBase_->dcheckIsInEventBaseThread();
switch ((StateEnum)state_) {
case StateEnum::CONNECTING:
<< ", state=" << state_;
DestructorGuard dg(this);
unique_ptr<IOBuf>ioBuf(std::move(buf));
- assert(eventBase_->isInEventBaseThread());
+ eventBase_->dcheckIsInEventBaseThread();
if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
// No new writes may be performed after the write side of the socket has
// Declare a DestructorGuard to ensure that the AsyncSocket cannot be
// destroyed until close() returns.
DestructorGuard dg(this);
- assert(eventBase_->isInEventBaseThread());
+ eventBase_->dcheckIsInEventBaseThread();
// Since there are write requests pending, we have to set the
// SHUT_WRITE_PENDING flag, and wait to perform the real close until the
<< ", state=" << state_ << ", shutdownFlags="
<< std::hex << (int) shutdownFlags_;
DestructorGuard dg(this);
- assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+ if (eventBase_) {
+ eventBase_->dcheckIsInEventBaseThread();
+ }
switch (state_) {
case StateEnum::ESTABLISHED:
return;
}
- assert(eventBase_->isInEventBaseThread());
+ eventBase_->dcheckIsInEventBaseThread();
// There are pending writes. Set SHUT_WRITE_PENDING so that the actual
// shutdown will be performed once all writes complete.
}
DestructorGuard dg(this);
- assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+ if (eventBase_) {
+ eventBase_->dcheckIsInEventBaseThread();
+ }
switch (static_cast<StateEnum>(state_)) {
case StateEnum::ESTABLISHED:
<< ", state=" << state_ << ", events="
<< std::hex << eventFlags_ << ")";
assert(eventBase_ == nullptr);
- assert(eventBase->isInEventBaseThread());
+ eventBase->dcheckIsInEventBaseThread();
eventBase_ = eventBase;
ioHandler_.attachEventBase(eventBase);
<< ", old evb=" << eventBase_ << ", state=" << state_
<< ", events=" << std::hex << eventFlags_ << ")";
assert(eventBase_ != nullptr);
- assert(eventBase_->isInEventBaseThread());
+ eventBase_->dcheckIsInEventBaseThread();
eventBase_ = nullptr;
ioHandler_.detachEventBase();
bool AsyncSocket::isDetachable() const {
DCHECK(eventBase_ != nullptr);
- DCHECK(eventBase_->isInEventBaseThread());
+ eventBase_->dcheckIsInEventBaseThread();
return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
}
<< ", events=" << std::hex << events << ", state=" << state_;
DestructorGuard dg(this);
assert(events & EventHandler::READ_WRITE);
- assert(eventBase_->isInEventBaseThread());
+ eventBase_->dcheckIsInEventBaseThread();
uint16_t relevantEvents = uint16_t(events & EventHandler::READ_WRITE);
EventBase* originalEventBase = eventBase_;
VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
<< "state=" << state_ << ", events=" << std::hex << eventFlags_;
DestructorGuard dg(this);
- assert(eventBase_->isInEventBaseThread());
+ eventBase_->dcheckIsInEventBaseThread();
if (state_ == StateEnum::CONNECTING) {
// connect() timed out
* and call all currently installed callbacks. After an error, the
* AsyncSocket is completely unregistered.
*
- * @return Returns true on succcess, or false on error.
+ * @return Returns true on success, or false on error.
*/
bool AsyncSocket::updateEventRegistration() {
VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
<< ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
<< ", events=" << std::hex << eventFlags_;
- assert(eventBase_->isInEventBaseThread());
+ eventBase_->dcheckIsInEventBaseThread();
if (eventFlags_ == EventHandler::NONE) {
ioHandler_.unregisterHandler();
return true;
eventBase_(evb),
fd_(-1),
readCallback_(nullptr) {
- DCHECK(evb->isInEventBaseThread());
+ evb->dcheckIsInEventBaseThread();
}
AsyncUDPSocket::~AsyncUDPSocket() {
}
void AsyncUDPSocket::close() {
- DCHECK(eventBase_->isInEventBaseThread());
+ eventBase_->dcheckIsInEventBaseThread();
if (readCallback_) {
auto cob = readCallback_;
fnRunner_->setMaxReadAtOnce(maxAtOnce);
}
+void EventBase::checkIsInEventBaseThread() const {
+ auto evbTid = loopThread_.load(std::memory_order_relaxed);
+ if (evbTid == std::thread::id()) {
+ return;
+ }
+
+ // Using getThreadName(evbTid) instead of name_ will work also if
+ // the thread name is set outside of EventBase (and name_ is empty).
+ auto curTid = std::this_thread::get_id();
+ CHECK(evbTid == curTid)
+ << "This logic must be executed in the event base thread. "
+ << "Event base thread name: \""
+ << folly::getThreadName(evbTid).value_or("")
+ << "\", current thread name: \""
+ << folly::getThreadName(curTid).value_or("") << "\"";
+}
+
// Set smoothing coefficient for loop load average; input is # of milliseconds
// for exp(-1) decay.
void EventBase::setLoadAvgMsec(std::chrono::milliseconds ms) {
}
void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
- DCHECK(isInEventBaseThread());
+ dcheckIsInEventBaseThread();
callback->cancelLoopCallback();
callback->context_ = RequestContext::saveContext();
if (runOnceCallbacks_ != nullptr && thisIteration) {
}
void EventBase::runInLoop(Func cob, bool thisIteration) {
- DCHECK(isInEventBaseThread());
+ dcheckIsInEventBaseThread();
auto wrapper = new FunctionLoopCallback(std::move(cob));
wrapper->context_ = RequestContext::saveContext();
if (runOnceCallbacks_ != nullptr && thisIteration) {
}
void EventBase::runBeforeLoop(LoopCallback* callback) {
- DCHECK(isInEventBaseThread());
+ dcheckIsInEventBaseThread();
callback->cancelLoopCallback();
runBeforeLoopCallbacks_.push_back(*callback);
}
bool EventBase::scheduleTimeout(AsyncTimeout* obj,
TimeoutManager::timeout_type timeout) {
- assert(isInEventBaseThread());
+ dcheckIsInEventBaseThread();
// Set up the timeval and add the event
struct timeval tv;
tv.tv_sec = long(timeout.count() / 1000LL);
}
void EventBase::cancelTimeout(AsyncTimeout* obj) {
- assert(isInEventBaseThread());
+ dcheckIsInEventBaseThread();
struct event* ev = obj->getEvent();
if (EventUtil::isEventRegistered(ev)) {
event_del(ev);
}
void EventBase::setName(const std::string& name) {
- assert(isInEventBaseThread());
+ dcheckIsInEventBaseThread();
name_ = name;
if (isRunning()) {
}
const std::string& EventBase::getName() {
- assert(isInEventBaseThread());
+ dcheckIsInEventBaseThread();
return name_;
}
std::this_thread::get_id();
}
+ /**
+ * Equivalent to CHECK(isInEventBaseThread()) (and assert/DCHECK for
+ * dcheckIsInEventBaseThread), but it prints more information on
+ * failure.
+ */
+ void checkIsInEventBaseThread() const;
+ void dcheckIsInEventBaseThread() const {
+ if (kIsDebug) {
+ checkIsInEventBaseThread();
+ }
+ }
+
HHWheelTimer& timer() {
if (!wheelTimer_) {
wheelTimer_ = HHWheelTimer::newTimer(this);
protected:
void keepAliveRelease() override {
- DCHECK(isInEventBaseThread());
+ dcheckIsInEventBaseThread();
loopKeepAliveCount_--;
}
}
void* EventBaseLocalBase::getVoid(EventBase& evb) {
- DCHECK(evb.isInEventBaseThread());
+ evb.dcheckIsInEventBaseThread();
return folly::get_default(evb.localStorage_, key_, {}).get();
}
void EventBaseLocalBase::erase(EventBase& evb) {
- DCHECK(evb.isInEventBaseThread());
+ evb.dcheckIsInEventBaseThread();
evb.localStorage_.erase(key_);
evb.localStorageToDtor_.erase(this);
}
void EventBaseLocalBase::onEventBaseDestruction(EventBase& evb) {
- DCHECK(evb.isInEventBaseThread());
+ evb.dcheckIsInEventBaseThread();
SYNCHRONIZED(eventBases_) {
eventBases_.erase(&evb);
}
void EventBaseLocalBase::setVoid(EventBase& evb, std::shared_ptr<void>&& ptr) {
- DCHECK(evb.isInEventBaseThread());
+ evb.dcheckIsInEventBaseThread();
auto alreadyExists =
evb.localStorage_.find(key_) != evb.localStorage_.end();
assert(event_.ev_base == nullptr);
assert(!isHandlerRegistered());
// This must be invoked from the EventBase's thread
- assert(eventBase->isInEventBaseThread());
+ eventBase->dcheckIsInEventBaseThread();
setEventBase(eventBase);
}
void NotificationQueue<MessageT>::Consumer::init(
EventBase* eventBase,
NotificationQueue* queue) {
- assert(eventBase->isInEventBaseThread());
+ eventBase->dcheckIsInEventBaseThread();
assert(queue_ == nullptr);
assert(!isHandlerRegistered());
queue->checkPid();
protected:
void keepAliveRelease() override {
- DCHECK(getEventBase().isInEventBaseThread());
+ getEventBase().dcheckIsInEventBaseThread();
if (loopKeepAliveCountAtomic_.load()) {
loopKeepAliveCount_ += loopKeepAliveCountAtomic_.exchange(0);
}