workers_.insert({h, worker});
for(auto socket : *sockets_) {
- socket->getEventBase()->runInEventBaseThreadAndWait([this, worker, socket](){
+ socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
+ [this, worker, socket](){
socketFactory_->addAcceptCB(
socket, worker.get(), worker->getEventBase());
});
CHECK(worker != workers_.end());
for (auto& socket : *sockets_) {
- folly::Baton<> barrier;
- socket->getEventBase()->runInEventBaseThreadAndWait([&]() {
- socketFactory_->removeAcceptCB(
- socket, worker->second.get(), nullptr);
- barrier.post();
+ socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
+ [&]() {
+ socketFactory_->removeAcceptCB(
+ socket, worker->second.get(), nullptr);
});
- barrier.wait();
}
if (!worker->second->getEventBase()->isInEventBaseThread()) {
- worker->second->getEventBase()->runInEventBaseThreadAndWait([=]() {
+ worker->second->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
+ [=]() {
worker->second->dropAllConnections();
});
} else {
// Startup all the threads
workerFactory_->forEachWorker([this, socket](Acceptor* worker){
- socket->getEventBase()->runInEventBaseThreadAndWait(
+ socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
[this, worker, socket](){
socketFactory_->addAcceptCB(socket, worker, worker->getEventBase());
});
for (auto& socket : new_sockets) {
// Startup all the threads
workerFactory_->forEachWorker([this, socket](Acceptor* worker){
- socket->getEventBase()->runInEventBaseThreadAndWait([this, worker, socket](){
- socketFactory_->addAcceptCB(socket, worker, worker->getEventBase());
+ socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
+ [this, worker, socket](){
+ socketFactory_->addAcceptCB(socket, worker, worker->getEventBase());
});
});
*/
void stop() {
for (auto socket : sockets_) {
- folly::Baton<> barrier;
- socket->getEventBase()->runInEventBaseThread([&]() mutable {
- socketFactory_->stopSocket(socket);
- barrier.post();
+ socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
+ [&]() mutable {
+ socketFactory_->stopSocket(socket);
});
- barrier.wait();
}
sockets_.clear();
}