From: Dave Watson Date: Tue, 28 Oct 2014 21:46:19 +0000 (-0700) Subject: server bootstrap X-Git-Tag: v0.22.0~160 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=1a568d03206e2b35f24cb01163ff742783994c2e;p=folly.git server bootstrap Summary: ServerBootstrap a la netty. This should be enough for some refactoring of thrift server and proxygen servers - but there are still lots of TODOs left to do Test Plan: Unittests included Depends on D1638358 Reviewed By: jsedgwick@fb.com Subscribers: trunkagent, doug, fugalh, alandau, bmatheny, mshneer, jsedgwick, afrind, dcsommer FB internal diff: D1649521 Tasks: 5488516 Signature: t1:1649521:1416256073:fc003fd471bdfd137160dd6d7befd933ee8addd2 --- diff --git a/folly/experimental/wangle/acceptor/Acceptor.cpp b/folly/experimental/wangle/acceptor/Acceptor.cpp index 534e6f4f..bd9c67f1 100644 --- a/folly/experimental/wangle/acceptor/Acceptor.cpp +++ b/folly/experimental/wangle/acceptor/Acceptor.cpp @@ -188,14 +188,16 @@ Acceptor::init(AsyncServerSocket* serverSocket, downstreamConnectionManager_ = ConnectionManager::makeUnique( eventBase, accConfig_.connectionIdleTimeout, this); - serverSocket->addAcceptCallback(this, eventBase); - // SO_KEEPALIVE is the only setting that is inherited by accepted - // connections so only apply this setting - for (const auto& option: socketOptions_) { - if (option.first.level == SOL_SOCKET && - option.first.optname == SO_KEEPALIVE && option.second == 1) { - serverSocket->setKeepAliveEnabled(true); - break; + if (serverSocket) { + serverSocket->addAcceptCallback(this, eventBase); + // SO_KEEPALIVE is the only setting that is inherited by accepted + // connections so only apply this setting + for (const auto& option: socketOptions_) { + if (option.first.level == SOL_SOCKET && + option.first.optname == SO_KEEPALIVE && option.second == 1) { + serverSocket->setKeepAliveEnabled(true); + break; + } } } } diff --git a/folly/experimental/wangle/acceptor/Acceptor.h b/folly/experimental/wangle/acceptor/Acceptor.h index 69597018..404425e7 100644 --- a/folly/experimental/wangle/acceptor/Acceptor.h +++ b/folly/experimental/wangle/acceptor/Acceptor.h @@ -98,7 +98,7 @@ class Acceptor : /** * Access the Acceptor's event base. */ - EventBase* getEventBase() { return base_; } + virtual EventBase* getEventBase() const { return base_; } /** * Access the Acceptor's downstream (client-side) ConnectionManager @@ -173,6 +173,12 @@ class Acceptor : std::chrono::steady_clock::time_point acceptTime ) noexcept; + /** + * Drains all open connections of their outstanding transactions. When + * a connection's transaction count reaches zero, the connection closes. + */ + void drainAllConnections(); + protected: friend class AcceptorHandshakeHelper; @@ -239,11 +245,7 @@ class Acceptor : */ void dropAllConnections(); - /** - * Drains all open connections of their outstanding transactions. When - * a connection's transaction count reaches zero, the connection closes. - */ - void drainAllConnections(); + protected: /** * onConnectionsDrained() will be called once all connections have been @@ -335,4 +337,10 @@ class Acceptor : std::shared_ptr cacheProvider_; }; +class AcceptorFactory { + public: + virtual std::shared_ptr newAcceptor() = 0; + virtual ~AcceptorFactory() = default; +}; + } // namespace diff --git a/folly/experimental/wangle/bootstrap/BootstrapTest.cpp b/folly/experimental/wangle/bootstrap/BootstrapTest.cpp new file mode 100644 index 00000000..b8461b45 --- /dev/null +++ b/folly/experimental/wangle/bootstrap/BootstrapTest.cpp @@ -0,0 +1,120 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "folly/experimental/wangle/bootstrap/ServerBootstrap.h" +#include "folly/experimental/wangle/bootstrap/ClientBootstrap.h" +#include "folly/experimental/wangle/channel/ChannelHandler.h" + +#include +#include + +using namespace folly::wangle; +using namespace folly; + +typedef ChannelHandlerAdapter BytesPassthrough; +typedef ChannelPipeline Pipeline; + +class TestServer : public ServerBootstrap { + Pipeline* newPipeline(std::shared_ptr) { + return nullptr; + } +}; + +class TestClient : public ClientBootstrap { + Pipeline* newPipeline(std::shared_ptr sock) { + CHECK(sock->good()); + + // We probably aren't connected immedately, check after a small delay + EventBaseManager::get()->getEventBase()->runAfterDelay([sock](){ + CHECK(sock->readable()); + }, 100); + return nullptr; + } +}; + +class TestPipelineFactory : public PipelineFactory { + public: + Pipeline* newPipeline(std::shared_ptr sock) { + pipelines++; + return new Pipeline(BytesPassthrough()); + } + std::atomic pipelines{0}; +}; + +TEST(Bootstrap, Basic) { + TestServer server; + TestClient client; +} + +TEST(Bootstrap, ServerWithPipeline) { + TestServer server; + server.childPipeline(std::make_shared()); + server.bind(0); + auto base = EventBaseManager::get()->getEventBase(); + base->runAfterDelay([&](){ + server.stop(); + }, 500); + base->loop(); +} + +TEST(Bootstrap, ClientServerTest) { + TestServer server; + auto factory = std::make_shared(); + server.childPipeline(factory); + server.bind(0); + auto base = EventBaseManager::get()->getEventBase(); + + SocketAddress address; + server.getSockets()[0]->getAddress(&address); + + TestClient client; + client.connect(address); + base->runAfterDelay([&](){ + server.stop(); + }, 500); + base->loop(); + + CHECK(factory->pipelines == 1); +} + +TEST(Bootstrap, ClientConnectionManagerTest) { + // Create a single IO thread, and verify that + // client connections are pooled properly + + TestServer server; + auto factory = std::make_shared(); + server.childPipeline(factory); + server.group(std::make_shared(1)); + server.bind(0); + auto base = EventBaseManager::get()->getEventBase(); + + SocketAddress address; + server.getSockets()[0]->getAddress(&address); + + TestClient client; + client.connect(address); + + TestClient client2; + client2.connect(address); + + base->runAfterDelay([&](){ + server.stop(); + }, 500); + + base->loop(); + + CHECK(factory->pipelines == 2); +} diff --git a/folly/experimental/wangle/bootstrap/ClientBootstrap.h b/folly/experimental/wangle/bootstrap/ClientBootstrap.h new file mode 100644 index 00000000..dadbf5c5 --- /dev/null +++ b/folly/experimental/wangle/bootstrap/ClientBootstrap.h @@ -0,0 +1,54 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +namespace folly { + +/* + * A thin wrapper around ChannelPipeline and AsyncSocket to match + * ServerBootstrap. On connect() a new pipeline is created. + */ +template +class ClientBootstrap { + public: + ClientBootstrap() { + } + ClientBootstrap* bind(int port) { + port_ = port; + return this; + } + ClientBootstrap* connect(SocketAddress address) { + pipeline_.reset( + newPipeline( + AsyncSocket::newSocket(EventBaseManager::get()->getEventBase(), address) + )); + return this; + } + + virtual ~ClientBootstrap() {} + + protected: + std::unique_ptr pipeline_; + + int port_; + + virtual Pipeline* newPipeline(std::shared_ptr socket) = 0; +}; + +} // namespace diff --git a/folly/experimental/wangle/bootstrap/ServerBootstrap-inl.h b/folly/experimental/wangle/bootstrap/ServerBootstrap-inl.h new file mode 100644 index 00000000..7268e2a7 --- /dev/null +++ b/folly/experimental/wangle/bootstrap/ServerBootstrap-inl.h @@ -0,0 +1,134 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include +#include + +namespace folly { + +template +class ServerAcceptor : public Acceptor { + typedef std::unique_ptr PipelinePtr; + + class ServerConnection : public wangle::ManagedConnection { + public: + explicit ServerConnection(PipelinePtr pipeline) + : pipeline_(std::move(pipeline)) {} + + ~ServerConnection() { + } + + void timeoutExpired() noexcept { + } + + void describe(std::ostream& os) const {} + bool isBusy() const { + return false; + } + void notifyPendingShutdown() {} + void closeWhenIdle() {} + void dropConnection() {} + void dumpConnectionState(uint8_t loglevel) {} + private: + PipelinePtr pipeline_; + }; + + public: + explicit ServerAcceptor( + std::shared_ptr> pipelineFactory) + : Acceptor(ServerSocketConfig()) + , pipelineFactory_(pipelineFactory) { + Acceptor::init(nullptr, &base_); + } + + /* See Acceptor::onNewConnection for details */ + void onNewConnection( + AsyncSocket::UniquePtr transport, const SocketAddress* address, + const std::string& nextProtocolName, const TransportInfo& tinfo) { + + std::unique_ptr + pipeline(pipelineFactory_->newPipeline( + std::shared_ptr( + transport.release(), + folly::DelayedDestruction::Destructor()))); + auto connection = new ServerConnection(std::move(pipeline)); + Acceptor::addConnection(connection); + } + + ~ServerAcceptor() { + Acceptor::dropAllConnections(); + } + + private: + EventBase base_; + + std::shared_ptr> pipelineFactory_; +}; + +template +class ServerAcceptorFactory : public AcceptorFactory { + public: + explicit ServerAcceptorFactory( + std::shared_ptr> factory) + : factory_(factory) {} + + std::shared_ptr newAcceptor() { + return std::make_shared>(factory_); + } + private: + std::shared_ptr> factory_; +}; + +class ServerWorkerFactory : public folly::wangle::ThreadFactory { + public: + explicit ServerWorkerFactory(std::shared_ptr acceptorFactory) + : internalFactory_( + std::make_shared("BootstrapWorker")) + , acceptorFactory_(acceptorFactory) + {} + virtual std::thread newThread(folly::wangle::Func&& func) override; + + void setInternalFactory( + std::shared_ptr internalFactory); + void setNamePrefix(folly::StringPiece prefix); + + template + void forEachWorker(F&& f); + + private: + std::shared_ptr internalFactory_; + folly::RWSpinLock workersLock_; + std::map> workers_; + int32_t nextWorkerId_{0}; + + std::shared_ptr acceptorFactory_; +}; + +template +void ServerWorkerFactory::forEachWorker(F&& f) { + folly::RWSpinLock::ReadHolder guard(workersLock_); + for (const auto& kv : workers_) { + f(kv.second.get()); + } +} + +} // namespace diff --git a/folly/experimental/wangle/bootstrap/ServerBootstrap.cpp b/folly/experimental/wangle/bootstrap/ServerBootstrap.cpp new file mode 100644 index 00000000..17e4bb86 --- /dev/null +++ b/folly/experimental/wangle/bootstrap/ServerBootstrap.cpp @@ -0,0 +1,54 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include + +namespace folly { + +std::thread ServerWorkerFactory::newThread( + folly::wangle::Func&& func) { + return internalFactory_->newThread([=](){ + auto id = nextWorkerId_++; + auto worker = acceptorFactory_->newAcceptor(); + { + folly::RWSpinLock::WriteHolder guard(workersLock_); + workers_.insert({id, worker}); + } + EventBaseManager::get()->setEventBase(worker->getEventBase(), false); + func(); + EventBaseManager::get()->clearEventBase(); + + worker->drainAllConnections(); + { + folly::RWSpinLock::WriteHolder guard(workersLock_); + workers_.erase(id); + } + }); +} + +void ServerWorkerFactory::setInternalFactory( + std::shared_ptr internalFactory) { + CHECK(workers_.empty()); + internalFactory_ = internalFactory; +} + +void ServerWorkerFactory::setNamePrefix(folly::StringPiece prefix) { + CHECK(workers_.empty()); + internalFactory_->setNamePrefix(prefix); +} + +} // namespace diff --git a/folly/experimental/wangle/bootstrap/ServerBootstrap.h b/folly/experimental/wangle/bootstrap/ServerBootstrap.h new file mode 100644 index 00000000..8f29cb52 --- /dev/null +++ b/folly/experimental/wangle/bootstrap/ServerBootstrap.h @@ -0,0 +1,191 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +namespace folly { + +/* + * ServerBootstrap is a parent class intended to set up a + * high-performance TCP accepting server. It will manage a pool of + * accepting threads, any number of accepting sockets, a pool of + * IO-worker threads, and connection pool for each IO thread for you. + * + * The output is given as a ChannelPipeline template: given a + * PipelineFactory, it will create a new pipeline for each connection, + * and your server can handle the incoming bytes. + * + * BACKWARDS COMPATIBLITY: for servers already taking a pool of + * Acceptor objects, an AcceptorFactory can be given directly instead + * of a pipeline factory. + */ +template +class ServerBootstrap { + public: + /* TODO(davejwatson) + * + * If there is any work to be done BEFORE handing the work to IO + * threads, this handler is where the pipeline to do it would be + * set. + * + * This could be used for things like logging, load balancing, or + * advanced load balancing on IO threads. Netty also provides this. + */ + ServerBootstrap* handler() { + return this; + } + + /* + * BACKWARDS COMPATIBILITY - an acceptor factory can be set. Your + * Acceptor is responsible for managing the connection pool. + * + * @param childHandler - acceptor factory to call for each IO thread + */ + ServerBootstrap* childHandler(std::shared_ptr childHandler) { + acceptorFactory_ = childHandler; + return this; + } + + /* + * Set a pipeline factory that will be called for each new connection + * + * @param factory pipeline factory to use for each new connection + */ + ServerBootstrap* childPipeline( + std::shared_ptr> factory) { + pipelineFactory_ = factory; + return this; + } + + /* + * Set the IO executor. If not set, a default one will be created + * with one thread per core. + * + * @param io_group - io executor to use for IO threads. + */ + ServerBootstrap* group( + std::shared_ptr io_group) { + return group(nullptr, io_group); + } + + /* + * Set the acceptor executor, and IO executor. + * + * If no acceptor executor is set, a single thread will be created for accepts + * If no IO executor is set, a default of one thread per core will be created + * + * @param group - acceptor executor to use for acceptor threads. + * @param io_group - io executor to use for IO threads. + */ + ServerBootstrap* group( + std::shared_ptr accept_group, + std::shared_ptr io_group) { + if (!accept_group) { + accept_group = std::make_shared( + 1, std::make_shared("Acceptor Thread")); + } + if (!io_group) { + io_group = std::make_shared( + 32, std::make_shared("IO Thread")); + } + auto factory = io_group->getThreadFactory(); + + //CHECK(factory == nullptr); // TODO + + CHECK(acceptorFactory_ || pipelineFactory_); + + if (acceptorFactory_) { + workerFactory_ = std::make_shared( + acceptorFactory_); + } else { + workerFactory_ = std::make_shared( + std::make_shared>(pipelineFactory_)); + } + + acceptor_group_ = accept_group; + io_group_ = io_group; + + auto numThreads = io_group_->numThreads(); + io_group_->setNumThreads(0); + io_group_->setThreadFactory(workerFactory_); + io_group_->setNumThreads(numThreads); + + return this; + } + + /* + * Bind to a port and start listening. + * One of childPipeline or childHandler must be called before bind + * + * @param port Port to listen on + */ + void bind(int port) { + // TODO bind to v4 and v6 + // TODO take existing socket + // TODO use the acceptor thread + auto socket = folly::AsyncServerSocket::newSocket( + EventBaseManager::get()->getEventBase()); + sockets_.push_back(socket); + socket->bind(port); + + // TODO Take ServerSocketConfig + socket->listen(1024); + + if (!workerFactory_) { + group(nullptr); + } + + // Startup all the threads + workerFactory_->forEachWorker([this, socket](Acceptor* worker){ + socket->getEventBase()->runInEventBaseThread([this, worker, socket](){ + socket->addAcceptCallback(worker, worker->getEventBase()); + }); + }); + socket->startAccepting(); + } + + /* + * Stop listening on all sockets. + */ + void stop() { + for (auto& socket : sockets_) { + socket->stopAccepting(); + } + acceptor_group_->join(); + io_group_->join(); + } + + /* + * Get the list of listening sockets + */ + std::vector>& + getSockets() { + return sockets_; + } + + private: + std::shared_ptr acceptor_group_; + std::shared_ptr io_group_; + + std::shared_ptr workerFactory_; + std::vector> sockets_; + + std::shared_ptr acceptorFactory_; + std::shared_ptr> pipelineFactory_; +}; + +} // namespace diff --git a/folly/experimental/wangle/channel/ChannelPipeline.h b/folly/experimental/wangle/channel/ChannelPipeline.h index 89212df5..de80a856 100644 --- a/folly/experimental/wangle/channel/ChannelPipeline.h +++ b/folly/experimental/wangle/channel/ChannelPipeline.h @@ -354,3 +354,16 @@ class ChannelPipeline }; }} + +namespace folly { + +class AsyncSocket; + +template +class PipelineFactory { + public: + virtual Pipeline* newPipeline(std::shared_ptr) = 0; + virtual ~PipelineFactory() {} +}; + +} diff --git a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h index b1a3dd38..52c7d1f7 100644 --- a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h +++ b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h @@ -50,6 +50,10 @@ class ThreadPoolExecutor : public Executor { threadFactory_ = std::move(threadFactory); } + std::shared_ptr getThreadFactory(void) { + return threadFactory_; + } + size_t numThreads(); void setNumThreads(size_t numThreads); void stop();