wangle/acceptor/ServerSocketConfig.h \
wangle/acceptor/SocketOptions.h \
wangle/acceptor/TransportInfo.h \
+ wangle/bootstrap/ServerBootstrap.h \
+ wangle/bootstrap/ServerBootstrap-inl.h \
+ wangle/bootstrap/ClientBootstrap.h \
wangle/channel/AsyncSocketHandler.h \
wangle/channel/ChannelHandler.h \
wangle/channel/ChannelHandlerContext.h \
wangle/acceptor/ManagedConnection.cpp \
wangle/acceptor/SocketOptions.cpp \
wangle/acceptor/TransportInfo.cpp \
+ wangle/bootstrap/ServerBootstrap.cpp \
wangle/concurrent/CPUThreadPoolExecutor.cpp \
wangle/concurrent/Codel.cpp \
wangle/concurrent/IOThreadPoolExecutor.cpp \
*/
void drainAllConnections();
+ /**
+ * Drop all connections.
+ *
+ * forceStop() schedules dropAllConnections() to be called in the acceptor's
+ * thread.
+ */
+ void dropAllConnections();
+
protected:
friend class AcceptorHandshakeHelper;
std::chrono::milliseconds acceptLatency,
SSLErrorEnum error) noexcept {}
- /**
- * Drop all connections.
- *
- * forceStop() schedules dropAllConnections() to be called in the acceptor's
- * thread.
- */
- void dropAllConnections();
-
protected:
/**
class AcceptorFactory {
public:
- virtual std::shared_ptr<Acceptor> newAcceptor() = 0;
+ virtual std::shared_ptr<Acceptor> newAcceptor(folly::EventBase*) = 0;
virtual ~AcceptorFactory() = default;
};
#include <glog/logging.h>
#include <gtest/gtest.h>
+#include <boost/thread.hpp>
using namespace folly::wangle;
using namespace folly;
server.stop();
CHECK(factory->pipelines == 5);
}
+
+TEST(Bootstrap, ExistingSocket) {
+ TestServer server;
+ auto factory = std::make_shared<TestPipelineFactory>();
+ server.childPipeline(factory);
+ folly::AsyncServerSocket::UniquePtr socket(new AsyncServerSocket);
+ server.bind(std::move(socket));
+}
public:
explicit ServerAcceptor(
- std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory)
+ std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory,
+ EventBase* base)
: Acceptor(ServerSocketConfig())
, pipelineFactory_(pipelineFactory) {
- Acceptor::init(nullptr, &base_);
+ Acceptor::init(nullptr, base);
}
/* See Acceptor::onNewConnection for details */
Acceptor::addConnection(connection);
}
- ~ServerAcceptor() {
- Acceptor::dropAllConnections();
- }
-
private:
- EventBase base_;
-
std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
};
std::shared_ptr<PipelineFactory<Pipeline>> factory)
: factory_(factory) {}
- std::shared_ptr<Acceptor> newAcceptor() {
- return std::make_shared<ServerAcceptor<Pipeline>>(factory_);
+ std::shared_ptr<Acceptor> newAcceptor(folly::EventBase* base) {
+ return std::make_shared<ServerAcceptor<Pipeline>>(factory_, base);
}
private:
std::shared_ptr<PipelineFactory<Pipeline>> factory_;
};
-class ServerWorkerFactory : public folly::wangle::ThreadFactory {
+class ServerWorkerPool : public folly::wangle::ThreadPoolExecutor::Observer {
public:
- explicit ServerWorkerFactory(std::shared_ptr<AcceptorFactory> acceptorFactory)
- : internalFactory_(
- std::make_shared<folly::wangle::NamedThreadFactory>("BootstrapWorker"))
- , acceptorFactory_(acceptorFactory)
- {}
- virtual std::thread newThread(folly::Func&& func) override;
-
- void setInternalFactory(
- std::shared_ptr<folly::wangle::NamedThreadFactory> internalFactory);
- void setNamePrefix(folly::StringPiece prefix);
+ explicit ServerWorkerPool(
+ std::shared_ptr<AcceptorFactory> acceptorFactory,
+ folly::wangle::IOThreadPoolExecutor* exec,
+ std::vector<std::shared_ptr<folly::AsyncServerSocket>>* sockets)
+ : acceptorFactory_(acceptorFactory)
+ , exec_(exec)
+ , sockets_(sockets) {
+ CHECK(exec);
+ }
template <typename F>
- void forEachWorker(F&& f);
+ void forEachWorker(F&& f) const;
+
+ void threadStarted(
+ folly::wangle::ThreadPoolExecutor::ThreadHandle*);
+ void threadStopped(
+ folly::wangle::ThreadPoolExecutor::ThreadHandle*);
+ void threadPreviouslyStarted(
+ folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) {
+ threadStarted(thread);
+ }
+ void threadNotYetStopped(
+ folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) {
+ threadStopped(thread);
+ }
private:
- std::shared_ptr<folly::wangle::NamedThreadFactory> internalFactory_;
- folly::RWSpinLock workersLock_;
- std::map<int32_t, std::shared_ptr<Acceptor>> workers_;
- int32_t nextWorkerId_{0};
-
+ std::map<folly::wangle::ThreadPoolExecutor::ThreadHandle*,
+ std::shared_ptr<Acceptor>> workers_;
std::shared_ptr<AcceptorFactory> acceptorFactory_;
+ folly::wangle::IOThreadPoolExecutor* exec_{nullptr};
+ std::vector<std::shared_ptr<folly::AsyncServerSocket>>* sockets_;
};
template <typename F>
-void ServerWorkerFactory::forEachWorker(F&& f) {
- folly::RWSpinLock::ReadHolder guard(workersLock_);
+void ServerWorkerPool::forEachWorker(F&& f) const {
for (const auto& kv : workers_) {
f(kv.second.get());
}
namespace folly {
-std::thread ServerWorkerFactory::newThread(
- folly::Func&& func) {
- auto id = nextWorkerId_++;
- auto worker = acceptorFactory_->newAcceptor();
- {
- folly::RWSpinLock::WriteHolder guard(workersLock_);
- workers_.insert({id, worker});
+void ServerWorkerPool::threadStarted(
+ folly::wangle::ThreadPoolExecutor::ThreadHandle* h) {
+ auto worker = acceptorFactory_->newAcceptor(exec_->getEventBase(h));
+ workers_.insert({h, worker});
+
+ for(auto socket : *sockets_) {
+ socket->getEventBase()->runInEventBaseThread([this, worker, socket](){
+ socket->addAcceptCallback(worker.get(), worker->getEventBase());
+ });
}
- return internalFactory_->newThread([=](){
- EventBaseManager::get()->setEventBase(worker->getEventBase(), false);
- func();
- EventBaseManager::get()->clearEventBase();
-
- worker->drainAllConnections();
- {
- folly::RWSpinLock::WriteHolder guard(workersLock_);
- workers_.erase(id);
- }
- });
}
-void ServerWorkerFactory::setInternalFactory(
- std::shared_ptr<wangle::NamedThreadFactory> internalFactory) {
- CHECK(workers_.empty());
- internalFactory_ = internalFactory;
-}
+void ServerWorkerPool::threadStopped(
+ folly::wangle::ThreadPoolExecutor::ThreadHandle* h) {
+ auto worker = workers_.find(h);
+ CHECK(worker != workers_.end());
+
+ for (auto& socket : *sockets_) {
+ folly::Baton<> barrier;
+ socket->getEventBase()->runInEventBaseThread([&]() {
+ socket->removeAcceptCallback(worker->second.get(), nullptr);
+ barrier.post();
+ });
+ barrier.wait();
+ }
+
+ CHECK(worker->second->getEventBase() != nullptr);
+ CHECK(!worker->second->getEventBase()->isInEventBaseThread());
+ folly::Baton<> barrier;
+ worker->second->getEventBase()->runInEventBaseThread([&]() {
+ worker->second->dropAllConnections();
+ barrier.post();
+ });
-void ServerWorkerFactory::setNamePrefix(folly::StringPiece prefix) {
- CHECK(workers_.empty());
- internalFactory_->setNamePrefix(prefix);
+ barrier.wait();
+ workers_.erase(worker);
}
} // namespace
#pragma once
#include <folly/wangle/bootstrap/ServerBootstrap-inl.h>
-#include <boost/thread.hpp>
+#include <folly/Baton.h>
namespace folly {
template <typename Pipeline>
class ServerBootstrap {
public:
+
+ ~ServerBootstrap() {
+ stop();
+ }
/* TODO(davejwatson)
*
* If there is any work to be done BEFORE handing the work to IO
io_group = std::make_shared<folly::wangle::IOThreadPoolExecutor>(
32, std::make_shared<wangle::NamedThreadFactory>("IO Thread"));
}
- auto factoryBase = io_group->getThreadFactory();
- CHECK(factoryBase);
- auto factory = std::dynamic_pointer_cast<folly::wangle::NamedThreadFactory>(
- factoryBase);
- CHECK(factory); // Must be named thread factory
CHECK(acceptorFactory_ || pipelineFactory_);
if (acceptorFactory_) {
- workerFactory_ = std::make_shared<ServerWorkerFactory>(
- acceptorFactory_);
+ workerFactory_ = std::make_shared<ServerWorkerPool>(
+ acceptorFactory_, io_group.get(), &sockets_);
} else {
- workerFactory_ = std::make_shared<ServerWorkerFactory>(
- std::make_shared<ServerAcceptorFactory<Pipeline>>(pipelineFactory_));
+ workerFactory_ = std::make_shared<ServerWorkerPool>(
+ std::make_shared<ServerAcceptorFactory<Pipeline>>(pipelineFactory_),
+ io_group.get(), &sockets_);
}
- workerFactory_->setInternalFactory(factory);
+
+ io_group->addObserver(workerFactory_);
acceptor_group_ = accept_group;
io_group_ = io_group;
- auto numThreads = io_group_->numThreads();
- io_group_->setNumThreads(0);
- io_group_->setThreadFactory(workerFactory_);
- io_group_->setNumThreads(numThreads);
-
return this;
}
+ /*
+ * Bind to an existing socket
+ *
+ * @param sock Existing socket to use for accepting
+ */
+ void bind(folly::AsyncServerSocket::UniquePtr s) {
+ if (!workerFactory_) {
+ group(nullptr);
+ }
+
+ // Since only a single socket is given,
+ // we can only accept on a single thread
+ CHECK(acceptor_group_->numThreads() == 1);
+ std::shared_ptr<folly::AsyncServerSocket> socket(
+ s.release(), DelayedDestruction::Destructor());
+
+ folly::Baton<> barrier;
+ acceptor_group_->add([&](){
+ socket->attachEventBase(EventBaseManager::get()->getEventBase());
+ socket->listen(1024);
+ socket->startAccepting();
+ barrier.post();
+ });
+ barrier.wait();
+
+ // Startup all the threads
+ workerFactory_->forEachWorker([this, socket](Acceptor* worker){
+ socket->getEventBase()->runInEventBaseThread([this, worker, socket](){
+ socket->addAcceptCallback(worker, worker->getEventBase());
+ });
+ });
+
+ sockets_.push_back(socket);
+ }
+
+ void bind(folly::SocketAddress address) {
+ bindImpl(-1, address);
+ }
+
/*
* Bind to a port and start listening.
* One of childPipeline or childHandler must be called before bind
* @param port Port to listen on
*/
void bind(int port) {
- // TODO take existing socket
+ CHECK(port >= 0);
+ bindImpl(port, folly::SocketAddress());
+ }
+ void bindImpl(int port, folly::SocketAddress address) {
if (!workerFactory_) {
group(nullptr);
}
std::mutex sock_lock;
std::vector<std::shared_ptr<folly::AsyncServerSocket>> new_sockets;
- auto startupFunc = [&](std::shared_ptr<boost::barrier> barrier){
+ auto startupFunc = [&](std::shared_ptr<folly::Baton<>> barrier){
auto socket = folly::AsyncServerSocket::newSocket();
sock_lock.lock();
new_sockets.push_back(socket);
sock_lock.unlock();
socket->setReusePortEnabled(reusePort);
socket->attachEventBase(EventBaseManager::get()->getEventBase());
- socket->bind(port);
- // TODO Take ServerSocketConfig
- socket->listen(1024);
+ if (port >= 0) {
+ socket->bind(port);
+ } else {
+ socket->bind(address);
+ port = address.getPort();
+ }
+ socket->listen(socketConfig.acceptBacklog);
socket->startAccepting();
if (port == 0) {
port = address.getPort();
}
- barrier->wait();
+ barrier->post();
};
- auto bind0 = std::make_shared<boost::barrier>(2);
- acceptor_group_->add(std::bind(startupFunc, bind0));
- bind0->wait();
+ auto wait0 = std::make_shared<folly::Baton<>>();
+ acceptor_group_->add(std::bind(startupFunc, wait0));
+ wait0->wait();
- auto barrier = std::make_shared<boost::barrier>(acceptor_group_->numThreads());
for (size_t i = 1; i < acceptor_group_->numThreads(); i++) {
+ auto barrier = std::make_shared<folly::Baton<>>();
acceptor_group_->add(std::bind(startupFunc, barrier));
+ barrier->wait();
}
- barrier->wait();
// Startup all the threads
for(auto socket : new_sockets) {
* Stop listening on all sockets.
*/
void stop() {
- auto barrier = std::make_shared<boost::barrier>(sockets_.size() + 1);
for (auto socket : sockets_) {
- socket->getEventBase()->runInEventBaseThread([barrier, socket]() {
+ folly::Baton<> barrier;
+ socket->getEventBase()->runInEventBaseThread([&barrier, socket]() {
socket->stopAccepting();
socket->detachEventBase();
- barrier->wait();
+ barrier.post();
});
+ barrier.wait();
}
- barrier->wait();
sockets_.clear();
- acceptor_group_->join();
- io_group_->join();
+ if (acceptor_group_) {
+ acceptor_group_->join();
+ }
+ if (io_group_) {
+ io_group_->join();
+ }
}
/*
* Get the list of listening sockets
*/
- std::vector<std::shared_ptr<folly::AsyncServerSocket>>&
- getSockets() {
+ const std::vector<std::shared_ptr<folly::AsyncServerSocket>>&
+ getSockets() const {
return sockets_;
}
+ std::shared_ptr<wangle::IOThreadPoolExecutor> getIOGroup() const {
+ return io_group_;
+ }
+
+ template <typename F>
+ void forEachWorker(F&& f) const {
+ workerFactory_->forEachWorker(f);
+ }
+
+ ServerSocketConfig socketConfig;
+
private:
std::shared_ptr<wangle::IOThreadPoolExecutor> acceptor_group_;
std::shared_ptr<wangle::IOThreadPoolExecutor> io_group_;
- std::shared_ptr<ServerWorkerFactory> workerFactory_;
+ std::shared_ptr<ServerWorkerPool> workerFactory_;
std::vector<std::shared_ptr<folly::AsyncServerSocket>> sockets_;
std::shared_ptr<AcceptorFactory> acceptorFactory_;