From e5f8a09e441addacde4bac3a3c169232d0d31194 Mon Sep 17 00:00:00 2001 From: davejwatson Date: Wed, 10 Dec 2014 07:58:27 -0800 Subject: [PATCH] Multi accept Summary: Use acceptor pool instead of current thread for accepts. Use reuse_port option to support multiple threads Test Plan: Updated unittests Reviewed By: hans@fb.com Subscribers: doug, fugalh, njormrod, folly-diffs@ FB internal diff: D1710619 Tasks: 5788110 Signature: t1:1710619:1417477350:eee5063186e582ef74c4802b8149563af029b3de --- .../wangle/bootstrap/BootstrapTest.cpp | 62 ++++++++++++---- .../wangle/bootstrap/ServerBootstrap.cpp | 12 ++-- .../wangle/bootstrap/ServerBootstrap.h | 72 +++++++++++++++---- 3 files changed, 114 insertions(+), 32 deletions(-) diff --git a/folly/experimental/wangle/bootstrap/BootstrapTest.cpp b/folly/experimental/wangle/bootstrap/BootstrapTest.cpp index 0d0d843f..b3cf00da 100644 --- a/folly/experimental/wangle/bootstrap/BootstrapTest.cpp +++ b/folly/experimental/wangle/bootstrap/BootstrapTest.cpp @@ -62,11 +62,7 @@ TEST(Bootstrap, ServerWithPipeline) { TestServer server; server.childPipeline(std::make_shared()); server.bind(0); - auto base = EventBaseManager::get()->getEventBase(); - base->runAfterDelay([&](){ - server.stop(); - }, 500); - base->loop(); + server.stop(); } TEST(Bootstrap, ClientServerTest) { @@ -81,10 +77,8 @@ TEST(Bootstrap, ClientServerTest) { TestClient client; client.connect(address); - base->runAfterDelay([&](){ - server.stop(); - }, 500); base->loop(); + server.stop(); CHECK(factory->pipelines == 1); } @@ -109,11 +103,55 @@ TEST(Bootstrap, ClientConnectionManagerTest) { TestClient client2; client2.connect(address); - base->runAfterDelay([&](){ - server.stop(); - }, 500); - base->loop(); + server.stop(); CHECK(factory->pipelines == 2); } + +TEST(Bootstrap, ServerAcceptGroupTest) { + // Verify that server is using the accept IO group + + TestServer server; + auto factory = std::make_shared(); + server.childPipeline(factory); + server.group(std::make_shared(1), nullptr); + server.bind(0); + + SocketAddress address; + server.getSockets()[0]->getAddress(&address); + + boost::barrier barrier(2); + auto thread = std::thread([&](){ + TestClient client; + client.connect(address); + EventBaseManager::get()->getEventBase()->loop(); + barrier.wait(); + }); + barrier.wait(); + server.stop(); + thread.join(); + + CHECK(factory->pipelines == 1); +} + +TEST(Bootstrap, ServerAcceptGroup2Test) { + // Verify that server is using the accept IO group + + TestServer server; + auto factory = std::make_shared(); + server.childPipeline(factory); + server.group(std::make_shared(4), nullptr); + server.bind(0); + + SocketAddress address; + server.getSockets()[0]->getAddress(&address); + + TestClient client; + client.connect(address); + EventBaseManager::get()->getEventBase()->loop(); + + server.stop(); + + CHECK(factory->pipelines == 1); +} diff --git a/folly/experimental/wangle/bootstrap/ServerBootstrap.cpp b/folly/experimental/wangle/bootstrap/ServerBootstrap.cpp index 17e4bb86..594e9604 100644 --- a/folly/experimental/wangle/bootstrap/ServerBootstrap.cpp +++ b/folly/experimental/wangle/bootstrap/ServerBootstrap.cpp @@ -21,13 +21,13 @@ namespace folly { std::thread ServerWorkerFactory::newThread( folly::wangle::Func&& func) { + auto id = nextWorkerId_++; + auto worker = acceptorFactory_->newAcceptor(); + { + folly::RWSpinLock::WriteHolder guard(workersLock_); + workers_.insert({id, worker}); + } return internalFactory_->newThread([=](){ - auto id = nextWorkerId_++; - auto worker = acceptorFactory_->newAcceptor(); - { - folly::RWSpinLock::WriteHolder guard(workersLock_); - workers_.insert({id, worker}); - } EventBaseManager::get()->setEventBase(worker->getEventBase(), false); func(); EventBaseManager::get()->clearEventBase(); diff --git a/folly/experimental/wangle/bootstrap/ServerBootstrap.h b/folly/experimental/wangle/bootstrap/ServerBootstrap.h index 4d55c4e3..f7389d98 100644 --- a/folly/experimental/wangle/bootstrap/ServerBootstrap.h +++ b/folly/experimental/wangle/bootstrap/ServerBootstrap.h @@ -16,6 +16,7 @@ #pragma once #include +#include namespace folly { @@ -137,37 +138,80 @@ class ServerBootstrap { * @param port Port to listen on */ void bind(int port) { - // TODO bind to v4 and v6 // TODO take existing socket - // TODO use the acceptor thread - auto socket = folly::AsyncServerSocket::newSocket( - EventBaseManager::get()->getEventBase()); - sockets_.push_back(socket); - socket->bind(port); - - // TODO Take ServerSocketConfig - socket->listen(1024); if (!workerFactory_) { group(nullptr); } + bool reusePort = false; + if (acceptor_group_->numThreads() >= 0) { + reusePort = true; + } + + std::mutex sock_lock; + std::vector> new_sockets; + + auto startupFunc = [&](std::shared_ptr barrier){ + auto socket = folly::AsyncServerSocket::newSocket(); + sock_lock.lock(); + new_sockets.push_back(socket); + sock_lock.unlock(); + socket->setReusePortEnabled(reusePort); + socket->attachEventBase(EventBaseManager::get()->getEventBase()); + socket->bind(port); + // TODO Take ServerSocketConfig + socket->listen(1024); + socket->startAccepting(); + + if (port == 0) { + SocketAddress address; + socket->getAddress(&address); + port = address.getPort(); + } + + barrier->wait(); + }; + + auto bind0 = std::make_shared(2); + acceptor_group_->add(std::bind(startupFunc, bind0)); + bind0->wait(); + + auto barrier = std::make_shared(acceptor_group_->numThreads()); + for (int i = 1; i < acceptor_group_->numThreads(); i++) { + acceptor_group_->add(std::bind(startupFunc, barrier)); + } + barrier->wait(); + // Startup all the threads - workerFactory_->forEachWorker([this, socket](Acceptor* worker){ + for(auto socket : new_sockets) { + workerFactory_->forEachWorker([this, socket](Acceptor* worker){ socket->getEventBase()->runInEventBaseThread([this, worker, socket](){ socket->addAcceptCallback(worker, worker->getEventBase()); }); - }); - socket->startAccepting(); + }); + } + + for (auto& socket : new_sockets) { + sockets_.push_back(socket); + } } /* * Stop listening on all sockets. */ void stop() { - for (auto& socket : sockets_) { - socket->stopAccepting(); + auto barrier = std::make_shared(sockets_.size() + 1); + for (auto socket : sockets_) { + socket->getEventBase()->runInEventBaseThread([barrier, socket]() { + socket->stopAccepting(); + socket->detachEventBase(); + barrier->wait(); + }); } + barrier->wait(); + sockets_.clear(); + acceptor_group_->join(); io_group_->join(); } -- 2.34.1