/*
 * Copyright 2014 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <folly/wangle/bootstrap/ServerBootstrap-inl.h>

#include <folly/Baton.h>

#include <functional>
#include <memory>
#include <mutex>
#include <vector>
/**
 * ServerBootstrap is a parent class intended to set up a
 * high-performance TCP accepting server.  It will manage a pool of
 * accepting threads, any number of accepting sockets, a pool of
 * IO-worker threads, and connection pool for each IO thread for you.
 *
 * The output is given as a ChannelPipeline template: given a
 * PipelineFactory, it will create a new pipeline for each connection,
 * and your server can handle the incoming bytes.
 *
 * BACKWARDS COMPATIBILITY: for servers already taking a pool of
 * Acceptor objects, an AcceptorFactory can be given directly instead
 * of a pipeline factory.
 */
37 template <typename Pipeline>
38 class ServerBootstrap {
46 * If there is any work to be done BEFORE handing the work to IO
47 * threads, this handler is where the pipeline to do it would be
50 * This could be used for things like logging, load balancing, or
51 * advanced load balancing on IO threads. Netty also provides this.
53 ServerBootstrap* handler() {
58 * BACKWARDS COMPATIBILITY - an acceptor factory can be set. Your
59 * Acceptor is responsible for managing the connection pool.
61 * @param childHandler - acceptor factory to call for each IO thread
63 ServerBootstrap* childHandler(std::shared_ptr<AcceptorFactory> childHandler) {
64 acceptorFactory_ = childHandler;
69 * Set a pipeline factory that will be called for each new connection
71 * @param factory pipeline factory to use for each new connection
73 ServerBootstrap* childPipeline(
74 std::shared_ptr<PipelineFactory<Pipeline>> factory) {
75 pipelineFactory_ = factory;
80 * Set the IO executor. If not set, a default one will be created
81 * with one thread per core.
83 * @param io_group - io executor to use for IO threads.
85 ServerBootstrap* group(
86 std::shared_ptr<folly::wangle::IOThreadPoolExecutor> io_group) {
87 return group(nullptr, io_group);
91 * Set the acceptor executor, and IO executor.
93 * If no acceptor executor is set, a single thread will be created for accepts
94 * If no IO executor is set, a default of one thread per core will be created
96 * @param group - acceptor executor to use for acceptor threads.
97 * @param io_group - io executor to use for IO threads.
99 ServerBootstrap* group(
100 std::shared_ptr<folly::wangle::IOThreadPoolExecutor> accept_group,
101 std::shared_ptr<wangle::IOThreadPoolExecutor> io_group) {
103 accept_group = std::make_shared<folly::wangle::IOThreadPoolExecutor>(
104 1, std::make_shared<wangle::NamedThreadFactory>("Acceptor Thread"));
107 io_group = std::make_shared<folly::wangle::IOThreadPoolExecutor>(
108 32, std::make_shared<wangle::NamedThreadFactory>("IO Thread"));
111 CHECK(acceptorFactory_ || pipelineFactory_);
113 if (acceptorFactory_) {
114 workerFactory_ = std::make_shared<ServerWorkerPool>(
115 acceptorFactory_, io_group.get(), &sockets_);
117 workerFactory_ = std::make_shared<ServerWorkerPool>(
118 std::make_shared<ServerAcceptorFactory<Pipeline>>(pipelineFactory_),
119 io_group.get(), &sockets_);
122 io_group->addObserver(workerFactory_);
124 acceptor_group_ = accept_group;
125 io_group_ = io_group;
131 * Bind to an existing socket
133 * @param sock Existing socket to use for accepting
135 void bind(folly::AsyncServerSocket::UniquePtr s) {
136 if (!workerFactory_) {
140 // Since only a single socket is given,
141 // we can only accept on a single thread
142 CHECK(acceptor_group_->numThreads() == 1);
143 std::shared_ptr<folly::AsyncServerSocket> socket(
144 s.release(), DelayedDestruction::Destructor());
146 folly::Baton<> barrier;
147 acceptor_group_->add([&](){
148 socket->attachEventBase(EventBaseManager::get()->getEventBase());
149 socket->listen(1024);
150 socket->startAccepting();
155 // Startup all the threads
156 workerFactory_->forEachWorker([this, socket](Acceptor* worker){
157 socket->getEventBase()->runInEventBaseThread([this, worker, socket](){
158 socket->addAcceptCallback(worker, worker->getEventBase());
162 sockets_.push_back(socket);
165 void bind(folly::SocketAddress address) {
166 bindImpl(-1, address);
170 * Bind to a port and start listening.
171 * One of childPipeline or childHandler must be called before bind
173 * @param port Port to listen on
175 void bind(int port) {
177 bindImpl(port, folly::SocketAddress());
180 void bindImpl(int port, folly::SocketAddress address) {
181 if (!workerFactory_) {
185 bool reusePort = false;
186 if (acceptor_group_->numThreads() > 1) {
190 std::mutex sock_lock;
191 std::vector<std::shared_ptr<folly::AsyncServerSocket>> new_sockets;
193 auto startupFunc = [&](std::shared_ptr<folly::Baton<>> barrier){
194 auto socket = folly::AsyncServerSocket::newSocket();
196 new_sockets.push_back(socket);
198 socket->setReusePortEnabled(reusePort);
199 socket->attachEventBase(EventBaseManager::get()->getEventBase());
203 socket->bind(address);
204 port = address.getPort();
206 socket->listen(socketConfig.acceptBacklog);
207 socket->startAccepting();
210 SocketAddress address;
211 socket->getAddress(&address);
212 port = address.getPort();
218 auto wait0 = std::make_shared<folly::Baton<>>();
219 acceptor_group_->add(std::bind(startupFunc, wait0));
222 for (size_t i = 1; i < acceptor_group_->numThreads(); i++) {
223 auto barrier = std::make_shared<folly::Baton<>>();
224 acceptor_group_->add(std::bind(startupFunc, barrier));
228 // Startup all the threads
229 for(auto socket : new_sockets) {
230 workerFactory_->forEachWorker([this, socket](Acceptor* worker){
231 socket->getEventBase()->runInEventBaseThread([this, worker, socket](){
232 socket->addAcceptCallback(worker, worker->getEventBase());
237 for (auto& socket : new_sockets) {
238 sockets_.push_back(socket);
243 * Stop listening on all sockets.
246 for (auto socket : sockets_) {
247 folly::Baton<> barrier;
248 socket->getEventBase()->runInEventBaseThread([&barrier, socket]() {
249 socket->stopAccepting();
250 socket->detachEventBase();
257 if (acceptor_group_) {
258 acceptor_group_->join();
266 * Get the list of listening sockets
268 const std::vector<std::shared_ptr<folly::AsyncServerSocket>>&
273 std::shared_ptr<wangle::IOThreadPoolExecutor> getIOGroup() const {
277 template <typename F>
278 void forEachWorker(F&& f) const {
279 workerFactory_->forEachWorker(f);
282 ServerSocketConfig socketConfig;
285 std::shared_ptr<wangle::IOThreadPoolExecutor> acceptor_group_;
286 std::shared_ptr<wangle::IOThreadPoolExecutor> io_group_;
288 std::shared_ptr<ServerWorkerPool> workerFactory_;
289 std::vector<std::shared_ptr<folly::AsyncServerSocket>> sockets_;
291 std::shared_ptr<AcceptorFactory> acceptorFactory_;
292 std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;