TestServer server;
server.childPipeline(std::make_shared<TestPipelineFactory>());
server.bind(0);
- auto base = EventBaseManager::get()->getEventBase();
- base->runAfterDelay([&](){
- server.stop();
- }, 500);
- base->loop();
+ server.stop();
}
TEST(Bootstrap, ClientServerTest) {
TestClient client;
client.connect(address);
- base->runAfterDelay([&](){
- server.stop();
- }, 500);
base->loop();
+ server.stop();
CHECK(factory->pipelines == 1);
}
TestClient client2;
client2.connect(address);
- base->runAfterDelay([&](){
- server.stop();
- }, 500);
-
base->loop();
+ server.stop();
CHECK(factory->pipelines == 2);
}
+
+TEST(Bootstrap, ServerAcceptGroupTest) {
+ // Verify that server is using the accept IO group
+
+ TestServer server;
+ auto factory = std::make_shared<TestPipelineFactory>();
+ server.childPipeline(factory);
+ server.group(std::make_shared<IOThreadPoolExecutor>(1), nullptr);
+ server.bind(0);
+
+ SocketAddress address;
+ server.getSockets()[0]->getAddress(&address);
+
+ boost::barrier barrier(2);
+ auto thread = std::thread([&](){
+ TestClient client;
+ client.connect(address);
+ EventBaseManager::get()->getEventBase()->loop();
+ barrier.wait();
+ });
+ barrier.wait();
+ server.stop();
+ thread.join();
+
+ CHECK(factory->pipelines == 1);
+}
+
+TEST(Bootstrap, ServerAcceptGroup2Test) {
+ // Verify that server is using the accept IO group
+
+ TestServer server;
+ auto factory = std::make_shared<TestPipelineFactory>();
+ server.childPipeline(factory);
+ server.group(std::make_shared<IOThreadPoolExecutor>(4), nullptr);
+ server.bind(0);
+
+ SocketAddress address;
+ server.getSockets()[0]->getAddress(&address);
+
+ TestClient client;
+ client.connect(address);
+ EventBaseManager::get()->getEventBase()->loop();
+
+ server.stop();
+
+ CHECK(factory->pipelines == 1);
+}
std::thread ServerWorkerFactory::newThread(
folly::wangle::Func&& func) {
+ auto id = nextWorkerId_++;
+ auto worker = acceptorFactory_->newAcceptor();
+ {
+ folly::RWSpinLock::WriteHolder guard(workersLock_);
+ workers_.insert({id, worker});
+ }
return internalFactory_->newThread([=](){
- auto id = nextWorkerId_++;
- auto worker = acceptorFactory_->newAcceptor();
- {
- folly::RWSpinLock::WriteHolder guard(workersLock_);
- workers_.insert({id, worker});
- }
EventBaseManager::get()->setEventBase(worker->getEventBase(), false);
func();
EventBaseManager::get()->clearEventBase();
#pragma once
#include <folly/experimental/wangle/bootstrap/ServerBootstrap-inl.h>
+#include <boost/thread.hpp>
namespace folly {
* @param port Port to listen on
*/
void bind(int port) {
- // TODO bind to v4 and v6
// TODO take existing socket
- // TODO use the acceptor thread
- auto socket = folly::AsyncServerSocket::newSocket(
- EventBaseManager::get()->getEventBase());
- sockets_.push_back(socket);
- socket->bind(port);
-
- // TODO Take ServerSocketConfig
- socket->listen(1024);
if (!workerFactory_) {
group(nullptr);
}
+ bool reusePort = false;
+ if (acceptor_group_->numThreads() >= 0) {
+ reusePort = true;
+ }
+
+ std::mutex sock_lock;
+ std::vector<std::shared_ptr<folly::AsyncServerSocket>> new_sockets;
+
+ auto startupFunc = [&](std::shared_ptr<boost::barrier> barrier){
+ auto socket = folly::AsyncServerSocket::newSocket();
+ sock_lock.lock();
+ new_sockets.push_back(socket);
+ sock_lock.unlock();
+ socket->setReusePortEnabled(reusePort);
+ socket->attachEventBase(EventBaseManager::get()->getEventBase());
+ socket->bind(port);
+ // TODO Take ServerSocketConfig
+ socket->listen(1024);
+ socket->startAccepting();
+
+ if (port == 0) {
+ SocketAddress address;
+ socket->getAddress(&address);
+ port = address.getPort();
+ }
+
+ barrier->wait();
+ };
+
+ auto bind0 = std::make_shared<boost::barrier>(2);
+ acceptor_group_->add(std::bind(startupFunc, bind0));
+ bind0->wait();
+
+ auto barrier = std::make_shared<boost::barrier>(acceptor_group_->numThreads());
+ for (int i = 1; i < acceptor_group_->numThreads(); i++) {
+ acceptor_group_->add(std::bind(startupFunc, barrier));
+ }
+ barrier->wait();
+
// Startup all the threads
- workerFactory_->forEachWorker([this, socket](Acceptor* worker){
+ for(auto socket : new_sockets) {
+ workerFactory_->forEachWorker([this, socket](Acceptor* worker){
socket->getEventBase()->runInEventBaseThread([this, worker, socket](){
socket->addAcceptCallback(worker, worker->getEventBase());
});
- });
- socket->startAccepting();
+ });
+ }
+
+ for (auto& socket : new_sockets) {
+ sockets_.push_back(socket);
+ }
}
/*
* Stop listening on all sockets.
*/
void stop() {
- for (auto& socket : sockets_) {
- socket->stopAccepting();
+ auto barrier = std::make_shared<boost::barrier>(sockets_.size() + 1);
+ for (auto socket : sockets_) {
+ socket->getEventBase()->runInEventBaseThread([barrier, socket]() {
+ socket->stopAccepting();
+ socket->detachEventBase();
+ barrier->wait();
+ });
}
+ barrier->wait();
+ sockets_.clear();
+
acceptor_group_->join();
io_group_->join();
}