dd5cbc566deaef9c9a99e8e7a9b2c750233ad63e
[folly.git] / folly / wangle / bootstrap / ServerBootstrap.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <folly/wangle/bootstrap/ServerBootstrap-inl.h>
19 #include <boost/thread.hpp>
20
21 namespace folly {
22
23 /*
24  * ServerBootstrap is a parent class intended to set up a
25  * high-performance TCP accepting server.  It will manage a pool of
26  * accepting threads, any number of accepting sockets, a pool of
27  * IO-worker threads, and connection pool for each IO thread for you.
28  *
29  * The output is given as a ChannelPipeline template: given a
30  * PipelineFactory, it will create a new pipeline for each connection,
31  * and your server can handle the incoming bytes.
32  *
33  * BACKWARDS COMPATIBLITY: for servers already taking a pool of
34  * Acceptor objects, an AcceptorFactory can be given directly instead
35  * of a pipeline factory.
36  */
37 template <typename Pipeline>
38 class ServerBootstrap {
39  public:
40   /* TODO(davejwatson)
41    *
42    * If there is any work to be done BEFORE handing the work to IO
43    * threads, this handler is where the pipeline to do it would be
44    * set.
45    *
46    * This could be used for things like logging, load balancing, or
47    * advanced load balancing on IO threads.  Netty also provides this.
48    */
49   ServerBootstrap* handler() {
50     return this;
51   }
52
53   /*
54    * BACKWARDS COMPATIBILITY - an acceptor factory can be set.  Your
55    * Acceptor is responsible for managing the connection pool.
56    *
57    * @param childHandler - acceptor factory to call for each IO thread
58    */
59   ServerBootstrap* childHandler(std::shared_ptr<AcceptorFactory> childHandler) {
60     acceptorFactory_ = childHandler;
61     return this;
62   }
63
64   /*
65    * Set a pipeline factory that will be called for each new connection
66    *
67    * @param factory pipeline factory to use for each new connection
68    */
69   ServerBootstrap* childPipeline(
70       std::shared_ptr<PipelineFactory<Pipeline>> factory) {
71     pipelineFactory_ = factory;
72     return this;
73   }
74
75   /*
76    * Set the IO executor.  If not set, a default one will be created
77    * with one thread per core.
78    *
79    * @param io_group - io executor to use for IO threads.
80    */
81   ServerBootstrap* group(
82       std::shared_ptr<folly::wangle::IOThreadPoolExecutor> io_group) {
83     return group(nullptr, io_group);
84   }
85
86   /*
87    * Set the acceptor executor, and IO executor.
88    *
89    * If no acceptor executor is set, a single thread will be created for accepts
90    * If no IO executor is set, a default of one thread per core will be created
91    *
92    * @param group - acceptor executor to use for acceptor threads.
93    * @param io_group - io executor to use for IO threads.
94    */
95   ServerBootstrap* group(
96       std::shared_ptr<folly::wangle::IOThreadPoolExecutor> accept_group,
97       std::shared_ptr<wangle::IOThreadPoolExecutor> io_group) {
98     if (!accept_group) {
99       accept_group = std::make_shared<folly::wangle::IOThreadPoolExecutor>(
100         1, std::make_shared<wangle::NamedThreadFactory>("Acceptor Thread"));
101     }
102     if (!io_group) {
103       io_group = std::make_shared<folly::wangle::IOThreadPoolExecutor>(
104         32, std::make_shared<wangle::NamedThreadFactory>("IO Thread"));
105     }
106     auto factoryBase = io_group->getThreadFactory();
107     CHECK(factoryBase);
108     auto factory = std::dynamic_pointer_cast<folly::wangle::NamedThreadFactory>(
109       factoryBase);
110     CHECK(factory); // Must be named thread factory
111
112     CHECK(acceptorFactory_ || pipelineFactory_);
113
114     if (acceptorFactory_) {
115       workerFactory_ = std::make_shared<ServerWorkerFactory>(
116         acceptorFactory_);
117     } else {
118       workerFactory_ = std::make_shared<ServerWorkerFactory>(
119         std::make_shared<ServerAcceptorFactory<Pipeline>>(pipelineFactory_));
120     }
121     workerFactory_->setInternalFactory(factory);
122
123     acceptor_group_ = accept_group;
124     io_group_ = io_group;
125
126     auto numThreads = io_group_->numThreads();
127     io_group_->setNumThreads(0);
128     io_group_->setThreadFactory(workerFactory_);
129     io_group_->setNumThreads(numThreads);
130
131     return this;
132   }
133
134   /*
135    * Bind to a port and start listening.
136    * One of childPipeline or childHandler must be called before bind
137    *
138    * @param port Port to listen on
139    */
140   void bind(int port) {
141     // TODO take existing socket
142
143     if (!workerFactory_) {
144       group(nullptr);
145     }
146
147     bool reusePort = false;
148     if (acceptor_group_->numThreads() > 1) {
149       reusePort = true;
150     }
151
152     std::mutex sock_lock;
153     std::vector<std::shared_ptr<folly::AsyncServerSocket>> new_sockets;
154
155     auto startupFunc = [&](std::shared_ptr<boost::barrier> barrier){
156         auto socket = folly::AsyncServerSocket::newSocket();
157         sock_lock.lock();
158         new_sockets.push_back(socket);
159         sock_lock.unlock();
160         socket->setReusePortEnabled(reusePort);
161         socket->attachEventBase(EventBaseManager::get()->getEventBase());
162         socket->bind(port);
163         // TODO Take ServerSocketConfig
164         socket->listen(1024);
165         socket->startAccepting();
166
167         if (port == 0) {
168           SocketAddress address;
169           socket->getAddress(&address);
170           port = address.getPort();
171         }
172
173         barrier->wait();
174     };
175
176     auto bind0 = std::make_shared<boost::barrier>(2);
177     acceptor_group_->add(std::bind(startupFunc, bind0));
178     bind0->wait();
179
180     auto barrier = std::make_shared<boost::barrier>(acceptor_group_->numThreads());
181     for (size_t i = 1; i < acceptor_group_->numThreads(); i++) {
182       acceptor_group_->add(std::bind(startupFunc, barrier));
183     }
184     barrier->wait();
185
186     // Startup all the threads
187     for(auto socket : new_sockets) {
188       workerFactory_->forEachWorker([this, socket](Acceptor* worker){
189         socket->getEventBase()->runInEventBaseThread([this, worker, socket](){
190           socket->addAcceptCallback(worker, worker->getEventBase());
191         });
192       });
193     }
194
195     for (auto& socket : new_sockets) {
196       sockets_.push_back(socket);
197     }
198   }
199
200   /*
201    * Stop listening on all sockets.
202    */
203   void stop() {
204     auto barrier = std::make_shared<boost::barrier>(sockets_.size() + 1);
205     for (auto socket : sockets_) {
206       socket->getEventBase()->runInEventBaseThread([barrier, socket]() {
207         socket->stopAccepting();
208         socket->detachEventBase();
209         barrier->wait();
210       });
211     }
212     barrier->wait();
213     sockets_.clear();
214
215     acceptor_group_->join();
216     io_group_->join();
217   }
218
219   /*
220    * Get the list of listening sockets
221    */
222   std::vector<std::shared_ptr<folly::AsyncServerSocket>>&
223   getSockets() {
224     return sockets_;
225   }
226
227  private:
228   std::shared_ptr<wangle::IOThreadPoolExecutor> acceptor_group_;
229   std::shared_ptr<wangle::IOThreadPoolExecutor> io_group_;
230
231   std::shared_ptr<ServerWorkerFactory> workerFactory_;
232   std::vector<std::shared_ptr<folly::AsyncServerSocket>> sockets_;
233
234   std::shared_ptr<AcceptorFactory> acceptorFactory_;
235   std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
236 };
237
238 } // namespace