From 59e4f0d059f214f8aa185517551fdb99511fece0 Mon Sep 17 00:00:00 2001 From: Dave Watson Date: Thu, 19 Feb 2015 10:06:27 -0800 Subject: [PATCH] use ServerBootstrap Summary: Use server bootstrap. * code reuse * Allows multiple accept threads easily. Accept now doesn't happen in the server's eventBase, but a new thread. I think I hit all the corner cases involving existingSocket, duplex, stopListening(), etc. There are a lot of minor changes here to support all the cases, let me know if I should split anything to separate diffs Test Plan: fbconfig -r thrift/lib/cpp2; fbmake runtests Reviewed By: hans@fb.com Subscribers: haijunz, yfeldblum, ruibalp, fbcode-common-diffs@, davejwatson, jsedgwick, trunkagent, doug, alandau, bmatheny, mshneer, folly-diffs@, mcduff, andrewcox, fugalh FB internal diff: D1732895 Tasks: 5788102 Signature: t1:1732895:1423087631:379bbb131c35ce2221801bc7cec226f87ba0b1d9 --- folly/Makefile.am | 4 + folly/wangle/acceptor/Acceptor.h | 18 +-- folly/wangle/bootstrap/BootstrapTest.cpp | 9 ++ folly/wangle/bootstrap/ServerBootstrap-inl.h | 63 ++++----- folly/wangle/bootstrap/ServerBootstrap.cpp | 58 +++++---- folly/wangle/bootstrap/ServerBootstrap.h | 129 +++++++++++++------ 6 files changed, 181 insertions(+), 100 deletions(-) diff --git a/folly/Makefile.am b/folly/Makefile.am index a8d54828..b273f71d 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -225,6 +225,9 @@ nobase_follyinclude_HEADERS = \ wangle/acceptor/ServerSocketConfig.h \ wangle/acceptor/SocketOptions.h \ wangle/acceptor/TransportInfo.h \ + wangle/bootstrap/ServerBootstrap.h \ + wangle/bootstrap/ServerBootstrap-inl.h \ + wangle/bootstrap/ClientBootstrap.h \ wangle/channel/AsyncSocketHandler.h \ wangle/channel/ChannelHandler.h \ wangle/channel/ChannelHandlerContext.h \ @@ -343,6 +346,7 @@ libfolly_la_SOURCES = \ wangle/acceptor/ManagedConnection.cpp \ wangle/acceptor/SocketOptions.cpp \ wangle/acceptor/TransportInfo.cpp \ + wangle/bootstrap/ServerBootstrap.cpp \ wangle/concurrent/CPUThreadPoolExecutor.cpp \ wangle/concurrent/Codel.cpp \ wangle/concurrent/IOThreadPoolExecutor.cpp \ diff --git a/folly/wangle/acceptor/Acceptor.h b/folly/wangle/acceptor/Acceptor.h index 22c46719..ee73b471 100644 --- a/folly/wangle/acceptor/Acceptor.h +++ b/folly/wangle/acceptor/Acceptor.h @@ -180,6 +180,14 @@ class Acceptor : */ void drainAllConnections(); + /** + * Drop all connections. + * + * forceStop() schedules dropAllConnections() to be called in the acceptor's + * thread. + */ + void dropAllConnections(); + protected: friend class AcceptorHandshakeHelper; @@ -238,14 +246,6 @@ class Acceptor : std::chrono::milliseconds acceptLatency, SSLErrorEnum error) noexcept {} - /** - * Drop all connections. - * - * forceStop() schedules dropAllConnections() to be called in the acceptor's - * thread. - */ - void dropAllConnections(); - protected: /** @@ -340,7 +340,7 @@ class Acceptor : class AcceptorFactory { public: - virtual std::shared_ptr newAcceptor() = 0; + virtual std::shared_ptr newAcceptor(folly::EventBase*) = 0; virtual ~AcceptorFactory() = default; }; diff --git a/folly/wangle/bootstrap/BootstrapTest.cpp b/folly/wangle/bootstrap/BootstrapTest.cpp index 9f2f664e..4bbd80c2 100644 --- a/folly/wangle/bootstrap/BootstrapTest.cpp +++ b/folly/wangle/bootstrap/BootstrapTest.cpp @@ -20,6 +20,7 @@ #include #include +#include using namespace folly::wangle; using namespace folly; @@ -227,3 +228,11 @@ TEST(Bootstrap, SharedThreadPool) { server.stop(); CHECK(factory->pipelines == 5); } + +TEST(Bootstrap, ExistingSocket) { + TestServer server; + auto factory = std::make_shared(); + server.childPipeline(factory); + folly::AsyncServerSocket::UniquePtr socket(new AsyncServerSocket); + server.bind(std::move(socket)); +} diff --git a/folly/wangle/bootstrap/ServerBootstrap-inl.h b/folly/wangle/bootstrap/ServerBootstrap-inl.h index a224a9f9..acaa29be 100644 --- a/folly/wangle/bootstrap/ServerBootstrap-inl.h +++ b/folly/wangle/bootstrap/ServerBootstrap-inl.h @@ -53,10 +53,11 @@ class ServerAcceptor : public Acceptor { public: explicit ServerAcceptor( - std::shared_ptr> pipelineFactory) + std::shared_ptr> pipelineFactory, + EventBase* base) : Acceptor(ServerSocketConfig()) , pipelineFactory_(pipelineFactory) { - Acceptor::init(nullptr, &base_); + Acceptor::init(nullptr, base); } /* See Acceptor::onNewConnection for details */ @@ -74,13 +75,7 @@ class ServerAcceptor : public Acceptor { Acceptor::addConnection(connection); } - ~ServerAcceptor() { - Acceptor::dropAllConnections(); - } - private: - EventBase base_; - std::shared_ptr> pipelineFactory_; }; @@ -91,41 +86,51 @@ class ServerAcceptorFactory : public AcceptorFactory { std::shared_ptr> factory) : factory_(factory) {} - std::shared_ptr newAcceptor() { - return std::make_shared>(factory_); + std::shared_ptr newAcceptor(folly::EventBase* base) { + return std::make_shared>(factory_, base); } private: std::shared_ptr> factory_; }; -class ServerWorkerFactory : public folly::wangle::ThreadFactory { +class ServerWorkerPool : public folly::wangle::ThreadPoolExecutor::Observer { public: - explicit ServerWorkerFactory(std::shared_ptr acceptorFactory) - : internalFactory_( - std::make_shared("BootstrapWorker")) - , acceptorFactory_(acceptorFactory) - {} - virtual std::thread newThread(folly::Func&& func) override; - - void setInternalFactory( - std::shared_ptr internalFactory); - void setNamePrefix(folly::StringPiece prefix); + explicit ServerWorkerPool( + std::shared_ptr acceptorFactory, + folly::wangle::IOThreadPoolExecutor* exec, + std::vector>* sockets) + : acceptorFactory_(acceptorFactory) + , exec_(exec) + , sockets_(sockets) { + CHECK(exec); + } template - void forEachWorker(F&& f); + void forEachWorker(F&& f) const; + + void threadStarted( + folly::wangle::ThreadPoolExecutor::ThreadHandle*); + void threadStopped( + folly::wangle::ThreadPoolExecutor::ThreadHandle*); + void threadPreviouslyStarted( + folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) { + threadStarted(thread); + } + void threadNotYetStopped( + folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) { + threadStopped(thread); + } private: - std::shared_ptr internalFactory_; - folly::RWSpinLock workersLock_; - std::map> workers_; - int32_t nextWorkerId_{0}; - + std::map> workers_; std::shared_ptr acceptorFactory_; + folly::wangle::IOThreadPoolExecutor* exec_{nullptr}; + std::vector>* sockets_; }; template -void ServerWorkerFactory::forEachWorker(F&& f) { - folly::RWSpinLock::ReadHolder guard(workersLock_); +void ServerWorkerPool::forEachWorker(F&& f) const { for (const auto& kv : workers_) { f(kv.second.get()); } diff --git a/folly/wangle/bootstrap/ServerBootstrap.cpp b/folly/wangle/bootstrap/ServerBootstrap.cpp index 1f07fadf..dc7d5d1e 100644 --- a/folly/wangle/bootstrap/ServerBootstrap.cpp +++ b/folly/wangle/bootstrap/ServerBootstrap.cpp @@ -19,36 +19,42 @@ namespace folly { -std::thread ServerWorkerFactory::newThread( - folly::Func&& func) { - auto id = nextWorkerId_++; - auto worker = acceptorFactory_->newAcceptor(); - { - folly::RWSpinLock::WriteHolder guard(workersLock_); - workers_.insert({id, worker}); +void ServerWorkerPool::threadStarted( + folly::wangle::ThreadPoolExecutor::ThreadHandle* h) { + auto worker = acceptorFactory_->newAcceptor(exec_->getEventBase(h)); + workers_.insert({h, worker}); + + for(auto socket : *sockets_) { + socket->getEventBase()->runInEventBaseThread([this, worker, socket](){ + socket->addAcceptCallback(worker.get(), worker->getEventBase()); + }); } - return internalFactory_->newThread([=](){ - EventBaseManager::get()->setEventBase(worker->getEventBase(), false); - func(); - EventBaseManager::get()->clearEventBase(); - - worker->drainAllConnections(); - { - folly::RWSpinLock::WriteHolder guard(workersLock_); - workers_.erase(id); - } - }); } -void ServerWorkerFactory::setInternalFactory( - std::shared_ptr internalFactory) { - CHECK(workers_.empty()); - internalFactory_ = internalFactory; -} +void ServerWorkerPool::threadStopped( + folly::wangle::ThreadPoolExecutor::ThreadHandle* h) { + auto worker = workers_.find(h); + CHECK(worker != workers_.end()); + + for (auto& socket : *sockets_) { + folly::Baton<> barrier; + socket->getEventBase()->runInEventBaseThread([&]() { + socket->removeAcceptCallback(worker->second.get(), nullptr); + barrier.post(); + }); + barrier.wait(); + } + + CHECK(worker->second->getEventBase() != nullptr); + CHECK(!worker->second->getEventBase()->isInEventBaseThread()); + folly::Baton<> barrier; + worker->second->getEventBase()->runInEventBaseThread([&]() { + worker->second->dropAllConnections(); + barrier.post(); + }); -void ServerWorkerFactory::setNamePrefix(folly::StringPiece prefix) { - CHECK(workers_.empty()); - internalFactory_->setNamePrefix(prefix); + barrier.wait(); + workers_.erase(worker); } } // namespace diff --git a/folly/wangle/bootstrap/ServerBootstrap.h b/folly/wangle/bootstrap/ServerBootstrap.h index dd5cbc56..142c4871 100644 --- a/folly/wangle/bootstrap/ServerBootstrap.h +++ b/folly/wangle/bootstrap/ServerBootstrap.h @@ -16,7 +16,7 @@ #pragma once #include -#include +#include namespace folly { @@ -37,6 +37,10 @@ namespace folly { template class ServerBootstrap { public: + + ~ServerBootstrap() { + stop(); + } /* TODO(davejwatson) * * If there is any work to be done BEFORE handing the work to IO @@ -103,34 +107,65 @@ class ServerBootstrap { io_group = std::make_shared( 32, std::make_shared("IO Thread")); } - auto factoryBase = io_group->getThreadFactory(); - CHECK(factoryBase); - auto factory = std::dynamic_pointer_cast( - factoryBase); - CHECK(factory); // Must be named thread factory CHECK(acceptorFactory_ || pipelineFactory_); if (acceptorFactory_) { - workerFactory_ = std::make_shared( - acceptorFactory_); + workerFactory_ = std::make_shared( + acceptorFactory_, io_group.get(), &sockets_); } else { - workerFactory_ = std::make_shared( - std::make_shared>(pipelineFactory_)); + workerFactory_ = std::make_shared( + std::make_shared>(pipelineFactory_), + io_group.get(), &sockets_); } - workerFactory_->setInternalFactory(factory); + + io_group->addObserver(workerFactory_); acceptor_group_ = accept_group; io_group_ = io_group; - auto numThreads = io_group_->numThreads(); - io_group_->setNumThreads(0); - io_group_->setThreadFactory(workerFactory_); - io_group_->setNumThreads(numThreads); - return this; } + /* + * Bind to an existing socket + * + * @param sock Existing socket to use for accepting + */ + void bind(folly::AsyncServerSocket::UniquePtr s) { + if (!workerFactory_) { + group(nullptr); + } + + // Since only a single socket is given, + // we can only accept on a single thread + CHECK(acceptor_group_->numThreads() == 1); + std::shared_ptr socket( + s.release(), DelayedDestruction::Destructor()); + + folly::Baton<> barrier; + acceptor_group_->add([&](){ + socket->attachEventBase(EventBaseManager::get()->getEventBase()); + socket->listen(1024); + socket->startAccepting(); + barrier.post(); + }); + barrier.wait(); + + // Startup all the threads + workerFactory_->forEachWorker([this, socket](Acceptor* worker){ + socket->getEventBase()->runInEventBaseThread([this, worker, socket](){ + socket->addAcceptCallback(worker, worker->getEventBase()); + }); + }); + + sockets_.push_back(socket); + } + + void bind(folly::SocketAddress address) { + bindImpl(-1, address); + } + /* * Bind to a port and start listening. * One of childPipeline or childHandler must be called before bind @@ -138,8 +173,11 @@ class ServerBootstrap { * @param port Port to listen on */ void bind(int port) { - // TODO take existing socket + CHECK(port >= 0); + bindImpl(port, folly::SocketAddress()); + } + void bindImpl(int port, folly::SocketAddress address) { if (!workerFactory_) { group(nullptr); } @@ -152,16 +190,20 @@ class ServerBootstrap { std::mutex sock_lock; std::vector> new_sockets; - auto startupFunc = [&](std::shared_ptr barrier){ + auto startupFunc = [&](std::shared_ptr> barrier){ auto socket = folly::AsyncServerSocket::newSocket(); sock_lock.lock(); new_sockets.push_back(socket); sock_lock.unlock(); socket->setReusePortEnabled(reusePort); socket->attachEventBase(EventBaseManager::get()->getEventBase()); - socket->bind(port); - // TODO Take ServerSocketConfig - socket->listen(1024); + if (port >= 0) { + socket->bind(port); + } else { + socket->bind(address); + port = address.getPort(); + } + socket->listen(socketConfig.acceptBacklog); socket->startAccepting(); if (port == 0) { @@ -170,18 +212,18 @@ class ServerBootstrap { port = address.getPort(); } - barrier->wait(); + barrier->post(); }; - auto bind0 = std::make_shared(2); - acceptor_group_->add(std::bind(startupFunc, bind0)); - bind0->wait(); + auto wait0 = std::make_shared>(); + acceptor_group_->add(std::bind(startupFunc, wait0)); + wait0->wait(); - auto barrier = std::make_shared(acceptor_group_->numThreads()); for (size_t i = 1; i < acceptor_group_->numThreads(); i++) { + auto barrier = std::make_shared>(); acceptor_group_->add(std::bind(startupFunc, barrier)); + barrier->wait(); } - barrier->wait(); // Startup all the threads for(auto socket : new_sockets) { @@ -201,34 +243,49 @@ class ServerBootstrap { * Stop listening on all sockets. */ void stop() { - auto barrier = std::make_shared(sockets_.size() + 1); for (auto socket : sockets_) { - socket->getEventBase()->runInEventBaseThread([barrier, socket]() { + folly::Baton<> barrier; + socket->getEventBase()->runInEventBaseThread([&barrier, socket]() { socket->stopAccepting(); socket->detachEventBase(); - barrier->wait(); + barrier.post(); }); + barrier.wait(); } - barrier->wait(); sockets_.clear(); - acceptor_group_->join(); - io_group_->join(); + if (acceptor_group_) { + acceptor_group_->join(); + } + if (io_group_) { + io_group_->join(); + } } /* * Get the list of listening sockets */ - std::vector>& - getSockets() { + const std::vector>& + getSockets() const { return sockets_; } + std::shared_ptr getIOGroup() const { + return io_group_; + } + + template + void forEachWorker(F&& f) const { + workerFactory_->forEachWorker(f); + } + + ServerSocketConfig socketConfig; + private: std::shared_ptr acceptor_group_; std::shared_ptr io_group_; - std::shared_ptr workerFactory_; + std::shared_ptr workerFactory_; std::vector> sockets_; std::shared_ptr acceptorFactory_; -- 2.34.1