2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include <folly/wangle/bootstrap/ServerBootstrap-inl.h>
19 #include <boost/thread.hpp>
24 * ServerBootstrap is a parent class intended to set up a
25 * high-performance TCP accepting server. It will manage a pool of
26 * accepting threads, any number of accepting sockets, a pool of
27 * IO-worker threads, and connection pool for each IO thread for you.
29 * The output is given as a ChannelPipeline template: given a
30 * PipelineFactory, it will create a new pipeline for each connection,
31 * and your server can handle the incoming bytes.
33 * BACKWARDS COMPATIBLITY: for servers already taking a pool of
34 * Acceptor objects, an AcceptorFactory can be given directly instead
35 * of a pipeline factory.
37 template <typename Pipeline>
38 class ServerBootstrap {
42 * If there is any work to be done BEFORE handing the work to IO
43 * threads, this handler is where the pipeline to do it would be
46 * This could be used for things like logging, load balancing, or
47 * advanced load balancing on IO threads. Netty also provides this.
49 ServerBootstrap* handler() {
54 * BACKWARDS COMPATIBILITY - an acceptor factory can be set. Your
55 * Acceptor is responsible for managing the connection pool.
57 * @param childHandler - acceptor factory to call for each IO thread
59 ServerBootstrap* childHandler(std::shared_ptr<AcceptorFactory> childHandler) {
60 acceptorFactory_ = childHandler;
65 * Set a pipeline factory that will be called for each new connection
67 * @param factory pipeline factory to use for each new connection
69 ServerBootstrap* childPipeline(
70 std::shared_ptr<PipelineFactory<Pipeline>> factory) {
71 pipelineFactory_ = factory;
76 * Set the IO executor. If not set, a default one will be created
77 * with one thread per core.
79 * @param io_group - io executor to use for IO threads.
81 ServerBootstrap* group(
82 std::shared_ptr<folly::wangle::IOThreadPoolExecutor> io_group) {
83 return group(nullptr, io_group);
87 * Set the acceptor executor, and IO executor.
89 * If no acceptor executor is set, a single thread will be created for accepts
90 * If no IO executor is set, a default of one thread per core will be created
92 * @param group - acceptor executor to use for acceptor threads.
93 * @param io_group - io executor to use for IO threads.
95 ServerBootstrap* group(
96 std::shared_ptr<folly::wangle::IOThreadPoolExecutor> accept_group,
97 std::shared_ptr<wangle::IOThreadPoolExecutor> io_group) {
99 accept_group = std::make_shared<folly::wangle::IOThreadPoolExecutor>(
100 1, std::make_shared<wangle::NamedThreadFactory>("Acceptor Thread"));
103 io_group = std::make_shared<folly::wangle::IOThreadPoolExecutor>(
104 32, std::make_shared<wangle::NamedThreadFactory>("IO Thread"));
106 auto factoryBase = io_group->getThreadFactory();
108 auto factory = std::dynamic_pointer_cast<folly::wangle::NamedThreadFactory>(
110 CHECK(factory); // Must be named thread factory
112 CHECK(acceptorFactory_ || pipelineFactory_);
114 if (acceptorFactory_) {
115 workerFactory_ = std::make_shared<ServerWorkerFactory>(
118 workerFactory_ = std::make_shared<ServerWorkerFactory>(
119 std::make_shared<ServerAcceptorFactory<Pipeline>>(pipelineFactory_));
121 workerFactory_->setInternalFactory(factory);
123 acceptor_group_ = accept_group;
124 io_group_ = io_group;
126 auto numThreads = io_group_->numThreads();
127 io_group_->setNumThreads(0);
128 io_group_->setThreadFactory(workerFactory_);
129 io_group_->setNumThreads(numThreads);
135 * Bind to a port and start listening.
136 * One of childPipeline or childHandler must be called before bind
138 * @param port Port to listen on
140 void bind(int port) {
141 // TODO take existing socket
143 if (!workerFactory_) {
147 bool reusePort = false;
148 if (acceptor_group_->numThreads() > 1) {
152 std::mutex sock_lock;
153 std::vector<std::shared_ptr<folly::AsyncServerSocket>> new_sockets;
155 auto startupFunc = [&](std::shared_ptr<boost::barrier> barrier){
156 auto socket = folly::AsyncServerSocket::newSocket();
158 new_sockets.push_back(socket);
160 socket->setReusePortEnabled(reusePort);
161 socket->attachEventBase(EventBaseManager::get()->getEventBase());
163 // TODO Take ServerSocketConfig
164 socket->listen(1024);
165 socket->startAccepting();
168 SocketAddress address;
169 socket->getAddress(&address);
170 port = address.getPort();
176 auto bind0 = std::make_shared<boost::barrier>(2);
177 acceptor_group_->add(std::bind(startupFunc, bind0));
180 auto barrier = std::make_shared<boost::barrier>(acceptor_group_->numThreads());
181 for (size_t i = 1; i < acceptor_group_->numThreads(); i++) {
182 acceptor_group_->add(std::bind(startupFunc, barrier));
186 // Startup all the threads
187 for(auto socket : new_sockets) {
188 workerFactory_->forEachWorker([this, socket](Acceptor* worker){
189 socket->getEventBase()->runInEventBaseThread([this, worker, socket](){
190 socket->addAcceptCallback(worker, worker->getEventBase());
195 for (auto& socket : new_sockets) {
196 sockets_.push_back(socket);
201 * Stop listening on all sockets.
204 auto barrier = std::make_shared<boost::barrier>(sockets_.size() + 1);
205 for (auto socket : sockets_) {
206 socket->getEventBase()->runInEventBaseThread([barrier, socket]() {
207 socket->stopAccepting();
208 socket->detachEventBase();
215 acceptor_group_->join();
220 * Get the list of listening sockets
222 std::vector<std::shared_ptr<folly::AsyncServerSocket>>&
228 std::shared_ptr<wangle::IOThreadPoolExecutor> acceptor_group_;
229 std::shared_ptr<wangle::IOThreadPoolExecutor> io_group_;
231 std::shared_ptr<ServerWorkerFactory> workerFactory_;
232 std::vector<std::shared_ptr<folly::AsyncServerSocket>> sockets_;
234 std::shared_ptr<AcceptorFactory> acceptorFactory_;
235 std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;